| 1 | #include "duckdb/common/compressed_file_system.hpp" |
| 2 | |
| 3 | namespace duckdb { |
| 4 | |
| 5 | StreamWrapper::~StreamWrapper() { |
| 6 | } |
| 7 | |
| 8 | CompressedFile::CompressedFile(CompressedFileSystem &fs, unique_ptr<FileHandle> child_handle_p, const string &path) |
| 9 | : FileHandle(fs, path), compressed_fs(fs), child_handle(std::move(child_handle_p)) { |
| 10 | } |
| 11 | |
| 12 | CompressedFile::~CompressedFile() { |
| 13 | CompressedFile::Close(); |
| 14 | } |
| 15 | |
| 16 | void CompressedFile::Initialize(bool write) { |
| 17 | Close(); |
| 18 | |
| 19 | this->write = write; |
| 20 | stream_data.in_buf_size = compressed_fs.InBufferSize(); |
| 21 | stream_data.out_buf_size = compressed_fs.OutBufferSize(); |
| 22 | stream_data.in_buff = make_unsafe_uniq_array<data_t>(n: stream_data.in_buf_size); |
| 23 | stream_data.in_buff_start = stream_data.in_buff.get(); |
| 24 | stream_data.in_buff_end = stream_data.in_buff.get(); |
| 25 | stream_data.out_buff = make_unsafe_uniq_array<data_t>(n: stream_data.out_buf_size); |
| 26 | stream_data.out_buff_start = stream_data.out_buff.get(); |
| 27 | stream_data.out_buff_end = stream_data.out_buff.get(); |
| 28 | |
| 29 | stream_wrapper = compressed_fs.CreateStream(); |
| 30 | stream_wrapper->Initialize(file&: *this, write); |
| 31 | } |
| 32 | |
| 33 | int64_t CompressedFile::ReadData(void *buffer, int64_t remaining) { |
| 34 | idx_t total_read = 0; |
| 35 | while (true) { |
| 36 | // first check if there are input bytes available in the output buffers |
| 37 | if (stream_data.out_buff_start != stream_data.out_buff_end) { |
| 38 | // there is! copy it into the output buffer |
| 39 | idx_t available = MinValue<idx_t>(a: remaining, b: stream_data.out_buff_end - stream_data.out_buff_start); |
| 40 | memcpy(dest: data_ptr_t(buffer) + total_read, src: stream_data.out_buff_start, n: available); |
| 41 | |
| 42 | // increment the total read variables as required |
| 43 | stream_data.out_buff_start += available; |
| 44 | total_read += available; |
| 45 | remaining -= available; |
| 46 | if (remaining == 0) { |
| 47 | // done! read enough |
| 48 | return total_read; |
| 49 | } |
| 50 | } |
| 51 | if (!stream_wrapper) { |
| 52 | return total_read; |
| 53 | } |
| 54 | |
| 55 | // ran out of buffer: read more data from the child stream |
| 56 | stream_data.out_buff_start = stream_data.out_buff.get(); |
| 57 | stream_data.out_buff_end = stream_data.out_buff.get(); |
| 58 | D_ASSERT(stream_data.in_buff_start <= stream_data.in_buff_end); |
| 59 | D_ASSERT(stream_data.in_buff_end <= stream_data.in_buff_start + stream_data.in_buf_size); |
| 60 | |
| 61 | // read more input when requested and still data in the input stream |
| 62 | if (stream_data.refresh && (stream_data.in_buff_end == stream_data.in_buff.get() + stream_data.in_buf_size)) { |
| 63 | auto bufrem = stream_data.in_buff_end - stream_data.in_buff_start; |
| 64 | // buffer not empty, move remaining bytes to the beginning |
| 65 | memmove(dest: stream_data.in_buff.get(), src: stream_data.in_buff_start, n: bufrem); |
| 66 | stream_data.in_buff_start = stream_data.in_buff.get(); |
| 67 | // refill the rest of input buffer |
| 68 | auto sz = child_handle->Read(buffer: stream_data.in_buff_start + bufrem, nr_bytes: stream_data.in_buf_size - bufrem); |
| 69 | stream_data.in_buff_end = stream_data.in_buff_start + bufrem + sz; |
| 70 | if (sz <= 0) { |
| 71 | stream_wrapper.reset(); |
| 72 | break; |
| 73 | } |
| 74 | } |
| 75 | |
| 76 | // read more input if none available |
| 77 | if (stream_data.in_buff_start == stream_data.in_buff_end) { |
| 78 | // empty input buffer: refill from the start |
| 79 | stream_data.in_buff_start = stream_data.in_buff.get(); |
| 80 | stream_data.in_buff_end = stream_data.in_buff_start; |
| 81 | auto sz = child_handle->Read(buffer: stream_data.in_buff.get(), nr_bytes: stream_data.in_buf_size); |
| 82 | if (sz <= 0) { |
| 83 | stream_wrapper.reset(); |
| 84 | break; |
| 85 | } |
| 86 | stream_data.in_buff_end = stream_data.in_buff_start + sz; |
| 87 | } |
| 88 | |
| 89 | auto finished = stream_wrapper->Read(stream_data); |
| 90 | if (finished) { |
| 91 | stream_wrapper.reset(); |
| 92 | } |
| 93 | } |
| 94 | return total_read; |
| 95 | } |
| 96 | |
| 97 | int64_t CompressedFile::WriteData(data_ptr_t buffer, int64_t nr_bytes) { |
| 98 | stream_wrapper->Write(file&: *this, stream_data, buffer, nr_bytes); |
| 99 | return nr_bytes; |
| 100 | } |
| 101 | |
| 102 | void CompressedFile::Close() { |
| 103 | if (stream_wrapper) { |
| 104 | stream_wrapper->Close(); |
| 105 | stream_wrapper.reset(); |
| 106 | } |
| 107 | stream_data.in_buff.reset(); |
| 108 | stream_data.out_buff.reset(); |
| 109 | stream_data.out_buff_start = nullptr; |
| 110 | stream_data.out_buff_end = nullptr; |
| 111 | stream_data.in_buff_start = nullptr; |
| 112 | stream_data.in_buff_end = nullptr; |
| 113 | stream_data.in_buf_size = 0; |
| 114 | stream_data.out_buf_size = 0; |
| 115 | } |
| 116 | |
| 117 | int64_t CompressedFileSystem::Read(FileHandle &handle, void *buffer, int64_t nr_bytes) { |
| 118 | auto &compressed_file = handle.Cast<CompressedFile>(); |
| 119 | return compressed_file.ReadData(buffer, remaining: nr_bytes); |
| 120 | } |
| 121 | |
| 122 | int64_t CompressedFileSystem::Write(FileHandle &handle, void *buffer, int64_t nr_bytes) { |
| 123 | auto &compressed_file = handle.Cast<CompressedFile>(); |
| 124 | return compressed_file.WriteData(buffer: data_ptr_cast(src: buffer), nr_bytes); |
| 125 | } |
| 126 | |
| 127 | void CompressedFileSystem::Reset(FileHandle &handle) { |
| 128 | auto &compressed_file = handle.Cast<CompressedFile>(); |
| 129 | compressed_file.child_handle->Reset(); |
| 130 | compressed_file.Initialize(write: compressed_file.write); |
| 131 | } |
| 132 | |
| 133 | int64_t CompressedFileSystem::GetFileSize(FileHandle &handle) { |
| 134 | auto &compressed_file = handle.Cast<CompressedFile>(); |
| 135 | return compressed_file.child_handle->GetFileSize(); |
| 136 | } |
| 137 | |
| 138 | bool CompressedFileSystem::OnDiskFile(FileHandle &handle) { |
| 139 | auto &compressed_file = handle.Cast<CompressedFile>(); |
| 140 | return compressed_file.child_handle->OnDiskFile(); |
| 141 | } |
| 142 | |
| 143 | bool CompressedFileSystem::CanSeek() { |
| 144 | return false; |
| 145 | } |
| 146 | |
| 147 | } // namespace duckdb |
| 148 | |