Sunday, November 30, 2014

Using Zookeeper to coordinate distributed Job watcher

Zookeeper is a tool that can be used for distributed system synchronization, coordination, naming registry, lock service and so on. There are not many open source alternatives to Zookeeper, and it is very good at what it does. That is why many open source projects such as Storm and Kafka use it for service discovery, service registry and similar tasks.

In this blog post I will explain how we can use Zookeeper as a coordinator for distributed job watchers.

Let us assume there are processes running on different machines which collect and submit small batches of work to some queues. There are job executors which poll for the submitted jobs, execute them and submit the results back to a queue. Job watchers keep monitoring the job statuses, and once a job is complete they collect the results and do something with them.
There may be multiple job submitters, watchers and executors. Instances of job submitters, watchers or executors may come up and go down at any time. The job watchers and executors do a fair sharing of the jobs to watch and execute. The appearance of new executors or the disappearance of existing executors triggers re-sharing of the jobs among the executors. Similarly, the appearance or disappearance of watchers triggers re-sharing of the watches.

All of the above necessitates reliable coordination among tasks that may be executing on different machines. Doing the coordination correctly is hard, as there are too many nuances to address. Fortunately Zookeeper has addressed all of these, and it is a proven piece of software that has been in use for some years now. Let me explain how we can do the coordination through Zookeeper. (Please refer to the Zookeeper getting started guide for an overview of Zookeeper if you haven't used it already.)

First, a job watcher has to be notified about the appearance and disappearance of other job watchers. Each watcher, while starting up, registers itself under a well-known znode /watchers. So /watchers will have a list of children which are the unique ids of the watcher processes. A child node gets added when a new watcher starts, and a child node disappears when the corresponding watcher process dies, disconnects or loses network connectivity. Each job watcher sets a watch on the /watchers node, and when a watcher process appears or disappears, it gets a notification with the list of currently registered watcher ids.
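For illustration, here is a minimal kazoo sketch of that registration-and-watch step (the localhost address is an assumption for a local demo; the complete script later in this post does the same thing inside the job_watcher class):

import uuid
from kazoo.client import KazooClient

zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
zk.ensure_path('/watchers')

# Ephemeral: the znode disappears automatically if this process dies or disconnects
myid = uuid.uuid4().hex
zk.create('/watchers/' + myid, ephemeral=True)


def on_watchers_changed(event):
    # Zookeeper watches fire only once, so re-arm the watch on every notification
    watchers = zk.get_children('/watchers', watch=on_watchers_changed)
    print 'currently registered watchers:', sorted(watchers)

watchers = zk.get_children('/watchers', watch=on_watchers_changed)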




Also, when a watcher chooses to watch a job, it locks that job so as to signal to others that they shouldn't spend cycles watching a job that is already being watched. It creates a lock with the same name as the job under another znode, /watchlocks.
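A hedged sketch of that locking step using kazoo's lock recipe (the job name below is a placeholder; the real script derives it from the children of /jobs):

from kazoo.client import KazooClient

zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
zk.ensure_path('/watchlocks')

job = 'some-job-id'  # placeholder job name
lock = zk.Lock('/watchlocks/' + job)
try:
    # Give up quickly if another watcher already holds the lock for this job
    if lock.acquire(blocking=True, timeout=1):
        print 'watching job', job
except Exception as e:
    print 'could not lock job', job, e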

The job executors share the jobs to execute. So, whenever a new executor comes up, re-sharing of jobs is triggered. Similarly, whenever an executor disappears, re-sharing of jobs is triggered again. Each job executor registers itself under the znode /executors, and the executors also put a watch on the znode "/executors" so that they get notifications for changes in the list of executors currently working.

The executors may not know when a job completes, but the watchers do. This is because when the job submitter submits a job, it includes enough information for a watcher to know when the job is complete. Actually, the executors could also figure out when a job is complete, but in this example I am assigning just one responsibility to the executors: they execute the tasks and submit the results to a results queue.

Whenever a watcher detects the completion of a job, it collects the results of the completed job, removes the job details from the /jobs znode and does something with the result. As a node under the /jobs znode gets deleted, the watchers again re-share the jobs to watch.


This approach gives us the ability to monitor complex workflows, because there is no reason a watcher cannot submit the completed jobs to some other queue which is again consumed by the executors. Here I am just giving a very basic example to explain the overall working.

We will use Python for our examples.

We will use the Kazoo Zookeeper client. Install it:
$ sudo pip install kazoo

We will use Redis as the jobs queue and the job results queue. We need to install Redis.
Download Redis from here.
Extract the downloaded tar archive and build Redis:
$ tar zxf redis-2.8.17.tar.gz
$ cd redis-2.8.17
$ sudo make
$ sudo make install

Now we install the Python Redis clients:
$ sudo pip install redis
$ sudo pip install hiredis  # Needed for better performance of Python redis client

Let us start the Zookeeper server and the Redis server. We will run all of them on the same machine, as this post is for demonstration purposes only.

$ (zookeeper-install-directory)/bin/zkServer.sh start-foreground
$ redis-server



Now let us look at the Python code samples. You may get the complete example at this github link.

I have also pasted the code below:

import sys
from atexit import register
from time import sleep
from random import randint
import logging
import uuid
from redis import ConnectionPool, Redis
from kazoo.client import KazooClient
from math import sqrt
from threading import Thread, Lock, Condition

SUBMITTED = 'subm'
PROCESSED = 'prcd'
ZIPPED = 'zpd'
UPLOADED = 'uploaded'

inited = False

ALLOWED_COMMANDS = ['watcher', 'jobsubmitter', 'jobexecutor']


def state_listener(state):
    print state


def create_path_if_not_exists(zk, path):
    '''
    Create the znode path if it is not existing already
    '''
    if not zk.exists(path):
        try:
            zk.ensure_path(path)
        except Exception as e:
            print e
            return False
    return True


def stop_zk(zkwrapper):
    zkwrapper.stop()


def init_redis():
    '''
    Connect to redis server. For this example, we are running
    Redis on the same machine
    '''
    pool = ConnectionPool(host='localhost', port=6379, db=0)
    r = Redis(connection_pool=pool)
    return r


class zk_wrapper:
    '''
    Callable class wrapping a zookeeper kazooclient object
    '''
    def __init__(self, zk):
        self.zk = zk
        self.state = ''
        register(stop_zk, self)

    def stop(self):
        self.zk.stop()

    def __call__(self, state):
        self.state = state


def init():

    global inited
    zk = None
    try:
        zk = KazooClient(hosts='127.0.0.1:2181')
        zk.add_listener(state_listener)
        zk.start()
        register(stop_zk, zk)
        create_path_if_not_exists(zk, '/jobs')
        create_path_if_not_exists(zk, '/watchers')
        create_path_if_not_exists(zk, '/watchlocks')
        create_path_if_not_exists(zk, '/executors')
    except Exception as e:
        print 'Zk problem ', e
        if zk is not None:
            zk.stop()
        sys.exit(1)

    inited = True
    return zk


class job_watcher:
    def register_myself(self):
        self.zk.create('/watchers/' + self.myid, ephemeral=True)

    def __init__(self):
        self.lock = Lock()
        self.zk = init()
        self.redis = init_redis()
        self.myid = uuid.uuid4().hex
        self.register_myself()
        self.my_jobs = {}
        children = self.zk.get_children('/jobs', watch=self)
        self.alljobs = children
        children = self.zk.get_children('/watchers', watch=self)
        self.watchers = children
        self.myindex = self.watchers.index(self.myid)
        self.num_watchers = len(self.watchers)
        self.lock_my_job_watches()
        self.job_watcher_thread = Thread(target=self, args=[None])
        self.job_watcher_thread.start()

    def unlock_my_jobs(self):
        self.lock.acquire()
        for job, lock in self.my_jobs.items():
            try:
                lock.release()
            except Exception as e:
                print 'Unlocking issue', e
        self.my_jobs.clear()
        self.lock.release()

    def stop_monitoring(self):
        self.stall_monitor = True

    def start_monitoring(self):
        self.stall_monitor = False

    def watch_for_completion(self):
        jobcount = {}
        self.lock.acquire()
        for job in self.my_jobs:
            try:
                vals = self.zk.get('/jobs/' + job)
                stat, count = vals[0].split('=')
                jobcount[job] = {'count': int(count), 'stat': stat}
            except Exception as e:
                print 'Job watch error ', e
                self.lock.release()
                return
        self.lock.release()
        times = 0
        while (not self.stall_monitor) and (times < 4):
            times += 1
            temp = ''
            self.lock.acquire()
            for job in self.my_jobs:
                try:
                    if (job not in jobcount) or jobcount[job]['stat'] != PROCESSED:
                        continue
                    processedcount = self.redis.hlen(job + '_completed')
                    if processedcount == jobcount[job]['count'] or processedcount == 0:
                        self.my_jobs[job].release()
                        self.zk.delete('/watchlocks/' + job)
                        self.redis.delete(job + '_completed')
                        self.zk.delete('/jobs/' + job)
                        print 'Job finished ' + job
                        temp = job
                        break
                except Exception as e:
                    print 'Monitor error ', e
            if temp != '':
                del self.my_jobs[temp]
                sleep(0.4)
            self.lock.release()

    def run(self):
        while True:
            if self.stall_monitor:
                sleep(1)
                continue
            self.watch_for_completion()

    def lock_my_job_watches(self):
        self.stop_monitoring()
        self.unlock_my_jobs()
        self.lock.acquire()
        for child in self.alljobs:
            slot = abs(hash(child)) % self.num_watchers
            if slot != self.myindex:
                continue
            lock = self.zk.Lock('/watchlocks/' + child)
            try:
                if lock.acquire(blocking=True, timeout=1):
                    self.my_jobs[child] = lock
            except Exception as e:
                print 'Lock problem ', e
        self.lock.release()
        if len(self.my_jobs) > 0:
            self.start_monitoring()

    def __call__(self, event):
        if event is None:
            '''
            I am not the zookeeper event callback
            '''
            self.run()
        if event.path == '/jobs':
            children = self.zk.get_children('/jobs', watch=self)
            self.alljobs = children
        else:
            self.watchers = self.zk.get_children('/watchers', watch=self)
            self.num_watchers = len(self.watchers)
            self.myindex = self.watchers.index(self.myid)
        self.lock_my_job_watches()


class job_executor:

    def register_myself(self):
        self.zk.create('/executors/' + self.myid, ephemeral=True)

    def __init__(self):
        zk = init()
        self.zk = zk
        self.lock = Lock()
        self.condition = Condition(self.lock)
        self.some_change = 0
        self.redis = init_redis()
        self.myid = uuid.uuid4().hex
        self.register_myself()
        self.my_jobs = {}
        children = zk.get_children('/jobs', watch=self)
        self.alljobs = children
        children = zk.get_children('/executors', watch=self)
        self.executors = children
        self.myindex = self.executors.index(self.myid)
        self.num_executors = len(self.executors)
        self.keep_running = True
        self.executor_thread = Thread(target=self, args=[None])
        self.executor_thread.start()

    def execute(self):
        self.my_jobs = filter(lambda x: (self.alljobs.index(x) % self.num_executors)
                              == self.myindex, self.alljobs)
        self.execute_jobs()

    def execute_jobs(self):
        some_change = self.some_change

        def isprime(number):
            number = abs(number)
            if number <= 1:
                return False
            if number <= 3:
                return True
            if number & 1 == 0:
                return False
            end = int(sqrt(number))
            i = 3
            while i <= end:
                if number % i == 0:
                    return False
                i += 2
            return True

        if some_change != self.some_change:
            return
        jobs = set()

        for job in self.my_jobs:
            if some_change != self.some_change:
                return
            try:
                jobval = self.zk.get('/jobs/' + job)
                stat, counts = jobval[0].split('=')
                if stat == SUBMITTED:
                    jobs.add(job)
            except Exception as e:
                print 'Problem happened ', e

        while len(jobs) > 0:
            for job in jobs:
                if some_change != self.some_change:
                    return
                try:
                    val = self.redis.lrange(job, 0, 0)
                    if val is None or len(val) == 0:
                        # Re-read this job's own count; the 'counts' value left over
                        # from the loop above may belong to a different job
                        stat, counts = self.zk.get('/jobs/' + job)[0].split('=')
                        self.zk.set('/jobs/' + job, PROCESSED + '=' + counts)
                        jobs.remove(job)
                        break
                    ival = int(val[0])
                    if self.redis.hmset(job + '_completed', {ival: isprime(ival)}):
                        self.redis.lpop(job)
                except Exception as e:
                    print 'Some problem ', e
                    sys.exit(1)

    def run(self):
        while self.keep_running:
            self.execute()
            self.condition.acquire()
            self.condition.wait(1.0)
            self.condition.release()

    def __call__(self, event):
        if event is None:
            self.run()
        if event.path == '/jobs':
            children = self.zk.get_children('/jobs', watch=self)
            self.alljobs = children
        else:
            self.executors = self.zk.get_children('/executors', watch=self)
            self.num_executors = len(self.executors)
            self.myindex = self.executors.index(self.myid)
        self.some_change += 1
        self.condition.acquire()
        self.condition.notify()
        self.condition.release()


def job_submitter_main():
    try:
        zk = init()
        cpool = ConnectionPool(host='localhost', port=6379, db=0)
        r = Redis(connection_pool=cpool)
        added = 0
        tried = 0
        max_add_try = 5000
        jobname = uuid.uuid4().hex
        added_nums = set()

        while tried < max_add_try:
            value = randint(5000, 90000000)
            tried += 1
            if value not in added_nums:
                added_nums.add(value)
            else:
                continue

            while True:
                try:
                    r.lpush(jobname, value)
                    added += 1
                    break
                except Exception as e:
                    sleep(1)
                    print "Lpush ", jobname, e

        zk = KazooClient(hosts='127.0.0.1:2181')
        zk.add_listener(state_listener)
        zk.start()
        value = SUBMITTED + "=" + str(added)
        zk.create('/jobs/' + jobname, value=value)
        zk.stop()

    except Exception as e:
        print 'Big problem in submitting job ', e
        sys.exit(1)
    print 'Job submitted ' + jobname


def watcher_main():
    job_watcher()


def job_executor_main():
    job_executor()


if __name__ == '__main__':
    if len(sys.argv) < 2:
        print 'Usage: ' + sys.argv[0] + ' command'
        print 'Valid commands are : ' + ', '.join(ALLOWED_COMMANDS)
        sys.exit(1)
    logging.basicConfig()
    if sys.argv[1] not in ALLOWED_COMMANDS:
        print sys.argv[1] + ' not a valid command'
        sys.exit(1)
    if sys.argv[1] == 'watcher':
        watcher_main()
        sleep(86400)
    elif sys.argv[1] == 'jobsubmitter':
        job_submitter_main()
        sleep(2)
    elif sys.argv[1] == 'jobexecutor':
        job_executor_main()
        sleep(86400)

The tasks here are just numbers, and the executors check whether the numbers are prime or not. As I said before, this is for demonstration purposes only and hence the tasks are the simplest possible ones; in real life we don't need a distributed environment to check if numbers are prime. The tasks (i.e. the numbers) are submitted to a queue in Redis in small batches. For the queue we are simply using Redis lists, and hence a job is submitted as a list of numbers in Redis.

job_submitter_main is the function that submits the job to Redis. It pushes the list of numbers to Redis and also creates a job description znode under the /jobs node in Zookeeper. The znode name and the name of the list created in Redis are the same. The znode created for a job holds the data "subm=count", where count is the number of tasks for the job (so it is the length of the corresponding list in Redis; let us call it the job size).

job_watcher is a callable class which watches the /watchers znode and also the /jobs znode. All the job watchers get a notification when a new job description is created under the /jobs znode. The watchers share the jobs to watch using the following algorithm:
Let us say there are N watchers and J is the sorted list of incomplete jobs. All the watchers have the list of currently registered watchers. Each watcher sorts this list and finds its own position within it. The watcher at position 0 will watch the jobs at indexes 0, N, 2N, 3N, 4N, ... of list J, the watcher at position 1 will watch the jobs at indexes 1, N+1, 2N+1, 3N+1, 4N+1, ..., and in general the watcher at position n will watch the jobs at indexes n, N+n, 2N+n, 3N+n, 4N+n, and so on. A worked sketch of this scheme is given below.
The watcher processes create a job_watcher object and start watching their jobs for completion.
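As a hedged illustration of that sharing scheme (the watcher ids and job names below are made up), the selection boils down to an index-modulo check:

def my_share(all_jobs, all_watchers, my_id):
    # Everyone sorts the same lists, so everyone computes the same assignment
    jobs = sorted(all_jobs)
    watchers = sorted(all_watchers)
    n = len(watchers)
    my_index = watchers.index(my_id)
    return [job for i, job in enumerate(jobs) if i % n == my_index]

# With 2 watchers and 5 jobs, watcher 'w-a' (position 0) gets indexes 0, 2 and 4
print my_share(['j0', 'j1', 'j2', 'j3', 'j4'], ['w-a', 'w-b'], 'w-a')
# -> ['j0', 'j2', 'j4']

(The job_watcher class in the script above picks its share with abs(hash(job)) % num_watchers instead of the sorted index, which spreads the jobs across the watchers in a similar way.)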

job_executor is the callable class which executes the jobs. The executor processes create instances of the job_executor class and start executing the jobs. The executors share the jobs for execution using an algorithm similar to the one used by the watchers. An executor completes a task and writes the result into a hash in Redis. The hash is named <jobname>_completed. Each key in the hash is a number and the value is 0 or 1: a value of 0 indicates the number is not prime, and 1 indicates it is prime. Once a task is completely executed (i.e. the number is checked), the executor removes the number from the job queue (the Redis list) and puts an entry for the result in the hash.

The job watchers keep checking the size of the completed hashes, and once the size becomes equal to the job size, the watcher assumes the job is complete. The watcher then simply deletes the job node from Zookeeper and the <jobname>_completed hash from Redis.

We can run the different components as follows (save the above script in file distjob.py):
To submit jobs:
$ python distjob.py   jobsubmitter
We can submit any number of jobs and we don't really care if the job executors or watchers are running or not.

To run the watchers:
$ python distjob.py  watcher
We can run any number of watchers and we don't care how many job executors or job submitters are running.

To run the job executors:
$ python distjob.py  jobexecutor
We can run any number of job executors and we don't really care how many job submitters or watchers are running.

In this example, to demonstrate distributed job coordination using Zookeeper, we are running all the watchers, submitters, executors, the Redis server and the Zookeeper server on the same node. But that need not be so; we can easily make it a truly distributed system. We just need to replace "localhost" with the distributed Zookeeper server list, and we also have to use the remote Redis server host or IP, whereas in the example script above we are using "localhost" as the Redis server host.
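For example, the connection setup done in init() and init_redis() could be pointed at remote servers roughly like this (the host names below are placeholders, not real servers):

from kazoo.client import KazooClient
from redis import ConnectionPool, Redis

# Comma-separated list of all the Zookeeper ensemble members
zk = KazooClient(hosts='zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181')
zk.start()

# Redis running on a dedicated box instead of localhost
pool = ConnectionPool(host='redis1.example.com', port=6379, db=0)
r = Redis(connection_pool=pool)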










Sunday, October 5, 2014

Docker the lightweight virtual machine on Linux platform

Docker is an open source technology that facilitates the deployment of software in containers by providing operating system-level virtualization and resource isolation. It is a very convenient tool for creating self-contained software environments, each with its own view of the process tree, memory, network and installed software.

It is very lightweight compared to virtual machines while providing many of the facilities that virtual machines offer. For example, let us assume we are developing a real-time big-data analytics product and we need Elasticsearch, Memcached and Cassandra clusters for our back-end storage. We start prototyping using a single instance of each of these components. Then we start working with real clusters, and generally developers will have them installed on virtual machines (though we can create the clusters on the same machine by using different ports for each instance, that is just an inconvenient way to do things). VMs are generally heavyweight. What if we could have some lightweight isolated process containers to run each instance of Cassandra, Elasticsearch or Memcached? Yes, we can use Docker to solve our problem by running them in a "virtually real" distributed world!!


In this post I will explain how we create images for Docker containers and run them on the Ubuntu 14.04 platform. But this can be easily mapped to other Linux distributions or other versions of Ubuntu as well.

Let us install the Docker (docker.io) package first.

$sudo apt-get update
$sudo apt-get install docker.io

Now let us create a Docker image based on Ubuntu 14.04 and add memcached to it. In short, we will have a memcached Docker image based on Ubuntu 14.04.
Let us create a Dockerfile in a directory, say /home/geet/test, and put in the lines below:

# Dockerfile content
FROM ubuntu:14.04
MAINTAINER "Put your name"
RUN apt-get update
RUN apt-get install -y memcached
ENTRYPOINT ["/usr/bin/memcached" ]
VOLUME [ "/home/geet/sws" ]

The first line "FROM ubuntu:14.04" specifies the base image, ubuntu with tag 14.04. Docker build will look up the image on the current host and, if it cannot find it, will download it from Docker Hub, which is a public Docker image registry.

MAINTAINER specifies the name of the maintainer and can be omitted.
The RUN instruction executes commands (here it will run "apt-get update" and "apt-get install -y memcached").

ENTRYPOINT specifies the command to run when we run the image or containers based on the image. Note that memcached will run in the foreground and not as a daemon, because a Docker container exits as soon as the command specified in the ENTRYPOINT exits. Hence we keep the command running in the foreground.

VOLUME specifies a list of external directories (on the host machine or in other containers) which will be mounted by the container when it starts.

We create the new Docker image by issuing the commands below:

$ cd /home/geet/test  # this directory has the Dockerfile
$ sudo docker build -t "nipun/memcached:ubuntu14.0"  .

The "-t" option is used to give a name to the new image.

After the build is complete we will have a new image ready to run memcached. Let us check the output of the command below:

 $ sudo docker images
REPOSITORY           TAG                 IMAGE ID               CREATED           VIRTUAL SIZE
nipun/memcached     ubuntu14.0     7416281a318d       6 hours ago         217.4 MB
ubuntu                      14.04               6b4e8a7373fe        3 days ago          194.9 MB

So, we have two images now, and the image nipun/memcached:ubuntu14.0 was created successfully.

Now let us run the images
$ sudo docker run -h m1  --name="memcache1" -P nipun/memcached:ubuntu14.0 -u root
#--Press control-C after few seconds
$ sudo docker run -h m2  --name="memcache2" -P nipun/memcached:ubuntu14.0 -u root
#--Press control-C after few seconds 

$ sudo docker run -h m3  --name="memcache3" -P nipun/memcached:ubuntu14.0 -u root
#--Press control-C after few seconds 

The -P option exposes all the ports opened in the container to the host. Memcached listens on port 11211, and because we used the -P option, clients running on the host will be able to connect to the memcached servers running in the containers.

Now let us run a test to check if our memcached cluster is running fine. First we install our favourite Python memcached client.

$ sudo docker start memcache1 memcache2 memcache3
$ sudo pip install python-memcached

And run the below script

# script memcache_test.py
#
import memcache

# Pass the list of servers to the memcache.Client API
client = memcache.Client(['172.17.0.5:11211', '172.17.0.6:11211', '172.17.0.7:11211'])
client.set('testkey', 'This is value for the testkey')
val = client.get('testkey')
print val
if val == 'This is value for the testkey' :
    print 'Got correct value. Success!!!!!'
client.disconnect_all()


$python memcache_test.py
The output...
This is value for the testkey
Got correct value. Success!!!!!

So, our pseudo-distributed setup of a memcached cluster, where each memcached server runs in a Docker container, just worked!!

Thursday, September 4, 2014

Let's ZeroMQ

ZeroMQ is a high performance networking and messaging library. It can carry messages over TCP, multicast, in-process and inter-process transports. It provides messaging patterns like request-reply, publish-subscribe, push-pull etc. Using these patterns as building blocks, complex messaging systems and execution pipelines can be built. The library is surprisingly easy to use and yet it delivers lightning performance.

In this blog post I will describe how to use ZeroMQ with some very simple examples. Then I will show how these building blocks can be assembled to build complex systems. At the end I will give a brief overview of how ZeroMQ works internally.

The examples are in C++, but they can be mapped easily to any other language. ZeroMQ has bindings for more than 40 languages. I will submit a Python implementation of the same shortly in my github repository.


The built-in ZeroMQ messaging patterns are Request-Reply, Publish-Subscribe, Push-Pull etc.

Request-Reply

This pattern connects a set of clients to a set of servers. The service requests from clients get distributed across the servers, and a client may get its response from any of the servers.

A simple example is given below. The client keeps calling send and recv in a loop, while the server keeps calling recv and send in its loop. The client sends a request; the server receives and processes the request and sends the response back to the client.



server.cpp

#include <zmq.hpp>
#include <string>
#include <iostream>
int main ()
{
  zmq::context_t context (1);
  zmq::socket_t socket (context, ZMQ_REP);
  socket.bind ("tcp://*:5555");
  int i = 0;
  while (true){
      zmq::message_t request;
      socket.recv (&request);
      std::cout << "Received " << (char *)request.data() << " #times: " << i++ << std::endl;
      zmq::message_t reply (1024);
      memcpy ((void *) reply.data (), "World", 6);
      socket.send (reply);
  }
  socket.close();
  context.close();
  return 0;
}

client.cpp

#include <zmq.hpp>
#include <string>
#include <iostream>

int main ()
{
  zmq::context_t context (1);
  zmq::socket_t socket (context, ZMQ_REQ);
  socket.connect("tcp://127.0.0.1:5555");
  int i  = 0;
  while (true){
      zmq::message_t request (1024);
      memcpy ((void *) request.data (), "Hello", 6);
      socket.send (request);
      zmq::message_t reply;
      socket.recv (&reply);
      std::cout << "Received " << (char *)reply.data() 
          << "#times " << i++  << std::endl;
  }
  socket.close();
  context.close();
  return 0;
}

Here the server waits for requests in an infinite loop. Whenever it receives a request, it processes the request and sends an appropriate response. In this example, the response is always the same: the string "World"!
The client keeps sending requests in an infinite loop, and it always sends the same request, "Hello"!
Both the client and the server create a ZeroMQ context.
The server creates a socket of type ZMQ_REP using the context and binds the socket to the transport "tcp://*:5555". This effectively binds the socket to all of the server's IPs on port 5555. Now the server can receive requests from any number of clients.
The client connects to the server over the TCP transport "tcp://127.0.0.1:5555". Now it can send requests to the server and receive responses.
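Since ZeroMQ also has Python bindings (pyzmq), below is a rough pyzmq sketch of the same request-reply pair, just to show that the pattern maps almost one-to-one to other languages; the port and the "Hello"/"World" payloads mirror the C++ example above.

rep_server.py

import zmq

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind('tcp://*:5555')
while True:
    request = socket.recv()    # wait for the next request
    print 'Received', request
    socket.send(b'World')      # always reply with the same payload

req_client.py

import zmq

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect('tcp://127.0.0.1:5555')
while True:
    socket.send(b'Hello')
    reply = socket.recv()
    print 'Received', reply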

Now let us see how a set of clients can communicate with a set of servers to get work done and receive the status of that work. This is distributed RPC, where a set of servers serves a set of clients. The client connects to all the servers, and each request is distributed in a round-robin manner across the servers. The application code does minimal work; the ZeroMQ library handles everything transparently.



Slightly modified code is below:
 server_arg_port.cpp
#include <zmq.hpp>
#include <string>
#include <iostream>

using std::cerr;
using std::cout;
using std::endl;

int main (int argc, char *argv[])
{
  if (argc != 2) {
    cerr << argv[0] << " " << "bind-port" << endl;
    exit(1);
  }

  char transport[255] = "";
  zmq::context_t context (1);
  zmq::socket_t socket (context, ZMQ_REP);
  snprintf(transport, 255, "tcp://*:%s", argv[1]);
  socket.bind (transport);

  char response[2048]; 
  memset(response, 0, 2048);
  snprintf(response, 2048, "Response from server lisetning on %s", argv[1]);
  int i = strlen(response);
  int times = 0;
  
  while (true){
      zmq::message_t request;
      socket.recv (&request);
      zmq::message_t reply (2048);
      snprintf(response + i , 2048 - i, " #%d -- request data: %s", 
              times++, (char *)request.data());
      memcpy ((void *) reply.data (), response, strlen(response) + 1);
      socket.send (reply);
  }
  socket.close();
  context.close();
  return 0;
}

client_connect_multiple_server.cpp
#include <zmq.hpp>
#include <string>
#include <iostream>
#include <unistd.h>

using std::cerr;
using std::cout;
using std::endl;

int main (int argc, char *argv[])
{
  if (argc < 2){
      cerr << "Usage: " << argv[0] << " server1-port server2-port ... ";
      exit(1);
  }

  zmq::context_t context (1);
  zmq::socket_t socket (context, ZMQ_REQ);

  int i  = 1;
  char transport[128];

  while (i < argc) {
    snprintf(transport, 128, "tcp://127.0.0.1:%s", argv[i++]);
    socket.connect(transport);
  }

  char request_data[1024];
  i = 0;
  while (true){
      zmq::message_t request (1024);
      snprintf(request_data, 1024, "Hello %08d", i++); 
      memcpy ((void *) request.data (), request_data, strlen(request_data) + 1);
      socket.send (request);
      zmq::message_t reply;
      socket.recv (&reply);
      cout << "Received " << (char *)reply.data() << endl;
      sleep(1);
  }
  socket.close();
  context.close();
  return 0;
}

As we see, each client connects to a set of servers, and the servers are capable of handling multiple clients. Each server is actually capable of handling thousands of clients. This is due to the powerful asynchronous socket communication mechanism ZeroMQ uses internally. On Linux it uses epoll, and on BSD it uses kqueue, which makes it possible to build servers that handle a large number of connections.

Compile and run the above examples as shown below:
$ g++ -o server_arg server_arg_port.cpp -lzmq
$ g++ -o client_connect_to_many client_connect_multiple_server.cpp -lzmq

Start three servers in the background:
$ ./server_arg 5555 &
$ ./server_arg 5556 &
$ ./server_arg 5557 &

Start a client that connects to all of the servers:
$ ./client_connect_to_many 5555 5556 5557

Sample output from the client is given below. We can see that the requests are distributed in a round-robin manner among the servers.

Received Response from server lisetning on 5555 #0 -- request data: Hello 00000000
Received Response from server lisetning on 5556 #0 -- request data: Hello 00000001
Received Response from server lisetning on 5557 #0 -- request data: Hello 00000002
Received Response from server lisetning on 5555 #1 -- request data: Hello 00000003
Received Response from server lisetning on 5556 #1 -- request data: Hello 00000004
Received Response from server lisetning on 5557 #1 -- request data: Hello 00000005
Received Response from server lisetning on 5555 #2 -- request data: Hello 00000006
Received Response from server lisetning on 5556 #2 -- request data: Hello 00000007
Received Response from server lisetning on 5557 #2 -- request data: Hello 00000008
Received Response from server lisetning on 5555 #3 -- request data: Hello 00000009
Received Response from server lisetning on 5556 #3 -- request data: Hello 00000010
Received Response from server lisetning on 5557 #3 -- request data: Hello 00000011
Received Response from server lisetning on 5555 #4 -- request data: Hello 00000012

.....

Now let us kill a server process. Then the client will block forever in some send or receive, and there is no easy way to recover from the situation. Below is an example of a workaround, but this is also not fool-proof. We need to ensure that the client or server doesn't issue a recv before a send is satisfied, or issue a send before a previous recv is satisfied; otherwise it will throw the exception "Operation cannot be accomplished in current state". Once we restart the particular server again, it will start working fine. But the bottom line is that if a server we connected to initially goes down, the request-reply pattern doesn't work. I will explain in a later example how to address such problems and make the pattern reliable.

client_connect_multiple_server_timeout.cpp
 
#include <zmq.hpp>
#include <string>
#include <iostream>
#include <unistd.h>

using std::cerr;
using std::cout;
using std::endl;

int main (int argc, char *argv[])
{
  if (argc < 2){
      cerr << "Usage: " << argv[0] << " server1-port server2-port ... ";
      exit(1);
  }

  zmq::context_t context (1);
  zmq::socket_t socket (context, ZMQ_REQ);
  int timeout = 4000;
  socket.setsockopt(ZMQ_SNDTIMEO, &timeout, sizeof(int)); 
  socket.setsockopt(ZMQ_RCVTIMEO, &timeout, sizeof(int)); 

  int i  = 1;
  char transport[128];

  while (i < argc) {
    snprintf(transport, 128, "tcp://127.0.0.1:%s", argv[i++]);
    socket.connect(transport);
  }

  char request_data[1024];
  i = 0;
  while (true){
      do {
        zmq::message_t request (1024);
        snprintf(request_data, 1024, "Hello %08d", i++); 
        memcpy ((void *) request.data (), request_data, strlen(request_data) + 1);
        if ( socket.send (request) == false) {
            cout << "Some error in sending request " << endl;
            continue;
        }
        break;
      } while (true);
      do {
        zmq::message_t reply;
        if (socket.recv (&reply) == 0) {
            cout << "Some error in read " << endl;
            continue;
        }
        cout << "Received " << (char *)reply.data() << endl;
        break;
      }while (true);
      sleep(1);
  }
  socket.close();
  context.close();
  return 0;
}

Sunday, January 5, 2014

Java Event Bus with Guava

Sometimes we need different modules running in a Java process to communicate with each other; in particular, threads will communicate or pass messages between them. IPC is never a good option in such scenarios, and a message queue within the same process is even less appealing than a simple shared linked list.
Also, coding this ourselves is tedious, and we should look around to see if somebody has already done it and published the result.

The Google Guava library is one of the most popular Java libraries. It also includes the "EventBus" module. EventBus provides publish-subscribe-style communication between components without requiring the components to explicitly register with one another. Here the publishers and subscribers communicate via a shared highway, the event bus.
The working is simple. We create an EventBus, the subscribers register themselves with the event bus, and the publishers publish or post "messages" to the event bus. The event bus takes care of delivering the messages to the appropriate subscribers. There are synchronous and asynchronous event buses. With a synchronous event bus, when a publisher posts a "message", the post does not return until all the subscribers have received the message. With an asynchronous event bus, the subscribers are invoked in different threads.

Now let us look at a simple example. There are many publishers which keep publishing deltas of some counters, while an aggregator receives the counters and adds up the deltas (it may persist them to a database or file etc.).

The counter publishers and the aggregator communicate over a common event bus (which may be asynchronous or synchronous depending on our needs, but in most real-world cases we generally have to go with an asynchronous bus).

Below is the asynchronous event bus provider module:

package geet.example.eventbus;
import java.util.concurrent.Executors;
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;

public class CounterAsyncEventBus{
 private static EventBus bus = new AsyncEventBus(Executors.newFixedThreadPool(1));
 
 public static EventBus getEventBus(){
  return bus;
 }
}


If we want a synchronous event bus, below is the provider:
package geet.example.eventbus;

import com.google.common.eventbus.EventBus;

public class CounterEventBus{
 private static EventBus bus = new EventBus();
 
 public static EventBus getEventBus(){
  return bus;
 }
}



Below is the Counter class, complex :)
package geet.example.eventbus;

public class Counter {
 private String counterName = null;
 private Integer counterDelta = null;
 public Counter(String counterName, Integer counterDelta) {
  this.counterName = counterName;
  this.counterDelta = counterDelta;
 }
 public String getCounterName() {
  return counterName;
 }
 public void setCounterName(String counterName) {
  this.counterName = counterName;
 }
 public Integer getCounterDelta() {
  return counterDelta;
 }
 public void setCounterDelta(Integer counterDelta) {
  this.counterDelta = counterDelta;
 }
}




The counter aggregator code is below. The counter aggregator object gets an instance of the common event bus and then registers itself with the bus. The constructor takes a boolean flag as an argument which indicates whether to use the synchronous or the asynchronous event bus. Also, note the "@Subscribe" annotation, which tells the bus that when a publisher posts a "Counter", the method "addCount" has to be invoked.

package geet.example.eventbus;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;

public class CounterAggregator {
 private ConcurrentHashMap<String, AtomicInteger> counters = null;
 private EventBus bus = null;

 public CounterAggregator(boolean async) {
  counters = new ConcurrentHashMap<String, AtomicInteger>();
  if (async)
   bus = CounterAsyncEventBus.getEventBus();
  else
   bus = CounterEventBus.getEventBus();
  bus.register(this);
 }

 @Subscribe
 @AllowConcurrentEvents
 public void addCount(Counter counter) {
  Integer val = counter.getCounterDelta();
  String counterName = counter.getCounterName();

  AtomicInteger oldval = counters.putIfAbsent(counterName,
    new AtomicInteger(val));
  if (oldval != null) {
   oldval.addAndGet(val);
  }
  System.out.println(Thread.currentThread().getName()
    + " Value for the counter " + counterName + "="
    + counters.get(counterName));

 }
}




Now below is the code for the counter generators. Here too the counter generators get the asynchronous or synchronous event bus instance. The "publish" method internally posts Counter objects to the event bus, which are consumed by the subscriber.

package geet.example.eventbus;

import com.google.common.eventbus.EventBus;
public class CounterGenerator {
 private EventBus bus = null;

 public CounterGenerator(boolean async) {
  if (async)
   bus = CounterAsyncEventBus.getEventBus();
  else
   bus = CounterEventBus.getEventBus();
 }

 public void publish(String name, int val) {
  bus.post(new Counter(name, val));
 }

}


Now we have everything set for the event bus demo application. The code is given below. In this example I am showing how to work with the asynchronous event bus, but it can easily be modified to use the synchronous event bus as well. It creates a counter generator and a counter aggregator. The counter generator publishes some deltas for the counters c1, c2, c3 and c4. The aggregator keeps accumulating the counts. The bottom line is that the event bus elegantly handles the routing of the messages internally, saving us from writing a lot of tedious code and reinventing another wheel :)

package geet.example.eventbus;

import java.util.Random;

public class Example {
 public static void main(String[] args) {
  String []counterNames = {"c1", "c2", "c3", "c4" };
  Random random = new Random();
  @SuppressWarnings("unused")
  CounterAggregator aggr = new CounterAggregator(true);
  CounterGenerator gen = new CounterGenerator(true);
  int i = 0;
  while (i++ < 10000){
   gen.publish(counterNames[i & 3], random.nextInt(50000));
  }
 }
}

You may access all the samples from this github link

Monday, August 12, 2013

Bytebuffer class in C++

I was wondering whether a class similar to Java's ByteBuffer is available in C++ as well. Stringstream works, but it doesn't offer the control that Java's ByteBuffer can offer; it does a very simple job. If such a class is already available, it becomes convenient for developers. There are many libraries that can generate data-encoding code for us; good examples are Apache Thrift and Google protobuf. But there is plenty of software that requires messages to be encoded in other formats, so that the messages are not accessible only from the languages supported by Thrift, protobuf etc.

My implementation of ByteBuffer in C++ is accessible at this github link.
There is a test program (test.cpp) which demonstrates its operation.
It can be compiled as shown below:
$g++ -I .  test.cpp bytebuffer.cpp bytebuffer_exception.cpp

Then run it as shown below:
$./a.out

The source for the test program is pasted below:

#include <string.h>
#include <stdlib.h>
#include <iostream>
#include <bytebuffer.hpp>
#include <stdio.h>

using namespace GeetPutula;
using namespace std;

int main()
{
    ByteBuffer buffer(256, ByteBuffer::LITTLE);
    buffer.putInt32(102);
    buffer.rewind();
    int32_t val = buffer.getInt32();
    cout << val << endl;
    val = buffer.getInt32();
    cout << val << endl;
    size_t position = buffer.currentPosition();
    cout << buffer.position(1024) << endl;
    cout << buffer.position(1024) << endl;
    if (buffer.putInt32(100) == false) {
        cout << "Worked as expected \n";
    }
    buffer.position(position);
    buffer.putBytes((void *)"this is my string", strlen("this is my string") + 1);
    void *buf = malloc(strlen("this is my string") + 1);
    buffer.position(position);
    buffer.readBytes(buf, strlen("this is my string") + 1);
    cout << (char *) buf << endl;
    free(buf);
    position = buffer.currentPosition();
    buffer.putInt16(45);
    buffer.position(position);
    cout << buffer.getInt16() << endl;
    cout << "Current position " << buffer.currentPosition() << endl;
    position = buffer.currentPosition();
    buffer.putInt64(-123456789123400);
    buffer.position(position);
    cout << buffer.getInt64() << endl;
    position = buffer.currentPosition() ;
    cout << "Current position " << position << endl;
    buffer.putFloat(-1223.3233);
    buffer.position(position);
    float f  = buffer.getFloat() ;
    printf("%f floattttt\n", f);
    
    position = buffer.currentPosition() ;
    cout << "Current position " << position << endl;
    buffer.putDouble(-1223879967.3233129);
    buffer.position(position);
    cout << buffer.getDouble() << endl;
    buffer.position(position);
    double x = buffer.getDouble();
    printf("%lf \n", x);
    return 0;
}

Monday, July 15, 2013

Daemontools compilation problem on Linux 64 bit system

Recently I was evaluating daemontools for some components which need to run as Linux services and which have to be restarted if they go down for some reason. I downloaded daemontools and followed the instructions to build it, but it failed with the error below:

./makelib unix.a alloc.o alloc_re.o buffer.o buffer_0.o buffer_1.o \
buffer_2.o buffer_get.o buffer_put.o buffer_read.o buffer_write.o \
coe.o env.o error.o error_str.o fd_copy.o fd_move.o fifo.o lock_ex.o \
lock_exnb.o ndelay_off.o ndelay_on.o open_append.o open_read.o \
open_trunc.o open_write.o openreadclose.o pathexec_env.o \
pathexec_run.o prot.o readclose.o seek_set.o sgetopt.o sig.o \
sig_block.o sig_catch.o sig_pause.o stralloc_cat.o stralloc_catb.o \
stralloc_cats.o stralloc_eady.o stralloc_opyb.o stralloc_opys.o \
stralloc_pend.o strerr_die.o strerr_sys.o subgetopt.o wait_nohang.o \
wait_pid.o
./load envdir unix.a byte.a
/usr/bin/ld: errno: TLS definition in /lib64/libc.so.6 section .tbss mismatches non-TLS reference in envdir.o
/lib64/libc.so.6: could not read symbols: Bad value
collect2: ld returned 1 exit status
make: *** [envdir] Error 1

The problem was solved by including the standard errno.h header file in error.h (extracted to admin/daemontools-0.76/src/error.h from daemontools-0.76.tar.gz) in the source. After that, we can use this nice tool without any issue :):)

Friday, April 5, 2013

Executing commands on multiple Linux machines


Here is a small and convenient tool to execute the same command over a set of Linux machines.
To use the tool we need to configure the machines for password-less SSH, and we also need the Perl module Net::OpenSSH. We have to create a file $HOME/.allhosts and put the IP addresses or host names in this file, one per line.

Configuring password-less access to all the machines you have

Generate public and private keys:
$ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa

Copy the public and private keys to all the machines you want password-less access to:
scp ~/.ssh/id_rsa.pub userid@machine-ip:~/.ssh/id_rsa.pub
scp ~/.ssh/id_rsa userid@machine-ip:~/.ssh/id_rsa

Add the public key to the authorized_keys file on every machine:
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

Now you will be able to ssh from any of these machines to any of the others without a password.

Install the Perl Net::OpenSSH module as shown below:
$sudo yum install perl-CPAN
$sudo perl -MCPAN -e "install Net::OpenSSH" 

Examples
To kill all the Java processes running on the machines listed in your $HOME/.allhosts, issue the below command:
$ perl multihostcmd.pl  "ps -ef  | grep java  | grep -v grep | awk ' { print \$2 }'  | xargs kill -9"
Of course you will need permission to kill those processes.

To check all the users logged in to these machines, issue the below command:
$ perl multihostcmd.pl  who

Below is the script (you may also access it on github):

#!/usr/bin/perl                                
############################################################################
# Save this in a file, say multihostcmd.pl
############################################################################   

use threads (
    'yield', 
    'stack_size' => 64 * 4096,
    'exit'       => 'threads_only',
    'stringify'                    
);                                 
use Cwd qw(abs_path);              
use Net::OpenSSH;                  

#
# executeCmdRemote
# executes the command on a remote host 
# command execution happens over ssh    
#                                       

sub executeCmdRemote {
    my ( $host, $command, @others ) = @_;
    my $ssh =  Net::OpenSSH->new($host,  
         master_opts => [-o => "ConnectionAttempts=2", -o => "ConnectTimeout=5"] );
    if ($ssh->error) {                                                             
        return;                                                                    
    }                                                                              
    ($out, $err, @others) =  $ssh->capture2({timeout => 20}, $command);            
    $outdata = "FROM $host ******************************************\n";          
    if ($out ne "") {                                                              
        $outdata .= $out;                                                          
    }                                                                              
    if ($err ne "") {                                                              
        $outdata .= $err;                                                          
    }                                                                              
    $outdata .= "\nEND OF DATA FROM $host ************************************* \n\n";
    print $outdata;                                                                   
}                                                                                     

#
# readHostFile
# reads from the file $HOME/.allhosts where this file contain a list of hosts
# each line on this file denotes a host name or host ip                      
# The given command is executed on this host                                 
#                                                                            
sub readHostFile {                                                           
    my ($array, @others) = @_;                                               
    $hostfile = $ENV{'HOME'} . '/.allhosts';                                 
    if (! -e  $hostfile) {                                                   
        print $hostfile . ' doesn\'t exist';                                 
        return;                                                              
    }                                                                        
    if (! -f  $hostfile) {                                                   
        print $hostfile . ' is not a regular file';                          
        return;                                                              
    }                                                                        
    open FH, "< $hostfile" or return;
    @$array = <FH>;                                                          
    close FH;
}

sub main {
    if ( $#ARGV < 0 ) {
        $executing_script = abs_path($0);
        print "Usage: $executing_script command-to-execute\n";
        exit(1);
    }
    $cmdline = join( ' ', @ARGV );
    my @hostarry = ();
    readHostFile(\@hostarry);
    if ($#hostarry < 0) {
        print "Empty list for hosts";
        exit(1);
    }

    my $thr = undef;
    my @allthreads = ();
    foreach my $host (@hostarry) {
       $host =~ s/^\s+//;
       $host =~ s/\s+$//;
       if  ($host eq "") {
           next;
       }
       # Create a thread to execute the command on the remote host
       $thr = threads->create('executeCmdRemote', $host, $cmdline);
       push @allthreads, $thr;
    }
    foreach $thr (@allthreads){
        $thr->join();
    }
}

# Call main
main();

Tuesday, December 18, 2012

Ehcache or Memcached ?



Should we use memcached or should we go with the Java-based Ehcache? It is not a million dollar question, but it at least qualifies as a hundred dollar question :)
I think Ehcache is a good option if it is used as an embedded cache in the application server, as it won't involve sockets and serialization. Of course, the application or application server must be in Java.
For all other modes, I think the performance of Ehcache will suffer, as it will involve serializing/deserializing data, and the data transfer will happen over sockets. In that case I doubt Ehcache will be able to perform better than memcached.

One major advantage of memcached is that it has clients in almost every language. The disadvantage is that it doesn't support persistence natively. Membase (or Couchbase) and MemcacheDB provide data persistence and they speak the memcached protocol. But do we need the in-memory cache to be persistent anyway?

Ehcache supports HA, but then it cannot act as an embedded cache alone.
HA with memcached can also be achieved fairly easily, though generally the clients don't support it directly. We just need to put the key-value pair on the memcached server that the key hashes to, and also on the next memcached server in the cluster (or put the key-value on a backup cluster of memcached servers).
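A hedged Python sketch of that idea with python-memcached (the server addresses are placeholders, and a real deployment would probably prefer consistent hashing over this naive modulo placement):

import memcache

servers = ['10.0.0.1:11211', '10.0.0.2:11211', '10.0.0.3:11211']
# One client per server so that we control key placement ourselves
clients = [memcache.Client([s]) for s in servers]


def replicated_set(key, value):
    primary = hash(key) % len(clients)
    backup = (primary + 1) % len(clients)
    clients[primary].set(key, value)
    clients[backup].set(key, value)


def replicated_get(key):
    primary = hash(key) % len(clients)
    backup = (primary + 1) % len(clients)
    # Fall back to the backup copy if the primary node is down or missed the key
    return clients[primary].get(key) or clients[backup].get(key)

replicated_set('user:42', 'some cached value')
print replicated_get('user:42')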

Achieving elasticity with memcached looks like a big challenge, because I don't know whether we can list all the keys in a memcached server. There is the "stats" command for getting a list of keys, but I have no idea whether it returns all of them. If we could get the list of all keys, we could use "extendible hashing" to shuffle a few keys across the nodes. The only catch is that the clients also need to be made aware of the addition or removal of memcached servers in a cluster of memcached servers. I am wondering how Ehcache does this in an HA environment, basically how the clients (especially the REST-based non-Java clients) will know the new destination for a key in the cache if the number of Ehcache nodes in the cluster changes.

I guess for an embedded cache (for Java apps only) Ehcache will be better.
In all other cases memcached might be better, but that is just my opinion.

Monday, December 17, 2012

Simple program to list files sorted by size in a directory recursively

Below is a simple program to list files in a directory recursively and sort them by size. I have been using this tool at work for some time, as I want to detect huge files in certain directories and free up disk space by deleting them. Hopefully it will be useful for you as well, and hence I am posting it here :) I am using the Boost filesystem library to do the real work.

#include <iostream>                                            
#include <iterator>                                            
#include <queue>                                               
#include <vector>                                              
#include <map>                                                 
#include <algorithm>                                           
#include <boost/filesystem.hpp>                                

using namespace std;
using namespace boost::filesystem;

int main(int argc, char *argv[])
{                               
    queue<path> directories;    
    multimap<uintmax_t, path> filesizes;

    if (argc < 2) {
        cout << "Usage: " << argv[0] << " path\n";
        return 1;                                  
    }                                              
    path p(argv[1]);                               

    try {
        if (exists(p)){
            if (is_regular_file(p)){
                cout << file_size(p) << "  " << p << endl;
                return 0;                                 
            }

            if (!is_directory(p)) {
                return 0;
            }
            directories.push(p);
            vector <path> entries;
            path thispath;
            while (!directories.empty()) {
                thispath = directories.front();
                directories.pop();
                copy(directory_iterator(thispath), directory_iterator(), back_inserter(entries));
                for (vector <path>::iterator it = entries.begin();
                        it != entries.end(); ++it) {
                    if (!exists(*it))
                        continue;
                    if (!is_symlink(*it) && is_directory(*it)) {
                        directories.push(*it);
                    } else if (is_regular(*it)) {
                        uintmax_t file_siz = file_size(*it);
                        filesizes.insert(pair<uintmax_t, path>(file_siz, *it));
                    }
                }
                entries.clear();
            }
        }
        for (multimap<uintmax_t, path>::iterator it1 = filesizes.begin(), it2 = filesizes.end();
                it1 != it2; ++it1) {
            cout << it1->first << " " << it1->second << endl;
        }
        filesizes.clear();
    } catch(const filesystem_error & ex) {
        cout << ex.what() << '\n';
    }
    return 0;
}



Tuesday, December 11, 2012

Apache Zookeeper examples for distributed configuration service, distributed registry, distributed coordination etc

Apache Zookeeper is an excellent piece of software that is really helpful for achieving the following in distributed systems:

  • coordination 
  • synchronization
  • lock service
  • configuration service
  • naming registry
etc. etc.
Zookeeper supports high availability by running multiple instances of the service. In case of one of the server that the clients are connecting to goes down, then it will switch over another server transparently. Great thing is irrespective of the server a client may connect to, it will see the updates in the same order. It is a high performance system and can be used for large distributed systems as well.
Zookeeper lets clients coordinate through a shared hierarchical namespace. It is a tree-like structure, much like a normal file system. A client stays connected to a single Zookeeper server (as long as that server is accessible).
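For instance, a hypothetical application could lay out its znodes like this (the names are purely illustrative):
/myapp
/myapp/config
/myapp/workers
/myapp/workers/worker-1
/myapp/workers/worker-2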

Please go through the Apache Zookeeper getting started guide for how to build and configure Zookeeper first.
All these examples are available on GitHub.

Below is an example (zoo_create_node.c) of how to create znodes (nodes in the Zookeeper database).
In this example we demonstrate a simple way to create nodes in the Zookeeper server. Twenty nodes will be created with paths /testpath0, /testpath1, /testpath2, /testpath3, ....., /testpath19.
All these nodes will be initialized to the same value "myvalue1".
We will use the Zookeeper synchronous API to create the nodes. As soon as our client enters the connected state, we start creating the nodes.

All the examples use the latest stable version of Zookeeper at the time of writing, version 3.3.6.
We may use the Zookeeper command line client programs to get the nodes and examine their contents.
Suppose you have downloaded and extracted Zookeeper to the directory /home/yourdir/packages/zookeeper-3.3.6. Then, after you build the C libraries, they will be available at /home/yourdir/packages/zookeeper-3.3.6/src/c/.libs/ and the C command line tools will be available at /home/yourdir/packages/zookeeper-3.3.6/src/c. The command line tools are cli_st, cli_mt and cli. They are convenient tools to examine Zookeeper data.

Compile the code as shown below:
$gcc -o testzk1 zoo_create_node.c -I \
/home/yourdir/packages/zookeeper-3.3.6/src/c/include -I \
/home/yourdir/packages/zookeeper-3.3.6/src/c/generated -L \
/home/yourdir/packages/zookeeper-3.3.6/src/c/.libs/ -lzookeeper_mt
Make sure that your LD_LIBRARY_PATH includes the Zookeeper C libraries before running the example. Also, before you run the example, you have to configure and start the Zookeeper server; please go through the Zookeeper wiki for how to do that.
Now run the example as shown below:
./testzk1 127.0.0.1:22181  # Assuming the Zookeeper server is listening on IP 127.0.0.1 and port 22181
Now use one of the cli tools to examine the znodes created and their values.
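For example, a session with one of the C cli tools might look roughly like this (the exact prompt and output format differ between cli, cli_st and cli_mt):
./cli_mt 127.0.0.1:22181
ls /
get /testpath0
You should see the twenty /testpathN nodes listed and "myvalue1" printed as their data.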

#include <unistd.h>
#include <sys/select.h>
#include <sys/time.h>  
#include <time.h>      
#include <stdlib.h>    
#include <stdio.h>     
#include <string.h>    
#include <errno.h>     

#include <zookeeper.h>
static const char *hostPort;
static zhandle_t *zh;       
static clientid_t myid;     
static int connected;       

void watcher(zhandle_t *zzh, int type, int state, const char *path,
             void* context)                                        
{                                                                  
    if (type == ZOO_SESSION_EVENT) {                               
        if (state == ZOO_CONNECTED_STATE) {                        
            connected = 1;                                         
        } else if (state == ZOO_AUTH_FAILED_STATE) {               
            zookeeper_close(zzh);                                  
            exit(1);                                               
        } else if (state == ZOO_EXPIRED_SESSION_STATE) {           
            zookeeper_close(zzh);                                  
            exit(1);                                               
        }                                                          
    }                                                              
}                                                                  


int main(int argc, char *argv[])
{                               
    int rc;                     
    int fd;                     
    int interest;               
    int events;                 
    struct timeval tv;          
    fd_set rfds, wfds, efds;    

    if (argc != 2) {
        fprintf(stderr, "USAGE: %s host:port\n", argv[0]);
        exit(1);                                          
    }                                                     

    FD_ZERO(&rfds);
    FD_ZERO(&wfds);
    FD_ZERO(&efds);

    zoo_set_debug_level(ZOO_LOG_LEVEL_INFO);
    zoo_deterministic_conn_order(1);        
    hostPort = argv[1];                     
    int x = 0;                              
    zh = zookeeper_init(hostPort, watcher, 30000, &myid, 0, 0);
    if (!zh) {                                                 
        return errno;                                          
    }                                                          
    while (1) {                                                
        char mypath[255];                                      
        zookeeper_interest(zh, &fd, &interest, &tv);           
        usleep(10);                                            
        memset(mypath, 0, 255);                                
        if (connected) {                                       
            while (x < 20) {                                   
                sprintf(mypath, "/testpath%d", x);             
                usleep(10);                                    
                rc = zoo_create(zh, mypath, "myvalue1", 9, &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
                if (rc){                                                                  
                    printf("Problems %s %d\n", mypath, rc);                               
                }                                                                         
                x++;                                                                      
            }                                                                             
            connected++;
        }
        if (fd != -1) {
            if (interest&ZOOKEEPER_READ) {
                FD_SET(fd, &rfds);
            } else {
                FD_CLR(fd, &rfds);
            }
            if (interest&ZOOKEEPER_WRITE) {
                FD_SET(fd, &wfds);
            } else {
                FD_CLR(fd, &wfds);
            }
        } else {
            fd = 0;
        }
        FD_SET(0, &rfds);
        rc = select(fd+1, &rfds, &wfds, &efds, &tv);
        events = 0;
        if (rc > 0) {
            if (FD_ISSET(fd, &rfds)) {
                    events |= ZOOKEEPER_READ;
            }
            if (FD_ISSET(fd, &wfds)) {
                events |= ZOOKEEPER_WRITE;
            }
        }
        zookeeper_process(zh, events);
        if (2 == connected ) {
            // We created the nodes, so we will exit now
            zookeeper_close(zh);
            break;
        }
    }
    return 0;
}
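
As a side note, the same creation can also be done with the asynchronous API. A minimal sketch using zoo_acreate is shown below; it is untested and only meant to show the shape of the call, the rest of the program stays the same. zoo_acreate returns as soon as the request is queued; the result arrives in the completion callback.

/* completion callback: rc is the result code, value is the path that was created */
void create_completion(int rc, const char *value, const void *data)
{
    if (rc == ZOK)
        printf("Created %s\n", value);
    else
        printf("Create failed, rc = %d\n", rc);
}

/* ... inside the connected branch, instead of the synchronous zoo_create ... */
rc = zoo_acreate(zh, mypath, "myvalue1", 9, &ZOO_OPEN_ACL_UNSAFE, 0,
                 create_completion, NULL);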


In the example below (zoo_get_node.c) we demonstrate a simple way to read nodes from the Zookeeper server. The twenty nodes with paths /testpath0, /testpath1, /testpath2, /testpath3, ....., /testpath19 will be examined and the values associated with them will be printed.
We will use the Zookeeper synchronous API to get the values of the nodes. As soon as our client enters the connected state, we start reading the node values.

Compile the code as shown below:
$gcc -o testzk1 zoo_get_node.c -I \
/home/yourdir/packages/zookeeper-3.3.6/src/c/include -I \
/home/yourdir/packages/zookeeper-3.3.6/src/c/generated -L \
/home/yourdir/packages/zookeeper-3.3.6/src/c/.libs/ -lzookeeper_mt
Now you run the example as shown below:                          
./testzk1 127.0.0.1:22181  # Assuming zookeeper server is listening on port
22181 and IP 127.0.0.1                                                    

#include <unistd.h>
#include <sys/select.h>
#include <sys/time.h>  
#include <time.h>      
#include <stdlib.h>    
#include <stdio.h>     
#include <string.h>    
#include <errno.h>     

#include <zookeeper.h>
static const char *hostPort;
static zhandle_t *zh;       
static clientid_t myid;     
static int connected;       

void watcher(zhandle_t *zzh, int type, int state, const char *path,
             void* context)                                        
{                                                                  
    if (type == ZOO_SESSION_EVENT) {                               
        if (state == ZOO_CONNECTED_STATE) {                        
            connected = 1;                                         
        } else if (state == ZOO_AUTH_FAILED_STATE) {               
            zookeeper_close(zzh);                                  
            exit(1);                                               
        } else if (state == ZOO_EXPIRED_SESSION_STATE) {           
            zookeeper_close(zzh);                                  
            exit(1);                                               
        }                                                          
    }                                                              
}                                                                  

void watcherforwget(zhandle_t *zzh, int type, int state, const char *path,
             void* context)                                               
{                                                                         
    char *p = (char *)context;                                            
    if (type == ZOO_SESSION_EVENT) {                                      
        if (state == ZOO_CONNECTED_STATE) {                               
            connected = 1;                                                
        } else if (state == ZOO_AUTH_FAILED_STATE) {                      
            zookeeper_close(zzh);                                         
            exit(1);                                                      
        } else if (state == ZOO_EXPIRED_SESSION_STATE) {                  
            zookeeper_close(zzh);                                         
            exit(1);                                                      
        }                                                                 
    }                                                                     
    printf("Watcher context %s\n", p);                                    
}                                                                         

int main(int argc, char *argv[])
{                               
    int rc;                     
    int fd;                     
    int interest;               
    int events;                 
    struct timeval tv;          
    fd_set rfds, wfds, efds;    

    if (argc != 2) {
        fprintf(stderr, "USAGE: %s host:port\n", argv[0]);
        exit(1);                                          
    }                                                     

    FD_ZERO(&rfds);
    FD_ZERO(&wfds);
    FD_ZERO(&efds);

    zoo_set_debug_level(ZOO_LOG_LEVEL_INFO);
    zoo_deterministic_conn_order(1);        
    hostPort = argv[1];                     
    int x = 0;                              
    zh = zookeeper_init(hostPort, watcher, 30000, &myid, 0, 0);
    if (!zh) {                                                 
        return errno;                                          
    }                                                          
    while (1) {                                                
        char mypath[255];                                      
        char buffer[255];                                      
        struct Stat st;                                        
        zookeeper_interest(zh, &fd, &interest, &tv);           
        usleep(10);                                            
        memset(mypath, 0, 255);                                
        memset(buffer, 0, 255);                                
        if (connected) {                                       
            char mycontext[] = "This is context data for test";
            int len = 254;                                     
            while (x < 20) {                                   
                sprintf(mypath, "/testpath%d", x);             
                usleep(10);                                    
                rc = zoo_wget(zh, mypath, watcherforwget , mycontext, buffer, &len, &st);
                if (ZOK != rc){                                                          
                    printf("Problems %s %d\n", mypath, rc);                              
                } else if (len >= 0) {                                                   
                   buffer[len] = 0;                                                      
                   printf("Path: %s Data: %s\n", mypath, buffer);                        
                }                                                                        
                x++;                                                                     
                len = 254;                                                               
            }                                                                            
            connected++;
        }
        if (fd != -1) {
            if (interest&ZOOKEEPER_READ) {
                FD_SET(fd, &rfds);
            } else {
                FD_CLR(fd, &rfds);
            }
            if (interest&ZOOKEEPER_WRITE) {
                FD_SET(fd, &wfds);
            } else {
                FD_CLR(fd, &wfds);
            }
        } else {
            fd = 0;
        }
        FD_SET(0, &rfds);
        rc = select(fd+1, &rfds, &wfds, &efds, &tv);
        events = 0;
        if (rc > 0) {
            if (FD_ISSET(fd, &rfds)) {
                    events |= ZOOKEEPER_READ;
            }
            if (FD_ISSET(fd, &wfds)) {
                events |= ZOOKEEPER_WRITE;
            }
        }
        zookeeper_process(zh, events);
        if (2 == connected ) {
            // We have read the nodes, so we will exit now
            zookeeper_close(zh);
            break;
        }
    }
    return 0;
}
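
Note that zoo_wget registers the watcher callback as part of the read. If you only want to read a node's data without leaving a watch behind, the plain synchronous zoo_get call can be used instead. A minimal sketch (buffer size chosen arbitrarily):

char buf[255];
int len = sizeof(buf) - 1;
struct Stat st;
/* watch flag 0 means: read the data but do not set a watch */
rc = zoo_get(zh, "/testpath0", 0, buf, &len, &st);
if (rc == ZOK && len >= 0) {
    buf[len] = 0;
    printf("Data: %s\n", buf);
}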

In the example below (zoo_data_watches.c) we demonstrate a simple way to watch nodes in the Zookeeper server. The twenty nodes with paths /testpath0, /testpath1, /testpath2, /testpath3, ....., /testpath19 will be watched for any changes in their values.
We will use the Zookeeper synchronous API to watch the nodes. As soon as our client enters the connected state, we start putting watches on the nodes.

Compile the code as shown below:
$gcc -o testzk1 zoo_data_watches.c -I \
/home/yourdir/packages/zookeeper-3.3.6/src/c/include -I \
/home/yourdir/packages/zookeeper-3.3.6/src/c/generated -L \
/home/yourdir/packages/zookeeper-3.3.6/src/c/.libs/ -lzookeeper_mt
Now you run the example as shown below:                          
./testzk1 127.0.0.1:22181  # Assuming zookeeper server is listening on port
22181 and IP 127.0.0.1                                    

#include <unistd.h>
#include <sys/select.h>
#include <sys/time.h>  
#include <time.h>      
#include <stdlib.h>    
#include <stdio.h>     
#include <string.h>    
#include <errno.h>     

#include <zookeeper.h>
static const char *hostPort;
static zhandle_t *zh;       
static clientid_t myid;     
static int connected;       
static char mycontext[] = "This is context data for test";

void watcher(zhandle_t *zzh, int type, int state, const char *path,
             void* context)                                        
{                                                                  
    if (type == ZOO_SESSION_EVENT) {                               
        if (state == ZOO_CONNECTED_STATE) {                        
            connected = 1;                                         
        } else if (state == ZOO_AUTH_FAILED_STATE) {               
            zookeeper_close(zzh);                                  
            exit(1);                                               
        } else if (state == ZOO_EXPIRED_SESSION_STATE) {           
            zookeeper_close(zzh);                                  
            exit(1);                                               
        }                                                          
    }                                                              
}                                                                  

void watcherforwget(zhandle_t *zzh, int type, int state, const char *path,
             void* context)                                               
{                                                                         
    char buffer[255];                                                     
    int len, rc;                                                          
    struct Stat st;                                                       
    char *p = (char *)context;                                            
    if (type == ZOO_SESSION_EVENT) {                                      
        if (state == ZOO_CONNECTED_STATE) {                               
            return;                                                       
        } else if (state == ZOO_AUTH_FAILED_STATE) {                      
            zookeeper_close(zzh);                                         
            exit(1);                                                      
        } else if (state == ZOO_EXPIRED_SESSION_STATE) {                  
            zookeeper_close(zzh);                                         
            exit(1);                                                      
        }                                                                 
    } else if (type == ZOO_CHANGED_EVENT) {                               
        printf("Data changed for %s \n", path);                           
        len = 254;                                                        
        //get the changed data and set a watch again
        rc = zoo_wget(zh, path, watcherforwget , mycontext, buffer, &len, &st);
        if (ZOK != rc){                                                        
            printf("Problems %s %d\n", path, rc);                              
        } else if (len >= 0) {                                                 
           buffer[len] = 0;                                                    
           printf("Path: %s changed data: %s\n", path, buffer);                
        }                                                                      
    }                                                                          

    printf("Watcher context %s\n", p);
}                                     

int main(int argc, char *argv[])
{                               
    int rc;                     
    int fd;                     
    int interest;               
    int events;                 
    struct timeval tv;          
    fd_set rfds, wfds, efds;    

    if (argc != 2) {
        fprintf(stderr, "USAGE: %s host:port\n", argv[0]);
        exit(1);                                          
    }                                                     

    FD_ZERO(&rfds);
    FD_ZERO(&wfds);
    FD_ZERO(&efds);

    zoo_set_debug_level(ZOO_LOG_LEVEL_INFO);
    zoo_deterministic_conn_order(1);        
    hostPort = argv[1];                     
    int x = 0;                              
    zh = zookeeper_init(hostPort, watcher, 30000, &myid, 0, 0);
    if (!zh) {                                                 
        return errno;                                          
    }                                                          
    while (1) {                                                
        char mypath[255];                                      
        char buffer[255];                                      
        struct Stat st;                                        
        zookeeper_interest(zh, &fd, &interest, &tv);           
        usleep(10);                                            
        memset(mypath, 0, 255);                                
        memset(buffer, 0, 255);                                
        if (connected) {                                       
            //Put the watches for the nodes                    
            int len = 254;                                     
            while (x < 20) {                                   
                sprintf(mypath, "/testpath%d", x);             
                usleep(10);                                    
                rc = zoo_wget(zh, mypath, watcherforwget , mycontext, buffer, &len, &st);
                if (ZOK != rc){                                                          
                    printf("Problems %s %d\n", mypath, rc);                              
                } else if (len >= 0) {                                                   
                   buffer[len] = 0;                                                      
                   printf("Path: %s Data: %s\n", mypath, buffer);
                }
                x++;
                len = 254;
            }
            connected++;
        }
        if (fd != -1) {
            if (interest&ZOOKEEPER_READ) {
                FD_SET(fd, &rfds);
            } else {
                FD_CLR(fd, &rfds);
            }
            if (interest&ZOOKEEPER_WRITE) {
                FD_SET(fd, &wfds);
            } else {
                FD_CLR(fd, &wfds);
            }
        } else {
            fd = 0;
        }
        FD_SET(0, &rfds);
        rc = select(fd+1, &rfds, &wfds, &efds, &tv);
        events = 0;
        if (rc > 0) {
            if (FD_ISSET(fd, &rfds)) {
                    events |= ZOOKEEPER_READ;
            }
            if (FD_ISSET(fd, &wfds)) {
                events |= ZOOKEEPER_WRITE;
            }
        }
        zookeeper_process(zh, events);
    }
    return 0;
}
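
To see the data watches fire, change the value of one of the nodes from another client while this program is running, for example with one of the cli tools (the exact set syntax may vary slightly between the tools):
./cli_mt 127.0.0.1:22181
set /testpath5 mynewvalue
The running program should then print a "Data changed for /testpath5" line followed by the new data, and it re-arms the watch by calling zoo_wget again.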


In the example below (zoo_exist_watch.c) we demonstrate a simple way to watch for the appearance of a node in the server. It will also watch whether the node is deleted after it was created.

In this program we will watch for the appearance and deletion of the node "/testforappearance". Once we create the node from another program, the watch event will be sent to the client and the watcher routine will be called.

Compile the code as shown below:
$gcc -o testzk1 zoo_exist_watch.c -I \
/home/yourdir/packages/zookeeper-3.3.6/src/c/include -I \
/home/yourdir/packages/zookeeper-3.3.6/src/c/generated -L \
/home/yourdir/packages/zookeeper-3.3.6/src/c/.libs/ -lzookeeper_mt
Now run the example as shown below:
./testzk1 127.0.0.1:22181  # Assuming zookeeper server is listening on port 22181 and
IP 127.0.0.1                                                                         

#include <unistd.h>
#include <sys/select.h>
#include <sys/time.h> 
#include <time.h>     
#include <stdlib.h>   
#include <stdio.h>    
#include <string.h>   
#include <errno.h>    

#include <zookeeper.h>
static const char *hostPort;
static zhandle_t *zh;      
static clientid_t myid;    
static int connected;      
static char mycontext[] = "This is context data for test";

void watcher(zhandle_t *zzh, int type, int state, const char *path,
             void* context)                                       
{                                                                 
    if (type == ZOO_SESSION_EVENT) {                              
        if (state == ZOO_CONNECTED_STATE) {                       
            connected = 1;                                        
        } else if (state == ZOO_AUTH_FAILED_STATE) {              
            zookeeper_close(zzh);                                 
            exit(1);                                              
        } else if (state == ZOO_EXPIRED_SESSION_STATE) {          
            zookeeper_close(zzh);                                 
            exit(1);                                              
        }                                                         
    }                                                             
}                                                                 

void watchexistence(zhandle_t *zzh, int type, int state, const char *path,
             void* context)                                              
{                                                                        
    static struct Stat st;                                               
    int rc;                                                              

    if (type == ZOO_SESSION_EVENT) {
        if (state == ZOO_CONNECTED_STATE) {
            return;                       
        } else if (state == ZOO_AUTH_FAILED_STATE) {
            zookeeper_close(zzh);                  
            exit(1);                               
        } else if (state == ZOO_EXPIRED_SESSION_STATE) {
            zookeeper_close(zzh);                      
            exit(1);                                   
        }                                              
    } else if (type == ZOO_CREATED_EVENT) {            
        printf("Node appeared %s, now Let us watch for its delete \n", path);
        rc = zoo_wexists(zh, path,                                          
                watchexistence , mycontext, &st);                           
        if (ZOK != rc){                                                     
            printf("Problems  %d\n", rc);                                   
        }                                                                   
    } else if (type == ZOO_DELETED_EVENT) {                                 
        printf("Node deleted %s, now Let us watch for its creation \n", path);
        rc = zoo_wexists(zh, path,                                           
                watchexistence , mycontext, &st);                            
        if (ZOK != rc){                                                      
            printf("Problems  %d\n", rc);                                    
        }                                                                    
    }                                                                        
}                                                                            

int main(int argc, char *argv[])
{                              
    int rc;                    
    int fd;                    
    int interest;              
    int events;                
    struct timeval tv;         
    fd_set rfds, wfds, efds;   

    if (argc != 2) {
        fprintf(stderr, "USAGE: %s host:port\n", argv[0]);
        exit(1);                                         
    }                                                    

    FD_ZERO(&rfds);
    FD_ZERO(&wfds);
    FD_ZERO(&efds);

    zoo_set_debug_level(ZOO_LOG_LEVEL_INFO);
    zoo_deterministic_conn_order(1);       
    hostPort = argv[1];                    

    zh = zookeeper_init(hostPort, watcher, 30000, &myid, 0, 0);
    if (!zh) {                                                
        return errno;                                         
    }                                                         

    while (1) {
        static struct Stat st;

        zookeeper_interest(zh, &fd, &interest, &tv);
        usleep(10);                                
        if (connected == 1) {                      
            // watch existence of the node         
            usleep(10);                            
            rc = zoo_wexists(zh, "/testforappearance",
                    watchexistence , mycontext, &st);
            if (ZOK != rc){
                printf("Problems  %d\n", rc);
            }
            connected++;
        }
        if (fd != -1) {
            if (interest & ZOOKEEPER_READ) {
                FD_SET(fd, &rfds);
            } else {
                FD_CLR(fd, &rfds);
            }
            if (interest & ZOOKEEPER_WRITE) {
                FD_SET(fd, &wfds);
            } else {
                FD_CLR(fd, &wfds);
            }
        } else {
            fd = 0;
        }
        FD_SET(0, &rfds);
        rc = select(fd+1, &rfds, &wfds, &efds, &tv);
        events = 0;
        if (rc > 0) {
            if (FD_ISSET(fd, &rfds)) {
                    events |= ZOOKEEPER_READ;
            }
            if (FD_ISSET(fd, &wfds)) {
                events |= ZOOKEEPER_WRITE;
            }
        }
        zookeeper_process(zh, events);
    }
    return 0;
}
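
To exercise this watcher, create and then delete the node from another client while the program is running, for example (cli syntax may vary slightly between the tools):
./cli_mt 127.0.0.1:22181
create /testforappearance somedata
delete /testforappearance
The program should print the "Node appeared" message after the create and the "Node deleted" message after the delete, re-registering the existence watch each time.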


In the example below (zoo_children_watch.c) we demonstrate a simple way to monitor the appearance of child znodes under a path in the server. We will check the znode /testpath1 for its children and will be notified whenever a child is added or deleted under this path.

Compile the code as shown below:
$gcc -o testzk1 zoo_children_watch.c -I \
/home/yourdir/packages/zookeeper-3.3.6/src/c/include -I \
/home/yourdir/packages/zookeeper-3.3.6/src/c/generated -L \
/home/yourdir/packages/zookeeper-3.3.6/src/c/.libs/ -lzookeeper_mt
Make sure that your LD_LIBRARY_PATH includes the Zookeeper C libraries before running the example. Also, before you run the example, you have to configure and start the Zookeeper server; please go through the Zookeeper wiki for how to do that. Now run the example as shown below:
./testzk1 127.0.0.1:22181 # Assuming zookeeper server is listening on port 22181 and IP 127.0.0.1

#include <unistd.h>
#include <sys/select.h>
#include <sys/time.h>  
#include <time.h>      
#include <stdlib.h>    
#include <stdio.h>     
#include <string.h>    
#include <errno.h>     

#include <zookeeper.h>
static const char *hostPort;
static zhandle_t *zh;       
static clientid_t myid;     
static int connected;       
static char mycontext[] = "This is context data for test";

void watcher(zhandle_t *zzh, int type, int state, const char *path,
             void* context)                                        
{                                                                  
    if (type == ZOO_SESSION_EVENT) {                               
        if (state == ZOO_CONNECTED_STATE) {                        
            connected = 1;                                         
        } else if (state == ZOO_AUTH_FAILED_STATE) {               
            zookeeper_close(zzh);                                  
            exit(1);                                               
        } else if (state == ZOO_EXPIRED_SESSION_STATE) {           
            zookeeper_close(zzh);                                  
            exit(1);                                               
        }                                                          
    }                                                              
}                                                                  

void watchchildren(zhandle_t *zzh, int type, int state, const char *path,
             void* context)                                              
{                                                                        
    struct String_vector str;                                            
    int rc;                                                              

    printf("The event path %s, event type %d\n", path, type);
    if (type == ZOO_SESSION_EVENT) {                         
        if (state == ZOO_CONNECTED_STATE) {                  
            return;                                          
        } else if (state == ZOO_AUTH_FAILED_STATE) {         
            zookeeper_close(zzh);                            
            exit(1);                                         
        } else if (state == ZOO_EXPIRED_SESSION_STATE) {     
            zookeeper_close(zzh);                            
            exit(1);                                         
        }                                                    
    }                                                        
    // Put the watch again                                   
    rc = zoo_wget_children(zh, "/testpath1",                 
            watchchildren , mycontext, &str);                
    if (ZOK != rc){                                          
        printf("Problems  %d\n", rc);                        
    } else {                                                 
        int i = 0;                                           
        while (i < str.count) {                              
            printf("Children %s\n", str.data[i++]);          
        }                                                    
        if (str.count) {                                     
            deallocate_String_vector(&str);                  
        }                                                    
    }                                                        
}                                                            

int main(int argc, char *argv[])
{                               
    int rc;                     
    int fd;                     
    int interest;               
    int events;                 
    struct timeval tv;          
    fd_set rfds, wfds, efds;    

    if (argc != 2) {
        fprintf(stderr, "USAGE: %s host:port\n", argv[0]);
        exit(1);                                          
    }                                                     

    FD_ZERO(&rfds);
    FD_ZERO(&wfds);
    FD_ZERO(&efds);

    zoo_set_debug_level(ZOO_LOG_LEVEL_INFO);
    zoo_deterministic_conn_order(1);        
    hostPort = argv[1];                     

    zh = zookeeper_init(hostPort, watcher, 30000, &myid, 0, 0);
    if (!zh) {                                                 
        return errno;                                          
    }                                                          

    while (1) {
        zookeeper_interest(zh, &fd, &interest, &tv);
        usleep(10);                                 
        if (connected == 1) {                       
            struct String_vector str;               

            usleep(10);
            // watch existence of the node
            rc = zoo_wget_children(zh, "/testpath1", 
                    watchchildren , mycontext, &str);
            if (ZOK != rc){                          
                printf("Problems  %d\n", rc);        
            } else {                                 
                int i = 0;                           
                while (i < str.count) {              
                    printf("Children %s\n", str.data[i++]);
                }
                if (str.count) {
                    deallocate_String_vector(&str);
                }
            }
            connected++;
        }
        if (fd != -1) {
            if (interest & ZOOKEEPER_READ) {
                FD_SET(fd, &rfds);
            } else {
                FD_CLR(fd, &rfds);
            }
            if (interest & ZOOKEEPER_WRITE) {
                FD_SET(fd, &wfds);
            } else {
                FD_CLR(fd, &wfds);
            }
        } else {
            fd = 0;
        }
        FD_SET(0, &rfds);
        rc = select(fd+1, &rfds, &wfds, &efds, &tv);
        events = 0;
        if (rc > 0) {
            if (FD_ISSET(fd, &rfds)) {
                    events |= ZOOKEEPER_READ;
            }
            if (FD_ISSET(fd, &wfds)) {
                events |= ZOOKEEPER_WRITE;
            }
        }
        zookeeper_process(zh, events);
    }
    return 0;
}
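
To exercise the children watch, add or remove a child of /testpath1 from another client while this program is running, for example (cli syntax may vary slightly between the tools):
./cli_mt 127.0.0.1:22181
create /testpath1/child1 somedata
delete /testpath1/child1
Each change should make the watcher fire, print the current list of children and put the watch back with zoo_wget_children.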