1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include "arrow/util/compression_zlib.h" |
19 | |
20 | #include <algorithm> |
21 | #include <cstdint> |
22 | #include <cstring> |
23 | #include <limits> |
24 | #include <memory> |
25 | #include <sstream> |
26 | #include <string> |
27 | |
28 | #include <zconf.h> |
29 | #include <zlib.h> |
30 | |
31 | #include "arrow/status.h" |
32 | #include "arrow/util/logging.h" |
33 | #include "arrow/util/macros.h" |
34 | |
35 | namespace arrow { |
36 | namespace util { |
37 | |
// Value passed to deflateInit2() by the compressors in this file.
// NOTE(review): it is passed as deflateInit2's fifth argument, which zlib
// defines as `memLevel` (1..9), not as the compression level; the level
// argument there is Z_DEFAULT_COMPRESSION. The name is kept for compatibility.
constexpr int kGZipDefaultCompressionLevel{9};

// ----------------------------------------------------------------------
// gzip implementation

// zlib.h describes the following windowBits conventions only in prose and
// defines no named constants for them, so they are spelled out here.
// (A namespace-scope constexpr already has internal linkage.)

// Base-2 logarithm of the largest window size (2^15 = 32 KiB).
constexpr int WINDOW_BITS{15};

// Added to WINDOW_BITS when deflating to emit a gzip header/trailer.
constexpr int GZIP_CODEC{16};

// OR'ed into WINDOW_BITS when inflating to auto-detect a zlib or gzip header.
constexpr int DETECT_CODEC{32};
54 | |
55 | static int CompressionWindowBitsForFormat(GZipCodec::Format format) { |
56 | int window_bits = WINDOW_BITS; |
57 | switch (format) { |
58 | case GZipCodec::DEFLATE: |
59 | window_bits = -window_bits; |
60 | break; |
61 | case GZipCodec::GZIP: |
62 | window_bits += GZIP_CODEC; |
63 | break; |
64 | case GZipCodec::ZLIB: |
65 | break; |
66 | } |
67 | return window_bits; |
68 | } |
69 | |
70 | static int DecompressionWindowBitsForFormat(GZipCodec::Format format) { |
71 | if (format == GZipCodec::DEFLATE) { |
72 | return -WINDOW_BITS; |
73 | } else { |
74 | /* If not deflate, autodetect format from header */ |
75 | return WINDOW_BITS | DETECT_CODEC; |
76 | } |
77 | } |
78 | |
79 | static Status ZlibErrorPrefix(const char* prefix_msg, const char* msg) { |
80 | return Status::IOError(prefix_msg, (msg) ? msg : "(unknown error)" ); |
81 | } |
82 | |
83 | // ---------------------------------------------------------------------- |
84 | // gzip decompressor implementation |
85 | |
86 | class GZipDecompressor : public Decompressor { |
87 | public: |
88 | GZipDecompressor() : initialized_(false), finished_(false) {} |
89 | |
90 | ~GZipDecompressor() override { |
91 | if (initialized_) { |
92 | inflateEnd(&stream_); |
93 | } |
94 | } |
95 | |
96 | Status Init(GZipCodec::Format format) { |
97 | DCHECK(!initialized_); |
98 | memset(&stream_, 0, sizeof(stream_)); |
99 | finished_ = false; |
100 | |
101 | int ret; |
102 | int window_bits = DecompressionWindowBitsForFormat(format); |
103 | if ((ret = inflateInit2(&stream_, window_bits)) != Z_OK) { |
104 | return ZlibError("zlib inflateInit failed: " ); |
105 | } else { |
106 | initialized_ = true; |
107 | return Status::OK(); |
108 | } |
109 | } |
110 | |
111 | Status Decompress(int64_t input_len, const uint8_t* input, int64_t output_len, |
112 | uint8_t* output, int64_t* bytes_read, int64_t* bytes_written, |
113 | bool* need_more_output) override { |
114 | static constexpr auto input_limit = |
115 | static_cast<int64_t>(std::numeric_limits<uInt>::max()); |
116 | stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input)); |
117 | stream_.avail_in = static_cast<uInt>(std::min(input_len, input_limit)); |
118 | stream_.next_out = reinterpret_cast<Bytef*>(output); |
119 | stream_.avail_out = static_cast<uInt>(std::min(output_len, input_limit)); |
120 | int ret; |
121 | |
122 | ret = inflate(&stream_, Z_SYNC_FLUSH); |
123 | if (ret == Z_DATA_ERROR || ret == Z_STREAM_ERROR || ret == Z_MEM_ERROR) { |
124 | return ZlibError("zlib inflate failed: " ); |
125 | } |
126 | if (ret == Z_NEED_DICT) { |
127 | return ZlibError("zlib inflate failed (need preset dictionary): " ); |
128 | } |
129 | if (ret == Z_BUF_ERROR) { |
130 | // No progress was possible |
131 | *bytes_read = 0; |
132 | *bytes_written = 0; |
133 | *need_more_output = true; |
134 | } else { |
135 | DCHECK(ret == Z_OK || ret == Z_STREAM_END); |
136 | // Some progress has been made |
137 | *bytes_read = input_len - stream_.avail_in; |
138 | *bytes_written = output_len - stream_.avail_out; |
139 | *need_more_output = false; |
140 | } |
141 | finished_ = (ret == Z_STREAM_END); |
142 | return Status::OK(); |
143 | } |
144 | |
145 | bool IsFinished() override { return finished_; } |
146 | |
147 | protected: |
148 | Status ZlibError(const char* prefix_msg) { |
149 | return ZlibErrorPrefix(prefix_msg, stream_.msg); |
150 | } |
151 | |
152 | z_stream stream_; |
153 | bool initialized_; |
154 | bool finished_; |
155 | }; |
156 | |
157 | // ---------------------------------------------------------------------- |
158 | // gzip compressor implementation |
159 | |
160 | class GZipCompressor : public Compressor { |
161 | public: |
162 | GZipCompressor() : initialized_(false) {} |
163 | |
164 | ~GZipCompressor() override { |
165 | if (initialized_) { |
166 | deflateEnd(&stream_); |
167 | } |
168 | } |
169 | |
170 | Status Init(GZipCodec::Format format) { |
171 | DCHECK(!initialized_); |
172 | memset(&stream_, 0, sizeof(stream_)); |
173 | |
174 | int ret; |
175 | // Initialize to run specified format |
176 | int window_bits = CompressionWindowBitsForFormat(format); |
177 | if ((ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED, window_bits, |
178 | kGZipDefaultCompressionLevel, Z_DEFAULT_STRATEGY)) != Z_OK) { |
179 | return ZlibError("zlib deflateInit failed: " ); |
180 | } else { |
181 | initialized_ = true; |
182 | return Status::OK(); |
183 | } |
184 | } |
185 | |
186 | Status Compress(int64_t input_len, const uint8_t* input, int64_t output_len, |
187 | uint8_t* output, int64_t* bytes_read, int64_t* bytes_written) override; |
188 | |
189 | Status Flush(int64_t output_len, uint8_t* output, int64_t* bytes_written, |
190 | bool* should_retry) override; |
191 | |
192 | Status End(int64_t output_len, uint8_t* output, int64_t* bytes_written, |
193 | bool* should_retry) override; |
194 | |
195 | protected: |
196 | Status ZlibError(const char* prefix_msg) { |
197 | return ZlibErrorPrefix(prefix_msg, stream_.msg); |
198 | } |
199 | |
200 | z_stream stream_; |
201 | bool initialized_; |
202 | }; |
203 | |
204 | Status GZipCompressor::Compress(int64_t input_len, const uint8_t* input, |
205 | int64_t output_len, uint8_t* output, int64_t* bytes_read, |
206 | int64_t* bytes_written) { |
207 | DCHECK(initialized_) << "Called on non-initialized stream" ; |
208 | |
209 | static constexpr auto input_limit = |
210 | static_cast<int64_t>(std::numeric_limits<uInt>::max()); |
211 | |
212 | stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input)); |
213 | stream_.avail_in = static_cast<uInt>(std::min(input_len, input_limit)); |
214 | stream_.next_out = reinterpret_cast<Bytef*>(output); |
215 | stream_.avail_out = static_cast<uInt>(std::min(output_len, input_limit)); |
216 | |
217 | int64_t ret = 0; |
218 | ret = deflate(&stream_, Z_NO_FLUSH); |
219 | if (ret == Z_STREAM_ERROR) { |
220 | return ZlibError("zlib compress failed: " ); |
221 | } |
222 | if (ret == Z_OK) { |
223 | // Some progress has been made |
224 | *bytes_read = input_len - stream_.avail_in; |
225 | *bytes_written = output_len - stream_.avail_out; |
226 | } else { |
227 | // No progress was possible |
228 | DCHECK_EQ(ret, Z_BUF_ERROR); |
229 | *bytes_read = 0; |
230 | *bytes_written = 0; |
231 | } |
232 | return Status::OK(); |
233 | } |
234 | |
235 | Status GZipCompressor::Flush(int64_t output_len, uint8_t* output, int64_t* bytes_written, |
236 | bool* should_retry) { |
237 | DCHECK(initialized_) << "Called on non-initialized stream" ; |
238 | |
239 | static constexpr auto input_limit = |
240 | static_cast<int64_t>(std::numeric_limits<uInt>::max()); |
241 | |
242 | stream_.avail_in = 0; |
243 | stream_.next_out = reinterpret_cast<Bytef*>(output); |
244 | stream_.avail_out = static_cast<uInt>(std::min(output_len, input_limit)); |
245 | |
246 | int64_t ret = 0; |
247 | ret = deflate(&stream_, Z_SYNC_FLUSH); |
248 | if (ret == Z_STREAM_ERROR) { |
249 | return ZlibError("zlib flush failed: " ); |
250 | } |
251 | if (ret == Z_OK) { |
252 | *bytes_written = output_len - stream_.avail_out; |
253 | } else { |
254 | DCHECK_EQ(ret, Z_BUF_ERROR); |
255 | *bytes_written = 0; |
256 | } |
257 | // "If deflate returns with avail_out == 0, this function must be called |
258 | // again with the same value of the flush parameter and more output space |
259 | // (updated avail_out), until the flush is complete (deflate returns |
260 | // with non-zero avail_out)." |
261 | *should_retry = (*bytes_written == 0); |
262 | return Status::OK(); |
263 | } |
264 | |
265 | Status GZipCompressor::End(int64_t output_len, uint8_t* output, int64_t* bytes_written, |
266 | bool* should_retry) { |
267 | DCHECK(initialized_) << "Called on non-initialized stream" ; |
268 | |
269 | static constexpr auto input_limit = |
270 | static_cast<int64_t>(std::numeric_limits<uInt>::max()); |
271 | |
272 | stream_.avail_in = 0; |
273 | stream_.next_out = reinterpret_cast<Bytef*>(output); |
274 | stream_.avail_out = static_cast<uInt>(std::min(output_len, input_limit)); |
275 | |
276 | int64_t ret = 0; |
277 | ret = deflate(&stream_, Z_FINISH); |
278 | if (ret == Z_STREAM_ERROR) { |
279 | return ZlibError("zlib flush failed: " ); |
280 | } |
281 | *bytes_written = output_len - stream_.avail_out; |
282 | if (ret == Z_STREAM_END) { |
283 | // Flush complete, we can now end the stream |
284 | *should_retry = false; |
285 | initialized_ = false; |
286 | ret = deflateEnd(&stream_); |
287 | if (ret == Z_OK) { |
288 | return Status::OK(); |
289 | } else { |
290 | return ZlibError("zlib end failed: " ); |
291 | } |
292 | } else { |
293 | // Not everything could be flushed, |
294 | *should_retry = true; |
295 | return Status::OK(); |
296 | } |
297 | } |
298 | |
299 | // ---------------------------------------------------------------------- |
300 | // gzip codec implementation |
301 | |
302 | class GZipCodec::GZipCodecImpl { |
303 | public: |
304 | explicit GZipCodecImpl(GZipCodec::Format format) |
305 | : format_(format), |
306 | compressor_initialized_(false), |
307 | decompressor_initialized_(false) {} |
308 | |
309 | ~GZipCodecImpl() { |
310 | EndCompressor(); |
311 | EndDecompressor(); |
312 | } |
313 | |
314 | Status MakeCompressor(std::shared_ptr<Compressor>* out) { |
315 | auto ptr = std::make_shared<GZipCompressor>(); |
316 | RETURN_NOT_OK(ptr->Init(format_)); |
317 | *out = ptr; |
318 | return Status::OK(); |
319 | } |
320 | |
321 | Status MakeDecompressor(std::shared_ptr<Decompressor>* out) { |
322 | auto ptr = std::make_shared<GZipDecompressor>(); |
323 | RETURN_NOT_OK(ptr->Init(format_)); |
324 | *out = ptr; |
325 | return Status::OK(); |
326 | } |
327 | |
328 | Status InitCompressor() { |
329 | EndDecompressor(); |
330 | memset(&stream_, 0, sizeof(stream_)); |
331 | |
332 | int ret; |
333 | // Initialize to run specified format |
334 | int window_bits = CompressionWindowBitsForFormat(format_); |
335 | if ((ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED, window_bits, |
336 | kGZipDefaultCompressionLevel, Z_DEFAULT_STRATEGY)) != Z_OK) { |
337 | return ZlibErrorPrefix("zlib deflateInit failed: " , stream_.msg); |
338 | } |
339 | compressor_initialized_ = true; |
340 | return Status::OK(); |
341 | } |
342 | |
343 | void EndCompressor() { |
344 | if (compressor_initialized_) { |
345 | (void)deflateEnd(&stream_); |
346 | } |
347 | compressor_initialized_ = false; |
348 | } |
349 | |
350 | Status InitDecompressor() { |
351 | EndCompressor(); |
352 | memset(&stream_, 0, sizeof(stream_)); |
353 | int ret; |
354 | |
355 | // Initialize to run either deflate or zlib/gzip format |
356 | int window_bits = DecompressionWindowBitsForFormat(format_); |
357 | if ((ret = inflateInit2(&stream_, window_bits)) != Z_OK) { |
358 | return ZlibErrorPrefix("zlib inflateInit failed: " , stream_.msg); |
359 | } |
360 | decompressor_initialized_ = true; |
361 | return Status::OK(); |
362 | } |
363 | |
364 | void EndDecompressor() { |
365 | if (decompressor_initialized_) { |
366 | (void)inflateEnd(&stream_); |
367 | } |
368 | decompressor_initialized_ = false; |
369 | } |
370 | |
371 | Status Decompress(int64_t input_length, const uint8_t* input, |
372 | int64_t output_buffer_length, uint8_t* output, |
373 | int64_t* output_length) { |
374 | if (!decompressor_initialized_) { |
375 | RETURN_NOT_OK(InitDecompressor()); |
376 | } |
377 | if (output_buffer_length == 0) { |
378 | // The zlib library does not allow *output to be NULL, even when |
379 | // output_buffer_length is 0 (inflate() will return Z_STREAM_ERROR). We don't |
380 | // consider this an error, so bail early if no output is expected. Note that we |
381 | // don't signal an error if the input actually contains compressed data. |
382 | if (output_length) { |
383 | *output_length = 0; |
384 | } |
385 | return Status::OK(); |
386 | } |
387 | |
388 | // Reset the stream for this block |
389 | if (inflateReset(&stream_) != Z_OK) { |
390 | return ZlibErrorPrefix("zlib inflateReset failed: " , stream_.msg); |
391 | } |
392 | |
393 | int ret = 0; |
394 | // gzip can run in streaming mode or non-streaming mode. We only |
395 | // support the non-streaming use case where we present it the entire |
396 | // compressed input and a buffer big enough to contain the entire |
397 | // compressed output. In the case where we don't know the output, |
398 | // we just make a bigger buffer and try the non-streaming mode |
399 | // from the beginning again. |
400 | while (ret != Z_STREAM_END) { |
401 | stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input)); |
402 | stream_.avail_in = static_cast<uInt>(input_length); |
403 | stream_.next_out = reinterpret_cast<Bytef*>(output); |
404 | stream_.avail_out = static_cast<uInt>(output_buffer_length); |
405 | |
406 | // We know the output size. In this case, we can use Z_FINISH |
407 | // which is more efficient. |
408 | ret = inflate(&stream_, Z_FINISH); |
409 | if (ret == Z_STREAM_END || ret != Z_OK) break; |
410 | |
411 | // Failure, buffer was too small |
412 | return Status::IOError("Too small a buffer passed to GZipCodec. InputLength=" , |
413 | input_length, " OutputLength=" , output_buffer_length); |
414 | } |
415 | |
416 | // Failure for some other reason |
417 | if (ret != Z_STREAM_END) { |
418 | return ZlibErrorPrefix("GZipCodec failed: " , stream_.msg); |
419 | } |
420 | |
421 | if (output_length) { |
422 | *output_length = stream_.total_out; |
423 | } |
424 | |
425 | return Status::OK(); |
426 | } |
427 | |
428 | int64_t MaxCompressedLen(int64_t input_length, const uint8_t* ARROW_ARG_UNUSED(input)) { |
429 | // Must be in compression mode |
430 | if (!compressor_initialized_) { |
431 | Status s = InitCompressor(); |
432 | DCHECK(s.ok()); |
433 | } |
434 | int64_t max_len = deflateBound(&stream_, static_cast<uLong>(input_length)); |
435 | // ARROW-3514: return a more pessimistic estimate to account for bugs |
436 | // in old zlib versions. |
437 | return max_len + 12; |
438 | } |
439 | |
440 | Status Compress(int64_t input_length, const uint8_t* input, int64_t output_buffer_len, |
441 | uint8_t* output, int64_t* output_len) { |
442 | if (!compressor_initialized_) { |
443 | RETURN_NOT_OK(InitCompressor()); |
444 | } |
445 | stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input)); |
446 | stream_.avail_in = static_cast<uInt>(input_length); |
447 | stream_.next_out = reinterpret_cast<Bytef*>(output); |
448 | stream_.avail_out = static_cast<uInt>(output_buffer_len); |
449 | |
450 | int64_t ret = 0; |
451 | if ((ret = deflate(&stream_, Z_FINISH)) != Z_STREAM_END) { |
452 | if (ret == Z_OK) { |
453 | // Will return Z_OK (and stream.msg NOT set) if stream.avail_out is too |
454 | // small |
455 | return Status::IOError("zlib deflate failed, output buffer too small" ); |
456 | } |
457 | |
458 | return ZlibErrorPrefix("zlib deflate failed: " , stream_.msg); |
459 | } |
460 | |
461 | if (deflateReset(&stream_) != Z_OK) { |
462 | return ZlibErrorPrefix("zlib deflateReset failed: " , stream_.msg); |
463 | } |
464 | |
465 | // Actual output length |
466 | *output_len = output_buffer_len - stream_.avail_out; |
467 | return Status::OK(); |
468 | } |
469 | |
470 | private: |
471 | // zlib is stateful and the z_stream state variable must be initialized |
472 | // before |
473 | z_stream stream_; |
474 | |
475 | // Realistically, this will always be GZIP, but we leave the option open to |
476 | // configure |
477 | GZipCodec::Format format_; |
478 | |
479 | // These variables are mutually exclusive. When the codec is in "compressor" |
480 | // state, compressor_initialized_ is true while decompressor_initialized_ is |
481 | // false. When it's decompressing, the opposite is true. |
482 | // |
483 | // Indeed, this is slightly hacky, but the alternative is having separate |
484 | // Compressor and Decompressor classes. If this ever becomes an issue, we can |
485 | // perform the refactoring then |
486 | bool compressor_initialized_; |
487 | bool decompressor_initialized_; |
488 | }; |
489 | |
490 | GZipCodec::GZipCodec(Format format) { impl_.reset(new GZipCodecImpl(format)); } |
491 | |
492 | GZipCodec::~GZipCodec() {} |
493 | |
494 | Status GZipCodec::Decompress(int64_t input_length, const uint8_t* input, |
495 | int64_t output_buffer_len, uint8_t* output) { |
496 | return impl_->Decompress(input_length, input, output_buffer_len, output, nullptr); |
497 | } |
498 | |
499 | Status GZipCodec::Decompress(int64_t input_length, const uint8_t* input, |
500 | int64_t output_buffer_len, uint8_t* output, |
501 | int64_t* output_len) { |
502 | return impl_->Decompress(input_length, input, output_buffer_len, output, output_len); |
503 | } |
504 | |
505 | int64_t GZipCodec::MaxCompressedLen(int64_t input_length, const uint8_t* input) { |
506 | return impl_->MaxCompressedLen(input_length, input); |
507 | } |
508 | |
509 | Status GZipCodec::Compress(int64_t input_length, const uint8_t* input, |
510 | int64_t output_buffer_len, uint8_t* output, |
511 | int64_t* output_len) { |
512 | return impl_->Compress(input_length, input, output_buffer_len, output, output_len); |
513 | } |
514 | |
515 | Status GZipCodec::MakeCompressor(std::shared_ptr<Compressor>* out) { |
516 | return impl_->MakeCompressor(out); |
517 | } |
518 | |
519 | Status GZipCodec::MakeDecompressor(std::shared_ptr<Decompressor>* out) { |
520 | return impl_->MakeDecompressor(out); |
521 | } |
522 | |
523 | const char* GZipCodec::name() const { return "gzip" ; } |
524 | |
525 | } // namespace util |
526 | } // namespace arrow |
527 | |