1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
#include <algorithm>
#include <cmath>
#include <cstdint>
#include <cstring>
#include <memory>
#include <random>
#include <sstream>
#include <string>
#include <vector>

#include <gtest/gtest.h>

#include "arrow/test-util.h"
#include "arrow/util/compression.h"
30 | |
31 | using std::string; |
32 | using std::vector; |
33 | |
34 | namespace arrow { |
35 | namespace util { |
36 | |
37 | vector<uint8_t> MakeRandomData(int data_size) { |
38 | vector<uint8_t> data(data_size); |
39 | random_bytes(data_size, 1234, data.data()); |
40 | return data; |
41 | } |
42 | |
// Build a deterministic, highly compressible buffer of exactly `data_size`
// bytes by tiling a fixed ASCII sentence and trimming the overshoot.
std::vector<uint8_t> MakeCompressibleData(int data_size) {
  const std::string base_data =
      "Apache Arrow is a cross-language development platform for in-memory data";
  const int nrepeats = static_cast<int>(1 + data_size / base_data.size());

  std::vector<uint8_t> data;
  data.reserve(base_data.size() * nrepeats);
  for (int rep = 0; rep < nrepeats; ++rep) {
    data.insert(data.end(), base_data.begin(), base_data.end());
  }
  // Trim the final (partial) repetition down to the requested size.
  data.resize(data_size);
  return data;
}
55 | |
56 | // Check roundtrip of one-shot compression and decompression functions. |
57 | |
58 | void CheckCodecRoundtrip(Compression::type ctype, const vector<uint8_t>& data) { |
59 | // create multiple compressors to try to break them |
60 | std::unique_ptr<Codec> c1, c2; |
61 | |
62 | ASSERT_OK(Codec::Create(ctype, &c1)); |
63 | ASSERT_OK(Codec::Create(ctype, &c2)); |
64 | |
65 | int max_compressed_len = |
66 | static_cast<int>(c1->MaxCompressedLen(data.size(), data.data())); |
67 | std::vector<uint8_t> compressed(max_compressed_len); |
68 | std::vector<uint8_t> decompressed(data.size()); |
69 | |
70 | // compress with c1 |
71 | int64_t actual_size; |
72 | ASSERT_OK(c1->Compress(data.size(), data.data(), max_compressed_len, compressed.data(), |
73 | &actual_size)); |
74 | compressed.resize(actual_size); |
75 | |
76 | // decompress with c2 |
77 | ASSERT_OK(c2->Decompress(compressed.size(), compressed.data(), decompressed.size(), |
78 | decompressed.data())); |
79 | |
80 | ASSERT_EQ(data, decompressed); |
81 | |
82 | // decompress with size with c2 |
83 | int64_t actual_decompressed_size; |
84 | ASSERT_OK(c2->Decompress(compressed.size(), compressed.data(), decompressed.size(), |
85 | decompressed.data(), &actual_decompressed_size)); |
86 | |
87 | ASSERT_EQ(data, decompressed); |
88 | ASSERT_EQ(data.size(), actual_decompressed_size); |
89 | |
90 | // compress with c2 |
91 | int64_t actual_size2; |
92 | ASSERT_OK(c2->Compress(data.size(), data.data(), max_compressed_len, compressed.data(), |
93 | &actual_size2)); |
94 | ASSERT_EQ(actual_size2, actual_size); |
95 | |
96 | // decompress with c1 |
97 | ASSERT_OK(c1->Decompress(compressed.size(), compressed.data(), decompressed.size(), |
98 | decompressed.data())); |
99 | |
100 | ASSERT_EQ(data, decompressed); |
101 | |
102 | // decompress with size with c1 |
103 | int64_t actual_decompressed_size2; |
104 | ASSERT_OK(c1->Decompress(compressed.size(), compressed.data(), decompressed.size(), |
105 | decompressed.data(), &actual_decompressed_size2)); |
106 | |
107 | ASSERT_EQ(data, decompressed); |
108 | ASSERT_EQ(data.size(), actual_decompressed_size2); |
109 | } |
110 | |
111 | // Check the streaming compressor against one-shot decompression |
112 | |
// Compress `data` through the streaming Compressor interface — feeding small
// fixed-size input slices, flushing on alternate iterations, and growing the
// output buffer on demand — then verify that the codec's one-shot Decompress
// recovers the original bytes.
void CheckStreamingCompressor(Codec* codec, const vector<uint8_t>& data) {
  std::shared_ptr<Compressor> compressor;
  ASSERT_OK(codec->MakeCompressor(&compressor));

  std::vector<uint8_t> compressed;
  int64_t compressed_size = 0;
  const uint8_t* input = data.data();
  int64_t remaining = data.size();

  // Start with a deliberately tiny output buffer so the growth path is
  // exercised even for small inputs.
  compressed.resize(10);
  bool do_flush = false;

  while (remaining > 0) {
    // Feed a small amount each time
    int64_t input_len = std::min(remaining, static_cast<int64_t>(1111));
    int64_t output_len = compressed.size() - compressed_size;
    uint8_t* output = compressed.data() + compressed_size;
    int64_t bytes_read, bytes_written;
    ASSERT_OK(compressor->Compress(input_len, input, output_len, output, &bytes_read,
                                   &bytes_written));
    ASSERT_LE(bytes_read, input_len);
    ASSERT_LE(bytes_written, output_len);
    compressed_size += bytes_written;
    input += bytes_read;
    remaining -= bytes_read;
    if (bytes_read == 0) {
      // No input was consumed — presumably the output buffer is full;
      // enlarge it and retry on the next iteration.
      compressed.resize(compressed.capacity() * 2);
    }
    // Once every two iterations, do a flush
    if (do_flush) {
      bool should_retry = false;
      // Flush may need several attempts if the output buffer is too small.
      do {
        output_len = compressed.size() - compressed_size;
        output = compressed.data() + compressed_size;
        ASSERT_OK(compressor->Flush(output_len, output, &bytes_written, &should_retry));
        ASSERT_LE(bytes_written, output_len);
        compressed_size += bytes_written;
        if (should_retry) {
          compressed.resize(compressed.capacity() * 2);
        }
      } while (should_retry);
    }
    do_flush = !do_flush;
  }

  // End the compressed stream
  bool should_retry = false;
  do {
    int64_t output_len = compressed.size() - compressed_size;
    uint8_t* output = compressed.data() + compressed_size;
    int64_t bytes_written;
    ASSERT_OK(compressor->End(output_len, output, &bytes_written, &should_retry));
    ASSERT_LE(bytes_written, output_len);
    compressed_size += bytes_written;
    if (should_retry) {
      compressed.resize(compressed.capacity() * 2);
    }
  } while (should_retry);

  // Check decompressing the compressed data
  std::vector<uint8_t> decompressed(data.size());
  ASSERT_OK(codec->Decompress(compressed_size, compressed.data(), decompressed.size(),
                              decompressed.data()));

  ASSERT_EQ(data, decompressed);
}
179 | |
180 | // Check the streaming decompressor against one-shot compression |
181 | |
// Compress `data` with the codec's one-shot Compress, then decompress the
// result through the streaming Decompressor interface — feeding tiny input
// slices and growing the output buffer whenever the decompressor requests
// more room — and verify the original bytes come back.
void CheckStreamingDecompressor(Codec* codec, const vector<uint8_t>& data) {
  // Create compressed data
  int64_t max_compressed_len = codec->MaxCompressedLen(data.size(), data.data());
  std::vector<uint8_t> compressed(max_compressed_len);
  int64_t compressed_size;
  ASSERT_OK(codec->Compress(data.size(), data.data(), max_compressed_len,
                            compressed.data(), &compressed_size));
  compressed.resize(compressed_size);

  // Run streaming decompression
  std::shared_ptr<Decompressor> decompressor;
  ASSERT_OK(codec->MakeDecompressor(&decompressor));

  std::vector<uint8_t> decompressed;
  int64_t decompressed_size = 0;
  const uint8_t* input = compressed.data();
  int64_t remaining = compressed.size();

  // Start with a tiny output buffer so the need_more_output path is hit.
  decompressed.resize(10);
  while (!decompressor->IsFinished()) {
    // Feed a small amount each time
    int64_t input_len = std::min(remaining, static_cast<int64_t>(23));
    int64_t output_len = decompressed.size() - decompressed_size;
    uint8_t* output = decompressed.data() + decompressed_size;
    int64_t bytes_read, bytes_written;
    bool need_more_output;
    ASSERT_OK(decompressor->Decompress(input_len, input, output_len, output, &bytes_read,
                                       &bytes_written, &need_more_output));
    ASSERT_LE(bytes_read, input_len);
    ASSERT_LE(bytes_written, output_len);
    // Guard against an infinite loop: each iteration must either consume
    // input, produce output, or ask for a bigger output buffer.
    ASSERT_TRUE(need_more_output || bytes_written > 0 || bytes_read > 0)
        << "Decompression not progressing anymore";
    if (need_more_output) {
      decompressed.resize(decompressed.capacity() * 2);
    }
    decompressed_size += bytes_written;
    input += bytes_read;
    remaining -= bytes_read;
  }
  ASSERT_TRUE(decompressor->IsFinished());
  // All compressed input must have been consumed.
  ASSERT_EQ(remaining, 0);

  // Check the decompressed data
  decompressed.resize(decompressed_size);
  ASSERT_EQ(data.size(), decompressed_size);
  ASSERT_EQ(data, decompressed);
}
229 | |
230 | // Check the streaming compressor and decompressor together |
231 | |
// Round-trip `data` through the streaming Compressor and Decompressor
// together, feeding randomly-sized (but deterministically seeded) chunks on
// both sides and growing the buffers on demand.
void CheckStreamingRoundtrip(Codec* codec, const vector<uint8_t>& data) {
  std::shared_ptr<Compressor> compressor;
  std::shared_ptr<Decompressor> decompressor;
  ASSERT_OK(codec->MakeCompressor(&compressor));
  ASSERT_OK(codec->MakeDecompressor(&decompressor));

  // Fixed seed keeps the chunk-size sequence reproducible across runs.
  std::default_random_engine engine(42);
  std::uniform_int_distribution<int> buf_size_distribution(10, 40);

  auto make_buf_size = [&]() -> int64_t { return buf_size_distribution(engine); };

  // Compress...

  // Start from a 1-byte buffer to exercise the growth path immediately.
  std::vector<uint8_t> compressed(1);
  int64_t compressed_size = 0;
  {
    const uint8_t* input = data.data();
    int64_t remaining = data.size();

    while (remaining > 0) {
      // Feed a varying amount each time
      int64_t input_len = std::min(remaining, make_buf_size());
      int64_t output_len = compressed.size() - compressed_size;
      uint8_t* output = compressed.data() + compressed_size;
      int64_t bytes_read, bytes_written;
      ASSERT_OK(compressor->Compress(input_len, input, output_len, output, &bytes_read,
                                     &bytes_written));
      ASSERT_LE(bytes_read, input_len);
      ASSERT_LE(bytes_written, output_len);
      compressed_size += bytes_written;
      input += bytes_read;
      remaining -= bytes_read;
      if (bytes_read == 0) {
        // No input consumed — presumably out of output space; grow and retry.
        compressed.resize(compressed.capacity() * 2);
      }
    }
    // End the compressed stream
    bool should_retry = false;
    do {
      int64_t output_len = compressed.size() - compressed_size;
      uint8_t* output = compressed.data() + compressed_size;
      int64_t bytes_written;
      ASSERT_OK(compressor->End(output_len, output, &bytes_written, &should_retry));
      ASSERT_LE(bytes_written, output_len);
      compressed_size += bytes_written;
      if (should_retry) {
        compressed.resize(compressed.capacity() * 2);
      }
    } while (should_retry);

    compressed.resize(compressed_size);
  }

  // Then decompress...

  std::vector<uint8_t> decompressed(2);
  int64_t decompressed_size = 0;
  {
    const uint8_t* input = compressed.data();
    int64_t remaining = compressed.size();

    while (!decompressor->IsFinished()) {
      // Feed a varying amount each time
      int64_t input_len = std::min(remaining, make_buf_size());
      int64_t output_len = decompressed.size() - decompressed_size;
      uint8_t* output = decompressed.data() + decompressed_size;
      int64_t bytes_read, bytes_written;
      bool need_more_output;
      ASSERT_OK(decompressor->Decompress(input_len, input, output_len, output,
                                         &bytes_read, &bytes_written, &need_more_output));
      ASSERT_LE(bytes_read, input_len);
      ASSERT_LE(bytes_written, output_len);
      // Guard against an infinite loop: every iteration must make progress.
      ASSERT_TRUE(need_more_output || bytes_written > 0 || bytes_read > 0)
          << "Decompression not progressing anymore";
      if (need_more_output) {
        decompressed.resize(decompressed.capacity() * 2);
      }
      decompressed_size += bytes_written;
      input += bytes_read;
      remaining -= bytes_read;
    }
    // All compressed input must have been consumed.
    ASSERT_EQ(remaining, 0);
    decompressed.resize(decompressed_size);
  }

  ASSERT_EQ(data.size(), decompressed.size());
  ASSERT_EQ(data, decompressed);
}
320 | |
// Parameterized fixture: the test parameter selects which compression codec
// each test exercises (see the INSTANTIATE_TEST_CASE_P calls below).
class CodecTest : public ::testing::TestWithParam<Compression::type> {
 protected:
  // The codec type under test for the current parameterization.
  Compression::type GetCompression() { return GetParam(); }

  // Create a fresh codec instance; aborts the process if creation fails.
  std::unique_ptr<Codec> MakeCodec() {
    std::unique_ptr<Codec> codec;
    ABORT_NOT_OK(Codec::Create(GetCompression(), &codec));
    return codec;
  }
};
331 | |
332 | TEST_P(CodecTest, CodecRoundtrip) { |
333 | if (GetCompression() == Compression::BZ2) { |
334 | // SKIP: BZ2 doesn't support one-shot compression |
335 | return; |
336 | } |
337 | |
338 | int sizes[] = {0, 10000, 100000}; |
339 | for (int data_size : sizes) { |
340 | vector<uint8_t> data = MakeRandomData(data_size); |
341 | CheckCodecRoundtrip(GetCompression(), data); |
342 | |
343 | data = MakeCompressibleData(data_size); |
344 | CheckCodecRoundtrip(GetCompression(), data); |
345 | } |
346 | } |
347 | |
348 | TEST_P(CodecTest, OutputBufferIsSmall) { |
349 | auto type = GetCompression(); |
350 | if (type != Compression::SNAPPY) { |
351 | return; |
352 | } |
353 | |
354 | std::unique_ptr<Codec> codec; |
355 | ASSERT_OK(Codec::Create(type, &codec)); |
356 | |
357 | vector<uint8_t> data = MakeRandomData(10); |
358 | auto max_compressed_len = codec->MaxCompressedLen(data.size(), data.data()); |
359 | std::vector<uint8_t> compressed(max_compressed_len); |
360 | std::vector<uint8_t> decompressed(data.size() - 1); |
361 | |
362 | int64_t actual_size; |
363 | ASSERT_OK(codec->Compress(data.size(), data.data(), max_compressed_len, |
364 | compressed.data(), &actual_size)); |
365 | compressed.resize(actual_size); |
366 | |
367 | int64_t actual_decompressed_size; |
368 | std::stringstream ss; |
369 | ss << "Invalid: Output buffer size (" << decompressed.size() << ") must be " |
370 | << data.size() << " or larger." ; |
371 | ASSERT_RAISES_WITH_MESSAGE( |
372 | Invalid, ss.str(), |
373 | codec->Decompress(compressed.size(), compressed.data(), decompressed.size(), |
374 | decompressed.data(), &actual_decompressed_size)); |
375 | } |
376 | |
377 | TEST_P(CodecTest, StreamingCompressor) { |
378 | if (GetCompression() == Compression::SNAPPY) { |
379 | // SKIP: snappy doesn't support streaming compression |
380 | return; |
381 | } |
382 | if (GetCompression() == Compression::BZ2) { |
383 | // SKIP: BZ2 doesn't support one-shot decompression |
384 | return; |
385 | } |
386 | if (GetCompression() == Compression::LZ4) { |
387 | // SKIP: LZ4 streaming compression uses the LZ4 framing format, |
388 | // which must be tested against a streaming decompressor |
389 | return; |
390 | } |
391 | |
392 | int sizes[] = {0, 10, 100000}; |
393 | for (int data_size : sizes) { |
394 | auto codec = MakeCodec(); |
395 | |
396 | vector<uint8_t> data = MakeRandomData(data_size); |
397 | CheckStreamingCompressor(codec.get(), data); |
398 | |
399 | data = MakeCompressibleData(data_size); |
400 | CheckStreamingCompressor(codec.get(), data); |
401 | } |
402 | } |
403 | |
404 | TEST_P(CodecTest, StreamingDecompressor) { |
405 | if (GetCompression() == Compression::SNAPPY) { |
406 | // SKIP: snappy doesn't support streaming decompression |
407 | return; |
408 | } |
409 | if (GetCompression() == Compression::BZ2) { |
410 | // SKIP: BZ2 doesn't support one-shot compression |
411 | return; |
412 | } |
413 | if (GetCompression() == Compression::LZ4) { |
414 | // SKIP: LZ4 streaming decompression uses the LZ4 framing format, |
415 | // which must be tested against a streaming compressor |
416 | return; |
417 | } |
418 | |
419 | int sizes[] = {0, 10, 100000}; |
420 | for (int data_size : sizes) { |
421 | auto codec = MakeCodec(); |
422 | |
423 | vector<uint8_t> data = MakeRandomData(data_size); |
424 | CheckStreamingDecompressor(codec.get(), data); |
425 | |
426 | data = MakeCompressibleData(data_size); |
427 | CheckStreamingDecompressor(codec.get(), data); |
428 | } |
429 | } |
430 | |
431 | TEST_P(CodecTest, StreamingRoundtrip) { |
432 | if (GetCompression() == Compression::SNAPPY) { |
433 | // SKIP: snappy doesn't support streaming decompression |
434 | return; |
435 | } |
436 | |
437 | int sizes[] = {0, 10, 100000}; |
438 | for (int data_size : sizes) { |
439 | auto codec = MakeCodec(); |
440 | |
441 | vector<uint8_t> data = MakeRandomData(data_size); |
442 | CheckStreamingRoundtrip(codec.get(), data); |
443 | |
444 | data = MakeCompressibleData(data_size); |
445 | CheckStreamingRoundtrip(codec.get(), data); |
446 | } |
447 | } |
448 | |
// Instantiate the parameterized suite once per codec type.
INSTANTIATE_TEST_CASE_P(TestGZip, CodecTest, ::testing::Values(Compression::GZIP));

INSTANTIATE_TEST_CASE_P(TestSnappy, CodecTest, ::testing::Values(Compression::SNAPPY));

INSTANTIATE_TEST_CASE_P(TestLZ4, CodecTest, ::testing::Values(Compression::LZ4));

INSTANTIATE_TEST_CASE_P(TestBrotli, CodecTest, ::testing::Values(Compression::BROTLI));

// bz2 requires a binary installation, there is no ExternalProject
// NOTE(review): this guard uses #if (value test — an undefined macro reads
// as 0) while the zstd guard below uses #ifdef (definedness test); confirm
// the build system defines both macros consistently.
#if ARROW_WITH_BZ2
INSTANTIATE_TEST_CASE_P(TestBZ2, CodecTest, ::testing::Values(Compression::BZ2));
#endif

// The ExternalProject for zstd does not build on CMake < 3.7, so we do not
// require it here
#ifdef ARROW_WITH_ZSTD
INSTANTIATE_TEST_CASE_P(TestZSTD, CodecTest, ::testing::Values(Compression::ZSTD));
#endif
467 | |
468 | } // namespace util |
469 | } // namespace arrow |
470 | |