Wednesday, September 24, 2014

Apache Kafka. Set up. Writing to. Reading from. Monitoring. Part 4

Now that we have Kafka cluster up and running (Part 1 Part 2), and we are able to monitor it (Part 3), we need to learn how to write to and read from it by using Java.

I have created and uploaded my projects onto github so feel free to download the code and follow along. It is configured to work with IP addresses and topics that we created and configured in Part 1 and 2.

Producer code
First you need to specify connection information. This is as simple as specifying broker list IPs and ports.
// List of brokers that the producer will try to connect to
props.put("metadata.broker.list", "192.168.33.21:9092,192.168.33.22:9092");

Inside of properties you can set number of different choices and flags. Later you would use it to create ProducerConfig object that would in turn be used to create Producer object that would be able to send out messages to Kafka cluster.

Then select a topic that you want to write to
String topic = "my_test_topic" ;

Construct a message that would contain your topic, message and id and use Producer object to send it.

String msg = "Test message # " + 1 ;
KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, String.valueOf(1), msg);
producer.send(data);

Consumer code
This code is a little bit more complex and I had to use lots of tricks to make sure that I will be able to read from a topic of interest. Basically, if you were not careful with offsets, you would be able to read anything... I tried to comment the code as much as possible so that it would be self explanatory, but the best way to understand it all is by running it with debug and see the process flow.

Hope you enjoyed this start to end Kafka cluster guide and don't hesitate to reach out to me with comments or questions!

Apache Kafka. Set up. Writing to. Reading from. Monitoring. Part 3

Now that we were able to communicate with our Kafka cluster by writing to and reading from it, we might be curious what we have there. What brokers we have, what are the offsets on different topics, etc.

The best way to grasp the big picture is tool that can give you nice graphical interface that is lightweight. I was able to find and get KafkaOffsetMonitor working on my cluster.

The instructions on the website are pretty easy to follow along so I won't repeat them here. Main point of this point is to make reader aware of KafkaOffsetMonitor tool to give start to end hands on experience with Kafka.

Enjoy!

Friday, September 12, 2014

SVN to GIT Migration

You need to move your code from SVN to GIT. What do you need to do?

First of all, get yourself familiar with the process and what steps you would need to do: https://www.atlassian.com/pt/git/migration#!migration-overview. In this tutorial, I will use a svn2git tool to help me with migration, but svn2git is a wrapper of git svn clone, so all of the basics would still apply. Now you are wondering why svn2git? Well, when I was using git svn clone described in the above mentioned article, I ran into the issue where SVN had spaces in tags, and git replaced them with %20 and that broke things. Go figure. Here is the issue. So after some digging and trying to resolve it, I came across svn2git project that had a work around for this issue

Download handy svn migration scripts from https://bitbucket.org/atlassian/svn-migration-scripts/downloads. We would use it to generate authors.txt file. You can find more details here.

After you have authors.txt file in hand, let's go ahead and make sure that we can run svn2git. I am on Mac, for Debian-based system refer to the svn2git installation guide.
Run following commands to make sure that you have git-core, git-svn, ruby and rubygems installed on your system. You should have them if you have Xcode installed. If not, install it!
git --help
git svn --help
ruby --help
gem --help

With help of rubygems install svn2get. This would also add it to your PATH
sudo gem install svn2git

Create new directory where you want your converted files to be stored. This directory will become your new local git repository.
mkdir gitcode
cd gitcode
svn2git http://svn.repo.com/path/to/repo --authors /path/to/authors/authors.txt --verbose

Refer to the svn2git installation guide for more options. In my case, I received
'master' did not match any file(s) known to git.
error and had to tweak my command slightly to make it work. Like so,
svn2git http://svn.repo.com/path/to/repo --authors /path/to/authors/authors.txt --verbose --trunk / --nobranches --notags
Basically, my SVN was not properly set up and I had to manually specify trunk location, and there were not branches or tags.

Depending on how much source code you have it can take a while... when it is all done: review the code and push it to a remote repository where everyone will be able to access it.
git remote add origin ssh://server/path/to/repo
git push origin master

That's it!


Tuesday, September 9, 2014

Apache Kafka. Set up. Writing to. Reading from. Monitoring. Part 2

In Part 1, we create single machine that was running Kafka. Now let's do some horizontal scaling!

Step 1. Create new directory and initialize it with the box that we created in Part 1.
mkdir debianKafkaClusterNode2
cd debianKafkaClusterNode2
vagrant init debianKafkaClusterNode2 <path_to_the_box>/debian-kafka-cluster.box

Step 2. Edit generated Vagrant file (See Part 1 for details)
- Make sure that the memory is set to at least 2048
- Change the IP to be 192.168.33.11

Step 3. Start this box up and log in
vagrant up
vagrant ssh

Step 4. Configure Kafka.
Open $KAFKA_HOME/config/server.properties and set following values
broker.id=2
host.name=192.168.33.11

Now, repeat Steps 1-4 for number of boxes that you want to set up for your Kafka cluster. Don't forget to keep track of broker.id and IP (Step 2 and 4) - make sure they are unique!

After you successfully created n number of boxes, bring up your first Kafka cluster box that your created in Part 1. We shall refer to it as Node1
vagrant up
vagrant ssh

Start Zookeeper and Kafka on Node1
sudo $ZK_HOME/bin/zkServer.sh start
sudo $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties &

Start Kafka on rest of the nodes.
sudo $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties &

Congrats! You have Kafka cluster! Test it out by going to Node1 and adding few messages to the topic. Use Ctrl+C to exit.
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list 192.168.33.10:9092, 192.168.33.11:9092 --topic test-topic
this 
is

test

Test that you can retrieve the messages from some other node in the cluster
$KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper 192.168.33.10:2181 --topic test-topic --from-beginning

As you may have noticed, we use only one Zookeeper! To add more while still following majority rule, edit $KAFKA_HOME/config/server.properties by setting zookeeper.connect to the list of appropriate machines. This has to be done on each server. Don't forget to change $ZK_HOME/conf/zoo.cfg as well as myid. See Part 1 for more details. For example for 3 machine set up:
zoo.cfg file
server.1=192.168.33.10:2888:3888
server.2=192.168.33.11:2888:3888
server.3=192.168.33.12:2888:3888

echo "2" > /var/zookeeper/data/myid
echo "3" > /var/zookeeper/data/myid

Just to make sure that we have everything at this point, let's shut everything down and start it back up:
On each VM.
exit
vagrant halt

vagrant up
vagrant ssh

(Start Zookeeper and Kafka)
sudo $ZK_HOME/bin/zkServer.sh start
sudo $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties &

Use commands to add topics and messages and read them. Have fun!

Apache Kafka. Set up. Writing to. Reading from. Monitoring. Part 1

Phew! That was a mouth full... but, basically what I am going to try to do here is:
  1. Show you how to set up your own Apache Kafka cluster.
  2. Write to Apache Kafka and read from it using Kafka supplied shell scripts and then by using Java client.
  3. Finally, we quickly review KafkaOffsetMonitor - an app to monitor your Kafka consumers and their position (offset) in the queue.

First thing first... What is Apache Kafka? Straight from the source: "Apache Kafka is publish-subscribe messaging rethought as a distributed commit log." There are number of documents and guides out there that talk about the architecture and design, so I am not going to restate it here. What we want is some hands on experience with Kafka to test it out... in my case, it is much easier for me to learn things if I can play with them as I read about them.

In order to set up our cluster, let's use Vagrant to help us with the set up and configuration of VMs. This way, we can always reuse it one a different system and most importantly, we will be able to delete VM and start from scratch if we were to mess up without trying to break our heads if we removed something properly from the system or not.

If you never used Vagrant before, consider walking through Getting Started tutorial to truly appreciate it!

Anyway let's download VirtualBox and Vagrant and install them. Please follow installation guides...

Now, create a directory which would hold your VM instance and properly name it so that you could tell it apart from other VMs in the future.
In this case, we are going to use very basic debian since we won't need fancy GUI, etc.
 
mkdir debianKafkaClusterNode1
cd debianKafkaClusterNode1
vagrant init debianKafkaClusterNode1 http://puppet-vagrant-boxes.puppetlabs.com/debian-70rc1-x64-vbox4210.box 
 
After this we are going to have Vagrantfile that can be used to configure basic VM configurations like memory, network configuration, etc. That being said, let's bump up our memory to 2GB. Find and upcomment following lines and set memory to 2048 or however much you want to give to the box. In this tutorial, we are going to set up cluster of 3 machines, so we are going to need 2x3=6GB of memory only for VM, don't forget about your own host system ;)

  config.vm.provider "virtualbox" do |vb|
  #   # Don't boot with headless mode
  #   vb.gui = true
  #
  #   # Use VBoxManage to customize the VM. For example to change memory:
     vb.customize ["modifyvm", :id, "--memory", "2048"]
  end

Since we want for this VM to be able to communicate with other boxes and it might be a good idea to SSH to it from our host system, let's find config.vm.network in the Vagrantfile and uncomment it out. Here we are going to assign it IP: 192.168.33.10

  # Create a private network, which allows host-only access to the machine
  # using a specific IP.
  config.vm.network "private_network", ip: "192.168.33.10"

Now we are ready to start the box!
vagrant up
 
The very first time it might take sometime since Vagrant will attempt to download
debian-70rc1-x64-vbox4210.box from http://puppet-vagrant-boxes.puppetlabs.com/.

Now log into the box
vagrant ssh

And install java and text editor
sudo apt-get update
sudo apt-get install openjdk-8-jdk
sudo apt-get install vim
 
Now we are going to install Apache Kafka by downloading its source code and building it.
sudo su -
wget https://archive.apache.org/dist/kafka/kafka-0.8.0-beta1-src.tgz
mkdir /opt/kafka
tar -zxvf kafka-0.8.0-beta1-src.tgz
cd kafka-0.8.0-beta1-src
./sbt update
./sbt package
./sbt assembly-package-dependency
cd ../
mv kafka-0.8.0-beta1-src /opt/kafka

Install Zookeeper
wget http://apache.claz.org/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
mkdir /opt/zookeeper
tar -zxvf zookeeper-3.4.6.tar.gz --directory /opt/zookeeper
cp /opt/zookeeper/zookeeper-3.4.6/conf/zoo_sample.cfg /opt/zookeeper/zookeeper-3.4.6/conf/zoo.cfg

Configure Zookeeper by creating a directory for zookeeper data
mkdir -p /var/zookeeper/data

and specifying this directory in /opt/zookeeper/zookeeper-3.4.6/conf/zoo.cfg file
by editing dataDir property like so
dataDir=/var/zookeeper/data

In the same file add if not already there value for first zookeeper server. Specify IP of our newly created machine.
server.1=192.168.33.10:2888:3888

Specify myid so that the server would be able to identify itself in the zookeeper cluster
echo "1" > /var/zookeeper/data/myid

Configure Kafka
Edit server.properties files in /opt/kafka/kafka-0.8.0-beta1-src/config
Set following values by finding them in the above mentioned file and making sure they are uncommented as well
broker.id=1
host.name=192.168.33.10
zookeeper.connect=192.168.33.10:2181

It would be much easier to add Zookeeper and Kafka location to the PATH so we don't have to refer to it by the entire path, plus if you later decide to move them you won't have to change hardcoded path. To add them to your environmental variables create or edit if already exists .bash_profile file like so
vim ~/.bash_profile

add following entries to it
export ZK_HOME=/opt/zookeeper/zookeeper-3.4.6/
export KAFKA_HOME=/opt/kafka/kafka-0.8.0-beta1-src/
export PATH=$ZK_HOME/bin:$KAFKA_HOME/bin:$PATH

Make sure to close and open new terminal window for changes to take effect!

Start Zookeeper
sudo $ZK_HOME/bin/zkServer.sh start

Start Kafka
sudo $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties &

Test Kafka by listing available topics. At first, it should not have any:
$KAFKA_HOME/bin/kafka-list-topic.sh --zookeeper 192.168.33.10:2181

Test Kafka by creating a new topic
$KAFKA_HOME/bin/kafka-create-topic.sh --zookeeper 192.168.33.10:2181 --replica 1 --partition 1 --topic test-topic

Re-run command to list available topics. You should see test-topic in the list

Test Kafka by producing massages tot hat topic from the console. Use ctrl+c when finished.
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list 192.168.33.10:9092 --topic test-topic
This
is
Kafka
Test

Test Kafka consumer to verify that messages are there for that topic.
$KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper 192.168.33.10:2181 --topic test-topic --from-beginning

You should see
This
is
Kafka
Test

If you followed above test steps and everything worked as expected, it is time to package this box so that you can re-use it for our future boxes within cluster. On your host machine execute
VBoxManage list vms

You should see something like this:
"debian-cluster-node-1_default_1409266303013_22617" {3d996de2-94e1-4d72-be8f-29f36150ac84}

Use this name to package it up into a box.
vagrant package --base debian-cluster-node-1_default_1409266303013_22617 --output debian-kafka-cluster.box

Once Vagrant is done you should see debian-kafka-cluster.box in the same directory. Go ahead and shutdown the VM.
vagrant halt

In summary, we created a VM, installed all required software on it (java, zookeeper, kafka) and configured it to work within a cluster. Next we would have to duplicate our box into several machines so that we can have a Kafka cluster.

Friday, September 5, 2014

How to kill MR job

Recently, I needed to kill a job that I executed by mistake. What did I do?


If you are comfortable around Linux, steps are pretty much the same as killing any job. Find the job's pid and kill it... Let's see how you would do it in Hadoop environment:


First of all, I listed current running jobs in Hadoop by executing following command in the shell:


$ bin/hadoop job –list


The output would look something like this:


1 jobs currently running
JobId                  State          StartTime       UserName
job_201203293423_0001   1             1334506474312   krinkera


JobId is what we want.


Now use following command to kill it. Remember to substitute jobId with what you found in previous step.


$ bin/hadoop job -kill <jobId>


You should see confirmation that your job was indeed killed
Killed job job_201204011859_0002


Please note that it might take some time for UI to refresh with the new job status.