1#include "MergeTreeDataPartChecksum.h"
2#include <Common/SipHash.h>
3#include <Common/hex.h>
4#include <IO/ReadHelpers.h>
5#include <IO/WriteHelpers.h>
6#include <IO/ReadBufferFromString.h>
7#include <IO/WriteBufferFromString.h>
8#include <Compression/CompressedReadBuffer.h>
9#include <Compression/CompressedWriteBuffer.h>
10#include <Poco/File.h>
11
12
13namespace DB
14{
15
16
/// Error codes thrown from this file. They are only declared here (extern);
/// the definitions are not part of this translation unit.
namespace ErrorCodes
{
    extern const int CHECKSUM_DOESNT_MATCH;          /// A hash (or the per-kind file counts / aggregate hashes) differs.
    extern const int BAD_SIZE_OF_FILE_IN_DATA_PART;  /// A recorded size differs from the compared or on-disk size.
    extern const int FORMAT_VERSION_TOO_OLD;         /// Serialized checksums use a version that cannot be parsed.
    extern const int FILE_DOESNT_EXIST;              /// A file listed in the checksums is missing on disk.
    extern const int UNEXPECTED_FILE_IN_DATA_PART;   /// The other checksum set has a file that this one lacks.
    extern const int UNKNOWN_FORMAT;                 /// The format version in the header is not recognized.
    extern const int NO_FILE_IN_DATA_PART;           /// The other checksum set lacks a file that this one has.
}
27
28
29void MergeTreeDataPartChecksum::checkEqual(const MergeTreeDataPartChecksum & rhs, bool have_uncompressed, const String & name) const
30{
31 if (is_compressed && have_uncompressed)
32 {
33 if (!rhs.is_compressed)
34 throw Exception("No uncompressed checksum for file " + name, ErrorCodes::CHECKSUM_DOESNT_MATCH);
35 if (rhs.uncompressed_size != uncompressed_size)
36 throw Exception("Unexpected uncompressed size of file " + name + " in data part", ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
37 if (rhs.uncompressed_hash != uncompressed_hash)
38 throw Exception("Checksum mismatch for uncompressed file " + name + " in data part", ErrorCodes::CHECKSUM_DOESNT_MATCH);
39 return;
40 }
41 if (rhs.file_size != file_size)
42 throw Exception("Unexpected size of file " + name + " in data part", ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
43 if (rhs.file_hash != file_hash)
44 throw Exception("Checksum mismatch for file " + name + " in data part", ErrorCodes::CHECKSUM_DOESNT_MATCH);
45}
46
47void MergeTreeDataPartChecksum::checkSize(const String & path) const
48{
49 Poco::File file(path);
50 if (!file.exists())
51 throw Exception(path + " doesn't exist", ErrorCodes::FILE_DOESNT_EXIST);
52 UInt64 size = file.getSize();
53 if (size != file_size)
54 throw Exception(path + " has unexpected size: " + toString(size) + " instead of " + toString(file_size),
55 ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
56}
57
58
59void MergeTreeDataPartChecksums::checkEqual(const MergeTreeDataPartChecksums & rhs, bool have_uncompressed) const
60{
61 for (const auto & it : rhs.files)
62 {
63 const String & name = it.first;
64
65 if (!files.count(name))
66 throw Exception("Unexpected file " + name + " in data part", ErrorCodes::UNEXPECTED_FILE_IN_DATA_PART);
67 }
68
69 for (const auto & it : files)
70 {
71 const String & name = it.first;
72
73 auto jt = rhs.files.find(name);
74 if (jt == rhs.files.end())
75 throw Exception("No file " + name + " in data part", ErrorCodes::NO_FILE_IN_DATA_PART);
76
77 it.second.checkEqual(jt->second, have_uncompressed, name);
78 }
79}
80
81void MergeTreeDataPartChecksums::checkSizes(const String & path) const
82{
83 for (const auto & it : files)
84 {
85 const String & name = it.first;
86 it.second.checkSize(path + name);
87 }
88}
89
90UInt64 MergeTreeDataPartChecksums::getTotalSizeOnDisk() const
91{
92 UInt64 res = 0;
93 for (const auto & it : files)
94 res += it.second.file_size;
95 return res;
96}
97
98bool MergeTreeDataPartChecksums::read(ReadBuffer & in, size_t format_version)
99{
100 switch (format_version)
101 {
102 case 1:
103 return false;
104 case 2:
105 return read_v2(in);
106 case 3:
107 return read_v3(in);
108 case 4:
109 return read_v4(in);
110 default:
111 throw Exception("Bad checksums format version: " + DB::toString(format_version), ErrorCodes::UNKNOWN_FORMAT);
112 }
113}
114
115bool MergeTreeDataPartChecksums::read(ReadBuffer & in)
116{
117 files.clear();
118
119 assertString("checksums format version: ", in);
120 size_t format_version;
121 readText(format_version, in);
122 assertChar('\n', in);
123
124 read(in, format_version);
125 return true;
126}
127
128bool MergeTreeDataPartChecksums::read_v2(ReadBuffer & in)
129{
130 size_t count;
131
132 readText(count, in);
133 assertString(" files:\n", in);
134
135 for (size_t i = 0; i < count; ++i)
136 {
137 String name;
138 Checksum sum;
139
140 readString(name, in);
141 assertString("\n\tsize: ", in);
142 readText(sum.file_size, in);
143 assertString("\n\thash: ", in);
144 readText(sum.file_hash.first, in);
145 assertString(" ", in);
146 readText(sum.file_hash.second, in);
147 assertString("\n\tcompressed: ", in);
148 readText(sum.is_compressed, in);
149 if (sum.is_compressed)
150 {
151 assertString("\n\tuncompressed size: ", in);
152 readText(sum.uncompressed_size, in);
153 assertString("\n\tuncompressed hash: ", in);
154 readText(sum.uncompressed_hash.first, in);
155 assertString(" ", in);
156 readText(sum.uncompressed_hash.second, in);
157 }
158 assertChar('\n', in);
159
160 files.insert(std::make_pair(name, sum));
161 }
162
163 return true;
164}
165
166bool MergeTreeDataPartChecksums::read_v3(ReadBuffer & in)
167{
168 size_t count;
169
170 readVarUInt(count, in);
171
172 for (size_t i = 0; i < count; ++i)
173 {
174 String name;
175 Checksum sum;
176
177 readBinary(name, in);
178 readVarUInt(sum.file_size, in);
179 readPODBinary(sum.file_hash, in);
180 readBinary(sum.is_compressed, in);
181
182 if (sum.is_compressed)
183 {
184 readVarUInt(sum.uncompressed_size, in);
185 readPODBinary(sum.uncompressed_hash, in);
186 }
187
188 files.emplace(std::move(name), sum);
189 }
190
191 return true;
192}
193
194bool MergeTreeDataPartChecksums::read_v4(ReadBuffer & from)
195{
196 CompressedReadBuffer in{from};
197 return read_v3(in);
198}
199
200void MergeTreeDataPartChecksums::write(WriteBuffer & to) const
201{
202 writeString("checksums format version: 4\n", to);
203
204 CompressedWriteBuffer out{to, CompressionCodecFactory::instance().getDefaultCodec(), 1 << 16};
205
206 writeVarUInt(files.size(), out);
207
208 for (const auto & it : files)
209 {
210 const String & name = it.first;
211 const Checksum & sum = it.second;
212
213 writeBinary(name, out);
214 writeVarUInt(sum.file_size, out);
215 writePODBinary(sum.file_hash, out);
216 writeBinary(sum.is_compressed, out);
217
218 if (sum.is_compressed)
219 {
220 writeVarUInt(sum.uncompressed_size, out);
221 writePODBinary(sum.uncompressed_hash, out);
222 }
223 }
224}
225
226void MergeTreeDataPartChecksums::addFile(const String & file_name, UInt64 file_size, MergeTreeDataPartChecksum::uint128 file_hash)
227{
228 files[file_name] = Checksum(file_size, file_hash);
229}
230
231void MergeTreeDataPartChecksums::add(MergeTreeDataPartChecksums && rhs_checksums)
232{
233 for (auto & checksum : rhs_checksums.files)
234 files[std::move(checksum.first)] = std::move(checksum.second);
235
236 rhs_checksums.files.clear();
237}
238
239/// Checksum computed from the set of control sums of .bin files.
240void MergeTreeDataPartChecksums::computeTotalChecksumDataOnly(SipHash & hash) const
241{
242 /// We use fact that iteration is in deterministic (lexicographical) order.
243 for (const auto & it : files)
244 {
245 const String & name = it.first;
246 const Checksum & sum = it.second;
247
248 if (!endsWith(name, ".bin"))
249 continue;
250
251 UInt64 len = name.size();
252 hash.update(len);
253 hash.update(name.data(), len);
254 hash.update(sum.uncompressed_size);
255 hash.update(sum.uncompressed_hash);
256 }
257}
258
259String MergeTreeDataPartChecksums::getSerializedString() const
260{
261 WriteBufferFromOwnString out;
262 write(out);
263 return out.str();
264}
265
266MergeTreeDataPartChecksums MergeTreeDataPartChecksums::deserializeFrom(const String & s)
267{
268 ReadBufferFromString in(s);
269 MergeTreeDataPartChecksums res;
270 if (!res.read(in))
271 throw Exception("Checksums format is too old", ErrorCodes::FORMAT_VERSION_TOO_OLD);
272 assertEOF(in);
273 return res;
274}
275
276bool MergeTreeDataPartChecksums::isBadChecksumsErrorCode(int code)
277{
278 return code == ErrorCodes::CHECKSUM_DOESNT_MATCH
279 || code == ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART
280 || code == ErrorCodes::NO_FILE_IN_DATA_PART
281 || code == ErrorCodes::UNEXPECTED_FILE_IN_DATA_PART;
282}
283
284/// Puts into hash "stream" length of the string and its bytes
285static void updateHash(SipHash & hash, const std::string & data)
286{
287 UInt64 len = data.size();
288 hash.update(len);
289 hash.update(data.data(), len);
290}
291
292/// Hash is the same as MinimalisticDataPartChecksums::hash_of_all_files
293String MergeTreeDataPartChecksums::getTotalChecksumHex() const
294{
295 SipHash hash_of_all_files;
296
297 for (const auto & elem : files)
298 {
299 const String & name = elem.first;
300 const auto & checksum = elem.second;
301
302 updateHash(hash_of_all_files, name);
303 hash_of_all_files.update(checksum.file_hash);
304 }
305
306 UInt64 lo, hi;
307 hash_of_all_files.get128(lo, hi);
308
309 return getHexUIntUppercase(hi) + getHexUIntUppercase(lo);
310}
311
312void MinimalisticDataPartChecksums::serialize(WriteBuffer & to) const
313{
314 writeString("checksums format version: 5\n", to);
315 serializeWithoutHeader(to);
316}
317
318void MinimalisticDataPartChecksums::serializeWithoutHeader(WriteBuffer & to) const
319{
320 writeVarUInt(num_compressed_files, to);
321 writeVarUInt(num_uncompressed_files, to);
322
323 writePODBinary(hash_of_all_files, to);
324 writePODBinary(hash_of_uncompressed_files, to);
325 writePODBinary(uncompressed_hash_of_compressed_files, to);
326}
327
328String MinimalisticDataPartChecksums::getSerializedString()
329{
330 WriteBufferFromOwnString wb;
331 serialize(wb);
332 return wb.str();
333}
334
335bool MinimalisticDataPartChecksums::deserialize(ReadBuffer & in)
336{
337 assertString("checksums format version: ", in);
338 size_t format_version;
339 readText(format_version, in);
340 assertChar('\n', in);
341
342 if (format_version < MINIMAL_VERSION_WITH_MINIMALISTIC_CHECKSUMS)
343 {
344 MergeTreeDataPartChecksums new_full_checksums;
345 if (!new_full_checksums.read(in, format_version))
346 return false;
347
348 computeTotalChecksums(new_full_checksums);
349 full_checksums = std::move(new_full_checksums);
350 return true;
351 }
352
353 if (format_version > MINIMAL_VERSION_WITH_MINIMALISTIC_CHECKSUMS)
354 throw Exception("Unknown checksums format version: " + DB::toString(format_version), ErrorCodes::UNKNOWN_FORMAT);
355
356 deserializeWithoutHeader(in);
357
358 return true;
359}
360
361void MinimalisticDataPartChecksums::deserializeWithoutHeader(ReadBuffer & in)
362{
363 readVarUInt(num_compressed_files, in);
364 readVarUInt(num_uncompressed_files, in);
365
366 readPODBinary(hash_of_all_files, in);
367 readPODBinary(hash_of_uncompressed_files, in);
368 readPODBinary(uncompressed_hash_of_compressed_files, in);
369}
370
371void MinimalisticDataPartChecksums::computeTotalChecksums(const MergeTreeDataPartChecksums & full_checksums_)
372{
373 num_compressed_files = 0;
374 num_uncompressed_files = 0;
375
376 SipHash hash_of_all_files_;
377 SipHash hash_of_uncompressed_files_;
378 SipHash uncompressed_hash_of_compressed_files_;
379
380 for (const auto & elem : full_checksums_.files)
381 {
382 const String & name = elem.first;
383 const auto & checksum = elem.second;
384
385 updateHash(hash_of_all_files_, name);
386 hash_of_all_files_.update(checksum.file_hash);
387
388 if (!checksum.is_compressed)
389 {
390 ++num_uncompressed_files;
391 updateHash(hash_of_uncompressed_files_, name);
392 hash_of_uncompressed_files_.update(checksum.file_hash);
393 }
394 else
395 {
396 ++num_compressed_files;
397 updateHash(uncompressed_hash_of_compressed_files_, name);
398 uncompressed_hash_of_compressed_files_.update(checksum.uncompressed_hash);
399 }
400 }
401
402 auto get_hash = [] (SipHash & hash, uint128 & data)
403 {
404 hash.get128(data.first, data.second);
405 };
406
407 get_hash(hash_of_all_files_, hash_of_all_files);
408 get_hash(hash_of_uncompressed_files_, hash_of_uncompressed_files);
409 get_hash(uncompressed_hash_of_compressed_files_, uncompressed_hash_of_compressed_files);
410}
411
412String MinimalisticDataPartChecksums::getSerializedString(const MergeTreeDataPartChecksums & full_checksums, bool minimalistic)
413{
414 if (!minimalistic)
415 return full_checksums.getSerializedString();
416
417 MinimalisticDataPartChecksums checksums;
418 checksums.computeTotalChecksums(full_checksums);
419 return checksums.getSerializedString();
420}
421
422void MinimalisticDataPartChecksums::checkEqual(const MinimalisticDataPartChecksums & rhs, bool check_uncompressed_hash_in_compressed_files) const
423{
424 if (full_checksums && rhs.full_checksums)
425 full_checksums->checkEqual(*rhs.full_checksums, check_uncompressed_hash_in_compressed_files);
426
427 // If full checksums were checked, check total checksums just in case
428 checkEqualImpl(rhs, check_uncompressed_hash_in_compressed_files);
429}
430
431void MinimalisticDataPartChecksums::checkEqual(const MergeTreeDataPartChecksums & rhs, bool check_uncompressed_hash_in_compressed_files) const
432{
433 if (full_checksums)
434 full_checksums->checkEqual(rhs, check_uncompressed_hash_in_compressed_files);
435
436 // If full checksums were checked, check total checksums just in case
437 MinimalisticDataPartChecksums rhs_minimalistic;
438 rhs_minimalistic.computeTotalChecksums(rhs);
439 checkEqualImpl(rhs_minimalistic, check_uncompressed_hash_in_compressed_files);
440}
441
442void MinimalisticDataPartChecksums::checkEqualImpl(const MinimalisticDataPartChecksums & rhs, bool check_uncompressed_hash_in_compressed_files) const
443{
444 if (num_compressed_files != rhs.num_compressed_files || num_uncompressed_files != rhs.num_uncompressed_files)
445 {
446 std::stringstream error_msg;
447 error_msg << "Different number of files: " << rhs.num_compressed_files << " compressed (expected " << num_compressed_files << ")"
448 << " and " << rhs.num_uncompressed_files << " uncompressed ones (expected " << num_uncompressed_files << ")";
449
450 throw Exception(error_msg.str(), ErrorCodes::CHECKSUM_DOESNT_MATCH);
451 }
452
453 Strings errors;
454
455 if (hash_of_uncompressed_files != rhs.hash_of_uncompressed_files)
456 errors.emplace_back("hash of uncompressed files doesn't match");
457
458 if (check_uncompressed_hash_in_compressed_files)
459 {
460 if (uncompressed_hash_of_compressed_files != rhs.uncompressed_hash_of_compressed_files)
461 errors.emplace_back("uncompressed hash of compressed files doesn't match");
462 }
463 else
464 {
465 if (hash_of_all_files != rhs.hash_of_all_files)
466 errors.emplace_back("total hash of all files doesn't match");
467 }
468
469 if (!errors.empty())
470 {
471 String error_msg = "Checksums of parts don't match: " + errors.front();
472 for (size_t i = 1; i < errors.size(); ++i)
473 error_msg += ", " + errors[i];
474
475 throw Exception(error_msg, ErrorCodes::CHECKSUM_DOESNT_MATCH);
476 }
477}
478
479MinimalisticDataPartChecksums MinimalisticDataPartChecksums::deserializeFrom(const String & s)
480{
481 MinimalisticDataPartChecksums res;
482 ReadBufferFromString rb(s);
483 res.deserialize(rb);
484 return res;
485}
486
487}
488