#include <IO/Operators.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAddress.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/randomSeed.h>


namespace ProfileEvents
{
    extern const Event ReplicaYieldLeadership;
    extern const Event ReplicaPartialShutdown;
}

namespace CurrentMetrics
{
    extern const Metric ReadonlyReplica;
}


namespace DB
{

namespace ErrorCodes
{
    extern const int REPLICA_IS_ALREADY_ACTIVE;
}

namespace
{
    constexpr auto retry_period_ms = 10 * 1000;
}

/// Used to check whether it's us who set node `is_active`, or not.
static String generateActiveNodeIdentifier()
{
    return "pid: " + toString(getpid()) + ", random: " + toString(randomSeed());
}

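/// The constructor only computes the wake-up period and creates the periodic task
/// in the schedule pool; it does not schedule it yet.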
ReplicatedMergeTreeRestartingThread::ReplicatedMergeTreeRestartingThread(StorageReplicatedMergeTree & storage_)
    : storage(storage_)
    , log_name(storage.database_name + "." + storage.table_name + " (ReplicatedMergeTreeRestartingThread)")
    , log(&Logger::get(log_name))
    , active_node_identifier(generateActiveNodeIdentifier())
{
    const auto storage_settings = storage.getSettings();
    check_period_ms = storage_settings->zookeeper_session_expiration_check_period.totalSeconds() * 1000;

    /// Periodicity of checking lag of replica.
    if (check_period_ms > static_cast<Int64>(storage_settings->check_delay_period) * 1000)
        check_period_ms = storage_settings->check_delay_period * 1000;

    task = storage.global_context.getSchedulePool().createTask(log_name, [this]{ run(); });
}

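/// One iteration of the restarting thread. If the ZooKeeper session has expired (or on the
/// first run), it re-creates the session, re-activates the replica and restarts the background
/// tasks; it also periodically measures the replication lag and yields leadership when this
/// replica falls too far behind.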
void ReplicatedMergeTreeRestartingThread::run()
{
    if (need_stop)
        return;

    try
    {
        if (first_time || storage.getZooKeeper()->expired())
        {
            startup_completed = false;

            if (first_time)
            {
                LOG_DEBUG(log, "Activating replica.");
            }
            else
            {
                LOG_WARNING(log, "ZooKeeper session has expired. Switching to a new session.");

                bool old_val = false;
                if (storage.is_readonly.compare_exchange_strong(old_val, true))
                    CurrentMetrics::add(CurrentMetrics::ReadonlyReplica);

                partialShutdown();
            }

            if (!startup_completed)
            {
                try
                {
                    storage.setZooKeeper(storage.global_context.getZooKeeper());
                }
                catch (const Coordination::Exception &)
                {
                    /// An exception during ZooKeeper session initialization usually means that DNS does not work. We will retry.
                    tryLogCurrentException(log, __PRETTY_FUNCTION__);

                    if (first_time)
                        storage.startup_event.set();
                    task->scheduleAfter(retry_period_ms);
                    return;
                }

                if (!need_stop && !tryStartup())
                {
                    if (first_time)
                        storage.startup_event.set();
                    task->scheduleAfter(retry_period_ms);
                    return;
                }

                if (first_time)
                    storage.startup_event.set();

                startup_completed = true;
            }

            if (need_stop)
                return;

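            /// Initialization succeeded: the replica leaves read-only mode.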
            bool old_val = true;
            if (storage.is_readonly.compare_exchange_strong(old_val, false))
                CurrentMetrics::sub(CurrentMetrics::ReadonlyReplica);

            first_time = false;
        }

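        /// At most once per check_delay_period, measure the replication lag and give up
        /// leadership if this replica lags too far behind.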
        time_t current_time = time(nullptr);
        const auto storage_settings = storage.getSettings();
        if (current_time >= prev_time_of_check_delay + static_cast<time_t>(storage_settings->check_delay_period))
        {
            /// Find out lag of replicas.
            time_t absolute_delay = 0;
            time_t relative_delay = 0;

            storage.getReplicaDelays(absolute_delay, relative_delay);

            if (absolute_delay)
                LOG_TRACE(log, "Absolute delay: " << absolute_delay << ". Relative delay: " << relative_delay << ".");

            prev_time_of_check_delay = current_time;

            /// We give up leadership if the relative lag is greater than threshold.
            if (storage.is_leader
                && relative_delay > static_cast<time_t>(storage_settings->min_relative_delay_to_yield_leadership))
            {
                LOG_INFO(log, "Relative replica delay (" << relative_delay << " seconds) is bigger than threshold ("
                    << storage_settings->min_relative_delay_to_yield_leadership << "). Will yield leadership.");

                ProfileEvents::increment(ProfileEvents::ReplicaYieldLeadership);

                storage.exitLeaderElection();
                /// NOTE: enterLeaderElection() can throw if node creation in ZK fails.
                /// This is bad because we can end up without a leader on any replica.
                /// In this case we rely on the fact that the session will expire and we will reconnect.
                storage.enterLeaderElection();
            }
        }
    }
    catch (...)
    {
        storage.startup_event.set();
        tryLogCurrentException(log, __PRETTY_FUNCTION__);
    }

    task->scheduleAfter(check_period_ms);
}


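/// One attempt to bring the replica back to life: clean up parts with failed quorum, register the
/// replica as active in ZooKeeper, clone another replica if the local one was marked lost, load and
/// pull the replication queue, and start the background tasks. Returns false on ZooKeeper errors
/// or if another instance of this replica is already active.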
bool ReplicatedMergeTreeRestartingThread::tryStartup()
{
    try
    {
        removeFailedQuorumParts();
        activateReplica();

        const auto & zookeeper = storage.getZooKeeper();
        const auto storage_settings = storage.getSettings();

        storage.cloneReplicaIfNeeded(zookeeper);

        storage.queue.load(zookeeper);

        /// pullLogsToQueue() must be called after we mark the replica 'is_active' (and after we repair it if it was lost),
        /// because the cleanup thread does not delete the log_pointer of active replicas.
        storage.queue.pullLogsToQueue(zookeeper);
        storage.last_queue_update_finish_time.store(time(nullptr));

        updateQuorumIfWeHavePart();

        if (storage_settings->replicated_can_become_leader)
            storage.enterLeaderElection();
        else
            LOG_INFO(log, "Will not enter leader election because replicated_can_become_leader=0");

        /// Anything above can throw a KeeperException if something is wrong with ZK.
        /// Anything below should not throw exceptions.

        storage.partial_shutdown_called = false;
        storage.partial_shutdown_event.reset();

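        /// Start all background tasks that service the replica.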
        storage.queue_updating_task->activateAndSchedule();
        storage.mutations_updating_task->activateAndSchedule();
        storage.mutations_finalizing_task->activateAndSchedule();
        storage.cleanup_thread.start();
        storage.alter_thread.start();
        storage.part_check_thread.start();

        return true;
    }
    catch (...)
    {
        storage.replica_is_active_node = nullptr;

        try
        {
            throw;
        }
        catch (const Coordination::Exception & e)
        {
            LOG_ERROR(log, "Couldn't start replication: " << e.what() << ". " << DB::getCurrentExceptionMessage(true));
            return false;
        }
        catch (const Exception & e)
        {
            if (e.code() != ErrorCodes::REPLICA_IS_ALREADY_ACTIVE)
                throw;

            LOG_ERROR(log, "Couldn't start replication: " << e.what() << ". " << DB::getCurrentExceptionMessage(true));
            return false;
        }
    }
}


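/// Removes parts for which a quorum insert was started but the quorum was never reached.
/// Such parts are listed in the `/quorum/failed_parts` znode; they are removed from ZooKeeper,
/// and the local copies, if any, are moved to `detached`.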
void ReplicatedMergeTreeRestartingThread::removeFailedQuorumParts()
{
    auto zookeeper = storage.getZooKeeper();

    Strings failed_parts;
    if (zookeeper->tryGetChildren(storage.zookeeper_path + "/quorum/failed_parts", failed_parts) != Coordination::ZOK)
        return;

    /// Firstly, remove parts from ZooKeeper
    storage.tryRemovePartsFromZooKeeperWithRetries(failed_parts);

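    /// Then move the local copies of these parts to the `detached` directory.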
    for (const auto & part_name : failed_parts)
    {
        auto part = storage.getPartIfExists(
            part_name, {MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed, MergeTreeDataPartState::Outdated});

        if (part)
        {
            LOG_DEBUG(log, "Found part " << part_name << " with failed quorum. Moving to detached. This shouldn't happen often.");
            storage.forgetPartAndMoveToDetached(part, "noquorum");
            storage.queue.removeFromVirtualParts(part->info);
        }
    }
}


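/// If there is an unfinished quorum insert and this replica already has the part but is not yet
/// counted in the quorum (e.g. because the session expired right after the part was fetched),
/// add this replica to the quorum.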
void ReplicatedMergeTreeRestartingThread::updateQuorumIfWeHavePart()
{
    auto zookeeper = storage.getZooKeeper();

    String quorum_str;
    if (zookeeper->tryGet(storage.zookeeper_path + "/quorum/status", quorum_str))
    {
        ReplicatedMergeTreeQuorumEntry quorum_entry;
        quorum_entry.fromString(quorum_str);

        if (!quorum_entry.replicas.count(storage.replica_name)
            && zookeeper->exists(storage.replica_path + "/parts/" + quorum_entry.part_name))
        {
            LOG_WARNING(log, "We have part " << quorum_entry.part_name
                << " but we are not in the quorum. Updating quorum. This shouldn't happen often.");
            storage.updateQuorum(quorum_entry.part_name);
        }
    }
}


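/// Registers this replica as active: writes an ephemeral `is_active` node with a unique identifier
/// and updates the `host` node with the address other replicas should use to reach this one.
/// Throws REPLICA_IS_ALREADY_ACTIVE if another live instance of the same replica is detected.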
void ReplicatedMergeTreeRestartingThread::activateReplica()
{
    auto zookeeper = storage.getZooKeeper();

    /// How other replicas can access this one.
    ReplicatedMergeTreeAddress address = storage.getReplicatedMergeTreeAddress();

    String is_active_path = storage.replica_path + "/is_active";

    /** If the `is_active` node exists but was created by this same instance, remove it.
      * This is possible only if the ZooKeeper session has expired.
      */
    String data;
    Coordination::Stat stat;
    bool has_is_active = zookeeper->tryGet(is_active_path, data, &stat);
    if (has_is_active && data == active_node_identifier)
    {
        auto code = zookeeper->tryRemove(is_active_path, stat.version);

        if (code == Coordination::ZBADVERSION)
            throw Exception("Another instance of replica " + storage.replica_path + " was created just now."
                " You shouldn't run multiple instances of the same replica. You need to check configuration files.",
                ErrorCodes::REPLICA_IS_ALREADY_ACTIVE);

        if (code && code != Coordination::ZNONODE)
            throw Coordination::Exception(code, is_active_path);
    }

    /// Simultaneously declare that this replica is active, and update the host.
    Coordination::Requests ops;
    ops.emplace_back(zkutil::makeCreateRequest(is_active_path, active_node_identifier, zkutil::CreateMode::Ephemeral));
    ops.emplace_back(zkutil::makeSetRequest(storage.replica_path + "/host", address.toString(), -1));

    try
    {
        zookeeper->multi(ops);
    }
    catch (const Coordination::Exception & e)
    {
        if (e.code == Coordination::ZNODEEXISTS)
            throw Exception("Replica " + storage.replica_path + " appears to be already active. If you're sure it's not, "
                "try again in a minute or remove znode " + storage.replica_path + "/is_active manually", ErrorCodes::REPLICA_IS_ALREADY_ACTIVE);

        throw;
    }

    /// `current_zookeeper` lives for the lifetime of `replica_is_active_node`,
    /// since before changing `current_zookeeper`, `replica_is_active_node` object is destroyed in `partialShutdown` method.
    storage.replica_is_active_node = zkutil::EphemeralNodeHolder::existing(is_active_path, *storage.current_zookeeper);
}


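/// Reversibly stops the replica-related background activity (without stopping the restarting thread
/// itself): releases the `is_active` node, leaves leader election and deactivates the queue, mutation,
/// cleanup, alter and part-check tasks. Called when the ZooKeeper session expires and on full shutdown.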
void ReplicatedMergeTreeRestartingThread::partialShutdown()
{
    ProfileEvents::increment(ProfileEvents::ReplicaPartialShutdown);

    storage.partial_shutdown_called = true;
    storage.partial_shutdown_event.set();
    storage.alter_query_event->set();
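    /// Destroying the holder removes the ephemeral `is_active` node, so other replicas see this one as inactive.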
    storage.replica_is_active_node = nullptr;

    LOG_TRACE(log, "Waiting for threads to finish");

    storage.exitLeaderElection();

    storage.queue_updating_task->deactivate();
    storage.mutations_updating_task->deactivate();
    storage.mutations_finalizing_task->deactivate();

    storage.cleanup_thread.stop();
    storage.alter_thread.stop();
    storage.part_check_thread.stop();

    LOG_TRACE(log, "Threads finished");
}


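/// Final shutdown: stops the restarting thread itself first (so that it cannot resurrect anything),
/// then stops all other replica tasks via partialShutdown().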
void ReplicatedMergeTreeRestartingThread::shutdown()
{
    /// Stop restarting_thread before stopping other tasks - so that it won't restart them again.
    need_stop = true;
    task->deactivate();
    LOG_TRACE(log, "Restarting thread finished");

    /// Stop other tasks.
    partialShutdown();
}

}