1 | #include "ZooKeeper.h" |
2 | #include "ZooKeeperImpl.h" |
3 | #include "KeeperException.h" |
4 | #include "TestKeeper.h" |
5 | |
6 | #include <random> |
7 | #include <functional> |
8 | #include <boost/algorithm/string.hpp> |
9 | |
10 | #include <common/logger_useful.h> |
11 | #include <Common/StringUtils/StringUtils.h> |
12 | #include <Common/PODArray.h> |
13 | #include <Common/thread_local_rng.h> |
14 | #include <Common/Exception.h> |
15 | |
16 | #include <Poco/Net/NetException.h> |
17 | |
18 | |
19 | #define ZOOKEEPER_CONNECTION_TIMEOUT_MS 1000 |
20 | |
21 | |
22 | namespace DB |
23 | { |
24 | namespace ErrorCodes |
25 | { |
26 | extern const int LOGICAL_ERROR; |
27 | extern const int NOT_IMPLEMENTED; |
28 | } |
29 | } |
30 | |
31 | |
32 | namespace zkutil |
33 | { |
34 | |
35 | const int CreateMode::Persistent = 0; |
36 | const int CreateMode::Ephemeral = 1; |
37 | const int CreateMode::PersistentSequential = 2; |
38 | const int CreateMode::EphemeralSequential = 3; |
39 | |
40 | |
41 | static void check(int32_t code, const std::string & path) |
42 | { |
43 | if (code) |
44 | throw KeeperException(code, path); |
45 | } |
46 | |
47 | |
48 | void ZooKeeper::init(const std::string & implementation, const std::string & hosts_, const std::string & identity_, |
49 | int32_t session_timeout_ms_, int32_t operation_timeout_ms_, const std::string & chroot_) |
50 | { |
51 | log = &Logger::get("ZooKeeper" ); |
52 | hosts = hosts_; |
53 | identity = identity_; |
54 | session_timeout_ms = session_timeout_ms_; |
55 | operation_timeout_ms = operation_timeout_ms_; |
56 | chroot = chroot_; |
57 | |
58 | if (implementation == "zookeeper" ) |
59 | { |
60 | if (hosts.empty()) |
61 | throw KeeperException("No addresses passed to ZooKeeper constructor." , Coordination::ZBADARGUMENTS); |
62 | |
63 | std::vector<std::string> addresses_strings; |
64 | boost::split(addresses_strings, hosts, boost::is_any_of("," )); |
65 | Coordination::ZooKeeper::Addresses addresses; |
66 | addresses.reserve(addresses_strings.size()); |
67 | |
68 | for (const auto &address_string : addresses_strings) |
69 | { |
70 | try |
71 | { |
72 | addresses.emplace_back(address_string); |
73 | } |
74 | catch (const Poco::Net::DNSException &e) |
75 | { |
76 | LOG_ERROR(log, "Cannot use ZooKeeper address " << address_string << ", reason: " << e.displayText()); |
77 | } |
78 | } |
79 | |
80 | if (addresses.empty()) |
81 | throw KeeperException("Cannot use any of provided ZooKeeper addresses" , Coordination::ZBADARGUMENTS); |
82 | |
83 | impl = std::make_unique<Coordination::ZooKeeper>( |
84 | addresses, |
85 | chroot, |
86 | identity_.empty() ? "" : "digest" , |
87 | identity_, |
88 | Poco::Timespan(0, session_timeout_ms_ * 1000), |
89 | Poco::Timespan(0, ZOOKEEPER_CONNECTION_TIMEOUT_MS * 1000), |
90 | Poco::Timespan(0, operation_timeout_ms_ * 1000)); |
91 | |
92 | LOG_TRACE(log, "initialized, hosts: " << hosts << (chroot.empty() ? "" : ", chroot: " + chroot)); |
93 | } |
94 | else if (implementation == "testkeeper" ) |
95 | { |
96 | impl = std::make_unique<Coordination::TestKeeper>( |
97 | chroot, |
98 | Poco::Timespan(0, operation_timeout_ms_ * 1000)); |
99 | } |
100 | else |
101 | { |
102 | throw DB::Exception("Unknown implementation of coordination service: " + implementation, DB::ErrorCodes::NOT_IMPLEMENTED); |
103 | } |
104 | |
105 | if (!chroot.empty() && !exists("/" )) |
106 | throw KeeperException("Zookeeper root doesn't exist. You should create root node " + chroot + " before start." , Coordination::ZNONODE); |
107 | } |
108 | |
109 | ZooKeeper::ZooKeeper(const std::string & hosts_, const std::string & identity_, int32_t session_timeout_ms_, |
110 | int32_t operation_timeout_ms_, const std::string & chroot_, const std::string & implementation) |
111 | { |
112 | init(implementation, hosts_, identity_, session_timeout_ms_, operation_timeout_ms_, chroot_); |
113 | } |
114 | |
115 | struct ZooKeeperArgs |
116 | { |
117 | ZooKeeperArgs(const Poco::Util::AbstractConfiguration & config, const std::string & config_name) |
118 | { |
119 | Poco::Util::AbstractConfiguration::Keys keys; |
120 | config.keys(config_name, keys); |
121 | |
122 | std::vector<std::string> hosts_strings; |
123 | |
124 | session_timeout_ms = DEFAULT_SESSION_TIMEOUT; |
125 | operation_timeout_ms = DEFAULT_OPERATION_TIMEOUT; |
126 | implementation = "zookeeper" ; |
127 | for (const auto & key : keys) |
128 | { |
129 | if (startsWith(key, "node" )) |
130 | { |
131 | hosts_strings.push_back( |
132 | config.getString(config_name + "." + key + ".host" ) + ":" |
133 | + config.getString(config_name + "." + key + ".port" , "2181" ) |
134 | ); |
135 | } |
136 | else if (key == "session_timeout_ms" ) |
137 | { |
138 | session_timeout_ms = config.getInt(config_name + "." + key); |
139 | } |
140 | else if (key == "operation_timeout_ms" ) |
141 | { |
142 | operation_timeout_ms = config.getInt(config_name + "." + key); |
143 | } |
144 | else if (key == "identity" ) |
145 | { |
146 | identity = config.getString(config_name + "." + key); |
147 | } |
148 | else if (key == "root" ) |
149 | { |
150 | chroot = config.getString(config_name + "." + key); |
151 | } |
152 | else if (key == "implementation" ) |
153 | { |
154 | implementation = config.getString(config_name + "." + key); |
155 | } |
156 | else |
157 | throw KeeperException(std::string("Unknown key " ) + key + " in config file" , Coordination::ZBADARGUMENTS); |
158 | } |
159 | |
160 | /// Shuffle the hosts to distribute the load among ZooKeeper nodes. |
161 | std::random_device rd; |
162 | std::mt19937 g(rd()); |
163 | std::shuffle(hosts_strings.begin(), hosts_strings.end(), g); |
164 | |
165 | for (auto & host : hosts_strings) |
166 | { |
167 | if (hosts.size()) |
168 | hosts += "," ; |
169 | hosts += host; |
170 | } |
171 | |
172 | if (!chroot.empty()) |
173 | { |
174 | if (chroot.front() != '/') |
175 | throw KeeperException(std::string("Root path in config file should start with '/', but got " ) + chroot, Coordination::ZBADARGUMENTS); |
176 | if (chroot.back() == '/') |
177 | chroot.pop_back(); |
178 | } |
179 | } |
180 | |
181 | std::string hosts; |
182 | std::string identity; |
183 | int session_timeout_ms; |
184 | int operation_timeout_ms; |
185 | std::string chroot; |
186 | std::string implementation; |
187 | }; |
188 | |
189 | ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name) |
190 | { |
191 | ZooKeeperArgs args(config, config_name); |
192 | init(args.implementation, args.hosts, args.identity, args.session_timeout_ms, args.operation_timeout_ms, args.chroot); |
193 | } |
194 | |
195 | |
196 | static Coordination::WatchCallback callbackForEvent(const EventPtr & watch) |
197 | { |
198 | if (!watch) |
199 | return {}; |
200 | return [watch](const Coordination::WatchResponse &) { watch->set(); }; |
201 | } |
202 | |
203 | |
204 | int32_t ZooKeeper::getChildrenImpl(const std::string & path, Strings & res, |
205 | Coordination::Stat * stat, |
206 | Coordination::WatchCallback watch_callback) |
207 | { |
208 | int32_t code = 0; |
209 | Poco::Event event; |
210 | |
211 | auto callback = [&](const Coordination::ListResponse & response) |
212 | { |
213 | code = response.error; |
214 | if (!code) |
215 | { |
216 | res = response.names; |
217 | if (stat) |
218 | *stat = response.stat; |
219 | } |
220 | event.set(); |
221 | }; |
222 | |
223 | impl->list(path, callback, watch_callback); |
224 | event.wait(); |
225 | return code; |
226 | } |
227 | |
228 | Strings ZooKeeper::getChildren( |
229 | const std::string & path, Coordination::Stat * stat, const EventPtr & watch) |
230 | { |
231 | Strings res; |
232 | check(tryGetChildren(path, res, stat, watch), path); |
233 | return res; |
234 | } |
235 | |
236 | Strings ZooKeeper::getChildrenWatch( |
237 | const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback) |
238 | { |
239 | Strings res; |
240 | check(tryGetChildrenWatch(path, res, stat, watch_callback), path); |
241 | return res; |
242 | } |
243 | |
244 | int32_t ZooKeeper::tryGetChildren(const std::string & path, Strings & res, |
245 | Coordination::Stat * stat, const EventPtr & watch) |
246 | { |
247 | int32_t code = getChildrenImpl(path, res, stat, callbackForEvent(watch)); |
248 | |
249 | if (!(code == Coordination::ZOK || code == Coordination::ZNONODE)) |
250 | throw KeeperException(code, path); |
251 | |
252 | return code; |
253 | } |
254 | |
255 | int32_t ZooKeeper::tryGetChildrenWatch(const std::string & path, Strings & res, |
256 | Coordination::Stat * stat, Coordination::WatchCallback watch_callback) |
257 | { |
258 | int32_t code = getChildrenImpl(path, res, stat, watch_callback); |
259 | |
260 | if (!(code == Coordination::ZOK || code == Coordination::ZNONODE)) |
261 | throw KeeperException(code, path); |
262 | |
263 | return code; |
264 | } |
265 | |
266 | int32_t ZooKeeper::createImpl(const std::string & path, const std::string & data, int32_t mode, std::string & path_created) |
267 | { |
268 | int32_t code = 0; |
269 | Poco::Event event; |
270 | |
271 | auto callback = [&](const Coordination::CreateResponse & response) |
272 | { |
273 | code = response.error; |
274 | if (!code) |
275 | path_created = response.path_created; |
276 | event.set(); |
277 | }; |
278 | |
279 | impl->create(path, data, mode & 1, mode & 2, {}, callback); /// TODO better mode |
280 | event.wait(); |
281 | return code; |
282 | } |
283 | |
284 | std::string ZooKeeper::create(const std::string & path, const std::string & data, int32_t type) |
285 | { |
286 | std::string path_created; |
287 | check(tryCreate(path, data, type, path_created), path); |
288 | return path_created; |
289 | } |
290 | |
291 | int32_t ZooKeeper::tryCreate(const std::string & path, const std::string & data, int32_t mode, std::string & path_created) |
292 | { |
293 | int32_t code = createImpl(path, data, mode, path_created); |
294 | |
295 | if (!(code == Coordination::ZOK || |
296 | code == Coordination::ZNONODE || |
297 | code == Coordination::ZNODEEXISTS || |
298 | code == Coordination::ZNOCHILDRENFOREPHEMERALS)) |
299 | throw KeeperException(code, path); |
300 | |
301 | return code; |
302 | } |
303 | |
304 | int32_t ZooKeeper::tryCreate(const std::string & path, const std::string & data, int32_t mode) |
305 | { |
306 | std::string path_created; |
307 | return tryCreate(path, data, mode, path_created); |
308 | } |
309 | |
310 | void ZooKeeper::createIfNotExists(const std::string & path, const std::string & data) |
311 | { |
312 | std::string path_created; |
313 | int32_t code = createImpl(path, data, CreateMode::Persistent, path_created); |
314 | |
315 | if (code == Coordination::ZOK || code == Coordination::ZNODEEXISTS) |
316 | return; |
317 | else |
318 | throw KeeperException(code, path); |
319 | } |
320 | |
321 | void ZooKeeper::createAncestors(const std::string & path) |
322 | { |
323 | size_t pos = 1; |
324 | while (true) |
325 | { |
326 | pos = path.find('/', pos); |
327 | if (pos == std::string::npos) |
328 | break; |
329 | createIfNotExists(path.substr(0, pos), "" ); |
330 | ++pos; |
331 | } |
332 | } |
333 | |
334 | int32_t ZooKeeper::removeImpl(const std::string & path, int32_t version) |
335 | { |
336 | int32_t code = 0; |
337 | Poco::Event event; |
338 | |
339 | auto callback = [&](const Coordination::RemoveResponse & response) |
340 | { |
341 | if (response.error) |
342 | code = response.error; |
343 | event.set(); |
344 | }; |
345 | |
346 | impl->remove(path, version, callback); |
347 | event.wait(); |
348 | return code; |
349 | } |
350 | |
351 | void ZooKeeper::remove(const std::string & path, int32_t version) |
352 | { |
353 | check(tryRemove(path, version), path); |
354 | } |
355 | |
356 | int32_t ZooKeeper::tryRemove(const std::string & path, int32_t version) |
357 | { |
358 | int32_t code = removeImpl(path, version); |
359 | if (!(code == Coordination::ZOK || |
360 | code == Coordination::ZNONODE || |
361 | code == Coordination::ZBADVERSION || |
362 | code == Coordination::ZNOTEMPTY)) |
363 | throw KeeperException(code, path); |
364 | return code; |
365 | } |
366 | |
367 | int32_t ZooKeeper::existsImpl(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback) |
368 | { |
369 | int32_t code = 0; |
370 | Poco::Event event; |
371 | |
372 | auto callback = [&](const Coordination::ExistsResponse & response) |
373 | { |
374 | code = response.error; |
375 | if (!code && stat) |
376 | *stat = response.stat; |
377 | event.set(); |
378 | }; |
379 | |
380 | impl->exists(path, callback, watch_callback); |
381 | event.wait(); |
382 | return code; |
383 | } |
384 | |
385 | bool ZooKeeper::exists(const std::string & path, Coordination::Stat * stat, const EventPtr & watch) |
386 | { |
387 | return existsWatch(path, stat, callbackForEvent(watch)); |
388 | } |
389 | |
390 | bool ZooKeeper::existsWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback) |
391 | { |
392 | int32_t code = existsImpl(path, stat, watch_callback); |
393 | |
394 | if (!(code == Coordination::ZOK || code == Coordination::ZNONODE)) |
395 | throw KeeperException(code, path); |
396 | if (code == Coordination::ZNONODE) |
397 | return false; |
398 | return true; |
399 | } |
400 | |
401 | int32_t ZooKeeper::getImpl(const std::string & path, std::string & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback) |
402 | { |
403 | int32_t code = 0; |
404 | Poco::Event event; |
405 | |
406 | auto callback = [&](const Coordination::GetResponse & response) |
407 | { |
408 | code = response.error; |
409 | if (!code) |
410 | { |
411 | res = response.data; |
412 | if (stat) |
413 | *stat = response.stat; |
414 | } |
415 | event.set(); |
416 | }; |
417 | |
418 | impl->get(path, callback, watch_callback); |
419 | event.wait(); |
420 | return code; |
421 | } |
422 | |
423 | |
424 | std::string ZooKeeper::get(const std::string & path, Coordination::Stat * stat, const EventPtr & watch) |
425 | { |
426 | int32_t code = 0; |
427 | std::string res; |
428 | if (tryGet(path, res, stat, watch, &code)) |
429 | return res; |
430 | else |
431 | throw KeeperException("Can't get data for node " + path + ": node doesn't exist" , code); |
432 | } |
433 | |
434 | std::string ZooKeeper::getWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback) |
435 | { |
436 | int32_t code = 0; |
437 | std::string res; |
438 | if (tryGetWatch(path, res, stat, watch_callback, &code)) |
439 | return res; |
440 | else |
441 | throw KeeperException("Can't get data for node " + path + ": node doesn't exist" , code); |
442 | } |
443 | |
444 | bool ZooKeeper::tryGet(const std::string & path, std::string & res, Coordination::Stat * stat, const EventPtr & watch, int * return_code) |
445 | { |
446 | return tryGetWatch(path, res, stat, callbackForEvent(watch), return_code); |
447 | } |
448 | |
449 | bool ZooKeeper::tryGetWatch(const std::string & path, std::string & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback, int * return_code) |
450 | { |
451 | int32_t code = getImpl(path, res, stat, watch_callback); |
452 | |
453 | if (!(code == Coordination::ZOK || code == Coordination::ZNONODE)) |
454 | throw KeeperException(code, path); |
455 | |
456 | if (return_code) |
457 | *return_code = code; |
458 | |
459 | return code == Coordination::ZOK; |
460 | } |
461 | |
462 | int32_t ZooKeeper::setImpl(const std::string & path, const std::string & data, |
463 | int32_t version, Coordination::Stat * stat) |
464 | { |
465 | int32_t code = 0; |
466 | Poco::Event event; |
467 | |
468 | auto callback = [&](const Coordination::SetResponse & response) |
469 | { |
470 | code = response.error; |
471 | if (!code && stat) |
472 | *stat = response.stat; |
473 | event.set(); |
474 | }; |
475 | |
476 | impl->set(path, data, version, callback); |
477 | event.wait(); |
478 | return code; |
479 | } |
480 | |
481 | void ZooKeeper::set(const std::string & path, const std::string & data, int32_t version, Coordination::Stat * stat) |
482 | { |
483 | check(trySet(path, data, version, stat), path); |
484 | } |
485 | |
486 | void ZooKeeper::createOrUpdate(const std::string & path, const std::string & data, int32_t mode) |
487 | { |
488 | int32_t code = trySet(path, data, -1); |
489 | if (code == Coordination::ZNONODE) |
490 | { |
491 | create(path, data, mode); |
492 | } |
493 | else if (code != Coordination::ZOK) |
494 | throw KeeperException(code, path); |
495 | } |
496 | |
497 | int32_t ZooKeeper::trySet(const std::string & path, const std::string & data, |
498 | int32_t version, Coordination::Stat * stat) |
499 | { |
500 | int32_t code = setImpl(path, data, version, stat); |
501 | |
502 | if (!(code == Coordination::ZOK || |
503 | code == Coordination::ZNONODE || |
504 | code == Coordination::ZBADVERSION)) |
505 | throw KeeperException(code, path); |
506 | return code; |
507 | } |
508 | |
509 | |
510 | int32_t ZooKeeper::multiImpl(const Coordination::Requests & requests, Coordination::Responses & responses) |
511 | { |
512 | if (requests.empty()) |
513 | return Coordination::ZOK; |
514 | |
515 | int32_t code = 0; |
516 | Poco::Event event; |
517 | |
518 | auto callback = [&](const Coordination::MultiResponse & response) |
519 | { |
520 | code = response.error; |
521 | responses = response.responses; |
522 | event.set(); |
523 | }; |
524 | |
525 | impl->multi(requests, callback); |
526 | event.wait(); |
527 | return code; |
528 | } |
529 | |
530 | Coordination::Responses ZooKeeper::multi(const Coordination::Requests & requests) |
531 | { |
532 | Coordination::Responses responses; |
533 | int32_t code = multiImpl(requests, responses); |
534 | KeeperMultiException::check(code, requests, responses); |
535 | return responses; |
536 | } |
537 | |
538 | int32_t ZooKeeper::tryMulti(const Coordination::Requests & requests, Coordination::Responses & responses) |
539 | { |
540 | int32_t code = multiImpl(requests, responses); |
541 | if (code && !Coordination::isUserError(code)) |
542 | throw KeeperException(code); |
543 | return code; |
544 | } |
545 | |
546 | |
547 | void ZooKeeper::removeChildren(const std::string & path) |
548 | { |
549 | Strings children = getChildren(path); |
550 | while (!children.empty()) |
551 | { |
552 | Coordination::Requests ops; |
553 | for (size_t i = 0; i < MULTI_BATCH_SIZE && !children.empty(); ++i) |
554 | { |
555 | ops.emplace_back(makeRemoveRequest(path + "/" + children.back(), -1)); |
556 | children.pop_back(); |
557 | } |
558 | multi(ops); |
559 | } |
560 | } |
561 | |
562 | |
563 | void ZooKeeper::removeChildrenRecursive(const std::string & path) |
564 | { |
565 | Strings children = getChildren(path); |
566 | while (!children.empty()) |
567 | { |
568 | Coordination::Requests ops; |
569 | for (size_t i = 0; i < MULTI_BATCH_SIZE && !children.empty(); ++i) |
570 | { |
571 | removeChildrenRecursive(path + "/" + children.back()); |
572 | ops.emplace_back(makeRemoveRequest(path + "/" + children.back(), -1)); |
573 | children.pop_back(); |
574 | } |
575 | multi(ops); |
576 | } |
577 | } |
578 | |
579 | void ZooKeeper::tryRemoveChildrenRecursive(const std::string & path) |
580 | { |
581 | Strings children; |
582 | if (tryGetChildren(path, children) != Coordination::ZOK) |
583 | return; |
584 | while (!children.empty()) |
585 | { |
586 | Coordination::Requests ops; |
587 | Strings batch; |
588 | for (size_t i = 0; i < MULTI_BATCH_SIZE && !children.empty(); ++i) |
589 | { |
590 | batch.push_back(path + "/" + children.back()); |
591 | children.pop_back(); |
592 | tryRemoveChildrenRecursive(batch.back()); |
593 | |
594 | Coordination::RemoveRequest request; |
595 | request.path = batch.back(); |
596 | |
597 | ops.emplace_back(std::make_shared<Coordination::RemoveRequest>(std::move(request))); |
598 | } |
599 | |
600 | /// Try to remove the children with a faster method - in bulk. If this fails, |
601 | /// this means someone is concurrently removing these children and we will have |
602 | /// to remove them one by one. |
603 | Coordination::Responses responses; |
604 | if (tryMulti(ops, responses) != Coordination::ZOK) |
605 | for (const std::string & child : batch) |
606 | tryRemove(child); |
607 | } |
608 | } |
609 | |
610 | void ZooKeeper::removeRecursive(const std::string & path) |
611 | { |
612 | removeChildrenRecursive(path); |
613 | remove(path); |
614 | } |
615 | |
616 | void ZooKeeper::tryRemoveRecursive(const std::string & path) |
617 | { |
618 | tryRemoveChildrenRecursive(path); |
619 | tryRemove(path); |
620 | } |
621 | |
622 | |
623 | namespace |
624 | { |
625 | struct WaitForDisappearState |
626 | { |
627 | int32_t code = 0; |
628 | int32_t event_type = 0; |
629 | Poco::Event event; |
630 | }; |
631 | using WaitForDisappearStatePtr = std::shared_ptr<WaitForDisappearState>; |
632 | } |
633 | |
634 | void ZooKeeper::waitForDisappear(const std::string & path) |
635 | { |
636 | WaitForDisappearStatePtr state = std::make_shared<WaitForDisappearState>(); |
637 | |
638 | while (true) |
639 | { |
640 | auto callback = [state](const Coordination::ExistsResponse & response) |
641 | { |
642 | state->code = response.error; |
643 | if (state->code) |
644 | state->event.set(); |
645 | }; |
646 | |
647 | auto watch = [state](const Coordination::WatchResponse & response) |
648 | { |
649 | if (!state->code) |
650 | { |
651 | state->code = response.error; |
652 | if (!state->code) |
653 | state->event_type = response.type; |
654 | state->event.set(); |
655 | } |
656 | }; |
657 | |
658 | /// NOTE: if the node doesn't exist, the watch will leak. |
659 | |
660 | impl->exists(path, callback, watch); |
661 | state->event.wait(); |
662 | |
663 | if (state->code == Coordination::ZNONODE) |
664 | return; |
665 | |
666 | if (state->code) |
667 | throw KeeperException(state->code, path); |
668 | |
669 | if (state->event_type == Coordination::DELETED) |
670 | return; |
671 | } |
672 | } |
673 | |
674 | ZooKeeperPtr ZooKeeper::startNewSession() const |
675 | { |
676 | return std::make_shared<ZooKeeper>(hosts, identity, session_timeout_ms, operation_timeout_ms, chroot); |
677 | } |
678 | |
679 | |
680 | std::string ZooKeeper::error2string(int32_t code) |
681 | { |
682 | return Coordination::errorMessage(code); |
683 | } |
684 | |
685 | bool ZooKeeper::expired() |
686 | { |
687 | return impl->isExpired(); |
688 | } |
689 | |
690 | Int64 ZooKeeper::getClientID() |
691 | { |
692 | return impl->getSessionID(); |
693 | } |
694 | |
695 | |
696 | std::future<Coordination::CreateResponse> ZooKeeper::asyncCreate(const std::string & path, const std::string & data, int32_t mode) |
697 | { |
698 | /// https://stackoverflow.com/questions/25421346/how-to-create-an-stdfunction-from-a-move-capturing-lambda-expression |
699 | auto promise = std::make_shared<std::promise<Coordination::CreateResponse>>(); |
700 | auto future = promise->get_future(); |
701 | |
702 | auto callback = [promise, path](const Coordination::CreateResponse & response) mutable |
703 | { |
704 | if (response.error) |
705 | promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error))); |
706 | else |
707 | promise->set_value(response); |
708 | }; |
709 | |
710 | impl->create(path, data, mode & 1, mode & 2, {}, std::move(callback)); |
711 | return future; |
712 | } |
713 | |
714 | |
715 | std::future<Coordination::GetResponse> ZooKeeper::asyncGet(const std::string & path) |
716 | { |
717 | auto promise = std::make_shared<std::promise<Coordination::GetResponse>>(); |
718 | auto future = promise->get_future(); |
719 | |
720 | auto callback = [promise, path](const Coordination::GetResponse & response) mutable |
721 | { |
722 | if (response.error) |
723 | promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error))); |
724 | else |
725 | promise->set_value(response); |
726 | }; |
727 | |
728 | impl->get(path, std::move(callback), {}); |
729 | return future; |
730 | } |
731 | |
732 | |
733 | std::future<Coordination::GetResponse> ZooKeeper::asyncTryGet(const std::string & path) |
734 | { |
735 | auto promise = std::make_shared<std::promise<Coordination::GetResponse>>(); |
736 | auto future = promise->get_future(); |
737 | |
738 | auto callback = [promise, path](const Coordination::GetResponse & response) mutable |
739 | { |
740 | if (response.error && response.error != Coordination::ZNONODE) |
741 | promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error))); |
742 | else |
743 | promise->set_value(response); |
744 | }; |
745 | |
746 | impl->get(path, std::move(callback), {}); |
747 | return future; |
748 | } |
749 | |
750 | std::future<Coordination::ExistsResponse> ZooKeeper::asyncExists(const std::string & path) |
751 | { |
752 | auto promise = std::make_shared<std::promise<Coordination::ExistsResponse>>(); |
753 | auto future = promise->get_future(); |
754 | |
755 | auto callback = [promise, path](const Coordination::ExistsResponse & response) mutable |
756 | { |
757 | if (response.error && response.error != Coordination::ZNONODE) |
758 | promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error))); |
759 | else |
760 | promise->set_value(response); |
761 | }; |
762 | |
763 | impl->exists(path, std::move(callback), {}); |
764 | return future; |
765 | } |
766 | |
767 | std::future<Coordination::SetResponse> ZooKeeper::asyncSet(const std::string & path, const std::string & data, int32_t version) |
768 | { |
769 | auto promise = std::make_shared<std::promise<Coordination::SetResponse>>(); |
770 | auto future = promise->get_future(); |
771 | |
772 | auto callback = [promise, path](const Coordination::SetResponse & response) mutable |
773 | { |
774 | if (response.error) |
775 | promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error))); |
776 | else |
777 | promise->set_value(response); |
778 | }; |
779 | |
780 | impl->set(path, data, version, std::move(callback)); |
781 | return future; |
782 | } |
783 | |
784 | std::future<Coordination::ListResponse> ZooKeeper::asyncGetChildren(const std::string & path) |
785 | { |
786 | auto promise = std::make_shared<std::promise<Coordination::ListResponse>>(); |
787 | auto future = promise->get_future(); |
788 | |
789 | auto callback = [promise, path](const Coordination::ListResponse & response) mutable |
790 | { |
791 | if (response.error) |
792 | promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error))); |
793 | else |
794 | promise->set_value(response); |
795 | }; |
796 | |
797 | impl->list(path, std::move(callback), {}); |
798 | return future; |
799 | } |
800 | |
801 | std::future<Coordination::RemoveResponse> ZooKeeper::asyncRemove(const std::string & path, int32_t version) |
802 | { |
803 | auto promise = std::make_shared<std::promise<Coordination::RemoveResponse>>(); |
804 | auto future = promise->get_future(); |
805 | |
806 | auto callback = [promise, path](const Coordination::RemoveResponse & response) mutable |
807 | { |
808 | if (response.error) |
809 | promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error))); |
810 | else |
811 | promise->set_value(response); |
812 | }; |
813 | |
814 | impl->remove(path, version, std::move(callback)); |
815 | return future; |
816 | } |
817 | |
818 | std::future<Coordination::RemoveResponse> ZooKeeper::asyncTryRemove(const std::string & path, int32_t version) |
819 | { |
820 | auto promise = std::make_shared<std::promise<Coordination::RemoveResponse>>(); |
821 | auto future = promise->get_future(); |
822 | |
823 | auto callback = [promise, path](const Coordination::RemoveResponse & response) mutable |
824 | { |
825 | if (response.error && response.error != Coordination::ZNONODE && response.error != Coordination::ZBADVERSION && response.error != Coordination::ZNOTEMPTY) |
826 | promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error))); |
827 | else |
828 | promise->set_value(response); |
829 | }; |
830 | |
831 | impl->remove(path, version, std::move(callback)); |
832 | return future; |
833 | } |
834 | |
835 | std::future<Coordination::MultiResponse> ZooKeeper::tryAsyncMulti(const Coordination::Requests & ops) |
836 | { |
837 | auto promise = std::make_shared<std::promise<Coordination::MultiResponse>>(); |
838 | auto future = promise->get_future(); |
839 | |
840 | auto callback = [promise](const Coordination::MultiResponse & response) mutable |
841 | { |
842 | promise->set_value(response); |
843 | }; |
844 | |
845 | impl->multi(ops, std::move(callback)); |
846 | return future; |
847 | } |
848 | |
849 | std::future<Coordination::MultiResponse> ZooKeeper::asyncMulti(const Coordination::Requests & ops) |
850 | { |
851 | auto promise = std::make_shared<std::promise<Coordination::MultiResponse>>(); |
852 | auto future = promise->get_future(); |
853 | |
854 | auto callback = [promise](const Coordination::MultiResponse & response) mutable |
855 | { |
856 | if (response.error) |
857 | promise->set_exception(std::make_exception_ptr(KeeperException(response.error))); |
858 | else |
859 | promise->set_value(response); |
860 | }; |
861 | |
862 | impl->multi(ops, std::move(callback)); |
863 | return future; |
864 | } |
865 | |
866 | int32_t ZooKeeper::tryMultiNoThrow(const Coordination::Requests & requests, Coordination::Responses & responses) |
867 | { |
868 | try |
869 | { |
870 | return multiImpl(requests, responses); |
871 | } |
872 | catch (const Coordination::Exception & e) |
873 | { |
874 | return e.code; |
875 | } |
876 | } |
877 | |
878 | |
879 | size_t KeeperMultiException::getFailedOpIndex(int32_t exception_code, const Coordination::Responses & responses) |
880 | { |
881 | if (responses.empty()) |
882 | throw DB::Exception("Responses for multi transaction is empty" , DB::ErrorCodes::LOGICAL_ERROR); |
883 | |
884 | for (size_t index = 0, size = responses.size(); index < size; ++index) |
885 | if (responses[index]->error) |
886 | return index; |
887 | |
888 | if (!Coordination::isUserError(exception_code)) |
889 | throw DB::Exception("There are no failed OPs because '" + ZooKeeper::error2string(exception_code) + "' is not valid response code for that" , |
890 | DB::ErrorCodes::LOGICAL_ERROR); |
891 | |
892 | throw DB::Exception("There is no failed OpResult" , DB::ErrorCodes::LOGICAL_ERROR); |
893 | } |
894 | |
895 | |
896 | KeeperMultiException::KeeperMultiException(int32_t exception_code, const Coordination::Requests & requests_, const Coordination::Responses & responses_) |
897 | : KeeperException("Transaction failed" , exception_code), |
898 | requests(requests_), responses(responses_), failed_op_index(getFailedOpIndex(exception_code, responses)) |
899 | { |
900 | addMessage("Op #" + std::to_string(failed_op_index) + ", path: " + getPathForFirstFailedOp()); |
901 | } |
902 | |
903 | |
904 | std::string KeeperMultiException::getPathForFirstFailedOp() const |
905 | { |
906 | return requests[failed_op_index]->getPath(); |
907 | } |
908 | |
909 | void KeeperMultiException::check(int32_t exception_code, const Coordination::Requests & requests, const Coordination::Responses & responses) |
910 | { |
911 | if (!exception_code) |
912 | return; |
913 | |
914 | if (Coordination::isUserError(exception_code)) |
915 | throw KeeperMultiException(exception_code, requests, responses); |
916 | else |
917 | throw KeeperException(exception_code); |
918 | } |
919 | |
920 | |
921 | Coordination::RequestPtr makeCreateRequest(const std::string & path, const std::string & data, int create_mode) |
922 | { |
923 | auto request = std::make_shared<Coordination::CreateRequest>(); |
924 | request->path = path; |
925 | request->data = data; |
926 | request->is_ephemeral = create_mode == CreateMode::Ephemeral || create_mode == CreateMode::EphemeralSequential; |
927 | request->is_sequential = create_mode == CreateMode::PersistentSequential || create_mode == CreateMode::EphemeralSequential; |
928 | return request; |
929 | } |
930 | |
931 | Coordination::RequestPtr makeRemoveRequest(const std::string & path, int version) |
932 | { |
933 | auto request = std::make_shared<Coordination::RemoveRequest>(); |
934 | request->path = path; |
935 | request->version = version; |
936 | return request; |
937 | } |
938 | |
939 | Coordination::RequestPtr makeSetRequest(const std::string & path, const std::string & data, int version) |
940 | { |
941 | auto request = std::make_shared<Coordination::SetRequest>(); |
942 | request->path = path; |
943 | request->data = data; |
944 | request->version = version; |
945 | return request; |
946 | } |
947 | |
948 | Coordination::RequestPtr makeCheckRequest(const std::string & path, int version) |
949 | { |
950 | auto request = std::make_shared<Coordination::CheckRequest>(); |
951 | request->path = path; |
952 | request->version = version; |
953 | return request; |
954 | } |
955 | } |
956 | |