#include <Compression/CompressionCodecDoubleDelta.h>
#include <Compression/CompressionInfo.h>
#include <Compression/CompressionFactory.h>
#include <common/unaligned.h>
#include <Parsers/IAST_fwd.h>

#include <IO/ReadBufferFromMemory.h>
#include <IO/BitHelpers.h>
#include <IO/WriteHelpers.h>

#include <string.h>
#include <algorithm>
#include <array>
#include <cassert>
#include <cmath>
#include <cstdlib>
#include <limits>
#include <type_traits>
16 | |
17 | namespace DB |
18 | { |
19 | |
/// Error codes raised by this codec; `extern` because they are defined in another translation unit.
namespace ErrorCodes
{
    extern const int CANNOT_DECOMPRESS;
    extern const int CANNOT_COMPRESS;
}
25 | |
26 | namespace |
27 | { |
28 | |
29 | Int64 getMaxValueForByteSize(UInt8 byte_size) |
30 | { |
31 | switch (byte_size) |
32 | { |
33 | case sizeof(UInt8): |
34 | return std::numeric_limits<Int8>::max(); |
35 | case sizeof(UInt16): |
36 | return std::numeric_limits<Int16>::max(); |
37 | case sizeof(UInt32): |
38 | return std::numeric_limits<Int32>::max(); |
39 | case sizeof(UInt64): |
40 | return std::numeric_limits<Int64>::max(); |
41 | default: |
42 | assert(false && "only 1, 2, 4 and 8 data sizes are supported" ); |
43 | } |
44 | __builtin_unreachable(); |
45 | } |
46 | |
47 | struct WriteSpec |
48 | { |
49 | const UInt8 prefix_bits; |
50 | const UInt8 prefix; |
51 | const UInt8 data_bits; |
52 | }; |
53 | |
54 | const std::array<UInt8, 5> DELTA_SIZES{7, 9, 12, 32, 64}; |
55 | |
56 | template <typename T> |
57 | WriteSpec getDeltaWriteSpec(const T & value) |
58 | { |
59 | if (value > -63 && value < 64) |
60 | { |
61 | return WriteSpec{2, 0b10, 7}; |
62 | } |
63 | else if (value > -255 && value < 256) |
64 | { |
65 | return WriteSpec{3, 0b110, 9}; |
66 | } |
67 | else if (value > -2047 && value < 2048) |
68 | { |
69 | return WriteSpec{4, 0b1110, 12}; |
70 | } |
71 | else if (value > std::numeric_limits<Int32>::min() && value < std::numeric_limits<Int32>::max()) |
72 | { |
73 | return WriteSpec{5, 0b11110, 32}; |
74 | } |
75 | else |
76 | { |
77 | return WriteSpec{5, 0b11111, 64}; |
78 | } |
79 | } |
80 | |
81 | WriteSpec getDeltaMaxWriteSpecByteSize(UInt8 data_bytes_size) |
82 | { |
83 | return getDeltaWriteSpec(getMaxValueForByteSize(data_bytes_size)); |
84 | } |
85 | |
86 | UInt32 (UInt8 data_bytes_size) |
87 | { |
88 | const UInt8 items_count_size = 4; |
89 | const UInt8 first_delta_bytes_size = data_bytes_size; |
90 | |
91 | return items_count_size + data_bytes_size + first_delta_bytes_size; |
92 | } |
93 | |
94 | UInt32 getCompressedDataSize(UInt8 data_bytes_size, UInt32 uncompressed_size) |
95 | { |
96 | const UInt32 items_count = uncompressed_size / data_bytes_size; |
97 | const auto double_delta_write_spec = getDeltaMaxWriteSpecByteSize(data_bytes_size); |
98 | |
99 | const UInt32 max_item_size_bits = double_delta_write_spec.prefix_bits + double_delta_write_spec.data_bits; |
100 | |
101 | // + 8 is to round up to next byte. |
102 | auto result = (items_count * max_item_size_bits + 7) / 8; |
103 | |
104 | return result; |
105 | } |
106 | |
107 | template <typename ValueType> |
108 | UInt32 compressDataForType(const char * source, UInt32 source_size, char * dest) |
109 | { |
110 | // Since only unsinged int has granted 2-compliment overflow handling, we are doing math here on unsigned types. |
111 | // To simplify and booletproof code, we operate enforce ValueType to be unsigned too. |
112 | static_assert(is_unsigned_v<ValueType>, "ValueType must be unsigned." ); |
113 | using UnsignedDeltaType = ValueType; |
114 | |
115 | // We use signed delta type to turn huge unsigned values into smaller signed: |
116 | // ffffffff => -1 |
117 | using SignedDeltaType = typename std::make_signed<UnsignedDeltaType>::type; |
118 | |
119 | if (source_size % sizeof(ValueType) != 0) |
120 | throw Exception("Cannot compress, data size " + toString(source_size) |
121 | + " is not aligned to " + toString(sizeof(ValueType)), ErrorCodes::CANNOT_COMPRESS); |
122 | const char * source_end = source + source_size; |
123 | |
124 | const UInt32 items_count = source_size / sizeof(ValueType); |
125 | unalignedStore<UInt32>(dest, items_count); |
126 | dest += sizeof(items_count); |
127 | |
128 | ValueType prev_value{}; |
129 | UnsignedDeltaType prev_delta{}; |
130 | |
131 | if (source < source_end) |
132 | { |
133 | prev_value = unalignedLoad<ValueType>(source); |
134 | unalignedStore<ValueType>(dest, prev_value); |
135 | |
136 | source += sizeof(prev_value); |
137 | dest += sizeof(prev_value); |
138 | } |
139 | |
140 | if (source < source_end) |
141 | { |
142 | const ValueType curr_value = unalignedLoad<ValueType>(source); |
143 | |
144 | prev_delta = curr_value - prev_value; |
145 | unalignedStore<UnsignedDeltaType>(dest, prev_delta); |
146 | |
147 | source += sizeof(curr_value); |
148 | dest += sizeof(prev_delta); |
149 | prev_value = curr_value; |
150 | } |
151 | |
152 | WriteBuffer buffer(dest, getCompressedDataSize(sizeof(ValueType), source_size - sizeof(ValueType)*2)); |
153 | BitWriter writer(buffer); |
154 | |
155 | int item = 2; |
156 | for (; source < source_end; source += sizeof(ValueType), ++item) |
157 | { |
158 | const ValueType curr_value = unalignedLoad<ValueType>(source); |
159 | |
160 | const UnsignedDeltaType delta = curr_value - prev_value; |
161 | const UnsignedDeltaType double_delta = delta - prev_delta; |
162 | |
163 | prev_delta = delta; |
164 | prev_value = curr_value; |
165 | |
166 | if (double_delta == 0) |
167 | { |
168 | writer.writeBits(1, 0); |
169 | } |
170 | else |
171 | { |
172 | const SignedDeltaType signed_dd = static_cast<SignedDeltaType>(double_delta); |
173 | const auto sign = std::signbit(signed_dd); |
174 | // -1 shirnks dd down to fit into number of bits, and there can't be 0, so it is OK. |
175 | const auto abs_value = static_cast<UnsignedDeltaType>(std::abs(signed_dd) - 1); |
176 | const auto write_spec = getDeltaWriteSpec(signed_dd); |
177 | |
178 | writer.writeBits(write_spec.prefix_bits, write_spec.prefix); |
179 | writer.writeBits(1, sign); |
180 | writer.writeBits(write_spec.data_bits - 1, abs_value); |
181 | } |
182 | } |
183 | |
184 | writer.flush(); |
185 | |
186 | return sizeof(items_count) + sizeof(prev_value) + sizeof(prev_delta) + buffer.count(); |
187 | } |
188 | |
189 | template <typename ValueType> |
190 | void decompressDataForType(const char * source, UInt32 source_size, char * dest) |
191 | { |
192 | static_assert(is_unsigned_v<ValueType>, "ValueType must be unsigned." ); |
193 | using UnsignedDeltaType = ValueType; |
194 | using SignedDeltaType = typename std::make_signed<UnsignedDeltaType>::type; |
195 | |
196 | const char * source_end = source + source_size; |
197 | |
198 | const UInt32 items_count = unalignedLoad<UInt32>(source); |
199 | source += sizeof(items_count); |
200 | |
201 | ValueType prev_value{}; |
202 | UnsignedDeltaType prev_delta{}; |
203 | |
204 | if (source < source_end) |
205 | { |
206 | prev_value = unalignedLoad<ValueType>(source); |
207 | unalignedStore<ValueType>(dest, prev_value); |
208 | |
209 | source += sizeof(prev_value); |
210 | dest += sizeof(prev_value); |
211 | } |
212 | |
213 | if (source < source_end) |
214 | { |
215 | prev_delta = unalignedLoad<UnsignedDeltaType>(source); |
216 | prev_value = prev_value + static_cast<ValueType>(prev_delta); |
217 | unalignedStore<ValueType>(dest, prev_value); |
218 | |
219 | source += sizeof(prev_delta); |
220 | dest += sizeof(prev_value); |
221 | } |
222 | |
223 | ReadBufferFromMemory buffer(source, source_size - sizeof(prev_value) - sizeof(prev_delta) - sizeof(items_count)); |
224 | BitReader reader(buffer); |
225 | |
226 | // since data is tightly packed, up to 1 bit per value, and last byte is padded with zeroes, |
227 | // we have to keep track of items to avoid reading more that there is. |
228 | for (UInt32 items_read = 2; items_read < items_count && !reader.eof(); ++items_read) |
229 | { |
230 | UnsignedDeltaType double_delta = 0; |
231 | if (reader.readBit() == 1) |
232 | { |
233 | UInt8 i = 0; |
234 | for (; i < sizeof(DELTA_SIZES) - 1; ++i) |
235 | { |
236 | const auto next_bit = reader.readBit(); |
237 | if (next_bit == 0) |
238 | { |
239 | break; |
240 | } |
241 | } |
242 | |
243 | const UInt8 sign = reader.readBit(); |
244 | SignedDeltaType signed_dd = static_cast<SignedDeltaType>(reader.readBits(DELTA_SIZES[i] - 1) + 1); |
245 | if (sign) |
246 | { |
247 | signed_dd *= -1; |
248 | } |
249 | double_delta = static_cast<UnsignedDeltaType>(signed_dd); |
250 | } |
251 | // else if first bit is zero, no need to read more data. |
252 | |
253 | const UnsignedDeltaType delta = double_delta + prev_delta; |
254 | const ValueType curr_value = prev_value + delta; |
255 | unalignedStore<ValueType>(dest, curr_value); |
256 | dest += sizeof(curr_value); |
257 | |
258 | prev_delta = curr_value - prev_value; |
259 | prev_value = curr_value; |
260 | } |
261 | } |
262 | |
263 | UInt8 getDataBytesSize(DataTypePtr column_type) |
264 | { |
265 | UInt8 data_bytes_size = 1; |
266 | if (column_type && column_type->haveMaximumSizeOfValue()) |
267 | { |
268 | size_t max_size = column_type->getSizeOfValueInMemory(); |
269 | if (max_size == 1 || max_size == 2 || max_size == 4 || max_size == 8) |
270 | data_bytes_size = static_cast<UInt8>(max_size); |
271 | } |
272 | return data_bytes_size; |
273 | } |
274 | |
275 | } |
276 | |
277 | |
278 | CompressionCodecDoubleDelta::CompressionCodecDoubleDelta(UInt8 data_bytes_size_) |
279 | : data_bytes_size(data_bytes_size_) |
280 | { |
281 | } |
282 | |
283 | UInt8 CompressionCodecDoubleDelta::getMethodByte() const |
284 | { |
285 | return static_cast<UInt8>(CompressionMethodByte::DoubleDelta); |
286 | } |
287 | |
288 | String CompressionCodecDoubleDelta::getCodecDesc() const |
289 | { |
290 | return "DoubleDelta" ; |
291 | } |
292 | |
293 | UInt32 CompressionCodecDoubleDelta::getMaxCompressedDataSize(UInt32 uncompressed_size) const |
294 | { |
295 | const auto result = 2 // common header |
296 | + data_bytes_size // max bytes skipped if source is not properly aligned. |
297 | + getCompressedHeaderSize(data_bytes_size) // data-specific header |
298 | + getCompressedDataSize(data_bytes_size, uncompressed_size); |
299 | |
300 | return result; |
301 | } |
302 | |
303 | UInt32 CompressionCodecDoubleDelta::doCompressData(const char * source, UInt32 source_size, char * dest) const |
304 | { |
305 | UInt8 bytes_to_skip = source_size % data_bytes_size; |
306 | dest[0] = data_bytes_size; |
307 | dest[1] = bytes_to_skip; /// unused (backward compatibility) |
308 | memcpy(&dest[2], source, bytes_to_skip); |
309 | size_t start_pos = 2 + bytes_to_skip; |
310 | UInt32 compressed_size = 0; |
311 | |
312 | switch (data_bytes_size) |
313 | { |
314 | case 1: |
315 | compressed_size = compressDataForType<UInt8>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]); |
316 | break; |
317 | case 2: |
318 | compressed_size = compressDataForType<UInt16>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]); |
319 | break; |
320 | case 4: |
321 | compressed_size = compressDataForType<UInt32>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]); |
322 | break; |
323 | case 8: |
324 | compressed_size = compressDataForType<UInt64>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]); |
325 | break; |
326 | } |
327 | |
328 | return 1 + 1 + compressed_size; |
329 | } |
330 | |
331 | void CompressionCodecDoubleDelta::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const |
332 | { |
333 | if (source_size < 2) |
334 | throw Exception("Cannot decompress. File has wrong header" , ErrorCodes::CANNOT_DECOMPRESS); |
335 | |
336 | UInt8 bytes_size = source[0]; |
337 | UInt8 bytes_to_skip = uncompressed_size % bytes_size; |
338 | |
339 | if (UInt32(2 + bytes_to_skip) > source_size) |
340 | throw Exception("Cannot decompress. File has wrong header" , ErrorCodes::CANNOT_DECOMPRESS); |
341 | |
342 | memcpy(dest, &source[2], bytes_to_skip); |
343 | UInt32 = source_size - bytes_to_skip - 2; |
344 | switch (bytes_size) |
345 | { |
346 | case 1: |
347 | decompressDataForType<UInt8>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip]); |
348 | break; |
349 | case 2: |
350 | decompressDataForType<UInt16>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip]); |
351 | break; |
352 | case 4: |
353 | decompressDataForType<UInt32>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip]); |
354 | break; |
355 | case 8: |
356 | decompressDataForType<UInt64>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip]); |
357 | break; |
358 | } |
359 | } |
360 | |
361 | void CompressionCodecDoubleDelta::useInfoAboutType(DataTypePtr data_type) |
362 | { |
363 | data_bytes_size = getDataBytesSize(data_type); |
364 | } |
365 | |
366 | void registerCodecDoubleDelta(CompressionCodecFactory & factory) |
367 | { |
368 | UInt8 method_code = UInt8(CompressionMethodByte::DoubleDelta); |
369 | factory.registerCompressionCodecWithType("DoubleDelta" , method_code, [&](const ASTPtr &, DataTypePtr column_type) -> CompressionCodecPtr |
370 | { |
371 | UInt8 delta_bytes_size = getDataBytesSize(column_type); |
372 | return std::make_shared<CompressionCodecDoubleDelta>(delta_bytes_size); |
373 | }); |
374 | } |
375 | } |
376 | |