1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
#include <algorithm>
#include <cmath>
#include <cstdint>
#include <cstring>
#include <memory>
#include <random>
#include <sstream>
#include <string>
#include <vector>
25
26#include <gtest/gtest.h>
27
28#include "arrow/test-util.h"
29#include "arrow/util/compression.h"
30
31using std::string;
32using std::vector;
33
34namespace arrow {
35namespace util {
36
37vector<uint8_t> MakeRandomData(int data_size) {
38 vector<uint8_t> data(data_size);
39 random_bytes(data_size, 1234, data.data());
40 return data;
41}
42
// Return exactly `data_size` bytes of repetitive data: a fixed sentence
// repeated end-to-end, with the last repetition cut short where necessary.
vector<uint8_t> MakeCompressibleData(int data_size) {
  const std::string pattern =
      "Apache Arrow is a cross-language development platform for in-memory data";
  const size_t pattern_len = pattern.size();
  const size_t total_len = static_cast<size_t>(data_size);

  std::vector<uint8_t> out(total_len);
  size_t pos = 0;
  // Whole copies of the pattern first...
  while (total_len - pos >= pattern_len) {
    std::memcpy(out.data() + pos, pattern.data(), pattern_len);
    pos += pattern_len;
  }
  // ...then the truncated tail, if any.
  if (pos < total_len) {
    std::memcpy(out.data() + pos, pattern.data(), total_len - pos);
  }
  return out;
}
55
56// Check roundtrip of one-shot compression and decompression functions.
57
58void CheckCodecRoundtrip(Compression::type ctype, const vector<uint8_t>& data) {
59 // create multiple compressors to try to break them
60 std::unique_ptr<Codec> c1, c2;
61
62 ASSERT_OK(Codec::Create(ctype, &c1));
63 ASSERT_OK(Codec::Create(ctype, &c2));
64
65 int max_compressed_len =
66 static_cast<int>(c1->MaxCompressedLen(data.size(), data.data()));
67 std::vector<uint8_t> compressed(max_compressed_len);
68 std::vector<uint8_t> decompressed(data.size());
69
70 // compress with c1
71 int64_t actual_size;
72 ASSERT_OK(c1->Compress(data.size(), data.data(), max_compressed_len, compressed.data(),
73 &actual_size));
74 compressed.resize(actual_size);
75
76 // decompress with c2
77 ASSERT_OK(c2->Decompress(compressed.size(), compressed.data(), decompressed.size(),
78 decompressed.data()));
79
80 ASSERT_EQ(data, decompressed);
81
82 // decompress with size with c2
83 int64_t actual_decompressed_size;
84 ASSERT_OK(c2->Decompress(compressed.size(), compressed.data(), decompressed.size(),
85 decompressed.data(), &actual_decompressed_size));
86
87 ASSERT_EQ(data, decompressed);
88 ASSERT_EQ(data.size(), actual_decompressed_size);
89
90 // compress with c2
91 int64_t actual_size2;
92 ASSERT_OK(c2->Compress(data.size(), data.data(), max_compressed_len, compressed.data(),
93 &actual_size2));
94 ASSERT_EQ(actual_size2, actual_size);
95
96 // decompress with c1
97 ASSERT_OK(c1->Decompress(compressed.size(), compressed.data(), decompressed.size(),
98 decompressed.data()));
99
100 ASSERT_EQ(data, decompressed);
101
102 // decompress with size with c1
103 int64_t actual_decompressed_size2;
104 ASSERT_OK(c1->Decompress(compressed.size(), compressed.data(), decompressed.size(),
105 decompressed.data(), &actual_decompressed_size2));
106
107 ASSERT_EQ(data, decompressed);
108 ASSERT_EQ(data.size(), actual_decompressed_size2);
109}
110
111// Check the streaming compressor against one-shot decompression
112
113void CheckStreamingCompressor(Codec* codec, const vector<uint8_t>& data) {
114 std::shared_ptr<Compressor> compressor;
115 ASSERT_OK(codec->MakeCompressor(&compressor));
116
117 std::vector<uint8_t> compressed;
118 int64_t compressed_size = 0;
119 const uint8_t* input = data.data();
120 int64_t remaining = data.size();
121
122 compressed.resize(10);
123 bool do_flush = false;
124
125 while (remaining > 0) {
126 // Feed a small amount each time
127 int64_t input_len = std::min(remaining, static_cast<int64_t>(1111));
128 int64_t output_len = compressed.size() - compressed_size;
129 uint8_t* output = compressed.data() + compressed_size;
130 int64_t bytes_read, bytes_written;
131 ASSERT_OK(compressor->Compress(input_len, input, output_len, output, &bytes_read,
132 &bytes_written));
133 ASSERT_LE(bytes_read, input_len);
134 ASSERT_LE(bytes_written, output_len);
135 compressed_size += bytes_written;
136 input += bytes_read;
137 remaining -= bytes_read;
138 if (bytes_read == 0) {
139 compressed.resize(compressed.capacity() * 2);
140 }
141 // Once every two iterations, do a flush
142 if (do_flush) {
143 bool should_retry = false;
144 do {
145 output_len = compressed.size() - compressed_size;
146 output = compressed.data() + compressed_size;
147 ASSERT_OK(compressor->Flush(output_len, output, &bytes_written, &should_retry));
148 ASSERT_LE(bytes_written, output_len);
149 compressed_size += bytes_written;
150 if (should_retry) {
151 compressed.resize(compressed.capacity() * 2);
152 }
153 } while (should_retry);
154 }
155 do_flush = !do_flush;
156 }
157
158 // End the compressed stream
159 bool should_retry = false;
160 do {
161 int64_t output_len = compressed.size() - compressed_size;
162 uint8_t* output = compressed.data() + compressed_size;
163 int64_t bytes_written;
164 ASSERT_OK(compressor->End(output_len, output, &bytes_written, &should_retry));
165 ASSERT_LE(bytes_written, output_len);
166 compressed_size += bytes_written;
167 if (should_retry) {
168 compressed.resize(compressed.capacity() * 2);
169 }
170 } while (should_retry);
171
172 // Check decompressing the compressed data
173 std::vector<uint8_t> decompressed(data.size());
174 ASSERT_OK(codec->Decompress(compressed_size, compressed.data(), decompressed.size(),
175 decompressed.data()));
176
177 ASSERT_EQ(data, decompressed);
178}
179
180// Check the streaming decompressor against one-shot compression
181
182void CheckStreamingDecompressor(Codec* codec, const vector<uint8_t>& data) {
183 // Create compressed data
184 int64_t max_compressed_len = codec->MaxCompressedLen(data.size(), data.data());
185 std::vector<uint8_t> compressed(max_compressed_len);
186 int64_t compressed_size;
187 ASSERT_OK(codec->Compress(data.size(), data.data(), max_compressed_len,
188 compressed.data(), &compressed_size));
189 compressed.resize(compressed_size);
190
191 // Run streaming decompression
192 std::shared_ptr<Decompressor> decompressor;
193 ASSERT_OK(codec->MakeDecompressor(&decompressor));
194
195 std::vector<uint8_t> decompressed;
196 int64_t decompressed_size = 0;
197 const uint8_t* input = compressed.data();
198 int64_t remaining = compressed.size();
199
200 decompressed.resize(10);
201 while (!decompressor->IsFinished()) {
202 // Feed a small amount each time
203 int64_t input_len = std::min(remaining, static_cast<int64_t>(23));
204 int64_t output_len = decompressed.size() - decompressed_size;
205 uint8_t* output = decompressed.data() + decompressed_size;
206 int64_t bytes_read, bytes_written;
207 bool need_more_output;
208 ASSERT_OK(decompressor->Decompress(input_len, input, output_len, output, &bytes_read,
209 &bytes_written, &need_more_output));
210 ASSERT_LE(bytes_read, input_len);
211 ASSERT_LE(bytes_written, output_len);
212 ASSERT_TRUE(need_more_output || bytes_written > 0 || bytes_read > 0)
213 << "Decompression not progressing anymore";
214 if (need_more_output) {
215 decompressed.resize(decompressed.capacity() * 2);
216 }
217 decompressed_size += bytes_written;
218 input += bytes_read;
219 remaining -= bytes_read;
220 }
221 ASSERT_TRUE(decompressor->IsFinished());
222 ASSERT_EQ(remaining, 0);
223
224 // Check the decompressed data
225 decompressed.resize(decompressed_size);
226 ASSERT_EQ(data.size(), decompressed_size);
227 ASSERT_EQ(data, decompressed);
228}
229
230// Check the streaming compressor and decompressor together
231
232void CheckStreamingRoundtrip(Codec* codec, const vector<uint8_t>& data) {
233 std::shared_ptr<Compressor> compressor;
234 std::shared_ptr<Decompressor> decompressor;
235 ASSERT_OK(codec->MakeCompressor(&compressor));
236 ASSERT_OK(codec->MakeDecompressor(&decompressor));
237
238 std::default_random_engine engine(42);
239 std::uniform_int_distribution<int> buf_size_distribution(10, 40);
240
241 auto make_buf_size = [&]() -> int64_t { return buf_size_distribution(engine); };
242
243 // Compress...
244
245 std::vector<uint8_t> compressed(1);
246 int64_t compressed_size = 0;
247 {
248 const uint8_t* input = data.data();
249 int64_t remaining = data.size();
250
251 while (remaining > 0) {
252 // Feed a varying amount each time
253 int64_t input_len = std::min(remaining, make_buf_size());
254 int64_t output_len = compressed.size() - compressed_size;
255 uint8_t* output = compressed.data() + compressed_size;
256 int64_t bytes_read, bytes_written;
257 ASSERT_OK(compressor->Compress(input_len, input, output_len, output, &bytes_read,
258 &bytes_written));
259 ASSERT_LE(bytes_read, input_len);
260 ASSERT_LE(bytes_written, output_len);
261 compressed_size += bytes_written;
262 input += bytes_read;
263 remaining -= bytes_read;
264 if (bytes_read == 0) {
265 compressed.resize(compressed.capacity() * 2);
266 }
267 }
268 // End the compressed stream
269 bool should_retry = false;
270 do {
271 int64_t output_len = compressed.size() - compressed_size;
272 uint8_t* output = compressed.data() + compressed_size;
273 int64_t bytes_written;
274 ASSERT_OK(compressor->End(output_len, output, &bytes_written, &should_retry));
275 ASSERT_LE(bytes_written, output_len);
276 compressed_size += bytes_written;
277 if (should_retry) {
278 compressed.resize(compressed.capacity() * 2);
279 }
280 } while (should_retry);
281
282 compressed.resize(compressed_size);
283 }
284
285 // Then decompress...
286
287 std::vector<uint8_t> decompressed(2);
288 int64_t decompressed_size = 0;
289 {
290 const uint8_t* input = compressed.data();
291 int64_t remaining = compressed.size();
292
293 while (!decompressor->IsFinished()) {
294 // Feed a varying amount each time
295 int64_t input_len = std::min(remaining, make_buf_size());
296 int64_t output_len = decompressed.size() - decompressed_size;
297 uint8_t* output = decompressed.data() + decompressed_size;
298 int64_t bytes_read, bytes_written;
299 bool need_more_output;
300 ASSERT_OK(decompressor->Decompress(input_len, input, output_len, output,
301 &bytes_read, &bytes_written, &need_more_output));
302 ASSERT_LE(bytes_read, input_len);
303 ASSERT_LE(bytes_written, output_len);
304 ASSERT_TRUE(need_more_output || bytes_written > 0 || bytes_read > 0)
305 << "Decompression not progressing anymore";
306 if (need_more_output) {
307 decompressed.resize(decompressed.capacity() * 2);
308 }
309 decompressed_size += bytes_written;
310 input += bytes_read;
311 remaining -= bytes_read;
312 }
313 ASSERT_EQ(remaining, 0);
314 decompressed.resize(decompressed_size);
315 }
316
317 ASSERT_EQ(data.size(), decompressed.size());
318 ASSERT_EQ(data, decompressed);
319}
320
321class CodecTest : public ::testing::TestWithParam<Compression::type> {
322 protected:
323 Compression::type GetCompression() { return GetParam(); }
324
325 std::unique_ptr<Codec> MakeCodec() {
326 std::unique_ptr<Codec> codec;
327 ABORT_NOT_OK(Codec::Create(GetCompression(), &codec));
328 return codec;
329 }
330};
331
332TEST_P(CodecTest, CodecRoundtrip) {
333 if (GetCompression() == Compression::BZ2) {
334 // SKIP: BZ2 doesn't support one-shot compression
335 return;
336 }
337
338 int sizes[] = {0, 10000, 100000};
339 for (int data_size : sizes) {
340 vector<uint8_t> data = MakeRandomData(data_size);
341 CheckCodecRoundtrip(GetCompression(), data);
342
343 data = MakeCompressibleData(data_size);
344 CheckCodecRoundtrip(GetCompression(), data);
345 }
346}
347
348TEST_P(CodecTest, OutputBufferIsSmall) {
349 auto type = GetCompression();
350 if (type != Compression::SNAPPY) {
351 return;
352 }
353
354 std::unique_ptr<Codec> codec;
355 ASSERT_OK(Codec::Create(type, &codec));
356
357 vector<uint8_t> data = MakeRandomData(10);
358 auto max_compressed_len = codec->MaxCompressedLen(data.size(), data.data());
359 std::vector<uint8_t> compressed(max_compressed_len);
360 std::vector<uint8_t> decompressed(data.size() - 1);
361
362 int64_t actual_size;
363 ASSERT_OK(codec->Compress(data.size(), data.data(), max_compressed_len,
364 compressed.data(), &actual_size));
365 compressed.resize(actual_size);
366
367 int64_t actual_decompressed_size;
368 std::stringstream ss;
369 ss << "Invalid: Output buffer size (" << decompressed.size() << ") must be "
370 << data.size() << " or larger.";
371 ASSERT_RAISES_WITH_MESSAGE(
372 Invalid, ss.str(),
373 codec->Decompress(compressed.size(), compressed.data(), decompressed.size(),
374 decompressed.data(), &actual_decompressed_size));
375}
376
377TEST_P(CodecTest, StreamingCompressor) {
378 if (GetCompression() == Compression::SNAPPY) {
379 // SKIP: snappy doesn't support streaming compression
380 return;
381 }
382 if (GetCompression() == Compression::BZ2) {
383 // SKIP: BZ2 doesn't support one-shot decompression
384 return;
385 }
386 if (GetCompression() == Compression::LZ4) {
387 // SKIP: LZ4 streaming compression uses the LZ4 framing format,
388 // which must be tested against a streaming decompressor
389 return;
390 }
391
392 int sizes[] = {0, 10, 100000};
393 for (int data_size : sizes) {
394 auto codec = MakeCodec();
395
396 vector<uint8_t> data = MakeRandomData(data_size);
397 CheckStreamingCompressor(codec.get(), data);
398
399 data = MakeCompressibleData(data_size);
400 CheckStreamingCompressor(codec.get(), data);
401 }
402}
403
404TEST_P(CodecTest, StreamingDecompressor) {
405 if (GetCompression() == Compression::SNAPPY) {
406 // SKIP: snappy doesn't support streaming decompression
407 return;
408 }
409 if (GetCompression() == Compression::BZ2) {
410 // SKIP: BZ2 doesn't support one-shot compression
411 return;
412 }
413 if (GetCompression() == Compression::LZ4) {
414 // SKIP: LZ4 streaming decompression uses the LZ4 framing format,
415 // which must be tested against a streaming compressor
416 return;
417 }
418
419 int sizes[] = {0, 10, 100000};
420 for (int data_size : sizes) {
421 auto codec = MakeCodec();
422
423 vector<uint8_t> data = MakeRandomData(data_size);
424 CheckStreamingDecompressor(codec.get(), data);
425
426 data = MakeCompressibleData(data_size);
427 CheckStreamingDecompressor(codec.get(), data);
428 }
429}
430
431TEST_P(CodecTest, StreamingRoundtrip) {
432 if (GetCompression() == Compression::SNAPPY) {
433 // SKIP: snappy doesn't support streaming decompression
434 return;
435 }
436
437 int sizes[] = {0, 10, 100000};
438 for (int data_size : sizes) {
439 auto codec = MakeCodec();
440
441 vector<uint8_t> data = MakeRandomData(data_size);
442 CheckStreamingRoundtrip(codec.get(), data);
443
444 data = MakeCompressibleData(data_size);
445 CheckStreamingRoundtrip(codec.get(), data);
446 }
447}
448
449INSTANTIATE_TEST_CASE_P(TestGZip, CodecTest, ::testing::Values(Compression::GZIP));
450
451INSTANTIATE_TEST_CASE_P(TestSnappy, CodecTest, ::testing::Values(Compression::SNAPPY));
452
453INSTANTIATE_TEST_CASE_P(TestLZ4, CodecTest, ::testing::Values(Compression::LZ4));
454
455INSTANTIATE_TEST_CASE_P(TestBrotli, CodecTest, ::testing::Values(Compression::BROTLI));
456
457// bz2 requires a binary installation, there is no ExternalProject
458#if ARROW_WITH_BZ2
459INSTANTIATE_TEST_CASE_P(TestBZ2, CodecTest, ::testing::Values(Compression::BZ2));
460#endif
461
462// The ExternalProject for zstd does not build on CMake < 3.7, so we do not
463// require it here
464#ifdef ARROW_WITH_ZSTD
465INSTANTIATE_TEST_CASE_P(TestZSTD, CodecTest, ::testing::Values(Compression::ZSTD));
466#endif
467
468} // namespace util
469} // namespace arrow
470