#include <Compression/CompressionCodecMultiple.h>
#include <Compression/CompressionInfo.h>
#include <Common/PODArray.h>
#include <common/unaligned.h>
#include <Compression/CompressionFactory.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <Common/hex.h>
#include <sstream>
#include <utility>
12 | |
13 | |
14 | namespace DB |
15 | { |
16 | |
17 | |
/// Error codes referenced from this file. They are only declared here
/// (`extern`); their definitions are not part of this translation unit.
namespace ErrorCodes
{
    extern const int UNKNOWN_CODEC;
    extern const int CORRUPTED_DATA;
}
23 | |
24 | CompressionCodecMultiple::CompressionCodecMultiple(Codecs codecs_) |
25 | : codecs(codecs_) |
26 | { |
27 | } |
28 | |
29 | UInt8 CompressionCodecMultiple::getMethodByte() const |
30 | { |
31 | return static_cast<UInt8>(CompressionMethodByte::Multiple); |
32 | } |
33 | |
34 | String CompressionCodecMultiple::getCodecDesc() const |
35 | { |
36 | WriteBufferFromOwnString out; |
37 | for (size_t idx = 0; idx < codecs.size(); ++idx) |
38 | { |
39 | if (idx != 0) |
40 | out << ", " ; |
41 | |
42 | out << codecs[idx]->getCodecDesc(); |
43 | } |
44 | return out.str(); |
45 | } |
46 | |
47 | UInt32 CompressionCodecMultiple::getMaxCompressedDataSize(UInt32 uncompressed_size) const |
48 | { |
49 | UInt32 compressed_size = uncompressed_size; |
50 | for (auto & codec : codecs) |
51 | compressed_size = codec->getCompressedReserveSize(compressed_size); |
52 | |
53 | /// TotalCodecs ByteForEachCodec data |
54 | return sizeof(UInt8) + codecs.size() + compressed_size; |
55 | } |
56 | |
57 | UInt32 CompressionCodecMultiple::doCompressData(const char * source, UInt32 source_size, char * dest) const |
58 | { |
59 | PODArray<char> compressed_buf; |
60 | PODArray<char> uncompressed_buf(source, source + source_size); |
61 | |
62 | dest[0] = static_cast<UInt8>(codecs.size()); |
63 | |
64 | size_t codecs_byte_pos = 1; |
65 | for (size_t idx = 0; idx < codecs.size(); ++idx, ++codecs_byte_pos) |
66 | { |
67 | const auto codec = codecs[idx]; |
68 | dest[codecs_byte_pos] = codec->getMethodByte(); |
69 | compressed_buf.resize(codec->getCompressedReserveSize(source_size)); |
70 | |
71 | UInt32 size_compressed = codec->compress(uncompressed_buf.data(), source_size, compressed_buf.data()); |
72 | |
73 | uncompressed_buf.swap(compressed_buf); |
74 | source_size = size_compressed; |
75 | } |
76 | |
77 | memcpy(&dest[1 + codecs.size()], uncompressed_buf.data(), source_size); |
78 | |
79 | return 1 + codecs.size() + source_size; |
80 | } |
81 | |
82 | void CompressionCodecMultiple::useInfoAboutType(DataTypePtr data_type) |
83 | { |
84 | for (auto & codec : codecs) |
85 | { |
86 | codec->useInfoAboutType(data_type); |
87 | } |
88 | } |
89 | |
90 | void CompressionCodecMultiple::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 decompressed_size) const |
91 | { |
92 | if (source_size < 1 || !source[0]) |
93 | throw Exception("Wrong compression methods list" , ErrorCodes::CORRUPTED_DATA); |
94 | |
95 | UInt8 compression_methods_size = source[0]; |
96 | |
97 | PODArray<char> compressed_buf(&source[compression_methods_size + 1], &source[source_size]); |
98 | PODArray<char> uncompressed_buf; |
99 | /// Insert all data into compressed buf |
100 | source_size -= (compression_methods_size + 1); |
101 | |
102 | for (long idx = compression_methods_size - 1; idx >= 0; --idx) |
103 | { |
104 | UInt8 compression_method = source[idx + 1]; |
105 | const auto codec = CompressionCodecFactory::instance().get(compression_method); |
106 | compressed_buf.resize(compressed_buf.size() + codec->getAdditionalSizeAtTheEndOfBuffer()); |
107 | UInt32 uncompressed_size = ICompressionCodec::readDecompressedBlockSize(compressed_buf.data()); |
108 | |
109 | if (idx == 0 && uncompressed_size != decompressed_size) |
110 | throw Exception("Wrong final decompressed size in codec Multiple, got " + toString(uncompressed_size) + |
111 | ", expected " + toString(decompressed_size), ErrorCodes::CORRUPTED_DATA); |
112 | |
113 | uncompressed_buf.resize(uncompressed_size + codec->getAdditionalSizeAtTheEndOfBuffer()); |
114 | codec->decompress(compressed_buf.data(), source_size, uncompressed_buf.data()); |
115 | uncompressed_buf.swap(compressed_buf); |
116 | source_size = uncompressed_size; |
117 | } |
118 | |
119 | memcpy(dest, compressed_buf.data(), decompressed_size); |
120 | } |
121 | |
122 | void registerCodecMultiple(CompressionCodecFactory & factory) |
123 | { |
124 | factory.registerSimpleCompressionCodec("Multiple" , static_cast<UInt8>(CompressionMethodByte::Multiple), [&] () |
125 | { |
126 | return std::make_shared<CompressionCodecMultiple>(); |
127 | }); |
128 | } |
129 | |
130 | } |
131 | |