1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19#include "Adaptor.hh"
20#include "Compression.hh"
21#include "orc/Exceptions.hh"
22#include "LzoDecompressor.hh"
23#include "lz4.h"
24
25#include <algorithm>
26#include <iomanip>
27#include <iostream>
28#include <sstream>
29
30#include "zlib.h"
31
32#include "wrap/snappy-wrapper.h"
33
34namespace orc {
35
36 class CompressionStreamBase: public BufferedOutputStream {
37 public:
38 CompressionStreamBase(OutputStream * outStream,
39 int compressionLevel,
40 uint64_t capacity,
41 uint64_t blockSize,
42 MemoryPool& pool);
43
44 virtual bool Next(void** data, int*size) override = 0;
45 virtual void BackUp(int count) override;
46
47 virtual std::string getName() const override = 0;
48 virtual uint64_t flush() override;
49
50 virtual bool isCompressed() const override { return true; }
51 virtual uint64_t getSize() const override;
52
53 protected:
54 void writeHeader(char * buffer, size_t compressedSize, bool original) {
55 buffer[0] = static_cast<char>((compressedSize << 1) + (original ? 1 : 0));
56 buffer[1] = static_cast<char>(compressedSize >> 7);
57 buffer[2] = static_cast<char>(compressedSize >> 15);
58 }
59
60 // Buffer to hold uncompressed data until user calls Next()
61 DataBuffer<unsigned char> rawInputBuffer;
62
63 // Compress level
64 int level;
65
66 // Compressed data output buffer
67 char * outputBuffer;
68
69 // Size for compressionBuffer
70 int bufferSize;
71
72 // Compress output position
73 int outputPosition;
74
75 // Compress output buffer size
76 int outputSize;
77 };
78
79 CompressionStreamBase::CompressionStreamBase(OutputStream * outStream,
80 int compressionLevel,
81 uint64_t capacity,
82 uint64_t blockSize,
83 MemoryPool& pool) :
84 BufferedOutputStream(pool,
85 outStream,
86 capacity,
87 blockSize),
88 rawInputBuffer(pool, blockSize),
89 level(compressionLevel),
90 outputBuffer(nullptr),
91 bufferSize(0),
92 outputPosition(0),
93 outputSize(0) {
94 // PASS
95 }
96
97 void CompressionStreamBase::BackUp(int count) {
98 if (count > bufferSize) {
99 throw std::logic_error("Can't backup that much!");
100 }
101 bufferSize -= count;
102 }
103
104 uint64_t CompressionStreamBase::flush() {
105 void * data;
106 int size;
107 if (!Next(&data, &size)) {
108 throw std::runtime_error("Failed to flush compression buffer.");
109 }
110 BufferedOutputStream::BackUp(outputSize - outputPosition);
111 bufferSize = outputSize = outputPosition = 0;
112 return BufferedOutputStream::flush();
113 }
114
115 uint64_t CompressionStreamBase::getSize() const {
116 return BufferedOutputStream::getSize() -
117 static_cast<uint64_t>(outputSize - outputPosition);
118 }
119
120 /**
121 * Streaming compression base class
122 */
123 class CompressionStream: public CompressionStreamBase {
124 public:
125 CompressionStream(OutputStream * outStream,
126 int compressionLevel,
127 uint64_t capacity,
128 uint64_t blockSize,
129 MemoryPool& pool);
130
131 virtual bool Next(void** data, int*size) override;
132 virtual std::string getName() const override = 0;
133
134 protected:
135 // return total compressed size
136 virtual uint64_t doStreamingCompression() = 0;
137 };
138
139 CompressionStream::CompressionStream(OutputStream * outStream,
140 int compressionLevel,
141 uint64_t capacity,
142 uint64_t blockSize,
143 MemoryPool& pool) :
144 CompressionStreamBase(outStream,
145 compressionLevel,
146 capacity,
147 blockSize,
148 pool) {
149 // PASS
150 }
151
// Emits the bytes the caller wrote into the previously returned buffer
// (bufferSize bytes of rawInputBuffer) as one chunk: a 3-byte header followed
// by either the compressed data or, if compression did not help, the raw
// bytes. Then hands the whole rawInputBuffer back to the caller for the next
// batch of input. Always returns true; throws if the output stream cannot
// supply a buffer.
bool CompressionStream::Next(void** data, int*size) {
  if (bufferSize != 0) {
    // adjust 3 bytes for the compression header
    // If the header does not fit in the current output region, fetch a new
    // region and carry the overflow into it.
    if (outputPosition + 3 >= outputSize) {
      int newPosition = outputPosition + 3 - outputSize;
      if (!BufferedOutputStream::Next(
        reinterpret_cast<void **>(&outputBuffer),
        &outputSize)) {
        throw std::runtime_error(
          "Failed to get next output buffer from output stream.");
      }
      outputPosition = newPosition;
    } else {
      outputPosition += 3;
    }

    uint64_t totalCompressedSize = doStreamingCompression();

    // The header sits 3 bytes before the compressed data.
    // NOTE(review): this is computed relative to the *current* outputBuffer
    // and assumes earlier output regions are contiguous with it in memory.
    // Confirm that BufferedOutputStream guarantees this.
    char * header = outputBuffer + outputPosition - totalCompressedSize - 3;
    if (totalCompressedSize >= static_cast<unsigned long>(bufferSize)) {
      // Compression did not shrink the data: store the raw bytes with the
      // "original" flag, then give back the extra output space.
      writeHeader(header, static_cast<size_t>(bufferSize), true);
      memcpy(
        header + 3,
        rawInputBuffer.data(),
        static_cast<size_t>(bufferSize));

      int backup = static_cast<int>(totalCompressedSize) - bufferSize;
      BufferedOutputStream::BackUp(backup);
      outputPosition -= backup;
      outputSize -= backup;
    } else {
      writeHeader(header, totalCompressedSize, false);
    }
  }

  // Offer the entire raw buffer to the caller again.
  *data = rawInputBuffer.data();
  *size = static_cast<int>(rawInputBuffer.size());
  bufferSize = *size;

  return true;
}
193
194 class ZlibCompressionStream: public CompressionStream {
195 public:
196 ZlibCompressionStream(OutputStream * outStream,
197 int compressionLevel,
198 uint64_t capacity,
199 uint64_t blockSize,
200 MemoryPool& pool);
201
202 virtual std::string getName() const override;
203
204 protected:
205 virtual uint64_t doStreamingCompression() override;
206
207 private:
208 void init();
209 z_stream strm;
210 };
211
212 ZlibCompressionStream::ZlibCompressionStream(
213 OutputStream * outStream,
214 int compressionLevel,
215 uint64_t capacity,
216 uint64_t blockSize,
217 MemoryPool& pool)
218 : CompressionStream(outStream,
219 compressionLevel,
220 capacity,
221 blockSize,
222 pool) {
223 init();
224 }
225
226 uint64_t ZlibCompressionStream::doStreamingCompression() {
227 if (deflateReset(&strm) != Z_OK) {
228 throw std::runtime_error("Failed to reset inflate.");
229 }
230
231 strm.avail_in = static_cast<unsigned int>(bufferSize);
232 strm.next_in = rawInputBuffer.data();
233
234 do {
235 if (outputPosition >= outputSize) {
236 if (!BufferedOutputStream::Next(
237 reinterpret_cast<void **>(&outputBuffer),
238 &outputSize)) {
239 throw std::runtime_error(
240 "Failed to get next output buffer from output stream.");
241 }
242 outputPosition = 0;
243 }
244 strm.next_out = reinterpret_cast<unsigned char *>
245 (outputBuffer + outputPosition);
246 strm.avail_out = static_cast<unsigned int>
247 (outputSize - outputPosition);
248
249 int ret = deflate(&strm, Z_FINISH);
250 outputPosition = outputSize - static_cast<int>(strm.avail_out);
251
252 if (ret == Z_STREAM_END) {
253 break;
254 } else if (ret == Z_OK) {
255 // needs more buffer so will continue the loop
256 } else {
257 throw std::runtime_error("Failed to deflate input data.");
258 }
259 } while (strm.avail_out == 0);
260
261 return strm.total_out;
262 }
263
264 std::string ZlibCompressionStream::getName() const {
265 return "ZlibCompressionStream";
266 }
267
268DIAGNOSTIC_PUSH
269
270#if defined(__GNUC__) || defined(__clang__)
271 DIAGNOSTIC_IGNORE("-Wold-style-cast")
272#endif
273
274 void ZlibCompressionStream::init() {
275 strm.zalloc = nullptr;
276 strm.zfree = nullptr;
277 strm.opaque = nullptr;
278
279 if (deflateInit2(&strm, level, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY)
280 != Z_OK) {
281 throw std::runtime_error("Error while calling deflateInit2() for zlib.");
282 }
283 }
284
285DIAGNOSTIC_PUSH
286
// Position of a decompression stream within the chunked input.
enum DecompressState {
  DECOMPRESS_HEADER,    // a 3-byte chunk header must be read next
  DECOMPRESS_START,     // current chunk is compressed (header bit 0 clear)
  DECOMPRESS_CONTINUE,  // not referenced by the code in this file
  DECOMPRESS_ORIGINAL,  // current chunk is stored raw (header bit 0 set)
  DECOMPRESS_EOF        // underlying input is exhausted
};
292
293 class ZlibDecompressionStream: public SeekableInputStream {
294 public:
295 ZlibDecompressionStream(std::unique_ptr<SeekableInputStream> inStream,
296 size_t blockSize,
297 MemoryPool& pool);
298 virtual ~ZlibDecompressionStream() override;
299 virtual bool Next(const void** data, int*size) override;
300 virtual void BackUp(int count) override;
301 virtual bool Skip(int count) override;
302 virtual int64_t ByteCount() const override;
303 virtual void seek(PositionProvider& position) override;
304 virtual std::string getName() const override;
305
306 private:
307 void readBuffer(bool failOnEof) {
308 int length;
309 if (!input->Next(reinterpret_cast<const void**>(&inputBuffer),
310 &length)) {
311 if (failOnEof) {
312 throw ParseError("Read past EOF in "
313 "ZlibDecompressionStream::readBuffer");
314 }
315 state = DECOMPRESS_EOF;
316 inputBuffer = nullptr;
317 inputBufferEnd = nullptr;
318 } else {
319 inputBufferEnd = inputBuffer + length;
320 }
321 }
322
323 uint32_t readByte(bool failOnEof) {
324 if (inputBuffer == inputBufferEnd) {
325 readBuffer(failOnEof);
326 if (state == DECOMPRESS_EOF) {
327 return 0;
328 }
329 }
330 return static_cast<unsigned char>(*(inputBuffer++));
331 }
332
333 void readHeader() {
334 uint32_t header = readByte(false);
335 if (state != DECOMPRESS_EOF) {
336 header |= readByte(true) << 8;
337 header |= readByte(true) << 16;
338 if (header & 1) {
339 state = DECOMPRESS_ORIGINAL;
340 } else {
341 state = DECOMPRESS_START;
342 }
343 remainingLength = header >> 1;
344 } else {
345 remainingLength = 0;
346 }
347 }
348
349 MemoryPool& pool;
350 const size_t blockSize;
351 std::unique_ptr<SeekableInputStream> input;
352 z_stream zstream;
353 DataBuffer<char> buffer;
354
355 // the current state
356 DecompressState state;
357
358 // the start of the current buffer
359 // This pointer is not owned by us. It is either owned by zstream or
360 // the underlying stream.
361 const char* outputBuffer;
362 // the size of the current buffer
363 size_t outputBufferLength;
364 // the size of the current chunk
365 size_t remainingLength;
366
367 // the last buffer returned from the input
368 const char *inputBuffer;
369 const char *inputBufferEnd;
370
371 // roughly the number of bytes returned
372 off_t bytesReturned;
373 };
374
375DIAGNOSTIC_PUSH
376
377#if defined(__GNUC__) || defined(__clang__)
378 DIAGNOSTIC_IGNORE("-Wold-style-cast")
379#endif
380
381 ZlibDecompressionStream::ZlibDecompressionStream
382 (std::unique_ptr<SeekableInputStream> inStream,
383 size_t _blockSize,
384 MemoryPool& _pool
385 ): pool(_pool),
386 blockSize(_blockSize),
387 buffer(pool, _blockSize) {
388 input.reset(inStream.release());
389 zstream.next_in = nullptr;
390 zstream.avail_in = 0;
391 zstream.zalloc = nullptr;
392 zstream.zfree = nullptr;
393 zstream.opaque = nullptr;
394 zstream.next_out = reinterpret_cast<Bytef*>(buffer.data());
395 zstream.avail_out = static_cast<uInt>(blockSize);
396 int64_t result = inflateInit2(&zstream, -15);
397 switch (result) {
398 case Z_OK:
399 break;
400 case Z_MEM_ERROR:
401 throw std::logic_error("Memory error from inflateInit2");
402 case Z_VERSION_ERROR:
403 throw std::logic_error("Version error from inflateInit2");
404 case Z_STREAM_ERROR:
405 throw std::logic_error("Stream error from inflateInit2");
406 default:
407 throw std::logic_error("Unknown error from inflateInit2");
408 }
409 outputBuffer = nullptr;
410 outputBufferLength = 0;
411 remainingLength = 0;
412 state = DECOMPRESS_HEADER;
413 inputBuffer = nullptr;
414 inputBufferEnd = nullptr;
415 bytesReturned = 0;
416 }
417
418DIAGNOSTIC_POP
419
420 ZlibDecompressionStream::~ZlibDecompressionStream() {
421 int64_t result = inflateEnd(&zstream);
422 if (result != Z_OK) {
423 // really can't throw in destructors
424 std::cout << "Error in ~ZlibDecompressionStream() " << result << "\n";
425 }
426 }
427
// Returns the next run of decompressed bytes.
// - Data pushed back by BackUp() is returned first.
// - For a stored (ORIGINAL) chunk, returns a window directly into the input
//   buffer, capped at the bytes left in the chunk.
// - For a compressed (START) chunk, inflates the whole chunk into `buffer`,
//   refilling input as needed, and returns it in one piece.
// Returns false at end of input. Throws std::logic_error on zlib errors and
// ParseError if the input ends inside a chunk.
bool ZlibDecompressionStream::Next(const void** data, int*size) {
  // if the user pushed back, return them the partial buffer
  // NOTE(review): unlike BlockDecompressionStream::Next, this path does not
  // add the re-returned bytes to bytesReturned, while BackUp() subtracts them.
  if (outputBufferLength) {
    *data = outputBuffer;
    *size = static_cast<int>(outputBufferLength);
    outputBuffer += outputBufferLength;
    outputBufferLength = 0;
    return true;
  }
  // Start a new chunk when the previous one is fully consumed.
  if (state == DECOMPRESS_HEADER || remainingLength == 0) {
    readHeader();
  }
  if (state == DECOMPRESS_EOF) {
    return false;
  }
  if (inputBuffer == inputBufferEnd) {
    readBuffer(true);
  }
  // Bytes of the current chunk available in the current input buffer.
  size_t availSize =
    std::min(static_cast<size_t>(inputBufferEnd - inputBuffer),
             remainingLength);
  if (state == DECOMPRESS_ORIGINAL) {
    // Stored chunk: hand out the input bytes without copying.
    *data = inputBuffer;
    *size = static_cast<int>(availSize);
    outputBuffer = inputBuffer + availSize;
    outputBufferLength = 0;
  } else if (state == DECOMPRESS_START) {
    zstream.next_in =
      reinterpret_cast<Bytef*>(const_cast<char*>(inputBuffer));
    zstream.avail_in = static_cast<uInt>(availSize);
    outputBuffer = buffer.data();
    zstream.next_out =
      reinterpret_cast<Bytef*>(const_cast<char*>(outputBuffer));
    zstream.avail_out = static_cast<uInt>(blockSize);
    if (inflateReset(&zstream) != Z_OK) {
      throw std::logic_error("Bad inflateReset in "
                             "ZlibDecompressionStream::Next");
    }
    int64_t result;
    do {
      // Z_FINISH once the rest of the chunk is in the current input buffer.
      result = inflate(&zstream, availSize == remainingLength ? Z_FINISH :
                       Z_SYNC_FLUSH);
      switch (result) {
      case Z_OK:
        // Chunk spans input buffers: consume what was fed and load more.
        remainingLength -= availSize;
        inputBuffer += availSize;
        readBuffer(true);
        availSize =
          std::min(static_cast<size_t>(inputBufferEnd - inputBuffer),
                   remainingLength);
        zstream.next_in =
          reinterpret_cast<Bytef*>(const_cast<char*>(inputBuffer));
        zstream.avail_in = static_cast<uInt>(availSize);
        break;
      case Z_STREAM_END:
        break;
      case Z_BUF_ERROR:
        throw std::logic_error("Buffer error in "
                               "ZlibDecompressionStream::Next");
      case Z_DATA_ERROR:
        throw std::logic_error("Data error in "
                               "ZlibDecompressionStream::Next");
      case Z_STREAM_ERROR:
        throw std::logic_error("Stream error in "
                               "ZlibDecompressionStream::Next");
      default:
        throw std::logic_error("Unknown error in "
                               "ZlibDecompressionStream::Next");
      }
    } while (result != Z_STREAM_END);
    *size = static_cast<int>(blockSize - zstream.avail_out);
    *data = outputBuffer;
    outputBufferLength = 0;
    outputBuffer += *size;
  } else {
    throw std::logic_error("Unknown compression state in "
                           "ZlibDecompressionStream::Next");
  }
  // Consume the input bytes of the last (or only) piece handled above.
  inputBuffer += availSize;
  remainingLength -= availSize;
  bytesReturned += *size;
  return true;
}
511
512 void ZlibDecompressionStream::BackUp(int count) {
513 if (outputBuffer == nullptr || outputBufferLength != 0) {
514 throw std::logic_error("Backup without previous Next in "
515 "ZlibDecompressionStream");
516 }
517 outputBuffer -= static_cast<size_t>(count);
518 outputBufferLength = static_cast<size_t>(count);
519 bytesReturned -= count;
520 }
521
522 bool ZlibDecompressionStream::Skip(int count) {
523 bytesReturned += count;
524 // this is a stupid implementation for now.
525 // should skip entire blocks without decompressing
526 while (count > 0) {
527 const void *ptr;
528 int len;
529 if (!Next(&ptr, &len)) {
530 return false;
531 }
532 if (len > count) {
533 BackUp(len - count);
534 count = 0;
535 } else {
536 count -= len;
537 }
538 }
539 return true;
540 }
541
542 int64_t ZlibDecompressionStream::ByteCount() const {
543 return bytesReturned;
544 }
545
546 void ZlibDecompressionStream::seek(PositionProvider& position) {
547 input->seek(position);
548 bytesReturned = static_cast<off_t>(input->ByteCount());
549 if (!Skip(static_cast<int>(position.next()))) {
550 throw ParseError("Bad skip in ZlibDecompressionStream::seek");
551 }
552 }
553
554 std::string ZlibDecompressionStream::getName() const {
555 std::ostringstream result;
556 result << "zlib(" << input->getName() << ")";
557 return result.str();
558 }
559
560 class BlockDecompressionStream: public SeekableInputStream {
561 public:
562 BlockDecompressionStream(std::unique_ptr<SeekableInputStream> inStream,
563 size_t blockSize,
564 MemoryPool& pool);
565
566 virtual ~BlockDecompressionStream() override {}
567 virtual bool Next(const void** data, int*size) override;
568 virtual void BackUp(int count) override;
569 virtual bool Skip(int count) override;
570 virtual int64_t ByteCount() const override;
571 virtual void seek(PositionProvider& position) override;
572 virtual std::string getName() const override = 0;
573
574 protected:
575 virtual uint64_t decompress(const char *input, uint64_t length,
576 char *output, size_t maxOutputLength) = 0;
577
578 std::string getStreamName() const {
579 return input->getName();
580 }
581
582 private:
583 void readBuffer(bool failOnEof) {
584 int length;
585 if (!input->Next(reinterpret_cast<const void**>(&inputBufferPtr),
586 &length)) {
587 if (failOnEof) {
588 throw ParseError(getName() + "read past EOF");
589 }
590 state = DECOMPRESS_EOF;
591 inputBufferPtr = nullptr;
592 inputBufferPtrEnd = nullptr;
593 } else {
594 inputBufferPtrEnd = inputBufferPtr + length;
595 }
596 }
597
598 uint32_t readByte(bool failOnEof) {
599 if (inputBufferPtr == inputBufferPtrEnd) {
600 readBuffer(failOnEof);
601 if (state == DECOMPRESS_EOF) {
602 return 0;
603 }
604 }
605 return static_cast<unsigned char>(*(inputBufferPtr++));
606 }
607
608 void readHeader() {
609 uint32_t header = readByte(false);
610 if (state != DECOMPRESS_EOF) {
611 header |= readByte(true) << 8;
612 header |= readByte(true) << 16;
613 if (header & 1) {
614 state = DECOMPRESS_ORIGINAL;
615 } else {
616 state = DECOMPRESS_START;
617 }
618 remainingLength = header >> 1;
619 } else {
620 remainingLength = 0;
621 }
622 }
623
624 std::unique_ptr<SeekableInputStream> input;
625 MemoryPool& pool;
626
627 // may need to stitch together multiple input buffers;
628 // to give snappy a contiguous block
629 DataBuffer<char> inputBuffer;
630
631 // uncompressed output
632 DataBuffer<char> outputBuffer;
633
634 // the current state
635 DecompressState state;
636
637 // the start of the current output buffer
638 const char* outputBufferPtr;
639 // the size of the current output buffer
640 size_t outputBufferLength;
641
642 // the size of the current chunk
643 size_t remainingLength;
644
645 // the last buffer returned from the input
646 const char *inputBufferPtr;
647 const char *inputBufferPtrEnd;
648
649 // bytes returned by this stream
650 off_t bytesReturned;
651 };
652
653 BlockDecompressionStream::BlockDecompressionStream
654 (std::unique_ptr<SeekableInputStream> inStream,
655 size_t bufferSize,
656 MemoryPool& _pool
657 ) : pool(_pool),
658 inputBuffer(pool, bufferSize),
659 outputBuffer(pool, bufferSize),
660 state(DECOMPRESS_HEADER),
661 outputBufferPtr(nullptr),
662 outputBufferLength(0),
663 remainingLength(0),
664 inputBufferPtr(nullptr),
665 inputBufferPtrEnd(nullptr),
666 bytesReturned(0) {
667 input.reset(inStream.release());
668 }
669
// Returns the next run of decompressed bytes.
// - Data pushed back by BackUp() is returned first.
// - For a stored (ORIGINAL) chunk, returns a window directly into the input
//   buffer.
// - For a compressed (START) chunk, gathers the whole chunk into contiguous
//   memory (copying into inputBuffer if it spans input buffers), decodes it
//   with decompress() into outputBuffer, and returns all of it.
// Returns false at end of input.
bool BlockDecompressionStream::Next(const void** data, int*size) {
  // if the user pushed back, return them the partial buffer
  if (outputBufferLength) {
    *data = outputBufferPtr;
    *size = static_cast<int>(outputBufferLength);
    outputBufferPtr += outputBufferLength;
    bytesReturned += static_cast<off_t>(outputBufferLength);
    outputBufferLength = 0;
    return true;
  }
  // Start a new chunk when the previous one is fully consumed.
  if (state == DECOMPRESS_HEADER || remainingLength == 0) {
    readHeader();
  }
  if (state == DECOMPRESS_EOF) {
    return false;
  }
  if (inputBufferPtr == inputBufferPtrEnd) {
    readBuffer(true);
  }

  // Bytes of the current chunk available in the current input buffer.
  size_t availSize =
    std::min(static_cast<size_t>(inputBufferPtrEnd - inputBufferPtr),
             remainingLength);
  if (state == DECOMPRESS_ORIGINAL) {
    *data = inputBufferPtr;
    *size = static_cast<int>(availSize);
    outputBufferPtr = inputBufferPtr + availSize;
    outputBufferLength = 0;
    inputBufferPtr += availSize;
    remainingLength -= availSize;
  } else if (state == DECOMPRESS_START) {
    // Get contiguous bytes of compressed block.
    const char *compressed = inputBufferPtr;
    if (remainingLength == availSize) {
      inputBufferPtr += availSize;
    } else {
      // Did not read enough from input.
      // Grow the staging buffer if needed and copy the chunk piece by piece.
      if (inputBuffer.capacity() < remainingLength) {
        inputBuffer.resize(remainingLength);
      }
      ::memcpy(inputBuffer.data(), inputBufferPtr, availSize);
      inputBufferPtr += availSize;
      compressed = inputBuffer.data();

      for (size_t pos = availSize; pos < remainingLength; ) {
        readBuffer(true);
        size_t avail =
          std::min(static_cast<size_t>(inputBufferPtrEnd -
                                       inputBufferPtr),
                   remainingLength - pos);
        ::memcpy(inputBuffer.data() + pos, inputBufferPtr, avail);
        pos += avail;
        inputBufferPtr += avail;
      }
    }

    outputBufferLength = decompress(compressed, remainingLength,
                                    outputBuffer.data(),
                                    outputBuffer.capacity());

    // The whole chunk is consumed; the next call reads a new header.
    remainingLength = 0;
    state = DECOMPRESS_HEADER;
    *data = outputBuffer.data();
    *size = static_cast<int>(outputBufferLength);
    outputBufferPtr = outputBuffer.data() + outputBufferLength;
    outputBufferLength = 0;
  }

  bytesReturned += *size;
  return true;
}
741
742 void BlockDecompressionStream::BackUp(int count) {
743 if (outputBufferPtr == nullptr || outputBufferLength != 0) {
744 throw std::logic_error("Backup without previous Next in "+getName());
745 }
746 outputBufferPtr -= static_cast<size_t>(count);
747 outputBufferLength = static_cast<size_t>(count);
748 bytesReturned -= count;
749 }
750
751 bool BlockDecompressionStream::Skip(int count) {
752 bytesReturned += count;
753 // this is a stupid implementation for now.
754 // should skip entire blocks without decompressing
755 while (count > 0) {
756 const void *ptr;
757 int len;
758 if (!Next(&ptr, &len)) {
759 return false;
760 }
761 if (len > count) {
762 BackUp(len - count);
763 count = 0;
764 } else {
765 count -= len;
766 }
767 }
768 return true;
769 }
770
771 int64_t BlockDecompressionStream::ByteCount() const {
772 return bytesReturned;
773 }
774
775 void BlockDecompressionStream::seek(PositionProvider& position) {
776 input->seek(position);
777 if (!Skip(static_cast<int>(position.next()))) {
778 throw ParseError("Bad skip in " + getName());
779 }
780 }
781
782 class SnappyDecompressionStream: public BlockDecompressionStream {
783 public:
784 SnappyDecompressionStream(std::unique_ptr<SeekableInputStream> inStream,
785 size_t blockSize,
786 MemoryPool& pool
787 ): BlockDecompressionStream
788 (std::move(inStream),
789 blockSize,
790 pool) {
791 // PASS
792 }
793
794 std::string getName() const override {
795 std::ostringstream result;
796 result << "snappy(" << getStreamName() << ")";
797 return result.str();
798 }
799
800 protected:
801 virtual uint64_t decompress(const char *input, uint64_t length,
802 char *output, size_t maxOutputLength
803 ) override;
804 };
805
806 uint64_t SnappyDecompressionStream::decompress(const char *input,
807 uint64_t length,
808 char *output,
809 size_t maxOutputLength) {
810 size_t outLength;
811 if (!snappy::GetUncompressedLength(input, length, &outLength)) {
812 throw ParseError("SnappyDecompressionStream choked on corrupt input");
813 }
814
815 if (outLength > maxOutputLength) {
816 throw std::logic_error("Snappy length exceeds block size");
817 }
818
819 if (!snappy::RawUncompress(input, length, output)) {
820 throw ParseError("SnappyDecompressionStream choked on corrupt input");
821 }
822 return outLength;
823 }
824
825 class LzoDecompressionStream: public BlockDecompressionStream {
826 public:
827 LzoDecompressionStream(std::unique_ptr<SeekableInputStream> inStream,
828 size_t blockSize,
829 MemoryPool& pool
830 ): BlockDecompressionStream
831 (std::move(inStream),
832 blockSize,
833 pool) {
834 // PASS
835 }
836
837 std::string getName() const override {
838 std::ostringstream result;
839 result << "lzo(" << getStreamName() << ")";
840 return result.str();
841 }
842
843 protected:
844 virtual uint64_t decompress(const char *input, uint64_t length,
845 char *output, size_t maxOutputLength
846 ) override;
847 };
848
849 uint64_t LzoDecompressionStream::decompress(const char *input,
850 uint64_t length,
851 char *output,
852 size_t maxOutputLength) {
853 return lzoDecompress(input, input + length, output,
854 output + maxOutputLength);
855 }
856
857 class Lz4DecompressionStream: public BlockDecompressionStream {
858 public:
859 Lz4DecompressionStream(std::unique_ptr<SeekableInputStream> inStream,
860 size_t blockSize,
861 MemoryPool& pool
862 ): BlockDecompressionStream
863 (std::move(inStream),
864 blockSize,
865 pool) {
866 // PASS
867 }
868
869 std::string getName() const override {
870 std::ostringstream result;
871 result << "lz4(" << getStreamName() << ")";
872 return result.str();
873 }
874
875 protected:
876 virtual uint64_t decompress(const char *input, uint64_t length,
877 char *output, size_t maxOutputLength
878 ) override;
879 };
880
881 uint64_t Lz4DecompressionStream::decompress(const char *input,
882 uint64_t length,
883 char *output,
884 size_t maxOutputLength) {
885 int result = LZ4_decompress_safe(input, output, static_cast<int>(length),
886 static_cast<int>(maxOutputLength));
887 if (result < 0) {
888 throw ParseError(getName() + " - failed to decompress");
889 }
890 return static_cast<uint64_t>(result);
891 }
892
893 std::unique_ptr<BufferedOutputStream>
894 createCompressor(
895 CompressionKind kind,
896 OutputStream * outStream,
897 CompressionStrategy strategy,
898 uint64_t bufferCapacity,
899 uint64_t compressionBlockSize,
900 MemoryPool& pool) {
901 switch (static_cast<int64_t>(kind)) {
902 case CompressionKind_NONE: {
903 return std::unique_ptr<BufferedOutputStream>
904 (new BufferedOutputStream(
905 pool, outStream, bufferCapacity, compressionBlockSize));
906 }
907 case CompressionKind_ZLIB: {
908 int level = (strategy == CompressionStrategy_SPEED) ?
909 Z_BEST_SPEED + 1 : Z_DEFAULT_COMPRESSION;
910 return std::unique_ptr<BufferedOutputStream>
911 (new ZlibCompressionStream(
912 outStream, level, bufferCapacity, compressionBlockSize, pool));
913 }
914 case CompressionKind_SNAPPY:
915 case CompressionKind_LZO:
916 case CompressionKind_LZ4:
917 default:
918 throw NotImplementedYet("compression codec");
919 }
920 }
921
922 std::unique_ptr<SeekableInputStream>
923 createDecompressor(CompressionKind kind,
924 std::unique_ptr<SeekableInputStream> input,
925 uint64_t blockSize,
926 MemoryPool& pool) {
927 switch (static_cast<int64_t>(kind)) {
928 case CompressionKind_NONE:
929 return REDUNDANT_MOVE(input);
930 case CompressionKind_ZLIB:
931 return std::unique_ptr<SeekableInputStream>
932 (new ZlibDecompressionStream(std::move(input), blockSize, pool));
933 case CompressionKind_SNAPPY:
934 return std::unique_ptr<SeekableInputStream>
935 (new SnappyDecompressionStream(std::move(input), blockSize, pool));
936 case CompressionKind_LZO:
937 return std::unique_ptr<SeekableInputStream>
938 (new LzoDecompressionStream(std::move(input), blockSize, pool));
939 case CompressionKind_LZ4:
940 return std::unique_ptr<SeekableInputStream>
941 (new Lz4DecompressionStream(std::move(input), blockSize, pool));
942 default: {
943 std::ostringstream buffer;
944 buffer << "Unknown compression codec " << kind;
945 throw NotImplementedYet(buffer.str());
946 }
947 }
948 }
949
950}
951