1#include <Common/ZooKeeper/TestKeeper.h>
2#include <Common/setThreadName.h>
3#include <Common/StringUtils/StringUtils.h>
4#include <Core/Types.h>
5
6#include <sstream>
7#include <iomanip>
8
9
10namespace Coordination
11{
12
13static String parentPath(const String & path)
14{
15 auto rslash_pos = path.rfind('/');
16 if (rslash_pos > 0)
17 return path.substr(0, rslash_pos);
18 return "/";
19}
20
21static String baseName(const String & path)
22{
23 auto rslash_pos = path.rfind('/');
24 return path.substr(rslash_pos + 1);
25}
26
27
28struct TestKeeperRequest : virtual Request
29{
30 virtual bool isMutable() const { return false; }
31 virtual ResponsePtr createResponse() const = 0;
32 virtual ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const = 0;
33 virtual void processWatches(TestKeeper::Watches & /*watches*/, TestKeeper::Watches & /*list_watches*/) const {}
34};
35
36
37static void processWatchesImpl(const String & path, TestKeeper::Watches & watches, TestKeeper::Watches & list_watches)
38{
39 WatchResponse watch_response;
40 watch_response.path = path;
41
42 auto it = watches.find(watch_response.path);
43 if (it != watches.end())
44 {
45 for (auto & callback : it->second)
46 if (callback)
47 callback(watch_response);
48
49 watches.erase(it);
50 }
51
52 WatchResponse watch_list_response;
53 watch_list_response.path = parentPath(path);
54
55 it = list_watches.find(watch_list_response.path);
56 if (it != list_watches.end())
57 {
58 for (auto & callback : it->second)
59 if (callback)
60 callback(watch_list_response);
61
62 list_watches.erase(it);
63 }
64}
65
66
67struct TestKeeperCreateRequest final : CreateRequest, TestKeeperRequest
68{
69 TestKeeperCreateRequest() {}
70 TestKeeperCreateRequest(const CreateRequest & base) : CreateRequest(base) {}
71 ResponsePtr createResponse() const override;
72 ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override;
73
74 void processWatches(TestKeeper::Watches & node_watches, TestKeeper::Watches & list_watches) const override
75 {
76 processWatchesImpl(getPath(), node_watches, list_watches);
77 }
78};
79
80struct TestKeeperRemoveRequest final : RemoveRequest, TestKeeperRequest
81{
82 TestKeeperRemoveRequest() {}
83 TestKeeperRemoveRequest(const RemoveRequest & base) : RemoveRequest(base) {}
84 bool isMutable() const override { return true; }
85 ResponsePtr createResponse() const override;
86 ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override;
87
88 void processWatches(TestKeeper::Watches & node_watches, TestKeeper::Watches & list_watches) const override
89 {
90 processWatchesImpl(getPath(), node_watches, list_watches);
91 }
92};
93
94struct TestKeeperExistsRequest final : ExistsRequest, TestKeeperRequest
95{
96 ResponsePtr createResponse() const override;
97 ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override;
98};
99
100struct TestKeeperGetRequest final : GetRequest, TestKeeperRequest
101{
102 TestKeeperGetRequest() {}
103 ResponsePtr createResponse() const override;
104 ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override;
105};
106
107struct TestKeeperSetRequest final : SetRequest, TestKeeperRequest
108{
109 TestKeeperSetRequest() {}
110 TestKeeperSetRequest(const SetRequest & base) : SetRequest(base) {}
111 bool isMutable() const override { return true; }
112 ResponsePtr createResponse() const override;
113 ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override;
114
115 void processWatches(TestKeeper::Watches & node_watches, TestKeeper::Watches & list_watches) const override
116 {
117 processWatchesImpl(getPath(), node_watches, list_watches);
118 }
119};
120
121struct TestKeeperListRequest final : ListRequest, TestKeeperRequest
122{
123 ResponsePtr createResponse() const override;
124 ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override;
125};
126
127struct TestKeeperCheckRequest final : CheckRequest, TestKeeperRequest
128{
129 TestKeeperCheckRequest() {}
130 TestKeeperCheckRequest(const CheckRequest & base) : CheckRequest(base) {}
131 ResponsePtr createResponse() const override;
132 ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override;
133};
134
135struct TestKeeperMultiRequest final : MultiRequest, TestKeeperRequest
136{
137 TestKeeperMultiRequest(const Requests & generic_requests)
138 {
139 requests.reserve(generic_requests.size());
140
141 for (const auto & generic_request : generic_requests)
142 {
143 if (auto * concrete_request_create = dynamic_cast<const CreateRequest *>(generic_request.get()))
144 {
145 auto create = std::make_shared<TestKeeperCreateRequest>(*concrete_request_create);
146 requests.push_back(create);
147 }
148 else if (auto * concrete_request_remove = dynamic_cast<const RemoveRequest *>(generic_request.get()))
149 {
150 requests.push_back(std::make_shared<TestKeeperRemoveRequest>(*concrete_request_remove));
151 }
152 else if (auto * concrete_request_set = dynamic_cast<const SetRequest *>(generic_request.get()))
153 {
154 requests.push_back(std::make_shared<TestKeeperSetRequest>(*concrete_request_set));
155 }
156 else if (auto * concrete_request_check = dynamic_cast<const CheckRequest *>(generic_request.get()))
157 {
158 requests.push_back(std::make_shared<TestKeeperCheckRequest>(*concrete_request_check));
159 }
160 else
161 throw Exception("Illegal command as part of multi ZooKeeper request", ZBADARGUMENTS);
162 }
163 }
164
165 void processWatches(TestKeeper::Watches & node_watches, TestKeeper::Watches & list_watches) const override
166 {
167 for (const auto & generic_request : requests)
168 dynamic_cast<const TestKeeperRequest &>(*generic_request).processWatches(node_watches, list_watches);
169 }
170
171 ResponsePtr createResponse() const override;
172 ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override;
173};
174
175
176ResponsePtr TestKeeperCreateRequest::process(TestKeeper::Container & container, int64_t zxid) const
177{
178 CreateResponse response;
179 if (container.count(path))
180 {
181 response.error = Error::ZNODEEXISTS;
182 }
183 else
184 {
185 auto it = container.find(parentPath(path));
186
187 if (it == container.end())
188 {
189 response.error = Error::ZNONODE;
190 }
191 else if (it->second.is_ephemeral)
192 {
193 response.error = Error::ZNOCHILDRENFOREPHEMERALS;
194 }
195 else
196 {
197 TestKeeper::Node created_node;
198 created_node.seq_num = 0;
199 created_node.stat.czxid = zxid;
200 created_node.stat.mzxid = zxid;
201 created_node.stat.ctime = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1);
202 created_node.stat.mtime = created_node.stat.ctime;
203 created_node.stat.numChildren = 0;
204 created_node.stat.dataLength = data.length();
205 created_node.data = data;
206 created_node.is_ephemeral = is_ephemeral;
207 created_node.is_sequental = is_sequential;
208 std::string path_created = path;
209
210 if (is_sequential)
211 {
212 auto seq_num = it->second.seq_num;
213 ++it->second.seq_num;
214
215 std::stringstream seq_num_str;
216 seq_num_str << std::setw(10) << std::setfill('0') << seq_num;
217
218 path_created += seq_num_str.str();
219 }
220
221 response.path_created = path_created;
222 container.emplace(std::move(path_created), std::move(created_node));
223
224 ++it->second.stat.cversion;
225 ++it->second.stat.numChildren;
226
227 response.error = Error::ZOK;
228 }
229 }
230
231 return std::make_shared<CreateResponse>(response);
232}
233
234ResponsePtr TestKeeperRemoveRequest::process(TestKeeper::Container & container, int64_t) const
235{
236 RemoveResponse response;
237
238 auto it = container.find(path);
239 if (it == container.end())
240 {
241 response.error = Error::ZNONODE;
242 }
243 else if (version != -1 && version != it->second.stat.version)
244 {
245 response.error = Error::ZBADVERSION;
246 }
247 else if (it->second.stat.numChildren)
248 {
249 response.error = Error::ZNOTEMPTY;
250 }
251 else
252 {
253 container.erase(it);
254 auto & parent = container.at(parentPath(path));
255 --parent.stat.numChildren;
256 ++parent.stat.cversion;
257 response.error = Error::ZOK;
258 }
259
260 return std::make_shared<RemoveResponse>(response);
261}
262
263ResponsePtr TestKeeperExistsRequest::process(TestKeeper::Container & container, int64_t) const
264{
265 ExistsResponse response;
266
267 auto it = container.find(path);
268 if (it != container.end())
269 {
270 response.stat = it->second.stat;
271 response.error = Error::ZOK;
272 }
273 else
274 {
275 response.error = Error::ZNONODE;
276 }
277
278 return std::make_shared<ExistsResponse>(response);
279}
280
281ResponsePtr TestKeeperGetRequest::process(TestKeeper::Container & container, int64_t) const
282{
283 GetResponse response;
284
285 auto it = container.find(path);
286 if (it == container.end())
287 {
288 response.error = Error::ZNONODE;
289 }
290 else
291 {
292 response.stat = it->second.stat;
293 response.data = it->second.data;
294 response.error = Error::ZOK;
295 }
296
297 return std::make_shared<GetResponse>(response);
298}
299
300ResponsePtr TestKeeperSetRequest::process(TestKeeper::Container & container, int64_t zxid) const
301{
302 SetResponse response;
303
304 auto it = container.find(path);
305 if (it == container.end())
306 {
307 response.error = Error::ZNONODE;
308 }
309 else if (version == -1 || version == it->second.stat.version)
310 {
311 it->second.data = data;
312 ++it->second.stat.version;
313 it->second.stat.mzxid = zxid;
314 it->second.stat.mtime = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1);
315 it->second.data = data;
316 ++container.at(parentPath(path)).stat.cversion;
317 response.stat = it->second.stat;
318 response.error = Error::ZOK;
319 }
320 else
321 {
322 response.error = Error::ZBADVERSION;
323 }
324
325 return std::make_shared<SetResponse>(response);
326}
327
328ResponsePtr TestKeeperListRequest::process(TestKeeper::Container & container, int64_t) const
329{
330 ListResponse response;
331
332 auto it = container.find(path);
333 if (it == container.end())
334 {
335 response.error = Error::ZNONODE;
336 }
337 else
338 {
339 auto path_prefix = path;
340 if (path_prefix.empty())
341 throw Exception("Logical error: path cannot be empty", ZSESSIONEXPIRED);
342
343 if (path_prefix.back() != '/')
344 path_prefix += '/';
345
346 /// Fairly inefficient.
347 for (auto child_it = container.upper_bound(path_prefix); child_it != container.end() && startsWith(child_it->first, path_prefix); ++child_it)
348 if (parentPath(child_it->first) == path)
349 response.names.emplace_back(baseName(child_it->first));
350
351 response.stat = it->second.stat;
352 response.error = Error::ZOK;
353 }
354
355 return std::make_shared<ListResponse>(response);
356}
357
358ResponsePtr TestKeeperCheckRequest::process(TestKeeper::Container & container, int64_t) const
359{
360 CheckResponse response;
361 auto it = container.find(path);
362 if (it == container.end())
363 {
364 response.error = Error::ZNONODE;
365 }
366 else if (version != -1 && version != it->second.stat.version)
367 {
368 response.error = Error::ZBADVERSION;
369 }
370 else
371 {
372 response.error = Error::ZOK;
373 }
374
375 return std::make_shared<CheckResponse>(response);
376}
377
378ResponsePtr TestKeeperMultiRequest::process(TestKeeper::Container & container, int64_t zxid) const
379{
380 MultiResponse response;
381 response.responses.reserve(requests.size());
382
383 /// Fairly inefficient.
384 auto container_copy = container;
385
386 try
387 {
388 for (const auto & request : requests)
389 {
390 const TestKeeperRequest & concrete_request = dynamic_cast<const TestKeeperRequest &>(*request);
391 auto cur_response = concrete_request.process(container, zxid);
392 response.responses.emplace_back(cur_response);
393 if (cur_response->error != Error::ZOK)
394 {
395 response.error = cur_response->error;
396 container = container_copy;
397 return std::make_shared<MultiResponse>(response);
398 }
399 }
400
401 response.error = Error::ZOK;
402 return std::make_shared<MultiResponse>(response);
403 }
404 catch (...)
405 {
406 container = container_copy;
407 throw;
408 }
409}
410
411ResponsePtr TestKeeperCreateRequest::createResponse() const { return std::make_shared<CreateResponse>(); }
412ResponsePtr TestKeeperRemoveRequest::createResponse() const { return std::make_shared<RemoveResponse>(); }
413ResponsePtr TestKeeperExistsRequest::createResponse() const { return std::make_shared<ExistsResponse>(); }
414ResponsePtr TestKeeperGetRequest::createResponse() const { return std::make_shared<GetResponse>(); }
415ResponsePtr TestKeeperSetRequest::createResponse() const { return std::make_shared<SetResponse>(); }
416ResponsePtr TestKeeperListRequest::createResponse() const { return std::make_shared<ListResponse>(); }
417ResponsePtr TestKeeperCheckRequest::createResponse() const { return std::make_shared<CheckResponse>(); }
418ResponsePtr TestKeeperMultiRequest::createResponse() const { return std::make_shared<MultiResponse>(); }
419
420
421TestKeeper::TestKeeper(const String & root_path_, Poco::Timespan operation_timeout_)
422 : root_path(root_path_), operation_timeout(operation_timeout_)
423{
424 container.emplace("/", Node());
425
426 if (!root_path.empty())
427 {
428 if (root_path.back() == '/')
429 root_path.pop_back();
430 }
431
432 processing_thread = ThreadFromGlobalPool([this] { processingThread(); });
433}
434
435
436TestKeeper::~TestKeeper()
437{
438 try
439 {
440 finalize();
441 if (processing_thread.joinable())
442 processing_thread.join();
443 }
444 catch (...)
445 {
446 tryLogCurrentException(__PRETTY_FUNCTION__);
447 }
448}
449
450
451void TestKeeper::processingThread()
452{
453 setThreadName("TestKeeperProc");
454
455 try
456 {
457 while (!expired)
458 {
459 RequestInfo info;
460
461 UInt64 max_wait = UInt64(operation_timeout.totalMilliseconds());
462 if (requests_queue.tryPop(info, max_wait))
463 {
464 if (expired)
465 break;
466
467 if (info.watch)
468 {
469 auto & watches_type = dynamic_cast<const ListRequest *>(info.request.get())
470 ? list_watches
471 : watches;
472
473 watches_type[info.request->getPath()].emplace_back(std::move(info.watch));
474 }
475
476 ++zxid;
477
478 info.request->addRootPath(root_path);
479 ResponsePtr response = info.request->process(container, zxid);
480 if (response->error == Error::ZOK)
481 info.request->processWatches(watches, list_watches);
482
483 response->removeRootPath(root_path);
484 if (info.callback)
485 info.callback(*response);
486 }
487 }
488 }
489 catch (...)
490 {
491 tryLogCurrentException(__PRETTY_FUNCTION__);
492 finalize();
493 }
494}
495
496
497void TestKeeper::finalize()
498{
499 {
500 std::lock_guard lock(push_request_mutex);
501
502 if (expired)
503 return;
504 expired = true;
505 }
506
507 processing_thread.join();
508
509 try
510 {
511 {
512 for (auto & path_watch : watches)
513 {
514 WatchResponse response;
515 response.type = SESSION;
516 response.state = EXPIRED_SESSION;
517 response.error = ZSESSIONEXPIRED;
518
519 for (auto & callback : path_watch.second)
520 {
521 if (callback)
522 {
523 try
524 {
525 callback(response);
526 }
527 catch (...)
528 {
529 tryLogCurrentException(__PRETTY_FUNCTION__);
530 }
531 }
532 }
533 }
534
535 watches.clear();
536 }
537
538 RequestInfo info;
539 while (requests_queue.tryPop(info))
540 {
541 if (info.callback)
542 {
543 ResponsePtr response = info.request->createResponse();
544 response->error = ZSESSIONEXPIRED;
545 try
546 {
547 info.callback(*response);
548 }
549 catch (...)
550 {
551 tryLogCurrentException(__PRETTY_FUNCTION__);
552 }
553 }
554 if (info.watch)
555 {
556 WatchResponse response;
557 response.type = SESSION;
558 response.state = EXPIRED_SESSION;
559 response.error = ZSESSIONEXPIRED;
560 try
561 {
562 info.watch(response);
563 }
564 catch (...)
565 {
566 tryLogCurrentException(__PRETTY_FUNCTION__);
567 }
568 }
569 }
570 }
571 catch (...)
572 {
573 tryLogCurrentException(__PRETTY_FUNCTION__);
574 }
575}
576
577void TestKeeper::pushRequest(RequestInfo && info)
578{
579 try
580 {
581 info.time = clock::now();
582
583 /// We must serialize 'pushRequest' and 'finalize' (from processingThread) calls
584 /// to avoid forgotten operations in the queue when session is expired.
585 /// Invariant: when expired, no new operations will be pushed to the queue in 'pushRequest'
586 /// and the queue will be drained in 'finalize'.
587 std::lock_guard lock(push_request_mutex);
588
589 if (expired)
590 throw Exception("Session expired", ZSESSIONEXPIRED);
591
592 if (!requests_queue.tryPush(std::move(info), operation_timeout.totalMilliseconds()))
593 throw Exception("Cannot push request to queue within operation timeout", ZOPERATIONTIMEOUT);
594 }
595 catch (...)
596 {
597 finalize();
598 throw;
599 }
600}
601
602
603void TestKeeper::create(
604 const String & path,
605 const String & data,
606 bool is_ephemeral,
607 bool is_sequential,
608 const ACLs &,
609 CreateCallback callback)
610{
611 TestKeeperCreateRequest request;
612 request.path = path;
613 request.data = data;
614 request.is_ephemeral = is_ephemeral;
615 request.is_sequential = is_sequential;
616
617 RequestInfo request_info;
618 request_info.request = std::make_shared<TestKeeperCreateRequest>(std::move(request));
619 request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const CreateResponse &>(response)); };
620 pushRequest(std::move(request_info));
621}
622
623void TestKeeper::remove(
624 const String & path,
625 int32_t version,
626 RemoveCallback callback)
627{
628 TestKeeperRemoveRequest request;
629 request.path = path;
630 request.version = version;
631
632 RequestInfo request_info;
633 request_info.request = std::make_shared<TestKeeperRemoveRequest>(std::move(request));
634 request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const RemoveResponse &>(response)); };
635 pushRequest(std::move(request_info));
636}
637
638void TestKeeper::exists(
639 const String & path,
640 ExistsCallback callback,
641 WatchCallback watch)
642{
643 TestKeeperExistsRequest request;
644 request.path = path;
645
646 RequestInfo request_info;
647 request_info.request = std::make_shared<TestKeeperExistsRequest>(std::move(request));
648 request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const ExistsResponse &>(response)); };
649 request_info.watch = watch;
650 pushRequest(std::move(request_info));
651}
652
653void TestKeeper::get(
654 const String & path,
655 GetCallback callback,
656 WatchCallback watch)
657{
658 TestKeeperGetRequest request;
659 request.path = path;
660
661 RequestInfo request_info;
662 request_info.request = std::make_shared<TestKeeperGetRequest>(std::move(request));
663 request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const GetResponse &>(response)); };
664 request_info.watch = watch;
665 pushRequest(std::move(request_info));
666}
667
668void TestKeeper::set(
669 const String & path,
670 const String & data,
671 int32_t version,
672 SetCallback callback)
673{
674 TestKeeperSetRequest request;
675 request.path = path;
676 request.data = data;
677 request.version = version;
678
679 RequestInfo request_info;
680 request_info.request = std::make_shared<TestKeeperSetRequest>(std::move(request));
681 request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const SetResponse &>(response)); };
682 pushRequest(std::move(request_info));
683}
684
685void TestKeeper::list(
686 const String & path,
687 ListCallback callback,
688 WatchCallback watch)
689{
690 TestKeeperListRequest request;
691 request.path = path;
692
693 RequestInfo request_info;
694 request_info.request = std::make_shared<TestKeeperListRequest>(std::move(request));
695 request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const ListResponse &>(response)); };
696 request_info.watch = watch;
697 pushRequest(std::move(request_info));
698}
699
700void TestKeeper::check(
701 const String & path,
702 int32_t version,
703 CheckCallback callback)
704{
705 TestKeeperCheckRequest request;
706 request.path = path;
707 request.version = version;
708
709 RequestInfo request_info;
710 request_info.request = std::make_shared<TestKeeperCheckRequest>(std::move(request));
711 request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const CheckResponse &>(response)); };
712 pushRequest(std::move(request_info));
713}
714
715void TestKeeper::multi(
716 const Requests & requests,
717 MultiCallback callback)
718{
719 TestKeeperMultiRequest request(requests);
720
721 RequestInfo request_info;
722 request_info.request = std::make_shared<TestKeeperMultiRequest>(std::move(request));
723 request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const MultiResponse &>(response)); };
724 pushRequest(std::move(request_info));
725}
726
727}
728