1 | #include <Compression/CompressionCodecDelta.h> |
2 | #include <Compression/CompressionInfo.h> |
3 | #include <Compression/CompressionFactory.h> |
4 | #include <common/unaligned.h> |
5 | #include <Parsers/IAST.h> |
6 | #include <Parsers/ASTLiteral.h> |
7 | #include <IO/WriteHelpers.h> |
8 | #include <cstdlib> |
9 | |
10 | |
11 | namespace DB |
12 | { |
13 | |
/// Error codes thrown from this file. They are only declared here (extern);
/// the definitions are provided by another translation unit.
namespace ErrorCodes
{
    extern const int CANNOT_COMPRESS;
    extern const int CANNOT_DECOMPRESS;
    extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
    extern const int ILLEGAL_CODEC_PARAMETER;
}
21 | |
22 | CompressionCodecDelta::CompressionCodecDelta(UInt8 delta_bytes_size_) |
23 | : delta_bytes_size(delta_bytes_size_) |
24 | { |
25 | } |
26 | |
27 | UInt8 CompressionCodecDelta::getMethodByte() const |
28 | { |
29 | return static_cast<UInt8>(CompressionMethodByte::Delta); |
30 | } |
31 | |
32 | String CompressionCodecDelta::getCodecDesc() const |
33 | { |
34 | return "Delta(" + toString(delta_bytes_size) + ")" ; |
35 | } |
36 | |
37 | namespace |
38 | { |
39 | |
40 | template <typename T> |
41 | void compressDataForType(const char * source, UInt32 source_size, char * dest) |
42 | { |
43 | if (source_size % sizeof(T) != 0) |
44 | throw Exception("Cannot delta compress, data size " + toString(source_size) + " is not aligned to " + toString(sizeof(T)), ErrorCodes::CANNOT_COMPRESS); |
45 | |
46 | T prev_src{}; |
47 | const char * source_end = source + source_size; |
48 | while (source < source_end) |
49 | { |
50 | T curr_src = unalignedLoad<T>(source); |
51 | unalignedStore<T>(dest, curr_src - prev_src); |
52 | prev_src = curr_src; |
53 | |
54 | source += sizeof(T); |
55 | dest += sizeof(T); |
56 | } |
57 | } |
58 | |
59 | template <typename T> |
60 | void decompressDataForType(const char * source, UInt32 source_size, char * dest) |
61 | { |
62 | if (source_size % sizeof(T) != 0) |
63 | throw Exception("Cannot delta decompress, data size " + toString(source_size) + " is not aligned to " + toString(sizeof(T)), ErrorCodes::CANNOT_DECOMPRESS); |
64 | |
65 | T accumulator{}; |
66 | const char * source_end = source + source_size; |
67 | while (source < source_end) |
68 | { |
69 | accumulator += unalignedLoad<T>(source); |
70 | unalignedStore<T>(dest, accumulator); |
71 | |
72 | source += sizeof(T); |
73 | dest += sizeof(T); |
74 | } |
75 | } |
76 | |
77 | } |
78 | |
79 | UInt32 CompressionCodecDelta::doCompressData(const char * source, UInt32 source_size, char * dest) const |
80 | { |
81 | UInt8 bytes_to_skip = source_size % delta_bytes_size; |
82 | dest[0] = delta_bytes_size; |
83 | dest[1] = bytes_to_skip; /// unused (backward compatibility) |
84 | memcpy(&dest[2], source, bytes_to_skip); |
85 | size_t start_pos = 2 + bytes_to_skip; |
86 | switch (delta_bytes_size) |
87 | { |
88 | case 1: |
89 | compressDataForType<UInt8>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]); |
90 | break; |
91 | case 2: |
92 | compressDataForType<UInt16>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]); |
93 | break; |
94 | case 4: |
95 | compressDataForType<UInt32>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]); |
96 | break; |
97 | case 8: |
98 | compressDataForType<UInt64>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]); |
99 | break; |
100 | } |
101 | return 1 + 1 + source_size; |
102 | } |
103 | |
104 | void CompressionCodecDelta::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const |
105 | { |
106 | if (source_size < 2) |
107 | throw Exception("Cannot decompress. File has wrong header" , ErrorCodes::CANNOT_DECOMPRESS); |
108 | |
109 | UInt8 bytes_size = source[0]; |
110 | UInt8 bytes_to_skip = uncompressed_size % bytes_size; |
111 | |
112 | if (UInt32(2 + bytes_to_skip) > source_size) |
113 | throw Exception("Cannot decompress. File has wrong header" , ErrorCodes::CANNOT_DECOMPRESS); |
114 | |
115 | memcpy(dest, &source[2], bytes_to_skip); |
116 | UInt32 = source_size - bytes_to_skip - 2; |
117 | switch (bytes_size) |
118 | { |
119 | case 1: |
120 | decompressDataForType<UInt8>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip]); |
121 | break; |
122 | case 2: |
123 | decompressDataForType<UInt16>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip]); |
124 | break; |
125 | case 4: |
126 | decompressDataForType<UInt32>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip]); |
127 | break; |
128 | case 8: |
129 | decompressDataForType<UInt64>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip]); |
130 | break; |
131 | } |
132 | } |
133 | |
134 | namespace |
135 | { |
136 | |
137 | UInt8 getDeltaBytesSize(DataTypePtr column_type) |
138 | { |
139 | UInt8 delta_bytes_size = 1; |
140 | if (column_type && column_type->haveMaximumSizeOfValue()) |
141 | { |
142 | size_t max_size = column_type->getSizeOfValueInMemory(); |
143 | if (max_size == 1 || max_size == 2 || max_size == 4 || max_size == 8) |
144 | delta_bytes_size = static_cast<UInt8>(max_size); |
145 | } |
146 | return delta_bytes_size; |
147 | } |
148 | |
149 | } |
150 | |
151 | void CompressionCodecDelta::useInfoAboutType(DataTypePtr data_type) |
152 | { |
153 | delta_bytes_size = getDeltaBytesSize(data_type); |
154 | } |
155 | |
156 | void registerCodecDelta(CompressionCodecFactory & factory) |
157 | { |
158 | UInt8 method_code = UInt8(CompressionMethodByte::Delta); |
159 | factory.registerCompressionCodecWithType("Delta" , method_code, [&](const ASTPtr & arguments, DataTypePtr column_type) -> CompressionCodecPtr |
160 | { |
161 | UInt8 delta_bytes_size = getDeltaBytesSize(column_type); |
162 | if (arguments && !arguments->children.empty()) |
163 | { |
164 | if (arguments->children.size() > 1) |
165 | throw Exception("Delta codec must have 1 parameter, given " + std::to_string(arguments->children.size()), ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE); |
166 | |
167 | const auto children = arguments->children; |
168 | const auto * literal = children[0]->as<ASTLiteral>(); |
169 | size_t user_bytes_size = literal->value.safeGet<UInt64>(); |
170 | if (user_bytes_size != 1 && user_bytes_size != 2 && user_bytes_size != 4 && user_bytes_size != 8) |
171 | throw Exception("Delta value for delta codec can be 1, 2, 4 or 8, given " + toString(user_bytes_size), ErrorCodes::ILLEGAL_CODEC_PARAMETER); |
172 | delta_bytes_size = static_cast<UInt8>(user_bytes_size); |
173 | } |
174 | return std::make_shared<CompressionCodecDelta>(delta_bytes_size); |
175 | }); |
176 | } |
177 | } |
178 | |