1#include "ZooKeeper.h"
2#include "ZooKeeperImpl.h"
3#include "KeeperException.h"
4#include "TestKeeper.h"
5
6#include <random>
7#include <functional>
8#include <boost/algorithm/string.hpp>
9
10#include <common/logger_useful.h>
11#include <Common/StringUtils/StringUtils.h>
12#include <Common/PODArray.h>
13#include <Common/thread_local_rng.h>
14#include <Common/Exception.h>
15
16#include <Poco/Net/NetException.h>
17
18
19#define ZOOKEEPER_CONNECTION_TIMEOUT_MS 1000
20
21
22namespace DB
23{
24namespace ErrorCodes
25{
26 extern const int LOGICAL_ERROR;
27 extern const int NOT_IMPLEMENTED;
28}
29}
30
31
32namespace zkutil
33{
34
35const int CreateMode::Persistent = 0;
36const int CreateMode::Ephemeral = 1;
37const int CreateMode::PersistentSequential = 2;
38const int CreateMode::EphemeralSequential = 3;
39
40
41static void check(int32_t code, const std::string & path)
42{
43 if (code)
44 throw KeeperException(code, path);
45}
46
47
48void ZooKeeper::init(const std::string & implementation, const std::string & hosts_, const std::string & identity_,
49 int32_t session_timeout_ms_, int32_t operation_timeout_ms_, const std::string & chroot_)
50{
51 log = &Logger::get("ZooKeeper");
52 hosts = hosts_;
53 identity = identity_;
54 session_timeout_ms = session_timeout_ms_;
55 operation_timeout_ms = operation_timeout_ms_;
56 chroot = chroot_;
57
58 if (implementation == "zookeeper")
59 {
60 if (hosts.empty())
61 throw KeeperException("No addresses passed to ZooKeeper constructor.", Coordination::ZBADARGUMENTS);
62
63 std::vector<std::string> addresses_strings;
64 boost::split(addresses_strings, hosts, boost::is_any_of(","));
65 Coordination::ZooKeeper::Addresses addresses;
66 addresses.reserve(addresses_strings.size());
67
68 for (const auto &address_string : addresses_strings)
69 {
70 try
71 {
72 addresses.emplace_back(address_string);
73 }
74 catch (const Poco::Net::DNSException &e)
75 {
76 LOG_ERROR(log, "Cannot use ZooKeeper address " << address_string << ", reason: " << e.displayText());
77 }
78 }
79
80 if (addresses.empty())
81 throw KeeperException("Cannot use any of provided ZooKeeper addresses", Coordination::ZBADARGUMENTS);
82
83 impl = std::make_unique<Coordination::ZooKeeper>(
84 addresses,
85 chroot,
86 identity_.empty() ? "" : "digest",
87 identity_,
88 Poco::Timespan(0, session_timeout_ms_ * 1000),
89 Poco::Timespan(0, ZOOKEEPER_CONNECTION_TIMEOUT_MS * 1000),
90 Poco::Timespan(0, operation_timeout_ms_ * 1000));
91
92 LOG_TRACE(log, "initialized, hosts: " << hosts << (chroot.empty() ? "" : ", chroot: " + chroot));
93 }
94 else if (implementation == "testkeeper")
95 {
96 impl = std::make_unique<Coordination::TestKeeper>(
97 chroot,
98 Poco::Timespan(0, operation_timeout_ms_ * 1000));
99 }
100 else
101 {
102 throw DB::Exception("Unknown implementation of coordination service: " + implementation, DB::ErrorCodes::NOT_IMPLEMENTED);
103 }
104
105 if (!chroot.empty() && !exists("/"))
106 throw KeeperException("Zookeeper root doesn't exist. You should create root node " + chroot + " before start.", Coordination::ZNONODE);
107}
108
109ZooKeeper::ZooKeeper(const std::string & hosts_, const std::string & identity_, int32_t session_timeout_ms_,
110 int32_t operation_timeout_ms_, const std::string & chroot_, const std::string & implementation)
111{
112 init(implementation, hosts_, identity_, session_timeout_ms_, operation_timeout_ms_, chroot_);
113}
114
115struct ZooKeeperArgs
116{
117 ZooKeeperArgs(const Poco::Util::AbstractConfiguration & config, const std::string & config_name)
118 {
119 Poco::Util::AbstractConfiguration::Keys keys;
120 config.keys(config_name, keys);
121
122 std::vector<std::string> hosts_strings;
123
124 session_timeout_ms = DEFAULT_SESSION_TIMEOUT;
125 operation_timeout_ms = DEFAULT_OPERATION_TIMEOUT;
126 implementation = "zookeeper";
127 for (const auto & key : keys)
128 {
129 if (startsWith(key, "node"))
130 {
131 hosts_strings.push_back(
132 config.getString(config_name + "." + key + ".host") + ":"
133 + config.getString(config_name + "." + key + ".port", "2181")
134 );
135 }
136 else if (key == "session_timeout_ms")
137 {
138 session_timeout_ms = config.getInt(config_name + "." + key);
139 }
140 else if (key == "operation_timeout_ms")
141 {
142 operation_timeout_ms = config.getInt(config_name + "." + key);
143 }
144 else if (key == "identity")
145 {
146 identity = config.getString(config_name + "." + key);
147 }
148 else if (key == "root")
149 {
150 chroot = config.getString(config_name + "." + key);
151 }
152 else if (key == "implementation")
153 {
154 implementation = config.getString(config_name + "." + key);
155 }
156 else
157 throw KeeperException(std::string("Unknown key ") + key + " in config file", Coordination::ZBADARGUMENTS);
158 }
159
160 /// Shuffle the hosts to distribute the load among ZooKeeper nodes.
161 std::random_device rd;
162 std::mt19937 g(rd());
163 std::shuffle(hosts_strings.begin(), hosts_strings.end(), g);
164
165 for (auto & host : hosts_strings)
166 {
167 if (hosts.size())
168 hosts += ",";
169 hosts += host;
170 }
171
172 if (!chroot.empty())
173 {
174 if (chroot.front() != '/')
175 throw KeeperException(std::string("Root path in config file should start with '/', but got ") + chroot, Coordination::ZBADARGUMENTS);
176 if (chroot.back() == '/')
177 chroot.pop_back();
178 }
179 }
180
181 std::string hosts;
182 std::string identity;
183 int session_timeout_ms;
184 int operation_timeout_ms;
185 std::string chroot;
186 std::string implementation;
187};
188
189ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name)
190{
191 ZooKeeperArgs args(config, config_name);
192 init(args.implementation, args.hosts, args.identity, args.session_timeout_ms, args.operation_timeout_ms, args.chroot);
193}
194
195
196static Coordination::WatchCallback callbackForEvent(const EventPtr & watch)
197{
198 if (!watch)
199 return {};
200 return [watch](const Coordination::WatchResponse &) { watch->set(); };
201}
202
203
204int32_t ZooKeeper::getChildrenImpl(const std::string & path, Strings & res,
205 Coordination::Stat * stat,
206 Coordination::WatchCallback watch_callback)
207{
208 int32_t code = 0;
209 Poco::Event event;
210
211 auto callback = [&](const Coordination::ListResponse & response)
212 {
213 code = response.error;
214 if (!code)
215 {
216 res = response.names;
217 if (stat)
218 *stat = response.stat;
219 }
220 event.set();
221 };
222
223 impl->list(path, callback, watch_callback);
224 event.wait();
225 return code;
226}
227
228Strings ZooKeeper::getChildren(
229 const std::string & path, Coordination::Stat * stat, const EventPtr & watch)
230{
231 Strings res;
232 check(tryGetChildren(path, res, stat, watch), path);
233 return res;
234}
235
236Strings ZooKeeper::getChildrenWatch(
237 const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback)
238{
239 Strings res;
240 check(tryGetChildrenWatch(path, res, stat, watch_callback), path);
241 return res;
242}
243
244int32_t ZooKeeper::tryGetChildren(const std::string & path, Strings & res,
245 Coordination::Stat * stat, const EventPtr & watch)
246{
247 int32_t code = getChildrenImpl(path, res, stat, callbackForEvent(watch));
248
249 if (!(code == Coordination::ZOK || code == Coordination::ZNONODE))
250 throw KeeperException(code, path);
251
252 return code;
253}
254
255int32_t ZooKeeper::tryGetChildrenWatch(const std::string & path, Strings & res,
256 Coordination::Stat * stat, Coordination::WatchCallback watch_callback)
257{
258 int32_t code = getChildrenImpl(path, res, stat, watch_callback);
259
260 if (!(code == Coordination::ZOK || code == Coordination::ZNONODE))
261 throw KeeperException(code, path);
262
263 return code;
264}
265
266int32_t ZooKeeper::createImpl(const std::string & path, const std::string & data, int32_t mode, std::string & path_created)
267{
268 int32_t code = 0;
269 Poco::Event event;
270
271 auto callback = [&](const Coordination::CreateResponse & response)
272 {
273 code = response.error;
274 if (!code)
275 path_created = response.path_created;
276 event.set();
277 };
278
279 impl->create(path, data, mode & 1, mode & 2, {}, callback); /// TODO better mode
280 event.wait();
281 return code;
282}
283
284std::string ZooKeeper::create(const std::string & path, const std::string & data, int32_t type)
285{
286 std::string path_created;
287 check(tryCreate(path, data, type, path_created), path);
288 return path_created;
289}
290
291int32_t ZooKeeper::tryCreate(const std::string & path, const std::string & data, int32_t mode, std::string & path_created)
292{
293 int32_t code = createImpl(path, data, mode, path_created);
294
295 if (!(code == Coordination::ZOK ||
296 code == Coordination::ZNONODE ||
297 code == Coordination::ZNODEEXISTS ||
298 code == Coordination::ZNOCHILDRENFOREPHEMERALS))
299 throw KeeperException(code, path);
300
301 return code;
302}
303
304int32_t ZooKeeper::tryCreate(const std::string & path, const std::string & data, int32_t mode)
305{
306 std::string path_created;
307 return tryCreate(path, data, mode, path_created);
308}
309
310void ZooKeeper::createIfNotExists(const std::string & path, const std::string & data)
311{
312 std::string path_created;
313 int32_t code = createImpl(path, data, CreateMode::Persistent, path_created);
314
315 if (code == Coordination::ZOK || code == Coordination::ZNODEEXISTS)
316 return;
317 else
318 throw KeeperException(code, path);
319}
320
321void ZooKeeper::createAncestors(const std::string & path)
322{
323 size_t pos = 1;
324 while (true)
325 {
326 pos = path.find('/', pos);
327 if (pos == std::string::npos)
328 break;
329 createIfNotExists(path.substr(0, pos), "");
330 ++pos;
331 }
332}
333
334int32_t ZooKeeper::removeImpl(const std::string & path, int32_t version)
335{
336 int32_t code = 0;
337 Poco::Event event;
338
339 auto callback = [&](const Coordination::RemoveResponse & response)
340 {
341 if (response.error)
342 code = response.error;
343 event.set();
344 };
345
346 impl->remove(path, version, callback);
347 event.wait();
348 return code;
349}
350
351void ZooKeeper::remove(const std::string & path, int32_t version)
352{
353 check(tryRemove(path, version), path);
354}
355
356int32_t ZooKeeper::tryRemove(const std::string & path, int32_t version)
357{
358 int32_t code = removeImpl(path, version);
359 if (!(code == Coordination::ZOK ||
360 code == Coordination::ZNONODE ||
361 code == Coordination::ZBADVERSION ||
362 code == Coordination::ZNOTEMPTY))
363 throw KeeperException(code, path);
364 return code;
365}
366
367int32_t ZooKeeper::existsImpl(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback)
368{
369 int32_t code = 0;
370 Poco::Event event;
371
372 auto callback = [&](const Coordination::ExistsResponse & response)
373 {
374 code = response.error;
375 if (!code && stat)
376 *stat = response.stat;
377 event.set();
378 };
379
380 impl->exists(path, callback, watch_callback);
381 event.wait();
382 return code;
383}
384
385bool ZooKeeper::exists(const std::string & path, Coordination::Stat * stat, const EventPtr & watch)
386{
387 return existsWatch(path, stat, callbackForEvent(watch));
388}
389
390bool ZooKeeper::existsWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback)
391{
392 int32_t code = existsImpl(path, stat, watch_callback);
393
394 if (!(code == Coordination::ZOK || code == Coordination::ZNONODE))
395 throw KeeperException(code, path);
396 if (code == Coordination::ZNONODE)
397 return false;
398 return true;
399}
400
401int32_t ZooKeeper::getImpl(const std::string & path, std::string & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback)
402{
403 int32_t code = 0;
404 Poco::Event event;
405
406 auto callback = [&](const Coordination::GetResponse & response)
407 {
408 code = response.error;
409 if (!code)
410 {
411 res = response.data;
412 if (stat)
413 *stat = response.stat;
414 }
415 event.set();
416 };
417
418 impl->get(path, callback, watch_callback);
419 event.wait();
420 return code;
421}
422
423
424std::string ZooKeeper::get(const std::string & path, Coordination::Stat * stat, const EventPtr & watch)
425{
426 int32_t code = 0;
427 std::string res;
428 if (tryGet(path, res, stat, watch, &code))
429 return res;
430 else
431 throw KeeperException("Can't get data for node " + path + ": node doesn't exist", code);
432}
433
434std::string ZooKeeper::getWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback)
435{
436 int32_t code = 0;
437 std::string res;
438 if (tryGetWatch(path, res, stat, watch_callback, &code))
439 return res;
440 else
441 throw KeeperException("Can't get data for node " + path + ": node doesn't exist", code);
442}
443
444bool ZooKeeper::tryGet(const std::string & path, std::string & res, Coordination::Stat * stat, const EventPtr & watch, int * return_code)
445{
446 return tryGetWatch(path, res, stat, callbackForEvent(watch), return_code);
447}
448
449bool ZooKeeper::tryGetWatch(const std::string & path, std::string & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback, int * return_code)
450{
451 int32_t code = getImpl(path, res, stat, watch_callback);
452
453 if (!(code == Coordination::ZOK || code == Coordination::ZNONODE))
454 throw KeeperException(code, path);
455
456 if (return_code)
457 *return_code = code;
458
459 return code == Coordination::ZOK;
460}
461
462int32_t ZooKeeper::setImpl(const std::string & path, const std::string & data,
463 int32_t version, Coordination::Stat * stat)
464{
465 int32_t code = 0;
466 Poco::Event event;
467
468 auto callback = [&](const Coordination::SetResponse & response)
469 {
470 code = response.error;
471 if (!code && stat)
472 *stat = response.stat;
473 event.set();
474 };
475
476 impl->set(path, data, version, callback);
477 event.wait();
478 return code;
479}
480
481void ZooKeeper::set(const std::string & path, const std::string & data, int32_t version, Coordination::Stat * stat)
482{
483 check(trySet(path, data, version, stat), path);
484}
485
486void ZooKeeper::createOrUpdate(const std::string & path, const std::string & data, int32_t mode)
487{
488 int32_t code = trySet(path, data, -1);
489 if (code == Coordination::ZNONODE)
490 {
491 create(path, data, mode);
492 }
493 else if (code != Coordination::ZOK)
494 throw KeeperException(code, path);
495}
496
497int32_t ZooKeeper::trySet(const std::string & path, const std::string & data,
498 int32_t version, Coordination::Stat * stat)
499{
500 int32_t code = setImpl(path, data, version, stat);
501
502 if (!(code == Coordination::ZOK ||
503 code == Coordination::ZNONODE ||
504 code == Coordination::ZBADVERSION))
505 throw KeeperException(code, path);
506 return code;
507}
508
509
510int32_t ZooKeeper::multiImpl(const Coordination::Requests & requests, Coordination::Responses & responses)
511{
512 if (requests.empty())
513 return Coordination::ZOK;
514
515 int32_t code = 0;
516 Poco::Event event;
517
518 auto callback = [&](const Coordination::MultiResponse & response)
519 {
520 code = response.error;
521 responses = response.responses;
522 event.set();
523 };
524
525 impl->multi(requests, callback);
526 event.wait();
527 return code;
528}
529
530Coordination::Responses ZooKeeper::multi(const Coordination::Requests & requests)
531{
532 Coordination::Responses responses;
533 int32_t code = multiImpl(requests, responses);
534 KeeperMultiException::check(code, requests, responses);
535 return responses;
536}
537
538int32_t ZooKeeper::tryMulti(const Coordination::Requests & requests, Coordination::Responses & responses)
539{
540 int32_t code = multiImpl(requests, responses);
541 if (code && !Coordination::isUserError(code))
542 throw KeeperException(code);
543 return code;
544}
545
546
547void ZooKeeper::removeChildren(const std::string & path)
548{
549 Strings children = getChildren(path);
550 while (!children.empty())
551 {
552 Coordination::Requests ops;
553 for (size_t i = 0; i < MULTI_BATCH_SIZE && !children.empty(); ++i)
554 {
555 ops.emplace_back(makeRemoveRequest(path + "/" + children.back(), -1));
556 children.pop_back();
557 }
558 multi(ops);
559 }
560}
561
562
563void ZooKeeper::removeChildrenRecursive(const std::string & path)
564{
565 Strings children = getChildren(path);
566 while (!children.empty())
567 {
568 Coordination::Requests ops;
569 for (size_t i = 0; i < MULTI_BATCH_SIZE && !children.empty(); ++i)
570 {
571 removeChildrenRecursive(path + "/" + children.back());
572 ops.emplace_back(makeRemoveRequest(path + "/" + children.back(), -1));
573 children.pop_back();
574 }
575 multi(ops);
576 }
577}
578
579void ZooKeeper::tryRemoveChildrenRecursive(const std::string & path)
580{
581 Strings children;
582 if (tryGetChildren(path, children) != Coordination::ZOK)
583 return;
584 while (!children.empty())
585 {
586 Coordination::Requests ops;
587 Strings batch;
588 for (size_t i = 0; i < MULTI_BATCH_SIZE && !children.empty(); ++i)
589 {
590 batch.push_back(path + "/" + children.back());
591 children.pop_back();
592 tryRemoveChildrenRecursive(batch.back());
593
594 Coordination::RemoveRequest request;
595 request.path = batch.back();
596
597 ops.emplace_back(std::make_shared<Coordination::RemoveRequest>(std::move(request)));
598 }
599
600 /// Try to remove the children with a faster method - in bulk. If this fails,
601 /// this means someone is concurrently removing these children and we will have
602 /// to remove them one by one.
603 Coordination::Responses responses;
604 if (tryMulti(ops, responses) != Coordination::ZOK)
605 for (const std::string & child : batch)
606 tryRemove(child);
607 }
608}
609
610void ZooKeeper::removeRecursive(const std::string & path)
611{
612 removeChildrenRecursive(path);
613 remove(path);
614}
615
616void ZooKeeper::tryRemoveRecursive(const std::string & path)
617{
618 tryRemoveChildrenRecursive(path);
619 tryRemove(path);
620}
621
622
623namespace
624{
625 struct WaitForDisappearState
626 {
627 int32_t code = 0;
628 int32_t event_type = 0;
629 Poco::Event event;
630 };
631 using WaitForDisappearStatePtr = std::shared_ptr<WaitForDisappearState>;
632}
633
634void ZooKeeper::waitForDisappear(const std::string & path)
635{
636 WaitForDisappearStatePtr state = std::make_shared<WaitForDisappearState>();
637
638 while (true)
639 {
640 auto callback = [state](const Coordination::ExistsResponse & response)
641 {
642 state->code = response.error;
643 if (state->code)
644 state->event.set();
645 };
646
647 auto watch = [state](const Coordination::WatchResponse & response)
648 {
649 if (!state->code)
650 {
651 state->code = response.error;
652 if (!state->code)
653 state->event_type = response.type;
654 state->event.set();
655 }
656 };
657
658 /// NOTE: if the node doesn't exist, the watch will leak.
659
660 impl->exists(path, callback, watch);
661 state->event.wait();
662
663 if (state->code == Coordination::ZNONODE)
664 return;
665
666 if (state->code)
667 throw KeeperException(state->code, path);
668
669 if (state->event_type == Coordination::DELETED)
670 return;
671 }
672}
673
674ZooKeeperPtr ZooKeeper::startNewSession() const
675{
676 return std::make_shared<ZooKeeper>(hosts, identity, session_timeout_ms, operation_timeout_ms, chroot);
677}
678
679
680std::string ZooKeeper::error2string(int32_t code)
681{
682 return Coordination::errorMessage(code);
683}
684
685bool ZooKeeper::expired()
686{
687 return impl->isExpired();
688}
689
690Int64 ZooKeeper::getClientID()
691{
692 return impl->getSessionID();
693}
694
695
696std::future<Coordination::CreateResponse> ZooKeeper::asyncCreate(const std::string & path, const std::string & data, int32_t mode)
697{
698 /// https://stackoverflow.com/questions/25421346/how-to-create-an-stdfunction-from-a-move-capturing-lambda-expression
699 auto promise = std::make_shared<std::promise<Coordination::CreateResponse>>();
700 auto future = promise->get_future();
701
702 auto callback = [promise, path](const Coordination::CreateResponse & response) mutable
703 {
704 if (response.error)
705 promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error)));
706 else
707 promise->set_value(response);
708 };
709
710 impl->create(path, data, mode & 1, mode & 2, {}, std::move(callback));
711 return future;
712}
713
714
715std::future<Coordination::GetResponse> ZooKeeper::asyncGet(const std::string & path)
716{
717 auto promise = std::make_shared<std::promise<Coordination::GetResponse>>();
718 auto future = promise->get_future();
719
720 auto callback = [promise, path](const Coordination::GetResponse & response) mutable
721 {
722 if (response.error)
723 promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error)));
724 else
725 promise->set_value(response);
726 };
727
728 impl->get(path, std::move(callback), {});
729 return future;
730}
731
732
733std::future<Coordination::GetResponse> ZooKeeper::asyncTryGet(const std::string & path)
734{
735 auto promise = std::make_shared<std::promise<Coordination::GetResponse>>();
736 auto future = promise->get_future();
737
738 auto callback = [promise, path](const Coordination::GetResponse & response) mutable
739 {
740 if (response.error && response.error != Coordination::ZNONODE)
741 promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error)));
742 else
743 promise->set_value(response);
744 };
745
746 impl->get(path, std::move(callback), {});
747 return future;
748}
749
750std::future<Coordination::ExistsResponse> ZooKeeper::asyncExists(const std::string & path)
751{
752 auto promise = std::make_shared<std::promise<Coordination::ExistsResponse>>();
753 auto future = promise->get_future();
754
755 auto callback = [promise, path](const Coordination::ExistsResponse & response) mutable
756 {
757 if (response.error && response.error != Coordination::ZNONODE)
758 promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error)));
759 else
760 promise->set_value(response);
761 };
762
763 impl->exists(path, std::move(callback), {});
764 return future;
765}
766
767std::future<Coordination::SetResponse> ZooKeeper::asyncSet(const std::string & path, const std::string & data, int32_t version)
768{
769 auto promise = std::make_shared<std::promise<Coordination::SetResponse>>();
770 auto future = promise->get_future();
771
772 auto callback = [promise, path](const Coordination::SetResponse & response) mutable
773 {
774 if (response.error)
775 promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error)));
776 else
777 promise->set_value(response);
778 };
779
780 impl->set(path, data, version, std::move(callback));
781 return future;
782}
783
784std::future<Coordination::ListResponse> ZooKeeper::asyncGetChildren(const std::string & path)
785{
786 auto promise = std::make_shared<std::promise<Coordination::ListResponse>>();
787 auto future = promise->get_future();
788
789 auto callback = [promise, path](const Coordination::ListResponse & response) mutable
790 {
791 if (response.error)
792 promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error)));
793 else
794 promise->set_value(response);
795 };
796
797 impl->list(path, std::move(callback), {});
798 return future;
799}
800
801std::future<Coordination::RemoveResponse> ZooKeeper::asyncRemove(const std::string & path, int32_t version)
802{
803 auto promise = std::make_shared<std::promise<Coordination::RemoveResponse>>();
804 auto future = promise->get_future();
805
806 auto callback = [promise, path](const Coordination::RemoveResponse & response) mutable
807 {
808 if (response.error)
809 promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error)));
810 else
811 promise->set_value(response);
812 };
813
814 impl->remove(path, version, std::move(callback));
815 return future;
816}
817
818std::future<Coordination::RemoveResponse> ZooKeeper::asyncTryRemove(const std::string & path, int32_t version)
819{
820 auto promise = std::make_shared<std::promise<Coordination::RemoveResponse>>();
821 auto future = promise->get_future();
822
823 auto callback = [promise, path](const Coordination::RemoveResponse & response) mutable
824 {
825 if (response.error && response.error != Coordination::ZNONODE && response.error != Coordination::ZBADVERSION && response.error != Coordination::ZNOTEMPTY)
826 promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error)));
827 else
828 promise->set_value(response);
829 };
830
831 impl->remove(path, version, std::move(callback));
832 return future;
833}
834
835std::future<Coordination::MultiResponse> ZooKeeper::tryAsyncMulti(const Coordination::Requests & ops)
836{
837 auto promise = std::make_shared<std::promise<Coordination::MultiResponse>>();
838 auto future = promise->get_future();
839
840 auto callback = [promise](const Coordination::MultiResponse & response) mutable
841 {
842 promise->set_value(response);
843 };
844
845 impl->multi(ops, std::move(callback));
846 return future;
847}
848
849std::future<Coordination::MultiResponse> ZooKeeper::asyncMulti(const Coordination::Requests & ops)
850{
851 auto promise = std::make_shared<std::promise<Coordination::MultiResponse>>();
852 auto future = promise->get_future();
853
854 auto callback = [promise](const Coordination::MultiResponse & response) mutable
855 {
856 if (response.error)
857 promise->set_exception(std::make_exception_ptr(KeeperException(response.error)));
858 else
859 promise->set_value(response);
860 };
861
862 impl->multi(ops, std::move(callback));
863 return future;
864}
865
866int32_t ZooKeeper::tryMultiNoThrow(const Coordination::Requests & requests, Coordination::Responses & responses)
867{
868 try
869 {
870 return multiImpl(requests, responses);
871 }
872 catch (const Coordination::Exception & e)
873 {
874 return e.code;
875 }
876}
877
878
879size_t KeeperMultiException::getFailedOpIndex(int32_t exception_code, const Coordination::Responses & responses)
880{
881 if (responses.empty())
882 throw DB::Exception("Responses for multi transaction is empty", DB::ErrorCodes::LOGICAL_ERROR);
883
884 for (size_t index = 0, size = responses.size(); index < size; ++index)
885 if (responses[index]->error)
886 return index;
887
888 if (!Coordination::isUserError(exception_code))
889 throw DB::Exception("There are no failed OPs because '" + ZooKeeper::error2string(exception_code) + "' is not valid response code for that",
890 DB::ErrorCodes::LOGICAL_ERROR);
891
892 throw DB::Exception("There is no failed OpResult", DB::ErrorCodes::LOGICAL_ERROR);
893}
894
895
896KeeperMultiException::KeeperMultiException(int32_t exception_code, const Coordination::Requests & requests_, const Coordination::Responses & responses_)
897 : KeeperException("Transaction failed", exception_code),
898 requests(requests_), responses(responses_), failed_op_index(getFailedOpIndex(exception_code, responses))
899{
900 addMessage("Op #" + std::to_string(failed_op_index) + ", path: " + getPathForFirstFailedOp());
901}
902
903
904std::string KeeperMultiException::getPathForFirstFailedOp() const
905{
906 return requests[failed_op_index]->getPath();
907}
908
909void KeeperMultiException::check(int32_t exception_code, const Coordination::Requests & requests, const Coordination::Responses & responses)
910{
911 if (!exception_code)
912 return;
913
914 if (Coordination::isUserError(exception_code))
915 throw KeeperMultiException(exception_code, requests, responses);
916 else
917 throw KeeperException(exception_code);
918}
919
920
921Coordination::RequestPtr makeCreateRequest(const std::string & path, const std::string & data, int create_mode)
922{
923 auto request = std::make_shared<Coordination::CreateRequest>();
924 request->path = path;
925 request->data = data;
926 request->is_ephemeral = create_mode == CreateMode::Ephemeral || create_mode == CreateMode::EphemeralSequential;
927 request->is_sequential = create_mode == CreateMode::PersistentSequential || create_mode == CreateMode::EphemeralSequential;
928 return request;
929}
930
931Coordination::RequestPtr makeRemoveRequest(const std::string & path, int version)
932{
933 auto request = std::make_shared<Coordination::RemoveRequest>();
934 request->path = path;
935 request->version = version;
936 return request;
937}
938
939Coordination::RequestPtr makeSetRequest(const std::string & path, const std::string & data, int version)
940{
941 auto request = std::make_shared<Coordination::SetRequest>();
942 request->path = path;
943 request->data = data;
944 request->version = version;
945 return request;
946}
947
948Coordination::RequestPtr makeCheckRequest(const std::string & path, int version)
949{
950 auto request = std::make_shared<Coordination::CheckRequest>();
951 request->path = path;
952 request->version = version;
953 return request;
954}
955}
956