| 1 | #pragma once |
| 2 | |
| 3 | #include <optional> |
| 4 | |
| 5 | #include <Common/ActionBlocker.h> |
| 6 | #include <Storages/MergeTree/ReplicatedMergeTreeLogEntry.h> |
| 7 | #include <Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h> |
| 8 | #include <Storages/MergeTree/ActiveDataPartSet.h> |
| 9 | #include <Storages/MergeTree/MergeTreeData.h> |
| 10 | #include <Storages/MergeTree/MergeTreeMutationStatus.h> |
| 11 | #include <Storages/MergeTree/ReplicatedMergeTreeQuorumAddedParts.h> |
| 12 | |
| 13 | #include <Common/ZooKeeper/ZooKeeper.h> |
| 14 | #include <Core/BackgroundSchedulePool.h> |
| 15 | |
| 16 | |
| 17 | namespace DB |
| 18 | { |
| 19 | /// Forward declarations: the first two are only used by reference in this header; ReplicatedMergeTreeMergePredicate is defined below and is befriended by ReplicatedMergeTreeQueue. |
| 20 | class StorageReplicatedMergeTree; |
| 21 | class MergeTreeDataMergerMutator; |
| 22 | |
| 23 | class ReplicatedMergeTreeMergePredicate; |
| 24 | |
| 25 | /// In-memory state of this replica's replication queue (loaded from ZooKeeper, /replicas/me/queue/) together with the bookkeeping derived from it: current/virtual/future parts, mutations, insert times and subscribers. |
| 26 | class ReplicatedMergeTreeQueue |
| 27 | { |
| 28 | private: |
| 29 | /// No friend declarations for the nested classes (CurrentlyExecuting, SubscriberHandler): as members they already access private state, and a `friend class X;` placed before X is declared would befriend an unrelated DB::X. |
| 30 | friend class ReplicatedMergeTreeMergePredicate; |
| 31 | |
| 32 | using LogEntry = ReplicatedMergeTreeLogEntry; |
| 33 | using LogEntryPtr = LogEntry::Ptr; |
| 34 | |
| 35 | using Queue = std::list<LogEntryPtr>; |
| 36 | |
| 37 | using StringSet = std::set<String>; |
| 38 | /// Orders entries by create_time, breaking ties by entry address so that distinct entries never compare equal. |
| 39 | struct ByTime |
| 40 | { |
| 41 | bool operator()(const LogEntryPtr & lhs, const LogEntryPtr & rhs) const |
| 42 | { |
| 43 | if (lhs->create_time != rhs->create_time) return lhs->create_time < rhs->create_time; |
| 44 | return std::less<const LogEntry *>()(lhs.get(), rhs.get()); /// std::less guarantees a total order on unrelated pointers; built-in < does not. |
| 45 | } |
| 46 | }; |
| 47 | |
| 48 | /// To calculate min_unprocessed_insert_time, max_processed_insert_time, for which the replica lag is calculated. |
| 49 | using InsertsByTime = std::set<LogEntryPtr, ByTime>; |
| 50 | |
| 51 | |
| 52 | StorageReplicatedMergeTree & storage; |
| 53 | MergeTreeDataFormatVersion format_version; |
| 54 | |
| 55 | String zookeeper_path; |
| 56 | String replica_path; |
| 57 | String logger_name; |
| 58 | Logger * log = nullptr; |
| 59 | |
| 60 | /// Protects the queue, future_parts and other queue state variables. |
| 61 | mutable std::mutex state_mutex; |
| 62 | |
| 63 | /// A set of parts that should be on this replica according to the queue entries that have been done |
| 64 | /// up to this point. The invariant holds: `virtual_parts` = `current_parts` + `queue`. |
| 65 | /// Note: it can be different from the actual set of parts because the replica can decide to fetch |
| 66 | /// a bigger part instead of the part mentioned in the log entry. |
| 67 | ActiveDataPartSet current_parts; |
| 68 | |
| 69 | /** The queue of actions this replica has to execute to catch up. It is loaded from ZooKeeper (/replicas/me/queue/). |
| 70 | * In ZooKeeper the records are in chronological order; here they are not necessarily. |
| 71 | */ |
| 72 | Queue queue; |
| 73 | |
| 74 | InsertsByTime inserts_by_time; |
| 75 | time_t min_unprocessed_insert_time = 0; |
| 76 | time_t max_processed_insert_time = 0; |
| 77 | |
| 78 | time_t last_queue_update = 0; |
| 79 | |
| 80 | /// Parts that will appear as a result of actions performed right now by background threads (these actions are not in the queue). |
| 81 | /// Used to block other actions on parts in the range covered by future_parts. |
| 82 | using FuturePartsSet = std::map<String, LogEntryPtr>; |
| 83 | FuturePartsSet future_parts; |
| 84 | |
| 85 | /// Index of the first log entry that we didn't see yet. |
| 86 | Int64 log_pointer = 0; |
| 87 | |
| 88 | /** What will be the set of active parts after executing all log entries up to log_pointer. |
| 89 | * Used to determine which merges can be assigned (see ReplicatedMergeTreeMergePredicate). |
| 90 | */ |
| 91 | ActiveDataPartSet virtual_parts; |
| 92 | |
| 93 | /// A set of mutations loaded from ZooKeeper. |
| 94 | /// mutations_by_partition is an index (partition ID -> block number -> mutation) into this set. |
| 95 | /// Note that mutations are updated in such a way that they are always more recent than |
| 96 | /// log_pointer (see pullLogsToQueue()). |
| 97 | |
| 98 | struct MutationStatus |
| 99 | { |
| 100 | MutationStatus(const ReplicatedMergeTreeMutationEntryPtr & entry_) |
| 101 | : entry(entry_) |
| 102 | { |
| 103 | } |
| 104 | |
| 105 | ReplicatedMergeTreeMutationEntryPtr entry; |
| 106 | |
| 107 | /// A number of parts that should be mutated/merged or otherwise moved to Obsolete state for this mutation to complete. |
| 108 | Int64 parts_to_do = 0; |
| 109 | |
| 110 | /// Note that is_done is not equivalent to parts_to_do == 0 |
| 111 | /// (even if parts_to_do == 0 some relevant parts can still commit in the future). |
| 112 | bool is_done = false; |
| 113 | |
| 114 | String latest_failed_part; |
| 115 | MergeTreePartInfo latest_failed_part_info; |
| 116 | time_t latest_fail_time = 0; |
| 117 | String latest_fail_reason; |
| 118 | }; |
| 119 | |
| 120 | std::map<String, MutationStatus> mutations_by_znode; |
| 121 | std::unordered_map<String, std::map<Int64, MutationStatus *>> mutations_by_partition; |
| 122 | /// Znode ID of the latest mutation that is done. |
| 123 | String mutation_pointer; |
| 124 | |
| 125 | |
| 126 | /// Ensures that only one call to pullLogsToQueue runs at a time. |
| 127 | std::mutex pull_logs_to_queue_mutex; |
| 128 | |
| 129 | |
| 130 | /// List of subscribers |
| 131 | /// A subscriber callback is called when an entry is removed from the queue. |
| 132 | mutable std::mutex subscribers_mutex; |
| 133 | |
| 134 | using SubscriberCallBack = std::function<void(size_t /* queue_size */)>; |
| 135 | using Subscribers = std::list<SubscriberCallBack>; |
| 136 | using SubscriberIterator = Subscribers::iterator; |
| 137 | |
| 138 | /// Handle returned by addSubscriber(); remembers the subscriber's position in `subscribers`. Presumably the destructor unsubscribes - confirm in the .cpp. |
| 139 | struct SubscriberHandler : public boost::noncopyable |
| 140 | { |
| 141 | SubscriberHandler(SubscriberIterator it_, ReplicatedMergeTreeQueue & queue_) : it(it_), queue(queue_) {} |
| 142 | ~SubscriberHandler(); |
| 143 | |
| 144 | private: |
| 145 | SubscriberIterator it; |
| 146 | ReplicatedMergeTreeQueue & queue; |
| 147 | }; |
| 148 | |
| 149 | Subscribers subscribers; |
| 150 | |
| 151 | /// Notify subscribers about queue change |
| 152 | void notifySubscribers(size_t new_queue_size); |
| 153 | |
| 154 | /// Check that entry_ptr is REPLACE_RANGE entry and can be removed from queue because current entry covers it |
| 155 | bool checkReplaceRangeCanBeRemoved(const MergeTreePartInfo & part_info, const LogEntryPtr entry_ptr, const ReplicatedMergeTreeLogEntryData & current) const; |
| 156 | |
| 157 | /// Ensures that only one thread is simultaneously updating mutations. |
| 158 | std::mutex update_mutations_mutex; |
| 159 | |
| 160 | /// Put a set of (already existing) parts in virtual_parts. |
| 161 | void addVirtualParts(const MergeTreeData::DataParts & parts); |
| 162 | /// Insert an entry into the in-memory queue; the caller must hold state_mutex (passed as state_lock). |
| 163 | void insertUnlocked( |
| 164 | const LogEntryPtr & entry, std::optional<time_t> & min_unprocessed_insert_time_changed, |
| 165 | std::lock_guard<std::mutex> & state_lock); |
| 166 | |
| 167 | void removeProcessedEntry(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry); |
| 168 | |
| 169 | /** Whether this action can be tried now. If not, it must be left in the queue and another one should be tried. |
| 170 | * Called under the state_mutex. |
| 171 | */ |
| 172 | bool shouldExecuteLogEntry( |
| 173 | const LogEntry & entry, String & out_postpone_reason, |
| 174 | MergeTreeDataMergerMutator & merger_mutator, MergeTreeData & data, |
| 175 | std::lock_guard<std::mutex> & state_lock) const; |
| 176 | /// Implementation of getCurrentMutationVersion() for callers that already hold state_mutex. |
| 177 | Int64 getCurrentMutationVersionImpl(const String & partition_id, Int64 data_version, std::lock_guard<std::mutex> & /* state_lock */) const; |
| 178 | |
| 179 | /** Check that part isn't in currently generating parts and isn't covered by them. |
| 180 | * Should be called under state_mutex. |
| 181 | */ |
| 182 | bool isNotCoveredByFuturePartsImpl( |
| 183 | const String & new_part_name, String & out_reason, |
| 184 | std::lock_guard<std::mutex> & state_lock) const; |
| 185 | |
| 186 | /// After removing the queue element, update the insertion times in the RAM. Running under state_mutex. |
| 187 | /// Returns information about what times have changed - this information can be passed to updateTimesInZooKeeper. |
| 188 | void updateStateOnQueueEntryRemoval(const LogEntryPtr & entry, |
| 189 | bool is_successful, |
| 190 | std::optional<time_t> & min_unprocessed_insert_time_changed, |
| 191 | std::optional<time_t> & max_processed_insert_time_changed, |
| 192 | std::unique_lock<std::mutex> & state_lock); |
| 193 | |
| 194 | /// If the new part appears (add == true) or becomes obsolete (add == false), update parts_to_do of all affected mutations. |
| 195 | /// Notifies storage.mutations_finalizing_task if some mutations are probably finished. |
| 196 | void updateMutationsPartsToDo(const String & part_name, bool add); |
| 197 | |
| 198 | /// Update the insertion times in ZooKeeper. |
| 199 | void updateTimesInZooKeeper(zkutil::ZooKeeperPtr zookeeper, |
| 200 | std::optional<time_t> min_unprocessed_insert_time_changed, |
| 201 | std::optional<time_t> max_processed_insert_time_changed) const; |
| 202 | |
| 203 | /// Returns the number of currently executing parts blocking execution of a command modifying the specified range. |
| 204 | size_t getConflictsCountForRange( |
| 205 | const MergeTreePartInfo & range, const LogEntry & entry, String * out_description, |
| 206 | std::lock_guard<std::mutex> & state_lock) const; |
| 207 | |
| 208 | /// Marks the element of the queue as running. |
| 209 | class CurrentlyExecuting |
| 210 | { |
| 211 | private: |
| 212 | ReplicatedMergeTreeQueue::LogEntryPtr entry; |
| 213 | ReplicatedMergeTreeQueue & queue; |
| 214 | |
| 215 | friend class ReplicatedMergeTreeQueue; |
| 216 | |
| 217 | /// Created only in the selectEntryToProcess function. It is called under mutex. |
| 218 | CurrentlyExecuting(const ReplicatedMergeTreeQueue::LogEntryPtr & entry_, ReplicatedMergeTreeQueue & queue_); |
| 219 | |
| 220 | /// In case of fetch, we determine actual part during the execution, so we need to update entry. It is called under state_mutex. |
| 221 | static void setActualPartName(ReplicatedMergeTreeQueue::LogEntry & entry, const String & actual_part_name, |
| 222 | ReplicatedMergeTreeQueue & queue); |
| 223 | public: |
| 224 | ~CurrentlyExecuting(); |
| 225 | }; |
| 226 | |
| 227 | public: |
| 228 | explicit ReplicatedMergeTreeQueue(StorageReplicatedMergeTree & storage_); |
| 229 | |
| 230 | ~ReplicatedMergeTreeQueue(); |
| 231 | |
| 232 | /// Set ZooKeeper paths and logger name. `parts` presumably seeds the in-memory part sets (cf. addVirtualParts) - confirm in the .cpp. |
| 233 | void initialize(const String & zookeeper_path_, const String & replica_path_, const String & logger_name_, |
| 234 | const MergeTreeData::DataParts & parts); |
| 235 | |
| 236 | /** Inserts an action to the end of the queue. |
| 237 | * To restore broken parts during operation. |
| 238 | * Does not insert the action into ZooKeeper (the caller must do it). |
| 239 | */ |
| 240 | void insert(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry); |
| 241 | |
| 242 | /** Delete the action with the specified part (as new_part_name) from the queue. |
| 243 | * Called for unreachable actions in the queue - old lost parts. |
| 244 | */ |
| 245 | bool remove(zkutil::ZooKeeperPtr zookeeper, const String & part_name); |
| 246 | |
| 247 | /** Load (initialize) a queue from ZooKeeper (/replicas/me/queue/). |
| 248 | * If the queue is not empty, load() does not load duplicate records. |
| 249 | * Returns true if the queue was updated. |
| 250 | */ |
| 251 | bool load(zkutil::ZooKeeperPtr zookeeper); |
| 252 | /// Remove the part from virtual_parts. Presumably returns whether something was removed - confirm in the .cpp. |
| 253 | bool removeFromVirtualParts(const MergeTreePartInfo & part_info); |
| 254 | |
| 255 | /** Copy the new entries from the shared log to the queue of this replica. Set the log_pointer to the appropriate value. |
| 256 | * If watch_callback is not empty, will call it when new entries appear in the log. |
| 257 | * If there were new entries, notifies storage.queue_task_handle. |
| 258 | * Additionally loads mutations (so that the set of mutations is always more recent than the queue). |
| 259 | */ |
| 260 | void pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback = {}); |
| 261 | |
| 262 | /// Load new mutation entries. If something new is loaded, schedule storage.merge_selecting_task. |
| 263 | /// If watch_callback is not empty, will call it when new mutations appear in ZK. |
| 264 | void updateMutations(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback = {}); |
| 265 | |
| 266 | /// Remove a mutation from ZooKeeper and from the local set. Returns the removed entry or nullptr |
| 267 | /// if it could not be found. |
| 268 | ReplicatedMergeTreeMutationEntryPtr removeMutation(zkutil::ZooKeeperPtr zookeeper, const String & mutation_id); |
| 269 | |
| 270 | /** Remove the actions producing parts covered by part_info from the queue (from ZK and from the RAM). |
| 271 | * And also wait for the completion of their execution, if they are now being executed. |
| 272 | */ |
| 273 | void removePartProducingOpsInRange(zkutil::ZooKeeperPtr zookeeper, const MergeTreePartInfo & part_info, const ReplicatedMergeTreeLogEntryData & current); |
| 274 | |
| 275 | /** Throws an exception if there are currently executing entries in the range. |
| 276 | */ |
| 277 | void checkThereAreNoConflictsInRange(const MergeTreePartInfo & range, const LogEntry & entry); |
| 278 | |
| 279 | /** In the case where there are not enough parts to perform the merge in part_name |
| 280 | * - move actions with merged parts to the end of the queue |
| 281 | * (in order to download an already merged part from another replica). |
| 282 | */ |
| 283 | StringSet moveSiblingPartsForMergeToEndOfQueue(const String & part_name); |
| 284 | |
| 285 | /** Select the next action to process. |
| 286 | * merger_mutator is used only to check if the merges are not suspended. |
| 287 | */ |
| 288 | using SelectedEntry = std::pair<ReplicatedMergeTreeQueue::LogEntryPtr, std::unique_ptr<CurrentlyExecuting>>; |
| 289 | SelectedEntry selectEntryToProcess(MergeTreeDataMergerMutator & merger_mutator, MergeTreeData & data); |
| 290 | |
| 291 | /** Execute `func` function to handle the action. |
| 292 | * In this case, at runtime, mark the queue element as running |
| 293 | * (add into future_parts and more). |
| 294 | * If there was an exception during processing, it saves it in `entry`. |
| 295 | * Returns true if there were no exceptions during the processing. |
| 296 | */ |
| 297 | bool processEntry(std::function<zkutil::ZooKeeperPtr()> get_zookeeper, LogEntryPtr & entry, const std::function<bool(LogEntryPtr &)> func); |
| 298 | |
| 299 | /// Count the number of merges and mutations of single parts in the queue. |
| 300 | std::pair<size_t, size_t> countMergesAndPartMutations() const; |
| 301 | |
| 302 | /// Count the total number of active mutations. |
| 303 | size_t countMutations() const; |
| 304 | |
| 305 | /// Count the total number of active mutations that are finished (is_done = true). |
| 306 | size_t countFinishedMutations() const; |
| 307 | |
| 308 | /// Returns a functor that MergeTreeDataMergerMutator uses to select parts for merge. |
| 309 | ReplicatedMergeTreeMergePredicate getMergePredicate(zkutil::ZooKeeperPtr & zookeeper); |
| 310 | |
| 311 | /// Return the version (block number) of the last mutation that we don't need to apply to the part |
| 312 | /// with getDataVersion() == data_version. (Either this mutation was already applied or the part |
| 313 | /// was created after the mutation). |
| 314 | /// If there is no such mutation or it has already been executed and deleted, return 0. |
| 315 | Int64 getCurrentMutationVersion(const String & partition_id, Int64 data_version) const; |
| 316 | /// Presumably returns the commands needed to mutate `part` up to desired_mutation_version - confirm in the .cpp. |
| 317 | MutationCommands getMutationCommands(const MergeTreeData::DataPartPtr & part, Int64 desired_mutation_version) const; |
| 318 | |
| 319 | /// Mark finished mutations as done. If the function needs to be called again at some later time |
| 320 | /// (because some mutations are probably done but we are not sure yet), returns true. |
| 321 | bool tryFinalizeMutations(zkutil::ZooKeeperPtr zookeeper); |
| 322 | |
| 323 | /// Prohibit merges in the specified blocks range. |
| 324 | /// Add part to virtual_parts, which means that part must exist |
| 325 | /// after processing replication log up to log_pointer. |
| 326 | /// Part maybe fake (look at ReplicatedMergeTreeMergePredicate). |
| 327 | void disableMergesInBlockRange(const String & part_name); |
| 328 | |
| 329 | /// Checks that part is already in virtual parts. |
| 330 | bool isVirtualPart(const MergeTreeData::DataPartPtr & data_part) const; |
| 331 | |
| 332 | /// Check that part isn't in currently generating parts and isn't covered by them and add it to future_parts. |
| 333 | /// Locks queue's mutex. |
| 334 | bool addFuturePartIfNotCoveredByThem(const String & part_name, LogEntry & entry, String & reject_reason); |
| 335 | |
| 336 | /// A blocker that stops selecting entries from the queue. |
| 337 | ActionBlocker actions_blocker; |
| 338 | |
| 339 | /// Adds a subscriber |
| 340 | SubscriberHandler addSubscriber(SubscriberCallBack && callback); |
| 341 | |
| 342 | struct Status |
| 343 | { |
| 344 | UInt32 future_parts = 0; |
| 345 | UInt32 queue_size = 0; |
| 346 | UInt32 inserts_in_queue = 0; |
| 347 | UInt32 merges_in_queue = 0; |
| 348 | UInt32 part_mutations_in_queue = 0; |
| 349 | UInt32 queue_oldest_time = 0; |
| 350 | UInt32 inserts_oldest_time = 0; |
| 351 | UInt32 merges_oldest_time = 0; |
| 352 | UInt32 part_mutations_oldest_time = 0; |
| 353 | String oldest_part_to_get; |
| 354 | String oldest_part_to_merge_to; |
| 355 | String oldest_part_to_mutate_to; |
| 356 | UInt32 last_queue_update = 0; |
| 357 | }; |
| 358 | |
| 359 | /// Get information about the queue. |
| 360 | Status getStatus() const; |
| 361 | |
| 362 | /// Get the data of the queue elements. |
| 363 | using LogEntriesData = std::vector<ReplicatedMergeTreeLogEntryData>; |
| 364 | void getEntries(LogEntriesData & res) const; |
| 365 | |
| 366 | /// Get information about the insertion times. |
| 367 | void getInsertTimes(time_t & out_min_unprocessed_insert_time, time_t & out_max_processed_insert_time) const; |
| 368 | /// Get information about the mutations. |
| 369 | std::vector<MergeTreeMutationStatus> getMutationsStatus() const; |
| 370 | }; |
| 371 | /// Predicate built from a snapshot of a ReplicatedMergeTreeQueue and ZooKeeper state (see ReplicatedMergeTreeQueue::getMergePredicate()); decides which parts may be merged or mutated. |
| 372 | class ReplicatedMergeTreeMergePredicate |
| 373 | { |
| 374 | public: |
| 375 | ReplicatedMergeTreeMergePredicate(ReplicatedMergeTreeQueue & queue_, zkutil::ZooKeeperPtr & zookeeper); |
| 376 | |
| 377 | /// Can we assign a merge with these two parts? |
| 378 | /// (assuming that no merge was assigned after the predicate was constructed) |
| 379 | /// If we can't and out_reason is not nullptr, set it to the reason why we can't merge. |
| 380 | bool operator()( |
| 381 | const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right, |
| 382 | String * out_reason = nullptr) const; |
| 383 | |
| 384 | /// Return nonempty optional if the part can and should be mutated. |
| 385 | /// Returned mutation version number is always the biggest possible. |
| 386 | std::optional<Int64> getDesiredMutationVersion(const MergeTreeData::DataPartPtr & part) const; |
| 387 | /// Whether `mutation` can be considered finished according to this predicate's snapshot (exact criteria: see the .cpp). |
| 388 | bool isMutationFinished(const ReplicatedMergeTreeMutationEntry & mutation) const; |
| 389 | |
| 390 | private: |
| 391 | const ReplicatedMergeTreeQueue & queue; |
| 392 | |
| 393 | /// A snapshot of active parts that would appear if the replica executes all log entries in its queue. |
| 394 | ActiveDataPartSet prev_virtual_parts; |
| 395 | /// partition ID -> block numbers of the inserts and mutations that are about to commit |
| 396 | /// (loaded at some later time than prev_virtual_parts). |
| 397 | std::unordered_map<String, std::set<Int64>> committing_blocks; |
| 398 | |
| 399 | /// Quorum state taken at some later time than prev_virtual_parts. |
| 400 | std::set<String> last_quorum_parts; |
| 401 | String inprogress_quorum_part; |
| 402 | }; |
| 403 | |
| 404 | |
| 405 | /** Convert a number to a string in the format of the suffixes of auto-incremental (sequential) nodes in ZooKeeper. |
| 406 | * Negative numbers are also supported - for them the resulting node name looks odd |
| 407 | * and does not match any auto-incremented node in ZK. |
| 408 | */ |
| 409 | [[nodiscard]] String padIndex(Int64 index); |
| 410 | |
| 411 | } |
| 412 | |