About two years ago, a small article titled “Kafka Streams — a Challenging Journey in Production” was published, in which I described the difficulties our team encountered when attempting to tackle project tasks using Kafka Streams. The experiment didn’t yield the desired results, and we ultimately abandoned this technology altogether. Instead, we decided to give ClickHouse (CH) a try, and this database has proven to be an excellent fit for us. It effectively addresses almost all the business challenges we face. In this article, I’ll share insights into the use of CH.
I don’t intend to pit Kafka Streams against ClickHouse or even compare them, as they are entirely different technologies. Both, in theory, could have solved the challenges we faced. This is perhaps the only similarity between them.
Implementing CH in our project was a complex and labor-intensive process that ultimately ended successfully, though not without its share of issues. In this article, similar to the previous one, I will describe the challenges we encountered at various stages. I will also mention some time-tested practices we employed. This is our experience, much of which is not found in CH documentation. I hope it will be helpful to the readers.
When I first started working with CH and designing the system architecture around it, I sorely lacked external experience to provide answers like “yes, this will work” or “no, it’s better not to do it this way.” The purpose of this article is to offer answers to similar questions for those who find themselves in need of guidance.
What Do We Have Now?
As of today, we have a ClickHouse (CH) cluster which is located in two data centers on different continents, comprising a total of 14 servers: 7 shards with 2 replicas in each.
Initially, we deployed CH version 20.3.19.4, and it ran for a long time without updates. Only recently did we decide to update it to version 23.3.1.2823. Below, I will describe the challenges we faced during the update.
We have one primary “raw” (or “wide”) table where events from the entire platform are recorded. Additionally, we have around 70 small tables known as Materialized Views (MVs), which are derived from the primary table and mostly contain aggregated data.
The raw table has around 150 columns, and we insert about 3 million events per second into it. This amounts to approximately 3 terabytes of compressed data per day. The insertion is performed through the built-in Kafka engine of the database. At the same time, the cluster receives about 3 SELECT queries per second (just three, not three million). The servers themselves are quite powerful and far from idle, but they still have enough headroom to handle increased loads.
Before implementing CH, we had significant doubts about whether we could handle such a large volume of incoming events, especially considering that we store not only aggregated values (MVs) but also the raw data for analysis. The number of servers on the platform is a critical factor for us. As it turned out, CH is remarkably well optimized and consumes surprisingly few resources relative to the data volume. In this regard, we were pleasantly surprised.
Next, let’s go over the key solutions we use in our system.
Kafka Engine
As I’ve mentioned before, we decided to insert data into the database using ClickHouse’s built-in Kafka engine. There were several reasons for this choice:
- We didn’t want to independently handle the task of grouping events into batches before insertion.
- Developing a separate service for insertion (or using a pre-built one) would entail additional servers, extra network load, as well as development and support costs, among other things. In simpler terms, using the Kafka engine was more cost-effective.
- Alexey Milovidov recommended its use here: https://www.youtube.com/watch?v=Ac2C2G2g8Cg
- Considerable work to stabilize the engine was carried out by these individuals: https://altinity.com/blog/clickhouse-kafka-engine-faq
Overall, we didn’t have many complaints about the engine, except for the fact that in our version 20.3.19.4, we had to create one table for each consumer. As a result, we had 15 identical tables on each server, with suffixes _1, _2, … _15 in their names. Without this approach, scaling to a large number of partitions was impossible.
In newer versions of CH, the ability to create a single table with the kafka_num_consumers parameter was introduced. I recommend using this option, as each individual table itself places a significant load on the processor.
For some reason, the kafka_num_consumers parameter is limited by the number of cores on the server, so if you need a greater number of threads, you have to create additional tables.
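To illustrate the single-table setup, here is a minimal sketch. The table names, columns, broker address, topic, consumer group, and the JSONEachRow format are all placeholders invented for the example (our real pipeline uses Cap’n Proto). Only the kafka_* setting names come from ClickHouse itself.

```sql
-- Hypothetical queue table: one Kafka engine table with several consumers
-- instead of N identical tables with _1 ... _N suffixes.
CREATE TABLE events_queue
(
    event_time DateTime,
    user_id    UInt64,
    payload    String
)
ENGINE = Kafka
SETTINGS kafka_broker_list   = 'kafka-1:9092',      -- placeholder broker
         kafka_topic_list    = 'events',            -- placeholder topic
         kafka_group_name    = 'clickhouse_events', -- placeholder consumer group
         kafka_format        = 'JSONEachRow',       -- assumption: use your own format
         kafka_num_consumers = 4;                   -- keep at or below the core count

-- Hypothetical target table that stores the raw events.
CREATE TABLE events_raw
(
    event_time DateTime,
    user_id    UInt64,
    payload    String
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(event_time)
ORDER BY (event_time, user_id);

-- The MV moves every consumed batch from the queue into the target table.
CREATE MATERIALIZED VIEW events_consumer TO events_raw AS
SELECT event_time, user_id, payload
FROM events_queue;
```

If the number of partitions exceeds what one table can serve, the same pattern can be repeated with a second queue table and a second MV that write to the same target.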
Migrations
One of the main challenges we encountered in this database is adding or removing columns, especially when it concerns Materialized Views (MV). When we upgraded ClickHouse from version 20.3.19.4 to 23.3.1.2823, we did it in several stages. Each stage sometimes lasted for more than a week, so I have experience in performing migrations on different versions over an extended period. Here’s an incomplete list of various issues we faced, not tied to specific versions:
- “alter” commands sometimes hang at random, occasionally for over a day.
- The “drop view” command occasionally deletes not only the MV itself but also the table where the data is stored.
- It’s impossible to modify an MV query without downtime. There is a time interval between the “drop view” and “create materialized view” commands when no data is written to the table. In newer versions, attempts were made to address this, but the fix doesn’t work for all types of MV and still seems to be an experimental feature.
- It’s impossible to execute “alter” commands smoothly with “on cluster” on replicated tables.
- You can’t detach an MV from a physical table before executing an “alter.”
I want to emphasize that these are just the issues we faced at different times. I can’t guarantee that you’ll encounter the same problems. I can say one thing for sure: migrations in ClickHouse are a lengthy and labor-intensive manual process that is difficult to automate (if possible at all). What’s worse, it is complicated by issues and bugs in ClickHouse itself, some of which I mentioned above.
Denormalization — It’s Normal
I’ll start with a seemingly scary statement: in our raw table, there are actually 11 tables when viewed from the perspective of a relational database. However, for ClickHouse (CH), this is perfectly normal.
Imagine a scenario where you have two large streams of events in your system. The first stream is enormous, and the second is just large. The first and second streams are related to each other in a one-to-many relationship. In the case of a relational database, you would create two tables and write each stream of events to its own table. Then, you would use JOIN operations to work with this data.
In the context of CH, this is a significant mistake. You cannot perform JOIN operations on tables with raw data because there’s simply too much data to fit into memory.
If you are confident that events from the second stream occur only after events from the first stream (as is the case for us), the right solution here is to create a single table for the first stream, structured with Nested data, which stores related events from the second stream. Nested is essentially an array of events where you can add new events as they arrive. You might have two questions here:
1. How can we add new events to the array if CH doesn’t allow frequent UPDATE commands? (Especially considering that the second stream is also large)
The answer to this question is the VersionedCollapsingMergeTree engine. It works wonderfully in this case, and I recommend using it. The complexity here lies in incrementing the “version” field in a thread-safe manner. For this, you’ll need to write additional code, using an external key-value storage for locks, such as Redis.
2. Is it convenient to execute analytical queries when events are stored in an array?
Yes, it is. CH offers a wide range of functions for working with arrays, with “array join” making life easier. We haven’t encountered any issues in this regard.
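The approach described above can be sketched roughly as follows. The table and column names are hypothetical, and the sketch leaves out the part that is actually hard: choosing the next version value safely across concurrent writers.

```sql
-- Hypothetical table for the first stream; related events from the
-- second stream are kept in the Nested structure "children".
CREATE TABLE first_stream
(
    event_time DateTime,
    event_id   UInt64,
    children   Nested
    (
        child_id   UInt64,
        child_type String
    ),
    sign    Int8,    -- 1 = state row, -1 = cancel row
    version UInt32   -- incremented on every change of the event
)
ENGINE = VersionedCollapsingMergeTree(sign, version)
PARTITION BY toYYYYMMDD(event_time)
ORDER BY (event_time, event_id);

-- Event 1 arrives from the first stream with no related events yet.
INSERT INTO first_stream VALUES
    ('2023-01-01 00:00:00', 1, [], [], 1, 1);

-- A related event arrives: cancel version 1 and write version 2
-- with the new element appended to the arrays.
INSERT INTO first_stream VALUES
    ('2023-01-01 00:00:00', 1, [], [], -1, 1),
    ('2023-01-01 00:00:00', 1, [101], ['click'], 1, 2);

-- Analytical query over the nested events. Multiplying by sign via sum(sign)
-- keeps the result correct even before the parts have been merged.
SELECT
    event_id,
    child_type,
    sum(sign) AS events
FROM first_stream
ARRAY JOIN children.child_type AS child_type
GROUP BY event_id, child_type
HAVING events > 0;
```

The cancel row repeats the previous state with sign = -1 and the same version, which is why the writer has to know the last version it stored for each event.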
I’ll also provide another example of denormalization that we use. Imagine you have an event A that generates multiple events B, each of which generates multiple events C. All three events happen simultaneously. In a relational database, you’d typically have three tables connected with one-to-many relationships.
In CH, you can store all of this in one table. Each row in this table represents an event C, and it also contains data about events A and B. For example:
Row 1: A1 B1 C1
Row 2: A1 B1 C2
Row 3: A1 B1 C3
Row 4: A1 B2 C4
Row 5: A1 B2 C5
...
You might argue that there will be a lot of duplication, and you’re right. However, for CH, this is a normal situation because it efficiently compresses such data. With this table schema implementation, you can perform any analytical queries and create various Materialized Views (MV) without the need for JOIN operations.
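As a rough sketch of such a flattened table, assuming made-up column names for the attributes of A, B, and C:

```sql
-- Hypothetical flattened table: one row per event C,
-- with the attributes of its parent events A and B repeated in every row.
CREATE TABLE events_c
(
    event_time DateTime,
    a_id       UInt64,
    a_country  LowCardinality(String),
    b_id       UInt64,
    b_kind     LowCardinality(String),
    c_id       UInt64,
    c_amount   Decimal64(4)
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(event_time)
ORDER BY (event_time, a_id, b_id, c_id);

-- Counts on all three levels in one pass, without any JOIN.
SELECT
    a_country,
    uniqExact(a_id) AS a_events,
    uniqExact(b_id) AS b_events,
    count()         AS c_events,
    sum(c_amount)   AS total_amount
FROM events_c
WHERE event_time >= now() - INTERVAL 1 HOUR
GROUP BY a_country;
```

Because each row is an event C, counting A and B events means counting distinct IDs, while C-level metrics are plain counts and sums.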
SummingMergeTree or AggregatingMergeTree?
In our experience, the vast majority of Materialized Views (MV) can use the SummingMergeTree engine because, in practice, aggregations other than sums are rarely needed. Of course, this may vary depending on the specific domain. In our case, out of 70 tables, we only use AggregatingMergeTree in one.
We have also had a negative experience with the uniqState/uniqMerge functions in AggregatingMergeTree: they can severely impact the database’s performance because they store a large amount of data in each row. In hindsight, this becomes clear once you understand how these functions work: each row keeps an intermediate aggregation state rather than a single number. However, this doesn’t negate the fact that using them on large volumes of data is not advisable, and you won’t know this until you try. It’s quite possible that this pair of functions is not the only one with such issues, and the AggregatingMergeTree engine may have other similar surprises.
The general idea is that by default, it’s better to choose SummingMergeTree, and AggregatingMergeTree should be used in specific cases, closely monitoring the load it places on the servers. Unfortunately, there is no magic in AggregatingMergeTree.
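To make the difference concrete, here is a sketch of both variants over the same hypothetical raw table. All table and column names are invented for the example.

```sql
-- Hypothetical raw table used by both MVs below.
CREATE TABLE raw_events
(
    event_time  DateTime,
    campaign_id UInt32,
    user_id     UInt64,
    money       Decimal64(4)
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(event_time)
ORDER BY (event_time, campaign_id);

-- SummingMergeTree: each row stores plain numbers that are added up on merge.
CREATE TABLE daily_revenue
(
    date        Date,
    campaign_id UInt32,
    events      UInt64,
    revenue     Decimal128(4)
)
ENGINE = SummingMergeTree
PARTITION BY toYYYYMM(date)
ORDER BY (date, campaign_id);

CREATE MATERIALIZED VIEW daily_revenue_mv TO daily_revenue AS
SELECT toDate(event_time) AS date, campaign_id,
       count() AS events, sum(money) AS revenue
FROM raw_events
GROUP BY date, campaign_id;

-- Merges are asynchronous, so always aggregate again when reading.
SELECT date, campaign_id, sum(events) AS events, sum(revenue) AS revenue
FROM daily_revenue
GROUP BY date, campaign_id;

-- AggregatingMergeTree: each row stores an aggregation state,
-- which for uniq can be far larger than a single number.
CREATE TABLE daily_users
(
    date        Date,
    campaign_id UInt32,
    users       AggregateFunction(uniq, UInt64)
)
ENGINE = AggregatingMergeTree
PARTITION BY toYYYYMM(date)
ORDER BY (date, campaign_id);

CREATE MATERIALIZED VIEW daily_users_mv TO daily_users AS
SELECT toDate(event_time) AS date, campaign_id,
       uniqState(user_id) AS users
FROM raw_events
GROUP BY date, campaign_id;

SELECT date, campaign_id, uniqMerge(users) AS users
FROM daily_users
GROUP BY date, campaign_id;
```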
Queries Without Indexes
Before we started using the database, we were initially concerned about the fact that the raw table would have many columns, but there would be only a primary key, and we couldn’t create indexes. How fast would queries work?
As it turned out, in most cases, this isn’t a problem if you follow these conditions:
- Always specify a time interval in your query for which you need to calculate something. The column with the event’s timestamp should be part of the primary key.
- Use the minimum number of columns in your queries. The fewer columns in the query, the less data ClickHouse has to read from the disk.
- Don’t request very old data. Stick to recent data, for example, data from the last few hours. In this case, the data will be retrieved from the fast system cache instead of the slower disk.
Practically, almost all of our queries meet these conditions and work at an acceptable speed for analysts. Occasionally, there’s a need to calculate something based on older data, such as data from previous days. In this case, the database has to read them from the disk, and you’ll need patience to wait for the results.
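A query that follows all three conditions might look like the sketch below. The table and its columns are hypothetical.

```sql
-- Hypothetical raw table: the timestamp leads the primary key.
CREATE TABLE raw_events
(
    event_time DateTime,
    country    LowCardinality(String),
    user_id    UInt64,
    payload    String
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(event_time)
ORDER BY (event_time, country);

-- Bounded time interval, recent data only, and just two columns read from disk.
SELECT
    toStartOfMinute(event_time) AS minute,
    count() AS events
FROM raw_events
WHERE event_time >= now() - INTERVAL 3 HOUR
  AND country = 'DE'
GROUP BY minute
ORDER BY minute;
```

The opposite pattern, a SELECT * without a time filter, forces ClickHouse to read every column of every part and is the kind of query to avoid on a wide table.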
I want to emphasize that this is not about Materialized Views (MV), but specifically about queries in the raw table that analysts in our team perform. There are no problems with MV in this regard, as the data there is significantly smaller. If you need to provide some aggregated information to end users, then MV should be used.
Multiple Data Centers
Our system processes events in two data centers: one located in Europe and the other in the USA. ClickHouse needs to display combined data from both locations. We faced a choice between two options:
1. Transfer data from one continent to the other using replication mechanisms.
2. Set up a separate set of shards in each data center without cross-data center replication.
In the first case, we would be transmitting much larger volumes of data over the internet, but SELECT queries would be faster due to all data in the cluster being located in the local network. In the second case, it’s the opposite; relatively little data is transmitted between continents, but for each SELECT query, we have to wait for responses from both the local and remote data centers, which noticeably slows down query execution.
We chose the second option because the volume of incoming data in our system is significantly higher than the data volume used in SELECT queries. We were concerned that query delays would be too significant, but this concern turned out to be unfounded. During testing, we observed a slowdown of around 200 milliseconds, which is acceptable for us.
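One way to query shards in both data centers at once is a Distributed table over a cluster that lists all of them. The sketch below assumes a cluster named both_dc is defined in the server’s remote_servers configuration and that a local table raw_events exists on every shard. Both names are hypothetical.

```sql
-- Hypothetical Distributed table: it stores no data itself and fans
-- each query out to the local raw_events table on every shard of both_dc.
CREATE TABLE raw_events_all AS raw_events
ENGINE = Distributed(both_dc, default, raw_events);

-- The initiating server waits for partial results from the local and the
-- remote data center and merges them, which is where the extra latency comes from.
SELECT count() AS events
FROM raw_events_all
WHERE event_time >= now() - INTERVAL 1 HOUR;
```

Only the partially aggregated results cross the ocean, not the raw rows, which is why this layout keeps intercontinental traffic low.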
Monitoring
ClickHouse provides a vast number of metrics that can be exported to Prometheus. With these metrics, we easily set up an informative dashboard that, in most cases, helps us understand whether the database is operating normally or if there are any issues. Here is a list of the main metrics we use:
ClickHouseMetrics_ReadonlyReplica
ClickHouseMetrics_VersionInteger
ClickHouseMetrics_BackgroundSchedulePoolTask
ClickHouseMetrics_BackgroundMergesAndMutationsPoolTask
ClickHouseMetrics_BackgroundFetchesPoolTask
ClickHouseMetrics_ReplicatedFetch
ClickHouseMetrics_BackgroundMessageBrokerSchedulePoolTask
ClickHouseMetrics_HTTPConnection
ClickHouseProfileEvents_ReplicatedPartFailedFetches
ClickHouseProfileEvents_InsertedRows
ClickHouseProfileEvents_DelayedInserts
ClickHouseProfileEvents_InsertedBytes
ClickHouseProfileEvents_DelayedInsertsMilliseconds
ClickHouseProfileEvents_SelectQuery
ClickHouseProfileEvents_MergedRows
ClickHouseProfileEvents_RejectedInserts
ClickHouseProfileEvents_DistributedConnectionStaleReplica
ClickHouseAsyncMetrics_Uptime
ClickHouseAsyncMetrics_MaxPartCountForPartition
ClickHouseAsyncMetrics_ReplicasMaxAbsoluteDelay
This is not an exhaustive list of available metrics, but if you’re not sure where to start, you can begin with these metrics and add or modify them as needed.
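The same values can be inspected directly in ClickHouse system tables, which is handy for checking a dashboard. The Prometheus names are the system-table names with a prefix: ClickHouseMetrics_ maps to system.metrics, ClickHouseProfileEvents_ to system.events, and ClickHouseAsyncMetrics_ to system.asynchronous_metrics. A small sketch with a few of the metrics from the list:

```sql
-- Current values (gauges).
SELECT metric, value
FROM system.metrics
WHERE metric IN ('ReadonlyReplica', 'ReplicatedFetch', 'HTTPConnection');

-- Counters accumulated since server start; events that have not
-- occurred yet may be absent from this table.
SELECT event, value
FROM system.events
WHERE event IN ('InsertedRows', 'DelayedInserts', 'RejectedInserts', 'SelectQuery');

-- Metrics recalculated periodically in the background.
SELECT metric, value
FROM system.asynchronous_metrics
WHERE metric IN ('Uptime', 'MaxPartCountForPartition', 'ReplicasMaxAbsoluteDelay');
```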
External Dictionaries
The external dictionaries feature has proven to be so successful for us that we use it as often as Materialized Views (MV). Hardly any of our queries to the database are performed without external dictionaries.
The main advantage here is that you don’t need to store all possible information in the raw table. You can store, for example, only an ID and then request additional information from external dictionaries in your queries. In terms of query performance, this approach feels almost cost-free (we primarily use the flat layout and occasionally ip_trie).
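As an illustration, a dictionary with the flat layout and a query that uses it could look like this. The dictionary name, its source table, and the raw_events table with a campaign_id column are hypothetical, and the ClickHouse source is used only to keep the sketch short. A real external dictionary would typically point at another database or a file.

```sql
-- Hypothetical dictionary that maps a numeric ID to a name.
CREATE DICTIONARY campaigns_dict
(
    id   UInt64,
    name String DEFAULT ''
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' PASSWORD ''
                  DB 'default' TABLE 'campaigns'))
LAYOUT(FLAT())
LIFETIME(MIN 300 MAX 600);  -- refresh every 5 to 10 minutes

-- The raw table stores only campaign_id; the name is looked up at query time.
-- Depending on the version, the dictionary name may need a database prefix.
SELECT
    dictGet('campaigns_dict', 'name', toUInt64(campaign_id)) AS campaign,
    count() AS events
FROM raw_events
WHERE event_time >= now() - INTERVAL 1 HOUR
GROUP BY campaign;
```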
Problems with Updates
As I mentioned earlier, we updated the database from version 20.3.19.4 to 23.3.1.2823. We did this not in one go but in multiple iterations, gradually transitioning from one stable version to another from the Altinity list: https://docs.altinity.com/altinitystablebuilds/.
The motivation for the database update was primarily the introduction of User-Defined Functions (UDF) and window functions, which added optimization possibilities to our toolkit and, in many respects, simplified our lives. Otherwise, the old version of the database was sufficient for our business needs.
The update was not without its challenges, to say the least. I’ll describe the most glaring issues.
The first problem we encountered when moving to one of the stable versions was data loss in tables. The schema was straightforward and looked something like this:
create table foo
(
    date Date,
    field1 UInt16,
    field2 FixedString(2),
    field3 UInt16,
    aggregate1 Int64,
    aggregate2 Decimal128(15)
)
engine = ReplicatedSummingMergeTree('/clickhouse/tables/{shard}/foo', '{replica}')
partition by toYYYYMMDD(date)
order by (date, field1, field2, field3)
Data is inserted into this table through an MV:
create materialized view foo_mv to foo
as select date,
          field1,
          field2,
          field3,
          sum(sign) as aggregate1,
          sum(money * sign) as aggregate2
from bar
group by toDate(eventTime) as date, field1, field2, field3;
Here, “bar” is our raw table, from which aggregates are inserted into the Materialized Views (MV).
It’s important to note that we generally try not to use any undocumented, unrecommended, or experimental features in ClickHouse. We strictly adhere to the documentation. As a result, most of our MVs and tables are quite simple — I would even say they are unexciting.
We were quite surprised when, immediately after updating the database, we started receiving complaints from users about missing data in their reports. The data loss occurred at random times; sometimes everything was fine for an entire day, and at other times, data would disappear several times an hour. The loss was partial: some of the data disappeared while some remained. It only affected recent days, and which day was hit seemed random: it could be data from yesterday, from a few days ago, and so on.
Users might see, for instance, that they had earned $1000 at the end of the day. However, the next day in the morning, they might see only $700 instead of $1000. Over the past week, they might be missing an additional $2000, and so on.
We searched for reports of this issue in the ClickHouse Telegram channel and on GitHub but, unfortunately, without success. A quick analysis on our part showed that the loss occurred during the merge of data parts. After merging two parts with correct data, a part without data might be generated.
The problematic tables worked flawlessly before the update for two years. They broke after the update, so we were confident that the problem lay within the database itself and that it wasn’t our mistake.
The method we used to overcome this issue was simple but effective. First, we established a mechanism for quick data recovery, which we executed each time data went missing. Afterward, we began to disable all the settings introduced or altered in the new ClickHouse version one by one until the problem ceased. It took us about a week to pinpoint the culprit — the setting “optimize_on_insert,” which was turned off by default in the old version but enabled in the new one. After we disabled this setting, the data stopped disappearing from the database.
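For reference, the current value of a setting can be checked and changed for a session as shown below. This is only a sketch: inserts made by the Kafka engine and MVs do not run in your session, so for them the setting presumably has to be changed in the server’s default settings profile. Treat that as an assumption to verify for your version.

```sql
-- Check the current value and whether it differs from the built-in default.
SELECT name, value, changed, description
FROM system.settings
WHERE name = 'optimize_on_insert';

-- Disable the setting for the current session only.
SET optimize_on_insert = 0;

-- List every setting that currently deviates from its default,
-- useful when comparing an old and a new installation.
SELECT name, value
FROM system.settings
WHERE changed;
```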
During the updates, we also encountered issues with backward compatibility. For instance, we have several dictionaries with an “ip_trie” key. In the old version, to retrieve data using such a key, we had to pass IP addresses of the FixedString(16) type to the dictGet function. However, our IP addresses were IPv6. To make it work, we had to perform this transformation:
dictGet('my_dictionary', 'my_field',
    tuple(cast(ip_field as FixedString(16))))
After the update, this stopped working, and we started receiving an error:
<Error> void DB::StorageKafka::threadFunc(size_t): Code: 48. DB::Exception:
CAST AS FixedString is only implemented for types String and FixedString:
While processing dictGet … (NOT_IMPLEMENTED).
We tested all updates in a testing environment, so these errors didn’t cause us significant inconvenience. We simply modified our queries to make them work in the updated version. However, this didn’t always work. For data ingestion from Kafka to ClickHouse, we use the Cap’n Proto protocol, and after one of the updates, deserialization broke in multiple places, leaving no viable workarounds in some cases. I raised a couple of tickets for these issues: https://github.com/ClickHouse/ClickHouse/issues/43319 and https://github.com/ClickHouse/ClickHouse/issues/46522. Fortunately, we managed to get in touch with the author of the changes, and he promptly resolved all the issues.
Finally, let me describe the very latest problem we encountered just a few days ago. We recently started using User-Defined Functions (UDF) in our service queries. We immediately noticed that in ClickHouse, the system.processes table started to have “stuck” or “hanging” queries. What’s interesting is that these queries get stuck randomly, at random times, and it’s not necessarily related to the presence of UDF. Even old, lightweight queries that haven’t changed in a while can get stuck. Over several days, these stuck queries accumulate until they reach the maximum limit, and ClickHouse stops accepting new queries.
We’ve tried to fix this issue with both ClickHouse configurations and on the side of our services, but so far, without any success. Every few days, we have to restart all the servers to clear the table of stuck queries. And yes, the “kill query” command doesn’t work on them.
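For anyone facing something similar, these are the standard commands for inspecting and cancelling running queries. The query_id value is a placeholder. In our case the kill had no effect, so treat this as a first diagnostic step rather than a fix.

```sql
-- Longest-running queries first; stuck queries show an ever-growing "elapsed".
SELECT
    query_id,
    user,
    elapsed,
    substring(query, 1, 120) AS query_head
FROM system.processes
ORDER BY elapsed DESC
LIMIT 20;

-- Ask the server to cancel one query and wait until it has stopped.
-- Replace the placeholder with a query_id from the result above.
KILL QUERY WHERE query_id = 'placeholder-query-id' SYNC;
```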
Not long ago, another stable version of the database was released: https://docs.altinity.com/releasenotes/altinity-stable-release-notes/23.3/altinity-stable-23.3.8/. It’s a few minor versions newer than the one we’re currently using. As of today, our main hope is that updating to this newer version will resolve the issue.
Conclusion
If I were to go back several years and choose the technology for our project again, my choice wouldn’t change: I would still choose ClickHouse and implement the same architectural solutions. ClickHouse fits perfectly with our project. I’m not aware of better alternatives. In my opinion, ClickHouse is the only solution for our project today.
However, I still urge readers to think twice before deciding to start using ClickHouse. It’s a very specific tool for very specific tasks. You will undoubtedly encounter a variety of problems with it, but these problems are typically solvable.
If you’re unsure whether you need ClickHouse for your project, then most likely, you don’t need it, and you should opt for a classic database.
Article author: Andrey Burov, Maxilect.