| 1 | #pragma once | 
|---|
| 2 |  | 
|---|
| 3 | #include <optional> | 
|---|
| 4 |  | 
|---|
| 5 | #include <Common/ActionBlocker.h> | 
|---|
| 6 | #include <Storages/MergeTree/ReplicatedMergeTreeLogEntry.h> | 
|---|
| 7 | #include <Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h> | 
|---|
| 8 | #include <Storages/MergeTree/ActiveDataPartSet.h> | 
|---|
| 9 | #include <Storages/MergeTree/MergeTreeData.h> | 
|---|
| 10 | #include <Storages/MergeTree/MergeTreeMutationStatus.h> | 
|---|
| 11 | #include <Storages/MergeTree/ReplicatedMergeTreeQuorumAddedParts.h> | 
|---|
| 12 |  | 
|---|
| 13 | #include <Common/ZooKeeper/ZooKeeper.h> | 
|---|
| 14 | #include <Core/BackgroundSchedulePool.h> | 
|---|
| 15 |  | 
|---|
| 16 |  | 
|---|
| 17 | namespace DB | 
|---|
| 18 | { | 
|---|
| 19 |  | 
|---|
/// Types that this header only refers to by reference (storage, merger_mutator parameters),
/// so a declaration is sufficient here.
class MergeTreeDataMergerMutator;
class StorageReplicatedMergeTree;

/// Defined at the end of this header; declared early because ReplicatedMergeTreeQueue
/// befriends it and returns it from getMergePredicate().
class ReplicatedMergeTreeMergePredicate;
| 24 |  | 
|---|
| 25 |  | 
|---|
| 26 | class ReplicatedMergeTreeQueue | 
|---|
| 27 | { | 
|---|
| 28 | private: | 
|---|
| 29 | friend class CurrentlyExecuting; | 
|---|
| 30 | friend class ReplicatedMergeTreeMergePredicate; | 
|---|
| 31 |  | 
|---|
| 32 | using LogEntry = ReplicatedMergeTreeLogEntry; | 
|---|
| 33 | using LogEntryPtr = LogEntry::Ptr; | 
|---|
| 34 |  | 
|---|
| 35 | using Queue = std::list<LogEntryPtr>; | 
|---|
| 36 |  | 
|---|
| 37 | using StringSet = std::set<String>; | 
|---|
| 38 |  | 
|---|
| 39 | struct ByTime | 
|---|
| 40 | { | 
|---|
| 41 | bool operator()(const LogEntryPtr & lhs, const LogEntryPtr & rhs) const | 
|---|
| 42 | { | 
|---|
| 43 | return std::forward_as_tuple(lhs->create_time, lhs.get()) | 
|---|
| 44 | < std::forward_as_tuple(rhs->create_time, rhs.get()); | 
|---|
| 45 | } | 
|---|
| 46 | }; | 
|---|
| 47 |  | 
|---|
| 48 | /// To calculate min_unprocessed_insert_time, max_processed_insert_time, for which the replica lag is calculated. | 
|---|
| 49 | using InsertsByTime = std::set<LogEntryPtr, ByTime>; | 
|---|
| 50 |  | 
|---|
| 51 |  | 
|---|
| 52 | StorageReplicatedMergeTree & storage; | 
|---|
| 53 | MergeTreeDataFormatVersion format_version; | 
|---|
| 54 |  | 
|---|
| 55 | String zookeeper_path; | 
|---|
| 56 | String replica_path; | 
|---|
| 57 | String logger_name; | 
|---|
| 58 | Logger * log = nullptr; | 
|---|
| 59 |  | 
|---|
| 60 | /// Protects the queue, future_parts and other queue state variables. | 
|---|
| 61 | mutable std::mutex state_mutex; | 
|---|
| 62 |  | 
|---|
| 63 | /// A set of parts that should be on this replica according to the queue entries that have been done | 
|---|
| 64 | /// up to this point. The invariant holds: `virtual_parts` = `current_parts` + `queue`. | 
|---|
| 65 | /// Note: it can be different from the actual set of parts because the replica can decide to fetch | 
|---|
| 66 | /// a bigger part instead of the part mentioned in the log entry. | 
|---|
| 67 | ActiveDataPartSet current_parts; | 
|---|
| 68 |  | 
|---|
| 69 | /** The queue of what you need to do on this line to catch up. It is taken from ZooKeeper (/replicas/me/queue/). | 
|---|
| 70 | * In ZK records in chronological order. Here it is not necessary. | 
|---|
| 71 | */ | 
|---|
| 72 | Queue queue; | 
|---|
| 73 |  | 
|---|
| 74 | InsertsByTime inserts_by_time; | 
|---|
| 75 | time_t min_unprocessed_insert_time = 0; | 
|---|
| 76 | time_t max_processed_insert_time = 0; | 
|---|
| 77 |  | 
|---|
| 78 | time_t last_queue_update = 0; | 
|---|
| 79 |  | 
|---|
| 80 | /// parts that will appear as a result of actions performed right now by background threads (these actions are not in the queue). | 
|---|
| 81 | /// Used to block other actions on parts in the range covered by future_parts. | 
|---|
| 82 | using FuturePartsSet = std::map<String, LogEntryPtr>; | 
|---|
| 83 | FuturePartsSet future_parts; | 
|---|
| 84 |  | 
|---|
| 85 | /// Index of the first log entry that we didn't see yet. | 
|---|
| 86 | Int64 log_pointer = 0; | 
|---|
| 87 |  | 
|---|
| 88 | /** What will be the set of active parts after executing all log entries up to log_pointer. | 
|---|
| 89 | * Used to determine which merges can be assigned (see ReplicatedMergeTreeMergePredicate) | 
|---|
| 90 | */ | 
|---|
| 91 | ActiveDataPartSet virtual_parts; | 
|---|
| 92 |  | 
|---|
| 93 | /// A set of mutations loaded from ZooKeeper. | 
|---|
| 94 | /// mutations_by_partition is an index partition ID -> block ID -> mutation into this set. | 
|---|
| 95 | /// Note that mutations are updated in such a way that they are always more recent than | 
|---|
| 96 | /// log_pointer (see pullLogsToQueue()). | 
|---|
| 97 |  | 
|---|
| 98 | struct MutationStatus | 
|---|
| 99 | { | 
|---|
| 100 | MutationStatus(const ReplicatedMergeTreeMutationEntryPtr & entry_) | 
|---|
| 101 | : entry(entry_) | 
|---|
| 102 | { | 
|---|
| 103 | } | 
|---|
| 104 |  | 
|---|
| 105 | ReplicatedMergeTreeMutationEntryPtr entry; | 
|---|
| 106 |  | 
|---|
| 107 | /// A number of parts that should be mutated/merged or otherwise moved to Obsolete state for this mutation to complete. | 
|---|
| 108 | Int64 parts_to_do = 0; | 
|---|
| 109 |  | 
|---|
| 110 | /// Note that is_done is not equivalent to parts_to_do == 0 | 
|---|
| 111 | /// (even if parts_to_do == 0 some relevant parts can still commit in the future). | 
|---|
| 112 | bool is_done = false; | 
|---|
| 113 |  | 
|---|
| 114 | String latest_failed_part; | 
|---|
| 115 | MergeTreePartInfo latest_failed_part_info; | 
|---|
| 116 | time_t latest_fail_time = 0; | 
|---|
| 117 | String latest_fail_reason; | 
|---|
| 118 | }; | 
|---|
| 119 |  | 
|---|
| 120 | std::map<String, MutationStatus> mutations_by_znode; | 
|---|
| 121 | std::unordered_map<String, std::map<Int64, MutationStatus *>> mutations_by_partition; | 
|---|
| 122 | /// Znode ID of the latest mutation that is done. | 
|---|
| 123 | String mutation_pointer; | 
|---|
| 124 |  | 
|---|
| 125 |  | 
|---|
| 126 | /// Provides only one simultaneous call to pullLogsToQueue. | 
|---|
| 127 | std::mutex pull_logs_to_queue_mutex; | 
|---|
| 128 |  | 
|---|
| 129 |  | 
|---|
| 130 | /// List of subscribers | 
|---|
| 131 | /// A subscriber callback is called when an entry queue is deleted | 
|---|
| 132 | mutable std::mutex subscribers_mutex; | 
|---|
| 133 |  | 
|---|
| 134 | using SubscriberCallBack = std::function<void(size_t /* queue_size */)>; | 
|---|
| 135 | using Subscribers = std::list<SubscriberCallBack>; | 
|---|
| 136 | using SubscriberIterator = Subscribers::iterator; | 
|---|
| 137 |  | 
|---|
| 138 | friend class SubscriberHandler; | 
|---|
| 139 | struct SubscriberHandler : public boost::noncopyable | 
|---|
| 140 | { | 
|---|
| 141 | SubscriberHandler(SubscriberIterator it_, ReplicatedMergeTreeQueue & queue_) : it(it_), queue(queue_) {} | 
|---|
| 142 | ~SubscriberHandler(); | 
|---|
| 143 |  | 
|---|
| 144 | private: | 
|---|
| 145 | SubscriberIterator it; | 
|---|
| 146 | ReplicatedMergeTreeQueue & queue; | 
|---|
| 147 | }; | 
|---|
| 148 |  | 
|---|
| 149 | Subscribers subscribers; | 
|---|
| 150 |  | 
|---|
| 151 | /// Notify subscribers about queue change | 
|---|
| 152 | void notifySubscribers(size_t new_queue_size); | 
|---|
| 153 |  | 
|---|
| 154 | /// Check that entry_ptr is REPLACE_RANGE entry and can be removed from queue because current entry covers it | 
|---|
| 155 | bool checkReplaceRangeCanBeRemoved(const MergeTreePartInfo & part_info, const LogEntryPtr entry_ptr, const ReplicatedMergeTreeLogEntryData & current) const; | 
|---|
| 156 |  | 
|---|
| 157 | /// Ensures that only one thread is simultaneously updating mutations. | 
|---|
| 158 | std::mutex update_mutations_mutex; | 
|---|
| 159 |  | 
|---|
| 160 | /// Put a set of (already existing) parts in virtual_parts. | 
|---|
| 161 | void addVirtualParts(const MergeTreeData::DataParts & parts); | 
|---|
| 162 |  | 
|---|
| 163 | void insertUnlocked( | 
|---|
| 164 | const LogEntryPtr & entry, std::optional<time_t> & min_unprocessed_insert_time_changed, | 
|---|
| 165 | std::lock_guard<std::mutex> & state_lock); | 
|---|
| 166 |  | 
|---|
| 167 | void removeProcessedEntry(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry); | 
|---|
| 168 |  | 
|---|
| 169 | /** Can I now try this action. If not, you need to leave it in the queue and try another one. | 
|---|
| 170 | * Called under the state_mutex. | 
|---|
| 171 | */ | 
|---|
| 172 | bool shouldExecuteLogEntry( | 
|---|
| 173 | const LogEntry & entry, String & out_postpone_reason, | 
|---|
| 174 | MergeTreeDataMergerMutator & merger_mutator, MergeTreeData & data, | 
|---|
| 175 | std::lock_guard<std::mutex> & state_lock) const; | 
|---|
| 176 |  | 
|---|
| 177 | Int64 getCurrentMutationVersionImpl(const String & partition_id, Int64 data_version, std::lock_guard<std::mutex> & /* state_lock */) const; | 
|---|
| 178 |  | 
|---|
| 179 | /** Check that part isn't in currently generating parts and isn't covered by them. | 
|---|
| 180 | * Should be called under state_mutex. | 
|---|
| 181 | */ | 
|---|
| 182 | bool isNotCoveredByFuturePartsImpl( | 
|---|
| 183 | const String & new_part_name, String & out_reason, | 
|---|
| 184 | std::lock_guard<std::mutex> & state_lock) const; | 
|---|
| 185 |  | 
|---|
| 186 | /// After removing the queue element, update the insertion times in the RAM. Running under state_mutex. | 
|---|
| 187 | /// Returns information about what times have changed - this information can be passed to updateTimesInZooKeeper. | 
|---|
| 188 | void updateStateOnQueueEntryRemoval(const LogEntryPtr & entry, | 
|---|
| 189 | bool is_successful, | 
|---|
| 190 | std::optional<time_t> & min_unprocessed_insert_time_changed, | 
|---|
| 191 | std::optional<time_t> & max_processed_insert_time_changed, | 
|---|
| 192 | std::unique_lock<std::mutex> & state_lock); | 
|---|
| 193 |  | 
|---|
| 194 | /// If the new part appears (add == true) or becomes obsolete (add == false), update parts_to_do of all affected mutations. | 
|---|
| 195 | /// Notifies storage.mutations_finalizing_task if some mutations are probably finished. | 
|---|
| 196 | void updateMutationsPartsToDo(const String & part_name, bool add); | 
|---|
| 197 |  | 
|---|
| 198 | /// Update the insertion times in ZooKeeper. | 
|---|
| 199 | void updateTimesInZooKeeper(zkutil::ZooKeeperPtr zookeeper, | 
|---|
| 200 | std::optional<time_t> min_unprocessed_insert_time_changed, | 
|---|
| 201 | std::optional<time_t> max_processed_insert_time_changed) const; | 
|---|
| 202 |  | 
|---|
| 203 | /// Returns list of currently executing parts blocking execution a command modifying specified range | 
|---|
| 204 | size_t getConflictsCountForRange( | 
|---|
| 205 | const MergeTreePartInfo & range, const LogEntry & entry, String * out_description, | 
|---|
| 206 | std::lock_guard<std::mutex> & state_lock) const; | 
|---|
| 207 |  | 
|---|
| 208 | /// Marks the element of the queue as running. | 
|---|
| 209 | class CurrentlyExecuting | 
|---|
| 210 | { | 
|---|
| 211 | private: | 
|---|
| 212 | ReplicatedMergeTreeQueue::LogEntryPtr entry; | 
|---|
| 213 | ReplicatedMergeTreeQueue & queue; | 
|---|
| 214 |  | 
|---|
| 215 | friend class ReplicatedMergeTreeQueue; | 
|---|
| 216 |  | 
|---|
| 217 | /// Created only in the selectEntryToProcess function. It is called under mutex. | 
|---|
| 218 | CurrentlyExecuting(const ReplicatedMergeTreeQueue::LogEntryPtr & entry_, ReplicatedMergeTreeQueue & queue_); | 
|---|
| 219 |  | 
|---|
| 220 | /// In case of fetch, we determine actual part during the execution, so we need to update entry. It is called under state_mutex. | 
|---|
| 221 | static void setActualPartName(ReplicatedMergeTreeQueue::LogEntry & entry, const String & actual_part_name, | 
|---|
| 222 | ReplicatedMergeTreeQueue & queue); | 
|---|
| 223 | public: | 
|---|
| 224 | ~CurrentlyExecuting(); | 
|---|
| 225 | }; | 
|---|
| 226 |  | 
|---|
| 227 | public: | 
|---|
| 228 | ReplicatedMergeTreeQueue(StorageReplicatedMergeTree & storage_); | 
|---|
| 229 |  | 
|---|
| 230 | ~ReplicatedMergeTreeQueue(); | 
|---|
| 231 |  | 
|---|
| 232 |  | 
|---|
| 233 | void initialize(const String & zookeeper_path_, const String & replica_path_, const String & logger_name_, | 
|---|
| 234 | const MergeTreeData::DataParts & parts); | 
|---|
| 235 |  | 
|---|
| 236 | /** Inserts an action to the end of the queue. | 
|---|
| 237 | * To restore broken parts during operation. | 
|---|
| 238 | * Do not insert the action itself into ZK (do it yourself). | 
|---|
| 239 | */ | 
|---|
| 240 | void insert(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry); | 
|---|
| 241 |  | 
|---|
| 242 | /** Delete the action with the specified part (as new_part_name) from the queue. | 
|---|
| 243 | * Called for unreachable actions in the queue - old lost parts. | 
|---|
| 244 | */ | 
|---|
| 245 | bool remove(zkutil::ZooKeeperPtr zookeeper, const String & part_name); | 
|---|
| 246 |  | 
|---|
| 247 | /** Load (initialize) a queue from ZooKeeper (/replicas/me/queue/). | 
|---|
| 248 | * If queue was not empty load() would not load duplicate records. | 
|---|
| 249 | * return true, if we update queue. | 
|---|
| 250 | */ | 
|---|
| 251 | bool load(zkutil::ZooKeeperPtr zookeeper); | 
|---|
| 252 |  | 
|---|
| 253 | bool removeFromVirtualParts(const MergeTreePartInfo & part_info); | 
|---|
| 254 |  | 
|---|
| 255 | /** Copy the new entries from the shared log to the queue of this replica. Set the log_pointer to the appropriate value. | 
|---|
| 256 | * If watch_callback is not empty, will call it when new entries appear in the log. | 
|---|
| 257 | * If there were new entries, notifies storage.queue_task_handle. | 
|---|
| 258 | * Additionally loads mutations (so that the set of mutations is always more recent than the queue). | 
|---|
| 259 | */ | 
|---|
| 260 | void pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback = {}); | 
|---|
| 261 |  | 
|---|
| 262 | /// Load new mutation entries. If something new is loaded, schedule storage.merge_selecting_task. | 
|---|
| 263 | /// If watch_callback is not empty, will call it when new mutations appear in ZK. | 
|---|
| 264 | void updateMutations(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback = {}); | 
|---|
| 265 |  | 
|---|
| 266 | /// Remove a mutation from ZooKeeper and from the local set. Returns the removed entry or nullptr | 
|---|
| 267 | /// if it could not be found. | 
|---|
| 268 | ReplicatedMergeTreeMutationEntryPtr removeMutation(zkutil::ZooKeeperPtr zookeeper, const String & mutation_id); | 
|---|
| 269 |  | 
|---|
| 270 | /** Remove the action from the queue with the parts covered by part_name (from ZK and from the RAM). | 
|---|
| 271 | * And also wait for the completion of their execution, if they are now being executed. | 
|---|
| 272 | */ | 
|---|
| 273 | void removePartProducingOpsInRange(zkutil::ZooKeeperPtr zookeeper, const MergeTreePartInfo & part_info, const ReplicatedMergeTreeLogEntryData & current); | 
|---|
| 274 |  | 
|---|
| 275 | /** Throws and exception if there are currently executing entries in the range . | 
|---|
| 276 | */ | 
|---|
| 277 | void checkThereAreNoConflictsInRange(const MergeTreePartInfo & range, const LogEntry & entry); | 
|---|
| 278 |  | 
|---|
| 279 | /** In the case where there are not enough parts to perform the merge in part_name | 
|---|
| 280 | * - move actions with merged parts to the end of the queue | 
|---|
| 281 | * (in order to download a already merged part from another replica). | 
|---|
| 282 | */ | 
|---|
| 283 | StringSet moveSiblingPartsForMergeToEndOfQueue(const String & part_name); | 
|---|
| 284 |  | 
|---|
| 285 | /** Select the next action to process. | 
|---|
| 286 | * merger_mutator is used only to check if the merges are not suspended. | 
|---|
| 287 | */ | 
|---|
| 288 | using SelectedEntry = std::pair<ReplicatedMergeTreeQueue::LogEntryPtr, std::unique_ptr<CurrentlyExecuting>>; | 
|---|
| 289 | SelectedEntry selectEntryToProcess(MergeTreeDataMergerMutator & merger_mutator, MergeTreeData & data); | 
|---|
| 290 |  | 
|---|
| 291 | /** Execute `func` function to handle the action. | 
|---|
| 292 | * In this case, at runtime, mark the queue element as running | 
|---|
| 293 | *  (add into future_parts and more). | 
|---|
| 294 | * If there was an exception during processing, it saves it in `entry`. | 
|---|
| 295 | * Returns true if there were no exceptions during the processing. | 
|---|
| 296 | */ | 
|---|
| 297 | bool processEntry(std::function<zkutil::ZooKeeperPtr()> get_zookeeper, LogEntryPtr & entry, const std::function<bool(LogEntryPtr &)> func); | 
|---|
| 298 |  | 
|---|
| 299 | /// Count the number of merges and mutations of single parts in the queue. | 
|---|
| 300 | std::pair<size_t, size_t> countMergesAndPartMutations() const; | 
|---|
| 301 |  | 
|---|
| 302 | /// Count the total number of active mutations. | 
|---|
| 303 | size_t countMutations() const; | 
|---|
| 304 |  | 
|---|
| 305 | /// Count the total number of active mutations that are finished (is_done = true). | 
|---|
| 306 | size_t countFinishedMutations() const; | 
|---|
| 307 |  | 
|---|
| 308 | /// Returns functor which used by MergeTreeMergerMutator to select parts for merge | 
|---|
| 309 | ReplicatedMergeTreeMergePredicate getMergePredicate(zkutil::ZooKeeperPtr & zookeeper); | 
|---|
| 310 |  | 
|---|
| 311 | /// Return the version (block number) of the last mutation that we don't need to apply to the part | 
|---|
| 312 | /// with getDataVersion() == data_version. (Either this mutation was already applied or the part | 
|---|
| 313 | /// was created after the mutation). | 
|---|
| 314 | /// If there is no such mutation or it has already been executed and deleted, return 0. | 
|---|
| 315 | Int64 getCurrentMutationVersion(const String & partition_id, Int64 data_version) const; | 
|---|
| 316 |  | 
|---|
| 317 | MutationCommands getMutationCommands(const MergeTreeData::DataPartPtr & part, Int64 desired_mutation_version) const; | 
|---|
| 318 |  | 
|---|
| 319 | /// Mark finished mutations as done. If the function needs to be called again at some later time | 
|---|
| 320 | /// (because some mutations are probably done but we are not sure yet), returns true. | 
|---|
| 321 | bool tryFinalizeMutations(zkutil::ZooKeeperPtr zookeeper); | 
|---|
| 322 |  | 
|---|
| 323 | /// Prohibit merges in the specified blocks range. | 
|---|
| 324 | /// Add part to virtual_parts, which means that part must exist | 
|---|
| 325 | /// after processing replication log up to log_pointer. | 
|---|
| 326 | /// Part maybe fake (look at ReplicatedMergeTreeMergePredicate). | 
|---|
| 327 | void disableMergesInBlockRange(const String & part_name); | 
|---|
| 328 |  | 
|---|
| 329 | /// Cheks that part is already in virtual parts | 
|---|
| 330 | bool isVirtualPart(const MergeTreeData::DataPartPtr & data_part) const; | 
|---|
| 331 |  | 
|---|
| 332 | /// Check that part isn't in currently generating parts and isn't covered by them and add it to future_parts. | 
|---|
| 333 | /// Locks queue's mutex. | 
|---|
| 334 | bool addFuturePartIfNotCoveredByThem(const String & part_name, LogEntry & entry, String & reject_reason); | 
|---|
| 335 |  | 
|---|
| 336 | /// A blocker that stops selects from the queue | 
|---|
| 337 | ActionBlocker actions_blocker; | 
|---|
| 338 |  | 
|---|
| 339 | /// Adds a subscriber | 
|---|
| 340 | SubscriberHandler addSubscriber(SubscriberCallBack && callback); | 
|---|
| 341 |  | 
|---|
| 342 | struct Status | 
|---|
| 343 | { | 
|---|
| 344 | UInt32 future_parts; | 
|---|
| 345 | UInt32 queue_size; | 
|---|
| 346 | UInt32 inserts_in_queue; | 
|---|
| 347 | UInt32 merges_in_queue; | 
|---|
| 348 | UInt32 part_mutations_in_queue; | 
|---|
| 349 | UInt32 queue_oldest_time; | 
|---|
| 350 | UInt32 inserts_oldest_time; | 
|---|
| 351 | UInt32 merges_oldest_time; | 
|---|
| 352 | UInt32 part_mutations_oldest_time; | 
|---|
| 353 | String oldest_part_to_get; | 
|---|
| 354 | String oldest_part_to_merge_to; | 
|---|
| 355 | String oldest_part_to_mutate_to; | 
|---|
| 356 | UInt32 last_queue_update; | 
|---|
| 357 | }; | 
|---|
| 358 |  | 
|---|
| 359 | /// Get information about the queue. | 
|---|
| 360 | Status getStatus() const; | 
|---|
| 361 |  | 
|---|
| 362 | /// Get the data of the queue elements. | 
|---|
| 363 | using LogEntriesData = std::vector<ReplicatedMergeTreeLogEntryData>; | 
|---|
| 364 | void getEntries(LogEntriesData & res) const; | 
|---|
| 365 |  | 
|---|
| 366 | /// Get information about the insertion times. | 
|---|
| 367 | void getInsertTimes(time_t & out_min_unprocessed_insert_time, time_t & out_max_processed_insert_time) const; | 
|---|
| 368 |  | 
|---|
| 369 | std::vector<MergeTreeMutationStatus> getMutationsStatus() const; | 
|---|
| 370 | }; | 
|---|
| 371 |  | 
|---|
| 372 | class ReplicatedMergeTreeMergePredicate | 
|---|
| 373 | { | 
|---|
| 374 | public: | 
|---|
| 375 | ReplicatedMergeTreeMergePredicate(ReplicatedMergeTreeQueue & queue_, zkutil::ZooKeeperPtr & zookeeper); | 
|---|
| 376 |  | 
|---|
| 377 | /// Can we assign a merge with these two parts? | 
|---|
| 378 | /// (assuming that no merge was assigned after the predicate was constructed) | 
|---|
| 379 | /// If we can't and out_reason is not nullptr, set it to the reason why we can't merge. | 
|---|
| 380 | bool operator()( | 
|---|
| 381 | const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right, | 
|---|
| 382 | String * out_reason = nullptr) const; | 
|---|
| 383 |  | 
|---|
| 384 | /// Return nonempty optional if the part can and should be mutated. | 
|---|
| 385 | /// Returned mutation version number is always the biggest possible. | 
|---|
| 386 | std::optional<Int64> getDesiredMutationVersion(const MergeTreeData::DataPartPtr & part) const; | 
|---|
| 387 |  | 
|---|
| 388 | bool isMutationFinished(const ReplicatedMergeTreeMutationEntry & mutation) const; | 
|---|
| 389 |  | 
|---|
| 390 | private: | 
|---|
| 391 | const ReplicatedMergeTreeQueue & queue; | 
|---|
| 392 |  | 
|---|
| 393 | /// A snapshot of active parts that would appear if the replica executes all log entries in its queue. | 
|---|
| 394 | ActiveDataPartSet prev_virtual_parts; | 
|---|
| 395 | /// partition ID -> block numbers of the inserts and mutations that are about to commit | 
|---|
| 396 | /// (loaded at some later time than prev_virtual_parts). | 
|---|
| 397 | std::unordered_map<String, std::set<Int64>> committing_blocks; | 
|---|
| 398 |  | 
|---|
| 399 | /// Quorum state taken at some later time than prev_virtual_parts. | 
|---|
| 400 | std::set<std::string> last_quorum_parts; | 
|---|
| 401 | String inprogress_quorum_part; | 
|---|
| 402 | }; | 
|---|
| 403 |  | 
|---|
| 404 |  | 
|---|
| 405 | /** Convert a number to a string in the format of the suffixes of auto-incremental nodes in ZooKeeper. | 
|---|
| 406 | * Negative numbers are also supported - for them the name of the node looks somewhat silly | 
|---|
| 407 | *  and does not match any auto-incremented node in ZK. | 
|---|
| 408 | */ | 
|---|
| 409 | String padIndex(Int64 index); | 
|---|
| 410 |  | 
|---|
| 411 | } | 
|---|
| 412 |  | 
|---|