| 1 | #include <IO/ReadHelpers.h> | 
|---|
| 2 | #include <IO/WriteHelpers.h> | 
|---|
| 3 | #include <Common/Config/ConfigProcessor.h> | 
|---|
| 4 | #include <Common/ZooKeeper/ZooKeeper.h> | 
|---|
| 5 | #include <Common/Exception.h> | 
|---|
| 6 | #include <Common/Stopwatch.h> | 
|---|
| 7 | #include <Storages/MergeTree/EphemeralLockInZooKeeper.h> | 
|---|
| 8 |  | 
|---|
| 9 | #include <ext/scope_guard.h> | 
|---|
| 10 | #include <pcg_random.hpp> | 
|---|
| 11 |  | 
|---|
| 12 | #include <iostream> | 
|---|
| 13 |  | 
|---|
| 14 |  | 
|---|
| 15 | using namespace DB; | 
|---|
| 16 |  | 
|---|
| 17 | /// This test is useful for assessing the performance of getting the numbers of all currently committing | 
|---|
| 18 | /// blocks from ZooKeeper. This is needed to select merges without checking that all block numbers between | 
|---|
| 19 | /// parts have been abandoned (see DB::ReplicatedMergeTreeMergePredicate for details). | 
|---|
| 20 | int main(int argc, char ** argv) | 
|---|
| 21 | try | 
|---|
| 22 | { | 
|---|
| 23 | if (argc != 3) | 
|---|
| 24 | { | 
|---|
| 25 | std::cerr << "usage: "<< argv[0] << " <zookeeper_config> <path_to_table>"<< std::endl; | 
|---|
| 26 | return 3; | 
|---|
| 27 | } | 
|---|
| 28 |  | 
|---|
| 29 | ConfigProcessor processor(argv[1], false, true); | 
|---|
| 30 | auto config = processor.loadConfig().configuration; | 
|---|
| 31 | String zookeeper_path = argv[2]; | 
|---|
| 32 |  | 
|---|
| 33 | auto zookeeper = std::make_shared<zkutil::ZooKeeper>(*config, "zookeeper"); | 
|---|
| 34 |  | 
|---|
| 35 | std::unordered_map<String, std::set<Int64>> current_inserts; | 
|---|
| 36 |  | 
|---|
| 37 | Stopwatch total; | 
|---|
| 38 | Stopwatch stage; | 
|---|
| 39 | /// Load current inserts | 
|---|
| 40 | std::unordered_set<String> lock_holder_paths; | 
|---|
| 41 | for (const String & entry : zookeeper->getChildren(zookeeper_path + "/temp")) | 
|---|
| 42 | { | 
|---|
| 43 | if (startsWith(entry, "abandonable_lock-")) | 
|---|
| 44 | lock_holder_paths.insert(zookeeper_path + "/temp/"+ entry); | 
|---|
| 45 | } | 
|---|
| 46 | std::cerr << "Stage 1 (get lock holders): "<< lock_holder_paths.size() | 
|---|
| 47 | << " lock holders, elapsed: "<< stage.elapsedSeconds() << "s."<< std::endl; | 
|---|
| 48 | stage.restart(); | 
|---|
| 49 |  | 
|---|
| 50 | if (!lock_holder_paths.empty()) | 
|---|
| 51 | { | 
|---|
| 52 | Strings partitions = zookeeper->getChildren(zookeeper_path + "/block_numbers"); | 
|---|
| 53 | std::cerr << "Stage 2 (get partitions): "<< partitions.size() | 
|---|
| 54 | << " partitions, elapsed: "<< stage.elapsedSeconds() << "s."<< std::endl; | 
|---|
| 55 | stage.restart(); | 
|---|
| 56 |  | 
|---|
| 57 | std::vector<std::future<Coordination::ListResponse>> lock_futures; | 
|---|
| 58 | for (const String & partition : partitions) | 
|---|
| 59 | lock_futures.push_back(zookeeper->asyncGetChildren(zookeeper_path + "/block_numbers/"+ partition)); | 
|---|
| 60 |  | 
|---|
| 61 | struct BlockInfo | 
|---|
| 62 | { | 
|---|
| 63 | String partition; | 
|---|
| 64 | Int64 number; | 
|---|
| 65 | String zk_path; | 
|---|
| 66 | std::future<Coordination::GetResponse> contents_future; | 
|---|
| 67 | }; | 
|---|
| 68 |  | 
|---|
| 69 | std::vector<BlockInfo> block_infos; | 
|---|
| 70 | for (size_t i = 0; i < partitions.size(); ++i) | 
|---|
| 71 | { | 
|---|
| 72 | Strings partition_block_numbers = lock_futures[i].get().names; | 
|---|
| 73 | for (const String & entry : partition_block_numbers) | 
|---|
| 74 | { | 
|---|
| 75 | Int64 block_number = parse<Int64>(entry.substr(strlen( "block-"))); | 
|---|
| 76 | String zk_path = zookeeper_path + "/block_numbers/"+ partitions[i] + "/"+ entry; | 
|---|
| 77 | block_infos.push_back( | 
|---|
| 78 | BlockInfo{partitions[i], block_number, zk_path, zookeeper->asyncTryGet(zk_path)}); | 
|---|
| 79 | } | 
|---|
| 80 | } | 
|---|
| 81 | std::cerr << "Stage 3 (get block numbers): "<< block_infos.size() | 
|---|
| 82 | << " block numbers, elapsed: "<< stage.elapsedSeconds() << "s."<< std::endl; | 
|---|
| 83 | stage.restart(); | 
|---|
| 84 |  | 
|---|
| 85 | size_t total_count = 0; | 
|---|
| 86 | for (BlockInfo & block : block_infos) | 
|---|
| 87 | { | 
|---|
| 88 | Coordination::GetResponse resp = block.contents_future.get(); | 
|---|
| 89 | if (!resp.error && lock_holder_paths.count(resp.data)) | 
|---|
| 90 | { | 
|---|
| 91 | ++total_count; | 
|---|
| 92 | current_inserts[block.partition].insert(block.number); | 
|---|
| 93 | } | 
|---|
| 94 | } | 
|---|
| 95 | std::cerr << "Stage 4 (get block number contents): "<< total_count | 
|---|
| 96 | << " current_inserts, elapsed: "<< stage.elapsedSeconds() << "s."<< std::endl; | 
|---|
| 97 | stage.restart(); | 
|---|
| 98 | } | 
|---|
| 99 |  | 
|---|
| 100 | std::cerr << "Total elapsed: "<< total.elapsedSeconds() << "s."<< std::endl; | 
|---|
| 101 |  | 
|---|
| 102 | for (const auto & kv : current_inserts) | 
|---|
| 103 | { | 
|---|
| 104 | std::cout << kv.first << ": "; | 
|---|
| 105 | for (Int64 num : kv.second) | 
|---|
| 106 | std::cout << num << ", "; | 
|---|
| 107 | std::cout << std::endl; | 
|---|
| 108 | } | 
|---|
| 109 |  | 
|---|
| 110 | return 0; | 
|---|
| 111 | } | 
|---|
| 112 | catch (const Exception & e) | 
|---|
| 113 | { | 
|---|
| 114 | std::cerr << e.what() << ", "<< e.displayText() << ": "<< std::endl | 
|---|
| 115 | << e.getStackTrace().toString() << std::endl; | 
|---|
| 116 | throw; | 
|---|
| 117 | } | 
|---|
| 118 | catch (Poco::Exception & e) | 
|---|
| 119 | { | 
|---|
| 120 | std::cerr << "Exception: "<< e.displayText() << std::endl; | 
|---|
| 121 | throw; | 
|---|
| 122 | } | 
|---|
| 123 | catch (std::exception & e) | 
|---|
| 124 | { | 
|---|
| 125 | std::cerr << "std::exception: "<< e.what() << std::endl; | 
|---|
| 126 | throw; | 
|---|
| 127 | } | 
|---|
| 128 | catch (...) | 
|---|
| 129 | { | 
|---|
| 130 | std::cerr << "Some exception"<< std::endl; | 
|---|
| 131 | throw; | 
|---|
| 132 | } | 
|---|
| 133 |  | 
|---|