1 | #pragma once |
2 | |
3 | #include "ZooKeeper.h" |
4 | #include "KeeperException.h" |
5 | #include <functional> |
6 | #include <memory> |
7 | #include <common/logger_useful.h> |
8 | #include <Common/CurrentMetrics.h> |
9 | #include <Core/BackgroundSchedulePool.h> |
10 | |
11 | |
12 | namespace ProfileEvents |
13 | { |
14 | extern const Event LeaderElectionAcquiredLeadership; |
15 | } |
16 | |
17 | namespace CurrentMetrics |
18 | { |
19 | extern const Metric LeaderElection; |
20 | } |
21 | |
22 | |
23 | namespace zkutil |
24 | { |
25 | |
26 | /** Implements leader election algorithm described here: http://zookeeper.apache.org/doc/r3.4.5/recipes.html#sc_leaderElection |
27 | */ |
28 | class LeaderElection |
29 | { |
30 | public: |
31 | using LeadershipHandler = std::function<void()>; |
32 | |
33 | /** handler is called when this instance become leader. |
34 | * |
35 | * identifier - if not empty, must uniquely (within same path) identify participant of leader election. |
36 | * It means that different participants of leader election have different identifiers |
37 | * and existence of more than one ephemeral node with same identifier indicates an error. |
38 | */ |
39 | LeaderElection(DB::BackgroundSchedulePool & pool_, const std::string & path_, ZooKeeper & zookeeper_, LeadershipHandler handler_, const std::string & identifier_ = "" ) |
40 | : pool(pool_), path(path_), zookeeper(zookeeper_), handler(handler_), identifier(identifier_) |
41 | , log_name("LeaderElection (" + path + ")" ) |
42 | , log(&Logger::get(log_name)) |
43 | { |
44 | task = pool.createTask(log_name, [this] { threadFunction(); }); |
45 | createNode(); |
46 | } |
47 | |
48 | void shutdown() |
49 | { |
50 | if (shutdown_called) |
51 | return; |
52 | |
53 | shutdown_called = true; |
54 | task->deactivate(); |
55 | } |
56 | |
57 | ~LeaderElection() |
58 | { |
59 | releaseNode(); |
60 | } |
61 | |
62 | private: |
63 | DB::BackgroundSchedulePool & pool; |
64 | DB::BackgroundSchedulePool::TaskHolder task; |
65 | std::string path; |
66 | ZooKeeper & zookeeper; |
67 | LeadershipHandler handler; |
68 | std::string identifier; |
69 | std::string log_name; |
70 | Logger * log; |
71 | |
72 | EphemeralNodeHolderPtr node; |
73 | std::string node_name; |
74 | |
75 | std::atomic<bool> shutdown_called {false}; |
76 | |
77 | CurrentMetrics::Increment metric_increment{CurrentMetrics::LeaderElection}; |
78 | |
79 | void createNode() |
80 | { |
81 | shutdown_called = false; |
82 | node = EphemeralNodeHolder::createSequential(path + "/leader_election-" , zookeeper, identifier); |
83 | |
84 | std::string node_path = node->getPath(); |
85 | node_name = node_path.substr(node_path.find_last_of('/') + 1); |
86 | |
87 | task->activateAndSchedule(); |
88 | } |
89 | |
90 | void releaseNode() |
91 | { |
92 | shutdown(); |
93 | node = nullptr; |
94 | } |
95 | |
96 | void threadFunction() |
97 | { |
98 | bool success = false; |
99 | |
100 | try |
101 | { |
102 | Strings children = zookeeper.getChildren(path); |
103 | std::sort(children.begin(), children.end()); |
104 | auto it = std::lower_bound(children.begin(), children.end(), node_name); |
105 | if (it == children.end() || *it != node_name) |
106 | throw Poco::Exception("Assertion failed in LeaderElection" ); |
107 | |
108 | if (it == children.begin()) |
109 | { |
110 | ProfileEvents::increment(ProfileEvents::LeaderElectionAcquiredLeadership); |
111 | handler(); |
112 | return; |
113 | } |
114 | |
115 | if (!zookeeper.existsWatch(path + "/" + *(it - 1), nullptr, task->getWatchCallback())) |
116 | task->schedule(); |
117 | |
118 | success = true; |
119 | } |
120 | catch (const KeeperException & e) |
121 | { |
122 | DB::tryLogCurrentException(log); |
123 | |
124 | if (e.code == Coordination::ZSESSIONEXPIRED) |
125 | return; |
126 | } |
127 | catch (...) |
128 | { |
129 | DB::tryLogCurrentException(log); |
130 | } |
131 | |
132 | if (!success) |
133 | task->scheduleAfter(10 * 1000); |
134 | } |
135 | }; |
136 | |
137 | using LeaderElectionPtr = std::shared_ptr<LeaderElection>; |
138 | |
139 | } |
140 | |