1 | /* |
2 | * librdkafka - Apache Kafka C library |
3 | * |
4 | * Copyright (c) 2012,2013 Magnus Edenhill |
5 | * All rights reserved. |
6 | * |
7 | * Redistribution and use in source and binary forms, with or without |
8 | * modification, are permitted provided that the following conditions are met: |
9 | * |
10 | * 1. Redistributions of source code must retain the above copyright notice, |
11 | * this list of conditions and the following disclaimer. |
12 | * 2. Redistributions in binary form must reproduce the above copyright notice, |
13 | * this list of conditions and the following disclaimer in the documentation |
14 | * and/or other materials provided with the distribution. |
15 | * |
16 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
17 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
18 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
19 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
20 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
21 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
22 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
23 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
24 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
25 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
26 | * POSSIBILITY OF SUCH DAMAGE. |
27 | */ |
28 | |
29 | #ifndef _RDKAFKA_BROKER_H_ |
30 | #define _RDKAFKA_BROKER_H_ |
31 | |
32 | #include "rdkafka_feature.h" |
33 | |
34 | |
/** Printable names for broker states; presumably indexed by the
 *  RD_KAFKA_BROKER_STATE_* value stored in rkb_state — confirm against
 *  the definition. */
extern const char *rd_kafka_broker_state_names[];
/** Printable names for security protocols; presumably indexed by
 *  rd_kafka_secproto_t — confirm against the definition. */
extern const char *rd_kafka_secproto_names[];
37 | |
/**
 * @brief State for a single broker (typedef'd as rd_kafka_broker_t).
 *
 * Instances are reference counted through rkb_refcnt
 * (see rd_kafka_broker_keep() / rd_kafka_broker_destroy()).
 * Fields documented as protected by rkb_lock must only be accessed with
 * rd_kafka_broker_lock() held.
 */
struct rd_kafka_broker_s { /* rd_kafka_broker_t */
        /* List linkage (TAILQ) for the list this broker is kept in. */
        TAILQ_ENTRY(rd_kafka_broker_s) rkb_link;

        /* Broker node id. */
        int32_t             rkb_nodeid;
        /* Sentinel node id (-1); presumably "unassigned/unknown" — confirm. */
#define RD_KAFKA_NODEID_UA -1

        /* Socket address list; presumably the resolved addresses for
         * rkb_nodename — confirm. */
        rd_sockaddr_list_t *rkb_rsal;
        /* Presumably the time rkb_rsal was last (re)resolved — confirm. */
        rd_ts_t             rkb_ts_rsal_last;
        const rd_sockaddr_inx_t  *rkb_addr_last; /* Last used connect address */

        /* Transport for the current connection; presumably NULL while
         * disconnected — confirm. */
        rd_kafka_transport_t *rkb_transport;

        /* Request correlation id counter. */
        uint32_t            rkb_corrid;
        int                 rkb_connid;    /* Connection id, increased by
                                            * one for each connection by
                                            * this broker. Used as a safe-guard
                                            * to help troubleshooting buffer
                                            * problems across disconnects. */

        /* Op queue for this broker (presumably served by rkb_thread). */
        rd_kafka_q_t       *rkb_ops;

        /* Broker lock, taken by rd_kafka_broker_lock()/unlock().
         * Guards the fields marked "protected by rkb_lock" below;
         * rd_kafka_broker_supports() also reads rkb_features under it. */
        mtx_t               rkb_lock;

        int                 rkb_blocking_max_ms; /* Maximum IO poll blocking
                                                  * time. */

        /* Toppars handled by this broker */
        TAILQ_HEAD(, rd_kafka_toppar_s) rkb_toppars;
        int                 rkb_toppar_cnt;

        /* Active toppars that are eligible for:
         *  - (consumer) fetching due to underflow
         *  - (producer) producing
         *
         * The circleq provides round-robin scheduling for both cases.
         */
        CIRCLEQ_HEAD(, rd_kafka_toppar_s) rkb_active_toppars;
        int                 rkb_active_toppar_cnt;
        rd_kafka_toppar_t  *rkb_active_toppar_next;  /* Next 'first' toppar
                                                      * in fetch list.
                                                      * This is used for
                                                      * round-robin.
                                                      * Maintained by
                                                      * rd_kafka_broker_active_toppar_next(). */


        /* Consumer group handle associated with this broker
         * (semantics defined elsewhere). */
        rd_kafka_cgrp_t    *rkb_cgrp;

        /* Presumably: no fetching before this timestamp — confirm. */
        rd_ts_t             rkb_ts_fetch_backoff;
        int                 rkb_fetching;

        /* Broker state. See rd_kafka_broker_state_names[] for printable
         * names. rd_kafka_broker_state_is_up() treats only UP and UPDATE
         * as "up". */
        enum {
                RD_KAFKA_BROKER_STATE_INIT,
                RD_KAFKA_BROKER_STATE_DOWN,
                RD_KAFKA_BROKER_STATE_TRY_CONNECT,
                RD_KAFKA_BROKER_STATE_CONNECT,
                RD_KAFKA_BROKER_STATE_AUTH,

                /* Any state >= STATE_UP means the Kafka protocol layer
                 * is operational (to some degree). */
                RD_KAFKA_BROKER_STATE_UP,
                RD_KAFKA_BROKER_STATE_UPDATE,
                RD_KAFKA_BROKER_STATE_APIVERSION_QUERY,
                RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE
        } rkb_state;

        rd_ts_t             rkb_ts_state;        /* Timestamp of last
                                                  * state change */
        rd_interval_t       rkb_timeout_scan_intvl;  /* Waitresp timeout scan
                                                      * interval. */

        rd_atomic32_t       rkb_blocking_request_cnt; /* The number of
                                                       * in-flight blocking
                                                       * requests.
                                                       * A blocking request is
                                                       * one that is known to
                                                       * possibly block on the
                                                       * broker for longer than
                                                       * the typical processing
                                                       * time, e.g.:
                                                       * JoinGroup, SyncGroup.
                                                       * Checked by the
                                                       * rd_kafka_broker_filter_*()
                                                       * helpers. */

        int                 rkb_features;    /* Protocol features supported
                                              * by this broker.
                                              * See RD_KAFKA_FEATURE_* in
                                              * rdkafka_feature.h */

        struct rd_kafka_ApiVersion *rkb_ApiVersions; /* Broker's supported APIs
                                                      * (MUST be sorted) */
        size_t                      rkb_ApiVersions_cnt;
        rd_interval_t               rkb_ApiVersion_fail_intvl; /* Controls how long
                                                                * the fallback proto
                                                                * will be used after
                                                                * ApiVersionRequest
                                                                * failure. */

        /* Origin of this broker entry; RD_KAFKA_LOGICAL for logical
         * brokers (see RD_KAFKA_BROKER_IS_LOGICAL()). */
        rd_kafka_confsource_t  rkb_source;
        /* Statistics counters. */
        struct {
                rd_atomic64_t tx_bytes;
                rd_atomic64_t tx;    /**< Kafka requests */
                rd_atomic64_t tx_err;
                rd_atomic64_t tx_retries;
                rd_atomic64_t req_timeouts;  /* Accumulated value */

                rd_atomic64_t rx_bytes;
                rd_atomic64_t rx;    /**< Kafka responses */
                rd_atomic64_t rx_err;
                rd_atomic64_t rx_corrid_err; /* CorrId misses */
                rd_atomic64_t rx_partial;    /* Partial messages received
                                              * and dropped. */
                rd_atomic64_t zbuf_grow;     /* Compression/decompression buffer grows needed */
                rd_atomic64_t buf_grow;      /* rkbuf grows needed */
                rd_atomic64_t wakeups;       /* Poll wakeups */

                rd_atomic32_t connects;      /**< Connection attempts,
                                              *   successful or not. */

                rd_atomic32_t disconnects;   /**< Disconnects.
                                              *   Always peer-triggered. */

                rd_atomic64_t reqtype[RD_KAFKAP__NUM]; /**< Per request-type
                                                        *   counter */
        } rkb_c;

        int                 rkb_req_timeouts;  /* Current value
                                                * (cf. the accumulated
                                                * rkb_c.req_timeouts) */

        rd_ts_t             rkb_ts_tx_last; /**< Timestamp of last
                                             *   transmitted request */

        rd_ts_t             rkb_ts_metadata_poll; /* Next metadata poll time */
        int                 rkb_metadata_fast_poll_cnt; /* Perform fast
                                                         * metadata polls. */
        /* Broker handler thread. */
        thrd_t              rkb_thread;

        /* Reference count, see rd_kafka_broker_keep() and
         * rd_kafka_broker_destroy(). */
        rd_refcnt_t         rkb_refcnt;

        /* Client instance handle. */
        rd_kafka_t         *rkb_rk;

        /* Presumably the response buffer currently being received —
         * confirm. */
        rd_kafka_buf_t     *rkb_recv_buf;

        int                 rkb_max_inflight;   /* Maximum number of in-flight
                                                 * requests to broker.
                                                 * Compared to rkb_waitresps length.*/
        /* Request buffer queues; presumably: to be sent, sent and awaiting
         * response, and scheduled for retry, respectively — confirm. */
        rd_kafka_bufq_t     rkb_outbufs;
        rd_kafka_bufq_t     rkb_waitresps;
        rd_kafka_bufq_t     rkb_retrybufs;

        rd_avg_t            rkb_avg_int_latency;/* Current internal latency period*/
        rd_avg_t            rkb_avg_outbuf_latency; /**< Current latency
                                                     *   between buf_enq0
                                                     *   and writing to socket
                                                     */
        rd_avg_t            rkb_avg_rtt;        /* Current RTT period */
        rd_avg_t            rkb_avg_throttle;   /* Current throttle period */

        /* These are all protected by rkb_lock */
        char                rkb_name[RD_KAFKA_NODENAME_SIZE];  /* Displ name */
        char                rkb_nodename[RD_KAFKA_NODENAME_SIZE]; /* host:port*/
        uint16_t            rkb_port;                          /* TCP port */
        char               *rkb_origname;                      /* Original
                                                                * host name */
        int                 rkb_nodename_epoch; /**< Bumped each time
                                                 *   the nodename is changed.
                                                 *   Compared to
                                                 *   rkb_connect_epoch
                                                 *   to trigger a reconnect
                                                 *   for logical broker
                                                 *   when the nodename is
                                                 *   updated. */
        int                 rkb_connect_epoch;  /**< The value of
                                                 *   rkb_nodename_epoch at the
                                                 *   last connection attempt.
                                                 */

        /* Logging name is a copy of rkb_name, protected by its own mutex */
        char               *rkb_logname;
        mtx_t               rkb_logname_lock;

        int                 rkb_wakeup_fd[2];     /* Wake-up fds (r/w) to wake
                                                   * up from IO-wait when
                                                   * queues have content. */
        int                 rkb_toppar_wakeup_fd; /* Toppar msgq wakeup fd,
                                                   * this is rkb_wakeup_fd[1]
                                                   * if enabled. */

        /** Current, exponentially increased, reconnect backoff. */
        int                 rkb_reconnect_backoff_ms;

        /** Absolute timestamp of next allowed reconnect. */
        rd_ts_t             rkb_ts_reconnect;

        /** Persistent connection demand is tracked by
         *  a counter for each type of demand.
         *  The broker thread will maintain a persistent connection
         *  if any of the counters are non-zero, and revert to
         *  on-demand mode when they all reach zero.
         *  After incrementing any of the counters a broker wakeup
         *  should be signalled to expedite handling. */
        struct {
                /** Producer: partitions are being produced to.
                 *  Consumer: partitions are being fetched from.
                 *
                 *  Counter is maintained by the broker handler thread
                 *  itself, no need for atomic/locking.
                 *  Is reset to 0 on each producer|consumer_serve() loop
                 *  and updated according to current need, which
                 *  will trigger a state transition to
                 *  TRY_CONNECT if a connection is needed. */
                int internal;

                /** Consumer: Broker is the group coordinator.
                 *
                 *  Counter is maintained by cgrp logic in
                 *  rdkafka main thread (presumably via
                 *  rd_kafka_broker_persistent_connection_add()/_del(),
                 *  which take an rd_atomic32_t counter — confirm). */
                rd_atomic32_t coord;
        } rkb_persistconn;

        /* Security protocol, see rd_kafka_secproto_names[]. */
        rd_kafka_secproto_t rkb_proto;

        int                 rkb_down_reported;  /* Down event reported */
#if WITH_SASL_CYRUS
        rd_kafka_timer_t    rkb_sasl_kinit_refresh_tmr;
#endif


        /*
         * Log suppression
         */
        struct {
                /** Log: compression type not supported by broker. */
                rd_interval_t unsupported_compression;

                /** Log: KIP-62 not supported by broker. */
                rd_interval_t unsupported_kip62;
        } rkb_suppress;

        /* Last error: message text and errno. */
        struct {
                char msg[512];
                int  err;  /* errno */
        } rkb_err;
};
277 | |
/** Take an additional reference on \p broker; drop it again with
 *  rd_kafka_broker_destroy(). */
#define rd_kafka_broker_keep(broker) \
        rd_refcnt_add(&(broker)->rkb_refcnt)

/** Acquire the broker's rkb_lock mutex. */
#define rd_kafka_broker_lock(broker) \
        mtx_lock(&(broker)->rkb_lock)

/** Release the broker's rkb_lock mutex. */
#define rd_kafka_broker_unlock(broker) \
        mtx_unlock(&(broker)->rkb_lock)
281 | |
282 | |
/**
 * @returns non-zero if \p state is RD_KAFKA_BROKER_STATE_UP or
 *          RD_KAFKA_BROKER_STATE_UPDATE, else 0.
 *
 * Other states ordered after UP (APIVERSION_QUERY, AUTH_HANDSHAKE)
 * are deliberately NOT considered up by this check.
 * \p state may be evaluated twice: pass an expression without side effects.
 */
#define rd_kafka_broker_state_is_up(state)                      \
        (RD_KAFKA_BROKER_STATE_UP == (state) ||                 \
         RD_KAFKA_BROKER_STATE_UPDATE == (state))
289 | |
290 | /** |
291 | * @brief Broker comparator |
292 | */ |
293 | static RD_UNUSED RD_INLINE int rd_kafka_broker_cmp (const void *_a, |
294 | const void *_b) { |
295 | const rd_kafka_broker_t *a = _a, *b = _b; |
296 | return (int)(a - b); |
297 | } |
298 | |
299 | |
300 | /** |
301 | * @returns true if broker supports \p features, else false. |
302 | */ |
303 | static RD_UNUSED |
304 | int rd_kafka_broker_supports (rd_kafka_broker_t *rkb, int features) { |
305 | int r; |
306 | rd_kafka_broker_lock(rkb); |
307 | r = (rkb->rkb_features & features) == features; |
308 | rd_kafka_broker_unlock(rkb); |
309 | return r; |
310 | } |
311 | |
/**
 * @brief Determine which version of \p ApiKey to use with \p rkb, within
 *        the range [\p minver, \p maxver].
 *
 * NOTE(review): presumably returns the highest mutually supported version,
 * or -1 if there is none, and stores the broker's feature set in
 * \p featuresp when it is non-NULL — confirm against the implementation.
 */
int16_t rd_kafka_broker_ApiVersion_supported (rd_kafka_broker_t *rkb,
                                              int16_t ApiKey,
                                              int16_t minver, int16_t maxver,
                                              int *featuresp);

/** @returns the broker's current state (an RD_KAFKA_BROKER_STATE_* value). */
int rd_kafka_broker_get_state (rd_kafka_broker_t *rkb);

/**
 * @brief Look up the broker with node id \p nodeid.
 *
 * @param state       Required broker state; rd_kafka_broker_find_by_nodeid()
 *                    passes -1, presumably meaning "any state" — confirm.
 * @param do_connect  Presumably whether to trigger a connection attempt
 *                    for the found broker — confirm.
 *
 * NOTE(review): the result is presumably a new reference (release with
 * rd_kafka_broker_destroy()) or NULL if not found — confirm.
 */
rd_kafka_broker_t *rd_kafka_broker_find_by_nodeid0 (rd_kafka_t *rk,
                                                    int32_t nodeid,
                                                    int state,
                                                    rd_bool_t do_connect);
/** Convenience wrapper: rd_kafka_broker_find_by_nodeid0() with
 *  state -1 and do_connect rd_false. */
#define rd_kafka_broker_find_by_nodeid(rk,nodeid) \
        rd_kafka_broker_find_by_nodeid0(rk,nodeid,-1,rd_false)
325 | |
326 | /** |
327 | * Filter out brokers that are currently in a blocking request. |
328 | */ |
329 | static RD_INLINE RD_UNUSED int |
330 | rd_kafka_broker_filter_non_blocking (rd_kafka_broker_t *rkb, void *opaque) { |
331 | return rd_atomic32_get(&rkb->rkb_blocking_request_cnt) > 0; |
332 | } |
333 | |
334 | |
335 | /** |
336 | * Filter out brokers that don't support Idempotent Producer. |
337 | */ |
338 | static RD_INLINE RD_UNUSED int |
339 | rd_kafka_broker_filter_non_idempotent (rd_kafka_broker_t *rkb, void *opaque) { |
340 | return !(rkb->rkb_features & RD_KAFKA_FEATURE_IDEMPOTENT_PRODUCER); |
341 | } |
342 | |
343 | |
344 | /** |
345 | * Filter out brokers that cant do GroupCoordinator requests right now. |
346 | */ |
347 | static RD_INLINE RD_UNUSED int |
348 | rd_kafka_broker_filter_can_group_query (rd_kafka_broker_t *rkb, void *opaque) { |
349 | return rd_atomic32_get(&rkb->rkb_blocking_request_cnt) > 0 || |
350 | !(rkb->rkb_features & RD_KAFKA_FEATURE_BROKER_GROUP_COORD); |
351 | } |
352 | |
/**
 * @brief Pick a broker in \p state that is not rejected by \p filter.
 *
 * \p filter (may presumably be NULL) returns non-zero to reject a broker,
 * as the rd_kafka_broker_filter_*() helpers in this header do; \p opaque
 * is passed through to it. \p reason is presumably used for logging.
 *
 * NOTE(review): the result is presumably a new reference or NULL — confirm.
 */
rd_kafka_broker_t *rd_kafka_broker_any (rd_kafka_t *rk, int state,
                                        int (*filter) (rd_kafka_broker_t *rkb,
                                                       void *opaque),
                                        void *opaque, const char *reason);

/**
 * @brief Find any usable broker, presumably waiting up to \p timeout_ms.
 *        \p do_lock presumably controls whether the rk lock is taken
 *        internally — confirm.
 */
rd_kafka_broker_t *rd_kafka_broker_any_usable (rd_kafka_t *rk, int timeout_ms,
                                               int do_lock, const char *reason);

/**
 * @brief Presumably returns broker \p broker_id if it is in \p state,
 *        else some other broker in \p state — confirm.
 */
rd_kafka_broker_t *rd_kafka_broker_prefer (rd_kafka_t *rk, int32_t broker_id,
                                           int state);

/**
 * @brief Asynchronous broker lookup by \p broker_id and \p state.
 *        Presumably \p eonce is triggered when the broker becomes
 *        available if it is not available right away — confirm.
 */
rd_kafka_broker_t *
rd_kafka_broker_get_async (rd_kafka_t *rk, int32_t broker_id, int state,
                           rd_kafka_enq_once_t *eonce);

/** @brief Get the controller broker in \p state, presumably blocking until
 *         \p abs_timeout — confirm. */
rd_kafka_broker_t *rd_kafka_broker_controller (rd_kafka_t *rk, int state,
                                               rd_ts_t abs_timeout);
/** @brief Asynchronous variant of rd_kafka_broker_controller(),
 *         presumably signalling \p eonce when retrying makes sense. */
rd_kafka_broker_t *
rd_kafka_broker_controller_async (rd_kafka_t *rk, int state,
                                  rd_kafka_enq_once_t *eonce);
373 | |
/** @brief Add the brokers listed in \p brokerlist. NOTE(review): presumably
 *         returns the number of brokers added — confirm. */
int rd_kafka_brokers_add0 (rd_kafka_t *rk, const char *brokerlist);
/** @brief Transition \p rkb to \p state (an RD_KAFKA_BROKER_STATE_* value). */
void rd_kafka_broker_set_state (rd_kafka_broker_t *rkb, int state);

/**
 * @brief Fail the broker (connection) with error \p err, logging at \p level.
 *        \p fmt and the variadic arguments presumably form a printf-style
 *        message — confirm.
 */
void rd_kafka_broker_fail (rd_kafka_broker_t *rkb,
                           int level, rd_kafka_resp_err_t err,
                           const char *fmt, ...);

/** @brief Handle a closed connection, with error \p err and message
 *         \p errstr. */
void rd_kafka_broker_conn_closed (rd_kafka_broker_t *rkb,
                                  rd_kafka_resp_err_t err,
                                  const char *errstr);

/** @brief Final destructor; invoked through rd_kafka_broker_destroy(). */
void rd_kafka_broker_destroy_final (rd_kafka_broker_t *rkb);

/** Drop a reference on \p rkb (see rd_kafka_broker_keep()).
 *  rd_refcnt_destroywrapper() presumably calls
 *  rd_kafka_broker_destroy_final() when the last reference is dropped. */
#define rd_kafka_broker_destroy(rkb)                                    \
        rd_refcnt_destroywrapper(&(rkb)->rkb_refcnt,                    \
                                 rd_kafka_broker_destroy_final(rkb))
390 | |
391 | |
/** @brief Update (or presumably add) a broker from metadata entry \p mdb,
 *         using security protocol \p proto. */
void rd_kafka_broker_update (rd_kafka_t *rk, rd_kafka_secproto_t proto,
                             const struct rd_kafka_metadata_broker *mdb);
/**
 * @brief Create a broker for \p name:\p port with node id \p nodeid.
 *
 * @param source  Origin of the broker entry (stored in rkb_source).
 * @param proto   Security protocol (stored in rkb_proto).
 */
rd_kafka_broker_t *rd_kafka_broker_add (rd_kafka_t *rk,
                                        rd_kafka_confsource_t source,
                                        rd_kafka_secproto_t proto,
                                        const char *name, uint16_t port,
                                        int32_t nodeid);

/** @brief Create a logical broker named \p name
 *         (see RD_KAFKA_BROKER_IS_LOGICAL()). */
rd_kafka_broker_t *rd_kafka_broker_add_logical (rd_kafka_t *rk,
                                                const char *name);

/** @returns true if broker is logical. No locking is needed. */
#define RD_KAFKA_BROKER_IS_LOGICAL(rkb) ((rkb)->rkb_source == RD_KAFKA_LOGICAL)

/** @brief Set the nodename of (logical) broker \p rkb from \p from_rkb;
 *         presumably bumps rkb_nodename_epoch — confirm. */
void rd_kafka_broker_set_nodename (rd_kafka_broker_t *rkb,
                                   rd_kafka_broker_t *from_rkb);

/** @brief Called when the connection to \p rkb is up. */
void rd_kafka_broker_connect_up (rd_kafka_broker_t *rkb);
/** @brief Called when a connection attempt completes;
 *         \p errstr presumably NULL on success — confirm. */
void rd_kafka_broker_connect_done (rd_kafka_broker_t *rkb, const char *errstr);
411 | |
/** @brief Transmit queued requests on \p rkb's connection. */
int rd_kafka_send (rd_kafka_broker_t *rkb);
/** @brief Receive from \p rkb's connection. */
int rd_kafka_recv (rd_kafka_broker_t *rkb);

/** @brief Emit delivery reports for the messages in \p rkmq with
 *         error \p err. */
void rd_kafka_dr_msgq (rd_kafka_itopic_t *rkt,
                       rd_kafka_msgq_t *rkmq, rd_kafka_resp_err_t err);

/** @brief Presumably acknowledge (as delivered) messages of \p rktp up to
 *         and including \p last_msgid — confirm. */
void rd_kafka_dr_implicit_ack (rd_kafka_broker_t *rkb,
                               rd_kafka_toppar_t *rktp,
                               uint64_t last_msgid);

/** @brief Enqueue request \p rkbuf on \p rkb; \p resp_cb is called with
 *         \p opaque for the response. */
void rd_kafka_broker_buf_enq1 (rd_kafka_broker_t *rkb,
                               rd_kafka_buf_t *rkbuf,
                               rd_kafka_resp_cb_t *resp_cb,
                               void *opaque);

/** @brief Like rd_kafka_broker_buf_enq1() but the response is routed
 *         through \p replyq. */
void rd_kafka_broker_buf_enq_replyq (rd_kafka_broker_t *rkb,
                                     rd_kafka_buf_t *rkbuf,
                                     rd_kafka_replyq_t replyq,
                                     rd_kafka_resp_cb_t *resp_cb,
                                     void *opaque);

/** @brief Schedule \p rkbuf for retry on \p rkb. */
void rd_kafka_broker_buf_retry (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf);


/** @returns the internal broker of \p rk (presumably a new reference, or
 *           NULL if there is none — confirm). */
rd_kafka_broker_t *rd_kafka_broker_internal (rd_kafka_t *rk);

/** @brief Debug helper: print \p msg labelled \p what; \p hexdump
 *         presumably enables a hex dump of the payload. */
void msghdr_print (rd_kafka_t *rk,
                   const char *what, const struct msghdr *msg,
                   int hexdump);

/** @returns the broker's display name. */
const char *rd_kafka_broker_name (rd_kafka_broker_t *rkb);
/** @brief Wake up \p rkb's thread (see rkb_wakeup_fd). */
void rd_kafka_broker_wakeup (rd_kafka_broker_t *rkb);
/** @brief Wake up all brokers in state >= \p min_state.
 *         NOTE(review): presumably returns the number woken — confirm. */
int rd_kafka_all_brokers_wakeup (rd_kafka_t *rk,
                                 int min_state);

/** @brief Trigger a connection to some broker; \p reason is for logging. */
void rd_kafka_connect_any (rd_kafka_t *rk, const char *reason);

/** @brief Purge \p rkb's request queues according to \p purge_flags,
 *         replying on \p replyq. */
void rd_kafka_broker_purge_queues (rd_kafka_broker_t *rkb, int purge_flags,
                                   rd_kafka_replyq_t replyq);

/*
 * Broker state-change notification. Presumably: a caller reads the current
 * version, then waits (synchronously or via \p eonce) until it changes
 * from \p stored_version; rd_kafka_brokers_broadcast_state_change() bumps
 * the version and wakes waiters — confirm.
 */
int rd_kafka_brokers_get_state_version (rd_kafka_t *rk);
int rd_kafka_brokers_wait_state_change (rd_kafka_t *rk, int stored_version,
					int timeout_ms);
int rd_kafka_brokers_wait_state_change_async (rd_kafka_t *rk,
                                              int stored_version,
                                              rd_kafka_enq_once_t *eonce);
void rd_kafka_brokers_broadcast_state_change (rd_kafka_t *rk);
459 | |
460 | |
461 | |
462 | /** |
463 | * Updates the current toppar active round-robin next pointer. |
464 | */ |
465 | static RD_INLINE RD_UNUSED |
466 | void rd_kafka_broker_active_toppar_next (rd_kafka_broker_t *rkb, |
467 | rd_kafka_toppar_t *sugg_next) { |
468 | if (CIRCLEQ_EMPTY(&rkb->rkb_active_toppars) || |
469 | (void *)sugg_next == CIRCLEQ_ENDC(&rkb->rkb_active_toppars)) |
470 | rkb->rkb_active_toppar_next = NULL; |
471 | else if (sugg_next) |
472 | rkb->rkb_active_toppar_next = sugg_next; |
473 | else |
474 | rkb->rkb_active_toppar_next = |
475 | CIRCLEQ_FIRST(&rkb->rkb_active_toppars); |
476 | } |
477 | |
478 | |
/** @brief Add \p rktp to \p rkb's active toppar list (rkb_active_toppars). */
void rd_kafka_broker_active_toppar_add (rd_kafka_broker_t *rkb,
                                        rd_kafka_toppar_t *rktp);

/** @brief Remove \p rktp from \p rkb's active toppar list. */
void rd_kafka_broker_active_toppar_del (rd_kafka_broker_t *rkb,
                                        rd_kafka_toppar_t *rktp);


/** @brief Schedule a connection attempt for \p rkb. */
void rd_kafka_broker_schedule_connection (rd_kafka_broker_t *rkb);

/**
 * @brief Increment persistent-connection demand counter \p acntp for \p rkb.
 *        \p acntp is presumably one of the rkb_persistconn atomic counters
 *        (e.g. &rkb->rkb_persistconn.coord) — confirm.
 */
void
rd_kafka_broker_persistent_connection_add (rd_kafka_broker_t *rkb,
                                           rd_atomic32_t *acntp);

/** @brief Decrement persistent-connection demand counter \p acntp for
 *         \p rkb; counterpart of rd_kafka_broker_persistent_connection_add(). */
void
rd_kafka_broker_persistent_connection_del (rd_kafka_broker_t *rkb,
                                           rd_atomic32_t *acntp);


/** @brief Broker unit tests. NOTE(review): presumably returns 0 on
 *         success — confirm. */
int unittest_broker (void);
498 | |
499 | #endif /* _RDKAFKA_BROKER_H_ */ |
500 | |