1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <memory>
#include <random>
#include <string>
#include <vector>
22 | |
23 | #include <gtest/gtest.h> |
24 | |
25 | #include "arrow/buffer.h" |
26 | #include "arrow/io/compressed.h" |
27 | #include "arrow/io/memory.h" |
28 | #include "arrow/io/test-common.h" |
29 | #include "arrow/status.h" |
30 | #include "arrow/test-util.h" |
31 | #include "arrow/util/compression.h" |
32 | |
33 | namespace arrow { |
34 | namespace io { |
35 | |
36 | using ::arrow::util::Codec; |
37 | |
// Payload sizes (in bytes) used by the stream tests in this file.
#ifndef ARROW_VALGRIND
// Regular builds: the payloads should be large enough to exercise
// internal buffers.
static constexpr int64_t RANDOM_DATA_SIZE = 3 * 1024 * 1024;
static constexpr int64_t COMPRESSIBLE_DATA_SIZE = 8 * 1024 * 1024;
#else
// Valgrind builds: use much smaller payloads to avoid slowing the tests
// down too much.
static constexpr int64_t RANDOM_DATA_SIZE = 50 * 1024;
static constexpr int64_t COMPRESSIBLE_DATA_SIZE = 120 * 1024;
#endif
47 | |
48 | std::vector<uint8_t> MakeRandomData(int data_size) { |
49 | std::vector<uint8_t> data(data_size); |
50 | random_bytes(data_size, 1234, data.data()); |
51 | return data; |
52 | } |
53 | |
// Build a vector of exactly `data_size` bytes made of a fixed sentence
// repeated back to back (the last repetition is cut short if needed), which
// makes the content highly compressible.
//
// The size is taken as int64_t so that the int64_t size constants used by
// the tests are not implicitly narrowed. A non-positive size yields an empty
// vector.
std::vector<uint8_t> MakeCompressibleData(int64_t data_size) {
  const std::string base_data =
      "Apache Arrow is a cross-language development platform for in-memory data" ;
  if (data_size <= 0) {
    return {};
  }

  // Allocate the final size up front and fill it chunk by chunk, rather than
  // over-allocating a whole number of repetitions and shrinking afterwards.
  std::vector<uint8_t> data(static_cast<std::size_t>(data_size));
  std::size_t offset = 0;
  while (offset < data.size()) {
    const std::size_t nbytes = std::min(base_data.size(), data.size() - offset);
    std::memcpy(data.data() + offset, base_data.data(), nbytes);
    offset += nbytes;
  }
  return data;
}
66 | |
67 | std::shared_ptr<Buffer> CompressDataOneShot(Codec* codec, |
68 | const std::vector<uint8_t>& data) { |
69 | int64_t max_compressed_len, compressed_len; |
70 | max_compressed_len = codec->MaxCompressedLen(data.size(), data.data()); |
71 | std::shared_ptr<ResizableBuffer> compressed; |
72 | ABORT_NOT_OK(AllocateResizableBuffer(max_compressed_len, &compressed)); |
73 | ABORT_NOT_OK(codec->Compress(data.size(), data.data(), max_compressed_len, |
74 | compressed->mutable_data(), &compressed_len)); |
75 | ABORT_NOT_OK(compressed->Resize(compressed_len)); |
76 | return std::move(compressed); |
77 | } |
78 | |
79 | Status RunCompressedInputStream(Codec* codec, std::shared_ptr<Buffer> compressed, |
80 | std::vector<uint8_t>* out) { |
81 | // Create compressed input stream |
82 | auto buffer_reader = std::make_shared<BufferReader>(compressed); |
83 | std::shared_ptr<CompressedInputStream> stream; |
84 | RETURN_NOT_OK(CompressedInputStream::Make(codec, buffer_reader, &stream)); |
85 | |
86 | std::vector<uint8_t> decompressed; |
87 | int64_t decompressed_size = 0; |
88 | const int64_t chunk_size = 1111; |
89 | while (true) { |
90 | std::shared_ptr<Buffer> buf; |
91 | RETURN_NOT_OK(stream->Read(chunk_size, &buf)); |
92 | if (buf->size() == 0) { |
93 | // EOF |
94 | break; |
95 | } |
96 | decompressed.resize(decompressed_size + buf->size()); |
97 | memcpy(decompressed.data() + decompressed_size, buf->data(), buf->size()); |
98 | decompressed_size += buf->size(); |
99 | } |
100 | *out = std::move(decompressed); |
101 | return Status::OK(); |
102 | } |
103 | |
104 | void CheckCompressedInputStream(Codec* codec, const std::vector<uint8_t>& data) { |
105 | // Create compressed data |
106 | auto compressed = CompressDataOneShot(codec, data); |
107 | |
108 | std::vector<uint8_t> decompressed; |
109 | ASSERT_OK(RunCompressedInputStream(codec, compressed, &decompressed)); |
110 | |
111 | ASSERT_EQ(decompressed.size(), data.size()); |
112 | ASSERT_EQ(decompressed, data); |
113 | } |
114 | |
115 | void CheckCompressedOutputStream(Codec* codec, const std::vector<uint8_t>& data, |
116 | bool do_flush) { |
117 | // Create compressed output stream |
118 | std::shared_ptr<BufferOutputStream> buffer_writer; |
119 | ASSERT_OK(BufferOutputStream::Create(1024, default_memory_pool(), &buffer_writer)); |
120 | std::shared_ptr<CompressedOutputStream> stream; |
121 | ASSERT_OK(CompressedOutputStream::Make(codec, buffer_writer, &stream)); |
122 | |
123 | const uint8_t* input = data.data(); |
124 | int64_t input_len = data.size(); |
125 | const int64_t chunk_size = 1111; |
126 | while (input_len > 0) { |
127 | int64_t nbytes = std::min(chunk_size, input_len); |
128 | ASSERT_OK(stream->Write(input, nbytes)); |
129 | input += nbytes; |
130 | input_len -= nbytes; |
131 | if (do_flush) { |
132 | ASSERT_OK(stream->Flush()); |
133 | } |
134 | } |
135 | ASSERT_OK(stream->Close()); |
136 | |
137 | // Get compressed data and decompress it |
138 | std::shared_ptr<Buffer> compressed; |
139 | ASSERT_OK(buffer_writer->Finish(&compressed)); |
140 | std::vector<uint8_t> decompressed(data.size()); |
141 | ASSERT_OK(codec->Decompress(compressed->size(), compressed->data(), decompressed.size(), |
142 | decompressed.data())); |
143 | ASSERT_EQ(decompressed, data); |
144 | } |
145 | |
146 | class CompressedInputStreamTest : public ::testing::TestWithParam<Compression::type> { |
147 | protected: |
148 | Compression::type GetCompression() { return GetParam(); } |
149 | |
150 | std::unique_ptr<Codec> MakeCodec() { |
151 | std::unique_ptr<Codec> codec; |
152 | ABORT_NOT_OK(Codec::Create(GetCompression(), &codec)); |
153 | return codec; |
154 | } |
155 | }; |
156 | |
157 | TEST_P(CompressedInputStreamTest, CompressibleData) { |
158 | auto codec = MakeCodec(); |
159 | auto data = MakeCompressibleData(COMPRESSIBLE_DATA_SIZE); |
160 | |
161 | CheckCompressedInputStream(codec.get(), data); |
162 | } |
163 | |
164 | TEST_P(CompressedInputStreamTest, RandomData) { |
165 | auto codec = MakeCodec(); |
166 | auto data = MakeRandomData(RANDOM_DATA_SIZE); |
167 | |
168 | CheckCompressedInputStream(codec.get(), data); |
169 | } |
170 | |
171 | TEST_P(CompressedInputStreamTest, TruncatedData) { |
172 | auto codec = MakeCodec(); |
173 | auto data = MakeRandomData(10000); |
174 | auto compressed = CompressDataOneShot(codec.get(), data); |
175 | auto truncated = SliceBuffer(compressed, 0, compressed->size() - 3); |
176 | |
177 | std::vector<uint8_t> decompressed; |
178 | ASSERT_RAISES(IOError, RunCompressedInputStream(codec.get(), truncated, &decompressed)); |
179 | } |
180 | |
181 | TEST_P(CompressedInputStreamTest, InvalidData) { |
182 | auto codec = MakeCodec(); |
183 | auto compressed_data = MakeRandomData(100); |
184 | |
185 | auto buffer_reader = std::make_shared<BufferReader>(Buffer::Wrap(compressed_data)); |
186 | std::shared_ptr<CompressedInputStream> stream; |
187 | ASSERT_OK(CompressedInputStream::Make(codec.get(), buffer_reader, &stream)); |
188 | std::shared_ptr<Buffer> out_buf; |
189 | ASSERT_RAISES(IOError, stream->Read(1024, &out_buf)); |
190 | } |
191 | |
192 | // NOTE: Snappy doesn't support streaming decompression |
193 | |
194 | // NOTE: BZ2 doesn't support one-shot compression |
195 | |
196 | // NOTE: LZ4 streaming decompression uses the LZ4 framing format, |
197 | // which must be tested against a streaming compressor |
198 | |
// Instantiate the CompressedInputStreamTest suite once per tested codec.
// NOTE(review): unlike ZSTD below, the GZIP and BROTLI instantiations are
// not wrapped in a build-option guard -- confirm that both codecs are
// always built, otherwise MakeCodec() will abort.
INSTANTIATE_TEST_CASE_P(TestGZipInputStream, CompressedInputStreamTest,
                        ::testing::Values(Compression::GZIP));

INSTANTIATE_TEST_CASE_P(TestBrotliInputStream, CompressedInputStreamTest,
                        ::testing::Values(Compression::BROTLI));

// ZSTD is only instantiated when ARROW_WITH_ZSTD is defined
#ifdef ARROW_WITH_ZSTD
INSTANTIATE_TEST_CASE_P(TestZSTDInputStream, CompressedInputStreamTest,
                        ::testing::Values(Compression::ZSTD));
#endif
209 | |
210 | class CompressedOutputStreamTest : public ::testing::TestWithParam<Compression::type> { |
211 | protected: |
212 | Compression::type GetCompression() { return GetParam(); } |
213 | |
214 | std::unique_ptr<Codec> MakeCodec() { |
215 | std::unique_ptr<Codec> codec; |
216 | ABORT_NOT_OK(Codec::Create(GetCompression(), &codec)); |
217 | return codec; |
218 | } |
219 | }; |
220 | |
221 | TEST_P(CompressedOutputStreamTest, CompressibleData) { |
222 | auto codec = MakeCodec(); |
223 | auto data = MakeCompressibleData(COMPRESSIBLE_DATA_SIZE); |
224 | |
225 | CheckCompressedOutputStream(codec.get(), data, false /* do_flush */); |
226 | CheckCompressedOutputStream(codec.get(), data, true /* do_flush */); |
227 | } |
228 | |
229 | TEST_P(CompressedOutputStreamTest, RandomData) { |
230 | auto codec = MakeCodec(); |
231 | auto data = MakeRandomData(RANDOM_DATA_SIZE); |
232 | |
233 | CheckCompressedOutputStream(codec.get(), data, false /* do_flush */); |
234 | CheckCompressedOutputStream(codec.get(), data, true /* do_flush */); |
235 | } |
236 | |
// Instantiate the CompressedOutputStreamTest suite once per tested codec.
// NOTE(review): unlike ZSTD below, the GZIP and BROTLI instantiations are
// not wrapped in a build-option guard -- confirm that both codecs are
// always built, otherwise MakeCodec() will abort.
INSTANTIATE_TEST_CASE_P(TestGZipOutputStream, CompressedOutputStreamTest,
                        ::testing::Values(Compression::GZIP));

INSTANTIATE_TEST_CASE_P(TestBrotliOutputStream, CompressedOutputStreamTest,
                        ::testing::Values(Compression::BROTLI));

// ZSTD is only instantiated when ARROW_WITH_ZSTD is defined
#ifdef ARROW_WITH_ZSTD
INSTANTIATE_TEST_CASE_P(TestZSTDOutputStream, CompressedOutputStreamTest,
                        ::testing::Values(Compression::ZSTD));
#endif
247 | |
248 | } // namespace io |
249 | } // namespace arrow |
250 | |