| 1 | #include <Storages/MergeTree/EphemeralLockInZooKeeper.h> |
| 2 | #include <Common/ZooKeeper/KeeperException.h> |
| 3 | #include <common/logger_useful.h> |
| 4 | |
| 5 | |
| 6 | namespace DB |
| 7 | { |
| 8 | |
| 9 | namespace ErrorCodes |
| 10 | { |
| 11 | extern const int LOGICAL_ERROR; |
| 12 | } |
| 13 | |
| 14 | EphemeralLockInZooKeeper::EphemeralLockInZooKeeper( |
| 15 | const String & path_prefix_, const String & temp_path, zkutil::ZooKeeper & zookeeper_, Coordination::Requests * precheck_ops) |
| 16 | : zookeeper(&zookeeper_), path_prefix(path_prefix_) |
| 17 | { |
| 18 | /// The /abandonable_lock- name is for backward compatibility. |
| 19 | String holder_path_prefix = temp_path + "/abandonable_lock-" ; |
| 20 | |
| 21 | /// Let's create an secondary ephemeral node. |
| 22 | if (!precheck_ops || precheck_ops->empty()) |
| 23 | { |
| 24 | holder_path = zookeeper->create(holder_path_prefix, "" , zkutil::CreateMode::EphemeralSequential); |
| 25 | } |
| 26 | else |
| 27 | { |
| 28 | precheck_ops->emplace_back(zkutil::makeCreateRequest(holder_path_prefix, "" , zkutil::CreateMode::EphemeralSequential)); |
| 29 | Coordination::Responses op_results = zookeeper->multi(*precheck_ops); |
| 30 | holder_path = dynamic_cast<const Coordination::CreateResponse &>(*op_results.back()).path_created; |
| 31 | } |
| 32 | |
| 33 | /// Write the path to the secondary node in the main node. |
| 34 | path = zookeeper->create(path_prefix, holder_path, zkutil::CreateMode::EphemeralSequential); |
| 35 | |
| 36 | if (path.size() <= path_prefix.size()) |
| 37 | throw Exception("Logical error: name of the main node is shorter than prefix." , ErrorCodes::LOGICAL_ERROR); |
| 38 | } |
| 39 | |
| 40 | void EphemeralLockInZooKeeper::unlock() |
| 41 | { |
| 42 | Coordination::Requests ops; |
| 43 | getUnlockOps(ops); |
| 44 | zookeeper->multi(ops); |
| 45 | holder_path = "" ; |
| 46 | } |
| 47 | |
| 48 | void EphemeralLockInZooKeeper::getUnlockOps(Coordination::Requests & ops) |
| 49 | { |
| 50 | checkCreated(); |
| 51 | ops.emplace_back(zkutil::makeRemoveRequest(path, -1)); |
| 52 | ops.emplace_back(zkutil::makeRemoveRequest(holder_path, -1)); |
| 53 | } |
| 54 | |
| 55 | EphemeralLockInZooKeeper::~EphemeralLockInZooKeeper() |
| 56 | { |
| 57 | if (!isCreated()) |
| 58 | return; |
| 59 | |
| 60 | try |
| 61 | { |
| 62 | unlock(); |
| 63 | } |
| 64 | catch (...) |
| 65 | { |
| 66 | tryLogCurrentException("~EphemeralLockInZooKeeper" ); |
| 67 | } |
| 68 | } |
| 69 | |
| 70 | |
| 71 | EphemeralLocksInAllPartitions::EphemeralLocksInAllPartitions( |
| 72 | const String & block_numbers_path, const String & path_prefix, const String & temp_path, |
| 73 | zkutil::ZooKeeper & zookeeper_) |
| 74 | : zookeeper(zookeeper_) |
| 75 | { |
| 76 | std::vector<String> holders; |
| 77 | while (true) |
| 78 | { |
| 79 | Coordination::Stat partitions_stat; |
| 80 | Strings partitions = zookeeper.getChildren(block_numbers_path, &partitions_stat); |
| 81 | |
| 82 | if (holders.size() < partitions.size()) |
| 83 | { |
| 84 | std::vector<std::future<Coordination::CreateResponse>> holder_futures; |
| 85 | for (size_t i = 0; i < partitions.size() - holders.size(); ++i) |
| 86 | { |
| 87 | String path = temp_path + "/abandonable_lock-" ; |
| 88 | holder_futures.push_back(zookeeper.asyncCreate(path, {}, zkutil::CreateMode::EphemeralSequential)); |
| 89 | } |
| 90 | for (auto & future : holder_futures) |
| 91 | { |
| 92 | auto resp = future.get(); |
| 93 | holders.push_back(resp.path_created); |
| 94 | } |
| 95 | } |
| 96 | |
| 97 | Coordination::Requests lock_ops; |
| 98 | for (size_t i = 0; i < partitions.size(); ++i) |
| 99 | { |
| 100 | String partition_path_prefix = block_numbers_path + "/" + partitions[i] + "/" + path_prefix; |
| 101 | lock_ops.push_back(zkutil::makeCreateRequest( |
| 102 | partition_path_prefix, holders[i], zkutil::CreateMode::EphemeralSequential)); |
| 103 | } |
| 104 | lock_ops.push_back(zkutil::makeCheckRequest(block_numbers_path, partitions_stat.version)); |
| 105 | |
| 106 | Coordination::Responses lock_responses; |
| 107 | int rc = zookeeper.tryMulti(lock_ops, lock_responses); |
| 108 | if (rc == Coordination::ZBADVERSION) |
| 109 | { |
| 110 | LOG_TRACE(&Logger::get("EphemeralLocksInAllPartitions" ), |
| 111 | "Someone has inserted a block in a new partition while we were creating locks. Retry." ); |
| 112 | continue; |
| 113 | } |
| 114 | else if (rc != Coordination::ZOK) |
| 115 | throw Coordination::Exception(rc); |
| 116 | |
| 117 | for (size_t i = 0; i < partitions.size(); ++i) |
| 118 | { |
| 119 | size_t prefix_size = block_numbers_path.size() + 1 + partitions[i].size() + 1 + path_prefix.size(); |
| 120 | const String & path = dynamic_cast<const Coordination::CreateResponse &>(*lock_responses[i]).path_created; |
| 121 | if (path.size() <= prefix_size) |
| 122 | throw Exception("Logical error: name of the sequential node is shorter than prefix." , |
| 123 | ErrorCodes::LOGICAL_ERROR); |
| 124 | |
| 125 | UInt64 number = parse<UInt64>(path.c_str() + prefix_size, path.size() - prefix_size); |
| 126 | locks.push_back(LockInfo{path, holders[i], partitions[i], number}); |
| 127 | } |
| 128 | |
| 129 | return; |
| 130 | } |
| 131 | } |
| 132 | |
| 133 | void EphemeralLocksInAllPartitions::unlock() |
| 134 | { |
| 135 | std::vector<zkutil::ZooKeeper::FutureMulti> futures; |
| 136 | for (const auto & lock : locks) |
| 137 | { |
| 138 | Coordination::Requests unlock_ops; |
| 139 | unlock_ops.emplace_back(zkutil::makeRemoveRequest(lock.path, -1)); |
| 140 | unlock_ops.emplace_back(zkutil::makeRemoveRequest(lock.holder_path, -1)); |
| 141 | futures.push_back(zookeeper.asyncMulti(unlock_ops)); |
| 142 | } |
| 143 | |
| 144 | for (auto & future : futures) |
| 145 | future.get(); |
| 146 | |
| 147 | locks.clear(); |
| 148 | } |
| 149 | |
| 150 | EphemeralLocksInAllPartitions::~EphemeralLocksInAllPartitions() |
| 151 | { |
| 152 | try |
| 153 | { |
| 154 | unlock(); |
| 155 | } |
| 156 | catch (...) |
| 157 | { |
| 158 | tryLogCurrentException("~EphemeralLocksInAllPartitions" ); |
| 159 | } |
| 160 | } |
| 161 | |
| 162 | } |
| 163 | |