1#include <lz4.h>
2#include <string.h>
3#include <optional>
4#include <common/likely.h>
5#include <common/Types.h>
6
7#include <IO/ReadBuffer.h>
8#include <IO/ReadBufferFromFileDescriptor.h>
9#include <IO/WriteBufferFromFileDescriptor.h>
10#include <IO/MMapReadBufferFromFileDescriptor.h>
11#include <IO/HashingWriteBuffer.h>
12#include <IO/BufferWithOwnMemory.h>
13#include <Compression/CompressionInfo.h>
14#include <IO/WriteHelpers.h>
15#include <Compression/LZ4_decompress_faster.h>
16#include <IO/copyData.h>
17#include <Common/PODArray.h>
18#include <Common/Stopwatch.h>
19#include <Common/formatReadable.h>
20#include <Common/memcpySmall.h>
21#include <common/unaligned.h>
22
23
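/** LZ4 decompression speed benchmark. Usage:
  *     for i in *.bin; do ./decompress_perf [variant] < $i; done
  * The timing result is printed to stdout, so do not redirect stdout to /dev/null.
  */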
26
27namespace DB
28{
29
namespace ErrorCodes
{
    /// Error codes defined elsewhere in the project and thrown by the readers below.
    extern const int CANNOT_DECOMPRESS;
    extern const int TOO_LARGE_SIZE_COMPRESSED;
    extern const int UNKNOWN_COMPRESSION_METHOD;
}
36
37class FasterCompressedReadBufferBase
38{
39protected:
40 ReadBuffer * compressed_in;
41
42 /// If 'compressed_in' buffer has whole compressed block - then use it. Otherwise copy parts of data to 'own_compressed_buffer'.
43 PODArray<char> own_compressed_buffer;
44 /// Points to memory, holding compressed block.
45 char * compressed_buffer = nullptr;
46
47 ssize_t variant;
48
49 /// Variant for reference implementation of LZ4.
50 static constexpr ssize_t LZ4_REFERENCE = -3;
51
52 LZ4::StreamStatistics stream_stat;
53 LZ4::PerformanceStatistics perf_stat;
54
55 size_t readCompressedData(size_t & size_decompressed, size_t & size_compressed_without_checksum)
56 {
57 if (compressed_in->eof())
58 return 0;
59
60 CityHash_v1_0_2::uint128 checksum;
61 compressed_in->readStrict(reinterpret_cast<char *>(&checksum), sizeof(checksum));
62
63 own_compressed_buffer.resize(COMPRESSED_BLOCK_HEADER_SIZE);
64 compressed_in->readStrict(&own_compressed_buffer[0], COMPRESSED_BLOCK_HEADER_SIZE);
65
66 UInt8 method = own_compressed_buffer[0]; /// See CompressedWriteBuffer.h
67
68 size_t & size_compressed = size_compressed_without_checksum;
69
70 if (method == static_cast<UInt8>(CompressionMethodByte::LZ4) ||
71 method == static_cast<UInt8>(CompressionMethodByte::ZSTD) ||
72 method == static_cast<UInt8>(CompressionMethodByte::NONE))
73 {
74 size_compressed = unalignedLoad<UInt32>(&own_compressed_buffer[1]);
75 size_decompressed = unalignedLoad<UInt32>(&own_compressed_buffer[5]);
76 }
77 else
78 throw Exception("Unknown compression method: " + toString(method), ErrorCodes::UNKNOWN_COMPRESSION_METHOD);
79
80 if (size_compressed > DBMS_MAX_COMPRESSED_SIZE)
81 throw Exception("Too large size_compressed. Most likely corrupted data.", ErrorCodes::TOO_LARGE_SIZE_COMPRESSED);
82
83 /// Is whole compressed block located in 'compressed_in' buffer?
84 if (compressed_in->offset() >= COMPRESSED_BLOCK_HEADER_SIZE &&
85 compressed_in->position() + size_compressed - COMPRESSED_BLOCK_HEADER_SIZE <= compressed_in->buffer().end())
86 {
87 compressed_in->position() -= COMPRESSED_BLOCK_HEADER_SIZE;
88 compressed_buffer = compressed_in->position();
89 compressed_in->position() += size_compressed;
90 }
91 else
92 {
93 own_compressed_buffer.resize(size_compressed + (variant == LZ4_REFERENCE ? 0 : LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER));
94 compressed_buffer = &own_compressed_buffer[0];
95 compressed_in->readStrict(compressed_buffer + COMPRESSED_BLOCK_HEADER_SIZE, size_compressed - COMPRESSED_BLOCK_HEADER_SIZE);
96 }
97
98 return size_compressed + sizeof(checksum);
99 }
100
101 void decompress(char * to, size_t size_decompressed, size_t size_compressed_without_checksum)
102 {
103 UInt8 method = compressed_buffer[0]; /// See CompressedWriteBuffer.h
104
105 if (method == static_cast<UInt8>(CompressionMethodByte::LZ4))
106 {
107 //LZ4::statistics(compressed_buffer + COMPRESSED_BLOCK_HEADER_SIZE, to, size_decompressed, stat);
108
109 if (variant == LZ4_REFERENCE)
110 {
111 if (LZ4_decompress_fast(compressed_buffer + COMPRESSED_BLOCK_HEADER_SIZE, to, size_decompressed) < 0)
112 throw Exception("Cannot LZ4_decompress_fast", ErrorCodes::CANNOT_DECOMPRESS);
113 }
114 else
115 LZ4::decompress(compressed_buffer + COMPRESSED_BLOCK_HEADER_SIZE, to, size_compressed_without_checksum, size_decompressed, perf_stat);
116 }
117 else
118 throw Exception("Unknown compression method: " + toString(method), ErrorCodes::UNKNOWN_COMPRESSION_METHOD);
119 }
120
121public:
122 /// 'compressed_in' could be initialized lazily, but before first call of 'readCompressedData'.
123 FasterCompressedReadBufferBase(ReadBuffer * in, ssize_t variant)
124 : compressed_in(in), own_compressed_buffer(COMPRESSED_BLOCK_HEADER_SIZE), variant(variant), perf_stat(variant)
125 {
126 }
127
128 LZ4::StreamStatistics getStreamStatistics() const { return stream_stat; }
129 LZ4::PerformanceStatistics getPerformanceStatistics() const { return perf_stat; }
130};
131
132
133class FasterCompressedReadBuffer : public FasterCompressedReadBufferBase, public BufferWithOwnMemory<ReadBuffer>
134{
135private:
136 size_t size_compressed = 0;
137
138 bool nextImpl() override
139 {
140 size_t size_decompressed;
141 size_t size_compressed_without_checksum;
142 size_compressed = readCompressedData(size_decompressed, size_compressed_without_checksum);
143 if (!size_compressed)
144 return false;
145
146 memory.resize(size_decompressed + LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER);
147 working_buffer = Buffer(&memory[0], &memory[size_decompressed]);
148
149 decompress(working_buffer.begin(), size_decompressed, size_compressed_without_checksum);
150
151 return true;
152 }
153
154public:
155 FasterCompressedReadBuffer(ReadBuffer & in_, ssize_t method)
156 : FasterCompressedReadBufferBase(&in_, method), BufferWithOwnMemory<ReadBuffer>(0)
157 {
158 }
159};
160
161}
162
163
164int main(int argc, char ** argv)
165try
166{
167 using namespace DB;
168
169 /** -3 - use reference implementation of LZ4
170 * -2 - run all algorithms in round robin fashion
171 * -1 - automatically detect best algorithm based on statistics
172 * 0..3 - run specified algorithm
173 */
174 ssize_t variant = argc < 2 ? -1 : parse<ssize_t>(argv[1]);
175
176 MMapReadBufferFromFileDescriptor in(STDIN_FILENO, 0);
177// ReadBufferFromFileDescriptor in(STDIN_FILENO);
178 FasterCompressedReadBuffer decompressing_in(in, variant);
179// WriteBufferFromFileDescriptor out(STDOUT_FILENO);
180// HashingWriteBuffer hashing_out(out);
181
182 Stopwatch watch;
183// copyData(decompressing_in, /*hashing_*/out);
184 while (!decompressing_in.eof())
185 {
186 decompressing_in.position() = decompressing_in.buffer().end();
187 decompressing_in.next();
188 }
189 watch.stop();
190
191 std::cout << std::fixed << std::setprecision(3)
192 << watch.elapsed() * 1000 / decompressing_in.count()
193 << '\n';
194
195/*
196// auto hash = hashing_out.getHash();
197
198 double seconds = watch.elapsedSeconds();
199 std::cerr << std::fixed << std::setprecision(3)
200 << "Elapsed: " << seconds
201 << ", " << formatReadableSizeWithBinarySuffix(in.count()) << " compressed"
202 << ", " << formatReadableSizeWithBinarySuffix(decompressing_in.count()) << " decompressed"
203 << ", ratio: " << static_cast<double>(decompressing_in.count()) / in.count()
204 << ", " << formatReadableSizeWithBinarySuffix(in.count() / seconds) << "/sec. compressed"
205 << ", " << formatReadableSizeWithBinarySuffix(decompressing_in.count() / seconds) << "/sec. decompressed"
206// << ", checksum: " << hash.first << "_" << hash.second
207 << "\n";
208
209// decompressing_in.getStatistics().print();
210
211 LZ4::PerformanceStatistics perf_stat = decompressing_in.getPerformanceStatistics();
212
213 std::optional<size_t> best_variant;
214 double best_variant_mean = 0;
215
216 for (size_t i = 0; i < LZ4::PerformanceStatistics::NUM_ELEMENTS; ++i)
217 {
218 const LZ4::PerformanceStatistics::Element & elem = perf_stat.data[i];
219
220 if (elem.count)
221 {
222 double mean = elem.mean();
223
224 std::cerr << "Variant " << i << ": "
225 << "count: " << elem.count
226 << ", mean ns/b: " << 1000000000.0 * mean << " (" << formatReadableSizeWithBinarySuffix(1 / mean) << "/sec.)"
227 << ", sigma ns/b: " << 1000000000.0 * elem.sigma()
228 << "\n";
229
230 if (!best_variant || mean < best_variant_mean)
231 {
232 best_variant_mean = mean;
233 best_variant = i;
234 }
235 }
236 }
237
238 if (best_variant)
239 std::cerr << "Best variant: " << *best_variant << "\n";
240*/
241
242 return 0;
243}
244catch (...)
245{
246 std::cerr << DB::getCurrentExceptionMessage(true);
247 return DB::getCurrentExceptionCode();
248}
249