1 | #pragma once |
2 | |
3 | #include <Common/SimpleIncrement.h> |
4 | #include <Interpreters/Context.h> |
5 | #include <Interpreters/ExpressionActions.h> |
6 | #include <Storages/IStorage.h> |
7 | #include <Storages/MergeTree/MergeTreeIndices.h> |
8 | #include <Storages/MergeTree/MergeTreePartInfo.h> |
9 | #include <Storages/MergeTree/MergeTreeSettings.h> |
10 | #include <Storages/MergeTree/MergeTreeMutationStatus.h> |
11 | #include <Storages/MergeTree/MergeList.h> |
12 | #include <Storages/MergeTree/PartDestinationType.h> |
13 | #include <IO/ReadBufferFromString.h> |
14 | #include <IO/WriteBufferFromFile.h> |
15 | #include <IO/ReadBufferFromFile.h> |
16 | #include <DataTypes/DataTypeString.h> |
17 | #include <DataTypes/DataTypesNumber.h> |
18 | #include <DataStreams/GraphiteRollupSortedBlockInputStream.h> |
19 | #include <Storages/MergeTree/MergeTreeDataPart.h> |
20 | #include <Storages/IndicesDescription.h> |
21 | #include <Storages/MergeTree/MergeTreePartsMover.h> |
22 | #include <Interpreters/PartLog.h> |
23 | #include <Disks/DiskSpaceMonitor.h> |
24 | |
25 | #include <boost/multi_index_container.hpp> |
26 | #include <boost/multi_index/ordered_index.hpp> |
27 | #include <boost/multi_index/global_fun.hpp> |
28 | #include <boost/range/iterator_range_core.hpp> |
29 | |
30 | |
31 | namespace DB |
32 | { |
33 | |
34 | class MergeListEntry; |
35 | class AlterCommands; |
36 | class MergeTreePartsMover; |
37 | |
38 | namespace ErrorCodes |
39 | { |
40 | extern const int LOGICAL_ERROR; |
41 | extern const int INVALID_PARTITION_NAME; |
42 | extern const int NO_SUCH_DATA_PART; |
43 | extern const int DUPLICATE_DATA_PART; |
44 | extern const int DIRECTORY_ALREADY_EXISTS; |
45 | extern const int TOO_MANY_UNEXPECTED_DATA_PARTS; |
46 | extern const int NO_SUCH_COLUMN_IN_TABLE; |
47 | extern const int TABLE_DIFFERS_TOO_MUCH; |
48 | } |
49 | |
50 | |
51 | /// Data structure for *MergeTree engines. |
52 | /// Merge tree is used for incremental sorting of data. |
53 | /// The table consists of several sorted parts. |
54 | /// During insertion new data is sorted according to the primary key and is written to the new part. |
55 | /// Parts are merged in the background according to a heuristic algorithm. |
56 | /// For each part the index file is created containing primary key values for every n-th row. |
57 | /// This allows efficient selection by primary key range predicate. |
58 | /// |
59 | /// Additionally: |
60 | /// |
61 | /// The date column is specified. For each part min and max dates are remembered. |
62 | /// Essentially it is an index too. |
63 | /// |
64 | /// Data is partitioned by the value of the partitioning expression. |
65 | /// Parts belonging to different partitions are not merged - for the ease of administration (data sync and backup). |
66 | /// |
67 | /// File structure of old-style month-partitioned tables (format_version = 0): |
68 | /// Part directory - / min-date _ max-date _ min-id _ max-id _ level / |
69 | /// Inside the part directory: |
70 | /// checksums.txt - contains the list of all files along with their sizes and checksums. |
71 | /// columns.txt - contains the list of all columns and their types. |
72 | /// primary.idx - contains the primary index. |
73 | /// [Column].bin - contains compressed column data. |
74 | /// [Column].mrk - marks, pointing to seek positions allowing to skip n * k rows. |
75 | /// |
76 | /// File structure of tables with custom partitioning (format_version >= 1): |
77 | /// Part directory - / partition-id _ min-id _ max-id _ level / |
78 | /// Inside the part directory: |
79 | /// The same files as for month-partitioned tables, plus |
80 | /// count.txt - contains total number of rows in this part. |
81 | /// partition.dat - contains the value of the partitioning expression. |
82 | /// minmax_[Column].idx - MinMax indexes (see MergeTreeDataPart::MinMaxIndex class) for the columns required by the partitioning expression. |
83 | /// |
84 | /// Several modes are implemented. Modes determine additional actions during merge: |
85 | /// - Ordinary - don't do anything special |
86 | /// - Collapsing - collapse pairs of rows with the opposite values of sign_columns for the same values |
87 | /// of primary key (cf. CollapsingSortedBlockInputStream.h) |
88 | /// - Replacing - for all rows with the same primary key keep only the latest one. Or, if the version |
89 | /// column is set, keep the latest row with the maximal version. |
90 | /// - Summing - sum all numeric columns not contained in the primary key for all rows with the same primary key. |
91 | /// - Aggregating - merge columns containing aggregate function states for all rows with the same primary key. |
92 | /// - Graphite - performs coarsening of historical data for Graphite (a system for quantitative monitoring). |
93 | |
94 | /// The MergeTreeData class contains a list of parts and the data structure parameters. |
95 | /// To read and modify the data use other classes: |
96 | /// - MergeTreeDataSelectExecutor |
97 | /// - MergeTreeDataWriter |
98 | /// - MergeTreeDataMergerMutator |
99 | |
100 | class MergeTreeData : public IStorage |
101 | { |
102 | public: |
103 | /// Function to call if the part is suspected to contain corrupt data. |
104 | using BrokenPartCallback = std::function<void (const String &)>; |
105 | using DataPart = MergeTreeDataPart; |
106 | |
107 | using MutableDataPartPtr = std::shared_ptr<DataPart>; |
108 | using MutableDataPartsVector = std::vector<MutableDataPartPtr>; |
109 | /// After the DataPart is added to the working set, it cannot be changed. |
110 | using DataPartPtr = std::shared_ptr<const DataPart>; |
111 | |
112 | using DataPartState = MergeTreeDataPart::State; |
113 | using DataPartStates = std::initializer_list<DataPartState>; |
114 | using DataPartStateVector = std::vector<DataPartState>; |
115 | |
116 | /// Auxiliary structure for index comparison. Keep in mind lifetime of MergeTreePartInfo. |
117 | struct DataPartStateAndInfo |
118 | { |
119 | DataPartState state; |
120 | const MergeTreePartInfo & info; |
121 | }; |
122 | |
123 | /// Auxiliary structure for index comparison |
124 | struct DataPartStateAndPartitionID |
125 | { |
126 | DataPartState state; |
127 | String partition_id; |
128 | }; |
129 | |
130 | STRONG_TYPEDEF(String, PartitionID) |
131 | |
132 | struct LessDataPart |
133 | { |
134 | using is_transparent = void; |
135 | |
136 | bool operator()(const DataPartPtr & lhs, const MergeTreePartInfo & rhs) const { return lhs->info < rhs; } |
137 | bool operator()(const MergeTreePartInfo & lhs, const DataPartPtr & rhs) const { return lhs < rhs->info; } |
138 | bool operator()(const DataPartPtr & lhs, const DataPartPtr & rhs) const { return lhs->info < rhs->info; } |
139 | bool operator()(const MergeTreePartInfo & lhs, const PartitionID & rhs) const { return lhs.partition_id < rhs.toUnderType(); } |
140 | bool operator()(const PartitionID & lhs, const MergeTreePartInfo & rhs) const { return lhs.toUnderType() < rhs.partition_id; } |
141 | }; |
142 | |
143 | struct LessStateDataPart |
144 | { |
145 | using is_transparent = void; |
146 | |
147 | bool operator() (const DataPartStateAndInfo & lhs, const DataPartStateAndInfo & rhs) const |
148 | { |
149 | return std::forward_as_tuple(static_cast<UInt8>(lhs.state), lhs.info) |
150 | < std::forward_as_tuple(static_cast<UInt8>(rhs.state), rhs.info); |
151 | } |
152 | |
153 | bool operator() (DataPartStateAndInfo info, const DataPartState & state) const |
154 | { |
155 | return static_cast<size_t>(info.state) < static_cast<size_t>(state); |
156 | } |
157 | |
158 | bool operator() (const DataPartState & state, DataPartStateAndInfo info) const |
159 | { |
160 | return static_cast<size_t>(state) < static_cast<size_t>(info.state); |
161 | } |
162 | |
163 | bool operator() (const DataPartStateAndInfo & lhs, const DataPartStateAndPartitionID & rhs) const |
164 | { |
165 | return std::forward_as_tuple(static_cast<UInt8>(lhs.state), lhs.info.partition_id) |
166 | < std::forward_as_tuple(static_cast<UInt8>(rhs.state), rhs.partition_id); |
167 | } |
168 | |
169 | bool operator() (const DataPartStateAndPartitionID & lhs, const DataPartStateAndInfo & rhs) const |
170 | { |
171 | return std::forward_as_tuple(static_cast<UInt8>(lhs.state), lhs.partition_id) |
172 | < std::forward_as_tuple(static_cast<UInt8>(rhs.state), rhs.info.partition_id); |
173 | } |
174 | }; |
175 | |
176 | using DataParts = std::set<DataPartPtr, LessDataPart>; |
177 | using DataPartsVector = std::vector<DataPartPtr>; |
178 | |
179 | using DataPartsLock = std::unique_lock<std::mutex>; |
180 | DataPartsLock lockParts() const { return DataPartsLock(data_parts_mutex); } |
181 | |
182 | /// Auxiliary object to add a set of parts into the working set in two steps: |
183 | /// * First, as PreCommitted parts (the parts are ready, but not yet in the active set). |
184 | /// * Next, if commit() is called, the parts are added to the active set and the parts that are |
185 | /// covered by them are marked Outdated. |
186 | /// If neither commit() nor rollback() was called, the destructor rollbacks the operation. |
187 | class Transaction : private boost::noncopyable |
188 | { |
189 | public: |
190 | Transaction(MergeTreeData & data_) : data(data_) {} |
191 | |
192 | DataPartsVector commit(MergeTreeData::DataPartsLock * acquired_parts_lock = nullptr); |
193 | |
194 | void rollback(); |
195 | |
196 | size_t size() const { return precommitted_parts.size(); } |
197 | bool isEmpty() const { return precommitted_parts.empty(); } |
198 | |
199 | ~Transaction() |
200 | { |
201 | try |
202 | { |
203 | rollback(); |
204 | } |
205 | catch (...) |
206 | { |
207 | tryLogCurrentException("~MergeTreeData::Transaction" ); |
208 | } |
209 | } |
210 | |
211 | private: |
212 | friend class MergeTreeData; |
213 | |
214 | MergeTreeData & data; |
215 | DataParts precommitted_parts; |
216 | |
217 | void clear() { precommitted_parts.clear(); } |
218 | }; |
219 | |
220 | /// An object that stores the names of temporary files created in the part directory during ALTER of its |
221 | /// columns. |
222 | class AlterDataPartTransaction : private boost::noncopyable |
223 | { |
224 | public: |
225 | /// Renames temporary files, finishing the ALTER of the part. |
226 | void commit(); |
227 | |
228 | /// If commit() was not called, deletes temporary files, canceling the ALTER. |
229 | ~AlterDataPartTransaction(); |
230 | |
231 | const String & getPartName() const { return data_part->name; } |
232 | |
233 | /// Review the changes before the commit. |
234 | const NamesAndTypesList & getNewColumns() const { return new_columns; } |
235 | const DataPart::Checksums & getNewChecksums() const { return new_checksums; } |
236 | |
237 | AlterDataPartTransaction(DataPartPtr data_part_) : data_part(data_part_) {} |
238 | const DataPartPtr & getDataPart() const { return data_part; } |
239 | bool isValid() const; |
240 | |
241 | private: |
242 | friend class MergeTreeData; |
243 | void clear(); |
244 | |
245 | bool valid = true; |
246 | |
247 | DataPartPtr data_part; |
248 | |
249 | DataPart::Checksums new_checksums; |
250 | NamesAndTypesList new_columns; |
251 | /// If the value is an empty string, the file is not temporary, and it must be deleted. |
252 | NameToNameMap rename_map; |
253 | }; |
254 | |
255 | using AlterDataPartTransactionPtr = std::unique_ptr<AlterDataPartTransaction>; |
256 | |
257 | struct PartsTemporaryRename : private boost::noncopyable |
258 | { |
259 | PartsTemporaryRename( |
260 | const MergeTreeData & storage_, |
261 | const String & source_dir_) |
262 | : storage(storage_) |
263 | , source_dir(source_dir_) |
264 | { |
265 | } |
266 | |
267 | void addPart(const String & old_name, const String & new_name); |
268 | |
269 | /// Renames part from old_name to new_name |
270 | void tryRenameAll(); |
271 | |
272 | /// Renames all added parts from new_name to old_name if old name is not empty |
273 | ~PartsTemporaryRename(); |
274 | |
275 | const MergeTreeData & storage; |
276 | const String source_dir; |
277 | std::vector<std::pair<String, String>> old_and_new_names; |
278 | std::unordered_map<String, String> old_part_name_to_full_path; |
279 | bool renamed = false; |
280 | }; |
281 | |
282 | /// Parameters for various modes. |
283 | struct MergingParams |
284 | { |
285 | /// Merging mode. See above. |
286 | enum Mode |
287 | { |
288 | Ordinary = 0, /// Enum values are saved. Do not change them. |
289 | Collapsing = 1, |
290 | Summing = 2, |
291 | Aggregating = 3, |
292 | Replacing = 5, |
293 | Graphite = 6, |
294 | VersionedCollapsing = 7, |
295 | }; |
296 | |
297 | Mode mode; |
298 | |
299 | /// For Collapsing and VersionedCollapsing mode. |
300 | String sign_column; |
301 | |
302 | /// For Summing mode. If empty - columns_to_sum is determined automatically. |
303 | Names columns_to_sum; |
304 | |
305 | /// For Replacing and VersionedCollapsing mode. Can be empty for Replacing. |
306 | String version_column; |
307 | |
308 | /// For Graphite mode. |
309 | Graphite::Params graphite_params; |
310 | |
311 | /// Check that needed columns are present and have correct types. |
312 | void check(const NamesAndTypesList & columns) const; |
313 | |
314 | String getModeName() const; |
315 | }; |
316 | |
317 | /// Attach the table corresponding to the directory in full_path inside policy (must end with /), with the given columns. |
318 | /// Correctness of names and paths is not checked. |
319 | /// |
320 | /// date_column_name - if not empty, the name of the Date column used for partitioning by month. |
321 | /// Otherwise, partition_by_ast is used for partitioning. |
322 | /// |
323 | /// order_by_ast - a single expression or a tuple. It is used as a sorting key |
324 | /// (an ASTExpressionList used for sorting data in parts); |
325 | /// primary_key_ast - can be nullptr, an expression, or a tuple. |
326 | /// Used to determine an ASTExpressionList values of which are written in the primary.idx file |
327 | /// for one row in every `index_granularity` rows to speed up range queries. |
328 | /// Primary key must be a prefix of the sorting key; |
329 | /// If it is nullptr, then it will be determined from order_by_ast. |
330 | /// |
331 | /// require_part_metadata - should checksums.txt and columns.txt exist in the part directory. |
332 | /// attach - whether the existing table is attached or the new table is created. |
333 | MergeTreeData(const String & database_, const String & table_, |
334 | const String & relative_data_path_, |
335 | const StorageInMemoryMetadata & metadata, |
336 | Context & context_, |
337 | const String & date_column_name, |
338 | const MergingParams & merging_params_, |
339 | std::unique_ptr<MergeTreeSettings> settings_, |
340 | bool require_part_metadata_, |
341 | bool attach, |
342 | BrokenPartCallback broken_part_callback_ = [](const String &){}); |
343 | |
344 | |
345 | StorageInMemoryMetadata getInMemoryMetadata() const override; |
346 | ASTPtr getPartitionKeyAST() const override { return partition_by_ast; } |
347 | ASTPtr getSortingKeyAST() const override { return sorting_key_expr_ast; } |
348 | ASTPtr getPrimaryKeyAST() const override { return primary_key_expr_ast; } |
349 | ASTPtr getSamplingKeyAST() const override { return sample_by_ast; } |
350 | |
351 | Names getColumnsRequiredForPartitionKey() const override { return (partition_key_expr ? partition_key_expr->getRequiredColumns() : Names{}); } |
352 | Names getColumnsRequiredForSortingKey() const override { return sorting_key_expr->getRequiredColumns(); } |
353 | Names getColumnsRequiredForPrimaryKey() const override { return primary_key_expr->getRequiredColumns(); } |
354 | Names getColumnsRequiredForSampling() const override { return columns_required_for_sampling; } |
355 | Names getColumnsRequiredForFinal() const override { return sorting_key_expr->getRequiredColumns(); } |
356 | Names getSortingKeyColumns() const override { return sorting_key_columns; } |
357 | |
358 | StoragePolicyPtr getStoragePolicy() const override { return storage_policy; } |
359 | |
360 | bool supportsPrewhere() const override { return true; } |
361 | bool supportsSampling() const override { return sample_by_ast != nullptr; } |
362 | |
363 | bool supportsFinal() const override |
364 | { |
365 | return merging_params.mode == MergingParams::Collapsing |
366 | || merging_params.mode == MergingParams::Summing |
367 | || merging_params.mode == MergingParams::Aggregating |
368 | || merging_params.mode == MergingParams::Replacing |
369 | || merging_params.mode == MergingParams::VersionedCollapsing; |
370 | } |
371 | |
372 | bool supportsSettings() const override { return true; } |
373 | |
374 | bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context &) const override; |
375 | |
376 | NameAndTypePair getColumn(const String & column_name) const override |
377 | { |
378 | if (column_name == "_part" ) |
379 | return NameAndTypePair("_part" , std::make_shared<DataTypeString>()); |
380 | if (column_name == "_part_index" ) |
381 | return NameAndTypePair("_part_index" , std::make_shared<DataTypeUInt64>()); |
382 | if (column_name == "_partition_id" ) |
383 | return NameAndTypePair("_partition_id" , std::make_shared<DataTypeString>()); |
384 | if (column_name == "_sample_factor" ) |
385 | return NameAndTypePair("_sample_factor" , std::make_shared<DataTypeFloat64>()); |
386 | |
387 | return getColumns().getPhysical(column_name); |
388 | } |
389 | |
390 | bool hasColumn(const String & column_name) const override |
391 | { |
392 | return getColumns().hasPhysical(column_name) |
393 | || column_name == "_part" |
394 | || column_name == "_part_index" |
395 | || column_name == "_partition_id" |
396 | || column_name == "_sample_factor" ; |
397 | } |
398 | |
399 | String getDatabaseName() const override { return database_name; } |
400 | String getTableName() const override { return table_name; } |
401 | |
402 | /// Load the set of data parts from disk. Call once - immediately after the object is created. |
403 | void loadDataParts(bool skip_sanity_checks); |
404 | |
405 | String getLogName() const { return log_name; } |
406 | |
407 | Int64 getMaxBlockNumber() const; |
408 | |
409 | /// Returns a copy of the list so that the caller shouldn't worry about locks. |
410 | DataParts getDataParts(const DataPartStates & affordable_states) const; |
411 | /// Returns sorted list of the parts with specified states |
412 | /// out_states will contain snapshot of each part state |
413 | DataPartsVector getDataPartsVector(const DataPartStates & affordable_states, DataPartStateVector * out_states = nullptr) const; |
414 | |
415 | /// Returns absolutely all parts (and snapshot of their states) |
416 | DataPartsVector getAllDataPartsVector(DataPartStateVector * out_states = nullptr) const; |
417 | |
418 | /// Returns all detached parts |
419 | DetachedPartsInfo getDetachedParts() const; |
420 | |
421 | void validateDetachedPartName(const String & name) const; |
422 | |
423 | void dropDetached(const ASTPtr & partition, bool part, const Context & context); |
424 | |
425 | MutableDataPartsVector tryLoadPartsToAttach(const ASTPtr & partition, bool attach_part, |
426 | const Context & context, PartsTemporaryRename & renamed_parts); |
427 | |
428 | /// Returns Committed parts |
429 | DataParts getDataParts() const; |
430 | DataPartsVector getDataPartsVector() const; |
431 | |
432 | /// Returns a committed part with the given name or a part containing it. If there is no such part, returns nullptr. |
433 | DataPartPtr getActiveContainingPart(const String & part_name) const; |
434 | DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info) const; |
435 | DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info, DataPartState state, DataPartsLock & lock) const; |
436 | |
437 | /// Swap part with its identical copy (possibly with another path on another disk). |
438 | /// If original part is not active or doesn't exist exception will be thrown. |
439 | void swapActivePart(MergeTreeData::DataPartPtr part_copy); |
440 | |
441 | /// Returns all parts in specified partition |
442 | DataPartsVector getDataPartsVectorInPartition(DataPartState state, const String & partition_id); |
443 | |
444 | /// Returns the part with the given name and state or nullptr if no such part. |
445 | DataPartPtr getPartIfExists(const String & part_name, const DataPartStates & valid_states); |
446 | DataPartPtr getPartIfExists(const MergeTreePartInfo & part_info, const DataPartStates & valid_states); |
447 | |
448 | /// Total size of active parts in bytes. |
449 | size_t getTotalActiveSizeInBytes() const; |
450 | |
451 | size_t getTotalActiveSizeInRows() const; |
452 | |
453 | size_t getPartsCount() const; |
454 | size_t getMaxPartsCountForPartition() const; |
455 | |
456 | /// Get min value of part->info.getDataVersion() for all active parts. |
457 | /// Makes sense only for ordinary MergeTree engines because for them block numbering doesn't depend on partition. |
458 | std::optional<Int64> getMinPartDataVersion() const; |
459 | |
460 | /// If the table contains too many active parts, sleep for a while to give them time to merge. |
461 | /// If until is non-null, wake up from the sleep earlier if the event happened. |
462 | void delayInsertOrThrowIfNeeded(Poco::Event * until = nullptr) const; |
463 | void throwInsertIfNeeded() const; |
464 | |
465 | /// Renames temporary part to a permanent part and adds it to the parts set. |
466 | /// It is assumed that the part does not intersect with existing parts. |
467 | /// If increment != nullptr, part index is determined using increment. Otherwise part index remains unchanged. |
468 | /// If out_transaction != nullptr, adds the part in the PreCommitted state (the part will be added to the |
469 | /// active set later with out_transaction->commit()). |
470 | /// Else, commits the part immediately. |
471 | void renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr); |
472 | |
473 | /// The same as renameTempPartAndAdd but the block range of the part can contain existing parts. |
474 | /// Returns all parts covered by the added part (in ascending order). |
475 | /// If out_transaction == nullptr, marks covered parts as Outdated. |
476 | DataPartsVector renameTempPartAndReplace( |
477 | MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr); |
478 | |
479 | /// Low-level version of previous one, doesn't lock mutex |
480 | void renameTempPartAndReplace( |
481 | MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction, DataPartsLock & lock, |
482 | DataPartsVector * out_covered_parts = nullptr); |
483 | |
484 | /// Removes parts from the working set parts. |
485 | /// Parts in `remove` must already be in data_parts with PreCommitted, Committed, or Outdated states. |
486 | /// If clear_without_timeout is true, the parts will be deleted at once, or during the next call to |
487 | /// clearOldParts (ignoring old_parts_lifetime). |
488 | void removePartsFromWorkingSet(const DataPartsVector & remove, bool clear_without_timeout, DataPartsLock * acquired_lock = nullptr); |
489 | void removePartsFromWorkingSet(const DataPartsVector & remove, bool clear_without_timeout, DataPartsLock & acquired_lock); |
490 | |
491 | /// Removes all parts from the working set parts |
492 | /// for which (partition_id = drop_range.partition_id && min_block >= drop_range.min_block && max_block <= drop_range.max_block). |
493 | /// If a part intersecting drop_range.max_block is found, an exception will be thrown. |
494 | /// Used in REPLACE PARTITION command; |
495 | DataPartsVector removePartsInRangeFromWorkingSet(const MergeTreePartInfo & drop_range, bool clear_without_timeout, |
496 | bool skip_intersecting_parts, DataPartsLock & lock); |
497 | |
498 | /// Renames the part to detached/<prefix>_<part> and removes it from working set. |
499 | void removePartsFromWorkingSetAndCloneToDetached(const DataPartsVector & parts, bool clear_without_timeout, const String & prefix = "" ); |
500 | |
501 | /// Renames the part to detached/<prefix>_<part> and removes it from data_parts, |
502 | /// so it will not be deleted in clearOldParts. |
503 | /// If restore_covered is true, adds to the working set inactive parts, which were merged into the deleted part. |
504 | void forgetPartAndMoveToDetached(const DataPartPtr & part, const String & prefix = "" , bool restore_covered = false); |
505 | |
506 | /// If the part is Obsolete and not used by anybody else, immediately delete it from filesystem and remove from memory. |
507 | void tryRemovePartImmediately(DataPartPtr && part); |
508 | |
509 | /// Returns old inactive parts that can be deleted. At the same time removes them from the list of parts |
510 | /// but not from the disk. |
511 | DataPartsVector grabOldParts(); |
512 | |
513 | /// Reverts the changes made by grabOldParts(), parts should be in Deleting state. |
514 | void rollbackDeletingParts(const DataPartsVector & parts); |
515 | |
516 | /// Removes parts from data_parts, they should be in Deleting state |
517 | void removePartsFinally(const DataPartsVector & parts); |
518 | |
519 | /// Delete irrelevant parts from memory and disk. |
520 | void clearOldPartsFromFilesystem(); |
521 | void clearPartsFromFilesystem(const DataPartsVector & parts); |
522 | |
523 | /// Delete all directories which names begin with "tmp" |
524 | /// Set non-negative parameter value to override MergeTreeSettings temporary_directories_lifetime |
525 | /// Must be called with locked lockStructureForShare(). |
526 | void clearOldTemporaryDirectories(ssize_t custom_directories_lifetime_seconds = -1); |
527 | |
528 | /// After the call to dropAllData() no method can be called. |
529 | /// Deletes the data directory and flushes the uncompressed blocks cache and the marks cache. |
530 | void dropAllData(); |
531 | |
532 | /// Moves the entire data directory. |
533 | /// Flushes the uncompressed blocks cache and the marks cache. |
534 | /// Must be called with locked lockStructureForAlter(). |
535 | void rename(const String & new_path_to_table_data, const String & new_database_name, |
536 | const String & new_table_name, TableStructureWriteLockHolder &) override; |
537 | |
538 | /// Check if the ALTER can be performed: |
539 | /// - all needed columns are present. |
540 | /// - all type conversions can be done. |
541 | /// - columns corresponding to primary key, indices, sign, sampling expression and date are not affected. |
542 | /// If something is wrong, throws an exception. |
543 | void checkAlterIsPossible(const AlterCommands & commands, const Settings & settings) override; |
544 | |
545 | /// Performs ALTER of the data part, writes the result to temporary files. |
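546 | /// The result is passed back through `transaction`: an object allowing to rename temporary files to permanent files. |
547 | /// If the number of affected columns is suspiciously high and skip_sanity_checks is false, throws an exception. |
548 | /// NOTE(review): the old text said "returns nullptr" if no data transformations are necessary, but the function returns void - confirm how that case is reported via `transaction`. |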
549 | void alterDataPart( |
550 | const NamesAndTypesList & new_columns, |
551 | const IndicesASTs & new_indices, |
552 | bool skip_sanity_checks, |
553 | AlterDataPartTransactionPtr& transaction); |
554 | |
555 | /// Change MergeTreeSettings |
556 | void changeSettings( |
557 | const ASTPtr & new_changes, |
558 | TableStructureWriteLockHolder & table_lock_holder); |
559 | |
560 | /// Remove columns, that have been marked as empty after zeroing values with expired ttl |
561 | void removeEmptyColumnsFromPart(MergeTreeData::MutableDataPartPtr & data_part); |
562 | |
563 | /// Freezes all parts. |
564 | void freezeAll(const String & with_name, const Context & context, TableStructureReadLockHolder & table_lock_holder); |
565 | |
566 | /// Should be called if part data is suspected to be corrupted. |
567 | void reportBrokenPart(const String & name) const |
568 | { |
569 | broken_part_callback(name); |
570 | } |
571 | |
572 | /** Get the key expression AST as an ASTExpressionList. |
573 | * It can be specified in the tuple: (CounterID, Date), |
574 | * or as one column: CounterID. |
575 | */ |
576 | static ASTPtr (const ASTPtr & node); |
577 | |
578 | bool hasSortingKey() const { return !sorting_key_columns.empty(); } |
579 | bool hasPrimaryKey() const { return !primary_key_columns.empty(); } |
580 | bool hasSkipIndices() const { return !skip_indices.empty(); } |
581 | bool hasTableTTL() const { return ttl_table_ast != nullptr; } |
582 | bool hasAnyColumnTTL() const { return !column_ttl_entries_by_name.empty(); } |
583 | |
584 | /// Check that the part is not broken and calculate the checksums for it if they are not present. |
585 | MutableDataPartPtr loadPartAndFixMetadata(const DiskPtr & disk, const String & relative_path); |
586 | void loadPartAndFixMetadata(MutableDataPartPtr part); |
587 | |
588 | /** Create local backup (snapshot) for parts with specified prefix. |
589 | * Backup is created in directory clickhouse_dir/shadow/i/, where i - incremental number, |
590 | * or if 'with_name' is specified - backup is created in directory with specified name. |
591 | */ |
592 | void freezePartition(const ASTPtr & partition, const String & with_name, const Context & context, TableStructureReadLockHolder & table_lock_holder); |
593 | |
594 | |
595 | public: |
596 | /// Moves partition to specified Disk |
597 | void movePartitionToDisk(const ASTPtr & partition, const String & name, bool moving_part, const Context & context); |
598 | |
599 | /// Moves partition to specified Volume |
600 | void movePartitionToVolume(const ASTPtr & partition, const String & name, bool moving_part, const Context & context); |
601 | |
602 | size_t getColumnCompressedSize(const std::string & name) const |
603 | { |
604 | auto lock = lockParts(); |
605 | const auto it = column_sizes.find(name); |
606 | return it == std::end(column_sizes) ? 0 : it->second.data_compressed; |
607 | } |
608 | |
609 | ColumnSizeByName getColumnSizes() const override |
610 | { |
611 | auto lock = lockParts(); |
612 | return column_sizes; |
613 | } |
614 | |
615 | /// Calculates column sizes in compressed form for the current state of data_parts. |
616 | void recalculateColumnSizes() |
617 | { |
618 | auto lock = lockParts(); |
619 | calculateColumnSizesImpl(); |
620 | } |
621 | |
622 | /// For ATTACH/DETACH/DROP PARTITION. |
623 | String getPartitionIDFromQuery(const ASTPtr & partition, const Context & context); |
624 | |
625 | /// Extracts MergeTreeData of other *MergeTree* storage |
626 | /// and checks that their structure is suitable for ALTER TABLE ATTACH PARTITION FROM |
627 | /// Tables structure should be locked. |
628 | MergeTreeData & checkStructureAndGetMergeTreeData(const StoragePtr & source_table) const; |
629 | |
630 | MergeTreeData::MutableDataPartPtr cloneAndLoadDataPartOnSameDisk( |
631 | const MergeTreeData::DataPartPtr & src_part, const String & tmp_part_prefix, const MergeTreePartInfo & dst_part_info); |
632 | |
633 | virtual std::vector<MergeTreeMutationStatus> getMutationsStatus() const = 0; |
634 | |
635 | /// Returns true if table can create new parts with adaptive granularity |
636 | /// Has additional constraint in replicated version |
637 | virtual bool canUseAdaptiveGranularity() const |
638 | { |
639 | const auto settings = getSettings(); |
640 | return settings->index_granularity_bytes != 0 && |
641 | (settings->enable_mixed_granularity_parts || !has_non_adaptive_index_granularity_parts); |
642 | } |
643 | |
644 | /// Get constant pointer to storage settings. |
645 | /// Copy this pointer into your scope and you will |
646 | /// get consistent settings. |
647 | MergeTreeSettingsPtr getSettings() const |
648 | { |
649 | return storage_settings.get(); |
650 | } |
651 | |
652 | /// Get table path on disk |
653 | String getFullPathOnDisk(const DiskPtr & disk) const; |
654 | |
655 | /// Get disk for part. Looping through directories on FS because some parts may not be in |
656 | /// active dataparts set (detached) |
657 | DiskPtr getDiskForPart(const String & part_name, const String & relative_path = "" ) const; |
658 | |
659 | /// Get full path for part. Uses getDiskForPart and returns the full path |
660 | String getFullPathForPart(const String & part_name, const String & relative_path = "" ) const; |
661 | |
662 | Strings getDataPaths() const override; |
663 | |
664 | using PathWithDisk = std::pair<String, DiskPtr>; |
665 | using PathsWithDisks = std::vector<PathWithDisk>; |
666 | PathsWithDisks getDataPathsWithDisks() const; |
667 | |
668 | /// Reserves space at least 1MB. |
669 | ReservationPtr reserveSpace(UInt64 expected_size) const; |
670 | |
671 | /// Reserves space at least 1MB on specific disk or volume. |
672 | ReservationPtr reserveSpace(UInt64 expected_size, SpacePtr space) const; |
673 | ReservationPtr tryReserveSpace(UInt64 expected_size, SpacePtr space) const; |
674 | |
675 | /// Reserves space at least 1MB preferring best destination according to `ttl_infos`. |
676 | ReservationPtr reserveSpacePreferringTTLRules(UInt64 expected_size, |
677 | const MergeTreeDataPart::TTLInfos & ttl_infos, |
678 | time_t time_of_move) const; |
679 | ReservationPtr tryReserveSpacePreferringTTLRules(UInt64 expected_size, |
680 | const MergeTreeDataPart::TTLInfos & ttl_infos, |
681 | time_t time_of_move) const; |
682 | /// Choose disk with max available free space |
683 | /// Reserves 0 bytes |
684 | ReservationPtr makeEmptyReservationOnLargestDisk() { return storage_policy->makeEmptyReservationOnLargestDisk(); } |
685 | |
/// Format version of the table data.
MergeTreeDataFormatVersion format_version;

Context & global_context;

/// Merging params - what additional actions to perform during merge.
const MergingParams merging_params;

/// Partition key description.
/// NOTE(review): `is_custom_partitioned` presumably distinguishes custom PARTITION BY expressions
/// from the legacy month-based partitioning - confirm where it is assigned.
bool is_custom_partitioned = false;
ExpressionActionsPtr partition_key_expr;
Block partition_key_sample;

/// Description of the minmax index.
ExpressionActionsPtr minmax_idx_expr;
Names minmax_idx_columns;
DataTypes minmax_idx_column_types;
/// Positions default to -1 (presumably "no such column" - confirm).
Int64 minmax_idx_date_column_pos = -1; /// In a common case minmax index includes a date column.
Int64 minmax_idx_time_column_pos = -1; /// In other cases, minmax index often includes a dateTime column.

/// Secondary (data skipping) indices for MergeTree
MergeTreeIndices skip_indices;

ExpressionActionsPtr primary_key_and_skip_indices_expr;
ExpressionActionsPtr sorting_key_and_skip_indices_expr;

/// Names of columns for primary key + secondary sorting columns.
Names sorting_key_columns;
ASTPtr sorting_key_expr_ast;
ExpressionActionsPtr sorting_key_expr;

/// Names of columns for primary key.
Names primary_key_columns;
ASTPtr primary_key_expr_ast;
ExpressionActionsPtr primary_key_expr;
Block primary_key_sample;
DataTypes primary_key_data_types;
720 | |
/// Description of a single TTL rule.
struct TTLEntry
{
    /// Expression of the rule and the name of its result column.
    /// NOTE(review): presumably `result_column` is the column of the expression result that holds
    /// the TTL timestamp - confirm in setTTLExpressions.
    ExpressionActionsPtr expression;
    String result_column;

    /// Name and type of a destination are only valid in table-level context.
    /// NOTE(review): `destination_type` has no default initializer here.
    PartDestinationType destination_type;
    String destination_name;

    /// AST of this entry.
    ASTPtr entry_ast;

    /// Returns destination disk or volume for this rule.
    SpacePtr getDestination(const StoragePolicyPtr & policy) const;

    /// Checks whether the given part already resides on the destination disk or volume of this rule.
    bool isPartInDestination(const StoragePolicyPtr & policy, const MergeTreeDataPart & part) const;
};
738 | |
/// Selects a TTL entry for the given TTL infos and move time.
/// NOTE(review): returns a raw pointer; presumably nullptr when no entry matches and otherwise
/// a pointer into `move_ttl_entries` - confirm in the .cpp.
const TTLEntry * selectTTLEntryForTTLInfos(const MergeTreeDataPart::TTLInfos & ttl_infos, time_t time_of_move) const;

/// TTL entries keyed by name (column TTLs).
using TTLEntriesByName = std::unordered_map<String, TTLEntry>;
TTLEntriesByName column_ttl_entries_by_name;

/// Table-level TTL entry and the list of move TTL entries.
TTLEntry ttl_table_entry;
std::vector<TTLEntry> move_ttl_entries;

String sampling_expr_column_name;
Names columns_required_for_sampling;

/// Limiting parallel sends per one table, used in DataPartsExchange
std::atomic_uint current_table_sends {0};

/// For generating names of temporary parts during insertion.
SimpleIncrement insert_increment;

bool has_non_adaptive_index_granularity_parts = false;

/// Parts that are currently being moved from one disk/volume to another.
/// Moving may conflict with merges and mutations, but this is OK, because
/// if we decide to move some part to another disk, then we
/// assuredly will choose this disk for containing part, which will appear
/// as result of merge or mutation.
/// NOTE(review): the previous wording said this set has to be used with
/// `currently_processing_in_background_mutex`, while the mutex declared right below for it is
/// `moving_parts_mutex` - confirm which one callers are expected to hold.
DataParts currently_moving_parts;

/// Mutex for currently_moving_parts
mutable std::mutex moving_parts_mutex;
768 | |
protected:

/// Classes that need direct access to the protected/private state below.
friend struct MergeTreeDataPart;
friend class MergeTreeDataMergerMutator;
friend class ReplicatedMergeTreeAlterThread;
friend struct ReplicatedMergeTreeTableMetadata;
friend class StorageReplicatedMergeTree;

/// ASTs of the table definition clauses.
ASTPtr partition_by_ast;
ASTPtr order_by_ast;
ASTPtr primary_key_ast;
ASTPtr sample_by_ast;
ASTPtr ttl_table_ast;
ASTPtr settings_ast;

/// NOTE(review): no default initializer; expected to be set by the constructor - confirm.
bool require_part_metadata;

String database_name;
String table_name;
String relative_data_path;


/// Current column sizes in compressed and uncompressed form.
ColumnSizeByName column_sizes;

/// Engine-specific methods
BrokenPartCallback broken_part_callback;

String log_name;
Logger * log;

/// Storage settings.
/// Use get and set to receive readonly versions.
/// Readers should go through getSettings(), which returns storage_settings.get().
MultiVersion<MergeTreeSettings> storage_settings;

StoragePolicyPtr storage_policy;
/// Work with data parts

/// Empty tag types naming the two indices of DataPartsIndexes:
/// by part info, and by (state, info).
struct TagByInfo{};
struct TagByStateAndInfo{};
810 | |
811 | static const MergeTreePartInfo & dataPartPtrToInfo(const DataPartPtr & part) |
812 | { |
813 | return part->info; |
814 | } |
815 | |
816 | static DataPartStateAndInfo dataPartPtrToStateAndInfo(const DataPartPtr & part) |
817 | { |
818 | return {part->state, part->info}; |
819 | } |
820 | |
/// Container of data parts with two ordered unique indices over the same elements.
/// Keys are extracted by the static functions dataPartPtrToInfo / dataPartPtrToStateAndInfo.
using DataPartsIndexes = boost::multi_index_container<DataPartPtr,
    boost::multi_index::indexed_by<
        /// Index by Info
        boost::multi_index::ordered_unique<
            boost::multi_index::tag<TagByInfo>,
            boost::multi_index::global_fun<const DataPartPtr &, const MergeTreePartInfo &, dataPartPtrToInfo>
        >,
        /// Index by (State, Info), is used to obtain ordered slices of parts with the same state
        boost::multi_index::ordered_unique<
            boost::multi_index::tag<TagByStateAndInfo>,
            boost::multi_index::global_fun<const DataPartPtr &, DataPartStateAndInfo, dataPartPtrToStateAndInfo>,
            LessStateDataPart
        >
    >
>;
836 | |
/// Current set of data parts.
mutable std::mutex data_parts_mutex;
DataPartsIndexes data_parts_indexes;
/// References to the two index views.
/// NOTE(review): presumably bound to the indices of `data_parts_indexes` in the constructor;
/// being reference members, they must be declared after it - confirm.
DataPartsIndexes::index<TagByInfo>::type & data_parts_by_info;
DataPartsIndexes::index<TagByStateAndInfo>::type & data_parts_by_state_and_info;

MergeTreePartsMover parts_mover;

/// Iterator types of the two indices.
using DataPartIteratorByInfo = DataPartsIndexes::index<TagByInfo>::type::iterator;
using DataPartIteratorByStateAndInfo = DataPartsIndexes::index<TagByStateAndInfo>::type::iterator;
847 | |
848 | boost::iterator_range<DataPartIteratorByStateAndInfo> getDataPartsStateRange(DataPartState state) const |
849 | { |
850 | auto begin = data_parts_by_state_and_info.lower_bound(state, LessStateDataPart()); |
851 | auto end = data_parts_by_state_and_info.upper_bound(state, LessStateDataPart()); |
852 | return {begin, end}; |
853 | } |
854 | |
855 | boost::iterator_range<DataPartIteratorByInfo> getDataPartsPartitionRange(const String & partition_id) const |
856 | { |
857 | auto begin = data_parts_by_info.lower_bound(PartitionID(partition_id), LessDataPart()); |
858 | auto end = data_parts_by_info.upper_bound(PartitionID(partition_id), LessDataPart()); |
859 | return {begin, end}; |
860 | } |
861 | |
862 | static decltype(auto) getStateModifier(DataPartState state) |
863 | { |
864 | return [state] (const DataPartPtr & part) { part->state = state; }; |
865 | } |
866 | |
867 | void modifyPartState(DataPartIteratorByStateAndInfo it, DataPartState state) |
868 | { |
869 | if (!data_parts_by_state_and_info.modify(it, getStateModifier(state))) |
870 | throw Exception("Can't modify " + (*it)->getNameWithState(), ErrorCodes::LOGICAL_ERROR); |
871 | } |
872 | |
873 | void modifyPartState(DataPartIteratorByInfo it, DataPartState state) |
874 | { |
875 | if (!data_parts_by_state_and_info.modify(data_parts_indexes.project<TagByStateAndInfo>(it), getStateModifier(state))) |
876 | throw Exception("Can't modify " + (*it)->getNameWithState(), ErrorCodes::LOGICAL_ERROR); |
877 | } |
878 | |
879 | void modifyPartState(const DataPartPtr & part, DataPartState state) |
880 | { |
881 | auto it = data_parts_by_info.find(part->info); |
882 | if (it == data_parts_by_info.end() || (*it).get() != part.get()) |
883 | throw Exception("Part " + part->name + " doesn't exist" , ErrorCodes::LOGICAL_ERROR); |
884 | |
885 | if (!data_parts_by_state_and_info.modify(data_parts_indexes.project<TagByStateAndInfo>(it), getStateModifier(state))) |
886 | throw Exception("Can't modify " + (*it)->getNameWithState(), ErrorCodes::LOGICAL_ERROR); |
887 | } |
888 | |
/// Used to serialize calls to grabOldParts.
std::mutex grab_old_parts_mutex;
/// The same for clearOldTemporaryDirectories.
std::mutex clear_old_temporary_directories_mutex;
/// NOTE(review): a "Mutex for settings usage" comment stood here, but no such mutex is declared
/// after it; settings are read through getSettings().
894 | |
/// Applies table properties from `metadata`.
/// NOTE(review): with `only_check = true` presumably only validates without changing state - confirm.
void setProperties(const StorageInMemoryMetadata & metadata, bool only_check = false);

void initPartitionKey();

/// Sets column-level and table-level TTL expressions.
/// NOTE(review): `only_check` presumably has the same meaning as in setProperties - confirm.
void setTTLExpressions(const ColumnsDescription::ColumnTTLs & new_column_ttls,
                       const ASTPtr & new_ttl_table_ast, bool only_check = false);

/// Expression for column type conversion.
/// If no conversions are needed, out_expression=nullptr.
/// out_rename_map maps column files for the out_expression onto new table files.
/// out_force_update_metadata denotes if metadata must be changed even if out_rename_map is empty (used
/// for transformation-free changing of Enum values list).
/// Files to be deleted are mapped to an empty string in out_rename_map.
/// If part == nullptr, just checks that all type conversions are possible.
void createConvertExpression(const DataPartPtr & part, const NamesAndTypesList & old_columns, const NamesAndTypesList & new_columns,
                             const IndicesASTs & old_indices, const IndicesASTs & new_indices,
                             ExpressionActionsPtr & out_expression, NameToNameMap & out_rename_map, bool & out_force_update_metadata) const;

/// Calculates column sizes in compressed form for the current state of data_parts. Call with data_parts mutex locked.
void calculateColumnSizesImpl();
/// Adds or subtracts the contribution of the part to compressed column sizes.
void addPartContributionToColumnSizes(const DataPartPtr & part);
void removePartContributionToColumnSizes(const DataPartPtr & part);

/// If there is no part in the partition with ID `partition_id`, returns empty ptr. Should be called under the lock.
DataPartPtr getAnyPartInPartition(const String & partition_id, DataPartsLock & data_parts_lock);

/// Return parts in the Committed set that are covered by the new_part_info or the part that covers it.
/// Will check that the new part doesn't already exist and that it doesn't intersect existing part.
/// The covering part, if any, is returned through `out_covering_part`.
DataPartsVector getActivePartsToReplace(
    const MergeTreePartInfo & new_part_info,
    const String & new_part_name,
    DataPartPtr & out_covering_part,
    DataPartsLock & data_parts_lock) const;

/// Checks whether the column is in the primary key, possibly wrapped in a chain of functions with single argument.
bool isPrimaryOrMinMaxKeyColumnPossiblyWrappedInFunctions(const ASTPtr & node) const;

/// Common part for |freezePartition()| and |freezeAll()|.
/// `matcher` is a predicate over data parts.
using MatcherFn = std::function<bool(const DataPartPtr &)>;
void freezePartitionsByMatcher(MatcherFn matcher, const String & with_name, const Context & context);

bool canReplacePartition(const DataPartPtr & data_part) const;
938 | |
/// Writes a record about an operation on parts to the part log.
/// `merge_entry` is passed as a raw pointer (presumably optional - confirm in the .cpp).
void writePartLog(
    PartLogElement::Type type,
    const ExecutionStatus & execution_status,
    UInt64 elapsed_ns,
    const String & new_part_name,
    const DataPartPtr & result_part,
    const DataPartsVector & source_parts,
    const MergeListEntry * merge_entry);

/// Tells whether the part is assigned to a merge or mutation (possibly replicated).
/// Must be overridden by derived classes, because they can have different
/// mechanisms for locking parts.
virtual bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const = 0;

/// Moves parts to the specified space, used in ALTER ... MOVE ... queries
bool movePartsToSpace(const DataPartsVector & parts, SpacePtr space);

/// Selects parts for move and moves them, used in background process
bool selectPartsAndMove();

bool areBackgroundMovesNeeded() const;
960 | |
private:
/// RAII wrapper for atomic work with currently moving parts.
/// Acquires them in the constructor and removes them in the destructor.
/// NOTE(review): the previous wording referred to `data.currently_moving_parts_mutex`; the mutex
/// declared in MergeTreeData for currently_moving_parts is named `moving_parts_mutex` - confirm
/// that this is the one used by the constructor/destructor.
struct CurrentlyMovingPartsTagger
{
    MergeTreeMovingParts parts_to_move;
    MergeTreeData & data;
    CurrentlyMovingPartsTagger(MergeTreeMovingParts && moving_parts_, MergeTreeData & data_);

    /// Copying is forbidden.
    CurrentlyMovingPartsTagger(const CurrentlyMovingPartsTagger & other) = delete;
    ~CurrentlyMovingPartsTagger();
};
974 | |
/// Move selected parts to corresponding disks.
/// Takes the tagger by rvalue reference.
bool moveParts(CurrentlyMovingPartsTagger && parts_to_move);

/// Select parts for move and disks for them. Used in background moving processes.
CurrentlyMovingPartsTagger selectPartsForMove();

/// Check selected parts for movements. Used by ALTER ... MOVE queries.
CurrentlyMovingPartsTagger checkPartsForMove(const DataPartsVector & parts, SpacePtr space);
983 | }; |
984 | |
985 | } |
986 | |