1 | #include <Common/ZooKeeper/TestKeeper.h> |
2 | #include <Common/setThreadName.h> |
3 | #include <Common/StringUtils/StringUtils.h> |
4 | #include <Core/Types.h> |
5 | |
6 | #include <sstream> |
7 | #include <iomanip> |
8 | |
9 | |
10 | namespace Coordination |
11 | { |
12 | |
13 | static String parentPath(const String & path) |
14 | { |
15 | auto rslash_pos = path.rfind('/'); |
16 | if (rslash_pos > 0) |
17 | return path.substr(0, rslash_pos); |
18 | return "/" ; |
19 | } |
20 | |
21 | static String baseName(const String & path) |
22 | { |
23 | auto rslash_pos = path.rfind('/'); |
24 | return path.substr(rslash_pos + 1); |
25 | } |
26 | |
27 | |
28 | struct TestKeeperRequest : virtual Request |
29 | { |
30 | virtual bool isMutable() const { return false; } |
31 | virtual ResponsePtr createResponse() const = 0; |
32 | virtual ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const = 0; |
33 | virtual void processWatches(TestKeeper::Watches & /*watches*/, TestKeeper::Watches & /*list_watches*/) const {} |
34 | }; |
35 | |
36 | |
37 | static void processWatchesImpl(const String & path, TestKeeper::Watches & watches, TestKeeper::Watches & list_watches) |
38 | { |
39 | WatchResponse watch_response; |
40 | watch_response.path = path; |
41 | |
42 | auto it = watches.find(watch_response.path); |
43 | if (it != watches.end()) |
44 | { |
45 | for (auto & callback : it->second) |
46 | if (callback) |
47 | callback(watch_response); |
48 | |
49 | watches.erase(it); |
50 | } |
51 | |
52 | WatchResponse watch_list_response; |
53 | watch_list_response.path = parentPath(path); |
54 | |
55 | it = list_watches.find(watch_list_response.path); |
56 | if (it != list_watches.end()) |
57 | { |
58 | for (auto & callback : it->second) |
59 | if (callback) |
60 | callback(watch_list_response); |
61 | |
62 | list_watches.erase(it); |
63 | } |
64 | } |
65 | |
66 | |
67 | struct TestKeeperCreateRequest final : CreateRequest, TestKeeperRequest |
68 | { |
69 | TestKeeperCreateRequest() {} |
70 | TestKeeperCreateRequest(const CreateRequest & base) : CreateRequest(base) {} |
71 | ResponsePtr createResponse() const override; |
72 | ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override; |
73 | |
74 | void processWatches(TestKeeper::Watches & node_watches, TestKeeper::Watches & list_watches) const override |
75 | { |
76 | processWatchesImpl(getPath(), node_watches, list_watches); |
77 | } |
78 | }; |
79 | |
80 | struct TestKeeperRemoveRequest final : RemoveRequest, TestKeeperRequest |
81 | { |
82 | TestKeeperRemoveRequest() {} |
83 | TestKeeperRemoveRequest(const RemoveRequest & base) : RemoveRequest(base) {} |
84 | bool isMutable() const override { return true; } |
85 | ResponsePtr createResponse() const override; |
86 | ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override; |
87 | |
88 | void processWatches(TestKeeper::Watches & node_watches, TestKeeper::Watches & list_watches) const override |
89 | { |
90 | processWatchesImpl(getPath(), node_watches, list_watches); |
91 | } |
92 | }; |
93 | |
94 | struct TestKeeperExistsRequest final : ExistsRequest, TestKeeperRequest |
95 | { |
96 | ResponsePtr createResponse() const override; |
97 | ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override; |
98 | }; |
99 | |
100 | struct TestKeeperGetRequest final : GetRequest, TestKeeperRequest |
101 | { |
102 | TestKeeperGetRequest() {} |
103 | ResponsePtr createResponse() const override; |
104 | ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override; |
105 | }; |
106 | |
107 | struct TestKeeperSetRequest final : SetRequest, TestKeeperRequest |
108 | { |
109 | TestKeeperSetRequest() {} |
110 | TestKeeperSetRequest(const SetRequest & base) : SetRequest(base) {} |
111 | bool isMutable() const override { return true; } |
112 | ResponsePtr createResponse() const override; |
113 | ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override; |
114 | |
115 | void processWatches(TestKeeper::Watches & node_watches, TestKeeper::Watches & list_watches) const override |
116 | { |
117 | processWatchesImpl(getPath(), node_watches, list_watches); |
118 | } |
119 | }; |
120 | |
121 | struct TestKeeperListRequest final : ListRequest, TestKeeperRequest |
122 | { |
123 | ResponsePtr createResponse() const override; |
124 | ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override; |
125 | }; |
126 | |
127 | struct TestKeeperCheckRequest final : CheckRequest, TestKeeperRequest |
128 | { |
129 | TestKeeperCheckRequest() {} |
130 | TestKeeperCheckRequest(const CheckRequest & base) : CheckRequest(base) {} |
131 | ResponsePtr createResponse() const override; |
132 | ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override; |
133 | }; |
134 | |
135 | struct TestKeeperMultiRequest final : MultiRequest, TestKeeperRequest |
136 | { |
137 | TestKeeperMultiRequest(const Requests & generic_requests) |
138 | { |
139 | requests.reserve(generic_requests.size()); |
140 | |
141 | for (const auto & generic_request : generic_requests) |
142 | { |
143 | if (auto * concrete_request_create = dynamic_cast<const CreateRequest *>(generic_request.get())) |
144 | { |
145 | auto create = std::make_shared<TestKeeperCreateRequest>(*concrete_request_create); |
146 | requests.push_back(create); |
147 | } |
148 | else if (auto * concrete_request_remove = dynamic_cast<const RemoveRequest *>(generic_request.get())) |
149 | { |
150 | requests.push_back(std::make_shared<TestKeeperRemoveRequest>(*concrete_request_remove)); |
151 | } |
152 | else if (auto * concrete_request_set = dynamic_cast<const SetRequest *>(generic_request.get())) |
153 | { |
154 | requests.push_back(std::make_shared<TestKeeperSetRequest>(*concrete_request_set)); |
155 | } |
156 | else if (auto * concrete_request_check = dynamic_cast<const CheckRequest *>(generic_request.get())) |
157 | { |
158 | requests.push_back(std::make_shared<TestKeeperCheckRequest>(*concrete_request_check)); |
159 | } |
160 | else |
161 | throw Exception("Illegal command as part of multi ZooKeeper request" , ZBADARGUMENTS); |
162 | } |
163 | } |
164 | |
165 | void processWatches(TestKeeper::Watches & node_watches, TestKeeper::Watches & list_watches) const override |
166 | { |
167 | for (const auto & generic_request : requests) |
168 | dynamic_cast<const TestKeeperRequest &>(*generic_request).processWatches(node_watches, list_watches); |
169 | } |
170 | |
171 | ResponsePtr createResponse() const override; |
172 | ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override; |
173 | }; |
174 | |
175 | |
176 | ResponsePtr TestKeeperCreateRequest::process(TestKeeper::Container & container, int64_t zxid) const |
177 | { |
178 | CreateResponse response; |
179 | if (container.count(path)) |
180 | { |
181 | response.error = Error::ZNODEEXISTS; |
182 | } |
183 | else |
184 | { |
185 | auto it = container.find(parentPath(path)); |
186 | |
187 | if (it == container.end()) |
188 | { |
189 | response.error = Error::ZNONODE; |
190 | } |
191 | else if (it->second.is_ephemeral) |
192 | { |
193 | response.error = Error::ZNOCHILDRENFOREPHEMERALS; |
194 | } |
195 | else |
196 | { |
197 | TestKeeper::Node created_node; |
198 | created_node.seq_num = 0; |
199 | created_node.stat.czxid = zxid; |
200 | created_node.stat.mzxid = zxid; |
201 | created_node.stat.ctime = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1); |
202 | created_node.stat.mtime = created_node.stat.ctime; |
203 | created_node.stat.numChildren = 0; |
204 | created_node.stat.dataLength = data.length(); |
205 | created_node.data = data; |
206 | created_node.is_ephemeral = is_ephemeral; |
207 | created_node.is_sequental = is_sequential; |
208 | std::string path_created = path; |
209 | |
210 | if (is_sequential) |
211 | { |
212 | auto seq_num = it->second.seq_num; |
213 | ++it->second.seq_num; |
214 | |
215 | std::stringstream seq_num_str; |
216 | seq_num_str << std::setw(10) << std::setfill('0') << seq_num; |
217 | |
218 | path_created += seq_num_str.str(); |
219 | } |
220 | |
221 | response.path_created = path_created; |
222 | container.emplace(std::move(path_created), std::move(created_node)); |
223 | |
224 | ++it->second.stat.cversion; |
225 | ++it->second.stat.numChildren; |
226 | |
227 | response.error = Error::ZOK; |
228 | } |
229 | } |
230 | |
231 | return std::make_shared<CreateResponse>(response); |
232 | } |
233 | |
234 | ResponsePtr TestKeeperRemoveRequest::process(TestKeeper::Container & container, int64_t) const |
235 | { |
236 | RemoveResponse response; |
237 | |
238 | auto it = container.find(path); |
239 | if (it == container.end()) |
240 | { |
241 | response.error = Error::ZNONODE; |
242 | } |
243 | else if (version != -1 && version != it->second.stat.version) |
244 | { |
245 | response.error = Error::ZBADVERSION; |
246 | } |
247 | else if (it->second.stat.numChildren) |
248 | { |
249 | response.error = Error::ZNOTEMPTY; |
250 | } |
251 | else |
252 | { |
253 | container.erase(it); |
254 | auto & parent = container.at(parentPath(path)); |
255 | --parent.stat.numChildren; |
256 | ++parent.stat.cversion; |
257 | response.error = Error::ZOK; |
258 | } |
259 | |
260 | return std::make_shared<RemoveResponse>(response); |
261 | } |
262 | |
263 | ResponsePtr TestKeeperExistsRequest::process(TestKeeper::Container & container, int64_t) const |
264 | { |
265 | ExistsResponse response; |
266 | |
267 | auto it = container.find(path); |
268 | if (it != container.end()) |
269 | { |
270 | response.stat = it->second.stat; |
271 | response.error = Error::ZOK; |
272 | } |
273 | else |
274 | { |
275 | response.error = Error::ZNONODE; |
276 | } |
277 | |
278 | return std::make_shared<ExistsResponse>(response); |
279 | } |
280 | |
281 | ResponsePtr TestKeeperGetRequest::process(TestKeeper::Container & container, int64_t) const |
282 | { |
283 | GetResponse response; |
284 | |
285 | auto it = container.find(path); |
286 | if (it == container.end()) |
287 | { |
288 | response.error = Error::ZNONODE; |
289 | } |
290 | else |
291 | { |
292 | response.stat = it->second.stat; |
293 | response.data = it->second.data; |
294 | response.error = Error::ZOK; |
295 | } |
296 | |
297 | return std::make_shared<GetResponse>(response); |
298 | } |
299 | |
300 | ResponsePtr TestKeeperSetRequest::process(TestKeeper::Container & container, int64_t zxid) const |
301 | { |
302 | SetResponse response; |
303 | |
304 | auto it = container.find(path); |
305 | if (it == container.end()) |
306 | { |
307 | response.error = Error::ZNONODE; |
308 | } |
309 | else if (version == -1 || version == it->second.stat.version) |
310 | { |
311 | it->second.data = data; |
312 | ++it->second.stat.version; |
313 | it->second.stat.mzxid = zxid; |
314 | it->second.stat.mtime = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1); |
315 | it->second.data = data; |
316 | ++container.at(parentPath(path)).stat.cversion; |
317 | response.stat = it->second.stat; |
318 | response.error = Error::ZOK; |
319 | } |
320 | else |
321 | { |
322 | response.error = Error::ZBADVERSION; |
323 | } |
324 | |
325 | return std::make_shared<SetResponse>(response); |
326 | } |
327 | |
328 | ResponsePtr TestKeeperListRequest::process(TestKeeper::Container & container, int64_t) const |
329 | { |
330 | ListResponse response; |
331 | |
332 | auto it = container.find(path); |
333 | if (it == container.end()) |
334 | { |
335 | response.error = Error::ZNONODE; |
336 | } |
337 | else |
338 | { |
339 | auto path_prefix = path; |
340 | if (path_prefix.empty()) |
341 | throw Exception("Logical error: path cannot be empty" , ZSESSIONEXPIRED); |
342 | |
343 | if (path_prefix.back() != '/') |
344 | path_prefix += '/'; |
345 | |
346 | /// Fairly inefficient. |
347 | for (auto child_it = container.upper_bound(path_prefix); child_it != container.end() && startsWith(child_it->first, path_prefix); ++child_it) |
348 | if (parentPath(child_it->first) == path) |
349 | response.names.emplace_back(baseName(child_it->first)); |
350 | |
351 | response.stat = it->second.stat; |
352 | response.error = Error::ZOK; |
353 | } |
354 | |
355 | return std::make_shared<ListResponse>(response); |
356 | } |
357 | |
358 | ResponsePtr TestKeeperCheckRequest::process(TestKeeper::Container & container, int64_t) const |
359 | { |
360 | CheckResponse response; |
361 | auto it = container.find(path); |
362 | if (it == container.end()) |
363 | { |
364 | response.error = Error::ZNONODE; |
365 | } |
366 | else if (version != -1 && version != it->second.stat.version) |
367 | { |
368 | response.error = Error::ZBADVERSION; |
369 | } |
370 | else |
371 | { |
372 | response.error = Error::ZOK; |
373 | } |
374 | |
375 | return std::make_shared<CheckResponse>(response); |
376 | } |
377 | |
378 | ResponsePtr TestKeeperMultiRequest::process(TestKeeper::Container & container, int64_t zxid) const |
379 | { |
380 | MultiResponse response; |
381 | response.responses.reserve(requests.size()); |
382 | |
383 | /// Fairly inefficient. |
384 | auto container_copy = container; |
385 | |
386 | try |
387 | { |
388 | for (const auto & request : requests) |
389 | { |
390 | const TestKeeperRequest & concrete_request = dynamic_cast<const TestKeeperRequest &>(*request); |
391 | auto cur_response = concrete_request.process(container, zxid); |
392 | response.responses.emplace_back(cur_response); |
393 | if (cur_response->error != Error::ZOK) |
394 | { |
395 | response.error = cur_response->error; |
396 | container = container_copy; |
397 | return std::make_shared<MultiResponse>(response); |
398 | } |
399 | } |
400 | |
401 | response.error = Error::ZOK; |
402 | return std::make_shared<MultiResponse>(response); |
403 | } |
404 | catch (...) |
405 | { |
406 | container = container_copy; |
407 | throw; |
408 | } |
409 | } |
410 | |
411 | ResponsePtr TestKeeperCreateRequest::createResponse() const { return std::make_shared<CreateResponse>(); } |
412 | ResponsePtr TestKeeperRemoveRequest::createResponse() const { return std::make_shared<RemoveResponse>(); } |
413 | ResponsePtr TestKeeperExistsRequest::createResponse() const { return std::make_shared<ExistsResponse>(); } |
414 | ResponsePtr TestKeeperGetRequest::createResponse() const { return std::make_shared<GetResponse>(); } |
415 | ResponsePtr TestKeeperSetRequest::createResponse() const { return std::make_shared<SetResponse>(); } |
416 | ResponsePtr TestKeeperListRequest::createResponse() const { return std::make_shared<ListResponse>(); } |
417 | ResponsePtr TestKeeperCheckRequest::createResponse() const { return std::make_shared<CheckResponse>(); } |
418 | ResponsePtr TestKeeperMultiRequest::createResponse() const { return std::make_shared<MultiResponse>(); } |
419 | |
420 | |
421 | TestKeeper::TestKeeper(const String & root_path_, Poco::Timespan operation_timeout_) |
422 | : root_path(root_path_), operation_timeout(operation_timeout_) |
423 | { |
424 | container.emplace("/" , Node()); |
425 | |
426 | if (!root_path.empty()) |
427 | { |
428 | if (root_path.back() == '/') |
429 | root_path.pop_back(); |
430 | } |
431 | |
432 | processing_thread = ThreadFromGlobalPool([this] { processingThread(); }); |
433 | } |
434 | |
435 | |
/// Shuts the keeper down: expires the session via finalize() and makes sure the processing
/// thread has been joined. Exceptions are logged and never leave the destructor.
TestKeeper::~TestKeeper()
{
    try
    {
        finalize();
        /// finalize() returns early when the session is already expired, so the thread may still
        /// need joining here.
        if (processing_thread.joinable())
            processing_thread.join();
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}
449 | |
450 | |
451 | void TestKeeper::processingThread() |
452 | { |
453 | setThreadName("TestKeeperProc" ); |
454 | |
455 | try |
456 | { |
457 | while (!expired) |
458 | { |
459 | RequestInfo info; |
460 | |
461 | UInt64 max_wait = UInt64(operation_timeout.totalMilliseconds()); |
462 | if (requests_queue.tryPop(info, max_wait)) |
463 | { |
464 | if (expired) |
465 | break; |
466 | |
467 | if (info.watch) |
468 | { |
469 | auto & watches_type = dynamic_cast<const ListRequest *>(info.request.get()) |
470 | ? list_watches |
471 | : watches; |
472 | |
473 | watches_type[info.request->getPath()].emplace_back(std::move(info.watch)); |
474 | } |
475 | |
476 | ++zxid; |
477 | |
478 | info.request->addRootPath(root_path); |
479 | ResponsePtr response = info.request->process(container, zxid); |
480 | if (response->error == Error::ZOK) |
481 | info.request->processWatches(watches, list_watches); |
482 | |
483 | response->removeRootPath(root_path); |
484 | if (info.callback) |
485 | info.callback(*response); |
486 | } |
487 | } |
488 | } |
489 | catch (...) |
490 | { |
491 | tryLogCurrentException(__PRETTY_FUNCTION__); |
492 | finalize(); |
493 | } |
494 | } |
495 | |
496 | |
497 | void TestKeeper::finalize() |
498 | { |
499 | { |
500 | std::lock_guard lock(push_request_mutex); |
501 | |
502 | if (expired) |
503 | return; |
504 | expired = true; |
505 | } |
506 | |
507 | processing_thread.join(); |
508 | |
509 | try |
510 | { |
511 | { |
512 | for (auto & path_watch : watches) |
513 | { |
514 | WatchResponse response; |
515 | response.type = SESSION; |
516 | response.state = EXPIRED_SESSION; |
517 | response.error = ZSESSIONEXPIRED; |
518 | |
519 | for (auto & callback : path_watch.second) |
520 | { |
521 | if (callback) |
522 | { |
523 | try |
524 | { |
525 | callback(response); |
526 | } |
527 | catch (...) |
528 | { |
529 | tryLogCurrentException(__PRETTY_FUNCTION__); |
530 | } |
531 | } |
532 | } |
533 | } |
534 | |
535 | watches.clear(); |
536 | } |
537 | |
538 | RequestInfo info; |
539 | while (requests_queue.tryPop(info)) |
540 | { |
541 | if (info.callback) |
542 | { |
543 | ResponsePtr response = info.request->createResponse(); |
544 | response->error = ZSESSIONEXPIRED; |
545 | try |
546 | { |
547 | info.callback(*response); |
548 | } |
549 | catch (...) |
550 | { |
551 | tryLogCurrentException(__PRETTY_FUNCTION__); |
552 | } |
553 | } |
554 | if (info.watch) |
555 | { |
556 | WatchResponse response; |
557 | response.type = SESSION; |
558 | response.state = EXPIRED_SESSION; |
559 | response.error = ZSESSIONEXPIRED; |
560 | try |
561 | { |
562 | info.watch(response); |
563 | } |
564 | catch (...) |
565 | { |
566 | tryLogCurrentException(__PRETTY_FUNCTION__); |
567 | } |
568 | } |
569 | } |
570 | } |
571 | catch (...) |
572 | { |
573 | tryLogCurrentException(__PRETTY_FUNCTION__); |
574 | } |
575 | } |
576 | |
/// Stamps the request with the current time and puts it into the processing queue.
/// Throws Exception with ZSESSIONEXPIRED if the session is already expired, or with ZOPERATIONTIMEOUT
/// if the queue does not accept the request within the operation timeout.
/// On any exception the session is finalized before the exception is rethrown.
void TestKeeper::pushRequest(RequestInfo && info)
{
    try
    {
        info.time = clock::now();

        /// We must serialize 'pushRequest' and 'finalize' (from processingThread) calls
        /// to avoid forgotten operations in the queue when session is expired.
        /// Invariant: when expired, no new operations will be pushed to the queue in 'pushRequest'
        /// and the queue will be drained in 'finalize'.
        std::lock_guard lock(push_request_mutex);

        if (expired)
            throw Exception("Session expired", ZSESSIONEXPIRED);

        if (!requests_queue.tryPush(std::move(info), operation_timeout.totalMilliseconds()))
            throw Exception("Cannot push request to queue within operation timeout", ZOPERATIONTIMEOUT);
    }
    catch (...)
    {
        /// The lock above is scoped to the try block, so it is already released here;
        /// finalize() takes the same mutex itself.
        finalize();
        throw;
    }
}
601 | |
602 | |
603 | void TestKeeper::create( |
604 | const String & path, |
605 | const String & data, |
606 | bool is_ephemeral, |
607 | bool is_sequential, |
608 | const ACLs &, |
609 | CreateCallback callback) |
610 | { |
611 | TestKeeperCreateRequest request; |
612 | request.path = path; |
613 | request.data = data; |
614 | request.is_ephemeral = is_ephemeral; |
615 | request.is_sequential = is_sequential; |
616 | |
617 | RequestInfo request_info; |
618 | request_info.request = std::make_shared<TestKeeperCreateRequest>(std::move(request)); |
619 | request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const CreateResponse &>(response)); }; |
620 | pushRequest(std::move(request_info)); |
621 | } |
622 | |
623 | void TestKeeper::remove( |
624 | const String & path, |
625 | int32_t version, |
626 | RemoveCallback callback) |
627 | { |
628 | TestKeeperRemoveRequest request; |
629 | request.path = path; |
630 | request.version = version; |
631 | |
632 | RequestInfo request_info; |
633 | request_info.request = std::make_shared<TestKeeperRemoveRequest>(std::move(request)); |
634 | request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const RemoveResponse &>(response)); }; |
635 | pushRequest(std::move(request_info)); |
636 | } |
637 | |
638 | void TestKeeper::exists( |
639 | const String & path, |
640 | ExistsCallback callback, |
641 | WatchCallback watch) |
642 | { |
643 | TestKeeperExistsRequest request; |
644 | request.path = path; |
645 | |
646 | RequestInfo request_info; |
647 | request_info.request = std::make_shared<TestKeeperExistsRequest>(std::move(request)); |
648 | request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const ExistsResponse &>(response)); }; |
649 | request_info.watch = watch; |
650 | pushRequest(std::move(request_info)); |
651 | } |
652 | |
653 | void TestKeeper::get( |
654 | const String & path, |
655 | GetCallback callback, |
656 | WatchCallback watch) |
657 | { |
658 | TestKeeperGetRequest request; |
659 | request.path = path; |
660 | |
661 | RequestInfo request_info; |
662 | request_info.request = std::make_shared<TestKeeperGetRequest>(std::move(request)); |
663 | request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const GetResponse &>(response)); }; |
664 | request_info.watch = watch; |
665 | pushRequest(std::move(request_info)); |
666 | } |
667 | |
668 | void TestKeeper::set( |
669 | const String & path, |
670 | const String & data, |
671 | int32_t version, |
672 | SetCallback callback) |
673 | { |
674 | TestKeeperSetRequest request; |
675 | request.path = path; |
676 | request.data = data; |
677 | request.version = version; |
678 | |
679 | RequestInfo request_info; |
680 | request_info.request = std::make_shared<TestKeeperSetRequest>(std::move(request)); |
681 | request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const SetResponse &>(response)); }; |
682 | pushRequest(std::move(request_info)); |
683 | } |
684 | |
685 | void TestKeeper::list( |
686 | const String & path, |
687 | ListCallback callback, |
688 | WatchCallback watch) |
689 | { |
690 | TestKeeperListRequest request; |
691 | request.path = path; |
692 | |
693 | RequestInfo request_info; |
694 | request_info.request = std::make_shared<TestKeeperListRequest>(std::move(request)); |
695 | request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const ListResponse &>(response)); }; |
696 | request_info.watch = watch; |
697 | pushRequest(std::move(request_info)); |
698 | } |
699 | |
700 | void TestKeeper::check( |
701 | const String & path, |
702 | int32_t version, |
703 | CheckCallback callback) |
704 | { |
705 | TestKeeperCheckRequest request; |
706 | request.path = path; |
707 | request.version = version; |
708 | |
709 | RequestInfo request_info; |
710 | request_info.request = std::make_shared<TestKeeperCheckRequest>(std::move(request)); |
711 | request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const CheckResponse &>(response)); }; |
712 | pushRequest(std::move(request_info)); |
713 | } |
714 | |
715 | void TestKeeper::multi( |
716 | const Requests & requests, |
717 | MultiCallback callback) |
718 | { |
719 | TestKeeperMultiRequest request(requests); |
720 | |
721 | RequestInfo request_info; |
722 | request_info.request = std::make_shared<TestKeeperMultiRequest>(std::move(request)); |
723 | request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const MultiResponse &>(response)); }; |
724 | pushRequest(std::move(request_info)); |
725 | } |
726 | |
727 | } |
728 | |