1#pragma once
2
3#include <mutex>
4#include <map>
5#include <atomic>
6#include <thread>
7#include <chrono>
8
9#include <Poco/Timespan.h>
10#include <Common/ZooKeeper/IKeeper.h>
11#include <Common/ThreadPool.h>
12#include <Common/ConcurrentBoundedQueue.h>
13
14
15namespace Coordination
16{
17
18struct TestKeeperRequest;
19using TestKeeperRequestPtr = std::shared_ptr<TestKeeperRequest>;
20
21
22/** Looks like ZooKeeper but stores all data in memory of server process.
23 * All data is not shared between different servers and is lost after server restart.
24 *
25 * The only purpose is to more simple testing for interaction with ZooKeeper within a single server.
26 * This still makes sense, because multiple replicas of a single table can be created on a single server,
27 * and it is used to test replication logic.
28 *
29 * Does not support ACLs. Does not support NULL node values.
30 *
31 * NOTE: You can add various failure modes for better testing.
32 */
33class TestKeeper : public IKeeper
34{
35public:
36 TestKeeper(const String & root_path_, Poco::Timespan operation_timeout_);
37 ~TestKeeper() override;
38
39 bool isExpired() const override { return expired; }
40 int64_t getSessionID() const override { return 0; }
41
42
43 void create(
44 const String & path,
45 const String & data,
46 bool is_ephemeral,
47 bool is_sequential,
48 const ACLs & acls,
49 CreateCallback callback) override;
50
51 void remove(
52 const String & path,
53 int32_t version,
54 RemoveCallback callback) override;
55
56 void exists(
57 const String & path,
58 ExistsCallback callback,
59 WatchCallback watch) override;
60
61 void get(
62 const String & path,
63 GetCallback callback,
64 WatchCallback watch) override;
65
66 void set(
67 const String & path,
68 const String & data,
69 int32_t version,
70 SetCallback callback) override;
71
72 void list(
73 const String & path,
74 ListCallback callback,
75 WatchCallback watch) override;
76
77 void check(
78 const String & path,
79 int32_t version,
80 CheckCallback callback) override;
81
82 void multi(
83 const Requests & requests,
84 MultiCallback callback) override;
85
86
87 struct Node
88 {
89 String data;
90 ACLs acls;
91 bool is_ephemeral = false;
92 bool is_sequental = false;
93 Stat stat{};
94 int32_t seq_num = 0;
95 };
96
97 using Container = std::map<std::string, Node>;
98
99 using WatchCallbacks = std::vector<WatchCallback>;
100 using Watches = std::map<String /* path, relative of root_path */, WatchCallbacks>;
101
102private:
103 using clock = std::chrono::steady_clock;
104
105 struct RequestInfo
106 {
107 TestKeeperRequestPtr request;
108 ResponseCallback callback;
109 WatchCallback watch;
110 clock::time_point time;
111 };
112
113 Container container;
114
115 String root_path;
116 ACLs default_acls;
117
118 Poco::Timespan operation_timeout;
119
120 std::mutex push_request_mutex;
121 std::atomic<bool> expired{false};
122
123 int64_t zxid = 0;
124
125 Watches watches;
126 Watches list_watches; /// Watches for 'list' request (watches on children).
127
128 void createWatchCallBack(const String & path);
129
130 using RequestsQueue = ConcurrentBoundedQueue<RequestInfo>;
131 RequestsQueue requests_queue{1};
132
133 void pushRequest(RequestInfo && request);
134
135 void finalize();
136
137 ThreadFromGlobalPool processing_thread;
138
139 void processingThread();
140};
141
142}
143
144