1#include <Compression/CompressionCodecDelta.h>
2#include <Compression/CompressionInfo.h>
3#include <Compression/CompressionFactory.h>
4#include <common/unaligned.h>
5#include <Parsers/IAST.h>
6#include <Parsers/ASTLiteral.h>
7#include <IO/WriteHelpers.h>
8#include <cstdlib>
9
10
11namespace DB
12{
13
14namespace ErrorCodes
15{
16extern const int CANNOT_COMPRESS;
17extern const int CANNOT_DECOMPRESS;
18extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
19extern const int ILLEGAL_CODEC_PARAMETER;
20}
21
22CompressionCodecDelta::CompressionCodecDelta(UInt8 delta_bytes_size_)
23 : delta_bytes_size(delta_bytes_size_)
24{
25}
26
27UInt8 CompressionCodecDelta::getMethodByte() const
28{
29 return static_cast<UInt8>(CompressionMethodByte::Delta);
30}
31
32String CompressionCodecDelta::getCodecDesc() const
33{
34 return "Delta(" + toString(delta_bytes_size) + ")";
35}
36
37namespace
38{
39
40template <typename T>
41void compressDataForType(const char * source, UInt32 source_size, char * dest)
42{
43 if (source_size % sizeof(T) != 0)
44 throw Exception("Cannot delta compress, data size " + toString(source_size) + " is not aligned to " + toString(sizeof(T)), ErrorCodes::CANNOT_COMPRESS);
45
46 T prev_src{};
47 const char * source_end = source + source_size;
48 while (source < source_end)
49 {
50 T curr_src = unalignedLoad<T>(source);
51 unalignedStore<T>(dest, curr_src - prev_src);
52 prev_src = curr_src;
53
54 source += sizeof(T);
55 dest += sizeof(T);
56 }
57}
58
59template <typename T>
60void decompressDataForType(const char * source, UInt32 source_size, char * dest)
61{
62 if (source_size % sizeof(T) != 0)
63 throw Exception("Cannot delta decompress, data size " + toString(source_size) + " is not aligned to " + toString(sizeof(T)), ErrorCodes::CANNOT_DECOMPRESS);
64
65 T accumulator{};
66 const char * source_end = source + source_size;
67 while (source < source_end)
68 {
69 accumulator += unalignedLoad<T>(source);
70 unalignedStore<T>(dest, accumulator);
71
72 source += sizeof(T);
73 dest += sizeof(T);
74 }
75}
76
77}
78
79UInt32 CompressionCodecDelta::doCompressData(const char * source, UInt32 source_size, char * dest) const
80{
81 UInt8 bytes_to_skip = source_size % delta_bytes_size;
82 dest[0] = delta_bytes_size;
83 dest[1] = bytes_to_skip; /// unused (backward compatibility)
84 memcpy(&dest[2], source, bytes_to_skip);
85 size_t start_pos = 2 + bytes_to_skip;
86 switch (delta_bytes_size)
87 {
88 case 1:
89 compressDataForType<UInt8>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]);
90 break;
91 case 2:
92 compressDataForType<UInt16>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]);
93 break;
94 case 4:
95 compressDataForType<UInt32>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]);
96 break;
97 case 8:
98 compressDataForType<UInt64>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]);
99 break;
100 }
101 return 1 + 1 + source_size;
102}
103
104void CompressionCodecDelta::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const
105{
106 if (source_size < 2)
107 throw Exception("Cannot decompress. File has wrong header", ErrorCodes::CANNOT_DECOMPRESS);
108
109 UInt8 bytes_size = source[0];
110 UInt8 bytes_to_skip = uncompressed_size % bytes_size;
111
112 if (UInt32(2 + bytes_to_skip) > source_size)
113 throw Exception("Cannot decompress. File has wrong header", ErrorCodes::CANNOT_DECOMPRESS);
114
115 memcpy(dest, &source[2], bytes_to_skip);
116 UInt32 source_size_no_header = source_size - bytes_to_skip - 2;
117 switch (bytes_size)
118 {
119 case 1:
120 decompressDataForType<UInt8>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip]);
121 break;
122 case 2:
123 decompressDataForType<UInt16>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip]);
124 break;
125 case 4:
126 decompressDataForType<UInt32>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip]);
127 break;
128 case 8:
129 decompressDataForType<UInt64>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip]);
130 break;
131 }
132}
133
134namespace
135{
136
137UInt8 getDeltaBytesSize(DataTypePtr column_type)
138{
139 UInt8 delta_bytes_size = 1;
140 if (column_type && column_type->haveMaximumSizeOfValue())
141 {
142 size_t max_size = column_type->getSizeOfValueInMemory();
143 if (max_size == 1 || max_size == 2 || max_size == 4 || max_size == 8)
144 delta_bytes_size = static_cast<UInt8>(max_size);
145 }
146 return delta_bytes_size;
147}
148
149}
150
151void CompressionCodecDelta::useInfoAboutType(DataTypePtr data_type)
152{
153 delta_bytes_size = getDeltaBytesSize(data_type);
154}
155
156void registerCodecDelta(CompressionCodecFactory & factory)
157{
158 UInt8 method_code = UInt8(CompressionMethodByte::Delta);
159 factory.registerCompressionCodecWithType("Delta", method_code, [&](const ASTPtr & arguments, DataTypePtr column_type) -> CompressionCodecPtr
160 {
161 UInt8 delta_bytes_size = getDeltaBytesSize(column_type);
162 if (arguments && !arguments->children.empty())
163 {
164 if (arguments->children.size() > 1)
165 throw Exception("Delta codec must have 1 parameter, given " + std::to_string(arguments->children.size()), ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE);
166
167 const auto children = arguments->children;
168 const auto * literal = children[0]->as<ASTLiteral>();
169 size_t user_bytes_size = literal->value.safeGet<UInt64>();
170 if (user_bytes_size != 1 && user_bytes_size != 2 && user_bytes_size != 4 && user_bytes_size != 8)
171 throw Exception("Delta value for delta codec can be 1, 2, 4 or 8, given " + toString(user_bytes_size), ErrorCodes::ILLEGAL_CODEC_PARAMETER);
172 delta_bytes_size = static_cast<UInt8>(user_bytes_size);
173 }
174 return std::make_shared<CompressionCodecDelta>(delta_bytes_size);
175 });
176}
177}
178