1/*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012-2015, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29#include <stdarg.h>
30
31#include "rdkafka_int.h"
32#include "rdkafka_request.h"
33#include "rdkafka_broker.h"
34#include "rdkafka_offset.h"
35#include "rdkafka_topic.h"
36#include "rdkafka_partition.h"
37#include "rdkafka_metadata.h"
38#include "rdkafka_msgset.h"
39#include "rdkafka_idempotence.h"
40
41#include "rdrand.h"
42#include "rdstring.h"
43#include "rdunittest.h"
44
45
46/**
47 * Kafka protocol request and response handling.
48 * All of this code runs in the broker thread and uses op queues for
49 * propagating results back to the various sub-systems operating in
50 * other threads.
51 */
52
53
/* RD_KAFKA_ERR_ACTION_.. to string map.
 *
 * NOTE: The entry order must match the bit order of the
 *       RD_KAFKA_ERR_ACTION_.. flags (lowest bit first):
 *       rd_flags2str() (see rd_kafka_actions2str()) maps set bit N
 *       to entry N of this NULL-terminated array. Do not reorder. */
static const char *rd_kafka_actions_descs[] = {
        "Permanent",
        "Ignore",
        "Refresh",
        "Retry",
        "Inform",
        "Special",
        "MsgNotPersisted",
        "MsgPossiblyPersisted",
        "MsgPersisted",
        NULL,
};
67
68static const char *rd_kafka_actions2str (int actions) {
69 static RD_TLS char actstr[128];
70 return rd_flags2str(actstr, sizeof(actstr),
71 rd_kafka_actions_descs,
72 actions);
73}
74
75
76/**
77 * @brief Decide action(s) to take based on the returned error code.
78 *
79 * The optional var-args is a .._ACTION_END terminated list
80 * of action,error tuples which overrides the general behaviour.
81 * It is to be read as: for \p error, return \p action(s).
82 *
83 * @warning \p request, \p rkbuf and \p rkb may be NULL.
84 */
int rd_kafka_err_action (rd_kafka_broker_t *rkb,
			 rd_kafka_resp_err_t err,
			 const rd_kafka_buf_t *request, ...) {
	va_list ap;
        int actions = 0; /* Accumulated RD_KAFKA_ERR_ACTION_.. flags */
        int exp_act;

        /* No error: no action needed. */
	if (!err)
		return 0;

	/* Match explicitly defined error mappings first.
         * The var-arg list is (action, error) int pairs terminated by a
         * zero action (RD_KAFKA_ERR_ACTION_END). Multiple matching pairs
         * are OR:ed together. */
	va_start(ap, request);
	while ((exp_act = va_arg(ap, int))) {
		int exp_err = va_arg(ap, int);

		if (err == exp_err)
			actions |= exp_act;
	}
	va_end(ap);

	/* Explicit error match: the caller-supplied mapping takes
         * precedence and the generic defaults below are skipped. */
	if (actions) {
		if (err && rkb && request)
                        rd_rkb_dbg(rkb, BROKER, "REQERR",
                                   "%sRequest failed: %s: explicit actions %s",
                                   rd_kafka_ApiKey2str(request->rkbuf_reqhdr.
                                                       ApiKey),
                                   rd_kafka_err2str(err),
                                   rd_kafka_actions2str(actions));

		return actions;
	}

	/* Default error matching */
        switch (err)
        {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
                break;
        /* Stale cluster view: leader/coordinator moved or is temporarily
         * unavailable. */
        case RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE:
        case RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION:
        case RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE:
        case RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE:
        case RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE:
        case RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP:
        case RD_KAFKA_RESP_ERR__WAIT_COORD:
                /* Request metadata information update */
                actions |= RD_KAFKA_ERR_ACTION_REFRESH|
                        RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED;
                break;

        /* Timeouts after the request may have reached the broker:
         * the message may or may not have been persisted. */
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
        case RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT:
        case RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND:
                actions |= RD_KAFKA_ERR_ACTION_RETRY|
                        RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED;
                break;

        case RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE:
                /* Client-side wait-response/in-queue timeout */
        case RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS:
        case RD_KAFKA_RESP_ERR__TRANSPORT:
                /* Request never (fully) reached the broker: safe to
                 * retry, message not persisted. */
                actions |= RD_KAFKA_ERR_ACTION_RETRY|
                        RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED;
                break;

        case RD_KAFKA_RESP_ERR__PURGE_INFLIGHT:
                /* Purged while in-flight: broker may already have
                 * received it. */
                actions |= RD_KAFKA_ERR_ACTION_PERMANENT|
                        RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED;
                break;

        case RD_KAFKA_RESP_ERR__DESTROY:
        case RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT:
        case RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE:
        case RD_KAFKA_RESP_ERR__PURGE_QUEUE:
        default:
                /* Unknown errors are treated as permanent failures. */
                actions |= RD_KAFKA_ERR_ACTION_PERMANENT|
                        RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED;
                break;
        }

        /* If no request buffer was specified, which might be the case
         * in certain error call chains, mask out the retry action. */
        if (!request)
                actions &= ~RD_KAFKA_ERR_ACTION_RETRY;
        else if (request->rkbuf_reqhdr.ApiKey != RD_KAFKAP_Produce)
                /* Mask out message-related bits for non-Produce requests */
                actions &= ~RD_KAFKA_ERR_ACTION_MSG_FLAGS;

	if (err && actions && rkb && request)
                rd_rkb_dbg(rkb, BROKER, "REQERR",
                           "%sRequest failed: %s: actions %s",
                           rd_kafka_ApiKey2str(request->rkbuf_reqhdr.ApiKey),
                           rd_kafka_err2str(err),
                           rd_kafka_actions2str(actions));

	return actions;
}
182
183
184/**
185 * Send GroupCoordinatorRequest
186 */
187void rd_kafka_GroupCoordinatorRequest (rd_kafka_broker_t *rkb,
188 const rd_kafkap_str_t *cgrp,
189 rd_kafka_replyq_t replyq,
190 rd_kafka_resp_cb_t *resp_cb,
191 void *opaque) {
192 rd_kafka_buf_t *rkbuf;
193
194 rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_GroupCoordinator, 1,
195 RD_KAFKAP_STR_SIZE(cgrp));
196 rd_kafka_buf_write_kstr(rkbuf, cgrp);
197
198 rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
199}
200
201
202
203
204/**
205 * @brief Parses and handles Offset replies.
206 *
207 * Returns the parsed offsets (and errors) in \p offsets
208 *
209 * @returns 0 on success, else an error.
210 */
rd_kafka_resp_err_t rd_kafka_handle_Offset (rd_kafka_t *rk,
                                            rd_kafka_broker_t *rkb,
                                            rd_kafka_resp_err_t err,
                                            rd_kafka_buf_t *rkbuf,
                                            rd_kafka_buf_t *request,
                                            rd_kafka_topic_partition_list_t
                                            *offsets) {

        /* rd_kafka_buf_read_*() jumps to the err_parse: label on
         * parse failure, logging decode errors at this level. */
        const int log_decode_errors = LOG_ERR;
        int16_t ErrorCode = 0;
        int32_t TopicArrayCnt;
        int actions;
        int16_t api_version;

        /* Request-level error (e.g. transport failure): skip parsing. */
        if (err) {
                ErrorCode = err;
                goto err;
        }

        api_version = request->rkbuf_reqhdr.ApiVersion;

        /* NOTE:
         * Broker may return offsets in a different constellation than
         * in the original request .*/

        rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt);
        while (TopicArrayCnt-- > 0) {
                rd_kafkap_str_t ktopic;
                int32_t PartArrayCnt;
                char *topic_name;

                rd_kafka_buf_read_str(rkbuf, &ktopic);
                rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt);

                /* Stack-allocated nul-terminated copy of the topic name */
                RD_KAFKAP_STR_DUPA(&topic_name, &ktopic);

                while (PartArrayCnt-- > 0) {
                        int32_t kpartition;
                        int32_t OffsetArrayCnt;
                        int64_t Offset = -1;
                        rd_kafka_topic_partition_t *rktpar;

                        rd_kafka_buf_read_i32(rkbuf, &kpartition);
                        rd_kafka_buf_read_i16(rkbuf, &ErrorCode);

                        if (api_version == 1) {
                                /* v1: single Timestamp+Offset pair;
                                 * the timestamp is read and discarded. */
                                int64_t Timestamp;
                                rd_kafka_buf_read_i64(rkbuf, &Timestamp);
                                rd_kafka_buf_read_i64(rkbuf, &Offset);
                        } else if (api_version == 0) {
                                rd_kafka_buf_read_i32(rkbuf, &OffsetArrayCnt);
                                /* We only request one offset so just grab
                                 * the first one. */
                                while (OffsetArrayCnt-- > 0)
                                        rd_kafka_buf_read_i64(rkbuf, &Offset);
                        } else {
                                /* Only v0 and v1 requests are ever sent
                                 * (see rd_kafka_OffsetRequest()). */
                                rd_kafka_assert(NULL, !*"NOTREACHED");
                        }

                        /* Add (or update) the partition in the caller's
                         * result list with per-partition error+offset. */
                        rktpar = rd_kafka_topic_partition_list_add(
                                offsets, topic_name, kpartition);
                        rktpar->err = ErrorCode;
                        rktpar->offset = Offset;
                }
        }

        goto done;

 err_parse:
        ErrorCode = rkbuf->rkbuf_err;
 err:
        actions = rd_kafka_err_action(
                rkb, ErrorCode, request,
                RD_KAFKA_ERR_ACTION_PERMANENT,
                RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART,

                RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_RETRY,
                RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,

                RD_KAFKA_ERR_ACTION_END);

        if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
                char tmp[256];
                /* Re-query for leader */
                rd_snprintf(tmp, sizeof(tmp),
                            "OffsetRequest failed: %s",
                            rd_kafka_err2str(ErrorCode));
                rd_kafka_metadata_refresh_known_topics(rk, NULL, 1/*force*/,
                                                       tmp);
        }

        if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
                if (rd_kafka_buf_retry(rkb, request))
                        return RD_KAFKA_RESP_ERR__IN_PROGRESS;
                /* FALLTHRU */
        }

done:
        return ErrorCode;
}
311
312
313
314
315
316
317/**
318 * Send OffsetRequest for toppar 'rktp'.
319 */
320void rd_kafka_OffsetRequest (rd_kafka_broker_t *rkb,
321 rd_kafka_topic_partition_list_t *partitions,
322 int16_t api_version,
323 rd_kafka_replyq_t replyq,
324 rd_kafka_resp_cb_t *resp_cb,
325 void *opaque) {
326 rd_kafka_buf_t *rkbuf;
327 int i;
328 size_t of_TopicArrayCnt = 0, of_PartArrayCnt = 0;
329 const char *last_topic = "";
330 int32_t topic_cnt = 0, part_cnt = 0;
331
332 rd_kafka_topic_partition_list_sort_by_topic(partitions);
333
334 rkbuf = rd_kafka_buf_new_request(
335 rkb, RD_KAFKAP_Offset, 1,
336 /* ReplicaId+TopicArrayCnt+Topic */
337 4+4+100+
338 /* PartArrayCnt */
339 4 +
340 /* partition_cnt * Partition+Time+MaxNumOffs */
341 (partitions->cnt * (4+8+4)));
342
343 /* ReplicaId */
344 rd_kafka_buf_write_i32(rkbuf, -1);
345 /* TopicArrayCnt */
346 of_TopicArrayCnt = rd_kafka_buf_write_i32(rkbuf, 0); /* updated later */
347
348 for (i = 0 ; i < partitions->cnt ; i++) {
349 const rd_kafka_topic_partition_t *rktpar = &partitions->elems[i];
350
351 if (strcmp(rktpar->topic, last_topic)) {
352 /* Finish last topic, if any. */
353 if (of_PartArrayCnt > 0)
354 rd_kafka_buf_update_i32(rkbuf,
355 of_PartArrayCnt,
356 part_cnt);
357
358 /* Topic */
359 rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1);
360 topic_cnt++;
361 last_topic = rktpar->topic;
362 /* New topic so reset partition count */
363 part_cnt = 0;
364
365 /* PartitionArrayCnt: updated later */
366 of_PartArrayCnt = rd_kafka_buf_write_i32(rkbuf, 0);
367 }
368
369 /* Partition */
370 rd_kafka_buf_write_i32(rkbuf, rktpar->partition);
371 part_cnt++;
372
373 /* Time/Offset */
374 rd_kafka_buf_write_i64(rkbuf, rktpar->offset);
375
376 if (api_version == 0) {
377 /* MaxNumberOfOffsets */
378 rd_kafka_buf_write_i32(rkbuf, 1);
379 }
380 }
381
382 if (of_PartArrayCnt > 0) {
383 rd_kafka_buf_update_i32(rkbuf, of_PartArrayCnt, part_cnt);
384 rd_kafka_buf_update_i32(rkbuf, of_TopicArrayCnt, topic_cnt);
385 }
386
387 rd_kafka_buf_ApiVersion_set(rkbuf, api_version,
388 api_version == 1 ?
389 RD_KAFKA_FEATURE_OFFSET_TIME : 0);
390
391 rd_rkb_dbg(rkb, TOPIC, "OFFSET",
392 "OffsetRequest (v%hd, opv %d) "
393 "for %"PRId32" topic(s) and %"PRId32" partition(s)",
394 api_version, rkbuf->rkbuf_replyq.version,
395 topic_cnt, partitions->cnt);
396
397 rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
398}
399
400
401/**
402 * Generic handler for OffsetFetch responses.
403 * Offsets for included partitions will be propagated through the passed
404 * 'offsets' list.
405 *
406 * \p update_toppar: update toppar's committed_offset
407 */
rd_kafka_resp_err_t
rd_kafka_handle_OffsetFetch (rd_kafka_t *rk,
                             rd_kafka_broker_t *rkb,
                             rd_kafka_resp_err_t err,
                             rd_kafka_buf_t *rkbuf,
                             rd_kafka_buf_t *request,
                             rd_kafka_topic_partition_list_t *offsets,
                             int update_toppar) {
        /* rd_kafka_buf_read_*() jumps to the err_parse: label on
         * parse failure, logging decode errors at this level. */
        const int log_decode_errors = LOG_ERR;
        int32_t TopicArrayCnt;
        int64_t offset = RD_KAFKA_OFFSET_INVALID;
        rd_kafkap_str_t metadata;
        int i;
        int actions;
        int seen_cnt = 0; /* Number of response partitions that matched
                           * an entry in \p offsets */

        /* Request-level error: skip parsing. */
        if (err)
                goto err;

        /* Set default offset for all partitions. */
        rd_kafka_topic_partition_list_set_offsets(rkb->rkb_rk, offsets, 0,
                                                  RD_KAFKA_OFFSET_INVALID,
						  0 /* !is commit */);

        rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt);
        for (i = 0 ; i < TopicArrayCnt ; i++) {
                rd_kafkap_str_t topic;
                int32_t PartArrayCnt;
                char *topic_name;
                int j;

                rd_kafka_buf_read_str(rkbuf, &topic);
                rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt);

                /* Stack-allocated nul-terminated copy of the topic name */
                RD_KAFKAP_STR_DUPA(&topic_name, &topic);

                for (j = 0 ; j < PartArrayCnt ; j++) {
                        int32_t partition;
                        shptr_rd_kafka_toppar_t *s_rktp;
                        rd_kafka_topic_partition_t *rktpar;
                        int16_t err2; /* Per-partition ErrorCode */

                        rd_kafka_buf_read_i32(rkbuf, &partition);
                        rd_kafka_buf_read_i64(rkbuf, &offset);
                        rd_kafka_buf_read_str(rkbuf, &metadata);
                        rd_kafka_buf_read_i16(rkbuf, &err2);

                        /* Only propagate partitions the caller asked for. */
                        rktpar = rd_kafka_topic_partition_list_find(offsets,
                                                                    topic_name,
                                                                    partition);
                        if (!rktpar) {
                                rd_rkb_dbg(rkb, TOPIC, "OFFSETFETCH",
                                           "OffsetFetchResponse: %s [%"PRId32"] "
                                           "not found in local list: ignoring",
                                           topic_name, partition);
                                continue;
                        }

                        seen_cnt++;

                        /* Attach a toppar reference to the partition entry
                         * if it doesn't already have one; ownership of the
                         * reference follows the list entry (_private). */
                        if (!(s_rktp = rktpar->_private)) {
                                s_rktp = rd_kafka_toppar_get2(rkb->rkb_rk,
                                                              topic_name,
                                                              partition, 0, 0);
                                /* May be NULL if topic is not locally known */
                                rktpar->_private = s_rktp;
                        }

                        /* broker reports invalid offset as -1 */
                        if (offset == -1)
                                rktpar->offset = RD_KAFKA_OFFSET_INVALID;
                        else
                                rktpar->offset = offset;
                        rktpar->err = err2;

                        rd_rkb_dbg(rkb, TOPIC, "OFFSETFETCH",
                                   "OffsetFetchResponse: %s [%"PRId32"] offset %"PRId64,
                                   topic_name, partition, offset);

                        if (update_toppar && !err2 && s_rktp) {
                                rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
                                /* Update toppar's committed offset */
                                rd_kafka_toppar_lock(rktp);
                                rktp->rktp_committed_offset = rktpar->offset;
                                rd_kafka_toppar_unlock(rktp);
                        }


                        /* Replace any previous metadata on the list entry
                         * with the broker-provided metadata (may be NULL). */
                        if (rktpar->metadata)
                                rd_free(rktpar->metadata);

                        if (RD_KAFKAP_STR_IS_NULL(&metadata)) {
                                rktpar->metadata = NULL;
                                rktpar->metadata_size = 0;
                        } else {
                                rktpar->metadata = RD_KAFKAP_STR_DUP(&metadata);
                                rktpar->metadata_size =
                                        RD_KAFKAP_STR_LEN(&metadata);
                        }
                }
        }


err:
        rd_rkb_dbg(rkb, TOPIC, "OFFFETCH",
                   "OffsetFetch for %d/%d partition(s) returned %s",
                   seen_cnt,
                   offsets ? offsets->cnt : -1, rd_kafka_err2str(err));

        actions = rd_kafka_err_action(rkb, err, request,
                                      RD_KAFKA_ERR_ACTION_END);

        if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
                /* Re-query for coordinator */
                rd_kafka_cgrp_op(rkb->rkb_rk->rk_cgrp, NULL,
                                 RD_KAFKA_NO_REPLYQ,
				 RD_KAFKA_OP_COORD_QUERY, err);
        }

        if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
                if (rd_kafka_buf_retry(rkb, request))
                        return RD_KAFKA_RESP_ERR__IN_PROGRESS;
                /* FALLTHRU */
        }

	return err;

 err_parse:
        err = rkbuf->rkbuf_err;
        goto err;
}
539
540
541
542/**
543 * @brief Handle OffsetFetch response based on an RD_KAFKA_OP_OFFSET_FETCH
544 * rko in \p opaque.
545 *
546 * @param opaque rko wrapper for handle_OffsetFetch.
547 *
548 * The \c rko->rko_u.offset_fetch.partitions list will be filled in with
549 * the fetched offsets.
550 *
551 * A reply will be sent on 'rko->rko_replyq' with type RD_KAFKA_OP_OFFSET_FETCH.
552 *
553 * @remark \p rkb, \p rkbuf and \p request are optional.
554 *
555 * @remark The \p request buffer may be retried on error.
556 *
557 * @locality cgrp's broker thread
558 */
559void rd_kafka_op_handle_OffsetFetch (rd_kafka_t *rk,
560 rd_kafka_broker_t *rkb,
561 rd_kafka_resp_err_t err,
562 rd_kafka_buf_t *rkbuf,
563 rd_kafka_buf_t *request,
564 void *opaque) {
565 rd_kafka_op_t *rko = opaque;
566 rd_kafka_op_t *rko_reply;
567 rd_kafka_topic_partition_list_t *offsets;
568
569 RD_KAFKA_OP_TYPE_ASSERT(rko, RD_KAFKA_OP_OFFSET_FETCH);
570
571 if (err == RD_KAFKA_RESP_ERR__DESTROY) {
572 /* Termination, quick cleanup. */
573 rd_kafka_op_destroy(rko);
574 return;
575 }
576
577 offsets = rd_kafka_topic_partition_list_copy(
578 rko->rko_u.offset_fetch.partitions);
579
580 /* If all partitions already had usable offsets then there
581 * was no request sent and thus no reply, the offsets list is
582 * good to go.. */
583 if (rkbuf) {
584 /* ..else parse the response (or perror) */
585 err = rd_kafka_handle_OffsetFetch(rkb->rkb_rk, rkb, err, rkbuf,
586 request, offsets, 0);
587 if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) {
588 rd_kafka_topic_partition_list_destroy(offsets);
589 return; /* Retrying */
590 }
591 }
592
593 rko_reply = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH|RD_KAFKA_OP_REPLY);
594 rko_reply->rko_err = err;
595 rko_reply->rko_u.offset_fetch.partitions = offsets;
596 rko_reply->rko_u.offset_fetch.do_free = 1;
597 if (rko->rko_rktp)
598 rko_reply->rko_rktp = rd_kafka_toppar_keep(
599 rd_kafka_toppar_s2i(rko->rko_rktp));
600
601 rd_kafka_replyq_enq(&rko->rko_replyq, rko_reply, 0);
602
603 rd_kafka_op_destroy(rko);
604}
605
606
607
608
609
610
611/**
612 * Send OffsetFetchRequest for toppar.
613 *
614 * Any partition with a usable offset will be ignored, if all partitions
615 * have usable offsets then no request is sent at all but an empty
616 * reply is enqueued on the replyq.
617 */
618void rd_kafka_OffsetFetchRequest (rd_kafka_broker_t *rkb,
619 int16_t api_version,
620 rd_kafka_topic_partition_list_t *parts,
621 rd_kafka_replyq_t replyq,
622 rd_kafka_resp_cb_t *resp_cb,
623 void *opaque) {
624 rd_kafka_buf_t *rkbuf;
625 size_t of_TopicCnt;
626 int TopicCnt = 0;
627 ssize_t of_PartCnt = -1;
628 const char *last_topic = NULL;
629 int PartCnt = 0;
630 int tot_PartCnt = 0;
631 int i;
632
633 rkbuf = rd_kafka_buf_new_request(
634 rkb, RD_KAFKAP_OffsetFetch, 1,
635 RD_KAFKAP_STR_SIZE(rkb->rkb_rk->rk_group_id) +
636 4 +
637 (parts->cnt * 32));
638
639
640 /* ConsumerGroup */
641 rd_kafka_buf_write_kstr(rkbuf, rkb->rkb_rk->rk_group_id);
642
643 /* Sort partitions by topic */
644 rd_kafka_topic_partition_list_sort_by_topic(parts);
645
646 /* TopicArrayCnt */
647 of_TopicCnt = rd_kafka_buf_write_i32(rkbuf, 0); /* Updated later */
648
649 for (i = 0 ; i < parts->cnt ; i++) {
650 rd_kafka_topic_partition_t *rktpar = &parts->elems[i];
651
652 /* Ignore partitions with a usable offset. */
653 if (rktpar->offset != RD_KAFKA_OFFSET_INVALID &&
654 rktpar->offset != RD_KAFKA_OFFSET_STORED) {
655 rd_rkb_dbg(rkb, TOPIC, "OFFSET",
656 "OffsetFetchRequest: skipping %s [%"PRId32"] "
657 "with valid offset %s",
658 rktpar->topic, rktpar->partition,
659 rd_kafka_offset2str(rktpar->offset));
660 continue;
661 }
662
663 if (last_topic == NULL || strcmp(last_topic, rktpar->topic)) {
664 /* New topic */
665
666 /* Finalize previous PartitionCnt */
667 if (PartCnt > 0)
668 rd_kafka_buf_update_u32(rkbuf, of_PartCnt,
669 PartCnt);
670
671 /* TopicName */
672 rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1);
673 /* PartitionCnt, finalized later */
674 of_PartCnt = rd_kafka_buf_write_i32(rkbuf, 0);
675 PartCnt = 0;
676 last_topic = rktpar->topic;
677 TopicCnt++;
678 }
679
680 /* Partition */
681 rd_kafka_buf_write_i32(rkbuf, rktpar->partition);
682 PartCnt++;
683 tot_PartCnt++;
684 }
685
686 /* Finalize previous PartitionCnt */
687 if (PartCnt > 0)
688 rd_kafka_buf_update_u32(rkbuf, of_PartCnt, PartCnt);
689
690 /* Finalize TopicCnt */
691 rd_kafka_buf_update_u32(rkbuf, of_TopicCnt, TopicCnt);
692
693 rd_kafka_buf_ApiVersion_set(rkbuf, api_version, 0);
694
695 rd_rkb_dbg(rkb, TOPIC, "OFFSET",
696 "OffsetFetchRequest(v%d) for %d/%d partition(s)",
697 api_version, tot_PartCnt, parts->cnt);
698
699 if (tot_PartCnt == 0) {
700 /* No partitions needs OffsetFetch, enqueue empty
701 * response right away. */
702 rkbuf->rkbuf_replyq = replyq;
703 rkbuf->rkbuf_cb = resp_cb;
704 rkbuf->rkbuf_opaque = opaque;
705 rd_kafka_buf_callback(rkb->rkb_rk, rkb, 0, NULL, rkbuf);
706 return;
707 }
708
709 rd_rkb_dbg(rkb, CGRP|RD_KAFKA_DBG_CONSUMER, "OFFSET",
710 "Fetch committed offsets for %d/%d partition(s)",
711 tot_PartCnt, parts->cnt);
712
713
714 rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
715}
716
717
718/**
719 * @remark \p offsets may be NULL if \p err is set
720 */
rd_kafka_resp_err_t
rd_kafka_handle_OffsetCommit (rd_kafka_t *rk,
                              rd_kafka_broker_t *rkb,
                              rd_kafka_resp_err_t err,
                              rd_kafka_buf_t *rkbuf,
                              rd_kafka_buf_t *request,
                              rd_kafka_topic_partition_list_t *offsets) {
        /* rd_kafka_buf_read_*() jumps to the err_parse: label on
         * parse failure, logging decode errors at this level. */
        const int log_decode_errors = LOG_ERR;
        int32_t TopicArrayCnt;
        int16_t ErrorCode = 0, last_ErrorCode = 0;
	int errcnt = 0; /* Number of partitions that returned an error */
        int i;
	int actions;

        /* Request-level error: skip parsing. */
        if (err)
		goto err;

        rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt);
        for (i = 0 ; i < TopicArrayCnt ; i++) {
                rd_kafkap_str_t topic;
                char *topic_str;
                int32_t PartArrayCnt;
                int j;

                rd_kafka_buf_read_str(rkbuf, &topic);
                rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt);

                /* Stack-allocated nul-terminated copy of the topic name */
                RD_KAFKAP_STR_DUPA(&topic_str, &topic);

                for (j = 0 ; j < PartArrayCnt ; j++) {
                        int32_t partition;
                        rd_kafka_topic_partition_t *rktpar;

                        rd_kafka_buf_read_i32(rkbuf, &partition);
                        rd_kafka_buf_read_i16(rkbuf, &ErrorCode);

                        rktpar = rd_kafka_topic_partition_list_find(
                                offsets, topic_str, partition);

                        if (!rktpar) {
                                /* Received offset for topic/partition we didn't
                                 * ask for, this shouldn't really happen. */
                                continue;
                        }

                        /* Propagate per-partition commit result. */
                        rktpar->err = ErrorCode;
                        if (ErrorCode) {
                                last_ErrorCode = ErrorCode;
                                errcnt++;
                        }
                }
        }

        /* If all partitions failed use error code
         * from last partition as the global error. */
        if (offsets && errcnt == offsets->cnt)
                err = last_ErrorCode;
        goto done;

 err_parse:
        err = rkbuf->rkbuf_err;

 err:
        /* (action, error) override pairs for commit-specific handling,
         * terminated by ..ACTION_END; see rd_kafka_err_action(). */
        actions = rd_kafka_err_action(
		rkb, err, request,

		RD_KAFKA_ERR_ACTION_PERMANENT,
		RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE,

		RD_KAFKA_ERR_ACTION_RETRY,
		RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS,

		RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_SPECIAL,
		RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE,

		RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_SPECIAL,
		RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP,

		RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_RETRY,
		RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,

		RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_RETRY,
		RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID,

		RD_KAFKA_ERR_ACTION_RETRY,
		RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,

		RD_KAFKA_ERR_ACTION_PERMANENT,
		RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE,

		RD_KAFKA_ERR_ACTION_PERMANENT,
		RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED,

		RD_KAFKA_ERR_ACTION_PERMANENT,
		RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,

		RD_KAFKA_ERR_ACTION_END);

	if (actions & RD_KAFKA_ERR_ACTION_REFRESH && rk->rk_cgrp) {
		/* Mark coordinator dead or re-query for coordinator.
		 * ..dead() will trigger a re-query. */
		if (actions & RD_KAFKA_ERR_ACTION_SPECIAL)
			rd_kafka_cgrp_coord_dead(rk->rk_cgrp, err,
						 "OffsetCommitRequest failed");
		else
			rd_kafka_cgrp_coord_query(rk->rk_cgrp,
						  "OffsetCommitRequest failed");
	}
	if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
		if (rd_kafka_buf_retry(rkb, request))
			return RD_KAFKA_RESP_ERR__IN_PROGRESS;
		/* FALLTHRU */
	}

 done:
	return err;
}
838
839
840
841
842/**
843 * @brief Send OffsetCommitRequest for a list of partitions.
844 *
845 * @returns 0 if none of the partitions in \p offsets had valid offsets,
846 * else 1.
847 */
848int rd_kafka_OffsetCommitRequest (rd_kafka_broker_t *rkb,
849 rd_kafka_cgrp_t *rkcg,
850 int16_t api_version,
851 rd_kafka_topic_partition_list_t *offsets,
852 rd_kafka_replyq_t replyq,
853 rd_kafka_resp_cb_t *resp_cb,
854 void *opaque, const char *reason) {
855 rd_kafka_buf_t *rkbuf;
856 ssize_t of_TopicCnt = -1;
857 int TopicCnt = 0;
858 const char *last_topic = NULL;
859 ssize_t of_PartCnt = -1;
860 int PartCnt = 0;
861 int tot_PartCnt = 0;
862 int i;
863
864 rd_kafka_assert(NULL, offsets != NULL);
865
866 rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_OffsetCommit,
867 1, 100 + (offsets->cnt * 128));
868
869 /* ConsumerGroup */
870 rd_kafka_buf_write_kstr(rkbuf, rkcg->rkcg_group_id);
871
872 /* v1,v2 */
873 if (api_version >= 1) {
874 /* ConsumerGroupGenerationId */
875 rd_kafka_buf_write_i32(rkbuf, rkcg->rkcg_generation_id);
876 /* ConsumerId */
877 rd_kafka_buf_write_kstr(rkbuf, rkcg->rkcg_member_id);
878 /* v2: RetentionTime */
879 if (api_version == 2)
880 rd_kafka_buf_write_i64(rkbuf, -1);
881 }
882
883 /* Sort offsets by topic */
884 rd_kafka_topic_partition_list_sort_by_topic(offsets);
885
886 /* TopicArrayCnt: Will be updated when we know the number of topics. */
887 of_TopicCnt = rd_kafka_buf_write_i32(rkbuf, 0);
888
889 for (i = 0 ; i < offsets->cnt ; i++) {
890 rd_kafka_topic_partition_t *rktpar = &offsets->elems[i];
891
892 /* Skip partitions with invalid offset. */
893 if (rktpar->offset < 0)
894 continue;
895
896 if (last_topic == NULL || strcmp(last_topic, rktpar->topic)) {
897 /* New topic */
898
899 /* Finalize previous PartitionCnt */
900 if (PartCnt > 0)
901 rd_kafka_buf_update_u32(rkbuf, of_PartCnt,
902 PartCnt);
903
904 /* TopicName */
905 rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1);
906 /* PartitionCnt, finalized later */
907 of_PartCnt = rd_kafka_buf_write_i32(rkbuf, 0);
908 PartCnt = 0;
909 last_topic = rktpar->topic;
910 TopicCnt++;
911 }
912
913 /* Partition */
914 rd_kafka_buf_write_i32(rkbuf, rktpar->partition);
915 PartCnt++;
916 tot_PartCnt++;
917
918 /* Offset */
919 rd_kafka_buf_write_i64(rkbuf, rktpar->offset);
920
921 /* v1: TimeStamp */
922 if (api_version == 1)
923 rd_kafka_buf_write_i64(rkbuf, -1);// FIXME: retention time
924
925 /* Metadata */
926 /* Java client 0.9.0 and broker <0.10.0 can't parse
927 * Null metadata fields, so as a workaround we send an
928 * empty string if it's Null. */
929 if (!rktpar->metadata)
930 rd_kafka_buf_write_str(rkbuf, "", 0);
931 else
932 rd_kafka_buf_write_str(rkbuf,
933 rktpar->metadata,
934 rktpar->metadata_size);
935 }
936
937 if (tot_PartCnt == 0) {
938 /* No topic+partitions had valid offsets to commit. */
939 rd_kafka_replyq_destroy(&replyq);
940 rd_kafka_buf_destroy(rkbuf);
941 return 0;
942 }
943
944 /* Finalize previous PartitionCnt */
945 if (PartCnt > 0)
946 rd_kafka_buf_update_u32(rkbuf, of_PartCnt, PartCnt);
947
948 /* Finalize TopicCnt */
949 rd_kafka_buf_update_u32(rkbuf, of_TopicCnt, TopicCnt);
950
951 rd_kafka_buf_ApiVersion_set(rkbuf, api_version, 0);
952
953 rd_rkb_dbg(rkb, TOPIC, "OFFSET",
954 "Enqueue OffsetCommitRequest(v%d, %d/%d partition(s))): %s",
955 api_version, tot_PartCnt, offsets->cnt, reason);
956
957 rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
958
959 return 1;
960
961}
962
963
964
965/**
966 * @brief Write "consumer" protocol type MemberState for SyncGroupRequest to
967 * enveloping buffer \p rkbuf.
968 */
969static void rd_kafka_group_MemberState_consumer_write (
970 rd_kafka_buf_t *env_rkbuf,
971 const rd_kafka_group_member_t *rkgm) {
972 rd_kafka_buf_t *rkbuf;
973 int i;
974 const char *last_topic = NULL;
975 size_t of_TopicCnt;
976 ssize_t of_PartCnt = -1;
977 int TopicCnt = 0;
978 int PartCnt = 0;
979 rd_slice_t slice;
980
981 rkbuf = rd_kafka_buf_new(1, 100);
982 rd_kafka_buf_write_i16(rkbuf, 0); /* Version */
983 of_TopicCnt = rd_kafka_buf_write_i32(rkbuf, 0); /* Updated later */
984 for (i = 0 ; i < rkgm->rkgm_assignment->cnt ; i++) {
985 const rd_kafka_topic_partition_t *rktpar;
986
987 rktpar = &rkgm->rkgm_assignment->elems[i];
988
989 if (!last_topic || strcmp(last_topic,
990 rktpar->topic)) {
991 if (last_topic)
992 /* Finalize previous PartitionCnt */
993 rd_kafka_buf_update_i32(rkbuf, of_PartCnt,
994 PartCnt);
995 rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1);
996 /* Updated later */
997 of_PartCnt = rd_kafka_buf_write_i32(rkbuf, 0);
998 PartCnt = 0;
999 last_topic = rktpar->topic;
1000 TopicCnt++;
1001 }
1002
1003 rd_kafka_buf_write_i32(rkbuf, rktpar->partition);
1004 PartCnt++;
1005 }
1006
1007 if (of_PartCnt != -1)
1008 rd_kafka_buf_update_i32(rkbuf, of_PartCnt, PartCnt);
1009 rd_kafka_buf_update_i32(rkbuf, of_TopicCnt, TopicCnt);
1010
1011 rd_kafka_buf_write_kbytes(rkbuf, rkgm->rkgm_userdata);
1012
1013 /* Get pointer to binary buffer */
1014 rd_slice_init_full(&slice, &rkbuf->rkbuf_buf);
1015
1016 /* Write binary buffer as Kafka Bytes to enveloping buffer. */
1017 rd_kafka_buf_write_i32(env_rkbuf, (int32_t)rd_slice_remains(&slice));
1018 rd_buf_write_slice(&env_rkbuf->rkbuf_buf, &slice);
1019
1020 rd_kafka_buf_destroy(rkbuf);
1021}
1022
1023/**
1024 * Send SyncGroupRequest
1025 */
1026void rd_kafka_SyncGroupRequest (rd_kafka_broker_t *rkb,
1027 const rd_kafkap_str_t *group_id,
1028 int32_t generation_id,
1029 const rd_kafkap_str_t *member_id,
1030 const rd_kafka_group_member_t
1031 *assignments,
1032 int assignment_cnt,
1033 rd_kafka_replyq_t replyq,
1034 rd_kafka_resp_cb_t *resp_cb,
1035 void *opaque) {
1036 rd_kafka_buf_t *rkbuf;
1037 int i;
1038
1039 rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_SyncGroup,
1040 1,
1041 RD_KAFKAP_STR_SIZE(group_id) +
1042 4 /* GenerationId */ +
1043 RD_KAFKAP_STR_SIZE(member_id) +
1044 4 /* array size group_assignment */ +
1045 (assignment_cnt * 100/*guess*/));
1046 rd_kafka_buf_write_kstr(rkbuf, group_id);
1047 rd_kafka_buf_write_i32(rkbuf, generation_id);
1048 rd_kafka_buf_write_kstr(rkbuf, member_id);
1049 rd_kafka_buf_write_i32(rkbuf, assignment_cnt);
1050
1051 for (i = 0 ; i < assignment_cnt ; i++) {
1052 const rd_kafka_group_member_t *rkgm = &assignments[i];
1053
1054 rd_kafka_buf_write_kstr(rkbuf, rkgm->rkgm_member_id);
1055 rd_kafka_group_MemberState_consumer_write(rkbuf, rkgm);
1056 }
1057
1058 /* This is a blocking request */
1059 rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_BLOCKING;
1060 rd_kafka_buf_set_abs_timeout(
1061 rkbuf,
1062 rkb->rkb_rk->rk_conf.group_session_timeout_ms +
1063 3000/* 3s grace period*/,
1064 0);
1065
1066 rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
1067}
1068
1069/**
1070 * Handler for SyncGroup responses
1071 * opaque must be the cgrp handle.
1072 */
void rd_kafka_handle_SyncGroup (rd_kafka_t *rk,
				rd_kafka_broker_t *rkb,
                                rd_kafka_resp_err_t err,
                                rd_kafka_buf_t *rkbuf,
                                rd_kafka_buf_t *request,
                                void *opaque) {
        rd_kafka_cgrp_t *rkcg = opaque;
        /* rd_kafka_buf_read_*() jumps to the err_parse: label on
         * parse failure, logging decode errors at this level. */
        const int log_decode_errors = LOG_ERR;
        int16_t ErrorCode = 0;
        rd_kafkap_bytes_t MemberState = RD_ZERO_INIT;
        int actions;

        /* Ignore responses that arrive after the group has moved on
         * to another join state (e.g. a rebalance was triggered while
         * this request was in flight). */
        if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC) {
                rd_kafka_dbg(rkb->rkb_rk, CGRP, "SYNCGROUP",
                             "SyncGroup response: discarding outdated request "
                             "(now in join-state %s)",
                             rd_kafka_cgrp_join_state_names[rkcg->
                                                            rkcg_join_state]);
                return;
        }

        /* Request-level error: skip parsing. */
        if (err) {
                ErrorCode = err;
                goto err;
        }

        rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
        rd_kafka_buf_read_bytes(rkbuf, &MemberState);

err:
        actions = rd_kafka_err_action(rkb, ErrorCode, request,
                                      RD_KAFKA_ERR_ACTION_END);

        if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
                /* Re-query for coordinator */
                rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ,
                                 RD_KAFKA_OP_COORD_QUERY,
                                 ErrorCode);
                /* FALLTHRU */
        }

        if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
                if (rd_kafka_buf_retry(rkb, request))
                        return;
                /* FALLTHRU */
        }

        rd_kafka_dbg(rkb->rkb_rk, CGRP, "SYNCGROUP",
                     "SyncGroup response: %s (%d bytes of MemberState data)",
                     rd_kafka_err2str(ErrorCode),
                     RD_KAFKAP_BYTES_LEN(&MemberState));

        if (ErrorCode == RD_KAFKA_RESP_ERR__DESTROY)
                return; /* Termination */

        /* Hand the (possibly errored) result and the assigned
         * MemberState over to the cgrp for further processing. */
        rd_kafka_cgrp_handle_SyncGroup(rkcg, rkb, ErrorCode, &MemberState);

        return;

 err_parse:
        ErrorCode = rkbuf->rkbuf_err;
        goto err;
}
1136
1137
/**
 * Send JoinGroupRequest
 *
 * Serializes GroupId, session/rebalance timeouts, MemberId, ProtocolType
 * and one GroupProtocol entry per enabled assignor (name + per-assignor
 * member metadata built from \p topics), then enqueues the request.
 *
 * ApiVersion 1+ adds the separate RebalanceTimeoutMs field (KIP-62),
 * which is populated from `max.poll.interval.ms`.
 */
void rd_kafka_JoinGroupRequest (rd_kafka_broker_t *rkb,
                                const rd_kafkap_str_t *group_id,
                                const rd_kafkap_str_t *member_id,
                                const rd_kafkap_str_t *protocol_type,
                                const rd_list_t *topics,
                                rd_kafka_replyq_t replyq,
                                rd_kafka_resp_cb_t *resp_cb,
                                void *opaque) {
        rd_kafka_buf_t *rkbuf;
        rd_kafka_t *rk = rkb->rkb_rk;
        rd_kafka_assignor_t *rkas;
        int i;
        int16_t ApiVersion = 0;
        int features;

        ApiVersion = rd_kafka_broker_ApiVersion_supported(rkb,
                                                          RD_KAFKAP_JoinGroup,
                                                          0, 2,
                                                          &features);


        /* Size estimate: fixed-size fields plus a rough per-topic
         * allowance for the assignor metadata. */
        rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_JoinGroup,
                                         1,
                                         RD_KAFKAP_STR_SIZE(group_id) +
                                         4 /* sessionTimeoutMs */ +
                                         4 /* rebalanceTimeoutMs */ +
                                         RD_KAFKAP_STR_SIZE(member_id) +
                                         RD_KAFKAP_STR_SIZE(protocol_type) +
                                         4 /* array count GroupProtocols */ +
                                         (rd_list_cnt(topics) * 100));
        rd_kafka_buf_write_kstr(rkbuf, group_id);
        rd_kafka_buf_write_i32(rkbuf, rk->rk_conf.group_session_timeout_ms);
        if (ApiVersion >= 1)
                /* v1+: RebalanceTimeoutMs (KIP-62) */
                rd_kafka_buf_write_i32(rkbuf, rk->rk_conf.max_poll_interval_ms);
        rd_kafka_buf_write_kstr(rkbuf, member_id);
        rd_kafka_buf_write_kstr(rkbuf, protocol_type);
        /* GroupProtocols array count: matches the loop below since
         * disabled assignors are skipped. */
        rd_kafka_buf_write_i32(rkbuf, rk->rk_conf.enabled_assignor_cnt);

        RD_LIST_FOREACH(rkas, &rk->rk_conf.partition_assignors, i) {
                rd_kafkap_bytes_t *member_metadata;
                if (!rkas->rkas_enabled)
                        continue;
                rd_kafka_buf_write_kstr(rkbuf, rkas->rkas_protocol_name);
                /* Assignor-specific metadata blob (subscription, etc). */
                member_metadata = rkas->rkas_get_metadata_cb(rkas, topics);
                rd_kafka_buf_write_kbytes(rkbuf, member_metadata);
                rd_kafkap_bytes_destroy(member_metadata);
        }

        rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);

        /* Warn (rate-limited) when the broker cannot honour
         * max.poll.interval.ms separately from session.timeout.ms. */
        if (ApiVersion < 1 &&
            rk->rk_conf.max_poll_interval_ms >
            rk->rk_conf.group_session_timeout_ms &&
            rd_interval(&rkb->rkb_suppress.unsupported_kip62,
                        /* at most once per day */
                        (rd_ts_t)86400 * 1000 * 1000, 0) > 0)
                rd_rkb_log(rkb, LOG_NOTICE, "MAXPOLL",
                           "Broker does not support KIP-62 "
                           "(requires Apache Kafka >= v0.10.1.0): "
                           "consumer configuration "
                           "`max.poll.interval.ms` (%d) "
                           "is effectively limited "
                           "by `session.timeout.ms` (%d) "
                           "with this broker version",
                           rk->rk_conf.max_poll_interval_ms,
                           rk->rk_conf.group_session_timeout_ms);

        /* Absolute timeout */
        rd_kafka_buf_set_abs_timeout_force(
                rkbuf,
                /* Request timeout is max.poll.interval.ms + grace
                 * if the broker supports it, else
                 * session.timeout.ms + grace. */
                (ApiVersion >= 1 ?
                 rk->rk_conf.max_poll_interval_ms :
                 rk->rk_conf.group_session_timeout_ms) +
                3000/* 3s grace period*/,
                0);

        /* This is a blocking request */
        rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_BLOCKING;

        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
}
1225
1226
1227
1228
1229
1230
1231/**
1232 * Send LeaveGroupRequest
1233 */
1234void rd_kafka_LeaveGroupRequest (rd_kafka_broker_t *rkb,
1235 const rd_kafkap_str_t *group_id,
1236 const rd_kafkap_str_t *member_id,
1237 rd_kafka_replyq_t replyq,
1238 rd_kafka_resp_cb_t *resp_cb,
1239 void *opaque) {
1240 rd_kafka_buf_t *rkbuf;
1241
1242 rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_LeaveGroup,
1243 1,
1244 RD_KAFKAP_STR_SIZE(group_id) +
1245 RD_KAFKAP_STR_SIZE(member_id));
1246 rd_kafka_buf_write_kstr(rkbuf, group_id);
1247 rd_kafka_buf_write_kstr(rkbuf, member_id);
1248
1249 /* LeaveGroupRequests are best-effort, the local consumer
1250 * does not care if it succeeds or not, so the request timeout
1251 * is shortened.
1252 * Retries are not needed. */
1253 rd_kafka_buf_set_abs_timeout(rkbuf, 5000, 0);
1254 rkbuf->rkbuf_retries = RD_KAFKA_BUF_NO_RETRIES;
1255
1256 rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
1257}
1258
1259
/**
 * Handler for LeaveGroup responses
 * opaque must be the cgrp handle.
 *
 * Reads the ErrorCode, maps it to generic actions (coordinator re-query,
 * retry) and logs any failure. LeaveGroup failures are otherwise ignored
 * since leaving is best-effort.
 */
void rd_kafka_handle_LeaveGroup (rd_kafka_t *rk,
                                 rd_kafka_broker_t *rkb,
                                 rd_kafka_resp_err_t err,
                                 rd_kafka_buf_t *rkbuf,
                                 rd_kafka_buf_t *request,
                                 void *opaque) {
        rd_kafka_cgrp_t *rkcg = opaque;
        /* Referenced by the rd_kafka_buf_read_*() macros on parse failure. */
        const int log_decode_errors = LOG_ERR;
        int16_t ErrorCode = 0;
        int actions;

        if (err) {
                ErrorCode = err;
                goto err;
        }

        /* Jumps to err_parse: on decode failure. */
        rd_kafka_buf_read_i16(rkbuf, &ErrorCode);

err:
        actions = rd_kafka_err_action(rkb, ErrorCode, request,
                                      RD_KAFKA_ERR_ACTION_END);

        if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
                /* Re-query for coordinator */
                rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ,
                                 RD_KAFKA_OP_COORD_QUERY, ErrorCode);
        }

        if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
                if (rd_kafka_buf_retry(rkb, request))
                        return;
                /* FALLTHRU */
        }

        /* Best-effort request: just log errors, nothing to act on. */
        if (ErrorCode)
                rd_kafka_dbg(rkb->rkb_rk, CGRP, "LEAVEGROUP",
                             "LeaveGroup response: %s",
                             rd_kafka_err2str(ErrorCode));

        return;

 err_parse:
        ErrorCode = rkbuf->rkbuf_err;
        goto err;
}
1309
1310
1311
1312
1313
1314
1315/**
1316 * Send HeartbeatRequest
1317 */
1318void rd_kafka_HeartbeatRequest (rd_kafka_broker_t *rkb,
1319 const rd_kafkap_str_t *group_id,
1320 int32_t generation_id,
1321 const rd_kafkap_str_t *member_id,
1322 rd_kafka_replyq_t replyq,
1323 rd_kafka_resp_cb_t *resp_cb,
1324 void *opaque) {
1325 rd_kafka_buf_t *rkbuf;
1326
1327 rd_rkb_dbg(rkb, CGRP, "HEARTBEAT",
1328 "Heartbeat for group \"%s\" generation id %"PRId32,
1329 group_id->str, generation_id);
1330
1331 rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_Heartbeat,
1332 1,
1333 RD_KAFKAP_STR_SIZE(group_id) +
1334 4 /* GenerationId */ +
1335 RD_KAFKAP_STR_SIZE(member_id));
1336
1337 rd_kafka_buf_write_kstr(rkbuf, group_id);
1338 rd_kafka_buf_write_i32(rkbuf, generation_id);
1339 rd_kafka_buf_write_kstr(rkbuf, member_id);
1340
1341 rd_kafka_buf_set_abs_timeout(
1342 rkbuf,
1343 rkb->rkb_rk->rk_conf.group_session_timeout_ms,
1344 0);
1345
1346 rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
1347}
1348
1349
1350
1351
1352/**
1353 * Send ListGroupsRequest
1354 */
1355void rd_kafka_ListGroupsRequest (rd_kafka_broker_t *rkb,
1356 rd_kafka_replyq_t replyq,
1357 rd_kafka_resp_cb_t *resp_cb,
1358 void *opaque) {
1359 rd_kafka_buf_t *rkbuf;
1360
1361 rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_ListGroups, 0, 0);
1362
1363 rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
1364}
1365
1366
1367/**
1368 * Send DescribeGroupsRequest
1369 */
1370void rd_kafka_DescribeGroupsRequest (rd_kafka_broker_t *rkb,
1371 const char **groups, int group_cnt,
1372 rd_kafka_replyq_t replyq,
1373 rd_kafka_resp_cb_t *resp_cb,
1374 void *opaque) {
1375 rd_kafka_buf_t *rkbuf;
1376
1377 rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_DescribeGroups,
1378 1, 32*group_cnt);
1379
1380 rd_kafka_buf_write_i32(rkbuf, group_cnt);
1381 while (group_cnt-- > 0)
1382 rd_kafka_buf_write_str(rkbuf, groups[group_cnt], -1);
1383
1384 rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
1385}
1386
1387
1388
1389
/**
 * @brief Generic handler for Metadata responses
 *
 * Parses the response into a rd_kafka_metadata struct; on success the
 * result is either forwarded to the requesting rko's replyq (ownership
 * of \p md transferred) or freed. On failure a retry may be scheduled,
 * otherwise the failure is logged.
 *
 * \p opaque is an optional rko whose replyq (if any) receives the result.
 *
 * @locality rdkafka main thread
 */
static void rd_kafka_handle_Metadata (rd_kafka_t *rk,
                                      rd_kafka_broker_t *rkb,
                                      rd_kafka_resp_err_t err,
                                      rd_kafka_buf_t *rkbuf,
                                      rd_kafka_buf_t *request,
                                      void *opaque) {
        rd_kafka_op_t *rko = opaque; /* Possibly NULL */
        struct rd_kafka_metadata *md = NULL;
        const rd_list_t *topics = request->rkbuf_u.Metadata.topics;
        int actions;

        rd_kafka_assert(NULL, err == RD_KAFKA_RESP_ERR__DESTROY ||
                        thrd_is_current(rk->rk_thread));

        /* Avoid metadata updates when we're terminating. */
        if (rd_kafka_terminating(rkb->rkb_rk) ||
            err == RD_KAFKA_RESP_ERR__DESTROY) {
                /* Terminating */
                goto done;
        }

        if (err)
                goto err;

        if (!topics)
                rd_rkb_dbg(rkb, METADATA, "METADATA",
                           "===== Received metadata: %s =====",
                           request->rkbuf_u.Metadata.reason);
        else
                rd_rkb_dbg(rkb, METADATA, "METADATA",
                           "===== Received metadata "
                           "(for %d requested topics): %s =====",
                           rd_list_cnt(topics),
                           request->rkbuf_u.Metadata.reason);

        /* Parse and update the metadata cache; \p md is allocated
         * on success and owned by us until handed off below. */
        err = rd_kafka_parse_Metadata(rkb, request, rkbuf, &md);
        if (err)
                goto err;

        if (rko && rko->rko_replyq.q) {
                /* Reply to metadata requester, passing on the metadata.
                 * Reuse requesting rko for the reply. */
                rko->rko_err = err;
                rko->rko_u.metadata.md = md;

                rd_kafka_replyq_enq(&rko->rko_replyq, rko, 0);
                rko = NULL; /* Ownership transferred to replyq */
        } else {
                /* No requester: the cache update above was the point. */
                if (md)
                        rd_free(md);
        }

        goto done;

 err:
        actions = rd_kafka_err_action(
                rkb, err, request,

                RD_KAFKA_ERR_ACTION_RETRY,
                RD_KAFKA_RESP_ERR__PARTIAL,

                RD_KAFKA_ERR_ACTION_END);

        if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
                if (rd_kafka_buf_retry(rkb, request))
                        return;
                /* FALLTHRU */
        } else {
                /* NOTE(review): rkbuf_ts_sent looks like a timestamp
                 * rather than a duration, so the "%dms" value below may
                 * be misleading -- verify against buf bookkeeping. */
                rd_rkb_log(rkb, LOG_WARNING, "METADATA",
                           "Metadata request failed: %s: %s (%dms): %s",
                           request->rkbuf_u.Metadata.reason,
                           rd_kafka_err2str(err),
                           (int)(request->rkbuf_ts_sent/1000),
                           rd_kafka_actions2str(actions));
        }



        /* FALLTHRU */

 done:
        if (rko)
                rd_kafka_op_destroy(rko);
}
1479
1480
/**
 * @brief Construct MetadataRequest (does not send)
 *
 * \p topics is a list of topic names (char *) to request.
 *
 * !topics          - only request brokers (if supported by broker, else
 *                    all topics)
 *  topics.cnt==0   - all topics in cluster are requested
 *  topics.cnt >0   - only specified topics are requested
 *
 * @param reason    - metadata request reason
 * @param rko       - (optional) rko with replyq for handling response.
 *                    Specifying an rko forces a metadata request even if
 *                    there is already a matching one in-transit.
 *
 * If full metadata for all topics is requested (or all brokers, which
 * results in all-topics on older brokers) and there is already a full request
 * in transit then this function will return RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS
 * otherwise RD_KAFKA_RESP_ERR_NO_ERROR. If \p rko is non-NULL the request
 * is sent regardless.
 */
rd_kafka_resp_err_t
rd_kafka_MetadataRequest (rd_kafka_broker_t *rkb,
                          const rd_list_t *topics, const char *reason,
                          rd_kafka_op_t *rko) {
        rd_kafka_buf_t *rkbuf;
        int16_t ApiVersion = 0;
        int features;
        int topic_cnt = topics ? rd_list_cnt(topics) : 0;
        /* Points at the cache's brokers- or topics-"full request in
         * transit" counter when this is a full request, else NULL. */
        int *full_incr = NULL;

        ApiVersion = rd_kafka_broker_ApiVersion_supported(rkb,
                                                          RD_KAFKAP_Metadata,
                                                          0, 2,
                                                          &features);

        rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_Metadata, 1,
                                         4 + (50 * topic_cnt));

        if (!reason)
                reason = "";

        /* Owned by the buffer; used for logging in the response handler. */
        rkbuf->rkbuf_u.Metadata.reason = rd_strdup(reason);

        if (!topics && ApiVersion >= 1) {
                /* a null(0) array (in the protocol) represents no topics */
                rd_kafka_buf_write_i32(rkbuf, 0);
                rd_rkb_dbg(rkb, METADATA, "METADATA",
                           "Request metadata for brokers only: %s", reason);
                full_incr = &rkb->rkb_rk->rk_metadata_cache.
                        rkmc_full_brokers_sent;

        } else {
                if (topic_cnt == 0 && !rko)
                        full_incr = &rkb->rkb_rk->rk_metadata_cache.
                                rkmc_full_topics_sent;

                if (topic_cnt == 0 && ApiVersion >= 1)
                        rd_kafka_buf_write_i32(rkbuf, -1); /* Null: all topics*/
                else
                        rd_kafka_buf_write_i32(rkbuf, topic_cnt);

                if (topic_cnt == 0) {
                        rkbuf->rkbuf_u.Metadata.all_topics = 1;
                        rd_rkb_dbg(rkb, METADATA, "METADATA",
                                   "Request metadata for all topics: "
                                   "%s", reason);
                } else
                        rd_rkb_dbg(rkb, METADATA, "METADATA",
                                   "Request metadata for %d topic(s): "
                                   "%s", topic_cnt, reason);
        }

        if (full_incr) {
                /* Avoid multiple outstanding full requests
                 * (since they are redundant and side-effect-less).
                 * Forced requests (app using metadata() API) are passed
                 * through regardless. */

                mtx_lock(&rkb->rkb_rk->rk_metadata_cache.
                         rkmc_full_lock);
                if (*full_incr > 0 && (!rko || !rko->rko_u.metadata.force)) {
                        mtx_unlock(&rkb->rkb_rk->rk_metadata_cache.
                                   rkmc_full_lock);
                        rd_rkb_dbg(rkb, METADATA, "METADATA",
                                   "Skipping metadata request: %s: "
                                   "full request already in-transit",
                                   reason);
                        rd_kafka_buf_destroy(rkbuf);
                        return RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS;
                }

                (*full_incr)++;
                mtx_unlock(&rkb->rkb_rk->rk_metadata_cache.
                           rkmc_full_lock);
                /* Tell the buffer teardown code which counter to
                 * decrement (under which lock) when the request
                 * completes or fails. */
                rkbuf->rkbuf_u.Metadata.decr = full_incr;
                rkbuf->rkbuf_u.Metadata.decr_lock = &rkb->rkb_rk->
                        rk_metadata_cache.rkmc_full_lock;
        }


        if (topic_cnt > 0) {
                char *topic;
                int i;

                /* Maintain a copy of the topics list so we can purge
                 * hints from the metadata cache on error. */
                rkbuf->rkbuf_u.Metadata.topics =
                        rd_list_copy(topics, rd_list_string_copy, NULL);

                RD_LIST_FOREACH(topic, topics, i)
                        rd_kafka_buf_write_str(rkbuf, topic, -1);

        }

        rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);

        /* Metadata requests are part of the important control plane
         * and should go before most other requests (Produce, Fetch, etc). */
        rkbuf->rkbuf_prio = RD_KAFKA_PRIO_HIGH;

        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf,
                                       /* Handle response thru rk_ops,
                                        * but forward parsed result to
                                        * rko's replyq when done. */
                                       RD_KAFKA_REPLYQ(rkb->rkb_rk->
                                                       rk_ops, 0),
                                       rd_kafka_handle_Metadata, rko);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621/**
1622 * @brief Parses and handles ApiVersion reply.
1623 *
1624 * @param apis will be allocated, populated and sorted
1625 * with broker's supported APIs.
1626 * @param api_cnt will be set to the number of elements in \p *apis
1627
1628 * @returns 0 on success, else an error.
1629 */
1630rd_kafka_resp_err_t
1631rd_kafka_handle_ApiVersion (rd_kafka_t *rk,
1632 rd_kafka_broker_t *rkb,
1633 rd_kafka_resp_err_t err,
1634 rd_kafka_buf_t *rkbuf,
1635 rd_kafka_buf_t *request,
1636 struct rd_kafka_ApiVersion **apis,
1637 size_t *api_cnt) {
1638 const int log_decode_errors = LOG_ERR;
1639 int32_t ApiArrayCnt;
1640 int16_t ErrorCode;
1641 int i = 0;
1642
1643 *apis = NULL;
1644
1645 if (err)
1646 goto err;
1647
1648 rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
1649 if ((err = ErrorCode))
1650 goto err;
1651
1652 rd_kafka_buf_read_i32(rkbuf, &ApiArrayCnt);
1653 if (ApiArrayCnt > 1000)
1654 rd_kafka_buf_parse_fail(rkbuf,
1655 "ApiArrayCnt %"PRId32" out of range",
1656 ApiArrayCnt);
1657
1658 rd_rkb_dbg(rkb, FEATURE, "APIVERSION",
1659 "Broker API support:");
1660
1661 *apis = malloc(sizeof(**apis) * ApiArrayCnt);
1662
1663 for (i = 0 ; i < ApiArrayCnt ; i++) {
1664 struct rd_kafka_ApiVersion *api = &(*apis)[i];
1665
1666 rd_kafka_buf_read_i16(rkbuf, &api->ApiKey);
1667 rd_kafka_buf_read_i16(rkbuf, &api->MinVer);
1668 rd_kafka_buf_read_i16(rkbuf, &api->MaxVer);
1669
1670 rd_rkb_dbg(rkb, FEATURE, "APIVERSION",
1671 " ApiKey %s (%hd) Versions %hd..%hd",
1672 rd_kafka_ApiKey2str(api->ApiKey),
1673 api->ApiKey, api->MinVer, api->MaxVer);
1674 }
1675
1676 *api_cnt = ApiArrayCnt;
1677 qsort(*apis, *api_cnt, sizeof(**apis), rd_kafka_ApiVersion_key_cmp);
1678
1679 goto done;
1680
1681 err_parse:
1682 err = rkbuf->rkbuf_err;
1683 err:
1684 if (*apis)
1685 rd_free(*apis);
1686
1687 /* There are no retryable errors. */
1688
1689 done:
1690 return err;
1691}
1692
1693
1694
1695/**
1696 * Send ApiVersionRequest (KIP-35)
1697 */
1698void rd_kafka_ApiVersionRequest (rd_kafka_broker_t *rkb,
1699 rd_kafka_replyq_t replyq,
1700 rd_kafka_resp_cb_t *resp_cb,
1701 void *opaque) {
1702 rd_kafka_buf_t *rkbuf;
1703
1704 rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_ApiVersion, 1, 4);
1705
1706 /* Should be sent before any other requests since it is part of
1707 * the initial connection handshake. */
1708 rkbuf->rkbuf_prio = RD_KAFKA_PRIO_FLASH;
1709
1710 rd_kafka_buf_write_i32(rkbuf, 0); /* Empty array: request all APIs */
1711
1712 /* Non-supporting brokers will tear down the connection when they
1713 * receive an unknown API request, so dont retry request on failure. */
1714 rkbuf->rkbuf_retries = RD_KAFKA_BUF_NO_RETRIES;
1715
1716 /* 0.9.0.x brokers will not close the connection on unsupported
1717 * API requests, so we minimize the timeout for the request.
1718 * This is a regression on the broker part. */
1719 rd_kafka_buf_set_abs_timeout(
1720 rkbuf,
1721 rkb->rkb_rk->rk_conf.api_version_request_timeout_ms,
1722 0);
1723
1724 if (replyq.q)
1725 rd_kafka_broker_buf_enq_replyq(rkb,
1726 rkbuf, replyq, resp_cb, opaque);
1727 else /* in broker thread */
1728 rd_kafka_broker_buf_enq1(rkb, rkbuf, resp_cb, opaque);
1729}
1730
1731
1732/**
1733 * Send SaslHandshakeRequest (KIP-43)
1734 */
1735void rd_kafka_SaslHandshakeRequest (rd_kafka_broker_t *rkb,
1736 const char *mechanism,
1737 rd_kafka_replyq_t replyq,
1738 rd_kafka_resp_cb_t *resp_cb,
1739 void *opaque) {
1740 rd_kafka_buf_t *rkbuf;
1741 int mechlen = (int)strlen(mechanism);
1742
1743 rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_SaslHandshake,
1744 1, RD_KAFKAP_STR_SIZE0(mechlen));
1745
1746 /* Should be sent before any other requests since it is part of
1747 * the initial connection handshake. */
1748 rkbuf->rkbuf_prio = RD_KAFKA_PRIO_FLASH;
1749
1750 rd_kafka_buf_write_str(rkbuf, mechanism, mechlen);
1751
1752 /* Non-supporting brokers will tear down the conneciton when they
1753 * receive an unknown API request or where the SASL GSSAPI
1754 * token type is not recognized, so dont retry request on failure. */
1755 rkbuf->rkbuf_retries = RD_KAFKA_BUF_NO_RETRIES;
1756
1757 /* 0.9.0.x brokers will not close the connection on unsupported
1758 * API requests, so we minimize the timeout of the request.
1759 * This is a regression on the broker part. */
1760 if (!rkb->rkb_rk->rk_conf.api_version_request &&
1761 rkb->rkb_rk->rk_conf.socket_timeout_ms > 10*1000)
1762 rd_kafka_buf_set_abs_timeout(rkbuf, 10*1000 /*10s*/, 0);
1763
1764 if (replyq.q)
1765 rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq,
1766 resp_cb, opaque);
1767 else /* in broker thread */
1768 rd_kafka_broker_buf_enq1(rkb, rkbuf, resp_cb, opaque);
1769}
1770
1771
1772
/**
 * @struct Hold temporary result and return values from ProduceResponse
 */
struct rd_kafka_Produce_result {
        int64_t offset;    /**< Assigned offset of first message */
        int64_t timestamp; /**< (Possibly assigned) timestamp of
                            *   first message */
};
1780
/**
 * @brief Parses a Produce reply.
 *
 * Expects exactly one topic with one partition in the response (a Produce
 * request is always for a single topic+partition) and extracts the
 * assigned offset and (ApiVersion >= 2) broker timestamp into \p result.
 *
 * @returns 0 on success or an error code on failure.
 * @locality broker thread
 */
static rd_kafka_resp_err_t
rd_kafka_handle_Produce_parse (rd_kafka_broker_t *rkb,
                               rd_kafka_toppar_t *rktp,
                               rd_kafka_buf_t *rkbuf,
                               rd_kafka_buf_t *request,
                               struct rd_kafka_Produce_result *result) {
        int32_t TopicArrayCnt;
        int32_t PartitionArrayCnt;
        struct {
                int32_t Partition;
                int16_t ErrorCode;
                int64_t Offset;
        } hdr;
        /* Referenced by the rd_kafka_buf_read_*() macros on parse failure. */
        const int log_decode_errors = LOG_ERR;
        int64_t log_start_offset = -1;

        rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt);
        if (TopicArrayCnt != 1)
                goto err;

        /* Since we only produce to one single topic+partition in each
         * request we assume that the reply only contains one topic+partition
         * and that it is the same that we requested.
         * If not the broker is buggy. */
        rd_kafka_buf_skip_str(rkbuf);
        rd_kafka_buf_read_i32(rkbuf, &PartitionArrayCnt);

        if (PartitionArrayCnt != 1)
                goto err;

        rd_kafka_buf_read_i32(rkbuf, &hdr.Partition);
        rd_kafka_buf_read_i16(rkbuf, &hdr.ErrorCode);
        rd_kafka_buf_read_i64(rkbuf, &hdr.Offset);

        result->offset = hdr.Offset;

        result->timestamp = -1;
        if (request->rkbuf_reqhdr.ApiVersion >= 2)
                rd_kafka_buf_read_i64(rkbuf, &result->timestamp);

        /* v5+: LogStartOffset. Parsed to advance the read cursor;
         * currently not used. */
        if (request->rkbuf_reqhdr.ApiVersion >= 5)
                rd_kafka_buf_read_i64(rkbuf, &log_start_offset);

        if (request->rkbuf_reqhdr.ApiVersion >= 1) {
                int32_t Throttle_Time;
                rd_kafka_buf_read_i32(rkbuf, &Throttle_Time);

                /* Surface broker throttling to the application. */
                rd_kafka_op_throttle_time(rkb, rkb->rkb_rk->rk_rep,
                                          Throttle_Time);
        }


        return hdr.ErrorCode;

 err_parse:
        return rkbuf->rkbuf_err;
 err:
        /* Unexpected topic/partition count: broker response is malformed. */
        return RD_KAFKA_RESP_ERR__BAD_MSG;
}
1845
1846
/**
 * @struct Hold temporary Produce error state, shared between the generic
 *         and the idempotent-producer Produce error handlers.
 */
struct rd_kafka_Produce_err {
        rd_kafka_resp_err_t err;      /**< Error code */
        int  actions;                 /**< Actions to take */
        int  incr_retry;              /**< Increase per-message retry cnt */
        rd_kafka_msg_status_t status; /**< Messages persistence status */

        /* Idempotent Producer */
        int32_t next_ack_seq;         /**< Next expected sequence to ack */
        int32_t next_err_seq;         /**< Next expected error sequence */
        rd_bool_t update_next_ack;    /**< Update next_ack_seq */
        rd_bool_t update_next_err;    /**< Update next_err_seq */
        rd_kafka_pid_t rktp_pid;      /**< Partition's current PID */
        int32_t last_seq;             /**< Last sequence in current batch */
};
1864
1865
/**
 * @brief Error-handling for Idempotent Producer-specific Produce errors.
 *
 * May update \p errp, \p actionsp and \p incr_retryp.
 *
 * The resulting \p actionsp are handled by the caller.
 *
 * Handles sequence-number desynchronization
 * (OUT_OF_ORDER_SEQUENCE_NUMBER), duplicate batches
 * (DUPLICATE_SEQUENCE_NUMBER) and lost producer state
 * (UNKNOWN_PRODUCER_ID); everything else falls through to the standard
 * Produce error handler.
 *
 * @warning May be called on the old leader thread. Lock rktp appropriately!
 *
 * @locality broker thread (but not necessarily the leader broker)
 * @locks none
 */
static void
rd_kafka_handle_idempotent_Produce_error (rd_kafka_broker_t *rkb,
                                          rd_kafka_msgbatch_t *batch,
                                          struct rd_kafka_Produce_err *perr) {
        rd_kafka_t *rk = rkb->rkb_rk;
        rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(batch->s_rktp);
        rd_kafka_msg_t *firstmsg, *lastmsg;
        int r;
        rd_ts_t now = rd_clock(), state_age;
        struct rd_kafka_toppar_err last_err;

        /* Age of the current idempotence state, for diagnostics below. */
        rd_kafka_rdlock(rkb->rkb_rk);
        state_age = now - rkb->rkb_rk->rk_eos.ts_idemp_state;
        rd_kafka_rdunlock(rkb->rkb_rk);

        firstmsg = rd_kafka_msgq_first(&batch->msgq);
        lastmsg = rd_kafka_msgq_last(&batch->msgq);
        rd_assert(firstmsg && lastmsg);

        /* Store the last msgid of the batch
         * on the first message in case we need to retry
         * and thus reconstruct the entire batch. */
        if (firstmsg->rkm_u.producer.last_msgid) {
                /* last_msgid already set, make sure it
                 * actually points to the last message. */
                rd_assert(firstmsg->rkm_u.producer.last_msgid ==
                          lastmsg->rkm_u.producer.msgid);
        } else {
                firstmsg->rkm_u.producer.last_msgid =
                        lastmsg->rkm_u.producer.msgid;
        }

        if (!rd_kafka_pid_eq(batch->pid, perr->rktp_pid)) {
                /* Don't retry if PID changed since we can't
                 * guarantee correctness across PID sessions. */
                perr->actions = RD_KAFKA_ERR_ACTION_PERMANENT;
                perr->status  = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED;

                rd_rkb_dbg(rkb, MSG|RD_KAFKA_DBG_EOS, "ERRPID",
                           "%.*s [%"PRId32"] PID mismatch: "
                           "request %s != partition %s: "
                           "failing messages with error %s",
                           RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                           rktp->rktp_partition,
                           rd_kafka_pid2str(batch->pid),
                           rd_kafka_pid2str(perr->rktp_pid),
                           rd_kafka_err2str(perr->err));
                return;
        }

        /*
         * Special error handling
         */
        switch (perr->err)
        {
        case RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER:
                /* Compare request's sequence to expected next
                 * acked sequence.
                 *
                 * Example requests in flight:
                 *   R1(base_seq:5) R2(10) R3(15) R4(20)
                 */

                /* Acquire the last partition error to help
                 * troubleshoot this problem. */
                rd_kafka_toppar_lock(rktp);
                last_err = rktp->rktp_last_err;
                rd_kafka_toppar_unlock(rktp);

                /* r == 0: head-of-line request failed,
                 * r  > 0: a later in-flight request failed,
                 * r  < 0: sequence rewound (should not happen). */
                r = batch->first_seq - perr->next_ack_seq;

                if (r == 0) {
                        /* R1 failed:
                         * If this was the head-of-line request in-flight it
                         * means there is a state desynchronization between the
                         * producer and broker (a bug), in which case
                         * we'll raise a fatal error since we can no longer
                         * reason about the state of messages and thus
                         * not guarantee ordering or once-ness for R1,
                         * nor give the user a chance to opt out of sending
                         * R2 to R4 which would be retried automatically. */

                        rd_kafka_set_fatal_error(
                                rk, perr->err,
                                "ProduceRequest for %.*s [%"PRId32"] "
                                "with %d message(s) failed "
                                "due to sequence desynchronization with "
                                "broker %"PRId32" (%s, base seq %"PRId32", "
                                "idemp state change %"PRId64"ms ago, "
                                "last partition error %s (actions %s, "
                                "base seq %"PRId32"..%"PRId32
                                ", base msgid %"PRIu64", %"PRId64"ms ago)",
                                RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                                rktp->rktp_partition,
                                rd_kafka_msgq_len(&batch->msgq),
                                rkb->rkb_nodeid,
                                rd_kafka_pid2str(batch->pid),
                                batch->first_seq,
                                state_age / 1000,
                                rd_kafka_err2name(last_err.err),
                                rd_kafka_actions2str(last_err.actions),
                                last_err.base_seq, last_err.last_seq,
                                last_err.base_msgid,
                                last_err.ts ?
                                (now - last_err.ts)/1000 : -1);

                        perr->actions = RD_KAFKA_ERR_ACTION_PERMANENT;
                        perr->status  = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED;
                        perr->update_next_ack = rd_false;
                        perr->update_next_err = rd_true;

                } else if (r > 0) {
                        /* R2 failed:
                         * With max.in.flight > 1 we can have a situation
                         * where the first request in-flight (R1) to the broker
                         * fails, which causes the sub-sequent requests
                         * that are in-flight to have a non-sequential
                         * sequence number and thus fail.
                         * But these sub-sequent requests (R2 to R4) are not at
                         * the risk of being duplicated so we bump the epoch and
                         * re-enqueue the messages for later retry
                         * (without incrementing retries).
                         */
                        rd_rkb_dbg(rkb, MSG|RD_KAFKA_DBG_EOS, "ERRSEQ",
                                   "ProduceRequest for %.*s [%"PRId32"] "
                                   "with %d message(s) failed "
                                   "due to skipped sequence numbers "
                                   "(%s, base seq %"PRId32" > "
                                   "next seq %"PRId32") "
                                   "caused by previous failed request "
                                   "(%s, actions %s, "
                                   "base seq %"PRId32"..%"PRId32
                                   ", base msgid %"PRIu64", %"PRId64"ms ago): "
                                   "recovering and retrying",
                                   RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                                   rktp->rktp_partition,
                                   rd_kafka_msgq_len(&batch->msgq),
                                   rd_kafka_pid2str(batch->pid),
                                   batch->first_seq,
                                   perr->next_ack_seq,
                                   rd_kafka_err2name(last_err.err),
                                   rd_kafka_actions2str(last_err.actions),
                                   last_err.base_seq, last_err.last_seq,
                                   last_err.base_msgid,
                                   last_err.ts ?
                                   (now - last_err.ts)/1000 : -1);

                        perr->incr_retry = 0;
                        perr->actions = RD_KAFKA_ERR_ACTION_RETRY;
                        perr->status  = RD_KAFKA_MSG_STATUS_NOT_PERSISTED;
                        perr->update_next_ack = rd_false;
                        perr->update_next_err = rd_true;

                        rd_kafka_idemp_drain_epoch_bump(
                                rk, "skipped sequence numbers");

                } else {
                        /* Request's sequence is less than next ack,
                         * this should never happen unless we have
                         * local bug or the broker did not respond
                         * to the requests in order. */
                        rd_kafka_set_fatal_error(
                                rk, perr->err,
                                "ProduceRequest for %.*s [%"PRId32"] "
                                "with %d message(s) failed "
                                "with rewound sequence number on "
                                "broker %"PRId32" (%s, "
                                "base seq %"PRId32" < next seq %"PRId32"): "
                                "last error %s (actions %s, "
                                "base seq %"PRId32"..%"PRId32
                                ", base msgid %"PRIu64", %"PRId64"ms ago)",
                                RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                                rktp->rktp_partition,
                                rd_kafka_msgq_len(&batch->msgq),
                                rkb->rkb_nodeid,
                                rd_kafka_pid2str(batch->pid),
                                batch->first_seq,
                                perr->next_ack_seq,
                                rd_kafka_err2name(last_err.err),
                                rd_kafka_actions2str(last_err.actions),
                                last_err.base_seq, last_err.last_seq,
                                last_err.base_msgid,
                                last_err.ts ?
                                (now - last_err.ts)/1000 : -1);

                        perr->actions = RD_KAFKA_ERR_ACTION_PERMANENT;
                        perr->status  = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED;
                        perr->update_next_ack = rd_false;
                        perr->update_next_err = rd_false;
                }
                break;

        case RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER:
                /* This error indicates that we successfully produced
                 * this set of messages before but this (supposed) retry failed.
                 *
                 * Treat as success, however offset and timestamp
                 * will be invalid. */

                /* Future improvement/FIXME:
                 * But first make sure the first message has actually
                 * been retried, getting this error for a non-retried message
                 * indicates a synchronization issue or bug. */
                rd_rkb_dbg(rkb, MSG|RD_KAFKA_DBG_EOS, "DUPSEQ",
                           "ProduceRequest for %.*s [%"PRId32"] "
                           "with %d message(s) failed "
                           "due to duplicate sequence number: "
                           "previous send succeeded but was not acknowledged "
                           "(%s, base seq %"PRId32"): "
                           "marking the messages successfully delivered",
                           RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                           rktp->rktp_partition,
                           rd_kafka_msgq_len(&batch->msgq),
                           rd_kafka_pid2str(batch->pid),
                           batch->first_seq);

                /* Void error, delivery succeeded */
                perr->err = RD_KAFKA_RESP_ERR_NO_ERROR;
                perr->actions = 0;
                perr->status  = RD_KAFKA_MSG_STATUS_PERSISTED;
                perr->update_next_ack = rd_true;
                perr->update_next_err = rd_true;
                break;

        case RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID:
                /* The broker/cluster lost track of our PID because
                 * the last message we produced has now been deleted
                 * (by DeleteRecords, compaction, or topic retention policy).
                 *
                 * If all previous messages are accounted for and this is not
                 * a retry we can simply bump the epoch and reset the sequence
                 * number and then retry the message(s) again.
                 *
                 * If there are outstanding messages not yet acknowledged
                 * then there is no safe way to carry on without risking
                 * duplication or reordering, in which case we fail
                 * the producer. */

                if (!firstmsg->rkm_u.producer.retries &&
                    perr->next_err_seq == batch->first_seq) {
                        rd_rkb_dbg(rkb, MSG|RD_KAFKA_DBG_EOS, "UNKPID",
                                   "ProduceRequest for %.*s [%"PRId32"] "
                                   "with %d message(s) failed "
                                   "due to unknown producer id "
                                   "(%s, base seq %"PRId32", %d retries): "
                                   "no risk of duplication/reordering: "
                                   "resetting PID and retrying",
                                   RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                                   rktp->rktp_partition,
                                   rd_kafka_msgq_len(&batch->msgq),
                                   rd_kafka_pid2str(batch->pid),
                                   batch->first_seq,
                                   firstmsg->rkm_u.producer.retries);

                        /* Drain outstanding requests and bump epoch. */
                        rd_kafka_idemp_drain_epoch_bump(rk,
                                                        "unknown producer id");

                        perr->incr_retry = 0;
                        perr->actions = RD_KAFKA_ERR_ACTION_RETRY;
                        perr->status  = RD_KAFKA_MSG_STATUS_NOT_PERSISTED;
                        perr->update_next_ack = rd_false;
                        perr->update_next_err = rd_true;
                        break;
                }

                rd_kafka_set_fatal_error(
                        rk, perr->err,
                        "ProduceRequest for %.*s [%"PRId32"] "
                        "with %d message(s) failed "
                        "due to unknown producer id ("
                        "broker %"PRId32" %s, base seq %"PRId32", %d retries): "
                        "unable to retry without risking "
                        "duplication/reordering",
                        RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                        rktp->rktp_partition,
                        rd_kafka_msgq_len(&batch->msgq),
                        rkb->rkb_nodeid,
                        rd_kafka_pid2str(batch->pid),
                        batch->first_seq,
                        firstmsg->rkm_u.producer.retries);

                perr->actions = RD_KAFKA_ERR_ACTION_PERMANENT;
                perr->status  = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED;
                perr->update_next_ack = rd_false;
                perr->update_next_err = rd_true;
                break;

        default:
                /* All other errors are handled in the standard
                 * error Produce handler, which will set
                 * update_next_ack|err accordingly. */
                break;
        }
}
2173
2174
2175
/**
 * @brief Error-handling for failed ProduceRequests
 *
 * @param rkb broker the request was sent to (not necessarily the
 *            current partition leader).
 * @param request the original request buffer (only used to check if
 *                the request was actually transmitted). May be NULL.
 * @param batch the message batch the request was constructed from.
 * @param perr Is the input and output error, it may be changed
 *             by this function. On input \c perr->err is the
 *             request-level error; on output the actions, persistence
 *             status and next ack/err sequence updates are set.
 *
 * @returns 0 if no further processing of the request should be performed,
 *          such as triggering delivery reports, else 1.
 *
 * @warning May be called on the old leader thread. Lock rktp appropriately!
 *
 * @warning \p request may be NULL.
 *
 * @locality broker thread (but not necessarily the leader broker)
 * @locks none
 */
static int rd_kafka_handle_Produce_error (rd_kafka_broker_t *rkb,
                                          const rd_kafka_buf_t *request,
                                          rd_kafka_msgbatch_t *batch,
                                          struct rd_kafka_Produce_err *perr) {
        rd_kafka_t *rk = rkb->rkb_rk;
        rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(batch->s_rktp);
        int is_leader;

        if (unlikely(perr->err == RD_KAFKA_RESP_ERR__DESTROY))
                return 0; /* Terminating */

        /* When there is a partition leader change any outstanding
         * requests to the old broker will be handled by the old
         * broker thread when the responses are received/timeout:
         * in this case we need to be careful with locking:
         * check once if we're the leader (which allows relaxed
         * locking), and cache the current rktp's eos state vars. */
        rd_kafka_toppar_lock(rktp);
        is_leader = rktp->rktp_leader == rkb;
        perr->rktp_pid = rktp->rktp_eos.pid;
        perr->next_ack_seq = rktp->rktp_eos.next_ack_seq;
        perr->next_err_seq = rktp->rktp_eos.next_err_seq;
        rd_kafka_toppar_unlock(rktp);

        /* All failures are initially treated as if the message
         * was not persisted, but the status may be changed later
         * for specific errors and actions. */
        perr->status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED;

        /* Set actions for known errors (may be overridden later),
         * all other errors are considered permanent failures.
         * (also see rd_kafka_err_action() for the default actions). */
        perr->actions = rd_kafka_err_action(
                rkb, perr->err, request,

                RD_KAFKA_ERR_ACTION_REFRESH|
                RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED,
                RD_KAFKA_RESP_ERR__TRANSPORT,

                RD_KAFKA_ERR_ACTION_REFRESH|
                RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED,
                RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART,

                RD_KAFKA_ERR_ACTION_RETRY|
                RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED,
                RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS,

                RD_KAFKA_ERR_ACTION_RETRY|
                RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED,
                RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND,

                RD_KAFKA_ERR_ACTION_RETRY|
                RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED,
                RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE,

                RD_KAFKA_ERR_ACTION_RETRY|
                RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED,
                RD_KAFKA_RESP_ERR__TIMED_OUT,

                RD_KAFKA_ERR_ACTION_PERMANENT|
                RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED,
                RD_KAFKA_RESP_ERR__MSG_TIMED_OUT,

                /* All Idempotent Producer-specific errors are
                 * initially set as permanent errors,
                 * special handling may change the actions. */
                RD_KAFKA_ERR_ACTION_PERMANENT|
                RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED,
                RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER,

                RD_KAFKA_ERR_ACTION_PERMANENT|
                RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED,
                RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER,

                RD_KAFKA_ERR_ACTION_PERMANENT|
                RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED,
                RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID,

                /* Message was purged from out-queue due to
                 * Idempotent Producer Id change */
                RD_KAFKA_ERR_ACTION_RETRY,
                RD_KAFKA_RESP_ERR__RETRY,

                RD_KAFKA_ERR_ACTION_END);

        rd_rkb_dbg(rkb, MSG, "MSGSET",
                   "%s [%"PRId32"]: MessageSet with %i message(s) "
                   "(MsgId %"PRIu64", BaseSeq %"PRId32") "
                   "encountered error: %s (actions %s)%s",
                   rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
                   rd_kafka_msgq_len(&batch->msgq),
                   batch->first_msgid, batch->first_seq,
                   rd_kafka_err2str(perr->err),
                   rd_kafka_actions2str(perr->actions),
                   is_leader ? "" : " [NOT LEADER]");


        /*
         * Special handling for Idempotent Producer
         *
         * Note: Idempotent Producer-specific errors received
         * on a non-idempotent producer will be passed through
         * directly to the application.
         */
        if (rd_kafka_is_idempotent(rk))
                rd_kafka_handle_idempotent_Produce_error(rkb, batch, perr);

        /* Update message persistence status based on action flags.
         * None of these are typically set after an idempotent error,
         * which sets the status explicitly. */
        if (perr->actions & RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED)
                perr->status = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED;
        else if (perr->actions & RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED)
                perr->status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED;
        else if (perr->actions & RD_KAFKA_ERR_ACTION_MSG_PERSISTED)
                perr->status = RD_KAFKA_MSG_STATUS_PERSISTED;

        /* Save the last error for debugging sub-sequent errors,
         * useful for Idempotent Producer troubleshooting. */
        rd_kafka_toppar_lock(rktp);
        rktp->rktp_last_err.err = perr->err;
        rktp->rktp_last_err.actions = perr->actions;
        rktp->rktp_last_err.ts = rd_clock();
        rktp->rktp_last_err.base_seq = batch->first_seq;
        rktp->rktp_last_err.last_seq = perr->last_seq;
        rktp->rktp_last_err.base_msgid = batch->first_msgid;
        rd_kafka_toppar_unlock(rktp);

        /*
         * Handle actions
         */
        if (perr->actions & (RD_KAFKA_ERR_ACTION_REFRESH |
                             RD_KAFKA_ERR_ACTION_RETRY)) {
                /* Retry (refresh also implies retry) */

                if (perr->actions & RD_KAFKA_ERR_ACTION_REFRESH) {
                        /* Request metadata information update.
                         * These errors imply that we have stale
                         * information and the request was
                         * either rejected or not sent -
                         * we don't need to increment the retry count
                         * when we perform a retry since:
                         * - it is a temporary error (hopefully)
                         * - there is no chance of duplicate delivery
                         */
                        rd_kafka_toppar_leader_unavailable(
                                rktp, "produce", perr->err);

                        /* We can't be certain the request wasn't
                         * sent in case of transport failure,
                         * so the ERR__TRANSPORT case will need
                         * the retry count to be increased */
                        if (perr->err != RD_KAFKA_RESP_ERR__TRANSPORT)
                                perr->incr_retry = 0;
                }

                /* If message timed out in queue, not in transit,
                 * we will retry at a later time but not increment
                 * the retry count since there is no risk
                 * of duplicates. */
                if (!rd_kafka_buf_was_sent(request))
                        perr->incr_retry = 0;

                if (!perr->incr_retry) {
                        /* If retries are not to be incremented then
                         * there is no chance of duplicates on retry, which
                         * means these messages were not persisted. */
                        perr->status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED;
                }

                if (rd_kafka_is_idempotent(rk)) {
                        /* Any currently in-flight requests will
                         * fail with ERR_OUT_OF_ORDER_SEQUENCE_NUMBER,
                         * which should not be treated as a fatal error
                         * since this request and sub-sequent requests
                         * will be retried and thus return to order.
                         * Unless the error was a timeout, or similar,
                         * in which case the request might have made it
                         * and the messages are considered possibly persisted:
                         * in this case we allow the next in-flight response
                         * to be successful, in which case we mark
                         * this request's messages as successfully delivered. */
                        if (perr->status &
                            RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED)
                                perr->update_next_ack = rd_true;
                        else
                                perr->update_next_ack = rd_false;
                        perr->update_next_err = rd_true;

                        /* Drain outstanding requests so that retries
                         * are attempted with proper state knowledge and
                         * without any in-flight requests. */
                        rd_kafka_toppar_lock(rktp);
                        rd_kafka_idemp_drain_toppar(rktp,
                                                    "drain before retrying");
                        rd_kafka_toppar_unlock(rktp);
                }

                /* Since requests are specific to a broker
                 * we move the retryable messages from the request
                 * back to the partition queue (prepend) and then
                 * let the new broker construct a new request.
                 * While doing this we also make sure the retry count
                 * for each message is honoured, any messages that
                 * would exceeded the retry count will not be
                 * moved but instead fail below. */
                rd_kafka_toppar_retry_msgq(rktp, &batch->msgq,
                                           perr->incr_retry,
                                           perr->status);

                if (rd_kafka_msgq_len(&batch->msgq) == 0) {
                        /* No need do anything more with the request
                         * here since the request no longer has any
                         * messages associated with it. */
                        return 0;
                }
        }

        if (perr->actions & RD_KAFKA_ERR_ACTION_PERMANENT &&
            rd_kafka_is_idempotent(rk)) {
                if (rk->rk_conf.eos.gapless) {
                        /* A permanent non-idempotent error will lead to
                         * gaps in the message series, the next request
                         * will fail with ...ERR_OUT_OF_ORDER_SEQUENCE_NUMBER.
                         * To satisfy the gapless guarantee we need to raise
                         * a fatal error here. */
                        rd_kafka_set_fatal_error(
                                rk, RD_KAFKA_RESP_ERR__GAPLESS_GUARANTEE,
                                "ProduceRequest for %.*s [%"PRId32"] "
                                "with %d message(s) failed: "
                                "%s (broker %"PRId32" %s, base seq %"PRId32"): "
                                "unable to satisfy gap-less guarantee",
                                RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                                rktp->rktp_partition,
                                rd_kafka_msgq_len(&batch->msgq),
                                rd_kafka_err2str(perr->err),
                                rkb->rkb_nodeid,
                                rd_kafka_pid2str(batch->pid),
                                batch->first_seq);

                        /* Drain outstanding requests and reset PID. */
                        rd_kafka_idemp_drain_reset(rk);

                } else {
                        /* If gapless is not set we bump the Epoch and
                         * renumber the messages to send. */

                        /* Drain outstanding requests and bump the epoch .*/
                        rd_kafka_idemp_drain_epoch_bump(
                                rk, "message sequence gap");
                }

                perr->update_next_ack = rd_false;
                /* Make sure the next error will not raise a fatal error. */
                perr->update_next_err = rd_true;
        }

        /* Translate request-level timeout error code
         * to message-level timeout error code. */
        if (perr->err == RD_KAFKA_RESP_ERR__TIMED_OUT ||
            perr->err == RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE)
                perr->err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;

        return 1;
}
2457
/**
 * @brief Handle ProduceResponse success for idempotent producer
 *
 * @param rkb broker the response came from (not necessarily the
 *            current partition leader).
 * @param batch the successfully produced message batch.
 * @param next_seq the next expected sequence number, i.e., the batch's
 *                 base sequence plus its message count (wrap-adjusted
 *                 by the caller).
 *
 * @warning May be called on the old leader thread. Lock rktp appropriately!
 *
 * @locks none
 * @locality broker thread (but not necessarily the leader broker thread)
 */
static void
rd_kafka_handle_idempotent_Produce_success (rd_kafka_broker_t *rkb,
                                            rd_kafka_msgbatch_t *batch,
                                            int32_t next_seq) {
        rd_kafka_t *rk = rkb->rkb_rk;
        rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(batch->s_rktp);
        char fatal_err[512];
        uint64_t first_msgid, last_msgid;

        /* An empty string means no fatal error was encountered
         * (checked after releasing the toppar lock below). */
        *fatal_err = '\0';

        first_msgid = rd_kafka_msgq_first(&batch->msgq)->rkm_u.producer.msgid;
        last_msgid = rd_kafka_msgq_last(&batch->msgq)->rkm_u.producer.msgid;

        rd_kafka_toppar_lock(rktp);

        /* If the last acked msgid is higher than
         * the next message to (re)transmit in the message queue
         * it means a previous series of R1,R2 ProduceRequests
         * had R1 fail with uncertain persistence status,
         * such as timeout or transport error, but R2 succeeded,
         * which means the messages in R1 were in fact persisted.
         * In this case trigger delivery reports for all messages
         * in queue until we hit a non-acked message msgid. */
        if (unlikely(rktp->rktp_eos.acked_msgid < first_msgid - 1)) {
                rd_kafka_dr_implicit_ack(rkb, rktp, last_msgid);

        } else if (unlikely(batch->first_seq != rktp->rktp_eos.next_ack_seq &&
                            batch->first_seq == rktp->rktp_eos.next_err_seq)) {
                /* Response ordering is typically not a concern
                 * (but will not happen with current broker versions),
                 * unless we're expecting an error to be returned at
                 * this sequence rather than a success ack, in which
                 * case raise a fatal error. */

                /* Can't call set_fatal_error() while
                 * holding the toppar lock, so construct
                 * the error string here and call
                 * set_fatal_error() below after
                 * toppar lock has been released. */
                rd_snprintf(
                        fatal_err, sizeof(fatal_err),
                        "ProduceRequest for %.*s [%"PRId32"] "
                        "with %d message(s) "
                        "succeeded when expecting failure "
                        "(broker %"PRId32" %s, "
                        "base seq %"PRId32", "
                        "next ack seq %"PRId32", "
                        "next err seq %"PRId32": "
                        "unable to retry without risking "
                        "duplication/reordering",
                        RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                        rktp->rktp_partition,
                        rd_kafka_msgq_len(&batch->msgq),
                        rkb->rkb_nodeid,
                        rd_kafka_pid2str(batch->pid),
                        batch->first_seq,
                        rktp->rktp_eos.next_ack_seq,
                        rktp->rktp_eos.next_err_seq);

                rktp->rktp_eos.next_err_seq = next_seq;
        }

        if (likely(!*fatal_err)) {
                /* Advance next expected err and/or ack sequence */

                /* Only step err seq if it hasn't diverged. */
                if (rktp->rktp_eos.next_err_seq ==
                    rktp->rktp_eos.next_ack_seq)
                        rktp->rktp_eos.next_err_seq = next_seq;

                rktp->rktp_eos.next_ack_seq = next_seq;
        }

        /* Store the last acked message sequence,
         * since retries within the broker cache window (5 requests)
         * will succeed for older messages we must only update the
         * acked msgid if it is higher than the last acked. */
        if (last_msgid > rktp->rktp_eos.acked_msgid)
                rktp->rktp_eos.acked_msgid = last_msgid;

        rd_kafka_toppar_unlock(rktp);

        /* Must call set_fatal_error() after releasing
         * the toppar lock. */
        if (unlikely(*fatal_err))
                rd_kafka_set_fatal_error(rk, RD_KAFKA_RESP_ERR__INCONSISTENT,
                                         "%s", fatal_err);
}
2555
2556
/**
 * @brief Handle ProduceRequest result for a message batch.
 *
 * @param rkb broker the request was sent to.
 * @param batch the message batch the request was constructed from;
 *              messages remaining in \c batch->msgq after error handling
 *              have delivery reports enqueued here.
 * @param err request-level error, or RD_KAFKA_RESP_ERR_NO_ERROR.
 * @param presult parsed per-batch result (offset, timestamp).
 * @param request the original request buffer.
 *
 * @warning \p request may be NULL.
 *
 * @locality broker thread (but not necessarily the toppar's handler thread)
 * @locks none
 */
static void
rd_kafka_msgbatch_handle_Produce_result (
        rd_kafka_broker_t *rkb,
        rd_kafka_msgbatch_t *batch,
        rd_kafka_resp_err_t err,
        const struct rd_kafka_Produce_result *presult,
        const rd_kafka_buf_t *request) {

        rd_kafka_t *rk = rkb->rkb_rk;
        rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(batch->s_rktp);
        rd_kafka_msg_status_t status = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED;
        rd_bool_t last_inflight;
        int32_t next_seq;

        /* Decrease partition's messages in-flight counter */
        rd_assert(rd_atomic32_get(&rktp->rktp_msgs_inflight) >=
                  rd_kafka_msgq_len(&batch->msgq));
        last_inflight = !rd_atomic32_sub(&rktp->rktp_msgs_inflight,
                                         rd_kafka_msgq_len(&batch->msgq));

        /* Next expected sequence (and handle wrap) */
        next_seq = rd_kafka_seq_wrap(batch->first_seq +
                                     rd_kafka_msgq_len(&batch->msgq));

        if (likely(!err)) {
                rd_rkb_dbg(rkb, MSG, "MSGSET",
                           "%s [%"PRId32"]: MessageSet with %i message(s) "
                           "(MsgId %"PRIu64", BaseSeq %"PRId32") delivered",
                           rktp->rktp_rkt->rkt_topic->str,
                           rktp->rktp_partition,
                           rd_kafka_msgq_len(&batch->msgq),
                           batch->first_msgid, batch->first_seq);

                /* With acks=0 the broker never confirms persistence,
                 * so the status remains POSSIBLY_PERSISTED. */
                if (rktp->rktp_rkt->rkt_conf.required_acks != 0)
                        status = RD_KAFKA_MSG_STATUS_PERSISTED;

                if (rd_kafka_is_idempotent(rk))
                        rd_kafka_handle_idempotent_Produce_success(rkb, batch,
                                                                   next_seq);
        } else {
                /* Error handling */
                struct rd_kafka_Produce_err perr = {
                        .err = err,
                        .incr_retry = 1,
                        .status = status,
                        .update_next_ack = rd_true,
                        .update_next_err = rd_true,
                        .last_seq = (batch->first_seq +
                                     rd_kafka_msgq_len(&batch->msgq) - 1)
                };

                rd_kafka_handle_Produce_error(rkb, request, batch, &perr);

                /* Update next expected acked and/or err sequence. */
                if (perr.update_next_ack || perr.update_next_err) {
                        rd_kafka_toppar_lock(rktp);
                        if (perr.update_next_ack)
                                rktp->rktp_eos.next_ack_seq = next_seq;
                        if (perr.update_next_err)
                                rktp->rktp_eos.next_err_seq = next_seq;
                        rd_kafka_toppar_unlock(rktp);
                }

                /* Error handler may have remapped the error and/or
                 * adjusted the persistence status. */
                err = perr.err;
                status = perr.status;
        }


        /* Messages to retry will have been removed from the request's queue */
        if (likely(rd_kafka_msgq_len(&batch->msgq) > 0)) {
                /* Set offset, timestamp and status for each message. */
                rd_kafka_msgq_set_metadata(&batch->msgq,
                                           presult->offset,
                                           presult->timestamp,
                                           status);

                /* Enqueue messages for delivery report. */
                rd_kafka_dr_msgq(rktp->rktp_rkt, &batch->msgq, err);
        }

        /* This was the last in-flight request for the partition:
         * notify the idempotence sub-system. */
        if (rd_kafka_is_idempotent(rk) && last_inflight)
                rd_kafka_idemp_inflight_toppar_sub(rk, rktp);
}
2648
2649
2650/**
2651 * @brief Handle ProduceResponse
2652 *
2653 * @param reply is NULL when `acks=0` and on various local errors.
2654 *
2655 * @remark ProduceRequests are never retried, retriable errors are
2656 * instead handled by re-enqueuing the request's messages back
2657 * on the partition queue to have a new ProduceRequest constructed
2658 * eventually.
2659 *
2660 * @warning May be called on the old leader thread. Lock rktp appropriately!
2661 *
2662 * @locality broker thread (but not necessarily the leader broker thread)
2663 */
2664static void rd_kafka_handle_Produce (rd_kafka_t *rk,
2665 rd_kafka_broker_t *rkb,
2666 rd_kafka_resp_err_t err,
2667 rd_kafka_buf_t *reply,
2668 rd_kafka_buf_t *request,
2669 void *opaque) {
2670 rd_kafka_msgbatch_t *batch = &request->rkbuf_batch;
2671 rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(batch->s_rktp);
2672 struct rd_kafka_Produce_result result = {
2673 .offset = RD_KAFKA_OFFSET_INVALID,
2674 .timestamp = -1
2675 };
2676
2677 /* Unit test interface: inject errors */
2678 if (unlikely(rk->rk_conf.ut.handle_ProduceResponse != NULL)) {
2679 err = rk->rk_conf.ut.handle_ProduceResponse(
2680 rkb->rkb_rk,
2681 rkb->rkb_nodeid,
2682 batch->first_msgid,
2683 err);
2684 }
2685
2686 /* Parse Produce reply (unless the request errored) */
2687 if (!err && reply)
2688 err = rd_kafka_handle_Produce_parse(rkb, rktp,
2689 reply, request,
2690 &result);
2691
2692 rd_kafka_msgbatch_handle_Produce_result(rkb, batch, err,
2693 &result, request);
2694}
2695
2696
2697/**
2698 * @brief Send ProduceRequest for messages in toppar queue.
2699 *
2700 * @returns the number of messages included, or 0 on error / no messages.
2701 *
2702 * @locality broker thread
2703 */
2704int rd_kafka_ProduceRequest (rd_kafka_broker_t *rkb, rd_kafka_toppar_t *rktp,
2705 const rd_kafka_pid_t pid) {
2706 rd_kafka_buf_t *rkbuf;
2707 rd_kafka_itopic_t *rkt = rktp->rktp_rkt;
2708 size_t MessageSetSize = 0;
2709 int cnt;
2710 rd_ts_t now;
2711 int64_t first_msg_timeout;
2712 int tmout;
2713
2714 /**
2715 * Create ProduceRequest with as many messages from the toppar
2716 * transmit queue as possible.
2717 */
2718 rkbuf = rd_kafka_msgset_create_ProduceRequest(rkb, rktp,
2719 &rktp->rktp_xmit_msgq,
2720 pid, &MessageSetSize);
2721 if (unlikely(!rkbuf))
2722 return 0;
2723
2724 cnt = rd_kafka_msgq_len(&rkbuf->rkbuf_batch.msgq);
2725 rd_dassert(cnt > 0);
2726
2727 rd_avg_add(&rktp->rktp_rkt->rkt_avg_batchcnt, (int64_t)cnt);
2728 rd_avg_add(&rktp->rktp_rkt->rkt_avg_batchsize, (int64_t)MessageSetSize);
2729
2730 if (!rkt->rkt_conf.required_acks)
2731 rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_NO_RESPONSE;
2732
2733 /* Use timeout from first message in batch */
2734 now = rd_clock();
2735 first_msg_timeout = (rd_kafka_msgq_first(&rkbuf->rkbuf_batch.msgq)->
2736 rkm_ts_timeout - now) / 1000;
2737
2738 if (unlikely(first_msg_timeout <= 0)) {
2739 /* Message has already timed out, allow 100 ms
2740 * to produce anyway */
2741 tmout = 100;
2742 } else {
2743 tmout = (int)RD_MIN(INT_MAX, first_msg_timeout);
2744 }
2745
2746 /* Set absolute timeout (including retries), the
2747 * effective timeout for this specific request will be
2748 * capped by socket.timeout.ms */
2749 rd_kafka_buf_set_abs_timeout(rkbuf, tmout, now);
2750
2751 rd_kafka_broker_buf_enq_replyq(rkb, rkbuf,
2752 RD_KAFKA_NO_REPLYQ,
2753 rd_kafka_handle_Produce, NULL);
2754
2755 return cnt;
2756}
2757
2758
/**
 * @brief Construct and send CreateTopicsRequest to \p rkb
 *        with the topics (NewTopic_t*) in \p new_topics, using
 *        \p options.
 *
 *        The response (unparsed) will be enqueued on \p replyq
 *        for handling by \p resp_cb (with \p opaque passed).
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
 *          transmission, otherwise an error code and errstr will be
 *          updated with a human readable error string.
 */
rd_kafka_resp_err_t
rd_kafka_CreateTopicsRequest (rd_kafka_broker_t *rkb,
                              const rd_list_t *new_topics /*(NewTopic_t*)*/,
                              rd_kafka_AdminOptions_t *options,
                              char *errstr, size_t errstr_size,
                              rd_kafka_replyq_t replyq,
                              rd_kafka_resp_cb_t *resp_cb,
                              void *opaque) {
        rd_kafka_buf_t *rkbuf;
        int16_t ApiVersion = 0;
        int features;
        int i = 0;
        rd_kafka_NewTopic_t *newt;
        int op_timeout;

        if (rd_list_cnt(new_topics) == 0) {
                rd_snprintf(errstr, errstr_size, "No topics to create");
                rd_kafka_replyq_destroy(&replyq);
                return RD_KAFKA_RESP_ERR__INVALID_ARG;
        }

        ApiVersion = rd_kafka_broker_ApiVersion_supported(
                rkb, RD_KAFKAP_CreateTopics, 0, 2, &features);
        if (ApiVersion == -1) {
                rd_snprintf(errstr, errstr_size,
                            "Topic Admin API (KIP-4) not supported "
                            "by broker, requires broker version >= 0.10.2.0");
                rd_kafka_replyq_destroy(&replyq);
                return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
        }

        /* validate_only was added in CreateTopics v1. */
        if (rd_kafka_confval_get_int(&options->validate_only) &&
            ApiVersion < 1) {
                rd_snprintf(errstr, errstr_size,
                            "CreateTopics.validate_only=true not "
                            "supported by broker");
                rd_kafka_replyq_destroy(&replyq);
                return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
        }



        /* Rough buffer size estimate: topic count + ~200 bytes per topic
         * + timeout + validate_only. */
        rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_CreateTopics, 1,
                                         4 +
                                         (rd_list_cnt(new_topics) * 200) +
                                         4 + 1);

        /* #topics */
        rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(new_topics));

        while ((newt = rd_list_elem(new_topics, i++))) {
                int partition;
                int ei = 0;
                const rd_kafka_ConfigEntry_t *entry;

                /* topic */
                rd_kafka_buf_write_str(rkbuf, newt->topic, -1);

                if (rd_list_cnt(&newt->replicas)) {
                        /* num_partitions and replication_factor must be
                         * set to -1 if a replica assignment is sent. */
                        /* num_partitions */
                        rd_kafka_buf_write_i32(rkbuf, -1);
                        /* replication_factor */
                        rd_kafka_buf_write_i16(rkbuf, -1);
                } else {
                        /* num_partitions */
                        rd_kafka_buf_write_i32(rkbuf, newt->num_partitions);
                        /* replication_factor */
                        rd_kafka_buf_write_i16(rkbuf,
                                               (int16_t)newt->
                                               replication_factor);
                }

                /* #replica_assignment */
                rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(&newt->replicas));

                /* Replicas per partition, see rdkafka_admin.[ch]
                 * for how these are constructed. */
                for (partition = 0 ; partition < rd_list_cnt(&newt->replicas);
                     partition++) {
                        const rd_list_t *replicas;
                        int ri = 0;

                        replicas = rd_list_elem(&newt->replicas, partition);
                        if (!replicas)
                                continue;

                        /* partition */
                        rd_kafka_buf_write_i32(rkbuf, partition);
                        /* #replicas */
                        rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(replicas));

                        for (ri = 0 ; ri < rd_list_cnt(replicas) ; ri++) {
                                /* replica */
                                rd_kafka_buf_write_i32(
                                        rkbuf, rd_list_get_int32(replicas, ri));
                        }
                }

                /* #config_entries */
                rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(&newt->config));

                RD_LIST_FOREACH(entry, &newt->config, ei) {
                        /* config_name */
                        rd_kafka_buf_write_str(rkbuf, entry->kv->name, -1);
                        /* config_value (nullable) */
                        rd_kafka_buf_write_str(rkbuf, entry->kv->value, -1);
                }
        }

        /* timeout */
        op_timeout = rd_kafka_confval_get_int(&options->operation_timeout);
        rd_kafka_buf_write_i32(rkbuf, op_timeout);

        /* Give the request itself some headroom beyond the server-side
         * operation timeout. */
        if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms)
                rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout+1000, 0);

        if (ApiVersion >= 1) {
                /* validate_only */
                rd_kafka_buf_write_i8(rkbuf,
                                      rd_kafka_confval_get_int(&options->
                                                               validate_only));
        }

        rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);

        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
2902
2903
2904/**
2905 * @brief Construct and send DeleteTopicsRequest to \p rkb
2906 * with the topics (DeleteTopic_t *) in \p del_topics, using
2907 * \p options.
2908 *
2909 * The response (unparsed) will be enqueued on \p replyq
2910 * for handling by \p resp_cb (with \p opaque passed).
2911 *
2912 * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
2913 * transmission, otherwise an error code and errstr will be
2914 * updated with a human readable error string.
2915 */
2916rd_kafka_resp_err_t
2917rd_kafka_DeleteTopicsRequest (rd_kafka_broker_t *rkb,
2918 const rd_list_t *del_topics /*(DeleteTopic_t*)*/,
2919 rd_kafka_AdminOptions_t *options,
2920 char *errstr, size_t errstr_size,
2921 rd_kafka_replyq_t replyq,
2922 rd_kafka_resp_cb_t *resp_cb,
2923 void *opaque) {
2924 rd_kafka_buf_t *rkbuf;
2925 int16_t ApiVersion = 0;
2926 int features;
2927 int i = 0;
2928 rd_kafka_DeleteTopic_t *delt;
2929 int op_timeout;
2930
2931 if (rd_list_cnt(del_topics) == 0) {
2932 rd_snprintf(errstr, errstr_size, "No topics to delete");
2933 rd_kafka_replyq_destroy(&replyq);
2934 return RD_KAFKA_RESP_ERR__INVALID_ARG;
2935 }
2936
2937 ApiVersion = rd_kafka_broker_ApiVersion_supported(
2938 rkb, RD_KAFKAP_DeleteTopics, 0, 1, &features);
2939 if (ApiVersion == -1) {
2940 rd_snprintf(errstr, errstr_size,
2941 "Topic Admin API (KIP-4) not supported "
2942 "by broker, requires broker version >= 0.10.2.0");
2943 rd_kafka_replyq_destroy(&replyq);
2944 return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
2945 }
2946
2947 rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_DeleteTopics, 1,
2948 /* FIXME */
2949 4 +
2950 (rd_list_cnt(del_topics) * 100) +
2951 4);
2952
2953 /* #topics */
2954 rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(del_topics));
2955
2956 while ((delt = rd_list_elem(del_topics, i++)))
2957 rd_kafka_buf_write_str(rkbuf, delt->topic, -1);
2958
2959 /* timeout */
2960 op_timeout = rd_kafka_confval_get_int(&options->operation_timeout);
2961 rd_kafka_buf_write_i32(rkbuf, op_timeout);
2962
2963 if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms)
2964 rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout+1000, 0);
2965
2966 rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
2967
2968 rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
2969
2970 return RD_KAFKA_RESP_ERR_NO_ERROR;
2971}
2972
2973
2974
/**
 * @brief Construct and send CreatePartitionsRequest to \p rkb
 *        with the topics (NewPartitions_t*) in \p new_parts, using
 *        \p options.
 *
 *        The response (unparsed) will be enqueued on \p replyq
 *        for handling by \p resp_cb (with \p opaque passed).
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
 *          transmission, otherwise an error code and errstr will be
 *          updated with a human readable error string.
 */
rd_kafka_resp_err_t
rd_kafka_CreatePartitionsRequest (rd_kafka_broker_t *rkb,
                                  const rd_list_t *new_parts /*(NewPartitions_t*)*/,
                                  rd_kafka_AdminOptions_t *options,
                                  char *errstr, size_t errstr_size,
                                  rd_kafka_replyq_t replyq,
                                  rd_kafka_resp_cb_t *resp_cb,
                                  void *opaque) {
        rd_kafka_buf_t *rkbuf;
        int16_t ApiVersion = 0;
        int i = 0;
        rd_kafka_NewPartitions_t *newp;
        int op_timeout;

        if (rd_list_cnt(new_parts) == 0) {
                rd_snprintf(errstr, errstr_size, "No partitions to create");
                rd_kafka_replyq_destroy(&replyq);
                return RD_KAFKA_RESP_ERR__INVALID_ARG;
        }

        ApiVersion = rd_kafka_broker_ApiVersion_supported(
                rkb, RD_KAFKAP_CreatePartitions, 0, 0, NULL);
        if (ApiVersion == -1) {
                rd_snprintf(errstr, errstr_size,
                            "CreatePartitions (KIP-195) not supported "
                            "by broker, requires broker version >= 1.0.0");
                rd_kafka_replyq_destroy(&replyq);
                return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
        }

        /* Rough buffer size estimate: topic count + ~200 bytes per topic
         * + timeout + validate_only. */
        rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_CreatePartitions, 1,
                                         4 +
                                         (rd_list_cnt(new_parts) * 200) +
                                         4 + 1);

        /* #topics */
        rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(new_parts));

        while ((newp = rd_list_elem(new_parts, i++))) {
                /* topic */
                rd_kafka_buf_write_str(rkbuf, newp->topic, -1);

                /* New partition count */
                rd_kafka_buf_write_i32(rkbuf, (int32_t)newp->total_cnt);

                /* #replica_assignment:
                 * -1 lets the broker assign replicas itself. */
                if (rd_list_empty(&newp->replicas)) {
                        rd_kafka_buf_write_i32(rkbuf, -1);
                } else {
                        const rd_list_t *replicas;
                        int pi = -1;

                        rd_kafka_buf_write_i32(rkbuf,
                                               rd_list_cnt(&newp->replicas));

                        /* One replica list per new partition. */
                        while ((replicas = rd_list_elem(&newp->replicas,
                                                        ++pi))) {
                                int ri = 0;

                                /* replica count */
                                rd_kafka_buf_write_i32(rkbuf,
                                                       rd_list_cnt(replicas));

                                /* replica */
                                for (ri = 0 ; ri < rd_list_cnt(replicas) ;
                                     ri++) {
                                        rd_kafka_buf_write_i32(
                                                rkbuf,
                                                rd_list_get_int32(replicas,
                                                                  ri));
                                }
                        }
                }
        }

        /* timeout */
        op_timeout = rd_kafka_confval_get_int(&options->operation_timeout);
        rd_kafka_buf_write_i32(rkbuf, op_timeout);

        /* Give the request itself some headroom beyond the server-side
         * operation timeout. */
        if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms)
                rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout+1000, 0);

        /* validate_only */
        rd_kafka_buf_write_i8(
                rkbuf, rd_kafka_confval_get_int(&options->validate_only));

        rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);

        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
3079
3080
3081/**
3082 * @brief Construct and send AlterConfigsRequest to \p rkb
3083 * with the configs (ConfigResource_t*) in \p configs, using
3084 * \p options.
3085 *
3086 * The response (unparsed) will be enqueued on \p replyq
3087 * for handling by \p resp_cb (with \p opaque passed).
3088 *
3089 * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
3090 * transmission, otherwise an error code and errstr will be
3091 * updated with a human readable error string.
3092 */
rd_kafka_resp_err_t
rd_kafka_AlterConfigsRequest (rd_kafka_broker_t *rkb,
                              const rd_list_t *configs /*(ConfigResource_t*)*/,
                              rd_kafka_AdminOptions_t *options,
                              char *errstr, size_t errstr_size,
                              rd_kafka_replyq_t replyq,
                              rd_kafka_resp_cb_t *resp_cb,
                              void *opaque) {
        rd_kafka_buf_t *rkbuf;
        int16_t ApiVersion = 0;
        int i;
        const rd_kafka_ConfigResource_t *config;
        int op_timeout;

        /* Fail fast on an empty resource list: nothing to send. */
        if (rd_list_cnt(configs) == 0) {
                rd_snprintf(errstr, errstr_size,
                            "No config resources specified");
                rd_kafka_replyq_destroy(&replyq);
                return RD_KAFKA_RESP_ERR__INVALID_ARG;
        }

        /* Only ApiVersion 0 is queried for (min 0, max 0),
         * so ApiVersion is either 0 or -1 (unsupported) here. */
        ApiVersion = rd_kafka_broker_ApiVersion_supported(
                rkb, RD_KAFKAP_AlterConfigs, 0, 0, NULL);
        if (ApiVersion == -1) {
                rd_snprintf(errstr, errstr_size,
                            "AlterConfigs (KIP-133) not supported "
                            "by broker, requires broker version >= 0.11.0");
                rd_kafka_replyq_destroy(&replyq);
                return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
        }

        /* incremental requires ApiVersion > FIXME */
        if (ApiVersion < 1 /* FIXME */ &&
            rd_kafka_confval_get_int(&options->incremental)) {
                rd_snprintf(errstr, errstr_size,
                            "AlterConfigs.incremental=true (KIP-248) "
                            "not supported by broker, "
                            "requires broker version >= 2.0.0");
                rd_kafka_replyq_destroy(&replyq);
                return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
        }

        /* Rough size estimate: ~200 bytes per config resource. */
        rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_AlterConfigs, 1,
                                         rd_list_cnt(configs) * 200);

        /* #resources */
        rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(configs));

        RD_LIST_FOREACH(config, configs, i) {
                const rd_kafka_ConfigEntry_t *entry;
                int ei;

                /* resource_type */
                rd_kafka_buf_write_i8(rkbuf, config->restype);

                /* resource_name */
                rd_kafka_buf_write_str(rkbuf, config->name, -1);

                /* #config */
                rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(&config->config));

                RD_LIST_FOREACH(entry, &config->config, ei) {
                        /* config_name */
                        rd_kafka_buf_write_str(rkbuf, entry->kv->name, -1);
                        /* config_value (nullable) */
                        rd_kafka_buf_write_str(rkbuf, entry->kv->value, -1);

                        /* NOTE: since max queried ApiVersion is 0 (above),
                         * this branch is currently never taken; it is a
                         * placeholder for KIP-248 incremental alterations
                         * (see FIXME above). */
                        if (ApiVersion == 1)
                                rd_kafka_buf_write_i8(rkbuf,
                                                      entry->a.operation);
                        else if (entry->a.operation != RD_KAFKA_ALTER_OP_SET) {
                                /* Pre-KIP-248 brokers only support SET:
                                 * abort and clean up the partially written
                                 * request buffer. */
                                rd_snprintf(errstr, errstr_size,
                                            "Broker version >= 2.0.0 required "
                                            "for add/delete config "
                                            "entries: only set supported "
                                            "by this broker");
                                rd_kafka_buf_destroy(rkbuf);
                                rd_kafka_replyq_destroy(&replyq);
                                return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
                        }
                }
        }

        /* timeout: the AlterConfigs protocol has no timeout field;
         * a larger operation timeout only extends the request's
         * absolute transport timeout. */
        op_timeout = rd_kafka_confval_get_int(&options->operation_timeout);
        if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms)
                rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout+1000, 0);

        /* validate_only */
        rd_kafka_buf_write_i8(
                rkbuf, rd_kafka_confval_get_int(&options->validate_only));

        rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);

        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
3191
3192
3193/**
3194 * @brief Construct and send DescribeConfigsRequest to \p rkb
3195 * with the configs (ConfigResource_t*) in \p configs, using
3196 * \p options.
3197 *
3198 * The response (unparsed) will be enqueued on \p replyq
3199 * for handling by \p resp_cb (with \p opaque passed).
3200 *
3201 * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
3202 * transmission, otherwise an error code and errstr will be
3203 * updated with a human readable error string.
3204 */
3205rd_kafka_resp_err_t
3206rd_kafka_DescribeConfigsRequest (rd_kafka_broker_t *rkb,
3207 const rd_list_t *configs /*(ConfigResource_t*)*/,
3208 rd_kafka_AdminOptions_t *options,
3209 char *errstr, size_t errstr_size,
3210 rd_kafka_replyq_t replyq,
3211 rd_kafka_resp_cb_t *resp_cb,
3212 void *opaque) {
3213 rd_kafka_buf_t *rkbuf;
3214 int16_t ApiVersion = 0;
3215 int i;
3216 const rd_kafka_ConfigResource_t *config;
3217 int op_timeout;
3218
3219 if (rd_list_cnt(configs) == 0) {
3220 rd_snprintf(errstr, errstr_size,
3221 "No config resources specified");
3222 rd_kafka_replyq_destroy(&replyq);
3223 return RD_KAFKA_RESP_ERR__INVALID_ARG;
3224 }
3225
3226 ApiVersion = rd_kafka_broker_ApiVersion_supported(
3227 rkb, RD_KAFKAP_DescribeConfigs, 0, 1, NULL);
3228 if (ApiVersion == -1) {
3229 rd_snprintf(errstr, errstr_size,
3230 "DescribeConfigs (KIP-133) not supported "
3231 "by broker, requires broker version >= 0.11.0");
3232 rd_kafka_replyq_destroy(&replyq);
3233 return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
3234 }
3235
3236 rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_DescribeConfigs, 1,
3237 rd_list_cnt(configs) * 200);
3238
3239 /* #resources */
3240 rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(configs));
3241
3242 RD_LIST_FOREACH(config, configs, i) {
3243 const rd_kafka_ConfigEntry_t *entry;
3244 int ei;
3245
3246 /* resource_type */
3247 rd_kafka_buf_write_i8(rkbuf, config->restype);
3248
3249 /* resource_name */
3250 rd_kafka_buf_write_str(rkbuf, config->name, -1);
3251
3252 /* #config */
3253 if (rd_list_empty(&config->config)) {
3254 /* Get all configs */
3255 rd_kafka_buf_write_i32(rkbuf, -1);
3256 } else {
3257 /* Get requested configs only */
3258 rd_kafka_buf_write_i32(rkbuf,
3259 rd_list_cnt(&config->config));
3260 }
3261
3262 RD_LIST_FOREACH(entry, &config->config, ei) {
3263 /* config_name */
3264 rd_kafka_buf_write_str(rkbuf, entry->kv->name, -1);
3265 }
3266 }
3267
3268
3269 if (ApiVersion == 1) {
3270 /* include_synonyms */
3271 rd_kafka_buf_write_i8(rkbuf, 1);
3272 }
3273
3274 /* timeout */
3275 op_timeout = rd_kafka_confval_get_int(&options->operation_timeout);
3276 if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms)
3277 rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout+1000, 0);
3278
3279 rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
3280
3281 rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
3282
3283 return RD_KAFKA_RESP_ERR_NO_ERROR;
3284}
3285
3286
3287
3288/**
3289 * @brief Parses and handles an InitProducerId reply.
3290 *
3291 * @returns 0 on success, else an error.
3292 *
3293 * @locality rdkafka main thread
3294 * @locks none
3295 */
void
rd_kafka_handle_InitProducerId (rd_kafka_t *rk,
                                rd_kafka_broker_t *rkb,
                                rd_kafka_resp_err_t err,
                                rd_kafka_buf_t *rkbuf,
                                rd_kafka_buf_t *request,
                                void *opaque) {
        /* Referenced by the rd_kafka_buf_read_*() macros' error path. */
        const int log_decode_errors = LOG_ERR;
        int16_t error_code;
        rd_kafka_pid_t pid;

        /* Transport/request-level error: nothing to parse. */
        if (err)
                goto err;

        rd_kafka_buf_read_throttle_time(rkbuf);

        /* Broker-reported error code. */
        rd_kafka_buf_read_i16(rkbuf, &error_code);
        if ((err = error_code))
                goto err;

        /* ProducerId + ProducerEpoch assigned by the broker. */
        rd_kafka_buf_read_i64(rkbuf, &pid.id);
        rd_kafka_buf_read_i16(rkbuf, &pid.epoch);

        /* Hand the new PID to the idempotence sub-system. */
        rd_kafka_idemp_pid_update(rkb, pid);

        return;

 err_parse:
        /* rd_kafka_buf_read_*() jumps here on buffer parse failure. */
        err = rkbuf->rkbuf_err;
 err:
        /* Retries are performed by idempotence state handler */
        rd_kafka_idemp_request_pid_failed(rkb, err);
}
3329
3330
3331/**
3332 * @brief Construct and send InitProducerIdRequest to \p rkb.
3333 *
3334 * \p transactional_id may be NULL.
3335 * \p transaction_timeout_ms may be set to -1.
3336 *
3337 * The response (unparsed) will be handled by \p resp_cb served
3338 * by queue \p replyq.
3339 *
3340 * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
3341 * transmission, otherwise an error code and errstr will be
3342 * updated with a human readable error string.
3343 */
3344rd_kafka_resp_err_t
3345rd_kafka_InitProducerIdRequest (rd_kafka_broker_t *rkb,
3346 const char *transactional_id,
3347 int transaction_timeout_ms,
3348 char *errstr, size_t errstr_size,
3349 rd_kafka_replyq_t replyq,
3350 rd_kafka_resp_cb_t *resp_cb,
3351 void *opaque) {
3352 rd_kafka_buf_t *rkbuf;
3353 int16_t ApiVersion = 0;
3354
3355 ApiVersion = rd_kafka_broker_ApiVersion_supported(
3356 rkb, RD_KAFKAP_InitProducerId, 0, 1, NULL);
3357 if (ApiVersion == -1) {
3358 rd_snprintf(errstr, errstr_size,
3359 "InitProducerId (KIP-98) not supported "
3360 "by broker, requires broker version >= 0.11.0");
3361 rd_kafka_replyq_destroy(&replyq);
3362 return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
3363 }
3364
3365 rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_InitProducerId, 1,
3366 2 + (transactional_id ?
3367 strlen(transactional_id) : 0) +
3368 4);
3369
3370 /* transactional_id */
3371 rd_kafka_buf_write_str(rkbuf, transactional_id, -1);
3372
3373 /* transaction_timeout_ms */
3374 rd_kafka_buf_write_i32(rkbuf, transaction_timeout_ms);
3375
3376 rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
3377
3378 /* Let the idempotence state handler perform retries */
3379 rkbuf->rkbuf_retries = RD_KAFKA_BUF_NO_RETRIES;
3380
3381 rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
3382
3383 return RD_KAFKA_RESP_ERR_NO_ERROR;
3384}
3385
3386
3387
3388/**
3389 * @name Unit tests
3390 * @{
3391 *
3392 *
3393 *
3394 *
3395 */
3396
3397/**
3398 * @brief Create \p cnt messages, starting at \p msgid, and add them
3399 * to \p rkmq.
3400 *
3401 * @returns the number of messages added.
3402 */
3403static int
3404ut_create_msgs (rd_kafka_msgq_t *rkmq, uint64_t msgid, int cnt) {
3405 int i;
3406
3407 for (i = 0 ; i < cnt ; i++) {
3408 rd_kafka_msg_t *rkm;
3409
3410 rkm = ut_rd_kafka_msg_new();
3411 rkm->rkm_u.producer.msgid = msgid++;
3412
3413 rd_kafka_msgq_enq(rkmq, rkm);
3414 }
3415
3416 return cnt;
3417}
3418
3419/**
3420 * @brief Idempotent Producer request/response unit tests
3421 *
3422 * The current test verifies proper handling of the following case:
3423 * Batch 0 succeeds
3424 * Batch 1 fails with temporary error
3425 * Batch 2,3 fails with out of order sequence
3426 * Retry Batch 1-3 should succeed.
3427 */
3428static int unittest_idempotent_producer (void) {
3429 rd_kafka_t *rk;
3430 rd_kafka_conf_t *conf;
3431 rd_kafka_broker_t *rkb;
3432#define _BATCH_CNT 4
3433#define _MSGS_PER_BATCH 3
3434 const int msgcnt = _BATCH_CNT * _MSGS_PER_BATCH;
3435 int remaining_batches;
3436 uint64_t msgid = 1;
3437 shptr_rd_kafka_toppar_t *s_rktp;
3438 rd_kafka_toppar_t *rktp;
3439 rd_kafka_pid_t pid = { .id = 1000, .epoch = 0 };
3440 struct rd_kafka_Produce_result result = {
3441 .offset = 1,
3442 .timestamp = 1000
3443 };
3444 rd_kafka_queue_t *rkqu;
3445 rd_kafka_event_t *rkev;
3446 rd_kafka_buf_t *request[_BATCH_CNT];
3447 int rcnt = 0;
3448 int retry_msg_cnt = 0;
3449 int drcnt = 0;
3450 rd_kafka_msgq_t rkmq = RD_KAFKA_MSGQ_INITIALIZER(rkmq);
3451 int i, r;
3452
3453 RD_UT_SAY("Verifying idempotent producer error handling");
3454
3455 conf = rd_kafka_conf_new();
3456 rd_kafka_conf_set(conf, "batch.num.messages", "3", NULL, 0);
3457 rd_kafka_conf_set(conf, "retry.backoff.ms", "1", NULL, 0);
3458 if (rd_kafka_conf_set(conf, "enable.idempotence", "true", NULL, 0) !=
3459 RD_KAFKA_CONF_OK)
3460 RD_UT_FAIL("Failed to enable idempotence");
3461 rd_kafka_conf_set_events(conf, RD_KAFKA_EVENT_DR);
3462
3463 rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, NULL, 0);
3464 RD_UT_ASSERT(rk, "failed to create producer");
3465
3466 rkqu = rd_kafka_queue_get_main(rk);
3467
3468 /* We need a broker handle, use a logical broker to avoid
3469 * any connection attempts. */
3470 rkb = rd_kafka_broker_add_logical(rk, "unittest");
3471
3472 /* Have the broker support everything so msgset_writer selects
3473 * the most up-to-date output features. */
3474 rd_kafka_broker_lock(rkb);
3475 rkb->rkb_features = RD_KAFKA_FEATURE_UNITTEST | RD_KAFKA_FEATURE_ALL;
3476 rd_kafka_broker_unlock(rkb);
3477
3478 /* Get toppar */
3479 s_rktp = rd_kafka_toppar_get2(rk, "uttopic", 0, rd_false, rd_true);
3480 RD_UT_ASSERT(s_rktp, "failed to get toppar");
3481 rktp = rd_kafka_toppar_s2i(s_rktp);
3482
3483 /* Set the topic as exists so messages are enqueued on
3484 * the desired rktp away (otherwise UA partition) */
3485 rd_ut_kafka_topic_set_topic_exists(rktp->rktp_rkt, 1, -1);
3486
3487 /* Produce messages */
3488 ut_create_msgs(&rkmq, 1, msgcnt);
3489
3490 /* Set the pid */
3491 rd_kafka_idemp_set_state(rk, RD_KAFKA_IDEMP_STATE_WAIT_PID);
3492 rd_kafka_idemp_pid_update(rkb, pid);
3493 pid = rd_kafka_idemp_get_pid(rk);
3494 RD_UT_ASSERT(rd_kafka_pid_valid(pid), "PID is invalid");
3495 rd_kafka_toppar_pid_change(rktp, pid, msgid);
3496
3497 remaining_batches = _BATCH_CNT;
3498
3499 /* Create a ProduceRequest for each batch */
3500 for (rcnt = 0 ; rcnt < remaining_batches ; rcnt++) {
3501 size_t msize;
3502 request[rcnt] = rd_kafka_msgset_create_ProduceRequest(
3503 rkb, rktp, &rkmq, rd_kafka_idemp_get_pid(rk), &msize);
3504 RD_UT_ASSERT(request[rcnt], "request #%d failed", rcnt);
3505 }
3506
3507 RD_UT_ASSERT(rd_kafka_msgq_len(&rkmq) == 0,
3508 "expected input message queue to be empty, "
3509 "but still has %d message(s)",
3510 rd_kafka_msgq_len(&rkmq));
3511
3512 /*
3513 * Mock handling of each request
3514 */
3515
3516 /* Batch 0: accepted */
3517 i = 0;
3518 r = rd_kafka_msgq_len(&request[i]->rkbuf_batch.msgq);
3519 RD_UT_ASSERT(r == _MSGS_PER_BATCH, ".");
3520 rd_kafka_msgbatch_handle_Produce_result(
3521 rkb, &request[i]->rkbuf_batch,
3522 RD_KAFKA_RESP_ERR_NO_ERROR,
3523 &result, request[i]);
3524 result.offset += r;
3525 RD_UT_ASSERT(rd_kafka_msgq_len(&rktp->rktp_msgq) == 0,
3526 "batch %d: expected no messages in rktp_msgq, not %d",
3527 i, rd_kafka_msgq_len(&rktp->rktp_msgq));
3528 rd_kafka_buf_destroy(request[i]);
3529 remaining_batches--;
3530
3531 /* Batch 1: fail, triggering retry (re-enq on rktp_msgq) */
3532 i = 1;
3533 r = rd_kafka_msgq_len(&request[i]->rkbuf_batch.msgq);
3534 RD_UT_ASSERT(r == _MSGS_PER_BATCH, ".");
3535 rd_kafka_msgbatch_handle_Produce_result(
3536 rkb, &request[i]->rkbuf_batch,
3537 RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
3538 &result, request[i]);
3539 retry_msg_cnt += r;
3540 RD_UT_ASSERT(rd_kafka_msgq_len(&rktp->rktp_msgq) == retry_msg_cnt,
3541 "batch %d: expected %d messages in rktp_msgq, not %d",
3542 i, retry_msg_cnt,
3543 rd_kafka_msgq_len(&rktp->rktp_msgq));
3544 rd_kafka_buf_destroy(request[i]);
3545
3546 /* Batch 2: OUT_OF_ORDER, triggering retry .. */
3547 i = 2;
3548 r = rd_kafka_msgq_len(&request[i]->rkbuf_batch.msgq);
3549 RD_UT_ASSERT(r == _MSGS_PER_BATCH, ".");
3550 rd_kafka_msgbatch_handle_Produce_result(
3551 rkb, &request[i]->rkbuf_batch,
3552 RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER,
3553 &result, request[i]);
3554 retry_msg_cnt += r;
3555 RD_UT_ASSERT(rd_kafka_msgq_len(&rktp->rktp_msgq) == retry_msg_cnt,
3556 "batch %d: expected %d messages in rktp_xmit_msgq, not %d",
3557 i, retry_msg_cnt,
3558 rd_kafka_msgq_len(&rktp->rktp_msgq));
3559 rd_kafka_buf_destroy(request[i]);
3560
3561 /* Batch 3: OUT_OF_ORDER, triggering retry .. */
3562 i = 3;
3563 r = rd_kafka_msgq_len(&request[i]->rkbuf_batch.msgq);
3564 rd_kafka_msgbatch_handle_Produce_result(
3565 rkb, &request[i]->rkbuf_batch,
3566 RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER,
3567 &result, request[i]);
3568 retry_msg_cnt += r;
3569 RD_UT_ASSERT(rd_kafka_msgq_len(&rktp->rktp_msgq) == retry_msg_cnt,
3570 "batch %d: expected %d messages in rktp_xmit_msgq, not %d",
3571 i, retry_msg_cnt,
3572 rd_kafka_msgq_len(&rktp->rktp_msgq));
3573 rd_kafka_buf_destroy(request[i]);
3574
3575
3576 /* Retried messages will have been moved to rktp_msgq,
3577 * move them back to our local queue. */
3578 rd_kafka_toppar_lock(rktp);
3579 rd_kafka_msgq_move(&rkmq, &rktp->rktp_msgq);
3580 rd_kafka_toppar_unlock(rktp);
3581
3582 RD_UT_ASSERT(rd_kafka_msgq_len(&rkmq) == retry_msg_cnt,
3583 "Expected %d messages in retry queue, not %d",
3584 retry_msg_cnt, rd_kafka_msgq_len(&rkmq));
3585
3586 /* Sleep a short while to make sure the retry backoff expires. */
3587 rd_usleep(5*1000, NULL); /* 5ms */
3588
3589 /*
3590 * Create requests for remaining batches.
3591 */
3592 for (rcnt = 0 ; rcnt < remaining_batches ; rcnt++) {
3593 size_t msize;
3594 request[rcnt] = rd_kafka_msgset_create_ProduceRequest(
3595 rkb, rktp, &rkmq, rd_kafka_idemp_get_pid(rk), &msize);
3596 RD_UT_ASSERT(request[rcnt],
3597 "Failed to create retry #%d (%d msgs in queue)",
3598 rcnt, rd_kafka_msgq_len(&rkmq));
3599 }
3600
3601 /*
3602 * Mock handling of each request, they will now succeed.
3603 */
3604 for (i = 0 ; i < rcnt ; i++) {
3605 r = rd_kafka_msgq_len(&request[i]->rkbuf_batch.msgq);
3606 rd_kafka_msgbatch_handle_Produce_result(
3607 rkb, &request[i]->rkbuf_batch,
3608 RD_KAFKA_RESP_ERR_NO_ERROR,
3609 &result, request[i]);
3610 result.offset += r;
3611 rd_kafka_buf_destroy(request[i]);
3612 }
3613
3614 retry_msg_cnt = 0;
3615 RD_UT_ASSERT(rd_kafka_msgq_len(&rktp->rktp_msgq) == retry_msg_cnt,
3616 "batch %d: expected %d messages in rktp_xmit_msgq, not %d",
3617 i, retry_msg_cnt,
3618 rd_kafka_msgq_len(&rktp->rktp_msgq));
3619
3620 /*
3621 * Wait for delivery reports, they should all be successful.
3622 */
3623 while ((rkev = rd_kafka_queue_poll(rkqu, 1000))) {
3624 const rd_kafka_message_t *rkmessage;
3625
3626 RD_UT_SAY("Got %s event with %d message(s)",
3627 rd_kafka_event_name(rkev),
3628 (int)rd_kafka_event_message_count(rkev));
3629
3630 while ((rkmessage = rd_kafka_event_message_next(rkev))) {
3631 RD_UT_SAY(" DR for message: %s: (persistence=%d)",
3632 rd_kafka_err2str(rkmessage->err),
3633 rd_kafka_message_status(rkmessage));
3634 if (rkmessage->err)
3635 RD_UT_WARN(" ^ Should not have failed");
3636 else
3637 drcnt++;
3638 }
3639 rd_kafka_event_destroy(rkev);
3640 }
3641
3642 /* Should be no more messages in queues */
3643 r = rd_kafka_outq_len(rk);
3644 RD_UT_ASSERT(r == 0, "expected outq to return 0, not %d", r);
3645
3646 /* Verify the expected number of good delivery reports were seen */
3647 RD_UT_ASSERT(drcnt == msgcnt,
3648 "expected %d DRs, not %d", msgcnt, drcnt);
3649
3650 rd_kafka_queue_destroy(rkqu);
3651 rd_kafka_toppar_destroy(s_rktp);
3652 rd_kafka_broker_destroy(rkb);
3653 rd_kafka_destroy(rk);
3654
3655 RD_UT_PASS();
3656 return 0;
3657}
3658
3659/**
3660 * @brief Request/response unit tests
3661 */
int unittest_request (void) {
        /* Currently a single test; returns its failure count
         * (0 on success). */
        return unittest_idempotent_producer();
}
3669
3670/**@}*/
3671