1#include <Compression/CompressionCodecMultiple.h>
2#include <Compression/CompressionInfo.h>
3#include <Common/PODArray.h>
4#include <common/unaligned.h>
5#include <Compression/CompressionFactory.h>
6#include <IO/ReadHelpers.h>
7#include <IO/WriteHelpers.h>
8#include <IO/WriteBufferFromString.h>
9#include <IO/Operators.h>
10#include <Common/hex.h>
11#include <sstream>
12
13
14namespace DB
15{
16
17
18namespace ErrorCodes
19{
20extern const int UNKNOWN_CODEC;
21extern const int CORRUPTED_DATA;
22}
23
24CompressionCodecMultiple::CompressionCodecMultiple(Codecs codecs_)
25 : codecs(codecs_)
26{
27}
28
29UInt8 CompressionCodecMultiple::getMethodByte() const
30{
31 return static_cast<UInt8>(CompressionMethodByte::Multiple);
32}
33
34String CompressionCodecMultiple::getCodecDesc() const
35{
36 WriteBufferFromOwnString out;
37 for (size_t idx = 0; idx < codecs.size(); ++idx)
38 {
39 if (idx != 0)
40 out << ", ";
41
42 out << codecs[idx]->getCodecDesc();
43 }
44 return out.str();
45}
46
47UInt32 CompressionCodecMultiple::getMaxCompressedDataSize(UInt32 uncompressed_size) const
48{
49 UInt32 compressed_size = uncompressed_size;
50 for (auto & codec : codecs)
51 compressed_size = codec->getCompressedReserveSize(compressed_size);
52
53 /// TotalCodecs ByteForEachCodec data
54 return sizeof(UInt8) + codecs.size() + compressed_size;
55}
56
57UInt32 CompressionCodecMultiple::doCompressData(const char * source, UInt32 source_size, char * dest) const
58{
59 PODArray<char> compressed_buf;
60 PODArray<char> uncompressed_buf(source, source + source_size);
61
62 dest[0] = static_cast<UInt8>(codecs.size());
63
64 size_t codecs_byte_pos = 1;
65 for (size_t idx = 0; idx < codecs.size(); ++idx, ++codecs_byte_pos)
66 {
67 const auto codec = codecs[idx];
68 dest[codecs_byte_pos] = codec->getMethodByte();
69 compressed_buf.resize(codec->getCompressedReserveSize(source_size));
70
71 UInt32 size_compressed = codec->compress(uncompressed_buf.data(), source_size, compressed_buf.data());
72
73 uncompressed_buf.swap(compressed_buf);
74 source_size = size_compressed;
75 }
76
77 memcpy(&dest[1 + codecs.size()], uncompressed_buf.data(), source_size);
78
79 return 1 + codecs.size() + source_size;
80}
81
82void CompressionCodecMultiple::useInfoAboutType(DataTypePtr data_type)
83{
84 for (auto & codec : codecs)
85 {
86 codec->useInfoAboutType(data_type);
87 }
88}
89
90void CompressionCodecMultiple::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 decompressed_size) const
91{
92 if (source_size < 1 || !source[0])
93 throw Exception("Wrong compression methods list", ErrorCodes::CORRUPTED_DATA);
94
95 UInt8 compression_methods_size = source[0];
96
97 PODArray<char> compressed_buf(&source[compression_methods_size + 1], &source[source_size]);
98 PODArray<char> uncompressed_buf;
99 /// Insert all data into compressed buf
100 source_size -= (compression_methods_size + 1);
101
102 for (long idx = compression_methods_size - 1; idx >= 0; --idx)
103 {
104 UInt8 compression_method = source[idx + 1];
105 const auto codec = CompressionCodecFactory::instance().get(compression_method);
106 compressed_buf.resize(compressed_buf.size() + codec->getAdditionalSizeAtTheEndOfBuffer());
107 UInt32 uncompressed_size = ICompressionCodec::readDecompressedBlockSize(compressed_buf.data());
108
109 if (idx == 0 && uncompressed_size != decompressed_size)
110 throw Exception("Wrong final decompressed size in codec Multiple, got " + toString(uncompressed_size) +
111 ", expected " + toString(decompressed_size), ErrorCodes::CORRUPTED_DATA);
112
113 uncompressed_buf.resize(uncompressed_size + codec->getAdditionalSizeAtTheEndOfBuffer());
114 codec->decompress(compressed_buf.data(), source_size, uncompressed_buf.data());
115 uncompressed_buf.swap(compressed_buf);
116 source_size = uncompressed_size;
117 }
118
119 memcpy(dest, compressed_buf.data(), decompressed_size);
120}
121
122void registerCodecMultiple(CompressionCodecFactory & factory)
123{
124 factory.registerSimpleCompressionCodec("Multiple", static_cast<UInt8>(CompressionMethodByte::Multiple), [&] ()
125 {
126 return std::make_shared<CompressionCodecMultiple>();
127 });
128}
129
130}
131