| 1 | #pragma once |
| 2 | |
| 3 | #include <mutex> |
| 4 | #include <map> |
| 5 | #include <atomic> |
| 6 | #include <thread> |
| 7 | #include <chrono> |
| 8 | |
| 9 | #include <Poco/Timespan.h> |
| 10 | #include <Common/ZooKeeper/IKeeper.h> |
| 11 | #include <Common/ThreadPool.h> |
| 12 | #include <Common/ConcurrentBoundedQueue.h> |
| 13 | |
| 14 | |
| 15 | namespace Coordination |
| 16 | { |
| 17 | |
/// Only declared here; the definition is not part of this header.
struct TestKeeperRequest;

/// Shared-ownership handle to a request (stored in TestKeeper::RequestInfo).
using TestKeeperRequestPtr = std::shared_ptr<TestKeeperRequest>;
| 20 | |
| 21 | |
/** Looks like ZooKeeper but stores all data in the memory of the server process.
  * Data is not shared between different servers and is lost after a server restart.
  *
  * Its only purpose is to simplify testing of interaction with ZooKeeper within a single server.
  * This still makes sense, because multiple replicas of a single table can be created on a single server,
  * and it is used to test replication logic.
  *
  * Does not support ACLs. Does not support NULL node values.
  *
  * NOTE: You can add various failure modes for better testing.
  */
| 33 | class TestKeeper : public IKeeper |
| 34 | { |
| 35 | public: |
| 36 | TestKeeper(const String & root_path_, Poco::Timespan operation_timeout_); |
| 37 | ~TestKeeper() override; |
| 38 | |
| 39 | bool isExpired() const override { return expired; } |
| 40 | int64_t getSessionID() const override { return 0; } |
| 41 | |
| 42 | |
| 43 | void create( |
| 44 | const String & path, |
| 45 | const String & data, |
| 46 | bool is_ephemeral, |
| 47 | bool is_sequential, |
| 48 | const ACLs & acls, |
| 49 | CreateCallback callback) override; |
| 50 | |
| 51 | void remove( |
| 52 | const String & path, |
| 53 | int32_t version, |
| 54 | RemoveCallback callback) override; |
| 55 | |
| 56 | void exists( |
| 57 | const String & path, |
| 58 | ExistsCallback callback, |
| 59 | WatchCallback watch) override; |
| 60 | |
| 61 | void get( |
| 62 | const String & path, |
| 63 | GetCallback callback, |
| 64 | WatchCallback watch) override; |
| 65 | |
| 66 | void set( |
| 67 | const String & path, |
| 68 | const String & data, |
| 69 | int32_t version, |
| 70 | SetCallback callback) override; |
| 71 | |
| 72 | void list( |
| 73 | const String & path, |
| 74 | ListCallback callback, |
| 75 | WatchCallback watch) override; |
| 76 | |
| 77 | void check( |
| 78 | const String & path, |
| 79 | int32_t version, |
| 80 | CheckCallback callback) override; |
| 81 | |
| 82 | void multi( |
| 83 | const Requests & requests, |
| 84 | MultiCallback callback) override; |
| 85 | |
| 86 | |
| 87 | struct Node |
| 88 | { |
| 89 | String data; |
| 90 | ACLs acls; |
| 91 | bool is_ephemeral = false; |
| 92 | bool is_sequental = false; |
| 93 | Stat stat{}; |
| 94 | int32_t seq_num = 0; |
| 95 | }; |
| 96 | |
| 97 | using Container = std::map<std::string, Node>; |
| 98 | |
| 99 | using WatchCallbacks = std::vector<WatchCallback>; |
| 100 | using Watches = std::map<String /* path, relative of root_path */, WatchCallbacks>; |
| 101 | |
| 102 | private: |
| 103 | using clock = std::chrono::steady_clock; |
| 104 | |
| 105 | struct RequestInfo |
| 106 | { |
| 107 | TestKeeperRequestPtr request; |
| 108 | ResponseCallback callback; |
| 109 | WatchCallback watch; |
| 110 | clock::time_point time; |
| 111 | }; |
| 112 | |
| 113 | Container container; |
| 114 | |
| 115 | String root_path; |
| 116 | ACLs default_acls; |
| 117 | |
| 118 | Poco::Timespan operation_timeout; |
| 119 | |
| 120 | std::mutex push_request_mutex; |
| 121 | std::atomic<bool> expired{false}; |
| 122 | |
| 123 | int64_t zxid = 0; |
| 124 | |
| 125 | Watches watches; |
| 126 | Watches list_watches; /// Watches for 'list' request (watches on children). |
| 127 | |
| 128 | void createWatchCallBack(const String & path); |
| 129 | |
| 130 | using RequestsQueue = ConcurrentBoundedQueue<RequestInfo>; |
| 131 | RequestsQueue requests_queue{1}; |
| 132 | |
| 133 | void pushRequest(RequestInfo && request); |
| 134 | |
| 135 | void finalize(); |
| 136 | |
| 137 | ThreadFromGlobalPool processing_thread; |
| 138 | |
| 139 | void processingThread(); |
| 140 | }; |
| 141 | |
| 142 | } |
| 143 | |
| 144 | |