#pragma once
#include <Poco/Util/ServerApplication.h>
#include <daemon/BaseDaemon.h>

/* clickhouse cluster copier utility
 * Copies table data from one cluster to new tables of another (possibly the same) cluster in a distributed, fault-tolerant manner.
 *
 * See the overview in the docs: docs/en/utils/clickhouse-copier.md
 *
 * Implementation details:
 *
 * cluster-copier workers pull each partition of each shard of the source cluster and push it to the destination cluster through
 * a Distributed table (to perform data resharding). So, a worker's job is a partition of a source shard.
 * A job has three states: Active, Finished and Abandoned. Abandoned means that the worker died without finishing the job.
 *
 * If an error occurs during copying (a worker fails or does not finish its INSERT), the whole partition (on
 * all destination servers) must be dropped and refilled. So, the unit of copying is a partition across all destination shards.
 * When a failure is detected, a special /is_dirty node is created in ZooKeeper, signalling that other workers copying the same
 * partition should stop; after that, a cleaning and refilling procedure starts (see the sketches after this comment).
 *
 * A ZooKeeper task node has the following structure:
 *  /task/path_root                    - path passed in the --task-path parameter
 *      /description                   - contains the user-defined XML config of the task
 *      /task_active_workers           - contains ephemeral nodes of all currently active workers; used to implement the max_workers limit
 *          /server_fqdn#PID_timestamp - cluster-copier worker ID
 *          ...
 *      /tables                 - directory with table tasks
 *          /cluster.db.table1  - directory of the task for table cluster.db.table1
 *              /partition1     - directory for partition1
 *                  /shards     - directory for source cluster shards
 *                      /1      - worker job for the first shard of partition1 of table cluster.db.table1.
 *                                Contains info about the current status (Active or Finished) and the worker ID.
 *                      /2
 *                      ...
 *                  /partition_active_workers
 *                      /1      - for each job in /shards a corresponding ephemeral node is created in /partition_active_workers.
 *                                It is used to detect Abandoned jobs (an Active node in /shards with no matching node in
 *                                /partition_active_workers).
 *                                It is also used to track active workers in the partition (when the partition has to be refilled,
 *                                we do not DROP PARTITION while there are still active workers).
 *                      /2
 *                      ...
 *                  /is_dirty   - this node is created if some worker detects that an error occurred (an INSERT failed or an
 *                                Abandoned node was detected). Once the node appears, workers in this partition should stop and
 *                                start the cleaning and refilling procedure.
 *                                During this procedure a single 'cleaner' worker is selected. The cleaner waits until all partition
 *                                workers have stopped, removes the /shards node, executes DROP PARTITION on each destination node
 *                                and removes the /is_dirty node.
 *                      /cleaner - an ephemeral node used to select the 'cleaner' worker. Contains the ID of the worker.
 *          /cluster.db.table2
 *          ...
 */
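
/* Illustration only: a minimal sketch of the Abandoned-job check described above.
 * ZooKeeperClient is a hypothetical stand-in for whatever ZooKeeper client the copier
 * actually uses; only the getChildren()/get()/exists()/tryCreateEphemeral() semantics
 * assumed below are relevant.
 */
#include <string>
#include <vector>

struct ZooKeeperClient
{
    virtual std::vector<std::string> getChildren(const std::string & path) = 0;
    virtual std::string get(const std::string & path) = 0;     /// Node payload
    virtual bool exists(const std::string & path) = 0;
    /// Creates an ephemeral node; returns false if the node already exists.
    virtual bool tryCreateEphemeral(const std::string & path, const std::string & data) = 0;
    virtual ~ZooKeeperClient() = default;
};

/// A job in /shards is Abandoned if it is marked Active but the corresponding ephemeral
/// node in /partition_active_workers has disappeared (i.e. its worker died).
inline bool hasAbandonedJobs(ZooKeeperClient & zk, const std::string & partition_path)
{
    for (const auto & shard : zk.getChildren(partition_path + "/shards"))
    {
        /// The exact payload format is not specified in this header; assume the
        /// status string comes first.
        bool is_active = zk.get(partition_path + "/shards/" + shard).rfind("Active", 0) == 0;
        if (is_active && !zk.exists(partition_path + "/partition_active_workers/" + shard))
            return true;
    }
    return false;
}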
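
/* Illustration only: selecting the single 'cleaner' worker. Following the description
 * above, every worker that noticed /is_dirty races to create the ephemeral /cleaner
 * node under it; exactly one creation succeeds, and that worker performs the cleanup.
 */
inline bool tryBecomeCleaner(ZooKeeperClient & zk, const std::string & partition_path,
                             const std::string & worker_id)
{
    /// The node is ephemeral: if the cleaner dies, the node disappears and another
    /// worker can take over. The payload is the ID of the winning worker.
    return zk.tryCreateEphemeral(partition_path + "/is_dirty/cleaner", worker_id);
}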

namespace DB
{

class ClusterCopierApp : public BaseDaemon
{
public:

    void initialize(Poco::Util::Application & self) override;

    void handleHelp(const std::string &, const std::string &);

    void defineOptions(Poco::Util::OptionSet & options) override;

    int main(const std::vector<std::string> &) override;

private:

    using Base = BaseDaemon;

    void mainImpl();

    void setupLogging();

    std::string config_xml_path;
    std::string task_path;              /// ZooKeeper path of the task node (passed via --task-path)
    std::string log_level = "debug";
    bool is_safe_mode = false;
    double copy_fault_probability = 0;
    bool is_help = false;

    std::string base_dir;
    std::string process_path;
    std::string process_id;
    std::string host_id;                /// Worker ID of the form server_fqdn#PID_timestamp (see the layout above)
};

}
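
/* Illustration only: a typical Poco::Util::ServerApplication-style entry point for a
 * BaseDaemon subclass like this one (a sketch; the copier's actual main file may differ):
 *
 *     int main(int argc, char ** argv)
 *     {
 *         DB::ClusterCopierApp app;
 *         return app.run(argc, argv);
 *     }
 */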