1#include "MergeTreeDataPart.h"
2
3#include <optional>
4#include <IO/ReadHelpers.h>
5#include <IO/WriteHelpers.h>
6#include <Compression/CompressedReadBuffer.h>
7#include <Compression/CompressedWriteBuffer.h>
8#include <IO/ReadBufferFromString.h>
9#include <IO/WriteBufferFromString.h>
10#include <IO/ReadBufferFromFile.h>
11#include <IO/HashingWriteBuffer.h>
12#include <Core/Defines.h>
13#include <Common/SipHash.h>
14#include <Common/escapeForFileName.h>
15#include <Common/StringUtils/StringUtils.h>
16#include <Common/localBackup.h>
17#include <Compression/CompressionInfo.h>
18#include <Storages/MergeTree/MergeTreeData.h>
19#include <Poco/File.h>
20#include <Poco/Path.h>
21#include <Poco/DirectoryIterator.h>
22#include <common/logger_useful.h>
23#include <common/JSON.h>
24
25namespace DB
26{
27
/// Error codes referenced by this translation unit; they are only declared here (`extern`).
namespace ErrorCodes
{
    extern const int FILE_DOESNT_EXIST;
    extern const int NO_FILE_IN_DATA_PART;
    extern const int EXPECTED_END_OF_FILE;
    extern const int CORRUPTED_DATA;
    extern const int NOT_FOUND_EXPECTED_DATA_PART;
    extern const int BAD_SIZE_OF_FILE_IN_DATA_PART;
    extern const int BAD_TTL_FILE;
    extern const int CANNOT_UNLINK;
    /// Also used in this file; declared explicitly instead of relying on other headers to declare them.
    extern const int LOGICAL_ERROR;
    extern const int DIRECTORY_ALREADY_EXISTS;
    extern const int CANNOT_READ_ALL_DATA;
}
39
40
41static ReadBufferFromFile openForReading(const String & path)
42{
43 return ReadBufferFromFile(path, std::min(static_cast<Poco::File::FileSize>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(path).getSize()));
44}
45
46static String getFileNameForColumn(const NameAndTypePair & column)
47{
48 String filename;
49 column.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
50 {
51 if (filename.empty())
52 filename = IDataType::getFileNameForStream(column.name, substream_path);
53 });
54 return filename;
55}
56
57void MergeTreeDataPart::MinMaxIndex::load(const MergeTreeData & data, const String & part_path)
58{
59 size_t minmax_idx_size = data.minmax_idx_column_types.size();
60 parallelogram.reserve(minmax_idx_size);
61 for (size_t i = 0; i < minmax_idx_size; ++i)
62 {
63 String file_name = part_path + "minmax_" + escapeForFileName(data.minmax_idx_columns[i]) + ".idx";
64 ReadBufferFromFile file = openForReading(file_name);
65 const DataTypePtr & type = data.minmax_idx_column_types[i];
66
67 Field min_val;
68 type->deserializeBinary(min_val, file);
69 Field max_val;
70 type->deserializeBinary(max_val, file);
71
72 parallelogram.emplace_back(min_val, true, max_val, true);
73 }
74 initialized = true;
75}
76
77void MergeTreeDataPart::MinMaxIndex::store(const MergeTreeData & data, const String & part_path, Checksums & out_checksums) const
78{
79 store(data.minmax_idx_columns, data.minmax_idx_column_types, part_path, out_checksums);
80}
81
82void MergeTreeDataPart::MinMaxIndex::store(const Names & column_names, const DataTypes & data_types, const String & part_path, Checksums & out_checksums) const
83{
84 if (!initialized)
85 throw Exception("Attempt to store uninitialized MinMax index for part " + part_path + ". This is a bug.",
86 ErrorCodes::LOGICAL_ERROR);
87
88 for (size_t i = 0; i < column_names.size(); ++i)
89 {
90 String file_name = "minmax_" + escapeForFileName(column_names[i]) + ".idx";
91 const DataTypePtr & type = data_types.at(i);
92
93 WriteBufferFromFile out(part_path + file_name);
94 HashingWriteBuffer out_hashing(out);
95 type->serializeBinary(parallelogram[i].left, out_hashing);
96 type->serializeBinary(parallelogram[i].right, out_hashing);
97 out_hashing.next();
98 out_checksums.files[file_name].file_size = out_hashing.count();
99 out_checksums.files[file_name].file_hash = out_hashing.getHash();
100 }
101}
102
103void MergeTreeDataPart::MinMaxIndex::update(const Block & block, const Names & column_names)
104{
105 if (!initialized)
106 parallelogram.reserve(column_names.size());
107
108 for (size_t i = 0; i < column_names.size(); ++i)
109 {
110 Field min_value;
111 Field max_value;
112 const ColumnWithTypeAndName & column = block.getByName(column_names[i]);
113 column.column->getExtremes(min_value, max_value);
114
115 if (!initialized)
116 parallelogram.emplace_back(min_value, true, max_value, true);
117 else
118 {
119 parallelogram[i].left = std::min(parallelogram[i].left, min_value);
120 parallelogram[i].right = std::max(parallelogram[i].right, max_value);
121 }
122 }
123
124 initialized = true;
125}
126
127void MergeTreeDataPart::MinMaxIndex::merge(const MinMaxIndex & other)
128{
129 if (!other.initialized)
130 return;
131
132 if (!initialized)
133 {
134 parallelogram = other.parallelogram;
135 initialized = true;
136 }
137 else
138 {
139 for (size_t i = 0; i < parallelogram.size(); ++i)
140 {
141 parallelogram[i].left = std::min(parallelogram[i].left, other.parallelogram[i].left);
142 parallelogram[i].right = std::max(parallelogram[i].right, other.parallelogram[i].right);
143 }
144 }
145}
146
147
148MergeTreeDataPart::MergeTreeDataPart(MergeTreeData & storage_, const DiskPtr & disk_, const String & name_)
149 : storage(storage_)
150 , disk(disk_)
151 , name(name_)
152 , info(MergeTreePartInfo::fromPartName(name_, storage.format_version))
153 , index_granularity_info(storage)
154{
155}
156
157MergeTreeDataPart::MergeTreeDataPart(
158 const MergeTreeData & storage_,
159 const DiskPtr & disk_,
160 const String & name_,
161 const MergeTreePartInfo & info_)
162 : storage(storage_)
163 , disk(disk_)
164 , name(name_)
165 , info(info_)
166 , index_granularity_info(storage)
167{
168}
169
170
171/// Takes into account the fact that several columns can e.g. share their .size substreams.
172/// When calculating totals these should be counted only once.
173ColumnSize MergeTreeDataPart::getColumnSizeImpl(
174 const String & column_name, const IDataType & type, std::unordered_set<String> * processed_substreams) const
175{
176 ColumnSize size;
177 if (checksums.empty())
178 return size;
179
180 type.enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
181 {
182 String file_name = IDataType::getFileNameForStream(column_name, substream_path);
183
184 if (processed_substreams && !processed_substreams->insert(file_name).second)
185 return;
186
187 auto bin_checksum = checksums.files.find(file_name + ".bin");
188 if (bin_checksum != checksums.files.end())
189 {
190 size.data_compressed += bin_checksum->second.file_size;
191 size.data_uncompressed += bin_checksum->second.uncompressed_size;
192 }
193
194 auto mrk_checksum = checksums.files.find(file_name + index_granularity_info.marks_file_extension);
195 if (mrk_checksum != checksums.files.end())
196 size.marks += mrk_checksum->second.file_size;
197 }, {});
198
199 return size;
200}
201
202ColumnSize MergeTreeDataPart::getColumnSize(const String & column_name, const IDataType & type) const
203{
204 return getColumnSizeImpl(column_name, type, nullptr);
205}
206
207ColumnSize MergeTreeDataPart::getTotalColumnsSize() const
208{
209 ColumnSize totals;
210 std::unordered_set<String> processed_substreams;
211 for (const NameAndTypePair & column : columns)
212 {
213 ColumnSize size = getColumnSizeImpl(column.name, *column.type, &processed_substreams);
214 totals.add(size);
215 }
216 return totals;
217}
218
219
220size_t MergeTreeDataPart::getFileSizeOrZero(const String & file_name) const
221{
222 auto checksum = checksums.files.find(file_name);
223 if (checksum == checksums.files.end())
224 return 0;
225 return checksum->second.file_size;
226}
227
228/** Returns the name of a column with minimum compressed size (as returned by getColumnSize()).
229 * If no checksums are present returns the name of the first physically existing column.
230 */
231String MergeTreeDataPart::getColumnNameWithMinumumCompressedSize() const
232{
233 const auto & storage_columns = storage.getColumns().getAllPhysical();
234 const std::string * minimum_size_column = nullptr;
235 UInt64 minimum_size = std::numeric_limits<UInt64>::max();
236
237 for (const auto & column : storage_columns)
238 {
239 if (!hasColumnFiles(column.name, *column.type))
240 continue;
241
242 const auto size = getColumnSize(column.name, *column.type).data_compressed;
243 if (size < minimum_size)
244 {
245 minimum_size = size;
246 minimum_size_column = &column.name;
247 }
248 }
249
250 if (!minimum_size_column)
251 throw Exception("Could not find a column of minimum size in MergeTree, part " + getFullPath(), ErrorCodes::LOGICAL_ERROR);
252
253 return *minimum_size_column;
254}
255
256
257String MergeTreeDataPart::getFullPath() const
258{
259 if (relative_path.empty())
260 throw Exception("Part relative_path cannot be empty. It's bug.", ErrorCodes::LOGICAL_ERROR);
261
262 return storage.getFullPathOnDisk(disk) + relative_path + "/";
263}
264
265String MergeTreeDataPart::getNameWithPrefix() const
266{
267 String res = Poco::Path(relative_path).getFileName();
268
269 if (res.empty())
270 throw Exception("relative_path " + relative_path + " of part " + name + " is invalid or not set", ErrorCodes::LOGICAL_ERROR);
271
272 return res;
273}
274
275String MergeTreeDataPart::getNewName(const MergeTreePartInfo & new_part_info) const
276{
277 if (storage.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
278 {
279 /// NOTE: getting min and max dates from the part name (instead of part data) because we want
280 /// the merged part name be determined only by source part names.
281 /// It is simpler this way when the real min and max dates for the block range can change
282 /// (e.g. after an ALTER DELETE command).
283 DayNum min_date;
284 DayNum max_date;
285 MergeTreePartInfo::parseMinMaxDatesFromPartName(name, min_date, max_date);
286 return new_part_info.getPartNameV0(min_date, max_date);
287 }
288 else
289 return new_part_info.getPartName();
290}
291
292DayNum MergeTreeDataPart::getMinDate() const
293{
294 if (storage.minmax_idx_date_column_pos != -1 && minmax_idx.initialized)
295 return DayNum(minmax_idx.parallelogram[storage.minmax_idx_date_column_pos].left.get<UInt64>());
296 else
297 return DayNum();
298}
299
300
301DayNum MergeTreeDataPart::getMaxDate() const
302{
303 if (storage.minmax_idx_date_column_pos != -1 && minmax_idx.initialized)
304 return DayNum(minmax_idx.parallelogram[storage.minmax_idx_date_column_pos].right.get<UInt64>());
305 else
306 return DayNum();
307}
308
309time_t MergeTreeDataPart::getMinTime() const
310{
311 if (storage.minmax_idx_time_column_pos != -1 && minmax_idx.initialized)
312 return minmax_idx.parallelogram[storage.minmax_idx_time_column_pos].left.get<UInt64>();
313 else
314 return 0;
315}
316
317
318time_t MergeTreeDataPart::getMaxTime() const
319{
320 if (storage.minmax_idx_time_column_pos != -1 && minmax_idx.initialized)
321 return minmax_idx.parallelogram[storage.minmax_idx_time_column_pos].right.get<UInt64>();
322 else
323 return 0;
324}
325
326MergeTreeDataPart::~MergeTreeDataPart()
327{
328 if (state == State::DeleteOnDestroy || is_temp)
329 {
330 try
331 {
332 std::string path = getFullPath();
333
334 Poco::File dir(path);
335 if (!dir.exists())
336 return;
337
338 if (is_temp)
339 {
340 if (!startsWith(getNameWithPrefix(), "tmp"))
341 {
342 LOG_ERROR(storage.log, "~DataPart() should remove part " << path
343 << " but its name doesn't start with tmp. Too suspicious, keeping the part.");
344 return;
345 }
346 }
347
348 dir.remove(true);
349
350 if (state == State::DeleteOnDestroy)
351 {
352 LOG_TRACE(storage.log, "Removed part from old location " << path);
353 }
354 }
355 catch (...)
356 {
357 tryLogCurrentException(__PRETTY_FUNCTION__);
358 }
359 }
360}
361
362UInt64 MergeTreeDataPart::calculateTotalSizeOnDisk(const String & from)
363{
364 Poco::File cur(from);
365 if (cur.isFile())
366 return cur.getSize();
367 std::vector<std::string> files;
368 cur.list(files);
369 UInt64 res = 0;
370 for (const auto & file : files)
371 res += calculateTotalSizeOnDisk(from + file);
372 return res;
373}
374
375void MergeTreeDataPart::remove() const
376{
377 if (relative_path.empty())
378 throw Exception("Part relative_path cannot be empty. This is bug.", ErrorCodes::LOGICAL_ERROR);
379
380 /** Atomic directory removal:
381 * - rename directory to temporary name;
382 * - remove it recursive.
383 *
384 * For temporary name we use "delete_tmp_" prefix.
385 *
386 * NOTE: We cannot use "tmp_delete_" prefix, because there is a second thread,
387 * that calls "clearOldTemporaryDirectories" and removes all directories, that begin with "tmp_" and are old enough.
388 * But when we removing data part, it can be old enough. And rename doesn't change mtime.
389 * And a race condition can happen that will lead to "File not found" error here.
390 */
391
392 String full_path = storage.getFullPathOnDisk(disk);
393 String from = full_path + relative_path;
394 String to = full_path + "delete_tmp_" + name;
395 // TODO directory delete_tmp_<name> is never removed if server crashes before returning from this function
396
397
398 Poco::File from_dir{from};
399 Poco::File to_dir{to};
400
401 if (to_dir.exists())
402 {
403 LOG_WARNING(storage.log, "Directory " << to << " (to which part must be renamed before removing) already exists."
404 " Most likely this is due to unclean restart. Removing it.");
405
406 try
407 {
408 to_dir.remove(true);
409 }
410 catch (...)
411 {
412 LOG_ERROR(storage.log, "Cannot remove directory " << to << ". Check owner and access rights.");
413 throw;
414 }
415 }
416
417 try
418 {
419 from_dir.renameTo(to);
420 }
421 catch (const Poco::FileNotFoundException &)
422 {
423 LOG_ERROR(storage.log, "Directory " << from << " (part to remove) doesn't exist or one of nested files has gone."
424 " Most likely this is due to manual removing. This should be discouraged. Ignoring.");
425
426 return;
427 }
428
429 try
430 {
431 /// Remove each expected file in directory, then remove directory itself.
432
433#if !__clang__
434#pragma GCC diagnostic push
435#pragma GCC diagnostic ignored "-Wunused-variable"
436#endif
437 std::shared_lock<std::shared_mutex> lock(columns_lock);
438
439 for (const auto & [file, _] : checksums.files)
440 {
441 String path_to_remove = to + "/" + file;
442 if (0 != unlink(path_to_remove.c_str()))
443 throwFromErrnoWithPath("Cannot unlink file " + path_to_remove, path_to_remove,
444 ErrorCodes::CANNOT_UNLINK);
445 }
446#if !__clang__
447#pragma GCC diagnostic pop
448#endif
449
450 for (const auto & file : {"checksums.txt", "columns.txt"})
451 {
452 String path_to_remove = to + "/" + file;
453 if (0 != unlink(path_to_remove.c_str()))
454 throwFromErrnoWithPath("Cannot unlink file " + path_to_remove, path_to_remove,
455 ErrorCodes::CANNOT_UNLINK);
456 }
457
458 if (0 != rmdir(to.c_str()))
459 throwFromErrnoWithPath("Cannot rmdir file " + to, to, ErrorCodes::CANNOT_UNLINK);
460 }
461 catch (...)
462 {
463 /// Recursive directory removal does many excessive "stat" syscalls under the hood.
464
465 LOG_ERROR(storage.log, "Cannot quickly remove directory " << to << " by removing files; fallback to recursive removal. Reason: "
466 << getCurrentExceptionMessage(false));
467
468 to_dir.remove(true);
469 }
470}
471
472
473void MergeTreeDataPart::renameTo(const String & new_relative_path, bool remove_new_dir_if_exists) const
474{
475 String from = getFullPath();
476 String to = storage.getFullPathOnDisk(disk) + new_relative_path + "/";
477
478 Poco::File from_file(from);
479 if (!from_file.exists())
480 throw Exception("Part directory " + from + " doesn't exist. Most likely it is logical error.", ErrorCodes::FILE_DOESNT_EXIST);
481
482 Poco::File to_file(to);
483 if (to_file.exists())
484 {
485 if (remove_new_dir_if_exists)
486 {
487 Names files;
488 Poco::File(from).list(files);
489
490 LOG_WARNING(storage.log, "Part directory " << to << " already exists"
491 << " and contains " << files.size() << " files. Removing it.");
492
493 to_file.remove(true);
494 }
495 else
496 {
497 throw Exception("Part directory " + to + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
498 }
499 }
500
501 from_file.setLastModified(Poco::Timestamp::fromEpochTime(time(nullptr)));
502 from_file.renameTo(to);
503 relative_path = new_relative_path;
504}
505
506
507String MergeTreeDataPart::getRelativePathForDetachedPart(const String & prefix) const
508{
509 /// Do not allow underscores in the prefix because they are used as separators.
510 assert(prefix.find_first_of('_') == String::npos);
511
512 String res;
513
514 /** If you need to detach a part, and directory into which we want to rename it already exists,
515 * we will rename to the directory with the name to which the suffix is added in the form of "_tryN".
516 * This is done only in the case of `to_detached`, because it is assumed that in this case the exact name does not matter.
517 * No more than 10 attempts are made so that there are not too many junk directories left.
518 */
519 for (int try_no = 0; try_no < 10; try_no++)
520 {
521 res = "detached/" + (prefix.empty() ? "" : prefix + "_")
522 + name + (try_no ? "_try" + DB::toString(try_no) : "");
523
524 if (!Poco::File(storage.getFullPathOnDisk(disk) + res).exists())
525 return res;
526
527 LOG_WARNING(storage.log, "Directory " << res << " (to detach to) already exists."
528 " Will detach to directory with '_tryN' suffix.");
529 }
530
531 return res;
532}
533
534void MergeTreeDataPart::renameToDetached(const String & prefix) const
535{
536 renameTo(getRelativePathForDetachedPart(prefix));
537}
538
539
540UInt64 MergeTreeDataPart::getMarksCount() const
541{
542 return index_granularity.getMarksCount();
543}
544
545void MergeTreeDataPart::makeCloneInDetached(const String & prefix) const
546{
547 Poco::Path src(getFullPath());
548 Poco::Path dst(storage.getFullPathOnDisk(disk) + getRelativePathForDetachedPart(prefix));
549 /// Backup is not recursive (max_level is 0), so do not copy inner directories
550 localBackup(src, dst, 0);
551}
552
553void MergeTreeDataPart::makeCloneOnDiskDetached(const ReservationPtr & reservation) const
554{
555 auto reserved_disk = reservation->getDisk();
556 if (reserved_disk->getName() == disk->getName())
557 throw Exception("Can not clone data part " + name + " to same disk " + disk->getName(), ErrorCodes::LOGICAL_ERROR);
558
559 String path_to_clone = storage.getFullPathOnDisk(reserved_disk) + "detached/";
560
561 if (Poco::File(path_to_clone + relative_path).exists())
562 throw Exception("Path " + path_to_clone + relative_path + " already exists. Can not clone ", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
563 Poco::File(path_to_clone).createDirectory();
564
565 Poco::File cloning_directory(getFullPath());
566 cloning_directory.copyTo(path_to_clone);
567}
568
569void MergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency)
570{
571 /// Memory should not be limited during ATTACH TABLE query.
572 /// This is already true at the server startup but must be also ensured for manual table ATTACH.
573 /// Motivation: memory for index is shared between queries - not belong to the query itself.
574 auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
575
576 loadColumns(require_columns_checksums);
577 loadChecksums(require_columns_checksums);
578 loadIndexGranularity();
579 loadIndex(); /// Must be called after loadIndexGranularity as it uses the value of `index_granularity`
580 loadRowsCount(); /// Must be called after loadIndex() as it uses the value of `index_granularity`.
581 loadPartitionAndMinMaxIndex();
582 loadTTLInfos();
583 if (check_consistency)
584 checkConsistency(require_columns_checksums);
585}
586
587void MergeTreeDataPart::loadIndexGranularity()
588{
589 String full_path = getFullPath();
590 index_granularity_info.changeGranularityIfRequired(full_path);
591
592 if (columns.empty())
593 throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
594
595
596 /// We can use any column, it doesn't matter
597 std::string marks_file_path = index_granularity_info.getMarksFilePath(full_path + getFileNameForColumn(columns.front()));
598 if (!Poco::File(marks_file_path).exists())
599 throw Exception("Marks file '" + marks_file_path + "' doesn't exist", ErrorCodes::NO_FILE_IN_DATA_PART);
600
601 size_t marks_file_size = Poco::File(marks_file_path).getSize();
602
603 /// old version of marks with static index granularity
604 if (!index_granularity_info.is_adaptive)
605 {
606 size_t marks_count = marks_file_size / index_granularity_info.mark_size_in_bytes;
607 index_granularity.resizeWithFixedGranularity(marks_count, index_granularity_info.fixed_index_granularity); /// all the same
608 }
609 else
610 {
611 ReadBufferFromFile buffer(marks_file_path, marks_file_size, -1);
612 while (!buffer.eof())
613 {
614 buffer.seek(sizeof(size_t) * 2, SEEK_CUR); /// skip offset_in_compressed file and offset_in_decompressed_block
615 size_t granularity;
616 readIntBinary(granularity, buffer);
617 index_granularity.appendMark(granularity);
618 }
619 if (index_granularity.getMarksCount() * index_granularity_info.mark_size_in_bytes != marks_file_size)
620 throw Exception("Cannot read all marks from file " + marks_file_path, ErrorCodes::CANNOT_READ_ALL_DATA);
621 }
622 index_granularity.setInitialized();
623}
624
625void MergeTreeDataPart::loadIndex()
626{
627 /// It can be empty in case of mutations
628 if (!index_granularity.isInitialized())
629 throw Exception("Index granularity is not loaded before index loading", ErrorCodes::LOGICAL_ERROR);
630
631 size_t key_size = storage.primary_key_columns.size();
632
633 if (key_size)
634 {
635 MutableColumns loaded_index;
636 loaded_index.resize(key_size);
637
638 for (size_t i = 0; i < key_size; ++i)
639 {
640 loaded_index[i] = storage.primary_key_data_types[i]->createColumn();
641 loaded_index[i]->reserve(index_granularity.getMarksCount());
642 }
643
644 String index_path = getFullPath() + "primary.idx";
645 ReadBufferFromFile index_file = openForReading(index_path);
646
647 for (size_t i = 0; i < index_granularity.getMarksCount(); ++i) //-V756
648 for (size_t j = 0; j < key_size; ++j)
649 storage.primary_key_data_types[j]->deserializeBinary(*loaded_index[j], index_file);
650
651 for (size_t i = 0; i < key_size; ++i)
652 {
653 loaded_index[i]->protect();
654 if (loaded_index[i]->size() != index_granularity.getMarksCount())
655 throw Exception("Cannot read all data from index file " + index_path
656 + "(expected size: " + toString(index_granularity.getMarksCount()) + ", read: " + toString(loaded_index[i]->size()) + ")",
657 ErrorCodes::CANNOT_READ_ALL_DATA);
658 }
659
660 if (!index_file.eof())
661 throw Exception("Index file " + index_path + " is unexpectedly long", ErrorCodes::EXPECTED_END_OF_FILE);
662
663 index.assign(std::make_move_iterator(loaded_index.begin()), std::make_move_iterator(loaded_index.end()));
664 }
665}
666
667void MergeTreeDataPart::loadPartitionAndMinMaxIndex()
668{
669 if (storage.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
670 {
671 DayNum min_date;
672 DayNum max_date;
673 MergeTreePartInfo::parseMinMaxDatesFromPartName(name, min_date, max_date);
674
675 const auto & date_lut = DateLUT::instance();
676 partition = MergeTreePartition(date_lut.toNumYYYYMM(min_date));
677 minmax_idx = MinMaxIndex(min_date, max_date);
678 }
679 else
680 {
681 String path = getFullPath();
682 partition.load(storage, path);
683 if (!isEmpty())
684 minmax_idx.load(storage, path);
685 }
686
687 String calculated_partition_id = partition.getID(storage.partition_key_sample);
688 if (calculated_partition_id != info.partition_id)
689 throw Exception(
690 "While loading part " + getFullPath() + ": calculated partition ID: " + calculated_partition_id
691 + " differs from partition ID in part name: " + info.partition_id,
692 ErrorCodes::CORRUPTED_DATA);
693}
694
695void MergeTreeDataPart::loadChecksums(bool require)
696{
697 String path = getFullPath() + "checksums.txt";
698 Poco::File checksums_file(path);
699 if (checksums_file.exists())
700 {
701 ReadBufferFromFile file = openForReading(path);
702 if (checksums.read(file))
703 {
704 assertEOF(file);
705 bytes_on_disk = checksums.getTotalSizeOnDisk();
706 }
707 else
708 bytes_on_disk = calculateTotalSizeOnDisk(getFullPath());
709 }
710 else
711 {
712 if (require)
713 throw Exception("No checksums.txt in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
714
715 bytes_on_disk = calculateTotalSizeOnDisk(getFullPath());
716 }
717}
718
719void MergeTreeDataPart::loadRowsCount()
720{
721 if (index_granularity.empty())
722 {
723 rows_count = 0;
724 }
725 else if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
726 {
727 String path = getFullPath() + "count.txt";
728 if (!Poco::File(path).exists())
729 throw Exception("No count.txt in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
730
731 ReadBufferFromFile file = openForReading(path);
732 readIntText(rows_count, file);
733 assertEOF(file);
734 }
735 else
736 {
737 for (const NameAndTypePair & column : columns)
738 {
739 ColumnPtr column_col = column.type->createColumn();
740 if (!column_col->isFixedAndContiguous() || column_col->lowCardinality())
741 continue;
742
743 size_t column_size = getColumnSize(column.name, *column.type).data_uncompressed;
744 if (!column_size)
745 continue;
746
747 size_t sizeof_field = column_col->sizeOfValueIfFixed();
748 rows_count = column_size / sizeof_field;
749
750 if (column_size % sizeof_field != 0)
751 {
752 throw Exception(
753 "Uncompressed size of column " + column.name + "(" + toString(column_size)
754 + ") is not divisible by the size of value (" + toString(sizeof_field) + ")",
755 ErrorCodes::LOGICAL_ERROR);
756 }
757
758 size_t last_mark_index_granularity = index_granularity.getLastNonFinalMarkRows();
759 size_t rows_approx = index_granularity.getTotalRows();
760 if (!(rows_count <= rows_approx && rows_approx < rows_count + last_mark_index_granularity))
761 throw Exception(
762 "Unexpected size of column " + column.name + ": " + toString(rows_count) + " rows, expected "
763 + toString(rows_approx) + "+-" + toString(last_mark_index_granularity) + " rows according to the index",
764 ErrorCodes::LOGICAL_ERROR);
765
766 return;
767 }
768
769 throw Exception("Data part doesn't contain fixed size column (even Date column)", ErrorCodes::LOGICAL_ERROR);
770 }
771}
772
773void MergeTreeDataPart::loadTTLInfos()
774{
775 String path = getFullPath() + "ttl.txt";
776 if (Poco::File(path).exists())
777 {
778 ReadBufferFromFile in = openForReading(path);
779 assertString("ttl format version: ", in);
780 size_t format_version;
781 readText(format_version, in);
782 assertChar('\n', in);
783
784 if (format_version == 1)
785 {
786 try
787 {
788 ttl_infos.read(in);
789 }
790 catch (const JSONException &)
791 {
792 throw Exception("Error while parsing file ttl.txt in part: " + name, ErrorCodes::BAD_TTL_FILE);
793 }
794 }
795 else
796 throw Exception("Unknown ttl format version: " + toString(format_version), ErrorCodes::BAD_TTL_FILE);
797 }
798}
799
800void MergeTreeDataPart::accumulateColumnSizes(ColumnToSize & column_to_size) const
801{
802 std::shared_lock<std::shared_mutex> part_lock(columns_lock);
803
804 for (const NameAndTypePair & name_type : storage.getColumns().getAllPhysical())
805 {
806 IDataType::SubstreamPath path;
807 name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
808 {
809 Poco::File bin_file(getFullPath() + IDataType::getFileNameForStream(name_type.name, substream_path) + ".bin");
810 if (bin_file.exists())
811 column_to_size[name_type.name] += bin_file.getSize();
812 }, path);
813 }
814}
815
816void MergeTreeDataPart::loadColumns(bool require)
817{
818 String path = getFullPath() + "columns.txt";
819 Poco::File poco_file_path{path};
820 if (!poco_file_path.exists())
821 {
822 if (require)
823 throw Exception("No columns.txt in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
824
825 /// If there is no file with a list of columns, write it down.
826 for (const NameAndTypePair & column : storage.getColumns().getAllPhysical())
827 if (Poco::File(getFullPath() + getFileNameForColumn(column) + ".bin").exists())
828 columns.push_back(column);
829
830 if (columns.empty())
831 throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
832
833 {
834 WriteBufferFromFile out(path + ".tmp", 4096);
835 columns.writeText(out);
836 }
837 Poco::File(path + ".tmp").renameTo(path);
838
839 return;
840 }
841
842 is_frozen = !poco_file_path.canWrite();
843
844 ReadBufferFromFile file = openForReading(path);
845 columns.readText(file);
846}
847
848void MergeTreeDataPart::checkConsistency(bool require_part_metadata)
849{
850 String path = getFullPath();
851
852 if (!checksums.empty())
853 {
854 if (!storage.primary_key_columns.empty() && !checksums.files.count("primary.idx"))
855 throw Exception("No checksum for primary.idx", ErrorCodes::NO_FILE_IN_DATA_PART);
856
857 if (require_part_metadata)
858 {
859 for (const NameAndTypePair & name_type : columns)
860 {
861 IDataType::SubstreamPath stream_path;
862 name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
863 {
864 String file_name = IDataType::getFileNameForStream(name_type.name, substream_path);
865 String mrk_file_name = file_name + index_granularity_info.marks_file_extension;
866 String bin_file_name = file_name + ".bin";
867 if (!checksums.files.count(mrk_file_name))
868 throw Exception("No " + mrk_file_name + " file checksum for column " + name_type.name + " in part " + path,
869 ErrorCodes::NO_FILE_IN_DATA_PART);
870 if (!checksums.files.count(bin_file_name))
871 throw Exception("No " + bin_file_name + " file checksum for column " + name_type.name + " in part " + path,
872 ErrorCodes::NO_FILE_IN_DATA_PART);
873 }, stream_path);
874 }
875 }
876
877 if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
878 {
879 if (!checksums.files.count("count.txt"))
880 throw Exception("No checksum for count.txt", ErrorCodes::NO_FILE_IN_DATA_PART);
881
882 if (storage.partition_key_expr && !checksums.files.count("partition.dat"))
883 throw Exception("No checksum for partition.dat", ErrorCodes::NO_FILE_IN_DATA_PART);
884
885 if (!isEmpty())
886 {
887 for (const String & col_name : storage.minmax_idx_columns)
888 {
889 if (!checksums.files.count("minmax_" + escapeForFileName(col_name) + ".idx"))
890 throw Exception("No minmax idx file checksum for column " + col_name, ErrorCodes::NO_FILE_IN_DATA_PART);
891 }
892 }
893 }
894
895 checksums.checkSizes(path);
896 }
897 else
898 {
899 auto check_file_not_empty = [&path](const String & file_path)
900 {
901 Poco::File file(file_path);
902 if (!file.exists() || file.getSize() == 0)
903 throw Exception("Part " + path + " is broken: " + file_path + " is empty", ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
904 return file.getSize();
905 };
906
907 /// Check that the primary key index is not empty.
908 if (!storage.primary_key_columns.empty())
909 check_file_not_empty(path + "primary.idx");
910
911 if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
912 {
913 check_file_not_empty(path + "count.txt");
914
915 if (storage.partition_key_expr)
916 check_file_not_empty(path + "partition.dat");
917
918 for (const String & col_name : storage.minmax_idx_columns)
919 check_file_not_empty(path + "minmax_" + escapeForFileName(col_name) + ".idx");
920 }
921
922 /// Check that all marks are nonempty and have the same size.
923
924 std::optional<UInt64> marks_size;
925 for (const NameAndTypePair & name_type : columns)
926 {
927 name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
928 {
929 Poco::File file(IDataType::getFileNameForStream(name_type.name, substream_path) + index_granularity_info.marks_file_extension);
930
931 /// Missing file is Ok for case when new column was added.
932 if (file.exists())
933 {
934 UInt64 file_size = file.getSize();
935
936 if (!file_size)
937 throw Exception("Part " + path + " is broken: " + file.path() + " is empty.",
938 ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
939
940 if (!marks_size)
941 marks_size = file_size;
942 else if (file_size != *marks_size)
943 throw Exception("Part " + path + " is broken: marks have different sizes.",
944 ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
945 }
946 });
947 }
948 }
949}
950
951bool MergeTreeDataPart::hasColumnFiles(const String & column_name, const IDataType & type) const
952{
953 bool res = true;
954
955 type.enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
956 {
957 String file_name = IDataType::getFileNameForStream(column_name, substream_path);
958
959 auto bin_checksum = checksums.files.find(file_name + ".bin");
960 auto mrk_checksum = checksums.files.find(file_name + index_granularity_info.marks_file_extension);
961
962 if (bin_checksum == checksums.files.end() || mrk_checksum == checksums.files.end())
963 res = false;
964 }, {});
965
966 return res;
967}
968
969
970UInt64 MergeTreeDataPart::getIndexSizeInBytes() const
971{
972 UInt64 res = 0;
973 for (const ColumnPtr & column : index)
974 res += column->byteSize();
975 return res;
976}
977
978UInt64 MergeTreeDataPart::getIndexSizeInAllocatedBytes() const
979{
980 UInt64 res = 0;
981 for (const ColumnPtr & column : index)
982 res += column->allocatedBytes();
983 return res;
984}
985
986String MergeTreeDataPart::stateToString(MergeTreeDataPart::State state)
987{
988 switch (state)
989 {
990 case State::Temporary:
991 return "Temporary";
992 case State::PreCommitted:
993 return "PreCommitted";
994 case State::Committed:
995 return "Committed";
996 case State::Outdated:
997 return "Outdated";
998 case State::Deleting:
999 return "Deleting";
1000 case State::DeleteOnDestroy:
1001 return "DeleteOnDestroy";
1002 }
1003
1004 __builtin_unreachable();
1005}
1006
1007String MergeTreeDataPart::stateString() const
1008{
1009 return stateToString(state);
1010}
1011
1012void MergeTreeDataPart::assertState(const std::initializer_list<MergeTreeDataPart::State> & affordable_states) const
1013{
1014 if (!checkState(affordable_states))
1015 {
1016 String states_str;
1017 for (auto affordable_state : affordable_states)
1018 states_str += stateToString(affordable_state) + " ";
1019
1020 throw Exception("Unexpected state of part " + getNameWithState() + ". Expected: " + states_str, ErrorCodes::NOT_FOUND_EXPECTED_DATA_PART);
1021 }
1022}
1023
1024}
1025