Monday, August 1, 2016

A CountdownLatch for C++ multi-threaded programs

The Java standard library comes with CountDownLatch, a synchronization aid that allows one or more threads to wait until a set of operations being performed by other threads completes.

For example, let one thread put a string into one or two queues and wait for the completion of two operations:
  1. Calculating the hashcode of the string
  2. Finding the count of unique chars in the string
The task submitter may wait on the countdownlatch for the operations to be completed. The executor threads count down the countdownlatch once they complete their operations, and the waiters get notified once the count of the countdownlatch reaches zero. In this example, the waiter thread will create a countdownlatch object with count 2 and somehow pass a reference to the countdownlatch object to the executor threads (perhaps by putting the string and a pointer to the countdownlatch object in the task object).

We may achieve similar functionality using POSIX thread mutexes and condition variables (threading libraries other than pthreads will work as well). But as a CountDownLatch is a useful and frequently used pattern, it is nice to have a library for it.
I implemented a CountDownLatch class for C++ in less than 100 lines of code which provides similar functionality to its Java counterpart. The code is available on GitHub (countdownlatch). It may be useful for you as well :)

Below is a sample program demonstrating the use of countdownlatch:

#include <unistd.h>
#include <thread>
#include <vector>
#include <iostream>
#include <countdownlatch.hpp>

void fun(clatch::countdownlatch *cl) {
    cl->await();
    std::cout << "Wait is over " << std::endl;
}

int main() {
    auto cl = new clatch::countdownlatch(10);
    int i = 0;
    std::vector<std::thread*> ts;
    while (i++ < 2) {
        std::thread *t  = new std::thread(fun, cl);
        ts.push_back(t);
    }

    i = 0;
    while (i++ < 10) {
        sleep(1);
        cl->count_down();
    }
    i = 0;
    while (i < 2) {
        ts[i++]->join();
    }
    i = 0;
    while (i < 2) {
        delete ts[i++];
    }
    delete cl;
    return 0;
}
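
Using the same await()/count_down() API, the hash and unique-character scenario described at the beginning of this post could look like the sketch below. It is only an illustration: it assumes the latch can also be used as a stack object, and the worker logic is kept trivial.

#include <functional>
#include <iostream>
#include <string>
#include <thread>
#include <unordered_set>
#include <countdownlatch.hpp>

int main() {
    std::string input = "hello countdownlatch";
    clatch::countdownlatch cl(2);  // the submitter waits for two operations

    std::thread hasher([&cl, &input] {
        // Operation 1: calculate the hashcode of the string
        std::cout << "hashcode: " << std::hash<std::string>()(input) << std::endl;
        cl.count_down();
    });
    std::thread uniq([&cl, &input] {
        // Operation 2: count the unique chars in the string
        std::unordered_set<char> chars(input.begin(), input.end());
        std::cout << "unique chars: " << chars.size() << std::endl;
        cl.count_down();
    });

    cl.await();  // returns only after both workers have counted down
    std::cout << "Both operations completed" << std::endl;
    hasher.join();
    uniq.join();
    return 0;
}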

 

Sunday, July 31, 2016

Cyclicbarrier for C++ multi-threaded programs

Java has very good concurrency support and a very good library providing useful classes for multi-threaded programming. One such class is CyclicBarrier.
From Javadoc, CyclicBarrier is:
A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released. 

The C++ standard library lacks this utility. This is not a big problem, as the pthread_barrier routines (pthread_barrier_init, pthread_barrier_wait, etc.) do exactly the same thing, and we may always use the pthread library for multi-threaded programming in C++; pthreads also give much more control over how we use the threads. But C++11 has multi-threading built into the language (on the Linux platform g++ internally uses pthreads anyway), so it would have been nice if a CyclicBarrier-like construct were part of the C++ library itself.
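
For reference, here is a minimal sketch of the same barrier pattern using the POSIX routines directly (compile with -pthread; error handling omitted):

#include <pthread.h>
#include <cstdio>

static pthread_barrier_t barrier;

void *worker(void *arg) {
    long id = (long) arg;
    std::printf("Thread %ld finished phase 1\n", id);
    // Blocks until all four threads have reached the barrier
    pthread_barrier_wait(&barrier);
    std::printf("Thread %ld started phase 2\n", id);
    return nullptr;
}

int main() {
    const int nthreads = 4;
    pthread_t threads[nthreads];
    // The barrier releases the waiters once nthreads threads call
    // pthread_barrier_wait; like CyclicBarrier, it can be reused afterwards.
    pthread_barrier_init(&barrier, nullptr, nthreads);
    for (long i = 0; i < nthreads; ++i) {
        pthread_create(&threads[i], nullptr, worker, (void *) i);
    }
    for (int i = 0; i < nthreads; ++i) {
        pthread_join(threads[i], nullptr);
    }
    pthread_barrier_destroy(&barrier);
    return 0;
}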

So, I spent some time building a library that provides similar functionality to the Java CyclicBarrier class; it is accessible at github. There is an example program demonstrating how to use it. Maybe you will like to use it.

Wednesday, July 27, 2016

Bitset data structure in Golang

I have created a bitset library for Go where each bit can be treated as a boolean value. The library is accessible here. It's a thread-safe library (or rather a goroutine-safe library) and can be happily used from multiple goroutines concurrently. It has many operations like get, set, xor, and, clone, etc. The API documentation for the library is accessible here.

I have created this library as I need similar functionality in the distributed cache (geetcache) project I am currently working on. It helps me to serialize and deserialize the hyperloglog data structures in a compact manner. An example usage is given below:


package main

import (
 "fmt"
 "github.com/nipuntalukdar/bitset"
)

func main() {
 bs := bitset.NewBitset(8)
 if bs.IsAllZero() {
  fmt.Printf("All bits are set to zero\n")
 }
 bs.Flip(60)
 fmt.Printf("Number of ones %d\n", bs.GetSetbitCount())
 fmt.Printf("Number of zeros %d\n", bs.GetZerobitCount())
 bs = bitset.NewBitset(10000)
 fmt.Printf("Number of ones %d\n", bs.GetSetbitCount())
 fmt.Printf("Number of zeros %d\n", bs.GetZerobitCount())
}

A more detailed example is given below:


package main

import (
 "fmt"
 "github.com/nipuntalukdar/bitset"
)

func main() {
 bs := bitset.NewBitset(80)
 if bs.IsAllZero() {
  fmt.Printf("All bits are set to zero\n")
 }
 bs.Flip(60)
 fmt.Printf("Number of ones %d\n", bs.GetSetbitCount())
 fmt.Printf("Number of zeros %d\n", bs.GetZerobitCount())
 bs.SetBit(9)
 ret, err := bs.IsSet(9)
 if err != nil || !ret {
  fmt.Printf("Some issue\n")
 }

 bs.SetVal(1, 10, 511)
 recvd, err := bs.GetByte(0)
 if err == nil {
  fmt.Printf("First byte in the underlying array %d\n", recvd)
 }

 // set bytes 6,7,8 to zero and don't touch other bits
 bs.SetVal(6, 8, 0)

 // Clear all the bits, or make them all zero
 bs.ClearAll()

 // Set all the bits to 1
 bs.SetAll()

 // Flip 10th bit
 bs.Flip(10)
 recvd, _ = bs.GetByte(10)
 fmt.Printf("The 10 th byte %d\n", recvd)

 // Get a copy of underlying bytes of the bitset
 bytes := bs.GetBytes()
 fmt.Printf("The first four bytes in the bitset %v\n", bytes[0:4])

 // Flip all the bits
 bs.FlipRange(1, 639)

 // Set the bits 20,21,....,39,40 to 1
 bs.SetRange(20, 40)

 // Set the bits from 20,21,22,....,79,80 to zero
 bs.ClearRange(20, 80)

 other1 := bitset.NewBitset(4)
 other1.SetAll()
 bs.Xor(other1)

 other2 := bitset.NewBitset(10)
 other2.SetRange(40, 60)
 bs.And(other2)
 bs.ResetBit(80)

 // Get next zero bit position after index 0
 indx, _ := bs.GetNextZeroBit(0)
 fmt.Printf("Index of next zero bit %d\n", indx)

 // Get the zero bit before bit 80
 indx, _ = bs.GetPrevSetBit(80)

 // Get the bits from 10 to 28 packed in an uint32
 retuint32, _ := bs.GetVal(10, 28)
 fmt.Printf("The packed uint32 returned is %d\n", retuint32)
}



Sunday, May 29, 2016

Things that I miss in Golang

Go is a nice language and I feel it sits somewhere between C and C++. It has good libraries. Its very good concurrency support makes creating highly concurrent applications much easier. Goroutines and channels are great and they help in creating simple and efficient programs.

But Go still lacks many facilities that C++, Java and Python provide. Below is the list of features I wish Go had included:

  1. Default parameters for functions or methods: Go doesn't have this feature, which makes things awkward sometimes. Let us say I am writing a pipeline in Go (trying to emulate Storm in Go). Each stage in the pipeline may emit a tuple by calling the method collector.Emit(tuple). Now I wish to add the feature of tracking a tuple tree, and I want to signal that tracking is needed. I would have loved to do that using the same method, something like collector.Emit(tuple, context), where context may be some struct containing the tracking information. Let us say we could have declared the method as shown below: 

    func (collector *Collector) Emit(t Tuple, context *Context = nil) {
        ....
      
    }

    We can do this in C++ and Python, and in Java method overloading is available. I wish we could do the same in Golang as well. It is not something we cannot work around (a sketch of one workaround using a variadic parameter is given after this list). But suppose I designed the Emit method without the tracking stuff in mind and later wanted to add the tracking part; default parameters would help, as I would not need to change the existing application code that uses the pipeline library.
  2. No great debugger bundled with the Go distribution I download from Google: Gccgo has integration with GDB, but I don't use gccgo. Having a good debugger is a great help for lesser mortals like us. Java/C++/Python/C don't have this problem.
  3. Lack of inheritance: This hampers our ability to re-use code. Most probably this is because Go is not a purely object-oriented language and wanted to have only minimal support for the OO paradigm.
  4. Weaker type safety: In Go programs, we tend to pass interface{} as a parameter too often, but that is like void * in C/C++ or Object in Java. If we had inheritance in Go, and if we could make a base "class" pointer/reference point/refer to a derived class object, things would have been much easier.
  5. No generics: We end up writing the same code for different types, which could have been avoided if we had generics.
  6. No exception handling: I have used Java/C++ a lot. Lack of exception handling means we have to check the return status of functions everywhere.
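
Here is the workaround sketch promised in item 1: an optional parameter emulated with a variadic argument. The Collector, Tuple and Context types below are hypothetical stand-ins for the pipeline library, not real code from it.

package main

import "fmt"

// Hypothetical types standing in for the pipeline library discussed above.
type Tuple struct{ Values []interface{} }
type Context struct{ TrackingID string }
type Collector struct{}

// Emit accepts an optional Context through a variadic parameter, so call
// sites written before tracking existed keep compiling unchanged.
func (c *Collector) Emit(t Tuple, ctx ...*Context) {
	if len(ctx) > 0 && ctx[0] != nil {
		fmt.Println("emitting with tracking:", ctx[0].TrackingID)
		return
	}
	fmt.Println("emitting without tracking")
}

func main() {
	c := &Collector{}
	c.Emit(Tuple{Values: []interface{}{"hello"}})                              // old call site
	c.Emit(Tuple{Values: []interface{}{"hello"}}, &Context{TrackingID: "t-1"}) // with tracking
}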

Wednesday, March 9, 2016

Design of a Scalable and Reliable pipleline with Storm and Kafka

Apache Storm is a real-time event processing system. We may even call it real-time Hadoop. Unlike Hadoop map-reduce jobs, Storm topologies keep running continuously, looking for events to process. A Storm topology defines how the events move through the execution pipeline, which is expressed as a directed acyclic graph. A topology has hooks to work on the data; the hooks are called spouts and bolts. Generally, the spout is the event "generator" and the entry point to the topology. A spout may get the events from anything, but it is common to use Kafka, Kestrel, MongoDB, etc. as the event source. The hooks are the entry points for our code, and the routing of tuples (data) is controlled by the Storm framework. When a spout or bolt emits a tuple, the tuple is sent to the next bolt in the pipeline. Each spout or bolt runs a set of tasks doing exactly the same work. The tasks may run in separate threads, or a set of threads may be multiplexed among the tasks.


A typical topology looks like the one shown below:

The spout is the entry point to the topology. There may be multiple spouts in a topology and there may be multiple data sources. In the above example, a spout is emitting data to bolt 1 and bolt 2, and bolt 2 is further emitting data to bolt 3. Each of the spouts and bolts may run multiple tasks (possibly separate threads), and the tasks may run on the same or different JVMs or across machines. The Storm framework takes care of routing the data (tuples) among the various components.

A Storm cluster has three different components: Nimbus, Supervisors and the UI. A Storm cluster has just one Nimbus node, and as of now it is a single point of failure. When a topology is submitted, the topology jar along with its configs is sent to Nimbus (using the Thrift protocol). Nimbus decides which Supervisors will run the topology, and the topology jar is submitted to those Supervisor machines as well. (For testing purposes we may run Nimbus, Supervisors and the UI all on the same machine.)
Each Supervisor exposes a few slots. Each slot can be occupied by a topology worker, where a worker is a JVM process. When we create the topology, we can specify the number of workers the topology should be run with, and Storm tries its best to provide that number of workers, provided that many slots are available in the cluster.

Another essential component in the cluster is Zookeeper. Storm uses Zookeeper for coordination among the Nimbus, Supervisor and UI components.


Now enough of theory. Let us make a topology and run it in a Storm cluster.

Our data source is Kafka, which is a reliable and highly available message queue. It is simple and performs really well.

Now let us assume a simple scenario where we collect details of people for doing some analysis. As this is a demo, I will just include the very basic information about a person: name, age and id. The information is a JSON document and a sample is given below:

{    
    "id" : "udhdgcbcdg",
    "name" : "Geet",
    "age" : 8

}


The document is added to a MongoDB collection named person.
Now let us create a Kafka topic "demo". I assume you have set up a Zookeeper and Kafka cluster. A Kafka cluster with just one node is enough for the demo, and similarly one Zookeeper node is enough. We don't need high availability to run a demo :)

cd to the Kafka install directory and issue the below command, passing your Zookeeper host and port to --zookeeper:
$ bin/kafka-topics.sh --create --topic demo --partitions 16 --replication-factor 1 --zookeeper <zookeeper_host:port>

If you are running Zookeeper and Kafka locally, use 127.0.0.1:2181 for the host and port:
$ bin/kafka-topics.sh --create --topic demo --partitions 16 --replication-factor 1 --zookeeper 127.0.0.1:2181

Now the topic "demo" is created with 16 partitions and no replication. So, no high availability!

Let us download Storm 0.9.6 and extract the package. I modified the Storm config storm.yaml as shown below:
# Storm.yaml ###########
storm.zookeeper.servers:
    - "127.0.0.1"
nimbus.host: "127.0.0.1"
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
###################

Note that I am running Nimbus and Zookeeper on my localhost.


Change directory to the Storm installation directory (delete the jar log4j-over-slf4j-1.6.6.jar from the Storm lib directory, otherwise we may face some conflicts in running our topology) and issue the below commands:
$ bin/storm nimbus  #Starts nimbus
$ bin/storm supervisor #Starts Supervisor
$ bin/storm ui #Starts the UI

Now we may check the Storm UI by pointing our browser to http://127.0.0.1:8080

The sample code for the topology prototype is here. Please download the code so that we may run the topology.
After you have downloaded the code, build the topology jar by issuing the below commands:

$cd  datainfralytics/pipeline/proto
$ mvn package

This creates a jar for our topology (proto-1.0-SNAPSHOT-jar-with-dependencies.jar in the directory datainfralytics/pipeline/proto/target).
We submit the jar by issuing the below command:
$ $STORM_HOME/bin/storm jar proto-1.0-SNAPSHOT-jar-with-dependencies.jar geet.org.topolgy.StartTopology topology.properties

The STORM_HOME environment variable should point to the Storm installation directory.
The topology.properties file contains configuration details such as the Elasticsearch cluster name, the mongos host IP, etc. A sample is given below:

#### topology.properties####
es.cluster=demo
es.host=127.0.0.1
es.port=9300
mongos.host=127.0.0.1
mongos.port=27017
###################

Now our topology will be running and we can inspect the details from the Storm UI.

The topology is consuming events from Kafka. These events are nothing but JSON documents containing information about persons.  A producer process writes the person details to Kafka and our topology consumes the events from Kafka. A very simple Python producer script is given below:

from time import time, sleep
from random import randint
from json import dumps
from uuid import uuid1
from kafka import KafkaClient, KeyedProducer
 
kafka_brokers = '127.0.0.1:9092' # Comma separated list of brokers
topic = 'demo' 
kclient = KafkaClient(kafka_brokers)
producer = KeyedProducer(kclient)

try:
    i = 1
    namein = 'name_{}'.format(int(time()))
    while i <= 10000:
        name = '{}_{}'.format(namein, i)
        person = {'id' : uuid1().hex , 'name' : name, 'age' : randint(1,100)}
        producer.send(topic, name, dumps(person))
        i += 1
        print i
except Exception as e:
    print e
    exit(1)


Run the script and it will put 10000 person documents into the Kafka topic demo. The topology will almost instantly consume the person JSON documents, put them into MongoDB and index them in Elasticsearch.

Now let us look at the topology and understand how it can be scalable and highly available. The topology is defined in the following Java class:

package geet.org.topolgy;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
import java.util.Set;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import geet.org.dbbolt.MongoBolt;
import geet.org.indexbolt.EsBolt;
import geet.org.spout.DataSpout;

public class StartTopology {
 private static final String TOPOLOGY = "DemoTopolgy";

 public static void main(String[] args) {
  if (args.length < 1) {
   System.err.println("Please provide the proprty file name");
   System.exit(1);
  }
  TopologyBuilder tbuilder = new TopologyBuilder();
  tbuilder.setSpout("dataspout", new DataSpout(), 16);
  Fields groupingField = new Fields("person");
  tbuilder.setBolt("boltdb", new MongoBolt(), 16).fieldsGrouping("dataspout", groupingField);
  tbuilder.setBolt("boltindex", new EsBolt(), 32).fieldsGrouping("boltdb", groupingField);
  Config conf = new Config();
  conf.setMaxTaskParallelism(128);
  conf.setNumWorkers(2);
  conf.setNumAckers(2);
  conf.setDebug(false);
  conf.put(Config.TOPOLOGY_DEBUG, false);
  conf.setMaxSpoutPending(64);
  conf.setMessageTimeoutSecs(300);

  Properties props = new Properties();
  try {
   FileInputStream inf = new FileInputStream(args[0]);
   props.load(inf);
   Set<Object> propnames = props.keySet();
   for (Object x : propnames) {
    conf.put((String) x, props.get(x));
   }
   inf.close();
   props.list(System.out);
  } catch (IOException e) {
   e.printStackTrace();
   System.exit(1);
  }
  try {
   StormSubmitter.submitTopology(TOPOLOGY, conf, tbuilder.createTopology());
  } catch (AlreadyAliveException | InvalidTopologyException e) {
   e.printStackTrace();
   System.exit(1);
  }
 }
}


In the main method, the tbuilder.setSpout("dataspout", new DataSpout(), 16) call specifies a spout (labelled "dataspout") which will be run with 16 tasks. It is implemented by the DataSpout class.
The first setBolt call specifies the DB updater bolt (labelled "boltdb") which will be run with 16 tasks. It takes input from "dataspout", i.e. the tuples emitted by "dataspout" are directed to this bolt. It is implemented by the MongoBolt class.
The second setBolt call specifies the index updater bolt (labelled "boltindex") which is run with 32 tasks. It takes input from "boltdb" and updates the index. It is implemented by the EsBolt class.

So, data flows from Kafka to the spout (dataspout), from the spout to the DB updater bolt (boltdb), and from the DB updater bolt to the index updater bolt (boltindex).

Now, how is it scalable?
It is scalable because we are using scalable technologies such as Kafka, MongoDB and Elasticsearch :)
But what about the pipeline we have just built? Is it also scalable, is it also reliable?
The pipeline itself is scalable, as we may increase the number of tasks for each stage. We may also increase the number of workers executing the topology, and the number of machines running the workers. So, scaling is not a problem here.
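
The parallelism can even be changed while the topology is running, using Storm's rebalance command. The line below is only a sketch using the topology and component names from the code above; note that the number of executors for a component can be raised only up to its fixed task count.

$ $STORM_HOME/bin/storm rebalance DemoTopolgy -n 4 -e boltindex=16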
 

Order of updates:
Sometimes the order in which updates are applied across storage systems is very important. For example, there may be an event which specifies an update of a record, and a subsequent event which specifies the delete of the same record. If the events are applied out of sequence, the deleted record may actually reappear in our storage systems; in such a case our customers will scold us and may even switch to our competitor's product. That is why we should make sure that events are applied in order, which is achievable by adding the events to the queue (Kafka) in order. Each event may be associated with a timestamp, and we should ensure that events are added to the queue in the order of that timestamp.
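
One way to get that ordering in practice is sketched below. It reuses the KeyedProducer API from the producer script above and keys every event by the record id, so all events for the same record land in the same Kafka partition and stay in order; the publish helper and the event layout are just illustrative, not part of the topology code.

from json import dumps
from time import time
from kafka import KafkaClient, KeyedProducer

kclient = KafkaClient('127.0.0.1:9092')
producer = KeyedProducer(kclient)

def publish(event_type, record):
    # Attach a timestamp so consumers can detect out-of-order application,
    # and key by the record id: Kafka preserves order only within a partition,
    # so all events for a record must go to the same partition.
    evt = {'type': event_type, 'ts': time(), 'record': record}
    producer.send('demo', record['id'], dumps(evt))

publish('update', {'id': 'udhdgcbcdg', 'name': 'Geet', 'age': 9})
publish('delete', {'id': 'udhdgcbcdg'})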

How to keep the storage systems in Sync:
Maintaining the order of processing:
Error handling:

Wednesday, May 27, 2015

An Eventbus for Python

Sometimes we need publish-subscribe kind of communication between components running in the same process. It is even better if the components don't need to know each other, as that simplifies the design of complex and highly-concurrent components.

We may design similar systems using queues as well. But even then, the coupling between components is not completely eliminated, and we end up spawning threads and writing lots of complex logic ourselves. Moreover, the pattern gets repeated across many pieces, ultimately making it a nightmare to weave the components together in a simple way.

So, here is a Python eventbus library, geeteventbus, which addresses exactly these problems.

The API-related documentation is available here.

geeteventbus is inspired by the Java Guava EventBus library. But it is not exactly similar to the Guava EventBus.


To install the library, issue the below command:

$ sudo pip install geeteventbus

This is a platform-independent package and so may be used on any OS where Python can run.


The diagram below explains the architecture of the event-bus.



There are three types of components here. The publishers post events to the event-bus. The event-bus takes care of delivering the events to the appropriate subscribers. The subscribers register themselves with the event-bus for certain topics. Each event is associated with a topic, data and an optional ordering field. The topic of the event decides which subscribers the event-bus will deliver it to.

The event-bus can be synchronous or asynchronous. 

  • A synchronous event-bus delivers the events from the same threads they were posted from.
  • An asynchronous event-bus delivers the events to the subscribers from different threads. Basically, here the events are delivered from some of the executor threads run by the event-bus internally.
  • While creating the event-bus, we may declare the future subscribers to be thread-safe. In that case, the subscriber invocations won't be synchronized.
  • In some cases, we may want the events to be processed by the subscribers in the same order as they were posted. To enforce that, event objects are created with a special ordering field. E.g. event('atopic', 'somedata for this event', 'an-ordering-key'). All the events with the ordering key "an-ordering-key" will be processed in the same order they were posted to the event-bus.
  • Multiple event-bus can be created in the same process if needed.



Basic working

  1. We create an eventbus
    from geeteventbus.eventbus import eventbus
    eb = eventbus()
    This will create an eventbus with the defaults. The default eventbus will have below characteristics:
    1. the maximum queued event limit is set to 10000
    2. number of executor thread is 8
    3. the subscribers will be called asynchronously
    4. subscribers are treated as thread-safe and hence the same subscriber may be invoked simultaneously on different threads
  2. Create a subclass of subscriber and override the process method. Create an object of this class and register it with the eventbus for receiving messages with certain topics:
    from geeteventbus.subscriber import subscriber
    from geeteventbus.eventbus import eventbus
    from geeteventbus.event import event
    
    class mysubscriber(subscriber):
        def process(self, eventobj):
            if not isinstance(eventobj, event):
                print('Invalid object type is passed.')
                return
            topic = eventobj.get_topic()
            data = eventobj.get_data()
            print('Processing event with TOPIC: %s, DATA: %s' % (topic, data))
    
    subscr = mysubscriber()
    eb.register_consumer(subscr, 'an_important_topic')
  3. Post some events to the eventbus with the topic "an_important_topic".
    from geeteventbus.event import event
    
    eobj1 = event('an_important_topic', 'This is some data for the event 1')
    eobj2 = event('an_important_topic', 'This is some data for the event 2')
    eobj3 = event('an_important_topic', 'This is some data for the event 3')
    eobj4 = event('an_important_topic', 'This is some data for the event 4')
    
    eb.post(eobj1)
    eb.post(eobj2)
    eb.post(eobj3)
    eb.post(eobj4)
  4. We may gracefully shut down the eventbus before exiting the process:
    eb.shutdown()
The complete example is below:
from time import sleep
from geeteventbus.subscriber import subscriber
from geeteventbus.eventbus import eventbus
from geeteventbus.event import event

class mysubscriber(subscriber):
    def process(self, eventobj):
        if not isinstance(eventobj, event):
            print('Invalid object type is passed.')
            return
        topic = eventobj.get_topic()
        data = eventobj.get_data()
        print('Processing event with TOPIC: %s, DATA: %s' % (topic, data))


eb = eventbus()
subscr = mysubscriber()
eb.register_consumer(subscr, 'an_important_topic')


eobj1 = event('an_important_topic', 'This is some data for the event 1')
eobj2 = event('an_important_topic', 'This is some data for the event 2')
eobj3 = event('an_important_topic', 'This is some data for the event 3')
eobj4 = event('an_important_topic', 'This is some data for the event 4')

eb.post(eobj1)
eb.post(eobj2)
eb.post(eobj3)
eb.post(eobj4)

eb.shutdown()
sleep(2)
A more detailed example is given below. A subscriber (counter_aggregator) aggregates the values for a set of counters. It registers itself with an eventbus for receiving events for the counters (topics). A set of producers update the values for the counters and post events describing the counters to the eventbus:
from threading import Lock, Thread
from time import sleep, time
from geeteventbus.eventbus import eventbus
from geeteventbus.event import event
from geeteventbus.subscriber import subscriber
from random import randint


class counter_aggregator(subscriber, Thread):
    '''
    Aggregator for a set of counters. Multiple threads updates the counts which
    are aggregated by this class and output the aggregated value periodically.
    '''
    def __init__(self, counter_names):
        Thread.__init__(self)
        self.counter_names = counter_names
        self.locks = {}
        self.counts = {}
        self.keep_running = True
        self.collect_times = {}
        for counter in counter_names:
            self.locks[counter] = Lock()
            self.counts[counter] = 0
            self.collect_times[counter] = time()

    def process(self, eobj):
        '''
        Process method calls with the event object eobj. eobj has the counter name as the topic
        and an int count as the value for the counter.
        '''
        counter_name = eobj.get_topic()
        if counter_name not in self.counter_names:
            return
        count = eobj.get_data()
        with self.locks[counter_name]:
            self.counts[counter_name] += count

    def stop(self):
        self.keep_running = False

    def __call__(self):
        '''
        Keep outputting the aggregated counts every 2 seconds
        '''
        while self.keep_running:
            sleep(2)
            for counter_name in self.counter_names:
                with self.locks[counter_name]:
                    print('Change for counter %s = %d, in last %f secs' % (counter_name,
                          self.counts[counter_name], time() - self.collect_times[counter_name]))
                    self.counts[counter_name] = 0
                    self.collect_times[counter_name] = time()
        print('Aggregator exited')


class count_producer:
    '''
    Producer for counters. Every 0.02 seconds post the "updated" value for a
    counter randomly
    '''
    def __init__(self, counters, ebus):
        self.counters = counters
        self.ebus = ebus
        self.keep_running = True
        self.num_counter = len(counters)

    def stop(self):
        self.keep_running = False

    def __call__(self):
        while self.keep_running:
            ev = event(self.counters[randint(0, self.num_counter - 1)], randint(1, 100))
            self.ebus.post(ev)
            sleep(0.02)
        print('producer exited')

if __name__ == '__main__':
    ebus = eventbus()
    counters = ['c1', 'c2', 'c3', 'c4']
    subcr = counter_aggregator(counters)
    producer = count_producer(counters, ebus)
    for counter in counters:
        ebus.register_consumer(subcr, counter)
    threads = []
    i = 30
    while i > 0:
        threads.append(Thread(target=producer))
        i -= 1

    aggregator_thread = Thread(target=subcr)
    aggregator_thread.start()
    for thrd in threads:
        thrd.start()
    sleep(20)
    producer.stop()
    subcr.stop()
    sleep(2)
    ebus.shutdown()




Wednesday, April 29, 2015

Using GDB to debug Java!

GDB is a very powerful debugger which is mainly used to debug C/C++ programs. But as many languages such as Java, Python, Perl, PHP, Ruby, etc. are implemented in C/C++, GDB is sometimes useful for debugging issues in programs written in these languages as well. Today I demonstrate one such use case with a Java program.

Below is a Java program that calls a native method HelloWorld. HelloWorld internally calls printf to print a string to the console, and the Java code also writes a string to the file /tmp/testout.txt.

//JniHelloWorld.java
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;

public class JniHelloWorld implements Runnable{
	
	private native void HelloWorld();
	private Object lock1 = null;
	private Object lock2 = null;
	private FileOutputStream fos = null;

	public JniHelloWorld() {
		lock1 = new Object();
		lock2 = new Object();
		try {
			fos = new FileOutputStream(new File("/tmp/testout.txt"));
		} catch (FileNotFoundException e) {
			e.printStackTrace();
			System.exit(1);
		}
	}
	
	@Override
	public void run() {
		byte []outbytes = "This will go to /tmp/testout.txt file\n".getBytes();
        while (true) {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
            }
            synchronized (lock1) {
                HelloWorld();
            }
            synchronized (lock2) {
                try {
                    fos.write(outbytes);
                } catch (IOException e) {
                }
            }
        }
	}	
	
	public static void main(String[] args) {
        System.loadLibrary("test");
		JniHelloWorld jnh = new JniHelloWorld();
		int i = 0;
		while (i++ < 20){
			new Thread(jnh).start();
		}
	}
}
Let us compile the Java code:
$ javac JniHelloWorld.java

Generate the JNI header file:
$ javah JniHelloWorld

Create the C file test.c:
#include <stdio.h>
#include "JniHelloWorld.h"

JNIEXPORT void JNICALL Java_JniHelloWorld_HelloWorld
  (JNIEnv *env, jobject o){
              printf("hello from printf\n");  
  }

Now let us compile the C file:
$ gcc -I . -I $JAVA_HOME/include -I $JAVA_HOME/include/linux -fPIC -c test.c
In my machine JAVA_HOME points to /home/geet/sws/jdk1.8.0_20.

Now create the shared library:
$ gcc -shared -o libtest.so test.o

Set LD_LIBRARY_PATH (eg. export LD_LIBRARY_PATH=.) and then run the example Java program.
$ java JniHelloWorld

The process keeps running, continuously printing to the console and to /tmp/testout.txt. If you check the program, there is no chance of any deadlock in the code.

We may check the threads of the Java process using the commands in the below example (I am not using jstack, jconsole, JVisualVM, etc. as I want to check even the threads created by the native library). Every Java thread is mapped to a native thread (a POSIX thread on a Linux machine).

$ jps
4512 JniHelloWorld
4602 Jps

$ ps -fT -p 4512

Output will be something like:
UID PID SPID PPID C STIME TTY TIME CMD
geet 4512 4512 3068 0 08:56 pts/15 00:00:00 java JniHelloWorld
geet 4512 4513 3068 0 08:56 pts/15 00:00:00 java JniHelloWorld
geet 4512 4514 3068 0 08:56 pts/15 00:00:00 java JniHelloWorld
geet 4512 4515 3068 0 08:56 pts/15 00:00:00 java JniHelloWorld
geet 4512 4516 3068 0 08:56 pts/15 00:00:00 java JniHelloWorld
geet 4512 4517 3068 0 08:56 pts/15 00:00:00 java JniHelloWorld
geet 4512 4518 3068 0 08:56 pts/15 00:00:00 java JniHelloWorld
geet 4512 4519 3068 0 08:56 pts/15 00:00:00 java JniHelloWorld
geet 4512 4520 3068 0 08:56 pts/15 00:00:00 java JniHelloWorld
geet 4512 4521 3068 0 08:56 pts/15 00:00:00 java JniHelloWorld
geet 4512 4522 3068 0 08:56 pts/15 00:00:00 java JniHelloWorld
geet 4512 4523 3068 0 08:56 pts/15 00:00:00 java JniHelloWorld
geet 4512 4524 3068 0 08:56 pts/15 00:00:00 java JniHelloWorld
...
...

We may attach gdb to any of the threads to examine what it is doing, e.g.:
$ sudo gdb -p 4551

Everything runs fine with no issues. Now let us spawn this Java program from another Java program.

// Spawner.java
import java.io.IOException;

public class Spawner {
	
	public static void main(String[] args) {
		ProcessBuilder pb =
				   new ProcessBuilder("java", "JniHelloWorld");
		Process p;
		try {
			p = pb.start();
			while (p.isAlive()){
				Thread.sleep(200);
			}
		} catch (IOException e) {
		} catch (InterruptedException e ){
		}
	}

}

$javac Spawner.java

Now run the Java process
$java Spawner

We may look at the modification time of /tmp/testout.txt to check if it is being updated continuously. After some time, we will see that the file is not being updated at all, but all the Java processes are running!

$ jps
5602 Spawner
6086 Jps
5622 JniHelloWorld


Definitely something is wrong with the JniHelloWorld process. Let us take its thread dump:

$jstack 5622

Now let us examine the output of jstack. Most of the threads are blocked:
"Thread-0" #10 prio=5 os_prio=0 tid=0x00007fde040e5000 nid=0x160b waiting for monitor entry [0x00007fdddae06000]
java.lang.Thread.State: BLOCKED (on object monitor)
at JniHelloWorld.run(JniHelloWorld.java:33)
- waiting to lock <0x000000076ce5b140> (a java.lang.Object)
at java.lang.Thread.run(Thread.java:745)


They are waiting for lock 0x000000076ce5b140 which corresponds to lock1.

Let us see who acquired lock1. From the output of jstack, I found it to be the below thread:
"Thread-6" #16 prio=5 os_prio=0 tid=0x00007fde040ef800 nid=0x1611 runnable [0x00007fddda800000]
java.lang.Thread.State: RUNNABLE
at JniHelloWorld.HelloWorld(Native Method)
at JniHelloWorld.run(JniHelloWorld.java:33)
- locked <0x000000076ce5b140> (a java.lang.Object)
at java.lang.Thread.run(Thread.java:745)

It has locked lock1 and is stuck in the native HelloWorld method, effectively blocking other threads waiting for that lock. It is a special kind of deadlock.

Now we need to check why this thread is stuck in the native method. A Java debugger or Java tools won't help, as we are in the native world! The value of nid for this thread is 0x1611 (= 5649), which is the lightweight process id of this thread. Let us attach gdb to this thread and get its backtrace:

$ sudo gdb -p 5649

The backtrace shows:
(gdb) bt
#0 0x00007fde09d7c3cd in write () at ../sysdeps/unix/syscall-template.S:81
#1 0x00007fde09d09a83 in _IO_new_file_write (f=0x7fde0a050400 <_io_2_1_stdout_>, data=0x7fde0a8b4000, n=4096)
at fileops.c:1261
#2 0x00007fde09d0af5c in new_do_write (to_do=4096,
data=0x7fde0a8b4000 "f\nhello from printf\nhello from printf\nhello from printf\nhello from printf\nhello from printf\nhello from printf\nhello from printf\nhello from printf\nhello from printf\nhello from printf\nhello from printf\n"...,
fp=0x7fde0a050400 <_io_2_1_stdout_>) at fileops.c:538
#3 _IO_new_do_write (fp=0x7fde0a050400 <_io_2_1_stdout_>,
data=0x7fde0a8b4000 "f\nhello from printf\nhello from printf\nhello from printf\nhello from printf\nhello from printf\nhello from printf\nhello from printf\nhello from printf\nhello from printf\nhello from printf\nhello from printf\n"...,
to_do=4096) at fileops.c:511
#4 0x00007fde09d0a121 in _IO_new_file_xsputn (f=0x7fde0a050400 <_io_2_1_stdout_>, data=, n=17)
at fileops.c:1332
#5 0x00007fde09d00957 in _IO_puts (str=0x7fdddae086fd "hello from printf") at ioputs.c:41
#6 0x00007fdddae086f1 in Java_JniHelloWorld_HelloWorld (env=0x7fde040ef9e8, o=0x7fddda8005f0) at test.c:6
#7 0x00007fddf489f8e6 in ?? ()
#8 0x000000076ce5b128 in ?? ()
#9 0x00007fde0935f9ac in InterpreterRuntime::monitorexit(JavaThread*, BasicObjectLock*) ()
from /home/geet/sws/jdk1.8.0_20/jre/lib/amd64/server/libjvm.so
#10 0x00007fddf47a798d in ?? ()
#11 0x00007fddf47a798d in ?? ()
#12 0x000000076ce5b128 in ?? ()
#13 0x0000000000000003 in ?? ()
#14 0x000000076ce5b140 in ?? ()
#15 0x00007fddda800650 in ?? ()
#16 0x00007fddf2410668 in ?? ()
#17 0x00007fddda8006d0 in ?? ()
#18 0x00007fddf2410850 in ?? ()
#19 0x0000000000000000 in ?? ()


Clearly the backtrace shows that the thread is stuck in the write system call (resulting from the call to printf). Why is it blocked? Because its output buffer is full and nobody is draining the data. When we run JniHelloWorld directly from a terminal, the terminal continuously consumes and displays this data, and hence it doesn't block.

By the way, there are neat ways to consume or redirect the output of spawned processes to avoid such problems. I have used this not-very-neat example mainly to show how we can use GDB to debug Java programs in such situations.
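
For completeness, one such neat way is to let the child inherit the parent's standard streams, so its output is always drained. Below is a hypothetical variant of the Spawner (not part of the original example) using ProcessBuilder.inheritIO():

// SpawnerFixed.java -- hypothetical variant of Spawner that avoids the blocked write
import java.io.IOException;

public class SpawnerFixed {

    public static void main(String[] args) {
        // inheritIO() makes the child write directly to this process's
        // stdout/stderr, so its output buffer never fills up.
        ProcessBuilder pb = new ProcessBuilder("java", "JniHelloWorld").inheritIO();
        try {
            Process p = pb.start();
            p.waitFor();
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}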



Saturday, February 28, 2015

A ganglia plugin for collecting Stats from Storm topologies

Ganglia is a highly scalable monitoring system for distributed systems. It lets users examine different stats of the monitored machines in a single pane. It maintains historical stats and hence is very good for checking how CPU usage, memory usage, disk usage, JVM GC pauses, etc. vary over time on the distributed machines.
Ganglia basically has three components: the Ganglia monitoring daemon (gmond), the Ganglia meta daemon (gmetad) and the Ganglia web front end.

Gmond monitors the changes in the host state, announces them, listens for state changes from other nodes running gmond, and returns an XML description of the cluster state when queried. Gmond depends on plug-ins to collect host stats. The plug-ins may be written in C, C++, Python, Perl or PHP. In fact, with some tricks we can implement the real piece of code that collects the stats in any language. Also, it won't be difficult to extend gmond to support plug-ins written in other languages as well.


Gmetad polls the clusters it is monitoring, collects the reported metrics and writes them to individual round-robin databases.


The Ganglia web front end provides the metric visualization UI.

There is a very good link here (from DigitalOcean) about installing and configuring Ganglia on Ubuntu 14.04. Please follow the steps to set up Ganglia monitoring on your machines. You may follow similar steps to install it on other Linux systems.

Now that you have installed and configured Ganglia, it is time to write and test a new Ganglia plug-in. From Ganglia 3.1 onwards, gmond has a modular interface that allows us to extend its capability through plug-ins. We can write modules or plug-ins to collect new kinds of metrics. Gmond uses the Mod_Python module (modpython.so) to interface with all the Python plug-ins and collect the metrics published by them.

First we need to create an entry for modpython.so in gmond configuration file. Edit gmond.conf (by default it is installed at /etc/ganglia/gmond.conf) and add the below lines in the "modules" section:

  module {
    name = "python_module"
    path = "/usr/lib/ganglia/modpython.so"
    params = "/usr/lib/ganglia/python"
  }

So, the modules section in gmond.conf will look somewhat like the below:

modules {
  module {
    name = "core_metrics"
  }
  module {
    name = "cpu_module"
    path = "/usr/lib/ganglia/modcpu.so"
  }
  module {
    name = "disk_module"
    path = "/usr/lib/ganglia/moddisk.so"
  }
  module {
    name = "load_module"
    path = "/usr/lib/ganglia/modload.so"
  }
  module {
    name = "mem_module"
    path = "/usr/lib/ganglia/modmem.so"
  }
  module {
    name = "net_module"
    path = "/usr/lib/ganglia/modnet.so"
  }
  module {
    name = "proc_module"
    path = "/usr/lib/ganglia/modproc.so"
  }
  module {
    name = "sys_module"
    path = "/usr/lib/ganglia/modsys.so"
  }
  module {
    name = "python_module"
    path = "/usr/lib/ganglia/modpython.so"
    params = "/usr/lib/ganglia/python"
  }
}




Also, create a directory /etc/ganglia/pyconfs to store the configuration files for the Python modules. Let us use .pyconf as the extension for the Python module configuration files. Add the below line in /etc/ganglia/gmond.conf:
include ('/etc/ganglia/pyconfs/*.pyconf')

We will add a configuration file for the sample plugin (/etc/ganglia/pyconfs/ganglia_plugin_sample.pyconf). The content of this file is pasted below:
modules {
    module {
        name = "ganglia_plugin_sample"
        language = "python"
        param firstparam {
            value = 100
        }
        param secondparam {
            value = 500
        }
    }
}

collection_group {
  collect_every = 20
  time_threshold = 90
  metric {
    name_match = ".*" 
    value_threshold = "1"
  }
}



Now let us add a very simple Ganglia Python plug-in. The module should implement at least two functions: metric_init and metric_cleanup. metric_init generally initializes the descriptors for the different metrics that the plug-in will be publishing.

Below is the script:

import sys
import os
import random

descriptors = list()
firstparam_max = 1000
secondparam_max = 1000

def callback_fun1(name):
    '''
    Returns a random number between 1 and firstparam_max
    '''
    random.seed()
    return random.randint(1, firstparam_max)

def callback_fun2(name):
    '''
    Returns a random number between 5 and secondparam_max
    '''
    random.seed()
    return random.randint(5,secondparam_max)

def metric_init(params):
    global descriptors, firstparam_max, secondparam_max
    if 'firstparam' in params:
        firstparam_max = int(params['firstparam'])
        d = {'name': 'firstparam',
                'call_back': callback_fun1,
                'time_max': 90,
                'value_type': 'uint',
                'units': 'Count',
                'slope': 'both',
                'format': '%u',
                'description': 'Sample metric',
                'groups': 'Sample'}
        descriptors.append(d)
    
    if 'secondparam' in params:
        secondparam_max = int(params['secondparam'])
        d = {'name': 'secondparam',
                'call_back': callback_fun2,
                'time_max': 90,
                'value_type': 'uint',
                'units': 'Count',
                'slope': 'both',
                'format': '%u',
                'description': 'Sample metric',
                'groups': 'Sample'}
        descriptors.append(d)
    return descriptors

def metric_cleanup():
    '''
    We don't need any cleanup :) :)
    '''
    pass


# This routine is for debugging purpose only and not used by gmond
# To debug the output, run as below:
# $ python ganglia_plugin_sample.py
if __name__ == '__main__':
    params = {'firstparam': 100, 'secondparam' : 500}
    metric_init(params)
    for d in descriptors:
        v = d['call_back'](d['name'])
        print '%s --> %u' % (d['name'], v)








Here, the plug-in is collecting two metrics (firstparam and secondparam). The value for firstparam is computed by the function callback_fun1 and the value for secondparam is computed by callback_fun2. The "slope" is set to "both" for both the metrics. This is a hint to gmetad that the value may increase or decrease.

We should copy this script to /usr/lib/ganglia/python/ganglia_plugin_sample.py.

After you copy the plugin-script and make the configuration changes, restart ganglia-monitor (gmond) service.

$sudo service ganglia-monitor restart

After some time you should see the "Sample" metrics group and the graphs for the metrics firstparam and secondparam under it.


I have created a Python module for collecting Storm stats. The module is available here. Also, for instructions please refer to this readme.

I have pasted the module below for your easy reference:
import logging
import md5
import pickle
import re
import sys
from time import time
from thrift.transport import TTransport, TSocket
from thrift.protocol import TBinaryProtocol

clusterinfo = None
topology_found = True
descriptors = list()
topologies = []
serialfile_dir = '/tmp'
topology_summary_cols_map = {'status': 'Status', 'num_workers': 'Worker Count',
                             'num_executors': 'ExecutorCount', 'uptime_secs': 'Uptime',
                             'num_tasks': 'TaskCount'}

spout_stats = {'Executors': ['Count', '%u', 'uint'], 'Tasks': ['Count', '%u', 'uint'],
               'Emitted': ['Count', '%f', 'double'],
               'Transferred': ['Count/sec', '%f', 'double'],
               'CompleteLatency': ['ms', '%f', 'double'], 'Acked': ['Count/Sec', '%f', 'double'],
               'Failed': ['Count/sec', '%f', 'double']}

bolt_stats = {'Executors': ['Count', '%u', 'uint'], 'Tasks': ['Count', '%u', 'uint'],
              'Emitted': ['Count/sec', '%f', 'double'],
              'Executed': ['Count/sec', '%f', 'double'],
              'Transferred': ['Count/sec', '%f', 'double'],
              'ExecuteLatency': ['ms', '%f', 'double'],
              'ProcessLatency': ['ms', '%f', 'double'],
              'Acked': ['Count/sec', '%f', 'double'],
              'Failed': ['Count/sec', '%f', 'double']}

diff_cols = ['Acked', 'Failed', 'Executed', 'Transferred', 'Emitted']
overall = {'ExecutorCount': ['Count', '%u', 'uint'],
           'WorkerCount': ['Count', '%u', 'uint'],
           'TaskCount': ['Count', '%u', 'uint'],
           'UptimeSecs': ['Count', '%u', 'uint']}

toplogy_mods = {}
lastchecktime = 0
lastinfotime = 0
maxinterval = 6
all_topology_stats = {}
bolt_array = {}
spout_array = {}
nimbus_host = '127.0.0.1'
nimbus_port = 6627
logging.basicConfig(filename='/tmp/storm_topology.log', level=logging.INFO,
                    format='%(asctime)s  %(levelname)s line:%(lineno)d %(message)s', filemode='w')
logging_levels = {'INFO': logging.INFO, 'DEBUG': logging.DEBUG, 'WARNING': logging.WARNING,
                  'ERROR': logging.ERROR, 'CRITICAL': logging.CRITICAL}


def get_avg(arr):
    if len(arr) < 1:
        return 0
    return sum(arr) / len(arr)


def normalize_stats(stats, duration):
    for k in stats:
        statsk = stats[k]
        if 'Emitted' in statsk and duration > 0:
            if statsk['Emitted'] > 0:
                statsk['Emitted'] = statsk['Emitted'] / duration
        if 'Acked' in statsk and duration > 0:
            if statsk['Acked'] > 0:
                statsk['Acked'] = statsk['Acked'] / duration
        if 'Executed' in statsk and duration > 0:
            if statsk['Executed'] > 0:
                statsk['Executed'] = statsk['Executed'] / duration


def freshen_topology(topology):
    tmpsavestats = None
    inf = None
    savedlastchecktime = 0
    tmp = md5.new()
    tmp.update(topology)
    filename = '/tmp/save_stats_for_' + tmp.hexdigest() + '.pk'
    try:
        inf = open(filename, 'rb')
    except IOError as e:
        logging.warn(e)
    if inf is not None:
        try:
            tmpsavestats = pickle.load(inf)
            savedlastchecktime = pickle.load(inf)
        except EOFError as e:
            logging.warn(e.message())
        inf.close()
    if not all_topology_stats[topology]['topology_stats_got']:
        logging.warn('Info not got for topology ' + topology)
        return
    overallstats = all_topology_stats[topology]['overallstats']
    boltspoutstats = all_topology_stats[topology]['boltspoutstats']
    of = open(filename, 'wb')
    if of is not None:
        pickle.dump(boltspoutstats, of)
        pickle.dump(time(), of)
        of.close()
    if overallstats['UptimeSecs'] > (lastchecktime - savedlastchecktime):
        if tmpsavestats is not None:
            for bolt in bolt_array[topology]:
                if bolt in tmpsavestats and bolt in boltspoutstats:
                    stats_new = boltspoutstats[bolt]
                    stats_old = tmpsavestats[bolt]
                    for key in bolt_stats:
                        if key == 'ExecuteLatency' or key == 'ProcessLatency':
                            continue
                        if key not in stats_new:
                            continue
                        if key not in stats_old:
                            continue
                        if key in diff_cols:
                            stats_new[key] -= stats_old[key]
            for spout in spout_array[topology]:
                if spout in tmpsavestats and spout in boltspoutstats:
                    stats_new = boltspoutstats[spout]
                    stats_old = tmpsavestats[spout]
                    for key in spout_stats:
                        if key == 'CompleteLatency':
                            continue
                        if key not in stats_new:
                            continue
                        if key not in stats_old:
                            continue
                        if key in diff_cols:
                            stats_new[key] -= stats_old[key]
            normalize_stats(boltspoutstats, lastchecktime - savedlastchecktime)
        else:
            normalize_stats(boltspoutstats, overallstats['UptimeSecs'])
    else:
        normalize_stats(boltspoutstats, overallstats['UptimeSecs'])


def freshen():
    global lastchecktime
    if time() > (lastchecktime + maxinterval):
        lastchecktime = time()
        get_topology_stats_for(topologies)
        for topology in topologies:
            freshen_topology(topology)


def callback_boltspout(name):
    freshen()
    first_pos = name.find('_')
    last_pos = name.rfind('_')
    topology_mod = name[0:first_pos]
    bolt = name[first_pos + 1: last_pos]
    statname = name[last_pos + 1:]
    topology = toplogy_mods[topology_mod]
    if not all_topology_stats[topology]['topology_stats_got']:
        logging.debug('Returing 0 for ' + name)
        return 0
    logging.debug('Got stats for ' + name + " " +
                  str(all_topology_stats[topology]['boltspoutstats'][bolt][statname]))
    return all_topology_stats[topology]['boltspoutstats'][bolt][statname]


def callback_overall(name):
    freshen()
    topology_mod, name = name.split('_')
    topology = toplogy_mods[topology_mod]
    if not all_topology_stats[topology]['topology_stats_got']:
        logging.debug('Returing 0 for ' + name)
        return 0
    logging.debug(topology + ' ' + name + ' ' +
                  str(all_topology_stats[topology]['overallstats'][name]))
    return all_topology_stats[topology]['overallstats'][name]


def update_task_count(component_task_count, component_name, count):
    if component_name not in component_task_count:
        component_task_count[component_name] = 0
    component_task_count[component_name] += count


def update_exec_count(component_exec_count, component_name, count):
    if component_name not in component_exec_count:
        component_exec_count[component_name] = 0
    component_exec_count[component_name] += count


def update_whole_num_stat_special(stats, store, boltname, statname):
    if boltname not in store:
        store[boltname] = {}
    if statname not in store[boltname]:
        store[boltname][statname] = 0
    for k in stats:
        if k == '__metrics' or k == '__ack_init' or k == '__ack_ack' or k == '__system':
            continue
        store[boltname][statname] += stats[k]


def update_whole_num_stat(stats, store, boltname, statname):
    if boltname not in store:
        store[boltname] = {}
    if statname not in store[boltname]:
        store[boltname][statname] = 0
    for k in stats:
        store[boltname][statname] += stats[k]


def update_avg_stats(stats, store, boltname, statname):
    if boltname not in store:
        store[boltname] = {}
    if statname not in store[boltname]:
        store[boltname][statname] = []
    for k in stats:
        store[boltname][statname].append(stats[k])


def get_topology_stats_for(topologies):
    all_topology_stats.clear()
    for topology in topologies:
        all_topology_stats[topology] = get_topology_stats(topology)


def refresh_topology_stats():
    logging.debug('Refreshing topology stats')
    for t in topologies:
        all_topology_stats[t] = {'topology_stats_got': False}
    global clusterinfo
    try:
        transport = TSocket.TSocket(nimbus_host, nimbus_port)
        transport.setTimeout(1000)
        framedtrasp = TTransport.TFramedTransport(transport)
        protocol = TBinaryProtocol.TBinaryProtocol(framedtrasp)
        client = Nimbus.Client(protocol)
        framedtrasp.open()
        boltspoutstats = None
        component_task_count = None
        component_exec_count = None
        clusterinfo = client.getClusterInfo()
        for tsummary in clusterinfo.topologies:
            if tsummary.name not in topologies:
                continue
            toplogyname = tsummary.name
            overallstats = {}
            overallstats['ExecutorCount'] = tsummary.num_executors
            overallstats['TaskCount'] = tsummary.num_tasks
            overallstats['WorkerCount'] = tsummary.num_workers
            overallstats['UptimeSecs'] = tsummary.uptime_secs
            all_topology_stats[toplogyname]['overallstats'] = overallstats
            boltspoutstats = {}
            component_task_count = {}
            component_exec_count = {}
            all_topology_stats[toplogyname]['boltspoutstats'] = boltspoutstats
            all_topology_stats[toplogyname]['component_task_count'] = component_task_count
            all_topology_stats[toplogyname]['component_exec_count'] = component_exec_count
            tinfo = client.getTopologyInfo(tsummary.id)
            all_topology_stats[toplogyname]['topology_stats_got'] = True
            for exstat in tinfo.executors:
                stats = exstat.stats
                update_whole_num_stat_special(stats.emitted[":all-time"], boltspoutstats,
                                              exstat.component_id, 'Emitted')
                update_whole_num_stat_special(stats.transferred[":all-time"], boltspoutstats,
                                              exstat.component_id, 'Transferred')

                numtask = exstat.executor_info.task_end - exstat.executor_info.task_start + 1
                update_task_count(component_task_count, exstat.component_id, numtask)
                update_exec_count(component_exec_count, exstat.component_id, 1)
                if stats.specific.bolt is not None:
                    update_whole_num_stat(stats.specific.bolt.acked[":all-time"], boltspoutstats,
                                          exstat.component_id, 'Acked')
                    update_whole_num_stat(stats.specific.bolt.failed[":all-time"], boltspoutstats,
                                          exstat.component_id, 'Failed')
                    update_whole_num_stat(stats.specific.bolt.executed[":all-time"], boltspoutstats,
                                          exstat.component_id, 'Executed')
                    update_avg_stats(stats.specific.bolt.process_ms_avg["600"], boltspoutstats,
                                     exstat.component_id, 'process_ms_avg')
                    update_avg_stats(stats.specific.bolt.execute_ms_avg["600"], boltspoutstats,
                                     exstat.component_id, 'execute_ms_avg')
                if stats.specific.spout is not None:
                    update_whole_num_stat(stats.specific.spout.acked[":all-time"], boltspoutstats,
                                          exstat.component_id, 'Acked')
                    update_whole_num_stat(stats.specific.spout.failed[":all-time"], boltspoutstats,
                                          exstat.component_id, 'Failed')
                    update_avg_stats(stats.specific.spout.complete_ms_avg[":all-time"], boltspoutstats,
                                     exstat.component_id, 'complete_ms_avg')
            if '__acker' in boltspoutstats:
                del boltspoutstats['__acker']
            for key in boltspoutstats:
                if 'complete_ms_avg' in boltspoutstats[key]:
                    avg = get_avg(boltspoutstats[key]['complete_ms_avg'])
                    boltspoutstats[key]['CompleteLatency'] = avg
                    del boltspoutstats[key]['complete_ms_avg']
                if 'process_ms_avg' in boltspoutstats[key]:
                    avg = get_avg(boltspoutstats[key]['process_ms_avg'])
                    boltspoutstats[key]['ProcessLatency'] = avg
                    del boltspoutstats[key]['process_ms_avg']
                if 'execute_ms_avg' in boltspoutstats[key]:
                    avg = get_avg(boltspoutstats[key]['execute_ms_avg'])
                    boltspoutstats[key]['ExecuteLatency'] = avg
                    del boltspoutstats[key]['execute_ms_avg']

            for key in component_task_count:
                if key in boltspoutstats:
                    boltspoutstats[key]['Tasks'] = component_task_count[key]
            for key in component_exec_count:
                if key in boltspoutstats:
                    boltspoutstats[key]['Executors'] = component_exec_count[key]
        framedtrasp.close()

    except Exception as e:
        clusterinfo = None
        logging.warn(e)


def get_topology_stats(toplogyname):
    global lastinfotime
    if (lastinfotime + 4) < time():
        for t in all_topology_stats:
            all_topology_stats[t] = None
        lastinfotime = time()
        refresh_topology_stats()
    return all_topology_stats[toplogyname]


def metric_init_topology(params):
    global descriptors
    groupname = ''
    if 'topology' in params and len(params['topology']):
        groupname = params['topology']
    else:
        return
    topology = groupname
    topology_mod = re.sub("\s+", "", topology)
    topology_mod = re.sub("[_]+", "", topology_mod)
    toplogy_mods[topology_mod] = topology
    if 'spouts' in params:
        spout_array[topology] = re.split('[,]+', params['spouts'])
        for spout in spout_array[topology]:
            for statname in spout_stats:
                d = {'name': topology_mod + '_' + spout + '_' + statname, 'call_back': callback_boltspout,
                     'time_max': 90,
                     'value_type': spout_stats[statname][2],
                     'units': spout_stats[statname][0],
                     'slope': 'both',
                     'format': spout_stats[statname][1],
                     'description': '',
                     'groups': groupname}
                descriptors.append(d)

    if 'bolts' in params:
        bolt_array[topology] = re.split('[,]+', params['bolts'])
        for bolt in bolt_array[topology]:
            for statname in bolt_stats:
                d = {'name': topology_mod + '_' + bolt + '_' + statname, 'call_back': callback_boltspout,
                     'time_max': 90,
                     'value_type': bolt_stats[statname][2],
                     'units': bolt_stats[statname][0],
                     'slope': 'both',
                     'format': bolt_stats[statname][1],
                     'description': '',
                     'groups': groupname}
                descriptors.append(d)

    for key in overall:
        d = {'name': topology_mod + '_' + key, 'call_back': callback_overall,
             'time_max': 90,
             'value_type': overall[key][2],
             'units': overall[key][0],
             'slope': 'both',
             'format': overall[key][1],
             'description': '',
             'groups': groupname}
        descriptors.append(d)
    logging.info('Inited metric for ' + groupname)


def metric_init(params):
    global topologies, nimbus_host, nimbus_port
    if 'nimbus_host' in params:
        nimbus_host = params['nimbus_host']
    if 'nimbus_port' in params:
        nimbus_port = params['nimbus_port']
    if 'topologies' not in params:
        return
    if 'storm_thrift_gen' in params:
        sys.path.append(params['storm_thrift_gen'])
    else:
        sys.path.append('/usr/lib/ganglia')
    if 'loglevel' in params:
        loglevel = params['loglevel'].strip().upper()
        if loglevel in logging_levels:
            logging.getLogger().setLevel(logging_levels[loglevel])

    global Nimbus, ttypes
    from stormpy.storm import Nimbus, ttypes

    tss = re.split('[,]+', params['topologies'])
    topologies = tss
    alltops = {}
    for t in tss:
        alltops[t] = {'topology': t}
        alltops[t]['tlen'] = len(t)
        t_bolts = t + '_bolts'
        if t_bolts in params:
            alltops[t]['bolts'] = params[t_bolts]
        t_spouts = t + '_spouts'
        if t_spouts in params:
            alltops[t]['spouts'] = params[t_spouts]
    for t in alltops:
        logging.info('Initing metric for ' + t)
        metric_init_topology(alltops[t])
    return descriptors

if __name__ == '__main__':
    params = {'topologies': 'SampleTopology,AnotherTopology',
              'SampleTopology_spouts': 'SampleSpoutTwo', 'SampleTopology_bolts': 'boltc',
              'AnotherTopology_spouts': 'Spout', 'AnotherTopology_bolts': 'bolta,boltb,boltd',
              'loglevel': 'ERROR'}
    metric_init(params)
    for d in descriptors:
        v = d['call_back'](d['name'])
        formt = "%s " + d['format']
        print formt % (d['name'], v)
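
For reference, the Storm module is wired into gmond with a .pyconf similar to the sample plugin's configuration earlier in this post. The sketch below is only an illustration: the module name storm_topology and the parameter values are assumptions for the demo topology from the Storm post above, while the parameter names (nimbus_host, nimbus_port, topologies and the per-topology _spouts/_bolts entries) are the ones read by metric_init above.

modules {
    module {
        name = "storm_topology"
        language = "python"
        param nimbus_host {
            value = "127.0.0.1"
        }
        param nimbus_port {
            value = 6627
        }
        param topologies {
            value = "DemoTopolgy"
        }
        param DemoTopolgy_spouts {
            value = "dataspout"
        }
        param DemoTopolgy_bolts {
            value = "boltdb,boltindex"
        }
    }
}

collection_group {
  collect_every = 20
  time_threshold = 90
  metric {
    name_match = ".*"
    value_threshold = "1"
  }
}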