| 1 | #include "CachedCompressedReadBuffer.h" |
| 2 | |
| 3 | #include <IO/createReadBufferFromFileBase.h> |
| 4 | #include <IO/WriteHelpers.h> |
| 5 | #include <Compression/CompressionInfo.h> |
| 6 | #include <Compression/LZ4_decompress_faster.h> |
| 7 | |
| 8 | |
| 9 | namespace DB |
| 10 | { |
| 11 | |
| 12 | namespace ErrorCodes |
| 13 | { |
| 14 | extern const int SEEK_POSITION_OUT_OF_BOUND; |
| 15 | } |
| 16 | |
| 17 | |
| 18 | void CachedCompressedReadBuffer::initInput() |
| 19 | { |
| 20 | if (!file_in) |
| 21 | { |
| 22 | file_in = createReadBufferFromFileBase(path, estimated_size, aio_threshold, buf_size); |
| 23 | compressed_in = file_in.get(); |
| 24 | |
| 25 | if (profile_callback) |
| 26 | file_in->setProfileCallback(profile_callback, clock_type); |
| 27 | } |
| 28 | } |
| 29 | |
| 30 | |
| 31 | bool CachedCompressedReadBuffer::nextImpl() |
| 32 | { |
| 33 | |
| 34 | /// Let's check for the presence of a decompressed block in the cache, grab the ownership of this block, if it exists. |
| 35 | UInt128 key = cache->hash(path, file_pos); |
| 36 | owned_cell = cache->get(key); |
| 37 | |
| 38 | if (!owned_cell) |
| 39 | { |
| 40 | /// If not, read it from the file. |
| 41 | initInput(); |
| 42 | file_in->seek(file_pos); |
| 43 | |
| 44 | owned_cell = std::make_shared<UncompressedCacheCell>(); |
| 45 | |
| 46 | size_t size_decompressed; |
| 47 | size_t size_compressed_without_checksum; |
| 48 | owned_cell->compressed_size = readCompressedData(size_decompressed, size_compressed_without_checksum); |
| 49 | |
| 50 | if (owned_cell->compressed_size) |
| 51 | { |
| 52 | owned_cell->additional_bytes = codec->getAdditionalSizeAtTheEndOfBuffer(); |
| 53 | owned_cell->data.resize(size_decompressed + owned_cell->additional_bytes); |
| 54 | decompress(owned_cell->data.data(), size_decompressed, size_compressed_without_checksum); |
| 55 | |
| 56 | } |
| 57 | |
| 58 | /// Put data into cache. |
| 59 | /// NOTE: Even if we don't read anything (compressed_size == 0) |
| 60 | /// because we can reuse this information and don't reopen file in future |
| 61 | cache->set(key, owned_cell); |
| 62 | } |
| 63 | |
| 64 | if (owned_cell->data.size() == 0) |
| 65 | return false; |
| 66 | |
| 67 | working_buffer = Buffer(owned_cell->data.data(), owned_cell->data.data() + owned_cell->data.size() - owned_cell->additional_bytes); |
| 68 | |
| 69 | file_pos += owned_cell->compressed_size; |
| 70 | |
| 71 | return true; |
| 72 | } |
| 73 | |
| 74 | |
| 75 | CachedCompressedReadBuffer::CachedCompressedReadBuffer( |
| 76 | const std::string & path_, UncompressedCache * cache_, size_t estimated_size_, size_t aio_threshold_, |
| 77 | size_t buf_size_) |
| 78 | : ReadBuffer(nullptr, 0), path(path_), cache(cache_), buf_size(buf_size_), estimated_size(estimated_size_), |
| 79 | aio_threshold(aio_threshold_), file_pos(0) |
| 80 | { |
| 81 | } |
| 82 | |
| 83 | |
| 84 | void CachedCompressedReadBuffer::seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block) |
| 85 | { |
| 86 | if (owned_cell && |
| 87 | offset_in_compressed_file == file_pos - owned_cell->compressed_size && |
| 88 | offset_in_decompressed_block <= working_buffer.size()) |
| 89 | { |
| 90 | bytes += offset(); |
| 91 | pos = working_buffer.begin() + offset_in_decompressed_block; |
| 92 | bytes -= offset(); |
| 93 | } |
| 94 | else |
| 95 | { |
| 96 | file_pos = offset_in_compressed_file; |
| 97 | |
| 98 | bytes += offset(); |
| 99 | nextImpl(); |
| 100 | |
| 101 | if (offset_in_decompressed_block > working_buffer.size()) |
| 102 | throw Exception("Seek position is beyond the decompressed block" |
| 103 | " (pos: " + toString(offset_in_decompressed_block) + ", block size: " + toString(working_buffer.size()) + ")" , |
| 104 | ErrorCodes::SEEK_POSITION_OUT_OF_BOUND); |
| 105 | |
| 106 | pos = working_buffer.begin() + offset_in_decompressed_block; |
| 107 | bytes -= offset(); |
| 108 | } |
| 109 | } |
| 110 | |
| 111 | } |
| 112 | |