/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012-2015, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */
#ifndef _RDKAFKA_BUF_H_
#define _RDKAFKA_BUF_H_

#include "rdkafka_int.h"
#include "rdcrc32.h"
#include "rdlist.h"
#include "rdbuf.h"
#include "rdkafka_msgbatch.h"

typedef struct rd_kafka_broker_s rd_kafka_broker_t;

#define RD_KAFKA_HEADERS_IOV_CNT 2

/**
 * @brief Temporary buffer with memory-aligned writes for efficient and
 *        platform-safe struct writes.
 */
typedef struct rd_tmpabuf_s {
        size_t size;
        size_t of;
        char  *buf;
        int    failed;
        int    assert_on_fail;
} rd_tmpabuf_t;

/**
 * @brief Allocate new tmpabuf with \p size bytes pre-allocated.
 */
static RD_UNUSED void
rd_tmpabuf_new (rd_tmpabuf_t *tab, size_t size, int assert_on_fail) {
        tab->buf = rd_malloc(size);
        tab->size = size;
        tab->of = 0;
        tab->failed = 0;
        tab->assert_on_fail = assert_on_fail;
}

/**
 * @brief Free memory allocated by the tmpabuf
 */
static RD_UNUSED void
rd_tmpabuf_destroy (rd_tmpabuf_t *tab) {
        rd_free(tab->buf);
}

/**
 * @returns 1 if a previous operation failed.
 */
static RD_UNUSED RD_INLINE int
rd_tmpabuf_failed (rd_tmpabuf_t *tab) {
        return tab->failed;
}

/**
 * @brief Allocate \p size bytes for writing, returning an aligned pointer
 *        to the memory.
 * @returns the allocated pointer (within the tmpabuf) on success or
 *          NULL if the requested number of bytes + alignment is not available
 *          in the tmpabuf.
 */
static RD_UNUSED void *
rd_tmpabuf_alloc0 (const char *func, int line, rd_tmpabuf_t *tab, size_t size) {
        void *ptr;

        if (unlikely(tab->failed))
                return NULL;

        if (unlikely(tab->of + size > tab->size)) {
                if (tab->assert_on_fail) {
                        fprintf(stderr,
                                "%s: %s:%d: requested size %zu + %zu > %zu\n",
                                __FUNCTION__, func, line, tab->of, size,
                                tab->size);
                        assert(!*"rd_tmpabuf_alloc: not enough size in buffer");
                }
                return NULL;
        }

        ptr = (void *)(tab->buf + tab->of);
        tab->of += RD_ROUNDUP(size, 8);

        return ptr;
}

#define rd_tmpabuf_alloc(tab,size) \
        rd_tmpabuf_alloc0(__FUNCTION__,__LINE__,tab,size)

/**
 * @brief Write \p buf of \p size bytes to tmpabuf memory in an aligned fashion.
 *
 * @returns the allocated and written-to pointer (within the tmpabuf) on success
 *          or NULL if the requested number of bytes + alignment is not
 *          available in the tmpabuf.
 */
static RD_UNUSED void *
rd_tmpabuf_write0 (const char *func, int line,
                   rd_tmpabuf_t *tab, const void *buf, size_t size) {
        void *ptr = rd_tmpabuf_alloc0(func, line, tab, size);

        if (ptr)
                memcpy(ptr, buf, size);

        return ptr;
}
#define rd_tmpabuf_write(tab,buf,size) \
        rd_tmpabuf_write0(__FUNCTION__, __LINE__, tab, buf, size)


/**
 * @brief Wrapper for rd_tmpabuf_write() that takes a nul-terminated string.
 */
static RD_UNUSED char *
rd_tmpabuf_write_str0 (const char *func, int line,
                       rd_tmpabuf_t *tab, const char *str) {
        return rd_tmpabuf_write0(func, line, tab, str, strlen(str)+1);
}
#define rd_tmpabuf_write_str(tab,str) \
        rd_tmpabuf_write_str0(__FUNCTION__, __LINE__, tab, str)


/**
 * @name Read buffer interface
 *
 * Memory-reading helper macros to be used when parsing network responses.
 *
 * Assumptions:
 * - an 'err_parse:' goto label must be available for error bail-outs;
 *   the error code will be set in rkbuf->rkbuf_err.
 * - a local `int log_decode_errors` variable must be set to the logging
 *   level at which to log parse errors (or 0 to turn off logging).
 */
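
/**
 * Example (illustrative sketch): the typical shape of a response parser
 * built from these macros. `my_parse_response` is a hypothetical function;
 * the `err_parse` label and `log_decode_errors` variable are the two
 * assumptions listed above.
 *
 *   static rd_kafka_resp_err_t my_parse_response (rd_kafka_buf_t *rkbuf) {
 *           const int log_decode_errors = LOG_ERR;
 *           int16_t ErrorCode;
 *           int32_t TopicCnt;
 *
 *           rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
 *           rd_kafka_buf_read_i32(rkbuf, &TopicCnt);
 *           // ...read the remaining fields...
 *           return RD_KAFKA_RESP_ERR_NO_ERROR;
 *
 *   err_parse:
 *           return rkbuf->rkbuf_err;  // set by the failing macro
 *   }
 */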

#define rd_kafka_buf_parse_fail(rkbuf,...) do {                         \
                if (log_decode_errors > 0) {                            \
                        rd_kafka_assert(NULL, rkbuf->rkbuf_rkb);        \
                        rd_rkb_log(rkbuf->rkbuf_rkb, log_decode_errors, \
                                   "PROTOERR",                          \
                                   "Protocol parse failure "            \
                                   "at %"PRIusz"/%"PRIusz" (%s:%i) "    \
                                   "(incorrect broker.version.fallback?)", \
                                   rd_slice_offset(&rkbuf->rkbuf_reader), \
                                   rd_slice_size(&rkbuf->rkbuf_reader), \
                                   __FUNCTION__, __LINE__);             \
                        rd_rkb_log(rkbuf->rkbuf_rkb, log_decode_errors, \
                                   "PROTOERR", __VA_ARGS__);            \
                }                                                       \
                (rkbuf)->rkbuf_err = RD_KAFKA_RESP_ERR__BAD_MSG;        \
                goto err_parse;                                         \
        } while (0)

/**
 * @brief Fail buffer reading due to buffer underflow.
 */
#define rd_kafka_buf_underflow_fail(rkbuf,wantedlen,...) do {           \
                if (log_decode_errors > 0) {                            \
                        rd_kafka_assert(NULL, rkbuf->rkbuf_rkb);        \
                        char __tmpstr[256];                             \
                        rd_snprintf(__tmpstr, sizeof(__tmpstr),         \
                                    ": " __VA_ARGS__);                  \
                        if (strlen(__tmpstr) == 2) __tmpstr[0] = '\0';  \
                        rd_rkb_log(rkbuf->rkbuf_rkb, log_decode_errors, \
                                   "PROTOUFLOW",                        \
                                   "Protocol read buffer underflow "    \
                                   "at %"PRIusz"/%"PRIusz" (%s:%i): "   \
                                   "expected %"PRIusz" bytes > "        \
                                   "%"PRIusz" remaining bytes (%s)%s",  \
                                   rd_slice_offset(&rkbuf->rkbuf_reader), \
                                   rd_slice_size(&rkbuf->rkbuf_reader), \
                                   __FUNCTION__, __LINE__,              \
                                   wantedlen,                           \
                                   rd_slice_remains(&rkbuf->rkbuf_reader), \
                                   rkbuf->rkbuf_uflow_mitigation ?      \
                                   rkbuf->rkbuf_uflow_mitigation :      \
                                   "incorrect broker.version.fallback?", \
                                   __tmpstr);                           \
                }                                                       \
                (rkbuf)->rkbuf_err = RD_KAFKA_RESP_ERR__UNDERFLOW;      \
                goto err_parse;                                         \
        } while (0)


/**
 * Returns the number of remaining bytes available to read.
 */
#define rd_kafka_buf_read_remain(rkbuf) \
        rd_slice_remains(&(rkbuf)->rkbuf_reader)

/**
 * Checks that at least 'len' bytes remain to be read in buffer, else fails.
 */
#define rd_kafka_buf_check_len(rkbuf,len) do {                          \
                size_t __len0 = (size_t)(len);                          \
                if (unlikely(__len0 > rd_kafka_buf_read_remain(rkbuf))) { \
                        rd_kafka_buf_underflow_fail(rkbuf, __len0);     \
                }                                                       \
        } while (0)

/**
 * Skip (as in read and ignore) the next 'len' bytes.
 */
#define rd_kafka_buf_skip(rkbuf, len) do {                              \
                size_t __len1 = (size_t)(len);                          \
                if (__len1 &&                                           \
                    !rd_slice_read(&(rkbuf)->rkbuf_reader, NULL, __len1)) \
                        rd_kafka_buf_check_len(rkbuf, __len1);          \
        } while (0)

/**
 * Skip (as in read and ignore) up to fixed position \p pos.
 */
#define rd_kafka_buf_skip_to(rkbuf, pos) do {                           \
                size_t __len1 = (size_t)(pos) -                         \
                        rd_slice_offset(&(rkbuf)->rkbuf_reader);        \
                if (__len1 &&                                           \
                    !rd_slice_read(&(rkbuf)->rkbuf_reader, NULL, __len1)) \
                        rd_kafka_buf_check_len(rkbuf, __len1);          \
        } while (0)



/**
 * Read 'len' bytes and copy to 'dstptr'
 */
#define rd_kafka_buf_read(rkbuf,dstptr,len) do {                        \
                size_t __len2 = (size_t)(len);                          \
                if (!rd_slice_read(&(rkbuf)->rkbuf_reader, dstptr, __len2)) \
                        rd_kafka_buf_check_len(rkbuf, __len2);          \
        } while (0)


/**
 * @brief Read \p len bytes at slice offset \p offset and copy to \p dstptr
 *        without affecting the current reader position.
 */
#define rd_kafka_buf_peek(rkbuf,offset,dstptr,len) do {                 \
                size_t __len2 = (size_t)(len);                          \
                if (!rd_slice_peek(&(rkbuf)->rkbuf_reader, offset,      \
                                   dstptr, __len2))                     \
                        rd_kafka_buf_check_len(rkbuf, (offset)+(__len2)); \
        } while (0)


/**
 * Read a 16-, 32-, or 64-bit integer and store it in 'dstptr'
 * (converted from network byte order to host byte order).
 */
#define rd_kafka_buf_read_i64(rkbuf,dstptr) do {                        \
                int64_t _v;                                             \
                rd_kafka_buf_read(rkbuf, &_v, sizeof(_v));              \
                *(dstptr) = be64toh(_v);                                \
        } while (0)

#define rd_kafka_buf_peek_i64(rkbuf,of,dstptr) do {                     \
                int64_t _v;                                             \
                rd_kafka_buf_peek(rkbuf, of, &_v, sizeof(_v));          \
                *(dstptr) = be64toh(_v);                                \
        } while (0)

#define rd_kafka_buf_read_i32(rkbuf,dstptr) do {                        \
                int32_t _v;                                             \
                rd_kafka_buf_read(rkbuf, &_v, sizeof(_v));              \
                *(dstptr) = be32toh(_v);                                \
        } while (0)

/* Same as .._read_i32 but does a direct assignment.
 * 'dst' is assumed to be a scalar, not a pointer. */
#define rd_kafka_buf_read_i32a(rkbuf, dst) do {                         \
                int32_t _v;                                             \
                rd_kafka_buf_read(rkbuf, &_v, 4);                       \
                dst = (int32_t) be32toh(_v);                            \
        } while (0)

#define rd_kafka_buf_read_i16(rkbuf,dstptr) do {                        \
                int16_t _v;                                             \
                rd_kafka_buf_read(rkbuf, &_v, sizeof(_v));              \
                *(dstptr) = (int16_t)be16toh(_v);                       \
        } while (0)


#define rd_kafka_buf_read_i16a(rkbuf, dst) do {                         \
                int16_t _v;                                             \
                rd_kafka_buf_read(rkbuf, &_v, 2);                       \
                dst = (int16_t)be16toh(_v);                             \
        } while (0)

#define rd_kafka_buf_read_i8(rkbuf, dst) rd_kafka_buf_read(rkbuf, dst, 1)

#define rd_kafka_buf_peek_i8(rkbuf,of,dst) rd_kafka_buf_peek(rkbuf,of,dst,1)

#define rd_kafka_buf_read_bool(rkbuf, dstptr) do {                      \
                int8_t _v;                                              \
                rd_bool_t *_dst = dstptr;                               \
                rd_kafka_buf_read(rkbuf, &_v, 1);                       \
                *_dst = (rd_bool_t)_v;                                  \
        } while (0)


/**
 * @brief Read varint and store in int64_t \p dst
 */
#define rd_kafka_buf_read_varint(rkbuf,dst) do {                        \
                int64_t _v;                                             \
                size_t _r = rd_varint_dec_slice(&(rkbuf)->rkbuf_reader, &_v); \
                if (unlikely(RD_UVARINT_UNDERFLOW(_r)))                 \
                        rd_kafka_buf_underflow_fail(rkbuf, (size_t)0,   \
                                                    "varint parsing failed"); \
                *(dst) = _v;                                            \
        } while (0)

/* Read a Kafka String representation (2+N).
 * The kstr data will be updated to point into the rkbuf. */
#define rd_kafka_buf_read_str(rkbuf, kstr) do {                         \
                int _klen;                                              \
                rd_kafka_buf_read_i16a(rkbuf, (kstr)->len);             \
                _klen = RD_KAFKAP_STR_LEN(kstr);                        \
                if (RD_KAFKAP_STR_IS_NULL(kstr))                        \
                        (kstr)->str = NULL;                             \
                else if (!((kstr)->str =                                \
                           rd_slice_ensure_contig(&rkbuf->rkbuf_reader, \
                                                  _klen)))              \
                        rd_kafka_buf_check_len(rkbuf, _klen);           \
        } while (0)

/* Read a Kafka String representation (2+N) and write it to the \p tmpabuf
 * with a trailing nul byte. */
#define rd_kafka_buf_read_str_tmpabuf(rkbuf, tmpabuf, dst) do {         \
                rd_kafkap_str_t _kstr;                                  \
                size_t _slen;                                           \
                char *_dst;                                             \
                rd_kafka_buf_read_str(rkbuf, &_kstr);                   \
                _slen = RD_KAFKAP_STR_LEN(&_kstr);                      \
                if (!(_dst =                                            \
                      rd_tmpabuf_write(tmpabuf, _kstr.str, _slen+1)))   \
                        rd_kafka_buf_parse_fail(                        \
                                rkbuf,                                  \
                                "Not enough room in tmpabuf: "          \
                                "%"PRIusz"+%"PRIusz                     \
                                " > %"PRIusz,                           \
                                (tmpabuf)->of, _slen+1, (tmpabuf)->size); \
                _dst[_slen] = '\0';                                     \
                dst = (void *)_dst;                                     \
        } while (0)

/**
 * Skip a string.
 */
#define rd_kafka_buf_skip_str(rkbuf) do {                               \
                int16_t _slen;                                          \
                rd_kafka_buf_read_i16(rkbuf, &_slen);                   \
                rd_kafka_buf_skip(rkbuf, RD_KAFKAP_STR_LEN0(_slen));    \
        } while (0)

/* Read a Kafka Bytes representation (4+N).
 * The 'kbytes' will be updated to point into the rkbuf data. */
#define rd_kafka_buf_read_bytes(rkbuf, kbytes) do {                     \
                int _klen;                                              \
                rd_kafka_buf_read_i32a(rkbuf, _klen);                   \
                (kbytes)->len = _klen;                                  \
                if (RD_KAFKAP_BYTES_IS_NULL(kbytes)) {                  \
                        (kbytes)->data = NULL;                          \
                        (kbytes)->len = 0;                              \
                } else if (RD_KAFKAP_BYTES_LEN(kbytes) == 0)            \
                        (kbytes)->data = "";                            \
                else if (!((kbytes)->data =                             \
                           rd_slice_ensure_contig(&(rkbuf)->rkbuf_reader, \
                                                  _klen)))              \
                        rd_kafka_buf_check_len(rkbuf, _klen);           \
        } while (0)
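
/**
 * Example (illustrative sketch): reading String and Bytes fields.
 * The resulting kstr/kbytes point into the rkbuf and are only valid for
 * the lifetime of the buffer; copy them out (e.g., with RD_KAFKAP_STR_DUP())
 * if they must outlive it.
 *
 *   rd_kafkap_str_t Topic;
 *   rd_kafkap_bytes_t Metadata;
 *
 *   rd_kafka_buf_read_str(rkbuf, &Topic);
 *   rd_kafka_buf_read_bytes(rkbuf, &Metadata);
 *   // char *topic = RD_KAFKAP_STR_DUP(&Topic);  // nul-terminated copy
 */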


/**
 * @brief Read \p size bytes from buffer, setting \p *ptr to the start
 *        of the memory region.
 */
#define rd_kafka_buf_read_ptr(rkbuf,ptr,size) do {                      \
                size_t _klen = size;                                    \
                if (!(*(ptr) = (void *)                                 \
                      rd_slice_ensure_contig(&(rkbuf)->rkbuf_reader, _klen))) \
                        rd_kafka_buf_check_len(rkbuf, _klen);           \
        } while (0)


/**
 * @brief Read varint-length Kafka Bytes representation
 */
#define rd_kafka_buf_read_bytes_varint(rkbuf,kbytes) do {               \
                int64_t _len2;                                          \
                size_t _r = rd_varint_dec_slice(&(rkbuf)->rkbuf_reader, \
                                                &_len2);                \
                if (unlikely(RD_UVARINT_UNDERFLOW(_r)))                 \
                        rd_kafka_buf_underflow_fail(rkbuf, (size_t)0,   \
                                                    "varint parsing failed"); \
                (kbytes)->len = (int32_t)_len2;                         \
                if (RD_KAFKAP_BYTES_IS_NULL(kbytes)) {                  \
                        (kbytes)->data = NULL;                          \
                        (kbytes)->len = 0;                              \
                } else if (RD_KAFKAP_BYTES_LEN(kbytes) == 0)            \
                        (kbytes)->data = "";                            \
                else if (!((kbytes)->data =                             \
                           rd_slice_ensure_contig(&(rkbuf)->rkbuf_reader, \
                                                  (size_t)_len2)))      \
                        rd_kafka_buf_check_len(rkbuf, _len2);           \
        } while (0)


/**
 * @brief Read throttle_time_ms (i32) from response and pass the value
 *        to the throttle handling code.
 */
#define rd_kafka_buf_read_throttle_time(rkbuf) do {                     \
                int32_t _throttle_time_ms;                              \
                rd_kafka_buf_read_i32(rkbuf, &_throttle_time_ms);       \
                rd_kafka_op_throttle_time((rkbuf)->rkbuf_rkb,           \
                                          (rkbuf)->rkbuf_rkb->rkb_rk->rk_rep, \
                                          _throttle_time_ms);           \
        } while (0)


/**
 * Response handling callback.
 *
 * NOTE: Callbacks must check for 'err == RD_KAFKA_RESP_ERR__DESTROY'
 *       which indicates that some entity is terminating (rd_kafka_t, broker,
 *       toppar, queue, etc) and the callback may not be called in the
 *       correct thread. In this case the callback must perform only
 *       minimal cleanup and must not trigger any other operations.
 *
 * NOTE: rkb, reply and request may be NULL, depending on the error situation.
 */
typedef void (rd_kafka_resp_cb_t) (rd_kafka_t *rk,
                                   rd_kafka_broker_t *rkb,
                                   rd_kafka_resp_err_t err,
                                   rd_kafka_buf_t *reply,
                                   rd_kafka_buf_t *request,
                                   void *opaque);
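
/**
 * Example (illustrative sketch): a minimal response callback obeying the
 * notes above. `my_handle_response` and its opaque are hypothetical.
 *
 *   static void my_handle_response (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
 *                                   rd_kafka_resp_err_t err,
 *                                   rd_kafka_buf_t *reply,
 *                                   rd_kafka_buf_t *request,
 *                                   void *opaque) {
 *           if (err == RD_KAFKA_RESP_ERR__DESTROY) {
 *                   // Terminating, possibly in the wrong thread:
 *                   // minimal cleanup only, no further operations.
 *                   rd_free(opaque);
 *                   return;
 *           }
 *           // reply (and rkb) may be NULL depending on err:
 *           if (!err && reply) {
 *                   // ...parse reply, retry on retriable errors, etc...
 *           }
 *   }
 */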

struct rd_kafka_buf_s { /* rd_kafka_buf_t */
        TAILQ_ENTRY(rd_kafka_buf_s) rkbuf_link;

        int32_t rkbuf_corrid;

        rd_ts_t rkbuf_ts_retry;    /* Absolute send retry time */

        int     rkbuf_flags;       /* RD_KAFKA_OP_F */

        rd_kafka_prio_t rkbuf_prio; /**< Request priority */

        rd_buf_t   rkbuf_buf;      /**< Send/Recv byte buffer */
        rd_slice_t rkbuf_reader;   /**< Buffer slice reader for rkbuf_buf */

        int     rkbuf_connid;      /* broker connection id (used when buffer
                                    * was partially sent). */
        size_t  rkbuf_totlen;      /* recv: total expected length,
                                    * send: not used */

        rd_crc32_t rkbuf_crc;      /* Current CRC calculation */

        struct rd_kafkap_reqhdr rkbuf_reqhdr;  /* Request header.
                                                * These fields are encoded
                                                * and written to output buffer
                                                * on buffer finalization.
                                                * Note:
                                                * The request's reqhdr is
                                                * copied to the response's
                                                * reqhdr as a convenience. */
        struct rd_kafkap_reshdr rkbuf_reshdr;  /* Response header.
                                                * Decoded fields are copied
                                                * here from the buffer
                                                * to provide an ease-of-use
                                                * interface to the header. */

        int32_t rkbuf_expected_size;   /* expected size of message */

        rd_kafka_replyq_t rkbuf_replyq;      /* Enqueue response on replyq */
        rd_kafka_replyq_t rkbuf_orig_replyq; /* Original replyq to be used
                                              * for retries from inside
                                              * the rkbuf_cb() callback
                                              * since rkbuf_replyq will
                                              * have been reset. */
        rd_kafka_resp_cb_t *rkbuf_cb;        /* Response callback */
        struct rd_kafka_buf_s *rkbuf_response; /* Response buffer */

        struct rd_kafka_broker_s *rkbuf_rkb;

        rd_refcnt_t rkbuf_refcnt;
        void   *rkbuf_opaque;

        int     rkbuf_retries;     /* Retries so far. */
#define RD_KAFKA_BUF_NO_RETRIES 1000000 /* Do not retry */

        int     rkbuf_features;    /* Required feature(s) that must be
                                    * supported by broker. */

        rd_ts_t rkbuf_ts_enq;
        rd_ts_t rkbuf_ts_sent;     /* Initially: Absolute time of transmission,
                                    * after response: RTT. */

        /* Request timeouts:
         * rkbuf_ts_timeout is the effective absolute request timeout used
         * by the timeout scanner to see if a request has timed out.
         * It is set when a request is enqueued on the broker transmit
         * queue based on the relative or absolute timeout:
         *
         * rkbuf_rel_timeout is the per-request-transmit relative timeout,
         * this value is reused for each subsequent retry of a request.
         *
         * rkbuf_abs_timeout is the absolute request timeout, spanning
         * all retries.
         * Each transmission is effectively limited by socket.timeout.ms,
         * but the request's total lifetime is bounded by the absolute
         * timeout.
         *
         * Use rd_kafka_buf_set_timeout() to set a relative timeout
         * that will be reused on retry,
         * or rd_kafka_buf_set_abs_timeout() to set a fixed absolute timeout
         * for the case where the caller knows the request will be
         * semantically outdated when that absolute time expires, such as for
         * session.timeout.ms-based requests.
         *
         * The decision to retry a request is delegated to the rkbuf_cb
         * response callback, which should use rd_kafka_err_action()
         * to check the returned actions for RD_KAFKA_ERR_ACTION_RETRY
         * and then call rd_kafka_buf_retry().
         * rd_kafka_buf_retry() will enqueue the request on the rkb_retrybufs
         * queue with a backoff time of retry.backoff.ms.
         * The rkb_retrybufs queue is served by the broker thread's timeout
         * scanner.
         * @warning rkb_retrybufs is NOT purged on broker down.
         */
        rd_ts_t rkbuf_ts_timeout;  /* Request timeout (absolute time). */
        rd_ts_t rkbuf_abs_timeout; /* Absolute timeout for request, including
                                    * retries.
                                    * Mutually exclusive with
                                    * rkbuf_rel_timeout. */
        int     rkbuf_rel_timeout; /* Relative timeout (ms), used for retries.
                                    * Defaults to socket.timeout.ms.
                                    * Mutually exclusive with
                                    * rkbuf_abs_timeout. */
        rd_bool_t rkbuf_force_timeout; /**< Force request timeout to be
                                        *   remaining abs_timeout regardless
                                        *   of socket.timeout.ms. */


        int64_t rkbuf_offset;      /* Used by OffsetCommit */

        rd_list_t *rkbuf_rktp_vers; /* Toppar + Op Version map.
                                     * Used by FetchRequest. */

        rd_kafka_resp_err_t rkbuf_err; /* Buffer parsing error code */

        union {
                struct {
                        rd_list_t *topics;  /* Requested topics (char *) */
                        char      *reason;  /* Textual reason */
                        rd_kafka_op_t *rko; /* Originating rko with replyq
                                             * (if any) */
                        int all_topics;     /* Full/All topics requested */

                        int *decr;          /* Decrement this integer by one
                                             * when request is complete:
                                             * typically points to the
                                             * metadata cache's
                                             * full_.._sent field.
                                             * Will be performed with
                                             * decr_lock held. */
                        mtx_t *decr_lock;

                } Metadata;
                struct {
                        rd_kafka_msgbatch_t batch; /**< MessageSet/batch */
                } Produce;
        } rkbuf_u;

#define rkbuf_batch rkbuf_u.Produce.batch

        const char *rkbuf_uflow_mitigation; /**< Buffer read underflow
                                             *   human-readable mitigation
                                             *   string (const memory).
                                             *   This is used to hint the
                                             *   user why the underflow
                                             *   might have occurred, which
                                             *   depends on the request type. */
};


/**
 * @returns 1 if the buffer has been sent on the wire, else 0.
 */
#define rd_kafka_buf_was_sent(rkbuf) \
        ((rkbuf)->rkbuf_flags & RD_KAFKA_OP_F_SENT)

typedef struct rd_kafka_bufq_s {
        TAILQ_HEAD(, rd_kafka_buf_s) rkbq_bufs;
        rd_atomic32_t rkbq_cnt;
        rd_atomic32_t rkbq_msg_cnt;
} rd_kafka_bufq_t;

#define rd_kafka_bufq_cnt(rkbq) rd_atomic32_get(&(rkbq)->rkbq_cnt)

/**
 * @brief Set buffer's request timeout to relative \p timeout_ms measured
 *        from the time the buffer is sent on the underlying socket.
 *
 * @param now Reuse current time from existing rd_clock() var, else 0.
 *
 * The relative timeout value is reused upon request retry.
 */
static RD_INLINE void
rd_kafka_buf_set_timeout (rd_kafka_buf_t *rkbuf, int timeout_ms, rd_ts_t now) {
        if (!now)
                now = rd_clock();
        rkbuf->rkbuf_rel_timeout = timeout_ms;
        rkbuf->rkbuf_abs_timeout = 0;
}


/**
 * @brief Calculate the effective timeout for a request attempt
 */
void rd_kafka_buf_calc_timeout (const rd_kafka_t *rk, rd_kafka_buf_t *rkbuf,
                                rd_ts_t now);


/**
 * @brief Set buffer's request timeout to relative \p timeout_ms measured
 *        from \p now.
 *
 * @param now Reuse current time from existing rd_clock() var, else 0.
 * @param force If true: force request timeout to be same as remaining
 *              abs timeout, regardless of socket.timeout.ms.
 *              If false: cap each request timeout to socket.timeout.ms.
 *
 * The remaining time is used as the timeout for request retries.
 */
static RD_INLINE void
rd_kafka_buf_set_abs_timeout0 (rd_kafka_buf_t *rkbuf, int timeout_ms,
                               rd_ts_t now, rd_bool_t force) {
        if (!now)
                now = rd_clock();
        rkbuf->rkbuf_rel_timeout = 0;
        rkbuf->rkbuf_abs_timeout = now + ((rd_ts_t)timeout_ms * 1000);
        rkbuf->rkbuf_force_timeout = force;
}

#define rd_kafka_buf_set_abs_timeout(rkbuf,timeout_ms,now) \
        rd_kafka_buf_set_abs_timeout0(rkbuf,timeout_ms,now,rd_false)


#define rd_kafka_buf_set_abs_timeout_force(rkbuf,timeout_ms,now) \
        rd_kafka_buf_set_abs_timeout0(rkbuf,timeout_ms,now,rd_true)
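
/**
 * Example (illustrative sketch): choosing between the two timeout modes.
 * The conf field names below are assumptions based on the configuration
 * properties they mirror.
 *
 *   // Each (re)transmission gets a fresh socket.timeout.ms-based window:
 *   rd_kafka_buf_set_timeout(rkbuf,
 *                            rkb->rkb_rk->rk_conf.socket_timeout_ms, 0);
 *
 *   // The request is semantically outdated once the session times out,
 *   // no matter how many retries remain:
 *   rd_kafka_buf_set_abs_timeout(
 *           rkbuf, rkb->rkb_rk->rk_conf.group_session_timeout_ms, 0);
 */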


#define rd_kafka_buf_keep(rkbuf) rd_refcnt_add(&(rkbuf)->rkbuf_refcnt)
#define rd_kafka_buf_destroy(rkbuf)                                     \
        rd_refcnt_destroywrapper(&(rkbuf)->rkbuf_refcnt,                \
                                 rd_kafka_buf_destroy_final(rkbuf))

void rd_kafka_buf_destroy_final (rd_kafka_buf_t *rkbuf);
void rd_kafka_buf_push0 (rd_kafka_buf_t *rkbuf, const void *buf, size_t len,
                         int allow_crc_calc, void (*free_cb) (void *));
#define rd_kafka_buf_push(rkbuf,buf,len,free_cb) \
        rd_kafka_buf_push0(rkbuf,buf,len,1/*allow_crc*/,free_cb)
rd_kafka_buf_t *rd_kafka_buf_new0 (int segcnt, size_t size, int flags);
#define rd_kafka_buf_new(segcnt,size) \
        rd_kafka_buf_new0(segcnt,size,0)
rd_kafka_buf_t *rd_kafka_buf_new_request (rd_kafka_broker_t *rkb, int16_t ApiKey,
                                          int segcnt, size_t size);
rd_kafka_buf_t *rd_kafka_buf_new_shadow (const void *ptr, size_t size,
                                         void (*free_cb) (void *));
void rd_kafka_bufq_enq (rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf);
void rd_kafka_bufq_deq (rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf);
void rd_kafka_bufq_init (rd_kafka_bufq_t *rkbufq);
void rd_kafka_bufq_concat (rd_kafka_bufq_t *dst, rd_kafka_bufq_t *src);
void rd_kafka_bufq_purge (rd_kafka_broker_t *rkb,
                          rd_kafka_bufq_t *rkbufq,
                          rd_kafka_resp_err_t err);
void rd_kafka_bufq_connection_reset (rd_kafka_broker_t *rkb,
                                     rd_kafka_bufq_t *rkbufq);
void rd_kafka_bufq_dump (rd_kafka_broker_t *rkb, const char *fac,
                         rd_kafka_bufq_t *rkbq);

int rd_kafka_buf_retry (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf);

void rd_kafka_buf_handle_op (rd_kafka_op_t *rko, rd_kafka_resp_err_t err);
void rd_kafka_buf_callback (rd_kafka_t *rk,
                            rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err,
                            rd_kafka_buf_t *response, rd_kafka_buf_t *request);



/**
 *
 * Write buffer interface
 *
 */

/**
 * Set request API type version
 */
static RD_UNUSED RD_INLINE void
rd_kafka_buf_ApiVersion_set (rd_kafka_buf_t *rkbuf,
                             int16_t version, int features) {
        rkbuf->rkbuf_reqhdr.ApiVersion = version;
        rkbuf->rkbuf_features = features;
}
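
/**
 * Example (illustrative sketch): selecting a negotiated ApiVersion and
 * recording the broker feature it relies on before constructing the
 * request. The version and feature values here are arbitrary.
 *
 *   rd_kafka_buf_t *rkbuf =
 *           rd_kafka_buf_new_request(rkb, RD_KAFKAP_Fetch, 1, 1024);
 *   rd_kafka_buf_ApiVersion_set(rkbuf, 4, RD_KAFKA_FEATURE_MSGVER2);
 *   // ...write the request fields and enqueue it...
 */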


/**
 * @returns the ApiVersion for a request
 */
#define rd_kafka_buf_ApiVersion(rkbuf) ((rkbuf)->rkbuf_reqhdr.ApiVersion)



/**
 * Write (copy) data to buffer at the current write position.
 * There must be enough space allocated in the rkbuf.
 * Returns the buffer offset at which the data was written.
 */
static RD_INLINE size_t rd_kafka_buf_write (rd_kafka_buf_t *rkbuf,
                                            const void *data, size_t len) {
        size_t r;

        r = rd_buf_write(&rkbuf->rkbuf_buf, data, len);

        if (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC)
                rkbuf->rkbuf_crc = rd_crc32_update(rkbuf->rkbuf_crc, data, len);

        return r;
}



/**
 * Write (copy) 'data' to buffer at offset 'of'.
 * There must be enough space to fit 'len'.
 * This will overwrite the buffer at the given location and length.
 *
 * NOTE: rd_kafka_buf_update() MUST NOT be called when a CRC calculation
 *       is in progress (between rd_kafka_buf_crc_init() & .._crc_finalize()).
 */
static RD_INLINE void rd_kafka_buf_update (rd_kafka_buf_t *rkbuf, size_t of,
                                           const void *data, size_t len) {
        rd_kafka_assert(NULL, !(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC));
        rd_buf_write_update(&rkbuf->rkbuf_buf, of, data, len);
}

/**
 * Write int8_t to buffer.
 */
static RD_INLINE size_t rd_kafka_buf_write_i8 (rd_kafka_buf_t *rkbuf,
                                               int8_t v) {
        return rd_kafka_buf_write(rkbuf, &v, sizeof(v));
}

/**
 * Update int8_t in buffer at offset 'of'.
 * 'of' should have been previously returned by `.._buf_write_i8()`.
 */
static RD_INLINE void rd_kafka_buf_update_i8 (rd_kafka_buf_t *rkbuf,
                                              size_t of, int8_t v) {
        rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
}

/**
 * Write int16_t to buffer.
 * The value will be endian-swapped before write.
 */
static RD_INLINE size_t rd_kafka_buf_write_i16 (rd_kafka_buf_t *rkbuf,
                                                int16_t v) {
        v = htobe16(v);
        return rd_kafka_buf_write(rkbuf, &v, sizeof(v));
}

/**
 * Update int16_t in buffer at offset 'of'.
 * 'of' should have been previously returned by `.._buf_write_i16()`.
 */
static RD_INLINE void rd_kafka_buf_update_i16 (rd_kafka_buf_t *rkbuf,
                                               size_t of, int16_t v) {
        v = htobe16(v);
        rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
}

/**
 * Write int32_t to buffer.
 * The value will be endian-swapped before write.
 */
static RD_INLINE size_t rd_kafka_buf_write_i32 (rd_kafka_buf_t *rkbuf,
                                                int32_t v) {
        v = htobe32(v);
        return rd_kafka_buf_write(rkbuf, &v, sizeof(v));
}

/**
 * Update int32_t in buffer at offset 'of'.
 * 'of' should have been previously returned by `.._buf_write_i32()`.
 */
static RD_INLINE void rd_kafka_buf_update_i32 (rd_kafka_buf_t *rkbuf,
                                               size_t of, int32_t v) {
        v = htobe32(v);
        rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
}

/**
 * Update uint32_t in buffer at offset 'of'.
 * 'of' should have been previously returned by `.._buf_write_i32()`.
 */
static RD_INLINE void rd_kafka_buf_update_u32 (rd_kafka_buf_t *rkbuf,
                                               size_t of, uint32_t v) {
        v = htobe32(v);
        rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
}


/**
 * Write int64_t to buffer.
 * The value will be endian-swapped before write.
 */
static RD_INLINE size_t rd_kafka_buf_write_i64 (rd_kafka_buf_t *rkbuf,
                                                int64_t v) {
        v = htobe64(v);
        return rd_kafka_buf_write(rkbuf, &v, sizeof(v));
}

/**
 * Update int64_t in buffer at offset 'of'.
 * 'of' should have been previously returned by `.._buf_write_i64()`.
 */
static RD_INLINE void rd_kafka_buf_update_i64 (rd_kafka_buf_t *rkbuf,
                                               size_t of, int64_t v) {
        v = htobe64(v);
        rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
}
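
/**
 * Example (illustrative sketch): the write and update helpers make up the
 * back-patching pattern used for values that are unknown until the rest of
 * the buffer has been written, such as a size prefix. rd_buf_write_pos()
 * from rdbuf.h is assumed to return the current write position.
 *
 *   size_t of_Size = rd_kafka_buf_write_i32(rkbuf, 0);  // placeholder
 *   // ...write the payload...
 *   rd_kafka_buf_update_i32(rkbuf, of_Size,
 *                           (int32_t)(rd_buf_write_pos(&rkbuf->rkbuf_buf) -
 *                                     of_Size - 4));
 */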


/**
 * @brief Write varint-encoded signed value to buffer.
 */
static RD_INLINE size_t
rd_kafka_buf_write_varint (rd_kafka_buf_t *rkbuf, int64_t v) {
        char varint[RD_UVARINT_ENC_SIZEOF(v)];
        size_t sz;

        sz = rd_uvarint_enc_i64(varint, sizeof(varint), v);

        return rd_kafka_buf_write(rkbuf, varint, sz);
}


/**
 * Write (copy) Kafka string to buffer.
 */
static RD_INLINE size_t rd_kafka_buf_write_kstr (rd_kafka_buf_t *rkbuf,
                                                 const rd_kafkap_str_t *kstr) {
        return rd_kafka_buf_write(rkbuf, RD_KAFKAP_STR_SER(kstr),
                                  RD_KAFKAP_STR_SIZE(kstr));
}

/**
 * Write (copy) char * string to buffer.
 * A 'len' of -1 uses strlen(str); a NULL 'str' is written as a null string.
 */
static RD_INLINE size_t rd_kafka_buf_write_str (rd_kafka_buf_t *rkbuf,
                                                const char *str, size_t len) {
        size_t r;
        if (!str)
                len = RD_KAFKAP_STR_LEN_NULL;
        else if (len == (size_t)-1)
                len = strlen(str);
        r = rd_kafka_buf_write_i16(rkbuf, (int16_t) len);
        if (str)
                rd_kafka_buf_write(rkbuf, str, len);
        return r;
}


/**
 * Push (i.e., no copy) Kafka string to buffer iovec
 */
static RD_INLINE void rd_kafka_buf_push_kstr (rd_kafka_buf_t *rkbuf,
                                              const rd_kafkap_str_t *kstr) {
        rd_kafka_buf_push(rkbuf, RD_KAFKAP_STR_SER(kstr),
                          RD_KAFKAP_STR_SIZE(kstr), NULL);
}



/**
 * Write (copy) Kafka bytes to buffer.
 */
static RD_INLINE size_t
rd_kafka_buf_write_kbytes (rd_kafka_buf_t *rkbuf,
                           const rd_kafkap_bytes_t *kbytes) {
        return rd_kafka_buf_write(rkbuf, RD_KAFKAP_BYTES_SER(kbytes),
                                  RD_KAFKAP_BYTES_SIZE(kbytes));
}

/**
 * Push (i.e., no copy) Kafka bytes to buffer iovec
 */
static RD_INLINE void
rd_kafka_buf_push_kbytes (rd_kafka_buf_t *rkbuf,
                          const rd_kafkap_bytes_t *kbytes) {
        rd_kafka_buf_push(rkbuf, RD_KAFKAP_BYTES_SER(kbytes),
                          RD_KAFKAP_BYTES_SIZE(kbytes), NULL);
}

/**
 * Write (copy) binary bytes to buffer, encapsulated as Kafka bytes.
 */
static RD_INLINE size_t
rd_kafka_buf_write_bytes (rd_kafka_buf_t *rkbuf,
                          const void *payload, size_t size) {
        size_t r;
        if (!payload)
                size = RD_KAFKAP_BYTES_LEN_NULL;
        r = rd_kafka_buf_write_i32(rkbuf, (int32_t) size);
        if (payload)
                rd_kafka_buf_write(rkbuf, payload, size);
        return r;
}
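
/**
 * Example (illustrative sketch): NULL and empty payloads are encoded
 * differently on the wire (length -1 vs length 0):
 *
 *   rd_kafka_buf_write_bytes(rkbuf, NULL, 0);      // null Bytes (-1)
 *   rd_kafka_buf_write_bytes(rkbuf, "", 0);        // empty Bytes (0)
 *   rd_kafka_buf_write_bytes(rkbuf, val, val_len); // Bytes(val_len)
 */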




/**
 * Write Kafka Message to buffer.
 * The number of bytes written is returned in '*outlenp'.
 *
 * Returns the buffer offset of the first byte.
 */
size_t rd_kafka_buf_write_Message (rd_kafka_broker_t *rkb,
                                   rd_kafka_buf_t *rkbuf,
                                   int64_t Offset, int8_t MagicByte,
                                   int8_t Attributes, int64_t Timestamp,
                                   const void *key, int32_t key_len,
                                   const void *payload, int32_t len,
                                   int *outlenp);

/**
 * Start calculating CRC from now; the CRC is tracked in rkbuf->rkbuf_crc.
 */
static RD_INLINE RD_UNUSED void rd_kafka_buf_crc_init (rd_kafka_buf_t *rkbuf) {
        rd_kafka_assert(NULL, !(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC));
        rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_CRC;
        rkbuf->rkbuf_crc = rd_crc32_init();
}

/**
 * Finalizes CRC calculation and returns the calculated checksum.
 */
static RD_INLINE RD_UNUSED
rd_crc32_t rd_kafka_buf_crc_finalize (rd_kafka_buf_t *rkbuf) {
        rkbuf->rkbuf_flags &= ~RD_KAFKA_OP_F_CRC;
        return rd_crc32_finalize(rkbuf->rkbuf_crc);
}
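
/**
 * Example (illustrative sketch): bracketing a region with CRC calculation
 * and back-patching the checksum into a previously written placeholder.
 * Note that rd_kafka_buf_update_u32() may only be called once the CRC
 * calculation has been finalized (see rd_kafka_buf_update()).
 *
 *   size_t of_Crc = rd_kafka_buf_write_i32(rkbuf, 0);  // placeholder
 *   rd_kafka_buf_crc_init(rkbuf);
 *   // ...write the CRC-covered fields...
 *   rd_kafka_buf_update_u32(rkbuf, of_Crc,
 *                           rd_kafka_buf_crc_finalize(rkbuf));
 */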




/**
 * @brief Check if the buffer's replyq.version is outdated.
 * @param rkbuf may be NULL, for convenience.
 *
 * @returns 1 if this is an outdated buffer, else 0.
 */
static RD_UNUSED RD_INLINE int
rd_kafka_buf_version_outdated (const rd_kafka_buf_t *rkbuf, int version) {
        return rkbuf && rkbuf->rkbuf_replyq.version &&
                rkbuf->rkbuf_replyq.version < version;
}
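
/**
 * Example (illustrative sketch): a handler for a versioned request
 * (e.g., Fetch) can use this to discard replies belonging to an older
 * generation; `my_fetch_version` is a hypothetical stand-in for the
 * current op version.
 *
 *   if (rd_kafka_buf_version_outdated(request, my_fetch_version))
 *           return; // outdated response: ignore
 */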

#endif /* _RDKAFKA_BUF_H_ */