1 | #pragma once |
---|---|
2 | |
3 | #include <Common/Stopwatch.h> |
4 | #include <Common/CurrentMetrics.h> |
5 | #include <Common/MemoryTracker.h> |
6 | #include <Interpreters/Context.h> |
7 | #include <Storages/MergeTree/MergeTreeData.h> |
8 | #include <memory> |
9 | #include <list> |
10 | #include <mutex> |
11 | #include <atomic> |
12 | |
13 | |
/** Maintains a list of currently running merges.
  * Used to implement the system.merges table.
  */
17 | |
18 | namespace CurrentMetrics |
19 | { |
20 | extern const Metric Merge; |
21 | } |
22 | |
23 | namespace DB |
24 | { |
25 | |
26 | struct MergeInfo |
27 | { |
28 | std::string database; |
29 | std::string table; |
30 | std::string result_part_name; |
31 | std::string result_part_path; |
32 | Array source_part_names; |
33 | Array source_part_paths; |
34 | std::string partition_id; |
35 | bool is_mutation; |
36 | Float64 elapsed; |
37 | Float64 progress; |
38 | UInt64 num_parts; |
39 | UInt64 total_size_bytes_compressed; |
40 | UInt64 total_size_marks; |
41 | UInt64 total_rows_count; |
42 | UInt64 bytes_read_uncompressed; |
43 | UInt64 bytes_written_uncompressed; |
44 | UInt64 rows_read; |
45 | UInt64 rows_written; |
46 | UInt64 columns_written; |
47 | UInt64 memory_usage; |
48 | UInt64 thread_number; |
49 | }; |
50 | |
51 | struct FutureMergedMutatedPart; |
52 | |
53 | struct MergeListElement : boost::noncopyable |
54 | { |
55 | const std::string database; |
56 | const std::string table; |
57 | std::string partition_id; |
58 | |
59 | const std::string result_part_name; |
60 | const std::string result_part_path; |
61 | Int64 result_data_version{}; |
62 | bool is_mutation{}; |
63 | |
64 | UInt64 num_parts{}; |
65 | Names source_part_names; |
66 | Names source_part_paths; |
67 | Int64 source_data_version{}; |
68 | |
69 | Stopwatch watch; |
70 | std::atomic<Float64> progress{}; |
71 | std::atomic<bool> is_cancelled{}; |
72 | |
73 | UInt64 total_size_bytes_compressed{}; |
74 | UInt64 total_size_marks{}; |
75 | UInt64 total_rows_count{}; |
76 | std::atomic<UInt64> bytes_read_uncompressed{}; |
77 | std::atomic<UInt64> bytes_written_uncompressed{}; |
78 | |
79 | /// In case of Vertical algorithm they are actual only for primary key columns |
80 | std::atomic<UInt64> rows_read{}; |
81 | std::atomic<UInt64> rows_written{}; |
82 | |
83 | /// Updated only for Vertical algorithm |
84 | std::atomic<UInt64> columns_written{}; |
85 | |
86 | MemoryTracker memory_tracker{VariableContext::Process}; |
87 | MemoryTracker * background_thread_memory_tracker; |
88 | MemoryTracker * background_thread_memory_tracker_prev_parent = nullptr; |
89 | |
90 | /// Poco thread number used in logs |
91 | UInt32 thread_number; |
92 | |
93 | |
94 | MergeListElement(const std::string & database, const std::string & table, const FutureMergedMutatedPart & future_part); |
95 | |
96 | MergeInfo getInfo() const; |
97 | |
98 | ~MergeListElement(); |
99 | }; |
100 | |
101 | |
102 | class MergeList; |
103 | |
104 | class MergeListEntry |
105 | { |
106 | MergeList & list; |
107 | |
108 | using container_t = std::list<MergeListElement>; |
109 | container_t::iterator it; |
110 | |
111 | CurrentMetrics::Increment num_merges {CurrentMetrics::Merge}; |
112 | |
113 | public: |
114 | MergeListEntry(const MergeListEntry &) = delete; |
115 | MergeListEntry & operator=(const MergeListEntry &) = delete; |
116 | |
117 | MergeListEntry(MergeList & list_, const container_t::iterator it_) : list(list_), it{it_} {} |
118 | ~MergeListEntry(); |
119 | |
120 | MergeListElement * operator->() { return &*it; } |
121 | const MergeListElement * operator->() const { return &*it; } |
122 | }; |
123 | |
124 | |
125 | class MergeList |
126 | { |
127 | friend class MergeListEntry; |
128 | |
129 | using container_t = std::list<MergeListElement>; |
130 | using info_container_t = std::list<MergeInfo>; |
131 | |
132 | mutable std::mutex mutex; |
133 | container_t merges; |
134 | |
135 | public: |
136 | using Entry = MergeListEntry; |
137 | using EntryPtr = std::unique_ptr<Entry>; |
138 | |
139 | template <typename... Args> |
140 | EntryPtr insert(Args &&... args) |
141 | { |
142 | std::lock_guard lock{mutex}; |
143 | return std::make_unique<Entry>(*this, merges.emplace(merges.end(), std::forward<Args>(args)...)); |
144 | } |
145 | |
146 | info_container_t get() const |
147 | { |
148 | std::lock_guard lock{mutex}; |
149 | info_container_t res; |
150 | for (const auto & merge_element : merges) |
151 | res.emplace_back(merge_element.getInfo()); |
152 | return res; |
153 | } |
154 | |
155 | void cancelPartMutations(const String & partition_id, Int64 mutation_version) |
156 | { |
157 | std::lock_guard lock{mutex}; |
158 | for (auto & merge_element : merges) |
159 | { |
160 | if ((partition_id.empty() || merge_element.partition_id == partition_id) |
161 | && merge_element.source_data_version < mutation_version |
162 | && merge_element.result_data_version >= mutation_version) |
163 | merge_element.is_cancelled = true; |
164 | } |
165 | } |
166 | }; |
167 | |
168 | |
169 | inline MergeListEntry::~MergeListEntry() |
170 | { |
171 | std::lock_guard lock{list.mutex}; |
172 | list.merges.erase(it); |
173 | } |
174 | |
175 | |
176 | } |
177 |