#include <Compression/CompressionCodecMultiple.h>
#include <Compression/CompressionInfo.h>
#include <Common/PODArray.h>
#include <common/unaligned.h>
#include <Compression/CompressionFactory.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <Common/hex.h>
#include <sstream>
#include <utility>
| 12 | |
| 13 | |
| 14 | namespace DB |
| 15 | { |
| 16 | |
| 17 | |
/// Error codes defined elsewhere in the project and referenced by this translation unit.
namespace ErrorCodes
{
    extern const int CORRUPTED_DATA;
    extern const int UNKNOWN_CODEC;
}
| 23 | |
| 24 | CompressionCodecMultiple::CompressionCodecMultiple(Codecs codecs_) |
| 25 | : codecs(codecs_) |
| 26 | { |
| 27 | } |
| 28 | |
| 29 | UInt8 CompressionCodecMultiple::getMethodByte() const |
| 30 | { |
| 31 | return static_cast<UInt8>(CompressionMethodByte::Multiple); |
| 32 | } |
| 33 | |
| 34 | String CompressionCodecMultiple::getCodecDesc() const |
| 35 | { |
| 36 | WriteBufferFromOwnString out; |
| 37 | for (size_t idx = 0; idx < codecs.size(); ++idx) |
| 38 | { |
| 39 | if (idx != 0) |
| 40 | out << ", " ; |
| 41 | |
| 42 | out << codecs[idx]->getCodecDesc(); |
| 43 | } |
| 44 | return out.str(); |
| 45 | } |
| 46 | |
| 47 | UInt32 CompressionCodecMultiple::getMaxCompressedDataSize(UInt32 uncompressed_size) const |
| 48 | { |
| 49 | UInt32 compressed_size = uncompressed_size; |
| 50 | for (auto & codec : codecs) |
| 51 | compressed_size = codec->getCompressedReserveSize(compressed_size); |
| 52 | |
| 53 | /// TotalCodecs ByteForEachCodec data |
| 54 | return sizeof(UInt8) + codecs.size() + compressed_size; |
| 55 | } |
| 56 | |
| 57 | UInt32 CompressionCodecMultiple::doCompressData(const char * source, UInt32 source_size, char * dest) const |
| 58 | { |
| 59 | PODArray<char> compressed_buf; |
| 60 | PODArray<char> uncompressed_buf(source, source + source_size); |
| 61 | |
| 62 | dest[0] = static_cast<UInt8>(codecs.size()); |
| 63 | |
| 64 | size_t codecs_byte_pos = 1; |
| 65 | for (size_t idx = 0; idx < codecs.size(); ++idx, ++codecs_byte_pos) |
| 66 | { |
| 67 | const auto codec = codecs[idx]; |
| 68 | dest[codecs_byte_pos] = codec->getMethodByte(); |
| 69 | compressed_buf.resize(codec->getCompressedReserveSize(source_size)); |
| 70 | |
| 71 | UInt32 size_compressed = codec->compress(uncompressed_buf.data(), source_size, compressed_buf.data()); |
| 72 | |
| 73 | uncompressed_buf.swap(compressed_buf); |
| 74 | source_size = size_compressed; |
| 75 | } |
| 76 | |
| 77 | memcpy(&dest[1 + codecs.size()], uncompressed_buf.data(), source_size); |
| 78 | |
| 79 | return 1 + codecs.size() + source_size; |
| 80 | } |
| 81 | |
| 82 | void CompressionCodecMultiple::useInfoAboutType(DataTypePtr data_type) |
| 83 | { |
| 84 | for (auto & codec : codecs) |
| 85 | { |
| 86 | codec->useInfoAboutType(data_type); |
| 87 | } |
| 88 | } |
| 89 | |
| 90 | void CompressionCodecMultiple::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 decompressed_size) const |
| 91 | { |
| 92 | if (source_size < 1 || !source[0]) |
| 93 | throw Exception("Wrong compression methods list" , ErrorCodes::CORRUPTED_DATA); |
| 94 | |
| 95 | UInt8 compression_methods_size = source[0]; |
| 96 | |
| 97 | PODArray<char> compressed_buf(&source[compression_methods_size + 1], &source[source_size]); |
| 98 | PODArray<char> uncompressed_buf; |
| 99 | /// Insert all data into compressed buf |
| 100 | source_size -= (compression_methods_size + 1); |
| 101 | |
| 102 | for (long idx = compression_methods_size - 1; idx >= 0; --idx) |
| 103 | { |
| 104 | UInt8 compression_method = source[idx + 1]; |
| 105 | const auto codec = CompressionCodecFactory::instance().get(compression_method); |
| 106 | compressed_buf.resize(compressed_buf.size() + codec->getAdditionalSizeAtTheEndOfBuffer()); |
| 107 | UInt32 uncompressed_size = ICompressionCodec::readDecompressedBlockSize(compressed_buf.data()); |
| 108 | |
| 109 | if (idx == 0 && uncompressed_size != decompressed_size) |
| 110 | throw Exception("Wrong final decompressed size in codec Multiple, got " + toString(uncompressed_size) + |
| 111 | ", expected " + toString(decompressed_size), ErrorCodes::CORRUPTED_DATA); |
| 112 | |
| 113 | uncompressed_buf.resize(uncompressed_size + codec->getAdditionalSizeAtTheEndOfBuffer()); |
| 114 | codec->decompress(compressed_buf.data(), source_size, uncompressed_buf.data()); |
| 115 | uncompressed_buf.swap(compressed_buf); |
| 116 | source_size = uncompressed_size; |
| 117 | } |
| 118 | |
| 119 | memcpy(dest, compressed_buf.data(), decompressed_size); |
| 120 | } |
| 121 | |
| 122 | void registerCodecMultiple(CompressionCodecFactory & factory) |
| 123 | { |
| 124 | factory.registerSimpleCompressionCodec("Multiple" , static_cast<UInt8>(CompressionMethodByte::Multiple), [&] () |
| 125 | { |
| 126 | return std::make_shared<CompressionCodecMultiple>(); |
| 127 | }); |
| 128 | } |
| 129 | |
| 130 | } |
| 131 | |