Thursday, September 22, 2016

Using Apache Kafka correctly

Apache Kafka is one of the most popular message brokers, known for high performance, scalability, reliability and availability. It provides replication, which is its most important feature, since replication is a must for making the service highly available.
It is open source and hence free! There are plenty of examples on the web for writing and running our POCs successfully. But those proofs of concept are just "hello world"s, and they are not enough to learn what problems may happen in production.

I have seen terrible misuse of Kafka happening in production, and I was really amazed that Kafka was mostly able to handle even that kind of assault!


Now a few lines about Kafka:
Apache Kafka is a publish-subscribe messaging system with partitioning, replication and persistence. The three components of the messaging system are brokers, producers and consumers. Kafka also needs Zookeeper to maintain cluster state, discover new brokers etc., and hence Zookeeper is an essential component of any Kafka deployment. The diagram below shows a typical Kafka installation along with producers and consumers.
[Diagram: a typical Kafka deployment, with producers writing to a cluster of brokers coordinated by Zookeeper and consumers reading from it]
Producers send messages to brokers, and consumers pull messages from the brokers. In Kafka, topics are partitioned, and partitions may be replicated depending on the configuration. Each partition can be visualized as an independent FIFO queue. Partitions can be written to in parallel and consumed in parallel. Consumers consume messages in sequence within a partition, but no ordering needs to be maintained across partitions.
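For example, here is a minimal consumer sketch (using the kafka-python library, which is also used in the examples below; the topic and group names are placeholders) that makes the per-partition ordering visible:

from kafka import KafkaConsumer

# Sketch: 'topic1' and 'demo-group' are placeholder names.
consumer = KafkaConsumer('topic1',
                         bootstrap_servers='localhost:9092',
                         group_id='demo-group',
                         auto_offset_reset='earliest')

# Loops forever; offsets grow monotonically within each partition,
# but messages from different partitions arrive interleaved.
for message in consumer:
    print(message.partition, message.offset, message.value)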


Now let us examine some innocent-looking code which may tax the Kafka cluster heavily, yet doesn't look problematic at all.
Below is a program that writes messages to Kafka (it uses the kafka-python library, version 1.3.1):

from kafka import KafkaProducer

# Anti-pattern: a brand-new producer is created and torn down for every message.
def send_message(partition_key, message):
    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    producer.send('topic1', key=partition_key, value=message)
    producer.flush()
    producer.close()


for i in range(10000):
    # kafka-python expects bytes for keys and values when no serializers are set
    key = (str(i) + '_partition_key').encode('utf-8')
    msg = ('This is message ' + str(i)).encode('utf-8')
    send_message(key, msg)



The above code snippet connects to a Kafka cluster and sends 10000 messages to it. But one terrible thing we are doing here is that for every message we create a Kafka producer object, and after sending the message we just close the producer. The producer object is really heavy. What happens behind the scenes is: it first connects to one of the bootstrap servers and fetches metadata about the topic(s) and brokers. That metadata may be a few kilobytes if the topics have many partitions or metadata is fetched for many topics. From the metadata, the producer learns which broker is the leader for each partition of a topic, among many other details. It is the producer that assigns messages to partitions based on the partition key; distributing messages among partitions is the responsibility of the producers, and brokers play no role here. Before sending a message to a particular broker, the producer checks whether a connection to that broker already exists and reuses it; if there is no connection to the leader broker for that partition, the producer makes one and then sends the data.
If we repeat the above process for every message sent to Kafka, there will be a huge impact on the Kafka brokers as well, since they may be serving metadata for each message, and sometimes we will see long GC pauses on the brokers. Another funny effect: the Kafka brokers may send more data (metadata) to the producers than the producers send to the brokers, when the metadata for the topic-partitions is big and the producer sends small messages, as in the example above.

I saw this actually happening in production, and the producers had been used like that for a long time!

So, always re-use Kafka producer and consumer instances. Within one process, it is good practice to use just one Kafka producer or consumer object.
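A corrected sketch of the earlier example, with one long-lived producer reused for every message (same placeholder topic and address):

from kafka import KafkaProducer

# One producer for the whole process, created once and reused.
producer = KafkaProducer(bootstrap_servers='localhost:9092')

for i in range(10000):
    key = (str(i) + '_partition_key').encode('utf-8')
    msg = ('This is message ' + str(i)).encode('utf-8')
    producer.send('topic1', key=key, value=msg)

# Flush and close once, when the process is done producing.
producer.flush()
producer.close()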


Number of partitions for a topic

A topic should be created with enough partitions that we won't need to increase the count in the near future. Suppose we have a topic "logtopic" and we calculate that 100 partitions across 5 brokers should be good enough to handle the load. Then create the topic with 200 partitions, i.e. moderately over-provision the partition count. But don't start with 20 times the required number (4000 partitions in this example).
Each replica of a partition translates to at least two files (a log segment and its index) in the Kafka brokers' data directories, so with replication-factor > 1 there will be at least 2 * number-of-partitions * replication-factor files distributed among the brokers. The number of files grows as segments roll over, and a segment file is only deleted once it is older than the configured retention interval. So there may be a huge number of partition files on the Kafka brokers, and Kafka may not close the old files when rollover happens. That can result in errors like "too many open file descriptors". Though it is easy to raise the maximum number of open file descriptors for the process, it is always better to take preventive measures against this problem. One way is to create topics with a judicious number of partitions.
A higher number of partitions also means bigger metadata and a higher rate of offset commits. If consumer offsets are maintained in Zookeeper, each offset commit translates to a write to Zookeeper, which may be problematic when too many such writes are happening. With recent versions of Kafka, offsets can be maintained in Kafka itself, but even then each offset commit is a separate write operation to Kafka.
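As an illustration, here is a sketch of creating such a topic programmatically. It uses KafkaAdminClient, which is only available in kafka-python versions newer than the 1.3.1 used above; the names and counts are placeholders:

from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers='localhost:9092')

# 200 partitions: about 2x the calculated need of 100, not 20x.
admin.create_topics([
    NewTopic(name='logtopic', num_partitions=200, replication_factor=2)
])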


Partition to tenant mapping??

Sometimes people create topics which are shared among multiple tenants. It is bad design to maintain a topic-partition to tenant mapping. If we need tenants' data separated even at the Kafka level, then we should create a different topic per tenant. Tenants which pay us more, or which generate more data, can be allocated topics with more partitions. The data in Kafka is data in transit; Kafka is not a database for querying or long-term object storage. So sharing a topic across multiple tenants should not be an issue. All we need is to "tag" each message (from the producer) with a field that tells us which tenant the message belongs to.
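A minimal sketch of such tagging, assuming JSON message bodies and a hypothetical tenant_id field:

import json
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')

def send_event(tenant_id, payload):
    # Tag the message with the tenant it belongs to.
    record = dict(payload, tenant_id=tenant_id)
    # Keying by tenant also keeps each tenant's messages in order
    # within a single partition.
    producer.send('shared_topic',
                  key=tenant_id.encode('utf-8'),
                  value=json.dumps(record).encode('utf-8'))

send_event('tenant_42', {'event': 'login', 'user': 'alice'})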

Retention period

Kafka deletes message log files when they are older than the configured retention period. While deleting messages, Kafka doesn't care whether the consumers have actually consumed them or not. So it may happen that some messages get deleted even before they are processed by a consumer; when a consumer processes messages very slowly, this is quite likely to happen. I have seen it happen in production, where millions of messages were lost because the consumers could not process them within the configured retention period. So we have to be careful about the values we set for the retention period of our topics.
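As a sketch, the per-topic retention can be adjusted with the admin client (again, this needs a newer kafka-python than 1.3.1, and the seven-day value is only an illustration):

from kafka.admin import KafkaAdminClient, ConfigResource, ConfigResourceType

admin = KafkaAdminClient(bootstrap_servers='localhost:9092')

# retention.ms is a per-topic setting; 604800000 ms = 7 days.
admin.alter_configs([
    ConfigResource(ConfigResourceType.TOPIC, 'logtopic',
                   configs={'retention.ms': '604800000'})
])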


Message size

Kafka brokers have a configuration for the maximum message size (message.max.bytes) and for the maximum size of a replication fetch (replica.fetch.max.bytes). We should keep the two values the same, or set replica.fetch.max.bytes > message.max.bytes. If message.max.bytes > replica.fetch.max.bytes, it may happen that a producer publishes a message (let us call it m_big) whose size exceeds replica.fetch.max.bytes. This message will fail to get copied to its replicas, because the broker holding a replica sends fetch requests (to the leader for that partition) with a maximum fetch size lower than the size of m_big. This failure will repeat continuously, and replication for that partition stops completely. The only way to recover is to shut down the follower broker and change the replica.fetch.max.bytes value in its configuration to greater than or equal to message.max.bytes.

fetch.message.max.bytes is also defined for the Scala/Java consumer API, and similar configs exist in Kafka client libraries for other languages. We should be careful to set this property >= message.max.bytes of the Kafka cluster.
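With kafka-python, for example, the corresponding knobs are max_request_size on the producer and max_partition_fetch_bytes on the consumer; a sketch that keeps them aligned with an assumed 2 MB broker-side limit:

from kafka import KafkaProducer, KafkaConsumer

# Assumed to match message.max.bytes configured on the brokers.
MAX_MESSAGE_BYTES = 2 * 1024 * 1024

producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         max_request_size=MAX_MESSAGE_BYTES)

consumer = KafkaConsumer('topic1',
                         bootstrap_servers='localhost:9092',
                         max_partition_fetch_bytes=MAX_MESSAGE_BYTES)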


Metadata issue

Metadata for a Kafka cluster may be huge if there are many topics and partitions in the cluster. Whoever writes a Kafka producer or client driver should ensure that it fetches only the metadata for the topics requested by the callers. The required metadata may be refreshed every few seconds to keep the information up-to-date, or fetched only when needed, but it should not be fetched very frequently. Callers of the producer or consumer driver should avoid explicitly requesting metadata as far as possible.
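In kafka-python, for instance, the background refresh interval is governed by metadata_max_age_ms (the client also refreshes on demand, e.g. when it hits an unknown topic); a sketch:

from kafka import KafkaProducer

# Refresh topic metadata in the background at most every 5 minutes.
producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         metadata_max_age_ms=300000)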

Compression

Kafka supports gzip and snappy compression for messages; this is configured on the producer. In general, gzip provides a better compression ratio, while snappy compresses faster. If the messages being published to Kafka are bigger than a few bytes (> 100 bytes or so) and consist mainly of text, JSON, XML etc., then enabling compression may significantly reduce the disk space consumed by the Kafka brokers, and it also improves both network and disk IO. The trade-off is that both producers and consumers will use more CPU to compress and decompress the messages.
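With kafka-python, compression is a single producer setting; a sketch:

from kafka import KafkaProducer

# 'gzip' for a better ratio, 'snappy' for better speed
# (snappy also requires the python-snappy package).
producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         compression_type='gzip')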

Synchronous vs asynchronous Kafka producers

The Kafka wire protocol for producers has a facility to make producers asynchronous. Each request crafted by the producer carries a 32-bit correlation-id which the server sends back in its response, and which can be used to match responses with their corresponding requests. This way a producer can use just one connection to a server/broker for all its produce requests, and it doesn't need to wait for the response to one request before sending the next.
In the case of synchronous producers, a connection to a server is used exclusively for one request at a time. That means for every request the producer takes a connection to the server from a connection pool, sends the request over that connection, waits for the response, and only then releases the connection back to the pool. So if the producer wants to send 32 requests in parallel to a server, it needs 32 connections to that server. Having too many connections, on both the client and the server, is not a good thing.

The Kafka producer library bundled with Kafka (version >= 0.8.2) is asynchronous, and that is really good. Before selecting a Kafka client library, we should check whether it truly supports an asynchronous producer, and we should always prefer asynchronous producers.
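kafka-python's producer works the same way: send() returns a future immediately, and callbacks can be attached instead of blocking on each request. A sketch:

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')

def on_success(record_metadata):
    print('delivered to partition %d at offset %d'
          % (record_metadata.partition, record_metadata.offset))

def on_error(excp):
    print('delivery failed: %s' % excp)

# send() does not block; the response is matched to the request later
# via the correlation-id described above.
future = producer.send('topic1', b'some message')
future.add_callback(on_success)
future.add_errback(on_error)

producer.flush()  # wait for in-flight requests before exiting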

Ack for writes to servers

High availability doesn't mean persistence alone; it includes both persistence and replication of the data, so that if a server goes down the data is still available on another server. Kafka provides high availability by persisting messages on disk and replicating them. Producers can send messages with different "ack levels": the producer may require no ack from the server at all, an ack as soon as the leader persists the message to its local log, an ack once the message is committed at all in-sync replicas, or an ack once the message is replicated to a certain number of replicas. Generally, messages that are not very important, or of which we can afford to lose a few, should go to separate topics with a low replication factor (e.g. < 3). For such messages, the producer can request an ack as soon as the message is appended to the leader's local log.

For messages which we cannot afford to lose, we should use topics with a higher replication factor (>= 3), and the producer should send them with "required acks" set so that the server responds only when the message has been replicated to all in-sync replicas. This minimizes the chances of data loss to a great extent.
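With kafka-python, the ack level is the producer's acks setting; a sketch of both ends of the trade-off:

from kafka import KafkaProducer

# Leader-only ack: fast, but a leader crash may lose the message.
fast_producer = KafkaProducer(bootstrap_servers='localhost:9092', acks=1)

# Ack only after all in-sync replicas have the message: safest.
safe_producer = KafkaProducer(bootstrap_servers='localhost:9092', acks='all')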