1#pragma once
2
3#include <Core/Types.h>
4#include <Common/Exception.h>
5
6#include <vector>
7#include <memory>
8#include <cstdint>
9#include <functional>
10
11/** Generic interface for ZooKeeper-like services.
12 * Possible examples are:
13 * - ZooKeeper client itself;
14 * - fake ZooKeeper client for testing;
15 * - ZooKeeper emulation layer on top of Etcd, FoundationDB, whatever.
16 */
17
18
19namespace Coordination
20{
21
22using namespace DB;
23
24
25struct ACL
26{
27 static constexpr int32_t Read = 1;
28 static constexpr int32_t Write = 2;
29 static constexpr int32_t Create = 4;
30 static constexpr int32_t Delete = 8;
31 static constexpr int32_t Admin = 16;
32 static constexpr int32_t All = 0x1F;
33
34 int32_t permissions;
35 String scheme;
36 String id;
37};
38
39using ACLs = std::vector<ACL>;
40
/// Node metadata block carried by several responses (see the `stat` members below).
/// NOTE(review): field names follow ZooKeeper's Stat structure; the per-field semantics are
/// defined by the protocol, not by this header.
///
/// Every field is zero-initialized, so a default-constructed Stat (for example inside a freshly
/// created response object) never exposes indeterminate values. The struct remains an aggregate
/// in C++14 and later, so positional brace initialization keeps working.
struct Stat
{
    int64_t czxid = 0;
    int64_t mzxid = 0;
    int64_t ctime = 0;
    int64_t mtime = 0;
    int32_t version = 0;
    int32_t cversion = 0;
    int32_t aversion = 0;
    int64_t ephemeralOwner = 0;
    int32_t dataLength = 0;
    int32_t numChildren = 0;
    int64_t pzxid = 0;
};
55
56struct Request;
57using RequestPtr = std::shared_ptr<Request>;
58using Requests = std::vector<RequestPtr>;
59
60struct Request
61{
62 Request() = default;
63 Request(const Request &) = default;
64 Request & operator=(const Request &) = default;
65 virtual ~Request() = default;
66 virtual String getPath() const = 0;
67 virtual void addRootPath(const String & /* root_path */) {}
68};
69
70struct Response;
71using ResponsePtr = std::shared_ptr<Response>;
72using Responses = std::vector<ResponsePtr>;
73using ResponseCallback = std::function<void(const Response &)>;
74
75struct Response
76{
77 int32_t error = 0;
78 Response() = default;
79 Response(const Response &) = default;
80 Response & operator=(const Response &) = default;
81 virtual ~Response() = default;
82 virtual void removeRootPath(const String & /* root_path */) {}
83};
84
85struct WatchResponse : virtual Response
86{
87 int32_t type = 0;
88 int32_t state = 0;
89 String path;
90
91 void removeRootPath(const String & root_path) override;
92};
93
94using WatchCallback = std::function<void(const WatchResponse &)>;
95
96struct CreateRequest : virtual Request
97{
98 String path;
99 String data;
100 bool is_ephemeral = false;
101 bool is_sequential = false;
102 ACLs acls;
103
104 void addRootPath(const String & root_path) override;
105 String getPath() const override { return path; }
106};
107
108struct CreateResponse : virtual Response
109{
110 String path_created;
111
112 void removeRootPath(const String & root_path) override;
113};
114
115struct RemoveRequest : virtual Request
116{
117 String path;
118 int32_t version = -1;
119
120 void addRootPath(const String & root_path) override;
121 String getPath() const override { return path; }
122};
123
124struct RemoveResponse : virtual Response
125{
126};
127
128struct ExistsRequest : virtual Request
129{
130 String path;
131
132 void addRootPath(const String & root_path) override;
133 String getPath() const override { return path; }
134};
135
136struct ExistsResponse : virtual Response
137{
138 Stat stat;
139};
140
141struct GetRequest : virtual Request
142{
143 String path;
144
145 void addRootPath(const String & root_path) override;
146 String getPath() const override { return path; }
147};
148
149struct GetResponse : virtual Response
150{
151 String data;
152 Stat stat;
153};
154
155struct SetRequest : virtual Request
156{
157 String path;
158 String data;
159 int32_t version = -1;
160
161 void addRootPath(const String & root_path) override;
162 String getPath() const override { return path; }
163};
164
165struct SetResponse : virtual Response
166{
167 Stat stat;
168};
169
170struct ListRequest : virtual Request
171{
172 String path;
173
174 void addRootPath(const String & root_path) override;
175 String getPath() const override { return path; }
176};
177
178struct ListResponse : virtual Response
179{
180 std::vector<String> names;
181 Stat stat;
182};
183
184struct CheckRequest : virtual Request
185{
186 String path;
187 int32_t version = -1;
188
189 void addRootPath(const String & root_path) override;
190 String getPath() const override { return path; }
191};
192
193struct CheckResponse : virtual Response
194{
195};
196
197struct MultiRequest : virtual Request
198{
199 Requests requests;
200
201 void addRootPath(const String & root_path) override;
202 String getPath() const override { return {}; }
203};
204
205struct MultiResponse : virtual Response
206{
207 Responses responses;
208
209 void removeRootPath(const String & root_path) override;
210};
211
212/// This response may be received only as an element of responses in MultiResponse.
213struct ErrorResponse : virtual Response
214{
215};
216
217
218using CreateCallback = std::function<void(const CreateResponse &)>;
219using RemoveCallback = std::function<void(const RemoveResponse &)>;
220using ExistsCallback = std::function<void(const ExistsResponse &)>;
221using GetCallback = std::function<void(const GetResponse &)>;
222using SetCallback = std::function<void(const SetResponse &)>;
223using ListCallback = std::function<void(const ListResponse &)>;
224using CheckCallback = std::function<void(const CheckResponse &)>;
225using MultiCallback = std::function<void(const MultiResponse &)>;
226
227
/// Error codes. Zero means success; every other enumerator is negative.
enum Error
{
    ZOK = 0,

    /** Marker that opens the range of system and server-side errors.
      * It is never thrown by the server and should only be used to denote the range:
      * the codes listed between this marker and ZAPIERROR (-2 ... -9) are system errors.
      */
    ZSYSTEMERROR = -1,

    ZRUNTIMEINCONSISTENCY = -2,         /// A runtime inconsistency was found
    ZDATAINCONSISTENCY = -3,            /// A data inconsistency was found
    ZCONNECTIONLOSS = -4,               /// Connection to the server has been lost
    ZMARSHALLINGERROR = -5,             /// Error while marshalling or unmarshalling data
    ZUNIMPLEMENTED = -6,                /// Operation is unimplemented
    ZOPERATIONTIMEOUT = -7,             /// Operation timeout
    ZBADARGUMENTS = -8,                 /// Invalid arguments
    ZINVALIDSTATE = -9,                 /// Invalid zhandle state

    /** Marker that opens the range of API errors.
      * It is never thrown by the server and should only be used to denote the range:
      * the codes listed after this marker (-101 ... -118) are API errors.
      */
    ZAPIERROR = -100,

    ZNONODE = -101,                     /// Node does not exist
    ZNOAUTH = -102,                     /// Not authenticated
    ZBADVERSION = -103,                 /// Version conflict
    ZNOCHILDRENFOREPHEMERALS = -108,    /// Ephemeral nodes may not have children
    ZNODEEXISTS = -110,                 /// The node already exists
    ZNOTEMPTY = -111,                   /// The node has children
    ZSESSIONEXPIRED = -112,             /// The session has been expired by the server
    ZINVALIDCALLBACK = -113,            /// Invalid callback specified
    ZINVALIDACL = -114,                 /// Invalid ACL specified
    ZAUTHFAILED = -115,                 /// Client authentication failed
    ZCLOSING = -116,                    /// ZooKeeper is closing
    ZNOTHING = -117,                    /// (not error) no server responses to process
    ZSESSIONMOVED = -118,               /// Session moved to another server, so operation is ignored
};
269
/// Tells whether `code` denotes a network error or a similar failure.
/// After such an error the ZooKeeper session should be reinitialized.
bool isHardwareError(int32_t code);
272
/// Tells whether `code` is a valid error reported by the server about the database state (like "no node").
/// Logical and authentication errors (like "bad arguments") do not belong to this category.
bool isUserError(int32_t code);
275
/// Returns a C string for the given error code.
/// NOTE(review): presumably a human-readable description with static lifetime - confirm against the definition.
const char * errorMessage(int32_t code);
277
/// Session states, used for watches.
/// EXPIRED_SESSION has the same numeric value as ZSESSIONEXPIRED in enum Error.
enum State
{
    EXPIRED_SESSION = -112,
    AUTH_FAILED = -113,
    CONNECTING = 1,
    ASSOCIATING = 2,
    CONNECTED = 3,
    NOTCONNECTED = 999,
};
288
/// Event kinds. Positive values are CREATED / DELETED / CHANGED / CHILD;
/// SESSION and NOTWATCHING use negative values.
enum Event
{
    CREATED = 1,
    DELETED = 2,
    CHANGED = 3,
    CHILD = 4,
    SESSION = -1,
    NOTWATCHING = -2,
};
298
299
300class Exception : public DB::Exception
301{
302private:
303 /// Delegate constructor, used to minimize repetition; last parameter used for overload resolution.
304 Exception(const std::string & msg, const int32_t code_, int);
305
306public:
307 explicit Exception(const int32_t code_);
308 Exception(const std::string & msg, const int32_t code_);
309 Exception(const int32_t code_, const std::string & path);
310 Exception(const Exception & exc);
311
312 const char * name() const throw() override { return "Coordination::Exception"; }
313 const char * className() const throw() override { return "Coordination::Exception"; }
314 Exception * clone() const override { return new Exception(*this); }
315
316 const int32_t code;
317};
318
319
320/** Usage scenario:
321 * - create an object and issue commands;
322 * - you provide callbacks for your commands; callbacks are invoked in internal thread and must be cheap:
323 * for example, just signal a condvar / fulfull a promise.
324 * - you also may provide callbacks for watches; they are also invoked in internal thread and must be cheap.
325 * - whenever you receive exception with ZSESSIONEXPIRED code or method isExpired returns true,
326 * the ZooKeeper instance is no longer usable - you may only destroy it and probably create another.
327 * - whenever session is expired or ZooKeeper instance is destroying, all callbacks are notified with special event.
328 * - data for callbacks must be alive when ZooKeeper instance is alive.
329 */
330class IKeeper
331{
332public:
333 virtual ~IKeeper() {}
334
335 /// If expired, you can only destroy the object. All other methods will throw exception.
336 virtual bool isExpired() const = 0;
337
338 /// Useful to check owner of ephemeral node.
339 virtual int64_t getSessionID() const = 0;
340
341 /// If the method will throw an exception, callbacks won't be called.
342 ///
343 /// After the method is executed successfully, you must wait for callbacks
344 /// (don't destroy callback data before it will be called).
345 ///
346 /// All callbacks are executed sequentially (the execution of callbacks is serialized).
347 ///
348 /// If an exception is thrown inside the callback, the session will expire,
349 /// and all other callbacks will be called with "Session expired" error.
350
351 virtual void create(
352 const String & path,
353 const String & data,
354 bool is_ephemeral,
355 bool is_sequential,
356 const ACLs & acls,
357 CreateCallback callback) = 0;
358
359 virtual void remove(
360 const String & path,
361 int32_t version,
362 RemoveCallback callback) = 0;
363
364 virtual void exists(
365 const String & path,
366 ExistsCallback callback,
367 WatchCallback watch) = 0;
368
369 virtual void get(
370 const String & path,
371 GetCallback callback,
372 WatchCallback watch) = 0;
373
374 virtual void set(
375 const String & path,
376 const String & data,
377 int32_t version,
378 SetCallback callback) = 0;
379
380 virtual void list(
381 const String & path,
382 ListCallback callback,
383 WatchCallback watch) = 0;
384
385 virtual void check(
386 const String & path,
387 int32_t version,
388 CheckCallback callback) = 0;
389
390 virtual void multi(
391 const Requests & requests,
392 MultiCallback callback) = 0;
393};
394
395}
396