1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include "arrow/util/compression_lz4.h" |
19 | |
#include <cstdint>
#include <cstring>
#include <limits>
#include <sstream>
23 | |
24 | #include <lz4.h> |
25 | #include <lz4frame.h> |
26 | |
27 | #include "arrow/status.h" |
28 | #include "arrow/util/logging.h" |
29 | #include "arrow/util/macros.h" |
30 | |
31 | namespace arrow { |
32 | namespace util { |
33 | |
34 | static Status LZ4Error(LZ4F_errorCode_t ret, const char* prefix_msg) { |
35 | return Status::IOError(prefix_msg, LZ4F_getErrorName(ret)); |
36 | } |
37 | |
38 | // ---------------------------------------------------------------------- |
39 | // Lz4 decompressor implementation |
40 | |
41 | class LZ4Decompressor : public Decompressor { |
42 | public: |
43 | LZ4Decompressor() {} |
44 | |
45 | ~LZ4Decompressor() override { |
46 | if (ctx_ != nullptr) { |
47 | ARROW_UNUSED(LZ4F_freeDecompressionContext(ctx_)); |
48 | } |
49 | } |
50 | |
51 | Status Init() { |
52 | LZ4F_errorCode_t ret; |
53 | finished_ = false; |
54 | |
55 | ret = LZ4F_createDecompressionContext(&ctx_, LZ4F_VERSION); |
56 | if (LZ4F_isError(ret)) { |
57 | return LZ4Error(ret, "LZ4 init failed: " ); |
58 | } else { |
59 | return Status::OK(); |
60 | } |
61 | } |
62 | |
63 | Status Decompress(int64_t input_len, const uint8_t* input, int64_t output_len, |
64 | uint8_t* output, int64_t* bytes_read, int64_t* bytes_written, |
65 | bool* need_more_output) override { |
66 | auto src = input; |
67 | auto dst = output; |
68 | auto srcSize = static_cast<size_t>(input_len); |
69 | auto dstCapacity = static_cast<size_t>(output_len); |
70 | size_t ret; |
71 | |
72 | ret = LZ4F_decompress(ctx_, dst, &dstCapacity, src, &srcSize, nullptr /* options */); |
73 | if (LZ4F_isError(ret)) { |
74 | return LZ4Error(ret, "LZ4 decompress failed: " ); |
75 | } |
76 | *bytes_read = static_cast<int64_t>(srcSize); |
77 | *bytes_written = static_cast<int64_t>(dstCapacity); |
78 | *need_more_output = (*bytes_read == 0 && *bytes_written == 0); |
79 | finished_ = (ret == 0); |
80 | return Status::OK(); |
81 | } |
82 | |
83 | bool IsFinished() override { return finished_; } |
84 | |
85 | protected: |
86 | LZ4F_dctx* ctx_ = nullptr; |
87 | bool finished_; |
88 | }; |
89 | |
90 | // ---------------------------------------------------------------------- |
91 | // Lz4 compressor implementation |
92 | |
93 | class LZ4Compressor : public Compressor { |
94 | public: |
95 | LZ4Compressor() {} |
96 | |
97 | ~LZ4Compressor() override { |
98 | if (ctx_ != nullptr) { |
99 | ARROW_UNUSED(LZ4F_freeCompressionContext(ctx_)); |
100 | } |
101 | } |
102 | |
103 | Status Init() { |
104 | LZ4F_errorCode_t ret; |
105 | memset(&prefs_, 0, sizeof(prefs_)); |
106 | first_time_ = true; |
107 | |
108 | ret = LZ4F_createCompressionContext(&ctx_, LZ4F_VERSION); |
109 | if (LZ4F_isError(ret)) { |
110 | return LZ4Error(ret, "LZ4 init failed: " ); |
111 | } else { |
112 | return Status::OK(); |
113 | } |
114 | } |
115 | |
116 | Status Compress(int64_t input_len, const uint8_t* input, int64_t output_len, |
117 | uint8_t* output, int64_t* bytes_read, int64_t* bytes_written) override; |
118 | |
119 | Status Flush(int64_t output_len, uint8_t* output, int64_t* bytes_written, |
120 | bool* should_retry) override; |
121 | |
122 | Status End(int64_t output_len, uint8_t* output, int64_t* bytes_written, |
123 | bool* should_retry) override; |
124 | |
125 | protected: |
126 | LZ4F_cctx* ctx_ = nullptr; |
127 | LZ4F_preferences_t prefs_; |
128 | bool first_time_; |
129 | }; |
130 | |
// Writes the LZ4 frame header the first time a compressor method runs.
//
// Must be expanded inside an LZ4Compressor member function whose scope
// provides a `size_t ret` local and an `int64_t* bytes_written` parameter;
// it also uses the members `ctx_`, `prefs_` and `first_time_`.
//
// `dst` and `dstCapacity` must be modifiable lvalues: after the header is
// written they are advanced / reduced by the header size, and that size is
// added to `*bytes_written`.
//
// The macro returns from the *enclosing function* in two cases:
//   - with Status::OK() when `dstCapacity` is below LZ4F_HEADER_SIZE_MAX
//     (nothing is written, `first_time_` stays true);
//   - with an IOError when LZ4F_compressBegin reports a failure.
//
// Wrapped in do { } while (0) so that it behaves as a single statement and
// cannot capture a following `else`.
#define BEGIN_COMPRESS(dst, dstCapacity)                               \
  do {                                                                 \
    if (first_time_) {                                                 \
      if ((dstCapacity) < LZ4F_HEADER_SIZE_MAX) {                      \
        /* Output too small to write LZ4F header */                    \
        return Status::OK();                                           \
      }                                                                \
      ret = LZ4F_compressBegin(ctx_, (dst), (dstCapacity), &prefs_);   \
      if (LZ4F_isError(ret)) {                                         \
        return LZ4Error(ret, "LZ4 compress begin failed: ");           \
      }                                                                \
      first_time_ = false;                                             \
      (dst) += ret;                                                    \
      (dstCapacity) -= ret;                                            \
      *bytes_written += static_cast<int64_t>(ret);                     \
    }                                                                  \
  } while (0)
146 | |
147 | Status LZ4Compressor::Compress(int64_t input_len, const uint8_t* input, |
148 | int64_t output_len, uint8_t* output, int64_t* bytes_read, |
149 | int64_t* bytes_written) { |
150 | auto src = input; |
151 | auto dst = output; |
152 | auto srcSize = static_cast<size_t>(input_len); |
153 | auto dstCapacity = static_cast<size_t>(output_len); |
154 | size_t ret; |
155 | |
156 | *bytes_read = 0; |
157 | *bytes_written = 0; |
158 | |
159 | BEGIN_COMPRESS(dst, dstCapacity); |
160 | |
161 | if (dstCapacity < LZ4F_compressBound(srcSize, &prefs_)) { |
162 | // Output too small to compress into |
163 | return Status::OK(); |
164 | } |
165 | ret = LZ4F_compressUpdate(ctx_, dst, dstCapacity, src, srcSize, nullptr /* options */); |
166 | if (LZ4F_isError(ret)) { |
167 | return LZ4Error(ret, "LZ4 compress update failed: " ); |
168 | } |
169 | *bytes_read = input_len; |
170 | *bytes_written += static_cast<int64_t>(ret); |
171 | DCHECK_LE(*bytes_written, output_len); |
172 | return Status::OK(); |
173 | } |
174 | |
175 | Status LZ4Compressor::Flush(int64_t output_len, uint8_t* output, int64_t* bytes_written, |
176 | bool* should_retry) { |
177 | auto dst = output; |
178 | auto dstCapacity = static_cast<size_t>(output_len); |
179 | size_t ret; |
180 | |
181 | *bytes_written = 0; |
182 | *should_retry = true; |
183 | |
184 | BEGIN_COMPRESS(dst, dstCapacity); |
185 | |
186 | if (dstCapacity < LZ4F_compressBound(0, &prefs_)) { |
187 | // Output too small to flush into |
188 | return Status::OK(); |
189 | } |
190 | |
191 | ret = LZ4F_flush(ctx_, dst, dstCapacity, nullptr /* options */); |
192 | if (LZ4F_isError(ret)) { |
193 | return LZ4Error(ret, "LZ4 flush failed: " ); |
194 | } |
195 | *bytes_written += static_cast<int64_t>(ret); |
196 | *should_retry = false; |
197 | DCHECK_LE(*bytes_written, output_len); |
198 | return Status::OK(); |
199 | } |
200 | |
201 | Status LZ4Compressor::End(int64_t output_len, uint8_t* output, int64_t* bytes_written, |
202 | bool* should_retry) { |
203 | auto dst = output; |
204 | auto dstCapacity = static_cast<size_t>(output_len); |
205 | size_t ret; |
206 | |
207 | *bytes_written = 0; |
208 | *should_retry = true; |
209 | |
210 | BEGIN_COMPRESS(dst, dstCapacity); |
211 | |
212 | if (dstCapacity < LZ4F_compressBound(0, &prefs_)) { |
213 | // Output too small to end frame into |
214 | return Status::OK(); |
215 | } |
216 | |
217 | ret = LZ4F_compressEnd(ctx_, dst, dstCapacity, nullptr /* options */); |
218 | if (LZ4F_isError(ret)) { |
219 | return LZ4Error(ret, "LZ4 end failed: " ); |
220 | } |
221 | *bytes_written += static_cast<int64_t>(ret); |
222 | *should_retry = false; |
223 | DCHECK_LE(*bytes_written, output_len); |
224 | return Status::OK(); |
225 | } |
226 | |
227 | #undef BEGIN_COMPRESS |
228 | |
229 | // ---------------------------------------------------------------------- |
230 | // Lz4 codec implementation |
231 | |
232 | Status Lz4Codec::MakeCompressor(std::shared_ptr<Compressor>* out) { |
233 | auto ptr = std::make_shared<LZ4Compressor>(); |
234 | RETURN_NOT_OK(ptr->Init()); |
235 | *out = ptr; |
236 | return Status::OK(); |
237 | } |
238 | |
239 | Status Lz4Codec::MakeDecompressor(std::shared_ptr<Decompressor>* out) { |
240 | auto ptr = std::make_shared<LZ4Decompressor>(); |
241 | RETURN_NOT_OK(ptr->Init()); |
242 | *out = ptr; |
243 | return Status::OK(); |
244 | } |
245 | |
246 | Status Lz4Codec::Decompress(int64_t input_len, const uint8_t* input, |
247 | int64_t output_buffer_len, uint8_t* output_buffer) { |
248 | return Decompress(input_len, input, output_buffer_len, output_buffer, nullptr); |
249 | } |
250 | |
251 | Status Lz4Codec::Decompress(int64_t input_len, const uint8_t* input, |
252 | int64_t output_buffer_len, uint8_t* output_buffer, |
253 | int64_t* output_len) { |
254 | int64_t decompressed_size = LZ4_decompress_safe( |
255 | reinterpret_cast<const char*>(input), reinterpret_cast<char*>(output_buffer), |
256 | static_cast<int>(input_len), static_cast<int>(output_buffer_len)); |
257 | if (decompressed_size < 0) { |
258 | return Status::IOError("Corrupt Lz4 compressed data." ); |
259 | } |
260 | if (output_len) { |
261 | *output_len = decompressed_size; |
262 | } |
263 | return Status::OK(); |
264 | } |
265 | |
266 | int64_t Lz4Codec::MaxCompressedLen(int64_t input_len, |
267 | const uint8_t* ARROW_ARG_UNUSED(input)) { |
268 | return LZ4_compressBound(static_cast<int>(input_len)); |
269 | } |
270 | |
271 | Status Lz4Codec::Compress(int64_t input_len, const uint8_t* input, |
272 | int64_t output_buffer_len, uint8_t* output_buffer, |
273 | int64_t* output_len) { |
274 | *output_len = LZ4_compress_default( |
275 | reinterpret_cast<const char*>(input), reinterpret_cast<char*>(output_buffer), |
276 | static_cast<int>(input_len), static_cast<int>(output_buffer_len)); |
277 | if (*output_len == 0) { |
278 | return Status::IOError("Lz4 compression failure." ); |
279 | } |
280 | return Status::OK(); |
281 | } |
282 | |
283 | } // namespace util |
284 | } // namespace arrow |
285 | |