1 | #include "StorageMergeTree.h" |
2 | |
3 | #include <Databases/IDatabase.h> |
4 | #include <Common/escapeForFileName.h> |
5 | #include <Common/typeid_cast.h> |
6 | #include <Common/FieldVisitors.h> |
7 | #include <Common/ThreadPool.h> |
8 | #include <Interpreters/InterpreterAlterQuery.h> |
9 | #include <Interpreters/PartLog.h> |
10 | #include <Parsers/ASTCheckQuery.h> |
11 | #include <Parsers/ASTFunction.h> |
12 | #include <Parsers/ASTLiteral.h> |
13 | #include <Parsers/ASTPartition.h> |
14 | #include <Parsers/ASTSetQuery.h> |
15 | #include <Parsers/queryToString.h> |
16 | #include <Storages/MergeTree/MergeTreeData.h> |
17 | #include <Storages/MergeTree/ActiveDataPartSet.h> |
18 | #include <Storages/AlterCommands.h> |
19 | #include <Storages/PartitionCommands.h> |
20 | #include <Storages/MergeTree/MergeTreeBlockOutputStream.h> |
21 | #include <Disks/DiskSpaceMonitor.h> |
22 | #include <Storages/MergeTree/MergeList.h> |
23 | #include <Storages/MergeTree/checkDataPart.h> |
24 | #include <Poco/DirectoryIterator.h> |
25 | #include <Poco/File.h> |
26 | #include <optional> |
27 | #include <Interpreters/MutationsInterpreter.h> |
28 | #include <Processors/Pipe.h> |
29 | |
30 | |
31 | namespace DB |
32 | { |
33 | |
/// Error codes referenced from this translation unit. Only declared here; the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int ABORTED;
    extern const int BAD_ARGUMENTS;
    extern const int INCORRECT_DATA;
    extern const int INCORRECT_FILE_NAME;
    extern const int CANNOT_ASSIGN_OPTIMIZE;
    extern const int INCOMPATIBLE_COLUMNS;
    extern const int PART_IS_TEMPORARILY_LOCKED;
    extern const int UNKNOWN_SETTING;
    extern const int TOO_BIG_AST;
    /// Both codes below are thrown by code in this file, so they are declared explicitly
    /// instead of relying on a declaration that happens to come from an included header.
    extern const int LOGICAL_ERROR;
    extern const int NOT_ENOUGH_SPACE;
}
46 | |
/// Action block type constants. Only declared here as extern; their definitions are not in this part of the file.
namespace ActionLocks
{
    extern const StorageActionBlockType PartsMerge;
    extern const StorageActionBlockType PartsTTLMerge;
    extern const StorageActionBlockType PartsMove;
}
53 | |
54 | |
55 | StorageMergeTree::StorageMergeTree( |
56 | const String & database_name_, |
57 | const String & table_name_, |
58 | const String & relative_data_path_, |
59 | const StorageInMemoryMetadata & metadata, |
60 | bool attach, |
61 | Context & context_, |
62 | const String & date_column_name, |
63 | const MergingParams & merging_params_, |
64 | std::unique_ptr<MergeTreeSettings> storage_settings_, |
65 | bool has_force_restore_data_flag) |
66 | : MergeTreeData( |
67 | database_name_, |
68 | table_name_, |
69 | relative_data_path_, |
70 | metadata, |
71 | context_, |
72 | date_column_name, |
73 | merging_params_, |
74 | std::move(storage_settings_), |
75 | false, |
76 | attach) |
77 | , reader(*this) |
78 | , writer(*this) |
79 | , merger_mutator(*this, global_context.getBackgroundPool().getNumberOfThreads()) |
80 | { |
81 | loadDataParts(has_force_restore_data_flag); |
82 | |
83 | if (!attach && !getDataParts().empty()) |
84 | throw Exception("Data directory for table already containing data parts - probably it was unclean DROP table or manual intervention. You must either clear directory by hand or use ATTACH TABLE instead of CREATE TABLE if you need to use that parts." , ErrorCodes::INCORRECT_DATA); |
85 | |
86 | increment.set(getMaxBlockNumber()); |
87 | |
88 | loadMutations(); |
89 | } |
90 | |
91 | |
92 | void StorageMergeTree::startup() |
93 | { |
94 | clearOldPartsFromFilesystem(); |
95 | |
96 | /// Temporary directories contain incomplete results of merges (after forced restart) |
97 | /// and don't allow to reinitialize them, so delete each of them immediately |
98 | clearOldTemporaryDirectories(0); |
99 | |
100 | /// NOTE background task will also do the above cleanups periodically. |
101 | time_after_previous_cleanup.restart(); |
102 | merging_mutating_task_handle = global_context.getBackgroundPool().addTask([this] { return mergeMutateTask(); }); |
103 | if (areBackgroundMovesNeeded()) |
104 | moving_task_handle = global_context.getBackgroundMovePool().addTask([this] { return movePartsTask(); }); |
105 | } |
106 | |
107 | |
108 | void StorageMergeTree::shutdown() |
109 | { |
110 | if (shutdown_called) |
111 | return; |
112 | shutdown_called = true; |
113 | merger_mutator.merges_blocker.cancelForever(); |
114 | parts_mover.moves_blocker.cancelForever(); |
115 | |
116 | if (merging_mutating_task_handle) |
117 | global_context.getBackgroundPool().removeTask(merging_mutating_task_handle); |
118 | |
119 | if (moving_task_handle) |
120 | global_context.getBackgroundMovePool().removeTask(moving_task_handle); |
121 | } |
122 | |
123 | |
/// Makes sure background activity is stopped before the object is destroyed.
/// shutdown() returns immediately if it has already been called.
StorageMergeTree::~StorageMergeTree()
{
    shutdown();
}
128 | |
129 | Pipes StorageMergeTree::readWithProcessors( |
130 | const Names & column_names, |
131 | const SelectQueryInfo & query_info, |
132 | const Context & context, |
133 | QueryProcessingStage::Enum /*processed_stage*/, |
134 | const size_t max_block_size, |
135 | const unsigned num_streams) |
136 | { |
137 | return reader.read(column_names, query_info, context, max_block_size, num_streams); |
138 | } |
139 | |
140 | std::optional<UInt64> StorageMergeTree::totalRows() const |
141 | { |
142 | return getTotalActiveSizeInRows(); |
143 | } |
144 | |
145 | BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & /*query*/, const Context & context) |
146 | { |
147 | return std::make_shared<MergeTreeBlockOutputStream>(*this, context.getSettingsRef().max_partitions_per_insert_block); |
148 | } |
149 | |
150 | void StorageMergeTree::checkTableCanBeDropped() const |
151 | { |
152 | const_cast<StorageMergeTree &>(*this).recalculateColumnSizes(); |
153 | global_context.checkTableCanBeDropped(database_name, table_name, getTotalActiveSizeInBytes()); |
154 | } |
155 | |
156 | void StorageMergeTree::checkPartitionCanBeDropped(const ASTPtr & partition) |
157 | { |
158 | const_cast<StorageMergeTree &>(*this).recalculateColumnSizes(); |
159 | |
160 | const String partition_id = getPartitionIDFromQuery(partition, global_context); |
161 | auto parts_to_remove = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id); |
162 | |
163 | UInt64 partition_size = 0; |
164 | |
165 | for (const auto & part : parts_to_remove) |
166 | { |
167 | partition_size += part->bytes_on_disk; |
168 | } |
169 | global_context.checkPartitionCanBeDropped(database_name, table_name, partition_size); |
170 | } |
171 | |
/// Drops the table: stops background activity first, then removes all data.
/// The lock holder argument is not used.
void StorageMergeTree::drop(TableStructureWriteLockHolder &)
{
    /// Background tasks are stopped before the data is removed.
    shutdown();
    dropAllData();
}
177 | |
178 | void StorageMergeTree::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) |
179 | { |
180 | { |
181 | /// Asks to complete merges and does not allow them to start. |
182 | /// This protects against "revival" of data for a removed partition after completion of merge. |
183 | auto merge_blocker = merger_mutator.merges_blocker.cancel(); |
184 | |
185 | /// NOTE: It's assumed that this method is called under lockForAlter. |
186 | |
187 | auto parts_to_remove = getDataPartsVector(); |
188 | removePartsFromWorkingSet(parts_to_remove, true); |
189 | |
190 | LOG_INFO(log, "Removed " << parts_to_remove.size() << " parts." ); |
191 | } |
192 | |
193 | clearOldMutations(true); |
194 | clearOldPartsFromFilesystem(); |
195 | } |
196 | |
197 | |
198 | std::vector<MergeTreeData::AlterDataPartTransactionPtr> StorageMergeTree::prepareAlterTransactions( |
199 | const ColumnsDescription & new_columns, const IndicesDescription & new_indices, const Context & context) |
200 | { |
201 | auto parts = getDataParts({MergeTreeDataPartState::PreCommitted, |
202 | MergeTreeDataPartState::Committed, |
203 | MergeTreeDataPartState::Outdated}); |
204 | std::vector<MergeTreeData::AlterDataPartTransactionPtr> transactions; |
205 | transactions.reserve(parts.size()); |
206 | |
207 | const auto & columns_for_parts = new_columns.getAllPhysical(); |
208 | |
209 | const Settings & settings_ = context.getSettingsRef(); |
210 | size_t thread_pool_size = std::min<size_t>(parts.size(), settings_.max_alter_threads); |
211 | |
212 | std::optional<ThreadPool> thread_pool; |
213 | |
214 | if (thread_pool_size > 1) |
215 | thread_pool.emplace(thread_pool_size); |
216 | |
217 | for (const auto & part : parts) |
218 | { |
219 | transactions.push_back(std::make_unique<MergeTreeData::AlterDataPartTransaction>(part)); |
220 | |
221 | auto job = [this, & transaction = transactions.back(), & columns_for_parts, & new_indices = new_indices.indices] |
222 | { |
223 | this->alterDataPart(columns_for_parts, new_indices, false, transaction); |
224 | }; |
225 | |
226 | if (thread_pool) |
227 | thread_pool->scheduleOrThrowOnError(job); |
228 | else |
229 | job(); |
230 | } |
231 | |
232 | if (thread_pool) |
233 | thread_pool->wait(); |
234 | |
235 | auto erase_pos = std::remove_if(transactions.begin(), transactions.end(), |
236 | [](const MergeTreeData::AlterDataPartTransactionPtr & transaction) |
237 | { |
238 | return !transaction->isValid(); |
239 | }); |
240 | transactions.erase(erase_pos, transactions.end()); |
241 | |
242 | return transactions; |
243 | } |
244 | |
245 | void StorageMergeTree::alter( |
246 | const AlterCommands & params, |
247 | const Context & context, |
248 | TableStructureWriteLockHolder & table_lock_holder) |
249 | { |
250 | const String current_database_name = getDatabaseName(); |
251 | const String current_table_name = getTableName(); |
252 | |
253 | lockNewDataStructureExclusively(table_lock_holder, context.getCurrentQueryId()); |
254 | |
255 | StorageInMemoryMetadata metadata = getInMemoryMetadata(); |
256 | |
257 | params.apply(metadata); |
258 | |
259 | /// Update metdata in memory |
260 | auto update_metadata = [&metadata, &table_lock_holder, this]() |
261 | { |
262 | |
263 | changeSettings(metadata.settings_ast, table_lock_holder); |
264 | /// Reinitialize primary key because primary key column types might have changed. |
265 | setProperties(metadata); |
266 | |
267 | setTTLExpressions(metadata.columns.getColumnTTLs(), metadata.ttl_for_table_ast); |
268 | }; |
269 | |
270 | /// This alter can be performed at metadata level only |
271 | if (!params.isModifyingData()) |
272 | { |
273 | lockStructureExclusively(table_lock_holder, context.getCurrentQueryId()); |
274 | |
275 | context.getDatabase(current_database_name)->alterTable(context, current_table_name, metadata); |
276 | |
277 | update_metadata(); |
278 | } |
279 | else |
280 | { |
281 | |
282 | /// NOTE: Here, as in ReplicatedMergeTree, you can do ALTER which does not block the writing of data for a long time. |
283 | /// Also block moves, because they can replace part with old state. |
284 | auto merge_blocker = merger_mutator.merges_blocker.cancel(); |
285 | auto moves_blocked = parts_mover.moves_blocker.cancel(); |
286 | |
287 | |
288 | auto transactions = prepareAlterTransactions(metadata.columns, metadata.indices, context); |
289 | |
290 | lockStructureExclusively(table_lock_holder, context.getCurrentQueryId()); |
291 | |
292 | context.getDatabase(current_database_name)->alterTable(context, current_table_name, metadata); |
293 | |
294 | update_metadata(); |
295 | |
296 | for (auto & transaction : transactions) |
297 | { |
298 | transaction->commit(); |
299 | transaction.reset(); |
300 | } |
301 | |
302 | /// Columns sizes could be changed |
303 | recalculateColumnSizes(); |
304 | } |
305 | } |
306 | |
307 | |
308 | /// While exists, marks parts as 'currently_merging_mutating_parts' and reserves free space on filesystem. |
309 | struct CurrentlyMergingPartsTagger |
310 | { |
311 | FutureMergedMutatedPart future_part; |
312 | ReservationPtr reserved_space; |
313 | |
314 | bool is_successful = false; |
315 | String exception_message; |
316 | |
317 | StorageMergeTree & storage; |
318 | |
319 | public: |
320 | CurrentlyMergingPartsTagger(FutureMergedMutatedPart & future_part_, size_t total_size, StorageMergeTree & storage_, bool is_mutation) |
321 | : future_part(future_part_), storage(storage_) |
322 | { |
323 | /// Assume mutex is already locked, because this method is called from mergeTask. |
324 | |
325 | /// if we mutate part, than we should reserve space on the same disk, because mutations possible can create hardlinks |
326 | if (is_mutation) |
327 | reserved_space = storage.tryReserveSpace(total_size, future_part_.parts[0]->disk); |
328 | else |
329 | { |
330 | MergeTreeDataPart::TTLInfos ttl_infos; |
331 | for (auto & part_ptr : future_part_.parts) |
332 | ttl_infos.update(part_ptr->ttl_infos); |
333 | |
334 | reserved_space = storage.tryReserveSpacePreferringTTLRules(total_size, ttl_infos, time(nullptr)); |
335 | } |
336 | if (!reserved_space) |
337 | { |
338 | if (is_mutation) |
339 | throw Exception("Not enough space for mutating part '" + future_part_.parts[0]->name + "'" , ErrorCodes::NOT_ENOUGH_SPACE); |
340 | else |
341 | throw Exception("Not enough space for merging parts" , ErrorCodes::NOT_ENOUGH_SPACE); |
342 | } |
343 | |
344 | future_part_.updatePath(storage, reserved_space); |
345 | |
346 | for (const auto & part : future_part.parts) |
347 | { |
348 | if (storage.currently_merging_mutating_parts.count(part)) |
349 | throw Exception("Tagging already tagged part " + part->name + ". This is a bug." , ErrorCodes::LOGICAL_ERROR); |
350 | } |
351 | storage.currently_merging_mutating_parts.insert(future_part.parts.begin(), future_part.parts.end()); |
352 | } |
353 | |
354 | ~CurrentlyMergingPartsTagger() |
355 | { |
356 | std::lock_guard lock(storage.currently_processing_in_background_mutex); |
357 | |
358 | for (const auto & part : future_part.parts) |
359 | { |
360 | if (!storage.currently_merging_mutating_parts.count(part)) |
361 | std::terminate(); |
362 | storage.currently_merging_mutating_parts.erase(part); |
363 | } |
364 | |
365 | /// Update the information about failed parts in the system.mutations table. |
366 | |
367 | Int64 sources_data_version = future_part.parts.at(0)->info.getDataVersion(); |
368 | Int64 result_data_version = future_part.part_info.getDataVersion(); |
369 | auto mutations_begin_it = storage.current_mutations_by_version.end(); |
370 | auto mutations_end_it = storage.current_mutations_by_version.end(); |
371 | if (sources_data_version != result_data_version) |
372 | { |
373 | mutations_begin_it = storage.current_mutations_by_version.upper_bound(sources_data_version); |
374 | mutations_end_it = storage.current_mutations_by_version.upper_bound(result_data_version); |
375 | } |
376 | |
377 | for (auto it = mutations_begin_it; it != mutations_end_it; ++it) |
378 | { |
379 | MergeTreeMutationEntry & entry = it->second; |
380 | if (is_successful) |
381 | { |
382 | if (!entry.latest_failed_part.empty() && future_part.part_info.contains(entry.latest_failed_part_info)) |
383 | { |
384 | entry.latest_failed_part.clear(); |
385 | entry.latest_failed_part_info = MergeTreePartInfo(); |
386 | entry.latest_fail_time = 0; |
387 | entry.latest_fail_reason.clear(); |
388 | } |
389 | } |
390 | else |
391 | { |
392 | entry.latest_failed_part = future_part.parts.at(0)->name; |
393 | entry.latest_failed_part_info = future_part.parts.at(0)->info; |
394 | entry.latest_fail_time = time(nullptr); |
395 | entry.latest_fail_reason = exception_message; |
396 | } |
397 | } |
398 | } |
399 | }; |
400 | |
401 | |
402 | void StorageMergeTree::mutate(const MutationCommands & commands, const Context & query_context) |
403 | { |
404 | /// Choose any disk, because when we load mutations we search them at each disk |
405 | /// where storage can be placed. See loadMutations(). |
406 | auto disk = storage_policy->getAnyDisk(); |
407 | MergeTreeMutationEntry entry(commands, getFullPathOnDisk(disk), insert_increment.get()); |
408 | String file_name; |
409 | Int64 version; |
410 | { |
411 | std::lock_guard lock(currently_processing_in_background_mutex); |
412 | |
413 | version = increment.get(); |
414 | entry.commit(version); |
415 | file_name = entry.file_name; |
416 | auto insertion = current_mutations_by_id.emplace(file_name, std::move(entry)); |
417 | current_mutations_by_version.emplace(version, insertion.first->second); |
418 | } |
419 | |
420 | LOG_INFO(log, "Added mutation: " << file_name); |
421 | merging_mutating_task_handle->wake(); |
422 | |
423 | /// We have to wait mutation end |
424 | if (query_context.getSettingsRef().mutations_sync > 0) |
425 | { |
426 | LOG_INFO(log, "Waiting mutation: " << file_name); |
427 | auto check = [version, this]() { return isMutationDone(version); }; |
428 | std::unique_lock lock(mutation_wait_mutex); |
429 | mutation_wait_event.wait(lock, check); |
430 | } |
431 | } |
432 | |
433 | namespace |
434 | { |
435 | |
436 | struct PartVersionWithName |
437 | { |
438 | Int64 version; |
439 | String name; |
440 | }; |
441 | |
442 | bool comparator(const PartVersionWithName & f, const PartVersionWithName & s) |
443 | { |
444 | return f.version < s.version; |
445 | } |
446 | |
447 | } |
448 | |
449 | bool StorageMergeTree::isMutationDone(Int64 mutation_version) const |
450 | { |
451 | std::lock_guard lock(currently_processing_in_background_mutex); |
452 | |
453 | /// Killed |
454 | if (!current_mutations_by_version.count(mutation_version)) |
455 | return true; |
456 | |
457 | auto data_parts = getDataPartsVector(); |
458 | for (const auto & data_part : data_parts) |
459 | if (data_part->info.getDataVersion() < mutation_version) |
460 | return false; |
461 | return true; |
462 | } |
463 | |
464 | std::vector<MergeTreeMutationStatus> StorageMergeTree::getMutationsStatus() const |
465 | { |
466 | std::lock_guard lock(currently_processing_in_background_mutex); |
467 | |
468 | std::vector<PartVersionWithName> part_versions_with_names; |
469 | auto data_parts = getDataPartsVector(); |
470 | part_versions_with_names.reserve(data_parts.size()); |
471 | for (const auto & part : data_parts) |
472 | part_versions_with_names.emplace_back(PartVersionWithName{part->info.getDataVersion(), part->name}); |
473 | std::sort(part_versions_with_names.begin(), part_versions_with_names.end(), comparator); |
474 | |
475 | std::vector<MergeTreeMutationStatus> result; |
476 | for (const auto & kv : current_mutations_by_version) |
477 | { |
478 | |
479 | Int64 mutation_version = kv.first; |
480 | const MergeTreeMutationEntry & entry = kv.second; |
481 | const PartVersionWithName needle{mutation_version, "" }; |
482 | auto versions_it = std::lower_bound( |
483 | part_versions_with_names.begin(), part_versions_with_names.end(), needle, comparator); |
484 | |
485 | size_t parts_to_do = versions_it - part_versions_with_names.begin(); |
486 | Names parts_to_do_names; |
487 | parts_to_do_names.reserve(parts_to_do); |
488 | for (size_t i = 0; i < parts_to_do; ++i) |
489 | parts_to_do_names.push_back(part_versions_with_names[i].name); |
490 | |
491 | std::map<String, Int64> block_numbers_map({{"" , entry.block_number}}); |
492 | |
493 | for (const MutationCommand & command : entry.commands) |
494 | { |
495 | std::stringstream ss; |
496 | formatAST(*command.ast, ss, false, true); |
497 | result.push_back(MergeTreeMutationStatus |
498 | { |
499 | entry.file_name, |
500 | ss.str(), |
501 | entry.create_time, |
502 | block_numbers_map, |
503 | parts_to_do_names, |
504 | parts_to_do_names.empty(), |
505 | entry.latest_failed_part, |
506 | entry.latest_fail_time, |
507 | entry.latest_fail_reason, |
508 | }); |
509 | } |
510 | } |
511 | |
512 | return result; |
513 | } |
514 | |
515 | CancellationCode StorageMergeTree::killMutation(const String & mutation_id) |
516 | { |
517 | LOG_TRACE(log, "Killing mutation " << mutation_id); |
518 | |
519 | std::optional<MergeTreeMutationEntry> to_kill; |
520 | { |
521 | std::lock_guard lock(currently_processing_in_background_mutex); |
522 | auto it = current_mutations_by_id.find(mutation_id); |
523 | if (it != current_mutations_by_id.end()) |
524 | { |
525 | to_kill.emplace(std::move(it->second)); |
526 | current_mutations_by_id.erase(it); |
527 | current_mutations_by_version.erase(to_kill->block_number); |
528 | } |
529 | } |
530 | |
531 | if (!to_kill) |
532 | return CancellationCode::NotFound; |
533 | |
534 | global_context.getMergeList().cancelPartMutations({}, to_kill->block_number); |
535 | to_kill->removeFile(); |
536 | LOG_TRACE(log, "Cancelled part mutations and removed mutation file " << mutation_id); |
537 | mutation_wait_event.notify_all(); |
538 | |
539 | /// Maybe there is another mutation that was blocked by the killed one. Try to execute it immediately. |
540 | merging_mutating_task_handle->wake(); |
541 | |
542 | return CancellationCode::CancelSent; |
543 | } |
544 | |
545 | |
546 | void StorageMergeTree::loadMutations() |
547 | { |
548 | Poco::DirectoryIterator end; |
549 | const auto full_paths = getDataPaths(); |
550 | for (const String & full_path : full_paths) |
551 | { |
552 | for (auto it = Poco::DirectoryIterator(full_path); it != end; ++it) |
553 | { |
554 | if (startsWith(it.name(), "mutation_" )) |
555 | { |
556 | MergeTreeMutationEntry entry(full_path, it.name()); |
557 | Int64 block_number = entry.block_number; |
558 | auto insertion = current_mutations_by_id.emplace(it.name(), std::move(entry)); |
559 | current_mutations_by_version.emplace(block_number, insertion.first->second); |
560 | } |
561 | else if (startsWith(it.name(), "tmp_mutation_" )) |
562 | { |
563 | it->remove(); |
564 | } |
565 | } |
566 | } |
567 | |
568 | if (!current_mutations_by_version.empty()) |
569 | increment.value = std::max(Int64(increment.value.load()), current_mutations_by_version.rbegin()->first); |
570 | } |
571 | |
572 | |
573 | bool StorageMergeTree::merge( |
574 | bool aggressive, |
575 | const String & partition_id, |
576 | bool final, |
577 | bool deduplicate, |
578 | String * out_disable_reason) |
579 | { |
580 | auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY); |
581 | |
582 | FutureMergedMutatedPart future_part; |
583 | |
584 | /// You must call destructor with unlocked `currently_processing_in_background_mutex`. |
585 | std::optional<CurrentlyMergingPartsTagger> merging_tagger; |
586 | |
587 | { |
588 | std::lock_guard lock(currently_processing_in_background_mutex); |
589 | |
590 | auto can_merge = [this, &lock] (const DataPartPtr & left, const DataPartPtr & right, String *) |
591 | { |
592 | return !currently_merging_mutating_parts.count(left) && !currently_merging_mutating_parts.count(right) |
593 | && getCurrentMutationVersion(left, lock) == getCurrentMutationVersion(right, lock); |
594 | }; |
595 | |
596 | bool selected = false; |
597 | |
598 | if (partition_id.empty()) |
599 | { |
600 | UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge(); |
601 | if (max_source_parts_size > 0) |
602 | selected = merger_mutator.selectPartsToMerge(future_part, aggressive, max_source_parts_size, can_merge, out_disable_reason); |
603 | else if (out_disable_reason) |
604 | *out_disable_reason = "Current value of max_source_parts_size is zero" ; |
605 | } |
606 | else |
607 | { |
608 | UInt64 disk_space = storage_policy->getMaxUnreservedFreeSpace(); |
609 | selected = merger_mutator.selectAllPartsToMergeWithinPartition(future_part, disk_space, can_merge, partition_id, final, out_disable_reason); |
610 | } |
611 | |
612 | if (!selected) |
613 | { |
614 | if (out_disable_reason) |
615 | *out_disable_reason = "Cannot select parts for optimization" ; |
616 | return false; |
617 | } |
618 | |
619 | merging_tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part.parts), *this, false); |
620 | } |
621 | |
622 | MergeList::EntryPtr merge_entry = global_context.getMergeList().insert(database_name, table_name, future_part); |
623 | |
624 | /// Logging |
625 | Stopwatch stopwatch; |
626 | MutableDataPartPtr new_part; |
627 | |
628 | auto write_part_log = [&] (const ExecutionStatus & execution_status) |
629 | { |
630 | writePartLog( |
631 | PartLogElement::MERGE_PARTS, |
632 | execution_status, |
633 | stopwatch.elapsed(), |
634 | future_part.name, |
635 | new_part, |
636 | future_part.parts, |
637 | merge_entry.get()); |
638 | }; |
639 | |
640 | try |
641 | { |
642 | /// Force filter by TTL in 'OPTIMIZE ... FINAL' query to remove expired values from old parts |
643 | /// without TTL infos or with outdated TTL infos, e.g. after 'ALTER ... MODIFY TTL' query. |
644 | bool force_ttl = (final && (hasTableTTL() || hasAnyColumnTTL())); |
645 | |
646 | new_part = merger_mutator.mergePartsToTemporaryPart( |
647 | future_part, *merge_entry, table_lock_holder, time(nullptr), |
648 | merging_tagger->reserved_space, deduplicate, force_ttl); |
649 | merger_mutator.renameMergedTemporaryPart(new_part, future_part.parts, nullptr); |
650 | removeEmptyColumnsFromPart(new_part); |
651 | |
652 | merging_tagger->is_successful = true; |
653 | write_part_log({}); |
654 | } |
655 | catch (...) |
656 | { |
657 | merging_tagger->exception_message = getCurrentExceptionMessage(false); |
658 | write_part_log(ExecutionStatus::fromCurrentException()); |
659 | throw; |
660 | } |
661 | |
662 | return true; |
663 | } |
664 | |
665 | |
666 | bool StorageMergeTree::partIsAssignedToBackgroundOperation(const DataPartPtr & part) const |
667 | { |
668 | std::lock_guard background_processing_lock(currently_processing_in_background_mutex); |
669 | return currently_merging_mutating_parts.count(part); |
670 | } |
671 | |
672 | |
673 | BackgroundProcessingPoolTaskResult StorageMergeTree::movePartsTask() |
674 | { |
675 | try |
676 | { |
677 | if (!selectPartsAndMove()) |
678 | return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO; |
679 | |
680 | return BackgroundProcessingPoolTaskResult::SUCCESS; |
681 | } |
682 | catch (...) |
683 | { |
684 | tryLogCurrentException(log); |
685 | return BackgroundProcessingPoolTaskResult::ERROR; |
686 | } |
687 | } |
688 | |
689 | |
690 | bool StorageMergeTree::tryMutatePart() |
691 | { |
692 | auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY); |
693 | size_t max_ast_elements = global_context.getSettingsRef().max_expanded_ast_elements; |
694 | |
695 | FutureMergedMutatedPart future_part; |
696 | MutationCommands commands; |
697 | /// You must call destructor with unlocked `currently_processing_in_background_mutex`. |
698 | std::optional<CurrentlyMergingPartsTagger> tagger; |
699 | { |
700 | /// DataPart can be store only at one disk. Get Max of free space at all disks |
701 | UInt64 disk_space = storage_policy->getMaxUnreservedFreeSpace(); |
702 | |
703 | std::lock_guard lock(currently_processing_in_background_mutex); |
704 | |
705 | if (current_mutations_by_version.empty()) |
706 | return false; |
707 | |
708 | auto mutations_end_it = current_mutations_by_version.end(); |
709 | for (const auto & part : getDataPartsVector()) |
710 | { |
711 | if (currently_merging_mutating_parts.count(part)) |
712 | continue; |
713 | |
714 | auto mutations_begin_it = current_mutations_by_version.upper_bound(part->info.getDataVersion()); |
715 | if (mutations_begin_it == mutations_end_it) |
716 | continue; |
717 | |
718 | if (merger_mutator.getMaxSourcePartSizeForMutation() > disk_space) |
719 | continue; |
720 | |
721 | size_t current_ast_elements = 0; |
722 | for (auto it = mutations_begin_it; it != mutations_end_it; ++it) |
723 | { |
724 | MutationsInterpreter interpreter(shared_from_this(), it->second.commands, global_context, false); |
725 | |
726 | size_t commands_size = interpreter.evaluateCommandsSize(); |
727 | if (current_ast_elements + commands_size >= max_ast_elements) |
728 | break; |
729 | |
730 | current_ast_elements += commands_size; |
731 | commands.insert(commands.end(), it->second.commands.begin(), it->second.commands.end()); |
732 | } |
733 | |
734 | auto new_part_info = part->info; |
735 | new_part_info.mutation = current_mutations_by_version.rbegin()->first; |
736 | |
737 | future_part.parts.push_back(part); |
738 | future_part.part_info = new_part_info; |
739 | future_part.name = part->getNewName(new_part_info); |
740 | |
741 | tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, true); |
742 | break; |
743 | } |
744 | } |
745 | |
746 | if (!tagger) |
747 | return false; |
748 | |
749 | MergeList::EntryPtr merge_entry = global_context.getMergeList().insert(database_name, table_name, future_part); |
750 | |
751 | Stopwatch stopwatch; |
752 | MutableDataPartPtr new_part; |
753 | |
754 | auto write_part_log = [&] (const ExecutionStatus & execution_status) |
755 | { |
756 | writePartLog( |
757 | PartLogElement::MUTATE_PART, |
758 | execution_status, |
759 | stopwatch.elapsed(), |
760 | future_part.name, |
761 | new_part, |
762 | future_part.parts, |
763 | merge_entry.get()); |
764 | }; |
765 | |
766 | try |
767 | { |
768 | new_part = merger_mutator.mutatePartToTemporaryPart(future_part, commands, *merge_entry, global_context, |
769 | tagger->reserved_space, table_lock_holder); |
770 | |
771 | renameTempPartAndReplace(new_part); |
772 | tagger->is_successful = true; |
773 | write_part_log({}); |
774 | |
775 | /// Notify all, who wait for this or previous mutations |
776 | mutation_wait_event.notify_all(); |
777 | } |
778 | catch (...) |
779 | { |
780 | tagger->exception_message = getCurrentExceptionMessage(false); |
781 | write_part_log(ExecutionStatus::fromCurrentException()); |
782 | throw; |
783 | } |
784 | |
785 | return true; |
786 | } |
787 | |
788 | |
789 | BackgroundProcessingPoolTaskResult StorageMergeTree::mergeMutateTask() |
790 | { |
791 | if (shutdown_called) |
792 | return BackgroundProcessingPoolTaskResult::ERROR; |
793 | |
794 | if (merger_mutator.merges_blocker.isCancelled()) |
795 | return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO; |
796 | |
797 | try |
798 | { |
799 | /// Clear old parts. It is unnecessary to do it more than once a second. |
800 | if (auto lock = time_after_previous_cleanup.compareAndRestartDeferred(1)) |
801 | { |
802 | { |
803 | /// TODO: Implement tryLockStructureForShare. |
804 | auto lock_structure = lockStructureForShare(false, "" ); |
805 | clearOldPartsFromFilesystem(); |
806 | clearOldTemporaryDirectories(); |
807 | } |
808 | clearOldMutations(); |
809 | } |
810 | |
811 | ///TODO: read deduplicate option from table config |
812 | if (merge(false /*aggressive*/, {} /*partition_id*/, false /*final*/, false /*deduplicate*/)) |
813 | return BackgroundProcessingPoolTaskResult::SUCCESS; |
814 | |
815 | |
816 | if (tryMutatePart()) |
817 | return BackgroundProcessingPoolTaskResult::SUCCESS; |
818 | |
819 | return BackgroundProcessingPoolTaskResult::ERROR; |
820 | } |
821 | catch (const Exception & e) |
822 | { |
823 | if (e.code() == ErrorCodes::ABORTED) |
824 | { |
825 | LOG_INFO(log, e.message()); |
826 | return BackgroundProcessingPoolTaskResult::ERROR; |
827 | } |
828 | |
829 | throw; |
830 | } |
831 | } |
832 | |
833 | Int64 StorageMergeTree::getCurrentMutationVersion( |
834 | const DataPartPtr & part, |
835 | std::lock_guard<std::mutex> & /* currently_processing_in_background_mutex_lock */) const |
836 | { |
837 | auto it = current_mutations_by_version.upper_bound(part->info.getDataVersion()); |
838 | if (it == current_mutations_by_version.begin()) |
839 | return 0; |
840 | --it; |
841 | return it->first; |
842 | } |
843 | |
844 | void StorageMergeTree::clearOldMutations(bool truncate) |
845 | { |
846 | const auto settings = getSettings(); |
847 | if (!truncate && !settings->finished_mutations_to_keep) |
848 | return; |
849 | |
850 | std::vector<MergeTreeMutationEntry> mutations_to_delete; |
851 | { |
852 | std::lock_guard lock(currently_processing_in_background_mutex); |
853 | |
854 | if (!truncate && current_mutations_by_version.size() <= settings->finished_mutations_to_keep) |
855 | return; |
856 | |
857 | auto end_it = current_mutations_by_version.end(); |
858 | auto begin_it = current_mutations_by_version.begin(); |
859 | size_t to_delete_count = std::distance(begin_it, end_it); |
860 | |
861 | if (!truncate) |
862 | { |
863 | if (std::optional<Int64> min_version = getMinPartDataVersion()) |
864 | end_it = current_mutations_by_version.upper_bound(*min_version); |
865 | |
866 | size_t done_count = std::distance(begin_it, end_it); |
867 | if (done_count <= settings->finished_mutations_to_keep) |
868 | return; |
869 | |
870 | to_delete_count = done_count - settings->finished_mutations_to_keep; |
871 | } |
872 | |
873 | auto it = begin_it; |
874 | for (size_t i = 0; i < to_delete_count; ++i) |
875 | { |
876 | mutations_to_delete.push_back(std::move(it->second)); |
877 | current_mutations_by_id.erase(mutations_to_delete.back().file_name); |
878 | it = current_mutations_by_version.erase(it); |
879 | } |
880 | } |
881 | |
882 | for (auto & mutation : mutations_to_delete) |
883 | { |
884 | LOG_TRACE(log, "Removing mutation: " << mutation.file_name); |
885 | mutation.removeFile(); |
886 | } |
887 | } |
888 | |
889 | |
890 | void StorageMergeTree::clearColumnOrIndexInPartition(const ASTPtr & partition, const AlterCommand & alter_command, const Context & context) |
891 | { |
892 | /// Asks to complete merges and moves and does not allow them to start. |
893 | /// This protects against "revival" of data for a removed partition after completion of merge. |
894 | auto merge_blocker = merger_mutator.merges_blocker.cancel(); |
895 | auto move_blocker = parts_mover.moves_blocker.cancel(); |
896 | |
897 | /// We don't change table structure, only data in some parts, parts are locked inside alterDataPart() function |
898 | auto lock_read_structure = lockStructureForShare(false, context.getCurrentQueryId()); |
899 | |
900 | String partition_id = getPartitionIDFromQuery(partition, context); |
901 | auto parts = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id); |
902 | |
903 | std::vector<AlterDataPartTransactionPtr> transactions; |
904 | |
905 | |
906 | StorageInMemoryMetadata metadata = getInMemoryMetadata(); |
907 | alter_command.apply(metadata); |
908 | |
909 | auto columns_for_parts = metadata.columns.getAllPhysical(); |
910 | for (const auto & part : parts) |
911 | { |
912 | if (part->info.partition_id != partition_id) |
913 | throw Exception("Unexpected partition ID " + part->info.partition_id + ". This is a bug." , ErrorCodes::LOGICAL_ERROR); |
914 | |
915 | MergeTreeData::AlterDataPartTransactionPtr transaction(new MergeTreeData::AlterDataPartTransaction(part)); |
916 | alterDataPart(columns_for_parts, metadata.indices.indices, false, transaction); |
917 | if (transaction->isValid()) |
918 | transactions.push_back(std::move(transaction)); |
919 | |
920 | if (alter_command.type == AlterCommand::DROP_COLUMN) |
921 | LOG_DEBUG(log, "Removing column " << alter_command.column_name << " from part " << part->name); |
922 | else if (alter_command.type == AlterCommand::DROP_INDEX) |
923 | LOG_DEBUG(log, "Removing index " << alter_command.index_name << " from part " << part->name); |
924 | } |
925 | |
926 | if (transactions.empty()) |
927 | return; |
928 | |
929 | for (auto & transaction : transactions) |
930 | { |
931 | transaction->commit(); |
932 | transaction.reset(); |
933 | } |
934 | |
935 | /// Recalculate columns size (not only for the modified column) |
936 | recalculateColumnSizes(); |
937 | } |
938 | |
939 | |
940 | bool StorageMergeTree::optimize( |
941 | const ASTPtr & /*query*/, const ASTPtr & partition, bool final, bool deduplicate, const Context & context) |
942 | { |
943 | String disable_reason; |
944 | if (!partition && final) |
945 | { |
946 | DataPartsVector data_parts = getDataPartsVector(); |
947 | std::unordered_set<String> partition_ids; |
948 | |
949 | for (const DataPartPtr & part : data_parts) |
950 | partition_ids.emplace(part->info.partition_id); |
951 | |
952 | for (const String & partition_id : partition_ids) |
953 | { |
954 | if (!merge(true, partition_id, true, deduplicate, &disable_reason)) |
955 | { |
956 | std::stringstream message; |
957 | message << "Cannot OPTIMIZE table" ; |
958 | if (!disable_reason.empty()) |
959 | message << ": " << disable_reason; |
960 | else |
961 | message << " by some reason." ; |
962 | LOG_INFO(log, message.rdbuf()); |
963 | |
964 | if (context.getSettingsRef().optimize_throw_if_noop) |
965 | throw Exception(message.str(), ErrorCodes::CANNOT_ASSIGN_OPTIMIZE); |
966 | return false; |
967 | } |
968 | } |
969 | } |
970 | else |
971 | { |
972 | String partition_id; |
973 | if (partition) |
974 | partition_id = getPartitionIDFromQuery(partition, context); |
975 | |
976 | if (!merge(true, partition_id, final, deduplicate, &disable_reason)) |
977 | { |
978 | std::stringstream message; |
979 | message << "Cannot OPTIMIZE table" ; |
980 | if (!disable_reason.empty()) |
981 | message << ": " << disable_reason; |
982 | else |
983 | message << " by some reason." ; |
984 | LOG_INFO(log, message.rdbuf()); |
985 | |
986 | if (context.getSettingsRef().optimize_throw_if_noop) |
987 | throw Exception(message.str(), ErrorCodes::CANNOT_ASSIGN_OPTIMIZE); |
988 | return false; |
989 | } |
990 | } |
991 | |
992 | return true; |
993 | } |
994 | |
995 | void StorageMergeTree::alterPartition(const ASTPtr & query, const PartitionCommands & commands, const Context & context) |
996 | { |
997 | for (const PartitionCommand & command : commands) |
998 | { |
999 | switch (command.type) |
1000 | { |
1001 | case PartitionCommand::DROP_PARTITION: |
1002 | checkPartitionCanBeDropped(command.partition); |
1003 | dropPartition(command.partition, command.detach, context); |
1004 | break; |
1005 | |
1006 | case PartitionCommand::DROP_DETACHED_PARTITION: |
1007 | dropDetached(command.partition, command.part, context); |
1008 | break; |
1009 | |
1010 | case PartitionCommand::ATTACH_PARTITION: |
1011 | attachPartition(command.partition, command.part, context); |
1012 | break; |
1013 | |
1014 | case PartitionCommand::MOVE_PARTITION: |
1015 | { |
1016 | switch (command.move_destination_type) |
1017 | { |
1018 | case PartitionCommand::MoveDestinationType::DISK: |
1019 | movePartitionToDisk(command.partition, command.move_destination_name, command.part, context); |
1020 | break; |
1021 | |
1022 | case PartitionCommand::MoveDestinationType::VOLUME: |
1023 | movePartitionToVolume(command.partition, command.move_destination_name, command.part, context); |
1024 | break; |
1025 | } |
1026 | |
1027 | } |
1028 | break; |
1029 | |
1030 | case PartitionCommand::REPLACE_PARTITION: |
1031 | { |
1032 | checkPartitionCanBeDropped(command.partition); |
1033 | String from_database = command.from_database.empty() ? context.getCurrentDatabase() : command.from_database; |
1034 | auto from_storage = context.getTable(from_database, command.from_table); |
1035 | replacePartitionFrom(from_storage, command.partition, command.replace, context); |
1036 | } |
1037 | break; |
1038 | |
1039 | case PartitionCommand::FREEZE_PARTITION: |
1040 | { |
1041 | auto lock = lockStructureForShare(false, context.getCurrentQueryId()); |
1042 | freezePartition(command.partition, command.with_name, context, lock); |
1043 | } |
1044 | break; |
1045 | |
1046 | case PartitionCommand::CLEAR_COLUMN: |
1047 | { |
1048 | AlterCommand alter_command; |
1049 | alter_command.type = AlterCommand::DROP_COLUMN; |
1050 | alter_command.column_name = get<String>(command.column_name); |
1051 | clearColumnOrIndexInPartition(command.partition, alter_command, context); |
1052 | } |
1053 | break; |
1054 | |
1055 | case PartitionCommand::CLEAR_INDEX: |
1056 | { |
1057 | AlterCommand alter_command; |
1058 | alter_command.type = AlterCommand::DROP_INDEX; |
1059 | alter_command.index_name = get<String>(command.index_name); |
1060 | clearColumnOrIndexInPartition(command.partition, alter_command, context); |
1061 | } |
1062 | break; |
1063 | |
1064 | case PartitionCommand::FREEZE_ALL_PARTITIONS: |
1065 | { |
1066 | auto lock = lockStructureForShare(false, context.getCurrentQueryId()); |
1067 | freezeAll(command.with_name, context, lock); |
1068 | } |
1069 | break; |
1070 | |
1071 | default: |
1072 | IStorage::alterPartition(query, commands, context); // should throw an exception. |
1073 | } |
1074 | } |
1075 | } |
1076 | |
1077 | void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, const Context & context) |
1078 | { |
1079 | { |
1080 | /// Asks to complete merges and does not allow them to start. |
1081 | /// This protects against "revival" of data for a removed partition after completion of merge. |
1082 | auto merge_blocker = merger_mutator.merges_blocker.cancel(); |
1083 | /// Waits for completion of merge and does not start new ones. |
1084 | auto lock = lockExclusively(context.getCurrentQueryId()); |
1085 | |
1086 | String partition_id = getPartitionIDFromQuery(partition, context); |
1087 | |
1088 | /// TODO: should we include PreComitted parts like in Replicated case? |
1089 | auto parts_to_remove = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id); |
1090 | // TODO should we throw an exception if parts_to_remove is empty? |
1091 | removePartsFromWorkingSet(parts_to_remove, true); |
1092 | |
1093 | if (detach) |
1094 | { |
1095 | /// If DETACH clone parts to detached/ directory |
1096 | for (const auto & part : parts_to_remove) |
1097 | { |
1098 | LOG_INFO(log, "Detaching " << part->relative_path); |
1099 | part->makeCloneInDetached("" ); |
1100 | } |
1101 | } |
1102 | |
1103 | LOG_INFO(log, (detach ? "Detached " : "Removed " ) << parts_to_remove.size() << " parts inside partition ID " << partition_id << "." ); |
1104 | } |
1105 | |
1106 | clearOldPartsFromFilesystem(); |
1107 | } |
1108 | |
1109 | |
1110 | void StorageMergeTree::attachPartition(const ASTPtr & partition, bool attach_part, const Context & context) |
1111 | { |
1112 | // TODO: should get some locks to prevent race with 'alter … modify column' |
1113 | |
1114 | PartsTemporaryRename renamed_parts(*this, "detached/" ); |
1115 | MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, context, renamed_parts); |
1116 | |
1117 | for (size_t i = 0; i < loaded_parts.size(); ++i) |
1118 | { |
1119 | LOG_INFO(log, "Attaching part " << loaded_parts[i]->name << " from " << renamed_parts.old_and_new_names[i].second); |
1120 | renameTempPartAndAdd(loaded_parts[i], &increment); |
1121 | renamed_parts.old_and_new_names[i].first.clear(); |
1122 | LOG_INFO(log, "Finished attaching part" ); |
1123 | } |
1124 | |
1125 | /// New parts with other data may appear in place of deleted parts. |
1126 | context.dropCaches(); |
1127 | } |
1128 | |
1129 | void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & context) |
1130 | { |
1131 | auto lock1 = lockStructureForShare(false, context.getCurrentQueryId()); |
1132 | auto lock2 = source_table->lockStructureForShare(false, context.getCurrentQueryId()); |
1133 | |
1134 | Stopwatch watch; |
1135 | MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table); |
1136 | String partition_id = getPartitionIDFromQuery(partition, context); |
1137 | |
1138 | DataPartsVector src_parts = src_data.getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id); |
1139 | MutableDataPartsVector dst_parts; |
1140 | |
1141 | static const String TMP_PREFIX = "tmp_replace_from_" ; |
1142 | |
1143 | for (const DataPartPtr & src_part : src_parts) |
1144 | { |
1145 | if (!canReplacePartition(src_part)) |
1146 | throw Exception( |
1147 | "Cannot replace partition '" + partition_id + "' because part '" + src_part->name + "' has inconsistent granularity with table" , |
1148 | ErrorCodes::LOGICAL_ERROR); |
1149 | |
1150 | /// This will generate unique name in scope of current server process. |
1151 | Int64 temp_index = insert_increment.get(); |
1152 | MergeTreePartInfo dst_part_info(partition_id, temp_index, temp_index, src_part->info.level); |
1153 | |
1154 | std::shared_lock<std::shared_mutex> part_lock(src_part->columns_lock); |
1155 | dst_parts.emplace_back(cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info)); |
1156 | } |
1157 | |
1158 | /// ATTACH empty part set |
1159 | if (!replace && dst_parts.empty()) |
1160 | return; |
1161 | |
1162 | MergeTreePartInfo drop_range; |
1163 | if (replace) |
1164 | { |
1165 | drop_range.partition_id = partition_id; |
1166 | drop_range.min_block = 0; |
1167 | drop_range.max_block = increment.get(); // there will be a "hole" in block numbers |
1168 | drop_range.level = std::numeric_limits<decltype(drop_range.level)>::max(); |
1169 | } |
1170 | |
1171 | /// Atomically add new parts and remove old ones |
1172 | try |
1173 | { |
1174 | { |
1175 | /// Here we use the transaction just like RAII since rare errors in renameTempPartAndReplace() are possible |
1176 | /// and we should be able to rollback already added (Precomitted) parts |
1177 | Transaction transaction(*this); |
1178 | |
1179 | auto data_parts_lock = lockParts(); |
1180 | |
1181 | /// Populate transaction |
1182 | for (MutableDataPartPtr & part : dst_parts) |
1183 | renameTempPartAndReplace(part, &increment, &transaction, data_parts_lock); |
1184 | |
1185 | transaction.commit(&data_parts_lock); |
1186 | |
1187 | /// If it is REPLACE (not ATTACH), remove all parts which max_block_number less then min_block_number of the first new block |
1188 | if (replace) |
1189 | removePartsInRangeFromWorkingSet(drop_range, true, false, data_parts_lock); |
1190 | } |
1191 | |
1192 | PartLog::addNewParts(global_context, dst_parts, watch.elapsed()); |
1193 | } |
1194 | catch (...) |
1195 | { |
1196 | PartLog::addNewParts(global_context, dst_parts, watch.elapsed(), ExecutionStatus::fromCurrentException()); |
1197 | throw; |
1198 | } |
1199 | } |
1200 | |
1201 | |
1202 | ActionLock StorageMergeTree::getActionLock(StorageActionBlockType action_type) |
1203 | { |
1204 | if (action_type == ActionLocks::PartsMerge) |
1205 | return merger_mutator.merges_blocker.cancel(); |
1206 | else if (action_type == ActionLocks::PartsTTLMerge) |
1207 | return merger_mutator.ttl_merges_blocker.cancel(); |
1208 | else if (action_type == ActionLocks::PartsMove) |
1209 | return parts_mover.moves_blocker.cancel(); |
1210 | |
1211 | return {}; |
1212 | } |
1213 | |
1214 | CheckResults StorageMergeTree::checkData(const ASTPtr & query, const Context & context) |
1215 | { |
1216 | CheckResults results; |
1217 | DataPartsVector data_parts; |
1218 | if (const auto & check_query = query->as<ASTCheckQuery &>(); check_query.partition) |
1219 | { |
1220 | String partition_id = getPartitionIDFromQuery(check_query.partition, context); |
1221 | data_parts = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id); |
1222 | } |
1223 | else |
1224 | data_parts = getDataPartsVector(); |
1225 | |
1226 | for (auto & part : data_parts) |
1227 | { |
1228 | String full_part_path = part->getFullPath(); |
1229 | /// If the checksums file is not present, calculate the checksums and write them to disk. |
1230 | String checksums_path = full_part_path + "checksums.txt" ; |
1231 | String tmp_checksums_path = full_part_path + "checksums.txt.tmp" ; |
1232 | if (!Poco::File(checksums_path).exists()) |
1233 | { |
1234 | try |
1235 | { |
1236 | auto calculated_checksums = checkDataPart(part, false, primary_key_data_types, skip_indices); |
1237 | calculated_checksums.checkEqual(part->checksums, true); |
1238 | WriteBufferFromFile out(tmp_checksums_path, 4096); |
1239 | part->checksums.write(out); |
1240 | Poco::File(tmp_checksums_path).renameTo(checksums_path); |
1241 | results.emplace_back(part->name, true, "Checksums recounted and written to disk." ); |
1242 | } |
1243 | catch (const Exception & ex) |
1244 | { |
1245 | Poco::File tmp_file(tmp_checksums_path); |
1246 | if (tmp_file.exists()) |
1247 | tmp_file.remove(); |
1248 | |
1249 | results.emplace_back(part->name, false, |
1250 | "Check of part finished with error: '" + ex.message() + "'" ); |
1251 | } |
1252 | } |
1253 | else |
1254 | { |
1255 | try |
1256 | { |
1257 | checkDataPart(part, true, primary_key_data_types, skip_indices); |
1258 | results.emplace_back(part->name, true, "" ); |
1259 | } |
1260 | catch (const Exception & ex) |
1261 | { |
1262 | results.emplace_back(part->name, false, ex.message()); |
1263 | } |
1264 | } |
1265 | } |
1266 | return results; |
1267 | } |
1268 | |
1269 | } |
1270 | |