1#include <Storages/MergeTree/MergeTreePartsMover.h>
2#include <Storages/MergeTree/MergeTreeData.h>
3#include <set>
4#include <boost/algorithm/string/join.hpp>
5
6namespace DB
7{
8
/// Error codes are defined centrally; this translation unit only declares the ones it refers to.
namespace ErrorCodes
{
    extern const int ABORTED;
    extern const int LOGICAL_ERROR;
    extern const int NO_SUCH_DATA_PART;
}
15
16namespace
17{
18
19/// Contains minimal number of heaviest parts, which sum size on disk is greater than required.
20/// If there are not enough summary size, than contains all parts.
21class LargestPartsWithRequiredSize
22{
23 struct PartsSizeOnDiskComparator
24 {
25 bool operator()(const MergeTreeData::DataPartPtr & f, const MergeTreeData::DataPartPtr & s) const
26 {
27 /// If parts have equal sizes, than order them by names (names are unique)
28 return std::tie(f->bytes_on_disk, f->name) < std::tie(s->bytes_on_disk, s->name);
29 }
30 };
31
32 std::set<MergeTreeData::DataPartPtr, PartsSizeOnDiskComparator> elems;
33 UInt64 required_size_sum;
34 UInt64 current_size_sum = 0;
35
36public:
37 LargestPartsWithRequiredSize(UInt64 required_sum_size_) : required_size_sum(required_sum_size_) {}
38
39 void add(MergeTreeData::DataPartPtr part)
40 {
41 if (current_size_sum < required_size_sum)
42 {
43 elems.emplace(part);
44 current_size_sum += part->bytes_on_disk;
45 return;
46 }
47
48 /// Adding smaller element
49 if (!elems.empty() && (*elems.begin())->bytes_on_disk >= part->bytes_on_disk)
50 return;
51
52 elems.emplace(part);
53 current_size_sum += part->bytes_on_disk;
54
55 removeRedundantElements();
56 }
57
58 /// Weaken requirements on size
59 void decreaseRequiredSizeAndRemoveRedundantParts(UInt64 size_decrease)
60 {
61 required_size_sum -= std::min(size_decrease, required_size_sum);
62 removeRedundantElements();
63 }
64
65 /// Returns parts ordered by size
66 MergeTreeData::DataPartsVector getAccumulatedParts()
67 {
68 MergeTreeData::DataPartsVector res;
69 for (const auto & elem : elems)
70 res.push_back(elem);
71 return res;
72 }
73
74private:
75 void removeRedundantElements()
76 {
77 while (!elems.empty() && (current_size_sum - (*elems.begin())->bytes_on_disk >= required_size_sum))
78 {
79 current_size_sum -= (*elems.begin())->bytes_on_disk;
80 elems.erase(elems.begin());
81 }
82 }
83};
84
85}
86
87bool MergeTreePartsMover::selectPartsForMove(
88 MergeTreeMovingParts & parts_to_move,
89 const AllowedMovingPredicate & can_move,
90 const std::lock_guard<std::mutex> & /* moving_parts_lock */)
91{
92 unsigned parts_to_move_by_policy_rules = 0;
93 unsigned parts_to_move_by_ttl_rules = 0;
94 double parts_to_move_total_size_bytes = 0.0;
95
96 MergeTreeData::DataPartsVector data_parts = data->getDataPartsVector();
97
98 if (data_parts.empty())
99 return false;
100
101 std::unordered_map<DiskPtr, LargestPartsWithRequiredSize> need_to_move;
102 const auto & policy = data->getStoragePolicy();
103 const auto & volumes = policy->getVolumes();
104
105 if (volumes.size() > 0)
106 {
107 /// Do not check last volume
108 for (size_t i = 0; i != volumes.size() - 1; ++i)
109 {
110 for (const auto & disk : volumes[i]->disks)
111 {
112 UInt64 required_maximum_available_space = disk->getTotalSpace() * policy->getMoveFactor();
113 UInt64 unreserved_space = disk->getUnreservedSpace();
114
115 if (unreserved_space < required_maximum_available_space)
116 need_to_move.emplace(disk, required_maximum_available_space - unreserved_space);
117 }
118 }
119 }
120
121 time_t time_of_move = time(nullptr);
122
123 for (const auto & part : data_parts)
124 {
125 String reason;
126 /// Don't report message to log, because logging is excessive.
127 if (!can_move(part, &reason))
128 continue;
129
130 const MergeTreeData::TTLEntry * ttl_entry_ptr = part->storage.selectTTLEntryForTTLInfos(part->ttl_infos, time_of_move);
131 auto to_insert = need_to_move.find(part->disk);
132 ReservationPtr reservation;
133 if (ttl_entry_ptr)
134 {
135 auto destination = ttl_entry_ptr->getDestination(policy);
136 if (destination && !ttl_entry_ptr->isPartInDestination(policy, *part))
137 reservation = part->storage.tryReserveSpace(part->bytes_on_disk, ttl_entry_ptr->getDestination(policy));
138 }
139
140 if (reservation) /// Found reservation by TTL rule.
141 {
142 parts_to_move.emplace_back(part, std::move(reservation));
143 /// If table TTL rule satisfies on this part, won't apply policy rules on it.
144 /// In order to not over-move, we need to "release" required space on this disk,
145 /// possibly to zero.
146 if (to_insert != need_to_move.end())
147 {
148 to_insert->second.decreaseRequiredSizeAndRemoveRedundantParts(part->bytes_on_disk);
149 }
150 ++parts_to_move_by_ttl_rules;
151 parts_to_move_total_size_bytes += part->bytes_on_disk;
152 }
153 else
154 {
155 if (to_insert != need_to_move.end())
156 to_insert->second.add(part);
157 }
158 }
159
160 for (auto && move : need_to_move)
161 {
162 auto min_volume_index = policy->getVolumeIndexByDisk(move.first) + 1;
163 for (auto && part : move.second.getAccumulatedParts())
164 {
165 auto reservation = policy->reserve(part->bytes_on_disk, min_volume_index);
166 if (!reservation)
167 {
168 /// Next parts to move from this disk has greater size and same min volume index.
169 /// There are no space for them.
170 /// But it can be possible to move data from other disks.
171 break;
172 }
173 parts_to_move.emplace_back(part, std::move(reservation));
174 ++parts_to_move_by_policy_rules;
175 parts_to_move_total_size_bytes += part->bytes_on_disk;
176 }
177 }
178
179 if (!parts_to_move.empty())
180 {
181 LOG_TRACE(log, "Selected " << parts_to_move_by_policy_rules << " parts to move according to storage policy rules and "
182 << parts_to_move_by_ttl_rules << " parts according to TTL rules, "
183 << formatReadableSizeWithBinarySuffix(parts_to_move_total_size_bytes) << " total");
184 return true;
185 }
186 else
187 return false;
188}
189
190MergeTreeData::DataPartPtr MergeTreePartsMover::clonePart(const MergeTreeMoveEntry & moving_part) const
191{
192 if (moves_blocker.isCancelled())
193 throw Exception("Cancelled moving parts.", ErrorCodes::ABORTED);
194
195 LOG_TRACE(log, "Cloning part " << moving_part.part->name);
196 moving_part.part->makeCloneOnDiskDetached(moving_part.reserved_space);
197
198 MergeTreeData::MutableDataPartPtr cloned_part =
199 std::make_shared<MergeTreeData::DataPart>(*data, moving_part.reserved_space->getDisk(), moving_part.part->name);
200 cloned_part->relative_path = "detached/" + moving_part.part->name;
201 LOG_TRACE(log, "Part " << moving_part.part->name << " was cloned to " << cloned_part->getFullPath());
202
203 cloned_part->loadColumnsChecksumsIndexes(true, true);
204 return cloned_part;
205
206}
207
208
209void MergeTreePartsMover::swapClonedPart(const MergeTreeData::DataPartPtr & cloned_part) const
210{
211 if (moves_blocker.isCancelled())
212 throw Exception("Cancelled moving parts.", ErrorCodes::ABORTED);
213
214 auto active_part = data->getActiveContainingPart(cloned_part->name);
215
216 /// It's ok, because we don't block moving parts for merges or mutations
217 if (!active_part || active_part->name != cloned_part->name)
218 {
219 LOG_INFO(log, "Failed to swap " << cloned_part->name << ". Active part doesn't exist."
220 << " Possible it was merged or mutated. Will remove copy on path '" << cloned_part->getFullPath() << "'.");
221 return;
222 }
223
224 cloned_part->renameTo(active_part->name);
225 /// TODO what happen if server goes down here?
226 data->swapActivePart(cloned_part);
227
228 LOG_TRACE(log, "Part " << cloned_part->name << " was moved to " << cloned_part->getFullPath());
229}
230
231}
232