Sunday, January 5, 2014

Java Event Bus with Guava

Sometimes we need to different modules running in a Java process to communicate with each other. Especially threads will communicate or pass messages between them. IPC is never a good option in such scenarios. Message queue within the same process is even less appealing than a linked list.
Also, coding the stuffs ourselves tedious and we should look around to see if somebody already did that and published the same.

Google Guava library is one of the most popular Java library. This also includes the "EventBus" module.  EventBus provides publish-subscribe-style communication between components without requiring the components to explicitly register with one another. Here the publisher and subscribers communicate via the highway, the shared event bus.
The working is simple. We create an EventBus, the subscribers register themselves to the event bus, and the publisher publish or post the "messages" to the event bus. The event bus takes care of delivering the messages to the  appropriate subscribers. There are synchronous and asynchronous  event buses. In synchronous event bus, when a publisher posts a "message",  it will not return till all the subscribers receive the message. In asynchronous event bus, the subscribers are invoked in different threads.

Now let us look at a simple example. There are many publishers who keep publishing the delta of some counters while there is an aggregator which receives the counters and add the delta (It may persist them in database or file etc.)

As the counter publishers and the aggregator will communicate with a common event bus (may be asynchronous or synchronous depending our need, but in most real world cases we generally have to go with asynchronous bus).

Below is the asynchronus event bus provider module:

package geet.example.eventbus;
import java.util.concurrent.Executors;
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;

public class CounterAsyncEventBus{
 private static EventBus bus = new AsyncEventBus(Executors.newFixedThreadPool(1));
 
 public static EventBus getEventBus(){
  return bus;
 }
}


If we want a synchronous event bus, below is the provider:
package geet.example.eventbus;

import com.google.common.eventbus.EventBus;

public class CounterEventBus{
 private static EventBus bus = new EventBus();
 
 public static EventBus getEventBus(){
  return bus;
 }
}



Below is the Counter class, complex :)
package geet.example.eventbus;

public class Counter {
 private String counterName = null;
 private Integer counterDelta = null;
 public Counter(String counterName, Integer counterDelta) {
  this.counterName = counterName;
  this.counterDelta = counterDelta;
 }
 public String getCounterName() {
  return counterName;
 }
 public void setCounterName(String counterName) {
  this.counterName = counterName;
 }
 public Integer getCounterDelta() {
  return counterDelta;
 }
 public void setCounterDelta(Integer counterDelta) {
  this.counterDelta = counterDelta;
 }
}




Counter Aggregator code is below. The counter aggregator object gets an instance of the common event bus and then registers itself to the event bus. The constructor also takes a boolean flag as an argument which acts an indicator whether to use synchronous or the asynchronous event bus. Also, note the "@Subscribe" annotation which tells that if publisher posts a "Counter", the method "addCount" has to be invoked.

package geet.example.eventbus;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;

public class CounterAggregator {
 private ConcurrentHashMap<String, AtomicInteger> counters = null;
 private EventBus bus = null;

 public CounterAggregator(boolean async) {
  counters = new ConcurrentHashMap<String, AtomicInteger>();
  if (async)
   bus = CounterAsyncEventBus.getEventBus();
  else
   bus = CounterEventBus.getEventBus();
  bus.register(this);
 }

 @Subscribe
 @AllowConcurrentEvents
 public void addCount(Counter counter) {
  Integer val = counter.getCounterDelta();
  String counterName = counter.getCounterName();

  AtomicInteger oldval = counters.putIfAbsent(counterName,
    new AtomicInteger(val));
  if (oldval != null) {
   oldval.addAndGet(val);
  }
  System.out.println(Thread.currentThread().getName()
    + " Value for the counter " + counterName + "="
    + counters.get(counterName));

 }
}




Now below is the code for counter generators. Here also counter generators get the asynchronous or synchronous event bus instance. The "publish" method internally posts Counter objects to the event bus which will be consumed by the subscriber.

package geet.example.eventbus;

import com.google.common.eventbus.EventBus;
public class CounterGenerator {
 private EventBus bus = null;

 public CounterGenerator(boolean async) {
  if (async)
   bus = CounterAsyncEventBus.getEventBus();
  else
   bus = CounterEventBus.getEventBus();
 }

 public void publish(String name, int val) {
  bus.post(new Counter(name, val));
 }

}


Now we have everything set for the event bus demo application. The code is given below. In this example I am showing how to work with asynchronous event bus, but this can easily modified to use synchronous event bus also. It creates a counter generator, a counter aggregator. Counter generator publishes some delta for the counters c1,c2,c3 and c4. The aggregator keeps accumulating the counts. Bottom line is, the event bus elegantly handles the routing of the messages internally saving us from writing a lot of tedious code and reinventing another wheel :)

package geet.example.eventbus;

import java.util.Random;

public class Example {
 public static void main(String[] args) {
  String []counterNames = {"c1", "c2", "c3", "c4" };
  Random random = new Random();
  @SuppressWarnings("unused")
  CounterAggregator aggr = new CounterAggregator(true);
  CounterGenerator gen = new CounterGenerator(true);
  int i = 0;
  while (i++ < 10000){
   gen.publish(counterNames[i & 3], random.nextInt(50000));
  }
 }
}

You may access the all the samples from this github link