1 | #include "duckdb/common/compressed_file_system.hpp" |
2 | |
3 | namespace duckdb { |
4 | |
5 | StreamWrapper::~StreamWrapper() { |
6 | } |
7 | |
8 | CompressedFile::CompressedFile(CompressedFileSystem &fs, unique_ptr<FileHandle> child_handle_p, const string &path) |
9 | : FileHandle(fs, path), compressed_fs(fs), child_handle(std::move(child_handle_p)) { |
10 | } |
11 | |
12 | CompressedFile::~CompressedFile() { |
13 | CompressedFile::Close(); |
14 | } |
15 | |
16 | void CompressedFile::Initialize(bool write) { |
17 | Close(); |
18 | |
19 | this->write = write; |
20 | stream_data.in_buf_size = compressed_fs.InBufferSize(); |
21 | stream_data.out_buf_size = compressed_fs.OutBufferSize(); |
22 | stream_data.in_buff = make_unsafe_uniq_array<data_t>(n: stream_data.in_buf_size); |
23 | stream_data.in_buff_start = stream_data.in_buff.get(); |
24 | stream_data.in_buff_end = stream_data.in_buff.get(); |
25 | stream_data.out_buff = make_unsafe_uniq_array<data_t>(n: stream_data.out_buf_size); |
26 | stream_data.out_buff_start = stream_data.out_buff.get(); |
27 | stream_data.out_buff_end = stream_data.out_buff.get(); |
28 | |
29 | stream_wrapper = compressed_fs.CreateStream(); |
30 | stream_wrapper->Initialize(file&: *this, write); |
31 | } |
32 | |
33 | int64_t CompressedFile::ReadData(void *buffer, int64_t remaining) { |
34 | idx_t total_read = 0; |
35 | while (true) { |
36 | // first check if there are input bytes available in the output buffers |
37 | if (stream_data.out_buff_start != stream_data.out_buff_end) { |
38 | // there is! copy it into the output buffer |
39 | idx_t available = MinValue<idx_t>(a: remaining, b: stream_data.out_buff_end - stream_data.out_buff_start); |
40 | memcpy(dest: data_ptr_t(buffer) + total_read, src: stream_data.out_buff_start, n: available); |
41 | |
42 | // increment the total read variables as required |
43 | stream_data.out_buff_start += available; |
44 | total_read += available; |
45 | remaining -= available; |
46 | if (remaining == 0) { |
47 | // done! read enough |
48 | return total_read; |
49 | } |
50 | } |
51 | if (!stream_wrapper) { |
52 | return total_read; |
53 | } |
54 | |
55 | // ran out of buffer: read more data from the child stream |
56 | stream_data.out_buff_start = stream_data.out_buff.get(); |
57 | stream_data.out_buff_end = stream_data.out_buff.get(); |
58 | D_ASSERT(stream_data.in_buff_start <= stream_data.in_buff_end); |
59 | D_ASSERT(stream_data.in_buff_end <= stream_data.in_buff_start + stream_data.in_buf_size); |
60 | |
61 | // read more input when requested and still data in the input stream |
62 | if (stream_data.refresh && (stream_data.in_buff_end == stream_data.in_buff.get() + stream_data.in_buf_size)) { |
63 | auto bufrem = stream_data.in_buff_end - stream_data.in_buff_start; |
64 | // buffer not empty, move remaining bytes to the beginning |
65 | memmove(dest: stream_data.in_buff.get(), src: stream_data.in_buff_start, n: bufrem); |
66 | stream_data.in_buff_start = stream_data.in_buff.get(); |
67 | // refill the rest of input buffer |
68 | auto sz = child_handle->Read(buffer: stream_data.in_buff_start + bufrem, nr_bytes: stream_data.in_buf_size - bufrem); |
69 | stream_data.in_buff_end = stream_data.in_buff_start + bufrem + sz; |
70 | if (sz <= 0) { |
71 | stream_wrapper.reset(); |
72 | break; |
73 | } |
74 | } |
75 | |
76 | // read more input if none available |
77 | if (stream_data.in_buff_start == stream_data.in_buff_end) { |
78 | // empty input buffer: refill from the start |
79 | stream_data.in_buff_start = stream_data.in_buff.get(); |
80 | stream_data.in_buff_end = stream_data.in_buff_start; |
81 | auto sz = child_handle->Read(buffer: stream_data.in_buff.get(), nr_bytes: stream_data.in_buf_size); |
82 | if (sz <= 0) { |
83 | stream_wrapper.reset(); |
84 | break; |
85 | } |
86 | stream_data.in_buff_end = stream_data.in_buff_start + sz; |
87 | } |
88 | |
89 | auto finished = stream_wrapper->Read(stream_data); |
90 | if (finished) { |
91 | stream_wrapper.reset(); |
92 | } |
93 | } |
94 | return total_read; |
95 | } |
96 | |
97 | int64_t CompressedFile::WriteData(data_ptr_t buffer, int64_t nr_bytes) { |
98 | stream_wrapper->Write(file&: *this, stream_data, buffer, nr_bytes); |
99 | return nr_bytes; |
100 | } |
101 | |
102 | void CompressedFile::Close() { |
103 | if (stream_wrapper) { |
104 | stream_wrapper->Close(); |
105 | stream_wrapper.reset(); |
106 | } |
107 | stream_data.in_buff.reset(); |
108 | stream_data.out_buff.reset(); |
109 | stream_data.out_buff_start = nullptr; |
110 | stream_data.out_buff_end = nullptr; |
111 | stream_data.in_buff_start = nullptr; |
112 | stream_data.in_buff_end = nullptr; |
113 | stream_data.in_buf_size = 0; |
114 | stream_data.out_buf_size = 0; |
115 | } |
116 | |
117 | int64_t CompressedFileSystem::Read(FileHandle &handle, void *buffer, int64_t nr_bytes) { |
118 | auto &compressed_file = handle.Cast<CompressedFile>(); |
119 | return compressed_file.ReadData(buffer, remaining: nr_bytes); |
120 | } |
121 | |
122 | int64_t CompressedFileSystem::Write(FileHandle &handle, void *buffer, int64_t nr_bytes) { |
123 | auto &compressed_file = handle.Cast<CompressedFile>(); |
124 | return compressed_file.WriteData(buffer: data_ptr_cast(src: buffer), nr_bytes); |
125 | } |
126 | |
127 | void CompressedFileSystem::Reset(FileHandle &handle) { |
128 | auto &compressed_file = handle.Cast<CompressedFile>(); |
129 | compressed_file.child_handle->Reset(); |
130 | compressed_file.Initialize(write: compressed_file.write); |
131 | } |
132 | |
133 | int64_t CompressedFileSystem::GetFileSize(FileHandle &handle) { |
134 | auto &compressed_file = handle.Cast<CompressedFile>(); |
135 | return compressed_file.child_handle->GetFileSize(); |
136 | } |
137 | |
138 | bool CompressedFileSystem::OnDiskFile(FileHandle &handle) { |
139 | auto &compressed_file = handle.Cast<CompressedFile>(); |
140 | return compressed_file.child_handle->OnDiskFile(); |
141 | } |
142 | |
143 | bool CompressedFileSystem::CanSeek() { |
144 | return false; |
145 | } |
146 | |
147 | } // namespace duckdb |
148 | |