1#include <Storages/MergeTree/ReplicatedMergeTreePartHeader.h>
2#include <IO/WriteBufferFromString.h>
3#include <IO/WriteHelpers.h>
4#include <Common/Exception.h>
5#include <Common/StringUtils/StringUtils.h>
6#include <Common/ZooKeeper/ZooKeeper.h>
7#include <Common/ZooKeeper/KeeperException.h>
8
9#include <boost/program_options.hpp>
10
11#include <list>
12#include <iostream>
13
14
15int main(int argc, char ** argv)
16try
17{
18 boost::program_options::options_description desc("Allowed options");
19 desc.add_options()
20 ("help,h", "produce help message")
21 ("address,a", boost::program_options::value<std::string>()->required(),
22 "addresses of ZooKeeper instances, comma separated. Example: example01e.yandex.ru:2181")
23 ("path,p", boost::program_options::value<std::string>()->required(),
24 "where to start")
25 ;
26
27 boost::program_options::variables_map options;
28 boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), options);
29
30 if (options.count("help"))
31 {
32 std::cout << "Transform contents of part nodes in ZooKeeper to more compact storage scheme." << std::endl;
33 std::cout << "Usage: " << argv[0] << " [options]" << std::endl;
34 std::cout << desc << std::endl;
35 return 1;
36 }
37
38 zkutil::ZooKeeper zookeeper(options.at("address").as<std::string>());
39
40 std::string initial_path = options.at("path").as<std::string>();
41
42 struct Node
43 {
44 Node(
45 std::string path_,
46 std::future<Coordination::GetResponse> get_future_,
47 std::future<Coordination::ListResponse> children_future_,
48 Node * parent_)
49 : path(std::move(path_))
50 , get_future(std::move(get_future_))
51 , children_future(std::move(children_future_))
52 , parent(parent_)
53 {
54 }
55
56 std::string path;
57 std::future<Coordination::GetResponse> get_future;
58 std::future<Coordination::ListResponse> children_future;
59
60 Node * parent = nullptr;
61 std::future<Coordination::MultiResponse> set_future;
62 };
63
64 std::list<Node> nodes_queue;
65 nodes_queue.emplace_back(
66 initial_path, zookeeper.asyncGet(initial_path), zookeeper.asyncGetChildren(initial_path), nullptr);
67
68 for (auto it = nodes_queue.begin(); it != nodes_queue.end(); ++it)
69 {
70 Coordination::GetResponse get_response;
71 Coordination::ListResponse children_response;
72 try
73 {
74 get_response = it->get_future.get();
75 children_response = it->children_future.get();
76 }
77 catch (const Coordination::Exception & e)
78 {
79 if (e.code == Coordination::ZNONODE)
80 continue;
81 throw;
82 }
83
84 if (get_response.stat.ephemeralOwner)
85 continue;
86
87 if (it->path.find("/parts/") != std::string::npos
88 && !endsWith(it->path, "/columns")
89 && !endsWith(it->path, "/checksums"))
90 {
91 if (!children_response.names.empty())
92 {
93 auto part_header = DB::ReplicatedMergeTreePartHeader::fromColumnsAndChecksumsZNodes(
94 zookeeper.get(it->path + "/columns"), zookeeper.get(it->path + "/checksums"));
95
96 Coordination::Requests ops;
97 ops.emplace_back(zkutil::makeRemoveRequest(it->path + "/columns", -1));
98 ops.emplace_back(zkutil::makeRemoveRequest(it->path + "/checksums", -1));
99 ops.emplace_back(zkutil::makeSetRequest(it->path, part_header.toString(), -1));
100
101 it->set_future = zookeeper.asyncMulti(ops);
102 }
103 }
104 else
105 {
106 for (const auto & name : children_response.names)
107 {
108 std::string child_path = it->path == "/" ? it->path + name : it->path + '/' + name;
109 nodes_queue.emplace_back(
110 child_path, zookeeper.asyncGet(child_path), zookeeper.asyncGetChildren(child_path),
111 &(*it));
112 }
113 }
114 }
115
116 for (auto it = nodes_queue.begin(); it != nodes_queue.end(); ++it)
117 {
118 if (it->set_future.valid())
119 {
120 it->set_future.get();
121 std::cerr << it->path << " changed!" << std::endl;
122 }
123 }
124}
125catch (...)
126{
127 std::cerr << DB::getCurrentExceptionMessage(true) << '\n';
128 throw;
129}
130