1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
#include "arrow/util/compression_lz4.h"

#include <cstdint>
#include <cstring>
#include <limits>
#include <sstream>

#include <lz4.h>
#include <lz4frame.h>

#include "arrow/status.h"
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"
30
31namespace arrow {
32namespace util {
33
34static Status LZ4Error(LZ4F_errorCode_t ret, const char* prefix_msg) {
35 return Status::IOError(prefix_msg, LZ4F_getErrorName(ret));
36}
37
38// ----------------------------------------------------------------------
39// Lz4 decompressor implementation
40
41class LZ4Decompressor : public Decompressor {
42 public:
43 LZ4Decompressor() {}
44
45 ~LZ4Decompressor() override {
46 if (ctx_ != nullptr) {
47 ARROW_UNUSED(LZ4F_freeDecompressionContext(ctx_));
48 }
49 }
50
51 Status Init() {
52 LZ4F_errorCode_t ret;
53 finished_ = false;
54
55 ret = LZ4F_createDecompressionContext(&ctx_, LZ4F_VERSION);
56 if (LZ4F_isError(ret)) {
57 return LZ4Error(ret, "LZ4 init failed: ");
58 } else {
59 return Status::OK();
60 }
61 }
62
63 Status Decompress(int64_t input_len, const uint8_t* input, int64_t output_len,
64 uint8_t* output, int64_t* bytes_read, int64_t* bytes_written,
65 bool* need_more_output) override {
66 auto src = input;
67 auto dst = output;
68 auto srcSize = static_cast<size_t>(input_len);
69 auto dstCapacity = static_cast<size_t>(output_len);
70 size_t ret;
71
72 ret = LZ4F_decompress(ctx_, dst, &dstCapacity, src, &srcSize, nullptr /* options */);
73 if (LZ4F_isError(ret)) {
74 return LZ4Error(ret, "LZ4 decompress failed: ");
75 }
76 *bytes_read = static_cast<int64_t>(srcSize);
77 *bytes_written = static_cast<int64_t>(dstCapacity);
78 *need_more_output = (*bytes_read == 0 && *bytes_written == 0);
79 finished_ = (ret == 0);
80 return Status::OK();
81 }
82
83 bool IsFinished() override { return finished_; }
84
85 protected:
86 LZ4F_dctx* ctx_ = nullptr;
87 bool finished_;
88};
89
90// ----------------------------------------------------------------------
91// Lz4 compressor implementation
92
93class LZ4Compressor : public Compressor {
94 public:
95 LZ4Compressor() {}
96
97 ~LZ4Compressor() override {
98 if (ctx_ != nullptr) {
99 ARROW_UNUSED(LZ4F_freeCompressionContext(ctx_));
100 }
101 }
102
103 Status Init() {
104 LZ4F_errorCode_t ret;
105 memset(&prefs_, 0, sizeof(prefs_));
106 first_time_ = true;
107
108 ret = LZ4F_createCompressionContext(&ctx_, LZ4F_VERSION);
109 if (LZ4F_isError(ret)) {
110 return LZ4Error(ret, "LZ4 init failed: ");
111 } else {
112 return Status::OK();
113 }
114 }
115
116 Status Compress(int64_t input_len, const uint8_t* input, int64_t output_len,
117 uint8_t* output, int64_t* bytes_read, int64_t* bytes_written) override;
118
119 Status Flush(int64_t output_len, uint8_t* output, int64_t* bytes_written,
120 bool* should_retry) override;
121
122 Status End(int64_t output_len, uint8_t* output, int64_t* bytes_written,
123 bool* should_retry) override;
124
125 protected:
126 LZ4F_cctx* ctx_ = nullptr;
127 LZ4F_preferences_t prefs_;
128 bool first_time_;
129};
130
// Emit the LZ4 frame header the first time a compressor method runs.
//
// Must be expanded inside an LZ4Compressor method returning Status that has
// a local `ret` and an `int64_t* bytes_written` in scope; it also uses the
// members first_time_, ctx_ and prefs_.
//
// While first_time_ is set:
//  - if `dstCapacity` is below LZ4F_HEADER_SIZE_MAX, the enclosing function
//    returns Status::OK() immediately without writing anything;
//  - otherwise LZ4F_compressBegin() is called; on error the enclosing function
//    returns the corresponding LZ4Error, on success first_time_ is cleared,
//    `dst` is advanced and `dstCapacity` reduced by the number of bytes
//    returned, and that count is added to `*bytes_written`.
// `dst` and `dstCapacity` must therefore be modifiable lvalues.
#define BEGIN_COMPRESS(dst, dstCapacity)                         \
  if (first_time_) {                                             \
    if (dstCapacity < LZ4F_HEADER_SIZE_MAX) {                    \
      /* Output too small to write LZ4F header */                \
      return Status::OK();                                       \
    }                                                            \
    ret = LZ4F_compressBegin(ctx_, dst, dstCapacity, &prefs_);   \
    if (LZ4F_isError(ret)) {                                     \
      return LZ4Error(ret, "LZ4 compress begin failed: ");       \
    }                                                            \
    first_time_ = false;                                         \
    dst += ret;                                                  \
    dstCapacity -= ret;                                          \
    *bytes_written += static_cast<int64_t>(ret);                 \
  }
146
147Status LZ4Compressor::Compress(int64_t input_len, const uint8_t* input,
148 int64_t output_len, uint8_t* output, int64_t* bytes_read,
149 int64_t* bytes_written) {
150 auto src = input;
151 auto dst = output;
152 auto srcSize = static_cast<size_t>(input_len);
153 auto dstCapacity = static_cast<size_t>(output_len);
154 size_t ret;
155
156 *bytes_read = 0;
157 *bytes_written = 0;
158
159 BEGIN_COMPRESS(dst, dstCapacity);
160
161 if (dstCapacity < LZ4F_compressBound(srcSize, &prefs_)) {
162 // Output too small to compress into
163 return Status::OK();
164 }
165 ret = LZ4F_compressUpdate(ctx_, dst, dstCapacity, src, srcSize, nullptr /* options */);
166 if (LZ4F_isError(ret)) {
167 return LZ4Error(ret, "LZ4 compress update failed: ");
168 }
169 *bytes_read = input_len;
170 *bytes_written += static_cast<int64_t>(ret);
171 DCHECK_LE(*bytes_written, output_len);
172 return Status::OK();
173}
174
175Status LZ4Compressor::Flush(int64_t output_len, uint8_t* output, int64_t* bytes_written,
176 bool* should_retry) {
177 auto dst = output;
178 auto dstCapacity = static_cast<size_t>(output_len);
179 size_t ret;
180
181 *bytes_written = 0;
182 *should_retry = true;
183
184 BEGIN_COMPRESS(dst, dstCapacity);
185
186 if (dstCapacity < LZ4F_compressBound(0, &prefs_)) {
187 // Output too small to flush into
188 return Status::OK();
189 }
190
191 ret = LZ4F_flush(ctx_, dst, dstCapacity, nullptr /* options */);
192 if (LZ4F_isError(ret)) {
193 return LZ4Error(ret, "LZ4 flush failed: ");
194 }
195 *bytes_written += static_cast<int64_t>(ret);
196 *should_retry = false;
197 DCHECK_LE(*bytes_written, output_len);
198 return Status::OK();
199}
200
201Status LZ4Compressor::End(int64_t output_len, uint8_t* output, int64_t* bytes_written,
202 bool* should_retry) {
203 auto dst = output;
204 auto dstCapacity = static_cast<size_t>(output_len);
205 size_t ret;
206
207 *bytes_written = 0;
208 *should_retry = true;
209
210 BEGIN_COMPRESS(dst, dstCapacity);
211
212 if (dstCapacity < LZ4F_compressBound(0, &prefs_)) {
213 // Output too small to end frame into
214 return Status::OK();
215 }
216
217 ret = LZ4F_compressEnd(ctx_, dst, dstCapacity, nullptr /* options */);
218 if (LZ4F_isError(ret)) {
219 return LZ4Error(ret, "LZ4 end failed: ");
220 }
221 *bytes_written += static_cast<int64_t>(ret);
222 *should_retry = false;
223 DCHECK_LE(*bytes_written, output_len);
224 return Status::OK();
225}
226
227#undef BEGIN_COMPRESS
228
229// ----------------------------------------------------------------------
230// Lz4 codec implementation
231
232Status Lz4Codec::MakeCompressor(std::shared_ptr<Compressor>* out) {
233 auto ptr = std::make_shared<LZ4Compressor>();
234 RETURN_NOT_OK(ptr->Init());
235 *out = ptr;
236 return Status::OK();
237}
238
239Status Lz4Codec::MakeDecompressor(std::shared_ptr<Decompressor>* out) {
240 auto ptr = std::make_shared<LZ4Decompressor>();
241 RETURN_NOT_OK(ptr->Init());
242 *out = ptr;
243 return Status::OK();
244}
245
246Status Lz4Codec::Decompress(int64_t input_len, const uint8_t* input,
247 int64_t output_buffer_len, uint8_t* output_buffer) {
248 return Decompress(input_len, input, output_buffer_len, output_buffer, nullptr);
249}
250
251Status Lz4Codec::Decompress(int64_t input_len, const uint8_t* input,
252 int64_t output_buffer_len, uint8_t* output_buffer,
253 int64_t* output_len) {
254 int64_t decompressed_size = LZ4_decompress_safe(
255 reinterpret_cast<const char*>(input), reinterpret_cast<char*>(output_buffer),
256 static_cast<int>(input_len), static_cast<int>(output_buffer_len));
257 if (decompressed_size < 0) {
258 return Status::IOError("Corrupt Lz4 compressed data.");
259 }
260 if (output_len) {
261 *output_len = decompressed_size;
262 }
263 return Status::OK();
264}
265
266int64_t Lz4Codec::MaxCompressedLen(int64_t input_len,
267 const uint8_t* ARROW_ARG_UNUSED(input)) {
268 return LZ4_compressBound(static_cast<int>(input_len));
269}
270
271Status Lz4Codec::Compress(int64_t input_len, const uint8_t* input,
272 int64_t output_buffer_len, uint8_t* output_buffer,
273 int64_t* output_len) {
274 *output_len = LZ4_compress_default(
275 reinterpret_cast<const char*>(input), reinterpret_cast<char*>(output_buffer),
276 static_cast<int>(input_len), static_cast<int>(output_buffer_len));
277 if (*output_len == 0) {
278 return Status::IOError("Lz4 compression failure.");
279 }
280 return Status::OK();
281}
282
283} // namespace util
284} // namespace arrow
285