1#pragma once
2
3#include <Core/Types.h>
4#include <Common/ConcurrentBoundedQueue.h>
5#include <Common/CurrentMetrics.h>
6#include <Common/ThreadPool.h>
7#include <Common/ZooKeeper/IKeeper.h>
8
9#include <IO/ReadBuffer.h>
10#include <IO/WriteBuffer.h>
11#include <IO/ReadBufferFromPocoSocket.h>
12#include <IO/WriteBufferFromPocoSocket.h>
13
14#include <Poco/Net/StreamSocket.h>
15#include <Poco/Net/SocketAddress.h>
16
17#include <map>
18#include <mutex>
19#include <chrono>
20#include <vector>
21#include <memory>
22#include <thread>
23#include <atomic>
24#include <cstdint>
25#include <optional>
26#include <functional>
27
28
29/** ZooKeeper C++ library, a replacement for libzookeeper.
30 *
31 * Motivation.
32 *
33 * libzookeeper has many bugs:
34 * - segfaults: for example, if zookeeper connection was interrupted while reading result of multi response;
35 * - memory corruption: for example, as a result of double free inside libzookeeper;
 * - no timeouts for synchronous operations: they may get stuck forever under simple Jepsen-like tests;
 * - logical errors: for example, chroot prefix is not removed from the results of multi responses;
 * - data races.
39 *
 * The code of libzookeeper is overcomplicated:
41 * - memory ownership is unclear and bugs are very difficult to track and fix.
42 * - extremely creepy code for implementation of "chroot" feature.
43 *
44 * As of 2018, there are no active maintainers of libzookeeper:
45 * - bugs in JIRA are fixed only occasionally with ad-hoc patches by library users.
46 *
47 * libzookeeper is a classical example of bad code written in C.
48 *
49 * In Go, Python and Rust programming languages,
50 * there are separate libraries for ZooKeeper, not based on libzookeeper.
51 * Motivation is almost the same. Example:
52 * https://github.com/python-zk/kazoo/blob/master/docs/implementation.rst
53 *
54 * About "session restore" feature.
55 *
56 * libzookeeper has the feature of session restore. Client receives session id and session token from the server,
57 * and when connection is lost, it can quickly reconnect to any server with the same session id and token,
58 * to continue with existing session.
59 * libzookeeper performs this reconnection automatically.
60 *
61 * This feature is proven to be harmful.
 * For example, it makes it very difficult to correctly remove ephemeral nodes.
63 * This may lead to weird bugs in application code.
 * For example, our developers have found this type of bug in the Curator Java library.
65 *
66 * On the other side, session restore feature has no advantages,
67 * because every application should be able to establish new session and reinitialize internal state,
68 * when the session is lost and cannot be restored.
69 *
70 * This library never restores the session. In case of any error, the session is considered as expired
71 * and you should create a new instance of ZooKeeper object and reinitialize the application state.
72 *
 * This library is not intended to be CPU efficient. Hundreds of thousands of operations per second are usually enough.
74 */
75
76
77namespace CurrentMetrics
78{
79 extern const Metric ZooKeeperSession;
80}
81
82
83namespace Coordination
84{
85
86using namespace DB;
87
88struct ZooKeeperRequest;
89
90
91
92/** Usage scenario: look at the documentation for IKeeper class.
93 */
94class ZooKeeper : public IKeeper
95{
96public:
97 using Addresses = std::vector<Poco::Net::SocketAddress>;
98
99 using XID = int32_t;
100 using OpNum = int32_t;
101
102 /** Connection to addresses is performed in order. If you want, shuffle them manually.
103 * Operation timeout couldn't be greater than session timeout.
104 * Operation timeout applies independently for network read, network write, waiting for events and synchronization.
105 */
106 ZooKeeper(
107 const Addresses & addresses,
108 const String & root_path,
109 const String & auth_scheme,
110 const String & auth_data,
111 Poco::Timespan session_timeout_,
112 Poco::Timespan connection_timeout,
113 Poco::Timespan operation_timeout_);
114
115 ~ZooKeeper() override;
116
117
118 /// If expired, you can only destroy the object. All other methods will throw exception.
119 bool isExpired() const override { return expired; }
120
121 /// Useful to check owner of ephemeral node.
122 int64_t getSessionID() const override { return session_id; }
123
124
125 /// See the documentation about semantics of these methods in IKeeper class.
126
127 void create(
128 const String & path,
129 const String & data,
130 bool is_ephemeral,
131 bool is_sequential,
132 const ACLs & acls,
133 CreateCallback callback) override;
134
135 void remove(
136 const String & path,
137 int32_t version,
138 RemoveCallback callback) override;
139
140 void exists(
141 const String & path,
142 ExistsCallback callback,
143 WatchCallback watch) override;
144
145 void get(
146 const String & path,
147 GetCallback callback,
148 WatchCallback watch) override;
149
150 void set(
151 const String & path,
152 const String & data,
153 int32_t version,
154 SetCallback callback) override;
155
156 void list(
157 const String & path,
158 ListCallback callback,
159 WatchCallback watch) override;
160
161 void check(
162 const String & path,
163 int32_t version,
164 CheckCallback callback) override;
165
166 void multi(
167 const Requests & requests,
168 MultiCallback callback) override;
169
170private:
171 String root_path;
172 ACLs default_acls;
173
174 Poco::Timespan session_timeout;
175 Poco::Timespan operation_timeout;
176
177 Poco::Net::StreamSocket socket;
178 std::optional<ReadBufferFromPocoSocket> in;
179 std::optional<WriteBufferFromPocoSocket> out;
180
181 int64_t session_id = 0;
182
183 std::atomic<XID> next_xid {1};
184 std::atomic<bool> expired {false};
185 std::mutex push_request_mutex;
186
187 using clock = std::chrono::steady_clock;
188
189 struct RequestInfo
190 {
191 std::shared_ptr<ZooKeeperRequest> request;
192 ResponseCallback callback;
193 WatchCallback watch;
194 clock::time_point time;
195 };
196
197 using RequestsQueue = ConcurrentBoundedQueue<RequestInfo>;
198
199 RequestsQueue requests_queue{1};
200 void pushRequest(RequestInfo && request);
201
202 using Operations = std::map<XID, RequestInfo>;
203
204 Operations operations;
205 std::mutex operations_mutex;
206
207 using WatchCallbacks = std::vector<WatchCallback>;
208 using Watches = std::map<String /* path, relative of root_path */, WatchCallbacks>;
209
210 Watches watches;
211 std::mutex watches_mutex;
212
213 ThreadFromGlobalPool send_thread;
214 ThreadFromGlobalPool receive_thread;
215
216 void connect(
217 const Addresses & addresses,
218 Poco::Timespan connection_timeout);
219
220 void sendHandshake();
221 void receiveHandshake();
222
223 void sendAuth(const String & scheme, const String & data);
224
225 void receiveEvent();
226
227 void sendThread();
228 void receiveThread();
229
230 void close();
231
232 /// Call all remaining callbacks and watches, passing errors to them.
233 void finalize(bool error_send, bool error_receive);
234
235 template <typename T>
236 void write(const T &);
237
238 template <typename T>
239 void read(T &);
240
241 CurrentMetrics::Increment active_session_metric_increment{CurrentMetrics::ZooKeeperSession};
242};
243
244struct ZooKeeperResponse;
245using ZooKeeperResponsePtr = std::shared_ptr<ZooKeeperResponse>;
246
247/// Exposed in header file for Yandex.Metrica code.
248struct ZooKeeperRequest : virtual Request
249{
250 ZooKeeper::XID xid = 0;
251 bool has_watch = false;
252 /// If the request was not send and the error happens, we definitely sure, that is has not been processed by the server.
253 /// If the request was sent and we didn't get the response and the error happens, then we cannot be sure was it processed or not.
254 bool probably_sent = false;
255
256 ZooKeeperRequest() = default;
257 ZooKeeperRequest(const ZooKeeperRequest &) = default;
258 virtual ~ZooKeeperRequest() = default;
259
260 virtual ZooKeeper::OpNum getOpNum() const = 0;
261
262 /// Writes length, xid, op_num, then the rest.
263 void write(WriteBuffer & out) const;
264
265 virtual void writeImpl(WriteBuffer &) const = 0;
266
267 virtual ZooKeeperResponsePtr makeResponse() const = 0;
268};
269
270
271}
272