1#pragma once
2
3#include <optional>
4
5#include <Common/ActionBlocker.h>
6#include <Storages/MergeTree/ReplicatedMergeTreeLogEntry.h>
7#include <Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h>
8#include <Storages/MergeTree/ActiveDataPartSet.h>
9#include <Storages/MergeTree/MergeTreeData.h>
10#include <Storages/MergeTree/MergeTreeMutationStatus.h>
11#include <Storages/MergeTree/ReplicatedMergeTreeQuorumAddedParts.h>
12
13#include <Common/ZooKeeper/ZooKeeper.h>
14#include <Core/BackgroundSchedulePool.h>
15
16
17namespace DB
18{
19
/// Classes declared ahead of their use in the declarations below.
/// ReplicatedMergeTreeMergePredicate is defined later in this header; it is named earlier
/// (as a friend and as a return type) by ReplicatedMergeTreeQueue.
class MergeTreeDataMergerMutator;
class StorageReplicatedMergeTree;
class ReplicatedMergeTreeMergePredicate;
24
25
26class ReplicatedMergeTreeQueue
27{
28private:
29 friend class CurrentlyExecuting;
30 friend class ReplicatedMergeTreeMergePredicate;
31
32 using LogEntry = ReplicatedMergeTreeLogEntry;
33 using LogEntryPtr = LogEntry::Ptr;
34
35 using Queue = std::list<LogEntryPtr>;
36
37 using StringSet = std::set<String>;
38
39 struct ByTime
40 {
41 bool operator()(const LogEntryPtr & lhs, const LogEntryPtr & rhs) const
42 {
43 return std::forward_as_tuple(lhs->create_time, lhs.get())
44 < std::forward_as_tuple(rhs->create_time, rhs.get());
45 }
46 };
47
48 /// To calculate min_unprocessed_insert_time, max_processed_insert_time, for which the replica lag is calculated.
49 using InsertsByTime = std::set<LogEntryPtr, ByTime>;
50
51
52 StorageReplicatedMergeTree & storage;
53 MergeTreeDataFormatVersion format_version;
54
55 String zookeeper_path;
56 String replica_path;
57 String logger_name;
58 Logger * log = nullptr;
59
60 /// Protects the queue, future_parts and other queue state variables.
61 mutable std::mutex state_mutex;
62
63 /// A set of parts that should be on this replica according to the queue entries that have been done
64 /// up to this point. The invariant holds: `virtual_parts` = `current_parts` + `queue`.
65 /// Note: it can be different from the actual set of parts because the replica can decide to fetch
66 /// a bigger part instead of the part mentioned in the log entry.
67 ActiveDataPartSet current_parts;
68
69 /** The queue of what you need to do on this line to catch up. It is taken from ZooKeeper (/replicas/me/queue/).
70 * In ZK records in chronological order. Here it is not necessary.
71 */
72 Queue queue;
73
74 InsertsByTime inserts_by_time;
75 time_t min_unprocessed_insert_time = 0;
76 time_t max_processed_insert_time = 0;
77
78 time_t last_queue_update = 0;
79
80 /// parts that will appear as a result of actions performed right now by background threads (these actions are not in the queue).
81 /// Used to block other actions on parts in the range covered by future_parts.
82 using FuturePartsSet = std::map<String, LogEntryPtr>;
83 FuturePartsSet future_parts;
84
85 /// Index of the first log entry that we didn't see yet.
86 Int64 log_pointer = 0;
87
88 /** What will be the set of active parts after executing all log entries up to log_pointer.
89 * Used to determine which merges can be assigned (see ReplicatedMergeTreeMergePredicate)
90 */
91 ActiveDataPartSet virtual_parts;
92
93 /// A set of mutations loaded from ZooKeeper.
94 /// mutations_by_partition is an index partition ID -> block ID -> mutation into this set.
95 /// Note that mutations are updated in such a way that they are always more recent than
96 /// log_pointer (see pullLogsToQueue()).
97
98 struct MutationStatus
99 {
100 MutationStatus(const ReplicatedMergeTreeMutationEntryPtr & entry_)
101 : entry(entry_)
102 {
103 }
104
105 ReplicatedMergeTreeMutationEntryPtr entry;
106
107 /// A number of parts that should be mutated/merged or otherwise moved to Obsolete state for this mutation to complete.
108 Int64 parts_to_do = 0;
109
110 /// Note that is_done is not equivalent to parts_to_do == 0
111 /// (even if parts_to_do == 0 some relevant parts can still commit in the future).
112 bool is_done = false;
113
114 String latest_failed_part;
115 MergeTreePartInfo latest_failed_part_info;
116 time_t latest_fail_time = 0;
117 String latest_fail_reason;
118 };
119
120 std::map<String, MutationStatus> mutations_by_znode;
121 std::unordered_map<String, std::map<Int64, MutationStatus *>> mutations_by_partition;
122 /// Znode ID of the latest mutation that is done.
123 String mutation_pointer;
124
125
126 /// Provides only one simultaneous call to pullLogsToQueue.
127 std::mutex pull_logs_to_queue_mutex;
128
129
130 /// List of subscribers
131 /// A subscriber callback is called when an entry queue is deleted
132 mutable std::mutex subscribers_mutex;
133
134 using SubscriberCallBack = std::function<void(size_t /* queue_size */)>;
135 using Subscribers = std::list<SubscriberCallBack>;
136 using SubscriberIterator = Subscribers::iterator;
137
138 friend class SubscriberHandler;
139 struct SubscriberHandler : public boost::noncopyable
140 {
141 SubscriberHandler(SubscriberIterator it_, ReplicatedMergeTreeQueue & queue_) : it(it_), queue(queue_) {}
142 ~SubscriberHandler();
143
144 private:
145 SubscriberIterator it;
146 ReplicatedMergeTreeQueue & queue;
147 };
148
149 Subscribers subscribers;
150
151 /// Notify subscribers about queue change
152 void notifySubscribers(size_t new_queue_size);
153
154 /// Check that entry_ptr is REPLACE_RANGE entry and can be removed from queue because current entry covers it
155 bool checkReplaceRangeCanBeRemoved(const MergeTreePartInfo & part_info, const LogEntryPtr entry_ptr, const ReplicatedMergeTreeLogEntryData & current) const;
156
157 /// Ensures that only one thread is simultaneously updating mutations.
158 std::mutex update_mutations_mutex;
159
160 /// Put a set of (already existing) parts in virtual_parts.
161 void addVirtualParts(const MergeTreeData::DataParts & parts);
162
163 void insertUnlocked(
164 const LogEntryPtr & entry, std::optional<time_t> & min_unprocessed_insert_time_changed,
165 std::lock_guard<std::mutex> & state_lock);
166
167 void removeProcessedEntry(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry);
168
169 /** Can I now try this action. If not, you need to leave it in the queue and try another one.
170 * Called under the state_mutex.
171 */
172 bool shouldExecuteLogEntry(
173 const LogEntry & entry, String & out_postpone_reason,
174 MergeTreeDataMergerMutator & merger_mutator, MergeTreeData & data,
175 std::lock_guard<std::mutex> & state_lock) const;
176
177 Int64 getCurrentMutationVersionImpl(const String & partition_id, Int64 data_version, std::lock_guard<std::mutex> & /* state_lock */) const;
178
179 /** Check that part isn't in currently generating parts and isn't covered by them.
180 * Should be called under state_mutex.
181 */
182 bool isNotCoveredByFuturePartsImpl(
183 const String & new_part_name, String & out_reason,
184 std::lock_guard<std::mutex> & state_lock) const;
185
186 /// After removing the queue element, update the insertion times in the RAM. Running under state_mutex.
187 /// Returns information about what times have changed - this information can be passed to updateTimesInZooKeeper.
188 void updateStateOnQueueEntryRemoval(const LogEntryPtr & entry,
189 bool is_successful,
190 std::optional<time_t> & min_unprocessed_insert_time_changed,
191 std::optional<time_t> & max_processed_insert_time_changed,
192 std::unique_lock<std::mutex> & state_lock);
193
194 /// If the new part appears (add == true) or becomes obsolete (add == false), update parts_to_do of all affected mutations.
195 /// Notifies storage.mutations_finalizing_task if some mutations are probably finished.
196 void updateMutationsPartsToDo(const String & part_name, bool add);
197
198 /// Update the insertion times in ZooKeeper.
199 void updateTimesInZooKeeper(zkutil::ZooKeeperPtr zookeeper,
200 std::optional<time_t> min_unprocessed_insert_time_changed,
201 std::optional<time_t> max_processed_insert_time_changed) const;
202
203 /// Returns list of currently executing parts blocking execution a command modifying specified range
204 size_t getConflictsCountForRange(
205 const MergeTreePartInfo & range, const LogEntry & entry, String * out_description,
206 std::lock_guard<std::mutex> & state_lock) const;
207
208 /// Marks the element of the queue as running.
209 class CurrentlyExecuting
210 {
211 private:
212 ReplicatedMergeTreeQueue::LogEntryPtr entry;
213 ReplicatedMergeTreeQueue & queue;
214
215 friend class ReplicatedMergeTreeQueue;
216
217 /// Created only in the selectEntryToProcess function. It is called under mutex.
218 CurrentlyExecuting(const ReplicatedMergeTreeQueue::LogEntryPtr & entry_, ReplicatedMergeTreeQueue & queue_);
219
220 /// In case of fetch, we determine actual part during the execution, so we need to update entry. It is called under state_mutex.
221 static void setActualPartName(ReplicatedMergeTreeQueue::LogEntry & entry, const String & actual_part_name,
222 ReplicatedMergeTreeQueue & queue);
223 public:
224 ~CurrentlyExecuting();
225 };
226
227public:
228 ReplicatedMergeTreeQueue(StorageReplicatedMergeTree & storage_);
229
230 ~ReplicatedMergeTreeQueue();
231
232
233 void initialize(const String & zookeeper_path_, const String & replica_path_, const String & logger_name_,
234 const MergeTreeData::DataParts & parts);
235
236 /** Inserts an action to the end of the queue.
237 * To restore broken parts during operation.
238 * Do not insert the action itself into ZK (do it yourself).
239 */
240 void insert(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry);
241
242 /** Delete the action with the specified part (as new_part_name) from the queue.
243 * Called for unreachable actions in the queue - old lost parts.
244 */
245 bool remove(zkutil::ZooKeeperPtr zookeeper, const String & part_name);
246
247 /** Load (initialize) a queue from ZooKeeper (/replicas/me/queue/).
248 * If queue was not empty load() would not load duplicate records.
249 * return true, if we update queue.
250 */
251 bool load(zkutil::ZooKeeperPtr zookeeper);
252
253 bool removeFromVirtualParts(const MergeTreePartInfo & part_info);
254
255 /** Copy the new entries from the shared log to the queue of this replica. Set the log_pointer to the appropriate value.
256 * If watch_callback is not empty, will call it when new entries appear in the log.
257 * If there were new entries, notifies storage.queue_task_handle.
258 * Additionally loads mutations (so that the set of mutations is always more recent than the queue).
259 */
260 void pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback = {});
261
262 /// Load new mutation entries. If something new is loaded, schedule storage.merge_selecting_task.
263 /// If watch_callback is not empty, will call it when new mutations appear in ZK.
264 void updateMutations(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback = {});
265
266 /// Remove a mutation from ZooKeeper and from the local set. Returns the removed entry or nullptr
267 /// if it could not be found.
268 ReplicatedMergeTreeMutationEntryPtr removeMutation(zkutil::ZooKeeperPtr zookeeper, const String & mutation_id);
269
270 /** Remove the action from the queue with the parts covered by part_name (from ZK and from the RAM).
271 * And also wait for the completion of their execution, if they are now being executed.
272 */
273 void removePartProducingOpsInRange(zkutil::ZooKeeperPtr zookeeper, const MergeTreePartInfo & part_info, const ReplicatedMergeTreeLogEntryData & current);
274
275 /** Throws and exception if there are currently executing entries in the range .
276 */
277 void checkThereAreNoConflictsInRange(const MergeTreePartInfo & range, const LogEntry & entry);
278
279 /** In the case where there are not enough parts to perform the merge in part_name
280 * - move actions with merged parts to the end of the queue
281 * (in order to download a already merged part from another replica).
282 */
283 StringSet moveSiblingPartsForMergeToEndOfQueue(const String & part_name);
284
285 /** Select the next action to process.
286 * merger_mutator is used only to check if the merges are not suspended.
287 */
288 using SelectedEntry = std::pair<ReplicatedMergeTreeQueue::LogEntryPtr, std::unique_ptr<CurrentlyExecuting>>;
289 SelectedEntry selectEntryToProcess(MergeTreeDataMergerMutator & merger_mutator, MergeTreeData & data);
290
291 /** Execute `func` function to handle the action.
292 * In this case, at runtime, mark the queue element as running
293 * (add into future_parts and more).
294 * If there was an exception during processing, it saves it in `entry`.
295 * Returns true if there were no exceptions during the processing.
296 */
297 bool processEntry(std::function<zkutil::ZooKeeperPtr()> get_zookeeper, LogEntryPtr & entry, const std::function<bool(LogEntryPtr &)> func);
298
299 /// Count the number of merges and mutations of single parts in the queue.
300 std::pair<size_t, size_t> countMergesAndPartMutations() const;
301
302 /// Count the total number of active mutations.
303 size_t countMutations() const;
304
305 /// Count the total number of active mutations that are finished (is_done = true).
306 size_t countFinishedMutations() const;
307
308 /// Returns functor which used by MergeTreeMergerMutator to select parts for merge
309 ReplicatedMergeTreeMergePredicate getMergePredicate(zkutil::ZooKeeperPtr & zookeeper);
310
311 /// Return the version (block number) of the last mutation that we don't need to apply to the part
312 /// with getDataVersion() == data_version. (Either this mutation was already applied or the part
313 /// was created after the mutation).
314 /// If there is no such mutation or it has already been executed and deleted, return 0.
315 Int64 getCurrentMutationVersion(const String & partition_id, Int64 data_version) const;
316
317 MutationCommands getMutationCommands(const MergeTreeData::DataPartPtr & part, Int64 desired_mutation_version) const;
318
319 /// Mark finished mutations as done. If the function needs to be called again at some later time
320 /// (because some mutations are probably done but we are not sure yet), returns true.
321 bool tryFinalizeMutations(zkutil::ZooKeeperPtr zookeeper);
322
323 /// Prohibit merges in the specified blocks range.
324 /// Add part to virtual_parts, which means that part must exist
325 /// after processing replication log up to log_pointer.
326 /// Part maybe fake (look at ReplicatedMergeTreeMergePredicate).
327 void disableMergesInBlockRange(const String & part_name);
328
329 /// Cheks that part is already in virtual parts
330 bool isVirtualPart(const MergeTreeData::DataPartPtr & data_part) const;
331
332 /// Check that part isn't in currently generating parts and isn't covered by them and add it to future_parts.
333 /// Locks queue's mutex.
334 bool addFuturePartIfNotCoveredByThem(const String & part_name, LogEntry & entry, String & reject_reason);
335
336 /// A blocker that stops selects from the queue
337 ActionBlocker actions_blocker;
338
339 /// Adds a subscriber
340 SubscriberHandler addSubscriber(SubscriberCallBack && callback);
341
342 struct Status
343 {
344 UInt32 future_parts;
345 UInt32 queue_size;
346 UInt32 inserts_in_queue;
347 UInt32 merges_in_queue;
348 UInt32 part_mutations_in_queue;
349 UInt32 queue_oldest_time;
350 UInt32 inserts_oldest_time;
351 UInt32 merges_oldest_time;
352 UInt32 part_mutations_oldest_time;
353 String oldest_part_to_get;
354 String oldest_part_to_merge_to;
355 String oldest_part_to_mutate_to;
356 UInt32 last_queue_update;
357 };
358
359 /// Get information about the queue.
360 Status getStatus() const;
361
362 /// Get the data of the queue elements.
363 using LogEntriesData = std::vector<ReplicatedMergeTreeLogEntryData>;
364 void getEntries(LogEntriesData & res) const;
365
366 /// Get information about the insertion times.
367 void getInsertTimes(time_t & out_min_unprocessed_insert_time, time_t & out_max_processed_insert_time) const;
368
369 std::vector<MergeTreeMutationStatus> getMutationsStatus() const;
370};
371
372class ReplicatedMergeTreeMergePredicate
373{
374public:
375 ReplicatedMergeTreeMergePredicate(ReplicatedMergeTreeQueue & queue_, zkutil::ZooKeeperPtr & zookeeper);
376
377 /// Can we assign a merge with these two parts?
378 /// (assuming that no merge was assigned after the predicate was constructed)
379 /// If we can't and out_reason is not nullptr, set it to the reason why we can't merge.
380 bool operator()(
381 const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right,
382 String * out_reason = nullptr) const;
383
384 /// Return nonempty optional if the part can and should be mutated.
385 /// Returned mutation version number is always the biggest possible.
386 std::optional<Int64> getDesiredMutationVersion(const MergeTreeData::DataPartPtr & part) const;
387
388 bool isMutationFinished(const ReplicatedMergeTreeMutationEntry & mutation) const;
389
390private:
391 const ReplicatedMergeTreeQueue & queue;
392
393 /// A snapshot of active parts that would appear if the replica executes all log entries in its queue.
394 ActiveDataPartSet prev_virtual_parts;
395 /// partition ID -> block numbers of the inserts and mutations that are about to commit
396 /// (loaded at some later time than prev_virtual_parts).
397 std::unordered_map<String, std::set<Int64>> committing_blocks;
398
399 /// Quorum state taken at some later time than prev_virtual_parts.
400 std::set<std::string> last_quorum_parts;
401 String inprogress_quorum_part;
402};
403
404
405/** Convert a number to a string in the format of the suffixes of auto-incremental nodes in ZooKeeper.
406 * Negative numbers are also supported - for them the name of the node looks somewhat silly
407 * and does not match any auto-incremented node in ZK.
408 */
409String padIndex(Int64 index);
410
411}
412