1 | /* |
2 | * Copyright 2013-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #include <folly/compression/Compression.h> |
18 | |
19 | #if FOLLY_HAVE_LIBLZ4 |
20 | #include <lz4.h> |
21 | #include <lz4hc.h> |
22 | #if LZ4_VERSION_NUMBER >= 10301 |
23 | #include <lz4frame.h> |
24 | #endif |
25 | #endif |
26 | |
27 | #include <glog/logging.h> |
28 | |
29 | #if FOLLY_HAVE_LIBSNAPPY |
30 | #include <snappy-sinksource.h> |
31 | #include <snappy.h> |
32 | #endif |
33 | |
34 | #if FOLLY_HAVE_LIBZ |
35 | #include <folly/compression/Zlib.h> |
36 | #endif |
37 | |
38 | #if FOLLY_HAVE_LIBLZMA |
39 | #include <lzma.h> |
40 | #endif |
41 | |
42 | #if FOLLY_HAVE_LIBZSTD |
43 | #include <folly/compression/Zstd.h> |
44 | #endif |
45 | |
46 | #if FOLLY_HAVE_LIBBZ2 |
47 | #include <folly/portability/Windows.h> |
48 | |
49 | #include <bzlib.h> |
50 | #endif |
51 | |
52 | #include <folly/Conv.h> |
53 | #include <folly/Memory.h> |
54 | #include <folly/Portability.h> |
55 | #include <folly/Random.h> |
56 | #include <folly/ScopeGuard.h> |
57 | #include <folly/Varint.h> |
58 | #include <folly/compression/Utils.h> |
59 | #include <folly/io/Cursor.h> |
60 | #include <folly/lang/Bits.h> |
61 | #include <folly/stop_watch.h> |
62 | #include <algorithm> |
63 | #include <unordered_set> |
64 | |
65 | using folly::io::compression::detail::dataStartsWithLE; |
66 | using folly::io::compression::detail::prefixToStringLE; |
67 | |
68 | namespace folly { |
69 | namespace io { |
70 | |
71 | Codec::Codec( |
72 | CodecType type, |
73 | Optional<int> level, |
74 | StringPiece name, |
75 | bool counters) |
76 | : type_(type) { |
77 | if (counters) { |
78 | bytesBeforeCompression_ = {type, |
79 | name, |
80 | level, |
81 | CompressionCounterKey::BYTES_BEFORE_COMPRESSION, |
82 | CompressionCounterType::SUM}; |
83 | bytesAfterCompression_ = {type, |
84 | name, |
85 | level, |
86 | CompressionCounterKey::BYTES_AFTER_COMPRESSION, |
87 | CompressionCounterType::SUM}; |
88 | bytesBeforeDecompression_ = { |
89 | type, |
90 | name, |
91 | level, |
92 | CompressionCounterKey::BYTES_BEFORE_DECOMPRESSION, |
93 | CompressionCounterType::SUM}; |
94 | bytesAfterDecompression_ = { |
95 | type, |
96 | name, |
97 | level, |
98 | CompressionCounterKey::BYTES_AFTER_DECOMPRESSION, |
99 | CompressionCounterType::SUM}; |
100 | compressions_ = {type, |
101 | name, |
102 | level, |
103 | CompressionCounterKey::COMPRESSIONS, |
104 | CompressionCounterType::SUM}; |
105 | decompressions_ = {type, |
106 | name, |
107 | level, |
108 | CompressionCounterKey::DECOMPRESSIONS, |
109 | CompressionCounterType::SUM}; |
110 | compressionMilliseconds_ = {type, |
111 | name, |
112 | level, |
113 | CompressionCounterKey::COMPRESSION_MILLISECONDS, |
114 | CompressionCounterType::SUM}; |
115 | decompressionMilliseconds_ = { |
116 | type, |
117 | name, |
118 | level, |
119 | CompressionCounterKey::DECOMPRESSION_MILLISECONDS, |
120 | CompressionCounterType::SUM}; |
121 | } |
122 | } |
123 | |
124 | namespace { |
125 | constexpr uint32_t kLoggingRate = 50; |
126 | |
127 | class Timer { |
128 | public: |
129 | explicit Timer(folly::detail::CompressionCounter& counter) |
130 | : counter_(&counter) {} |
131 | |
132 | ~Timer() { |
133 | *counter_ += timer_.elapsed().count(); |
134 | } |
135 | |
136 | private: |
137 | folly::detail::CompressionCounter* counter_; |
138 | stop_watch<std::chrono::milliseconds> timer_; |
139 | }; |
140 | } // namespace |
141 | |
142 | // Ensure consistent behavior in the nullptr case |
143 | std::unique_ptr<IOBuf> Codec::compress(const IOBuf* data) { |
144 | if (data == nullptr) { |
145 | throw std::invalid_argument("Codec: data must not be nullptr" ); |
146 | } |
147 | const uint64_t len = data->computeChainDataLength(); |
148 | if (len > maxUncompressedLength()) { |
149 | throw std::runtime_error("Codec: uncompressed length too large" ); |
150 | } |
151 | bool const logging = folly::Random::oneIn(kLoggingRate); |
152 | folly::Optional<Timer> const timer = |
153 | logging ? Timer(compressionMilliseconds_) : folly::Optional<Timer>(); |
154 | auto result = doCompress(data); |
155 | if (logging) { |
156 | compressions_++; |
157 | bytesBeforeCompression_ += len; |
158 | bytesAfterCompression_ += result->computeChainDataLength(); |
159 | } |
160 | return result; |
161 | } |
162 | |
163 | std::string Codec::compress(const StringPiece data) { |
164 | const uint64_t len = data.size(); |
165 | if (len > maxUncompressedLength()) { |
166 | throw std::runtime_error("Codec: uncompressed length too large" ); |
167 | } |
168 | bool const logging = folly::Random::oneIn(kLoggingRate); |
169 | folly::Optional<Timer> const timer = |
170 | logging ? Timer(compressionMilliseconds_) : folly::Optional<Timer>(); |
171 | auto result = doCompressString(data); |
172 | if (logging) { |
173 | compressions_++; |
174 | bytesBeforeCompression_ += len; |
175 | bytesAfterCompression_ += result.size(); |
176 | } |
177 | return result; |
178 | } |
179 | |
180 | std::unique_ptr<IOBuf> Codec::uncompress( |
181 | const IOBuf* data, |
182 | Optional<uint64_t> uncompressedLength) { |
183 | if (data == nullptr) { |
184 | throw std::invalid_argument("Codec: data must not be nullptr" ); |
185 | } |
186 | if (!uncompressedLength) { |
187 | if (needsUncompressedLength()) { |
188 | throw std::invalid_argument("Codec: uncompressed length required" ); |
189 | } |
190 | } else if (*uncompressedLength > maxUncompressedLength()) { |
191 | throw std::runtime_error("Codec: uncompressed length too large" ); |
192 | } |
193 | |
194 | if (data->empty()) { |
195 | if (uncompressedLength.value_or(0) != 0) { |
196 | throw std::runtime_error("Codec: invalid uncompressed length" ); |
197 | } |
198 | return IOBuf::create(0); |
199 | } |
200 | |
201 | bool const logging = folly::Random::oneIn(kLoggingRate); |
202 | folly::Optional<Timer> const timer = |
203 | logging ? Timer(decompressionMilliseconds_) : folly::Optional<Timer>(); |
204 | auto result = doUncompress(data, uncompressedLength); |
205 | if (logging) { |
206 | decompressions_++; |
207 | bytesBeforeDecompression_ += data->computeChainDataLength(); |
208 | bytesAfterDecompression_ += result->computeChainDataLength(); |
209 | } |
210 | return result; |
211 | } |
212 | |
213 | std::string Codec::uncompress( |
214 | const StringPiece data, |
215 | Optional<uint64_t> uncompressedLength) { |
216 | if (!uncompressedLength) { |
217 | if (needsUncompressedLength()) { |
218 | throw std::invalid_argument("Codec: uncompressed length required" ); |
219 | } |
220 | } else if (*uncompressedLength > maxUncompressedLength()) { |
221 | throw std::runtime_error("Codec: uncompressed length too large" ); |
222 | } |
223 | |
224 | if (data.empty()) { |
225 | if (uncompressedLength.value_or(0) != 0) { |
226 | throw std::runtime_error("Codec: invalid uncompressed length" ); |
227 | } |
228 | return "" ; |
229 | } |
230 | |
231 | bool const logging = folly::Random::oneIn(kLoggingRate); |
232 | folly::Optional<Timer> const timer = |
233 | logging ? Timer(decompressionMilliseconds_) : folly::Optional<Timer>(); |
234 | auto result = doUncompressString(data, uncompressedLength); |
235 | if (logging) { |
236 | decompressions_++; |
237 | bytesBeforeDecompression_ += data.size(); |
238 | bytesAfterDecompression_ += result.size(); |
239 | } |
240 | return result; |
241 | } |
242 | |
243 | bool Codec::needsUncompressedLength() const { |
244 | return doNeedsUncompressedLength(); |
245 | } |
246 | |
247 | uint64_t Codec::maxUncompressedLength() const { |
248 | return doMaxUncompressedLength(); |
249 | } |
250 | |
251 | bool Codec::doNeedsUncompressedLength() const { |
252 | return false; |
253 | } |
254 | |
255 | uint64_t Codec::doMaxUncompressedLength() const { |
256 | return UNLIMITED_UNCOMPRESSED_LENGTH; |
257 | } |
258 | |
259 | std::vector<std::string> Codec::validPrefixes() const { |
260 | return {}; |
261 | } |
262 | |
263 | bool Codec::canUncompress(const IOBuf*, Optional<uint64_t>) const { |
264 | return false; |
265 | } |
266 | |
267 | std::string Codec::doCompressString(const StringPiece data) { |
268 | const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data}; |
269 | auto outputBuffer = doCompress(&inputBuffer); |
270 | std::string output; |
271 | output.reserve(outputBuffer->computeChainDataLength()); |
272 | for (auto range : *outputBuffer) { |
273 | output.append(reinterpret_cast<const char*>(range.data()), range.size()); |
274 | } |
275 | return output; |
276 | } |
277 | |
278 | std::string Codec::doUncompressString( |
279 | const StringPiece data, |
280 | Optional<uint64_t> uncompressedLength) { |
281 | const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data}; |
282 | auto outputBuffer = doUncompress(&inputBuffer, uncompressedLength); |
283 | std::string output; |
284 | output.reserve(outputBuffer->computeChainDataLength()); |
285 | for (auto range : *outputBuffer) { |
286 | output.append(reinterpret_cast<const char*>(range.data()), range.size()); |
287 | } |
288 | return output; |
289 | } |
290 | |
291 | uint64_t Codec::maxCompressedLength(uint64_t uncompressedLength) const { |
292 | return doMaxCompressedLength(uncompressedLength); |
293 | } |
294 | |
295 | Optional<uint64_t> Codec::getUncompressedLength( |
296 | const folly::IOBuf* data, |
297 | Optional<uint64_t> uncompressedLength) const { |
298 | auto const compressedLength = data->computeChainDataLength(); |
299 | if (compressedLength == 0) { |
300 | if (uncompressedLength.value_or(0) != 0) { |
301 | throw std::runtime_error("Invalid uncompressed length" ); |
302 | } |
303 | return 0; |
304 | } |
305 | return doGetUncompressedLength(data, uncompressedLength); |
306 | } |
307 | |
308 | Optional<uint64_t> Codec::doGetUncompressedLength( |
309 | const folly::IOBuf*, |
310 | Optional<uint64_t> uncompressedLength) const { |
311 | return uncompressedLength; |
312 | } |
313 | |
314 | bool StreamCodec::needsDataLength() const { |
315 | return doNeedsDataLength(); |
316 | } |
317 | |
318 | bool StreamCodec::doNeedsDataLength() const { |
319 | return false; |
320 | } |
321 | |
322 | void StreamCodec::assertStateIs(State expected) const { |
323 | if (state_ != expected) { |
324 | throw std::logic_error(folly::to<std::string>( |
325 | "Codec: state is " , state_, "; expected state " , expected)); |
326 | } |
327 | } |
328 | |
329 | void StreamCodec::resetStream(Optional<uint64_t> uncompressedLength) { |
330 | state_ = State::RESET; |
331 | uncompressedLength_ = uncompressedLength; |
332 | progressMade_ = true; |
333 | doResetStream(); |
334 | } |
335 | |
/**
 * Performs one step of streaming compression.
 *
 * Checks the declared uncompressed length against the input, applies the
 * state transition requested by flushOp, calls doCompressStream(), and guards
 * against stalls: a call that is not done and neither consumes input nor
 * produces output is tolerated once, but a second such call in a row throws.
 *
 * @param input   data to compress; shrinks by the bytes doCompressStream()
 *                consumes.
 * @param output  space to write into; shrinks by the bytes produced.
 * @param flushOp NONE to keep compressing, FLUSH to flush, END to finish.
 * @return true once the requested FLUSH or END has completed.
 * @throws std::runtime_error on an inconsistent or missing uncompressed
 *         length, or when no forward progress is made twice in a row.
 * @throws std::logic_error (via assertStateIs) when flushOp is not allowed in
 *         the current state.
 */
bool StreamCodec::compressStream(
    ByteRange& input,
    MutableByteRange& output,
    StreamCodec::FlushOp flushOp) {
  // Ending an empty stream right away is only valid when the declared length
  // (if any) is zero.
  if (state_ == State::RESET && input.empty() &&
      flushOp == StreamCodec::FlushOp::END &&
      uncompressedLength().value_or(0) != 0) {
    throw std::runtime_error("Codec: invalid uncompressed length");
  }

  // Some codecs must know the total length before they start.
  if (!uncompressedLength() && needsDataLength()) {
    throw std::runtime_error("Codec: uncompressed length required");
  }
  // Non-empty input contradicts a declared length of zero.
  if (state_ == State::RESET && !input.empty() &&
      uncompressedLength() == uint64_t(0)) {
    throw std::runtime_error("Codec: invalid uncompressed length");
  }
  // Handle input state transitions
  // NONE: RESET -> COMPRESS. FLUSH / END: RESET or COMPRESS -> COMPRESS_FLUSH
  // / COMPRESS_END. An unfinished FLUSH or END must be repeated with the same
  // flushOp until it completes, otherwise assertStateIs() throws.
  switch (flushOp) {
    case StreamCodec::FlushOp::NONE:
      if (state_ == State::RESET) {
        state_ = State::COMPRESS;
      }
      assertStateIs(State::COMPRESS);
      break;
    case StreamCodec::FlushOp::FLUSH:
      if (state_ == State::RESET || state_ == State::COMPRESS) {
        state_ = State::COMPRESS_FLUSH;
      }
      assertStateIs(State::COMPRESS_FLUSH);
      break;
    case StreamCodec::FlushOp::END:
      if (state_ == State::RESET || state_ == State::COMPRESS) {
        state_ = State::COMPRESS_END;
      }
      assertStateIs(State::COMPRESS_END);
      break;
  }
  size_t const inputSize = input.size();
  size_t const outputSize = output.size();
  bool const done = doCompressStream(input, output, flushOp);
  // Stall detection: no input consumed, no output produced, not done.
  if (!done && inputSize == input.size() && outputSize == output.size()) {
    if (!progressMade_) {
      throw std::runtime_error("Codec: No forward progress made");
    }
    // Throw an exception if there is no progress again next time
    progressMade_ = false;
  } else {
    progressMade_ = true;
  }
  // Handle output state transitions
  // A completed FLUSH returns to COMPRESS; a completed END finishes the stream.
  if (done) {
    if (state_ == State::COMPRESS_FLUSH) {
      state_ = State::COMPRESS;
    } else if (state_ == State::COMPRESS_END) {
      state_ = State::END;
    }
    // Check internal invariants
    DCHECK(input.empty());
    DCHECK(flushOp != StreamCodec::FlushOp::NONE);
  }
  return done;
}
399 | |
/**
 * Performs one step of streaming decompression.
 *
 * An empty input while the stream is still in RESET is answered without
 * calling the codec: done if the expected length is absent or zero, otherwise
 * not done (and the state is left unchanged). Otherwise the state moves from
 * RESET to UNCOMPRESS and doUncompressStream() is called. Two consecutive
 * calls that are not done and neither consume input nor produce output
 * throw. When the codec reports completion the state becomes END.
 *
 * @param input   compressed bytes; shrinks by the bytes consumed.
 * @param output  space to write into; shrinks by the bytes produced.
 * @param flushOp passed through to doUncompressStream().
 * @return true when the compressed stream has been fully decoded.
 * @throws std::runtime_error when no forward progress is made twice in a row.
 * @throws std::logic_error (via assertStateIs) when called in a state other
 *         than RESET or UNCOMPRESS, e.g. after the stream has ended.
 */
bool StreamCodec::uncompressStream(
    ByteRange& input,
    MutableByteRange& output,
    StreamCodec::FlushOp flushOp) {
  if (state_ == State::RESET && input.empty()) {
    if (uncompressedLength().value_or(0) == 0) {
      return true;
    }
    return false;
  }
  // Handle input state transitions
  if (state_ == State::RESET) {
    state_ = State::UNCOMPRESS;
  }
  assertStateIs(State::UNCOMPRESS);
  size_t const inputSize = input.size();
  size_t const outputSize = output.size();
  bool const done = doUncompressStream(input, output, flushOp);
  // Stall detection: no input consumed, no output produced, not done.
  if (!done && inputSize == input.size() && outputSize == output.size()) {
    if (!progressMade_) {
      throw std::runtime_error("Codec: no forward progress made");
    }
    // Throw an exception if there is no progress again next time
    progressMade_ = false;
  } else {
    progressMade_ = true;
  }
  // Handle output state transitions
  if (done) {
    state_ = State::END;
  }
  return done;
}
433 | |
434 | static std::unique_ptr<IOBuf> addOutputBuffer( |
435 | MutableByteRange& output, |
436 | uint64_t size) { |
437 | DCHECK(output.empty()); |
438 | auto buffer = IOBuf::create(size); |
439 | buffer->append(buffer->capacity()); |
440 | output = {buffer->writableData(), buffer->length()}; |
441 | return buffer; |
442 | } |
443 | |
/**
 * One-shot compression implemented on top of the streaming interface.
 *
 * Resets the stream with the exact input length, then feeds the input IOBuf
 * chain to compressStream() buffer by buffer. It switches to FlushOp::END once
 * the last buffer of the chain is reached. Output is written into a chain of
 * IOBufs. The first buffer is sized to maxCompressedLength() when that is at
 * most 64 MB, and to 4 MB otherwise. Another 4 MB buffer is added at the end of
 * the chain whenever the current one fills up. Finally, the unused tail of the
 * last buffer is trimmed.
 */
std::unique_ptr<IOBuf> StreamCodec::doCompress(IOBuf const* data) {
  uint64_t const uncompressedLength = data->computeChainDataLength();
  resetStream(uncompressedLength);
  uint64_t const maxCompressedLen = maxCompressedLength(uncompressedLength);

  auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB
  auto constexpr kDefaultBufferLength = uint64_t(4) << 20; // 4 MB

  MutableByteRange output;
  auto buffer = addOutputBuffer(
      output,
      maxCompressedLen <= kMaxSingleStepLength ? maxCompressedLen
                                               : kDefaultBufferLength);

  // Compress the entire IOBuf chain into the IOBuf chain pointed to by buffer
  IOBuf const* current = data;
  ByteRange input{current->data(), current->length()};
  StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE;
  bool done = false;
  while (!done) {
    // Advance past exhausted (or empty) input buffers in the chain.
    while (input.empty() && current->next() != data) {
      current = current->next();
      input = {current->data(), current->length()};
    }
    if (current->next() == data) {
      // This is the last input buffer so end the stream
      flushOp = StreamCodec::FlushOp::END;
    }
    // The current output buffer is full; add another at the end of the chain.
    if (output.empty()) {
      buffer->prependChain(addOutputBuffer(output, kDefaultBufferLength));
    }
    done = compressStream(input, output, flushOp);
    if (done) {
      DCHECK(input.empty());
      DCHECK(flushOp == StreamCodec::FlushOp::END);
      DCHECK_EQ(current->next(), data);
    }
  }
  // Drop the unwritten space at the end of the last output buffer.
  buffer->prev()->trimEnd(output.size());
  return buffer;
}
485 | |
/**
 * Picks the size of each output buffer used while stream-decompressing.
 *
 * @param compressedLength total size of the compressed input.
 * @param blockSize        the codec's preferred block size.
 * @return four times the larger of the two arguments, capped at 4 MiB.
 *
 * The cap is checked before multiplying. Otherwise a huge compressedLength
 * could wrap the uint64_t product around to a tiny (even zero) buffer size.
 */
static uint64_t computeBufferLength(
    uint64_t const compressedLength,
    uint64_t const blockSize) {
  uint64_t constexpr kMaxBufferLength = uint64_t(4) << 20; // 4 MiB
  uint64_t constexpr kGrowthFactor = 4;
  uint64_t const larger = std::max(blockSize, compressedLength);
  if (larger >= kMaxBufferLength / kGrowthFactor) {
    return kMaxBufferLength;
  }
  return kGrowthFactor * larger;
}
493 | |
/**
 * One-shot decompression implemented on top of the streaming interface.
 *
 * Normalizes the uncompressed length via getUncompressedLength() and resets
 * the stream with it. It then feeds the input IOBuf chain to uncompressStream()
 * buffer by buffer, using FlushOp::END once the last input buffer is reached.
 * If the length is known and at most 64 MB, the first output buffer holds all
 * of it. Otherwise output buffers of computeBufferLength() bytes (based on the
 * compressed size and a 128 KiB block size) are added as needed.
 *
 * @throws std::runtime_error if input remains after the stream ends, or if the
 *         produced size differs from a known uncompressed length.
 */
std::unique_ptr<IOBuf> StreamCodec::doUncompress(
    IOBuf const* data,
    Optional<uint64_t> uncompressedLength) {
  auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB
  auto constexpr kBlockSize = uint64_t(128) << 10;
  auto const defaultBufferLength =
      computeBufferLength(data->computeChainDataLength(), kBlockSize);

  uncompressedLength = getUncompressedLength(data, uncompressedLength);
  resetStream(uncompressedLength);

  MutableByteRange output;
  auto buffer = addOutputBuffer(
      output,
      (uncompressedLength && *uncompressedLength <= kMaxSingleStepLength
           ? *uncompressedLength
           : defaultBufferLength));

  // Uncompress the entire IOBuf chain into the IOBuf chain pointed to by buffer
  IOBuf const* current = data;
  ByteRange input{current->data(), current->length()};
  StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE;
  bool done = false;
  while (!done) {
    // Advance past exhausted (or empty) input buffers in the chain.
    while (input.empty() && current->next() != data) {
      current = current->next();
      input = {current->data(), current->length()};
    }
    if (current->next() == data) {
      // Tell the uncompressor there is no more input (it may optimize)
      flushOp = StreamCodec::FlushOp::END;
    }
    // The current output buffer is full; add another at the end of the chain.
    if (output.empty()) {
      buffer->prependChain(addOutputBuffer(output, defaultBufferLength));
    }
    done = uncompressStream(input, output, flushOp);
  }
  // The stream ended before all of the current input buffer was consumed.
  if (!input.empty()) {
    throw std::runtime_error("Codec: Junk after end of data");
  }

  // Drop the unwritten space at the end of the last output buffer.
  buffer->prev()->trimEnd(output.size());
  if (uncompressedLength &&
      *uncompressedLength != buffer->computeChainDataLength()) {
    throw std::runtime_error("Codec: invalid uncompressed length");
  }

  return buffer;
}
543 | |
544 | namespace { |
545 | |
546 | /** |
547 | * No compression |
548 | */ |
class NoCompressionCodec final : public Codec {
 public:
  // Factory; `level` must be 0 or one of the COMPRESSION_LEVEL_* constants.
  static std::unique_ptr<Codec> create(int level, CodecType type);
  // Throws std::invalid_argument for any other level.
  explicit NoCompressionCodec(int level, CodecType type);

 private:
  // Output size equals input size.
  uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
  // Returns data->clone().
  std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
  // Returns data->clone() after checking it against a known length.
  std::unique_ptr<IOBuf> doUncompress(
      const IOBuf* data,
      Optional<uint64_t> uncompressedLength) override;
};
561 | |
562 | std::unique_ptr<Codec> NoCompressionCodec::create(int level, CodecType type) { |
563 | return std::make_unique<NoCompressionCodec>(level, type); |
564 | } |
565 | |
566 | NoCompressionCodec::NoCompressionCodec(int level, CodecType type) |
567 | : Codec(type) { |
568 | DCHECK(type == CodecType::NO_COMPRESSION); |
569 | switch (level) { |
570 | case COMPRESSION_LEVEL_DEFAULT: |
571 | case COMPRESSION_LEVEL_FASTEST: |
572 | case COMPRESSION_LEVEL_BEST: |
573 | level = 0; |
574 | } |
575 | if (level != 0) { |
576 | throw std::invalid_argument( |
577 | to<std::string>("NoCompressionCodec: invalid level " , level)); |
578 | } |
579 | } |
580 | |
581 | uint64_t NoCompressionCodec::doMaxCompressedLength( |
582 | uint64_t uncompressedLength) const { |
583 | return uncompressedLength; |
584 | } |
585 | |
586 | std::unique_ptr<IOBuf> NoCompressionCodec::doCompress(const IOBuf* data) { |
587 | return data->clone(); |
588 | } |
589 | |
590 | std::unique_ptr<IOBuf> NoCompressionCodec::doUncompress( |
591 | const IOBuf* data, |
592 | Optional<uint64_t> uncompressedLength) { |
593 | if (uncompressedLength && |
594 | data->computeChainDataLength() != *uncompressedLength) { |
595 | throw std::runtime_error( |
596 | to<std::string>("NoCompressionCodec: invalid uncompressed length" )); |
597 | } |
598 | return data->clone(); |
599 | } |
600 | |
601 | #if (FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA) |
602 | |
603 | namespace { |
604 | |
605 | void encodeVarintToIOBuf(uint64_t val, folly::IOBuf* out) { |
606 | DCHECK_GE(out->tailroom(), kMaxVarintLength64); |
607 | out->append(encodeVarint(val, out->writableTail())); |
608 | } |
609 | |
610 | inline uint64_t decodeVarintFromCursor(folly::io::Cursor& cursor) { |
611 | uint64_t val = 0; |
612 | int8_t b = 0; |
613 | for (int shift = 0; shift <= 63; shift += 7) { |
614 | b = cursor.read<int8_t>(); |
615 | val |= static_cast<uint64_t>(b & 0x7f) << shift; |
616 | if (b >= 0) { |
617 | break; |
618 | } |
619 | } |
620 | if (b < 0) { |
621 | throw std::invalid_argument("Invalid varint value. Too big." ); |
622 | } |
623 | return val; |
624 | } |
625 | |
626 | } // namespace |
627 | |
628 | #endif // FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA |
629 | |
630 | #if FOLLY_HAVE_LIBLZ4 |
631 | |
632 | #if LZ4_VERSION_NUMBER >= 10802 && defined(LZ4_STATIC_LINKING_ONLY) && \ |
633 | defined(LZ4_HC_STATIC_LINKING_ONLY) && !defined(FOLLY_USE_LZ4_FAST_RESET) |
634 | #define FOLLY_USE_LZ4_FAST_RESET |
635 | #endif |
636 | |
637 | #ifdef FOLLY_USE_LZ4_FAST_RESET |
638 | namespace { |
639 | void lz4_stream_t_deleter(LZ4_stream_t* ctx) { |
640 | LZ4_freeStream(ctx); |
641 | } |
642 | |
643 | void lz4_streamhc_t_deleter(LZ4_streamHC_t* ctx) { |
644 | LZ4_freeStreamHC(ctx); |
645 | } |
646 | } // namespace |
647 | #endif |
648 | |
649 | /** |
650 | * LZ4 compression |
651 | */ |
class LZ4Codec final : public Codec {
 public:
  // Factory; see the constructor for accepted levels.
  static std::unique_ptr<Codec> create(int level, CodecType type);
  // `level` is mapped by lz4ConvertLevel(); `type` must be LZ4 or
  // LZ4_VARINT_SIZE.
  explicit LZ4Codec(int level, CodecType type);

 private:
  bool doNeedsUncompressedLength() const override;
  uint64_t doMaxUncompressedLength() const override;
  uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;

  // True for LZ4_VARINT_SIZE, whose output is prefixed with the uncompressed
  // length encoded as a varint.
  bool encodeSize() const {
    return type() == CodecType::LZ4_VARINT_SIZE;
  }

  std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
  std::unique_ptr<IOBuf> doUncompress(
      const IOBuf* data,
      Optional<uint64_t> uncompressedLength) override;

#ifdef FOLLY_USE_LZ4_FAST_RESET
  // Reusable compression states, created on first use in doCompress():
  // ctx for the fast compressor, hcctx for the high-compression one.
  std::unique_ptr<
      LZ4_stream_t,
      folly::static_function_deleter<LZ4_stream_t, lz4_stream_t_deleter>>
      ctx;
  std::unique_ptr<
      LZ4_streamHC_t,
      folly::static_function_deleter<LZ4_streamHC_t, lz4_streamhc_t_deleter>>
      hcctx;
#endif

  // True for level 2: compress with LZ4 HC instead of the fast compressor.
  bool highCompression_;
};
684 | |
685 | std::unique_ptr<Codec> LZ4Codec::create(int level, CodecType type) { |
686 | return std::make_unique<LZ4Codec>(level, type); |
687 | } |
688 | |
689 | static int lz4ConvertLevel(int level) { |
690 | switch (level) { |
691 | case 1: |
692 | case COMPRESSION_LEVEL_FASTEST: |
693 | case COMPRESSION_LEVEL_DEFAULT: |
694 | return 1; |
695 | case 2: |
696 | case COMPRESSION_LEVEL_BEST: |
697 | return 2; |
698 | } |
699 | throw std::invalid_argument( |
700 | to<std::string>("LZ4Codec: invalid level: " , level)); |
701 | } |
702 | |
703 | LZ4Codec::LZ4Codec(int level, CodecType type) |
704 | : Codec(type, lz4ConvertLevel(level)), |
705 | highCompression_(lz4ConvertLevel(level) > 1) { |
706 | DCHECK(type == CodecType::LZ4 || type == CodecType::LZ4_VARINT_SIZE); |
707 | } |
708 | |
709 | bool LZ4Codec::doNeedsUncompressedLength() const { |
710 | return !encodeSize(); |
711 | } |
712 | |
713 | // The value comes from lz4.h in lz4-r117, but older versions of lz4 don't |
714 | // define LZ4_MAX_INPUT_SIZE (even though the max size is the same), so do it |
715 | // here. |
716 | #ifndef LZ4_MAX_INPUT_SIZE |
717 | #define LZ4_MAX_INPUT_SIZE 0x7E000000 |
718 | #endif |
719 | |
720 | uint64_t LZ4Codec::doMaxUncompressedLength() const { |
721 | return LZ4_MAX_INPUT_SIZE; |
722 | } |
723 | |
724 | uint64_t LZ4Codec::doMaxCompressedLength(uint64_t uncompressedLength) const { |
725 | return LZ4_compressBound(uncompressedLength) + |
726 | (encodeSize() ? kMaxVarintLength64 : 0); |
727 | } |
728 | |
729 | std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) { |
730 | IOBuf clone; |
731 | if (data->isChained()) { |
732 | // LZ4 doesn't support streaming, so we have to coalesce |
733 | clone = data->cloneCoalescedAsValue(); |
734 | data = &clone; |
735 | } |
736 | |
737 | auto out = IOBuf::create(maxCompressedLength(data->length())); |
738 | if (encodeSize()) { |
739 | encodeVarintToIOBuf(data->length(), out.get()); |
740 | } |
741 | |
742 | int n; |
743 | auto input = reinterpret_cast<const char*>(data->data()); |
744 | auto output = reinterpret_cast<char*>(out->writableTail()); |
745 | const auto inputLength = data->length(); |
746 | |
747 | #ifdef FOLLY_USE_LZ4_FAST_RESET |
748 | if (!highCompression_ && !ctx) { |
749 | ctx.reset(LZ4_createStream()); |
750 | } |
751 | if (highCompression_ && !hcctx) { |
752 | hcctx.reset(LZ4_createStreamHC()); |
753 | } |
754 | |
755 | if (highCompression_) { |
756 | n = LZ4_compress_HC_extStateHC_fastReset( |
757 | hcctx.get(), input, output, inputLength, out->tailroom(), 0); |
758 | } else { |
759 | n = LZ4_compress_fast_extState_fastReset( |
760 | ctx.get(), input, output, inputLength, out->tailroom(), 1); |
761 | } |
762 | #elif LZ4_VERSION_NUMBER >= 10700 |
763 | if (highCompression_) { |
764 | n = LZ4_compress_HC(input, output, inputLength, out->tailroom(), 0); |
765 | } else { |
766 | n = LZ4_compress_default(input, output, inputLength, out->tailroom()); |
767 | } |
768 | #else |
769 | if (highCompression_) { |
770 | n = LZ4_compressHC(input, output, inputLength); |
771 | } else { |
772 | n = LZ4_compress(input, output, inputLength); |
773 | } |
774 | #endif |
775 | |
776 | CHECK_GE(n, 0); |
777 | CHECK_LE(n, out->capacity()); |
778 | |
779 | out->append(n); |
780 | return out; |
781 | } |
782 | |
783 | std::unique_ptr<IOBuf> LZ4Codec::doUncompress( |
784 | const IOBuf* data, |
785 | Optional<uint64_t> uncompressedLength) { |
786 | IOBuf clone; |
787 | if (data->isChained()) { |
788 | // LZ4 doesn't support streaming, so we have to coalesce |
789 | clone = data->cloneCoalescedAsValue(); |
790 | data = &clone; |
791 | } |
792 | |
793 | folly::io::Cursor cursor(data); |
794 | uint64_t actualUncompressedLength; |
795 | if (encodeSize()) { |
796 | actualUncompressedLength = decodeVarintFromCursor(cursor); |
797 | if (uncompressedLength && *uncompressedLength != actualUncompressedLength) { |
798 | throw std::runtime_error("LZ4Codec: invalid uncompressed length" ); |
799 | } |
800 | } else { |
801 | // Invariants |
802 | DCHECK(uncompressedLength.hasValue()); |
803 | DCHECK(*uncompressedLength <= maxUncompressedLength()); |
804 | actualUncompressedLength = *uncompressedLength; |
805 | } |
806 | |
807 | auto sp = StringPiece{cursor.peekBytes()}; |
808 | auto out = IOBuf::create(actualUncompressedLength); |
809 | int n = LZ4_decompress_safe( |
810 | sp.data(), |
811 | reinterpret_cast<char*>(out->writableTail()), |
812 | sp.size(), |
813 | actualUncompressedLength); |
814 | |
815 | if (n < 0 || uint64_t(n) != actualUncompressedLength) { |
816 | throw std::runtime_error( |
817 | to<std::string>("LZ4 decompression returned invalid value " , n)); |
818 | } |
819 | out->append(actualUncompressedLength); |
820 | return out; |
821 | } |
822 | |
823 | #if LZ4_VERSION_NUMBER >= 10301 |
824 | |
/**
 * Codec for the LZ4 frame format (CodecType::LZ4_FRAME), built on lz4frame.h.
 */
class LZ4FrameCodec final : public Codec {
 public:
  static std::unique_ptr<Codec> create(int level, CodecType type);
  // `level` is mapped by lz4fConvertLevel().
  explicit LZ4FrameCodec(int level, CodecType type);
  // Frees whichever LZ4F contexts were created.
  ~LZ4FrameCodec() override;

  // Frames are recognized by the LZ4 frame magic number.
  std::vector<std::string> validPrefixes() const override;
  bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
      const override;

 private:
  uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;

  std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
  std::unique_ptr<IOBuf> doUncompress(
      const IOBuf* data,
      Optional<uint64_t> uncompressedLength) override;

  // Reset the dctx_ if it is dirty or null.
  void resetDCtx();

  // LZ4F compression level, already converted by lz4fConvertLevel().
  int level_;
#ifdef FOLLY_USE_LZ4_FAST_RESET
  // Compression context reused across calls; created on first compression.
  LZ4F_compressionContext_t cctx_{nullptr};
#endif
  // Decompression context; (re)created by resetDCtx().
  LZ4F_decompressionContext_t dctx_{nullptr};
  // When true, resetDCtx() recreates dctx_. NOTE(review): presumably set by
  // doUncompress() after an error (not visible here) — confirm.
  bool dirty_{false};
};
853 | |
854 | /* static */ std::unique_ptr<Codec> LZ4FrameCodec::create( |
855 | int level, |
856 | CodecType type) { |
857 | return std::make_unique<LZ4FrameCodec>(level, type); |
858 | } |
859 | |
860 | static constexpr uint32_t kLZ4FrameMagicLE = 0x184D2204; |
861 | |
862 | std::vector<std::string> LZ4FrameCodec::validPrefixes() const { |
863 | return {prefixToStringLE(kLZ4FrameMagicLE)}; |
864 | } |
865 | |
866 | bool LZ4FrameCodec::canUncompress(const IOBuf* data, Optional<uint64_t>) const { |
867 | return dataStartsWithLE(data, kLZ4FrameMagicLE); |
868 | } |
869 | |
870 | uint64_t LZ4FrameCodec::doMaxCompressedLength( |
871 | uint64_t uncompressedLength) const { |
872 | LZ4F_preferences_t prefs{}; |
873 | prefs.compressionLevel = level_; |
874 | prefs.frameInfo.contentSize = uncompressedLength; |
875 | return LZ4F_compressFrameBound(uncompressedLength, &prefs); |
876 | } |
877 | |
878 | static size_t lz4FrameThrowOnError(size_t code) { |
879 | if (LZ4F_isError(code)) { |
880 | throw std::runtime_error( |
881 | to<std::string>("LZ4Frame error: " , LZ4F_getErrorName(code))); |
882 | } |
883 | return code; |
884 | } |
885 | |
886 | void LZ4FrameCodec::resetDCtx() { |
887 | if (dctx_ && !dirty_) { |
888 | return; |
889 | } |
890 | if (dctx_) { |
891 | LZ4F_freeDecompressionContext(dctx_); |
892 | } |
893 | lz4FrameThrowOnError(LZ4F_createDecompressionContext(&dctx_, 100)); |
894 | dirty_ = false; |
895 | } |
896 | |
897 | static int lz4fConvertLevel(int level) { |
898 | switch (level) { |
899 | case COMPRESSION_LEVEL_FASTEST: |
900 | case COMPRESSION_LEVEL_DEFAULT: |
901 | return 0; |
902 | case COMPRESSION_LEVEL_BEST: |
903 | return 16; |
904 | } |
905 | return level; |
906 | } |
907 | |
908 | LZ4FrameCodec::LZ4FrameCodec(int level, CodecType type) |
909 | : Codec(type, lz4fConvertLevel(level)), level_(lz4fConvertLevel(level)) { |
910 | DCHECK(type == CodecType::LZ4_FRAME); |
911 | } |
912 | |
913 | LZ4FrameCodec::~LZ4FrameCodec() { |
914 | if (dctx_) { |
915 | LZ4F_freeDecompressionContext(dctx_); |
916 | } |
917 | #ifdef FOLLY_USE_LZ4_FAST_RESET |
918 | if (cctx_) { |
919 | LZ4F_freeCompressionContext(cctx_); |
920 | } |
921 | #endif |
922 | } |
923 | |
/**
 * Compresses `data` into a single LZ4 frame at level_, recording the content
 * size in the frame preferences.
 *
 * The output buffer is sized by maxCompressedLength(); LZ4F errors are turned
 * into std::runtime_error by lz4FrameThrowOnError(). With
 * FOLLY_USE_LZ4_FAST_RESET the compression context cctx_ is created once and
 * reused across calls.
 */
std::unique_ptr<IOBuf> LZ4FrameCodec::doCompress(const IOBuf* data) {
  // The one-shot LZ4F frame API used here needs contiguous input, so coalesce
  IOBuf clone;
  if (data->isChained()) {
    clone = data->cloneCoalescedAsValue();
    data = &clone;
  }

#ifdef FOLLY_USE_LZ4_FAST_RESET
  // Create the reusable compression context on first use.
  if (!cctx_) {
    lz4FrameThrowOnError(LZ4F_createCompressionContext(&cctx_, LZ4F_VERSION));
  }
#endif

  // Set preferences
  const auto uncompressedLength = data->length();
  LZ4F_preferences_t prefs{};
  prefs.compressionLevel = level_;
  prefs.frameInfo.contentSize = uncompressedLength;
  // Compress
  auto buf = IOBuf::create(maxCompressedLength(uncompressedLength));
  const size_t written = lz4FrameThrowOnError(
#ifdef FOLLY_USE_LZ4_FAST_RESET
      LZ4F_compressFrame_usingCDict(
          cctx_,
          buf->writableTail(),
          buf->tailroom(),
          data->data(),
          data->length(),
          nullptr,
          &prefs)
#else
      LZ4F_compressFrame(
          buf->writableTail(),
          buf->tailroom(),
          data->data(),
          data->length(),
          &prefs)
#endif
  );
  buf->append(written);
  return buf;
}
967 | |
968 | std::unique_ptr<IOBuf> LZ4FrameCodec::doUncompress( |
969 | const IOBuf* data, |
970 | Optional<uint64_t> uncompressedLength) { |
971 | // Reset the dctx if any errors have occurred |
972 | resetDCtx(); |
973 | // Coalesce the data |
974 | ByteRange in = *data->begin(); |
975 | IOBuf clone; |
976 | if (data->isChained()) { |
977 | clone = data->cloneCoalescedAsValue(); |
978 | in = clone.coalesce(); |
979 | } |
980 | data = nullptr; |
981 | // Select decompression options |
982 | LZ4F_decompressOptions_t options; |
983 | options.stableDst = 1; |
984 | // Select blockSize and growthSize for the IOBufQueue |
985 | IOBufQueue queue(IOBufQueue::cacheChainLength()); |
986 | auto blockSize = uint64_t{64} << 10; |
987 | auto growthSize = uint64_t{4} << 20; |
988 | if (uncompressedLength) { |
989 | // Allocate uncompressedLength in one chunk (up to 64 MB) |
990 | const auto allocateSize = std::min(*uncompressedLength, uint64_t{64} << 20); |
991 | queue.preallocate(allocateSize, allocateSize); |
992 | blockSize = std::min(*uncompressedLength, blockSize); |
993 | growthSize = std::min(*uncompressedLength, growthSize); |
994 | } else { |
995 | // Reduce growthSize for small data |
996 | const auto guessUncompressedLen = |
997 | 4 * std::max<uint64_t>(blockSize, in.size()); |
998 | growthSize = std::min(guessUncompressedLen, growthSize); |
999 | } |
1000 | // Once LZ4_decompress() is called, the dctx_ cannot be reused until it |
1001 | // returns 0 |
1002 | dirty_ = true; |
1003 | // Decompress until the frame is over |
1004 | size_t code = 0; |
1005 | do { |
1006 | // Allocate enough space to decompress at least a block |
1007 | void* out; |
1008 | size_t outSize; |
1009 | std::tie(out, outSize) = queue.preallocate(blockSize, growthSize); |
1010 | // Decompress |
1011 | size_t inSize = in.size(); |
1012 | code = lz4FrameThrowOnError( |
1013 | LZ4F_decompress(dctx_, out, &outSize, in.data(), &inSize, &options)); |
1014 | if (in.empty() && outSize == 0 && code != 0) { |
1015 | // We passed no input, no output was produced, and the frame isn't over |
1016 | // No more forward progress is possible |
1017 | throw std::runtime_error("LZ4Frame error: Incomplete frame" ); |
1018 | } |
1019 | in.uncheckedAdvance(inSize); |
1020 | queue.postallocate(outSize); |
1021 | } while (code != 0); |
1022 | // At this point the decompression context can be reused |
1023 | dirty_ = false; |
1024 | if (uncompressedLength && queue.chainLength() != *uncompressedLength) { |
1025 | throw std::runtime_error("LZ4Frame error: Invalid uncompressedLength" ); |
1026 | } |
1027 | return queue.move(); |
1028 | } |
1029 | |
1030 | #endif // LZ4_VERSION_NUMBER >= 10301 |
1031 | #endif // FOLLY_HAVE_LIBLZ4 |
1032 | |
1033 | #if FOLLY_HAVE_LIBSNAPPY |
1034 | |
1035 | /** |
1036 | * Snappy compression |
1037 | */ |
1038 | |
1039 | /** |
1040 | * Implementation of snappy::Source that reads from a IOBuf chain. |
1041 | */ |
1042 | class IOBufSnappySource final : public snappy::Source { |
1043 | public: |
1044 | explicit IOBufSnappySource(const IOBuf* data); |
1045 | size_t Available() const override; |
1046 | const char* Peek(size_t* len) override; |
1047 | void Skip(size_t n) override; |
1048 | |
1049 | private: |
1050 | size_t available_; |
1051 | io::Cursor cursor_; |
1052 | }; |
1053 | |
1054 | IOBufSnappySource::IOBufSnappySource(const IOBuf* data) |
1055 | : available_(data->computeChainDataLength()), cursor_(data) {} |
1056 | |
1057 | size_t IOBufSnappySource::Available() const { |
1058 | return available_; |
1059 | } |
1060 | |
1061 | const char* IOBufSnappySource::Peek(size_t* len) { |
1062 | auto sp = StringPiece{cursor_.peekBytes()}; |
1063 | *len = sp.size(); |
1064 | return sp.data(); |
1065 | } |
1066 | |
1067 | void IOBufSnappySource::Skip(size_t n) { |
1068 | CHECK_LE(n, available_); |
1069 | cursor_.skip(n); |
1070 | available_ -= n; |
1071 | } |
1072 | |
/**
 * Snappy compression codec (CodecType::SNAPPY).
 *
 * Snappy has a single compression level. The constructor accepts the
 * symbolic COMPRESSION_LEVEL_* values or 1 and rejects anything else. Lengths
 * are bounded by uint32_t because snappy.h uses uint32_t for them.
 */
class SnappyCodec final : public Codec {
 public:
  static std::unique_ptr<Codec> create(int level, CodecType type);
  explicit SnappyCodec(int level, CodecType type);

 private:
  uint64_t doMaxUncompressedLength() const override;
  uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
  std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
  std::unique_ptr<IOBuf> doUncompress(
      const IOBuf* data,
      Optional<uint64_t> uncompressedLength) override;
};
1086 | |
1087 | std::unique_ptr<Codec> SnappyCodec::create(int level, CodecType type) { |
1088 | return std::make_unique<SnappyCodec>(level, type); |
1089 | } |
1090 | |
1091 | SnappyCodec::SnappyCodec(int level, CodecType type) : Codec(type) { |
1092 | DCHECK(type == CodecType::SNAPPY); |
1093 | switch (level) { |
1094 | case COMPRESSION_LEVEL_FASTEST: |
1095 | case COMPRESSION_LEVEL_DEFAULT: |
1096 | case COMPRESSION_LEVEL_BEST: |
1097 | level = 1; |
1098 | } |
1099 | if (level != 1) { |
1100 | throw std::invalid_argument( |
1101 | to<std::string>("SnappyCodec: invalid level: " , level)); |
1102 | } |
1103 | } |
1104 | |
1105 | uint64_t SnappyCodec::doMaxUncompressedLength() const { |
1106 | // snappy.h uses uint32_t for lengths, so there's that. |
1107 | return std::numeric_limits<uint32_t>::max(); |
1108 | } |
1109 | |
1110 | uint64_t SnappyCodec::doMaxCompressedLength(uint64_t uncompressedLength) const { |
1111 | return snappy::MaxCompressedLength(uncompressedLength); |
1112 | } |
1113 | |
1114 | std::unique_ptr<IOBuf> SnappyCodec::doCompress(const IOBuf* data) { |
1115 | IOBufSnappySource source(data); |
1116 | auto out = IOBuf::create(maxCompressedLength(source.Available())); |
1117 | |
1118 | snappy::UncheckedByteArraySink sink( |
1119 | reinterpret_cast<char*>(out->writableTail())); |
1120 | |
1121 | size_t n = snappy::Compress(&source, &sink); |
1122 | |
1123 | CHECK_LE(n, out->capacity()); |
1124 | out->append(n); |
1125 | return out; |
1126 | } |
1127 | |
1128 | std::unique_ptr<IOBuf> SnappyCodec::doUncompress( |
1129 | const IOBuf* data, |
1130 | Optional<uint64_t> uncompressedLength) { |
1131 | uint32_t actualUncompressedLength = 0; |
1132 | |
1133 | { |
1134 | IOBufSnappySource source(data); |
1135 | if (!snappy::GetUncompressedLength(&source, &actualUncompressedLength)) { |
1136 | throw std::runtime_error("snappy::GetUncompressedLength failed" ); |
1137 | } |
1138 | if (uncompressedLength && *uncompressedLength != actualUncompressedLength) { |
1139 | throw std::runtime_error("snappy: invalid uncompressed length" ); |
1140 | } |
1141 | } |
1142 | |
1143 | auto out = IOBuf::create(actualUncompressedLength); |
1144 | |
1145 | { |
1146 | IOBufSnappySource source(data); |
1147 | if (!snappy::RawUncompress( |
1148 | &source, reinterpret_cast<char*>(out->writableTail()))) { |
1149 | throw std::runtime_error("snappy::RawUncompress failed" ); |
1150 | } |
1151 | } |
1152 | |
1153 | out->append(actualUncompressedLength); |
1154 | return out; |
1155 | } |
1156 | |
1157 | #endif // FOLLY_HAVE_LIBSNAPPY |
1158 | |
1159 | #if FOLLY_HAVE_LIBLZMA |
1160 | |
1161 | /** |
1162 | * LZMA2 compression |
1163 | */ |
1164 | class LZMA2StreamCodec final : public StreamCodec { |
1165 | public: |
1166 | static std::unique_ptr<Codec> createCodec(int level, CodecType type); |
1167 | static std::unique_ptr<StreamCodec> createStream(int level, CodecType type); |
1168 | explicit LZMA2StreamCodec(int level, CodecType type); |
1169 | ~LZMA2StreamCodec() override; |
1170 | |
1171 | std::vector<std::string> validPrefixes() const override; |
1172 | bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength) |
1173 | const override; |
1174 | |
1175 | private: |
1176 | bool doNeedsDataLength() const override; |
1177 | uint64_t doMaxUncompressedLength() const override; |
1178 | uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override; |
1179 | |
1180 | bool encodeSize() const { |
1181 | return type() == CodecType::LZMA2_VARINT_SIZE; |
1182 | } |
1183 | |
1184 | void doResetStream() override; |
1185 | bool doCompressStream( |
1186 | ByteRange& input, |
1187 | MutableByteRange& output, |
1188 | StreamCodec::FlushOp flushOp) override; |
1189 | bool doUncompressStream( |
1190 | ByteRange& input, |
1191 | MutableByteRange& output, |
1192 | StreamCodec::FlushOp flushOp) override; |
1193 | |
1194 | void resetCStream(); |
1195 | void resetDStream(); |
1196 | |
1197 | bool decodeAndCheckVarint(ByteRange& input); |
1198 | bool flushVarintBuffer(MutableByteRange& output); |
1199 | void resetVarintBuffer(); |
1200 | |
1201 | Optional<lzma_stream> cstream_{}; |
1202 | Optional<lzma_stream> dstream_{}; |
1203 | |
1204 | std::array<uint8_t, kMaxVarintLength64> varintBuffer_; |
1205 | ByteRange varintToEncode_; |
1206 | size_t varintBufferPos_{0}; |
1207 | |
1208 | int level_; |
1209 | bool needReset_{true}; |
1210 | bool needDecodeSize_{false}; |
1211 | }; |
1212 | |
// xz stream header magic: bytes FD 37 7A 58 5A 00 ("\xFD7zXZ\0"), stored as
// a little-endian integer together with its length in bytes.
static constexpr auto kLZMA2MagicLE = uint64_t{0x005A587A37FD};
static constexpr auto kLZMA2MagicBytes = 6u;
1215 | |
1216 | std::vector<std::string> LZMA2StreamCodec::validPrefixes() const { |
1217 | if (type() == CodecType::LZMA2_VARINT_SIZE) { |
1218 | return {}; |
1219 | } |
1220 | return {prefixToStringLE(kLZMA2MagicLE, kLZMA2MagicBytes)}; |
1221 | } |
1222 | |
1223 | bool LZMA2StreamCodec::doNeedsDataLength() const { |
1224 | return encodeSize(); |
1225 | } |
1226 | |
// Reports whether `data` looks like a plain LZMA2 (xz) stream, by checking for
// the magic prefix. The varint-size variant has no recognizable prefix, so it
// never claims the data.
bool LZMA2StreamCodec::canUncompress(const IOBuf* data, Optional<uint64_t>)
    const {
  if (type() == CodecType::LZMA2_VARINT_SIZE) {
    return false;
  }
  // Returns false for all inputs shorter than kLZMA2MagicBytes (6) bytes.
  // This is okay, because every valid stream begins with the full magic.
  return dataStartsWithLE(data, kLZMA2MagicLE, kLZMA2MagicBytes);
}
1236 | |
1237 | std::unique_ptr<Codec> LZMA2StreamCodec::createCodec( |
1238 | int level, |
1239 | CodecType type) { |
1240 | return make_unique<LZMA2StreamCodec>(level, type); |
1241 | } |
1242 | |
1243 | std::unique_ptr<StreamCodec> LZMA2StreamCodec::createStream( |
1244 | int level, |
1245 | CodecType type) { |
1246 | return make_unique<LZMA2StreamCodec>(level, type); |
1247 | } |
1248 | |
1249 | LZMA2StreamCodec::LZMA2StreamCodec(int level, CodecType type) |
1250 | : StreamCodec(type) { |
1251 | DCHECK(type == CodecType::LZMA2 || type == CodecType::LZMA2_VARINT_SIZE); |
1252 | switch (level) { |
1253 | case COMPRESSION_LEVEL_FASTEST: |
1254 | level = 0; |
1255 | break; |
1256 | case COMPRESSION_LEVEL_DEFAULT: |
1257 | level = LZMA_PRESET_DEFAULT; |
1258 | break; |
1259 | case COMPRESSION_LEVEL_BEST: |
1260 | level = 9; |
1261 | break; |
1262 | } |
1263 | if (level < 0 || level > 9) { |
1264 | throw std::invalid_argument( |
1265 | to<std::string>("LZMA2Codec: invalid level: " , level)); |
1266 | } |
1267 | level_ = level; |
1268 | } |
1269 | |
1270 | LZMA2StreamCodec::~LZMA2StreamCodec() { |
1271 | if (cstream_) { |
1272 | lzma_end(cstream_.get_pointer()); |
1273 | cstream_.clear(); |
1274 | } |
1275 | if (dstream_) { |
1276 | lzma_end(dstream_.get_pointer()); |
1277 | dstream_.clear(); |
1278 | } |
1279 | } |
1280 | |
1281 | uint64_t LZMA2StreamCodec::doMaxUncompressedLength() const { |
1282 | // From lzma/base.h: "Stream is roughly 8 EiB (2^63 bytes)" |
1283 | return uint64_t(1) << 63; |
1284 | } |
1285 | |
1286 | uint64_t LZMA2StreamCodec::doMaxCompressedLength( |
1287 | uint64_t uncompressedLength) const { |
1288 | return lzma_stream_buffer_bound(uncompressedLength) + |
1289 | (encodeSize() ? kMaxVarintLength64 : 0); |
1290 | } |
1291 | |
1292 | void LZMA2StreamCodec::doResetStream() { |
1293 | needReset_ = true; |
1294 | } |
1295 | |
1296 | void LZMA2StreamCodec::resetCStream() { |
1297 | if (!cstream_) { |
1298 | cstream_.assign(LZMA_STREAM_INIT); |
1299 | } |
1300 | lzma_ret const rc = |
1301 | lzma_easy_encoder(cstream_.get_pointer(), level_, LZMA_CHECK_NONE); |
1302 | if (rc != LZMA_OK) { |
1303 | throw std::runtime_error(folly::to<std::string>( |
1304 | "LZMA2StreamCodec: lzma_easy_encoder error: " , rc)); |
1305 | } |
1306 | } |
1307 | |
1308 | void LZMA2StreamCodec::resetDStream() { |
1309 | if (!dstream_) { |
1310 | dstream_.assign(LZMA_STREAM_INIT); |
1311 | } |
1312 | lzma_ret const rc = lzma_auto_decoder( |
1313 | dstream_.get_pointer(), std::numeric_limits<uint64_t>::max(), 0); |
1314 | if (rc != LZMA_OK) { |
1315 | throw std::runtime_error(folly::to<std::string>( |
1316 | "LZMA2StreamCodec: lzma_auto_decoder error: " , rc)); |
1317 | } |
1318 | } |
1319 | |
1320 | static lzma_ret lzmaThrowOnError(lzma_ret const rc) { |
1321 | switch (rc) { |
1322 | case LZMA_OK: |
1323 | case LZMA_STREAM_END: |
1324 | case LZMA_BUF_ERROR: // not fatal: returned if no progress was made twice |
1325 | return rc; |
1326 | default: |
1327 | throw std::runtime_error( |
1328 | to<std::string>("LZMA2StreamCodec: error: " , rc)); |
1329 | } |
1330 | } |
1331 | |
1332 | static lzma_action lzmaTranslateFlush(StreamCodec::FlushOp flush) { |
1333 | switch (flush) { |
1334 | case StreamCodec::FlushOp::NONE: |
1335 | return LZMA_RUN; |
1336 | case StreamCodec::FlushOp::FLUSH: |
1337 | return LZMA_SYNC_FLUSH; |
1338 | case StreamCodec::FlushOp::END: |
1339 | return LZMA_FINISH; |
1340 | default: |
1341 | throw std::invalid_argument("LZMA2StreamCodec: Invalid flush" ); |
1342 | } |
1343 | } |
1344 | |
1345 | /** |
1346 | * Flushes the varint buffer. |
1347 | * Advances output by the number of bytes written. |
1348 | * Returns true when flushing is complete. |
1349 | */ |
1350 | bool LZMA2StreamCodec::flushVarintBuffer(MutableByteRange& output) { |
1351 | if (varintToEncode_.empty()) { |
1352 | return true; |
1353 | } |
1354 | const size_t numBytesToCopy = std::min(varintToEncode_.size(), output.size()); |
1355 | if (numBytesToCopy > 0) { |
1356 | memcpy(output.data(), varintToEncode_.data(), numBytesToCopy); |
1357 | } |
1358 | varintToEncode_.advance(numBytesToCopy); |
1359 | output.advance(numBytesToCopy); |
1360 | return varintToEncode_.empty(); |
1361 | } |
1362 | |
/**
 * Compresses `input` into `output` with the xz encoder.
 *
 * On the first call after a reset, the encoder is reinitialized. For
 * LZMA2_VARINT_SIZE, the uncompressed length is also varint-encoded so that it
 * is written before any compressed bytes. Both ranges are advanced past the
 * bytes consumed/produced; this happens in SCOPE_EXIT, so it also happens when
 * lzma_code() reports an error.
 *
 * @return false while the varint prefix is still being flushed. Otherwise:
 *         NONE always returns false; FLUSH returns true once all input is
 *         consumed and output space remains; END returns true once
 *         lzma_code() reports LZMA_STREAM_END.
 * @throws std::runtime_error on liblzma errors, std::invalid_argument on an
 *         unknown FlushOp.
 */
bool LZMA2StreamCodec::doCompressStream(
    ByteRange& input,
    MutableByteRange& output,
    StreamCodec::FlushOp flushOp) {
  if (needReset_) {
    resetCStream();
    if (encodeSize()) {
      varintBufferPos_ = 0;
      // The base class supplies the length because doNeedsDataLength() is
      // true for this variant.
      size_t const varintSize =
          encodeVarint(*uncompressedLength(), varintBuffer_.data());
      varintToEncode_ = {varintBuffer_.data(), varintSize};
    }
    needReset_ = false;
  }

  // The length prefix must be fully written before any compressed bytes.
  if (!flushVarintBuffer(output)) {
    return false;
  }

  cstream_->next_in = const_cast<uint8_t*>(input.data());
  cstream_->avail_in = input.size();
  cstream_->next_out = output.data();
  cstream_->avail_out = output.size();
  // Report progress back to the caller however lzma_code() exits.
  SCOPE_EXIT {
    input.uncheckedAdvance(input.size() - cstream_->avail_in);
    output.uncheckedAdvance(output.size() - cstream_->avail_out);
  };
  lzma_ret const rc = lzmaThrowOnError(
      lzma_code(cstream_.get_pointer(), lzmaTranslateFlush(flushOp)));
  switch (flushOp) {
    case StreamCodec::FlushOp::NONE:
      return false;
    case StreamCodec::FlushOp::FLUSH:
      // Flush is done when all input is consumed and output still has space.
      return cstream_->avail_in == 0 && cstream_->avail_out != 0;
    case StreamCodec::FlushOp::END:
      return rc == LZMA_STREAM_END;
    default:
      throw std::invalid_argument("LZMA2StreamCodec: invalid FlushOp");
  }
}
1403 | |
1404 | /** |
1405 | * Attempts to decode a varint from input. |
1406 | * The function advances input by the number of bytes read. |
1407 | * |
1408 | * If there are too many bytes and the varint is not valid, throw a |
1409 | * runtime_error. |
1410 | * |
1411 | * If the uncompressed length was provided and a decoded varint does not match |
1412 | * the provided length, throw a runtime_error. |
1413 | * |
1414 | * Returns true if the varint was successfully decoded and matches the |
1415 | * uncompressed length if provided, and false if more bytes are needed. |
1416 | */ |
1417 | bool LZMA2StreamCodec::decodeAndCheckVarint(ByteRange& input) { |
1418 | if (input.empty()) { |
1419 | return false; |
1420 | } |
1421 | size_t const numBytesToCopy = |
1422 | std::min(kMaxVarintLength64 - varintBufferPos_, input.size()); |
1423 | memcpy(varintBuffer_.data() + varintBufferPos_, input.data(), numBytesToCopy); |
1424 | |
1425 | size_t const rangeSize = varintBufferPos_ + numBytesToCopy; |
1426 | ByteRange range{varintBuffer_.data(), rangeSize}; |
1427 | auto const ret = tryDecodeVarint(range); |
1428 | |
1429 | if (ret.hasValue()) { |
1430 | size_t const varintSize = rangeSize - range.size(); |
1431 | input.advance(varintSize - varintBufferPos_); |
1432 | if (uncompressedLength() && *uncompressedLength() != ret.value()) { |
1433 | throw std::runtime_error("LZMA2StreamCodec: invalid uncompressed length" ); |
1434 | } |
1435 | return true; |
1436 | } else if (ret.error() == DecodeVarintError::TooManyBytes) { |
1437 | throw std::runtime_error("LZMA2StreamCodec: invalid uncompressed length" ); |
1438 | } else { |
1439 | // Too few bytes |
1440 | input.advance(numBytesToCopy); |
1441 | varintBufferPos_ += numBytesToCopy; |
1442 | return false; |
1443 | } |
1444 | } |
1445 | |
/**
 * Decompresses `input` into `output` with the xz decoder.
 *
 * On the first call after a reset, the decoder is reinitialized. For
 * LZMA2_VARINT_SIZE, the varint length prefix is consumed (and checked)
 * before any data reaches liblzma. FLUSH is treated like NONE (LZMA_RUN);
 * END uses LZMA_FINISH. Both ranges are advanced past the bytes
 * consumed/produced, also when an error is thrown after lzma_code() ran.
 *
 * @return true once lzma_code() reports LZMA_STREAM_END.
 * @throws std::runtime_error on liblzma errors or a bad length prefix,
 *         std::invalid_argument on an unknown FlushOp.
 */
bool LZMA2StreamCodec::doUncompressStream(
    ByteRange& input,
    MutableByteRange& output,
    StreamCodec::FlushOp flushOp) {
  if (needReset_) {
    resetDStream();
    needReset_ = false;
    needDecodeSize_ = encodeSize();
    if (encodeSize()) {
      // Reset buffer
      varintBufferPos_ = 0;
    }
  }

  if (needDecodeSize_) {
    // Try decoding the varint. If the input does not contain the entire varint,
    // buffer the input. If the varint can not be decoded, fail.
    if (!decodeAndCheckVarint(input)) {
      return false;
    }
    needDecodeSize_ = false;
  }

  dstream_->next_in = const_cast<uint8_t*>(input.data());
  dstream_->avail_in = input.size();
  dstream_->next_out = output.data();
  dstream_->avail_out = output.size();
  // Report progress back to the caller however lzma_code() exits.
  SCOPE_EXIT {
    input.advance(input.size() - dstream_->avail_in);
    output.advance(output.size() - dstream_->avail_out);
  };

  lzma_ret rc;
  switch (flushOp) {
    case StreamCodec::FlushOp::NONE:
    case StreamCodec::FlushOp::FLUSH:
      rc = lzmaThrowOnError(lzma_code(dstream_.get_pointer(), LZMA_RUN));
      break;
    case StreamCodec::FlushOp::END:
      rc = lzmaThrowOnError(lzma_code(dstream_.get_pointer(), LZMA_FINISH));
      break;
    default:
      throw std::invalid_argument("LZMA2StreamCodec: invalid flush");
  }
  return rc == LZMA_STREAM_END;
}
1492 | #endif // FOLLY_HAVE_LIBLZMA |
1493 | |
1494 | #if FOLLY_HAVE_LIBZSTD |
1495 | |
1496 | static int zstdConvertLevel(int level) { |
1497 | switch (level) { |
1498 | case COMPRESSION_LEVEL_FASTEST: |
1499 | return 1; |
1500 | case COMPRESSION_LEVEL_DEFAULT: |
1501 | return 1; |
1502 | case COMPRESSION_LEVEL_BEST: |
1503 | return 19; |
1504 | } |
1505 | if (level < 1 || level > ZSTD_maxCLevel()) { |
1506 | throw std::invalid_argument( |
1507 | to<std::string>("ZSTD: invalid level: " , level)); |
1508 | } |
1509 | return level; |
1510 | } |
1511 | |
1512 | static int zstdFastConvertLevel(int level) { |
1513 | switch (level) { |
1514 | case COMPRESSION_LEVEL_FASTEST: |
1515 | return -5; |
1516 | case COMPRESSION_LEVEL_DEFAULT: |
1517 | return -1; |
1518 | case COMPRESSION_LEVEL_BEST: |
1519 | return -1; |
1520 | } |
1521 | if (level < 1) { |
1522 | throw std::invalid_argument( |
1523 | to<std::string>("ZSTD: invalid level: " , level)); |
1524 | } |
1525 | return -level; |
1526 | } |
1527 | |
// One-shot zstd codec; `level` is normalized by zstdConvertLevel().
std::unique_ptr<Codec> getZstdCodec(int level, CodecType type) {
  DCHECK(type == CodecType::ZSTD);
  return zstd::getCodec(zstd::Options(zstdConvertLevel(level)));
}

// Streaming zstd codec; `level` is normalized by zstdConvertLevel().
std::unique_ptr<StreamCodec> getZstdStreamCodec(int level, CodecType type) {
  DCHECK(type == CodecType::ZSTD);
  return zstd::getStreamCodec(zstd::Options(zstdConvertLevel(level)));
}

// One-shot ZSTD_FAST codec; `level` becomes a negative zstd level via
// zstdFastConvertLevel().
std::unique_ptr<Codec> getZstdFastCodec(int level, CodecType type) {
  DCHECK(type == CodecType::ZSTD_FAST);
  return zstd::getCodec(zstd::Options(zstdFastConvertLevel(level)));
}

// Streaming ZSTD_FAST codec; `level` becomes a negative zstd level via
// zstdFastConvertLevel().
std::unique_ptr<StreamCodec> getZstdFastStreamCodec(int level, CodecType type) {
  DCHECK(type == CodecType::ZSTD_FAST);
  return zstd::getStreamCodec(zstd::Options(zstdFastConvertLevel(level)));
}
1547 | |
1548 | #endif // FOLLY_HAVE_LIBZSTD |
1549 | |
1550 | #if FOLLY_HAVE_LIBBZ2 |
1551 | |
/**
 * bzip2 compression, exposed both as a Codec and as a StreamCodec
 * (CodecType::BZIP2). FlushOp::FLUSH is not supported.
 */
class Bzip2StreamCodec final : public StreamCodec {
 public:
  static std::unique_ptr<Codec> createCodec(int level, CodecType type);
  static std::unique_ptr<StreamCodec> createStream(int level, CodecType type);
  explicit Bzip2StreamCodec(int level, CodecType type);

  ~Bzip2StreamCodec() override;

  std::vector<std::string> validPrefixes() const override;
  bool canUncompress(IOBuf const* data, Optional<uint64_t> uncompressedLength)
      const override;

 private:
  uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;

  void doResetStream() override;
  bool doCompressStream(
      ByteRange& input,
      MutableByteRange& output,
      StreamCodec::FlushOp flushOp) override;
  bool doUncompressStream(
      ByteRange& input,
      MutableByteRange& output,
      StreamCodec::FlushOp flushOp) override;

  void resetCStream();
  void resetDStream();

  // libbzip2 streams, (re)created by resetCStream() / resetDStream().
  Optional<bz_stream> cstream_{};
  Optional<bz_stream> dstream_{};

  // bzip2 compression level, validated to [1, 9] by the constructor.
  int level_;
  // Set by doResetStream(); the next (un)compress call recreates the
  // corresponding stream.
  bool needReset_{true};
};
1586 | |
1587 | /* static */ std::unique_ptr<Codec> Bzip2StreamCodec::createCodec( |
1588 | int level, |
1589 | CodecType type) { |
1590 | return createStream(level, type); |
1591 | } |
1592 | |
1593 | /* static */ std::unique_ptr<StreamCodec> Bzip2StreamCodec::createStream( |
1594 | int level, |
1595 | CodecType type) { |
1596 | return std::make_unique<Bzip2StreamCodec>(level, type); |
1597 | } |
1598 | |
1599 | Bzip2StreamCodec::Bzip2StreamCodec(int level, CodecType type) |
1600 | : StreamCodec(type) { |
1601 | DCHECK(type == CodecType::BZIP2); |
1602 | switch (level) { |
1603 | case COMPRESSION_LEVEL_FASTEST: |
1604 | level = 1; |
1605 | break; |
1606 | case COMPRESSION_LEVEL_DEFAULT: |
1607 | level = 9; |
1608 | break; |
1609 | case COMPRESSION_LEVEL_BEST: |
1610 | level = 9; |
1611 | break; |
1612 | } |
1613 | if (level < 1 || level > 9) { |
1614 | throw std::invalid_argument( |
1615 | to<std::string>("Bzip2: invalid level: " , level)); |
1616 | } |
1617 | level_ = level; |
1618 | } |
1619 | |
// bzip2 stream signature "BZh" (bytes 42 5A 68), stored as a little-endian
// integer together with its length in bytes.
static constexpr auto kBzip2MagicLE = uint32_t{0x685A42};
static constexpr auto kBzip2MagicBytes = uint64_t{3};
1622 | |
1623 | std::vector<std::string> Bzip2StreamCodec::validPrefixes() const { |
1624 | return {prefixToStringLE(kBzip2MagicLE, kBzip2MagicBytes)}; |
1625 | } |
1626 | |
1627 | bool Bzip2StreamCodec::canUncompress(IOBuf const* data, Optional<uint64_t>) |
1628 | const { |
1629 | return dataStartsWithLE(data, kBzip2MagicLE, kBzip2MagicBytes); |
1630 | } |
1631 | |
1632 | uint64_t Bzip2StreamCodec::doMaxCompressedLength( |
1633 | uint64_t uncompressedLength) const { |
1634 | // http://www.bzip.org/1.0.5/bzip2-manual-1.0.5.html#bzbufftobuffcompress |
1635 | // To guarantee that the compressed data will fit in its buffer, allocate an |
1636 | // output buffer of size 1% larger than the uncompressed data, plus six |
1637 | // hundred extra bytes. |
1638 | return uncompressedLength + uncompressedLength / 100 + 600; |
1639 | } |
1640 | |
1641 | static bz_stream createBzStream() { |
1642 | bz_stream stream; |
1643 | stream.bzalloc = nullptr; |
1644 | stream.bzfree = nullptr; |
1645 | stream.opaque = nullptr; |
1646 | stream.next_in = stream.next_out = nullptr; |
1647 | stream.avail_in = stream.avail_out = 0; |
1648 | return stream; |
1649 | } |
1650 | |
1651 | // Throws on error condition, otherwise returns the code. |
1652 | static int bzCheck(int const rc) { |
1653 | switch (rc) { |
1654 | case BZ_OK: |
1655 | case BZ_RUN_OK: |
1656 | case BZ_FLUSH_OK: |
1657 | case BZ_FINISH_OK: |
1658 | case BZ_STREAM_END: |
1659 | // Allow BZ_PARAM_ERROR. |
1660 | // It can get returned if no progress is made, but we handle that. |
1661 | case BZ_PARAM_ERROR: |
1662 | return rc; |
1663 | default: |
1664 | throw std::runtime_error(to<std::string>("Bzip2 error: " , rc)); |
1665 | } |
1666 | } |
1667 | |
// Ends whichever libbzip2 streams were created (compressor first, then
// decompressor). Streams that were never created are skipped.
Bzip2StreamCodec::~Bzip2StreamCodec() {
  if (cstream_) {
    BZ2_bzCompressEnd(cstream_.get_pointer());
    cstream_.clear();
  }
  if (dstream_) {
    BZ2_bzDecompressEnd(dstream_.get_pointer());
    dstream_.clear();
  }
}
1678 | |
1679 | void Bzip2StreamCodec::doResetStream() { |
1680 | needReset_ = true; |
1681 | } |
1682 | |
1683 | void Bzip2StreamCodec::resetCStream() { |
1684 | if (cstream_) { |
1685 | BZ2_bzCompressEnd(cstream_.get_pointer()); |
1686 | } |
1687 | cstream_ = createBzStream(); |
1688 | bzCheck(BZ2_bzCompressInit(cstream_.get_pointer(), level_, 0, 0)); |
1689 | } |
1690 | |
1691 | int bzip2TranslateFlush(StreamCodec::FlushOp flushOp) { |
1692 | switch (flushOp) { |
1693 | case StreamCodec::FlushOp::NONE: |
1694 | return BZ_RUN; |
1695 | case StreamCodec::FlushOp::END: |
1696 | return BZ_FINISH; |
1697 | case StreamCodec::FlushOp::FLUSH: |
1698 | throw std::invalid_argument( |
1699 | "Bzip2StreamCodec: FlushOp::FLUSH not supported" ); |
1700 | default: |
1701 | throw std::invalid_argument("Bzip2StreamCodec: Invalid flush" ); |
1702 | } |
1703 | } |
1704 | |
1705 | bool Bzip2StreamCodec::doCompressStream( |
1706 | ByteRange& input, |
1707 | MutableByteRange& output, |
1708 | StreamCodec::FlushOp flushOp) { |
1709 | if (needReset_) { |
1710 | resetCStream(); |
1711 | needReset_ = false; |
1712 | } |
1713 | if (input.empty() && output.empty()) { |
1714 | return false; |
1715 | } |
1716 | |
1717 | cstream_->next_in = |
1718 | const_cast<char*>(reinterpret_cast<const char*>(input.data())); |
1719 | cstream_->avail_in = input.size(); |
1720 | cstream_->next_out = reinterpret_cast<char*>(output.data()); |
1721 | cstream_->avail_out = output.size(); |
1722 | SCOPE_EXIT { |
1723 | input.uncheckedAdvance(input.size() - cstream_->avail_in); |
1724 | output.uncheckedAdvance(output.size() - cstream_->avail_out); |
1725 | }; |
1726 | int const rc = bzCheck( |
1727 | BZ2_bzCompress(cstream_.get_pointer(), bzip2TranslateFlush(flushOp))); |
1728 | switch (flushOp) { |
1729 | case StreamCodec::FlushOp::NONE: |
1730 | return false; |
1731 | case StreamCodec::FlushOp::FLUSH: |
1732 | if (rc == BZ_RUN_OK) { |
1733 | DCHECK_EQ(cstream_->avail_in, 0); |
1734 | DCHECK(input.size() == 0 || cstream_->avail_out != output.size()); |
1735 | return true; |
1736 | } |
1737 | return false; |
1738 | case StreamCodec::FlushOp::END: |
1739 | return rc == BZ_STREAM_END; |
1740 | default: |
1741 | throw std::invalid_argument("Bzip2StreamCodec: invalid FlushOp" ); |
1742 | } |
1743 | return false; |
1744 | } |
1745 | |
1746 | void Bzip2StreamCodec::resetDStream() { |
1747 | if (dstream_) { |
1748 | BZ2_bzDecompressEnd(dstream_.get_pointer()); |
1749 | } |
1750 | dstream_ = createBzStream(); |
1751 | bzCheck(BZ2_bzDecompressInit(dstream_.get_pointer(), 0, 0)); |
1752 | } |
1753 | |
/**
 * Feeds `input` to the bzip2 decompressor and writes decompressed bytes to
 * `output`.
 *
 * FlushOp::FLUSH is rejected before the stream is touched. NONE and END
 * behave the same. Both ranges are advanced past the bytes consumed/produced,
 * also when bzCheck() throws after BZ2_bzDecompress() ran.
 *
 * @return true once BZ2_bzDecompress() reports BZ_STREAM_END.
 * @throws std::invalid_argument for FlushOp::FLUSH, std::runtime_error on
 *         bzip2 errors.
 */
bool Bzip2StreamCodec::doUncompressStream(
    ByteRange& input,
    MutableByteRange& output,
    StreamCodec::FlushOp flushOp) {
  if (flushOp == StreamCodec::FlushOp::FLUSH) {
    throw std::invalid_argument(
        "Bzip2StreamCodec: FlushOp::FLUSH not supported");
  }
  if (needReset_) {
    resetDStream();
    needReset_ = false;
  }

  dstream_->next_in =
      const_cast<char*>(reinterpret_cast<const char*>(input.data()));
  dstream_->avail_in = input.size();
  dstream_->next_out = reinterpret_cast<char*>(output.data());
  dstream_->avail_out = output.size();
  // Report progress back to the caller however BZ2_bzDecompress() exits.
  SCOPE_EXIT {
    input.uncheckedAdvance(input.size() - dstream_->avail_in);
    output.uncheckedAdvance(output.size() - dstream_->avail_out);
  };
  int const rc = bzCheck(BZ2_bzDecompress(dstream_.get_pointer()));
  return rc == BZ_STREAM_END;
}
1779 | |
1780 | #endif // FOLLY_HAVE_LIBBZ2 |
1781 | |
1782 | #if FOLLY_HAVE_LIBZ |
1783 | |
1784 | zlib::Options getZlibOptions(CodecType type) { |
1785 | DCHECK(type == CodecType::GZIP || type == CodecType::ZLIB); |
1786 | return type == CodecType::GZIP ? zlib::defaultGzipOptions() |
1787 | : zlib::defaultZlibOptions(); |
1788 | } |
1789 | |
1790 | std::unique_ptr<Codec> getZlibCodec(int level, CodecType type) { |
1791 | return zlib::getCodec(getZlibOptions(type), level); |
1792 | } |
1793 | |
1794 | std::unique_ptr<StreamCodec> getZlibStreamCodec(int level, CodecType type) { |
1795 | return zlib::getStreamCodec(getZlibOptions(type), level); |
1796 | } |
1797 | |
1798 | #endif // FOLLY_HAVE_LIBZ |
1799 | |
1800 | /** |
1801 | * Automatic decompression |
1802 | */ |
/**
 * Decompression-only codec that dispatches to whichever candidate codec
 * recognizes the input.
 *
 * canUncompress() succeeds if any codec in codecs_ accepts the data.
 * Compression and maxCompressedLength() are not supported and always throw
 * std::runtime_error.
 */
class AutomaticCodec final : public Codec {
 public:
  static std::unique_ptr<Codec> create(
      std::vector<std::unique_ptr<Codec>> customCodecs,
      std::unique_ptr<Codec> terminalCodec);
  explicit AutomaticCodec(
      std::vector<std::unique_ptr<Codec>> customCodecs,
      std::unique_ptr<Codec> terminalCodec);

  std::vector<std::string> validPrefixes() const override;
  bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
      const override;

 private:
  bool doNeedsUncompressedLength() const override;
  uint64_t doMaxUncompressedLength() const override;

  // AutomaticCodec only decompresses.
  uint64_t doMaxCompressedLength(uint64_t) const override {
    throw std::runtime_error(
        "AutomaticCodec error: maxCompressedLength() not supported.");
  }
  std::unique_ptr<IOBuf> doCompress(const IOBuf*) override {
    throw std::runtime_error("AutomaticCodec error: compress() not supported.");
  }
  std::unique_ptr<IOBuf> doUncompress(
      const IOBuf* data,
      Optional<uint64_t> uncompressedLength) override;

  // Appends the built-in codec for `type` when it is available and not
  // already present (neither in codecs_ nor as terminalCodec_).
  void addCodecIfSupported(CodecType type);

  // Throws iff the codecs aren't compatible (very slow)
  void checkCompatibleCodecs() const;

  // Candidate codecs: the caller's custom codecs plus built-in codecs added
  // via addCodecIfSupported().
  std::vector<std::unique_ptr<Codec>> codecs_;
  // Optional extra codec supplied by the caller; addCodecIfSupported() never
  // adds another codec of the same type. NOTE(review): presumably the
  // fallback used by doUncompress() when no candidate matches. Confirm in the
  // constructor/doUncompress definitions.
  std::unique_ptr<Codec> terminalCodec_;
  // NOTE(review): presumably cached, combined answers for
  // doNeedsUncompressedLength() / doMaxUncompressedLength(), computed in the
  // constructor.
  bool needsUncompressedLength_;
  uint64_t maxUncompressedLength_;
};
1841 | |
1842 | std::vector<std::string> AutomaticCodec::validPrefixes() const { |
1843 | std::unordered_set<std::string> prefixes; |
1844 | for (const auto& codec : codecs_) { |
1845 | const auto codecPrefixes = codec->validPrefixes(); |
1846 | prefixes.insert(codecPrefixes.begin(), codecPrefixes.end()); |
1847 | } |
1848 | return std::vector<std::string>{prefixes.begin(), prefixes.end()}; |
1849 | } |
1850 | |
1851 | bool AutomaticCodec::canUncompress( |
1852 | const IOBuf* data, |
1853 | Optional<uint64_t> uncompressedLength) const { |
1854 | return std::any_of( |
1855 | codecs_.begin(), |
1856 | codecs_.end(), |
1857 | [data, uncompressedLength](std::unique_ptr<Codec> const& codec) { |
1858 | return codec->canUncompress(data, uncompressedLength); |
1859 | }); |
1860 | } |
1861 | |
1862 | void AutomaticCodec::addCodecIfSupported(CodecType type) { |
1863 | const bool present = std::any_of( |
1864 | codecs_.begin(), |
1865 | codecs_.end(), |
1866 | [&type](std::unique_ptr<Codec> const& codec) { |
1867 | return codec->type() == type; |
1868 | }); |
1869 | bool const isTerminalType = terminalCodec_ && terminalCodec_->type() == type; |
1870 | if (hasCodec(type) && !present && !isTerminalType) { |
1871 | codecs_.push_back(getCodec(type)); |
1872 | } |
1873 | } |
1874 | |
1875 | /* static */ std::unique_ptr<Codec> AutomaticCodec::create( |
1876 | std::vector<std::unique_ptr<Codec>> customCodecs, |
1877 | std::unique_ptr<Codec> terminalCodec) { |
1878 | return std::make_unique<AutomaticCodec>( |
1879 | std::move(customCodecs), std::move(terminalCodec)); |
1880 | } |
1881 | |
1882 | AutomaticCodec::AutomaticCodec( |
1883 | std::vector<std::unique_ptr<Codec>> customCodecs, |
1884 | std::unique_ptr<Codec> terminalCodec) |
1885 | : Codec(CodecType::USER_DEFINED, folly::none, "auto" ), |
1886 | codecs_(std::move(customCodecs)), |
1887 | terminalCodec_(std::move(terminalCodec)) { |
1888 | // Fastest -> slowest |
1889 | std::array<CodecType, 6> defaultTypes{{ |
1890 | CodecType::LZ4_FRAME, |
1891 | CodecType::ZSTD, |
1892 | CodecType::ZLIB, |
1893 | CodecType::GZIP, |
1894 | CodecType::LZMA2, |
1895 | CodecType::BZIP2, |
1896 | }}; |
1897 | |
1898 | for (auto type : defaultTypes) { |
1899 | addCodecIfSupported(type); |
1900 | } |
1901 | |
1902 | if (kIsDebug) { |
1903 | checkCompatibleCodecs(); |
1904 | } |
1905 | |
1906 | // Check that none of the codecs are null |
1907 | DCHECK(std::none_of( |
1908 | codecs_.begin(), codecs_.end(), [](std::unique_ptr<Codec> const& codec) { |
1909 | return codec == nullptr; |
1910 | })); |
1911 | |
1912 | // Check that the terminal codec's type is not duplicated (with the exception |
1913 | // of USER_DEFINED). |
1914 | if (terminalCodec_) { |
1915 | DCHECK(std::none_of( |
1916 | codecs_.begin(), |
1917 | codecs_.end(), |
1918 | [&](std::unique_ptr<Codec> const& codec) { |
1919 | return codec->type() != CodecType::USER_DEFINED && |
1920 | codec->type() == terminalCodec_->type(); |
1921 | })); |
1922 | } |
1923 | |
1924 | bool const terminalNeedsUncompressedLength = |
1925 | terminalCodec_ && terminalCodec_->needsUncompressedLength(); |
1926 | needsUncompressedLength_ = std::any_of( |
1927 | codecs_.begin(), |
1928 | codecs_.end(), |
1929 | [](std::unique_ptr<Codec> const& codec) { |
1930 | return codec->needsUncompressedLength(); |
1931 | }) || |
1932 | terminalNeedsUncompressedLength; |
1933 | |
1934 | const auto it = std::max_element( |
1935 | codecs_.begin(), |
1936 | codecs_.end(), |
1937 | [](std::unique_ptr<Codec> const& lhs, std::unique_ptr<Codec> const& rhs) { |
1938 | return lhs->maxUncompressedLength() < rhs->maxUncompressedLength(); |
1939 | }); |
1940 | DCHECK(it != codecs_.end()); |
1941 | auto const terminalMaxUncompressedLength = |
1942 | terminalCodec_ ? terminalCodec_->maxUncompressedLength() : 0; |
1943 | maxUncompressedLength_ = |
1944 | std::max((*it)->maxUncompressedLength(), terminalMaxUncompressedLength); |
1945 | } |
1946 | |
1947 | void AutomaticCodec::checkCompatibleCodecs() const { |
1948 | // Keep track of all the possible headers. |
1949 | std::unordered_set<std::string> ; |
1950 | // The empty header is not allowed. |
1951 | headers.insert("" ); |
1952 | // Step 1: |
1953 | // Construct a set of headers and check that none of the headers occur twice. |
1954 | // Eliminate edge cases. |
1955 | for (auto&& codec : codecs_) { |
1956 | const auto = codec->validPrefixes(); |
1957 | // Codecs without any valid headers are not allowed. |
1958 | if (codecHeaders.empty()) { |
1959 | throw std::invalid_argument{ |
1960 | "AutomaticCodec: validPrefixes() must not be empty." }; |
1961 | } |
1962 | // Insert all the headers for the current codec. |
1963 | const size_t beforeSize = headers.size(); |
1964 | headers.insert(codecHeaders.begin(), codecHeaders.end()); |
1965 | // Codecs are not compatible if any header occurred twice. |
1966 | if (beforeSize + codecHeaders.size() != headers.size()) { |
1967 | throw std::invalid_argument{ |
1968 | "AutomaticCodec: Two valid prefixes collide." }; |
1969 | } |
1970 | } |
1971 | // Step 2: |
1972 | // Check if any strict non-empty prefix of any header is a header. |
1973 | for (const auto& : headers) { |
1974 | for (size_t i = 1; i < header.size(); ++i) { |
1975 | if (headers.count(header.substr(0, i))) { |
1976 | throw std::invalid_argument{ |
1977 | "AutomaticCodec: One valid prefix is a prefix of another valid " |
1978 | "prefix." }; |
1979 | } |
1980 | } |
1981 | } |
1982 | } |
1983 | |
1984 | bool AutomaticCodec::doNeedsUncompressedLength() const { |
1985 | return needsUncompressedLength_; |
1986 | } |
1987 | |
1988 | uint64_t AutomaticCodec::doMaxUncompressedLength() const { |
1989 | return maxUncompressedLength_; |
1990 | } |
1991 | |
1992 | std::unique_ptr<IOBuf> AutomaticCodec::doUncompress( |
1993 | const IOBuf* data, |
1994 | Optional<uint64_t> uncompressedLength) { |
1995 | try { |
1996 | for (auto&& codec : codecs_) { |
1997 | if (codec->canUncompress(data, uncompressedLength)) { |
1998 | return codec->uncompress(data, uncompressedLength); |
1999 | } |
2000 | } |
2001 | } catch (std::exception const& e) { |
2002 | if (!terminalCodec_) { |
2003 | throw e; |
2004 | } |
2005 | } |
2006 | |
2007 | // Try terminal codec |
2008 | if (terminalCodec_) { |
2009 | return terminalCodec_->uncompress(data, uncompressedLength); |
2010 | } |
2011 | |
2012 | throw std::runtime_error("AutomaticCodec error: Unknown compressed data" ); |
2013 | } |
2014 | |
// Factory function signatures. getCodec()/getStreamCodec() invoke them as
// factory(level, type).
using CodecFactory = std::unique_ptr<Codec> (*)(int, CodecType);
using StreamCodecFactory = std::unique_ptr<StreamCodec> (*)(int, CodecType);
// Per-CodecType pair of factories. A null member means that flavor is not
// available: hasCodec()/hasStreamCodec() test for null, and getCodec()/
// getStreamCodec() throw std::invalid_argument on it. Member order matters:
// codecFactories below is aggregate-initialized as {codec, stream}.
struct Factory {
  CodecFactory codec;
  StreamCodecFactory stream;
};
2021 | |
// Factory table indexed by static_cast<size_t>(CodecType) (see getFactory()),
// so entry order MUST match the CodecType enumerator order exactly. An empty
// `{}` entry means the codec was not compiled in (both factories null).
// NOTE(review): the per-entry type labels below are inferred from the factory
// names and slot positions. Confirm them against CodecType in Compression.h
// when adding a type.
constexpr Factory
    codecFactories[static_cast<size_t>(CodecType::NUM_CODEC_TYPES)] = {
        {}, // USER_DEFINED
        {NoCompressionCodec::create, nullptr}, // presumably NO_COMPRESSION

#if FOLLY_HAVE_LIBLZ4
        {LZ4Codec::create, nullptr}, // presumably LZ4
#else
        {},
#endif

#if FOLLY_HAVE_LIBSNAPPY
        {SnappyCodec::create, nullptr}, // presumably SNAPPY
#else
        {},
#endif

#if FOLLY_HAVE_LIBZ
        {getZlibCodec, getZlibStreamCodec}, // presumably ZLIB
#else
        {},
#endif

#if FOLLY_HAVE_LIBLZ4
        {LZ4Codec::create, nullptr}, // presumably LZ4_VARINT_SIZE
#else
        {},
#endif

#if FOLLY_HAVE_LIBLZMA
        // Same factories for both LZMA2 slots (presumably LZMA2 and
        // LZMA2_VARINT_SIZE); the factories receive the CodecType argument.
        {LZMA2StreamCodec::createCodec, LZMA2StreamCodec::createStream},
        {LZMA2StreamCodec::createCodec, LZMA2StreamCodec::createStream},
#else
        {},
        {},
#endif

#if FOLLY_HAVE_LIBZSTD
        {getZstdCodec, getZstdStreamCodec}, // presumably ZSTD
#else
        {},
#endif

#if FOLLY_HAVE_LIBZ
        {getZlibCodec, getZlibStreamCodec}, // presumably GZIP
#else
        {},
#endif

#if (FOLLY_HAVE_LIBLZ4 && LZ4_VERSION_NUMBER >= 10301)
        {LZ4FrameCodec::create, nullptr}, // presumably LZ4_FRAME
#else
        {},
#endif

#if FOLLY_HAVE_LIBBZ2
        {Bzip2StreamCodec::createCodec, Bzip2StreamCodec::createStream},
#else
        {},
#endif

#if FOLLY_HAVE_LIBZSTD
        {getZstdFastCodec, getZstdFastStreamCodec}, // presumably ZSTD_FAST
#else
        {},
#endif
};
2089 | |
2090 | Factory const& getFactory(CodecType type) { |
2091 | size_t const idx = static_cast<size_t>(type); |
2092 | if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) { |
2093 | throw std::invalid_argument( |
2094 | to<std::string>("Compression type " , idx, " invalid" )); |
2095 | } |
2096 | return codecFactories[idx]; |
2097 | } |
2098 | } // namespace |
2099 | |
2100 | bool hasCodec(CodecType type) { |
2101 | return getFactory(type).codec != nullptr; |
2102 | } |
2103 | |
2104 | std::unique_ptr<Codec> getCodec(CodecType type, int level) { |
2105 | auto const factory = getFactory(type).codec; |
2106 | if (!factory) { |
2107 | throw std::invalid_argument( |
2108 | to<std::string>("Compression type " , type, " not supported" )); |
2109 | } |
2110 | auto codec = (*factory)(level, type); |
2111 | DCHECK(codec->type() == type); |
2112 | return codec; |
2113 | } |
2114 | |
2115 | bool hasStreamCodec(CodecType type) { |
2116 | return getFactory(type).stream != nullptr; |
2117 | } |
2118 | |
2119 | std::unique_ptr<StreamCodec> getStreamCodec(CodecType type, int level) { |
2120 | auto const factory = getFactory(type).stream; |
2121 | if (!factory) { |
2122 | throw std::invalid_argument( |
2123 | to<std::string>("Compression type " , type, " not supported" )); |
2124 | } |
2125 | auto codec = (*factory)(level, type); |
2126 | DCHECK(codec->type() == type); |
2127 | return codec; |
2128 | } |
2129 | |
2130 | std::unique_ptr<Codec> getAutoUncompressionCodec( |
2131 | std::vector<std::unique_ptr<Codec>> customCodecs, |
2132 | std::unique_ptr<Codec> terminalCodec) { |
2133 | return AutomaticCodec::create( |
2134 | std::move(customCodecs), std::move(terminalCodec)); |
2135 | } |
2136 | } // namespace io |
2137 | } // namespace folly |
2138 | |