Real Time Distributed System

01 Mar 2016

Today, lot of features mean lot of processing and lots of processing means increasing response times. Now response times upto a certain limit are acceptable, but slowly with increasing complexities, it starts going out of hand. This is what happened with us with city wide hotel search result. There are just way too many things to calculate, each city has hotels, hotels have rooms, rooms have offers, which can vary according to way too many parameters, and a lot of other things, and we simply could not cache that amount of data with so many permutations and combinations.

So, if we could not reduce our complexities, we started thinking along the lines of parallel computation. Now one of the problems that come with python and parallelization is GIL, we simply could not do anything about it.  

Now we started exploring on how to distribute the requests real time. We explored a few options but finally settled on celery and kafka. Now we started testing things with celery.

Celery is an asynchronous task queue/job queue based on distributed message passing. It is focused on real-time operation, but supports scheduling as well. The execution units, called tasks, are executed concurrently on a single or more worker servers using multiprocessing, Eventlet, or gevent. Tasks can execute asynchronously (in the background) or synchronously(wait until ready).

Task queues are used as a mechanism to distribute work across threads or machines. A task queue’s input is a unit of work called a task. Dedicated worker processes constantly monitor task queues for new work to perform.

Celery communicates via messages, usually using a broker to mediate between clients and workers. To initiate a task, a client adds a message to the queue, which the broker then delivers to a worker. A Celery system can consist of multiple workers and brokers like redis and rabbitMQ. For more info regarding celery, go to: -

We have a Celery farm which is set up with redis. Now we did a complete revamp of  the code so that we could distribute processing of requests, and started experimenting with celery.

We also started to work with kafka and figuring out the semantics. Those of you who are unaware with kafka, it is a message broker system(pub/sub).  Like many publish-subscribe messaging systems, Kafka maintains feeds of messages in topics. Producers write data to topics and consumers read from topics. Since Kafka is a distributed system, topics are partitioned and replicated across multiple nodes.

Messages are simply byte arrays and the developers can use them to store any object in any format – with String, JSON, and Avro the most common. It is possible to attach a key to each message, in which case the producer guarantees that all messages with the same key will arrive to the same partition. When consuming from a topic, it is possible to configure a consumer group with multiple consumers. Each consumer in a consumer group will read messages from a unique subset of partitions in each topic they subscribe to, so each message is delivered to one consumer in the group, and all messages with the same key arrive at the same consumer. To get to know more about kafka, go to :-

Using kafka came with a lot of pitfalls.

We started exploring, came to see a concept of high level consumer groups. The idea was that different consumers who are subscribed to the same group will get different messages. We assumed this to be incorporated with kafka. Here was where we were wrong!! So wrong!! and here came our first problem.

All workers listening to same topics even while listening to the same group, were getting the same message. So the assumption(my bad for not properly understanding the documentation) we took about kafka coordinating the response was kind of a setback for us. We explored a little bit and got to know, that coordination was done by a lead worker which would assign different partitions to different consumers. I forgot to add, each topic can have different partitions, basically each partition is like a different queue. Then, kafka python library did not have ability to coordinate workers. (Recently there has been a change in kafka-python library, and it supports coordinated workers, but what it does is, whenever a worker dies, all workers stop processing requests, and are re-assigned partitions for processing, which has a latency which we could not afford.) So we basically created different workers with different partitions. And started monitoring it with supervisor.

Now we started off with tests, everything was working great, then slowly  while increasing the load, we noticed that some of the workers were sitting idle, and some workers were full for the whole of the time, and kafka queues were queueing up. which was our next problem, Partition assigning for each push request to kafka was not random enough. We tried using different hash algorithms, but result was the same, some workers were getting queued up and some workers were not getting any requests at all.

So to solve the problem of properly distributing the requests. We added a worker whose only job is to listen from a queue, and distribute the request to different servers.

Now our setup was ready.

We started testing out on both the setups, and after running a few scenarios, we found out kafka to be more performant than celery farm.

It was time to put it on production, and the results were well, really  unexpected, we were surprised with the gain in performance we got during peak traffic time, you can see the results yourself: -  

Older graph for requests

The newer distributed code.

As you can see the average time came down from 14.2 seconds to 2.57 seconds, and worst case response time which was closer to 32 seconds in older setup came down to around 4 seconds.

Conclusion - Point is you have Kafka at your hand, you have Celery at your hand but neither of them can help you at bigger scale of data if you use them just-as-is. This blog by no means proves anyone’s superiority over another. Its just that for our use case Kafka proved to be better fit according to our implementation. So devs go out and Experiment, Try out new things and Implement!