PySpark Stream Processing with Kafka

Vignesh Thirunavukkarasu
5 min read · Apr 23, 2023


A detailed example of how to stream-process Kafka messages, identify named entities, and visualize the top 10 entities in Kibana.

Topics Covered — PySpark | Kafka Streaming | ELK Dashboard

Problem Statement

Stream Reddit comments from a specific subreddit into a Kafka topic (reddit-comments). PySpark processes the comments by extracting named entities, counts how often each one occurs, and pushes the counts to a second Kafka topic (word-counts). Logstash consumes that topic, parses the messages as JSON, and pushes them to Elasticsearch, where we build a Top 10 word count dashboard in Kibana.

High level view of the application

Setup

We will set up the following components one by one.

  • Java
  • Kafka
  • ELK
  • PySpark
  • Streaming Reddit Comments (Python producer)
  • Process Reddit Comments (PySpark consumer)

NOTE: I’ve used specific versions of PySpark, Kafka, and ELK, so make sure you use either the same versions or compatible ones.

All of the setup below was done on my local machine. If you choose to do this on AWS or GCP, make sure the necessary ports are open.

Install java

I used OpenJDK 1.8. Download the package (as a dmg) and install it.

By default it is installed in /Library/Java/JavaVirtualMachines/zulu-8.jdk/Contents/Home, so set JAVA_HOME to that directory.

export JAVA_HOME=/Library/Java/JavaVirtualMachines/zulu-8.jdk/Contents/Home

To make the export apply to every new shell session, add it to your ~/.zshrc file.

Kafka Setup

Download Kafka 3.4.0 and untar the package in your local directory.

Start ZooKeeper first and then the Kafka server with the commands below

bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &

Confirm that the installed Kafka version is the one you expect.

bin/kafka-topics.sh --version

Create two topics (reddit-comments & word-counts)

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic reddit-comments
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic word-counts

List the topics to confirm that they were created properly

bin/kafka-topics.sh --bootstrap-server=localhost:9092 --list
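
The same check can be done from Python before starting the producer. This is only a rough sketch: it assumes the kafka-python package is installed, and the helper names missing_topics and fetch_topics are made up for illustration.

```python
REQUIRED_TOPICS = {"reddit-comments", "word-counts"}


def missing_topics(existing, required=REQUIRED_TOPICS):
    """Return the required topic names that are absent from `existing`, sorted."""
    return sorted(set(required) - set(existing))


def fetch_topics(bootstrap_server="localhost:9092"):
    """List the topic names known to the broker (needs `pip install kafka-python`)."""
    # Imported lazily so that missing_topics() has no third-party dependency.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(bootstrap_servers=bootstrap_server)
    try:
        return consumer.topics()
    finally:
        consumer.close()
```

With the broker running, missing_topics(fetch_topics()) should return an empty list once both topics exist.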

ELK Setup

Download the packages for Elasticsearch, Logstash, and Kibana and extract them on your local system. I’ve used version 8.7.

Disable Elasticsearch security and SSL (HTTPS is enabled by default) in config/elasticsearch.yml

xpack.security.enabled: false
xpack.security.http.ssl:
  enabled: false

For Kibana, change the Elasticsearch URL from https to http in config/kibana.yml (replace the IP address below with your own Elasticsearch host)

elasticsearch.hosts: ['http://192.168.10.21:9200']

For Logstash, create a new config file, logstash.conf. The input block defines the input stream (the Kafka broker URL and topic), the filter block parses each message string as JSON, and the output block sets the Elasticsearch URL and the index to write to.

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["word-counts"]
  }
}

filter {
  json {
    source => "message"
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "word-counts"
    workers => 1
  }
}
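
To make the filter step more concrete, here is a rough Python imitation of what the json filter does to a single event. It is a simplification of the plugin's behaviour, and the field names word and count are assumptions about what the Spark job writes to the topic.

```python
import json


def apply_json_filter(event, source="message"):
    """Parse event[source] as JSON and merge the resulting keys into the event."""
    if source not in event:
        return dict(event)  # nothing to parse, the event passes through unchanged
    try:
        parsed = json.loads(event[source])
    except ValueError:
        parsed = None
    if not isinstance(parsed, dict):
        # Unparseable input is tagged rather than dropped.
        tags = list(event.get("tags", [])) + ["_jsonparsefailure"]
        return {**event, "tags": tags}
    # The original message field is kept alongside the parsed keys.
    return {**event, **parsed}
```

For example, an event whose message is '{"word": "NASA", "count": 3}' comes out with separate word and count fields, which is what lets Kibana aggregate on them later.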

Start ELK

Make sure you start Elasticsearch first so that Kibana and Logstash can connect without any issues.

bin/elasticsearch &
bin/kibana &
bin/logstash -f logstash.conf &
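
Because the three commands run in the background, Kibana or Logstash can come up before Elasticsearch is ready. One way to guard against that is to poll the HTTP port before starting the other two. The sketch below uses only the standard library; the function name wait_for_service and the default retry values are my own choices, and the URL assumes security is disabled as configured above.

```python
import http.client
import time
import urllib.request


def wait_for_service(url, attempts=30, delay=2.0, timeout=2.0):
    """Poll `url` until it answers with HTTP 200. Return True on success, False otherwise."""
    for attempt in range(attempts):
        try:
            with urllib.request.urlopen(url, timeout=timeout) as response:
                if response.status == 200:
                    return True
        except (OSError, http.client.HTTPException):
            # Connection refused, timeout, or an error status: not ready yet.
            pass
        if attempt < attempts - 1:
            time.sleep(delay)
    return False
```

Calling wait_for_service("http://localhost:9200") before launching Kibana and Logstash gives Elasticsearch up to about a minute to start.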

PySpark Setup

Before setting up pyspark, I installed conda to ensure that I have a dedicated environment to avoid conflicting libraries.

https://docs.conda.io/projects/continuumio-conda/en/latest/user-guide/install/macos.html

Install the libraries listed in requirements.txt

pip install -r requirements.txt

Let’s talk Code

Python Producer

Setting up Reddit API Token

Reddit requires API credentials to be set up before you can stream comments. I followed this article from GeeksforGeeks for the step-by-step process of creating the client_id and client_secret.

I’ve used PRAW (the Python Reddit API Wrapper) to stream comments from a specific subreddit into Kafka. The script takes the Kafka bootstrap server, the Kafka topic, and the subreddit name as input arguments.

There are two ways to pass the Reddit credentials: via CLI arguments or via a .env file. You can choose either option.

.env file sample

user_name=
password=
client_id=
secret=
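
A minimal sketch of how the two options could be combined, with CLI flags taking precedence over the .env file. The key names come from the sample above and the flag names from the CLI command in this article; the functions parse_env and resolve_credentials are hypothetical, and the actual script may resolve credentials differently.

```python
import argparse

CREDENTIAL_KEYS = ("user_name", "password", "client_id", "secret")


def parse_env(text):
    """Parse KEY=VALUE lines from .env-style text, skipping blanks and # comments."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip().strip("'\"")
    return values


def resolve_credentials(argv, env_text=""):
    """Prefer CLI flags and fall back to the .env values for anything not passed."""
    parser = argparse.ArgumentParser()
    for key in CREDENTIAL_KEYS:
        parser.add_argument("--" + key.replace("_", "-"))
    args, _unknown = parser.parse_known_args(argv)
    env = parse_env(env_text)
    creds = {key: getattr(args, key) or env.get(key) for key in CREDENTIAL_KEYS}
    missing = [key for key, value in creds.items() if not value]
    if missing:
        raise ValueError(f"Missing Reddit credentials: {missing}")
    return creds
```

In a script this might be called as resolve_credentials(sys.argv[1:], open(".env").read()), so a flag on the command line overrides the matching line in the file.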

CLI-based approach

python -u comments_producer.py --kafka-topic reddit-comments --bootstrap-server localhost:9092 --subreddit AskReddit --user-name abc --password abc --client-id abc --secret abc
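
For orientation, here is a rough sketch of what such a producer could look like with PRAW and kafka-python. It is not the project's actual script: the JSON message layout, the user agent string, and the function names are assumptions made for illustration.

```python
import json


def comment_to_message(comment_id, author, body, created_utc):
    """Serialize one comment as UTF-8 JSON bytes, ready to send to Kafka."""
    payload = {
        "id": comment_id,
        "author": author,
        "body": body,
        "created_utc": created_utc,
    }
    return json.dumps(payload).encode("utf-8")


def stream_comments(subreddit, topic, bootstrap_server, credentials):
    """Forward new comments from `subreddit` to `topic` until interrupted."""
    # Third-party imports are local so comment_to_message() stays dependency-free.
    import praw
    from kafka import KafkaProducer

    reddit = praw.Reddit(
        client_id=credentials["client_id"],
        client_secret=credentials["secret"],
        username=credentials["user_name"],
        password=credentials["password"],
        user_agent="reddit-comments-producer",  # hypothetical user agent
    )
    producer = KafkaProducer(bootstrap_servers=bootstrap_server)
    try:
        # skip_existing=True starts with new comments instead of replaying recent ones.
        for comment in reddit.subreddit(subreddit).stream.comments(skip_existing=True):
            author = comment.author.name if comment.author else None  # deleted accounts
            message = comment_to_message(comment.id, author, comment.body, comment.created_utc)
            producer.send(topic, message)
    finally:
        producer.flush()
        producer.close()
```

Sending JSON rather than the bare comment text keeps the metadata available downstream, at the cost of the consumer having to pull out the body field.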

PySpark Consumer

The code is fairly straightforward. A Spark application consumes the messages from the Kafka topic and loads each one into a DataFrame. It then extracts the named entities with spaCy, removes special characters and empty or null strings, and finally sends the count for each entity to the Kafka topic word-counts.
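
The steps above can be sketched roughly as follows. This is an illustration, not the project's spark_connector.py: it assumes each Kafka message is a JSON object with a body field, that the spaCy model en_core_web_sm is installed, and that the output columns are named word and count. All function names here are hypothetical.

```python
import re

_MODEL_CACHE = {}


def clean_entities(entities):
    """Strip special characters and drop empty or None entries."""
    cleaned = []
    for entity in entities:
        if entity is None:
            continue
        text = re.sub(r"[^A-Za-z0-9 ]+", "", entity).strip()
        if text:
            cleaned.append(text)
    return cleaned


def extract_entities(text):
    """Return the cleaned named entities that spaCy finds in `text`."""
    import spacy

    if "nlp" not in _MODEL_CACHE:
        # Cached so the model is not reloaded for every row. Model name is an assumption.
        _MODEL_CACHE["nlp"] = spacy.load("en_core_web_sm")
    doc = _MODEL_CACHE["nlp"](text or "")
    return clean_entities(ent.text for ent in doc.ents)


def start_word_count_stream(spark, bootstrap_server, read_topic, write_topic, checkpoint_dir):
    """Wire up Kafka -> entity extraction -> counts -> Kafka and start the query."""
    from pyspark.sql import functions as F
    from pyspark.sql.types import ArrayType, StringType

    entities_udf = F.udf(extract_entities, ArrayType(StringType()))

    comments = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_server)
        .option("subscribe", read_topic)
        .load()
        # Kafka values arrive as bytes: cast to string, then pull out the comment body.
        .select(F.get_json_object(F.col("value").cast("string"), "$.body").alias("comment"))
    )
    counts = (
        comments.select(F.explode(entities_udf("comment")).alias("word"))
        .groupBy("word")
        .count()
    )
    return (
        counts.select(F.to_json(F.struct("word", "count")).alias("value"))
        .writeStream.format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_server)
        .option("topic", write_topic)
        .option("checkpointLocation", checkpoint_dir)
        .outputMode("update")  # emit only the entities whose counts changed
        .start()
    )
```

In update mode each output message carries the running total for an entity, so the same word appears in the output topic many times with increasing counts.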

Submit the job with spark-submit:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 --conf spark.sql.streaming.forceDeleteTempCheckpointLocation=true spark_connector.py --checkpoint-dir /tmp/checkpoint --bootstrap-server localhost:9092 --read-topic reddit-comments --write-topic word-counts

Here, I’ve added the package org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1, which matches the PySpark version I used. Update it to match your version.
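
If it helps, the coordinate can be built from the installed version instead of being typed by hand. The helper below is a small sketch; its name is made up, and the default Scala version of 2.12 is an assumption that holds for the standard Spark 3.x builds but should be checked against your installation.

```python
def kafka_connector_package(spark_version, scala_version="2.12"):
    """Build the Maven coordinate of the Spark Kafka connector for --packages."""
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"
```

With PySpark installed, kafka_connector_package(pyspark.__version__) gives the value to pass to --packages.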

ELK Dashboard

Since we have configured Logstash to read messages from the Kafka topic, it sends those messages to Elasticsearch under the specified index (word-counts).

In Kibana 8.7, I located the index through Index Management and explored the logs in Discover. Then, in the Visualization tab, choose the aggregation option and bucket the data on the words keyword field. Finally, set the number of words to display.
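
For reference, the same Top 10 view can be approximated with a terms aggregation sent straight to Elasticsearch. This is a sketch under assumptions: the field names word.keyword and count are guesses at what the index contains, and it takes the maximum count per entity on the assumption that each message carries a running total. Adjust both to match your data.

```python
import json
import urllib.request


def top_entities_query(size=10, term_field="word.keyword", count_field="count"):
    """Build a search body that buckets by entity and orders by its highest count."""
    return {
        "size": 0,  # return only the aggregation, not the matching documents
        "aggs": {
            "top_entities": {
                "terms": {
                    "field": term_field,
                    "size": size,
                    "order": {"latest_count": "desc"},
                },
                "aggs": {"latest_count": {"max": {"field": count_field}}},
            }
        },
    }


def run_query(query, url="http://localhost:9200/word-counts/_search"):
    """POST the query to Elasticsearch and return the decoded JSON response."""
    request = urllib.request.Request(
        url,
        data=json.dumps(query).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)
```

The buckets under aggregations.top_entities.buckets in the response should correspond to the bars in the Kibana chart.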

Here is a sample dashboard.

Top 10 Named Entities from subreddit topic (30 min time window)

The objective here is to gain hands-on knowledge of a real-world use case for PySpark with Kafka streaming. By looking at the named-entity counts, we can derive insights into the topics that are actively discussed in the subreddit. This is a simple use case, but it can be extended to more valuable analyses, such as analyzing stock data against discussions about a company gathered from news APIs.

Although it may seem easy to set up at first glance, there are numerous challenges involved in implementing each component, such as ensuring compatibility between Java versions, Spark versions, and running PySpark.

Additionally, running all these services on a single machine can cause overloading and slow performance. Therefore, I would recommend running Kafka on AWS or GCP, while keeping the remaining components on the local machine.

The complete project is available on GitHub for reference.

-Cheers
