1#include <IO/ReadHelpers.h>
2#include <IO/WriteHelpers.h>
3#include <Common/Config/ConfigProcessor.h>
4#include <Common/ZooKeeper/ZooKeeper.h>
5#include <Common/Exception.h>
6#include <Common/Stopwatch.h>
7#include <Storages/MergeTree/EphemeralLockInZooKeeper.h>
8
9#include <ext/scope_guard.h>
10#include <pcg_random.hpp>
11
12#include <iostream>
13
14
15using namespace DB;
16
17/// This test is useful for assessing the performance of getting the numbers of all currently committing
18/// blocks from ZooKeeper. This is needed to select merges without checking that all block numbers between
19/// parts have been abandoned (see DB::ReplicatedMergeTreeMergePredicate for details).
20int main(int argc, char ** argv)
21try
22{
23 if (argc != 3)
24 {
25 std::cerr << "usage: " << argv[0] << " <zookeeper_config> <path_to_table>" << std::endl;
26 return 3;
27 }
28
29 ConfigProcessor processor(argv[1], false, true);
30 auto config = processor.loadConfig().configuration;
31 String zookeeper_path = argv[2];
32
33 auto zookeeper = std::make_shared<zkutil::ZooKeeper>(*config, "zookeeper");
34
35 std::unordered_map<String, std::set<Int64>> current_inserts;
36
37 Stopwatch total;
38 Stopwatch stage;
39 /// Load current inserts
40 std::unordered_set<String> lock_holder_paths;
41 for (const String & entry : zookeeper->getChildren(zookeeper_path + "/temp"))
42 {
43 if (startsWith(entry, "abandonable_lock-"))
44 lock_holder_paths.insert(zookeeper_path + "/temp/" + entry);
45 }
46 std::cerr << "Stage 1 (get lock holders): " << lock_holder_paths.size()
47 << " lock holders, elapsed: " << stage.elapsedSeconds() << "s." << std::endl;
48 stage.restart();
49
50 if (!lock_holder_paths.empty())
51 {
52 Strings partitions = zookeeper->getChildren(zookeeper_path + "/block_numbers");
53 std::cerr << "Stage 2 (get partitions): " << partitions.size()
54 << " partitions, elapsed: " << stage.elapsedSeconds() << "s." << std::endl;
55 stage.restart();
56
57 std::vector<std::future<Coordination::ListResponse>> lock_futures;
58 for (const String & partition : partitions)
59 lock_futures.push_back(zookeeper->asyncGetChildren(zookeeper_path + "/block_numbers/" + partition));
60
61 struct BlockInfo
62 {
63 String partition;
64 Int64 number;
65 String zk_path;
66 std::future<Coordination::GetResponse> contents_future;
67 };
68
69 std::vector<BlockInfo> block_infos;
70 for (size_t i = 0; i < partitions.size(); ++i)
71 {
72 Strings partition_block_numbers = lock_futures[i].get().names;
73 for (const String & entry : partition_block_numbers)
74 {
75 Int64 block_number = parse<Int64>(entry.substr(strlen("block-")));
76 String zk_path = zookeeper_path + "/block_numbers/" + partitions[i] + "/" + entry;
77 block_infos.push_back(
78 BlockInfo{partitions[i], block_number, zk_path, zookeeper->asyncTryGet(zk_path)});
79 }
80 }
81 std::cerr << "Stage 3 (get block numbers): " << block_infos.size()
82 << " block numbers, elapsed: " << stage.elapsedSeconds() << "s." << std::endl;
83 stage.restart();
84
85 size_t total_count = 0;
86 for (BlockInfo & block : block_infos)
87 {
88 Coordination::GetResponse resp = block.contents_future.get();
89 if (!resp.error && lock_holder_paths.count(resp.data))
90 {
91 ++total_count;
92 current_inserts[block.partition].insert(block.number);
93 }
94 }
95 std::cerr << "Stage 4 (get block number contents): " << total_count
96 << " current_inserts, elapsed: " << stage.elapsedSeconds() << "s." << std::endl;
97 stage.restart();
98 }
99
100 std::cerr << "Total elapsed: " << total.elapsedSeconds() << "s." << std::endl;
101
102 for (const auto & kv : current_inserts)
103 {
104 std::cout << kv.first << ": ";
105 for (Int64 num : kv.second)
106 std::cout << num << ", ";
107 std::cout << std::endl;
108 }
109
110 return 0;
111}
112catch (const Exception & e)
113{
114 std::cerr << e.what() << ", " << e.displayText() << ": " << std::endl
115 << e.getStackTrace().toString() << std::endl;
116 throw;
117}
118catch (Poco::Exception & e)
119{
120 std::cerr << "Exception: " << e.displayText() << std::endl;
121 throw;
122}
123catch (std::exception & e)
124{
125 std::cerr << "std::exception: " << e.what() << std::endl;
126 throw;
127}
128catch (...)
129{
130 std::cerr << "Some exception" << std::endl;
131 throw;
132}
133