1#pragma once
2
#include "ZooKeeper.h"
#include "KeeperException.h"
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <common/logger_useful.h>
#include <Common/CurrentMetrics.h>
#include <Core/BackgroundSchedulePool.h>
10
11
12namespace ProfileEvents
13{
14 extern const Event LeaderElectionAcquiredLeadership;
15}
16
17namespace CurrentMetrics
18{
19 extern const Metric LeaderElection;
20}
21
22
23namespace zkutil
24{
25
26/** Implements leader election algorithm described here: http://zookeeper.apache.org/doc/r3.4.5/recipes.html#sc_leaderElection
27 */
28class LeaderElection
29{
30public:
31 using LeadershipHandler = std::function<void()>;
32
33 /** handler is called when this instance become leader.
34 *
35 * identifier - if not empty, must uniquely (within same path) identify participant of leader election.
36 * It means that different participants of leader election have different identifiers
37 * and existence of more than one ephemeral node with same identifier indicates an error.
38 */
39 LeaderElection(DB::BackgroundSchedulePool & pool_, const std::string & path_, ZooKeeper & zookeeper_, LeadershipHandler handler_, const std::string & identifier_ = "")
40 : pool(pool_), path(path_), zookeeper(zookeeper_), handler(handler_), identifier(identifier_)
41 , log_name("LeaderElection (" + path + ")")
42 , log(&Logger::get(log_name))
43 {
44 task = pool.createTask(log_name, [this] { threadFunction(); });
45 createNode();
46 }
47
48 void shutdown()
49 {
50 if (shutdown_called)
51 return;
52
53 shutdown_called = true;
54 task->deactivate();
55 }
56
57 ~LeaderElection()
58 {
59 releaseNode();
60 }
61
62private:
63 DB::BackgroundSchedulePool & pool;
64 DB::BackgroundSchedulePool::TaskHolder task;
65 std::string path;
66 ZooKeeper & zookeeper;
67 LeadershipHandler handler;
68 std::string identifier;
69 std::string log_name;
70 Logger * log;
71
72 EphemeralNodeHolderPtr node;
73 std::string node_name;
74
75 std::atomic<bool> shutdown_called {false};
76
77 CurrentMetrics::Increment metric_increment{CurrentMetrics::LeaderElection};
78
79 void createNode()
80 {
81 shutdown_called = false;
82 node = EphemeralNodeHolder::createSequential(path + "/leader_election-", zookeeper, identifier);
83
84 std::string node_path = node->getPath();
85 node_name = node_path.substr(node_path.find_last_of('/') + 1);
86
87 task->activateAndSchedule();
88 }
89
90 void releaseNode()
91 {
92 shutdown();
93 node = nullptr;
94 }
95
96 void threadFunction()
97 {
98 bool success = false;
99
100 try
101 {
102 Strings children = zookeeper.getChildren(path);
103 std::sort(children.begin(), children.end());
104 auto it = std::lower_bound(children.begin(), children.end(), node_name);
105 if (it == children.end() || *it != node_name)
106 throw Poco::Exception("Assertion failed in LeaderElection");
107
108 if (it == children.begin())
109 {
110 ProfileEvents::increment(ProfileEvents::LeaderElectionAcquiredLeadership);
111 handler();
112 return;
113 }
114
115 if (!zookeeper.existsWatch(path + "/" + *(it - 1), nullptr, task->getWatchCallback()))
116 task->schedule();
117
118 success = true;
119 }
120 catch (const KeeperException & e)
121 {
122 DB::tryLogCurrentException(log);
123
124 if (e.code == Coordination::ZSESSIONEXPIRED)
125 return;
126 }
127 catch (...)
128 {
129 DB::tryLogCurrentException(log);
130 }
131
132 if (!success)
133 task->scheduleAfter(10 * 1000);
134 }
135};
136
137using LeaderElectionPtr = std::shared_ptr<LeaderElection>;
138
139}
140