1#include <memory>
2#include <city.h>
3#include <string.h>
4
5#include <common/unaligned.h>
6#include <Core/Types.h>
7
8#include "CompressedWriteBuffer.h"
9#include <Compression/CompressionFactory.h>
10
11
12namespace DB
13{
14
/// Error code constants made visible to this file. They are only declared here
/// (`extern`), so their definitions must be provided by another translation unit.
/// NOTE(review): neither code is referenced by the code visible in this file —
/// confirm whether these declarations are still needed.
namespace ErrorCodes
{
    extern const int CANNOT_COMPRESS;
    extern const int UNKNOWN_COMPRESSION_METHOD;
}
20
21static constexpr auto CHECKSUM_SIZE{sizeof(CityHash_v1_0_2::uint128)};
22
23void CompressedWriteBuffer::nextImpl()
24{
25 if (!offset())
26 return;
27
28 size_t decompressed_size = offset();
29 UInt32 compressed_reserve_size = codec->getCompressedReserveSize(decompressed_size);
30 compressed_buffer.resize(compressed_reserve_size);
31 UInt32 compressed_size = codec->compress(working_buffer.begin(), decompressed_size, compressed_buffer.data());
32
33 CityHash_v1_0_2::uint128 checksum = CityHash_v1_0_2::CityHash128(compressed_buffer.data(), compressed_size);
34 out.write(reinterpret_cast<const char *>(&checksum), CHECKSUM_SIZE);
35 out.write(compressed_buffer.data(), compressed_size);
36}
37
38
39CompressedWriteBuffer::CompressedWriteBuffer(
40 WriteBuffer & out_,
41 CompressionCodecPtr codec_,
42 size_t buf_size)
43 : BufferWithOwnMemory<WriteBuffer>(buf_size), out(out_), codec(std::move(codec_))
44{
45}
46
/// Calls next() one last time before the object goes away.
/// Any exception thrown by that call is caught here and handed to
/// tryLogCurrentException() instead of being allowed to leave the destructor.
CompressedWriteBuffer::~CompressedWriteBuffer()
{
    try
    {
        next();
    }
    catch (...)
    {
        /// Catch-all on purpose: nothing may propagate out of the destructor.
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}
58
59}
60