1 | /* |
2 | * librdkafka - Apache Kafka C library |
3 | * |
4 | * Copyright (c) 2012-2015, Magnus Edenhill |
5 | * All rights reserved. |
6 | * |
7 | * Redistribution and use in source and binary forms, with or without |
8 | * modification, are permitted provided that the following conditions are met: |
9 | * |
10 | * 1. Redistributions of source code must retain the above copyright notice, |
11 | * this list of conditions and the following disclaimer. |
12 | * 2. Redistributions in binary form must reproduce the above copyright notice, |
13 | * this list of conditions and the following disclaimer in the documentation |
14 | * and/or other materials provided with the distribution. |
15 | * |
16 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
17 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
18 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
19 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
20 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
21 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
22 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
23 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
24 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
25 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
26 | * POSSIBILITY OF SUCH DAMAGE. |
27 | */ |
28 | |
29 | #include <stdarg.h> |
30 | |
31 | #include "rdkafka_int.h" |
32 | #include "rdkafka_request.h" |
33 | #include "rdkafka_broker.h" |
34 | #include "rdkafka_offset.h" |
35 | #include "rdkafka_topic.h" |
36 | #include "rdkafka_partition.h" |
37 | #include "rdkafka_metadata.h" |
38 | #include "rdkafka_msgset.h" |
39 | #include "rdkafka_idempotence.h" |
40 | |
41 | #include "rdrand.h" |
42 | #include "rdstring.h" |
43 | #include "rdunittest.h" |
44 | |
45 | |
46 | /** |
47 | * Kafka protocol request and response handling. |
48 | * All of this code runs in the broker thread and uses op queues for |
49 | * propagating results back to the various sub-systems operating in |
50 | * other threads. |
51 | */ |
52 | |
53 | |
54 | /* RD_KAFKA_ERR_ACTION_.. to string map */ |
55 | static const char *rd_kafka_actions_descs[] = { |
        "Permanent",
        "Ignore",
        "Refresh",
        "Retry",
        "Inform",
        "Special",
        "MsgNotPersisted",
        "MsgPossiblyPersisted",
        "MsgPersisted",
65 | NULL, |
66 | }; |
67 | |
68 | static const char *rd_kafka_actions2str (int actions) { |
69 | static RD_TLS char actstr[128]; |
70 | return rd_flags2str(actstr, sizeof(actstr), |
71 | rd_kafka_actions_descs, |
72 | actions); |
73 | } |
74 | |
75 | |
76 | /** |
77 | * @brief Decide action(s) to take based on the returned error code. |
78 | * |
79 | * The optional var-args is a .._ACTION_END terminated list |
80 | * of action,error tuples which overrides the general behaviour. |
81 | * It is to be read as: for \p error, return \p action(s). |
82 | * |
83 | * @warning \p request, \p rkbuf and \p rkb may be NULL. |
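 *
 * Example (illustrative only, mirroring the handle_Offset caller below):
 *   actions = rd_kafka_err_action(
 *           rkb, ErrorCode, request,
 *           RD_KAFKA_ERR_ACTION_PERMANENT,
 *           RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART,
 *           RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_RETRY,
 *           RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
 *           RD_KAFKA_ERR_ACTION_END);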
84 | */ |
85 | int rd_kafka_err_action (rd_kafka_broker_t *rkb, |
86 | rd_kafka_resp_err_t err, |
87 | const rd_kafka_buf_t *request, ...) { |
88 | va_list ap; |
89 | int actions = 0; |
90 | int exp_act; |
91 | |
92 | if (!err) |
93 | return 0; |
94 | |
95 | /* Match explicitly defined error mappings first. */ |
96 | va_start(ap, request); |
97 | while ((exp_act = va_arg(ap, int))) { |
98 | int exp_err = va_arg(ap, int); |
99 | |
100 | if (err == exp_err) |
101 | actions |= exp_act; |
102 | } |
103 | va_end(ap); |
104 | |
105 | /* Explicit error match. */ |
106 | if (actions) { |
107 | if (err && rkb && request) |
                        rd_rkb_dbg(rkb, BROKER, "REQERR",
                                   "%sRequest failed: %s: explicit actions %s",
110 | rd_kafka_ApiKey2str(request->rkbuf_reqhdr. |
111 | ApiKey), |
112 | rd_kafka_err2str(err), |
113 | rd_kafka_actions2str(actions)); |
114 | |
115 | return actions; |
116 | } |
117 | |
118 | /* Default error matching */ |
119 | switch (err) |
120 | { |
121 | case RD_KAFKA_RESP_ERR_NO_ERROR: |
122 | break; |
123 | case RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE: |
124 | case RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION: |
125 | case RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE: |
126 | case RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE: |
127 | case RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE: |
128 | case RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP: |
129 | case RD_KAFKA_RESP_ERR__WAIT_COORD: |
130 | /* Request metadata information update */ |
131 | actions |= RD_KAFKA_ERR_ACTION_REFRESH| |
132 | RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED; |
133 | break; |
134 | |
135 | case RD_KAFKA_RESP_ERR__TIMED_OUT: |
136 | case RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT: |
137 | case RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND: |
138 | actions |= RD_KAFKA_ERR_ACTION_RETRY| |
139 | RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED; |
140 | break; |
141 | |
142 | case RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE: |
143 | /* Client-side wait-response/in-queue timeout */ |
144 | case RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS: |
145 | case RD_KAFKA_RESP_ERR__TRANSPORT: |
146 | actions |= RD_KAFKA_ERR_ACTION_RETRY| |
147 | RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED; |
148 | break; |
149 | |
150 | case RD_KAFKA_RESP_ERR__PURGE_INFLIGHT: |
151 | actions |= RD_KAFKA_ERR_ACTION_PERMANENT| |
152 | RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED; |
153 | break; |
154 | |
155 | case RD_KAFKA_RESP_ERR__DESTROY: |
156 | case RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT: |
157 | case RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE: |
158 | case RD_KAFKA_RESP_ERR__PURGE_QUEUE: |
159 | default: |
160 | actions |= RD_KAFKA_ERR_ACTION_PERMANENT| |
161 | RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED; |
162 | break; |
163 | } |
164 | |
165 | /* If no request buffer was specified, which might be the case |
166 | * in certain error call chains, mask out the retry action. */ |
167 | if (!request) |
168 | actions &= ~RD_KAFKA_ERR_ACTION_RETRY; |
169 | else if (request->rkbuf_reqhdr.ApiKey != RD_KAFKAP_Produce) |
170 | /* Mask out message-related bits for non-Produce requests */ |
171 | actions &= ~RD_KAFKA_ERR_ACTION_MSG_FLAGS; |
172 | |
173 | if (err && actions && rkb && request) |
                rd_rkb_dbg(rkb, BROKER, "REQERR",
                           "%sRequest failed: %s: actions %s",
176 | rd_kafka_ApiKey2str(request->rkbuf_reqhdr.ApiKey), |
177 | rd_kafka_err2str(err), |
178 | rd_kafka_actions2str(actions)); |
179 | |
180 | return actions; |
181 | } |
182 | |
183 | |
184 | /** |
185 | * Send GroupCoordinatorRequest |
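 *
 * The (v0) request body written below consists solely of the GroupId
 * string \p cgrp; the response is delivered to \p resp_cb on \p replyq.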
186 | */ |
187 | void rd_kafka_GroupCoordinatorRequest (rd_kafka_broker_t *rkb, |
188 | const rd_kafkap_str_t *cgrp, |
189 | rd_kafka_replyq_t replyq, |
190 | rd_kafka_resp_cb_t *resp_cb, |
191 | void *opaque) { |
192 | rd_kafka_buf_t *rkbuf; |
193 | |
194 | rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_GroupCoordinator, 1, |
195 | RD_KAFKAP_STR_SIZE(cgrp)); |
196 | rd_kafka_buf_write_kstr(rkbuf, cgrp); |
197 | |
198 | rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); |
199 | } |
200 | |
201 | |
202 | |
203 | |
204 | /** |
205 | * @brief Parses and handles Offset replies. |
206 | * |
207 | * Returns the parsed offsets (and errors) in \p offsets |
208 | * |
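 * Response layout handled below (summary for reference):
 *   v0: [Topic [Partition ErrorCode [Offset..]]]
 *   v1: [Topic [Partition ErrorCode Timestamp Offset]]
 *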
209 | * @returns 0 on success, else an error. |
210 | */ |
211 | rd_kafka_resp_err_t rd_kafka_handle_Offset (rd_kafka_t *rk, |
212 | rd_kafka_broker_t *rkb, |
213 | rd_kafka_resp_err_t err, |
214 | rd_kafka_buf_t *rkbuf, |
215 | rd_kafka_buf_t *request, |
216 | rd_kafka_topic_partition_list_t |
217 | *offsets) { |
218 | |
219 | const int log_decode_errors = LOG_ERR; |
220 | int16_t ErrorCode = 0; |
221 | int32_t TopicArrayCnt; |
222 | int actions; |
223 | int16_t api_version; |
224 | |
225 | if (err) { |
226 | ErrorCode = err; |
227 | goto err; |
228 | } |
229 | |
230 | api_version = request->rkbuf_reqhdr.ApiVersion; |
231 | |
232 | /* NOTE: |
233 | * Broker may return offsets in a different constellation than |
         * in the original request. */
235 | |
236 | rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt); |
237 | while (TopicArrayCnt-- > 0) { |
238 | rd_kafkap_str_t ktopic; |
239 | int32_t PartArrayCnt; |
240 | char *topic_name; |
241 | |
242 | rd_kafka_buf_read_str(rkbuf, &ktopic); |
243 | rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt); |
244 | |
245 | RD_KAFKAP_STR_DUPA(&topic_name, &ktopic); |
246 | |
247 | while (PartArrayCnt-- > 0) { |
248 | int32_t kpartition; |
249 | int32_t OffsetArrayCnt; |
250 | int64_t Offset = -1; |
251 | rd_kafka_topic_partition_t *rktpar; |
252 | |
253 | rd_kafka_buf_read_i32(rkbuf, &kpartition); |
254 | rd_kafka_buf_read_i16(rkbuf, &ErrorCode); |
255 | |
256 | if (api_version == 1) { |
257 | int64_t Timestamp; |
258 | rd_kafka_buf_read_i64(rkbuf, &Timestamp); |
259 | rd_kafka_buf_read_i64(rkbuf, &Offset); |
260 | } else if (api_version == 0) { |
261 | rd_kafka_buf_read_i32(rkbuf, &OffsetArrayCnt); |
262 | /* We only request one offset so just grab |
263 | * the first one. */ |
264 | while (OffsetArrayCnt-- > 0) |
265 | rd_kafka_buf_read_i64(rkbuf, &Offset); |
266 | } else { |
                        rd_kafka_assert(NULL, !*"NOTREACHED");
268 | } |
269 | |
270 | rktpar = rd_kafka_topic_partition_list_add( |
271 | offsets, topic_name, kpartition); |
272 | rktpar->err = ErrorCode; |
273 | rktpar->offset = Offset; |
274 | } |
275 | } |
276 | |
277 | goto done; |
278 | |
279 | err_parse: |
280 | ErrorCode = rkbuf->rkbuf_err; |
281 | err: |
282 | actions = rd_kafka_err_action( |
283 | rkb, ErrorCode, request, |
284 | RD_KAFKA_ERR_ACTION_PERMANENT, |
285 | RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART, |
286 | |
287 | RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_RETRY, |
288 | RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION, |
289 | |
290 | RD_KAFKA_ERR_ACTION_END); |
291 | |
292 | if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { |
293 | char tmp[256]; |
294 | /* Re-query for leader */ |
295 | rd_snprintf(tmp, sizeof(tmp), |
                            "OffsetRequest failed: %s",
297 | rd_kafka_err2str(ErrorCode)); |
298 | rd_kafka_metadata_refresh_known_topics(rk, NULL, 1/*force*/, |
299 | tmp); |
300 | } |
301 | |
302 | if (actions & RD_KAFKA_ERR_ACTION_RETRY) { |
303 | if (rd_kafka_buf_retry(rkb, request)) |
304 | return RD_KAFKA_RESP_ERR__IN_PROGRESS; |
305 | /* FALLTHRU */ |
306 | } |
307 | |
308 | done: |
309 | return ErrorCode; |
310 | } |
311 | |
312 | |
313 | |
314 | |
315 | |
316 | |
317 | /** |
 * Send OffsetRequest for the partitions in \p partitions.
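 *
 * Request layout written below (summary for reference):
 *   ReplicaId(-1), [Topic [Partition Time(=offset) {v0: MaxNumberOfOffsets=1}]]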
319 | */ |
320 | void rd_kafka_OffsetRequest (rd_kafka_broker_t *rkb, |
321 | rd_kafka_topic_partition_list_t *partitions, |
322 | int16_t api_version, |
323 | rd_kafka_replyq_t replyq, |
324 | rd_kafka_resp_cb_t *resp_cb, |
325 | void *opaque) { |
326 | rd_kafka_buf_t *rkbuf; |
327 | int i; |
328 | size_t of_TopicArrayCnt = 0, of_PartArrayCnt = 0; |
        const char *last_topic = "";
330 | int32_t topic_cnt = 0, part_cnt = 0; |
331 | |
332 | rd_kafka_topic_partition_list_sort_by_topic(partitions); |
333 | |
334 | rkbuf = rd_kafka_buf_new_request( |
335 | rkb, RD_KAFKAP_Offset, 1, |
336 | /* ReplicaId+TopicArrayCnt+Topic */ |
337 | 4+4+100+ |
338 | /* PartArrayCnt */ |
339 | 4 + |
340 | /* partition_cnt * Partition+Time+MaxNumOffs */ |
341 | (partitions->cnt * (4+8+4))); |
342 | |
343 | /* ReplicaId */ |
344 | rd_kafka_buf_write_i32(rkbuf, -1); |
345 | /* TopicArrayCnt */ |
346 | of_TopicArrayCnt = rd_kafka_buf_write_i32(rkbuf, 0); /* updated later */ |
347 | |
348 | for (i = 0 ; i < partitions->cnt ; i++) { |
349 | const rd_kafka_topic_partition_t *rktpar = &partitions->elems[i]; |
350 | |
351 | if (strcmp(rktpar->topic, last_topic)) { |
352 | /* Finish last topic, if any. */ |
353 | if (of_PartArrayCnt > 0) |
354 | rd_kafka_buf_update_i32(rkbuf, |
355 | of_PartArrayCnt, |
356 | part_cnt); |
357 | |
358 | /* Topic */ |
359 | rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1); |
360 | topic_cnt++; |
361 | last_topic = rktpar->topic; |
362 | /* New topic so reset partition count */ |
363 | part_cnt = 0; |
364 | |
365 | /* PartitionArrayCnt: updated later */ |
366 | of_PartArrayCnt = rd_kafka_buf_write_i32(rkbuf, 0); |
367 | } |
368 | |
369 | /* Partition */ |
370 | rd_kafka_buf_write_i32(rkbuf, rktpar->partition); |
371 | part_cnt++; |
372 | |
373 | /* Time/Offset */ |
374 | rd_kafka_buf_write_i64(rkbuf, rktpar->offset); |
375 | |
376 | if (api_version == 0) { |
377 | /* MaxNumberOfOffsets */ |
378 | rd_kafka_buf_write_i32(rkbuf, 1); |
379 | } |
380 | } |
381 | |
382 | if (of_PartArrayCnt > 0) { |
383 | rd_kafka_buf_update_i32(rkbuf, of_PartArrayCnt, part_cnt); |
384 | rd_kafka_buf_update_i32(rkbuf, of_TopicArrayCnt, topic_cnt); |
385 | } |
386 | |
387 | rd_kafka_buf_ApiVersion_set(rkbuf, api_version, |
388 | api_version == 1 ? |
389 | RD_KAFKA_FEATURE_OFFSET_TIME : 0); |
390 | |
        rd_rkb_dbg(rkb, TOPIC, "OFFSET",
                   "OffsetRequest (v%hd, opv %d) "
                   "for %"PRId32" topic(s) and %"PRId32" partition(s)",
394 | api_version, rkbuf->rkbuf_replyq.version, |
395 | topic_cnt, partitions->cnt); |
396 | |
397 | rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); |
398 | } |
399 | |
400 | |
401 | /** |
402 | * Generic handler for OffsetFetch responses. |
403 | * Offsets for included partitions will be propagated through the passed |
404 | * 'offsets' list. |
405 | * |
406 | * \p update_toppar: update toppar's committed_offset |
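 *
 * Each partition in the response carries Partition, Offset, Metadata and
 * ErrorCode, which are copied into the matching entry of \p offsets below
 * (a broker offset of -1 is mapped to RD_KAFKA_OFFSET_INVALID).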
407 | */ |
408 | rd_kafka_resp_err_t |
409 | rd_kafka_handle_OffsetFetch (rd_kafka_t *rk, |
410 | rd_kafka_broker_t *rkb, |
411 | rd_kafka_resp_err_t err, |
412 | rd_kafka_buf_t *rkbuf, |
413 | rd_kafka_buf_t *request, |
414 | rd_kafka_topic_partition_list_t *offsets, |
415 | int update_toppar) { |
416 | const int log_decode_errors = LOG_ERR; |
417 | int32_t TopicArrayCnt; |
418 | int64_t offset = RD_KAFKA_OFFSET_INVALID; |
419 | rd_kafkap_str_t metadata; |
420 | int i; |
421 | int actions; |
422 | int seen_cnt = 0; |
423 | |
424 | if (err) |
425 | goto err; |
426 | |
427 | /* Set default offset for all partitions. */ |
428 | rd_kafka_topic_partition_list_set_offsets(rkb->rkb_rk, offsets, 0, |
429 | RD_KAFKA_OFFSET_INVALID, |
430 | 0 /* !is commit */); |
431 | |
432 | rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt); |
433 | for (i = 0 ; i < TopicArrayCnt ; i++) { |
434 | rd_kafkap_str_t topic; |
435 | int32_t PartArrayCnt; |
436 | char *topic_name; |
437 | int j; |
438 | |
439 | rd_kafka_buf_read_str(rkbuf, &topic); |
440 | rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt); |
441 | |
442 | RD_KAFKAP_STR_DUPA(&topic_name, &topic); |
443 | |
444 | for (j = 0 ; j < PartArrayCnt ; j++) { |
445 | int32_t partition; |
446 | shptr_rd_kafka_toppar_t *s_rktp; |
447 | rd_kafka_topic_partition_t *rktpar; |
448 | int16_t err2; |
449 | |
450 | rd_kafka_buf_read_i32(rkbuf, &partition); |
451 | rd_kafka_buf_read_i64(rkbuf, &offset); |
452 | rd_kafka_buf_read_str(rkbuf, &metadata); |
453 | rd_kafka_buf_read_i16(rkbuf, &err2); |
454 | |
455 | rktpar = rd_kafka_topic_partition_list_find(offsets, |
456 | topic_name, |
457 | partition); |
458 | if (!rktpar) { |
                                rd_rkb_dbg(rkb, TOPIC, "OFFSETFETCH",
                                           "OffsetFetchResponse: %s [%"PRId32"] "
                                           "not found in local list: ignoring",
462 | topic_name, partition); |
463 | continue; |
464 | } |
465 | |
466 | seen_cnt++; |
467 | |
468 | if (!(s_rktp = rktpar->_private)) { |
469 | s_rktp = rd_kafka_toppar_get2(rkb->rkb_rk, |
470 | topic_name, |
471 | partition, 0, 0); |
472 | /* May be NULL if topic is not locally known */ |
473 | rktpar->_private = s_rktp; |
474 | } |
475 | |
476 | /* broker reports invalid offset as -1 */ |
477 | if (offset == -1) |
478 | rktpar->offset = RD_KAFKA_OFFSET_INVALID; |
479 | else |
480 | rktpar->offset = offset; |
481 | rktpar->err = err2; |
482 | |
                        rd_rkb_dbg(rkb, TOPIC, "OFFSETFETCH",
484 | "OffsetFetchResponse: %s [%" PRId32"] offset %" PRId64, |
485 | topic_name, partition, offset); |
486 | |
487 | if (update_toppar && !err2 && s_rktp) { |
488 | rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp); |
489 | /* Update toppar's committed offset */ |
490 | rd_kafka_toppar_lock(rktp); |
491 | rktp->rktp_committed_offset = rktpar->offset; |
492 | rd_kafka_toppar_unlock(rktp); |
493 | } |
494 | |
495 | |
496 | if (rktpar->metadata) |
497 | rd_free(rktpar->metadata); |
498 | |
499 | if (RD_KAFKAP_STR_IS_NULL(&metadata)) { |
500 | rktpar->metadata = NULL; |
501 | rktpar->metadata_size = 0; |
502 | } else { |
503 | rktpar->metadata = RD_KAFKAP_STR_DUP(&metadata); |
504 | rktpar->metadata_size = |
505 | RD_KAFKAP_STR_LEN(&metadata); |
506 | } |
507 | } |
508 | } |
509 | |
510 | |
511 | err: |
        rd_rkb_dbg(rkb, TOPIC, "OFFSETFETCH",
                   "OffsetFetch for %d/%d partition(s) returned %s",
514 | seen_cnt, |
515 | offsets ? offsets->cnt : -1, rd_kafka_err2str(err)); |
516 | |
517 | actions = rd_kafka_err_action(rkb, err, request, |
518 | RD_KAFKA_ERR_ACTION_END); |
519 | |
520 | if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { |
521 | /* Re-query for coordinator */ |
522 | rd_kafka_cgrp_op(rkb->rkb_rk->rk_cgrp, NULL, |
523 | RD_KAFKA_NO_REPLYQ, |
524 | RD_KAFKA_OP_COORD_QUERY, err); |
525 | } |
526 | |
527 | if (actions & RD_KAFKA_ERR_ACTION_RETRY) { |
528 | if (rd_kafka_buf_retry(rkb, request)) |
529 | return RD_KAFKA_RESP_ERR__IN_PROGRESS; |
530 | /* FALLTHRU */ |
531 | } |
532 | |
533 | return err; |
534 | |
535 | err_parse: |
536 | err = rkbuf->rkbuf_err; |
537 | goto err; |
538 | } |
539 | |
540 | |
541 | |
542 | /** |
543 | * @brief Handle OffsetFetch response based on an RD_KAFKA_OP_OFFSET_FETCH |
544 | * rko in \p opaque. |
545 | * |
546 | * @param opaque rko wrapper for handle_OffsetFetch. |
547 | * |
548 | * The \c rko->rko_u.offset_fetch.partitions list will be filled in with |
549 | * the fetched offsets. |
550 | * |
551 | * A reply will be sent on 'rko->rko_replyq' with type RD_KAFKA_OP_OFFSET_FETCH. |
552 | * |
553 | * @remark \p rkb, \p rkbuf and \p request are optional. |
554 | * |
555 | * @remark The \p request buffer may be retried on error. |
556 | * |
557 | * @locality cgrp's broker thread |
558 | */ |
559 | void rd_kafka_op_handle_OffsetFetch (rd_kafka_t *rk, |
560 | rd_kafka_broker_t *rkb, |
561 | rd_kafka_resp_err_t err, |
562 | rd_kafka_buf_t *rkbuf, |
563 | rd_kafka_buf_t *request, |
564 | void *opaque) { |
565 | rd_kafka_op_t *rko = opaque; |
566 | rd_kafka_op_t *rko_reply; |
567 | rd_kafka_topic_partition_list_t *offsets; |
568 | |
569 | RD_KAFKA_OP_TYPE_ASSERT(rko, RD_KAFKA_OP_OFFSET_FETCH); |
570 | |
571 | if (err == RD_KAFKA_RESP_ERR__DESTROY) { |
572 | /* Termination, quick cleanup. */ |
573 | rd_kafka_op_destroy(rko); |
574 | return; |
575 | } |
576 | |
577 | offsets = rd_kafka_topic_partition_list_copy( |
578 | rko->rko_u.offset_fetch.partitions); |
579 | |
580 | /* If all partitions already had usable offsets then there |
581 | * was no request sent and thus no reply, the offsets list is |
582 | * good to go.. */ |
583 | if (rkbuf) { |
                /* ..else parse the response (or the parse error) */
585 | err = rd_kafka_handle_OffsetFetch(rkb->rkb_rk, rkb, err, rkbuf, |
586 | request, offsets, 0); |
587 | if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) { |
588 | rd_kafka_topic_partition_list_destroy(offsets); |
589 | return; /* Retrying */ |
590 | } |
591 | } |
592 | |
593 | rko_reply = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH|RD_KAFKA_OP_REPLY); |
594 | rko_reply->rko_err = err; |
595 | rko_reply->rko_u.offset_fetch.partitions = offsets; |
596 | rko_reply->rko_u.offset_fetch.do_free = 1; |
597 | if (rko->rko_rktp) |
598 | rko_reply->rko_rktp = rd_kafka_toppar_keep( |
599 | rd_kafka_toppar_s2i(rko->rko_rktp)); |
600 | |
601 | rd_kafka_replyq_enq(&rko->rko_replyq, rko_reply, 0); |
602 | |
603 | rd_kafka_op_destroy(rko); |
604 | } |
605 | |
606 | |
607 | |
608 | |
609 | |
610 | |
611 | /** |
 * Send OffsetFetchRequest for the partitions in \p parts.
 *
 * Any partition with a usable offset will be ignored; if all partitions
 * have usable offsets then no request is sent at all and an empty
 * reply is enqueued on the replyq instead.
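 *
 * Request layout written below (summary for reference):
 *   ConsumerGroup, [Topic [Partition]]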
617 | */ |
618 | void rd_kafka_OffsetFetchRequest (rd_kafka_broker_t *rkb, |
619 | int16_t api_version, |
620 | rd_kafka_topic_partition_list_t *parts, |
621 | rd_kafka_replyq_t replyq, |
622 | rd_kafka_resp_cb_t *resp_cb, |
623 | void *opaque) { |
624 | rd_kafka_buf_t *rkbuf; |
625 | size_t of_TopicCnt; |
626 | int TopicCnt = 0; |
627 | ssize_t of_PartCnt = -1; |
628 | const char *last_topic = NULL; |
629 | int PartCnt = 0; |
630 | int tot_PartCnt = 0; |
631 | int i; |
632 | |
633 | rkbuf = rd_kafka_buf_new_request( |
634 | rkb, RD_KAFKAP_OffsetFetch, 1, |
635 | RD_KAFKAP_STR_SIZE(rkb->rkb_rk->rk_group_id) + |
636 | 4 + |
637 | (parts->cnt * 32)); |
638 | |
639 | |
640 | /* ConsumerGroup */ |
641 | rd_kafka_buf_write_kstr(rkbuf, rkb->rkb_rk->rk_group_id); |
642 | |
643 | /* Sort partitions by topic */ |
644 | rd_kafka_topic_partition_list_sort_by_topic(parts); |
645 | |
646 | /* TopicArrayCnt */ |
647 | of_TopicCnt = rd_kafka_buf_write_i32(rkbuf, 0); /* Updated later */ |
648 | |
649 | for (i = 0 ; i < parts->cnt ; i++) { |
650 | rd_kafka_topic_partition_t *rktpar = &parts->elems[i]; |
651 | |
652 | /* Ignore partitions with a usable offset. */ |
653 | if (rktpar->offset != RD_KAFKA_OFFSET_INVALID && |
654 | rktpar->offset != RD_KAFKA_OFFSET_STORED) { |
                        rd_rkb_dbg(rkb, TOPIC, "OFFSET",
                                   "OffsetFetchRequest: skipping %s [%"PRId32"] "
                                   "with valid offset %s",
658 | rktpar->topic, rktpar->partition, |
659 | rd_kafka_offset2str(rktpar->offset)); |
660 | continue; |
661 | } |
662 | |
663 | if (last_topic == NULL || strcmp(last_topic, rktpar->topic)) { |
664 | /* New topic */ |
665 | |
666 | /* Finalize previous PartitionCnt */ |
667 | if (PartCnt > 0) |
668 | rd_kafka_buf_update_u32(rkbuf, of_PartCnt, |
669 | PartCnt); |
670 | |
671 | /* TopicName */ |
672 | rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1); |
673 | /* PartitionCnt, finalized later */ |
674 | of_PartCnt = rd_kafka_buf_write_i32(rkbuf, 0); |
675 | PartCnt = 0; |
676 | last_topic = rktpar->topic; |
677 | TopicCnt++; |
678 | } |
679 | |
680 | /* Partition */ |
681 | rd_kafka_buf_write_i32(rkbuf, rktpar->partition); |
682 | PartCnt++; |
683 | tot_PartCnt++; |
684 | } |
685 | |
686 | /* Finalize previous PartitionCnt */ |
687 | if (PartCnt > 0) |
688 | rd_kafka_buf_update_u32(rkbuf, of_PartCnt, PartCnt); |
689 | |
690 | /* Finalize TopicCnt */ |
691 | rd_kafka_buf_update_u32(rkbuf, of_TopicCnt, TopicCnt); |
692 | |
693 | rd_kafka_buf_ApiVersion_set(rkbuf, api_version, 0); |
694 | |
        rd_rkb_dbg(rkb, TOPIC, "OFFSET",
                   "OffsetFetchRequest(v%d) for %d/%d partition(s)",
697 | api_version, tot_PartCnt, parts->cnt); |
698 | |
699 | if (tot_PartCnt == 0) { |
                /* No partitions need OffsetFetch, enqueue an empty
                 * response right away. */
702 | rkbuf->rkbuf_replyq = replyq; |
703 | rkbuf->rkbuf_cb = resp_cb; |
704 | rkbuf->rkbuf_opaque = opaque; |
705 | rd_kafka_buf_callback(rkb->rkb_rk, rkb, 0, NULL, rkbuf); |
706 | return; |
707 | } |
708 | |
        rd_rkb_dbg(rkb, CGRP|RD_KAFKA_DBG_CONSUMER, "OFFSET",
                   "Fetch committed offsets for %d/%d partition(s)",
711 | tot_PartCnt, parts->cnt); |
712 | |
713 | |
714 | rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); |
715 | } |
716 | |
717 | |
718 | /** |
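 * Parses the per-partition ErrorCodes into the corresponding entries of
 * \p offsets; if all partitions failed, the last partition's error code is
 * returned as the global error.
 *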
719 | * @remark \p offsets may be NULL if \p err is set |
720 | */ |
721 | rd_kafka_resp_err_t |
722 | rd_kafka_handle_OffsetCommit (rd_kafka_t *rk, |
723 | rd_kafka_broker_t *rkb, |
724 | rd_kafka_resp_err_t err, |
725 | rd_kafka_buf_t *rkbuf, |
726 | rd_kafka_buf_t *request, |
727 | rd_kafka_topic_partition_list_t *offsets) { |
728 | const int log_decode_errors = LOG_ERR; |
729 | int32_t TopicArrayCnt; |
730 | int16_t ErrorCode = 0, last_ErrorCode = 0; |
731 | int errcnt = 0; |
732 | int i; |
733 | int actions; |
734 | |
735 | if (err) |
736 | goto err; |
737 | |
738 | rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt); |
739 | for (i = 0 ; i < TopicArrayCnt ; i++) { |
740 | rd_kafkap_str_t topic; |
741 | char *topic_str; |
742 | int32_t PartArrayCnt; |
743 | int j; |
744 | |
745 | rd_kafka_buf_read_str(rkbuf, &topic); |
746 | rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt); |
747 | |
748 | RD_KAFKAP_STR_DUPA(&topic_str, &topic); |
749 | |
750 | for (j = 0 ; j < PartArrayCnt ; j++) { |
751 | int32_t partition; |
752 | rd_kafka_topic_partition_t *rktpar; |
753 | |
754 | rd_kafka_buf_read_i32(rkbuf, &partition); |
755 | rd_kafka_buf_read_i16(rkbuf, &ErrorCode); |
756 | |
757 | rktpar = rd_kafka_topic_partition_list_find( |
758 | offsets, topic_str, partition); |
759 | |
760 | if (!rktpar) { |
761 | /* Received offset for topic/partition we didn't |
762 | * ask for, this shouldn't really happen. */ |
763 | continue; |
764 | } |
765 | |
766 | rktpar->err = ErrorCode; |
767 | if (ErrorCode) { |
768 | last_ErrorCode = ErrorCode; |
769 | errcnt++; |
770 | } |
771 | } |
772 | } |
773 | |
774 | /* If all partitions failed use error code |
775 | * from last partition as the global error. */ |
776 | if (offsets && errcnt == offsets->cnt) |
777 | err = last_ErrorCode; |
778 | goto done; |
779 | |
780 | err_parse: |
781 | err = rkbuf->rkbuf_err; |
782 | |
783 | err: |
784 | actions = rd_kafka_err_action( |
785 | rkb, err, request, |
786 | |
787 | RD_KAFKA_ERR_ACTION_PERMANENT, |
788 | RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE, |
789 | |
790 | RD_KAFKA_ERR_ACTION_RETRY, |
791 | RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS, |
792 | |
793 | RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_SPECIAL, |
794 | RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE, |
795 | |
796 | RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_SPECIAL, |
797 | RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP, |
798 | |
799 | RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_RETRY, |
800 | RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION, |
801 | |
802 | RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_RETRY, |
803 | RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID, |
804 | |
805 | RD_KAFKA_ERR_ACTION_RETRY, |
806 | RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS, |
807 | |
808 | RD_KAFKA_ERR_ACTION_PERMANENT, |
809 | RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE, |
810 | |
811 | RD_KAFKA_ERR_ACTION_PERMANENT, |
812 | RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED, |
813 | |
814 | RD_KAFKA_ERR_ACTION_PERMANENT, |
815 | RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED, |
816 | |
817 | RD_KAFKA_ERR_ACTION_END); |
818 | |
819 | if (actions & RD_KAFKA_ERR_ACTION_REFRESH && rk->rk_cgrp) { |
820 | /* Mark coordinator dead or re-query for coordinator. |
821 | * ..dead() will trigger a re-query. */ |
822 | if (actions & RD_KAFKA_ERR_ACTION_SPECIAL) |
                        rd_kafka_cgrp_coord_dead(rk->rk_cgrp, err,
                                                 "OffsetCommitRequest failed");
                else
                        rd_kafka_cgrp_coord_query(rk->rk_cgrp,
                                                  "OffsetCommitRequest failed");
828 | } |
829 | if (actions & RD_KAFKA_ERR_ACTION_RETRY) { |
830 | if (rd_kafka_buf_retry(rkb, request)) |
831 | return RD_KAFKA_RESP_ERR__IN_PROGRESS; |
832 | /* FALLTHRU */ |
833 | } |
834 | |
835 | done: |
836 | return err; |
837 | } |
838 | |
839 | |
840 | |
841 | |
842 | /** |
843 | * @brief Send OffsetCommitRequest for a list of partitions. |
844 | * |
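 * Request layout written below (summary for reference):
 *   ConsumerGroup, {v1+: GenerationId ConsumerId}, {v2: RetentionTime(-1)},
 *   [Topic [Partition Offset {v1: Timestamp(-1)} Metadata]]
 *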
845 | * @returns 0 if none of the partitions in \p offsets had valid offsets, |
846 | * else 1. |
847 | */ |
848 | int rd_kafka_OffsetCommitRequest (rd_kafka_broker_t *rkb, |
849 | rd_kafka_cgrp_t *rkcg, |
850 | int16_t api_version, |
851 | rd_kafka_topic_partition_list_t *offsets, |
852 | rd_kafka_replyq_t replyq, |
853 | rd_kafka_resp_cb_t *resp_cb, |
854 | void *opaque, const char *reason) { |
855 | rd_kafka_buf_t *rkbuf; |
856 | ssize_t of_TopicCnt = -1; |
857 | int TopicCnt = 0; |
858 | const char *last_topic = NULL; |
859 | ssize_t of_PartCnt = -1; |
860 | int PartCnt = 0; |
861 | int tot_PartCnt = 0; |
862 | int i; |
863 | |
864 | rd_kafka_assert(NULL, offsets != NULL); |
865 | |
866 | rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_OffsetCommit, |
867 | 1, 100 + (offsets->cnt * 128)); |
868 | |
869 | /* ConsumerGroup */ |
870 | rd_kafka_buf_write_kstr(rkbuf, rkcg->rkcg_group_id); |
871 | |
872 | /* v1,v2 */ |
873 | if (api_version >= 1) { |
874 | /* ConsumerGroupGenerationId */ |
875 | rd_kafka_buf_write_i32(rkbuf, rkcg->rkcg_generation_id); |
876 | /* ConsumerId */ |
877 | rd_kafka_buf_write_kstr(rkbuf, rkcg->rkcg_member_id); |
878 | /* v2: RetentionTime */ |
879 | if (api_version == 2) |
880 | rd_kafka_buf_write_i64(rkbuf, -1); |
881 | } |
882 | |
883 | /* Sort offsets by topic */ |
884 | rd_kafka_topic_partition_list_sort_by_topic(offsets); |
885 | |
886 | /* TopicArrayCnt: Will be updated when we know the number of topics. */ |
887 | of_TopicCnt = rd_kafka_buf_write_i32(rkbuf, 0); |
888 | |
889 | for (i = 0 ; i < offsets->cnt ; i++) { |
890 | rd_kafka_topic_partition_t *rktpar = &offsets->elems[i]; |
891 | |
892 | /* Skip partitions with invalid offset. */ |
893 | if (rktpar->offset < 0) |
894 | continue; |
895 | |
896 | if (last_topic == NULL || strcmp(last_topic, rktpar->topic)) { |
897 | /* New topic */ |
898 | |
899 | /* Finalize previous PartitionCnt */ |
900 | if (PartCnt > 0) |
901 | rd_kafka_buf_update_u32(rkbuf, of_PartCnt, |
902 | PartCnt); |
903 | |
904 | /* TopicName */ |
905 | rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1); |
906 | /* PartitionCnt, finalized later */ |
907 | of_PartCnt = rd_kafka_buf_write_i32(rkbuf, 0); |
908 | PartCnt = 0; |
909 | last_topic = rktpar->topic; |
910 | TopicCnt++; |
911 | } |
912 | |
913 | /* Partition */ |
914 | rd_kafka_buf_write_i32(rkbuf, rktpar->partition); |
915 | PartCnt++; |
916 | tot_PartCnt++; |
917 | |
918 | /* Offset */ |
919 | rd_kafka_buf_write_i64(rkbuf, rktpar->offset); |
920 | |
921 | /* v1: TimeStamp */ |
922 | if (api_version == 1) |
923 | rd_kafka_buf_write_i64(rkbuf, -1);// FIXME: retention time |
924 | |
925 | /* Metadata */ |
926 | /* Java client 0.9.0 and broker <0.10.0 can't parse |
927 | * Null metadata fields, so as a workaround we send an |
928 | * empty string if it's Null. */ |
929 | if (!rktpar->metadata) |
                        rd_kafka_buf_write_str(rkbuf, "", 0);
931 | else |
932 | rd_kafka_buf_write_str(rkbuf, |
933 | rktpar->metadata, |
934 | rktpar->metadata_size); |
935 | } |
936 | |
937 | if (tot_PartCnt == 0) { |
938 | /* No topic+partitions had valid offsets to commit. */ |
939 | rd_kafka_replyq_destroy(&replyq); |
940 | rd_kafka_buf_destroy(rkbuf); |
941 | return 0; |
942 | } |
943 | |
944 | /* Finalize previous PartitionCnt */ |
945 | if (PartCnt > 0) |
946 | rd_kafka_buf_update_u32(rkbuf, of_PartCnt, PartCnt); |
947 | |
948 | /* Finalize TopicCnt */ |
949 | rd_kafka_buf_update_u32(rkbuf, of_TopicCnt, TopicCnt); |
950 | |
951 | rd_kafka_buf_ApiVersion_set(rkbuf, api_version, 0); |
952 | |
        rd_rkb_dbg(rkb, TOPIC, "OFFSET",
                   "Enqueue OffsetCommitRequest(v%d, %d/%d partition(s)): %s",
955 | api_version, tot_PartCnt, offsets->cnt, reason); |
956 | |
957 | rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); |
958 | |
959 | return 1; |
960 | |
961 | } |
962 | |
963 | |
964 | |
965 | /** |
966 | * @brief Write "consumer" protocol type MemberState for SyncGroupRequest to |
 *        enveloping buffer \p env_rkbuf.
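 *
 * The MemberState blob written below is:
 *   Version(0), [Topic [Partition]], UserData
 * and is appended to \p env_rkbuf as Kafka Bytes (length-prefixed).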
968 | */ |
969 | static void rd_kafka_group_MemberState_consumer_write ( |
970 | rd_kafka_buf_t *env_rkbuf, |
971 | const rd_kafka_group_member_t *rkgm) { |
972 | rd_kafka_buf_t *rkbuf; |
973 | int i; |
974 | const char *last_topic = NULL; |
975 | size_t of_TopicCnt; |
976 | ssize_t of_PartCnt = -1; |
977 | int TopicCnt = 0; |
978 | int PartCnt = 0; |
979 | rd_slice_t slice; |
980 | |
981 | rkbuf = rd_kafka_buf_new(1, 100); |
982 | rd_kafka_buf_write_i16(rkbuf, 0); /* Version */ |
983 | of_TopicCnt = rd_kafka_buf_write_i32(rkbuf, 0); /* Updated later */ |
984 | for (i = 0 ; i < rkgm->rkgm_assignment->cnt ; i++) { |
985 | const rd_kafka_topic_partition_t *rktpar; |
986 | |
987 | rktpar = &rkgm->rkgm_assignment->elems[i]; |
988 | |
989 | if (!last_topic || strcmp(last_topic, |
990 | rktpar->topic)) { |
991 | if (last_topic) |
992 | /* Finalize previous PartitionCnt */ |
993 | rd_kafka_buf_update_i32(rkbuf, of_PartCnt, |
994 | PartCnt); |
995 | rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1); |
996 | /* Updated later */ |
997 | of_PartCnt = rd_kafka_buf_write_i32(rkbuf, 0); |
998 | PartCnt = 0; |
999 | last_topic = rktpar->topic; |
1000 | TopicCnt++; |
1001 | } |
1002 | |
1003 | rd_kafka_buf_write_i32(rkbuf, rktpar->partition); |
1004 | PartCnt++; |
1005 | } |
1006 | |
1007 | if (of_PartCnt != -1) |
1008 | rd_kafka_buf_update_i32(rkbuf, of_PartCnt, PartCnt); |
1009 | rd_kafka_buf_update_i32(rkbuf, of_TopicCnt, TopicCnt); |
1010 | |
1011 | rd_kafka_buf_write_kbytes(rkbuf, rkgm->rkgm_userdata); |
1012 | |
1013 | /* Get pointer to binary buffer */ |
1014 | rd_slice_init_full(&slice, &rkbuf->rkbuf_buf); |
1015 | |
1016 | /* Write binary buffer as Kafka Bytes to enveloping buffer. */ |
1017 | rd_kafka_buf_write_i32(env_rkbuf, (int32_t)rd_slice_remains(&slice)); |
1018 | rd_buf_write_slice(&env_rkbuf->rkbuf_buf, &slice); |
1019 | |
1020 | rd_kafka_buf_destroy(rkbuf); |
1021 | } |
1022 | |
1023 | /** |
1024 | * Send SyncGroupRequest |
1025 | */ |
1026 | void rd_kafka_SyncGroupRequest (rd_kafka_broker_t *rkb, |
1027 | const rd_kafkap_str_t *group_id, |
1028 | int32_t generation_id, |
1029 | const rd_kafkap_str_t *member_id, |
1030 | const rd_kafka_group_member_t |
1031 | *assignments, |
1032 | int assignment_cnt, |
1033 | rd_kafka_replyq_t replyq, |
1034 | rd_kafka_resp_cb_t *resp_cb, |
1035 | void *opaque) { |
1036 | rd_kafka_buf_t *rkbuf; |
1037 | int i; |
1038 | |
1039 | rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_SyncGroup, |
1040 | 1, |
1041 | RD_KAFKAP_STR_SIZE(group_id) + |
1042 | 4 /* GenerationId */ + |
1043 | RD_KAFKAP_STR_SIZE(member_id) + |
1044 | 4 /* array size group_assignment */ + |
1045 | (assignment_cnt * 100/*guess*/)); |
1046 | rd_kafka_buf_write_kstr(rkbuf, group_id); |
1047 | rd_kafka_buf_write_i32(rkbuf, generation_id); |
1048 | rd_kafka_buf_write_kstr(rkbuf, member_id); |
1049 | rd_kafka_buf_write_i32(rkbuf, assignment_cnt); |
1050 | |
1051 | for (i = 0 ; i < assignment_cnt ; i++) { |
1052 | const rd_kafka_group_member_t *rkgm = &assignments[i]; |
1053 | |
1054 | rd_kafka_buf_write_kstr(rkbuf, rkgm->rkgm_member_id); |
1055 | rd_kafka_group_MemberState_consumer_write(rkbuf, rkgm); |
1056 | } |
1057 | |
1058 | /* This is a blocking request */ |
1059 | rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_BLOCKING; |
1060 | rd_kafka_buf_set_abs_timeout( |
1061 | rkbuf, |
1062 | rkb->rkb_rk->rk_conf.group_session_timeout_ms + |
1063 | 3000/* 3s grace period*/, |
1064 | 0); |
1065 | |
1066 | rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); |
1067 | } |
1068 | |
1069 | /** |
1070 | * Handler for SyncGroup responses |
1071 | * opaque must be the cgrp handle. |
1072 | */ |
1073 | void rd_kafka_handle_SyncGroup (rd_kafka_t *rk, |
1074 | rd_kafka_broker_t *rkb, |
1075 | rd_kafka_resp_err_t err, |
1076 | rd_kafka_buf_t *rkbuf, |
1077 | rd_kafka_buf_t *request, |
1078 | void *opaque) { |
1079 | rd_kafka_cgrp_t *rkcg = opaque; |
1080 | const int log_decode_errors = LOG_ERR; |
1081 | int16_t ErrorCode = 0; |
1082 | rd_kafkap_bytes_t MemberState = RD_ZERO_INIT; |
1083 | int actions; |
1084 | |
1085 | if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC) { |
                rd_kafka_dbg(rkb->rkb_rk, CGRP, "SYNCGROUP",
                             "SyncGroup response: discarding outdated request "
                             "(now in join-state %s)",
1089 | rd_kafka_cgrp_join_state_names[rkcg-> |
1090 | rkcg_join_state]); |
1091 | return; |
1092 | } |
1093 | |
1094 | if (err) { |
1095 | ErrorCode = err; |
1096 | goto err; |
1097 | } |
1098 | |
1099 | rd_kafka_buf_read_i16(rkbuf, &ErrorCode); |
1100 | rd_kafka_buf_read_bytes(rkbuf, &MemberState); |
1101 | |
1102 | err: |
1103 | actions = rd_kafka_err_action(rkb, ErrorCode, request, |
1104 | RD_KAFKA_ERR_ACTION_END); |
1105 | |
1106 | if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { |
1107 | /* Re-query for coordinator */ |
1108 | rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ, |
1109 | RD_KAFKA_OP_COORD_QUERY, |
1110 | ErrorCode); |
1111 | /* FALLTHRU */ |
1112 | } |
1113 | |
1114 | if (actions & RD_KAFKA_ERR_ACTION_RETRY) { |
1115 | if (rd_kafka_buf_retry(rkb, request)) |
1116 | return; |
1117 | /* FALLTHRU */ |
1118 | } |
1119 | |
        rd_kafka_dbg(rkb->rkb_rk, CGRP, "SYNCGROUP",
                     "SyncGroup response: %s (%d bytes of MemberState data)",
1122 | rd_kafka_err2str(ErrorCode), |
1123 | RD_KAFKAP_BYTES_LEN(&MemberState)); |
1124 | |
1125 | if (ErrorCode == RD_KAFKA_RESP_ERR__DESTROY) |
1126 | return; /* Termination */ |
1127 | |
1128 | rd_kafka_cgrp_handle_SyncGroup(rkcg, rkb, ErrorCode, &MemberState); |
1129 | |
1130 | return; |
1131 | |
1132 | err_parse: |
1133 | ErrorCode = rkbuf->rkbuf_err; |
1134 | goto err; |
1135 | } |
1136 | |
1137 | |
1138 | /** |
1139 | * Send JoinGroupRequest |
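 *
 * The request body written below is:
 *   GroupId, SessionTimeout, {v1+: RebalanceTimeout}, MemberId, ProtocolType,
 *   [ProtocolName ProtocolMetadata] (one entry per enabled assignor).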
1140 | */ |
1141 | void rd_kafka_JoinGroupRequest (rd_kafka_broker_t *rkb, |
1142 | const rd_kafkap_str_t *group_id, |
1143 | const rd_kafkap_str_t *member_id, |
1144 | const rd_kafkap_str_t *protocol_type, |
1145 | const rd_list_t *topics, |
1146 | rd_kafka_replyq_t replyq, |
1147 | rd_kafka_resp_cb_t *resp_cb, |
1148 | void *opaque) { |
1149 | rd_kafka_buf_t *rkbuf; |
1150 | rd_kafka_t *rk = rkb->rkb_rk; |
1151 | rd_kafka_assignor_t *rkas; |
1152 | int i; |
1153 | int16_t ApiVersion = 0; |
1154 | int features; |
1155 | |
1156 | ApiVersion = rd_kafka_broker_ApiVersion_supported(rkb, |
1157 | RD_KAFKAP_JoinGroup, |
1158 | 0, 2, |
1159 | &features); |
1160 | |
1161 | |
1162 | rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_JoinGroup, |
1163 | 1, |
1164 | RD_KAFKAP_STR_SIZE(group_id) + |
1165 | 4 /* sessionTimeoutMs */ + |
1166 | 4 /* rebalanceTimeoutMs */ + |
1167 | RD_KAFKAP_STR_SIZE(member_id) + |
1168 | RD_KAFKAP_STR_SIZE(protocol_type) + |
1169 | 4 /* array count GroupProtocols */ + |
1170 | (rd_list_cnt(topics) * 100)); |
1171 | rd_kafka_buf_write_kstr(rkbuf, group_id); |
1172 | rd_kafka_buf_write_i32(rkbuf, rk->rk_conf.group_session_timeout_ms); |
1173 | if (ApiVersion >= 1) |
1174 | rd_kafka_buf_write_i32(rkbuf, rk->rk_conf.max_poll_interval_ms); |
1175 | rd_kafka_buf_write_kstr(rkbuf, member_id); |
1176 | rd_kafka_buf_write_kstr(rkbuf, protocol_type); |
1177 | rd_kafka_buf_write_i32(rkbuf, rk->rk_conf.enabled_assignor_cnt); |
1178 | |
1179 | RD_LIST_FOREACH(rkas, &rk->rk_conf.partition_assignors, i) { |
1180 | rd_kafkap_bytes_t *member_metadata; |
1181 | if (!rkas->rkas_enabled) |
1182 | continue; |
1183 | rd_kafka_buf_write_kstr(rkbuf, rkas->rkas_protocol_name); |
1184 | member_metadata = rkas->rkas_get_metadata_cb(rkas, topics); |
1185 | rd_kafka_buf_write_kbytes(rkbuf, member_metadata); |
1186 | rd_kafkap_bytes_destroy(member_metadata); |
1187 | } |
1188 | |
1189 | rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); |
1190 | |
1191 | if (ApiVersion < 1 && |
1192 | rk->rk_conf.max_poll_interval_ms > |
1193 | rk->rk_conf.group_session_timeout_ms && |
1194 | rd_interval(&rkb->rkb_suppress.unsupported_kip62, |
1195 | /* at most once per day */ |
1196 | (rd_ts_t)86400 * 1000 * 1000, 0) > 0) |
                rd_rkb_log(rkb, LOG_NOTICE, "MAXPOLL",
                           "Broker does not support KIP-62 "
                           "(requires Apache Kafka >= v0.10.1.0): "
                           "consumer configuration "
                           "`max.poll.interval.ms` (%d) "
                           "is effectively limited "
                           "by `session.timeout.ms` (%d) "
                           "with this broker version",
1205 | rk->rk_conf.max_poll_interval_ms, |
1206 | rk->rk_conf.group_session_timeout_ms); |
1207 | |
1208 | /* Absolute timeout */ |
1209 | rd_kafka_buf_set_abs_timeout_force( |
1210 | rkbuf, |
1211 | /* Request timeout is max.poll.interval.ms + grace |
1212 | * if the broker supports it, else |
1213 | * session.timeout.ms + grace. */ |
1214 | (ApiVersion >= 1 ? |
1215 | rk->rk_conf.max_poll_interval_ms : |
1216 | rk->rk_conf.group_session_timeout_ms) + |
1217 | 3000/* 3s grace period*/, |
1218 | 0); |
1219 | |
1220 | /* This is a blocking request */ |
1221 | rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_BLOCKING; |
1222 | |
1223 | rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); |
1224 | } |
1225 | |
1226 | |
1227 | |
1228 | |
1229 | |
1230 | |
1231 | /** |
1232 | * Send LeaveGroupRequest |
1233 | */ |
1234 | void rd_kafka_LeaveGroupRequest (rd_kafka_broker_t *rkb, |
1235 | const rd_kafkap_str_t *group_id, |
1236 | const rd_kafkap_str_t *member_id, |
1237 | rd_kafka_replyq_t replyq, |
1238 | rd_kafka_resp_cb_t *resp_cb, |
1239 | void *opaque) { |
1240 | rd_kafka_buf_t *rkbuf; |
1241 | |
1242 | rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_LeaveGroup, |
1243 | 1, |
1244 | RD_KAFKAP_STR_SIZE(group_id) + |
1245 | RD_KAFKAP_STR_SIZE(member_id)); |
1246 | rd_kafka_buf_write_kstr(rkbuf, group_id); |
1247 | rd_kafka_buf_write_kstr(rkbuf, member_id); |
1248 | |
        /* LeaveGroupRequests are best-effort; the local consumer
         * does not care whether they succeed or not, so the request
         * timeout is shortened.
         * Retries are not needed. */
1253 | rd_kafka_buf_set_abs_timeout(rkbuf, 5000, 0); |
1254 | rkbuf->rkbuf_retries = RD_KAFKA_BUF_NO_RETRIES; |
1255 | |
1256 | rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); |
1257 | } |
1258 | |
1259 | |
1260 | /** |
1261 | * Handler for LeaveGroup responses |
1262 | * opaque must be the cgrp handle. |
1263 | */ |
1264 | void rd_kafka_handle_LeaveGroup (rd_kafka_t *rk, |
1265 | rd_kafka_broker_t *rkb, |
1266 | rd_kafka_resp_err_t err, |
1267 | rd_kafka_buf_t *rkbuf, |
1268 | rd_kafka_buf_t *request, |
1269 | void *opaque) { |
1270 | rd_kafka_cgrp_t *rkcg = opaque; |
1271 | const int log_decode_errors = LOG_ERR; |
1272 | int16_t ErrorCode = 0; |
1273 | int actions; |
1274 | |
1275 | if (err) { |
1276 | ErrorCode = err; |
1277 | goto err; |
1278 | } |
1279 | |
1280 | rd_kafka_buf_read_i16(rkbuf, &ErrorCode); |
1281 | |
1282 | err: |
1283 | actions = rd_kafka_err_action(rkb, ErrorCode, request, |
1284 | RD_KAFKA_ERR_ACTION_END); |
1285 | |
1286 | if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { |
1287 | /* Re-query for coordinator */ |
1288 | rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ, |
1289 | RD_KAFKA_OP_COORD_QUERY, ErrorCode); |
1290 | } |
1291 | |
1292 | if (actions & RD_KAFKA_ERR_ACTION_RETRY) { |
1293 | if (rd_kafka_buf_retry(rkb, request)) |
1294 | return; |
1295 | /* FALLTHRU */ |
1296 | } |
1297 | |
1298 | if (ErrorCode) |
                rd_kafka_dbg(rkb->rkb_rk, CGRP, "LEAVEGROUP",
                             "LeaveGroup response: %s",
1301 | rd_kafka_err2str(ErrorCode)); |
1302 | |
1303 | return; |
1304 | |
1305 | err_parse: |
1306 | ErrorCode = rkbuf->rkbuf_err; |
1307 | goto err; |
1308 | } |
1309 | |
1310 | |
1311 | |
1312 | |
1313 | |
1314 | |
1315 | /** |
1316 | * Send HeartbeatRequest |
1317 | */ |
1318 | void rd_kafka_HeartbeatRequest (rd_kafka_broker_t *rkb, |
1319 | const rd_kafkap_str_t *group_id, |
1320 | int32_t generation_id, |
1321 | const rd_kafkap_str_t *member_id, |
1322 | rd_kafka_replyq_t replyq, |
1323 | rd_kafka_resp_cb_t *resp_cb, |
1324 | void *opaque) { |
1325 | rd_kafka_buf_t *rkbuf; |
1326 | |
        rd_rkb_dbg(rkb, CGRP, "HEARTBEAT",
1328 | "Heartbeat for group \"%s\" generation id %" PRId32, |
1329 | group_id->str, generation_id); |
1330 | |
1331 | rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_Heartbeat, |
1332 | 1, |
1333 | RD_KAFKAP_STR_SIZE(group_id) + |
1334 | 4 /* GenerationId */ + |
1335 | RD_KAFKAP_STR_SIZE(member_id)); |
1336 | |
1337 | rd_kafka_buf_write_kstr(rkbuf, group_id); |
1338 | rd_kafka_buf_write_i32(rkbuf, generation_id); |
1339 | rd_kafka_buf_write_kstr(rkbuf, member_id); |
1340 | |
1341 | rd_kafka_buf_set_abs_timeout( |
1342 | rkbuf, |
1343 | rkb->rkb_rk->rk_conf.group_session_timeout_ms, |
1344 | 0); |
1345 | |
1346 | rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); |
1347 | } |
1348 | |
1349 | |
1350 | |
1351 | |
1352 | /** |
1353 | * Send ListGroupsRequest |
1354 | */ |
1355 | void rd_kafka_ListGroupsRequest (rd_kafka_broker_t *rkb, |
1356 | rd_kafka_replyq_t replyq, |
1357 | rd_kafka_resp_cb_t *resp_cb, |
1358 | void *opaque) { |
1359 | rd_kafka_buf_t *rkbuf; |
1360 | |
1361 | rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_ListGroups, 0, 0); |
1362 | |
1363 | rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); |
1364 | } |
1365 | |
1366 | |
1367 | /** |
1368 | * Send DescribeGroupsRequest |
1369 | */ |
1370 | void rd_kafka_DescribeGroupsRequest (rd_kafka_broker_t *rkb, |
1371 | const char **groups, int group_cnt, |
1372 | rd_kafka_replyq_t replyq, |
1373 | rd_kafka_resp_cb_t *resp_cb, |
1374 | void *opaque) { |
1375 | rd_kafka_buf_t *rkbuf; |
1376 | |
1377 | rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_DescribeGroups, |
1378 | 1, 32*group_cnt); |
1379 | |
1380 | rd_kafka_buf_write_i32(rkbuf, group_cnt); |
1381 | while (group_cnt-- > 0) |
1382 | rd_kafka_buf_write_str(rkbuf, groups[group_cnt], -1); |
1383 | |
1384 | rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); |
1385 | } |
1386 | |
1387 | |
1388 | |
1389 | |
1390 | /** |
1391 | * @brief Generic handler for Metadata responses |
1392 | * |
1393 | * @locality rdkafka main thread |
1394 | */ |
1395 | static void rd_kafka_handle_Metadata (rd_kafka_t *rk, |
1396 | rd_kafka_broker_t *rkb, |
1397 | rd_kafka_resp_err_t err, |
1398 | rd_kafka_buf_t *rkbuf, |
1399 | rd_kafka_buf_t *request, |
1400 | void *opaque) { |
1401 | rd_kafka_op_t *rko = opaque; /* Possibly NULL */ |
1402 | struct rd_kafka_metadata *md = NULL; |
1403 | const rd_list_t *topics = request->rkbuf_u.Metadata.topics; |
1404 | int actions; |
1405 | |
1406 | rd_kafka_assert(NULL, err == RD_KAFKA_RESP_ERR__DESTROY || |
1407 | thrd_is_current(rk->rk_thread)); |
1408 | |
1409 | /* Avoid metadata updates when we're terminating. */ |
1410 | if (rd_kafka_terminating(rkb->rkb_rk) || |
1411 | err == RD_KAFKA_RESP_ERR__DESTROY) { |
1412 | /* Terminating */ |
1413 | goto done; |
1414 | } |
1415 | |
1416 | if (err) |
1417 | goto err; |
1418 | |
1419 | if (!topics) |
                rd_rkb_dbg(rkb, METADATA, "METADATA",
                           "===== Received metadata: %s =====",
1422 | request->rkbuf_u.Metadata.reason); |
1423 | else |
                rd_rkb_dbg(rkb, METADATA, "METADATA",
                           "===== Received metadata "
                           "(for %d requested topics): %s =====",
1427 | rd_list_cnt(topics), |
1428 | request->rkbuf_u.Metadata.reason); |
1429 | |
1430 | err = rd_kafka_parse_Metadata(rkb, request, rkbuf, &md); |
1431 | if (err) |
1432 | goto err; |
1433 | |
1434 | if (rko && rko->rko_replyq.q) { |
1435 | /* Reply to metadata requester, passing on the metadata. |
1436 | * Reuse requesting rko for the reply. */ |
1437 | rko->rko_err = err; |
1438 | rko->rko_u.metadata.md = md; |
1439 | |
1440 | rd_kafka_replyq_enq(&rko->rko_replyq, rko, 0); |
1441 | rko = NULL; |
1442 | } else { |
1443 | if (md) |
1444 | rd_free(md); |
1445 | } |
1446 | |
1447 | goto done; |
1448 | |
1449 | err: |
1450 | actions = rd_kafka_err_action( |
1451 | rkb, err, request, |
1452 | |
1453 | RD_KAFKA_ERR_ACTION_RETRY, |
1454 | RD_KAFKA_RESP_ERR__PARTIAL, |
1455 | |
1456 | RD_KAFKA_ERR_ACTION_END); |
1457 | |
1458 | if (actions & RD_KAFKA_ERR_ACTION_RETRY) { |
1459 | if (rd_kafka_buf_retry(rkb, request)) |
1460 | return; |
1461 | /* FALLTHRU */ |
1462 | } else { |
                rd_rkb_log(rkb, LOG_WARNING, "METADATA",
                           "Metadata request failed: %s: %s (%dms): %s",
1465 | request->rkbuf_u.Metadata.reason, |
1466 | rd_kafka_err2str(err), |
1467 | (int)(request->rkbuf_ts_sent/1000), |
1468 | rd_kafka_actions2str(actions)); |
1469 | } |
1470 | |
1471 | |
1472 | |
1473 | /* FALLTHRU */ |
1474 | |
1475 | done: |
1476 | if (rko) |
1477 | rd_kafka_op_destroy(rko); |
1478 | } |
1479 | |
1480 | |
1481 | /** |
1482 | * @brief Construct MetadataRequest (does not send) |
1483 | * |
1484 | * \p topics is a list of topic names (char *) to request. |
1485 | * |
1486 | * !topics - only request brokers (if supported by broker, else |
1487 | * all topics) |
1488 | * topics.cnt==0 - all topics in cluster are requested |
1489 | * topics.cnt >0 - only specified topics are requested |
1490 | * |
1491 | * @param reason - metadata request reason |
1492 | * @param rko - (optional) rko with replyq for handling response. |
1493 | * Specifying an rko forces a metadata request even if |
1494 | * there is already a matching one in-transit. |
1495 | * |
 * If full metadata for all topics is requested (or all brokers, which
 * results in all-topics on older brokers) and there is already a full request
 * in transit, then this function returns RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS,
 * otherwise RD_KAFKA_RESP_ERR_NO_ERROR. If \p rko is non-NULL the request
 * is sent regardless.
1501 | */ |
1502 | rd_kafka_resp_err_t |
1503 | rd_kafka_MetadataRequest (rd_kafka_broker_t *rkb, |
1504 | const rd_list_t *topics, const char *reason, |
1505 | rd_kafka_op_t *rko) { |
1506 | rd_kafka_buf_t *rkbuf; |
1507 | int16_t ApiVersion = 0; |
1508 | int features; |
1509 | int topic_cnt = topics ? rd_list_cnt(topics) : 0; |
1510 | int *full_incr = NULL; |
1511 | |
1512 | ApiVersion = rd_kafka_broker_ApiVersion_supported(rkb, |
1513 | RD_KAFKAP_Metadata, |
1514 | 0, 2, |
1515 | &features); |
1516 | |
1517 | rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_Metadata, 1, |
1518 | 4 + (50 * topic_cnt)); |
1519 | |
1520 | if (!reason) |
                reason = "";
1522 | |
1523 | rkbuf->rkbuf_u.Metadata.reason = rd_strdup(reason); |
1524 | |
1525 | if (!topics && ApiVersion >= 1) { |
                /* For v1+ an empty (zero-length) topic array requests
                 * broker information only, no topics. */
                rd_kafka_buf_write_i32(rkbuf, 0);
                rd_rkb_dbg(rkb, METADATA, "METADATA",
                           "Request metadata for brokers only: %s", reason);
1530 | full_incr = &rkb->rkb_rk->rk_metadata_cache. |
1531 | rkmc_full_brokers_sent; |
1532 | |
1533 | } else { |
1534 | if (topic_cnt == 0 && !rko) |
1535 | full_incr = &rkb->rkb_rk->rk_metadata_cache. |
1536 | rkmc_full_topics_sent; |
1537 | |
1538 | if (topic_cnt == 0 && ApiVersion >= 1) |
                        rd_kafka_buf_write_i32(rkbuf, -1); /* Null: all topics */
1540 | else |
1541 | rd_kafka_buf_write_i32(rkbuf, topic_cnt); |
1542 | |
1543 | if (topic_cnt == 0) { |
1544 | rkbuf->rkbuf_u.Metadata.all_topics = 1; |
                        rd_rkb_dbg(rkb, METADATA, "METADATA",
                                   "Request metadata for all topics: "
                                   "%s", reason);
1548 | } else |
                        rd_rkb_dbg(rkb, METADATA, "METADATA",
                                   "Request metadata for %d topic(s): "
                                   "%s", topic_cnt, reason);
1552 | } |
1553 | |
1554 | if (full_incr) { |
1555 | /* Avoid multiple outstanding full requests |
1556 | * (since they are redundant and side-effect-less). |
1557 | * Forced requests (app using metadata() API) are passed |
1558 | * through regardless. */ |
1559 | |
1560 | mtx_lock(&rkb->rkb_rk->rk_metadata_cache. |
1561 | rkmc_full_lock); |
1562 | if (*full_incr > 0 && (!rko || !rko->rko_u.metadata.force)) { |
1563 | mtx_unlock(&rkb->rkb_rk->rk_metadata_cache. |
1564 | rkmc_full_lock); |
                        rd_rkb_dbg(rkb, METADATA, "METADATA",
                                   "Skipping metadata request: %s: "
                                   "full request already in-transit",
1568 | reason); |
1569 | rd_kafka_buf_destroy(rkbuf); |
1570 | return RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS; |
1571 | } |
1572 | |
1573 | (*full_incr)++; |
1574 | mtx_unlock(&rkb->rkb_rk->rk_metadata_cache. |
1575 | rkmc_full_lock); |
1576 | rkbuf->rkbuf_u.Metadata.decr = full_incr; |
1577 | rkbuf->rkbuf_u.Metadata.decr_lock = &rkb->rkb_rk-> |
1578 | rk_metadata_cache.rkmc_full_lock; |
1579 | } |
1580 | |
1581 | |
1582 | if (topic_cnt > 0) { |
1583 | char *topic; |
1584 | int i; |
1585 | |
1586 | /* Maintain a copy of the topics list so we can purge |
1587 | * hints from the metadata cache on error. */ |
1588 | rkbuf->rkbuf_u.Metadata.topics = |
1589 | rd_list_copy(topics, rd_list_string_copy, NULL); |
1590 | |
1591 | RD_LIST_FOREACH(topic, topics, i) |
1592 | rd_kafka_buf_write_str(rkbuf, topic, -1); |
1593 | |
1594 | } |
1595 | |
1596 | rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); |
1597 | |
1598 | /* Metadata requests are part of the important control plane |
1599 | * and should go before most other requests (Produce, Fetch, etc). */ |
1600 | rkbuf->rkbuf_prio = RD_KAFKA_PRIO_HIGH; |
1601 | |
1602 | rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, |
1603 | /* Handle response thru rk_ops, |
1604 | * but forward parsed result to |
1605 | * rko's replyq when done. */ |
1606 | RD_KAFKA_REPLYQ(rkb->rkb_rk-> |
1607 | rk_ops, 0), |
1608 | rd_kafka_handle_Metadata, rko); |
1609 | |
1610 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
1611 | } |
1612 | |
1613 | |
1614 | |
1615 | |
1616 | |
1617 | |
1618 | |
1619 | |
1620 | |
1621 | /** |
1622 | * @brief Parses and handles ApiVersion reply. |
1623 | * |
1624 | * @param apis will be allocated, populated and sorted |
1625 | * with broker's supported APIs. |
1626 | * @param api_cnt will be set to the number of elements in \p *apis |
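 *
 * The response parsed below is:
 *   ErrorCode, [ApiKey MinVersion MaxVersion]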
 *
1628 | * @returns 0 on success, else an error. |
1629 | */ |
1630 | rd_kafka_resp_err_t |
1631 | rd_kafka_handle_ApiVersion (rd_kafka_t *rk, |
1632 | rd_kafka_broker_t *rkb, |
1633 | rd_kafka_resp_err_t err, |
1634 | rd_kafka_buf_t *rkbuf, |
1635 | rd_kafka_buf_t *request, |
1636 | struct rd_kafka_ApiVersion **apis, |
1637 | size_t *api_cnt) { |
1638 | const int log_decode_errors = LOG_ERR; |
1639 | int32_t ApiArrayCnt; |
1640 | int16_t ErrorCode; |
1641 | int i = 0; |
1642 | |
1643 | *apis = NULL; |
1644 | |
1645 | if (err) |
1646 | goto err; |
1647 | |
1648 | rd_kafka_buf_read_i16(rkbuf, &ErrorCode); |
1649 | if ((err = ErrorCode)) |
1650 | goto err; |
1651 | |
1652 | rd_kafka_buf_read_i32(rkbuf, &ApiArrayCnt); |
1653 | if (ApiArrayCnt > 1000) |
1654 | rd_kafka_buf_parse_fail(rkbuf, |
                                        "ApiArrayCnt %"PRId32" out of range",
1656 | ApiArrayCnt); |
1657 | |
        rd_rkb_dbg(rkb, FEATURE, "APIVERSION",
                   "Broker API support:");
1660 | |
        *apis = rd_malloc(sizeof(**apis) * ApiArrayCnt);
1662 | |
1663 | for (i = 0 ; i < ApiArrayCnt ; i++) { |
1664 | struct rd_kafka_ApiVersion *api = &(*apis)[i]; |
1665 | |
1666 | rd_kafka_buf_read_i16(rkbuf, &api->ApiKey); |
1667 | rd_kafka_buf_read_i16(rkbuf, &api->MinVer); |
1668 | rd_kafka_buf_read_i16(rkbuf, &api->MaxVer); |
1669 | |
                rd_rkb_dbg(rkb, FEATURE, "APIVERSION",
                           " ApiKey %s (%hd) Versions %hd..%hd",
1672 | rd_kafka_ApiKey2str(api->ApiKey), |
1673 | api->ApiKey, api->MinVer, api->MaxVer); |
1674 | } |
1675 | |
1676 | *api_cnt = ApiArrayCnt; |
1677 | qsort(*apis, *api_cnt, sizeof(**apis), rd_kafka_ApiVersion_key_cmp); |
1678 | |
1679 | goto done; |
1680 | |
1681 | err_parse: |
1682 | err = rkbuf->rkbuf_err; |
1683 | err: |
1684 | if (*apis) |
1685 | rd_free(*apis); |
1686 | |
1687 | /* There are no retryable errors. */ |
1688 | |
1689 | done: |
1690 | return err; |
1691 | } |
1692 | |
1693 | |
1694 | |
1695 | /** |
1696 | * Send ApiVersionRequest (KIP-35) |
1697 | */ |
1698 | void rd_kafka_ApiVersionRequest (rd_kafka_broker_t *rkb, |
1699 | rd_kafka_replyq_t replyq, |
1700 | rd_kafka_resp_cb_t *resp_cb, |
1701 | void *opaque) { |
1702 | rd_kafka_buf_t *rkbuf; |
1703 | |
1704 | rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_ApiVersion, 1, 4); |
1705 | |
1706 | /* Should be sent before any other requests since it is part of |
1707 | * the initial connection handshake. */ |
1708 | rkbuf->rkbuf_prio = RD_KAFKA_PRIO_FLASH; |
1709 | |
1710 | rd_kafka_buf_write_i32(rkbuf, 0); /* Empty array: request all APIs */ |
1711 | |
1712 | /* Non-supporting brokers will tear down the connection when they |
         * receive an unknown API request, so don't retry the request on failure. */
1714 | rkbuf->rkbuf_retries = RD_KAFKA_BUF_NO_RETRIES; |
1715 | |
1716 | /* 0.9.0.x brokers will not close the connection on unsupported |
1717 | * API requests, so we minimize the timeout for the request. |
         * This is a regression on the broker's part. */
1719 | rd_kafka_buf_set_abs_timeout( |
1720 | rkbuf, |
1721 | rkb->rkb_rk->rk_conf.api_version_request_timeout_ms, |
1722 | 0); |
1723 | |
1724 | if (replyq.q) |
1725 | rd_kafka_broker_buf_enq_replyq(rkb, |
1726 | rkbuf, replyq, resp_cb, opaque); |
1727 | else /* in broker thread */ |
1728 | rd_kafka_broker_buf_enq1(rkb, rkbuf, resp_cb, opaque); |
1729 | } |
1730 | |
1731 | |
1732 | /** |
1733 | * Send SaslHandshakeRequest (KIP-43) |
1734 | */ |
1735 | void rd_kafka_SaslHandshakeRequest (rd_kafka_broker_t *rkb, |
1736 | const char *mechanism, |
1737 | rd_kafka_replyq_t replyq, |
1738 | rd_kafka_resp_cb_t *resp_cb, |
1739 | void *opaque) { |
1740 | rd_kafka_buf_t *rkbuf; |
1741 | int mechlen = (int)strlen(mechanism); |
1742 | |
1743 | rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_SaslHandshake, |
1744 | 1, RD_KAFKAP_STR_SIZE0(mechlen)); |
1745 | |
1746 | /* Should be sent before any other requests since it is part of |
1747 | * the initial connection handshake. */ |
1748 | rkbuf->rkbuf_prio = RD_KAFKA_PRIO_FLASH; |
1749 | |
1750 | rd_kafka_buf_write_str(rkbuf, mechanism, mechlen); |
1751 | |
        /* Non-supporting brokers will tear down the connection when they
         * receive an unknown API request or when the SASL GSSAPI
         * token type is not recognized, so don't retry the request on
         * failure. */
1755 | rkbuf->rkbuf_retries = RD_KAFKA_BUF_NO_RETRIES; |
1756 | |
1757 | /* 0.9.0.x brokers will not close the connection on unsupported |
1758 | * API requests, so we minimize the timeout of the request. |
1759 | * This is a regression on the broker part. */ |
1760 | if (!rkb->rkb_rk->rk_conf.api_version_request && |
1761 | rkb->rkb_rk->rk_conf.socket_timeout_ms > 10*1000) |
1762 | rd_kafka_buf_set_abs_timeout(rkbuf, 10*1000 /*10s*/, 0); |
1763 | |
1764 | if (replyq.q) |
1765 | rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, |
1766 | resp_cb, opaque); |
1767 | else /* in broker thread */ |
1768 | rd_kafka_broker_buf_enq1(rkb, rkbuf, resp_cb, opaque); |
1769 | } |
1770 | |
1771 | |
1772 | |
1773 | /** |
1774 | * @struct Hold temporary result and return values from ProduceResponse |
1775 | */ |
1776 | struct rd_kafka_Produce_result { |
1777 | int64_t offset; /**< Assigned offset of first message */ |
        int64_t timestamp;  /**< (Possibly assigned) timestamp of first message */
1779 | }; |
1780 | |
1781 | /** |
1782 | * @brief Parses a Produce reply. |
1783 | * @returns 0 on success or an error code on failure. |
1784 | * @locality broker thread |
1785 | */ |
1786 | static rd_kafka_resp_err_t |
1787 | rd_kafka_handle_Produce_parse (rd_kafka_broker_t *rkb, |
1788 | rd_kafka_toppar_t *rktp, |
1789 | rd_kafka_buf_t *rkbuf, |
1790 | rd_kafka_buf_t *request, |
1791 | struct rd_kafka_Produce_result *result) { |
1792 | int32_t TopicArrayCnt; |
1793 | int32_t PartitionArrayCnt; |
1794 | struct { |
1795 | int32_t Partition; |
1796 | int16_t ErrorCode; |
1797 | int64_t Offset; |
1798 | } hdr; |
1799 | const int log_decode_errors = LOG_ERR; |
1800 | int64_t log_start_offset = -1; |
1801 | |
1802 | rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt); |
1803 | if (TopicArrayCnt != 1) |
1804 | goto err; |
1805 | |
        /* Since we only produce to one single topic+partition in each
         * request we assume that the reply only contains that same
         * topic+partition and that it is the one we requested.
         * If not, the broker is buggy. */
1810 | rd_kafka_buf_skip_str(rkbuf); |
1811 | rd_kafka_buf_read_i32(rkbuf, &PartitionArrayCnt); |
1812 | |
1813 | if (PartitionArrayCnt != 1) |
1814 | goto err; |
1815 | |
1816 | rd_kafka_buf_read_i32(rkbuf, &hdr.Partition); |
1817 | rd_kafka_buf_read_i16(rkbuf, &hdr.ErrorCode); |
1818 | rd_kafka_buf_read_i64(rkbuf, &hdr.Offset); |
1819 | |
1820 | result->offset = hdr.Offset; |
1821 | |
1822 | result->timestamp = -1; |
1823 | if (request->rkbuf_reqhdr.ApiVersion >= 2) |
1824 | rd_kafka_buf_read_i64(rkbuf, &result->timestamp); |
1825 | |
1826 | if (request->rkbuf_reqhdr.ApiVersion >= 5) |
1827 | rd_kafka_buf_read_i64(rkbuf, &log_start_offset); |
1828 | |
1829 | if (request->rkbuf_reqhdr.ApiVersion >= 1) { |
1830 | int32_t Throttle_Time; |
1831 | rd_kafka_buf_read_i32(rkbuf, &Throttle_Time); |
1832 | |
1833 | rd_kafka_op_throttle_time(rkb, rkb->rkb_rk->rk_rep, |
1834 | Throttle_Time); |
1835 | } |
1836 | |
1837 | |
1838 | return hdr.ErrorCode; |
1839 | |
1840 | err_parse: |
1841 | return rkbuf->rkbuf_err; |
1842 | err: |
1843 | return RD_KAFKA_RESP_ERR__BAD_MSG; |
1844 | } |
1845 | |
1846 | |
1847 | /** |
1848 | * @struct Hold temporary Produce error state |
1849 | */ |
1850 | struct rd_kafka_Produce_err { |
1851 | rd_kafka_resp_err_t err; /**< Error code */ |
1852 | int actions; /**< Actions to take */ |
1853 | int incr_retry; /**< Increase per-message retry cnt */ |
        rd_kafka_msg_status_t status;  /**< Message persistence status */
1855 | |
1856 | /* Idempotent Producer */ |
1857 | int32_t next_ack_seq; /**< Next expected sequence to ack */ |
1858 | int32_t next_err_seq; /**< Next expected error sequence */ |
1859 | rd_bool_t update_next_ack; /**< Update next_ack_seq */ |
1860 | rd_bool_t update_next_err; /**< Update next_err_seq */ |
1861 | rd_kafka_pid_t rktp_pid; /**< Partition's current PID */ |
1862 | int32_t last_seq; /**< Last sequence in current batch */ |
1863 | }; |
1864 | |
1865 | |
1866 | /** |
1867 | * @brief Error-handling for Idempotent Producer-specific Produce errors. |
1868 | * |
 * May update the \c err, \c actions, \c incr_retry and \c status
 * fields of \p perr.
 *
 * The resulting \c actions are handled by the caller.
1872 | * |
1873 | * @warning May be called on the old leader thread. Lock rktp appropriately! |
1874 | * |
1875 | * @locality broker thread (but not necessarily the leader broker) |
1876 | * @locks none |
1877 | */ |
1878 | static void |
1879 | rd_kafka_handle_idempotent_Produce_error (rd_kafka_broker_t *rkb, |
1880 | rd_kafka_msgbatch_t *batch, |
1881 | struct rd_kafka_Produce_err *perr) { |
1882 | rd_kafka_t *rk = rkb->rkb_rk; |
1883 | rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(batch->s_rktp); |
1884 | rd_kafka_msg_t *firstmsg, *lastmsg; |
1885 | int r; |
1886 | rd_ts_t now = rd_clock(), state_age; |
1887 | struct rd_kafka_toppar_err last_err; |
1888 | |
1889 | rd_kafka_rdlock(rkb->rkb_rk); |
1890 | state_age = now - rkb->rkb_rk->rk_eos.ts_idemp_state; |
1891 | rd_kafka_rdunlock(rkb->rkb_rk); |
1892 | |
1893 | firstmsg = rd_kafka_msgq_first(&batch->msgq); |
1894 | lastmsg = rd_kafka_msgq_last(&batch->msgq); |
1895 | rd_assert(firstmsg && lastmsg); |
1896 | |
1897 | /* Store the last msgid of the batch |
1898 | * on the first message in case we need to retry |
1899 | * and thus reconstruct the entire batch. */ |
1900 | if (firstmsg->rkm_u.producer.last_msgid) { |
1901 | /* last_msgid already set, make sure it |
1902 | * actually points to the last message. */ |
1903 | rd_assert(firstmsg->rkm_u.producer.last_msgid == |
1904 | lastmsg->rkm_u.producer.msgid); |
1905 | } else { |
1906 | firstmsg->rkm_u.producer.last_msgid = |
1907 | lastmsg->rkm_u.producer.msgid; |
1908 | } |
1909 | |
1910 | if (!rd_kafka_pid_eq(batch->pid, perr->rktp_pid)) { |
1911 | /* Don't retry if PID changed since we can't |
1912 | * guarantee correctness across PID sessions. */ |
1913 | perr->actions = RD_KAFKA_ERR_ACTION_PERMANENT; |
1914 | perr->status = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED; |
1915 | |
1916 | rd_rkb_dbg(rkb, MSG|RD_KAFKA_DBG_EOS, "ERRPID" , |
1917 | "%.*s [%" PRId32"] PID mismatch: " |
1918 | "request %s != partition %s: " |
1919 | "failing messages with error %s" , |
1920 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
1921 | rktp->rktp_partition, |
1922 | rd_kafka_pid2str(batch->pid), |
1923 | rd_kafka_pid2str(perr->rktp_pid), |
1924 | rd_kafka_err2str(perr->err)); |
1925 | return; |
1926 | } |
1927 | |
1928 | /* |
1929 | * Special error handling |
1930 | */ |
1931 | switch (perr->err) |
1932 | { |
1933 | case RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER: |
1934 | /* Compare request's sequence to expected next |
1935 | * acked sequence. |
1936 | * |
1937 | * Example requests in flight: |
1938 | * R1(base_seq:5) R2(10) R3(15) R4(20) |
1939 | */ |
1940 | |
1941 | /* Acquire the last partition error to help |
1942 | * troubleshoot this problem. */ |
1943 | rd_kafka_toppar_lock(rktp); |
1944 | last_err = rktp->rktp_last_err; |
1945 | rd_kafka_toppar_unlock(rktp); |
1946 | |
1947 | r = batch->first_seq - perr->next_ack_seq; |
1948 | |
1949 | if (r == 0) { |
1950 | /* R1 failed: |
1951 | * If this was the head-of-line request in-flight it |
1952 | * means there is a state desynchronization between the |
1953 | * producer and broker (a bug), in which case |
1954 | * we'll raise a fatal error since we can no longer |
1955 | * reason about the state of messages and thus |
1956 | * not guarantee ordering or once-ness for R1, |
1957 | * nor give the user a chance to opt out of sending |
1958 | * R2 to R4 which would be retried automatically. */ |
1959 | |
1960 | rd_kafka_set_fatal_error( |
1961 | rk, perr->err, |
1962 | "ProduceRequest for %.*s [%" PRId32"] " |
1963 | "with %d message(s) failed " |
1964 | "due to sequence desynchronization with " |
1965 | "broker %" PRId32" (%s, base seq %" PRId32", " |
1966 | "idemp state change %" PRId64"ms ago, " |
1967 | "last partition error %s (actions %s, " |
1968 | "base seq %" PRId32"..%" PRId32 |
1969 | ", base msgid %" PRIu64", %" PRId64"ms ago)" , |
1970 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
1971 | rktp->rktp_partition, |
1972 | rd_kafka_msgq_len(&batch->msgq), |
1973 | rkb->rkb_nodeid, |
1974 | rd_kafka_pid2str(batch->pid), |
1975 | batch->first_seq, |
1976 | state_age / 1000, |
1977 | rd_kafka_err2name(last_err.err), |
1978 | rd_kafka_actions2str(last_err.actions), |
1979 | last_err.base_seq, last_err.last_seq, |
1980 | last_err.base_msgid, |
1981 | last_err.ts ? |
1982 | (now - last_err.ts)/1000 : -1); |
1983 | |
1984 | perr->actions = RD_KAFKA_ERR_ACTION_PERMANENT; |
1985 | perr->status = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED; |
1986 | perr->update_next_ack = rd_false; |
1987 | perr->update_next_err = rd_true; |
1988 | |
1989 | } else if (r > 0) { |
1990 | /* R2 failed: |
1991 | * With max.in.flight > 1 we can have a situation |
1992 | * where the first request in-flight (R1) to the broker |
                         * fails, which causes the subsequent requests
                         * that are in-flight to have a non-sequential
                         * sequence number and thus fail.
                         * But these subsequent requests (R2 to R4) are not
                         * at risk of being duplicated so we bump the epoch and
1998 | * re-enqueue the messages for later retry |
1999 | * (without incrementing retries). |
2000 | */ |
2001 | rd_rkb_dbg(rkb, MSG|RD_KAFKA_DBG_EOS, "ERRSEQ" , |
2002 | "ProduceRequest for %.*s [%" PRId32"] " |
2003 | "with %d message(s) failed " |
2004 | "due to skipped sequence numbers " |
2005 | "(%s, base seq %" PRId32" > " |
2006 | "next seq %" PRId32") " |
2007 | "caused by previous failed request " |
2008 | "(%s, actions %s, " |
2009 | "base seq %" PRId32"..%" PRId32 |
2010 | ", base msgid %" PRIu64", %" PRId64"ms ago): " |
2011 | "recovering and retrying" , |
2012 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
2013 | rktp->rktp_partition, |
2014 | rd_kafka_msgq_len(&batch->msgq), |
2015 | rd_kafka_pid2str(batch->pid), |
2016 | batch->first_seq, |
2017 | perr->next_ack_seq, |
2018 | rd_kafka_err2name(last_err.err), |
2019 | rd_kafka_actions2str(last_err.actions), |
2020 | last_err.base_seq, last_err.last_seq, |
2021 | last_err.base_msgid, |
2022 | last_err.ts ? |
2023 | (now - last_err.ts)/1000 : -1); |
2024 | |
2025 | perr->incr_retry = 0; |
2026 | perr->actions = RD_KAFKA_ERR_ACTION_RETRY; |
2027 | perr->status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED; |
2028 | perr->update_next_ack = rd_false; |
2029 | perr->update_next_err = rd_true; |
2030 | |
2031 | rd_kafka_idemp_drain_epoch_bump( |
2032 | rk, "skipped sequence numbers" ); |
2033 | |
2034 | } else { |
2035 | /* Request's sequence is less than next ack, |
2036 | * this should never happen unless we have |
                         * a local bug or the broker did not respond
2038 | * to the requests in order. */ |
2039 | rd_kafka_set_fatal_error( |
2040 | rk, perr->err, |
2041 | "ProduceRequest for %.*s [%" PRId32"] " |
2042 | "with %d message(s) failed " |
2043 | "with rewound sequence number on " |
2044 | "broker %" PRId32" (%s, " |
2045 | "base seq %" PRId32" < next seq %" PRId32"): " |
2046 | "last error %s (actions %s, " |
2047 | "base seq %" PRId32"..%" PRId32 |
2048 | ", base msgid %" PRIu64", %" PRId64"ms ago)" , |
2049 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
2050 | rktp->rktp_partition, |
2051 | rd_kafka_msgq_len(&batch->msgq), |
2052 | rkb->rkb_nodeid, |
2053 | rd_kafka_pid2str(batch->pid), |
2054 | batch->first_seq, |
2055 | perr->next_ack_seq, |
2056 | rd_kafka_err2name(last_err.err), |
2057 | rd_kafka_actions2str(last_err.actions), |
2058 | last_err.base_seq, last_err.last_seq, |
2059 | last_err.base_msgid, |
2060 | last_err.ts ? |
2061 | (now - last_err.ts)/1000 : -1); |
2062 | |
2063 | perr->actions = RD_KAFKA_ERR_ACTION_PERMANENT; |
2064 | perr->status = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED; |
2065 | perr->update_next_ack = rd_false; |
2066 | perr->update_next_err = rd_false; |
2067 | } |
2068 | break; |
2069 | |
2070 | case RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER: |
2071 | /* This error indicates that we successfully produced |
2072 | * this set of messages before but this (supposed) retry failed. |
2073 | * |
2074 | * Treat as success, however offset and timestamp |
2075 | * will be invalid. */ |
2076 | |
2077 | /* Future improvement/FIXME: |
2078 | * But first make sure the first message has actually |
2079 | * been retried, getting this error for a non-retried message |
2080 | * indicates a synchronization issue or bug. */ |
2081 | rd_rkb_dbg(rkb, MSG|RD_KAFKA_DBG_EOS, "DUPSEQ" , |
2082 | "ProduceRequest for %.*s [%" PRId32"] " |
2083 | "with %d message(s) failed " |
2084 | "due to duplicate sequence number: " |
2085 | "previous send succeeded but was not acknowledged " |
2086 | "(%s, base seq %" PRId32"): " |
2087 | "marking the messages successfully delivered" , |
2088 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
2089 | rktp->rktp_partition, |
2090 | rd_kafka_msgq_len(&batch->msgq), |
2091 | rd_kafka_pid2str(batch->pid), |
2092 | batch->first_seq); |
2093 | |
2094 | /* Void error, delivery succeeded */ |
2095 | perr->err = RD_KAFKA_RESP_ERR_NO_ERROR; |
2096 | perr->actions = 0; |
2097 | perr->status = RD_KAFKA_MSG_STATUS_PERSISTED; |
2098 | perr->update_next_ack = rd_true; |
2099 | perr->update_next_err = rd_true; |
2100 | break; |
2101 | |
2102 | case RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID: |
2103 | /* The broker/cluster lost track of our PID because |
2104 | * the last message we produced has now been deleted |
2105 | * (by DeleteRecords, compaction, or topic retention policy). |
2106 | * |
2107 | * If all previous messages are accounted for and this is not |
2108 | * a retry we can simply bump the epoch and reset the sequence |
2109 | * number and then retry the message(s) again. |
2110 | * |
2111 | * If there are outstanding messages not yet acknowledged |
2112 | * then there is no safe way to carry on without risking |
2113 | * duplication or reordering, in which case we fail |
2114 | * the producer. */ |
2115 | |
2116 | if (!firstmsg->rkm_u.producer.retries && |
2117 | perr->next_err_seq == batch->first_seq) { |
2118 | rd_rkb_dbg(rkb, MSG|RD_KAFKA_DBG_EOS, "UNKPID" , |
2119 | "ProduceRequest for %.*s [%" PRId32"] " |
2120 | "with %d message(s) failed " |
2121 | "due to unknown producer id " |
2122 | "(%s, base seq %" PRId32", %d retries): " |
2123 | "no risk of duplication/reordering: " |
2124 | "resetting PID and retrying" , |
2125 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
2126 | rktp->rktp_partition, |
2127 | rd_kafka_msgq_len(&batch->msgq), |
2128 | rd_kafka_pid2str(batch->pid), |
2129 | batch->first_seq, |
2130 | firstmsg->rkm_u.producer.retries); |
2131 | |
2132 | /* Drain outstanding requests and bump epoch. */ |
2133 | rd_kafka_idemp_drain_epoch_bump(rk, |
2134 | "unknown producer id" ); |
2135 | |
2136 | perr->incr_retry = 0; |
2137 | perr->actions = RD_KAFKA_ERR_ACTION_RETRY; |
2138 | perr->status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED; |
2139 | perr->update_next_ack = rd_false; |
2140 | perr->update_next_err = rd_true; |
2141 | break; |
2142 | } |
2143 | |
2144 | rd_kafka_set_fatal_error( |
2145 | rk, perr->err, |
2146 | "ProduceRequest for %.*s [%" PRId32"] " |
2147 | "with %d message(s) failed " |
2148 | "due to unknown producer id (" |
2149 | "broker %" PRId32" %s, base seq %" PRId32", %d retries): " |
2150 | "unable to retry without risking " |
2151 | "duplication/reordering" , |
2152 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
2153 | rktp->rktp_partition, |
2154 | rd_kafka_msgq_len(&batch->msgq), |
2155 | rkb->rkb_nodeid, |
2156 | rd_kafka_pid2str(batch->pid), |
2157 | batch->first_seq, |
2158 | firstmsg->rkm_u.producer.retries); |
2159 | |
2160 | perr->actions = RD_KAFKA_ERR_ACTION_PERMANENT; |
2161 | perr->status = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED; |
2162 | perr->update_next_ack = rd_false; |
2163 | perr->update_next_err = rd_true; |
2164 | break; |
2165 | |
2166 | default: |
                /* All other errors are handled by the standard
                 * Produce error handler, which will set
2169 | * update_next_ack|err accordingly. */ |
2170 | break; |
2171 | } |
2172 | } |
2173 | |
2174 | |
2175 | |
2176 | /** |
2177 | * @brief Error-handling for failed ProduceRequests |
2178 | * |
 * @param perr Produce error state, used for both input and output;
 *        its fields may be changed by this function.
2181 | * |
2182 | * @returns 0 if no further processing of the request should be performed, |
2183 | * such as triggering delivery reports, else 1. |
2184 | * |
2185 | * @warning May be called on the old leader thread. Lock rktp appropriately! |
2186 | * |
2187 | * @warning \p request may be NULL. |
2188 | * |
2189 | * @locality broker thread (but not necessarily the leader broker) |
2190 | * @locks none |
2191 | */ |
2192 | static int rd_kafka_handle_Produce_error (rd_kafka_broker_t *rkb, |
2193 | const rd_kafka_buf_t *request, |
2194 | rd_kafka_msgbatch_t *batch, |
2195 | struct rd_kafka_Produce_err *perr) { |
2196 | rd_kafka_t *rk = rkb->rkb_rk; |
2197 | rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(batch->s_rktp); |
2198 | int is_leader; |
2199 | |
2200 | if (unlikely(perr->err == RD_KAFKA_RESP_ERR__DESTROY)) |
2201 | return 0; /* Terminating */ |
2202 | |
2203 | /* When there is a partition leader change any outstanding |
2204 | * requests to the old broker will be handled by the old |
2205 | * broker thread when the responses are received/timeout: |
2206 | * in this case we need to be careful with locking: |
2207 | * check once if we're the leader (which allows relaxed |
2208 | * locking), and cache the current rktp's eos state vars. */ |
2209 | rd_kafka_toppar_lock(rktp); |
2210 | is_leader = rktp->rktp_leader == rkb; |
2211 | perr->rktp_pid = rktp->rktp_eos.pid; |
2212 | perr->next_ack_seq = rktp->rktp_eos.next_ack_seq; |
2213 | perr->next_err_seq = rktp->rktp_eos.next_err_seq; |
2214 | rd_kafka_toppar_unlock(rktp); |
2215 | |
2216 | /* All failures are initially treated as if the message |
2217 | * was not persisted, but the status may be changed later |
2218 | * for specific errors and actions. */ |
2219 | perr->status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED; |
2220 | |
        /* Set actions for known errors (may be overridden later),
2222 | * all other errors are considered permanent failures. |
2223 | * (also see rd_kafka_err_action() for the default actions). */ |
2224 | perr->actions = rd_kafka_err_action( |
2225 | rkb, perr->err, request, |
2226 | |
2227 | RD_KAFKA_ERR_ACTION_REFRESH| |
2228 | RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED, |
2229 | RD_KAFKA_RESP_ERR__TRANSPORT, |
2230 | |
2231 | RD_KAFKA_ERR_ACTION_REFRESH| |
2232 | RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED, |
2233 | RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART, |
2234 | |
2235 | RD_KAFKA_ERR_ACTION_RETRY| |
2236 | RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED, |
2237 | RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS, |
2238 | |
2239 | RD_KAFKA_ERR_ACTION_RETRY| |
2240 | RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED, |
2241 | RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND, |
2242 | |
2243 | RD_KAFKA_ERR_ACTION_RETRY| |
2244 | RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED, |
2245 | RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE, |
2246 | |
2247 | RD_KAFKA_ERR_ACTION_RETRY| |
2248 | RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED, |
2249 | RD_KAFKA_RESP_ERR__TIMED_OUT, |
2250 | |
2251 | RD_KAFKA_ERR_ACTION_PERMANENT| |
2252 | RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED, |
2253 | RD_KAFKA_RESP_ERR__MSG_TIMED_OUT, |
2254 | |
2255 | /* All Idempotent Producer-specific errors are |
2256 | * initially set as permanent errors, |
2257 | * special handling may change the actions. */ |
2258 | RD_KAFKA_ERR_ACTION_PERMANENT| |
2259 | RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED, |
2260 | RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER, |
2261 | |
2262 | RD_KAFKA_ERR_ACTION_PERMANENT| |
2263 | RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED, |
2264 | RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER, |
2265 | |
2266 | RD_KAFKA_ERR_ACTION_PERMANENT| |
2267 | RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED, |
2268 | RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID, |
2269 | |
2270 | /* Message was purged from out-queue due to |
2271 | * Idempotent Producer Id change */ |
2272 | RD_KAFKA_ERR_ACTION_RETRY, |
2273 | RD_KAFKA_RESP_ERR__RETRY, |
2274 | |
2275 | RD_KAFKA_ERR_ACTION_END); |
2276 | |
2277 | rd_rkb_dbg(rkb, MSG, "MSGSET" , |
2278 | "%s [%" PRId32"]: MessageSet with %i message(s) " |
2279 | "(MsgId %" PRIu64", BaseSeq %" PRId32") " |
2280 | "encountered error: %s (actions %s)%s" , |
2281 | rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, |
2282 | rd_kafka_msgq_len(&batch->msgq), |
2283 | batch->first_msgid, batch->first_seq, |
2284 | rd_kafka_err2str(perr->err), |
2285 | rd_kafka_actions2str(perr->actions), |
2286 | is_leader ? "" : " [NOT LEADER]" ); |
2287 | |
2288 | |
2289 | /* |
2290 | * Special handling for Idempotent Producer |
2291 | * |
2292 | * Note: Idempotent Producer-specific errors received |
2293 | * on a non-idempotent producer will be passed through |
2294 | * directly to the application. |
2295 | */ |
2296 | if (rd_kafka_is_idempotent(rk)) |
2297 | rd_kafka_handle_idempotent_Produce_error(rkb, batch, perr); |
2298 | |
2299 | /* Update message persistence status based on action flags. |
2300 | * None of these are typically set after an idempotent error, |
2301 | * which sets the status explicitly. */ |
2302 | if (perr->actions & RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED) |
2303 | perr->status = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED; |
2304 | else if (perr->actions & RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED) |
2305 | perr->status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED; |
2306 | else if (perr->actions & RD_KAFKA_ERR_ACTION_MSG_PERSISTED) |
2307 | perr->status = RD_KAFKA_MSG_STATUS_PERSISTED; |
2308 | |
        /* Save the last error for debugging subsequent errors,
         * useful for Idempotent Producer troubleshooting. */
2311 | rd_kafka_toppar_lock(rktp); |
2312 | rktp->rktp_last_err.err = perr->err; |
2313 | rktp->rktp_last_err.actions = perr->actions; |
2314 | rktp->rktp_last_err.ts = rd_clock(); |
2315 | rktp->rktp_last_err.base_seq = batch->first_seq; |
2316 | rktp->rktp_last_err.last_seq = perr->last_seq; |
2317 | rktp->rktp_last_err.base_msgid = batch->first_msgid; |
2318 | rd_kafka_toppar_unlock(rktp); |
2319 | |
2320 | /* |
2321 | * Handle actions |
2322 | */ |
2323 | if (perr->actions & (RD_KAFKA_ERR_ACTION_REFRESH | |
2324 | RD_KAFKA_ERR_ACTION_RETRY)) { |
2325 | /* Retry (refresh also implies retry) */ |
2326 | |
2327 | if (perr->actions & RD_KAFKA_ERR_ACTION_REFRESH) { |
2328 | /* Request metadata information update. |
2329 | * These errors imply that we have stale |
2330 | * information and the request was |
2331 | * either rejected or not sent - |
2332 | * we don't need to increment the retry count |
2333 | * when we perform a retry since: |
2334 | * - it is a temporary error (hopefully) |
2335 | * - there is no chance of duplicate delivery |
2336 | */ |
2337 | rd_kafka_toppar_leader_unavailable( |
2338 | rktp, "produce" , perr->err); |
2339 | |
2340 | /* We can't be certain the request wasn't |
2341 | * sent in case of transport failure, |
2342 | * so the ERR__TRANSPORT case will need |
2343 | * the retry count to be increased */ |
2344 | if (perr->err != RD_KAFKA_RESP_ERR__TRANSPORT) |
2345 | perr->incr_retry = 0; |
2346 | } |
2347 | |
2348 | /* If message timed out in queue, not in transit, |
2349 | * we will retry at a later time but not increment |
2350 | * the retry count since there is no risk |
2351 | * of duplicates. */ |
2352 | if (!rd_kafka_buf_was_sent(request)) |
2353 | perr->incr_retry = 0; |
2354 | |
2355 | if (!perr->incr_retry) { |
2356 | /* If retries are not to be incremented then |
2357 | * there is no chance of duplicates on retry, which |
2358 | * means these messages were not persisted. */ |
2359 | perr->status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED; |
2360 | } |
2361 | |
2362 | if (rd_kafka_is_idempotent(rk)) { |
                        /* Any currently in-flight requests will
                         * fail with ERR_OUT_OF_ORDER_SEQUENCE_NUMBER,
                         * which should not be treated as a fatal error
                         * since this request and subsequent requests
                         * will be retried and thus return to order.
                         * Unless the error was a timeout, or similar,
                         * in which case the request might have made it
                         * and the messages are considered possibly
                         * persisted: in that case we allow the next
                         * in-flight response to be successful, and then
                         * mark this request's messages as successfully
                         * delivered. */
2374 | if (perr->status & |
2375 | RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED) |
2376 | perr->update_next_ack = rd_true; |
2377 | else |
2378 | perr->update_next_ack = rd_false; |
2379 | perr->update_next_err = rd_true; |
2380 | |
2381 | /* Drain outstanding requests so that retries |
2382 | * are attempted with proper state knowledge and |
2383 | * without any in-flight requests. */ |
2384 | rd_kafka_toppar_lock(rktp); |
2385 | rd_kafka_idemp_drain_toppar(rktp, |
2386 | "drain before retrying" ); |
2387 | rd_kafka_toppar_unlock(rktp); |
2388 | } |
2389 | |
2390 | /* Since requests are specific to a broker |
2391 | * we move the retryable messages from the request |
2392 | * back to the partition queue (prepend) and then |
2393 | * let the new broker construct a new request. |
2394 | * While doing this we also make sure the retry count |
                 * for each message is honoured; any messages that
                 * would exceed the retry count will not be
                 * moved but instead fail below. */
2398 | rd_kafka_toppar_retry_msgq(rktp, &batch->msgq, |
2399 | perr->incr_retry, |
2400 | perr->status); |
2401 | |
2402 | if (rd_kafka_msgq_len(&batch->msgq) == 0) { |
                        /* No need to do anything more with the request
2404 | * here since the request no longer has any |
2405 | * messages associated with it. */ |
2406 | return 0; |
2407 | } |
2408 | } |
2409 | |
2410 | if (perr->actions & RD_KAFKA_ERR_ACTION_PERMANENT && |
2411 | rd_kafka_is_idempotent(rk)) { |
2412 | if (rk->rk_conf.eos.gapless) { |
2413 | /* A permanent non-idempotent error will lead to |
2414 | * gaps in the message series, the next request |
2415 | * will fail with ...ERR_OUT_OF_ORDER_SEQUENCE_NUMBER. |
2416 | * To satisfy the gapless guarantee we need to raise |
2417 | * a fatal error here. */ |
2418 | rd_kafka_set_fatal_error( |
2419 | rk, RD_KAFKA_RESP_ERR__GAPLESS_GUARANTEE, |
2420 | "ProduceRequest for %.*s [%" PRId32"] " |
2421 | "with %d message(s) failed: " |
2422 | "%s (broker %" PRId32" %s, base seq %" PRId32"): " |
2423 | "unable to satisfy gap-less guarantee" , |
2424 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
2425 | rktp->rktp_partition, |
2426 | rd_kafka_msgq_len(&batch->msgq), |
2427 | rd_kafka_err2str(perr->err), |
2428 | rkb->rkb_nodeid, |
2429 | rd_kafka_pid2str(batch->pid), |
2430 | batch->first_seq); |
2431 | |
2432 | /* Drain outstanding requests and reset PID. */ |
2433 | rd_kafka_idemp_drain_reset(rk); |
2434 | |
2435 | } else { |
2436 | /* If gapless is not set we bump the Epoch and |
2437 | * renumber the messages to send. */ |
2438 | |
                        /* Drain outstanding requests and bump the epoch. */
2440 | rd_kafka_idemp_drain_epoch_bump( |
2441 | rk, "message sequence gap" ); |
2442 | } |
2443 | |
2444 | perr->update_next_ack = rd_false; |
2445 | /* Make sure the next error will not raise a fatal error. */ |
2446 | perr->update_next_err = rd_true; |
2447 | } |
2448 | |
2449 | /* Translate request-level timeout error code |
2450 | * to message-level timeout error code. */ |
2451 | if (perr->err == RD_KAFKA_RESP_ERR__TIMED_OUT || |
2452 | perr->err == RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE) |
2453 | perr->err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT; |
2454 | |
2455 | return 1; |
2456 | } |
2457 | |
2458 | /** |
2459 | * @brief Handle ProduceResponse success for idempotent producer |
2460 | * |
2461 | * @warning May be called on the old leader thread. Lock rktp appropriately! |
2462 | * |
2463 | * @locks none |
2464 | * @locality broker thread (but not necessarily the leader broker thread) |
2465 | */ |
2466 | static void |
2467 | rd_kafka_handle_idempotent_Produce_success (rd_kafka_broker_t *rkb, |
2468 | rd_kafka_msgbatch_t *batch, |
2469 | int32_t next_seq) { |
2470 | rd_kafka_t *rk = rkb->rkb_rk; |
2471 | rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(batch->s_rktp); |
2472 | char fatal_err[512]; |
2473 | uint64_t first_msgid, last_msgid; |
2474 | |
2475 | *fatal_err = '\0'; |
2476 | |
2477 | first_msgid = rd_kafka_msgq_first(&batch->msgq)->rkm_u.producer.msgid; |
2478 | last_msgid = rd_kafka_msgq_last(&batch->msgq)->rkm_u.producer.msgid; |
2479 | |
2480 | rd_kafka_toppar_lock(rktp); |
2481 | |
2482 | /* If the last acked msgid is higher than |
2483 | * the next message to (re)transmit in the message queue |
2484 | * it means a previous series of R1,R2 ProduceRequests |
2485 | * had R1 fail with uncertain persistence status, |
2486 | * such as timeout or transport error, but R2 succeeded, |
2487 | * which means the messages in R1 were in fact persisted. |
2488 | * In this case trigger delivery reports for all messages |
2489 | * in queue until we hit a non-acked message msgid. */ |
2490 | if (unlikely(rktp->rktp_eos.acked_msgid < first_msgid - 1)) { |
2491 | rd_kafka_dr_implicit_ack(rkb, rktp, last_msgid); |
2492 | |
2493 | } else if (unlikely(batch->first_seq != rktp->rktp_eos.next_ack_seq && |
2494 | batch->first_seq == rktp->rktp_eos.next_err_seq)) { |
                /* Out-of-order responses should not happen with
                 * current broker versions, but if we receive a
                 * success ack at a sequence where we were expecting
                 * an error to be returned, the producer and broker
                 * states have diverged, so raise a fatal error. */
2500 | |
2501 | /* Can't call set_fatal_error() while |
2502 | * holding the toppar lock, so construct |
2503 | * the error string here and call |
2504 | * set_fatal_error() below after |
2505 | * toppar lock has been released. */ |
2506 | rd_snprintf( |
2507 | fatal_err, sizeof(fatal_err), |
2508 | "ProduceRequest for %.*s [%" PRId32"] " |
2509 | "with %d message(s) " |
2510 | "succeeded when expecting failure " |
2511 | "(broker %" PRId32" %s, " |
2512 | "base seq %" PRId32", " |
2513 | "next ack seq %" PRId32", " |
2514 | "next err seq %" PRId32": " |
2515 | "unable to retry without risking " |
2516 | "duplication/reordering" , |
2517 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
2518 | rktp->rktp_partition, |
2519 | rd_kafka_msgq_len(&batch->msgq), |
2520 | rkb->rkb_nodeid, |
2521 | rd_kafka_pid2str(batch->pid), |
2522 | batch->first_seq, |
2523 | rktp->rktp_eos.next_ack_seq, |
2524 | rktp->rktp_eos.next_err_seq); |
2525 | |
2526 | rktp->rktp_eos.next_err_seq = next_seq; |
2527 | } |
2528 | |
2529 | if (likely(!*fatal_err)) { |
2530 | /* Advance next expected err and/or ack sequence */ |
2531 | |
2532 | /* Only step err seq if it hasn't diverged. */ |
2533 | if (rktp->rktp_eos.next_err_seq == |
2534 | rktp->rktp_eos.next_ack_seq) |
2535 | rktp->rktp_eos.next_err_seq = next_seq; |
2536 | |
2537 | rktp->rktp_eos.next_ack_seq = next_seq; |
2538 | } |
2539 | |
2540 | /* Store the last acked message sequence, |
2541 | * since retries within the broker cache window (5 requests) |
2542 | * will succeed for older messages we must only update the |
2543 | * acked msgid if it is higher than the last acked. */ |
2544 | if (last_msgid > rktp->rktp_eos.acked_msgid) |
2545 | rktp->rktp_eos.acked_msgid = last_msgid; |
2546 | |
2547 | rd_kafka_toppar_unlock(rktp); |
2548 | |
2549 | /* Must call set_fatal_error() after releasing |
2550 | * the toppar lock. */ |
2551 | if (unlikely(*fatal_err)) |
2552 | rd_kafka_set_fatal_error(rk, RD_KAFKA_RESP_ERR__INCONSISTENT, |
2553 | "%s" , fatal_err); |
2554 | } |
2555 | |
2556 | |
2557 | /** |
2558 | * @brief Handle ProduceRequest result for a message batch. |
2559 | * |
2560 | * @warning \p request may be NULL. |
2561 | * |
 * @locality broker thread (but not necessarily the toppar's handler thread)
2563 | * @locks none |
2564 | */ |
2565 | static void |
2566 | rd_kafka_msgbatch_handle_Produce_result ( |
2567 | rd_kafka_broker_t *rkb, |
2568 | rd_kafka_msgbatch_t *batch, |
2569 | rd_kafka_resp_err_t err, |
2570 | const struct rd_kafka_Produce_result *presult, |
2571 | const rd_kafka_buf_t *request) { |
2572 | |
2573 | rd_kafka_t *rk = rkb->rkb_rk; |
2574 | rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(batch->s_rktp); |
2575 | rd_kafka_msg_status_t status = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED; |
2576 | rd_bool_t last_inflight; |
2577 | int32_t next_seq; |
2578 | |
2579 | /* Decrease partition's messages in-flight counter */ |
2580 | rd_assert(rd_atomic32_get(&rktp->rktp_msgs_inflight) >= |
2581 | rd_kafka_msgq_len(&batch->msgq)); |
2582 | last_inflight = !rd_atomic32_sub(&rktp->rktp_msgs_inflight, |
2583 | rd_kafka_msgq_len(&batch->msgq)); |
2584 | |
2585 | /* Next expected sequence (and handle wrap) */ |
2586 | next_seq = rd_kafka_seq_wrap(batch->first_seq + |
2587 | rd_kafka_msgq_len(&batch->msgq)); |
2588 | |
2589 | if (likely(!err)) { |
2590 | rd_rkb_dbg(rkb, MSG, "MSGSET" , |
2591 | "%s [%" PRId32"]: MessageSet with %i message(s) " |
2592 | "(MsgId %" PRIu64", BaseSeq %" PRId32") delivered" , |
2593 | rktp->rktp_rkt->rkt_topic->str, |
2594 | rktp->rktp_partition, |
2595 | rd_kafka_msgq_len(&batch->msgq), |
2596 | batch->first_msgid, batch->first_seq); |
2597 | |
2598 | if (rktp->rktp_rkt->rkt_conf.required_acks != 0) |
2599 | status = RD_KAFKA_MSG_STATUS_PERSISTED; |
2600 | |
2601 | if (rd_kafka_is_idempotent(rk)) |
2602 | rd_kafka_handle_idempotent_Produce_success(rkb, batch, |
2603 | next_seq); |
2604 | } else { |
2605 | /* Error handling */ |
2606 | struct rd_kafka_Produce_err perr = { |
2607 | .err = err, |
2608 | .incr_retry = 1, |
2609 | .status = status, |
2610 | .update_next_ack = rd_true, |
2611 | .update_next_err = rd_true, |
2612 | .last_seq = (batch->first_seq + |
2613 | rd_kafka_msgq_len(&batch->msgq) - 1) |
2614 | }; |
2615 | |
2616 | rd_kafka_handle_Produce_error(rkb, request, batch, &perr); |
2617 | |
2618 | /* Update next expected acked and/or err sequence. */ |
2619 | if (perr.update_next_ack || perr.update_next_err) { |
2620 | rd_kafka_toppar_lock(rktp); |
2621 | if (perr.update_next_ack) |
2622 | rktp->rktp_eos.next_ack_seq = next_seq; |
2623 | if (perr.update_next_err) |
2624 | rktp->rktp_eos.next_err_seq = next_seq; |
2625 | rd_kafka_toppar_unlock(rktp); |
2626 | } |
2627 | |
2628 | err = perr.err; |
2629 | status = perr.status; |
2630 | } |
2631 | |
2632 | |
2633 | /* Messages to retry will have been removed from the request's queue */ |
2634 | if (likely(rd_kafka_msgq_len(&batch->msgq) > 0)) { |
2635 | /* Set offset, timestamp and status for each message. */ |
2636 | rd_kafka_msgq_set_metadata(&batch->msgq, |
2637 | presult->offset, |
2638 | presult->timestamp, |
2639 | status); |
2640 | |
2641 | /* Enqueue messages for delivery report. */ |
2642 | rd_kafka_dr_msgq(rktp->rktp_rkt, &batch->msgq, err); |
2643 | } |
2644 | |
2645 | if (rd_kafka_is_idempotent(rk) && last_inflight) |
2646 | rd_kafka_idemp_inflight_toppar_sub(rk, rktp); |
2647 | } |
2648 | |
2649 | |
2650 | /** |
2651 | * @brief Handle ProduceResponse |
2652 | * |
2653 | * @param reply is NULL when `acks=0` and on various local errors. |
2654 | * |
2655 | * @remark ProduceRequests are never retried, retriable errors are |
2656 | * instead handled by re-enqueuing the request's messages back |
2657 | * on the partition queue to have a new ProduceRequest constructed |
2658 | * eventually. |
2659 | * |
2660 | * @warning May be called on the old leader thread. Lock rktp appropriately! |
2661 | * |
2662 | * @locality broker thread (but not necessarily the leader broker thread) |
2663 | */ |
2664 | static void rd_kafka_handle_Produce (rd_kafka_t *rk, |
2665 | rd_kafka_broker_t *rkb, |
2666 | rd_kafka_resp_err_t err, |
2667 | rd_kafka_buf_t *reply, |
2668 | rd_kafka_buf_t *request, |
2669 | void *opaque) { |
2670 | rd_kafka_msgbatch_t *batch = &request->rkbuf_batch; |
2671 | rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(batch->s_rktp); |
2672 | struct rd_kafka_Produce_result result = { |
2673 | .offset = RD_KAFKA_OFFSET_INVALID, |
2674 | .timestamp = -1 |
2675 | }; |
2676 | |
2677 | /* Unit test interface: inject errors */ |
2678 | if (unlikely(rk->rk_conf.ut.handle_ProduceResponse != NULL)) { |
2679 | err = rk->rk_conf.ut.handle_ProduceResponse( |
2680 | rkb->rkb_rk, |
2681 | rkb->rkb_nodeid, |
2682 | batch->first_msgid, |
2683 | err); |
2684 | } |
2685 | |
2686 | /* Parse Produce reply (unless the request errored) */ |
2687 | if (!err && reply) |
2688 | err = rd_kafka_handle_Produce_parse(rkb, rktp, |
2689 | reply, request, |
2690 | &result); |
2691 | |
2692 | rd_kafka_msgbatch_handle_Produce_result(rkb, batch, err, |
2693 | &result, request); |
2694 | } |
2695 | |
2696 | |
2697 | /** |
2698 | * @brief Send ProduceRequest for messages in toppar queue. |
2699 | * |
2700 | * @returns the number of messages included, or 0 on error / no messages. |
2701 | * |
2702 | * @locality broker thread |
2703 | */ |
2704 | int rd_kafka_ProduceRequest (rd_kafka_broker_t *rkb, rd_kafka_toppar_t *rktp, |
2705 | const rd_kafka_pid_t pid) { |
2706 | rd_kafka_buf_t *rkbuf; |
2707 | rd_kafka_itopic_t *rkt = rktp->rktp_rkt; |
2708 | size_t MessageSetSize = 0; |
2709 | int cnt; |
2710 | rd_ts_t now; |
2711 | int64_t first_msg_timeout; |
2712 | int tmout; |
2713 | |
2714 | /** |
2715 | * Create ProduceRequest with as many messages from the toppar |
2716 | * transmit queue as possible. |
2717 | */ |
2718 | rkbuf = rd_kafka_msgset_create_ProduceRequest(rkb, rktp, |
2719 | &rktp->rktp_xmit_msgq, |
2720 | pid, &MessageSetSize); |
2721 | if (unlikely(!rkbuf)) |
2722 | return 0; |
2723 | |
2724 | cnt = rd_kafka_msgq_len(&rkbuf->rkbuf_batch.msgq); |
2725 | rd_dassert(cnt > 0); |
2726 | |
2727 | rd_avg_add(&rktp->rktp_rkt->rkt_avg_batchcnt, (int64_t)cnt); |
2728 | rd_avg_add(&rktp->rktp_rkt->rkt_avg_batchsize, (int64_t)MessageSetSize); |
2729 | |
2730 | if (!rkt->rkt_conf.required_acks) |
2731 | rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_NO_RESPONSE; |
2732 | |
2733 | /* Use timeout from first message in batch */ |
2734 | now = rd_clock(); |
2735 | first_msg_timeout = (rd_kafka_msgq_first(&rkbuf->rkbuf_batch.msgq)-> |
2736 | rkm_ts_timeout - now) / 1000; |
2737 | |
2738 | if (unlikely(first_msg_timeout <= 0)) { |
2739 | /* Message has already timed out, allow 100 ms |
2740 | * to produce anyway */ |
2741 | tmout = 100; |
2742 | } else { |
2743 | tmout = (int)RD_MIN(INT_MAX, first_msg_timeout); |
2744 | } |
2745 | |
2746 | /* Set absolute timeout (including retries), the |
2747 | * effective timeout for this specific request will be |
2748 | * capped by socket.timeout.ms */ |
2749 | rd_kafka_buf_set_abs_timeout(rkbuf, tmout, now); |
2750 | |
2751 | rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, |
2752 | RD_KAFKA_NO_REPLYQ, |
2753 | rd_kafka_handle_Produce, NULL); |
2754 | |
2755 | return cnt; |
2756 | } |
2757 | |
2758 | |
2759 | /** |
2760 | * @brief Construct and send CreateTopicsRequest to \p rkb |
2761 | * with the topics (NewTopic_t*) in \p new_topics, using |
2762 | * \p options. |
2763 | * |
2764 | * The response (unparsed) will be enqueued on \p replyq |
2765 | * for handling by \p resp_cb (with \p opaque passed). |
2766 | * |
2767 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for |
2768 | * transmission, otherwise an error code and errstr will be |
2769 | * updated with a human readable error string. |
2770 | */ |
2771 | rd_kafka_resp_err_t |
2772 | rd_kafka_CreateTopicsRequest (rd_kafka_broker_t *rkb, |
2773 | const rd_list_t *new_topics /*(NewTopic_t*)*/, |
2774 | rd_kafka_AdminOptions_t *options, |
2775 | char *errstr, size_t errstr_size, |
2776 | rd_kafka_replyq_t replyq, |
2777 | rd_kafka_resp_cb_t *resp_cb, |
2778 | void *opaque) { |
2779 | rd_kafka_buf_t *rkbuf; |
2780 | int16_t ApiVersion = 0; |
2781 | int features; |
2782 | int i = 0; |
2783 | rd_kafka_NewTopic_t *newt; |
2784 | int op_timeout; |
2785 | |
2786 | if (rd_list_cnt(new_topics) == 0) { |
2787 | rd_snprintf(errstr, errstr_size, "No topics to create" ); |
2788 | rd_kafka_replyq_destroy(&replyq); |
2789 | return RD_KAFKA_RESP_ERR__INVALID_ARG; |
2790 | } |
2791 | |
2792 | ApiVersion = rd_kafka_broker_ApiVersion_supported( |
2793 | rkb, RD_KAFKAP_CreateTopics, 0, 2, &features); |
2794 | if (ApiVersion == -1) { |
2795 | rd_snprintf(errstr, errstr_size, |
2796 | "Topic Admin API (KIP-4) not supported " |
2797 | "by broker, requires broker version >= 0.10.2.0" ); |
2798 | rd_kafka_replyq_destroy(&replyq); |
2799 | return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; |
2800 | } |
2801 | |
2802 | if (rd_kafka_confval_get_int(&options->validate_only) && |
2803 | ApiVersion < 1) { |
2804 | rd_snprintf(errstr, errstr_size, |
2805 | "CreateTopics.validate_only=true not " |
2806 | "supported by broker" ); |
2807 | rd_kafka_replyq_destroy(&replyq); |
2808 | return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; |
2809 | } |
2810 | |
2811 | |
2812 | |
2813 | rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_CreateTopics, 1, |
2814 | 4 + |
2815 | (rd_list_cnt(new_topics) * 200) + |
2816 | 4 + 1); |
2817 | |
2818 | /* #topics */ |
2819 | rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(new_topics)); |
2820 | |
2821 | while ((newt = rd_list_elem(new_topics, i++))) { |
2822 | int partition; |
2823 | int ei = 0; |
2824 | const rd_kafka_ConfigEntry_t *entry; |
2825 | |
2826 | /* topic */ |
2827 | rd_kafka_buf_write_str(rkbuf, newt->topic, -1); |
2828 | |
2829 | if (rd_list_cnt(&newt->replicas)) { |
2830 | /* num_partitions and replication_factor must be |
2831 | * set to -1 if a replica assignment is sent. */ |
2832 | /* num_partitions */ |
2833 | rd_kafka_buf_write_i32(rkbuf, -1); |
2834 | /* replication_factor */ |
2835 | rd_kafka_buf_write_i16(rkbuf, -1); |
2836 | } else { |
2837 | /* num_partitions */ |
2838 | rd_kafka_buf_write_i32(rkbuf, newt->num_partitions); |
2839 | /* replication_factor */ |
2840 | rd_kafka_buf_write_i16(rkbuf, |
2841 | (int16_t)newt-> |
2842 | replication_factor); |
2843 | } |
2844 | |
2845 | /* #replica_assignment */ |
2846 | rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(&newt->replicas)); |
2847 | |
2848 | /* Replicas per partition, see rdkafka_admin.[ch] |
2849 | * for how these are constructed. */ |
2850 | for (partition = 0 ; partition < rd_list_cnt(&newt->replicas); |
2851 | partition++) { |
2852 | const rd_list_t *replicas; |
2853 | int ri = 0; |
2854 | |
2855 | replicas = rd_list_elem(&newt->replicas, partition); |
2856 | if (!replicas) |
2857 | continue; |
2858 | |
2859 | /* partition */ |
2860 | rd_kafka_buf_write_i32(rkbuf, partition); |
2861 | /* #replicas */ |
2862 | rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(replicas)); |
2863 | |
2864 | for (ri = 0 ; ri < rd_list_cnt(replicas) ; ri++) { |
2865 | /* replica */ |
2866 | rd_kafka_buf_write_i32( |
2867 | rkbuf, rd_list_get_int32(replicas, ri)); |
2868 | } |
2869 | } |
2870 | |
2871 | /* #config_entries */ |
2872 | rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(&newt->config)); |
2873 | |
2874 | RD_LIST_FOREACH(entry, &newt->config, ei) { |
2875 | /* config_name */ |
2876 | rd_kafka_buf_write_str(rkbuf, entry->kv->name, -1); |
2877 | /* config_value (nullable) */ |
2878 | rd_kafka_buf_write_str(rkbuf, entry->kv->value, -1); |
2879 | } |
2880 | } |
2881 | |
2882 | /* timeout */ |
2883 | op_timeout = rd_kafka_confval_get_int(&options->operation_timeout); |
2884 | rd_kafka_buf_write_i32(rkbuf, op_timeout); |
2885 | |
2886 | if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms) |
2887 | rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout+1000, 0); |
2888 | |
2889 | if (ApiVersion >= 1) { |
2890 | /* validate_only */ |
2891 | rd_kafka_buf_write_i8(rkbuf, |
2892 | rd_kafka_confval_get_int(&options-> |
2893 | validate_only)); |
2894 | } |
2895 | |
2896 | rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); |
2897 | |
2898 | rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); |
2899 | |
2900 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
2901 | } |
2902 | |
2903 | |
2904 | /** |
2905 | * @brief Construct and send DeleteTopicsRequest to \p rkb |
2906 | * with the topics (DeleteTopic_t *) in \p del_topics, using |
2907 | * \p options. |
2908 | * |
2909 | * The response (unparsed) will be enqueued on \p replyq |
2910 | * for handling by \p resp_cb (with \p opaque passed). |
2911 | * |
2912 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for |
2913 | * transmission, otherwise an error code and errstr will be |
2914 | * updated with a human readable error string. |
2915 | */ |
2916 | rd_kafka_resp_err_t |
2917 | rd_kafka_DeleteTopicsRequest (rd_kafka_broker_t *rkb, |
2918 | const rd_list_t *del_topics /*(DeleteTopic_t*)*/, |
2919 | rd_kafka_AdminOptions_t *options, |
2920 | char *errstr, size_t errstr_size, |
2921 | rd_kafka_replyq_t replyq, |
2922 | rd_kafka_resp_cb_t *resp_cb, |
2923 | void *opaque) { |
2924 | rd_kafka_buf_t *rkbuf; |
2925 | int16_t ApiVersion = 0; |
2926 | int features; |
2927 | int i = 0; |
2928 | rd_kafka_DeleteTopic_t *delt; |
2929 | int op_timeout; |
2930 | |
2931 | if (rd_list_cnt(del_topics) == 0) { |
2932 | rd_snprintf(errstr, errstr_size, "No topics to delete" ); |
2933 | rd_kafka_replyq_destroy(&replyq); |
2934 | return RD_KAFKA_RESP_ERR__INVALID_ARG; |
2935 | } |
2936 | |
2937 | ApiVersion = rd_kafka_broker_ApiVersion_supported( |
2938 | rkb, RD_KAFKAP_DeleteTopics, 0, 1, &features); |
2939 | if (ApiVersion == -1) { |
2940 | rd_snprintf(errstr, errstr_size, |
2941 | "Topic Admin API (KIP-4) not supported " |
2942 | "by broker, requires broker version >= 0.10.2.0" ); |
2943 | rd_kafka_replyq_destroy(&replyq); |
2944 | return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; |
2945 | } |
2946 | |
2947 | rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_DeleteTopics, 1, |
2948 | /* FIXME */ |
2949 | 4 + |
2950 | (rd_list_cnt(del_topics) * 100) + |
2951 | 4); |
2952 | |
2953 | /* #topics */ |
2954 | rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(del_topics)); |
2955 | |
2956 | while ((delt = rd_list_elem(del_topics, i++))) |
2957 | rd_kafka_buf_write_str(rkbuf, delt->topic, -1); |
2958 | |
2959 | /* timeout */ |
2960 | op_timeout = rd_kafka_confval_get_int(&options->operation_timeout); |
2961 | rd_kafka_buf_write_i32(rkbuf, op_timeout); |
2962 | |
2963 | if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms) |
2964 | rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout+1000, 0); |
2965 | |
2966 | rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); |
2967 | |
2968 | rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); |
2969 | |
2970 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
2971 | } |
2972 | |
2973 | |
2974 | |
2975 | /** |
2976 | * @brief Construct and send CreatePartitionsRequest to \p rkb |
2977 | * with the topics (NewPartitions_t*) in \p new_parts, using |
2978 | * \p options. |
2979 | * |
2980 | * The response (unparsed) will be enqueued on \p replyq |
2981 | * for handling by \p resp_cb (with \p opaque passed). |
2982 | * |
2983 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for |
2984 | * transmission, otherwise an error code and errstr will be |
2985 | * updated with a human readable error string. |
2986 | */ |
2987 | rd_kafka_resp_err_t |
2988 | rd_kafka_CreatePartitionsRequest (rd_kafka_broker_t *rkb, |
2989 | const rd_list_t *new_parts /*(NewPartitions_t*)*/, |
2990 | rd_kafka_AdminOptions_t *options, |
2991 | char *errstr, size_t errstr_size, |
2992 | rd_kafka_replyq_t replyq, |
2993 | rd_kafka_resp_cb_t *resp_cb, |
2994 | void *opaque) { |
2995 | rd_kafka_buf_t *rkbuf; |
2996 | int16_t ApiVersion = 0; |
2997 | int i = 0; |
2998 | rd_kafka_NewPartitions_t *newp; |
2999 | int op_timeout; |
3000 | |
3001 | if (rd_list_cnt(new_parts) == 0) { |
3002 | rd_snprintf(errstr, errstr_size, "No partitions to create" ); |
3003 | rd_kafka_replyq_destroy(&replyq); |
3004 | return RD_KAFKA_RESP_ERR__INVALID_ARG; |
3005 | } |
3006 | |
3007 | ApiVersion = rd_kafka_broker_ApiVersion_supported( |
3008 | rkb, RD_KAFKAP_CreatePartitions, 0, 0, NULL); |
3009 | if (ApiVersion == -1) { |
3010 | rd_snprintf(errstr, errstr_size, |
3011 | "CreatePartitions (KIP-195) not supported " |
3012 | "by broker, requires broker version >= 1.0.0" ); |
3013 | rd_kafka_replyq_destroy(&replyq); |
3014 | return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; |
3015 | } |
3016 | |
3017 | rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_CreatePartitions, 1, |
3018 | 4 + |
3019 | (rd_list_cnt(new_parts) * 200) + |
3020 | 4 + 1); |
3021 | |
3022 | /* #topics */ |
3023 | rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(new_parts)); |
3024 | |
3025 | while ((newp = rd_list_elem(new_parts, i++))) { |
3026 | /* topic */ |
3027 | rd_kafka_buf_write_str(rkbuf, newp->topic, -1); |
3028 | |
3029 | /* New partition count */ |
3030 | rd_kafka_buf_write_i32(rkbuf, (int32_t)newp->total_cnt); |
3031 | |
3032 | /* #replica_assignment */ |
3033 | if (rd_list_empty(&newp->replicas)) { |
3034 | rd_kafka_buf_write_i32(rkbuf, -1); |
3035 | } else { |
3036 | const rd_list_t *replicas; |
3037 | int pi = -1; |
3038 | |
3039 | rd_kafka_buf_write_i32(rkbuf, |
3040 | rd_list_cnt(&newp->replicas)); |
3041 | |
3042 | while ((replicas = rd_list_elem(&newp->replicas, |
3043 | ++pi))) { |
3044 | int ri = 0; |
3045 | |
3046 | /* replica count */ |
3047 | rd_kafka_buf_write_i32(rkbuf, |
3048 | rd_list_cnt(replicas)); |
3049 | |
3050 | /* replica */ |
3051 | for (ri = 0 ; ri < rd_list_cnt(replicas) ; |
3052 | ri++) { |
3053 | rd_kafka_buf_write_i32( |
3054 | rkbuf, |
3055 | rd_list_get_int32(replicas, |
3056 | ri)); |
3057 | } |
3058 | } |
3059 | } |
3060 | } |
3061 | |
3062 | /* timeout */ |
3063 | op_timeout = rd_kafka_confval_get_int(&options->operation_timeout); |
3064 | rd_kafka_buf_write_i32(rkbuf, op_timeout); |
3065 | |
3066 | if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms) |
3067 | rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout+1000, 0); |
3068 | |
3069 | /* validate_only */ |
3070 | rd_kafka_buf_write_i8( |
3071 | rkbuf, rd_kafka_confval_get_int(&options->validate_only)); |
3072 | |
3073 | rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); |
3074 | |
3075 | rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); |
3076 | |
3077 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
3078 | } |
3079 | |
3080 | |
3081 | /** |
3082 | * @brief Construct and send AlterConfigsRequest to \p rkb |
3083 | * with the configs (ConfigResource_t*) in \p configs, using |
3084 | * \p options. |
3085 | * |
3086 | * The response (unparsed) will be enqueued on \p replyq |
3087 | * for handling by \p resp_cb (with \p opaque passed). |
3088 | * |
3089 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for |
3090 | * transmission, otherwise an error code and errstr will be |
3091 | * updated with a human readable error string. |
3092 | */ |
3093 | rd_kafka_resp_err_t |
3094 | rd_kafka_AlterConfigsRequest (rd_kafka_broker_t *rkb, |
3095 | const rd_list_t *configs /*(ConfigResource_t*)*/, |
3096 | rd_kafka_AdminOptions_t *options, |
3097 | char *errstr, size_t errstr_size, |
3098 | rd_kafka_replyq_t replyq, |
3099 | rd_kafka_resp_cb_t *resp_cb, |
3100 | void *opaque) { |
3101 | rd_kafka_buf_t *rkbuf; |
3102 | int16_t ApiVersion = 0; |
3103 | int i; |
3104 | const rd_kafka_ConfigResource_t *config; |
3105 | int op_timeout; |
3106 | |
3107 | if (rd_list_cnt(configs) == 0) { |
3108 | rd_snprintf(errstr, errstr_size, |
3109 | "No config resources specified" ); |
3110 | rd_kafka_replyq_destroy(&replyq); |
3111 | return RD_KAFKA_RESP_ERR__INVALID_ARG; |
3112 | } |
3113 | |
3114 | ApiVersion = rd_kafka_broker_ApiVersion_supported( |
3115 | rkb, RD_KAFKAP_AlterConfigs, 0, 0, NULL); |
3116 | if (ApiVersion == -1) { |
3117 | rd_snprintf(errstr, errstr_size, |
3118 | "AlterConfigs (KIP-133) not supported " |
3119 | "by broker, requires broker version >= 0.11.0" ); |
3120 | rd_kafka_replyq_destroy(&replyq); |
3121 | return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; |
3122 | } |
3123 | |
3124 | /* incremental requires ApiVersion > FIXME */ |
3125 | if (ApiVersion < 1 /* FIXME */ && |
3126 | rd_kafka_confval_get_int(&options->incremental)) { |
3127 | rd_snprintf(errstr, errstr_size, |
3128 | "AlterConfigs.incremental=true (KIP-248) " |
3129 | "not supported by broker, " |
3130 | "requires broker version >= 2.0.0" ); |
3131 | rd_kafka_replyq_destroy(&replyq); |
3132 | return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; |
3133 | } |
3134 | |
3135 | rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_AlterConfigs, 1, |
3136 | rd_list_cnt(configs) * 200); |
3137 | |
3138 | /* #resources */ |
3139 | rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(configs)); |
3140 | |
3141 | RD_LIST_FOREACH(config, configs, i) { |
3142 | const rd_kafka_ConfigEntry_t *entry; |
3143 | int ei; |
3144 | |
3145 | /* resource_type */ |
3146 | rd_kafka_buf_write_i8(rkbuf, config->restype); |
3147 | |
3148 | /* resource_name */ |
3149 | rd_kafka_buf_write_str(rkbuf, config->name, -1); |
3150 | |
3151 | /* #config */ |
3152 | rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(&config->config)); |
3153 | |
3154 | RD_LIST_FOREACH(entry, &config->config, ei) { |
3155 | /* config_name */ |
3156 | rd_kafka_buf_write_str(rkbuf, entry->kv->name, -1); |
3157 | /* config_value (nullable) */ |
3158 | rd_kafka_buf_write_str(rkbuf, entry->kv->value, -1); |
3159 | |
3160 | if (ApiVersion == 1) |
3161 | rd_kafka_buf_write_i8(rkbuf, |
3162 | entry->a.operation); |
3163 | else if (entry->a.operation != RD_KAFKA_ALTER_OP_SET) { |
3164 | rd_snprintf(errstr, errstr_size, |
3165 | "Broker version >= 2.0.0 required " |
3166 | "for add/delete config " |
3167 | "entries: only set supported " |
3168 | "by this broker" ); |
3169 | rd_kafka_buf_destroy(rkbuf); |
3170 | rd_kafka_replyq_destroy(&replyq); |
3171 | return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; |
3172 | } |
3173 | } |
3174 | } |
3175 | |
3176 | /* timeout */ |
3177 | op_timeout = rd_kafka_confval_get_int(&options->operation_timeout); |
3178 | if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms) |
3179 | rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout+1000, 0); |
3180 | |
3181 | /* validate_only */ |
3182 | rd_kafka_buf_write_i8( |
3183 | rkbuf, rd_kafka_confval_get_int(&options->validate_only)); |
3184 | |
3185 | rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); |
3186 | |
3187 | rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); |
3188 | |
3189 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
3190 | } |
3191 | |
3192 | |
3193 | /** |
3194 | * @brief Construct and send DescribeConfigsRequest to \p rkb |
3195 | * with the configs (ConfigResource_t*) in \p configs, using |
3196 | * \p options. |
3197 | * |
3198 | * The response (unparsed) will be enqueued on \p replyq |
3199 | * for handling by \p resp_cb (with \p opaque passed). |
3200 | * |
3201 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for |
3202 | * transmission, otherwise an error code and errstr will be |
3203 | * updated with a human readable error string. |
3204 | */ |
3205 | rd_kafka_resp_err_t |
3206 | rd_kafka_DescribeConfigsRequest (rd_kafka_broker_t *rkb, |
3207 | const rd_list_t *configs /*(ConfigResource_t*)*/, |
3208 | rd_kafka_AdminOptions_t *options, |
3209 | char *errstr, size_t errstr_size, |
3210 | rd_kafka_replyq_t replyq, |
3211 | rd_kafka_resp_cb_t *resp_cb, |
3212 | void *opaque) { |
3213 | rd_kafka_buf_t *rkbuf; |
3214 | int16_t ApiVersion = 0; |
3215 | int i; |
3216 | const rd_kafka_ConfigResource_t *config; |
3217 | int op_timeout; |
3218 | |
3219 | if (rd_list_cnt(configs) == 0) { |
3220 | rd_snprintf(errstr, errstr_size, |
3221 | "No config resources specified" ); |
3222 | rd_kafka_replyq_destroy(&replyq); |
3223 | return RD_KAFKA_RESP_ERR__INVALID_ARG; |
3224 | } |
3225 | |
3226 | ApiVersion = rd_kafka_broker_ApiVersion_supported( |
3227 | rkb, RD_KAFKAP_DescribeConfigs, 0, 1, NULL); |
3228 | if (ApiVersion == -1) { |
3229 | rd_snprintf(errstr, errstr_size, |
3230 | "DescribeConfigs (KIP-133) not supported " |
3231 | "by broker, requires broker version >= 0.11.0" ); |
3232 | rd_kafka_replyq_destroy(&replyq); |
3233 | return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; |
3234 | } |
3235 | |
3236 | rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_DescribeConfigs, 1, |
3237 | rd_list_cnt(configs) * 200); |
3238 | |
3239 | /* #resources */ |
3240 | rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(configs)); |
3241 | |
3242 | RD_LIST_FOREACH(config, configs, i) { |
3243 | const rd_kafka_ConfigEntry_t *entry; |
3244 | int ei; |
3245 | |
3246 | /* resource_type */ |
3247 | rd_kafka_buf_write_i8(rkbuf, config->restype); |
3248 | |
3249 | /* resource_name */ |
3250 | rd_kafka_buf_write_str(rkbuf, config->name, -1); |
3251 | |
3252 | /* #config */ |
3253 | if (rd_list_empty(&config->config)) { |
3254 | /* Get all configs */ |
3255 | rd_kafka_buf_write_i32(rkbuf, -1); |
3256 | } else { |
3257 | /* Get requested configs only */ |
3258 | rd_kafka_buf_write_i32(rkbuf, |
3259 | rd_list_cnt(&config->config)); |
3260 | } |
3261 | |
3262 | RD_LIST_FOREACH(entry, &config->config, ei) { |
3263 | /* config_name */ |
3264 | rd_kafka_buf_write_str(rkbuf, entry->kv->name, -1); |
3265 | } |
3266 | } |
3267 | |
3268 | |
3269 | if (ApiVersion == 1) { |
3270 | /* include_synonyms */ |
3271 | rd_kafka_buf_write_i8(rkbuf, 1); |
3272 | } |
3273 | |
        /* Operation timeout: if it is larger than socket.timeout.ms,
         * extend the request's absolute timeout (with 1s of slack) so the
         * request is not timed out locally before the operation has had a
         * chance to complete. */
        op_timeout = rd_kafka_confval_get_int(&options->operation_timeout);
        if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms)
                rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout+1000, 0);
3278 | |
3279 | rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); |
3280 | |
3281 | rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); |
3282 | |
3283 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
3284 | } |
3285 | |
3286 | |
3287 | |
3288 | /** |
3289 | * @brief Parses and handles an InitProducerId reply. |
3290 | * |
 * Errors (response or parse) are propagated to the idempotence state
 * handler, which is responsible for retries.
3292 | * |
3293 | * @locality rdkafka main thread |
3294 | * @locks none |
3295 | */ |
3296 | void |
3297 | rd_kafka_handle_InitProducerId (rd_kafka_t *rk, |
3298 | rd_kafka_broker_t *rkb, |
3299 | rd_kafka_resp_err_t err, |
3300 | rd_kafka_buf_t *rkbuf, |
3301 | rd_kafka_buf_t *request, |
3302 | void *opaque) { |
3303 | const int log_decode_errors = LOG_ERR; |
3304 | int16_t error_code; |
3305 | rd_kafka_pid_t pid; |
3306 | |
3307 | if (err) |
3308 | goto err; |
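
        /* InitProducerId response body (as parsed below):
         *   throttle_time_ms (i32)
         *   error_code       (i16)
         *   producer_id      (i64)
         *   producer_epoch   (i16) */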
3309 | |
3310 | rd_kafka_buf_read_throttle_time(rkbuf); |
3311 | |
3312 | rd_kafka_buf_read_i16(rkbuf, &error_code); |
3313 | if ((err = error_code)) |
3314 | goto err; |
3315 | |
3316 | rd_kafka_buf_read_i64(rkbuf, &pid.id); |
3317 | rd_kafka_buf_read_i16(rkbuf, &pid.epoch); |
3318 | |
3319 | rd_kafka_idemp_pid_update(rkb, pid); |
3320 | |
3321 | return; |
3322 | |
3323 | err_parse: |
3324 | err = rkbuf->rkbuf_err; |
3325 | err: |
3326 | /* Retries are performed by idempotence state handler */ |
3327 | rd_kafka_idemp_request_pid_failed(rkb, err); |
3328 | } |
3329 | |
3330 | |
3331 | /** |
3332 | * @brief Construct and send InitProducerIdRequest to \p rkb. |
3333 | * |
3334 | * \p transactional_id may be NULL. |
3335 | * \p transaction_timeout_ms may be set to -1. |
3336 | * |
3337 | * The response (unparsed) will be handled by \p resp_cb served |
3338 | * by queue \p replyq. |
3339 | * |
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
 *          transmission, otherwise an error code is returned and \p errstr
 *          is updated with a human readable error string.
3343 | */ |
3344 | rd_kafka_resp_err_t |
3345 | rd_kafka_InitProducerIdRequest (rd_kafka_broker_t *rkb, |
3346 | const char *transactional_id, |
3347 | int transaction_timeout_ms, |
3348 | char *errstr, size_t errstr_size, |
3349 | rd_kafka_replyq_t replyq, |
3350 | rd_kafka_resp_cb_t *resp_cb, |
3351 | void *opaque) { |
3352 | rd_kafka_buf_t *rkbuf; |
3353 | int16_t ApiVersion = 0; |
3354 | |
3355 | ApiVersion = rd_kafka_broker_ApiVersion_supported( |
3356 | rkb, RD_KAFKAP_InitProducerId, 0, 1, NULL); |
3357 | if (ApiVersion == -1) { |
3358 | rd_snprintf(errstr, errstr_size, |
3359 | "InitProducerId (KIP-98) not supported " |
3360 | "by broker, requires broker version >= 0.11.0" ); |
3361 | rd_kafka_replyq_destroy(&replyq); |
3362 | return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; |
3363 | } |
3364 | |
3365 | rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_InitProducerId, 1, |
3366 | 2 + (transactional_id ? |
3367 | strlen(transactional_id) : 0) + |
3368 | 4); |
3369 | |
3370 | /* transactional_id */ |
3371 | rd_kafka_buf_write_str(rkbuf, transactional_id, -1); |
3372 | |
3373 | /* transaction_timeout_ms */ |
3374 | rd_kafka_buf_write_i32(rkbuf, transaction_timeout_ms); |
3375 | |
3376 | rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); |
3377 | |
3378 | /* Let the idempotence state handler perform retries */ |
3379 | rkbuf->rkbuf_retries = RD_KAFKA_BUF_NO_RETRIES; |
3380 | |
3381 | rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); |
3382 | |
3383 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
3384 | } |
3385 | |
3386 | |
3387 | |
3388 | /** |
3389 | * @name Unit tests |
3390 | * @{ |
3391 | * |
3392 | * |
3393 | * |
3394 | * |
3395 | */ |
3396 | |
3397 | /** |
3398 | * @brief Create \p cnt messages, starting at \p msgid, and add them |
3399 | * to \p rkmq. |
3400 | * |
3401 | * @returns the number of messages added. |
3402 | */ |
3403 | static int |
3404 | ut_create_msgs (rd_kafka_msgq_t *rkmq, uint64_t msgid, int cnt) { |
3405 | int i; |
3406 | |
3407 | for (i = 0 ; i < cnt ; i++) { |
3408 | rd_kafka_msg_t *rkm; |
3409 | |
3410 | rkm = ut_rd_kafka_msg_new(); |
3411 | rkm->rkm_u.producer.msgid = msgid++; |
3412 | |
3413 | rd_kafka_msgq_enq(rkmq, rkm); |
3414 | } |
3415 | |
3416 | return cnt; |
3417 | } |
3418 | |
3419 | /** |
3420 | * @brief Idempotent Producer request/response unit tests |
3421 | * |
 * The current test verifies proper handling of the following case:
 *   Batch 0 succeeds
 *   Batch 1 fails with a temporary error
 *   Batches 2 and 3 fail with an out-of-order sequence error
 *   Retrying batches 1-3 should then succeed.
3427 | */ |
3428 | static int unittest_idempotent_producer (void) { |
3429 | rd_kafka_t *rk; |
3430 | rd_kafka_conf_t *conf; |
3431 | rd_kafka_broker_t *rkb; |
3432 | #define _BATCH_CNT 4 |
3433 | #define _MSGS_PER_BATCH 3 |
3434 | const int msgcnt = _BATCH_CNT * _MSGS_PER_BATCH; |
3435 | int remaining_batches; |
3436 | uint64_t msgid = 1; |
3437 | shptr_rd_kafka_toppar_t *s_rktp; |
3438 | rd_kafka_toppar_t *rktp; |
3439 | rd_kafka_pid_t pid = { .id = 1000, .epoch = 0 }; |
3440 | struct rd_kafka_Produce_result result = { |
3441 | .offset = 1, |
3442 | .timestamp = 1000 |
3443 | }; |
3444 | rd_kafka_queue_t *rkqu; |
3445 | rd_kafka_event_t *rkev; |
3446 | rd_kafka_buf_t *request[_BATCH_CNT]; |
3447 | int rcnt = 0; |
3448 | int retry_msg_cnt = 0; |
3449 | int drcnt = 0; |
3450 | rd_kafka_msgq_t rkmq = RD_KAFKA_MSGQ_INITIALIZER(rkmq); |
3451 | int i, r; |
3452 | |
3453 | RD_UT_SAY("Verifying idempotent producer error handling" ); |
3454 | |
3455 | conf = rd_kafka_conf_new(); |
3456 | rd_kafka_conf_set(conf, "batch.num.messages" , "3" , NULL, 0); |
3457 | rd_kafka_conf_set(conf, "retry.backoff.ms" , "1" , NULL, 0); |
3458 | if (rd_kafka_conf_set(conf, "enable.idempotence" , "true" , NULL, 0) != |
3459 | RD_KAFKA_CONF_OK) |
3460 | RD_UT_FAIL("Failed to enable idempotence" ); |
3461 | rd_kafka_conf_set_events(conf, RD_KAFKA_EVENT_DR); |
3462 | |
3463 | rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, NULL, 0); |
3464 | RD_UT_ASSERT(rk, "failed to create producer" ); |
3465 | |
3466 | rkqu = rd_kafka_queue_get_main(rk); |
3467 | |
3468 | /* We need a broker handle, use a logical broker to avoid |
3469 | * any connection attempts. */ |
3470 | rkb = rd_kafka_broker_add_logical(rk, "unittest" ); |
3471 | |
3472 | /* Have the broker support everything so msgset_writer selects |
3473 | * the most up-to-date output features. */ |
3474 | rd_kafka_broker_lock(rkb); |
3475 | rkb->rkb_features = RD_KAFKA_FEATURE_UNITTEST | RD_KAFKA_FEATURE_ALL; |
3476 | rd_kafka_broker_unlock(rkb); |
3477 | |
3478 | /* Get toppar */ |
3479 | s_rktp = rd_kafka_toppar_get2(rk, "uttopic" , 0, rd_false, rd_true); |
3480 | RD_UT_ASSERT(s_rktp, "failed to get toppar" ); |
3481 | rktp = rd_kafka_toppar_s2i(s_rktp); |
3482 | |
        /* Set the topic as existing so messages are enqueued on
         * the desired rktp right away (rather than on the UA partition) */
3485 | rd_ut_kafka_topic_set_topic_exists(rktp->rktp_rkt, 1, -1); |
3486 | |
3487 | /* Produce messages */ |
3488 | ut_create_msgs(&rkmq, 1, msgcnt); |
3489 | |
3490 | /* Set the pid */ |
3491 | rd_kafka_idemp_set_state(rk, RD_KAFKA_IDEMP_STATE_WAIT_PID); |
3492 | rd_kafka_idemp_pid_update(rkb, pid); |
3493 | pid = rd_kafka_idemp_get_pid(rk); |
3494 | RD_UT_ASSERT(rd_kafka_pid_valid(pid), "PID is invalid" ); |
3495 | rd_kafka_toppar_pid_change(rktp, pid, msgid); |
3496 | |
3497 | remaining_batches = _BATCH_CNT; |
3498 | |
3499 | /* Create a ProduceRequest for each batch */ |
3500 | for (rcnt = 0 ; rcnt < remaining_batches ; rcnt++) { |
3501 | size_t msize; |
3502 | request[rcnt] = rd_kafka_msgset_create_ProduceRequest( |
3503 | rkb, rktp, &rkmq, rd_kafka_idemp_get_pid(rk), &msize); |
3504 | RD_UT_ASSERT(request[rcnt], "request #%d failed" , rcnt); |
3505 | } |
3506 | |
3507 | RD_UT_ASSERT(rd_kafka_msgq_len(&rkmq) == 0, |
3508 | "expected input message queue to be empty, " |
3509 | "but still has %d message(s)" , |
3510 | rd_kafka_msgq_len(&rkmq)); |
3511 | |
3512 | /* |
3513 | * Mock handling of each request |
3514 | */ |
3515 | |
3516 | /* Batch 0: accepted */ |
3517 | i = 0; |
3518 | r = rd_kafka_msgq_len(&request[i]->rkbuf_batch.msgq); |
        RD_UT_ASSERT(r == _MSGS_PER_BATCH,
                     "batch %d: expected %d messages, not %d",
                     i, _MSGS_PER_BATCH, r);
3520 | rd_kafka_msgbatch_handle_Produce_result( |
3521 | rkb, &request[i]->rkbuf_batch, |
3522 | RD_KAFKA_RESP_ERR_NO_ERROR, |
3523 | &result, request[i]); |
3524 | result.offset += r; |
3525 | RD_UT_ASSERT(rd_kafka_msgq_len(&rktp->rktp_msgq) == 0, |
3526 | "batch %d: expected no messages in rktp_msgq, not %d" , |
3527 | i, rd_kafka_msgq_len(&rktp->rktp_msgq)); |
3528 | rd_kafka_buf_destroy(request[i]); |
3529 | remaining_batches--; |
3530 | |
3531 | /* Batch 1: fail, triggering retry (re-enq on rktp_msgq) */ |
3532 | i = 1; |
3533 | r = rd_kafka_msgq_len(&request[i]->rkbuf_batch.msgq); |
        RD_UT_ASSERT(r == _MSGS_PER_BATCH,
                     "batch %d: expected %d messages, not %d",
                     i, _MSGS_PER_BATCH, r);
3535 | rd_kafka_msgbatch_handle_Produce_result( |
3536 | rkb, &request[i]->rkbuf_batch, |
3537 | RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION, |
3538 | &result, request[i]); |
3539 | retry_msg_cnt += r; |
3540 | RD_UT_ASSERT(rd_kafka_msgq_len(&rktp->rktp_msgq) == retry_msg_cnt, |
3541 | "batch %d: expected %d messages in rktp_msgq, not %d" , |
3542 | i, retry_msg_cnt, |
3543 | rd_kafka_msgq_len(&rktp->rktp_msgq)); |
3544 | rd_kafka_buf_destroy(request[i]); |
3545 | |
3546 | /* Batch 2: OUT_OF_ORDER, triggering retry .. */ |
3547 | i = 2; |
3548 | r = rd_kafka_msgq_len(&request[i]->rkbuf_batch.msgq); |
        RD_UT_ASSERT(r == _MSGS_PER_BATCH,
                     "batch %d: expected %d messages, not %d",
                     i, _MSGS_PER_BATCH, r);
3550 | rd_kafka_msgbatch_handle_Produce_result( |
3551 | rkb, &request[i]->rkbuf_batch, |
3552 | RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER, |
3553 | &result, request[i]); |
3554 | retry_msg_cnt += r; |
        RD_UT_ASSERT(rd_kafka_msgq_len(&rktp->rktp_msgq) == retry_msg_cnt,
                     "batch %d: expected %d messages in rktp_msgq, not %d",
                     i, retry_msg_cnt,
                     rd_kafka_msgq_len(&rktp->rktp_msgq));
3559 | rd_kafka_buf_destroy(request[i]); |
3560 | |
3561 | /* Batch 3: OUT_OF_ORDER, triggering retry .. */ |
3562 | i = 3; |
3563 | r = rd_kafka_msgq_len(&request[i]->rkbuf_batch.msgq); |
3564 | rd_kafka_msgbatch_handle_Produce_result( |
3565 | rkb, &request[i]->rkbuf_batch, |
3566 | RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER, |
3567 | &result, request[i]); |
3568 | retry_msg_cnt += r; |
        RD_UT_ASSERT(rd_kafka_msgq_len(&rktp->rktp_msgq) == retry_msg_cnt,
                     "batch %d: expected %d messages in rktp_msgq, not %d",
                     i, retry_msg_cnt,
                     rd_kafka_msgq_len(&rktp->rktp_msgq));
3573 | rd_kafka_buf_destroy(request[i]); |
3574 | |
3575 | |
3576 | /* Retried messages will have been moved to rktp_msgq, |
3577 | * move them back to our local queue. */ |
3578 | rd_kafka_toppar_lock(rktp); |
3579 | rd_kafka_msgq_move(&rkmq, &rktp->rktp_msgq); |
3580 | rd_kafka_toppar_unlock(rktp); |
3581 | |
3582 | RD_UT_ASSERT(rd_kafka_msgq_len(&rkmq) == retry_msg_cnt, |
3583 | "Expected %d messages in retry queue, not %d" , |
3584 | retry_msg_cnt, rd_kafka_msgq_len(&rkmq)); |
3585 | |
3586 | /* Sleep a short while to make sure the retry backoff expires. */ |
3587 | rd_usleep(5*1000, NULL); /* 5ms */ |
3588 | |
3589 | /* |
3590 | * Create requests for remaining batches. |
3591 | */ |
3592 | for (rcnt = 0 ; rcnt < remaining_batches ; rcnt++) { |
3593 | size_t msize; |
3594 | request[rcnt] = rd_kafka_msgset_create_ProduceRequest( |
3595 | rkb, rktp, &rkmq, rd_kafka_idemp_get_pid(rk), &msize); |
3596 | RD_UT_ASSERT(request[rcnt], |
3597 | "Failed to create retry #%d (%d msgs in queue)" , |
3598 | rcnt, rd_kafka_msgq_len(&rkmq)); |
3599 | } |
3600 | |
3601 | /* |
3602 | * Mock handling of each request, they will now succeed. |
3603 | */ |
3604 | for (i = 0 ; i < rcnt ; i++) { |
3605 | r = rd_kafka_msgq_len(&request[i]->rkbuf_batch.msgq); |
3606 | rd_kafka_msgbatch_handle_Produce_result( |
3607 | rkb, &request[i]->rkbuf_batch, |
3608 | RD_KAFKA_RESP_ERR_NO_ERROR, |
3609 | &result, request[i]); |
3610 | result.offset += r; |
3611 | rd_kafka_buf_destroy(request[i]); |
3612 | } |
3613 | |
3614 | retry_msg_cnt = 0; |
        RD_UT_ASSERT(rd_kafka_msgq_len(&rktp->rktp_msgq) == retry_msg_cnt,
                     "expected %d messages in rktp_msgq after retries, not %d",
                     retry_msg_cnt,
                     rd_kafka_msgq_len(&rktp->rktp_msgq));
3619 | |
3620 | /* |
3621 | * Wait for delivery reports, they should all be successful. |
3622 | */ |
3623 | while ((rkev = rd_kafka_queue_poll(rkqu, 1000))) { |
3624 | const rd_kafka_message_t *rkmessage; |
3625 | |
3626 | RD_UT_SAY("Got %s event with %d message(s)" , |
3627 | rd_kafka_event_name(rkev), |
3628 | (int)rd_kafka_event_message_count(rkev)); |
3629 | |
3630 | while ((rkmessage = rd_kafka_event_message_next(rkev))) { |
3631 | RD_UT_SAY(" DR for message: %s: (persistence=%d)" , |
3632 | rd_kafka_err2str(rkmessage->err), |
3633 | rd_kafka_message_status(rkmessage)); |
3634 | if (rkmessage->err) |
3635 | RD_UT_WARN(" ^ Should not have failed" ); |
3636 | else |
3637 | drcnt++; |
3638 | } |
3639 | rd_kafka_event_destroy(rkev); |
3640 | } |
3641 | |
3642 | /* Should be no more messages in queues */ |
3643 | r = rd_kafka_outq_len(rk); |
3644 | RD_UT_ASSERT(r == 0, "expected outq to return 0, not %d" , r); |
3645 | |
3646 | /* Verify the expected number of good delivery reports were seen */ |
3647 | RD_UT_ASSERT(drcnt == msgcnt, |
3648 | "expected %d DRs, not %d" , msgcnt, drcnt); |
3649 | |
3650 | rd_kafka_queue_destroy(rkqu); |
3651 | rd_kafka_toppar_destroy(s_rktp); |
3652 | rd_kafka_broker_destroy(rkb); |
3653 | rd_kafka_destroy(rk); |
3654 | |
3655 | RD_UT_PASS(); |
3656 | return 0; |
3657 | } |
3658 | |
3659 | /** |
3660 | * @brief Request/response unit tests |
3661 | */ |
3662 | int unittest_request (void) { |
3663 | int fails = 0; |
3664 | |
3665 | fails += unittest_idempotent_producer(); |
3666 | |
3667 | return fails; |
3668 | } |
3669 | |
3670 | /**@}*/ |
3671 | |