1#include "StorageMergeTree.h"
2
3#include <Databases/IDatabase.h>
4#include <Common/escapeForFileName.h>
5#include <Common/typeid_cast.h>
6#include <Common/FieldVisitors.h>
7#include <Common/ThreadPool.h>
8#include <Interpreters/InterpreterAlterQuery.h>
9#include <Interpreters/PartLog.h>
10#include <Parsers/ASTCheckQuery.h>
11#include <Parsers/ASTFunction.h>
12#include <Parsers/ASTLiteral.h>
13#include <Parsers/ASTPartition.h>
14#include <Parsers/ASTSetQuery.h>
15#include <Parsers/queryToString.h>
16#include <Storages/MergeTree/MergeTreeData.h>
17#include <Storages/MergeTree/ActiveDataPartSet.h>
18#include <Storages/AlterCommands.h>
19#include <Storages/PartitionCommands.h>
20#include <Storages/MergeTree/MergeTreeBlockOutputStream.h>
21#include <Disks/DiskSpaceMonitor.h>
22#include <Storages/MergeTree/MergeList.h>
23#include <Storages/MergeTree/checkDataPart.h>
24#include <Poco/DirectoryIterator.h>
25#include <Poco/File.h>
26#include <optional>
27#include <Interpreters/MutationsInterpreter.h>
28#include <Processors/Pipe.h>
29
30
31namespace DB
32{
33
/// Error codes referenced by this translation unit.
/// These are declarations only (`extern`); the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int ABORTED;
    extern const int BAD_ARGUMENTS;
    extern const int INCORRECT_DATA;
    extern const int INCORRECT_FILE_NAME;
    extern const int CANNOT_ASSIGN_OPTIMIZE;
    extern const int INCOMPATIBLE_COLUMNS;
    extern const int PART_IS_TEMPORARILY_LOCKED;
    extern const int UNKNOWN_SETTING;
    extern const int TOO_BIG_AST;
    /// Both codes below are thrown in this file (CurrentlyMergingPartsTagger,
    /// clearColumnOrIndexInPartition); declare them here instead of relying on
    /// a declaration that happens to be pulled in through some included header.
    extern const int LOGICAL_ERROR;
    extern const int NOT_ENOUGH_SPACE;
}
46
47namespace ActionLocks
48{
49 extern const StorageActionBlockType PartsMerge;
50 extern const StorageActionBlockType PartsTTLMerge;
51 extern const StorageActionBlockType PartsMove;
52}
53
54
55StorageMergeTree::StorageMergeTree(
56 const String & database_name_,
57 const String & table_name_,
58 const String & relative_data_path_,
59 const StorageInMemoryMetadata & metadata,
60 bool attach,
61 Context & context_,
62 const String & date_column_name,
63 const MergingParams & merging_params_,
64 std::unique_ptr<MergeTreeSettings> storage_settings_,
65 bool has_force_restore_data_flag)
66 : MergeTreeData(
67 database_name_,
68 table_name_,
69 relative_data_path_,
70 metadata,
71 context_,
72 date_column_name,
73 merging_params_,
74 std::move(storage_settings_),
75 false,
76 attach)
77 , reader(*this)
78 , writer(*this)
79 , merger_mutator(*this, global_context.getBackgroundPool().getNumberOfThreads())
80{
81 loadDataParts(has_force_restore_data_flag);
82
83 if (!attach && !getDataParts().empty())
84 throw Exception("Data directory for table already containing data parts - probably it was unclean DROP table or manual intervention. You must either clear directory by hand or use ATTACH TABLE instead of CREATE TABLE if you need to use that parts.", ErrorCodes::INCORRECT_DATA);
85
86 increment.set(getMaxBlockNumber());
87
88 loadMutations();
89}
90
91
92void StorageMergeTree::startup()
93{
94 clearOldPartsFromFilesystem();
95
96 /// Temporary directories contain incomplete results of merges (after forced restart)
97 /// and don't allow to reinitialize them, so delete each of them immediately
98 clearOldTemporaryDirectories(0);
99
100 /// NOTE background task will also do the above cleanups periodically.
101 time_after_previous_cleanup.restart();
102 merging_mutating_task_handle = global_context.getBackgroundPool().addTask([this] { return mergeMutateTask(); });
103 if (areBackgroundMovesNeeded())
104 moving_task_handle = global_context.getBackgroundMovePool().addTask([this] { return movePartsTask(); });
105}
106
107
108void StorageMergeTree::shutdown()
109{
110 if (shutdown_called)
111 return;
112 shutdown_called = true;
113 merger_mutator.merges_blocker.cancelForever();
114 parts_mover.moves_blocker.cancelForever();
115
116 if (merging_mutating_task_handle)
117 global_context.getBackgroundPool().removeTask(merging_mutating_task_handle);
118
119 if (moving_task_handle)
120 global_context.getBackgroundMovePool().removeTask(moving_task_handle);
121}
122
123
124StorageMergeTree::~StorageMergeTree()
125{
126 shutdown();
127}
128
129Pipes StorageMergeTree::readWithProcessors(
130 const Names & column_names,
131 const SelectQueryInfo & query_info,
132 const Context & context,
133 QueryProcessingStage::Enum /*processed_stage*/,
134 const size_t max_block_size,
135 const unsigned num_streams)
136{
137 return reader.read(column_names, query_info, context, max_block_size, num_streams);
138}
139
140std::optional<UInt64> StorageMergeTree::totalRows() const
141{
142 return getTotalActiveSizeInRows();
143}
144
145BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & /*query*/, const Context & context)
146{
147 return std::make_shared<MergeTreeBlockOutputStream>(*this, context.getSettingsRef().max_partitions_per_insert_block);
148}
149
150void StorageMergeTree::checkTableCanBeDropped() const
151{
152 const_cast<StorageMergeTree &>(*this).recalculateColumnSizes();
153 global_context.checkTableCanBeDropped(database_name, table_name, getTotalActiveSizeInBytes());
154}
155
156void StorageMergeTree::checkPartitionCanBeDropped(const ASTPtr & partition)
157{
158 const_cast<StorageMergeTree &>(*this).recalculateColumnSizes();
159
160 const String partition_id = getPartitionIDFromQuery(partition, global_context);
161 auto parts_to_remove = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
162
163 UInt64 partition_size = 0;
164
165 for (const auto & part : parts_to_remove)
166 {
167 partition_size += part->bytes_on_disk;
168 }
169 global_context.checkPartitionCanBeDropped(database_name, table_name, partition_size);
170}
171
172void StorageMergeTree::drop(TableStructureWriteLockHolder &)
173{
174 shutdown();
175 dropAllData();
176}
177
178void StorageMergeTree::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &)
179{
180 {
181 /// Asks to complete merges and does not allow them to start.
182 /// This protects against "revival" of data for a removed partition after completion of merge.
183 auto merge_blocker = merger_mutator.merges_blocker.cancel();
184
185 /// NOTE: It's assumed that this method is called under lockForAlter.
186
187 auto parts_to_remove = getDataPartsVector();
188 removePartsFromWorkingSet(parts_to_remove, true);
189
190 LOG_INFO(log, "Removed " << parts_to_remove.size() << " parts.");
191 }
192
193 clearOldMutations(true);
194 clearOldPartsFromFilesystem();
195}
196
197
198std::vector<MergeTreeData::AlterDataPartTransactionPtr> StorageMergeTree::prepareAlterTransactions(
199 const ColumnsDescription & new_columns, const IndicesDescription & new_indices, const Context & context)
200{
201 auto parts = getDataParts({MergeTreeDataPartState::PreCommitted,
202 MergeTreeDataPartState::Committed,
203 MergeTreeDataPartState::Outdated});
204 std::vector<MergeTreeData::AlterDataPartTransactionPtr> transactions;
205 transactions.reserve(parts.size());
206
207 const auto & columns_for_parts = new_columns.getAllPhysical();
208
209 const Settings & settings_ = context.getSettingsRef();
210 size_t thread_pool_size = std::min<size_t>(parts.size(), settings_.max_alter_threads);
211
212 std::optional<ThreadPool> thread_pool;
213
214 if (thread_pool_size > 1)
215 thread_pool.emplace(thread_pool_size);
216
217 for (const auto & part : parts)
218 {
219 transactions.push_back(std::make_unique<MergeTreeData::AlterDataPartTransaction>(part));
220
221 auto job = [this, & transaction = transactions.back(), & columns_for_parts, & new_indices = new_indices.indices]
222 {
223 this->alterDataPart(columns_for_parts, new_indices, false, transaction);
224 };
225
226 if (thread_pool)
227 thread_pool->scheduleOrThrowOnError(job);
228 else
229 job();
230 }
231
232 if (thread_pool)
233 thread_pool->wait();
234
235 auto erase_pos = std::remove_if(transactions.begin(), transactions.end(),
236 [](const MergeTreeData::AlterDataPartTransactionPtr & transaction)
237 {
238 return !transaction->isValid();
239 });
240 transactions.erase(erase_pos, transactions.end());
241
242 return transactions;
243}
244
245void StorageMergeTree::alter(
246 const AlterCommands & params,
247 const Context & context,
248 TableStructureWriteLockHolder & table_lock_holder)
249{
250 const String current_database_name = getDatabaseName();
251 const String current_table_name = getTableName();
252
253 lockNewDataStructureExclusively(table_lock_holder, context.getCurrentQueryId());
254
255 StorageInMemoryMetadata metadata = getInMemoryMetadata();
256
257 params.apply(metadata);
258
259 /// Update metdata in memory
260 auto update_metadata = [&metadata, &table_lock_holder, this]()
261 {
262
263 changeSettings(metadata.settings_ast, table_lock_holder);
264 /// Reinitialize primary key because primary key column types might have changed.
265 setProperties(metadata);
266
267 setTTLExpressions(metadata.columns.getColumnTTLs(), metadata.ttl_for_table_ast);
268 };
269
270 /// This alter can be performed at metadata level only
271 if (!params.isModifyingData())
272 {
273 lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
274
275 context.getDatabase(current_database_name)->alterTable(context, current_table_name, metadata);
276
277 update_metadata();
278 }
279 else
280 {
281
282 /// NOTE: Here, as in ReplicatedMergeTree, you can do ALTER which does not block the writing of data for a long time.
283 /// Also block moves, because they can replace part with old state.
284 auto merge_blocker = merger_mutator.merges_blocker.cancel();
285 auto moves_blocked = parts_mover.moves_blocker.cancel();
286
287
288 auto transactions = prepareAlterTransactions(metadata.columns, metadata.indices, context);
289
290 lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
291
292 context.getDatabase(current_database_name)->alterTable(context, current_table_name, metadata);
293
294 update_metadata();
295
296 for (auto & transaction : transactions)
297 {
298 transaction->commit();
299 transaction.reset();
300 }
301
302 /// Columns sizes could be changed
303 recalculateColumnSizes();
304 }
305}
306
307
308/// While exists, marks parts as 'currently_merging_mutating_parts' and reserves free space on filesystem.
309struct CurrentlyMergingPartsTagger
310{
311 FutureMergedMutatedPart future_part;
312 ReservationPtr reserved_space;
313
314 bool is_successful = false;
315 String exception_message;
316
317 StorageMergeTree & storage;
318
319public:
320 CurrentlyMergingPartsTagger(FutureMergedMutatedPart & future_part_, size_t total_size, StorageMergeTree & storage_, bool is_mutation)
321 : future_part(future_part_), storage(storage_)
322 {
323 /// Assume mutex is already locked, because this method is called from mergeTask.
324
325 /// if we mutate part, than we should reserve space on the same disk, because mutations possible can create hardlinks
326 if (is_mutation)
327 reserved_space = storage.tryReserveSpace(total_size, future_part_.parts[0]->disk);
328 else
329 {
330 MergeTreeDataPart::TTLInfos ttl_infos;
331 for (auto & part_ptr : future_part_.parts)
332 ttl_infos.update(part_ptr->ttl_infos);
333
334 reserved_space = storage.tryReserveSpacePreferringTTLRules(total_size, ttl_infos, time(nullptr));
335 }
336 if (!reserved_space)
337 {
338 if (is_mutation)
339 throw Exception("Not enough space for mutating part '" + future_part_.parts[0]->name + "'", ErrorCodes::NOT_ENOUGH_SPACE);
340 else
341 throw Exception("Not enough space for merging parts", ErrorCodes::NOT_ENOUGH_SPACE);
342 }
343
344 future_part_.updatePath(storage, reserved_space);
345
346 for (const auto & part : future_part.parts)
347 {
348 if (storage.currently_merging_mutating_parts.count(part))
349 throw Exception("Tagging already tagged part " + part->name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
350 }
351 storage.currently_merging_mutating_parts.insert(future_part.parts.begin(), future_part.parts.end());
352 }
353
354 ~CurrentlyMergingPartsTagger()
355 {
356 std::lock_guard lock(storage.currently_processing_in_background_mutex);
357
358 for (const auto & part : future_part.parts)
359 {
360 if (!storage.currently_merging_mutating_parts.count(part))
361 std::terminate();
362 storage.currently_merging_mutating_parts.erase(part);
363 }
364
365 /// Update the information about failed parts in the system.mutations table.
366
367 Int64 sources_data_version = future_part.parts.at(0)->info.getDataVersion();
368 Int64 result_data_version = future_part.part_info.getDataVersion();
369 auto mutations_begin_it = storage.current_mutations_by_version.end();
370 auto mutations_end_it = storage.current_mutations_by_version.end();
371 if (sources_data_version != result_data_version)
372 {
373 mutations_begin_it = storage.current_mutations_by_version.upper_bound(sources_data_version);
374 mutations_end_it = storage.current_mutations_by_version.upper_bound(result_data_version);
375 }
376
377 for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
378 {
379 MergeTreeMutationEntry & entry = it->second;
380 if (is_successful)
381 {
382 if (!entry.latest_failed_part.empty() && future_part.part_info.contains(entry.latest_failed_part_info))
383 {
384 entry.latest_failed_part.clear();
385 entry.latest_failed_part_info = MergeTreePartInfo();
386 entry.latest_fail_time = 0;
387 entry.latest_fail_reason.clear();
388 }
389 }
390 else
391 {
392 entry.latest_failed_part = future_part.parts.at(0)->name;
393 entry.latest_failed_part_info = future_part.parts.at(0)->info;
394 entry.latest_fail_time = time(nullptr);
395 entry.latest_fail_reason = exception_message;
396 }
397 }
398 }
399};
400
401
402void StorageMergeTree::mutate(const MutationCommands & commands, const Context & query_context)
403{
404 /// Choose any disk, because when we load mutations we search them at each disk
405 /// where storage can be placed. See loadMutations().
406 auto disk = storage_policy->getAnyDisk();
407 MergeTreeMutationEntry entry(commands, getFullPathOnDisk(disk), insert_increment.get());
408 String file_name;
409 Int64 version;
410 {
411 std::lock_guard lock(currently_processing_in_background_mutex);
412
413 version = increment.get();
414 entry.commit(version);
415 file_name = entry.file_name;
416 auto insertion = current_mutations_by_id.emplace(file_name, std::move(entry));
417 current_mutations_by_version.emplace(version, insertion.first->second);
418 }
419
420 LOG_INFO(log, "Added mutation: " << file_name);
421 merging_mutating_task_handle->wake();
422
423 /// We have to wait mutation end
424 if (query_context.getSettingsRef().mutations_sync > 0)
425 {
426 LOG_INFO(log, "Waiting mutation: " << file_name);
427 auto check = [version, this]() { return isMutationDone(version); };
428 std::unique_lock lock(mutation_wait_mutex);
429 mutation_wait_event.wait(lock, check);
430 }
431}
432
433namespace
434{
435
436struct PartVersionWithName
437{
438 Int64 version;
439 String name;
440};
441
442bool comparator(const PartVersionWithName & f, const PartVersionWithName & s)
443{
444 return f.version < s.version;
445}
446
447}
448
449bool StorageMergeTree::isMutationDone(Int64 mutation_version) const
450{
451 std::lock_guard lock(currently_processing_in_background_mutex);
452
453 /// Killed
454 if (!current_mutations_by_version.count(mutation_version))
455 return true;
456
457 auto data_parts = getDataPartsVector();
458 for (const auto & data_part : data_parts)
459 if (data_part->info.getDataVersion() < mutation_version)
460 return false;
461 return true;
462}
463
464std::vector<MergeTreeMutationStatus> StorageMergeTree::getMutationsStatus() const
465{
466 std::lock_guard lock(currently_processing_in_background_mutex);
467
468 std::vector<PartVersionWithName> part_versions_with_names;
469 auto data_parts = getDataPartsVector();
470 part_versions_with_names.reserve(data_parts.size());
471 for (const auto & part : data_parts)
472 part_versions_with_names.emplace_back(PartVersionWithName{part->info.getDataVersion(), part->name});
473 std::sort(part_versions_with_names.begin(), part_versions_with_names.end(), comparator);
474
475 std::vector<MergeTreeMutationStatus> result;
476 for (const auto & kv : current_mutations_by_version)
477 {
478
479 Int64 mutation_version = kv.first;
480 const MergeTreeMutationEntry & entry = kv.second;
481 const PartVersionWithName needle{mutation_version, ""};
482 auto versions_it = std::lower_bound(
483 part_versions_with_names.begin(), part_versions_with_names.end(), needle, comparator);
484
485 size_t parts_to_do = versions_it - part_versions_with_names.begin();
486 Names parts_to_do_names;
487 parts_to_do_names.reserve(parts_to_do);
488 for (size_t i = 0; i < parts_to_do; ++i)
489 parts_to_do_names.push_back(part_versions_with_names[i].name);
490
491 std::map<String, Int64> block_numbers_map({{"", entry.block_number}});
492
493 for (const MutationCommand & command : entry.commands)
494 {
495 std::stringstream ss;
496 formatAST(*command.ast, ss, false, true);
497 result.push_back(MergeTreeMutationStatus
498 {
499 entry.file_name,
500 ss.str(),
501 entry.create_time,
502 block_numbers_map,
503 parts_to_do_names,
504 parts_to_do_names.empty(),
505 entry.latest_failed_part,
506 entry.latest_fail_time,
507 entry.latest_fail_reason,
508 });
509 }
510 }
511
512 return result;
513}
514
515CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
516{
517 LOG_TRACE(log, "Killing mutation " << mutation_id);
518
519 std::optional<MergeTreeMutationEntry> to_kill;
520 {
521 std::lock_guard lock(currently_processing_in_background_mutex);
522 auto it = current_mutations_by_id.find(mutation_id);
523 if (it != current_mutations_by_id.end())
524 {
525 to_kill.emplace(std::move(it->second));
526 current_mutations_by_id.erase(it);
527 current_mutations_by_version.erase(to_kill->block_number);
528 }
529 }
530
531 if (!to_kill)
532 return CancellationCode::NotFound;
533
534 global_context.getMergeList().cancelPartMutations({}, to_kill->block_number);
535 to_kill->removeFile();
536 LOG_TRACE(log, "Cancelled part mutations and removed mutation file " << mutation_id);
537 mutation_wait_event.notify_all();
538
539 /// Maybe there is another mutation that was blocked by the killed one. Try to execute it immediately.
540 merging_mutating_task_handle->wake();
541
542 return CancellationCode::CancelSent;
543}
544
545
546void StorageMergeTree::loadMutations()
547{
548 Poco::DirectoryIterator end;
549 const auto full_paths = getDataPaths();
550 for (const String & full_path : full_paths)
551 {
552 for (auto it = Poco::DirectoryIterator(full_path); it != end; ++it)
553 {
554 if (startsWith(it.name(), "mutation_"))
555 {
556 MergeTreeMutationEntry entry(full_path, it.name());
557 Int64 block_number = entry.block_number;
558 auto insertion = current_mutations_by_id.emplace(it.name(), std::move(entry));
559 current_mutations_by_version.emplace(block_number, insertion.first->second);
560 }
561 else if (startsWith(it.name(), "tmp_mutation_"))
562 {
563 it->remove();
564 }
565 }
566 }
567
568 if (!current_mutations_by_version.empty())
569 increment.value = std::max(Int64(increment.value.load()), current_mutations_by_version.rbegin()->first);
570}
571
572
573bool StorageMergeTree::merge(
574 bool aggressive,
575 const String & partition_id,
576 bool final,
577 bool deduplicate,
578 String * out_disable_reason)
579{
580 auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY);
581
582 FutureMergedMutatedPart future_part;
583
584 /// You must call destructor with unlocked `currently_processing_in_background_mutex`.
585 std::optional<CurrentlyMergingPartsTagger> merging_tagger;
586
587 {
588 std::lock_guard lock(currently_processing_in_background_mutex);
589
590 auto can_merge = [this, &lock] (const DataPartPtr & left, const DataPartPtr & right, String *)
591 {
592 return !currently_merging_mutating_parts.count(left) && !currently_merging_mutating_parts.count(right)
593 && getCurrentMutationVersion(left, lock) == getCurrentMutationVersion(right, lock);
594 };
595
596 bool selected = false;
597
598 if (partition_id.empty())
599 {
600 UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge();
601 if (max_source_parts_size > 0)
602 selected = merger_mutator.selectPartsToMerge(future_part, aggressive, max_source_parts_size, can_merge, out_disable_reason);
603 else if (out_disable_reason)
604 *out_disable_reason = "Current value of max_source_parts_size is zero";
605 }
606 else
607 {
608 UInt64 disk_space = storage_policy->getMaxUnreservedFreeSpace();
609 selected = merger_mutator.selectAllPartsToMergeWithinPartition(future_part, disk_space, can_merge, partition_id, final, out_disable_reason);
610 }
611
612 if (!selected)
613 {
614 if (out_disable_reason)
615 *out_disable_reason = "Cannot select parts for optimization";
616 return false;
617 }
618
619 merging_tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part.parts), *this, false);
620 }
621
622 MergeList::EntryPtr merge_entry = global_context.getMergeList().insert(database_name, table_name, future_part);
623
624 /// Logging
625 Stopwatch stopwatch;
626 MutableDataPartPtr new_part;
627
628 auto write_part_log = [&] (const ExecutionStatus & execution_status)
629 {
630 writePartLog(
631 PartLogElement::MERGE_PARTS,
632 execution_status,
633 stopwatch.elapsed(),
634 future_part.name,
635 new_part,
636 future_part.parts,
637 merge_entry.get());
638 };
639
640 try
641 {
642 /// Force filter by TTL in 'OPTIMIZE ... FINAL' query to remove expired values from old parts
643 /// without TTL infos or with outdated TTL infos, e.g. after 'ALTER ... MODIFY TTL' query.
644 bool force_ttl = (final && (hasTableTTL() || hasAnyColumnTTL()));
645
646 new_part = merger_mutator.mergePartsToTemporaryPart(
647 future_part, *merge_entry, table_lock_holder, time(nullptr),
648 merging_tagger->reserved_space, deduplicate, force_ttl);
649 merger_mutator.renameMergedTemporaryPart(new_part, future_part.parts, nullptr);
650 removeEmptyColumnsFromPart(new_part);
651
652 merging_tagger->is_successful = true;
653 write_part_log({});
654 }
655 catch (...)
656 {
657 merging_tagger->exception_message = getCurrentExceptionMessage(false);
658 write_part_log(ExecutionStatus::fromCurrentException());
659 throw;
660 }
661
662 return true;
663}
664
665
666bool StorageMergeTree::partIsAssignedToBackgroundOperation(const DataPartPtr & part) const
667{
668 std::lock_guard background_processing_lock(currently_processing_in_background_mutex);
669 return currently_merging_mutating_parts.count(part);
670}
671
672
673BackgroundProcessingPoolTaskResult StorageMergeTree::movePartsTask()
674{
675 try
676 {
677 if (!selectPartsAndMove())
678 return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO;
679
680 return BackgroundProcessingPoolTaskResult::SUCCESS;
681 }
682 catch (...)
683 {
684 tryLogCurrentException(log);
685 return BackgroundProcessingPoolTaskResult::ERROR;
686 }
687}
688
689
690bool StorageMergeTree::tryMutatePart()
691{
692 auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY);
693 size_t max_ast_elements = global_context.getSettingsRef().max_expanded_ast_elements;
694
695 FutureMergedMutatedPart future_part;
696 MutationCommands commands;
697 /// You must call destructor with unlocked `currently_processing_in_background_mutex`.
698 std::optional<CurrentlyMergingPartsTagger> tagger;
699 {
700 /// DataPart can be store only at one disk. Get Max of free space at all disks
701 UInt64 disk_space = storage_policy->getMaxUnreservedFreeSpace();
702
703 std::lock_guard lock(currently_processing_in_background_mutex);
704
705 if (current_mutations_by_version.empty())
706 return false;
707
708 auto mutations_end_it = current_mutations_by_version.end();
709 for (const auto & part : getDataPartsVector())
710 {
711 if (currently_merging_mutating_parts.count(part))
712 continue;
713
714 auto mutations_begin_it = current_mutations_by_version.upper_bound(part->info.getDataVersion());
715 if (mutations_begin_it == mutations_end_it)
716 continue;
717
718 if (merger_mutator.getMaxSourcePartSizeForMutation() > disk_space)
719 continue;
720
721 size_t current_ast_elements = 0;
722 for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
723 {
724 MutationsInterpreter interpreter(shared_from_this(), it->second.commands, global_context, false);
725
726 size_t commands_size = interpreter.evaluateCommandsSize();
727 if (current_ast_elements + commands_size >= max_ast_elements)
728 break;
729
730 current_ast_elements += commands_size;
731 commands.insert(commands.end(), it->second.commands.begin(), it->second.commands.end());
732 }
733
734 auto new_part_info = part->info;
735 new_part_info.mutation = current_mutations_by_version.rbegin()->first;
736
737 future_part.parts.push_back(part);
738 future_part.part_info = new_part_info;
739 future_part.name = part->getNewName(new_part_info);
740
741 tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, true);
742 break;
743 }
744 }
745
746 if (!tagger)
747 return false;
748
749 MergeList::EntryPtr merge_entry = global_context.getMergeList().insert(database_name, table_name, future_part);
750
751 Stopwatch stopwatch;
752 MutableDataPartPtr new_part;
753
754 auto write_part_log = [&] (const ExecutionStatus & execution_status)
755 {
756 writePartLog(
757 PartLogElement::MUTATE_PART,
758 execution_status,
759 stopwatch.elapsed(),
760 future_part.name,
761 new_part,
762 future_part.parts,
763 merge_entry.get());
764 };
765
766 try
767 {
768 new_part = merger_mutator.mutatePartToTemporaryPart(future_part, commands, *merge_entry, global_context,
769 tagger->reserved_space, table_lock_holder);
770
771 renameTempPartAndReplace(new_part);
772 tagger->is_successful = true;
773 write_part_log({});
774
775 /// Notify all, who wait for this or previous mutations
776 mutation_wait_event.notify_all();
777 }
778 catch (...)
779 {
780 tagger->exception_message = getCurrentExceptionMessage(false);
781 write_part_log(ExecutionStatus::fromCurrentException());
782 throw;
783 }
784
785 return true;
786}
787
788
789BackgroundProcessingPoolTaskResult StorageMergeTree::mergeMutateTask()
790{
791 if (shutdown_called)
792 return BackgroundProcessingPoolTaskResult::ERROR;
793
794 if (merger_mutator.merges_blocker.isCancelled())
795 return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO;
796
797 try
798 {
799 /// Clear old parts. It is unnecessary to do it more than once a second.
800 if (auto lock = time_after_previous_cleanup.compareAndRestartDeferred(1))
801 {
802 {
803 /// TODO: Implement tryLockStructureForShare.
804 auto lock_structure = lockStructureForShare(false, "");
805 clearOldPartsFromFilesystem();
806 clearOldTemporaryDirectories();
807 }
808 clearOldMutations();
809 }
810
811 ///TODO: read deduplicate option from table config
812 if (merge(false /*aggressive*/, {} /*partition_id*/, false /*final*/, false /*deduplicate*/))
813 return BackgroundProcessingPoolTaskResult::SUCCESS;
814
815
816 if (tryMutatePart())
817 return BackgroundProcessingPoolTaskResult::SUCCESS;
818
819 return BackgroundProcessingPoolTaskResult::ERROR;
820 }
821 catch (const Exception & e)
822 {
823 if (e.code() == ErrorCodes::ABORTED)
824 {
825 LOG_INFO(log, e.message());
826 return BackgroundProcessingPoolTaskResult::ERROR;
827 }
828
829 throw;
830 }
831}
832
833Int64 StorageMergeTree::getCurrentMutationVersion(
834 const DataPartPtr & part,
835 std::lock_guard<std::mutex> & /* currently_processing_in_background_mutex_lock */) const
836{
837 auto it = current_mutations_by_version.upper_bound(part->info.getDataVersion());
838 if (it == current_mutations_by_version.begin())
839 return 0;
840 --it;
841 return it->first;
842}
843
844void StorageMergeTree::clearOldMutations(bool truncate)
845{
846 const auto settings = getSettings();
847 if (!truncate && !settings->finished_mutations_to_keep)
848 return;
849
850 std::vector<MergeTreeMutationEntry> mutations_to_delete;
851 {
852 std::lock_guard lock(currently_processing_in_background_mutex);
853
854 if (!truncate && current_mutations_by_version.size() <= settings->finished_mutations_to_keep)
855 return;
856
857 auto end_it = current_mutations_by_version.end();
858 auto begin_it = current_mutations_by_version.begin();
859 size_t to_delete_count = std::distance(begin_it, end_it);
860
861 if (!truncate)
862 {
863 if (std::optional<Int64> min_version = getMinPartDataVersion())
864 end_it = current_mutations_by_version.upper_bound(*min_version);
865
866 size_t done_count = std::distance(begin_it, end_it);
867 if (done_count <= settings->finished_mutations_to_keep)
868 return;
869
870 to_delete_count = done_count - settings->finished_mutations_to_keep;
871 }
872
873 auto it = begin_it;
874 for (size_t i = 0; i < to_delete_count; ++i)
875 {
876 mutations_to_delete.push_back(std::move(it->second));
877 current_mutations_by_id.erase(mutations_to_delete.back().file_name);
878 it = current_mutations_by_version.erase(it);
879 }
880 }
881
882 for (auto & mutation : mutations_to_delete)
883 {
884 LOG_TRACE(log, "Removing mutation: " << mutation.file_name);
885 mutation.removeFile();
886 }
887}
888
889
890void StorageMergeTree::clearColumnOrIndexInPartition(const ASTPtr & partition, const AlterCommand & alter_command, const Context & context)
891{
892 /// Asks to complete merges and moves and does not allow them to start.
893 /// This protects against "revival" of data for a removed partition after completion of merge.
894 auto merge_blocker = merger_mutator.merges_blocker.cancel();
895 auto move_blocker = parts_mover.moves_blocker.cancel();
896
897 /// We don't change table structure, only data in some parts, parts are locked inside alterDataPart() function
898 auto lock_read_structure = lockStructureForShare(false, context.getCurrentQueryId());
899
900 String partition_id = getPartitionIDFromQuery(partition, context);
901 auto parts = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
902
903 std::vector<AlterDataPartTransactionPtr> transactions;
904
905
906 StorageInMemoryMetadata metadata = getInMemoryMetadata();
907 alter_command.apply(metadata);
908
909 auto columns_for_parts = metadata.columns.getAllPhysical();
910 for (const auto & part : parts)
911 {
912 if (part->info.partition_id != partition_id)
913 throw Exception("Unexpected partition ID " + part->info.partition_id + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
914
915 MergeTreeData::AlterDataPartTransactionPtr transaction(new MergeTreeData::AlterDataPartTransaction(part));
916 alterDataPart(columns_for_parts, metadata.indices.indices, false, transaction);
917 if (transaction->isValid())
918 transactions.push_back(std::move(transaction));
919
920 if (alter_command.type == AlterCommand::DROP_COLUMN)
921 LOG_DEBUG(log, "Removing column " << alter_command.column_name << " from part " << part->name);
922 else if (alter_command.type == AlterCommand::DROP_INDEX)
923 LOG_DEBUG(log, "Removing index " << alter_command.index_name << " from part " << part->name);
924 }
925
926 if (transactions.empty())
927 return;
928
929 for (auto & transaction : transactions)
930 {
931 transaction->commit();
932 transaction.reset();
933 }
934
935 /// Recalculate columns size (not only for the modified column)
936 recalculateColumnSizes();
937}
938
939
940bool StorageMergeTree::optimize(
941 const ASTPtr & /*query*/, const ASTPtr & partition, bool final, bool deduplicate, const Context & context)
942{
943 String disable_reason;
944 if (!partition && final)
945 {
946 DataPartsVector data_parts = getDataPartsVector();
947 std::unordered_set<String> partition_ids;
948
949 for (const DataPartPtr & part : data_parts)
950 partition_ids.emplace(part->info.partition_id);
951
952 for (const String & partition_id : partition_ids)
953 {
954 if (!merge(true, partition_id, true, deduplicate, &disable_reason))
955 {
956 std::stringstream message;
957 message << "Cannot OPTIMIZE table";
958 if (!disable_reason.empty())
959 message << ": " << disable_reason;
960 else
961 message << " by some reason.";
962 LOG_INFO(log, message.rdbuf());
963
964 if (context.getSettingsRef().optimize_throw_if_noop)
965 throw Exception(message.str(), ErrorCodes::CANNOT_ASSIGN_OPTIMIZE);
966 return false;
967 }
968 }
969 }
970 else
971 {
972 String partition_id;
973 if (partition)
974 partition_id = getPartitionIDFromQuery(partition, context);
975
976 if (!merge(true, partition_id, final, deduplicate, &disable_reason))
977 {
978 std::stringstream message;
979 message << "Cannot OPTIMIZE table";
980 if (!disable_reason.empty())
981 message << ": " << disable_reason;
982 else
983 message << " by some reason.";
984 LOG_INFO(log, message.rdbuf());
985
986 if (context.getSettingsRef().optimize_throw_if_noop)
987 throw Exception(message.str(), ErrorCodes::CANNOT_ASSIGN_OPTIMIZE);
988 return false;
989 }
990 }
991
992 return true;
993}
994
995void StorageMergeTree::alterPartition(const ASTPtr & query, const PartitionCommands & commands, const Context & context)
996{
997 for (const PartitionCommand & command : commands)
998 {
999 switch (command.type)
1000 {
1001 case PartitionCommand::DROP_PARTITION:
1002 checkPartitionCanBeDropped(command.partition);
1003 dropPartition(command.partition, command.detach, context);
1004 break;
1005
1006 case PartitionCommand::DROP_DETACHED_PARTITION:
1007 dropDetached(command.partition, command.part, context);
1008 break;
1009
1010 case PartitionCommand::ATTACH_PARTITION:
1011 attachPartition(command.partition, command.part, context);
1012 break;
1013
1014 case PartitionCommand::MOVE_PARTITION:
1015 {
1016 switch (command.move_destination_type)
1017 {
1018 case PartitionCommand::MoveDestinationType::DISK:
1019 movePartitionToDisk(command.partition, command.move_destination_name, command.part, context);
1020 break;
1021
1022 case PartitionCommand::MoveDestinationType::VOLUME:
1023 movePartitionToVolume(command.partition, command.move_destination_name, command.part, context);
1024 break;
1025 }
1026
1027 }
1028 break;
1029
1030 case PartitionCommand::REPLACE_PARTITION:
1031 {
1032 checkPartitionCanBeDropped(command.partition);
1033 String from_database = command.from_database.empty() ? context.getCurrentDatabase() : command.from_database;
1034 auto from_storage = context.getTable(from_database, command.from_table);
1035 replacePartitionFrom(from_storage, command.partition, command.replace, context);
1036 }
1037 break;
1038
1039 case PartitionCommand::FREEZE_PARTITION:
1040 {
1041 auto lock = lockStructureForShare(false, context.getCurrentQueryId());
1042 freezePartition(command.partition, command.with_name, context, lock);
1043 }
1044 break;
1045
1046 case PartitionCommand::CLEAR_COLUMN:
1047 {
1048 AlterCommand alter_command;
1049 alter_command.type = AlterCommand::DROP_COLUMN;
1050 alter_command.column_name = get<String>(command.column_name);
1051 clearColumnOrIndexInPartition(command.partition, alter_command, context);
1052 }
1053 break;
1054
1055 case PartitionCommand::CLEAR_INDEX:
1056 {
1057 AlterCommand alter_command;
1058 alter_command.type = AlterCommand::DROP_INDEX;
1059 alter_command.index_name = get<String>(command.index_name);
1060 clearColumnOrIndexInPartition(command.partition, alter_command, context);
1061 }
1062 break;
1063
1064 case PartitionCommand::FREEZE_ALL_PARTITIONS:
1065 {
1066 auto lock = lockStructureForShare(false, context.getCurrentQueryId());
1067 freezeAll(command.with_name, context, lock);
1068 }
1069 break;
1070
1071 default:
1072 IStorage::alterPartition(query, commands, context); // should throw an exception.
1073 }
1074 }
1075}
1076
1077void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, const Context & context)
1078{
1079 {
1080 /// Asks to complete merges and does not allow them to start.
1081 /// This protects against "revival" of data for a removed partition after completion of merge.
1082 auto merge_blocker = merger_mutator.merges_blocker.cancel();
1083 /// Waits for completion of merge and does not start new ones.
1084 auto lock = lockExclusively(context.getCurrentQueryId());
1085
1086 String partition_id = getPartitionIDFromQuery(partition, context);
1087
1088 /// TODO: should we include PreComitted parts like in Replicated case?
1089 auto parts_to_remove = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
1090 // TODO should we throw an exception if parts_to_remove is empty?
1091 removePartsFromWorkingSet(parts_to_remove, true);
1092
1093 if (detach)
1094 {
1095 /// If DETACH clone parts to detached/ directory
1096 for (const auto & part : parts_to_remove)
1097 {
1098 LOG_INFO(log, "Detaching " << part->relative_path);
1099 part->makeCloneInDetached("");
1100 }
1101 }
1102
1103 LOG_INFO(log, (detach ? "Detached " : "Removed ") << parts_to_remove.size() << " parts inside partition ID " << partition_id << ".");
1104 }
1105
1106 clearOldPartsFromFilesystem();
1107}
1108
1109
1110void StorageMergeTree::attachPartition(const ASTPtr & partition, bool attach_part, const Context & context)
1111{
1112 // TODO: should get some locks to prevent race with 'alter … modify column'
1113
1114 PartsTemporaryRename renamed_parts(*this, "detached/");
1115 MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, context, renamed_parts);
1116
1117 for (size_t i = 0; i < loaded_parts.size(); ++i)
1118 {
1119 LOG_INFO(log, "Attaching part " << loaded_parts[i]->name << " from " << renamed_parts.old_and_new_names[i].second);
1120 renameTempPartAndAdd(loaded_parts[i], &increment);
1121 renamed_parts.old_and_new_names[i].first.clear();
1122 LOG_INFO(log, "Finished attaching part");
1123 }
1124
1125 /// New parts with other data may appear in place of deleted parts.
1126 context.dropCaches();
1127}
1128
1129void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & context)
1130{
1131 auto lock1 = lockStructureForShare(false, context.getCurrentQueryId());
1132 auto lock2 = source_table->lockStructureForShare(false, context.getCurrentQueryId());
1133
1134 Stopwatch watch;
1135 MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table);
1136 String partition_id = getPartitionIDFromQuery(partition, context);
1137
1138 DataPartsVector src_parts = src_data.getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
1139 MutableDataPartsVector dst_parts;
1140
1141 static const String TMP_PREFIX = "tmp_replace_from_";
1142
1143 for (const DataPartPtr & src_part : src_parts)
1144 {
1145 if (!canReplacePartition(src_part))
1146 throw Exception(
1147 "Cannot replace partition '" + partition_id + "' because part '" + src_part->name + "' has inconsistent granularity with table",
1148 ErrorCodes::LOGICAL_ERROR);
1149
1150 /// This will generate unique name in scope of current server process.
1151 Int64 temp_index = insert_increment.get();
1152 MergeTreePartInfo dst_part_info(partition_id, temp_index, temp_index, src_part->info.level);
1153
1154 std::shared_lock<std::shared_mutex> part_lock(src_part->columns_lock);
1155 dst_parts.emplace_back(cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info));
1156 }
1157
1158 /// ATTACH empty part set
1159 if (!replace && dst_parts.empty())
1160 return;
1161
1162 MergeTreePartInfo drop_range;
1163 if (replace)
1164 {
1165 drop_range.partition_id = partition_id;
1166 drop_range.min_block = 0;
1167 drop_range.max_block = increment.get(); // there will be a "hole" in block numbers
1168 drop_range.level = std::numeric_limits<decltype(drop_range.level)>::max();
1169 }
1170
1171 /// Atomically add new parts and remove old ones
1172 try
1173 {
1174 {
1175 /// Here we use the transaction just like RAII since rare errors in renameTempPartAndReplace() are possible
1176 /// and we should be able to rollback already added (Precomitted) parts
1177 Transaction transaction(*this);
1178
1179 auto data_parts_lock = lockParts();
1180
1181 /// Populate transaction
1182 for (MutableDataPartPtr & part : dst_parts)
1183 renameTempPartAndReplace(part, &increment, &transaction, data_parts_lock);
1184
1185 transaction.commit(&data_parts_lock);
1186
1187 /// If it is REPLACE (not ATTACH), remove all parts which max_block_number less then min_block_number of the first new block
1188 if (replace)
1189 removePartsInRangeFromWorkingSet(drop_range, true, false, data_parts_lock);
1190 }
1191
1192 PartLog::addNewParts(global_context, dst_parts, watch.elapsed());
1193 }
1194 catch (...)
1195 {
1196 PartLog::addNewParts(global_context, dst_parts, watch.elapsed(), ExecutionStatus::fromCurrentException());
1197 throw;
1198 }
1199}
1200
1201
1202ActionLock StorageMergeTree::getActionLock(StorageActionBlockType action_type)
1203{
1204 if (action_type == ActionLocks::PartsMerge)
1205 return merger_mutator.merges_blocker.cancel();
1206 else if (action_type == ActionLocks::PartsTTLMerge)
1207 return merger_mutator.ttl_merges_blocker.cancel();
1208 else if (action_type == ActionLocks::PartsMove)
1209 return parts_mover.moves_blocker.cancel();
1210
1211 return {};
1212}
1213
1214CheckResults StorageMergeTree::checkData(const ASTPtr & query, const Context & context)
1215{
1216 CheckResults results;
1217 DataPartsVector data_parts;
1218 if (const auto & check_query = query->as<ASTCheckQuery &>(); check_query.partition)
1219 {
1220 String partition_id = getPartitionIDFromQuery(check_query.partition, context);
1221 data_parts = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
1222 }
1223 else
1224 data_parts = getDataPartsVector();
1225
1226 for (auto & part : data_parts)
1227 {
1228 String full_part_path = part->getFullPath();
1229 /// If the checksums file is not present, calculate the checksums and write them to disk.
1230 String checksums_path = full_part_path + "checksums.txt";
1231 String tmp_checksums_path = full_part_path + "checksums.txt.tmp";
1232 if (!Poco::File(checksums_path).exists())
1233 {
1234 try
1235 {
1236 auto calculated_checksums = checkDataPart(part, false, primary_key_data_types, skip_indices);
1237 calculated_checksums.checkEqual(part->checksums, true);
1238 WriteBufferFromFile out(tmp_checksums_path, 4096);
1239 part->checksums.write(out);
1240 Poco::File(tmp_checksums_path).renameTo(checksums_path);
1241 results.emplace_back(part->name, true, "Checksums recounted and written to disk.");
1242 }
1243 catch (const Exception & ex)
1244 {
1245 Poco::File tmp_file(tmp_checksums_path);
1246 if (tmp_file.exists())
1247 tmp_file.remove();
1248
1249 results.emplace_back(part->name, false,
1250 "Check of part finished with error: '" + ex.message() + "'");
1251 }
1252 }
1253 else
1254 {
1255 try
1256 {
1257 checkDataPart(part, true, primary_key_data_types, skip_indices);
1258 results.emplace_back(part->name, true, "");
1259 }
1260 catch (const Exception & ex)
1261 {
1262 results.emplace_back(part->name, false, ex.message());
1263 }
1264 }
1265 }
1266 return results;
1267}
1268
1269}
1270