1#pragma once
2
3#include <Common/Stopwatch.h>
4#include <Common/CurrentMetrics.h>
5#include <Common/MemoryTracker.h>
6#include <Interpreters/Context.h>
7#include <Storages/MergeTree/MergeTreeData.h>
8#include <memory>
9#include <list>
10#include <mutex>
11#include <atomic>
12
13
/** Maintains a list of currently running merges and mutations.
  * Used to implement the system.merges table.
  */
17
18namespace CurrentMetrics
19{
20 extern const Metric Merge;
21}
22
23namespace DB
24{
25
26struct MergeInfo
27{
28 std::string database;
29 std::string table;
30 std::string result_part_name;
31 std::string result_part_path;
32 Array source_part_names;
33 Array source_part_paths;
34 std::string partition_id;
35 bool is_mutation;
36 Float64 elapsed;
37 Float64 progress;
38 UInt64 num_parts;
39 UInt64 total_size_bytes_compressed;
40 UInt64 total_size_marks;
41 UInt64 total_rows_count;
42 UInt64 bytes_read_uncompressed;
43 UInt64 bytes_written_uncompressed;
44 UInt64 rows_read;
45 UInt64 rows_written;
46 UInt64 columns_written;
47 UInt64 memory_usage;
48 UInt64 thread_number;
49};
50
51struct FutureMergedMutatedPart;
52
53struct MergeListElement : boost::noncopyable
54{
55 const std::string database;
56 const std::string table;
57 std::string partition_id;
58
59 const std::string result_part_name;
60 const std::string result_part_path;
61 Int64 result_data_version{};
62 bool is_mutation{};
63
64 UInt64 num_parts{};
65 Names source_part_names;
66 Names source_part_paths;
67 Int64 source_data_version{};
68
69 Stopwatch watch;
70 std::atomic<Float64> progress{};
71 std::atomic<bool> is_cancelled{};
72
73 UInt64 total_size_bytes_compressed{};
74 UInt64 total_size_marks{};
75 UInt64 total_rows_count{};
76 std::atomic<UInt64> bytes_read_uncompressed{};
77 std::atomic<UInt64> bytes_written_uncompressed{};
78
79 /// In case of Vertical algorithm they are actual only for primary key columns
80 std::atomic<UInt64> rows_read{};
81 std::atomic<UInt64> rows_written{};
82
83 /// Updated only for Vertical algorithm
84 std::atomic<UInt64> columns_written{};
85
86 MemoryTracker memory_tracker{VariableContext::Process};
87 MemoryTracker * background_thread_memory_tracker;
88 MemoryTracker * background_thread_memory_tracker_prev_parent = nullptr;
89
90 /// Poco thread number used in logs
91 UInt32 thread_number;
92
93
94 MergeListElement(const std::string & database, const std::string & table, const FutureMergedMutatedPart & future_part);
95
96 MergeInfo getInfo() const;
97
98 ~MergeListElement();
99};
100
101
102class MergeList;
103
104class MergeListEntry
105{
106 MergeList & list;
107
108 using container_t = std::list<MergeListElement>;
109 container_t::iterator it;
110
111 CurrentMetrics::Increment num_merges {CurrentMetrics::Merge};
112
113public:
114 MergeListEntry(const MergeListEntry &) = delete;
115 MergeListEntry & operator=(const MergeListEntry &) = delete;
116
117 MergeListEntry(MergeList & list_, const container_t::iterator it_) : list(list_), it{it_} {}
118 ~MergeListEntry();
119
120 MergeListElement * operator->() { return &*it; }
121 const MergeListElement * operator->() const { return &*it; }
122};
123
124
125class MergeList
126{
127 friend class MergeListEntry;
128
129 using container_t = std::list<MergeListElement>;
130 using info_container_t = std::list<MergeInfo>;
131
132 mutable std::mutex mutex;
133 container_t merges;
134
135public:
136 using Entry = MergeListEntry;
137 using EntryPtr = std::unique_ptr<Entry>;
138
139 template <typename... Args>
140 EntryPtr insert(Args &&... args)
141 {
142 std::lock_guard lock{mutex};
143 return std::make_unique<Entry>(*this, merges.emplace(merges.end(), std::forward<Args>(args)...));
144 }
145
146 info_container_t get() const
147 {
148 std::lock_guard lock{mutex};
149 info_container_t res;
150 for (const auto & merge_element : merges)
151 res.emplace_back(merge_element.getInfo());
152 return res;
153 }
154
155 void cancelPartMutations(const String & partition_id, Int64 mutation_version)
156 {
157 std::lock_guard lock{mutex};
158 for (auto & merge_element : merges)
159 {
160 if ((partition_id.empty() || merge_element.partition_id == partition_id)
161 && merge_element.source_data_version < mutation_version
162 && merge_element.result_data_version >= mutation_version)
163 merge_element.is_cancelled = true;
164 }
165 }
166};
167
168
169inline MergeListEntry::~MergeListEntry()
170{
171 std::lock_guard lock{list.mutex};
172 list.merges.erase(it);
173}
174
175
176}
177