1 | #pragma once |
2 | |
3 | #include <mutex> |
4 | #include <map> |
5 | #include <atomic> |
6 | #include <thread> |
7 | #include <chrono> |
8 | |
9 | #include <Poco/Timespan.h> |
10 | #include <Common/ZooKeeper/IKeeper.h> |
11 | #include <Common/ThreadPool.h> |
12 | #include <Common/ConcurrentBoundedQueue.h> |
13 | |
14 | |
15 | namespace Coordination |
16 | { |
17 | |
/// Forward declaration only: the definition of TestKeeperRequest is not part of this header.
18 | struct TestKeeperRequest; |
/// Shared-ownership pointer to a TestKeeperRequest; this is the type stored in TestKeeper::RequestInfo::request.
19 | using TestKeeperRequestPtr = std::shared_ptr<TestKeeperRequest>; |
20 | |
21 | |
22 | /** Looks like ZooKeeper but stores all data in the memory of the server process. |
23 | * The data is not shared between different servers and is lost after a server restart. |
24 | * |
25 | * Its only purpose is to simplify testing of interaction with ZooKeeper within a single server. |
26 | * This still makes sense, because multiple replicas of a single table can be created on a single server, |
27 | * and it is used to test replication logic. |
28 | * |
29 | * Does not support ACLs. Does not support NULL node values. |
30 | * |
31 | * NOTE: You can add various failure modes for better testing. |
32 | */ |
33 | class TestKeeper : public IKeeper |
34 | { |
35 | public: |
     /// Constructs the keeper from a root path and a per-operation timeout.
     /// NOTE(review): anything else done at construction (e.g. starting `processing_thread`)
     /// is defined in the implementation file, not here - confirm there.
36 | TestKeeper(const String & root_path_, Poco::Timespan operation_timeout_); |
37 | ~TestKeeper() override; |
38 | |
     /// Returns the current value of the atomic `expired` flag.
39 | bool isExpired() const override { return expired; } |
     /// Always returns 0.
40 | int64_t getSessionID() const override { return 0; } |
41 | |
42 | |
     /// Request methods overriding the IKeeper interface.
     /// Each takes a callback of the matching type; `exists`, `get` and `list` additionally take a watch callback.
     /// The implementations are outside this header.
     /// NOTE(review): presumably each call is wrapped into a RequestInfo and handed to pushRequest() - confirm in the .cpp.
43 | void create( |
44 | const String & path, |
45 | const String & data, |
46 | bool is_ephemeral, |
47 | bool is_sequential, |
48 | const ACLs & acls, |
49 | CreateCallback callback) override; |
50 | |
51 | void remove( |
52 | const String & path, |
53 | int32_t version, |
54 | RemoveCallback callback) override; |
55 | |
56 | void exists( |
57 | const String & path, |
58 | ExistsCallback callback, |
59 | WatchCallback watch) override; |
60 | |
61 | void get( |
62 | const String & path, |
63 | GetCallback callback, |
64 | WatchCallback watch) override; |
65 | |
66 | void set( |
67 | const String & path, |
68 | const String & data, |
69 | int32_t version, |
70 | SetCallback callback) override; |
71 | |
72 | void list( |
73 | const String & path, |
74 | ListCallback callback, |
75 | WatchCallback watch) override; |
76 | |
77 | void check( |
78 | const String & path, |
79 | int32_t version, |
80 | CheckCallback callback) override; |
81 | |
82 | void multi( |
83 | const Requests & requests, |
84 | MultiCallback callback) override; |
85 | |
86 | |
     /// Value type of `Container`: the data kept for one stored node.
     /// All fields have in-class default initializers.
87 | struct Node |
88 | { |
89 | String data; |
90 | ACLs acls; |
91 | bool is_ephemeral = false; |
     /// NOTE: the field name is spelled "sequental" (sic). It is a public field, so renaming it
     /// would require updating every user outside this header.
92 | bool is_sequental = false; |
93 | Stat stat{}; |
94 | int32_t seq_num = 0; |
95 | }; |
96 | |
     /// Ordered map from a std::string key to its Node.
     /// NOTE(review): the key is presumably the node path - confirm in the .cpp.
97 | using Container = std::map<std::string, Node>; |
98 | |
     /// Watch callbacks grouped per path.
99 | using WatchCallbacks = std::vector<WatchCallback>; |
100 | using Watches = std::map<String /* path, relative to root_path */, WatchCallbacks>; |
101 | |
102 | private: |
103 | using clock = std::chrono::steady_clock; |
104 | |
      /// Element type of `requests_queue`: a request together with its callbacks and a steady_clock time point.
      /// NOTE(review): `time` is presumably the moment the request was queued - confirm in the .cpp.
105 | struct RequestInfo |
106 | { |
107 | TestKeeperRequestPtr request; |
108 | ResponseCallback callback; |
109 | WatchCallback watch; |
110 | clock::time_point time; |
111 | }; |
112 | |
      /// The in-memory node storage.
113 | Container container; |
114 | |
115 | String root_path; |
116 | ACLs default_acls; |
117 | |
118 | Poco::Timespan operation_timeout; |
119 | |
      /// NOTE(review): judging by the name, guards pushRequest(); the locking itself is not visible in this header.
120 | std::mutex push_request_mutex; |
      /// Read by isExpired(); atomic, initially false.
121 | std::atomic<bool> expired{false}; |
122 | |
      /// Starts at 0.
      /// NOTE(review): presumably a transaction counter in the ZooKeeper sense - confirm in the .cpp.
123 | int64_t zxid = 0; |
124 | |
125 | Watches watches; |
126 | Watches list_watches; /// Watches for 'list' request (watches on children). |
127 | |
128 | void createWatchCallBack(const String & path); |
129 | |
      /// Queue of RequestInfo items.
      /// NOTE(review): the constructor argument 1 is presumably the queue capacity - confirm against ConcurrentBoundedQueue.
130 | using RequestsQueue = ConcurrentBoundedQueue<RequestInfo>; |
131 | RequestsQueue requests_queue{1}; |
132 | |
133 | void pushRequest(RequestInfo && request); |
134 | |
135 | void finalize(); |
136 | |
      /// Declared after all other data members, so it is constructed last and destroyed first
      /// (members are initialized in declaration order and destroyed in reverse).
      /// NOTE(review): keep it last if the thread touches the members above - confirm in the .cpp.
137 | ThreadFromGlobalPool processing_thread; |
138 | |
      /// NOTE(review): presumably the entry point executed by `processing_thread` - confirm in the .cpp.
139 | void processingThread(); |
140 | }; |
141 | |
142 | } |
143 | |
144 | |