1 | #include <Storages/MergeTree/ReplicatedMergeTreeTableMetadata.h> |
2 | #include <Storages/MergeTree/MergeTreePartInfo.h> |
3 | #include <Common/ZooKeeper/ZooKeeper.h> |
4 | #include <boost/algorithm/string.hpp> |
5 | #include <boost/program_options.hpp> |
6 | #include <IO/ReadHelpers.h> |
7 | |
8 | #include <unordered_map> |
9 | #include <cmath> |
10 | |
11 | |
12 | std::vector<std::string> getAllShards(zkutil::ZooKeeper & zk, const std::string & root) |
13 | { |
14 | return zk.getChildren(root); |
15 | } |
16 | |
17 | |
18 | std::vector<std::string> removeNotExistingShards(zkutil::ZooKeeper & zk, const std::string & root, const std::vector<std::string> & shards) |
19 | { |
20 | auto existing_shards = getAllShards(zk, root); |
21 | std::vector<std::string> filtered_shards; |
22 | filtered_shards.reserve(shards.size()); |
23 | for (const auto & shard : shards) |
24 | if (std::find(existing_shards.begin(), existing_shards.end(), shard) == existing_shards.end()) |
25 | std::cerr << "Shard " << shard << " not found." << std::endl; |
26 | else |
27 | filtered_shards.emplace_back(shard); |
28 | return filtered_shards; |
29 | } |
30 | |
31 | |
32 | std::vector<std::string> getAllTables(zkutil::ZooKeeper & zk, const std::string & root, const std::string & shard) |
33 | { |
34 | return zk.getChildren(root + "/" + shard); |
35 | } |
36 | |
37 | |
38 | std::vector<std::string> removeNotExistingTables(zkutil::ZooKeeper & zk, const std::string & root, const std::string & shard, const std::vector<std::string> & tables) |
39 | { |
40 | auto existing_tables = getAllTables(zk, root, shard); |
41 | std::vector<std::string> filtered_tables; |
42 | filtered_tables.reserve(tables.size()); |
43 | for (const auto & table : tables) |
44 | if (std::find(existing_tables.begin(), existing_tables.end(), table) == existing_tables.end()) |
45 | std::cerr << "\tTable " << table << " not found on shard " << shard << "." << std::endl; |
46 | else |
47 | filtered_tables.emplace_back(table); |
48 | return filtered_tables; |
49 | } |
50 | |
51 | |
52 | Int64 getMaxBlockNumberForPartition(zkutil::ZooKeeper & zk, |
53 | const std::string & replica_path, |
54 | const std::string & partition_name, |
55 | const DB::MergeTreeDataFormatVersion & format_version) |
56 | { |
57 | auto replicas_path = replica_path + "/replicas" ; |
58 | auto replica_hosts = zk.getChildren(replicas_path); |
59 | Int64 max_block_num = 0; |
60 | for (const auto & replica_host : replica_hosts) |
61 | { |
62 | auto parts = zk.getChildren(replicas_path + "/" + replica_host + "/parts" ); |
63 | for (const auto & part : parts) |
64 | { |
65 | try |
66 | { |
67 | auto info = DB::MergeTreePartInfo::fromPartName(part, format_version); |
68 | if (info.partition_id == partition_name) |
69 | max_block_num = std::max<Int64>(info.max_block, max_block_num); |
70 | } |
71 | catch (const DB::Exception & ex) |
72 | { |
73 | std::cerr << ex.displayText() << ", Part " << part << "skipped." << std::endl; |
74 | } |
75 | } |
76 | } |
77 | return max_block_num; |
78 | } |
79 | |
80 | |
81 | Int64 getCurrentBlockNumberForPartition(zkutil::ZooKeeper & zk, const std::string & part_path) |
82 | { |
83 | Coordination::Stat stat; |
84 | zk.get(part_path, &stat); |
85 | |
86 | /// References: |
87 | /// https://stackoverflow.com/a/10347910 |
88 | /// https://bowenli86.github.io/2016/07/07/distributed%20system/zookeeper/How-does-ZooKeeper-s-persistent-sequential-id-work/ |
89 | return (stat.cversion + stat.numChildren) / 2; |
90 | } |
91 | |
92 | |
93 | std::unordered_map<std::string, Int64> getPartitionsNeedAdjustingBlockNumbers( |
94 | zkutil::ZooKeeper & zk, const std::string & root, const std::vector<std::string> & shards, const std::vector<std::string> & tables) |
95 | { |
96 | std::unordered_map<std::string, Int64> result; |
97 | |
98 | std::vector<std::string> use_shards = shards.empty() ? getAllShards(zk, root) : removeNotExistingShards(zk, root, shards); |
99 | |
100 | for (const auto & shard : use_shards) |
101 | { |
102 | std::cout << "Shard: " << shard << std::endl; |
103 | std::vector<std::string> use_tables = tables.empty() ? getAllTables(zk, root, shard) : removeNotExistingTables(zk, root, shard, tables); |
104 | |
105 | for (auto table : use_tables) |
106 | { |
107 | std::cout << "\tTable: " << table << std::endl; |
108 | std::string table_path = root + "/" + shard + "/" + table; |
109 | std::string blocks_path = table_path + "/block_numbers" ; |
110 | |
111 | std::vector<std::string> partitions; |
112 | DB::MergeTreeDataFormatVersion format_version; |
113 | try |
114 | { |
115 | format_version = DB::ReplicatedMergeTreeTableMetadata::parse(zk.get(table_path + "/metadata" )).data_format_version; |
116 | partitions = zk.getChildren(blocks_path); |
117 | } |
118 | catch (const DB::Exception & ex) |
119 | { |
120 | std::cerr << ex.displayText() << ", table " << table << " skipped." << std::endl; |
121 | continue; |
122 | } |
123 | |
124 | for (auto partition : partitions) |
125 | { |
126 | try |
127 | { |
128 | std::string part_path = blocks_path + "/" + partition; |
129 | Int64 partition_max_block = getMaxBlockNumberForPartition(zk, table_path, partition, format_version); |
130 | Int64 current_block_number = getCurrentBlockNumberForPartition(zk, part_path); |
131 | if (current_block_number < partition_max_block + 1) |
132 | { |
133 | std::cout << "\t\tPartition: " << partition << ": current block_number: " << current_block_number |
134 | << ", max block number: " << partition_max_block << ". Adjusting is required." << std::endl; |
135 | result.emplace(part_path, partition_max_block); |
136 | } |
137 | } |
138 | catch (const DB::Exception & ex) |
139 | { |
140 | std::cerr << ex.displayText() << ", partition " << partition << " skipped." << std::endl; |
141 | } |
142 | } |
143 | } |
144 | } |
145 | return result; |
146 | } |
147 | |
148 | |
149 | void setCurrentBlockNumber(zkutil::ZooKeeper & zk, const std::string & path, Int64 new_current_block_number) |
150 | { |
151 | Int64 current_block_number = getCurrentBlockNumberForPartition(zk, path); |
152 | |
153 | auto create_ephemeral_nodes = [&](size_t count) |
154 | { |
155 | std::string block_prefix = path + "/block-" ; |
156 | Coordination::Requests requests; |
157 | requests.reserve(count); |
158 | for (size_t i = 0; i != count; ++i) |
159 | requests.emplace_back(zkutil::makeCreateRequest(block_prefix, "" , zkutil::CreateMode::EphemeralSequential)); |
160 | auto responses = zk.multi(requests); |
161 | |
162 | std::vector<std::string> paths_created; |
163 | paths_created.reserve(responses.size()); |
164 | for (const auto & response : responses) |
165 | { |
166 | const auto * create_response = dynamic_cast<Coordination::CreateResponse*>(response.get()); |
167 | if (!create_response) |
168 | { |
169 | std::cerr << "\tCould not create ephemeral node " << block_prefix << std::endl; |
170 | return false; |
171 | } |
172 | paths_created.emplace_back(create_response->path_created); |
173 | } |
174 | |
175 | std::sort(paths_created.begin(), paths_created.end()); |
176 | for (const auto & path_created : paths_created) |
177 | { |
178 | Int64 number = DB::parse<Int64>(path_created.c_str() + block_prefix.size(), path_created.size() - block_prefix.size()); |
179 | if (number != current_block_number) |
180 | { |
181 | char suffix[11] = "" ; |
182 | sprintf(suffix, "%010lld" , current_block_number); |
183 | std::string expected_path = block_prefix + suffix; |
184 | std::cerr << "\t" << path_created << ": Ephemeral node has been created with an unexpected path (expected something like " |
185 | << expected_path << ")." << std::endl; |
186 | return false; |
187 | } |
188 | std::cout << "\t" << path_created << std::endl; |
189 | ++current_block_number; |
190 | } |
191 | |
192 | return true; |
193 | }; |
194 | |
195 | if (current_block_number >= new_current_block_number) |
196 | return; |
197 | |
198 | std::cout << "Creating ephemeral sequential nodes:" << std::endl; |
199 | create_ephemeral_nodes(1); /// Firstly try to create just a single node. |
200 | |
201 | /// Create other nodes in batches of 50 nodes. |
202 | while (current_block_number + 50 <= new_current_block_number) |
203 | create_ephemeral_nodes(50); |
204 | |
205 | create_ephemeral_nodes(new_current_block_number - current_block_number); |
206 | } |
207 | |
208 | |
209 | int main(int argc, char ** argv) |
210 | try |
211 | { |
212 | /// Parse the command line. |
213 | namespace po = boost::program_options; |
214 | po::options_description desc("Allowed options" ); |
215 | desc.add_options() |
216 | ("help,h" , "show help" ) |
217 | ("zookeeper,z" , po::value<std::string>(), "Addresses of ZooKeeper instances, comma-separated. Example: example01e.yandex.ru:2181" ) |
218 | ("path,p" , po::value<std::string>(), "[optional] Path of replica queue to insert node (without trailing slash). By default it's /clickhouse/tables" ) |
219 | ("shard,s" , po::value<std::string>(), "[optional] Shards to process, comma-separated. If not specified then the utility will process all the shards." ) |
220 | ("table,t" , po::value<std::string>(), "[optional] Tables to process, comma-separated. If not specified then the utility will process all the tables." ) |
221 | ("dry-run" , "[optional] Specify if you want this utility just to analyze block numbers without any changes." ); |
222 | |
223 | po::variables_map options; |
224 | po::store(po::parse_command_line(argc, argv, desc), options); |
225 | |
226 | auto show_usage = [&] |
227 | { |
228 | std::cout << "Usage: " << std::endl; |
229 | std::cout << " " << argv[0] << " [options]" << std::endl; |
230 | std::cout << desc << std::endl; |
231 | }; |
232 | |
233 | if (options.count("help" ) || (argc == 1)) |
234 | { |
235 | std::cout << "This utility adjusts the /block_numbers zookeeper nodes to the correct block number in partition." << std::endl; |
236 | std::cout << "It might be useful when incorrect block numbers stored in zookeeper don't allow you to insert data into a table or drop/detach a partition." << std::endl; |
237 | show_usage(); |
238 | return 0; |
239 | } |
240 | |
241 | if (!options.count("zookeeper" )) |
242 | { |
243 | std::cerr << "Option --zookeeper should be set." << std::endl; |
244 | show_usage(); |
245 | return 1; |
246 | } |
247 | |
248 | std::string root = options.count("path" ) ? options.at("path" ).as<std::string>() : "/clickhouse/tables" ; |
249 | |
250 | std::vector<std::string> shards, tables; |
251 | if (options.count("shard" )) |
252 | boost::split(shards, options.at("shard" ).as<std::string>(), boost::algorithm::is_any_of("," )); |
253 | if (options.count("table" )) |
254 | boost::split(tables, options.at("table" ).as<std::string>(), boost::algorithm::is_any_of("," )); |
255 | |
256 | /// Check if the adjusting of the block numbers is required. |
257 | std::cout << "Checking if adjusting of the block numbers is required:" << std::endl; |
258 | zkutil::ZooKeeper zookeeper(options.at("zookeeper" ).as<std::string>()); |
259 | auto part_paths_with_max_block_numbers = getPartitionsNeedAdjustingBlockNumbers(zookeeper, root, shards, tables); |
260 | |
261 | if (part_paths_with_max_block_numbers.empty()) |
262 | { |
263 | std::cout << "No adjusting required." << std::endl; |
264 | return 0; |
265 | } |
266 | |
267 | std::cout << "Required adjusting of " << part_paths_with_max_block_numbers.size() << " block numbers." << std::endl; |
268 | |
269 | /// Adjust the block numbers. |
270 | if (options.count("dry-run" )) |
271 | { |
272 | std::cout << "This is a dry-run, exiting." << std::endl; |
273 | return 0; |
274 | } |
275 | |
276 | std::cout << std::endl << "Adjusting the block numbers:" << std::endl; |
277 | for (const auto & [part_path, max_block_number] : part_paths_with_max_block_numbers) |
278 | setCurrentBlockNumber(zookeeper, part_path, max_block_number + 1); |
279 | |
280 | return 0; |
281 | } |
282 | catch (const Poco::Exception & e) |
283 | { |
284 | std::cerr << DB::getCurrentExceptionMessage(true) << '\n'; |
285 | throw; |
286 | } |
287 | |