Running Apache Kafka On Containers
Apache Kafka is one of the most widely used data-streaming platforms. It's the go-to tool for collecting streaming data at scale and processing it with either Kafka Streams or Apache Spark.
Getting started with Kafka is challenging because it involves familiarizing yourself with a lot of new concepts such as topics, replication, and offsets, and then you have to understand what ZooKeeper is.
If only there was a better way to get started with Kafka. Well, look no further: Confluent comes to your rescue. Confluent is a company specializing in Kafka, with a cloud-based offering called Confluent Cloud.
Why are we talking about a company with a SaaS offering when AWS already has Amazon Managed Streaming for Apache Kafka? The reason is simple: Confluent has one focus, Kafka, and has created some great tools around it, such as ksqlDB, which lets us query streaming data (it's amazing, try it).
Apart from that, Confluent publishes great articles on Kafka internals and is one of the biggest contributors to the Kafka open-source project.
Kafka with Docker
To get started with Kafka on Docker, we are going to use Confluent's Kafka images.
- Create a docker-compose.yaml file with one zookeeper and one Kafka container:
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: broker
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
- Start both containers in detached mode:
docker-compose up -d
docker-compose starts ZooKeeper on port 2181 and Kafka on port 9092, along with some configuration:
- Zookeeper
  - ZOOKEEPER_CLIENT_PORT - Port where ZooKeeper will be available.
  - ZOOKEEPER_TICK_TIME - The length of a single tick.
- Kafka
  - KAFKA_ZOOKEEPER_CONNECT - Instructs Kafka how to connect to ZooKeeper.
  - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP - Defines key/value pairs for the security protocol to use, per listener name.
  - KAFKA_ADVERTISED_LISTENERS - A comma-separated list of listeners with their host/IP and port. Read more about Kafka listeners here.
  - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR - Equivalent of the broker configuration offsets.topic.replication.factor, which is the replication factor for the offsets topic. Since we are running with just one Kafka node, we need to set this to 1.
Read more about how the connection to a Kafka broker works in a Docker container here.
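If the advertised listeners are set up correctly, the broker is reachable from the host machine on localhost:9092. As a quick sanity check, here is a minimal sketch using the kafka-python package mentioned later in this post (assuming it is installed on the host, e.g. with pip install kafka-python):
import json  # not needed yet, but used in the later sketches
from kafka import KafkaConsumer

# Connect via the advertised PLAINTEXT listener (localhost:9092).
# If the listener configuration is wrong, this raises NoBrokersAvailable.
consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
print(consumer.topics())  # set of existing topic names; likely empty right after startup
consumer.close()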
Create Topics in Kafka
Kafka topics are like database tables. Just as in a database we have to create a table before storing data, in Kafka we have to create a topic.
Unlike a database, which has a command to create a table, Kafka comes with utility scripts; one of them creates a topic and takes the topic name as a mandatory input along with a few other optional arguments.
- Log in to the Kafka container:
docker-compose exec kafka bash
- Create a topic named kafka-test:
/usr/bin/kafka-topics \
--bootstrap-server broker:9092 \
--create \
--topic kafka-test
Created topic kafka-test.
Try running this command again and you will get this error:
Error while executing topic command: Topic 'kafka-test' already exists.
Not very CI-friendly, right? The --if-not-exists flag creates the topic only if it doesn't exist and returns exit code 0 either way.
/usr/bin/kafka-topics \
--bootstrap-server broker:9092 \
--create \
--if-not-exists \
--topic kafka-test
There are a couple of other arguments that are essential for a good understanding of Kafka (see the sketch after this list):
- --partitions - Kafka topics are partitioned, i.e. the data of a topic is spread across multiple brokers for scalability.
- --replication-factor - To make the data in a topic fault-tolerant and highly available, every topic can be replicated, so that there are always multiple brokers holding a copy of the data.
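These arguments map directly onto Kafka's Admin API as well. Below is a rough sketch of the same topic creation with the kafka-python package (an illustration, not part of the container walkthrough; it assumes the package is installed on the host and the broker is reachable at localhost:9092):
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")

# One partition and replication factor 1, matching our single-broker setup.
topic = NewTopic(name="kafka-test", num_partitions=1, replication_factor=1)

try:
    admin.create_topics([topic])      # equivalent of kafka-topics --create
except TopicAlreadyExistsError:
    pass                              # same effect as --if-not-exists
finally:
    admin.close()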
What’s a Kafka Message
Once we have the topic created, we can start sending messages to it. A message consists of headers, a key, and a value. Let's talk about each of these; a sketch of one such message follows the list.
- Headers - Headers are key-value pairs that let you attach metadata to a Kafka message. Read the original KIP (Kafka Improvement Proposal) that proposed headers here.
- Key - The key of the Kafka message; it can be null. Identifiers such as serial numbers and UUIDs are typical examples of message keys. Read more about when you should use a key here.
- Value - The actual data to be stored in Kafka. It can be a string, JSON, Protobuf, or Avro.
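To make the anatomy concrete, here is one such message sketched as plain Python data, mirroring how the kafka-python client models a record (headers are (str, bytes) tuples; key and value are raw bytes before deserialization). The values are illustrative, borrowed from the stock example used below:
# A single Kafka message broken into its three parts.
message = {
    "headers": [("stock_source", b"nyse")],    # optional metadata key-value pairs
    "key": b"stocks-123",                      # may be None
    "value": b'{"tickers": [{"name": "AMZN", "price": 1902}]}',  # the payload
}
print(message["value"].decode("utf-8"))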
Writing to a Kafka Topic
Kafka provides a Producer API to send messages to a Kafka topic. This API is available in Java via the kafka-clients library and in Python via the kafka-python package.
Luckily for us, we don't have to use either here: Kafka ships with an out-of-the-box script, kafka-console-producer, that allows us to write data to a Kafka topic.
Run the command, and as soon as it returns a > prompt on a new line, enter the JSON message:
/usr/bin/kafka-console-producer \
--bootstrap-server kafka:9092 \
--topic kafka-test
>{"tickers": [{"name": "AMZN", "price": 1902}, {"name": "MSFT", "price": 107}, {"name": "AAPL", "price": 215}]}
We have successfully sent a message to the topic.
Press Control + C to stop the script.
However, this message was sent without a key. To send a key, we have to set the property parse.key to true.
The default key separator is \t (tab); we can change it by setting the property key.separator, e.g. --property "key.separator=:".
Let's try to send a message with a random key, stocks-123, this time:
/usr/bin/kafka-console-producer \
--bootstrap-server kafka:9092 \
--topic kafka-test \
--property "parse.key=true"
>stocks-123 {"tickers": [{"name": "AMZN", "price": 1902}, {"name": "MSFT", "price": 107}, {"name": "AAPL", "price": 215}]}
With the release of Kafka version 3.2.0, it's possible to send headers using the console producer by setting the property parse.headers to true.
Headers are metadata about the Kafka message; the source of these stock prices is a good candidate for a header. Let's add a header with key stock_source and value nyse to the Kafka message:
/usr/bin/kafka-console-producer \
--bootstrap-server kafka:9092 \
--topic kafka-test \
--property "parse.key=true" \
--property "parse.headers=true"
>stock_source:nyse stocks-123 {"tickers": [{"name": "AMZN", "price": 1902}, {"name": "MSFT", "price": 107}, {"name": "AAPL", "price": 215}]}
Now we have successfully sent a Kafka message with a header, key, and value.
Check out the supported properties for kafka-console-producer here.
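The console producer is great for experimenting, but the same message can also be sent programmatically. Here is a minimal sketch with the kafka-python package mentioned earlier (assuming it is installed on the host and the broker is reachable at localhost:9092):
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# Key, value, and headers mirror the console-producer example above;
# headers must be a list of (str, bytes) tuples.
producer.send(
    "kafka-test",
    key="stocks-123",
    value={"tickers": [{"name": "AMZN", "price": 1902},
                       {"name": "MSFT", "price": 107},
                       {"name": "AAPL", "price": 215}]},
    headers=[("stock_source", b"nyse")],
)
producer.flush()   # block until the message has actually been delivered
producer.close()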
Reading from a Kafka Topic
Kafka provides a Consumer API to read messages from a Kafka topic. This API is available in Java via the kafka-clients library and in Python via the kafka-python package.
Kafka also comes with an out-of-the-box script, kafka-console-consumer, to read messages from a Kafka topic:
/usr/bin/kafka-console-consumer \
--bootstrap-server kafka:9092 \
--topic kafka-test
However, this command only prints the values of the Kafka messages. To print the key and headers, we have to set the properties print.headers and print.key to true. We can also print the timestamp of the message with the property print.timestamp.
/usr/bin/kafka-console-consumer \
--bootstrap-server kafka:9092 \
--topic kafka-test \
--property print.headers=true \
--property print.key=true \
--property print.timestamp=true
There is other information, such as the partition and offset, which can be printed by setting --property print.offset=true and --property print.partition=true.
Every time we read from a Kafka topic, Kafka keeps track of the last offset the consumer read, so the next read continues from that point. However, we can always read from the beginning by passing the argument --from-beginning.
To read a Kafka topic from the beginning:
/usr/bin/kafka-console-consumer \
--bootstrap-server kafka:9092 \
--topic kafka-test \
--from-beginning \
--property print.headers=true \
--property print.key=true \
--property print.timestamp=true
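For comparison, here is roughly how the same read looks with the kafka-python Consumer API (a sketch; the group name is hypothetical, and auto_offset_reset="earliest" plays the role of --from-beginning for a consumer group with no committed offsets):
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "kafka-test",
    bootstrap_servers="localhost:9092",
    group_id="kafka-test-reader",        # hypothetical consumer group name
    auto_offset_reset="earliest",        # start from the beginning when no offset is committed
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    consumer_timeout_ms=10000,           # stop iterating after 10 s without new messages
)

for record in consumer:
    # These fields correspond to the print.timestamp, print.key, and print.headers properties above.
    print(record.timestamp, record.key, record.headers, record.value)

consumer.close()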
If you think what we discussed above is too much to remember, don't worry: we have an easier way of reading from and writing to Kafka topics.
kcat Utility
kcat is an awesome tool that makes our life easier: it allows us to read from and write to Kafka topics without tons of scripts, and in a more user-friendly way.
As Confluent puts it, “It is a swiss-army knife of tools for inspecting and creating data in Kafka”
kcat has two modes: it runs in producer mode when given the argument -P and in consumer mode when given the argument -C. It also automatically selects its mode depending on the terminal or pipe type: if data is being piped to kcat, it selects producer (-P) mode; if data is being piped from kcat (e.g. to standard terminal output), it selects consumer (-C) mode.
- To read data from a Kafka topic, simply run:
kcat -b localhost:9092 -t kafka-test
- To write data to a Kafka topic, run:
kcat -P -b localhost:9092 -t kafka-test
Take a look at the examples here to find out more about its usage.
Here are some tips and tricks for using Kafka.