1/*
2 * Copyright 2013-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include <folly/compression/Compression.h>
18
19#if FOLLY_HAVE_LIBLZ4
20#include <lz4.h>
21#include <lz4hc.h>
22#if LZ4_VERSION_NUMBER >= 10301
23#include <lz4frame.h>
24#endif
25#endif
26
27#include <glog/logging.h>
28
29#if FOLLY_HAVE_LIBSNAPPY
30#include <snappy-sinksource.h>
31#include <snappy.h>
32#endif
33
34#if FOLLY_HAVE_LIBZ
35#include <folly/compression/Zlib.h>
36#endif
37
38#if FOLLY_HAVE_LIBLZMA
39#include <lzma.h>
40#endif
41
42#if FOLLY_HAVE_LIBZSTD
43#include <folly/compression/Zstd.h>
44#endif
45
46#if FOLLY_HAVE_LIBBZ2
47#include <folly/portability/Windows.h>
48
49#include <bzlib.h>
50#endif
51
52#include <folly/Conv.h>
53#include <folly/Memory.h>
54#include <folly/Portability.h>
55#include <folly/Random.h>
56#include <folly/ScopeGuard.h>
57#include <folly/Varint.h>
58#include <folly/compression/Utils.h>
59#include <folly/io/Cursor.h>
60#include <folly/lang/Bits.h>
61#include <folly/stop_watch.h>
62#include <algorithm>
63#include <unordered_set>
64
65using folly::io::compression::detail::dataStartsWithLE;
66using folly::io::compression::detail::prefixToStringLE;
67
68namespace folly {
69namespace io {
70
71Codec::Codec(
72 CodecType type,
73 Optional<int> level,
74 StringPiece name,
75 bool counters)
76 : type_(type) {
77 if (counters) {
78 bytesBeforeCompression_ = {type,
79 name,
80 level,
81 CompressionCounterKey::BYTES_BEFORE_COMPRESSION,
82 CompressionCounterType::SUM};
83 bytesAfterCompression_ = {type,
84 name,
85 level,
86 CompressionCounterKey::BYTES_AFTER_COMPRESSION,
87 CompressionCounterType::SUM};
88 bytesBeforeDecompression_ = {
89 type,
90 name,
91 level,
92 CompressionCounterKey::BYTES_BEFORE_DECOMPRESSION,
93 CompressionCounterType::SUM};
94 bytesAfterDecompression_ = {
95 type,
96 name,
97 level,
98 CompressionCounterKey::BYTES_AFTER_DECOMPRESSION,
99 CompressionCounterType::SUM};
100 compressions_ = {type,
101 name,
102 level,
103 CompressionCounterKey::COMPRESSIONS,
104 CompressionCounterType::SUM};
105 decompressions_ = {type,
106 name,
107 level,
108 CompressionCounterKey::DECOMPRESSIONS,
109 CompressionCounterType::SUM};
110 compressionMilliseconds_ = {type,
111 name,
112 level,
113 CompressionCounterKey::COMPRESSION_MILLISECONDS,
114 CompressionCounterType::SUM};
115 decompressionMilliseconds_ = {
116 type,
117 name,
118 level,
119 CompressionCounterKey::DECOMPRESSION_MILLISECONDS,
120 CompressionCounterType::SUM};
121 }
122}
123
124namespace {
125constexpr uint32_t kLoggingRate = 50;
126
127class Timer {
128 public:
129 explicit Timer(folly::detail::CompressionCounter& counter)
130 : counter_(&counter) {}
131
132 ~Timer() {
133 *counter_ += timer_.elapsed().count();
134 }
135
136 private:
137 folly::detail::CompressionCounter* counter_;
138 stop_watch<std::chrono::milliseconds> timer_;
139};
140} // namespace
141
142// Ensure consistent behavior in the nullptr case
143std::unique_ptr<IOBuf> Codec::compress(const IOBuf* data) {
144 if (data == nullptr) {
145 throw std::invalid_argument("Codec: data must not be nullptr");
146 }
147 const uint64_t len = data->computeChainDataLength();
148 if (len > maxUncompressedLength()) {
149 throw std::runtime_error("Codec: uncompressed length too large");
150 }
151 bool const logging = folly::Random::oneIn(kLoggingRate);
152 folly::Optional<Timer> const timer =
153 logging ? Timer(compressionMilliseconds_) : folly::Optional<Timer>();
154 auto result = doCompress(data);
155 if (logging) {
156 compressions_++;
157 bytesBeforeCompression_ += len;
158 bytesAfterCompression_ += result->computeChainDataLength();
159 }
160 return result;
161}
162
163std::string Codec::compress(const StringPiece data) {
164 const uint64_t len = data.size();
165 if (len > maxUncompressedLength()) {
166 throw std::runtime_error("Codec: uncompressed length too large");
167 }
168 bool const logging = folly::Random::oneIn(kLoggingRate);
169 folly::Optional<Timer> const timer =
170 logging ? Timer(compressionMilliseconds_) : folly::Optional<Timer>();
171 auto result = doCompressString(data);
172 if (logging) {
173 compressions_++;
174 bytesBeforeCompression_ += len;
175 bytesAfterCompression_ += result.size();
176 }
177 return result;
178}
179
180std::unique_ptr<IOBuf> Codec::uncompress(
181 const IOBuf* data,
182 Optional<uint64_t> uncompressedLength) {
183 if (data == nullptr) {
184 throw std::invalid_argument("Codec: data must not be nullptr");
185 }
186 if (!uncompressedLength) {
187 if (needsUncompressedLength()) {
188 throw std::invalid_argument("Codec: uncompressed length required");
189 }
190 } else if (*uncompressedLength > maxUncompressedLength()) {
191 throw std::runtime_error("Codec: uncompressed length too large");
192 }
193
194 if (data->empty()) {
195 if (uncompressedLength.value_or(0) != 0) {
196 throw std::runtime_error("Codec: invalid uncompressed length");
197 }
198 return IOBuf::create(0);
199 }
200
201 bool const logging = folly::Random::oneIn(kLoggingRate);
202 folly::Optional<Timer> const timer =
203 logging ? Timer(decompressionMilliseconds_) : folly::Optional<Timer>();
204 auto result = doUncompress(data, uncompressedLength);
205 if (logging) {
206 decompressions_++;
207 bytesBeforeDecompression_ += data->computeChainDataLength();
208 bytesAfterDecompression_ += result->computeChainDataLength();
209 }
210 return result;
211}
212
213std::string Codec::uncompress(
214 const StringPiece data,
215 Optional<uint64_t> uncompressedLength) {
216 if (!uncompressedLength) {
217 if (needsUncompressedLength()) {
218 throw std::invalid_argument("Codec: uncompressed length required");
219 }
220 } else if (*uncompressedLength > maxUncompressedLength()) {
221 throw std::runtime_error("Codec: uncompressed length too large");
222 }
223
224 if (data.empty()) {
225 if (uncompressedLength.value_or(0) != 0) {
226 throw std::runtime_error("Codec: invalid uncompressed length");
227 }
228 return "";
229 }
230
231 bool const logging = folly::Random::oneIn(kLoggingRate);
232 folly::Optional<Timer> const timer =
233 logging ? Timer(decompressionMilliseconds_) : folly::Optional<Timer>();
234 auto result = doUncompressString(data, uncompressedLength);
235 if (logging) {
236 decompressions_++;
237 bytesBeforeDecompression_ += data.size();
238 bytesAfterDecompression_ += result.size();
239 }
240 return result;
241}
242
243bool Codec::needsUncompressedLength() const {
244 return doNeedsUncompressedLength();
245}
246
247uint64_t Codec::maxUncompressedLength() const {
248 return doMaxUncompressedLength();
249}
250
251bool Codec::doNeedsUncompressedLength() const {
252 return false;
253}
254
255uint64_t Codec::doMaxUncompressedLength() const {
256 return UNLIMITED_UNCOMPRESSED_LENGTH;
257}
258
259std::vector<std::string> Codec::validPrefixes() const {
260 return {};
261}
262
263bool Codec::canUncompress(const IOBuf*, Optional<uint64_t>) const {
264 return false;
265}
266
267std::string Codec::doCompressString(const StringPiece data) {
268 const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data};
269 auto outputBuffer = doCompress(&inputBuffer);
270 std::string output;
271 output.reserve(outputBuffer->computeChainDataLength());
272 for (auto range : *outputBuffer) {
273 output.append(reinterpret_cast<const char*>(range.data()), range.size());
274 }
275 return output;
276}
277
278std::string Codec::doUncompressString(
279 const StringPiece data,
280 Optional<uint64_t> uncompressedLength) {
281 const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data};
282 auto outputBuffer = doUncompress(&inputBuffer, uncompressedLength);
283 std::string output;
284 output.reserve(outputBuffer->computeChainDataLength());
285 for (auto range : *outputBuffer) {
286 output.append(reinterpret_cast<const char*>(range.data()), range.size());
287 }
288 return output;
289}
290
291uint64_t Codec::maxCompressedLength(uint64_t uncompressedLength) const {
292 return doMaxCompressedLength(uncompressedLength);
293}
294
295Optional<uint64_t> Codec::getUncompressedLength(
296 const folly::IOBuf* data,
297 Optional<uint64_t> uncompressedLength) const {
298 auto const compressedLength = data->computeChainDataLength();
299 if (compressedLength == 0) {
300 if (uncompressedLength.value_or(0) != 0) {
301 throw std::runtime_error("Invalid uncompressed length");
302 }
303 return 0;
304 }
305 return doGetUncompressedLength(data, uncompressedLength);
306}
307
308Optional<uint64_t> Codec::doGetUncompressedLength(
309 const folly::IOBuf*,
310 Optional<uint64_t> uncompressedLength) const {
311 return uncompressedLength;
312}
313
314bool StreamCodec::needsDataLength() const {
315 return doNeedsDataLength();
316}
317
318bool StreamCodec::doNeedsDataLength() const {
319 return false;
320}
321
322void StreamCodec::assertStateIs(State expected) const {
323 if (state_ != expected) {
324 throw std::logic_error(folly::to<std::string>(
325 "Codec: state is ", state_, "; expected state ", expected));
326 }
327}
328
329void StreamCodec::resetStream(Optional<uint64_t> uncompressedLength) {
330 state_ = State::RESET;
331 uncompressedLength_ = uncompressedLength;
332 progressMade_ = true;
333 doResetStream();
334}
335
/**
 * Feeds `input` to the compressor and writes into `output`, delegating the
 * real work to doCompressStream() after validating the request against the
 * declared uncompressed length and the stream state.
 *
 * @return the value returned by doCompressStream(): true once the requested
 *         flush/end operation has completed.
 * @throws std::runtime_error if the declared length is inconsistent with the
 *         input, if a required length is missing, or if two consecutive calls
 *         neither finish nor consume input nor produce output.
 * @throws std::logic_error (via assertStateIs) if `flushOp` is not valid in
 *         the current state.
 */
bool StreamCodec::compressStream(
    ByteRange& input,
    MutableByteRange& output,
    StreamCodec::FlushOp flushOp) {
  // A fresh stream ended with no input cannot match a non-zero declared
  // length.
  if (state_ == State::RESET && input.empty() &&
      flushOp == StreamCodec::FlushOp::END &&
      uncompressedLength().value_or(0) != 0) {
    throw std::runtime_error("Codec: invalid uncompressed length");
  }

  if (!uncompressedLength() && needsDataLength()) {
    throw std::runtime_error("Codec: uncompressed length required");
  }
  // Conversely, a declared length of zero cannot match non-empty input.
  if (state_ == State::RESET && !input.empty() &&
      uncompressedLength() == uint64_t(0)) {
    throw std::runtime_error("Codec: invalid uncompressed length");
  }
  // Handle input state transitions
  switch (flushOp) {
    case StreamCodec::FlushOp::NONE:
      if (state_ == State::RESET) {
        state_ = State::COMPRESS;
      }
      assertStateIs(State::COMPRESS);
      break;
    case StreamCodec::FlushOp::FLUSH:
      if (state_ == State::RESET || state_ == State::COMPRESS) {
        state_ = State::COMPRESS_FLUSH;
      }
      assertStateIs(State::COMPRESS_FLUSH);
      break;
    case StreamCodec::FlushOp::END:
      if (state_ == State::RESET || state_ == State::COMPRESS) {
        state_ = State::COMPRESS_END;
      }
      assertStateIs(State::COMPRESS_END);
      break;
  }
  // Remember the range sizes so a call that changes neither can be detected.
  size_t const inputSize = input.size();
  size_t const outputSize = output.size();
  bool const done = doCompressStream(input, output, flushOp);
  if (!done && inputSize == input.size() && outputSize == output.size()) {
    if (!progressMade_) {
      throw std::runtime_error("Codec: No forward progress made");
    }
    // Throw an exception if there is no progress again next time
    progressMade_ = false;
  } else {
    progressMade_ = true;
  }
  // Handle output state transitions
  if (done) {
    if (state_ == State::COMPRESS_FLUSH) {
      state_ = State::COMPRESS;
    } else if (state_ == State::COMPRESS_END) {
      state_ = State::END;
    }
    // Check internal invariants
    DCHECK(input.empty());
    DCHECK(flushOp != StreamCodec::FlushOp::NONE);
  }
  return done;
}
399
400bool StreamCodec::uncompressStream(
401 ByteRange& input,
402 MutableByteRange& output,
403 StreamCodec::FlushOp flushOp) {
404 if (state_ == State::RESET && input.empty()) {
405 if (uncompressedLength().value_or(0) == 0) {
406 return true;
407 }
408 return false;
409 }
410 // Handle input state transitions
411 if (state_ == State::RESET) {
412 state_ = State::UNCOMPRESS;
413 }
414 assertStateIs(State::UNCOMPRESS);
415 size_t const inputSize = input.size();
416 size_t const outputSize = output.size();
417 bool const done = doUncompressStream(input, output, flushOp);
418 if (!done && inputSize == input.size() && outputSize == output.size()) {
419 if (!progressMade_) {
420 throw std::runtime_error("Codec: no forward progress made");
421 }
422 // Throw an exception if there is no progress again next time
423 progressMade_ = false;
424 } else {
425 progressMade_ = true;
426 }
427 // Handle output state transitions
428 if (done) {
429 state_ = State::END;
430 }
431 return done;
432}
433
434static std::unique_ptr<IOBuf> addOutputBuffer(
435 MutableByteRange& output,
436 uint64_t size) {
437 DCHECK(output.empty());
438 auto buffer = IOBuf::create(size);
439 buffer->append(buffer->capacity());
440 output = {buffer->writableData(), buffer->length()};
441 return buffer;
442}
443
444std::unique_ptr<IOBuf> StreamCodec::doCompress(IOBuf const* data) {
445 uint64_t const uncompressedLength = data->computeChainDataLength();
446 resetStream(uncompressedLength);
447 uint64_t const maxCompressedLen = maxCompressedLength(uncompressedLength);
448
449 auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB
450 auto constexpr kDefaultBufferLength = uint64_t(4) << 20; // 4 MB
451
452 MutableByteRange output;
453 auto buffer = addOutputBuffer(
454 output,
455 maxCompressedLen <= kMaxSingleStepLength ? maxCompressedLen
456 : kDefaultBufferLength);
457
458 // Compress the entire IOBuf chain into the IOBuf chain pointed to by buffer
459 IOBuf const* current = data;
460 ByteRange input{current->data(), current->length()};
461 StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE;
462 bool done = false;
463 while (!done) {
464 while (input.empty() && current->next() != data) {
465 current = current->next();
466 input = {current->data(), current->length()};
467 }
468 if (current->next() == data) {
469 // This is the last input buffer so end the stream
470 flushOp = StreamCodec::FlushOp::END;
471 }
472 if (output.empty()) {
473 buffer->prependChain(addOutputBuffer(output, kDefaultBufferLength));
474 }
475 done = compressStream(input, output, flushOp);
476 if (done) {
477 DCHECK(input.empty());
478 DCHECK(flushOp == StreamCodec::FlushOp::END);
479 DCHECK_EQ(current->next(), data);
480 }
481 }
482 buffer->prev()->trimEnd(output.size());
483 return buffer;
484}
485
/**
 * Picks an output buffer length for decompression: four times the larger of
 * `blockSize` and `compressedLength`, capped at 4 MiB.
 *
 * The cap is applied before multiplying so that very large lengths cannot
 * wrap around uint64_t (which previously could yield a tiny or zero length).
 */
static uint64_t computeBufferLength(
    uint64_t const compressedLength,
    uint64_t const blockSize) {
  uint64_t constexpr kMaxBufferLength = uint64_t(4) << 20; // 4 MiB
  uint64_t constexpr kGrowthFactor = 4;
  uint64_t const base = std::max(blockSize, compressedLength);
  if (base >= kMaxBufferLength / kGrowthFactor) {
    // kGrowthFactor * base would reach or exceed the cap (or overflow).
    return kMaxBufferLength;
  }
  return kGrowthFactor * base;
}
493
494std::unique_ptr<IOBuf> StreamCodec::doUncompress(
495 IOBuf const* data,
496 Optional<uint64_t> uncompressedLength) {
497 auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB
498 auto constexpr kBlockSize = uint64_t(128) << 10;
499 auto const defaultBufferLength =
500 computeBufferLength(data->computeChainDataLength(), kBlockSize);
501
502 uncompressedLength = getUncompressedLength(data, uncompressedLength);
503 resetStream(uncompressedLength);
504
505 MutableByteRange output;
506 auto buffer = addOutputBuffer(
507 output,
508 (uncompressedLength && *uncompressedLength <= kMaxSingleStepLength
509 ? *uncompressedLength
510 : defaultBufferLength));
511
512 // Uncompress the entire IOBuf chain into the IOBuf chain pointed to by buffer
513 IOBuf const* current = data;
514 ByteRange input{current->data(), current->length()};
515 StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE;
516 bool done = false;
517 while (!done) {
518 while (input.empty() && current->next() != data) {
519 current = current->next();
520 input = {current->data(), current->length()};
521 }
522 if (current->next() == data) {
523 // Tell the uncompressor there is no more input (it may optimize)
524 flushOp = StreamCodec::FlushOp::END;
525 }
526 if (output.empty()) {
527 buffer->prependChain(addOutputBuffer(output, defaultBufferLength));
528 }
529 done = uncompressStream(input, output, flushOp);
530 }
531 if (!input.empty()) {
532 throw std::runtime_error("Codec: Junk after end of data");
533 }
534
535 buffer->prev()->trimEnd(output.size());
536 if (uncompressedLength &&
537 *uncompressedLength != buffer->computeChainDataLength()) {
538 throw std::runtime_error("Codec: invalid uncompressed length");
539 }
540
541 return buffer;
542}
543
544namespace {
545
/**
 * No compression
 *
 * Codec whose compress and uncompress operations hand back a clone of the
 * input; see the member definitions that follow this declaration.
 */
class NoCompressionCodec final : public Codec {
 public:
  // Factory returning a heap-allocated NoCompressionCodec.
  static std::unique_ptr<Codec> create(int level, CodecType type);
  // Throws std::invalid_argument for levels other than 0 or the symbolic
  // COMPRESSION_LEVEL_* values.
  explicit NoCompressionCodec(int level, CodecType type);

 private:
  uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
  std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
  std::unique_ptr<IOBuf> doUncompress(
      const IOBuf* data,
      Optional<uint64_t> uncompressedLength) override;
};
561
562std::unique_ptr<Codec> NoCompressionCodec::create(int level, CodecType type) {
563 return std::make_unique<NoCompressionCodec>(level, type);
564}
565
566NoCompressionCodec::NoCompressionCodec(int level, CodecType type)
567 : Codec(type) {
568 DCHECK(type == CodecType::NO_COMPRESSION);
569 switch (level) {
570 case COMPRESSION_LEVEL_DEFAULT:
571 case COMPRESSION_LEVEL_FASTEST:
572 case COMPRESSION_LEVEL_BEST:
573 level = 0;
574 }
575 if (level != 0) {
576 throw std::invalid_argument(
577 to<std::string>("NoCompressionCodec: invalid level ", level));
578 }
579}
580
581uint64_t NoCompressionCodec::doMaxCompressedLength(
582 uint64_t uncompressedLength) const {
583 return uncompressedLength;
584}
585
586std::unique_ptr<IOBuf> NoCompressionCodec::doCompress(const IOBuf* data) {
587 return data->clone();
588}
589
590std::unique_ptr<IOBuf> NoCompressionCodec::doUncompress(
591 const IOBuf* data,
592 Optional<uint64_t> uncompressedLength) {
593 if (uncompressedLength &&
594 data->computeChainDataLength() != *uncompressedLength) {
595 throw std::runtime_error(
596 to<std::string>("NoCompressionCodec: invalid uncompressed length"));
597 }
598 return data->clone();
599}
600
601#if (FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA)
602
603namespace {
604
605void encodeVarintToIOBuf(uint64_t val, folly::IOBuf* out) {
606 DCHECK_GE(out->tailroom(), kMaxVarintLength64);
607 out->append(encodeVarint(val, out->writableTail()));
608}
609
610inline uint64_t decodeVarintFromCursor(folly::io::Cursor& cursor) {
611 uint64_t val = 0;
612 int8_t b = 0;
613 for (int shift = 0; shift <= 63; shift += 7) {
614 b = cursor.read<int8_t>();
615 val |= static_cast<uint64_t>(b & 0x7f) << shift;
616 if (b >= 0) {
617 break;
618 }
619 }
620 if (b < 0) {
621 throw std::invalid_argument("Invalid varint value. Too big.");
622 }
623 return val;
624}
625
626} // namespace
627
628#endif // FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA
629
630#if FOLLY_HAVE_LIBLZ4
631
632#if LZ4_VERSION_NUMBER >= 10802 && defined(LZ4_STATIC_LINKING_ONLY) && \
633 defined(LZ4_HC_STATIC_LINKING_ONLY) && !defined(FOLLY_USE_LZ4_FAST_RESET)
634#define FOLLY_USE_LZ4_FAST_RESET
635#endif
636
637#ifdef FOLLY_USE_LZ4_FAST_RESET
638namespace {
639void lz4_stream_t_deleter(LZ4_stream_t* ctx) {
640 LZ4_freeStream(ctx);
641}
642
643void lz4_streamhc_t_deleter(LZ4_streamHC_t* ctx) {
644 LZ4_freeStreamHC(ctx);
645}
646} // namespace
647#endif
648
/**
 * LZ4 compression
 *
 * Handles CodecType::LZ4 and CodecType::LZ4_VARINT_SIZE; for the latter the
 * uncompressed size is stored as a varint in front of the compressed bytes
 * (see encodeSize()).
 */
class LZ4Codec final : public Codec {
 public:
  // Factory returning a heap-allocated LZ4Codec.
  static std::unique_ptr<Codec> create(int level, CodecType type);
  explicit LZ4Codec(int level, CodecType type);

 private:
  bool doNeedsUncompressedLength() const override;
  uint64_t doMaxUncompressedLength() const override;
  uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;

  // True when the uncompressed size is written in front of the payload.
  bool encodeSize() const {
    return type() == CodecType::LZ4_VARINT_SIZE;
  }

  std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
  std::unique_ptr<IOBuf> doUncompress(
      const IOBuf* data,
      Optional<uint64_t> uncompressedLength) override;

#ifdef FOLLY_USE_LZ4_FAST_RESET
  // Compression states, created on first use by doCompress() and reused by
  // later calls: `ctx` for the regular compressor, `hcctx` for the
  // high-compression one.
  std::unique_ptr<
      LZ4_stream_t,
      folly::static_function_deleter<LZ4_stream_t, lz4_stream_t_deleter>>
      ctx;
  std::unique_ptr<
      LZ4_streamHC_t,
      folly::static_function_deleter<LZ4_streamHC_t, lz4_streamhc_t_deleter>>
      hcctx;
#endif

  // Selects the HC compressor in doCompress(); set when the converted level
  // is greater than 1.
  bool highCompression_;
};
684
685std::unique_ptr<Codec> LZ4Codec::create(int level, CodecType type) {
686 return std::make_unique<LZ4Codec>(level, type);
687}
688
689static int lz4ConvertLevel(int level) {
690 switch (level) {
691 case 1:
692 case COMPRESSION_LEVEL_FASTEST:
693 case COMPRESSION_LEVEL_DEFAULT:
694 return 1;
695 case 2:
696 case COMPRESSION_LEVEL_BEST:
697 return 2;
698 }
699 throw std::invalid_argument(
700 to<std::string>("LZ4Codec: invalid level: ", level));
701}
702
703LZ4Codec::LZ4Codec(int level, CodecType type)
704 : Codec(type, lz4ConvertLevel(level)),
705 highCompression_(lz4ConvertLevel(level) > 1) {
706 DCHECK(type == CodecType::LZ4 || type == CodecType::LZ4_VARINT_SIZE);
707}
708
709bool LZ4Codec::doNeedsUncompressedLength() const {
710 return !encodeSize();
711}
712
713// The value comes from lz4.h in lz4-r117, but older versions of lz4 don't
714// define LZ4_MAX_INPUT_SIZE (even though the max size is the same), so do it
715// here.
716#ifndef LZ4_MAX_INPUT_SIZE
717#define LZ4_MAX_INPUT_SIZE 0x7E000000
718#endif
719
720uint64_t LZ4Codec::doMaxUncompressedLength() const {
721 return LZ4_MAX_INPUT_SIZE;
722}
723
724uint64_t LZ4Codec::doMaxCompressedLength(uint64_t uncompressedLength) const {
725 return LZ4_compressBound(uncompressedLength) +
726 (encodeSize() ? kMaxVarintLength64 : 0);
727}
728
/**
 * Compresses `data` into a single output buffer sized by
 * maxCompressedLength(). Chained input is coalesced first. For the
 * varint-size variant the uncompressed length is written ahead of the
 * compressed bytes. Which LZ4 entry point is used depends on the library
 * version and on FOLLY_USE_LZ4_FAST_RESET.
 */
std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
  IOBuf clone;
  if (data->isChained()) {
    // LZ4 doesn't support streaming, so we have to coalesce
    clone = data->cloneCoalescedAsValue();
    data = &clone;
  }

  auto out = IOBuf::create(maxCompressedLength(data->length()));
  if (encodeSize()) {
    encodeVarintToIOBuf(data->length(), out.get());
  }

  // n receives the LZ4 return value; every preprocessor branch below
  // assigns it.
  int n;
  auto input = reinterpret_cast<const char*>(data->data());
  auto output = reinterpret_cast<char*>(out->writableTail());
  const auto inputLength = data->length();

#ifdef FOLLY_USE_LZ4_FAST_RESET
  // Create the compression state needed for the selected mode on first use.
  if (!highCompression_ && !ctx) {
    ctx.reset(LZ4_createStream());
  }
  if (highCompression_ && !hcctx) {
    hcctx.reset(LZ4_createStreamHC());
  }

  if (highCompression_) {
    n = LZ4_compress_HC_extStateHC_fastReset(
        hcctx.get(), input, output, inputLength, out->tailroom(), 0);
  } else {
    n = LZ4_compress_fast_extState_fastReset(
        ctx.get(), input, output, inputLength, out->tailroom(), 1);
  }
#elif LZ4_VERSION_NUMBER >= 10700
  if (highCompression_) {
    n = LZ4_compress_HC(input, output, inputLength, out->tailroom(), 0);
  } else {
    n = LZ4_compress_default(input, output, inputLength, out->tailroom());
  }
#else
  // Older API without an explicit output capacity argument.
  if (highCompression_) {
    n = LZ4_compressHC(input, output, inputLength);
  } else {
    n = LZ4_compress(input, output, inputLength);
  }
#endif

  CHECK_GE(n, 0);
  CHECK_LE(n, out->capacity());

  out->append(n);
  return out;
}
782
783std::unique_ptr<IOBuf> LZ4Codec::doUncompress(
784 const IOBuf* data,
785 Optional<uint64_t> uncompressedLength) {
786 IOBuf clone;
787 if (data->isChained()) {
788 // LZ4 doesn't support streaming, so we have to coalesce
789 clone = data->cloneCoalescedAsValue();
790 data = &clone;
791 }
792
793 folly::io::Cursor cursor(data);
794 uint64_t actualUncompressedLength;
795 if (encodeSize()) {
796 actualUncompressedLength = decodeVarintFromCursor(cursor);
797 if (uncompressedLength && *uncompressedLength != actualUncompressedLength) {
798 throw std::runtime_error("LZ4Codec: invalid uncompressed length");
799 }
800 } else {
801 // Invariants
802 DCHECK(uncompressedLength.hasValue());
803 DCHECK(*uncompressedLength <= maxUncompressedLength());
804 actualUncompressedLength = *uncompressedLength;
805 }
806
807 auto sp = StringPiece{cursor.peekBytes()};
808 auto out = IOBuf::create(actualUncompressedLength);
809 int n = LZ4_decompress_safe(
810 sp.data(),
811 reinterpret_cast<char*>(out->writableTail()),
812 sp.size(),
813 actualUncompressedLength);
814
815 if (n < 0 || uint64_t(n) != actualUncompressedLength) {
816 throw std::runtime_error(
817 to<std::string>("LZ4 decompression returned invalid value ", n));
818 }
819 out->append(actualUncompressedLength);
820 return out;
821}
822
823#if LZ4_VERSION_NUMBER >= 10301
824
/**
 * Codec for the LZ4 frame format (CodecType::LZ4_FRAME). Owns a
 * decompression context, and with FOLLY_USE_LZ4_FAST_RESET also a
 * compression context; both are released in the destructor.
 */
class LZ4FrameCodec final : public Codec {
 public:
  // Factory returning a heap-allocated LZ4FrameCodec.
  static std::unique_ptr<Codec> create(int level, CodecType type);
  explicit LZ4FrameCodec(int level, CodecType type);
  ~LZ4FrameCodec() override;

  std::vector<std::string> validPrefixes() const override;
  bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
      const override;

 private:
  uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;

  std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
  std::unique_ptr<IOBuf> doUncompress(
      const IOBuf* data,
      Optional<uint64_t> uncompressedLength) override;

  // Reset the dctx_ if it is dirty or null.
  void resetDCtx();

  // Level passed to LZ4F via LZ4F_preferences_t::compressionLevel.
  int level_;
#ifdef FOLLY_USE_LZ4_FAST_RESET
  // Compression context, created on first use by doCompress().
  LZ4F_compressionContext_t cctx_{nullptr};
#endif
  // Decompression context, (re)created by resetDCtx().
  LZ4F_decompressionContext_t dctx_{nullptr};
  // True while a decompression has started but not finished; tells
  // resetDCtx() that dctx_ must be recreated.
  bool dirty_{false};
};
853
854/* static */ std::unique_ptr<Codec> LZ4FrameCodec::create(
855 int level,
856 CodecType type) {
857 return std::make_unique<LZ4FrameCodec>(level, type);
858}
859
// 32-bit value matched (via the little-endian helpers prefixToStringLE /
// dataStartsWithLE) against the first bytes of data to recognize an LZ4
// frame.
static constexpr uint32_t kLZ4FrameMagicLE = 0x184D2204;
861
862std::vector<std::string> LZ4FrameCodec::validPrefixes() const {
863 return {prefixToStringLE(kLZ4FrameMagicLE)};
864}
865
866bool LZ4FrameCodec::canUncompress(const IOBuf* data, Optional<uint64_t>) const {
867 return dataStartsWithLE(data, kLZ4FrameMagicLE);
868}
869
870uint64_t LZ4FrameCodec::doMaxCompressedLength(
871 uint64_t uncompressedLength) const {
872 LZ4F_preferences_t prefs{};
873 prefs.compressionLevel = level_;
874 prefs.frameInfo.contentSize = uncompressedLength;
875 return LZ4F_compressFrameBound(uncompressedLength, &prefs);
876}
877
878static size_t lz4FrameThrowOnError(size_t code) {
879 if (LZ4F_isError(code)) {
880 throw std::runtime_error(
881 to<std::string>("LZ4Frame error: ", LZ4F_getErrorName(code)));
882 }
883 return code;
884}
885
886void LZ4FrameCodec::resetDCtx() {
887 if (dctx_ && !dirty_) {
888 return;
889 }
890 if (dctx_) {
891 LZ4F_freeDecompressionContext(dctx_);
892 }
893 lz4FrameThrowOnError(LZ4F_createDecompressionContext(&dctx_, 100));
894 dirty_ = false;
895}
896
897static int lz4fConvertLevel(int level) {
898 switch (level) {
899 case COMPRESSION_LEVEL_FASTEST:
900 case COMPRESSION_LEVEL_DEFAULT:
901 return 0;
902 case COMPRESSION_LEVEL_BEST:
903 return 16;
904 }
905 return level;
906}
907
908LZ4FrameCodec::LZ4FrameCodec(int level, CodecType type)
909 : Codec(type, lz4fConvertLevel(level)), level_(lz4fConvertLevel(level)) {
910 DCHECK(type == CodecType::LZ4_FRAME);
911}
912
913LZ4FrameCodec::~LZ4FrameCodec() {
914 if (dctx_) {
915 LZ4F_freeDecompressionContext(dctx_);
916 }
917#ifdef FOLLY_USE_LZ4_FAST_RESET
918 if (cctx_) {
919 LZ4F_freeCompressionContext(cctx_);
920 }
921#endif
922}
923
/**
 * Compresses `data` into one LZ4 frame held in a single output buffer sized
 * by maxCompressedLength(). Chained input is coalesced first. The frame
 * preferences carry the configured level and the content size.
 *
 * @throws std::runtime_error (via lz4FrameThrowOnError) if LZ4F reports an
 *         error.
 */
std::unique_ptr<IOBuf> LZ4FrameCodec::doCompress(const IOBuf* data) {
  // LZ4 Frame compression doesn't support streaming so we have to coalesce
  IOBuf clone;
  if (data->isChained()) {
    clone = data->cloneCoalescedAsValue();
    data = &clone;
  }

#ifdef FOLLY_USE_LZ4_FAST_RESET
  // Create the compression context on first use; later calls reuse it.
  if (!cctx_) {
    lz4FrameThrowOnError(LZ4F_createCompressionContext(&cctx_, LZ4F_VERSION));
  }
#endif

  // Set preferences
  const auto uncompressedLength = data->length();
  LZ4F_preferences_t prefs{};
  prefs.compressionLevel = level_;
  prefs.frameInfo.contentSize = uncompressedLength;
  // Compress
  auto buf = IOBuf::create(maxCompressedLength(uncompressedLength));
  const size_t written = lz4FrameThrowOnError(
#ifdef FOLLY_USE_LZ4_FAST_RESET
      // Context-reusing variant; no dictionary is supplied (nullptr).
      LZ4F_compressFrame_usingCDict(
          cctx_,
          buf->writableTail(),
          buf->tailroom(),
          data->data(),
          data->length(),
          nullptr,
          &prefs)
#else
      LZ4F_compressFrame(
          buf->writableTail(),
          buf->tailroom(),
          data->data(),
          data->length(),
          &prefs)
#endif
  );
  buf->append(written);
  return buf;
}
967
968std::unique_ptr<IOBuf> LZ4FrameCodec::doUncompress(
969 const IOBuf* data,
970 Optional<uint64_t> uncompressedLength) {
971 // Reset the dctx if any errors have occurred
972 resetDCtx();
973 // Coalesce the data
974 ByteRange in = *data->begin();
975 IOBuf clone;
976 if (data->isChained()) {
977 clone = data->cloneCoalescedAsValue();
978 in = clone.coalesce();
979 }
980 data = nullptr;
981 // Select decompression options
982 LZ4F_decompressOptions_t options;
983 options.stableDst = 1;
984 // Select blockSize and growthSize for the IOBufQueue
985 IOBufQueue queue(IOBufQueue::cacheChainLength());
986 auto blockSize = uint64_t{64} << 10;
987 auto growthSize = uint64_t{4} << 20;
988 if (uncompressedLength) {
989 // Allocate uncompressedLength in one chunk (up to 64 MB)
990 const auto allocateSize = std::min(*uncompressedLength, uint64_t{64} << 20);
991 queue.preallocate(allocateSize, allocateSize);
992 blockSize = std::min(*uncompressedLength, blockSize);
993 growthSize = std::min(*uncompressedLength, growthSize);
994 } else {
995 // Reduce growthSize for small data
996 const auto guessUncompressedLen =
997 4 * std::max<uint64_t>(blockSize, in.size());
998 growthSize = std::min(guessUncompressedLen, growthSize);
999 }
1000 // Once LZ4_decompress() is called, the dctx_ cannot be reused until it
1001 // returns 0
1002 dirty_ = true;
1003 // Decompress until the frame is over
1004 size_t code = 0;
1005 do {
1006 // Allocate enough space to decompress at least a block
1007 void* out;
1008 size_t outSize;
1009 std::tie(out, outSize) = queue.preallocate(blockSize, growthSize);
1010 // Decompress
1011 size_t inSize = in.size();
1012 code = lz4FrameThrowOnError(
1013 LZ4F_decompress(dctx_, out, &outSize, in.data(), &inSize, &options));
1014 if (in.empty() && outSize == 0 && code != 0) {
1015 // We passed no input, no output was produced, and the frame isn't over
1016 // No more forward progress is possible
1017 throw std::runtime_error("LZ4Frame error: Incomplete frame");
1018 }
1019 in.uncheckedAdvance(inSize);
1020 queue.postallocate(outSize);
1021 } while (code != 0);
1022 // At this point the decompression context can be reused
1023 dirty_ = false;
1024 if (uncompressedLength && queue.chainLength() != *uncompressedLength) {
1025 throw std::runtime_error("LZ4Frame error: Invalid uncompressedLength");
1026 }
1027 return queue.move();
1028}
1029
1030#endif // LZ4_VERSION_NUMBER >= 10301
1031#endif // FOLLY_HAVE_LIBLZ4
1032
1033#if FOLLY_HAVE_LIBSNAPPY
1034
1035/**
1036 * Snappy compression
1037 */
1038
/**
 * Implementation of snappy::Source that reads from an IOBuf chain.
 */
class IOBufSnappySource final : public snappy::Source {
 public:
  explicit IOBufSnappySource(const IOBuf* data);
  // Number of bytes not yet skipped.
  size_t Available() const override;
  // Exposes the bytes at the current cursor position without consuming them.
  const char* Peek(size_t* len) override;
  // Consumes `n` bytes; `n` must not exceed Available().
  void Skip(size_t n) override;

 private:
  // Remaining byte count; starts at the chain's total data length.
  size_t available_;
  // Read position within the chain.
  io::Cursor cursor_;
};
1053
// Starts with the whole chain available and the cursor at the start of
// `data`.
IOBufSnappySource::IOBufSnappySource(const IOBuf* data)
    : available_(data->computeChainDataLength()), cursor_(data) {}
1056
1057size_t IOBufSnappySource::Available() const {
1058 return available_;
1059}
1060
1061const char* IOBufSnappySource::Peek(size_t* len) {
1062 auto sp = StringPiece{cursor_.peekBytes()};
1063 *len = sp.size();
1064 return sp.data();
1065}
1066
1067void IOBufSnappySource::Skip(size_t n) {
1068 CHECK_LE(n, available_);
1069 cursor_.skip(n);
1070 available_ -= n;
1071}
1072
1073class SnappyCodec final : public Codec {
1074 public:
1075 static std::unique_ptr<Codec> create(int level, CodecType type);
1076 explicit SnappyCodec(int level, CodecType type);
1077
1078 private:
1079 uint64_t doMaxUncompressedLength() const override;
1080 uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
1081 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
1082 std::unique_ptr<IOBuf> doUncompress(
1083 const IOBuf* data,
1084 Optional<uint64_t> uncompressedLength) override;
1085};
1086
1087std::unique_ptr<Codec> SnappyCodec::create(int level, CodecType type) {
1088 return std::make_unique<SnappyCodec>(level, type);
1089}
1090
1091SnappyCodec::SnappyCodec(int level, CodecType type) : Codec(type) {
1092 DCHECK(type == CodecType::SNAPPY);
1093 switch (level) {
1094 case COMPRESSION_LEVEL_FASTEST:
1095 case COMPRESSION_LEVEL_DEFAULT:
1096 case COMPRESSION_LEVEL_BEST:
1097 level = 1;
1098 }
1099 if (level != 1) {
1100 throw std::invalid_argument(
1101 to<std::string>("SnappyCodec: invalid level: ", level));
1102 }
1103}
1104
1105uint64_t SnappyCodec::doMaxUncompressedLength() const {
1106 // snappy.h uses uint32_t for lengths, so there's that.
1107 return std::numeric_limits<uint32_t>::max();
1108}
1109
1110uint64_t SnappyCodec::doMaxCompressedLength(uint64_t uncompressedLength) const {
1111 return snappy::MaxCompressedLength(uncompressedLength);
1112}
1113
1114std::unique_ptr<IOBuf> SnappyCodec::doCompress(const IOBuf* data) {
1115 IOBufSnappySource source(data);
1116 auto out = IOBuf::create(maxCompressedLength(source.Available()));
1117
1118 snappy::UncheckedByteArraySink sink(
1119 reinterpret_cast<char*>(out->writableTail()));
1120
1121 size_t n = snappy::Compress(&source, &sink);
1122
1123 CHECK_LE(n, out->capacity());
1124 out->append(n);
1125 return out;
1126}
1127
1128std::unique_ptr<IOBuf> SnappyCodec::doUncompress(
1129 const IOBuf* data,
1130 Optional<uint64_t> uncompressedLength) {
1131 uint32_t actualUncompressedLength = 0;
1132
1133 {
1134 IOBufSnappySource source(data);
1135 if (!snappy::GetUncompressedLength(&source, &actualUncompressedLength)) {
1136 throw std::runtime_error("snappy::GetUncompressedLength failed");
1137 }
1138 if (uncompressedLength && *uncompressedLength != actualUncompressedLength) {
1139 throw std::runtime_error("snappy: invalid uncompressed length");
1140 }
1141 }
1142
1143 auto out = IOBuf::create(actualUncompressedLength);
1144
1145 {
1146 IOBufSnappySource source(data);
1147 if (!snappy::RawUncompress(
1148 &source, reinterpret_cast<char*>(out->writableTail()))) {
1149 throw std::runtime_error("snappy::RawUncompress failed");
1150 }
1151 }
1152
1153 out->append(actualUncompressedLength);
1154 return out;
1155}
1156
1157#endif // FOLLY_HAVE_LIBSNAPPY
1158
1159#if FOLLY_HAVE_LIBLZMA
1160
1161/**
1162 * LZMA2 compression
1163 */
1164class LZMA2StreamCodec final : public StreamCodec {
1165 public:
1166 static std::unique_ptr<Codec> createCodec(int level, CodecType type);
1167 static std::unique_ptr<StreamCodec> createStream(int level, CodecType type);
1168 explicit LZMA2StreamCodec(int level, CodecType type);
1169 ~LZMA2StreamCodec() override;
1170
1171 std::vector<std::string> validPrefixes() const override;
1172 bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
1173 const override;
1174
1175 private:
1176 bool doNeedsDataLength() const override;
1177 uint64_t doMaxUncompressedLength() const override;
1178 uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
1179
1180 bool encodeSize() const {
1181 return type() == CodecType::LZMA2_VARINT_SIZE;
1182 }
1183
1184 void doResetStream() override;
1185 bool doCompressStream(
1186 ByteRange& input,
1187 MutableByteRange& output,
1188 StreamCodec::FlushOp flushOp) override;
1189 bool doUncompressStream(
1190 ByteRange& input,
1191 MutableByteRange& output,
1192 StreamCodec::FlushOp flushOp) override;
1193
1194 void resetCStream();
1195 void resetDStream();
1196
1197 bool decodeAndCheckVarint(ByteRange& input);
1198 bool flushVarintBuffer(MutableByteRange& output);
1199 void resetVarintBuffer();
1200
1201 Optional<lzma_stream> cstream_{};
1202 Optional<lzma_stream> dstream_{};
1203
1204 std::array<uint8_t, kMaxVarintLength64> varintBuffer_;
1205 ByteRange varintToEncode_;
1206 size_t varintBufferPos_{0};
1207
1208 int level_;
1209 bool needReset_{true};
1210 bool needDecodeSize_{false};
1211};
1212
// Magic prefix that validPrefixes() / canUncompress() look for, stored as a
// little-endian integer together with the number of significant bytes.
static constexpr unsigned kLZMA2MagicBytes = 6;
static constexpr uint64_t kLZMA2MagicLE = 0x005A587A37FD;
1215
1216std::vector<std::string> LZMA2StreamCodec::validPrefixes() const {
1217 if (type() == CodecType::LZMA2_VARINT_SIZE) {
1218 return {};
1219 }
1220 return {prefixToStringLE(kLZMA2MagicLE, kLZMA2MagicBytes)};
1221}
1222
1223bool LZMA2StreamCodec::doNeedsDataLength() const {
1224 return encodeSize();
1225}
1226
1227bool LZMA2StreamCodec::canUncompress(const IOBuf* data, Optional<uint64_t>)
1228 const {
1229 if (type() == CodecType::LZMA2_VARINT_SIZE) {
1230 return false;
1231 }
1232 // Returns false for all inputs less than 8 bytes.
1233 // This is okay, because no valid LZMA2 streams are less than 8 bytes.
1234 return dataStartsWithLE(data, kLZMA2MagicLE, kLZMA2MagicBytes);
1235}
1236
1237std::unique_ptr<Codec> LZMA2StreamCodec::createCodec(
1238 int level,
1239 CodecType type) {
1240 return make_unique<LZMA2StreamCodec>(level, type);
1241}
1242
1243std::unique_ptr<StreamCodec> LZMA2StreamCodec::createStream(
1244 int level,
1245 CodecType type) {
1246 return make_unique<LZMA2StreamCodec>(level, type);
1247}
1248
1249LZMA2StreamCodec::LZMA2StreamCodec(int level, CodecType type)
1250 : StreamCodec(type) {
1251 DCHECK(type == CodecType::LZMA2 || type == CodecType::LZMA2_VARINT_SIZE);
1252 switch (level) {
1253 case COMPRESSION_LEVEL_FASTEST:
1254 level = 0;
1255 break;
1256 case COMPRESSION_LEVEL_DEFAULT:
1257 level = LZMA_PRESET_DEFAULT;
1258 break;
1259 case COMPRESSION_LEVEL_BEST:
1260 level = 9;
1261 break;
1262 }
1263 if (level < 0 || level > 9) {
1264 throw std::invalid_argument(
1265 to<std::string>("LZMA2Codec: invalid level: ", level));
1266 }
1267 level_ = level;
1268}
1269
1270LZMA2StreamCodec::~LZMA2StreamCodec() {
1271 if (cstream_) {
1272 lzma_end(cstream_.get_pointer());
1273 cstream_.clear();
1274 }
1275 if (dstream_) {
1276 lzma_end(dstream_.get_pointer());
1277 dstream_.clear();
1278 }
1279}
1280
1281uint64_t LZMA2StreamCodec::doMaxUncompressedLength() const {
1282 // From lzma/base.h: "Stream is roughly 8 EiB (2^63 bytes)"
1283 return uint64_t(1) << 63;
1284}
1285
1286uint64_t LZMA2StreamCodec::doMaxCompressedLength(
1287 uint64_t uncompressedLength) const {
1288 return lzma_stream_buffer_bound(uncompressedLength) +
1289 (encodeSize() ? kMaxVarintLength64 : 0);
1290}
1291
1292void LZMA2StreamCodec::doResetStream() {
1293 needReset_ = true;
1294}
1295
1296void LZMA2StreamCodec::resetCStream() {
1297 if (!cstream_) {
1298 cstream_.assign(LZMA_STREAM_INIT);
1299 }
1300 lzma_ret const rc =
1301 lzma_easy_encoder(cstream_.get_pointer(), level_, LZMA_CHECK_NONE);
1302 if (rc != LZMA_OK) {
1303 throw std::runtime_error(folly::to<std::string>(
1304 "LZMA2StreamCodec: lzma_easy_encoder error: ", rc));
1305 }
1306}
1307
1308void LZMA2StreamCodec::resetDStream() {
1309 if (!dstream_) {
1310 dstream_.assign(LZMA_STREAM_INIT);
1311 }
1312 lzma_ret const rc = lzma_auto_decoder(
1313 dstream_.get_pointer(), std::numeric_limits<uint64_t>::max(), 0);
1314 if (rc != LZMA_OK) {
1315 throw std::runtime_error(folly::to<std::string>(
1316 "LZMA2StreamCodec: lzma_auto_decoder error: ", rc));
1317 }
1318}
1319
1320static lzma_ret lzmaThrowOnError(lzma_ret const rc) {
1321 switch (rc) {
1322 case LZMA_OK:
1323 case LZMA_STREAM_END:
1324 case LZMA_BUF_ERROR: // not fatal: returned if no progress was made twice
1325 return rc;
1326 default:
1327 throw std::runtime_error(
1328 to<std::string>("LZMA2StreamCodec: error: ", rc));
1329 }
1330}
1331
1332static lzma_action lzmaTranslateFlush(StreamCodec::FlushOp flush) {
1333 switch (flush) {
1334 case StreamCodec::FlushOp::NONE:
1335 return LZMA_RUN;
1336 case StreamCodec::FlushOp::FLUSH:
1337 return LZMA_SYNC_FLUSH;
1338 case StreamCodec::FlushOp::END:
1339 return LZMA_FINISH;
1340 default:
1341 throw std::invalid_argument("LZMA2StreamCodec: Invalid flush");
1342 }
1343}
1344
1345/**
1346 * Flushes the varint buffer.
1347 * Advances output by the number of bytes written.
1348 * Returns true when flushing is complete.
1349 */
1350bool LZMA2StreamCodec::flushVarintBuffer(MutableByteRange& output) {
1351 if (varintToEncode_.empty()) {
1352 return true;
1353 }
1354 const size_t numBytesToCopy = std::min(varintToEncode_.size(), output.size());
1355 if (numBytesToCopy > 0) {
1356 memcpy(output.data(), varintToEncode_.data(), numBytesToCopy);
1357 }
1358 varintToEncode_.advance(numBytesToCopy);
1359 output.advance(numBytesToCopy);
1360 return varintToEncode_.empty();
1361}
1362
1363bool LZMA2StreamCodec::doCompressStream(
1364 ByteRange& input,
1365 MutableByteRange& output,
1366 StreamCodec::FlushOp flushOp) {
1367 if (needReset_) {
1368 resetCStream();
1369 if (encodeSize()) {
1370 varintBufferPos_ = 0;
1371 size_t const varintSize =
1372 encodeVarint(*uncompressedLength(), varintBuffer_.data());
1373 varintToEncode_ = {varintBuffer_.data(), varintSize};
1374 }
1375 needReset_ = false;
1376 }
1377
1378 if (!flushVarintBuffer(output)) {
1379 return false;
1380 }
1381
1382 cstream_->next_in = const_cast<uint8_t*>(input.data());
1383 cstream_->avail_in = input.size();
1384 cstream_->next_out = output.data();
1385 cstream_->avail_out = output.size();
1386 SCOPE_EXIT {
1387 input.uncheckedAdvance(input.size() - cstream_->avail_in);
1388 output.uncheckedAdvance(output.size() - cstream_->avail_out);
1389 };
1390 lzma_ret const rc = lzmaThrowOnError(
1391 lzma_code(cstream_.get_pointer(), lzmaTranslateFlush(flushOp)));
1392 switch (flushOp) {
1393 case StreamCodec::FlushOp::NONE:
1394 return false;
1395 case StreamCodec::FlushOp::FLUSH:
1396 return cstream_->avail_in == 0 && cstream_->avail_out != 0;
1397 case StreamCodec::FlushOp::END:
1398 return rc == LZMA_STREAM_END;
1399 default:
1400 throw std::invalid_argument("LZMA2StreamCodec: invalid FlushOp");
1401 }
1402}
1403
1404/**
1405 * Attempts to decode a varint from input.
1406 * The function advances input by the number of bytes read.
1407 *
1408 * If there are too many bytes and the varint is not valid, throw a
1409 * runtime_error.
1410 *
1411 * If the uncompressed length was provided and a decoded varint does not match
1412 * the provided length, throw a runtime_error.
1413 *
1414 * Returns true if the varint was successfully decoded and matches the
1415 * uncompressed length if provided, and false if more bytes are needed.
1416 */
1417bool LZMA2StreamCodec::decodeAndCheckVarint(ByteRange& input) {
1418 if (input.empty()) {
1419 return false;
1420 }
1421 size_t const numBytesToCopy =
1422 std::min(kMaxVarintLength64 - varintBufferPos_, input.size());
1423 memcpy(varintBuffer_.data() + varintBufferPos_, input.data(), numBytesToCopy);
1424
1425 size_t const rangeSize = varintBufferPos_ + numBytesToCopy;
1426 ByteRange range{varintBuffer_.data(), rangeSize};
1427 auto const ret = tryDecodeVarint(range);
1428
1429 if (ret.hasValue()) {
1430 size_t const varintSize = rangeSize - range.size();
1431 input.advance(varintSize - varintBufferPos_);
1432 if (uncompressedLength() && *uncompressedLength() != ret.value()) {
1433 throw std::runtime_error("LZMA2StreamCodec: invalid uncompressed length");
1434 }
1435 return true;
1436 } else if (ret.error() == DecodeVarintError::TooManyBytes) {
1437 throw std::runtime_error("LZMA2StreamCodec: invalid uncompressed length");
1438 } else {
1439 // Too few bytes
1440 input.advance(numBytesToCopy);
1441 varintBufferPos_ += numBytesToCopy;
1442 return false;
1443 }
1444}
1445
1446bool LZMA2StreamCodec::doUncompressStream(
1447 ByteRange& input,
1448 MutableByteRange& output,
1449 StreamCodec::FlushOp flushOp) {
1450 if (needReset_) {
1451 resetDStream();
1452 needReset_ = false;
1453 needDecodeSize_ = encodeSize();
1454 if (encodeSize()) {
1455 // Reset buffer
1456 varintBufferPos_ = 0;
1457 }
1458 }
1459
1460 if (needDecodeSize_) {
1461 // Try decoding the varint. If the input does not contain the entire varint,
1462 // buffer the input. If the varint can not be decoded, fail.
1463 if (!decodeAndCheckVarint(input)) {
1464 return false;
1465 }
1466 needDecodeSize_ = false;
1467 }
1468
1469 dstream_->next_in = const_cast<uint8_t*>(input.data());
1470 dstream_->avail_in = input.size();
1471 dstream_->next_out = output.data();
1472 dstream_->avail_out = output.size();
1473 SCOPE_EXIT {
1474 input.advance(input.size() - dstream_->avail_in);
1475 output.advance(output.size() - dstream_->avail_out);
1476 };
1477
1478 lzma_ret rc;
1479 switch (flushOp) {
1480 case StreamCodec::FlushOp::NONE:
1481 case StreamCodec::FlushOp::FLUSH:
1482 rc = lzmaThrowOnError(lzma_code(dstream_.get_pointer(), LZMA_RUN));
1483 break;
1484 case StreamCodec::FlushOp::END:
1485 rc = lzmaThrowOnError(lzma_code(dstream_.get_pointer(), LZMA_FINISH));
1486 break;
1487 default:
1488 throw std::invalid_argument("LZMA2StreamCodec: invalid flush");
1489 }
1490 return rc == LZMA_STREAM_END;
1491}
1492#endif // FOLLY_HAVE_LIBLZMA
1493
1494#if FOLLY_HAVE_LIBZSTD
1495
1496static int zstdConvertLevel(int level) {
1497 switch (level) {
1498 case COMPRESSION_LEVEL_FASTEST:
1499 return 1;
1500 case COMPRESSION_LEVEL_DEFAULT:
1501 return 1;
1502 case COMPRESSION_LEVEL_BEST:
1503 return 19;
1504 }
1505 if (level < 1 || level > ZSTD_maxCLevel()) {
1506 throw std::invalid_argument(
1507 to<std::string>("ZSTD: invalid level: ", level));
1508 }
1509 return level;
1510}
1511
1512static int zstdFastConvertLevel(int level) {
1513 switch (level) {
1514 case COMPRESSION_LEVEL_FASTEST:
1515 return -5;
1516 case COMPRESSION_LEVEL_DEFAULT:
1517 return -1;
1518 case COMPRESSION_LEVEL_BEST:
1519 return -1;
1520 }
1521 if (level < 1) {
1522 throw std::invalid_argument(
1523 to<std::string>("ZSTD: invalid level: ", level));
1524 }
1525 return -level;
1526}
1527
1528std::unique_ptr<Codec> getZstdCodec(int level, CodecType type) {
1529 DCHECK(type == CodecType::ZSTD);
1530 return zstd::getCodec(zstd::Options(zstdConvertLevel(level)));
1531}
1532
1533std::unique_ptr<StreamCodec> getZstdStreamCodec(int level, CodecType type) {
1534 DCHECK(type == CodecType::ZSTD);
1535 return zstd::getStreamCodec(zstd::Options(zstdConvertLevel(level)));
1536}
1537
1538std::unique_ptr<Codec> getZstdFastCodec(int level, CodecType type) {
1539 DCHECK(type == CodecType::ZSTD_FAST);
1540 return zstd::getCodec(zstd::Options(zstdFastConvertLevel(level)));
1541}
1542
1543std::unique_ptr<StreamCodec> getZstdFastStreamCodec(int level, CodecType type) {
1544 DCHECK(type == CodecType::ZSTD_FAST);
1545 return zstd::getStreamCodec(zstd::Options(zstdFastConvertLevel(level)));
1546}
1547
1548#endif // FOLLY_HAVE_LIBZSTD
1549
1550#if FOLLY_HAVE_LIBBZ2
1551
1552class Bzip2StreamCodec final : public StreamCodec {
1553 public:
1554 static std::unique_ptr<Codec> createCodec(int level, CodecType type);
1555 static std::unique_ptr<StreamCodec> createStream(int level, CodecType type);
1556 explicit Bzip2StreamCodec(int level, CodecType type);
1557
1558 ~Bzip2StreamCodec() override;
1559
1560 std::vector<std::string> validPrefixes() const override;
1561 bool canUncompress(IOBuf const* data, Optional<uint64_t> uncompressedLength)
1562 const override;
1563
1564 private:
1565 uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
1566
1567 void doResetStream() override;
1568 bool doCompressStream(
1569 ByteRange& input,
1570 MutableByteRange& output,
1571 StreamCodec::FlushOp flushOp) override;
1572 bool doUncompressStream(
1573 ByteRange& input,
1574 MutableByteRange& output,
1575 StreamCodec::FlushOp flushOp) override;
1576
1577 void resetCStream();
1578 void resetDStream();
1579
1580 Optional<bz_stream> cstream_{};
1581 Optional<bz_stream> dstream_{};
1582
1583 int level_;
1584 bool needReset_{true};
1585};
1586
1587/* static */ std::unique_ptr<Codec> Bzip2StreamCodec::createCodec(
1588 int level,
1589 CodecType type) {
1590 return createStream(level, type);
1591}
1592
1593/* static */ std::unique_ptr<StreamCodec> Bzip2StreamCodec::createStream(
1594 int level,
1595 CodecType type) {
1596 return std::make_unique<Bzip2StreamCodec>(level, type);
1597}
1598
1599Bzip2StreamCodec::Bzip2StreamCodec(int level, CodecType type)
1600 : StreamCodec(type) {
1601 DCHECK(type == CodecType::BZIP2);
1602 switch (level) {
1603 case COMPRESSION_LEVEL_FASTEST:
1604 level = 1;
1605 break;
1606 case COMPRESSION_LEVEL_DEFAULT:
1607 level = 9;
1608 break;
1609 case COMPRESSION_LEVEL_BEST:
1610 level = 9;
1611 break;
1612 }
1613 if (level < 1 || level > 9) {
1614 throw std::invalid_argument(
1615 to<std::string>("Bzip2: invalid level: ", level));
1616 }
1617 level_ = level;
1618}
1619
// Magic prefix that validPrefixes() / canUncompress() look for, stored as a
// little-endian integer together with the number of significant bytes.
static uint64_t constexpr kBzip2MagicBytes = 3;
static uint32_t constexpr kBzip2MagicLE = 0x685a42;
1622
1623std::vector<std::string> Bzip2StreamCodec::validPrefixes() const {
1624 return {prefixToStringLE(kBzip2MagicLE, kBzip2MagicBytes)};
1625}
1626
1627bool Bzip2StreamCodec::canUncompress(IOBuf const* data, Optional<uint64_t>)
1628 const {
1629 return dataStartsWithLE(data, kBzip2MagicLE, kBzip2MagicBytes);
1630}
1631
1632uint64_t Bzip2StreamCodec::doMaxCompressedLength(
1633 uint64_t uncompressedLength) const {
1634 // http://www.bzip.org/1.0.5/bzip2-manual-1.0.5.html#bzbufftobuffcompress
1635 // To guarantee that the compressed data will fit in its buffer, allocate an
1636 // output buffer of size 1% larger than the uncompressed data, plus six
1637 // hundred extra bytes.
1638 return uncompressedLength + uncompressedLength / 100 + 600;
1639}
1640
1641static bz_stream createBzStream() {
1642 bz_stream stream;
1643 stream.bzalloc = nullptr;
1644 stream.bzfree = nullptr;
1645 stream.opaque = nullptr;
1646 stream.next_in = stream.next_out = nullptr;
1647 stream.avail_in = stream.avail_out = 0;
1648 return stream;
1649}
1650
1651// Throws on error condition, otherwise returns the code.
1652static int bzCheck(int const rc) {
1653 switch (rc) {
1654 case BZ_OK:
1655 case BZ_RUN_OK:
1656 case BZ_FLUSH_OK:
1657 case BZ_FINISH_OK:
1658 case BZ_STREAM_END:
1659 // Allow BZ_PARAM_ERROR.
1660 // It can get returned if no progress is made, but we handle that.
1661 case BZ_PARAM_ERROR:
1662 return rc;
1663 default:
1664 throw std::runtime_error(to<std::string>("Bzip2 error: ", rc));
1665 }
1666}
1667
1668Bzip2StreamCodec::~Bzip2StreamCodec() {
1669 if (cstream_) {
1670 BZ2_bzCompressEnd(cstream_.get_pointer());
1671 cstream_.clear();
1672 }
1673 if (dstream_) {
1674 BZ2_bzDecompressEnd(dstream_.get_pointer());
1675 dstream_.clear();
1676 }
1677}
1678
1679void Bzip2StreamCodec::doResetStream() {
1680 needReset_ = true;
1681}
1682
1683void Bzip2StreamCodec::resetCStream() {
1684 if (cstream_) {
1685 BZ2_bzCompressEnd(cstream_.get_pointer());
1686 }
1687 cstream_ = createBzStream();
1688 bzCheck(BZ2_bzCompressInit(cstream_.get_pointer(), level_, 0, 0));
1689}
1690
1691int bzip2TranslateFlush(StreamCodec::FlushOp flushOp) {
1692 switch (flushOp) {
1693 case StreamCodec::FlushOp::NONE:
1694 return BZ_RUN;
1695 case StreamCodec::FlushOp::END:
1696 return BZ_FINISH;
1697 case StreamCodec::FlushOp::FLUSH:
1698 throw std::invalid_argument(
1699 "Bzip2StreamCodec: FlushOp::FLUSH not supported");
1700 default:
1701 throw std::invalid_argument("Bzip2StreamCodec: Invalid flush");
1702 }
1703}
1704
1705bool Bzip2StreamCodec::doCompressStream(
1706 ByteRange& input,
1707 MutableByteRange& output,
1708 StreamCodec::FlushOp flushOp) {
1709 if (needReset_) {
1710 resetCStream();
1711 needReset_ = false;
1712 }
1713 if (input.empty() && output.empty()) {
1714 return false;
1715 }
1716
1717 cstream_->next_in =
1718 const_cast<char*>(reinterpret_cast<const char*>(input.data()));
1719 cstream_->avail_in = input.size();
1720 cstream_->next_out = reinterpret_cast<char*>(output.data());
1721 cstream_->avail_out = output.size();
1722 SCOPE_EXIT {
1723 input.uncheckedAdvance(input.size() - cstream_->avail_in);
1724 output.uncheckedAdvance(output.size() - cstream_->avail_out);
1725 };
1726 int const rc = bzCheck(
1727 BZ2_bzCompress(cstream_.get_pointer(), bzip2TranslateFlush(flushOp)));
1728 switch (flushOp) {
1729 case StreamCodec::FlushOp::NONE:
1730 return false;
1731 case StreamCodec::FlushOp::FLUSH:
1732 if (rc == BZ_RUN_OK) {
1733 DCHECK_EQ(cstream_->avail_in, 0);
1734 DCHECK(input.size() == 0 || cstream_->avail_out != output.size());
1735 return true;
1736 }
1737 return false;
1738 case StreamCodec::FlushOp::END:
1739 return rc == BZ_STREAM_END;
1740 default:
1741 throw std::invalid_argument("Bzip2StreamCodec: invalid FlushOp");
1742 }
1743 return false;
1744}
1745
1746void Bzip2StreamCodec::resetDStream() {
1747 if (dstream_) {
1748 BZ2_bzDecompressEnd(dstream_.get_pointer());
1749 }
1750 dstream_ = createBzStream();
1751 bzCheck(BZ2_bzDecompressInit(dstream_.get_pointer(), 0, 0));
1752}
1753
1754bool Bzip2StreamCodec::doUncompressStream(
1755 ByteRange& input,
1756 MutableByteRange& output,
1757 StreamCodec::FlushOp flushOp) {
1758 if (flushOp == StreamCodec::FlushOp::FLUSH) {
1759 throw std::invalid_argument(
1760 "Bzip2StreamCodec: FlushOp::FLUSH not supported");
1761 }
1762 if (needReset_) {
1763 resetDStream();
1764 needReset_ = false;
1765 }
1766
1767 dstream_->next_in =
1768 const_cast<char*>(reinterpret_cast<const char*>(input.data()));
1769 dstream_->avail_in = input.size();
1770 dstream_->next_out = reinterpret_cast<char*>(output.data());
1771 dstream_->avail_out = output.size();
1772 SCOPE_EXIT {
1773 input.uncheckedAdvance(input.size() - dstream_->avail_in);
1774 output.uncheckedAdvance(output.size() - dstream_->avail_out);
1775 };
1776 int const rc = bzCheck(BZ2_bzDecompress(dstream_.get_pointer()));
1777 return rc == BZ_STREAM_END;
1778}
1779
1780#endif // FOLLY_HAVE_LIBBZ2
1781
1782#if FOLLY_HAVE_LIBZ
1783
1784zlib::Options getZlibOptions(CodecType type) {
1785 DCHECK(type == CodecType::GZIP || type == CodecType::ZLIB);
1786 return type == CodecType::GZIP ? zlib::defaultGzipOptions()
1787 : zlib::defaultZlibOptions();
1788}
1789
1790std::unique_ptr<Codec> getZlibCodec(int level, CodecType type) {
1791 return zlib::getCodec(getZlibOptions(type), level);
1792}
1793
1794std::unique_ptr<StreamCodec> getZlibStreamCodec(int level, CodecType type) {
1795 return zlib::getStreamCodec(getZlibOptions(type), level);
1796}
1797
1798#endif // FOLLY_HAVE_LIBZ
1799
1800/**
1801 * Automatic decompression
1802 */
1803class AutomaticCodec final : public Codec {
1804 public:
1805 static std::unique_ptr<Codec> create(
1806 std::vector<std::unique_ptr<Codec>> customCodecs,
1807 std::unique_ptr<Codec> terminalCodec);
1808 explicit AutomaticCodec(
1809 std::vector<std::unique_ptr<Codec>> customCodecs,
1810 std::unique_ptr<Codec> terminalCodec);
1811
1812 std::vector<std::string> validPrefixes() const override;
1813 bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
1814 const override;
1815
1816 private:
1817 bool doNeedsUncompressedLength() const override;
1818 uint64_t doMaxUncompressedLength() const override;
1819
1820 uint64_t doMaxCompressedLength(uint64_t) const override {
1821 throw std::runtime_error(
1822 "AutomaticCodec error: maxCompressedLength() not supported.");
1823 }
1824 std::unique_ptr<IOBuf> doCompress(const IOBuf*) override {
1825 throw std::runtime_error("AutomaticCodec error: compress() not supported.");
1826 }
1827 std::unique_ptr<IOBuf> doUncompress(
1828 const IOBuf* data,
1829 Optional<uint64_t> uncompressedLength) override;
1830
1831 void addCodecIfSupported(CodecType type);
1832
1833 // Throws iff the codecs aren't compatible (very slow)
1834 void checkCompatibleCodecs() const;
1835
1836 std::vector<std::unique_ptr<Codec>> codecs_;
1837 std::unique_ptr<Codec> terminalCodec_;
1838 bool needsUncompressedLength_;
1839 uint64_t maxUncompressedLength_;
1840};
1841
1842std::vector<std::string> AutomaticCodec::validPrefixes() const {
1843 std::unordered_set<std::string> prefixes;
1844 for (const auto& codec : codecs_) {
1845 const auto codecPrefixes = codec->validPrefixes();
1846 prefixes.insert(codecPrefixes.begin(), codecPrefixes.end());
1847 }
1848 return std::vector<std::string>{prefixes.begin(), prefixes.end()};
1849}
1850
1851bool AutomaticCodec::canUncompress(
1852 const IOBuf* data,
1853 Optional<uint64_t> uncompressedLength) const {
1854 return std::any_of(
1855 codecs_.begin(),
1856 codecs_.end(),
1857 [data, uncompressedLength](std::unique_ptr<Codec> const& codec) {
1858 return codec->canUncompress(data, uncompressedLength);
1859 });
1860}
1861
1862void AutomaticCodec::addCodecIfSupported(CodecType type) {
1863 const bool present = std::any_of(
1864 codecs_.begin(),
1865 codecs_.end(),
1866 [&type](std::unique_ptr<Codec> const& codec) {
1867 return codec->type() == type;
1868 });
1869 bool const isTerminalType = terminalCodec_ && terminalCodec_->type() == type;
1870 if (hasCodec(type) && !present && !isTerminalType) {
1871 codecs_.push_back(getCodec(type));
1872 }
1873}
1874
1875/* static */ std::unique_ptr<Codec> AutomaticCodec::create(
1876 std::vector<std::unique_ptr<Codec>> customCodecs,
1877 std::unique_ptr<Codec> terminalCodec) {
1878 return std::make_unique<AutomaticCodec>(
1879 std::move(customCodecs), std::move(terminalCodec));
1880}
1881
1882AutomaticCodec::AutomaticCodec(
1883 std::vector<std::unique_ptr<Codec>> customCodecs,
1884 std::unique_ptr<Codec> terminalCodec)
1885 : Codec(CodecType::USER_DEFINED, folly::none, "auto"),
1886 codecs_(std::move(customCodecs)),
1887 terminalCodec_(std::move(terminalCodec)) {
1888 // Fastest -> slowest
1889 std::array<CodecType, 6> defaultTypes{{
1890 CodecType::LZ4_FRAME,
1891 CodecType::ZSTD,
1892 CodecType::ZLIB,
1893 CodecType::GZIP,
1894 CodecType::LZMA2,
1895 CodecType::BZIP2,
1896 }};
1897
1898 for (auto type : defaultTypes) {
1899 addCodecIfSupported(type);
1900 }
1901
1902 if (kIsDebug) {
1903 checkCompatibleCodecs();
1904 }
1905
1906 // Check that none of the codecs are null
1907 DCHECK(std::none_of(
1908 codecs_.begin(), codecs_.end(), [](std::unique_ptr<Codec> const& codec) {
1909 return codec == nullptr;
1910 }));
1911
1912 // Check that the terminal codec's type is not duplicated (with the exception
1913 // of USER_DEFINED).
1914 if (terminalCodec_) {
1915 DCHECK(std::none_of(
1916 codecs_.begin(),
1917 codecs_.end(),
1918 [&](std::unique_ptr<Codec> const& codec) {
1919 return codec->type() != CodecType::USER_DEFINED &&
1920 codec->type() == terminalCodec_->type();
1921 }));
1922 }
1923
1924 bool const terminalNeedsUncompressedLength =
1925 terminalCodec_ && terminalCodec_->needsUncompressedLength();
1926 needsUncompressedLength_ = std::any_of(
1927 codecs_.begin(),
1928 codecs_.end(),
1929 [](std::unique_ptr<Codec> const& codec) {
1930 return codec->needsUncompressedLength();
1931 }) ||
1932 terminalNeedsUncompressedLength;
1933
1934 const auto it = std::max_element(
1935 codecs_.begin(),
1936 codecs_.end(),
1937 [](std::unique_ptr<Codec> const& lhs, std::unique_ptr<Codec> const& rhs) {
1938 return lhs->maxUncompressedLength() < rhs->maxUncompressedLength();
1939 });
1940 DCHECK(it != codecs_.end());
1941 auto const terminalMaxUncompressedLength =
1942 terminalCodec_ ? terminalCodec_->maxUncompressedLength() : 0;
1943 maxUncompressedLength_ =
1944 std::max((*it)->maxUncompressedLength(), terminalMaxUncompressedLength);
1945}
1946
1947void AutomaticCodec::checkCompatibleCodecs() const {
1948 // Keep track of all the possible headers.
1949 std::unordered_set<std::string> headers;
1950 // The empty header is not allowed.
1951 headers.insert("");
1952 // Step 1:
1953 // Construct a set of headers and check that none of the headers occur twice.
1954 // Eliminate edge cases.
1955 for (auto&& codec : codecs_) {
1956 const auto codecHeaders = codec->validPrefixes();
1957 // Codecs without any valid headers are not allowed.
1958 if (codecHeaders.empty()) {
1959 throw std::invalid_argument{
1960 "AutomaticCodec: validPrefixes() must not be empty."};
1961 }
1962 // Insert all the headers for the current codec.
1963 const size_t beforeSize = headers.size();
1964 headers.insert(codecHeaders.begin(), codecHeaders.end());
1965 // Codecs are not compatible if any header occurred twice.
1966 if (beforeSize + codecHeaders.size() != headers.size()) {
1967 throw std::invalid_argument{
1968 "AutomaticCodec: Two valid prefixes collide."};
1969 }
1970 }
1971 // Step 2:
1972 // Check if any strict non-empty prefix of any header is a header.
1973 for (const auto& header : headers) {
1974 for (size_t i = 1; i < header.size(); ++i) {
1975 if (headers.count(header.substr(0, i))) {
1976 throw std::invalid_argument{
1977 "AutomaticCodec: One valid prefix is a prefix of another valid "
1978 "prefix."};
1979 }
1980 }
1981 }
1982}
1983
1984bool AutomaticCodec::doNeedsUncompressedLength() const {
1985 return needsUncompressedLength_;
1986}
1987
1988uint64_t AutomaticCodec::doMaxUncompressedLength() const {
1989 return maxUncompressedLength_;
1990}
1991
1992std::unique_ptr<IOBuf> AutomaticCodec::doUncompress(
1993 const IOBuf* data,
1994 Optional<uint64_t> uncompressedLength) {
1995 try {
1996 for (auto&& codec : codecs_) {
1997 if (codec->canUncompress(data, uncompressedLength)) {
1998 return codec->uncompress(data, uncompressedLength);
1999 }
2000 }
2001 } catch (std::exception const& e) {
2002 if (!terminalCodec_) {
2003 throw e;
2004 }
2005 }
2006
2007 // Try terminal codec
2008 if (terminalCodec_) {
2009 return terminalCodec_->uncompress(data, uncompressedLength);
2010 }
2011
2012 throw std::runtime_error("AutomaticCodec error: Unknown compressed data");
2013}
2014
2015using CodecFactory = std::unique_ptr<Codec> (*)(int, CodecType);
2016using StreamCodecFactory = std::unique_ptr<StreamCodec> (*)(int, CodecType);
2017struct Factory {
2018 CodecFactory codec;
2019 StreamCodecFactory stream;
2020};
2021
2022constexpr Factory
2023 codecFactories[static_cast<size_t>(CodecType::NUM_CODEC_TYPES)] = {
2024 {}, // USER_DEFINED
2025 {NoCompressionCodec::create, nullptr},
2026
2027#if FOLLY_HAVE_LIBLZ4
2028 {LZ4Codec::create, nullptr},
2029#else
2030 {},
2031#endif
2032
2033#if FOLLY_HAVE_LIBSNAPPY
2034 {SnappyCodec::create, nullptr},
2035#else
2036 {},
2037#endif
2038
2039#if FOLLY_HAVE_LIBZ
2040 {getZlibCodec, getZlibStreamCodec},
2041#else
2042 {},
2043#endif
2044
2045#if FOLLY_HAVE_LIBLZ4
2046 {LZ4Codec::create, nullptr},
2047#else
2048 {},
2049#endif
2050
2051#if FOLLY_HAVE_LIBLZMA
2052 {LZMA2StreamCodec::createCodec, LZMA2StreamCodec::createStream},
2053 {LZMA2StreamCodec::createCodec, LZMA2StreamCodec::createStream},
2054#else
2055 {},
2056 {},
2057#endif
2058
2059#if FOLLY_HAVE_LIBZSTD
2060 {getZstdCodec, getZstdStreamCodec},
2061#else
2062 {},
2063#endif
2064
2065#if FOLLY_HAVE_LIBZ
2066 {getZlibCodec, getZlibStreamCodec},
2067#else
2068 {},
2069#endif
2070
2071#if (FOLLY_HAVE_LIBLZ4 && LZ4_VERSION_NUMBER >= 10301)
2072 {LZ4FrameCodec::create, nullptr},
2073#else
2074 {},
2075#endif
2076
2077#if FOLLY_HAVE_LIBBZ2
2078 {Bzip2StreamCodec::createCodec, Bzip2StreamCodec::createStream},
2079#else
2080 {},
2081#endif
2082
2083#if FOLLY_HAVE_LIBZSTD
2084 {getZstdFastCodec, getZstdFastStreamCodec},
2085#else
2086 {},
2087#endif
2088};
2089
2090Factory const& getFactory(CodecType type) {
2091 size_t const idx = static_cast<size_t>(type);
2092 if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
2093 throw std::invalid_argument(
2094 to<std::string>("Compression type ", idx, " invalid"));
2095 }
2096 return codecFactories[idx];
2097}
2098} // namespace
2099
2100bool hasCodec(CodecType type) {
2101 return getFactory(type).codec != nullptr;
2102}
2103
2104std::unique_ptr<Codec> getCodec(CodecType type, int level) {
2105 auto const factory = getFactory(type).codec;
2106 if (!factory) {
2107 throw std::invalid_argument(
2108 to<std::string>("Compression type ", type, " not supported"));
2109 }
2110 auto codec = (*factory)(level, type);
2111 DCHECK(codec->type() == type);
2112 return codec;
2113}
2114
2115bool hasStreamCodec(CodecType type) {
2116 return getFactory(type).stream != nullptr;
2117}
2118
2119std::unique_ptr<StreamCodec> getStreamCodec(CodecType type, int level) {
2120 auto const factory = getFactory(type).stream;
2121 if (!factory) {
2122 throw std::invalid_argument(
2123 to<std::string>("Compression type ", type, " not supported"));
2124 }
2125 auto codec = (*factory)(level, type);
2126 DCHECK(codec->type() == type);
2127 return codec;
2128}
2129
2130std::unique_ptr<Codec> getAutoUncompressionCodec(
2131 std::vector<std::unique_ptr<Codec>> customCodecs,
2132 std::unique_ptr<Codec> terminalCodec) {
2133 return AutomaticCodec::create(
2134 std::move(customCodecs), std::move(terminalCodec));
2135}
2136} // namespace io
2137} // namespace folly
2138