1 | #include "CachedCompressedReadBuffer.h" |
2 | |
3 | #include <IO/createReadBufferFromFileBase.h> |
4 | #include <IO/WriteHelpers.h> |
5 | #include <Compression/CompressionInfo.h> |
6 | #include <Compression/LZ4_decompress_faster.h> |
7 | |
8 | |
9 | namespace DB |
10 | { |
11 | |
/// Error codes referenced in this file. They are only declared here; the definitions live elsewhere.
namespace ErrorCodes
{
    /// Used by seek() when the requested offset lies past the end of the decompressed block.
    extern const int SEEK_POSITION_OUT_OF_BOUND;
}
16 | |
17 | |
18 | void CachedCompressedReadBuffer::initInput() |
19 | { |
20 | if (!file_in) |
21 | { |
22 | file_in = createReadBufferFromFileBase(path, estimated_size, aio_threshold, buf_size); |
23 | compressed_in = file_in.get(); |
24 | |
25 | if (profile_callback) |
26 | file_in->setProfileCallback(profile_callback, clock_type); |
27 | } |
28 | } |
29 | |
30 | |
31 | bool CachedCompressedReadBuffer::nextImpl() |
32 | { |
33 | |
34 | /// Let's check for the presence of a decompressed block in the cache, grab the ownership of this block, if it exists. |
35 | UInt128 key = cache->hash(path, file_pos); |
36 | owned_cell = cache->get(key); |
37 | |
38 | if (!owned_cell) |
39 | { |
40 | /// If not, read it from the file. |
41 | initInput(); |
42 | file_in->seek(file_pos); |
43 | |
44 | owned_cell = std::make_shared<UncompressedCacheCell>(); |
45 | |
46 | size_t size_decompressed; |
47 | size_t size_compressed_without_checksum; |
48 | owned_cell->compressed_size = readCompressedData(size_decompressed, size_compressed_without_checksum); |
49 | |
50 | if (owned_cell->compressed_size) |
51 | { |
52 | owned_cell->additional_bytes = codec->getAdditionalSizeAtTheEndOfBuffer(); |
53 | owned_cell->data.resize(size_decompressed + owned_cell->additional_bytes); |
54 | decompress(owned_cell->data.data(), size_decompressed, size_compressed_without_checksum); |
55 | |
56 | } |
57 | |
58 | /// Put data into cache. |
59 | /// NOTE: Even if we don't read anything (compressed_size == 0) |
60 | /// because we can reuse this information and don't reopen file in future |
61 | cache->set(key, owned_cell); |
62 | } |
63 | |
64 | if (owned_cell->data.size() == 0) |
65 | return false; |
66 | |
67 | working_buffer = Buffer(owned_cell->data.data(), owned_cell->data.data() + owned_cell->data.size() - owned_cell->additional_bytes); |
68 | |
69 | file_pos += owned_cell->compressed_size; |
70 | |
71 | return true; |
72 | } |
73 | |
74 | |
75 | CachedCompressedReadBuffer::CachedCompressedReadBuffer( |
76 | const std::string & path_, UncompressedCache * cache_, size_t estimated_size_, size_t aio_threshold_, |
77 | size_t buf_size_) |
78 | : ReadBuffer(nullptr, 0), path(path_), cache(cache_), buf_size(buf_size_), estimated_size(estimated_size_), |
79 | aio_threshold(aio_threshold_), file_pos(0) |
80 | { |
81 | } |
82 | |
83 | |
84 | void CachedCompressedReadBuffer::seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block) |
85 | { |
86 | if (owned_cell && |
87 | offset_in_compressed_file == file_pos - owned_cell->compressed_size && |
88 | offset_in_decompressed_block <= working_buffer.size()) |
89 | { |
90 | bytes += offset(); |
91 | pos = working_buffer.begin() + offset_in_decompressed_block; |
92 | bytes -= offset(); |
93 | } |
94 | else |
95 | { |
96 | file_pos = offset_in_compressed_file; |
97 | |
98 | bytes += offset(); |
99 | nextImpl(); |
100 | |
101 | if (offset_in_decompressed_block > working_buffer.size()) |
102 | throw Exception("Seek position is beyond the decompressed block" |
103 | " (pos: " + toString(offset_in_decompressed_block) + ", block size: " + toString(working_buffer.size()) + ")" , |
104 | ErrorCodes::SEEK_POSITION_OUT_OF_BOUND); |
105 | |
106 | pos = working_buffer.begin() + offset_in_decompressed_block; |
107 | bytes -= offset(); |
108 | } |
109 | } |
110 | |
111 | } |
112 | |