1/*
2 * C port of the snappy compressor from Google.
3 * This is a very fast compressor with comparable compression to lzo.
4 * Works best on 64bit little-endian, but should be good on others too.
5 * Ported by Andi Kleen.
 * Up to date with snappy 1.1.0
7 */
8
9/*
10 * Copyright 2005 Google Inc. All Rights Reserved.
11 *
12 * Redistribution and use in source and binary forms, with or without
13 * modification, are permitted provided that the following conditions are
14 * met:
15 *
16 * * Redistributions of source code must retain the above copyright
17 * notice, this list of conditions and the following disclaimer.
18 * * Redistributions in binary form must reproduce the above
19 * copyright notice, this list of conditions and the following disclaimer
20 * in the documentation and/or other materials provided with the
21 * distribution.
22 * * Neither the name of Google Inc. nor the names of its
23 * contributors may be used to endorse or promote products derived from
24 * this software without specific prior written permission.
25 *
26 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
27 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
28 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
29 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
30 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
31 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
32 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
33 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
34 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
35 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
36 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
37 */
38
39#ifdef __GNUC__
40#pragma GCC diagnostic push
41#pragma GCC diagnostic ignored "-Wcast-align"
42#endif
43
44#ifndef SG
45#define SG /* Scatter-Gather / iovec support in Snappy */
46#endif
47
48#ifdef __KERNEL__
49#include <linux/kernel.h>
50#ifdef SG
51#include <linux/uio.h>
52#endif
53#include <linux/module.h>
54#include <linux/slab.h>
55#include <linux/string.h>
56#include <linux/snappy.h>
57#include <linux/vmalloc.h>
58#include <asm/unaligned.h>
59#else
60#include "snappy.h"
61#include "snappy_compat.h"
62#endif
63
64#include "rd.h"
65
66#ifdef _MSC_VER
67#define inline __inline
68#endif
69
70#define CRASH_UNLESS(x) BUG_ON(!(x))
71#define CHECK(cond) CRASH_UNLESS(cond)
72#define CHECK_LE(a, b) CRASH_UNLESS((a) <= (b))
73#define CHECK_GE(a, b) CRASH_UNLESS((a) >= (b))
74#define CHECK_EQ(a, b) CRASH_UNLESS((a) == (b))
75#define CHECK_NE(a, b) CRASH_UNLESS((a) != (b))
76#define CHECK_LT(a, b) CRASH_UNLESS((a) < (b))
77#define CHECK_GT(a, b) CRASH_UNLESS((a) > (b))
78
79#define UNALIGNED_LOAD16(_p) get_unaligned((u16 *)(_p))
80#define UNALIGNED_LOAD32(_p) get_unaligned((u32 *)(_p))
81#define UNALIGNED_LOAD64(_p) get_unaligned64((u64 *)(_p))
82
83#define UNALIGNED_STORE16(_p, _val) put_unaligned(_val, (u16 *)(_p))
84#define UNALIGNED_STORE32(_p, _val) put_unaligned(_val, (u32 *)(_p))
85#define UNALIGNED_STORE64(_p, _val) put_unaligned64(_val, (u64 *)(_p))
86
87/*
88 * This can be more efficient than UNALIGNED_LOAD64 + UNALIGNED_STORE64
89 * on some platforms, in particular ARM.
90 */
91static inline void unaligned_copy64(const void *src, void *dst)
92{
93 if (sizeof(void *) == 8) {
94 UNALIGNED_STORE64(dst, UNALIGNED_LOAD64(src));
95 } else {
96 const char *src_char = (const char *)(src);
97 char *dst_char = (char *)(dst);
98
99 UNALIGNED_STORE32(dst_char, UNALIGNED_LOAD32(src_char));
100 UNALIGNED_STORE32(dst_char + 4, UNALIGNED_LOAD32(src_char + 4));
101 }
102}
103
104#ifdef NDEBUG
105
106#define DCHECK(cond) do {} while(0)
107#define DCHECK_LE(a, b) do {} while(0)
108#define DCHECK_GE(a, b) do {} while(0)
109#define DCHECK_EQ(a, b) do {} while(0)
110#define DCHECK_NE(a, b) do {} while(0)
111#define DCHECK_LT(a, b) do {} while(0)
112#define DCHECK_GT(a, b) do {} while(0)
113
114#else
115
116#define DCHECK(cond) CHECK(cond)
117#define DCHECK_LE(a, b) CHECK_LE(a, b)
118#define DCHECK_GE(a, b) CHECK_GE(a, b)
119#define DCHECK_EQ(a, b) CHECK_EQ(a, b)
120#define DCHECK_NE(a, b) CHECK_NE(a, b)
121#define DCHECK_LT(a, b) CHECK_LT(a, b)
122#define DCHECK_GT(a, b) CHECK_GT(a, b)
123
124#endif
125
126static inline bool is_little_endian(void)
127{
128#ifdef __LITTLE_ENDIAN__
129 return true;
130#endif
131 return false;
132}
133
134#if defined(__xlc__) // xlc compiler on AIX
135#define rd_clz(n) __cntlz4(n)
136#define rd_ctz(n) __cnttz4(n)
137#define rd_ctz64(n) __cnttz8(n)
138
139#elif defined(__SUNPRO_C) // Solaris Studio compiler on sun
140/*
141 * Source for following definitions is Hacker’s Delight, Second Edition by Henry S. Warren
142 * http://www.hackersdelight.org/permissions.htm
143 */
144u32 rd_clz(u32 x) {
145 u32 n;
146
147 if (x == 0) return(32);
148 n = 1;
149 if ((x >> 16) == 0) {n = n +16; x = x <<16;}
150 if ((x >> 24) == 0) {n = n + 8; x = x << 8;}
151 if ((x >> 28) == 0) {n = n + 4; x = x << 4;}
152 if ((x >> 30) == 0) {n = n + 2; x = x << 2;}
153 n = n - (x >> 31);
154 return n;
155}
156
157u32 rd_ctz(u32 x) {
158 u32 y;
159 u32 n;
160
161 if (x == 0) return 32;
162 n = 31;
163 y = x <<16; if (y != 0) {n = n -16; x = y;}
164 y = x << 8; if (y != 0) {n = n - 8; x = y;}
165 y = x << 4; if (y != 0) {n = n - 4; x = y;}
166 y = x << 2; if (y != 0) {n = n - 2; x = y;}
167 y = x << 1; if (y != 0) {n = n - 1;}
168 return n;
169}
170
171u64 rd_ctz64(u64 x) {
172 u64 y;
173 u64 n;
174
175 if (x == 0) return 64;
176 n = 63;
177 y = x <<32; if (y != 0) {n = n -32; x = y;}
178 y = x <<16; if (y != 0) {n = n -16; x = y;}
179 y = x << 8; if (y != 0) {n = n - 8; x = y;}
180 y = x << 4; if (y != 0) {n = n - 4; x = y;}
181 y = x << 2; if (y != 0) {n = n - 2; x = y;}
182 y = x << 1; if (y != 0) {n = n - 1;}
183 return n;
184}
185#elif !defined(_MSC_VER)
186#define rd_clz(n) __builtin_clz(n)
187#define rd_ctz(n) __builtin_ctz(n)
188#define rd_ctz64(n) __builtin_ctzll(n)
#else
#include <intrin.h>
/*
 * The MSVC bit-scan intrinsics take an unsigned long index and scan from
 * opposite ends: _BitScanReverse finds the highest set bit (used for
 * count-leading-zeros), _BitScanForward the lowest (count-trailing-zeros).
 */
static int inline rd_clz(u32 x) {
	unsigned long r = 0;
	if (_BitScanReverse(&r, x))
		return 31 - (int)r;
	else
		return 32;
}

static int inline rd_ctz(u32 x) {
	unsigned long r = 0;
	if (_BitScanForward(&r, x))
		return (int)r;
	else
		return 32;
}

static int inline rd_ctz64(u64 x) {
#ifdef _M_X64
	unsigned long r = 0;
	if (_BitScanForward64(&r, x))
		return (int)r;
	else
		return 64;
#else
	int r;
	if ((r = rd_ctz((u32)(x & 0xffffffff))) < 32)
		return r;
	return 32 + rd_ctz((u32)(x >> 32));
#endif
}
#endif
222
223
224static inline int log2_floor(u32 n)
225{
226 return n == 0 ? -1 : 31 ^ rd_clz(n);
227}
228
229static inline RD_UNUSED int find_lsb_set_non_zero(u32 n)
230{
231 return rd_ctz(n);
232}
233
234static inline RD_UNUSED int find_lsb_set_non_zero64(u64 n)
235{
236 return rd_ctz64(n);
237}
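/*
 * Illustrative sketch (not part of the library, not compiled): how
 * log2_floor() derives the floor of log2 from a count-leading-zeros
 * primitive.  For any n != 0, rd_clz(n) is in [0,31], so "31 ^ rd_clz(n)"
 * equals "31 - rd_clz(n)", i.e. the bit index of the most significant set
 * bit.  The numbers below are just worked values for this comment.
 */
#if 0
static void log2_floor_example(void)
{
	/* 1000 = 0x3E8; its highest set bit is bit 9 (512 <= 1000 < 1024) */
	CHECK_EQ(rd_clz(1000), 22);       /* 22 leading zero bits        */
	CHECK_EQ(log2_floor(1000), 9);    /* 31 ^ 22 == 31 - 22 == 9     */
	CHECK_EQ(log2_floor(1), 0);
	CHECK_EQ(log2_floor(0), -1);      /* zero is special-cased above */
}
#endif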
238
239#define kmax32 5
240
241/*
242 * Attempts to parse a varint32 from a prefix of the bytes in [ptr,limit-1].
243 * Never reads a character at or beyond limit. If a valid/terminated varint32
244 * was found in the range, stores it in *OUTPUT and returns a pointer just
245 * past the last byte of the varint32. Else returns NULL. On success,
246 * "result <= limit".
247 */
248static inline const char *varint_parse32_with_limit(const char *p,
249 const char *l,
250 u32 * OUTPUT)
251{
252 const unsigned char *ptr = (const unsigned char *)(p);
253 const unsigned char *limit = (const unsigned char *)(l);
254 u32 b, result;
255
256 if (ptr >= limit)
257 return NULL;
258 b = *(ptr++);
259 result = b & 127;
260 if (b < 128)
261 goto done;
262 if (ptr >= limit)
263 return NULL;
264 b = *(ptr++);
265 result |= (b & 127) << 7;
266 if (b < 128)
267 goto done;
268 if (ptr >= limit)
269 return NULL;
270 b = *(ptr++);
271 result |= (b & 127) << 14;
272 if (b < 128)
273 goto done;
274 if (ptr >= limit)
275 return NULL;
276 b = *(ptr++);
277 result |= (b & 127) << 21;
278 if (b < 128)
279 goto done;
280 if (ptr >= limit)
281 return NULL;
282 b = *(ptr++);
283 result |= (b & 127) << 28;
284 if (b < 16)
285 goto done;
286 return NULL; /* Value is too long to be a varint32 */
287done:
288 *OUTPUT = result;
289 return (const char *)(ptr);
290}
291
292/*
293 * REQUIRES "ptr" points to a buffer of length sufficient to hold "v".
294 * EFFECTS Encodes "v" into "ptr" and returns a pointer to the
295 * byte just past the last encoded byte.
296 */
297static inline char *varint_encode32(char *sptr, u32 v)
298{
299 /* Operate on characters as unsigneds */
300 unsigned char *ptr = (unsigned char *)(sptr);
301 static const int B = 128;
302
303 if (v < (1 << 7)) {
304 *(ptr++) = v;
305 } else if (v < (1 << 14)) {
306 *(ptr++) = v | B;
307 *(ptr++) = v >> 7;
308 } else if (v < (1 << 21)) {
309 *(ptr++) = v | B;
310 *(ptr++) = (v >> 7) | B;
311 *(ptr++) = v >> 14;
312 } else if (v < (1 << 28)) {
313 *(ptr++) = v | B;
314 *(ptr++) = (v >> 7) | B;
315 *(ptr++) = (v >> 14) | B;
316 *(ptr++) = v >> 21;
317 } else {
318 *(ptr++) = v | B;
319 *(ptr++) = (v >> 7) | B;
320 *(ptr++) = (v >> 14) | B;
321 *(ptr++) = (v >> 21) | B;
322 *(ptr++) = v >> 28;
323 }
324 return (char *)(ptr);
325}
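/*
 * Illustrative sketch (not compiled): round-tripping a length through the
 * varint32 helpers above.  300 encodes as two bytes, 0xAC 0x02: the low
 * seven bits (0x2C) with the continuation bit set, then the remaining
 * bits (0x02).  The buffer size is only for this example.
 */
#if 0
static void varint_example(void)
{
	char buf[kmax32];
	u32 decoded = 0;
	char *end = varint_encode32(buf, 300);

	/* 300 = 0b0000010_0101100 -> 0xAC 0x02 */
	CHECK_EQ(end - buf, 2);
	CHECK_EQ((unsigned char)buf[0], 0xAC);
	CHECK_EQ((unsigned char)buf[1], 0x02);

	/* Parsing stops just past the terminating byte and yields 300 again */
	CHECK(varint_parse32_with_limit(buf, buf + sizeof(buf), &decoded) == end);
	CHECK_EQ(decoded, 300);
}
#endif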
326
327#ifdef SG
328
329static inline void *n_bytes_after_addr(void *addr, size_t n_bytes)
330{
331 return (void *) ((char *)addr + n_bytes);
332}
333
334struct source {
335 struct iovec *iov;
336 int iovlen;
337 int curvec;
338 int curoff;
339 size_t total;
340};
341
342/* Only valid at beginning when nothing is consumed */
343static inline int available(struct source *s)
344{
345 return (int) s->total;
346}
347
348static inline const char *peek(struct source *s, size_t *len)
349{
350 if (likely(s->curvec < s->iovlen)) {
351 struct iovec *iv = &s->iov[s->curvec];
352 if ((unsigned)s->curoff < (size_t)iv->iov_len) {
353 *len = iv->iov_len - s->curoff;
354 return n_bytes_after_addr(iv->iov_base, s->curoff);
355 }
356 }
357 *len = 0;
358 return NULL;
359}
360
361static inline void skip(struct source *s, size_t n)
362{
363 struct iovec *iv = &s->iov[s->curvec];
364 s->curoff += (int) n;
365 DCHECK_LE((unsigned)s->curoff, (size_t)iv->iov_len);
366 if ((unsigned)s->curoff >= (size_t)iv->iov_len &&
367 s->curvec + 1 < s->iovlen) {
368 s->curoff = 0;
369 s->curvec++;
370 }
371}
372
373struct sink {
374 struct iovec *iov;
375 int iovlen;
376 unsigned curvec;
377 unsigned curoff;
378 unsigned written;
379};
380
381static inline void append(struct sink *s, const char *data, size_t n)
382{
383 struct iovec *iov = &s->iov[s->curvec];
384 char *dst = n_bytes_after_addr(iov->iov_base, s->curoff);
385 size_t nlen = min_t(size_t, iov->iov_len - s->curoff, n);
386 if (data != dst)
387 memcpy(dst, data, nlen);
388 s->written += (int) n;
389 s->curoff += (int) nlen;
390 while ((n -= nlen) > 0) {
391 data += nlen;
392 s->curvec++;
393 DCHECK_LT((signed)s->curvec, s->iovlen);
394 iov++;
395 nlen = min_t(size_t, (size_t)iov->iov_len, n);
396 memcpy(iov->iov_base, data, nlen);
397 s->curoff = (int) nlen;
398 }
399}
400
401static inline void *sink_peek(struct sink *s, size_t n)
402{
403 struct iovec *iov = &s->iov[s->curvec];
	if ((int)s->curvec < s->iovlen && iov->iov_len - s->curoff >= n)
405 return n_bytes_after_addr(iov->iov_base, s->curoff);
406 return NULL;
407}
408
409#else
410
411struct source {
412 const char *ptr;
413 size_t left;
414};
415
416static inline int available(struct source *s)
417{
418 return s->left;
419}
420
421static inline const char *peek(struct source *s, size_t * len)
422{
423 *len = s->left;
424 return s->ptr;
425}
426
427static inline void skip(struct source *s, size_t n)
428{
429 s->left -= n;
430 s->ptr += n;
431}
432
433struct sink {
434 char *dest;
435};
436
437static inline void append(struct sink *s, const char *data, size_t n)
438{
439 if (data != s->dest)
440 memcpy(s->dest, data, n);
441 s->dest += n;
442}
443
444#define sink_peek(s, n) sink_peek_no_sg(s)
445
446static inline void *sink_peek_no_sg(const struct sink *s)
447{
448 return s->dest;
449}
450
451#endif
452
453struct writer {
454 char *base;
455 char *op;
456 char *op_limit;
457};
458
459/* Called before decompression */
460static inline void writer_set_expected_length(struct writer *w, size_t len)
461{
462 w->op_limit = w->op + len;
463}
464
465/* Called after decompression */
466static inline bool writer_check_length(struct writer *w)
467{
468 return w->op == w->op_limit;
469}
470
471/*
472 * Copy "len" bytes from "src" to "op", one byte at a time. Used for
473 * handling COPY operations where the input and output regions may
474 * overlap. For example, suppose:
475 * src == "ab"
476 * op == src + 2
477 * len == 20
478 * After IncrementalCopy(src, op, len), the result will have
479 * eleven copies of "ab"
480 * ababababababababababab
481 * Note that this does not match the semantics of either memcpy()
482 * or memmove().
483 */
484static inline void incremental_copy(const char *src, char *op, ssize_t len)
485{
486 DCHECK_GT(len, 0);
487 do {
488 *op++ = *src++;
489 } while (--len > 0);
490}
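/*
 * Illustrative sketch (not compiled) of the overlapping-copy semantics
 * described above: with src pointing at "ab" and op == src + 2, a 20-byte
 * incremental_copy() keeps re-reading bytes it has just written, so the
 * two-byte pattern is replicated across the buffer.
 */
#if 0
static void incremental_copy_example(void)
{
	char buf[32] = "ab";            /* rest of the buffer is zeroed */

	incremental_copy(buf, buf + 2, 20);

	/* 2 original + 20 copied bytes == eleven repetitions of "ab" */
	CHECK_EQ(memcmp(buf, "ababababababababababab", 22), 0);
}
#endif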
491
492/*
493 * Equivalent to IncrementalCopy except that it can write up to ten extra
494 * bytes after the end of the copy, and that it is faster.
495 *
496 * The main part of this loop is a simple copy of eight bytes at a time until
497 * we've copied (at least) the requested amount of bytes. However, if op and
498 * src are less than eight bytes apart (indicating a repeating pattern of
499 * length < 8), we first need to expand the pattern in order to get the correct
500 * results. For instance, if the buffer looks like this, with the eight-byte
501 * <src> and <op> patterns marked as intervals:
502 *
503 * abxxxxxxxxxxxx
504 * [------] src
505 * [------] op
506 *
507 * a single eight-byte copy from <src> to <op> will repeat the pattern once,
508 * after which we can move <op> two bytes without moving <src>:
509 *
510 * ababxxxxxxxxxx
511 * [------] src
512 * [------] op
513 *
514 * and repeat the exercise until the two no longer overlap.
515 *
516 * This allows us to do very well in the special case of one single byte
517 * repeated many times, without taking a big hit for more general cases.
518 *
519 * The worst case of extra writing past the end of the match occurs when
520 * op - src == 1 and len == 1; the last copy will read from byte positions
521 * [0..7] and write to [4..11], whereas it was only supposed to write to
522 * position 1. Thus, ten excess bytes.
523 */
524
525#define kmax_increment_copy_overflow 10
526
527static inline void incremental_copy_fast_path(const char *src, char *op,
528 ssize_t len)
529{
530 while (op - src < 8) {
531 unaligned_copy64(src, op);
532 len -= op - src;
533 op += op - src;
534 }
535 while (len > 0) {
536 unaligned_copy64(src, op);
537 src += 8;
538 op += 8;
539 len -= 8;
540 }
541}
542
543static inline bool writer_append_from_self(struct writer *w, u32 offset,
544 u32 len)
545{
546 char *const op = w->op;
547 CHECK_LE(op, w->op_limit);
548 const u32 space_left = (u32) (w->op_limit - op);
549
550 if ((unsigned)(op - w->base) <= offset - 1u) /* -1u catches offset==0 */
551 return false;
552 if (len <= 16 && offset >= 8 && space_left >= 16) {
553 /* Fast path, used for the majority (70-80%) of dynamic
554 * invocations. */
555 unaligned_copy64(op - offset, op);
556 unaligned_copy64(op - offset + 8, op + 8);
557 } else {
558 if (space_left >= len + kmax_increment_copy_overflow) {
559 incremental_copy_fast_path(op - offset, op, len);
560 } else {
561 if (space_left < len) {
562 return false;
563 }
564 incremental_copy(op - offset, op, len);
565 }
566 }
567
568 w->op = op + len;
569 return true;
570}
571
572static inline bool writer_append(struct writer *w, const char *ip, u32 len)
573{
574 char *const op = w->op;
575 CHECK_LE(op, w->op_limit);
576 const u32 space_left = (u32) (w->op_limit - op);
577 if (space_left < len)
578 return false;
579 memcpy(op, ip, len);
580 w->op = op + len;
581 return true;
582}
583
584static inline bool writer_try_fast_append(struct writer *w, const char *ip,
585 u32 available_bytes, u32 len)
586{
587 char *const op = w->op;
588 const int space_left = (int) (w->op_limit - op);
589 if (len <= 16 && available_bytes >= 16 && space_left >= 16) {
590 /* Fast path, used for the majority (~95%) of invocations */
591 unaligned_copy64(ip, op);
592 unaligned_copy64(ip + 8, op + 8);
593 w->op = op + len;
594 return true;
595 }
596 return false;
597}
598
599/*
600 * Any hash function will produce a valid compressed bitstream, but a good
601 * hash function reduces the number of collisions and thus yields better
602 * compression for compressible input, and more speed for incompressible
603 * input. Of course, it doesn't hurt if the hash function is reasonably fast
604 * either, as it gets called a lot.
605 */
606static inline u32 hash_bytes(u32 bytes, int shift)
607{
608 u32 kmul = 0x1e35a7bd;
609 return (bytes * kmul) >> shift;
610}
611
612static inline u32 hash(const char *p, int shift)
613{
614 return hash_bytes(UNALIGNED_LOAD32(p), shift);
615}
616
617/*
618 * Compressed data can be defined as:
619 * compressed := item* literal*
620 * item := literal* copy
621 *
622 * The trailing literal sequence has a space blowup of at most 62/60
623 * since a literal of length 60 needs one tag byte + one extra byte
624 * for length information.
625 *
626 * Item blowup is trickier to measure. Suppose the "copy" op copies
627 * 4 bytes of data. Because of a special check in the encoding code,
628 * we produce a 4-byte copy only if the offset is < 65536. Therefore
629 * the copy op takes 3 bytes to encode, and this type of item leads
630 * to at most the 62/60 blowup for representing literals.
631 *
632 * Suppose the "copy" op copies 5 bytes of data. If the offset is big
633 * enough, it will take 5 bytes to encode the copy op. Therefore the
634 * worst case here is a one-byte literal followed by a five-byte copy.
635 * I.e., 6 bytes of input turn into 7 bytes of "compressed" data.
636 *
637 * This last factor dominates the blowup, so the final estimate is:
638 */
639size_t rd_kafka_snappy_max_compressed_length(size_t source_len)
640{
641 return 32 + source_len + source_len / 6;
642}
643EXPORT_SYMBOL(rd_kafka_snappy_max_compressed_length);
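/*
 * Worked example for the bound above (illustrative only, plain
 * arithmetic): a 60000-byte input can at worst grow to
 * 32 + 60000 + 60000/6 = 70032 bytes -- the 7/6 item blowup derived in
 * the comment plus the 32 spare output bytes the emitters rely on.
 */
#if 0
static void max_compressed_length_example(void)
{
	CHECK_EQ(rd_kafka_snappy_max_compressed_length(60000),
		 32 + 60000 + 10000);
}
#endif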
644
645enum {
646 LITERAL = 0,
647 COPY_1_BYTE_OFFSET = 1, /* 3 bit length + 3 bits of offset in opcode */
648 COPY_2_BYTE_OFFSET = 2,
649 COPY_4_BYTE_OFFSET = 3
650};
651
652static inline char *emit_literal(char *op,
653 const char *literal,
654 int len, bool allow_fast_path)
655{
656 int n = len - 1; /* Zero-length literals are disallowed */
657
658 if (n < 60) {
659 /* Fits in tag byte */
660 *op++ = LITERAL | (n << 2);
661
662/*
663 * The vast majority of copies are below 16 bytes, for which a
664 * call to memcpy is overkill. This fast path can sometimes
665 * copy up to 15 bytes too much, but that is okay in the
666 * main loop, since we have a bit to go on for both sides:
667 *
668 * - The input will always have kInputMarginBytes = 15 extra
669 * available bytes, as long as we're in the main loop, and
670 * if not, allow_fast_path = false.
671 * - The output will always have 32 spare bytes (see
672 * MaxCompressedLength).
673 */
674 if (allow_fast_path && len <= 16) {
675 unaligned_copy64(literal, op);
676 unaligned_copy64(literal + 8, op + 8);
677 return op + len;
678 }
679 } else {
680 /* Encode in upcoming bytes */
681 char *base = op;
682 int count = 0;
683 op++;
684 while (n > 0) {
685 *op++ = n & 0xff;
686 n >>= 8;
687 count++;
688 }
689 DCHECK(count >= 1);
690 DCHECK(count <= 4);
691 *base = LITERAL | ((59 + count) << 2);
692 }
693 memcpy(op, literal, len);
694 return op + len;
695}
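/*
 * Illustrative sketch (not compiled) of the literal encodings produced
 * above.  A 5-byte literal fits in the tag byte: (5-1) << 2 | LITERAL ==
 * 0x10.  A 100-byte literal needs one extra length byte: the tag becomes
 * LITERAL | ((59 + 1) << 2) == 0xF0, followed by the byte 99 (length - 1),
 * followed by the literal data.  "some_100_byte_buffer" is a hypothetical
 * input used only for this comment.
 */
#if 0
static void emit_literal_example(void)
{
	char out[128];
	char *end;

	end = emit_literal(out, "hello", 5, false);
	CHECK_EQ(end - out, 1 + 5);
	CHECK_EQ((unsigned char)out[0], 0x10);

	end = emit_literal(out, some_100_byte_buffer /* hypothetical */, 100, false);
	CHECK_EQ(end - out, 2 + 100);
	CHECK_EQ((unsigned char)out[0], 0xF0);
	CHECK_EQ((unsigned char)out[1], 99);
}
#endif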
696
697static inline char *emit_copy_less_than64(char *op, int offset, int len)
698{
699 DCHECK_LE(len, 64);
700 DCHECK_GE(len, 4);
701 DCHECK_LT(offset, 65536);
702
703 if ((len < 12) && (offset < 2048)) {
704 int len_minus_4 = len - 4;
705 DCHECK(len_minus_4 < 8); /* Must fit in 3 bits */
		*op++ = COPY_1_BYTE_OFFSET + (len_minus_4 << 2) +
			((offset >> 8) << 5);
709 *op++ = offset & 0xff;
710 } else {
711 *op++ = COPY_2_BYTE_OFFSET + ((len - 1) << 2);
712 put_unaligned_le16(offset, op);
713 op += 2;
714 }
715 return op;
716}
717
718static inline char *emit_copy(char *op, int offset, int len)
719{
720 /*
721 * Emit 64 byte copies but make sure to keep at least four bytes
722 * reserved
723 */
724 while (len >= 68) {
725 op = emit_copy_less_than64(op, offset, 64);
726 len -= 64;
727 }
728
729 /*
	 * Emit an extra 60 byte copy if we have too much data to fit in
731 * one copy
732 */
733 if (len > 64) {
734 op = emit_copy_less_than64(op, offset, 60);
735 len -= 60;
736 }
737
738 /* Emit remainder */
739 op = emit_copy_less_than64(op, offset, len);
740 return op;
741}
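/*
 * Illustrative sketch (not compiled) of how a long copy is split by
 * emit_copy().  A 100-byte copy at offset 1000 becomes two two-byte-offset
 * copies: one of length 64 (tag 0xFE = COPY_2_BYTE_OFFSET + (63 << 2)) and
 * one of length 36 (tag 0x8E), each followed by the 16-bit little-endian
 * offset, i.e. six output bytes in total.
 */
#if 0
static void emit_copy_example(void)
{
	char out[16];
	char *end = emit_copy(out, 1000 /* offset */, 100 /* length */);

	CHECK_EQ(end - out, 6);
	CHECK_EQ((unsigned char)out[0], 0xFE);  /* copy, length 64 */
	CHECK_EQ((unsigned char)out[3], 0x8E);  /* copy, length 36 */
}
#endif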
742
743/**
744 * rd_kafka_snappy_uncompressed_length - return length of uncompressed output.
745 * @start: compressed buffer
746 * @n: length of compressed buffer.
747 * @result: Write the length of the uncompressed output here.
748 *
 * Returns true when successful, otherwise false.
750 */
751bool rd_kafka_snappy_uncompressed_length(const char *start, size_t n, size_t * result)
752{
753 u32 v = 0;
754 const char *limit = start + n;
755 if (varint_parse32_with_limit(start, limit, &v) != NULL) {
756 *result = v;
757 return true;
758 } else {
759 return false;
760 }
761}
762EXPORT_SYMBOL(rd_kafka_snappy_uncompressed_length);
763
/*
 * The size of a compression block. Note that many parts of the compression
 * code assume that kBlockSize <= 65536; in particular, the hash table
 * can only store 16-bit offsets, and EmitCopy() also assumes the offset
 * is 65535 bytes or less. Note also that if you change this, it will
 * affect the framing format.
 * Note that there might be older data around that is compressed with larger
 * block sizes, so the decompression code should not rely on the
 * non-existence of long backreferences.
 */
774#define kblock_log 16
775#define kblock_size (1 << kblock_log)
776
777/*
 * This value could be halved or quartered to save memory
779 * at the cost of slightly worse compression.
780 */
781#define kmax_hash_table_bits 14
782#define kmax_hash_table_size (1U << kmax_hash_table_bits)
783
784/*
785 * Use smaller hash table when input.size() is smaller, since we
786 * fill the table, incurring O(hash table size) overhead for
787 * compression, and if the input is short, we won't need that
788 * many hash table entries anyway.
789 */
790static u16 *get_hash_table(struct snappy_env *env, size_t input_size,
791 int *table_size)
792{
793 unsigned htsize = 256;
794
795 DCHECK(kmax_hash_table_size >= 256);
796 while (htsize < kmax_hash_table_size && htsize < input_size)
797 htsize <<= 1;
798 CHECK_EQ(0, htsize & (htsize - 1));
799 CHECK_LE(htsize, kmax_hash_table_size);
800
801 u16 *table;
802 table = env->hash_table;
803
804 *table_size = htsize;
805 memset(table, 0, htsize * sizeof(*table));
806 return table;
807}
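/*
 * Illustrative note (values only for this comment): for a 3000-byte input
 * the loop above settles on htsize == 4096, the smallest power of two that
 * is >= the input size while staying under kmax_hash_table_size (16384);
 * a full 64 KB block always gets the full 16384-entry table.
 */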
808
809/*
810 * Return the largest n such that
811 *
812 * s1[0,n-1] == s2[0,n-1]
813 * and n <= (s2_limit - s2).
814 *
815 * Does not read *s2_limit or beyond.
816 * Does not read *(s1 + (s2_limit - s2)) or beyond.
817 * Requires that s2_limit >= s2.
818 *
819 * Separate implementation for x86_64, for speed. Uses the fact that
820 * x86_64 is little endian.
821 */
822#if defined(__LITTLE_ENDIAN__) && BITS_PER_LONG == 64
823static inline int find_match_length(const char *s1,
824 const char *s2, const char *s2_limit)
825{
826 int matched = 0;
827
828 DCHECK_GE(s2_limit, s2);
829 /*
830 * Find out how long the match is. We loop over the data 64 bits at a
831 * time until we find a 64-bit block that doesn't match; then we find
832 * the first non-matching bit and use that to calculate the total
833 * length of the match.
834 */
835 while (likely(s2 <= s2_limit - 8)) {
836 if (unlikely
837 (UNALIGNED_LOAD64(s2) == UNALIGNED_LOAD64(s1 + matched))) {
838 s2 += 8;
839 matched += 8;
840 } else {
841 /*
842 * On current (mid-2008) Opteron models there
843 * is a 3% more efficient code sequence to
844 * find the first non-matching byte. However,
845 * what follows is ~10% better on Intel Core 2
846 * and newer, and we expect AMD's bsf
847 * instruction to improve.
848 */
			u64 x = UNALIGNED_LOAD64(s2) ^
				UNALIGNED_LOAD64(s1 + matched);
852 int matching_bits = find_lsb_set_non_zero64(x);
853 matched += matching_bits >> 3;
854 return matched;
855 }
856 }
857 while (likely(s2 < s2_limit)) {
858 if (likely(s1[matched] == *s2)) {
859 ++s2;
860 ++matched;
861 } else {
862 return matched;
863 }
864 }
865 return matched;
866}
867#else
868static inline int find_match_length(const char *s1,
869 const char *s2, const char *s2_limit)
870{
871 /* Implementation based on the x86-64 version, above. */
872 DCHECK_GE(s2_limit, s2);
873 int matched = 0;
874
875 while (s2 <= s2_limit - 4 &&
876 UNALIGNED_LOAD32(s2) == UNALIGNED_LOAD32(s1 + matched)) {
877 s2 += 4;
878 matched += 4;
879 }
880 if (is_little_endian() && s2 <= s2_limit - 4) {
881 u32 x =
882 UNALIGNED_LOAD32(s2) ^ UNALIGNED_LOAD32(s1 + matched);
883 int matching_bits = find_lsb_set_non_zero(x);
884 matched += matching_bits >> 3;
885 } else {
886 while ((s2 < s2_limit) && (s1[matched] == *s2)) {
887 ++s2;
888 ++matched;
889 }
890 }
891 return matched;
892}
893#endif
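/*
 * Illustrative sketch (not compiled) of the contract above: the return
 * value is the length of the common prefix of s1 and s2, and s2 is never
 * read at or past s2_limit.
 */
#if 0
static void find_match_length_example(void)
{
	const char *a = "abcdefgh";
	const char *b = "abcdXYZ!";

	CHECK_EQ(find_match_length(a, b, b + 8), 4);   /* "abcd" matches  */
	CHECK_EQ(find_match_length(a, a, a + 3), 3);   /* capped by limit */
}
#endif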
894
895/*
896 * For 0 <= offset <= 4, GetU32AtOffset(GetEightBytesAt(p), offset) will
897 * equal UNALIGNED_LOAD32(p + offset). Motivation: On x86-64 hardware we have
898 * empirically found that overlapping loads such as
899 * UNALIGNED_LOAD32(p) ... UNALIGNED_LOAD32(p+1) ... UNALIGNED_LOAD32(p+2)
900 * are slower than UNALIGNED_LOAD64(p) followed by shifts and casts to u32.
901 *
902 * We have different versions for 64- and 32-bit; ideally we would avoid the
903 * two functions and just inline the UNALIGNED_LOAD64 call into
 * GetUint32AtOffset, but GCC (as of 4.6, at least) is seemingly not clever
 * enough to avoid loading the value multiple times in that case. For 64-bit, the load
906 * is done when GetEightBytesAt() is called, whereas for 32-bit, the load is
907 * done at GetUint32AtOffset() time.
908 */
909
910#if BITS_PER_LONG == 64
911
912typedef u64 eight_bytes_reference;
913
914static inline eight_bytes_reference get_eight_bytes_at(const char* ptr)
915{
916 return UNALIGNED_LOAD64(ptr);
917}
918
919static inline u32 get_u32_at_offset(u64 v, int offset)
920{
921 DCHECK_GE(offset, 0);
922 DCHECK_LE(offset, 4);
923 return v >> (is_little_endian()? 8 * offset : 32 - 8 * offset);
924}
925
926#else
927
928typedef const char *eight_bytes_reference;
929
930static inline eight_bytes_reference get_eight_bytes_at(const char* ptr)
931{
932 return ptr;
933}
934
935static inline u32 get_u32_at_offset(const char *v, int offset)
936{
937 DCHECK_GE(offset, 0);
938 DCHECK_LE(offset, 4);
939 return UNALIGNED_LOAD32(v + offset);
940}
941#endif
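/*
 * Illustrative sketch (not compiled): for 0 <= offset <= 4,
 * get_u32_at_offset(get_eight_bytes_at(p), offset) must agree with
 * UNALIGNED_LOAD32(p + offset); the eight-byte value is loaded once and the
 * overlapping u32 views are carved out of it.  "p" is assumed to point at
 * a buffer with at least eight readable bytes.
 */
#if 0
static void get_u32_at_offset_example(const char *p)
{
	eight_bytes_reference bytes = get_eight_bytes_at(p);
	int i;

	for (i = 0; i <= 4; i++)
		CHECK_EQ(get_u32_at_offset(bytes, i), UNALIGNED_LOAD32(p + i));
}
#endif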
942
943/*
944 * Flat array compression that does not emit the "uncompressed length"
945 * prefix. Compresses "input" string to the "*op" buffer.
946 *
947 * REQUIRES: "input" is at most "kBlockSize" bytes long.
948 * REQUIRES: "op" points to an array of memory that is at least
949 * "MaxCompressedLength(input.size())" in size.
950 * REQUIRES: All elements in "table[0..table_size-1]" are initialized to zero.
951 * REQUIRES: "table_size" is a power of two
952 *
953 * Returns an "end" pointer into "op" buffer.
954 * "end - op" is the compressed size of "input".
955 */
956
957static char *compress_fragment(const char *const input,
958 const size_t input_size,
959 char *op, u16 * table, const unsigned table_size)
960{
961 /* "ip" is the input pointer, and "op" is the output pointer. */
962 const char *ip = input;
963 CHECK_LE(input_size, kblock_size);
964 CHECK_EQ(table_size & (table_size - 1), 0);
965 const int shift = 32 - log2_floor(table_size);
966 DCHECK_EQ(UINT_MAX >> shift, table_size - 1);
967 const char *ip_end = input + input_size;
968 const char *baseip = ip;
969 /*
970 * Bytes in [next_emit, ip) will be emitted as literal bytes. Or
971 * [next_emit, ip_end) after the main loop.
972 */
973 const char *next_emit = ip;
974
975 const unsigned kinput_margin_bytes = 15;
976
977 if (likely(input_size >= kinput_margin_bytes)) {
978 const char *const ip_limit = input + input_size -
979 kinput_margin_bytes;
980
981 u32 next_hash;
982 for (next_hash = hash(++ip, shift);;) {
983 DCHECK_LT(next_emit, ip);
984/*
985 * The body of this loop calls EmitLiteral once and then EmitCopy one or
986 * more times. (The exception is that when we're close to exhausting
987 * the input we goto emit_remainder.)
988 *
989 * In the first iteration of this loop we're just starting, so
990 * there's nothing to copy, so calling EmitLiteral once is
991 * necessary. And we only start a new iteration when the
992 * current iteration has determined that a call to EmitLiteral will
993 * precede the next call to EmitCopy (if any).
994 *
995 * Step 1: Scan forward in the input looking for a 4-byte-long match.
996 * If we get close to exhausting the input then goto emit_remainder.
997 *
998 * Heuristic match skipping: If 32 bytes are scanned with no matches
999 * found, start looking only at every other byte. If 32 more bytes are
1000 * scanned, look at every third byte, etc.. When a match is found,
1001 * immediately go back to looking at every byte. This is a small loss
 * (~5% performance, ~0.1% density) for compressible data due to more
1003 * bookkeeping, but for non-compressible data (such as JPEG) it's a huge
1004 * win since the compressor quickly "realizes" the data is incompressible
1005 * and doesn't bother looking for matches everywhere.
1006 *
1007 * The "skip" variable keeps track of how many bytes there are since the
1008 * last match; dividing it by 32 (ie. right-shifting by five) gives the
1009 * number of bytes to move ahead for each iteration.
1010 */
1011 u32 skip_bytes = 32;
1012
1013 const char *next_ip = ip;
1014 const char *candidate;
1015 do {
1016 ip = next_ip;
1017 u32 hval = next_hash;
1018 DCHECK_EQ(hval, hash(ip, shift));
1019 u32 bytes_between_hash_lookups = skip_bytes++ >> 5;
1020 next_ip = ip + bytes_between_hash_lookups;
1021 if (unlikely(next_ip > ip_limit)) {
1022 goto emit_remainder;
1023 }
1024 next_hash = hash(next_ip, shift);
1025 candidate = baseip + table[hval];
1026 DCHECK_GE(candidate, baseip);
1027 DCHECK_LT(candidate, ip);
1028
1029 table[hval] = (u16) (ip - baseip);
1030 } while (likely(UNALIGNED_LOAD32(ip) !=
1031 UNALIGNED_LOAD32(candidate)));
1032
1033/*
1034 * Step 2: A 4-byte match has been found. We'll later see if more
1035 * than 4 bytes match. But, prior to the match, input
1036 * bytes [next_emit, ip) are unmatched. Emit them as "literal bytes."
1037 */
1038 DCHECK_LE(next_emit + 16, ip_end);
1039 op = emit_literal(op, next_emit, (int) (ip - next_emit), true);
1040
1041/*
1042 * Step 3: Call EmitCopy, and then see if another EmitCopy could
1043 * be our next move. Repeat until we find no match for the
1044 * input immediately after what was consumed by the last EmitCopy call.
1045 *
1046 * If we exit this loop normally then we need to call EmitLiteral next,
1047 * though we don't yet know how big the literal will be. We handle that
1048 * by proceeding to the next iteration of the main loop. We also can exit
1049 * this loop via goto if we get close to exhausting the input.
1050 */
1051 eight_bytes_reference input_bytes;
1052 u32 candidate_bytes = 0;
1053
1054 do {
1055/*
1056 * We have a 4-byte match at ip, and no need to emit any
1057 * "literal bytes" prior to ip.
1058 */
1059 const char *base = ip;
1060 int matched = 4 +
1061 find_match_length(candidate + 4, ip + 4,
1062 ip_end);
1063 ip += matched;
1064 int offset = (int) (base - candidate);
1065 DCHECK_EQ(0, memcmp(base, candidate, matched));
1066 op = emit_copy(op, offset, matched);
1067/*
1068 * We could immediately start working at ip now, but to improve
1069 * compression we first update table[Hash(ip - 1, ...)].
1070 */
1071 const char *insert_tail = ip - 1;
1072 next_emit = ip;
1073 if (unlikely(ip >= ip_limit)) {
1074 goto emit_remainder;
1075 }
1076 input_bytes = get_eight_bytes_at(insert_tail);
1077 u32 prev_hash =
1078 hash_bytes(get_u32_at_offset
1079 (input_bytes, 0), shift);
1080 table[prev_hash] = (u16) (ip - baseip - 1);
1081 u32 cur_hash =
1082 hash_bytes(get_u32_at_offset
1083 (input_bytes, 1), shift);
1084 candidate = baseip + table[cur_hash];
1085 candidate_bytes = UNALIGNED_LOAD32(candidate);
1086 table[cur_hash] = (u16) (ip - baseip);
1087 } while (get_u32_at_offset(input_bytes, 1) ==
1088 candidate_bytes);
1089
1090 next_hash =
1091 hash_bytes(get_u32_at_offset(input_bytes, 2),
1092 shift);
1093 ++ip;
1094 }
1095 }
1096
1097emit_remainder:
1098 /* Emit the remaining bytes as a literal */
1099 if (next_emit < ip_end)
1100 op = emit_literal(op, next_emit, (int) (ip_end - next_emit), false);
1101
1102 return op;
1103}
1104
1105/*
1106 * -----------------------------------------------------------------------
1107 * Lookup table for decompression code. Generated by ComputeTable() below.
1108 * -----------------------------------------------------------------------
1109 */
1110
1111/* Mapping from i in range [0,4] to a mask to extract the bottom 8*i bits */
1112static const u32 wordmask[] = {
1113 0u, 0xffu, 0xffffu, 0xffffffu, 0xffffffffu
1114};
1115
1116/*
1117 * Data stored per entry in lookup table:
1118 * Range Bits-used Description
1119 * ------------------------------------
1120 * 1..64 0..7 Literal/copy length encoded in opcode byte
1121 * 0..7 8..10 Copy offset encoded in opcode byte / 256
1122 * 0..4 11..13 Extra bytes after opcode
1123 *
1124 * We use eight bits for the length even though 7 would have sufficed
1125 * because of efficiency reasons:
1126 * (1) Extracting a byte is faster than a bit-field
1127 * (2) It properly aligns copy offset so we do not need a <<8
1128 */
1129static const u16 char_table[256] = {
1130 0x0001, 0x0804, 0x1001, 0x2001, 0x0002, 0x0805, 0x1002, 0x2002,
1131 0x0003, 0x0806, 0x1003, 0x2003, 0x0004, 0x0807, 0x1004, 0x2004,
1132 0x0005, 0x0808, 0x1005, 0x2005, 0x0006, 0x0809, 0x1006, 0x2006,
1133 0x0007, 0x080a, 0x1007, 0x2007, 0x0008, 0x080b, 0x1008, 0x2008,
1134 0x0009, 0x0904, 0x1009, 0x2009, 0x000a, 0x0905, 0x100a, 0x200a,
1135 0x000b, 0x0906, 0x100b, 0x200b, 0x000c, 0x0907, 0x100c, 0x200c,
1136 0x000d, 0x0908, 0x100d, 0x200d, 0x000e, 0x0909, 0x100e, 0x200e,
1137 0x000f, 0x090a, 0x100f, 0x200f, 0x0010, 0x090b, 0x1010, 0x2010,
1138 0x0011, 0x0a04, 0x1011, 0x2011, 0x0012, 0x0a05, 0x1012, 0x2012,
1139 0x0013, 0x0a06, 0x1013, 0x2013, 0x0014, 0x0a07, 0x1014, 0x2014,
1140 0x0015, 0x0a08, 0x1015, 0x2015, 0x0016, 0x0a09, 0x1016, 0x2016,
1141 0x0017, 0x0a0a, 0x1017, 0x2017, 0x0018, 0x0a0b, 0x1018, 0x2018,
1142 0x0019, 0x0b04, 0x1019, 0x2019, 0x001a, 0x0b05, 0x101a, 0x201a,
1143 0x001b, 0x0b06, 0x101b, 0x201b, 0x001c, 0x0b07, 0x101c, 0x201c,
1144 0x001d, 0x0b08, 0x101d, 0x201d, 0x001e, 0x0b09, 0x101e, 0x201e,
1145 0x001f, 0x0b0a, 0x101f, 0x201f, 0x0020, 0x0b0b, 0x1020, 0x2020,
1146 0x0021, 0x0c04, 0x1021, 0x2021, 0x0022, 0x0c05, 0x1022, 0x2022,
1147 0x0023, 0x0c06, 0x1023, 0x2023, 0x0024, 0x0c07, 0x1024, 0x2024,
1148 0x0025, 0x0c08, 0x1025, 0x2025, 0x0026, 0x0c09, 0x1026, 0x2026,
1149 0x0027, 0x0c0a, 0x1027, 0x2027, 0x0028, 0x0c0b, 0x1028, 0x2028,
1150 0x0029, 0x0d04, 0x1029, 0x2029, 0x002a, 0x0d05, 0x102a, 0x202a,
1151 0x002b, 0x0d06, 0x102b, 0x202b, 0x002c, 0x0d07, 0x102c, 0x202c,
1152 0x002d, 0x0d08, 0x102d, 0x202d, 0x002e, 0x0d09, 0x102e, 0x202e,
1153 0x002f, 0x0d0a, 0x102f, 0x202f, 0x0030, 0x0d0b, 0x1030, 0x2030,
1154 0x0031, 0x0e04, 0x1031, 0x2031, 0x0032, 0x0e05, 0x1032, 0x2032,
1155 0x0033, 0x0e06, 0x1033, 0x2033, 0x0034, 0x0e07, 0x1034, 0x2034,
1156 0x0035, 0x0e08, 0x1035, 0x2035, 0x0036, 0x0e09, 0x1036, 0x2036,
1157 0x0037, 0x0e0a, 0x1037, 0x2037, 0x0038, 0x0e0b, 0x1038, 0x2038,
1158 0x0039, 0x0f04, 0x1039, 0x2039, 0x003a, 0x0f05, 0x103a, 0x203a,
1159 0x003b, 0x0f06, 0x103b, 0x203b, 0x003c, 0x0f07, 0x103c, 0x203c,
1160 0x0801, 0x0f08, 0x103d, 0x203d, 0x1001, 0x0f09, 0x103e, 0x203e,
1161 0x1801, 0x0f0a, 0x103f, 0x203f, 0x2001, 0x0f0b, 0x1040, 0x2040
1162};
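/*
 * Illustrative sketch (not compiled) of how the decompressor reads this
 * table.  Three sample tag bytes, decoded with the bit layout documented
 * above (bits 0..7 length, bits 8..10 copy offset / 256, bits 11..13
 * extra bytes following the tag):
 */
#if 0
static void char_table_example(void)
{
	/* 0x10: literal of length 5, no extra bytes */
	CHECK_EQ(char_table[0x10], 0x0005);

	/* 0x01: one-byte-offset copy, length 4, one extra (offset) byte */
	CHECK_EQ(char_table[0x01] & 0xff, 4);       /* length      */
	CHECK_EQ(char_table[0x01] >> 11, 1);        /* extra bytes */

	/* 0xFE: two-byte-offset copy, length 64, two extra bytes */
	CHECK_EQ(char_table[0xFE] & 0xff, 64);
	CHECK_EQ(char_table[0xFE] >> 11, 2);
}
#endif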
1163
1164struct snappy_decompressor {
1165 struct source *reader; /* Underlying source of bytes to decompress */
1166 const char *ip; /* Points to next buffered byte */
1167 const char *ip_limit; /* Points just past buffered bytes */
1168 u32 peeked; /* Bytes peeked from reader (need to skip) */
1169 bool eof; /* Hit end of input without an error? */
1170 char scratch[5]; /* Temporary buffer for peekfast boundaries */
1171};
1172
1173static void
1174init_snappy_decompressor(struct snappy_decompressor *d, struct source *reader)
1175{
1176 d->reader = reader;
1177 d->ip = NULL;
1178 d->ip_limit = NULL;
1179 d->peeked = 0;
1180 d->eof = false;
1181}
1182
1183static void exit_snappy_decompressor(struct snappy_decompressor *d)
1184{
1185 skip(d->reader, d->peeked);
1186}
1187
1188/*
1189 * Read the uncompressed length stored at the start of the compressed data.
 * On success, stores the length in *result and returns true.
1191 * On failure, returns false.
1192 */
1193static bool read_uncompressed_length(struct snappy_decompressor *d,
1194 u32 * result)
1195{
1196 DCHECK(d->ip == NULL); /*
1197 * Must not have read anything yet
1198 * Length is encoded in 1..5 bytes
1199 */
1200 *result = 0;
1201 u32 shift = 0;
1202 while (true) {
1203 if (shift >= 32)
1204 return false;
1205 size_t n;
1206 const char *ip = peek(d->reader, &n);
1207 if (n == 0)
1208 return false;
1209 const unsigned char c = *(const unsigned char *)(ip);
1210 skip(d->reader, 1);
1211 *result |= (u32) (c & 0x7f) << shift;
1212 if (c < 128) {
1213 break;
1214 }
1215 shift += 7;
1216 }
1217 return true;
1218}
1219
1220static bool refill_tag(struct snappy_decompressor *d);
1221
/*
 * Process all items found in the input, writing the uncompressed data
 * through "writer". Returns when the input is exhausted or an error is
 * detected; the caller checks d->eof and the writer's length afterwards.
 */
1226static void decompress_all_tags(struct snappy_decompressor *d,
1227 struct writer *writer)
1228{
1229 const char *ip = d->ip;
1230
1231 /*
1232 * We could have put this refill fragment only at the beginning of the loop.
1233 * However, duplicating it at the end of each branch gives the compiler more
1234 * scope to optimize the <ip_limit_ - ip> expression based on the local
1235 * context, which overall increases speed.
1236 */
1237#define MAYBE_REFILL() \
1238 if (d->ip_limit - ip < 5) { \
1239 d->ip = ip; \
1240 if (!refill_tag(d)) return; \
1241 ip = d->ip; \
1242 }
1243
1244
1245 MAYBE_REFILL();
1246 for (;;) {
1247 if (d->ip_limit - ip < 5) {
1248 d->ip = ip;
1249 if (!refill_tag(d))
1250 return;
1251 ip = d->ip;
1252 }
1253
1254 const unsigned char c = *(const unsigned char *)(ip++);
1255
1256 if ((c & 0x3) == LITERAL) {
1257 u32 literal_length = (c >> 2) + 1;
1258 if (writer_try_fast_append(writer, ip, (u32) (d->ip_limit - ip),
1259 literal_length)) {
1260 DCHECK_LT(literal_length, 61);
1261 ip += literal_length;
1262 MAYBE_REFILL();
1263 continue;
1264 }
1265 if (unlikely(literal_length >= 61)) {
1266 /* Long literal */
1267 const u32 literal_ll = literal_length - 60;
1268 literal_length = (get_unaligned_le32(ip) &
1269 wordmask[literal_ll]) + 1;
1270 ip += literal_ll;
1271 }
1272
1273 u32 avail = (u32) (d->ip_limit - ip);
1274 while (avail < literal_length) {
1275 if (!writer_append(writer, ip, avail))
1276 return;
1277 literal_length -= avail;
1278 skip(d->reader, d->peeked);
1279 size_t n;
1280 ip = peek(d->reader, &n);
1281 avail = (u32) n;
1282 d->peeked = avail;
1283 if (avail == 0)
1284 return; /* Premature end of input */
1285 d->ip_limit = ip + avail;
1286 }
1287 if (!writer_append(writer, ip, literal_length))
1288 return;
1289 ip += literal_length;
1290 MAYBE_REFILL();
1291 } else {
1292 const u32 entry = char_table[c];
1293 const u32 trailer = get_unaligned_le32(ip) &
1294 wordmask[entry >> 11];
1295 const u32 length = entry & 0xff;
1296 ip += entry >> 11;
1297
1298 /*
1299 * copy_offset/256 is encoded in bits 8..10.
1300 * By just fetching those bits, we get
1301 * copy_offset (since the bit-field starts at
1302 * bit 8).
1303 */
1304 const u32 copy_offset = entry & 0x700;
1305 if (!writer_append_from_self(writer,
1306 copy_offset + trailer,
1307 length))
1308 return;
1309 MAYBE_REFILL();
1310 }
1311 }
1312}
1313
1314#undef MAYBE_REFILL
1315
1316static bool refill_tag(struct snappy_decompressor *d)
1317{
1318 const char *ip = d->ip;
1319
1320 if (ip == d->ip_limit) {
1321 size_t n;
1322 /* Fetch a new fragment from the reader */
1323 skip(d->reader, d->peeked); /* All peeked bytes are used up */
1324 ip = peek(d->reader, &n);
1325 d->peeked = (u32) n;
1326 if (n == 0) {
1327 d->eof = true;
1328 return false;
1329 }
1330 d->ip_limit = ip + n;
1331 }
1332
1333 /* Read the tag character */
1334 DCHECK_LT(ip, d->ip_limit);
1335 const unsigned char c = *(const unsigned char *)(ip);
1336 const u32 entry = char_table[c];
1337 const u32 needed = (entry >> 11) + 1; /* +1 byte for 'c' */
1338 DCHECK_LE(needed, sizeof(d->scratch));
1339
1340 /* Read more bytes from reader if needed */
1341 u32 nbuf = (u32) (d->ip_limit - ip);
1342
1343 if (nbuf < needed) {
1344 /*
1345 * Stitch together bytes from ip and reader to form the word
1346 * contents. We store the needed bytes in "scratch". They
1347 * will be consumed immediately by the caller since we do not
1348 * read more than we need.
1349 */
1350 memmove(d->scratch, ip, nbuf);
1351 skip(d->reader, d->peeked); /* All peeked bytes are used up */
1352 d->peeked = 0;
1353 while (nbuf < needed) {
1354 size_t length;
1355 const char *src = peek(d->reader, &length);
1356 if (length == 0)
1357 return false;
1358 u32 to_add = min_t(u32, needed - nbuf, (u32) length);
1359 memcpy(d->scratch + nbuf, src, to_add);
1360 nbuf += to_add;
1361 skip(d->reader, to_add);
1362 }
1363 DCHECK_EQ(nbuf, needed);
1364 d->ip = d->scratch;
1365 d->ip_limit = d->scratch + needed;
1366 } else if (nbuf < 5) {
1367 /*
1368 * Have enough bytes, but move into scratch so that we do not
1369 * read past end of input
1370 */
1371 memmove(d->scratch, ip, nbuf);
1372 skip(d->reader, d->peeked); /* All peeked bytes are used up */
1373 d->peeked = 0;
1374 d->ip = d->scratch;
1375 d->ip_limit = d->scratch + nbuf;
1376 } else {
1377 /* Pass pointer to buffer returned by reader. */
1378 d->ip = ip;
1379 }
1380 return true;
1381}
1382
1383static int internal_uncompress(struct source *r,
1384 struct writer *writer, u32 max_len)
1385{
1386 struct snappy_decompressor decompressor;
1387 u32 uncompressed_len = 0;
1388
1389 init_snappy_decompressor(&decompressor, r);
1390
1391 if (!read_uncompressed_length(&decompressor, &uncompressed_len))
1392 return -EIO;
1393 /* Protect against possible DoS attack */
1394 if ((u64) (uncompressed_len) > max_len)
1395 return -EIO;
1396
1397 writer_set_expected_length(writer, uncompressed_len);
1398
1399 /* Process the entire input */
1400 decompress_all_tags(&decompressor, writer);
1401
1402 exit_snappy_decompressor(&decompressor);
1403 if (decompressor.eof && writer_check_length(writer))
1404 return 0;
1405 return -EIO;
1406}
1407
1408static inline int sn_compress(struct snappy_env *env, struct source *reader,
1409 struct sink *writer)
1410{
1411 int err;
1412 size_t written = 0;
1413 int N = available(reader);
1414 char ulength[kmax32];
1415 char *p = varint_encode32(ulength, N);
1416
1417 append(writer, ulength, p - ulength);
1418 written += (p - ulength);
1419
1420 while (N > 0) {
1421 /* Get next block to compress (without copying if possible) */
1422 size_t fragment_size;
1423 const char *fragment = peek(reader, &fragment_size);
1424 if (fragment_size == 0) {
1425 err = -EIO;
1426 goto out;
1427 }
1428 const unsigned num_to_read = min_t(int, N, kblock_size);
1429 size_t bytes_read = fragment_size;
1430
1431 int pending_advance = 0;
1432 if (bytes_read >= num_to_read) {
1433 /* Buffer returned by reader is large enough */
1434 pending_advance = num_to_read;
1435 fragment_size = num_to_read;
1436 }
1437 else {
1438 memcpy(env->scratch, fragment, bytes_read);
1439 skip(reader, bytes_read);
1440
1441 while (bytes_read < num_to_read) {
1442 fragment = peek(reader, &fragment_size);
1443 size_t n =
1444 min_t(size_t, fragment_size,
1445 num_to_read - bytes_read);
1446 memcpy((char *)(env->scratch) + bytes_read, fragment, n);
1447 bytes_read += n;
1448 skip(reader, n);
1449 }
1450 DCHECK_EQ(bytes_read, num_to_read);
1451 fragment = env->scratch;
1452 fragment_size = num_to_read;
1453 }
1454 if (fragment_size < num_to_read)
1455 return -EIO;
1456
1457 /* Get encoding table for compression */
1458 int table_size;
1459 u16 *table = get_hash_table(env, num_to_read, &table_size);
1460
1461 /* Compress input_fragment and append to dest */
1462 char *dest;
1463 dest = sink_peek(writer, rd_kafka_snappy_max_compressed_length(num_to_read));
1464 if (!dest) {
1465 /*
1466 * Need a scratch buffer for the output,
1467 * because the byte sink doesn't have enough
1468 * in one piece.
1469 */
1470 dest = env->scratch_output;
1471 }
1472 char *end = compress_fragment(fragment, fragment_size,
1473 dest, table, table_size);
1474 append(writer, dest, end - dest);
1475 written += (end - dest);
1476
1477 N -= num_to_read;
1478 skip(reader, pending_advance);
1479 }
1480
1481 err = 0;
1482out:
1483 return err;
1484}
1485
1486#ifdef SG
1487
1488int rd_kafka_snappy_compress_iov(struct snappy_env *env,
1489 const struct iovec *iov_in, size_t iov_in_cnt,
1490 size_t input_length,
1491 struct iovec *iov_out) {
1492 struct source reader = {
1493 .iov = (struct iovec *)iov_in,
1494 .iovlen = (int)iov_in_cnt,
1495 .total = input_length
1496 };
1497 struct sink writer = {
1498 .iov = iov_out,
1499 .iovlen = 1
1500 };
1501 int err = sn_compress(env, &reader, &writer);
1502
1503 iov_out->iov_len = writer.written;
1504
1505 return err;
1506}
1507EXPORT_SYMBOL(rd_kafka_snappy_compress_iov);
1508
1509/**
1510 * rd_kafka_snappy_compress - Compress a buffer using the snappy compressor.
1511 * @env: Preallocated environment
1512 * @input: Input buffer
1513 * @input_length: Length of input_buffer
1514 * @compressed: Output buffer for compressed data
1515 * @compressed_length: The real length of the output written here.
1516 *
 * Return 0 on success, otherwise a negative error code.
1518 *
1519 * The output buffer must be at least
1520 * rd_kafka_snappy_max_compressed_length(input_length) bytes long.
1521 *
1522 * Requires a preallocated environment from rd_kafka_snappy_init_env.
1523 * The environment does not keep state over individual calls
1524 * of this function, just preallocates the memory.
1525 */
1526int rd_kafka_snappy_compress(struct snappy_env *env,
1527 const char *input,
1528 size_t input_length,
1529 char *compressed, size_t *compressed_length)
1530{
1531 struct iovec iov_in = {
1532 .iov_base = (char *)input,
1533 .iov_len = input_length,
1534 };
1535 struct iovec iov_out = {
1536 .iov_base = compressed,
1537 .iov_len = 0xffffffff,
1538 };
1539 return rd_kafka_snappy_compress_iov(env,
1540 &iov_in, 1, input_length,
1541 &iov_out);
1542}
1543EXPORT_SYMBOL(rd_kafka_snappy_compress);
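/*
 * Usage sketch (not compiled): the typical single-buffer compression
 * sequence with the API above.  "in"/"in_len" are hypothetical caller
 * buffers; error handling is reduced to the bare minimum.
 */
#if 0
static int compress_example(const char *in, size_t in_len,
			    char **outp, size_t *out_lenp)
{
	struct snappy_env env;
	char *out;
	int err;

	if (rd_kafka_snappy_init_env(&env) < 0)
		return -ENOMEM;

	out = rd_malloc(rd_kafka_snappy_max_compressed_length(in_len));
	err = rd_kafka_snappy_compress(&env, in, in_len, out, out_lenp);
	rd_kafka_snappy_free_env(&env);

	if (err) {
		rd_free(out);
		return err;
	}
	*outp = out;
	return 0;
}
#endif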
1544
1545int rd_kafka_snappy_uncompress_iov(struct iovec *iov_in, int iov_in_len,
1546 size_t input_len, char *uncompressed)
1547{
1548 struct source reader = {
1549 .iov = iov_in,
1550 .iovlen = iov_in_len,
1551 .total = input_len
1552 };
1553 struct writer output = {
1554 .base = uncompressed,
1555 .op = uncompressed
1556 };
1557 return internal_uncompress(&reader, &output, 0xffffffff);
1558}
1559EXPORT_SYMBOL(rd_kafka_snappy_uncompress_iov);
1560
1561/**
1562 * rd_kafka_snappy_uncompress - Uncompress a snappy compressed buffer
1563 * @compressed: Input buffer with compressed data
1564 * @n: length of compressed buffer
1565 * @uncompressed: buffer for uncompressed data
1566 *
1567 * The uncompressed data buffer must be at least
1568 * rd_kafka_snappy_uncompressed_length(compressed) bytes long.
1569 *
 * Return 0 on success, otherwise a negative error code.
1571 */
1572int rd_kafka_snappy_uncompress(const char *compressed, size_t n, char *uncompressed)
1573{
1574 struct iovec iov = {
1575 .iov_base = (char *)compressed,
1576 .iov_len = n
1577 };
1578 return rd_kafka_snappy_uncompress_iov(&iov, 1, n, uncompressed);
1579}
1580EXPORT_SYMBOL(rd_kafka_snappy_uncompress);
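/*
 * Usage sketch (not compiled): decompressing a raw (unframed) Snappy
 * buffer with the helpers above -- first read the stored uncompressed
 * length, then size the destination accordingly.  "in"/"in_len" are
 * hypothetical caller buffers.
 */
#if 0
static char *uncompress_example(const char *in, size_t in_len, size_t *out_lenp)
{
	size_t ulen;
	char *out;

	if (!rd_kafka_snappy_uncompressed_length(in, in_len, &ulen))
		return NULL;

	out = rd_malloc(ulen);
	if (rd_kafka_snappy_uncompress(in, in_len, out)) {
		rd_free(out);
		return NULL;
	}

	*out_lenp = ulen;
	return out;
}
#endif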
1581
1582
1583/**
1584 * @brief Decompress Snappy message with Snappy-java framing.
1585 *
1586 * @returns a malloced buffer with the uncompressed data, or NULL on failure.
1587 */
1588char *rd_kafka_snappy_java_uncompress (const char *inbuf, size_t inlen,
1589 size_t *outlenp,
1590 char *errstr, size_t errstr_size) {
1591 int pass;
1592 char *outbuf = NULL;
1593
1594 /**
1595 * Traverse all chunks in two passes:
1596 * pass 1: calculate total uncompressed length
1597 * pass 2: uncompress
1598 *
	 * Each chunk is prefixed with its compressed length as a
	 * 4-byte big-endian integer. */
1600
1601 for (pass = 1 ; pass <= 2 ; pass++) {
1602 ssize_t of = 0; /* inbuf offset */
1603 ssize_t uof = 0; /* outbuf offset */
1604
1605 while (of + 4 <= (ssize_t)inlen) {
1606 uint32_t clen; /* compressed length */
1607 size_t ulen; /* uncompressed length */
1608 int r;
1609
1610 memcpy(&clen, inbuf+of, 4);
1611 clen = be32toh(clen);
1612 of += 4;
1613
1614 if (unlikely(clen > inlen - of)) {
1615 rd_snprintf(errstr, errstr_size,
1616 "Invalid snappy-java chunk length "
1617 "%"PRId32" > %"PRIdsz
1618 " available bytes",
1619 clen, (ssize_t)inlen - of);
1620 return NULL;
1621 }
1622
1623 /* Acquire uncompressed length */
1624 if (unlikely(!rd_kafka_snappy_uncompressed_length(
1625 inbuf+of, clen, &ulen))) {
1626 rd_snprintf(errstr, errstr_size,
1627 "Failed to get length of "
1628 "(snappy-java framed) Snappy "
1629 "compressed payload "
1630 "(clen %"PRId32")",
1631 clen);
1632 return NULL;
1633 }
1634
1635 if (pass == 1) {
1636 /* pass 1: calculate total length */
1637 of += clen;
1638 uof += ulen;
1639 continue;
1640 }
1641
1642 /* pass 2: Uncompress to outbuf */
1643 if (unlikely((r = rd_kafka_snappy_uncompress(
1644 inbuf+of, clen, outbuf+uof)))) {
1645 rd_snprintf(errstr, errstr_size,
1646 "Failed to decompress Snappy-java "
1647 "framed payload of size %"PRId32
1648 ": %s",
1649 clen,
1650 rd_strerror(-r/*negative errno*/));
1651 rd_free(outbuf);
1652 return NULL;
1653 }
1654
1655 of += clen;
1656 uof += ulen;
1657 }
1658
1659 if (unlikely(of != (ssize_t)inlen)) {
1660 rd_snprintf(errstr, errstr_size,
1661 "%"PRIusz" trailing bytes in Snappy-java "
1662 "framed compressed data",
1663 inlen - of);
1664 if (outbuf)
1665 rd_free(outbuf);
1666 return NULL;
1667 }
1668
1669 if (pass == 1) {
1670 if (uof <= 0) {
1671 rd_snprintf(errstr, errstr_size,
1672 "Empty Snappy-java framed data");
1673 return NULL;
1674 }
1675
1676 /* Allocate memory for uncompressed data */
1677 outbuf = rd_malloc(uof);
1678 if (unlikely(!outbuf)) {
1679 rd_snprintf(errstr, errstr_size,
1680 "Failed to allocate memory "
1681 "(%"PRIdsz") for "
1682 "uncompressed Snappy data: %s",
1683 uof, rd_strerror(errno));
1684 return NULL;
1685 }
1686
1687 } else {
1688 /* pass 2 */
1689 *outlenp = uof;
1690 }
1691 }
1692
1693 return outbuf;
1694}
1695
1696
1697
1698#else
1699/**
1700 * rd_kafka_snappy_compress - Compress a buffer using the snappy compressor.
1701 * @env: Preallocated environment
1702 * @input: Input buffer
1703 * @input_length: Length of input_buffer
1704 * @compressed: Output buffer for compressed data
1705 * @compressed_length: The real length of the output written here.
1706 *
 * Return 0 on success, otherwise a negative error code.
1708 *
1709 * The output buffer must be at least
1710 * rd_kafka_snappy_max_compressed_length(input_length) bytes long.
1711 *
1712 * Requires a preallocated environment from rd_kafka_snappy_init_env.
1713 * The environment does not keep state over individual calls
1714 * of this function, just preallocates the memory.
1715 */
1716int rd_kafka_snappy_compress(struct snappy_env *env,
1717 const char *input,
1718 size_t input_length,
1719 char *compressed, size_t *compressed_length)
1720{
1721 struct source reader = {
1722 .ptr = input,
1723 .left = input_length
1724 };
1725 struct sink writer = {
1726 .dest = compressed,
1727 };
1728 int err = sn_compress(env, &reader, &writer);
1729
1730 /* Compute how many bytes were added */
1731 *compressed_length = (writer.dest - compressed);
1732 return err;
1733}
1734EXPORT_SYMBOL(rd_kafka_snappy_compress);
1735
1736/**
1737 * rd_kafka_snappy_uncompress - Uncompress a snappy compressed buffer
1738 * @compressed: Input buffer with compressed data
1739 * @n: length of compressed buffer
1740 * @uncompressed: buffer for uncompressed data
1741 *
1742 * The uncompressed data buffer must be at least
1743 * rd_kafka_snappy_uncompressed_length(compressed) bytes long.
1744 *
 * Return 0 on success, otherwise a negative error code.
1746 */
1747int rd_kafka_snappy_uncompress(const char *compressed, size_t n, char *uncompressed)
1748{
1749 struct source reader = {
1750 .ptr = compressed,
1751 .left = n
1752 };
1753 struct writer output = {
1754 .base = uncompressed,
1755 .op = uncompressed
1756 };
1757 return internal_uncompress(&reader, &output, 0xffffffff);
1758}
1759EXPORT_SYMBOL(rd_kafka_snappy_uncompress);
1760#endif
1761
1762static inline void clear_env(struct snappy_env *env)
1763{
1764 memset(env, 0, sizeof(*env));
1765}
1766
1767#ifdef SG
1768/**
1769 * rd_kafka_snappy_init_env_sg - Allocate snappy compression environment
 * @env: Environment to preallocate
 * @sg: Set to true if the environment will ever be used with
 *      scatter-gather (multi-entry iovec) input
 *
 * If false is passed for sg then multiple entries in an iovec
 * are not legal.
1775 * Returns 0 on success, otherwise negative errno.
1776 * Must run in process context.
1777 */
1778int rd_kafka_snappy_init_env_sg(struct snappy_env *env, bool sg)
1779{
1780 if (rd_kafka_snappy_init_env(env) < 0)
1781 goto error;
1782
1783 if (sg) {
1784 env->scratch = vmalloc(kblock_size);
1785 if (!env->scratch)
1786 goto error;
1787 env->scratch_output =
1788 vmalloc(rd_kafka_snappy_max_compressed_length(kblock_size));
1789 if (!env->scratch_output)
1790 goto error;
1791 }
1792 return 0;
1793error:
1794 rd_kafka_snappy_free_env(env);
1795 return -ENOMEM;
1796}
1797EXPORT_SYMBOL(rd_kafka_snappy_init_env_sg);
1798#endif
1799
1800/**
1801 * rd_kafka_snappy_init_env - Allocate snappy compression environment
1802 * @env: Environment to preallocate
1803 *
1804 * Passing multiple entries in an iovec is not allowed
1805 * on the environment allocated here.
1806 * Returns 0 on success, otherwise negative errno.
1807 * Must run in process context.
1808 */
1809int rd_kafka_snappy_init_env(struct snappy_env *env)
1810{
1811 clear_env(env);
1812 env->hash_table = vmalloc(sizeof(u16) * kmax_hash_table_size);
1813 if (!env->hash_table)
1814 return -ENOMEM;
1815 return 0;
1816}
1817EXPORT_SYMBOL(rd_kafka_snappy_init_env);
1818
1819/**
 * rd_kafka_snappy_free_env - Free a snappy compression environment
1821 * @env: Environment to free.
1822 *
1823 * Must run in process context.
1824 */
1825void rd_kafka_snappy_free_env(struct snappy_env *env)
1826{
1827 vfree(env->hash_table);
1828#ifdef SG
1829 vfree(env->scratch);
1830 vfree(env->scratch_output);
1831#endif
1832 clear_env(env);
1833}
1834EXPORT_SYMBOL(rd_kafka_snappy_free_env);
1835
1836#ifdef __GNUC__
1837#pragma GCC diagnostic pop /* -Wcast-align ignore */
1838#endif