1#include <Storages/MergeTree/MergeList.h>
2#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
3#include <Common/CurrentMetrics.h>
4#include <common/getThreadNumber.h>
5#include <Common/CurrentThread.h>
6
7
8namespace CurrentMetrics
9{
10 extern const Metric MemoryTrackingForMerges;
11}
12
13
14namespace DB
15{
16
17MergeListElement::MergeListElement(const std::string & database_, const std::string & table_, const FutureMergedMutatedPart & future_part)
18 : database{database_}, table{table_}, partition_id{future_part.part_info.partition_id}
19 , result_part_name{future_part.name}
20 , result_part_path{future_part.path}
21 , result_data_version{future_part.part_info.getDataVersion()}
22 , num_parts{future_part.parts.size()}
23 , thread_number{getThreadNumber()}
24{
25 for (const auto & source_part : future_part.parts)
26 {
27 source_part_names.emplace_back(source_part->name);
28 source_part_paths.emplace_back(source_part->getFullPath());
29
30 std::shared_lock<std::shared_mutex> part_lock(source_part->columns_lock);
31
32 total_size_bytes_compressed += source_part->bytes_on_disk;
33 total_size_marks += source_part->getMarksCount();
34 total_rows_count += source_part->index_granularity.getTotalRows();
35 }
36
37 if (!future_part.parts.empty())
38 {
39 source_data_version = future_part.parts[0]->info.getDataVersion();
40 is_mutation = (result_data_version != source_data_version);
41 }
42
43 /// Each merge is executed into separate background processing pool thread
44 background_thread_memory_tracker = CurrentThread::getMemoryTracker();
45 if (background_thread_memory_tracker)
46 {
47 memory_tracker.setMetric(CurrentMetrics::MemoryTrackingForMerges);
48 background_thread_memory_tracker_prev_parent = background_thread_memory_tracker->getParent();
49 background_thread_memory_tracker->setParent(&memory_tracker);
50 }
51}
52
53MergeInfo MergeListElement::getInfo() const
54{
55 MergeInfo res;
56 res.database = database;
57 res.table = table;
58 res.result_part_name = result_part_name;
59 res.result_part_path = result_part_path;
60 res.partition_id = partition_id;
61 res.is_mutation = is_mutation;
62 res.elapsed = watch.elapsedSeconds();
63 res.progress = progress.load(std::memory_order_relaxed);
64 res.num_parts = num_parts;
65 res.total_size_bytes_compressed = total_size_bytes_compressed;
66 res.total_size_marks = total_size_marks;
67 res.total_rows_count = total_rows_count;
68 res.bytes_read_uncompressed = bytes_read_uncompressed.load(std::memory_order_relaxed);
69 res.bytes_written_uncompressed = bytes_written_uncompressed.load(std::memory_order_relaxed);
70 res.rows_read = rows_read.load(std::memory_order_relaxed);
71 res.rows_written = rows_written.load(std::memory_order_relaxed);
72 res.columns_written = columns_written.load(std::memory_order_relaxed);
73 res.memory_usage = memory_tracker.get();
74 res.thread_number = thread_number;
75
76 for (const auto & source_part_name : source_part_names)
77 res.source_part_names.emplace_back(source_part_name);
78
79 for (const auto & source_part_path : source_part_paths)
80 res.source_part_paths.emplace_back(source_part_path);
81
82 return res;
83}
84
85MergeListElement::~MergeListElement()
86{
87 /// Unplug memory_tracker from current background processing pool thread
88 if (background_thread_memory_tracker)
89 background_thread_memory_tracker->setParent(background_thread_memory_tracker_prev_parent);
90}
91
92}
93