1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include "arrow/io/compressed.h"
19
20#include <algorithm>
21#include <cstring>
22#include <memory>
23#include <mutex>
24#include <string>
25#include <utility>
26
27#include "arrow/buffer.h"
28#include "arrow/memory_pool.h"
29#include "arrow/status.h"
30#include "arrow/util/compression.h"
31#include "arrow/util/logging.h"
32
33namespace arrow {
34
35using util::Codec;
36using util::Compressor;
37using util::Decompressor;
38
39namespace io {
40
41// ----------------------------------------------------------------------
42// CompressedOutputStream implementation
43
44class CompressedOutputStream::Impl {
45 public:
46 Impl(MemoryPool* pool, Codec* codec, const std::shared_ptr<OutputStream>& raw)
47 : pool_(pool), raw_(raw), codec_(codec), is_open_(true), compressed_pos_(0) {}
48
49 ~Impl() { DCHECK(Close().ok()); }
50
51 Status Init() {
52 RETURN_NOT_OK(codec_->MakeCompressor(&compressor_));
53 RETURN_NOT_OK(AllocateResizableBuffer(pool_, kChunkSize, &compressed_));
54 compressed_pos_ = 0;
55 return Status::OK();
56 }
57
58 Status Tell(int64_t* position) const {
59 return Status::NotImplemented("Cannot tell() a compressed stream");
60 }
61
62 std::shared_ptr<OutputStream> raw() const { return raw_; }
63
64 Status FlushCompressed() {
65 if (compressed_pos_ > 0) {
66 RETURN_NOT_OK(raw_->Write(compressed_->data(), compressed_pos_));
67 compressed_pos_ = 0;
68 }
69 return Status::OK();
70 }
71
72 Status Write(const void* data, int64_t nbytes) {
73 std::lock_guard<std::mutex> guard(lock_);
74
75 auto input = reinterpret_cast<const uint8_t*>(data);
76 while (nbytes > 0) {
77 int64_t bytes_read, bytes_written;
78 int64_t input_len = nbytes;
79 int64_t output_len = compressed_->size() - compressed_pos_;
80 uint8_t* output = compressed_->mutable_data() + compressed_pos_;
81 RETURN_NOT_OK(compressor_->Compress(input_len, input, output_len, output,
82 &bytes_read, &bytes_written));
83 compressed_pos_ += bytes_written;
84
85 if (bytes_read == 0) {
86 // Not enough output, try to flush it and retry
87 if (compressed_pos_ > 0) {
88 RETURN_NOT_OK(FlushCompressed());
89 output_len = compressed_->size() - compressed_pos_;
90 output = compressed_->mutable_data() + compressed_pos_;
91 RETURN_NOT_OK(compressor_->Compress(input_len, input, output_len, output,
92 &bytes_read, &bytes_written));
93 compressed_pos_ += bytes_written;
94 }
95 }
96 input += bytes_read;
97 nbytes -= bytes_read;
98 if (compressed_pos_ == compressed_->size()) {
99 // Output buffer full, flush it
100 RETURN_NOT_OK(FlushCompressed());
101 }
102 if (bytes_read == 0) {
103 // Need to enlarge output buffer
104 RETURN_NOT_OK(compressed_->Resize(compressed_->size() * 2));
105 }
106 }
107 return Status::OK();
108 }
109
110 Status Flush() {
111 std::lock_guard<std::mutex> guard(lock_);
112
113 while (true) {
114 // Flush compressor
115 int64_t bytes_written;
116 bool should_retry;
117 int64_t output_len = compressed_->size() - compressed_pos_;
118 uint8_t* output = compressed_->mutable_data() + compressed_pos_;
119 RETURN_NOT_OK(
120 compressor_->Flush(output_len, output, &bytes_written, &should_retry));
121 compressed_pos_ += bytes_written;
122
123 // Flush compressed output
124 RETURN_NOT_OK(FlushCompressed());
125
126 if (should_retry) {
127 // Need to enlarge output buffer
128 RETURN_NOT_OK(compressed_->Resize(compressed_->size() * 2));
129 } else {
130 break;
131 }
132 }
133 return Status::OK();
134 }
135
136 Status FinalizeCompression() {
137 while (true) {
138 // Try to end compressor
139 int64_t bytes_written;
140 bool should_retry;
141 int64_t output_len = compressed_->size() - compressed_pos_;
142 uint8_t* output = compressed_->mutable_data() + compressed_pos_;
143 RETURN_NOT_OK(compressor_->End(output_len, output, &bytes_written, &should_retry));
144 compressed_pos_ += bytes_written;
145
146 // Flush compressed output
147 RETURN_NOT_OK(FlushCompressed());
148
149 if (should_retry) {
150 // Need to enlarge output buffer
151 RETURN_NOT_OK(compressed_->Resize(compressed_->size() * 2));
152 } else {
153 // Done
154 break;
155 }
156 }
157 return Status::OK();
158 }
159
160 Status Close() {
161 std::lock_guard<std::mutex> guard(lock_);
162
163 if (is_open_) {
164 is_open_ = false;
165 RETURN_NOT_OK(FinalizeCompression());
166 return raw_->Close();
167 } else {
168 return Status::OK();
169 }
170 }
171
172 bool closed() {
173 std::lock_guard<std::mutex> guard(lock_);
174 return !is_open_;
175 }
176
177 private:
178 // Write 64 KB compressed data at a time
179 static const int64_t kChunkSize = 64 * 1024;
180
181 MemoryPool* pool_;
182 std::shared_ptr<OutputStream> raw_;
183 Codec* codec_;
184 bool is_open_;
185 std::shared_ptr<Compressor> compressor_;
186 std::shared_ptr<ResizableBuffer> compressed_;
187 int64_t compressed_pos_;
188
189 mutable std::mutex lock_;
190};
191
192Status CompressedOutputStream::Make(util::Codec* codec,
193 const std::shared_ptr<OutputStream>& raw,
194 std::shared_ptr<CompressedOutputStream>* out) {
195 return Make(default_memory_pool(), codec, raw, out);
196}
197
198Status CompressedOutputStream::Make(MemoryPool* pool, util::Codec* codec,
199 const std::shared_ptr<OutputStream>& raw,
200 std::shared_ptr<CompressedOutputStream>* out) {
201 std::shared_ptr<CompressedOutputStream> res(new CompressedOutputStream);
202 res->impl_ = std::unique_ptr<Impl>(new Impl(pool, codec, std::move(raw)));
203 RETURN_NOT_OK(res->impl_->Init());
204 *out = res;
205 return Status::OK();
206}
207
208CompressedOutputStream::~CompressedOutputStream() {}
209
210Status CompressedOutputStream::Close() { return impl_->Close(); }
211
212bool CompressedOutputStream::closed() const { return impl_->closed(); }
213
214Status CompressedOutputStream::Tell(int64_t* position) const {
215 return impl_->Tell(position);
216}
217
218Status CompressedOutputStream::Write(const void* data, int64_t nbytes) {
219 return impl_->Write(data, nbytes);
220}
221
222Status CompressedOutputStream::Flush() { return impl_->Flush(); }
223
224std::shared_ptr<OutputStream> CompressedOutputStream::raw() const { return impl_->raw(); }
225
226// ----------------------------------------------------------------------
227// CompressedInputStream implementation
228
229class CompressedInputStream::Impl {
230 public:
231 Impl(MemoryPool* pool, Codec* codec, const std::shared_ptr<InputStream>& raw)
232 : pool_(pool), raw_(raw), codec_(codec), is_open_(true) {}
233
234 Status Init() {
235 RETURN_NOT_OK(codec_->MakeDecompressor(&decompressor_));
236 return Status::OK();
237 }
238
239 ~Impl() { DCHECK(Close().ok()); }
240
241 Status Close() {
242 std::lock_guard<std::mutex> guard(lock_);
243 if (is_open_) {
244 is_open_ = false;
245 return raw_->Close();
246 } else {
247 return Status::OK();
248 }
249 }
250
251 bool closed() {
252 std::lock_guard<std::mutex> guard(lock_);
253 return !is_open_;
254 }
255
256 Status Tell(int64_t* position) const {
257 return Status::NotImplemented("Cannot tell() a compressed stream");
258 }
259
260 // Read compressed data if necessary
261 Status EnsureCompressedData() {
262 int64_t compressed_avail = compressed_ ? compressed_->size() - compressed_pos_ : 0;
263 if (compressed_avail == 0) {
264 // No compressed data available, read a full chunk
265 RETURN_NOT_OK(raw_->Read(kChunkSize, &compressed_));
266 compressed_pos_ = 0;
267 }
268 return Status::OK();
269 }
270
271 Status DecompressData() {
272 int64_t decompress_size = kDecompressSize;
273
274 while (true) {
275 RETURN_NOT_OK(AllocateResizableBuffer(pool_, decompress_size, &decompressed_));
276 decompressed_pos_ = 0;
277
278 bool need_more_output;
279 int64_t bytes_read, bytes_written;
280 int64_t input_len = compressed_->size() - compressed_pos_;
281 const uint8_t* input = compressed_->data() + compressed_pos_;
282 int64_t output_len = decompressed_->size();
283 uint8_t* output = decompressed_->mutable_data();
284
285 RETURN_NOT_OK(decompressor_->Decompress(input_len, input, output_len, output,
286 &bytes_read, &bytes_written,
287 &need_more_output));
288 compressed_pos_ += bytes_read;
289 if (bytes_written > 0 || !need_more_output || input_len == 0) {
290 RETURN_NOT_OK(decompressed_->Resize(bytes_written));
291 break;
292 }
293 DCHECK_EQ(bytes_written, 0);
294 // Need to enlarge output buffer
295 decompress_size *= 2;
296 }
297 return Status::OK();
298 }
299
300 Status Read(int64_t nbytes, int64_t* bytes_read, void* out) {
301 std::lock_guard<std::mutex> guard(lock_);
302
303 *bytes_read = 0;
304 auto out_data = reinterpret_cast<uint8_t*>(out);
305
306 while (nbytes > 0) {
307 int64_t avail = decompressed_ ? (decompressed_->size() - decompressed_pos_) : 0;
308 if (avail > 0) {
309 // Pending decompressed data is available, use it
310 avail = std::min(avail, nbytes);
311 memcpy(out_data, decompressed_->data() + decompressed_pos_, avail);
312 decompressed_pos_ += avail;
313 out_data += avail;
314 *bytes_read += avail;
315 nbytes -= avail;
316 if (decompressed_pos_ == decompressed_->size()) {
317 // Decompressed data is exhausted, release buffer
318 decompressed_.reset();
319 }
320 if (nbytes == 0) {
321 // We're done
322 break;
323 }
324 }
325
326 // At this point, no more decompressed data remains,
327 // so we need to decompress more
328 if (decompressor_->IsFinished()) {
329 break;
330 }
331 // First try to read data from the decompressor
332 if (compressed_) {
333 RETURN_NOT_OK(DecompressData());
334 }
335 if (!decompressed_ || decompressed_->size() == 0) {
336 // Got nothing, need to read more compressed data
337 RETURN_NOT_OK(EnsureCompressedData());
338 if (compressed_pos_ == compressed_->size()) {
339 // Compressed stream unexpectedly exhausted
340 return Status::IOError("Truncated compressed stream");
341 }
342 RETURN_NOT_OK(DecompressData());
343 }
344 }
345 return Status::OK();
346 }
347
348 Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
349 std::shared_ptr<ResizableBuffer> buf;
350 RETURN_NOT_OK(AllocateResizableBuffer(pool_, nbytes, &buf));
351 int64_t bytes_read;
352 RETURN_NOT_OK(Read(nbytes, &bytes_read, buf->mutable_data()));
353 RETURN_NOT_OK(buf->Resize(bytes_read));
354 *out = buf;
355 return Status::OK();
356 }
357
358 std::shared_ptr<InputStream> raw() const { return raw_; }
359
360 private:
361 // Read 64 KB compressed data at a time
362 static const int64_t kChunkSize = 64 * 1024;
363 // Decompress 1 MB at a time
364 static const int64_t kDecompressSize = 1024 * 1024;
365
366 MemoryPool* pool_;
367 std::shared_ptr<InputStream> raw_;
368 Codec* codec_;
369 bool is_open_;
370 std::shared_ptr<Decompressor> decompressor_;
371 std::shared_ptr<Buffer> compressed_;
372 int64_t compressed_pos_;
373 std::shared_ptr<ResizableBuffer> decompressed_;
374 int64_t decompressed_pos_;
375
376 mutable std::mutex lock_;
377};
378
379Status CompressedInputStream::Make(Codec* codec, const std::shared_ptr<InputStream>& raw,
380 std::shared_ptr<CompressedInputStream>* out) {
381 return Make(default_memory_pool(), codec, raw, out);
382}
383
384Status CompressedInputStream::Make(MemoryPool* pool, Codec* codec,
385 const std::shared_ptr<InputStream>& raw,
386 std::shared_ptr<CompressedInputStream>* out) {
387 std::shared_ptr<CompressedInputStream> res(new CompressedInputStream);
388 res->impl_ = std::unique_ptr<Impl>(new Impl(pool, codec, std::move(raw)));
389 RETURN_NOT_OK(res->impl_->Init());
390 *out = res;
391 return Status::OK();
392}
393
394CompressedInputStream::~CompressedInputStream() {}
395
396Status CompressedInputStream::Close() { return impl_->Close(); }
397
398bool CompressedInputStream::closed() const { return impl_->closed(); }
399
400Status CompressedInputStream::Tell(int64_t* position) const {
401 return impl_->Tell(position);
402}
403
404Status CompressedInputStream::Read(int64_t nbytes, int64_t* bytes_read, void* out) {
405 return impl_->Read(nbytes, bytes_read, out);
406}
407
408Status CompressedInputStream::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
409 return impl_->Read(nbytes, out);
410}
411
412std::shared_ptr<InputStream> CompressedInputStream::raw() const { return impl_->raw(); }
413
414} // namespace io
415} // namespace arrow
416