/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012,2013 Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "rd.h"
#include "rdkafka_int.h"
#include "rdkafka_msg.h"
#include "rdkafka_topic.h"
#include "rdkafka_partition.h"
#include "rdkafka_interceptor.h"
#include "rdkafka_header.h"
#include "rdkafka_idempotence.h"
#include "rdcrc32.h"
#include "rdmurmur2.h"
#include "rdrand.h"
#include "rdtime.h"
#include "rdsysqueue.h"
#include "rdunittest.h"

#include <stdarg.h>
46void rd_kafka_msg_destroy (rd_kafka_t *rk, rd_kafka_msg_t *rkm) {
47
48 if (rkm->rkm_flags & RD_KAFKA_MSG_F_ACCOUNT) {
49 rd_dassert(rk || rkm->rkm_rkmessage.rkt);
50 rd_kafka_curr_msgs_sub(
51 rk ? rk :
52 rd_kafka_topic_a2i(rkm->rkm_rkmessage.rkt)->rkt_rk,
53 1, rkm->rkm_len);
54 }
55
56 if (rkm->rkm_headers)
57 rd_kafka_headers_destroy(rkm->rkm_headers);
58
59 if (likely(rkm->rkm_rkmessage.rkt != NULL))
60 rd_kafka_topic_destroy0(
61 rd_kafka_topic_a2s(rkm->rkm_rkmessage.rkt));
62
63 if (rkm->rkm_flags & RD_KAFKA_MSG_F_FREE && rkm->rkm_payload)
64 rd_free(rkm->rkm_payload);
65
66 if (rkm->rkm_flags & RD_KAFKA_MSG_F_FREE_RKM)
67 rd_free(rkm);
68}
69
70
71
72/**
73 * @brief Create a new Producer message, copying the payload as
74 * indicated by msgflags.
75 *
76 * @returns the new message
77 */
78static
79rd_kafka_msg_t *rd_kafka_msg_new00 (rd_kafka_itopic_t *rkt,
80 int32_t partition,
81 int msgflags,
82 char *payload, size_t len,
83 const void *key, size_t keylen,
84 void *msg_opaque) {
85 rd_kafka_msg_t *rkm;
86 size_t mlen = sizeof(*rkm);
87 char *p;
88
89 /* If we are to make a copy of the payload, allocate space for it too */
90 if (msgflags & RD_KAFKA_MSG_F_COPY) {
91 msgflags &= ~RD_KAFKA_MSG_F_FREE;
92 mlen += len;
93 }
94
95 mlen += keylen;
96
97 /* Note: using rd_malloc here, not rd_calloc, so make sure all fields
98 * are properly set up. */
99 rkm = rd_malloc(mlen);
100 rkm->rkm_err = 0;
101 rkm->rkm_flags = (RD_KAFKA_MSG_F_PRODUCER |
102 RD_KAFKA_MSG_F_FREE_RKM | msgflags);
103 rkm->rkm_len = len;
104 rkm->rkm_opaque = msg_opaque;
105 rkm->rkm_rkmessage.rkt = rd_kafka_topic_keep_a(rkt);
106
107 rkm->rkm_partition = partition;
108 rkm->rkm_offset = RD_KAFKA_OFFSET_INVALID;
109 rkm->rkm_timestamp = 0;
110 rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE;
111 rkm->rkm_status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED;
112 rkm->rkm_headers = NULL;
113
114 p = (char *)(rkm+1);
115
116 if (payload && msgflags & RD_KAFKA_MSG_F_COPY) {
117 /* Copy payload to space following the ..msg_t */
118 rkm->rkm_payload = p;
119 memcpy(rkm->rkm_payload, payload, len);
120 p += len;
121
122 } else {
123 /* Just point to the provided payload. */
124 rkm->rkm_payload = payload;
125 }
126
127 if (key) {
128 rkm->rkm_key = p;
129 rkm->rkm_key_len = keylen;
130 memcpy(rkm->rkm_key, key, keylen);
131 } else {
132 rkm->rkm_key = NULL;
133 rkm->rkm_key_len = 0;
134 }
135
136
137 return rkm;
138}
139
140
141
142
/**
 * @brief Create a new Producer message.
 *
 * @remark Must only be used by producer code.
 *
 * @returns the new message on success, or NULL on error, in which case
 *          \p errp (and \p errnop, if non-NULL) are set accordingly.
 */
151static rd_kafka_msg_t *rd_kafka_msg_new0 (rd_kafka_itopic_t *rkt,
152 int32_t force_partition,
153 int msgflags,
154 char *payload, size_t len,
155 const void *key, size_t keylen,
156 void *msg_opaque,
157 rd_kafka_resp_err_t *errp,
158 int *errnop,
159 rd_kafka_headers_t *hdrs,
160 int64_t timestamp,
161 rd_ts_t now) {
162 rd_kafka_msg_t *rkm;
163 size_t hdrs_size = 0;
164
165 if (unlikely(!payload))
166 len = 0;
167 if (!key)
168 keylen = 0;
169 if (hdrs)
170 hdrs_size = rd_kafka_headers_serialized_size(hdrs);
171
172 if (unlikely(len + keylen + hdrs_size >
173 (size_t)rkt->rkt_rk->rk_conf.max_msg_size ||
174 keylen > INT32_MAX)) {
175 *errp = RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE;
176 if (errnop)
177 *errnop = EMSGSIZE;
178 return NULL;
179 }
180
181 if (msgflags & RD_KAFKA_MSG_F_BLOCK)
182 *errp = rd_kafka_curr_msgs_add(
183 rkt->rkt_rk, 1, len, 1/*block*/,
184 (msgflags & RD_KAFKA_MSG_F_RKT_RDLOCKED) ?
185 &rkt->rkt_lock : NULL);
186 else
187 *errp = rd_kafka_curr_msgs_add(rkt->rkt_rk, 1, len, 0, NULL);
188
189 if (unlikely(*errp)) {
190 if (errnop)
191 *errnop = ENOBUFS;
192 return NULL;
193 }
194
195
196 rkm = rd_kafka_msg_new00(rkt, force_partition,
197 msgflags|RD_KAFKA_MSG_F_ACCOUNT /* curr_msgs_add() */,
198 payload, len, key, keylen, msg_opaque);
199
200 memset(&rkm->rkm_u.producer, 0, sizeof(rkm->rkm_u.producer));
201
202 if (timestamp)
203 rkm->rkm_timestamp = timestamp;
204 else
205 rkm->rkm_timestamp = rd_uclock()/1000;
206 rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_CREATE_TIME;
207
208 if (hdrs) {
209 rd_dassert(!rkm->rkm_headers);
210 rkm->rkm_headers = hdrs;
211 }
212
213 rkm->rkm_ts_enq = now;
214
215 if (rkt->rkt_conf.message_timeout_ms == 0) {
216 rkm->rkm_ts_timeout = INT64_MAX;
217 } else {
218 rkm->rkm_ts_timeout = now +
219 (int64_t) rkt->rkt_conf.message_timeout_ms * 1000;
220 }
221
222 /* Call interceptor chain for on_send */
223 rd_kafka_interceptors_on_send(rkt->rkt_rk, &rkm->rkm_rkmessage);
224
225 return rkm;
226}
227
228
/**
 * @brief Produce: creates a new message, runs the partitioner and enqueues
 *        it on the selected partition.
 *
 * @returns 0 on success or -1 on error.
 *
 * If the function returns -1 and RD_KAFKA_MSG_F_FREE was specified, then
 * the memory associated with the payload is still the caller's
 * responsibility.
 *
 * @locks none
 */
241int rd_kafka_msg_new (rd_kafka_itopic_t *rkt, int32_t force_partition,
242 int msgflags,
243 char *payload, size_t len,
244 const void *key, size_t keylen,
245 void *msg_opaque) {
246 rd_kafka_msg_t *rkm;
247 rd_kafka_resp_err_t err;
248 int errnox;
249
250 if (unlikely((err = rd_kafka_fatal_error_code(rkt->rkt_rk)))) {
251 rd_kafka_set_last_error(err, ECANCELED);
252 return -1;
253 }
254
255 /* Create message */
256 rkm = rd_kafka_msg_new0(rkt, force_partition, msgflags,
257 payload, len, key, keylen, msg_opaque,
258 &err, &errnox, NULL, 0, rd_clock());
259 if (unlikely(!rkm)) {
260 /* errno is already set by msg_new() */
261 rd_kafka_set_last_error(err, errnox);
262 return -1;
263 }
264
265
266 /* Partition the message */
267 err = rd_kafka_msg_partitioner(rkt, rkm, 1);
268 if (likely(!err)) {
269 rd_kafka_set_last_error(0, 0);
270 return 0;
271 }
272
273 /* Interceptor: unroll failing messages by triggering on_ack.. */
274 rkm->rkm_err = err;
275 rd_kafka_interceptors_on_acknowledgement(rkt->rkt_rk,
276 &rkm->rkm_rkmessage);
277
278 /* Handle partitioner failures: it only fails when the application
279 * attempts to force a destination partition that does not exist
280 * in the cluster. Note we must clear the RD_KAFKA_MSG_F_FREE
281 * flag since our contract says we don't free the payload on
282 * failure. */
283
284 rkm->rkm_flags &= ~RD_KAFKA_MSG_F_FREE;
285 rd_kafka_msg_destroy(rkt->rkt_rk, rkm);
286
287 /* Translate error codes to errnos. */
288 if (err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)
289 rd_kafka_set_last_error(err, ESRCH);
290 else if (err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
291 rd_kafka_set_last_error(err, ENOENT);
292 else
293 rd_kafka_set_last_error(err, EINVAL); /* NOTREACHED */
294
295 return -1;
296}
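
/**
 * Illustrative usage sketch (not part of the library): when this path fails
 * with RD_KAFKA_RESP_ERR__QUEUE_FULL (errno ENOBUFS) because the local
 * producer queue is full, an application typically serves delivery reports
 * and retries. Assumes a configured producer handle \p rk and topic object
 * \p rkt; the payload variables are hypothetical.
 *
 * @code
 * while (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
 *                         payload, len, NULL, 0, NULL) == -1) {
 *         if (rd_kafka_last_error() != RD_KAFKA_RESP_ERR__QUEUE_FULL)
 *                 break;
 *         rd_kafka_poll(rk, 100);
 * }
 * @endcode
 */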
297
298
299rd_kafka_resp_err_t rd_kafka_producev (rd_kafka_t *rk, ...) {
300 va_list ap;
301 rd_kafka_msg_t s_rkm = {
302 /* Message defaults */
303 .rkm_partition = RD_KAFKA_PARTITION_UA,
304 .rkm_timestamp = 0, /* current time */
305 };
306 rd_kafka_msg_t *rkm = &s_rkm;
307 rd_kafka_vtype_t vtype;
308 rd_kafka_topic_t *app_rkt;
309 shptr_rd_kafka_itopic_t *s_rkt = NULL;
310 rd_kafka_itopic_t *rkt;
311 rd_kafka_resp_err_t err;
312 rd_kafka_headers_t *hdrs = NULL;
313 rd_kafka_headers_t *app_hdrs = NULL; /* App-provided headers list */
314
315 if (unlikely((err = rd_kafka_fatal_error_code(rk))))
316 return err;
317
318 va_start(ap, rk);
319 while (!err &&
320 (vtype = va_arg(ap, rd_kafka_vtype_t)) != RD_KAFKA_VTYPE_END) {
321 switch (vtype)
322 {
323 case RD_KAFKA_VTYPE_TOPIC:
324 s_rkt = rd_kafka_topic_new0(rk,
325 va_arg(ap, const char *),
326 NULL, NULL, 1);
327 break;
328
329 case RD_KAFKA_VTYPE_RKT:
330 app_rkt = va_arg(ap, rd_kafka_topic_t *);
331 s_rkt = rd_kafka_topic_keep(
332 rd_kafka_topic_a2i(app_rkt));
333 break;
334
335 case RD_KAFKA_VTYPE_PARTITION:
336 rkm->rkm_partition = va_arg(ap, int32_t);
337 break;
338
339 case RD_KAFKA_VTYPE_VALUE:
340 rkm->rkm_payload = va_arg(ap, void *);
341 rkm->rkm_len = va_arg(ap, size_t);
342 break;
343
344 case RD_KAFKA_VTYPE_KEY:
345 rkm->rkm_key = va_arg(ap, void *);
346 rkm->rkm_key_len = va_arg(ap, size_t);
347 break;
348
349 case RD_KAFKA_VTYPE_OPAQUE:
350 rkm->rkm_opaque = va_arg(ap, void *);
351 break;
352
353 case RD_KAFKA_VTYPE_MSGFLAGS:
354 rkm->rkm_flags = va_arg(ap, int);
355 break;
356
357 case RD_KAFKA_VTYPE_TIMESTAMP:
358 rkm->rkm_timestamp = va_arg(ap, int64_t);
359 break;
360
361 case RD_KAFKA_VTYPE_HEADER:
362 {
363 const char *name;
364 const void *value;
365 ssize_t size;
366
367 if (unlikely(app_hdrs != NULL)) {
368 err = RD_KAFKA_RESP_ERR__CONFLICT;
369 break;
370 }
371
372 if (unlikely(!hdrs))
373 hdrs = rd_kafka_headers_new(8);
374
375 name = va_arg(ap, const char *);
376 value = va_arg(ap, const void *);
377 size = va_arg(ap, ssize_t);
378
379 err = rd_kafka_header_add(hdrs, name, -1, value, size);
380 }
381 break;
382
383 case RD_KAFKA_VTYPE_HEADERS:
384 if (unlikely(hdrs != NULL)) {
385 err = RD_KAFKA_RESP_ERR__CONFLICT;
386 break;
387 }
388 app_hdrs = va_arg(ap, rd_kafka_headers_t *);
389 break;
390
391 default:
392 err = RD_KAFKA_RESP_ERR__INVALID_ARG;
393 break;
394 }
395 }
396
397 va_end(ap);
398
399 if (unlikely(!s_rkt))
400 return RD_KAFKA_RESP_ERR__INVALID_ARG;
401
402 rkt = rd_kafka_topic_s2i(s_rkt);
403
404 if (likely(!err))
405 rkm = rd_kafka_msg_new0(rkt,
406 rkm->rkm_partition,
407 rkm->rkm_flags,
408 rkm->rkm_payload, rkm->rkm_len,
409 rkm->rkm_key, rkm->rkm_key_len,
410 rkm->rkm_opaque,
411 &err, NULL,
412 app_hdrs ? app_hdrs : hdrs,
413 rkm->rkm_timestamp,
414 rd_clock());
415
416 if (unlikely(err)) {
417 rd_kafka_topic_destroy0(s_rkt);
418 if (hdrs)
419 rd_kafka_headers_destroy(hdrs);
420 return err;
421 }
422
423 /* Partition the message */
424 err = rd_kafka_msg_partitioner(rkt, rkm, 1);
425 if (unlikely(err)) {
426 /* Handle partitioner failures: it only fails when
427 * the application attempts to force a destination
428 * partition that does not exist in the cluster. */
429
430 /* Interceptors: Unroll on_send by on_ack.. */
431 rkm->rkm_err = err;
432 rd_kafka_interceptors_on_acknowledgement(rk,
433 &rkm->rkm_rkmessage);
434
435 /* Note we must clear the RD_KAFKA_MSG_F_FREE
436 * flag since our contract says we don't free the payload on
437 * failure. */
438 rkm->rkm_flags &= ~RD_KAFKA_MSG_F_FREE;
439
440 /* Deassociate application owned headers from message
441 * since headers remain in application ownership
442 * when producev() fails */
443 if (app_hdrs && app_hdrs == rkm->rkm_headers)
444 rkm->rkm_headers = NULL;
445
446 rd_kafka_msg_destroy(rk, rkm);
447 }
448
449 rd_kafka_topic_destroy0(s_rkt);
450
451 return err;
452}
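
/**
 * Illustrative usage sketch (not part of the library): a minimal
 * rd_kafka_producev() call using the public RD_KAFKA_V_.. macros that
 * correspond to the RD_KAFKA_VTYPE_.. cases handled above. Assumes a
 * configured producer handle \p rk; the topic and header names are
 * hypothetical.
 *
 * @code
 * rd_kafka_resp_err_t err;
 *
 * err = rd_kafka_producev(rk,
 *                         RD_KAFKA_V_TOPIC("mytopic"),
 *                         RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA),
 *                         RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
 *                         RD_KAFKA_V_KEY("key", 3),
 *                         RD_KAFKA_V_VALUE("hello", 5),
 *                         RD_KAFKA_V_HEADER("trace-id", "abc123", -1),
 *                         RD_KAFKA_V_END);
 * if (err)
 *         fprintf(stderr, "producev failed: %s\n", rd_kafka_err2str(err));
 * @endcode
 */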
453
454
455
456/**
457 * @brief Produce a single message.
458 * @locality any application thread
459 * @locks none
460 */
461int rd_kafka_produce (rd_kafka_topic_t *rkt, int32_t partition,
462 int msgflags,
463 void *payload, size_t len,
464 const void *key, size_t keylen,
465 void *msg_opaque) {
466 return rd_kafka_msg_new(rd_kafka_topic_a2i(rkt), partition,
467 msgflags, payload, len,
468 key, keylen, msg_opaque);
469}
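
/**
 * Illustrative usage sketch (not part of the library): the classic
 * produce() call path that ends up in rd_kafka_msg_new() above, followed by
 * a poll to serve delivery report callbacks. Assumes a configured producer
 * handle \p rk; the topic name is hypothetical.
 *
 * @code
 * rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, "mytopic", NULL);
 *
 * if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
 *                      "hello", 5, "key", 3, NULL) == -1)
 *         fprintf(stderr, "produce failed: %s\n",
 *                 rd_kafka_err2str(rd_kafka_last_error()));
 *
 * rd_kafka_poll(rk, 0);
 * rd_kafka_topic_destroy(rkt);
 * @endcode
 */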
470
471
472
/**
 * @brief Produce a batch of messages.
 *
 * @returns the number of messages successfully queued for producing.
 *          Each message's .err will be set accordingly.
 */
478int rd_kafka_produce_batch (rd_kafka_topic_t *app_rkt, int32_t partition,
479 int msgflags,
480 rd_kafka_message_t *rkmessages, int message_cnt) {
481 rd_kafka_msgq_t tmpq = RD_KAFKA_MSGQ_INITIALIZER(tmpq);
482 int i;
483 int64_t utc_now = rd_uclock() / 1000;
484 rd_ts_t now = rd_clock();
485 int good = 0;
486 int multiple_partitions = (partition == RD_KAFKA_PARTITION_UA ||
487 (msgflags & RD_KAFKA_MSG_F_PARTITION));
488 rd_kafka_resp_err_t all_err;
489 rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
490 shptr_rd_kafka_toppar_t *s_rktp = NULL;
491
492 /* Propagated per-message below */
493 all_err = rd_kafka_fatal_error_code(rkt->rkt_rk);
494
495 rd_kafka_topic_rdlock(rkt);
496 if (!multiple_partitions) {
497 /* Single partition: look up the rktp once. */
498 s_rktp = rd_kafka_toppar_get_avail(rkt, partition,
499 1/*ua on miss*/, &all_err);
500
501 } else {
502 /* Indicate to lower-level msg_new..() that rkt is locked
503 * so that they may unlock it momentarily if blocking. */
504 msgflags |= RD_KAFKA_MSG_F_RKT_RDLOCKED;
505 }
506
507 for (i = 0 ; i < message_cnt ; i++) {
508 rd_kafka_msg_t *rkm;
509
510 /* Propagate error for all messages. */
511 if (unlikely(all_err)) {
512 rkmessages[i].err = all_err;
513 continue;
514 }
515
516 /* Create message */
517 rkm = rd_kafka_msg_new0(rkt,
518 (msgflags & RD_KAFKA_MSG_F_PARTITION) ?
519 rkmessages[i].partition : partition,
520 msgflags,
521 rkmessages[i].payload,
522 rkmessages[i].len,
523 rkmessages[i].key,
524 rkmessages[i].key_len,
525 rkmessages[i]._private,
526 &rkmessages[i].err, NULL,
527 NULL, utc_now, now);
528 if (unlikely(!rkm)) {
529 if (rkmessages[i].err == RD_KAFKA_RESP_ERR__QUEUE_FULL)
530 all_err = rkmessages[i].err;
531 continue;
532 }
533
                /* Three cases here:
                 *  partition==UA:            run the partitioner (slow)
                 *  RD_KAFKA_MSG_F_PARTITION: produce message to the
                 *                            partition specified per message
                 *  fixed partition:          enqueue directly on the
                 *                            partition's queue */
540 if (multiple_partitions) {
541 if (rkm->rkm_partition == RD_KAFKA_PARTITION_UA) {
542 /* Partition the message */
543 rkmessages[i].err =
544 rd_kafka_msg_partitioner(
545 rkt, rkm, 0/*already locked*/);
546 } else {
547 if (s_rktp == NULL ||
548 rkm->rkm_partition !=
549 rd_kafka_toppar_s2i(s_rktp)->
550 rktp_partition) {
551 rd_kafka_resp_err_t err;
552 if (s_rktp != NULL)
553 rd_kafka_toppar_destroy(s_rktp);
554 s_rktp = rd_kafka_toppar_get_avail(
555 rkt, rkm->rkm_partition,
556 1/*ua on miss*/, &err);
557
558 if (unlikely(!s_rktp)) {
559 rkmessages[i].err = err;
560 continue;
561 }
562 }
563 rd_kafka_toppar_enq_msg(
564 rd_kafka_toppar_s2i(s_rktp), rkm);
565 }
566
567 if (unlikely(rkmessages[i].err)) {
568 /* Interceptors: Unroll on_send by on_ack.. */
569 rd_kafka_interceptors_on_acknowledgement(
570 rkt->rkt_rk, &rkmessages[i]);
571
572 rd_kafka_msg_destroy(rkt->rkt_rk, rkm);
573 continue;
574 }
575
576
577 } else {
578 /* Single destination partition. */
579 rd_kafka_toppar_enq_msg(rd_kafka_toppar_s2i(s_rktp),
580 rkm);
581 }
582
583 rkmessages[i].err = RD_KAFKA_RESP_ERR_NO_ERROR;
584 good++;
585 }
586
587 rd_kafka_topic_rdunlock(rkt);
588 if (s_rktp != NULL)
589 rd_kafka_toppar_destroy(s_rktp);
590
591 return good;
592}
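
/**
 * Illustrative usage sketch (not part of the library): producing a batch
 * with per-message partitions through the public batch API. Assumes a
 * configured producer handle \p rk; the topic name and payloads are
 * hypothetical.
 *
 * @code
 * rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, "mytopic", NULL);
 * rd_kafka_message_t msgs[2] = {
 *         { .payload = "first",  .len = 5, .partition = 0 },
 *         { .payload = "second", .len = 6, .partition = 1 }
 * };
 * int i, queued;
 *
 * queued = rd_kafka_produce_batch(rkt, RD_KAFKA_PARTITION_UA,
 *                                 RD_KAFKA_MSG_F_COPY |
 *                                 RD_KAFKA_MSG_F_PARTITION,
 *                                 msgs, 2);
 * if (queued < 2)
 *         for (i = 0 ; i < 2 ; i++)
 *                 if (msgs[i].err)
 *                         fprintf(stderr, "message #%d failed: %s\n",
 *                                 i, rd_kafka_err2str(msgs[i].err));
 *
 * rd_kafka_topic_destroy(rkt);
 * @endcode
 */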
593
594/**
595 * @brief Scan \p rkmq for messages that have timed out and remove them from
596 * \p rkmq and add to \p timedout queue.
597 *
598 * @returns the number of messages timed out.
599 *
600 * @locality any
601 * @locks toppar_lock MUST be held
602 */
603int rd_kafka_msgq_age_scan (rd_kafka_toppar_t *rktp,
604 rd_kafka_msgq_t *rkmq,
605 rd_kafka_msgq_t *timedout,
606 rd_ts_t now) {
607 rd_kafka_msg_t *rkm, *tmp, *first = NULL;
608 int cnt = timedout->rkmq_msg_cnt;
609
        /* Assume messages are added in time sequential order */
611 TAILQ_FOREACH_SAFE(rkm, &rkmq->rkmq_msgs, rkm_link, tmp) {
612 /* NOTE: this is not true for the deprecated (and soon removed)
613 * LIFO queuing strategy. */
614 if (likely(rkm->rkm_ts_timeout > now))
615 break;
616
617 if (!first)
618 first = rkm;
619
620 rd_kafka_msgq_deq(rkmq, rkm, 1);
621 rd_kafka_msgq_enq(timedout, rkm);
622 }
623
624 return timedout->rkmq_msg_cnt - cnt;
625}
626
627
628int
629rd_kafka_msgq_enq_sorted0 (rd_kafka_msgq_t *rkmq,
630 rd_kafka_msg_t *rkm,
631 int (*order_cmp) (const void *, const void *)) {
632 TAILQ_INSERT_SORTED(&rkmq->rkmq_msgs, rkm, rd_kafka_msg_t *,
633 rkm_link, order_cmp);
634 rkmq->rkmq_msg_bytes += rkm->rkm_len+rkm->rkm_key_len;
635 return ++rkmq->rkmq_msg_cnt;
636}
637
638int rd_kafka_msgq_enq_sorted (const rd_kafka_itopic_t *rkt,
639 rd_kafka_msgq_t *rkmq,
640 rd_kafka_msg_t *rkm) {
641 rd_dassert(rkm->rkm_u.producer.msgid != 0);
642 return rd_kafka_msgq_enq_sorted0(rkmq, rkm,
643 rkt->rkt_conf.msg_order_cmp);
644}
645
646/**
647 * @brief Find the insert position (i.e., the previous element)
648 * for message \p rkm.
649 *
650 * @returns the insert position element, or NULL if \p rkm should be
651 * added at head of queue.
652 */
653rd_kafka_msg_t *rd_kafka_msgq_find_pos (const rd_kafka_msgq_t *rkmq,
654 const rd_kafka_msg_t *rkm,
655 int (*cmp) (const void *,
656 const void *)) {
657 const rd_kafka_msg_t *curr, *last = NULL;
658
659 TAILQ_FOREACH(curr, &rkmq->rkmq_msgs, rkm_link) {
660 if (cmp(rkm, curr) < 0)
661 return (rd_kafka_msg_t *)last;
662 last = curr;
663 }
664
665 return (rd_kafka_msg_t *)last;
666}
667
668
669/**
670 * @brief Set per-message metadata for all messages in \p rkmq
671 */
672void rd_kafka_msgq_set_metadata (rd_kafka_msgq_t *rkmq,
673 int64_t base_offset, int64_t timestamp,
674 rd_kafka_msg_status_t status) {
675 rd_kafka_msg_t *rkm;
676
677 TAILQ_FOREACH(rkm, &rkmq->rkmq_msgs, rkm_link) {
678 rkm->rkm_offset = base_offset++;
679 if (timestamp != -1) {
680 rkm->rkm_timestamp = timestamp;
                        rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME;
682 }
683
684 /* Don't downgrade a message from any form of PERSISTED
685 * to NOT_PERSISTED, since the original cause of indicating
686 * PERSISTED can't be changed.
687 * E.g., a previous ack or in-flight timeout. */
688 if (unlikely(status == RD_KAFKA_MSG_STATUS_NOT_PERSISTED &&
689 rkm->rkm_status != RD_KAFKA_MSG_STATUS_NOT_PERSISTED))
690 continue;
691
692 rkm->rkm_status = status;
693 }
694}
695
696
697/**
698 * @brief Move all messages in \p src to \p dst whose msgid <= last_msgid.
699 *
700 * @remark src must be ordered
701 */
702void rd_kafka_msgq_move_acked (rd_kafka_msgq_t *dest, rd_kafka_msgq_t *src,
703 uint64_t last_msgid,
704 rd_kafka_msg_status_t status) {
705 rd_kafka_msg_t *rkm;
706
707 while ((rkm = rd_kafka_msgq_first(src)) &&
708 rkm->rkm_u.producer.msgid <= last_msgid) {
709 rd_kafka_msgq_deq(src, rkm, 1);
710 rd_kafka_msgq_enq(dest, rkm);
711
712 rkm->rkm_status = status;
713 }
714
715 rd_kafka_msgq_verify_order(NULL, dest, 0, rd_false);
716 rd_kafka_msgq_verify_order(NULL, src, 0, rd_false);
717}
718
719
720
721int32_t rd_kafka_msg_partitioner_random (const rd_kafka_topic_t *rkt,
722 const void *key, size_t keylen,
723 int32_t partition_cnt,
724 void *rkt_opaque,
725 void *msg_opaque) {
726 int32_t p = rd_jitter(0, partition_cnt-1);
727 if (unlikely(!rd_kafka_topic_partition_available(rkt, p)))
728 return rd_jitter(0, partition_cnt-1);
729 else
730 return p;
731}
732
733int32_t rd_kafka_msg_partitioner_consistent (const rd_kafka_topic_t *rkt,
734 const void *key, size_t keylen,
735 int32_t partition_cnt,
736 void *rkt_opaque,
737 void *msg_opaque) {
738 return rd_crc32(key, keylen) % partition_cnt;
739}
740
741int32_t rd_kafka_msg_partitioner_consistent_random (const rd_kafka_topic_t *rkt,
742 const void *key, size_t keylen,
743 int32_t partition_cnt,
744 void *rkt_opaque,
745 void *msg_opaque) {
746 if (keylen == 0)
747 return rd_kafka_msg_partitioner_random(rkt,
748 key,
749 keylen,
750 partition_cnt,
751 rkt_opaque,
752 msg_opaque);
753 else
754 return rd_kafka_msg_partitioner_consistent(rkt,
755 key,
756 keylen,
757 partition_cnt,
758 rkt_opaque,
759 msg_opaque);
760}
761
762int32_t
763rd_kafka_msg_partitioner_murmur2 (const rd_kafka_topic_t *rkt,
764 const void *key, size_t keylen,
765 int32_t partition_cnt,
766 void *rkt_opaque,
767 void *msg_opaque) {
768 return (rd_murmur2(key, keylen) & 0x7fffffff) % partition_cnt;
769}
770
771int32_t rd_kafka_msg_partitioner_murmur2_random (const rd_kafka_topic_t *rkt,
772 const void *key, size_t keylen,
773 int32_t partition_cnt,
774 void *rkt_opaque,
775 void *msg_opaque) {
776 if (!key)
777 return rd_kafka_msg_partitioner_random(rkt,
778 key,
779 keylen,
780 partition_cnt,
781 rkt_opaque,
782 msg_opaque);
783 else
784 return (rd_murmur2(key, keylen) & 0x7fffffff) % partition_cnt;
785}
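
/**
 * Illustrative usage sketch (not part of the library): installing one of
 * the built-in partitioners above as a topic's partitioner callback.
 * Assumes a configured producer handle \p rk; the topic name is
 * hypothetical. (Depending on the librdkafka version, the same selection
 * may also be available through the "partitioner" topic configuration
 * property.)
 *
 * @code
 * rd_kafka_topic_conf_t *tconf = rd_kafka_topic_conf_new();
 * rd_kafka_topic_t *rkt;
 *
 * rd_kafka_topic_conf_set_partitioner_cb(
 *         tconf, rd_kafka_msg_partitioner_murmur2_random);
 *
 * rkt = rd_kafka_topic_new(rk, "mytopic", tconf);
 * @endcode
 */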
786
787
788/**
789 * Assigns a message to a topic partition using a partitioner.
790 * Returns RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION or .._UNKNOWN_TOPIC if
791 * partitioning failed, or 0 on success.
792 */
793int rd_kafka_msg_partitioner (rd_kafka_itopic_t *rkt, rd_kafka_msg_t *rkm,
794 int do_lock) {
795 int32_t partition;
796 rd_kafka_toppar_t *rktp_new;
797 shptr_rd_kafka_toppar_t *s_rktp_new;
798 rd_kafka_resp_err_t err;
799
800 if (do_lock)
801 rd_kafka_topic_rdlock(rkt);
802
803 switch (rkt->rkt_state)
804 {
805 case RD_KAFKA_TOPIC_S_UNKNOWN:
806 /* No metadata received from cluster yet.
807 * Put message in UA partition and re-run partitioner when
808 * cluster comes up. */
809 partition = RD_KAFKA_PARTITION_UA;
810 break;
811
812 case RD_KAFKA_TOPIC_S_NOTEXISTS:
813 /* Topic not found in cluster.
814 * Fail message immediately. */
815 err = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
816 if (do_lock)
817 rd_kafka_topic_rdunlock(rkt);
818 return err;
819
820 case RD_KAFKA_TOPIC_S_EXISTS:
821 /* Topic exists in cluster. */
822
                /* Topic exists but has no partitions.
                 * This is usually a transient state following the
                 * auto-creation of a topic. */
826 if (unlikely(rkt->rkt_partition_cnt == 0)) {
827 partition = RD_KAFKA_PARTITION_UA;
828 break;
829 }
830
831 /* Partition not assigned, run partitioner. */
832 if (rkm->rkm_partition == RD_KAFKA_PARTITION_UA) {
833 rd_kafka_topic_t *app_rkt;
834 /* Provide a temporary app_rkt instance to protect
835 * from the case where the application decided to
836 * destroy its topic object prior to delivery completion
837 * (issue #502). */
838 app_rkt = rd_kafka_topic_keep_a(rkt);
839 partition = rkt->rkt_conf.
840 partitioner(app_rkt,
841 rkm->rkm_key,
842 rkm->rkm_key_len,
843 rkt->rkt_partition_cnt,
844 rkt->rkt_conf.opaque,
845 rkm->rkm_opaque);
846 rd_kafka_topic_destroy0(
847 rd_kafka_topic_a2s(app_rkt));
848 } else
849 partition = rkm->rkm_partition;
850
851 /* Check that partition exists. */
852 if (partition >= rkt->rkt_partition_cnt) {
853 err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
854 if (do_lock)
855 rd_kafka_topic_rdunlock(rkt);
856 return err;
857 }
858 break;
859
860 default:
861 rd_kafka_assert(rkt->rkt_rk, !*"NOTREACHED");
862 break;
863 }
864
865 /* Get new partition */
866 s_rktp_new = rd_kafka_toppar_get(rkt, partition, 0);
867
868 if (unlikely(!s_rktp_new)) {
869 /* Unknown topic or partition */
870 if (rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS)
871 err = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
872 else
873 err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
874
875 if (do_lock)
876 rd_kafka_topic_rdunlock(rkt);
877
878 return err;
879 }
880
881 rktp_new = rd_kafka_toppar_s2i(s_rktp_new);
882 rd_atomic64_add(&rktp_new->rktp_c.producer_enq_msgs, 1);
883
884 /* Update message partition */
885 if (rkm->rkm_partition == RD_KAFKA_PARTITION_UA)
886 rkm->rkm_partition = partition;
887
888 /* Partition is available: enqueue msg on partition's queue */
889 rd_kafka_toppar_enq_msg(rktp_new, rkm);
890 if (do_lock)
891 rd_kafka_topic_rdunlock(rkt);
892 rd_kafka_toppar_destroy(s_rktp_new); /* from _get() */
893 return 0;
894}
895
896
897
898
899/**
900 * @name Public message type (rd_kafka_message_t)
901 */
902void rd_kafka_message_destroy (rd_kafka_message_t *rkmessage) {
903 rd_kafka_op_t *rko;
904
905 if (likely((rko = (rd_kafka_op_t *)rkmessage->_private) != NULL))
906 rd_kafka_op_destroy(rko);
907 else {
908 rd_kafka_msg_t *rkm = rd_kafka_message2msg(rkmessage);
909 rd_kafka_msg_destroy(NULL, rkm);
910 }
911}
912
913
914rd_kafka_message_t *rd_kafka_message_new (void) {
915 rd_kafka_msg_t *rkm = rd_calloc(1, sizeof(*rkm));
916 return (rd_kafka_message_t *)rkm;
917}
918
919
920/**
921 * @brief Set up a rkmessage from an rko for passing to the application.
922 * @remark Will trigger on_consume() interceptors if any.
923 */
924static rd_kafka_message_t *
925rd_kafka_message_setup (rd_kafka_op_t *rko, rd_kafka_message_t *rkmessage) {
926 rd_kafka_itopic_t *rkt;
927 rd_kafka_toppar_t *rktp = NULL;
928
929 if (rko->rko_type == RD_KAFKA_OP_DR) {
930 rkt = rd_kafka_topic_s2i(rko->rko_u.dr.s_rkt);
931 } else {
932 if (rko->rko_rktp) {
933 rktp = rd_kafka_toppar_s2i(rko->rko_rktp);
934 rkt = rktp->rktp_rkt;
935 } else
936 rkt = NULL;
937
938 rkmessage->_private = rko;
939 }
940
941
942 if (!rkmessage->rkt && rkt)
943 rkmessage->rkt = rd_kafka_topic_keep_a(rkt);
944
945 if (rktp)
946 rkmessage->partition = rktp->rktp_partition;
947
948 if (!rkmessage->err)
949 rkmessage->err = rko->rko_err;
950
951 /* Call on_consume interceptors */
952 switch (rko->rko_type)
953 {
954 case RD_KAFKA_OP_FETCH:
955 if (!rkmessage->err && rkt)
956 rd_kafka_interceptors_on_consume(rkt->rkt_rk,
957 rkmessage);
958 break;
959
960 default:
961 break;
962 }
963
964 return rkmessage;
965}
966
967
968
969/**
970 * @brief Get rkmessage from rkm (for EVENT_DR)
971 * @remark Must only be called just prior to passing a dr to the application.
972 */
973rd_kafka_message_t *rd_kafka_message_get_from_rkm (rd_kafka_op_t *rko,
974 rd_kafka_msg_t *rkm) {
975 return rd_kafka_message_setup(rko, &rkm->rkm_rkmessage);
976}
977
978/**
979 * @brief Convert rko to rkmessage
980 * @remark Must only be called just prior to passing a consumed message
981 * or event to the application.
982 * @remark Will trigger on_consume() interceptors, if any.
983 * @returns a rkmessage (bound to the rko).
984 */
985rd_kafka_message_t *rd_kafka_message_get (rd_kafka_op_t *rko) {
986 rd_kafka_message_t *rkmessage;
987
988 if (!rko)
989 return rd_kafka_message_new(); /* empty */
990
991 switch (rko->rko_type)
992 {
993 case RD_KAFKA_OP_FETCH:
994 /* Use embedded rkmessage */
995 rkmessage = &rko->rko_u.fetch.rkm.rkm_rkmessage;
996 break;
997
998 case RD_KAFKA_OP_ERR:
999 case RD_KAFKA_OP_CONSUMER_ERR:
1000 rkmessage = &rko->rko_u.err.rkm.rkm_rkmessage;
1001 rkmessage->payload = rko->rko_u.err.errstr;
1002 rkmessage->len = rkmessage->payload ?
1003 strlen(rkmessage->payload) : 0;
1004 rkmessage->offset = rko->rko_u.err.offset;
1005 break;
1006
1007 default:
1008 rd_kafka_assert(NULL, !*"unhandled optype");
1009 RD_NOTREACHED();
1010 return NULL;
1011 }
1012
1013 return rd_kafka_message_setup(rko, rkmessage);
1014}
1015
1016
1017int64_t rd_kafka_message_timestamp (const rd_kafka_message_t *rkmessage,
1018 rd_kafka_timestamp_type_t *tstype) {
1019 rd_kafka_msg_t *rkm;
1020
1021 if (rkmessage->err) {
1022 if (tstype)
1023 *tstype = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE;
1024 return -1;
1025 }
1026
1027 rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage);
1028
1029 if (tstype)
1030 *tstype = rkm->rkm_tstype;
1031
1032 return rkm->rkm_timestamp;
1033}
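
/**
 * Illustrative usage sketch (not part of the library): reading the
 * timestamp of a consumed message. \p rkmessage is assumed to be a message
 * returned by rd_kafka_consumer_poll().
 *
 * @code
 * rd_kafka_timestamp_type_t tstype;
 * int64_t ts = rd_kafka_message_timestamp(rkmessage, &tstype);
 *
 * if (tstype != RD_KAFKA_TIMESTAMP_NOT_AVAILABLE)
 *         printf("timestamp %"PRId64" ms (%s)\n", ts,
 *                tstype == RD_KAFKA_TIMESTAMP_CREATE_TIME ?
 *                "create time" : "log append time");
 * @endcode
 */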
1034
1035
1036int64_t rd_kafka_message_latency (const rd_kafka_message_t *rkmessage) {
1037 rd_kafka_msg_t *rkm;
1038
1039 rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage);
1040
1041 if (unlikely(!rkm->rkm_ts_enq))
1042 return -1;
1043
1044 return rd_clock() - rkm->rkm_ts_enq;
1045}
1046
1047
1048
1049/**
1050 * @brief Parse serialized message headers and populate
1051 * rkm->rkm_headers (which must be NULL).
1052 */
1053static rd_kafka_resp_err_t rd_kafka_msg_headers_parse (rd_kafka_msg_t *rkm) {
1054 rd_kafka_buf_t *rkbuf;
1055 int64_t HeaderCount;
1056 const int log_decode_errors = 0;
1057 rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__BAD_MSG;
1058 int i;
1059 rd_kafka_headers_t *hdrs = NULL;
1060
1061 rd_dassert(!rkm->rkm_headers);
1062
1063 if (RD_KAFKAP_BYTES_LEN(&rkm->rkm_u.consumer.binhdrs) == 0)
1064 return RD_KAFKA_RESP_ERR__NOENT;
1065
1066 rkbuf = rd_kafka_buf_new_shadow(rkm->rkm_u.consumer.binhdrs.data,
1067 RD_KAFKAP_BYTES_LEN(&rkm->rkm_u.
1068 consumer.binhdrs),
1069 NULL);
1070
1071 rd_kafka_buf_read_varint(rkbuf, &HeaderCount);
1072
1073 if (HeaderCount <= 0) {
1074 rd_kafka_buf_destroy(rkbuf);
1075 return RD_KAFKA_RESP_ERR__NOENT;
1076 } else if (unlikely(HeaderCount > 100000)) {
1077 rd_kafka_buf_destroy(rkbuf);
1078 return RD_KAFKA_RESP_ERR__BAD_MSG;
1079 }
1080
1081 hdrs = rd_kafka_headers_new((size_t)HeaderCount);
1082
1083 for (i = 0 ; (int64_t)i < HeaderCount ; i++) {
1084 int64_t KeyLen, ValueLen;
1085 const char *Key, *Value;
1086
1087 rd_kafka_buf_read_varint(rkbuf, &KeyLen);
1088 rd_kafka_buf_read_ptr(rkbuf, &Key, (size_t)KeyLen);
1089
1090 rd_kafka_buf_read_varint(rkbuf, &ValueLen);
1091 if (unlikely(ValueLen == -1))
1092 Value = NULL;
1093 else
1094 rd_kafka_buf_read_ptr(rkbuf, &Value, (size_t)ValueLen);
1095
1096 rd_kafka_header_add(hdrs, Key, (ssize_t)KeyLen,
1097 Value, (ssize_t)ValueLen);
1098 }
1099
1100 rkm->rkm_headers = hdrs;
1101
1102 rd_kafka_buf_destroy(rkbuf);
1103 return RD_KAFKA_RESP_ERR_NO_ERROR;
1104
1105 err_parse:
1106 err = rkbuf->rkbuf_err;
1107 rd_kafka_buf_destroy(rkbuf);
1108 if (hdrs)
1109 rd_kafka_headers_destroy(hdrs);
1110 return err;
1111}
1112
1113
1114
1115
1116rd_kafka_resp_err_t
1117rd_kafka_message_headers (const rd_kafka_message_t *rkmessage,
1118 rd_kafka_headers_t **hdrsp) {
1119 rd_kafka_msg_t *rkm;
1120 rd_kafka_resp_err_t err;
1121
1122 rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage);
1123
1124 if (rkm->rkm_headers) {
1125 *hdrsp = rkm->rkm_headers;
1126 return RD_KAFKA_RESP_ERR_NO_ERROR;
1127 }
1128
1129 /* Producer (rkm_headers will be set if there were any headers) */
1130 if (rkm->rkm_flags & RD_KAFKA_MSG_F_PRODUCER)
1131 return RD_KAFKA_RESP_ERR__NOENT;
1132
1133 /* Consumer */
1134
1135 /* No previously parsed headers, check if the underlying
1136 * protocol message had headers and if so, parse them. */
1137 if (unlikely(!RD_KAFKAP_BYTES_LEN(&rkm->rkm_u.consumer.binhdrs)))
1138 return RD_KAFKA_RESP_ERR__NOENT;
1139
1140 err = rd_kafka_msg_headers_parse(rkm);
1141 if (unlikely(err))
1142 return err;
1143
1144 *hdrsp = rkm->rkm_headers;
1145 return RD_KAFKA_RESP_ERR_NO_ERROR;
1146}
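
/**
 * Illustrative usage sketch (not part of the library): enumerating the
 * headers of a consumed message with the public header API. \p rkmessage is
 * assumed to be a message returned by rd_kafka_consumer_poll(); the headers
 * remain owned by the message and must not be destroyed by the application.
 *
 * @code
 * rd_kafka_headers_t *hdrs;
 *
 * if (!rd_kafka_message_headers(rkmessage, &hdrs)) {
 *         size_t idx = 0;
 *         const char *name;
 *         const void *value;
 *         size_t size;
 *
 *         while (!rd_kafka_header_get_all(hdrs, idx++,
 *                                         &name, &value, &size))
 *                 printf("header %s: %zu bytes\n", name, size);
 * }
 * @endcode
 */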
1147
1148
1149rd_kafka_resp_err_t
1150rd_kafka_message_detach_headers (rd_kafka_message_t *rkmessage,
1151 rd_kafka_headers_t **hdrsp) {
1152 rd_kafka_msg_t *rkm;
1153 rd_kafka_resp_err_t err;
1154
1155 err = rd_kafka_message_headers(rkmessage, hdrsp);
1156 if (err)
1157 return err;
1158
1159 rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage);
1160 rkm->rkm_headers = NULL;
1161
1162 return RD_KAFKA_RESP_ERR_NO_ERROR;
1163}
1164
1165
1166void rd_kafka_message_set_headers (rd_kafka_message_t *rkmessage,
1167 rd_kafka_headers_t *hdrs) {
1168 rd_kafka_msg_t *rkm;
1169
1170 rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage);
1171
1172 if (rkm->rkm_headers) {
1173 assert(rkm->rkm_headers != hdrs);
1174 rd_kafka_headers_destroy(rkm->rkm_headers);
1175 }
1176
1177 rkm->rkm_headers = hdrs;
1178}
1179
1180
1181
1182rd_kafka_msg_status_t
1183rd_kafka_message_status (const rd_kafka_message_t *rkmessage) {
1184 rd_kafka_msg_t *rkm;
1185
1186 rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage);
1187
1188 return rkm->rkm_status;
1189}
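
/**
 * Illustrative usage sketch (not part of the library): using the
 * persistence status from a delivery report callback (registered with
 * rd_kafka_conf_set_dr_msg_cb()) to decide whether re-producing a failed
 * message risks duplication.
 *
 * @code
 * static void dr_msg_cb (rd_kafka_t *rk,
 *                        const rd_kafka_message_t *rkmessage, void *opaque) {
 *         if (!rkmessage->err)
 *                 return;
 *
 *         if (rd_kafka_message_status(rkmessage) ==
 *             RD_KAFKA_MSG_STATUS_NOT_PERSISTED)
 *                 printf("never written to the log: "
 *                        "safe to produce the message again\n");
 *         else
 *                 printf("possibly or fully persisted: "
 *                        "re-producing may cause duplicates\n");
 * }
 * @endcode
 */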
1190
1191
1192void rd_kafka_msgq_dump (FILE *fp, const char *what, rd_kafka_msgq_t *rkmq) {
1193 rd_kafka_msg_t *rkm;
1194
1195 fprintf(fp, "%s msgq_dump (%d messages, %"PRIusz" bytes):\n", what,
1196 rd_kafka_msgq_len(rkmq), rd_kafka_msgq_size(rkmq));
1197 TAILQ_FOREACH(rkm, &rkmq->rkmq_msgs, rkm_link) {
1198 fprintf(fp, " [%"PRId32"]@%"PRId64
1199 ": rkm msgid %"PRIu64": \"%.*s\"\n",
1200 rkm->rkm_partition, rkm->rkm_offset,
1201 rkm->rkm_u.producer.msgid,
1202 (int)rkm->rkm_len, (const char *)rkm->rkm_payload);
1203 }
1204}
1205
1206
1207
1208
1209/**
1210 * @brief Destroy resources associated with msgbatch
1211 */
1212void rd_kafka_msgbatch_destroy (rd_kafka_msgbatch_t *rkmb) {
1213 if (rkmb->s_rktp) {
1214 rd_kafka_toppar_destroy(rkmb->s_rktp);
1215 rkmb->s_rktp = NULL;
1216 }
1217
1218 rd_assert(RD_KAFKA_MSGQ_EMPTY(&rkmb->msgq));
1219}
1220
1221
/**
 * @brief Initialize a message batch for the Idempotent Producer.
 *
 * @remark The first message of the batch is set separately with
 *         rd_kafka_msgbatch_set_first_msg().
 */
1227void rd_kafka_msgbatch_init (rd_kafka_msgbatch_t *rkmb,
1228 rd_kafka_toppar_t *rktp,
1229 rd_kafka_pid_t pid) {
1230 memset(rkmb, 0, sizeof(*rkmb));
1231
1232 rkmb->s_rktp = rd_kafka_toppar_keep(rktp);
1233
1234 rd_kafka_msgq_init(&rkmb->msgq);
1235
1236 rkmb->pid = pid;
1237 rkmb->first_seq = -1;
1238}
1239
1240
/**
 * @brief Set the first message in the batch, which is used to set
 *        the BaseSequence and keep track of batch reconstruction range.
 */
1245void rd_kafka_msgbatch_set_first_msg (rd_kafka_msgbatch_t *rkmb,
1246 rd_kafka_msg_t *rkm) {
1247 rd_assert(rkmb->first_msgid == 0);
1248
1249 if (!rd_kafka_pid_valid(rkmb->pid))
1250 return;
1251
1252 rkmb->first_msgid = rkm->rkm_u.producer.msgid;
1253
1254 /* Our msgid counter is 64-bits, but the
1255 * Kafka protocol's sequence is only 31 (signed), so we'll
1256 * need to handle wrapping. */
1257 rkmb->first_seq =
1258 rd_kafka_seq_wrap(rkm->rkm_u.producer.msgid -
1259 rd_kafka_toppar_s2i(rkmb->s_rktp)->
1260 rktp_eos.epoch_base_msgid);
1261
1262 /* Check if there is a stored last message
1263 * on the first msg, which means an entire
1264 * batch of messages are being retried and
1265 * we need to maintain the exact messages
1266 * of the original batch.
1267 * Simply tracking the last message, on
1268 * the first message, is sufficient for now.
1269 * Will be 0 if not applicable. */
1270 rkmb->last_msgid = rkm->rkm_u.producer.last_msgid;
1271}
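
/*
 * A minimal sketch of the wrapping described above, assuming (this is an
 * assumption for illustration, not the definition of the helper) that
 * rd_kafka_seq_wrap() reduces the 64-bit msgid delta to the protocol's
 * 31-bit sequence space by taking it modulo 2^31:
 *
 *   seq = (int32_t)((msgid - epoch_base_msgid) & (uint64_t)INT32_MAX);
 *
 * e.g. a delta of INT32_MAX+1 wraps to 0 and INT32_MAX+2 wraps to 1, which
 * matches the expectations in unittest_msg_seq_wrap() below.
 */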
1272
1273
1274
1275/**
1276 * @brief Message batch is ready to be transmitted.
1277 *
1278 * @remark This function assumes the batch will be transmitted and increases
1279 * the toppar's in-flight count.
1280 */
1281void rd_kafka_msgbatch_ready_produce (rd_kafka_msgbatch_t *rkmb) {
1282 rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(rkmb->s_rktp);
1283 rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
1284
1285 /* Keep track of number of requests in-flight per partition,
1286 * and the number of partitions with in-flight requests when
1287 * idempotent producer - this is used to drain partitions
1288 * before resetting the PID. */
1289 if (rd_atomic32_add(&rktp->rktp_msgs_inflight,
1290 rd_kafka_msgq_len(&rkmb->msgq)) ==
1291 rd_kafka_msgq_len(&rkmb->msgq) &&
1292 rd_kafka_is_idempotent(rk))
1293 rd_kafka_idemp_inflight_toppar_add(rk, rktp);
1294}
1295
1296
1297/**
1298 * @brief Verify order (by msgid) in message queue.
1299 * For development use only.
1300 */
1301void rd_kafka_msgq_verify_order0 (const char *function, int line,
1302 const rd_kafka_toppar_t *rktp,
1303 const rd_kafka_msgq_t *rkmq,
1304 uint64_t exp_first_msgid,
1305 rd_bool_t gapless) {
1306 const rd_kafka_msg_t *rkm;
1307 uint64_t exp;
1308 int errcnt = 0;
1309 int cnt = 0;
1310 const char *topic = rktp ? rktp->rktp_rkt->rkt_topic->str : "n/a";
1311 int32_t partition = rktp ? rktp->rktp_partition : -1;
1312
1313 if (rd_kafka_msgq_len(rkmq) == 0)
1314 return;
1315
1316 if (exp_first_msgid)
1317 exp = exp_first_msgid;
1318 else {
1319 exp = rd_kafka_msgq_first(rkmq)->rkm_u.producer.msgid;
1320 if (exp == 0) /* message without msgid (e.g., UA partition) */
1321 return;
1322 }
1323
1324 TAILQ_FOREACH(rkm, &rkmq->rkmq_msgs, rkm_link) {
1325#if 0
1326 printf("%s:%d: %s [%"PRId32"]: rkm #%d (%p) "
1327 "msgid %"PRIu64"\n",
1328 function, line,
1329 topic, partition,
1330 cnt, rkm, rkm->rkm_u.producer.msgid);
1331#endif
1332 if (gapless &&
1333 rkm->rkm_u.producer.msgid != exp) {
1334 printf("%s:%d: %s [%"PRId32"]: rkm #%d (%p) "
1335 "msgid %"PRIu64": "
1336 "expected msgid %"PRIu64"\n",
1337 function, line,
1338 topic, partition,
1339 cnt, rkm, rkm->rkm_u.producer.msgid,
1340 exp);
1341 errcnt++;
1342 } else if (!gapless && rkm->rkm_u.producer.msgid < exp) {
1343 printf("%s:%d: %s [%"PRId32"]: rkm #%d (%p) "
1344 "msgid %"PRIu64": "
1345 "expected increased msgid >= %"PRIu64"\n",
1346 function, line,
1347 topic, partition,
1348 cnt, rkm, rkm->rkm_u.producer.msgid,
1349 exp);
1350 errcnt++;
1351 } else
1352 exp++;
1353
1354 cnt++;
1355 }
1356
1357 rd_assert(!errcnt);
1358}
1359
1360
1361
1362/**
1363 * @name Unit tests
1364 */
1365
1366/**
1367 * @brief Unittest: message allocator
1368 */
1369rd_kafka_msg_t *ut_rd_kafka_msg_new (void) {
1370 rd_kafka_msg_t *rkm;
1371
1372 rkm = rd_calloc(1, sizeof(*rkm));
1373 rkm->rkm_flags = RD_KAFKA_MSG_F_FREE_RKM;
1374 rkm->rkm_offset = RD_KAFKA_OFFSET_INVALID;
1375 rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE;
1376
1377 return rkm;
1378}
1379
1380
1381
1382/**
1383 * @brief Unittest: destroy all messages in queue
1384 */
1385void ut_rd_kafka_msgq_purge (rd_kafka_msgq_t *rkmq) {
1386 rd_kafka_msg_t *rkm, *tmp;
1387
1388 TAILQ_FOREACH_SAFE(rkm, &rkmq->rkmq_msgs, rkm_link, tmp)
1389 rd_kafka_msg_destroy(NULL, rkm);
1390
1391
1392 rd_kafka_msgq_init(rkmq);
1393}
1394
1395
1396
1397static int ut_verify_msgq_order (const char *what,
1398 const rd_kafka_msgq_t *rkmq,
1399 int first, int last) {
1400 const rd_kafka_msg_t *rkm;
1401 uint64_t expected = first;
1402 int incr = first < last ? +1 : -1;
1403 int fails = 0;
1404 int cnt = 0;
1405
1406 TAILQ_FOREACH(rkm, &rkmq->rkmq_msgs, rkm_link) {
1407 if (rkm->rkm_u.producer.msgid != expected) {
1408 RD_UT_SAY("%s: expected msgid %"PRIu64
1409 " not %"PRIu64" at index #%d",
1410 what, expected,
1411 rkm->rkm_u.producer.msgid, cnt);
1412 fails++;
1413 }
1414 cnt++;
1415 expected += incr;
1416 }
1417
1418 RD_UT_ASSERT(!fails, "See %d previous failure(s)", fails);
1419 return fails;
1420}
1421
1422/**
1423 * @brief Verify ordering comparator for message queues.
1424 */
1425static int unittest_msgq_order (const char *what, int fifo,
1426 int (*cmp) (const void *, const void *)) {
1427 rd_kafka_msgq_t rkmq = RD_KAFKA_MSGQ_INITIALIZER(rkmq);
1428 rd_kafka_msg_t *rkm;
1429 rd_kafka_msgq_t sendq, sendq2;
1430 int i;
1431
1432 RD_UT_SAY("%s: testing in %s mode", what, fifo? "FIFO" : "LIFO");
1433
1434 for (i = 1 ; i <= 6 ; i++) {
1435 rkm = ut_rd_kafka_msg_new();
1436 rkm->rkm_u.producer.msgid = i;
1437 rd_kafka_msgq_enq_sorted0(&rkmq, rkm, cmp);
1438 }
1439
1440 if (fifo) {
1441 if (ut_verify_msgq_order("added", &rkmq, 1, 6))
1442 return 1;
1443 } else {
1444 if (ut_verify_msgq_order("added", &rkmq, 6, 1))
1445 return 1;
1446 }
1447
1448 /* Move 3 messages to "send" queue which we then re-insert
1449 * in the original queue (i.e., "retry"). */
1450 rd_kafka_msgq_init(&sendq);
1451 while (rd_kafka_msgq_len(&sendq) < 3)
1452 rd_kafka_msgq_enq(&sendq, rd_kafka_msgq_pop(&rkmq));
1453
1454 if (fifo) {
1455 if (ut_verify_msgq_order("send removed", &rkmq, 4, 6))
1456 return 1;
1457
1458 if (ut_verify_msgq_order("sendq", &sendq, 1, 3))
1459 return 1;
1460 } else {
1461 if (ut_verify_msgq_order("send removed", &rkmq, 3, 1))
1462 return 1;
1463
1464 if (ut_verify_msgq_order("sendq", &sendq, 6, 4))
1465 return 1;
1466 }
1467
1468 /* Retry the messages, which moves them back to sendq
1469 * maintaining the original order */
1470 rd_kafka_retry_msgq(&rkmq, &sendq, 1, 1, 0,
1471 RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp);
1472
1473 RD_UT_ASSERT(rd_kafka_msgq_len(&sendq) == 0,
1474 "sendq FIFO should be empty, not contain %d messages",
1475 rd_kafka_msgq_len(&sendq));
1476
1477 if (fifo) {
1478 if (ut_verify_msgq_order("readded", &rkmq, 1, 6))
1479 return 1;
1480 } else {
1481 if (ut_verify_msgq_order("readded", &rkmq, 6, 1))
1482 return 1;
1483 }
1484
        /* Move the first 4 messages to the "send" queue, then
         * retry them with max_retries=1 which should now fail for
         * the 3 first messages that were already retried. */
1488 rd_kafka_msgq_init(&sendq);
1489 while (rd_kafka_msgq_len(&sendq) < 4)
1490 rd_kafka_msgq_enq(&sendq, rd_kafka_msgq_pop(&rkmq));
1491
1492 if (fifo) {
1493 if (ut_verify_msgq_order("send removed #2", &rkmq, 5, 6))
1494 return 1;
1495
1496 if (ut_verify_msgq_order("sendq #2", &sendq, 1, 4))
1497 return 1;
1498 } else {
1499 if (ut_verify_msgq_order("send removed #2", &rkmq, 2, 1))
1500 return 1;
1501
1502 if (ut_verify_msgq_order("sendq #2", &sendq, 6, 3))
1503 return 1;
1504 }
1505
1506 /* Retry the messages, which should now keep the 3 first messages
1507 * on sendq (no more retries) and just number 4 moved back. */
1508 rd_kafka_retry_msgq(&rkmq, &sendq, 1, 1, 0,
1509 RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp);
1510
1511 if (fifo) {
1512 if (ut_verify_msgq_order("readded #2", &rkmq, 4, 6))
1513 return 1;
1514
1515 if (ut_verify_msgq_order("no more retries", &sendq, 1, 3))
1516 return 1;
1517
1518 } else {
1519 if (ut_verify_msgq_order("readded #2", &rkmq, 3, 1))
1520 return 1;
1521
1522 if (ut_verify_msgq_order("no more retries", &sendq, 6, 4))
1523 return 1;
1524 }
1525
1526 /* Move all messages back on rkmq */
1527 rd_kafka_retry_msgq(&rkmq, &sendq, 0, 1000, 0,
1528 RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp);
1529
1530
        /* Move first half of messages to sendq (1,2,3).
         * Move second half of messages to sendq2 (4,5,6).
         * Add new message to rkmq (7).
         * Move first half of messages back on rkmq (1,2,3,7).
         * Move second half back on the rkmq (1,2,3,4,5,6,7). */
1536 rd_kafka_msgq_init(&sendq);
1537 rd_kafka_msgq_init(&sendq2);
1538
1539 while (rd_kafka_msgq_len(&sendq) < 3)
1540 rd_kafka_msgq_enq(&sendq, rd_kafka_msgq_pop(&rkmq));
1541
1542 while (rd_kafka_msgq_len(&sendq2) < 3)
1543 rd_kafka_msgq_enq(&sendq2, rd_kafka_msgq_pop(&rkmq));
1544
1545 rkm = ut_rd_kafka_msg_new();
1546 rkm->rkm_u.producer.msgid = i;
1547 rd_kafka_msgq_enq_sorted0(&rkmq, rkm, cmp);
1548
1549 rd_kafka_retry_msgq(&rkmq, &sendq, 0, 1000, 0,
1550 RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp);
1551 rd_kafka_retry_msgq(&rkmq, &sendq2, 0, 1000, 0,
1552 RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp);
1553
1554 RD_UT_ASSERT(rd_kafka_msgq_len(&sendq) == 0,
1555 "sendq FIFO should be empty, not contain %d messages",
1556 rd_kafka_msgq_len(&sendq));
1557 RD_UT_ASSERT(rd_kafka_msgq_len(&sendq2) == 0,
1558 "sendq2 FIFO should be empty, not contain %d messages",
1559 rd_kafka_msgq_len(&sendq2));
1560
1561 if (fifo) {
1562 if (ut_verify_msgq_order("inject", &rkmq, 1, 7))
1563 return 1;
1564 } else {
1565 if (ut_verify_msgq_order("readded #2", &rkmq, 7, 1))
1566 return 1;
1567 }
1568
1569
1570 ut_rd_kafka_msgq_purge(&sendq);
1571 ut_rd_kafka_msgq_purge(&sendq2);
1572 ut_rd_kafka_msgq_purge(&rkmq);
1573
1574 return 0;
1575
1576}
1577
1578/**
1579 * @brief Verify that rd_kafka_seq_wrap() works.
1580 */
1581static int unittest_msg_seq_wrap (void) {
1582 static const struct exp {
1583 int64_t in;
1584 int32_t out;
1585 } exp[] = {
1586 { 0, 0 },
1587 { 1, 1 },
1588 { (int64_t)INT32_MAX+2, 1 },
1589 { (int64_t)INT32_MAX+1, 0 },
1590 { INT32_MAX, INT32_MAX },
1591 { INT32_MAX-1, INT32_MAX-1 },
1592 { INT32_MAX-2, INT32_MAX-2 },
1593 { ((int64_t)1<<33)-2, INT32_MAX-1 },
1594 { ((int64_t)1<<33)-1, INT32_MAX },
1595 { ((int64_t)1<<34), 0 },
1596 { ((int64_t)1<<35)+3, 3 },
1597 { 1710+1229, 2939 },
1598 { -1, -1 },
1599 };
1600 int i;
1601
1602 for (i = 0 ; exp[i].in != -1 ; i++) {
1603 int32_t wseq = rd_kafka_seq_wrap(exp[i].in);
1604 RD_UT_ASSERT(wseq == exp[i].out,
1605 "Expected seq_wrap(%"PRId64") -> %"PRId32
1606 ", not %"PRId32,
1607 exp[i].in, exp[i].out, wseq);
1608 }
1609
1610 RD_UT_PASS();
1611}
1612
1613int unittest_msg (void) {
1614 int fails = 0;
1615
1616 fails += unittest_msgq_order("FIFO", 1, rd_kafka_msg_cmp_msgid);
1617 fails += unittest_msg_seq_wrap();
1618
1619 return fails;
1620}
1621