Monday, February 1, 2016

Kafka Fun

In this post, I will show you how to set up Kafka locally with three brokers and how to get a producer and a consumer going.

Installation


First of all, download Kafka from http://kafka.apache.org/downloads.html.
(Make sure to download the binary version, not the source!)

I chose to go with version 0.8.2.1 only because I'd rather avoid 0.9.0, the x.0 release of Kafka 0.9. In my experience, nothing works well in an x.0 release; I only work with x.1 and above :) (just kidding, of course)

Download it into a directory of your choice and untar it:

tar xzf kafka_2.11-0.8.2.1.tgz

Create log directories for three brokers


Since we will be running three brokers on the same machine, we need three separate log directories so that the brokers don't overwrite each other's logs.

Als-MacBook-Pro:kafka alkrinker$ mkdir broker-1-log
Als-MacBook-Pro:kafka alkrinker$ mkdir broker-2-log
Als-MacBook-Pro:kafka alkrinker$ mkdir broker-3-log
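The three mkdir calls differ only in the broker number, so a loop does the same job. The -p flag (standard mkdir) also keeps it from failing if you re-run it after the directories already exist.

```shell
# Create broker-1-log, broker-2-log and broker-3-log in the current directory.
# mkdir -p succeeds even if a directory is already there.
for i in 1 2 3; do
  mkdir -p "broker-$i-log"
done
```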

Start zookeeper


Kafka requires ZooKeeper. Conveniently enough, Kafka ships with its own copy of ZooKeeper. This bundled version is meant for testing and demos, not for production. Let's start it.

cd kafka_2.11-0.8.2.1/
bin/zookeeper-server-start.sh config/zookeeper.properties &

Check that ZooKeeper is running:

ps -ef | grep zookeeper

You should see something like this:

Als-MacBook-Pro:kafka_2.11-0.8.2.1 alkrinker$ ps -ef | grep zookeeper
  501   849   596   0  5:49PM ttys000    0:00.55 /Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/bin/java -Xmx512M -Xms512M -server -XX:+UseParNewGC /kafka
......
/kafka_2.11-0.8.2.1/bin/../core/build/libs/kafka_2.10*.jar org.apache.zookeeper.server.quorum.QuorumPeerMain config/zookeeper.properties
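ps only tells you that the JVM process exists, not that ZooKeeper is accepting connections yet. The bundled ZooKeeper is 3.4.6 (zookeeper-3.4.6.jar in the libs directory), and ZooKeeper 3.4.x answers the four-letter command ruok with imok. Below is a minimal polling sketch built on that; zk_ready is a hypothetical helper (not a Kafka or ZooKeeper script), and it assumes nc (netcat) is installed.

```shell
# zk_ready: hypothetical helper. Sends ZooKeeper's "ruok" command and
# succeeds as soon as the server answers "imok". Assumes nc is installed.
zk_ready() {
  host="${1:-localhost}"   # ZooKeeper host
  port="${2:-2181}"        # clientPort from config/zookeeper.properties
  tries="${3:-10}"         # number of attempts, one second apart
  n=1
  while :; do
    if [ "$(printf ruok | nc "$host" "$port" 2>/dev/null)" = "imok" ]; then
      return 0             # ZooKeeper answered: it is serving requests
    fi
    if [ "$n" -ge "$tries" ]; then
      return 1             # gave up after the last attempt
    fi
    n=$((n + 1))
    sleep 1
  done
}
```

Usage, right after starting ZooKeeper: zk_ready localhost 2181 && echo "ZooKeeper is up"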


Up to this point, we have started the built-in ZooKeeper with the default properties file and created a log directory for each of the three brokers.

Configure brokers


Each broker requires its own configuration file in order to run. Once again, Kafka comes with a default configuration file, located in

config/server.properties

Let's make three copies of this file, one per broker, so we end up with

config/server1.properties
config/server2.properties
config/server3.properties

Open each file and change the following properties to the values below (broker.id must be unique, and each broker needs its own port and log directory):
--- server 1 ---
broker.id=1
port=9091
log.dirs=/Users/alkrinker/Documents/Projects/kafka/broker-1-log

--- server 2 ---
broker.id=2
port=9092
log.dirs=/Users/alkrinker/Documents/Projects/kafka/broker-2-log

--- server 3 ---
broker.id=3
port=9093
log.dirs=/Users/alkrinker/Documents/Projects/kafka/broker-3-log
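Instead of copying and editing the three files by hand, you could generate them from the default file. This is a sketch under assumptions: make_broker_configs is a hypothetical helper name, it maps broker N to port 9090 + N as above, and sed only rewrites the broker.id=, port= and log.dirs= lines if they already exist in the template (as far as I know, the stock 0.8.2.1 server.properties has all three uncommented).

```shell
# make_broker_configs: hypothetical helper. Writes server1..3.properties next
# to the template, rewriting only broker.id, port and log.dirs.
make_broker_configs() {
  template="$1"                    # e.g. config/server.properties
  log_base="$2"                    # directory that holds broker-N-log
  out_dir="$(dirname "$template")"
  for i in 1 2 3; do
    # sed without -i writes to stdout, so this works with both GNU and BSD sed
    sed -e "s|^broker\.id=.*|broker.id=$i|" \
        -e "s|^port=.*|port=$((9090 + i))|" \
        -e "s|^log\.dirs=.*|log.dirs=$log_base/broker-$i-log|" \
        "$template" > "$out_dir/server$i.properties"
  done
}
```

Run from the kafka_2.11-0.8.2.1 directory, for example: make_broker_configs config/server.properties /Users/alkrinker/Documents/Projects/kafka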

Start brokers


Time to start your brokers!
$ bin/kafka-server-start.sh config/server1.properties &
$ bin/kafka-server-start.sh config/server2.properties &
$ bin/kafka-server-start.sh config/server3.properties &
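With all three brokers started with & in one terminal, their console output interleaves. A small variation, sketched below, starts the same three commands in the background but sends each broker's output to its own file. start_brokers and the broker-N.out file names are hypothetical choices, not part of Kafka.

```shell
# start_brokers: hypothetical helper. Same as the three commands above,
# but each broker's stdout/stderr goes to its own broker-N.out file.
start_brokers() {
  kafka_home="${1:-.}"   # the kafka_2.11-0.8.2.1 directory
  for i in 1 2 3; do
    "$kafka_home/bin/kafka-server-start.sh" "$kafka_home/config/server$i.properties" \
      > "$kafka_home/broker-$i.out" 2>&1 &
  done
}
```

From the kafka_2.11-0.8.2.1 directory: start_brokers . and then, for example, tail -f broker-1.out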


Use ps -ef | grep kafka to make sure you have three brokers running (plus ZooKeeper).
The output should look something like this:
Als-MacBook-Pro:Installation Files alkrinker$ ps -ef | grep kafka
  501   849   596   0  5:49PM ttys000    0:04.40 /Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/bin/java -Xmx512M -Xms512M -server -XX:+UseParNewGC 

...........
org.apache.zookeeper.server.quorum.QuorumPeerMain config/zookeeper.properties
 


 501   969   596   0  8:44PM ttys000    0:02.20 /Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/bin/java -Xmx1G -Xms1G -server -XX:+UseParNewGC 
.......
/kafka/kafka_2.11-0.8.2.1/bin/../libs/zkclient-0.3.jar:/Users/alkrinker/Documents/Projects/kafka/kafka_2.11-0.8.2.1/bin/../libs/zookeeper-3.4.6.jar:/Users/alkrinker/Documents/Projects/kafka/kafka_2.11-0.8.2.1/bin/../core/build/libs/kafka_2.10*.jar kafka.Kafka config/server1.properties
  

501   975   596   0  8:44PM ttys000    0:01.68 /Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/bin/java -Xmx1G -Xms1G -server -XX:+UseParNewGC 
.......
/kafka/kafka_2.11-0.8.2.1/bin/../libs/zkclient-0.3.jar:/Users/alkrinker/Documents/Projects/kafka/kafka_2.11-0.8.2.1/bin/../libs/zookeeper-3.4.6.jar:/Users/alkrinker/Documents/Projects/kafka/kafka_2.11-0.8.2.1/bin/../core/build/libs/kafka_2.10*.jar kafka.Kafka config/server2.properties

  501   978   596   0  8:44PM ttys000    0:01.61 /Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/bin/java -Xmx1G -Xms1G -server -XX:+UseParNewGC 

......

/kafka/kafka_2.11-0.8.2.1/bin/../libs/snappy-java-1.1.1.6.jar:/Users/alkrinker/Documents/Projects/kafka/kafka_2.11-0.8.2.1/bin/../libs/zkclient-0.3.jar:/Users/alkrinker/Documents/Projects/kafka/kafka_2.11-0.8.2.1/bin/../libs/zookeeper-3.4.6.jar:/Users/alkrinker/Documents/Projects/kafka/kafka_2.11-0.8.2.1/bin/../core/build/libs/kafka_2.10*.jar kafka.Kafka config/server3.properties
 


 501   984   826   0  8:45PM ttys002    0:00.00 grep kafka


Create topic


(Run bin/kafka-topics.sh with no arguments to print its usage and all available options.)

$ bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic krinker --partitions 3 --replication-factor 3

Let's go over this line:
--zookeeper localhost:2181
first, we specify where our ZooKeeper is. We are using the default ZooKeeper here.
--create --topic krinker
creates a topic called krinker
--partitions 3 --replication-factor 3
since we have 3 brokers, we can safely use 3 partitions with a replication factor of 3. Note that the replication factor cannot exceed the number of brokers (3 here), because each replica of a partition has to live on a different broker. The number of partitions has no such limit.

Now let's see what we created:


$ bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic krinker

Topic:krinker    PartitionCount:3    ReplicationFactor:3    Configs:
    Topic: krinker    Partition: 0    Leader: 1    Replicas: 1,2,3    Isr: 1,2,3
    Topic: krinker    Partition: 1    Leader: 2    Replicas: 2,3,1    Isr: 2,3,1
    Topic: krinker    Partition: 2    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2
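In the output above, each partition's replica list is the broker list rotated by one, and the first replica is the leader. The sketch below is a simplified model that reproduces this layout and the replication-factor limit; it is not Kafka's own code. Kafka's actual assignment also picks a random starting broker, so your leaders may be rotated differently. replica_layout is a hypothetical helper.

```shell
# replica_layout: hypothetical, simplified model of the rotation seen in
# --describe. Usage: replica_layout PARTITIONS REPLICATION_FACTOR BROKER_ID...
replica_layout() {
  partitions="$1"
  rf="$2"
  shift 2
  # Each replica of a partition must sit on a different broker,
  # so the replication factor cannot exceed the broker count.
  if [ "$rf" -gt "$#" ]; then
    echo "replication factor $rf exceeds broker count $#" >&2
    return 1
  fi
  echo "$@" | awk -v parts="$partitions" -v rf="$rf" '{
    n = NF
    for (p = 0; p < parts + 0; p++) {
      replicas = ""
      # Partition p starts at broker index p and wraps around the list.
      for (r = 0; r < rf + 0; r++) {
        replicas = replicas (r > 0 ? "," : "") $(((p + r) % n) + 1)
      }
      # The first replica in the list is the preferred leader.
      printf "Partition: %d    Leader: %s    Replicas: %s\n", p, $((p % n) + 1), replicas
    }
  }'
}
```

replica_layout 3 3 1 2 3 prints the same Leader/Replicas columns as the --describe output above; replica_layout 6 3 1 2 3 works too (more partitions than brokers), while replica_layout 3 4 1 2 3 fails.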


Start Producer


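Let's start a console producer, using broker 1 as its entry point. The --broker-list addresses are only used to fetch metadata; the producer then sends each message to the leader of whichever partition it writes to.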
$ bin/kafka-console-producer.sh --broker-list localhost:9091 --topic krinker
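--broker-list accepts a comma-separated HOST:PORT list, and listing all three brokers lets the producer bootstrap even if broker 1 happens to be down. join_brokers below is a hypothetical convenience helper that builds that string for brokers on localhost.

```shell
# join_brokers: hypothetical helper. Turns ports into a
# localhost:PORT1,localhost:PORT2,... string for --broker-list.
join_brokers() {
  list=""
  for port in "$@"; do
    # ${list:+$list,} adds a comma only after the first entry
    list="${list:+$list,}localhost:$port"
  done
  printf '%s\n' "$list"
}
```

For example:
$ bin/kafka-console-producer.sh --broker-list "$(join_brokers 9091 9092 9093)" --topic krinker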

Start Consumer


And let's start a consumer so we can read what our producer writes to the topic krinker:

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic krinker

See it in Action!


Now go back to the terminal where you started the producer and type whatever you like, for example:

Hello World



Now switch to the consumer window, and you should see Hello World displayed there.


Shut it down



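$ bin/kafka-server-stop.sh config/server1.properties
$ bin/kafka-server-stop.sh config/server2.properties
$ bin/kafka-server-stop.sh config/server3.properties
$ bin/zookeeper-server-stop.sh config/zookeeper.properties

Note that in 0.8.2.1 the stop scripts ignore their argument: kafka-server-stop.sh signals every Kafka broker process on the machine, so the first command already stops all three brokers.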

Verify
$ ps aux | grep kafka
alkrinker        1115   0.0  0.0  2432784    628 s000  S+    9:30PM   0:00.00 grep kafka
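The only line left above is grep matching its own command line. A slightly stricter check is sketched below: running_kafka_jvms is a hypothetical helper that counts broker and ZooKeeper JVMs by their main class names (the same names visible in the ps output earlier). The bracket trick in the patterns keeps grep from matching itself, and ww stops ps from truncating the long java command lines before the class name.

```shell
# running_kafka_jvms: hypothetical helper. Prints how many Kafka broker and
# ZooKeeper JVMs are running. The [k]/[Q] patterns do not match their own
# literal text, so the grep process is never counted.
# Note: grep -c prints 0 but exits with status 1 when nothing matches.
running_kafka_jvms() {
  ps axww | grep -c -e '[k]afka\.Kafka' -e '[Q]uorumPeerMain'
}
```

After a clean shutdown, running_kafka_jvms should print 0; while everything is up, it should print 4 (three brokers plus ZooKeeper).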