1#include <Storages/MergeTree/ReplicatedMergeTreeTableMetadata.h>
2#include <Storages/MergeTree/MergeTreePartInfo.h>
3#include <Common/ZooKeeper/ZooKeeper.h>
4#include <boost/algorithm/string.hpp>
5#include <boost/program_options.hpp>
6#include <IO/ReadHelpers.h>
7
8#include <unordered_map>
9#include <cmath>
10
11
12std::vector<std::string> getAllShards(zkutil::ZooKeeper & zk, const std::string & root)
13{
14 return zk.getChildren(root);
15}
16
17
18std::vector<std::string> removeNotExistingShards(zkutil::ZooKeeper & zk, const std::string & root, const std::vector<std::string> & shards)
19{
20 auto existing_shards = getAllShards(zk, root);
21 std::vector<std::string> filtered_shards;
22 filtered_shards.reserve(shards.size());
23 for (const auto & shard : shards)
24 if (std::find(existing_shards.begin(), existing_shards.end(), shard) == existing_shards.end())
25 std::cerr << "Shard " << shard << " not found." << std::endl;
26 else
27 filtered_shards.emplace_back(shard);
28 return filtered_shards;
29}
30
31
32std::vector<std::string> getAllTables(zkutil::ZooKeeper & zk, const std::string & root, const std::string & shard)
33{
34 return zk.getChildren(root + "/" + shard);
35}
36
37
38std::vector<std::string> removeNotExistingTables(zkutil::ZooKeeper & zk, const std::string & root, const std::string & shard, const std::vector<std::string> & tables)
39{
40 auto existing_tables = getAllTables(zk, root, shard);
41 std::vector<std::string> filtered_tables;
42 filtered_tables.reserve(tables.size());
43 for (const auto & table : tables)
44 if (std::find(existing_tables.begin(), existing_tables.end(), table) == existing_tables.end())
45 std::cerr << "\tTable " << table << " not found on shard " << shard << "." << std::endl;
46 else
47 filtered_tables.emplace_back(table);
48 return filtered_tables;
49}
50
51
52Int64 getMaxBlockNumberForPartition(zkutil::ZooKeeper & zk,
53 const std::string & replica_path,
54 const std::string & partition_name,
55 const DB::MergeTreeDataFormatVersion & format_version)
56{
57 auto replicas_path = replica_path + "/replicas";
58 auto replica_hosts = zk.getChildren(replicas_path);
59 Int64 max_block_num = 0;
60 for (const auto & replica_host : replica_hosts)
61 {
62 auto parts = zk.getChildren(replicas_path + "/" + replica_host + "/parts");
63 for (const auto & part : parts)
64 {
65 try
66 {
67 auto info = DB::MergeTreePartInfo::fromPartName(part, format_version);
68 if (info.partition_id == partition_name)
69 max_block_num = std::max<Int64>(info.max_block, max_block_num);
70 }
71 catch (const DB::Exception & ex)
72 {
73 std::cerr << ex.displayText() << ", Part " << part << "skipped." << std::endl;
74 }
75 }
76 }
77 return max_block_num;
78}
79
80
81Int64 getCurrentBlockNumberForPartition(zkutil::ZooKeeper & zk, const std::string & part_path)
82{
83 Coordination::Stat stat;
84 zk.get(part_path, &stat);
85
86 /// References:
87 /// https://stackoverflow.com/a/10347910
88 /// https://bowenli86.github.io/2016/07/07/distributed%20system/zookeeper/How-does-ZooKeeper-s-persistent-sequential-id-work/
89 return (stat.cversion + stat.numChildren) / 2;
90}
91
92
93std::unordered_map<std::string, Int64> getPartitionsNeedAdjustingBlockNumbers(
94 zkutil::ZooKeeper & zk, const std::string & root, const std::vector<std::string> & shards, const std::vector<std::string> & tables)
95{
96 std::unordered_map<std::string, Int64> result;
97
98 std::vector<std::string> use_shards = shards.empty() ? getAllShards(zk, root) : removeNotExistingShards(zk, root, shards);
99
100 for (const auto & shard : use_shards)
101 {
102 std::cout << "Shard: " << shard << std::endl;
103 std::vector<std::string> use_tables = tables.empty() ? getAllTables(zk, root, shard) : removeNotExistingTables(zk, root, shard, tables);
104
105 for (auto table : use_tables)
106 {
107 std::cout << "\tTable: " << table << std::endl;
108 std::string table_path = root + "/" + shard + "/" + table;
109 std::string blocks_path = table_path + "/block_numbers";
110
111 std::vector<std::string> partitions;
112 DB::MergeTreeDataFormatVersion format_version;
113 try
114 {
115 format_version = DB::ReplicatedMergeTreeTableMetadata::parse(zk.get(table_path + "/metadata")).data_format_version;
116 partitions = zk.getChildren(blocks_path);
117 }
118 catch (const DB::Exception & ex)
119 {
120 std::cerr << ex.displayText() << ", table " << table << " skipped." << std::endl;
121 continue;
122 }
123
124 for (auto partition : partitions)
125 {
126 try
127 {
128 std::string part_path = blocks_path + "/" + partition;
129 Int64 partition_max_block = getMaxBlockNumberForPartition(zk, table_path, partition, format_version);
130 Int64 current_block_number = getCurrentBlockNumberForPartition(zk, part_path);
131 if (current_block_number < partition_max_block + 1)
132 {
133 std::cout << "\t\tPartition: " << partition << ": current block_number: " << current_block_number
134 << ", max block number: " << partition_max_block << ". Adjusting is required." << std::endl;
135 result.emplace(part_path, partition_max_block);
136 }
137 }
138 catch (const DB::Exception & ex)
139 {
140 std::cerr << ex.displayText() << ", partition " << partition << " skipped." << std::endl;
141 }
142 }
143 }
144 }
145 return result;
146}
147
148
149void setCurrentBlockNumber(zkutil::ZooKeeper & zk, const std::string & path, Int64 new_current_block_number)
150{
151 Int64 current_block_number = getCurrentBlockNumberForPartition(zk, path);
152
153 auto create_ephemeral_nodes = [&](size_t count)
154 {
155 std::string block_prefix = path + "/block-";
156 Coordination::Requests requests;
157 requests.reserve(count);
158 for (size_t i = 0; i != count; ++i)
159 requests.emplace_back(zkutil::makeCreateRequest(block_prefix, "", zkutil::CreateMode::EphemeralSequential));
160 auto responses = zk.multi(requests);
161
162 std::vector<std::string> paths_created;
163 paths_created.reserve(responses.size());
164 for (const auto & response : responses)
165 {
166 const auto * create_response = dynamic_cast<Coordination::CreateResponse*>(response.get());
167 if (!create_response)
168 {
169 std::cerr << "\tCould not create ephemeral node " << block_prefix << std::endl;
170 return false;
171 }
172 paths_created.emplace_back(create_response->path_created);
173 }
174
175 std::sort(paths_created.begin(), paths_created.end());
176 for (const auto & path_created : paths_created)
177 {
178 Int64 number = DB::parse<Int64>(path_created.c_str() + block_prefix.size(), path_created.size() - block_prefix.size());
179 if (number != current_block_number)
180 {
181 char suffix[11] = "";
182 sprintf(suffix, "%010lld", current_block_number);
183 std::string expected_path = block_prefix + suffix;
184 std::cerr << "\t" << path_created << ": Ephemeral node has been created with an unexpected path (expected something like "
185 << expected_path << ")." << std::endl;
186 return false;
187 }
188 std::cout << "\t" << path_created << std::endl;
189 ++current_block_number;
190 }
191
192 return true;
193 };
194
195 if (current_block_number >= new_current_block_number)
196 return;
197
198 std::cout << "Creating ephemeral sequential nodes:" << std::endl;
199 create_ephemeral_nodes(1); /// Firstly try to create just a single node.
200
201 /// Create other nodes in batches of 50 nodes.
202 while (current_block_number + 50 <= new_current_block_number)
203 create_ephemeral_nodes(50);
204
205 create_ephemeral_nodes(new_current_block_number - current_block_number);
206}
207
208
209int main(int argc, char ** argv)
210try
211{
212 /// Parse the command line.
213 namespace po = boost::program_options;
214 po::options_description desc("Allowed options");
215 desc.add_options()
216 ("help,h", "show help")
217 ("zookeeper,z", po::value<std::string>(), "Addresses of ZooKeeper instances, comma-separated. Example: example01e.yandex.ru:2181")
218 ("path,p", po::value<std::string>(), "[optional] Path of replica queue to insert node (without trailing slash). By default it's /clickhouse/tables")
219 ("shard,s", po::value<std::string>(), "[optional] Shards to process, comma-separated. If not specified then the utility will process all the shards.")
220 ("table,t", po::value<std::string>(), "[optional] Tables to process, comma-separated. If not specified then the utility will process all the tables.")
221 ("dry-run", "[optional] Specify if you want this utility just to analyze block numbers without any changes.");
222
223 po::variables_map options;
224 po::store(po::parse_command_line(argc, argv, desc), options);
225
226 auto show_usage = [&]
227 {
228 std::cout << "Usage: " << std::endl;
229 std::cout << " " << argv[0] << " [options]" << std::endl;
230 std::cout << desc << std::endl;
231 };
232
233 if (options.count("help") || (argc == 1))
234 {
235 std::cout << "This utility adjusts the /block_numbers zookeeper nodes to the correct block number in partition." << std::endl;
236 std::cout << "It might be useful when incorrect block numbers stored in zookeeper don't allow you to insert data into a table or drop/detach a partition." << std::endl;
237 show_usage();
238 return 0;
239 }
240
241 if (!options.count("zookeeper"))
242 {
243 std::cerr << "Option --zookeeper should be set." << std::endl;
244 show_usage();
245 return 1;
246 }
247
248 std::string root = options.count("path") ? options.at("path").as<std::string>() : "/clickhouse/tables";
249
250 std::vector<std::string> shards, tables;
251 if (options.count("shard"))
252 boost::split(shards, options.at("shard").as<std::string>(), boost::algorithm::is_any_of(","));
253 if (options.count("table"))
254 boost::split(tables, options.at("table").as<std::string>(), boost::algorithm::is_any_of(","));
255
256 /// Check if the adjusting of the block numbers is required.
257 std::cout << "Checking if adjusting of the block numbers is required:" << std::endl;
258 zkutil::ZooKeeper zookeeper(options.at("zookeeper").as<std::string>());
259 auto part_paths_with_max_block_numbers = getPartitionsNeedAdjustingBlockNumbers(zookeeper, root, shards, tables);
260
261 if (part_paths_with_max_block_numbers.empty())
262 {
263 std::cout << "No adjusting required." << std::endl;
264 return 0;
265 }
266
267 std::cout << "Required adjusting of " << part_paths_with_max_block_numbers.size() << " block numbers." << std::endl;
268
269 /// Adjust the block numbers.
270 if (options.count("dry-run"))
271 {
272 std::cout << "This is a dry-run, exiting." << std::endl;
273 return 0;
274 }
275
276 std::cout << std::endl << "Adjusting the block numbers:" << std::endl;
277 for (const auto & [part_path, max_block_number] : part_paths_with_max_block_numbers)
278 setCurrentBlockNumber(zookeeper, part_path, max_block_number + 1);
279
280 return 0;
281}
282catch (const Poco::Exception & e)
283{
284 std::cerr << DB::getCurrentExceptionMessage(true) << '\n';
285 throw;
286}
287