How we transferred the ClickHouse database between geographically distant data centers
At the end of last year, we wrote about the complex move of our own service to a new data center in Detroit. Among other tasks, we transferred ClickHouse. As a reminder, this is a heavily loaded service that runs on dozens of servers and handles hundreds of thousands of low-latency requests per second.
In this article, we explain how we transferred the data without the ability to shut down the service or use automatic replication.
Our ClickHouse data volume is not that large, so the process turned out to be resource-intensive rather than voluminous. However, very little information about the mechanisms we used is publicly available, so treat this article as a guide to the clickhouse-copier utility (https://github.com/ClickHouse/copier), with a concrete example that includes the scripts and commands we ran.
The Simplest Approach
Our service operates 24/7, so we can’t simply turn it off and copy everything over. We had planned the move, but limitations of ClickHouse itself (more precisely, of its replication scheme) kept us from carrying it out as planned.
Initially, we assumed that we could attach a replica in the new data center to each shard in the old one, wait for synchronization, and then simply disconnect the replicas in the old data center. However, because of the geographical distance, latency between the data centers was too high, even though the link ran at 2–2.5 Gbps. As a result, the amount of data in ZooKeeper, which coordinates replication, grew significantly, and we had to stop the process because it threatened to slow down production. We had to find another way to move.
The database stores all requests coming to our service. In ClickHouse, we store two types of data:
- “Raw data”, kept for two weeks. About 6 TB accumulates over that period.
- “Aggregates”: the important results of processing the raw data. They take up about 300 GB.
Our first decision was to wait until the raw data had been aggregated and transfer only the aggregates to the new data center, launching new shards and moving services in the meantime. So the task boiled down to finding a way to transfer those 300 GB of aggregates without losing a single byte.
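As a rough sanity check on these volumes, the ideal transfer times can be estimated from the figures above. This is a back-of-the-envelope sketch and not part of the migration itself: `transfer_seconds` is a hypothetical helper, it assumes decimal units (1 GB = 8 gigabits) and a fully saturated link, and it ignores latency and protocol overhead, so real transfers take longer.

```shell
# Hypothetical helper: ideal transfer time in whole seconds.
# Usage: transfer_seconds SIZE_GB LINK_GBPS
# Assumes decimal units (1 GB = 8 gigabits) and a fully used link.
transfer_seconds() {
    awk -v gb="$1" -v gbps="$2" 'BEGIN { printf "%d\n", gb * 8 / gbps }'
}
```

With these assumptions, `transfer_seconds 300 2` prints 1200 (20 minutes) for the aggregates, while `transfer_seconds 6000 2` prints 24000 (about 6.7 hours) for the raw data.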
We found a list of possible approaches in an article by Altinity: https://kb.altinity.com/altinity-kb-setup-and-maintenance/altinity-kb-data-migration/. In our case, there were two options: physical transfer or copying with clickhouse-copier.
Next, we’ll talk about each of these options.
Physical Data Transfer
The first possible option is low-level (physical) data transfer.
ClickHouse stores data as parts, which can be physically copied from one server to another as files. To do this, run a query like the following on the source server, detaching every part or partition of every table in turn:
-- DETACH PART/PARTITION
ALTER TABLE <DATABASE_NAME>.<TABLE_NAME> DETACH PARTITION|PART <PARTITION_EXPRESSION>;
Then copy the files from the directory
`ClickHouse_data_dir/<DATABASE_NAME>/<TABLE_NAME>/detached`
into the table's `detached` directory on the new server, and run the reverse query there:
-- ATTACH PART/PARTITION
ALTER TABLE <DATABASE_NAME>.<TABLE_NAME> ATTACH PARTITION|PART <PARTITION_EXPRESSION>;
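Detaching partitions one by one gets tedious across many tables, so the statements can be generated. The sketch below is an illustration, not the script used in this migration: `gen_partition_ddl` is a hypothetical helper that reads partition IDs from standard input and only prints the statements, addressing each partition with the `PARTITION ID '<id>'` form of the partition expression. It assumes the IDs were listed beforehand, for example from the `partition_id` column of `system.parts`.

```shell
# Hypothetical helper (not from the article): reads partition IDs from stdin,
# one per line, and prints one ALTER TABLE statement per partition.
# Usage: gen_partition_ddl DETACH|ATTACH DB TABLE
# Nothing is executed here; the output is plain SQL text.
gen_partition_ddl() {
    action=$1 db=$2 table=$3
    case "$action" in
        DETACH|ATTACH) ;;
        *) echo "usage: gen_partition_ddl DETACH|ATTACH DB TABLE" >&2; return 1 ;;
    esac
    while IFS= read -r partition_id; do
        # Skip blank lines in the input.
        [ -n "$partition_id" ] || continue
        printf "ALTER TABLE %s.%s %s PARTITION ID '%s';\n" \
            "$db" "$table" "$action" "$partition_id"
    done
}
```

For example, `printf '20240101\n' | gen_partition_ddl DETACH db hits` prints `ALTER TABLE db.hits DETACH PARTITION ID '20240101';`. The output can be reviewed before it is passed to `clickhouse-client`, and the same ID list with `ATTACH` produces the reverse statements for the destination server.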
We tested this option, but in one experiment the number of columns in the tables on the source and destination servers did not match. Perhaps something had been merged incorrectly. We did not investigate further, let alone risk it in production, and chose a safer method.
Copying with clickhouse-copier
The second option is higher-level copying using a utility from ClickHouse. It runs a SELECT against the source database, followed by an INSERT on the destination side. The utility tracks all its tasks and their progress in ZooKeeper, so the work can be coordinated between workers and an interrupted copy does not have to start from scratch.
There was very little information available in open sources about this method. We had to figure it out from scratch.
This method seemed more reliable, although it has its quirks. For example, when the utility ran on the source side, the data did not match after copying. Network issues were possibly to blame: it seemed that some data fragments simply did not reach the new data center, and the utility did not register this. When it ran on the destination side, however, the data matched 100%.
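One way to check that a copy matches is to compare a per-partition summary taken on both sides. The sketch below assumes that each side has been exported to a tab-separated file with one `key<TAB>value` line per partition (for example, the partition and the sum of one metric column). `compare_tsv` and the file layout are hypothetical and not part of clickhouse-copier. For a Summing engine, the sum of a metric is a safer value to compare than a row count, because the number of rows depends on which parts have already been merged.

```shell
# Hypothetical helper: compare two TSV files of "key<TAB>value" lines.
# Usage: compare_tsv SOURCE_FILE DESTINATION_FILE
# Prints every key whose value differs or is missing on one side and
# returns 0 only when both files agree completely.
compare_tsv() {
    awk -F '\t' '
        FILENAME == ARGV[1] { src[$1] = $2; next }   # first file: source side
        {
            seen[$1] = 1
            if (!($1 in src)) {
                printf "%s\tmissing in source\n", $1; bad = 1
            } else if (src[$1] != $2) {
                printf "%s\tsource=%s\tdestination=%s\n", $1, src[$1], $2; bad = 1
            }
        }
        END {
            # Keys present in the source but never seen in the destination.
            for (k in src) if (!(k in seen)) {
                printf "%s\tmissing in destination\n", k; bad = 1
            }
            exit bad + 0
        }
    ' "$1" "$2"
}
```

A non-zero exit status means at least one partition differs, which makes the function easy to use as a gate before the temporary data is merged into production.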
Transferring the data for one shard took 3–4 hours, plus another half hour for related manipulations, mainly copying within the server, since we first transferred the data to a “temporary” table. We could not copy directly into the production table: during the copy, the cluster consisted of machines in both data centers, so we would have ended up with duplicate statistics. So we copied the data from Miami to a temporary table in Detroit (covering the geographical distance), and then, within the Detroit data center, merged it into production with inserts of 500–600 million rows.
Despite the precautions, we still encountered a couple of incidents where clients exceeded the limits set in the admin panel, as they were seeing incomplete statistics during the copying process. However, the total losses were around $20.
Copying in Practice
The copying process looks as follows.
To get started, we create a node in ZooKeeper under which the utility's task will live:
zkCli.sh -server localhost:2181 create /clickhouse/copytasks ""
Next, we create the copy task description, a file named `schema.xml`:
<clickhouse>
    <!-- Configuration of clusters as in an ordinary server config -->
    <remote_servers>
        <source_cluster>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>IP</host>
                    <port>9000</port>
                    <user>default</user>
                    <password></password>
                </replica>
            </shard>
        </source_cluster>
        <destination_cluster>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>IP</host>
                    <port>9000</port>
                    <user>default</user>
                    <password></password>
                </replica>
            </shard>
        </destination_cluster>
    </remote_servers>
    <!-- How many simultaneously active workers are possible. If you run more workers, superfluous workers will sleep. -->
    <max_workers>1</max_workers>
    <!-- Setting used to fetch (pull) data from source cluster tables -->
    <settings_pull>
        <readonly>1</readonly>
    </settings_pull>
    <!-- Setting used to insert (push) data to destination cluster tables -->
    <settings_push>
        <readonly>0</readonly>
    </settings_push>
    <!-- Common setting for fetch (pull) and insert (push) operations. Also, copier process context uses it.
         They are overlaid by <settings_pull/> and <settings_push/> respectively. -->
    <settings>
        <connect_timeout>3</connect_timeout>
        <!-- Sync insert is set forcibly, leave it here just in case. -->
        <insert_distributed_sync>1</insert_distributed_sync>
    </settings>
    <!-- Copying tasks description.
         You could specify several table tasks in the same task description (in the same ZooKeeper node), they will be performed
         sequentially.
    -->
    <tables>
        <!-- A table task, copies one table. -->
        <table_hits>
            <cluster_pull>source_cluster</cluster_pull>
            <database_pull>database</database_pull>
            <table_pull>table_local</table_pull>
            <cluster_push>destination_cluster</cluster_push>
            <database_push>database</database_push>
            <table_push>table_local1</table_push>
            <engine>
            ENGINE=ReplicatedSummingMergeTree('/clickhouse/tables/{shard}/ssp_report_common1', '{replica}')
            partition by toYYYYMMDD(sspRequestDate)
            order by (sspRequestDate, dspId, sspRequestCountry, endpointId)
            </engine>
            <sharding_key>rand()</sharding_key>
        </table_hits>
    </tables>
</clickhouse>
Here:
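- The `remote_servers` section describes the clusters: where we copy from (`source_cluster`) and where we copy to (`destination_cluster`);
- The `tables` section lists the tables to copy;
- `table_hits` describes a single table task: the source and destination tables, the engine of the destination table, and the sharding key.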
After that, we send this file to ZooKeeper:
zkCli.sh -server localhost:2181 create /clickhouse/copytasks/description "`cat schema.xml`"
Next, on the ClickHouse server, we create a `zookeeper.xml` file, which the utility needs in order to run:
<clickhouse>
    <logger>
        <level>trace</level>
        <size>100M</size>
        <count>3</count>
    </logger>
    <zookeeper>
        <node index="1">
            <host>127.0.0.1</host>
            <port>2181</port>
        </node>
    </zookeeper>
</clickhouse>
Execute:
clickhouse-copier --config-file=zookeeper.xml --task-path=/clickhouse/copytasks
Once the copy has finished, clean up ZooKeeper:
zkCli.sh -server localhost:2181 deleteall /clickhouse/copytasks
After verifying that all the data matches, the final step is to merge the temporary table into the production table:
INSERT INTO table_local SELECT * FROM table_local1;
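With hundreds of millions of rows, the merge can also be issued one partition at a time. This is an optional variation on the single statement shown above, not the command from this migration: `gen_merge_sql` is a hypothetical helper that reads partition values from standard input and prints one `INSERT ... SELECT` per value, and it assumes the partition expression from the `engine` section of `schema.xml`.

```shell
# Hypothetical helper: print one INSERT ... SELECT per partition value.
# Usage: gen_merge_sql SRC_TABLE DST_TABLE PARTITION_EXPR
# Partition values are read from stdin, one per line; nothing is executed.
gen_merge_sql() {
    src=$1 dst=$2 expr=$3
    while IFS= read -r value; do
        # Skip blank lines in the input.
        [ -n "$value" ] || continue
        printf 'INSERT INTO %s SELECT * FROM %s WHERE %s = %s;\n' \
            "$dst" "$src" "$expr" "$value"
    done
}
```

For example, `printf '20240101\n' | gen_merge_sql table_local1 table_local 'toYYYYMMDD(sspRequestDate)'` prints `INSERT INTO table_local SELECT * FROM table_local1 WHERE toYYYYMMDD(sspRequestDate) = 20240101;`. Splitting the merge this way keeps each statement smaller and makes it easier to see which days have already been merged.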
Overall, the migration took 10 days (considering that we were concurrently migrating other parts of the service). We hope this article will help you save time when approaching similar tasks.
Article authors: Igor Ivanov and Denis Palaguta, Maxilect.