1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <memory>
#include <random>
#include <string>
#include <utility>
#include <vector>

#include <gtest/gtest.h>

#include "arrow/buffer.h"
#include "arrow/io/compressed.h"
#include "arrow/io/memory.h"
#include "arrow/io/test-common.h"
#include "arrow/status.h"
#include "arrow/test-util.h"
#include "arrow/util/compression.h"
32
33namespace arrow {
34namespace io {
35
36using ::arrow::util::Codec;
37
// Payload sizes (in bytes) used by the round-trip tests below.
#ifndef ARROW_VALGRIND
// Normal builds: large enough to exercise the streams' internal buffers.
static constexpr int64_t RANDOM_DATA_SIZE = int64_t{3} * 1024 * 1024;
static constexpr int64_t COMPRESSIBLE_DATA_SIZE = int64_t{8} * 1024 * 1024;
#else
// Valgrind builds: keep payloads small so the tests don't slow down too much.
static constexpr int64_t RANDOM_DATA_SIZE = int64_t{50} * 1024;
static constexpr int64_t COMPRESSIBLE_DATA_SIZE = int64_t{120} * 1024;
#endif
47
48std::vector<uint8_t> MakeRandomData(int data_size) {
49 std::vector<uint8_t> data(data_size);
50 random_bytes(data_size, 1234, data.data());
51 return data;
52}
53
// Return `data_size` bytes made by repeating a fixed English sentence and
// cutting the last repetition short. The heavy repetition makes the result
// highly compressible. Takes int64_t so the int64_t size constants can be
// passed without narrowing. `data_size` must be non-negative.
std::vector<uint8_t> MakeCompressibleData(int64_t data_size) {
  const std::string base_data =
      "Apache Arrow is a cross-language development platform for in-memory data";

  // Fill each byte directly from the repeating pattern: no over-allocation
  // followed by a truncating resize, and no raw memcpy.
  std::vector<uint8_t> data(static_cast<size_t>(data_size));
  for (size_t i = 0; i < data.size(); ++i) {
    data[i] = static_cast<uint8_t>(base_data[i % base_data.size()]);
  }
  return data;
}
66
67std::shared_ptr<Buffer> CompressDataOneShot(Codec* codec,
68 const std::vector<uint8_t>& data) {
69 int64_t max_compressed_len, compressed_len;
70 max_compressed_len = codec->MaxCompressedLen(data.size(), data.data());
71 std::shared_ptr<ResizableBuffer> compressed;
72 ABORT_NOT_OK(AllocateResizableBuffer(max_compressed_len, &compressed));
73 ABORT_NOT_OK(codec->Compress(data.size(), data.data(), max_compressed_len,
74 compressed->mutable_data(), &compressed_len));
75 ABORT_NOT_OK(compressed->Resize(compressed_len));
76 return std::move(compressed);
77}
78
79Status RunCompressedInputStream(Codec* codec, std::shared_ptr<Buffer> compressed,
80 std::vector<uint8_t>* out) {
81 // Create compressed input stream
82 auto buffer_reader = std::make_shared<BufferReader>(compressed);
83 std::shared_ptr<CompressedInputStream> stream;
84 RETURN_NOT_OK(CompressedInputStream::Make(codec, buffer_reader, &stream));
85
86 std::vector<uint8_t> decompressed;
87 int64_t decompressed_size = 0;
88 const int64_t chunk_size = 1111;
89 while (true) {
90 std::shared_ptr<Buffer> buf;
91 RETURN_NOT_OK(stream->Read(chunk_size, &buf));
92 if (buf->size() == 0) {
93 // EOF
94 break;
95 }
96 decompressed.resize(decompressed_size + buf->size());
97 memcpy(decompressed.data() + decompressed_size, buf->data(), buf->size());
98 decompressed_size += buf->size();
99 }
100 *out = std::move(decompressed);
101 return Status::OK();
102}
103
104void CheckCompressedInputStream(Codec* codec, const std::vector<uint8_t>& data) {
105 // Create compressed data
106 auto compressed = CompressDataOneShot(codec, data);
107
108 std::vector<uint8_t> decompressed;
109 ASSERT_OK(RunCompressedInputStream(codec, compressed, &decompressed));
110
111 ASSERT_EQ(decompressed.size(), data.size());
112 ASSERT_EQ(decompressed, data);
113}
114
115void CheckCompressedOutputStream(Codec* codec, const std::vector<uint8_t>& data,
116 bool do_flush) {
117 // Create compressed output stream
118 std::shared_ptr<BufferOutputStream> buffer_writer;
119 ASSERT_OK(BufferOutputStream::Create(1024, default_memory_pool(), &buffer_writer));
120 std::shared_ptr<CompressedOutputStream> stream;
121 ASSERT_OK(CompressedOutputStream::Make(codec, buffer_writer, &stream));
122
123 const uint8_t* input = data.data();
124 int64_t input_len = data.size();
125 const int64_t chunk_size = 1111;
126 while (input_len > 0) {
127 int64_t nbytes = std::min(chunk_size, input_len);
128 ASSERT_OK(stream->Write(input, nbytes));
129 input += nbytes;
130 input_len -= nbytes;
131 if (do_flush) {
132 ASSERT_OK(stream->Flush());
133 }
134 }
135 ASSERT_OK(stream->Close());
136
137 // Get compressed data and decompress it
138 std::shared_ptr<Buffer> compressed;
139 ASSERT_OK(buffer_writer->Finish(&compressed));
140 std::vector<uint8_t> decompressed(data.size());
141 ASSERT_OK(codec->Decompress(compressed->size(), compressed->data(), decompressed.size(),
142 decompressed.data()));
143 ASSERT_EQ(decompressed, data);
144}
145
146class CompressedInputStreamTest : public ::testing::TestWithParam<Compression::type> {
147 protected:
148 Compression::type GetCompression() { return GetParam(); }
149
150 std::unique_ptr<Codec> MakeCodec() {
151 std::unique_ptr<Codec> codec;
152 ABORT_NOT_OK(Codec::Create(GetCompression(), &codec));
153 return codec;
154 }
155};
156
157TEST_P(CompressedInputStreamTest, CompressibleData) {
158 auto codec = MakeCodec();
159 auto data = MakeCompressibleData(COMPRESSIBLE_DATA_SIZE);
160
161 CheckCompressedInputStream(codec.get(), data);
162}
163
164TEST_P(CompressedInputStreamTest, RandomData) {
165 auto codec = MakeCodec();
166 auto data = MakeRandomData(RANDOM_DATA_SIZE);
167
168 CheckCompressedInputStream(codec.get(), data);
169}
170
171TEST_P(CompressedInputStreamTest, TruncatedData) {
172 auto codec = MakeCodec();
173 auto data = MakeRandomData(10000);
174 auto compressed = CompressDataOneShot(codec.get(), data);
175 auto truncated = SliceBuffer(compressed, 0, compressed->size() - 3);
176
177 std::vector<uint8_t> decompressed;
178 ASSERT_RAISES(IOError, RunCompressedInputStream(codec.get(), truncated, &decompressed));
179}
180
181TEST_P(CompressedInputStreamTest, InvalidData) {
182 auto codec = MakeCodec();
183 auto compressed_data = MakeRandomData(100);
184
185 auto buffer_reader = std::make_shared<BufferReader>(Buffer::Wrap(compressed_data));
186 std::shared_ptr<CompressedInputStream> stream;
187 ASSERT_OK(CompressedInputStream::Make(codec.get(), buffer_reader, &stream));
188 std::shared_ptr<Buffer> out_buf;
189 ASSERT_RAISES(IOError, stream->Read(1024, &out_buf));
190}
191
192// NOTE: Snappy doesn't support streaming decompression
193
194// NOTE: BZ2 doesn't support one-shot compression
195
196// NOTE: LZ4 streaming decompression uses the LZ4 framing format,
197// which must be tested against a streaming compressor
198
// Instantiate the CompressedInputStreamTest suite once per codec.
// NOTE(review): INSTANTIATE_TEST_CASE_P is the older googletest spelling of
// INSTANTIATE_TEST_SUITE_P; keep it while older googletest versions are supported.
INSTANTIATE_TEST_CASE_P(TestGZipInputStream, CompressedInputStreamTest,
                        ::testing::Values(Compression::GZIP));

INSTANTIATE_TEST_CASE_P(TestBrotliInputStream, CompressedInputStreamTest,
                        ::testing::Values(Compression::BROTLI));

// Only instantiated when the build defines ARROW_WITH_ZSTD.
#ifdef ARROW_WITH_ZSTD
INSTANTIATE_TEST_CASE_P(TestZSTDInputStream, CompressedInputStreamTest,
                        ::testing::Values(Compression::ZSTD));
#endif
209
210class CompressedOutputStreamTest : public ::testing::TestWithParam<Compression::type> {
211 protected:
212 Compression::type GetCompression() { return GetParam(); }
213
214 std::unique_ptr<Codec> MakeCodec() {
215 std::unique_ptr<Codec> codec;
216 ABORT_NOT_OK(Codec::Create(GetCompression(), &codec));
217 return codec;
218 }
219};
220
221TEST_P(CompressedOutputStreamTest, CompressibleData) {
222 auto codec = MakeCodec();
223 auto data = MakeCompressibleData(COMPRESSIBLE_DATA_SIZE);
224
225 CheckCompressedOutputStream(codec.get(), data, false /* do_flush */);
226 CheckCompressedOutputStream(codec.get(), data, true /* do_flush */);
227}
228
229TEST_P(CompressedOutputStreamTest, RandomData) {
230 auto codec = MakeCodec();
231 auto data = MakeRandomData(RANDOM_DATA_SIZE);
232
233 CheckCompressedOutputStream(codec.get(), data, false /* do_flush */);
234 CheckCompressedOutputStream(codec.get(), data, true /* do_flush */);
235}
236
// Instantiate the CompressedOutputStreamTest suite once per codec.
// NOTE(review): INSTANTIATE_TEST_CASE_P is the older googletest spelling of
// INSTANTIATE_TEST_SUITE_P; keep it while older googletest versions are supported.
INSTANTIATE_TEST_CASE_P(TestGZipOutputStream, CompressedOutputStreamTest,
                        ::testing::Values(Compression::GZIP));

INSTANTIATE_TEST_CASE_P(TestBrotliOutputStream, CompressedOutputStreamTest,
                        ::testing::Values(Compression::BROTLI));

// Only instantiated when the build defines ARROW_WITH_ZSTD.
#ifdef ARROW_WITH_ZSTD
INSTANTIATE_TEST_CASE_P(TestZSTDOutputStream, CompressedOutputStreamTest,
                        ::testing::Values(Compression::ZSTD));
#endif
247
248} // namespace io
249} // namespace arrow
250