1#pragma once
2
3#include <Common/SimpleIncrement.h>
4#include <Interpreters/Context.h>
5#include <Interpreters/ExpressionActions.h>
6#include <Storages/IStorage.h>
7#include <Storages/MergeTree/MergeTreeIndices.h>
8#include <Storages/MergeTree/MergeTreePartInfo.h>
9#include <Storages/MergeTree/MergeTreeSettings.h>
10#include <Storages/MergeTree/MergeTreeMutationStatus.h>
11#include <Storages/MergeTree/MergeList.h>
12#include <Storages/MergeTree/PartDestinationType.h>
13#include <IO/ReadBufferFromString.h>
14#include <IO/WriteBufferFromFile.h>
15#include <IO/ReadBufferFromFile.h>
16#include <DataTypes/DataTypeString.h>
17#include <DataTypes/DataTypesNumber.h>
18#include <DataStreams/GraphiteRollupSortedBlockInputStream.h>
19#include <Storages/MergeTree/MergeTreeDataPart.h>
20#include <Storages/IndicesDescription.h>
21#include <Storages/MergeTree/MergeTreePartsMover.h>
22#include <Interpreters/PartLog.h>
23#include <Disks/DiskSpaceMonitor.h>
24
25#include <boost/multi_index_container.hpp>
26#include <boost/multi_index/ordered_index.hpp>
27#include <boost/multi_index/global_fun.hpp>
28#include <boost/range/iterator_range_core.hpp>
29
30
31namespace DB
32{
33
/// Forward declarations.
class AlterCommands;
class MergeListEntry;
class MergeTreePartsMover;
37
/// Error codes used by MergeTreeData; they are defined elsewhere, only declared here.
namespace ErrorCodes
{
    extern const int DIRECTORY_ALREADY_EXISTS;
    extern const int DUPLICATE_DATA_PART;
    extern const int INVALID_PARTITION_NAME;
    extern const int LOGICAL_ERROR;
    extern const int NO_SUCH_COLUMN_IN_TABLE;
    extern const int NO_SUCH_DATA_PART;
    extern const int TABLE_DIFFERS_TOO_MUCH;
    extern const int TOO_MANY_UNEXPECTED_DATA_PARTS;
}
49
50
51/// Data structure for *MergeTree engines.
52/// Merge tree is used for incremental sorting of data.
53/// The table consists of several sorted parts.
54/// During insertion new data is sorted according to the primary key and is written to the new part.
55/// Parts are merged in the background according to a heuristic algorithm.
56/// For each part the index file is created containing primary key values for every n-th row.
57/// This allows efficient selection by primary key range predicate.
58///
59/// Additionally:
60///
/// A date column may be specified. For each part, the min and max dates are stored.
/// Essentially, this is an index too.
63///
64/// Data is partitioned by the value of the partitioning expression.
65/// Parts belonging to different partitions are not merged - for the ease of administration (data sync and backup).
66///
67/// File structure of old-style month-partitioned tables (format_version = 0):
68/// Part directory - / min-date _ max-date _ min-id _ max-id _ level /
69/// Inside the part directory:
70/// checksums.txt - contains the list of all files along with their sizes and checksums.
71/// columns.txt - contains the list of all columns and their types.
72/// primary.idx - contains the primary index.
73/// [Column].bin - contains compressed column data.
74/// [Column].mrk - marks, pointing to seek positions allowing to skip n * k rows.
75///
76/// File structure of tables with custom partitioning (format_version >= 1):
/// Part directory - / partition-id _ min-id _ max-id _ level /
78/// Inside the part directory:
79/// The same files as for month-partitioned tables, plus
80/// count.txt - contains total number of rows in this part.
81/// partition.dat - contains the value of the partitioning expression.
82/// minmax_[Column].idx - MinMax indexes (see MergeTreeDataPart::MinMaxIndex class) for the columns required by the partitioning expression.
83///
84/// Several modes are implemented. Modes determine additional actions during merge:
85/// - Ordinary - don't do anything special
/// - Collapsing - collapse pairs of rows with opposite values of sign_column for the same values
///   of the primary key (cf. CollapsingSortedBlockInputStream.h)
88/// - Replacing - for all rows with the same primary key keep only the latest one. Or, if the version
89/// column is set, keep the latest row with the maximal version.
90/// - Summing - sum all numeric columns not contained in the primary key for all rows with the same primary key.
91/// - Aggregating - merge columns containing aggregate function states for all rows with the same primary key.
92/// - Graphite - performs coarsening of historical data for Graphite (a system for quantitative monitoring).
93
94/// The MergeTreeData class contains a list of parts and the data structure parameters.
95/// To read and modify the data use other classes:
96/// - MergeTreeDataSelectExecutor
97/// - MergeTreeDataWriter
98/// - MergeTreeDataMergerMutator
99
100class MergeTreeData : public IStorage
101{
102public:
103 /// Function to call if the part is suspected to contain corrupt data.
104 using BrokenPartCallback = std::function<void (const String &)>;
105 using DataPart = MergeTreeDataPart;
106
107 using MutableDataPartPtr = std::shared_ptr<DataPart>;
108 using MutableDataPartsVector = std::vector<MutableDataPartPtr>;
109 /// After the DataPart is added to the working set, it cannot be changed.
110 using DataPartPtr = std::shared_ptr<const DataPart>;
111
112 using DataPartState = MergeTreeDataPart::State;
113 using DataPartStates = std::initializer_list<DataPartState>;
114 using DataPartStateVector = std::vector<DataPartState>;
115
116 /// Auxiliary structure for index comparison. Keep in mind lifetime of MergeTreePartInfo.
117 struct DataPartStateAndInfo
118 {
119 DataPartState state;
120 const MergeTreePartInfo & info;
121 };
122
123 /// Auxiliary structure for index comparison
124 struct DataPartStateAndPartitionID
125 {
126 DataPartState state;
127 String partition_id;
128 };
129
130 STRONG_TYPEDEF(String, PartitionID)
131
132 struct LessDataPart
133 {
134 using is_transparent = void;
135
136 bool operator()(const DataPartPtr & lhs, const MergeTreePartInfo & rhs) const { return lhs->info < rhs; }
137 bool operator()(const MergeTreePartInfo & lhs, const DataPartPtr & rhs) const { return lhs < rhs->info; }
138 bool operator()(const DataPartPtr & lhs, const DataPartPtr & rhs) const { return lhs->info < rhs->info; }
139 bool operator()(const MergeTreePartInfo & lhs, const PartitionID & rhs) const { return lhs.partition_id < rhs.toUnderType(); }
140 bool operator()(const PartitionID & lhs, const MergeTreePartInfo & rhs) const { return lhs.toUnderType() < rhs.partition_id; }
141 };
142
143 struct LessStateDataPart
144 {
145 using is_transparent = void;
146
147 bool operator() (const DataPartStateAndInfo & lhs, const DataPartStateAndInfo & rhs) const
148 {
149 return std::forward_as_tuple(static_cast<UInt8>(lhs.state), lhs.info)
150 < std::forward_as_tuple(static_cast<UInt8>(rhs.state), rhs.info);
151 }
152
153 bool operator() (DataPartStateAndInfo info, const DataPartState & state) const
154 {
155 return static_cast<size_t>(info.state) < static_cast<size_t>(state);
156 }
157
158 bool operator() (const DataPartState & state, DataPartStateAndInfo info) const
159 {
160 return static_cast<size_t>(state) < static_cast<size_t>(info.state);
161 }
162
163 bool operator() (const DataPartStateAndInfo & lhs, const DataPartStateAndPartitionID & rhs) const
164 {
165 return std::forward_as_tuple(static_cast<UInt8>(lhs.state), lhs.info.partition_id)
166 < std::forward_as_tuple(static_cast<UInt8>(rhs.state), rhs.partition_id);
167 }
168
169 bool operator() (const DataPartStateAndPartitionID & lhs, const DataPartStateAndInfo & rhs) const
170 {
171 return std::forward_as_tuple(static_cast<UInt8>(lhs.state), lhs.partition_id)
172 < std::forward_as_tuple(static_cast<UInt8>(rhs.state), rhs.info.partition_id);
173 }
174 };
175
176 using DataParts = std::set<DataPartPtr, LessDataPart>;
177 using DataPartsVector = std::vector<DataPartPtr>;
178
179 using DataPartsLock = std::unique_lock<std::mutex>;
180 DataPartsLock lockParts() const { return DataPartsLock(data_parts_mutex); }
181
182 /// Auxiliary object to add a set of parts into the working set in two steps:
183 /// * First, as PreCommitted parts (the parts are ready, but not yet in the active set).
184 /// * Next, if commit() is called, the parts are added to the active set and the parts that are
185 /// covered by them are marked Outdated.
186 /// If neither commit() nor rollback() was called, the destructor rollbacks the operation.
187 class Transaction : private boost::noncopyable
188 {
189 public:
190 Transaction(MergeTreeData & data_) : data(data_) {}
191
192 DataPartsVector commit(MergeTreeData::DataPartsLock * acquired_parts_lock = nullptr);
193
194 void rollback();
195
196 size_t size() const { return precommitted_parts.size(); }
197 bool isEmpty() const { return precommitted_parts.empty(); }
198
199 ~Transaction()
200 {
201 try
202 {
203 rollback();
204 }
205 catch (...)
206 {
207 tryLogCurrentException("~MergeTreeData::Transaction");
208 }
209 }
210
211 private:
212 friend class MergeTreeData;
213
214 MergeTreeData & data;
215 DataParts precommitted_parts;
216
217 void clear() { precommitted_parts.clear(); }
218 };
219
220 /// An object that stores the names of temporary files created in the part directory during ALTER of its
221 /// columns.
222 class AlterDataPartTransaction : private boost::noncopyable
223 {
224 public:
225 /// Renames temporary files, finishing the ALTER of the part.
226 void commit();
227
228 /// If commit() was not called, deletes temporary files, canceling the ALTER.
229 ~AlterDataPartTransaction();
230
231 const String & getPartName() const { return data_part->name; }
232
233 /// Review the changes before the commit.
234 const NamesAndTypesList & getNewColumns() const { return new_columns; }
235 const DataPart::Checksums & getNewChecksums() const { return new_checksums; }
236
237 AlterDataPartTransaction(DataPartPtr data_part_) : data_part(data_part_) {}
238 const DataPartPtr & getDataPart() const { return data_part; }
239 bool isValid() const;
240
241 private:
242 friend class MergeTreeData;
243 void clear();
244
245 bool valid = true;
246
247 DataPartPtr data_part;
248
249 DataPart::Checksums new_checksums;
250 NamesAndTypesList new_columns;
251 /// If the value is an empty string, the file is not temporary, and it must be deleted.
252 NameToNameMap rename_map;
253 };
254
255 using AlterDataPartTransactionPtr = std::unique_ptr<AlterDataPartTransaction>;
256
257 struct PartsTemporaryRename : private boost::noncopyable
258 {
259 PartsTemporaryRename(
260 const MergeTreeData & storage_,
261 const String & source_dir_)
262 : storage(storage_)
263 , source_dir(source_dir_)
264 {
265 }
266
267 void addPart(const String & old_name, const String & new_name);
268
269 /// Renames part from old_name to new_name
270 void tryRenameAll();
271
272 /// Renames all added parts from new_name to old_name if old name is not empty
273 ~PartsTemporaryRename();
274
275 const MergeTreeData & storage;
276 const String source_dir;
277 std::vector<std::pair<String, String>> old_and_new_names;
278 std::unordered_map<String, String> old_part_name_to_full_path;
279 bool renamed = false;
280 };
281
282 /// Parameters for various modes.
283 struct MergingParams
284 {
285 /// Merging mode. See above.
286 enum Mode
287 {
288 Ordinary = 0, /// Enum values are saved. Do not change them.
289 Collapsing = 1,
290 Summing = 2,
291 Aggregating = 3,
292 Replacing = 5,
293 Graphite = 6,
294 VersionedCollapsing = 7,
295 };
296
297 Mode mode;
298
299 /// For Collapsing and VersionedCollapsing mode.
300 String sign_column;
301
302 /// For Summing mode. If empty - columns_to_sum is determined automatically.
303 Names columns_to_sum;
304
305 /// For Replacing and VersionedCollapsing mode. Can be empty for Replacing.
306 String version_column;
307
308 /// For Graphite mode.
309 Graphite::Params graphite_params;
310
311 /// Check that needed columns are present and have correct types.
312 void check(const NamesAndTypesList & columns) const;
313
314 String getModeName() const;
315 };
316
317 /// Attach the table corresponding to the directory in full_path inside policy (must end with /), with the given columns.
318 /// Correctness of names and paths is not checked.
319 ///
320 /// date_column_name - if not empty, the name of the Date column used for partitioning by month.
321 /// Otherwise, partition_by_ast is used for partitioning.
322 ///
323 /// order_by_ast - a single expression or a tuple. It is used as a sorting key
324 /// (an ASTExpressionList used for sorting data in parts);
325 /// primary_key_ast - can be nullptr, an expression, or a tuple.
326 /// Used to determine an ASTExpressionList values of which are written in the primary.idx file
327 /// for one row in every `index_granularity` rows to speed up range queries.
328 /// Primary key must be a prefix of the sorting key;
329 /// If it is nullptr, then it will be determined from order_by_ast.
330 ///
331 /// require_part_metadata - should checksums.txt and columns.txt exist in the part directory.
332 /// attach - whether the existing table is attached or the new table is created.
333 MergeTreeData(const String & database_, const String & table_,
334 const String & relative_data_path_,
335 const StorageInMemoryMetadata & metadata,
336 Context & context_,
337 const String & date_column_name,
338 const MergingParams & merging_params_,
339 std::unique_ptr<MergeTreeSettings> settings_,
340 bool require_part_metadata_,
341 bool attach,
342 BrokenPartCallback broken_part_callback_ = [](const String &){});
343
344
345 StorageInMemoryMetadata getInMemoryMetadata() const override;
346 ASTPtr getPartitionKeyAST() const override { return partition_by_ast; }
347 ASTPtr getSortingKeyAST() const override { return sorting_key_expr_ast; }
348 ASTPtr getPrimaryKeyAST() const override { return primary_key_expr_ast; }
349 ASTPtr getSamplingKeyAST() const override { return sample_by_ast; }
350
351 Names getColumnsRequiredForPartitionKey() const override { return (partition_key_expr ? partition_key_expr->getRequiredColumns() : Names{}); }
352 Names getColumnsRequiredForSortingKey() const override { return sorting_key_expr->getRequiredColumns(); }
353 Names getColumnsRequiredForPrimaryKey() const override { return primary_key_expr->getRequiredColumns(); }
354 Names getColumnsRequiredForSampling() const override { return columns_required_for_sampling; }
355 Names getColumnsRequiredForFinal() const override { return sorting_key_expr->getRequiredColumns(); }
356 Names getSortingKeyColumns() const override { return sorting_key_columns; }
357
358 StoragePolicyPtr getStoragePolicy() const override { return storage_policy; }
359
360 bool supportsPrewhere() const override { return true; }
361 bool supportsSampling() const override { return sample_by_ast != nullptr; }
362
363 bool supportsFinal() const override
364 {
365 return merging_params.mode == MergingParams::Collapsing
366 || merging_params.mode == MergingParams::Summing
367 || merging_params.mode == MergingParams::Aggregating
368 || merging_params.mode == MergingParams::Replacing
369 || merging_params.mode == MergingParams::VersionedCollapsing;
370 }
371
372 bool supportsSettings() const override { return true; }
373
374 bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context &) const override;
375
376 NameAndTypePair getColumn(const String & column_name) const override
377 {
378 if (column_name == "_part")
379 return NameAndTypePair("_part", std::make_shared<DataTypeString>());
380 if (column_name == "_part_index")
381 return NameAndTypePair("_part_index", std::make_shared<DataTypeUInt64>());
382 if (column_name == "_partition_id")
383 return NameAndTypePair("_partition_id", std::make_shared<DataTypeString>());
384 if (column_name == "_sample_factor")
385 return NameAndTypePair("_sample_factor", std::make_shared<DataTypeFloat64>());
386
387 return getColumns().getPhysical(column_name);
388 }
389
390 bool hasColumn(const String & column_name) const override
391 {
392 return getColumns().hasPhysical(column_name)
393 || column_name == "_part"
394 || column_name == "_part_index"
395 || column_name == "_partition_id"
396 || column_name == "_sample_factor";
397 }
398
399 String getDatabaseName() const override { return database_name; }
400 String getTableName() const override { return table_name; }
401
402 /// Load the set of data parts from disk. Call once - immediately after the object is created.
403 void loadDataParts(bool skip_sanity_checks);
404
405 String getLogName() const { return log_name; }
406
407 Int64 getMaxBlockNumber() const;
408
409 /// Returns a copy of the list so that the caller shouldn't worry about locks.
410 DataParts getDataParts(const DataPartStates & affordable_states) const;
411 /// Returns sorted list of the parts with specified states
412 /// out_states will contain snapshot of each part state
413 DataPartsVector getDataPartsVector(const DataPartStates & affordable_states, DataPartStateVector * out_states = nullptr) const;
414
415 /// Returns absolutely all parts (and snapshot of their states)
416 DataPartsVector getAllDataPartsVector(DataPartStateVector * out_states = nullptr) const;
417
418 /// Returns all detached parts
419 DetachedPartsInfo getDetachedParts() const;
420
421 void validateDetachedPartName(const String & name) const;
422
423 void dropDetached(const ASTPtr & partition, bool part, const Context & context);
424
425 MutableDataPartsVector tryLoadPartsToAttach(const ASTPtr & partition, bool attach_part,
426 const Context & context, PartsTemporaryRename & renamed_parts);
427
428 /// Returns Committed parts
429 DataParts getDataParts() const;
430 DataPartsVector getDataPartsVector() const;
431
432 /// Returns a committed part with the given name or a part containing it. If there is no such part, returns nullptr.
433 DataPartPtr getActiveContainingPart(const String & part_name) const;
434 DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info) const;
435 DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info, DataPartState state, DataPartsLock & lock) const;
436
437 /// Swap part with it's identical copy (possible with another path on another disk).
438 /// If original part is not active or doesn't exist exception will be thrown.
439 void swapActivePart(MergeTreeData::DataPartPtr part_copy);
440
441 /// Returns all parts in specified partition
442 DataPartsVector getDataPartsVectorInPartition(DataPartState state, const String & partition_id);
443
444 /// Returns the part with the given name and state or nullptr if no such part.
445 DataPartPtr getPartIfExists(const String & part_name, const DataPartStates & valid_states);
446 DataPartPtr getPartIfExists(const MergeTreePartInfo & part_info, const DataPartStates & valid_states);
447
448 /// Total size of active parts in bytes.
449 size_t getTotalActiveSizeInBytes() const;
450
451 size_t getTotalActiveSizeInRows() const;
452
453 size_t getPartsCount() const;
454 size_t getMaxPartsCountForPartition() const;
455
456 /// Get min value of part->info.getDataVersion() for all active parts.
457 /// Makes sense only for ordinary MergeTree engines because for them block numbering doesn't depend on partition.
458 std::optional<Int64> getMinPartDataVersion() const;
459
460 /// If the table contains too many active parts, sleep for a while to give them time to merge.
461 /// If until is non-null, wake up from the sleep earlier if the event happened.
462 void delayInsertOrThrowIfNeeded(Poco::Event * until = nullptr) const;
463 void throwInsertIfNeeded() const;
464
465 /// Renames temporary part to a permanent part and adds it to the parts set.
466 /// It is assumed that the part does not intersect with existing parts.
467 /// If increment != nullptr, part index is determing using increment. Otherwise part index remains unchanged.
468 /// If out_transaction != nullptr, adds the part in the PreCommitted state (the part will be added to the
469 /// active set later with out_transaction->commit()).
470 /// Else, commits the part immediately.
471 void renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr);
472
473 /// The same as renameTempPartAndAdd but the block range of the part can contain existing parts.
474 /// Returns all parts covered by the added part (in ascending order).
475 /// If out_transaction == nullptr, marks covered parts as Outdated.
476 DataPartsVector renameTempPartAndReplace(
477 MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr);
478
479 /// Low-level version of previous one, doesn't lock mutex
480 void renameTempPartAndReplace(
481 MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction, DataPartsLock & lock,
482 DataPartsVector * out_covered_parts = nullptr);
483
484 /// Removes parts from the working set parts.
485 /// Parts in add must already be in data_parts with PreCommitted, Committed, or Outdated states.
486 /// If clear_without_timeout is true, the parts will be deleted at once, or during the next call to
487 /// clearOldParts (ignoring old_parts_lifetime).
488 void removePartsFromWorkingSet(const DataPartsVector & remove, bool clear_without_timeout, DataPartsLock * acquired_lock = nullptr);
489 void removePartsFromWorkingSet(const DataPartsVector & remove, bool clear_without_timeout, DataPartsLock & acquired_lock);
490
491 /// Removes all parts from the working set parts
492 /// for which (partition_id = drop_range.partition_id && min_block >= drop_range.min_block && max_block <= drop_range.max_block).
493 /// If a part intersecting drop_range.max_block is found, an exception will be thrown.
494 /// Used in REPLACE PARTITION command;
495 DataPartsVector removePartsInRangeFromWorkingSet(const MergeTreePartInfo & drop_range, bool clear_without_timeout,
496 bool skip_intersecting_parts, DataPartsLock & lock);
497
498 /// Renames the part to detached/<prefix>_<part> and removes it from working set.
499 void removePartsFromWorkingSetAndCloneToDetached(const DataPartsVector & parts, bool clear_without_timeout, const String & prefix = "");
500
501 /// Renames the part to detached/<prefix>_<part> and removes it from data_parts,
502 //// so it will not be deleted in clearOldParts.
503 /// If restore_covered is true, adds to the working set inactive parts, which were merged into the deleted part.
504 void forgetPartAndMoveToDetached(const DataPartPtr & part, const String & prefix = "", bool restore_covered = false);
505
506 /// If the part is Obsolete and not used by anybody else, immediately delete it from filesystem and remove from memory.
507 void tryRemovePartImmediately(DataPartPtr && part);
508
509 /// Returns old inactive parts that can be deleted. At the same time removes them from the list of parts
510 /// but not from the disk.
511 DataPartsVector grabOldParts();
512
513 /// Reverts the changes made by grabOldParts(), parts should be in Deleting state.
514 void rollbackDeletingParts(const DataPartsVector & parts);
515
516 /// Removes parts from data_parts, they should be in Deleting state
517 void removePartsFinally(const DataPartsVector & parts);
518
519 /// Delete irrelevant parts from memory and disk.
520 void clearOldPartsFromFilesystem();
521 void clearPartsFromFilesystem(const DataPartsVector & parts);
522
523 /// Delete all directories which names begin with "tmp"
524 /// Set non-negative parameter value to override MergeTreeSettings temporary_directories_lifetime
525 /// Must be called with locked lockStructureForShare().
526 void clearOldTemporaryDirectories(ssize_t custom_directories_lifetime_seconds = -1);
527
528 /// After the call to dropAllData() no method can be called.
529 /// Deletes the data directory and flushes the uncompressed blocks cache and the marks cache.
530 void dropAllData();
531
532 /// Moves the entire data directory.
533 /// Flushes the uncompressed blocks cache and the marks cache.
534 /// Must be called with locked lockStructureForAlter().
535 void rename(const String & new_path_to_table_data, const String & new_database_name,
536 const String & new_table_name, TableStructureWriteLockHolder &) override;
537
538 /// Check if the ALTER can be performed:
539 /// - all needed columns are present.
540 /// - all type conversions can be done.
541 /// - columns corresponding to primary key, indices, sign, sampling expression and date are not affected.
542 /// If something is wrong, throws an exception.
543 void checkAlterIsPossible(const AlterCommands & commands, const Settings & settings) override;
544
545 /// Performs ALTER of the data part, writes the result to temporary files.
546 /// Returns an object allowing to rename temporary files to permanent files.
547 /// If the number of affected columns is suspiciously high and skip_sanity_checks is false, throws an exception.
548 /// If no data transformations are necessary, returns nullptr.
549 void alterDataPart(
550 const NamesAndTypesList & new_columns,
551 const IndicesASTs & new_indices,
552 bool skip_sanity_checks,
553 AlterDataPartTransactionPtr& transaction);
554
555 /// Change MergeTreeSettings
556 void changeSettings(
557 const ASTPtr & new_changes,
558 TableStructureWriteLockHolder & table_lock_holder);
559
560 /// Remove columns, that have been marked as empty after zeroing values with expired ttl
561 void removeEmptyColumnsFromPart(MergeTreeData::MutableDataPartPtr & data_part);
562
563 /// Freezes all parts.
564 void freezeAll(const String & with_name, const Context & context, TableStructureReadLockHolder & table_lock_holder);
565
566 /// Should be called if part data is suspected to be corrupted.
567 void reportBrokenPart(const String & name) const
568 {
569 broken_part_callback(name);
570 }
571
572 /** Get the key expression AST as an ASTExpressionList.
573 * It can be specified in the tuple: (CounterID, Date),
574 * or as one column: CounterID.
575 */
576 static ASTPtr extractKeyExpressionList(const ASTPtr & node);
577
578 bool hasSortingKey() const { return !sorting_key_columns.empty(); }
579 bool hasPrimaryKey() const { return !primary_key_columns.empty(); }
580 bool hasSkipIndices() const { return !skip_indices.empty(); }
581 bool hasTableTTL() const { return ttl_table_ast != nullptr; }
582 bool hasAnyColumnTTL() const { return !column_ttl_entries_by_name.empty(); }
583
584 /// Check that the part is not broken and calculate the checksums for it if they are not present.
585 MutableDataPartPtr loadPartAndFixMetadata(const DiskPtr & disk, const String & relative_path);
586 void loadPartAndFixMetadata(MutableDataPartPtr part);
587
588 /** Create local backup (snapshot) for parts with specified prefix.
589 * Backup is created in directory clickhouse_dir/shadow/i/, where i - incremental number,
590 * or if 'with_name' is specified - backup is created in directory with specified name.
591 */
592 void freezePartition(const ASTPtr & partition, const String & with_name, const Context & context, TableStructureReadLockHolder & table_lock_holder);
593
594
595public:
596 /// Moves partition to specified Disk
597 void movePartitionToDisk(const ASTPtr & partition, const String & name, bool moving_part, const Context & context);
598
599 /// Moves partition to specified Volume
600 void movePartitionToVolume(const ASTPtr & partition, const String & name, bool moving_part, const Context & context);
601
602 size_t getColumnCompressedSize(const std::string & name) const
603 {
604 auto lock = lockParts();
605 const auto it = column_sizes.find(name);
606 return it == std::end(column_sizes) ? 0 : it->second.data_compressed;
607 }
608
609 ColumnSizeByName getColumnSizes() const override
610 {
611 auto lock = lockParts();
612 return column_sizes;
613 }
614
615 /// Calculates column sizes in compressed form for the current state of data_parts.
616 void recalculateColumnSizes()
617 {
618 auto lock = lockParts();
619 calculateColumnSizesImpl();
620 }
621
622 /// For ATTACH/DETACH/DROP PARTITION.
623 String getPartitionIDFromQuery(const ASTPtr & partition, const Context & context);
624
625 /// Extracts MergeTreeData of other *MergeTree* storage
626 /// and checks that their structure suitable for ALTER TABLE ATTACH PARTITION FROM
627 /// Tables structure should be locked.
628 MergeTreeData & checkStructureAndGetMergeTreeData(const StoragePtr & source_table) const;
629
630 MergeTreeData::MutableDataPartPtr cloneAndLoadDataPartOnSameDisk(
631 const MergeTreeData::DataPartPtr & src_part, const String & tmp_part_prefix, const MergeTreePartInfo & dst_part_info);
632
633 virtual std::vector<MergeTreeMutationStatus> getMutationsStatus() const = 0;
634
635 /// Returns true if table can create new parts with adaptive granularity
636 /// Has additional constraint in replicated version
637 virtual bool canUseAdaptiveGranularity() const
638 {
639 const auto settings = getSettings();
640 return settings->index_granularity_bytes != 0 &&
641 (settings->enable_mixed_granularity_parts || !has_non_adaptive_index_granularity_parts);
642 }
643
644 /// Get constant pointer to storage settings.
645 /// Copy this pointer into your scope and you will
646 /// get consistent settings.
647 MergeTreeSettingsPtr getSettings() const
648 {
649 return storage_settings.get();
650 }
651
652 /// Get table path on disk
653 String getFullPathOnDisk(const DiskPtr & disk) const;
654
655 /// Get disk for part. Looping through directories on FS because some parts maybe not in
656 /// active dataparts set (detached)
657 DiskPtr getDiskForPart(const String & part_name, const String & relative_path = "") const;
658
659 /// Get full path for part. Uses getDiskForPart and returns the full path
660 String getFullPathForPart(const String & part_name, const String & relative_path = "") const;
661
662 Strings getDataPaths() const override;
663
664 using PathWithDisk = std::pair<String, DiskPtr>;
665 using PathsWithDisks = std::vector<PathWithDisk>;
666 PathsWithDisks getDataPathsWithDisks() const;
667
668 /// Reserves space at least 1MB.
669 ReservationPtr reserveSpace(UInt64 expected_size) const;
670
671 /// Reserves space at least 1MB on specific disk or volume.
672 ReservationPtr reserveSpace(UInt64 expected_size, SpacePtr space) const;
673 ReservationPtr tryReserveSpace(UInt64 expected_size, SpacePtr space) const;
674
675 /// Reserves space at least 1MB preferring best destination according to `ttl_infos`.
676 ReservationPtr reserveSpacePreferringTTLRules(UInt64 expected_size,
677 const MergeTreeDataPart::TTLInfos & ttl_infos,
678 time_t time_of_move) const;
679 ReservationPtr tryReserveSpacePreferringTTLRules(UInt64 expected_size,
680 const MergeTreeDataPart::TTLInfos & ttl_infos,
681 time_t time_of_move) const;
682 /// Choose disk with max available free space
683 /// Reserves 0 bytes
684 ReservationPtr makeEmptyReservationOnLargestDisk() { return storage_policy->makeEmptyReservationOnLargestDisk(); }
685
    /// Data format version (see the file structure description before the class: 0 vs. >= 1).
    MergeTreeDataFormatVersion format_version;

    /// NOTE(review): presumably initialized from the constructor's `context_`; confirm.
    Context & global_context;

    /// Merging params - what additional actions to perform during merge.
    const MergingParams merging_params;

    /// Partition key. NOTE(review): is_custom_partitioned presumably distinguishes PARTITION BY
    /// from old-style month partitioning by date_column_name; confirm.
    bool is_custom_partitioned = false;
    /// May be null; getColumnsRequiredForPartitionKey() handles that case.
    ExpressionActionsPtr partition_key_expr;
    Block partition_key_sample;

    /// MinMax index over the columns required by the partition key (see the minmax_[Column].idx files).
    ExpressionActionsPtr minmax_idx_expr;
    Names minmax_idx_columns;
    DataTypes minmax_idx_column_types;
    /// Positions inside minmax_idx_columns; -1 presumably means "no such column".
    Int64 minmax_idx_date_column_pos = -1; /// In a common case minmax index includes a date column.
    Int64 minmax_idx_time_column_pos = -1; /// In other cases, minmax index often includes a dateTime column.

    /// Secondary (data skipping) indices for MergeTree
    MergeTreeIndices skip_indices;

    /// Combined expressions: primary key + skip indices, and sorting key + skip indices.
    ExpressionActionsPtr primary_key_and_skip_indices_expr;
    ExpressionActionsPtr sorting_key_and_skip_indices_expr;

    /// Names of columns for primary key + secondary sorting columns.
    /// An empty list means there is no sorting key (see hasSortingKey()).
    Names sorting_key_columns;
    ASTPtr sorting_key_expr_ast;
    ExpressionActionsPtr sorting_key_expr;

    /// Names of columns for primary key.
    /// An empty list means there is no primary key (see hasPrimaryKey()).
    Names primary_key_columns;
    ASTPtr primary_key_expr_ast;
    ExpressionActionsPtr primary_key_expr;
    Block primary_key_sample;
    DataTypes primary_key_data_types;
720
    /// One TTL rule: the compiled expression, the name of the column holding its result,
    /// and (for table-level move rules) where matching parts should go.
    struct TTLEntry
    {
        ExpressionActionsPtr expression;
        String result_column;

        /// Name and type of a destination are only valid in table-level context.
        PartDestinationType destination_type;
        String destination_name;

        /// AST this entry was built from.
        ASTPtr entry_ast;

        /// Returns destination disk or volume for this rule.
        SpacePtr getDestination(const StoragePolicyPtr & policy) const;

        /// Checks if the given part already belongs to the destination disk or volume of this rule.
        bool isPartInDestination(const StoragePolicyPtr & policy, const MergeTreeDataPart & part) const;
    };
738
    /// Picks the TTL rule that applies to a part with `ttl_infos` at `time_of_move`.
    /// Returns a raw pointer; NOTE(review): presumably nullptr when no rule matches — confirm in implementation.
    const TTLEntry * selectTTLEntryForTTLInfos(const MergeTreeDataPart::TTLInfos & ttl_infos, time_t time_of_move) const;

    /// Column-level TTL rules keyed by column name.
    using TTLEntriesByName = std::unordered_map<String, TTLEntry>;
    TTLEntriesByName column_ttl_entries_by_name;

    /// Table-level TTL rule and the list of table-level move rules.
    TTLEntry ttl_table_entry;
    std::vector<TTLEntry> move_ttl_entries;

    /// Sampling expression result column and the columns needed to compute it.
    String sampling_expr_column_name;
    Names columns_required_for_sampling;

    /// Limiting parallel sends per one table, used in DataPartsExchange
    std::atomic_uint current_table_sends {0};

    /// For generating names of temporary parts during insertion.
    SimpleIncrement insert_increment;

    bool has_non_adaptive_index_granularity_parts = false;
757
    /// Parts that are currently being moved from one disk/volume to another.
    /// This set must only be accessed while holding `moving_parts_mutex`.
    /// Moving may conflict with merges and mutations, but this is OK, because
    /// if we decide to move some part to another disk, then we
    /// assuredly will choose this disk for containing the part which will appear
    /// as the result of the merge or mutation.
    DataParts currently_moving_parts;

    /// Mutex for currently_moving_parts
    mutable std::mutex moving_parts_mutex;
768
protected:

    friend struct MergeTreeDataPart;
    friend class MergeTreeDataMergerMutator;
    friend class ReplicatedMergeTreeAlterThread;
    friend struct ReplicatedMergeTreeTableMetadata;
    friend class StorageReplicatedMergeTree;

    /// ASTs of the table definition clauses (PARTITION BY, ORDER BY, PRIMARY KEY, SAMPLE BY, TTL, SETTINGS).
    ASTPtr partition_by_ast;
    ASTPtr order_by_ast;
    ASTPtr primary_key_ast;
    ASTPtr sample_by_ast;
    ASTPtr ttl_table_ast;
    ASTPtr settings_ast;

    bool require_part_metadata;

    /// Table identity and its data path relative to the storage root (assumption from the name — confirm).
    String database_name;
    String table_name;
    String relative_data_path;


    /// Current column sizes in compressed and uncompressed form.
    ColumnSizeByName column_sizes;

    /// Engine-specific methods
    BrokenPartCallback broken_part_callback;

    String log_name;
    Logger * log;

    /// Storage settings.
    /// Use get and set to receive readonly versions.
    MultiVersion<MergeTreeSettings> storage_settings;

    StoragePolicyPtr storage_policy;
805
806 /// Work with data parts
807
808 struct TagByInfo{};
809 struct TagByStateAndInfo{};
810
811 static const MergeTreePartInfo & dataPartPtrToInfo(const DataPartPtr & part)
812 {
813 return part->info;
814 }
815
816 static DataPartStateAndInfo dataPartPtrToStateAndInfo(const DataPartPtr & part)
817 {
818 return {part->state, part->info};
819 }
820
    /// Container of all data parts with two ordered unique views over the same elements:
    /// one keyed by MergeTreePartInfo, one keyed by (State, Info).
    using DataPartsIndexes = boost::multi_index_container<DataPartPtr,
        boost::multi_index::indexed_by<
            /// Index by Info
            boost::multi_index::ordered_unique<
                boost::multi_index::tag<TagByInfo>,
                boost::multi_index::global_fun<const DataPartPtr &, const MergeTreePartInfo &, dataPartPtrToInfo>
            >,
            /// Index by (State, Info), is used to obtain ordered slices of parts with the same state
            boost::multi_index::ordered_unique<
                boost::multi_index::tag<TagByStateAndInfo>,
                boost::multi_index::global_fun<const DataPartPtr &, DataPartStateAndInfo, dataPartPtrToStateAndInfo>,
                LessStateDataPart
            >
        >
    >;
836
    /// Current set of data parts.
    mutable std::mutex data_parts_mutex;
    DataPartsIndexes data_parts_indexes;
    /// Convenience references to the two views of data_parts_indexes.
    DataPartsIndexes::index<TagByInfo>::type & data_parts_by_info;
    DataPartsIndexes::index<TagByStateAndInfo>::type & data_parts_by_state_and_info;

    MergeTreePartsMover parts_mover;

    /// Iterator types of the two views.
    using DataPartIteratorByInfo = DataPartsIndexes::index<TagByInfo>::type::iterator;
    using DataPartIteratorByStateAndInfo = DataPartsIndexes::index<TagByStateAndInfo>::type::iterator;
847
848 boost::iterator_range<DataPartIteratorByStateAndInfo> getDataPartsStateRange(DataPartState state) const
849 {
850 auto begin = data_parts_by_state_and_info.lower_bound(state, LessStateDataPart());
851 auto end = data_parts_by_state_and_info.upper_bound(state, LessStateDataPart());
852 return {begin, end};
853 }
854
855 boost::iterator_range<DataPartIteratorByInfo> getDataPartsPartitionRange(const String & partition_id) const
856 {
857 auto begin = data_parts_by_info.lower_bound(PartitionID(partition_id), LessDataPart());
858 auto end = data_parts_by_info.upper_bound(PartitionID(partition_id), LessDataPart());
859 return {begin, end};
860 }
861
862 static decltype(auto) getStateModifier(DataPartState state)
863 {
864 return [state] (const DataPartPtr & part) { part->state = state; };
865 }
866
867 void modifyPartState(DataPartIteratorByStateAndInfo it, DataPartState state)
868 {
869 if (!data_parts_by_state_and_info.modify(it, getStateModifier(state)))
870 throw Exception("Can't modify " + (*it)->getNameWithState(), ErrorCodes::LOGICAL_ERROR);
871 }
872
873 void modifyPartState(DataPartIteratorByInfo it, DataPartState state)
874 {
875 if (!data_parts_by_state_and_info.modify(data_parts_indexes.project<TagByStateAndInfo>(it), getStateModifier(state)))
876 throw Exception("Can't modify " + (*it)->getNameWithState(), ErrorCodes::LOGICAL_ERROR);
877 }
878
879 void modifyPartState(const DataPartPtr & part, DataPartState state)
880 {
881 auto it = data_parts_by_info.find(part->info);
882 if (it == data_parts_by_info.end() || (*it).get() != part.get())
883 throw Exception("Part " + part->name + " doesn't exist", ErrorCodes::LOGICAL_ERROR);
884
885 if (!data_parts_by_state_and_info.modify(data_parts_indexes.project<TagByStateAndInfo>(it), getStateModifier(state)))
886 throw Exception("Can't modify " + (*it)->getNameWithState(), ErrorCodes::LOGICAL_ERROR);
887 }
888
    /// Used to serialize calls to grabOldParts.
    std::mutex grab_old_parts_mutex;
    /// The same for clearOldTemporaryDirectories.
    std::mutex clear_old_temporary_directories_mutex;
    /// NOTE(review): an orphaned comment "Mutex for settings usage" stood here, but no mutex
    /// is declared after it (storage_settings is a MultiVersion). Confirm and drop this note.
894
    /// Applies key/index properties from `metadata`.
    /// With only_check = true, presumably only validates without changing members — confirm in implementation.
    void setProperties(const StorageInMemoryMetadata & metadata, bool only_check = false);

    void initPartitionKey();

    /// Applies column-level TTLs and the table-level TTL AST; `only_check` as for setProperties (assumption).
    void setTTLExpressions(const ColumnsDescription::ColumnTTLs & new_column_ttls,
        const ASTPtr & new_ttl_table_ast, bool only_check = false);

    /// Expression for column type conversion.
    /// If no conversions are needed, out_expression=nullptr.
    /// out_rename_map maps column files for the out_expression onto new table files.
    /// out_force_update_metadata denotes if metadata must be changed even if out_rename_map is empty (used
    /// for transformation-free changing of Enum values list).
    /// Files to be deleted are mapped to an empty string in out_rename_map.
    /// If part == nullptr, just checks that all type conversions are possible.
    void createConvertExpression(const DataPartPtr & part, const NamesAndTypesList & old_columns, const NamesAndTypesList & new_columns,
        const IndicesASTs & old_indices, const IndicesASTs & new_indices,
        ExpressionActionsPtr & out_expression, NameToNameMap & out_rename_map, bool & out_force_update_metadata) const;
912
    /// Calculates column sizes in compressed form for the current state of data_parts. Call with data_parts mutex locked.
    void calculateColumnSizesImpl();
    /// Adds or subtracts the contribution of the part to compressed column sizes.
    void addPartContributionToColumnSizes(const DataPartPtr & part);
    void removePartContributionToColumnSizes(const DataPartPtr & part);

    /// If there is no part in the partition with ID `partition_id`, returns empty ptr. Should be called under the lock.
    DataPartPtr getAnyPartInPartition(const String & partition_id, DataPartsLock & data_parts_lock);

    /// Return parts in the Committed set that are covered by the new_part_info or the part that covers it.
    /// Will check that the new part doesn't already exist and that it doesn't intersect existing part.
    DataPartsVector getActivePartsToReplace(
        const MergeTreePartInfo & new_part_info,
        const String & new_part_name,
        DataPartPtr & out_covering_part,
        DataPartsLock & data_parts_lock) const;

    /// Checks whether the column is in the primary key, possibly wrapped in a chain of functions with single argument.
    bool isPrimaryOrMinMaxKeyColumnPossiblyWrappedInFunctions(const ASTPtr & node) const;

    /// Common part for |freezePartition()| and |freezeAll()|.
    /// `matcher` selects which parts are frozen.
    using MatcherFn = std::function<bool(const DataPartPtr &)>;
    void freezePartitionsByMatcher(MatcherFn matcher, const String & with_name, const Context & context);

    bool canReplacePartition(const DataPartPtr & data_part) const;
938
    /// Records an event of `type` about `new_part_name` / `result_part` built from `source_parts`
    /// into the part log; merge_entry may be null (it is passed as a pointer).
    void writePartLog(
        PartLogElement::Type type,
        const ExecutionStatus & execution_status,
        UInt64 elapsed_ns,
        const String & new_part_name,
        const DataPartPtr & result_part,
        const DataPartsVector & source_parts,
        const MergeListEntry * merge_entry);

    /// Whether the part is assigned to a merge or mutation (possibly replicated).
    /// Must be overridden by derived classes, because they can have different
    /// mechanisms for parts locking.
    virtual bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const = 0;

    /// Moves part to specified space, used in ALTER ... MOVE ... queries
    bool movePartsToSpace(const DataPartsVector & parts, SpacePtr space);

    /// Selects parts for move and moves them, used in background process
    bool selectPartsAndMove();

    bool areBackgroundMovesNeeded() const;
960
private:
    /// RAII Wrapper for atomic work with currently moving parts
    /// Acquire them in constructor and remove them in destructor
    /// Uses data.moving_parts_mutex
    /// Non-copyable (copy constructor deleted); returned by value from selectPartsForMove/checkPartsForMove.
    struct CurrentlyMovingPartsTagger
    {
        MergeTreeMovingParts parts_to_move;
        MergeTreeData & data;
        CurrentlyMovingPartsTagger(MergeTreeMovingParts && moving_parts_, MergeTreeData & data_);

        CurrentlyMovingPartsTagger(const CurrentlyMovingPartsTagger & other) = delete;
        ~CurrentlyMovingPartsTagger();
    };
974
    /// Move selected parts to corresponding disks
    /// Takes the tagger by rvalue reference; the tagger keeps the parts marked as moving.
    bool moveParts(CurrentlyMovingPartsTagger && parts_to_move);

    /// Select parts for move and disks for them. Used in background moving processes.
    CurrentlyMovingPartsTagger selectPartsForMove();

    /// Check selected parts for movements. Used by ALTER ... MOVE queries.
    CurrentlyMovingPartsTagger checkPartsForMove(const DataPartsVector & parts, SpacePtr space);
983};
984
985}
986