1 | #include <Storages/MergeTree/EphemeralLockInZooKeeper.h> |
2 | #include <Common/ZooKeeper/KeeperException.h> |
3 | #include <common/logger_useful.h> |
4 | |
5 | |
6 | namespace DB |
7 | { |
8 | |
9 | namespace ErrorCodes |
10 | { |
11 | extern const int LOGICAL_ERROR; |
12 | } |
13 | |
14 | EphemeralLockInZooKeeper::EphemeralLockInZooKeeper( |
15 | const String & path_prefix_, const String & temp_path, zkutil::ZooKeeper & zookeeper_, Coordination::Requests * precheck_ops) |
16 | : zookeeper(&zookeeper_), path_prefix(path_prefix_) |
17 | { |
18 | /// The /abandonable_lock- name is for backward compatibility. |
19 | String holder_path_prefix = temp_path + "/abandonable_lock-" ; |
20 | |
21 | /// Let's create an secondary ephemeral node. |
22 | if (!precheck_ops || precheck_ops->empty()) |
23 | { |
24 | holder_path = zookeeper->create(holder_path_prefix, "" , zkutil::CreateMode::EphemeralSequential); |
25 | } |
26 | else |
27 | { |
28 | precheck_ops->emplace_back(zkutil::makeCreateRequest(holder_path_prefix, "" , zkutil::CreateMode::EphemeralSequential)); |
29 | Coordination::Responses op_results = zookeeper->multi(*precheck_ops); |
30 | holder_path = dynamic_cast<const Coordination::CreateResponse &>(*op_results.back()).path_created; |
31 | } |
32 | |
33 | /// Write the path to the secondary node in the main node. |
34 | path = zookeeper->create(path_prefix, holder_path, zkutil::CreateMode::EphemeralSequential); |
35 | |
36 | if (path.size() <= path_prefix.size()) |
37 | throw Exception("Logical error: name of the main node is shorter than prefix." , ErrorCodes::LOGICAL_ERROR); |
38 | } |
39 | |
40 | void EphemeralLockInZooKeeper::unlock() |
41 | { |
42 | Coordination::Requests ops; |
43 | getUnlockOps(ops); |
44 | zookeeper->multi(ops); |
45 | holder_path = "" ; |
46 | } |
47 | |
48 | void EphemeralLockInZooKeeper::getUnlockOps(Coordination::Requests & ops) |
49 | { |
50 | checkCreated(); |
51 | ops.emplace_back(zkutil::makeRemoveRequest(path, -1)); |
52 | ops.emplace_back(zkutil::makeRemoveRequest(holder_path, -1)); |
53 | } |
54 | |
55 | EphemeralLockInZooKeeper::~EphemeralLockInZooKeeper() |
56 | { |
57 | if (!isCreated()) |
58 | return; |
59 | |
60 | try |
61 | { |
62 | unlock(); |
63 | } |
64 | catch (...) |
65 | { |
66 | tryLogCurrentException("~EphemeralLockInZooKeeper" ); |
67 | } |
68 | } |
69 | |
70 | |
71 | EphemeralLocksInAllPartitions::EphemeralLocksInAllPartitions( |
72 | const String & block_numbers_path, const String & path_prefix, const String & temp_path, |
73 | zkutil::ZooKeeper & zookeeper_) |
74 | : zookeeper(zookeeper_) |
75 | { |
76 | std::vector<String> holders; |
77 | while (true) |
78 | { |
79 | Coordination::Stat partitions_stat; |
80 | Strings partitions = zookeeper.getChildren(block_numbers_path, &partitions_stat); |
81 | |
82 | if (holders.size() < partitions.size()) |
83 | { |
84 | std::vector<std::future<Coordination::CreateResponse>> holder_futures; |
85 | for (size_t i = 0; i < partitions.size() - holders.size(); ++i) |
86 | { |
87 | String path = temp_path + "/abandonable_lock-" ; |
88 | holder_futures.push_back(zookeeper.asyncCreate(path, {}, zkutil::CreateMode::EphemeralSequential)); |
89 | } |
90 | for (auto & future : holder_futures) |
91 | { |
92 | auto resp = future.get(); |
93 | holders.push_back(resp.path_created); |
94 | } |
95 | } |
96 | |
97 | Coordination::Requests lock_ops; |
98 | for (size_t i = 0; i < partitions.size(); ++i) |
99 | { |
100 | String partition_path_prefix = block_numbers_path + "/" + partitions[i] + "/" + path_prefix; |
101 | lock_ops.push_back(zkutil::makeCreateRequest( |
102 | partition_path_prefix, holders[i], zkutil::CreateMode::EphemeralSequential)); |
103 | } |
104 | lock_ops.push_back(zkutil::makeCheckRequest(block_numbers_path, partitions_stat.version)); |
105 | |
106 | Coordination::Responses lock_responses; |
107 | int rc = zookeeper.tryMulti(lock_ops, lock_responses); |
108 | if (rc == Coordination::ZBADVERSION) |
109 | { |
110 | LOG_TRACE(&Logger::get("EphemeralLocksInAllPartitions" ), |
111 | "Someone has inserted a block in a new partition while we were creating locks. Retry." ); |
112 | continue; |
113 | } |
114 | else if (rc != Coordination::ZOK) |
115 | throw Coordination::Exception(rc); |
116 | |
117 | for (size_t i = 0; i < partitions.size(); ++i) |
118 | { |
119 | size_t prefix_size = block_numbers_path.size() + 1 + partitions[i].size() + 1 + path_prefix.size(); |
120 | const String & path = dynamic_cast<const Coordination::CreateResponse &>(*lock_responses[i]).path_created; |
121 | if (path.size() <= prefix_size) |
122 | throw Exception("Logical error: name of the sequential node is shorter than prefix." , |
123 | ErrorCodes::LOGICAL_ERROR); |
124 | |
125 | UInt64 number = parse<UInt64>(path.c_str() + prefix_size, path.size() - prefix_size); |
126 | locks.push_back(LockInfo{path, holders[i], partitions[i], number}); |
127 | } |
128 | |
129 | return; |
130 | } |
131 | } |
132 | |
133 | void EphemeralLocksInAllPartitions::unlock() |
134 | { |
135 | std::vector<zkutil::ZooKeeper::FutureMulti> futures; |
136 | for (const auto & lock : locks) |
137 | { |
138 | Coordination::Requests unlock_ops; |
139 | unlock_ops.emplace_back(zkutil::makeRemoveRequest(lock.path, -1)); |
140 | unlock_ops.emplace_back(zkutil::makeRemoveRequest(lock.holder_path, -1)); |
141 | futures.push_back(zookeeper.asyncMulti(unlock_ops)); |
142 | } |
143 | |
144 | for (auto & future : futures) |
145 | future.get(); |
146 | |
147 | locks.clear(); |
148 | } |
149 | |
150 | EphemeralLocksInAllPartitions::~EphemeralLocksInAllPartitions() |
151 | { |
152 | try |
153 | { |
154 | unlock(); |
155 | } |
156 | catch (...) |
157 | { |
158 | tryLogCurrentException("~EphemeralLocksInAllPartitions" ); |
159 | } |
160 | } |
161 | |
162 | } |
163 | |