1 | #include <Storages/MergeTree/MergeTreePartsMover.h> |
2 | #include <Storages/MergeTree/MergeTreeData.h> |
3 | #include <set> |
4 | #include <boost/algorithm/string/join.hpp> |
5 | |
6 | namespace DB |
7 | { |
8 | |
9 | namespace ErrorCodes |
10 | { |
11 | extern const int ABORTED; |
12 | extern const int NO_SUCH_DATA_PART; |
13 | extern const int LOGICAL_ERROR; |
14 | } |
15 | |
16 | namespace |
17 | { |
18 | |
19 | /// Contains minimal number of heaviest parts, which sum size on disk is greater than required. |
20 | /// If there are not enough summary size, than contains all parts. |
21 | class LargestPartsWithRequiredSize |
22 | { |
23 | struct PartsSizeOnDiskComparator |
24 | { |
25 | bool operator()(const MergeTreeData::DataPartPtr & f, const MergeTreeData::DataPartPtr & s) const |
26 | { |
27 | /// If parts have equal sizes, than order them by names (names are unique) |
28 | return std::tie(f->bytes_on_disk, f->name) < std::tie(s->bytes_on_disk, s->name); |
29 | } |
30 | }; |
31 | |
32 | std::set<MergeTreeData::DataPartPtr, PartsSizeOnDiskComparator> elems; |
33 | UInt64 required_size_sum; |
34 | UInt64 current_size_sum = 0; |
35 | |
36 | public: |
37 | LargestPartsWithRequiredSize(UInt64 required_sum_size_) : required_size_sum(required_sum_size_) {} |
38 | |
39 | void add(MergeTreeData::DataPartPtr part) |
40 | { |
41 | if (current_size_sum < required_size_sum) |
42 | { |
43 | elems.emplace(part); |
44 | current_size_sum += part->bytes_on_disk; |
45 | return; |
46 | } |
47 | |
48 | /// Adding smaller element |
49 | if (!elems.empty() && (*elems.begin())->bytes_on_disk >= part->bytes_on_disk) |
50 | return; |
51 | |
52 | elems.emplace(part); |
53 | current_size_sum += part->bytes_on_disk; |
54 | |
55 | removeRedundantElements(); |
56 | } |
57 | |
58 | /// Weaken requirements on size |
59 | void decreaseRequiredSizeAndRemoveRedundantParts(UInt64 size_decrease) |
60 | { |
61 | required_size_sum -= std::min(size_decrease, required_size_sum); |
62 | removeRedundantElements(); |
63 | } |
64 | |
65 | /// Returns parts ordered by size |
66 | MergeTreeData::DataPartsVector getAccumulatedParts() |
67 | { |
68 | MergeTreeData::DataPartsVector res; |
69 | for (const auto & elem : elems) |
70 | res.push_back(elem); |
71 | return res; |
72 | } |
73 | |
74 | private: |
75 | void removeRedundantElements() |
76 | { |
77 | while (!elems.empty() && (current_size_sum - (*elems.begin())->bytes_on_disk >= required_size_sum)) |
78 | { |
79 | current_size_sum -= (*elems.begin())->bytes_on_disk; |
80 | elems.erase(elems.begin()); |
81 | } |
82 | } |
83 | }; |
84 | |
85 | } |
86 | |
87 | bool MergeTreePartsMover::selectPartsForMove( |
88 | MergeTreeMovingParts & parts_to_move, |
89 | const AllowedMovingPredicate & can_move, |
90 | const std::lock_guard<std::mutex> & /* moving_parts_lock */) |
91 | { |
92 | unsigned parts_to_move_by_policy_rules = 0; |
93 | unsigned parts_to_move_by_ttl_rules = 0; |
94 | double parts_to_move_total_size_bytes = 0.0; |
95 | |
96 | MergeTreeData::DataPartsVector data_parts = data->getDataPartsVector(); |
97 | |
98 | if (data_parts.empty()) |
99 | return false; |
100 | |
101 | std::unordered_map<DiskPtr, LargestPartsWithRequiredSize> need_to_move; |
102 | const auto & policy = data->getStoragePolicy(); |
103 | const auto & volumes = policy->getVolumes(); |
104 | |
105 | if (volumes.size() > 0) |
106 | { |
107 | /// Do not check last volume |
108 | for (size_t i = 0; i != volumes.size() - 1; ++i) |
109 | { |
110 | for (const auto & disk : volumes[i]->disks) |
111 | { |
112 | UInt64 required_maximum_available_space = disk->getTotalSpace() * policy->getMoveFactor(); |
113 | UInt64 unreserved_space = disk->getUnreservedSpace(); |
114 | |
115 | if (unreserved_space < required_maximum_available_space) |
116 | need_to_move.emplace(disk, required_maximum_available_space - unreserved_space); |
117 | } |
118 | } |
119 | } |
120 | |
121 | time_t time_of_move = time(nullptr); |
122 | |
123 | for (const auto & part : data_parts) |
124 | { |
125 | String reason; |
126 | /// Don't report message to log, because logging is excessive. |
127 | if (!can_move(part, &reason)) |
128 | continue; |
129 | |
130 | const MergeTreeData::TTLEntry * ttl_entry_ptr = part->storage.selectTTLEntryForTTLInfos(part->ttl_infos, time_of_move); |
131 | auto to_insert = need_to_move.find(part->disk); |
132 | ReservationPtr reservation; |
133 | if (ttl_entry_ptr) |
134 | { |
135 | auto destination = ttl_entry_ptr->getDestination(policy); |
136 | if (destination && !ttl_entry_ptr->isPartInDestination(policy, *part)) |
137 | reservation = part->storage.tryReserveSpace(part->bytes_on_disk, ttl_entry_ptr->getDestination(policy)); |
138 | } |
139 | |
140 | if (reservation) /// Found reservation by TTL rule. |
141 | { |
142 | parts_to_move.emplace_back(part, std::move(reservation)); |
143 | /// If table TTL rule satisfies on this part, won't apply policy rules on it. |
144 | /// In order to not over-move, we need to "release" required space on this disk, |
145 | /// possibly to zero. |
146 | if (to_insert != need_to_move.end()) |
147 | { |
148 | to_insert->second.decreaseRequiredSizeAndRemoveRedundantParts(part->bytes_on_disk); |
149 | } |
150 | ++parts_to_move_by_ttl_rules; |
151 | parts_to_move_total_size_bytes += part->bytes_on_disk; |
152 | } |
153 | else |
154 | { |
155 | if (to_insert != need_to_move.end()) |
156 | to_insert->second.add(part); |
157 | } |
158 | } |
159 | |
160 | for (auto && move : need_to_move) |
161 | { |
162 | auto min_volume_index = policy->getVolumeIndexByDisk(move.first) + 1; |
163 | for (auto && part : move.second.getAccumulatedParts()) |
164 | { |
165 | auto reservation = policy->reserve(part->bytes_on_disk, min_volume_index); |
166 | if (!reservation) |
167 | { |
168 | /// Next parts to move from this disk has greater size and same min volume index. |
169 | /// There are no space for them. |
170 | /// But it can be possible to move data from other disks. |
171 | break; |
172 | } |
173 | parts_to_move.emplace_back(part, std::move(reservation)); |
174 | ++parts_to_move_by_policy_rules; |
175 | parts_to_move_total_size_bytes += part->bytes_on_disk; |
176 | } |
177 | } |
178 | |
179 | if (!parts_to_move.empty()) |
180 | { |
181 | LOG_TRACE(log, "Selected " << parts_to_move_by_policy_rules << " parts to move according to storage policy rules and " |
182 | << parts_to_move_by_ttl_rules << " parts according to TTL rules, " |
183 | << formatReadableSizeWithBinarySuffix(parts_to_move_total_size_bytes) << " total" ); |
184 | return true; |
185 | } |
186 | else |
187 | return false; |
188 | } |
189 | |
190 | MergeTreeData::DataPartPtr MergeTreePartsMover::clonePart(const MergeTreeMoveEntry & moving_part) const |
191 | { |
192 | if (moves_blocker.isCancelled()) |
193 | throw Exception("Cancelled moving parts." , ErrorCodes::ABORTED); |
194 | |
195 | LOG_TRACE(log, "Cloning part " << moving_part.part->name); |
196 | moving_part.part->makeCloneOnDiskDetached(moving_part.reserved_space); |
197 | |
198 | MergeTreeData::MutableDataPartPtr cloned_part = |
199 | std::make_shared<MergeTreeData::DataPart>(*data, moving_part.reserved_space->getDisk(), moving_part.part->name); |
200 | cloned_part->relative_path = "detached/" + moving_part.part->name; |
201 | LOG_TRACE(log, "Part " << moving_part.part->name << " was cloned to " << cloned_part->getFullPath()); |
202 | |
203 | cloned_part->loadColumnsChecksumsIndexes(true, true); |
204 | return cloned_part; |
205 | |
206 | } |
207 | |
208 | |
209 | void MergeTreePartsMover::swapClonedPart(const MergeTreeData::DataPartPtr & cloned_part) const |
210 | { |
211 | if (moves_blocker.isCancelled()) |
212 | throw Exception("Cancelled moving parts." , ErrorCodes::ABORTED); |
213 | |
214 | auto active_part = data->getActiveContainingPart(cloned_part->name); |
215 | |
216 | /// It's ok, because we don't block moving parts for merges or mutations |
217 | if (!active_part || active_part->name != cloned_part->name) |
218 | { |
219 | LOG_INFO(log, "Failed to swap " << cloned_part->name << ". Active part doesn't exist." |
220 | << " Possible it was merged or mutated. Will remove copy on path '" << cloned_part->getFullPath() << "'." ); |
221 | return; |
222 | } |
223 | |
224 | cloned_part->renameTo(active_part->name); |
225 | /// TODO what happen if server goes down here? |
226 | data->swapActivePart(cloned_part); |
227 | |
228 | LOG_TRACE(log, "Part " << cloned_part->name << " was moved to " << cloned_part->getFullPath()); |
229 | } |
230 | |
231 | } |
232 | |