Friday, August 29, 2014

Apache Kafka Introduction : Should I use Kafka as a message broker?

Found this very good explanation of Kafka (see reference below). Thought I would repost it in case something were to happen to the original blog. Enjoy!
-----------------------------------------------------------------------------------------------------------------------------------

Asynchronous messaging is an important component of any distributed application. Producers and consumers of messages are decoupled: producers send messages to a queue or topic, and consumers read messages from that queue or topic. The consumers do not have to be running when the message is sent, and new consumers can be added on the fly. For Java programmers, JMS was and is the popular API for programming messaging applications. ActiveMQ, RabbitMQ, and MQSeries (henceforth referred to as traditional brokers) are some of the popular, widely used message brokers. While these brokers are very popular, they do have some limitations when it comes to internet-scale applications. Generally their throughput maxes out at a few tens of thousands of messages per second. Also, in many cases, the broker is a single point of failure.

A message broker is a little bit like a database. It takes a message from a producer and stores it; later, a consumer reads the message. The concepts involved in scaling a message broker are the same as those involved in scaling databases. Databases are scaled by partitioning the data storage, and we have seen that applied in Hadoop, HBase, Cassandra and many other popular open source projects. Replication adds redundancy and failure tolerance.

A common use case in internet companies is that log messages from thousands of servers need to be sent to other servers that do number crunching and analytics. The rate at which messages are produced and consumed is several thousand per second, much higher than in a typical enterprise application. This requires message brokers that can handle internet-scale traffic.

Apache Kafka is an open source message broker that claims to support internet-scale traffic. Some key highlights of Kafka are
  • Message broker is a cluster of brokers. So there is partitioning and no single point of failure.
  • Producers send messages to Topics.
  • Messages in a Topic are partitioned among brokers so that you are not limited by machine size.
    • For each topic partition, one broker is the leader
    • The leader handles reads and writes
    • Followers replicate the leader
  • For redundancy, partitions can be replicated.
  • A topic is like a log file with new messages appended to the end.
  • Messages are deleted after a configurable period of time, unlike other messaging systems where a message is deleted after it is consumed. A consumer can re-consume messages if necessary.
  • Each consumer maintains the position in the log file where it last read.
  • Point-to-point messaging is implemented using consumer groups. A consumer group is a set of consumers with the same group id. Within a group, each message is delivered to only one member of the group.
  • Every message is delivered at least once to every consumer group. You can get publish subscribe using multiple consumer groups.
  • Ordering of messages is preserved per partition. A partition is assigned to one consumer within a consumer group. If you have the same number of partitions and consumers in a group, then each consumer is assigned one partition and will get messages from that partition in order.
  • Message delivery: For a producer, once a message is committed, it will be available as long as at least one replica is available. For the consumer, by default, Kafka provides at-least-once delivery, which means that in case of a crash, a message could be delivered multiple times. However, with each read, Kafka returns the offset in the log file. The offset can be stored with the message consumed, and in the event of a consumer crash, the consumer that takes over can start reading from the stored offset. For both producer and consumer, acknowledgement from the broker is configurable.
  • Kafka uses zookeeper to store metadata.
  • The producer API is easy to use. There are 2 consumer APIs.
  • The high level API is the simple API to use when you don't want to manage the read offset within the topic. ConsumerConnector is the consumer class in this API, and it stores offsets in zookeeper.
  • What they call the Simple API is the hard-to-use API, meant for when you want low level control of read offsets.
  • Relies on filesystem for storage and caching. Caching is file system page cache.
  • O(1) reads and writes, since messages are written to the end of the log and read sequentially. Reads and writes are batched for further efficiency.
  • Developed in the Scala programming language
Apache Kafka can be downloaded at http://kafka.apache.org/downloads.html.
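
The log, partition, offset and consumer group ideas from the list above can be modeled in a few lines of Python. This is only a toy in-memory sketch to illustrate the concepts; it is not the Kafka API, and the class names (Topic, ConsumerGroup) and the byte-sum partitioner are assumptions made up for this example.

```python
from collections import defaultdict


class Topic:
    """Toy in-memory model of a partitioned, append-only topic (not the Kafka API)."""

    def __init__(self, num_partitions):
        self.partitions = [[] for _ in range(num_partitions)]

    def send(self, key, value):
        # Messages with the same key always land in the same partition,
        # so their relative order is preserved.
        index = sum(key.encode("utf-8")) % len(self.partitions)
        self.partitions[index].append(value)
        return index, len(self.partitions[index]) - 1  # (partition, offset)


class ConsumerGroup:
    """Tracks one read offset per partition for a group of consumers."""

    def __init__(self, topic):
        self.topic = topic
        self.offsets = defaultdict(int)

    def poll(self, partition):
        # Reading never deletes anything; it only advances this group's offset.
        log = self.topic.partitions[partition]
        start = self.offsets[partition]
        self.offsets[partition] = len(log)
        return log[start:]

    def seek(self, partition, offset):
        # Rewinding the offset lets the group re-consume old messages.
        self.offsets[partition] = offset


topic = Topic(num_partitions=2)
part_a, _ = topic.send("server-a", "a0")
part_b, _ = topic.send("server-b", "b0")
topic.send("server-a", "a1")
topic.send("server-b", "b1")

analytics = ConsumerGroup(topic)
audit = ConsumerGroup(topic)
first_read = analytics.poll(part_a)   # messages from server-a, in order
audit_read = audit.poll(part_a)       # a second group gets its own copy
analytics.seek(part_a, 0)
replayed = analytics.poll(part_a)     # the same messages again
```

Because each group keeps its own offsets, adding a second group gives publish-subscribe behaviour, while consumers inside one group would share the partitions between them.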

They have a good starter tutorial at http://kafka.apache.org/documentation.html#quickstart. So I will not repeat it. I will however write a future tutorial for JAVA producers and consumers.

Apache Kafka is a suitable choice for a messaging engine when
  • You have a very high volume of messages - several billion per day
  • You need high throughput
  • You need the broker to be highly available
  • You need cross data center replication
  • Your messages are logs from web servers
  • Some loss of messages is tolerable
Some concerns that you need to be aware of are
  • Compared to JMS, the APIs are low level and hard to use
  • APIs are not well documented. Documentation does not have javadocs
  • APIs are changing and the product is evolving
  • Default delivery is at least once delivery. Once and only once delivery requires additional work for the application developer
  • Application developer needs to understand lower level storage details like partitions and consumer read offsets within the partition
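
To give an idea of the "additional work" needed on top of at-least-once delivery, here is a minimal Python sketch of the approach mentioned earlier: store the offset together with the processed result and skip anything that is redelivered. The OffsetTrackingStore class is hypothetical and keeps everything in memory; a real application would persist the result and the offset in one atomic transaction.

```python
class OffsetTrackingStore:
    """Hypothetical store that keeps results and the last applied offset together."""

    def __init__(self):
        self.total = 0
        self.last_offset = -1

    def apply(self, offset, amount):
        # Skip anything at or below the stored offset: it was already applied
        # before the crash and is being redelivered.
        if offset <= self.last_offset:
            return False
        # In a real system these two updates would be one atomic transaction.
        self.total += amount
        self.last_offset = offset
        return True


store = OffsetTrackingStore()
deliveries = [(0, 10), (1, 20), (1, 20), (2, 5)]  # offset 1 is delivered twice
applied = [store.apply(offset, amount) for offset, amount in deliveries]
```

The duplicate delivery of offset 1 is ignored, so the total reflects each message exactly once even though the broker delivered one of them twice.
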
It is useful to remember history and draw an analogy with NoSQL databases. 3 or 4 years ago NoSQL databases were hot and people wanted to use them everywhere. Today we know that traditional RDBMSs are not going anywhere and that NoSQL databases are suitable for some specialized use cases. In fact, NoSQL databases are moving in the direction of adding features that are available in RDBMSs. Kafka today is where NoSQL databases were a few years ago. Don't throw away your traditional message broker yet. While Kafka will be great for the cases mentioned above, a lot of the simpler messaging use cases can be handled much more easily with a traditional message broker.

Resource: http://khangaonkar.blogspot.com/2014/04/apache-kafka-introduction-should-i-use.html

Thursday, August 28, 2014

Getting Twitter data using Python.

A lot of times developers need sample Twitter test data for their apps. Twitter data is used for trending, for analysis of sentiment, etc.

Instead of using some old file with a few tweets or registering for some service to give you the data, why not get the data yourself? It is very easy with a Python script that uses Tweepy, a Python library that supports the Twitter API.

First of all, install Python. I am on Mac OS, so Python was already installed for me. Please refer to this guide, which goes into more detail about the setup.

Next you need an editor or IDE to write your Python script(s). Personally, I use TextWrangler, as suggested by Dr. Chuck.

Install pip, then use it to install Tweepy
  $ brew search pip
  $ sudo easy_install pip
  $ sudo pip install tweepy

Register with Dev Twitter to get your tokens, etc.
Go to https://dev.twitter.com/ and sign in to Twitter (create an account if you don't already have one)
    Click the profile icon (top left) -> My Applications -> Create New App
    Provide the necessary data and it will create an application.
    Go to the application -> click on API Keys tab
    This will show you the necessary keys to authenticate your application using OAuth.

Now you are ready to write a script that queries for the phrase "Big Data" and stores the first 100 results in a CSV file along with the date of each tweet.

#!/usr/bin/python
import tweepy
import csv #Import csv
auth = tweepy.auth.OAuthHandler('XXX', 'XXX')
auth.set_access_token('XX-XXX', 'XXX')

api = tweepy.API(auth)

query = 'Big Data'
max_tweets = 100
# Query for 100 tweets that contain "Big Data" and store them in a list
searched_tweets = [status for status in tweepy.Cursor(api.search, q=query).items(max_tweets)]
# Print entire object
print searched_tweets

# Open/Create a file to append data
csvFile = open('result.csv', 'a')
#Use csv Writer
csvWriter = csv.writer(csvFile)
counter = 0

for tweet in tweepy.Cursor(api.search,
                    q=query,
                    lang="en").items(max_tweets):
    # Write a row to the csv file. Use utf-8 since tweets might have special characters
    csvWriter.writerow([tweet.created_at, tweet.text.encode('utf-8')])
    print tweet.created_at, tweet.text
csvFile.close()

You can examine the entire tweet object that is returned and pull more data if you like by iterating through searched_tweets and reading each element's fields.
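
As a rough sketch of pulling more fields out of each element, the helper below flattens one status object into a row. It is written in Python 3 syntax, the function name tweet_to_row is made up for this example, and it assumes the objects expose created_at, text, retweet_count and user.screen_name, which is how Tweepy status objects looked for me; check the object you actually get back. A stand-in object is used so the sketch runs without Twitter credentials.

```python
from types import SimpleNamespace


def tweet_to_row(tweet):
    """Pick a few fields from a status object and return them as a flat list."""
    return [
        tweet.created_at,
        tweet.user.screen_name,
        tweet.retweet_count,
        tweet.text,
    ]


# Stand-in for a real status object (assumed attribute names, fake data).
sample = SimpleNamespace(
    created_at="2014-08-28 10:00:00",
    text="Big Data is everywhere",
    retweet_count=3,
    user=SimpleNamespace(screen_name="example_user"),
)
row = tweet_to_row(sample)
```

With real data you would build the rows with something like [tweet_to_row(t) for t in searched_tweets] and pass each one to csvWriter.writerow.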

Please refer to this blog post if you want to see a Java version of the same concept.

References:
http://www.pythonlearn.com/install.php
http://sachithdhanushka.blogspot.com/2014/02/mining-twitter-data-using-python.html
http://stackoverflow.com/questions/22469713/managing-tweepy-api-search

Thursday, August 21, 2014

Temporarily Disable Puppet

Every now and then you need to disable Puppet on a box to quickly test something without Puppet overwriting your changes.


Here is what you do:


# puppet agent --disable "Reason why disabled"


When you are done, make sure to re-enable it


# puppet agent --enable
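
If you script this, it is easy to forget the re-enable step when something fails halfway. Below is a small Python sketch that wraps the two commands above in a context manager so the agent is always re-enabled. The function name puppet_disabled and the run parameter are made up for this example, and really running it assumes puppet is on the PATH and that you are root.

```python
import subprocess
from contextlib import contextmanager


@contextmanager
def puppet_disabled(reason, run=subprocess.check_call):
    """Disable the Puppet agent for the duration of a with-block."""
    run(["puppet", "agent", "--disable", reason])
    try:
        yield
    finally:
        # Runs even if the block raises, so the agent is not left disabled.
        run(["puppet", "agent", "--enable"])


# Dry run with a recording function instead of really calling puppet.
calls = []
with puppet_disabled("testing a config change", run=calls.append):
    pass  # make and test your manual changes here
```

Dropping the run=calls.append argument makes the same block execute the real puppet commands.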

Saturday, August 16, 2014

How to create SolrCloud Instance.

In this post, I will show how to quickly get a SolrCloud instance up and running on your local box.

First of all, make sure that you have Java 6+ installed on your system.
$ java -version
java version "1.7.0_25"
Java(TM) SE Runtime Environment (build 1.7.0_25-b15)
Java HotSpot(TM) 64-Bit Server VM (build 23.25-b01, mixed mode)

Download the most recent version of Solr from http://lucene.apache.org/solr/. If you are on Unix, Linux or Mac OS, grab the tgz file. Download it to a location where you have permission to untar and run it. I downloaded and decompressed it under my home directory in /opt/solr by running:

$ tar -xzf solr-<version>.tgz

In this example, I am going to use the default Jetty server, which has been proven to scale even on large production systems, since I don't want to complicate this example further. If you were to google "solr jetty vs tomcat", you would see tons of opinions on which one to use. In my opinion, use whatever makes more sense in your case.

To run a very basic Solr instance, all you have to do is navigate to the example folder and run start.jar like so

$ cd opt/solr/example/
$ java -jar start.jar

However, we are interested in a SolrCloud instance in this case, so let's create two shards with a replication factor of 2. For a detailed discussion of shards, replication and SolrCloud in general, please visit the SolrCloud wiki.

Copy the example directory and name it node1. After that, give the collection a more useful name, like wikipedia. Remove any data that might be there, and do some autodiscovery magic.

$ cp -r example/ node1/
$ cd node1
$ cp -r solr/collection1/ solr/wikipedia
$ rm -rf solr/wikipedia/data/
$ find . -name "core.properties" -type f -exec rm {} \;
$ echo "name=wikipedia" > solr/wikipedia/core.properties

Now you are ready to start the first node in your SolrCloud
$ java -Dcollection.configName=wikipedia -DzkRun -DnumShards=2 -Dbootstrap_confdir=./solr/wikipedia/conf/ -jar start.jar 



Now let's create 3 more nodes and start them to complete our SolrCloud Instance

$ cp -r node1/ node2/
$ cd node2/
$ rm -rf solr/wikipedia/conf/
$ java -DzkHost=localhost:9983 -Djetty.port=8984 -jar start.jar 

$ cp -r node1/ node3/
$ cd node3/
$ rm -rf solr/wikipedia/conf/
$ java -DzkHost=localhost:9983 -Djetty.port=8985 -jar start.jar 

$ cp -r node1/ node4/
$ cd node4/
$ rm -rf solr/wikipedia/conf/
$ java -DzkHost=localhost:9983 -Djetty.port=8986 -jar start.jar 
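
The three start commands differ only in the Jetty port, so if you want more nodes you can generate them. Here is a small Python sketch that builds the command line for node N; the function name node_start_command is made up for this example, and the base port of 8983 for the first node is an assumption (the Solr example default), with the ZooKeeper address taken from the commands above.

```python
def node_start_command(node_number, zk_host="localhost:9983", base_port=8983):
    """Build the java command line for SolrCloud node 2, 3, 4, ..."""
    port = base_port + node_number - 1  # node2 -> 8984, node3 -> 8985, ...
    return [
        "java",
        "-DzkHost=%s" % zk_host,
        "-Djetty.port=%d" % port,
        "-jar",
        "start.jar",
    ]


commands = {n: node_start_command(n) for n in (2, 3, 4)}
```

Each list can be handed to subprocess.Popen with cwd set to the matching node directory, for example subprocess.Popen(commands[2], cwd="node2").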


Since all of our configuration files are now managed by ZooKeeper, we need to download them, modify them, and upload them back. Yes, I know it is a pain, but this way you don't have to do it manually on each server: do it once, upload it to ZooKeeper, and it takes care of the rest for you!

Navigate to the /opt/solr/node1/scripts/cloud-scripts directory and run the following command

$ ./zkcli.sh -zkhost localhost:9983 -cmd downconfig -confdir /<directory_of_your_choice>/solr_conf -confname wikipedia

Navigate to the directory and you should see all of the configuration files.
$ ls
_schema_analysis_stopwords_english.json elevate.xml solrconfig.xml
_schema_analysis_synonyms_english.json lang spellings.txt
admin-extra.html mapping-FoldToASCII.txt stopwords.txt
admin-extra.menu-bottom.html mapping-ISOLatin1Accent.txt synonyms.txt
admin-extra.menu-top.html protwords.txt update-script.js
clustering schema.xml velocity
currency.xml scripts.conf xslt

Make changes (usually to schema.xml and solrconfig.xml) and upload them back by running a similar command.

$ ./zkcli.sh -zkhost localhost:9983 -cmd upconfig -confdir /<directory_of_your_choice>/solr_conf -confname wikipedia
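
The download and upload commands are identical apart from the -cmd value, so the round trip is easy to script. The Python sketch below only builds the argument lists, using the same flags as the commands above; the function name zkcli_command is made up for this example and /tmp/solr_conf is a placeholder for the directory of your choice.

```python
def zkcli_command(cmd, confdir, confname, zkhost="localhost:9983"):
    """Build a zkcli.sh invocation for downconfig or upconfig."""
    if cmd not in ("downconfig", "upconfig"):
        raise ValueError("unsupported command: %s" % cmd)
    return [
        "./zkcli.sh",
        "-zkhost", zkhost,
        "-cmd", cmd,
        "-confdir", confdir,
        "-confname", confname,
    ]


# /tmp/solr_conf is a placeholder path, not something Solr creates for you.
download = zkcli_command("downconfig", "/tmp/solr_conf", "wikipedia")
upload = zkcli_command("upconfig", "/tmp/solr_conf", "wikipedia")
```

To actually run them, pass each list to subprocess.check_call with cwd pointing at the cloud-scripts directory.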

That's it!

Wednesday, August 13, 2014

Accumulo: How to create a new table and set permissions

Creating a new table in Accumulo is pretty easy. It is as simple as


createtable my_new_cool_table


Now let's say that you created this table as root. How can you check whether another user will be able to read or write to this table? Say you have a user bob: how can you check what this user sees or can do?


Run the following command
userpermissions -u bob


You should see a list of tables and the user's current permissions on each table


userpermissions -u bob
System permissions: System.CREATE_TABLE, System.DROP_TABLE, System.SYSTEM
Table permissions (!METADATA): Table.READ
Table permissions (META): Table.READ, Table.WRITE


The new table is not in the list, since user bob can't do anything with a table that was created by root. Let's change that! In order for user bob to be able to read from the new table, execute this command


grant Table.READ -t my_new_cool_table -u bob


If you were to re-execute the userpermissions command, you would see


userpermissions -u bob
System permissions: System.CREATE_TABLE, System.DROP_TABLE, System.SYSTEM
Table permissions (!METADATA): Table.READ
Table permissions (META): Table.READ, Table.WRITE
Table permissions (my_new_cool_table): Table.READ
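
If you need to check permissions for many users or tables, reading this output by eye gets tedious. Here is a small Python sketch that parses text in the format shown above into a dictionary. The function name parse_userpermissions is made up for this example, and it assumes the output looks exactly like the sample in this post; other Accumulo versions may format it differently.

```python
import re


def parse_userpermissions(output):
    """Turn `userpermissions` shell output into a dict of permission lists."""
    permissions = {"system": [], "tables": {}}
    for line in output.splitlines():
        line = line.strip()
        table_match = re.match(r"Table permissions \((.+?)\):\s*(.*)", line)
        if line.startswith("System permissions:"):
            names = line.split(":", 1)[1]
            permissions["system"] = [p.strip() for p in names.split(",") if p.strip()]
        elif table_match:
            table, names = table_match.groups()
            permissions["tables"][table] = [
                p.strip() for p in names.split(",") if p.strip()
            ]
    return permissions


sample_output = """\
System permissions: System.CREATE_TABLE, System.DROP_TABLE, System.SYSTEM
Table permissions (!METADATA): Table.READ
Table permissions (META): Table.READ, Table.WRITE
Table permissions (my_new_cool_table): Table.READ
"""
parsed = parse_userpermissions(sample_output)
can_read = "Table.READ" in parsed["tables"].get("my_new_cool_table", [])
```

A table that is missing from the parsed result means the user has no permissions on it at all, which is the situation bob was in before the grant.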


Full list of table permissions:
Table.ALTER_TABLE  
Table.BULK_IMPORT  
Table.DROP_TABLE   
Table.GRANT        
Table.READ         
Table.WRITE