#include "MergeTreeDataPart.h"

#include <optional>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/HashingWriteBuffer.h>
#include <Core/Defines.h>
#include <Common/SipHash.h>
#include <Common/escapeForFileName.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/localBackup.h>
#include <Compression/CompressionInfo.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Poco/File.h>
#include <Poco/Path.h>
#include <Poco/DirectoryIterator.h>
#include <common/logger_useful.h>
#include <common/JSON.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int FILE_DOESNT_EXIST;
    extern const int NO_FILE_IN_DATA_PART;
    extern const int EXPECTED_END_OF_FILE;
    extern const int CORRUPTED_DATA;
    extern const int NOT_FOUND_EXPECTED_DATA_PART;
    extern const int BAD_SIZE_OF_FILE_IN_DATA_PART;
    extern const int BAD_TTL_FILE;
    extern const int CANNOT_UNLINK;
    extern const int CANNOT_READ_ALL_DATA;
    extern const int LOGICAL_ERROR;
    extern const int DIRECTORY_ALREADY_EXISTS;
}


static ReadBufferFromFile openForReading(const String & path)
{
    return ReadBufferFromFile(path, std::min(static_cast<Poco::File::FileSize>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(path).getSize()));
}

static String getFileNameForColumn(const NameAndTypePair & column)
{
    String filename;
    column.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
    {
        if (filename.empty())
            filename = IDataType::getFileNameForStream(column.name, substream_path);
    });
    return filename;
}

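/// Reads the stored [min, max] range for every minmax index column from the part's minmax_<column>.idx files.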
void MergeTreeDataPart::MinMaxIndex::load(const MergeTreeData & data, const String & part_path)
{
    size_t minmax_idx_size = data.minmax_idx_column_types.size();
    parallelogram.reserve(minmax_idx_size);
    for (size_t i = 0; i < minmax_idx_size; ++i)
    {
        String file_name = part_path + "minmax_" + escapeForFileName(data.minmax_idx_columns[i]) + ".idx";
        ReadBufferFromFile file = openForReading(file_name);
        const DataTypePtr & type = data.minmax_idx_column_types[i];

        Field min_val;
        type->deserializeBinary(min_val, file);
        Field max_val;
        type->deserializeBinary(max_val, file);

        parallelogram.emplace_back(min_val, true, max_val, true);
    }
    initialized = true;
}

void MergeTreeDataPart::MinMaxIndex::store(const MergeTreeData & data, const String & part_path, Checksums & out_checksums) const
{
    store(data.minmax_idx_columns, data.minmax_idx_column_types, part_path, out_checksums);
}

void MergeTreeDataPart::MinMaxIndex::store(const Names & column_names, const DataTypes & data_types, const String & part_path, Checksums & out_checksums) const
{
    if (!initialized)
        throw Exception("Attempt to store uninitialized MinMax index for part " + part_path + ". This is a bug.",
            ErrorCodes::LOGICAL_ERROR);

    for (size_t i = 0; i < column_names.size(); ++i)
    {
        String file_name = "minmax_" + escapeForFileName(column_names[i]) + ".idx";
        const DataTypePtr & type = data_types.at(i);

        WriteBufferFromFile out(part_path + file_name);
        HashingWriteBuffer out_hashing(out);
        type->serializeBinary(parallelogram[i].left, out_hashing);
        type->serializeBinary(parallelogram[i].right, out_hashing);
        out_hashing.next();
        out_checksums.files[file_name].file_size = out_hashing.count();
        out_checksums.files[file_name].file_hash = out_hashing.getHash();
    }
}

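/// Extends the index with the min/max values of the given columns in `block`.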
void MergeTreeDataPart::MinMaxIndex::update(const Block & block, const Names & column_names)
{
    if (!initialized)
        parallelogram.reserve(column_names.size());

    for (size_t i = 0; i < column_names.size(); ++i)
    {
        Field min_value;
        Field max_value;
        const ColumnWithTypeAndName & column = block.getByName(column_names[i]);
        column.column->getExtremes(min_value, max_value);

        if (!initialized)
            parallelogram.emplace_back(min_value, true, max_value, true);
        else
        {
            parallelogram[i].left = std::min(parallelogram[i].left, min_value);
            parallelogram[i].right = std::max(parallelogram[i].right, max_value);
        }
    }

    initialized = true;
}

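/// Widens this index so that it also covers the ranges of `other`.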
void MergeTreeDataPart::MinMaxIndex::merge(const MinMaxIndex & other)
{
    if (!other.initialized)
        return;

    if (!initialized)
    {
        parallelogram = other.parallelogram;
        initialized = true;
    }
    else
    {
        for (size_t i = 0; i < parallelogram.size(); ++i)
        {
            parallelogram[i].left = std::min(parallelogram[i].left, other.parallelogram[i].left);
            parallelogram[i].right = std::max(parallelogram[i].right, other.parallelogram[i].right);
        }
    }
}


MergeTreeDataPart::MergeTreeDataPart(MergeTreeData & storage_, const DiskPtr & disk_, const String & name_)
    : storage(storage_)
    , disk(disk_)
    , name(name_)
    , info(MergeTreePartInfo::fromPartName(name_, storage.format_version))
    , index_granularity_info(storage)
{
}

MergeTreeDataPart::MergeTreeDataPart(
    const MergeTreeData & storage_,
    const DiskPtr & disk_,
    const String & name_,
    const MergeTreePartInfo & info_)
    : storage(storage_)
    , disk(disk_)
    , name(name_)
    , info(info_)
    , index_granularity_info(storage)
{
}


/// Takes into account the fact that several columns can e.g. share their .size substreams.
/// When calculating totals these should be counted only once.
ColumnSize MergeTreeDataPart::getColumnSizeImpl(
    const String & column_name, const IDataType & type, std::unordered_set<String> * processed_substreams) const
{
    ColumnSize size;
    if (checksums.empty())
        return size;

    type.enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
    {
        String file_name = IDataType::getFileNameForStream(column_name, substream_path);

        if (processed_substreams && !processed_substreams->insert(file_name).second)
            return;

        auto bin_checksum = checksums.files.find(file_name + ".bin");
        if (bin_checksum != checksums.files.end())
        {
            size.data_compressed += bin_checksum->second.file_size;
            size.data_uncompressed += bin_checksum->second.uncompressed_size;
        }

        auto mrk_checksum = checksums.files.find(file_name + index_granularity_info.marks_file_extension);
        if (mrk_checksum != checksums.files.end())
            size.marks += mrk_checksum->second.file_size;
    }, {});

    return size;
}

ColumnSize MergeTreeDataPart::getColumnSize(const String & column_name, const IDataType & type) const
{
    return getColumnSizeImpl(column_name, type, nullptr);
}

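/// Sums the on-disk sizes of all columns of the part, counting shared substreams only once.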
ColumnSize MergeTreeDataPart::getTotalColumnsSize() const
{
    ColumnSize totals;
    std::unordered_set<String> processed_substreams;
    for (const NameAndTypePair & column : columns)
    {
        ColumnSize size = getColumnSizeImpl(column.name, *column.type, &processed_substreams);
        totals.add(size);
    }
    return totals;
}


size_t MergeTreeDataPart::getFileSizeOrZero(const String & file_name) const
{
    auto checksum = checksums.files.find(file_name);
    if (checksum == checksums.files.end())
        return 0;
    return checksum->second.file_size;
}

/** Returns the name of a column with minimum compressed size (as returned by getColumnSize()).
  * If no checksums are present returns the name of the first physically existing column.
  */
String MergeTreeDataPart::getColumnNameWithMinumumCompressedSize() const
{
    const auto & storage_columns = storage.getColumns().getAllPhysical();
    const std::string * minimum_size_column = nullptr;
    UInt64 minimum_size = std::numeric_limits<UInt64>::max();

    for (const auto & column : storage_columns)
    {
        if (!hasColumnFiles(column.name, *column.type))
            continue;

        const auto size = getColumnSize(column.name, *column.type).data_compressed;
        if (size < minimum_size)
        {
            minimum_size = size;
            minimum_size_column = &column.name;
        }
    }

    if (!minimum_size_column)
        throw Exception("Could not find a column of minimum size in MergeTree, part " + getFullPath(), ErrorCodes::LOGICAL_ERROR);

    return *minimum_size_column;
}


String MergeTreeDataPart::getFullPath() const
{
    if (relative_path.empty())
        throw Exception("Part relative_path cannot be empty. This is a bug.", ErrorCodes::LOGICAL_ERROR);

    return storage.getFullPathOnDisk(disk) + relative_path + "/";
}

String MergeTreeDataPart::getNameWithPrefix() const
{
    String res = Poco::Path(relative_path).getFileName();

    if (res.empty())
        throw Exception("relative_path " + relative_path + " of part " + name + " is invalid or not set", ErrorCodes::LOGICAL_ERROR);

    return res;
}

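/// Returns the name this part would have under `new_part_info`, keeping the date-based (V0) naming for old format versions.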
String MergeTreeDataPart::getNewName(const MergeTreePartInfo & new_part_info) const
{
    if (storage.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
    {
        /// NOTE: getting min and max dates from the part name (instead of part data) because we want
        /// the merged part name to be determined only by the source part names.
        /// It is simpler this way when the real min and max dates for the block range can change
        /// (e.g. after an ALTER DELETE command).
        DayNum min_date;
        DayNum max_date;
        MergeTreePartInfo::parseMinMaxDatesFromPartName(name, min_date, max_date);
        return new_part_info.getPartNameV0(min_date, max_date);
    }
    else
        return new_part_info.getPartName();
}

DayNum MergeTreeDataPart::getMinDate() const
{
    if (storage.minmax_idx_date_column_pos != -1 && minmax_idx.initialized)
        return DayNum(minmax_idx.parallelogram[storage.minmax_idx_date_column_pos].left.get<UInt64>());
    else
        return DayNum();
}


DayNum MergeTreeDataPart::getMaxDate() const
{
    if (storage.minmax_idx_date_column_pos != -1 && minmax_idx.initialized)
        return DayNum(minmax_idx.parallelogram[storage.minmax_idx_date_column_pos].right.get<UInt64>());
    else
        return DayNum();
}

time_t MergeTreeDataPart::getMinTime() const
{
    if (storage.minmax_idx_time_column_pos != -1 && minmax_idx.initialized)
        return minmax_idx.parallelogram[storage.minmax_idx_time_column_pos].left.get<UInt64>();
    else
        return 0;
}


time_t MergeTreeDataPart::getMaxTime() const
{
    if (storage.minmax_idx_time_column_pos != -1 && minmax_idx.initialized)
        return minmax_idx.parallelogram[storage.minmax_idx_time_column_pos].right.get<UInt64>();
    else
        return 0;
}

MergeTreeDataPart::~MergeTreeDataPart()
{
    if (state == State::DeleteOnDestroy || is_temp)
    {
        try
        {
            std::string path = getFullPath();

            Poco::File dir(path);
            if (!dir.exists())
                return;

            if (is_temp)
            {
                if (!startsWith(getNameWithPrefix(), "tmp"))
                {
                    LOG_ERROR(storage.log, "~DataPart() should remove part " << path
                        << " but its name doesn't start with tmp. Too suspicious, keeping the part.");
                    return;
                }
            }

            dir.remove(true);

            if (state == State::DeleteOnDestroy)
            {
                LOG_TRACE(storage.log, "Removed part from old location " << path);
            }
        }
        catch (...)
        {
            tryLogCurrentException(__PRETTY_FUNCTION__);
        }
    }
}

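/// Returns the total size in bytes of all files under `from` (recursing into subdirectories).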
UInt64 MergeTreeDataPart::calculateTotalSizeOnDisk(const String & from)
{
    Poco::File cur(from);
    if (cur.isFile())
        return cur.getSize();
    std::vector<std::string> files;
    cur.list(files);
    UInt64 res = 0;
    for (const auto & file : files)
        res += calculateTotalSizeOnDisk(from + file);
    return res;
}

void MergeTreeDataPart::remove() const
{
    if (relative_path.empty())
        throw Exception("Part relative_path cannot be empty. This is a bug.", ErrorCodes::LOGICAL_ERROR);

    /** Atomic directory removal:
      * - rename the directory to a temporary name;
      * - remove it recursively.
      *
      * For the temporary name we use the "delete_tmp_" prefix.
      *
      * NOTE: We cannot use the "tmp_delete_" prefix, because there is a second thread
      * that calls "clearOldTemporaryDirectories" and removes all directories that begin with "tmp_" and are old enough.
      * But when we are removing a data part, it can already be old enough, and rename doesn't change mtime.
      * So a race condition can happen that would lead to a "File not found" error here.
      */

    String full_path = storage.getFullPathOnDisk(disk);
    String from = full_path + relative_path;
    String to = full_path + "delete_tmp_" + name;
    // TODO directory delete_tmp_<name> is never removed if server crashes before returning from this function


    Poco::File from_dir{from};
    Poco::File to_dir{to};

    if (to_dir.exists())
    {
        LOG_WARNING(storage.log, "Directory " << to << " (to which part must be renamed before removing) already exists."
            " Most likely this is due to unclean restart. Removing it.");

        try
        {
            to_dir.remove(true);
        }
        catch (...)
        {
            LOG_ERROR(storage.log, "Cannot remove directory " << to << ". Check owner and access rights.");
            throw;
        }
    }

    try
    {
        from_dir.renameTo(to);
    }
    catch (const Poco::FileNotFoundException &)
    {
        LOG_ERROR(storage.log, "Directory " << from << " (part to remove) doesn't exist or one of nested files has gone."
            " Most likely this is due to manual removing. This should be discouraged. Ignoring.");

        return;
    }

    try
    {
        /// Remove each expected file in directory, then remove directory itself.

#if !__clang__
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-variable"
#endif
        std::shared_lock<std::shared_mutex> lock(columns_lock);

        for (const auto & [file, _] : checksums.files)
        {
            String path_to_remove = to + "/" + file;
            if (0 != unlink(path_to_remove.c_str()))
                throwFromErrnoWithPath("Cannot unlink file " + path_to_remove, path_to_remove,
                    ErrorCodes::CANNOT_UNLINK);
        }
#if !__clang__
#pragma GCC diagnostic pop
#endif

        for (const auto & file : {"checksums.txt", "columns.txt"})
        {
            String path_to_remove = to + "/" + file;
            if (0 != unlink(path_to_remove.c_str()))
                throwFromErrnoWithPath("Cannot unlink file " + path_to_remove, path_to_remove,
                    ErrorCodes::CANNOT_UNLINK);
        }

        if (0 != rmdir(to.c_str()))
            throwFromErrnoWithPath("Cannot rmdir file " + to, to, ErrorCodes::CANNOT_UNLINK);
    }
    catch (...)
    {
        /// Recursive directory removal does many excessive "stat" syscalls under the hood.

        LOG_ERROR(storage.log, "Cannot quickly remove directory " << to << " by removing files; fallback to recursive removal. Reason: "
            << getCurrentExceptionMessage(false));

        to_dir.remove(true);
    }
}


void MergeTreeDataPart::renameTo(const String & new_relative_path, bool remove_new_dir_if_exists) const
{
    String from = getFullPath();
    String to = storage.getFullPathOnDisk(disk) + new_relative_path + "/";

    Poco::File from_file(from);
    if (!from_file.exists())
        throw Exception("Part directory " + from + " doesn't exist. Most likely it is a logical error.", ErrorCodes::FILE_DOESNT_EXIST);

    Poco::File to_file(to);
    if (to_file.exists())
    {
        if (remove_new_dir_if_exists)
        {
            Names files;
            Poco::File(from).list(files);

            LOG_WARNING(storage.log, "Part directory " << to << " already exists"
                << " and contains " << files.size() << " files. Removing it.");

            to_file.remove(true);
        }
        else
        {
            throw Exception("Part directory " + to + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
        }
    }

    from_file.setLastModified(Poco::Timestamp::fromEpochTime(time(nullptr)));
    from_file.renameTo(to);
    relative_path = new_relative_path;
}


String MergeTreeDataPart::getRelativePathForDetachedPart(const String & prefix) const
{
    /// Do not allow underscores in the prefix because they are used as separators.
    assert(prefix.find_first_of('_') == String::npos);

    String res;

    /** If we need to detach a part, and the directory into which we want to rename it already exists,
      * we will rename it to a directory whose name has a "_tryN" suffix appended.
      * This is done only in the case of `to_detached`, because it is assumed that in this case the exact name does not matter.
      * No more than 10 attempts are made so that there are not too many junk directories left.
      */
    for (int try_no = 0; try_no < 10; try_no++)
    {
        res = "detached/" + (prefix.empty() ? "" : prefix + "_")
            + name + (try_no ? "_try" + DB::toString(try_no) : "");

        if (!Poco::File(storage.getFullPathOnDisk(disk) + res).exists())
            return res;

        LOG_WARNING(storage.log, "Directory " << res << " (to detach to) already exists."
            " Will detach to directory with '_tryN' suffix.");
    }

    return res;
}

void MergeTreeDataPart::renameToDetached(const String & prefix) const
{
    renameTo(getRelativePathForDetachedPart(prefix));
}


UInt64 MergeTreeDataPart::getMarksCount() const
{
    return index_granularity.getMarksCount();
}

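/// Creates a backup copy of the part in the detached/ directory of the same disk (via localBackup).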
void MergeTreeDataPart::makeCloneInDetached(const String & prefix) const
{
    Poco::Path src(getFullPath());
    Poco::Path dst(storage.getFullPathOnDisk(disk) + getRelativePathForDetachedPart(prefix));
    /// Backup is not recursive (max_level is 0), so do not copy inner directories
    localBackup(src, dst, 0);
}

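/// Copies the part into the detached/ directory of the disk taken from the reservation (must be a different disk).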
void MergeTreeDataPart::makeCloneOnDiskDetached(const ReservationPtr & reservation) const
{
    auto reserved_disk = reservation->getDisk();
    if (reserved_disk->getName() == disk->getName())
        throw Exception("Can not clone data part " + name + " to same disk " + disk->getName(), ErrorCodes::LOGICAL_ERROR);

    String path_to_clone = storage.getFullPathOnDisk(reserved_disk) + "detached/";

    if (Poco::File(path_to_clone + relative_path).exists())
        throw Exception("Path " + path_to_clone + relative_path + " already exists. Can not clone ", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
    Poco::File(path_to_clone).createDirectory();

    Poco::File cloning_directory(getFullPath());
    cloning_directory.copyTo(path_to_clone);
}

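/// Loads all metadata of the part from disk in dependency order and optionally checks its consistency.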
void MergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency)
{
    /// Memory should not be limited during ATTACH TABLE query.
    /// This is already true at the server startup but must be also ensured for manual table ATTACH.
    /// Motivation: memory for the index is shared between queries - it does not belong to the query itself.
    auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();

    loadColumns(require_columns_checksums);
    loadChecksums(require_columns_checksums);
    loadIndexGranularity();
    loadIndex(); /// Must be called after loadIndexGranularity as it uses the value of `index_granularity`
    loadRowsCount(); /// Must be called after loadIndexGranularity() as it uses the value of `index_granularity`.
    loadPartitionAndMinMaxIndex();
    loadTTLInfos();
    if (check_consistency)
        checkConsistency(require_columns_checksums);
}

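/// Detects the marks format of the part and fills `index_granularity`: a fixed number of rows per mark
/// for the old format, or per-mark row counts read from the marks file for the adaptive format.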
void MergeTreeDataPart::loadIndexGranularity()
{
    String full_path = getFullPath();
    index_granularity_info.changeGranularityIfRequired(full_path);

    if (columns.empty())
        throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);


    /// We can use any column, it doesn't matter
    std::string marks_file_path = index_granularity_info.getMarksFilePath(full_path + getFileNameForColumn(columns.front()));
    if (!Poco::File(marks_file_path).exists())
        throw Exception("Marks file '" + marks_file_path + "' doesn't exist", ErrorCodes::NO_FILE_IN_DATA_PART);

    size_t marks_file_size = Poco::File(marks_file_path).getSize();

    /// old version of marks with static index granularity
    if (!index_granularity_info.is_adaptive)
    {
        size_t marks_count = marks_file_size / index_granularity_info.mark_size_in_bytes;
        index_granularity.resizeWithFixedGranularity(marks_count, index_granularity_info.fixed_index_granularity); /// all the same
    }
    else
    {
        ReadBufferFromFile buffer(marks_file_path, marks_file_size, -1);
        while (!buffer.eof())
        {
            buffer.seek(sizeof(size_t) * 2, SEEK_CUR); /// skip offset_in_compressed file and offset_in_decompressed_block
            size_t granularity;
            readIntBinary(granularity, buffer);
            index_granularity.appendMark(granularity);
        }
        if (index_granularity.getMarksCount() * index_granularity_info.mark_size_in_bytes != marks_file_size)
            throw Exception("Cannot read all marks from file " + marks_file_path, ErrorCodes::CANNOT_READ_ALL_DATA);
    }
    index_granularity.setInitialized();
}

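/// Reads primary.idx into memory: one column per primary key column, one value per mark.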
void MergeTreeDataPart::loadIndex()
{
    /// It can be empty in case of mutations
    if (!index_granularity.isInitialized())
        throw Exception("Index granularity is not loaded before index loading", ErrorCodes::LOGICAL_ERROR);

    size_t key_size = storage.primary_key_columns.size();

    if (key_size)
    {
        MutableColumns loaded_index;
        loaded_index.resize(key_size);

        for (size_t i = 0; i < key_size; ++i)
        {
            loaded_index[i] = storage.primary_key_data_types[i]->createColumn();
            loaded_index[i]->reserve(index_granularity.getMarksCount());
        }

        String index_path = getFullPath() + "primary.idx";
        ReadBufferFromFile index_file = openForReading(index_path);

        for (size_t i = 0; i < index_granularity.getMarksCount(); ++i) //-V756
            for (size_t j = 0; j < key_size; ++j)
                storage.primary_key_data_types[j]->deserializeBinary(*loaded_index[j], index_file);

        for (size_t i = 0; i < key_size; ++i)
        {
            loaded_index[i]->protect();
            if (loaded_index[i]->size() != index_granularity.getMarksCount())
                throw Exception("Cannot read all data from index file " + index_path
                    + " (expected size: " + toString(index_granularity.getMarksCount()) + ", read: " + toString(loaded_index[i]->size()) + ")",
                    ErrorCodes::CANNOT_READ_ALL_DATA);
        }

        if (!index_file.eof())
            throw Exception("Index file " + index_path + " is unexpectedly long", ErrorCodes::EXPECTED_END_OF_FILE);

        index.assign(std::make_move_iterator(loaded_index.begin()), std::make_move_iterator(loaded_index.end()));
    }
}

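/// Restores the partition value and the minmax index, either from the part name (old format versions)
/// or from the files in the part directory, and validates the partition ID against the part name.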
void MergeTreeDataPart::loadPartitionAndMinMaxIndex()
{
    if (storage.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
    {
        DayNum min_date;
        DayNum max_date;
        MergeTreePartInfo::parseMinMaxDatesFromPartName(name, min_date, max_date);

        const auto & date_lut = DateLUT::instance();
        partition = MergeTreePartition(date_lut.toNumYYYYMM(min_date));
        minmax_idx = MinMaxIndex(min_date, max_date);
    }
    else
    {
        String path = getFullPath();
        partition.load(storage, path);
        if (!isEmpty())
            minmax_idx.load(storage, path);
    }

    String calculated_partition_id = partition.getID(storage.partition_key_sample);
    if (calculated_partition_id != info.partition_id)
        throw Exception(
            "While loading part " + getFullPath() + ": calculated partition ID: " + calculated_partition_id
            + " differs from partition ID in part name: " + info.partition_id,
            ErrorCodes::CORRUPTED_DATA);
}

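/// Reads checksums.txt if present; bytes_on_disk is taken from the checksums or recalculated from the directory.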
void MergeTreeDataPart::loadChecksums(bool require)
{
    String path = getFullPath() + "checksums.txt";
    Poco::File checksums_file(path);
    if (checksums_file.exists())
    {
        ReadBufferFromFile file = openForReading(path);
        if (checksums.read(file))
        {
            assertEOF(file);
            bytes_on_disk = checksums.getTotalSizeOnDisk();
        }
        else
            bytes_on_disk = calculateTotalSizeOnDisk(getFullPath());
    }
    else
    {
        if (require)
            throw Exception("No checksums.txt in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);

        bytes_on_disk = calculateTotalSizeOnDisk(getFullPath());
    }
}

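/// Determines the number of rows in the part: from count.txt for new format versions,
/// otherwise derived from the uncompressed size of a fixed-size column and cross-checked against the index.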
void MergeTreeDataPart::loadRowsCount()
{
    if (index_granularity.empty())
    {
        rows_count = 0;
    }
    else if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
    {
        String path = getFullPath() + "count.txt";
        if (!Poco::File(path).exists())
            throw Exception("No count.txt in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);

        ReadBufferFromFile file = openForReading(path);
        readIntText(rows_count, file);
        assertEOF(file);
    }
    else
    {
        for (const NameAndTypePair & column : columns)
        {
            ColumnPtr column_col = column.type->createColumn();
            if (!column_col->isFixedAndContiguous() || column_col->lowCardinality())
                continue;

            size_t column_size = getColumnSize(column.name, *column.type).data_uncompressed;
            if (!column_size)
                continue;

            size_t sizeof_field = column_col->sizeOfValueIfFixed();
            rows_count = column_size / sizeof_field;

            if (column_size % sizeof_field != 0)
            {
                throw Exception(
                    "Uncompressed size of column " + column.name + " (" + toString(column_size)
                    + ") is not divisible by the size of value (" + toString(sizeof_field) + ")",
                    ErrorCodes::LOGICAL_ERROR);
            }

            size_t last_mark_index_granularity = index_granularity.getLastNonFinalMarkRows();
            size_t rows_approx = index_granularity.getTotalRows();
            if (!(rows_count <= rows_approx && rows_approx < rows_count + last_mark_index_granularity))
                throw Exception(
                    "Unexpected size of column " + column.name + ": " + toString(rows_count) + " rows, expected "
                    + toString(rows_approx) + "+-" + toString(last_mark_index_granularity) + " rows according to the index",
                    ErrorCodes::LOGICAL_ERROR);

            return;
        }

        throw Exception("Data part doesn't contain fixed size column (even Date column)", ErrorCodes::LOGICAL_ERROR);
    }
}

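/// Reads per-column and table TTL information from ttl.txt if the file exists.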
void MergeTreeDataPart::loadTTLInfos()
{
    String path = getFullPath() + "ttl.txt";
    if (Poco::File(path).exists())
    {
        ReadBufferFromFile in = openForReading(path);
        assertString("ttl format version: ", in);
        size_t format_version;
        readText(format_version, in);
        assertChar('\n', in);

        if (format_version == 1)
        {
            try
            {
                ttl_infos.read(in);
            }
            catch (const JSONException &)
            {
                throw Exception("Error while parsing file ttl.txt in part: " + name, ErrorCodes::BAD_TTL_FILE);
            }
        }
        else
            throw Exception("Unknown ttl format version: " + toString(format_version), ErrorCodes::BAD_TTL_FILE);
    }
}

void MergeTreeDataPart::accumulateColumnSizes(ColumnToSize & column_to_size) const
{
    std::shared_lock<std::shared_mutex> part_lock(columns_lock);

    for (const NameAndTypePair & name_type : storage.getColumns().getAllPhysical())
    {
        IDataType::SubstreamPath path;
        name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
        {
            Poco::File bin_file(getFullPath() + IDataType::getFileNameForStream(name_type.name, substream_path) + ".bin");
            if (bin_file.exists())
                column_to_size[name_type.name] += bin_file.getSize();
        }, path);
    }
}

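/// Reads the list of columns from columns.txt; if the file is missing, reconstructs it from existing
/// .bin files and writes columns.txt for the part.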
void MergeTreeDataPart::loadColumns(bool require)
{
    String path = getFullPath() + "columns.txt";
    Poco::File poco_file_path{path};
    if (!poco_file_path.exists())
    {
        if (require)
            throw Exception("No columns.txt in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);

        /// If there is no file with a list of columns, write it down.
        for (const NameAndTypePair & column : storage.getColumns().getAllPhysical())
            if (Poco::File(getFullPath() + getFileNameForColumn(column) + ".bin").exists())
                columns.push_back(column);

        if (columns.empty())
            throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);

        {
            WriteBufferFromFile out(path + ".tmp", 4096);
            columns.writeText(out);
        }
        Poco::File(path + ".tmp").renameTo(path);

        return;
    }

    is_frozen = !poco_file_path.canWrite();

    ReadBufferFromFile file = openForReading(path);
    columns.readText(file);
}

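/// Verifies that all files the part format requires are present and have consistent sizes,
/// either against the checksums (if any) or directly on the filesystem.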
void MergeTreeDataPart::checkConsistency(bool require_part_metadata)
{
    String path = getFullPath();

    if (!checksums.empty())
    {
        if (!storage.primary_key_columns.empty() && !checksums.files.count("primary.idx"))
            throw Exception("No checksum for primary.idx", ErrorCodes::NO_FILE_IN_DATA_PART);

        if (require_part_metadata)
        {
            for (const NameAndTypePair & name_type : columns)
            {
                IDataType::SubstreamPath stream_path;
                name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
                {
                    String file_name = IDataType::getFileNameForStream(name_type.name, substream_path);
                    String mrk_file_name = file_name + index_granularity_info.marks_file_extension;
                    String bin_file_name = file_name + ".bin";
                    if (!checksums.files.count(mrk_file_name))
                        throw Exception("No " + mrk_file_name + " file checksum for column " + name_type.name + " in part " + path,
                            ErrorCodes::NO_FILE_IN_DATA_PART);
                    if (!checksums.files.count(bin_file_name))
                        throw Exception("No " + bin_file_name + " file checksum for column " + name_type.name + " in part " + path,
                            ErrorCodes::NO_FILE_IN_DATA_PART);
                }, stream_path);
            }
        }

        if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
        {
            if (!checksums.files.count("count.txt"))
                throw Exception("No checksum for count.txt", ErrorCodes::NO_FILE_IN_DATA_PART);

            if (storage.partition_key_expr && !checksums.files.count("partition.dat"))
                throw Exception("No checksum for partition.dat", ErrorCodes::NO_FILE_IN_DATA_PART);

            if (!isEmpty())
            {
                for (const String & col_name : storage.minmax_idx_columns)
                {
                    if (!checksums.files.count("minmax_" + escapeForFileName(col_name) + ".idx"))
                        throw Exception("No minmax idx file checksum for column " + col_name, ErrorCodes::NO_FILE_IN_DATA_PART);
                }
            }
        }

        checksums.checkSizes(path);
    }
    else
    {
        auto check_file_not_empty = [&path](const String & file_path)
        {
            Poco::File file(file_path);
            if (!file.exists() || file.getSize() == 0)
                throw Exception("Part " + path + " is broken: " + file_path + " is empty", ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
            return file.getSize();
        };

        /// Check that the primary key index is not empty.
        if (!storage.primary_key_columns.empty())
            check_file_not_empty(path + "primary.idx");

        if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
        {
            check_file_not_empty(path + "count.txt");

            if (storage.partition_key_expr)
                check_file_not_empty(path + "partition.dat");

            for (const String & col_name : storage.minmax_idx_columns)
                check_file_not_empty(path + "minmax_" + escapeForFileName(col_name) + ".idx");
        }

        /// Check that all marks are nonempty and have the same size.

        std::optional<UInt64> marks_size;
        for (const NameAndTypePair & name_type : columns)
        {
            name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
            {
                Poco::File file(path + IDataType::getFileNameForStream(name_type.name, substream_path) + index_granularity_info.marks_file_extension);

                /// Missing file is Ok for case when new column was added.
                if (file.exists())
                {
                    UInt64 file_size = file.getSize();

                    if (!file_size)
                        throw Exception("Part " + path + " is broken: " + file.path() + " is empty.",
                            ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);

                    if (!marks_size)
                        marks_size = file_size;
                    else if (file_size != *marks_size)
                        throw Exception("Part " + path + " is broken: marks have different sizes.",
                            ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
                }
            });
        }
    }
}

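/// Returns true only if the checksums contain both the data (.bin) and marks files for every stream of the column.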
bool MergeTreeDataPart::hasColumnFiles(const String & column_name, const IDataType & type) const
{
    bool res = true;

    type.enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
    {
        String file_name = IDataType::getFileNameForStream(column_name, substream_path);

        auto bin_checksum = checksums.files.find(file_name + ".bin");
        auto mrk_checksum = checksums.files.find(file_name + index_granularity_info.marks_file_extension);

        if (bin_checksum == checksums.files.end() || mrk_checksum == checksums.files.end())
            res = false;
    }, {});

    return res;
}


UInt64 MergeTreeDataPart::getIndexSizeInBytes() const
{
    UInt64 res = 0;
    for (const ColumnPtr & column : index)
        res += column->byteSize();
    return res;
}

UInt64 MergeTreeDataPart::getIndexSizeInAllocatedBytes() const
{
    UInt64 res = 0;
    for (const ColumnPtr & column : index)
        res += column->allocatedBytes();
    return res;
}

String MergeTreeDataPart::stateToString(MergeTreeDataPart::State state)
{
    switch (state)
    {
        case State::Temporary:
            return "Temporary";
        case State::PreCommitted:
            return "PreCommitted";
        case State::Committed:
            return "Committed";
        case State::Outdated:
            return "Outdated";
        case State::Deleting:
            return "Deleting";
        case State::DeleteOnDestroy:
            return "DeleteOnDestroy";
    }

    __builtin_unreachable();
}

String MergeTreeDataPart::stateString() const
{
    return stateToString(state);
}

void MergeTreeDataPart::assertState(const std::initializer_list<MergeTreeDataPart::State> & affordable_states) const
{
    if (!checkState(affordable_states))
    {
        String states_str;
        for (auto affordable_state : affordable_states)
            states_str += stateToString(affordable_state) + " ";

        throw Exception("Unexpected state of part " + getNameWithState() + ". Expected: " + states_str, ErrorCodes::NOT_FOUND_EXPECTED_DATA_PART);
    }
}

}