1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include "arrow/util/compression_zlib.h"
19
20#include <algorithm>
21#include <cstdint>
22#include <cstring>
23#include <limits>
24#include <memory>
25#include <sstream>
26#include <string>
27
28#include <zconf.h>
29#include <zlib.h>
30
31#include "arrow/status.h"
32#include "arrow/util/logging.h"
33#include "arrow/util/macros.h"
34
35namespace arrow {
36namespace util {
37
// NOTE: despite its name, this value is handed to deflateInit2() in the
// `memLevel` position (see GZipCompressor::Init and
// GZipCodecImpl::InitCompressor); the actual compression level passed there
// is Z_DEFAULT_COMPRESSION. 9 is the largest memLevel zlib accepts.
constexpr int kGZipDefaultCompressionLevel{9};

// ----------------------------------------------------------------------
// gzip implementation

// windowBits conventions for deflateInit2()/inflateInit2(). zlib.h describes
// these values in its documentation but does not export named constants.

// Base-2 logarithm of the largest window size zlib supports.
static constexpr int WINDOW_BITS{15};

// Added to WINDOW_BITS when compressing to emit a gzip wrapper.
static constexpr int GZIP_CODEC{16};

// Combined with WINDOW_BITS when decompressing so zlib detects the zlib or
// gzip wrapper from the stream header.
static constexpr int DETECT_CODEC{32};
54
55static int CompressionWindowBitsForFormat(GZipCodec::Format format) {
56 int window_bits = WINDOW_BITS;
57 switch (format) {
58 case GZipCodec::DEFLATE:
59 window_bits = -window_bits;
60 break;
61 case GZipCodec::GZIP:
62 window_bits += GZIP_CODEC;
63 break;
64 case GZipCodec::ZLIB:
65 break;
66 }
67 return window_bits;
68}
69
70static int DecompressionWindowBitsForFormat(GZipCodec::Format format) {
71 if (format == GZipCodec::DEFLATE) {
72 return -WINDOW_BITS;
73 } else {
74 /* If not deflate, autodetect format from header */
75 return WINDOW_BITS | DETECT_CODEC;
76 }
77}
78
79static Status ZlibErrorPrefix(const char* prefix_msg, const char* msg) {
80 return Status::IOError(prefix_msg, (msg) ? msg : "(unknown error)");
81}
82
83// ----------------------------------------------------------------------
84// gzip decompressor implementation
85
86class GZipDecompressor : public Decompressor {
87 public:
88 GZipDecompressor() : initialized_(false), finished_(false) {}
89
90 ~GZipDecompressor() override {
91 if (initialized_) {
92 inflateEnd(&stream_);
93 }
94 }
95
96 Status Init(GZipCodec::Format format) {
97 DCHECK(!initialized_);
98 memset(&stream_, 0, sizeof(stream_));
99 finished_ = false;
100
101 int ret;
102 int window_bits = DecompressionWindowBitsForFormat(format);
103 if ((ret = inflateInit2(&stream_, window_bits)) != Z_OK) {
104 return ZlibError("zlib inflateInit failed: ");
105 } else {
106 initialized_ = true;
107 return Status::OK();
108 }
109 }
110
111 Status Decompress(int64_t input_len, const uint8_t* input, int64_t output_len,
112 uint8_t* output, int64_t* bytes_read, int64_t* bytes_written,
113 bool* need_more_output) override {
114 static constexpr auto input_limit =
115 static_cast<int64_t>(std::numeric_limits<uInt>::max());
116 stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
117 stream_.avail_in = static_cast<uInt>(std::min(input_len, input_limit));
118 stream_.next_out = reinterpret_cast<Bytef*>(output);
119 stream_.avail_out = static_cast<uInt>(std::min(output_len, input_limit));
120 int ret;
121
122 ret = inflate(&stream_, Z_SYNC_FLUSH);
123 if (ret == Z_DATA_ERROR || ret == Z_STREAM_ERROR || ret == Z_MEM_ERROR) {
124 return ZlibError("zlib inflate failed: ");
125 }
126 if (ret == Z_NEED_DICT) {
127 return ZlibError("zlib inflate failed (need preset dictionary): ");
128 }
129 if (ret == Z_BUF_ERROR) {
130 // No progress was possible
131 *bytes_read = 0;
132 *bytes_written = 0;
133 *need_more_output = true;
134 } else {
135 DCHECK(ret == Z_OK || ret == Z_STREAM_END);
136 // Some progress has been made
137 *bytes_read = input_len - stream_.avail_in;
138 *bytes_written = output_len - stream_.avail_out;
139 *need_more_output = false;
140 }
141 finished_ = (ret == Z_STREAM_END);
142 return Status::OK();
143 }
144
145 bool IsFinished() override { return finished_; }
146
147 protected:
148 Status ZlibError(const char* prefix_msg) {
149 return ZlibErrorPrefix(prefix_msg, stream_.msg);
150 }
151
152 z_stream stream_;
153 bool initialized_;
154 bool finished_;
155};
156
157// ----------------------------------------------------------------------
158// gzip compressor implementation
159
160class GZipCompressor : public Compressor {
161 public:
162 GZipCompressor() : initialized_(false) {}
163
164 ~GZipCompressor() override {
165 if (initialized_) {
166 deflateEnd(&stream_);
167 }
168 }
169
170 Status Init(GZipCodec::Format format) {
171 DCHECK(!initialized_);
172 memset(&stream_, 0, sizeof(stream_));
173
174 int ret;
175 // Initialize to run specified format
176 int window_bits = CompressionWindowBitsForFormat(format);
177 if ((ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED, window_bits,
178 kGZipDefaultCompressionLevel, Z_DEFAULT_STRATEGY)) != Z_OK) {
179 return ZlibError("zlib deflateInit failed: ");
180 } else {
181 initialized_ = true;
182 return Status::OK();
183 }
184 }
185
186 Status Compress(int64_t input_len, const uint8_t* input, int64_t output_len,
187 uint8_t* output, int64_t* bytes_read, int64_t* bytes_written) override;
188
189 Status Flush(int64_t output_len, uint8_t* output, int64_t* bytes_written,
190 bool* should_retry) override;
191
192 Status End(int64_t output_len, uint8_t* output, int64_t* bytes_written,
193 bool* should_retry) override;
194
195 protected:
196 Status ZlibError(const char* prefix_msg) {
197 return ZlibErrorPrefix(prefix_msg, stream_.msg);
198 }
199
200 z_stream stream_;
201 bool initialized_;
202};
203
204Status GZipCompressor::Compress(int64_t input_len, const uint8_t* input,
205 int64_t output_len, uint8_t* output, int64_t* bytes_read,
206 int64_t* bytes_written) {
207 DCHECK(initialized_) << "Called on non-initialized stream";
208
209 static constexpr auto input_limit =
210 static_cast<int64_t>(std::numeric_limits<uInt>::max());
211
212 stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
213 stream_.avail_in = static_cast<uInt>(std::min(input_len, input_limit));
214 stream_.next_out = reinterpret_cast<Bytef*>(output);
215 stream_.avail_out = static_cast<uInt>(std::min(output_len, input_limit));
216
217 int64_t ret = 0;
218 ret = deflate(&stream_, Z_NO_FLUSH);
219 if (ret == Z_STREAM_ERROR) {
220 return ZlibError("zlib compress failed: ");
221 }
222 if (ret == Z_OK) {
223 // Some progress has been made
224 *bytes_read = input_len - stream_.avail_in;
225 *bytes_written = output_len - stream_.avail_out;
226 } else {
227 // No progress was possible
228 DCHECK_EQ(ret, Z_BUF_ERROR);
229 *bytes_read = 0;
230 *bytes_written = 0;
231 }
232 return Status::OK();
233}
234
235Status GZipCompressor::Flush(int64_t output_len, uint8_t* output, int64_t* bytes_written,
236 bool* should_retry) {
237 DCHECK(initialized_) << "Called on non-initialized stream";
238
239 static constexpr auto input_limit =
240 static_cast<int64_t>(std::numeric_limits<uInt>::max());
241
242 stream_.avail_in = 0;
243 stream_.next_out = reinterpret_cast<Bytef*>(output);
244 stream_.avail_out = static_cast<uInt>(std::min(output_len, input_limit));
245
246 int64_t ret = 0;
247 ret = deflate(&stream_, Z_SYNC_FLUSH);
248 if (ret == Z_STREAM_ERROR) {
249 return ZlibError("zlib flush failed: ");
250 }
251 if (ret == Z_OK) {
252 *bytes_written = output_len - stream_.avail_out;
253 } else {
254 DCHECK_EQ(ret, Z_BUF_ERROR);
255 *bytes_written = 0;
256 }
257 // "If deflate returns with avail_out == 0, this function must be called
258 // again with the same value of the flush parameter and more output space
259 // (updated avail_out), until the flush is complete (deflate returns
260 // with non-zero avail_out)."
261 *should_retry = (*bytes_written == 0);
262 return Status::OK();
263}
264
265Status GZipCompressor::End(int64_t output_len, uint8_t* output, int64_t* bytes_written,
266 bool* should_retry) {
267 DCHECK(initialized_) << "Called on non-initialized stream";
268
269 static constexpr auto input_limit =
270 static_cast<int64_t>(std::numeric_limits<uInt>::max());
271
272 stream_.avail_in = 0;
273 stream_.next_out = reinterpret_cast<Bytef*>(output);
274 stream_.avail_out = static_cast<uInt>(std::min(output_len, input_limit));
275
276 int64_t ret = 0;
277 ret = deflate(&stream_, Z_FINISH);
278 if (ret == Z_STREAM_ERROR) {
279 return ZlibError("zlib flush failed: ");
280 }
281 *bytes_written = output_len - stream_.avail_out;
282 if (ret == Z_STREAM_END) {
283 // Flush complete, we can now end the stream
284 *should_retry = false;
285 initialized_ = false;
286 ret = deflateEnd(&stream_);
287 if (ret == Z_OK) {
288 return Status::OK();
289 } else {
290 return ZlibError("zlib end failed: ");
291 }
292 } else {
293 // Not everything could be flushed,
294 *should_retry = true;
295 return Status::OK();
296 }
297}
298
299// ----------------------------------------------------------------------
300// gzip codec implementation
301
302class GZipCodec::GZipCodecImpl {
303 public:
304 explicit GZipCodecImpl(GZipCodec::Format format)
305 : format_(format),
306 compressor_initialized_(false),
307 decompressor_initialized_(false) {}
308
309 ~GZipCodecImpl() {
310 EndCompressor();
311 EndDecompressor();
312 }
313
314 Status MakeCompressor(std::shared_ptr<Compressor>* out) {
315 auto ptr = std::make_shared<GZipCompressor>();
316 RETURN_NOT_OK(ptr->Init(format_));
317 *out = ptr;
318 return Status::OK();
319 }
320
321 Status MakeDecompressor(std::shared_ptr<Decompressor>* out) {
322 auto ptr = std::make_shared<GZipDecompressor>();
323 RETURN_NOT_OK(ptr->Init(format_));
324 *out = ptr;
325 return Status::OK();
326 }
327
328 Status InitCompressor() {
329 EndDecompressor();
330 memset(&stream_, 0, sizeof(stream_));
331
332 int ret;
333 // Initialize to run specified format
334 int window_bits = CompressionWindowBitsForFormat(format_);
335 if ((ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED, window_bits,
336 kGZipDefaultCompressionLevel, Z_DEFAULT_STRATEGY)) != Z_OK) {
337 return ZlibErrorPrefix("zlib deflateInit failed: ", stream_.msg);
338 }
339 compressor_initialized_ = true;
340 return Status::OK();
341 }
342
343 void EndCompressor() {
344 if (compressor_initialized_) {
345 (void)deflateEnd(&stream_);
346 }
347 compressor_initialized_ = false;
348 }
349
350 Status InitDecompressor() {
351 EndCompressor();
352 memset(&stream_, 0, sizeof(stream_));
353 int ret;
354
355 // Initialize to run either deflate or zlib/gzip format
356 int window_bits = DecompressionWindowBitsForFormat(format_);
357 if ((ret = inflateInit2(&stream_, window_bits)) != Z_OK) {
358 return ZlibErrorPrefix("zlib inflateInit failed: ", stream_.msg);
359 }
360 decompressor_initialized_ = true;
361 return Status::OK();
362 }
363
364 void EndDecompressor() {
365 if (decompressor_initialized_) {
366 (void)inflateEnd(&stream_);
367 }
368 decompressor_initialized_ = false;
369 }
370
371 Status Decompress(int64_t input_length, const uint8_t* input,
372 int64_t output_buffer_length, uint8_t* output,
373 int64_t* output_length) {
374 if (!decompressor_initialized_) {
375 RETURN_NOT_OK(InitDecompressor());
376 }
377 if (output_buffer_length == 0) {
378 // The zlib library does not allow *output to be NULL, even when
379 // output_buffer_length is 0 (inflate() will return Z_STREAM_ERROR). We don't
380 // consider this an error, so bail early if no output is expected. Note that we
381 // don't signal an error if the input actually contains compressed data.
382 if (output_length) {
383 *output_length = 0;
384 }
385 return Status::OK();
386 }
387
388 // Reset the stream for this block
389 if (inflateReset(&stream_) != Z_OK) {
390 return ZlibErrorPrefix("zlib inflateReset failed: ", stream_.msg);
391 }
392
393 int ret = 0;
394 // gzip can run in streaming mode or non-streaming mode. We only
395 // support the non-streaming use case where we present it the entire
396 // compressed input and a buffer big enough to contain the entire
397 // compressed output. In the case where we don't know the output,
398 // we just make a bigger buffer and try the non-streaming mode
399 // from the beginning again.
400 while (ret != Z_STREAM_END) {
401 stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
402 stream_.avail_in = static_cast<uInt>(input_length);
403 stream_.next_out = reinterpret_cast<Bytef*>(output);
404 stream_.avail_out = static_cast<uInt>(output_buffer_length);
405
406 // We know the output size. In this case, we can use Z_FINISH
407 // which is more efficient.
408 ret = inflate(&stream_, Z_FINISH);
409 if (ret == Z_STREAM_END || ret != Z_OK) break;
410
411 // Failure, buffer was too small
412 return Status::IOError("Too small a buffer passed to GZipCodec. InputLength=",
413 input_length, " OutputLength=", output_buffer_length);
414 }
415
416 // Failure for some other reason
417 if (ret != Z_STREAM_END) {
418 return ZlibErrorPrefix("GZipCodec failed: ", stream_.msg);
419 }
420
421 if (output_length) {
422 *output_length = stream_.total_out;
423 }
424
425 return Status::OK();
426 }
427
428 int64_t MaxCompressedLen(int64_t input_length, const uint8_t* ARROW_ARG_UNUSED(input)) {
429 // Must be in compression mode
430 if (!compressor_initialized_) {
431 Status s = InitCompressor();
432 DCHECK(s.ok());
433 }
434 int64_t max_len = deflateBound(&stream_, static_cast<uLong>(input_length));
435 // ARROW-3514: return a more pessimistic estimate to account for bugs
436 // in old zlib versions.
437 return max_len + 12;
438 }
439
440 Status Compress(int64_t input_length, const uint8_t* input, int64_t output_buffer_len,
441 uint8_t* output, int64_t* output_len) {
442 if (!compressor_initialized_) {
443 RETURN_NOT_OK(InitCompressor());
444 }
445 stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
446 stream_.avail_in = static_cast<uInt>(input_length);
447 stream_.next_out = reinterpret_cast<Bytef*>(output);
448 stream_.avail_out = static_cast<uInt>(output_buffer_len);
449
450 int64_t ret = 0;
451 if ((ret = deflate(&stream_, Z_FINISH)) != Z_STREAM_END) {
452 if (ret == Z_OK) {
453 // Will return Z_OK (and stream.msg NOT set) if stream.avail_out is too
454 // small
455 return Status::IOError("zlib deflate failed, output buffer too small");
456 }
457
458 return ZlibErrorPrefix("zlib deflate failed: ", stream_.msg);
459 }
460
461 if (deflateReset(&stream_) != Z_OK) {
462 return ZlibErrorPrefix("zlib deflateReset failed: ", stream_.msg);
463 }
464
465 // Actual output length
466 *output_len = output_buffer_len - stream_.avail_out;
467 return Status::OK();
468 }
469
470 private:
471 // zlib is stateful and the z_stream state variable must be initialized
472 // before
473 z_stream stream_;
474
475 // Realistically, this will always be GZIP, but we leave the option open to
476 // configure
477 GZipCodec::Format format_;
478
479 // These variables are mutually exclusive. When the codec is in "compressor"
480 // state, compressor_initialized_ is true while decompressor_initialized_ is
481 // false. When it's decompressing, the opposite is true.
482 //
483 // Indeed, this is slightly hacky, but the alternative is having separate
484 // Compressor and Decompressor classes. If this ever becomes an issue, we can
485 // perform the refactoring then
486 bool compressor_initialized_;
487 bool decompressor_initialized_;
488};
489
490GZipCodec::GZipCodec(Format format) { impl_.reset(new GZipCodecImpl(format)); }
491
492GZipCodec::~GZipCodec() {}
493
494Status GZipCodec::Decompress(int64_t input_length, const uint8_t* input,
495 int64_t output_buffer_len, uint8_t* output) {
496 return impl_->Decompress(input_length, input, output_buffer_len, output, nullptr);
497}
498
499Status GZipCodec::Decompress(int64_t input_length, const uint8_t* input,
500 int64_t output_buffer_len, uint8_t* output,
501 int64_t* output_len) {
502 return impl_->Decompress(input_length, input, output_buffer_len, output, output_len);
503}
504
505int64_t GZipCodec::MaxCompressedLen(int64_t input_length, const uint8_t* input) {
506 return impl_->MaxCompressedLen(input_length, input);
507}
508
509Status GZipCodec::Compress(int64_t input_length, const uint8_t* input,
510 int64_t output_buffer_len, uint8_t* output,
511 int64_t* output_len) {
512 return impl_->Compress(input_length, input, output_buffer_len, output, output_len);
513}
514
515Status GZipCodec::MakeCompressor(std::shared_ptr<Compressor>* out) {
516 return impl_->MakeCompressor(out);
517}
518
519Status GZipCodec::MakeDecompressor(std::shared_ptr<Decompressor>* out) {
520 return impl_->MakeDecompressor(out);
521}
522
523const char* GZipCodec::name() const { return "gzip"; }
524
525} // namespace util
526} // namespace arrow
527