1#include <Storages/MergeTree/MergeTreeData.h>
2#include <Interpreters/SyntaxAnalyzer.h>
3#include <Interpreters/ExpressionAnalyzer.h>
4#include <Storages/MergeTree/MergeTreeSequentialBlockInputStream.h>
5#include <Storages/MergeTree/MergedBlockOutputStream.h>
6#include <Storages/MergeTree/MergedColumnOnlyOutputStream.h>
7#include <Storages/MergeTree/checkDataPart.h>
8#include <Storages/StorageMergeTree.h>
9#include <Storages/StorageReplicatedMergeTree.h>
10#include <Storages/AlterCommands.h>
11#include <Parsers/ASTNameTypePair.h>
12#include <Parsers/ASTLiteral.h>
13#include <Parsers/ASTFunction.h>
14#include <Parsers/ASTPartition.h>
15#include <Parsers/ASTSetQuery.h>
16#include <Parsers/ExpressionListParsers.h>
17#include <Parsers/parseQuery.h>
18#include <Parsers/queryToString.h>
19#include <DataStreams/ExpressionBlockInputStream.h>
20#include <DataStreams/MarkInCompressedFile.h>
21#include <Formats/FormatFactory.h>
22#include <DataStreams/copyData.h>
23#include <IO/WriteBufferFromFile.h>
24#include <IO/WriteBufferFromString.h>
25#include <Compression/CompressedReadBuffer.h>
26#include <IO/ReadBufferFromMemory.h>
27#include <IO/ConcatReadBuffer.h>
28#include <IO/HexWriteBuffer.h>
29#include <IO/Operators.h>
30#include <DataTypes/DataTypeDate.h>
31#include <DataTypes/DataTypeDateTime.h>
32#include <DataTypes/DataTypeEnum.h>
33#include <DataTypes/NestedUtils.h>
34#include <DataTypes/DataTypeArray.h>
35#include <DataTypes/DataTypeNullable.h>
36#include <Functions/FunctionFactory.h>
37#include <Functions/IFunction.h>
38#include <Common/Increment.h>
39#include <Common/SimpleIncrement.h>
40#include <Common/escapeForFileName.h>
41#include <Common/quoteString.h>
42#include <Common/StringUtils/StringUtils.h>
43#include <Common/Stopwatch.h>
44#include <Common/typeid_cast.h>
45#include <Common/localBackup.h>
46#include <Interpreters/PartLog.h>
47
48#include <Poco/DirectoryIterator.h>
49
50#include <boost/range/adaptor/filtered.hpp>
51
52#include <algorithm>
53#include <iomanip>
54#include <optional>
55#include <set>
56#include <thread>
57#include <typeinfo>
58#include <typeindex>
59#include <unordered_set>
60
61
62namespace ProfileEvents
63{
64 extern const Event RejectedInserts;
65 extern const Event DelayedInserts;
66 extern const Event DelayedInsertsMilliseconds;
67}
68
69namespace CurrentMetrics
70{
71 extern const Metric DelayedInserts;
72}
73
74
75namespace
76{
77 constexpr UInt64 RESERVATION_MIN_ESTIMATION_SIZE = 1u * 1024u * 1024u; /// 1MB
78}
79
80
81namespace DB
82{
83
/// Error codes used in this file. Every code referenced below is declared explicitly,
/// so the file does not depend on declarations that happen to leak in through included headers.
namespace ErrorCodes
{
    extern const int ABORTED;
    extern const int BAD_ARGUMENTS;
    extern const int BAD_DATA_PART_NAME;
    extern const int BAD_TTL_EXPRESSION;
    extern const int BAD_TYPE_OF_FIELD;
    extern const int CANNOT_ALLOCATE_MEMORY;
    extern const int CANNOT_MREMAP;
    extern const int CANNOT_MUNMAP;
    extern const int CANNOT_UPDATE_COLUMN;
    extern const int CORRUPTED_DATA;
    extern const int ILLEGAL_COLUMN;
    extern const int INCOMPATIBLE_COLUMNS;
    extern const int INCORRECT_FILE_NAME;
    extern const int INVALID_PARTITION_VALUE;
    extern const int LOGICAL_ERROR;
    extern const int MEMORY_LIMIT_EXCEEDED;
    extern const int METADATA_MISMATCH;
    extern const int NO_SUCH_COLUMN_IN_TABLE;
    extern const int PART_IS_TEMPORARILY_LOCKED;
    extern const int READONLY_SETTING;
    extern const int SYNTAX_ERROR;
    extern const int TOO_MANY_PARTS;
    extern const int UNKNOWN_DISK;
    extern const int UNKNOWN_SETTING;
}
105
106
namespace
{
    /// Name of the marker file looked up inside a part directory while loading parts:
    /// a part that contains it is treated as a stale copy and scheduled for detaching.
    /// Declared constexpr so the pointer itself cannot be reassigned.
    constexpr const char * DELETE_ON_DESTROY_MARKER_PATH = "delete-on-destroy.txt";
}
111
112
113MergeTreeData::MergeTreeData(
114 const String & database_,
115 const String & table_,
116 const String & relative_data_path_,
117 const StorageInMemoryMetadata & metadata,
118 Context & context_,
119 const String & date_column_name,
120 const MergingParams & merging_params_,
121 std::unique_ptr<MergeTreeSettings> storage_settings_,
122 bool require_part_metadata_,
123 bool attach,
124 BrokenPartCallback broken_part_callback_)
125 : global_context(context_)
126 , merging_params(merging_params_)
127 , partition_by_ast(metadata.partition_by_ast)
128 , sample_by_ast(metadata.sample_by_ast)
129 , settings_ast(metadata.settings_ast)
130 , require_part_metadata(require_part_metadata_)
131 , database_name(database_)
132 , table_name(table_)
133 , relative_data_path(relative_data_path_)
134 , broken_part_callback(broken_part_callback_)
135 , log_name(database_name + "." + table_name)
136 , log(&Logger::get(log_name))
137 , storage_settings(std::move(storage_settings_))
138 , storage_policy(context_.getStoragePolicy(getSettings()->storage_policy))
139 , data_parts_by_info(data_parts_indexes.get<TagByInfo>())
140 , data_parts_by_state_and_info(data_parts_indexes.get<TagByStateAndInfo>())
141 , parts_mover(this)
142{
143 const auto settings = getSettings();
144 setProperties(metadata);
145
146 /// NOTE: using the same columns list as is read when performing actual merges.
147 merging_params.check(getColumns().getAllPhysical());
148
149 if (sample_by_ast)
150 {
151 sampling_expr_column_name = sample_by_ast->getColumnName();
152
153 if (!primary_key_sample.has(sampling_expr_column_name)
154 && !attach && !settings->compatibility_allow_sampling_expression_not_in_primary_key) /// This is for backward compatibility.
155 throw Exception("Sampling expression must be present in the primary key", ErrorCodes::BAD_ARGUMENTS);
156
157 auto syntax = SyntaxAnalyzer(global_context).analyze(sample_by_ast, getColumns().getAllPhysical());
158 columns_required_for_sampling = syntax->requiredSourceColumns();
159 }
160
161 MergeTreeDataFormatVersion min_format_version(0);
162 if (!date_column_name.empty())
163 {
164 try
165 {
166 partition_by_ast = makeASTFunction("toYYYYMM", std::make_shared<ASTIdentifier>(date_column_name));
167 initPartitionKey();
168
169 if (minmax_idx_date_column_pos == -1)
170 throw Exception("Could not find Date column", ErrorCodes::BAD_TYPE_OF_FIELD);
171 }
172 catch (Exception & e)
173 {
174 /// Better error message.
175 e.addMessage("(while initializing MergeTree partition key from date column " + backQuote(date_column_name) + ")");
176 throw;
177 }
178 }
179 else
180 {
181 is_custom_partitioned = true;
182 initPartitionKey();
183 min_format_version = MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING;
184 }
185
186 setTTLExpressions(metadata.columns.getColumnTTLs(), metadata.ttl_for_table_ast);
187
188 // format_file always contained on any data path
189 String version_file_path;
190
191 /// Creating directories, if not exist.
192 auto paths = getDataPaths();
193 for (const String & path : paths)
194 {
195 Poco::File(path).createDirectories();
196 Poco::File(path + "detached").createDirectory();
197 if (Poco::File{path + "format_version.txt"}.exists())
198 {
199 if (!version_file_path.empty())
200 {
201 LOG_ERROR(log, "Duplication of version file " << version_file_path << " and " << path << "format_file.txt");
202 throw Exception("Multiple format_version.txt file", ErrorCodes::CORRUPTED_DATA);
203 }
204 version_file_path = path + "format_version.txt";
205 }
206 }
207
208 /// If not choose any
209 if (version_file_path.empty())
210 version_file_path = getFullPathOnDisk(storage_policy->getAnyDisk()) + "format_version.txt";
211
212 bool version_file_exists = Poco::File(version_file_path).exists();
213
214 // When data path or file not exists, ignore the format_version check
215 if (!attach || !version_file_exists)
216 {
217 format_version = min_format_version;
218 WriteBufferFromFile buf(version_file_path);
219 writeIntText(format_version.toUnderType(), buf);
220 }
221 else
222 {
223 ReadBufferFromFile buf(version_file_path);
224 UInt32 read_format_version;
225 readIntText(read_format_version, buf);
226 format_version = read_format_version;
227 if (!buf.eof())
228 throw Exception("Bad version file: " + version_file_path, ErrorCodes::CORRUPTED_DATA);
229 }
230
231 if (format_version < min_format_version)
232 {
233 if (min_format_version == MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING.toUnderType())
234 throw Exception(
235 "MergeTree data format version on disk doesn't support custom partitioning",
236 ErrorCodes::METADATA_MISMATCH);
237 }
238}
239
240
241StorageInMemoryMetadata MergeTreeData::getInMemoryMetadata() const
242{
243 StorageInMemoryMetadata metadata{
244 .columns = getColumns(),
245 .indices = getIndices(),
246 .constraints = getConstraints(),
247 };
248
249 if (partition_by_ast)
250 metadata.partition_by_ast = partition_by_ast->clone();
251
252 if (order_by_ast)
253 metadata.order_by_ast = order_by_ast->clone();
254
255 if (primary_key_ast)
256 metadata.primary_key_ast = primary_key_ast->clone();
257
258 if (ttl_table_ast)
259 metadata.ttl_for_table_ast = ttl_table_ast->clone();
260
261 if (sample_by_ast)
262 metadata.sample_by_ast = sample_by_ast->clone();
263
264 if (settings_ast)
265 metadata.settings_ast = settings_ast->clone();
266
267 return metadata;
268}
269
270static void checkKeyExpression(const ExpressionActions & expr, const Block & sample_block, const String & key_name)
271{
272 for (const ExpressionAction & action : expr.getActions())
273 {
274 if (action.type == ExpressionAction::ARRAY_JOIN)
275 throw Exception(key_name + " key cannot contain array joins", ErrorCodes::ILLEGAL_COLUMN);
276
277 if (action.type == ExpressionAction::APPLY_FUNCTION)
278 {
279 IFunctionBase & func = *action.function_base;
280 if (!func.isDeterministic())
281 throw Exception(key_name + " key cannot contain non-deterministic functions, "
282 "but contains function " + func.getName(),
283 ErrorCodes::BAD_ARGUMENTS);
284 }
285 }
286
287 for (const ColumnWithTypeAndName & element : sample_block)
288 {
289 const ColumnPtr & column = element.column;
290 if (column && (isColumnConst(*column) || column->isDummy()))
291 throw Exception{key_name + " key cannot contain constants", ErrorCodes::ILLEGAL_COLUMN};
292
293 if (element.type->isNullable())
294 throw Exception{key_name + " key cannot contain nullable columns", ErrorCodes::ILLEGAL_COLUMN};
295 }
296}
297
298void MergeTreeData::setProperties(const StorageInMemoryMetadata & metadata, bool only_check)
299{
300 if (!metadata.order_by_ast)
301 throw Exception("ORDER BY cannot be empty", ErrorCodes::BAD_ARGUMENTS);
302
303 ASTPtr new_sorting_key_expr_list = extractKeyExpressionList(metadata.order_by_ast);
304 ASTPtr new_primary_key_expr_list = metadata.primary_key_ast
305 ? extractKeyExpressionList(metadata.primary_key_ast) : new_sorting_key_expr_list->clone();
306
307 if (merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing)
308 new_sorting_key_expr_list->children.push_back(std::make_shared<ASTIdentifier>(merging_params.version_column));
309
310 size_t primary_key_size = new_primary_key_expr_list->children.size();
311 size_t sorting_key_size = new_sorting_key_expr_list->children.size();
312 if (primary_key_size > sorting_key_size)
313 throw Exception("Primary key must be a prefix of the sorting key, but its length: "
314 + toString(primary_key_size) + " is greater than the sorting key length: " + toString(sorting_key_size),
315 ErrorCodes::BAD_ARGUMENTS);
316
317 Names new_primary_key_columns;
318 Names new_sorting_key_columns;
319
320 for (size_t i = 0; i < sorting_key_size; ++i)
321 {
322 String sorting_key_column = new_sorting_key_expr_list->children[i]->getColumnName();
323 new_sorting_key_columns.push_back(sorting_key_column);
324
325 if (i < primary_key_size)
326 {
327 String pk_column = new_primary_key_expr_list->children[i]->getColumnName();
328 if (pk_column != sorting_key_column)
329 throw Exception("Primary key must be a prefix of the sorting key, but in position "
330 + toString(i) + " its column is " + pk_column + ", not " + sorting_key_column,
331 ErrorCodes::BAD_ARGUMENTS);
332
333 new_primary_key_columns.push_back(pk_column);
334 }
335 }
336
337 auto all_columns = metadata.columns.getAllPhysical();
338
339 /// Order by check AST
340 if (order_by_ast && only_check)
341 {
342 /// This is ALTER, not CREATE/ATTACH TABLE. Let us check that all new columns used in the sorting key
343 /// expression have just been added (so that the sorting order is guaranteed to be valid with the new key).
344
345 ASTPtr added_key_column_expr_list = std::make_shared<ASTExpressionList>();
346 for (size_t new_i = 0, old_i = 0; new_i < sorting_key_size; ++new_i)
347 {
348 if (old_i < sorting_key_columns.size())
349 {
350 if (new_sorting_key_columns[new_i] != sorting_key_columns[old_i])
351 added_key_column_expr_list->children.push_back(new_sorting_key_expr_list->children[new_i]);
352 else
353 ++old_i;
354 }
355 else
356 added_key_column_expr_list->children.push_back(new_sorting_key_expr_list->children[new_i]);
357 }
358
359 if (!added_key_column_expr_list->children.empty())
360 {
361 auto syntax = SyntaxAnalyzer(global_context).analyze(added_key_column_expr_list, all_columns);
362 Names used_columns = syntax->requiredSourceColumns();
363
364 NamesAndTypesList deleted_columns;
365 NamesAndTypesList added_columns;
366 getColumns().getAllPhysical().getDifference(all_columns, deleted_columns, added_columns);
367
368 for (const String & col : used_columns)
369 {
370 if (!added_columns.contains(col) || deleted_columns.contains(col))
371 throw Exception("Existing column " + col + " is used in the expression that was "
372 "added to the sorting key. You can add expressions that use only the newly added columns",
373 ErrorCodes::BAD_ARGUMENTS);
374
375 if (metadata.columns.getDefaults().count(col))
376 throw Exception("Newly added column " + col + " has a default expression, so adding "
377 "expressions that use it to the sorting key is forbidden",
378 ErrorCodes::BAD_ARGUMENTS);
379 }
380 }
381 }
382
383 auto new_sorting_key_syntax = SyntaxAnalyzer(global_context).analyze(new_sorting_key_expr_list, all_columns);
384 auto new_sorting_key_expr = ExpressionAnalyzer(new_sorting_key_expr_list, new_sorting_key_syntax, global_context)
385 .getActions(false);
386 auto new_sorting_key_sample =
387 ExpressionAnalyzer(new_sorting_key_expr_list, new_sorting_key_syntax, global_context)
388 .getActions(true)->getSampleBlock();
389
390 checkKeyExpression(*new_sorting_key_expr, new_sorting_key_sample, "Sorting");
391
392 auto new_primary_key_syntax = SyntaxAnalyzer(global_context).analyze(new_primary_key_expr_list, all_columns);
393 auto new_primary_key_expr = ExpressionAnalyzer(new_primary_key_expr_list, new_primary_key_syntax, global_context)
394 .getActions(false);
395
396 Block new_primary_key_sample;
397 DataTypes new_primary_key_data_types;
398 for (size_t i = 0; i < primary_key_size; ++i)
399 {
400 const auto & elem = new_sorting_key_sample.getByPosition(i);
401 new_primary_key_sample.insert(elem);
402 new_primary_key_data_types.push_back(elem.type);
403 }
404
405 ASTPtr skip_indices_with_primary_key_expr_list = new_primary_key_expr_list->clone();
406 ASTPtr skip_indices_with_sorting_key_expr_list = new_sorting_key_expr_list->clone();
407
408 MergeTreeIndices new_indices;
409
410 if (!metadata.indices.indices.empty())
411 {
412 std::set<String> indices_names;
413
414 for (const auto & index_ast : metadata.indices.indices)
415 {
416 const auto & index_decl = std::dynamic_pointer_cast<ASTIndexDeclaration>(index_ast);
417
418 new_indices.push_back(
419 MergeTreeIndexFactory::instance().get(
420 all_columns,
421 std::dynamic_pointer_cast<ASTIndexDeclaration>(index_decl->clone()),
422 global_context));
423
424 if (indices_names.find(new_indices.back()->name) != indices_names.end())
425 throw Exception(
426 "Index with name " + backQuote(new_indices.back()->name) + " already exsists",
427 ErrorCodes::LOGICAL_ERROR);
428
429 ASTPtr expr_list = MergeTreeData::extractKeyExpressionList(index_decl->expr->clone());
430 for (const auto & expr : expr_list->children)
431 {
432 skip_indices_with_primary_key_expr_list->children.push_back(expr->clone());
433 skip_indices_with_sorting_key_expr_list->children.push_back(expr->clone());
434 }
435
436 indices_names.insert(new_indices.back()->name);
437 }
438 }
439 auto syntax_primary = SyntaxAnalyzer(global_context, {}).analyze(
440 skip_indices_with_primary_key_expr_list, all_columns);
441 auto new_indices_with_primary_key_expr = ExpressionAnalyzer(
442 skip_indices_with_primary_key_expr_list, syntax_primary, global_context).getActions(false);
443
444 auto syntax_sorting = SyntaxAnalyzer(global_context, {}).analyze(
445 skip_indices_with_sorting_key_expr_list, all_columns);
446 auto new_indices_with_sorting_key_expr = ExpressionAnalyzer(
447 skip_indices_with_sorting_key_expr_list, syntax_sorting, global_context).getActions(false);
448
449 if (!only_check)
450 {
451 setColumns(std::move(metadata.columns));
452
453 order_by_ast = metadata.order_by_ast;
454 sorting_key_columns = std::move(new_sorting_key_columns);
455 sorting_key_expr_ast = std::move(new_sorting_key_expr_list);
456 sorting_key_expr = std::move(new_sorting_key_expr);
457
458 primary_key_ast = metadata.primary_key_ast;
459 primary_key_columns = std::move(new_primary_key_columns);
460 primary_key_expr_ast = std::move(new_primary_key_expr_list);
461 primary_key_expr = std::move(new_primary_key_expr);
462 primary_key_sample = std::move(new_primary_key_sample);
463 primary_key_data_types = std::move(new_primary_key_data_types);
464
465 setIndices(metadata.indices);
466 skip_indices = std::move(new_indices);
467
468 setConstraints(metadata.constraints);
469
470 primary_key_and_skip_indices_expr = new_indices_with_primary_key_expr;
471 sorting_key_and_skip_indices_expr = new_indices_with_sorting_key_expr;
472 }
473}
474
475
476ASTPtr MergeTreeData::extractKeyExpressionList(const ASTPtr & node)
477{
478 if (!node)
479 return std::make_shared<ASTExpressionList>();
480
481 const auto * expr_func = node->as<ASTFunction>();
482
483 if (expr_func && expr_func->name == "tuple")
484 {
485 /// Primary key is specified in tuple, extract its arguments.
486 return expr_func->arguments->clone();
487 }
488 else
489 {
490 /// Primary key consists of one column.
491 auto res = std::make_shared<ASTExpressionList>();
492 res->children.push_back(node);
493 return res;
494 }
495}
496
497
498void MergeTreeData::initPartitionKey()
499{
500 ASTPtr partition_key_expr_list = extractKeyExpressionList(partition_by_ast);
501
502 if (partition_key_expr_list->children.empty())
503 return;
504
505 {
506 auto syntax_result = SyntaxAnalyzer(global_context).analyze(partition_key_expr_list, getColumns().getAllPhysical());
507 partition_key_expr = ExpressionAnalyzer(partition_key_expr_list, syntax_result, global_context).getActions(false);
508 }
509
510 for (const ASTPtr & ast : partition_key_expr_list->children)
511 {
512 String col_name = ast->getColumnName();
513 partition_key_sample.insert(partition_key_expr->getSampleBlock().getByName(col_name));
514 }
515
516 checkKeyExpression(*partition_key_expr, partition_key_sample, "Partition");
517
518 /// Add all columns used in the partition key to the min-max index.
519 const NamesAndTypesList & minmax_idx_columns_with_types = partition_key_expr->getRequiredColumnsWithTypes();
520 minmax_idx_expr = std::make_shared<ExpressionActions>(minmax_idx_columns_with_types, global_context);
521 for (const NameAndTypePair & column : minmax_idx_columns_with_types)
522 {
523 minmax_idx_columns.emplace_back(column.name);
524 minmax_idx_column_types.emplace_back(column.type);
525 }
526
527 /// Try to find the date column in columns used by the partition key (a common case).
528 bool encountered_date_column = false;
529 for (size_t i = 0; i < minmax_idx_column_types.size(); ++i)
530 {
531 if (typeid_cast<const DataTypeDate *>(minmax_idx_column_types[i].get()))
532 {
533 if (!encountered_date_column)
534 {
535 minmax_idx_date_column_pos = i;
536 encountered_date_column = true;
537 }
538 else
539 {
540 /// There is more than one Date column in partition key and we don't know which one to choose.
541 minmax_idx_date_column_pos = -1;
542 }
543 }
544 }
545 if (!encountered_date_column)
546 {
547 for (size_t i = 0; i < minmax_idx_column_types.size(); ++i)
548 {
549 if (typeid_cast<const DataTypeDateTime *>(minmax_idx_column_types[i].get()))
550 {
551 if (!encountered_date_column)
552 {
553 minmax_idx_time_column_pos = i;
554 encountered_date_column = true;
555 }
556 else
557 {
558 /// There is more than one DateTime column in partition key and we don't know which one to choose.
559 minmax_idx_time_column_pos = -1;
560 }
561 }
562 }
563 }
564}
565
566namespace
567{
568
569void checkTTLExpression(const ExpressionActionsPtr & ttl_expression, const String & result_column_name)
570{
571 for (const auto & action : ttl_expression->getActions())
572 {
573 if (action.type == ExpressionAction::APPLY_FUNCTION)
574 {
575 IFunctionBase & func = *action.function_base;
576 if (!func.isDeterministic())
577 throw Exception("TTL expression cannot contain non-deterministic functions, "
578 "but contains function " + func.getName(), ErrorCodes::BAD_ARGUMENTS);
579 }
580 }
581
582 const auto & result_column = ttl_expression->getSampleBlock().getByName(result_column_name);
583
584 if (!typeid_cast<const DataTypeDateTime *>(result_column.type.get())
585 && !typeid_cast<const DataTypeDate *>(result_column.type.get()))
586 {
587 throw Exception("TTL expression result column should have DateTime or Date type, but has "
588 + result_column.type->getName(), ErrorCodes::BAD_TTL_EXPRESSION);
589 }
590}
591
592}
593
594
595void MergeTreeData::setTTLExpressions(const ColumnsDescription::ColumnTTLs & new_column_ttls,
596 const ASTPtr & new_ttl_table_ast, bool only_check)
597{
598 auto create_ttl_entry = [this](ASTPtr ttl_ast)
599 {
600 TTLEntry result;
601
602 auto syntax_result = SyntaxAnalyzer(global_context).analyze(ttl_ast, getColumns().getAllPhysical());
603 result.expression = ExpressionAnalyzer(ttl_ast, syntax_result, global_context).getActions(false);
604 result.destination_type = PartDestinationType::DELETE;
605 result.result_column = ttl_ast->getColumnName();
606
607 checkTTLExpression(result.expression, result.result_column);
608 return result;
609 };
610
611 if (!new_column_ttls.empty())
612 {
613 NameSet columns_ttl_forbidden;
614
615 if (partition_key_expr)
616 for (const auto & col : partition_key_expr->getRequiredColumns())
617 columns_ttl_forbidden.insert(col);
618
619 if (sorting_key_expr)
620 for (const auto & col : sorting_key_expr->getRequiredColumns())
621 columns_ttl_forbidden.insert(col);
622
623 for (const auto & [name, ast] : new_column_ttls)
624 {
625 if (columns_ttl_forbidden.count(name))
626 throw Exception("Trying to set TTL for key column " + name, ErrorCodes::ILLEGAL_COLUMN);
627 else
628 {
629 auto new_ttl_entry = create_ttl_entry(ast);
630 if (!only_check)
631 column_ttl_entries_by_name.emplace(name, new_ttl_entry);
632 }
633 }
634 }
635
636 if (new_ttl_table_ast)
637 {
638 bool seen_delete_ttl = false;
639 for (auto ttl_element_ptr : new_ttl_table_ast->children)
640 {
641 ASTTTLElement & ttl_element = static_cast<ASTTTLElement &>(*ttl_element_ptr);
642 if (ttl_element.destination_type == PartDestinationType::DELETE)
643 {
644 if (seen_delete_ttl)
645 {
646 throw Exception("More than one DELETE TTL expression is not allowed", ErrorCodes::BAD_TTL_EXPRESSION);
647 }
648
649 auto new_ttl_table_entry = create_ttl_entry(ttl_element.children[0]);
650 if (!only_check)
651 {
652 ttl_table_ast = ttl_element.children[0];
653 ttl_table_entry = new_ttl_table_entry;
654 }
655
656 seen_delete_ttl = true;
657 }
658 else
659 {
660 auto new_ttl_entry = create_ttl_entry(ttl_element.children[0]);
661
662 new_ttl_entry.entry_ast = ttl_element_ptr;
663 new_ttl_entry.destination_type = ttl_element.destination_type;
664 new_ttl_entry.destination_name = ttl_element.destination_name;
665 if (!new_ttl_entry.getDestination(getStoragePolicy()))
666 {
667 String message;
668 if (new_ttl_entry.destination_type == PartDestinationType::DISK)
669 message = "No such disk " + backQuote(new_ttl_entry.destination_name) + " for given storage policy.";
670 else
671 message = "No such volume " + backQuote(new_ttl_entry.destination_name) + " for given storage policy.";
672 throw Exception(message, ErrorCodes::BAD_TTL_EXPRESSION);
673 }
674
675 if (!only_check)
676 {
677 move_ttl_entries.emplace_back(std::move(new_ttl_entry));
678 }
679 }
680 }
681 }
682}
683
684
685void MergeTreeData::MergingParams::check(const NamesAndTypesList & columns) const
686{
687 if (!sign_column.empty() && mode != MergingParams::Collapsing && mode != MergingParams::VersionedCollapsing)
688 throw Exception("Sign column for MergeTree cannot be specified in modes except Collapsing or VersionedCollapsing.",
689 ErrorCodes::LOGICAL_ERROR);
690
691 if (!version_column.empty() && mode != MergingParams::Replacing && mode != MergingParams::VersionedCollapsing)
692 throw Exception("Version column for MergeTree cannot be specified in modes except Replacing or VersionedCollapsing.",
693 ErrorCodes::LOGICAL_ERROR);
694
695 if (!columns_to_sum.empty() && mode != MergingParams::Summing)
696 throw Exception("List of columns to sum for MergeTree cannot be specified in all modes except Summing.",
697 ErrorCodes::LOGICAL_ERROR);
698
699 /// Check that if the sign column is needed, it exists and is of type Int8.
700 auto check_sign_column = [this, & columns](bool is_optional, const std::string & storage)
701 {
702 if (sign_column.empty())
703 {
704 if (is_optional)
705 return;
706
707 throw Exception("Logical error: Sign column for storage " + storage + " is empty", ErrorCodes::LOGICAL_ERROR);
708 }
709
710 bool miss_column = true;
711 for (const auto & column : columns)
712 {
713 if (column.name == sign_column)
714 {
715 if (!typeid_cast<const DataTypeInt8 *>(column.type.get()))
716 throw Exception("Sign column (" + sign_column + ") for storage " + storage + " must have type Int8."
717 " Provided column of type " + column.type->getName() + ".", ErrorCodes::BAD_TYPE_OF_FIELD);
718 miss_column = false;
719 break;
720 }
721 }
722 if (miss_column)
723 throw Exception("Sign column " + sign_column + " does not exist in table declaration.", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
724 };
725
726 /// that if the version_column column is needed, it exists and is of unsigned integer type.
727 auto check_version_column = [this, & columns](bool is_optional, const std::string & storage)
728 {
729 if (version_column.empty())
730 {
731 if (is_optional)
732 return;
733
734 throw Exception("Logical error: Version column for storage " + storage + " is empty", ErrorCodes::LOGICAL_ERROR);
735 }
736
737 bool miss_column = true;
738 for (const auto & column : columns)
739 {
740 if (column.name == version_column)
741 {
742 if (!column.type->canBeUsedAsVersion())
743 throw Exception("The column " + version_column +
744 " cannot be used as a version column for storage " + storage +
745 " because it is of type " + column.type->getName() +
746 " (must be of an integer type or of type Date or DateTime)", ErrorCodes::BAD_TYPE_OF_FIELD);
747 miss_column = false;
748 break;
749 }
750 }
751 if (miss_column)
752 throw Exception("Version column " + version_column + " does not exist in table declaration.", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
753 };
754
755 if (mode == MergingParams::Collapsing)
756 check_sign_column(false, "CollapsingMergeTree");
757
758 if (mode == MergingParams::Summing)
759 {
760 /// If columns_to_sum are set, then check that such columns exist.
761 for (const auto & column_to_sum : columns_to_sum)
762 {
763 auto check_column_to_sum_exists = [& column_to_sum](const NameAndTypePair & name_and_type)
764 {
765 return column_to_sum == Nested::extractTableName(name_and_type.name);
766 };
767 if (columns.end() == std::find_if(columns.begin(), columns.end(), check_column_to_sum_exists))
768 throw Exception(
769 "Column " + column_to_sum + " listed in columns to sum does not exist in table declaration.", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
770 }
771 }
772
773 if (mode == MergingParams::Replacing)
774 check_version_column(true, "ReplacingMergeTree");
775
776 if (mode == MergingParams::VersionedCollapsing)
777 {
778 check_sign_column(false, "VersionedCollapsingMergeTree");
779 check_version_column(false, "VersionedCollapsingMergeTree");
780 }
781
782 /// TODO Checks for Graphite mode.
783}
784
785
786String MergeTreeData::MergingParams::getModeName() const
787{
788 switch (mode)
789 {
790 case Ordinary: return "";
791 case Collapsing: return "Collapsing";
792 case Summing: return "Summing";
793 case Aggregating: return "Aggregating";
794 case Replacing: return "Replacing";
795 case Graphite: return "Graphite";
796 case VersionedCollapsing: return "VersionedCollapsing";
797 }
798
799 __builtin_unreachable();
800}
801
802
803Int64 MergeTreeData::getMaxBlockNumber() const
804{
805 auto lock = lockParts();
806
807 Int64 max_block_num = 0;
808 for (const DataPartPtr & part : data_parts_by_info)
809 max_block_num = std::max({max_block_num, part->info.max_block, part->info.mutation});
810
811 return max_block_num;
812}
813
814
815void MergeTreeData::loadDataParts(bool skip_sanity_checks)
816{
817 LOG_DEBUG(log, "Loading data parts");
818
819 const auto settings = getSettings();
820 std::vector<std::pair<String, DiskPtr>> part_names_with_disks;
821 Strings part_file_names;
822 Poco::DirectoryIterator end;
823
824 auto disks = storage_policy->getDisks();
825
826 /// Only check if user did touch storage configuration for this table.
827 if (!getStoragePolicy()->isDefaultPolicy() && !skip_sanity_checks)
828 {
829 /// Check extra parts at different disks, in order to not allow to miss data parts at undefined disks.
830 std::unordered_set<String> defined_disk_names;
831 for (const auto & disk_ptr : disks)
832 defined_disk_names.insert(disk_ptr->getName());
833
834 for (auto & [disk_name, disk_ptr] : global_context.getDiskSelector().getDisksMap())
835 {
836 if (defined_disk_names.count(disk_name) == 0 && Poco::File(getFullPathOnDisk(disk_ptr)).exists())
837 {
838 for (Poco::DirectoryIterator it(getFullPathOnDisk(disk_ptr)); it != end; ++it)
839 {
840 MergeTreePartInfo part_info;
841 if (MergeTreePartInfo::tryParsePartName(it.name(), &part_info, format_version))
842 throw Exception("Part " + backQuote(it.name()) + " was found on disk " + backQuote(disk_name) + " which is not defined in the storage policy", ErrorCodes::UNKNOWN_DISK);
843 }
844 }
845 }
846 }
847
848 /// Reversed order to load part from low priority disks firstly.
849 /// Used for keep part on low priority disk if duplication found
850 for (auto disk_it = disks.rbegin(); disk_it != disks.rend(); ++disk_it)
851 {
852 auto disk_ptr = *disk_it;
853 for (Poco::DirectoryIterator it(getFullPathOnDisk(disk_ptr)); it != end; ++it)
854 {
855 /// Skip temporary directories.
856 if (startsWith(it.name(), "tmp"))
857 continue;
858
859 part_names_with_disks.emplace_back(it.name(), disk_ptr);
860 }
861 }
862
863 auto part_lock = lockParts();
864 data_parts_indexes.clear();
865
866 if (part_names_with_disks.empty())
867 {
868 LOG_DEBUG(log, "There is no data parts");
869 return;
870 }
871
872 /// Parallel loading of data parts.
873 size_t num_threads = std::min(size_t(settings->max_part_loading_threads), part_names_with_disks.size());
874
875 std::mutex mutex;
876
877 DataPartsVector broken_parts_to_remove;
878 DataPartsVector broken_parts_to_detach;
879 size_t suspicious_broken_parts = 0;
880
881 std::atomic<bool> has_adaptive_parts = false;
882 std::atomic<bool> has_non_adaptive_parts = false;
883
884 ThreadPool pool(num_threads);
885
886 for (size_t i = 0; i < part_names_with_disks.size(); ++i)
887 {
888 pool.scheduleOrThrowOnError([&, i]
889 {
890 const auto & part_name = part_names_with_disks[i].first;
891 const auto part_disk_ptr = part_names_with_disks[i].second;
892 MergeTreePartInfo part_info;
893 if (!MergeTreePartInfo::tryParsePartName(part_name, &part_info, format_version))
894 return;
895
896 MutableDataPartPtr part = std::make_shared<DataPart>(*this, part_disk_ptr, part_name, part_info);
897 part->relative_path = part_name;
898 bool broken = false;
899
900 Poco::Path part_path(getFullPathOnDisk(part_disk_ptr), part_name);
901 Poco::Path marker_path(part_path, DELETE_ON_DESTROY_MARKER_PATH);
902 if (Poco::File(marker_path).exists())
903 {
904 LOG_WARNING(log, "Detaching stale part " << getFullPathOnDisk(part_disk_ptr) << part_name << ", which should have been deleted after a move. That can only happen after unclean restart of ClickHouse after move of a part having an operation blocking that stale copy of part.");
905 std::lock_guard loading_lock(mutex);
906 broken_parts_to_detach.push_back(part);
907 ++suspicious_broken_parts;
908 return;
909 }
910
911 try
912 {
913 part->loadColumnsChecksumsIndexes(require_part_metadata, true);
914 }
915 catch (const Exception & e)
916 {
917 /// Don't count the part as broken if there is not enough memory to load it.
918 /// In fact, there can be many similar situations.
919 /// But it is OK, because there is a safety guard against deleting too many parts.
920 if (e.code() == ErrorCodes::MEMORY_LIMIT_EXCEEDED
921 || e.code() == ErrorCodes::CANNOT_ALLOCATE_MEMORY
922 || e.code() == ErrorCodes::CANNOT_MUNMAP
923 || e.code() == ErrorCodes::CANNOT_MREMAP)
924 throw;
925
926 broken = true;
927 tryLogCurrentException(__PRETTY_FUNCTION__);
928 }
929 catch (...)
930 {
931 broken = true;
932 tryLogCurrentException(__PRETTY_FUNCTION__);
933 }
934
935 /// Ignore and possibly delete broken parts that can appear as a result of hard server restart.
936 if (broken)
937 {
938 if (part->info.level == 0)
939 {
940 /// It is impossible to restore level 0 parts.
941 LOG_ERROR(log, "Considering to remove broken part " << getFullPathOnDisk(part_disk_ptr) << part_name << " because it's impossible to repair.");
942 std::lock_guard loading_lock(mutex);
943 broken_parts_to_remove.push_back(part);
944 }
945 else
946 {
947 /// Count the number of parts covered by the broken part. If it is at least two, assume that
948 /// the broken part was created as a result of merging them and we won't lose data if we
949 /// delete it.
950 size_t contained_parts = 0;
951
952 LOG_ERROR(log, "Part " << getFullPathOnDisk(part_disk_ptr) << part_name << " is broken. Looking for parts to replace it.");
953
954 for (const auto & [contained_name, contained_disk_ptr] : part_names_with_disks)
955 {
956 if (contained_name == part_name)
957 continue;
958
959 MergeTreePartInfo contained_part_info;
960 if (!MergeTreePartInfo::tryParsePartName(contained_name, &contained_part_info, format_version))
961 continue;
962
963 if (part->info.contains(contained_part_info))
964 {
965 LOG_ERROR(log, "Found part " << getFullPathOnDisk(contained_disk_ptr) << contained_name);
966 ++contained_parts;
967 }
968 }
969
970 if (contained_parts >= 2)
971 {
972 LOG_ERROR(log, "Considering to remove broken part " << getFullPathOnDisk(part_disk_ptr) << part_name << " because it covers at least 2 other parts");
973 std::lock_guard loading_lock(mutex);
974 broken_parts_to_remove.push_back(part);
975 }
976 else
977 {
978 LOG_ERROR(log, "Detaching broken part " << getFullPathOnDisk(part_disk_ptr) << part_name
979 << " because it covers less than 2 parts. You need to resolve this manually");
980 std::lock_guard loading_lock(mutex);
981 broken_parts_to_detach.push_back(part);
982 ++suspicious_broken_parts;
983 }
984 }
985
986 return;
987 }
988 if (!part->index_granularity_info.is_adaptive)
989 has_non_adaptive_parts.store(true, std::memory_order_relaxed);
990 else
991 has_adaptive_parts.store(true, std::memory_order_relaxed);
992
993 part->modification_time = Poco::File(getFullPathOnDisk(part_disk_ptr) + part_name).getLastModified().epochTime();
994 /// Assume that all parts are Committed, covered parts will be detected and marked as Outdated later
995 part->state = DataPartState::Committed;
996
997 std::lock_guard loading_lock(mutex);
998 if (!data_parts_indexes.insert(part).second)
999 throw Exception("Part " + part->name + " already exists", ErrorCodes::DUPLICATE_DATA_PART);
1000 });
1001 }
1002
1003 pool.wait();
1004
1005 if (has_non_adaptive_parts && has_adaptive_parts && !settings->enable_mixed_granularity_parts)
1006 throw Exception("Table contains parts with adaptive and non adaptive marks, but `setting enable_mixed_granularity_parts` is disabled", ErrorCodes::LOGICAL_ERROR);
1007
1008 has_non_adaptive_index_granularity_parts = has_non_adaptive_parts;
1009
1010 if (suspicious_broken_parts > settings->max_suspicious_broken_parts && !skip_sanity_checks)
1011 throw Exception("Suspiciously many (" + toString(suspicious_broken_parts) + ") broken parts to remove.",
1012 ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS);
1013
1014 for (auto & part : broken_parts_to_remove)
1015 part->remove();
1016 for (auto & part : broken_parts_to_detach)
1017 part->renameToDetached("");
1018
1019 /// Delete from the set of current parts those parts that are covered by another part (those parts that
1020 /// were merged), but that for some reason are still not deleted from the filesystem.
1021 /// Deletion of files will be performed later in the clearOldParts() method.
1022
1023 if (data_parts_indexes.size() >= 2)
1024 {
1025 /// Now all parts are committed, so data_parts_by_state_and_info == committed_parts_range
1026 auto prev_jt = data_parts_by_state_and_info.begin();
1027 auto curr_jt = std::next(prev_jt);
1028
1029 auto deactivate_part = [&] (DataPartIteratorByStateAndInfo it)
1030 {
1031 (*it)->remove_time.store((*it)->modification_time, std::memory_order_relaxed);
1032 modifyPartState(it, DataPartState::Outdated);
1033 };
1034
1035 (*prev_jt)->assertState({DataPartState::Committed});
1036
1037 while (curr_jt != data_parts_by_state_and_info.end() && (*curr_jt)->state == DataPartState::Committed)
1038 {
1039 /// Don't consider data parts belonging to different partitions.
1040 if ((*curr_jt)->info.partition_id != (*prev_jt)->info.partition_id)
1041 {
1042 ++prev_jt;
1043 ++curr_jt;
1044 continue;
1045 }
1046
1047 if ((*curr_jt)->contains(**prev_jt))
1048 {
1049 deactivate_part(prev_jt);
1050 prev_jt = curr_jt;
1051 ++curr_jt;
1052 }
1053 else if ((*prev_jt)->contains(**curr_jt))
1054 {
1055 auto next = std::next(curr_jt);
1056 deactivate_part(curr_jt);
1057 curr_jt = next;
1058 }
1059 else
1060 {
1061 ++prev_jt;
1062 ++curr_jt;
1063 }
1064 }
1065 }
1066
1067 calculateColumnSizesImpl();
1068
1069 LOG_DEBUG(log, "Loaded data parts (" << data_parts_indexes.size() << " items)");
1070}
1071
1072
1073/// Is the part directory old.
1074/// True if its modification time and the modification time of all files inside it is less then threshold.
1075/// (Only files on the first level of nesting are considered).
1076static bool isOldPartDirectory(Poco::File & directory, time_t threshold)
1077{
1078 if (directory.getLastModified().epochTime() >= threshold)
1079 return false;
1080
1081 Poco::DirectoryIterator end;
1082 for (Poco::DirectoryIterator it(directory); it != end; ++it)
1083 if (it->getLastModified().epochTime() >= threshold)
1084 return false;
1085
1086 return true;
1087}
1088
1089
1090void MergeTreeData::clearOldTemporaryDirectories(ssize_t custom_directories_lifetime_seconds)
1091{
1092 /// If the method is already called from another thread, then we don't need to do anything.
1093 std::unique_lock lock(clear_old_temporary_directories_mutex, std::defer_lock);
1094 if (!lock.try_lock())
1095 return;
1096
1097 const auto settings = getSettings();
1098 time_t current_time = time(nullptr);
1099 ssize_t deadline = (custom_directories_lifetime_seconds >= 0)
1100 ? current_time - custom_directories_lifetime_seconds
1101 : current_time - settings->temporary_directories_lifetime.totalSeconds();
1102
1103 const auto full_paths = getDataPaths();
1104
1105 /// Delete temporary directories older than a day.
1106 Poco::DirectoryIterator end;
1107 for (auto && full_data_path : full_paths)
1108 {
1109 for (Poco::DirectoryIterator it{full_data_path}; it != end; ++it)
1110 {
1111 if (startsWith(it.name(), "tmp_"))
1112 {
1113 Poco::File tmp_dir(full_data_path + it.name());
1114
1115 try
1116 {
1117 if (tmp_dir.isDirectory() && isOldPartDirectory(tmp_dir, deadline))
1118 {
1119 LOG_WARNING(log, "Removing temporary directory " << full_data_path << it.name());
1120 Poco::File(full_data_path + it.name()).remove(true);
1121 }
1122 }
1123 catch (const Poco::FileNotFoundException &)
1124 {
1125 /// If the file is already deleted, do nothing.
1126 }
1127 }
1128 }
1129 }
1130}
1131
1132
1133MergeTreeData::DataPartsVector MergeTreeData::grabOldParts()
1134{
1135 DataPartsVector res;
1136
1137 /// If the method is already called from another thread, then we don't need to do anything.
1138 std::unique_lock lock(grab_old_parts_mutex, std::defer_lock);
1139 if (!lock.try_lock())
1140 return res;
1141
1142 time_t now = time(nullptr);
1143 std::vector<DataPartIteratorByStateAndInfo> parts_to_delete;
1144
1145 {
1146 auto parts_lock = lockParts();
1147
1148 auto outdated_parts_range = getDataPartsStateRange(DataPartState::Outdated);
1149 for (auto it = outdated_parts_range.begin(); it != outdated_parts_range.end(); ++it)
1150 {
1151 const DataPartPtr & part = *it;
1152
1153 auto part_remove_time = part->remove_time.load(std::memory_order_relaxed);
1154
1155 if (part.unique() && /// Grab only parts that are not used by anyone (SELECTs for example).
1156 part_remove_time < now &&
1157 now - part_remove_time > getSettings()->old_parts_lifetime.totalSeconds())
1158 {
1159 parts_to_delete.emplace_back(it);
1160 }
1161 }
1162
1163 res.reserve(parts_to_delete.size());
1164 for (const auto & it_to_delete : parts_to_delete)
1165 {
1166 res.emplace_back(*it_to_delete);
1167 modifyPartState(it_to_delete, DataPartState::Deleting);
1168 }
1169 }
1170
1171 if (!res.empty())
1172 LOG_TRACE(log, "Found " << res.size() << " old parts to remove.");
1173
1174 return res;
1175}
1176
1177
1178void MergeTreeData::rollbackDeletingParts(const MergeTreeData::DataPartsVector & parts)
1179{
1180 auto lock = lockParts();
1181 for (auto & part : parts)
1182 {
1183 /// We should modify it under data_parts_mutex
1184 part->assertState({DataPartState::Deleting});
1185 modifyPartState(part, DataPartState::Outdated);
1186 }
1187}
1188
1189void MergeTreeData::removePartsFinally(const MergeTreeData::DataPartsVector & parts)
1190{
1191 {
1192 auto lock = lockParts();
1193
1194 /// TODO: use data_parts iterators instead of pointers
1195 for (auto & part : parts)
1196 {
1197 auto it = data_parts_by_info.find(part->info);
1198 if (it == data_parts_by_info.end())
1199 throw Exception("Deleting data part " + part->name + " doesn't exist", ErrorCodes::LOGICAL_ERROR);
1200
1201 (*it)->assertState({DataPartState::Deleting});
1202
1203 data_parts_indexes.erase(it);
1204 }
1205 }
1206
1207 /// Data parts is still alive (since DataPartsVector holds shared_ptrs) and contain useful metainformation for logging
1208 /// NOTE: There is no need to log parts deletion somewhere else, all deleting parts pass through this function and pass away
1209 if (auto part_log = global_context.getPartLog(database_name))
1210 {
1211 PartLogElement part_log_elem;
1212
1213 part_log_elem.event_type = PartLogElement::REMOVE_PART;
1214 part_log_elem.event_time = time(nullptr);
1215 part_log_elem.duration_ms = 0;
1216
1217 part_log_elem.database_name = database_name;
1218 part_log_elem.table_name = table_name;
1219
1220 for (auto & part : parts)
1221 {
1222 part_log_elem.partition_id = part->info.partition_id;
1223 part_log_elem.part_name = part->name;
1224 part_log_elem.bytes_compressed_on_disk = part->bytes_on_disk;
1225 part_log_elem.rows = part->rows_count;
1226
1227 part_log->add(part_log_elem);
1228 }
1229 }
1230}
1231
1232void MergeTreeData::clearOldPartsFromFilesystem()
1233{
1234 DataPartsVector parts_to_remove = grabOldParts();
1235 clearPartsFromFilesystem(parts_to_remove);
1236 removePartsFinally(parts_to_remove);
1237}
1238
1239void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts_to_remove)
1240{
1241 const auto settings = getSettings();
1242 if (parts_to_remove.size() > 1 && settings->max_part_removal_threads > 1 && parts_to_remove.size() > settings->concurrent_part_removal_threshold)
1243 {
1244 /// Parallel parts removal.
1245
1246 size_t num_threads = std::min(size_t(settings->max_part_removal_threads), parts_to_remove.size());
1247 ThreadPool pool(num_threads);
1248
1249 /// NOTE: Under heavy system load you may get "Cannot schedule a task" from ThreadPool.
1250 for (const DataPartPtr & part : parts_to_remove)
1251 {
1252 pool.scheduleOrThrowOnError([&]
1253 {
1254 LOG_DEBUG(log, "Removing part from filesystem " << part->name);
1255 part->remove();
1256 });
1257 }
1258
1259 pool.wait();
1260 }
1261 else
1262 {
1263 for (const DataPartPtr & part : parts_to_remove)
1264 {
1265 LOG_DEBUG(log, "Removing part from filesystem " << part->name);
1266 part->remove();
1267 }
1268 }
1269}
1270
1271void MergeTreeData::rename(
1272 const String & new_table_path, const String & new_database_name,
1273 const String & new_table_name, TableStructureWriteLockHolder &)
1274{
1275 auto disks = storage_policy->getDisks();
1276
1277 for (const auto & disk : disks)
1278 {
1279 if (disk->exists(new_table_path))
1280 throw Exception{"Target path already exists: " + fullPath(disk, new_table_path), ErrorCodes::DIRECTORY_ALREADY_EXISTS};
1281 }
1282
1283 for (const auto & disk : disks)
1284 {
1285 auto new_table_path_parent = Poco::Path(new_table_path).makeParent().toString();
1286 disk->createDirectory(new_table_path_parent);
1287 disk->moveFile(relative_data_path, new_table_path);
1288 }
1289
1290 global_context.dropCaches();
1291
1292 relative_data_path = new_table_path;
1293 table_name = new_table_name;
1294 database_name = new_database_name;
1295}
1296
1297void MergeTreeData::dropAllData()
1298{
1299 LOG_TRACE(log, "dropAllData: waiting for locks.");
1300
1301 auto lock = lockParts();
1302
1303 LOG_TRACE(log, "dropAllData: removing data from memory.");
1304
1305 DataPartsVector all_parts(data_parts_by_info.begin(), data_parts_by_info.end());
1306
1307 data_parts_indexes.clear();
1308 column_sizes.clear();
1309
1310 global_context.dropCaches();
1311
1312 LOG_TRACE(log, "dropAllData: removing data from filesystem.");
1313
1314 /// Removing of each data part before recursive removal of directory is to speed-up removal, because there will be less number of syscalls.
1315 clearPartsFromFilesystem(all_parts);
1316
1317 auto full_paths = getDataPaths();
1318
1319 for (auto && full_data_path : full_paths)
1320 Poco::File(full_data_path).remove(true);
1321
1322 LOG_TRACE(log, "dropAllData: done.");
1323}
1324
1325namespace
1326{
1327
1328/// If true, then in order to ALTER the type of the column from the type from to the type to
1329/// we don't need to rewrite the data, we only need to update metadata and columns.txt in part directories.
1330/// The function works for Arrays and Nullables of the same structure.
1331bool isMetadataOnlyConversion(const IDataType * from, const IDataType * to)
1332{
1333 if (from->getName() == to->getName())
1334 return true;
1335
1336 static const std::unordered_multimap<std::type_index, const std::type_info &> ALLOWED_CONVERSIONS =
1337 {
1338 { typeid(DataTypeEnum8), typeid(DataTypeEnum8) },
1339 { typeid(DataTypeEnum8), typeid(DataTypeInt8) },
1340 { typeid(DataTypeEnum16), typeid(DataTypeEnum16) },
1341 { typeid(DataTypeEnum16), typeid(DataTypeInt16) },
1342 { typeid(DataTypeDateTime), typeid(DataTypeUInt32) },
1343 { typeid(DataTypeUInt32), typeid(DataTypeDateTime) },
1344 { typeid(DataTypeDate), typeid(DataTypeUInt16) },
1345 { typeid(DataTypeUInt16), typeid(DataTypeDate) },
1346 };
1347
1348 while (true)
1349 {
1350 auto it_range = ALLOWED_CONVERSIONS.equal_range(typeid(*from));
1351 for (auto it = it_range.first; it != it_range.second; ++it)
1352 {
1353 if (it->second == typeid(*to))
1354 return true;
1355 }
1356
1357 const auto * arr_from = typeid_cast<const DataTypeArray *>(from);
1358 const auto * arr_to = typeid_cast<const DataTypeArray *>(to);
1359 if (arr_from && arr_to)
1360 {
1361 from = arr_from->getNestedType().get();
1362 to = arr_to->getNestedType().get();
1363 continue;
1364 }
1365
1366 const auto * nullable_from = typeid_cast<const DataTypeNullable *>(from);
1367 const auto * nullable_to = typeid_cast<const DataTypeNullable *>(to);
1368 if (nullable_from && nullable_to)
1369 {
1370 from = nullable_from->getNestedType().get();
1371 to = nullable_to->getNestedType().get();
1372 continue;
1373 }
1374
1375 return false;
1376 }
1377}
1378
1379}
1380
1381void MergeTreeData::checkAlterIsPossible(const AlterCommands & commands, const Settings & settings)
1382{
1383 /// Check that needed transformations can be applied to the list of columns without considering type conversions.
1384 StorageInMemoryMetadata metadata = getInMemoryMetadata();
1385 commands.apply(metadata);
1386 if (getIndices().empty() && !metadata.indices.empty() &&
1387 !settings.allow_experimental_data_skipping_indices)
1388 throw Exception("You must set the setting `allow_experimental_data_skipping_indices` to 1 " \
1389 "before using data skipping indices.", ErrorCodes::BAD_ARGUMENTS);
1390
1391 /// Set of columns that shouldn't be altered.
1392 NameSet columns_alter_type_forbidden;
1393
1394 /// Primary key columns can be ALTERed only if they are used in the key as-is
1395 /// (and not as a part of some expression) and if the ALTER only affects column metadata.
1396 NameSet columns_alter_type_metadata_only;
1397
1398 if (partition_key_expr)
1399 {
1400 /// Forbid altering partition key columns because it can change partition ID format.
1401 /// TODO: in some cases (e.g. adding an Enum value) a partition key column can still be ALTERed.
1402 /// We should allow it.
1403 for (const String & col : partition_key_expr->getRequiredColumns())
1404 columns_alter_type_forbidden.insert(col);
1405 }
1406
1407 for (const auto & index : skip_indices)
1408 {
1409 for (const String & col : index->expr->getRequiredColumns())
1410 columns_alter_type_forbidden.insert(col);
1411 }
1412
1413 if (sorting_key_expr)
1414 {
1415 for (const ExpressionAction & action : sorting_key_expr->getActions())
1416 {
1417 auto action_columns = action.getNeededColumns();
1418 columns_alter_type_forbidden.insert(action_columns.begin(), action_columns.end());
1419 }
1420 for (const String & col : sorting_key_expr->getRequiredColumns())
1421 columns_alter_type_metadata_only.insert(col);
1422
1423 /// We don't process sample_by_ast separately because it must be among the primary key columns
1424 /// and we don't process primary_key_expr separately because it is a prefix of sorting_key_expr.
1425 }
1426 if (!merging_params.sign_column.empty())
1427 columns_alter_type_forbidden.insert(merging_params.sign_column);
1428
1429 std::map<String, const IDataType *> old_types;
1430 for (const auto & column : getColumns().getAllPhysical())
1431 old_types.emplace(column.name, column.type.get());
1432
1433 for (const AlterCommand & command : commands)
1434 {
1435 if (command.type == AlterCommand::MODIFY_ORDER_BY && !is_custom_partitioned)
1436 {
1437 throw Exception(
1438 "ALTER MODIFY ORDER BY is not supported for default-partitioned tables created with the old syntax",
1439 ErrorCodes::BAD_ARGUMENTS);
1440 }
1441 else if (command.isModifyingData())
1442 {
1443 if (columns_alter_type_forbidden.count(command.column_name))
1444 throw Exception("Trying to ALTER key column " + command.column_name, ErrorCodes::ILLEGAL_COLUMN);
1445
1446 if (columns_alter_type_metadata_only.count(command.column_name))
1447 {
1448 if (command.type == AlterCommand::MODIFY_COLUMN)
1449 {
1450 auto it = old_types.find(command.column_name);
1451 if (it == old_types.end() || !isMetadataOnlyConversion(it->second, command.data_type.get()))
1452 throw Exception("ALTER of key column " + command.column_name + " must be metadata-only", ErrorCodes::ILLEGAL_COLUMN);
1453 }
1454 }
1455 }
1456 }
1457
1458 setProperties(metadata, /* only_check = */ true);
1459
1460 setTTLExpressions(metadata.columns.getColumnTTLs(), metadata.ttl_for_table_ast, /* only_check = */ true);
1461
1462 if (settings_ast)
1463 {
1464 const auto & current_changes = settings_ast->as<const ASTSetQuery &>().changes;
1465 for (const auto & changed_setting : metadata.settings_ast->as<const ASTSetQuery &>().changes)
1466 {
1467 if (MergeTreeSettings::findIndex(changed_setting.name) == MergeTreeSettings::npos)
1468 throw Exception{"Storage '" + getName() + "' doesn't have setting '" + changed_setting.name + "'",
1469 ErrorCodes::UNKNOWN_SETTING};
1470
1471 auto comparator = [&changed_setting](const auto & change) { return change.name == changed_setting.name; };
1472
1473 auto current_setting_it
1474 = std::find_if(current_changes.begin(), current_changes.end(), comparator);
1475
1476 if ((current_setting_it == current_changes.end() || *current_setting_it != changed_setting)
1477 && MergeTreeSettings::isReadonlySetting(changed_setting.name))
1478 {
1479 throw Exception{"Setting '" + changed_setting.name + "' is readonly for storage '" + getName() + "'",
1480 ErrorCodes::READONLY_SETTING};
1481 }
1482 }
1483 }
1484
1485 if (commands.isModifyingData())
1486 {
1487 /// Check that type conversions are possible.
1488 ExpressionActionsPtr unused_expression;
1489 NameToNameMap unused_map;
1490 bool unused_bool;
1491 createConvertExpression(nullptr, getColumns().getAllPhysical(), metadata.columns.getAllPhysical(),
1492 getIndices().indices, metadata.indices.indices, unused_expression, unused_map, unused_bool);
1493 }
1494}
1495
1496void MergeTreeData::createConvertExpression(const DataPartPtr & part, const NamesAndTypesList & old_columns,
1497 const NamesAndTypesList & new_columns, const IndicesASTs & old_indices, const IndicesASTs & new_indices,
1498 ExpressionActionsPtr & out_expression, NameToNameMap & out_rename_map, bool & out_force_update_metadata) const
1499{
1500 const auto settings = getSettings();
1501 out_expression = nullptr;
1502 out_rename_map = {};
1503 out_force_update_metadata = false;
1504 String part_mrk_file_extension;
1505 if (part)
1506 part_mrk_file_extension = part->index_granularity_info.marks_file_extension;
1507 else
1508 part_mrk_file_extension = settings->index_granularity_bytes == 0 ? getNonAdaptiveMrkExtension() : getAdaptiveMrkExtension();
1509
1510 using NameToType = std::map<String, const IDataType *>;
1511 NameToType new_types;
1512 for (const NameAndTypePair & column : new_columns)
1513 new_types.emplace(column.name, column.type.get());
1514
1515 /// For every column that need to be converted: source column name, column name of calculated expression for conversion.
1516 std::vector<std::pair<String, String>> conversions;
1517
1518
1519 /// Remove old indices
1520 std::unordered_set<String> new_indices_set;
1521 for (const auto & index_decl : new_indices)
1522 new_indices_set.emplace(index_decl->as<ASTIndexDeclaration &>().name);
1523 for (const auto & index_decl : old_indices)
1524 {
1525 const auto & index = index_decl->as<ASTIndexDeclaration &>();
1526 if (!new_indices_set.count(index.name))
1527 {
1528 out_rename_map["skp_idx_" + index.name + ".idx"] = ""; /// drop this file
1529 out_rename_map["skp_idx_" + index.name + part_mrk_file_extension] = ""; /// and this one
1530 }
1531 }
1532
1533 /// Collect counts for shared streams of different columns. As an example, Nested columns have shared stream with array sizes.
1534 std::map<String, size_t> stream_counts;
1535 for (const NameAndTypePair & column : old_columns)
1536 {
1537 column.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
1538 {
1539 ++stream_counts[IDataType::getFileNameForStream(column.name, substream_path)];
1540 }, {});
1541 }
1542
1543 for (const NameAndTypePair & column : old_columns)
1544 {
1545 if (!new_types.count(column.name))
1546 {
1547 /// The column was deleted.
1548 if (!part || part->hasColumnFiles(column.name, *column.type))
1549 {
1550 column.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
1551 {
1552 String file_name = IDataType::getFileNameForStream(column.name, substream_path);
1553
1554 /// Delete files if they are no longer shared with another column.
1555 if (--stream_counts[file_name] == 0)
1556 {
1557 out_rename_map[file_name + ".bin"] = ""; /// drop this file
1558 out_rename_map[file_name + part_mrk_file_extension] = ""; /// and this one
1559 }
1560 }, {});
1561 }
1562 }
1563 else
1564 {
1565 /// The column was converted. Collect conversions.
1566 const auto * new_type = new_types[column.name];
1567 const String new_type_name = new_type->getName();
1568 const auto * old_type = column.type.get();
1569
1570 if (!new_type->equals(*old_type) && (!part || part->hasColumnFiles(column.name, *column.type)))
1571 {
1572 if (isMetadataOnlyConversion(old_type, new_type))
1573 {
1574 out_force_update_metadata = true;
1575 continue;
1576 }
1577
1578 /// Need to modify column type.
1579 if (!out_expression)
1580 out_expression = std::make_shared<ExpressionActions>(NamesAndTypesList(), global_context);
1581
1582 out_expression->addInput(ColumnWithTypeAndName(nullptr, column.type, column.name));
1583
1584 Names out_names;
1585
1586 /// This is temporary name for expression. TODO Invent the name more safely.
1587 const String new_type_name_column = '#' + new_type_name + "_column";
1588 out_expression->add(ExpressionAction::addColumn(
1589 { DataTypeString().createColumnConst(1, new_type_name), std::make_shared<DataTypeString>(), new_type_name_column }));
1590
1591 const auto & function = FunctionFactory::instance().get("CAST", global_context);
1592 out_expression->add(ExpressionAction::applyFunction(
1593 function, Names{column.name, new_type_name_column}), out_names);
1594
1595 out_expression->add(ExpressionAction::removeColumn(new_type_name_column));
1596 out_expression->add(ExpressionAction::removeColumn(column.name));
1597
1598 conversions.emplace_back(column.name, out_names.at(0));
1599
1600 }
1601 }
1602 }
1603
1604 if (!conversions.empty())
1605 {
1606 /// Give proper names for temporary columns with conversion results.
1607
1608 NamesWithAliases projection;
1609 projection.reserve(conversions.size());
1610
1611 for (const auto & source_and_expression : conversions)
1612 {
1613 /// Column name for temporary filenames before renaming. NOTE The is unnecessarily tricky.
1614
1615 String original_column_name = source_and_expression.first;
1616 String temporary_column_name = original_column_name + " converting";
1617
1618 projection.emplace_back(source_and_expression.second, temporary_column_name);
1619
1620 /// After conversion, we need to rename temporary files into original.
1621
1622 new_types[source_and_expression.first]->enumerateStreams(
1623 [&](const IDataType::SubstreamPath & substream_path)
1624 {
1625 /// Skip array sizes, because they cannot be modified in ALTER.
1626 if (!substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes)
1627 return;
1628
1629 String original_file_name = IDataType::getFileNameForStream(original_column_name, substream_path);
1630 String temporary_file_name = IDataType::getFileNameForStream(temporary_column_name, substream_path);
1631
1632 out_rename_map[temporary_file_name + ".bin"] = original_file_name + ".bin";
1633 out_rename_map[temporary_file_name + part_mrk_file_extension] = original_file_name + part_mrk_file_extension;
1634 }, {});
1635 }
1636
1637 out_expression->add(ExpressionAction::project(projection));
1638 }
1639
1640 if (part && !out_rename_map.empty())
1641 {
1642 WriteBufferFromOwnString out;
1643 out << "Will ";
1644 bool first = true;
1645 for (const auto & from_to : out_rename_map)
1646 {
1647 if (!first)
1648 out << ", ";
1649 first = false;
1650 if (from_to.second.empty())
1651 out << "remove " << from_to.first;
1652 else
1653 out << "rename " << from_to.first << " to " << from_to.second;
1654 }
1655 out << " in part " << part->name;
1656 LOG_DEBUG(log, out.str());
1657 }
1658}
1659
1660void MergeTreeData::alterDataPart(
1661 const NamesAndTypesList & new_columns,
1662 const IndicesASTs & new_indices,
1663 bool skip_sanity_checks,
1664 AlterDataPartTransactionPtr & transaction)
1665{
1666 const auto settings = getSettings();
1667 ExpressionActionsPtr expression;
1668 const auto & part = transaction->getDataPart();
1669 bool force_update_metadata;
1670 createConvertExpression(part, part->columns, new_columns,
1671 getIndices().indices, new_indices,
1672 expression, transaction->rename_map, force_update_metadata);
1673
1674 size_t num_files_to_modify = transaction->rename_map.size();
1675 size_t num_files_to_remove = 0;
1676
1677 for (const auto & from_to : transaction->rename_map)
1678 if (from_to.second.empty())
1679 ++num_files_to_remove;
1680
1681 if (!skip_sanity_checks
1682 && (num_files_to_modify > settings->max_files_to_modify_in_alter_columns
1683 || num_files_to_remove > settings->max_files_to_remove_in_alter_columns))
1684 {
1685 transaction->clear();
1686
1687 const bool forbidden_because_of_modify = num_files_to_modify > settings->max_files_to_modify_in_alter_columns;
1688
1689 std::stringstream exception_message;
1690 exception_message
1691 << "Suspiciously many ("
1692 << (forbidden_because_of_modify ? num_files_to_modify : num_files_to_remove)
1693 << ") files (";
1694
1695 bool first = true;
1696 for (const auto & from_to : transaction->rename_map)
1697 {
1698 if (!first)
1699 exception_message << ", ";
1700 if (forbidden_because_of_modify)
1701 {
1702 exception_message << "from " << backQuote(from_to.first) << " to " << backQuote(from_to.second);
1703 first = false;
1704 }
1705 else if (from_to.second.empty())
1706 {
1707 exception_message << backQuote(from_to.first);
1708 first = false;
1709 }
1710 }
1711
1712 exception_message
1713 << ") need to be "
1714 << (forbidden_because_of_modify ? "modified" : "removed")
1715 << " in part " << part->name << " of table at " << part->getFullPath() << ". Aborting just in case."
1716 << " If it is not an error, you could increase merge_tree/"
1717 << (forbidden_because_of_modify ? "max_files_to_modify_in_alter_columns" : "max_files_to_remove_in_alter_columns")
1718 << " parameter in configuration file (current value: "
1719 << (forbidden_because_of_modify ? settings->max_files_to_modify_in_alter_columns : settings->max_files_to_remove_in_alter_columns)
1720 << ")";
1721
1722 throw Exception(exception_message.str(), ErrorCodes::TABLE_DIFFERS_TOO_MUCH);
1723 }
1724
1725 DataPart::Checksums add_checksums;
1726
1727 if (transaction->rename_map.empty() && !force_update_metadata)
1728 {
1729 transaction->clear();
1730 return;
1731 }
1732
1733 /// Apply the expression and write the result to temporary files.
1734 if (expression)
1735 {
1736 BlockInputStreamPtr part_in = std::make_shared<MergeTreeSequentialBlockInputStream>(
1737 *this, part, expression->getRequiredColumns(), false, /* take_column_types_from_storage = */ false);
1738
1739
1740 auto compression_codec = global_context.chooseCompressionCodec(
1741 part->bytes_on_disk,
1742 static_cast<double>(part->bytes_on_disk) / this->getTotalActiveSizeInBytes());
1743 ExpressionBlockInputStream in(part_in, expression);
1744
1745 /** Don't write offsets for arrays, because ALTER never change them
1746 * (MODIFY COLUMN could only change types of elements but never modify array sizes).
1747 * Also note that they does not participate in 'rename_map'.
1748 * Also note, that for columns, that are parts of Nested,
1749 * temporary column name ('converting_column_name') created in 'createConvertExpression' method
1750 * will have old name of shared offsets for arrays.
1751 */
1752 IMergedBlockOutputStream::WrittenOffsetColumns unused_written_offsets;
1753
1754 MergedColumnOnlyOutputStream out(
1755 *this,
1756 in.getHeader(),
1757 part->getFullPath(),
1758 true /* sync */,
1759 compression_codec,
1760 true /* skip_offsets */,
1761 /// Don't recalc indices because indices alter is restricted
1762 std::vector<MergeTreeIndexPtr>{},
1763 unused_written_offsets,
1764 part->index_granularity,
1765 &part->index_granularity_info);
1766
1767 in.readPrefix();
1768 out.writePrefix();
1769
1770 while (Block b = in.read())
1771 out.write(b);
1772
1773 in.readSuffix();
1774 add_checksums = out.writeSuffixAndGetChecksums();
1775 }
1776
1777 /// Update the checksums.
1778 DataPart::Checksums new_checksums = part->checksums;
1779 for (auto it : transaction->rename_map)
1780 {
1781 if (it.second.empty())
1782 new_checksums.files.erase(it.first);
1783 else
1784 new_checksums.files[it.second] = add_checksums.files[it.first];
1785 }
1786
1787 /// Write the checksums to the temporary file.
1788 if (!part->checksums.empty())
1789 {
1790 transaction->new_checksums = new_checksums;
1791 WriteBufferFromFile checksums_file(part->getFullPath() + "checksums.txt.tmp", 4096);
1792 new_checksums.write(checksums_file);
1793 transaction->rename_map["checksums.txt.tmp"] = "checksums.txt";
1794 }
1795
1796 /// Write the new column list to the temporary file.
1797 {
1798 transaction->new_columns = new_columns.filter(part->columns.getNames());
1799 WriteBufferFromFile columns_file(part->getFullPath() + "columns.txt.tmp", 4096);
1800 transaction->new_columns.writeText(columns_file);
1801 transaction->rename_map["columns.txt.tmp"] = "columns.txt";
1802 }
1803
1804 return;
1805}
1806
1807void MergeTreeData::changeSettings(
1808 const ASTPtr & new_settings,
1809 TableStructureWriteLockHolder & /* table_lock_holder */)
1810{
1811 if (new_settings)
1812 {
1813 const auto & new_changes = new_settings->as<const ASTSetQuery &>().changes;
1814 MergeTreeSettings copy = *getSettings();
1815 copy.applyChanges(new_changes);
1816 storage_settings.set(std::make_unique<const MergeTreeSettings>(copy));
1817 settings_ast = new_settings;
1818 }
1819}
1820
1821void MergeTreeData::removeEmptyColumnsFromPart(MergeTreeData::MutableDataPartPtr & data_part)
1822{
1823 auto & empty_columns = data_part->empty_columns;
1824 if (empty_columns.empty())
1825 return;
1826
1827 NamesAndTypesList new_columns;
1828 for (const auto & [name, type] : data_part->columns)
1829 if (!empty_columns.count(name))
1830 new_columns.emplace_back(name, type);
1831
1832 std::stringstream log_message;
1833 for (auto it = empty_columns.begin(); it != empty_columns.end(); ++it)
1834 {
1835 if (it != empty_columns.begin())
1836 log_message << ", ";
1837 log_message << *it;
1838 }
1839
1840 LOG_INFO(log, "Removing empty columns: " << log_message.str() << " from part " << data_part->name);
1841 AlterDataPartTransactionPtr transaction(new AlterDataPartTransaction(data_part));
1842 alterDataPart(new_columns, getIndices().indices, false, transaction);
1843 if (transaction->isValid())
1844 transaction->commit();
1845
1846 empty_columns.clear();
1847}
1848
1849void MergeTreeData::freezeAll(const String & with_name, const Context & context, TableStructureReadLockHolder &)
1850{
1851 freezePartitionsByMatcher([] (const DataPartPtr &){ return true; }, with_name, context);
1852}
1853
1854
1855bool MergeTreeData::AlterDataPartTransaction::isValid() const
1856{
1857 return valid && data_part;
1858}
1859
/// Marks the transaction as not valid, so that later calls to commit() and the destructor do nothing.
void MergeTreeData::AlterDataPartTransaction::clear()
{
    valid = false;
}
1864
/// Finalizes a prepared ALTER of a single part: moves the files listed in 'rename_map' into place
/// and updates the in-memory checksums, column list and size of the part.
/// Does nothing if the transaction is not valid. Whether it succeeds or throws, the transaction
/// is marked as not valid afterwards (see clear()); exceptions are rethrown to the caller.
void MergeTreeData::AlterDataPartTransaction::commit()
{
    if (!isValid())
        return;
    /// NOTE(review): isValid() already requires data_part to be set, so this check looks redundant.
    if (!data_part)
        return;

    try
    {
        /// Hold the part's columns lock exclusively while files and metadata are being swapped.
        std::unique_lock<std::shared_mutex> lock(data_part->columns_lock);

        String path = data_part->getFullPath();

        /// NOTE: checking that a file exists before renaming or deleting it
        /// is justified by the fact that, when converting an ordinary column
        /// to a nullable column, new files are created which did not exist
        /// before, i.e. they do not have older versions.

        /// 1) Rename the old files.
        /// For an entry with an empty target the old file is the source name itself;
        /// otherwise it is the file currently occupying the target name.
        for (const auto & from_to : rename_map)
        {
            String name = from_to.second.empty() ? from_to.first : from_to.second;
            Poco::File file{path + name};
            if (file.exists())
                file.renameTo(path + name + ".tmp2");
        }

        /// 2) Move new files in the place of old and update the metadata in memory.
        for (const auto & from_to : rename_map)
        {
            if (!from_to.second.empty())
                Poco::File{path + from_to.first}.renameTo(path + from_to.second);
        }

        /// The part is held through a pointer to const; constness is cast away to update its metadata.
        auto & mutable_part = const_cast<DataPart &>(*data_part);
        mutable_part.checksums = new_checksums;
        mutable_part.columns = new_columns;

        /// 3) Delete the old files and drop required columns (DROP COLUMN)
        for (const auto & from_to : rename_map)
        {
            String name = from_to.second.empty() ? from_to.first : from_to.second;
            Poco::File file{path + name + ".tmp2"};
            if (file.exists())
                file.remove();
        }

        mutable_part.bytes_on_disk = new_checksums.getTotalSizeOnDisk();

        /// TODO: we can skip resetting caches when the column is added.
        data_part->storage.global_context.dropCaches();

        /// Success: invalidate the transaction so the destructor does not try to roll anything back.
        clear();
    }
    catch (...)
    {
        /// Don't delete temporary files in the destructor in case something went wrong.
        clear();
        throw;
    }
}
1926
1927MergeTreeData::AlterDataPartTransaction::~AlterDataPartTransaction()
1928{
1929
1930 if (!isValid())
1931 return;
1932 if (!data_part)
1933 return;
1934
1935 try
1936 {
1937 LOG_WARNING(data_part->storage.log, "Aborting ALTER of part " << data_part->relative_path);
1938
1939 String path = data_part->getFullPath();
1940 for (const auto & from_to : rename_map)
1941 {
1942 if (!from_to.second.empty())
1943 {
1944 try
1945 {
1946 Poco::File file(path + from_to.first);
1947 if (file.exists())
1948 file.remove();
1949 }
1950 catch (Poco::Exception & e)
1951 {
1952 LOG_WARNING(data_part->storage.log, "Can't remove " << path + from_to.first << ": " << e.displayText());
1953 }
1954 }
1955 }
1956 }
1957 catch (...)
1958 {
1959 tryLogCurrentException(__PRETTY_FUNCTION__);
1960 }
1961}
1962
1963void MergeTreeData::PartsTemporaryRename::addPart(const String & old_name, const String & new_name)
1964{
1965 old_and_new_names.push_back({old_name, new_name});
1966 const auto paths = storage.getDataPaths();
1967 for (const auto & full_path : paths)
1968 {
1969 for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it)
1970 {
1971 String name = it.name();
1972 if (name == old_name)
1973 {
1974 old_part_name_to_full_path[old_name] = full_path;
1975 break;
1976 }
1977 }
1978 }
1979}
1980
1981void MergeTreeData::PartsTemporaryRename::tryRenameAll()
1982{
1983 renamed = true;
1984 for (size_t i = 0; i < old_and_new_names.size(); ++i)
1985 {
1986 try
1987 {
1988 const auto & names = old_and_new_names[i];
1989 if (names.first.empty() || names.second.empty())
1990 throw DB::Exception("Empty part name. Most likely it's a bug.", ErrorCodes::INCORRECT_FILE_NAME);
1991 const auto full_path = old_part_name_to_full_path[names.first] + source_dir; /// old_name
1992 Poco::File(full_path + names.first).renameTo(full_path + names.second);
1993 }
1994 catch (...)
1995 {
1996 old_and_new_names.resize(i);
1997 LOG_WARNING(storage.log, "Cannot rename parts to perform operation on them: " << getCurrentExceptionMessage(false));
1998 throw;
1999 }
2000 }
2001}
2002
2003MergeTreeData::PartsTemporaryRename::~PartsTemporaryRename()
2004{
2005 // TODO what if server had crashed before this destructor was called?
2006 if (!renamed)
2007 return;
2008 for (const auto & names : old_and_new_names)
2009 {
2010 if (names.first.empty())
2011 continue;
2012
2013 try
2014 {
2015 const auto full_path = old_part_name_to_full_path[names.first] + source_dir; /// old_name
2016 Poco::File(full_path + names.second).renameTo(full_path + names.first);
2017 }
2018 catch (...)
2019 {
2020 tryLogCurrentException(__PRETTY_FUNCTION__);
2021 }
2022 }
2023}
2024
2025
2026MergeTreeData::DataPartsVector MergeTreeData::getActivePartsToReplace(
2027 const MergeTreePartInfo & new_part_info,
2028 const String & new_part_name,
2029 DataPartPtr & out_covering_part,
2030 DataPartsLock & /* data_parts_lock */) const
2031{
2032 /// Parts contained in the part are consecutive in data_parts, intersecting the insertion place for the part itself.
2033 auto it_middle = data_parts_by_state_and_info.lower_bound(DataPartStateAndInfo{DataPartState::Committed, new_part_info});
2034 auto committed_parts_range = getDataPartsStateRange(DataPartState::Committed);
2035
2036 /// Go to the left.
2037 DataPartIteratorByStateAndInfo begin = it_middle;
2038 while (begin != committed_parts_range.begin())
2039 {
2040 auto prev = std::prev(begin);
2041
2042 if (!new_part_info.contains((*prev)->info))
2043 {
2044 if ((*prev)->info.contains(new_part_info))
2045 {
2046 out_covering_part = *prev;
2047 return {};
2048 }
2049
2050 if (!new_part_info.isDisjoint((*prev)->info))
2051 throw Exception("Part " + new_part_name + " intersects previous part " + (*prev)->getNameWithState() +
2052 ". It is a bug.", ErrorCodes::LOGICAL_ERROR);
2053
2054 break;
2055 }
2056
2057 begin = prev;
2058 }
2059
2060 /// Go to the right.
2061 DataPartIteratorByStateAndInfo end = it_middle;
2062 while (end != committed_parts_range.end())
2063 {
2064 if ((*end)->info == new_part_info)
2065 throw Exception("Unexpected duplicate part " + (*end)->getNameWithState() + ". It is a bug.", ErrorCodes::LOGICAL_ERROR);
2066
2067 if (!new_part_info.contains((*end)->info))
2068 {
2069 if ((*end)->info.contains(new_part_info))
2070 {
2071 out_covering_part = *end;
2072 return {};
2073 }
2074
2075 if (!new_part_info.isDisjoint((*end)->info))
2076 throw Exception("Part " + new_part_name + " intersects next part " + (*end)->getNameWithState() +
2077 ". It is a bug.", ErrorCodes::LOGICAL_ERROR);
2078
2079 break;
2080 }
2081
2082 ++end;
2083 }
2084
2085 return DataPartsVector{begin, end};
2086}
2087
2088
2089void MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction)
2090{
2091 auto removed = renameTempPartAndReplace(part, increment, out_transaction);
2092 if (!removed.empty())
2093 throw Exception("Added part " + part->name + " covers " + toString(removed.size())
2094 + " existing part(s) (including " + removed[0]->name + ")", ErrorCodes::LOGICAL_ERROR);
2095}
2096
2097
/// Renames a temporary part to its final name and adds it to the set of parts, under an already held parts lock.
///  part              - the part to add; must be in the Temporary state.
///  increment         - if not null, a new block number is taken from it and used as both min_block and max_block.
///  out_transaction   - if not null, the part is only registered as precommitted in the transaction;
///                      otherwise it is committed immediately and the parts it covers become Outdated.
///  lock              - the parts lock held by the caller.
///  out_covered_parts - if not null, receives the parts covered by the new part.
/// Throws if the transaction belongs to another table, if the partition value differs from that of an
/// existing part with the same partition ID, or if a part with the same info already exists.
/// If the new part is covered by an existing part, a warning is logged and nothing is changed.
void MergeTreeData::renameTempPartAndReplace(
    MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction,
    std::unique_lock<std::mutex> & lock, DataPartsVector * out_covered_parts)
{
    if (out_transaction && &out_transaction->data != this)
        throw Exception("MergeTreeData::Transaction for one table cannot be used with another. It is a bug.",
            ErrorCodes::LOGICAL_ERROR);

    part->assertState({DataPartState::Temporary});

    MergeTreePartInfo part_info = part->info;
    String part_name;

    /// Parts with the same partition ID must have the same partition value.
    if (DataPartPtr existing_part_in_partition = getAnyPartInPartition(part->info.partition_id, lock))
    {
        if (part->partition.value != existing_part_in_partition->partition.value)
            throw Exception(
                "Partition value mismatch between two parts with the same partition ID. Existing part: "
                + existing_part_in_partition->name + ", newly added part: " + part->name,
                ErrorCodes::CORRUPTED_DATA);
    }

    /** It is important that obtaining new block number and adding that block to parts set is done atomically.
      * Otherwise there is race condition - merge of blocks could happen in interval that doesn't yet contain new part.
      */
    if (increment)
    {
        part_info.min_block = part_info.max_block = increment->get();
        part_info.mutation = 0; /// it's equal to min_block by default
        part_name = part->getNewName(part_info);
    }
    else
        part_name = part->name;

    LOG_TRACE(log, "Renaming temporary part " << part->relative_path << " to " << part_name << ".");

    /// A part with exactly the same info must not exist in any state.
    auto it_duplicate = data_parts_by_info.find(part_info);
    if (it_duplicate != data_parts_by_info.end())
    {
        String message = "Part " + (*it_duplicate)->getNameWithState() + " already exists";

        if ((*it_duplicate)->checkState({DataPartState::Outdated, DataPartState::Deleting}))
            throw Exception(message + ", but it will be deleted soon", ErrorCodes::PART_IS_TEMPORARILY_LOCKED);

        throw Exception(message, ErrorCodes::DUPLICATE_DATA_PART);
    }

    DataPartPtr covering_part;
    DataPartsVector covered_parts = getActivePartsToReplace(part_info, part_name, covering_part, lock);

    /// The new part is already covered by an existing part: leave it untouched and return.
    if (covering_part)
    {
        LOG_WARNING(log, "Tried to add obsolete part " << part_name << " covered by " << covering_part->getNameWithState());
        return;
    }

    /// All checks are passed. Now we can rename the part on disk.
    /// So, we maintain invariant: if a non-temporary part in filesystem then it is in data_parts
    ///
    /// If out_transaction is null, we commit the part to the active set immediately, else add it to the transaction.
    part->name = part_name;
    part->info = part_info;
    part->is_temp = false;
    part->state = DataPartState::PreCommitted;
    part->renameTo(part_name);

    auto part_it = data_parts_indexes.insert(part).first;

    if (out_transaction)
    {
        out_transaction->precommitted_parts.insert(part);
    }
    else
    {
        /// Covered parts become Outdated and stop contributing to column sizes; the new part is committed.
        auto current_time = time(nullptr);
        for (const DataPartPtr & covered_part : covered_parts)
        {
            covered_part->remove_time.store(current_time, std::memory_order_relaxed);
            modifyPartState(covered_part, DataPartState::Outdated);
            removePartContributionToColumnSizes(covered_part);
        }

        modifyPartState(part_it, DataPartState::Committed);
        addPartContributionToColumnSizes(part);
    }

    if (out_covered_parts)
    {
        for (DataPartPtr & covered_part : covered_parts)
            out_covered_parts->emplace_back(std::move(covered_part));
    }
}
2190
2191MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
2192 MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction)
2193{
2194 if (out_transaction && &out_transaction->data != this)
2195 throw Exception("MergeTreeData::Transaction for one table cannot be used with another. It is a bug.",
2196 ErrorCodes::LOGICAL_ERROR);
2197
2198 DataPartsVector covered_parts;
2199 {
2200 auto lock = lockParts();
2201 renameTempPartAndReplace(part, increment, out_transaction, lock, &covered_parts);
2202 }
2203 return covered_parts;
2204}
2205
2206void MergeTreeData::removePartsFromWorkingSet(const MergeTreeData::DataPartsVector & remove, bool clear_without_timeout, DataPartsLock & /*acquired_lock*/)
2207{
2208 auto remove_time = clear_without_timeout ? 0 : time(nullptr);
2209
2210 for (const DataPartPtr & part : remove)
2211 {
2212 if (part->state == MergeTreeDataPart::State::Committed)
2213 removePartContributionToColumnSizes(part);
2214
2215 if (part->state == MergeTreeDataPart::State::Committed || clear_without_timeout)
2216 part->remove_time.store(remove_time, std::memory_order_relaxed);
2217
2218 if (part->state != MergeTreeDataPart::State::Outdated)
2219 modifyPartState(part, MergeTreeDataPart::State::Outdated);
2220 }
2221}
2222
2223void MergeTreeData::removePartsFromWorkingSet(const DataPartsVector & remove, bool clear_without_timeout, DataPartsLock * acquired_lock)
2224{
2225 auto lock = (acquired_lock) ? DataPartsLock() : lockParts();
2226
2227 for (auto & part : remove)
2228 {
2229 if (!data_parts_by_info.count(part->info))
2230 throw Exception("Part " + part->getNameWithState() + " not found in data_parts", ErrorCodes::LOGICAL_ERROR);
2231
2232 part->assertState({DataPartState::PreCommitted, DataPartState::Committed, DataPartState::Outdated});
2233 }
2234
2235 removePartsFromWorkingSet(remove, clear_without_timeout, lock);
2236}
2237
2238MergeTreeData::DataPartsVector MergeTreeData::removePartsInRangeFromWorkingSet(const MergeTreePartInfo & drop_range, bool clear_without_timeout,
2239 bool skip_intersecting_parts, DataPartsLock & lock)
2240{
2241 DataPartsVector parts_to_remove;
2242
2243 if (drop_range.min_block > drop_range.max_block)
2244 return parts_to_remove;
2245
2246 auto partition_range = getDataPartsPartitionRange(drop_range.partition_id);
2247
2248 for (const DataPartPtr & part : partition_range)
2249 {
2250 if (part->info.partition_id != drop_range.partition_id)
2251 throw Exception("Unexpected partition_id of part " + part->name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
2252
2253 if (part->info.min_block < drop_range.min_block)
2254 {
2255 if (drop_range.min_block <= part->info.max_block)
2256 {
2257 /// Intersect left border
2258 String error = "Unexpected merged part " + part->name + " intersecting drop range " + drop_range.getPartName();
2259 if (!skip_intersecting_parts)
2260 throw Exception(error, ErrorCodes::LOGICAL_ERROR);
2261
2262 LOG_WARNING(log, error);
2263 }
2264
2265 continue;
2266 }
2267
2268 /// Stop on new parts
2269 if (part->info.min_block > drop_range.max_block)
2270 break;
2271
2272 if (part->info.min_block <= drop_range.max_block && drop_range.max_block < part->info.max_block)
2273 {
2274 /// Intersect right border
2275 String error = "Unexpected merged part " + part->name + " intersecting drop range " + drop_range.getPartName();
2276 if (!skip_intersecting_parts)
2277 throw Exception(error, ErrorCodes::LOGICAL_ERROR);
2278
2279 LOG_WARNING(log, error);
2280 continue;
2281 }
2282
2283 if (part->state != DataPartState::Deleting)
2284 parts_to_remove.emplace_back(part);
2285 }
2286
2287 removePartsFromWorkingSet(parts_to_remove, clear_without_timeout, lock);
2288
2289 return parts_to_remove;
2290}
2291
2292void MergeTreeData::forgetPartAndMoveToDetached(const MergeTreeData::DataPartPtr & part_to_detach, const String & prefix, bool
2293restore_covered)
2294{
2295 LOG_INFO(log, "Renaming " << part_to_detach->relative_path << " to " << prefix << part_to_detach->name << " and forgiving it.");
2296
2297 auto lock = lockParts();
2298
2299 auto it_part = data_parts_by_info.find(part_to_detach->info);
2300 if (it_part == data_parts_by_info.end())
2301 throw Exception("No such data part " + part_to_detach->getNameWithState(), ErrorCodes::NO_SUCH_DATA_PART);
2302
2303 /// What if part_to_detach is a reference to *it_part? Make a new owner just in case.
2304 DataPartPtr part = *it_part;
2305
2306 if (part->state == DataPartState::Committed)
2307 removePartContributionToColumnSizes(part);
2308 modifyPartState(it_part, DataPartState::Deleting);
2309
2310 part->renameToDetached(prefix);
2311
2312 data_parts_indexes.erase(it_part);
2313
2314 if (restore_covered && part->info.level == 0)
2315 {
2316 LOG_WARNING(log, "Will not recover parts covered by zero-level part " << part->name);
2317 return;
2318 }
2319
2320 if (restore_covered)
2321 {
2322 Strings restored;
2323 bool error = false;
2324 String error_parts;
2325
2326 Int64 pos = part->info.min_block;
2327
2328 auto is_appropriate_state = [] (DataPartState state)
2329 {
2330 return state == DataPartState::Committed || state == DataPartState::Outdated;
2331 };
2332
2333 auto update_error = [&] (DataPartIteratorByInfo it)
2334 {
2335 error = true;
2336 error_parts += (*it)->getNameWithState() + " ";
2337 };
2338
2339 auto it_middle = data_parts_by_info.lower_bound(part->info);
2340
2341 /// Restore the leftmost part covered by the part
2342 if (it_middle != data_parts_by_info.begin())
2343 {
2344 auto it = std::prev(it_middle);
2345
2346 if (part->contains(**it) && is_appropriate_state((*it)->state))
2347 {
2348 /// Maybe, we must consider part level somehow
2349 if ((*it)->info.min_block != part->info.min_block)
2350 update_error(it);
2351
2352 if ((*it)->state != DataPartState::Committed)
2353 {
2354 addPartContributionToColumnSizes(*it);
2355 modifyPartState(it, DataPartState::Committed); // iterator is not invalidated here
2356 }
2357
2358 pos = (*it)->info.max_block + 1;
2359 restored.push_back((*it)->name);
2360 }
2361 else
2362 update_error(it);
2363 }
2364 else
2365 error = true;
2366
2367 /// Restore "right" parts
2368 for (auto it = it_middle; it != data_parts_by_info.end() && part->contains(**it); ++it)
2369 {
2370 if ((*it)->info.min_block < pos)
2371 continue;
2372
2373 if (!is_appropriate_state((*it)->state))
2374 {
2375 update_error(it);
2376 continue;
2377 }
2378
2379 if ((*it)->info.min_block > pos)
2380 update_error(it);
2381
2382 if ((*it)->state != DataPartState::Committed)
2383 {
2384 addPartContributionToColumnSizes(*it);
2385 modifyPartState(it, DataPartState::Committed);
2386 }
2387
2388 pos = (*it)->info.max_block + 1;
2389 restored.push_back((*it)->name);
2390 }
2391
2392 if (pos != part->info.max_block + 1)
2393 error = true;
2394
2395 for (const String & name : restored)
2396 {
2397 LOG_INFO(log, "Activated part " << name);
2398 }
2399
2400 if (error)
2401 {
2402 LOG_ERROR(log, "The set of parts restored in place of " << part->name << " looks incomplete."
2403 << " There might or might not be a data loss."
2404 << (error_parts.empty() ? "" : " Suspicious parts: " + error_parts));
2405 }
2406 }
2407}
2408
2409
2410void MergeTreeData::tryRemovePartImmediately(DataPartPtr && part)
2411{
2412 DataPartPtr part_to_delete;
2413 {
2414 auto lock = lockParts();
2415
2416 LOG_TRACE(log, "Trying to immediately remove part " << part->getNameWithState());
2417
2418 auto it = data_parts_by_info.find(part->info);
2419 if (it == data_parts_by_info.end() || (*it).get() != part.get())
2420 throw Exception("Part " + part->name + " doesn't exist", ErrorCodes::LOGICAL_ERROR);
2421
2422 part.reset();
2423
2424 if (!((*it)->state == DataPartState::Outdated && it->unique()))
2425 return;
2426
2427 modifyPartState(it, DataPartState::Deleting);
2428 part_to_delete = *it;
2429 }
2430
2431 try
2432 {
2433 part_to_delete->remove();
2434 }
2435 catch (...)
2436 {
2437 rollbackDeletingParts({part_to_delete});
2438 throw;
2439 }
2440
2441 removePartsFinally({part_to_delete});
2442 LOG_TRACE(log, "Removed part " << part_to_delete->name);
2443}
2444
2445
2446size_t MergeTreeData::getTotalActiveSizeInBytes() const
2447{
2448 size_t res = 0;
2449 {
2450 auto lock = lockParts();
2451
2452 for (auto & part : getDataPartsStateRange(DataPartState::Committed))
2453 res += part->bytes_on_disk;
2454 }
2455
2456 return res;
2457}
2458
2459
2460size_t MergeTreeData::getTotalActiveSizeInRows() const
2461{
2462 size_t res = 0;
2463 {
2464 auto lock = lockParts();
2465
2466 for (auto & part : getDataPartsStateRange(DataPartState::Committed))
2467 res += part->rows_count;
2468 }
2469
2470 return res;
2471}
2472
2473
2474size_t MergeTreeData::getPartsCount() const
2475{
2476 auto lock = lockParts();
2477
2478 size_t res = 0;
2479 for (const auto & part [[maybe_unused]] : getDataPartsStateRange(DataPartState::Committed))
2480 ++res;
2481
2482 return res;
2483}
2484
2485
2486size_t MergeTreeData::getMaxPartsCountForPartition() const
2487{
2488 auto lock = lockParts();
2489
2490 size_t res = 0;
2491 size_t cur_count = 0;
2492 const String * cur_partition_id = nullptr;
2493
2494 for (const auto & part : getDataPartsStateRange(DataPartState::Committed))
2495 {
2496 if (cur_partition_id && part->info.partition_id == *cur_partition_id)
2497 {
2498 ++cur_count;
2499 }
2500 else
2501 {
2502 cur_partition_id = &part->info.partition_id;
2503 cur_count = 1;
2504 }
2505
2506 res = std::max(res, cur_count);
2507 }
2508
2509 return res;
2510}
2511
2512
2513std::optional<Int64> MergeTreeData::getMinPartDataVersion() const
2514{
2515 auto lock = lockParts();
2516
2517 std::optional<Int64> result;
2518 for (const auto & part : getDataPartsStateRange(DataPartState::Committed))
2519 {
2520 if (!result || *result > part->info.getDataVersion())
2521 result = part->info.getDataVersion();
2522 }
2523
2524 return result;
2525}
2526
2527
2528void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until) const
2529{
2530 const auto settings = getSettings();
2531 const size_t parts_count_in_total = getPartsCount();
2532 if (parts_count_in_total >= settings->max_parts_in_total)
2533 {
2534 ProfileEvents::increment(ProfileEvents::RejectedInserts);
2535 throw Exception("Too many parts (" + toString(parts_count_in_total) + ") in all partitions in total. This indicates wrong choice of partition key. The threshold can be modified with 'max_parts_in_total' setting in <merge_tree> element in config.xml or with per-table setting.", ErrorCodes::TOO_MANY_PARTS);
2536 }
2537
2538 const size_t parts_count_in_partition = getMaxPartsCountForPartition();
2539 if (parts_count_in_partition < settings->parts_to_delay_insert)
2540 return;
2541
2542 if (parts_count_in_partition >= settings->parts_to_throw_insert)
2543 {
2544 ProfileEvents::increment(ProfileEvents::RejectedInserts);
2545 throw Exception("Too many parts (" + toString(parts_count_in_partition) + "). Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MANY_PARTS);
2546 }
2547
2548 const size_t max_k = settings->parts_to_throw_insert - settings->parts_to_delay_insert; /// always > 0
2549 const size_t k = 1 + parts_count_in_partition - settings->parts_to_delay_insert; /// from 1 to max_k
2550 const double delay_milliseconds = ::pow(settings->max_delay_to_insert * 1000, static_cast<double>(k) / max_k);
2551
2552 ProfileEvents::increment(ProfileEvents::DelayedInserts);
2553 ProfileEvents::increment(ProfileEvents::DelayedInsertsMilliseconds, delay_milliseconds);
2554
2555 CurrentMetrics::Increment metric_increment(CurrentMetrics::DelayedInserts);
2556
2557 LOG_INFO(log, "Delaying inserting block by "
2558 << std::fixed << std::setprecision(4) << delay_milliseconds << " ms. because there are " << parts_count_in_partition << " parts");
2559
2560 if (until)
2561 until->tryWait(delay_milliseconds);
2562 else
2563 std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<size_t>(delay_milliseconds)));
2564}
2565
2566void MergeTreeData::throwInsertIfNeeded() const
2567{
2568 const auto settings = getSettings();
2569 const size_t parts_count_in_total = getPartsCount();
2570 if (parts_count_in_total >= settings->max_parts_in_total)
2571 {
2572 ProfileEvents::increment(ProfileEvents::RejectedInserts);
2573 throw Exception("Too many parts (" + toString(parts_count_in_total) + ") in all partitions in total. This indicates wrong choice of partition key. The threshold can be modified with 'max_parts_in_total' setting in <merge_tree> element in config.xml or with per-table setting.", ErrorCodes::TOO_MANY_PARTS);
2574 }
2575
2576 const size_t parts_count_in_partition = getMaxPartsCountForPartition();
2577
2578 if (parts_count_in_partition >= settings->parts_to_throw_insert)
2579 {
2580 ProfileEvents::increment(ProfileEvents::RejectedInserts);
2581 throw Exception("Too many parts (" + toString(parts_count_in_partition) + "). Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MANY_PARTS);
2582 }
2583}
2584
2585MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(
2586 const MergeTreePartInfo & part_info, MergeTreeData::DataPartState state, DataPartsLock & /*lock*/) const
2587{
2588 auto current_state_parts_range = getDataPartsStateRange(state);
2589
2590 /// The part can be covered only by the previous or the next one in data_parts.
2591 auto it = data_parts_by_state_and_info.lower_bound(DataPartStateAndInfo{state, part_info});
2592
2593 if (it != current_state_parts_range.end())
2594 {
2595 if ((*it)->info == part_info)
2596 return *it;
2597 if ((*it)->info.contains(part_info))
2598 return *it;
2599 }
2600
2601 if (it != current_state_parts_range.begin())
2602 {
2603 --it;
2604 if ((*it)->info.contains(part_info))
2605 return *it;
2606 }
2607
2608 return nullptr;
2609}
2610
/// Replaces the committed part that has the same name as 'part_copy' with 'part_copy':
/// the original part gets the DeleteOnDestroy state and is erased from the set of parts, the copy is inserted
/// as Committed, and a marker file is created in the directory of the original part.
/// Throws NO_SUCH_DATA_PART if there is no committed part with that name.
void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy)
{
    auto lock = lockParts();
    /// The loop variable is a copy of the pointer: the part is still used below after it is erased from the index.
    for (auto original_active_part : getDataPartsStateRange(DataPartState::Committed))
    {
        if (part_copy->name == original_active_part->name)
        {
            auto active_part_it = data_parts_by_info.find(original_active_part->info);
            if (active_part_it == data_parts_by_info.end())
                throw Exception("Cannot swap part '" + part_copy->name + "', no such active part.", ErrorCodes::NO_SUCH_DATA_PART);

            modifyPartState(original_active_part, DataPartState::DeleteOnDestroy);
            data_parts_indexes.erase(active_part_it);

            auto part_it = data_parts_indexes.insert(part_copy).first;
            modifyPartState(part_it, DataPartState::Committed);

            /// Failure to create the marker file is only logged; the swap itself has already been done.
            Poco::Path marker_path(Poco::Path(original_active_part->getFullPath()), DELETE_ON_DESTROY_MARKER_PATH);
            try
            {
                Poco::File(marker_path).createFile();
            }
            catch (Poco::Exception & e)
            {
                LOG_ERROR(log, e.what() << " (while creating DeleteOnDestroy marker: " + backQuote(marker_path.toString()) + ")");
            }
            /// The index was modified, so iteration must not continue.
            return;
        }
    }
    throw Exception("Cannot swap part '" + part_copy->name + "', no such active part.", ErrorCodes::NO_SUCH_DATA_PART);
}
2642
2643
2644MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const MergeTreePartInfo & part_info) const
2645{
2646 auto lock = lockParts();
2647 return getActiveContainingPart(part_info, DataPartState::Committed, lock);
2648}
2649
2650MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String & part_name) const
2651{
2652 auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
2653 return getActiveContainingPart(part_info);
2654}
2655
2656MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVectorInPartition(MergeTreeData::DataPartState state, const String & partition_id)
2657{
2658 DataPartStateAndPartitionID state_with_partition{state, partition_id};
2659
2660 auto lock = lockParts();
2661 return DataPartsVector(
2662 data_parts_by_state_and_info.lower_bound(state_with_partition),
2663 data_parts_by_state_and_info.upper_bound(state_with_partition));
2664}
2665
2666
2667MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const MergeTreePartInfo & part_info, const MergeTreeData::DataPartStates & valid_states)
2668{
2669 auto lock = lockParts();
2670
2671 auto it = data_parts_by_info.find(part_info);
2672 if (it == data_parts_by_info.end())
2673 return nullptr;
2674
2675 for (auto state : valid_states)
2676 {
2677 if ((*it)->state == state)
2678 return *it;
2679 }
2680
2681 return nullptr;
2682}
2683
2684MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_name, const MergeTreeData::DataPartStates & valid_states)
2685{
2686 return getPartIfExists(MergeTreePartInfo::fromPartName(part_name, format_version), valid_states);
2687}
2688
2689
2690MergeTreeData::MutableDataPartPtr MergeTreeData::loadPartAndFixMetadata(const DiskPtr & disk, const String & relative_path)
2691{
2692 MutableDataPartPtr part = std::make_shared<DataPart>(*this, disk, Poco::Path(relative_path).getFileName());
2693 part->relative_path = relative_path;
2694 loadPartAndFixMetadata(part);
2695 return part;
2696}
2697
2698void MergeTreeData::loadPartAndFixMetadata(MutableDataPartPtr part)
2699{
2700 String full_part_path = part->getFullPath();
2701
2702 /// Earlier the list of columns was written incorrectly. Delete it and re-create.
2703 if (Poco::File(full_part_path + "columns.txt").exists())
2704 Poco::File(full_part_path + "columns.txt").remove();
2705
2706 part->loadColumnsChecksumsIndexes(false, true);
2707 part->modification_time = Poco::File(full_part_path).getLastModified().epochTime();
2708
2709 /// If the checksums file is not present, calculate the checksums and write them to disk.
2710 /// Check the data while we are at it.
2711 if (part->checksums.empty())
2712 {
2713 part->checksums = checkDataPart(part, false, primary_key_data_types, skip_indices);
2714 {
2715 WriteBufferFromFile out(full_part_path + "checksums.txt.tmp", 4096);
2716 part->checksums.write(out);
2717 }
2718
2719 Poco::File(full_part_path + "checksums.txt.tmp").renameTo(full_part_path + "checksums.txt");
2720 }
2721}
2722
2723
2724void MergeTreeData::calculateColumnSizesImpl()
2725{
2726 column_sizes.clear();
2727
2728 /// Take into account only committed parts
2729 auto committed_parts_range = getDataPartsStateRange(DataPartState::Committed);
2730 for (const auto & part : committed_parts_range)
2731 addPartContributionToColumnSizes(part);
2732}
2733
2734void MergeTreeData::addPartContributionToColumnSizes(const DataPartPtr & part)
2735{
2736 std::shared_lock<std::shared_mutex> lock(part->columns_lock);
2737
2738 for (const auto & column : part->columns)
2739 {
2740 ColumnSize & total_column_size = column_sizes[column.name];
2741 ColumnSize part_column_size = part->getColumnSize(column.name, *column.type);
2742 total_column_size.add(part_column_size);
2743 }
2744}
2745
2746void MergeTreeData::removePartContributionToColumnSizes(const DataPartPtr & part)
2747{
2748 std::shared_lock<std::shared_mutex> lock(part->columns_lock);
2749
2750 for (const auto & column : part->columns)
2751 {
2752 ColumnSize & total_column_size = column_sizes[column.name];
2753 ColumnSize part_column_size = part->getColumnSize(column.name, *column.type);
2754
2755 auto log_subtract = [&](size_t & from, size_t value, const char * field)
2756 {
2757 if (value > from)
2758 LOG_ERROR(log, "Possibly incorrect column size subtraction: "
2759 << from << " - " << value << " = " << from - value
2760 << ", column: " << column.name << ", field: " << field);
2761
2762 from -= value;
2763 };
2764
2765 log_subtract(total_column_size.data_compressed, part_column_size.data_compressed, ".data_compressed");
2766 log_subtract(total_column_size.data_uncompressed, part_column_size.data_uncompressed, ".data_uncompressed");
2767 log_subtract(total_column_size.marks, part_column_size.marks, ".marks");
2768 }
2769}
2770
2771
2772void MergeTreeData::freezePartition(const ASTPtr & partition_ast, const String & with_name, const Context & context, TableStructureReadLockHolder &)
2773{
2774 std::optional<String> prefix;
2775 String partition_id;
2776
2777 if (format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
2778 {
2779 /// Month-partitioning specific - partition value can represent a prefix of the partition to freeze.
2780 if (const auto * partition_lit = partition_ast->as<ASTPartition &>().value->as<ASTLiteral>())
2781 prefix = partition_lit->value.getType() == Field::Types::UInt64
2782 ? toString(partition_lit->value.get<UInt64>())
2783 : partition_lit->value.safeGet<String>();
2784 else
2785 partition_id = getPartitionIDFromQuery(partition_ast, context);
2786 }
2787 else
2788 partition_id = getPartitionIDFromQuery(partition_ast, context);
2789
2790 if (prefix)
2791 LOG_DEBUG(log, "Freezing parts with prefix " + *prefix);
2792 else
2793 LOG_DEBUG(log, "Freezing parts with partition ID " + partition_id);
2794
2795
2796 freezePartitionsByMatcher(
2797 [&prefix, &partition_id](const DataPartPtr & part)
2798 {
2799 if (prefix)
2800 return startsWith(part->info.partition_id, *prefix);
2801 else
2802 return part->info.partition_id == partition_id;
2803 },
2804 with_name,
2805 context);
2806}
2807
2808
2809void MergeTreeData::movePartitionToDisk(const ASTPtr & partition, const String & name, bool moving_part, const Context & context)
2810{
2811 String partition_id;
2812
2813 if (moving_part)
2814 partition_id = partition->as<ASTLiteral &>().value.safeGet<String>();
2815 else
2816 partition_id = getPartitionIDFromQuery(partition, context);
2817
2818 DataPartsVector parts;
2819 if (moving_part)
2820 {
2821 auto part_info = MergeTreePartInfo::fromPartName(partition_id, format_version);
2822 parts.push_back(getActiveContainingPart(part_info));
2823 if (!parts.back() || parts.back()->name != part_info.getPartName())
2824 throw Exception("Part " + partition_id + " is not exists or not active", ErrorCodes::NO_SUCH_DATA_PART);
2825 }
2826 else
2827 parts = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
2828
2829 auto disk = storage_policy->getDiskByName(name);
2830 if (!disk)
2831 throw Exception("Disk " + name + " does not exists on policy " + storage_policy->getName(), ErrorCodes::UNKNOWN_DISK);
2832
2833 parts.erase(std::remove_if(parts.begin(), parts.end(), [&](auto part_ptr)
2834 {
2835 return part_ptr->disk->getName() == disk->getName();
2836 }), parts.end());
2837
2838 if (parts.empty())
2839 throw Exception("Nothing to move", ErrorCodes::NO_SUCH_DATA_PART);
2840
2841 if (parts.empty())
2842 {
2843 String no_parts_to_move_message;
2844 if (moving_part)
2845 no_parts_to_move_message = "Part '" + partition_id + "' is already on disk '" + disk->getName() + "'";
2846 else
2847 no_parts_to_move_message = "All parts of partition '" + partition_id + "' are already on disk '" + disk->getName() + "'";
2848
2849 throw Exception(no_parts_to_move_message, ErrorCodes::UNKNOWN_DISK);
2850 }
2851
2852 if (!movePartsToSpace(parts, std::static_pointer_cast<Space>(disk)))
2853 throw Exception("Cannot move parts because moves are manually disabled", ErrorCodes::ABORTED);
2854}
2855
2856
2857void MergeTreeData::movePartitionToVolume(const ASTPtr & partition, const String & name, bool moving_part, const Context & context)
2858{
2859 String partition_id;
2860
2861 if (moving_part)
2862 partition_id = partition->as<ASTLiteral &>().value.safeGet<String>();
2863 else
2864 partition_id = getPartitionIDFromQuery(partition, context);
2865
2866 DataPartsVector parts;
2867 if (moving_part)
2868 {
2869 auto part_info = MergeTreePartInfo::fromPartName(partition_id, format_version);
2870 parts.emplace_back(getActiveContainingPart(part_info));
2871 if (!parts.back() || parts.back()->name != part_info.getPartName())
2872 throw Exception("Part " + partition_id + " is not exists or not active", ErrorCodes::NO_SUCH_DATA_PART);
2873 }
2874 else
2875 parts = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
2876
2877 auto volume = storage_policy->getVolumeByName(name);
2878 if (!volume)
2879 throw Exception("Volume " + name + " does not exists on policy " + storage_policy->getName(), ErrorCodes::UNKNOWN_DISK);
2880
2881 if (parts.empty())
2882 throw Exception("Nothing to move", ErrorCodes::NO_SUCH_DATA_PART);
2883
2884 parts.erase(std::remove_if(parts.begin(), parts.end(), [&](auto part_ptr)
2885 {
2886 for (const auto & disk : volume->disks)
2887 {
2888 if (part_ptr->disk->getName() == disk->getName())
2889 {
2890 return true;
2891 }
2892 }
2893 return false;
2894 }), parts.end());
2895
2896 if (parts.empty())
2897 {
2898 String no_parts_to_move_message;
2899 if (moving_part)
2900 no_parts_to_move_message = "Part '" + partition_id + "' is already on volume '" + volume->getName() + "'";
2901 else
2902 no_parts_to_move_message = "All parts of partition '" + partition_id + "' are already on volume '" + volume->getName() + "'";
2903
2904 throw Exception(no_parts_to_move_message, ErrorCodes::UNKNOWN_DISK);
2905 }
2906
2907 if (!movePartsToSpace(parts, std::static_pointer_cast<Space>(volume)))
2908 throw Exception("Cannot move parts because moves are manually disabled", ErrorCodes::ABORTED);
2909}
2910
2911
/// Converts the partition specification `ast` (an ASTPartition node) into a partition ID string.
///
/// - If the node carries no value expression, its `id` field is returned as is.
/// - For tables with format_version older than custom partitioning, a string literal value is taken
///   as the partition ID itself and must consist of exactly 6 ASCII digits.
/// - Otherwise the textual fields of the partition expression are parsed with the "Values" input format
///   against `partition_key_sample`, and the ID is computed from the resulting row.
///
/// Throws INVALID_PARTITION_VALUE if the value is malformed, has a wrong number of fields, cannot be parsed,
/// or does not match the partition value of an existing committed part with the same partition ID.
String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, const Context & context)
{
    const auto & partition_ast = ast->as<ASTPartition &>();

    /// No value expression: the partition ID stored in the node is used directly.
    if (!partition_ast.value)
        return partition_ast.id;

    if (format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
    {
        /// Month-partitioning specific - partition ID can be passed in the partition value.
        const auto * partition_lit = partition_ast.value->as<ASTLiteral>();
        if (partition_lit && partition_lit->value.getType() == Field::Types::String)
        {
            String partition_id = partition_lit->value.get<String>();
            if (partition_id.size() != 6 || !std::all_of(partition_id.begin(), partition_id.end(), isNumericASCII))
                throw Exception(
                    "Invalid partition format: " + partition_id + ". Partition should consist of 6 digits: YYYYMM",
                    ErrorCodes::INVALID_PARTITION_VALUE);
            return partition_id;
        }
    }

    /// Re-parse partition key fields using the information about expected field types.

    size_t fields_count = partition_key_sample.columns();
    if (partition_ast.fields_count != fields_count)
        throw Exception(
            "Wrong number of fields in the partition expression: " + toString(partition_ast.fields_count) +
            ", must be: " + toString(fields_count),
            ErrorCodes::INVALID_PARTITION_VALUE);

    const FormatSettings format_settings;
    Row partition_row(fields_count);

    if (fields_count)
    {
        /// The fields text is wrapped in parentheses so that it forms a single row for the "Values" format.
        ReadBufferFromMemory left_paren_buf("(", 1);
        ReadBufferFromMemory fields_buf(partition_ast.fields_str.data(), partition_ast.fields_str.size());
        ReadBufferFromMemory right_paren_buf(")", 1);
        ConcatReadBuffer buf({&left_paren_buf, &fields_buf, &right_paren_buf});

        auto input_stream = FormatFactory::instance().getInput("Values", buf, partition_key_sample, context, context.getSettingsRef().max_block_size);

        auto block = input_stream->read();
        if (!block || !block.rows())
            throw Exception(
                "Could not parse partition value: `" + partition_ast.fields_str + "`",
                ErrorCodes::INVALID_PARTITION_VALUE);

        /// Only the first row of the parsed block is used.
        for (size_t i = 0; i < fields_count; ++i)
            block.getByPosition(i).column->get(0, partition_row[i]);
    }

    MergeTreePartition partition(std::move(partition_row));
    String partition_id = partition.getID(*this);

    {
        /// Sanity check: if a committed part with this partition ID exists, its partition value
        /// must be equal to the one that was just parsed.
        auto data_parts_lock = lockParts();
        DataPartPtr existing_part_in_partition = getAnyPartInPartition(partition_id, data_parts_lock);
        if (existing_part_in_partition && existing_part_in_partition->partition.value != partition.value)
        {
            WriteBufferFromOwnString buf;
            writeCString("Parsed partition value: ", buf);
            partition.serializeText(*this, buf, format_settings);
            writeCString(" doesn't match partition value for an existing part with the same partition ID: ", buf);
            writeString(existing_part_in_partition->name, buf);
            throw Exception(buf.str(), ErrorCodes::INVALID_PARTITION_VALUE);
        }
    }

    return partition_id;
}
2984
2985MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector(const DataPartStates & affordable_states, DataPartStateVector * out_states) const
2986{
2987 DataPartsVector res;
2988 DataPartsVector buf;
2989 {
2990 auto lock = lockParts();
2991
2992 for (auto state : affordable_states)
2993 {
2994 std::swap(buf, res);
2995 res.clear();
2996
2997 auto range = getDataPartsStateRange(state);
2998 std::merge(range.begin(), range.end(), buf.begin(), buf.end(), std::back_inserter(res), LessDataPart());
2999 }
3000
3001 if (out_states != nullptr)
3002 {
3003 out_states->resize(res.size());
3004 for (size_t i = 0; i < res.size(); ++i)
3005 (*out_states)[i] = res[i]->state;
3006 }
3007 }
3008
3009 return res;
3010}
3011
3012MergeTreeData::DataPartsVector MergeTreeData::getAllDataPartsVector(MergeTreeData::DataPartStateVector * out_states) const
3013{
3014 DataPartsVector res;
3015 {
3016 auto lock = lockParts();
3017 res.assign(data_parts_by_info.begin(), data_parts_by_info.end());
3018
3019 if (out_states != nullptr)
3020 {
3021 out_states->resize(res.size());
3022 for (size_t i = 0; i < res.size(); ++i)
3023 (*out_states)[i] = res[i]->state;
3024 }
3025 }
3026
3027 return res;
3028}
3029
3030std::vector<DetachedPartInfo>
3031MergeTreeData::getDetachedParts() const
3032{
3033 std::vector<DetachedPartInfo> res;
3034
3035 for (const auto & [path, disk] : getDataPathsWithDisks())
3036 {
3037 for (Poco::DirectoryIterator it(path + "detached");
3038 it != Poco::DirectoryIterator(); ++it)
3039 {
3040 auto dir_name = it.name();
3041
3042 res.emplace_back();
3043 auto & part = res.back();
3044
3045 DetachedPartInfo::tryParseDetachedPartName(dir_name, part, format_version);
3046 part.disk = disk->getName();
3047 }
3048 }
3049 return res;
3050}
3051
3052void MergeTreeData::validateDetachedPartName(const String & name) const
3053{
3054 if (name.find('/') != std::string::npos || name == "." || name == "..")
3055 throw DB::Exception("Invalid part name '" + name + "'", ErrorCodes::INCORRECT_FILE_NAME);
3056
3057 String full_path = getFullPathForPart(name, "detached/");
3058
3059 if (full_path.empty() || !Poco::File(full_path + name).exists())
3060 throw DB::Exception("Detached part \"" + name + "\" not found" , ErrorCodes::BAD_DATA_PART_NAME);
3061
3062 if (startsWith(name, "attaching_") || startsWith(name, "deleting_"))
3063 throw DB::Exception("Cannot drop part " + name + ": "
3064 "most likely it is used by another DROP or ATTACH query.",
3065 ErrorCodes::BAD_DATA_PART_NAME);
3066}
3067
3068void MergeTreeData::dropDetached(const ASTPtr & partition, bool part, const Context & context)
3069{
3070 PartsTemporaryRename renamed_parts(*this, "detached/");
3071
3072 if (part)
3073 {
3074 String part_name = partition->as<ASTLiteral &>().value.safeGet<String>();
3075 validateDetachedPartName(part_name);
3076 renamed_parts.addPart(part_name, "deleting_" + part_name);
3077 }
3078 else
3079 {
3080 String partition_id = getPartitionIDFromQuery(partition, context);
3081 DetachedPartsInfo detached_parts = getDetachedParts();
3082 for (const auto & part_info : detached_parts)
3083 if (part_info.valid_name && part_info.partition_id == partition_id
3084 && part_info.prefix != "attaching" && part_info.prefix != "deleting")
3085 renamed_parts.addPart(part_info.dir_name, "deleting_" + part_info.dir_name);
3086 }
3087
3088 LOG_DEBUG(log, "Will drop " << renamed_parts.old_and_new_names.size() << " detached parts.");
3089
3090 renamed_parts.tryRenameAll();
3091
3092 for (auto & [old_name, new_name] : renamed_parts.old_and_new_names)
3093 {
3094 Poco::File(renamed_parts.old_part_name_to_full_path[old_name] + "detached/" + new_name).remove(true);
3095 LOG_DEBUG(log, "Dropped detached part " << old_name);
3096 old_name.clear();
3097 }
3098}
3099
/// Collects the detached parts that an ATTACH query refers to, renames them to "attaching_<name>"
/// (through `renamed_parts`) and loads them as DataPart objects.
///
/// - `attach_part == true`: `partition` is a string literal with the name of one detached part.
/// - `attach_part == false`: `partition` describes a partition; "detached/" on every disk of the storage policy
///   is scanned for parts of that partition. Parts covered by another found part are renamed to
///   "inactive_<name>" and are not attached.
///
/// Returns the loaded parts; their relative_path points at the renamed directory inside "detached/".
MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const ASTPtr & partition, bool attach_part,
        const Context & context, PartsTemporaryRename & renamed_parts)
{
    String source_dir = "detached/";

    /// Original part name -> disk where its detached directory was found.
    std::map<String, DiskPtr> name_to_disk;
    /// Let's compose a list of parts that should be added.
    if (attach_part)
    {
        String part_id = partition->as<ASTLiteral &>().value.safeGet<String>();
        validateDetachedPartName(part_id);
        renamed_parts.addPart(part_id, "attaching_" + part_id);
        /// NOTE(review): the disk is recorded only if the name parses as a part name; otherwise the lookup
        /// `name_to_disk[...]` in the loading loop below default-inserts an empty DiskPtr - confirm this is intended.
        if (MergeTreePartInfo::tryParsePartName(part_id, nullptr, format_version))
            name_to_disk[part_id] = getDiskForPart(part_id, source_dir);
    }
    else
    {
        String partition_id = getPartitionIDFromQuery(partition, context);
        LOG_DEBUG(log, "Looking for parts for partition " << partition_id << " in " << source_dir);
        ActiveDataPartSet active_parts(format_version);

        const auto disks = storage_policy->getDisks();
        for (const DiskPtr & disk : disks)
        {
            const auto full_path = getFullPathOnDisk(disk);
            for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it)
            {
                const String & name = it.name();
                MergeTreePartInfo part_info;
                // TODO what if name contains "_tryN" suffix?
                /// Parts with prefix in name (e.g. attaching_1_3_3_0, deleting_1_3_3_0) will be ignored
                if (!MergeTreePartInfo::tryParsePartName(name, &part_info, format_version)
                    || part_info.partition_id != partition_id)
                {
                    continue;
                }
                LOG_DEBUG(log, "Found part " << name);
                active_parts.add(name);
                name_to_disk[name] = disk;
            }
        }
        LOG_DEBUG(log, active_parts.size() << " of them are active");
        /// Inactive parts rename so they can not be attached in case of repeated ATTACH.
        for (const auto & [name, disk] : name_to_disk)
        {
            String containing_part = active_parts.getContainingPart(name);
            if (!containing_part.empty() && containing_part != name)
            {
                /// The part is covered by another part from the set: mark it as inactive right away.
                auto full_path = getFullPathOnDisk(disk);
                // TODO maybe use PartsTemporaryRename here?
                Poco::File(full_path + source_dir + name)
                    .renameTo(full_path + source_dir + "inactive_" + name);
            }
            else
                renamed_parts.addPart(name, "attaching_" + name);
        }
    }


    /// Try to rename all parts before attaching to prevent race with DROP DETACHED and another ATTACH.
    renamed_parts.tryRenameAll();

    /// Synchronously check that added parts exist and are not broken. We will write checksums.txt if it does not exist.
    LOG_DEBUG(log, "Checking parts");
    MutableDataPartsVector loaded_parts;
    loaded_parts.reserve(renamed_parts.old_and_new_names.size());
    for (const auto & part_names : renamed_parts.old_and_new_names)
    {
        LOG_DEBUG(log, "Checking part " << part_names.second);
        /// The part object keeps its original name (first), while it is loaded from the renamed directory (second).
        MutableDataPartPtr part = std::make_shared<DataPart>(*this, name_to_disk[part_names.first], part_names.first);
        part->relative_path = source_dir + part_names.second;
        loadPartAndFixMetadata(part);
        loaded_parts.push_back(part);
    }

    return loaded_parts;
}
3177
3178namespace
3179{
3180
3181inline ReservationPtr checkAndReturnReservation(UInt64 expected_size, ReservationPtr reservation)
3182{
3183 if (reservation)
3184 return reservation;
3185
3186 throw Exception("Cannot reserve " + formatReadableSizeWithBinarySuffix(expected_size) + ", not enough space",
3187 ErrorCodes::NOT_ENOUGH_SPACE);
3188}
3189
3190}
3191
3192ReservationPtr MergeTreeData::reserveSpace(UInt64 expected_size) const
3193{
3194 expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
3195
3196 auto reservation = storage_policy->reserve(expected_size);
3197
3198 return checkAndReturnReservation(expected_size, std::move(reservation));
3199}
3200
3201ReservationPtr MergeTreeData::reserveSpace(UInt64 expected_size, SpacePtr space) const
3202{
3203 expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
3204
3205 auto reservation = tryReserveSpace(expected_size, space);
3206
3207 return checkAndReturnReservation(expected_size, std::move(reservation));
3208}
3209
3210ReservationPtr MergeTreeData::tryReserveSpace(UInt64 expected_size, SpacePtr space) const
3211{
3212 expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
3213
3214 return space->reserve(expected_size);
3215}
3216
3217ReservationPtr MergeTreeData::reserveSpacePreferringTTLRules(UInt64 expected_size,
3218 const MergeTreeDataPart::TTLInfos & ttl_infos,
3219 time_t time_of_move) const
3220{
3221 expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
3222
3223 ReservationPtr reservation = tryReserveSpacePreferringTTLRules(expected_size, ttl_infos, time_of_move);
3224
3225 return checkAndReturnReservation(expected_size, std::move(reservation));
3226}
3227
3228ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(UInt64 expected_size,
3229 const MergeTreeDataPart::TTLInfos & ttl_infos,
3230 time_t time_of_move) const
3231{
3232 expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
3233
3234 ReservationPtr reservation;
3235
3236 auto ttl_entry = selectTTLEntryForTTLInfos(ttl_infos, time_of_move);
3237 if (ttl_entry != nullptr)
3238 {
3239 SpacePtr destination_ptr = ttl_entry->getDestination(storage_policy);
3240 if (!destination_ptr)
3241 {
3242 if (ttl_entry->destination_type == PartDestinationType::VOLUME)
3243 LOG_WARNING(log, "Would like to reserve space on volume '"
3244 << ttl_entry->destination_name << "' by TTL rule of table '"
3245 << log_name << "' but volume was not found");
3246 else if (ttl_entry->destination_type == PartDestinationType::DISK)
3247 LOG_WARNING(log, "Would like to reserve space on disk '"
3248 << ttl_entry->destination_name << "' by TTL rule of table '"
3249 << log_name << "' but disk was not found");
3250 }
3251 else
3252 {
3253 reservation = destination_ptr->reserve(expected_size);
3254 if (reservation)
3255 return reservation;
3256 }
3257 }
3258
3259 reservation = storage_policy->reserve(expected_size);
3260
3261 return reservation;
3262}
3263
3264SpacePtr MergeTreeData::TTLEntry::getDestination(const StoragePolicyPtr & policy) const
3265{
3266 if (destination_type == PartDestinationType::VOLUME)
3267 return policy->getVolumeByName(destination_name);
3268 else if (destination_type == PartDestinationType::DISK)
3269 return policy->getDiskByName(destination_name);
3270 else
3271 return {};
3272}
3273
3274bool MergeTreeData::TTLEntry::isPartInDestination(const StoragePolicyPtr & policy, const MergeTreeDataPart & part) const
3275{
3276 if (destination_type == PartDestinationType::VOLUME)
3277 {
3278 for (const auto & disk : policy->getVolumeByName(destination_name)->disks)
3279 if (disk->getName() == part.disk->getName())
3280 return true;
3281 }
3282 else if (destination_type == PartDestinationType::DISK)
3283 return policy->getDiskByName(destination_name)->getName() == part.disk->getName();
3284 return false;
3285}
3286
3287const MergeTreeData::TTLEntry * MergeTreeData::selectTTLEntryForTTLInfos(
3288 const MergeTreeDataPart::TTLInfos & ttl_infos,
3289 time_t time_of_move) const
3290{
3291 const MergeTreeData::TTLEntry * result = nullptr;
3292 /// Prefer TTL rule which went into action last.
3293 time_t max_max_ttl = 0;
3294
3295 for (const auto & ttl_entry : move_ttl_entries)
3296 {
3297 auto ttl_info_it = ttl_infos.moves_ttl.find(ttl_entry.result_column);
3298 if (ttl_info_it != ttl_infos.moves_ttl.end()
3299 && ttl_info_it->second.max <= time_of_move
3300 && max_max_ttl <= ttl_info_it->second.max)
3301 {
3302 result = &ttl_entry;
3303 max_max_ttl = ttl_info_it->second.max;
3304 }
3305 }
3306
3307 return result;
3308}
3309
3310MergeTreeData::DataParts MergeTreeData::getDataParts(const DataPartStates & affordable_states) const
3311{
3312 DataParts res;
3313 {
3314 auto lock = lockParts();
3315 for (auto state : affordable_states)
3316 {
3317 auto range = getDataPartsStateRange(state);
3318 res.insert(range.begin(), range.end());
3319 }
3320 }
3321 return res;
3322}
3323
3324MergeTreeData::DataParts MergeTreeData::getDataParts() const
3325{
3326 return getDataParts({DataPartState::Committed});
3327}
3328
3329MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector() const
3330{
3331 return getDataPartsVector({DataPartState::Committed});
3332}
3333
3334MergeTreeData::DataPartPtr MergeTreeData::getAnyPartInPartition(
3335 const String & partition_id, DataPartsLock & /*data_parts_lock*/)
3336{
3337 auto it = data_parts_by_state_and_info.lower_bound(DataPartStateAndPartitionID{DataPartState::Committed, partition_id});
3338
3339 if (it != data_parts_by_state_and_info.end() && (*it)->state == DataPartState::Committed && (*it)->info.partition_id == partition_id)
3340 return *it;
3341
3342 return nullptr;
3343}
3344
3345void MergeTreeData::Transaction::rollback()
3346{
3347 if (!isEmpty())
3348 {
3349 std::stringstream ss;
3350 ss << " Removing parts:";
3351 for (const auto & part : precommitted_parts)
3352 ss << " " << part->relative_path;
3353 ss << ".";
3354 LOG_DEBUG(data.log, "Undoing transaction." << ss.str());
3355
3356 data.removePartsFromWorkingSet(
3357 DataPartsVector(precommitted_parts.begin(), precommitted_parts.end()),
3358 /* clear_without_timeout = */ true);
3359 }
3360
3361 clear();
3362}
3363
/// Commits all precommitted parts of the transaction and clears it.
///
/// If `acquired_parts_lock` is not null, the caller's lock is used; otherwise the parts lock is taken here.
/// For every precommitted part:
///  - if an active part covering it exists, the part is marked Outdated with remove_time = 0;
///  - otherwise the active parts it covers are marked Outdated (remove_time = now) and their contribution
///    to column sizes is removed, then the part itself becomes Committed and its contribution is added.
///
/// Returns all parts that were replaced (covered) by the committed ones.
MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit(MergeTreeData::DataPartsLock * acquired_parts_lock)
{
    DataPartsVector total_covered_parts;

    if (!isEmpty())
    {
        /// Take the parts lock only if the caller has not passed one.
        auto parts_lock = acquired_parts_lock ? MergeTreeData::DataPartsLock() : data.lockParts();
        auto owing_parts_lock = acquired_parts_lock ? acquired_parts_lock : &parts_lock;

        /// A single timestamp is used as remove_time for all parts replaced by this transaction.
        auto current_time = time(nullptr);
        for (const DataPartPtr & part : precommitted_parts)
        {
            DataPartPtr covering_part;
            DataPartsVector covered_parts = data.getActivePartsToReplace(part->info, part->name, covering_part, *owing_parts_lock);
            if (covering_part)
            {
                LOG_WARNING(data.log, "Tried to commit obsolete part " << part->name
                    << " covered by " << covering_part->getNameWithState());

                part->remove_time.store(0, std::memory_order_relaxed); /// The part will be removed without waiting for old_parts_lifetime seconds.
                data.modifyPartState(part, DataPartState::Outdated);
            }
            else
            {
                total_covered_parts.insert(total_covered_parts.end(), covered_parts.begin(), covered_parts.end());
                for (const DataPartPtr & covered_part : covered_parts)
                {
                    covered_part->remove_time.store(current_time, std::memory_order_relaxed);
                    data.modifyPartState(covered_part, DataPartState::Outdated);
                    data.removePartContributionToColumnSizes(covered_part);
                }

                data.modifyPartState(part, DataPartState::Committed);
                data.addPartContributionToColumnSizes(part);
            }
        }
    }

    clear();

    return total_covered_parts;
}
3406
3407bool MergeTreeData::isPrimaryOrMinMaxKeyColumnPossiblyWrappedInFunctions(const ASTPtr & node) const
3408{
3409 const String column_name = node->getColumnName();
3410
3411 for (const auto & name : primary_key_columns)
3412 if (column_name == name)
3413 return true;
3414
3415 for (const auto & name : minmax_idx_columns)
3416 if (column_name == name)
3417 return true;
3418
3419 if (const auto * func = node->as<ASTFunction>())
3420 if (func->arguments->children.size() == 1)
3421 return isPrimaryOrMinMaxKeyColumnPossiblyWrappedInFunctions(func->arguments->children.front());
3422
3423 return false;
3424}
3425
3426bool MergeTreeData::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context &) const
3427{
3428 /// Make sure that the left side of the IN operator contain part of the key.
3429 /// If there is a tuple on the left side of the IN operator, at least one item of the tuple
3430 /// must be part of the key (probably wrapped by a chain of some acceptable functions).
3431 const auto * left_in_operand_tuple = left_in_operand->as<ASTFunction>();
3432 if (left_in_operand_tuple && left_in_operand_tuple->name == "tuple")
3433 {
3434 for (const auto & item : left_in_operand_tuple->arguments->children)
3435 {
3436 if (isPrimaryOrMinMaxKeyColumnPossiblyWrappedInFunctions(item))
3437 return true;
3438 for (const auto & index : skip_indices)
3439 if (index->mayBenefitFromIndexForIn(item))
3440 return true;
3441 }
3442 /// The tuple itself may be part of the primary key, so check that as a last resort.
3443 return isPrimaryOrMinMaxKeyColumnPossiblyWrappedInFunctions(left_in_operand);
3444 }
3445 else
3446 {
3447 for (const auto & index : skip_indices)
3448 if (index->mayBenefitFromIndexForIn(left_in_operand))
3449 return true;
3450
3451 return isPrimaryOrMinMaxKeyColumnPossiblyWrappedInFunctions(left_in_operand);
3452 }
3453}
3454
3455MergeTreeData & MergeTreeData::checkStructureAndGetMergeTreeData(const StoragePtr & source_table) const
3456{
3457 MergeTreeData * src_data = dynamic_cast<MergeTreeData *>(source_table.get());
3458 if (!src_data)
3459 throw Exception("Table " + table_name + " supports attachPartitionFrom only for MergeTree family of table engines."
3460 " Got " + source_table->getName(), ErrorCodes::NOT_IMPLEMENTED);
3461
3462 if (getColumns().getAllPhysical().sizeOfDifference(src_data->getColumns().getAllPhysical()))
3463 throw Exception("Tables have different structure", ErrorCodes::INCOMPATIBLE_COLUMNS);
3464
3465 auto query_to_string = [] (const ASTPtr & ast)
3466 {
3467 return ast ? queryToString(ast) : "";
3468 };
3469
3470 if (query_to_string(order_by_ast) != query_to_string(src_data->order_by_ast))
3471 throw Exception("Tables have different ordering", ErrorCodes::BAD_ARGUMENTS);
3472
3473 if (query_to_string(partition_by_ast) != query_to_string(src_data->partition_by_ast))
3474 throw Exception("Tables have different partition key", ErrorCodes::BAD_ARGUMENTS);
3475
3476 if (format_version != src_data->format_version)
3477 throw Exception("Tables have different format_version", ErrorCodes::BAD_ARGUMENTS);
3478
3479 return *src_data;
3480}
3481
3482MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPartOnSameDisk(const MergeTreeData::DataPartPtr & src_part,
3483 const String & tmp_part_prefix,
3484 const MergeTreePartInfo & dst_part_info)
3485{
3486 String dst_part_name = src_part->getNewName(dst_part_info);
3487 String tmp_dst_part_name = tmp_part_prefix + dst_part_name;
3488
3489 auto reservation = reserveSpace(src_part->bytes_on_disk, src_part->disk);
3490 String dst_part_path = getFullPathOnDisk(reservation->getDisk());
3491 Poco::Path dst_part_absolute_path = Poco::Path(dst_part_path + tmp_dst_part_name).absolute();
3492 Poco::Path src_part_absolute_path = Poco::Path(src_part->getFullPath()).absolute();
3493
3494 if (Poco::File(dst_part_absolute_path).exists())
3495 throw Exception("Part in " + dst_part_absolute_path.toString() + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
3496
3497 LOG_DEBUG(log, "Cloning part " << src_part_absolute_path.toString() << " to " << dst_part_absolute_path.toString());
3498 localBackup(src_part_absolute_path, dst_part_absolute_path);
3499
3500 MergeTreeData::MutableDataPartPtr dst_data_part = std::make_shared<MergeTreeData::DataPart>(
3501 *this, reservation->getDisk(), dst_part_name, dst_part_info);
3502
3503 dst_data_part->relative_path = tmp_dst_part_name;
3504 dst_data_part->is_temp = true;
3505
3506 dst_data_part->loadColumnsChecksumsIndexes(require_part_metadata, true);
3507 dst_data_part->modification_time = Poco::File(dst_part_absolute_path).getLastModified().epochTime();
3508 return dst_data_part;
3509}
3510
3511String MergeTreeData::getFullPathOnDisk(const DiskPtr & disk) const
3512{
3513 return disk->getPath() + relative_data_path;
3514}
3515
3516
3517DiskPtr MergeTreeData::getDiskForPart(const String & part_name, const String & relative_path) const
3518{
3519 const auto disks = storage_policy->getDisks();
3520 for (const DiskPtr & disk : disks)
3521 {
3522 const auto disk_path = getFullPathOnDisk(disk);
3523 for (Poco::DirectoryIterator it = Poco::DirectoryIterator(disk_path + relative_path); it != Poco::DirectoryIterator(); ++it)
3524 if (it.name() == part_name)
3525 return disk;
3526 }
3527 return nullptr;
3528}
3529
3530
3531String MergeTreeData::getFullPathForPart(const String & part_name, const String & relative_path) const
3532{
3533 auto disk = getDiskForPart(part_name, relative_path);
3534 if (disk)
3535 return getFullPathOnDisk(disk) + relative_path;
3536 return "";
3537}
3538
3539Strings MergeTreeData::getDataPaths() const
3540{
3541 Strings res;
3542 auto disks = storage_policy->getDisks();
3543 for (const auto & disk : disks)
3544 res.push_back(getFullPathOnDisk(disk));
3545 return res;
3546}
3547
3548MergeTreeData::PathsWithDisks MergeTreeData::getDataPathsWithDisks() const
3549{
3550 PathsWithDisks res;
3551 auto disks = storage_policy->getDisks();
3552 for (const auto & disk : disks)
3553 res.emplace_back(getFullPathOnDisk(disk), disk);
3554 return res;
3555}
3556
3557void MergeTreeData::freezePartitionsByMatcher(MatcherFn matcher, const String & with_name, const Context & context)
3558{
3559 String clickhouse_path = Poco::Path(context.getPath()).makeAbsolute().toString();
3560 String default_shadow_path = clickhouse_path + "shadow/";
3561 Poco::File(default_shadow_path).createDirectories();
3562 auto increment = Increment(default_shadow_path + "increment.txt").get(true);
3563
3564 /// Acquire a snapshot of active data parts to prevent removing while doing backup.
3565 const auto data_parts = getDataParts();
3566
3567 size_t parts_processed = 0;
3568 for (const auto & part : data_parts)
3569 {
3570 if (!matcher(part))
3571 continue;
3572
3573 String shadow_path = part->disk->getPath() + "shadow/";
3574
3575 Poco::File(shadow_path).createDirectories();
3576 String backup_path = shadow_path
3577 + (!with_name.empty()
3578 ? escapeForFileName(with_name)
3579 : toString(increment))
3580 + "/";
3581
3582 LOG_DEBUG(log, "Freezing part " << part->name << " snapshot will be placed at " + backup_path);
3583
3584 String part_absolute_path = Poco::Path(part->getFullPath()).absolute().toString();
3585 String backup_part_absolute_path = backup_path
3586 + relative_data_path
3587 + part->relative_path;
3588 localBackup(part_absolute_path, backup_part_absolute_path);
3589 part->is_frozen.store(true, std::memory_order_relaxed);
3590 ++parts_processed;
3591 }
3592
3593 LOG_DEBUG(log, "Freezed " << parts_processed << " parts");
3594}
3595
3596bool MergeTreeData::canReplacePartition(const DataPartPtr & src_part) const
3597{
3598 const auto settings = getSettings();
3599
3600 if (!settings->enable_mixed_granularity_parts || settings->index_granularity_bytes == 0)
3601 {
3602 if (!canUseAdaptiveGranularity() && src_part->index_granularity_info.is_adaptive)
3603 return false;
3604 if (canUseAdaptiveGranularity() && !src_part->index_granularity_info.is_adaptive)
3605 return false;
3606 }
3607 return true;
3608}
3609
3610void MergeTreeData::writePartLog(
3611 PartLogElement::Type type,
3612 const ExecutionStatus & execution_status,
3613 UInt64 elapsed_ns,
3614 const String & new_part_name,
3615 const DataPartPtr & result_part,
3616 const DataPartsVector & source_parts,
3617 const MergeListEntry * merge_entry)
3618try
3619{
3620 auto part_log = global_context.getPartLog(database_name);
3621 if (!part_log)
3622 return;
3623
3624 PartLogElement part_log_elem;
3625
3626 part_log_elem.event_type = type;
3627
3628 part_log_elem.error = static_cast<UInt16>(execution_status.code);
3629 part_log_elem.exception = execution_status.message;
3630
3631 part_log_elem.event_time = time(nullptr);
3632 /// TODO: Stop stopwatch in outer code to exclude ZK timings and so on
3633 part_log_elem.duration_ms = elapsed_ns / 1000000;
3634
3635 part_log_elem.database_name = database_name;
3636 part_log_elem.table_name = table_name;
3637 part_log_elem.partition_id = MergeTreePartInfo::fromPartName(new_part_name, format_version).partition_id;
3638 part_log_elem.part_name = new_part_name;
3639
3640 if (result_part)
3641 {
3642 part_log_elem.path_on_disk = result_part->getFullPath();
3643 part_log_elem.bytes_compressed_on_disk = result_part->bytes_on_disk;
3644 part_log_elem.rows = result_part->rows_count;
3645 }
3646
3647 part_log_elem.source_part_names.reserve(source_parts.size());
3648 for (const auto & source_part : source_parts)
3649 part_log_elem.source_part_names.push_back(source_part->name);
3650
3651 if (merge_entry)
3652 {
3653 part_log_elem.rows_read = (*merge_entry)->rows_read;
3654 part_log_elem.bytes_read_uncompressed = (*merge_entry)->bytes_read_uncompressed;
3655
3656 part_log_elem.rows = (*merge_entry)->rows_written;
3657 part_log_elem.bytes_uncompressed = (*merge_entry)->bytes_written_uncompressed;
3658 }
3659
3660 part_log->add(part_log_elem);
3661}
3662catch (...)
3663{
3664 tryLogCurrentException(log, __PRETTY_FUNCTION__);
3665}
3666
3667MergeTreeData::CurrentlyMovingPartsTagger::CurrentlyMovingPartsTagger(MergeTreeMovingParts && moving_parts_, MergeTreeData & data_)
3668 : parts_to_move(std::move(moving_parts_)), data(data_)
3669{
3670 for (const auto & moving_part : parts_to_move)
3671 if (!data.currently_moving_parts.emplace(moving_part.part).second)
3672 throw Exception("Cannot move part '" + moving_part.part->name + "'. It's already moving.", ErrorCodes::LOGICAL_ERROR);
3673}
3674
3675MergeTreeData::CurrentlyMovingPartsTagger::~CurrentlyMovingPartsTagger()
3676{
3677 std::lock_guard lock(data.moving_parts_mutex);
3678 for (const auto & moving_part : parts_to_move)
3679 {
3680 /// Something went completely wrong
3681 if (!data.currently_moving_parts.count(moving_part.part))
3682 std::terminate();
3683 data.currently_moving_parts.erase(moving_part.part);
3684 }
3685}
3686
3687bool MergeTreeData::selectPartsAndMove()
3688{
3689 if (parts_mover.moves_blocker.isCancelled())
3690 return false;
3691
3692 auto moving_tagger = selectPartsForMove();
3693 if (moving_tagger.parts_to_move.empty())
3694 return false;
3695
3696 return moveParts(std::move(moving_tagger));
3697}
3698
3699bool MergeTreeData::areBackgroundMovesNeeded() const
3700{
3701 return storage_policy->getVolumes().size() > 1;
3702}
3703
3704bool MergeTreeData::movePartsToSpace(const DataPartsVector & parts, SpacePtr space)
3705{
3706 if (parts_mover.moves_blocker.isCancelled())
3707 return false;
3708
3709 auto moving_tagger = checkPartsForMove(parts, space);
3710 if (moving_tagger.parts_to_move.empty())
3711 return false;
3712
3713 return moveParts(std::move(moving_tagger));
3714}
3715
3716MergeTreeData::CurrentlyMovingPartsTagger MergeTreeData::selectPartsForMove()
3717{
3718 MergeTreeMovingParts parts_to_move;
3719
3720 auto can_move = [this](const DataPartPtr & part, String * reason) -> bool
3721 {
3722 if (partIsAssignedToBackgroundOperation(part))
3723 {
3724 *reason = "part already assigned to background operation.";
3725 return false;
3726 }
3727 if (currently_moving_parts.count(part))
3728 {
3729 *reason = "part is already moving.";
3730 return false;
3731 }
3732
3733 return true;
3734 };
3735
3736 std::lock_guard moving_lock(moving_parts_mutex);
3737
3738 parts_mover.selectPartsForMove(parts_to_move, can_move, moving_lock);
3739 return CurrentlyMovingPartsTagger(std::move(parts_to_move), *this);
3740}
3741
3742MergeTreeData::CurrentlyMovingPartsTagger MergeTreeData::checkPartsForMove(const DataPartsVector & parts, SpacePtr space)
3743{
3744 std::lock_guard moving_lock(moving_parts_mutex);
3745
3746 MergeTreeMovingParts parts_to_move;
3747 for (const auto & part : parts)
3748 {
3749 auto reservation = space->reserve(part->bytes_on_disk);
3750 if (!reservation)
3751 throw Exception("Move is not possible. Not enough space on '" + space->getName() + "'", ErrorCodes::NOT_ENOUGH_SPACE);
3752
3753 auto reserved_disk = reservation->getDisk();
3754 String path_to_clone = getFullPathOnDisk(reserved_disk);
3755
3756 if (Poco::File(path_to_clone + part->name).exists())
3757 throw Exception(
3758 "Move is not possible: " + path_to_clone + part->name + " already exists",
3759 ErrorCodes::DIRECTORY_ALREADY_EXISTS);
3760
3761 if (currently_moving_parts.count(part) || partIsAssignedToBackgroundOperation(part))
3762 throw Exception(
3763 "Cannot move part '" + part->name + "' because it's participating in background process",
3764 ErrorCodes::PART_IS_TEMPORARILY_LOCKED);
3765
3766 parts_to_move.emplace_back(part, std::move(reservation));
3767 }
3768 return CurrentlyMovingPartsTagger(std::move(parts_to_move), *this);
3769}
3770
3771bool MergeTreeData::moveParts(CurrentlyMovingPartsTagger && moving_tagger)
3772{
3773 LOG_INFO(log, "Got " << moving_tagger.parts_to_move.size() << " parts to move.");
3774
3775 for (const auto & moving_part : moving_tagger.parts_to_move)
3776 {
3777 Stopwatch stopwatch;
3778 DataPartPtr cloned_part;
3779
3780 auto write_part_log = [&](const ExecutionStatus & execution_status)
3781 {
3782 writePartLog(
3783 PartLogElement::Type::MOVE_PART,
3784 execution_status,
3785 stopwatch.elapsed(),
3786 moving_part.part->name,
3787 cloned_part,
3788 {moving_part.part},
3789 nullptr);
3790 };
3791
3792 try
3793 {
3794 cloned_part = parts_mover.clonePart(moving_part);
3795 parts_mover.swapClonedPart(cloned_part);
3796 write_part_log({});
3797 }
3798 catch (...)
3799 {
3800 write_part_log(ExecutionStatus::fromCurrentException());
3801 if (cloned_part)
3802 cloned_part->remove();
3803
3804 throw;
3805 }
3806 }
3807 return true;
3808}
3809
3810}
3811