1 | #pragma once |
2 | |
#include <functional>
#include <list>
#include <map>
#include <mutex>
#include <optional>
#include <set>
#include <unordered_map>

#include <Common/ActionBlocker.h>
#include <Storages/MergeTree/ReplicatedMergeTreeLogEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h>
#include <Storages/MergeTree/ActiveDataPartSet.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeMutationStatus.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumAddedParts.h>

#include <Common/ZooKeeper/ZooKeeper.h>
#include <Core/BackgroundSchedulePool.h>
15 | |
16 | |
17 | namespace DB |
18 | { |
19 | |
/// Forward declarations: these classes are referenced below before (or without) their definitions
/// being visible -- by reference, as a friend, or as a function return type.
class MergeTreeDataMergerMutator;
class ReplicatedMergeTreeMergePredicate;
class StorageReplicatedMergeTree;
24 | |
25 | |
26 | class ReplicatedMergeTreeQueue |
27 | { |
28 | private: |
29 | friend class CurrentlyExecuting; |
30 | friend class ReplicatedMergeTreeMergePredicate; |
31 | |
32 | using LogEntry = ReplicatedMergeTreeLogEntry; |
33 | using LogEntryPtr = LogEntry::Ptr; |
34 | |
35 | using Queue = std::list<LogEntryPtr>; |
36 | |
37 | using StringSet = std::set<String>; |
38 | |
39 | struct ByTime |
40 | { |
41 | bool operator()(const LogEntryPtr & lhs, const LogEntryPtr & rhs) const |
42 | { |
43 | return std::forward_as_tuple(lhs->create_time, lhs.get()) |
44 | < std::forward_as_tuple(rhs->create_time, rhs.get()); |
45 | } |
46 | }; |
47 | |
48 | /// To calculate min_unprocessed_insert_time, max_processed_insert_time, for which the replica lag is calculated. |
49 | using InsertsByTime = std::set<LogEntryPtr, ByTime>; |
50 | |
51 | |
52 | StorageReplicatedMergeTree & storage; |
53 | MergeTreeDataFormatVersion format_version; |
54 | |
55 | String zookeeper_path; |
56 | String replica_path; |
57 | String logger_name; |
58 | Logger * log = nullptr; |
59 | |
60 | /// Protects the queue, future_parts and other queue state variables. |
61 | mutable std::mutex state_mutex; |
62 | |
63 | /// A set of parts that should be on this replica according to the queue entries that have been done |
64 | /// up to this point. The invariant holds: `virtual_parts` = `current_parts` + `queue`. |
65 | /// Note: it can be different from the actual set of parts because the replica can decide to fetch |
66 | /// a bigger part instead of the part mentioned in the log entry. |
67 | ActiveDataPartSet current_parts; |
68 | |
69 | /** The queue of what you need to do on this line to catch up. It is taken from ZooKeeper (/replicas/me/queue/). |
70 | * In ZK records in chronological order. Here it is not necessary. |
71 | */ |
72 | Queue queue; |
73 | |
74 | InsertsByTime inserts_by_time; |
75 | time_t min_unprocessed_insert_time = 0; |
76 | time_t max_processed_insert_time = 0; |
77 | |
78 | time_t last_queue_update = 0; |
79 | |
80 | /// parts that will appear as a result of actions performed right now by background threads (these actions are not in the queue). |
81 | /// Used to block other actions on parts in the range covered by future_parts. |
82 | using FuturePartsSet = std::map<String, LogEntryPtr>; |
83 | FuturePartsSet future_parts; |
84 | |
85 | /// Index of the first log entry that we didn't see yet. |
86 | Int64 log_pointer = 0; |
87 | |
88 | /** What will be the set of active parts after executing all log entries up to log_pointer. |
89 | * Used to determine which merges can be assigned (see ReplicatedMergeTreeMergePredicate) |
90 | */ |
91 | ActiveDataPartSet virtual_parts; |
92 | |
93 | /// A set of mutations loaded from ZooKeeper. |
94 | /// mutations_by_partition is an index partition ID -> block ID -> mutation into this set. |
95 | /// Note that mutations are updated in such a way that they are always more recent than |
96 | /// log_pointer (see pullLogsToQueue()). |
97 | |
98 | struct MutationStatus |
99 | { |
100 | MutationStatus(const ReplicatedMergeTreeMutationEntryPtr & entry_) |
101 | : entry(entry_) |
102 | { |
103 | } |
104 | |
105 | ReplicatedMergeTreeMutationEntryPtr entry; |
106 | |
107 | /// A number of parts that should be mutated/merged or otherwise moved to Obsolete state for this mutation to complete. |
108 | Int64 parts_to_do = 0; |
109 | |
110 | /// Note that is_done is not equivalent to parts_to_do == 0 |
111 | /// (even if parts_to_do == 0 some relevant parts can still commit in the future). |
112 | bool is_done = false; |
113 | |
114 | String latest_failed_part; |
115 | MergeTreePartInfo latest_failed_part_info; |
116 | time_t latest_fail_time = 0; |
117 | String latest_fail_reason; |
118 | }; |
119 | |
120 | std::map<String, MutationStatus> mutations_by_znode; |
121 | std::unordered_map<String, std::map<Int64, MutationStatus *>> mutations_by_partition; |
122 | /// Znode ID of the latest mutation that is done. |
123 | String mutation_pointer; |
124 | |
125 | |
126 | /// Provides only one simultaneous call to pullLogsToQueue. |
127 | std::mutex pull_logs_to_queue_mutex; |
128 | |
129 | |
130 | /// List of subscribers |
131 | /// A subscriber callback is called when an entry queue is deleted |
132 | mutable std::mutex subscribers_mutex; |
133 | |
134 | using SubscriberCallBack = std::function<void(size_t /* queue_size */)>; |
135 | using Subscribers = std::list<SubscriberCallBack>; |
136 | using SubscriberIterator = Subscribers::iterator; |
137 | |
138 | friend class SubscriberHandler; |
139 | struct SubscriberHandler : public boost::noncopyable |
140 | { |
141 | SubscriberHandler(SubscriberIterator it_, ReplicatedMergeTreeQueue & queue_) : it(it_), queue(queue_) {} |
142 | ~SubscriberHandler(); |
143 | |
144 | private: |
145 | SubscriberIterator it; |
146 | ReplicatedMergeTreeQueue & queue; |
147 | }; |
148 | |
149 | Subscribers subscribers; |
150 | |
151 | /// Notify subscribers about queue change |
152 | void notifySubscribers(size_t new_queue_size); |
153 | |
154 | /// Check that entry_ptr is REPLACE_RANGE entry and can be removed from queue because current entry covers it |
155 | bool checkReplaceRangeCanBeRemoved(const MergeTreePartInfo & part_info, const LogEntryPtr entry_ptr, const ReplicatedMergeTreeLogEntryData & current) const; |
156 | |
157 | /// Ensures that only one thread is simultaneously updating mutations. |
158 | std::mutex update_mutations_mutex; |
159 | |
160 | /// Put a set of (already existing) parts in virtual_parts. |
161 | void addVirtualParts(const MergeTreeData::DataParts & parts); |
162 | |
163 | void insertUnlocked( |
164 | const LogEntryPtr & entry, std::optional<time_t> & min_unprocessed_insert_time_changed, |
165 | std::lock_guard<std::mutex> & state_lock); |
166 | |
167 | void removeProcessedEntry(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry); |
168 | |
169 | /** Can I now try this action. If not, you need to leave it in the queue and try another one. |
170 | * Called under the state_mutex. |
171 | */ |
172 | bool shouldExecuteLogEntry( |
173 | const LogEntry & entry, String & out_postpone_reason, |
174 | MergeTreeDataMergerMutator & merger_mutator, MergeTreeData & data, |
175 | std::lock_guard<std::mutex> & state_lock) const; |
176 | |
177 | Int64 getCurrentMutationVersionImpl(const String & partition_id, Int64 data_version, std::lock_guard<std::mutex> & /* state_lock */) const; |
178 | |
179 | /** Check that part isn't in currently generating parts and isn't covered by them. |
180 | * Should be called under state_mutex. |
181 | */ |
182 | bool isNotCoveredByFuturePartsImpl( |
183 | const String & new_part_name, String & out_reason, |
184 | std::lock_guard<std::mutex> & state_lock) const; |
185 | |
186 | /// After removing the queue element, update the insertion times in the RAM. Running under state_mutex. |
187 | /// Returns information about what times have changed - this information can be passed to updateTimesInZooKeeper. |
188 | void updateStateOnQueueEntryRemoval(const LogEntryPtr & entry, |
189 | bool is_successful, |
190 | std::optional<time_t> & min_unprocessed_insert_time_changed, |
191 | std::optional<time_t> & max_processed_insert_time_changed, |
192 | std::unique_lock<std::mutex> & state_lock); |
193 | |
194 | /// If the new part appears (add == true) or becomes obsolete (add == false), update parts_to_do of all affected mutations. |
195 | /// Notifies storage.mutations_finalizing_task if some mutations are probably finished. |
196 | void updateMutationsPartsToDo(const String & part_name, bool add); |
197 | |
198 | /// Update the insertion times in ZooKeeper. |
199 | void updateTimesInZooKeeper(zkutil::ZooKeeperPtr zookeeper, |
200 | std::optional<time_t> min_unprocessed_insert_time_changed, |
201 | std::optional<time_t> max_processed_insert_time_changed) const; |
202 | |
203 | /// Returns list of currently executing parts blocking execution a command modifying specified range |
204 | size_t getConflictsCountForRange( |
205 | const MergeTreePartInfo & range, const LogEntry & entry, String * out_description, |
206 | std::lock_guard<std::mutex> & state_lock) const; |
207 | |
208 | /// Marks the element of the queue as running. |
209 | class CurrentlyExecuting |
210 | { |
211 | private: |
212 | ReplicatedMergeTreeQueue::LogEntryPtr entry; |
213 | ReplicatedMergeTreeQueue & queue; |
214 | |
215 | friend class ReplicatedMergeTreeQueue; |
216 | |
217 | /// Created only in the selectEntryToProcess function. It is called under mutex. |
218 | CurrentlyExecuting(const ReplicatedMergeTreeQueue::LogEntryPtr & entry_, ReplicatedMergeTreeQueue & queue_); |
219 | |
220 | /// In case of fetch, we determine actual part during the execution, so we need to update entry. It is called under state_mutex. |
221 | static void setActualPartName(ReplicatedMergeTreeQueue::LogEntry & entry, const String & actual_part_name, |
222 | ReplicatedMergeTreeQueue & queue); |
223 | public: |
224 | ~CurrentlyExecuting(); |
225 | }; |
226 | |
227 | public: |
228 | ReplicatedMergeTreeQueue(StorageReplicatedMergeTree & storage_); |
229 | |
230 | ~ReplicatedMergeTreeQueue(); |
231 | |
232 | |
233 | void initialize(const String & zookeeper_path_, const String & replica_path_, const String & logger_name_, |
234 | const MergeTreeData::DataParts & parts); |
235 | |
236 | /** Inserts an action to the end of the queue. |
237 | * To restore broken parts during operation. |
238 | * Do not insert the action itself into ZK (do it yourself). |
239 | */ |
240 | void insert(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry); |
241 | |
242 | /** Delete the action with the specified part (as new_part_name) from the queue. |
243 | * Called for unreachable actions in the queue - old lost parts. |
244 | */ |
245 | bool remove(zkutil::ZooKeeperPtr zookeeper, const String & part_name); |
246 | |
247 | /** Load (initialize) a queue from ZooKeeper (/replicas/me/queue/). |
248 | * If queue was not empty load() would not load duplicate records. |
249 | * return true, if we update queue. |
250 | */ |
251 | bool load(zkutil::ZooKeeperPtr zookeeper); |
252 | |
253 | bool removeFromVirtualParts(const MergeTreePartInfo & part_info); |
254 | |
255 | /** Copy the new entries from the shared log to the queue of this replica. Set the log_pointer to the appropriate value. |
256 | * If watch_callback is not empty, will call it when new entries appear in the log. |
257 | * If there were new entries, notifies storage.queue_task_handle. |
258 | * Additionally loads mutations (so that the set of mutations is always more recent than the queue). |
259 | */ |
260 | void pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback = {}); |
261 | |
262 | /// Load new mutation entries. If something new is loaded, schedule storage.merge_selecting_task. |
263 | /// If watch_callback is not empty, will call it when new mutations appear in ZK. |
264 | void updateMutations(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback = {}); |
265 | |
266 | /// Remove a mutation from ZooKeeper and from the local set. Returns the removed entry or nullptr |
267 | /// if it could not be found. |
268 | ReplicatedMergeTreeMutationEntryPtr removeMutation(zkutil::ZooKeeperPtr zookeeper, const String & mutation_id); |
269 | |
270 | /** Remove the action from the queue with the parts covered by part_name (from ZK and from the RAM). |
271 | * And also wait for the completion of their execution, if they are now being executed. |
272 | */ |
273 | void removePartProducingOpsInRange(zkutil::ZooKeeperPtr zookeeper, const MergeTreePartInfo & part_info, const ReplicatedMergeTreeLogEntryData & current); |
274 | |
275 | /** Throws and exception if there are currently executing entries in the range . |
276 | */ |
277 | void checkThereAreNoConflictsInRange(const MergeTreePartInfo & range, const LogEntry & entry); |
278 | |
279 | /** In the case where there are not enough parts to perform the merge in part_name |
280 | * - move actions with merged parts to the end of the queue |
281 | * (in order to download a already merged part from another replica). |
282 | */ |
283 | StringSet moveSiblingPartsForMergeToEndOfQueue(const String & part_name); |
284 | |
285 | /** Select the next action to process. |
286 | * merger_mutator is used only to check if the merges are not suspended. |
287 | */ |
288 | using SelectedEntry = std::pair<ReplicatedMergeTreeQueue::LogEntryPtr, std::unique_ptr<CurrentlyExecuting>>; |
289 | SelectedEntry selectEntryToProcess(MergeTreeDataMergerMutator & merger_mutator, MergeTreeData & data); |
290 | |
291 | /** Execute `func` function to handle the action. |
292 | * In this case, at runtime, mark the queue element as running |
293 | * (add into future_parts and more). |
294 | * If there was an exception during processing, it saves it in `entry`. |
295 | * Returns true if there were no exceptions during the processing. |
296 | */ |
297 | bool processEntry(std::function<zkutil::ZooKeeperPtr()> get_zookeeper, LogEntryPtr & entry, const std::function<bool(LogEntryPtr &)> func); |
298 | |
299 | /// Count the number of merges and mutations of single parts in the queue. |
300 | std::pair<size_t, size_t> countMergesAndPartMutations() const; |
301 | |
302 | /// Count the total number of active mutations. |
303 | size_t countMutations() const; |
304 | |
305 | /// Count the total number of active mutations that are finished (is_done = true). |
306 | size_t countFinishedMutations() const; |
307 | |
308 | /// Returns functor which used by MergeTreeMergerMutator to select parts for merge |
309 | ReplicatedMergeTreeMergePredicate getMergePredicate(zkutil::ZooKeeperPtr & zookeeper); |
310 | |
311 | /// Return the version (block number) of the last mutation that we don't need to apply to the part |
312 | /// with getDataVersion() == data_version. (Either this mutation was already applied or the part |
313 | /// was created after the mutation). |
314 | /// If there is no such mutation or it has already been executed and deleted, return 0. |
315 | Int64 getCurrentMutationVersion(const String & partition_id, Int64 data_version) const; |
316 | |
317 | MutationCommands getMutationCommands(const MergeTreeData::DataPartPtr & part, Int64 desired_mutation_version) const; |
318 | |
319 | /// Mark finished mutations as done. If the function needs to be called again at some later time |
320 | /// (because some mutations are probably done but we are not sure yet), returns true. |
321 | bool tryFinalizeMutations(zkutil::ZooKeeperPtr zookeeper); |
322 | |
323 | /// Prohibit merges in the specified blocks range. |
324 | /// Add part to virtual_parts, which means that part must exist |
325 | /// after processing replication log up to log_pointer. |
326 | /// Part maybe fake (look at ReplicatedMergeTreeMergePredicate). |
327 | void disableMergesInBlockRange(const String & part_name); |
328 | |
329 | /// Cheks that part is already in virtual parts |
330 | bool isVirtualPart(const MergeTreeData::DataPartPtr & data_part) const; |
331 | |
332 | /// Check that part isn't in currently generating parts and isn't covered by them and add it to future_parts. |
333 | /// Locks queue's mutex. |
334 | bool addFuturePartIfNotCoveredByThem(const String & part_name, LogEntry & entry, String & reject_reason); |
335 | |
336 | /// A blocker that stops selects from the queue |
337 | ActionBlocker actions_blocker; |
338 | |
339 | /// Adds a subscriber |
340 | SubscriberHandler addSubscriber(SubscriberCallBack && callback); |
341 | |
342 | struct Status |
343 | { |
344 | UInt32 future_parts; |
345 | UInt32 queue_size; |
346 | UInt32 inserts_in_queue; |
347 | UInt32 merges_in_queue; |
348 | UInt32 part_mutations_in_queue; |
349 | UInt32 queue_oldest_time; |
350 | UInt32 inserts_oldest_time; |
351 | UInt32 merges_oldest_time; |
352 | UInt32 part_mutations_oldest_time; |
353 | String oldest_part_to_get; |
354 | String oldest_part_to_merge_to; |
355 | String oldest_part_to_mutate_to; |
356 | UInt32 last_queue_update; |
357 | }; |
358 | |
359 | /// Get information about the queue. |
360 | Status getStatus() const; |
361 | |
362 | /// Get the data of the queue elements. |
363 | using LogEntriesData = std::vector<ReplicatedMergeTreeLogEntryData>; |
364 | void getEntries(LogEntriesData & res) const; |
365 | |
366 | /// Get information about the insertion times. |
367 | void getInsertTimes(time_t & out_min_unprocessed_insert_time, time_t & out_max_processed_insert_time) const; |
368 | |
369 | std::vector<MergeTreeMutationStatus> getMutationsStatus() const; |
370 | }; |
371 | |
372 | class ReplicatedMergeTreeMergePredicate |
373 | { |
374 | public: |
375 | ReplicatedMergeTreeMergePredicate(ReplicatedMergeTreeQueue & queue_, zkutil::ZooKeeperPtr & zookeeper); |
376 | |
377 | /// Can we assign a merge with these two parts? |
378 | /// (assuming that no merge was assigned after the predicate was constructed) |
379 | /// If we can't and out_reason is not nullptr, set it to the reason why we can't merge. |
380 | bool operator()( |
381 | const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right, |
382 | String * out_reason = nullptr) const; |
383 | |
384 | /// Return nonempty optional if the part can and should be mutated. |
385 | /// Returned mutation version number is always the biggest possible. |
386 | std::optional<Int64> getDesiredMutationVersion(const MergeTreeData::DataPartPtr & part) const; |
387 | |
388 | bool isMutationFinished(const ReplicatedMergeTreeMutationEntry & mutation) const; |
389 | |
390 | private: |
391 | const ReplicatedMergeTreeQueue & queue; |
392 | |
393 | /// A snapshot of active parts that would appear if the replica executes all log entries in its queue. |
394 | ActiveDataPartSet prev_virtual_parts; |
395 | /// partition ID -> block numbers of the inserts and mutations that are about to commit |
396 | /// (loaded at some later time than prev_virtual_parts). |
397 | std::unordered_map<String, std::set<Int64>> committing_blocks; |
398 | |
399 | /// Quorum state taken at some later time than prev_virtual_parts. |
400 | std::set<std::string> last_quorum_parts; |
401 | String inprogress_quorum_part; |
402 | }; |
403 | |
404 | |
405 | /** Convert a number to a string in the format of the suffixes of auto-incremental nodes in ZooKeeper. |
406 | * Negative numbers are also supported - for them the name of the node looks somewhat silly |
407 | * and does not match any auto-incremented node in ZK. |
408 | */ |
409 | String padIndex(Int64 index); |
410 | |
411 | } |
412 | |