1/*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012-2015, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29
30
31#ifndef _MSC_VER
32#define _GNU_SOURCE
33/*
34 * AIX defines this and the value needs to be set correctly. For Solaris,
35 * src/rd.h defines _POSIX_SOURCE to be 200809L, which corresponds to XPG7,
36 * which itself is not compatible with _XOPEN_SOURCE on that platform.
37 */
38#if !defined(_AIX) && !defined(__sun)
39#define _XOPEN_SOURCE
40#endif
41#include <signal.h>
42#endif
43
44#include <stdio.h>
45#include <stdarg.h>
46#include <string.h>
47#include <ctype.h>
48
49#include "rd.h"
50#include "rdkafka_int.h"
51#include "rdkafka_msg.h"
52#include "rdkafka_msgset.h"
53#include "rdkafka_topic.h"
54#include "rdkafka_partition.h"
55#include "rdkafka_broker.h"
56#include "rdkafka_offset.h"
57#include "rdkafka_transport.h"
58#include "rdkafka_proto.h"
59#include "rdkafka_buf.h"
60#include "rdkafka_request.h"
61#include "rdkafka_sasl.h"
62#include "rdkafka_interceptor.h"
63#include "rdkafka_idempotence.h"
64#include "rdtime.h"
65#include "rdcrc32.h"
66#include "rdrand.h"
67#include "rdkafka_lz4.h"
68#if WITH_SSL
69#include <openssl/err.h>
70#endif
71#include "rdendian.h"
72#include "rdunittest.h"
73
74
75static const int rd_kafka_max_block_ms = 1000;
76
77const char *rd_kafka_broker_state_names[] = {
78 "INIT",
79 "DOWN",
80 "TRY_CONNECT",
81 "CONNECT",
82 "AUTH",
83 "UP",
84 "UPDATE",
85 "APIVERSION_QUERY",
86 "AUTH_HANDSHAKE"
87};
88
89const char *rd_kafka_secproto_names[] = {
90 [RD_KAFKA_PROTO_PLAINTEXT] = "plaintext",
91 [RD_KAFKA_PROTO_SSL] = "ssl",
92 [RD_KAFKA_PROTO_SASL_PLAINTEXT] = "sasl_plaintext",
93 [RD_KAFKA_PROTO_SASL_SSL] = "sasl_ssl",
94 NULL
95};
96
97
98
99/**
100 * @returns > 0 if a connection to this broker is needed, else 0.
101 * @locality broker thread
102 * @locks none
103 */
104static RD_INLINE int
105rd_kafka_broker_needs_connection (rd_kafka_broker_t *rkb) {
106 return rkb->rkb_state == RD_KAFKA_BROKER_STATE_INIT &&
107 (!rkb->rkb_rk->rk_conf.sparse_connections ||
108 rkb->rkb_persistconn.internal ||
109 rd_atomic32_get(&rkb->rkb_persistconn.coord));
110}
111
112
113static void rd_kafka_broker_handle_purge_queues (rd_kafka_broker_t *rkb,
114 rd_kafka_op_t *rko);
115
116
117
118
119#define rd_kafka_broker_terminating(rkb) \
120 (rd_refcnt_get(&(rkb)->rkb_refcnt) <= 1)
121
122
123/**
124 * Construct broker nodename.
125 */
126static void rd_kafka_mk_nodename (char *dest, size_t dsize,
127 const char *name, uint16_t port) {
128 rd_snprintf(dest, dsize, "%s:%hu", name, port);
129}
130
131/**
132 * Construct descriptive broker name
133 */
134static void rd_kafka_mk_brokername (char *dest, size_t dsize,
135 rd_kafka_secproto_t proto,
136 const char *nodename, int32_t nodeid,
137 rd_kafka_confsource_t source) {
138
139 /* Prepend protocol name to brokername, unless it is a
140 * standard plaintext or logical broker in which case we
141 * omit the protocol part. */
142 if (proto != RD_KAFKA_PROTO_PLAINTEXT &&
143 source != RD_KAFKA_LOGICAL) {
144 int r = rd_snprintf(dest, dsize, "%s://",
145 rd_kafka_secproto_names[proto]);
146 if (r >= (int)dsize) /* Skip proto name if it wont fit.. */
147 r = 0;
148
149 dest += r;
150 dsize -= r;
151 }
152
153 if (nodeid == RD_KAFKA_NODEID_UA)
154 rd_snprintf(dest, dsize, "%s%s",
155 nodename,
156 source == RD_KAFKA_LOGICAL ? "" :
157 (source == RD_KAFKA_INTERNAL ?
158 "/internal" : "/bootstrap"));
159 else
160 rd_snprintf(dest, dsize, "%s/%"PRId32, nodename, nodeid);
161}
162
163
164/**
165 * @brief Enable protocol feature(s) for the current broker.
166 *
167 * Locality: broker thread
168 */
169static void rd_kafka_broker_feature_enable (rd_kafka_broker_t *rkb,
170 int features) {
171 if (features & rkb->rkb_features)
172 return;
173
174 rkb->rkb_features |= features;
175 rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_PROTOCOL | RD_KAFKA_DBG_FEATURE,
176 "FEATURE",
177 "Updated enabled protocol features +%s to %s",
178 rd_kafka_features2str(features),
179 rd_kafka_features2str(rkb->rkb_features));
180}
181
182
183/**
184 * @brief Disable protocol feature(s) for the current broker.
185 *
186 * Locality: broker thread
187 */
188static void rd_kafka_broker_feature_disable (rd_kafka_broker_t *rkb,
189 int features) {
190 if (!(features & rkb->rkb_features))
191 return;
192
193 rkb->rkb_features &= ~features;
194 rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_PROTOCOL | RD_KAFKA_DBG_FEATURE,
195 "FEATURE",
196 "Updated enabled protocol features -%s to %s",
197 rd_kafka_features2str(features),
198 rd_kafka_features2str(rkb->rkb_features));
199}
200
201
202/**
203 * @brief Set protocol feature(s) for the current broker.
204 *
205 * @remark This replaces the previous feature set.
206 *
207 * @locality broker thread
208 * @locks rd_kafka_broker_lock()
209 */
210static void rd_kafka_broker_features_set (rd_kafka_broker_t *rkb, int features) {
211 if (rkb->rkb_features == features)
212 return;
213
214 rkb->rkb_features = features;
215 rd_rkb_dbg(rkb, BROKER, "FEATURE",
216 "Updated enabled protocol features to %s",
217 rd_kafka_features2str(rkb->rkb_features));
218}
219
220
221/**
222 * @brief Check and return supported ApiVersion for \p ApiKey.
223 *
224 * @returns the highest supported ApiVersion in the specified range (inclusive)
225 * or -1 if the ApiKey is not supported or no matching ApiVersion.
226 * The current feature set is also returned in \p featuresp
227 * @locks none
228 * @locality any
229 */
230int16_t rd_kafka_broker_ApiVersion_supported (rd_kafka_broker_t *rkb,
231 int16_t ApiKey,
232 int16_t minver, int16_t maxver,
233 int *featuresp) {
234 struct rd_kafka_ApiVersion skel = { .ApiKey = ApiKey };
235 struct rd_kafka_ApiVersion ret = RD_ZERO_INIT, *retp;
236
237 rd_kafka_broker_lock(rkb);
238 if (featuresp)
239 *featuresp = rkb->rkb_features;
240
241 if (rkb->rkb_features & RD_KAFKA_FEATURE_UNITTEST) {
242 /* For unit tests let the broker support everything. */
243 rd_kafka_broker_unlock(rkb);
244 return maxver;
245 }
246
247 retp = bsearch(&skel, rkb->rkb_ApiVersions, rkb->rkb_ApiVersions_cnt,
248 sizeof(*rkb->rkb_ApiVersions),
249 rd_kafka_ApiVersion_key_cmp);
250 if (retp)
251 ret = *retp;
252 rd_kafka_broker_unlock(rkb);
253
254 if (!retp)
255 return -1;
256
257 if (ret.MaxVer < maxver) {
258 if (ret.MaxVer < minver)
259 return -1;
260 else
261 return ret.MaxVer;
262 } else if (ret.MinVer > maxver)
263 return -1;
264 else
265 return maxver;
266}
267
268
269/**
270 * @brief Set broker state.
271 *
272 * \c rkb->rkb_state is the previous state, while
273 * \p state is the new state.
274 *
275 * @locks rd_kafka_broker_lock() MUST be held.
276 * @locality broker thread
277 */
278void rd_kafka_broker_set_state (rd_kafka_broker_t *rkb, int state) {
279 if ((int)rkb->rkb_state == state)
280 return;
281
282 rd_kafka_dbg(rkb->rkb_rk, BROKER, "STATE",
283 "%s: Broker changed state %s -> %s",
284 rkb->rkb_name,
285 rd_kafka_broker_state_names[rkb->rkb_state],
286 rd_kafka_broker_state_names[state]);
287
288 if (rkb->rkb_source == RD_KAFKA_INTERNAL) {
289 /* no-op */
290 } else if (state == RD_KAFKA_BROKER_STATE_DOWN &&
291 !rkb->rkb_down_reported &&
292 rkb->rkb_state != RD_KAFKA_BROKER_STATE_APIVERSION_QUERY) {
293 /* Propagate ALL_BROKERS_DOWN event if all brokers are
294 * now down, unless we're terminating.
295 * Dont do this if we're querying for ApiVersion since it
296 * is bound to fail once on older brokers. */
297 if (rd_atomic32_add(&rkb->rkb_rk->rk_broker_down_cnt, 1) ==
298 rd_atomic32_get(&rkb->rkb_rk->rk_broker_cnt) -
299 rd_atomic32_get(&rkb->rkb_rk->rk_broker_addrless_cnt) &&
300 !rd_kafka_terminating(rkb->rkb_rk))
301 rd_kafka_op_err(rkb->rkb_rk,
302 RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN,
303 "%i/%i brokers are down",
304 rd_atomic32_get(&rkb->rkb_rk->
305 rk_broker_down_cnt),
306 rd_atomic32_get(&rkb->rkb_rk->
307 rk_broker_cnt) -
308 rd_atomic32_get(&rkb->rkb_rk->
309 rk_broker_addrless_cnt));
310 rkb->rkb_down_reported = 1;
311
312 } else if (state >= RD_KAFKA_BROKER_STATE_UP &&
313 rkb->rkb_down_reported) {
314 rd_atomic32_sub(&rkb->rkb_rk->rk_broker_down_cnt, 1);
315 rkb->rkb_down_reported = 0;
316 }
317
318 if (rkb->rkb_source != RD_KAFKA_INTERNAL) {
319 if (rd_kafka_broker_state_is_up(state) &&
320 !rd_kafka_broker_state_is_up(rkb->rkb_state)) {
321 rd_atomic32_add(&rkb->rkb_rk->rk_broker_up_cnt, 1);
322 if (RD_KAFKA_BROKER_IS_LOGICAL(rkb))
323 rd_atomic32_add(&rkb->rkb_rk->
324 rk_logical_broker_up_cnt, 1);
325
326 } else if (rd_kafka_broker_state_is_up(rkb->rkb_state) &&
327 !rd_kafka_broker_state_is_up(state)) {
328 rd_atomic32_sub(&rkb->rkb_rk->rk_broker_up_cnt, 1);
329 if (RD_KAFKA_BROKER_IS_LOGICAL(rkb))
330 rd_atomic32_sub(&rkb->rkb_rk->
331 rk_logical_broker_up_cnt, 1);
332 }
333 }
334
335 rkb->rkb_state = state;
336 rkb->rkb_ts_state = rd_clock();
337
338 rd_kafka_brokers_broadcast_state_change(rkb->rkb_rk);
339}
340
341
342/**
343 * @brief Locks broker, acquires the states, unlocks, and returns
344 * the state.
345 * @locks !broker_lock
346 * @locality any
347 */
348int rd_kafka_broker_get_state (rd_kafka_broker_t *rkb) {
349 int state;
350 rd_kafka_broker_lock(rkb);
351 state = rkb->rkb_state;
352 rd_kafka_broker_unlock(rkb);
353 return state;
354}
355
356
357/**
358 * Failure propagation to application.
359 * Will tear down connection to broker and trigger a reconnect.
360 *
361 * If 'fmt' is NULL nothing will be logged or propagated to the application.
362 *
363 * \p level is the log level, <=LOG_INFO will be logged while =LOG_DEBUG will
364 * be debug-logged.
365 *
366 * Locality: Broker thread
367 */
368void rd_kafka_broker_fail (rd_kafka_broker_t *rkb,
369 int level, rd_kafka_resp_err_t err,
370 const char *fmt, ...) {
371 va_list ap;
372 int errno_save = errno;
373 rd_kafka_bufq_t tmpq_waitresp, tmpq;
374 int old_state;
375
376 rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
377
378 rd_kafka_dbg(rkb->rkb_rk, BROKER | RD_KAFKA_DBG_PROTOCOL, "BROKERFAIL",
379 "%s: failed: err: %s: (errno: %s)",
380 rkb->rkb_name, rd_kafka_err2str(err),
381 rd_strerror(errno_save));
382
383 rkb->rkb_err.err = errno_save;
384
385 if (rkb->rkb_transport) {
386 rd_kafka_transport_close(rkb->rkb_transport);
387 rkb->rkb_transport = NULL;
388
389 if (rkb->rkb_state >= RD_KAFKA_BROKER_STATE_UP)
390 rd_atomic32_add(&rkb->rkb_c.disconnects, 1);
391 }
392
393 rkb->rkb_req_timeouts = 0;
394
395 if (rkb->rkb_recv_buf) {
396 rd_kafka_buf_destroy(rkb->rkb_recv_buf);
397 rkb->rkb_recv_buf = NULL;
398 }
399
400 rd_kafka_broker_lock(rkb);
401
402 /* The caller may omit the format if it thinks this is a recurring
403 * failure, in which case the following things are omitted:
404 * - log message
405 * - application OP_ERR
406 * - metadata request
407 *
408 * Dont log anything if this was the termination signal, or if the
409 * socket disconnected while trying ApiVersionRequest.
410 */
411 if (fmt &&
412 !(errno_save == EINTR &&
413 rd_kafka_terminating(rkb->rkb_rk)) &&
414 !(err == RD_KAFKA_RESP_ERR__TRANSPORT &&
415 rkb->rkb_state == RD_KAFKA_BROKER_STATE_APIVERSION_QUERY)) {
416 int of;
417
418 /* Insert broker name in log message if it fits. */
419 of = rd_snprintf(rkb->rkb_err.msg, sizeof(rkb->rkb_err.msg),
420 "%s: ", rkb->rkb_name);
421 if (of >= (int)sizeof(rkb->rkb_err.msg))
422 of = 0;
423 va_start(ap, fmt);
424 rd_vsnprintf(rkb->rkb_err.msg+of,
425 sizeof(rkb->rkb_err.msg)-of, fmt, ap);
426 va_end(ap);
427
428 /* Append time since last state change
429 * to help debug connection issues */
430 of = (int)strlen(rkb->rkb_err.msg);
431 if (of + 30 < (int)sizeof(rkb->rkb_err.msg))
432 rd_snprintf(rkb->rkb_err.msg+of,
433 sizeof(rkb->rkb_err.msg)-of,
434 " (after %"PRId64"ms in state %s)",
435 (rd_clock() - rkb->rkb_ts_state)/1000,
436 rd_kafka_broker_state_names[rkb->
437 rkb_state]);
438
439 if (level >= LOG_DEBUG)
440 rd_kafka_dbg(rkb->rkb_rk, BROKER, "FAIL",
441 "%s", rkb->rkb_err.msg);
442 else {
443 /* Don't log if an error callback is registered,
444 * or the error event is enabled. */
445 if (!(rkb->rkb_rk->rk_conf.enabled_events &
446 RD_KAFKA_EVENT_ERROR))
447 rd_kafka_log(rkb->rkb_rk, level, "FAIL",
448 "%s", rkb->rkb_err.msg);
449 /* Send ERR op back to application for processing. */
450 rd_kafka_op_err(rkb->rkb_rk, err,
451 "%s", rkb->rkb_err.msg);
452 }
453 }
454
455 /* If we're currently asking for ApiVersion and the connection
456 * went down it probably means the broker does not support that request
457 * and tore down the connection. In this case we disable that feature flag. */
458 if (rkb->rkb_state == RD_KAFKA_BROKER_STATE_APIVERSION_QUERY)
459 rd_kafka_broker_feature_disable(rkb, RD_KAFKA_FEATURE_APIVERSION);
460
461 /* Set broker state */
462 old_state = rkb->rkb_state;
463 rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_DOWN);
464
465 /* Unlock broker since a requeue will try to lock it. */
466 rd_kafka_broker_unlock(rkb);
467
468 /*
469 * Purge all buffers
470 * (put bufs on a temporary queue since bufs may be requeued,
471 * make sure outstanding requests are re-enqueued before
472 * bufs on outbufs queue.)
473 */
474 rd_kafka_bufq_init(&tmpq_waitresp);
475 rd_kafka_bufq_init(&tmpq);
476 rd_kafka_bufq_concat(&tmpq_waitresp, &rkb->rkb_waitresps);
477 rd_kafka_bufq_concat(&tmpq, &rkb->rkb_outbufs);
478 rd_atomic32_init(&rkb->rkb_blocking_request_cnt, 0);
479
480 /* Purge the in-flight buffers (might get re-enqueued in case
481 * of retries). */
482 rd_kafka_bufq_purge(rkb, &tmpq_waitresp, err);
483
484 /* Purge the waiting-in-output-queue buffers,
485 * might also get re-enqueued. */
486 rd_kafka_bufq_purge(rkb, &tmpq,
487 /* If failure was caused by a timeout,
488 * adjust the error code for in-queue requests. */
489 err == RD_KAFKA_RESP_ERR__TIMED_OUT ?
490 RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE : err);
491
492 /* Update bufq for connection reset:
493 * - Purge connection-setup requests from outbufs since they will be
494 * reissued on the next connect.
495 * - Reset any partially sent buffer's offset.
496 */
497 rd_kafka_bufq_connection_reset(rkb, &rkb->rkb_outbufs);
498
499 /* Extra debugging for tracking termination-hang issues:
500 * show what is keeping this broker from decommissioning. */
501 if (rd_kafka_terminating(rkb->rkb_rk) &&
502 !rd_kafka_broker_terminating(rkb)) {
503 rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_PROTOCOL, "BRKTERM",
504 "terminating: broker still has %d refcnt(s), "
505 "%"PRId32" buffer(s), %d partition(s)",
506 rd_refcnt_get(&rkb->rkb_refcnt),
507 rd_kafka_bufq_cnt(&rkb->rkb_outbufs),
508 rkb->rkb_toppar_cnt);
509 rd_kafka_bufq_dump(rkb, "BRKOUTBUFS", &rkb->rkb_outbufs);
510#if ENABLE_SHAREDPTR_DEBUG
511 if (rd_refcnt_get(&rkb->rkb_refcnt) > 1) {
512 rd_rkb_dbg(rkb, BROKER, "BRKTERM",
513 "Dumping shared pointers: "
514 "this broker is %p", rkb);
515 rd_shared_ptrs_dump();
516 }
517#endif
518 }
519
520
521 /* Query for topic leaders to quickly pick up on failover. */
522 if (fmt && err != RD_KAFKA_RESP_ERR__DESTROY &&
523 old_state >= RD_KAFKA_BROKER_STATE_UP)
524 rd_kafka_metadata_refresh_known_topics(rkb->rkb_rk, NULL,
525 1/*force*/,
526 "broker down");
527}
528
529
530
531/**
532 * @brief Handle broker connection close.
533 *
534 * @locality broker thread
535 */
536void rd_kafka_broker_conn_closed (rd_kafka_broker_t *rkb,
537 rd_kafka_resp_err_t err,
538 const char *errstr) {
539 int log_level = LOG_ERR;
540
541 if (!rkb->rkb_rk->rk_conf.log_connection_close) {
542 /* Silence all connection closes */
543 log_level = LOG_DEBUG;
544
545 } else {
546 /* Silence close logs for connections that are idle,
547 * it is most likely the broker's idle connection
548 * reaper kicking in.
549 *
550 * Indications there might be an error and not an
551 * idle disconnect:
552 * - If the connection age is low a disconnect
553 * typically indicates a failure, such as protocol mismatch.
554 * - If the connection hasn't been idle long enough.
555 * - There are outstanding requests, or requests enqueued.
556 *
557 * For non-idle connections, adjust log level:
558 * - requests in-flight: LOG_WARNING
559 * - else: LOG_INFO
560 */
561 rd_ts_t now = rd_clock();
562 rd_ts_t minidle =
563 RD_MAX(60*1000/*60s*/,
564 rkb->rkb_rk->rk_conf.socket_timeout_ms) * 1000;
565 int inflight = rd_kafka_bufq_cnt(&rkb->rkb_waitresps);
566 int inqueue = rd_kafka_bufq_cnt(&rkb->rkb_outbufs);
567
568 if (rkb->rkb_ts_state + minidle < now &&
569 rkb->rkb_ts_tx_last + minidle < now &&
570 inflight + inqueue == 0)
571 log_level = LOG_DEBUG;
572 else if (inflight > 1)
573 log_level = LOG_WARNING;
574 else
575 log_level = LOG_INFO;
576 }
577
578 rd_kafka_broker_fail(rkb, log_level, err, "%s", errstr);
579}
580
581
582/**
583 * @brief Purge requests in \p rkbq matching request \p ApiKey
584 * and partition \p rktp.
585 *
586 * @warning ApiKey must be RD_KAFKAP_Produce
587 *
588 * @returns the number of purged buffers.
589 *
590 * @locality broker thread
591 */
592static int
593rd_kafka_broker_bufq_purge_by_toppar (rd_kafka_broker_t *rkb,
594 rd_kafka_bufq_t *rkbq,
595 int64_t ApiKey,
596 rd_kafka_toppar_t *rktp,
597 rd_kafka_resp_err_t err) {
598 rd_kafka_buf_t *rkbuf, *tmp;
599 int cnt = 0;
600
601 rd_assert(ApiKey == RD_KAFKAP_Produce);
602
603 TAILQ_FOREACH_SAFE(rkbuf, &rkbq->rkbq_bufs, rkbuf_link, tmp) {
604
605 if (rkbuf->rkbuf_reqhdr.ApiKey != ApiKey ||
606 rd_kafka_toppar_s2i(rkbuf->rkbuf_u.Produce.
607 batch.s_rktp) != rktp||
608 /* Skip partially sent buffers and let them transmit.
609 * The alternative would be to kill the connection here,
610 * which is more drastic and costly. */
611 rd_slice_offset(&rkbuf->rkbuf_reader) > 0)
612 continue;
613
614 rd_kafka_bufq_deq(rkbq, rkbuf);
615
616 rd_kafka_buf_callback(rkb->rkb_rk, rkb, err, NULL, rkbuf);
617 cnt++;
618 }
619
620 return cnt;
621}
622
623
624/**
625 * Scan bufq for buffer timeouts, trigger buffer callback on timeout.
626 *
627 * If \p partial_cntp is non-NULL any partially sent buffers will increase
628 * the provided counter by 1.
629 *
630 * @param ApiKey Only match requests with this ApiKey, or -1 for all.
631 * @param now If 0, all buffers will time out, else the current clock.
632 * @param description "N requests timed out <description>", e.g., "in flight".
633 * Only used if log_first_n > 0.
634 * @param log_first_n Log the first N request timeouts.
635 *
636 * @returns the number of timed out buffers.
637 *
638 * @locality broker thread
639 */
640static int rd_kafka_broker_bufq_timeout_scan (rd_kafka_broker_t *rkb,
641 int is_waitresp_q,
642 rd_kafka_bufq_t *rkbq,
643 int *partial_cntp,
644 int16_t ApiKey,
645 rd_kafka_resp_err_t err,
646 rd_ts_t now,
647 const char *description,
648 int log_first_n) {
649 rd_kafka_buf_t *rkbuf, *tmp;
650 int cnt = 0;
651 int idx = -1;
652 const rd_kafka_buf_t *holb = TAILQ_FIRST(&rkbq->rkbq_bufs);
653
654 TAILQ_FOREACH_SAFE(rkbuf, &rkbq->rkbq_bufs, rkbuf_link, tmp) {
655 idx++;
656
657 if (likely(now && rkbuf->rkbuf_ts_timeout > now))
658 continue;
659
660 if (ApiKey != -1 && rkbuf->rkbuf_reqhdr.ApiKey != ApiKey)
661 continue;
662
663 if (partial_cntp && rd_slice_offset(&rkbuf->rkbuf_reader) > 0)
664 (*partial_cntp)++;
665
666 /* Convert rkbuf_ts_sent to elapsed time since request */
667 if (rkbuf->rkbuf_ts_sent)
668 rkbuf->rkbuf_ts_sent = now - rkbuf->rkbuf_ts_sent;
669 else
670 rkbuf->rkbuf_ts_sent = now - rkbuf->rkbuf_ts_enq;
671
672 rd_kafka_bufq_deq(rkbq, rkbuf);
673
674 if (now && cnt < log_first_n) {
675 char holbstr[128];
676 /* Head of line blocking:
677 * If this is not the first request in queue, but the
678 * initial first request did not time out,
679 * it typically means the first request is a
680 * long-running blocking one, holding up the
681 * sub-sequent requests.
682 * In this case log what is likely holding up the
683 * requests and what caused this request to time out. */
684 if (holb && holb == TAILQ_FIRST(&rkbq->rkbq_bufs)) {
685 rd_snprintf(holbstr, sizeof(holbstr),
686 ": possibly held back by "
687 "preceeding%s %sRequest with "
688 "timeout in %dms",
689 (holb->rkbuf_flags &
690 RD_KAFKA_OP_F_BLOCKING) ?
691 " blocking" : "",
692 rd_kafka_ApiKey2str(holb->
693 rkbuf_reqhdr.
694 ApiKey),
695 (int)((holb->rkbuf_ts_timeout -
696 now) / 1000));
697 /* Only log the HOLB once */
698 holb = NULL;
699 } else {
700 *holbstr = '\0';
701 }
702
703 rd_rkb_log(rkb, LOG_NOTICE, "REQTMOUT",
704 "Timed out %sRequest %s "
705 "(after %"PRId64"ms, timeout #%d)%s",
706 rd_kafka_ApiKey2str(rkbuf->rkbuf_reqhdr.
707 ApiKey),
708 description, rkbuf->rkbuf_ts_sent/1000, cnt,
709 holbstr);
710 }
711
712 if (is_waitresp_q && rkbuf->rkbuf_flags & RD_KAFKA_OP_F_BLOCKING
713 && rd_atomic32_sub(&rkb->rkb_blocking_request_cnt, 1) == 0)
714 rd_kafka_brokers_broadcast_state_change(rkb->rkb_rk);
715
716 rd_kafka_buf_callback(rkb->rkb_rk, rkb, err, NULL, rkbuf);
717 cnt++;
718 }
719
720 return cnt;
721}
722
723
724/**
725 * Scan the wait-response and outbuf queues for message timeouts.
726 *
727 * Locality: Broker thread
728 */
729static void rd_kafka_broker_timeout_scan (rd_kafka_broker_t *rkb, rd_ts_t now) {
730 int inflight_cnt, retry_cnt, outq_cnt;
731 int partial_cnt = 0;
732
733 rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
734
735 /* In-flight requests waiting for response */
736 inflight_cnt = rd_kafka_broker_bufq_timeout_scan(
737 rkb, 1, &rkb->rkb_waitresps, NULL, -1,
738 RD_KAFKA_RESP_ERR__TIMED_OUT, now, "in flight", 5);
739 /* Requests in retry queue */
740 retry_cnt = rd_kafka_broker_bufq_timeout_scan(
741 rkb, 0, &rkb->rkb_retrybufs, NULL, -1,
742 RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE, now, "in retry queue", 0);
743 /* Requests in local queue not sent yet.
744 * partial_cnt is included in outq_cnt and denotes a request
745 * that has been partially transmitted. */
746 outq_cnt = rd_kafka_broker_bufq_timeout_scan(
747 rkb, 0, &rkb->rkb_outbufs, &partial_cnt, -1,
748 RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE, now, "in output queue", 0);
749
750 if (inflight_cnt + retry_cnt + outq_cnt + partial_cnt > 0) {
751 rd_rkb_log(rkb, LOG_WARNING, "REQTMOUT",
752 "Timed out %i in-flight, %i retry-queued, "
753 "%i out-queue, %i partially-sent requests",
754 inflight_cnt, retry_cnt, outq_cnt, partial_cnt);
755
756 rkb->rkb_req_timeouts += inflight_cnt + outq_cnt;
757 rd_atomic64_add(&rkb->rkb_c.req_timeouts,
758 inflight_cnt + outq_cnt);
759
760 /* If this was a partially sent request that timed out, or the
761 * number of timed out requests have reached the
762 * socket.max.fails threshold, we need to take down the
763 * connection. */
764 if (partial_cnt > 0 ||
765 (rkb->rkb_rk->rk_conf.socket_max_fails &&
766 rkb->rkb_req_timeouts >=
767 rkb->rkb_rk->rk_conf.socket_max_fails &&
768 rkb->rkb_state >= RD_KAFKA_BROKER_STATE_UP)) {
769 char rttinfo[32];
770 /* Print average RTT (if avail) to help diagnose. */
771 rd_avg_calc(&rkb->rkb_avg_rtt, now);
772 if (rkb->rkb_avg_rtt.ra_v.avg)
773 rd_snprintf(rttinfo, sizeof(rttinfo),
774 " (average rtt %.3fms)",
775 (float)(rkb->rkb_avg_rtt.ra_v.avg/
776 1000.0f));
777 else
778 rttinfo[0] = 0;
779 errno = ETIMEDOUT;
780 rd_kafka_broker_fail(rkb, LOG_ERR,
781 RD_KAFKA_RESP_ERR__TIMED_OUT,
782 "%i request(s) timed out: "
783 "disconnect%s",
784 rkb->rkb_req_timeouts, rttinfo);
785 }
786 }
787}
788
789
790
791static ssize_t
792rd_kafka_broker_send (rd_kafka_broker_t *rkb, rd_slice_t *slice) {
793 ssize_t r;
794 char errstr[128];
795
796 rd_kafka_assert(rkb->rkb_rk, rkb->rkb_state >= RD_KAFKA_BROKER_STATE_UP);
797 rd_kafka_assert(rkb->rkb_rk, rkb->rkb_transport);
798
799 r = rd_kafka_transport_send(rkb->rkb_transport, slice,
800 errstr, sizeof(errstr));
801
802 if (r == -1) {
803 rd_kafka_broker_fail(rkb, LOG_ERR, RD_KAFKA_RESP_ERR__TRANSPORT,
804 "Send failed: %s", errstr);
805 rd_atomic64_add(&rkb->rkb_c.tx_err, 1);
806 return -1;
807 }
808
809 rd_atomic64_add(&rkb->rkb_c.tx_bytes, r);
810 rd_atomic64_add(&rkb->rkb_c.tx, 1);
811 return r;
812}
813
814
815
816
817static int rd_kafka_broker_resolve (rd_kafka_broker_t *rkb,
818 const char *nodename) {
819 const char *errstr;
820 int save_idx = 0;
821
822 if (!*nodename && rkb->rkb_source == RD_KAFKA_LOGICAL) {
823 rd_kafka_broker_fail(rkb, LOG_DEBUG,
824 RD_KAFKA_RESP_ERR__RESOLVE,
825 "Logical broker has no address yet");
826 return -1;
827 }
828
829 if (rkb->rkb_rsal &&
830 rkb->rkb_ts_rsal_last + (rkb->rkb_rk->rk_conf.broker_addr_ttl*1000)
831 < rd_clock()) {
832 /* Address list has expired. */
833
834 /* Save the address index to make sure we still round-robin
835 * if we get the same address list back */
836 save_idx = rkb->rkb_rsal->rsal_curr;
837
838 rd_sockaddr_list_destroy(rkb->rkb_rsal);
839 rkb->rkb_rsal = NULL;
840 }
841
842 if (!rkb->rkb_rsal) {
843 /* Resolve */
844 rkb->rkb_rsal = rd_getaddrinfo(rkb->rkb_nodename,
845 RD_KAFKA_PORT_STR,
846 AI_ADDRCONFIG,
847 rkb->rkb_rk->rk_conf.
848 broker_addr_family,
849 SOCK_STREAM,
850 IPPROTO_TCP, &errstr);
851
852 if (!rkb->rkb_rsal) {
853 rd_kafka_broker_fail(rkb, LOG_ERR,
854 RD_KAFKA_RESP_ERR__RESOLVE,
855 /* Avoid duplicate log messages */
856 rkb->rkb_err.err == errno ?
857 NULL :
858 "Failed to resolve '%s': %s",
859 nodename, errstr);
860 return -1;
861 } else {
862 rkb->rkb_ts_rsal_last = rd_clock();
863 /* Continue at previous round-robin position */
864 if (rkb->rkb_rsal->rsal_cnt > save_idx)
865 rkb->rkb_rsal->rsal_curr = save_idx;
866 }
867 }
868
869 return 0;
870}
871
872
873static void rd_kafka_broker_buf_enq0 (rd_kafka_broker_t *rkb,
874 rd_kafka_buf_t *rkbuf) {
875 rd_ts_t now;
876
877 rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
878
879 if (rkb->rkb_rk->rk_conf.sparse_connections &&
880 rkb->rkb_state == RD_KAFKA_BROKER_STATE_INIT) {
881 /* Sparse connections:
882 * Trigger connection when a new request is enqueued. */
883 rkb->rkb_persistconn.internal++;
884 rd_kafka_broker_lock(rkb);
885 rd_kafka_broker_set_state(rkb,
886 RD_KAFKA_BROKER_STATE_TRY_CONNECT);
887 rd_kafka_broker_unlock(rkb);
888 }
889
890 now = rd_clock();
891 rkbuf->rkbuf_ts_enq = now;
892 rkbuf->rkbuf_flags &= ~RD_KAFKA_OP_F_SENT;
893
894 /* Calculate request attempt timeout */
895 rd_kafka_buf_calc_timeout(rkb->rkb_rk, rkbuf, now);
896
897 if (likely(rkbuf->rkbuf_prio == RD_KAFKA_PRIO_NORMAL)) {
898 /* Insert request at tail of queue */
899 TAILQ_INSERT_TAIL(&rkb->rkb_outbufs.rkbq_bufs,
900 rkbuf, rkbuf_link);
901
902 } else {
903 /* Insert request after any requests with a higher or
904 * equal priority.
905 * Also make sure the request is after added any partially
906 * sent request (of any prio).
907 * We need to check if buf corrid is set rather than
908 * rkbuf_of since SSL_write may return 0 and expect the
909 * exact same arguments the next call. */
910 rd_kafka_buf_t *prev, *after = NULL;
911
912 TAILQ_FOREACH(prev, &rkb->rkb_outbufs.rkbq_bufs, rkbuf_link) {
913 if (prev->rkbuf_prio < rkbuf->rkbuf_prio &&
914 prev->rkbuf_corrid == 0)
915 break;
916 after = prev;
917 }
918
919 if (after)
920 TAILQ_INSERT_AFTER(&rkb->rkb_outbufs.rkbq_bufs,
921 after, rkbuf, rkbuf_link);
922 else
923 TAILQ_INSERT_HEAD(&rkb->rkb_outbufs.rkbq_bufs,
924 rkbuf, rkbuf_link);
925 }
926
927 rd_atomic32_add(&rkb->rkb_outbufs.rkbq_cnt, 1);
928 if (rkbuf->rkbuf_reqhdr.ApiKey == RD_KAFKAP_Produce)
929 rd_atomic32_add(&rkb->rkb_outbufs.rkbq_msg_cnt,
930 rd_kafka_msgq_len(&rkbuf->rkbuf_batch.msgq));
931}
932
933
934/**
935 * Finalize a stuffed rkbuf for sending to broker.
936 */
937static void rd_kafka_buf_finalize (rd_kafka_t *rk, rd_kafka_buf_t *rkbuf) {
938 size_t totsize;
939
940 /* Calculate total request buffer length. */
941 totsize = rd_buf_len(&rkbuf->rkbuf_buf) - 4;
942 rd_assert(totsize <= (size_t)rk->rk_conf.max_msg_size);
943
944 /* Set up a buffer reader for sending the buffer. */
945 rd_slice_init_full(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf);
946
947 /**
948 * Update request header fields
949 */
950 /* Total reuqest length */
951 rd_kafka_buf_update_i32(rkbuf, 0, (int32_t)totsize);
952
953 /* ApiVersion */
954 rd_kafka_buf_update_i16(rkbuf, 4+2, rkbuf->rkbuf_reqhdr.ApiVersion);
955}
956
957
958void rd_kafka_broker_buf_enq1 (rd_kafka_broker_t *rkb,
959 rd_kafka_buf_t *rkbuf,
960 rd_kafka_resp_cb_t *resp_cb,
961 void *opaque) {
962
963
964 rkbuf->rkbuf_cb = resp_cb;
965 rkbuf->rkbuf_opaque = opaque;
966
967 rd_kafka_buf_finalize(rkb->rkb_rk, rkbuf);
968
969 rd_kafka_broker_buf_enq0(rkb, rkbuf);
970}
971
972
973/**
974 * Enqueue buffer on broker's xmit queue, but fail buffer immediately
975 * if broker is not up.
976 *
977 * Locality: broker thread
978 */
979static int rd_kafka_broker_buf_enq2 (rd_kafka_broker_t *rkb,
980 rd_kafka_buf_t *rkbuf) {
981 if (unlikely(rkb->rkb_source == RD_KAFKA_INTERNAL)) {
982 /* Fail request immediately if this is the internal broker. */
983 rd_kafka_buf_callback(rkb->rkb_rk, rkb,
984 RD_KAFKA_RESP_ERR__TRANSPORT,
985 NULL, rkbuf);
986 return -1;
987 }
988
989 rd_kafka_broker_buf_enq0(rkb, rkbuf);
990
991 return 0;
992}
993
994
995
996/**
997 * Enqueue buffer for tranmission.
998 * Responses are enqueued on 'replyq' (RD_KAFKA_OP_RECV_BUF)
999 *
1000 * Locality: any thread
1001 */
1002void rd_kafka_broker_buf_enq_replyq (rd_kafka_broker_t *rkb,
1003 rd_kafka_buf_t *rkbuf,
1004 rd_kafka_replyq_t replyq,
1005 rd_kafka_resp_cb_t *resp_cb,
1006 void *opaque) {
1007
1008 assert(rkbuf->rkbuf_rkb == rkb);
1009 if (resp_cb) {
1010 rkbuf->rkbuf_replyq = replyq;
1011 rkbuf->rkbuf_cb = resp_cb;
1012 rkbuf->rkbuf_opaque = opaque;
1013 } else {
1014 rd_dassert(!replyq.q);
1015 }
1016
1017 rd_kafka_buf_finalize(rkb->rkb_rk, rkbuf);
1018
1019
1020 if (thrd_is_current(rkb->rkb_thread)) {
1021 rd_kafka_broker_buf_enq2(rkb, rkbuf);
1022
1023 } else {
1024 rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_XMIT_BUF);
1025 rko->rko_u.xbuf.rkbuf = rkbuf;
1026 rd_kafka_q_enq(rkb->rkb_ops, rko);
1027 }
1028}
1029
1030
1031
1032
1033/**
1034 * @returns the current broker state change version.
1035 * Pass this value to future rd_kafka_brokers_wait_state_change() calls
1036 * to avoid the race condition where a state-change happens between
1037 * an initial call to some API that fails and the sub-sequent
1038 * .._wait_state_change() call.
1039 */
1040int rd_kafka_brokers_get_state_version (rd_kafka_t *rk) {
1041 int version;
1042 mtx_lock(&rk->rk_broker_state_change_lock);
1043 version = rk->rk_broker_state_change_version;
1044 mtx_unlock(&rk->rk_broker_state_change_lock);
1045 return version;
1046}
1047
1048/**
1049 * @brief Wait at most \p timeout_ms for any state change for any broker.
1050 * \p stored_version is the value previously returned by
1051 * rd_kafka_brokers_get_state_version() prior to another API call
1052 * that failed due to invalid state.
1053 *
1054 * Triggers:
1055 * - broker state changes
1056 * - broker transitioning from blocking to non-blocking
1057 * - partition leader changes
1058 * - group state changes
1059 *
1060 * @remark There is no guarantee that a state change actually took place.
1061 *
1062 * @returns 1 if a state change was signaled (maybe), else 0 (timeout)
1063 *
1064 * @locality any thread
1065 */
1066int rd_kafka_brokers_wait_state_change (rd_kafka_t *rk, int stored_version,
1067 int timeout_ms) {
1068 int r;
1069 mtx_lock(&rk->rk_broker_state_change_lock);
1070 if (stored_version != rk->rk_broker_state_change_version)
1071 r = 1;
1072 else
1073 r = cnd_timedwait_ms(&rk->rk_broker_state_change_cnd,
1074 &rk->rk_broker_state_change_lock,
1075 timeout_ms) == thrd_success;
1076 mtx_unlock(&rk->rk_broker_state_change_lock);
1077 return r;
1078}
1079
1080
1081/**
1082 * @brief Same as rd_kafka_brokers_wait_state_change() but will trigger
1083 * the wakeup asynchronously through the provided \p eonce.
1084 *
1085 * If the eonce was added to the wait list its reference count
1086 * will have been updated, this reference is later removed by
1087 * rd_kafka_broker_state_change_trigger_eonce() by calling trigger().
1088 *
1089 * @returns 1 if the \p eonce was added to the wait-broker-state-changes list,
1090 * or 0 if the \p stored_version is outdated in which case the
1091 * caller should redo the broker lookup.
1092 */
1093int rd_kafka_brokers_wait_state_change_async (rd_kafka_t *rk,
1094 int stored_version,
1095 rd_kafka_enq_once_t *eonce) {
1096 int r = 1;
1097 mtx_lock(&rk->rk_broker_state_change_lock);
1098
1099 if (stored_version != rk->rk_broker_state_change_version)
1100 r = 0;
1101 else {
1102 rd_kafka_enq_once_add_source(eonce, "wait broker state change");
1103 rd_list_add(&rk->rk_broker_state_change_waiters, eonce);
1104 }
1105
1106 mtx_unlock(&rk->rk_broker_state_change_lock);
1107 return r;
1108}
1109
1110
1111/**
1112 * @brief eonce trigger callback for rd_list_apply() call in
1113 * rd_kafka_brokers_broadcast_state_change()
1114 */
1115static int
1116rd_kafka_broker_state_change_trigger_eonce (void *elem, void *opaque) {
1117 rd_kafka_enq_once_t *eonce = elem;
1118 rd_kafka_enq_once_trigger(eonce, RD_KAFKA_RESP_ERR_NO_ERROR,
1119 "broker state change");
1120 return 0; /* remove eonce from list */
1121}
1122
1123
1124/**
1125 * @brief Broadcast broker state change to listeners, if any.
1126 *
1127 * @locality any thread
1128 */
1129void rd_kafka_brokers_broadcast_state_change (rd_kafka_t *rk) {
1130
1131 rd_kafka_dbg(rk, GENERIC, "BROADCAST",
1132 "Broadcasting state change");
1133
1134 mtx_lock(&rk->rk_broker_state_change_lock);
1135
1136 /* Bump version */
1137 rk->rk_broker_state_change_version++;
1138
1139 /* Trigger waiters */
1140 rd_list_apply(&rk->rk_broker_state_change_waiters,
1141 rd_kafka_broker_state_change_trigger_eonce, NULL);
1142
1143 /* Broadcast to listeners */
1144 cnd_broadcast(&rk->rk_broker_state_change_cnd);
1145
1146 mtx_unlock(&rk->rk_broker_state_change_lock);
1147}
1148
1149
1150/**
1151 * @returns a random broker (with refcnt increased) with matching \p state
1152 * and where the \p filter function returns 0.
1153 *
1154 * Uses reservoir sampling.
1155 *
1156 * @param filter is an optional callback used to filter out undesired brokers.
1157 * The filter function should return 1 to filter out a broker,
1158 * or 0 to keep it in the list of eligible brokers to return.
1159 * rd_kafka_broker_lock() is held during the filter callback.
1160 *
1161 *
1162 * @locks rd_kafka_*lock() MUST be held
1163 * @locality any
1164 */
1165static rd_kafka_broker_t *
1166rd_kafka_broker_random (rd_kafka_t *rk,
1167 int state,
1168 int (*filter) (rd_kafka_broker_t *rk, void *opaque),
1169 void *opaque) {
1170 rd_kafka_broker_t *rkb, *good = NULL;
1171 int cnt = 0;
1172
1173 TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
1174 if (RD_KAFKA_BROKER_IS_LOGICAL(rkb))
1175 continue;
1176
1177 rd_kafka_broker_lock(rkb);
1178 if ((int)rkb->rkb_state == state &&
1179 (!filter || !filter(rkb, opaque))) {
1180 if (cnt < 1 || rd_jitter(0, cnt) < 1) {
1181 if (good)
1182 rd_kafka_broker_destroy(good);
1183 rd_kafka_broker_keep(rkb);
1184 good = rkb;
1185 }
1186 cnt += 1;
1187 }
1188 rd_kafka_broker_unlock(rkb);
1189 }
1190
1191 return good;
1192}
1193
1194
1195/**
1196 * @brief Returns a random broker (with refcnt increased) in state \p state.
1197 *
1198 * Uses Reservoir sampling.
1199 *
1200 * @param filter is optional, see rd_kafka_broker_random().
1201 *
1202 * @sa rd_kafka_broker_random
1203 *
1204 * @locks rd_kafka_*lock(rk) MUST be held.
1205 * @locality any thread
1206 */
1207rd_kafka_broker_t *rd_kafka_broker_any (rd_kafka_t *rk, int state,
1208 int (*filter) (rd_kafka_broker_t *rkb,
1209 void *opaque),
1210 void *opaque,
1211 const char *reason) {
1212 rd_kafka_broker_t *rkb;
1213
1214 rkb = rd_kafka_broker_random(rk, state, filter, opaque);
1215
1216 if (!rkb && rk->rk_conf.sparse_connections) {
1217 /* Sparse connections:
1218 * If no eligible broker was found, schedule
1219 * a random broker for connecting. */
1220 rd_kafka_connect_any(rk, reason);
1221 }
1222
1223 return rkb;
1224}
1225
1226
1227/**
1228 * @brief Spend at most \p timeout_ms to acquire a usable (Up && non-blocking)
1229 * broker.
1230 *
1231 * @returns A probably usable broker with increased refcount, or NULL on timeout
1232 * @locks rd_kafka_*lock() if !do_lock
1233 * @locality any
1234 */
1235rd_kafka_broker_t *rd_kafka_broker_any_usable (rd_kafka_t *rk,
1236 int timeout_ms,
1237 int do_lock,
1238 const char *reason) {
1239 const rd_ts_t ts_end = rd_timeout_init(timeout_ms);
1240
1241 while (1) {
1242 rd_kafka_broker_t *rkb;
1243 int remains;
1244 int version = rd_kafka_brokers_get_state_version(rk);
1245
1246 /* Try non-blocking (e.g., non-fetching) brokers first. */
1247 if (do_lock)
1248 rd_kafka_rdlock(rk);
1249 rkb = rd_kafka_broker_any(rk, RD_KAFKA_BROKER_STATE_UP,
1250 rd_kafka_broker_filter_non_blocking,
1251 NULL, reason);
1252 if (!rkb)
1253 rkb = rd_kafka_broker_any(rk, RD_KAFKA_BROKER_STATE_UP,
1254 NULL, NULL, reason);
1255 if (do_lock)
1256 rd_kafka_rdunlock(rk);
1257
1258 if (rkb)
1259 return rkb;
1260
1261 remains = rd_timeout_remains(ts_end);
1262 if (rd_timeout_expired(remains))
1263 return NULL;
1264
1265 rd_kafka_brokers_wait_state_change(rk, version, remains);
1266 }
1267
1268 return NULL;
1269}
1270
1271
1272
1273/**
1274 * Returns a broker in state `state`, preferring the one with
1275 * matching `broker_id`.
1276 * Uses Reservoir sampling.
1277 *
1278 * Locks: rd_kafka_rdlock(rk) MUST be held.
1279 * Locality: any thread
1280 */
1281rd_kafka_broker_t *rd_kafka_broker_prefer (rd_kafka_t *rk, int32_t broker_id,
1282 int state) {
1283 rd_kafka_broker_t *rkb, *good = NULL;
1284 int cnt = 0;
1285
1286 TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
1287 if (RD_KAFKA_BROKER_IS_LOGICAL(rkb))
1288 continue;
1289
1290 rd_kafka_broker_lock(rkb);
1291 if ((int)rkb->rkb_state == state) {
1292 if (broker_id != -1 && rkb->rkb_nodeid == broker_id) {
1293 if (good)
1294 rd_kafka_broker_destroy(good);
1295 rd_kafka_broker_keep(rkb);
1296 good = rkb;
1297 rd_kafka_broker_unlock(rkb);
1298 break;
1299 }
1300 if (cnt < 1 || rd_jitter(0, cnt) < 1) {
1301 if (good)
1302 rd_kafka_broker_destroy(good);
1303 rd_kafka_broker_keep(rkb);
1304 good = rkb;
1305 }
1306 cnt += 1;
1307 }
1308 rd_kafka_broker_unlock(rkb);
1309 }
1310
1311 return good;
1312}
1313
1314
1315
1316/**
1317 * @returns the broker handle fork \p broker_id using cached metadata
1318 * information (if available) in state == \p state,
1319 * with refcount increaesd.
1320 *
1321 * Otherwise enqueues the \p eonce on the wait-state-change queue
1322 * which will be triggered on broker state changes.
1323 * It may also be triggered erroneously, so the caller
1324 * should call rd_kafka_broker_get_async() again when
1325 * the eonce is triggered.
1326 *
1327 * @locks none
1328 * @locality any thread
1329 */
1330rd_kafka_broker_t *
1331rd_kafka_broker_get_async (rd_kafka_t *rk, int32_t broker_id, int state,
1332 rd_kafka_enq_once_t *eonce) {
1333 int version;
1334 do {
1335 rd_kafka_broker_t *rkb;
1336
1337 version = rd_kafka_brokers_get_state_version(rk);
1338
1339 rd_kafka_rdlock(rk);
1340 rkb = rd_kafka_broker_find_by_nodeid0(rk, broker_id, state,
1341 rd_true);
1342 rd_kafka_rdunlock(rk);
1343
1344 if (rkb)
1345 return rkb;
1346
1347 } while (!rd_kafka_brokers_wait_state_change_async(rk, version, eonce));
1348
1349 return NULL; /* eonce added to wait list */
1350}
1351
1352
1353/**
1354 * @returns the current controller using cached metadata information,
1355 * and only if the broker's state == \p state.
1356 * The reference count is increased for the returned broker.
1357 *
1358 * @locks none
1359 * @locality any thread
1360 */
1361
1362static rd_kafka_broker_t *rd_kafka_broker_controller_nowait (rd_kafka_t *rk,
1363 int state) {
1364 rd_kafka_broker_t *rkb;
1365
1366 rd_kafka_rdlock(rk);
1367
1368 if (rk->rk_controllerid == -1) {
1369 rd_kafka_rdunlock(rk);
1370 rd_kafka_metadata_refresh_brokers(rk, NULL,
1371 "lookup controller");
1372 return NULL;
1373 }
1374
1375 rkb = rd_kafka_broker_find_by_nodeid0(rk, rk->rk_controllerid, state,
1376 rd_true);
1377
1378 rd_kafka_rdunlock(rk);
1379
1380 return rkb;
1381}
1382
1383
1384/**
1385 * @returns the current controller using cached metadata information if
1386 * available in state == \p state, with refcount increaesd.
1387 *
1388 * Otherwise enqueues the \p eonce on the wait-controller queue
1389 * which will be triggered on controller updates or broker state
1390 * changes. It may also be triggered erroneously, so the caller
1391 * should call rd_kafka_broker_controller_async() again when
1392 * the eonce is triggered.
1393 *
1394 * @locks none
1395 * @locality any thread
1396 */
1397rd_kafka_broker_t *
1398rd_kafka_broker_controller_async (rd_kafka_t *rk, int state,
1399 rd_kafka_enq_once_t *eonce) {
1400 int version;
1401 do {
1402 rd_kafka_broker_t *rkb;
1403
1404 version = rd_kafka_brokers_get_state_version(rk);
1405
1406 rkb = rd_kafka_broker_controller_nowait(rk, state);
1407 if (rkb)
1408 return rkb;
1409
1410 } while (!rd_kafka_brokers_wait_state_change_async(rk, version, eonce));
1411
1412 return NULL; /* eonce added to wait list */
1413}
1414
1415
1416/**
1417 * @returns the current controller using cached metadata information,
1418 * blocking up to \p abs_timeout for the controller to be known
1419 * and to reach state == \p state. The reference count is increased
1420 * for the returned broker.
1421 *
1422 * @locks none
1423 * @locality any thread
1424 */
1425rd_kafka_broker_t *rd_kafka_broker_controller (rd_kafka_t *rk, int state,
1426 rd_ts_t abs_timeout) {
1427
1428 while (1) {
1429 int version = rd_kafka_brokers_get_state_version(rk);
1430 rd_kafka_broker_t *rkb;
1431 int remains_ms;
1432
1433 rkb = rd_kafka_broker_controller_nowait(rk, state);
1434 if (rkb)
1435 return rkb;
1436
1437 remains_ms = rd_timeout_remains(abs_timeout);
1438 if (rd_timeout_expired(remains_ms))
1439 return NULL;
1440
1441 rd_kafka_brokers_wait_state_change(rk, version, remains_ms);
1442 }
1443}
1444
1445
1446
1447
1448/**
1449 * Find a waitresp (rkbuf awaiting response) by the correlation id.
1450 */
1451static rd_kafka_buf_t *rd_kafka_waitresp_find (rd_kafka_broker_t *rkb,
1452 int32_t corrid) {
1453 rd_kafka_buf_t *rkbuf;
1454 rd_ts_t now = rd_clock();
1455
1456 rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
1457
1458 TAILQ_FOREACH(rkbuf, &rkb->rkb_waitresps.rkbq_bufs, rkbuf_link)
1459 if (rkbuf->rkbuf_corrid == corrid) {
1460 /* Convert ts_sent to RTT */
1461 rkbuf->rkbuf_ts_sent = now - rkbuf->rkbuf_ts_sent;
1462 rd_avg_add(&rkb->rkb_avg_rtt, rkbuf->rkbuf_ts_sent);
1463
1464 if (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_BLOCKING &&
1465 rd_atomic32_sub(&rkb->rkb_blocking_request_cnt,
1466 1) == 1)
1467 rd_kafka_brokers_broadcast_state_change(
1468 rkb->rkb_rk);
1469
1470 rd_kafka_bufq_deq(&rkb->rkb_waitresps, rkbuf);
1471 return rkbuf;
1472 }
1473 return NULL;
1474}
1475
1476
1477
1478
1479/**
1480 * Map a response message to a request.
1481 */
1482static int rd_kafka_req_response (rd_kafka_broker_t *rkb,
1483 rd_kafka_buf_t *rkbuf) {
1484 rd_kafka_buf_t *req;
1485
1486 rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
1487
1488
1489 /* Find corresponding request message by correlation id */
1490 if (unlikely(!(req =
1491 rd_kafka_waitresp_find(rkb,
1492 rkbuf->rkbuf_reshdr.CorrId)))) {
1493 /* unknown response. probably due to request timeout */
1494 rd_atomic64_add(&rkb->rkb_c.rx_corrid_err, 1);
1495 rd_rkb_dbg(rkb, BROKER, "RESPONSE",
1496 "Response for unknown CorrId %"PRId32" (timed out?)",
1497 rkbuf->rkbuf_reshdr.CorrId);
1498 rd_kafka_buf_destroy(rkbuf);
1499 return -1;
1500 }
1501
1502 rd_rkb_dbg(rkb, PROTOCOL, "RECV",
1503 "Received %sResponse (v%hd, %"PRIusz" bytes, CorrId %"PRId32
1504 ", rtt %.2fms)",
1505 rd_kafka_ApiKey2str(req->rkbuf_reqhdr.ApiKey),
1506 req->rkbuf_reqhdr.ApiVersion,
1507 rkbuf->rkbuf_totlen, rkbuf->rkbuf_reshdr.CorrId,
1508 (float)req->rkbuf_ts_sent / 1000.0f);
1509
1510 /* Copy request's header to response object's reqhdr for convenience. */
1511 rkbuf->rkbuf_reqhdr = req->rkbuf_reqhdr;
1512
1513 /* Set up response reader slice starting past the response header */
1514 rd_slice_init(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf,
1515 RD_KAFKAP_RESHDR_SIZE,
1516 rd_buf_len(&rkbuf->rkbuf_buf) - RD_KAFKAP_RESHDR_SIZE);
1517
1518 if (!rkbuf->rkbuf_rkb) {
1519 rkbuf->rkbuf_rkb = rkb;
1520 rd_kafka_broker_keep(rkbuf->rkbuf_rkb);
1521 } else
1522 rd_assert(rkbuf->rkbuf_rkb == rkb);
1523
1524 /* Call callback. */
1525 rd_kafka_buf_callback(rkb->rkb_rk, rkb, 0, rkbuf, req);
1526
1527 return 0;
1528}
1529
1530
1531
1532
1533int rd_kafka_recv (rd_kafka_broker_t *rkb) {
1534 rd_kafka_buf_t *rkbuf;
1535 ssize_t r;
1536 /* errstr is not set by buf_read errors, so default it here. */
1537 char errstr[512] = "Protocol parse failure";
1538 rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
1539 const int log_decode_errors = LOG_ERR;
1540
1541
1542 /* It is impossible to estimate the correct size of the response
1543 * so we split the read up in two parts: first we read the protocol
1544 * length and correlation id (i.e., the Response header), and then
1545 * when we know the full length of the response we allocate a new
1546 * buffer and call receive again.
1547 * All this in an async fashion (e.g., partial reads).
1548 */
1549 if (!(rkbuf = rkb->rkb_recv_buf)) {
1550 /* No receive in progress: create new buffer */
1551
1552 rkbuf = rd_kafka_buf_new(2, RD_KAFKAP_RESHDR_SIZE);
1553
1554 rkb->rkb_recv_buf = rkbuf;
1555
1556 /* Set up buffer reader for the response header. */
1557 rd_buf_write_ensure(&rkbuf->rkbuf_buf,
1558 RD_KAFKAP_RESHDR_SIZE,
1559 RD_KAFKAP_RESHDR_SIZE);
1560 }
1561
1562 rd_dassert(rd_buf_write_remains(&rkbuf->rkbuf_buf) > 0);
1563
1564 r = rd_kafka_transport_recv(rkb->rkb_transport, &rkbuf->rkbuf_buf,
1565 errstr, sizeof(errstr));
1566 if (unlikely(r <= 0)) {
1567 if (r == 0)
1568 return 0; /* EAGAIN */
1569 err = RD_KAFKA_RESP_ERR__TRANSPORT;
1570 rd_atomic64_add(&rkb->rkb_c.rx_err, 1);
1571 goto err;
1572 }
1573
1574 if (rkbuf->rkbuf_totlen == 0) {
1575 /* Packet length not known yet. */
1576
1577 if (unlikely(rd_buf_write_pos(&rkbuf->rkbuf_buf) <
1578 RD_KAFKAP_RESHDR_SIZE)) {
1579 /* Need response header for packet length and corrid.
1580 * Wait for more data. */
1581 return 0;
1582 }
1583
1584 rd_assert(!rkbuf->rkbuf_rkb);
1585 rkbuf->rkbuf_rkb = rkb; /* Protocol parsing code needs
1586 * the rkb for logging, but we dont
1587 * want to keep a reference to the
1588 * broker this early since that extra
1589 * refcount will mess with the broker's
1590 * refcount-based termination code. */
1591
1592 /* Initialize reader */
1593 rd_slice_init(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf, 0,
1594 RD_KAFKAP_RESHDR_SIZE);
1595
1596 /* Read protocol header */
1597 rd_kafka_buf_read_i32(rkbuf, &rkbuf->rkbuf_reshdr.Size);
1598 rd_kafka_buf_read_i32(rkbuf, &rkbuf->rkbuf_reshdr.CorrId);
1599
1600 rkbuf->rkbuf_rkb = NULL; /* Reset */
1601
1602 rkbuf->rkbuf_totlen = rkbuf->rkbuf_reshdr.Size;
1603
1604 /* Make sure message size is within tolerable limits. */
1605 if (rkbuf->rkbuf_totlen < 4/*CorrId*/ ||
1606 rkbuf->rkbuf_totlen >
1607 (size_t)rkb->rkb_rk->rk_conf.recv_max_msg_size) {
1608 rd_snprintf(errstr, sizeof(errstr),
1609 "Invalid response size %"PRId32" (0..%i): "
1610 "increase receive.message.max.bytes",
1611 rkbuf->rkbuf_reshdr.Size,
1612 rkb->rkb_rk->rk_conf.recv_max_msg_size);
1613 err = RD_KAFKA_RESP_ERR__BAD_MSG;
1614 rd_atomic64_add(&rkb->rkb_c.rx_err, 1);
1615 goto err;
1616 }
1617
1618 rkbuf->rkbuf_totlen -= 4; /*CorrId*/
1619
1620 if (rkbuf->rkbuf_totlen > 0) {
1621 /* Allocate another buffer that fits all data (short of
1622 * the common response header). We want all
1623 * data to be in contigious memory. */
1624
1625 rd_buf_write_ensure_contig(&rkbuf->rkbuf_buf,
1626 rkbuf->rkbuf_totlen);
1627 }
1628 }
1629
1630 if (rd_buf_write_pos(&rkbuf->rkbuf_buf) - RD_KAFKAP_RESHDR_SIZE ==
1631 rkbuf->rkbuf_totlen) {
1632 /* Message is complete, pass it on to the original requester. */
1633 rkb->rkb_recv_buf = NULL;
1634 rd_atomic64_add(&rkb->rkb_c.rx, 1);
1635 rd_atomic64_add(&rkb->rkb_c.rx_bytes,
1636 rd_buf_write_pos(&rkbuf->rkbuf_buf));
1637 rd_kafka_req_response(rkb, rkbuf);
1638 }
1639
1640 return 1;
1641
1642 err_parse:
1643 err = rkbuf->rkbuf_err;
1644 err:
1645 if (!strcmp(errstr, "Disconnected"))
1646 rd_kafka_broker_conn_closed(rkb, err, errstr);
1647 else
1648 rd_kafka_broker_fail(rkb, LOG_ERR, err,
1649 "Receive failed: %s", errstr);
1650 return -1;
1651}
1652
1653
1654/**
1655 * Linux version of socket_cb providing racefree CLOEXEC.
1656 */
1657int rd_kafka_socket_cb_linux (int domain, int type, int protocol,
1658 void *opaque) {
1659#ifdef SOCK_CLOEXEC
1660 return socket(domain, type | SOCK_CLOEXEC, protocol);
1661#else
1662 return rd_kafka_socket_cb_generic(domain, type, protocol, opaque);
1663#endif
1664}
1665
1666/**
1667 * Fallback version of socket_cb NOT providing racefree CLOEXEC,
1668 * but setting CLOEXEC after socket creation (if FD_CLOEXEC is defined).
1669 */
1670int rd_kafka_socket_cb_generic (int domain, int type, int protocol,
1671 void *opaque) {
1672 int s;
1673 int on = 1;
1674 s = (int)socket(domain, type, protocol);
1675 if (s == -1)
1676 return -1;
1677#ifdef FD_CLOEXEC
1678 fcntl(s, F_SETFD, FD_CLOEXEC, &on);
1679#endif
1680 return s;
1681}
1682
1683
1684
1685/**
1686 * @brief Update the reconnect backoff.
1687 * Should be called when a connection is made.
1688 *
1689 * @locality broker thread
1690 * @locks none
1691 */
1692static void
1693rd_kafka_broker_update_reconnect_backoff (rd_kafka_broker_t *rkb,
1694 const rd_kafka_conf_t *conf,
1695 rd_ts_t now) {
1696 int backoff;
1697
1698 /* If last connection attempt was more than reconnect.backoff.max.ms
1699 * ago, reset the reconnect backoff to the initial
1700 * reconnect.backoff.ms value. */
1701 if (rkb->rkb_ts_reconnect + (conf->reconnect_backoff_max_ms * 1000) <
1702 now)
1703 rkb->rkb_reconnect_backoff_ms = conf->reconnect_backoff_ms;
1704
1705 /* Apply -25%...+50% jitter to next backoff. */
1706 backoff = rd_jitter((int)((float)rkb->rkb_reconnect_backoff_ms * 0.75),
1707 (int)((float)rkb->rkb_reconnect_backoff_ms * 1.5));
1708
1709 /* Cap to reconnect.backoff.max.ms. */
1710 backoff = RD_MIN(backoff, conf->reconnect_backoff_max_ms);
1711
1712 /* Set time of next reconnect */
1713 rkb->rkb_ts_reconnect = now + (backoff * 1000);
1714 rkb->rkb_reconnect_backoff_ms =
1715 RD_MIN(rkb->rkb_reconnect_backoff_ms* 2,
1716 conf->reconnect_backoff_max_ms);
1717}
1718
1719
1720/**
1721 * @brief Calculate time until next reconnect attempt.
1722 *
1723 * @returns the number of milliseconds to the next connection attempt, or 0
1724 * if immediate.
1725 * @locality broker thread
1726 * @locks none
1727 */
1728
1729static RD_INLINE int
1730rd_kafka_broker_reconnect_backoff (const rd_kafka_broker_t *rkb,
1731 rd_ts_t now) {
1732 rd_ts_t remains;
1733
1734 if (unlikely(rkb->rkb_ts_reconnect == 0))
1735 return 0; /* immediate */
1736
1737 remains = rkb->rkb_ts_reconnect - now;
1738 if (remains <= 0)
1739 return 0; /* immediate */
1740
1741 return (int)(remains / 1000);
1742}
1743
1744
1745/**
1746 * @brief Unittest for reconnect.backoff.ms
1747 */
1748static int rd_ut_reconnect_backoff (void) {
1749 rd_kafka_broker_t rkb = RD_ZERO_INIT;
1750 rd_kafka_conf_t conf = {
1751 .reconnect_backoff_ms = 10,
1752 .reconnect_backoff_max_ms = 90
1753 };
1754 rd_ts_t now = 1000000;
1755 int backoff;
1756
1757 rkb.rkb_reconnect_backoff_ms = conf.reconnect_backoff_ms;
1758
1759 /* broker's backoff is the initial reconnect.backoff.ms=10 */
1760 rd_kafka_broker_update_reconnect_backoff(&rkb, &conf, now);
1761 backoff = rd_kafka_broker_reconnect_backoff(&rkb, now);
1762 RD_UT_ASSERT_RANGE(backoff, 7, 15, "%d");
1763
1764 /* .. 20 */
1765 rd_kafka_broker_update_reconnect_backoff(&rkb, &conf, now);
1766 backoff = rd_kafka_broker_reconnect_backoff(&rkb, now);
1767 RD_UT_ASSERT_RANGE(backoff, 15, 30, "%d");
1768
1769 /* .. 40 */
1770 rd_kafka_broker_update_reconnect_backoff(&rkb, &conf, now);
1771 backoff = rd_kafka_broker_reconnect_backoff(&rkb, now);
1772 RD_UT_ASSERT_RANGE(backoff, 30, 60, "%d");
1773
1774 /* .. 80, the jitter is capped at reconnect.backoff.max.ms=90 */
1775 rd_kafka_broker_update_reconnect_backoff(&rkb, &conf, now);
1776 backoff = rd_kafka_broker_reconnect_backoff(&rkb, now);
1777 RD_UT_ASSERT_RANGE(backoff, 60, conf.reconnect_backoff_max_ms, "%d");
1778
1779 /* .. 90, capped by reconnect.backoff.max.ms */
1780 rd_kafka_broker_update_reconnect_backoff(&rkb, &conf, now);
1781 backoff = rd_kafka_broker_reconnect_backoff(&rkb, now);
1782 RD_UT_ASSERT_RANGE(backoff, 67, conf.reconnect_backoff_max_ms, "%d");
1783
1784 /* .. 90, should remain at capped value. */
1785 rd_kafka_broker_update_reconnect_backoff(&rkb, &conf, now);
1786 backoff = rd_kafka_broker_reconnect_backoff(&rkb, now);
1787 RD_UT_ASSERT_RANGE(backoff, 67, conf.reconnect_backoff_max_ms, "%d");
1788
1789 RD_UT_PASS();
1790}
1791
1792
1793/**
1794 * @brief Initiate asynchronous connection attempt to the next address
1795 * in the broker's address list.
1796 * While the connect is asynchronous and its IO served in the
1797 * CONNECT state, the initial name resolve is blocking.
1798 *
1799 * @returns -1 on error, 0 if broker does not have a hostname, or 1
1800 * if the connection is now in progress.
1801 */
1802static int rd_kafka_broker_connect (rd_kafka_broker_t *rkb) {
1803 const rd_sockaddr_inx_t *sinx;
1804 char errstr[512];
1805 char nodename[RD_KAFKA_NODENAME_SIZE];
1806
1807 rd_rkb_dbg(rkb, BROKER, "CONNECT",
1808 "broker in state %s connecting",
1809 rd_kafka_broker_state_names[rkb->rkb_state]);
1810
1811 rd_atomic32_add(&rkb->rkb_c.connects, 1);
1812
1813 rd_kafka_broker_lock(rkb);
1814 strncpy(nodename, rkb->rkb_nodename, sizeof(nodename));
1815 rkb->rkb_connect_epoch = rkb->rkb_nodename_epoch;
1816 /* Logical brokers might not have a hostname set, in which case
1817 * we should not try to connect. */
1818 if (*nodename)
1819 rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_CONNECT);
1820 rd_kafka_broker_unlock(rkb);
1821
1822 if (!*nodename) {
1823 rd_rkb_dbg(rkb, BROKER, "CONNECT",
1824 "broker has no address yet: postponing connect");
1825 return 0;
1826 }
1827
1828 rd_kafka_broker_update_reconnect_backoff(rkb, &rkb->rkb_rk->rk_conf,
1829 rd_clock());
1830
1831 if (rd_kafka_broker_resolve(rkb, nodename) == -1)
1832 return -1;
1833
1834 sinx = rd_sockaddr_list_next(rkb->rkb_rsal);
1835
1836 rd_kafka_assert(rkb->rkb_rk, !rkb->rkb_transport);
1837
1838 if (!(rkb->rkb_transport = rd_kafka_transport_connect(rkb, sinx,
1839 errstr, sizeof(errstr)))) {
1840 /* Avoid duplicate log messages */
1841 if (rkb->rkb_err.err == errno)
1842 rd_kafka_broker_fail(rkb, LOG_DEBUG,
1843 RD_KAFKA_RESP_ERR__FAIL, NULL);
1844 else
1845 rd_kafka_broker_fail(rkb, LOG_ERR,
1846 RD_KAFKA_RESP_ERR__TRANSPORT,
1847 "%s", errstr);
1848 return -1;
1849 }
1850
1851 return 0;
1852}
1853
1854
1855/**
1856 * @brief Call when connection is ready to transition to fully functional
1857 * UP state.
1858 *
1859 * @locality Broker thread
1860 */
1861void rd_kafka_broker_connect_up (rd_kafka_broker_t *rkb) {
1862
1863 rkb->rkb_max_inflight = rkb->rkb_rk->rk_conf.max_inflight;
1864 rkb->rkb_err.err = 0;
1865
1866 rd_kafka_broker_lock(rkb);
1867 rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_UP);
1868 rd_kafka_broker_unlock(rkb);
1869
1870 /* Request metadata (async):
1871 * try locally known topics first and if there are none try
1872 * getting just the broker list. */
1873 if (rd_kafka_metadata_refresh_known_topics(NULL, rkb, 0/*dont force*/,
1874 "connected") ==
1875 RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
1876 rd_kafka_metadata_refresh_brokers(NULL, rkb, "connected");
1877}
1878
1879
1880
1881static void rd_kafka_broker_connect_auth (rd_kafka_broker_t *rkb);
1882
1883
1884/**
1885 * @brief Parses and handles SaslMechanism response, transitions
1886 * the broker state.
1887 *
1888 */
1889static void
1890rd_kafka_broker_handle_SaslHandshake (rd_kafka_t *rk,
1891 rd_kafka_broker_t *rkb,
1892 rd_kafka_resp_err_t err,
1893 rd_kafka_buf_t *rkbuf,
1894 rd_kafka_buf_t *request,
1895 void *opaque) {
1896 const int log_decode_errors = LOG_ERR;
1897 int32_t MechCnt;
1898 int16_t ErrorCode;
1899 int i = 0;
1900 char *mechs = "(n/a)";
1901 size_t msz, mof = 0;
1902
1903 if (err == RD_KAFKA_RESP_ERR__DESTROY)
1904 return;
1905
1906 if (err)
1907 goto err;
1908
1909 rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
1910 rd_kafka_buf_read_i32(rkbuf, &MechCnt);
1911
1912 /* Build a CSV string of supported mechanisms. */
1913 msz = RD_MIN(511, MechCnt * 32);
1914 mechs = rd_alloca(msz);
1915 *mechs = '\0';
1916
1917 for (i = 0 ; i < MechCnt ; i++) {
1918 rd_kafkap_str_t mech;
1919 rd_kafka_buf_read_str(rkbuf, &mech);
1920
1921 mof += rd_snprintf(mechs+mof, msz-mof, "%s%.*s",
1922 i ? ",":"", RD_KAFKAP_STR_PR(&mech));
1923
1924 if (mof >= msz)
1925 break;
1926 }
1927
1928 rd_rkb_dbg(rkb,
1929 PROTOCOL | RD_KAFKA_DBG_SECURITY | RD_KAFKA_DBG_BROKER,
1930 "SASLMECHS", "Broker supported SASL mechanisms: %s",
1931 mechs);
1932
1933 if (ErrorCode) {
1934 err = ErrorCode;
1935 goto err;
1936 }
1937
1938 /* Circle back to connect_auth() to start proper AUTH state. */
1939 rd_kafka_broker_connect_auth(rkb);
1940 return;
1941
1942 err_parse:
1943 err = rkbuf->rkbuf_err;
1944 err:
1945 rd_kafka_broker_fail(rkb, LOG_ERR,
1946 RD_KAFKA_RESP_ERR__AUTHENTICATION,
1947 "SASL %s mechanism handshake failed: %s: "
1948 "broker's supported mechanisms: %s",
1949 rkb->rkb_rk->rk_conf.sasl.mechanisms,
1950 rd_kafka_err2str(err), mechs);
1951}
1952
1953
1954/**
1955 * @brief Transition state to:
1956 * - AUTH_HANDSHAKE (if SASL is configured and handshakes supported)
1957 * - AUTH (if SASL is configured but no handshake is required or
1958 * not supported, or has already taken place.)
1959 * - UP (if SASL is not configured)
1960 */
1961static void rd_kafka_broker_connect_auth (rd_kafka_broker_t *rkb) {
1962
1963 if ((rkb->rkb_proto == RD_KAFKA_PROTO_SASL_PLAINTEXT ||
1964 rkb->rkb_proto == RD_KAFKA_PROTO_SASL_SSL)) {
1965
1966 rd_rkb_dbg(rkb, SECURITY | RD_KAFKA_DBG_BROKER, "AUTH",
1967 "Auth in state %s (handshake %ssupported)",
1968 rd_kafka_broker_state_names[rkb->rkb_state],
1969 (rkb->rkb_features&RD_KAFKA_FEATURE_SASL_HANDSHAKE)
1970 ? "" : "not ");
1971
1972 /* Broker >= 0.10.0: send request to select mechanism */
1973 if (rkb->rkb_state != RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE &&
1974 (rkb->rkb_features & RD_KAFKA_FEATURE_SASL_HANDSHAKE)) {
1975
1976 rd_kafka_broker_lock(rkb);
1977 rd_kafka_broker_set_state(
1978 rkb, RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE);
1979 rd_kafka_broker_unlock(rkb);
1980
1981 rd_kafka_SaslHandshakeRequest(
1982 rkb, rkb->rkb_rk->rk_conf.sasl.mechanisms,
1983 RD_KAFKA_NO_REPLYQ,
1984 rd_kafka_broker_handle_SaslHandshake,
1985 NULL);
1986 } else {
1987 /* Either Handshake succeeded (protocol selected)
1988 * or Handshakes were not supported.
1989 * In both cases continue with authentication. */
1990 char sasl_errstr[512];
1991
1992 rd_kafka_broker_lock(rkb);
1993 rd_kafka_broker_set_state(rkb,
1994 RD_KAFKA_BROKER_STATE_AUTH);
1995 rd_kafka_broker_unlock(rkb);
1996
1997 if (rd_kafka_sasl_client_new(
1998 rkb->rkb_transport, sasl_errstr,
1999 sizeof(sasl_errstr)) == -1) {
2000 errno = EINVAL;
2001 rd_kafka_broker_fail(
2002 rkb, LOG_ERR,
2003 RD_KAFKA_RESP_ERR__AUTHENTICATION,
2004 "Failed to initialize "
2005 "SASL authentication: %s",
2006 sasl_errstr);
2007 return;
2008 }
2009
2010 /* Enter non-Kafka-protocol-framed SASL communication
2011 * state handled in rdkafka_sasl.c */
2012 rd_kafka_broker_lock(rkb);
2013 rd_kafka_broker_set_state(rkb,
2014 RD_KAFKA_BROKER_STATE_AUTH);
2015 rd_kafka_broker_unlock(rkb);
2016 }
2017
2018 return;
2019 }
2020
2021 /* No authentication required. */
2022 rd_kafka_broker_connect_up(rkb);
2023}
2024
2025
2026/**
2027 * @brief Specify API versions to use for this connection.
2028 *
2029 * @param apis is an allocated list of supported partitions.
2030 * If NULL the default set will be used based on the
2031 * \p broker.version.fallback property.
2032 * @param api_cnt number of elements in \p apis
2033 *
2034 * @remark \p rkb takes ownership of \p apis.
2035 *
2036 * @locality Broker thread
2037 * @locks none
2038 */
2039static void rd_kafka_broker_set_api_versions (rd_kafka_broker_t *rkb,
2040 struct rd_kafka_ApiVersion *apis,
2041 size_t api_cnt) {
2042
2043 rd_kafka_broker_lock(rkb);
2044
2045 if (rkb->rkb_ApiVersions)
2046 rd_free(rkb->rkb_ApiVersions);
2047
2048
2049 if (!apis) {
2050 rd_rkb_dbg(rkb, PROTOCOL | RD_KAFKA_DBG_BROKER, "APIVERSION",
2051 "Using (configuration fallback) %s protocol features",
2052 rkb->rkb_rk->rk_conf.broker_version_fallback);
2053
2054
2055 rd_kafka_get_legacy_ApiVersions(rkb->rkb_rk->rk_conf.
2056 broker_version_fallback,
2057 &apis, &api_cnt,
2058 rkb->rkb_rk->rk_conf.
2059 broker_version_fallback);
2060
2061 /* Make a copy to store on broker. */
2062 rd_kafka_ApiVersions_copy(apis, api_cnt, &apis, &api_cnt);
2063 }
2064
2065 rkb->rkb_ApiVersions = apis;
2066 rkb->rkb_ApiVersions_cnt = api_cnt;
2067
2068 /* Update feature set based on supported broker APIs. */
2069 rd_kafka_broker_features_set(rkb,
2070 rd_kafka_features_check(rkb, apis, api_cnt));
2071
2072 rd_kafka_broker_unlock(rkb);
2073}
2074
2075
2076/**
2077 * Handler for ApiVersion response.
2078 */
2079static void
2080rd_kafka_broker_handle_ApiVersion (rd_kafka_t *rk,
2081 rd_kafka_broker_t *rkb,
2082 rd_kafka_resp_err_t err,
2083 rd_kafka_buf_t *rkbuf,
2084 rd_kafka_buf_t *request, void *opaque) {
2085 struct rd_kafka_ApiVersion *apis;
2086 size_t api_cnt;
2087
2088 if (err == RD_KAFKA_RESP_ERR__DESTROY)
2089 return;
2090
2091 err = rd_kafka_handle_ApiVersion(rk, rkb, err, rkbuf, request,
2092 &apis, &api_cnt);
2093
2094 if (err) {
2095 rd_kafka_broker_fail(rkb, LOG_DEBUG,
2096 RD_KAFKA_RESP_ERR__TRANSPORT,
2097 "ApiVersionRequest failed: %s: "
2098 "probably due to old broker version",
2099 rd_kafka_err2str(err));
2100 return;
2101 }
2102
2103 rd_kafka_broker_set_api_versions(rkb, apis, api_cnt);
2104
2105 rd_kafka_broker_connect_auth(rkb);
2106}
2107
2108
2109/**
2110 * Call when asynchronous connection attempt completes, either succesfully
2111 * (if errstr is NULL) or fails.
2112 *
2113 * Locality: broker thread
2114 */
2115void rd_kafka_broker_connect_done (rd_kafka_broker_t *rkb, const char *errstr) {
2116
2117 if (errstr) {
2118 /* Connect failed */
2119 rd_kafka_broker_fail(rkb,
2120 errno != 0 && rkb->rkb_err.err == errno ?
2121 LOG_DEBUG : LOG_ERR,
2122 RD_KAFKA_RESP_ERR__TRANSPORT,
2123 "%s", errstr);
2124 return;
2125 }
2126
2127 /* Connect succeeded */
2128 rkb->rkb_connid++;
2129 rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_PROTOCOL,
2130 "CONNECTED", "Connected (#%d)", rkb->rkb_connid);
2131 rkb->rkb_err.err = 0;
2132 rkb->rkb_max_inflight = 1; /* Hold back other requests until
2133 * ApiVersion, SaslHandshake, etc
2134 * are done. */
2135
2136 rd_kafka_transport_poll_set(rkb->rkb_transport, POLLIN);
2137
2138 if (rkb->rkb_rk->rk_conf.api_version_request &&
2139 rd_interval_immediate(&rkb->rkb_ApiVersion_fail_intvl, 0, 0) > 0) {
2140 /* Use ApiVersion to query broker for supported API versions. */
2141 rd_kafka_broker_feature_enable(rkb, RD_KAFKA_FEATURE_APIVERSION);
2142 }
2143
2144 if (!(rkb->rkb_features & RD_KAFKA_FEATURE_APIVERSION)) {
2145 /* Use configured broker.version.fallback to
2146 * figure out API versions.
2147 * In case broker.version.fallback indicates a version
2148 * that supports ApiVersionRequest it will update
2149 * rkb_features to have FEATURE_APIVERSION set which will
2150 * trigger an ApiVersionRequest below. */
2151 rd_kafka_broker_set_api_versions(rkb, NULL, 0);
2152 }
2153
2154 if (rkb->rkb_features & RD_KAFKA_FEATURE_APIVERSION) {
2155 /* Query broker for supported API versions.
2156 * This may fail with a disconnect on non-supporting brokers
2157 * so hold off any other requests until we get a response,
2158 * and if the connection is torn down we disable this feature. */
2159 rd_kafka_broker_lock(rkb);
2160 rd_kafka_broker_set_state(rkb,RD_KAFKA_BROKER_STATE_APIVERSION_QUERY);
2161 rd_kafka_broker_unlock(rkb);
2162
2163 rd_kafka_ApiVersionRequest(
2164 rkb, RD_KAFKA_NO_REPLYQ,
2165 rd_kafka_broker_handle_ApiVersion, NULL);
2166 } else {
2167 /* Authenticate if necessary */
2168 rd_kafka_broker_connect_auth(rkb);
2169 }
2170
2171}
2172
2173
2174
2175/**
2176 * @brief Checks if the given API request+version is supported by the broker.
2177 * @returns 1 if supported, else 0.
2178 * @locality broker thread
2179 * @locks none
2180 */
2181static RD_INLINE int
2182rd_kafka_broker_request_supported (rd_kafka_broker_t *rkb,
2183 rd_kafka_buf_t *rkbuf) {
2184 struct rd_kafka_ApiVersion skel = {
2185 .ApiKey = rkbuf->rkbuf_reqhdr.ApiKey
2186 };
2187 struct rd_kafka_ApiVersion *ret;
2188
2189 if (unlikely(rkbuf->rkbuf_reqhdr.ApiKey == RD_KAFKAP_ApiVersion))
2190 return 1; /* ApiVersion requests are used to detect
2191 * the supported API versions, so should always
2192 * be allowed through. */
2193
2194 /* First try feature flags, if any, which may cover a larger
2195 * set of APIs. */
2196 if (rkbuf->rkbuf_features)
2197 return (rkb->rkb_features & rkbuf->rkbuf_features) ==
2198 rkbuf->rkbuf_features;
2199
2200 /* Then try the ApiVersion map. */
2201 ret = bsearch(&skel, rkb->rkb_ApiVersions, rkb->rkb_ApiVersions_cnt,
2202 sizeof(*rkb->rkb_ApiVersions),
2203 rd_kafka_ApiVersion_key_cmp);
2204 if (!ret)
2205 return 0;
2206
2207 return ret->MinVer <= rkbuf->rkbuf_reqhdr.ApiVersion &&
2208 rkbuf->rkbuf_reqhdr.ApiVersion <= ret->MaxVer;
2209}
2210
2211
2212/**
2213 * Send queued messages to broker
2214 *
2215 * Locality: io thread
2216 */
2217int rd_kafka_send (rd_kafka_broker_t *rkb) {
2218 rd_kafka_buf_t *rkbuf;
2219 unsigned int cnt = 0;
2220
2221 rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
2222
2223 while (rkb->rkb_state >= RD_KAFKA_BROKER_STATE_UP &&
2224 rd_kafka_bufq_cnt(&rkb->rkb_waitresps) < rkb->rkb_max_inflight &&
2225 (rkbuf = TAILQ_FIRST(&rkb->rkb_outbufs.rkbq_bufs))) {
2226 ssize_t r;
2227 size_t pre_of = rd_slice_offset(&rkbuf->rkbuf_reader);
2228 rd_ts_t now;
2229
2230 /* Check for broker support */
2231 if (unlikely(!rd_kafka_broker_request_supported(rkb, rkbuf))) {
2232 rd_kafka_bufq_deq(&rkb->rkb_outbufs, rkbuf);
2233 rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_PROTOCOL,
2234 "UNSUPPORTED",
2235 "Failing %sResponse "
2236 "(v%hd, %"PRIusz" bytes, CorrId %"PRId32"): "
2237 "request not supported by broker "
2238 "(missing api.version.request or "
2239 "incorrect broker.version.fallback config?)",
2240 rd_kafka_ApiKey2str(rkbuf->rkbuf_reqhdr.
2241 ApiKey),
2242 rkbuf->rkbuf_reqhdr.ApiVersion,
2243 rkbuf->rkbuf_totlen,
2244 rkbuf->rkbuf_reshdr.CorrId);
2245 rd_kafka_buf_callback(
2246 rkb->rkb_rk, rkb,
2247 RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
2248 NULL, rkbuf);
2249 continue;
2250 }
2251
2252 /* Set CorrId header field, unless this is the latter part
2253 * of a partial send in which case the corrid has already
2254 * been set.
2255 * Due to how SSL_write() will accept a buffer but still
2256 * return 0 in some cases we can't rely on the buffer offset
2257 * but need to use corrid to check this. SSL_write() expects
2258 * us to send the same buffer again when 0 is returned.
2259 */
2260 if (rkbuf->rkbuf_corrid == 0 ||
2261 rkbuf->rkbuf_connid != rkb->rkb_connid) {
2262 rd_assert(rd_slice_offset(&rkbuf->rkbuf_reader) == 0);
2263 rkbuf->rkbuf_corrid = ++rkb->rkb_corrid;
2264 rd_kafka_buf_update_i32(rkbuf, 4+2+2,
2265 rkbuf->rkbuf_corrid);
2266 rkbuf->rkbuf_connid = rkb->rkb_connid;
2267 } else if (pre_of > RD_KAFKAP_REQHDR_SIZE) {
2268 rd_kafka_assert(NULL,
2269 rkbuf->rkbuf_connid == rkb->rkb_connid);
2270 }
2271
2272 if (0) {
2273 rd_rkb_dbg(rkb, PROTOCOL, "SEND",
2274 "Send %s corrid %"PRId32" at "
2275 "offset %"PRIusz"/%"PRIusz,
2276 rd_kafka_ApiKey2str(rkbuf->rkbuf_reqhdr.
2277 ApiKey),
2278 rkbuf->rkbuf_corrid,
2279 pre_of, rd_slice_size(&rkbuf->rkbuf_reader));
2280 }
2281
2282 if ((r = rd_kafka_broker_send(rkb, &rkbuf->rkbuf_reader)) == -1)
2283 return -1;
2284
2285 now = rd_clock();
2286 rkb->rkb_ts_tx_last = now;
2287
2288 /* Partial send? Continue next time. */
2289 if (rd_slice_remains(&rkbuf->rkbuf_reader) > 0) {
2290 rd_rkb_dbg(rkb, PROTOCOL, "SEND",
2291 "Sent partial %sRequest "
2292 "(v%hd, "
2293 "%"PRIdsz"+%"PRIdsz"/%"PRIusz" bytes, "
2294 "CorrId %"PRId32")",
2295 rd_kafka_ApiKey2str(rkbuf->rkbuf_reqhdr.
2296 ApiKey),
2297 rkbuf->rkbuf_reqhdr.ApiVersion,
2298 (ssize_t)pre_of, r,
2299 rd_slice_size(&rkbuf->rkbuf_reader),
2300 rkbuf->rkbuf_corrid);
2301 return 0;
2302 }
2303
2304 rd_rkb_dbg(rkb, PROTOCOL, "SEND",
2305 "Sent %sRequest (v%hd, %"PRIusz" bytes @ %"PRIusz", "
2306 "CorrId %"PRId32")",
2307 rd_kafka_ApiKey2str(rkbuf->rkbuf_reqhdr.ApiKey),
2308 rkbuf->rkbuf_reqhdr.ApiVersion,
2309 rd_slice_size(&rkbuf->rkbuf_reader),
2310 pre_of, rkbuf->rkbuf_corrid);
2311
2312 rd_atomic64_add(&rkb->rkb_c.reqtype[rkbuf->rkbuf_reqhdr.ApiKey],
2313 1);
2314
2315 /* Notify transport layer of full request sent */
2316 if (likely(rkb->rkb_transport != NULL))
2317 rd_kafka_transport_request_sent(rkb, rkbuf);
2318
2319 /* Entire buffer sent, unlink from outbuf */
2320 rd_kafka_bufq_deq(&rkb->rkb_outbufs, rkbuf);
2321 rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_SENT;
2322
2323 /* Store time for RTT calculation */
2324 rkbuf->rkbuf_ts_sent = now;
2325
2326 /* Add to outbuf_latency averager */
2327 rd_avg_add(&rkb->rkb_avg_outbuf_latency,
2328 rkbuf->rkbuf_ts_sent - rkbuf->rkbuf_ts_enq);
2329
2330 if (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_BLOCKING &&
2331 rd_atomic32_add(&rkb->rkb_blocking_request_cnt, 1) == 1)
2332 rd_kafka_brokers_broadcast_state_change(rkb->rkb_rk);
2333
2334 /* Put buffer on response wait list unless we are not
2335 * expecting a response (required_acks=0). */
2336 if (!(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_NO_RESPONSE))
2337 rd_kafka_bufq_enq(&rkb->rkb_waitresps, rkbuf);
2338 else { /* Call buffer callback for delivery report. */
2339 rd_kafka_buf_callback(rkb->rkb_rk, rkb, 0, NULL, rkbuf);
2340 }
2341
2342 cnt++;
2343 }
2344
2345 return cnt;
2346}
2347
2348
2349/**
2350 * Add 'rkbuf' to broker 'rkb's retry queue.
2351 */
2352void rd_kafka_broker_buf_retry (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf) {
2353
2354 /* Restore original replyq since replyq.q will have been NULLed
2355 * by buf_callback()/replyq_enq(). */
2356 if (!rkbuf->rkbuf_replyq.q && rkbuf->rkbuf_orig_replyq.q) {
2357 rkbuf->rkbuf_replyq = rkbuf->rkbuf_orig_replyq;
2358 rd_kafka_replyq_clear(&rkbuf->rkbuf_orig_replyq);
2359 }
2360
2361 /* If called from another thread than rkb's broker thread
2362 * enqueue the buffer on the broker's op queue. */
2363 if (!thrd_is_current(rkb->rkb_thread)) {
2364 rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_XMIT_RETRY);
2365 rko->rko_u.xbuf.rkbuf = rkbuf;
2366 rd_kafka_q_enq(rkb->rkb_ops, rko);
2367 return;
2368 }
2369
2370 rd_rkb_dbg(rkb, PROTOCOL, "RETRY",
2371 "Retrying %sRequest (v%hd, %"PRIusz" bytes, retry %d/%d, "
2372 "prev CorrId %"PRId32") in %dms",
2373 rd_kafka_ApiKey2str(rkbuf->rkbuf_reqhdr.ApiKey),
2374 rkbuf->rkbuf_reqhdr.ApiVersion,
2375 rd_slice_size(&rkbuf->rkbuf_reader),
2376 rkbuf->rkbuf_retries, rkb->rkb_rk->rk_conf.max_retries,
2377 rkbuf->rkbuf_corrid,
2378 rkb->rkb_rk->rk_conf.retry_backoff_ms);
2379
2380 rd_atomic64_add(&rkb->rkb_c.tx_retries, 1);
2381
2382 rkbuf->rkbuf_ts_retry = rd_clock() +
2383 (rkb->rkb_rk->rk_conf.retry_backoff_ms * 1000);
2384 /* Precaution: time out the request if it hasn't moved from the
2385 * retry queue within the retry interval (such as when the broker is
2386 * down). */
2387 // FIXME: implememt this properly.
2388 rkbuf->rkbuf_ts_timeout = rkbuf->rkbuf_ts_retry + (5*1000*1000);
2389
2390 /* Reset send offset */
2391 rd_slice_seek(&rkbuf->rkbuf_reader, 0);
2392 rkbuf->rkbuf_corrid = 0;
2393
2394 rd_kafka_bufq_enq(&rkb->rkb_retrybufs, rkbuf);
2395}
2396
2397
2398/**
2399 * Move buffers that have expired their retry backoff time from the
2400 * retry queue to the outbuf.
2401 */
2402static void rd_kafka_broker_retry_bufs_move (rd_kafka_broker_t *rkb) {
2403 rd_ts_t now = rd_clock();
2404 rd_kafka_buf_t *rkbuf;
2405 int cnt = 0;
2406
2407 while ((rkbuf = TAILQ_FIRST(&rkb->rkb_retrybufs.rkbq_bufs))) {
2408 if (rkbuf->rkbuf_ts_retry > now)
2409 break;
2410
2411 rd_kafka_bufq_deq(&rkb->rkb_retrybufs, rkbuf);
2412
2413 rd_kafka_broker_buf_enq0(rkb, rkbuf);
2414 cnt++;
2415 }
2416
2417 if (cnt > 0)
2418 rd_rkb_dbg(rkb, BROKER, "RETRY",
2419 "Moved %d retry buffer(s) to output queue", cnt);
2420}
2421
2422
2423/**
2424 * @brief Propagate delivery report for entire message queue.
2425 *
2426 * @param err The error which will be set on each message.
2427 * @param status The status which will be set on each message.
2428 *
2429 * To avoid extra iterations, the \p err and \p status are set on
2430 * the message as they are popped off the OP_DR msgq in rd_kafka_poll() et.al
2431 */
2432void rd_kafka_dr_msgq (rd_kafka_itopic_t *rkt,
2433 rd_kafka_msgq_t *rkmq,
2434 rd_kafka_resp_err_t err) {
2435 rd_kafka_t *rk = rkt->rkt_rk;
2436
2437 if (unlikely(rd_kafka_msgq_len(rkmq) == 0))
2438 return;
2439
2440 /* Call on_acknowledgement() interceptors */
2441 rd_kafka_interceptors_on_acknowledgement_queue(rk, rkmq, err);
2442
2443 if ((rk->rk_conf.enabled_events & RD_KAFKA_EVENT_DR) &&
2444 (!rk->rk_conf.dr_err_only || err)) {
2445 /* Pass all messages to application thread in one op. */
2446 rd_kafka_op_t *rko;
2447
2448 rko = rd_kafka_op_new(RD_KAFKA_OP_DR);
2449 rko->rko_err = err;
2450 rko->rko_u.dr.s_rkt = rd_kafka_topic_keep(rkt);
2451 rd_kafka_msgq_init(&rko->rko_u.dr.msgq);
2452
2453 /* Move all messages to op's msgq */
2454 rd_kafka_msgq_move(&rko->rko_u.dr.msgq, rkmq);
2455
2456 rd_kafka_q_enq(rk->rk_rep, rko);
2457
2458 } else {
2459 /* No delivery report callback. */
2460
2461 /* Destroy the messages right away. */
2462 rd_kafka_msgq_purge(rk, rkmq);
2463 }
2464}
2465
2466
2467/**
2468 * @brief Trigger delivery reports for implicitly acked messages.
2469 *
2470 * @locks none
2471 * @locality broker thread - either last or current leader
2472 */
2473void rd_kafka_dr_implicit_ack (rd_kafka_broker_t *rkb,
2474 rd_kafka_toppar_t *rktp,
2475 uint64_t last_msgid) {
2476 rd_kafka_msgq_t acked = RD_KAFKA_MSGQ_INITIALIZER(acked);
2477 rd_kafka_msgq_t acked2 = RD_KAFKA_MSGQ_INITIALIZER(acked2);
2478 rd_kafka_msg_status_t status = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED;
2479
2480 if (rktp->rktp_rkt->rkt_conf.required_acks != 0)
2481 status = RD_KAFKA_MSG_STATUS_PERSISTED;
2482
2483 rd_kafka_msgq_move_acked(&acked, &rktp->rktp_xmit_msgq, last_msgid,
2484 status);
2485 rd_kafka_msgq_move_acked(&acked2, &rktp->rktp_msgq, last_msgid,
2486 status);
2487
2488 /* Insert acked2 into acked in correct order */
2489 rd_kafka_msgq_insert_msgq(&acked, &acked2,
2490 rktp->rktp_rkt->rkt_conf.msg_order_cmp);
2491
2492 if (!rd_kafka_msgq_len(&acked))
2493 return;
2494
2495 rd_rkb_dbg(rkb, MSG|RD_KAFKA_DBG_EOS, "IMPLICITACK",
2496 "%.*s [%"PRId32"] %d message(s) implicitly acked "
2497 "by subsequent batch success "
2498 "(msgids %"PRIu64"..%"PRIu64", "
2499 "last acked %"PRIu64")",
2500 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
2501 rktp->rktp_partition,
2502 rd_kafka_msgq_len(&acked),
2503 rd_kafka_msgq_first(&acked)->rkm_u.producer.msgid,
2504 rd_kafka_msgq_last(&acked)->rkm_u.producer.msgid,
2505 last_msgid);
2506
2507 /* Trigger delivery reports */
2508 rd_kafka_dr_msgq(rktp->rktp_rkt, &acked, RD_KAFKA_RESP_ERR_NO_ERROR);
2509}
2510
2511
2512
2513
2514
2515
2516
2517
2518
2519
2520/**
2521 * @brief Map and assign existing partitions to this broker using
2522 * the leader-id.
2523 *
2524 * @locks none
2525 * @locality any
2526 */
2527static void rd_kafka_broker_map_partitions (rd_kafka_broker_t *rkb) {
2528 rd_kafka_t *rk = rkb->rkb_rk;
2529 rd_kafka_itopic_t *rkt;
2530 int cnt = 0;
2531
2532 if (rkb->rkb_nodeid == -1)
2533 return;
2534
2535 rd_kafka_rdlock(rk);
2536 TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
2537 int i;
2538
2539 rd_kafka_topic_wrlock(rkt);
2540 for (i = 0 ; i < rkt->rkt_partition_cnt ; i++) {
2541 shptr_rd_kafka_toppar_t *s_rktp = rkt->rkt_p[i];
2542 rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
2543
2544 /* Only map unassigned partitions matching this broker*/
2545 rd_kafka_toppar_lock(rktp);
2546 if (rktp->rktp_leader_id == rkb->rkb_nodeid &&
2547 !(rktp->rktp_leader && rktp->rktp_next_leader)) {
2548 rd_kafka_toppar_leader_update(
2549 rktp, rktp->rktp_leader_id, rkb);
2550 cnt++;
2551 }
2552 rd_kafka_toppar_unlock(rktp);
2553 }
2554 rd_kafka_topic_wrunlock(rkt);
2555 }
2556 rd_kafka_rdunlock(rk);
2557
2558 rd_rkb_dbg(rkb, TOPIC|RD_KAFKA_DBG_BROKER, "LEADER",
2559 "Mapped %d partition(s) to broker", cnt);
2560}
2561
2562
2563/**
2564 * @brief Broker id comparator
2565 */
2566static int rd_kafka_broker_cmp_by_id (const void *_a, const void *_b) {
2567 const rd_kafka_broker_t *a = _a, *b = _b;
2568 return a->rkb_nodeid - b->rkb_nodeid;
2569}
2570
2571
2572/**
2573 * @brief Set the broker logname (used in logs) to a copy of \p logname.
2574 *
2575 * @locality any
2576 * @locks none
2577 */
2578static void rd_kafka_broker_set_logname (rd_kafka_broker_t *rkb,
2579 const char *logname) {
2580 mtx_lock(&rkb->rkb_logname_lock);
2581 if (rkb->rkb_logname)
2582 rd_free(rkb->rkb_logname);
2583 rkb->rkb_logname = rd_strdup(logname);
2584 mtx_unlock(&rkb->rkb_logname_lock);
2585}
2586
2587/**
2588 * @brief Serve a broker op (an op posted by another thread to be handled by
2589 * this broker's thread).
2590 *
2591 * @returns 0 if calling op loop should break out, else 1 to continue.
2592 * @locality broker thread
2593 * @locks none
2594 */
2595static int rd_kafka_broker_op_serve (rd_kafka_broker_t *rkb,
2596 rd_kafka_op_t *rko) {
2597 shptr_rd_kafka_toppar_t *s_rktp;
2598 rd_kafka_toppar_t *rktp;
2599 int ret = 1;
2600
2601 rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
2602
2603 switch (rko->rko_type)
2604 {
2605 case RD_KAFKA_OP_NODE_UPDATE:
2606 {
2607 enum {
2608 _UPD_NAME = 0x1,
2609 _UPD_ID = 0x2
2610 } updated = 0;
2611 char brokername[RD_KAFKA_NODENAME_SIZE];
2612
2613 /* Need kafka_wrlock for updating rk_broker_by_id */
2614 rd_kafka_wrlock(rkb->rkb_rk);
2615 rd_kafka_broker_lock(rkb);
2616
2617 if (strcmp(rkb->rkb_nodename,
2618 rko->rko_u.node.nodename)) {
2619 rd_rkb_dbg(rkb, BROKER, "UPDATE",
2620 "Nodename changed from %s to %s",
2621 rkb->rkb_nodename,
2622 rko->rko_u.node.nodename);
2623 strncpy(rkb->rkb_nodename,
2624 rko->rko_u.node.nodename,
2625 sizeof(rkb->rkb_nodename)-1);
2626 rkb->rkb_nodename_epoch++;
2627 updated |= _UPD_NAME;
2628 }
2629
2630 if (rko->rko_u.node.nodeid != -1 &&
2631 rko->rko_u.node.nodeid != rkb->rkb_nodeid) {
2632 int32_t old_nodeid = rkb->rkb_nodeid;
2633 rd_rkb_dbg(rkb, BROKER, "UPDATE",
2634 "NodeId changed from %"PRId32" to %"PRId32,
2635 rkb->rkb_nodeid,
2636 rko->rko_u.node.nodeid);
2637
2638 rkb->rkb_nodeid = rko->rko_u.node.nodeid;
2639
2640 /* Update system thread name */
2641 rd_kafka_set_thread_sysname("rdk:broker%"PRId32,
2642 rkb->rkb_nodeid);
2643
2644 /* Update broker_by_id sorted list */
2645 if (old_nodeid == -1)
2646 rd_list_add(&rkb->rkb_rk->rk_broker_by_id, rkb);
2647 rd_list_sort(&rkb->rkb_rk->rk_broker_by_id,
2648 rd_kafka_broker_cmp_by_id);
2649
2650 updated |= _UPD_ID;
2651 }
2652
2653 rd_kafka_mk_brokername(brokername, sizeof(brokername),
2654 rkb->rkb_proto,
2655 rkb->rkb_nodename, rkb->rkb_nodeid,
2656 RD_KAFKA_LEARNED);
2657 if (strcmp(rkb->rkb_name, brokername)) {
2658 /* Udate the name copy used for logging. */
2659 rd_kafka_broker_set_logname(rkb, brokername);
2660
2661 rd_rkb_dbg(rkb, BROKER, "UPDATE",
2662 "Name changed from %s to %s",
2663 rkb->rkb_name, brokername);
2664 strncpy(rkb->rkb_name, brokername,
2665 sizeof(rkb->rkb_name)-1);
2666 }
2667 rd_kafka_broker_unlock(rkb);
2668 rd_kafka_wrunlock(rkb->rkb_rk);
2669
2670 if (updated & _UPD_NAME)
2671 rd_kafka_broker_fail(rkb, LOG_NOTICE,
2672 RD_KAFKA_RESP_ERR__NODE_UPDATE,
2673 "Broker hostname updated");
2674 else if (updated & _UPD_ID) {
2675 /* Map existing partitions to this broker. */
2676 rd_kafka_broker_map_partitions(rkb);
2677
2678 /* If broker is currently in state up we need
2679 * to trigger a state change so it exits its
2680 * state&type based .._serve() loop. */
2681 rd_kafka_broker_lock(rkb);
2682 if (rkb->rkb_state == RD_KAFKA_BROKER_STATE_UP)
2683 rd_kafka_broker_set_state(
2684 rkb, RD_KAFKA_BROKER_STATE_UPDATE);
2685 rd_kafka_broker_unlock(rkb);
2686 }
2687
2688 rd_kafka_brokers_broadcast_state_change(rkb->rkb_rk);
2689 break;
2690 }
2691
2692 case RD_KAFKA_OP_XMIT_BUF:
2693 rd_kafka_broker_buf_enq2(rkb, rko->rko_u.xbuf.rkbuf);
2694 rko->rko_u.xbuf.rkbuf = NULL; /* buffer now owned by broker */
2695 if (rko->rko_replyq.q) {
2696 /* Op will be reused for forwarding response. */
2697 rko = NULL;
2698 }
2699 break;
2700
2701 case RD_KAFKA_OP_XMIT_RETRY:
2702 rd_kafka_broker_buf_retry(rkb, rko->rko_u.xbuf.rkbuf);
2703 rko->rko_u.xbuf.rkbuf = NULL;
2704 break;
2705
2706 case RD_KAFKA_OP_PARTITION_JOIN:
2707 /*
2708 * Add partition to broker toppars
2709 */
2710 rktp = rd_kafka_toppar_s2i(rko->rko_rktp);
2711 rd_kafka_toppar_lock(rktp);
2712
2713 /* Abort join if instance is terminating */
2714 if (rd_kafka_terminating(rkb->rkb_rk) ||
2715 (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_REMOVE)) {
2716 rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_TOPIC, "TOPBRK",
2717 "Topic %s [%"PRId32"]: not joining broker: "
2718 "%s",
2719 rktp->rktp_rkt->rkt_topic->str,
2720 rktp->rktp_partition,
2721 rd_kafka_terminating(rkb->rkb_rk) ?
2722 "instance is terminating" :
2723 "partition removed");
2724
2725 rd_kafka_broker_destroy(rktp->rktp_next_leader);
2726 rktp->rktp_next_leader = NULL;
2727 rd_kafka_toppar_unlock(rktp);
2728 break;
2729 }
2730
2731 /* See if we are still the next leader */
2732 if (rktp->rktp_next_leader != rkb) {
2733 rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_TOPIC, "TOPBRK",
2734 "Topic %s [%"PRId32"]: not joining broker "
2735 "(next leader %s)",
2736 rktp->rktp_rkt->rkt_topic->str,
2737 rktp->rktp_partition,
2738 rktp->rktp_next_leader ?
2739 rd_kafka_broker_name(rktp->rktp_next_leader):
2740 "(none)");
2741
2742 /* Need temporary refcount so we can safely unlock
2743 * after q_enq(). */
2744 s_rktp = rd_kafka_toppar_keep(rktp);
2745
2746 /* No, forward this op to the new next leader. */
2747 rd_kafka_q_enq(rktp->rktp_next_leader->rkb_ops, rko);
2748 rko = NULL;
2749
2750 rd_kafka_toppar_unlock(rktp);
2751 rd_kafka_toppar_destroy(s_rktp);
2752
2753 break;
2754 }
2755
2756 rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_TOPIC, "TOPBRK",
2757 "Topic %s [%"PRId32"]: joining broker "
2758 "(rktp %p, %d message(s) queued)",
2759 rktp->rktp_rkt->rkt_topic->str,
2760 rktp->rktp_partition, rktp,
2761 rd_kafka_msgq_len(&rktp->rktp_msgq));
2762
2763 rd_kafka_assert(NULL, rktp->rktp_s_for_rkb == NULL);
2764 rktp->rktp_s_for_rkb = rd_kafka_toppar_keep(rktp);
2765 rd_kafka_broker_lock(rkb);
2766 TAILQ_INSERT_TAIL(&rkb->rkb_toppars, rktp, rktp_rkblink);
2767 rkb->rkb_toppar_cnt++;
2768 rd_kafka_broker_unlock(rkb);
2769 rktp->rktp_leader = rkb;
2770 rd_assert(!rktp->rktp_msgq_wakeup_q);
2771 rktp->rktp_msgq_wakeup_q = rd_kafka_q_keep(rkb->rkb_ops);
2772 rd_kafka_broker_keep(rkb);
2773
2774 if (rkb->rkb_rk->rk_type == RD_KAFKA_PRODUCER) {
2775 rd_kafka_broker_active_toppar_add(rkb, rktp);
2776
2777 if (rd_kafka_is_idempotent(rkb->rkb_rk)) {
2778 /* Wait for all outstanding requests from
2779 * the previous leader to finish before
2780 * producing anything to this new leader. */
2781 rd_kafka_idemp_drain_toppar(
2782 rktp,
2783 "wait for outstanding requests to "
2784 "finish before producing to "
2785 "new leader");
2786 }
2787 }
2788
2789 rd_kafka_broker_destroy(rktp->rktp_next_leader);
2790 rktp->rktp_next_leader = NULL;
2791
2792 rd_kafka_toppar_unlock(rktp);
2793
2794 rd_kafka_brokers_broadcast_state_change(rkb->rkb_rk);
2795 break;
2796
2797 case RD_KAFKA_OP_PARTITION_LEAVE:
2798 /*
2799 * Remove partition from broker toppars
2800 */
2801 rktp = rd_kafka_toppar_s2i(rko->rko_rktp);
2802
2803 rd_kafka_toppar_lock(rktp);
2804
2805 /* Multiple PARTITION_LEAVEs are possible during partition
2806 * migration, make sure we're supposed to handle this one. */
2807 if (unlikely(rktp->rktp_leader != rkb)) {
2808 rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_TOPIC, "TOPBRK",
2809 "Topic %s [%"PRId32"]: "
2810 "ignoring PARTITION_LEAVE: "
2811 "broker is not leader (%s)",
2812 rktp->rktp_rkt->rkt_topic->str,
2813 rktp->rktp_partition,
2814 rktp->rktp_leader ?
2815 rd_kafka_broker_name(rktp->rktp_leader) :
2816 "none");
2817 rd_kafka_toppar_unlock(rktp);
2818 break;
2819 }
2820 rd_kafka_toppar_unlock(rktp);
2821
2822 /* Remove from fetcher list */
2823 rd_kafka_toppar_fetch_decide(rktp, rkb, 1/*force remove*/);
2824
2825 if (rkb->rkb_rk->rk_type == RD_KAFKA_PRODUCER) {
2826 /* Purge any ProduceRequests for this toppar
2827 * in the output queue. */
2828 rd_kafka_broker_bufq_purge_by_toppar(
2829 rkb,
2830 &rkb->rkb_outbufs,
2831 RD_KAFKAP_Produce, rktp,
2832 RD_KAFKA_RESP_ERR__RETRY);
2833 }
2834
2835
2836 rd_kafka_toppar_lock(rktp);
2837
2838 rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_TOPIC, "TOPBRK",
2839 "Topic %s [%"PRId32"]: leaving broker "
2840 "(%d messages in xmitq, next leader %s, rktp %p)",
2841 rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
2842 rd_kafka_msgq_len(&rktp->rktp_xmit_msgq),
2843 rktp->rktp_next_leader ?
2844 rd_kafka_broker_name(rktp->rktp_next_leader) :
2845 "(none)", rktp);
2846
2847 /* Insert xmitq(broker-local) messages to the msgq(global)
2848 * at their sorted position to maintain ordering. */
2849 rd_kafka_msgq_insert_msgq(&rktp->rktp_msgq,
2850 &rktp->rktp_xmit_msgq,
2851 rktp->rktp_rkt->rkt_conf.
2852 msg_order_cmp);
2853
2854 if (rkb->rkb_rk->rk_type == RD_KAFKA_PRODUCER)
2855 rd_kafka_broker_active_toppar_del(rkb, rktp);
2856
2857 rd_kafka_broker_lock(rkb);
2858 TAILQ_REMOVE(&rkb->rkb_toppars, rktp, rktp_rkblink);
2859 rkb->rkb_toppar_cnt--;
2860 rd_kafka_broker_unlock(rkb);
2861 rd_kafka_broker_destroy(rktp->rktp_leader);
2862 if (rktp->rktp_msgq_wakeup_q) {
2863 rd_kafka_q_destroy(rktp->rktp_msgq_wakeup_q);
2864 rktp->rktp_msgq_wakeup_q = NULL;
2865 }
2866 rktp->rktp_leader = NULL;
2867
2868 /* Need to hold on to a refcount past q_enq() and
2869 * unlock() below */
2870 s_rktp = rktp->rktp_s_for_rkb;
2871 rktp->rktp_s_for_rkb = NULL;
2872
2873 if (rktp->rktp_next_leader) {
2874 /* There is a next leader we need to migrate to. */
2875 rko->rko_type = RD_KAFKA_OP_PARTITION_JOIN;
2876 rd_kafka_q_enq(rktp->rktp_next_leader->rkb_ops, rko);
2877 rko = NULL;
2878 } else {
2879 rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_TOPIC, "TOPBRK",
2880 "Topic %s [%"PRId32"]: no next leader, "
2881 "failing %d message(s) in partition queue",
2882 rktp->rktp_rkt->rkt_topic->str,
2883 rktp->rktp_partition,
2884 rd_kafka_msgq_len(&rktp->rktp_msgq));
2885 rd_kafka_assert(NULL, rd_kafka_msgq_len(&rktp->rktp_xmit_msgq) == 0);
2886 rd_kafka_dr_msgq(rktp->rktp_rkt, &rktp->rktp_msgq,
2887 rd_kafka_terminating(rkb->rkb_rk) ?
2888 RD_KAFKA_RESP_ERR__DESTROY :
2889 RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION);
2890
2891 }
2892
2893 rd_kafka_toppar_unlock(rktp);
2894 rd_kafka_toppar_destroy(s_rktp);
2895
2896 rd_kafka_brokers_broadcast_state_change(rkb->rkb_rk);
2897 break;
2898
2899 case RD_KAFKA_OP_TERMINATE:
2900 /* nop: just a wake-up. */
2901 rd_rkb_dbg(rkb, BROKER, "TERM",
2902 "Received TERMINATE op in state %s: "
2903 "%d refcnts, %d toppar(s), %d active toppar(s), "
2904 "%d outbufs, %d waitresps, %d retrybufs",
2905 rd_kafka_broker_state_names[rkb->rkb_state],
2906 rd_refcnt_get(&rkb->rkb_refcnt),
2907 rkb->rkb_toppar_cnt, rkb->rkb_active_toppar_cnt,
2908 (int)rd_kafka_bufq_cnt(&rkb->rkb_outbufs),
2909 (int)rd_kafka_bufq_cnt(&rkb->rkb_waitresps),
2910 (int)rd_kafka_bufq_cnt(&rkb->rkb_retrybufs));
2911 /* Expedite termination by bringing down the broker
2912 * and trigger a state change.
2913 * This makes sure any eonce dependent on state changes
2914 * are triggered. */
2915 rd_kafka_broker_fail(rkb, LOG_DEBUG,
2916 RD_KAFKA_RESP_ERR__DESTROY,
2917 "Client is terminating");
2918 ret = 0;
2919 break;
2920
2921 case RD_KAFKA_OP_WAKEUP:
2922 break;
2923
2924 case RD_KAFKA_OP_PURGE:
2925 rd_kafka_broker_handle_purge_queues(rkb, rko);
2926 rko = NULL; /* the rko is reused for the reply */
2927 break;
2928
2929 case RD_KAFKA_OP_CONNECT:
2930 /* Sparse connections: connection requested, transition
2931 * to TRY_CONNECT state to trigger new connection. */
2932 if (rkb->rkb_state == RD_KAFKA_BROKER_STATE_INIT) {
2933 rd_rkb_dbg(rkb, BROKER, "CONNECT",
2934 "Received CONNECT op");
2935 rkb->rkb_persistconn.internal++;
2936 rd_kafka_broker_lock(rkb);
2937 rd_kafka_broker_set_state(
2938 rkb, RD_KAFKA_BROKER_STATE_TRY_CONNECT);
2939 rd_kafka_broker_unlock(rkb);
2940
2941 } else if (rkb->rkb_state >=
2942 RD_KAFKA_BROKER_STATE_TRY_CONNECT) {
2943 rd_bool_t do_disconnect = rd_false;
2944
2945 /* If the nodename was changed since the last connect,
2946 * close the current connection. */
2947
2948 rd_kafka_broker_lock(rkb);
2949 do_disconnect = (rkb->rkb_connect_epoch !=
2950 rkb->rkb_nodename_epoch);
2951 rd_kafka_broker_unlock(rkb);
2952
2953 if (do_disconnect)
2954 rd_kafka_broker_fail(
2955 rkb, LOG_DEBUG,
2956 RD_KAFKA_RESP_ERR__NODE_UPDATE,
2957 "Closing connection due to "
2958 "nodename change");
2959 }
2960 break;
2961
2962 default:
2963 rd_kafka_assert(rkb->rkb_rk, !*"unhandled op type");
2964 break;
2965 }
2966
2967 if (rko)
2968 rd_kafka_op_destroy(rko);
2969
2970 return ret;
2971}
2972
2973
2974
2975/**
2976 * @brief Serve broker ops.
2977 * @returns the number of ops served
2978 */
2979static int rd_kafka_broker_ops_serve (rd_kafka_broker_t *rkb, int timeout_ms) {
2980 rd_kafka_op_t *rko;
2981 int cnt = 0;
2982
2983 while ((rko = rd_kafka_q_pop(rkb->rkb_ops, timeout_ms, 0)) &&
2984 (cnt++, rd_kafka_broker_op_serve(rkb, rko)))
2985 timeout_ms = RD_POLL_NOWAIT;
2986
2987 return cnt;
2988}
2989
2990/**
2991 * @brief Serve broker ops and IOs.
2992 *
2993 * If a connection exists, poll IO first based on timeout.
2994 * Use remaining timeout for ops queue poll.
2995 *
2996 * If no connection, poll ops queue using timeout.
2997 *
2998 * Sparse connections: if there's need for a connection, set
2999 * timeout to NOWAIT.
3000 *
3001 * @param abs_timeout Maximum block time (absolute time).
3002 *
3003 * @locality broker thread
3004 * @locks none
3005 */
3006static void rd_kafka_broker_ops_io_serve (rd_kafka_broker_t *rkb,
3007 rd_ts_t abs_timeout) {
3008 rd_ts_t now;
3009 rd_ts_t remains_us;
3010 int remains_ms;
3011
3012 if (unlikely(rd_kafka_terminating(rkb->rkb_rk)))
3013 remains_ms = 1;
3014 else if (unlikely(rd_kafka_broker_needs_connection(rkb)))
3015 remains_ms = RD_POLL_NOWAIT;
3016 else if (unlikely(abs_timeout == RD_POLL_INFINITE))
3017 remains_ms = rd_kafka_max_block_ms;
3018 else if ((remains_us = abs_timeout - (now = rd_clock())) < 0)
3019 remains_ms = RD_POLL_NOWAIT;
3020 else
3021 /* + 999: Round up to millisecond to
3022 * avoid busy-looping during the last
3023 * millisecond. */
3024 remains_ms = (int)((remains_us + 999) / 1000);
3025
3026
3027 if (likely(rkb->rkb_transport != NULL)) {
3028 /* Serve IO events */
3029 rd_kafka_transport_io_serve(rkb->rkb_transport, remains_ms);
3030
3031 remains_ms = RD_POLL_NOWAIT;
3032 }
3033
3034
3035 /* Serve broker ops */
3036 rd_kafka_broker_ops_serve(rkb, remains_ms);
3037
3038
3039 /* An op might have triggered the need for a connection, if so
3040 * transition to TRY_CONNECT state. */
3041 if (unlikely(rd_kafka_broker_needs_connection(rkb) &&
3042 rkb->rkb_state == RD_KAFKA_BROKER_STATE_INIT)) {
3043 rd_kafka_broker_lock(rkb);
3044 rd_kafka_broker_set_state(
3045 rkb, RD_KAFKA_BROKER_STATE_TRY_CONNECT);
3046 rd_kafka_broker_unlock(rkb);
3047 }
3048
3049 /* Scan queues for timeouts. */
3050 now = rd_clock();
3051 if (rd_interval(&rkb->rkb_timeout_scan_intvl, 1000000, now) > 0)
3052 rd_kafka_broker_timeout_scan(rkb, now);
3053}
3054
3055
3056/**
3057 * @brief Serve the toppar's assigned to this broker.
3058 *
3059 * @returns the minimum Fetch backoff time (abs timestamp) for the
3060 * partitions to fetch.
3061 *
3062 * @locality broker thread
3063 */
3064static rd_ts_t rd_kafka_broker_toppars_serve (rd_kafka_broker_t *rkb) {
3065 rd_kafka_toppar_t *rktp, *rktp_tmp;
3066 rd_ts_t min_backoff = RD_TS_MAX;
3067
3068 TAILQ_FOREACH_SAFE(rktp, &rkb->rkb_toppars, rktp_rkblink, rktp_tmp) {
3069 rd_ts_t backoff;
3070
3071 /* Serve toppar to update desired rktp state */
3072 backoff = rd_kafka_broker_consumer_toppar_serve(rkb, rktp);
3073 if (backoff < min_backoff)
3074 min_backoff = backoff;
3075 }
3076
3077 return min_backoff;
3078}
3079
3080
3081/**
3082 * @brief Idle function for the internal broker handle.
3083 */
3084static void rd_kafka_broker_internal_serve (rd_kafka_broker_t *rkb,
3085 rd_ts_t abs_timeout) {
3086 int initial_state = rkb->rkb_state;
3087
3088 do {
3089 rd_kafka_broker_toppars_serve(rkb);
3090 rd_kafka_broker_ops_io_serve(rkb, abs_timeout);
3091 } while (!rd_kafka_broker_terminating(rkb) &&
3092 (int)rkb->rkb_state == initial_state &&
3093 !rd_timeout_expired(rd_timeout_remains(abs_timeout)));
3094}
3095
3096
3097/**
3098 * @brief Scan toppar's xmit and producer queue for message timeouts and
3099 * enqueue delivery reports for timed out messages.
3100 *
3101 * @returns the number of messages timed out.
3102 *
3103 * @locality toppar's broker handler thread
3104 * @locks toppar_lock MUST be held
3105 */
3106static int rd_kafka_broker_toppar_msgq_scan (rd_kafka_broker_t *rkb,
3107 rd_kafka_toppar_t *rktp,
3108 rd_ts_t now) {
3109 rd_kafka_msgq_t xtimedout = RD_KAFKA_MSGQ_INITIALIZER(xtimedout);
3110 rd_kafka_msgq_t qtimedout = RD_KAFKA_MSGQ_INITIALIZER(qtimedout);
3111 int xcnt, qcnt, cnt;
3112 uint64_t first, last;
3113
3114 xcnt = rd_kafka_msgq_age_scan(rktp, &rktp->rktp_xmit_msgq,
3115 &xtimedout, now);
3116 qcnt = rd_kafka_msgq_age_scan(rktp, &rktp->rktp_msgq,
3117 &qtimedout, now);
3118
3119 cnt = xcnt + qcnt;
3120 if (likely(cnt == 0))
3121 return 0;
3122
3123 /* Insert queue-timedout into xmitqueue-timedout in a sorted fashion */
3124 rd_kafka_msgq_insert_msgq(&xtimedout, &qtimedout,
3125 rktp->rktp_rkt->rkt_conf.msg_order_cmp);
3126
3127 first = rd_kafka_msgq_first(&xtimedout)->rkm_u.producer.msgid;
3128 last = rd_kafka_msgq_last(&xtimedout)->rkm_u.producer.msgid;
3129
3130 rd_rkb_dbg(rkb, MSG, "TIMEOUT",
3131 "%s [%"PRId32"]: timed out %d+%d message(s) "
3132 "(MsgId %"PRIu64"..%"PRIu64"): message.timeout.ms exceeded",
3133 rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
3134 xcnt, qcnt, first, last);
3135
3136 /* Trigger delivery report for timed out messages */
3137 rd_kafka_dr_msgq(rktp->rktp_rkt, &xtimedout,
3138 RD_KAFKA_RESP_ERR__MSG_TIMED_OUT);
3139
3140 return cnt;
3141}
3142
3143
3144/**
3145 * @returns the number of requests that may be enqueued before
3146 * queue.backpressure.threshold is reached.
3147 */
3148
3149static RD_INLINE unsigned int
3150rd_kafka_broker_outbufs_space (rd_kafka_broker_t *rkb) {
3151 int r = rkb->rkb_rk->rk_conf.queue_backpressure_thres -
3152 rd_atomic32_get(&rkb->rkb_outbufs.rkbq_cnt);
3153 return r < 0 ? 0 : (unsigned int)r;
3154}
3155
3156
3157/**
3158 * @brief Serve a toppar for producing.
3159 *
3160 * @param next_wakeup will be updated to when the next wake-up/attempt is
3161 * desired, only lower (sooner) values will be set.
3162 *
3163 * @returns the number of messages produced.
3164 *
3165 * @locks none
3166 * @locality broker thread
3167 */
3168static int rd_kafka_toppar_producer_serve (rd_kafka_broker_t *rkb,
3169 rd_kafka_toppar_t *rktp,
3170 const rd_kafka_pid_t pid,
3171 rd_ts_t now,
3172 rd_ts_t *next_wakeup,
3173 int do_timeout_scan) {
3174 int cnt = 0;
3175 int r;
3176 rd_kafka_msg_t *rkm;
3177 int move_cnt = 0;
3178 int max_requests;
3179 int reqcnt;
3180 int inflight = 0;
3181
3182 /* By limiting the number of not-yet-sent buffers (rkb_outbufs) we
3183 * provide a backpressure mechanism to the producer loop
3184 * which allows larger message batches to accumulate and thus
3185 * increase throughput.
3186 * This comes at no latency cost since there are already
3187 * buffers enqueued waiting for transmission. */
3188 max_requests = rd_kafka_broker_outbufs_space(rkb);
3189
3190 rd_kafka_toppar_lock(rktp);
3191
3192 if (unlikely(rktp->rktp_leader != rkb)) {
3193 /* Currently migrating away from this
3194 * broker. */
3195 rd_kafka_toppar_unlock(rktp);
3196 return 0;
3197 }
3198
3199 if (unlikely(do_timeout_scan)) {
3200 int timeoutcnt;
3201
3202 /* Scan queues for msg timeouts */
3203 timeoutcnt = rd_kafka_broker_toppar_msgq_scan(rkb, rktp, now);
3204
3205 if (rd_kafka_is_idempotent(rkb->rkb_rk)) {
3206 if (!rd_kafka_pid_valid(pid)) {
3207 /* If we don't have a PID, we can't transmit
3208 * any messages. */
3209 rd_kafka_toppar_unlock(rktp);
3210 return 0;
3211
3212 } else if (timeoutcnt > 0) {
3213 /* Message timeouts will lead to gaps the in
3214 * the message sequence and thus trigger
3215 * OutOfOrderSequence errors from the broker.
3216 * Bump the epoch to reset the base msgid after
3217 * draining all partitions. */
3218
3219 /* Must not hold toppar lock */
3220 rd_kafka_toppar_unlock(rktp);
3221
3222 rd_kafka_idemp_drain_epoch_bump(
3223 rkb->rkb_rk,
3224 "%d message(s) timed out "
3225 "on %s [%"PRId32"]",
3226 timeoutcnt,
3227 rktp->rktp_rkt->rkt_topic->str,
3228 rktp->rktp_partition);
3229 return 0;
3230 }
3231 }
3232 }
3233
3234 if (unlikely(rd_kafka_fatal_error_code(rkb->rkb_rk))) {
3235 /* Fatal error has been raised, don't produce. */
3236 max_requests = 0;
3237 } else if (unlikely(RD_KAFKA_TOPPAR_IS_PAUSED(rktp))) {
3238 /* Partition is paused */
3239 max_requests = 0;
3240 } else if (max_requests > 0) {
3241 /* Move messages from locked partition produce queue
3242 * to broker-local xmit queue. */
3243 if ((move_cnt = rktp->rktp_msgq.rkmq_msg_cnt) > 0)
3244 rd_kafka_msgq_insert_msgq(&rktp->rktp_xmit_msgq,
3245 &rktp->rktp_msgq,
3246 rktp->rktp_rkt->rkt_conf.
3247 msg_order_cmp);
3248 }
3249
3250 rd_kafka_toppar_unlock(rktp);
3251
3252
3253 if (rd_kafka_is_idempotent(rkb->rkb_rk)) {
3254 /* Update the partition's cached PID, and reset the
3255 * base msg sequence if necessary */
3256 rd_bool_t did_purge = rd_false;
3257
3258 if (unlikely(!rd_kafka_pid_eq(pid, rktp->rktp_eos.pid))) {
3259 /* Flush any ProduceRequests for this partition in the
3260 * output buffer queue to speed up recovery. */
3261 rd_kafka_broker_bufq_purge_by_toppar(
3262 rkb,
3263 &rkb->rkb_outbufs,
3264 RD_KAFKAP_Produce, rktp,
3265 RD_KAFKA_RESP_ERR__RETRY);
3266 did_purge = rd_true;
3267
3268 if (rd_kafka_pid_valid(rktp->rktp_eos.pid))
3269 rd_rkb_dbg(rkb, QUEUE, "TOPPAR",
3270 "%.*s [%"PRId32"] PID has changed: "
3271 "must drain requests for all "
3272 "partitions before resuming reset "
3273 "of PID",
3274 RD_KAFKAP_STR_PR(rktp->rktp_rkt->
3275 rkt_topic),
3276 rktp->rktp_partition);
3277 }
3278
3279 inflight = rd_atomic32_get(&rktp->rktp_msgs_inflight);
3280
3281 if (unlikely(rktp->rktp_eos.wait_drain)) {
3282 if (inflight) {
3283 /* Waiting for in-flight requests to
3284 * drain/finish before producing anything more.
3285 * This is used to recover to a consistent
3286 * state when the partition leader
3287 * has changed, or timed out messages
3288 * have been removed from the queue. */
3289
3290 rd_rkb_dbg(rkb, QUEUE, "TOPPAR",
3291 "%.*s [%"PRId32"] waiting for "
3292 "%d in-flight request(s) to drain "
3293 "from queue before continuing "
3294 "to produce",
3295 RD_KAFKAP_STR_PR(rktp->rktp_rkt->
3296 rkt_topic),
3297 rktp->rktp_partition,
3298 inflight);
3299
3300 /* Flush any ProduceRequests for this
3301 * partition in the output buffer queue to
3302 * speed up draining. */
3303 if (!did_purge)
3304 rd_kafka_broker_bufq_purge_by_toppar(
3305 rkb,
3306 &rkb->rkb_outbufs,
3307 RD_KAFKAP_Produce, rktp,
3308 RD_KAFKA_RESP_ERR__RETRY);
3309
3310 return 0;
3311 }
3312
3313 rd_rkb_dbg(rkb, QUEUE, "TOPPAR",
3314 "%.*s [%"PRId32"] all in-flight requests "
3315 "drained from queue",
3316 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
3317 rktp->rktp_partition);
3318
3319 rktp->rktp_eos.wait_drain = rd_false;
3320 }
3321
3322 /* Limit the number of in-flight requests (per partition)
3323 * to the broker's sequence de-duplication window. */
3324 max_requests = RD_MIN(max_requests,
3325 RD_KAFKA_IDEMP_MAX_INFLIGHT - inflight);
3326 }
3327
3328
3329 /* Check if allowed to create and enqueue a ProduceRequest */
3330 if (max_requests <= 0)
3331 return 0;
3332
3333 r = rktp->rktp_xmit_msgq.rkmq_msg_cnt;
3334 if (r == 0)
3335 return 0;
3336
3337 rd_kafka_msgq_verify_order(rktp, &rktp->rktp_xmit_msgq, 0, rd_false);
3338
3339 rd_rkb_dbg(rkb, QUEUE, "TOPPAR",
3340 "%.*s [%"PRId32"] %d message(s) in "
3341 "xmit queue (%d added from partition queue)",
3342 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
3343 rktp->rktp_partition,
3344 r, move_cnt);
3345
3346 rkm = TAILQ_FIRST(&rktp->rktp_xmit_msgq.rkmq_msgs);
3347 rd_dassert(rkm != NULL);
3348
3349 if (rd_kafka_is_idempotent(rkb->rkb_rk)) {
3350 /* Update the partition's cached PID, and reset the
3351 * base msg sequence if necessary */
3352 if (unlikely(!rd_kafka_pid_eq(pid, rktp->rktp_eos.pid))) {
3353 /* Attempt to change the pid, it will fail if there
3354 * are outstanding messages in-flight, in which case
3355 * we eventually come back here to retry. */
3356 if (!rd_kafka_toppar_pid_change(
3357 rktp, pid, rkm->rkm_u.producer.msgid))
3358 return 0;
3359 }
3360 }
3361
3362 if (unlikely(rkb->rkb_state != RD_KAFKA_BROKER_STATE_UP)) {
3363 /* There are messages to send but connection is not up. */
3364 rd_rkb_dbg(rkb, BROKER, "TOPPAR",
3365 "%.*s [%"PRId32"] "
3366 "%d message(s) queued but broker not up",
3367 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
3368 rktp->rktp_partition,
3369 r);
3370 rkb->rkb_persistconn.internal++;
3371 return 0;
3372 }
3373
3374 /* Attempt to fill the batch size, but limit
3375 * our waiting to queue.buffering.max.ms
3376 * and batch.num.messages. */
3377 if (r < rkb->rkb_rk->rk_conf.batch_num_messages) {
3378 rd_ts_t wait_max;
3379
3380 /* Calculate maximum wait-time to honour
3381 * queue.buffering.max.ms contract. */
3382 wait_max = rd_kafka_msg_enq_time(rkm) +
3383 (rkb->rkb_rk->rk_conf.buffering_max_ms * 1000);
3384
3385 if (wait_max > now) {
3386 /* Wait for more messages or queue.buffering.max.ms
3387 * to expire. */
3388 if (wait_max < *next_wakeup)
3389 *next_wakeup = wait_max;
3390 return 0;
3391 }
3392 }
3393
3394 /* Honour retry.backoff.ms. */
3395 if (unlikely(rkm->rkm_u.producer.ts_backoff > now)) {
3396 if (rkm->rkm_u.producer.ts_backoff < *next_wakeup)
3397 *next_wakeup = rkm->rkm_u.producer.ts_backoff;
3398 /* Wait for backoff to expire */
3399 return 0;
3400 }
3401
3402 /* Send Produce requests for this toppar, honouring the
3403 * queue backpressure threshold. */
3404 for (reqcnt = 0 ; reqcnt < max_requests ; reqcnt++) {
3405 r = rd_kafka_ProduceRequest(rkb, rktp, pid);
3406 if (likely(r > 0))
3407 cnt += r;
3408 else
3409 break;
3410 }
3411
3412 /* If there are messages still in the queue, make the next
3413 * wakeup immediate. */
3414 if (rd_kafka_msgq_len(&rktp->rktp_xmit_msgq) > 0)
3415 *next_wakeup = now;
3416
3417 return cnt;
3418}
3419
3420
3421
3422/**
3423 * @brief Produce from all toppars assigned to this broker.
3424 *
3425 * @param next_wakeup is updated if the next IO/ops timeout should be
3426 * less than the input value.
3427 *
3428 * @returns the total number of messages produced.
3429 */
3430static int rd_kafka_broker_produce_toppars (rd_kafka_broker_t *rkb,
3431 rd_ts_t now,
3432 rd_ts_t *next_wakeup,
3433 int do_timeout_scan) {
3434 rd_kafka_toppar_t *rktp;
3435 int cnt = 0;
3436 rd_ts_t ret_next_wakeup = *next_wakeup;
3437 rd_kafka_pid_t pid = RD_KAFKA_PID_INITIALIZER;
3438
3439 /* Round-robin serve each toppar. */
3440 rktp = rkb->rkb_active_toppar_next;
3441 if (unlikely(!rktp))
3442 return 0;
3443
3444 if (rd_kafka_is_idempotent(rkb->rkb_rk)) {
3445 /* Idempotent producer: get a copy of the current pid. */
3446 pid = rd_kafka_idemp_get_pid(rkb->rkb_rk);
3447
3448 /* If we don't have a valid pid return immedatiely,
3449 * unless the per-partition timeout scan needs to run.
3450 * The broker threads are woken up when a PID is acquired. */
3451 if (!rd_kafka_pid_valid(pid) && !do_timeout_scan)
3452 return 0;
3453 }
3454
3455 do {
3456 rd_ts_t this_next_wakeup = ret_next_wakeup;
3457
3458 /* Try producing toppar */
3459 cnt += rd_kafka_toppar_producer_serve(
3460 rkb, rktp, pid, now, &this_next_wakeup,
3461 do_timeout_scan);
3462
3463 if (this_next_wakeup < ret_next_wakeup)
3464 ret_next_wakeup = this_next_wakeup;
3465
3466 } while ((rktp = CIRCLEQ_LOOP_NEXT(&rkb->
3467 rkb_active_toppars,
3468 rktp, rktp_activelink)) !=
3469 rkb->rkb_active_toppar_next);
3470
3471 /* Update next starting toppar to produce in round-robin list. */
3472 rd_kafka_broker_active_toppar_next(
3473 rkb,
3474 CIRCLEQ_LOOP_NEXT(&rkb->rkb_active_toppars,
3475 rktp, rktp_activelink));
3476
3477 *next_wakeup = ret_next_wakeup;
3478
3479 return cnt;
3480}
3481
3482/**
3483 * @brief Producer serving
3484 */
3485static void rd_kafka_broker_producer_serve (rd_kafka_broker_t *rkb,
3486 rd_ts_t abs_timeout) {
3487 rd_interval_t timeout_scan;
3488 unsigned int initial_state = rkb->rkb_state;
3489 rd_ts_t now;
3490 int cnt = 0;
3491
3492 rd_interval_init(&timeout_scan);
3493
3494 rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
3495
3496 rd_kafka_broker_lock(rkb);
3497
3498 while (!rd_kafka_broker_terminating(rkb) &&
3499 rkb->rkb_state == initial_state &&
3500 (abs_timeout > (now = rd_clock()))) {
3501 int do_timeout_scan;
3502 rd_ts_t next_wakeup = abs_timeout;
3503
3504 rd_kafka_broker_unlock(rkb);
3505
3506 /* Perform timeout scan on first iteration, thus
3507 * on each state change, to make sure messages in
3508 * partition rktp_xmit_msgq are timed out before
3509 * being attempted to re-transmit. */
3510 do_timeout_scan = cnt++ == 0 ||
3511 rd_interval(&timeout_scan, 1000*1000, now) >= 0;
3512
3513 rd_kafka_broker_produce_toppars(rkb, now, &next_wakeup,
3514 do_timeout_scan);
3515
3516 /* Check and move retry buffers */
3517 if (unlikely(rd_atomic32_get(&rkb->rkb_retrybufs.rkbq_cnt) > 0))
3518 rd_kafka_broker_retry_bufs_move(rkb);
3519
3520 rd_kafka_broker_ops_io_serve(rkb, next_wakeup);
3521
3522 rd_kafka_broker_lock(rkb);
3523 }
3524
3525 rd_kafka_broker_unlock(rkb);
3526}
3527
3528
3529
3530
3531
3532
3533
3534/**
3535 * Backoff the next Fetch request (due to error).
3536 */
3537static void rd_kafka_broker_fetch_backoff (rd_kafka_broker_t *rkb,
3538 rd_kafka_resp_err_t err) {
3539 int backoff_ms = rkb->rkb_rk->rk_conf.fetch_error_backoff_ms;
3540 rkb->rkb_ts_fetch_backoff = rd_clock() + (backoff_ms * 1000);
3541 rd_rkb_dbg(rkb, FETCH, "BACKOFF",
3542 "Fetch backoff for %dms: %s",
3543 backoff_ms, rd_kafka_err2str(err));
3544}
3545
3546/**
3547 * @brief Backoff the next Fetch for specific partition
3548 */
3549static void rd_kafka_toppar_fetch_backoff (rd_kafka_broker_t *rkb,
3550 rd_kafka_toppar_t *rktp,
3551 rd_kafka_resp_err_t err) {
3552 int backoff_ms = rkb->rkb_rk->rk_conf.fetch_error_backoff_ms;
3553
3554 /* Don't back off on reaching end of partition */
3555 if (err == RD_KAFKA_RESP_ERR__PARTITION_EOF)
3556 return;
3557
3558 rktp->rktp_ts_fetch_backoff = rd_clock() + (backoff_ms * 1000);
3559 rd_rkb_dbg(rkb, FETCH, "BACKOFF",
3560 "%s [%"PRId32"]: Fetch backoff for %dms: %s",
3561 rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
3562 backoff_ms, rd_kafka_err2str(err));
3563}
3564
3565
3566/**
3567 * Parses and handles a Fetch reply.
3568 * Returns 0 on success or an error code on failure.
3569 */
3570static rd_kafka_resp_err_t
3571rd_kafka_fetch_reply_handle (rd_kafka_broker_t *rkb,
3572 rd_kafka_buf_t *rkbuf, rd_kafka_buf_t *request) {
3573 int32_t TopicArrayCnt;
3574 int i;
3575 const int log_decode_errors = LOG_ERR;
3576 shptr_rd_kafka_itopic_t *s_rkt = NULL;
3577
3578 if (rd_kafka_buf_ApiVersion(request) >= 1) {
3579 int32_t Throttle_Time;
3580 rd_kafka_buf_read_i32(rkbuf, &Throttle_Time);
3581
3582 rd_kafka_op_throttle_time(rkb, rkb->rkb_rk->rk_rep,
3583 Throttle_Time);
3584 }
3585
3586 rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt);
3587 /* Verify that TopicArrayCnt seems to be in line with remaining size */
3588 rd_kafka_buf_check_len(rkbuf,
3589 TopicArrayCnt * (3/*topic min size*/ +
3590 4/*PartitionArrayCnt*/ +
3591 4+2+8+4/*inner header*/));
3592
3593 for (i = 0 ; i < TopicArrayCnt ; i++) {
3594 rd_kafkap_str_t topic;
3595 int32_t fetch_version;
3596 int32_t PartitionArrayCnt;
3597 int j;
3598
3599 rd_kafka_buf_read_str(rkbuf, &topic);
3600 rd_kafka_buf_read_i32(rkbuf, &PartitionArrayCnt);
3601
3602 s_rkt = rd_kafka_topic_find0(rkb->rkb_rk, &topic);
3603
3604 for (j = 0 ; j < PartitionArrayCnt ; j++) {
3605 struct rd_kafka_toppar_ver *tver, tver_skel;
3606 rd_kafka_toppar_t *rktp;
3607 shptr_rd_kafka_toppar_t *s_rktp = NULL;
3608 rd_slice_t save_slice;
3609 struct {
3610 int32_t Partition;
3611 int16_t ErrorCode;
3612 int64_t HighwaterMarkOffset;
3613 int64_t LastStableOffset; /* v4 */
3614 int32_t MessageSetSize;
3615 } hdr;
3616 rd_kafka_resp_err_t err;
3617
3618 rd_kafka_buf_read_i32(rkbuf, &hdr.Partition);
3619 rd_kafka_buf_read_i16(rkbuf, &hdr.ErrorCode);
3620 rd_kafka_buf_read_i64(rkbuf, &hdr.HighwaterMarkOffset);
3621
3622 if (rd_kafka_buf_ApiVersion(request) == 4) {
3623 int32_t AbortedTxCnt;
3624 rd_kafka_buf_read_i64(rkbuf,
3625 &hdr.LastStableOffset);
3626 rd_kafka_buf_read_i32(rkbuf, &AbortedTxCnt);
3627 /* Ignore aborted transactions for now */
3628 if (AbortedTxCnt > 0)
3629 rd_kafka_buf_skip(rkbuf,
3630 AbortedTxCnt * (8+8));
3631 } else
3632 hdr.LastStableOffset = -1;
3633
3634 rd_kafka_buf_read_i32(rkbuf, &hdr.MessageSetSize);
3635
3636 if (unlikely(hdr.MessageSetSize < 0))
3637 rd_kafka_buf_parse_fail(
3638 rkbuf,
3639 "%.*s [%"PRId32"]: "
3640 "invalid MessageSetSize %"PRId32,
3641 RD_KAFKAP_STR_PR(&topic),
3642 hdr.Partition,
3643 hdr.MessageSetSize);
3644
3645 /* Look up topic+partition */
3646 if (likely(s_rkt != NULL)) {
3647 rd_kafka_itopic_t *rkt;
3648 rkt = rd_kafka_topic_s2i(s_rkt);
3649 rd_kafka_topic_rdlock(rkt);
3650 s_rktp = rd_kafka_toppar_get(
3651 rkt, hdr.Partition, 0/*no ua-on-miss*/);
3652 rd_kafka_topic_rdunlock(rkt);
3653 }
3654
3655 if (unlikely(!s_rkt || !s_rktp)) {
3656 rd_rkb_dbg(rkb, TOPIC, "UNKTOPIC",
3657 "Received Fetch response "
3658 "(error %hu) for unknown topic "
3659 "%.*s [%"PRId32"]: ignoring",
3660 hdr.ErrorCode,
3661 RD_KAFKAP_STR_PR(&topic),
3662 hdr.Partition);
3663 rd_kafka_buf_skip(rkbuf, hdr.MessageSetSize);
3664 continue;
3665 }
3666
3667 rktp = rd_kafka_toppar_s2i(s_rktp);
3668
3669 rd_kafka_toppar_lock(rktp);
3670 /* Make sure toppar hasn't moved to another broker
3671 * during the lifetime of the request. */
3672 if (unlikely(rktp->rktp_leader != rkb)) {
3673 rd_kafka_toppar_unlock(rktp);
3674 rd_rkb_dbg(rkb, MSG, "FETCH",
3675 "%.*s [%"PRId32"]: "
3676 "partition leadership changed: "
3677 "discarding fetch response",
3678 RD_KAFKAP_STR_PR(&topic),
3679 hdr.Partition);
3680 rd_kafka_toppar_destroy(s_rktp); /* from get */
3681 rd_kafka_buf_skip(rkbuf, hdr.MessageSetSize);
3682 continue;
3683 }
3684 fetch_version = rktp->rktp_fetch_version;
3685 rd_kafka_toppar_unlock(rktp);
3686
3687 /* Check if this Fetch is for an outdated fetch version,
3688 * or the original rktp was removed and a new one
3689 * created (due to partition count decreasing and
3690 * then increasing again, which can happen in
3691 * desynchronized clusters): if so ignore it. */
3692 tver_skel.s_rktp = s_rktp;
3693 tver = rd_list_find(request->rkbuf_rktp_vers,
3694 &tver_skel,
3695 rd_kafka_toppar_ver_cmp);
3696 rd_kafka_assert(NULL, tver);
3697 if (rd_kafka_toppar_s2i(tver->s_rktp) != rktp ||
3698 tver->version < fetch_version) {
3699 rd_rkb_dbg(rkb, MSG, "DROP",
3700 "%s [%"PRId32"]: "
3701 "dropping outdated fetch response "
3702 "(v%d < %d or old rktp)",
3703 rktp->rktp_rkt->rkt_topic->str,
3704 rktp->rktp_partition,
3705 tver->version, fetch_version);
3706 rd_atomic64_add(&rktp->rktp_c. rx_ver_drops, 1);
3707 rd_kafka_toppar_destroy(s_rktp); /* from get */
3708 rd_kafka_buf_skip(rkbuf, hdr.MessageSetSize);
3709 continue;
3710 }
3711
3712 rd_rkb_dbg(rkb, MSG, "FETCH",
3713 "Topic %.*s [%"PRId32"] MessageSet "
3714 "size %"PRId32", error \"%s\", "
3715 "MaxOffset %"PRId64", "
3716 "Ver %"PRId32"/%"PRId32,
3717 RD_KAFKAP_STR_PR(&topic), hdr.Partition,
3718 hdr.MessageSetSize,
3719 rd_kafka_err2str(hdr.ErrorCode),
3720 hdr.HighwaterMarkOffset,
3721 tver->version, fetch_version);
3722
3723
3724 /* Update hi offset to be able to compute
3725 * consumer lag. */
3726 /* FIXME: if IsolationLevel==READ_COMMITTED,
3727 * use hdr.LastStableOffset */
3728 rktp->rktp_offsets.hi_offset = hdr.HighwaterMarkOffset;
3729
3730
3731 /* High offset for get_watermark_offsets() */
3732 rd_kafka_toppar_lock(rktp);
3733 rktp->rktp_hi_offset = hdr.HighwaterMarkOffset;
3734 rd_kafka_toppar_unlock(rktp);
3735
3736 /* If this is the last message of the queue,
3737 * signal EOF back to the application. */
3738 if (hdr.HighwaterMarkOffset ==
3739 rktp->rktp_offsets.fetch_offset
3740 &&
3741 rktp->rktp_offsets.eof_offset !=
3742 rktp->rktp_offsets.fetch_offset) {
3743 hdr.ErrorCode =
3744 RD_KAFKA_RESP_ERR__PARTITION_EOF;
3745 rktp->rktp_offsets.eof_offset =
3746 rktp->rktp_offsets.fetch_offset;
3747 }
3748
3749 /* Handle partition-level errors. */
3750 if (unlikely(hdr.ErrorCode !=
3751 RD_KAFKA_RESP_ERR_NO_ERROR)) {
3752 /* Some errors should be passed to the
3753 * application while some handled by rdkafka */
3754 switch (hdr.ErrorCode)
3755 {
3756 /* Errors handled by rdkafka */
3757 case RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART:
3758 case RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE:
3759 case RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION:
3760 case RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE:
3761 /* Request metadata information update*/
3762 rd_kafka_toppar_leader_unavailable(
3763 rktp, "fetch", hdr.ErrorCode);
3764 break;
3765
3766 /* Application errors */
3767 case RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE:
3768 {
3769 int64_t err_offset =
3770 rktp->rktp_offsets.fetch_offset;
3771 rktp->rktp_offsets.fetch_offset =
3772 RD_KAFKA_OFFSET_INVALID;
3773 rd_kafka_offset_reset(
3774 rktp, err_offset,
3775 hdr.ErrorCode,
3776 rd_kafka_err2str(hdr.
3777 ErrorCode));
3778 }
3779 break;
3780 case RD_KAFKA_RESP_ERR__PARTITION_EOF:
3781 if (!rkb->rkb_rk->rk_conf.enable_partition_eof)
3782 break;
3783 /* FALLTHRU */
3784 case RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE:
3785 default: /* and all other errors */
3786 rd_dassert(tver->version > 0);
3787 rd_kafka_q_op_err(
3788 rktp->rktp_fetchq,
3789 RD_KAFKA_OP_CONSUMER_ERR,
3790 hdr.ErrorCode, tver->version,
3791 rktp,
3792 rktp->rktp_offsets.fetch_offset,
3793 "%s",
3794 rd_kafka_err2str(hdr.ErrorCode));
3795 break;
3796 }
3797
3798 rd_kafka_toppar_fetch_backoff(rkb, rktp,
3799 hdr.ErrorCode);
3800
3801 rd_kafka_toppar_destroy(s_rktp);/* from get()*/
3802
3803 rd_kafka_buf_skip(rkbuf, hdr.MessageSetSize);
3804 continue;
3805 }
3806
3807 if (unlikely(hdr.MessageSetSize <= 0)) {
3808 rd_kafka_toppar_destroy(s_rktp); /*from get()*/
3809 continue;
3810 }
3811
3812 /**
3813 * Parse MessageSet
3814 */
3815 if (!rd_slice_narrow_relative(
3816 &rkbuf->rkbuf_reader,
3817 &save_slice,
3818 (size_t)hdr.MessageSetSize))
3819 rd_kafka_buf_check_len(rkbuf,
3820 hdr.MessageSetSize);
3821
3822 /* Parse messages */
3823 err = rd_kafka_msgset_parse(rkbuf, request, rktp, tver);
3824
3825 rd_slice_widen(&rkbuf->rkbuf_reader, &save_slice);
3826 /* Continue with next partition regardless of
3827 * parse errors (which are partition-specific) */
3828
3829 /* On error: back off the fetcher for this partition */
3830 if (unlikely(err))
3831 rd_kafka_toppar_fetch_backoff(rkb, rktp, err);
3832
3833 rd_kafka_toppar_destroy(s_rktp); /* from get */
3834 }
3835
3836 if (s_rkt) {
3837 rd_kafka_topic_destroy0(s_rkt);
3838 s_rkt = NULL;
3839 }
3840 }
3841
3842 if (rd_kafka_buf_read_remain(rkbuf) != 0) {
3843 rd_kafka_buf_parse_fail(rkbuf,
3844 "Remaining data after message set "
3845 "parse: %"PRIusz" bytes",
3846 rd_kafka_buf_read_remain(rkbuf));
3847 RD_NOTREACHED();
3848 }
3849
3850 return 0;
3851
3852err_parse:
3853 if (s_rkt)
3854 rd_kafka_topic_destroy0(s_rkt);
3855 rd_rkb_dbg(rkb, MSG, "BADMSG", "Bad message (Fetch v%d): "
3856 "is broker.version.fallback incorrectly set?",
3857 (int)request->rkbuf_reqhdr.ApiVersion);
3858 return rkbuf->rkbuf_err;
3859}
3860
3861
3862
3863static void rd_kafka_broker_fetch_reply (rd_kafka_t *rk,
3864 rd_kafka_broker_t *rkb,
3865 rd_kafka_resp_err_t err,
3866 rd_kafka_buf_t *reply,
3867 rd_kafka_buf_t *request,
3868 void *opaque) {
3869
3870 if (err == RD_KAFKA_RESP_ERR__DESTROY)
3871 return; /* Terminating */
3872
3873 rd_kafka_assert(rkb->rkb_rk, rkb->rkb_fetching > 0);
3874 rkb->rkb_fetching = 0;
3875
3876 /* Parse and handle the messages (unless the request errored) */
3877 if (!err && reply)
3878 err = rd_kafka_fetch_reply_handle(rkb, reply, request);
3879
3880 if (unlikely(err)) {
3881 char tmp[128];
3882
3883 rd_rkb_dbg(rkb, MSG, "FETCH", "Fetch reply: %s",
3884 rd_kafka_err2str(err));
3885 switch (err)
3886 {
3887 case RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART:
3888 case RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE:
3889 case RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION:
3890 case RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE:
3891 case RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE:
3892 /* Request metadata information update */
3893 rd_snprintf(tmp, sizeof(tmp),
3894 "FetchRequest failed: %s",
3895 rd_kafka_err2str(err));
3896 rd_kafka_metadata_refresh_known_topics(rkb->rkb_rk,
3897 NULL, 1/*force*/,
3898 tmp);
3899 /* FALLTHRU */
3900
3901 case RD_KAFKA_RESP_ERR__TRANSPORT:
3902 case RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT:
3903 case RD_KAFKA_RESP_ERR__MSG_TIMED_OUT:
3904 /* The fetch is already intervalled from
3905 * consumer_serve() so dont retry. */
3906 break;
3907
3908 default:
3909 break;
3910 }
3911
3912 rd_kafka_broker_fetch_backoff(rkb, err);
3913 /* FALLTHRU */
3914 }
3915}
3916
3917
3918
3919
3920
3921
3922
3923
3924
3925
3926
3927/**
3928 * Build and send a Fetch request message for all underflowed toppars
3929 * for a specific broker.
3930 */
3931static int rd_kafka_broker_fetch_toppars (rd_kafka_broker_t *rkb, rd_ts_t now) {
3932 rd_kafka_toppar_t *rktp;
3933 rd_kafka_buf_t *rkbuf;
3934 int cnt = 0;
3935 size_t of_TopicArrayCnt = 0;
3936 int TopicArrayCnt = 0;
3937 size_t of_PartitionArrayCnt = 0;
3938 int PartitionArrayCnt = 0;
3939 rd_kafka_itopic_t *rkt_last = NULL;
3940
3941 /* Create buffer and segments:
3942 * 1 x ReplicaId MaxWaitTime MinBytes TopicArrayCnt
3943 * N x topic name
3944 * N x PartitionArrayCnt Partition FetchOffset MaxBytes
3945 * where N = number of toppars.
3946 * Since we dont keep track of the number of topics served by
3947 * this broker, only the partition count, we do a worst-case calc
3948 * when allocating and assume each partition is on its own topic
3949 */
3950
3951 if (unlikely(rkb->rkb_active_toppar_cnt == 0))
3952 return 0;
3953
3954 rkbuf = rd_kafka_buf_new_request(
3955 rkb, RD_KAFKAP_Fetch, 1,
3956 /* ReplicaId+MaxWaitTime+MinBytes+TopicCnt */
3957 4+4+4+4+
3958 /* N x PartCnt+Partition+FetchOffset+MaxBytes+?TopicNameLen?*/
3959 (rkb->rkb_active_toppar_cnt * (4+4+8+4+40)));
3960
3961 if (rkb->rkb_features & RD_KAFKA_FEATURE_MSGVER2)
3962 rd_kafka_buf_ApiVersion_set(rkbuf, 4,
3963 RD_KAFKA_FEATURE_MSGVER2);
3964 else if (rkb->rkb_features & RD_KAFKA_FEATURE_MSGVER1)
3965 rd_kafka_buf_ApiVersion_set(rkbuf, 2,
3966 RD_KAFKA_FEATURE_MSGVER1);
3967 else if (rkb->rkb_features & RD_KAFKA_FEATURE_THROTTLETIME)
3968 rd_kafka_buf_ApiVersion_set(rkbuf, 1,
3969 RD_KAFKA_FEATURE_THROTTLETIME);
3970
3971
3972 /* FetchRequest header */
3973 /* ReplicaId */
3974 rd_kafka_buf_write_i32(rkbuf, -1);
3975 /* MaxWaitTime */
3976 rd_kafka_buf_write_i32(rkbuf, rkb->rkb_rk->rk_conf.fetch_wait_max_ms);
3977 /* MinBytes */
3978 rd_kafka_buf_write_i32(rkbuf, rkb->rkb_rk->rk_conf.fetch_min_bytes);
3979
3980 if (rd_kafka_buf_ApiVersion(rkbuf) == 4) {
3981 /* MaxBytes */
3982 rd_kafka_buf_write_i32(rkbuf,
3983 rkb->rkb_rk->rk_conf.fetch_max_bytes);
3984 /* IsolationLevel */
3985 rd_kafka_buf_write_i8(rkbuf, RD_KAFKAP_READ_UNCOMMITTED);
3986 }
3987
3988 /* Write zero TopicArrayCnt but store pointer for later update */
3989 of_TopicArrayCnt = rd_kafka_buf_write_i32(rkbuf, 0);
3990
3991 /* Prepare map for storing the fetch version for each partition,
3992 * this will later be checked in Fetch response to purge outdated
3993 * responses (e.g., after a seek). */
3994 rkbuf->rkbuf_rktp_vers = rd_list_new(
3995 0, (void *)rd_kafka_toppar_ver_destroy);
3996 rd_list_prealloc_elems(rkbuf->rkbuf_rktp_vers,
3997 sizeof(struct rd_kafka_toppar_ver),
3998 rkb->rkb_active_toppar_cnt, 0);
3999
4000 /* Round-robin start of the list. */
4001 rktp = rkb->rkb_active_toppar_next;
4002 do {
4003 struct rd_kafka_toppar_ver *tver;
4004
4005 if (rkt_last != rktp->rktp_rkt) {
4006 if (rkt_last != NULL) {
4007 /* Update PartitionArrayCnt */
4008 rd_kafka_buf_update_i32(rkbuf,
4009 of_PartitionArrayCnt,
4010 PartitionArrayCnt);
4011 }
4012
4013 /* Topic name */
4014 rd_kafka_buf_write_kstr(rkbuf,
4015 rktp->rktp_rkt->rkt_topic);
4016 TopicArrayCnt++;
4017 rkt_last = rktp->rktp_rkt;
4018 /* Partition count */
4019 of_PartitionArrayCnt = rd_kafka_buf_write_i32(rkbuf, 0);
4020 PartitionArrayCnt = 0;
4021 }
4022
4023 PartitionArrayCnt++;
4024 /* Partition */
4025 rd_kafka_buf_write_i32(rkbuf, rktp->rktp_partition);
4026 /* FetchOffset */
4027 rd_kafka_buf_write_i64(rkbuf, rktp->rktp_offsets.fetch_offset);
4028 /* MaxBytes */
4029 rd_kafka_buf_write_i32(rkbuf, rktp->rktp_fetch_msg_max_bytes);
4030
4031 rd_rkb_dbg(rkb, FETCH, "FETCH",
4032 "Fetch topic %.*s [%"PRId32"] at offset %"PRId64
4033 " (v%d)",
4034 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
4035 rktp->rktp_partition,
4036 rktp->rktp_offsets.fetch_offset,
4037 rktp->rktp_fetch_version);
4038
4039 /* Add toppar + op version mapping. */
4040 tver = rd_list_add(rkbuf->rkbuf_rktp_vers, NULL);
4041 tver->s_rktp = rd_kafka_toppar_keep(rktp);
4042 tver->version = rktp->rktp_fetch_version;
4043
4044 cnt++;
4045 } while ((rktp = CIRCLEQ_LOOP_NEXT(&rkb->rkb_active_toppars,
4046 rktp, rktp_activelink)) !=
4047 rkb->rkb_active_toppar_next);
4048
4049 /* Update next toppar to fetch in round-robin list. */
4050 rd_kafka_broker_active_toppar_next(
4051 rkb,
4052 rktp ?
4053 CIRCLEQ_LOOP_NEXT(&rkb->rkb_active_toppars,
4054 rktp, rktp_activelink) : NULL);
4055
4056 rd_rkb_dbg(rkb, FETCH, "FETCH", "Fetch %i/%i/%i toppar(s)",
4057 cnt, rkb->rkb_active_toppar_cnt, rkb->rkb_toppar_cnt);
4058 if (!cnt) {
4059 rd_kafka_buf_destroy(rkbuf);
4060 return cnt;
4061 }
4062
4063 if (rkt_last != NULL) {
4064 /* Update last topic's PartitionArrayCnt */
4065 rd_kafka_buf_update_i32(rkbuf,
4066 of_PartitionArrayCnt,
4067 PartitionArrayCnt);
4068 }
4069
4070 /* Update TopicArrayCnt */
4071 rd_kafka_buf_update_i32(rkbuf, of_TopicArrayCnt, TopicArrayCnt);
4072
4073 /* Consider Fetch requests blocking if fetch.wait.max.ms >= 1s */
4074 if (rkb->rkb_rk->rk_conf.fetch_wait_max_ms >= 1000)
4075 rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_BLOCKING;
4076
4077 /* Use configured timeout */
4078 rd_kafka_buf_set_timeout(rkbuf,
4079 rkb->rkb_rk->rk_conf.socket_timeout_ms +
4080 rkb->rkb_rk->rk_conf.fetch_wait_max_ms,
4081 now);
4082
4083 /* Sort toppar versions for quicker lookups in Fetch response. */
4084 rd_list_sort(rkbuf->rkbuf_rktp_vers, rd_kafka_toppar_ver_cmp);
4085
4086 rkb->rkb_fetching = 1;
4087 rd_kafka_broker_buf_enq1(rkb, rkbuf, rd_kafka_broker_fetch_reply, NULL);
4088
4089 return cnt;
4090}
4091
4092
4093
4094
4095/**
4096 * Consumer serving
4097 */
4098static void rd_kafka_broker_consumer_serve (rd_kafka_broker_t *rkb,
4099 rd_ts_t abs_timeout) {
4100 unsigned int initial_state = rkb->rkb_state;
4101 rd_ts_t now;
4102
4103 rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
4104
4105 rd_kafka_broker_lock(rkb);
4106
4107 while (!rd_kafka_broker_terminating(rkb) &&
4108 rkb->rkb_state == initial_state &&
4109 abs_timeout > (now = rd_clock())) {
4110 rd_ts_t min_backoff;
4111
4112 rd_kafka_broker_unlock(rkb);
4113
4114 /* Serve toppars */
4115 min_backoff = rd_kafka_broker_toppars_serve(rkb);
4116 if (rkb->rkb_ts_fetch_backoff > now &&
4117 rkb->rkb_ts_fetch_backoff < min_backoff)
4118 min_backoff = rkb->rkb_ts_fetch_backoff;
4119
4120 if (min_backoff < RD_TS_MAX &&
4121 rkb->rkb_state != RD_KAFKA_BROKER_STATE_UP) {
4122 /* There are partitions to fetch but the
4123 * connection is not up. */
4124 rkb->rkb_persistconn.internal++;
4125 }
4126
4127 /* Send Fetch request message for all underflowed toppars
4128 * if the connection is up and there are no outstanding
4129 * fetch requests for this connection. */
4130 if (!rkb->rkb_fetching &&
4131 rkb->rkb_state == RD_KAFKA_BROKER_STATE_UP) {
4132 if (min_backoff < now) {
4133 rd_kafka_broker_fetch_toppars(rkb, now);
4134 min_backoff = abs_timeout;
4135 } else if (min_backoff < RD_TS_MAX)
4136 rd_rkb_dbg(rkb, FETCH, "FETCH",
4137 "Fetch backoff for %"PRId64
4138 "ms",
4139 (min_backoff-now)/1000);
4140 } else {
4141 /* Nothing needs to be done, next wakeup
4142 * is from ops, state change, IO, or this timeout */
4143 min_backoff = abs_timeout;
4144 }
4145
4146 /* Check and move retry buffers */
4147 if (unlikely(rd_atomic32_get(&rkb->rkb_retrybufs.rkbq_cnt) > 0))
4148 rd_kafka_broker_retry_bufs_move(rkb);
4149
4150 if (min_backoff > abs_timeout)
4151 min_backoff = abs_timeout;
4152
4153 rd_kafka_broker_ops_io_serve(rkb, min_backoff);
4154
4155 rd_kafka_broker_lock(rkb);
4156 }
4157
4158 rd_kafka_broker_unlock(rkb);
4159}
4160
4161
4162/**
4163 * @brief Serve broker thread according to client type.
4164 * May be called in any broker state.
4165 *
4166 * This function is to be called from the state machine in
4167 * rd_kafka_broker_thread_main, and will return when
4168 * there was a state change, or the handle is terminating.
4169 *
4170 * Broker threads are triggered by three things:
4171 * - Ops from other parts of librdkafka / app.
4172 * This is the rkb_ops queue which is served from
4173 * rd_kafka_broker_ops_io_serve().
4174 * - IO from broker socket.
4175 * The ops queue is also IO-triggered to provide
4176 * quick wakeup when thread is blocking on IO.
4177 * Also serverd from rd_kafka_broker_ops_io_serve().
4178 * When there is no broker socket only the ops
4179 * queue is served.
4180 * - Ops/IO timeout when there were no ops or
4181 * IO events within a variable timeout.
4182 *
4183 * For each iteration of the loops in producer_serve(), consumer_serve(),
4184 * etc, the Ops and IO are polled, and the client type specific
4185 * logic is executed. For the consumer this logic checks which partitions
4186 * to fetch or backoff, and sends Fetch requests.
4187 * The producer checks for messages to batch and transmit.
4188 * All types check for request timeouts, etc.
4189 *
4190 * Wakeups
4191 * =======
4192 * The logic returns a next wakeup time which controls how long the
4193 * next Ops/IO poll may block before the logic wants to run again;
4194 * this is typically controlled by `linger.ms` on the Producer
4195 * and fetch backoffs on the consumer.
4196 *
4197 * Remote threads may also want to wake up the Ops/IO poll so that
4198 * the logic is run more quickly. For example when a new message
4199 * is enqueued by produce() it is important that it is batched
4200 * and transmitted within the configured `linger.ms`.
4201 *
4202 * Any op enqueued on the broker ops queue (rkb_ops) will automatically
4203 * trigger a wakeup of the broker thread (either by wakeup_fd IO event
4204 * or by the conditional variable of rkb_ops being triggered - or both).
4205 *
4206 * Produced messages are not enqueued on the rkb_ops queue but on
4207 * the partition's rktp_msgq message queue. To provide quick wakeups
4208 * the partition has a reference to the partition's current leader broker
4209 * thread's rkb_ops queue, rktp_msgq_wakeup_q.
4210 * When enqueuing a message on the partition queue and the queue was
4211 * previously empty, the rktp_msgq_wakeup_q (which is rkb_ops) is woken up
4212 * by rd_kafka_q_yield(), which sets a YIELD flag and triggers the cond var
4213 * to wake up the broker thread (without allocating and enqueuing an rko).
4214 * This also triggers the wakeup_fd of rkb_ops, if necessary.
4215 *
4216 * When sparse connections is enabled the broker will linger in the
4217 * INIT state until there's a need for a connection, in which case
4218 * it will set its state to DOWN to trigger the connection.
4219 * This is controlled both by the shared rkb_persistconn atomic counters
4220 * that may be updated from other parts of the code, as well as the
4221 * temporary per broker_serve() rkb_persistconn.internal counter which
4222 * is used by the broker handler code to detect if a connection is needed,
4223 * such as when a partition is being produced to.
4224 *
4225 *
4226 * @param timeout_ms The maximum timeout for blocking Ops/IO.
4227 *
4228 * @locality broker thread
4229 * @locks none
4230 */
4231static void rd_kafka_broker_serve (rd_kafka_broker_t *rkb, int timeout_ms) {
4232 rd_ts_t abs_timeout;
4233
4234 if (unlikely(rd_kafka_terminating(rkb->rkb_rk) ||
4235 timeout_ms == RD_POLL_NOWAIT))
4236 timeout_ms = 1;
4237 else if (timeout_ms == RD_POLL_INFINITE)
4238 timeout_ms = rd_kafka_max_block_ms;
4239
4240 abs_timeout = rd_timeout_init(timeout_ms);
4241 /* Must be a valid absolute time from here on. */
4242 rd_assert(abs_timeout > 0);
4243
4244 /* rkb_persistconn.internal is the per broker_serve()
4245 * automatic counter that keeps track of anything
4246 * in the producer/consumer logic needs this broker connection
4247 * to be up. */
4248 rkb->rkb_persistconn.internal = 0;
4249
4250 if (rkb->rkb_source == RD_KAFKA_INTERNAL)
4251 rd_kafka_broker_internal_serve(rkb, abs_timeout);
4252 else if (rkb->rkb_rk->rk_type == RD_KAFKA_PRODUCER)
4253 rd_kafka_broker_producer_serve(rkb, abs_timeout);
4254 else if (rkb->rkb_rk->rk_type == RD_KAFKA_CONSUMER)
4255 rd_kafka_broker_consumer_serve(rkb, abs_timeout);
4256}
4257
4258
4259
4260
4261
4262static int rd_kafka_broker_thread_main (void *arg) {
4263 rd_kafka_broker_t *rkb = arg;
4264
4265 rd_kafka_set_thread_name("%s", rkb->rkb_name);
4266 rd_kafka_set_thread_sysname("rdk:broker%"PRId32, rkb->rkb_nodeid);
4267
4268 (void)rd_atomic32_add(&rd_kafka_thread_cnt_curr, 1);
4269
4270 /* Our own refcount was increased just prior to thread creation,
4271 * when refcount drops to 1 it is just us left and the broker
4272 * thread should terminate. */
4273
4274 /* Acquire lock (which was held by thread creator during creation)
4275 * to synchronise state. */
4276 rd_kafka_broker_lock(rkb);
4277 rd_kafka_broker_unlock(rkb);
4278
4279 rd_rkb_dbg(rkb, BROKER, "BRKMAIN", "Enter main broker thread");
4280
4281 while (!rd_kafka_broker_terminating(rkb)) {
4282 int backoff;
4283 int r;
4284
4285 redo:
4286 switch (rkb->rkb_state)
4287 {
4288 case RD_KAFKA_BROKER_STATE_INIT:
4289 /* Check if there is demand for a connection
4290 * to this broker, if so jump to TRY_CONNECT state. */
4291 if (!rd_kafka_broker_needs_connection(rkb)) {
4292 rd_kafka_broker_serve(rkb,
4293 rd_kafka_max_block_ms);
4294 break;
4295 }
4296
4297 /* The INIT state also exists so that an initial
4298 * connection failure triggers a state transition
4299 * which might trigger a ALL_BROKERS_DOWN error. */
4300 rd_kafka_broker_lock(rkb);
4301 rd_kafka_broker_set_state(
4302 rkb, RD_KAFKA_BROKER_STATE_TRY_CONNECT);
4303 rd_kafka_broker_unlock(rkb);
4304 goto redo; /* effectively a fallthru to TRY_CONNECT */
4305
4306 case RD_KAFKA_BROKER_STATE_DOWN:
4307 rd_kafka_broker_lock(rkb);
4308 if (rkb->rkb_rk->rk_conf.sparse_connections)
4309 rd_kafka_broker_set_state(
4310 rkb, RD_KAFKA_BROKER_STATE_INIT);
4311 else
4312 rd_kafka_broker_set_state(
4313 rkb, RD_KAFKA_BROKER_STATE_TRY_CONNECT);
4314 rd_kafka_broker_unlock(rkb);
4315 goto redo; /* effectively a fallthru to TRY_CONNECT */
4316
4317 case RD_KAFKA_BROKER_STATE_TRY_CONNECT:
4318 if (rkb->rkb_source == RD_KAFKA_INTERNAL) {
4319 rd_kafka_broker_lock(rkb);
4320 rd_kafka_broker_set_state(rkb,
4321 RD_KAFKA_BROKER_STATE_UP);
4322 rd_kafka_broker_unlock(rkb);
4323 break;
4324 }
4325
4326 if (unlikely(rd_kafka_terminating(rkb->rkb_rk)))
4327 rd_kafka_broker_serve(rkb, 1000);
4328
4329 if (!rd_kafka_sasl_ready(rkb->rkb_rk)) {
4330 /* SASL provider not yet ready. */
4331 rd_kafka_broker_serve(rkb,
4332 rd_kafka_max_block_ms);
4333 /* Continue while loop to try again (as long as
4334 * we are not terminating). */
4335 continue;
4336 }
4337
4338 /* Throttle & jitter reconnects to avoid
4339 * thundering horde of reconnecting clients after
4340 * a broker / network outage. Issue #403 */
4341 backoff = rd_kafka_broker_reconnect_backoff(rkb,
4342 rd_clock());
4343 if (backoff > 0) {
4344 rd_rkb_dbg(rkb, BROKER, "RECONNECT",
4345 "Delaying next reconnect by %dms",
4346 backoff);
4347 rd_kafka_broker_serve(rkb, (int)backoff);
4348 continue;
4349 }
4350
4351 /* Initiate asynchronous connection attempt.
4352 * Only the host lookup is blocking here. */
4353 r = rd_kafka_broker_connect(rkb);
4354 if (r == -1) {
4355 /* Immediate failure, most likely host
4356 * resolving failed.
4357 * Try the next resolve result until we've
4358 * tried them all, in which case we sleep a
4359 * short while to avoid busy looping. */
4360 if (!rkb->rkb_rsal ||
4361 rkb->rkb_rsal->rsal_cnt == 0 ||
4362 rkb->rkb_rsal->rsal_curr + 1 ==
4363 rkb->rkb_rsal->rsal_cnt)
4364 rd_kafka_broker_serve(
4365 rkb, rd_kafka_max_block_ms);
4366 } else if (r == 0) {
4367 /* Broker has no hostname yet, wait
4368 * for hostname to be set and connection
4369 * triggered by received OP_CONNECT. */
4370 rd_kafka_broker_serve(rkb,
4371 rd_kafka_max_block_ms);
4372 } else {
4373 /* Connection in progress, state will
4374 * have changed to STATE_CONNECT. */
4375 }
4376
4377 break;
4378
4379 case RD_KAFKA_BROKER_STATE_CONNECT:
4380 case RD_KAFKA_BROKER_STATE_AUTH:
4381 case RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE:
4382 case RD_KAFKA_BROKER_STATE_APIVERSION_QUERY:
4383 /* Asynchronous connect in progress. */
4384 rd_kafka_broker_serve(rkb, rd_kafka_max_block_ms);
4385
4386 if (rkb->rkb_state == RD_KAFKA_BROKER_STATE_DOWN) {
4387 /* Connect failure.
4388 * Try the next resolve result until we've
4389 * tried them all, in which case we sleep a
4390 * short while to avoid busy looping. */
4391 if (!rkb->rkb_rsal ||
4392 rkb->rkb_rsal->rsal_cnt == 0 ||
4393 rkb->rkb_rsal->rsal_curr + 1 ==
4394 rkb->rkb_rsal->rsal_cnt)
4395 rd_kafka_broker_serve(
4396 rkb, rd_kafka_max_block_ms);
4397 }
4398 break;
4399
4400 case RD_KAFKA_BROKER_STATE_UPDATE:
4401 /* FALLTHRU */
4402 case RD_KAFKA_BROKER_STATE_UP:
4403 rd_kafka_broker_serve(rkb, rd_kafka_max_block_ms);
4404
4405 if (rkb->rkb_state == RD_KAFKA_BROKER_STATE_UPDATE) {
4406 rd_kafka_broker_lock(rkb);
4407 rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_UP);
4408 rd_kafka_broker_unlock(rkb);
4409 }
4410 break;
4411 }
4412
4413 if (rd_kafka_terminating(rkb->rkb_rk)) {
4414 /* Handle is terminating: fail the send+retry queue
4415 * to speed up termination, otherwise we'll
4416 * need to wait for request timeouts. */
4417 int r;
4418
4419 r = rd_kafka_broker_bufq_timeout_scan(
4420 rkb, 0, &rkb->rkb_outbufs, NULL, -1,
4421 RD_KAFKA_RESP_ERR__DESTROY, 0, NULL, 0);
4422 r += rd_kafka_broker_bufq_timeout_scan(
4423 rkb, 0, &rkb->rkb_retrybufs, NULL, -1,
4424 RD_KAFKA_RESP_ERR__DESTROY, 0, NULL, 0);
4425 rd_rkb_dbg(rkb, BROKER, "TERMINATE",
4426 "Handle is terminating in state %s: "
4427 "%d refcnts (%p), %d toppar(s), "
4428 "%d active toppar(s), "
4429 "%d outbufs, %d waitresps, %d retrybufs: "
4430 "failed %d request(s) in retry+outbuf",
4431 rd_kafka_broker_state_names[rkb->rkb_state],
4432 rd_refcnt_get(&rkb->rkb_refcnt),
4433 &rkb->rkb_refcnt,
4434 rkb->rkb_toppar_cnt,
4435 rkb->rkb_active_toppar_cnt,
4436 (int)rd_kafka_bufq_cnt(&rkb->rkb_outbufs),
4437 (int)rd_kafka_bufq_cnt(&rkb->rkb_waitresps),
4438 (int)rd_kafka_bufq_cnt(&rkb->rkb_retrybufs),
4439 r);
4440 }
4441 }
4442
4443 if (rkb->rkb_source != RD_KAFKA_INTERNAL) {
4444 rd_kafka_wrlock(rkb->rkb_rk);
4445 TAILQ_REMOVE(&rkb->rkb_rk->rk_brokers, rkb, rkb_link);
4446 if (rkb->rkb_nodeid != -1)
4447 rd_list_remove(&rkb->rkb_rk->rk_broker_by_id, rkb);
4448 (void)rd_atomic32_sub(&rkb->rkb_rk->rk_broker_cnt, 1);
4449 rd_kafka_wrunlock(rkb->rkb_rk);
4450 }
4451
4452 rd_kafka_broker_fail(rkb, LOG_DEBUG, RD_KAFKA_RESP_ERR__DESTROY, NULL);
4453
4454 /* Disable and drain ops queue.
4455 * Simply purging the ops queue risks leaving dangling references
4456 * for ops such as PARTITION_JOIN/PARTITION_LEAVE where the broker
4457 * reference is not maintained in the rko (but in rktp_next_leader).
4458 * #1596 */
4459 rd_kafka_q_disable(rkb->rkb_ops);
4460 while (rd_kafka_broker_ops_serve(rkb, RD_POLL_NOWAIT))
4461 ;
4462
4463 rd_kafka_broker_destroy(rkb);
4464
4465#if WITH_SSL
4466 /* Remove OpenSSL per-thread error state to avoid memory leaks */
4467#if OPENSSL_VERSION_NUMBER >= 0x10100000L && !defined(LIBRESSL_VERSION_NUMBER)
4468 /*(OpenSSL libraries handle thread init and deinit)
4469 * https://github.com/openssl/openssl/pull/1048 */
4470#elif OPENSSL_VERSION_NUMBER >= 0x10000000L
4471 ERR_remove_thread_state(NULL);
4472#endif
4473#endif
4474
4475 rd_atomic32_sub(&rd_kafka_thread_cnt_curr, 1);
4476
4477 return 0;
4478}
4479
4480
4481/**
4482 * Final destructor. Refcnt must be 0.
4483 */
4484void rd_kafka_broker_destroy_final (rd_kafka_broker_t *rkb) {
4485
4486 rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
4487 rd_kafka_assert(rkb->rkb_rk, TAILQ_EMPTY(&rkb->rkb_outbufs.rkbq_bufs));
4488 rd_kafka_assert(rkb->rkb_rk, TAILQ_EMPTY(&rkb->rkb_waitresps.rkbq_bufs));
4489 rd_kafka_assert(rkb->rkb_rk, TAILQ_EMPTY(&rkb->rkb_retrybufs.rkbq_bufs));
4490 rd_kafka_assert(rkb->rkb_rk, TAILQ_EMPTY(&rkb->rkb_toppars));
4491
4492 if (rkb->rkb_source != RD_KAFKA_INTERNAL &&
4493 (rkb->rkb_rk->rk_conf.security_protocol ==
4494 RD_KAFKA_PROTO_SASL_PLAINTEXT ||
4495 rkb->rkb_rk->rk_conf.security_protocol ==
4496 RD_KAFKA_PROTO_SASL_SSL))
4497 rd_kafka_sasl_broker_term(rkb);
4498
4499 if (rkb->rkb_wakeup_fd[0] != -1)
4500 rd_close(rkb->rkb_wakeup_fd[0]);
4501 if (rkb->rkb_wakeup_fd[1] != -1)
4502 rd_close(rkb->rkb_wakeup_fd[1]);
4503
4504 if (rkb->rkb_recv_buf)
4505 rd_kafka_buf_destroy(rkb->rkb_recv_buf);
4506
4507 if (rkb->rkb_rsal)
4508 rd_sockaddr_list_destroy(rkb->rkb_rsal);
4509
4510 if (rkb->rkb_ApiVersions)
4511 rd_free(rkb->rkb_ApiVersions);
4512 rd_free(rkb->rkb_origname);
4513
4514 rd_kafka_q_purge(rkb->rkb_ops);
4515 rd_kafka_q_destroy_owner(rkb->rkb_ops);
4516
4517 rd_avg_destroy(&rkb->rkb_avg_int_latency);
4518 rd_avg_destroy(&rkb->rkb_avg_outbuf_latency);
4519 rd_avg_destroy(&rkb->rkb_avg_rtt);
4520 rd_avg_destroy(&rkb->rkb_avg_throttle);
4521
4522 mtx_lock(&rkb->rkb_logname_lock);
4523 rd_free(rkb->rkb_logname);
4524 rkb->rkb_logname = NULL;
4525 mtx_unlock(&rkb->rkb_logname_lock);
4526 mtx_destroy(&rkb->rkb_logname_lock);
4527
4528 mtx_destroy(&rkb->rkb_lock);
4529
4530 rd_refcnt_destroy(&rkb->rkb_refcnt);
4531
4532 rd_free(rkb);
4533}
4534
4535/**
4536 * Returns the internal broker with refcnt increased.
4537 */
4538rd_kafka_broker_t *rd_kafka_broker_internal (rd_kafka_t *rk) {
4539 rd_kafka_broker_t *rkb;
4540
4541 mtx_lock(&rk->rk_internal_rkb_lock);
4542 rkb = rk->rk_internal_rkb;
4543 if (rkb)
4544 rd_kafka_broker_keep(rkb);
4545 mtx_unlock(&rk->rk_internal_rkb_lock);
4546
4547 return rkb;
4548}
4549
4550
4551/**
4552 * Adds a broker with refcount set to 1.
4553 * If 'source' is RD_KAFKA_INTERNAL an internal broker is added
4554 * that does not actually represent or connect to a real broker, it is used
4555 * for serving unassigned toppar's op queues.
4556 *
4557 * Locks: rd_kafka_wrlock(rk) must be held
4558 */
4559rd_kafka_broker_t *rd_kafka_broker_add (rd_kafka_t *rk,
4560 rd_kafka_confsource_t source,
4561 rd_kafka_secproto_t proto,
4562 const char *name, uint16_t port,
4563 int32_t nodeid) {
4564 rd_kafka_broker_t *rkb;
4565 int r;
4566#ifndef _MSC_VER
4567 sigset_t newset, oldset;
4568#endif
4569
4570 rkb = rd_calloc(1, sizeof(*rkb));
4571
4572 if (source != RD_KAFKA_LOGICAL) {
4573 rd_kafka_mk_nodename(rkb->rkb_nodename,
4574 sizeof(rkb->rkb_nodename),
4575 name, port);
4576 rd_kafka_mk_brokername(rkb->rkb_name, sizeof(rkb->rkb_name),
4577 proto, rkb->rkb_nodename,
4578 nodeid, source);
4579 } else {
4580 /* Logical broker does not have a nodename (address) or port
4581 * at initialization. */
4582 rd_snprintf(rkb->rkb_name, sizeof(rkb->rkb_name), "%s", name);
4583 }
4584
4585 rkb->rkb_source = source;
4586 rkb->rkb_rk = rk;
4587 rkb->rkb_ts_state = rd_clock();
4588 rkb->rkb_nodeid = nodeid;
4589 rkb->rkb_proto = proto;
4590 rkb->rkb_port = port;
4591 rkb->rkb_origname = rd_strdup(name);
4592
4593 mtx_init(&rkb->rkb_lock, mtx_plain);
4594 mtx_init(&rkb->rkb_logname_lock, mtx_plain);
4595 rkb->rkb_logname = rd_strdup(rkb->rkb_name);
4596 TAILQ_INIT(&rkb->rkb_toppars);
4597 CIRCLEQ_INIT(&rkb->rkb_active_toppars);
4598 rd_kafka_bufq_init(&rkb->rkb_outbufs);
4599 rd_kafka_bufq_init(&rkb->rkb_waitresps);
4600 rd_kafka_bufq_init(&rkb->rkb_retrybufs);
4601 rkb->rkb_ops = rd_kafka_q_new(rk);
4602 rd_avg_init(&rkb->rkb_avg_int_latency, RD_AVG_GAUGE, 0, 100*1000, 2,
4603 rk->rk_conf.stats_interval_ms ? 1 : 0);
4604 rd_avg_init(&rkb->rkb_avg_outbuf_latency, RD_AVG_GAUGE, 0, 100*1000, 2,
4605 rk->rk_conf.stats_interval_ms ? 1 : 0);
4606 rd_avg_init(&rkb->rkb_avg_rtt, RD_AVG_GAUGE, 0, 500*1000, 2,
4607 rk->rk_conf.stats_interval_ms ? 1 : 0);
4608 rd_avg_init(&rkb->rkb_avg_throttle, RD_AVG_GAUGE, 0, 5000*1000, 2,
4609 rk->rk_conf.stats_interval_ms ? 1 : 0);
4610 rd_refcnt_init(&rkb->rkb_refcnt, 0);
4611 rd_kafka_broker_keep(rkb); /* rk_broker's refcount */
4612
4613 rkb->rkb_reconnect_backoff_ms = rk->rk_conf.reconnect_backoff_ms;
4614 rd_atomic32_init(&rkb->rkb_persistconn.coord, 0);
4615
4616 /* ApiVersion fallback interval */
4617 if (rkb->rkb_rk->rk_conf.api_version_request) {
4618 rd_interval_init(&rkb->rkb_ApiVersion_fail_intvl);
4619 rd_interval_fixed(&rkb->rkb_ApiVersion_fail_intvl,
4620 rkb->rkb_rk->rk_conf.api_version_fallback_ms*1000);
4621 }
4622
4623 rd_interval_init(&rkb->rkb_suppress.unsupported_compression);
4624 rd_interval_init(&rkb->rkb_suppress.unsupported_kip62);
4625
4626 /* Set next intervalled metadata refresh, offset by a random
4627 * value to avoid all brokers to be queried simultaneously. */
4628 if (rkb->rkb_rk->rk_conf.metadata_refresh_interval_ms >= 0)
4629 rkb->rkb_ts_metadata_poll = rd_clock() +
4630 (rkb->rkb_rk->rk_conf.
4631 metadata_refresh_interval_ms * 1000) +
4632 (rd_jitter(500,1500) * 1000);
4633 else /* disabled */
4634 rkb->rkb_ts_metadata_poll = UINT64_MAX;
4635
4636#ifndef _MSC_VER
4637 /* Block all signals in newly created thread.
4638 * To avoid race condition we block all signals in the calling
4639 * thread, which the new thread will inherit its sigmask from,
4640 * and then restore the original sigmask of the calling thread when
4641 * we're done creating the thread.
4642 * NOTE: term_sig remains unblocked since we use it on termination
4643 * to quickly interrupt system calls. */
4644 sigemptyset(&oldset);
4645 sigfillset(&newset);
4646 if (rkb->rkb_rk->rk_conf.term_sig)
4647 sigdelset(&newset, rkb->rkb_rk->rk_conf.term_sig);
4648 pthread_sigmask(SIG_SETMASK, &newset, &oldset);
4649#endif
4650
4651 /*
4652 * Fd-based queue wake-ups using a non-blocking pipe.
4653 * Writes are best effort, if the socket queue is full
4654 * the write fails (silently) but this has no effect on latency
4655 * since the POLLIN flag will already have been raised for fd.
4656 */
4657 rkb->rkb_wakeup_fd[0] = -1;
4658 rkb->rkb_wakeup_fd[1] = -1;
4659 rkb->rkb_toppar_wakeup_fd = -1;
4660
4661 if ((r = rd_pipe_nonblocking(rkb->rkb_wakeup_fd)) == -1) {
4662 rd_rkb_log(rkb, LOG_ERR, "WAKEUPFD",
4663 "Failed to setup broker queue wake-up fds: "
4664 "%s: disabling low-latency mode",
4665 rd_strerror(r));
4666
4667 } else if (source == RD_KAFKA_INTERNAL) {
4668 /* nop: internal broker has no IO transport. */
4669
4670 } else {
4671 char onebyte = 1;
4672
4673 rd_rkb_dbg(rkb, QUEUE, "WAKEUPFD",
4674 "Enabled low-latency ops queue wake-ups");
4675 rd_kafka_q_io_event_enable(rkb->rkb_ops, rkb->rkb_wakeup_fd[1],
4676 &onebyte, sizeof(onebyte));
4677 }
4678
4679 /* Lock broker's lock here to synchronise state, i.e., hold off
4680 * the broker thread until we've finalized the rkb. */
4681 rd_kafka_broker_lock(rkb);
4682 rd_kafka_broker_keep(rkb); /* broker thread's refcnt */
4683 if (thrd_create(&rkb->rkb_thread,
4684 rd_kafka_broker_thread_main, rkb) != thrd_success) {
4685 char tmp[512];
4686 rd_snprintf(tmp, sizeof(tmp),
4687 "Unable to create broker thread: %s (%i)",
4688 rd_strerror(errno), errno);
4689 rd_kafka_log(rk, LOG_CRIT, "THREAD", "%s", tmp);
4690
4691 rd_kafka_broker_unlock(rkb);
4692
4693 /* Send ERR op back to application for processing. */
4694 rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE,
4695 "%s", tmp);
4696
4697 rd_free(rkb);
4698
4699#ifndef _MSC_VER
4700 /* Restore sigmask of caller */
4701 pthread_sigmask(SIG_SETMASK, &oldset, NULL);
4702#endif
4703
4704 return NULL;
4705 }
4706
4707 if (rkb->rkb_source != RD_KAFKA_INTERNAL) {
4708 if (rk->rk_conf.security_protocol ==
4709 RD_KAFKA_PROTO_SASL_PLAINTEXT ||
4710 rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL)
4711 rd_kafka_sasl_broker_init(rkb);
4712
4713 /* Insert broker at head of list, idea is that
4714 * newer brokers are more relevant than old ones,
4715 * and in particular LEARNED brokers are more relevant
4716 * than CONFIGURED (bootstrap) and LOGICAL brokers. */
4717 TAILQ_INSERT_HEAD(&rkb->rkb_rk->rk_brokers, rkb, rkb_link);
4718 (void)rd_atomic32_add(&rkb->rkb_rk->rk_broker_cnt, 1);
4719
4720 if (rkb->rkb_nodeid != -1) {
4721 rd_list_add(&rkb->rkb_rk->rk_broker_by_id, rkb);
4722 rd_list_sort(&rkb->rkb_rk->rk_broker_by_id,
4723 rd_kafka_broker_cmp_by_id);
4724 }
4725
4726 rd_rkb_dbg(rkb, BROKER, "BROKER",
4727 "Added new broker with NodeId %"PRId32,
4728 rkb->rkb_nodeid);
4729 }
4730
4731 rd_kafka_broker_unlock(rkb);
4732
4733#ifndef _MSC_VER
4734 /* Restore sigmask of caller */
4735 pthread_sigmask(SIG_SETMASK, &oldset, NULL);
4736#endif
4737
4738 return rkb;
4739}
4740
4741
4742/**
4743 * @brief Adds a logical broker.
4744 *
4745 * Logical brokers act just like any broker handle, but will not have
4746 * an initial address set. The address (or nodename is it is called
4747 * internally) can be set from another broker handle
4748 * by calling rd_kafka_broker_set_nodename().
4749 *
4750 * This allows maintaining a logical group coordinator broker
4751 * handle that can ambulate between real broker addresses.
4752 *
4753 * Logical broker constraints:
4754 * - will not have a broker-id set (-1).
4755 * - will not have a port set (0).
4756 * - the address for the broker may change.
4757 * - the name of broker will not correspond to the address,
4758 * but the \p name given here.
4759 *
4760 * @returns a new broker, holding a refcount for the caller.
4761 *
4762 * @locality any rdkafka thread
4763 * @locks none
4764 */
4765rd_kafka_broker_t *rd_kafka_broker_add_logical (rd_kafka_t *rk,
4766 const char *name) {
4767 rd_kafka_broker_t *rkb;
4768
4769 rd_kafka_wrlock(rk);
4770 rkb = rd_kafka_broker_add(rk, RD_KAFKA_LOGICAL,
4771 rk->rk_conf.security_protocol,
4772 name, 0/*port*/, -1/*brokerid*/);
4773 rd_assert(rkb && *"failed to create broker thread");
4774 rd_kafka_wrunlock(rk);
4775
4776 rd_atomic32_add(&rk->rk_broker_addrless_cnt, 1);
4777
4778 rd_dassert(RD_KAFKA_BROKER_IS_LOGICAL(rkb));
4779 rd_kafka_broker_keep(rkb);
4780 return rkb;
4781}
4782
4783
4784/**
4785 * @brief Update the nodename (address) of broker \p rkb
4786 * with the nodename from broker \p from_rkb (may be NULL).
4787 *
4788 * If \p rkb is connected, the connection will be torn down.
4789 * A new connection may be attempted to the new address
4790 * if a persistent connection is needed (standard connection rules).
4791 *
4792 * The broker's logname is also updated to include \p from_rkb's
4793 * broker id.
4794 *
4795 * @param from_rkb Use the nodename from this broker. If NULL, clear
4796 * the \p rkb nodename.
4797 *
4798 * @remark Must only be called for logical brokers.
4799 *
4800 * @locks none
4801 */
4802void rd_kafka_broker_set_nodename (rd_kafka_broker_t *rkb,
4803 rd_kafka_broker_t *from_rkb) {
4804 char nodename[RD_KAFKA_NODENAME_SIZE];
4805 char brokername[RD_KAFKA_NODENAME_SIZE];
4806 int32_t nodeid;
4807 rd_bool_t changed = rd_false;
4808
4809 rd_assert(RD_KAFKA_BROKER_IS_LOGICAL(rkb));
4810
4811 rd_assert(rkb != from_rkb);
4812
4813 /* Get nodename from from_rkb */
4814 if (from_rkb) {
4815 rd_kafka_broker_lock(from_rkb);
4816 strncpy(nodename, from_rkb->rkb_nodename, sizeof(nodename));
4817 nodeid = from_rkb->rkb_nodeid;
4818 rd_kafka_broker_unlock(from_rkb);
4819 } else {
4820 *nodename = '\0';
4821 nodeid = -1;
4822 }
4823
4824 /* Set nodename on rkb */
4825 rd_kafka_broker_lock(rkb);
4826 if (strcmp(rkb->rkb_nodename, nodename)) {
4827 rd_rkb_dbg(rkb, BROKER, "NODENAME",
4828 "Broker nodename changed from \"%s\" to \"%s\"",
4829 rkb->rkb_nodename, nodename);
4830 strncpy(rkb->rkb_nodename, nodename,
4831 sizeof(rkb->rkb_nodename));
4832 rkb->rkb_nodename_epoch++;
4833 changed = rd_true;
4834 }
4835 rd_kafka_broker_unlock(rkb);
4836
4837 /* Update the log name to include (or exclude) the nodeid.
4838 * The nodeid is appended as "..logname../nodeid" */
4839 rd_kafka_mk_brokername(brokername, sizeof(brokername),
4840 rkb->rkb_proto,
4841 rkb->rkb_name, nodeid,
4842 rkb->rkb_source);
4843
4844 rd_kafka_broker_set_logname(rkb, brokername);
4845
4846 if (!changed)
4847 return;
4848
4849 if (*rkb->rkb_nodename)
4850 rd_atomic32_sub(&rkb->rkb_rk->rk_broker_addrless_cnt, 1);
4851 else
4852 rd_atomic32_add(&rkb->rkb_rk->rk_broker_addrless_cnt, 1);
4853
4854 /* Trigger a disconnect & reconnect */
4855 rd_kafka_broker_schedule_connection(rkb);
4856}
4857
4858
4859/**
4860 * @brief Find broker by nodeid (not -1) and
4861 * possibly filtered by state (unless -1).
4862 *
4863 * @param do_connect If sparse connections are enabled and the broker is found
4864 * but not up, a connection will be triggered.
4865 *
4866 * @locks: rd_kafka_*lock() MUST be held
4867 * @remark caller must release rkb reference by rd_kafka_broker_destroy()
4868 */
4869rd_kafka_broker_t *rd_kafka_broker_find_by_nodeid0 (rd_kafka_t *rk,
4870 int32_t nodeid,
4871 int state,
4872 rd_bool_t do_connect) {
4873 rd_kafka_broker_t *rkb;
4874 rd_kafka_broker_t skel = { .rkb_nodeid = nodeid };
4875
4876 if (rd_kafka_terminating(rk))
4877 return NULL;
4878
4879 rkb = rd_list_find(&rk->rk_broker_by_id, &skel,
4880 rd_kafka_broker_cmp_by_id);
4881
4882 if (!rkb)
4883 return NULL;
4884
4885 if (state != -1) {
4886 int broker_state;
4887 rd_kafka_broker_lock(rkb);
4888 broker_state = (int)rkb->rkb_state;
4889 rd_kafka_broker_unlock(rkb);
4890
4891 if (broker_state != state) {
4892 if (do_connect &&
4893 broker_state == RD_KAFKA_BROKER_STATE_INIT &&
4894 rk->rk_conf.sparse_connections)
4895 rd_kafka_broker_schedule_connection(rkb);
4896 return NULL;
4897 }
4898 }
4899
4900 rd_kafka_broker_keep(rkb);
4901 return rkb;
4902}
4903
4904/**
4905 * Locks: rd_kafka_rdlock(rk) must be held
4906 * NOTE: caller must release rkb reference by rd_kafka_broker_destroy()
4907 */
4908static rd_kafka_broker_t *rd_kafka_broker_find (rd_kafka_t *rk,
4909 rd_kafka_secproto_t proto,
4910 const char *name,
4911 uint16_t port) {
4912 rd_kafka_broker_t *rkb;
4913 char nodename[RD_KAFKA_NODENAME_SIZE];
4914
4915 rd_kafka_mk_nodename(nodename, sizeof(nodename), name, port);
4916
4917 TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
4918 if (RD_KAFKA_BROKER_IS_LOGICAL(rkb))
4919 continue;
4920
4921 rd_kafka_broker_lock(rkb);
4922 if (!rd_kafka_terminating(rk) &&
4923 rkb->rkb_proto == proto &&
4924 !strcmp(rkb->rkb_nodename, nodename)) {
4925 rd_kafka_broker_keep(rkb);
4926 rd_kafka_broker_unlock(rkb);
4927 return rkb;
4928 }
4929 rd_kafka_broker_unlock(rkb);
4930 }
4931
4932 return NULL;
4933}
4934
4935
4936/**
4937 * Parse a broker host name.
4938 * The string 'name' is modified and null-terminated portions of it
4939 * are returned in 'proto', 'host', and 'port'.
4940 *
4941 * Returns 0 on success or -1 on parse error.
4942 */
4943static int rd_kafka_broker_name_parse (rd_kafka_t *rk,
4944 char **name,
4945 rd_kafka_secproto_t *proto,
4946 const char **host,
4947 uint16_t *port) {
4948 char *s = *name;
4949 char *orig;
4950 char *n, *t, *t2;
4951
4952 /* Save a temporary copy of the original name for logging purposes */
4953 rd_strdupa(&orig, *name);
4954
4955 /* Find end of this name (either by delimiter or end of string */
4956 if ((n = strchr(s, ',')))
4957 *n = '\0';
4958 else
4959 n = s + strlen(s)-1;
4960
4961
4962 /* Check if this looks like an url. */
4963 if ((t = strstr(s, "://"))) {
4964 int i;
4965 /* "proto://host[:port]" */
4966
4967 if (t == s) {
4968 rd_kafka_log(rk, LOG_WARNING, "BROKER",
4969 "Broker name \"%s\" parse error: "
4970 "empty protocol name", orig);
4971 return -1;
4972 }
4973
4974 /* Make protocol uppercase */
4975 for (t2 = s ; t2 < t ; t2++)
4976 *t2 = toupper(*t2);
4977
4978 *t = '\0';
4979
4980 /* Find matching protocol by name. */
4981 for (i = 0 ; i < RD_KAFKA_PROTO_NUM ; i++)
4982 if (!rd_strcasecmp(s, rd_kafka_secproto_names[i]))
4983 break;
4984
4985 /* Unsupported protocol */
4986 if (i == RD_KAFKA_PROTO_NUM) {
4987 rd_kafka_log(rk, LOG_WARNING, "BROKER",
4988 "Broker name \"%s\" parse error: "
4989 "unsupported protocol \"%s\"", orig, s);
4990
4991 return -1;
4992 }
4993
4994 *proto = i;
4995
4996 /* Enforce protocol */
4997 if (rk->rk_conf.security_protocol != *proto) {
4998 rd_kafka_log(rk, LOG_WARNING, "BROKER",
4999 "Broker name \"%s\" parse error: "
5000 "protocol \"%s\" does not match "
5001 "security.protocol setting \"%s\"",
5002 orig, s,
5003 rd_kafka_secproto_names[
5004 rk->rk_conf.security_protocol]);
5005 return -1;
5006 }
5007
5008 /* Hostname starts here */
5009 s = t+3;
5010
5011 /* Ignore anything that looks like the path part of an URL */
5012 if ((t = strchr(s, '/')))
5013 *t = '\0';
5014
5015 } else
5016 *proto = rk->rk_conf.security_protocol; /* Default protocol */
5017
5018
5019 *port = RD_KAFKA_PORT;
5020 /* Check if port has been specified, but try to identify IPv6
5021 * addresses first:
5022 * t = last ':' in string
5023 * t2 = first ':' in string
5024 * If t and t2 are equal then only one ":" exists in name
5025 * and thus an IPv4 address with port specified.
5026 * Else if not equal and t is prefixed with "]" then it's an
5027 * IPv6 address with port specified.
5028 * Else no port specified. */
5029 if ((t = strrchr(s, ':')) &&
5030 ((t2 = strchr(s, ':')) == t || *(t-1) == ']')) {
5031 *t = '\0';
5032 *port = atoi(t+1);
5033 }
5034
5035 /* Empty host name -> localhost */
5036 if (!*s)
5037 s = "localhost";
5038
5039 *host = s;
5040 *name = n+1; /* past this name. e.g., next name/delimiter to parse */
5041
5042 return 0;
5043}
5044
5045/**
5046 * @brief Adds a (csv list of) broker(s).
5047 * Returns the number of brokers succesfully added.
5048 *
5049 * @locality any thread
5050 * @locks none
5051 */
5052int rd_kafka_brokers_add0 (rd_kafka_t *rk, const char *brokerlist) {
5053 char *s_copy = rd_strdup(brokerlist);
5054 char *s = s_copy;
5055 int cnt = 0;
5056 rd_kafka_broker_t *rkb;
5057 int pre_cnt = rd_atomic32_get(&rk->rk_broker_cnt);
5058
5059 /* Parse comma-separated list of brokers. */
5060 while (*s) {
5061 uint16_t port;
5062 const char *host;
5063 rd_kafka_secproto_t proto;
5064
5065 if (*s == ',' || *s == ' ') {
5066 s++;
5067 continue;
5068 }
5069
5070 if (rd_kafka_broker_name_parse(rk, &s, &proto,
5071 &host, &port) == -1)
5072 break;
5073
5074 rd_kafka_wrlock(rk);
5075
5076 if ((rkb = rd_kafka_broker_find(rk, proto, host, port)) &&
5077 rkb->rkb_source == RD_KAFKA_CONFIGURED) {
5078 cnt++;
5079 } else if (rd_kafka_broker_add(rk, RD_KAFKA_CONFIGURED,
5080 proto, host, port,
5081 RD_KAFKA_NODEID_UA) != NULL)
5082 cnt++;
5083
5084 /* If rd_kafka_broker_find returned a broker its
5085 * reference needs to be released
5086 * See issue #193 */
5087 if (rkb)
5088 rd_kafka_broker_destroy(rkb);
5089
5090 rd_kafka_wrunlock(rk);
5091 }
5092
5093 rd_free(s_copy);
5094
5095 if (rk->rk_conf.sparse_connections && cnt > 0 && pre_cnt == 0) {
5096 /* Sparse connections:
5097 * If this was the first set of brokers added,
5098 * select a random one to trigger the initial cluster
5099 * connection. */
5100 rd_kafka_rdlock(rk);
5101 rd_kafka_connect_any(rk, "bootstrap servers added");
5102 rd_kafka_rdunlock(rk);
5103 }
5104
5105 return cnt;
5106}
5107
5108
5109int rd_kafka_brokers_add (rd_kafka_t *rk, const char *brokerlist) {
5110 return rd_kafka_brokers_add0(rk, brokerlist);
5111}
5112
5113
5114/**
5115 * Adds a new broker or updates an existing one.
5116 *
5117 */
5118void rd_kafka_broker_update (rd_kafka_t *rk, rd_kafka_secproto_t proto,
5119 const struct rd_kafka_metadata_broker *mdb) {
5120 rd_kafka_broker_t *rkb;
5121 char nodename[RD_KAFKA_NODENAME_SIZE];
5122 int needs_update = 0;
5123
5124 rd_kafka_mk_nodename(nodename, sizeof(nodename), mdb->host, mdb->port);
5125
5126 rd_kafka_wrlock(rk);
5127 if (unlikely(rd_kafka_terminating(rk))) {
5128 /* Dont update metadata while terminating, do this
5129 * after acquiring lock for proper synchronisation */
5130 rd_kafka_wrunlock(rk);
5131 return;
5132 }
5133
5134 if ((rkb = rd_kafka_broker_find_by_nodeid(rk, mdb->id))) {
5135 /* Broker matched by nodeid, see if we need to update
5136 * the hostname. */
5137 if (strcmp(rkb->rkb_nodename, nodename))
5138 needs_update = 1;
5139 } else if ((rkb = rd_kafka_broker_find(rk, proto,
5140 mdb->host, mdb->port))) {
5141 /* Broker matched by hostname (but not by nodeid),
5142 * update the nodeid. */
5143 needs_update = 1;
5144
5145 } else {
5146 rd_kafka_broker_add(rk, RD_KAFKA_LEARNED,
5147 proto, mdb->host, mdb->port, mdb->id);
5148 }
5149
5150 rd_kafka_wrunlock(rk);
5151
5152 if (rkb) {
5153 /* Existing broker */
5154 if (needs_update) {
5155 rd_kafka_op_t *rko;
5156
5157 rko = rd_kafka_op_new(RD_KAFKA_OP_NODE_UPDATE);
5158 strncpy(rko->rko_u.node.nodename, nodename,
5159 sizeof(rko->rko_u.node.nodename)-1);
5160 rko->rko_u.node.nodeid = mdb->id;
5161 rd_kafka_q_enq(rkb->rkb_ops, rko);
5162 }
5163 rd_kafka_broker_destroy(rkb);
5164 }
5165}
5166
5167
5168/**
5169 * Returns a thread-safe temporary copy of the broker name.
5170 * Must not be called more than 4 times from the same expression.
5171 *
5172 * Locks: none
5173 * Locality: any thread
5174 */
5175const char *rd_kafka_broker_name (rd_kafka_broker_t *rkb) {
5176 static RD_TLS char ret[4][RD_KAFKA_NODENAME_SIZE];
5177 static RD_TLS int reti = 0;
5178
5179 reti = (reti + 1) % 4;
5180 mtx_lock(&rkb->rkb_logname_lock);
5181 rd_snprintf(ret[reti], sizeof(ret[reti]), "%s", rkb->rkb_logname);
5182 mtx_unlock(&rkb->rkb_logname_lock);
5183
5184 return ret[reti];
5185}
5186
5187
5188/**
5189 * @brief Send dummy OP to broker thread to wake it up from IO sleep.
5190 *
5191 * @locality any
5192 * @locks any
5193 */
5194void rd_kafka_broker_wakeup (rd_kafka_broker_t *rkb) {
5195 rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_WAKEUP);
5196 rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_FLASH);
5197 rd_kafka_q_enq(rkb->rkb_ops, rko);
5198 rd_rkb_dbg(rkb, QUEUE, "WAKEUP", "Wake-up");
5199}
5200
5201/**
5202 * @brief Wake up all broker threads that are in at least state \p min_state
5203 *
5204 * @locality any
5205 * @locks none: rd_kafka_*lock() MUST NOT be held
5206 *
5207 * @returns the number of broker threads woken up
5208 */
5209int rd_kafka_all_brokers_wakeup (rd_kafka_t *rk, int min_state) {
5210 int cnt = 0;
5211 rd_kafka_broker_t *rkb;
5212
5213 rd_kafka_rdlock(rk);
5214 TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
5215 int do_wakeup;
5216
5217 rd_kafka_broker_lock(rkb);
5218 do_wakeup = (int)rkb->rkb_state >= min_state;
5219 rd_kafka_broker_unlock(rkb);
5220
5221 if (do_wakeup) {
5222 rd_kafka_broker_wakeup(rkb);
5223 cnt += 1;
5224 }
5225 }
5226 rd_kafka_rdunlock(rk);
5227
5228 return cnt;
5229}
5230
5231/**
5232 * @brief Filter out brokers that have at least one connection attempt.
5233 */
5234static int rd_kafka_broker_filter_never_connected (rd_kafka_broker_t *rkb,
5235 void *opaque) {
5236 return rd_atomic32_get(&rkb->rkb_c.connects);
5237}
5238
5239
5240/**
5241 * @brief Sparse connections:
5242 * Select a random broker to connect to if no brokers are up.
5243 *
5244 * This is a non-blocking call, the connection is
5245 * performed by the selected broker thread.
5246 *
5247 * @locality any
5248 * @locks rd_kafka_rdlock() MUST be held
5249 */
5250void rd_kafka_connect_any (rd_kafka_t *rk, const char *reason) {
5251 rd_kafka_broker_t *rkb;
5252 rd_ts_t suppr;
5253
5254 /* Don't count connections to logical brokers since they serve
5255 * a specific purpose (group coordinator) and their connections
5256 * should not be reused for other purposes.
5257 * rd_kafka_broker_random() will not return LOGICAL brokers. */
5258 if (rd_atomic32_get(&rk->rk_broker_up_cnt) -
5259 rd_atomic32_get(&rk->rk_logical_broker_up_cnt) > 0 ||
5260 rd_atomic32_get(&rk->rk_broker_cnt) == 0)
5261 return;
5262
5263 mtx_lock(&rk->rk_suppress.sparse_connect_lock);
5264 suppr = rd_interval(&rk->rk_suppress.sparse_connect_random,
5265 rk->rk_conf.sparse_connect_intvl*1000, 0);
5266 mtx_unlock(&rk->rk_suppress.sparse_connect_lock);
5267
5268 if (suppr <= 0) {
5269 rd_kafka_dbg(rk, BROKER|RD_KAFKA_DBG_GENERIC, "CONNECT",
5270 "Not selecting any broker for cluster connection: "
5271 "still suppressed for %"PRId64"ms: %s",
5272 -suppr/1000, reason);
5273 return;
5274 }
5275
5276 /* First pass: only match brokers never connected to,
5277 * to try to exhaust the available brokers
5278 * so that an ERR_ALL_BROKERS_DOWN error can be raised. */
5279 rkb = rd_kafka_broker_random(rk, RD_KAFKA_BROKER_STATE_INIT,
5280 rd_kafka_broker_filter_never_connected,
5281 NULL);
5282 /* Second pass: match any non-connected/non-connecting broker. */
5283 if (!rkb)
5284 rkb = rd_kafka_broker_random(rk, RD_KAFKA_BROKER_STATE_INIT,
5285 NULL, NULL);
5286
5287 if (!rkb) {
5288 /* No brokers matched:
5289 * this happens if there are brokers in > INIT state,
5290 * in which case they're already connecting. */
5291
5292 rd_kafka_dbg(rk, BROKER|RD_KAFKA_DBG_GENERIC, "CONNECT",
5293 "Cluster connection already in progress: %s",
5294 reason);
5295 return;
5296 }
5297
5298 rd_rkb_dbg(rkb, BROKER|RD_KAFKA_DBG_GENERIC, "CONNECT",
5299 "Selected for cluster connection: "
5300 "%s (broker has %d connection attempt(s))",
5301 reason, rd_atomic32_get(&rkb->rkb_c.connects));
5302
5303 rd_kafka_broker_schedule_connection(rkb);
5304
5305 rd_kafka_broker_destroy(rkb); /* refcnt from ..broker_random() */
5306}
5307
5308
5309
5310/**
5311 * @brief Send PURGE queue request to broker.
5312 *
5313 * @locality any
5314 * @locks none
5315 */
5316void rd_kafka_broker_purge_queues (rd_kafka_broker_t *rkb, int purge_flags,
5317 rd_kafka_replyq_t replyq) {
5318 rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_PURGE);
5319 rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_FLASH);
5320 rko->rko_replyq = replyq;
5321 rko->rko_u.purge.flags = purge_flags;
5322 rd_kafka_q_enq(rkb->rkb_ops, rko);
5323}
5324
5325
5326/**
5327 * @brief Handle purge queues request
5328 *
5329 * @locality broker thread
5330 * @locks none
5331 */
5332static void rd_kafka_broker_handle_purge_queues (rd_kafka_broker_t *rkb,
5333 rd_kafka_op_t *rko) {
5334 int purge_flags = rko->rko_u.purge.flags;
5335 int inflight_cnt = 0, retry_cnt = 0, outq_cnt = 0, partial_cnt = 0;
5336
5337 rd_rkb_dbg(rkb, QUEUE|RD_KAFKA_DBG_TOPIC, "PURGE",
5338 "Purging queues with flags %s",
5339 rd_kafka_purge_flags2str(purge_flags));
5340
5341
5342 /**
5343 * First purge any Produce requests to move the
5344 * messages from the request's message queue to delivery reports.
5345 */
5346
5347 /* Purge in-flight ProduceRequests */
5348 if (purge_flags & RD_KAFKA_PURGE_F_INFLIGHT)
5349 inflight_cnt = rd_kafka_broker_bufq_timeout_scan(
5350 rkb, 1, &rkb->rkb_waitresps, NULL, RD_KAFKAP_Produce,
5351 RD_KAFKA_RESP_ERR__PURGE_INFLIGHT, 0, NULL, 0);
5352
5353 if (purge_flags & RD_KAFKA_PURGE_F_QUEUE) {
5354 /* Requests in retry queue */
5355 retry_cnt = rd_kafka_broker_bufq_timeout_scan(
5356 rkb, 0, &rkb->rkb_retrybufs, NULL, RD_KAFKAP_Produce,
5357 RD_KAFKA_RESP_ERR__PURGE_QUEUE, 0, NULL, 0);
5358
5359 /* Requests in transmit queue not completely sent yet.
5360 * partial_cnt is included in outq_cnt and denotes a request
5361 * that has been partially transmitted. */
5362 outq_cnt = rd_kafka_broker_bufq_timeout_scan(
5363 rkb, 0, &rkb->rkb_outbufs, &partial_cnt,
5364 RD_KAFKAP_Produce, RD_KAFKA_RESP_ERR__PURGE_QUEUE, 0,
5365 NULL, 0);
5366
5367 /* Purging a partially transmitted request will mess up
5368 * the protocol stream, so we need to disconnect from the broker
5369 * to get a clean protocol socket. */
5370 if (partial_cnt)
5371 rd_kafka_broker_fail(
5372 rkb, LOG_NOTICE,
5373 RD_KAFKA_RESP_ERR__PURGE_QUEUE,
5374 "purged %d partially sent request: "
5375 "forcing disconnect", partial_cnt);
5376 }
5377
5378 rd_rkb_dbg(rkb, QUEUE|RD_KAFKA_DBG_TOPIC, "PURGEQ",
5379 "Purged %i in-flight, %i retry-queued, "
5380 "%i out-queue, %i partially-sent requests",
5381 inflight_cnt, retry_cnt, outq_cnt, partial_cnt);
5382
5383 /* Purge partition queues */
5384 if (purge_flags & RD_KAFKA_PURGE_F_QUEUE) {
5385 rd_kafka_toppar_t *rktp;
5386 int msg_cnt = 0;
5387 int part_cnt = 0;
5388
5389 TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink) {
5390 int r;
5391
5392 r = rd_kafka_toppar_handle_purge_queues(rktp, rkb,
5393 purge_flags);
5394 if (r > 0) {
5395 msg_cnt += r;
5396 part_cnt++;
5397 }
5398 }
5399
5400 rd_rkb_dbg(rkb, QUEUE|RD_KAFKA_DBG_TOPIC, "PURGEQ",
5401 "Purged %i message(s) from %d partition(s)",
5402 msg_cnt, part_cnt);
5403 }
5404
5405 rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR_NO_ERROR);
5406}
5407
5408
5409/**
5410 * @brief Add toppar to broker's active list list.
5411 *
5412 * For consumer this means the fetch list.
5413 * For producers this is all partitions assigned to this broker.
5414 *
5415 * @locality broker thread
5416 * @locks rktp_lock MUST be held
5417 */
5418void rd_kafka_broker_active_toppar_add (rd_kafka_broker_t *rkb,
5419 rd_kafka_toppar_t *rktp) {
5420 int is_consumer = rkb->rkb_rk->rk_type == RD_KAFKA_CONSUMER;
5421
5422 if (is_consumer && rktp->rktp_fetch)
5423 return; /* Already added */
5424
5425 CIRCLEQ_INSERT_TAIL(&rkb->rkb_active_toppars, rktp, rktp_activelink);
5426 rkb->rkb_active_toppar_cnt++;
5427
5428 if (is_consumer)
5429 rktp->rktp_fetch = 1;
5430
5431 if (unlikely(rkb->rkb_active_toppar_cnt == 1))
5432 rd_kafka_broker_active_toppar_next(rkb, rktp);
5433
5434 rd_rkb_dbg(rkb, TOPIC, "FETCHADD",
5435 "Added %.*s [%"PRId32"] to %s list (%d entries, opv %d, "
5436 "%d messages queued)",
5437 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
5438 rktp->rktp_partition,
5439 is_consumer ? "fetch" : "active",
5440 rkb->rkb_active_toppar_cnt, rktp->rktp_fetch_version,
5441 rd_kafka_msgq_len(&rktp->rktp_msgq));
5442}
5443
5444
5445/**
5446 * @brief Remove toppar from active list.
5447 *
5448 * Locality: broker thread
5449 * Locks: none
5450 */
5451void rd_kafka_broker_active_toppar_del (rd_kafka_broker_t *rkb,
5452 rd_kafka_toppar_t *rktp) {
5453 int is_consumer = rkb->rkb_rk->rk_type == RD_KAFKA_CONSUMER;
5454
5455 if (is_consumer && !rktp->rktp_fetch)
5456 return; /* Not added */
5457
5458 CIRCLEQ_REMOVE(&rkb->rkb_active_toppars, rktp, rktp_activelink);
5459 rd_kafka_assert(NULL, rkb->rkb_active_toppar_cnt > 0);
5460 rkb->rkb_active_toppar_cnt--;
5461
5462 if (is_consumer)
5463 rktp->rktp_fetch = 0;
5464
5465 if (rkb->rkb_active_toppar_next == rktp) {
5466 /* Update next pointer */
5467 rd_kafka_broker_active_toppar_next(
5468 rkb, CIRCLEQ_LOOP_NEXT(&rkb->rkb_active_toppars,
5469 rktp, rktp_activelink));
5470 }
5471
5472 rd_rkb_dbg(rkb, TOPIC, "FETCHADD",
5473 "Removed %.*s [%"PRId32"] from %s list "
5474 "(%d entries, opv %d)",
5475 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
5476 rktp->rktp_partition,
5477 is_consumer ? "fetch" : "active",
5478 rkb->rkb_active_toppar_cnt, rktp->rktp_fetch_version);
5479
5480}
5481
5482
5483/**
5484 * @brief Schedule connection for \p rkb.
5485 * Will trigger disconnection for logical brokers whose nodename
5486 * was changed.
5487 *
5488 * @locality any
5489 * @locks none
5490 */
5491void rd_kafka_broker_schedule_connection (rd_kafka_broker_t *rkb) {
5492 rd_kafka_op_t *rko;
5493
5494 rko = rd_kafka_op_new(RD_KAFKA_OP_CONNECT);
5495 rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_FLASH);
5496 rd_kafka_q_enq(rkb->rkb_ops, rko);
5497}
5498
5499
5500/**
5501 * @brief Add need for persistent connection to \p rkb
5502 * with rkb_persistconn atomic counter \p acntp
5503 *
5504 * @locality any
5505 * @locks none
5506 */
5507void
5508rd_kafka_broker_persistent_connection_add (rd_kafka_broker_t *rkb,
5509 rd_atomic32_t *acntp) {
5510
5511 if (rd_atomic32_add(acntp, 1) == 1) {
5512 /* First one, trigger event. */
5513 rd_kafka_broker_schedule_connection(rkb);
5514 }
5515}
5516
5517
5518/**
5519 * @brief Remove need for persistent connection to \p rkb
5520 * with rkb_persistconn atomic counter \p acntp
5521 *
5522 * @locality any
5523 * @locks none
5524 */
5525void
5526rd_kafka_broker_persistent_connection_del (rd_kafka_broker_t *rkb,
5527 rd_atomic32_t *acntp) {
5528 int32_t r = rd_atomic32_sub(acntp, 1);
5529 rd_assert(r >= 0);
5530}
5531
5532
5533int unittest_broker (void) {
5534 int fails = 0;
5535
5536 fails += rd_ut_reconnect_backoff();
5537
5538 return fails;
5539}
5540