1/*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012,2013 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29#ifndef _RDKAFKA_BROKER_H_
30#define _RDKAFKA_BROKER_H_
31
32#include "rdkafka_feature.h"
33
34
/**
 * Name lookup tables, declared here and defined elsewhere.
 *
 * NOTE(review): presumably indexed by the rkb_state enum values and by
 * rd_kafka_secproto_t values respectively - confirm against the
 * definitions.
 */
extern const char *rd_kafka_broker_state_names[];
extern const char *rd_kafka_secproto_names[];
37
38struct rd_kafka_broker_s { /* rd_kafka_broker_t */
39 TAILQ_ENTRY(rd_kafka_broker_s) rkb_link;
40
41 int32_t rkb_nodeid;
42#define RD_KAFKA_NODEID_UA -1
43
44 rd_sockaddr_list_t *rkb_rsal;
45 rd_ts_t rkb_ts_rsal_last;
46 const rd_sockaddr_inx_t *rkb_addr_last; /* Last used connect address */
47
48 rd_kafka_transport_t *rkb_transport;
49
50 uint32_t rkb_corrid;
51 int rkb_connid; /* Connection id, increased by
52 * one for each connection by
53 * this broker. Used as a safe-guard
54 * to help troubleshooting buffer
55 * problems across disconnects. */
56
57 rd_kafka_q_t *rkb_ops;
58
59 mtx_t rkb_lock;
60
61 int rkb_blocking_max_ms; /* Maximum IO poll blocking
62 * time. */
63
64 /* Toppars handled by this broker */
65 TAILQ_HEAD(, rd_kafka_toppar_s) rkb_toppars;
66 int rkb_toppar_cnt;
67
68 /* Active toppars that are eligible for:
69 * - (consumer) fetching due to underflow
70 * - (producer) producing
71 *
72 * The circleq provides round-robin scheduling for both cases.
73 */
74 CIRCLEQ_HEAD(, rd_kafka_toppar_s) rkb_active_toppars;
75 int rkb_active_toppar_cnt;
76 rd_kafka_toppar_t *rkb_active_toppar_next; /* Next 'first' toppar
77 * in fetch list.
78 * This is used for
79 * round-robin. */
80
81
82 rd_kafka_cgrp_t *rkb_cgrp;
83
84 rd_ts_t rkb_ts_fetch_backoff;
85 int rkb_fetching;
86
87 enum {
88 RD_KAFKA_BROKER_STATE_INIT,
89 RD_KAFKA_BROKER_STATE_DOWN,
90 RD_KAFKA_BROKER_STATE_TRY_CONNECT,
91 RD_KAFKA_BROKER_STATE_CONNECT,
92 RD_KAFKA_BROKER_STATE_AUTH,
93
94 /* Any state >= STATE_UP means the Kafka protocol layer
95 * is operational (to some degree). */
96 RD_KAFKA_BROKER_STATE_UP,
97 RD_KAFKA_BROKER_STATE_UPDATE,
98 RD_KAFKA_BROKER_STATE_APIVERSION_QUERY,
99 RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE
100 } rkb_state;
101
102 rd_ts_t rkb_ts_state; /* Timestamp of last
103 * state change */
104 rd_interval_t rkb_timeout_scan_intvl; /* Waitresp timeout scan
105 * interval. */
106
107 rd_atomic32_t rkb_blocking_request_cnt; /* The number of
108 * in-flight blocking
109 * requests.
110 * A blocking request is
111 * one that is known to
112 * possibly block on the
113 * broker for longer than
114 * the typical processing
115 * time, e.g.:
116 * JoinGroup, SyncGroup */
117
118 int rkb_features; /* Protocol features supported
119 * by this broker.
120 * See RD_KAFKA_FEATURE_* in
121 * rdkafka_proto.h */
122
123 struct rd_kafka_ApiVersion *rkb_ApiVersions; /* Broker's supported APIs
124 * (MUST be sorted) */
125 size_t rkb_ApiVersions_cnt;
126 rd_interval_t rkb_ApiVersion_fail_intvl; /* Controls how long
127 * the fallback proto
128 * will be used after
129 * ApiVersionRequest
130 * failure. */
131
132 rd_kafka_confsource_t rkb_source;
133 struct {
134 rd_atomic64_t tx_bytes;
135 rd_atomic64_t tx; /**< Kafka requests */
136 rd_atomic64_t tx_err;
137 rd_atomic64_t tx_retries;
138 rd_atomic64_t req_timeouts; /* Accumulated value */
139
140 rd_atomic64_t rx_bytes;
141 rd_atomic64_t rx; /**< Kafka responses */
142 rd_atomic64_t rx_err;
143 rd_atomic64_t rx_corrid_err; /* CorrId misses */
144 rd_atomic64_t rx_partial; /* Partial messages received
145 * and dropped. */
146 rd_atomic64_t zbuf_grow; /* Compression/decompression buffer grows needed */
147 rd_atomic64_t buf_grow; /* rkbuf grows needed */
148 rd_atomic64_t wakeups; /* Poll wakeups */
149
150 rd_atomic32_t connects; /**< Connection attempts,
151 * successful or not. */
152
153 rd_atomic32_t disconnects; /**< Disconnects.
154 * Always peer-triggered. */
155
156 rd_atomic64_t reqtype[RD_KAFKAP__NUM]; /**< Per request-type
157 * counter */
158 } rkb_c;
159
160 int rkb_req_timeouts; /* Current value */
161
162 rd_ts_t rkb_ts_tx_last; /**< Timestamp of last
163 * transmitted requested */
164
165 rd_ts_t rkb_ts_metadata_poll; /* Next metadata poll time */
166 int rkb_metadata_fast_poll_cnt; /* Perform fast
167 * metadata polls. */
168 thrd_t rkb_thread;
169
170 rd_refcnt_t rkb_refcnt;
171
172 rd_kafka_t *rkb_rk;
173
174 rd_kafka_buf_t *rkb_recv_buf;
175
176 int rkb_max_inflight; /* Maximum number of in-flight
177 * requests to broker.
178 * Compared to rkb_waitresps length.*/
179 rd_kafka_bufq_t rkb_outbufs;
180 rd_kafka_bufq_t rkb_waitresps;
181 rd_kafka_bufq_t rkb_retrybufs;
182
183 rd_avg_t rkb_avg_int_latency;/* Current internal latency period*/
184 rd_avg_t rkb_avg_outbuf_latency; /**< Current latency
185 * between buf_enq0
186 * and writing to socket
187 */
188 rd_avg_t rkb_avg_rtt; /* Current RTT period */
189 rd_avg_t rkb_avg_throttle; /* Current throttle period */
190
191 /* These are all protected by rkb_lock */
192 char rkb_name[RD_KAFKA_NODENAME_SIZE]; /* Displ name */
193 char rkb_nodename[RD_KAFKA_NODENAME_SIZE]; /* host:port*/
194 uint16_t rkb_port; /* TCP port */
195 char *rkb_origname; /* Original
196 * host name */
197 int rkb_nodename_epoch; /**< Bumped each time
198 * the nodename is changed.
199 * Compared to
200 * rkb_connect_epoch
201 * to trigger a reconnect
202 * for logical broker
203 * when the nodename is
204 * updated. */
205 int rkb_connect_epoch; /**< The value of
206 * rkb_nodename_epoch at the
207 * last connection attempt.
208 */
209
210 /* Logging name is a copy of rkb_name, protected by its own mutex */
211 char *rkb_logname;
212 mtx_t rkb_logname_lock;
213
214 int rkb_wakeup_fd[2]; /* Wake-up fds (r/w) to wake
215 * up from IO-wait when
216 * queues have content. */
217 int rkb_toppar_wakeup_fd; /* Toppar msgq wakeup fd,
218 * this is rkb_wakeup_fd[1]
219 * if enabled. */
220
221 /**< Current, exponentially increased, reconnect backoff. */
222 int rkb_reconnect_backoff_ms;
223
224 /**< Absolute timestamp of next allowed reconnect. */
225 rd_ts_t rkb_ts_reconnect;
226
227 /**< Persistent connection demand is tracked by
228 * an counter for each type of demand.
229 * The broker thread will maintain a persistent connection
230 * if any of the counters are non-zero, and revert to
231 * on-demand mode when they all reach zero.
232 * After incrementing any of the counters a broker wakeup
233 * should be signalled to expedite handling. */
234 struct {
235 /**< Producer: partitions are being produced to.
236 * Consumer: partitions are being fetched from.
237 *
238 * Counter is maintained by the broker handler thread
239 * itself, no need for atomic/locking.
240 * Is reset to 0 on each producer|consumer_serve() loop
241 * and updated according to current need, which
242 * will trigger a state transition to
243 * TRY_CONNECT if a connection is needed. */
244 int internal;
245
246 /**< Consumer: Broker is the group coordinator.
247 *
248 * Counter is maintained by cgrp logic in
249 * rdkafka main thread. */
250 rd_atomic32_t coord;
251 } rkb_persistconn;
252
253 rd_kafka_secproto_t rkb_proto;
254
255 int rkb_down_reported; /* Down event reported */
256#if WITH_SASL_CYRUS
257 rd_kafka_timer_t rkb_sasl_kinit_refresh_tmr;
258#endif
259
260
261 /*
262 * Log suppression
263 */
264 struct {
265 /**< Log: compression type not supported by broker. */
266 rd_interval_t unsupported_compression;
267
268 /**< Log: KIP-62 not supported by broker. */
269 rd_interval_t unsupported_kip62;
270 } rkb_suppress;
271
272 struct {
273 char msg[512];
274 int err; /* errno */
275 } rkb_err;
276};
277
/** Calls rd_refcnt_add() on the broker's rkb_refcnt. */
#define rd_kafka_broker_keep(rkb)   rd_refcnt_add(&(rkb)->rkb_refcnt)

/** Acquire / release the broker's rkb_lock mutex. */
#define rd_kafka_broker_lock(rkb)   mtx_lock(&(rkb)->rkb_lock)
#define rd_kafka_broker_unlock(rkb) mtx_unlock(&(rkb)->rkb_lock)
281
282
/**
 * @returns non-zero if \p state equals RD_KAFKA_BROKER_STATE_UP or
 *          RD_KAFKA_BROKER_STATE_UPDATE, else 0.
 *
 * @remark \p state may be evaluated twice: avoid side effects in it.
 */
#define rd_kafka_broker_state_is_up(state)                      \
        (RD_KAFKA_BROKER_STATE_UP == (state) ||                 \
         RD_KAFKA_BROKER_STATE_UPDATE == (state))
289
290/**
291 * @brief Broker comparator
292 */
293static RD_UNUSED RD_INLINE int rd_kafka_broker_cmp (const void *_a,
294 const void *_b) {
295 const rd_kafka_broker_t *a = _a, *b = _b;
296 return (int)(a - b);
297}
298
299
300/**
301 * @returns true if broker supports \p features, else false.
302 */
303static RD_UNUSED
304int rd_kafka_broker_supports (rd_kafka_broker_t *rkb, int features) {
305 int r;
306 rd_kafka_broker_lock(rkb);
307 r = (rkb->rkb_features & features) == features;
308 rd_kafka_broker_unlock(rkb);
309 return r;
310}
311
312int16_t rd_kafka_broker_ApiVersion_supported (rd_kafka_broker_t *rkb,
313 int16_t ApiKey,
314 int16_t minver, int16_t maxver,
315 int *featuresp);
316
317int rd_kafka_broker_get_state (rd_kafka_broker_t *rkb);
318
319rd_kafka_broker_t *rd_kafka_broker_find_by_nodeid0 (rd_kafka_t *rk,
320 int32_t nodeid,
321 int state,
322 rd_bool_t do_connect);
323#define rd_kafka_broker_find_by_nodeid(rk,nodeid) \
324 rd_kafka_broker_find_by_nodeid0(rk,nodeid,-1,rd_false)
325
326/**
327 * Filter out brokers that are currently in a blocking request.
328 */
329static RD_INLINE RD_UNUSED int
330rd_kafka_broker_filter_non_blocking (rd_kafka_broker_t *rkb, void *opaque) {
331 return rd_atomic32_get(&rkb->rkb_blocking_request_cnt) > 0;
332}
333
334
335/**
336 * Filter out brokers that don't support Idempotent Producer.
337 */
338static RD_INLINE RD_UNUSED int
339rd_kafka_broker_filter_non_idempotent (rd_kafka_broker_t *rkb, void *opaque) {
340 return !(rkb->rkb_features & RD_KAFKA_FEATURE_IDEMPOTENT_PRODUCER);
341}
342
343
344/**
345 * Filter out brokers that cant do GroupCoordinator requests right now.
346 */
347static RD_INLINE RD_UNUSED int
348rd_kafka_broker_filter_can_group_query (rd_kafka_broker_t *rkb, void *opaque) {
349 return rd_atomic32_get(&rkb->rkb_blocking_request_cnt) > 0 ||
350 !(rkb->rkb_features & RD_KAFKA_FEATURE_BROKER_GROUP_COORD);
351}
352
353rd_kafka_broker_t *rd_kafka_broker_any (rd_kafka_t *rk, int state,
354 int (*filter) (rd_kafka_broker_t *rkb,
355 void *opaque),
356 void *opaque, const char *reason);
357
358rd_kafka_broker_t *rd_kafka_broker_any_usable (rd_kafka_t *rk, int timeout_ms,
359 int do_lock, const char *reason);
360
361rd_kafka_broker_t *rd_kafka_broker_prefer (rd_kafka_t *rk, int32_t broker_id,
362 int state);
363
364rd_kafka_broker_t *
365rd_kafka_broker_get_async (rd_kafka_t *rk, int32_t broker_id, int state,
366 rd_kafka_enq_once_t *eonce);
367
368rd_kafka_broker_t *rd_kafka_broker_controller (rd_kafka_t *rk, int state,
369 rd_ts_t abs_timeout);
370rd_kafka_broker_t *
371rd_kafka_broker_controller_async (rd_kafka_t *rk, int state,
372 rd_kafka_enq_once_t *eonce);
373
374int rd_kafka_brokers_add0 (rd_kafka_t *rk, const char *brokerlist);
375void rd_kafka_broker_set_state (rd_kafka_broker_t *rkb, int state);
376
377void rd_kafka_broker_fail (rd_kafka_broker_t *rkb,
378 int level, rd_kafka_resp_err_t err,
379 const char *fmt, ...);
380
381void rd_kafka_broker_conn_closed (rd_kafka_broker_t *rkb,
382 rd_kafka_resp_err_t err,
383 const char *errstr);
384
385void rd_kafka_broker_destroy_final (rd_kafka_broker_t *rkb);
386
387#define rd_kafka_broker_destroy(rkb) \
388 rd_refcnt_destroywrapper(&(rkb)->rkb_refcnt, \
389 rd_kafka_broker_destroy_final(rkb))
390
391
392void rd_kafka_broker_update (rd_kafka_t *rk, rd_kafka_secproto_t proto,
393 const struct rd_kafka_metadata_broker *mdb);
394rd_kafka_broker_t *rd_kafka_broker_add (rd_kafka_t *rk,
395 rd_kafka_confsource_t source,
396 rd_kafka_secproto_t proto,
397 const char *name, uint16_t port,
398 int32_t nodeid);
399
400rd_kafka_broker_t *rd_kafka_broker_add_logical (rd_kafka_t *rk,
401 const char *name);
402
403/** @define returns true if broker is logical. No locking is needed. */
404#define RD_KAFKA_BROKER_IS_LOGICAL(rkb) ((rkb)->rkb_source == RD_KAFKA_LOGICAL)
405
406void rd_kafka_broker_set_nodename (rd_kafka_broker_t *rkb,
407 rd_kafka_broker_t *from_rkb);
408
409void rd_kafka_broker_connect_up (rd_kafka_broker_t *rkb);
410void rd_kafka_broker_connect_done (rd_kafka_broker_t *rkb, const char *errstr);
411
412int rd_kafka_send (rd_kafka_broker_t *rkb);
413int rd_kafka_recv (rd_kafka_broker_t *rkb);
414
415void rd_kafka_dr_msgq (rd_kafka_itopic_t *rkt,
416 rd_kafka_msgq_t *rkmq, rd_kafka_resp_err_t err);
417
418void rd_kafka_dr_implicit_ack (rd_kafka_broker_t *rkb,
419 rd_kafka_toppar_t *rktp,
420 uint64_t last_msgid);
421
422void rd_kafka_broker_buf_enq1 (rd_kafka_broker_t *rkb,
423 rd_kafka_buf_t *rkbuf,
424 rd_kafka_resp_cb_t *resp_cb,
425 void *opaque);
426
427void rd_kafka_broker_buf_enq_replyq (rd_kafka_broker_t *rkb,
428 rd_kafka_buf_t *rkbuf,
429 rd_kafka_replyq_t replyq,
430 rd_kafka_resp_cb_t *resp_cb,
431 void *opaque);
432
433void rd_kafka_broker_buf_retry (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf);
434
435
436rd_kafka_broker_t *rd_kafka_broker_internal (rd_kafka_t *rk);
437
438void msghdr_print (rd_kafka_t *rk,
439 const char *what, const struct msghdr *msg,
440 int hexdump);
441
442const char *rd_kafka_broker_name (rd_kafka_broker_t *rkb);
443void rd_kafka_broker_wakeup (rd_kafka_broker_t *rkb);
444int rd_kafka_all_brokers_wakeup (rd_kafka_t *rk,
445 int min_state);
446
447void rd_kafka_connect_any (rd_kafka_t *rk, const char *reason);
448
449void rd_kafka_broker_purge_queues (rd_kafka_broker_t *rkb, int purge_flags,
450 rd_kafka_replyq_t replyq);
451
452int rd_kafka_brokers_get_state_version (rd_kafka_t *rk);
453int rd_kafka_brokers_wait_state_change (rd_kafka_t *rk, int stored_version,
454 int timeout_ms);
455int rd_kafka_brokers_wait_state_change_async (rd_kafka_t *rk,
456 int stored_version,
457 rd_kafka_enq_once_t *eonce);
458void rd_kafka_brokers_broadcast_state_change (rd_kafka_t *rk);
459
460
461
462/**
463 * Updates the current toppar active round-robin next pointer.
464 */
465static RD_INLINE RD_UNUSED
466void rd_kafka_broker_active_toppar_next (rd_kafka_broker_t *rkb,
467 rd_kafka_toppar_t *sugg_next) {
468 if (CIRCLEQ_EMPTY(&rkb->rkb_active_toppars) ||
469 (void *)sugg_next == CIRCLEQ_ENDC(&rkb->rkb_active_toppars))
470 rkb->rkb_active_toppar_next = NULL;
471 else if (sugg_next)
472 rkb->rkb_active_toppar_next = sugg_next;
473 else
474 rkb->rkb_active_toppar_next =
475 CIRCLEQ_FIRST(&rkb->rkb_active_toppars);
476}
477
478
479void rd_kafka_broker_active_toppar_add (rd_kafka_broker_t *rkb,
480 rd_kafka_toppar_t *rktp);
481
482void rd_kafka_broker_active_toppar_del (rd_kafka_broker_t *rkb,
483 rd_kafka_toppar_t *rktp);
484
485
486void rd_kafka_broker_schedule_connection (rd_kafka_broker_t *rkb);
487
488void
489rd_kafka_broker_persistent_connection_add (rd_kafka_broker_t *rkb,
490 rd_atomic32_t *acntp);
491
492void
493rd_kafka_broker_persistent_connection_del (rd_kafka_broker_t *rkb,
494 rd_atomic32_t *acntp);
495
496
497int unittest_broker (void);
498
499#endif /* _RDKAFKA_BROKER_H_ */
500