Sunday, October 15, 2017

A tool for Kafka consumer details

I was looking for a tool to set the offsets of Kafka high-level consumers. After some searching, I decided to build one of my own. The tool can list the consumers in a consumer group, get the offsets of a consumer group, and set the offsets of the partitions consumed by a consumer group. Setting the consumer offsets to a desired position is sometimes required to skip some messages, or to re-process messages with the same consumer group.

The tool is available here
The link also has the details about how to install and run the tool.
Hopefully, it may help somebody.


Below are some examples from running the tool:

$ kafka_consumer_tool --consumergroup newgroup --command commitoffsets --inputjson topic_offsets.json
2017/10/15 15:33:52 Commiting offsets
2017/10/15 15:33:52 Topic: a1, Partition: 6,  Offset committed successfully
2017/10/15 15:33:52 Topic: a1, Partition: 7,  Offset committed successfully
2017/10/15 15:33:52 Topic: a1, Partition: 8,  Offset committed successfully
2017/10/15 15:33:52 Topic: a1, Partition: 1,  Offset committed successfully
2017/10/15 15:33:52 Topic: a1, Partition: 3,  Offset committed successfully
2017/10/15 15:33:52 Topic: a1, Partition: 5,  Offset committed successfully
2017/10/15 15:33:52 Topic: a1, Partition: 2,  Offset committed successfully
2017/10/15 15:33:52 Topic: a1, Partition: 4,  Offset committed successfully
2017/10/15 15:33:52 Topic: a2, Partition: 1,  Offset committed successfully
2017/10/15 15:33:52 Topic: a2, Partition: 3,  Offset committed successfully
2017/10/15 15:33:52 Topic: a3, Partition: 1,  Offset committed successfully
2017/10/15 15:33:52 Topic: a3, Partition: 3,  Offset committed successfully

$ kafka_consumer_tool --consumergroup newgroup --command getoffsets --topics a1,a3
2017/10/15 15:35:48 Current offset details:
2017/10/15 15:35:48 Topic: a1, Partition: 0, Offset: 1830
2017/10/15 15:35:48 Topic: a1, Partition: 1, Offset: 20
2017/10/15 15:35:48 Topic: a1, Partition: 2, Offset: 1
2017/10/15 15:35:48 Topic: a1, Partition: 3, Offset: 0
2017/10/15 15:35:48 Topic: a1, Partition: 4, Offset: 1
2017/10/15 15:35:48 Topic: a1, Partition: 5, Offset: 1
2017/10/15 15:35:48 Topic: a1, Partition: 6, Offset: 1
2017/10/15 15:35:48 Topic: a1, Partition: 7, Offset: 100
2017/10/15 15:35:48 Topic: a1, Partition: 8, Offset: 1
2017/10/15 15:35:48 Topic: a1, Partition: 9, Offset: 1812
2017/10/15 15:35:48 Topic: a1, Partition: 10, Offset: 1920
2017/10/15 15:35:48 Topic: a1, Partition: 11, Offset: 1920
2017/10/15 15:35:48 Topic: a1, Partition: 12, Offset: 1908
2017/10/15 15:35:48 Topic: a1, Partition: 13, Offset: 1818
2017/10/15 15:35:48 Topic: a1, Partition: 14, Offset: 1908
2017/10/15 15:35:48 Topic: a1, Partition: 15, Offset: 1926
2017/10/15 15:35:48 Topic: a1, Partition: 16, Offset: 1860
2017/10/15 15:35:48 Topic: a1, Partition: 17, Offset: 1860
2017/10/15 15:35:48 Topic: a1, Partition: 18, Offset: 1830
2017/10/15 15:35:48 Topic: a1, Partition: 19, Offset: 1836
2017/10/15 15:35:48 Topic: a1, Partition: 20, Offset: 1866
2017/10/15 15:35:48 Topic: a1, Partition: 21, Offset: 1896
2017/10/15 15:35:48 Topic: a1, Partition: 22, Offset: 1902
2017/10/15 15:35:48 Topic: a1, Partition: 23, Offset: 1932
2017/10/15 15:35:48 Topic: a1, Partition: 24, Offset: 1956
2017/10/15 15:35:48 Topic: a1, Partition: 25, Offset: 1842
2017/10/15 15:35:48 Topic: a1, Partition: 26, Offset: 1890
2017/10/15 15:35:48 Topic: a1, Partition: 27, Offset: 1920
2017/10/15 15:35:48 Topic: a1, Partition: 28, Offset: 1848
2017/10/15 15:35:48 Topic: a1, Partition: 29, Offset: 1734
2017/10/15 15:35:48 Topic: a1, Partition: 30, Offset: 1746
2017/10/15 15:35:48 Topic: a1, Partition: 31, Offset: 1872
2017/10/15 15:35:48 Topic: a3, Partition: 0, Offset: 615
2017/10/15 15:35:48 Topic: a3, Partition: 1, Offset: 2
2017/10/15 15:35:48 Topic: a3, Partition: 2, Offset: 623
2017/10/15 15:35:48 Topic: a3, Partition: 3, Offset: 4
2017/10/15 15:35:48 Topic: a3, Partition: 4, Offset: 627
2017/10/15 15:35:48 Topic: a3, Partition: 5, Offset: 641
2017/10/15 15:35:48 Topic: a3, Partition: 6, Offset: 632
2017/10/15 15:35:48 Topic: a3, Partition: 7, Offset: 631
2017/10/15 15:35:48 Topic: a3, Partition: 8, Offset: 633
2017/10/15 15:35:48 Topic: a3, Partition: 9, Offset: 609
2017/10/15 15:35:48 Topic: a3, Partition: 10, Offset: 635
2017/10/15 15:35:48 Topic: a3, Partition: 11, Offset: 640
2017/10/15 15:35:48 Topic: a3, Partition: 12, Offset: 626
2017/10/15 15:35:48 Topic: a3, Partition: 13, Offset: 592
2017/10/15 15:35:48 Topic: a3, Partition: 14, Offset: 609
2017/10/15 15:35:48 Topic: a3, Partition: 15, Offset: 633
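
If the getoffsets output needs to be compared or post-processed, it can be parsed with a few lines of Python. This is only a sketch: it assumes the log lines always look like the sample above, and parse_offsets is a hypothetical helper, not part of the tool.

```python
import re

# Assumed line shape, taken from the sample output above:
# "<date> <time> Topic: a1, Partition: 0, Offset: 1830"
LINE_RE = re.compile(r"Topic: ([^,\s]+), Partition: (\d+), Offset: (\d+)")


def parse_offsets(output):
    """Return {topic: {partition: offset}} parsed from getoffsets log lines."""
    offsets = {}
    for line in output.splitlines():
        match = LINE_RE.search(line)
        if match:  # header lines such as "Current offset details:" are skipped
            topic = match.group(1)
            partition = int(match.group(2))
            offsets.setdefault(topic, {})[partition] = int(match.group(3))
    return offsets


sample = """2017/10/15 15:35:48 Current offset details:
2017/10/15 15:35:48 Topic: a1, Partition: 0, Offset: 1830
2017/10/15 15:35:48 Topic: a3, Partition: 1, Offset: 2"""

print(parse_offsets(sample))
```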


$ kafka_consumer_tool --consumergroup newgroup --command listconsumers
2017/10/15 15:36:47 Group: newgroup,  state: Stable
2017/10/15 15:36:47 id: myid5-45d71997-3ed2-4443-9f1b-30d972aefc35, host: /192.168.1.3, clientid: myid5
2017/10/15 15:36:47 id: myid1-0fa736ca-56a6-495d-9887-83f63396e4de, host: /192.168.1.3, clientid: myid1
2017/10/15 15:36:47 id: myid6-4ad5c512-6599-4706-802c-872b4257e5c8, host: /192.168.1.3, clientid: myid6
2017/10/15 15:36:47 id: myid7-b7a3c69b-32ad-43c7-ab87-467eee5d65d9, host: /192.168.1.3, clientid: myid7
2017/10/15 15:36:47 id: myid4-1c4f50a1-8bfd-4752-9371-0d0cd31407f6, host: /192.168.1.3, clientid: myid4
2017/10/15 15:36:47 id: myid2-581bbba8-003e-4322-adce-18573c71dac3, host: /192.168.1.3, clientid: myid2
2017/10/15 15:36:47 id: myid0-48bb6ae8-b6c2-4893-a34d-8f842ef2f61c, host: /192.168.1.3, clientid: myid0
2017/10/15 15:36:47 id: myid3-548e4d32-a458-4edd-b9da-084a07db0440, host: /192.168.1.3, clientid: myid3
2017/10/15 15:36:47 id: myid9-bacf18ad-9e5c-45ca-92e2-c5fa85ea9963, host: /192.168.1.3, clientid: myid9
2017/10/15 15:36:47 id: myid8-796f0609-9734-4d75-b35e-21efbbf7d6ee, host: /192.168.1.3, clientid: myid8

Saturday, July 29, 2017

Approach to Designing a distributed system

Most of the systems we work on or design nowadays are distributed. The data is distributed and partitioned, it may be served by many instances of identical components, different functionality may be served by different components running independently, and each component may run a different number of instances. The components will most likely run across different machines, and may run in different data centers. So weaving such a system together and making it function correctly is really complex.
No matter how many precautions we take, things may go wrong. But we should try to minimize the possibility of that happening. When we overlook something by saying that it is unlikely to happen in production and that we should not waste time devising safeguards against it, we are fooling ourselves. Things that are unlikely to happen, or that don't happen during tests, will surely happen in production someday, and they may not be as infrequent as we hoped :(


Below are the principles I think we should follow while designing a system:
  • The system must behave correctly as far as possible. In some cases we cannot afford any errors at all, and those parts must be unit tested and integration tested to the maximum. In other cases too, let us design for 99.99% correctness. Designing a system that overlooks error conditions is a recipe for disaster and instability; many capable engineers will have to spend hours or days chasing and correcting the errors. So correctness is the first and most important consideration for our design.
  • The system must be reliable. Components may come up and die, and a component or service may run with a different number of instances at different times, but this should not hamper the reliability of the system. If a client pushed a document to our platform and we returned a 200 OK response, then we must ensure that every subsequent GET for that document, at least by that client (if the document is not public), also returns 200 OK.
  • The system should be highly available. First let us make the system HA and then think about scalability; in many cases we don't really deal with "big data" (e.g. handling 4 TB of data is not actually a big data problem :)). There should not be any component or data store in our deployment that is not HA; otherwise things may turn ugly when the non-HA component goes down. We may end up losing data, and the services that depend on that component may stop working. Don't be selective and make only some components HA; all the components must be HA. A deployment where some components are HA and some are not is a pathetic deployment architecture to follow. Avoid it at any cost.
  • Scalability is important, as we may have to serve a larger number of clients and the data will keep growing. Our system should be able to grow with the number of requests and the volume of data.
  • The system must handle concurrent updates, and at the same time it should not lock records unnecessarily and degrade performance. Optimistic locking is a technique we may use here. Most databases provide a way to do optimistic locking of records. Here is an example of where things may go wrong if we don't use optimistic locking. Assume there is a website where users maintain their job profiles, and each job is stored as a separate record in some database. Now assume a user opens two browsers and reads the same job record in both. The user leisurely changes the job description in both browsers and submits the changes. The problem is that whichever browser submits later will overwrite the changes submitted earlier by the other browser, even though both read the same version of the record. How do we prevent such errors efficiently? The answer is optimistic locking. Every record has a version, and every update does a compare-and-swap of the version that was read with a new version (the new version may be "old version + 1", the current timestamp, etc.). The compare-and-swap operation must be atomic for optimistic locking to work correctly. It compares the stored version with the expected value (the version read by the client), and if they differ (meaning some other client changed the record in between), it returns an error. When the client gets the error, it should read the latest record again and try to apply the change to the latest version of the record. This simple technique prevents a lot of errors. People tend to ignore this scenario, either out of ignorance or because they think the chances of it happening are low. But when there is a simple and robust solution to the problem, why should we ignore it?
  • Be careful when you cache records locally in application servers, especially when multiple instances of the application server will always be running. A local cache is a cache within the memory of the application process. It is useful when we want to avoid the network calls needed to get cached data from Memcached, Redis, Hazelcast, etc. Basically, the application server looks into its local cache for a record before it calls the distributed cache or the database. But if the record may be updated by multiple instances of the service, then using a local cache will be incorrect. For example, say instance 2 of service A read the key "counter_x" at 10:25 am, when its value was VAL_X80, and cached the value locally. Instance 1 of service A then updated "counter_x" to VAL_X100 at 10:30 am. Now a client's call requiring the value of counter_x reaches instance 2 of service A. If instance 2 returns the locally cached value, the client ends up reading an incorrect value (VAL_X80) for counter_x. Applying an expiry time to the keys in the local cache won't solve the problem either; it only shortens the window in which stale values are served. So we should think carefully before caching something locally in a distributed system. There are cases where local caching results in erroneous behavior.
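
The optimistic locking principle in the list above can be sketched in a few lines of Python. This is only an illustration under stated assumptions: VersionedStore and VersionConflict are hypothetical names, the store is an in-memory stand-in for a database, and a mutex plays the role of the database's atomic compare-and-swap.

```python
import threading


class VersionConflict(Exception):
    """Raised when the stored version differs from the version the caller read."""


class VersionedStore:
    """In-memory stand-in for a table with a version column (hypothetical)."""

    def __init__(self):
        self._mutex = threading.Lock()  # makes the compare-and-swap atomic
        self._records = {}  # key -> (version, value)

    def put_new(self, key, value):
        with self._mutex:
            self._records[key] = (1, value)

    def read(self, key):
        with self._mutex:
            return self._records[key]

    def update(self, key, expected_version, new_value):
        # Compare the stored version with the one the caller read; swap only on a match.
        with self._mutex:
            current_version, _ = self._records[key]
            if current_version != expected_version:
                raise VersionConflict(key)
            self._records[key] = (current_version + 1, new_value)
            return current_version + 1


store = VersionedStore()
store.put_new("job-1", "Engineer")

# Two browsers read the same version of the record.
version_a, _ = store.read("job-1")
version_b, _ = store.read("job-1")

store.update("job-1", version_a, "Senior Engineer")  # first submit succeeds
try:
    store.update("job-1", version_b, "Staff Engineer")  # stale version is rejected
except VersionConflict:
    # Re-read the latest record and apply the change on top of it.
    version_b, latest = store.read("job-1")
    store.update("job-1", version_b, latest + " (Staff track)")

print(store.read("job-1"))
```

The second update fails instead of silently overwriting the first one, and the retry is applied to the latest version of the record.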

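The local caching pitfall from the list above can be reproduced with a small simulation. It is a sketch only: LocalCache is a hypothetical helper, a plain dict stands in for the database, and times are seconds counted from 10:25 am.

```python
class LocalCache:
    """Per-instance in-process cache with an expiry time (hypothetical helper)."""

    def __init__(self, ttl_seconds):
        self.ttl = ttl_seconds
        self.entries = {}  # key -> (value, cached_at)

    def get(self, key, now, load):
        hit = self.entries.get(key)
        if hit is not None and now - hit[1] < self.ttl:
            return hit[0]  # served locally, possibly stale
        value = load(key)  # cache miss or expired entry: go to the shared store
        self.entries[key] = (value, now)
        return value


shared_store = {"counter_x": "VAL_X80"}  # stands in for the database
instance2_cache = LocalCache(ttl_seconds=600)

# 10:25 am: instance 2 reads counter_x and caches it locally.
at_1025 = instance2_cache.get("counter_x", 0, shared_store.get)

# 10:30 am: instance 1 updates the record in the shared store.
shared_store["counter_x"] = "VAL_X100"

# 10:31 am: instance 2 still answers from its local cache.
at_1031 = instance2_cache.get("counter_x", 360, shared_store.get)

# 10:36 am: the entry has expired, so instance 2 finally reloads it.
at_1036 = instance2_cache.get("counter_x", 660, shared_store.get)

print(at_1025, at_1031, at_1036)
```

The read at 10:31 am returns the old value even though the shared store already holds the new one, so the expiry time only bounds how long the stale value is served.
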
Saturday, April 8, 2017

Distributed Lock using MongoDB

Distributed systems often need synchronization. If the synchronizing processes run within the same machine or container, there is no problem; we have plenty of choices, such as operating-system-provided semaphores, file locks, etc. Synchronizing within the same process (across multiple threads, green threads, etc.) is also fine; we have thread-level locks, atomic integers, semaphores, etc. Of course, we need to be very careful when we go for synchronization; there may be occasional deadlocks that take hours of debugging to uncover the root cause.

When we need to synchronize the activity of processes running across machines, we need a distributed lock, and there are systems suitable for that. ZooKeeper, etcd, Hazelcast, Redis, etc. may be used for distributed locking.

In this post I will explain how we may use MongoDB for distributed locking. The locks described are exclusive locks (i.e. they are not read locks; implementing read locks using MongoDB will need some more thought :))


Now let us discuss how we can achieve exclusive locks using MongoDB. We will use the MongoDB document below to describe a lock.
{
    _id: "lock_id",
    acquirer: "acquirer_unique_id",
    updated: "timestamp"
}

There are two operations, lock and release lock. Also, the lock should be released automatically if the locking process is not alive, or is unable to connect to MongoDB for some interval.

Lock operation:
Lock("a_new_lock") -> insert a record with _id = "a_new_lock", acquirer = "unique id for the acquirer" and updated = current timestamp

The process will run a thread that periodically sets the updated field to the current timestamp.

UnLock operation:
UnLock("a_lock") -> delete the record with _id = "a_lock", but only if the record's acquirer is the same as the caller's acquirer id. Matching the acquirer is important; without it, a process may end up releasing a lock acquired by some other process.
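
The two operations can be modelled without a database to make the rules explicit. The sketch below is an in-memory simulation, not MongoDB client code: InMemoryLocks is a hypothetical class whose dict stands in for the collection, with the dict key playing the role of the unique _id.

```python
import time


class InMemoryLocks:
    """Toy stand-in for the lock collection (hypothetical helper)."""

    def __init__(self):
        self._docs = {}  # _id -> {"acquirer": ..., "updated": ...}

    def lock(self, lock_id, acquirer):
        # Mirrors an insert on a unique _id: it fails if the document already exists.
        if lock_id in self._docs:
            return False
        self._docs[lock_id] = {"acquirer": acquirer, "updated": time.time()}
        return True

    def unlock(self, lock_id, acquirer):
        # Mirrors a delete filtered on both _id and acquirer.
        doc = self._docs.get(lock_id)
        if doc is None or doc["acquirer"] != acquirer:
            return False
        del self._docs[lock_id]
        return True


locks = InMemoryLocks()
print(locks.lock("a_new_lock", "process-1"))    # True: first acquirer wins
print(locks.lock("a_new_lock", "process-2"))    # False: the lock is already held
print(locks.unlock("a_new_lock", "process-2"))  # False: a non-owner cannot release
print(locks.unlock("a_new_lock", "process-1"))  # True: the owner releases
```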

Things needed for the MongoLock operations:
A MongoDB collection; let us name the collection "locks" in the database distlock.
A TTL index on the collection, which will ensure that the locks eventually get released if the lock owner process goes down without releasing the lock.

Let us create a TTL index on the locks collection:
db.locks.createIndex( { "updated": 1 }, { expireAfterSeconds: 600 } )


Let us create a lock with the name "newlock":
db.locks.insert({_id : "newlock", acquirer : "myid", updated : new Date()})


Now if another process tries to get "newlock", it will fail:

db.locks.insert({_id : "newlock", acquirer : "otherid", updated : new Date()})

WriteResult({
    "nInserted" : 0,
    "writeError" : {
        "code" : 11000,
        "errmsg" : "E11000 duplicate key error collection: distlock.locks index: _id_ dup key: { : \"newlock\" }"
    }
})

Releasing the lock is a little tricky. We shouldn't delete the lock by its id only; we should delete the lock only if the lock id and the acquirer id both match. Otherwise, there is a chance that a process will end up releasing a lock acquired by another process.
Let's release the lock "newlock":

db.locks.remove({_id : "newlock", acquirer : "myid"})

WriteResult({ "nRemoved" : 1 })


What happens if the process dies without releasing the lock? Well, MongoDB will release the lock automatically after about 10 minutes, as the locks collection has a TTL index on it with expireAfterSeconds set to 600 seconds (the TTL cleanup runs as a periodic background task, so the removal may lag slightly). While the lock acquirer is running, it should update the "updated" field every 10 seconds or so, because a long-running operation may continue for more than 10 minutes after acquiring the lock, and without the refresh the lock would expire while still in use.
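
The interplay between the periodic refresh and the TTL expiry can be sketched with a simulated clock. Again this is an in-memory model under stated assumptions, not MongoDB client code: ExpiringLocks and its methods are hypothetical, and the `now` argument replaces real time so the behaviour is easy to follow.

```python
class ExpiringLocks:
    """Toy model of a lock collection with a TTL index on "updated" (hypothetical)."""

    def __init__(self, ttl_seconds=600):
        self.ttl = ttl_seconds
        self.docs = {}  # _id -> {"acquirer": ..., "updated": ...}

    def _expire(self, now):
        # Stands in for the background TTL cleanup.
        stale = [k for k, d in self.docs.items() if now - d["updated"] >= self.ttl]
        for key in stale:
            del self.docs[key]

    def lock(self, lock_id, acquirer, now):
        self._expire(now)
        if lock_id in self.docs:
            return False
        self.docs[lock_id] = {"acquirer": acquirer, "updated": now}
        return True

    def heartbeat(self, lock_id, acquirer, now):
        # Refresh "updated" only if the caller still owns the lock.
        self._expire(now)
        doc = self.docs.get(lock_id)
        if doc is None or doc["acquirer"] != acquirer:
            return False
        doc["updated"] = now
        return True


locks = ExpiringLocks(ttl_seconds=600)
print(locks.lock("newlock", "myid", now=0))          # True
print(locks.heartbeat("newlock", "myid", now=500))   # True: refreshed in time
print(locks.lock("newlock", "otherid", now=1000))    # False: still held
print(locks.lock("newlock", "otherid", now=1200))    # True: the owner stopped refreshing
print(locks.heartbeat("newlock", "myid", now=1210))  # False: the lock was lost
```

A failed heartbeat is the signal to the original owner that it no longer holds the lock and should stop the protected work.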

The simple operations above demonstrate how we may implement a simple distributed exclusive lock using MongoDB. Hope it was helpful :)

The prototype implementation is available here:
https://gist.github.com/nipuntalukdar/e9c1db9a78b45266a4ccfcff0f8f24a4