1/*
2 * librdkafka - The Apache Kafka C/C++ library
3 *
4 * Copyright (c) 2018 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29#include "rdkafka_int.h"
30#include "rdkafka_zstd.h"
31
32#if WITH_ZSTD_STATIC
33/* Enable advanced/unstable API for initCStream_srcSize */
34#define ZSTD_STATIC_LINKING_ONLY
35#endif
36
37#include <zstd.h>
38#include <zstd_errors.h>
39
40rd_kafka_resp_err_t
41rd_kafka_zstd_decompress (rd_kafka_broker_t *rkb,
42 char *inbuf, size_t inlen,
43 void **outbuf, size_t *outlenp) {
44 unsigned long long out_bufsize = ZSTD_getFrameContentSize(inbuf, inlen);
45
46 switch (out_bufsize) {
47 case ZSTD_CONTENTSIZE_UNKNOWN:
48 /* Decompressed size cannot be determined, make a guess */
49 out_bufsize = inlen * 2;
50 break;
51 case ZSTD_CONTENTSIZE_ERROR:
52 /* Error calculating frame content size */
53 rd_rkb_dbg(rkb, MSG, "ZSTD",
54 "Unable to begin ZSTD decompression "
55 "(out buffer is %llu bytes): %s",
56 out_bufsize, "Error in determining frame size");
57 return RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
58 default:
59 break;
60 }
61
62 /* Increase output buffer until it can fit the entire result,
63 * capped by message.max.bytes */
64 while (out_bufsize <=
65 (unsigned long long)rkb->rkb_rk->rk_conf.recv_max_msg_size) {
66 size_t ret;
67 char *decompressed;
68
69 decompressed = rd_malloc((size_t)out_bufsize);
70 if (!decompressed) {
71 rd_rkb_dbg(rkb, MSG, "ZSTD",
72 "Unable to allocate output buffer "
73 "(%llu bytes for %"PRIusz
74 " compressed bytes): %s",
75 out_bufsize, inlen, rd_strerror(errno));
76 return RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
77 }
78
79
80 ret = ZSTD_decompress(decompressed, (size_t)out_bufsize,
81 inbuf, inlen);
82 if (!ZSTD_isError(ret)) {
83 *outlenp = ret;
84 *outbuf = decompressed;
85 return RD_KAFKA_RESP_ERR_NO_ERROR;
86 }
87
88 rd_free(decompressed);
89
90 /* Check if the destination size is too small */
91 if (ZSTD_getErrorCode(ret) == ZSTD_error_dstSize_tooSmall) {
92
93 /* Grow quadratically */
94 out_bufsize += RD_MAX(out_bufsize * 2, 4000);
95
96 rd_atomic64_add(&rkb->rkb_c.zbuf_grow, 1);
97
98 } else {
99 /* Fail on any other error */
100 rd_rkb_dbg(rkb, MSG, "ZSTD",
101 "Unable to begin ZSTD decompression "
102 "(out buffer is %llu bytes): %s",
103 out_bufsize, ZSTD_getErrorName(ret));
104 return RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
105 }
106 }
107
108 rd_rkb_dbg(rkb, MSG, "ZSTD",
109 "Unable to decompress ZSTD "
110 "(input buffer %"PRIusz", output buffer %llu): "
111 "output would exceed receive.message.max.bytes (%d)",
112 inlen, out_bufsize, rkb->rkb_rk->rk_conf.max_msg_size);
113
114 return RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
115}
116
117
118rd_kafka_resp_err_t
119rd_kafka_zstd_compress (rd_kafka_broker_t *rkb, int comp_level,
120 rd_slice_t *slice, void **outbuf, size_t *outlenp) {
121 ZSTD_CStream *cctx;
122 size_t r;
123 rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
124 size_t len = rd_slice_remains(slice);
125 ZSTD_outBuffer out;
126 ZSTD_inBuffer in;
127
128 *outbuf = NULL;
129 out.pos = 0;
130 out.size = ZSTD_compressBound(len);
131 out.dst = rd_malloc(out.size);
132 if (!out.dst) {
133 rd_rkb_dbg(rkb, MSG, "ZSTDCOMPR",
134 "Unable to allocate output buffer "
135 "(%"PRIusz" bytes): %s",
136 out.size, rd_strerror(errno));
137 return RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
138 }
139
140
141 cctx = ZSTD_createCStream();
142 if (!cctx) {
143 rd_rkb_dbg(rkb, MSG, "ZSTDCOMPR",
144 "Unable to create ZSTD compression context");
145 err = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
146 goto done;
147 }
148
149#if defined(WITH_ZSTD_STATIC) && ZSTD_VERSION_NUMBER >= (1*100*100+2*100+1) /* v1.2.1 */
150 r = ZSTD_initCStream_srcSize(cctx, comp_level, len);
151#else
152 /* libzstd not linked statically (or zstd version < 1.2.1):
153 * decompression in consumer may be more costly due to
154 * decompressed size not included in header by librdkafka producer */
155 r = ZSTD_initCStream(cctx, comp_level);
156#endif
157 if (ZSTD_isError(r)) {
158 rd_rkb_dbg(rkb, MSG, "ZSTDCOMPR",
159 "Unable to begin ZSTD compression "
160 "(out buffer is %"PRIusz" bytes): %s",
161 out.size, ZSTD_getErrorName(r));
162 err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
163 goto done;
164 }
165
166 while ((in.size = rd_slice_reader(slice, &in.src))) {
167 in.pos = 0;
168 r = ZSTD_compressStream(cctx, &out, &in);
169 if (unlikely(ZSTD_isError(r))) {
170 rd_rkb_dbg(rkb, MSG, "ZSTDCOMPR",
171 "ZSTD compression failed "
172 "(at of %"PRIusz" bytes, with "
173 "%"PRIusz" bytes remaining in out buffer): "
174 "%s",
175 in.size, out.size - out.pos,
176 ZSTD_getErrorName(r));
177 err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
178 goto done;
179 }
180
181 /* No space left in output buffer,
182 * but input isn't fully consumed */
183 if (in.pos < in.size) {
184 err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
185 goto done;
186 }
187 }
188
189 if (rd_slice_remains(slice) != 0) {
190 rd_rkb_dbg(rkb, MSG, "ZSTDCOMPR",
191 "Failed to finalize ZSTD compression "
192 "of %"PRIusz" bytes: %s",
193 len, "Unexpected trailing data");
194 err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
195 goto done;
196 }
197
198 r = ZSTD_endStream(cctx, &out);
199 if (unlikely(ZSTD_isError(r) || r > 0)) {
200 rd_rkb_dbg(rkb, MSG, "ZSTDCOMPR",
201 "Failed to finalize ZSTD compression "
202 "of %"PRIusz" bytes: %s",
203 len, ZSTD_getErrorName(r));
204 err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
205 goto done;
206 }
207
208 *outbuf = out.dst;
209 *outlenp = out.pos;
210
211 done:
212 if (cctx)
213 ZSTD_freeCStream(cctx);
214
215 if (err)
216 rd_free(out.dst);
217
218 return err;
219
220}
221