Thursday, September 4, 2014

Let's ZeroMQ

ZeroMQ is a high-performance networking and messaging library. It can carry messages over TCP, multicast, in-process, and inter-process transports. It provides messaging patterns such as request-reply, publish-subscribe, and push-pull. Using these patterns as building blocks, complex messaging systems and execution pipelines can be built. The library is surprisingly easy to use, and yet it delivers lightning-fast performance.

In this blog, I will describe how to use ZeroMQ with some very simple examples. Then I will show how these building blocks can be assembled into complex systems. At the end, I will give a brief overview of how ZeroMQ works internally.

The examples are in C++, but they can be mapped easily to other languages; ZeroMQ has bindings for more than 40 languages. I will publish Python implementations of the same shortly in my GitHub repository.


The built-in ZeroMQ messaging patterns include Request-Reply, Publish-Subscribe, and Push-Pull.
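Before diving into Request-Reply, here is a minimal taste of Publish-Subscribe, just to show how little code a pattern takes. This is only a sketch of my own, not one of the examples this post builds on: both ends live in one process for brevity, the port 5563 is arbitrary, and the sleep gives the subscription time to propagate before the first publish.

pubsub_minimal.cpp

#include <zmq.hpp>
#include <cstring>
#include <iostream>
#include <unistd.h>

int main ()
{
  zmq::context_t context (1);
  // PUB socket broadcasts; SUB sockets receive what they subscribed to.
  zmq::socket_t publisher (context, ZMQ_PUB);
  publisher.bind ("tcp://*:5563");
  zmq::socket_t subscriber (context, ZMQ_SUB);
  subscriber.connect ("tcp://127.0.0.1:5563");
  subscriber.setsockopt (ZMQ_SUBSCRIBE, "", 0);  // empty prefix: subscribe to everything
  sleep (1);  // let the subscription reach the publisher (the "slow joiner" issue)
  zmq::message_t update (6);
  memcpy (update.data (), "Hello", 6);
  publisher.send (update);
  zmq::message_t received;
  subscriber.recv (&received);
  std::cout << "Got: " << (char *) received.data () << std::endl;
  return 0;
}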

Request-Reply

This pattern connects a set of clients to a set of servers. Requests from the clients get distributed across the servers, and each client receives its reply from whichever server handled its request.

A simple example is given below. The client keeps calling send and recv in a loop, while the server keeps calling recv and send in its loop. The client sends a request; the server receives and processes the request and sends the response back to the client.



server.cpp

#include <zmq.hpp>
#include <cstring>
#include <string>
#include <iostream>

int main ()
{
  zmq::context_t context (1);
  // REP socket: receives a request, then must send a reply.
  zmq::socket_t socket (context, ZMQ_REP);
  socket.bind ("tcp://*:5555");
  int i = 0;
  while (true){
      zmq::message_t request;
      socket.recv (&request);
      std::cout << "Received " << (char *)request.data() << " #times: " << i++ << std::endl;
      zmq::message_t reply (6);
      memcpy (reply.data (), "World", 6);  // 6 bytes: "World" plus the terminating NUL
      socket.send (reply);
  }
  socket.close();
  context.close();
  return 0;
}

client.cpp

#include <zmq.hpp>
#include <cstring>
#include <string>
#include <iostream>

int main ()
{
  zmq::context_t context (1);
  // REQ socket: sends a request, then must receive a reply.
  zmq::socket_t socket (context, ZMQ_REQ);
  socket.connect("tcp://127.0.0.1:5555");
  int i = 0;
  while (true){
      zmq::message_t request (6);
      memcpy (request.data (), "Hello", 6);  // 6 bytes: "Hello" plus the terminating NUL
      socket.send (request);
      zmq::message_t reply;
      socket.recv (&reply);
      std::cout << "Received " << (char *)reply.data()
          << " #times " << i++ << std::endl;
  }
  socket.close();
  context.close();
  return 0;
}

Here the server waits for requests in an infinite loop. Whenever it receives a request, it processes it and sends back an appropriate response. In this example, the response is always the same string, "World"!
The client keeps sending requests in an infinite loop, and it always sends the same request, "Hello"!
Both the client and the server create a ZeroMQ context.
The server creates a socket of type ZMQ_REP using the context and binds the socket to the transport "tcp://*:5555". This effectively binds the socket to all of the server's IP addresses on port 5555. Now the server can receive requests from any number of clients.
The client connects to the server over the TCP transport "tcp://127.0.0.1:5555". Now it can send requests to the server and receive responses.

Now let us see how a set of clients can communicate with a set of servers to get work done and receive the status of that work. This is distributed RPC, where a set of servers serves a set of clients. Each client connects to all the servers, and its requests are distributed among them in a round-robin manner. The application code does the bare minimum while the ZeroMQ library handles everything transparently.



Slightly modified code is below:
server_arg_port.cpp

#include <zmq.hpp>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <string>
#include <iostream>

using std::cerr;
using std::cout;
using std::endl;

int main (int argc, char *argv[])
{
  if (argc != 2) {
    cerr << "Usage: " << argv[0] << " bind-port" << endl;
    exit(1);
  }

  char transport[255] = "";
  zmq::context_t context (1);
  zmq::socket_t socket (context, ZMQ_REP);
  snprintf(transport, sizeof(transport), "tcp://*:%s", argv[1]);
  socket.bind (transport);

  char response[2048];
  memset(response, 0, sizeof(response));
  snprintf(response, sizeof(response), "Response from server listening on %s", argv[1]);
  int i = strlen(response);
  int times = 0;

  while (true){
      zmq::message_t request;
      socket.recv (&request);
      // Append the request counter and the received data after the fixed prefix.
      snprintf(response + i, sizeof(response) - i, " #%d -- request data: %s",
              times++, (char *)request.data());
      zmq::message_t reply (strlen(response) + 1);
      memcpy (reply.data (), response, strlen(response) + 1);  // include the NUL
      socket.send (reply);
  }
  socket.close();
  context.close();
  return 0;
}

client_connect_multiple_server.cpp

#include <zmq.hpp>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <string>
#include <iostream>
#include <unistd.h>

using std::cerr;
using std::cout;
using std::endl;

int main (int argc, char *argv[])
{
  if (argc < 2){
      cerr << "Usage: " << argv[0] << " server1-port server2-port ..." << endl;
      exit(1);
  }

  zmq::context_t context (1);
  zmq::socket_t socket (context, ZMQ_REQ);

  int i = 1;
  char transport[128];

  // A single REQ socket may connect to many endpoints; requests are
  // then distributed among them in a round-robin manner.
  while (i < argc) {
    snprintf(transport, sizeof(transport), "tcp://127.0.0.1:%s", argv[i++]);
    socket.connect(transport);
  }

  char request_data[1024];
  i = 0;
  while (true){
      snprintf(request_data, sizeof(request_data), "Hello %08d", i++);
      zmq::message_t request (strlen(request_data) + 1);
      memcpy (request.data (), request_data, strlen(request_data) + 1);  // include the NUL
      socket.send (request);
      zmq::message_t reply;
      socket.recv (&reply);
      cout << "Received " << (char *)reply.data() << endl;
      sleep(1);
  }
  socket.close();
  context.close();
  return 0;
}

As we see, each client connects to a set of servers, and each server can handle multiple clients. In fact, each server can handle thousands of clients. This is due to the powerful asynchronous socket communication mechanism ZeroMQ uses internally: on Linux it uses epoll, and on BSD it uses kqueue, which makes it possible to build servers that handle a very large number of connections.
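ZeroMQ also exposes this readiness-notification style to application code through zmq_poll (zmq::poll in the C++ binding). The sketch below is my own illustration, not part of the examples above: it waits on two REP sockets in a single thread (arbitrary ports 6555 and 6556) and services whichever one has a message pending.

poll_two_rep_sockets.cpp

#include <zmq.hpp>
#include <cstring>
#include <iostream>

int main ()
{
  zmq::context_t context (1);
  zmq::socket_t worker1 (context, ZMQ_REP);
  zmq::socket_t worker2 (context, ZMQ_REP);
  worker1.bind ("tcp://*:6555");
  worker2.bind ("tcp://*:6556");

  // One pollitem per socket; ZMQ_POLLIN means "a message is waiting".
  zmq::pollitem_t items[] = {
    { (void *) worker1, 0, ZMQ_POLLIN, 0 },
    { (void *) worker2, 0, ZMQ_POLLIN, 0 }
  };

  while (true) {
    zmq::poll (items, 2, -1);  // block until at least one socket is readable
    for (int j = 0; j < 2; j++) {
      if (items[j].revents & ZMQ_POLLIN) {
        zmq::socket_t &s = (j == 0) ? worker1 : worker2;
        zmq::message_t msg;
        s.recv (&msg);
        zmq::message_t reply (3);
        memcpy (reply.data (), "ok", 3);
        s.send (reply);
      }
    }
  }
  return 0;
}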

Compile and run the above examples as shown below:
$ g++ -o server_arg server_arg_port.cpp -lzmq
$ g++ -o client_connect_to_many client_connect_multiple_server.cpp -lzmq

Start three servers in the background:
$ ./server_arg 5555 &
$ ./server_arg 5556 &
$ ./server_arg 5557 &

Start a client that connects to all of the servers:
$ ./client_connect_to_many 5555 5556 5557

Sample output from the client is given below. We can see that the requests are distributed among the servers in a round-robin manner.

Received Response from server listening on 5555 #0 -- request data: Hello 00000000
Received Response from server listening on 5556 #0 -- request data: Hello 00000001
Received Response from server listening on 5557 #0 -- request data: Hello 00000002
Received Response from server listening on 5555 #1 -- request data: Hello 00000003
Received Response from server listening on 5556 #1 -- request data: Hello 00000004
Received Response from server listening on 5557 #1 -- request data: Hello 00000005
Received Response from server listening on 5555 #2 -- request data: Hello 00000006
Received Response from server listening on 5556 #2 -- request data: Hello 00000007
Received Response from server listening on 5557 #2 -- request data: Hello 00000008
Received Response from server listening on 5555 #3 -- request data: Hello 00000009
Received Response from server listening on 5556 #3 -- request data: Hello 00000010
Received Response from server listening on 5557 #3 -- request data: Hello 00000011
Received Response from server listening on 5555 #4 -- request data: Hello 00000012

.....

Now let us kill one of the server processes. The client will then block forever in a send or receive, and there is no easy way to recover. Below is an example of a workaround, but even this is not foolproof. We need to ensure that the client or server doesn't issue a recv before a send has completed, or issue a send before a previous recv has completed; otherwise it will throw an exception, "Operation cannot be accomplished in current state". Once we restart the killed server, the client starts working fine again. But the bottom line is that if a server we initially connected to goes down, the request-reply pattern stops working. I will explain in a later example how to address such problems and make the pattern reliable.

client_connect_multiple_server_timeout.cpp

#include <zmq.hpp>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <string>
#include <iostream>
#include <unistd.h>

using std::cerr;
using std::cout;
using std::endl;

int main (int argc, char *argv[])
{
  if (argc < 2){
      cerr << "Usage: " << argv[0] << " server1-port server2-port ..." << endl;
      exit(1);
  }

  zmq::context_t context (1);
  zmq::socket_t socket (context, ZMQ_REQ);
  // Make send/recv fail after 4 seconds instead of blocking forever.
  int timeout = 4000;
  socket.setsockopt(ZMQ_SNDTIMEO, &timeout, sizeof(int));
  socket.setsockopt(ZMQ_RCVTIMEO, &timeout, sizeof(int));

  int i = 1;
  char transport[128];

  while (i < argc) {
    snprintf(transport, sizeof(transport), "tcp://127.0.0.1:%s", argv[i++]);
    socket.connect(transport);
  }

  char request_data[1024];
  i = 0;
  while (true){
      // Retry the send until it succeeds; only then move on to the recv,
      // so the REQ socket's send/recv state machine is never violated.
      do {
        snprintf(request_data, sizeof(request_data), "Hello %08d", i++);
        zmq::message_t request (strlen(request_data) + 1);
        memcpy (request.data (), request_data, strlen(request_data) + 1);
        if (socket.send (request) == false) {
            cout << "Some error in sending request" << endl;
            continue;
        }
        break;
      } while (true);
      // Likewise, retry the recv until a reply arrives.
      do {
        zmq::message_t reply;
        if (socket.recv (&reply) == false) {
            cout << "Some error in read" << endl;
            continue;
        }
        cout << "Received " << (char *)reply.data() << endl;
        break;
      } while (true);
      sleep(1);
  }
  socket.close();
  context.close();
  return 0;
}
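A more robust variant, known in the ZeroMQ guide as the Lazy Pirate pattern, gives up on a stuck REQ socket entirely: after a receive timeout it closes the socket, with ZMQ_LINGER set to 0 so nothing stays queued, and connects a fresh one before retrying. Below is a rough sketch of that idea, assuming a single server endpoint; fresh_socket is my own helper name, not a library call.

client_lazy_pirate_sketch.cpp

#include <zmq.hpp>
#include <cstring>
#include <iostream>

// Create a REQ socket that times out on recv and discards pending
// messages on close, so a dead server never wedges the client.
static zmq::socket_t *fresh_socket (zmq::context_t &context, const char *endpoint)
{
  zmq::socket_t *socket = new zmq::socket_t (context, ZMQ_REQ);
  int linger = 0;       // drop unsent messages immediately on close
  int timeout = 4000;   // milliseconds
  socket->setsockopt (ZMQ_LINGER, &linger, sizeof (int));
  socket->setsockopt (ZMQ_RCVTIMEO, &timeout, sizeof (int));
  socket->connect (endpoint);
  return socket;
}

int main ()
{
  const char *endpoint = "tcp://127.0.0.1:5555";
  zmq::context_t context (1);
  zmq::socket_t *socket = fresh_socket (context, endpoint);
  for (int i = 0; i < 10; ++i) {
    zmq::message_t request (6);
    memcpy (request.data (), "Hello", 6);
    socket->send (request);
    zmq::message_t reply;
    if (socket->recv (&reply)) {
      std::cout << "Received " << (char *) reply.data () << std::endl;
    } else {
      // Timed out: this REQ socket is now stuck in its state machine,
      // so replace it instead of retrying on it.
      std::cout << "No reply, reconnecting" << std::endl;
      socket->close ();
      delete socket;
      socket = fresh_socket (context, endpoint);
    }
  }
  socket->close ();
  delete socket;
  return 0;
}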