#include <IO/Operators.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAddress.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/randomSeed.h>


namespace ProfileEvents
{
    extern const Event ReplicaYieldLeadership;
    extern const Event ReplicaPartialShutdown;
}

namespace CurrentMetrics
{
    extern const Metric ReadonlyReplica;
}


namespace DB
{

namespace ErrorCodes
{
    extern const int REPLICA_IS_ALREADY_ACTIVE;
}

namespace
{
    constexpr auto retry_period_ms = 10 * 1000;
}

/// Used to check whether it was this instance that created the `is_active` node.
static String generateActiveNodeIdentifier()
{
    return "pid: " + toString(getpid()) + ", random: " + toString(randomSeed());
}

ReplicatedMergeTreeRestartingThread::ReplicatedMergeTreeRestartingThread(StorageReplicatedMergeTree & storage_)
    : storage(storage_)
    , log_name(storage.database_name + "." + storage.table_name + " (ReplicatedMergeTreeRestartingThread)")
    , log(&Logger::get(log_name))
    , active_node_identifier(generateActiveNodeIdentifier())
{
    const auto storage_settings = storage.getSettings();
    check_period_ms = storage_settings->zookeeper_session_expiration_check_period.totalSeconds() * 1000;

    /// The task period must also respect the periodicity of checking the replica's lag.
    if (check_period_ms > static_cast<Int64>(storage_settings->check_delay_period) * 1000)
        check_period_ms = storage_settings->check_delay_period * 1000;

    task = storage.global_context.getSchedulePool().createTask(log_name, [this]{ run(); });
}

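/// The main loop of the background task: activates the replica on first start or after a
/// ZooKeeper session expiration, and periodically checks the replica's lag, yielding
/// leadership if the replica has fallen too far behind.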
void ReplicatedMergeTreeRestartingThread::run()
{
    if (need_stop)
        return;

    try
    {
        if (first_time || storage.getZooKeeper()->expired())
        {
            startup_completed = false;

            if (first_time)
            {
                LOG_DEBUG(log, "Activating replica.");
            }
            else
            {
                LOG_WARNING(log, "ZooKeeper session has expired. Switching to a new session.");

                bool old_val = false;
                if (storage.is_readonly.compare_exchange_strong(old_val, true))
                    CurrentMetrics::add(CurrentMetrics::ReadonlyReplica);

                partialShutdown();
            }

            if (!startup_completed)
            {
                try
                {
                    storage.setZooKeeper(storage.global_context.getZooKeeper());
                }
                catch (const Coordination::Exception &)
                {
                    /// An exception during zookeeper_init usually means that DNS is not working; we will retry.
                    tryLogCurrentException(log, __PRETTY_FUNCTION__);

                    if (first_time)
                        storage.startup_event.set();
                    task->scheduleAfter(retry_period_ms);
                    return;
                }

                if (!need_stop && !tryStartup())
                {
                    if (first_time)
                        storage.startup_event.set();
                    task->scheduleAfter(retry_period_ms);
                    return;
                }

                if (first_time)
                    storage.startup_event.set();

                startup_completed = true;
            }

            if (need_stop)
                return;

            bool old_val = true;
            if (storage.is_readonly.compare_exchange_strong(old_val, false))
                CurrentMetrics::sub(CurrentMetrics::ReadonlyReplica);

            first_time = false;
        }

        time_t current_time = time(nullptr);
        const auto storage_settings = storage.getSettings();
        if (current_time >= prev_time_of_check_delay + static_cast<time_t>(storage_settings->check_delay_period))
        {
            /// Find out the lag of replicas.
            time_t absolute_delay = 0;
            time_t relative_delay = 0;

            storage.getReplicaDelays(absolute_delay, relative_delay);

            if (absolute_delay)
                LOG_TRACE(log, "Absolute delay: " << absolute_delay << ". Relative delay: " << relative_delay << ".");

            prev_time_of_check_delay = current_time;

            /// We give up leadership if the relative lag is greater than the threshold.
            if (storage.is_leader
                && relative_delay > static_cast<time_t>(storage_settings->min_relative_delay_to_yield_leadership))
            {
                LOG_INFO(log, "Relative replica delay (" << relative_delay << " seconds) is greater than the threshold ("
                    << storage_settings->min_relative_delay_to_yield_leadership << "). Will yield leadership.");

                ProfileEvents::increment(ProfileEvents::ReplicaYieldLeadership);

                storage.exitLeaderElection();
                /// NOTE: enterLeaderElection() can throw if node creation in ZooKeeper fails.
                /// This is bad because we can end up without a leader on any replica.
                /// In that case we rely on the fact that the session will expire and we will reconnect.
                storage.enterLeaderElection();
            }
        }
    }
    catch (...)
    {
        storage.startup_event.set();
        tryLogCurrentException(log, __PRETTY_FUNCTION__);
    }

    task->scheduleAfter(check_period_ms);
}


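/// Performs one activation attempt: registers the replica in ZooKeeper, loads the replication
/// queue and starts the background tasks. Returns false if the attempt failed and should be
/// retried later.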
bool ReplicatedMergeTreeRestartingThread::tryStartup()
{
    try
    {
        removeFailedQuorumParts();
        activateReplica();

        const auto & zookeeper = storage.getZooKeeper();
        const auto storage_settings = storage.getSettings();

        storage.cloneReplicaIfNeeded(zookeeper);

        storage.queue.load(zookeeper);

        /// pullLogsToQueue() is called after we mark the replica 'is_active' (and after we repair it if it was lost),
        /// because the cleanup thread does not delete the log_pointer of active replicas.
        storage.queue.pullLogsToQueue(zookeeper);
        storage.last_queue_update_finish_time.store(time(nullptr));

        updateQuorumIfWeHavePart();

        if (storage_settings->replicated_can_become_leader)
            storage.enterLeaderElection();
        else
            LOG_INFO(log, "Will not enter leader election because replicated_can_become_leader=0");

        /// Anything above can throw a KeeperException if something goes wrong with ZooKeeper.
        /// Anything below should not throw exceptions.

        storage.partial_shutdown_called = false;
        storage.partial_shutdown_event.reset();

        storage.queue_updating_task->activateAndSchedule();
        storage.mutations_updating_task->activateAndSchedule();
        storage.mutations_finalizing_task->activateAndSchedule();
        storage.cleanup_thread.start();
        storage.alter_thread.start();
        storage.part_check_thread.start();

        return true;
    }
    catch (...)
    {
        storage.replica_is_active_node = nullptr;

        try
        {
            throw;
        }
        catch (const Coordination::Exception & e)
        {
            LOG_ERROR(log, "Couldn't start replication: " << e.what() << ". " << DB::getCurrentExceptionMessage(true));
            return false;
        }
        catch (const Exception & e)
        {
            if (e.code() != ErrorCodes::REPLICA_IS_ALREADY_ACTIVE)
                throw;

            LOG_ERROR(log, "Couldn't start replication: " << e.what() << ". " << DB::getCurrentExceptionMessage(true));
            return false;
        }
    }
}


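/// Removes parts for which a quorum write has failed: they are deleted from ZooKeeper
/// and moved to the `detached` directory locally.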
void ReplicatedMergeTreeRestartingThread::removeFailedQuorumParts()
{
    auto zookeeper = storage.getZooKeeper();

    Strings failed_parts;
    if (zookeeper->tryGetChildren(storage.zookeeper_path + "/quorum/failed_parts", failed_parts) != Coordination::ZOK)
        return;

    /// First, remove the parts from ZooKeeper.
    storage.tryRemovePartsFromZooKeeperWithRetries(failed_parts);

    for (const auto & part_name : failed_parts)
    {
        auto part = storage.getPartIfExists(
            part_name, {MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed, MergeTreeDataPartState::Outdated});

        if (part)
        {
            LOG_DEBUG(log, "Found part " << part_name << " with failed quorum. Moving to detached. This shouldn't happen often.");
            storage.forgetPartAndMoveToDetached(part, "noquorum");
            storage.queue.removeFromVirtualParts(part->info);
        }
    }
}


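/// If there is an unsatisfied quorum for a part that this replica already has,
/// adds this replica to that quorum.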
void ReplicatedMergeTreeRestartingThread::updateQuorumIfWeHavePart()
{
    auto zookeeper = storage.getZooKeeper();

    String quorum_str;
    if (zookeeper->tryGet(storage.zookeeper_path + "/quorum/status", quorum_str))
    {
        ReplicatedMergeTreeQuorumEntry quorum_entry;
        quorum_entry.fromString(quorum_str);

        if (!quorum_entry.replicas.count(storage.replica_name)
            && zookeeper->exists(storage.replica_path + "/parts/" + quorum_entry.part_name))
        {
            LOG_WARNING(log, "We have part " << quorum_entry.part_name
                << " but we are not in the quorum. Updating the quorum. This shouldn't happen often.");
            storage.updateQuorum(quorum_entry.part_name);
        }
    }
}


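/// Marks this replica as active in ZooKeeper by creating the ephemeral `is_active` node
/// and publishing the address through which other replicas can reach this one.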
void ReplicatedMergeTreeRestartingThread::activateReplica()
{
    auto zookeeper = storage.getZooKeeper();

    /// How other replicas can access this one.
    ReplicatedMergeTreeAddress address = storage.getReplicatedMergeTreeAddress();

    String is_active_path = storage.replica_path + "/is_active";

    /** If the node is marked as active but the mark was made by this same instance, delete it.
      * This is possible only when the ZooKeeper session expires.
      */
    String data;
    Coordination::Stat stat;
    bool has_is_active = zookeeper->tryGet(is_active_path, data, &stat);
    if (has_is_active && data == active_node_identifier)
    {
        auto code = zookeeper->tryRemove(is_active_path, stat.version);

        if (code == Coordination::ZBADVERSION)
            throw Exception("Another instance of replica " + storage.replica_path + " was created just now."
                " You shouldn't run multiple instances of the same replica. You need to check the configuration files.",
                ErrorCodes::REPLICA_IS_ALREADY_ACTIVE);

        if (code && code != Coordination::ZNONODE)
            throw Coordination::Exception(code, is_active_path);
    }

    /// Simultaneously declare that this replica is active and update the host.
    Coordination::Requests ops;
    ops.emplace_back(zkutil::makeCreateRequest(is_active_path, active_node_identifier, zkutil::CreateMode::Ephemeral));
    ops.emplace_back(zkutil::makeSetRequest(storage.replica_path + "/host", address.toString(), -1));

    try
    {
        zookeeper->multi(ops);
    }
    catch (const Coordination::Exception & e)
    {
        if (e.code == Coordination::ZNODEEXISTS)
            throw Exception("Replica " + storage.replica_path + " appears to be already active. If you're sure it's not, "
                "try again in a minute or remove the znode " + storage.replica_path + "/is_active manually", ErrorCodes::REPLICA_IS_ALREADY_ACTIVE);

        throw;
    }

    /// `current_zookeeper` lives for the lifetime of `replica_is_active_node`,
    /// since the `replica_is_active_node` object is destroyed in partialShutdown() before `current_zookeeper` is changed.
    storage.replica_is_active_node = zkutil::EphemeralNodeHolder::existing(is_active_path, *storage.current_zookeeper);
}


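/// Stops the replication-related background tasks and releases the `is_active` node,
/// e.g. after the ZooKeeper session has expired or during shutdown.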
void ReplicatedMergeTreeRestartingThread::partialShutdown()
{
    ProfileEvents::increment(ProfileEvents::ReplicaPartialShutdown);

    storage.partial_shutdown_called = true;
    storage.partial_shutdown_event.set();
    storage.alter_query_event->set();
    storage.replica_is_active_node = nullptr;

    LOG_TRACE(log, "Waiting for threads to finish");

    storage.exitLeaderElection();

    storage.queue_updating_task->deactivate();
    storage.mutations_updating_task->deactivate();
    storage.mutations_finalizing_task->deactivate();

    storage.cleanup_thread.stop();
    storage.alter_thread.stop();
    storage.part_check_thread.stop();

    LOG_TRACE(log, "Threads finished");
}


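/// Permanently stops the restarting thread itself and then the tasks it manages.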
void ReplicatedMergeTreeRestartingThread::shutdown()
{
    /// Stop restarting_thread before stopping other tasks, so that it won't restart them again.
    need_stop = true;
    task->deactivate();
    LOG_TRACE(log, "Restarting thread finished");

    /// Stop the other tasks.
    partialShutdown();
}

}