| 1 | #pragma once |
| 2 | |
| 3 | #include <ext/shared_ptr_helper.h> |
| 4 | #include <atomic> |
| 5 | #include <pcg_random.hpp> |
| 6 | #include <Storages/IStorage.h> |
| 7 | #include <Storages/MergeTree/MergeTreeData.h> |
| 8 | #include <Storages/MergeTree/MergeTreeDataMergerMutator.h> |
| 9 | #include <Storages/MergeTree/MergeTreePartsMover.h> |
| 10 | #include <Storages/MergeTree/MergeTreeDataWriter.h> |
| 11 | #include <Storages/MergeTree/MergeTreeDataSelectExecutor.h> |
| 12 | #include <Storages/MergeTree/ReplicatedMergeTreeLogEntry.h> |
| 13 | #include <Storages/MergeTree/ReplicatedMergeTreeQueue.h> |
| 14 | #include <Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h> |
| 15 | #include <Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h> |
| 16 | #include <Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h> |
| 17 | #include <Storages/MergeTree/ReplicatedMergeTreeAlterThread.h> |
| 18 | #include <Storages/MergeTree/ReplicatedMergeTreeTableMetadata.h> |
| 19 | #include <Storages/MergeTree/EphemeralLockInZooKeeper.h> |
| 20 | #include <Storages/MergeTree/BackgroundProcessingPool.h> |
| 21 | #include <Storages/MergeTree/DataPartsExchange.h> |
| 22 | #include <Storages/MergeTree/ReplicatedMergeTreeAddress.h> |
| 23 | #include <DataTypes/DataTypesNumber.h> |
| 24 | #include <Interpreters/Cluster.h> |
| 25 | #include <Interpreters/PartLog.h> |
| 26 | #include <Common/randomSeed.h> |
| 27 | #include <Common/ZooKeeper/ZooKeeper.h> |
| 28 | #include <Common/ZooKeeper/LeaderElection.h> |
| 29 | #include <Core/BackgroundSchedulePool.h> |
| 30 | #include <Processors/Pipe.h> |
| 31 | |
| 32 | |
| 33 | namespace DB |
| 34 | { |
| 35 | |
| 36 | /** The engine that uses the merge tree (see MergeTreeData) and replicated through ZooKeeper. |
| 37 | * |
| 38 | * ZooKeeper is used for the following things: |
| 39 | * - the structure of the table (/metadata, /columns); |
| 40 | * - action log with data (/log/log-...,/replicas/replica_name/queue/queue-...); |
| 41 | * - a replica list (/replicas), and replica activity tag (/replicas/replica_name/is_active), replica addresses (/replicas/replica_name/host); |
| 42 | * - election of the leader replica (/leader_election) - the leader is the replica that assigns merges; |
| 43 | * - a set of parts of data on each replica (/replicas/replica_name/parts); |
| 44 | * - list of the last N blocks of data with checksum, for deduplication (/blocks); |
| 45 | * - the list of incremental block numbers (/block_numbers) that we are about to insert, |
| 46 | * to ensure the linear order of data insertion and data merge only on the intervals in this sequence; |
| 47 | * - coordinates writes with quorum (/quorum). |
| 48 | * - Storage of mutation entries (ALTER DELETE, ALTER UPDATE etc.) to execute (/mutations). |
| 49 | * See comments in StorageReplicatedMergeTree::mutate() for details. |
| 50 | */ |
| 51 | |
| 52 | /** The replicated tables have a common log (/log/log-...). |
| 53 | * Log - a sequence of entries (LogEntry) about what to do. |
| 54 | * Each entry is one of: |
| 55 | * - normal data insertion (GET), |
| 56 | * - merge (MERGE), |
| 57 | * - delete the partition (DROP). |
| 58 | * |
| 59 | * Each replica copies (queueUpdatingTask, pullLogsToQueue) entries from the log to its queue (/replicas/replica_name/queue/queue-...) |
| 60 | * and then executes them (queueTask). |
| 61 | * Despite the name of the "queue", execution can be reordered, if necessary (shouldExecuteLogEntry, executeLogEntry). |
| 62 | * In addition, the records in the queue can be generated independently (not from the log), in the following cases: |
| 63 | * - when creating a new replica, GET actions to fetch parts from other replicas are enqueued (createReplica); |
| 64 | * - if the part is corrupt (removePartAndEnqueueFetch) or absent during the check (at start - checkParts, while running - searchForMissingPart), |
| 65 | * GET actions to fetch it from other replicas are enqueued. |
| 66 | * |
| 67 | * The replica on which the INSERT was made will also have a GET entry for this data in its queue. |
| 68 | * Such an entry is considered to be executed as soon as the queue handler sees it. |
| 69 | * |
| 70 | * The log entry has a creation time. This time is generated by the clock of the server that created the entry |
| 71 | * - the one on which the corresponding INSERT or ALTER query arrived. |
| 72 | * |
| 73 | * For the entries in the queue that the replica made for itself, |
| 74 | * the creation time of the corresponding part on any of the replicas is used as the time. |
| 75 | */ |
| 76 | |
| 77 | class StorageReplicatedMergeTree : public ext::shared_ptr_helper<StorageReplicatedMergeTree>, public MergeTreeData |
| 78 | { |
| 79 | friend struct ext::shared_ptr_helper<StorageReplicatedMergeTree>; |
| 80 | public: |
| 81 | void startup() override; |
| 82 | void shutdown() override; |
| 83 | ~StorageReplicatedMergeTree() override; |
| 84 | |
| 85 | std::string getName() const override { return "Replicated" + merging_params.getModeName() + "MergeTree" ; } |
| 86 | std::string getTableName() const override { return table_name; } |
| 87 | std::string getDatabaseName() const override { return database_name; } |
| 88 | |
| 89 | bool supportsReplication() const override { return true; } |
| 90 | bool supportsDeduplication() const override { return true; } |
| 91 | |
| 92 | Pipes readWithProcessors( |
| 93 | const Names & column_names, |
| 94 | const SelectQueryInfo & query_info, |
| 95 | const Context & context, |
| 96 | QueryProcessingStage::Enum processed_stage, |
| 97 | size_t max_block_size, |
| 98 | unsigned num_streams) override; |
| 99 | |
| 100 | bool supportProcessorsPipeline() const override { return true; } |
| 101 | |
| 102 | std::optional<UInt64> totalRows() const override; |
| 103 | |
| 104 | BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override; |
| 105 | |
| 106 | bool optimize(const ASTPtr & query, const ASTPtr & partition, bool final, bool deduplicate, const Context & query_context) override; |
| 107 | |
| 108 | void alter(const AlterCommands & params, const Context & query_context, TableStructureWriteLockHolder & table_lock_holder) override; |
| 109 | |
| 110 | void alterPartition(const ASTPtr & query, const PartitionCommands & commands, const Context & query_context) override; |
| 111 | |
| 112 | void mutate(const MutationCommands & commands, const Context & context) override; |
| 113 | std::vector<MergeTreeMutationStatus> getMutationsStatus() const override; |
| 114 | CancellationCode killMutation(const String & mutation_id) override; |
| 115 | |
| 116 | /** Removes a replica from ZooKeeper. If there are no other replicas, it deletes the entire table from ZooKeeper. |
| 117 | */ |
| 118 | void drop(TableStructureWriteLockHolder &) override; |
| 119 | |
| 120 | void truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) override; |
| 121 | |
| 122 | void rename(const String & new_path_to_table_data, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override; |
| 123 | |
| 124 | bool supportsIndexForIn() const override { return true; } |
| 125 | |
| 126 | void checkTableCanBeDropped() const override; |
| 127 | |
| 128 | void checkPartitionCanBeDropped(const ASTPtr & partition) override; |
| 129 | |
| 130 | ActionLock getActionLock(StorageActionBlockType action_type) override; |
| 131 | |
| 132 | /// Wait until the replication queue size becomes less than or equal to queue_size. |
| 133 | /// Returns false if the timeout is exceeded. |
| 134 | bool waitForShrinkingQueueSize(size_t queue_size = 0, UInt64 max_wait_milliseconds = 0); |
| 135 | |
| 136 | /** Status of the replica; for the system table replicas. */ |
| 137 | struct Status |
| 138 | { |
| 139 | bool is_leader; |
| 140 | bool can_become_leader; |
| 141 | bool is_readonly; |
| 142 | bool is_session_expired; |
| 143 | ReplicatedMergeTreeQueue::Status queue; |
| 144 | UInt32 parts_to_check; |
| 145 | String zookeeper_path; |
| 146 | String replica_name; |
| 147 | String replica_path; |
| 148 | Int32 columns_version; |
| 149 | UInt64 log_max_index; |
| 150 | UInt64 log_pointer; |
| 151 | UInt64 absolute_delay; |
| 152 | UInt8 total_replicas; |
| 153 | UInt8 active_replicas; |
| 154 | }; |
| 155 | |
| 156 | /// Get the status of the table. If with_zk_fields = false - do not fill in the fields that require queries to ZK. |
| 157 | void getStatus(Status & res, bool with_zk_fields = true); |
| 158 | |
| 159 | using LogEntriesData = std::vector<ReplicatedMergeTreeLogEntryData>; |
| 160 | void getQueue(LogEntriesData & res, String & replica_name); |
| 161 | |
| 162 | /// Get replica delay relative to current time. |
| 163 | time_t getAbsoluteDelay() const; |
| 164 | |
| 165 | /// If the absolute delay is greater than min_relative_delay_to_yield_leadership, |
| 166 | /// will also calculate the difference from the unprocessed time of the best replica. |
| 167 | /// NOTE: Will communicate to ZooKeeper to calculate relative delay. |
| 168 | void getReplicaDelays(time_t & out_absolute_delay, time_t & out_relative_delay); |
| 169 | |
| 170 | /// Add a part to the queue of parts whose data you want to check in the background thread. |
| 171 | void enqueuePartForCheck(const String & part_name, time_t delay_to_check_seconds = 0) |
| 172 | { |
| 173 | part_check_thread.enqueuePart(part_name, delay_to_check_seconds); |
| 174 | } |
| 175 | |
| 176 | CheckResults checkData(const ASTPtr & query, const Context & context) override; |
| 177 | |
| 178 | /// Checks ability to use adaptive granularity |
| 179 | bool canUseAdaptiveGranularity() const override; |
| 180 | |
| 181 | private: |
| 182 | |
| 183 | /// Get a sequentially consistent view of current parts. |
| 184 | ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock getMaxAddedBlocks() const; |
| 185 | |
| 186 | /// Delete old parts from disk and from ZooKeeper. |
| 187 | void clearOldPartsAndRemoveFromZK(); |
| 188 | |
| 189 | friend class ReplicatedMergeTreeBlockOutputStream; |
| 190 | friend class ReplicatedMergeTreePartCheckThread; |
| 191 | friend class ReplicatedMergeTreeCleanupThread; |
| 192 | friend class ReplicatedMergeTreeAlterThread; |
| 193 | friend class ReplicatedMergeTreeRestartingThread; |
| 194 | friend struct ReplicatedMergeTreeLogEntry; |
| 195 | friend class ScopedPartitionMergeLock; |
| 196 | friend class ReplicatedMergeTreeQueue; |
| 197 | friend class MergeTreeData; |
| 198 | |
| 199 | using LogEntry = ReplicatedMergeTreeLogEntry; |
| 200 | using LogEntryPtr = LogEntry::Ptr; |
| 201 | |
| 202 | zkutil::ZooKeeperPtr current_zookeeper; /// Use only the methods below. |
| 203 | mutable std::mutex current_zookeeper_mutex; /// To recreate the session in the background thread. |
| 204 | |
| 205 | zkutil::ZooKeeperPtr tryGetZooKeeper() const; |
| 206 | zkutil::ZooKeeperPtr getZooKeeper() const; |
| 207 | void setZooKeeper(zkutil::ZooKeeperPtr zookeeper); |
| 208 | |
| 209 | /// If true, the table is offline and cannot be written to. |
| 210 | std::atomic_bool is_readonly {false}; |
| 211 | |
| 212 | String zookeeper_path; |
| 213 | String replica_name; |
| 214 | String replica_path; |
| 215 | |
| 216 | /** /replicas/me/is_active. |
| 217 | */ |
| 218 | zkutil::EphemeralNodeHolderPtr replica_is_active_node; |
| 219 | |
| 220 | /** Version of the /columns node in ZooKeeper corresponding to the current data.columns. |
| 221 | * Read and modify along with the data.columns - under TableStructureLock. |
| 222 | */ |
| 223 | int columns_version = -1; |
| 224 | |
| 225 | /// Version of the /metadata node in ZooKeeper. |
| 226 | int metadata_version = -1; |
| 227 | |
| 228 | /// Used to delay setting table structure till startup() in case of an offline ALTER. |
| 229 | std::function<void()> set_table_structure_at_startup; |
| 230 | |
| 231 | /** Is this replica "leading". The leader replica selects the parts to merge. |
| 232 | */ |
| 233 | std::atomic<bool> is_leader {false}; |
| 234 | zkutil::LeaderElectionPtr leader_election; |
| 235 | |
| 236 | InterserverIOEndpointHolderPtr data_parts_exchange_endpoint_holder; |
| 237 | |
| 238 | MergeTreeDataSelectExecutor reader; |
| 239 | MergeTreeDataWriter writer; |
| 240 | MergeTreeDataMergerMutator merger_mutator; |
| 241 | |
| 242 | /** The queue of what needs to be done on this replica to catch up with everyone. It is taken from ZooKeeper (/replicas/me/queue/). |
| 243 | * In ZK the entries are in chronological order. Here that is not necessarily the case. |
| 244 | */ |
| 245 | ReplicatedMergeTreeQueue queue; |
| 246 | std::atomic<time_t> last_queue_update_start_time{0}; |
| 247 | std::atomic<time_t> last_queue_update_finish_time{0}; |
| 248 | |
| 249 | DataPartsExchange::Fetcher fetcher; |
| 250 | |
| 251 | |
| 252 | /// When activated, the replica is initialized and the startup() method can exit. |
| 253 | Poco::Event startup_event; |
| 254 | |
| 255 | /// Whether background threads (except restarting_thread) need to complete their work. |
| 256 | std::atomic<bool> partial_shutdown_called {false}; |
| 257 | |
| 258 | /// Event that is signalled (and is reset) by the restarting_thread when the ZooKeeper session expires. |
| 259 | Poco::Event partial_shutdown_event {false}; /// Poco::Event::EVENT_MANUALRESET |
| 260 | |
| 261 | /// Limiting parallel fetches per one table |
| 262 | std::atomic_uint current_table_fetches {0}; |
| 263 | |
| 264 | /// Threads. |
| 265 | |
| 266 | /// A task that keeps track of the updates in the logs of all replicas and loads them into the queue. |
| 267 | bool queue_update_in_progress = false; |
| 268 | BackgroundSchedulePool::TaskHolder queue_updating_task; |
| 269 | |
| 270 | BackgroundSchedulePool::TaskHolder mutations_updating_task; |
| 271 | |
| 272 | /// A task that performs actions from the queue. |
| 273 | BackgroundProcessingPool::TaskHandle queue_task_handle; |
| 274 | |
| 275 | /// A task that moves parts to other disks/volumes. |
| 276 | /// Transparent for replication. |
| 277 | BackgroundProcessingPool::TaskHandle move_parts_task_handle; |
| 278 | |
| 279 | /// A task that selects parts to merge. |
| 280 | BackgroundSchedulePool::TaskHolder merge_selecting_task; |
| 281 | /// It is acquired for each iteration of the selection of parts to merge or each OPTIMIZE query. |
| 282 | std::mutex merge_selecting_mutex; |
| 283 | |
| 284 | /// A task that marks finished mutations as done. |
| 285 | BackgroundSchedulePool::TaskHolder mutations_finalizing_task; |
| 286 | |
| 287 | /// A thread that removes old parts, log entries, and blocks. |
| 288 | ReplicatedMergeTreeCleanupThread cleanup_thread; |
| 289 | |
| 290 | /// A thread monitoring changes to the column list in ZooKeeper and updating the parts in accordance with these changes. |
| 291 | ReplicatedMergeTreeAlterThread alter_thread; |
| 292 | |
| 293 | /// A thread that checks the data of the parts, as well as the queue of the parts to be checked. |
| 294 | ReplicatedMergeTreePartCheckThread part_check_thread; |
| 295 | |
| 296 | /// A thread that processes reconnection to ZooKeeper when the session expires. |
| 297 | ReplicatedMergeTreeRestartingThread restarting_thread; |
| 298 | |
| 299 | /// An event that awakens the `alter` method from waiting for the completion of the ALTER query. |
| 300 | zkutil::EventPtr alter_query_event = std::make_shared<Poco::Event>(); |
| 301 | |
| 302 | /// True if replica was created for existing table with fixed granularity |
| 303 | bool other_replicas_fixed_granularity = false; |
| 304 | |
| 305 | /** Creates the minimum set of nodes in ZooKeeper. |
| 306 | */ |
| 307 | void createTableIfNotExists(); |
| 308 | |
| 309 | /** Creates a replica in ZooKeeper and adds to the queue all that it takes to catch up with the rest of the replicas. |
| 310 | */ |
| 311 | void createReplica(); |
| 312 | |
| 313 | /** Create nodes in ZK that must always exist, but which might not exist when older versions of the server are running. |
| 314 | */ |
| 315 | void createNewZooKeeperNodes(); |
| 316 | |
| 317 | /** Verify that the list of columns and table settings match those specified in ZK (/metadata). |
| 318 | * If not, throw an exception. |
| 319 | * Must be called before startup(). |
| 320 | */ |
| 321 | void checkTableStructure(bool skip_sanity_checks, bool allow_alter); |
| 322 | |
| 323 | /// A part of ALTER: apply metadata changes only (data parts are altered separately). |
| 324 | /// Must be called under IStorage::lockStructureForAlter() lock. |
| 325 | void setTableStructure(ColumnsDescription new_columns, const ReplicatedMergeTreeTableMetadata::Diff & metadata_diff); |
| 326 | |
| 327 | /** Check that the set of parts corresponds to that in ZK (/replicas/me/parts/). |
| 328 | * If any parts described in ZK are not present locally, throw an exception. |
| 329 | * If any local parts are not mentioned in ZK, remove them. |
| 330 | * But if there are too many, throw an exception just in case - it's probably a configuration error. |
| 331 | */ |
| 332 | void checkParts(bool skip_sanity_checks); |
| 333 | |
| 334 | /** Check that the part's checksum is the same as the checksum of the same part on some other replica. |
| 335 | * If no one has such a part, nothing is checked. |
| 336 | * Not very reliable: if two replicas add a part almost at the same time, no checks will occur. |
| 337 | * Adds actions to `ops` that add data about the part into ZooKeeper. |
| 338 | * Call under TableStructureLock. |
| 339 | */ |
| 340 | void checkPartChecksumsAndAddCommitOps(const zkutil::ZooKeeperPtr & zookeeper, const DataPartPtr & part, |
| 341 | Coordination::Requests & ops, String part_name = "" , NameSet * absent_replicas_paths = nullptr); |
| 342 | |
| 343 | String getChecksumsForZooKeeper(const MergeTreeDataPartChecksums & checksums) const; |
| 344 | |
| 345 | /// Accepts a PreCommitted part, atomically checks its checksums against the ones on other replicas and commits the part |
| 346 | DataPartsVector checkPartChecksumsAndCommit(Transaction & transaction, |
| 347 | const DataPartPtr & part); |
| 348 | |
| 349 | bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override; |
| 350 | |
| 351 | void getCommitPartOps(Coordination::Requests & ops, MutableDataPartPtr & part, const String & block_id_path = "" ) const; |
| 352 | |
| 353 | /// Updates info about part columns and checksums in ZooKeeper and commits transaction if successful. |
| 354 | void updatePartHeaderInZooKeeperAndCommit( |
| 355 | const zkutil::ZooKeeperPtr & zookeeper, |
| 356 | AlterDataPartTransaction & transaction); |
| 357 | |
| 358 | /// Adds actions to `ops` that remove a part from ZooKeeper. |
| 359 | /// Set has_children to true for "old-style" parts (those with /columns and /checksums child znodes). |
| 360 | void removePartFromZooKeeper(const String & part_name, Coordination::Requests & ops, bool has_children); |
| 361 | |
| 362 | /// Quickly removes a big set of parts from ZooKeeper (using async multi queries) |
| 363 | void removePartsFromZooKeeper(zkutil::ZooKeeperPtr & zookeeper, const Strings & part_names, |
| 364 | NameSet * parts_should_be_retried = nullptr); |
| 365 | |
| 366 | bool tryRemovePartsFromZooKeeperWithRetries(const Strings & part_names, size_t max_retries = 5); |
| 367 | bool tryRemovePartsFromZooKeeperWithRetries(DataPartsVector & parts, size_t max_retries = 5); |
| 368 | |
| 369 | /// Removes a part from ZooKeeper and adds a task to the queue to download it. It is supposed to do this with broken parts. |
| 370 | void removePartAndEnqueueFetch(const String & part_name); |
| 371 | |
| 372 | /// Running jobs from the queue. |
| 373 | |
| 374 | /** Execute the action from the queue. Throws an exception if something is wrong. |
| 375 | * Returns whether or not it succeeds. If it did not work, write it to the end of the queue. |
| 376 | */ |
| 377 | bool executeLogEntry(LogEntry & entry); |
| 378 | |
| 379 | |
| 380 | void executeDropRange(const LogEntry & entry); |
| 381 | |
| 382 | /// Do the merge or recommend to make the fetch instead of the merge |
| 383 | bool tryExecuteMerge(const LogEntry & entry); |
| 384 | |
| 385 | bool tryExecutePartMutation(const LogEntry & entry); |
| 386 | |
| 387 | |
| 388 | bool executeFetch(LogEntry & entry); |
| 389 | |
| 390 | void executeClearColumnOrIndexInPartition(const LogEntry & entry); |
| 391 | |
| 392 | bool executeReplaceRange(const LogEntry & entry); |
| 393 | |
| 394 | /** Updates the queue. |
| 395 | */ |
| 396 | void queueUpdatingTask(); |
| 397 | |
| 398 | void mutationsUpdatingTask(); |
| 399 | |
| 400 | /** Clone data from another replica. |
| 401 | * If the replica cannot be cloned, throw an Exception. |
| 402 | */ |
| 403 | void cloneReplica(const String & source_replica, Coordination::Stat source_is_lost_stat, zkutil::ZooKeeperPtr & zookeeper); |
| 404 | |
| 405 | /// Clone replica if it is lost. |
| 406 | void cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zookeeper); |
| 407 | |
| 408 | /** Performs actions from the queue. |
| 409 | */ |
| 410 | BackgroundProcessingPoolTaskResult queueTask(); |
| 411 | |
| 412 | /// Perform moves of parts to other disks. |
| 413 | /// Local operation, doesn't interact with the replication queue. |
| 414 | BackgroundProcessingPoolTaskResult movePartsTask(); |
| 415 | |
| 416 | |
| 417 | /// Postcondition: |
| 418 | /// either leader_election is fully initialized (node in ZK is created and the watching thread is launched) |
| 419 | /// or an exception is thrown and leader_election is destroyed. |
| 420 | void enterLeaderElection(); |
| 421 | |
| 422 | /// Postcondition: |
| 423 | /// is_leader is false, merge_selecting_thread is stopped, leader_election is nullptr. NOTE(review): no member named merge_selecting_thread is declared in this class; presumably merge_selecting_task is meant - confirm. |
| 424 | /// leader_election node in ZK is either deleted, or the session is marked expired. |
| 425 | void exitLeaderElection(); |
| 426 | |
| 427 | /** Selects the parts to merge and writes to the log. |
| 428 | */ |
| 429 | void mergeSelectingTask(); |
| 430 | |
| 431 | /// Checks if some mutations are done and marks them as done. |
| 432 | void mutationsFinalizingTask(); |
| 433 | |
| 434 | /** Write the selected parts to merge into the log. |
| 435 | * Call when merge_selecting_mutex is locked. |
| 436 | * Returns false if any part is not in ZK. |
| 437 | */ |
| 438 | bool createLogEntryToMergeParts( |
| 439 | zkutil::ZooKeeperPtr & zookeeper, |
| 440 | const DataPartsVector & parts, |
| 441 | const String & merged_name, |
| 442 | bool deduplicate, |
| 443 | bool force_ttl, |
| 444 | ReplicatedMergeTreeLogEntryData * out_log_entry = nullptr); |
| 445 | |
| 446 | bool createLogEntryToMutatePart(const MergeTreeDataPart & part, Int64 mutation_version); |
| 447 | |
| 448 | /// Exchange parts. |
| 449 | |
| 450 | /** Returns an empty string if no one has a part. |
| 451 | */ |
| 452 | String findReplicaHavingPart(const String & part_name, bool active); |
| 453 | |
| 454 | /** Find replica having specified part or any part that covers it. |
| 455 | * If active = true, consider only active replicas. |
| 456 | * If found, returns replica name and sets 'entry->actual_new_part_name' to name of found largest covering part. |
| 457 | * If not found, returns empty string. |
| 458 | */ |
| 459 | String findReplicaHavingCoveringPart(LogEntry & entry, bool active); |
| 460 | String findReplicaHavingCoveringPart(const String & part_name, bool active, String & found_part_name); |
| 461 | |
| 462 | /** Download the specified part from the specified replica. |
| 463 | * If `to_detached`, the part is placed in the `detached` directory. |
| 464 | * If quorum != 0, then the node for tracking the quorum is updated. |
| 465 | * Returns false if the part is already being fetched right now. |
| 466 | */ |
| 467 | bool fetchPart(const String & part_name, const String & replica_path, bool to_detached, size_t quorum); |
| 468 | |
| 469 | /// Required only to avoid races between executeLogEntry and fetchPartition |
| 470 | std::unordered_set<String> currently_fetching_parts; |
| 471 | std::mutex currently_fetching_parts_mutex; |
| 472 | |
| 473 | |
| 474 | /// With the quorum being tracked, add a replica to the quorum for the part. |
| 475 | void updateQuorum(const String & part_name); |
| 476 | |
| 477 | /// Creates new block number if block with such block_id does not exist |
| 478 | std::optional<EphemeralLockInZooKeeper> allocateBlockNumber( |
| 479 | const String & partition_id, zkutil::ZooKeeperPtr & zookeeper, |
| 480 | const String & zookeeper_block_id_path = "" ); |
| 481 | |
| 482 | /** Wait until all replicas, including this, execute the specified action from the log. |
| 483 | * If replicas are added at the same time, it may not wait for the added replica. |
| 484 | * |
| 485 | * NOTE: This method must be called without table lock held. |
| 486 | * Because it effectively waits for other thread that usually has to also acquire a lock to proceed and this yields deadlock. |
| 487 | * TODO: There are wrong usages of this method that are not fixed yet. |
| 488 | */ |
| 489 | void waitForAllReplicasToProcessLogEntry(const ReplicatedMergeTreeLogEntryData & entry); |
| 490 | |
| 491 | /** Wait until the specified replica executes the specified action from the log. |
| 492 | * NOTE: See comment about locks above. |
| 493 | */ |
| 494 | void waitForReplicaToProcessLogEntry(const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry); |
| 495 | |
| 496 | /// Choose leader replica, send request to it and wait. |
| 497 | void sendRequestToLeaderReplica(const ASTPtr & query, const Context & query_context); |
| 498 | |
| 499 | /// Throw an exception if the table is readonly. |
| 500 | void assertNotReadonly() const; |
| 501 | |
| 502 | /// Produce an imaginary part info covering all parts in the specified partition (at the call moment). |
| 503 | /// Returns false if the partition doesn't exist yet. |
| 504 | bool getFakePartCoveringAllPartsInPartition(const String & partition_id, MergeTreePartInfo & part_info); |
| 505 | |
| 506 | /// Check for a node in ZK. If it exists, remember this information, and then immediately answer true. |
| 507 | std::unordered_set<std::string> existing_nodes_cache; |
| 508 | std::mutex existing_nodes_cache_mutex; |
| 509 | bool existsNodeCached(const std::string & path); |
| 510 | |
| 511 | /// Remove block IDs from `blocks/` in ZooKeeper for the given partition ID in the given block number range. |
| 512 | void clearBlocksInPartition( |
| 513 | zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num); |
| 514 | |
| 515 | /// Info about how other replicas can access this one. |
| 516 | ReplicatedMergeTreeAddress getReplicatedMergeTreeAddress() const; |
| 517 | |
| 518 | bool dropPartsInPartition(zkutil::ZooKeeper & zookeeper, String & partition_id, |
| 519 | StorageReplicatedMergeTree::LogEntry & entry, bool detach); |
| 520 | |
| 521 | /// Find cluster address for host |
| 522 | std::optional<Cluster::Address> findClusterAddress(const ReplicatedMergeTreeAddress & leader_address) const; |
| 523 | |
| 524 | // Partition helpers |
| 525 | void clearColumnOrIndexInPartition(const ASTPtr & partition, LogEntry && entry, const Context & query_context); |
| 526 | void dropPartition(const ASTPtr & query, const ASTPtr & partition, bool detach, const Context & query_context); |
| 527 | void attachPartition(const ASTPtr & partition, bool part, const Context & query_context); |
| 528 | void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & query_context); |
| 529 | void fetchPartition(const ASTPtr & partition, const String & from, const Context & query_context); |
| 530 | |
| 531 | /// Check granularity of already existing replicated table in zookeeper if it exists |
| 532 | /// return true if it's fixed |
| 533 | bool checkFixedGranualrityInZookeeper(); |
| 534 | |
| 535 | /// Wait until the mutation is finished on the given replicas. NOTE(review): the original comment mentioned waiting "for timeout seconds", but no timeout is passed here - presumably it is configured elsewhere; confirm. |
| 536 | void waitMutationToFinishOnReplicas( |
| 537 | const Strings & replicas, const String & mutation_id) const; |
| 538 | |
| 539 | protected: |
| 540 | /** If not 'attach', either creates a new table in ZK, or adds a replica to an existing table. |
| 541 | */ |
| 542 | StorageReplicatedMergeTree( |
| 543 | const String & zookeeper_path_, |
| 544 | const String & replica_name_, |
| 545 | bool attach, |
| 546 | const String & database_name_, const String & name_, |
| 547 | const String & relative_data_path_, |
| 548 | const StorageInMemoryMetadata & metadata, |
| 549 | Context & context_, |
| 550 | const String & date_column_name, |
| 551 | const MergingParams & merging_params_, |
| 552 | std::unique_ptr<MergeTreeSettings> settings_, |
| 553 | bool has_force_restore_data_flag); |
| 554 | }; |
| 555 | |
| 556 | |
| 557 | extern const int MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER; /* Declaration only: the definition lives in another translation unit, so the value and its units are not visible in this header. */ |
| 558 | |
| 559 | } |
| 560 | |