1#include "duckdb/common/compressed_file_system.hpp"
2
3namespace duckdb {
4
5StreamWrapper::~StreamWrapper() {
6}
7
8CompressedFile::CompressedFile(CompressedFileSystem &fs, unique_ptr<FileHandle> child_handle_p, const string &path)
9 : FileHandle(fs, path), compressed_fs(fs), child_handle(std::move(child_handle_p)) {
10}
11
12CompressedFile::~CompressedFile() {
13 CompressedFile::Close();
14}
15
16void CompressedFile::Initialize(bool write) {
17 Close();
18
19 this->write = write;
20 stream_data.in_buf_size = compressed_fs.InBufferSize();
21 stream_data.out_buf_size = compressed_fs.OutBufferSize();
22 stream_data.in_buff = make_unsafe_uniq_array<data_t>(n: stream_data.in_buf_size);
23 stream_data.in_buff_start = stream_data.in_buff.get();
24 stream_data.in_buff_end = stream_data.in_buff.get();
25 stream_data.out_buff = make_unsafe_uniq_array<data_t>(n: stream_data.out_buf_size);
26 stream_data.out_buff_start = stream_data.out_buff.get();
27 stream_data.out_buff_end = stream_data.out_buff.get();
28
29 stream_wrapper = compressed_fs.CreateStream();
30 stream_wrapper->Initialize(file&: *this, write);
31}
32
33int64_t CompressedFile::ReadData(void *buffer, int64_t remaining) {
34 idx_t total_read = 0;
35 while (true) {
36 // first check if there are input bytes available in the output buffers
37 if (stream_data.out_buff_start != stream_data.out_buff_end) {
38 // there is! copy it into the output buffer
39 idx_t available = MinValue<idx_t>(a: remaining, b: stream_data.out_buff_end - stream_data.out_buff_start);
40 memcpy(dest: data_ptr_t(buffer) + total_read, src: stream_data.out_buff_start, n: available);
41
42 // increment the total read variables as required
43 stream_data.out_buff_start += available;
44 total_read += available;
45 remaining -= available;
46 if (remaining == 0) {
47 // done! read enough
48 return total_read;
49 }
50 }
51 if (!stream_wrapper) {
52 return total_read;
53 }
54
55 // ran out of buffer: read more data from the child stream
56 stream_data.out_buff_start = stream_data.out_buff.get();
57 stream_data.out_buff_end = stream_data.out_buff.get();
58 D_ASSERT(stream_data.in_buff_start <= stream_data.in_buff_end);
59 D_ASSERT(stream_data.in_buff_end <= stream_data.in_buff_start + stream_data.in_buf_size);
60
61 // read more input when requested and still data in the input stream
62 if (stream_data.refresh && (stream_data.in_buff_end == stream_data.in_buff.get() + stream_data.in_buf_size)) {
63 auto bufrem = stream_data.in_buff_end - stream_data.in_buff_start;
64 // buffer not empty, move remaining bytes to the beginning
65 memmove(dest: stream_data.in_buff.get(), src: stream_data.in_buff_start, n: bufrem);
66 stream_data.in_buff_start = stream_data.in_buff.get();
67 // refill the rest of input buffer
68 auto sz = child_handle->Read(buffer: stream_data.in_buff_start + bufrem, nr_bytes: stream_data.in_buf_size - bufrem);
69 stream_data.in_buff_end = stream_data.in_buff_start + bufrem + sz;
70 if (sz <= 0) {
71 stream_wrapper.reset();
72 break;
73 }
74 }
75
76 // read more input if none available
77 if (stream_data.in_buff_start == stream_data.in_buff_end) {
78 // empty input buffer: refill from the start
79 stream_data.in_buff_start = stream_data.in_buff.get();
80 stream_data.in_buff_end = stream_data.in_buff_start;
81 auto sz = child_handle->Read(buffer: stream_data.in_buff.get(), nr_bytes: stream_data.in_buf_size);
82 if (sz <= 0) {
83 stream_wrapper.reset();
84 break;
85 }
86 stream_data.in_buff_end = stream_data.in_buff_start + sz;
87 }
88
89 auto finished = stream_wrapper->Read(stream_data);
90 if (finished) {
91 stream_wrapper.reset();
92 }
93 }
94 return total_read;
95}
96
97int64_t CompressedFile::WriteData(data_ptr_t buffer, int64_t nr_bytes) {
98 stream_wrapper->Write(file&: *this, stream_data, buffer, nr_bytes);
99 return nr_bytes;
100}
101
102void CompressedFile::Close() {
103 if (stream_wrapper) {
104 stream_wrapper->Close();
105 stream_wrapper.reset();
106 }
107 stream_data.in_buff.reset();
108 stream_data.out_buff.reset();
109 stream_data.out_buff_start = nullptr;
110 stream_data.out_buff_end = nullptr;
111 stream_data.in_buff_start = nullptr;
112 stream_data.in_buff_end = nullptr;
113 stream_data.in_buf_size = 0;
114 stream_data.out_buf_size = 0;
115}
116
117int64_t CompressedFileSystem::Read(FileHandle &handle, void *buffer, int64_t nr_bytes) {
118 auto &compressed_file = handle.Cast<CompressedFile>();
119 return compressed_file.ReadData(buffer, remaining: nr_bytes);
120}
121
122int64_t CompressedFileSystem::Write(FileHandle &handle, void *buffer, int64_t nr_bytes) {
123 auto &compressed_file = handle.Cast<CompressedFile>();
124 return compressed_file.WriteData(buffer: data_ptr_cast(src: buffer), nr_bytes);
125}
126
127void CompressedFileSystem::Reset(FileHandle &handle) {
128 auto &compressed_file = handle.Cast<CompressedFile>();
129 compressed_file.child_handle->Reset();
130 compressed_file.Initialize(write: compressed_file.write);
131}
132
133int64_t CompressedFileSystem::GetFileSize(FileHandle &handle) {
134 auto &compressed_file = handle.Cast<CompressedFile>();
135 return compressed_file.child_handle->GetFileSize();
136}
137
138bool CompressedFileSystem::OnDiskFile(FileHandle &handle) {
139 auto &compressed_file = handle.Cast<CompressedFile>();
140 return compressed_file.child_handle->OnDiskFile();
141}
142
143bool CompressedFileSystem::CanSeek() {
144 return false;
145}
146
147} // namespace duckdb
148