1/*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012-2015, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29#include "rdkafka_int.h"
30#include "rdkafka_broker.h"
31#include "rdkafka_request.h"
32#include "rdkafka_topic.h"
33#include "rdkafka_partition.h"
34#include "rdkafka_assignor.h"
35#include "rdkafka_offset.h"
36#include "rdkafka_metadata.h"
37#include "rdkafka_cgrp.h"
38#include "rdkafka_interceptor.h"
39
40
41static void rd_kafka_cgrp_check_unassign_done (rd_kafka_cgrp_t *rkcg,
42 const char *reason);
43static void rd_kafka_cgrp_offset_commit_tmr_cb (rd_kafka_timers_t *rkts,
44 void *arg);
45static void rd_kafka_cgrp_assign (rd_kafka_cgrp_t *rkcg,
46 rd_kafka_topic_partition_list_t *assignment);
47static rd_kafka_resp_err_t rd_kafka_cgrp_unassign (rd_kafka_cgrp_t *rkcg);
48static void
49rd_kafka_cgrp_partitions_fetch_start0 (rd_kafka_cgrp_t *rkcg,
50 rd_kafka_topic_partition_list_t
51 *assignment, int usable_offsets,
52 int line);
53#define rd_kafka_cgrp_partitions_fetch_start(rkcg,assignment,usable_offsets) \
54 rd_kafka_cgrp_partitions_fetch_start0(rkcg,assignment,usable_offsets,\
55 __LINE__)
56static rd_kafka_op_res_t
57rd_kafka_cgrp_op_serve (rd_kafka_t *rk, rd_kafka_q_t *rkq,
58 rd_kafka_op_t *rko, rd_kafka_q_cb_type_t cb_type,
59 void *opaque);
60
61static void rd_kafka_cgrp_group_leader_reset (rd_kafka_cgrp_t *rkcg,
62 const char *reason);
63
64static RD_INLINE int rd_kafka_cgrp_try_terminate (rd_kafka_cgrp_t *rkcg);
65
66static void rd_kafka_cgrp_rebalance (rd_kafka_cgrp_t *rkcg,
67 const char *reason);
68
69static void
70rd_kafka_cgrp_max_poll_interval_check_tmr_cb (rd_kafka_timers_t *rkts,
71 void *arg);
72
73
/**
 * @returns true if the cgrp can start partition fetchers.
 *
 * Only the join state is inspected: fetchers may be started once the
 * join state is RD_KAFKA_CGRP_JOIN_STATE_ASSIGNED. Whether or not there
 * is a subscription is NOT checked by this macro.
 *
 * NOTE(review): for an assign() without subscribe() this relies on the
 * assign path also moving the join state to ASSIGNED — confirm against
 * rd_kafka_cgrp_assign().
 */
#define RD_KAFKA_CGRP_CAN_FETCH_START(rkcg) \
        ((rkcg)->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_ASSIGNED)

/**
 * @returns true if the cgrp is waiting for the application's rebalance_cb
 *          to handle either an assign or a revoke, i.e. the join state is
 *          WAIT_ASSIGN_REBALANCE_CB or WAIT_REVOKE_REBALANCE_CB.
 */
#define RD_KAFKA_CGRP_WAIT_REBALANCE_CB(rkcg)                           \
        ((rkcg)->rkcg_join_state ==                                     \
         RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_REBALANCE_CB ||           \
         (rkcg)->rkcg_join_state ==                                     \
         RD_KAFKA_CGRP_JOIN_STATE_WAIT_REVOKE_REBALANCE_CB)
91
92
/* Printable names for rkcg_state, indexed by the state value.
 * Entry order must match the cgrp state enum declaration. */
const char *rd_kafka_cgrp_state_names[] = {
        /* 0 */ "init",
        /* 1 */ "term",
        /* 2 */ "query-coord",
        /* 3 */ "wait-coord",
        /* 4 */ "wait-broker",
        /* 5 */ "wait-broker-transport",
        /* 6 */ "up"
};

/* Printable names for rkcg_join_state, indexed by the join-state value.
 * Entry order must match the cgrp join-state enum declaration. */
const char *rd_kafka_cgrp_join_state_names[] = {
        /* 0 */ "init",
        /* 1 */ "wait-join",
        /* 2 */ "wait-metadata",
        /* 3 */ "wait-sync",
        /* 4 */ "wait-unassign",
        /* 5 */ "wait-assign-rebalance_cb",
        /* 6 */ "wait-revoke-rebalance_cb",
        /* 7 */ "assigned",
        /* 8 */ "started"
};
114
115
116/**
117 * @brief Change the cgrp state.
118 *
119 * @returns 1 if the state was changed, else 0.
120 */
121static int rd_kafka_cgrp_set_state (rd_kafka_cgrp_t *rkcg, int state) {
122 if ((int)rkcg->rkcg_state == state)
123 return 0;
124
125 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPSTATE",
126 "Group \"%.*s\" changed state %s -> %s "
127 "(v%d, join-state %s)",
128 RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
129 rd_kafka_cgrp_state_names[rkcg->rkcg_state],
130 rd_kafka_cgrp_state_names[state],
131 rkcg->rkcg_version,
132 rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
133 rkcg->rkcg_state = state;
134 rkcg->rkcg_ts_statechange = rd_clock();
135
136 rd_kafka_brokers_broadcast_state_change(rkcg->rkcg_rk);
137
138 return 1;
139}
140
141
142void rd_kafka_cgrp_set_join_state (rd_kafka_cgrp_t *rkcg, int join_state) {
143 if ((int)rkcg->rkcg_join_state == join_state)
144 return;
145
146 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPJOINSTATE",
147 "Group \"%.*s\" changed join state %s -> %s "
148 "(v%d, state %s)",
149 RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
150 rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
151 rd_kafka_cgrp_join_state_names[join_state],
152 rkcg->rkcg_version,
153 rd_kafka_cgrp_state_names[rkcg->rkcg_state]);
154 rkcg->rkcg_join_state = join_state;
155}
156
157
158static RD_INLINE void
159rd_kafka_cgrp_version_new_barrier0 (rd_kafka_cgrp_t *rkcg,
160 const char *func, int line) {
161 rkcg->rkcg_version++;
162 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "BARRIER",
163 "Group \"%.*s\": %s:%d: new version barrier v%d",
164 RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), func, line,
165 rkcg->rkcg_version);
166}
167
168#define rd_kafka_cgrp_version_new_barrier(rkcg) \
169 rd_kafka_cgrp_version_new_barrier0(rkcg, __FUNCTION__, __LINE__)
170
171
172void rd_kafka_cgrp_destroy_final (rd_kafka_cgrp_t *rkcg) {
173 rd_kafka_assert(rkcg->rkcg_rk, !rkcg->rkcg_assignment);
174 rd_kafka_assert(rkcg->rkcg_rk, !rkcg->rkcg_subscription);
175 rd_kafka_assert(rkcg->rkcg_rk, !rkcg->rkcg_group_leader.members);
176 rd_kafka_cgrp_set_member_id(rkcg, NULL);
177
178 rd_kafka_q_destroy_owner(rkcg->rkcg_q);
179 rd_kafka_q_destroy_owner(rkcg->rkcg_ops);
180 rd_kafka_q_destroy_owner(rkcg->rkcg_wait_coord_q);
181 rd_kafka_assert(rkcg->rkcg_rk, TAILQ_EMPTY(&rkcg->rkcg_topics));
182 rd_kafka_assert(rkcg->rkcg_rk, rd_list_empty(&rkcg->rkcg_toppars));
183 rd_list_destroy(&rkcg->rkcg_toppars);
184 rd_list_destroy(rkcg->rkcg_subscribed_topics);
185 rd_free(rkcg);
186}
187
188
189
190
191rd_kafka_cgrp_t *rd_kafka_cgrp_new (rd_kafka_t *rk,
192 const rd_kafkap_str_t *group_id,
193 const rd_kafkap_str_t *client_id) {
194 rd_kafka_cgrp_t *rkcg;
195
196 rkcg = rd_calloc(1, sizeof(*rkcg));
197
198 rkcg->rkcg_rk = rk;
199 rkcg->rkcg_group_id = group_id;
200 rkcg->rkcg_client_id = client_id;
201 rkcg->rkcg_coord_id = -1;
202 rkcg->rkcg_generation_id = -1;
203 rkcg->rkcg_version = 1;
204
205 mtx_init(&rkcg->rkcg_lock, mtx_plain);
206 rkcg->rkcg_ops = rd_kafka_q_new(rk);
207 rkcg->rkcg_ops->rkq_serve = rd_kafka_cgrp_op_serve;
208 rkcg->rkcg_ops->rkq_opaque = rkcg;
209 rkcg->rkcg_wait_coord_q = rd_kafka_q_new(rk);
210 rkcg->rkcg_wait_coord_q->rkq_serve = rkcg->rkcg_ops->rkq_serve;
211 rkcg->rkcg_wait_coord_q->rkq_opaque = rkcg->rkcg_ops->rkq_opaque;
212 rkcg->rkcg_q = rd_kafka_q_new(rk);
213
214 TAILQ_INIT(&rkcg->rkcg_topics);
215 rd_list_init(&rkcg->rkcg_toppars, 32, NULL);
216 rd_kafka_cgrp_set_member_id(rkcg, "");
217 rkcg->rkcg_subscribed_topics =
218 rd_list_new(0, (void *)rd_kafka_topic_info_destroy);
219 rd_interval_init(&rkcg->rkcg_coord_query_intvl);
220 rd_interval_init(&rkcg->rkcg_heartbeat_intvl);
221 rd_interval_init(&rkcg->rkcg_join_intvl);
222 rd_interval_init(&rkcg->rkcg_timeout_scan_intvl);
223
224 /* Create a logical group coordinator broker to provide
225 * a dedicated connection for group coordination.
226 * This is needed since JoinGroup may block for up to
227 * max.poll.interval.ms, effectively blocking and timing out
228 * any other protocol requests (such as Metadata).
229 * The address for this broker will be updated when
230 * the group coordinator is assigned. */
231 rkcg->rkcg_coord = rd_kafka_broker_add_logical(rk, "GroupCoordinator");
232
233 if (rk->rk_conf.enable_auto_commit &&
234 rk->rk_conf.auto_commit_interval_ms > 0)
235 rd_kafka_timer_start(&rk->rk_timers,
236 &rkcg->rkcg_offset_commit_tmr,
237 rk->rk_conf.
238 auto_commit_interval_ms * 1000ll,
239 rd_kafka_cgrp_offset_commit_tmr_cb,
240 rkcg);
241
242 return rkcg;
243}
244
245
246/**
247 * @brief Set the group coordinator broker.
248 */
249static void rd_kafka_cgrp_coord_set_broker (rd_kafka_cgrp_t *rkcg,
250 rd_kafka_broker_t *rkb) {
251
252 rd_assert(rkcg->rkcg_curr_coord == NULL);
253
254 rd_assert(RD_KAFKA_CGRP_BROKER_IS_COORD(rkcg, rkb));
255
256 rkcg->rkcg_curr_coord = rkb;
257 rd_kafka_broker_keep(rkb);
258
259 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COORDSET",
260 "Group \"%.*s\" coordinator set to broker %s",
261 RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
262 rd_kafka_broker_name(rkb));
263
264 /* Reset query interval to trigger an immediate
265 * coord query if required */
266 if (!rd_interval_disabled(&rkcg->rkcg_coord_query_intvl))
267 rd_interval_reset(&rkcg->rkcg_coord_query_intvl);
268
269 rd_kafka_cgrp_set_state(rkcg,
270 RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT);
271
272 rd_kafka_broker_persistent_connection_add(
273 rkcg->rkcg_coord, &rkcg->rkcg_coord->rkb_persistconn.coord);
274
275 /* Set the logical coordinator's nodename to the
276 * proper broker's nodename, this will trigger a (re)connect
277 * to the new address. */
278 rd_kafka_broker_set_nodename(rkcg->rkcg_coord, rkb);
279}
280
281
282/**
283 * @brief Reset/clear the group coordinator broker.
284 */
285static void rd_kafka_cgrp_coord_clear_broker (rd_kafka_cgrp_t *rkcg) {
286 rd_kafka_broker_t *rkb = rkcg->rkcg_curr_coord;
287
288 rd_assert(rkcg->rkcg_curr_coord);
289 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COORDCLEAR",
290 "Group \"%.*s\" broker %s is no longer coordinator",
291 RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
292 rd_kafka_broker_name(rkb));
293
294 rd_assert(rkcg->rkcg_coord);
295
296 rd_kafka_broker_persistent_connection_del(
297 rkcg->rkcg_coord,
298 &rkcg->rkcg_coord->rkb_persistconn.coord);
299
300 /* Clear the ephemeral broker's nodename.
301 * This will also trigger a disconnect. */
302 rd_kafka_broker_set_nodename(rkcg->rkcg_coord, NULL);
303
304 rkcg->rkcg_curr_coord = NULL;
305 rd_kafka_broker_destroy(rkb); /* from set_coord_broker() */
306}
307
308
309/**
310 * @brief Update/set the group coordinator.
311 *
312 * Will do nothing if there's been no change.
313 *
314 * @returns 1 if the coordinator, or state, was updated, else 0.
315 */
316static int rd_kafka_cgrp_coord_update (rd_kafka_cgrp_t *rkcg,
317 int32_t coord_id) {
318
319 /* Don't do anything while terminating */
320 if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM)
321 return 0;
322
323 /* Check if coordinator changed */
324 if (rkcg->rkcg_coord_id != coord_id) {
325 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPCOORD",
326 "Group \"%.*s\" changing coordinator %"PRId32
327 " -> %"PRId32,
328 RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
329 rkcg->rkcg_coord_id, coord_id);
330
331 /* Update coord id */
332 rkcg->rkcg_coord_id = coord_id;
333
334 /* Clear previous broker handle, if any */
335 if (rkcg->rkcg_curr_coord)
336 rd_kafka_cgrp_coord_clear_broker(rkcg);
337 }
338
339
340 if (rkcg->rkcg_curr_coord) {
341 /* There is already a known coordinator and a
342 * corresponding broker handle. */
343 if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP)
344 return rd_kafka_cgrp_set_state(
345 rkcg,
346 RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT);
347
348 } else if (rkcg->rkcg_coord_id != -1) {
349 rd_kafka_broker_t *rkb;
350
351 /* Try to find the coordinator broker handle */
352 rd_kafka_rdlock(rkcg->rkcg_rk);
353 rkb = rd_kafka_broker_find_by_nodeid(rkcg->rkcg_rk, coord_id);
354 rd_kafka_rdunlock(rkcg->rkcg_rk);
355
356 /* It is possible, due to stale metadata, that the
357 * coordinator id points to a broker we still don't know
358 * about. In this case the client will continue
359 * querying metadata and querying for the coordinator
360 * until a match is found. */
361
362 if (rkb) {
363 /* Coordinator is known and broker handle exists */
364 rd_kafka_cgrp_coord_set_broker(rkcg, rkb);
365 rd_kafka_broker_destroy(rkb); /*from find_by_nodeid()*/
366
367 return 1;
368 } else {
369 /* Coordinator is known but no corresponding
370 * broker handle. */
371 return rd_kafka_cgrp_set_state(
372 rkcg, RD_KAFKA_CGRP_STATE_WAIT_BROKER);
373
374 }
375
376 } else {
377 /* Coordinator still not known, re-query */
378 if (rkcg->rkcg_state >= RD_KAFKA_CGRP_STATE_WAIT_COORD)
379 return rd_kafka_cgrp_set_state(
380 rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD);
381 }
382
383 return 0; /* no change */
384}
385
386
387
388
/**
 * @brief Handle a GroupCoordinator response.
 *
 * On success, the coordinator's id, host and port are parsed, the broker
 * list is updated via rd_kafka_broker_update(), the cgrp's coordinator is
 * updated with rd_kafka_cgrp_coord_update(), and the cgrp is served.
 *
 * On error (transport error passed in \p err, error code in the response,
 * or a parse error), handling depends on the code:
 *  - ERR__DESTROY: return without touching cgrp state.
 *  - GROUP_COORDINATOR_NOT_AVAILABLE: mark the coordinator as unknown (-1).
 *  - Any other error: emit a CONSUMER_ERR op on rkcg_q (suppressed if it
 *    equals rkcg_last_err) and move back to QUERY_COORD.
 *
 * No retry is scheduled here because the coordinator query is
 * interval-driven.
 *
 * @param opaque the cgrp handle (rd_kafka_cgrp_t *).
 */
static void rd_kafka_cgrp_handle_GroupCoordinator (rd_kafka_t *rk,
                                                   rd_kafka_broker_t *rkb,
                                                   rd_kafka_resp_err_t err,
                                                   rd_kafka_buf_t *rkbuf,
                                                   rd_kafka_buf_t *request,
                                                   void *opaque) {
        /* Not referenced directly; presumably used by the
         * rd_kafka_buf_read_*() macros for decode-error logging. */
        const int log_decode_errors = LOG_ERR;
        int16_t ErrorCode = 0;
        int32_t CoordId;
        rd_kafkap_str_t CoordHost = RD_ZERO_INIT;
        int32_t CoordPort;
        rd_kafka_cgrp_t *rkcg = opaque;
        struct rd_kafka_metadata_broker mdb = RD_ZERO_INIT;

        /* Only parse the response if there was no transport-level error */
        if (likely(!(ErrorCode = err))) {
                rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
                rd_kafka_buf_read_i32(rkbuf, &CoordId);
                rd_kafka_buf_read_str(rkbuf, &CoordHost);
                rd_kafka_buf_read_i32(rkbuf, &CoordPort);
        }

        if (ErrorCode)
                goto err2;


        /* NOTE(review): if CoordHost is a null string, mdb.host may end up
         * NULL here and be passed to "%s" and rd_kafka_broker_update() —
         * confirm brokers never send a null host. */
        mdb.id = CoordId;
        RD_KAFKAP_STR_DUPA(&mdb.host, &CoordHost);
        mdb.port = CoordPort;

        rd_rkb_dbg(rkb, CGRP, "CGRPCOORD",
                   "Group \"%.*s\" coordinator is %s:%i id %"PRId32,
                   RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                   mdb.host, mdb.port, mdb.id);
        /* Make sure the coordinator broker is known before
         * the coordinator id is updated below. */
        rd_kafka_broker_update(rkb->rkb_rk, rkb->rkb_proto, &mdb);

        rd_kafka_cgrp_coord_update(rkcg, CoordId);
        rd_kafka_cgrp_serve(rkcg); /* Serve updated state, if possible */
        return;

err_parse: /* Parse error */
        /* Target of the rd_kafka_buf_read_*() macros on decode failure
         * (otherwise unreferenced label). */
        ErrorCode = rkbuf->rkbuf_err;
        /* FALLTHRU */

err2:
        rd_rkb_dbg(rkb, CGRP, "CGRPCOORD",
                   "Group \"%.*s\" GroupCoordinator response error: %s",
                   RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                   rd_kafka_err2str(ErrorCode));

        if (ErrorCode == RD_KAFKA_RESP_ERR__DESTROY)
                return;

        /* No need for retries since the coord query is intervalled. */

        if (ErrorCode == RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE)
                rd_kafka_cgrp_coord_update(rkcg, -1);
        else {
                if (rkcg->rkcg_last_err != ErrorCode) {
                        rd_kafka_q_op_err(rkcg->rkcg_q,
                                          RD_KAFKA_OP_CONSUMER_ERR,
                                          ErrorCode, 0, NULL, 0,
                                          "GroupCoordinator response error: %s",
                                          rd_kafka_err2str(ErrorCode));

                        /* Suppress repeated errors */
                        rkcg->rkcg_last_err = ErrorCode;
                }

                /* Continue querying */
                rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD);
        }

        rd_kafka_cgrp_serve(rkcg); /* Serve updated state, if possible */
}
466
467
468/**
469 * Query for coordinator.
470 * Ask any broker in state UP
471 *
472 * Locality: main thread
473 */
474void rd_kafka_cgrp_coord_query (rd_kafka_cgrp_t *rkcg,
475 const char *reason) {
476 rd_kafka_broker_t *rkb;
477
478 rd_kafka_rdlock(rkcg->rkcg_rk);
479 rkb = rd_kafka_broker_any(rkcg->rkcg_rk, RD_KAFKA_BROKER_STATE_UP,
480 rd_kafka_broker_filter_can_group_query, NULL,
481 "coordinator query");
482 rd_kafka_rdunlock(rkcg->rkcg_rk);
483
484 if (!rkb) {
485 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPQUERY",
486 "Group \"%.*s\": "
487 "no broker available for coordinator query: %s",
488 RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), reason);
489 return;
490 }
491
492 rd_rkb_dbg(rkb, CGRP, "CGRPQUERY",
493 "Group \"%.*s\": querying for coordinator: %s",
494 RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), reason);
495
496 rd_kafka_GroupCoordinatorRequest(rkb, rkcg->rkcg_group_id,
497 RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
498 rd_kafka_cgrp_handle_GroupCoordinator,
499 rkcg);
500
501 if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_QUERY_COORD)
502 rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_WAIT_COORD);
503
504 rd_kafka_broker_destroy(rkb);
505}
506
507/**
508 * @brief Mark the current coordinator as dead.
509 *
510 * @locality main thread
511 */
512void rd_kafka_cgrp_coord_dead (rd_kafka_cgrp_t *rkcg, rd_kafka_resp_err_t err,
513 const char *reason) {
514 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COORD",
515 "Group \"%.*s\": "
516 "marking the coordinator (%"PRId32") dead: %s: %s",
517 RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
518 rkcg->rkcg_coord_id, rd_kafka_err2str(err), reason);
519
520 rd_kafka_cgrp_coord_update(rkcg, -1);
521
522 /* Re-query for coordinator */
523 rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD);
524 rd_kafka_cgrp_coord_query(rkcg, reason);
525}
526
527
528
/**
 * @brief cgrp handling of LeaveGroup responses.
 *
 * Logs the outcome. Unless the handle is being destroyed (ERR__DESTROY),
 * it then clears RD_KAFKA_CGRP_F_WAIT_LEAVE and calls
 * rd_kafka_cgrp_try_terminate().
 *
 * Also called directly by rd_kafka_cgrp_leave() with
 * err = ERR__WAIT_COORD and rkbuf = request = NULL when the cgrp is not UP.
 *
 * @param opaque must be the cgrp handle.
 * @locality rdkafka main thread (unless err==ERR__DESTROY)
 */
static void rd_kafka_cgrp_handle_LeaveGroup (rd_kafka_t *rk,
                                             rd_kafka_broker_t *rkb,
                                             rd_kafka_resp_err_t err,
                                             rd_kafka_buf_t *rkbuf,
                                             rd_kafka_buf_t *request,
                                             void *opaque) {
        rd_kafka_cgrp_t *rkcg = opaque;
        /* Not referenced directly; presumably used by the
         * rd_kafka_buf_read_*() macros for decode-error logging. */
        const int log_decode_errors = LOG_ERR;
        int16_t ErrorCode = 0;

        /* Transport/local error: there is no response to parse
         * (rkbuf may be NULL). */
        if (err) {
                ErrorCode = err;
                goto err;
        }

        rd_kafka_buf_read_i16(rkbuf, &ErrorCode);

err:
        if (ErrorCode)
                rd_kafka_dbg(rkb->rkb_rk, CGRP, "LEAVEGROUP",
                             "LeaveGroup response error in state %s: %s",
                             rd_kafka_cgrp_state_names[rkcg->rkcg_state],
                             rd_kafka_err2str(ErrorCode));
        else
                rd_kafka_dbg(rkb->rkb_rk, CGRP, "LEAVEGROUP",
                             "LeaveGroup response received in state %s",
                             rd_kafka_cgrp_state_names[rkcg->rkcg_state]);

        /* Whatever the outcome (except destruction), the leave is done:
         * allow termination to proceed. */
        if (ErrorCode != RD_KAFKA_RESP_ERR__DESTROY) {
                rd_assert(thrd_is_current(rk->rk_thread));
                rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_WAIT_LEAVE;
                rd_kafka_cgrp_try_terminate(rkcg);
        }



        return;

 err_parse:
        /* Target of the rd_kafka_buf_read_*() macros on decode failure;
         * report the buffer's error through the common path above. */
        ErrorCode = rkbuf->rkbuf_err;
        goto err;
}
576
577
578static void rd_kafka_cgrp_leave (rd_kafka_cgrp_t *rkcg) {
579
580 if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_LEAVE) {
581 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "LEAVE",
582 "Group \"%.*s\": leave (in state %s): "
583 "LeaveGroupRequest already in-transit",
584 RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
585 rd_kafka_cgrp_state_names[rkcg->rkcg_state]);
586 return;
587 }
588
589 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "LEAVE",
590 "Group \"%.*s\": leave (in state %s)",
591 RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
592 rd_kafka_cgrp_state_names[rkcg->rkcg_state]);
593
594 rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WAIT_LEAVE;
595
596 if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_UP) {
597 rd_rkb_dbg(rkcg->rkcg_curr_coord, CONSUMER, "LEAVE",
598 "Leaving group");
599 rd_kafka_LeaveGroupRequest(rkcg->rkcg_coord,
600 rkcg->rkcg_group_id,
601 rkcg->rkcg_member_id,
602 RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
603 rd_kafka_cgrp_handle_LeaveGroup,
604 rkcg);
605 } else
606 rd_kafka_cgrp_handle_LeaveGroup(rkcg->rkcg_rk,
607 rkcg->rkcg_coord,
608 RD_KAFKA_RESP_ERR__WAIT_COORD,
609 NULL, NULL, rkcg);
610}
611
612
613/**
614 * Enqueue a rebalance op (if configured). 'partitions' is copied.
615 * This delegates the responsibility of assign() and unassign() to the
616 * application.
617 *
618 * Returns 1 if a rebalance op was enqueued, else 0.
619 * Returns 0 if there was no rebalance_cb or 'assignment' is NULL,
620 * in which case rd_kafka_cgrp_assign(rkcg,assignment) is called immediately.
621 */
622static int
623rd_kafka_rebalance_op (rd_kafka_cgrp_t *rkcg,
624 rd_kafka_resp_err_t err,
625 rd_kafka_topic_partition_list_t *assignment,
626 const char *reason) {
627 rd_kafka_op_t *rko;
628
629 rd_kafka_wrlock(rkcg->rkcg_rk);
630 rkcg->rkcg_c.ts_rebalance = rd_clock();
631 rkcg->rkcg_c.rebalance_cnt++;
632 rd_kafka_wrunlock(rkcg->rkcg_rk);
633
634 /* Pause current partition set consumers until new assign() is called */
635 if (rkcg->rkcg_assignment)
636 rd_kafka_toppars_pause_resume(rkcg->rkcg_rk, 1,
637 RD_KAFKA_TOPPAR_F_LIB_PAUSE,
638 rkcg->rkcg_assignment);
639
640 if (!(rkcg->rkcg_rk->rk_conf.enabled_events & RD_KAFKA_EVENT_REBALANCE)
641 || !assignment
642 || rd_kafka_destroy_flags_no_consumer_close(rkcg->rkcg_rk)) {
643 no_delegation:
644 if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
645 rd_kafka_cgrp_assign(rkcg, assignment);
646 else
647 rd_kafka_cgrp_unassign(rkcg);
648 return 0;
649 }
650
651 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN",
652 "Group \"%s\": delegating %s of %d partition(s) "
653 "to application rebalance callback on queue %s: %s",
654 rkcg->rkcg_group_id->str,
655 err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS ?
656 "revoke":"assign", assignment->cnt,
657 rd_kafka_q_dest_name(rkcg->rkcg_q), reason);
658
659 rd_kafka_cgrp_set_join_state(
660 rkcg,
661 err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ?
662 RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_REBALANCE_CB :
663 RD_KAFKA_CGRP_JOIN_STATE_WAIT_REVOKE_REBALANCE_CB);
664
665 rko = rd_kafka_op_new(RD_KAFKA_OP_REBALANCE);
666 rko->rko_err = err;
667 rko->rko_u.rebalance.partitions =
668 rd_kafka_topic_partition_list_copy(assignment);
669
670 if (rd_kafka_q_enq(rkcg->rkcg_q, rko) == 0) {
671 /* Queue disabled, handle assignment here. */
672 goto no_delegation;
673 }
674
675 return 1;
676}
677
678
679/**
680 * @brief Run group assignment.
681 */
682static void
683rd_kafka_cgrp_assignor_run (rd_kafka_cgrp_t *rkcg,
684 const char *protocol_name,
685 rd_kafka_resp_err_t err,
686 rd_kafka_metadata_t *metadata,
687 rd_kafka_group_member_t *members,
688 int member_cnt) {
689 char errstr[512];
690
691 if (err) {
692 rd_snprintf(errstr, sizeof(errstr),
693 "Failed to get cluster metadata: %s",
694 rd_kafka_err2str(err));
695 goto err;
696 }
697
698 *errstr = '\0';
699
700 /* Run assignor */
701 err = rd_kafka_assignor_run(rkcg, protocol_name, metadata,
702 members, member_cnt,
703 errstr, sizeof(errstr));
704
705 if (err) {
706 if (!*errstr)
707 rd_snprintf(errstr, sizeof(errstr), "%s",
708 rd_kafka_err2str(err));
709 goto err;
710 }
711
712 rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_CONSUMER, "ASSIGNOR",
713 "Group \"%s\": \"%s\" assignor run for %d member(s)",
714 rkcg->rkcg_group_id->str, protocol_name, member_cnt);
715
716 rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC);
717
718 /* Respond to broker with assignment set or error */
719 rd_kafka_SyncGroupRequest(rkcg->rkcg_coord,
720 rkcg->rkcg_group_id,
721 rkcg->rkcg_generation_id,
722 rkcg->rkcg_member_id,
723 members, err ? 0 : member_cnt,
724 RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
725 rd_kafka_handle_SyncGroup, rkcg);
726 return;
727
728err:
729 rd_kafka_log(rkcg->rkcg_rk, LOG_ERR, "ASSIGNOR",
730 "Group \"%s\": failed to run assignor \"%s\" for "
731 "%d member(s): %s",
732 rkcg->rkcg_group_id->str, protocol_name,
733 member_cnt, errstr);
734
735 rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_INIT);
736
737}
738
739
740
741/**
742 * @brief Op callback from handle_JoinGroup
743 */
744static rd_kafka_op_res_t
745rd_kafka_cgrp_assignor_handle_Metadata_op (rd_kafka_t *rk,
746 rd_kafka_q_t *rkq,
747 rd_kafka_op_t *rko) {
748 rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
749
750 if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
751 return RD_KAFKA_OP_RES_HANDLED; /* Terminating */
752
753 if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA)
754 return RD_KAFKA_OP_RES_HANDLED; /* From outdated state */
755
756 if (!rkcg->rkcg_group_leader.protocol) {
757 rd_kafka_dbg(rk, CGRP, "GRPLEADER",
758 "Group \"%.*s\": no longer leader: "
759 "not running assignor",
760 RD_KAFKAP_STR_PR(rkcg->rkcg_group_id));
761 return RD_KAFKA_OP_RES_HANDLED;
762 }
763
764 rd_kafka_cgrp_assignor_run(rkcg,
765 rkcg->rkcg_group_leader.protocol,
766 rko->rko_err, rko->rko_u.metadata.md,
767 rkcg->rkcg_group_leader.members,
768 rkcg->rkcg_group_leader.member_cnt);
769
770 return RD_KAFKA_OP_RES_HANDLED;
771}
772
773
774/**
775 * Parse single JoinGroup.Members.MemberMetadata for "consumer" ProtocolType
776 *
777 * Protocol definition:
778 * https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal
779 *
780 * Returns 0 on success or -1 on error.
781 */
782static int
783rd_kafka_group_MemberMetadata_consumer_read (
784 rd_kafka_broker_t *rkb, rd_kafka_group_member_t *rkgm,
785 const rd_kafkap_str_t *GroupProtocol,
786 const rd_kafkap_bytes_t *MemberMetadata) {
787
788 rd_kafka_buf_t *rkbuf;
789 int16_t Version;
790 int32_t subscription_cnt;
791 rd_kafkap_bytes_t UserData;
792 const int log_decode_errors = LOG_ERR;
793 rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__BAD_MSG;
794
795 /* Create a shadow-buffer pointing to the metadata to ease parsing. */
796 rkbuf = rd_kafka_buf_new_shadow(MemberMetadata->data,
797 RD_KAFKAP_BYTES_LEN(MemberMetadata),
798 NULL);
799
800 rd_kafka_buf_read_i16(rkbuf, &Version);
801 rd_kafka_buf_read_i32(rkbuf, &subscription_cnt);
802
803 if (subscription_cnt > 10000 || subscription_cnt <= 0)
804 goto err;
805
806 rkgm->rkgm_subscription =
807 rd_kafka_topic_partition_list_new(subscription_cnt);
808
809 while (subscription_cnt-- > 0) {
810 rd_kafkap_str_t Topic;
811 char *topic_name;
812 rd_kafka_buf_read_str(rkbuf, &Topic);
813 RD_KAFKAP_STR_DUPA(&topic_name, &Topic);
814 rd_kafka_topic_partition_list_add(rkgm->rkgm_subscription,
815 topic_name,
816 RD_KAFKA_PARTITION_UA);
817 }
818
819 rd_kafka_buf_read_bytes(rkbuf, &UserData);
820 rkgm->rkgm_userdata = rd_kafkap_bytes_copy(&UserData);
821
822 rd_kafka_buf_destroy(rkbuf);
823
824 return 0;
825
826 err_parse:
827 err = rkbuf->rkbuf_err;
828
829 err:
830 rd_rkb_dbg(rkb, CGRP, "MEMBERMETA",
831 "Failed to parse MemberMetadata for \"%.*s\": %s",
832 RD_KAFKAP_STR_PR(rkgm->rkgm_member_id),
833 rd_kafka_err2str(err));
834 if (rkgm->rkgm_subscription) {
835 rd_kafka_topic_partition_list_destroy(rkgm->
836 rkgm_subscription);
837 rkgm->rkgm_subscription = NULL;
838 }
839
840 rd_kafka_buf_destroy(rkbuf);
841 return -1;
842}
843
844
845
846
847/**
848 * @brief cgrp handler for JoinGroup responses
849 * opaque must be the cgrp handle.
850 *
851 * @locality rdkafka main thread (unless ERR__DESTROY: arbitrary thread)
852 */
853static void rd_kafka_cgrp_handle_JoinGroup (rd_kafka_t *rk,
854 rd_kafka_broker_t *rkb,
855 rd_kafka_resp_err_t err,
856 rd_kafka_buf_t *rkbuf,
857 rd_kafka_buf_t *request,
858 void *opaque) {
859 rd_kafka_cgrp_t *rkcg = opaque;
860 const int log_decode_errors = LOG_ERR;
861 int16_t ErrorCode = 0;
862 int32_t GenerationId;
863 rd_kafkap_str_t Protocol, LeaderId, MyMemberId;
864 int32_t member_cnt;
865 int actions;
866 int i_am_leader = 0;
867
868 if (err == RD_KAFKA_RESP_ERR__DESTROY)
869 return; /* Terminating */
870
871 if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN) {
872 rd_kafka_dbg(rkb->rkb_rk, CGRP, "JOINGROUP",
873 "JoinGroup response: discarding outdated request "
874 "(now in join-state %s)",
875 rd_kafka_cgrp_join_state_names[rkcg->
876 rkcg_join_state]);
877 return;
878 }
879
880 if (err) {
881 ErrorCode = err;
882 goto err;
883 }
884
885 if (request->rkbuf_reqhdr.ApiVersion >= 2)
886 rd_kafka_buf_read_throttle_time(rkbuf);
887
888 rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
889 rd_kafka_buf_read_i32(rkbuf, &GenerationId);
890 rd_kafka_buf_read_str(rkbuf, &Protocol);
891 rd_kafka_buf_read_str(rkbuf, &LeaderId);
892 rd_kafka_buf_read_str(rkbuf, &MyMemberId);
893 rd_kafka_buf_read_i32(rkbuf, &member_cnt);
894
895 if (!ErrorCode && RD_KAFKAP_STR_IS_NULL(&Protocol)) {
896 /* Protocol not set, we will not be able to find
897 * a matching assignor so error out early. */
898 ErrorCode = RD_KAFKA_RESP_ERR__BAD_MSG;
899 }
900
901 rd_kafka_dbg(rkb->rkb_rk, CGRP, "JOINGROUP",
902 "JoinGroup response: GenerationId %"PRId32", "
903 "Protocol %.*s, LeaderId %.*s%s, my MemberId %.*s, "
904 "%"PRId32" members in group: %s",
905 GenerationId,
906 RD_KAFKAP_STR_PR(&Protocol),
907 RD_KAFKAP_STR_PR(&LeaderId),
908 !rd_kafkap_str_cmp(&LeaderId, &MyMemberId) ? " (me)" : "",
909 RD_KAFKAP_STR_PR(&MyMemberId),
910 member_cnt,
911 ErrorCode ? rd_kafka_err2str(ErrorCode) : "(no error)");
912
913 if (!ErrorCode) {
914 char *my_member_id;
915 RD_KAFKAP_STR_DUPA(&my_member_id, &MyMemberId);
916 rkcg->rkcg_generation_id = GenerationId;
917 rd_kafka_cgrp_set_member_id(rkcg, my_member_id);
918 i_am_leader = !rd_kafkap_str_cmp(&LeaderId, &MyMemberId);
919 } else {
920 rd_interval_backoff(&rkcg->rkcg_join_intvl, 1000*1000);
921 goto err;
922 }
923
924 if (i_am_leader) {
925 rd_kafka_group_member_t *members;
926 int i;
927 int sub_cnt = 0;
928 rd_list_t topics;
929 rd_kafka_op_t *rko;
930 rd_kafka_dbg(rkb->rkb_rk, CGRP, "JOINGROUP",
931 "Elected leader for group \"%s\" "
932 "with %"PRId32" member(s)",
933 rkcg->rkcg_group_id->str, member_cnt);
934
935 if (member_cnt > 100000) {
936 err = RD_KAFKA_RESP_ERR__BAD_MSG;
937 goto err;
938 }
939
940 rd_list_init(&topics, member_cnt, rd_free);
941
942 members = rd_calloc(member_cnt, sizeof(*members));
943
944 for (i = 0 ; i < member_cnt ; i++) {
945 rd_kafkap_str_t MemberId;
946 rd_kafkap_bytes_t MemberMetadata;
947 rd_kafka_group_member_t *rkgm;
948
949 rd_kafka_buf_read_str(rkbuf, &MemberId);
950 rd_kafka_buf_read_bytes(rkbuf, &MemberMetadata);
951
952 rkgm = &members[sub_cnt];
953 rkgm->rkgm_member_id = rd_kafkap_str_copy(&MemberId);
954 rd_list_init(&rkgm->rkgm_eligible, 0, NULL);
955
956 if (rd_kafka_group_MemberMetadata_consumer_read(
957 rkb, rkgm, &Protocol, &MemberMetadata)) {
958 /* Failed to parse this member's metadata,
959 * ignore it. */
960 } else {
961 sub_cnt++;
962 rkgm->rkgm_assignment =
963 rd_kafka_topic_partition_list_new(
964 rkgm->rkgm_subscription->size);
965 rd_kafka_topic_partition_list_get_topic_names(
966 rkgm->rkgm_subscription, &topics,
967 0/*dont include regex*/);
968 }
969
970 }
971
972 /* FIXME: What to do if parsing failed for some/all members?
973 * It is a sign of incompatibility. */
974
975
976 rd_kafka_cgrp_group_leader_reset(rkcg,
977 "JoinGroup response clean-up");
978
979 rkcg->rkcg_group_leader.protocol = RD_KAFKAP_STR_DUP(&Protocol);
980 rd_kafka_assert(NULL, rkcg->rkcg_group_leader.members == NULL);
981 rkcg->rkcg_group_leader.members = members;
982 rkcg->rkcg_group_leader.member_cnt = sub_cnt;
983
984 rd_kafka_cgrp_set_join_state(
985 rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA);
986
987 /* The assignor will need metadata so fetch it asynchronously
988 * and run the assignor when we get a reply.
989 * Create a callback op that the generic metadata code
990 * will trigger when metadata has been parsed. */
991 rko = rd_kafka_op_new_cb(
992 rkcg->rkcg_rk, RD_KAFKA_OP_METADATA,
993 rd_kafka_cgrp_assignor_handle_Metadata_op);
994 rd_kafka_op_set_replyq(rko, rkcg->rkcg_ops, NULL);
995
996 rd_kafka_MetadataRequest(rkb, &topics,
997 "partition assignor", rko);
998 rd_list_destroy(&topics);
999
1000 } else {
1001 rd_kafka_cgrp_set_join_state(
1002 rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC);
1003
1004 rd_kafka_SyncGroupRequest(rkb, rkcg->rkcg_group_id,
1005 rkcg->rkcg_generation_id,
1006 rkcg->rkcg_member_id,
1007 NULL, 0,
1008 RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
1009 rd_kafka_handle_SyncGroup, rkcg);
1010
1011 }
1012
1013err:
1014 actions = rd_kafka_err_action(rkb, ErrorCode, request,
1015 RD_KAFKA_ERR_ACTION_IGNORE,
1016 RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID,
1017
1018 RD_KAFKA_ERR_ACTION_END);
1019
1020 if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
1021 /* Re-query for coordinator */
1022 rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ,
1023 RD_KAFKA_OP_COORD_QUERY, ErrorCode);
1024 }
1025
1026 /* No need for retries here since the join is intervalled,
1027 * see rkcg_join_intvl */
1028
1029 if (ErrorCode) {
1030 if (ErrorCode == RD_KAFKA_RESP_ERR__DESTROY)
1031 return; /* Termination */
1032
1033 if (actions & RD_KAFKA_ERR_ACTION_PERMANENT)
1034 rd_kafka_q_op_err(rkcg->rkcg_q,
1035 RD_KAFKA_OP_CONSUMER_ERR,
1036 ErrorCode, 0, NULL, 0,
1037 "JoinGroup failed: %s",
1038 rd_kafka_err2str(ErrorCode));
1039
1040 if (ErrorCode == RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID)
1041 rd_kafka_cgrp_set_member_id(rkcg, "");
1042 rd_kafka_cgrp_set_join_state(rkcg,
1043 RD_KAFKA_CGRP_JOIN_STATE_INIT);
1044 }
1045
1046 return;
1047
1048 err_parse:
1049 ErrorCode = rkbuf->rkbuf_err;
1050 goto err;
1051}
1052
1053
1054/**
1055 * @brief Check subscription against requested Metadata.
1056 */
1057static rd_kafka_op_res_t
1058rd_kafka_cgrp_handle_Metadata_op (rd_kafka_t *rk, rd_kafka_q_t *rkq,
1059 rd_kafka_op_t *rko) {
1060 rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
1061
1062 if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
1063 return RD_KAFKA_OP_RES_HANDLED; /* Terminating */
1064
1065 rd_kafka_cgrp_metadata_update_check(rkcg, 0/*dont rejoin*/);
1066
1067 return RD_KAFKA_OP_RES_HANDLED;
1068}
1069
1070
1071/**
1072 * @brief (Async) Refresh metadata (for cgrp's needs)
1073 *
1074 * @returns 1 if metadata refresh was requested, or 0 if metadata is
1075 * up to date, or -1 if no broker is available for metadata requests.
1076 *
1077 * @locks none
1078 * @locality rdkafka main thread
1079 */
1080static int rd_kafka_cgrp_metadata_refresh (rd_kafka_cgrp_t *rkcg,
1081 int *metadata_agep,
1082 const char *reason) {
1083 rd_kafka_t *rk = rkcg->rkcg_rk;
1084 rd_kafka_op_t *rko;
1085 rd_list_t topics;
1086 rd_kafka_resp_err_t err;
1087
1088 rd_list_init(&topics, 8, rd_free);
1089
1090 /* Insert all non-wildcard topics in cache. */
1091 rd_kafka_metadata_cache_hint_rktparlist(rkcg->rkcg_rk,
1092 rkcg->rkcg_subscription,
1093 NULL, 0/*dont replace*/);
1094
1095 if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION) {
1096 /* For wildcard subscriptions make sure the
1097 * cached full metadata isn't too old. */
1098 int metadata_age = -1;
1099
1100 if (rk->rk_ts_full_metadata)
1101 metadata_age = (int)(rd_clock() -
1102 rk->rk_ts_full_metadata)/1000;
1103
1104 *metadata_agep = metadata_age;
1105
1106 if (metadata_age != -1 &&
1107 metadata_age <=
1108 /* The +1000 is since metadata.refresh.interval.ms
1109 * can be set to 0. */
1110 rk->rk_conf.metadata_refresh_interval_ms + 1000) {
1111 rd_kafka_dbg(rk, CGRP|RD_KAFKA_DBG_METADATA,
1112 "CGRPMETADATA",
1113 "%s: metadata for wildcard subscription "
1114 "is up to date (%dms old)",
1115 reason, *metadata_agep);
1116 rd_list_destroy(&topics);
1117 return 0; /* Up-to-date */
1118 }
1119
1120 } else {
1121 /* Check that all subscribed topics are in the cache. */
1122 int r;
1123
1124 rd_kafka_topic_partition_list_get_topic_names(
1125 rkcg->rkcg_subscription, &topics, 0/*no regexps*/);
1126
1127 rd_kafka_rdlock(rk);
1128 r = rd_kafka_metadata_cache_topics_count_exists(rk, &topics,
1129 metadata_agep);
1130 rd_kafka_rdunlock(rk);
1131
1132 if (r == rd_list_cnt(&topics)) {
1133 rd_kafka_dbg(rk, CGRP|RD_KAFKA_DBG_METADATA,
1134 "CGRPMETADATA",
1135 "%s: metadata for subscription "
1136 "is up to date (%dms old)", reason,
1137 *metadata_agep);
1138 rd_list_destroy(&topics);
1139 return 0; /* Up-to-date and all topics exist. */
1140 }
1141
1142 rd_kafka_dbg(rk, CGRP|RD_KAFKA_DBG_METADATA,
1143 "CGRPMETADATA",
1144 "%s: metadata for subscription "
1145 "only available for %d/%d topics (%dms old)",
1146 reason, r, rd_list_cnt(&topics), *metadata_agep);
1147
1148 }
1149
1150 /* Async request, result will be triggered from
1151 * rd_kafka_parse_metadata(). */
1152 rko = rd_kafka_op_new_cb(rkcg->rkcg_rk, RD_KAFKA_OP_METADATA,
1153 rd_kafka_cgrp_handle_Metadata_op);
1154 rd_kafka_op_set_replyq(rko, rkcg->rkcg_ops, 0);
1155
1156 err = rd_kafka_metadata_request(rkcg->rkcg_rk, NULL, &topics,
1157 reason, rko);
1158 if (err) {
1159 rd_kafka_dbg(rk, CGRP|RD_KAFKA_DBG_METADATA,
1160 "CGRPMETADATA",
1161 "%s: need to refresh metadata (%dms old) "
1162 "but no usable brokers available: %s",
1163 reason, *metadata_agep, rd_kafka_err2str(err));
1164 rd_kafka_op_destroy(rko);
1165 }
1166
1167 rd_list_destroy(&topics);
1168
1169 return err ? -1 : 1;
1170}
1171
1172
1173
/**
 * @brief Attempt to join the consumer group.
 *
 * Does nothing unless the cgrp is in state UP and join-state INIT.
 * If a max.poll.interval.ms violation was flagged, it also does nothing
 * while rd_kafka_max_poll_exceeded() still reports one.
 *
 * Steps, in order:
 *  - refresh metadata if needed; if a refresh was requested (async) the
 *    join is postponed;
 *  - if the effective subscribed topic list is empty, re-evaluate it
 *    against current metadata. If it is still empty, no join is
 *    attempted;
 *  - otherwise move to join-state WAIT_JOIN and send a JoinGroupRequest
 *    to the coordinator. The response is handled by
 *    rd_kafka_cgrp_handle_JoinGroup().
 */
static void rd_kafka_cgrp_join (rd_kafka_cgrp_t *rkcg) {
        /* Set by rd_kafka_cgrp_metadata_refresh() below (used for logging). */
        int metadata_age;

        if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP ||
            rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_INIT)
                return;

        /* On max.poll.interval.ms failure, do not rejoin group until the
         * application has called poll. */
        if ((rkcg->rkcg_flags & RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED) &&
            rd_kafka_max_poll_exceeded(rkcg->rkcg_rk))
                return;

        rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED;

        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "JOIN",
                     "Group \"%.*s\": join with %d (%d) subscribed topic(s)",
                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                     rd_list_cnt(rkcg->rkcg_subscribed_topics),
                     rkcg->rkcg_subscription->cnt);


        /* See if we need to query metadata to continue:
         * - if subscription contains wildcards:
         *   * query all topics in cluster
         *
         * - if subscription does not contain wildcards but
         *   some topics are missing from the local metadata cache:
         *   * query subscribed topics (all cached ones)
         *
         * - otherwise:
         *   * rely on topic metadata cache
         */
        /* We need up-to-date full metadata to continue,
         * refresh metadata if necessary. */
        if (rd_kafka_cgrp_metadata_refresh(rkcg, &metadata_age,
                                           "consumer join") == 1) {
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_CONSUMER, "JOIN",
                             "Group \"%.*s\": "
                             "postponing join until up-to-date "
                             "metadata is available",
                             RD_KAFKAP_STR_PR(rkcg->rkcg_group_id));
                return; /* ^ async call */
        }

        /* Metadata is usable (or no broker was available): compute the
         * effective subscribed topic list if we don't have one yet. */
        if (rd_list_empty(rkcg->rkcg_subscribed_topics))
                rd_kafka_cgrp_metadata_update_check(rkcg, 0/*dont join*/);

        if (rd_list_empty(rkcg->rkcg_subscribed_topics)) {
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_CONSUMER, "JOIN",
                             "Group \"%.*s\": "
                             "no matching topics based on %dms old metadata: "
                             "next metadata refresh in %dms",
                             RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                             metadata_age,
                             rkcg->rkcg_rk->rk_conf.
                             metadata_refresh_interval_ms - metadata_age);
                return;
        }

        rd_rkb_dbg(rkcg->rkcg_curr_coord, CONSUMER, "JOIN",
                   "Joining group \"%.*s\" with %d subscribed topic(s)",
                   RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                   rd_list_cnt(rkcg->rkcg_subscribed_topics));

        rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN);
        rd_kafka_JoinGroupRequest(rkcg->rkcg_coord, rkcg->rkcg_group_id,
                                  rkcg->rkcg_member_id,
                                  rkcg->rkcg_rk->rk_conf.group_protocol_type,
                                  rkcg->rkcg_subscribed_topics,
                                  RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
                                  rd_kafka_cgrp_handle_JoinGroup, rkcg);
}
1247
1248/**
1249 * Rejoin group on update to effective subscribed topics list
1250 */
1251static void rd_kafka_cgrp_rejoin (rd_kafka_cgrp_t *rkcg) {
1252 /*
1253 * Clean-up group leader duties, if any.
1254 */
1255 rd_kafka_cgrp_group_leader_reset(rkcg, "Group rejoin");
1256
1257 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "REJOIN",
1258 "Group \"%.*s\" rejoining in join-state %s "
1259 "with%s an assignment",
1260 RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
1261 rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
1262 rkcg->rkcg_assignment ? "" : "out");
1263
1264 rd_kafka_cgrp_rebalance(rkcg, "group rejoin");
1265}
1266
1267/**
1268 * Update the effective list of subscribed topics and trigger a rejoin
1269 * if it changed.
1270 *
1271 * Set \p tinfos to NULL for clearing the list.
1272 *
1273 * @param tinfos rd_list_t(rd_kafka_topic_info_t *): new effective topic list
1274 *
1275 * @returns 1 on change, else 0.
1276 *
1277 * @remark Takes ownership of \p tinfos
1278 */
1279static int
1280rd_kafka_cgrp_update_subscribed_topics (rd_kafka_cgrp_t *rkcg,
1281 rd_list_t *tinfos) {
1282 rd_kafka_topic_info_t *tinfo;
1283 int i;
1284
1285 if (!tinfos) {
1286 if (!rd_list_empty(rkcg->rkcg_subscribed_topics))
1287 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "SUBSCRIPTION",
1288 "Group \"%.*s\": "
1289 "clearing subscribed topics list (%d)",
1290 RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
1291 rd_list_cnt(rkcg->rkcg_subscribed_topics));
1292 tinfos = rd_list_new(0, (void *)rd_kafka_topic_info_destroy);
1293
1294 } else {
1295 if (rd_list_cnt(tinfos) == 0)
1296 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "SUBSCRIPTION",
1297 "Group \"%.*s\": "
1298 "no topics in metadata matched "
1299 "subscription",
1300 RD_KAFKAP_STR_PR(rkcg->rkcg_group_id));
1301 }
1302
1303 /* Sort for comparison */
1304 rd_list_sort(tinfos, rd_kafka_topic_info_cmp);
1305
1306 /* Compare to existing to see if anything changed. */
1307 if (!rd_list_cmp(rkcg->rkcg_subscribed_topics, tinfos,
1308 rd_kafka_topic_info_cmp)) {
1309 /* No change */
1310 rd_list_destroy(tinfos);
1311 return 0;
1312 }
1313
1314 rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_METADATA, "SUBSCRIPTION",
1315 "Group \"%.*s\": effective subscription list changed "
1316 "from %d to %d topic(s):",
1317 RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
1318 rd_list_cnt(rkcg->rkcg_subscribed_topics),
1319 rd_list_cnt(tinfos));
1320
1321 RD_LIST_FOREACH(tinfo, tinfos, i)
1322 rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_METADATA,
1323 "SUBSCRIPTION",
1324 " Topic %s with %d partition(s)",
1325 tinfo->topic, tinfo->partition_cnt);
1326
1327 rd_list_destroy(rkcg->rkcg_subscribed_topics);
1328
1329 rkcg->rkcg_subscribed_topics = tinfos;
1330
1331 return 1;
1332}
1333
1334
1335
1336/**
1337 * @brief Handle Heartbeat response.
1338 */
1339void rd_kafka_cgrp_handle_Heartbeat (rd_kafka_t *rk,
1340 rd_kafka_broker_t *rkb,
1341 rd_kafka_resp_err_t err,
1342 rd_kafka_buf_t *rkbuf,
1343 rd_kafka_buf_t *request,
1344 void *opaque) {
1345 rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
1346 const int log_decode_errors = LOG_ERR;
1347 int16_t ErrorCode = 0;
1348 int actions;
1349
1350 if (err) {
1351 if (err == RD_KAFKA_RESP_ERR__DESTROY)
1352 return; /* Terminating */
1353 ErrorCode = err;
1354 goto err;
1355 }
1356
1357 rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
1358
1359err:
1360 actions = rd_kafka_err_action(rkb, ErrorCode, request,
1361 RD_KAFKA_ERR_ACTION_END);
1362
1363 rd_dassert(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT);
1364 rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT;
1365
1366 if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
1367 /* Re-query for coordinator */
1368 rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ,
1369 RD_KAFKA_OP_COORD_QUERY, ErrorCode);
1370 }
1371
1372 if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
1373 if (rd_kafka_buf_retry(rkb, request)) {
1374 rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT;
1375 return;
1376 }
1377 /* FALLTHRU */
1378 }
1379
1380 if (ErrorCode != 0 && ErrorCode != RD_KAFKA_RESP_ERR__DESTROY)
1381 rd_kafka_cgrp_handle_heartbeat_error(rkcg, ErrorCode);
1382
1383 return;
1384
1385 err_parse:
1386 ErrorCode = rkbuf->rkbuf_err;
1387 goto err;
1388}
1389
1390
1391
1392/**
1393 * @brief Send Heartbeat
1394 */
1395static void rd_kafka_cgrp_heartbeat (rd_kafka_cgrp_t *rkcg) {
1396 /* Skip heartbeat if we have one in transit */
1397 if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT)
1398 return;
1399
1400 rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT;
1401 rd_kafka_HeartbeatRequest(rkcg->rkcg_coord, rkcg->rkcg_group_id,
1402 rkcg->rkcg_generation_id,
1403 rkcg->rkcg_member_id,
1404 RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
1405 rd_kafka_cgrp_handle_Heartbeat, NULL);
1406}
1407
1408/**
1409 * Cgrp is now terminated: decommission it and signal back to application.
1410 */
1411static void rd_kafka_cgrp_terminated (rd_kafka_cgrp_t *rkcg) {
1412
1413 rd_kafka_assert(NULL, rkcg->rkcg_wait_unassign_cnt == 0);
1414 rd_kafka_assert(NULL, rkcg->rkcg_wait_commit_cnt == 0);
1415 rd_kafka_assert(NULL, !(rkcg->rkcg_flags&RD_KAFKA_CGRP_F_WAIT_UNASSIGN));
1416 rd_kafka_assert(NULL, rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM);
1417
1418 rd_kafka_timer_stop(&rkcg->rkcg_rk->rk_timers,
1419 &rkcg->rkcg_offset_commit_tmr, 1/*lock*/);
1420
1421 rd_kafka_q_purge(rkcg->rkcg_wait_coord_q);
1422
1423 /* Disable and empty ops queue since there will be no
1424 * (broker) thread serving it anymore after the unassign_broker
1425 * below.
1426 * This prevents hang on destroy where responses are enqueued on rkcg_ops
1427 * without anything serving the queue. */
1428 rd_kafka_q_disable(rkcg->rkcg_ops);
1429 rd_kafka_q_purge(rkcg->rkcg_ops);
1430
1431 if (rkcg->rkcg_curr_coord)
1432 rd_kafka_cgrp_coord_clear_broker(rkcg);
1433
1434 if (rkcg->rkcg_coord) {
1435 rd_kafka_broker_destroy(rkcg->rkcg_coord);
1436 rkcg->rkcg_coord = NULL;
1437 }
1438
1439 if (rkcg->rkcg_reply_rko) {
1440 /* Signal back to application. */
1441 rd_kafka_replyq_enq(&rkcg->rkcg_reply_rko->rko_replyq,
1442 rkcg->rkcg_reply_rko, 0);
1443 rkcg->rkcg_reply_rko = NULL;
1444 }
1445}
1446
1447
1448/**
1449 * If a cgrp is terminating and all outstanding ops are now finished
1450 * then progress to final termination and return 1.
1451 * Else returns 0.
1452 */
1453static RD_INLINE int rd_kafka_cgrp_try_terminate (rd_kafka_cgrp_t *rkcg) {
1454
1455 if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM)
1456 return 1;
1457
1458 if (likely(!(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)))
1459 return 0;
1460
1461 /* Check if wait-coord queue has timed out. */
1462 if (rd_kafka_q_len(rkcg->rkcg_wait_coord_q) > 0 &&
1463 rkcg->rkcg_ts_terminate +
1464 (rkcg->rkcg_rk->rk_conf.group_session_timeout_ms * 1000) <
1465 rd_clock()) {
1466 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPTERM",
1467 "Group \"%s\": timing out %d op(s) in "
1468 "wait-for-coordinator queue",
1469 rkcg->rkcg_group_id->str,
1470 rd_kafka_q_len(rkcg->rkcg_wait_coord_q));
1471 rd_kafka_q_disable(rkcg->rkcg_wait_coord_q);
1472 if (rd_kafka_q_concat(rkcg->rkcg_ops,
1473 rkcg->rkcg_wait_coord_q) == -1) {
1474 /* ops queue shut down, purge coord queue */
1475 rd_kafka_q_purge(rkcg->rkcg_wait_coord_q);
1476 }
1477 }
1478
1479 if (!RD_KAFKA_CGRP_WAIT_REBALANCE_CB(rkcg) &&
1480 rd_list_empty(&rkcg->rkcg_toppars) &&
1481 rkcg->rkcg_wait_unassign_cnt == 0 &&
1482 rkcg->rkcg_wait_commit_cnt == 0 &&
1483 !(rkcg->rkcg_flags & (RD_KAFKA_CGRP_F_WAIT_UNASSIGN |
1484 RD_KAFKA_CGRP_F_WAIT_LEAVE))) {
1485 /* Since we might be deep down in a 'rko' handler
1486 * called from cgrp_op_serve() we cant call terminated()
1487 * directly since it will decommission the rkcg_ops queue
1488 * that might be locked by intermediate functions.
1489 * Instead set the TERM state and let the cgrp terminate
1490 * at its own discretion. */
1491 rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_TERM);
1492 return 1;
1493 } else {
1494 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPTERM",
1495 "Group \"%s\": "
1496 "waiting for %s%d toppar(s), %d unassignment(s), "
1497 "%d commit(s)%s%s (state %s, join-state %s) "
1498 "before terminating",
1499 rkcg->rkcg_group_id->str,
1500 RD_KAFKA_CGRP_WAIT_REBALANCE_CB(rkcg) ?
1501 "rebalance_cb, ": "",
1502 rd_list_cnt(&rkcg->rkcg_toppars),
1503 rkcg->rkcg_wait_unassign_cnt,
1504 rkcg->rkcg_wait_commit_cnt,
1505 (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_UNASSIGN)?
1506 ", wait-unassign flag," : "",
1507 (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_LEAVE)?
1508 ", wait-leave," : "",
1509 rd_kafka_cgrp_state_names[rkcg->rkcg_state],
1510 rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
1511 return 0;
1512 }
1513}
1514
1515
1516/**
1517 * Add partition to this cgrp management
1518 */
1519static void rd_kafka_cgrp_partition_add (rd_kafka_cgrp_t *rkcg,
1520 rd_kafka_toppar_t *rktp) {
1521 rd_kafka_dbg(rkcg->rkcg_rk, CGRP,"PARTADD",
1522 "Group \"%s\": add %s [%"PRId32"]",
1523 rkcg->rkcg_group_id->str,
1524 rktp->rktp_rkt->rkt_topic->str,
1525 rktp->rktp_partition);
1526
1527 rd_kafka_assert(rkcg->rkcg_rk, !rktp->rktp_s_for_cgrp);
1528 rktp->rktp_s_for_cgrp = rd_kafka_toppar_keep(rktp);
1529 rd_list_add(&rkcg->rkcg_toppars, rktp->rktp_s_for_cgrp);
1530}
1531
1532/**
1533 * Remove partition from this cgrp management
1534 */
1535static void rd_kafka_cgrp_partition_del (rd_kafka_cgrp_t *rkcg,
1536 rd_kafka_toppar_t *rktp) {
1537 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTDEL",
1538 "Group \"%s\": delete %s [%"PRId32"]",
1539 rkcg->rkcg_group_id->str,
1540 rktp->rktp_rkt->rkt_topic->str,
1541 rktp->rktp_partition);
1542 rd_kafka_assert(rkcg->rkcg_rk, rktp->rktp_s_for_cgrp);
1543
1544 rd_list_remove(&rkcg->rkcg_toppars, rktp->rktp_s_for_cgrp);
1545 rd_kafka_toppar_destroy(rktp->rktp_s_for_cgrp);
1546 rktp->rktp_s_for_cgrp = NULL;
1547
1548 rd_kafka_cgrp_try_terminate(rkcg);
1549}
1550
1551
1552
1553/**
1554 * Reply for OffsetFetch from call below.
1555 */
1556static void rd_kafka_cgrp_offsets_fetch_response (
1557 rd_kafka_t *rk,
1558 rd_kafka_broker_t *rkb,
1559 rd_kafka_resp_err_t err,
1560 rd_kafka_buf_t *reply,
1561 rd_kafka_buf_t *request,
1562 void *opaque) {
1563 rd_kafka_topic_partition_list_t *offsets = opaque;
1564 rd_kafka_cgrp_t *rkcg;
1565
1566 if (err == RD_KAFKA_RESP_ERR__DESTROY) {
1567 /* Termination, quick cleanup. */
1568 rd_kafka_topic_partition_list_destroy(offsets);
1569 return;
1570 }
1571
1572 rkcg = rd_kafka_cgrp_get(rk);
1573
1574 if (rd_kafka_buf_version_outdated(request, rkcg->rkcg_version)) {
1575 rd_kafka_topic_partition_list_destroy(offsets);
1576 return;
1577 }
1578
1579 rd_kafka_topic_partition_list_log(rk, "OFFSETFETCH",
1580 RD_KAFKA_DBG_TOPIC|RD_KAFKA_DBG_CGRP,
1581 offsets);
1582 /* If all partitions already had usable offsets then there
1583 * was no request sent and thus no reply, the offsets list is
1584 * good to go. */
1585 if (reply) {
1586 err = rd_kafka_handle_OffsetFetch(rk, rkb, err,
1587 reply, request, offsets,
1588 1/* Update toppars */);
1589 if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS)
1590 return; /* retrying */
1591 }
1592 if (err) {
1593 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "OFFSET",
1594 "Offset fetch error: %s",
1595 rd_kafka_err2str(err));
1596
1597 if (err != RD_KAFKA_RESP_ERR__WAIT_COORD)
1598 rd_kafka_q_op_err(rkcg->rkcg_q,
1599 RD_KAFKA_OP_CONSUMER_ERR, err, 0,
1600 NULL, 0,
1601 "Failed to fetch offsets: %s",
1602 rd_kafka_err2str(err));
1603 } else {
1604 if (RD_KAFKA_CGRP_CAN_FETCH_START(rkcg))
1605 rd_kafka_cgrp_partitions_fetch_start(
1606 rkcg, offsets, 1 /* usable offsets */);
1607 else
1608 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "OFFSET",
1609 "Group \"%.*s\": "
1610 "ignoring Offset fetch response for "
1611 "%d partition(s): in state %s",
1612 RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
1613 offsets ? offsets->cnt : -1,
1614 rd_kafka_cgrp_join_state_names[
1615 rkcg->rkcg_join_state]);
1616 }
1617
1618 rd_kafka_topic_partition_list_destroy(offsets);
1619}
1620
1621/**
1622 * Fetch offsets for a list of partitions
1623 */
1624static void
1625rd_kafka_cgrp_offsets_fetch (rd_kafka_cgrp_t *rkcg, rd_kafka_broker_t *rkb,
1626 rd_kafka_topic_partition_list_t *offsets) {
1627 rd_kafka_topic_partition_list_t *use_offsets;
1628
1629 /* Make a copy of the offsets */
1630 use_offsets = rd_kafka_topic_partition_list_copy(offsets);
1631
1632 if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP || !rkb)
1633 rd_kafka_cgrp_offsets_fetch_response(
1634 rkcg->rkcg_rk, rkb, RD_KAFKA_RESP_ERR__WAIT_COORD,
1635 NULL, NULL, use_offsets);
1636 else {
1637 rd_kafka_OffsetFetchRequest(
1638 rkb, 1, offsets,
1639 RD_KAFKA_REPLYQ(rkcg->rkcg_ops, rkcg->rkcg_version),
1640 rd_kafka_cgrp_offsets_fetch_response,
1641 use_offsets);
1642 }
1643
1644}
1645
1646
1647/**
1648 * Start fetching all partitions in 'assignment' (async)
1649 */
1650static void
1651rd_kafka_cgrp_partitions_fetch_start0 (rd_kafka_cgrp_t *rkcg,
1652 rd_kafka_topic_partition_list_t
1653 *assignment, int usable_offsets,
1654 int line) {
1655 int i;
1656
1657 /* If waiting for offsets to commit we need that to finish first
1658 * before starting fetchers (which might fetch those stored offsets).*/
1659 if (rkcg->rkcg_wait_commit_cnt > 0) {
1660 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "FETCHSTART",
1661 "Group \"%s\": not starting fetchers "
1662 "for %d assigned partition(s) in join-state %s "
1663 "(usable_offsets=%s, v%"PRId32", line %d): "
1664 "waiting for %d commit(s)",
1665 rkcg->rkcg_group_id->str, assignment->cnt,
1666 rd_kafka_cgrp_join_state_names[rkcg->
1667 rkcg_join_state],
1668 usable_offsets ? "yes":"no",
1669 rkcg->rkcg_version, line,
1670 rkcg->rkcg_wait_commit_cnt);
1671 return;
1672 }
1673
1674 rd_kafka_cgrp_version_new_barrier(rkcg);
1675
1676 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "FETCHSTART",
1677 "Group \"%s\": starting fetchers for %d assigned "
1678 "partition(s) in join-state %s "
1679 "(usable_offsets=%s, v%"PRId32", line %d)",
1680 rkcg->rkcg_group_id->str, assignment->cnt,
1681 rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
1682 usable_offsets ? "yes":"no",
1683 rkcg->rkcg_version, line);
1684
1685 rd_kafka_topic_partition_list_log(rkcg->rkcg_rk,
1686 "FETCHSTART",
1687 RD_KAFKA_DBG_TOPIC|RD_KAFKA_DBG_CGRP,
1688 assignment);
1689
1690 if (assignment->cnt == 0)
1691 return;
1692
1693 /* Check if offsets are really unusable, this is to catch the
1694 * case where the entire assignment has absolute offsets set which
1695 * should make us skip offset lookups. */
1696 if (!usable_offsets)
1697 usable_offsets =
1698 rd_kafka_topic_partition_list_count_abs_offsets(
1699 assignment) == assignment->cnt;
1700
1701 if (!usable_offsets &&
1702 rkcg->rkcg_rk->rk_conf.offset_store_method ==
1703 RD_KAFKA_OFFSET_METHOD_BROKER) {
1704
1705 /* Fetch offsets for all assigned partitions */
1706 rd_kafka_cgrp_offsets_fetch(rkcg, rkcg->rkcg_coord,
1707 assignment);
1708
1709 } else {
1710 rd_kafka_cgrp_set_join_state(rkcg,
1711 RD_KAFKA_CGRP_JOIN_STATE_STARTED);
1712
1713 /* Start a timer to enforce `max.poll.interval.ms`.
1714 * Instead of restarting the timer on each ...poll() call,
1715 * which would be costly (once per message), set up an
1716 * intervalled timer that checks a timestamp
1717 * (that is updated on ..poll()).
1718 * The timer interval is 2 hz */
1719 rd_kafka_timer_start(&rkcg->rkcg_rk->rk_timers,
1720 &rkcg->rkcg_max_poll_interval_tmr,
1721 500 * 1000ll /* 500ms */,
1722 rd_kafka_cgrp_max_poll_interval_check_tmr_cb,
1723 rkcg);
1724
1725 for (i = 0 ; i < assignment->cnt ; i++) {
1726 rd_kafka_topic_partition_t *rktpar =
1727 &assignment->elems[i];
1728 shptr_rd_kafka_toppar_t *s_rktp = rktpar->_private;
1729 rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
1730
1731 if (!rktp->rktp_assigned) {
1732 rktp->rktp_assigned = 1;
1733 rkcg->rkcg_assigned_cnt++;
1734
1735 /* Start fetcher for partition and
1736 * forward partition's fetchq to
1737 * consumer groups queue. */
1738 rd_kafka_toppar_op_fetch_start(
1739 rktp, rktpar->offset,
1740 rkcg->rkcg_q, RD_KAFKA_NO_REPLYQ);
1741 } else {
1742 int64_t offset;
1743 /* Fetcher already started,
1744 * just do seek to update offset */
1745 rd_kafka_toppar_lock(rktp);
1746 if (rktpar->offset < rktp->rktp_app_offset)
1747 offset = rktp->rktp_app_offset;
1748 else
1749 offset = rktpar->offset;
1750 rd_kafka_toppar_unlock(rktp);
1751 rd_kafka_toppar_op_seek(rktp, offset,
1752 RD_KAFKA_NO_REPLYQ);
1753 }
1754 }
1755 }
1756
1757 rd_kafka_assert(NULL, rkcg->rkcg_assigned_cnt <=
1758 (rkcg->rkcg_assignment ? rkcg->rkcg_assignment->cnt : 0));
1759}
1760
1761
1762
1763
1764
1765/**
1766 * @brief Defer offset commit (rko) until coordinator is available.
1767 *
1768 * @returns 1 if the rko was deferred or 0 if the defer queue is disabled
1769 * or rko already deferred.
1770 */
1771static int rd_kafka_cgrp_defer_offset_commit (rd_kafka_cgrp_t *rkcg,
1772 rd_kafka_op_t *rko,
1773 const char *reason) {
1774
1775 /* wait_coord_q is disabled session.timeout.ms after
1776 * group close() has been initated. */
1777 if (rko->rko_u.offset_commit.ts_timeout != 0 ||
1778 !rd_kafka_q_ready(rkcg->rkcg_wait_coord_q))
1779 return 0;
1780
1781 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COMMIT",
1782 "Group \"%s\": "
1783 "unable to OffsetCommit in state %s: %s: "
1784 "coordinator (%s) is unavailable: "
1785 "retrying later",
1786 rkcg->rkcg_group_id->str,
1787 rd_kafka_cgrp_state_names[rkcg->rkcg_state],
1788 reason,
1789 rkcg->rkcg_curr_coord ?
1790 rd_kafka_broker_name(rkcg->rkcg_curr_coord) :
1791 "none");
1792
1793 rko->rko_flags |= RD_KAFKA_OP_F_REPROCESS;
1794 rko->rko_u.offset_commit.ts_timeout = rd_clock() +
1795 (rkcg->rkcg_rk->rk_conf.group_session_timeout_ms
1796 * 1000);
1797 rd_kafka_q_enq(rkcg->rkcg_wait_coord_q, rko);
1798
1799 return 1;
1800}
1801
1802
1803/**
1804 * @brief Handler of OffsetCommit response (after parsing).
1805 * @remark \p offsets may be NULL if \p err is set
1806 * @returns the number of partitions with errors encountered
1807 */
1808static int
1809rd_kafka_cgrp_handle_OffsetCommit (rd_kafka_cgrp_t *rkcg,
1810 rd_kafka_resp_err_t err,
1811 rd_kafka_topic_partition_list_t
1812 *offsets) {
1813 int i;
1814 int errcnt = 0;
1815
1816 if (!err) {
1817 /* Update toppars' committed offset */
1818 for (i = 0 ; i < offsets->cnt ; i++) {
1819 rd_kafka_topic_partition_t *rktpar =&offsets->elems[i];
1820 shptr_rd_kafka_toppar_t *s_rktp;
1821 rd_kafka_toppar_t *rktp;
1822
1823 if (unlikely(rktpar->err)) {
1824 rd_kafka_dbg(rkcg->rkcg_rk, TOPIC,
1825 "OFFSET",
1826 "OffsetCommit failed for "
1827 "%s [%"PRId32"] at offset "
1828 "%"PRId64": %s",
1829 rktpar->topic, rktpar->partition,
1830 rktpar->offset,
1831 rd_kafka_err2str(rktpar->err));
1832 errcnt++;
1833 continue;
1834 } else if (unlikely(rktpar->offset < 0))
1835 continue;
1836
1837 s_rktp = rd_kafka_topic_partition_list_get_toppar(
1838 rkcg->rkcg_rk, rktpar);
1839 if (!s_rktp)
1840 continue;
1841
1842 rktp = rd_kafka_toppar_s2i(s_rktp);
1843 rd_kafka_toppar_lock(rktp);
1844 rktp->rktp_committed_offset = rktpar->offset;
1845 rd_kafka_toppar_unlock(rktp);
1846
1847 rd_kafka_toppar_destroy(s_rktp);
1848 }
1849 }
1850
1851 if (rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN)
1852 rd_kafka_cgrp_check_unassign_done(rkcg, "OffsetCommit done");
1853
1854 rd_kafka_cgrp_try_terminate(rkcg);
1855
1856 return errcnt;
1857}
1858
1859
1860
1861
1862/**
1863 * Handle OffsetCommitResponse
1864 * Takes the original 'rko' as opaque argument.
1865 * @remark \p rkb, rkbuf, and request may be NULL in a number of
1866 * error cases (e.g., _NO_OFFSET, _WAIT_COORD)
1867 */
1868static void rd_kafka_cgrp_op_handle_OffsetCommit (rd_kafka_t *rk,
1869 rd_kafka_broker_t *rkb,
1870 rd_kafka_resp_err_t err,
1871 rd_kafka_buf_t *rkbuf,
1872 rd_kafka_buf_t *request,
1873 void *opaque) {
1874 rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
1875 rd_kafka_op_t *rko_orig = opaque;
1876 rd_kafka_topic_partition_list_t *offsets =
1877 rko_orig->rko_u.offset_commit.partitions; /* maybe NULL */
1878 int errcnt;
1879 int offset_commit_cb_served = 0;
1880
1881 RD_KAFKA_OP_TYPE_ASSERT(rko_orig, RD_KAFKA_OP_OFFSET_COMMIT);
1882
1883 if (rd_kafka_buf_version_outdated(request, rkcg->rkcg_version))
1884 err = RD_KAFKA_RESP_ERR__DESTROY;
1885
1886 err = rd_kafka_handle_OffsetCommit(rk, rkb, err, rkbuf,
1887 request, offsets);
1888
1889 if (rkb)
1890 rd_rkb_dbg(rkb, CGRP, "COMMIT",
1891 "OffsetCommit for %d partition(s): %s: returned: %s",
1892 offsets ? offsets->cnt : -1,
1893 rko_orig->rko_u.offset_commit.reason,
1894 rd_kafka_err2str(err));
1895 else
1896 rd_kafka_dbg(rk, CGRP, "COMMIT",
1897 "OffsetCommit for %d partition(s): %s: returned: %s",
1898 offsets ? offsets->cnt : -1,
1899 rko_orig->rko_u.offset_commit.reason,
1900 rd_kafka_err2str(err));
1901
1902 if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS)
1903 return; /* Retrying */
1904 else if (err == RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP ||
1905 err == RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE) {
1906
1907 /* future-proofing, see timeout_scan(). */
1908 rd_kafka_assert(NULL, err != RD_KAFKA_RESP_ERR__WAIT_COORD);
1909
1910 if (rd_kafka_cgrp_defer_offset_commit(rkcg, rko_orig,
1911 rd_kafka_err2str(err)))
1912 return;
1913
1914 /* FALLTHRU and error out */
1915 }
1916
1917 rd_kafka_assert(NULL, rkcg->rkcg_wait_commit_cnt > 0);
1918 rkcg->rkcg_wait_commit_cnt--;
1919
1920 if (err == RD_KAFKA_RESP_ERR_NO_ERROR) {
1921 if (rkcg->rkcg_wait_commit_cnt == 0 &&
1922 rkcg->rkcg_assignment &&
1923 RD_KAFKA_CGRP_CAN_FETCH_START(rkcg))
1924 rd_kafka_cgrp_partitions_fetch_start(rkcg,
1925 rkcg->rkcg_assignment, 0);
1926 }
1927
1928 if (err == RD_KAFKA_RESP_ERR__DESTROY ||
1929 (err == RD_KAFKA_RESP_ERR__NO_OFFSET &&
1930 rko_orig->rko_u.offset_commit.silent_empty)) {
1931 rd_kafka_op_destroy(rko_orig);
1932 rd_kafka_cgrp_check_unassign_done(
1933 rkcg,
1934 err == RD_KAFKA_RESP_ERR__DESTROY ?
1935 "OffsetCommit done (__DESTROY)" :
1936 "OffsetCommit done (__NO_OFFSET)");
1937 return;
1938 }
1939
1940 /* Call on_commit interceptors */
1941 if (err != RD_KAFKA_RESP_ERR__NO_OFFSET &&
1942 err != RD_KAFKA_RESP_ERR__DESTROY &&
1943 offsets && offsets->cnt > 0)
1944 rd_kafka_interceptors_on_commit(rk, offsets, err);
1945
1946
1947 /* If no special callback is set but a offset_commit_cb has
1948 * been set in conf then post an event for the latter. */
1949 if (!rko_orig->rko_u.offset_commit.cb && rk->rk_conf.offset_commit_cb) {
1950 rd_kafka_op_t *rko_reply = rd_kafka_op_new_reply(rko_orig, err);
1951
1952 rd_kafka_op_set_prio(rko_reply, RD_KAFKA_PRIO_HIGH);
1953
1954 if (offsets)
1955 rko_reply->rko_u.offset_commit.partitions =
1956 rd_kafka_topic_partition_list_copy(offsets);
1957
1958 rko_reply->rko_u.offset_commit.cb =
1959 rk->rk_conf.offset_commit_cb;
1960 rko_reply->rko_u.offset_commit.opaque = rk->rk_conf.opaque;
1961
1962 rd_kafka_q_enq(rk->rk_rep, rko_reply);
1963 offset_commit_cb_served++;
1964 }
1965
1966
1967 /* Enqueue reply to requester's queue, if any. */
1968 if (rko_orig->rko_replyq.q) {
1969 rd_kafka_op_t *rko_reply = rd_kafka_op_new_reply(rko_orig, err);
1970
1971 rd_kafka_op_set_prio(rko_reply, RD_KAFKA_PRIO_HIGH);
1972
1973 /* Copy offset & partitions & callbacks to reply op */
1974 rko_reply->rko_u.offset_commit = rko_orig->rko_u.offset_commit;
1975 if (offsets)
1976 rko_reply->rko_u.offset_commit.partitions =
1977 rd_kafka_topic_partition_list_copy(offsets);
1978 if (rko_reply->rko_u.offset_commit.reason)
1979 rko_reply->rko_u.offset_commit.reason =
1980 rd_strdup(rko_reply->rko_u.offset_commit.reason);
1981
1982 rd_kafka_replyq_enq(&rko_orig->rko_replyq, rko_reply, 0);
1983 offset_commit_cb_served++;
1984 }
1985
1986 errcnt = rd_kafka_cgrp_handle_OffsetCommit(rkcg, err, offsets);
1987
1988 if (!offset_commit_cb_served &&
1989 err != RD_KAFKA_RESP_ERR_NO_ERROR &&
1990 err != RD_KAFKA_RESP_ERR__NO_OFFSET) {
1991 /* If there is no callback or handler for this (auto)
1992 * commit then raise an error to the application (#1043) */
1993 char tmp[512];
1994
1995 rd_kafka_topic_partition_list_str(
1996 offsets, tmp, sizeof(tmp),
1997 /*no partition-errs if a global error*/
1998 RD_KAFKA_FMT_F_OFFSET |
1999 (err ? 0 : RD_KAFKA_FMT_F_ONLY_ERR));
2000
2001 rd_kafka_log(rkcg->rkcg_rk, LOG_WARNING, "COMMITFAIL",
2002 "Offset commit (%s) failed "
2003 "for %d/%d partition(s): "
2004 "%s%s%s",
2005 rko_orig->rko_u.offset_commit.reason,
2006 err ? offsets->cnt : errcnt, offsets->cnt,
2007 err ? rd_kafka_err2str(err) : "",
2008 err ? ": " : "",
2009 tmp);
2010 }
2011
2012 rd_kafka_op_destroy(rko_orig);
2013}
2014
2015
2016static size_t rd_kafka_topic_partition_has_absolute_offset (
2017 const rd_kafka_topic_partition_t *rktpar, void *opaque) {
2018 return rktpar->offset >= 0 ? 1 : 0;
2019}
2020
2021
2022/**
2023 * Commit a list of offsets.
 * Reuse the originating 'rko' for the async reply.
 * 'rko->rko_payload' should either be NULL (to commit current assignment) or
2026 * a proper topic_partition_list_t with offsets to commit.
2027 * The offset list will be altered.
2028 *
2029 * \p rko...silent_empty: if there are no offsets to commit bail out
2030 * silently without posting an op on the reply queue.
2031 * \p set_offsets: set offsets in rko->rko_u.offset_commit.partitions
2032 *
2033 * \p op_version: cgrp's op version to use (or 0)
2034 *
2035 * Locality: cgrp thread
2036 */
2037static void rd_kafka_cgrp_offsets_commit (rd_kafka_cgrp_t *rkcg,
2038 rd_kafka_op_t *rko,
2039 int set_offsets,
2040 const char *reason,
2041 int op_version) {
2042 rd_kafka_topic_partition_list_t *offsets;
2043 rd_kafka_resp_err_t err;
2044 int valid_offsets = 0;
2045
2046 /* If offsets is NULL we shall use the current assignment. */
2047 if (!rko->rko_u.offset_commit.partitions && rkcg->rkcg_assignment)
2048 rko->rko_u.offset_commit.partitions =
2049 rd_kafka_topic_partition_list_copy(
2050 rkcg->rkcg_assignment);
2051
2052 offsets = rko->rko_u.offset_commit.partitions;
2053
2054 if (offsets) {
2055 /* Set offsets to commits */
2056 if (set_offsets)
2057 rd_kafka_topic_partition_list_set_offsets(
2058 rkcg->rkcg_rk, rko->rko_u.offset_commit.partitions, 1,
2059 RD_KAFKA_OFFSET_INVALID/* def */,
2060 1 /* is commit */);
2061
2062 /* Check the number of valid offsets to commit. */
2063 valid_offsets = (int)rd_kafka_topic_partition_list_sum(
2064 offsets,
2065 rd_kafka_topic_partition_has_absolute_offset, NULL);
2066 }
2067
2068 if (!(rko->rko_flags & RD_KAFKA_OP_F_REPROCESS)) {
2069 /* wait_commit_cnt has already been increased for
2070 * reprocessed ops. */
2071 rkcg->rkcg_wait_commit_cnt++;
2072 }
2073
2074 if (!valid_offsets) {
2075 /* No valid offsets */
2076 err = RD_KAFKA_RESP_ERR__NO_OFFSET;
2077 goto err;
2078 }
2079
2080 if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP) {
2081 rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "COMMIT",
2082 "Deferring \"%s\" offset commit "
2083 "for %d partition(s) in state %s: "
2084 "no coordinator available",
2085 reason, valid_offsets,
2086 rd_kafka_cgrp_state_names[rkcg->rkcg_state]);
2087
2088 if (rd_kafka_cgrp_defer_offset_commit(rkcg, rko, reason))
2089 return;
2090
2091 err = RD_KAFKA_RESP_ERR__WAIT_COORD;
2092
2093 } else {
2094 int r;
2095
2096 rd_rkb_dbg(rkcg->rkcg_coord, CONSUMER, "COMMIT",
2097 "Committing offsets for %d partition(s): %s",
2098 valid_offsets, reason);
2099
2100 /* Send OffsetCommit */
2101 r = rd_kafka_OffsetCommitRequest(
2102 rkcg->rkcg_coord, rkcg, 1, offsets,
2103 RD_KAFKA_REPLYQ(rkcg->rkcg_ops, op_version),
2104 rd_kafka_cgrp_op_handle_OffsetCommit, rko,
2105 reason);
2106
2107 /* Must have valid offsets to commit if we get here */
2108 rd_kafka_assert(NULL, r != 0);
2109
2110 return;
2111 }
2112
2113
2114
2115 err:
2116 /* Propagate error to whoever wanted offset committed. */
2117 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COMMIT",
2118 "OffsetCommit internal error: %s", rd_kafka_err2str(err));
2119 rd_kafka_cgrp_op_handle_OffsetCommit(rkcg->rkcg_rk, NULL, err,
2120 NULL, NULL, rko);
2121}
2122
2123
2124/**
2125 * Commit offsets for all assigned partitions.
2126 */
2127static void
2128rd_kafka_cgrp_assigned_offsets_commit (rd_kafka_cgrp_t *rkcg,
2129 const rd_kafka_topic_partition_list_t
2130 *offsets, const char *reason) {
2131 rd_kafka_op_t *rko;
2132
2133 rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_COMMIT);
2134 rko->rko_u.offset_commit.reason = rd_strdup(reason);
2135 if (rkcg->rkcg_rk->rk_conf.enabled_events & RD_KAFKA_EVENT_OFFSET_COMMIT) {
2136 rd_kafka_op_set_replyq(rko, rkcg->rkcg_rk->rk_rep, 0);
2137 rko->rko_u.offset_commit.cb =
2138 rkcg->rkcg_rk->rk_conf.offset_commit_cb; /*maybe NULL*/
2139 rko->rko_u.offset_commit.opaque = rkcg->rkcg_rk->rk_conf.opaque;
2140 }
2141 /* NULL partitions means current assignment */
2142 if (offsets)
2143 rko->rko_u.offset_commit.partitions =
2144 rd_kafka_topic_partition_list_copy(offsets);
2145 rko->rko_u.offset_commit.silent_empty = 1;
2146 rd_kafka_cgrp_offsets_commit(rkcg, rko, 1/* set offsets */, reason,
2147 rkcg->rkcg_version);
2148}
2149
2150
2151/**
2152 * auto.commit.interval.ms commit timer callback.
2153 *
2154 * Trigger a group offset commit.
2155 *
2156 * Locality: rdkafka main thread
2157 */
2158static void rd_kafka_cgrp_offset_commit_tmr_cb (rd_kafka_timers_t *rkts,
2159 void *arg) {
2160 rd_kafka_cgrp_t *rkcg = arg;
2161
2162 rd_kafka_cgrp_assigned_offsets_commit(rkcg, NULL,
2163 "cgrp auto commit timer");
2164}
2165
2166
2167
2168
2169/**
2170 * Call when all unassign operations are done to transition to the next state
2171 */
2172static void rd_kafka_cgrp_unassign_done (rd_kafka_cgrp_t *rkcg,
2173 const char *reason) {
2174 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "UNASSIGN",
2175 "Group \"%s\": unassign done in state %s (join state %s): "
2176 "%s: %s",
2177 rkcg->rkcg_group_id->str,
2178 rd_kafka_cgrp_state_names[rkcg->rkcg_state],
2179 rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
2180 rkcg->rkcg_assignment ?
2181 "with new assignment" : "without new assignment",
2182 reason);
2183
2184 /* Don't send Leave when termating with NO_CONSUMER_CLOSE flag */
2185 if (rd_kafka_destroy_flags_no_consumer_close(rkcg->rkcg_rk))
2186 rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN;
2187
2188 if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN) {
2189 rd_kafka_cgrp_leave(rkcg);
2190 rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN;
2191 }
2192
2193 if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN) {
2194 rd_kafka_cgrp_try_terminate(rkcg);
2195 return;
2196 }
2197
2198 if (rkcg->rkcg_assignment) {
2199 rd_kafka_cgrp_set_join_state(rkcg,
2200 RD_KAFKA_CGRP_JOIN_STATE_ASSIGNED);
2201 if (RD_KAFKA_CGRP_CAN_FETCH_START(rkcg))
2202 rd_kafka_cgrp_partitions_fetch_start(
2203 rkcg, rkcg->rkcg_assignment, 0);
2204 } else {
2205 rd_kafka_cgrp_set_join_state(rkcg,
2206 RD_KAFKA_CGRP_JOIN_STATE_INIT);
2207 }
2208
2209 rd_kafka_cgrp_try_terminate(rkcg);
2210}
2211
2212
2213/**
2214 * Checks if the current unassignment is done and if so
2215 * calls .._done().
2216 * Else does nothing.
2217 */
2218static void rd_kafka_cgrp_check_unassign_done (rd_kafka_cgrp_t *rkcg,
2219 const char *reason) {
2220 if (rkcg->rkcg_wait_unassign_cnt > 0 ||
2221 rkcg->rkcg_assigned_cnt > 0 ||
2222 rkcg->rkcg_wait_commit_cnt > 0 ||
2223 rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_UNASSIGN) {
2224
2225 if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_STARTED)
2226 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "UNASSIGN",
2227 "Unassign not done yet "
2228 "(%d wait_unassign, %d assigned, "
2229 "%d wait commit"
2230 "%s, join state %s): %s",
2231 rkcg->rkcg_wait_unassign_cnt,
2232 rkcg->rkcg_assigned_cnt,
2233 rkcg->rkcg_wait_commit_cnt,
2234 (rkcg->rkcg_flags &
2235 RD_KAFKA_CGRP_F_WAIT_UNASSIGN)?
2236 ", F_WAIT_UNASSIGN" : "",
2237 rd_kafka_cgrp_join_state_names[
2238 rkcg->rkcg_join_state],
2239 reason);
2240
2241 return;
2242 }
2243
2244 rd_kafka_cgrp_unassign_done(rkcg, reason);
2245}
2246
2247
2248
2249/**
2250 * Remove existing assignment.
2251 */
2252static rd_kafka_resp_err_t
2253rd_kafka_cgrp_unassign (rd_kafka_cgrp_t *rkcg) {
2254 int i;
2255 rd_kafka_topic_partition_list_t *old_assignment;
2256
2257 rd_kafka_cgrp_set_join_state(rkcg,
2258 RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN);
2259
2260 rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_WAIT_UNASSIGN;
2261 old_assignment = rkcg->rkcg_assignment;
2262 if (!old_assignment) {
2263 rd_kafka_cgrp_check_unassign_done(
2264 rkcg, "unassign (no previous assignment)");
2265 return RD_KAFKA_RESP_ERR_NO_ERROR;
2266 }
2267 rkcg->rkcg_assignment = NULL;
2268
2269 rd_kafka_cgrp_version_new_barrier(rkcg);
2270
2271 rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_CONSUMER, "UNASSIGN",
2272 "Group \"%s\": unassigning %d partition(s) (v%"PRId32")",
2273 rkcg->rkcg_group_id->str, old_assignment->cnt,
2274 rkcg->rkcg_version);
2275
2276 if (rkcg->rkcg_rk->rk_conf.offset_store_method ==
2277 RD_KAFKA_OFFSET_METHOD_BROKER &&
2278 rkcg->rkcg_rk->rk_conf.enable_auto_commit &&
2279 !rd_kafka_destroy_flags_no_consumer_close(rkcg->rkcg_rk)) {
2280 /* Commit all offsets for all assigned partitions to broker */
2281 rd_kafka_cgrp_assigned_offsets_commit(rkcg, old_assignment,
2282 "unassign");
2283 }
2284
2285 for (i = 0 ; i < old_assignment->cnt ; i++) {
2286 rd_kafka_topic_partition_t *rktpar;
2287 shptr_rd_kafka_toppar_t *s_rktp;
2288 rd_kafka_toppar_t *rktp;
2289
2290 rktpar = &old_assignment->elems[i];
2291 s_rktp = rktpar->_private;
2292 rktp = rd_kafka_toppar_s2i(s_rktp);
2293
2294 if (rktp->rktp_assigned) {
2295 rd_kafka_toppar_op_fetch_stop(
2296 rktp, RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0));
2297 rkcg->rkcg_wait_unassign_cnt++;
2298 }
2299
2300 rd_kafka_toppar_lock(rktp);
2301 rd_kafka_toppar_desired_del(rktp);
2302 rd_kafka_toppar_unlock(rktp);
2303 }
2304
2305 /* Resume partition consumption. */
2306 rd_kafka_toppars_pause_resume(rkcg->rkcg_rk, 0/*resume*/,
2307 RD_KAFKA_TOPPAR_F_LIB_PAUSE,
2308 old_assignment);
2309
2310 rd_kafka_topic_partition_list_destroy(old_assignment);
2311
2312 rd_kafka_cgrp_check_unassign_done(rkcg, "unassign");
2313
2314 return RD_KAFKA_RESP_ERR_NO_ERROR;
2315}
2316
2317
2318/**
2319 * Set new atomic partition assignment
2320 * May update \p assignment but will not hold on to it.
2321 */
2322static void
2323rd_kafka_cgrp_assign (rd_kafka_cgrp_t *rkcg,
2324 rd_kafka_topic_partition_list_t *assignment) {
2325 int i;
2326
2327 rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_CONSUMER, "ASSIGN",
2328 "Group \"%s\": new assignment of %d partition(s) "
2329 "in join state %s",
2330 rkcg->rkcg_group_id->str,
2331 assignment ? assignment->cnt : 0,
2332 rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
2333
2334 /* Get toppar object for each partition.
2335 * This is to make sure the rktp stays alive during unassign(). */
2336 for (i = 0 ; assignment && i < assignment->cnt ; i++) {
2337 rd_kafka_topic_partition_t *rktpar;
2338 shptr_rd_kafka_toppar_t *s_rktp;
2339
2340 rktpar = &assignment->elems[i];
2341
2342 /* Use existing toppar if set */
2343 if (rktpar->_private)
2344 continue;
2345
2346 s_rktp = rd_kafka_toppar_get2(rkcg->rkcg_rk,
2347 rktpar->topic,
2348 rktpar->partition,
2349 0/*no-ua*/, 1/*create-on-miss*/);
2350 if (s_rktp)
2351 rktpar->_private = s_rktp;
2352 }
2353
2354 rd_kafka_cgrp_version_new_barrier(rkcg);
2355
2356 rd_kafka_wrlock(rkcg->rkcg_rk);
2357 rkcg->rkcg_c.assignment_size = assignment ? assignment->cnt : 0;
2358 rd_kafka_wrunlock(rkcg->rkcg_rk);
2359
2360
2361 /* Remove existing assignment (async operation) */
2362 if (rkcg->rkcg_assignment)
2363 rd_kafka_cgrp_unassign(rkcg);
2364
2365 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN",
2366 "Group \"%s\": assigning %d partition(s) in join state %s",
2367 rkcg->rkcg_group_id->str, assignment ? assignment->cnt : 0,
2368 rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
2369
2370
2371 if (assignment) {
2372 rkcg->rkcg_assignment =
2373 rd_kafka_topic_partition_list_copy(assignment);
2374
2375 /* Mark partition(s) as desired */
2376 for (i = 0 ; i < rkcg->rkcg_assignment->cnt ; i++) {
2377 rd_kafka_topic_partition_t *rktpar =
2378 &rkcg->rkcg_assignment->elems[i];
2379 shptr_rd_kafka_toppar_t *s_rktp = rktpar->_private;
2380 rd_kafka_toppar_t *rktp =
2381 rd_kafka_toppar_s2i(s_rktp);
2382 rd_kafka_toppar_lock(rktp);
2383 rd_kafka_toppar_desired_add0(rktp);
2384 rd_kafka_toppar_unlock(rktp);
2385 }
2386 }
2387
2388 if (rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN)
2389 return;
2390
2391 rd_dassert(rkcg->rkcg_wait_unassign_cnt == 0);
2392
2393 rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_ASSIGNED);
2394
2395 if (RD_KAFKA_CGRP_CAN_FETCH_START(rkcg) && rkcg->rkcg_assignment) {
2396 /* No existing assignment that needs to be decommissioned,
2397 * start partition fetchers right away */
2398 rd_kafka_cgrp_partitions_fetch_start(
2399 rkcg, rkcg->rkcg_assignment, 0);
2400 }
2401}
2402
2403
2404
2405
2406/**
2407 * Handle a rebalance-triggered partition assignment.
2408 *
2409 * If a rebalance_cb has been registered we enqueue an op for the app
2410 * and let the app perform the actual assign() call.
2411 * Otherwise we assign() directly from here.
2412 *
2413 * This provides the most flexibility, allowing the app to perform any
2414 * operation it seem fit (e.g., offset writes or reads) before actually
2415 * updating the assign():ment.
2416 */
2417static void
2418rd_kafka_cgrp_handle_assignment (rd_kafka_cgrp_t *rkcg,
2419 rd_kafka_topic_partition_list_t *assignment) {
2420
2421 rd_kafka_rebalance_op(rkcg, RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
2422 assignment, "new assignment");
2423}
2424
2425
2426/**
2427 * Handle HeartbeatResponse errors.
2428 *
2429 * If an IllegalGeneration error code is returned in the
2430 * HeartbeatResponse, it indicates that the co-ordinator has
2431 * initiated a rebalance. The consumer then stops fetching data,
2432 * commits offsets and sends a JoinGroupRequest to it's co-ordinator
2433 * broker */
2434void rd_kafka_cgrp_handle_heartbeat_error (rd_kafka_cgrp_t *rkcg,
2435 rd_kafka_resp_err_t err) {
2436 const char *reason = NULL;
2437
2438 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT",
2439 "Group \"%s\" heartbeat error response in "
2440 "state %s (join state %s, %d partition(s) assigned): %s",
2441 rkcg->rkcg_group_id->str,
2442 rd_kafka_cgrp_state_names[rkcg->rkcg_state],
2443 rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
2444 rkcg->rkcg_assignment ? rkcg->rkcg_assignment->cnt : 0,
2445 rd_kafka_err2str(err));
2446
2447 if (rkcg->rkcg_join_state <= RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC) {
2448 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT",
2449 "Heartbeat response: discarding outdated "
2450 "request (now in join-state %s)",
2451 rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
2452 return;
2453 }
2454
2455 switch (err)
2456 {
2457 case RD_KAFKA_RESP_ERR__DESTROY:
2458 /* quick cleanup */
2459 return;
2460
2461 case RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP:
2462 case RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE:
2463 case RD_KAFKA_RESP_ERR__TRANSPORT:
2464 rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "HEARTBEAT",
2465 "Heartbeat failed due to coordinator (%s) "
2466 "no longer available: %s: "
2467 "re-querying for coordinator",
2468 rkcg->rkcg_curr_coord ?
2469 rd_kafka_broker_name(rkcg->rkcg_curr_coord) :
2470 "none",
2471 rd_kafka_err2str(err));
2472 /* Remain in joined state and keep querying for coordinator */
2473 rd_interval_expedite(&rkcg->rkcg_coord_query_intvl, 0);
2474 return;
2475
2476 case RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS:
2477 /* No further action if already rebalancing */
2478 if (rkcg->rkcg_join_state ==
2479 RD_KAFKA_CGRP_JOIN_STATE_WAIT_REVOKE_REBALANCE_CB)
2480 return;
2481 reason = "group is rebalancing";
2482 break;
2483
2484 case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID:
2485 rd_kafka_cgrp_set_member_id(rkcg, "");
2486 reason = "resetting member-id";
2487 break;
2488
2489 case RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION:
2490 reason = "group is rebalancing";
2491 break;
2492
2493 default:
2494 reason = rd_kafka_err2str(err);
2495 break;
2496 }
2497
2498 rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "HEARTBEAT",
2499 "Heartbeat failed: %s: %s",
2500 rd_kafka_err2name(err), reason);
2501
2502 rd_kafka_cgrp_rebalance(rkcg, reason);
2503}
2504
2505
2506
2507/**
2508 * Clean up any group-leader related resources.
2509 *
2510 * Locality: cgrp thread
2511 */
2512static void rd_kafka_cgrp_group_leader_reset (rd_kafka_cgrp_t *rkcg,
2513 const char *reason) {
2514 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "GRPLEADER",
2515 "Group \"%.*s\": resetting group leader info: %s",
2516 RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), reason);
2517 if (rkcg->rkcg_group_leader.protocol) {
2518 rd_free(rkcg->rkcg_group_leader.protocol);
2519 rkcg->rkcg_group_leader.protocol = NULL;
2520 }
2521
2522 if (rkcg->rkcg_group_leader.members) {
2523 int i;
2524
2525 for (i = 0 ; i < rkcg->rkcg_group_leader.member_cnt ; i++)
2526 rd_kafka_group_member_clear(&rkcg->rkcg_group_leader.
2527 members[i]);
2528 rkcg->rkcg_group_leader.member_cnt = 0;
2529 rd_free(rkcg->rkcg_group_leader.members);
2530 rkcg->rkcg_group_leader.members = NULL;
2531 }
2532}
2533
2534
2535/**
2536 * @brief Group is rebalancing, trigger rebalance callback to application,
2537 * and transition to INIT state for (eventual) rejoin.
2538 */
2539static void rd_kafka_cgrp_rebalance (rd_kafka_cgrp_t *rkcg,
2540 const char *reason) {
2541
2542 rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER|RD_KAFKA_DBG_CGRP, "REBALANCE",
2543 "Group \"%.*s\" is rebalancing in "
2544 "state %s (join-state %s) %s assignment: %s",
2545 RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
2546 rd_kafka_cgrp_state_names[rkcg->rkcg_state],
2547 rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
2548 rkcg->rkcg_assignment ? "with" : "without",
2549 reason);
2550
2551 rd_snprintf(rkcg->rkcg_c.rebalance_reason,
2552 sizeof(rkcg->rkcg_c.rebalance_reason), "%s", reason);
2553
2554 /* Remove assignment (async), if any. If there is already an
2555 * unassign in progress we dont need to bother. */
2556 if (!RD_KAFKA_CGRP_WAIT_REBALANCE_CB(rkcg) &&
2557 !(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_UNASSIGN)) {
2558 rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WAIT_UNASSIGN;
2559
2560 rd_kafka_rebalance_op(
2561 rkcg,
2562 RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
2563 rkcg->rkcg_assignment, reason);
2564 }
2565}
2566
2567
2568/**
2569 * @brief `max.poll.interval.ms` enforcement check timer.
2570 *
2571 * @locality rdkafka main thread
2572 * @locks none
2573 */
2574static void
2575rd_kafka_cgrp_max_poll_interval_check_tmr_cb (rd_kafka_timers_t *rkts,
2576 void *arg) {
2577 rd_kafka_cgrp_t *rkcg = arg;
2578 rd_kafka_t *rk = rkcg->rkcg_rk;
2579 int exceeded;
2580
2581 exceeded = rd_kafka_max_poll_exceeded(rk);
2582
2583 if (likely(!exceeded))
2584 return;
2585
2586 rd_kafka_log(rk, LOG_WARNING, "MAXPOLL",
2587 "Application maximum poll interval (%dms) "
2588 "exceeded by %dms "
2589 "(adjust max.poll.interval.ms for "
2590 "long-running message processing): "
2591 "leaving group",
2592 rk->rk_conf.max_poll_interval_ms, exceeded);
2593
2594 rd_kafka_q_op_err(rkcg->rkcg_q, RD_KAFKA_OP_CONSUMER_ERR,
2595 RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED, 0, NULL, 0,
2596 "Application maximum poll interval (%dms) "
2597 "exceeded by %dms",
2598 rk->rk_conf.max_poll_interval_ms, exceeded);
2599
2600 rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED;
2601
2602 rd_kafka_timer_stop(rkts, &rkcg->rkcg_max_poll_interval_tmr,
2603 1/*lock*/);
2604
2605 /* Leave the group before calling rebalance since the standard leave
2606 * will be triggered first after the rebalance callback has been served.
2607 * But since the application is blocked still doing processing
2608 * that leave will be further delayed. */
2609 rd_kafka_cgrp_leave(rkcg);
2610
2611 /* Leaving the group invalidates the member id, reset it now
2612 * to avoid an ERR_UNKNOWN_MEMBER_ID on the next join. */
2613 rd_kafka_cgrp_set_member_id(rkcg, "");
2614
2615 /* Trigger rebalance */
2616 rd_kafka_cgrp_rebalance(rkcg, "max.poll.interval.ms exceeded");
2617}
2618
2619
2620/**
2621 * Remove existing topic subscription.
2622 */
2623static rd_kafka_resp_err_t
2624rd_kafka_cgrp_unsubscribe (rd_kafka_cgrp_t *rkcg, int leave_group) {
2625
2626 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "UNSUBSCRIBE",
2627 "Group \"%.*s\": unsubscribe from current %ssubscription "
2628 "of %d topics (leave group=%s, join state %s, v%"PRId32")",
2629 RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
2630 rkcg->rkcg_subscription ? "" : "unset ",
2631 rkcg->rkcg_subscription ? rkcg->rkcg_subscription->cnt : 0,
2632 leave_group ? "yes":"no",
2633 rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
2634 rkcg->rkcg_version);
2635
2636 rd_kafka_timer_stop(&rkcg->rkcg_rk->rk_timers,
2637 &rkcg->rkcg_max_poll_interval_tmr, 1/*lock*/);
2638
2639
2640 if (rkcg->rkcg_subscription) {
2641 rd_kafka_topic_partition_list_destroy(rkcg->rkcg_subscription);
2642 rkcg->rkcg_subscription = NULL;
2643 }
2644
2645 rd_kafka_cgrp_update_subscribed_topics(rkcg, NULL);
2646
2647 /*
2648 * Clean-up group leader duties, if any.
2649 */
2650 rd_kafka_cgrp_group_leader_reset(rkcg, "unsubscribe");
2651
2652 if (leave_group)
2653 rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN;
2654
2655 rd_kafka_cgrp_rebalance(rkcg, "unsubscribe");
2656
2657 rkcg->rkcg_flags &= ~(RD_KAFKA_CGRP_F_SUBSCRIPTION |
2658 RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION);
2659
2660 return RD_KAFKA_RESP_ERR_NO_ERROR;
2661}
2662
2663
2664/**
2665 * Set new atomic topic subscription.
2666 */
2667static rd_kafka_resp_err_t
2668rd_kafka_cgrp_subscribe (rd_kafka_cgrp_t *rkcg,
2669 rd_kafka_topic_partition_list_t *rktparlist) {
2670
2671 rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_CONSUMER, "SUBSCRIBE",
2672 "Group \"%.*s\": subscribe to new %ssubscription "
2673 "of %d topics (join state %s)",
2674 RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
2675 rktparlist ? "" : "unset ",
2676 rktparlist ? rktparlist->cnt : 0,
2677 rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
2678
2679 if (rkcg->rkcg_rk->rk_conf.enabled_assignor_cnt == 0)
2680 return RD_KAFKA_RESP_ERR__INVALID_ARG;
2681
2682 /* Remove existing subscription first */
2683 rd_kafka_cgrp_unsubscribe(rkcg,
2684 rktparlist ?
2685 0/* dont leave group if new subscription */ :
2686 1/* leave group if no new subscription */);
2687
2688 if (!rktparlist)
2689 return RD_KAFKA_RESP_ERR_NO_ERROR;
2690
2691 rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_SUBSCRIPTION;
2692
2693 if (rd_kafka_topic_partition_list_regex_cnt(rktparlist) > 0)
2694 rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION;
2695
2696 rkcg->rkcg_subscription = rktparlist;
2697
2698 rd_kafka_cgrp_join(rkcg);
2699
2700 return RD_KAFKA_RESP_ERR_NO_ERROR;
2701}
2702
2703
2704
2705
2706
2707
2708/**
2709 * Same as cgrp_terminate() but called from the cgrp/main thread upon receiving
2710 * the op 'rko' from cgrp_terminate().
2711 *
2712 * NOTE: Takes ownership of 'rko'
2713 *
2714 * Locality: main thread
2715 */
2716void
2717rd_kafka_cgrp_terminate0 (rd_kafka_cgrp_t *rkcg, rd_kafka_op_t *rko) {
2718
2719 rd_kafka_assert(NULL, thrd_is_current(rkcg->rkcg_rk->rk_thread));
2720
2721 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPTERM",
2722 "Terminating group \"%.*s\" in state %s "
2723 "with %d partition(s)",
2724 RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
2725 rd_kafka_cgrp_state_names[rkcg->rkcg_state],
2726 rd_list_cnt(&rkcg->rkcg_toppars));
2727
2728 if (unlikely(rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM ||
2729 (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE) ||
2730 rkcg->rkcg_reply_rko != NULL)) {
2731 /* Already terminating or handling a previous terminate */
2732 if (rko) {
2733 rd_kafka_q_t *rkq = rko->rko_replyq.q;
2734 rko->rko_replyq.q = NULL;
2735 rd_kafka_q_op_err(rkq, RD_KAFKA_OP_CONSUMER_ERR,
2736 RD_KAFKA_RESP_ERR__IN_PROGRESS,
2737 rko->rko_replyq.version,
2738 NULL, 0,
2739 "Group is %s",
2740 rkcg->rkcg_reply_rko ?
2741 "terminating":"terminated");
2742 rd_kafka_q_destroy(rkq);
2743 rd_kafka_op_destroy(rko);
2744 }
2745 return;
2746 }
2747
2748 /* Mark for stopping, the actual state transition
2749 * is performed when all toppars have left. */
2750 rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_TERMINATE;
2751 rkcg->rkcg_ts_terminate = rd_clock();
2752 rkcg->rkcg_reply_rko = rko;
2753
2754 if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION)
2755 rd_kafka_cgrp_unsubscribe(
2756 rkcg,
2757 /* Leave group if this is a controlled shutdown */
2758 !rd_kafka_destroy_flags_no_consumer_close(
2759 rkcg->rkcg_rk));
2760
2761 /* Reset the wait-for-LeaveGroup flag if there is an outstanding
2762 * LeaveGroupRequest being waited on (from a prior unsubscribe), but
2763 * the destroy flags have NO_CONSUMER_CLOSE set, which calls
2764 * for immediate termination. */
2765 if (rd_kafka_destroy_flags_no_consumer_close(rkcg->rkcg_rk))
2766 rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_WAIT_LEAVE;
2767
2768 /* If there's an oustanding rebalance_cb which has not yet been
2769 * served by the application it will be served from consumer_close().
2770 * If the instate is being terminated with NO_CONSUMER_CLOSE we
2771 * trigger unassign directly to avoid stalling on rebalance callback
2772 * queues that are no longer served by the application. */
2773 if ((!RD_KAFKA_CGRP_WAIT_REBALANCE_CB(rkcg) &&
2774 !(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_UNASSIGN)) ||
2775 rd_kafka_destroy_flags_no_consumer_close(rkcg->rkcg_rk))
2776 rd_kafka_cgrp_unassign(rkcg);
2777
2778 /* Try to terminate right away if all preconditions are met. */
2779 rd_kafka_cgrp_try_terminate(rkcg);
2780}
2781
2782
2783/**
2784 * Terminate and decommission a cgrp asynchronously.
2785 *
2786 * Locality: any thread
2787 */
2788void rd_kafka_cgrp_terminate (rd_kafka_cgrp_t *rkcg, rd_kafka_replyq_t replyq) {
2789 rd_kafka_assert(NULL, !thrd_is_current(rkcg->rkcg_rk->rk_thread));
2790 rd_kafka_cgrp_op(rkcg, NULL, replyq, RD_KAFKA_OP_TERMINATE, 0);
2791}
2792
2793
/* Scan state passed (as opaque) to
 * rd_kafka_op_offset_commit_timeout_check() through rd_kafka_q_apply(). */
struct _op_timeout_offset_commit {
	rd_ts_t now;        /* scan timestamp, compared against each op's
			     * rko_u.offset_commit.ts_timeout */
	rd_kafka_t *rk;     /* client instance (not read by the check
			     * callback) */
	rd_list_t expired;  /* expired ops removed from the queue, handled
			     * after the scan to avoid recursive locking */
};
2799
2800/**
2801 * q_filter callback for expiring OFFSET_COMMIT timeouts.
2802 */
2803static int rd_kafka_op_offset_commit_timeout_check (rd_kafka_q_t *rkq,
2804 rd_kafka_op_t *rko,
2805 void *opaque) {
2806 struct _op_timeout_offset_commit *state =
2807 (struct _op_timeout_offset_commit*)opaque;
2808
2809 if (likely(rko->rko_type != RD_KAFKA_OP_OFFSET_COMMIT ||
2810 rko->rko_u.offset_commit.ts_timeout == 0 ||
2811 rko->rko_u.offset_commit.ts_timeout > state->now)) {
2812 return 0;
2813 }
2814
2815 rd_kafka_q_deq0(rkq, rko);
2816
2817 /* Add to temporary list to avoid recursive
2818 * locking of rkcg_wait_coord_q. */
2819 rd_list_add(&state->expired, rko);
2820 return 1;
2821}
2822
2823
2824/**
2825 * Scan for various timeouts.
2826 */
2827static void rd_kafka_cgrp_timeout_scan (rd_kafka_cgrp_t *rkcg, rd_ts_t now) {
2828 struct _op_timeout_offset_commit ofc_state;
2829 int i, cnt = 0;
2830 rd_kafka_op_t *rko;
2831
2832 ofc_state.now = now;
2833 ofc_state.rk = rkcg->rkcg_rk;
2834 rd_list_init(&ofc_state.expired, 0, NULL);
2835
2836 cnt += rd_kafka_q_apply(rkcg->rkcg_wait_coord_q,
2837 rd_kafka_op_offset_commit_timeout_check,
2838 &ofc_state);
2839
2840 RD_LIST_FOREACH(rko, &ofc_state.expired, i)
2841 rd_kafka_cgrp_op_handle_OffsetCommit(
2842 rkcg->rkcg_rk, NULL,
2843 RD_KAFKA_RESP_ERR__WAIT_COORD,
2844 NULL, NULL, rko);
2845
2846 rd_list_destroy(&ofc_state.expired);
2847
2848 if (cnt > 0)
2849 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPTIMEOUT",
2850 "Group \"%.*s\": timed out %d op(s), %d remain",
2851 RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), cnt,
2852 rd_kafka_q_len(rkcg->rkcg_wait_coord_q));
2853
2854
2855}
2856
2857
2858/**
2859 * @brief Handle cgrp queue op.
2860 * @locality rdkafka main thread
2861 * @locks none
2862 */
2863static rd_kafka_op_res_t
2864rd_kafka_cgrp_op_serve (rd_kafka_t *rk, rd_kafka_q_t *rkq,
2865 rd_kafka_op_t *rko, rd_kafka_q_cb_type_t cb_type,
2866 void *opaque) {
2867 rd_kafka_cgrp_t *rkcg = opaque;
2868 rd_kafka_toppar_t *rktp;
2869 rd_kafka_resp_err_t err;
2870 const int silent_op = rko->rko_type == RD_KAFKA_OP_RECV_BUF;
2871
2872 if (rko->rko_version && rkcg->rkcg_version > rko->rko_version) {
2873 rd_kafka_op_destroy(rko); /* outdated */
2874 return RD_KAFKA_OP_RES_HANDLED;
2875 }
2876
2877 rktp = rko->rko_rktp ? rd_kafka_toppar_s2i(rko->rko_rktp) : NULL;
2878
2879 if (rktp && !silent_op)
2880 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPOP",
2881 "Group \"%.*s\" received op %s in state %s "
2882 "(join state %s, v%"PRId32") "
2883 "for %.*s [%"PRId32"]",
2884 RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
2885 rd_kafka_op2str(rko->rko_type),
2886 rd_kafka_cgrp_state_names[rkcg->rkcg_state],
2887 rd_kafka_cgrp_join_state_names[rkcg->
2888 rkcg_join_state],
2889 rkcg->rkcg_version,
2890 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
2891 rktp->rktp_partition);
2892 else if (!silent_op)
2893 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPOP",
2894 "Group \"%.*s\" received op %s (v%d) in state %s "
2895 "(join state %s, v%"PRId32" vs %"PRId32")",
2896 RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
2897 rd_kafka_op2str(rko->rko_type),
2898 rko->rko_version,
2899 rd_kafka_cgrp_state_names[rkcg->rkcg_state],
2900 rd_kafka_cgrp_join_state_names[rkcg->
2901 rkcg_join_state],
2902 rkcg->rkcg_version, rko->rko_version);
2903
2904 switch ((int)rko->rko_type)
2905 {
2906 case RD_KAFKA_OP_NAME:
2907 /* Return the currently assigned member id. */
2908 if (rkcg->rkcg_member_id)
2909 rko->rko_u.name.str =
2910 RD_KAFKAP_STR_DUP(rkcg->rkcg_member_id);
2911 rd_kafka_op_reply(rko, 0);
2912 rko = NULL;
2913 break;
2914
2915 case RD_KAFKA_OP_OFFSET_FETCH:
2916 if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP ||
2917 (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)) {
2918 rd_kafka_op_handle_OffsetFetch(
2919 rkcg->rkcg_rk, NULL,
2920 RD_KAFKA_RESP_ERR__WAIT_COORD,
2921 NULL, NULL, rko);
2922 rko = NULL; /* rko freed by handler */
2923 break;
2924 }
2925
2926 rd_kafka_OffsetFetchRequest(
2927 rkcg->rkcg_coord, 1,
2928 rko->rko_u.offset_fetch.partitions,
2929 RD_KAFKA_REPLYQ(rkcg->rkcg_ops,
2930 rkcg->rkcg_version),
2931 rd_kafka_op_handle_OffsetFetch, rko);
2932 rko = NULL; /* rko now owned by request */
2933 break;
2934
2935 case RD_KAFKA_OP_PARTITION_JOIN:
2936 rd_kafka_cgrp_partition_add(rkcg, rktp);
2937
2938 /* If terminating tell the partition to leave */
2939 if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)
2940 rd_kafka_toppar_op_fetch_stop(
2941 rktp, RD_KAFKA_NO_REPLYQ);
2942 break;
2943
2944 case RD_KAFKA_OP_PARTITION_LEAVE:
2945 rd_kafka_cgrp_partition_del(rkcg, rktp);
2946 break;
2947
2948 case RD_KAFKA_OP_FETCH_STOP|RD_KAFKA_OP_REPLY:
2949 /* Reply from toppar FETCH_STOP */
2950 rd_kafka_assert(rkcg->rkcg_rk,
2951 rkcg->rkcg_wait_unassign_cnt > 0);
2952 rkcg->rkcg_wait_unassign_cnt--;
2953
2954 rd_kafka_assert(rkcg->rkcg_rk, rktp->rktp_assigned);
2955 rd_kafka_assert(rkcg->rkcg_rk,
2956 rkcg->rkcg_assigned_cnt > 0);
2957 rktp->rktp_assigned = 0;
2958 rkcg->rkcg_assigned_cnt--;
2959
2960 /* All unassigned toppars now stopped and commit done:
2961 * transition to the next state. */
2962 if (rkcg->rkcg_join_state ==
2963 RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN)
2964 rd_kafka_cgrp_check_unassign_done(rkcg,
2965 "FETCH_STOP done");
2966 break;
2967
2968 case RD_KAFKA_OP_OFFSET_COMMIT:
2969 /* Trigger offsets commit. */
2970 rd_kafka_cgrp_offsets_commit(rkcg, rko,
2971 /* only set offsets
2972 * if no partitions were
2973 * specified. */
2974 rko->rko_u.offset_commit.
2975 partitions ? 0 : 1,
2976 rko->rko_u.offset_commit.reason,
2977 0);
2978 rko = NULL; /* rko now owned by request */
2979 break;
2980
2981 case RD_KAFKA_OP_COORD_QUERY:
2982 rd_kafka_cgrp_coord_query(rkcg,
2983 rko->rko_err ?
2984 rd_kafka_err2str(rko->
2985 rko_err):
2986 "from op");
2987 break;
2988
2989 case RD_KAFKA_OP_SUBSCRIBE:
2990 rd_kafka_app_polled(rk);
2991
2992 /* New atomic subscription (may be NULL) */
2993 err = rd_kafka_cgrp_subscribe(
2994 rkcg, rko->rko_u.subscribe.topics);
2995 if (!err)
2996 rko->rko_u.subscribe.topics = NULL; /* owned by rkcg */
2997 rd_kafka_op_reply(rko, err);
2998 rko = NULL;
2999 break;
3000
3001 case RD_KAFKA_OP_ASSIGN:
3002 /* New atomic assignment (payload != NULL),
3003 * or unassignment (payload == NULL) */
3004 err = 0;
3005 if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE) {
3006 /* Treat all assignments as unassign
3007 * when terminating. */
3008 rd_kafka_cgrp_unassign(rkcg);
3009 if (rko->rko_u.assign.partitions)
3010 err = RD_KAFKA_RESP_ERR__DESTROY;
3011 } else {
3012 rd_kafka_cgrp_assign(
3013 rkcg, rko->rko_u.assign.partitions);
3014 }
3015 rd_kafka_op_reply(rko, err);
3016 rko = NULL;
3017 break;
3018
3019 case RD_KAFKA_OP_GET_SUBSCRIPTION:
3020 if (rkcg->rkcg_subscription)
3021 rko->rko_u.subscribe.topics =
3022 rd_kafka_topic_partition_list_copy(
3023 rkcg->rkcg_subscription);
3024 rd_kafka_op_reply(rko, 0);
3025 rko = NULL;
3026 break;
3027
3028 case RD_KAFKA_OP_GET_ASSIGNMENT:
3029 if (rkcg->rkcg_assignment)
3030 rko->rko_u.assign.partitions =
3031 rd_kafka_topic_partition_list_copy(
3032 rkcg->rkcg_assignment);
3033
3034 rd_kafka_op_reply(rko, 0);
3035 rko = NULL;
3036 break;
3037
3038 case RD_KAFKA_OP_TERMINATE:
3039 rd_kafka_cgrp_terminate0(rkcg, rko);
3040 rko = NULL; /* terminate0() takes ownership */
3041 break;
3042
3043 default:
3044 rd_kafka_assert(rkcg->rkcg_rk, !*"unknown type");
3045 break;
3046 }
3047
3048 if (rko)
3049 rd_kafka_op_destroy(rko);
3050
3051 return RD_KAFKA_OP_RES_HANDLED;
3052}
3053
3054
3055/**
3056 * Client group's join state handling
3057 */
3058static void rd_kafka_cgrp_join_state_serve (rd_kafka_cgrp_t *rkcg) {
3059
3060 switch (rkcg->rkcg_join_state)
3061 {
3062 case RD_KAFKA_CGRP_JOIN_STATE_INIT:
3063 /* If we have a subscription start the join process. */
3064 if (!rkcg->rkcg_subscription)
3065 break;
3066
3067 if (rd_interval_immediate(&rkcg->rkcg_join_intvl,
3068 1000*1000, 0) > 0)
3069 rd_kafka_cgrp_join(rkcg);
3070 break;
3071
3072 case RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN:
3073 case RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA:
3074 case RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC:
3075 case RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN:
3076 break;
3077
3078 case RD_KAFKA_CGRP_JOIN_STATE_WAIT_REVOKE_REBALANCE_CB:
3079 case RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_REBALANCE_CB:
3080 case RD_KAFKA_CGRP_JOIN_STATE_ASSIGNED:
3081 case RD_KAFKA_CGRP_JOIN_STATE_STARTED:
3082 if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION &&
3083 rd_interval(&rkcg->rkcg_heartbeat_intvl,
3084 rkcg->rkcg_rk->rk_conf.
3085 group_heartbeat_intvl_ms * 1000, 0) > 0)
3086 rd_kafka_cgrp_heartbeat(rkcg);
3087 break;
3088 }
3089
3090}
3091/**
3092 * Client group handling.
3093 * Called from main thread to serve the operational aspects of a cgrp.
3094 */
3095void rd_kafka_cgrp_serve (rd_kafka_cgrp_t *rkcg) {
3096 rd_kafka_broker_t *rkb = rkcg->rkcg_coord;
3097 int rkb_state = RD_KAFKA_BROKER_STATE_INIT;
3098 rd_ts_t now;
3099
3100 if (rkb) {
3101 rd_kafka_broker_lock(rkb);
3102 rkb_state = rkb->rkb_state;
3103 rd_kafka_broker_unlock(rkb);
3104
3105 /* Go back to querying state if we lost the current coordinator
3106 * connection. */
3107 if (rkb_state < RD_KAFKA_BROKER_STATE_UP &&
3108 rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_UP)
3109 rd_kafka_cgrp_set_state(rkcg,
3110 RD_KAFKA_CGRP_STATE_QUERY_COORD);
3111 }
3112
3113 now = rd_clock();
3114
3115 /* Check for cgrp termination */
3116 if (unlikely(rd_kafka_cgrp_try_terminate(rkcg))) {
3117 rd_kafka_cgrp_terminated(rkcg);
3118 return; /* cgrp terminated */
3119 }
3120
3121 /* Bail out if we're terminating. */
3122 if (unlikely(rd_kafka_terminating(rkcg->rkcg_rk)))
3123 return;
3124
3125 retry:
3126 switch (rkcg->rkcg_state)
3127 {
3128 case RD_KAFKA_CGRP_STATE_TERM:
3129 break;
3130
3131 case RD_KAFKA_CGRP_STATE_INIT:
3132 rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD);
3133 /* FALLTHRU */
3134
3135 case RD_KAFKA_CGRP_STATE_QUERY_COORD:
3136 /* Query for coordinator. */
3137 if (rd_interval_immediate(&rkcg->rkcg_coord_query_intvl,
3138 500*1000, now) > 0)
3139 rd_kafka_cgrp_coord_query(rkcg,
3140 "intervaled in "
3141 "state query-coord");
3142 break;
3143
3144 case RD_KAFKA_CGRP_STATE_WAIT_COORD:
3145 /* Waiting for GroupCoordinator response */
3146 break;
3147
3148 case RD_KAFKA_CGRP_STATE_WAIT_BROKER:
3149 /* See if the group should be reassigned to another broker. */
3150 if (rd_kafka_cgrp_coord_update(rkcg, rkcg->rkcg_coord_id))
3151 goto retry; /* Coordinator changed, retry state-machine
3152 * to speed up next transition. */
3153
3154 /* Coordinator query */
3155 if (rd_interval(&rkcg->rkcg_coord_query_intvl,
3156 1000*1000, now) > 0)
3157 rd_kafka_cgrp_coord_query(rkcg,
3158 "intervaled in "
3159 "state wait-broker");
3160 break;
3161
3162 case RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT:
3163 /* Waiting for broker transport to come up.
3164 * Also make sure broker supports groups. */
3165 if (rkb_state < RD_KAFKA_BROKER_STATE_UP || !rkb ||
3166 !rd_kafka_broker_supports(
3167 rkb, RD_KAFKA_FEATURE_BROKER_GROUP_COORD)) {
3168 /* Coordinator query */
3169 if (rd_interval(&rkcg->rkcg_coord_query_intvl,
3170 1000*1000, now) > 0)
3171 rd_kafka_cgrp_coord_query(
3172 rkcg,
3173 "intervaled in state "
3174 "wait-broker-transport");
3175
3176 } else {
3177 rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_UP);
3178
3179 /* Serve join state to trigger (re)join */
3180 rd_kafka_cgrp_join_state_serve(rkcg);
3181
3182 /* Start fetching if we have an assignment. */
3183 if (rkcg->rkcg_assignment &&
3184 RD_KAFKA_CGRP_CAN_FETCH_START(rkcg))
3185 rd_kafka_cgrp_partitions_fetch_start(
3186 rkcg, rkcg->rkcg_assignment, 0);
3187 }
3188 break;
3189
3190 case RD_KAFKA_CGRP_STATE_UP:
3191 /* Move any ops awaiting the coordinator to the ops queue
3192 * for reprocessing. */
3193 rd_kafka_q_concat(rkcg->rkcg_ops, rkcg->rkcg_wait_coord_q);
3194
3195 /* Relaxed coordinator queries. */
3196 if (rd_interval(&rkcg->rkcg_coord_query_intvl,
3197 rkcg->rkcg_rk->rk_conf.
3198 coord_query_intvl_ms * 1000, now) > 0)
3199 rd_kafka_cgrp_coord_query(rkcg,
3200 "intervaled in state up");
3201
3202 rd_kafka_cgrp_join_state_serve(rkcg);
3203 break;
3204
3205 }
3206
3207 if (unlikely(rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP &&
3208 rd_interval(&rkcg->rkcg_timeout_scan_intvl,
3209 1000*1000, now) > 0))
3210 rd_kafka_cgrp_timeout_scan(rkcg, now);
3211}
3212
3213
3214
3215
3216
3217/**
3218 * Send an op to a cgrp.
3219 *
3220 * Locality: any thread
3221 */
3222void rd_kafka_cgrp_op (rd_kafka_cgrp_t *rkcg, rd_kafka_toppar_t *rktp,
3223 rd_kafka_replyq_t replyq, rd_kafka_op_type_t type,
3224 rd_kafka_resp_err_t err) {
3225 rd_kafka_op_t *rko;
3226
3227 rko = rd_kafka_op_new(type);
3228 rko->rko_err = err;
3229 rko->rko_replyq = replyq;
3230
3231 if (rktp)
3232 rko->rko_rktp = rd_kafka_toppar_keep(rktp);
3233
3234 rd_kafka_q_enq(rkcg->rkcg_ops, rko);
3235}
3236
3237
3238
3239
3240
3241
3242
3243void rd_kafka_cgrp_set_member_id (rd_kafka_cgrp_t *rkcg, const char *member_id){
3244 if (rkcg->rkcg_member_id && member_id &&
3245 !rd_kafkap_str_cmp_str(rkcg->rkcg_member_id, member_id))
3246 return; /* No change */
3247
3248 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "MEMBERID",
3249 "Group \"%.*s\": updating member id \"%s\" -> \"%s\"",
3250 RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
3251 rkcg->rkcg_member_id ?
3252 rkcg->rkcg_member_id->str : "(not-set)",
3253 member_id ? member_id : "(not-set)");
3254
3255 if (rkcg->rkcg_member_id) {
3256 rd_kafkap_str_destroy(rkcg->rkcg_member_id);
3257 rkcg->rkcg_member_id = NULL;
3258 }
3259
3260 if (member_id)
3261 rkcg->rkcg_member_id = rd_kafkap_str_new(member_id, -1);
3262}
3263
3264
3265
3266
3267/**
3268 * @brief Check if the latest metadata affects the current subscription:
3269 * - matched topic added
3270 * - matched topic removed
3271 * - matched topic's partition count change
3272 *
3273 * @locks none
3274 * @locality rdkafka main thread
3275 */
3276void rd_kafka_cgrp_metadata_update_check (rd_kafka_cgrp_t *rkcg, int do_join) {
3277 rd_list_t *tinfos;
3278
3279 rd_kafka_assert(NULL, thrd_is_current(rkcg->rkcg_rk->rk_thread));
3280
3281 if (!rkcg->rkcg_subscription || rkcg->rkcg_subscription->cnt == 0)
3282 return;
3283
3284 /*
3285 * Create a list of the topics in metadata that matches our subscription
3286 */
3287 tinfos = rd_list_new(rkcg->rkcg_subscription->cnt,
3288 (void *)rd_kafka_topic_info_destroy);
3289
3290 if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION)
3291 rd_kafka_metadata_topic_match(rkcg->rkcg_rk,
3292 tinfos, rkcg->rkcg_subscription);
3293 else
3294 rd_kafka_metadata_topic_filter(rkcg->rkcg_rk,
3295 tinfos,
3296 rkcg->rkcg_subscription);
3297
3298
3299 /*
3300 * Update (takes ownership of \c tinfos)
3301 */
3302 if (rd_kafka_cgrp_update_subscribed_topics(rkcg, tinfos) && do_join) {
3303 /* List of subscribed topics changed, trigger rejoin. */
3304 rd_kafka_dbg(rkcg->rkcg_rk,
3305 CGRP|RD_KAFKA_DBG_METADATA|RD_KAFKA_DBG_CONSUMER,
3306 "REJOIN",
3307 "Group \"%.*s\": "
3308 "subscription updated from metadata change: "
3309 "rejoining group",
3310 RD_KAFKAP_STR_PR(rkcg->rkcg_group_id));
3311 rd_kafka_cgrp_rejoin(rkcg);
3312 }
3313}
3314
3315
3316void rd_kafka_cgrp_handle_SyncGroup (rd_kafka_cgrp_t *rkcg,
3317 rd_kafka_broker_t *rkb,
3318 rd_kafka_resp_err_t err,
3319 const rd_kafkap_bytes_t *member_state) {
3320 rd_kafka_buf_t *rkbuf = NULL;
3321 rd_kafka_topic_partition_list_t *assignment;
3322 const int log_decode_errors = LOG_ERR;
3323 int16_t Version;
3324 int32_t TopicCnt;
3325 rd_kafkap_bytes_t UserData;
3326
3327 /* Dont handle new assignments when terminating */
3328 if (!err && rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)
3329 err = RD_KAFKA_RESP_ERR__DESTROY;
3330
3331 if (err)
3332 goto err;
3333
3334
3335 if (RD_KAFKAP_BYTES_LEN(member_state) == 0) {
3336 /* Empty assignment. */
3337 assignment = rd_kafka_topic_partition_list_new(0);
3338 memset(&UserData, 0, sizeof(UserData));
3339 goto done;
3340 }
3341
3342 /* Parse assignment from MemberState */
3343 rkbuf = rd_kafka_buf_new_shadow(member_state->data,
3344 RD_KAFKAP_BYTES_LEN(member_state),
3345 NULL);
3346 /* Protocol parser needs a broker handle to log errors on. */
3347 if (rkb) {
3348 rkbuf->rkbuf_rkb = rkb;
3349 rd_kafka_broker_keep(rkb);
3350 } else
3351 rkbuf->rkbuf_rkb = rd_kafka_broker_internal(rkcg->rkcg_rk);
3352
3353 rd_kafka_buf_read_i16(rkbuf, &Version);
3354 rd_kafka_buf_read_i32(rkbuf, &TopicCnt);
3355
3356 if (TopicCnt > 10000) {
3357 err = RD_KAFKA_RESP_ERR__BAD_MSG;
3358 goto err;
3359 }
3360
3361 assignment = rd_kafka_topic_partition_list_new(TopicCnt);
3362 while (TopicCnt-- > 0) {
3363 rd_kafkap_str_t Topic;
3364 int32_t PartCnt;
3365 rd_kafka_buf_read_str(rkbuf, &Topic);
3366 rd_kafka_buf_read_i32(rkbuf, &PartCnt);
3367 while (PartCnt-- > 0) {
3368 int32_t Partition;
3369 char *topic_name;
3370 RD_KAFKAP_STR_DUPA(&topic_name, &Topic);
3371 rd_kafka_buf_read_i32(rkbuf, &Partition);
3372
3373 rd_kafka_topic_partition_list_add(
3374 assignment, topic_name, Partition);
3375 }
3376 }
3377
3378 rd_kafka_buf_read_bytes(rkbuf, &UserData);
3379
3380 done:
3381 /* Set the new assignment */
3382 rd_kafka_cgrp_handle_assignment(rkcg, assignment);
3383
3384 rd_kafka_topic_partition_list_destroy(assignment);
3385
3386 if (rkbuf)
3387 rd_kafka_buf_destroy(rkbuf);
3388
3389 return;
3390
3391 err_parse:
3392 err = rkbuf->rkbuf_err;
3393
3394 err:
3395 if (rkbuf)
3396 rd_kafka_buf_destroy(rkbuf);
3397
3398 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "GRPSYNC",
3399 "Group \"%s\": synchronization failed: %s: rejoining",
3400 rkcg->rkcg_group_id->str, rd_kafka_err2str(err));
3401 rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_INIT);
3402}
3403