/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012-2015, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "rdkafka_int.h"
#include "rdkafka_buf.h"
#include "rdkafka_broker.h"

/**
 * @brief Final destructor: frees all payloads and resources owned by the
 *        buffer. Called when the buffer's refcount reaches zero.
 */
void rd_kafka_buf_destroy_final (rd_kafka_buf_t *rkbuf) {

        switch (rkbuf->rkbuf_reqhdr.ApiKey)
        {
        case RD_KAFKAP_Metadata:
                if (rkbuf->rkbuf_u.Metadata.topics)
                        rd_list_destroy(rkbuf->rkbuf_u.Metadata.topics);
                if (rkbuf->rkbuf_u.Metadata.reason)
                        rd_free(rkbuf->rkbuf_u.Metadata.reason);
                if (rkbuf->rkbuf_u.Metadata.rko)
                        rd_kafka_op_reply(rkbuf->rkbuf_u.Metadata.rko,
                                          RD_KAFKA_RESP_ERR__DESTROY);
                if (rkbuf->rkbuf_u.Metadata.decr) {
                        /* Decrease metadata cache's full_.._sent state. */
                        mtx_lock(rkbuf->rkbuf_u.Metadata.decr_lock);
                        rd_kafka_assert(NULL,
                                        (*rkbuf->rkbuf_u.Metadata.decr) > 0);
                        (*rkbuf->rkbuf_u.Metadata.decr)--;
                        mtx_unlock(rkbuf->rkbuf_u.Metadata.decr_lock);
                }
                break;

        case RD_KAFKAP_Produce:
                rd_kafka_msgbatch_destroy(&rkbuf->rkbuf_batch);
                break;
        }

        if (rkbuf->rkbuf_response)
                rd_kafka_buf_destroy(rkbuf->rkbuf_response);

        rd_kafka_replyq_destroy(&rkbuf->rkbuf_replyq);
        rd_kafka_replyq_destroy(&rkbuf->rkbuf_orig_replyq);

        rd_buf_destroy(&rkbuf->rkbuf_buf);

        if (rkbuf->rkbuf_rktp_vers)
                rd_list_destroy(rkbuf->rkbuf_rktp_vers);

        if (rkbuf->rkbuf_rkb)
                rd_kafka_broker_destroy(rkbuf->rkbuf_rkb);

        rd_refcnt_destroy(&rkbuf->rkbuf_refcnt);

        rd_free(rkbuf);
}



/**
 * @brief Pushes \p buf of size \p len as a new segment on the buffer.
 *
 * \p buf is not copied: if \p free_cb is non-NULL it will be used to free
 * \p buf when the buffer is destroyed, otherwise \p buf must outlive the
 * buffer.
 */
void rd_kafka_buf_push0 (rd_kafka_buf_t *rkbuf, const void *buf, size_t len,
                         int allow_crc_calc, void (*free_cb) (void *)) {
        rd_buf_push(&rkbuf->rkbuf_buf, buf, len, free_cb);

        if (allow_crc_calc && (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC))
                rkbuf->rkbuf_crc = rd_crc32_update(rkbuf->rkbuf_crc, buf, len);
}
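
/*
 * Illustrative sketch (rkbuf and the payload below are hypothetical):
 * pushing a caller-owned static region, so no free_cb is given; CRC
 * accumulation is allowed but only performed if RD_KAFKA_OP_F_CRC is set
 * on the buffer. The rd_kafka_buf_push() convenience macro is assumed to
 * wrap this call with allow_crc_calc enabled.
 *
 *   static const char preamble[4] = { 0, 0, 0, 1 };
 *   rd_kafka_buf_push0(rkbuf, preamble, sizeof(preamble), 1, NULL);
 */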



/**
 * @brief Create a new buffer with \p segcnt initial segments and \p size bytes
 *        of initial backing memory.
 *        The underlying buffer will grow as needed.
 *
 * @remark Use rd_kafka_buf_new_request() for protocol request buffers,
 *         which reserves space for the request header.
 */
rd_kafka_buf_t *rd_kafka_buf_new0 (int segcnt, size_t size, int flags) {
        rd_kafka_buf_t *rkbuf;

        rkbuf = rd_calloc(1, sizeof(*rkbuf));

        rkbuf->rkbuf_flags = flags;

        rd_buf_init(&rkbuf->rkbuf_buf, segcnt, size);
        rd_refcnt_init(&rkbuf->rkbuf_refcnt, 1);

        return rkbuf;
}
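
/*
 * Sketch of standalone usage (segment count and size are illustrative):
 *
 *   rd_kafka_buf_t *rkbuf = rd_kafka_buf_new0(1, 64, 0);
 *   ... write to rkbuf or push segments onto it ...
 *   rd_kafka_buf_destroy(rkbuf);
 *
 * The destroy drops the initial refcount; dropping the last reference
 * ends up in rd_kafka_buf_destroy_final() above.
 */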


/**
 * @brief Create a new request buffer with the request header written.
 *        The Length, ApiVersion and CorrId fields are placeholders that
 *        are updated later.
 */
rd_kafka_buf_t *rd_kafka_buf_new_request (rd_kafka_broker_t *rkb, int16_t ApiKey,
                                          int segcnt, size_t size) {
        rd_kafka_buf_t *rkbuf;

        /* Make room for common protocol request headers */
        size += RD_KAFKAP_REQHDR_SIZE +
                RD_KAFKAP_STR_SIZE(rkb->rkb_rk->rk_client_id);
        segcnt += 1; /* headers */

        rkbuf = rd_kafka_buf_new0(segcnt, size, 0);

        rkbuf->rkbuf_rkb = rkb;
        rd_kafka_broker_keep(rkb);

        rkbuf->rkbuf_rel_timeout = rkb->rkb_rk->rk_conf.socket_timeout_ms;

        rkbuf->rkbuf_reqhdr.ApiKey = ApiKey;

        /* Write request header, will be updated later. */
        /* Length: updated later */
        rd_kafka_buf_write_i32(rkbuf, 0);
        /* ApiKey */
        rd_kafka_buf_write_i16(rkbuf, rkbuf->rkbuf_reqhdr.ApiKey);
        /* ApiVersion: updated later */
        rd_kafka_buf_write_i16(rkbuf, 0);
        /* CorrId: updated later */
        rd_kafka_buf_write_i32(rkbuf, 0);

        /* ClientId */
        rd_kafka_buf_write_kstr(rkbuf, rkb->rkb_rk->rk_client_id);

        return rkbuf;
}
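
/*
 * Illustrative sketch of building and enqueuing a minimal request from a
 * broker thread. The enqueue call and handler name are assumptions (they
 * live outside this file):
 *
 *   rd_kafka_buf_t *rkbuf;
 *   rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_ApiVersion, 1, 4);
 *   ... write the request body here ...
 *   rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, RD_KAFKA_NO_REPLYQ,
 *                                  my_handler_cb, NULL);
 *
 * The header written above is, in wire order: Length (i32, placeholder),
 * ApiKey (i16), ApiVersion (i16, placeholder), CorrId (i32, placeholder),
 * ClientId (Kafka string).
 */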




/**
 * @brief Create a new read-only rkbuf shadowing a memory region.
 *
 * @remark \p free_cb (possibly NULL) will be used to free \p ptr when
 *         the buffer refcount reaches 0.
 * @remark The buffer may only be read from, not written to.
 */
rd_kafka_buf_t *rd_kafka_buf_new_shadow (const void *ptr, size_t size,
                                         void (*free_cb) (void *)) {
        rd_kafka_buf_t *rkbuf;

        rkbuf = rd_calloc(1, sizeof(*rkbuf));

        rkbuf->rkbuf_reqhdr.ApiKey = RD_KAFKAP_None;

        rd_buf_init(&rkbuf->rkbuf_buf, 1, 0);
        rd_buf_push(&rkbuf->rkbuf_buf, ptr, size, free_cb);

        rkbuf->rkbuf_totlen = size;

        /* Initialize reader slice */
        rd_slice_init_full(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf);

        rd_refcnt_init(&rkbuf->rkbuf_refcnt, 1);

        return rkbuf;
}
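
/*
 * Sketch: wrapping a heap-allocated, fully received frame so the rkbuf
 * owns it (frame and len are illustrative):
 *
 *   char *frame = rd_malloc(len);
 *   ... fill frame with a complete protocol payload ...
 *   rd_kafka_buf_t *resp = rd_kafka_buf_new_shadow(frame, len, rd_free);
 *   ... parse through resp->rkbuf_reader ...
 *   rd_kafka_buf_destroy(resp);
 *
 * rd_free(frame) then runs when the last reference is dropped.
 */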



void rd_kafka_bufq_enq (rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf) {
        TAILQ_INSERT_TAIL(&rkbufq->rkbq_bufs, rkbuf, rkbuf_link);
        rd_atomic32_add(&rkbufq->rkbq_cnt, 1);
        if (rkbuf->rkbuf_reqhdr.ApiKey == RD_KAFKAP_Produce)
                rd_atomic32_add(&rkbufq->rkbq_msg_cnt,
                                rd_kafka_msgq_len(&rkbuf->rkbuf_batch.msgq));
}

void rd_kafka_bufq_deq (rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf) {
        TAILQ_REMOVE(&rkbufq->rkbq_bufs, rkbuf, rkbuf_link);
        rd_kafka_assert(NULL, rd_atomic32_get(&rkbufq->rkbq_cnt) > 0);
        rd_atomic32_sub(&rkbufq->rkbq_cnt, 1);
        if (rkbuf->rkbuf_reqhdr.ApiKey == RD_KAFKAP_Produce)
                rd_atomic32_sub(&rkbufq->rkbq_msg_cnt,
                                rd_kafka_msgq_len(&rkbuf->rkbuf_batch.msgq));
}

void rd_kafka_bufq_init(rd_kafka_bufq_t *rkbufq) {
        TAILQ_INIT(&rkbufq->rkbq_bufs);
        rd_atomic32_init(&rkbufq->rkbq_cnt, 0);
        rd_atomic32_init(&rkbufq->rkbq_msg_cnt, 0);
}

/**
 * Concat all buffers from 'src' to tail of 'dst'.
 */
void rd_kafka_bufq_concat (rd_kafka_bufq_t *dst, rd_kafka_bufq_t *src) {
        TAILQ_CONCAT(&dst->rkbq_bufs, &src->rkbq_bufs, rkbuf_link);
        (void)rd_atomic32_add(&dst->rkbq_cnt, rd_atomic32_get(&src->rkbq_cnt));
        (void)rd_atomic32_add(&dst->rkbq_msg_cnt,
                              rd_atomic32_get(&src->rkbq_msg_cnt));
        rd_kafka_bufq_init(src);
}

/**
 * Purge the wait-response queue.
 * NOTE: 'rkbufq' must be a temporary queue and not one of rkb_waitresps
 *       or rkb_outbufs since buffers may be re-enqueued on those queues.
 *       'rkbufq' needs to be bufq_init():ed before reuse after this call.
 */
void rd_kafka_bufq_purge (rd_kafka_broker_t *rkb,
                          rd_kafka_bufq_t *rkbufq,
                          rd_kafka_resp_err_t err) {
        rd_kafka_buf_t *rkbuf, *tmp;

        rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));

        rd_rkb_dbg(rkb, QUEUE, "BUFQ", "Purging bufq with %i buffers",
                   rd_atomic32_get(&rkbufq->rkbq_cnt));

        TAILQ_FOREACH_SAFE(rkbuf, &rkbufq->rkbq_bufs, rkbuf_link, tmp) {
                rd_kafka_buf_callback(rkb->rkb_rk, rkb, err, NULL, rkbuf);
        }
}
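
/*
 * Sketch of the temporary-queue pattern the NOTE above requires (the
 * rkb_waitresps usage is an assumption about the caller, not part of
 * this file):
 *
 *   rd_kafka_bufq_t tmpq;
 *   rd_kafka_bufq_init(&tmpq);
 *   rd_kafka_bufq_concat(&tmpq, &rkb->rkb_waitresps);
 *   rd_kafka_bufq_purge(rkb, &tmpq, RD_KAFKA_RESP_ERR__TRANSPORT);
 */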


/**
 * @brief Update bufq for connection reset:
 *
 * - Purge connection-setup API requests from the queue.
 * - Reset any partially sent buffer's offset. (issue #756)
 *
 * Request types purged:
 *   ApiVersion
 *   SaslHandshake
 */
void rd_kafka_bufq_connection_reset (rd_kafka_broker_t *rkb,
                                     rd_kafka_bufq_t *rkbufq) {
        rd_kafka_buf_t *rkbuf, *tmp;
        rd_ts_t now = rd_clock();

        rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));

        rd_rkb_dbg(rkb, QUEUE, "BUFQ",
                   "Updating %d buffers on connection reset",
                   rd_atomic32_get(&rkbufq->rkbq_cnt));

        TAILQ_FOREACH_SAFE(rkbuf, &rkbufq->rkbq_bufs, rkbuf_link, tmp) {
                switch (rkbuf->rkbuf_reqhdr.ApiKey)
                {
                case RD_KAFKAP_ApiVersion:
                case RD_KAFKAP_SaslHandshake:
                        rd_kafka_bufq_deq(rkbufq, rkbuf);
                        rd_kafka_buf_callback(rkb->rkb_rk, rkb,
                                              RD_KAFKA_RESP_ERR__DESTROY,
                                              NULL, rkbuf);
                        break;
                default:
                        /* Reset buffer send position */
                        rd_slice_seek(&rkbuf->rkbuf_reader, 0);
                        /* Reset timeout */
                        rd_kafka_buf_calc_timeout(rkb->rkb_rk, rkbuf, now);
                        break;
                }
        }
}


void rd_kafka_bufq_dump (rd_kafka_broker_t *rkb, const char *fac,
                         rd_kafka_bufq_t *rkbq) {
        rd_kafka_buf_t *rkbuf;
        int cnt = rd_kafka_bufq_cnt(rkbq);
        rd_ts_t now;

        if (!cnt)
                return;

        now = rd_clock();

        rd_rkb_dbg(rkb, BROKER, fac, "bufq with %d buffer(s):", cnt);

        TAILQ_FOREACH(rkbuf, &rkbq->rkbq_bufs, rkbuf_link) {
                rd_rkb_dbg(rkb, BROKER, fac,
                           " Buffer %s (%"PRIusz" bytes, corrid %"PRId32", "
                           "connid %d, prio %d, retry %d in %lldms, "
                           "timeout in %lldms)",
                           rd_kafka_ApiKey2str(rkbuf->rkbuf_reqhdr.ApiKey),
                           rkbuf->rkbuf_totlen, rkbuf->rkbuf_corrid,
                           rkbuf->rkbuf_connid, rkbuf->rkbuf_prio,
                           rkbuf->rkbuf_retries,
                           rkbuf->rkbuf_ts_retry ?
                           (rkbuf->rkbuf_ts_retry - now) / 1000LL : 0,
                           rkbuf->rkbuf_ts_timeout ?
                           (rkbuf->rkbuf_ts_timeout - now) / 1000LL : 0);
        }
}




/**
 * @brief Calculate the effective timeout for a request attempt.
 */
void rd_kafka_buf_calc_timeout (const rd_kafka_t *rk, rd_kafka_buf_t *rkbuf,
                                rd_ts_t now) {
        if (likely(rkbuf->rkbuf_rel_timeout)) {
                /* Default:
                 * Relative timeout, set request timeout to
                 * now + relative timeout. */
                rkbuf->rkbuf_ts_timeout = now + rkbuf->rkbuf_rel_timeout * 1000;
        } else if (!rkbuf->rkbuf_force_timeout) {
                /* Use absolute timeout, limited by socket.timeout.ms */
                rd_ts_t sock_timeout = now +
                        rk->rk_conf.socket_timeout_ms * 1000;

                rkbuf->rkbuf_ts_timeout =
                        RD_MIN(sock_timeout, rkbuf->rkbuf_abs_timeout);
        } else {
                /* Use absolute timeout without limit. */
                rkbuf->rkbuf_ts_timeout = rkbuf->rkbuf_abs_timeout;
        }
}
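
/*
 * Worked example (values illustrative): with the default relative timeout
 * (rkbuf_rel_timeout = socket.timeout.ms = 60000) each attempt gets
 * ts_timeout = now + 60 s. With an absolute timeout of now + 5 s and no
 * force flag, the effective timeout is
 * RD_MIN(now + socket.timeout.ms, abs_timeout) = now + 5 s.
 * Note that rd_ts_t is in microseconds, hence the * 1000 on the
 * millisecond values above.
 */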

/**
 * Retry failed request, if permitted.
 * @remark \p rkb may be NULL
 * @remark The retry count is only increased for buffers that were actually
 *         transmitted; if a failure occurs while the buffer lingers in the
 *         output queue (rkb_outbufs) the retry counter is not increased.
 * @returns 1 if the request was scheduled for retry, else 0.
 */
int rd_kafka_buf_retry (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf) {
        int incr_retry = rd_kafka_buf_was_sent(rkbuf) ? 1 : 0;

        if (unlikely(!rkb ||
                     rkb->rkb_source == RD_KAFKA_INTERNAL ||
                     rd_kafka_terminating(rkb->rkb_rk) ||
                     rkbuf->rkbuf_retries + incr_retry >
                     rkb->rkb_rk->rk_conf.max_retries))
                return 0;

        /* Absolute timeout, check for expiry. */
        if (rkbuf->rkbuf_abs_timeout &&
            rkbuf->rkbuf_abs_timeout < rd_clock())
                return 0; /* Expired */

        /* Try again */
        rkbuf->rkbuf_ts_sent = 0;
        rkbuf->rkbuf_ts_timeout = 0; /* Will be updated in calc_timeout() */
        rkbuf->rkbuf_retries += incr_retry;
        rd_kafka_buf_keep(rkbuf);
        rd_kafka_broker_buf_retry(rkb, rkbuf);
        return 1;
}
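
/*
 * Typical use from a response handler (sketch; the surrounding handler is
 * hypothetical):
 *
 *   if (err && rd_kafka_buf_retry(rkb, request))
 *           return;   (retry scheduled; buf_retry took its own reference)
 *   ... fall through to final error handling ...
 */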


/**
 * @brief Handle RD_KAFKA_OP_RECV_BUF.
 */
void rd_kafka_buf_handle_op (rd_kafka_op_t *rko, rd_kafka_resp_err_t err) {
        rd_kafka_buf_t *request, *response;

        request = rko->rko_u.xbuf.rkbuf;
        rko->rko_u.xbuf.rkbuf = NULL;

        /* NULL on op_destroy() */
        if (request->rkbuf_replyq.q) {
                int32_t version = request->rkbuf_replyq.version;
                /* The current replyq usage is done, but retain the original
                 * replyq for future retries, stealing the current
                 * reference. */
                request->rkbuf_orig_replyq = request->rkbuf_replyq;
                rd_kafka_replyq_clear(&request->rkbuf_replyq);
                /* The callback may need to perform a version check, so
                 * retain the version across the clear() call. */
                request->rkbuf_replyq.version = version;
        }

        if (!request->rkbuf_cb) {
                rd_kafka_buf_destroy(request);
                return;
        }

        /* Let buf_callback() do destroy()s */
        response = request->rkbuf_response; /* May be NULL */
        request->rkbuf_response = NULL;

        rd_kafka_buf_callback(request->rkbuf_rkb->rkb_rk,
                              request->rkbuf_rkb, err,
                              response, request);
}



/**
 * Call request.rkbuf_cb(), but:
 * - if the rkbuf has a rkbuf_replyq the buffer is enqueued on that queue
 *   with op type RD_KAFKA_OP_RECV_BUF.
 * - else call rkbuf_cb().
 *
 * \p response may be NULL.
 *
 * Will decrease refcount for both response and request, eventually.
 *
 * The decision to retry, and the call to buf_retry(), is delegated
 * to the buffer's response callback.
 */
void rd_kafka_buf_callback (rd_kafka_t *rk,
                            rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err,
                            rd_kafka_buf_t *response, rd_kafka_buf_t *request){

        if (err != RD_KAFKA_RESP_ERR__DESTROY && request->rkbuf_replyq.q) {
                rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_RECV_BUF);

                rd_kafka_assert(NULL, !request->rkbuf_response);
                request->rkbuf_response = response;

                /* Increment refcnt since rko_rkbuf will be decref:ed
                 * if replyq_enq() fails and we don't want the rkbuf gone
                 * in that case. */
                rd_kafka_buf_keep(request);
                rko->rko_u.xbuf.rkbuf = request;

                rko->rko_err = err;

                /* Copy original replyq for future retries, with its own
                 * queue reference. */
                rd_kafka_replyq_copy(&request->rkbuf_orig_replyq,
                                     &request->rkbuf_replyq);

                rd_kafka_replyq_enq(&request->rkbuf_replyq, rko, 0);

                rd_kafka_buf_destroy(request); /* from keep above */
                return;
        }

        if (request->rkbuf_cb)
                request->rkbuf_cb(rk, rkb, err, response, request,
                                  request->rkbuf_opaque);

        rd_kafka_buf_destroy(request);
        if (response)
                rd_kafka_buf_destroy(response);
}