1/*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012-2015, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28#ifndef _RDKAFKA_CGRP_H_
29#define _RDKAFKA_CGRP_H_
30
31#include "rdinterval.h"
32
33#include "rdkafka_assignor.h"
34
/**
 * Client group (cgrp) implementation.
 *
 * Handling of a single cgrp is assigned to a single rd_kafka_broker_t
 * object at any given time.
 * The main thread calls rd_kafka_cgrp_serve() to serve its cgrps.
 *
 * This means that the cgrp state itself does not need to be locked, since
 * it is only ever used from the main thread.
 * Note, however, that the struct below still carries a mutex (rkcg_lock,
 * taken with rd_kafka_cgrp_lock()), and its rkcg_c statistics are
 * documented as protected by rd_kafka_*lock().
 */
46
47
/* Printable names for the RD_KAFKA_CGRP_JOIN_STATE_* values stored in
 * rd_kafka_cgrp_t.rkcg_join_state.
 * NOTE(review): presumably indexed by those enum values; the definition
 * is not in this header — confirm against the defining .c file. */
extern const char *rd_kafka_cgrp_join_state_names[];
49
/**
 * Client group (cgrp) handle.
 *
 * Holds the group identity, the coordinator and join state machines,
 * the current subscription and assignment, and the intervals/timers
 * that pace coordinator queries, heartbeats and joins.
 */
typedef struct rd_kafka_cgrp_s {
        const rd_kafkap_str_t    *rkcg_group_id;   /* Group id */
        rd_kafkap_str_t          *rkcg_member_id;  /* Last assigned MemberId */
        const rd_kafkap_str_t    *rkcg_client_id;  /* Client id */

        /* Coordinator state machine.
         * NOTE(review): rd_kafka_cgrp_state_names[] presumably maps these
         * values to printable names — confirm. */
        enum {
                /* Init state */
                RD_KAFKA_CGRP_STATE_INIT,

                /* Cgrp has been stopped. This is a final state */
                RD_KAFKA_CGRP_STATE_TERM,

                /* Query for group coordinator */
                RD_KAFKA_CGRP_STATE_QUERY_COORD,

                /* Outstanding query, awaiting response */
                RD_KAFKA_CGRP_STATE_WAIT_COORD,

                /* Wait ack from assigned cgrp manager broker thread */
                RD_KAFKA_CGRP_STATE_WAIT_BROKER,

                /* Wait for manager broker thread to connect to broker */
                RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT,

                /* Coordinator is up and manager is assigned. */
                RD_KAFKA_CGRP_STATE_UP,
        } rkcg_state;
        rd_ts_t            rkcg_ts_statechange;     /* Timestamp of last
                                                     * state change. */


        /* Group membership (join) state machine.
         * NOTE(review): rd_kafka_cgrp_join_state_names[] presumably maps
         * these values to printable names — confirm. */
        enum {
                RD_KAFKA_CGRP_JOIN_STATE_INIT,

                /* all: JoinGroupRequest sent, awaiting response. */
                RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN,

                /* Leader: MetadataRequest sent, awaiting response. */
                RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA,

                /* Follower: SyncGroupRequest sent, awaiting response. */
                RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC,

                /* all: waiting for previous assignment to decommission */
                RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN,

                /* all: waiting for application's rebalance_cb to assign() */
                RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_REBALANCE_CB,

                /* all: waiting for application's rebalance_cb to revoke */
                RD_KAFKA_CGRP_JOIN_STATE_WAIT_REVOKE_REBALANCE_CB,

                /* all: synchronized and assigned
                 * may be an empty assignment. */
                RD_KAFKA_CGRP_JOIN_STATE_ASSIGNED,

                /* all: fetchers are started and operational */
                RD_KAFKA_CGRP_JOIN_STATE_STARTED
        } rkcg_join_state;

        /* State when group leader */
        struct {
                char *protocol;                    /* Selected protocol */
                rd_kafka_group_member_t *members;  /* Group members
                                                    * (NOTE(review): presumably
                                                    * an array of member_cnt
                                                    * entries — confirm). */
                int member_cnt;                    /* Number of members */
        } rkcg_group_leader;

        rd_kafka_q_t      *rkcg_q;                  /* Application poll queue */
        rd_kafka_q_t      *rkcg_ops;                /* Manager ops queue */
        rd_kafka_q_t      *rkcg_wait_coord_q;       /* Ops awaiting coord */
        int32_t            rkcg_version;            /* Ops queue version barrier
                                                     * Increased by:
                                                     *  Rebalance delegation
                                                     *  Assign/Unassign
                                                     */
        /* NOTE(review): the module overview says the cgrp is only used from
         * the main thread and needs no locking, yet this mutex exists
         * (see rd_kafka_cgrp_lock()); confirm which fields it guards. */
        mtx_t              rkcg_lock;

        /* Bitmask of RD_KAFKA_CGRP_F_* flags below.
         * Bit 0x2 is not defined in this header. */
        int                rkcg_flags;
#define RD_KAFKA_CGRP_F_TERMINATE    0x1            /* Terminate cgrp (async) */
#define RD_KAFKA_CGRP_F_WAIT_UNASSIGN 0x4           /* Waiting for unassign
                                                     * to complete */
#define RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN 0x8       /* Send LeaveGroup when
                                                     * unassign is done */
#define RD_KAFKA_CGRP_F_SUBSCRIPTION 0x10           /* If set:
                                                     *   subscription
                                                     * else:
                                                     *   static assignment */
#define RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT  0x20  /* A Heartbeat request
                                                     * is in transit, don't
                                                     * send a new one. */
#define RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION 0x40  /* Subscription contains
                                                     * wildcards. */
#define RD_KAFKA_CGRP_F_WAIT_LEAVE            0x80  /* Wait for LeaveGroup
                                                     * to be sent.
                                                     * This is used to stall
                                                     * termination until
                                                     * the LeaveGroupRequest
                                                     * is responded to,
                                                     * otherwise it risks
                                                     * being dropped in the
                                                     * output queue when
                                                     * the broker is destroyed.
                                                     */
#define RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED 0x100     /**< max.poll.interval.ms
                                                     *   was exceeded and we
                                                     *   left the group.
                                                     *   Do not rejoin until
                                                     *   the application has
                                                     *   polled again. */

        rd_interval_t      rkcg_coord_query_intvl;  /* Coordinator query intvl*/
        rd_interval_t      rkcg_heartbeat_intvl;    /* Heartbeat intvl */
        rd_interval_t      rkcg_join_intvl;         /* JoinGroup interval */
        rd_interval_t      rkcg_timeout_scan_intvl; /* Timeout scanner */

        TAILQ_HEAD(, rd_kafka_topic_s)  rkcg_topics;/* Topics subscribed to */

        rd_list_t          rkcg_toppars;            /* Toppars subscribed to*/

        int                rkcg_assigned_cnt;       /* Assigned partitions */

        int32_t            rkcg_generation_id;      /* Current generation id */

        rd_kafka_assignor_t *rkcg_assignor;         /* Selected partition
                                                     * assignor strategy. */

        int32_t            rkcg_coord_id;      /**< Current coordinator id,
                                                *   or -1 if not known. */

        rd_kafka_broker_t *rkcg_curr_coord;    /**< Current coordinator
                                                *   broker handle, or NULL.
                                                *   rkcg_coord's nodename is
                                                *   updated to this broker's
                                                *   nodename when there is a
                                                *   coordinator change. */
        rd_kafka_broker_t *rkcg_coord;         /**< The dedicated coordinator
                                                *   broker handle.
                                                *   Will be updated when the
                                                *   coordinator changes. */

        /* Current subscription */
        rd_kafka_topic_partition_list_t *rkcg_subscription;
        /* The actual topics subscribed (after metadata+wildcard matching) */
        rd_list_t *rkcg_subscribed_topics; /**< (rd_kafka_topic_info_t *) */

        /* Current assignment */
        rd_kafka_topic_partition_list_t *rkcg_assignment;

        int rkcg_wait_unassign_cnt;                 /* Waiting for this number
                                                     * of partitions to be
                                                     * unassigned and
                                                     * decommissioned before
                                                     * transitioning to the
                                                     * next state. */

        int rkcg_wait_commit_cnt;                   /* Waiting for this number
                                                     * of commits to finish. */

        rd_kafka_resp_err_t rkcg_last_err;          /* Last error propagated to
                                                     * application.
                                                     * This is for silencing
                                                     * same errors. */

        rd_kafka_timer_t   rkcg_offset_commit_tmr;  /* Offset commit timer */
        rd_kafka_timer_t   rkcg_max_poll_interval_tmr; /**< Enforce the max
                                                        *   poll interval. */

        rd_kafka_t        *rkcg_rk;                 /* Owning client instance */

        rd_kafka_op_t     *rkcg_reply_rko;          /* Send reply for op
                                                     * (OP_TERMINATE)
                                                     * to this rko's queue. */

        rd_ts_t            rkcg_ts_terminate;       /* Timestamp of when
                                                     * cgrp termination was
                                                     * initiated. */

        /* Protected by rd_kafka_*lock() */
        struct {
                rd_ts_t  ts_rebalance;       /* Timestamp of
                                              * last rebalance */
                int      rebalance_cnt;      /* Number of
                                              * rebalances */
                char     rebalance_reason[128]; /**< Last rebalance
                                                 *   reason */
                int      assignment_size;    /* Partition count
                                              * of last rebalance
                                              * assignment */
        } rkcg_c;

} rd_kafka_cgrp_t;
244
245
246
247
/*
 * Acquire / release the cgrp's own mutex.
 * Each expands to the matching mtx_*() call on &(cg)->rkcg_lock and
 * yields that call's result.
 */
#define rd_kafka_cgrp_lock(cg)    mtx_lock(&(cg)->rkcg_lock)
#define rd_kafka_cgrp_unlock(cg)  mtx_unlock(&(cg)->rkcg_lock)

/*
 * Evaluates to 1 if broker `b` is the current coordinator of cgrp `cg`,
 * otherwise 0.
 *
 * An unknown coordinator (rkcg_coord_id == -1) never matches, and in that
 * case `b` is not dereferenced at all.
 * `cg` may be evaluated twice: pass side-effect-free arguments.
 */
#define RD_KAFKA_CGRP_BROKER_IS_COORD(cg,b)                     \
        ((cg)->rkcg_coord_id == -1 ? 0 :                        \
         ((cg)->rkcg_coord_id == (b)->rkb_nodeid))
255
256extern const char *rd_kafka_cgrp_state_names[];
257extern const char *rd_kafka_cgrp_join_state_names[];
258
259void rd_kafka_cgrp_destroy_final (rd_kafka_cgrp_t *rkcg);
260rd_kafka_cgrp_t *rd_kafka_cgrp_new (rd_kafka_t *rk,
261 const rd_kafkap_str_t *group_id,
262 const rd_kafkap_str_t *client_id);
263void rd_kafka_cgrp_serve (rd_kafka_cgrp_t *rkcg);
264
265void rd_kafka_cgrp_op (rd_kafka_cgrp_t *rkcg, rd_kafka_toppar_t *rktp,
266 rd_kafka_replyq_t replyq, rd_kafka_op_type_t type,
267 rd_kafka_resp_err_t err);
268void rd_kafka_cgrp_terminate0 (rd_kafka_cgrp_t *rkcg, rd_kafka_op_t *rko);
269void rd_kafka_cgrp_terminate (rd_kafka_cgrp_t *rkcg, rd_kafka_replyq_t replyq);
270
271
272rd_kafka_resp_err_t rd_kafka_cgrp_topic_pattern_del (rd_kafka_cgrp_t *rkcg,
273 const char *pattern);
274rd_kafka_resp_err_t rd_kafka_cgrp_topic_pattern_add (rd_kafka_cgrp_t *rkcg,
275 const char *pattern);
276
277int rd_kafka_cgrp_topic_check (rd_kafka_cgrp_t *rkcg, const char *topic);
278
279void rd_kafka_cgrp_set_member_id (rd_kafka_cgrp_t *rkcg, const char *member_id);
280
281void rd_kafka_cgrp_handle_heartbeat_error (rd_kafka_cgrp_t *rkcg,
282 rd_kafka_resp_err_t err);
283
284void rd_kafka_cgrp_handle_SyncGroup (rd_kafka_cgrp_t *rkcg,
285 rd_kafka_broker_t *rkb,
286 rd_kafka_resp_err_t err,
287 const rd_kafkap_bytes_t *member_state);
288void rd_kafka_cgrp_set_join_state (rd_kafka_cgrp_t *rkcg, int join_state);
289
290void rd_kafka_cgrp_coord_query (rd_kafka_cgrp_t *rkcg,
291 const char *reason);
292void rd_kafka_cgrp_coord_dead (rd_kafka_cgrp_t *rkcg, rd_kafka_resp_err_t err,
293 const char *reason);
294void rd_kafka_cgrp_metadata_update_check (rd_kafka_cgrp_t *rkcg, int do_join);
295#define rd_kafka_cgrp_get(rk) ((rk)->rk_cgrp)
296
297#endif /* _RDKAFKA_CGRP_H_ */
298