1#include "CachedCompressedReadBuffer.h"
2
3#include <IO/createReadBufferFromFileBase.h>
4#include <IO/WriteHelpers.h>
5#include <Compression/CompressionInfo.h>
6#include <Compression/LZ4_decompress_faster.h>
7
8
9namespace DB
10{
11
namespace ErrorCodes
{
    /// Raised by CachedCompressedReadBuffer::seek() when the requested offset
    /// lies past the end of the decompressed block.
    extern const int SEEK_POSITION_OUT_OF_BOUND;
}
16
17
18void CachedCompressedReadBuffer::initInput()
19{
20 if (!file_in)
21 {
22 file_in = createReadBufferFromFileBase(path, estimated_size, aio_threshold, buf_size);
23 compressed_in = file_in.get();
24
25 if (profile_callback)
26 file_in->setProfileCallback(profile_callback, clock_type);
27 }
28}
29
30
31bool CachedCompressedReadBuffer::nextImpl()
32{
33
34 /// Let's check for the presence of a decompressed block in the cache, grab the ownership of this block, if it exists.
35 UInt128 key = cache->hash(path, file_pos);
36 owned_cell = cache->get(key);
37
38 if (!owned_cell)
39 {
40 /// If not, read it from the file.
41 initInput();
42 file_in->seek(file_pos);
43
44 owned_cell = std::make_shared<UncompressedCacheCell>();
45
46 size_t size_decompressed;
47 size_t size_compressed_without_checksum;
48 owned_cell->compressed_size = readCompressedData(size_decompressed, size_compressed_without_checksum);
49
50 if (owned_cell->compressed_size)
51 {
52 owned_cell->additional_bytes = codec->getAdditionalSizeAtTheEndOfBuffer();
53 owned_cell->data.resize(size_decompressed + owned_cell->additional_bytes);
54 decompress(owned_cell->data.data(), size_decompressed, size_compressed_without_checksum);
55
56 }
57
58 /// Put data into cache.
59 /// NOTE: Even if we don't read anything (compressed_size == 0)
60 /// because we can reuse this information and don't reopen file in future
61 cache->set(key, owned_cell);
62 }
63
64 if (owned_cell->data.size() == 0)
65 return false;
66
67 working_buffer = Buffer(owned_cell->data.data(), owned_cell->data.data() + owned_cell->data.size() - owned_cell->additional_bytes);
68
69 file_pos += owned_cell->compressed_size;
70
71 return true;
72}
73
74
75CachedCompressedReadBuffer::CachedCompressedReadBuffer(
76 const std::string & path_, UncompressedCache * cache_, size_t estimated_size_, size_t aio_threshold_,
77 size_t buf_size_)
78 : ReadBuffer(nullptr, 0), path(path_), cache(cache_), buf_size(buf_size_), estimated_size(estimated_size_),
79 aio_threshold(aio_threshold_), file_pos(0)
80{
81}
82
83
84void CachedCompressedReadBuffer::seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block)
85{
86 if (owned_cell &&
87 offset_in_compressed_file == file_pos - owned_cell->compressed_size &&
88 offset_in_decompressed_block <= working_buffer.size())
89 {
90 bytes += offset();
91 pos = working_buffer.begin() + offset_in_decompressed_block;
92 bytes -= offset();
93 }
94 else
95 {
96 file_pos = offset_in_compressed_file;
97
98 bytes += offset();
99 nextImpl();
100
101 if (offset_in_decompressed_block > working_buffer.size())
102 throw Exception("Seek position is beyond the decompressed block"
103 " (pos: " + toString(offset_in_decompressed_block) + ", block size: " + toString(working_buffer.size()) + ")",
104 ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
105
106 pos = working_buffer.begin() + offset_in_decompressed_block;
107 bytes -= offset();
108 }
109}
110
111}
112