1#include <Common/ZooKeeper/ZooKeeperImpl.h>
2#include <Common/Exception.h>
3#include <Common/ProfileEvents.h>
4#include <Common/setThreadName.h>
5
6#include <IO/WriteHelpers.h>
7#include <IO/ReadHelpers.h>
8#include <IO/Operators.h>
9#include <IO/WriteBufferFromString.h>
10
11#include <Poco/Exception.h>
12#include <Poco/Net/NetException.h>
13
14#include <array>
15
16
/// ZooKeeper has a 1 MB node size and serialization limit by default,
/// but it can be raised, so we use a much larger limit on our side.
19#define MAX_STRING_OR_ARRAY_SIZE (1 << 28) /// 256 MiB
20
21
22namespace ProfileEvents
23{
24 extern const Event ZooKeeperInit;
25 extern const Event ZooKeeperTransactions;
26 extern const Event ZooKeeperCreate;
27 extern const Event ZooKeeperRemove;
28 extern const Event ZooKeeperExists;
29 extern const Event ZooKeeperMulti;
30 extern const Event ZooKeeperGet;
31 extern const Event ZooKeeperSet;
32 extern const Event ZooKeeperList;
33 extern const Event ZooKeeperCheck;
34 extern const Event ZooKeeperClose;
35 extern const Event ZooKeeperWaitMicroseconds;
36 extern const Event ZooKeeperBytesSent;
37 extern const Event ZooKeeperBytesReceived;
38 extern const Event ZooKeeperWatchResponse;
39}
40
41namespace CurrentMetrics
42{
43 extern const Metric ZooKeeperRequest;
44 extern const Metric ZooKeeperWatch;
45}
46
47
48/** ZooKeeper wire protocol.
49
50Debugging example:
51strace -t -f -e trace=network -s1000 -x ./clickhouse-zookeeper-cli localhost:2181
52
53All numbers are in network byte order (big endian). Sizes are 32 bit. Numbers are signed.
54
55zxid - incremental transaction number at server side.
56xid - unique request number at client side.
57
58Client connects to one of the specified hosts.
59Client sends:
60
61int32_t sizeof_connect_req; \x00\x00\x00\x2c (44 bytes)
62
63struct connect_req
64{
65 int32_t protocolVersion; \x00\x00\x00\x00 (Currently zero)
66 int64_t lastZxidSeen; \x00\x00\x00\x00\x00\x00\x00\x00 (Zero at first connect)
67 int32_t timeOut; \x00\x00\x75\x30 (Session timeout in milliseconds: 30000)
68 int64_t sessionId; \x00\x00\x00\x00\x00\x00\x00\x00 (Zero at first connect)
69 int32_t passwd_len; \x00\x00\x00\x10 (16)
70 char passwd[16]; \x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00 (Zero at first connect)
71};
72
73Server replies:
74
75struct prime_struct
76{
77 int32_t len; \x00\x00\x00\x24 (36 bytes)
78 int32_t protocolVersion; \x00\x00\x00\x00
79 int32_t timeOut; \x00\x00\x75\x30
80 int64_t sessionId; \x01\x62\x2c\x3d\x82\x43\x00\x27
81 int32_t passwd_len; \x00\x00\x00\x10
82 char passwd[16]; \x3b\x8c\xe0\xd4\x1f\x34\xbc\x88\x9c\xa7\x68\x69\x78\x64\x98\xe9
83};
84
85Client remembers session id and session password.
86
87
88Client may send authentication request (optional).
89
90
Every one third of the session timeout, the client sends a heartbeat:
92
93int32_t length_of_heartbeat_request \x00\x00\x00\x08 (8)
94int32_t ping_xid \xff\xff\xff\xfe (-2, constant)
95int32_t ping_op \x00\x00\x00\x0b ZOO_PING_OP 11
96
97Server replies:
98
99int32_t length_of_heartbeat_response \x00\x00\x00\x10
100int32_t ping_xid \xff\xff\xff\xfe
101int64 zxid \x00\x00\x00\x00\x00\x01\x87\x98 (incremental server generated number)
102int32_t err \x00\x00\x00\x00
103
104
105Client sends requests. For example, create persistent node '/hello' with value 'world'.
106
107int32_t request_length \x00\x00\x00\x3a
int32_t xid \x5a\xad\x72\x3f Arbitrary number. Used for identification of requests/responses.
 libzookeeper uses the unix timestamp as the first xid and then increments it for every subsequent request.
110int32_t op_num \x00\x00\x00\x01 ZOO_CREATE_OP 1
111int32_t path_length \x00\x00\x00\x06
112path \x2f\x68\x65\x6c\x6c\x6f /hello
113int32_t data_length \x00\x00\x00\x05
114data \x77\x6f\x72\x6c\x64 world
115ACLs:
116 int32_t num_acls \x00\x00\x00\x01
117 ACL:
118 int32_t permissions \x00\x00\x00\x1f
119 string scheme \x00\x00\x00\x05
120 \x77\x6f\x72\x6c\x64 world
121 string id \x00\x00\x00\x06
122 \x61\x6e\x79\x6f\x6e\x65 anyone
123int32_t flags \x00\x00\x00\x00
124
125Server replies:
126
127int32_t response_length \x00\x00\x00\x1a
128int32_t xid \x5a\xad\x72\x3f
129int64 zxid \x00\x00\x00\x00\x00\x01\x87\x99
130int32_t err \x00\x00\x00\x00
131string path_created \x00\x00\x00\x06
 \x2f\x68\x65\x6c\x6c\x6f /hello - may differ from the original path in case of sequential nodes.
133
134
135Client may place a watch in their request.
136
137For example, client sends "exists" request with watch:
138
139request length \x00\x00\x00\x12
140xid \x5a\xae\xb2\x0d
141op_num \x00\x00\x00\x03
142path \x00\x00\x00\x05
143 \x2f\x74\x65\x73\x74 /test
144bool watch \x01
145
146Server will send response as usual.
147And later, server may send special watch event.
148
149struct WatcherEvent
150{
151 int32_t type;
152 int32_t state;
153 char * path;
154};
155
156response length \x00\x00\x00\x21
157special watch xid \xff\xff\xff\xff
158special watch zxid \xff\xff\xff\xff\xff\xff\xff\xff
159err \x00\x00\x00\x00
160type \x00\x00\x00\x02 DELETED_EVENT_DEF 2
161state \x00\x00\x00\x03 CONNECTED_STATE_DEF 3
162path \x00\x00\x00\x05
163 \x2f\x74\x65\x73\x74 /test
164
165
166Example of multi request:
167
168request length \x00\x00\x00\x82 130
169xid \x5a\xae\xd6\x16
170op_num \x00\x00\x00\x0e 14
171
172for every command:
173
174 int32_t type; \x00\x00\x00\x01 create
175 bool done; \x00 false
176 int32_t err; \xff\xff\xff\xff -1
177
178 path \x00\x00\x00\x05
179 \x2f\x74\x65\x73\x74 /test
180 data \x00\x00\x00\x06
181 \x6d\x75\x6c\x74\x69\x31 multi1
182 acl \x00\x00\x00\x01
183 \x00\x00\x00\x1f
184 \x00\x00\x00\x05
185 \x77\x6f\x72\x6c\x64 world
186 \x00\x00\x00\x06
187 \x61\x6e\x79\x6f\x6e\x65 anyone
188 flags \x00\x00\x00\x00
189
190 int32_t type; \x00\x00\x00\x05 set
191 bool done \x00 false
192 int32_t err; \xff\xff\xff\xff -1
193
194 path \x00\x00\x00\x05
195 \x2f\x74\x65\x73\x74
196 data \x00\x00\x00\x06
197 \x6d\x75\x6c\x74\x69\x32 multi2
198 version \xff\xff\xff\xff
199
200 int32_t type \x00\x00\x00\x02 remove
201 bool done \x00
202 int32_t err \xff\xff\xff\xff -1
203
204 path \x00\x00\x00\x05
205 \x2f\x74\x65\x73\x74
206 version \xff\xff\xff\xff
207
208after commands:
209
210 int32_t type \xff\xff\xff\xff -1
211 bool done \x01 true
212 int32_t err \xff\xff\xff\xff
213
214Example of multi response:
215
216response length \x00\x00\x00\x81 129
217xid \x5a\xae\xd6\x16
218zxid \x00\x00\x00\x00\x00\x01\x87\xe1
219err \x00\x00\x00\x00
220
221in a loop:
222
223 type \x00\x00\x00\x01 create
224 done \x00
225 err \x00\x00\x00\x00
226
227 path_created \x00\x00\x00\x05
228 \x2f\x74\x65\x73\x74
229
230 type \x00\x00\x00\x05 set
231 done \x00
232 err \x00\x00\x00\x00
233
234 stat \x00\x00\x00\x00\x00\x01\x87\xe1
235 \x00\x00\x00\x00\x00\x01\x87\xe1
236 \x00\x00\x01\x62\x3a\xf4\x35\x0c
237 \x00\x00\x01\x62\x3a\xf4\x35\x0c
238 \x00\x00\x00\x01
239 \x00\x00\x00\x00
240 \x00\x00\x00\x00
241 \x00\x00\x00\x00\x00\x00\x00\x00
242 \x00\x00\x00\x06
243 \x00\x00\x00\x00
244 \x00\x00\x00\x00\x00\x01\x87\xe1
245
246 type \x00\x00\x00\x02 remove
247 done \x00
248 err \x00\x00\x00\x00
249
250after:
251
252 type \xff\xff\xff\xff
253 done \x01
254 err \xff\xff\xff\xff
255
256 */
257
258
259namespace Coordination
260{
261
262using namespace DB;
263
264
/// Assuming we are on a little-endian host.
266
267static void write(int64_t x, WriteBuffer & out)
268{
269 x = __builtin_bswap64(x);
270 writeBinary(x, out);
271}
272
273static void write(int32_t x, WriteBuffer & out)
274{
275 x = __builtin_bswap32(x);
276 writeBinary(x, out);
277}
278
279static void write(bool x, WriteBuffer & out)
280{
281 writeBinary(x, out);
282}
283
284static void write(const String & s, WriteBuffer & out)
285{
286 write(int32_t(s.size()), out);
287 out.write(s.data(), s.size());
288}
289
290template <size_t N> void write(std::array<char, N> s, WriteBuffer & out)
291{
292 write(int32_t(N), out);
293 out.write(s.data(), N);
294}
295
296template <typename T> void write(const std::vector<T> & arr, WriteBuffer & out)
297{
298 write(int32_t(arr.size()), out);
299 for (const auto & elem : arr)
300 write(elem, out);
301}
302
303static void write(const ACL & acl, WriteBuffer & out)
304{
305 write(acl.permissions, out);
306 write(acl.scheme, out);
307 write(acl.id, out);
308}
309
310
311static void read(int64_t & x, ReadBuffer & in)
312{
313 readBinary(x, in);
314 x = __builtin_bswap64(x);
315}
316
317static void read(int32_t & x, ReadBuffer & in)
318{
319 readBinary(x, in);
320 x = __builtin_bswap32(x);
321}
322
323static void read(bool & x, ReadBuffer & in)
324{
325 readBinary(x, in);
326}
327
328static void read(String & s, ReadBuffer & in)
329{
330 int32_t size = 0;
331 read(size, in);
332
333 if (size == -1)
334 {
335 /// It means that zookeeper node has NULL value. We will treat it like empty string.
336 s.clear();
337 return;
338 }
339
340 if (size < 0)
341 throw Exception("Negative size while reading string from ZooKeeper", ZMARSHALLINGERROR);
342
343 if (size > MAX_STRING_OR_ARRAY_SIZE)
344 throw Exception("Too large string size while reading from ZooKeeper", ZMARSHALLINGERROR);
345
346 s.resize(size);
347 in.read(s.data(), size);
348}
349
350template <size_t N> void read(std::array<char, N> & s, ReadBuffer & in)
351{
352 int32_t size = 0;
353 read(size, in);
354 if (size != N)
355 throw Exception("Unexpected array size while reading from ZooKeeper", ZMARSHALLINGERROR);
356 in.read(s.data(), N);
357}
358
359static void read(Stat & stat, ReadBuffer & in)
360{
361 read(stat.czxid, in);
362 read(stat.mzxid, in);
363 read(stat.ctime, in);
364 read(stat.mtime, in);
365 read(stat.version, in);
366 read(stat.cversion, in);
367 read(stat.aversion, in);
368 read(stat.ephemeralOwner, in);
369 read(stat.dataLength, in);
370 read(stat.numChildren, in);
371 read(stat.pzxid, in);
372}
373
374template <typename T> void read(std::vector<T> & arr, ReadBuffer & in)
375{
376 int32_t size = 0;
377 read(size, in);
378 if (size < 0)
379 throw Exception("Negative size while reading array from ZooKeeper", ZMARSHALLINGERROR);
380 if (size > MAX_STRING_OR_ARRAY_SIZE)
381 throw Exception("Too large array size while reading from ZooKeeper", ZMARSHALLINGERROR);
382 arr.resize(size);
383 for (auto & elem : arr)
384 read(elem, in);
385}
386
387
388template <typename T>
389void ZooKeeper::write(const T & x)
390{
391 Coordination::write(x, *out);
392}
393
394template <typename T>
395void ZooKeeper::read(T & x)
396{
397 Coordination::read(x, *in);
398}
399
400
401void ZooKeeperRequest::write(WriteBuffer & out) const
402{
403 /// Excessive copy to calculate length.
404 WriteBufferFromOwnString buf;
405 Coordination::write(xid, buf);
406 Coordination::write(getOpNum(), buf);
407 writeImpl(buf);
408 Coordination::write(buf.str(), out);
409 out.next();
410}
411
412
413struct ZooKeeperResponse : virtual Response
414{
415 virtual ~ZooKeeperResponse() {}
416 virtual void readImpl(ReadBuffer &) = 0;
417};
418
419
420struct ZooKeeperHeartbeatRequest final : ZooKeeperRequest
421{
422 String getPath() const override { return {}; }
423 ZooKeeper::OpNum getOpNum() const override { return 11; }
424 void writeImpl(WriteBuffer &) const override {}
425 ZooKeeperResponsePtr makeResponse() const override;
426};
427
428struct ZooKeeperHeartbeatResponse final : ZooKeeperResponse
429{
430 void readImpl(ReadBuffer &) override {}
431};
432
433struct ZooKeeperWatchResponse final : WatchResponse, ZooKeeperResponse
434{
435 void readImpl(ReadBuffer & in) override
436 {
437 Coordination::read(type, in);
438 Coordination::read(state, in);
439 Coordination::read(path, in);
440 }
441};
442
443struct ZooKeeperAuthRequest final : ZooKeeperRequest
444{
445 int32_t type = 0; /// ignored by the server
446 String scheme;
447 String data;
448
449 String getPath() const override { return {}; }
450 ZooKeeper::OpNum getOpNum() const override { return 100; }
451 void writeImpl(WriteBuffer & out) const override
452 {
453 Coordination::write(type, out);
454 Coordination::write(scheme, out);
455 Coordination::write(data, out);
456 }
457 ZooKeeperResponsePtr makeResponse() const override;
458};
459
460struct ZooKeeperAuthResponse final : ZooKeeperResponse
461{
462 void readImpl(ReadBuffer &) override {}
463};
464
465struct ZooKeeperCloseRequest final : ZooKeeperRequest
466{
467 String getPath() const override { return {}; }
468 ZooKeeper::OpNum getOpNum() const override { return -11; }
469 void writeImpl(WriteBuffer &) const override {}
470 ZooKeeperResponsePtr makeResponse() const override;
471};
472
473struct ZooKeeperCloseResponse final : ZooKeeperResponse
474{
475 void readImpl(ReadBuffer &) override
476 {
477 throw Exception("Received response for close request", ZRUNTIMEINCONSISTENCY);
478 }
479};
480
481struct ZooKeeperCreateRequest final : CreateRequest, ZooKeeperRequest
482{
483 ZooKeeperCreateRequest() {}
484 ZooKeeperCreateRequest(const CreateRequest & base) : CreateRequest(base) {}
485
486 ZooKeeper::OpNum getOpNum() const override { return 1; }
487 void writeImpl(WriteBuffer & out) const override
488 {
489 Coordination::write(path, out);
490 Coordination::write(data, out);
491 Coordination::write(acls, out);
492
493 int32_t flags = 0;
494
495 if (is_ephemeral)
496 flags |= 1;
497 if (is_sequential)
498 flags |= 2;
499
500 Coordination::write(flags, out);
501 }
502 ZooKeeperResponsePtr makeResponse() const override;
503};
504
505struct ZooKeeperCreateResponse final : CreateResponse, ZooKeeperResponse
506{
507 void readImpl(ReadBuffer & in) override
508 {
509 Coordination::read(path_created, in);
510 }
511};
512
513struct ZooKeeperRemoveRequest final : RemoveRequest, ZooKeeperRequest
514{
515 ZooKeeperRemoveRequest() {}
516 ZooKeeperRemoveRequest(const RemoveRequest & base) : RemoveRequest(base) {}
517
518 ZooKeeper::OpNum getOpNum() const override { return 2; }
519 void writeImpl(WriteBuffer & out) const override
520 {
521 Coordination::write(path, out);
522 Coordination::write(version, out);
523 }
524 ZooKeeperResponsePtr makeResponse() const override;
525};
526
527struct ZooKeeperRemoveResponse final : RemoveResponse, ZooKeeperResponse
528{
529 void readImpl(ReadBuffer &) override {}
530};
531
532struct ZooKeeperExistsRequest final : ExistsRequest, ZooKeeperRequest
533{
534 ZooKeeper::OpNum getOpNum() const override { return 3; }
535 void writeImpl(WriteBuffer & out) const override
536 {
537 Coordination::write(path, out);
538 Coordination::write(has_watch, out);
539 }
540 ZooKeeperResponsePtr makeResponse() const override;
541};
542
543struct ZooKeeperExistsResponse final : ExistsResponse, ZooKeeperResponse
544{
545 void readImpl(ReadBuffer & in) override
546 {
547 Coordination::read(stat, in);
548 }
549};
550
551struct ZooKeeperGetRequest final : GetRequest, ZooKeeperRequest
552{
553 ZooKeeper::OpNum getOpNum() const override { return 4; }
554 void writeImpl(WriteBuffer & out) const override
555 {
556 Coordination::write(path, out);
557 Coordination::write(has_watch, out);
558 }
559 ZooKeeperResponsePtr makeResponse() const override;
560};
561
562struct ZooKeeperGetResponse final : GetResponse, ZooKeeperResponse
563{
564 void readImpl(ReadBuffer & in) override
565 {
566 Coordination::read(data, in);
567 Coordination::read(stat, in);
568 }
569};
570
571struct ZooKeeperSetRequest final : SetRequest, ZooKeeperRequest
572{
573 ZooKeeperSetRequest() {}
574 ZooKeeperSetRequest(const SetRequest & base) : SetRequest(base) {}
575
576 ZooKeeper::OpNum getOpNum() const override { return 5; }
577 void writeImpl(WriteBuffer & out) const override
578 {
579 Coordination::write(path, out);
580 Coordination::write(data, out);
581 Coordination::write(version, out);
582 }
583 ZooKeeperResponsePtr makeResponse() const override;
584};
585
586struct ZooKeeperSetResponse final : SetResponse, ZooKeeperResponse
587{
588 void readImpl(ReadBuffer & in) override
589 {
590 Coordination::read(stat, in);
591 }
592};
593
594struct ZooKeeperListRequest final : ListRequest, ZooKeeperRequest
595{
596 ZooKeeper::OpNum getOpNum() const override { return 12; }
597 void writeImpl(WriteBuffer & out) const override
598 {
599 Coordination::write(path, out);
600 Coordination::write(has_watch, out);
601 }
602 ZooKeeperResponsePtr makeResponse() const override;
603};
604
605struct ZooKeeperListResponse final : ListResponse, ZooKeeperResponse
606{
607 void readImpl(ReadBuffer & in) override
608 {
609 Coordination::read(names, in);
610 Coordination::read(stat, in);
611 }
612};
613
614struct ZooKeeperCheckRequest final : CheckRequest, ZooKeeperRequest
615{
616 ZooKeeperCheckRequest() {}
617 ZooKeeperCheckRequest(const CheckRequest & base) : CheckRequest(base) {}
618
619 ZooKeeper::OpNum getOpNum() const override { return 13; }
620 void writeImpl(WriteBuffer & out) const override
621 {
622 Coordination::write(path, out);
623 Coordination::write(version, out);
624 }
625 ZooKeeperResponsePtr makeResponse() const override;
626};
627
628struct ZooKeeperCheckResponse final : CheckResponse, ZooKeeperResponse
629{
630 void readImpl(ReadBuffer &) override {}
631};
632
633/// This response may be received only as an element of responses in MultiResponse.
634struct ZooKeeperErrorResponse final : ErrorResponse, ZooKeeperResponse
635{
636 void readImpl(ReadBuffer & in) override
637 {
638 int32_t read_error;
639 Coordination::read(read_error, in);
640
641 if (read_error != error)
642 throw Exception("Error code in ErrorResponse (" + toString(read_error) + ") doesn't match error code in header (" + toString(error) + ")",
643 ZMARSHALLINGERROR);
644 }
645};
646
647struct ZooKeeperMultiRequest final : MultiRequest, ZooKeeperRequest
648{
649 ZooKeeper::OpNum getOpNum() const override { return 14; }
650
651 ZooKeeperMultiRequest(const Requests & generic_requests, const ACLs & default_acls)
652 {
653 /// Convert nested Requests to ZooKeeperRequests.
654 /// Note that deep copy is required to avoid modifying path in presence of chroot prefix.
655 requests.reserve(generic_requests.size());
656
657 for (const auto & generic_request : generic_requests)
658 {
659 if (auto * concrete_request_create = dynamic_cast<const CreateRequest *>(generic_request.get()))
660 {
661 auto create = std::make_shared<ZooKeeperCreateRequest>(*concrete_request_create);
662 if (create->acls.empty())
663 create->acls = default_acls;
664 requests.push_back(create);
665 }
666 else if (auto * concrete_request_remove = dynamic_cast<const RemoveRequest *>(generic_request.get()))
667 {
668 requests.push_back(std::make_shared<ZooKeeperRemoveRequest>(*concrete_request_remove));
669 }
670 else if (auto * concrete_request_set = dynamic_cast<const SetRequest *>(generic_request.get()))
671 {
672 requests.push_back(std::make_shared<ZooKeeperSetRequest>(*concrete_request_set));
673 }
674 else if (auto * concrete_request_check = dynamic_cast<const CheckRequest *>(generic_request.get()))
675 {
676 requests.push_back(std::make_shared<ZooKeeperCheckRequest>(*concrete_request_check));
677 }
678 else
679 throw Exception("Illegal command as part of multi ZooKeeper request", ZBADARGUMENTS);
680 }
681 }
682
683 void writeImpl(WriteBuffer & out) const override
684 {
685 for (const auto & request : requests)
686 {
687 const auto & zk_request = dynamic_cast<const ZooKeeperRequest &>(*request);
688
689 bool done = false;
690 int32_t error = -1;
691
692 Coordination::write(zk_request.getOpNum(), out);
693 Coordination::write(done, out);
694 Coordination::write(error, out);
695
696 zk_request.writeImpl(out);
697 }
698
699 ZooKeeper::OpNum op_num = -1;
700 bool done = true;
701 int32_t error = -1;
702
703 Coordination::write(op_num, out);
704 Coordination::write(done, out);
705 Coordination::write(error, out);
706 }
707
708 ZooKeeperResponsePtr makeResponse() const override;
709};
710
711struct ZooKeeperMultiResponse final : MultiResponse, ZooKeeperResponse
712{
713 ZooKeeperMultiResponse(const Requests & requests)
714 {
715 responses.reserve(requests.size());
716
717 for (const auto & request : requests)
718 responses.emplace_back(dynamic_cast<const ZooKeeperRequest &>(*request).makeResponse());
719 }
720
721 void readImpl(ReadBuffer & in) override
722 {
723 for (auto & response : responses)
724 {
725 ZooKeeper::OpNum op_num;
726 bool done;
727 int32_t op_error;
728
729 Coordination::read(op_num, in);
730 Coordination::read(done, in);
731 Coordination::read(op_error, in);
732
733 if (done)
734 throw Exception("Not enough results received for multi transaction", ZMARSHALLINGERROR);
735
736 /// op_num == -1 is special for multi transaction.
737 /// For unknown reason, error code is duplicated in header and in response body.
738
739 if (op_num == -1)
740 response = std::make_shared<ZooKeeperErrorResponse>();
741
742 if (op_error)
743 {
744 response->error = op_error;
745
746 /// Set error for whole transaction.
747 /// If some operations fail, ZK send global error as zero and then send details about each operation.
748 /// It will set error code for first failed operation and it will set special "runtime inconsistency" code for other operations.
749 if (!error && op_error != ZRUNTIMEINCONSISTENCY)
750 error = op_error;
751 }
752
753 if (!op_error || op_num == -1)
754 dynamic_cast<ZooKeeperResponse &>(*response).readImpl(in);
755 }
756
757 /// Footer.
758 {
759 ZooKeeper::OpNum op_num;
760 bool done;
761 int32_t error_;
762
763 Coordination::read(op_num, in);
764 Coordination::read(done, in);
765 Coordination::read(error_, in);
766
767 if (!done)
768 throw Exception("Too many results received for multi transaction", ZMARSHALLINGERROR);
769 if (op_num != -1)
770 throw Exception("Unexpected op_num received at the end of results for multi transaction", ZMARSHALLINGERROR);
771 if (error_ != -1)
772 throw Exception("Unexpected error value received at the end of results for multi transaction", ZMARSHALLINGERROR);
773 }
774 }
775};
776
777
778ZooKeeperResponsePtr ZooKeeperHeartbeatRequest::makeResponse() const { return std::make_shared<ZooKeeperHeartbeatResponse>(); }
779ZooKeeperResponsePtr ZooKeeperAuthRequest::makeResponse() const { return std::make_shared<ZooKeeperAuthResponse>(); }
780ZooKeeperResponsePtr ZooKeeperCreateRequest::makeResponse() const { return std::make_shared<ZooKeeperCreateResponse>(); }
781ZooKeeperResponsePtr ZooKeeperRemoveRequest::makeResponse() const { return std::make_shared<ZooKeeperRemoveResponse>(); }
782ZooKeeperResponsePtr ZooKeeperExistsRequest::makeResponse() const { return std::make_shared<ZooKeeperExistsResponse>(); }
783ZooKeeperResponsePtr ZooKeeperGetRequest::makeResponse() const { return std::make_shared<ZooKeeperGetResponse>(); }
784ZooKeeperResponsePtr ZooKeeperSetRequest::makeResponse() const { return std::make_shared<ZooKeeperSetResponse>(); }
785ZooKeeperResponsePtr ZooKeeperListRequest::makeResponse() const { return std::make_shared<ZooKeeperListResponse>(); }
786ZooKeeperResponsePtr ZooKeeperCheckRequest::makeResponse() const { return std::make_shared<ZooKeeperCheckResponse>(); }
787ZooKeeperResponsePtr ZooKeeperMultiRequest::makeResponse() const { return std::make_shared<ZooKeeperMultiResponse>(requests); }
788ZooKeeperResponsePtr ZooKeeperCloseRequest::makeResponse() const { return std::make_shared<ZooKeeperCloseResponse>(); }
789
790
791static constexpr int32_t protocol_version = 0;
792
793static constexpr ZooKeeper::XID watch_xid = -1;
794static constexpr ZooKeeper::XID ping_xid = -2;
795static constexpr ZooKeeper::XID auth_xid = -4;
796
797static constexpr ZooKeeper::XID close_xid = 0x7FFFFFFF;
798
799
800ZooKeeper::~ZooKeeper()
801{
802 try
803 {
804 finalize(false, false);
805
806 if (send_thread.joinable())
807 send_thread.join();
808
809 if (receive_thread.joinable())
810 receive_thread.join();
811 }
812 catch (...)
813 {
814 tryLogCurrentException(__PRETTY_FUNCTION__);
815 }
816}
817
818
819ZooKeeper::ZooKeeper(
820 const Addresses & addresses,
821 const String & root_path_,
822 const String & auth_scheme,
823 const String & auth_data,
824 Poco::Timespan session_timeout_,
825 Poco::Timespan connection_timeout,
826 Poco::Timespan operation_timeout_)
827 : root_path(root_path_),
828 session_timeout(session_timeout_),
829 operation_timeout(std::min(operation_timeout_, session_timeout_))
830{
831 if (!root_path.empty())
832 {
833 if (root_path.back() == '/')
834 root_path.pop_back();
835 }
836
837 if (auth_scheme.empty())
838 {
839 ACL acl;
840 acl.permissions = ACL::All;
841 acl.scheme = "world";
842 acl.id = "anyone";
843 default_acls.emplace_back(std::move(acl));
844 }
845 else
846 {
847 ACL acl;
848 acl.permissions = ACL::All;
849 acl.scheme = "auth";
850 acl.id = "";
851 default_acls.emplace_back(std::move(acl));
852 }
853
854 connect(addresses, connection_timeout);
855
856 if (!auth_scheme.empty())
857 sendAuth(auth_scheme, auth_data);
858
859 send_thread = ThreadFromGlobalPool([this] { sendThread(); });
860 receive_thread = ThreadFromGlobalPool([this] { receiveThread(); });
861
862 ProfileEvents::increment(ProfileEvents::ZooKeeperInit);
863}
864
865
866void ZooKeeper::connect(
867 const Addresses & addresses,
868 Poco::Timespan connection_timeout)
869{
870 if (addresses.empty())
871 throw Exception("No addresses passed to ZooKeeper constructor", ZBADARGUMENTS);
872
873 static constexpr size_t num_tries = 3;
874 bool connected = false;
875
876 WriteBufferFromOwnString fail_reasons;
877 for (size_t try_no = 0; try_no < num_tries; ++try_no)
878 {
879 for (const auto & address : addresses)
880 {
881 try
882 {
883 socket = Poco::Net::StreamSocket(); /// Reset the state of previous attempt.
884 socket.connect(address, connection_timeout);
885
886 socket.setReceiveTimeout(operation_timeout);
887 socket.setSendTimeout(operation_timeout);
888 socket.setNoDelay(true);
889
890 in.emplace(socket);
891 out.emplace(socket);
892
893 try
894 {
895 sendHandshake();
896 }
897 catch (DB::Exception & e)
898 {
899 e.addMessage("while sending handshake to ZooKeeper");
900 throw;
901 }
902
903 try
904 {
905 receiveHandshake();
906 }
907 catch (DB::Exception & e)
908 {
909 e.addMessage("while receiving handshake from ZooKeeper");
910 throw;
911 }
912
913 connected = true;
914 break;
915 }
916 catch (...)
917 {
918 fail_reasons << "\n" << getCurrentExceptionMessage(false) << ", " << address.toString();
919 }
920 }
921
922 if (connected)
923 break;
924 }
925
926 if (!connected)
927 {
928 WriteBufferFromOwnString message;
929 message << "All connection tries failed while connecting to ZooKeeper. Addresses: ";
930 bool first = true;
931 for (const auto & address : addresses)
932 {
933 if (first)
934 first = false;
935 else
936 message << ", ";
937 message << address.toString();
938 }
939
940 message << fail_reasons.str() << "\n";
941 throw Exception(message.str(), ZCONNECTIONLOSS);
942 }
943}
944
945
946void ZooKeeper::sendHandshake()
947{
948 int32_t handshake_length = 44;
949 int64_t last_zxid_seen = 0;
950 int32_t timeout = session_timeout.totalMilliseconds();
951 int64_t previous_session_id = 0; /// We don't support session restore. So previous session_id is always zero.
952 constexpr int32_t passwd_len = 16;
953 std::array<char, passwd_len> passwd {};
954
955 write(handshake_length);
956 write(protocol_version);
957 write(last_zxid_seen);
958 write(timeout);
959 write(previous_session_id);
960 write(passwd);
961
962 out->next();
963}
964
965
966void ZooKeeper::receiveHandshake()
967{
968 int32_t handshake_length;
969 int32_t protocol_version_read;
970 int32_t timeout;
971 constexpr int32_t passwd_len = 16;
972 std::array<char, passwd_len> passwd;
973
974 read(handshake_length);
975 if (handshake_length != 36)
976 throw Exception("Unexpected handshake length received: " + toString(handshake_length), ZMARSHALLINGERROR);
977
978 read(protocol_version_read);
979 if (protocol_version_read != protocol_version)
980 throw Exception("Unexpected protocol version: " + toString(protocol_version_read), ZMARSHALLINGERROR);
981
982 read(timeout);
983 if (timeout != session_timeout.totalMilliseconds())
984 /// Use timeout from server.
985 session_timeout = timeout * Poco::Timespan::MILLISECONDS;
986
987 read(session_id);
988 read(passwd);
989}
990
991
992void ZooKeeper::sendAuth(const String & scheme, const String & data)
993{
994 ZooKeeperAuthRequest request;
995 request.scheme = scheme;
996 request.data = data;
997 request.xid = auth_xid;
998 request.write(*out);
999
1000 int32_t length;
1001 XID read_xid;
1002 int64_t zxid;
1003 int32_t err;
1004
1005 read(length);
1006 size_t count_before_event = in->count();
1007 read(read_xid);
1008 read(zxid);
1009 read(err);
1010
1011 if (read_xid != auth_xid)
1012 throw Exception("Unexpected event received in reply to auth request: " + toString(read_xid),
1013 ZMARSHALLINGERROR);
1014
1015 int32_t actual_length = in->count() - count_before_event;
1016 if (length != actual_length)
1017 throw Exception("Response length doesn't match. Expected: " + toString(length) + ", actual: " + toString(actual_length),
1018 ZMARSHALLINGERROR);
1019
1020 if (err)
1021 throw Exception("Error received in reply to auth request. Code: " + toString(err) + ". Message: " + String(errorMessage(err)),
1022 ZMARSHALLINGERROR);
1023}
1024
1025
1026void ZooKeeper::sendThread()
1027{
1028 setThreadName("ZooKeeperSend");
1029
1030 auto prev_heartbeat_time = clock::now();
1031
1032 try
1033 {
1034 while (!expired)
1035 {
1036 auto prev_bytes_sent = out->count();
1037
1038 auto now = clock::now();
1039 auto next_heartbeat_time = prev_heartbeat_time + std::chrono::milliseconds(session_timeout.totalMilliseconds() / 3);
1040
1041 if (next_heartbeat_time > now)
1042 {
1043 /// Wait for the next request in queue. No more than operation timeout. No more than until next heartbeat time.
1044 UInt64 max_wait = std::min(
1045 UInt64(std::chrono::duration_cast<std::chrono::milliseconds>(next_heartbeat_time - now).count()),
1046 UInt64(operation_timeout.totalMilliseconds()));
1047
1048 RequestInfo info;
1049 if (requests_queue.tryPop(info, max_wait))
1050 {
1051 /// After we popped element from the queue, we must register callbacks (even in the case when expired == true right now),
1052 /// because they must not be lost (callbacks must be called because the user will wait for them).
1053
1054 if (info.request->xid != close_xid)
1055 {
1056 CurrentMetrics::add(CurrentMetrics::ZooKeeperRequest);
1057 std::lock_guard lock(operations_mutex);
1058 operations[info.request->xid] = info;
1059 }
1060
1061 if (info.watch)
1062 {
1063 info.request->has_watch = true;
1064 CurrentMetrics::add(CurrentMetrics::ZooKeeperWatch);
1065 std::lock_guard lock(watches_mutex);
1066 watches[info.request->getPath()].emplace_back(std::move(info.watch));
1067 }
1068
1069 if (expired)
1070 break;
1071
1072 info.request->addRootPath(root_path);
1073
1074 info.request->probably_sent = true;
1075 info.request->write(*out);
1076
1077 if (info.request->xid == close_xid)
1078 break;
1079 }
1080 }
1081 else
1082 {
1083 /// Send heartbeat.
1084 prev_heartbeat_time = clock::now();
1085
1086 ZooKeeperHeartbeatRequest request;
1087 request.xid = ping_xid;
1088 request.write(*out);
1089 }
1090
1091 ProfileEvents::increment(ProfileEvents::ZooKeeperBytesSent, out->count() - prev_bytes_sent);
1092 }
1093 }
1094 catch (...)
1095 {
1096 tryLogCurrentException(__PRETTY_FUNCTION__);
1097 finalize(true, false);
1098 }
1099}
1100
1101
1102void ZooKeeper::receiveThread()
1103{
1104 setThreadName("ZooKeeperRecv");
1105
1106 try
1107 {
1108 Int64 waited = 0;
1109 while (!expired)
1110 {
1111 auto prev_bytes_received = in->count();
1112
1113 clock::time_point now = clock::now();
1114 UInt64 max_wait = operation_timeout.totalMicroseconds();
1115 std::optional<RequestInfo> earliest_operation;
1116
1117 {
1118 std::lock_guard lock(operations_mutex);
1119 if (!operations.empty())
1120 {
1121 /// Operations are ordered by xid (and consequently, by time).
1122 earliest_operation = operations.begin()->second;
1123 auto earliest_operation_deadline = earliest_operation->time + std::chrono::microseconds(operation_timeout.totalMicroseconds());
1124 if (now > earliest_operation_deadline)
1125 throw Exception("Operation timeout (deadline already expired) for path: " + earliest_operation->request->getPath(), ZOPERATIONTIMEOUT);
1126 max_wait = std::chrono::duration_cast<std::chrono::microseconds>(earliest_operation_deadline - now).count();
1127 }
1128 }
1129
1130 if (in->poll(max_wait))
1131 {
1132 if (expired)
1133 break;
1134
1135 receiveEvent();
1136 waited = 0;
1137 }
1138 else
1139 {
1140 if (earliest_operation)
1141 throw Exception("Operation timeout (no response) for path: " + earliest_operation->request->getPath(), ZOPERATIONTIMEOUT);
1142 waited += max_wait;
1143 if (waited >= session_timeout.totalMicroseconds())
1144 throw Exception("Nothing is received in session timeout", ZOPERATIONTIMEOUT);
1145
1146 }
1147
1148 ProfileEvents::increment(ProfileEvents::ZooKeeperBytesReceived, in->count() - prev_bytes_received);
1149 }
1150 }
1151 catch (...)
1152 {
1153 tryLogCurrentException(__PRETTY_FUNCTION__);
1154 finalize(false, true);
1155 }
1156}
1157
1158
1159void ZooKeeper::receiveEvent()
1160{
1161 int32_t length;
1162 XID xid;
1163 int64_t zxid;
1164 int32_t err;
1165
1166 read(length);
1167 size_t count_before_event = in->count();
1168 read(xid);
1169 read(zxid);
1170 read(err);
1171
1172 RequestInfo request_info;
1173 ZooKeeperResponsePtr response;
1174
1175 if (xid == ping_xid)
1176 {
1177 if (err)
1178 throw Exception("Received error in heartbeat response: " + String(errorMessage(err)), ZRUNTIMEINCONSISTENCY);
1179
1180 response = std::make_shared<ZooKeeperHeartbeatResponse>();
1181 }
1182 else if (xid == watch_xid)
1183 {
1184 ProfileEvents::increment(ProfileEvents::ZooKeeperWatchResponse);
1185 response = std::make_shared<ZooKeeperWatchResponse>();
1186
1187 request_info.callback = [this](const Response & response_)
1188 {
1189 const WatchResponse & watch_response = dynamic_cast<const WatchResponse &>(response_);
1190
1191 std::lock_guard lock(watches_mutex);
1192
1193 auto it = watches.find(watch_response.path);
1194 if (it == watches.end())
1195 {
1196 /// This is Ok.
1197 /// Because watches are identified by path.
1198 /// And there may exist many watches for single path.
1199 /// And watch is added to the list of watches on client side
1200 /// slightly before than it is registered by the server.
1201 /// And that's why new watch may be already fired by old event,
1202 /// but then the server will actually register new watch
1203 /// and will send event again later.
1204 }
1205 else
1206 {
1207 for (auto & callback : it->second)
1208 if (callback)
1209 callback(watch_response); /// NOTE We may process callbacks not under mutex.
1210
1211 CurrentMetrics::sub(CurrentMetrics::ZooKeeperWatch, it->second.size());
1212 watches.erase(it);
1213 }
1214 };
1215 }
1216 else
1217 {
1218 {
1219 std::lock_guard lock(operations_mutex);
1220
1221 auto it = operations.find(xid);
1222 if (it == operations.end())
1223 throw Exception("Received response for unknown xid", ZRUNTIMEINCONSISTENCY);
1224
1225 /// After this point, we must invoke callback, that we've grabbed from 'operations'.
1226 /// Invariant: all callbacks are invoked either in case of success or in case of error.
1227 /// (all callbacks in 'operations' are guaranteed to be invoked)
1228
1229 request_info = std::move(it->second);
1230 operations.erase(it);
1231 CurrentMetrics::sub(CurrentMetrics::ZooKeeperRequest);
1232 }
1233
1234 auto elapsed_microseconds = std::chrono::duration_cast<std::chrono::microseconds>(clock::now() - request_info.time).count();
1235 ProfileEvents::increment(ProfileEvents::ZooKeeperWaitMicroseconds, elapsed_microseconds);
1236 }
1237
1238 try
1239 {
1240 if (!response)
1241 response = request_info.request->makeResponse();
1242
1243 if (err)
1244 response->error = err;
1245 else
1246 {
1247 response->readImpl(*in);
1248 response->removeRootPath(root_path);
1249 }
1250
1251 int32_t actual_length = in->count() - count_before_event;
1252 if (length != actual_length)
1253 throw Exception("Response length doesn't match. Expected: " + toString(length) + ", actual: " + toString(actual_length), ZMARSHALLINGERROR);
1254 }
1255 catch (...)
1256 {
1257 tryLogCurrentException(__PRETTY_FUNCTION__);
1258
1259 /// Unrecoverable. Don't leave incorrect state in memory.
1260 if (!response)
1261 std::terminate();
1262
1263 /// In case we cannot read the response, we should indicate it as the error of that type
1264 /// when the user cannot assume whether the request was processed or not.
1265 response->error = ZCONNECTIONLOSS;
1266 if (request_info.callback)
1267 request_info.callback(*response);
1268
1269 throw;
1270 }
1271
1272 /// Exception in callback will propagate to receiveThread and will lead to session expiration. This is Ok.
1273
1274 if (request_info.callback)
1275 request_info.callback(*response);
1276}
1277
1278
1279void ZooKeeper::finalize(bool error_send, bool error_receive)
1280{
1281 {
1282 std::lock_guard lock(push_request_mutex);
1283
1284 if (expired)
1285 return;
1286 expired = true;
1287 }
1288
1289 active_session_metric_increment.destroy();
1290
1291 try
1292 {
1293 if (!error_send)
1294 {
1295 /// Send close event. This also signals sending thread to wakeup and then stop.
1296 try
1297 {
1298 close();
1299 }
1300 catch (...)
1301 {
1302 /// This happens for example, when "Cannot push request to queue within operation timeout".
1303 tryLogCurrentException(__PRETTY_FUNCTION__);
1304 }
1305
1306 send_thread.join();
1307 }
1308
1309 try
1310 {
1311 /// This will also wakeup the receiving thread.
1312 socket.shutdown();
1313 }
1314 catch (...)
1315 {
1316 /// We must continue to execute all callbacks, because the user is waiting for them.
1317 tryLogCurrentException(__PRETTY_FUNCTION__);
1318 }
1319
1320 if (!error_receive)
1321 receive_thread.join();
1322
1323 {
1324 std::lock_guard lock(operations_mutex);
1325
1326 for (auto & op : operations)
1327 {
1328 RequestInfo & request_info = op.second;
1329 ResponsePtr response = request_info.request->makeResponse();
1330
1331 response->error = request_info.request->probably_sent
1332 ? ZCONNECTIONLOSS
1333 : ZSESSIONEXPIRED;
1334
1335 if (request_info.callback)
1336 {
1337 try
1338 {
1339 request_info.callback(*response);
1340 }
1341 catch (...)
1342 {
1343 /// We must continue to all other callbacks, because the user is waiting for them.
1344 tryLogCurrentException(__PRETTY_FUNCTION__);
1345 }
1346 }
1347 }
1348
1349 CurrentMetrics::sub(CurrentMetrics::ZooKeeperRequest, operations.size());
1350 operations.clear();
1351 }
1352
1353 {
1354 std::lock_guard lock(watches_mutex);
1355
1356 for (auto & path_watches : watches)
1357 {
1358 WatchResponse response;
1359 response.type = SESSION;
1360 response.state = EXPIRED_SESSION;
1361 response.error = ZSESSIONEXPIRED;
1362
1363 for (auto & callback : path_watches.second)
1364 {
1365 if (callback)
1366 {
1367 try
1368 {
1369 callback(response);
1370 }
1371 catch (...)
1372 {
1373 tryLogCurrentException(__PRETTY_FUNCTION__);
1374 }
1375 }
1376 }
1377 }
1378
1379 CurrentMetrics::sub(CurrentMetrics::ZooKeeperWatch, watches.size());
1380 watches.clear();
1381 }
1382
1383 /// Drain queue
1384 RequestInfo info;
1385 while (requests_queue.tryPop(info))
1386 {
1387 if (info.callback)
1388 {
1389 ResponsePtr response = info.request->makeResponse();
1390 if (response)
1391 {
1392 response->error = ZSESSIONEXPIRED;
1393 try
1394 {
1395 info.callback(*response);
1396 }
1397 catch (...)
1398 {
1399 tryLogCurrentException(__PRETTY_FUNCTION__);
1400 }
1401 }
1402 }
1403 if (info.watch)
1404 {
1405 WatchResponse response;
1406 response.type = SESSION;
1407 response.state = EXPIRED_SESSION;
1408 response.error = ZSESSIONEXPIRED;
1409 try
1410 {
1411 info.watch(response);
1412 }
1413 catch (...)
1414 {
1415 tryLogCurrentException(__PRETTY_FUNCTION__);
1416 }
1417 }
1418 }
1419 }
1420 catch (...)
1421 {
1422 tryLogCurrentException(__PRETTY_FUNCTION__);
1423 }
1424}
1425
1426
1427void ZooKeeper::pushRequest(RequestInfo && info)
1428{
1429 try
1430 {
1431 info.time = clock::now();
1432
1433 if (!info.request->xid)
1434 {
1435 info.request->xid = next_xid.fetch_add(1);
1436 if (info.request->xid == close_xid)
1437 throw Exception("xid equal to close_xid", ZSESSIONEXPIRED);
1438 if (info.request->xid < 0)
1439 throw Exception("XID overflow", ZSESSIONEXPIRED);
1440 }
1441
1442 /// We must serialize 'pushRequest' and 'finalize' (from sendThread, receiveThread) calls
1443 /// to avoid forgotten operations in the queue when session is expired.
1444 /// Invariant: when expired, no new operations will be pushed to the queue in 'pushRequest'
1445 /// and the queue will be drained in 'finalize'.
1446 std::lock_guard lock(push_request_mutex);
1447
1448 if (expired)
1449 throw Exception("Session expired", ZSESSIONEXPIRED);
1450
1451 if (!requests_queue.tryPush(std::move(info), operation_timeout.totalMilliseconds()))
1452 throw Exception("Cannot push request to queue within operation timeout", ZOPERATIONTIMEOUT);
1453 }
1454 catch (...)
1455 {
1456 finalize(false, false);
1457 throw;
1458 }
1459
1460 ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
1461}
1462
1463
1464void ZooKeeper::create(
1465 const String & path,
1466 const String & data,
1467 bool is_ephemeral,
1468 bool is_sequential,
1469 const ACLs & acls,
1470 CreateCallback callback)
1471{
1472 ZooKeeperCreateRequest request;
1473 request.path = path;
1474 request.data = data;
1475 request.is_ephemeral = is_ephemeral;
1476 request.is_sequential = is_sequential;
1477 request.acls = acls.empty() ? default_acls : acls;
1478
1479 RequestInfo request_info;
1480 request_info.request = std::make_shared<ZooKeeperCreateRequest>(std::move(request));
1481 request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const CreateResponse &>(response)); };
1482
1483 pushRequest(std::move(request_info));
1484 ProfileEvents::increment(ProfileEvents::ZooKeeperCreate);
1485}
1486
1487
1488void ZooKeeper::remove(
1489 const String & path,
1490 int32_t version,
1491 RemoveCallback callback)
1492{
1493 ZooKeeperRemoveRequest request;
1494 request.path = path;
1495 request.version = version;
1496
1497 RequestInfo request_info;
1498 request_info.request = std::make_shared<ZooKeeperRemoveRequest>(std::move(request));
1499 request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const RemoveResponse &>(response)); };
1500
1501 pushRequest(std::move(request_info));
1502 ProfileEvents::increment(ProfileEvents::ZooKeeperRemove);
1503}
1504
1505
1506void ZooKeeper::exists(
1507 const String & path,
1508 ExistsCallback callback,
1509 WatchCallback watch)
1510{
1511 ZooKeeperExistsRequest request;
1512 request.path = path;
1513
1514 RequestInfo request_info;
1515 request_info.request = std::make_shared<ZooKeeperExistsRequest>(std::move(request));
1516 request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const ExistsResponse &>(response)); };
1517 request_info.watch = watch;
1518
1519 pushRequest(std::move(request_info));
1520 ProfileEvents::increment(ProfileEvents::ZooKeeperExists);
1521}
1522
1523
1524void ZooKeeper::get(
1525 const String & path,
1526 GetCallback callback,
1527 WatchCallback watch)
1528{
1529 ZooKeeperGetRequest request;
1530 request.path = path;
1531
1532 RequestInfo request_info;
1533 request_info.request = std::make_shared<ZooKeeperGetRequest>(std::move(request));
1534 request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const GetResponse &>(response)); };
1535 request_info.watch = watch;
1536
1537 pushRequest(std::move(request_info));
1538 ProfileEvents::increment(ProfileEvents::ZooKeeperGet);
1539}
1540
1541
1542void ZooKeeper::set(
1543 const String & path,
1544 const String & data,
1545 int32_t version,
1546 SetCallback callback)
1547{
1548 ZooKeeperSetRequest request;
1549 request.path = path;
1550 request.data = data;
1551 request.version = version;
1552
1553 RequestInfo request_info;
1554 request_info.request = std::make_shared<ZooKeeperSetRequest>(std::move(request));
1555 request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const SetResponse &>(response)); };
1556
1557 pushRequest(std::move(request_info));
1558 ProfileEvents::increment(ProfileEvents::ZooKeeperSet);
1559}
1560
1561
1562void ZooKeeper::list(
1563 const String & path,
1564 ListCallback callback,
1565 WatchCallback watch)
1566{
1567 ZooKeeperListRequest request;
1568 request.path = path;
1569
1570 RequestInfo request_info;
1571 request_info.request = std::make_shared<ZooKeeperListRequest>(std::move(request));
1572 request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const ListResponse &>(response)); };
1573 request_info.watch = watch;
1574
1575 pushRequest(std::move(request_info));
1576 ProfileEvents::increment(ProfileEvents::ZooKeeperList);
1577}
1578
1579
1580void ZooKeeper::check(
1581 const String & path,
1582 int32_t version,
1583 CheckCallback callback)
1584{
1585 ZooKeeperCheckRequest request;
1586 request.path = path;
1587 request.version = version;
1588
1589 RequestInfo request_info;
1590 request_info.request = std::make_shared<ZooKeeperCheckRequest>(std::move(request));
1591 request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const CheckResponse &>(response)); };
1592
1593 pushRequest(std::move(request_info));
1594 ProfileEvents::increment(ProfileEvents::ZooKeeperCheck);
1595}
1596
1597
1598void ZooKeeper::multi(
1599 const Requests & requests,
1600 MultiCallback callback)
1601{
1602 ZooKeeperMultiRequest request(requests, default_acls);
1603
1604 RequestInfo request_info;
1605 request_info.request = std::make_shared<ZooKeeperMultiRequest>(std::move(request));
1606 request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const MultiResponse &>(response)); };
1607
1608 pushRequest(std::move(request_info));
1609 ProfileEvents::increment(ProfileEvents::ZooKeeperMulti);
1610}
1611
1612
1613void ZooKeeper::close()
1614{
1615 ZooKeeperCloseRequest request;
1616 request.xid = close_xid;
1617
1618 RequestInfo request_info;
1619 request_info.request = std::make_shared<ZooKeeperCloseRequest>(std::move(request));
1620
1621 if (!requests_queue.tryPush(std::move(request_info), operation_timeout.totalMilliseconds()))
1622 throw Exception("Cannot push close request to queue within operation timeout", ZOPERATIONTIMEOUT);
1623
1624 ProfileEvents::increment(ProfileEvents::ZooKeeperClose);
1625}
1626
1627
1628}
1629