Saturday, April 8, 2017

Distributed Lock using MongoDB

Distributed system needs synchronization many times. If the synchronizing processes are running within the same machine or container, then know problem; we have plenty of choices such as operating system provided semaphores, file locks etc. Synchronizing within the same process (across multiple threads, green threads etc.) is also fine, we have thread level locks, atomic integers, semaphores etc. Of course, we need to be very careful while we synchronization, there may deadlocks occasionally which will require us hours of debugging to uncover the root cause.

When we need to synchronize the activity of processes running across machines, then we will need distributed lock and there are systems suitable for that. Zookeeper, Etcd., Hazelcast, Redis etc. may be used for distributed locking.

In this post I will explain how we may use MongoDB for distributed locking. The locks described are exclusive locks(i.e. they are not read locks, implementing read locks using MongoDB will need some more thoughts :))

Now let us discuss how we can achieve exclusive locks using MongoDB. We will use below MongoDB document to describe a lock.
     _id :  "lock_id",
    acquirer: "acquirer_unique_id",
    updated: "timestamp"


There are two operations, lock and release lock. Also, the lock should be automatically released if the locking process is not alive, or unable connect to MongoDB for some interval.

Lock operation:
Lock("a_new_lock") ->  insert a record with _id = "a_new_lock" and acquirer="unique id for the acquirer", and expire after = current-time-stamp

The process will run a thread to update the expireafter time to current timestamp periodically.

UnLock operation:
UnLock("a_lock") -> delete the record with _id = "a_lock" and if the record's acquirer_id is same as the caller's acquirer id. Matching the acquirer_id is important, without that the process may end up releasing  a lock acquired by some other process.

Things needed to be done for MongoLock operations:
A mongo collection, let us name the collection "locks" in database distlock.
A ttl index in the collection, which will ensure that the locks eventually get released if the lock owner process goes down without releasing the lock.

Let us create a ttl index on the table locks:
db.locks.createIndex( { "updated": 1 }, { expireAfterSeconds: 600 } )

Let us create a lock with name "newlock":
db.locks.insert({_id : "newlock", aqcuirer : "myid", updated : new Date()})

Now if another process tries to get the "newlock", it will fail:

db.locks.insert({_id : "newlock", aqcuirer : "myid", updated : new Date()})

"nInserted" : 0,
"writeError" : {
"code" : 11000,
"errmsg" : "E11000 duplicate key error collection: distlock.locks index: _id_ dup key: { : \"newlock\" }"

Releasing the lock is a little tricky. We shouldn't delete the lock by its id only, we should delete the lock if the lock id and acquirer id both match. Otherwise, there may be a chance that a process will end up releasing a lock acquired by another process.
Lets release the lock "newlock":

db.locks.remove({_id : "newlock", aqcuirer : "myid"})

WriteResult({ "nRemoved" : 1 })

What happens if the process dies without releasing the lock? Well, the lock will be automatically released by MongoDB after 10 minutes as the locks collection has a ttl index on it with expireAfterSeconds set to 600 seconds. While the lock acquirer is running, it should update the "updated" field every 10 seconds or so, because some long running operation may continue for more than 10 minutes (after acquiring the lock).

The above simple operations demonstrate how we may implement simple distributed exclusive lock using MongoDB. Hope it was helpful :)

The prototype implementation is available here:

Friday, October 7, 2016

A high performance Hyperloglog server

Hyperloglog is an algorithm to get the approximate distinct element count of a multiset. Calculating the exact value for distinct count requires lot of memory and may not be practical for all situations. Generally, the memory required is proportional to the cardinality of the multiset.

The basis of the algorithm is:
Cardinality of a multiset of uniformly distributed random numbers can be calculated by finding the maximum number of leading zeroes in the binary representations of those numbers. If the maximum number of leading zeroes  observed is n, then  is the cardinality of the numbers. It comes from the fact that, probability of encountering a 0 in the first bit is 1/2, probability of encountering 2 consecutive zero bits is 1/ 2 * 1/2 = 1/4, probability of 3 consecutive zero bit is 1/8,.... and so probability of encountering n leading zero is . That means if we encountered n leading zeros in the randomly distributed numbers, then it is highly probable that there are around  distinct numbers in the multiset.

Generally, we apply a hash function on the original elements of the multiset and calculate the maximum number of leading zeroes from the hash values. For example, to find the distinct count of the documents in a large collection of documents, we may apply a hash function on the title (or on the content)  of the documents and compute the leading zeroes of the hash values returned by the hash function.

Simply using the above approach sometimes may give very incorrect results when the multiset has few distinct elements. So, the algorithm uses different approaches when the number of distinct elements are less. Secondly, it splits the input multiset into multiple subsets and calculate maximum leading zeroes in each subset and applies a harmonic mean on the value calculated for each subset to obtain the final result.

I have implemented a Hyperloglog server in Go. It exposes its functionalities over HTTP and Thrift protocol and so we may access the functionalities from almost all the programming languages.
For detailed installation and API information, please check this GITHUB LINK.  It provides persistence (optional) and hence the data can be maintained across restarts. Also, it works blazingly fast and can take advantages of multiple processors of the running machine.

Installing the server:
$ go get

Running the sever:
$hllserverd -db /tmp -persist

Detailed usage of hllserverd:

$ hllserverd --help
Usage of hllserverd:
  -db string
        directory for hyperlog db (default "/tmp")
  -dbfile string
        hyperlogdb file (default "hyperlogs.db")
  -http string
        give http lister_address (default ":55123")
  -logbackup int
        maximum backup for logs (default 10)
  -logfile string
        log file path (default "/tmp/hllogs.log")
  -logfilesize int
        log rollover size (default 2048000)
  -loglevel string
        logging level (default "INFO")
        should persist the hyperlogs in db?
  -thrift string
        thrift rpc address (default "")

Below is a simple python program ( demonstrating the use of various hyperloglog server APIs over HTTP:

import httplib
import json
import urllib
import sys
from base64 import b64encode
from uuid import uuid1

hlld_http_address = ''
if len(sys.argv) > 1:
    hlld_http_address = sys.argv[1]

def check_response(resp):
    if resp.status != 200:
        print 'Failed'
        resp_data =
        resp_json = json.loads(resp_data)
        if resp_json['status'] != 'success':
            print 'Failed to add logkey'
        return resp_json
    return None

print 'Adding logkey {} with expiry {}'.format('visitors', 86400)
client = httplib.HTTPConnection(hlld_http_address)
client.request('GET', '/addlogkey?logkey=visitors&expiry=86400')
resp = client.getresponse()

print 'Successfully added hyperlog key {}'.format('visitors')

print 'Updating logkey visitors'.format('visitors')
i = 0
while i < 100:
    j = 0
    params = {'logkey': 'visitors', 'values': []}
    while j < 100:
        j += 1
    client = httplib.HTTPConnection(hlld_http_address)
    client.request("POST", "/updatelog", json.dumps(params))
    resp = client.getresponse()
    i += 1
print 'Updated logkey {} successfully'.format('visitors')

print 'Getting cardinality of the logkey {}'.format('visitors')
client = httplib.HTTPConnection(hlld_http_address)
url = '/cardinality?logkey={}'.format('visitors')
client.request('GET', url)
resp = client.getresponse()
resp = check_response(resp)
print 'Cardinality for logkey {}: {}'.format('visitors', resp['cardinality'])

print 'Updating expiry of logkey {}'.format('visitors')
client = httplib.HTTPConnection(hlld_http_address)
url = '/updexpiry?logkey={}&expiry={}'.format('visitors', 10)
client.request('GET', url)
resp = client.getresponse()
resp = check_response(resp)
print 'Successfully updated expiry of logkey {}'.format('visitors')

print 'Deleting logkey {}'.format('visitors')
client = httplib.HTTPConnection(hlld_http_address)
url = '/dellogkey?logkey={}'.format('visitors')
client.request('GET', url)
resp = client.getresponse()
resp = check_response(resp)
print 'Successfully deleted logkey {}'.format('visitors')

Output of a sample run of the above script:

$ python
Adding logkey visitors with expiry 86400
Successfully added hyperlog key visitors
Updating logkey visitors
Updated logkey visitors successfully
Getting cardinality of the logkey visitors
Cardinality for logkey visitors: 9540
Updating expiry of logkey visitors
Successfully updated expiry of logkey visitors
Deleting logkey visitors

Successfully deleted logkey visitors

Thursday, September 22, 2016

Using apache Kafka correctly

Apache Kafka is one of the most popular message broker with high performance, scalability and reliability and high availability. It provides replication which is the most important feature as replication is a much for making the service highly available.
It is open source and hence it is free!  We get enough examples in the web to write and execute our POCs successfully.  But our proof of concepts are just "hello world"s and these are not enough to know what problems may happen in productions.

I have seen terrible misuse of Kafka happening in production and I was really amazed that Kafka was mostly able to handle even that kind of assault!!!

Now few lines about Kafka:
Apache Kafka is a publish-subscribe messaging system with partitioning, replication and persistence. The three components of the messaging system are brokers, producers, consumers. Kafka also needs Zookeeper to maintain cluster state, discovering new brokers etc. and hence Zookeeper is an essential component for Kafka deployment. Below diagram explains a typical Kafka installation along with producers and consumers.

 Producers sends messages to brokers and consumers pulls messages from the brokers. In Kafka, topics are partitioned and partitions may be replicated depending on the configurations. Each partition actually can be visualized as an independent FIFO queue. Partitions can be written in parallel and consumed in parallel. Basically, consumers consume the messages in sequence from a partition, but there is no need to maintain any order for consuming messages across partitions.

Now let us examine some innocent looking code which may tax the Kafka cluster like anything, but doesn't looking problematic at all.
Below is a program to write messages to Kafka (it is using kafka-python library version 1.3.1)

from kafka import KafkaProducer
from kafka import KafkaProducer

def send_message(partion_key, message):
    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    producer.send('topic1', key=partion_key, value=message)

for i in range(10000):
    key = str(i) + '_partition_key'
    msg = 'This is message ' + str(i)
    send_message(key, msg)

The above code snippet is connecting to a Kafka cluster and sends 10000 messages to Kafka. But one terrible thing we are doing here is that for every message we are creating a Kafka producer object and after sending the message to Kafka we are just closing the producer.  The producer object is really heavy. What happens behind is: it first connects to one of the bootstrap servers and gets meta-data about the topic(s) and brokers. The meta data may be few killobytes if there are many partitions for the topics or it is retrieving meta-data for many topics. From the meta data, the producer comes to know which broker is leader for which partition of a topic and many other details.  It is the producers which partitions the messages based on some partition key. So, distributing messages among the partitions is responsibility of the producers and brokers have no role to play here. Before sending the message to a particular broker, the producer will check if there is connection already made to that broker and will reuse an existing connection. If there is no connection to the leader broker for that partition, then the producer will make a connection to that broker and then send the data to broker.
If we repeat the above process for every message sent to Kafka, then there will be huge impact on the Kafka brokers as well as they may be serving metadata for each message and sometimes we will see long GC pause on Kafka brokers.  Another funny thing may be: Kafka brokers will be sending more data (metadata) to the producers, than the producers will be sending to to Kafka brokers  when pro meta-data for the topic-partitions is big and producer is sending small messages as shown in the above example.

I saw this actually happening in production and they were using the producers for long time like that!!!

So, always re-use the Kafka producer, consumer instances. In one process, it is good practice to use just one Kafka producer or consumer object.

Number of partitions for a topic

Topic should be created with enough number of partitions so we don't need to increase the number of partitions for the topic in near future. Suppose we have a topic "logtopic" and we calculated 100 partitions in 5 brokers should be good enough for handling the load. Then create the topic with 200 partitions, or just over commit the number of partitions. But don't start with 20 times (or 4000 partitions in this example) of the required number of partitions.
Each partition of a topic translates to at least two files in Kafka broker directories. With replication-factor > 1, there will be at-least 2 * number-of-partitions files distributed among the brokers. The number of files will increase when rollover happens. A partition files is only deleted when the file older than the time specified in retention-interval. So, there may be many partition files in Kafka brokers and Kafka servers may not be closing the old files when rollover happens. That may result in some errors related to "too many file descriptors open" etc. Though it is easy to increase maximum number of open file descriptors for the process, it is always better to take preventive measure for this problem. One way to achieve that is by creating the topics with judicious numbers of partitions.
More number of partitions also results in bigger size of meta-data and higher number of offset commits etc. If the Kafka consumer offsets are maintained on Zookeeper, then each offset commit translates to a write operation to Zookeeper, which may be problematic when there too many of such writes happening. With latest versions of Kafka, the topic offsets may be maintained in Kafka itself, but then also, each Kafka offset commit is a separate write operations to Kafka.

Partition to tenant mapping??

Sometimes people create topics which is served among multiple tenants. It is bad design to maintain topic-partition to tenant mapping. If we need tenants' data separation even at Kafka level, then we should create different topic for different tenants. Tenants which are paying us more or which generates more data should be allocated topic with more number of partitions. The data in Kafka is data in transit and Kafka is not a database for querying or long term object storage. So, sharing a topic across multiple tenant's should not be an issue. All we need to "tag" each message (from producer) with a field which let us know the actual tenant the message belongs to.

Retention period

Kafka deletes  message volume files when they are older than the period defined in its retention period. While deleting the messages, Kafka doesn't care about if the consumers actually consumed the message or not. So it may happen that some messages get deleted even before they are processed by the consumer. When consumer's processing of message is very slow, then this situation may very likely happen. I have seen this happening in production where millions of messages were lost because the consumers could not process them within the the configured retention period. So, we have to be careful about what values we set for retention period of the topics.

Message size

Kafka brokers has configuration for maximum message size(message.max.bytes) and for replication the maximum size of the data fetch(replica.fetch.max.bytes). We should keep the value for both same or replica.fetch.max.bytes > message.max.bytes. If message.max.bytes > replica.fetch.max.bytes (let us call it m_big) then it may happen that producer publishes a message with size > replica.fetch.max.bytes. This message will fail to get copied to its replica because the broker holding that replica is sending a fetch request(to the the leader for that partition) setting the maximum fetch byte at lower value then the size of m_big.  This failure will repeated continuously and replication for that partition stops completely. Only way to recover from the failure would be to shutdown the follower broker and change the replica.fetch.max.bytes value in the configuration to greater than or equal to that of message.max.bytes.

fetch.message.max.bytes is also defined for the Scala/Java consumer API or similar configs for kafka client library in other languages. We should be careful enough to set the value for this property >= message.max.bytes of Kafka cluster.

Meta data issue

Meta data for the Kafka cluster may be huge if there are many numbers of topic and partition in the cluster. Whoever is writing Kafka producer or client drivers should not that it only fetches the meta data for the required topics as requested by the callers. Required meta data may be fetched every few seconds to keeping the information up-to-date or only when needed, but that should not be very frequent. Caller of the producer or consumer driver should not be explicitly requesting  meta data information as far as possible.


Kafka supports gzip and snappy compression for the messages. This can be set at message level. In general, gzip provides better compression ratio, while snappy provides better speed of compression. If the messages being published to Kafka is bigger than few bytes (> 100 bytes or so) and the message mainly consists of texts, JSON, XMLs etc., then enabling compression on the messages may significantly reduce the disk space consumed by Kafka brokers and it also improves both network and disk IO. Both the producers and consumers though may be using more CPU for compression and decompression of the messages.

Synchronous vs asynchronus Kafka producers

Kafka wire protocol for the producers has the facility to make the producers asynchronous. The requests crafted by the producer has a 32-bit correaltion-id which is sent back by server in the response and can be used for  matching the responses with corresponding requests.  This way a  producer can use just one connection to a server/broker for sending all the producer requests and it doesn't need to wait for the response for a request before it sends another producer request to the server.
In case of synchronous producers, a connection to a server is used exclusively for a request at a time. That means, for every request the producer will take away a connection to a server from a  connection pool, send the request to server over that connection, wait for the response from the server and after the response is received, the connection is released back to the connection pool. So, suppose the producer wants to send 32 requests in parallel to a server, it will need to have 32 connection to the server. Having too many connections, both on client and server is not a very good thing.

Kafka producer library bundled with Kafka (>= 0.8.2 version), is asynchronous and that is really good. Before selecting a Kafka library, we should see if that library really supports an asynchronous producer and we should always prefer asynchronous producers.  

Ack for writes to servers

High availability doesn't mean persistence alone. High availability includes persistence and replication of the data so that if a server goes down, the data still available on another server. Kafka provides high availability by persisting the messages on disk and replicating them. Producers can send the messages with different "ack levels". It may indicate that producer doesn't need any ack from the server, or may want an ack only if the server persists the message to its local log, or when the server finds the message is committed at all in sync replicas, or if the message is replicated to certain number of replicas. Generally, the messages that are not very important or if we may afford to lose few of them should be added to separate topics with low replication factor (e.g. <= 3). For such messages, producer can send the messages which indicates that it will want an ack from server as soon as the message is added to the servers local logs.

For messages, which we cannot afford to lose, we should use topics with higher replication factor (>=3) and also the producer should send the messages with "required acks" which instructs the server to send the response only when the message is replicated to all in sync replicas. This will minimize chances of data loss to a great extent.

Monday, August 1, 2016

A CountdownLatch for C++ multi-threaded programs

Java standard library comes with CountDownLatch which is  synchronization aid which allows one or more threads to wait on until a set of operations being performed by other threads completes.

For  example, let one thread input a string to some queues (one or two queues) and wait for completion of two operations, which are:
  1. Calculating the hashcode of the string
  2. Finding the count of unique chars in the string
The task submitter may wait asynchronously on the countdownlatch for the operations to be completed. The task executor threads will countdown the countdown latch once they complete their operations. Waiters get notified once the count of the countdownlatch reches zero. In this example, the waiter thread will create a countdownlatch object with count 2 and somehow pass the reference of the countdownlatch object to the executor threads (May be putting the string and pointer to the countdownlatch object in the task object).

We may achieve similar functionalities using Posix thread mutexes and condition variables (Threading libraries other than pthread will work as well). But as CountDownLatch is an useful and frequently used patterns, so it will be nice to have a library for the same.
I implemented a CountDownLatch class for C++ in less than 100 lines of code which provides similar functionalities as its Java counterpart. The code is available in Github(countdownlatch). May be useful for you as well :):):)...........

Below is a sample program demonstrating the use of countdownlatch:

#include <unistd.h>
#include <thread>
#include <vector>
#include <iostream>
#include <countdownlatch.hpp>

void fun(clatch::countdownlatch *cl) {
    std::cout << "Wait is over " << std::endl;

int main() {
    auto cl = new clatch::countdownlatch(10);
    int i = 0;
    std::vector<std::thread*> ts;
    while (i++ < 2) {
        std::thread *t  = new std::thread(fun, cl);

    i = 0;
    while (i++ < 10) {
    i = 0;
    while (i < 2) {
    i = 0;
    while (i < 2) {
        delete ts[i++];
    delete cl;
    return 0;


Sunday, July 31, 2016

Cyclicbarrier for C++ multi-threaded programs

Java has very good concurrency support and a very good library providing useful classes for multi-threaded programming. One such class is CyclicBarrier.
From Javadoc, CyclicBarrier is:
A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released. 

C++ standard library lacks this utility. This is not a problem anyway as pthread_barrier routines (pthread_barrier_wait, pthread_barrier_init etc.) exactly do the same and we may always use pthread library for multi-threaded programming in C++ and Pthread library provides much more control on how we use the threads. But C++11 has multi-threading built into the language (and on Linux platform G++ internally uses Pthreads only) and so it would have been nice if a cyclicbarrier like construct was part of C++ library itself.

So, I spent some time building a library that provides silmilar functionality as the Java CyclicBarrier class. which is accessible at github. There is an example program demonstrating how to use it. May be you like to use it.

Wednesday, July 27, 2016

Bitset data structure in Golang

I have created a Library for bitset where each bit can be treated as a boolean value. The library is accessible here. It's a thread-safe library (or rather a goroutine safe library) and can be happily used from multiple goroutines concurrently. It has many features like get, set, xor, end, clone etc. The API documentation for the library is accessible here.

I have created this library as I need similar functionalities in the distributed cache (geetcache) project I am currently working on. It helps me to serialize-deserialize the hyperlogs data types in a compact manner. An example usage is given below:

package main

import (

func main() {
 bs := bitset.NewBitset(8)
 if bs.IsAllZero() {
  fmt.Printf("All bits are set to zero\n")
 fmt.Printf("Number of ones %d\n", bs.GetSetbitCount())
 fmt.Printf("Number of zeros %d\n", bs.GetZerobitCount())
 bs = bitset.NewBitset(10000)
 fmt.Printf("Number of ones %d\n", bs.GetSetbitCount())
 fmt.Printf("Number of zeros %d\n", bs.GetZerobitCount())

A more detailed example is given below:

package main

import (

func main() {
 bs := bitset.NewBitset(80)
 if bs.IsAllZero() {
  fmt.Printf("All bits are set to zero\n")
 fmt.Printf("Number of ones %d\n", bs.GetSetbitCount())
 fmt.Printf("Number of zeros %d\n", bs.GetZerobitCount())
 ret, err := bs.IsSet(9)
 if err != nil || !ret {
  fmt.Printf("Some issue\n")

 bs.SetVal(1, 10, 511)
 recvd, err := bs.GetByte(0)
 if err == nil {
  fmt.Printf("First byte in the underlying array %d\n", recvd)

 // set bytes 6,7,8 to zero and don't touch other bits
 bs.SetVal(6, 8, 0)

 // Clear all the bits, or make them all zero

 // Set all the bits to 1

 // Flip 10th bit
 recvd, _ = bs.GetByte(10)
 fmt.Printf("The 10 th byte %d\n", recvd)

 // Get a copy of underlying bytes of the bitset
 bytes := bs.GetBytes()
 fmt.Printf("The first four bytes in the bitset %v\n", bytes[0:4])

 // Flip all the bits
 bs.FlipRange(1, 639)

 // Set the bits 20,21,....,39,40 to 1
 bs.SetRange(20, 40)

 // Set the bits from 20,21,22,....,79,80 to zero
 bs.ClearRange(20, 80)

 other1 := bitset.NewBitset(4)

 other2 := bitset.NewBitset(10)
 other2.SetRange(40, 60)

 // Get next zero bit position after index 0
 indx, _ := bs.GetNextZeroBit(0)
 fmt.Printf("Index of next zero bit %d\n", indx)

 // Get the zero bit before bit 80
 indx, _ = bs.GetPrevSetBit(80)

 // Get the bits from 10 to 28 packed in an uint32
 retuint32, _ := bs.GetVal(10, 28)
 fmt.Printf("The packed uint32 returned is %d\n", retuint32)

Sunday, May 29, 2016

Things that I miss in Golang

Go is a nice language and I feel it sits somewhere between C and C++. It has good libraries. Very good Concurrency support can make creating highly concurrent application much easier. Goroutines and channels are great and they help in creating simple and efficient programs.

But Go still lacks many facilities that C++, Java, Python provides. Below are the list of features I wish Go would have included:

  1. Default parameters for functions or methods: Go doesn't have this feature. That makes things awkward sometime. Let us say, I am writing a pipeline in Go (trying to emulate Storm in Go). Each stages in the pipeline may emit a tuple calling the method collector.Emit(tuple). Now I wish to add the feature of tracking a tuple tree. I want to signal that tracking is needed. I would have loved to do that using the same method, something like  collector.Emit(tuple, context) where context may be some struct containing the tracking information. Let us say, we could have declared the method as shown below: 

    func (collector *Collector) Emit(t Tuple, context *Context = nil) {

    We can do this in C++ and Python. In Java also method overloading is available. Wish we could do the same in Golang as well. It is not a thing that we cannot work around. But suppose I designed Emit the method without the tracking stuff in mind and later I wanted to add the tracking part; then the default parameters will help as I don't need to change my existing application code that will use the pipeline library.
  2. No great  debugger bundled with the Go distribution I download from Google. Gccgo has integration with GDB, but I don't use gccgo. Having a good debugger is a great help for lesser mortals like us. Java/C++/Python/C doesn't have this problem.
  3. Lack of inheritance: This hampers our ability to re-use code. Most probably because, Go is not a purely object oriented language and it wanted to have minimal support for OO paradigm. 
  4. Type safety is less:  In Go programs, we tend to pass interface{} as parameter too often. But that is like void * in C/C++ or Object in Java. If we had inheritance in Go, and if we could make a base "class" pointer/reference to point/refer a derived class object, things would have been really easier.
  5. No generics:  We end up writing the same code for different types, which could have been avoided if we had generics
  6. No exception handling: I have used Java/C++ a lot. Lack of exception handling means, we have to check the return status of functions everywhere.

Wednesday, March 9, 2016

Design of a Scalable and Reliable pipleline with Storm and Kafka

Apache Storm is a real-time event processing system. We may even call it real-time Hadoop. Unlike Hadoop map-reduce jobs, Storm topologies keep running continuously looking for events to process. A storm topology defines how the events are moved in the execution pipeline which is expressed as directed acyclic graph. A topology has hooks to work on the data. The hooks are called spouts and bolts. Generally, the spout is the event "generator" and the entry point to the topology. Spout may get the events from anything, but it is common to get the events from Kafka, Kestrel, Mongodb etc. as the event source. Hooks are the entry points for code and the routing of tuples (data) is controlled by Storm framework. When a Spout or Bolt emits a tuple, the tuple is sent to next bolt in the pipeline. Each spout or bolts run a set of tasks doing the exactly same work. The tasks may be run in separate threads or a set of threads may be multiplexed among the tasks.

A typical topology looks like as shown below:

The spout is the entry point to the topology. There may be multiple Spouts in a topology and there may be multiple data sources. In this above example, a spout is emitting data to bolt 1 and bolt 2. Bolt 2 is further emitting data to bolt 3. Each of the spouts and bolts may running multiple tasks (may be separate threads) and the tasks may running on same or different JVM or across machines. Storm framework takes care of routing the data (tuples) among the various components.

Storm cluster has three different components. They are Nimbus, Supervisors and UI. A storm cluster has just one Nimbus node and this is a single point of failure till now.  When a topology is submitted, the topology Jar, along with configs are sent to Nimbus (using Thrift protocol). Nimbus decides which Supervisors to run the the topology and the topology Jar is submitted to those Supervisor machines as well (For testing purpose we may run Nimbus, Supervisors and UI all in the same machine.
Each supervisor exposes a few slots. Each slot actually can be occupied by a topology worker, where a worker is a JVM process. When we create the topology, we can specify the number workers the topology should be run with and Storm tries best to provide that number of workers provided that many slots available in the cluster.

Another essential component in the cluster is Zookeeper. Storm uses Zookeeper  for co-ordination of Nimbus, Supervisor and UI components.

Now enough of theories. Let us make a topology and run it in a Storm cluster.

Our data source is Kafka which is reliable and highly available message queue. It is simple and performs really well.

Now let is assume a simple scenario where we collect details of people for doing some analysis. As this is a demo, so I will just put the very basic information about the person: name, age and id. The information is a Json document and a sample is given below:

    "id" : "udhdgcbcdg",
    "name" : "Geet",
    "age" : 8


The document is added to a Mongodb collection person
Now let us create a Kafka topic "demo". I assume you have set up a Zookeeper and Kafka cluster . A Kafka cluster with just one node is enough for demo. Similarly a Zookeeper node is enough for our demo. We don't need high availability to run a demo :)

Cd to Kafka install directory and issue the below command:
$ bin/ --create --topic demo --partitions 16 --replication-factor 1 --zookeeper

If you are running Zookeeper and Kafka locally, use for host and port.
$bin/ --create --topic demo --partitions 16 --replication-factor 1 --zookeeper

 Now a topic "demo" is created with 16 partitions, and no replication for the topic. So, no high-availability!

Let us download Storm 0.9.6. Extract the package. I modified Storm config storm.yaml as shown below:
# Storm.yaml ###########
    - "" ""
    - 6700
    - 6701
    - 6702
    - 6703

Note that I am running Nimbus and Zookeeper on my localhost.

Change directory to storm installation directory (delete the jar log4j-over-slf4j-1.6.6.jar from Storm lib directory, otherwise we may face some conflicts in running our topology):

and issue the below commands:
$ bin/storm nimbus  #Starts nimbus
$ bin/storm supervisor #Starts Supervisor
$ bin/storm ui #Starts the UI

Now we may check the Storm UI by pointing our browser to

The sample code for the topology prototype is here. Please download the code so that we may run the topology.
After you downloaded the code, build the topology jar by issuing the below command:

$cd  datainfralytics/pipeline/proto
$ mvn package

This creates the a jar for our topology (proto-1.0-SNAPSHOT-jar-with-dependencies.jar in directory datainfralytics/pipeline/proto/target).
We submit the jar issuing the below command:
$  $STORM_HOME/bin jar proto-1.0-SNAPSHOT-jar-with-dependencies.jar

STORM_HOME environment variable should be pointing to storm installation directory. file contain the configuration details such as elasticsearch cluster name, mongos host ip etc. A sample is given below:


Now our topology will be running and we can inspect the details from the Storm UI.

The topology is consuming events from Kafka. These events are nothing but JSON documents containing information about persons.  A producer process writes the person details to Kafka and our topology consumes the events from Kafka. A very simple Python producer script is given below:

from time import time, sleep
from random import randint
from json import dumps
from uuid import uuid1
from kafka import KafkaClient, KeyedProducer
kafka_brokers = '' # Comma separated list of brokers
topic = 'demo' 
kclient = KafkaClient(kafka_brokers)
producer = KeyedProducer(kclient)

    i = 1
    namein = 'name_{}'.format(int(time()))
    while i <= 10000:
        name = '{}_{}'.format(namein, i)
        person = {'id' : uuid1().hex , 'name' : name, 'age' : randint(1,100)}
        producer.send(topic, name, dumps(person))
        i += 1
        print i
except Exception as e:
    print e

Run the script and it will put 10000 person documents in Kafka in the topic demo. The topology will almost instantly consume the person JSON documents and put them to MongoDB and index them on Elasticsearch.

Now let us look at the Topology and understand how can it be scalable and highly available. The topology is defined in the following Java class:


import java.util.Properties;
import java.util.Set;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class StartTopology {
 private static final String TOPOLOGY = "DemoTopolgy";

 public static void main(String[] args) {
  if (args.length < 1) {
   System.err.println("Please provide the proprty file name");
  TopologyBuilder tbuilder = new TopologyBuilder();
  tbuilder.setSpout("dataspout", new DataSpout(), 16);
  Fields groupingField = new Fields("person");
  tbuilder.setBolt("boltdb", new MongoBolt(), 16).fieldsGrouping("dataspout", groupingField);
  tbuilder.setBolt("boltindex", new EsBolt(), 32).fieldsGrouping("boltdb", groupingField);
  Config conf = new Config();
  conf.put(Config.TOPOLOGY_DEBUG, false);

  Properties props = new Properties();
  try {
   FileInputStream inf = new FileInputStream(args[0]);
   Set<Object> propnames = props.keySet();
   for (Object x : propnames) {
    conf.put((String) x, props.get(x));
  } catch (IOException e) {
  try {
   StormSubmitter.submitTopology(TOPOLOGY, conf, tbuilder.createTopology());
  } catch (AlreadyAliveException | InvalidTopologyException e) {

At line 28: We are specifying a spout (labelled "dataspout") which will be run with 16 tasks. It is implemented by the DataSpout class.
Line 30: Specifies the DB updater bolt (labelled "boltdb") which will be run with 16 tasks. It takes input from "dataspout", i.e. the tuples emitted by "dataspout" are directed to this bolt. It is implemented by MongoBolt class.
Line 31:  Specifies the index updater bolt (labelled "boltindex") which is run with 32 tasks. It takes input from "boltdb" and updates the index. It is implemented by EsBolt class.

So, data flows from Kafka to Spout (dataspout), from Spout to DB Updater bolt(boltdb) and from DB updater bolt to index updater bolt (indexbolt)

Now how it is scalable?
It is scalable because we are using scalable technologies such as Kafka, MongoDB, ElasticSearch:)
But what about the pipeline we have just built ? Is it also scalable, is it also reliable?
The pipeline itself is scalable. As we may  increase the number of tasks for each stage.  We may also increase the number of workers executing the topology, and may also increase the number of machines running the workers. So, scaling is not a problem here.

Order of updates:
Sometimes the order in which updates are applied across storage systems is very important. For example. there may be an event which specify update of a record, and a subsequent event which specifies the delete of the same record. If the events are applied in out of sequence, then the deleted record may actually reappear in our storage systems, in such a case our customers will scold us and may even switch to our competitor's product. That is why we should make sure that events are applied in order. That is achievable by applying the events to the queue (Kafka) in order. Each event may be associated with a time stamp and ensure that events are added to the queue in the order of that time stamp.

How to keep the storage systems in Sync:
Maintaining the order of processing:
Error handling:

Wednesday, May 27, 2015

An Eventbus for Python

Sometimes we need publish-subscribe kind of communications between components running in the same process. It is even better if the components don't need to know each-other as that simplifies the design of complex and highly-concurrent components. 

We may design similar systems using Queues also. But then also, coupling between components are not completely eliminated and we end up spawning threads and writing lots of complex logic ourselves. Moreover the pattern gets repeated across many pieces and ultimately making it a nightmare to weave the components together in a simpler way. 

So, Here is a Python eventbus library, geeteventbus  which is addressing exactly these problems we are facing. 

The API related documentations are available  here .

geeteventbus is inspired by the Java library guava eventbus. But it is not exactly the similar to Guava eventbus. 

To install the library, issue the below command:

$ sudo pip install geeteventbus

This is a platform independent package and so may be used on any OS where Python can be run.

Below diagram explains the architecture of the event-bus.

There are three type of components here. The publishers post events to the event-bus. The event-bus takes care of delivering the events to appropriate subscribers. The subscribers registers themselves to the event-bus and they register themselves for certain topics. Each event is associated with a topic, data and an optional ordering field. The topic of the event decides which subscribers it will be delivered to by the event-bus.

The event-bus can be synchronous or asynchronous. 

  • A synchronous event-bus delivers the events from the same threads the were posted from.
  • An asynchronous event-bus delivers the events to the subscribers from different threads. Basically, here the events are delivered from some of the executor threads run by the event-bus internally.
  • While creating the event-bus, we may declare the future subscribers to be thread-safe. In that case, the subscribers invocation won't be synchronized. 
  • In some cases, we may want the events to be processed by the subscribers in the same order as they were posted. To enforce that, event objects are created with a special ordering field. E.g. event('atopic', 'somedata for this event', 'an-ordering-key'). All the events with the ordering key "an-ordering-key" will be processed in the same order they were posted to the event-bus.
  • Multiple event-bus can be created in the same process if needed.

Basic working

  1. We create an eventbus
    from geeteventbus.eventbus import eventbus
    eb = eventbus()
    This will create an eventbus with the defaults. The default eventbus will have below characteristics:
    1. the maximum queued event limit is set to 10000
    2. number of executor thread is 8
    3. the subscribers will be called asynchronously
    4. subscibers are treated as thread-safe and hence same subscribers may be invoked simultaneously on different threads
  2. Create a subsclass of subscriber and override the process method. Create an object of this class and register it to the eventbus for receiving messages with certain topics:
    from geeteventbus.subscriber import subscriber
    from geeteventbus.eventbus import eventbus
    from geeteventbus.event import event
    class mysubscriber(subscriber):
        def process(self, eventobj):
            if not isinstance(eventobj, event):
                print('Invalid object type is passed.')
            topic = eventobj.get_topic()
            data = eventobj.get_data()
            print('Processing event with TOPIC: %s, DATA: %s' % (topic, data))
    subscr = mysubscriber()
    eb.register_consumer(subscr, 'an_important_topic')
  3. Post some events to the eventbus with the topic "an_important_topic".
    from geeteventbus.event import event
    eobj1 = ('an_important_topic', 'This is some data for the event 1')
    eobj2 = ('an_important_topic', 'This is some data for the event 2')
    eobj3 = ('an_important_topic', 'This is some data for the event 3')
    eobj3 = ('an_important_topic', 'This is some data for the event 4')
  4. We may gracefully shutdown the eventbus before exiting the process
The complete example is below:
from time import sleep
from geeteventbus.subscriber import subscriber
from geeteventbus.eventbus import eventbus
from geeteventbus.event import event

class mysubscriber(subscriber):
    def process(self, eventobj):
        if not isinstance(eventobj, event):
            print('Invalid object type is passed.')
        topic = eventobj.get_topic()
        data = eventobj.get_data()
        print('Processing event with TOPIC: %s, DATA: %s' % (topic, data))

eb = eventbus()
subscr = mysubscriber()
eb.register_consumer(subscr, 'an_important_topic')

eobj1 = event('an_important_topic', 'This is some data for the event 1')
eobj2 = event('an_important_topic', 'This is some data for the event 2')
eobj3 = event('an_important_topic', 'This is some data for the event 3')
eobj4 = event('an_important_topic', 'This is some data for the event 4')

A more detailed example is given below. A subscriber (counter_aggregator) aggregates the values for a set of counters. It registers itself to an eventbus for receiving events for the counters(topics). A set of producers update the values for the counters and post events describing the counter to the eventbus:
from threading import Lock, Thread
from time import sleep, time
from geeteventbus.eventbus import eventbus
from geeteventbus.event import event
from geeteventbus.subscriber import subscriber
from random import randint

class counter_aggregator(subscriber, Thread):
    Aggregator for a set of counters. Multiple threads updates the counts which
    are aggregated by this class and output the aggregated value periodically.
    def __init__(self, counter_names):
        self.counter_names = counter_names
        self.locks = {}
        self.counts = {}
        self.keep_running = True
        self.collect_times = {}
        for counter in counter_names:
            self.locks[counter] = Lock()
            self.counts[counter] = 0
            self.collect_times[counter] = time()

    def process(self, eobj):
        Process method calls with the event object eobj. eobj has the counter name as the topic
        and an int count as the value for the counter.
        counter_name = eobj.get_topic()
        if counter_name not in self.counter_names:
        count = eobj.get_data()
        with self.locks[counter_name]:
            self.counts[counter_name] += count

    def stop(self):
        self.keep_running = False

    def __call__(self):
        Keep outputing the aggregated counts every 2 seconds
        while self.keep_running:
            for counter_name in self.counter_names:
                with self.locks[counter_name]:
                    print('Change for counter %s = %d, in last %f secs' % (counter_name,
                          self.counts[counter_name], time() - self.collect_times[counter_name]))
                    self.counts[counter_name] = 0
                    self.collect_times[counter_name] = time()
        print('Aggregator exited')

class count_producer:
    Producer for counters. Every 0.02 seconds post the "updated" value for a
    counter randomly
    def __init__(self, counters, ebus):
        self.counters = counters
        self.ebus = ebus
        self.keep_running = True
        self.num_counter = len(counters)

    def stop(self):
        self.keep_running = False

    def __call__(self):
        while self.keep_running:
            ev = event(self.counters[randint(0, self.num_counter - 1)], randint(1, 100))
        print('producer exited')

if __name__ == '__main__':
    ebus = eventbus()
    counters = ['c1', 'c2', 'c3', 'c4']
    subcr = counter_aggregator(counters)
    producer = count_producer(counters, ebus)
    for counter in counters:
        ebus.register_consumer(subcr, counter)
    threads = []
    i = 30
    while i > 0:
        i -= 1

    aggregator_thread = Thread(target=subcr)
    for thrd in threads: