#include <algorithm>
#include <optional>

#include <Poco/File.h>
#include <Poco/DirectoryIterator.h>

#include <Storages/MergeTree/MergeTreeIndexGranularity.h>
#include <Storages/MergeTree/checkDataPart.h>
#include <DataStreams/MarkInCompressedFile.h>
#include <Compression/CompressedReadBuffer.h>
#include <IO/HashingReadBuffer.h>
#include <Common/CurrentMetrics.h>


namespace CurrentMetrics
{
    extern const Metric ReplicatedChecks;
}

namespace DB
{

namespace ErrorCodes
{
    extern const int CORRUPTED_DATA;
    extern const int LOGICAL_ERROR;
    extern const int INCORRECT_MARK;
    extern const int EMPTY_LIST_OF_COLUMNS_PASSED;
    extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
}


namespace
{

/** To read and checksum a single stream (a pair of .bin and .mrk files) for a single column or secondary index.
  */
class Stream
{
public:
    String base_name;
    String bin_file_extension;
    String mrk_file_extension;
    String bin_file_path;
    String mrk_file_path;
private:
    const MergeTreeIndexGranularity & index_granularity;
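    /// The .bin buffers are chained: file_buf reads raw bytes from disk, compressed_hashing_buf hashes the
    /// compressed stream, uncompressing_buf decompresses it, and uncompressed_hashing_buf hashes the
    /// decompressed data. Reading from the outermost buffer therefore produces both checksums as a side effect.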
    ReadBufferFromFile file_buf;
    HashingReadBuffer compressed_hashing_buf;
    CompressedReadBuffer uncompressing_buf;
    size_t mark_position = 0;
public:
    HashingReadBuffer uncompressed_hashing_buf;

private:
    ReadBufferFromFile mrk_file_buf;

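    /// Reads one mark from the .mrk stream: an offset in the compressed file and an offset in the
    /// decompressed block, plus the number of rows in the granule for the adaptive (.mrk2) format.
    /// For the old .mrk format the row count is taken from index_granularity instead.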
    std::pair<MarkInCompressedFile, size_t> readMarkFromFile()
    {
        size_t mrk_rows;
        MarkInCompressedFile mrk_mark;
        readIntBinary(mrk_mark.offset_in_compressed_file, mrk_hashing_buf);
        readIntBinary(mrk_mark.offset_in_decompressed_block, mrk_hashing_buf);
        if (mrk_file_extension == ".mrk2")
            readIntBinary(mrk_rows, mrk_hashing_buf);
        else
            mrk_rows = index_granularity.getMarkRows(mark_position);

        return {mrk_mark, mrk_rows};
    }
public:
    HashingReadBuffer mrk_hashing_buf;

    Stream(
        const String & path,
        const String & base_name_,
        const String & bin_file_extension_,
        const String & mrk_file_extension_,
        const MergeTreeIndexGranularity & index_granularity_)
        : base_name(base_name_)
        , bin_file_extension(bin_file_extension_)
        , mrk_file_extension(mrk_file_extension_)
        , bin_file_path(path + base_name + bin_file_extension)
        , mrk_file_path(path + base_name + mrk_file_extension)
        , index_granularity(index_granularity_)
        , file_buf(bin_file_path)
        , compressed_hashing_buf(file_buf)
        , uncompressing_buf(compressed_hashing_buf)
        , uncompressed_hashing_buf(uncompressing_buf)
        , mrk_file_buf(mrk_file_path)
        , mrk_hashing_buf(mrk_file_buf)
    {}

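    /// Reads the next mark from the .mrk stream and checks that it points to the current position in the
    /// .bin stream. With only_read = true the mark is consumed (and hashed) but not validated; this is used
    /// for streams that are not read monotonically, such as LowCardinality dictionaries.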
    void assertMark(bool only_read = false)
    {
        auto [mrk_mark, mrk_rows] = readMarkFromFile();
        bool has_alternative_mark = false;
        MarkInCompressedFile alternative_data_mark = {};
        MarkInCompressedFile data_mark = {};

        /// If the mark is expected to be exactly at the border of blocks, a mark pointing to the end of the previous block
        /// and the beginning of the next one is also acceptable.
        if (!uncompressed_hashing_buf.hasPendingData())
        {
            /// Get a mark pointing to the end of the previous block.
            has_alternative_mark = true;
            alternative_data_mark.offset_in_compressed_file = compressed_hashing_buf.count() - uncompressing_buf.getSizeCompressed();
            alternative_data_mark.offset_in_decompressed_block = uncompressed_hashing_buf.offset();

            if (mrk_mark == alternative_data_mark)
            {
                mark_position++;
                return;
            }

            uncompressed_hashing_buf.next();

            /// At the end of the file, `compressed_hashing_buf.count()` points to the end of the file even before calling `next()`,
            /// so the check above does not work correctly. For simplicity, we do not check the last mark.
            if (uncompressed_hashing_buf.eof())
            {
                mark_position++;
                return;
            }
        }

        data_mark.offset_in_compressed_file = compressed_hashing_buf.count() - uncompressing_buf.getSizeCompressed();
        data_mark.offset_in_decompressed_block = uncompressed_hashing_buf.offset();

        if (!only_read && (mrk_mark != data_mark || mrk_rows != index_granularity.getMarkRows(mark_position)))
            throw Exception("Incorrect mark: " + data_mark.toStringWithRows(index_granularity.getMarkRows(mark_position)) +
                (has_alternative_mark ? " or " + alternative_data_mark.toString() : "") + " in data, " +
                mrk_mark.toStringWithRows(mrk_rows) + " in " + mrk_file_path + " file", ErrorCodes::INCORRECT_MARK);

        mark_position++;
    }

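    /// Checks that the .bin stream has been read to the end and that the .mrk stream contains nothing
    /// except, possibly, a final mark with zero rows.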
    void assertEnd()
    {
        if (!uncompressed_hashing_buf.eof())
            throw Exception("EOF expected in " + bin_file_path + " file"
                + " at position "
                + toString(compressed_hashing_buf.count()) + " (compressed), "
                + toString(uncompressed_hashing_buf.count()) + " (uncompressed)", ErrorCodes::CORRUPTED_DATA);

        /// Maybe we have a final mark.
        if (index_granularity.hasFinalMark())
        {
            auto final_mark_rows = readMarkFromFile().second;
            if (final_mark_rows != 0)
                throw Exception("Incorrect final mark at the end of " + mrk_file_path + ": expected 0 rows, got " + toString(final_mark_rows), ErrorCodes::CORRUPTED_DATA);
        }

        if (!mrk_hashing_buf.eof())
            throw Exception("EOF expected in " + mrk_file_path + " file"
                + " at position "
                + toString(mrk_hashing_buf.count()), ErrorCodes::CORRUPTED_DATA);
    }

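    /// Stores the accumulated checksums: compressed and uncompressed size/hash for the .bin file,
    /// and size/hash for the .mrk file.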
    void saveChecksums(MergeTreeData::DataPart::Checksums & checksums)
    {
        checksums.files[base_name + bin_file_extension] = MergeTreeData::DataPart::Checksums::Checksum(
            compressed_hashing_buf.count(), compressed_hashing_buf.getHash(),
            uncompressed_hashing_buf.count(), uncompressed_hashing_buf.getHash());

        checksums.files[base_name + mrk_file_extension] = MergeTreeData::DataPart::Checksums::Checksum(
            mrk_hashing_buf.count(), mrk_hashing_buf.getHash());
    }
};

}


MergeTreeData::DataPart::Checksums checkDataPart(
    const String & full_path,
    const MergeTreeIndexGranularity & adaptive_index_granularity,
    const String & mrk_file_extension,
    bool require_checksums,
    const DataTypes & primary_key_data_types,
    const MergeTreeIndices & indices,
    std::function<bool()> is_cancelled)
{
    Logger * log = &Logger::get("checkDataPart");

    /** Responsibility:
      * - read the list of columns from columns.txt;
      * - read checksums if they exist;
      * - read primary.idx (and validate its checksum); obtain the number of marks;
      * - read the data files and marks for each stream of each column; calculate and validate checksums;
      * - check that all columns have the same number of rows;
      * - check that all mark files have the same size.
      */

    CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedChecks};

    String path = full_path;
    if (!path.empty() && path.back() != '/')
        path += "/";

    NamesAndTypesList columns;

    {
        ReadBufferFromFile buf(path + "columns.txt");
        columns.readText(buf);
        assertEOF(buf);
    }

    /// Checksums from the file checksums.txt. It may be absent. If present, it is subsequently compared with the actual data checksums.
    MergeTreeData::DataPart::Checksums checksums_txt;

    if (require_checksums || Poco::File(path + "checksums.txt").exists())
    {
        ReadBufferFromFile buf(path + "checksums.txt");
        checksums_txt.read(buf);
        assertEOF(buf);
    }

    /// Real checksums based on the contents of the data. They must correspond to checksums.txt; if they don't, the data is broken.
    MergeTreeData::DataPart::Checksums checksums_data;

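    /// Check the primary index: primary.idx stores, for every mark, one serialized value of each primary key
    /// column. Count the marks and hash the file while reading it.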
    size_t marks_in_primary_key = 0;
    if (!primary_key_data_types.empty())
    {
        ReadBufferFromFile file_buf(path + "primary.idx");
        HashingReadBuffer hashing_buf(file_buf);

        size_t key_size = primary_key_data_types.size();
        MutableColumns tmp_columns(key_size);

        for (size_t j = 0; j < key_size; ++j)
            tmp_columns[j] = primary_key_data_types[j]->createColumn();

        while (!hashing_buf.eof())
        {
            if (is_cancelled())
                return {};

            ++marks_in_primary_key;
            for (size_t j = 0; j < key_size; ++j)
                primary_key_data_types[j]->deserializeBinary(*tmp_columns[j].get(), hashing_buf);
        }

        size_t primary_idx_size = hashing_buf.count();

        checksums_data.files["primary.idx"] = MergeTreeData::DataPart::Checksums::Checksum(primary_idx_size, hashing_buf.getHash());
    }


    /// Optional files: count.txt, partition.dat, minmax_*.idx, ttl.txt. Just calculate checksums for the files that exist.
    Poco::DirectoryIterator dir_end;
    for (Poco::DirectoryIterator dir_it(path); dir_it != dir_end; ++dir_it)
    {
        const String & file_name = dir_it.name();
        if (file_name == "count.txt"
            || file_name == "partition.dat"
            || (startsWith(file_name, "minmax_") && endsWith(file_name, ".idx"))
            || file_name == "ttl.txt")
        {
            ReadBufferFromFile file_buf(dir_it->path());
            HashingReadBuffer hashing_buf(file_buf);
            hashing_buf.tryIgnore(std::numeric_limits<size_t>::max());
            checksums_data.files[file_name] = MergeTreeData::DataPart::Checksums::Checksum(hashing_buf.count(), hashing_buf.getHash());
        }
    }

    if (is_cancelled())
        return {};

    /// If the count.txt file exists, use it as the source of truth for the number of rows. Otherwise just check that all columns have an equal number of rows.
    std::optional<size_t> rows;

    if (Poco::File(path + "count.txt").exists())
    {
        ReadBufferFromFile buf(path + "count.txt");
        size_t count = 0;
        readText(count, buf);
        assertEOF(buf);
        rows = count;
    }

    /// Read and check skip indices.
    for (const auto & index : indices)
    {
        Stream stream(path, index->getFileName(), ".idx", mrk_file_extension, adaptive_index_granularity);
        size_t mark_num = 0;

        while (!stream.uncompressed_hashing_buf.eof())
        {
            if (stream.mrk_hashing_buf.eof())
                throw Exception("Unexpected end of mrk file while reading index " + index->name,
                    ErrorCodes::CORRUPTED_DATA);
            try
            {
                stream.assertMark();
            }
            catch (Exception & e)
            {
                e.addMessage("Cannot read mark " + toString(mark_num)
                    + " in file " + stream.mrk_file_path
                    + ", mrk file offset: " + toString(stream.mrk_hashing_buf.count()));
                throw;
            }
            try
            {
                index->createIndexGranule()->deserializeBinary(stream.uncompressed_hashing_buf);
            }
            catch (Exception & e)
            {
                e.addMessage("Cannot read granule " + toString(mark_num)
                    + " in file " + stream.bin_file_path
                    + ", mrk file offset: " + toString(stream.mrk_hashing_buf.count()));
                throw;
            }
            ++mark_num;
            if (is_cancelled())
                return {};
        }

        stream.assertEnd();
        stream.saveChecksums(checksums_data);
    }

    /// Read all columns, calculate checksums and validate marks.
    for (const NameAndTypePair & name_type : columns)
    {
        LOG_DEBUG(log, "Checking column " + name_type.name + " in " + path);

        std::map<String, Stream> streams;
        size_t column_size = 0;
        size_t mark_num = 0;

        IDataType::DeserializeBinaryBulkStatePtr state;
        IDataType::DeserializeBinaryBulkSettings settings;
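        /// A column may be stored as several substreams (e.g. array sizes and elements for Nested/Array columns),
        /// each with its own .bin/.mrk pair. Streams are created lazily by file name, and deserialization reads
        /// through the uncompressed hashing buffer, so checksums are accumulated while the data is being read.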
        settings.getter = [&](const IDataType::SubstreamPath & substream_path)
        {
            String file_name = IDataType::getFileNameForStream(name_type.name, substream_path);
            auto & stream = streams.try_emplace(file_name, path, file_name, ".bin", mrk_file_extension, adaptive_index_granularity).first->second;
            return &stream.uncompressed_hashing_buf;
        };

        /// Prefixes have to be read before the data because the first mark points past the prefix.
        name_type.type->deserializeBinaryBulkStatePrefix(settings, state);

        while (true)
        {
            /// Check that the mark points to the current position in the file.
            bool marks_eof = false;
            name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
            {
                String file_name = IDataType::getFileNameForStream(name_type.name, substream_path);

                auto & stream = streams.try_emplace(file_name, path, file_name, ".bin", mrk_file_extension, adaptive_index_granularity).first->second;
                try
                {
                    /// The LowCardinality dictionary column is not read monotonically, so its marks may be inconsistent with
                    /// the offset position in the file. In that case we just read the data and marks files but do not check that the marks match.
                    bool only_read = !substream_path.empty() && substream_path.back().type == IDataType::Substream::DictionaryKeys;
                    if (!stream.mrk_hashing_buf.eof())
                        stream.assertMark(only_read);
                    else
                        marks_eof = true;
                }
                catch (Exception & e)
                {
                    e.addMessage("Cannot read mark " + toString(mark_num) + " at row " + toString(column_size)
                        + " in file " + stream.mrk_file_path
                        + ", mrk file offset: " + toString(stream.mrk_hashing_buf.count()));
                    throw;
                }
            }, settings.path);

            size_t rows_after_mark = adaptive_index_granularity.getMarkRows(mark_num);
            ++mark_num;

            /// Read index_granularity rows from the column.
            /// NOTE Shared array sizes of Nested columns are read more than once. That's Ok.

            MutableColumnPtr tmp_column = name_type.type->createColumn();
            name_type.type->deserializeBinaryBulkWithMultipleStreams(*tmp_column, rows_after_mark, settings, state);

            size_t read_size = tmp_column->size();
            column_size += read_size;

            /// We have already checked all marks except the final one (it will be checked in assertEnd()).
            if (mark_num == adaptive_index_granularity.getMarksCountWithoutFinal())
                break;
            else if (marks_eof)
                throw Exception("Unexpected end of mrk file while reading column " + name_type.name, ErrorCodes::CORRUPTED_DATA);

            if (is_cancelled())
                return {};
        }

        /// Check that the number of rows is the same in every column.
        if (!rows)
            rows = column_size;
        else if (*rows != column_size)
            throw Exception{"Unexpected number of rows in column "
                + name_type.name + " (" + toString(column_size) + ", expected: " + toString(*rows) + ")",
                ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH};

        /// Save checksums for the column.
        name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
        {
            String file_name = IDataType::getFileNameForStream(name_type.name, substream_path);
            auto stream_it = streams.find(file_name);
            if (stream_it == streams.end())
                throw Exception("Logical error: cannot find stream " + file_name, ErrorCodes::LOGICAL_ERROR);

            stream_it->second.assertEnd();
            stream_it->second.saveChecksums(checksums_data);
        }, {});

        if (is_cancelled())
            return {};
    }


    if (!rows)
        throw Exception("No columns in data part", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);

    if (!primary_key_data_types.empty())
    {
        size_t expected_marks = adaptive_index_granularity.getMarksCount();
        if (expected_marks != marks_in_primary_key)
        {
            throw Exception("Size of primary key doesn't match expected number of marks."
                " Number of rows in columns: " + toString(*rows)
                + ", expected number of marks: " + toString(expected_marks)
                + ", size of primary key: " + toString(marks_in_primary_key),
                ErrorCodes::CORRUPTED_DATA);
        }
    }

    if (require_checksums || !checksums_txt.files.empty())
        checksums_txt.checkEqual(checksums_data, true);

    return checksums_data;
}

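/// Convenience overload: takes the path, index granularity and mark file extension from the data part itself.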
MergeTreeData::DataPart::Checksums checkDataPart(
    MergeTreeData::DataPartPtr data_part,
    bool require_checksums,
    const DataTypes & primary_key_data_types,
    const MergeTreeIndices & indices,
    std::function<bool()> is_cancelled)
{
    return checkDataPart(
        data_part->getFullPath(),
        data_part->index_granularity,
        data_part->index_granularity_info.marks_file_extension,
        require_checksums,
        primary_key_data_types,
        indices,
        is_cancelled);
}


}