1#include "CompressedReadBufferFromFile.h"
2
3#include <IO/createReadBufferFromFileBase.h>
4#include <IO/WriteHelpers.h>
5#include <Compression/CompressionInfo.h>
6#include <Compression/LZ4_decompress_faster.h>
7
8
9namespace DB
10{
11
/// Error codes used in this translation unit; only declared here, defined elsewhere.
namespace ErrorCodes
{
    extern const int SEEK_POSITION_OUT_OF_BOUND;    /// Used by seek() for an offset past the decompressed block.
}
16
17
18bool CompressedReadBufferFromFile::nextImpl()
19{
20 size_t size_decompressed;
21 size_t size_compressed_without_checksum;
22 size_compressed = readCompressedData(size_decompressed, size_compressed_without_checksum);
23 if (!size_compressed)
24 return false;
25
26 memory.resize(size_decompressed + codec->getAdditionalSizeAtTheEndOfBuffer());
27 working_buffer = Buffer(memory.data(), &memory[size_decompressed]);
28
29 decompress(working_buffer.begin(), size_decompressed, size_compressed_without_checksum);
30
31 return true;
32}
33
34
35CompressedReadBufferFromFile::CompressedReadBufferFromFile(
36 const std::string & path, size_t estimated_size, size_t aio_threshold, size_t buf_size)
37 : BufferWithOwnMemory<ReadBuffer>(0),
38 p_file_in(createReadBufferFromFileBase(path, estimated_size, aio_threshold, buf_size)),
39 file_in(*p_file_in)
40{
41 compressed_in = &file_in;
42}
43
44
45void CompressedReadBufferFromFile::seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block)
46{
47 if (size_compressed &&
48 offset_in_compressed_file == file_in.getPositionInFile() - size_compressed &&
49 offset_in_decompressed_block <= working_buffer.size())
50 {
51 bytes += offset();
52 pos = working_buffer.begin() + offset_in_decompressed_block;
53 /// `bytes` can overflow and get negative, but in `count()` everything will overflow back and get right.
54 bytes -= offset();
55 }
56 else
57 {
58 file_in.seek(offset_in_compressed_file);
59
60 bytes += offset();
61 nextImpl();
62
63 if (offset_in_decompressed_block > working_buffer.size())
64 throw Exception("Seek position is beyond the decompressed block"
65 " (pos: " + toString(offset_in_decompressed_block) + ", block size: " + toString(working_buffer.size()) + ")",
66 ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
67
68 pos = working_buffer.begin() + offset_in_decompressed_block;
69 bytes -= offset();
70 }
71}
72
73
74size_t CompressedReadBufferFromFile::readBig(char * to, size_t n)
75{
76 size_t bytes_read = 0;
77
78 /// If there are unread bytes in the buffer, then we copy needed to `to`.
79 if (pos < working_buffer.end())
80 bytes_read += read(to, std::min(static_cast<size_t>(working_buffer.end() - pos), n));
81
82 /// If you need to read more - we will, if possible, decompress at once to `to`.
83 while (bytes_read < n)
84 {
85 size_t size_decompressed = 0;
86 size_t size_compressed_without_checksum = 0;
87
88 size_t new_size_compressed = readCompressedData(size_decompressed, size_compressed_without_checksum);
89 size_compressed = 0; /// file_in no longer points to the end of the block in working_buffer.
90 if (!new_size_compressed)
91 return bytes_read;
92
93 /// If the decompressed block fits entirely where it needs to be copied.
94 if (size_decompressed + codec->getAdditionalSizeAtTheEndOfBuffer() <= n - bytes_read)
95 {
96 decompress(to + bytes_read, size_decompressed, size_compressed_without_checksum);
97 bytes_read += size_decompressed;
98 bytes += size_decompressed;
99 }
100 else
101 {
102 size_compressed = new_size_compressed;
103 bytes += offset();
104 memory.resize(size_decompressed + codec->getAdditionalSizeAtTheEndOfBuffer());
105 working_buffer = Buffer(memory.data(), &memory[size_decompressed]);
106 pos = working_buffer.begin();
107
108 decompress(working_buffer.begin(), size_decompressed, size_compressed_without_checksum);
109
110 bytes_read += read(to + bytes_read, n - bytes_read);
111 break;
112 }
113 }
114
115 return bytes_read;
116}
117
118}
119