1 | #pragma once |
2 | |
3 | #include <ext/shared_ptr_helper.h> |
4 | #include <atomic> |
5 | #include <pcg_random.hpp> |
6 | #include <Storages/IStorage.h> |
7 | #include <Storages/MergeTree/MergeTreeData.h> |
8 | #include <Storages/MergeTree/MergeTreeDataMergerMutator.h> |
9 | #include <Storages/MergeTree/MergeTreePartsMover.h> |
10 | #include <Storages/MergeTree/MergeTreeDataWriter.h> |
11 | #include <Storages/MergeTree/MergeTreeDataSelectExecutor.h> |
12 | #include <Storages/MergeTree/ReplicatedMergeTreeLogEntry.h> |
13 | #include <Storages/MergeTree/ReplicatedMergeTreeQueue.h> |
14 | #include <Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h> |
15 | #include <Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h> |
16 | #include <Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h> |
17 | #include <Storages/MergeTree/ReplicatedMergeTreeAlterThread.h> |
18 | #include <Storages/MergeTree/ReplicatedMergeTreeTableMetadata.h> |
19 | #include <Storages/MergeTree/EphemeralLockInZooKeeper.h> |
20 | #include <Storages/MergeTree/BackgroundProcessingPool.h> |
21 | #include <Storages/MergeTree/DataPartsExchange.h> |
22 | #include <Storages/MergeTree/ReplicatedMergeTreeAddress.h> |
23 | #include <DataTypes/DataTypesNumber.h> |
24 | #include <Interpreters/Cluster.h> |
25 | #include <Interpreters/PartLog.h> |
26 | #include <Common/randomSeed.h> |
27 | #include <Common/ZooKeeper/ZooKeeper.h> |
28 | #include <Common/ZooKeeper/LeaderElection.h> |
29 | #include <Core/BackgroundSchedulePool.h> |
30 | #include <Processors/Pipe.h> |
31 | |
32 | |
33 | namespace DB |
34 | { |
35 | |
/** The engine that uses the merge tree (see MergeTreeData) and is replicated through ZooKeeper.
37 | * |
38 | * ZooKeeper is used for the following things: |
 * - the structure of the table (/metadata, /columns);
 * - the action log with data (/log/log-..., /replicas/replica_name/queue/queue-...);
 * - a replica list (/replicas), a replica activity tag (/replicas/replica_name/is_active), and replica addresses (/replicas/replica_name/host);
 * - leader replica election (/leader_election) - the leader replica is the one that assigns merges;
 * - the set of data parts on each replica (/replicas/replica_name/parts);
 * - the list of the last N blocks of data with checksums, for deduplication (/blocks);
 * - the list of incremental block numbers (/block_numbers) that we are about to insert,
 *   to ensure the linear order of data insertion and data merges only on the intervals in this sequence;
 * - write coordination with quorum (/quorum);
 * - storage of mutation entries (ALTER DELETE, ALTER UPDATE, etc.) to execute (/mutations).
49 | * See comments in StorageReplicatedMergeTree::mutate() for details. |
50 | */ |
51 | |
/** The replicated tables have a common log (/log/log-...).
 * The log is a sequence of entries (LogEntry) describing what to do.
54 | * Each entry is one of: |
55 | * - normal data insertion (GET), |
56 | * - merge (MERGE), |
57 | * - delete the partition (DROP). |
58 | * |
59 | * Each replica copies (queueUpdatingTask, pullLogsToQueue) entries from the log to its queue (/replicas/replica_name/queue/queue-...) |
60 | * and then executes them (queueTask). |
 * Despite the name "queue", execution can be reordered if necessary (shouldExecuteLogEntry, executeLogEntry).
 * In addition, entries in the queue can be generated independently (not from the log), in the following cases:
 * - when creating a new replica, entries to GET parts from other replicas are enqueued (createReplica);
 * - if a part is corrupt (removePartAndEnqueueFetch) or absent during a check (at startup - checkParts, while running - searchForMissingPart),
 *   entries to GET the part from other replicas are enqueued.
66 | * |
 * The replica on which the INSERT was performed will also have a GET entry for this data in its queue.
 * Such an entry is considered executed as soon as the queue handler sees it.
69 | * |
 * A log entry has a creation time. This time is generated by the clock of the server that created the entry
 * - the one that received the corresponding INSERT or ALTER query.
 *
 * For entries that a replica enqueued for itself,
 * the creation time of the corresponding part on any of the replicas is used instead.
75 | */ |
76 | |
77 | class StorageReplicatedMergeTree : public ext::shared_ptr_helper<StorageReplicatedMergeTree>, public MergeTreeData |
78 | { |
79 | friend struct ext::shared_ptr_helper<StorageReplicatedMergeTree>; |
80 | public: |
81 | void startup() override; |
82 | void shutdown() override; |
83 | ~StorageReplicatedMergeTree() override; |
84 | |
    std::string getName() const override { return "Replicated" + merging_params.getModeName() + "MergeTree"; }
86 | std::string getTableName() const override { return table_name; } |
87 | std::string getDatabaseName() const override { return database_name; } |
88 | |
89 | bool supportsReplication() const override { return true; } |
90 | bool supportsDeduplication() const override { return true; } |
91 | |
92 | Pipes readWithProcessors( |
93 | const Names & column_names, |
94 | const SelectQueryInfo & query_info, |
95 | const Context & context, |
96 | QueryProcessingStage::Enum processed_stage, |
97 | size_t max_block_size, |
98 | unsigned num_streams) override; |
99 | |
100 | bool supportProcessorsPipeline() const override { return true; } |
101 | |
102 | std::optional<UInt64> totalRows() const override; |
103 | |
104 | BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override; |
105 | |
106 | bool optimize(const ASTPtr & query, const ASTPtr & partition, bool final, bool deduplicate, const Context & query_context) override; |
107 | |
108 | void alter(const AlterCommands & params, const Context & query_context, TableStructureWriteLockHolder & table_lock_holder) override; |
109 | |
110 | void alterPartition(const ASTPtr & query, const PartitionCommands & commands, const Context & query_context) override; |
111 | |
112 | void mutate(const MutationCommands & commands, const Context & context) override; |
113 | std::vector<MergeTreeMutationStatus> getMutationsStatus() const override; |
114 | CancellationCode killMutation(const String & mutation_id) override; |
115 | |
116 | /** Removes a replica from ZooKeeper. If there are no other replicas, it deletes the entire table from ZooKeeper. |
117 | */ |
118 | void drop(TableStructureWriteLockHolder &) override; |
119 | |
120 | void truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) override; |
121 | |
122 | void rename(const String & new_path_to_table_data, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override; |
123 | |
124 | bool supportsIndexForIn() const override { return true; } |
125 | |
126 | void checkTableCanBeDropped() const override; |
127 | |
128 | void checkPartitionCanBeDropped(const ASTPtr & partition) override; |
129 | |
130 | ActionLock getActionLock(StorageActionBlockType action_type) override; |
131 | |
    /// Wait until the replication queue size becomes less than or equal to queue_size.
    /// Returns false if the timeout is exceeded.
134 | bool waitForShrinkingQueueSize(size_t queue_size = 0, UInt64 max_wait_milliseconds = 0); |
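
    /// A hypothetical usage sketch (illustrative only; the caller and the error handling are assumptions):
    ///
    ///     if (!storage.waitForShrinkingQueueSize(/* queue_size = */ 0, /* max_wait_milliseconds = */ 10000))
    ///         /* the queue did not drain in time - report a timeout to the user */;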
135 | |
    /** For the system.replicas table. */
137 | struct Status |
138 | { |
139 | bool is_leader; |
140 | bool can_become_leader; |
141 | bool is_readonly; |
142 | bool is_session_expired; |
143 | ReplicatedMergeTreeQueue::Status queue; |
144 | UInt32 parts_to_check; |
145 | String zookeeper_path; |
146 | String replica_name; |
147 | String replica_path; |
148 | Int32 columns_version; |
149 | UInt64 log_max_index; |
150 | UInt64 log_pointer; |
151 | UInt64 absolute_delay; |
152 | UInt8 total_replicas; |
153 | UInt8 active_replicas; |
154 | }; |
155 | |
    /// Get the status of the table. If with_zk_fields = false, do not fill in the fields that require queries to ZK.
157 | void getStatus(Status & res, bool with_zk_fields = true); |
158 | |
159 | using LogEntriesData = std::vector<ReplicatedMergeTreeLogEntryData>; |
160 | void getQueue(LogEntriesData & res, String & replica_name); |
161 | |
162 | /// Get replica delay relative to current time. |
163 | time_t getAbsoluteDelay() const; |
164 | |
    /// If the absolute delay is greater than min_relative_delay_to_yield_leadership,
    /// will also calculate the difference from the unprocessed time of the best replica.
    /// NOTE: Will communicate with ZooKeeper to calculate the relative delay.
168 | void getReplicaDelays(time_t & out_absolute_delay, time_t & out_relative_delay); |
169 | |
    /// Add a part to the queue of parts whose data is to be checked in the background thread.
171 | void enqueuePartForCheck(const String & part_name, time_t delay_to_check_seconds = 0) |
172 | { |
173 | part_check_thread.enqueuePart(part_name, delay_to_check_seconds); |
174 | } |
175 | |
176 | CheckResults checkData(const ASTPtr & query, const Context & context) override; |
177 | |
    /// Checks the ability to use adaptive granularity.
179 | bool canUseAdaptiveGranularity() const override; |
180 | |
181 | private: |
182 | |
    /// Get a sequentially consistent view of the current parts.
184 | ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock getMaxAddedBlocks() const; |
185 | |
186 | /// Delete old parts from disk and from ZooKeeper. |
187 | void clearOldPartsAndRemoveFromZK(); |
188 | |
189 | friend class ReplicatedMergeTreeBlockOutputStream; |
190 | friend class ReplicatedMergeTreePartCheckThread; |
191 | friend class ReplicatedMergeTreeCleanupThread; |
192 | friend class ReplicatedMergeTreeAlterThread; |
193 | friend class ReplicatedMergeTreeRestartingThread; |
194 | friend struct ReplicatedMergeTreeLogEntry; |
195 | friend class ScopedPartitionMergeLock; |
196 | friend class ReplicatedMergeTreeQueue; |
197 | friend class MergeTreeData; |
198 | |
199 | using LogEntry = ReplicatedMergeTreeLogEntry; |
200 | using LogEntryPtr = LogEntry::Ptr; |
201 | |
202 | zkutil::ZooKeeperPtr current_zookeeper; /// Use only the methods below. |
203 | mutable std::mutex current_zookeeper_mutex; /// To recreate the session in the background thread. |
204 | |
205 | zkutil::ZooKeeperPtr tryGetZooKeeper() const; |
206 | zkutil::ZooKeeperPtr getZooKeeper() const; |
207 | void setZooKeeper(zkutil::ZooKeeperPtr zookeeper); |
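
    /// Expected semantics of the accessors above (a sketch, not authoritative; the definitions live in the .cpp):
    /// tryGetZooKeeper() returns the current session under current_zookeeper_mutex and may return an expired one;
    /// getZooKeeper() additionally throws if there is no session or the session has expired;
    /// setZooKeeper() is used by the restarting thread to install a freshly created session.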
208 | |
    /// If true, the table is offline and cannot be written to.
210 | std::atomic_bool is_readonly {false}; |
211 | |
212 | String zookeeper_path; |
213 | String replica_name; |
214 | String replica_path; |
215 | |
216 | /** /replicas/me/is_active. |
217 | */ |
218 | zkutil::EphemeralNodeHolderPtr replica_is_active_node; |
219 | |
    /** Version of the /columns node in ZooKeeper corresponding to the current data.columns.
      * Read and modified together with data.columns - under TableStructureLock.
222 | */ |
223 | int columns_version = -1; |
224 | |
225 | /// Version of the /metadata node in ZooKeeper. |
226 | int metadata_version = -1; |
227 | |
228 | /// Used to delay setting table structure till startup() in case of an offline ALTER. |
229 | std::function<void()> set_table_structure_at_startup; |
230 | |
    /** Whether this replica is "leading". The leader replica selects the parts to merge.
232 | */ |
233 | std::atomic<bool> is_leader {false}; |
234 | zkutil::LeaderElectionPtr leader_election; |
235 | |
236 | InterserverIOEndpointHolderPtr data_parts_exchange_endpoint_holder; |
237 | |
238 | MergeTreeDataSelectExecutor reader; |
239 | MergeTreeDataWriter writer; |
240 | MergeTreeDataMergerMutator merger_mutator; |
241 | |
    /** The queue of what needs to be done on this replica to catch up with everyone. It is taken from ZooKeeper (/replicas/me/queue/).
      * In ZK, entries are in chronological order. Here, that order is not necessarily kept.
244 | */ |
245 | ReplicatedMergeTreeQueue queue; |
246 | std::atomic<time_t> last_queue_update_start_time{0}; |
247 | std::atomic<time_t> last_queue_update_finish_time{0}; |
248 | |
249 | DataPartsExchange::Fetcher fetcher; |
250 | |
251 | |
    /// Signalled when the replica is initialized and the startup() method can return.
253 | Poco::Event startup_event; |
254 | |
    /// Whether background threads (except restarting_thread) should be terminated.
256 | std::atomic<bool> partial_shutdown_called {false}; |
257 | |
258 | /// Event that is signalled (and is reset) by the restarting_thread when the ZooKeeper session expires. |
259 | Poco::Event partial_shutdown_event {false}; /// Poco::Event::EVENT_MANUALRESET |
260 | |
    /// Limits the number of parallel fetches for one table.
262 | std::atomic_uint current_table_fetches {0}; |
263 | |
264 | /// Threads. |
265 | |
266 | /// A task that keeps track of the updates in the logs of all replicas and loads them into the queue. |
267 | bool queue_update_in_progress = false; |
268 | BackgroundSchedulePool::TaskHolder queue_updating_task; |
269 | |
270 | BackgroundSchedulePool::TaskHolder mutations_updating_task; |
271 | |
272 | /// A task that performs actions from the queue. |
273 | BackgroundProcessingPool::TaskHandle queue_task_handle; |
274 | |
    /// A task that moves parts to other disks/volumes.
    /// Transparent to replication.
277 | BackgroundProcessingPool::TaskHandle move_parts_task_handle; |
278 | |
279 | /// A task that selects parts to merge. |
280 | BackgroundSchedulePool::TaskHolder merge_selecting_task; |
281 | /// It is acquired for each iteration of the selection of parts to merge or each OPTIMIZE query. |
282 | std::mutex merge_selecting_mutex; |
283 | |
284 | /// A task that marks finished mutations as done. |
285 | BackgroundSchedulePool::TaskHolder mutations_finalizing_task; |
286 | |
287 | /// A thread that removes old parts, log entries, and blocks. |
288 | ReplicatedMergeTreeCleanupThread cleanup_thread; |
289 | |
290 | /// A thread monitoring changes to the column list in ZooKeeper and updating the parts in accordance with these changes. |
291 | ReplicatedMergeTreeAlterThread alter_thread; |
292 | |
293 | /// A thread that checks the data of the parts, as well as the queue of the parts to be checked. |
294 | ReplicatedMergeTreePartCheckThread part_check_thread; |
295 | |
296 | /// A thread that processes reconnection to ZooKeeper when the session expires. |
297 | ReplicatedMergeTreeRestartingThread restarting_thread; |
298 | |
    /// An event that awakens the `alter` method from waiting for the completion of the ALTER query.
300 | zkutil::EventPtr alter_query_event = std::make_shared<Poco::Event>(); |
301 | |
    /// True if the replica was created for an existing table with fixed granularity.
303 | bool other_replicas_fixed_granularity = false; |
304 | |
305 | /** Creates the minimum set of nodes in ZooKeeper. |
306 | */ |
307 | void createTableIfNotExists(); |
308 | |
    /** Creates a replica in ZooKeeper and adds to the queue everything needed to catch up with the rest of the replicas.
310 | */ |
311 | void createReplica(); |
312 | |
    /** Creates nodes in ZK that must always exist, but which might not have been created by older server versions.
314 | */ |
315 | void createNewZooKeeperNodes(); |
316 | |
317 | /** Verify that the list of columns and table settings match those specified in ZK (/metadata). |
318 | * If not, throw an exception. |
319 | * Must be called before startup(). |
320 | */ |
321 | void checkTableStructure(bool skip_sanity_checks, bool allow_alter); |
322 | |
323 | /// A part of ALTER: apply metadata changes only (data parts are altered separately). |
324 | /// Must be called under IStorage::lockStructureForAlter() lock. |
325 | void setTableStructure(ColumnsDescription new_columns, const ReplicatedMergeTreeTableMetadata::Diff & metadata_diff); |
326 | |
    /** Check that the set of parts corresponds to that in ZK (/replicas/me/parts/).
      * If any parts described in ZK are absent locally, throw an exception.
      * If any local parts are not mentioned in ZK, remove them.
      * But if there are too many of them, throw an exception just in case - it's probably a configuration error.
331 | */ |
332 | void checkParts(bool skip_sanity_checks); |
333 | |
    /** Check that the part's checksums are the same as the checksums of the same part on some other replica.
      * If no replica has such a part, nothing is checked.
      * Not very reliable: if two replicas add a part almost at the same time, no checks will occur.
      * Adds actions to `ops` that add data about the part into ZooKeeper.
      * Call under TableStructureLock.
339 | */ |
340 | void checkPartChecksumsAndAddCommitOps(const zkutil::ZooKeeperPtr & zookeeper, const DataPartPtr & part, |
        Coordination::Requests & ops, String part_name = "", NameSet * absent_replicas_paths = nullptr);
342 | |
343 | String getChecksumsForZooKeeper(const MergeTreeDataPartChecksums & checksums) const; |
344 | |
    /// Accepts a PreCommitted part, atomically checks its checksums against those on other replicas, and commits the part.
346 | DataPartsVector checkPartChecksumsAndCommit(Transaction & transaction, |
347 | const DataPartPtr & part); |
348 | |
349 | bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override; |
350 | |
    void getCommitPartOps(Coordination::Requests & ops, MutableDataPartPtr & part, const String & block_id_path = "") const;
352 | |
353 | /// Updates info about part columns and checksums in ZooKeeper and commits transaction if successful. |
354 | void updatePartHeaderInZooKeeperAndCommit( |
355 | const zkutil::ZooKeeperPtr & zookeeper, |
356 | AlterDataPartTransaction & transaction); |
357 | |
358 | /// Adds actions to `ops` that remove a part from ZooKeeper. |
359 | /// Set has_children to true for "old-style" parts (those with /columns and /checksums child znodes). |
360 | void removePartFromZooKeeper(const String & part_name, Coordination::Requests & ops, bool has_children); |
361 | |
    /// Quickly removes a big set of parts from ZooKeeper (using async multi queries)
363 | void removePartsFromZooKeeper(zkutil::ZooKeeperPtr & zookeeper, const Strings & part_names, |
364 | NameSet * parts_should_be_retried = nullptr); |
365 | |
366 | bool tryRemovePartsFromZooKeeperWithRetries(const Strings & part_names, size_t max_retries = 5); |
367 | bool tryRemovePartsFromZooKeeperWithRetries(DataPartsVector & parts, size_t max_retries = 5); |
368 | |
369 | /// Removes a part from ZooKeeper and adds a task to the queue to download it. It is supposed to do this with broken parts. |
370 | void removePartAndEnqueueFetch(const String & part_name); |
371 | |
372 | /// Running jobs from the queue. |
373 | |
    /** Execute an action from the queue. Throws an exception if something is wrong.
      * Returns whether it succeeded. If it did not succeed, the entry is written to the end of the queue.
376 | */ |
377 | bool executeLogEntry(LogEntry & entry); |
378 | |
379 | |
380 | void executeDropRange(const LogEntry & entry); |
381 | |
    /// Do the merge, or recommend fetching the merged part instead.
383 | bool tryExecuteMerge(const LogEntry & entry); |
384 | |
385 | bool tryExecutePartMutation(const LogEntry & entry); |
386 | |
387 | |
388 | bool executeFetch(LogEntry & entry); |
389 | |
390 | void executeClearColumnOrIndexInPartition(const LogEntry & entry); |
391 | |
392 | bool executeReplaceRange(const LogEntry & entry); |
393 | |
394 | /** Updates the queue. |
395 | */ |
396 | void queueUpdatingTask(); |
397 | |
398 | void mutationsUpdatingTask(); |
399 | |
    /** Clone data from another replica.
      * If the replica cannot be cloned, throw an exception.
402 | */ |
403 | void cloneReplica(const String & source_replica, Coordination::Stat source_is_lost_stat, zkutil::ZooKeeperPtr & zookeeper); |
404 | |
405 | /// Clone replica if it is lost. |
406 | void cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zookeeper); |
407 | |
408 | /** Performs actions from the queue. |
409 | */ |
410 | BackgroundProcessingPoolTaskResult queueTask(); |
411 | |
    /// Perform moves of parts to other disks.
    /// A local operation that does not interact with the replication queue.
414 | BackgroundProcessingPoolTaskResult movePartsTask(); |
415 | |
416 | |
417 | /// Postcondition: |
418 | /// either leader_election is fully initialized (node in ZK is created and the watching thread is launched) |
419 | /// or an exception is thrown and leader_election is destroyed. |
420 | void enterLeaderElection(); |
421 | |
422 | /// Postcondition: |
    /// is_leader is false, merge_selecting_task is stopped, leader_election is nullptr.
424 | /// leader_election node in ZK is either deleted, or the session is marked expired. |
425 | void exitLeaderElection(); |
426 | |
427 | /** Selects the parts to merge and writes to the log. |
428 | */ |
429 | void mergeSelectingTask(); |
430 | |
431 | /// Checks if some mutations are done and marks them as done. |
432 | void mutationsFinalizingTask(); |
433 | |
    /** Write the selected parts to merge into the log.
      * Call while holding merge_selecting_mutex.
      * Returns false if any part is not in ZK.
437 | */ |
438 | bool createLogEntryToMergeParts( |
439 | zkutil::ZooKeeperPtr & zookeeper, |
440 | const DataPartsVector & parts, |
441 | const String & merged_name, |
442 | bool deduplicate, |
443 | bool force_ttl, |
444 | ReplicatedMergeTreeLogEntryData * out_log_entry = nullptr); |
445 | |
446 | bool createLogEntryToMutatePart(const MergeTreeDataPart & part, Int64 mutation_version); |
447 | |
448 | /// Exchange parts. |
449 | |
    /** Returns an empty string if no replica has the part.
451 | */ |
452 | String findReplicaHavingPart(const String & part_name, bool active); |
453 | |
    /** Find a replica that has the specified part, or any part that covers it.
      * If active = true, consider only active replicas.
      * If found, returns the replica name and sets 'entry->actual_new_part_name' to the name of the largest covering part found.
      * If not found, returns an empty string.
458 | */ |
459 | String findReplicaHavingCoveringPart(LogEntry & entry, bool active); |
460 | String findReplicaHavingCoveringPart(const String & part_name, bool active, String & found_part_name); |
461 | |
462 | /** Download the specified part from the specified replica. |
463 | * If `to_detached`, the part is placed in the `detached` directory. |
464 | * If quorum != 0, then the node for tracking the quorum is updated. |
      * Returns false if the part is already being fetched right now.
466 | */ |
467 | bool fetchPart(const String & part_name, const String & replica_path, bool to_detached, size_t quorum); |
468 | |
469 | /// Required only to avoid races between executeLogEntry and fetchPartition |
470 | std::unordered_set<String> currently_fetching_parts; |
471 | std::mutex currently_fetching_parts_mutex; |
472 | |
473 | |
474 | /// With the quorum being tracked, add a replica to the quorum for the part. |
475 | void updateQuorum(const String & part_name); |
476 | |
    /// Allocates a new block number if a block with the given block_id does not already exist.
478 | std::optional<EphemeralLockInZooKeeper> allocateBlockNumber( |
479 | const String & partition_id, zkutil::ZooKeeperPtr & zookeeper, |
        const String & zookeeper_block_id_path = "");
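
    /// A sketch of the intended use (an assumption; the actual logic lives in the .cpp): on INSERT,
    /// the block's hash is written under zookeeper_block_id_path as part of the block number allocation;
    /// if a node with that block_id already exists, std::nullopt is returned and the duplicate block
    /// is skipped; otherwise an ephemeral lock holding the allocated block number is returned.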
481 | |
    /** Wait until all replicas, including this one, execute the specified action from the log.
      * If replicas are added at the same time, it may not wait for the newly added replica.
      *
      * NOTE: This method must be called without the table lock held.
      * It effectively waits for other threads that usually also have to acquire the lock to proceed, which would result in a deadlock.
      * TODO: There are incorrect usages of this method that are not fixed yet.
488 | */ |
489 | void waitForAllReplicasToProcessLogEntry(const ReplicatedMergeTreeLogEntryData & entry); |
490 | |
491 | /** Wait until the specified replica executes the specified action from the log. |
492 | * NOTE: See comment about locks above. |
493 | */ |
494 | void waitForReplicaToProcessLogEntry(const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry); |
495 | |
    /// Choose the leader replica, send a request to it, and wait.
497 | void sendRequestToLeaderReplica(const ASTPtr & query, const Context & query_context); |
498 | |
499 | /// Throw an exception if the table is readonly. |
500 | void assertNotReadonly() const; |
501 | |
502 | /// Produce an imaginary part info covering all parts in the specified partition (at the call moment). |
503 | /// Returns false if the partition doesn't exist yet. |
504 | bool getFakePartCoveringAllPartsInPartition(const String & partition_id, MergeTreePartInfo & part_info); |
505 | |
    /// Check for a node in ZK. If it exists, remember this and from then on answer true immediately.
507 | std::unordered_set<std::string> existing_nodes_cache; |
508 | std::mutex existing_nodes_cache_mutex; |
509 | bool existsNodeCached(const std::string & path); |
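
    /// A minimal sketch of the caching logic above (an assumption; the actual definition is in the .cpp):
    ///     {
    ///         std::lock_guard lock(existing_nodes_cache_mutex);
    ///         if (existing_nodes_cache.count(path))
    ///             return true;
    ///     }
    ///     bool res = getZooKeeper()->exists(path);
    ///     if (res)
    ///     {
    ///         std::lock_guard lock(existing_nodes_cache_mutex);
    ///         existing_nodes_cache.insert(path);
    ///     }
    ///     return res;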
510 | |
511 | /// Remove block IDs from `blocks/` in ZooKeeper for the given partition ID in the given block number range. |
512 | void clearBlocksInPartition( |
513 | zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num); |
514 | |
515 | /// Info about how other replicas can access this one. |
516 | ReplicatedMergeTreeAddress getReplicatedMergeTreeAddress() const; |
517 | |
518 | bool dropPartsInPartition(zkutil::ZooKeeper & zookeeper, String & partition_id, |
519 | StorageReplicatedMergeTree::LogEntry & entry, bool detach); |
520 | |
521 | /// Find cluster address for host |
522 | std::optional<Cluster::Address> findClusterAddress(const ReplicatedMergeTreeAddress & leader_address) const; |
523 | |
524 | // Partition helpers |
525 | void clearColumnOrIndexInPartition(const ASTPtr & partition, LogEntry && entry, const Context & query_context); |
526 | void dropPartition(const ASTPtr & query, const ASTPtr & partition, bool detach, const Context & query_context); |
527 | void attachPartition(const ASTPtr & partition, bool part, const Context & query_context); |
528 | void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & query_context); |
529 | void fetchPartition(const ASTPtr & partition, const String & from, const Context & query_context); |
530 | |
    /// Check the granularity of an already existing replicated table in ZooKeeper, if it exists.
    /// Returns true if the granularity is fixed.
533 | bool checkFixedGranualrityInZookeeper(); |
534 | |
    /// Wait until the mutation is finished on the given replicas.
536 | void waitMutationToFinishOnReplicas( |
537 | const Strings & replicas, const String & mutation_id) const; |
538 | |
539 | protected: |
540 | /** If not 'attach', either creates a new table in ZK, or adds a replica to an existing table. |
541 | */ |
542 | StorageReplicatedMergeTree( |
543 | const String & zookeeper_path_, |
544 | const String & replica_name_, |
545 | bool attach, |
546 | const String & database_name_, const String & name_, |
547 | const String & relative_data_path_, |
548 | const StorageInMemoryMetadata & metadata, |
549 | Context & context_, |
550 | const String & date_column_name, |
551 | const MergingParams & merging_params_, |
552 | std::unique_ptr<MergeTreeSettings> settings_, |
553 | bool has_force_restore_data_flag); |
554 | }; |
555 | |
556 | |
557 | extern const int MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER; |
558 | |
559 | } |
560 | |