Kafka Streams — a life in production

Maxilect
Mar 1, 2021

Kafka Streams enjoys an exceptionally positive reputation. The tool looks attractive because it has detailed documentation, an understandable API, and a beautiful architecture. Some of my friends and colleagues use it to develop their systems. So what happens in real life when these systems go into production?

In this article, I will skip the introduction to Kafka Streams, assuming that the reader is already familiar with it, and describe our experience with this library in a highly loaded system.

Briefly about the project

An internal team is working on an Ad Exchange that helps resell advertising traffic. We have already described the specifics of such tools. As the number of SSP and DSP partners grows, the load on the exchange servers increases. To increase the value of the exchange itself, we must collect detailed analytics on this traffic. This is where we tried to use Kafka Streams.

Introducing a new tool into production is always a risk. We were aware of that, but conceptually Kafka Streams was a good fit for calculating aggregates. The first impression was great, but here I will talk about the problems.

This article does not claim to be objective; it only describes our experience and the problems we faced while working with this technology. I hope it helps someone avoid mistakes when using Kafka Streams, or maybe gives a reason to choose a better-fitting solution.

Aggregates

Our system receives hundreds of thousands of messages per second. The first thing we needed to do was aggregate them by different keys. For example, there is a stream of “Ads display” events. For it, you need to calculate the total cost and the number of events on the fly, grouping by different keys: country, partner, advertising platform, etc.

It seems to be a standard case for Kafka Streams: we use the groupBy and aggregate functions and get the desired result. Everything works as intended and at excellent speed thanks to the internal cache: several consecutive changes to the same key are applied in the cache, and records are sent to the changelog topic only when the cache is flushed. Kafka then removes obsolete records for the same key in the background through the log compaction mechanism. What could go wrong here?
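To make the caching effect concrete, here is a minimal sketch in plain Python. It is not the Kafka Streams API: the class `CachedAggregator`, the event fields `country` and `cost`, and the explicit `flush()` call are all hypothetical stand-ins. It only illustrates that several updates to one key collapse into a single changelog record per flush.

```python
from collections import defaultdict


class CachedAggregator:
    """Toy model of a per-key aggregate with a write-back cache.

    Hypothetical illustration: it mimics the idea of batching updates
    per key before emitting them, not Kafka Streams internals.
    """

    def __init__(self):
        self.store = defaultdict(lambda: {"count": 0, "cost": 0.0})
        self.dirty = set()     # keys changed since the last flush
        self.changelog = []    # records "sent" to the changelog topic

    def add(self, key, cost):
        entry = self.store[key]
        entry["count"] += 1
        entry["cost"] += cost
        self.dirty.add(key)

    def flush(self):
        # One changelog record per dirty key, however many updates it had.
        for key in sorted(self.dirty):
            self.changelog.append((key, dict(self.store[key])))
        self.dirty.clear()


events = [
    {"country": "DE", "cost": 0.5},
    {"country": "US", "cost": 1.0},
    {"country": "DE", "cost": 0.25},
    {"country": "DE", "cost": 0.25},
]

agg = CachedAggregator()
for event in events:
    agg.add(event["country"], event["cost"])
agg.flush()

# Four input events, but only two changelog records (one per key).
print(agg.changelog)
```

The grouping key here is the country; grouping by partner or platform would mean a separate aggregate of the same shape.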

Repartitioning

Suppose your grouping key differs from the key under which the event originally arrived. In that case, Kafka Streams creates a special repartition topic, writes the event to it under the new key, reads it back from there, and only then aggregates it and sends the result to the changelog topic. In our example, the “Ads display” event may arrive keyed by a UUID. Why not? If you need to group by, say, three other keys, you get three different repartition topics. For each of them, every event costs one additional write to Kafka and one additional read.

Let’s say you have 100 thousand ad impressions per second at the input. In our example, you will create an additional load on the message broker of 600 thousand messages per second (300 thousand writes and 300 thousand reads). And it’s not just the broker: for such volumes, you also need additional servers running the Kafka Streams services. You can calculate how many thousands of dollars such a solution will cost, taking hardware prices into account.
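The arithmetic behind that figure can be written down as a small sketch. The function name `repartition_overhead` is made up for illustration, and the model assumes that every grouping key differs from the original key and that changelog traffic is ignored.

```python
def repartition_overhead(events_per_sec, grouping_keys):
    """Extra broker traffic caused by repartition topics, in messages/sec.

    Assumes one repartition topic per grouping key and ignores
    changelog traffic.
    """
    writes = events_per_sec * grouping_keys  # one write per event per topic
    reads = events_per_sec * grouping_keys   # one read-back per event per topic
    return {"writes": writes, "reads": reads, "total": writes + reads}


overhead = repartition_overhead(100_000, 3)
print(overhead)
```

With 100 thousand events per second and three grouping keys this gives 300 thousand writes, 300 thousand reads, and 600 thousand extra messages per second in total.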

For readers who are not very familiar with the repartition mechanism, I will clarify one point. This is not a bug or a flaw in Kafka Streams. Given its ideology and architecture, this is the only possible behavior, and you can’t just turn it off. When the key of a message changes, the new keys must be spread evenly across the cluster so that each Kafka Streams instance has its own set of keys. The additional write to and read from the repartition topic serve exactly this purpose.

Moreover, if instance A has written a message to the repartition topic, there is no guarantee that A will be the one to read it. Instance B, C, or any other may do so, depending on who reads which partition. As a result, each grouping key is distributed more or less evenly across the cluster servers (if the keys are hashed, of course).
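The routing described above can be sketched in a few lines. This is an illustration under stated assumptions, not how Kafka does it internally: `zlib.crc32` stands in for Kafka's own partitioner hash, and the partition count, instance names, and round-robin assignment are invented for the example.

```python
import zlib

NUM_PARTITIONS = 6               # hypothetical partition count
INSTANCES = ["A", "B", "C"]      # hypothetical Kafka Streams instances


def partition_for(key, num_partitions=NUM_PARTITIONS):
    """Map a key to a partition with a stable hash (stand-in for Kafka's)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def owner_of(partition):
    """Assume partitions are assigned to instances round-robin."""
    return INSTANCES[partition % len(INSTANCES)]


for key in ["country=DE", "country=US", "partner=42"]:
    p = partition_for(key)
    print(key, "-> partition", p, "-> instance", owner_of(p))
```

Whichever instance produced the record, the new key alone decides the partition, and the partition decides which instance reads it back.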

Queries to Aggregates

Data is written so that you can work with it later, for example, query it. Here the possibilities turned out to be seriously limited. The exhaustive list of ways to request data from Kafka Streams is given in its documentation on state stores, with a separate list for window aggregates. If the standard RocksDB-based implementation of the state store is used (and most likely it is), data can be obtained only by key.

Let’s assume that the first thing you want to do with the resulting data is filtering, sorting, and pagination. There is no such functionality here. The most you can do is read everything using the “all()” method and then perform the necessary operations manually. You are lucky if there are not too many keys and the data fits in RAM. We were not that lucky: I had to write an additional module that dumped data from RocksDB at night and sent it to Postgres.
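As a rough sketch of what "doing it manually" means, the snippet below filters, sorts, and paginates the result of a full scan held in memory. The `query` helper and the sample rows are hypothetical; in a real service the rows would come from iterating over the whole store.

```python
def query(rows, predicate, sort_key, page, page_size, descending=False):
    """Filter, sort, and paginate a full scan in memory (page is 1-based)."""
    matched = [row for row in rows if predicate(row)]
    matched.sort(key=sort_key, reverse=descending)
    start = (page - 1) * page_size
    return matched[start:start + page_size]


# Hypothetical result of a full scan: (key, aggregate) pairs.
all_rows = [
    ("DE", {"count": 3, "cost": 1.0}),
    ("US", {"count": 10, "cost": 7.5}),
    ("FR", {"count": 1, "cost": 0.2}),
    ("PL", {"count": 5, "cost": 2.0}),
]

top = query(
    all_rows,
    predicate=lambda row: row[1]["count"] >= 3,
    sort_key=lambda row: row[1]["cost"],
    page=1,
    page_size=2,
    descending=True,
)
print([key for key, _ in top])  # ['US', 'PL']
```

Every request pays for the full scan and the sort, which is why this approach only holds up while the key set stays small.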

You also need to remember that each service instance holds only part of the keys. What if you need to serve your aggregates, say, over HTTP? A client asks one of the Kafka Streams services for some data. The service checks its local store and finds nothing, or finds only part of the data, because the rest lives on another server. What to do? The Kafka Streams documentation says that this is our problem.
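One way to picture the work left to the application is the toy model below: each instance answers from its local store and otherwise asks its peers. Everything in it is an assumption made for illustration. The `Instance` class is hypothetical, and the direct access to a peer's dictionary stands in for a remote HTTP call that a real service would have to implement itself.

```python
class Instance:
    """Toy service instance that holds only part of the keys."""

    def __init__(self, name, local_store):
        self.name = name
        self.local_store = local_store
        self.peers = []

    def get(self, key):
        """Return (value, owner name), or (None, None) if nobody has the key."""
        if key in self.local_store:
            return self.local_store[key], self.name
        for peer in self.peers:  # stands in for a remote HTTP call
            if key in peer.local_store:
                return peer.local_store[key], peer.name
        return None, None

    def get_all(self):
        """Merge the local part with the parts held by every peer."""
        merged = dict(self.local_store)
        for peer in self.peers:
            merged.update(peer.local_store)
        return merged


a = Instance("A", {"DE": 3})
b = Instance("B", {"US": 10})
c = Instance("C", {"FR": 1})
a.peers, b.peers, c.peers = [b, c], [a, c], [a, b]

print(a.get("US"))   # (10, 'B')
print(a.get_all())
```

The sketch ignores everything that makes this hard in production: peer discovery, timeouts, and keys moving between instances during a rebalance.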

Stability of Kafka Streams

We have problems with our hosting provider. Sometimes the equipment fails, sometimes it is human error, sometimes both. If the connection to Kafka is lost for some reason, Kafka Streams moves all its stream threads to the DEAD state and waits for us to wake up and restart it. Meanwhile, neighboring services that work with the same Kafka through Spring and @KafkaListener recover by themselves as if nothing had happened.

Kafka Streams can die for another reason: uncaught exceptions in your code. Let’s say you make an external call inside Kafka Streams, for example a database call. When the database crashes, you have two options. The first is to catch the error and keep working. But if the database stays down for a long time and working with it is critical, you will read a large number of messages from the topics and process them incorrectly. The second option seems better: do not catch the exception and let Kafka Streams die. Then comes the usual routine: a wake-up call in the middle of the night and a restart of all servers running Kafka Streams. There is no default option to wait until the database is restored and then continue working automatically.

We had to add a watchdog module to each Kafka Streams service: it restarts Kafka Streams if it sees that it has died.
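The idea of such a watchdog fits in a short sketch. This is not our production module: `FakeStreamsService`, its `state` strings, and the polling parameters are hypothetical stand-ins, and the loop is bounded only so that the example terminates.

```python
import time


class FakeStreamsService:
    """Stand-in for a stream-processing service with a coarse state."""

    def __init__(self):
        self.state = "RUNNING"
        self.starts = 0

    def start(self):
        self.starts += 1
        self.state = "RUNNING"

    def crash(self):
        self.state = "DEAD"


def check_once(service):
    """Restart the service if it is dead; return True if it was restarted."""
    if service.state == "DEAD":
        service.start()
        return True
    return False


def watchdog(service, interval_sec, max_checks):
    """Poll the service a fixed number of times and count the restarts."""
    restarts = 0
    for _ in range(max_checks):
        if check_once(service):
            restarts += 1
        time.sleep(interval_sec)
    return restarts


svc = FakeStreamsService()
svc.crash()
restarts = watchdog(svc, interval_sec=0.01, max_checks=3)
print(restarts, svc.state)  # 1 RUNNING
```

With the real library, a restart most likely means building a fresh instance rather than calling start on the dead one, so a real watchdog would need a factory for new instances instead of a `start()` method.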

By the way, if you work with Kafka Streams through Spring, do not forget to override the standard StreamsBuilderFactoryBean and specify your own CleanupConfig in it. Otherwise, you will be unpleasantly surprised to find that the entire local RocksDB database is deleted on each restart. Let me remind you that this forces all servers to re-read the data from the changelog topic after every restart. Trust me, you don’t need this.

KStream-KStream Join

Never do that. Joining two streams creates dozens of Kafka topics and a huge load on the entire system. Just don’t do it. Or at least check everything under load before putting it into production.

In general, Kafka Streams loves to create topics for its various needs. If you haven’t studied the documentation under a magnifying glass and haven’t tested how it works, then this can come as an unpleasant surprise for you and your DevOps. The more topics there are, the more difficult it is to administer them.

Scalability

Anyone familiar with Kafka will tell you that scalability is one of its main advantages: “What’s the problem? Add partitions and rejoice.” That is true, but not for Kafka Streams.

If you use joins, then all of your topics must be co-partitioned, which means, among other things, that they must have the same number of partitions. So what’s the problem?

Let’s imagine that you have built several topologies with aggregates and joins, everything as it should be. You put them into production, they work, and you are happy. Then business picks up, the load starts to grow, you add servers, and you want to increase the number of partitions to give these new servers work. But Kafka Streams has spawned dozens of topics, and the number of partitions has to be raised on every one of them. Otherwise, the next restart of your services may be the last: if Kafka Streams sees that the topics are not co-partitioned, it throws an error and refuses to start.
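Before touching partition counts, it may help to check which topics would break co-partitioning. The sketch below does that over a plain dictionary. The topic names and counts are invented, and in practice the mapping would have to be collected from the cluster with an admin tool.

```python
def find_mismatched_topics(partition_counts, expected):
    """Return, sorted, the topics whose partition count differs from expected."""
    return sorted(
        topic for topic, count in partition_counts.items() if count != expected
    )


# Hypothetical topics of one application after a partial partition increase.
topics = {
    "impressions": 12,
    "clicks": 12,
    "app-join-repartition": 6,
}

mismatched = find_mismatched_topics(topics, expected=12)
print(mismatched)  # ['app-join-repartition']
```

An empty result covers only the partition-count part of co-partitioning. It says nothing about whether the existing data is still laid out consistently after the change.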

To this day, I have no answer to the question of what to do about this. You can probably stop all running Kafka Streams instances, raise the number of partitions on all the topics involved, bring Kafka Streams back up, and pray. Or you can follow the advice of Matthias J. Sax, who writes that this should be done by creating a new topic with the new number of partitions and connecting Kafka Streams to it under a new “application.id”. There is also advice that if you know in advance that the load will be large, it is better to create enough partitions beforehand.

Conclusion

If you do not have and do not plan to have high loads in your system, you will most likely not face all these problems. Everything will work fine. Otherwise, it’s worth considering whether you need it, especially if you plan to use the next level of abstraction — KSQL.

Article author: Andrey Burov, Maxilect.
