1 | #include "duckdb/common/gzip_file_system.hpp" |
2 | #include "duckdb/common/exception.hpp" |
3 | #include "duckdb/common/file_system.hpp" |
4 | |
5 | #include "miniz.hpp" |
6 | #include "miniz_wrapper.hpp" |
7 | |
8 | #include "duckdb/common/limits.hpp" |
9 | |
10 | namespace duckdb { |
11 | |
/*

  0      2 bytes  magic header  0x1f, 0x8b (\037 \213)
  2      1 byte   compression method
                     0: store (copied)
                     1: compress
                     2: pack
                     3: lzh
                     4..7: reserved
                     8: deflate
  3      1 byte   flags
                     bit 0 set: file probably ascii text
                     bit 1 set: continuation of multi-part gzip file, part number present
                     bit 2 set: extra field present
                     bit 3 set: original file name present
                     bit 4 set: file comment present
                     bit 5 set: file is encrypted, encryption header present
                     bit 6,7:   reserved
  4      4 bytes  file modification time in Unix format
  8      1 byte   extra flags (depend on compression method)
  9      1 byte   OS type
[
         2 bytes  optional part number (second part=1)
]?
[
         2 bytes  optional extra field length (e)
        (e)bytes  optional extra field
]?
[
           bytes  optional original file name, zero terminated
]?
[
           bytes  optional file comment, zero terminated
]?
[
        12 bytes  optional encryption header
]?
           bytes  compressed data
         4 bytes  crc32
         4 bytes  uncompressed input size modulo 2^32

*/
54 | |
55 | static idx_t GZipConsumeString(FileHandle &input) { |
56 | idx_t size = 1; // terminator |
57 | char buffer[1]; |
58 | while (input.Read(buffer, nr_bytes: 1) == 1) { |
59 | if (buffer[0] == '\0') { |
60 | break; |
61 | } |
62 | size++; |
63 | } |
64 | return size; |
65 | } |
66 | |
67 | struct MiniZStreamWrapper : public StreamWrapper { |
68 | ~MiniZStreamWrapper() override; |
69 | |
70 | CompressedFile *file = nullptr; |
71 | duckdb_miniz::mz_stream *mz_stream_ptr = nullptr; |
72 | bool writing = false; |
73 | duckdb_miniz::mz_ulong crc; |
74 | idx_t total_size; |
75 | |
76 | public: |
77 | void Initialize(CompressedFile &file, bool write) override; |
78 | |
79 | bool Read(StreamData &stream_data) override; |
80 | void Write(CompressedFile &file, StreamData &stream_data, data_ptr_t buffer, int64_t nr_bytes) override; |
81 | |
82 | void Close() override; |
83 | |
84 | void FlushStream(); |
85 | }; |
86 | |
87 | MiniZStreamWrapper::~MiniZStreamWrapper() { |
88 | // avoid closing if destroyed during stack unwinding |
89 | if (Exception::UncaughtException()) { |
90 | return; |
91 | } |
92 | try { |
93 | MiniZStreamWrapper::Close(); |
94 | } catch (...) { |
95 | } |
96 | } |
97 | |
98 | void MiniZStreamWrapper::Initialize(CompressedFile &file, bool write) { |
99 | Close(); |
100 | this->file = &file; |
101 | mz_stream_ptr = new duckdb_miniz::mz_stream(); |
102 | memset(s: mz_stream_ptr, c: 0, n: sizeof(duckdb_miniz::mz_stream)); |
103 | this->writing = write; |
104 | |
105 | // TODO use custom alloc/free methods in miniz to throw exceptions on OOM |
106 | uint8_t gzip_hdr[GZIP_HEADER_MINSIZE]; |
107 | if (write) { |
108 | crc = MZ_CRC32_INIT; |
109 | total_size = 0; |
110 | |
111 | MiniZStream::InitializeGZIPHeader(gzip_header: gzip_hdr); |
112 | file.child_handle->Write(buffer: gzip_hdr, nr_bytes: GZIP_HEADER_MINSIZE); |
113 | |
114 | auto ret = mz_deflateInit2(pStream: (duckdb_miniz::mz_streamp)mz_stream_ptr, level: duckdb_miniz::MZ_DEFAULT_LEVEL, MZ_DEFLATED, |
115 | window_bits: -MZ_DEFAULT_WINDOW_BITS, mem_level: 1, strategy: 0); |
116 | if (ret != duckdb_miniz::MZ_OK) { |
117 | throw InternalException("Failed to initialize miniz" ); |
118 | } |
119 | } else { |
120 | idx_t data_start = GZIP_HEADER_MINSIZE; |
121 | auto read_count = file.child_handle->Read(buffer: gzip_hdr, nr_bytes: GZIP_HEADER_MINSIZE); |
122 | GZipFileSystem::VerifyGZIPHeader(gzip_hdr, read_count); |
123 | // Skip over the extra field if necessary |
124 | if (gzip_hdr[3] & GZIP_FLAG_EXTRA) { |
125 | uint8_t gzip_xlen[2]; |
126 | file.child_handle->Seek(location: data_start); |
127 | file.child_handle->Read(buffer: gzip_xlen, nr_bytes: 2); |
128 | idx_t xlen = (uint8_t)gzip_xlen[0] | (uint8_t)gzip_xlen[1] << 8; |
129 | data_start += xlen + 2; |
130 | } |
131 | // Skip over the file name if necessary |
132 | if (gzip_hdr[3] & GZIP_FLAG_NAME) { |
133 | file.child_handle->Seek(location: data_start); |
134 | data_start += GZipConsumeString(input&: *file.child_handle); |
135 | } |
136 | file.child_handle->Seek(location: data_start); |
137 | // stream is now set to beginning of payload data |
138 | auto ret = duckdb_miniz::mz_inflateInit2(pStream: (duckdb_miniz::mz_streamp)mz_stream_ptr, window_bits: -MZ_DEFAULT_WINDOW_BITS); |
139 | if (ret != duckdb_miniz::MZ_OK) { |
140 | throw InternalException("Failed to initialize miniz" ); |
141 | } |
142 | } |
143 | } |
144 | |
145 | bool MiniZStreamWrapper::Read(StreamData &sd) { |
146 | // Handling for the concatenated files |
147 | if (sd.refresh) { |
148 | sd.refresh = false; |
149 | auto body_ptr = sd.in_buff_start + GZIP_FOOTER_SIZE; |
150 | uint8_t gzip_hdr[GZIP_HEADER_MINSIZE]; |
151 | memcpy(dest: gzip_hdr, src: body_ptr, n: GZIP_HEADER_MINSIZE); |
152 | GZipFileSystem::VerifyGZIPHeader(gzip_hdr, read_count: GZIP_HEADER_MINSIZE); |
153 | body_ptr += GZIP_HEADER_MINSIZE; |
154 | if (gzip_hdr[3] & GZIP_FLAG_EXTRA) { |
155 | idx_t xlen = (uint8_t)*body_ptr | (uint8_t) * (body_ptr + 1) << 8; |
156 | body_ptr += xlen + 2; |
157 | if (GZIP_FOOTER_SIZE + GZIP_HEADER_MINSIZE + 2 + xlen >= GZIP_HEADER_MAXSIZE) { |
158 | throw InternalException("Extra field resulting in GZIP header larger than defined maximum (%d)" , |
159 | GZIP_HEADER_MAXSIZE); |
160 | } |
161 | } |
162 | if (gzip_hdr[3] & GZIP_FLAG_NAME) { |
163 | char c; |
164 | do { |
165 | c = *body_ptr; |
166 | body_ptr++; |
167 | } while (c != '\0' && body_ptr < sd.in_buff_end); |
168 | if ((idx_t)(body_ptr - sd.in_buff_start) >= GZIP_HEADER_MAXSIZE) { |
169 | throw InternalException("Filename resulting in GZIP header larger than defined maximum (%d)" , |
170 | GZIP_HEADER_MAXSIZE); |
171 | } |
172 | } |
173 | sd.in_buff_start = body_ptr; |
174 | if (sd.in_buff_end - sd.in_buff_start < 1) { |
175 | Close(); |
176 | return true; |
177 | } |
178 | duckdb_miniz::mz_inflateEnd(pStream: mz_stream_ptr); |
179 | auto sta = duckdb_miniz::mz_inflateInit2(pStream: (duckdb_miniz::mz_streamp)mz_stream_ptr, window_bits: -MZ_DEFAULT_WINDOW_BITS); |
180 | if (sta != duckdb_miniz::MZ_OK) { |
181 | throw InternalException("Failed to initialize miniz" ); |
182 | } |
183 | } |
184 | |
185 | // actually decompress |
186 | mz_stream_ptr->next_in = sd.in_buff_start; |
187 | D_ASSERT(sd.in_buff_end - sd.in_buff_start < NumericLimits<int32_t>::Maximum()); |
188 | mz_stream_ptr->avail_in = (uint32_t)(sd.in_buff_end - sd.in_buff_start); |
189 | mz_stream_ptr->next_out = data_ptr_cast(src: sd.out_buff_end); |
190 | mz_stream_ptr->avail_out = (uint32_t)((sd.out_buff.get() + sd.out_buf_size) - sd.out_buff_end); |
191 | auto ret = duckdb_miniz::mz_inflate(pStream: mz_stream_ptr, flush: duckdb_miniz::MZ_NO_FLUSH); |
192 | if (ret != duckdb_miniz::MZ_OK && ret != duckdb_miniz::MZ_STREAM_END) { |
193 | throw IOException("Failed to decode gzip stream: %s" , duckdb_miniz::mz_error(err: ret)); |
194 | } |
195 | // update pointers following inflate() |
196 | sd.in_buff_start = (data_ptr_t)mz_stream_ptr->next_in; // NOLINT |
197 | sd.in_buff_end = sd.in_buff_start + mz_stream_ptr->avail_in; |
198 | sd.out_buff_end = data_ptr_cast(src: mz_stream_ptr->next_out); |
199 | D_ASSERT(sd.out_buff_end + mz_stream_ptr->avail_out == sd.out_buff.get() + sd.out_buf_size); |
200 | |
201 | // if stream ended, deallocate inflator |
202 | if (ret == duckdb_miniz::MZ_STREAM_END) { |
203 | // Last read from file done and remaining bytes only for footer or less |
204 | if ((sd.in_buff_end < sd.in_buff.get() + sd.in_buf_size) && mz_stream_ptr->avail_in <= GZIP_FOOTER_SIZE) { |
205 | Close(); |
206 | return true; |
207 | } |
208 | if (mz_stream_ptr->avail_in > GZIP_FOOTER_SIZE) { |
209 | // Definitely not concatenated gzip |
210 | if (*(sd.in_buff_start + GZIP_FOOTER_SIZE) != 0x1F) { |
211 | Close(); |
212 | return true; |
213 | } |
214 | } |
215 | // Concatenated GZIP potentially coming up - refresh input buffer |
216 | sd.refresh = true; |
217 | } |
218 | return false; |
219 | } |
220 | |
221 | void MiniZStreamWrapper::Write(CompressedFile &file, StreamData &sd, data_ptr_t uncompressed_data, |
222 | int64_t uncompressed_size) { |
223 | // update the src and the total size |
224 | crc = duckdb_miniz::mz_crc32(crc, ptr: reinterpret_cast<const unsigned char *>(uncompressed_data), buf_len: uncompressed_size); |
225 | total_size += uncompressed_size; |
226 | |
227 | auto remaining = uncompressed_size; |
228 | while (remaining > 0) { |
229 | idx_t output_remaining = (sd.out_buff.get() + sd.out_buf_size) - sd.out_buff_start; |
230 | |
231 | mz_stream_ptr->next_in = reinterpret_cast<const unsigned char *>(uncompressed_data); |
232 | mz_stream_ptr->avail_in = remaining; |
233 | mz_stream_ptr->next_out = sd.out_buff_start; |
234 | mz_stream_ptr->avail_out = output_remaining; |
235 | |
236 | auto res = mz_deflate(pStream: mz_stream_ptr, flush: duckdb_miniz::MZ_NO_FLUSH); |
237 | if (res != duckdb_miniz::MZ_OK) { |
238 | D_ASSERT(res != duckdb_miniz::MZ_STREAM_END); |
239 | throw InternalException("Failed to compress GZIP block" ); |
240 | } |
241 | sd.out_buff_start += output_remaining - mz_stream_ptr->avail_out; |
242 | if (mz_stream_ptr->avail_out == 0) { |
243 | // no more output buffer available: flush |
244 | file.child_handle->Write(buffer: sd.out_buff.get(), nr_bytes: sd.out_buff_start - sd.out_buff.get()); |
245 | sd.out_buff_start = sd.out_buff.get(); |
246 | } |
247 | idx_t written = remaining - mz_stream_ptr->avail_in; |
248 | uncompressed_data += written; |
249 | remaining = mz_stream_ptr->avail_in; |
250 | } |
251 | } |
252 | |
253 | void MiniZStreamWrapper::FlushStream() { |
254 | auto &sd = file->stream_data; |
255 | mz_stream_ptr->next_in = nullptr; |
256 | mz_stream_ptr->avail_in = 0; |
257 | while (true) { |
258 | auto output_remaining = (sd.out_buff.get() + sd.out_buf_size) - sd.out_buff_start; |
259 | mz_stream_ptr->next_out = sd.out_buff_start; |
260 | mz_stream_ptr->avail_out = output_remaining; |
261 | |
262 | auto res = mz_deflate(pStream: mz_stream_ptr, flush: duckdb_miniz::MZ_FINISH); |
263 | sd.out_buff_start += (output_remaining - mz_stream_ptr->avail_out); |
264 | if (sd.out_buff_start > sd.out_buff.get()) { |
265 | file->child_handle->Write(buffer: sd.out_buff.get(), nr_bytes: sd.out_buff_start - sd.out_buff.get()); |
266 | sd.out_buff_start = sd.out_buff.get(); |
267 | } |
268 | if (res == duckdb_miniz::MZ_STREAM_END) { |
269 | break; |
270 | } |
271 | if (res != duckdb_miniz::MZ_OK) { |
272 | throw InternalException("Failed to compress GZIP block" ); |
273 | } |
274 | } |
275 | } |
276 | |
277 | void MiniZStreamWrapper::Close() { |
278 | if (!mz_stream_ptr) { |
279 | return; |
280 | } |
281 | if (writing) { |
282 | // flush anything remaining in the stream |
283 | FlushStream(); |
284 | |
285 | // write the footer |
286 | unsigned char [MiniZStream::GZIP_FOOTER_SIZE]; |
287 | MiniZStream::InitializeGZIPFooter(gzip_footer, crc, uncompressed_size: total_size); |
288 | file->child_handle->Write(buffer: gzip_footer, nr_bytes: MiniZStream::GZIP_FOOTER_SIZE); |
289 | |
290 | duckdb_miniz::mz_deflateEnd(pStream: mz_stream_ptr); |
291 | } else { |
292 | duckdb_miniz::mz_inflateEnd(pStream: mz_stream_ptr); |
293 | } |
294 | delete mz_stream_ptr; |
295 | mz_stream_ptr = nullptr; |
296 | file = nullptr; |
297 | } |
298 | |
299 | class GZipFile : public CompressedFile { |
300 | public: |
301 | GZipFile(unique_ptr<FileHandle> child_handle_p, const string &path, bool write) |
302 | : CompressedFile(gzip_fs, std::move(child_handle_p), path) { |
303 | Initialize(write); |
304 | } |
305 | |
306 | GZipFileSystem gzip_fs; |
307 | }; |
308 | |
309 | void GZipFileSystem::(uint8_t gzip_hdr[], idx_t read_count) { |
310 | // check for incorrectly formatted files |
311 | if (read_count != GZIP_HEADER_MINSIZE) { |
312 | throw IOException("Input is not a GZIP stream" ); |
313 | } |
314 | if (gzip_hdr[0] != 0x1F || gzip_hdr[1] != 0x8B) { // magic header |
315 | throw IOException("Input is not a GZIP stream" ); |
316 | } |
317 | if (gzip_hdr[2] != GZIP_COMPRESSION_DEFLATE) { // compression method |
318 | throw IOException("Unsupported GZIP compression method" ); |
319 | } |
320 | if (gzip_hdr[3] & GZIP_FLAG_UNSUPPORTED) { |
321 | throw IOException("Unsupported GZIP archive" ); |
322 | } |
323 | } |
324 | |
325 | string GZipFileSystem::UncompressGZIPString(const string &in) { |
326 | // decompress file |
327 | auto body_ptr = in.data(); |
328 | |
329 | auto mz_stream_ptr = new duckdb_miniz::mz_stream(); |
330 | memset(s: mz_stream_ptr, c: 0, n: sizeof(duckdb_miniz::mz_stream)); |
331 | |
332 | uint8_t gzip_hdr[GZIP_HEADER_MINSIZE]; |
333 | |
334 | // check for incorrectly formatted files |
335 | |
336 | // TODO this is mostly the same as gzip_file_system.cpp |
337 | if (in.size() < GZIP_HEADER_MINSIZE) { |
338 | throw IOException("Input is not a GZIP stream" ); |
339 | } |
340 | memcpy(dest: gzip_hdr, src: body_ptr, n: GZIP_HEADER_MINSIZE); |
341 | body_ptr += GZIP_HEADER_MINSIZE; |
342 | GZipFileSystem::VerifyGZIPHeader(gzip_hdr, read_count: GZIP_HEADER_MINSIZE); |
343 | |
344 | if (gzip_hdr[3] & GZIP_FLAG_EXTRA) { |
345 | throw IOException("Extra field in a GZIP stream unsupported" ); |
346 | } |
347 | |
348 | if (gzip_hdr[3] & GZIP_FLAG_NAME) { |
349 | char c; |
350 | do { |
351 | c = *body_ptr; |
352 | body_ptr++; |
353 | } while (c != '\0' && (idx_t)(body_ptr - in.data()) < in.size()); |
354 | } |
355 | |
356 | // stream is now set to beginning of payload data |
357 | auto status = duckdb_miniz::mz_inflateInit2(pStream: mz_stream_ptr, window_bits: -MZ_DEFAULT_WINDOW_BITS); |
358 | if (status != duckdb_miniz::MZ_OK) { |
359 | throw InternalException("Failed to initialize miniz" ); |
360 | } |
361 | |
362 | auto bytes_remaining = in.size() - (body_ptr - in.data()); |
363 | mz_stream_ptr->next_in = const_uchar_ptr_cast(src: body_ptr); |
364 | mz_stream_ptr->avail_in = bytes_remaining; |
365 | |
366 | unsigned char decompress_buffer[BUFSIZ]; |
367 | string decompressed; |
368 | |
369 | while (status == duckdb_miniz::MZ_OK) { |
370 | mz_stream_ptr->next_out = decompress_buffer; |
371 | mz_stream_ptr->avail_out = sizeof(decompress_buffer); |
372 | status = mz_inflate(pStream: mz_stream_ptr, flush: duckdb_miniz::MZ_NO_FLUSH); |
373 | if (status != duckdb_miniz::MZ_STREAM_END && status != duckdb_miniz::MZ_OK) { |
374 | throw IOException("Failed to uncompress" ); |
375 | } |
376 | decompressed.append(s: char_ptr_cast(src: decompress_buffer), n: mz_stream_ptr->total_out - decompressed.size()); |
377 | } |
378 | duckdb_miniz::mz_inflateEnd(pStream: mz_stream_ptr); |
379 | if (decompressed.empty()) { |
380 | throw IOException("Failed to uncompress" ); |
381 | } |
382 | return decompressed; |
383 | } |
384 | |
385 | unique_ptr<FileHandle> GZipFileSystem::OpenCompressedFile(unique_ptr<FileHandle> handle, bool write) { |
386 | auto path = handle->path; |
387 | return make_uniq<GZipFile>(args: std::move(handle), args&: path, args&: write); |
388 | } |
389 | |
390 | unique_ptr<StreamWrapper> GZipFileSystem::CreateStream() { |
391 | return make_uniq<MiniZStreamWrapper>(); |
392 | } |
393 | |
394 | idx_t GZipFileSystem::InBufferSize() { |
395 | return BUFFER_SIZE; |
396 | } |
397 | |
398 | idx_t GZipFileSystem::OutBufferSize() { |
399 | return BUFFER_SIZE; |
400 | } |
401 | |
402 | } // namespace duckdb |
403 | |