| 1 | #include <Storages/MergeTree/ReplicatedMergeTreePartHeader.h> |
| 2 | #include <IO/WriteBufferFromString.h> |
| 3 | #include <IO/WriteHelpers.h> |
| 4 | #include <Common/Exception.h> |
| 5 | #include <Common/StringUtils/StringUtils.h> |
| 6 | #include <Common/ZooKeeper/ZooKeeper.h> |
| 7 | #include <Common/ZooKeeper/KeeperException.h> |
| 8 | |
| 9 | #include <boost/program_options.hpp> |
| 10 | |
| 11 | #include <list> |
| 12 | #include <iostream> |
| 13 | |
| 14 | |
| 15 | int main(int argc, char ** argv) |
| 16 | try |
| 17 | { |
| 18 | boost::program_options::options_description desc("Allowed options" ); |
| 19 | desc.add_options() |
| 20 | ("help,h" , "produce help message" ) |
| 21 | ("address,a" , boost::program_options::value<std::string>()->required(), |
| 22 | "addresses of ZooKeeper instances, comma separated. Example: example01e.yandex.ru:2181" ) |
| 23 | ("path,p" , boost::program_options::value<std::string>()->required(), |
| 24 | "where to start" ) |
| 25 | ; |
| 26 | |
| 27 | boost::program_options::variables_map options; |
| 28 | boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), options); |
| 29 | |
| 30 | if (options.count("help" )) |
| 31 | { |
| 32 | std::cout << "Transform contents of part nodes in ZooKeeper to more compact storage scheme." << std::endl; |
| 33 | std::cout << "Usage: " << argv[0] << " [options]" << std::endl; |
| 34 | std::cout << desc << std::endl; |
| 35 | return 1; |
| 36 | } |
| 37 | |
| 38 | zkutil::ZooKeeper zookeeper(options.at("address" ).as<std::string>()); |
| 39 | |
| 40 | std::string initial_path = options.at("path" ).as<std::string>(); |
| 41 | |
| 42 | struct Node |
| 43 | { |
| 44 | Node( |
| 45 | std::string path_, |
| 46 | std::future<Coordination::GetResponse> get_future_, |
| 47 | std::future<Coordination::ListResponse> children_future_, |
| 48 | Node * parent_) |
| 49 | : path(std::move(path_)) |
| 50 | , get_future(std::move(get_future_)) |
| 51 | , children_future(std::move(children_future_)) |
| 52 | , parent(parent_) |
| 53 | { |
| 54 | } |
| 55 | |
| 56 | std::string path; |
| 57 | std::future<Coordination::GetResponse> get_future; |
| 58 | std::future<Coordination::ListResponse> children_future; |
| 59 | |
| 60 | Node * parent = nullptr; |
| 61 | std::future<Coordination::MultiResponse> set_future; |
| 62 | }; |
| 63 | |
| 64 | std::list<Node> nodes_queue; |
| 65 | nodes_queue.emplace_back( |
| 66 | initial_path, zookeeper.asyncGet(initial_path), zookeeper.asyncGetChildren(initial_path), nullptr); |
| 67 | |
| 68 | for (auto it = nodes_queue.begin(); it != nodes_queue.end(); ++it) |
| 69 | { |
| 70 | Coordination::GetResponse get_response; |
| 71 | Coordination::ListResponse children_response; |
| 72 | try |
| 73 | { |
| 74 | get_response = it->get_future.get(); |
| 75 | children_response = it->children_future.get(); |
| 76 | } |
| 77 | catch (const Coordination::Exception & e) |
| 78 | { |
| 79 | if (e.code == Coordination::ZNONODE) |
| 80 | continue; |
| 81 | throw; |
| 82 | } |
| 83 | |
| 84 | if (get_response.stat.ephemeralOwner) |
| 85 | continue; |
| 86 | |
| 87 | if (it->path.find("/parts/" ) != std::string::npos |
| 88 | && !endsWith(it->path, "/columns" ) |
| 89 | && !endsWith(it->path, "/checksums" )) |
| 90 | { |
| 91 | if (!children_response.names.empty()) |
| 92 | { |
| 93 | auto = DB::ReplicatedMergeTreePartHeader::fromColumnsAndChecksumsZNodes( |
| 94 | zookeeper.get(it->path + "/columns" ), zookeeper.get(it->path + "/checksums" )); |
| 95 | |
| 96 | Coordination::Requests ops; |
| 97 | ops.emplace_back(zkutil::makeRemoveRequest(it->path + "/columns" , -1)); |
| 98 | ops.emplace_back(zkutil::makeRemoveRequest(it->path + "/checksums" , -1)); |
| 99 | ops.emplace_back(zkutil::makeSetRequest(it->path, part_header.toString(), -1)); |
| 100 | |
| 101 | it->set_future = zookeeper.asyncMulti(ops); |
| 102 | } |
| 103 | } |
| 104 | else |
| 105 | { |
| 106 | for (const auto & name : children_response.names) |
| 107 | { |
| 108 | std::string child_path = it->path == "/" ? it->path + name : it->path + '/' + name; |
| 109 | nodes_queue.emplace_back( |
| 110 | child_path, zookeeper.asyncGet(child_path), zookeeper.asyncGetChildren(child_path), |
| 111 | &(*it)); |
| 112 | } |
| 113 | } |
| 114 | } |
| 115 | |
| 116 | for (auto it = nodes_queue.begin(); it != nodes_queue.end(); ++it) |
| 117 | { |
| 118 | if (it->set_future.valid()) |
| 119 | { |
| 120 | it->set_future.get(); |
| 121 | std::cerr << it->path << " changed!" << std::endl; |
| 122 | } |
| 123 | } |
| 124 | } |
| 125 | catch (...) |
| 126 | { |
| 127 | std::cerr << DB::getCurrentExceptionMessage(true) << '\n'; |
| 128 | throw; |
| 129 | } |
| 130 | |