1 | #include <IO/ReadHelpers.h> |
2 | #include <IO/WriteHelpers.h> |
3 | #include <Common/Config/ConfigProcessor.h> |
4 | #include <Common/ZooKeeper/ZooKeeper.h> |
5 | #include <Common/Exception.h> |
6 | #include <Common/Stopwatch.h> |
7 | #include <Storages/MergeTree/EphemeralLockInZooKeeper.h> |
8 | |
9 | #include <ext/scope_guard.h> |
10 | #include <pcg_random.hpp> |
11 | |
12 | #include <iostream> |
13 | |
14 | |
15 | using namespace DB; |
16 | |
17 | /// This test is useful for assessing the performance of getting the numbers of all currently committing |
18 | /// blocks from ZooKeeper. This is needed to select merges without checking that all block numbers between |
19 | /// parts have been abandoned (see DB::ReplicatedMergeTreeMergePredicate for details). |
20 | int main(int argc, char ** argv) |
21 | try |
22 | { |
23 | if (argc != 3) |
24 | { |
25 | std::cerr << "usage: " << argv[0] << " <zookeeper_config> <path_to_table>" << std::endl; |
26 | return 3; |
27 | } |
28 | |
29 | ConfigProcessor processor(argv[1], false, true); |
30 | auto config = processor.loadConfig().configuration; |
31 | String zookeeper_path = argv[2]; |
32 | |
33 | auto zookeeper = std::make_shared<zkutil::ZooKeeper>(*config, "zookeeper" ); |
34 | |
35 | std::unordered_map<String, std::set<Int64>> current_inserts; |
36 | |
37 | Stopwatch total; |
38 | Stopwatch stage; |
39 | /// Load current inserts |
40 | std::unordered_set<String> lock_holder_paths; |
41 | for (const String & entry : zookeeper->getChildren(zookeeper_path + "/temp" )) |
42 | { |
43 | if (startsWith(entry, "abandonable_lock-" )) |
44 | lock_holder_paths.insert(zookeeper_path + "/temp/" + entry); |
45 | } |
46 | std::cerr << "Stage 1 (get lock holders): " << lock_holder_paths.size() |
47 | << " lock holders, elapsed: " << stage.elapsedSeconds() << "s." << std::endl; |
48 | stage.restart(); |
49 | |
50 | if (!lock_holder_paths.empty()) |
51 | { |
52 | Strings partitions = zookeeper->getChildren(zookeeper_path + "/block_numbers" ); |
53 | std::cerr << "Stage 2 (get partitions): " << partitions.size() |
54 | << " partitions, elapsed: " << stage.elapsedSeconds() << "s." << std::endl; |
55 | stage.restart(); |
56 | |
57 | std::vector<std::future<Coordination::ListResponse>> lock_futures; |
58 | for (const String & partition : partitions) |
59 | lock_futures.push_back(zookeeper->asyncGetChildren(zookeeper_path + "/block_numbers/" + partition)); |
60 | |
61 | struct BlockInfo |
62 | { |
63 | String partition; |
64 | Int64 number; |
65 | String zk_path; |
66 | std::future<Coordination::GetResponse> contents_future; |
67 | }; |
68 | |
69 | std::vector<BlockInfo> block_infos; |
70 | for (size_t i = 0; i < partitions.size(); ++i) |
71 | { |
72 | Strings partition_block_numbers = lock_futures[i].get().names; |
73 | for (const String & entry : partition_block_numbers) |
74 | { |
75 | Int64 block_number = parse<Int64>(entry.substr(strlen("block-" ))); |
76 | String zk_path = zookeeper_path + "/block_numbers/" + partitions[i] + "/" + entry; |
77 | block_infos.push_back( |
78 | BlockInfo{partitions[i], block_number, zk_path, zookeeper->asyncTryGet(zk_path)}); |
79 | } |
80 | } |
81 | std::cerr << "Stage 3 (get block numbers): " << block_infos.size() |
82 | << " block numbers, elapsed: " << stage.elapsedSeconds() << "s." << std::endl; |
83 | stage.restart(); |
84 | |
85 | size_t total_count = 0; |
86 | for (BlockInfo & block : block_infos) |
87 | { |
88 | Coordination::GetResponse resp = block.contents_future.get(); |
89 | if (!resp.error && lock_holder_paths.count(resp.data)) |
90 | { |
91 | ++total_count; |
92 | current_inserts[block.partition].insert(block.number); |
93 | } |
94 | } |
95 | std::cerr << "Stage 4 (get block number contents): " << total_count |
96 | << " current_inserts, elapsed: " << stage.elapsedSeconds() << "s." << std::endl; |
97 | stage.restart(); |
98 | } |
99 | |
100 | std::cerr << "Total elapsed: " << total.elapsedSeconds() << "s." << std::endl; |
101 | |
102 | for (const auto & kv : current_inserts) |
103 | { |
104 | std::cout << kv.first << ": " ; |
105 | for (Int64 num : kv.second) |
106 | std::cout << num << ", " ; |
107 | std::cout << std::endl; |
108 | } |
109 | |
110 | return 0; |
111 | } |
112 | catch (const Exception & e) |
113 | { |
114 | std::cerr << e.what() << ", " << e.displayText() << ": " << std::endl |
115 | << e.getStackTrace().toString() << std::endl; |
116 | throw; |
117 | } |
118 | catch (Poco::Exception & e) |
119 | { |
120 | std::cerr << "Exception: " << e.displayText() << std::endl; |
121 | throw; |
122 | } |
123 | catch (std::exception & e) |
124 | { |
125 | std::cerr << "std::exception: " << e.what() << std::endl; |
126 | throw; |
127 | } |
128 | catch (...) |
129 | { |
130 | std::cerr << "Some exception" << std::endl; |
131 | throw; |
132 | } |
133 | |