1#pragma once
2
3#include "Types.h"
4#include <Poco/Util/LayeredConfiguration.h>
5#include <unordered_set>
6#include <future>
7#include <memory>
8#include <mutex>
9#include <string>
10#include <common/logger_useful.h>
11#include <Common/ProfileEvents.h>
12#include <Common/CurrentMetrics.h>
13#include <Common/ZooKeeper/IKeeper.h>
14#include <port/unistd.h>
15
16
namespace ProfileEvents
{
    /// Declared here, defined elsewhere. Incremented in this header by the destructor of
    /// zkutil::EphemeralNodeHolder when removing its node throws an exception.
    extern const Event CannotRemoveEphemeralNode;
}
21
namespace CurrentMetrics
{
    /// Declared here, defined elsewhere. Every zkutil::EphemeralNodeHolder owns a
    /// CurrentMetrics::Increment member bound to this metric.
    /// NOTE(review): presumably this makes the metric track the number of live holders - confirm
    /// against the CurrentMetrics::Increment implementation.
    extern const Metric EphemeralNode;
}
26
27
28namespace zkutil
29{
30
31const UInt32 DEFAULT_SESSION_TIMEOUT = 30000;
32const UInt32 DEFAULT_OPERATION_TIMEOUT = 10000;
33
34/// Preferred size of multi() command (in number of ops)
35constexpr size_t MULTI_BATCH_SIZE = 100;
36
37
38/// ZooKeeper session. The interface is substantially different from the usual libzookeeper API.
39///
40/// Poco::Event objects are used for watches. The event is set only once on the first
41/// watch notification.
42/// Callback-based watch interface is also provided.
43///
44/// Read-only methods retry retry_num times if recoverable errors like OperationTimeout
45/// or ConnectionLoss are encountered.
46///
47/// Modifying methods do not retry, because it leads to problems of the double-delete type.
48///
/// Methods whose names do not start with try- throw KeeperException on any error.
class ZooKeeper
{
public:
    using Ptr = std::shared_ptr<ZooKeeper>;

    /// Constructs the object from explicit parameters.
    /// Defaults: empty identity, DEFAULT_SESSION_TIMEOUT and DEFAULT_OPERATION_TIMEOUT for the
    /// timeouts (both in milliseconds), empty chroot and the "zookeeper" implementation.
    ZooKeeper(const std::string & hosts_, const std::string & identity_ = "",
              int32_t session_timeout_ms_ = DEFAULT_SESSION_TIMEOUT,
              int32_t operation_timeout_ms_ = DEFAULT_OPERATION_TIMEOUT,
              const std::string & chroot_ = "",
              const std::string & implementation = "zookeeper");

    /** Constructs the object from a configuration section named config_name.
      * Config of the form:
        <zookeeper>
            <node>
                <host>example1</host>
                <port>2181</port>
            </node>
            <node>
                <host>example2</host>
                <port>2181</port>
            </node>
            <session_timeout_ms>30000</session_timeout_ms>
            <operation_timeout_ms>10000</operation_timeout_ms>
            <!-- Optional. Chroot suffix. Should exist. -->
            <root>/path/to/zookeeper/node</root>
            <!-- Optional. Zookeeper digest ACL string. -->
            <identity>user:password</identity>
        </zookeeper>
    */
    ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name);

    /// Creates a new session with the same parameters. This method can be used for reconnecting
    /// after the session has expired.
    /// This object remains unchanged, and the new session is returned.
    Ptr startNewSession() const;

    /// Returns true if the session has expired.
    bool expired();

    /// Create a znode.
    /// Throws an exception if something went wrong.
    /// NOTE(review): the returned string is presumably the path of the created node (it may differ
    /// from `path` for sequential modes) - EphemeralNodeHolder stores it as its path.
    std::string create(const std::string & path, const std::string & data, int32_t mode);

    /// Does not throw in the following cases:
    /// * The parent for the created node does not exist.
    /// * The parent is ephemeral.
    /// * The node already exists.
    /// In case of other errors throws an exception.
    /// NOTE(review): the returned int32_t is presumably the operation's error code - confirm in the .cpp.
    int32_t tryCreate(const std::string & path, const std::string & data, int32_t mode, std::string & path_created);
    int32_t tryCreate(const std::string & path, const std::string & data, int32_t mode);

    /// Create a Persistent node.
    /// Does nothing if the node already exists.
    void createIfNotExists(const std::string & path, const std::string & data);

    /// Creates all non-existent ancestors of the given path with empty contents.
    /// Does not create the node itself.
    void createAncestors(const std::string & path);

    /// Remove the node if the version matches (if version == -1, remove any version).
    void remove(const std::string & path, int32_t version = -1);

    /// Doesn't throw in the following cases:
    /// * The node doesn't exist.
    /// * Versions don't match.
    /// * The node has children.
    int32_t tryRemove(const std::string & path, int32_t version = -1);

    /// The plain variants below accept an optional EventPtr watch; the -Watch variants accept
    /// a Coordination::WatchCallback instead (see the class-level comment about watches).
    bool exists(const std::string & path, Coordination::Stat * stat = nullptr, const EventPtr & watch = nullptr);
    bool existsWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback);

    std::string get(const std::string & path, Coordination::Stat * stat = nullptr, const EventPtr & watch = nullptr);
    std::string getWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback);

    /// Doesn't throw in the following cases:
    /// * The node doesn't exist. Returns false in this case.
    /// NOTE(review): `code`, if non-null, presumably receives the resulting error code - confirm in the .cpp.
    bool tryGet(const std::string & path, std::string & res, Coordination::Stat * stat = nullptr, const EventPtr & watch = nullptr, int * code = nullptr);

    bool tryGetWatch(const std::string & path, std::string & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback, int * code = nullptr);

    /// NOTE(review): by analogy with remove(), version == -1 presumably means "any version" - confirm.
    void set(const std::string & path, const std::string & data,
             int32_t version = -1, Coordination::Stat * stat = nullptr);

    /// Creates the node if it doesn't exist. Updates its contents otherwise.
    void createOrUpdate(const std::string & path, const std::string & data, int32_t mode);

    /// Doesn't throw in the following cases:
    /// * The node doesn't exist.
    /// * Versions do not match.
    int32_t trySet(const std::string & path, const std::string & data,
                   int32_t version = -1, Coordination::Stat * stat = nullptr);

    Strings getChildren(const std::string & path,
                        Coordination::Stat * stat = nullptr,
                        const EventPtr & watch = nullptr);

    Strings getChildrenWatch(const std::string & path,
                             Coordination::Stat * stat,
                             Coordination::WatchCallback watch_callback);

    /// Doesn't throw in the following cases:
    /// * The node doesn't exist.
    int32_t tryGetChildren(const std::string & path, Strings & res,
                           Coordination::Stat * stat = nullptr,
                           const EventPtr & watch = nullptr);

    int32_t tryGetChildrenWatch(const std::string & path, Strings & res,
                                Coordination::Stat * stat,
                                Coordination::WatchCallback watch_callback);

    /// Performs several operations in a transaction.
    /// Throws on every error.
    Coordination::Responses multi(const Coordination::Requests & requests);
    /// Throws only if some operation has returned an "unexpected" error
    /// - an error that would cause the corresponding try- method to throw.
    int32_t tryMulti(const Coordination::Requests & requests, Coordination::Responses & responses);
    /// Throws nothing (even session expired errors).
    int32_t tryMultiNoThrow(const Coordination::Requests & requests, Coordination::Responses & responses);

    Int64 getClientID();

    /// Remove the node with the subtree. If someone concurrently adds or removes a node
    /// in the subtree, the result is undefined.
    void removeRecursive(const std::string & path);

    /// Remove the node with the subtree. If someone concurrently removes a node in the subtree,
    /// this will not cause errors.
    /// For instance, you can call this method twice concurrently for the same node and the end
    /// result would be the same as for the single call.
    void tryRemoveRecursive(const std::string & path);

    /// Remove all children nodes (non recursive).
    void removeChildren(const std::string & path);

    /// Wait for the node to disappear or return immediately if it doesn't exist.
    void waitForDisappear(const std::string & path);

    /// Async interface (a small subset of operations is implemented).
    ///
    /// Usage:
    ///
    /// // Non-blocking calls:
    /// auto future1 = zk.asyncGet("/path1");
    /// auto future2 = zk.asyncGet("/path2");
    /// ...
    ///
    /// // These calls can block until the operations are completed:
    /// auto result1 = future1.get();
    /// auto result2 = future2.get();
    ///
    /// A future should not be destroyed before its result has been obtained.

    using FutureCreate = std::future<Coordination::CreateResponse>;
    FutureCreate asyncCreate(const std::string & path, const std::string & data, int32_t mode);

    using FutureGet = std::future<Coordination::GetResponse>;
    FutureGet asyncGet(const std::string & path);

    FutureGet asyncTryGet(const std::string & path);

    using FutureExists = std::future<Coordination::ExistsResponse>;
    FutureExists asyncExists(const std::string & path);

    using FutureGetChildren = std::future<Coordination::ListResponse>;
    FutureGetChildren asyncGetChildren(const std::string & path);

    using FutureSet = std::future<Coordination::SetResponse>;
    FutureSet asyncSet(const std::string & path, const std::string & data, int32_t version = -1);

    using FutureRemove = std::future<Coordination::RemoveResponse>;
    FutureRemove asyncRemove(const std::string & path, int32_t version = -1);

    /// Doesn't throw in the following cases:
    /// * The node doesn't exist.
    /// * The versions do not match.
    /// * The node has children.
    FutureRemove asyncTryRemove(const std::string & path, int32_t version = -1);

    using FutureMulti = std::future<Coordination::MultiResponse>;
    FutureMulti asyncMulti(const Coordination::Requests & ops);

    /// Like asyncMulti, but doesn't throw any exceptions on future.get().
    FutureMulti tryAsyncMulti(const Coordination::Requests & ops);

    /// NOTE(review): presumably converts an error code to a human-readable string - confirm in the .cpp.
    static std::string error2string(int32_t code);

private:
    /// EphemeralNodeHolder (declared later in this header) is granted access to private members.
    friend class EphemeralNodeHolder;

    /// Shared initialization routine; takes the same set of parameters as the first constructor.
    /// NOTE(review): presumably called by both constructors - confirm in the .cpp.
    void init(const std::string & implementation_, const std::string & hosts_, const std::string & identity_,
              int32_t session_timeout_ms_, int32_t operation_timeout_ms_, const std::string & chroot_);

    void removeChildrenRecursive(const std::string & path);
    void tryRemoveChildrenRecursive(const std::string & path);

    /// The following methods don't throw exceptions but return error codes.
    int32_t createImpl(const std::string & path, const std::string & data, int32_t mode, std::string & path_created);
    int32_t removeImpl(const std::string & path, int32_t version);
    int32_t getImpl(const std::string & path, std::string & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback);
    int32_t setImpl(const std::string & path, const std::string & data, int32_t version, Coordination::Stat * stat);
    int32_t getChildrenImpl(const std::string & path, Strings & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback);
    int32_t multiImpl(const Coordination::Requests & requests, Coordination::Responses & responses);
    int32_t existsImpl(const std::string & path, Coordination::Stat * stat_, Coordination::WatchCallback watch_callback);

    /// The underlying client implementing the Coordination::IKeeper interface.
    std::unique_ptr<Coordination::IKeeper> impl;

    /// Connection parameters.
    /// NOTE(review): presumably kept so that startNewSession() can open a session with the same
    /// settings - confirm in the .cpp.
    std::string hosts;
    std::string identity;
    int32_t session_timeout_ms;
    int32_t operation_timeout_ms;
    std::string chroot;

    /// NOTE(review): what this mutex guards is not visible in this header.
    std::mutex mutex;

    Logger * log = nullptr;
};
266
267
/// Namespace-level alias for ZooKeeper::Ptr (std::shared_ptr<ZooKeeper>).
using ZooKeeperPtr = ZooKeeper::Ptr;
269
270
271/// Creates an ephemeral node in the constructor, removes it in the destructor.
272class EphemeralNodeHolder
273{
274public:
275 using Ptr = std::shared_ptr<EphemeralNodeHolder>;
276
277 EphemeralNodeHolder(const std::string & path_, ZooKeeper & zookeeper_, bool create, bool sequential, const std::string & data)
278 : path(path_), zookeeper(zookeeper_)
279 {
280 if (create)
281 path = zookeeper.create(path, data, sequential ? CreateMode::EphemeralSequential : CreateMode::Ephemeral);
282 }
283
284 std::string getPath() const
285 {
286 return path;
287 }
288
289 static Ptr create(const std::string & path, ZooKeeper & zookeeper, const std::string & data = "")
290 {
291 return std::make_shared<EphemeralNodeHolder>(path, zookeeper, true, false, data);
292 }
293
294 static Ptr createSequential(const std::string & path, ZooKeeper & zookeeper, const std::string & data = "")
295 {
296 return std::make_shared<EphemeralNodeHolder>(path, zookeeper, true, true, data);
297 }
298
299 static Ptr existing(const std::string & path, ZooKeeper & zookeeper)
300 {
301 return std::make_shared<EphemeralNodeHolder>(path, zookeeper, false, false, "");
302 }
303
304 ~EphemeralNodeHolder()
305 {
306 try
307 {
308 zookeeper.tryRemove(path);
309 }
310 catch (...)
311 {
312 ProfileEvents::increment(ProfileEvents::CannotRemoveEphemeralNode);
313 DB::tryLogCurrentException(__PRETTY_FUNCTION__);
314 }
315 }
316
317private:
318 std::string path;
319 ZooKeeper & zookeeper;
320 CurrentMetrics::Increment metric_increment{CurrentMetrics::EphemeralNode};
321};
322
/// Namespace-level alias for EphemeralNodeHolder::Ptr (std::shared_ptr<EphemeralNodeHolder>).
using EphemeralNodeHolderPtr = EphemeralNodeHolder::Ptr;
324}
325