Sunday, January 5, 2014

Java Event Bus with Guava

Sometimes we need different modules running in the same Java process to communicate with each other; in particular, threads often need to pass messages between them. IPC is overkill within a single process, and pulling in an external message queue for in-process messaging is even less appealing.
Coding this ourselves is tedious, so we should look around to see whether somebody has already done it and published a library.

The Google Guava library is one of the most popular Java libraries, and it includes the "EventBus" module. EventBus provides publish-subscribe-style communication between components without requiring the components to explicitly register with one another. The publisher and subscribers communicate via a shared highway, the event bus.
The working is simple: we create an EventBus, the subscribers register themselves with the event bus, and the publisher posts "messages" to it. The event bus takes care of delivering the messages to the appropriate subscribers. There are synchronous and asynchronous event buses. With a synchronous event bus, when a publisher posts a message, the post does not return until all the subscribers have received it. With an asynchronous event bus, the subscribers are invoked on different threads.
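To make the dispatch difference concrete, here is a tiny pure-JDK sketch (not Guava's actual implementation; TinyBus and its Subscriber interface are made-up names): a synchronous bus runs every subscriber on the caller's thread before post() returns, while an asynchronous bus hands each delivery to an Executor.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;

// Hypothetical sketch of the sync-vs-async dispatch idea, NOT Guava's code.
public class TinyBus<T> {
    public interface Subscriber<T> { void onEvent(T event); }

    private final List<Subscriber<T>> subscribers = new CopyOnWriteArrayList<>();
    private final Executor executor; // null => synchronous delivery

    public TinyBus(Executor executor) { this.executor = executor; }

    public void register(Subscriber<T> s) { subscribers.add(s); }

    public void post(T event) {
        for (Subscriber<T> s : subscribers) {
            if (executor == null) {
                s.onEvent(event);                         // sync: blocks the poster
            } else {
                executor.execute(() -> s.onEvent(event)); // async: poster returns at once
            }
        }
    }
}
```

Guava's real EventBus adds type-based routing (each subscriber method receives only events of its parameter type), which this sketch omits for brevity.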

Now let us look at a simple example. Many publishers keep publishing deltas for some counters, while an aggregator receives the deltas and accumulates them (it may also persist them to a database or a file, etc.).

The counter publishers and the aggregator communicate over a common event bus (asynchronous or synchronous depending on our need, though in most real-world cases we generally go with an asynchronous bus).

Below is the asynchronous event bus provider module:

package geet.example.eventbus;
import java.util.concurrent.Executors;
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;

public class CounterAsyncEventBus{
 private static EventBus bus = new AsyncEventBus(Executors.newFixedThreadPool(1));
 
 public static EventBus getEventBus(){
  return bus;
 }
}


If we want a synchronous event bus, below is the provider:
package geet.example.eventbus;

import com.google.common.eventbus.EventBus;

public class CounterEventBus{
 private static EventBus bus = new EventBus();
 
 public static EventBus getEventBus(){
  return bus;
 }
}



Below is the (not so complex) Counter class :)
package geet.example.eventbus;

public class Counter {
 private String counterName = null;
 private Integer counterDelta = null;
 public Counter(String counterName, Integer counterDelta) {
  this.counterName = counterName;
  this.counterDelta = counterDelta;
 }
 public String getCounterName() {
  return counterName;
 }
 public void setCounterName(String counterName) {
  this.counterName = counterName;
 }
 public Integer getCounterDelta() {
  return counterDelta;
 }
 public void setCounterDelta(Integer counterDelta) {
  this.counterDelta = counterDelta;
 }
}




The counter aggregator code is below. The aggregator obtains an instance of the common event bus and then registers itself with it. The constructor takes a boolean flag indicating whether to use the synchronous or the asynchronous event bus. Also note the "@Subscribe" annotation, which tells the bus that whenever a publisher posts a "Counter", the method "addCount" is to be invoked.

package geet.example.eventbus;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;

public class CounterAggregator {
 private ConcurrentHashMap<String, AtomicInteger> counters = null;
 private EventBus bus = null;

 public CounterAggregator(boolean async) {
  counters = new ConcurrentHashMap<String, AtomicInteger>();
  if (async)
   bus = CounterAsyncEventBus.getEventBus();
  else
   bus = CounterEventBus.getEventBus();
  bus.register(this);
 }

 @Subscribe
 @AllowConcurrentEvents
 public void addCount(Counter counter) {
  Integer val = counter.getCounterDelta();
  String counterName = counter.getCounterName();

  AtomicInteger oldval = counters.putIfAbsent(counterName,
    new AtomicInteger(val));
  if (oldval != null) {
   oldval.addAndGet(val);
  }
  System.out.println(Thread.currentThread().getName()
    + " Value for the counter " + counterName + "="
    + counters.get(counterName));

 }
}




Now below is the code for the counter generators. They likewise obtain the asynchronous or synchronous event bus instance. The "publish" method internally posts Counter objects to the event bus, which are then consumed by the subscriber.

package geet.example.eventbus;

import com.google.common.eventbus.EventBus;
public class CounterGenerator {
 private EventBus bus = null;

 public CounterGenerator(boolean async) {
  if (async)
   bus = CounterAsyncEventBus.getEventBus();
  else
   bus = CounterEventBus.getEventBus();
 }

 public void publish(String name, int val) {
  bus.post(new Counter(name, val));
 }

}


Now we have everything set for the event bus demo application; the code is given below. This example shows how to work with the asynchronous event bus, but it can easily be modified to use the synchronous one as well. It creates a counter generator and a counter aggregator. The generator publishes deltas for the counters c1, c2, c3 and c4, and the aggregator keeps accumulating the counts. The bottom line is that the event bus elegantly handles the routing of the messages internally, saving us from writing a lot of tedious code and reinventing another wheel :)

package geet.example.eventbus;

import java.util.Random;

public class Example {
 public static void main(String[] args) {
  String []counterNames = {"c1", "c2", "c3", "c4" };
  Random random = new Random();
  @SuppressWarnings("unused")
  CounterAggregator aggr = new CounterAggregator(true);
  CounterGenerator gen = new CounterGenerator(true);
  int i = 0;
  while (i++ < 10000){
   gen.publish(counterNames[i & 3], random.nextInt(50000));
  }
 }
}

You may access all the samples from this github link.

Monday, August 12, 2013

Bytebuffer class in C++

I was wondering whether a class similar to Java's ByteBuffer is available in C++ as well. std::stringstream works, but it does not offer the control that Java's ByteBuffer does; it does a very simple job. Having such a class ready-made is convenient for developers. There are many libraries that can generate the data encoding code for us, good examples being Apache Thrift and Google protobuf. But many programs require messages encoded in other formats, so that they are not accessible only through the languages supported by Thrift, protobuf, etc.

My implementation for ByteBuffer in C++ is accessible at this github link.
There is a test program (test.cpp) which demonstrates the operations. It can be compiled as shown below:
$g++ -I .  test.cpp bytebuffer.cpp bytebuffer_exception.cpp

Then run it as shown below:
$./a.out

The source for the test program is pasted below:

#include <string.h>
#include <stdlib.h>
#include <iostream>
#include <bytebuffer.hpp>
#include <stdio.h>

using namespace GeetPutula;
using namespace std;

int main()
{
    ByteBuffer buffer(256, ByteBuffer::LITTLE);
    buffer.putInt32(102);
    buffer.rewind();
    int32_t val = buffer.getInt32();
    cout << val << endl;
    val = buffer.getInt32();
    cout << val << endl;
    size_t position = buffer.currentPosition();
    cout << buffer.position(1024) << endl;
    cout << buffer.position(1024) << endl;
    if (buffer.putInt32(100) == false) {
        cout << "Worked as expected \n";
    }
    buffer.position(position);
    buffer.putBytes((void *)"this is my string", strlen("this is my string") + 1);
    void *buf = malloc(strlen("this is my string") + 1);
    buffer.position(position);
    buffer.readBytes(buf, strlen("this is my string") + 1);
    cout << (char *) buf << endl;
    free(buf);
    position = buffer.currentPosition();
    buffer.putInt16(45);
    buffer.position(position);
    cout << buffer.getInt16() << endl;
    cout << "Current position " << buffer.currentPosition() << endl;
    position = buffer.currentPosition();
    buffer.putInt64(-123456789123400);
    buffer.position(position);
    cout << buffer.getInt64() << endl;
    position = buffer.currentPosition() ;
    cout << "Current position " << position << endl;
    buffer.putFloat(-1223.3233);
    buffer.position(position);
    float f  = buffer.getFloat() ;
    printf("%f floattttt\n", f);
    
    position = buffer.currentPosition() ;
    cout << "Current position " << position << endl;
    buffer.putDouble(-1223879967.3233129);
    buffer.position(position);
    cout << buffer.getDouble() << endl;
    buffer.position(position);
    double x = buffer.getDouble();
    printf("%lf \n", x);
    return 0;
}

Monday, July 15, 2013

Daemontools compilation problem on Linux 64 bit system

Recently I was evaluating daemontools for some components which need to run as Linux services and be restarted if they go down for some reason. I downloaded daemontools and followed the instructions to build it, but it failed with the error below:

./makelib unix.a alloc.o alloc_re.o buffer.o buffer_0.o buffer_1.o \
buffer_2.o buffer_get.o buffer_put.o buffer_read.o buffer_write.o \
coe.o env.o error.o error_str.o fd_copy.o fd_move.o fifo.o lock_ex.o \
lock_exnb.o ndelay_off.o ndelay_on.o open_append.o open_read.o \
open_trunc.o open_write.o openreadclose.o pathexec_env.o \
pathexec_run.o prot.o readclose.o seek_set.o sgetopt.o sig.o \
sig_block.o sig_catch.o sig_pause.o stralloc_cat.o stralloc_catb.o \
stralloc_cats.o stralloc_eady.o stralloc_opyb.o stralloc_opys.o \
stralloc_pend.o strerr_die.o strerr_sys.o subgetopt.o wait_nohang.o \
wait_pid.o
./load envdir unix.a byte.a
/usr/bin/ld: errno: TLS definition in /lib64/libc.so.6 section .tbss mismatches non-TLS reference in envdir.o
/lib64/libc.so.6: could not read symbols: Bad value
collect2: ld returned 1 exit status
make: *** [envdir] Error 1

The problem was solved by including the standard errno.h header file in error.h (extracted to admin/daemontools-0.76/src/error.h from daemontools-0.76.tar.gz) in the source. After that we can use this nice tool without any issue :)

Friday, April 5, 2013

Executing commands on multiple Linux machines


Here is a small and convenient tool to execute the same command over a set of Linux machines.
To use the tool we need to configure the machines for password-less SSH and install the Perl module Net::OpenSSH. We also have to create a file $HOME/.allhosts and put the IP addresses or host names in this file, one per line.

Configuring password-less access to all the machines you have

Generate public and private keys:
$ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa

Copy the public and private keys to all the machines you want password-less access to:
scp ~/.ssh/id_rsa.pub userid@machine-ip:~/.ssh/id_rsa.pub
scp ~/.ssh/id_rsa userid@machine-ip:~/.ssh/id_rsa

Add the public key to the authorized keys file on every machine:
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

Now you will be able to ssh from any of these machines to any of the others without a password.

Install the Perl Net::OpenSSH module as shown below:
$sudo yum install perl-CPAN
$sudo perl -MCPAN -e "install Net::OpenSSH" 

Examples
To kill all the Java processes running on the machines listed in your $HOME/.allhosts, issue the command below:
$ perl multihostcmd.pl  "ps -ef  | grep java  | grep -v grep | awk ' { print \$2 }'  | xargs kill -9"
Of course, you will need permission to kill those processes.

To check all the users logged in to these machines, issue the command below:
$ perl multihostcmd.pl  who

Below is the script (you may also access it in github ):

#!/usr/bin/perl                                
############################################################################
# Save this in a file, say multihostcmd.pl
############################################################################   

use threads (
    'yield', 
    'stack_size' => 64 * 4096,
    'exit'       => 'threads_only',
    'stringify'                    
);                                 
use Cwd qw(abs_path);              
use Net::OpenSSH;                  

#
# executeCmdRemote
# executes the command on a remote host 
# command execution happens over ssh    
#                                       

sub executeCmdRemote {
    my ( $host, $command, @others ) = @_;
    my $ssh =  Net::OpenSSH->new($host,  
         master_opts => [-o => "ConnectionAttempts=2", -o => "ConnectTimeout=5"] );
    if ($ssh->error) {                                                             
        return;                                                                    
    }                                                                              
    ($out, $err, @others) =  $ssh->capture2({timeout => 20}, $command);            
    $outdata = "FROM $host ******************************************\n";          
    if ($out ne "") {                                                              
        $outdata .= $out;                                                          
    }                                                                              
    if ($err ne "") {                                                              
        $outdata .= $err;                                                          
    }                                                                              
    $outdata .= "\nEND OF DATA FROM $host ************************************* \n\n";
    print $outdata;                                                                   
}                                                                                     

#
# readHostFile
# reads from the file $HOME/.allhosts where this file contain a list of hosts
# each line on this file denotes a host name or host ip                      
# The given command is executed on this host                                 
#                                                                            
sub readHostFile {                                                           
    my ($array, @others) = @_;                                               
    $hostfile = $ENV{'HOME'} . '/.allhosts';                                 
    if (! -e  $hostfile) {                                                   
        print $hostfile . ' doesn\'t exist';                                 
        return;                                                              
    }                                                                        
    if (! -f  $hostfile) {                                                   
        print $hostfile . ' is not a regular file';                          
        return;                                                              
    }                                                                        
    open(FH, "<", $hostfile) or return;
    @$array = <FH>;                                                          
    close FH;
}

sub main {
    if ( $#ARGV < 0 ) {
        $executing_script = abs_path($0);
        print "Usage: $executing_script command-to-execute\n";
        exit(1);
    }
    $cmdline = join( ' ', @ARGV );
    my @hostarry = ();
    readHostFile(\@hostarry);
    if ($#hostarry < 0) {
        print "Empty list for hosts";
        exit(1);
    }

    my $thr = undef;
    my @allthreads = ();
    foreach my $host (@hostarry) {
       $host =~ s/^\s+//;
       $host =~ s/\s+$//;
       if  ($host eq "") {
           next;
       }
       # Create a thread to execute the command on the remote host
       $thr = threads->create('executeCmdRemote', $host, $cmdline);
       push @allthreads, $thr;
    }
    foreach $thr (@allthreads){
        $thr->join();
    }
}

# Call main
main();

Tuesday, December 18, 2012

Ehcache or Memcached ?



Should we use memcached, or should we go with the Java-based ehcache? It is not a million-dollar question, but it at least qualifies as a hundred-dollar question :)
I think ehcache would be a good option when used as an embedded cache in the application server, as that involves no sockets or serialization. Of course, the application or application server must be in Java.
In all other modes, I think ehcache's performance will suffer, since data must be serialized/deserialized and transferred over sockets; I doubt ehcache would then perform better than memcached.

One major advantage of memcached is that it has clients in almost every language. The disadvantage is that it does not support persistence natively. Membase (or Couchbase) and memcachedb offer data persistence and speak the memcached protocol. But do we need the memory cache to be persistent anyway?

Ehcache supports HA, but then it cannot act as a purely embedded cache.
HA with memcached can also be achieved quite easily, though the clients generally do not support it directly: we just put the key-value pair in the memcached server the key hashes to, and also in the next memcached server in the cluster (or put it on a backup cluster of memcached servers).
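The server-selection part of that scheme can be sketched as below. This is a hypothetical helper, not part of any memcached client library; the class and method names are made up. The key hashes to a primary server, and the backup is simply the next server in the list.

```java
import java.util.List;

// Hypothetical sketch: pick a primary memcached server by hashing the key,
// and replicate the same key-value pair to the next server in the cluster.
public class ReplicatedPlacement {
    private final List<String> servers; // e.g. "host:port" strings

    public ReplicatedPlacement(List<String> servers) {
        this.servers = servers;
    }

    // Primary server: hash the key onto the server list.
    public String primaryFor(String key) {
        int idx = Math.floorMod(key.hashCode(), servers.size());
        return servers.get(idx);
    }

    // Backup server: the next server in the ring, wrapping around.
    public String backupFor(String key) {
        int idx = Math.floorMod(key.hashCode(), servers.size());
        return servers.get((idx + 1) % servers.size());
    }
}
```

A real client would issue the set to both servers on every write, and fall back to the backup when a read from the primary fails.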

Achieving elasticity with memcached looks harder, because I do not know whether we can list all the keys in a memcached server. There is the "stats" command for getting a list of keys, but I am not sure it returns all of them. If we could get the list of all keys, we could use extendible hashing to shuffle a few keys across the nodes. The clients would also need to be made aware of the addition/deletion of memcached servers in the cluster. I wonder how ehcache does this in an HA environment, i.e. how the clients (especially the REST-based non-Java clients) learn the new destination for a cached key when the number of ehcache nodes in the cluster changes.
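Consistent hashing is a commonly used alternative to extendible hashing for this problem: when a server joins or leaves, only the keys on that server's arc of the ring are remapped, instead of everything reshuffling as with plain modulo hashing. Below is a minimal sketch (hypothetical class, one ring point per server, toy hash function), not any particular client's implementation.

```java
import java.util.SortedMap;
import java.util.TreeMap;

// Minimal consistent-hash ring sketch. Removing a server only remaps the
// keys that server owned; all other key-to-server mappings stay unchanged.
public class HashRing {
    private final TreeMap<Integer, String> ring = new TreeMap<>();

    public void addServer(String server) {
        // Real implementations place many virtual nodes per server to even
        // out the distribution; one point per server keeps this short.
        ring.put(hash(server), server);
    }

    public void removeServer(String server) {
        ring.remove(hash(server));
    }

    public String serverFor(String key) {
        // Owner is the first ring position at or after the key's hash,
        // wrapping around to the start of the ring if necessary.
        SortedMap<Integer, String> tail = ring.tailMap(hash(key));
        return tail.isEmpty() ? ring.firstEntry().getValue() : tail.get(tail.firstKey());
    }

    private int hash(String s) {
        // Placeholder hash; production code would use a stronger function.
        return s.hashCode() & 0x7fffffff;
    }
}
```

The clients would still need some shared view of the ring membership (e.g. via a coordination service) to all agree on where a key lives.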

I guess for an embedded cache (for Java apps only) ehcache will be better.
In all other cases memcached is probably the better choice, but that is just my opinion.

Monday, December 17, 2012

Simple program to list files sorted by size in a directory recursively

Below is a simple program that lists files in a directory recursively and sorts them by size. I have been using this tool at work for some time to detect huge files in certain directories so I can free up disk space by deleting them. Hopefully it will be useful for you as well, hence I am posting it here :) I am using the boost filesystem library to do the real work.

#include <iostream>                                            
#include <iterator>                                            
#include <queue>                                               
#include <vector>                                              
#include <map>                                                 
#include <algorithm>                                           
#include <boost/filesystem.hpp>                                

using namespace std;
using namespace boost::filesystem;

int main(int argc, char *argv[])
{                               
    queue<path> directories;    
    multimap<uintmax_t, path> filesizes;

    if (argc < 2) {
        cout << "Usage: " << argv[0] << " path\n";
        return 1;                                  
    }                                              
    path p(argv[1]);                               

    try {
        if (exists(p)){
            if (is_regular_file(p)){
                cout << file_size(p) << "  " << p << endl;
                return 0;                                 
            }

            if (!is_directory(p)) {
                return 0;
            }
            directories.push(p);
            vector <path> entries;
            path thispath;
            while (!directories.empty()) {
                thispath = directories.front();
                directories.pop();
                copy(directory_iterator(thispath), directory_iterator(), back_inserter(entries));
                for (vector <path>::iterator it = entries.begin();
                        it != entries.end(); ++it) {
                    if (!exists(*it))
                        continue;
                    if (!is_symlink(*it) && is_directory(*it)) {
                        directories.push(*it);
                    } else if (is_regular_file(*it)) {
                        uintmax_t file_siz = file_size(*it);
                        filesizes.insert(pair<uintmax_t, path>(file_siz, *it));
                    }
                }
                entries.clear();
            }
        }
        for (multimap<uintmax_t, path>::iterator it1 = filesizes.begin(), it2 = filesizes.end();
                it1 != it2; ++it1) {
            cout << it1->first << " " << it1->second << endl;
        }
        filesizes.clear();
    } catch(const filesystem_error & ex) {
        cout << ex.what() << '\n';
    }
    return 0;
}



Tuesday, December 11, 2012

Apache Zookeeper examples for distributed configuration service, distributed registry, distributed coordination etc

Apache Zookeeper is an excellent piece of software that is really helpful for achieving the following in distributed systems:

  • coordination 
  • synchronization
  • lock service
  • configuration service
  • naming registry
and so on.
Zookeeper supports high availability by running multiple instances of the service: if the server a client is connected to goes down, the client switches over to another server transparently. The great thing is that irrespective of which server a client connects to, it sees updates in the same order. It is a high-performance system and can be used for large distributed systems as well.
Zookeeper lets clients coordinate through a shared hierarchical namespace, a tree-like structure as in a normal file system. A client is connected to a single zookeeper server (as long as that server is accessible).

Please go through the Apache Zookeeper getting started guide first for how to build and configure zookeeper.
All these examples are available  in github.

Below is an example (zoo_create_node.c) showing how to create znodes (nodes in the zookeeper database). Twenty nodes will be created, with paths /testpath0, /testpath1, /testpath2, ....., /testpath19, all initialized to the same value "myvalue1". We use the zookeeper synchronous API to create the nodes: as soon as our client enters the connected state, we start creating them.

All the examples use the latest stable version of zookeeper, 3.3.6. We may use the zookeeper client programs to fetch the nodes and examine their contents. Suppose you have downloaded and extracted zookeeper to /home/yourdir/packages/zookeeper-3.3.6. After you build the C libraries, they will be available under /home/yourdir/packages/zookeeper-3.3.6/src/c/.libs/ and the command line tools under /home/yourdir/packages/zookeeper-3.3.6/src/c. The command line tools are cli_st, cli_mt and cli; they are convenient tools for examining zookeeper data.

Compile the code as shown below:
$gcc -o testzk1 zoo_create_node.c -I \
/home/yourdir/packages/zookeeper-3.3.6/src/c/include -I \
/home/yourdir/packages/zookeeper-3.3.6/src/c/generated -L \
/home/yourdir/packages/zookeeper-3.3.6/src/c/.libs/ -lzookeeper_mt
Make sure that your LD_LIBRARY_PATH includes the zookeeper C libraries before running the example. Also, before you run it, you have to configure and start the zookeeper server; please go through the zookeeper wiki for how to do that. Now run the example as shown below (assuming the zookeeper server is listening on IP 127.0.0.1 and port 22181):
./testzk1 127.0.0.1:22181
Then use one of the cli tools to examine the znodes created and their values.

#include <unistd.h>
#include <sys/select.h>
#include <sys/time.h>  
#include <time.h>      
#include <stdlib.h>    
#include <stdio.h>     
#include <string.h>    
#include <errno.h>     

#include <zookeeper.h>
static const char *hostPort;
static zhandle_t *zh;       
static clientid_t myid;     
static int connected;       

void watcher(zhandle_t *zzh, int type, int state, const char *path,
             void* context)                                        
{                                                                  
    if (type == ZOO_SESSION_EVENT) {                               
        if (state == ZOO_CONNECTED_STATE) {                        
            connected = 1;                                         
        } else if (state == ZOO_AUTH_FAILED_STATE) {               
            zookeeper_close(zzh);                                  
            exit(1);                                               
        } else if (state == ZOO_EXPIRED_SESSION_STATE) {           
            zookeeper_close(zzh);                                  
            exit(1);                                               
        }                                                          
    }                                                              
}                                                                  


int main(int argc, char *argv[])
{                               
    int rc;                     
    int fd;                     
    int interest;               
    int events;                 
    struct timeval tv;          
    fd_set rfds, wfds, efds;    

    if (argc != 2) {
        fprintf(stderr, "USAGE: %s host:port\n", argv[0]);
        exit(1);                                          
    }                                                     

    FD_ZERO(&rfds);
    FD_ZERO(&wfds);
    FD_ZERO(&efds);

    zoo_set_debug_level(ZOO_LOG_LEVEL_INFO);
    zoo_deterministic_conn_order(1);        
    hostPort = argv[1];                     
    int x = 0;                              
    zh = zookeeper_init(hostPort, watcher, 30000, &myid, 0, 0);
    if (!zh) {                                                 
        return errno;                                          
    }                                                          
    while (1) {                                                
        char mypath[255];                                      
        zookeeper_interest(zh, &fd, &interest, &tv);           
        usleep(10);                                            
        memset(mypath, 0, 255);                                
        if (connected) {                                       
            while (x < 20) {                                   
                sprintf(mypath, "/testpath%d", x);             
                usleep(10);                                    
                rc = zoo_create(zh, mypath, "myvalue1", 9, &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
                if (rc){                                                                  
                    printf("Problems %s %d\n", mypath, rc);                               
                }                                                                         
                x++;                                                                      
            }                                                                             
            connected++;
        }
        if (fd != -1) {
            if (interest&ZOOKEEPER_READ) {
                FD_SET(fd, &rfds);
            } else {
                FD_CLR(fd, &rfds);
            }
            if (interest&ZOOKEEPER_WRITE) {
                FD_SET(fd, &wfds);
            } else {
                FD_CLR(fd, &wfds);
            }
        } else {
            fd = 0;
        }
        FD_SET(0, &rfds);
        rc = select(fd+1, &rfds, &wfds, &efds, &tv);
        events = 0;
        if (rc > 0) {
            if (FD_ISSET(fd, &rfds)) {
                    events |= ZOOKEEPER_READ;
            }
            if (FD_ISSET(fd, &wfds)) {
                events |= ZOOKEEPER_WRITE;
            }
        }
        zookeeper_process(zh, events);
        if (2 == connected ) {
            // We created the nodes, so we will exit now
            zookeeper_close(zh);
            break;
        }
    }
    return 0;
}


In the next example (zoo_get_node.c) we demonstrate a simple way to read nodes from the zookeeper server. The twenty nodes with paths /testpath0, /testpath1, /testpath2, ....., /testpath19 will be examined and the values associated with them printed. We use the zookeeper synchronous API to get the node values: as soon as our client enters the connected state, we start reading them.

Compile the code as shown below:
$gcc -o testzk1 zoo_get_node.c -I \
/home/yourdir/packages/zookeeper-3.3.6/src/c/include -I \
/home/yourdir/packages/zookeeper-3.3.6/src/c/generated -L \
/home/yourdir/packages/zookeeper-3.3.6/src/c/.libs/ -lzookeeper_mt
Now run the example as shown below (assuming the zookeeper server is listening on IP 127.0.0.1 and port 22181):
./testzk1 127.0.0.1:22181

#include <unistd.h>
#include <sys/select.h>
#include <sys/time.h>  
#include <time.h>      
#include <stdlib.h>    
#include <stdio.h>     
#include <string.h>    
#include <errno.h>     

#include <zookeeper.h>
static const char *hostPort;
static zhandle_t *zh;       
static clientid_t myid;     
static int connected;       

void watcher(zhandle_t *zzh, int type, int state, const char *path,
             void* context)                                        
{                                                                  
    if (type == ZOO_SESSION_EVENT) {                               
        if (state == ZOO_CONNECTED_STATE) {                        
            connected = 1;                                         
        } else if (state == ZOO_AUTH_FAILED_STATE) {               
            zookeeper_close(zzh);                                  
            exit(1);                                               
        } else if (state == ZOO_EXPIRED_SESSION_STATE) {           
            zookeeper_close(zzh);                                  
            exit(1);                                               
        }                                                          
    }                                                              
}                                                                  

void watcherforwget(zhandle_t *zzh, int type, int state, const char *path,
             void* context)                                               
{                                                                         
    char *p = (char *)context;                                            
    if (type == ZOO_SESSION_EVENT) {                                      
        if (state == ZOO_CONNECTED_STATE) {                               
            connected = 1;                                                
        } else if (state == ZOO_AUTH_FAILED_STATE) {                      
            zookeeper_close(zzh);                                         
            exit(1);                                                      
        } else if (state == ZOO_EXPIRED_SESSION_STATE) {                  
            zookeeper_close(zzh);                                         
            exit(1);                                                      
        }                                                                 
    }                                                                     
    printf("Watcher context %s\n", p);                                    
}                                                                         

int main(int argc, char *argv[])
{                               
    int rc;                     
    int fd;                     
    int interest;               
    int events;                 
    struct timeval tv;          
    fd_set rfds, wfds, efds;    

    if (argc != 2) {
        fprintf(stderr, "USAGE: %s host:port\n", argv[0]);
        exit(1);                                          
    }                                                     

    FD_ZERO(&rfds);
    FD_ZERO(&wfds);
    FD_ZERO(&efds);

    zoo_set_debug_level(ZOO_LOG_LEVEL_INFO);
    zoo_deterministic_conn_order(1);        
    hostPort = argv[1];                     
    int x = 0;                              
    zh = zookeeper_init(hostPort, watcher, 30000, &myid, 0, 0);
    if (!zh) {                                                 
        return errno;                                          
    }                                                          
    while (1) {                                                
        char mypath[255];                                      
        char buffer[255];                                      
        struct Stat st;                                        
        zookeeper_interest(zh, &fd, &interest, &tv);           
        usleep(10);                                            
        memset(mypath, 0, 255);                                
        memset(buffer, 0, 255);                                
        if (connected) {                                       
            char mycontext[] = "This is context data for test";
            int len = 254;                                     
            while (x < 20) {                                   
                sprintf(mypath, "/testpath%d", x);             
                usleep(10);                                    
                rc = zoo_wget(zh, mypath, watcherforwget , mycontext, buffer, &len, &st);
                if (ZOK != rc){                                                          
                    printf("Problems %s %d\n", mypath, rc);                              
                } else if (len >= 0) {                                                   
                   buffer[len] = 0;                                                      
                   printf("Path: %s Data: %s\n", mypath, buffer);                        
                }                                                                        
                x++;                                                                     
                len = 254;                                                               
            }                                                                            
            connected++;
        }
        if (fd != -1) {
            if (interest&ZOOKEEPER_READ) {
                FD_SET(fd, &rfds);
            } else {
                FD_CLR(fd, &rfds);
            }
            if (interest&ZOOKEEPER_WRITE) {
                FD_SET(fd, &wfds);
            } else {
                FD_CLR(fd, &wfds);
            }
        } else {
            fd = 0;
        }
        FD_SET(0, &rfds);
        rc = select(fd+1, &rfds, &wfds, &efds, &tv);
        events = 0;
        if (rc > 0) {
            if (FD_ISSET(fd, &rfds)) {
                    events |= ZOOKEEPER_READ;
            }
            if (FD_ISSET(fd, &wfds)) {
                events |= ZOOKEEPER_WRITE;
            }
        }
        zookeeper_process(zh, events);
        if (2 == connected ) {
            // We created the nodes, so we will exit now
            zookeeper_close(zh);
            break;
        }
    }
    return 0;
}

In this example below (zoo_data_watches.c) we demonstrate a simple way to watch nodes in the zookeeper
server. Twenty nodes with paths /testpath0, /testpath1, /testpath2, /testpath3, .....,
/testpath19 will be watched for any changes in their values.
We will use the zookeeper synchronous API to watch the nodes. As soon
as our client enters the connected state, we start putting watches on the nodes.

Compile the code as shown below:
$gcc -o testzk1 zoo_data_watches.c -I \
/home/yourdir/packages/zookeeper-3.3.6/src/c/include -I \
/home/yourdir/packages/zookeeper-3.3.6/src/c/generated -L \
/home/yourdir/packages/zookeeper-3.3.6/src/c/.libs/ -lzookeeper_mt

Now you run the example as shown below:
./testzk1 127.0.0.1:22181  # Assuming the zookeeper server is listening on port
22181 and IP 127.0.0.1

#include <unistd.h>
#include <sys/select.h>
#include <sys/time.h>  
#include <time.h>      
#include <stdlib.h>    
#include <stdio.h>     
#include <string.h>    
#include <errno.h>     

#include <zookeeper.h>
static const char *hostPort;
static zhandle_t *zh;       
static clientid_t myid;     
static int connected;       
static char mycontext[] = "This is context data for test";

void watcher(zhandle_t *zzh, int type, int state, const char *path,
             void* context)                                        
{                                                                  
    if (type == ZOO_SESSION_EVENT) {                               
        if (state == ZOO_CONNECTED_STATE) {                        
            connected = 1;                                         
        } else if (state == ZOO_AUTH_FAILED_STATE) {               
            zookeeper_close(zzh);                                  
            exit(1);                                               
        } else if (state == ZOO_EXPIRED_SESSION_STATE) {           
            zookeeper_close(zzh);                                  
            exit(1);                                               
        }                                                          
    }                                                              
}                                                                  

void watcherforwget(zhandle_t *zzh, int type, int state, const char *path,
             void* context)                                               
{                                                                         
    char buffer[255];                                                     
    int len, rc;                                                          
    struct Stat st;                                                       
    char *p = (char *)context;                                            
    if (type == ZOO_SESSION_EVENT) {                                      
        if (state == ZOO_CONNECTED_STATE) {                               
            return;                                                       
        } else if (state == ZOO_AUTH_FAILED_STATE) {                      
            zookeeper_close(zzh);                                         
            exit(1);                                                      
        } else if (state == ZOO_EXPIRED_SESSION_STATE) {                  
            zookeeper_close(zzh);                                         
            exit(1);                                                      
        }                                                                 
    } else if (type == ZOO_CHANGED_EVENT) {                               
        printf("Data changed for %s \n", path);                           
        len = 254;                                                        
        //get the changed data and set a watch again
        rc = zoo_wget(zh, path, watcherforwget , mycontext, buffer, &len, &st);
        if (ZOK != rc){                                                        
            printf("Problems %s %d\n", path, rc);                              
        } else if (len >= 0) {                                                 
           buffer[len] = 0;                                                    
           printf("Path: %s changed data: %s\n", path, buffer);                
        }                                                                      
    }                                                                          

    printf("Watcher context %s\n", p);
}                                     

int main(int argc, char *argv[])
{                               
    int rc;                     
    int fd;                     
    int interest;               
    int events;                 
    struct timeval tv;          
    fd_set rfds, wfds, efds;    

    if (argc != 2) {
        fprintf(stderr, "USAGE: %s host:port\n", argv[0]);
        exit(1);                                          
    }                                                     

    FD_ZERO(&rfds);
    FD_ZERO(&wfds);
    FD_ZERO(&efds);

    zoo_set_debug_level(ZOO_LOG_LEVEL_INFO);
    zoo_deterministic_conn_order(1);        
    hostPort = argv[1];                     
    int x = 0;                              
    zh = zookeeper_init(hostPort, watcher, 30000, &myid, 0, 0);
    if (!zh) {                                                 
        return errno;                                          
    }                                                          
    while (1) {                                                
        char mypath[255];                                      
        char buffer[255];                                      
        struct Stat st;                                        
        zookeeper_interest(zh, &fd, &interest, &tv);           
        usleep(10);                                            
        memset(mypath, 0, 255);                                
        memset(buffer, 0, 255);                                
        if (connected) {                                       
            //Put the watches for the nodes                    
            int len = 254;                                     
            while (x < 20) {                                   
                sprintf(mypath, "/testpath%d", x);             
                usleep(10);                                    
                rc = zoo_wget(zh, mypath, watcherforwget , mycontext, buffer, &len, &st);
                if (ZOK != rc){                                                          
                    printf("Problems %s %d\n", mypath, rc);                              
                } else if (len >= 0) {                                                   
                   buffer[len] = 0;                                                      
                   printf("Path: %s Data: %s\n", mypath, buffer);
                }
                x++;
                len = 254;
            }
            connected++;
        }
        if (fd != -1) {
            if (interest&ZOOKEEPER_READ) {
                FD_SET(fd, &rfds);
            } else {
                FD_CLR(fd, &rfds);
            }
            if (interest&ZOOKEEPER_WRITE) {
                FD_SET(fd, &wfds);
            } else {
                FD_CLR(fd, &wfds);
            }
        } else {
            fd = 0;
        }
        FD_SET(0, &rfds);
        rc = select(fd+1, &rfds, &wfds, &efds, &tv);
        events = 0;
        if (rc > 0) {
            if (FD_ISSET(fd, &rfds)) {
                    events |= ZOOKEEPER_READ;
            }
            if (FD_ISSET(fd, &wfds)) {
                events |= ZOOKEEPER_WRITE;
            }
        }
        zookeeper_process(zh, events);
    }
    return 0;
}


In this example below (zoo_exist_watch.c) we demonstrate a simple way to watch for the appearance of a node
in the server. It will also watch whether the node is deleted after it was created.

In this program we will watch for the appearance and deletion of the node
"/testforappearance". Once we create the node from another program, the watch
event will be sent to the client and the watcher routine will be called.

Compile the code as shown below:
$gcc -o testzk1 zoo_exist_watch.c -I \
/home/yourdir/packages/zookeeper-3.3.6/src/c/include -I \
/home/yourdir/packages/zookeeper-3.3.6/src/c/generated -L \
/home/yourdir/packages/zookeeper-3.3.6/src/c/.libs/ -lzookeeper_mt

Now you run the example as shown below:
./testzk1 127.0.0.1:22181  # Assuming the zookeeper server is listening on port 22181 and
IP 127.0.0.1

#include <unistd.h>
#include <sys/select.h>
#include <sys/time.h> 
#include <time.h>     
#include <stdlib.h>   
#include <stdio.h>    
#include <string.h>   
#include <errno.h>    

#include <zookeeper.h>
static const char *hostPort;
static zhandle_t *zh;      
static clientid_t myid;    
static int connected;      
static char mycontext[] = "This is context data for test";

void watcher(zhandle_t *zzh, int type, int state, const char *path,
             void* context)                                       
{                                                                 
    if (type == ZOO_SESSION_EVENT) {                              
        if (state == ZOO_CONNECTED_STATE) {                       
            connected = 1;                                        
        } else if (state == ZOO_AUTH_FAILED_STATE) {              
            zookeeper_close(zzh);                                 
            exit(1);                                              
        } else if (state == ZOO_EXPIRED_SESSION_STATE) {          
            zookeeper_close(zzh);                                 
            exit(1);                                              
        }                                                         
    }                                                             
}                                                                 

void watchexistence(zhandle_t *zzh, int type, int state, const char *path,
             void* context)                                              
{                                                                        
    static struct Stat st;                                               
    int rc;                                                              

    if (type == ZOO_SESSION_EVENT) {
        if (state == ZOO_CONNECTED_STATE) {
            return;                       
        } else if (state == ZOO_AUTH_FAILED_STATE) {
            zookeeper_close(zzh);                  
            exit(1);                               
        } else if (state == ZOO_EXPIRED_SESSION_STATE) {
            zookeeper_close(zzh);                      
            exit(1);                                   
        }                                              
    } else if (type == ZOO_CREATED_EVENT) {            
        printf("Node appeared %s, now Let us watch for its delete \n", path);
        rc = zoo_wexists(zh, path,                                          
                watchexistence , mycontext, &st);                           
        if (ZOK != rc){                                                     
            printf("Problems  %d\n", rc);                                   
        }                                                                   
    } else if (type == ZOO_DELETED_EVENT) {                                 
        printf("Node deleted %s, now Let us watch for its creation \n", path);
        rc = zoo_wexists(zh, path,                                           
                watchexistence , mycontext, &st);                            
        if (ZOK != rc){                                                      
            printf("Problems  %d\n", rc);                                    
        }                                                                    
    }                                                                        
}                                                                            

int main(int argc, char *argv[])
{                              
    int rc;                    
    int fd;                    
    int interest;              
    int events;                
    struct timeval tv;         
    fd_set rfds, wfds, efds;   

    if (argc != 2) {
        fprintf(stderr, "USAGE: %s host:port\n", argv[0]);
        exit(1);                                         
    }                                                    

    FD_ZERO(&rfds);
    FD_ZERO(&wfds);
    FD_ZERO(&efds);

    zoo_set_debug_level(ZOO_LOG_LEVEL_INFO);
    zoo_deterministic_conn_order(1);       
    hostPort = argv[1];                    

    zh = zookeeper_init(hostPort, watcher, 30000, &myid, 0, 0);
    if (!zh) {                                                
        return errno;                                         
    }                                                         

    while (1) {
        static struct Stat st;

        zookeeper_interest(zh, &fd, &interest, &tv);
        usleep(10);                                
        if (connected == 1) {                      
            // watch existence of the node         
            usleep(10);                            
            rc = zoo_wexists(zh, "/testforappearance",
                    watchexistence , mycontext, &st);
            if (ZOK != rc){
                printf("Problems  %d\n", rc);
            }
            connected++;
        }
        if (fd != -1) {
            if (interest & ZOOKEEPER_READ) {
                FD_SET(fd, &rfds);
            } else {
                FD_CLR(fd, &rfds);
            }
            if (interest & ZOOKEEPER_WRITE) {
                FD_SET(fd, &wfds);
            } else {
                FD_CLR(fd, &wfds);
            }
        } else {
            fd = 0;
        }
        FD_SET(0, &rfds);
        rc = select(fd+1, &rfds, &wfds, &efds, &tv);
        events = 0;
        if (rc > 0) {
            if (FD_ISSET(fd, &rfds)) {
                    events |= ZOOKEEPER_READ;
            }
            if (FD_ISSET(fd, &wfds)) {
                events |= ZOOKEEPER_WRITE;
            }
        }
        zookeeper_process(zh, events);
    }
    return 0;
}


In this example below (zoo_children_watch.c) we demonstrate a simple way to monitor the appearance of child znodes under a path
in the server. We will check the znode /testpath1 for its children and examine whether any child is
added or deleted under this path.

Compile the code as shown below:
$gcc -o testzk1 zoo_children_watch.c -I \
/home/yourdir/packages/zookeeper-3.3.6/src/c/include -I \
/home/yourdir/packages/zookeeper-3.3.6/src/c/generated -L \
/home/yourdir/packages/zookeeper-3.3.6/src/c/.libs/ -lzookeeper_mt

Make sure that your LD_LIBRARY_PATH includes the zookeeper C libraries to run the example. Before
you run the example, you have to configure and run the zookeeper server; please go through the
zookeeper wiki for how to do that. Now you run the example as shown below:
./testzk1 127.0.0.1:22181 # Assuming the zookeeper server is listening on port 22181 and IP 127.0.0.1

#include <unistd.h>
#include <sys/select.h>
#include <sys/time.h>  
#include <time.h>      
#include <stdlib.h>    
#include <stdio.h>     
#include <string.h>    
#include <errno.h>     

#include <zookeeper.h>
static const char *hostPort;
static zhandle_t *zh;       
static clientid_t myid;     
static int connected;       
static char mycontext[] = "This is context data for test";

void watcher(zhandle_t *zzh, int type, int state, const char *path,
             void* context)                                        
{                                                                  
    if (type == ZOO_SESSION_EVENT) {                               
        if (state == ZOO_CONNECTED_STATE) {                        
            connected = 1;                                         
        } else if (state == ZOO_AUTH_FAILED_STATE) {               
            zookeeper_close(zzh);                                  
            exit(1);                                               
        } else if (state == ZOO_EXPIRED_SESSION_STATE) {           
            zookeeper_close(zzh);                                  
            exit(1);                                               
        }                                                          
    }                                                              
}                                                                  

void watchchildren(zhandle_t *zzh, int type, int state, const char *path,
             void* context)                                              
{                                                                        
    struct String_vector str;                                            
    int rc;                                                              

    printf("The event path %s, event type %d\n", path, type);
    if (type == ZOO_SESSION_EVENT) {                         
        if (state == ZOO_CONNECTED_STATE) {                  
            return;                                          
        } else if (state == ZOO_AUTH_FAILED_STATE) {         
            zookeeper_close(zzh);                            
            exit(1);                                         
        } else if (state == ZOO_EXPIRED_SESSION_STATE) {     
            zookeeper_close(zzh);                            
            exit(1);                                         
        }                                                    
    }                                                        
    // Put the watch again                                   
    rc = zoo_wget_children(zh, "/testpath1",                 
            watchchildren , mycontext, &str);                
    if (ZOK != rc){                                          
        printf("Problems  %d\n", rc);                        
    } else {                                                 
        int i = 0;                                           
        while (i < str.count) {                              
            printf("Children %s\n", str.data[i++]);          
        }                                                    
        if (str.count) {                                     
            deallocate_String_vector(&str);                  
        }                                                    
    }                                                        
}                                                            

int main(int argc, char *argv[])
{                               
    int rc;                     
    int fd;                     
    int interest;               
    int events;                 
    struct timeval tv;          
    fd_set rfds, wfds, efds;    

    if (argc != 2) {
        fprintf(stderr, "USAGE: %s host:port\n", argv[0]);
        exit(1);                                          
    }                                                     

    FD_ZERO(&rfds);
    FD_ZERO(&wfds);
    FD_ZERO(&efds);

    zoo_set_debug_level(ZOO_LOG_LEVEL_INFO);
    zoo_deterministic_conn_order(1);        
    hostPort = argv[1];                     

    zh = zookeeper_init(hostPort, watcher, 30000, &myid, 0, 0);
    if (!zh) {                                                 
        return errno;                                          
    }                                                          

    while (1) {
        zookeeper_interest(zh, &fd, &interest, &tv);
        usleep(10);                                 
        if (connected == 1) {                       
            struct String_vector str;               

            usleep(10);
            // watch existence of the node
            rc = zoo_wget_children(zh, "/testpath1", 
                    watchchildren , mycontext, &str);
            if (ZOK != rc){                          
                printf("Problems  %d\n", rc);        
            } else {                                 
                int i = 0;                           
                while (i < str.count) {              
                    printf("Children %s\n", str.data[i++]);
                }
                if (str.count) {
                    deallocate_String_vector(&str);
                }
            }
            connected++;
        }
        if (fd != -1) {
            if (interest & ZOOKEEPER_READ) {
                FD_SET(fd, &rfds);
            } else {
                FD_CLR(fd, &rfds);
            }
            if (interest & ZOOKEEPER_WRITE) {
                FD_SET(fd, &wfds);
            } else {
                FD_CLR(fd, &wfds);
            }
        } else {
            fd = 0;
        }
        FD_SET(0, &rfds);
        rc = select(fd+1, &rfds, &wfds, &efds, &tv);
        events = 0;
        if (rc > 0) {
            if (FD_ISSET(fd, &rfds)) {
                    events |= ZOOKEEPER_READ;
            }
            if (FD_ISSET(fd, &wfds)) {
                events |= ZOOKEEPER_WRITE;
            }
        }
        zookeeper_process(zh, events);
    }
    return 0;
}


Monday, November 12, 2012

Print all the permutations of the characters in a word

An interesting computer programming problem is to print all the permutations of the characters in a word.

How can we do that? Below is a simple approach:

Suppose your word has 'n' chars,
then the first letter of your word can have n options,
the second n-1 options,
the third n-2 options,
and so on....
the last will have just one option.
The trick is to add the options successively to the prefix, send
the suffix (i.e. the options for the remaining positions) to the function,
and call the function recursively to print the permutations.


Below is the code for printing the permutations of a word. Here I am
not taking into account the case of repeated letters, which results
in printing duplicates. More thought is needed for avoiding duplicates
efficiently :). I don't want to use a set to store the words and print them
at the end; I want an algorithmic way out to solve the issue. Also, in this example the maximum word size is taken as 26, but that can be easily changed; just replace 26 with a bigger number :)




// Below is the code for printing the permutations of a word. Here I am
// not taking into account the case of repeated letters, which results
// in printing duplicates. More thought is needed for avoiding duplicates
// efficiently. I don't want to use a set to store the words and print
// them at the end. I want an algorithmic way out to solve the issue.
//
// How does it work?
// Suppose your word has 'n' chars,
// then the first letter of your word can have n options,
// the second n-1 options,
// the third n-2 options,
// and so on....
// the last will have just one option.
// The trick is to add the options successively to the prefix, send
// the suffix (i.e. the options for the remaining positions) to the function,
// and call print_permutations_word recursively.
//

#include <stdio.h>
#include <string.h>

int print_permutations_word(const char *prefix, const char *suffix, int suffixlen)
{
    char myprefix[26] = "";
    char mysuffix[26] = "";
    int i = 0, j = 0, k = 0;
    if (suffixlen == 0) {
        printf("word %s\n", prefix);
        return 0;
    }

    while (i < suffixlen) {
        memset(myprefix, 0, 26);
        memset(mysuffix, 0, 26);
        snprintf(myprefix,26,"%s%c", prefix,suffix[i]);
        j = 0;
        k = 0;
        while (j < suffixlen) {
           if (i != j){
                mysuffix[k++] = suffix[j];
           }
           j++;
        }
        i++;
        print_permutations_word(myprefix, mysuffix, suffixlen - 1);
    }
    return 0;

}

#if 0
//example run
int main()
{
print_permutations_word("", "abcde", 5);
return 0;
}
#endif


Monday, November 5, 2012

Apache Thrift File based transport

Today I will demonstrate how we can use the Apache Thrift file transport (TFileTransport). Thrift file based transport is useful when we want to store messages in local files if the remote server is down; we can replay the files when the remote server comes up again. If we want to implement a Thrift based messaging system, this feature is also useful for a "store and forward" approach.

Below is the Thrift interface definition used in this example:

exception BadOperation {
    1: i32 what,
    2: optional string reason
}

enum Operation {
    CREATE,
    DELETE,
    FILECONTENT,
    DIRCONTENT
}

struct Work {
    1: Operation op
    2: string filename
    3: string data
    4: string rootdir
}

service FileService {
    i32 createFile(1:Work w) throws (1:BadOperation badop),
    list<string> getFiles(1:Work w) throws (1:BadOperation badop)
}

Please refer to my previous post, or Google, for how to use the Thrift compiler to generate code for handling Thrift messages following the above definitions. My example will be in C++, but a PHP, Java or Python example would look very similar. The generated code has a file FileService.cpp. Examining the code shows that the createFile interface calls two member functions of the FileServiceClient class: send_createFile and recv_createFile. send_createFile actually serializes our data and sends it over the socket in a typical client-server scenario. recv_createFile processes the response back from the server. When we are using file based transport to "send" data, there won't be any response, so we don't need to call recv_createFile. Instead we just call send_createFile to "send", i.e. write, the messages to the file. This is the trick. The same applies to writing "getFiles" messages to the file as well. We could make this much more efficient by serializing and batching the writes to the file ourselves, but I am not showing that here, as my purpose is to demonstrate the basic operations of file based transport.

Below I have pasted my code for "writing" messages to the file (file_writer.cpp):
#include <stdlib.h>
#include <time.h>  
#include <iostream>
#include <sstream> 
#include <protocol/TBinaryProtocol.h>
#include <transport/TSocket.h>       
#include <transport/TTransportUtils.h>

#include "FileService.h"

using std::cout;
using std::endl;
using std::stringstream;
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;

using namespace FileHandler;

using namespace boost;

int main(int argc, char** argv) {
    shared_ptr<TTransport> file(new TFileTransport("testfiletransport"));
    shared_ptr<TTransport> transport(new TBufferedTransport(file));      
    shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));      
    FileServiceClient client(protocol);                                  
    try {                                                                
        int i = 0;                                                       
        stringstream ss (stringstream::in | stringstream::out);          
        Work work;
        work.data = "mydata";
        work.rootdir = "/home/nipun/test";

        try {
            while (i++ < 100) {
                work.op = Operation::CREATE;
                ss << "filename" << i ;
                work.filename = ss.str();
                ss.clear();
                ss.str("");
                ss << "data" << rand() << "-" << rand() << time(0);
                work.data = ss.str();
                ss.clear();
                ss.str("");
                client.send_createFile(work); // write only, no recv_createFile

                work.op = Operation::DIRCONTENT;
                ss << "filename" << i  << rand();
                work.filename = ss.str();
                ss.clear();
                ss.str("");
                ss << "data" << rand() << "-" << rand() << time(0) << rand();
                work.data = ss.str();
                ss.clear();
                ss.str("");
                client.send_getFiles(work);
            }
            // Flush the buffered transport so all messages reach the file.
            transport->flush();
        } catch (BadOperation &op) {
            cout << "Exception "  <<  op.reason  << endl;
        }

    } catch (TException &tx) {
        cout <<  "ERROR: " << tx.what() << endl;
    }
}


Here we wrote "createFile" and "getFiles" messages 100 times each to the file "testfiletransport" in the current directory, using the send_createFile and send_getFiles routines. Remember that we must use exactly the same transport and protocol stack when reading the file back. TBinaryProtocol specifies how the data is serialized, and TBufferedTransport buffers data before it is flushed to the underlying transport, which here is the file "testfiletransport".

Now it is time to read the data back from the file. Generally we write the data to a file and read it later, when we want to replay the messages. In such cases the messages are read and sent over another transport, for example a TSocket (the socket-based transport class in the Thrift library). In this example, though, we will just read the messages and print them to standard output.
 

Code for file_reader.cpp

#include <stdlib.h>
#include <time.h>  
#include <iostream>
#include <sstream> 
#include <string>  
#include <protocol/TBinaryProtocol.h>
#include <transport/TSocket.h>       
#include <transport/TTransportUtils.h>

#include "FileService.h"

using std::cout;
using std::endl;
using std::stringstream;
using std::string;      
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;

using namespace FileHandler;

using namespace boost;

int main(int argc, char** argv) {
    // Open the same file in read-only mode (the second constructor argument),
    // with the same buffered transport and binary protocol the writer used.
    shared_ptr<TTransport> file(new TFileTransport("testfiletransport", true));
    shared_ptr<TTransport> transport(new TBufferedTransport(file));
    shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
    FileServiceClient client(protocol);
    string fname;
    TMessageType mtype;
    int32_t seqid;
    Work w;
    try {
        while (true) {
            protocol->readMessageBegin(fname, mtype, seqid);
            cout << fname << endl;
            if (fname == "createFile") {
                FileService_createFile_args args;
                args.read(protocol.get());
                protocol->readMessageEnd();
                w = args.w;
            } else if (fname == "getFiles") {
                FileService_getFiles_args args;
                args.read(protocol.get());
                protocol->readMessageEnd();
                w = args.w;
            }
            cout <<"\tData:\t" << w.data << endl;
            cout <<"\tFilename:\t" << w.filename << endl;
            cout <<"\tRootdir:\t" << w.rootdir << endl;
            cout <<"\tOperation:\t" << w.op << endl;

        }
    } catch (TTransportException& e) {
        if (e.getType() == TTransportException::END_OF_FILE) {
            cout << "\n\n\t\tRead All Data successfully\n";
        }
    } catch (TException &tx) {
        cout << "ERROR " <<  tx.what() << endl;
    }
}


In the file_reader.cpp example, we first read the message header using the readMessageBegin interface of the binary protocol. We then use the "fname" (function name) field to decide whether the message is a "createFile" or a "getFiles" message, and use the appropriate FileService args class to read the actual message.
After that we just print the message on our terminal :)

I hope this will be useful for you, and if so, please don't forget to leave a comment here :)

In my next post, I will show how we may build a simple messaging system using Apache Thrift. Hopefully you will visit again.