1 | #include <Storages/MergeTree/ReplicatedMergeTreePartHeader.h> |
2 | #include <IO/WriteBufferFromString.h> |
3 | #include <IO/WriteHelpers.h> |
4 | #include <Common/Exception.h> |
5 | #include <Common/StringUtils/StringUtils.h> |
6 | #include <Common/ZooKeeper/ZooKeeper.h> |
7 | #include <Common/ZooKeeper/KeeperException.h> |
8 | |
9 | #include <boost/program_options.hpp> |
10 | |
11 | #include <list> |
12 | #include <iostream> |
13 | |
14 | |
15 | int main(int argc, char ** argv) |
16 | try |
17 | { |
18 | boost::program_options::options_description desc("Allowed options" ); |
19 | desc.add_options() |
20 | ("help,h" , "produce help message" ) |
21 | ("address,a" , boost::program_options::value<std::string>()->required(), |
22 | "addresses of ZooKeeper instances, comma separated. Example: example01e.yandex.ru:2181" ) |
23 | ("path,p" , boost::program_options::value<std::string>()->required(), |
24 | "where to start" ) |
25 | ; |
26 | |
27 | boost::program_options::variables_map options; |
28 | boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), options); |
29 | |
30 | if (options.count("help" )) |
31 | { |
32 | std::cout << "Transform contents of part nodes in ZooKeeper to more compact storage scheme." << std::endl; |
33 | std::cout << "Usage: " << argv[0] << " [options]" << std::endl; |
34 | std::cout << desc << std::endl; |
35 | return 1; |
36 | } |
37 | |
38 | zkutil::ZooKeeper zookeeper(options.at("address" ).as<std::string>()); |
39 | |
40 | std::string initial_path = options.at("path" ).as<std::string>(); |
41 | |
42 | struct Node |
43 | { |
44 | Node( |
45 | std::string path_, |
46 | std::future<Coordination::GetResponse> get_future_, |
47 | std::future<Coordination::ListResponse> children_future_, |
48 | Node * parent_) |
49 | : path(std::move(path_)) |
50 | , get_future(std::move(get_future_)) |
51 | , children_future(std::move(children_future_)) |
52 | , parent(parent_) |
53 | { |
54 | } |
55 | |
56 | std::string path; |
57 | std::future<Coordination::GetResponse> get_future; |
58 | std::future<Coordination::ListResponse> children_future; |
59 | |
60 | Node * parent = nullptr; |
61 | std::future<Coordination::MultiResponse> set_future; |
62 | }; |
63 | |
64 | std::list<Node> nodes_queue; |
65 | nodes_queue.emplace_back( |
66 | initial_path, zookeeper.asyncGet(initial_path), zookeeper.asyncGetChildren(initial_path), nullptr); |
67 | |
68 | for (auto it = nodes_queue.begin(); it != nodes_queue.end(); ++it) |
69 | { |
70 | Coordination::GetResponse get_response; |
71 | Coordination::ListResponse children_response; |
72 | try |
73 | { |
74 | get_response = it->get_future.get(); |
75 | children_response = it->children_future.get(); |
76 | } |
77 | catch (const Coordination::Exception & e) |
78 | { |
79 | if (e.code == Coordination::ZNONODE) |
80 | continue; |
81 | throw; |
82 | } |
83 | |
84 | if (get_response.stat.ephemeralOwner) |
85 | continue; |
86 | |
87 | if (it->path.find("/parts/" ) != std::string::npos |
88 | && !endsWith(it->path, "/columns" ) |
89 | && !endsWith(it->path, "/checksums" )) |
90 | { |
91 | if (!children_response.names.empty()) |
92 | { |
93 | auto = DB::ReplicatedMergeTreePartHeader::fromColumnsAndChecksumsZNodes( |
94 | zookeeper.get(it->path + "/columns" ), zookeeper.get(it->path + "/checksums" )); |
95 | |
96 | Coordination::Requests ops; |
97 | ops.emplace_back(zkutil::makeRemoveRequest(it->path + "/columns" , -1)); |
98 | ops.emplace_back(zkutil::makeRemoveRequest(it->path + "/checksums" , -1)); |
99 | ops.emplace_back(zkutil::makeSetRequest(it->path, part_header.toString(), -1)); |
100 | |
101 | it->set_future = zookeeper.asyncMulti(ops); |
102 | } |
103 | } |
104 | else |
105 | { |
106 | for (const auto & name : children_response.names) |
107 | { |
108 | std::string child_path = it->path == "/" ? it->path + name : it->path + '/' + name; |
109 | nodes_queue.emplace_back( |
110 | child_path, zookeeper.asyncGet(child_path), zookeeper.asyncGetChildren(child_path), |
111 | &(*it)); |
112 | } |
113 | } |
114 | } |
115 | |
116 | for (auto it = nodes_queue.begin(); it != nodes_queue.end(); ++it) |
117 | { |
118 | if (it->set_future.valid()) |
119 | { |
120 | it->set_future.get(); |
121 | std::cerr << it->path << " changed!" << std::endl; |
122 | } |
123 | } |
124 | } |
125 | catch (...) |
126 | { |
127 | std::cerr << DB::getCurrentExceptionMessage(true) << '\n'; |
128 | throw; |
129 | } |
130 | |