#include "MergeTreeDataMergerMutator.h"

#include <Storages/MergeTree/MergeTreeSequentialBlockInputStream.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/MergedColumnOnlyOutputStream.h>
#include <Disks/DiskSpaceMonitor.h>
#include <Storages/MergeTree/SimpleMergeSelector.h>
#include <Storages/MergeTree/AllMergeSelector.h>
#include <Storages/MergeTree/TTLMergeSelector.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/StorageFromMergeTreeDataPart.h>
#include <DataStreams/TTLBlockInputStream.h>
#include <DataStreams/DistinctSortedBlockInputStream.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/MergingSortedBlockInputStream.h>
#include <DataStreams/CollapsingSortedBlockInputStream.h>
#include <DataStreams/SummingSortedBlockInputStream.h>
#include <DataStreams/ReplacingSortedBlockInputStream.h>
#include <DataStreams/GraphiteRollupSortedBlockInputStream.h>
#include <DataStreams/AggregatingSortedBlockInputStream.h>
#include <DataStreams/VersionedCollapsingSortedBlockInputStream.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/ConcatBlockInputStream.h>
#include <DataStreams/ColumnGathererStream.h>
#include <Interpreters/MutationsInterpreter.h>
#include <Common/SimpleIncrement.h>
#include <Common/interpolate.h>
#include <Common/typeid_cast.h>
#include <Common/createHardLink.h>
#include <Poco/File.h>
#include <Poco/DirectoryIterator.h>
#include <cmath>
#include <numeric>
#include <iomanip>

namespace ProfileEvents
{
    extern const Event MergedRows;
    extern const Event MergedUncompressedBytes;
    extern const Event MergesTimeMilliseconds;
    extern const Event Merge;
}

namespace CurrentMetrics
{
    extern const Metric BackgroundPoolTask;
    extern const Metric PartMutation;
}

namespace DB
{

namespace ErrorCodes
{
    extern const int ABORTED;
}


using MergeAlgorithm = MergeTreeDataMergerMutator::MergeAlgorithm;

/// Do not start to merge parts if free space is less than the sum of part sizes times the specified coefficient.
/// This value is chosen so that big merges cannot eat up all free space, thus allowing small merges to proceed.
static const double DISK_USAGE_COEFFICIENT_TO_SELECT = 2;

/// To do a merge, reserve an amount of space equal to the sum of part sizes times the specified coefficient.
/// Must be strictly less than DISK_USAGE_COEFFICIENT_TO_SELECT,
/// because between selecting parts to merge and doing the merge, the amount of free space could have decreased.
static const double DISK_USAGE_COEFFICIENT_TO_RESERVE = 1.1;
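/// Illustrative example (hypothetical numbers): to merge parts totalling 10 GiB, at least
/// 20 GiB of unreserved free space must be available for the merge to be selected,
/// while only 11 GiB are actually reserved for performing it.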


void FutureMergedMutatedPart::assign(MergeTreeData::DataPartsVector parts_)
{
    if (parts_.empty())
        return;

    for (const MergeTreeData::DataPartPtr & part : parts_)
    {
        const MergeTreeData::DataPartPtr & first_part = parts_.front();

        if (part->partition.value != first_part->partition.value)
            throw Exception(
                "Attempting to merge parts " + first_part->name + " and " + part->name + " that are in different partitions",
                ErrorCodes::LOGICAL_ERROR);
    }

    parts = std::move(parts_);

    UInt32 max_level = 0;
    Int64 max_mutation = 0;
    for (const auto & part : parts)
    {
        max_level = std::max(max_level, part->info.level);
        max_mutation = std::max(max_mutation, part->info.mutation);
    }

    part_info.partition_id = parts.front()->info.partition_id;
    part_info.min_block = parts.front()->info.min_block;
    part_info.max_block = parts.back()->info.max_block;
    part_info.level = max_level + 1;
    part_info.mutation = max_mutation;

    if (parts.front()->storage.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
    {
        DayNum min_date = DayNum(std::numeric_limits<UInt16>::max());
        DayNum max_date = DayNum(std::numeric_limits<UInt16>::min());
        for (const auto & part : parts)
        {
            /// NOTE: we get min and max dates from part names (instead of part data) because we want
            /// the merged part name to be determined only by the source part names.
            /// It is simpler this way when the real min and max dates for the block range can change
            /// (e.g. after an ALTER DELETE command).
            DayNum part_min_date;
            DayNum part_max_date;
            MergeTreePartInfo::parseMinMaxDatesFromPartName(part->name, part_min_date, part_max_date);
            min_date = std::min(min_date, part_min_date);
            max_date = std::max(max_date, part_max_date);
        }

        name = part_info.getPartNameV0(min_date, max_date);
    }
    else
        name = part_info.getPartName();
}

void FutureMergedMutatedPart::updatePath(const MergeTreeData & storage, const ReservationPtr & reservation)
{
    path = storage.getFullPathOnDisk(reservation->getDisk()) + name + "/";
}

MergeTreeDataMergerMutator::MergeTreeDataMergerMutator(MergeTreeData & data_, size_t background_pool_size_)
    : data(data_), background_pool_size(background_pool_size_), log(&Logger::get(data.getLogName() + " (MergerMutator)"))
{
}


UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge()
{
    size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed);

    return getMaxSourcePartsSizeForMerge(background_pool_size, busy_threads_in_pool == 0 ? 0 : busy_threads_in_pool - 1); /// 1 is the current thread

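/// Maximum total size of source parts that a single merge may take, given the background pool state.
/// When at least 'number_of_free_entries_in_pool_to_lower_max_size_of_merge' entries are free, the full
/// 'max_bytes_to_merge_at_max_space_in_pool' is allowed; otherwise the limit decays exponentially towards
/// 'max_bytes_to_merge_at_min_space_in_pool' as the pool fills up, and it is additionally capped by the
/// unreserved free disk space divided by DISK_USAGE_COEFFICIENT_TO_SELECT.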
UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(size_t pool_size, size_t pool_used)
{
    if (pool_used > pool_size)
        throw Exception("Logical error: invalid arguments passed to getMaxSourcePartsSize: pool_used > pool_size", ErrorCodes::LOGICAL_ERROR);

    size_t free_entries = pool_size - pool_used;
    const auto data_settings = data.getSettings();

    UInt64 max_size = 0;
    if (free_entries >= data_settings->number_of_free_entries_in_pool_to_lower_max_size_of_merge)
        max_size = data_settings->max_bytes_to_merge_at_max_space_in_pool;
    else
        max_size = interpolateExponential(
            data_settings->max_bytes_to_merge_at_min_space_in_pool,
            data_settings->max_bytes_to_merge_at_max_space_in_pool,
            static_cast<double>(free_entries) / data_settings->number_of_free_entries_in_pool_to_lower_max_size_of_merge);

    return std::min(max_size, static_cast<UInt64>(data.storage_policy->getMaxUnreservedFreeSpace() / DISK_USAGE_COEFFICIENT_TO_SELECT));
}


UInt64 MergeTreeDataMergerMutator::getMaxSourcePartSizeForMutation()
{
    const auto data_settings = data.getSettings();
    size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed);

    /// Allow mutations only if there are enough free threads; otherwise leave the free threads for merges.
    if (background_pool_size - busy_threads_in_pool >= data_settings->number_of_free_entries_in_pool_to_execute_mutation)
        return static_cast<UInt64>(data.storage_policy->getMaxUnreservedFreeSpace() / DISK_USAGE_COEFFICIENT_TO_RESERVE);

    return 0;
}


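/// Selects parts to merge automatically: walks over all active parts (they are sorted by partition and
/// block range), splits them into per-partition ranges of parts that the predicate allows to merge,
/// and lets a merge selector (TTL-based or SimpleMergeSelector) pick one range from those candidates.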
bool MergeTreeDataMergerMutator::selectPartsToMerge(
    FutureMergedMutatedPart & future_part,
    bool aggressive,
    size_t max_total_size_to_merge,
    const AllowedMergingPredicate & can_merge_callback,
    String * out_disable_reason)
{
    MergeTreeData::DataPartsVector data_parts = data.getDataPartsVector();
    const auto data_settings = data.getSettings();

    if (data_parts.empty())
    {
        if (out_disable_reason)
            *out_disable_reason = "There are no parts in the table";
        return false;
    }

    time_t current_time = time(nullptr);

    IMergeSelector::Partitions partitions;

    const String * prev_partition_id = nullptr;
    const MergeTreeData::DataPartPtr * prev_part = nullptr;
    bool has_part_with_expired_ttl = false;
    for (const MergeTreeData::DataPartPtr & part : data_parts)
    {
        const String & partition_id = part->info.partition_id;
        if (!prev_partition_id || partition_id != *prev_partition_id || (prev_part && !can_merge_callback(*prev_part, part, nullptr)))
        {
            if (partitions.empty() || !partitions.back().empty())
                partitions.emplace_back();
            prev_partition_id = &partition_id;
        }

        IMergeSelector::Part part_info;
        part_info.size = part->bytes_on_disk;
        part_info.age = current_time - part->modification_time;
        part_info.level = part->info.level;
        part_info.data = &part;
        part_info.min_ttl = part->ttl_infos.part_min_ttl;
        part_info.max_ttl = part->ttl_infos.part_max_ttl;

        time_t ttl = data_settings->ttl_only_drop_parts ? part_info.max_ttl : part_info.min_ttl;

        if (ttl && ttl <= current_time)
            has_part_with_expired_ttl = true;

        partitions.back().emplace_back(part_info);

        /// Check consistency of data parts. If this assertion fails, it requires immediate investigation.
        if (prev_part && part->info.partition_id == (*prev_part)->info.partition_id
            && part->info.min_block <= (*prev_part)->info.max_block)
        {
            LOG_ERROR(log, "Part " << part->name << " intersects previous part " << (*prev_part)->name);
        }

        prev_part = &part;
    }

    std::unique_ptr<IMergeSelector> merge_selector;

    SimpleMergeSelector::Settings merge_settings;
    if (aggressive)
        merge_settings.base = 1;

    bool can_merge_with_ttl =
        (current_time - last_merge_with_ttl > data_settings->merge_with_ttl_timeout);

    /// NOTE We could allow selection of a different merge strategy here.
    if (can_merge_with_ttl && has_part_with_expired_ttl && !ttl_merges_blocker.isCancelled())
    {
        merge_selector = std::make_unique<TTLMergeSelector>(current_time, data_settings->ttl_only_drop_parts);
        last_merge_with_ttl = current_time;
    }
    else
        merge_selector = std::make_unique<SimpleMergeSelector>(merge_settings);

    IMergeSelector::PartsInPartition parts_to_merge = merge_selector->select(
        partitions,
        max_total_size_to_merge);

    if (parts_to_merge.empty())
    {
        if (out_disable_reason)
            *out_disable_reason = "There is no need to merge parts according to merge selector algorithm";
        return false;
    }

    /// Allow to "merge" a part with itself if we need to remove some values with expired TTL.
    if (parts_to_merge.size() == 1 && !has_part_with_expired_ttl)
        throw Exception("Logical error: merge selector returned only one part to merge", ErrorCodes::LOGICAL_ERROR);

    MergeTreeData::DataPartsVector parts;
    parts.reserve(parts_to_merge.size());
    for (IMergeSelector::Part & part_info : parts_to_merge)
    {
        const MergeTreeData::DataPartPtr & part = *static_cast<const MergeTreeData::DataPartPtr *>(part_info.data);
        parts.push_back(part);
    }

    LOG_DEBUG(log, "Selected " << parts.size() << " parts from " << parts.front()->name << " to " << parts.back()->name);
    future_part.assign(std::move(parts));
    return true;
}

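/// Selects all active parts of a single partition for one big merge (this is what OPTIMIZE ... PARTITION
/// does). Returns false if the parts cannot be merged or if the available disk space is insufficient;
/// on success, 'available_disk_space' is decreased by the estimated space required for the merge.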
bool MergeTreeDataMergerMutator::selectAllPartsToMergeWithinPartition(
    FutureMergedMutatedPart & future_part,
    UInt64 & available_disk_space,
    const AllowedMergingPredicate & can_merge,
    const String & partition_id,
    bool final,
    String * out_disable_reason)
{
    MergeTreeData::DataPartsVector parts = selectAllPartsFromPartition(partition_id);

    if (parts.empty())
        return false;

    if (!final && parts.size() == 1)
    {
        if (out_disable_reason)
            *out_disable_reason = "There is only one part inside partition";
        return false;
    }

    auto it = parts.begin();
    auto prev_it = it;

    UInt64 sum_bytes = 0;
    while (it != parts.end())
    {
        /// For the case of one part, we check that it can be merged "with itself".
        if ((it != parts.begin() || parts.size() == 1) && !can_merge(*prev_it, *it, out_disable_reason))
        {
            return false;
        }

        sum_bytes += (*it)->bytes_on_disk;

        prev_it = it;
        ++it;
    }

    /// Enough disk space to cover the new merge with a margin.
    auto required_disk_space = sum_bytes * DISK_USAGE_COEFFICIENT_TO_SELECT;
    if (available_disk_space <= required_disk_space)
    {
        time_t now = time(nullptr);
        if (now - disk_space_warning_time > 3600)
        {
            disk_space_warning_time = now;
            LOG_WARNING(log, "Won't merge parts from " << parts.front()->name << " to " << (*prev_it)->name
                << " because there is not enough free space: "
                << formatReadableSizeWithBinarySuffix(available_disk_space) << " free and unreserved, "
                << formatReadableSizeWithBinarySuffix(sum_bytes)
                << " required now (+" << static_cast<int>((DISK_USAGE_COEFFICIENT_TO_SELECT - 1.0) * 100)
                << "% overhead); suppressing similar warnings for the next hour");
        }

        if (out_disable_reason)
            *out_disable_reason = "Insufficient available disk space, required " +
                formatReadableSizeWithDecimalSuffix(required_disk_space);

        return false;
    }

    LOG_DEBUG(log, "Selected " << parts.size() << " parts from " << parts.front()->name << " to " << parts.back()->name);
    future_part.assign(std::move(parts));
    available_disk_space -= required_disk_space;
    return true;
}


MergeTreeData::DataPartsVector MergeTreeDataMergerMutator::selectAllPartsFromPartition(const String & partition_id)
{
    MergeTreeData::DataPartsVector parts_from_partition;

    MergeTreeData::DataParts data_parts = data.getDataParts();

    for (MergeTreeData::DataParts::iterator it = data_parts.cbegin(); it != data_parts.cend(); ++it)
    {
        const MergeTreeData::DataPartPtr & current_part = *it;
        if (current_part->info.partition_id != partition_id)
            continue;

        parts_from_partition.push_back(current_part);
    }

    return parts_from_partition;
}

/// PK columns are sorted and merged, ordinary columns are gathered using info from the merge step.
static void extractMergingAndGatheringColumns(
    const NamesAndTypesList & all_columns,
    const ExpressionActionsPtr & sorting_key_expr,
    const MergeTreeIndices & indexes,
    const MergeTreeData::MergingParams & merging_params,
    NamesAndTypesList & gathering_columns, Names & gathering_column_names,
    NamesAndTypesList & merging_columns, Names & merging_column_names)
{
    Names sort_key_columns_vec = sorting_key_expr->getRequiredColumns();
    std::set<String> key_columns(sort_key_columns_vec.cbegin(), sort_key_columns_vec.cend());
    for (const auto & index : indexes)
    {
        Names index_columns_vec = index->getColumnsRequiredForIndexCalc();
        std::copy(index_columns_vec.cbegin(), index_columns_vec.cend(),
            std::inserter(key_columns, key_columns.end()));
    }

    /// Force sign column for Collapsing mode
    if (merging_params.mode == MergeTreeData::MergingParams::Collapsing)
        key_columns.emplace(merging_params.sign_column);

    /// Force version column for Replacing mode
    if (merging_params.mode == MergeTreeData::MergingParams::Replacing)
        key_columns.emplace(merging_params.version_column);

    /// Force sign column for VersionedCollapsing mode. Version is already in the primary key.
    if (merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing)
        key_columns.emplace(merging_params.sign_column);

    /// Force merging at least one column in case of an empty key.
    if (key_columns.empty())
        key_columns.emplace(all_columns.front().name);

    /// TODO: also force "summing" and "aggregating" columns to make Horizontal merge only for such columns

    for (const auto & column : all_columns)
    {
        if (key_columns.count(column.name))
        {
            merging_columns.emplace_back(column);
            merging_column_names.emplace_back(column.name);
        }
        else
        {
            gathering_columns.emplace_back(column);
            gathering_column_names.emplace_back(column.name);
        }
    }
}
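
/// For example (hypothetical table): with ORDER BY (CounterID, Date) and columns
/// CounterID, Date, UserID, Referer, the merging columns would be {CounterID, Date}
/// and the gathering columns {UserID, Referer} when the Vertical algorithm is used.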

/* Allows computing more accurate progress statistics. */
class ColumnSizeEstimator
{
    MergeTreeData::DataPart::ColumnToSize map;
public:

    /// Stores approximate sizes of columns in bytes.
    /// Exact values are not required since they are only used for relative estimation of progress.
    size_t sum_total = 0;
    size_t sum_index_columns = 0;
    size_t sum_ordinary_columns = 0;

    ColumnSizeEstimator(const MergeTreeData::DataPart::ColumnToSize & map_, const Names & key_columns, const Names & ordinary_columns)
        : map(map_)
    {
        for (const auto & name : key_columns)
            if (!map.count(name)) map[name] = 0;
        for (const auto & name : ordinary_columns)
            if (!map.count(name)) map[name] = 0;

        for (const auto & name : key_columns)
            sum_index_columns += map.at(name);

        for (const auto & name : ordinary_columns)
            sum_ordinary_columns += map.at(name);

        sum_total = std::max(static_cast<decltype(sum_index_columns)>(1), sum_index_columns + sum_ordinary_columns);
    }

    Float64 columnWeight(const String & column) const
    {
        return static_cast<Float64>(map.at(column)) / sum_total;
    }

    Float64 keyColumnsWeight() const
    {
        return static_cast<Float64>(sum_index_columns) / sum_total;
    }
};

/** Progress callback.
  * What it should update:
  * - approximate progress
  * - amount of read rows
  * - various metrics
  * - time elapsed for current merge.
  */

/// Auxiliary struct that for each merge stage stores its current progress.
/// A stage is: the horizontal stage + a stage for each gathered column (if we are doing a
/// Vertical merge) or a mutation of a single part. During a single stage all rows are read.
struct MergeStageProgress
{
    MergeStageProgress(Float64 weight_)
        : is_first(true), weight(weight_)
    {
    }

    MergeStageProgress(Float64 initial_progress_, Float64 weight_)
        : initial_progress(initial_progress_), is_first(false), weight(weight_)
    {
    }

    Float64 initial_progress = 0.0;
    bool is_first;
    Float64 weight;

    UInt64 total_rows = 0;
    UInt64 rows_read = 0;
};
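
/// Progress across stages is accumulated as initial_progress + weight * rows_read / total_rows.
/// E.g. (illustrative numbers): if the key columns account for 40% of the data, the horizontal stage
/// covers progress 0.0 - 0.4 and each gathered column then adds its own size-proportional share.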

class MergeProgressCallback
{
public:
    MergeProgressCallback(
        MergeList::Entry & merge_entry_, UInt64 & watch_prev_elapsed_, MergeStageProgress & stage_)
        : merge_entry(merge_entry_)
        , watch_prev_elapsed(watch_prev_elapsed_)
        , stage(stage_)
    {
        updateWatch();
    }

    MergeList::Entry & merge_entry;
    UInt64 & watch_prev_elapsed;
    MergeStageProgress & stage;

    void updateWatch()
    {
        UInt64 watch_curr_elapsed = merge_entry->watch.elapsed();
        ProfileEvents::increment(ProfileEvents::MergesTimeMilliseconds, (watch_curr_elapsed - watch_prev_elapsed) / 1000000);
        watch_prev_elapsed = watch_curr_elapsed;
    }

    void operator() (const Progress & value)
    {
        ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.read_bytes);
        if (stage.is_first)
        {
            ProfileEvents::increment(ProfileEvents::MergedRows, value.read_rows);
            ProfileEvents::increment(ProfileEvents::Merge);
        }
        updateWatch();

        merge_entry->bytes_read_uncompressed += value.read_bytes;
        if (stage.is_first)
            merge_entry->rows_read += value.read_rows;

        stage.total_rows += value.total_rows_to_read;
        stage.rows_read += value.read_rows;
        if (stage.total_rows > 0)
        {
            merge_entry->progress.store(
                stage.initial_progress + stage.weight * stage.rows_read / stage.total_rows,
                std::memory_order_relaxed);
        }
    }
};

/// Parts should be sorted.
MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart(
    const FutureMergedMutatedPart & future_part, MergeList::Entry & merge_entry, TableStructureReadLockHolder &,
    time_t time_of_merge, const ReservationPtr & space_reservation, bool deduplicate, bool force_ttl)
{
    static const String TMP_PREFIX = "tmp_merge_";

    if (merges_blocker.isCancelled())
        throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);

    const MergeTreeData::DataPartsVector & parts = future_part.parts;

    LOG_DEBUG(log, "Merging " << parts.size() << " parts: from "
        << parts.front()->name << " to " << parts.back()->name
        << " into " << TMP_PREFIX + future_part.name);

    String part_path = data.getFullPathOnDisk(space_reservation->getDisk());
    String new_part_tmp_path = part_path + TMP_PREFIX + future_part.name + "/";
    if (Poco::File(new_part_tmp_path).exists())
        throw Exception("Directory " + new_part_tmp_path + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);

    MergeTreeData::DataPart::ColumnToSize merged_column_to_size;
    for (const MergeTreeData::DataPartPtr & part : parts)
        part->accumulateColumnSizes(merged_column_to_size);

    Names all_column_names = data.getColumns().getNamesOfPhysical();
    NamesAndTypesList all_columns = data.getColumns().getAllPhysical();
    const auto data_settings = data.getSettings();

    NamesAndTypesList gathering_columns;
    NamesAndTypesList merging_columns;
    Names gathering_column_names, merging_column_names;
    extractMergingAndGatheringColumns(
        all_columns, data.sorting_key_expr, data.skip_indices,
        data.merging_params, gathering_columns, gathering_column_names, merging_columns, merging_column_names);

    MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(
        data, space_reservation->getDisk(), future_part.name, future_part.part_info);
    new_data_part->partition.assign(future_part.getPartition());
    new_data_part->relative_path = TMP_PREFIX + future_part.name;
    new_data_part->is_temp = true;

    size_t sum_input_rows_upper_bound = merge_entry->total_rows_count;

    bool need_remove_expired_values = force_ttl;
    for (const MergeTreeData::DataPartPtr & part : parts)
        new_data_part->ttl_infos.update(part->ttl_infos);

    const auto & part_min_ttl = new_data_part->ttl_infos.part_min_ttl;
    if (part_min_ttl && part_min_ttl <= time_of_merge)
        need_remove_expired_values = true;

    if (need_remove_expired_values && ttl_merges_blocker.isCancelled())
    {
        LOG_INFO(log, "Part " << new_data_part->name << " has values with expired TTL, but merges with TTL are cancelled.");
        need_remove_expired_values = false;
    }

    MergeAlgorithm merge_alg = chooseMergeAlgorithm(parts, sum_input_rows_upper_bound, gathering_columns, deduplicate, need_remove_expired_values);

    LOG_DEBUG(log, "Selected MergeAlgorithm: " << ((merge_alg == MergeAlgorithm::Vertical) ? "Vertical" : "Horizontal"));

    /// Note: this is done before creating input streams, because otherwise data.data_parts_mutex
    /// (which is locked in data.getTotalActiveSizeInBytes()) is locked after part->columns_lock
    /// (which is locked in shared mode when input streams are created) and when inserting new data
    /// the order is reverse. This annoys TSan even though one lock is locked in shared mode and thus
    /// deadlock is impossible.
    auto compression_codec = data.global_context.chooseCompressionCodec(
        merge_entry->total_size_bytes_compressed,
        static_cast<double> (merge_entry->total_size_bytes_compressed) / data.getTotalActiveSizeInBytes());

    String rows_sources_file_path;
    std::unique_ptr<WriteBuffer> rows_sources_uncompressed_write_buf;
    std::unique_ptr<WriteBuffer> rows_sources_write_buf;

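    /// For the Vertical algorithm, the horizontal (key) stage records, for every output row, the index of the
    /// source part it came from into the temporary 'rows_sources' file. The gathering stage below replays this
    /// file through ColumnGathererStream to assemble each remaining column in exactly the same row order.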
    if (merge_alg == MergeAlgorithm::Vertical)
    {
        Poco::File(new_part_tmp_path).createDirectories();
        rows_sources_file_path = new_part_tmp_path + "rows_sources";
        rows_sources_uncompressed_write_buf = std::make_unique<WriteBufferFromFile>(rows_sources_file_path);
        rows_sources_write_buf = std::make_unique<CompressedWriteBuffer>(*rows_sources_uncompressed_write_buf);
    }
    else
    {
        merging_columns = all_columns;
        merging_column_names = all_column_names;
        gathering_columns.clear();
        gathering_column_names.clear();
    }

    ColumnSizeEstimator column_sizes(merged_column_to_size, merging_column_names, gathering_column_names);

    /** Read from all parts, merge and write into a new one.
      * In passing, we calculate the expression for sorting.
      */
    BlockInputStreams src_streams;
    UInt64 watch_prev_elapsed = 0;

    /// We count the total amount of bytes in parts
    /// and use direct_io + aio if there is more than min_merge_bytes_to_use_direct_io.
    bool read_with_direct_io = false;
    if (data_settings->min_merge_bytes_to_use_direct_io != 0)
    {
        size_t total_size = 0;
        for (const auto & part : parts)
        {
            total_size += part->bytes_on_disk;
            if (total_size >= data_settings->min_merge_bytes_to_use_direct_io)
            {
                LOG_DEBUG(log, "Will merge parts reading files in O_DIRECT");
                read_with_direct_io = true;

                break;
            }
        }
    }

    MergeStageProgress horizontal_stage_progress(
        merge_alg == MergeAlgorithm::Horizontal ? 1.0 : column_sizes.keyColumnsWeight());

    for (const auto & part : parts)
    {
        auto input = std::make_unique<MergeTreeSequentialBlockInputStream>(
            data, part, merging_column_names, read_with_direct_io, true);

        input->setProgressCallback(
            MergeProgressCallback(merge_entry, watch_prev_elapsed, horizontal_stage_progress));

        BlockInputStreamPtr stream = std::move(input);
        if (data.hasPrimaryKey() || data.hasSkipIndices())
            stream = std::make_shared<MaterializingBlockInputStream>(
                std::make_shared<ExpressionBlockInputStream>(stream, data.sorting_key_and_skip_indices_expr));

        src_streams.emplace_back(stream);
    }

    Names sort_columns = data.sorting_key_columns;
    SortDescription sort_description;
    size_t sort_columns_size = sort_columns.size();
    sort_description.reserve(sort_columns_size);

    Block header = src_streams.at(0)->getHeader();
    for (size_t i = 0; i < sort_columns_size; ++i)
        sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1);

    /// The order of the streams is important: when the key is matched, the elements go in the order of the source stream number.
    /// In the merged part, rows with the same key must be in ascending order of the original part identifier,
    /// i.e. in insertion order.
    std::shared_ptr<IBlockInputStream> merged_stream;

    /// If the merge is vertical, we cannot recalculate the granularity later, so the written blocks are used as granules.
    bool blocks_are_granules_size = (merge_alg == MergeAlgorithm::Vertical);

    UInt64 merge_block_size = data_settings->merge_max_block_size;
    switch (data.merging_params.mode)
    {
        case MergeTreeData::MergingParams::Ordinary:
            merged_stream = std::make_unique<MergingSortedBlockInputStream>(
                src_streams, sort_description, merge_block_size, 0, rows_sources_write_buf.get(), true, blocks_are_granules_size);
            break;

        case MergeTreeData::MergingParams::Collapsing:
            merged_stream = std::make_unique<CollapsingSortedBlockInputStream>(
                src_streams, sort_description, data.merging_params.sign_column,
                merge_block_size, rows_sources_write_buf.get(), blocks_are_granules_size);
            break;

        case MergeTreeData::MergingParams::Summing:
            merged_stream = std::make_unique<SummingSortedBlockInputStream>(
                src_streams, sort_description, data.merging_params.columns_to_sum, merge_block_size);
            break;

        case MergeTreeData::MergingParams::Aggregating:
            merged_stream = std::make_unique<AggregatingSortedBlockInputStream>(
                src_streams, sort_description, merge_block_size);
            break;

        case MergeTreeData::MergingParams::Replacing:
            merged_stream = std::make_unique<ReplacingSortedBlockInputStream>(
                src_streams, sort_description, data.merging_params.version_column,
                merge_block_size, rows_sources_write_buf.get(), blocks_are_granules_size);
            break;

        case MergeTreeData::MergingParams::Graphite:
            merged_stream = std::make_unique<GraphiteRollupSortedBlockInputStream>(
                src_streams, sort_description, merge_block_size,
                data.merging_params.graphite_params, time_of_merge);
            break;

        case MergeTreeData::MergingParams::VersionedCollapsing:
            merged_stream = std::make_unique<VersionedCollapsingSortedBlockInputStream>(
                src_streams, sort_description, data.merging_params.sign_column,
                merge_block_size, rows_sources_write_buf.get(), blocks_are_granules_size);
            break;
    }

    if (deduplicate)
        merged_stream = std::make_shared<DistinctSortedBlockInputStream>(merged_stream, SizeLimits(), 0 /*limit_hint*/, Names());

    if (need_remove_expired_values)
        merged_stream = std::make_shared<TTLBlockInputStream>(merged_stream, data, new_data_part, time_of_merge, force_ttl);

    MergedBlockOutputStream to{
        data,
        new_part_tmp_path,
        merging_columns,
        compression_codec,
        merged_column_to_size,
        data_settings->min_merge_bytes_to_use_direct_io,
        blocks_are_granules_size};

    merged_stream->readPrefix();
    to.writePrefix();

    size_t rows_written = 0;
    const size_t initial_reservation = space_reservation ? space_reservation->getSize() : 0;

    auto is_cancelled = [&]() { return merges_blocker.isCancelled()
        || (need_remove_expired_values && ttl_merges_blocker.isCancelled()); };

    Block block;
    while (!is_cancelled() && (block = merged_stream->read()))
    {
        rows_written += block.rows();

        to.write(block);

        merge_entry->rows_written = merged_stream->getProfileInfo().rows;
        merge_entry->bytes_written_uncompressed = merged_stream->getProfileInfo().bytes;

        /// Reservation updates are not performed yet; during the merge this may lead to higher free space requirements.
        if (space_reservation && sum_input_rows_upper_bound)
        {
            /// The same progress from merge_entry could be used for both algorithms (it should be more accurate),
            /// but for now we use the inaccurate row-based estimation in the Horizontal case for backward compatibility.
            Float64 progress = (merge_alg == MergeAlgorithm::Horizontal)
                ? std::min(1., 1. * rows_written / sum_input_rows_upper_bound)
                : std::min(1., merge_entry->progress.load(std::memory_order_relaxed));

            space_reservation->update(static_cast<size_t>((1. - progress) * initial_reservation));
        }
    }

    merged_stream->readSuffix();
    merged_stream.reset();

    if (merges_blocker.isCancelled())
        throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);

    if (need_remove_expired_values && ttl_merges_blocker.isCancelled())
        throw Exception("Cancelled merging parts with expired TTL", ErrorCodes::ABORTED);

    MergeTreeData::DataPart::Checksums checksums_gathered_columns;

    /// Gather ordinary columns.
    if (merge_alg == MergeAlgorithm::Vertical)
    {
        size_t sum_input_rows_exact = merge_entry->rows_read;
        merge_entry->columns_written = merging_column_names.size();
        merge_entry->progress.store(column_sizes.keyColumnsWeight(), std::memory_order_relaxed);

        BlockInputStreams column_part_streams(parts.size());

        auto it_name_and_type = gathering_columns.cbegin();

        rows_sources_write_buf->next();
        rows_sources_uncompressed_write_buf->next();

        size_t rows_sources_count = rows_sources_write_buf->count();
        /// In the special case when there is only one source part and no rows were skipped, we may have
        /// skipped writing the rows_sources file. Otherwise rows_sources_count must be equal to the total
        /// number of input rows.
        if ((rows_sources_count > 0 || parts.size() > 1) && sum_input_rows_exact != rows_sources_count)
            throw Exception("Number of rows in source parts (" + toString(sum_input_rows_exact)
                + ") differs from number of bytes written to rows_sources file (" + toString(rows_sources_count)
                + "). It is a bug.", ErrorCodes::LOGICAL_ERROR);

        CompressedReadBufferFromFile rows_sources_read_buf(rows_sources_file_path, 0, 0);
        IMergedBlockOutputStream::WrittenOffsetColumns written_offset_columns;

        for (size_t column_num = 0, gathering_column_names_size = gathering_column_names.size();
            column_num < gathering_column_names_size;
            ++column_num, ++it_name_and_type)
        {
            const String & column_name = it_name_and_type->name;
            Names column_names{column_name};
            Float64 progress_before = merge_entry->progress.load(std::memory_order_relaxed);

            MergeStageProgress column_progress(progress_before, column_sizes.columnWeight(column_name));
            for (size_t part_num = 0; part_num < parts.size(); ++part_num)
            {
                auto column_part_stream = std::make_shared<MergeTreeSequentialBlockInputStream>(
                    data, parts[part_num], column_names, read_with_direct_io, true);

                column_part_stream->setProgressCallback(
                    MergeProgressCallback(merge_entry, watch_prev_elapsed, column_progress));

                column_part_streams[part_num] = std::move(column_part_stream);
            }

            rows_sources_read_buf.seek(0, 0);
            ColumnGathererStream column_gathered_stream(column_name, column_part_streams, rows_sources_read_buf);

            MergedColumnOnlyOutputStream column_to(
                data,
                column_gathered_stream.getHeader(),
                new_part_tmp_path,
                false,
                compression_codec,
                false,
                /// We don't need to recalculate indices here,
                /// because all of them were already recalculated and written
                /// as part of the key stage of the vertical merge.
                std::vector<MergeTreeIndexPtr>{},
                written_offset_columns,
                to.getIndexGranularity());

            size_t column_elems_written = 0;

            column_to.writePrefix();
            while (!merges_blocker.isCancelled() && (block = column_gathered_stream.read()))
            {
                column_elems_written += block.rows();
                column_to.write(block);
            }

            if (merges_blocker.isCancelled())
                throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);

            column_gathered_stream.readSuffix();
            checksums_gathered_columns.add(column_to.writeSuffixAndGetChecksums());

            if (rows_written != column_elems_written)
            {
                throw Exception("Written " + toString(column_elems_written) + " elements of column " + column_name +
                    ", but " + toString(rows_written) + " rows of PK columns", ErrorCodes::LOGICAL_ERROR);
            }

            /// NOTE: 'progress' is modified by a single thread, but it may be read concurrently from MergeListElement::getInfo() (StorageSystemMerges).

            merge_entry->columns_written += 1;
            merge_entry->bytes_written_uncompressed += column_gathered_stream.getProfileInfo().bytes;
            merge_entry->progress.store(progress_before + column_sizes.columnWeight(column_name), std::memory_order_relaxed);
        }

        Poco::File(rows_sources_file_path).remove();
    }

    for (const auto & part : parts)
        new_data_part->minmax_idx.merge(part->minmax_idx);

    /// Print overall profiling info. NOTE: it may duplicate previous messages.
    {
        double elapsed_seconds = merge_entry->watch.elapsedSeconds();
        LOG_DEBUG(log, std::fixed << std::setprecision(2)
            << "Merge sorted " << merge_entry->rows_read << " rows"
            << ", containing " << all_column_names.size() << " columns"
            << " (" << merging_column_names.size() << " merged, " << gathering_column_names.size() << " gathered)"
            << " in " << elapsed_seconds << " sec., "
            << merge_entry->rows_read / elapsed_seconds << " rows/sec., "
            << merge_entry->bytes_read_uncompressed / 1000000.0 / elapsed_seconds << " MB/sec.");
    }

    if (merge_alg != MergeAlgorithm::Vertical)
        to.writeSuffixAndFinalizePart(new_data_part);
    else
        to.writeSuffixAndFinalizePart(new_data_part, &all_columns, &checksums_gathered_columns);

    return new_data_part;
}


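/// Applies a set of mutation commands to a single part and writes the result as a new temporary part.
/// If the mutation touches all columns, the part is rewritten from scratch; if only a subset of columns
/// is affected, unchanged files are hard-linked from the source part and only the affected columns
/// (plus any dependent skip indices) are rewritten.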
MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTemporaryPart(
    const FutureMergedMutatedPart & future_part,
    const std::vector<MutationCommand> & commands,
    MergeListEntry & merge_entry,
    const Context & context,
    const ReservationPtr & space_reservation,
    TableStructureReadLockHolder & table_lock_holder)
{
    auto check_not_cancelled = [&]()
    {
        if (merges_blocker.isCancelled() || merge_entry->is_cancelled)
            throw Exception("Cancelled mutating parts", ErrorCodes::ABORTED);

        return true;
    };

    check_not_cancelled();

    if (future_part.parts.size() != 1)
        throw Exception("Trying to mutate " + toString(future_part.parts.size()) + " parts, not one. "
            "This is a bug.", ErrorCodes::LOGICAL_ERROR);

    CurrentMetrics::Increment num_mutations{CurrentMetrics::PartMutation};
    const auto & source_part = future_part.parts[0];
    auto storage_from_source_part = StorageFromMergeTreeDataPart::create(source_part);

    auto context_for_reading = context;
    context_for_reading.getSettingsRef().max_streams_to_max_threads_ratio = 1;
    context_for_reading.getSettingsRef().max_threads = 1;

    std::vector<MutationCommand> commands_for_part;
    std::copy_if(
        std::cbegin(commands), std::cend(commands),
        std::back_inserter(commands_for_part),
        [&] (const MutationCommand & command)
        {
            return command.partition == nullptr ||
                future_part.parts[0]->info.partition_id == data.getPartitionIDFromQuery(
                    command.partition, context_for_reading);
        });


    if (!isStorageTouchedByMutations(storage_from_source_part, commands_for_part, context_for_reading))
    {
        LOG_TRACE(log, "Part " << source_part->name << " doesn't change up to mutation version " << future_part.part_info.mutation);
        return data.cloneAndLoadDataPartOnSameDisk(source_part, "tmp_clone_", future_part.part_info);
    }
    else
        LOG_TRACE(log, "Mutating part " << source_part->name << " to mutation version " << future_part.part_info.mutation);

    MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(
        data, space_reservation->getDisk(), future_part.name, future_part.part_info);
    new_data_part->relative_path = "tmp_mut_" + future_part.name;
    new_data_part->is_temp = true;
    new_data_part->ttl_infos = source_part->ttl_infos;
    new_data_part->index_granularity_info = source_part->index_granularity_info;

    String new_part_tmp_path = new_data_part->getFullPath();

    /// Note: this is done before creating input streams, because otherwise data.data_parts_mutex
    /// (which is locked in data.getTotalActiveSizeInBytes()) is locked after part->columns_lock
    /// (which is locked in shared mode when input streams are created) and when inserting new data
    /// the order is reverse. This annoys TSan even though one lock is locked in shared mode and thus
    /// deadlock is impossible.
    auto compression_codec = context.chooseCompressionCodec(
        source_part->bytes_on_disk,
        static_cast<double>(source_part->bytes_on_disk) / data.getTotalActiveSizeInBytes());

    Poco::File(new_part_tmp_path).createDirectories();

    MutationsInterpreter mutations_interpreter(storage_from_source_part, commands_for_part, context_for_reading, true);
    auto in = mutations_interpreter.execute(table_lock_holder);
    const auto & updated_header = mutations_interpreter.getUpdatedHeader();

    NamesAndTypesList all_columns = data.getColumns().getAllPhysical();
    const auto data_settings = data.getSettings();

    Block in_header = in->getHeader();

    UInt64 watch_prev_elapsed = 0;
    MergeStageProgress stage_progress(1.0);
    in->setProgressCallback(MergeProgressCallback(merge_entry, watch_prev_elapsed, stage_progress));

    if (updated_header.columns() == all_columns.size())
    {
        /// All columns are modified, proceed to write a new part from scratch.
        if (data.hasPrimaryKey() || data.hasSkipIndices())
            in = std::make_shared<MaterializingBlockInputStream>(
                std::make_shared<ExpressionBlockInputStream>(in, data.primary_key_and_skip_indices_expr));

        MergeTreeDataPart::MinMaxIndex minmax_idx;

        MergedBlockOutputStream out(data, new_part_tmp_path, all_columns, compression_codec);

        in->readPrefix();
        out.writePrefix();

        Block block;
        while (check_not_cancelled() && (block = in->read()))
        {
            minmax_idx.update(block, data.minmax_idx_columns);
            out.write(block);

            merge_entry->rows_written += block.rows();
            merge_entry->bytes_written_uncompressed += block.bytes();
        }

        new_data_part->partition.assign(source_part->partition);
        new_data_part->minmax_idx = std::move(minmax_idx);

        in->readSuffix();
        out.writeSuffixAndFinalizePart(new_data_part);
    }
    else
    {
        /// We will modify only some of the columns. Other columns and key values can be copied as-is.
        /// TODO: check that we modify only non-key columns in this case.

        /// Check whether the columns used in skipping indices are modified.
        std::set<MergeTreeIndexPtr> indices_to_recalc;
        ASTPtr indices_recalc_expr_list = std::make_shared<ASTExpressionList>();
        for (const auto & col : in_header.getNames())
        {
            for (size_t i = 0; i < data.skip_indices.size(); ++i)
            {
                const auto & index = data.skip_indices[i];
                const auto & index_cols = index->getColumnsRequiredForIndexCalc();
                auto it = std::find(std::cbegin(index_cols), std::cend(index_cols), col);
                if (it != std::cend(index_cols) && indices_to_recalc.insert(index).second)
                {
                    ASTPtr expr_list = MergeTreeData::extractKeyExpressionList(
                        storage_from_source_part->getIndices().indices[i]->expr->clone());
                    for (const auto & expr : expr_list->children)
                        indices_recalc_expr_list->children.push_back(expr->clone());
                }
            }
        }

        if (!indices_to_recalc.empty())
        {
            auto indices_recalc_syntax = SyntaxAnalyzer(context, {}).analyze(
                indices_recalc_expr_list, in_header.getNamesAndTypesList());
            auto indices_recalc_expr = ExpressionAnalyzer(
                indices_recalc_expr_list,
                indices_recalc_syntax, context).getActions(false);

            /// We may update only one column, but some skip index expression may depend on several
            /// columns (e.g. c1 + c2 * c3). It works because the 'in' stream was created with the help of
            /// MutationsInterpreter, which knows about skip indices, so 'in' already has
            /// all required columns.
            /// TODO: move this logic to a single place.
            in = std::make_shared<MaterializingBlockInputStream>(
                std::make_shared<ExpressionBlockInputStream>(in, indices_recalc_expr));
        }

        NameSet files_to_skip = {"checksums.txt", "columns.txt"};

        /// Don't change the granularity type while mutating a subset of columns.
        auto mrk_extension = source_part->index_granularity_info.is_adaptive ? getAdaptiveMrkExtension() : getNonAdaptiveMrkExtension();
        for (const auto & entry : updated_header)
        {
            IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path)
            {
                String stream_name = IDataType::getFileNameForStream(entry.name, substream_path);
                files_to_skip.insert(stream_name + ".bin");
                files_to_skip.insert(stream_name + mrk_extension);
            };

            IDataType::SubstreamPath stream_path;
            entry.type->enumerateStreams(callback, stream_path);
        }
        for (const auto & index : indices_to_recalc)
        {
            files_to_skip.insert(index->getFileName() + ".idx");
            files_to_skip.insert(index->getFileName() + mrk_extension);
        }

        Poco::DirectoryIterator dir_end;
        for (Poco::DirectoryIterator dir_it(source_part->getFullPath()); dir_it != dir_end; ++dir_it)
        {
            if (files_to_skip.count(dir_it.name()))
                continue;

            Poco::Path destination(new_part_tmp_path);
            destination.append(dir_it.name());

            createHardLink(dir_it.path().toString(), destination.toString());
        }

        merge_entry->columns_written = all_columns.size() - updated_header.columns();

        IMergedBlockOutputStream::WrittenOffsetColumns unused_written_offsets;
        MergedColumnOnlyOutputStream out(
            data,
            updated_header,
            new_part_tmp_path,
            /* sync = */ false,
            compression_codec,
            /* skip_offsets = */ false,
            std::vector<MergeTreeIndexPtr>(indices_to_recalc.begin(), indices_to_recalc.end()),
            unused_written_offsets,
            source_part->index_granularity,
            &source_part->index_granularity_info
        );

        in->readPrefix();
        out.writePrefix();

        Block block;
        while (check_not_cancelled() && (block = in->read()))
        {
            out.write(block);

            merge_entry->rows_written += block.rows();
            merge_entry->bytes_written_uncompressed += block.bytes();
        }

        in->readSuffix();
        auto changed_checksums = out.writeSuffixAndGetChecksums();

        new_data_part->checksums = source_part->checksums;
        new_data_part->checksums.add(std::move(changed_checksums));
        {
            /// Write the file with checksums.
            WriteBufferFromFile out_checksums(new_part_tmp_path + "checksums.txt", 4096);
            new_data_part->checksums.write(out_checksums);
        }

        /// Write the columns list of the resulting part in the same order as all_columns.
        new_data_part->columns = all_columns;
        Names source_column_names = source_part->columns.getNames();
        NameSet source_columns_name_set(source_column_names.begin(), source_column_names.end());
        for (auto it = new_data_part->columns.begin(); it != new_data_part->columns.end();)
        {
            if (source_columns_name_set.count(it->name) || updated_header.has(it->name))
                ++it;
            else
                it = new_data_part->columns.erase(it);
        }
        {
            /// Write a file with a description of the columns.
            WriteBufferFromFile out_columns(new_part_tmp_path + "columns.txt", 4096);
            new_data_part->columns.writeText(out_columns);
        }

        new_data_part->rows_count = source_part->rows_count;
        new_data_part->index_granularity = source_part->index_granularity;
        new_data_part->index = source_part->index;
        new_data_part->partition.assign(source_part->partition);
        new_data_part->minmax_idx = source_part->minmax_idx;
        new_data_part->modification_time = time(nullptr);
        new_data_part->bytes_on_disk = MergeTreeData::DataPart::calculateTotalSizeOnDisk(new_data_part->getFullPath());
    }

    return new_data_part;
}


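/// The Vertical algorithm merges only the sorting key (and skip index) columns by the usual k-way merge
/// and then gathers the remaining columns one by one, which typically saves memory for wide tables.
/// It is used only when the engine mode supports it, the vertical algorithm is enabled in the settings,
/// there are enough rows and enough non-key (gathered) columns, the number of parts fits into RowSourcePart,
/// and neither deduplication nor TTL expiration is requested; otherwise the Horizontal algorithm is chosen.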
MergeTreeDataMergerMutator::MergeAlgorithm MergeTreeDataMergerMutator::chooseMergeAlgorithm(
    const MergeTreeData::DataPartsVector & parts, size_t sum_rows_upper_bound,
    const NamesAndTypesList & gathering_columns, bool deduplicate, bool need_remove_expired_values) const
{
    const auto data_settings = data.getSettings();

    if (deduplicate)
        return MergeAlgorithm::Horizontal;
    if (data_settings->enable_vertical_merge_algorithm == 0)
        return MergeAlgorithm::Horizontal;
    if (need_remove_expired_values)
        return MergeAlgorithm::Horizontal;

    bool is_supported_storage =
        data.merging_params.mode == MergeTreeData::MergingParams::Ordinary ||
        data.merging_params.mode == MergeTreeData::MergingParams::Collapsing ||
        data.merging_params.mode == MergeTreeData::MergingParams::Replacing ||
        data.merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing;

    bool enough_ordinary_cols = gathering_columns.size() >= data_settings->vertical_merge_algorithm_min_columns_to_activate;

    bool enough_total_rows = sum_rows_upper_bound >= data_settings->vertical_merge_algorithm_min_rows_to_activate;

    bool no_parts_overflow = parts.size() <= RowSourcePart::MAX_PARTS;

    auto merge_alg = (is_supported_storage && enough_total_rows && enough_ordinary_cols && no_parts_overflow) ?
        MergeAlgorithm::Vertical : MergeAlgorithm::Horizontal;

    return merge_alg;
}


MergeTreeData::DataPartPtr MergeTreeDataMergerMutator::renameMergedTemporaryPart(
    MergeTreeData::MutableDataPartPtr & new_data_part,
    const MergeTreeData::DataPartsVector & parts,
    MergeTreeData::Transaction * out_transaction)
{
    /// Rename the new part, add it to the set and remove the original parts.
    auto replaced_parts = data.renameTempPartAndReplace(new_data_part, nullptr, out_transaction);

    /// Let's check that exactly the original parts have been deleted, and nothing else.
    if (replaced_parts.size() != parts.size())
    {
        /** This is normal, although it happens rarely.
          *
          * The situation when 0 parts were replaced instead of N can arise, for example, in the following case:
          * - we had an A part, but there were no B and C parts;
          * - A, B -> AB was in the queue, but it has not been done, because there is no B part;
          * - AB, C -> ABC was in the queue, but it has not been done, because there are no AB and C parts;
          * - we have completed the task of downloading a B part;
          * - we started to make the A, B -> AB merge, since all parts appeared;
          * - we decided to download the ABC part from another replica, since it was impossible to make the merge AB, C -> ABC;
          * - the ABC part appeared. When it was added, the old A, B, C parts were deleted;
          * - the AB merge finished. The AB part was added. But this is an obsolete part. The log will contain the message `Obsolete part added`,
          *   then we get here.
          *
          * When can M > N parts be replaced?
          * - a new block was added in ReplicatedMergeTreeBlockOutputStream;
          * - it was added to the working dataset in memory and renamed on the filesystem;
          * - but the ZooKeeper transaction that adds it to the reference dataset in ZK failed;
          * - and it failed due to connection loss, so we don't roll back the working dataset in memory,
          *   because we don't know whether the part was added to ZK or not
          *   (see ReplicatedMergeTreeBlockOutputStream)
          * - then the method selectPartsToMerge selects a range and sees that the EphemeralLock for the block in this part is unlocked,
          *   and so it is possible to merge a range skipping this part.
          *   (NOTE: Merging with a part that is not in ZK is not possible, see checks in 'createLogEntryToMergeParts'.)
          * - and after the merge, this part will be removed in addition to the parts that were merged.
          */
        LOG_WARNING(log, "Unexpected number of parts removed when adding " << new_data_part->name << ": " << replaced_parts.size()
            << " instead of " << parts.size());
    }
    else
    {
        for (size_t i = 0; i < parts.size(); ++i)
            if (parts[i]->name != replaced_parts[i]->name)
                throw Exception("Unexpected part removed when adding " + new_data_part->name + ": " + replaced_parts[i]->name
                    + " instead of " + parts[i]->name, ErrorCodes::LOGICAL_ERROR);
    }

    LOG_TRACE(log, "Merged " << parts.size() << " parts: from " << parts.front()->name << " to " << parts.back()->name);
    return new_data_part;
}


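/// Rough upper-bound estimate of the disk space a merge or mutation of the given parts will need:
/// the sum of the source part sizes on disk multiplied by DISK_USAGE_COEFFICIENT_TO_RESERVE.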
size_t MergeTreeDataMergerMutator::estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts)
{
    size_t res = 0;
    for (const MergeTreeData::DataPartPtr & part : source_parts)
        res += part->bytes_on_disk;

    return static_cast<size_t>(res * DISK_USAGE_COEFFICIENT_TO_RESERVE);
}

}