1#include "DiskSpaceMonitor.h"
2#include "DiskFactory.h"
3#include "DiskLocal.h"
4
5#include <Interpreters/Context.h>
6#include <Common/escapeForFileName.h>
7#include <Common/quoteString.h>
8
9#include <set>
10#include <Poco/File.h>
11
12
13namespace DB
14{
15DiskSelector::DiskSelector(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const Context & context)
16{
17 Poco::Util::AbstractConfiguration::Keys keys;
18 config.keys(config_prefix, keys);
19
20 auto & factory = DiskFactory::instance();
21
22 constexpr auto default_disk_name = "default";
23 bool has_default_disk = false;
24 for (const auto & disk_name : keys)
25 {
26 if (!std::all_of(disk_name.begin(), disk_name.end(), isWordCharASCII))
27 throw Exception("Disk name can contain only alphanumeric and '_' (" + disk_name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
28
29 if (disk_name == default_disk_name)
30 has_default_disk = true;
31
32 auto disk_config_prefix = config_prefix + "." + disk_name;
33
34 disks.emplace(disk_name, factory.create(disk_name, config, disk_config_prefix, context));
35 }
36 if (!has_default_disk)
37 disks.emplace(default_disk_name, std::make_shared<DiskLocal>(default_disk_name, context.getPath(), 0));
38}
39
40
41const DiskPtr & DiskSelector::operator[](const String & name) const
42{
43 auto it = disks.find(name);
44 if (it == disks.end())
45 throw Exception("Unknown disk " + name, ErrorCodes::UNKNOWN_DISK);
46 return it->second;
47}
48
49
50Volume::Volume(
51 String name_,
52 const Poco::Util::AbstractConfiguration & config,
53 const String & config_prefix,
54 const DiskSelector & disk_selector)
55 : name(std::move(name_))
56{
57 Poco::Util::AbstractConfiguration::Keys keys;
58 config.keys(config_prefix, keys);
59
60 Logger * logger = &Logger::get("StorageConfiguration");
61
62 for (const auto & disk : keys)
63 {
64 if (startsWith(disk, "disk"))
65 {
66 auto disk_name = config.getString(config_prefix + "." + disk);
67 disks.push_back(disk_selector[disk_name]);
68 }
69 }
70
71 if (disks.empty())
72 throw Exception("Volume must contain at least one disk.", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
73
74 auto has_max_bytes = config.has(config_prefix + ".max_data_part_size_bytes");
75 auto has_max_ratio = config.has(config_prefix + ".max_data_part_size_ratio");
76 if (has_max_bytes && has_max_ratio)
77 throw Exception(
78 "Only one of 'max_data_part_size_bytes' and 'max_data_part_size_ratio' should be specified.",
79 ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
80
81 if (has_max_bytes)
82 {
83 max_data_part_size = config.getUInt64(config_prefix + ".max_data_part_size_bytes", 0);
84 }
85 else if (has_max_ratio)
86 {
87 auto ratio = config.getDouble(config_prefix + ".max_data_part_size_ratio");
88 if (ratio < 0)
89 throw Exception("'max_data_part_size_ratio' have to be not less then 0.", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
90 UInt64 sum_size = 0;
91 std::vector<UInt64> sizes;
92 for (const auto & disk : disks)
93 {
94 sizes.push_back(disk->getTotalSpace());
95 sum_size += sizes.back();
96 }
97 max_data_part_size = static_cast<decltype(max_data_part_size)>(sum_size * ratio / disks.size());
98 for (size_t i = 0; i < disks.size(); ++i)
99 if (sizes[i] < max_data_part_size)
100 LOG_WARNING(
101 logger,
102 "Disk " << backQuote(disks[i]->getName()) << " on volume " << backQuote(config_prefix) << " have not enough space ("
103 << formatReadableSizeWithBinarySuffix(sizes[i]) << ") for containing part the size of max_data_part_size ("
104 << formatReadableSizeWithBinarySuffix(max_data_part_size) << ")");
105 }
106 constexpr UInt64 MIN_PART_SIZE = 8u * 1024u * 1024u;
107 if (max_data_part_size != 0 && max_data_part_size < MIN_PART_SIZE)
108 LOG_WARNING(
109 logger,
110 "Volume " << backQuote(name) << " max_data_part_size is too low (" << formatReadableSizeWithBinarySuffix(max_data_part_size)
111 << " < " << formatReadableSizeWithBinarySuffix(MIN_PART_SIZE) << ")");
112}
113
114
115ReservationPtr Volume::reserve(UInt64 expected_size)
116{
117 /// This volume can not store files which size greater than max_data_part_size
118
119 if (max_data_part_size != 0 && expected_size > max_data_part_size)
120 return {};
121
122 size_t start_from = last_used.fetch_add(1u, std::memory_order_relaxed);
123 size_t disks_num = disks.size();
124 for (size_t i = 0; i < disks_num; ++i)
125 {
126 size_t index = (start_from + i) % disks_num;
127
128 auto reservation = disks[index]->reserve(expected_size);
129
130 if (reservation)
131 return reservation;
132 }
133 return {};
134}
135
136
137UInt64 Volume::getMaxUnreservedFreeSpace() const
138{
139 UInt64 res = 0;
140 for (const auto & disk : disks)
141 res = std::max(res, disk->getUnreservedSpace());
142 return res;
143}
144
145StoragePolicy::StoragePolicy(
146 String name_,
147 const Poco::Util::AbstractConfiguration & config,
148 const String & config_prefix,
149 const DiskSelector & disks)
150 : name(std::move(name_))
151{
152 String volumes_prefix = config_prefix + ".volumes";
153 if (!config.has(volumes_prefix))
154 throw Exception("StoragePolicy must contain at least one volume (.volumes)", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
155
156 Poco::Util::AbstractConfiguration::Keys keys;
157 config.keys(volumes_prefix, keys);
158
159 for (const auto & attr_name : keys)
160 {
161 if (!std::all_of(attr_name.begin(), attr_name.end(), isWordCharASCII))
162 throw Exception(
163 "Volume name can contain only alphanumeric and '_' (" + attr_name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
164 volumes.push_back(std::make_shared<Volume>(attr_name, config, volumes_prefix + "." + attr_name, disks));
165 if (volumes_names.find(attr_name) != volumes_names.end())
166 throw Exception("Volumes names must be unique (" + attr_name + " duplicated)", ErrorCodes::UNKNOWN_POLICY);
167 volumes_names[attr_name] = volumes.size() - 1;
168 }
169
170 if (volumes.empty())
171 throw Exception("StoragePolicy must contain at least one volume.", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
172
173 /// Check that disks are unique in Policy
174 std::set<String> disk_names;
175 for (const auto & volume : volumes)
176 {
177 for (const auto & disk : volume->disks)
178 {
179 if (disk_names.find(disk->getName()) != disk_names.end())
180 throw Exception(
181 "Duplicate disk '" + disk->getName() + "' in storage policy '" + name + "'", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
182
183 disk_names.insert(disk->getName());
184 }
185 }
186
187 move_factor = config.getDouble(config_prefix + ".move_factor", 0.1);
188 if (move_factor > 1)
189 throw Exception("Disk move factor have to be in [0., 1.] interval, but set to " + toString(move_factor), ErrorCodes::LOGICAL_ERROR);
190}
191
192
193StoragePolicy::StoragePolicy(String name_, Volumes volumes_, double move_factor_)
194 : volumes(std::move(volumes_)), name(std::move(name_)), move_factor(move_factor_)
195{
196 if (volumes.empty())
197 throw Exception("StoragePolicy must contain at least one Volume.", ErrorCodes::UNKNOWN_POLICY);
198
199 if (move_factor > 1)
200 throw Exception("Disk move factor have to be in [0., 1.] interval, but set to " + toString(move_factor), ErrorCodes::LOGICAL_ERROR);
201
202 for (size_t i = 0; i < volumes.size(); ++i)
203 {
204 if (volumes_names.find(volumes[i]->getName()) != volumes_names.end())
205 throw Exception("Volumes names must be unique (" + volumes[i]->getName() + " duplicated).", ErrorCodes::UNKNOWN_POLICY);
206 volumes_names[volumes[i]->getName()] = i;
207 }
208}
209
210
211bool StoragePolicy::isDefaultPolicy() const
212{
213 /// Guessing if this policy is default, not 100% correct though.
214
215 if (getName() != "default")
216 return false;
217
218 if (volumes.size() != 1)
219 return false;
220
221 if (volumes[0]->getName() != "default")
222 return false;
223
224 const auto & disks = volumes[0]->disks;
225 if (disks.size() != 1)
226 return false;
227
228 if (disks[0]->getName() != "default")
229 return false;
230
231 return true;
232}
233
234
235Disks StoragePolicy::getDisks() const
236{
237 Disks res;
238 for (const auto & volume : volumes)
239 for (const auto & disk : volume->disks)
240 res.push_back(disk);
241 return res;
242}
243
244
245DiskPtr StoragePolicy::getAnyDisk() const
246{
247 /// StoragePolicy must contain at least one Volume
248 /// Volume must contain at least one Disk
249 if (volumes.empty())
250 throw Exception("StoragePolicy has no volumes. It's a bug.", ErrorCodes::NOT_ENOUGH_SPACE);
251
252 if (volumes[0]->disks.empty())
253 throw Exception("Volume '" + volumes[0]->getName() + "' has no disks. It's a bug.", ErrorCodes::NOT_ENOUGH_SPACE);
254
255 return volumes[0]->disks[0];
256}
257
258
259DiskPtr StoragePolicy::getDiskByName(const String & disk_name) const
260{
261 for (auto && volume : volumes)
262 for (auto && disk : volume->disks)
263 if (disk->getName() == disk_name)
264 return disk;
265 return {};
266}
267
268
269UInt64 StoragePolicy::getMaxUnreservedFreeSpace() const
270{
271 UInt64 res = 0;
272 for (const auto & volume : volumes)
273 res = std::max(res, volume->getMaxUnreservedFreeSpace());
274 return res;
275}
276
277
278ReservationPtr StoragePolicy::reserve(UInt64 expected_size, size_t min_volume_index) const
279{
280 for (size_t i = min_volume_index; i < volumes.size(); ++i)
281 {
282 const auto & volume = volumes[i];
283 auto reservation = volume->reserve(expected_size);
284 if (reservation)
285 return reservation;
286 }
287 return {};
288}
289
290
291ReservationPtr StoragePolicy::reserve(UInt64 expected_size) const
292{
293 return reserve(expected_size, 0);
294}
295
296
297ReservationPtr StoragePolicy::makeEmptyReservationOnLargestDisk() const
298{
299 UInt64 max_space = 0;
300 DiskPtr max_disk;
301 for (const auto & volume : volumes)
302 {
303 for (const auto & disk : volume->disks)
304 {
305 auto avail_space = disk->getAvailableSpace();
306 if (avail_space > max_space)
307 {
308 max_space = avail_space;
309 max_disk = disk;
310 }
311 }
312 }
313 return max_disk->reserve(0);
314}
315
316
317size_t StoragePolicy::getVolumeIndexByDisk(const DiskPtr & disk_ptr) const
318{
319 for (size_t i = 0; i < volumes.size(); ++i)
320 {
321 const auto & volume = volumes[i];
322 for (const auto & disk : volume->disks)
323 if (disk->getName() == disk_ptr->getName())
324 return i;
325 }
326 throw Exception("No disk " + disk_ptr->getName() + " in policy " + name, ErrorCodes::UNKNOWN_DISK);
327}
328
329
330StoragePolicySelector::StoragePolicySelector(
331 const Poco::Util::AbstractConfiguration & config,
332 const String & config_prefix,
333 const DiskSelector & disks)
334{
335 Poco::Util::AbstractConfiguration::Keys keys;
336 config.keys(config_prefix, keys);
337
338 for (const auto & name : keys)
339 {
340 if (!std::all_of(name.begin(), name.end(), isWordCharASCII))
341 throw Exception(
342 "StoragePolicy name can contain only alphanumeric and '_' (" + name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
343
344 policies.emplace(name, std::make_shared<StoragePolicy>(name, config, config_prefix + "." + name, disks));
345 LOG_INFO(&Logger::get("StoragePolicySelector"), "Storage policy " << backQuote(name) << " loaded");
346 }
347
348 constexpr auto default_storage_policy_name = "default";
349 constexpr auto default_volume_name = "default";
350 constexpr auto default_disk_name = "default";
351
352 /// Add default policy if it's not specified explicetly
353 if (policies.find(default_storage_policy_name) == policies.end())
354 {
355 auto default_volume = std::make_shared<Volume>(default_volume_name, std::vector<DiskPtr>{disks[default_disk_name]}, 0);
356
357 auto default_policy = std::make_shared<StoragePolicy>(default_storage_policy_name, Volumes{default_volume}, 0.0);
358 policies.emplace(default_storage_policy_name, default_policy);
359 }
360}
361
362const StoragePolicyPtr & StoragePolicySelector::operator[](const String & name) const
363{
364 auto it = policies.find(name);
365 if (it == policies.end())
366 throw Exception("Unknown StoragePolicy " + name, ErrorCodes::UNKNOWN_POLICY);
367 return it->second;
368}
369
370}
371