1#include "CompressedReadBufferBase.h"
2
3#include <vector>
4#include <string.h>
5#include <city.h>
6#include <Common/PODArray.h>
7#include <Common/ProfileEvents.h>
8#include <Common/Exception.h>
9#include <Common/hex.h>
10#include <common/unaligned.h>
11#include <Compression/ICompressionCodec.h>
12#include <Compression/CompressionFactory.h>
13#include <IO/ReadBuffer.h>
14#include <IO/BufferWithOwnMemory.h>
15#include <Compression/CompressionInfo.h>
16#include <IO/WriteHelpers.h>
17
18
/// Profile event counters that this file increments. They are only declared here (extern).
namespace ProfileEvents
{
    /// Incremented in readCompressedData() by the size of each compressed block plus the size of its checksum.
    extern const Event ReadCompressedBytes;
    /// Incremented on every call of decompress().
    extern const Event CompressedReadBufferBlocks;
    /// Incremented in decompress() by the decompressed size of the block.
    extern const Event CompressedReadBufferBytes;
}
25
26namespace DB
27{
28
/// Error codes attached to the exceptions thrown from this file. They are only declared here (extern).
namespace ErrorCodes
{
    /// NOTE(review): no use of this code is visible in this file - confirm whether the declaration is still needed.
    extern const int UNKNOWN_COMPRESSION_METHOD;
    /// The compressed size from the block header exceeds DBMS_MAX_COMPRESSED_SIZE.
    extern const int TOO_LARGE_SIZE_COMPRESSED;
    /// The checksum stored in front of the block differs from the one calculated over the block.
    extern const int CHECKSUM_DOESNT_MATCH;
    /// The method byte of the block differs from the method byte of the codec already in use.
    extern const int CANNOT_DECOMPRESS;
    /// The compressed size from the block header is less than the header size.
    extern const int CORRUPTED_DATA;
}
37
/// Type of the checksum that is read from the stream in front of every compressed block
/// and compared with the result of CityHash128 over the block.
using Checksum = CityHash_v1_0_2::uint128;
39
40
41/// Validate checksum of data, and if it mismatches, find out possible reason and throw exception.
42static void validateChecksum(char * data, size_t size, const Checksum expected_checksum)
43{
44 auto calculated_checksum = CityHash_v1_0_2::CityHash128(data, size);
45 if (expected_checksum == calculated_checksum)
46 return;
47
48 std::stringstream message;
49
50 /// TODO mess up of endianess in error message.
51 message << "Checksum doesn't match: corrupted data."
52 " Reference: " + getHexUIntLowercase(expected_checksum.first) + getHexUIntLowercase(expected_checksum.second)
53 + ". Actual: " + getHexUIntLowercase(calculated_checksum.first) + getHexUIntLowercase(calculated_checksum.second)
54 + ". Size of compressed block: " + toString(size);
55
56 auto message_hardware_failure = "This is most likely due to hardware failure. If you receive broken data over network and the error does not repeat every time, this can be caused by bad RAM on network interface controller or bad controller itself or bad RAM on network switches or bad CPU on network switches (look at the logs on related network switches; note that TCP checksums don't help) or bad RAM on host (look at dmesg or kern.log for enormous amount of EDAC errors, ECC-related reports, Machine Check Exceptions, mcelog; note that ECC memory can fail if the number of errors is huge) or bad CPU on host. If you read data from disk, this can be caused by disk bit rott. This exception protects ClickHouse from data corruption due to hardware failures.";
57
58 auto flip_bit = [](char * buf, size_t pos)
59 {
60 buf[pos / 8] ^= 1 << pos % 8;
61 };
62
63 /// Check if the difference caused by single bit flip in data.
64 for (size_t bit_pos = 0; bit_pos < size * 8; ++bit_pos)
65 {
66 flip_bit(data, bit_pos);
67
68 auto checksum_of_data_with_flipped_bit = CityHash_v1_0_2::CityHash128(data, size);
69 if (expected_checksum == checksum_of_data_with_flipped_bit)
70 {
71 message << ". The mismatch is caused by single bit flip in data block at byte " << (bit_pos / 8) << ", bit " << (bit_pos % 8) << ". "
72 << message_hardware_failure;
73 throw Exception(message.str(), ErrorCodes::CHECKSUM_DOESNT_MATCH);
74 }
75
76 flip_bit(data, bit_pos); /// Restore
77 }
78
79 /// Check if the difference caused by single bit flip in stored checksum.
80 size_t difference = __builtin_popcountll(expected_checksum.first ^ calculated_checksum.first)
81 + __builtin_popcountll(expected_checksum.second ^ calculated_checksum.second);
82
83 if (difference == 1)
84 {
85 message << ". The mismatch is caused by single bit flip in checksum. "
86 << message_hardware_failure;
87 throw Exception(message.str(), ErrorCodes::CHECKSUM_DOESNT_MATCH);
88 }
89
90 throw Exception(message.str(), ErrorCodes::CHECKSUM_DOESNT_MATCH);
91}
92
93
94/// Read compressed data into compressed_buffer. Get size of decompressed data from block header. Checksum if need.
95/// Returns number of compressed bytes read.
96size_t CompressedReadBufferBase::readCompressedData(size_t & size_decompressed, size_t & size_compressed_without_checksum)
97{
98 if (compressed_in->eof())
99 return 0;
100
101 Checksum checksum;
102 compressed_in->readStrict(reinterpret_cast<char *>(&checksum), sizeof(Checksum));
103
104 UInt8 header_size = ICompressionCodec::getHeaderSize();
105 own_compressed_buffer.resize(header_size);
106 compressed_in->readStrict(own_compressed_buffer.data(), header_size);
107
108 UInt8 method = ICompressionCodec::readMethod(own_compressed_buffer.data());
109
110 if (!codec)
111 codec = CompressionCodecFactory::instance().get(method);
112 else if (method != codec->getMethodByte())
113 throw Exception("Data compressed with different methods, given method byte "
114 + getHexUIntLowercase(method)
115 + ", previous method byte "
116 + getHexUIntLowercase(codec->getMethodByte()),
117 ErrorCodes::CANNOT_DECOMPRESS);
118
119 size_compressed_without_checksum = ICompressionCodec::readCompressedBlockSize(own_compressed_buffer.data());
120 size_decompressed = ICompressionCodec::readDecompressedBlockSize(own_compressed_buffer.data());
121
122 if (size_compressed_without_checksum > DBMS_MAX_COMPRESSED_SIZE)
123 throw Exception("Too large size_compressed_without_checksum: "
124 + toString(size_compressed_without_checksum)
125 + ". Most likely corrupted data.",
126 ErrorCodes::TOO_LARGE_SIZE_COMPRESSED);
127
128 if (size_compressed_without_checksum < header_size)
129 throw Exception("Can't decompress data: the compressed data size (" + toString(size_compressed_without_checksum)
130 + ", this should include header size) is less than the header size (" + toString(header_size) + ")", ErrorCodes::CORRUPTED_DATA);
131
132 ProfileEvents::increment(ProfileEvents::ReadCompressedBytes, size_compressed_without_checksum + sizeof(Checksum));
133
134 /// Is whole compressed block located in 'compressed_in->' buffer?
135 if (compressed_in->offset() >= header_size &&
136 compressed_in->position() + size_compressed_without_checksum + codec->getAdditionalSizeAtTheEndOfBuffer() - header_size <= compressed_in->buffer().end())
137 {
138 compressed_in->position() -= header_size;
139 compressed_buffer = compressed_in->position();
140 compressed_in->position() += size_compressed_without_checksum;
141 }
142 else
143 {
144 own_compressed_buffer.resize(size_compressed_without_checksum + codec->getAdditionalSizeAtTheEndOfBuffer());
145 compressed_buffer = own_compressed_buffer.data();
146 compressed_in->readStrict(compressed_buffer + header_size, size_compressed_without_checksum - header_size);
147 }
148
149 if (!disable_checksum)
150 validateChecksum(compressed_buffer, size_compressed_without_checksum, checksum);
151
152 return size_compressed_without_checksum + sizeof(Checksum);
153}
154
155
156void CompressedReadBufferBase::decompress(char * to, size_t size_decompressed, size_t size_compressed_without_checksum)
157{
158 ProfileEvents::increment(ProfileEvents::CompressedReadBufferBlocks);
159 ProfileEvents::increment(ProfileEvents::CompressedReadBufferBytes, size_decompressed);
160
161 UInt8 method = ICompressionCodec::readMethod(compressed_buffer);
162
163 if (!codec)
164 codec = CompressionCodecFactory::instance().get(method);
165 else if (codec->getMethodByte() != method)
166 throw Exception("Data compressed with different methods, given method byte "
167 + getHexUIntLowercase(method)
168 + ", previous method byte "
169 + getHexUIntLowercase(codec->getMethodByte()),
170 ErrorCodes::CANNOT_DECOMPRESS);
171
172 codec->decompress(compressed_buffer, size_compressed_without_checksum, to);
173}
174
175
/// 'compressed_in' could be initialized lazily, but before first call of 'readCompressedData'.
/// Stores 'in' as the source of compressed data and constructs 'own_compressed_buffer' with the argument 0;
/// the buffer is resized in 'readCompressedData' when it is needed.
CompressedReadBufferBase::CompressedReadBufferBase(ReadBuffer * in)
    : compressed_in(in), own_compressed_buffer(0)
{
}
181
182
/// Defaulted here, in the source file, rather than in the header.
CompressedReadBufferBase::~CompressedReadBufferBase() = default; /// Proper destruction of unique_ptr of forward-declared type.
184
185
186}
187
188