#pragma once

#include <ext/shared_ptr_helper.h>
#include <atomic>
#include <ctime>
#include <functional>
#include <mutex>
#include <optional>
#include <string>
#include <unordered_set>
#include <vector>
#include <pcg_random.hpp>
#include <Storages/IStorage.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
#include <Storages/MergeTree/MergeTreePartsMover.h>
#include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Storages/MergeTree/ReplicatedMergeTreeLogEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQueue.h>
#include <Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h>
#include <Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h>
#include <Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAlterThread.h>
#include <Storages/MergeTree/ReplicatedMergeTreeTableMetadata.h>
#include <Storages/MergeTree/EphemeralLockInZooKeeper.h>
#include <Storages/MergeTree/BackgroundProcessingPool.h>
#include <Storages/MergeTree/DataPartsExchange.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAddress.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/PartLog.h>
#include <Common/randomSeed.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/LeaderElection.h>
#include <Core/BackgroundSchedulePool.h>
#include <Processors/Pipe.h>

| 33 | namespace DB | 
|---|
| 34 | { | 
|---|
| 35 |  | 
|---|
| 36 | /** The engine that uses the merge tree (see MergeTreeData) and replicated through ZooKeeper. | 
|---|
| 37 | * | 
|---|
| 38 | * ZooKeeper is used for the following things: | 
|---|
| 39 | * - the structure of the table (/ metadata, /columns) | 
|---|
| 40 | * - action log with data (/log/log-...,/replicas/replica_name/queue/queue-...); | 
|---|
| 41 | * - a replica list (/replicas), and replica activity tag (/replicas/replica_name/is_active), replica addresses (/replicas/replica_name/host); | 
|---|
| 42 | * - select the leader replica (/leader_election) - this is the replica that assigns the merge; | 
|---|
| 43 | * - a set of parts of data on each replica (/replicas/replica_name/parts); | 
|---|
| 44 | * - list of the last N blocks of data with checksum, for deduplication (/blocks); | 
|---|
| 45 | * - the list of incremental block numbers (/block_numbers) that we are about to insert, | 
|---|
| 46 | *   to ensure the linear order of data insertion and data merge only on the intervals in this sequence; | 
|---|
| 47 | * - coordinates writes with quorum (/quorum). | 
|---|
| 48 | * - Storage of mutation entries (ALTER DELETE, ALTER UPDATE etc.) to execute (/mutations). | 
|---|
| 49 | *   See comments in StorageReplicatedMergeTree::mutate() for details. | 
|---|
| 50 | */ | 
|---|
| 51 |  | 
|---|
| 52 | /** The replicated tables have a common log (/log/log-...). | 
|---|
| 53 | * Log - a sequence of entries (LogEntry) about what to do. | 
|---|
| 54 | * Each entry is one of: | 
|---|
| 55 | * - normal data insertion (GET), | 
|---|
| 56 | * - merge (MERGE), | 
|---|
| 57 | * - delete the partition (DROP). | 
|---|
| 58 | * | 
|---|
| 59 | * Each replica copies (queueUpdatingTask, pullLogsToQueue) entries from the log to its queue (/replicas/replica_name/queue/queue-...) | 
|---|
| 60 | *  and then executes them (queueTask). | 
|---|
| 61 | * Despite the name of the "queue", execution can be reordered, if necessary (shouldExecuteLogEntry, executeLogEntry). | 
|---|
| 62 | * In addition, the records in the queue can be generated independently (not from the log), in the following cases: | 
|---|
| 63 | * - when creating a new replica, actions are put on GET from other replicas (createReplica); | 
|---|
| 64 | * - if the part is corrupt (removePartAndEnqueueFetch) or absent during the check (at start - checkParts, while running - searchForMissingPart), | 
|---|
| 65 | *   actions are put on GET from other replicas; | 
|---|
| 66 | * | 
|---|
| 67 | * The replica to which INSERT was made in the queue will also have an entry of the GET of this data. | 
|---|
| 68 | * Such an entry is considered to be executed as soon as the queue handler sees it. | 
|---|
| 69 | * | 
|---|
| 70 | * The log entry has a creation time. This time is generated by the clock of server that created entry | 
|---|
| 71 | * - the one on which the corresponding INSERT or ALTER query came. | 
|---|
| 72 | * | 
|---|
| 73 | * For the entries in the queue that the replica made for itself, | 
|---|
| 74 | * as the time will take the time of creation the appropriate part on any of the replicas. | 
|---|
| 75 | */ | 
|---|
| 76 |  | 
|---|
| 77 | class StorageReplicatedMergeTree : public ext::shared_ptr_helper<StorageReplicatedMergeTree>, public MergeTreeData | 
|---|
| 78 | { | 
|---|
| 79 | friend struct ext::shared_ptr_helper<StorageReplicatedMergeTree>; | 
|---|
| 80 | public: | 
|---|
| 81 | void startup() override; | 
|---|
| 82 | void shutdown() override; | 
|---|
| 83 | ~StorageReplicatedMergeTree() override; | 
|---|
| 84 |  | 
|---|
| 85 | std::string getName() const override { return "Replicated"+ merging_params.getModeName() + "MergeTree"; } | 
|---|
| 86 | std::string getTableName() const override { return table_name; } | 
|---|
| 87 | std::string getDatabaseName() const override { return database_name; } | 
|---|
| 88 |  | 
|---|
| 89 | bool supportsReplication() const override { return true; } | 
|---|
| 90 | bool supportsDeduplication() const override { return true; } | 
|---|
| 91 |  | 
|---|
| 92 | Pipes readWithProcessors( | 
|---|
| 93 | const Names & column_names, | 
|---|
| 94 | const SelectQueryInfo & query_info, | 
|---|
| 95 | const Context & context, | 
|---|
| 96 | QueryProcessingStage::Enum processed_stage, | 
|---|
| 97 | size_t max_block_size, | 
|---|
| 98 | unsigned num_streams) override; | 
|---|
| 99 |  | 
|---|
| 100 | bool supportProcessorsPipeline() const override { return true; } | 
|---|
| 101 |  | 
|---|
| 102 | std::optional<UInt64> totalRows() const override; | 
|---|
| 103 |  | 
|---|
| 104 | BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override; | 
|---|
| 105 |  | 
|---|
| 106 | bool optimize(const ASTPtr & query, const ASTPtr & partition, bool final, bool deduplicate, const Context & query_context) override; | 
|---|
| 107 |  | 
|---|
| 108 | void alter(const AlterCommands & params, const Context & query_context, TableStructureWriteLockHolder & table_lock_holder) override; | 
|---|
| 109 |  | 
|---|
| 110 | void alterPartition(const ASTPtr & query, const PartitionCommands & commands, const Context & query_context) override; | 
|---|
| 111 |  | 
|---|
| 112 | void mutate(const MutationCommands & commands, const Context & context) override; | 
|---|
| 113 | std::vector<MergeTreeMutationStatus> getMutationsStatus() const override; | 
|---|
| 114 | CancellationCode killMutation(const String & mutation_id) override; | 
|---|
| 115 |  | 
|---|
| 116 | /** Removes a replica from ZooKeeper. If there are no other replicas, it deletes the entire table from ZooKeeper. | 
|---|
| 117 | */ | 
|---|
| 118 | void drop(TableStructureWriteLockHolder &) override; | 
|---|
| 119 |  | 
|---|
| 120 | void truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) override; | 
|---|
| 121 |  | 
|---|
| 122 | void rename(const String & new_path_to_table_data, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override; | 
|---|
| 123 |  | 
|---|
| 124 | bool supportsIndexForIn() const override { return true; } | 
|---|
| 125 |  | 
|---|
| 126 | void checkTableCanBeDropped() const override; | 
|---|
| 127 |  | 
|---|
| 128 | void checkPartitionCanBeDropped(const ASTPtr & partition) override; | 
|---|
| 129 |  | 
|---|
| 130 | ActionLock getActionLock(StorageActionBlockType action_type) override; | 
|---|
| 131 |  | 
|---|
| 132 | /// Wait when replication queue size becomes less or equal than queue_size | 
|---|
| 133 | /// If timeout is exceeded returns false | 
|---|
| 134 | bool waitForShrinkingQueueSize(size_t queue_size = 0, UInt64 max_wait_milliseconds = 0); | 
|---|
| 135 |  | 
|---|
| 136 | /** For the system table replicas. */ | 
|---|
| 137 | struct Status | 
|---|
| 138 | { | 
|---|
| 139 | bool is_leader; | 
|---|
| 140 | bool can_become_leader; | 
|---|
| 141 | bool is_readonly; | 
|---|
| 142 | bool is_session_expired; | 
|---|
| 143 | ReplicatedMergeTreeQueue::Status queue; | 
|---|
| 144 | UInt32 parts_to_check; | 
|---|
| 145 | String zookeeper_path; | 
|---|
| 146 | String replica_name; | 
|---|
| 147 | String replica_path; | 
|---|
| 148 | Int32 columns_version; | 
|---|
| 149 | UInt64 log_max_index; | 
|---|
| 150 | UInt64 log_pointer; | 
|---|
| 151 | UInt64 absolute_delay; | 
|---|
| 152 | UInt8 total_replicas; | 
|---|
| 153 | UInt8 active_replicas; | 
|---|
| 154 | }; | 
|---|
| 155 |  | 
|---|
| 156 | /// Get the status of the table. If with_zk_fields = false - do not fill in the fields that require queries to ZK. | 
|---|
| 157 | void getStatus(Status & res, bool with_zk_fields = true); | 
|---|
| 158 |  | 
|---|
| 159 | using LogEntriesData = std::vector<ReplicatedMergeTreeLogEntryData>; | 
|---|
| 160 | void getQueue(LogEntriesData & res, String & replica_name); | 
|---|
| 161 |  | 
|---|
| 162 | /// Get replica delay relative to current time. | 
|---|
| 163 | time_t getAbsoluteDelay() const; | 
|---|
| 164 |  | 
|---|
| 165 | /// If the absolute delay is greater than min_relative_delay_to_yield_leadership, | 
|---|
| 166 | /// will also calculate the difference from the unprocessed time of the best replica. | 
|---|
| 167 | /// NOTE: Will communicate to ZooKeeper to calculate relative delay. | 
|---|
| 168 | void getReplicaDelays(time_t & out_absolute_delay, time_t & out_relative_delay); | 
|---|
| 169 |  | 
|---|
| 170 | /// Add a part to the queue of parts whose data you want to check in the background thread. | 
|---|
| 171 | void enqueuePartForCheck(const String & part_name, time_t delay_to_check_seconds = 0) | 
|---|
| 172 | { | 
|---|
| 173 | part_check_thread.enqueuePart(part_name, delay_to_check_seconds); | 
|---|
| 174 | } | 
|---|
| 175 |  | 
|---|
| 176 | CheckResults checkData(const ASTPtr & query, const Context & context) override; | 
|---|
| 177 |  | 
|---|
| 178 | /// Checks ability to use granularity | 
|---|
| 179 | bool canUseAdaptiveGranularity() const override; | 
|---|
| 180 |  | 
|---|
| 181 | private: | 
|---|
| 182 |  | 
|---|
| 183 | /// Get a sequential consistent view of current parts. | 
|---|
| 184 | ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock getMaxAddedBlocks() const; | 
|---|
| 185 |  | 
|---|
| 186 | /// Delete old parts from disk and from ZooKeeper. | 
|---|
| 187 | void clearOldPartsAndRemoveFromZK(); | 
|---|
| 188 |  | 
|---|
| 189 | friend class ReplicatedMergeTreeBlockOutputStream; | 
|---|
| 190 | friend class ReplicatedMergeTreePartCheckThread; | 
|---|
| 191 | friend class ReplicatedMergeTreeCleanupThread; | 
|---|
| 192 | friend class ReplicatedMergeTreeAlterThread; | 
|---|
| 193 | friend class ReplicatedMergeTreeRestartingThread; | 
|---|
| 194 | friend struct ReplicatedMergeTreeLogEntry; | 
|---|
| 195 | friend class ScopedPartitionMergeLock; | 
|---|
| 196 | friend class ReplicatedMergeTreeQueue; | 
|---|
| 197 | friend class MergeTreeData; | 
|---|
| 198 |  | 
|---|
| 199 | using LogEntry = ReplicatedMergeTreeLogEntry; | 
|---|
| 200 | using LogEntryPtr = LogEntry::Ptr; | 
|---|
| 201 |  | 
|---|
| 202 | zkutil::ZooKeeperPtr current_zookeeper;        /// Use only the methods below. | 
|---|
| 203 | mutable std::mutex current_zookeeper_mutex;    /// To recreate the session in the background thread. | 
|---|
| 204 |  | 
|---|
| 205 | zkutil::ZooKeeperPtr tryGetZooKeeper() const; | 
|---|
| 206 | zkutil::ZooKeeperPtr getZooKeeper() const; | 
|---|
| 207 | void setZooKeeper(zkutil::ZooKeeperPtr zookeeper); | 
|---|
| 208 |  | 
|---|
| 209 | /// If true, the table is offline and can not be written to it. | 
|---|
| 210 | std::atomic_bool is_readonly {false}; | 
|---|
| 211 |  | 
|---|
| 212 | String zookeeper_path; | 
|---|
| 213 | String replica_name; | 
|---|
| 214 | String replica_path; | 
|---|
| 215 |  | 
|---|
| 216 | /** /replicas/me/is_active. | 
|---|
| 217 | */ | 
|---|
| 218 | zkutil::EphemeralNodeHolderPtr replica_is_active_node; | 
|---|
| 219 |  | 
|---|
| 220 | /** Version of the /columns node in ZooKeeper corresponding to the current data.columns. | 
|---|
| 221 | * Read and modify along with the data.columns - under TableStructureLock. | 
|---|
| 222 | */ | 
|---|
| 223 | int columns_version = -1; | 
|---|
| 224 |  | 
|---|
| 225 | /// Version of the /metadata node in ZooKeeper. | 
|---|
| 226 | int metadata_version = -1; | 
|---|
| 227 |  | 
|---|
| 228 | /// Used to delay setting table structure till startup() in case of an offline ALTER. | 
|---|
| 229 | std::function<void()> set_table_structure_at_startup; | 
|---|
| 230 |  | 
|---|
| 231 | /** Is this replica "leading". The leader replica selects the parts to merge. | 
|---|
| 232 | */ | 
|---|
| 233 | std::atomic<bool> is_leader {false}; | 
|---|
| 234 | zkutil::LeaderElectionPtr leader_election; | 
|---|
| 235 |  | 
|---|
| 236 | InterserverIOEndpointHolderPtr data_parts_exchange_endpoint_holder; | 
|---|
| 237 |  | 
|---|
| 238 | MergeTreeDataSelectExecutor reader; | 
|---|
| 239 | MergeTreeDataWriter writer; | 
|---|
| 240 | MergeTreeDataMergerMutator merger_mutator; | 
|---|
| 241 |  | 
|---|
| 242 | /** The queue of what needs to be done on this replica to catch up with everyone. It is taken from ZooKeeper (/replicas/me/queue/). | 
|---|
| 243 | * In ZK entries in chronological order. Here it is not necessary. | 
|---|
| 244 | */ | 
|---|
| 245 | ReplicatedMergeTreeQueue queue; | 
|---|
| 246 | std::atomic<time_t> last_queue_update_start_time{0}; | 
|---|
| 247 | std::atomic<time_t> last_queue_update_finish_time{0}; | 
|---|
| 248 |  | 
|---|
| 249 | DataPartsExchange::Fetcher fetcher; | 
|---|
| 250 |  | 
|---|
| 251 |  | 
|---|
| 252 | /// When activated, replica is initialized and startup() method could exit | 
|---|
| 253 | Poco::Event startup_event; | 
|---|
| 254 |  | 
|---|
| 255 | /// Do I need to complete background threads (except restarting_thread)? | 
|---|
| 256 | std::atomic<bool> partial_shutdown_called {false}; | 
|---|
| 257 |  | 
|---|
| 258 | /// Event that is signalled (and is reset) by the restarting_thread when the ZooKeeper session expires. | 
|---|
| 259 | Poco::Event partial_shutdown_event {false};     /// Poco::Event::EVENT_MANUALRESET | 
|---|
| 260 |  | 
|---|
| 261 | /// Limiting parallel fetches per one table | 
|---|
| 262 | std::atomic_uint current_table_fetches {0}; | 
|---|
| 263 |  | 
|---|
| 264 | /// Threads. | 
|---|
| 265 |  | 
|---|
| 266 | /// A task that keeps track of the updates in the logs of all replicas and loads them into the queue. | 
|---|
| 267 | bool queue_update_in_progress = false; | 
|---|
| 268 | BackgroundSchedulePool::TaskHolder queue_updating_task; | 
|---|
| 269 |  | 
|---|
| 270 | BackgroundSchedulePool::TaskHolder mutations_updating_task; | 
|---|
| 271 |  | 
|---|
| 272 | /// A task that performs actions from the queue. | 
|---|
| 273 | BackgroundProcessingPool::TaskHandle queue_task_handle; | 
|---|
| 274 |  | 
|---|
| 275 | /// A task which move parts to another disks/volumes | 
|---|
| 276 | /// Transparent for replication. | 
|---|
| 277 | BackgroundProcessingPool::TaskHandle move_parts_task_handle; | 
|---|
| 278 |  | 
|---|
| 279 | /// A task that selects parts to merge. | 
|---|
| 280 | BackgroundSchedulePool::TaskHolder merge_selecting_task; | 
|---|
| 281 | /// It is acquired for each iteration of the selection of parts to merge or each OPTIMIZE query. | 
|---|
| 282 | std::mutex merge_selecting_mutex; | 
|---|
| 283 |  | 
|---|
| 284 | /// A task that marks finished mutations as done. | 
|---|
| 285 | BackgroundSchedulePool::TaskHolder mutations_finalizing_task; | 
|---|
| 286 |  | 
|---|
| 287 | /// A thread that removes old parts, log entries, and blocks. | 
|---|
| 288 | ReplicatedMergeTreeCleanupThread cleanup_thread; | 
|---|
| 289 |  | 
|---|
| 290 | /// A thread monitoring changes to the column list in ZooKeeper and updating the parts in accordance with these changes. | 
|---|
| 291 | ReplicatedMergeTreeAlterThread alter_thread; | 
|---|
| 292 |  | 
|---|
| 293 | /// A thread that checks the data of the parts, as well as the queue of the parts to be checked. | 
|---|
| 294 | ReplicatedMergeTreePartCheckThread part_check_thread; | 
|---|
| 295 |  | 
|---|
| 296 | /// A thread that processes reconnection to ZooKeeper when the session expires. | 
|---|
| 297 | ReplicatedMergeTreeRestartingThread restarting_thread; | 
|---|
| 298 |  | 
|---|
| 299 | /// An event that awakens `alter` method from waiting for the completion of the ALTER query. | 
|---|
| 300 | zkutil::EventPtr alter_query_event = std::make_shared<Poco::Event>(); | 
|---|
| 301 |  | 
|---|
| 302 | /// True if replica was created for existing table with fixed granularity | 
|---|
| 303 | bool other_replicas_fixed_granularity = false; | 
|---|
| 304 |  | 
|---|
| 305 | /** Creates the minimum set of nodes in ZooKeeper. | 
|---|
| 306 | */ | 
|---|
| 307 | void createTableIfNotExists(); | 
|---|
| 308 |  | 
|---|
| 309 | /** Creates a replica in ZooKeeper and adds to the queue all that it takes to catch up with the rest of the replicas. | 
|---|
| 310 | */ | 
|---|
| 311 | void createReplica(); | 
|---|
| 312 |  | 
|---|
| 313 | /** Create nodes in the ZK, which must always be, but which might not exist when older versions of the server are running. | 
|---|
| 314 | */ | 
|---|
| 315 | void createNewZooKeeperNodes(); | 
|---|
| 316 |  | 
|---|
| 317 | /** Verify that the list of columns and table settings match those specified in ZK (/metadata). | 
|---|
| 318 | * If not, throw an exception. | 
|---|
| 319 | * Must be called before startup(). | 
|---|
| 320 | */ | 
|---|
| 321 | void checkTableStructure(bool skip_sanity_checks, bool allow_alter); | 
|---|
| 322 |  | 
|---|
| 323 | /// A part of ALTER: apply metadata changes only (data parts are altered separately). | 
|---|
| 324 | /// Must be called under IStorage::lockStructureForAlter() lock. | 
|---|
| 325 | void setTableStructure(ColumnsDescription new_columns, const ReplicatedMergeTreeTableMetadata::Diff & metadata_diff); | 
|---|
| 326 |  | 
|---|
| 327 | /** Check that the set of parts corresponds to that in ZK (/replicas/me/parts/). | 
|---|
| 328 | * If any parts described in ZK are not locally, throw an exception. | 
|---|
| 329 | * If any local parts are not mentioned in ZK, remove them. | 
|---|
| 330 | *  But if there are too many, throw an exception just in case - it's probably a configuration error. | 
|---|
| 331 | */ | 
|---|
| 332 | void checkParts(bool skip_sanity_checks); | 
|---|
| 333 |  | 
|---|
| 334 | /** Check that the part's checksum is the same as the checksum of the same part on some other replica. | 
|---|
| 335 | * If no one has such a part, nothing checks. | 
|---|
| 336 | * Not very reliable: if two replicas add a part almost at the same time, no checks will occur. | 
|---|
| 337 | * Adds actions to `ops` that add data about the part into ZooKeeper. | 
|---|
| 338 | * Call under TableStructureLock. | 
|---|
| 339 | */ | 
|---|
| 340 | void checkPartChecksumsAndAddCommitOps(const zkutil::ZooKeeperPtr & zookeeper, const DataPartPtr & part, | 
|---|
| 341 | Coordination::Requests & ops, String part_name = "", NameSet * absent_replicas_paths = nullptr); | 
|---|
| 342 |  | 
|---|
| 343 | String getChecksumsForZooKeeper(const MergeTreeDataPartChecksums & checksums) const; | 
|---|
| 344 |  | 
|---|
| 345 | /// Accepts a PreComitted part, atomically checks its checksums with ones on other replicas and commit the part | 
|---|
| 346 | DataPartsVector checkPartChecksumsAndCommit(Transaction & transaction, | 
|---|
| 347 | const DataPartPtr & part); | 
|---|
| 348 |  | 
|---|
| 349 | bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override; | 
|---|
| 350 |  | 
|---|
| 351 | void getCommitPartOps(Coordination::Requests & ops, MutableDataPartPtr & part, const String & block_id_path = "") const; | 
|---|
| 352 |  | 
|---|
| 353 | /// Updates info about part columns and checksums in ZooKeeper and commits transaction if successful. | 
|---|
| 354 | void updatePartHeaderInZooKeeperAndCommit( | 
|---|
| 355 | const zkutil::ZooKeeperPtr & zookeeper, | 
|---|
| 356 | AlterDataPartTransaction & transaction); | 
|---|
| 357 |  | 
|---|
| 358 | /// Adds actions to `ops` that remove a part from ZooKeeper. | 
|---|
| 359 | /// Set has_children to true for "old-style" parts (those with /columns and /checksums child znodes). | 
|---|
| 360 | void removePartFromZooKeeper(const String & part_name, Coordination::Requests & ops, bool has_children); | 
|---|
| 361 |  | 
|---|
| 362 | /// Quickly removes big set of parts from ZooKeeper (using async multi queries) | 
|---|
| 363 | void removePartsFromZooKeeper(zkutil::ZooKeeperPtr & zookeeper, const Strings & part_names, | 
|---|
| 364 | NameSet * parts_should_be_retried = nullptr); | 
|---|
| 365 |  | 
|---|
| 366 | bool tryRemovePartsFromZooKeeperWithRetries(const Strings & part_names, size_t max_retries = 5); | 
|---|
| 367 | bool tryRemovePartsFromZooKeeperWithRetries(DataPartsVector & parts, size_t max_retries = 5); | 
|---|
| 368 |  | 
|---|
| 369 | /// Removes a part from ZooKeeper and adds a task to the queue to download it. It is supposed to do this with broken parts. | 
|---|
| 370 | void removePartAndEnqueueFetch(const String & part_name); | 
|---|
| 371 |  | 
|---|
| 372 | /// Running jobs from the queue. | 
|---|
| 373 |  | 
|---|
| 374 | /** Execute the action from the queue. Throws an exception if something is wrong. | 
|---|
| 375 | * Returns whether or not it succeeds. If it did not work, write it to the end of the queue. | 
|---|
| 376 | */ | 
|---|
| 377 | bool executeLogEntry(LogEntry & entry); | 
|---|
| 378 |  | 
|---|
| 379 |  | 
|---|
| 380 | void executeDropRange(const LogEntry & entry); | 
|---|
| 381 |  | 
|---|
| 382 | /// Do the merge or recommend to make the fetch instead of the merge | 
|---|
| 383 | bool tryExecuteMerge(const LogEntry & entry); | 
|---|
| 384 |  | 
|---|
| 385 | bool tryExecutePartMutation(const LogEntry & entry); | 
|---|
| 386 |  | 
|---|
| 387 |  | 
|---|
| 388 | bool executeFetch(LogEntry & entry); | 
|---|
| 389 |  | 
|---|
| 390 | void executeClearColumnOrIndexInPartition(const LogEntry & entry); | 
|---|
| 391 |  | 
|---|
| 392 | bool executeReplaceRange(const LogEntry & entry); | 
|---|
| 393 |  | 
|---|
| 394 | /** Updates the queue. | 
|---|
| 395 | */ | 
|---|
| 396 | void queueUpdatingTask(); | 
|---|
| 397 |  | 
|---|
| 398 | void mutationsUpdatingTask(); | 
|---|
| 399 |  | 
|---|
| 400 | /** Clone data from another replica. | 
|---|
| 401 | * If replica can not be cloned throw Exception. | 
|---|
| 402 | */ | 
|---|
| 403 | void cloneReplica(const String & source_replica, Coordination::Stat source_is_lost_stat, zkutil::ZooKeeperPtr & zookeeper); | 
|---|
| 404 |  | 
|---|
| 405 | /// Clone replica if it is lost. | 
|---|
| 406 | void cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zookeeper); | 
|---|
| 407 |  | 
|---|
| 408 | /** Performs actions from the queue. | 
|---|
| 409 | */ | 
|---|
| 410 | BackgroundProcessingPoolTaskResult queueTask(); | 
|---|
| 411 |  | 
|---|
| 412 | /// Perform moves of parts to another disks. | 
|---|
| 413 | /// Local operation, doesn't interact with replicationg queue. | 
|---|
| 414 | BackgroundProcessingPoolTaskResult movePartsTask(); | 
|---|
| 415 |  | 
|---|
| 416 |  | 
|---|
| 417 | /// Postcondition: | 
|---|
| 418 | /// either leader_election is fully initialized (node in ZK is created and the watching thread is launched) | 
|---|
| 419 | /// or an exception is thrown and leader_election is destroyed. | 
|---|
| 420 | void enterLeaderElection(); | 
|---|
| 421 |  | 
|---|
| 422 | /// Postcondition: | 
|---|
| 423 | /// is_leader is false, merge_selecting_thread is stopped, leader_election is nullptr. | 
|---|
| 424 | /// leader_election node in ZK is either deleted, or the session is marked expired. | 
|---|
| 425 | void exitLeaderElection(); | 
|---|
| 426 |  | 
|---|
| 427 | /** Selects the parts to merge and writes to the log. | 
|---|
| 428 | */ | 
|---|
| 429 | void mergeSelectingTask(); | 
|---|
| 430 |  | 
|---|
| 431 | /// Checks if some mutations are done and marks them as done. | 
|---|
| 432 | void mutationsFinalizingTask(); | 
|---|
| 433 |  | 
|---|
| 434 | /** Write the selected parts to merge into the log, | 
|---|
| 435 | * Call when merge_selecting_mutex is locked. | 
|---|
| 436 | * Returns false if any part is not in ZK. | 
|---|
| 437 | */ | 
|---|
| 438 | bool createLogEntryToMergeParts( | 
|---|
| 439 | zkutil::ZooKeeperPtr & zookeeper, | 
|---|
| 440 | const DataPartsVector & parts, | 
|---|
| 441 | const String & merged_name, | 
|---|
| 442 | bool deduplicate, | 
|---|
| 443 | bool force_ttl, | 
|---|
| 444 | ReplicatedMergeTreeLogEntryData * out_log_entry = nullptr); | 
|---|
| 445 |  | 
|---|
| 446 | bool createLogEntryToMutatePart(const MergeTreeDataPart & part, Int64 mutation_version); | 
|---|
| 447 |  | 
|---|
| 448 | /// Exchange parts. | 
|---|
| 449 |  | 
|---|
| 450 | /** Returns an empty string if no one has a part. | 
|---|
| 451 | */ | 
|---|
| 452 | String findReplicaHavingPart(const String & part_name, bool active); | 
|---|
| 453 |  | 
|---|
| 454 | /** Find replica having specified part or any part that covers it. | 
|---|
| 455 | * If active = true, consider only active replicas. | 
|---|
| 456 | * If found, returns replica name and set 'entry->actual_new_part_name' to name of found largest covering part. | 
|---|
| 457 | * If not found, returns empty string. | 
|---|
| 458 | */ | 
|---|
| 459 | String findReplicaHavingCoveringPart(LogEntry & entry, bool active); | 
|---|
| 460 | String findReplicaHavingCoveringPart(const String & part_name, bool active, String & found_part_name); | 
|---|
| 461 |  | 
|---|
| 462 | /** Download the specified part from the specified replica. | 
|---|
| 463 | * If `to_detached`, the part is placed in the `detached` directory. | 
|---|
| 464 | * If quorum != 0, then the node for tracking the quorum is updated. | 
|---|
| 465 | * Returns false if part is already fetching right now. | 
|---|
| 466 | */ | 
|---|
| 467 | bool fetchPart(const String & part_name, const String & replica_path, bool to_detached, size_t quorum); | 
|---|
| 468 |  | 
|---|
| 469 | /// Required only to avoid races between executeLogEntry and fetchPartition | 
|---|
| 470 | std::unordered_set<String> currently_fetching_parts; | 
|---|
| 471 | std::mutex currently_fetching_parts_mutex; | 
|---|
| 472 |  | 
|---|
| 473 |  | 
|---|
| 474 | /// With the quorum being tracked, add a replica to the quorum for the part. | 
|---|
| 475 | void updateQuorum(const String & part_name); | 
|---|
| 476 |  | 
|---|
| 477 | /// Creates new block number if block with such block_id does not exist | 
|---|
| 478 | std::optional<EphemeralLockInZooKeeper> allocateBlockNumber( | 
|---|
| 479 | const String & partition_id, zkutil::ZooKeeperPtr & zookeeper, | 
|---|
| 480 | const String & zookeeper_block_id_path = ""); | 
|---|
| 481 |  | 
|---|
| 482 | /** Wait until all replicas, including this, execute the specified action from the log. | 
|---|
| 483 | * If replicas are added at the same time, it can not wait the added replica . | 
|---|
| 484 | * | 
|---|
| 485 | * NOTE: This method must be called without table lock held. | 
|---|
| 486 | * Because it effectively waits for other thread that usually has to also acquire a lock to proceed and this yields deadlock. | 
|---|
| 487 | * TODO: There are wrong usages of this method that are not fixed yet. | 
|---|
| 488 | */ | 
|---|
| 489 | void waitForAllReplicasToProcessLogEntry(const ReplicatedMergeTreeLogEntryData & entry); | 
|---|
| 490 |  | 
|---|
| 491 | /** Wait until the specified replica executes the specified action from the log. | 
|---|
| 492 | * NOTE: See comment about locks above. | 
|---|
| 493 | */ | 
|---|
| 494 | void waitForReplicaToProcessLogEntry(const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry); | 
|---|
| 495 |  | 
|---|
| 496 | /// Choose leader replica, send requst to it and wait. | 
|---|
| 497 | void sendRequestToLeaderReplica(const ASTPtr & query, const Context & query_context); | 
|---|
| 498 |  | 
|---|
| 499 | /// Throw an exception if the table is readonly. | 
|---|
| 500 | void assertNotReadonly() const; | 
|---|
| 501 |  | 
|---|
| 502 | /// Produce an imaginary part info covering all parts in the specified partition (at the call moment). | 
|---|
| 503 | /// Returns false if the partition doesn't exist yet. | 
|---|
| 504 | bool getFakePartCoveringAllPartsInPartition(const String & partition_id, MergeTreePartInfo & part_info); | 
|---|
| 505 |  | 
|---|
| 506 | /// Check for a node in ZK. If it is, remember this information, and then immediately answer true. | 
|---|
| 507 | std::unordered_set<std::string> existing_nodes_cache; | 
|---|
| 508 | std::mutex existing_nodes_cache_mutex; | 
|---|
| 509 | bool existsNodeCached(const std::string & path); | 
|---|
| 510 |  | 
|---|
| 511 | /// Remove block IDs from `blocks/` in ZooKeeper for the given partition ID in the given block number range. | 
|---|
| 512 | void clearBlocksInPartition( | 
|---|
| 513 | zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num); | 
|---|
| 514 |  | 
|---|
| 515 | /// Info about how other replicas can access this one. | 
|---|
| 516 | ReplicatedMergeTreeAddress getReplicatedMergeTreeAddress() const; | 
|---|
| 517 |  | 
|---|
| 518 | bool dropPartsInPartition(zkutil::ZooKeeper & zookeeper, String & partition_id, | 
|---|
| 519 | StorageReplicatedMergeTree::LogEntry & entry, bool detach); | 
|---|
| 520 |  | 
|---|
| 521 | /// Find cluster address for host | 
|---|
| 522 | std::optional<Cluster::Address> findClusterAddress(const ReplicatedMergeTreeAddress & leader_address) const; | 
|---|
| 523 |  | 
|---|
| 524 | // Partition helpers | 
|---|
| 525 | void clearColumnOrIndexInPartition(const ASTPtr & partition, LogEntry && entry, const Context & query_context); | 
|---|
| 526 | void dropPartition(const ASTPtr & query, const ASTPtr & partition, bool detach, const Context & query_context); | 
|---|
| 527 | void attachPartition(const ASTPtr & partition, bool part, const Context & query_context); | 
|---|
| 528 | void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & query_context); | 
|---|
| 529 | void fetchPartition(const ASTPtr & partition, const String & from, const Context & query_context); | 
|---|
| 530 |  | 
|---|
| 531 | /// Check granularity of already existing replicated table in zookeeper if it exists | 
|---|
| 532 | /// return true if it's fixed | 
|---|
| 533 | bool checkFixedGranualrityInZookeeper(); | 
|---|
| 534 |  | 
|---|
| 535 | /// Wait for timeout seconds mutation is finished on replicas | 
|---|
| 536 | void waitMutationToFinishOnReplicas( | 
|---|
| 537 | const Strings & replicas, const String & mutation_id) const; | 
|---|
| 538 |  | 
|---|
| 539 | protected: | 
|---|
| 540 | /** If not 'attach', either creates a new table in ZK, or adds a replica to an existing table. | 
|---|
| 541 | */ | 
|---|
| 542 | StorageReplicatedMergeTree( | 
|---|
| 543 | const String & zookeeper_path_, | 
|---|
| 544 | const String & replica_name_, | 
|---|
| 545 | bool attach, | 
|---|
| 546 | const String & database_name_, const String & name_, | 
|---|
| 547 | const String & relative_data_path_, | 
|---|
| 548 | const StorageInMemoryMetadata & metadata, | 
|---|
| 549 | Context & context_, | 
|---|
| 550 | const String & date_column_name, | 
|---|
| 551 | const MergingParams & merging_params_, | 
|---|
| 552 | std::unique_ptr<MergeTreeSettings> settings_, | 
|---|
| 553 | bool has_force_restore_data_flag); | 
|---|
| 554 | }; | 
|---|
| 555 |  | 
|---|
| 556 |  | 
|---|
| 557 | extern const int MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER; | 
|---|
| 558 |  | 
|---|
| 559 | } | 
|---|
| 560 |  | 
|---|