1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include "arrow/util/compression_brotli.h"
19
20#include <cstddef>
21#include <cstdint>
22#include <sstream>
23
24#include <brotli/decode.h>
25#include <brotli/encode.h>
26#include <brotli/types.h>
27
28#include "arrow/status.h"
29#include "arrow/util/logging.h"
30#include "arrow/util/macros.h"
31
32namespace arrow {
33namespace util {
34
// Brotli's own default compression quality is the maximum (11), which is slow.
// Quality 8 is used as the default here, as the best trade-off for Parquet workloads.
constexpr int kBrotliDefaultCompressionLevel{8};
38
39// ----------------------------------------------------------------------
40// Brotli decompressor implementation
41
42class BrotliDecompressor : public Decompressor {
43 public:
44 BrotliDecompressor() {}
45
46 ~BrotliDecompressor() override {
47 if (state_ != nullptr) {
48 BrotliDecoderDestroyInstance(state_);
49 }
50 }
51
52 Status Init() {
53 state_ = BrotliDecoderCreateInstance(nullptr, nullptr, nullptr);
54 if (state_ == nullptr) {
55 return BrotliError("Brotli init failed");
56 }
57 return Status::OK();
58 }
59
60 Status Decompress(int64_t input_len, const uint8_t* input, int64_t output_len,
61 uint8_t* output, int64_t* bytes_read, int64_t* bytes_written,
62 bool* need_more_output) override {
63 auto avail_in = static_cast<size_t>(input_len);
64 auto avail_out = static_cast<size_t>(output_len);
65 BrotliDecoderResult ret;
66
67 ret = BrotliDecoderDecompressStream(state_, &avail_in, &input, &avail_out, &output,
68 nullptr /* total_out */);
69 if (ret == BROTLI_DECODER_RESULT_ERROR) {
70 return BrotliError(BrotliDecoderGetErrorCode(state_), "Brotli decompress failed: ");
71 }
72 *need_more_output = (ret == BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT);
73 *bytes_read = static_cast<int64_t>(input_len - avail_in);
74 *bytes_written = static_cast<int64_t>(output_len - avail_out);
75 return Status::OK();
76 }
77
78 bool IsFinished() override { return BrotliDecoderIsFinished(state_); }
79
80 protected:
81 Status BrotliError(const char* msg) { return Status::IOError(msg); }
82
83 Status BrotliError(BrotliDecoderErrorCode code, const char* prefix_msg) {
84 return Status::IOError(prefix_msg, BrotliDecoderErrorString(code));
85 }
86
87 BrotliDecoderState* state_ = nullptr;
88};
89
90// ----------------------------------------------------------------------
91// Brotli compressor implementation
92
93class BrotliCompressor : public Compressor {
94 public:
95 BrotliCompressor() {}
96
97 ~BrotliCompressor() override {
98 if (state_ != nullptr) {
99 BrotliEncoderDestroyInstance(state_);
100 }
101 }
102
103 Status Init() {
104 state_ = BrotliEncoderCreateInstance(nullptr, nullptr, nullptr);
105 if (state_ == nullptr) {
106 return BrotliError("Brotli init failed");
107 }
108 if (!BrotliEncoderSetParameter(state_, BROTLI_PARAM_QUALITY,
109 kBrotliDefaultCompressionLevel)) {
110 return BrotliError("Brotli set compression level failed");
111 }
112 return Status::OK();
113 }
114
115 Status Compress(int64_t input_len, const uint8_t* input, int64_t output_len,
116 uint8_t* output, int64_t* bytes_read, int64_t* bytes_written) override;
117
118 Status Flush(int64_t output_len, uint8_t* output, int64_t* bytes_written,
119 bool* should_retry) override;
120
121 Status End(int64_t output_len, uint8_t* output, int64_t* bytes_written,
122 bool* should_retry) override;
123
124 protected:
125 Status BrotliError(const char* msg) { return Status::IOError(msg); }
126
127 BrotliEncoderState* state_ = nullptr;
128};
129
130Status BrotliCompressor::Compress(int64_t input_len, const uint8_t* input,
131 int64_t output_len, uint8_t* output,
132 int64_t* bytes_read, int64_t* bytes_written) {
133 auto avail_in = static_cast<size_t>(input_len);
134 auto avail_out = static_cast<size_t>(output_len);
135 BROTLI_BOOL ret;
136
137 ret = BrotliEncoderCompressStream(state_, BROTLI_OPERATION_PROCESS, &avail_in, &input,
138 &avail_out, &output, nullptr /* total_out */);
139 if (!ret) {
140 return BrotliError("Brotli compress failed");
141 }
142 *bytes_read = static_cast<int64_t>(input_len - avail_in);
143 *bytes_written = static_cast<int64_t>(output_len - avail_out);
144 return Status::OK();
145}
146
147Status BrotliCompressor::Flush(int64_t output_len, uint8_t* output,
148 int64_t* bytes_written, bool* should_retry) {
149 size_t avail_in = 0;
150 const uint8_t* next_in = nullptr;
151 auto avail_out = static_cast<size_t>(output_len);
152 BROTLI_BOOL ret;
153
154 ret = BrotliEncoderCompressStream(state_, BROTLI_OPERATION_FLUSH, &avail_in, &next_in,
155 &avail_out, &output, nullptr /* total_out */);
156 if (!ret) {
157 return BrotliError("Brotli flush failed");
158 }
159 *bytes_written = static_cast<int64_t>(output_len - avail_out);
160 *should_retry = !!BrotliEncoderHasMoreOutput(state_);
161 return Status::OK();
162}
163
164Status BrotliCompressor::End(int64_t output_len, uint8_t* output, int64_t* bytes_written,
165 bool* should_retry) {
166 size_t avail_in = 0;
167 const uint8_t* next_in = nullptr;
168 auto avail_out = static_cast<size_t>(output_len);
169 BROTLI_BOOL ret;
170
171 ret = BrotliEncoderCompressStream(state_, BROTLI_OPERATION_FINISH, &avail_in, &next_in,
172 &avail_out, &output, nullptr /* total_out */);
173 if (!ret) {
174 return BrotliError("Brotli end failed");
175 }
176 *bytes_written = static_cast<int64_t>(output_len - avail_out);
177 *should_retry = !!BrotliEncoderHasMoreOutput(state_);
178 DCHECK_EQ(*should_retry, !BrotliEncoderIsFinished(state_));
179 return Status::OK();
180}
181
182// ----------------------------------------------------------------------
183// Brotli codec implementation
184
185Status BrotliCodec::MakeCompressor(std::shared_ptr<Compressor>* out) {
186 auto ptr = std::make_shared<BrotliCompressor>();
187 RETURN_NOT_OK(ptr->Init());
188 *out = ptr;
189 return Status::OK();
190}
191
192Status BrotliCodec::MakeDecompressor(std::shared_ptr<Decompressor>* out) {
193 auto ptr = std::make_shared<BrotliDecompressor>();
194 RETURN_NOT_OK(ptr->Init());
195 *out = ptr;
196 return Status::OK();
197}
198
199Status BrotliCodec::Decompress(int64_t input_len, const uint8_t* input,
200 int64_t output_buffer_len, uint8_t* output_buffer) {
201 return Decompress(input_len, input, output_buffer_len, output_buffer, nullptr);
202}
203
204Status BrotliCodec::Decompress(int64_t input_len, const uint8_t* input,
205 int64_t output_buffer_len, uint8_t* output_buffer,
206 int64_t* output_len) {
207 std::size_t output_size = output_buffer_len;
208 if (BrotliDecoderDecompress(input_len, input, &output_size, output_buffer) !=
209 BROTLI_DECODER_RESULT_SUCCESS) {
210 return Status::IOError("Corrupt brotli compressed data.");
211 }
212 if (output_len) {
213 *output_len = output_size;
214 }
215 return Status::OK();
216}
217
218int64_t BrotliCodec::MaxCompressedLen(int64_t input_len,
219 const uint8_t* ARROW_ARG_UNUSED(input)) {
220 return BrotliEncoderMaxCompressedSize(input_len);
221}
222
223Status BrotliCodec::Compress(int64_t input_len, const uint8_t* input,
224 int64_t output_buffer_len, uint8_t* output_buffer,
225 int64_t* output_len) {
226 std::size_t output_size = output_buffer_len;
227 if (BrotliEncoderCompress(kBrotliDefaultCompressionLevel, BROTLI_DEFAULT_WINDOW,
228 BROTLI_DEFAULT_MODE, input_len, input, &output_size,
229 output_buffer) == BROTLI_FALSE) {
230 return Status::IOError("Brotli compression failure.");
231 }
232 *output_len = output_size;
233 return Status::OK();
234}
235
236} // namespace util
237} // namespace arrow
238