1#include <Compression/CompressionCodecGorilla.h>
2#include <Compression/CompressionInfo.h>
3#include <Compression/CompressionFactory.h>
4#include <common/unaligned.h>
5#include <Parsers/IAST_fwd.h>
6#include <IO/WriteHelpers.h>
7#include <IO/ReadBufferFromMemory.h>
8#include <IO/BitHelpers.h>
9
10#include <string.h>
11#include <algorithm>
12#include <cstdlib>
13#include <type_traits>
14
15#include <bitset>
16
17namespace DB
18{
19
20namespace ErrorCodes
21{
22extern const int CANNOT_COMPRESS;
23extern const int CANNOT_DECOMPRESS;
24extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
25extern const int ILLEGAL_CODEC_PARAMETER;
26}
27
28namespace
29{
30
31constexpr inline UInt8 getBitLengthOfLength(UInt8 data_bytes_size)
32{
33 // 1-byte value is 8 bits, and we need 4 bits to represent 8 : 1000,
34 // 2-byte 16 bits => 5
35 // 4-byte 32 bits => 6
36 // 8-byte 64 bits => 7
37 const UInt8 bit_lengths[] = {0, 4, 5, 0, 6, 0, 0, 0, 7};
38 assert(data_bytes_size >= 1 && data_bytes_size < sizeof(bit_lengths) && bit_lengths[data_bytes_size] != 0);
39
40 return bit_lengths[data_bytes_size];
41}
42
43
44UInt32 getCompressedHeaderSize(UInt8 data_bytes_size)
45{
46 const UInt8 items_count_size = 4;
47
48 return items_count_size + data_bytes_size;
49}
50
51UInt32 getCompressedDataSize(UInt8 data_bytes_size, UInt32 uncompressed_size)
52{
53 const UInt32 items_count = uncompressed_size / data_bytes_size;
54 static const auto DATA_BIT_LENGTH = getBitLengthOfLength(data_bytes_size);
55 // -1 since there must be at least 1 non-zero bit.
56 static const auto LEADING_ZEROES_BIT_LENGTH = DATA_BIT_LENGTH - 1;
57
58 // worst case (for 32-bit value):
59 // 11 + 5 bits of leading zeroes bit-size + 5 bits of data bit-size + non-zero data bits.
60 const UInt32 max_item_size_bits = 2 + LEADING_ZEROES_BIT_LENGTH + DATA_BIT_LENGTH + data_bytes_size * 8;
61
62 // + 8 is to round up to next byte.
63 return (items_count * max_item_size_bits + 8) / 8;
64}
65
66struct binary_value_info
67{
68 UInt8 leading_zero_bits;
69 UInt8 data_bits;
70 UInt8 trailing_zero_bits;
71};
72
73template <typename T>
74binary_value_info getLeadingAndTrailingBits(const T & value)
75{
76 constexpr UInt8 bit_size = sizeof(T) * 8;
77
78 const UInt8 lz = getLeadingZeroBits(value);
79 const UInt8 tz = getTrailingZeroBits(value);
80 const UInt8 data_size = value == 0 ? 0 : static_cast<UInt8>(bit_size - lz - tz);
81
82 return binary_value_info{lz, data_size, tz};
83}
84
85template <typename T>
86UInt32 compressDataForType(const char * source, UInt32 source_size, char * dest, UInt32 dest_size)
87{
88 static const auto DATA_BIT_LENGTH = getBitLengthOfLength(sizeof(T));
89 // -1 since there must be at least 1 non-zero bit.
90 static const auto LEADING_ZEROES_BIT_LENGTH = DATA_BIT_LENGTH - 1;
91
92 if (source_size % sizeof(T) != 0)
93 throw Exception("Cannot compress, data size " + toString(source_size) + " is not aligned to " + toString(sizeof(T)), ErrorCodes::CANNOT_COMPRESS);
94 const char * source_end = source + source_size;
95 const char * dest_end = dest + dest_size;
96
97 const UInt32 items_count = source_size / sizeof(T);
98
99 unalignedStore<UInt32>(dest, items_count);
100 dest += sizeof(items_count);
101
102 T prev_value{};
103 // That would cause first XORed value to be written in-full.
104 binary_value_info prev_xored_info{0, 0, 0};
105
106 if (source < source_end)
107 {
108 prev_value = unalignedLoad<T>(source);
109 unalignedStore<T>(dest, prev_value);
110
111 source += sizeof(prev_value);
112 dest += sizeof(prev_value);
113 }
114
115 WriteBuffer buffer(dest, dest_end - dest);
116 BitWriter writer(buffer);
117
118 while (source < source_end)
119 {
120 const T curr_value = unalignedLoad<T>(source);
121 source += sizeof(curr_value);
122
123 const auto xored_data = curr_value ^ prev_value;
124 const binary_value_info curr_xored_info = getLeadingAndTrailingBits(xored_data);
125
126 if (xored_data == 0)
127 {
128 writer.writeBits(1, 0);
129 }
130 else if (prev_xored_info.data_bits != 0
131 && prev_xored_info.leading_zero_bits <= curr_xored_info.leading_zero_bits
132 && prev_xored_info.trailing_zero_bits <= curr_xored_info.trailing_zero_bits)
133 {
134 writer.writeBits(2, 0b10);
135 writer.writeBits(prev_xored_info.data_bits, xored_data >> prev_xored_info.trailing_zero_bits);
136 }
137 else
138 {
139 writer.writeBits(2, 0b11);
140 writer.writeBits(LEADING_ZEROES_BIT_LENGTH, curr_xored_info.leading_zero_bits);
141 writer.writeBits(DATA_BIT_LENGTH, curr_xored_info.data_bits);
142 writer.writeBits(curr_xored_info.data_bits, xored_data >> curr_xored_info.trailing_zero_bits);
143 prev_xored_info = curr_xored_info;
144 }
145
146 prev_value = curr_value;
147 }
148
149 writer.flush();
150
151 return sizeof(items_count) + sizeof(prev_value) + buffer.count();
152}
153
154template <typename T>
155void decompressDataForType(const char * source, UInt32 source_size, char * dest)
156{
157 static const auto DATA_BIT_LENGTH = getBitLengthOfLength(sizeof(T));
158 // -1 since there must be at least 1 non-zero bit.
159 static const auto LEADING_ZEROES_BIT_LENGTH = DATA_BIT_LENGTH - 1;
160
161 const char * source_end = source + source_size;
162
163 const UInt32 items_count = unalignedLoad<UInt32>(source);
164 source += sizeof(items_count);
165
166 T prev_value{};
167
168 if (source < source_end)
169 {
170 prev_value = unalignedLoad<T>(source);
171 unalignedStore<T>(dest, prev_value);
172
173 source += sizeof(prev_value);
174 dest += sizeof(prev_value);
175 }
176
177 ReadBufferFromMemory buffer(source, source_size - sizeof(items_count) - sizeof(prev_value));
178 BitReader reader(buffer);
179
180 binary_value_info prev_xored_info{0, 0, 0};
181
182 // since data is tightly packed, up to 1 bit per value, and last byte is padded with zeroes,
183 // we have to keep track of items to avoid reading more that there is.
184 for (UInt32 items_read = 1; items_read < items_count && !reader.eof(); ++items_read)
185 {
186 T curr_value = prev_value;
187 binary_value_info curr_xored_info = prev_xored_info;
188 T xored_data{};
189
190 if (reader.readBit() == 1)
191 {
192 if (reader.readBit() == 1)
193 {
194 // 0b11 prefix
195 curr_xored_info.leading_zero_bits = reader.readBits(LEADING_ZEROES_BIT_LENGTH);
196 curr_xored_info.data_bits = reader.readBits(DATA_BIT_LENGTH);
197 curr_xored_info.trailing_zero_bits = sizeof(T) * 8 - curr_xored_info.leading_zero_bits - curr_xored_info.data_bits;
198 }
199 // else: 0b10 prefix - use prev_xored_info
200
201 if (curr_xored_info.leading_zero_bits == 0
202 && curr_xored_info.data_bits == 0
203 && curr_xored_info.trailing_zero_bits == 0)
204 {
205 throw Exception("Cannot decompress gorilla-encoded data: corrupted input data.",
206 ErrorCodes::CANNOT_DECOMPRESS);
207 }
208
209 xored_data = reader.readBits(curr_xored_info.data_bits);
210 xored_data <<= curr_xored_info.trailing_zero_bits;
211 curr_value = prev_value ^ xored_data;
212 }
213 // else: 0b0 prefix - use prev_value
214
215 unalignedStore<T>(dest, curr_value);
216 dest += sizeof(curr_value);
217
218 prev_xored_info = curr_xored_info;
219 prev_value = curr_value;
220 }
221}
222
223UInt8 getDataBytesSize(DataTypePtr column_type)
224{
225 UInt8 delta_bytes_size = 1;
226 if (column_type && column_type->haveMaximumSizeOfValue())
227 {
228 size_t max_size = column_type->getSizeOfValueInMemory();
229 if (max_size == 1 || max_size == 2 || max_size == 4 || max_size == 8)
230 delta_bytes_size = static_cast<UInt8>(max_size);
231 }
232 return delta_bytes_size;
233}
234
235}
236
237
238CompressionCodecGorilla::CompressionCodecGorilla(UInt8 data_bytes_size_)
239 : data_bytes_size(data_bytes_size_)
240{
241}
242
243UInt8 CompressionCodecGorilla::getMethodByte() const
244{
245 return static_cast<UInt8>(CompressionMethodByte::Gorilla);
246}
247
248String CompressionCodecGorilla::getCodecDesc() const
249{
250 return "Gorilla";
251}
252
253UInt32 CompressionCodecGorilla::getMaxCompressedDataSize(UInt32 uncompressed_size) const
254{
255 const auto result = 2 // common header
256 + data_bytes_size // max bytes skipped if source is not properly aligned.
257 + getCompressedHeaderSize(data_bytes_size) // data-specific header
258 + getCompressedDataSize(data_bytes_size, uncompressed_size);
259
260 return result;
261}
262
263UInt32 CompressionCodecGorilla::doCompressData(const char * source, UInt32 source_size, char * dest) const
264{
265 UInt8 bytes_to_skip = source_size % data_bytes_size;
266 dest[0] = data_bytes_size;
267 dest[1] = bytes_to_skip; /// unused (backward compatibility)
268 memcpy(&dest[2], source, bytes_to_skip);
269 size_t start_pos = 2 + bytes_to_skip;
270 UInt32 result_size = 0;
271
272 const UInt32 compressed_size = getMaxCompressedDataSize(source_size);
273 switch (data_bytes_size)
274 {
275 case 1:
276 result_size = compressDataForType<UInt8>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos], compressed_size);
277 break;
278 case 2:
279 result_size = compressDataForType<UInt16>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos], compressed_size);
280 break;
281 case 4:
282 result_size = compressDataForType<UInt32>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos], compressed_size);
283 break;
284 case 8:
285 result_size = compressDataForType<UInt64>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos], compressed_size);
286 break;
287 }
288
289 return 1 + 1 + result_size;
290}
291
292void CompressionCodecGorilla::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const
293{
294 if (source_size < 2)
295 throw Exception("Cannot decompress. File has wrong header", ErrorCodes::CANNOT_DECOMPRESS);
296
297 UInt8 bytes_size = source[0];
298 UInt8 bytes_to_skip = uncompressed_size % bytes_size;
299
300 if (UInt32(2 + bytes_to_skip) > source_size)
301 throw Exception("Cannot decompress. File has wrong header", ErrorCodes::CANNOT_DECOMPRESS);
302
303 memcpy(dest, &source[2], bytes_to_skip);
304 UInt32 source_size_no_header = source_size - bytes_to_skip - 2;
305 switch (bytes_size)
306 {
307 case 1:
308 decompressDataForType<UInt8>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip]);
309 break;
310 case 2:
311 decompressDataForType<UInt16>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip]);
312 break;
313 case 4:
314 decompressDataForType<UInt32>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip]);
315 break;
316 case 8:
317 decompressDataForType<UInt64>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip]);
318 break;
319 }
320}
321
322void CompressionCodecGorilla::useInfoAboutType(DataTypePtr data_type)
323{
324 data_bytes_size = getDataBytesSize(data_type);
325}
326
327void registerCodecGorilla(CompressionCodecFactory & factory)
328{
329 UInt8 method_code = UInt8(CompressionMethodByte::Gorilla);
330 factory.registerCompressionCodecWithType("Gorilla", method_code, [&](const ASTPtr &, DataTypePtr column_type) -> CompressionCodecPtr
331 {
332 UInt8 delta_bytes_size = getDataBytesSize(column_type);
333 return std::make_shared<CompressionCodecGorilla>(delta_bytes_size);
334 });
335}
336}
337