Wednesday, May 27, 2015

An Eventbus for Python

Sometimes we need publish-subscribe style communication between components running in the same process. It is even better if the components don't need to know each other, as that simplifies the design of complex and highly concurrent components.

We could build similar systems using queues, but even then the coupling between components is not completely eliminated, and we end up spawning threads and writing a lot of complex logic ourselves. Moreover, the pattern gets repeated across many pieces of code, ultimately making it a nightmare to weave the components together in a simple way.

So here is a Python eventbus library, geeteventbus, which addresses exactly these problems.

The API documentation is available here.

geeteventbus is inspired by the Java library Guava EventBus, but it is not exactly similar to Guava EventBus.


To install the library, issue the below command:

$ sudo pip install geeteventbus

This is a platform-independent package, so it may be used on any OS where Python runs.


The diagram below explains the architecture of the event-bus.



There are three types of components here. Publishers post events to the event-bus. The event-bus takes care of delivering the events to the appropriate subscribers. Subscribers register themselves with the event-bus for certain topics. Each event is associated with a topic, data, and an optional ordering field. The topic of an event decides which subscribers the event-bus will deliver it to.

The event-bus can be synchronous or asynchronous. 

  • A synchronous event-bus delivers the events from the same threads they were posted from.
  • An asynchronous event-bus delivers the events to the subscribers from different threads. Here the events are delivered from executor threads run internally by the event-bus.
  • While creating the event-bus, we may declare the future subscribers to be thread-safe. In that case, the subscriber invocations won't be synchronized.
  • In some cases, we may want the events to be processed by the subscribers in the same order they were posted. To enforce that, event objects are created with a special ordering field, e.g. event('atopic', 'somedata for this event', 'an-ordering-key'). All the events with the ordering key "an-ordering-key" will be processed in the order they were posted to the event-bus (see the sketch after this list).
  • Multiple event-buses can be created in the same process if needed.
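
For instance, here is a minimal sketch of ordered delivery, using only the calls already shown in this post (the topic and ordering key are made-up names):

from geeteventbus.eventbus import eventbus
from geeteventbus.event import event
from geeteventbus.subscriber import subscriber

class printing_subscriber(subscriber):
    def process(self, eventobj):
        print('Got: %s' % eventobj.get_data())

eb = eventbus()
eb.register_consumer(printing_subscriber(), 'atopic')
# Both events carry the ordering key 'an-ordering-key', so even on an
# asynchronous bus they are processed in the order they were posted.
eb.post(event('atopic', 'first', 'an-ordering-key'))
eb.post(event('atopic', 'second', 'an-ordering-key'))
eb.shutdown()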



Basic working

  1. We create an eventbus
    from geeteventbus.eventbus import eventbus
    eb = eventbus()
    This will create an eventbus with the defaults. The default eventbus will have the following characteristics:
    1. the maximum queued event limit is set to 10000
    2. the number of executor threads is 8
    3. the subscribers will be called asynchronously
    4. subscribers are treated as thread-safe, and hence the same subscriber may be invoked simultaneously on different threads
  2. Create a subclass of subscriber and override the process method. Create an object of this class and register it with the eventbus to receive messages on certain topics:
    from geeteventbus.subscriber import subscriber
    from geeteventbus.eventbus import eventbus
    from geeteventbus.event import event
    
    class mysubscriber(subscriber):
        def process(self, eventobj):
            if not isinstance(eventobj, event):
                print('Invalid object type is passed.')
                return
            topic = eventobj.get_topic()
            data = eventobj.get_data()
            print('Processing event with TOPIC: %s, DATA: %s' % (topic, data))
    
    subscr = mysubscriber()
    eb.register_consumer(subscr, 'an_important_topic')
  3. Post some events to the eventbus with the topic "an_important_topic".
    from geeteventbus.event import event
    
    eobj1 = event('an_important_topic', 'This is some data for the event 1')
    eobj2 = event('an_important_topic', 'This is some data for the event 2')
    eobj3 = event('an_important_topic', 'This is some data for the event 3')
    eobj4 = event('an_important_topic', 'This is some data for the event 4')
    
    eb.post(eobj1)
    eb.post(eobj2)
    eb.post(eobj3)
    eb.post(eobj4)
  4. We may gracefully shut down the eventbus before exiting the process:
    eb.shutdown()
The complete example is below:
from time import sleep
from geeteventbus.subscriber import subscriber
from geeteventbus.eventbus import eventbus
from geeteventbus.event import event

class mysubscriber(subscriber):
    def process(self, eventobj):
        if not isinstance(eventobj, event):
            print('Invalid object type is passed.')
            return
        topic = eventobj.get_topic()
        data = eventobj.get_data()
        print('Processing event with TOPIC: %s, DATA: %s' % (topic, data))


eb = eventbus()
subscr = mysubscriber()
eb.register_consumer(subscr, 'an_important_topic')


eobj1 = event('an_important_topic', 'This is some data for the event 1')
eobj2 = event('an_important_topic', 'This is some data for the event 2')
eobj3 = event('an_important_topic', 'This is some data for the event 3')
eobj4 = event('an_important_topic', 'This is some data for the event 4')

eb.post(eobj1)
eb.post(eobj2)
eb.post(eobj3)
eb.post(eobj4)

eb.shutdown()
sleep(2)
A more detailed example is given below. A subscriber (counter_aggregator) aggregates the values for a set of counters. It registers itself with an eventbus to receive events for the counters (topics). A set of producers update the values for the counters and post events describing the updates to the eventbus:
from threading import Lock, Thread
from time import sleep, time
from geeteventbus.eventbus import eventbus
from geeteventbus.event import event
from geeteventbus.subscriber import subscriber
from random import randint


class counter_aggregator(subscriber, Thread):
    '''
    Aggregator for a set of counters. Multiple threads update the counts, which
    are aggregated by this class; the aggregated values are output periodically.
    '''
    def __init__(self, counter_names):
        Thread.__init__(self)
        self.counter_names = counter_names
        self.locks = {}
        self.counts = {}
        self.keep_running = True
        self.collect_times = {}
        for counter in counter_names:
            self.locks[counter] = Lock()
            self.counts[counter] = 0
            self.collect_times[counter] = time()

    def process(self, eobj):
        '''
        Called with the event object eobj. eobj has the counter name as the topic
        and an int count as the data value for the counter.
        '''
        counter_name = eobj.get_topic()
        if counter_name not in self.counter_names:
            return
        count = eobj.get_data()
        with self.locks[counter_name]:
            self.counts[counter_name] += count

    def stop(self):
        self.keep_running = False

    def __call__(self):
        '''
        Keep outputting the aggregated counts every 2 seconds
        '''
        while self.keep_running:
            sleep(2)
            for counter_name in self.counter_names:
                with self.locks[counter_name]:
                    print('Change for counter %s = %d, in last %f secs' % (counter_name,
                          self.counts[counter_name], time() - self.collect_times[counter_name]))
                    self.counts[counter_name] = 0
                    self.collect_times[counter_name] = time()
        print('Aggregator exited')


class count_producer:
    '''
    Producer for counters. Every 0.02 seconds, posts an "updated" value for a
    randomly chosen counter.
    '''
    def __init__(self, counters, ebus):
        self.counters = counters
        self.ebus = ebus
        self.keep_running = True
        self.num_counter = len(counters)

    def stop(self):
        self.keep_running = False

    def __call__(self):
        while self.keep_running:
            ev = event(self.counters[randint(0, self.num_counter - 1)], randint(1, 100))
            self.ebus.post(ev)
            sleep(0.02)
        print('producer exited')

if __name__ == '__main__':
    ebus = eventbus()
    counters = ['c1', 'c2', 'c3', 'c4']
    subcr = counter_aggregator(counters)
    producer = count_producer(counters, ebus)
    for counter in counters:
        ebus.register_consumer(subcr, counter)
    threads = []
    i = 30
    while i > 0:
        threads.append(Thread(target=producer))
        i -= 1

    aggregator_thread = Thread(target=subcr)
    aggregator_thread.start()
    for thrd in threads:
        thrd.start()
    sleep(20)
    producer.stop()
    subcr.stop()
    sleep(2)
    ebus.shutdown()




Wednesday, April 29, 2015

Using GDB to debug Java!

GDB is a very powerful debugger which is mainly used to debug C/C++ programs. But as many languages such as Java, Python, Perl, PHP, Ruby etc. are implemented in C/C++, GDB is sometimes useful for debugging issues in programs written in these languages as well. Today I demonstrate one such use case with a Java program.

Below is the Java program. It calls a native method HelloWorld, which internally calls printf to print a string to the console; the program also writes a string to the file /tmp/testout.txt.

//JniHelloWorld.java
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;

public class JniHelloWorld implements Runnable{
	
	private native void HelloWorld();
	private Object lock1 = null;
	private Object lock2 = null;
	private FileOutputStream fos = null;

	public JniHelloWorld() {
		lock1 = new Object();
		lock2 = new Object();
		try {
			fos = new FileOutputStream(new File("/tmp/testout.txt"));
		} catch (FileNotFoundException e) {
			e.printStackTrace();
			System.exit(1);
		}
	}
	
	@Override
	public void run() {
		byte []outbytes = "This will go to /tmp/testout.txt file\n".getBytes();
        while (true) {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
            }
            synchronized (lock1) {
                HelloWorld();
            }
            synchronized (lock2) {
                try {
                    fos.write(outbytes);
                } catch (IOException e) {
                }
            }
        }
	}	
	
	public static void main(String[] args) {
        System.loadLibrary("test");
		JniHelloWorld jnh = new JniHelloWorld();
		int i = 0;
		while (i++ < 20){
			new Thread(jnh).start();
		}
	}
}
Let us compile the Java code:
$ javac JniHelloWorld.java

Generate the JNI header file:
$ javah JniHelloWorld

Now create the C file test.c:
#include <stdio.h>
#include "JniHelloWorld.h"

JNIEXPORT void JNICALL Java_JniHelloWorld_HelloWorld
  (JNIEnv *env, jobject o) {
    printf("hello from printf\n");
}

Now let us compile the C file to an object file:
$ gcc -I . -I $JAVA_HOME/include -I $JAVA_HOME/include/linux -fPIC -c test.c
On my machine, JAVA_HOME points to /home/geet/sws/jdk1.8.0_20.

Now create the shared library:
$ gcc -shared -o libtest.so test.o

Set LD_LIBRARY_PATH (e.g. export LD_LIBRARY_PATH=.) and then run the example Java program.
$ java JniHelloWorld

The process keeps running, continuously printing to the console and to /tmp/testout.txt. If you check the program, there is no possibility of a deadlock in the code.

We may examine the threads of the Java process using the commands in the example below (I am not using jstack, jconsole, JVisualVM etc., as I want to see even the threads created by the native library). Every Java thread is mapped to a native thread (a POSIX thread on a Linux machine).

$ jps
4512 JniHelloWorld
4602 Jps

$ ps -fT -p 4512

Output will be something like:
UID PID SPID PPID C STIME TTY TIME CMD
geet 4512 4512 3068 0 08:56 pts/15 00:00:00 java JniHelloWorld
geet 4512 4513 3068 0 08:56 pts/15 00:00:00 java JniHelloWorld
geet 4512 4514 3068 0 08:56 pts/15 00:00:00 java JniHelloWorld
geet 4512 4515 3068 0 08:56 pts/15 00:00:00 java JniHelloWorld
geet 4512 4516 3068 0 08:56 pts/15 00:00:00 java JniHelloWorld
geet 4512 4517 3068 0 08:56 pts/15 00:00:00 java JniHelloWorld
geet 4512 4518 3068 0 08:56 pts/15 00:00:00 java JniHelloWorld
geet 4512 4519 3068 0 08:56 pts/15 00:00:00 java JniHelloWorld
geet 4512 4520 3068 0 08:56 pts/15 00:00:00 java JniHelloWorld
geet 4512 4521 3068 0 08:56 pts/15 00:00:00 java JniHelloWorld
geet 4512 4522 3068 0 08:56 pts/15 00:00:00 java JniHelloWorld
geet 4512 4523 3068 0 08:56 pts/15 00:00:00 java JniHelloWorld
geet 4512 4524 3068 0 08:56 pts/15 00:00:00 java JniHelloWorld
...
...

We may attach gdb to any of the threads to examine what it is doing.
E.g. $ sudo gdb -p 4551

So far everything runs fine with no issues. Now let us spawn this Java program from another Java program.

// Spawner.java
import java.io.IOException;

public class Spawner {
	
	public static void main(String[] args) {
		ProcessBuilder pb =
				   new ProcessBuilder("java", "JniHelloWorld");
		Process p;
		try {
			p = pb.start();
			while (p.isAlive()){
				Thread.sleep(200);
			}
		} catch (IOException e) {
		} catch (InterruptedException e ){
		}
	}

}

$ javac Spawner.java

Now run the Java process:
$ java Spawner

We may look at the modification time of /tmp/testout.txt to check if it is being updated continuously. After some time, we will see that the file is not being updated at all, but all the Java processes are still running!

$ jps
5602 Spawner
6086 Jps
5622 JniHelloWorld


Definitely something is wrong with the JniHelloWorld process. Let us take its thread dump:

$ jstack 5622

Now we examine the output of jstack.
Most of the threads are blocked:
"Thread-0" #10 prio=5 os_prio=0 tid=0x00007fde040e5000 nid=0x160b waiting for monitor entry [0x00007fdddae06000]
java.lang.Thread.State: BLOCKED (on object monitor)
at JniHelloWorld.run(JniHelloWorld.java:33)
- waiting to lock <0x000000076ce5b140> (a java.lang.Object)
at java.lang.Thread.run(Thread.java:745)


They are waiting for lock 0x000000076ce5b140 which corresponds to lock1.

Let us see who acquired lock1. From the output of jstack, I found it to be the thread below:
"Thread-6" #16 prio=5 os_prio=0 tid=0x00007fde040ef800 nid=0x1611 runnable [0x00007fddda800000]
java.lang.Thread.State: RUNNABLE
at JniHelloWorld.HelloWorld(Native Method)
at JniHelloWorld.run(JniHelloWorld.java:33)
- locked <0x000000076ce5b140> (a java.lang.Object)
at java.lang.Thread.run(Thread.java:745)

It has locked lock1 and is stuck in the native HelloWorld method, effectively blocking the other threads waiting for that lock. It is a special kind of deadlock.

Now we need to check why this thread is stuck in the native method. Java debuggers and tools won't help here, as we are in the native world! The value of nid for this thread is 0x1611 (= 5649), which is the lightweight process ID backing this thread.
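
A quick way to convert the hex nid to a decimal PID (here with Python):

$ python -c "print int('0x1611', 16)"
5649

Let us attach gdb to this lightweight process and get its backtrace: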

$ sudo gdb -p 5649

The backtrace shows:
(gdb) bt
#0 0x00007fde09d7c3cd in write () at ../sysdeps/unix/syscall-template.S:81
#1 0x00007fde09d09a83 in _IO_new_file_write (f=0x7fde0a050400 <_IO_2_1_stdout_>, data=0x7fde0a8b4000, n=4096)
at fileops.c:1261
#2 0x00007fde09d0af5c in new_do_write (to_do=4096,
data=0x7fde0a8b4000 "f\nhello from printf\nhello from printf\nhello from printf\nhello from printf\nhello from printf\nhello from printf\nhello from printf\nhello from printf\nhello from printf\nhello from printf\nhello from printf\n"...,
fp=0x7fde0a050400 <_IO_2_1_stdout_>) at fileops.c:538
#3 _IO_new_do_write (fp=0x7fde0a050400 <_IO_2_1_stdout_>,
data=0x7fde0a8b4000 "f\nhello from printf\nhello from printf\nhello from printf\nhello from printf\nhello from printf\nhello from printf\nhello from printf\nhello from printf\nhello from printf\nhello from printf\nhello from printf\n"...,
to_do=4096) at fileops.c:511
#4 0x00007fde09d0a121 in _IO_new_file_xsputn (f=0x7fde0a050400 <_IO_2_1_stdout_>, data=<optimized out>, n=17)
at fileops.c:1332
#5 0x00007fde09d00957 in _IO_puts (str=0x7fdddae086fd "hello from printf") at ioputs.c:41
#6 0x00007fdddae086f1 in Java_JniHelloWorld_HelloWorld (env=0x7fde040ef9e8, o=0x7fddda8005f0) at test.c:6
#7 0x00007fddf489f8e6 in ?? ()
#8 0x000000076ce5b128 in ?? ()
#9 0x00007fde0935f9ac in InterpreterRuntime::monitorexit(JavaThread*, BasicObjectLock*) ()
from /home/geet/sws/jdk1.8.0_20/jre/lib/amd64/server/libjvm.so
#10 0x00007fddf47a798d in ?? ()
#11 0x00007fddf47a798d in ?? ()
#12 0x000000076ce5b128 in ?? ()
#13 0x0000000000000003 in ?? ()
#14 0x000000076ce5b140 in ?? ()
#15 0x00007fddda800650 in ?? ()
#16 0x00007fddf2410668 in ?? ()
#17 0x00007fddda8006d0 in ?? ()
#18 0x00007fddf2410850 in ?? ()
#19 0x0000000000000000 in ?? ()


Clearly the backtrace shows that the thread is stuck in the write system call (resulting from the call to printf). Why is it blocked? Because its output pipe buffer is full and nobody is draining the data. When we run JniHelloWorld directly from a terminal, the terminal continuously consumes this output, and hence it doesn't block. But when spawned via ProcessBuilder, the child's stdout is a pipe that the Spawner never reads.

BTW, there are neat ways to consume or redirect the output of spawned processes to avoid such problems. I have used this not-very-neat example mainly to show how we can use GDB to debug Java programs in such situations.
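
For instance, if the spawner were written in Python, the trap and two ways out would look like the sketch below (in Java, the equivalent fixes are to keep reading the Process's InputStream, or to use ProcessBuilder.inheritIO() or redirectOutput()). Only the command line comes from this post; the rest is illustrative:

import os
import subprocess

# The trap: pipe the child's stdout and never read it. Once the pipe
# buffer (typically 64 KB on Linux) fills up, the child's write() blocks
# forever -- the same hang we just debugged with gdb.
#   p = subprocess.Popen(['java', 'JniHelloWorld'], stdout=subprocess.PIPE)
#   p.wait()   # may never return

# Fix 1: keep the pipe, but drain it continuously.
p = subprocess.Popen(['java', 'JniHelloWorld'], stdout=subprocess.PIPE)
for line in iter(p.stdout.readline, ''):
    pass  # consume (or log) the child's output

# Fix 2: redirect the child's output away, so there is no pipe to fill.
devnull = open(os.devnull, 'wb')
p2 = subprocess.Popen(['java', 'JniHelloWorld'], stdout=devnull)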



Saturday, February 28, 2015

A ganglia plugin for collecting Stats from Storm topologies

Ganglia is a highly scalable monitoring system for distributed systems. It lets users examine different stats of the monitored machines in a single pane. It maintains historical stats and hence is very good for checking how CPU usage, memory usage, disk usage, JVM GC pauses etc. vary over time across the distributed machines.
Ganglia basically has three components: the Ganglia monitoring daemon (gmond), the Ganglia meta daemon (gmetad), and the Ganglia web front end.

Gmond monitors changes in the host state, announces them, listens for state changes from other nodes running gmond, and returns an XML description of the cluster state when queried. Gmond depends on plug-ins to collect host stats. The plug-ins may be written in C, C++, Python, Perl or PHP. In fact, with some tricks, the actual stats-collecting code can be implemented in any language, and it wouldn't be difficult to extend gmond to support plug-ins written in other languages as well.


Gmetad polls the clusters it is monitoring, collects the reported metrics, and writes them to individual round-robin databases.


The Ganglia web front end provides the metric visualization UI.

There is a very good guide to installing and configuring Ganglia on Ubuntu 14.04 here (from DigitalOcean). Please follow its steps to set up Ganglia monitoring on your machines. Similar steps apply to other Linux systems.

Now that you have installed and configured Ganglia, it is time to write and test a new Ganglia plug-in. From Ganglia 3.1 onwards, gmond has a modular interface that allows us to extend its capabilities through plug-ins. We can write modules or plug-ins to collect new kinds of metrics. Gmond uses the mod_python module (modpython.so) to interface with all the Python plug-ins and collect the metrics they publish.

First we need to create an entry for modpython.so in the gmond configuration file. Edit gmond.conf (by default installed at /etc/ganglia/gmond.conf) and add the lines below in the "modules" section:

  module {
    name = "python_module"
    path = "/usr/lib/ganglia/modpython.so"
    params = "/usr/lib/ganglia/python"
  }

So the modules section in gmond.conf will look somewhat like the one shown below:

modules {
  module {
    name = "core_metrics"
  }
  module {
    name = "cpu_module"
    path = "/usr/lib/ganglia/modcpu.so"
  }
  module {
    name = "disk_module"
    path = "/usr/lib/ganglia/moddisk.so"
  }
  module {
    name = "load_module"
    path = "/usr/lib/ganglia/modload.so"
  }
  module {
    name = "mem_module"
    path = "/usr/lib/ganglia/modmem.so"
  }
  module {
    name = "net_module"
    path = "/usr/lib/ganglia/modnet.so"
  }
  module {
    name = "proc_module"
    path = "/usr/lib/ganglia/modproc.so"
  }
  module {
    name = "sys_module"
    path = "/usr/lib/ganglia/modsys.so"
  }
  module {
    name = "python_module"
    path = "/usr/lib/ganglia/modpython.so"
    params = "/usr/lib/ganglia/python"
  }
}




Also, create a directory /etc/ganglia/pyconfs to store the configuration files for the Python modules. Let us use .pyconf as the extension for these configuration files. Add the line below to /etc/ganglia/gmond.conf:
include ('/etc/ganglia/pyconfs/*.pyconf')

We will add a configuration file for the sample plugin (/etc/ganglia/pyconfs/ganglia_plugin_sample.pyconf). The content of this file is pasted below:
modules {
    module {
        name = "ganglia_plugin_sample"
        language = "python"
        param firstparam {
            value = 100
        }
        param secondparam {
            value = 500
        }
    }
}

collection_group {
  collect_every = 20
  time_threshold = 90
  metric {
    name_match = ".*" 
    value_threshold = "1"
  }
}



Now let us add a very simple Ganglia Python plug-in. The module should implement at least two functions: metric_init and metric_cleanup. metric_init generally initializes the descriptors for the different metrics that the plug-in will be publishing.

Below is the script:

import sys
import os
import random

descriptors = list()
firstparam_max = 1000
secondparam_max = 1000

def callback_fun1(name):
    '''
    Returns a random number between 1 and firstparam_max
    '''
    random.seed()
    return random.randint(1, firstparam_max)

def callback_fun2(name):
    '''
    Returns a random number between 5 and secondparam_max
    '''
    random.seed()
    return random.randint(5,secondparam_max)

def metric_init(params):
    global descriptors, firstparam_max, secondparam_max
    if 'firstparam' in params:
        firstparam_max = int(params['firstparam'])
        d = {'name': 'firstparam',
                'call_back': callback_fun1,
                'time_max': 90,
                'value_type': 'uint',
                'units': 'Count',
                'slope': 'both',
                'format': '%u',
                'description': 'Sample metric',
                'groups': 'Sample'}
        descriptors.append(d)
    
    if 'secondparam' in params:
        secondparam_max = int(params['secondparam'])
        d = {'name': 'secondparam',
                'call_back': callback_fun2,
                'time_max': 90,
                'value_type': 'uint',
                'units': 'Count',
                'slope': 'both',
                'format': '%u',
                'description': 'Sample metric',
                'groups': 'Sample'}
        descriptors.append(d)
    return descriptors


def metric_cleanup():
    '''
    We don't need any cleanup :) :)
    '''
    pass


# This routine is for debugging purpose only and not used by gmond
# To debug the output, run as below:
# $ python ganglia_plugin_sample.py
if __name__ == '__main__':
    params = {'firstparam': 100, 'secondparam' : 500}
    metric_init(params)
    for d in descriptors:
        v = d['call_back'](d['name'])
        print '%s --> %u' % (d['name'], v)








Here, the plug-in is collecting two metrics (firstparam and secondparam). The value for firstparam is computed by callback_fun1 and the value for secondparam by callback_fun2. The "slope" is set to "both" for both metrics. This is a hint to gmetad that the value may increase or decrease.

We should copy this script to /usr/lib/ganglia/python/ganglia_plugin_sample.py.

After you copy the plug-in script and make the configuration changes, restart the ganglia-monitor (gmond) service:

$ sudo service ganglia-monitor restart

After some time you should see the "Sample metrics" group and the graphs for the metrics firstparam and secondparam under it.


I have created a Python module for collecting Storm stats. The module is available here. For instructions, please refer to this readme.

I have pasted the module below for your easy reference:
import logging
import md5
import pickle
import re
import sys
from time import time
from thrift.transport import TTransport, TSocket
from thrift.protocol import TBinaryProtocol

clusterinfo = None
topology_found = True
descriptors = list()
topologies = []
serialfile_dir = '/tmp'
topology_summary_cols_map = {'status': 'Status', 'num_workers': 'Worker Count',
                             'num_executors': 'ExecutorCount', 'uptime_secs': 'Uptime',
                             'num_tasks': 'TaskCount'}

spout_stats = {'Executors': ['Count', '%u', 'uint'], 'Tasks': ['Count', '%u', 'uint'],
               'Emitted': ['Count', '%f', 'double'],
               'Transferred': ['Count/sec', '%f', 'double'],
               'CompleteLatency': ['ms', '%f', 'double'], 'Acked': ['Count/Sec', '%f', 'double'],
               'Failed': ['Count/sec', '%f', 'double']}

bolt_stats = {'Executors': ['Count', '%u', 'uint'], 'Tasks': ['Count', '%u', 'uint'],
              'Emitted': ['Count/sec', '%f', 'double'],
              'Executed': ['Count/sec', '%f', 'double'],
              'Transferred': ['Count/sec', '%f', 'double'],
              'ExecuteLatency': ['ms', '%f', 'double'],
              'ProcessLatency': ['ms', '%f', 'double'],
              'Acked': ['Count/sec', '%f', 'double'],
              'Failed': ['Count/sec', '%f', 'double']}

diff_cols = ['Acked', 'Failed', 'Executed', 'Transferred', 'Emitted']
overall = {'ExecutorCount': ['Count', '%u', 'uint'],
           'WorkerCount': ['Count', '%u', 'uint'],
           'TaskCount': ['Count', '%u', 'uint'],
           'UptimeSecs': ['Count', '%u', 'uint']}

toplogy_mods = {}
lastchecktime = 0
lastinfotime = 0
maxinterval = 6
all_topology_stats = {}
bolt_array = {}
spout_array = {}
nimbus_host = '127.0.0.1'
nimbus_port = 6627
logging.basicConfig(filename='/tmp/storm_topology.log', level=logging.INFO,
                    format='%(asctime)s  %(levelname)s line:%(lineno)d %(message)s', filemode='w')
logging_levels = {'INFO': logging.INFO, 'DEBUG': logging.DEBUG, 'WARNING': logging.WARNING,
                  'ERROR': logging.ERROR, 'CRITICAL': logging.CRITICAL}


def get_avg(arr):
    if len(arr) < 1:
        return 0
    return sum(arr) / len(arr)


def normalize_stats(stats, duration):
    for k in stats:
        statsk = stats[k]
        if 'Emitted' in statsk and duration > 0:
            if statsk['Emitted'] > 0:
                statsk['Emitted'] = statsk['Emitted'] / duration
        if 'Acked' in statsk and duration > 0:
            if statsk['Acked'] > 0:
                statsk['Acked'] = statsk['Acked'] / duration
        if 'Executed' in statsk and duration > 0:
            if statsk['Executed'] > 0:
                statsk['Executed'] = statsk['Executed'] / duration


def freshen_topology(topology):
    tmpsavestats = None
    inf = None
    savedlastchecktime = 0
    tmp = md5.new()
    tmp.update(topology)
    filename = '/tmp/save_stats_for_' + tmp.hexdigest() + '.pk'
    try:
        inf = open(filename, 'rb')
    except IOError as e:
        logging.warn(e)
    if inf is not None:
        try:
            tmpsavestats = pickle.load(inf)
            savedlastchecktime = pickle.load(inf)
        except EOFError as e:
            logging.warn(e)
        inf.close()
    if not all_topology_stats[topology]['topology_stats_got']:
        logging.warn('Info not got for topology ' + topology)
        return
    overallstats = all_topology_stats[topology]['overallstats']
    boltspoutstats = all_topology_stats[topology]['boltspoutstats']
    of = open(filename, 'wb')
    if of is not None:
        pickle.dump(boltspoutstats, of)
        pickle.dump(time(), of)
        of.close()
    if overallstats['UptimeSecs'] > (lastchecktime - savedlastchecktime):
        if tmpsavestats is not None:
            for bolt in bolt_array[topology]:
                if bolt in tmpsavestats and bolt in boltspoutstats:
                    stats_new = boltspoutstats[bolt]
                    stats_old = tmpsavestats[bolt]
                    for key in bolt_stats:
                        if key == 'ExecuteLatency' or key == 'ProcessLatency':
                            continue
                        if key not in stats_new:
                            continue
                        if key not in stats_old:
                            continue
                        if key in diff_cols:
                            stats_new[key] -= stats_old[key]
            for spout in spout_array[topology]:
                if spout in tmpsavestats and spout in boltspoutstats:
                    stats_new = boltspoutstats[spout]
                    stats_old = tmpsavestats[spout]
                    for key in spout_stats:
                        if key == 'CompleteLatency':
                            continue
                        if key not in stats_new:
                            continue
                        if key not in stats_old:
                            continue
                        if key in diff_cols:
                            stats_new[key] -= stats_old[key]
            normalize_stats(boltspoutstats, lastchecktime - savedlastchecktime)
        else:
            normalize_stats(boltspoutstats, overallstats['UptimeSecs'])
    else:
        normalize_stats(boltspoutstats, overallstats['UptimeSecs'])


def freshen():
    global lastchecktime
    if time() > (lastchecktime + maxinterval):
        lastchecktime = time()
        get_topology_stats_for(topologies)
        for topology in topologies:
            freshen_topology(topology)


def callback_boltspout(name):
    freshen()
    first_pos = name.find('_')
    last_pos = name.rfind('_')
    topology_mod = name[0:first_pos]
    bolt = name[first_pos + 1: last_pos]
    statname = name[last_pos + 1:]
    topology = toplogy_mods[topology_mod]
    if not all_topology_stats[topology]['topology_stats_got']:
        logging.debug('Returning 0 for ' + name)
        return 0
    logging.debug('Got stats for ' + name + " " +
                  str(all_topology_stats[topology]['boltspoutstats'][bolt][statname]))
    return all_topology_stats[topology]['boltspoutstats'][bolt][statname]


def callback_overall(name):
    freshen()
    topology_mod, name = name.split('_')
    topology = toplogy_mods[topology_mod]
    if not all_topology_stats[topology]['topology_stats_got']:
        logging.debug('Returning 0 for ' + name)
        return 0
    logging.debug(topology + ' ' + name + ' ' +
                  str(all_topology_stats[topology]['overallstats'][name]))
    return all_topology_stats[topology]['overallstats'][name]


def update_task_count(component_task_count, component_name, count):
    if component_name not in component_task_count:
        component_task_count[component_name] = 0
    component_task_count[component_name] += count


def update_exec_count(component_exec_count, component_name, count):
    if component_name not in component_exec_count:
        component_exec_count[component_name] = 0
    component_exec_count[component_name] += count


def update_whole_num_stat_special(stats, store, boltname, statname):
    if boltname not in store:
        store[boltname] = {}
    if statname not in store[boltname]:
        store[boltname][statname] = 0
    for k in stats:
        if k == '__metrics' or k == '__ack_init' or k == '__ack_ack' or k == '__system':
            continue
        store[boltname][statname] += stats[k]


def update_whole_num_stat(stats, store, boltname, statname):
    if boltname not in store:
        store[boltname] = {}
    if statname not in store[boltname]:
        store[boltname][statname] = 0
    for k in stats:
        store[boltname][statname] += stats[k]


def update_avg_stats(stats, store, boltname, statname):
    if boltname not in store:
        store[boltname] = {}
    if statname not in store[boltname]:
        store[boltname][statname] = []
    for k in stats:
        store[boltname][statname].append(stats[k])


def get_topology_stats_for(topologies):
    all_topology_stats.clear()
    for topology in topologies:
        all_topology_stats[topology] = get_topology_stats(topology)


def refresh_topology_stats():
    logging.debug('Refreshing topology stats')
    for t in topologies:
        all_topology_stats[t] = {'topology_stats_got': False}
    global clusterinfo
    try:
        transport = TSocket.TSocket(nimbus_host, nimbus_port)
        transport.setTimeout(1000)
        framedtrasp = TTransport.TFramedTransport(transport)
        protocol = TBinaryProtocol.TBinaryProtocol(framedtrasp)
        client = Nimbus.Client(protocol)
        framedtrasp.open()
        boltspoutstats = None
        component_task_count = None
        component_exec_count = None
        clusterinfo = client.getClusterInfo()
        for tsummary in clusterinfo.topologies:
            if tsummary.name not in topologies:
                continue
            toplogyname = tsummary.name
            overallstats = {}
            overallstats['ExecutorCount'] = tsummary.num_executors
            overallstats['TaskCount'] = tsummary.num_tasks
            overallstats['WorkerCount'] = tsummary.num_workers
            overallstats['UptimeSecs'] = tsummary.uptime_secs
            all_topology_stats[toplogyname]['overallstats'] = overallstats
            boltspoutstats = {}
            component_task_count = {}
            component_exec_count = {}
            all_topology_stats[toplogyname]['boltspoutstats'] = boltspoutstats
            all_topology_stats[toplogyname]['component_task_count'] = component_task_count
            all_topology_stats[toplogyname]['component_exec_count'] = component_exec_count
            tinfo = client.getTopologyInfo(tsummary.id)
            all_topology_stats[toplogyname]['topology_stats_got'] = True
            for exstat in tinfo.executors:
                stats = exstat.stats
                update_whole_num_stat_special(stats.emitted[":all-time"], boltspoutstats,
                                              exstat.component_id, 'Emitted')
                update_whole_num_stat_special(stats.transferred[":all-time"], boltspoutstats,
                                              exstat.component_id, 'Transferred')

                numtask = exstat.executor_info.task_end - exstat.executor_info.task_start + 1
                update_task_count(component_task_count, exstat.component_id, numtask)
                update_exec_count(component_exec_count, exstat.component_id, 1)
                if stats.specific.bolt is not None:
                    update_whole_num_stat(stats.specific.bolt.acked[":all-time"], boltspoutstats,
                                          exstat.component_id, 'Acked')
                    update_whole_num_stat(stats.specific.bolt.failed[":all-time"], boltspoutstats,
                                          exstat.component_id, 'Failed')
                    update_whole_num_stat(stats.specific.bolt.executed[":all-time"], boltspoutstats,
                                          exstat.component_id, 'Executed')
                    update_avg_stats(stats.specific.bolt.process_ms_avg["600"], boltspoutstats,
                                     exstat.component_id, 'process_ms_avg')
                    update_avg_stats(stats.specific.bolt.execute_ms_avg["600"], boltspoutstats,
                                     exstat.component_id, 'execute_ms_avg')
                if stats.specific.spout is not None:
                    update_whole_num_stat(stats.specific.spout.acked[":all-time"], boltspoutstats,
                                          exstat.component_id, 'Acked')
                    update_whole_num_stat(stats.specific.spout.failed[":all-time"], boltspoutstats,
                                          exstat.component_id, 'Failed')
                    update_avg_stats(stats.specific.spout.complete_ms_avg[":all-time"], boltspoutstats,
                                     exstat.component_id, 'complete_ms_avg')
            if '__acker' in boltspoutstats:
                del boltspoutstats['__acker']
            for key in boltspoutstats:
                if 'complete_ms_avg' in boltspoutstats[key]:
                    avg = get_avg(boltspoutstats[key]['complete_ms_avg'])
                    boltspoutstats[key]['CompleteLatency'] = avg
                    del boltspoutstats[key]['complete_ms_avg']
                if 'process_ms_avg' in boltspoutstats[key]:
                    avg = get_avg(boltspoutstats[key]['process_ms_avg'])
                    boltspoutstats[key]['ProcessLatency'] = avg
                    del boltspoutstats[key]['process_ms_avg']
                if 'execute_ms_avg' in boltspoutstats[key]:
                    avg = get_avg(boltspoutstats[key]['execute_ms_avg'])
                    boltspoutstats[key]['ExecuteLatency'] = avg
                    del boltspoutstats[key]['execute_ms_avg']

            for key in component_task_count:
                if key in boltspoutstats:
                    boltspoutstats[key]['Tasks'] = component_task_count[key]
            for key in component_exec_count:
                if key in boltspoutstats:
                    boltspoutstats[key]['Executors'] = component_exec_count[key]
        framedtrasp.close()

    except Exception as e:
        clusterinfo = None
        logging.warn(e)


def get_topology_stats(toplogyname):
    global lastinfotime
    if (lastinfotime + 4) < time():
        for t in all_topology_stats:
            all_topology_stats[t] = None
        lastinfotime = time()
        refresh_topology_stats()
    return all_topology_stats[toplogyname]


def metric_init_topology(params):
    global descriptors
    groupname = ''
    if 'topology' in params and len(params['topology']):
        groupname = params['topology']
    else:
        return
    topology = groupname
    topology_mod = re.sub("\s+", "", topology)
    topology_mod = re.sub("[_]+", "", topology_mod)
    toplogy_mods[topology_mod] = topology
    if 'spouts' in params:
        spout_array[topology] = re.split('[,]+', params['spouts'])
        for spout in spout_array[topology]:
            for statname in spout_stats:
                d = {'name': topology_mod + '_' + spout + '_' + statname, 'call_back': callback_boltspout,
                     'time_max': 90,
                     'value_type': spout_stats[statname][2],
                     'units': spout_stats[statname][0],
                     'slope': 'both',
                     'format': spout_stats[statname][1],
                     'description': '',
                     'groups': groupname}
                descriptors.append(d)

    if 'bolts' in params:
        bolt_array[topology] = re.split('[,]+', params['bolts'])
        for bolt in bolt_array[topology]:
            for statname in bolt_stats:
                d = {'name': topology_mod + '_' + bolt + '_' + statname, 'call_back': callback_boltspout,
                     'time_max': 90,
                     'value_type': bolt_stats[statname][2],
                     'units': bolt_stats[statname][0],
                     'slope': 'both',
                     'format': bolt_stats[statname][1],
                     'description': '',
                     'groups': groupname}
                descriptors.append(d)

    for key in overall:
        d = {'name': topology_mod + '_' + key, 'call_back': callback_overall,
             'time_max': 90,
             'value_type': overall[key][2],
             'units': overall[key][0],
             'slope': 'both',
             'format': overall[key][1],
             'description': '',
             'groups': groupname}
        descriptors.append(d)
    logging.info('Inited metric for ' + groupname)


def metric_init(params):
    global topologies, nimbus_host, nimbus_port
    if 'nimbus_host' in params:
        nimbus_host = params['nimbus_host']
    if 'nimbus_port' in params:
        nimbus_port = int(params['nimbus_port'])
    if 'topologies' not in params:
        return
    if 'storm_thrift_gen' in params:
        sys.path.append(params['storm_thrift_gen'])
    else:
        sys.path.append('/usr/lib/ganglia')
    if 'loglevel' in params:
        loglevel = params['loglevel'].strip().upper()
        if loglevel in logging_levels:
            logging.getLogger().setLevel(logging_levels[loglevel])

    global Nimbus, ttypes
    from stormpy.storm import Nimbus, ttypes

    tss = re.split('[,]+', params['topologies'])
    topologies = tss
    alltops = {}
    for t in tss:
        alltops[t] = {'topology': t}
        alltops[t]['tlen'] = len(t)
        t_bolts = t + '_bolts'
        if t_bolts in params:
            alltops[t]['bolts'] = params[t_bolts]
        t_spouts = t + '_spouts'
        if t_spouts in params:
            alltops[t]['spouts'] = params[t_spouts]
    for t in alltops:
        logging.info('Initing metric for ' + t)
        metric_init_topology(alltops[t])
    return descriptors

if __name__ == '__main__':
    params = {'topologies': 'SampleTopology,AnotherTopology',
              'SampleTopology_spouts': 'SampleSpoutTwo', 'SampleTopology_bolts': 'boltc',
              'AnotherTopology_spouts': 'Spout', 'AnotherTopology_bolts': 'bolta,boltb,boltd',
              'loglevel': 'ERROR'}
    metric_init(params)
    for d in descriptors:
        v = d['call_back'](d['name'])
        formt = "%s " + d['format']
        print formt % (d['name'], v)

Saturday, February 14, 2015

Splitting Elasticsearch Index


Elasticsearch doesn't provide a facility for splitting an index. The main reason may be that the Elasticsearch nodes may not be able to hold the intermediate data created while splitting an index. So if we need to split an index, we need to do something like: (a) create the two new indices, (b) re-index the data from the original index into the new indices, adding alternate documents to each of the two new indices.
The problem with the above approach is that most of the time we disable storing the source documents in the index. For example, we may index 5 petabytes of data in an index, but we may not want to store the documents in the index, as that would result in a very large index. So for re-indexing we need to have all the original documents somewhere; we cannot just get all the documents from the original index itself.

But sometimes we may want to split an existing index when it grows very large, because an index that is too big takes a performance hit.

So I came up with the approach below, which worked fine. Hopefully it will be useful for you as well.

Let us assume we have an index "original-index" and we want to split it into "original-index-firsthalf" and "original-index-secondhalf".

Basically we need to follow the steps below.

  • Create an index original-index-firsthalf with the same settings as original-index, and put the same mappings on the new index.
  • Stop adding new docs to original-index-firsthalf and original-index till the splitting is over.
  • Flush original-index.
  • Shut down the Elasticsearch nodes.
  • Copy (scp or something like that) the Lucene indices in the shards from original-index to original-index-firsthalf. We need to copy each shard's index directory from the source to the corresponding shard's index directory of the destination (e.g. original-index/0/index/* is copied to original-index-firsthalf/0/index/*). The same needs to be repeated for all the other shards (and for the replicas as well).
  • Restart the Elasticsearch cluster.
  • Now original-index and original-index-firsthalf contain the same indexed documents and will produce similar search results.
  • Let us assume there are two mappings, mapping1 and mapping2, in the indices for two types type1 and type2. Let us assume there is a field mapping1.date1 and a field mapping2.date2 in the two mappings and they are of "date" type. (We may choose to split on the basis of some other mapping field as well; just for this example I am choosing some date fields.)
  • Let us assume docs in type1 include values for mapping1.date1 in the range start_date to end_date, and for simplicity let us assume docs in type2 also include dates in the same range (from start_date to end_date). Let us assume middle_date is the date which lies almost halfway between start_date and end_date.
  • Delete all the documents in type1 and type2 that match the queries "type1.date1 >= middle_date" and "type2.date2 >= middle_date" respectively from original-index-firsthalf (see the sketch after this list).
  • Delete all the documents in type1 and type2 that match the queries "type1.date1 < middle_date" and "type2.date2 < middle_date" respectively from original-index.
  • Optimize original-index and original-index-firsthalf.
  • Now original-index-firsthalf and original-index each contain almost half the documents of the original index, but they don't share any documents.
  • Maybe we create an alias original-index-secondhalf for original-index, or simply create an original-index-secondhalf index, move the data from original-index into it, and then delete original-index.
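
A minimal Python sketch of the two delete steps and the optimize step above, assuming Elasticsearch 1.x (which has the delete-by-query and optimize APIs) and the placeholder index, type and field names from this post; MIDDLE_DATE stands in for the computed middle_date:

import json
import requests  # pip install requests

ES = 'http://localhost:9200'
MIDDLE_DATE = '2015-01-01'  # stand-in for the computed middle_date

def delete_range(index, doc_type, field, op):
    # ES 1.x delete-by-query: removes every doc of doc_type in index
    # whose date field is >= (op='gte') or < (op='lt') MIDDLE_DATE.
    query = {'query': {'range': {field: {op: MIDDLE_DATE}}}}
    r = requests.delete('%s/%s/%s/_query' % (ES, index, doc_type),
                        data=json.dumps(query))
    r.raise_for_status()

# The second half of the data goes away from the first-half index...
delete_range('original-index-firsthalf', 'type1', 'date1', 'gte')
delete_range('original-index-firsthalf', 'type2', 'date2', 'gte')
# ...and the first half goes away from the original index.
delete_range('original-index', 'type1', 'date1', 'lt')
delete_range('original-index', 'type2', 'date2', 'lt')

# Optimize both indices to reclaim the space of the deleted documents.
for index in ('original-index', 'original-index-firsthalf'):
    requests.post('%s/%s/_optimize?max_num_segments=1' % (ES, index))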


This may be useful when we want to split big indices into smaller indices (with the same number of shards), as we don't have to re-index all the documents again. I could have written a shell script to demonstrate the whole operation, but I don't have time today; I will post one shortly for your benefit :)

Monday, December 29, 2014

A simple source code line counter

We sometimes desperately want a tool that will count the total "real" line count of an open-source project or of the projects we are working on. It should not count blank lines and comments, so that we know how big the code base really is.

The simpler the tool, the better. Here is a simple tool (sourcelines.py, click here) I wrote which does this simple job. Currently it supports C, C++, Java, Scala, Python, PHP, Perl and Go, but you may add other types as well by providing a comment syntax file (explained later).

How do we run the tool? Let us print the help.
$ python sourcelines.py -h
Usage: sourcelines.py [options]

Options:
  -h, --help            show this help message and exit
  -c COMMENT_FILE, --comment-file=COMMENT_FILE
                        comment syntax description file
  -d SOURCE_ROOT_DIR, --root-source-dir=SOURCE_ROOT_DIR
                        root directory for source code

An example run for counting the source code lines for Go 1.4
$ python sourcelines.py -d /home/geet/sws/go
File-type:       Go  Line-count:  473968
File-type:   Python  Line-count:     313
File-type:        C  Line-count:  170744
File-type:      C++  Line-count:       7
File-type:     Perl  Line-count:     929

The tool determines the file type by looking at the file extension and doesn't do any other magic. All files with extension .pl are assumed to be Perl files, all files with extension .java are assumed to be Java files, etc.

Now, the tool doesn't know about Haskell files or how Haskell code is commented. It also doesn't know about Javascript files. So we instruct the tool by providing a JSON file that describes how comments are written in Haskell and Javascript files.

Below is the content of the sample JSON file (let us name it syntax_haskell_js.json):
{  "hs" : {
        "output_as" : "Haskell",
        "other_extns" : [ "haskell" , "hask" ],
        "start" : "{-",
        "end" : "-}",
        "whole_line" : ["--"]
    },   
    "js" : {
        "output_as" : "Javascript",
        "other_extns" : [ "javascript"],
        "start" : "/*",
        "end" : "*/",
        "whole_line" : ["//"]
    }
}

The JSON file describes how Haskell and Javascript files are commented. The top-level keys denote the languages; so hs and js denote Haskell and Javascript. Files with extension .hs are output as "Haskell" files. Files with extensions .haskell and .hask will also be treated as Haskell files.
The "start" tag denotes the start of a comment; for Haskell, it is '{-'.
The "end" tag denotes the end of a comment; for Haskell, it is '-}'.
The "whole_line" tag denotes the marker that makes the rest of the line a comment; for Haskell it is '--', for Javascript it is '//'.

A sample test run output is shown below:
$ python sourcelines.py -d . -c syntax_haskell_js.json
File-type:    Python  Line-count:     173
File-type:   Haskell  Line-count:      49






Sunday, November 30, 2014

Using Zookeeper to coordinate distributed Job watcher

Zookeeper is a tool that can be used for distributed system synchronization, coordination, registry, lock service etc. There are not many open-source alternatives to Zookeeper, and Zookeeper seems to be pretty good at what it does. That is why many open-source projects such as Storm and Kafka use it for service discovery, service registry etc.

In this blog I will explain how we can use Zookeeper as a coordinator for distributed job watchers.

Let us assume there are processes running on different machines which collect and submit small batches of work to some queues. There are job executors which poll for the submitted jobs, execute them, and submit the results back to a queue. Job watchers keep monitoring the job statuses, and once a job completes they collect the results and do something with them.
There may be multiple job submitters, watchers and executors. Many instances of job submitters, watchers or executors may come up and go down. The job watchers and executors do a fair sharing of the jobs to watch and execute. The appearance of new executors or the disappearance of existing executors will trigger re-sharing of the jobs among the executors. Similarly, the appearance or disappearance of watchers will trigger re-sharing of the watches.

All of the above necessitates reliable coordination among the different tasks, which may be executing on different machines. Doing the coordination correctly is hard, as there are too many nuances to address. Fortunately, Zookeeper has addressed all of these, and it is a proven piece of software that has been in use for some years now. Let me explain how we can do the coordination through Zookeeper. (Please refer to the Zookeeper getting started guide for an overview of Zookeeper if you haven't used it already.)

First, a job watcher has to be notified about the appearance and disappearance of other job watchers. Each watcher, while starting up, registers itself under a well-known znode /watchers. So /watchers will have a list of children which are the unique IDs of the watcher processes. A child node gets added when a new watcher starts, and a child node disappears when the corresponding watcher process dies, disconnects or loses its network connection. Each job watcher sets a watch on the /watchers node, and when a watcher process appears or disappears, it gets a notification with the list of currently registered watcher IDs.
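
Here is a minimal sketch of that register-and-watch pattern with kazoo (the full job_watcher class below does the same thing in its constructor):

import uuid
from kazoo.client import KazooClient

zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
zk.ensure_path('/watchers')

myid = uuid.uuid4().hex
# Ephemeral: the znode vanishes automatically when this process dies or
# loses its session, and that is what notifies the other watchers.
zk.create('/watchers/' + myid, ephemeral=True)

def on_watchers_changed(event):
    # Zookeeper watches are one-shot, so re-arm the watch while fetching
    # the current membership list.
    watchers = zk.get_children('/watchers', watch=on_watchers_changed)
    print 'watchers now:', watchers

watchers = zk.get_children('/watchers', watch=on_watchers_changed)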




Also, when a watcher chooses to watch a job, it locks that job to signal others that they shouldn't spend cycles watching a job that is already being watched. It creates a lock with the same name under another znode /watchlocks.
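
kazoo ships a Lock recipe that does exactly this; a minimal sketch (jobid is a placeholder, and the try/except mirrors how the job_watcher code below uses it):

from kazoo.client import KazooClient

zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()

jobid = 'some-job-id'  # placeholder
lock = zk.Lock('/watchlocks/' + jobid)
try:
    # Give up after a second: if another watcher holds the lock,
    # we simply skip this job.
    if lock.acquire(blocking=True, timeout=1):
        pass  # we own the watch on this job now
except Exception as e:
    print 'Lock problem ', e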

The job executors share the jobs to execute. So whenever a new executor comes up, re-sharing of the jobs is triggered. Similarly, whenever an executor disappears, re-sharing of the jobs is triggered again. Each job executor registers itself under the node /executors, and they also put a watch on the znode "/executors" so that they get notifications for changes in the list of executors currently working.

The executors may not know when a job completes, but the watchers do. This is because when the job submitter submits a job, it includes enough information for a watcher to tell when the job is complete. Actually, the executors could also know when a job is complete, but in this example I am assigning just one responsibility to the executors: they execute the tasks and submit the results to some results queue.
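
For illustration, a guessed-at submitter is sketched below. The znode payload format ('<status>=<task count>') and the '<job>_completed' results hash match what the watcher code below reads; the 'pending_tasks' redis list is a made-up name, and the real submitter is in the github repo linked below:

import json
import uuid
from kazoo.client import KazooClient
from redis import ConnectionPool, Redis

SUBMITTED = 'subm'

zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
zk.ensure_path('/jobs')
r = Redis(connection_pool=ConnectionPool(host='localhost', port=6379, db=0))

def submit_job(tasks):
    jobid = uuid.uuid4().hex
    for t in tasks:
        # Executors pop these, run them, and record each finished task in
        # the '<jobid>_completed' hash that the watcher polls.
        r.rpush('pending_tasks', json.dumps({'job': jobid, 'task': t}))
    # The watcher learns the expected result count from the znode data.
    zk.create('/jobs/' + jobid, SUBMITTED + '=' + str(len(tasks)))

submit_job([2, 3, 5, 7])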

Whenever a watcher detects the completion of a job, it collects the results of the completed job, removes the job details from the /jobs znode, and does something with the result. As a node under the /jobs znode is deleted, the watchers again re-share the jobs to watch.


This approach gives us the ability to monitor complex workflows, because there is no reason a watcher cannot submit completed jobs to some other queues which are again processed by the executors. Here I am just giving a very basic example to explain the overall working.

We will use Python for our examples.

We will use the Kazoo Zookeeper client. Install it:
$ sudo pip install kazoo

We will use Redis as the jobs queue and the job results queue, so we need to install Redis.
Download Redis from here.
Extract the downloaded tar archive and build Redis:
$ tar zxf redis-2.8.17.tar.gz
$ cd redis-2.8.17
$ sudo make
$ sudo make install

Now we install the Python Redis clients:
$ sudo pip install redis
$ sudo pip install hiredis  # Needed for better performance of Python redis client

Let us start the Zookeeper server and the Redis server. We will run all of them on the same machine, as this post is for demonstration purposes only.

$ (zookeeper-install-directory)/bin/zkServer.sh start-foreground
$ redis-server



Now let us look at the Python code samples. You can get the complete example at this github link.

I have also pasted the code below:

import sys
from atexit import register
from time import sleep
from random import randint
import logging
import uuid
from redis import ConnectionPool, Redis
from kazoo.client import KazooClient
from math import sqrt
from threading import Thread, Lock, Condition

SUBMITTED = 'subm'
PROCESSED = 'prcd'
ZIPPED = 'zpd'
UPLOADED = 'uploaded'

inited = False

ALLOWED_COMMANDS = ['watcher', 'jobsubmitter', 'jobexecutor']


def state_listener(state):
    print state


def create_path_if_not_exists(zk, path):
    '''
    Create the znode path if it is not existing already
    '''
    if not zk.exists(path):
        try:
            zk.ensure_path(path)
        except Exception as e:
            print e
            return False
    return True


def stop_zk(zkwrapper):
    zkwrapper.stop()


def init_redis():
    '''
    Connect to redis server. For this example, we are running
    Redis on the same machine
    '''
    pool = ConnectionPool(host='localhost', port=6379, db=0)
    r = Redis(connection_pool=pool)
    return r


class zk_wrapper:
    '''
    Callable class wrapping a zookeeper kazooclient object
    '''
    def __init__(self, zk):
        self.zk = zk
        self.state = ''
        register(stop_zk, self)

    def stop(self):
        self.zk.stop()

    def __call__(self, state):
        self.state = state


def init():

    global inited
    zk = None
    try:
        zk = KazooClient(hosts='127.0.0.1:2181')
        zk.add_listener(state_listener)
        zk.start()
        register(stop_zk, zk)
        create_path_if_not_exists(zk, '/jobs')
        create_path_if_not_exists(zk, '/watchers')
        create_path_if_not_exists(zk, '/watchlocks')
        create_path_if_not_exists(zk, '/executors')
    except Exception as e:
        print 'Zk problem ', e
        if zk is not None:
            zk.stop()
        sys.exit(1)

    inited = True
    return zk


class job_watcher:
    def register_myself(self):
        self.zk.create('/watchers/' + self.myid, ephemeral=True)

    def __init__(self):
        self.lock = Lock()
        self.zk = init()
        self.redis = init_redis()
        self.myid = uuid.uuid4().hex
        self.register_myself()
        self.my_jobs = {}
        children = self.zk.get_children('/jobs', watch=self)
        self.alljobs = children
        children = self.zk.get_children('/watchers', watch=self)
        self.watchers = children
        self.myindex = self.watchers.index(self.myid)
        self.num_watchers = len(self.watchers)
        self.lock_my_job_watches()
        self.job_watcher_thread = Thread(target=self, args=[None])
        self.job_watcher_thread.start()

    def unlock_my_jobs(self):
        self.lock.acquire()
        for job, lock in self.my_jobs.items():
            try:
                lock.release()
            except Exception as e:
                print 'Unlocking issue', e
        self.my_jobs.clear()
        self.lock.release()

    def stop_monitoring(self):
        self.stall_monitor = True

    def start_monitoring(self):
        self.stall_monitor = False

    def watch_for_completion(self):
        jobcount = {}
        self.lock.acquire()
        for job in self.my_jobs:
            try:
                vals = self.zk.get('/jobs/' + job)
                stat, count = vals[0].split('=')
                jobcount[job] = {'count': int(count), 'stat': stat}
            except Exception as e:
                print 'Job watch error ', e
                self.lock.release()
                return
        self.lock.release()
        times = 0
        while (not self.stall_monitor) and (times < 4):
            times += 1
            temp = ''
            self.lock.acquire()
            for job in self.my_jobs:
                try:
                    if (job not in jobcount) or jobcount[job]['stat'] != PROCESSED:
                        continue
                    processedcount = self.redis.hlen(job + '_completed')
                    if processedcount == jobcount[job]['count'] or processedcount == 0:
                        self.my_jobs[job].release()
                        self.zk.delete('/watchlocks/' + job)
                        self.redis.delete(job + '_completed')
                        self.zk.delete('/jobs/' + job)
                        print 'Job finished ' + job
                        temp = job
                        break
                except Exception as e:
                    print 'Monitor error ', e
            if temp != '':
                del self.my_jobs[temp]
                sleep(0.4)
            self.lock.release()

    def run(self):
        while True:
            if self.stall_monitor:
                sleep(1)
                continue
            self.watch_for_completion()

    def lock_my_job_watches(self):
        self.stop_monitoring()
        self.unlock_my_jobs()
        self.lock.acquire()
        for child in self.alljobs:
            slot = abs(hash(child)) % self.num_watchers
            if slot != self.myindex:
                continue
            lock = self.zk.Lock('/watchlocks/' + child)
            try:
                if lock.acquire(blocking=True, timeout=1):
                    self.my_jobs[child] = lock
            except Exception as e:
                print 'Lock problem ', e
        self.lock.release()
        if len(self.my_jobs) > 0:
            self.start_monitoring()

    def __call__(self, event):
        if event is None:
            # Invoked as the thread target, not as the zookeeper
            # watch callback: enter the monitoring loop
            self.run()
            return
        if event.path == '/jobs':
            children = self.zk.get_children('/jobs', watch=self)
            self.alljobs = children
        else:
            self.watchers = sorted(self.zk.get_children('/watchers', watch=self))
            self.num_watchers = len(self.watchers)
            self.myindex = self.watchers.index(self.myid)
        self.lock_my_job_watches()


class job_executor:

    def register_myself(self):
        self.zk.create('/executors/' + self.myid, ephemeral=True)

    def __init__(self):
        zk = init()
        self.zk = zk
        self.lock = Lock()
        self.condition = Condition(self.lock)
        self.some_change = 0
        self.redis = init_redis()
        self.myid = uuid.uuid4().hex
        self.register_myself()
        self.my_jobs = {}
        children = zk.get_children('/jobs', watch=self)
        self.alljobs = children
        children = zk.get_children('/executors', watch=self)
        # Sort so that every executor computes the same ordering
        self.executors = sorted(children)
        self.myindex = self.executors.index(self.myid)
        self.num_executors = len(self.executors)
        self.keep_running = True
        self.executor_thread = Thread(target=self, args=[None])
        self.executor_thread.start()

    def execute(self):
        # Sort for a consistent order across all executors, then take
        # every num_executors-th job starting at our own index
        ordered = sorted(self.alljobs)
        self.my_jobs = [job for i, job in enumerate(ordered)
                        if i % self.num_executors == self.myindex]
        self.execute_jobs()

    def execute_jobs(self):
        some_change = self.some_change

        def isprime(number):
            number = abs(number)
            if number <= 1:
                return False
            if number <= 3:
                return True
            if number & 1 == 0:
                return False
            end = int(sqrt(number))
            i = 3
            while i <= end:
                if number % i == 0:
                    return False
                i += 2
            return True

        if some_change != self.some_change:
            return
        # Map each runnable job to its task count so the count can be
        # written back when the job is marked processed
        jobs = {}

        for job in self.my_jobs:
            if some_change != self.some_change:
                return
            try:
                jobval = self.zk.get('/jobs/' + job)
                stat, counts = jobval[0].split('=')
                if stat == SUBMITTED:
                    jobs[job] = counts
            except Exception as e:
                print 'Problem happened ', e

        while len(jobs) > 0:
            for job in list(jobs):
                if some_change != self.some_change:
                    return
                try:
                    val = self.redis.lrange(job, 0, 0)
                    if val is None or len(val) == 0:
                        # Queue drained: mark the job processed, keeping
                        # its original task count in the znode data
                        self.zk.set('/jobs/' + job, PROCESSED + '=' + jobs[job])
                        del jobs[job]
                        break
                    ival = int(val[0])
                    if self.redis.hmset(job + '_completed', {ival: isprime(ival)}):
                        self.redis.lpop(job)
                except Exception as e:
                    print 'Some problem ', e
                    sys.exit(1)

    def run(self):
        while self.keep_running:
            self.execute()
            self.condition.acquire()
            self.condition.wait(1.0)
            self.condition.release()

    def __call__(self, event):
        if event is None:
            # Invoked as the thread target, not as the zookeeper
            # watch callback: enter the execution loop
            self.run()
            return
        if event.path == '/jobs':
            children = self.zk.get_children('/jobs', watch=self)
            self.alljobs = children
        else:
            self.executors = sorted(self.zk.get_children('/executors', watch=self))
            self.num_executors = len(self.executors)
            self.myindex = self.executors.index(self.myid)
        self.some_change += 1
        self.condition.acquire()
        self.condition.notify()
        self.condition.release()


def job_submitter_main():
    try:
        zk = init()
        cpool = ConnectionPool(host='localhost', port=6379, db=0)
        r = Redis(connection_pool=cpool)
        added = 0
        tried = 0
        max_add_try = 5000
        jobname = uuid.uuid4().hex
        added_nums = set()

        while tried < max_add_try:
            value = randint(5000, 90000000)
            tried += 1
            if value not in added_nums:
                added_nums.add(value)
            else:
                continue

            while True:
                try:
                    r.lpush(jobname, value)
                    added += 1
                    break
                except Exception as e:
                    sleep(1)
                    print "Lpush ", jobname, e

        # Reuse the zookeeper connection already created by init()
        value = SUBMITTED + "=" + str(added)
        zk.create('/jobs/' + jobname, value=value)

    except Exception as e:
        print 'Big problem in submitting job ', e
        sys.exit(1)
    print 'Job submitted ' + jobname


def watcher_main():
    job_watcher()


def job_executor_main():
    job_executor()


if __name__ == '__main__':
    if len(sys.argv) < 2:
        print 'Usage: ' + sys.argv[0] + ' command'
        print 'Valid commands are : ' + ', '.join(ALLOWED_COMMANDS)
        sys.exit(1)
    logging.basicConfig()
    if sys.argv[1] not in ALLOWED_COMMANDS:
        print sys.argv[1] + ' not a valid command'
        sys.exit(1)
    if sys.argv[1] == 'watcher':
        watcher_main()
        sleep(86400)
    elif sys.argv[1] == 'jobsubmitter':
        job_submitter_main()
        sleep(2)
    elif sys.argv[1] == 'jobexecutor':
        job_executor_main()
        sleep(86400)

The tasks here are just numbers, and the executors check whether the numbers are prime. As I said before, this is only for demonstration purposes, so the tasks are the simplest possible; in real life we don't need a distributed environment to check whether numbers are prime. The tasks (i.e. the numbers) are submitted to a queue in Redis in small batches. For the queue we simply use Redis lists, so a job is submitted as a list of numbers pushed to a Redis list.

job_submitter_main is the function that submits a job. It pushes the list of numbers to Redis and also creates a job-description znode under the /jobs node in Zookeeper. The znode name and the name of the list created in Redis are the same. The znode created for a job holds the data "subm=count", where count is the number of tasks in the job (i.e. the length of the corresponding list in Redis; let us call this the job size).
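As an illustration, a minimal sketch of inspecting this state right after a submission (the job name '1a2b3c' is made up; real names are uuid4 hex strings):

from kazoo.client import KazooClient
from redis import Redis

r = Redis()
zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
print r.llen('1a2b3c')           # the job size: how many numbers were pushed
print zk.get('/jobs/1a2b3c')[0]  # 'subm=<job size>' right after submission
zk.stop()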

job_watcher is a callable class which watches the /watchers znode and also the /jobs znode. All the job_watchers get a notification when a new job description is created under the /jobs znode. The watchers share the jobs to watch among themselves using the below scheme:
Let us say there are N watchers and J is the sorted list of incomplete jobs. Every watcher has the list of currently registered watchers; each sorts that list and finds its own position in it. The watcher at position 0 watches the jobs at indexes 0, N, 2N, 3N, ... of J, the watcher at position 1 watches the jobs at indexes 1, N+1, 2N+1, 3N+1, ..., and in general the watcher at position n watches the jobs at indexes n, N+n, 2N+n, 3N+n, and so on. (In the code above, the watchers actually pick a job when abs(hash(jobname)) % N equals their position, while the executors use the index-striping form; both give each worker a disjoint share of the jobs.)
The watcher processes create a job_watcher object and start watching their share of the jobs for completion, as sketched below.
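A minimal sketch of the index-striping form of this sharing (the function name is mine, for illustration only):

def my_share(all_jobs, num_workers, my_index):
    # Every worker sorts the same job list, then takes the jobs whose
    # index modulo the worker count equals its own position
    ordered = sorted(all_jobs)
    return [job for i, job in enumerate(ordered)
            if i % num_workers == my_index]

# With 3 workers, worker 1 gets the jobs at indexes 1, 4, ...
print my_share(['j0', 'j1', 'j2', 'j3', 'j4', 'j5', 'j6'], 3, 1)
# ['j1', 'j4']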

job_executor is the callable class which executes the jobs. The executor processes create instances of the job_executor class and start executing the jobs. The executors share the jobs for execution using a similar algorithm to the watchers'. An executor completes a task and writes the result into a hash in Redis named <jobname>_completed. Each key in the hash is a number, and the value is the boolean result of the primality check (the isprime function above). Once a task is executed (i.e. the number is checked), the executor records the result in the hash and then removes the number from the job queue (the Redis list).
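For example, a sketch of reading a job's results back (hypothetical job name again; redis-py returns the stored booleans as the strings 'True'/'False'):

from redis import Redis

r = Redis()
results = r.hgetall('1a2b3c_completed')
for number, result in results.items():
    # result is 'True' if the number is prime, 'False' otherwise
    print number, result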

The job watchers keep checking the size of the completed hashes, and once the size becomes equal to the job size, the watcher assumes the job is complete. It then simply deletes the job znode from Zookeeper and the <jobname>_completed hash from Redis.
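The completion test itself boils down to this comparison (a sketch; zk and r are the kazoo and redis clients from earlier):

def job_is_complete(zk, r, job):
    # The job size is the count stored in the znode data 'stat=count'
    jobsize = int(zk.get('/jobs/' + job)[0].split('=')[1])
    # Done when the completed hash has one entry per submitted task
    return r.hlen(job + '_completed') == jobsize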

We can run the different components as follows (save the above script in a file named distjob.py):
To submit jobs:
$ python distjob.py jobsubmitter
We can submit any number of jobs and we don't really care if the job executors or watchers are running or not.

To run the watchers:
$ python distjob.py watcher
We can run any number of watchers and we don't care how many job executors or job submitters are running.

To run the job executors:
$ python distjob.py jobexecutor
We can run any number of job executors and we don't really care how many job submitters or watchers are running.

In this example, to demonstrate distributed job co-ordination using Zookeeper, we run all the watchers, submitters and executors, along with the Redis server and the Zookeeper server, on the same node. But that need not be so: we can easily turn this into a truly distributed system. We just need to replace "localhost" with the list of distributed Zookeeper servers, and likewise use the remote Redis server's host or IP where the example script above uses "localhost".
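For instance, with hypothetical hostnames, only the connection setup would change:

from kazoo.client import KazooClient
from redis import ConnectionPool, Redis

# kazoo accepts a comma-separated list of ensemble members
zk = KazooClient(hosts='zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181')
pool = ConnectionPool(host='redis.example.com', port=6379, db=0)
r = Redis(connection_pool=pool)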