1#include <Compression/CompressionCodecDoubleDelta.h>
2#include <Compression/CompressionInfo.h>
3#include <Compression/CompressionFactory.h>
4#include <common/unaligned.h>
5#include <Parsers/IAST_fwd.h>
6
7#include <IO/ReadBufferFromMemory.h>
8#include <IO/BitHelpers.h>
9#include <IO/WriteHelpers.h>
10
11#include <string.h>
12#include <algorithm>
13#include <cstdlib>
14#include <type_traits>
15#include <limits>
16
17namespace DB
18{
19
20namespace ErrorCodes
21{
22extern const int CANNOT_COMPRESS;
23extern const int CANNOT_DECOMPRESS;
24}
25
26namespace
27{
28
/// Returns the largest value of the signed integer type that is `byte_size` bytes wide.
/// Only widths of 1, 2, 4 and 8 bytes are supported: any other width trips the assert,
/// and reaches __builtin_unreachable() when asserts are compiled out.
Int64 getMaxValueForByteSize(UInt8 byte_size)
{
    if (byte_size == sizeof(UInt8))
        return std::numeric_limits<Int8>::max();

    if (byte_size == sizeof(UInt16))
        return std::numeric_limits<Int16>::max();

    if (byte_size == sizeof(UInt32))
        return std::numeric_limits<Int32>::max();

    if (byte_size == sizeof(UInt64))
        return std::numeric_limits<Int64>::max();

    assert(false && "only 1, 2, 4 and 8 data sizes are supported");
    __builtin_unreachable();
}
46
/// Layout of one encoded non-zero double delta: a prefix followed by a payload.
struct WriteSpec
{
    /// Number of bits taken by the prefix.
    const UInt8 prefix_bits;

    /// Bit pattern of the prefix.
    const UInt8 prefix;

    /// Number of bits taken by the payload; the encoder spends one of them on the sign.
    const UInt8 data_bits;
};
53
/// Payload widths in bits; the decoder indexes this table by the number of extra 1-bits it read in the prefix.
const std::array<UInt8, 5> DELTA_SIZES = {7, 9, 12, 32, 64};
55
56template <typename T>
57WriteSpec getDeltaWriteSpec(const T & value)
58{
59 if (value > -63 && value < 64)
60 {
61 return WriteSpec{2, 0b10, 7};
62 }
63 else if (value > -255 && value < 256)
64 {
65 return WriteSpec{3, 0b110, 9};
66 }
67 else if (value > -2047 && value < 2048)
68 {
69 return WriteSpec{4, 0b1110, 12};
70 }
71 else if (value > std::numeric_limits<Int32>::min() && value < std::numeric_limits<Int32>::max())
72 {
73 return WriteSpec{5, 0b11110, 32};
74 }
75 else
76 {
77 return WriteSpec{5, 0b11111, 64};
78 }
79}
80
81WriteSpec getDeltaMaxWriteSpecByteSize(UInt8 data_bytes_size)
82{
83 return getDeltaWriteSpec(getMaxValueForByteSize(data_bytes_size));
84}
85
/// Size in bytes of the data-specific header: a 4-byte item count,
/// followed by the first value and the first delta, each `data_bytes_size` bytes wide.
UInt32 getCompressedHeaderSize(UInt8 data_bytes_size)
{
    return 4 + 2 * data_bytes_size;
}
93
94UInt32 getCompressedDataSize(UInt8 data_bytes_size, UInt32 uncompressed_size)
95{
96 const UInt32 items_count = uncompressed_size / data_bytes_size;
97 const auto double_delta_write_spec = getDeltaMaxWriteSpecByteSize(data_bytes_size);
98
99 const UInt32 max_item_size_bits = double_delta_write_spec.prefix_bits + double_delta_write_spec.data_bits;
100
101 // + 8 is to round up to next byte.
102 auto result = (items_count * max_item_size_bits + 7) / 8;
103
104 return result;
105}
106
107template <typename ValueType>
108UInt32 compressDataForType(const char * source, UInt32 source_size, char * dest)
109{
110 // Since only unsinged int has granted 2-compliment overflow handling, we are doing math here on unsigned types.
111 // To simplify and booletproof code, we operate enforce ValueType to be unsigned too.
112 static_assert(is_unsigned_v<ValueType>, "ValueType must be unsigned.");
113 using UnsignedDeltaType = ValueType;
114
115 // We use signed delta type to turn huge unsigned values into smaller signed:
116 // ffffffff => -1
117 using SignedDeltaType = typename std::make_signed<UnsignedDeltaType>::type;
118
119 if (source_size % sizeof(ValueType) != 0)
120 throw Exception("Cannot compress, data size " + toString(source_size)
121 + " is not aligned to " + toString(sizeof(ValueType)), ErrorCodes::CANNOT_COMPRESS);
122 const char * source_end = source + source_size;
123
124 const UInt32 items_count = source_size / sizeof(ValueType);
125 unalignedStore<UInt32>(dest, items_count);
126 dest += sizeof(items_count);
127
128 ValueType prev_value{};
129 UnsignedDeltaType prev_delta{};
130
131 if (source < source_end)
132 {
133 prev_value = unalignedLoad<ValueType>(source);
134 unalignedStore<ValueType>(dest, prev_value);
135
136 source += sizeof(prev_value);
137 dest += sizeof(prev_value);
138 }
139
140 if (source < source_end)
141 {
142 const ValueType curr_value = unalignedLoad<ValueType>(source);
143
144 prev_delta = curr_value - prev_value;
145 unalignedStore<UnsignedDeltaType>(dest, prev_delta);
146
147 source += sizeof(curr_value);
148 dest += sizeof(prev_delta);
149 prev_value = curr_value;
150 }
151
152 WriteBuffer buffer(dest, getCompressedDataSize(sizeof(ValueType), source_size - sizeof(ValueType)*2));
153 BitWriter writer(buffer);
154
155 int item = 2;
156 for (; source < source_end; source += sizeof(ValueType), ++item)
157 {
158 const ValueType curr_value = unalignedLoad<ValueType>(source);
159
160 const UnsignedDeltaType delta = curr_value - prev_value;
161 const UnsignedDeltaType double_delta = delta - prev_delta;
162
163 prev_delta = delta;
164 prev_value = curr_value;
165
166 if (double_delta == 0)
167 {
168 writer.writeBits(1, 0);
169 }
170 else
171 {
172 const SignedDeltaType signed_dd = static_cast<SignedDeltaType>(double_delta);
173 const auto sign = std::signbit(signed_dd);
174 // -1 shirnks dd down to fit into number of bits, and there can't be 0, so it is OK.
175 const auto abs_value = static_cast<UnsignedDeltaType>(std::abs(signed_dd) - 1);
176 const auto write_spec = getDeltaWriteSpec(signed_dd);
177
178 writer.writeBits(write_spec.prefix_bits, write_spec.prefix);
179 writer.writeBits(1, sign);
180 writer.writeBits(write_spec.data_bits - 1, abs_value);
181 }
182 }
183
184 writer.flush();
185
186 return sizeof(items_count) + sizeof(prev_value) + sizeof(prev_delta) + buffer.count();
187}
188
189template <typename ValueType>
190void decompressDataForType(const char * source, UInt32 source_size, char * dest)
191{
192 static_assert(is_unsigned_v<ValueType>, "ValueType must be unsigned.");
193 using UnsignedDeltaType = ValueType;
194 using SignedDeltaType = typename std::make_signed<UnsignedDeltaType>::type;
195
196 const char * source_end = source + source_size;
197
198 const UInt32 items_count = unalignedLoad<UInt32>(source);
199 source += sizeof(items_count);
200
201 ValueType prev_value{};
202 UnsignedDeltaType prev_delta{};
203
204 if (source < source_end)
205 {
206 prev_value = unalignedLoad<ValueType>(source);
207 unalignedStore<ValueType>(dest, prev_value);
208
209 source += sizeof(prev_value);
210 dest += sizeof(prev_value);
211 }
212
213 if (source < source_end)
214 {
215 prev_delta = unalignedLoad<UnsignedDeltaType>(source);
216 prev_value = prev_value + static_cast<ValueType>(prev_delta);
217 unalignedStore<ValueType>(dest, prev_value);
218
219 source += sizeof(prev_delta);
220 dest += sizeof(prev_value);
221 }
222
223 ReadBufferFromMemory buffer(source, source_size - sizeof(prev_value) - sizeof(prev_delta) - sizeof(items_count));
224 BitReader reader(buffer);
225
226 // since data is tightly packed, up to 1 bit per value, and last byte is padded with zeroes,
227 // we have to keep track of items to avoid reading more that there is.
228 for (UInt32 items_read = 2; items_read < items_count && !reader.eof(); ++items_read)
229 {
230 UnsignedDeltaType double_delta = 0;
231 if (reader.readBit() == 1)
232 {
233 UInt8 i = 0;
234 for (; i < sizeof(DELTA_SIZES) - 1; ++i)
235 {
236 const auto next_bit = reader.readBit();
237 if (next_bit == 0)
238 {
239 break;
240 }
241 }
242
243 const UInt8 sign = reader.readBit();
244 SignedDeltaType signed_dd = static_cast<SignedDeltaType>(reader.readBits(DELTA_SIZES[i] - 1) + 1);
245 if (sign)
246 {
247 signed_dd *= -1;
248 }
249 double_delta = static_cast<UnsignedDeltaType>(signed_dd);
250 }
251 // else if first bit is zero, no need to read more data.
252
253 const UnsignedDeltaType delta = double_delta + prev_delta;
254 const ValueType curr_value = prev_value + delta;
255 unalignedStore<ValueType>(dest, curr_value);
256 dest += sizeof(curr_value);
257
258 prev_delta = curr_value - prev_value;
259 prev_value = curr_value;
260 }
261}
262
263UInt8 getDataBytesSize(DataTypePtr column_type)
264{
265 UInt8 data_bytes_size = 1;
266 if (column_type && column_type->haveMaximumSizeOfValue())
267 {
268 size_t max_size = column_type->getSizeOfValueInMemory();
269 if (max_size == 1 || max_size == 2 || max_size == 4 || max_size == 8)
270 data_bytes_size = static_cast<UInt8>(max_size);
271 }
272 return data_bytes_size;
273}
274
275}
276
277
278CompressionCodecDoubleDelta::CompressionCodecDoubleDelta(UInt8 data_bytes_size_)
279 : data_bytes_size(data_bytes_size_)
280{
281}
282
283UInt8 CompressionCodecDoubleDelta::getMethodByte() const
284{
285 return static_cast<UInt8>(CompressionMethodByte::DoubleDelta);
286}
287
288String CompressionCodecDoubleDelta::getCodecDesc() const
289{
290 return "DoubleDelta";
291}
292
293UInt32 CompressionCodecDoubleDelta::getMaxCompressedDataSize(UInt32 uncompressed_size) const
294{
295 const auto result = 2 // common header
296 + data_bytes_size // max bytes skipped if source is not properly aligned.
297 + getCompressedHeaderSize(data_bytes_size) // data-specific header
298 + getCompressedDataSize(data_bytes_size, uncompressed_size);
299
300 return result;
301}
302
303UInt32 CompressionCodecDoubleDelta::doCompressData(const char * source, UInt32 source_size, char * dest) const
304{
305 UInt8 bytes_to_skip = source_size % data_bytes_size;
306 dest[0] = data_bytes_size;
307 dest[1] = bytes_to_skip; /// unused (backward compatibility)
308 memcpy(&dest[2], source, bytes_to_skip);
309 size_t start_pos = 2 + bytes_to_skip;
310 UInt32 compressed_size = 0;
311
312 switch (data_bytes_size)
313 {
314 case 1:
315 compressed_size = compressDataForType<UInt8>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]);
316 break;
317 case 2:
318 compressed_size = compressDataForType<UInt16>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]);
319 break;
320 case 4:
321 compressed_size = compressDataForType<UInt32>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]);
322 break;
323 case 8:
324 compressed_size = compressDataForType<UInt64>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]);
325 break;
326 }
327
328 return 1 + 1 + compressed_size;
329}
330
331void CompressionCodecDoubleDelta::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const
332{
333 if (source_size < 2)
334 throw Exception("Cannot decompress. File has wrong header", ErrorCodes::CANNOT_DECOMPRESS);
335
336 UInt8 bytes_size = source[0];
337 UInt8 bytes_to_skip = uncompressed_size % bytes_size;
338
339 if (UInt32(2 + bytes_to_skip) > source_size)
340 throw Exception("Cannot decompress. File has wrong header", ErrorCodes::CANNOT_DECOMPRESS);
341
342 memcpy(dest, &source[2], bytes_to_skip);
343 UInt32 source_size_no_header = source_size - bytes_to_skip - 2;
344 switch (bytes_size)
345 {
346 case 1:
347 decompressDataForType<UInt8>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip]);
348 break;
349 case 2:
350 decompressDataForType<UInt16>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip]);
351 break;
352 case 4:
353 decompressDataForType<UInt32>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip]);
354 break;
355 case 8:
356 decompressDataForType<UInt64>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip]);
357 break;
358 }
359}
360
361void CompressionCodecDoubleDelta::useInfoAboutType(DataTypePtr data_type)
362{
363 data_bytes_size = getDataBytesSize(data_type);
364}
365
366void registerCodecDoubleDelta(CompressionCodecFactory & factory)
367{
368 UInt8 method_code = UInt8(CompressionMethodByte::DoubleDelta);
369 factory.registerCompressionCodecWithType("DoubleDelta", method_code, [&](const ASTPtr &, DataTypePtr column_type) -> CompressionCodecPtr
370 {
371 UInt8 delta_bytes_size = getDataBytesSize(column_type);
372 return std::make_shared<CompressionCodecDoubleDelta>(delta_bytes_size);
373 });
374}
375}
376