1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include "arrow/io/compressed.h" |
19 | |
20 | #include <algorithm> |
21 | #include <cstring> |
22 | #include <memory> |
23 | #include <mutex> |
24 | #include <string> |
25 | #include <utility> |
26 | |
27 | #include "arrow/buffer.h" |
28 | #include "arrow/memory_pool.h" |
29 | #include "arrow/status.h" |
30 | #include "arrow/util/compression.h" |
31 | #include "arrow/util/logging.h" |
32 | |
33 | namespace arrow { |
34 | |
35 | using util::Codec; |
36 | using util::Compressor; |
37 | using util::Decompressor; |
38 | |
39 | namespace io { |
40 | |
41 | // ---------------------------------------------------------------------- |
42 | // CompressedOutputStream implementation |
43 | |
44 | class CompressedOutputStream::Impl { |
45 | public: |
46 | Impl(MemoryPool* pool, Codec* codec, const std::shared_ptr<OutputStream>& raw) |
47 | : pool_(pool), raw_(raw), codec_(codec), is_open_(true), compressed_pos_(0) {} |
48 | |
49 | ~Impl() { DCHECK(Close().ok()); } |
50 | |
51 | Status Init() { |
52 | RETURN_NOT_OK(codec_->MakeCompressor(&compressor_)); |
53 | RETURN_NOT_OK(AllocateResizableBuffer(pool_, kChunkSize, &compressed_)); |
54 | compressed_pos_ = 0; |
55 | return Status::OK(); |
56 | } |
57 | |
58 | Status Tell(int64_t* position) const { |
59 | return Status::NotImplemented("Cannot tell() a compressed stream" ); |
60 | } |
61 | |
62 | std::shared_ptr<OutputStream> raw() const { return raw_; } |
63 | |
64 | Status FlushCompressed() { |
65 | if (compressed_pos_ > 0) { |
66 | RETURN_NOT_OK(raw_->Write(compressed_->data(), compressed_pos_)); |
67 | compressed_pos_ = 0; |
68 | } |
69 | return Status::OK(); |
70 | } |
71 | |
72 | Status Write(const void* data, int64_t nbytes) { |
73 | std::lock_guard<std::mutex> guard(lock_); |
74 | |
75 | auto input = reinterpret_cast<const uint8_t*>(data); |
76 | while (nbytes > 0) { |
77 | int64_t bytes_read, bytes_written; |
78 | int64_t input_len = nbytes; |
79 | int64_t output_len = compressed_->size() - compressed_pos_; |
80 | uint8_t* output = compressed_->mutable_data() + compressed_pos_; |
81 | RETURN_NOT_OK(compressor_->Compress(input_len, input, output_len, output, |
82 | &bytes_read, &bytes_written)); |
83 | compressed_pos_ += bytes_written; |
84 | |
85 | if (bytes_read == 0) { |
86 | // Not enough output, try to flush it and retry |
87 | if (compressed_pos_ > 0) { |
88 | RETURN_NOT_OK(FlushCompressed()); |
89 | output_len = compressed_->size() - compressed_pos_; |
90 | output = compressed_->mutable_data() + compressed_pos_; |
91 | RETURN_NOT_OK(compressor_->Compress(input_len, input, output_len, output, |
92 | &bytes_read, &bytes_written)); |
93 | compressed_pos_ += bytes_written; |
94 | } |
95 | } |
96 | input += bytes_read; |
97 | nbytes -= bytes_read; |
98 | if (compressed_pos_ == compressed_->size()) { |
99 | // Output buffer full, flush it |
100 | RETURN_NOT_OK(FlushCompressed()); |
101 | } |
102 | if (bytes_read == 0) { |
103 | // Need to enlarge output buffer |
104 | RETURN_NOT_OK(compressed_->Resize(compressed_->size() * 2)); |
105 | } |
106 | } |
107 | return Status::OK(); |
108 | } |
109 | |
110 | Status Flush() { |
111 | std::lock_guard<std::mutex> guard(lock_); |
112 | |
113 | while (true) { |
114 | // Flush compressor |
115 | int64_t bytes_written; |
116 | bool should_retry; |
117 | int64_t output_len = compressed_->size() - compressed_pos_; |
118 | uint8_t* output = compressed_->mutable_data() + compressed_pos_; |
119 | RETURN_NOT_OK( |
120 | compressor_->Flush(output_len, output, &bytes_written, &should_retry)); |
121 | compressed_pos_ += bytes_written; |
122 | |
123 | // Flush compressed output |
124 | RETURN_NOT_OK(FlushCompressed()); |
125 | |
126 | if (should_retry) { |
127 | // Need to enlarge output buffer |
128 | RETURN_NOT_OK(compressed_->Resize(compressed_->size() * 2)); |
129 | } else { |
130 | break; |
131 | } |
132 | } |
133 | return Status::OK(); |
134 | } |
135 | |
136 | Status FinalizeCompression() { |
137 | while (true) { |
138 | // Try to end compressor |
139 | int64_t bytes_written; |
140 | bool should_retry; |
141 | int64_t output_len = compressed_->size() - compressed_pos_; |
142 | uint8_t* output = compressed_->mutable_data() + compressed_pos_; |
143 | RETURN_NOT_OK(compressor_->End(output_len, output, &bytes_written, &should_retry)); |
144 | compressed_pos_ += bytes_written; |
145 | |
146 | // Flush compressed output |
147 | RETURN_NOT_OK(FlushCompressed()); |
148 | |
149 | if (should_retry) { |
150 | // Need to enlarge output buffer |
151 | RETURN_NOT_OK(compressed_->Resize(compressed_->size() * 2)); |
152 | } else { |
153 | // Done |
154 | break; |
155 | } |
156 | } |
157 | return Status::OK(); |
158 | } |
159 | |
160 | Status Close() { |
161 | std::lock_guard<std::mutex> guard(lock_); |
162 | |
163 | if (is_open_) { |
164 | is_open_ = false; |
165 | RETURN_NOT_OK(FinalizeCompression()); |
166 | return raw_->Close(); |
167 | } else { |
168 | return Status::OK(); |
169 | } |
170 | } |
171 | |
172 | bool closed() { |
173 | std::lock_guard<std::mutex> guard(lock_); |
174 | return !is_open_; |
175 | } |
176 | |
177 | private: |
178 | // Write 64 KB compressed data at a time |
179 | static const int64_t kChunkSize = 64 * 1024; |
180 | |
181 | MemoryPool* pool_; |
182 | std::shared_ptr<OutputStream> raw_; |
183 | Codec* codec_; |
184 | bool is_open_; |
185 | std::shared_ptr<Compressor> compressor_; |
186 | std::shared_ptr<ResizableBuffer> compressed_; |
187 | int64_t compressed_pos_; |
188 | |
189 | mutable std::mutex lock_; |
190 | }; |
191 | |
192 | Status CompressedOutputStream::Make(util::Codec* codec, |
193 | const std::shared_ptr<OutputStream>& raw, |
194 | std::shared_ptr<CompressedOutputStream>* out) { |
195 | return Make(default_memory_pool(), codec, raw, out); |
196 | } |
197 | |
198 | Status CompressedOutputStream::Make(MemoryPool* pool, util::Codec* codec, |
199 | const std::shared_ptr<OutputStream>& raw, |
200 | std::shared_ptr<CompressedOutputStream>* out) { |
201 | std::shared_ptr<CompressedOutputStream> res(new CompressedOutputStream); |
202 | res->impl_ = std::unique_ptr<Impl>(new Impl(pool, codec, std::move(raw))); |
203 | RETURN_NOT_OK(res->impl_->Init()); |
204 | *out = res; |
205 | return Status::OK(); |
206 | } |
207 | |
208 | CompressedOutputStream::~CompressedOutputStream() {} |
209 | |
210 | Status CompressedOutputStream::Close() { return impl_->Close(); } |
211 | |
212 | bool CompressedOutputStream::closed() const { return impl_->closed(); } |
213 | |
214 | Status CompressedOutputStream::Tell(int64_t* position) const { |
215 | return impl_->Tell(position); |
216 | } |
217 | |
218 | Status CompressedOutputStream::Write(const void* data, int64_t nbytes) { |
219 | return impl_->Write(data, nbytes); |
220 | } |
221 | |
222 | Status CompressedOutputStream::Flush() { return impl_->Flush(); } |
223 | |
224 | std::shared_ptr<OutputStream> CompressedOutputStream::raw() const { return impl_->raw(); } |
225 | |
226 | // ---------------------------------------------------------------------- |
227 | // CompressedInputStream implementation |
228 | |
229 | class CompressedInputStream::Impl { |
230 | public: |
231 | Impl(MemoryPool* pool, Codec* codec, const std::shared_ptr<InputStream>& raw) |
232 | : pool_(pool), raw_(raw), codec_(codec), is_open_(true) {} |
233 | |
234 | Status Init() { |
235 | RETURN_NOT_OK(codec_->MakeDecompressor(&decompressor_)); |
236 | return Status::OK(); |
237 | } |
238 | |
239 | ~Impl() { DCHECK(Close().ok()); } |
240 | |
241 | Status Close() { |
242 | std::lock_guard<std::mutex> guard(lock_); |
243 | if (is_open_) { |
244 | is_open_ = false; |
245 | return raw_->Close(); |
246 | } else { |
247 | return Status::OK(); |
248 | } |
249 | } |
250 | |
251 | bool closed() { |
252 | std::lock_guard<std::mutex> guard(lock_); |
253 | return !is_open_; |
254 | } |
255 | |
256 | Status Tell(int64_t* position) const { |
257 | return Status::NotImplemented("Cannot tell() a compressed stream" ); |
258 | } |
259 | |
260 | // Read compressed data if necessary |
261 | Status EnsureCompressedData() { |
262 | int64_t compressed_avail = compressed_ ? compressed_->size() - compressed_pos_ : 0; |
263 | if (compressed_avail == 0) { |
264 | // No compressed data available, read a full chunk |
265 | RETURN_NOT_OK(raw_->Read(kChunkSize, &compressed_)); |
266 | compressed_pos_ = 0; |
267 | } |
268 | return Status::OK(); |
269 | } |
270 | |
271 | Status DecompressData() { |
272 | int64_t decompress_size = kDecompressSize; |
273 | |
274 | while (true) { |
275 | RETURN_NOT_OK(AllocateResizableBuffer(pool_, decompress_size, &decompressed_)); |
276 | decompressed_pos_ = 0; |
277 | |
278 | bool need_more_output; |
279 | int64_t bytes_read, bytes_written; |
280 | int64_t input_len = compressed_->size() - compressed_pos_; |
281 | const uint8_t* input = compressed_->data() + compressed_pos_; |
282 | int64_t output_len = decompressed_->size(); |
283 | uint8_t* output = decompressed_->mutable_data(); |
284 | |
285 | RETURN_NOT_OK(decompressor_->Decompress(input_len, input, output_len, output, |
286 | &bytes_read, &bytes_written, |
287 | &need_more_output)); |
288 | compressed_pos_ += bytes_read; |
289 | if (bytes_written > 0 || !need_more_output || input_len == 0) { |
290 | RETURN_NOT_OK(decompressed_->Resize(bytes_written)); |
291 | break; |
292 | } |
293 | DCHECK_EQ(bytes_written, 0); |
294 | // Need to enlarge output buffer |
295 | decompress_size *= 2; |
296 | } |
297 | return Status::OK(); |
298 | } |
299 | |
300 | Status Read(int64_t nbytes, int64_t* bytes_read, void* out) { |
301 | std::lock_guard<std::mutex> guard(lock_); |
302 | |
303 | *bytes_read = 0; |
304 | auto out_data = reinterpret_cast<uint8_t*>(out); |
305 | |
306 | while (nbytes > 0) { |
307 | int64_t avail = decompressed_ ? (decompressed_->size() - decompressed_pos_) : 0; |
308 | if (avail > 0) { |
309 | // Pending decompressed data is available, use it |
310 | avail = std::min(avail, nbytes); |
311 | memcpy(out_data, decompressed_->data() + decompressed_pos_, avail); |
312 | decompressed_pos_ += avail; |
313 | out_data += avail; |
314 | *bytes_read += avail; |
315 | nbytes -= avail; |
316 | if (decompressed_pos_ == decompressed_->size()) { |
317 | // Decompressed data is exhausted, release buffer |
318 | decompressed_.reset(); |
319 | } |
320 | if (nbytes == 0) { |
321 | // We're done |
322 | break; |
323 | } |
324 | } |
325 | |
326 | // At this point, no more decompressed data remains, |
327 | // so we need to decompress more |
328 | if (decompressor_->IsFinished()) { |
329 | break; |
330 | } |
331 | // First try to read data from the decompressor |
332 | if (compressed_) { |
333 | RETURN_NOT_OK(DecompressData()); |
334 | } |
335 | if (!decompressed_ || decompressed_->size() == 0) { |
336 | // Got nothing, need to read more compressed data |
337 | RETURN_NOT_OK(EnsureCompressedData()); |
338 | if (compressed_pos_ == compressed_->size()) { |
339 | // Compressed stream unexpectedly exhausted |
340 | return Status::IOError("Truncated compressed stream" ); |
341 | } |
342 | RETURN_NOT_OK(DecompressData()); |
343 | } |
344 | } |
345 | return Status::OK(); |
346 | } |
347 | |
348 | Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) { |
349 | std::shared_ptr<ResizableBuffer> buf; |
350 | RETURN_NOT_OK(AllocateResizableBuffer(pool_, nbytes, &buf)); |
351 | int64_t bytes_read; |
352 | RETURN_NOT_OK(Read(nbytes, &bytes_read, buf->mutable_data())); |
353 | RETURN_NOT_OK(buf->Resize(bytes_read)); |
354 | *out = buf; |
355 | return Status::OK(); |
356 | } |
357 | |
358 | std::shared_ptr<InputStream> raw() const { return raw_; } |
359 | |
360 | private: |
361 | // Read 64 KB compressed data at a time |
362 | static const int64_t kChunkSize = 64 * 1024; |
363 | // Decompress 1 MB at a time |
364 | static const int64_t kDecompressSize = 1024 * 1024; |
365 | |
366 | MemoryPool* pool_; |
367 | std::shared_ptr<InputStream> raw_; |
368 | Codec* codec_; |
369 | bool is_open_; |
370 | std::shared_ptr<Decompressor> decompressor_; |
371 | std::shared_ptr<Buffer> compressed_; |
372 | int64_t compressed_pos_; |
373 | std::shared_ptr<ResizableBuffer> decompressed_; |
374 | int64_t decompressed_pos_; |
375 | |
376 | mutable std::mutex lock_; |
377 | }; |
378 | |
379 | Status CompressedInputStream::Make(Codec* codec, const std::shared_ptr<InputStream>& raw, |
380 | std::shared_ptr<CompressedInputStream>* out) { |
381 | return Make(default_memory_pool(), codec, raw, out); |
382 | } |
383 | |
384 | Status CompressedInputStream::Make(MemoryPool* pool, Codec* codec, |
385 | const std::shared_ptr<InputStream>& raw, |
386 | std::shared_ptr<CompressedInputStream>* out) { |
387 | std::shared_ptr<CompressedInputStream> res(new CompressedInputStream); |
388 | res->impl_ = std::unique_ptr<Impl>(new Impl(pool, codec, std::move(raw))); |
389 | RETURN_NOT_OK(res->impl_->Init()); |
390 | *out = res; |
391 | return Status::OK(); |
392 | } |
393 | |
394 | CompressedInputStream::~CompressedInputStream() {} |
395 | |
396 | Status CompressedInputStream::Close() { return impl_->Close(); } |
397 | |
398 | bool CompressedInputStream::closed() const { return impl_->closed(); } |
399 | |
400 | Status CompressedInputStream::Tell(int64_t* position) const { |
401 | return impl_->Tell(position); |
402 | } |
403 | |
404 | Status CompressedInputStream::Read(int64_t nbytes, int64_t* bytes_read, void* out) { |
405 | return impl_->Read(nbytes, bytes_read, out); |
406 | } |
407 | |
408 | Status CompressedInputStream::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) { |
409 | return impl_->Read(nbytes, out); |
410 | } |
411 | |
412 | std::shared_ptr<InputStream> CompressedInputStream::raw() const { return impl_->raw(); } |
413 | |
414 | } // namespace io |
415 | } // namespace arrow |
416 | |