1/*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012-2013, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29
30#define _GNU_SOURCE
31#include <errno.h>
32#include <string.h>
33#include <stdarg.h>
34#include <signal.h>
35#include <stdlib.h>
36#include <sys/stat.h>
37
38#include "rdkafka_int.h"
39#include "rdkafka_msg.h"
40#include "rdkafka_broker.h"
41#include "rdkafka_topic.h"
42#include "rdkafka_partition.h"
43#include "rdkafka_offset.h"
44#include "rdkafka_transport.h"
45#include "rdkafka_cgrp.h"
46#include "rdkafka_assignor.h"
47#include "rdkafka_request.h"
48#include "rdkafka_event.h"
49#include "rdkafka_sasl.h"
50#include "rdkafka_interceptor.h"
51#include "rdkafka_idempotence.h"
52#include "rdkafka_sasl_oauthbearer.h"
53#if WITH_SSL
54#include "rdkafka_ssl.h"
55#endif
56
57#include "rdtime.h"
58#include "crc32c.h"
59#include "rdunittest.h"
60
61#ifdef _MSC_VER
62#include <sys/types.h>
63#include <sys/timeb.h>
64#endif
65
66
67
/**
 * @brief One-time guard handed to call_once() by rd_kafka_global_init().
 */
static once_flag rd_kafka_global_init_once = ONCE_FLAG_INIT;

/**
 * @brief Global counter+lock for all active librdkafka instances
 *
 * rd_kafka_global_cnt is read and modified with rd_kafka_global_lock held.
 */
mtx_t rd_kafka_global_lock;
int rd_kafka_global_cnt;


/**
 * Last API error code, per thread.
 * Shared among all rd_kafka_t instances.
 */
rd_kafka_resp_err_t RD_TLS rd_kafka_last_error_code;
82
83
84/**
85 * Current number of threads created by rdkafka.
86 * This is used in regression tests.
87 */
88rd_atomic32_t rd_kafka_thread_cnt_curr;
89int rd_kafka_thread_cnt (void) {
90#if ENABLE_SHAREDPTR_DEBUG
91 rd_shared_ptrs_dump();
92#endif
93
94 return rd_atomic32_get(&rd_kafka_thread_cnt_curr);
95}
96
97/**
98 * Current thread's log name (TLS)
99 */
100static char RD_TLS rd_kafka_thread_name[64] = "app";
101
102void rd_kafka_set_thread_name (const char *fmt, ...) {
103 va_list ap;
104
105 va_start(ap, fmt);
106 rd_vsnprintf(rd_kafka_thread_name, sizeof(rd_kafka_thread_name),
107 fmt, ap);
108 va_end(ap);
109}
110
111/**
112 * @brief Current thread's system name (TLS)
113 *
114 * Note the name must be 15 characters or less, because it is passed to
115 * pthread_setname_np on Linux which imposes this limit.
116 */
117static char RD_TLS rd_kafka_thread_sysname[16] = "app";
118
119void rd_kafka_set_thread_sysname (const char *fmt, ...) {
120 va_list ap;
121
122 va_start(ap, fmt);
123 rd_vsnprintf(rd_kafka_thread_sysname, sizeof(rd_kafka_thread_sysname),
124 fmt, ap);
125 va_end(ap);
126
127 thrd_setname(rd_kafka_thread_sysname);
128}
129
130static void rd_kafka_global_init0 (void) {
131#if ENABLE_SHAREDPTR_DEBUG
132 LIST_INIT(&rd_shared_ptr_debug_list);
133 mtx_init(&rd_shared_ptr_debug_mtx, mtx_plain);
134 atexit(rd_shared_ptrs_dump);
135#endif
136 mtx_init(&rd_kafka_global_lock, mtx_plain);
137#if ENABLE_DEVEL
138 rd_atomic32_init(&rd_kafka_op_cnt, 0);
139#endif
140 crc32c_global_init();
141#if WITH_SSL
142 /* The configuration interface might need to use
143 * OpenSSL to parse keys, prior to any rd_kafka_t
144 * object has been created. */
145 rd_kafka_ssl_init();
146#endif
147}
148
149/**
150 * @brief Initialize once per process
151 */
152void rd_kafka_global_init (void) {
153 call_once(&rd_kafka_global_init_once, rd_kafka_global_init0);
154}
155
156/**
157 * @returns the current number of active librdkafka instances
158 */
159static int rd_kafka_global_cnt_get (void) {
160 int r;
161 mtx_lock(&rd_kafka_global_lock);
162 r = rd_kafka_global_cnt;
163 mtx_unlock(&rd_kafka_global_lock);
164 return r;
165}
166
167
168/**
169 * @brief Increase counter for active librdkafka instances.
170 * If this is the first instance the global constructors will be called, if any.
171 */
172static void rd_kafka_global_cnt_incr (void) {
173 mtx_lock(&rd_kafka_global_lock);
174 rd_kafka_global_cnt++;
175 if (rd_kafka_global_cnt == 1) {
176 rd_kafka_transport_init();
177#if WITH_SSL
178 rd_kafka_ssl_init();
179#endif
180 rd_kafka_sasl_global_init();
181 }
182 mtx_unlock(&rd_kafka_global_lock);
183}
184
185/**
186 * @brief Decrease counter for active librdkafka instances.
187 * If this counter reaches 0 the global destructors will be called, if any.
188 */
189static void rd_kafka_global_cnt_decr (void) {
190 mtx_lock(&rd_kafka_global_lock);
191 rd_kafka_assert(NULL, rd_kafka_global_cnt > 0);
192 rd_kafka_global_cnt--;
193 if (rd_kafka_global_cnt == 0) {
194 rd_kafka_sasl_global_term();
195#if WITH_SSL
196 rd_kafka_ssl_term();
197#endif
198 }
199 mtx_unlock(&rd_kafka_global_lock);
200}
201
202
203/**
204 * Wait for all rd_kafka_t objects to be destroyed.
205 * Returns 0 if all kafka objects are now destroyed, or -1 if the
206 * timeout was reached.
207 */
208int rd_kafka_wait_destroyed (int timeout_ms) {
209 rd_ts_t timeout = rd_clock() + (timeout_ms * 1000);
210
211 while (rd_kafka_thread_cnt() > 0 ||
212 rd_kafka_global_cnt_get() > 0) {
213 if (rd_clock() >= timeout) {
214 rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__TIMED_OUT,
215 ETIMEDOUT);
216#if ENABLE_SHAREDPTR_DEBUG
217 rd_shared_ptrs_dump();
218#endif
219 return -1;
220 }
221 rd_usleep(25000, NULL); /* 25ms */
222 }
223
224 return 0;
225}
226
227static void rd_kafka_log_buf (const rd_kafka_conf_t *conf,
228 const rd_kafka_t *rk, int level, const char *fac,
229 const char *buf) {
230 if (level > conf->log_level)
231 return;
232 else if (rk && conf->log_queue) {
233 rd_kafka_op_t *rko;
234
235 if (!rk->rk_logq)
236 return; /* Terminating */
237
238 rko = rd_kafka_op_new(RD_KAFKA_OP_LOG);
239 rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_MEDIUM);
240 rko->rko_u.log.level = level;
241 strncpy(rko->rko_u.log.fac, fac,
242 sizeof(rko->rko_u.log.fac) - 1);
243 rko->rko_u.log.str = rd_strdup(buf);
244 rd_kafka_q_enq(rk->rk_logq, rko);
245
246 } else if (conf->log_cb) {
247 conf->log_cb(rk, level, fac, buf);
248 }
249}
250
251/**
252 * @brief Logger
253 *
254 * @remark conf must be set, but rk may be NULL
255 */
256void rd_kafka_log0 (const rd_kafka_conf_t *conf,
257 const rd_kafka_t *rk,
258 const char *extra, int level,
259 const char *fac, const char *fmt, ...) {
260 char buf[2048];
261 va_list ap;
262 unsigned int elen = 0;
263 unsigned int of = 0;
264
265 if (level > conf->log_level)
266 return;
267
268 if (conf->log_thread_name) {
269 elen = rd_snprintf(buf, sizeof(buf), "[thrd:%s]: ",
270 rd_kafka_thread_name);
271 if (unlikely(elen >= sizeof(buf)))
272 elen = sizeof(buf);
273 of = elen;
274 }
275
276 if (extra) {
277 elen = rd_snprintf(buf+of, sizeof(buf)-of, "%s: ", extra);
278 if (unlikely(elen >= sizeof(buf)-of))
279 elen = sizeof(buf)-of;
280 of += elen;
281 }
282
283 va_start(ap, fmt);
284 rd_vsnprintf(buf+of, sizeof(buf)-of, fmt, ap);
285 va_end(ap);
286
287 rd_kafka_log_buf(conf, rk, level, fac, buf);
288}
289
290rd_kafka_resp_err_t
291rd_kafka_oauthbearer_set_token (rd_kafka_t *rk,
292 const char *token_value,
293 int64_t md_lifetime_ms,
294 const char *md_principal_name,
295 const char **extensions, size_t extension_size,
296 char *errstr, size_t errstr_size) {
297#if WITH_SASL_OAUTHBEARER
298 return rd_kafka_oauthbearer_set_token0(
299 rk, token_value,
300 md_lifetime_ms, md_principal_name, extensions, extension_size,
301 errstr, errstr_size);
302#else
303 rd_snprintf(errstr, errstr_size,
304 "librdkafka not built with SASL OAUTHBEARER support");
305 return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;
306#endif
307}
308
309rd_kafka_resp_err_t
310rd_kafka_oauthbearer_set_token_failure (rd_kafka_t *rk, const char *errstr) {
311#if WITH_SASL_OAUTHBEARER
312 return rd_kafka_oauthbearer_set_token_failure0(rk, errstr);
313#else
314 return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;
315#endif
316}
317
318void rd_kafka_log_print(const rd_kafka_t *rk, int level,
319 const char *fac, const char *buf) {
320 int secs, msecs;
321 struct timeval tv;
322 rd_gettimeofday(&tv, NULL);
323 secs = (int)tv.tv_sec;
324 msecs = (int)(tv.tv_usec / 1000);
325 fprintf(stderr, "%%%i|%u.%03u|%s|%s| %s\n",
326 level, secs, msecs,
327 fac, rk ? rk->rk_name : "", buf);
328}
329
330#ifndef _MSC_VER
331void rd_kafka_log_syslog (const rd_kafka_t *rk, int level,
332 const char *fac, const char *buf) {
333 static int initialized = 0;
334
335 if (!initialized)
336 openlog("rdkafka", LOG_PID|LOG_CONS, LOG_USER);
337
338 syslog(level, "%s: %s: %s", fac, rk ? rk->rk_name : "", buf);
339}
340#endif
341
342void rd_kafka_set_logger (rd_kafka_t *rk,
343 void (*func) (const rd_kafka_t *rk, int level,
344 const char *fac, const char *buf)) {
345 rk->rk_conf.log_cb = func;
346}
347
348void rd_kafka_set_log_level (rd_kafka_t *rk, int level) {
349 rk->rk_conf.log_level = level;
350}
351
352
353
354
355
356
357static const char *rd_kafka_type2str (rd_kafka_type_t type) {
358 static const char *types[] = {
359 [RD_KAFKA_PRODUCER] = "producer",
360 [RD_KAFKA_CONSUMER] = "consumer",
361 };
362 return types[type];
363}
364
/**
 * @brief Builds one error description table entry for error code ENUM.
 *
 * The entry is placed at index (ENUM - RD_KAFKA_RESP_ERR__BEGIN) using a
 * designated initializer. Its name is the stringified enum identifier
 * with the first 18 characters (the "RD_KAFKA_RESP_ERR_" prefix) skipped,
 * e.g. RD_KAFKA_RESP_ERR__BAD_MSG yields "_BAD_MSG".
 */
#define _ERR_DESC(ENUM,DESC) \
        [ENUM - RD_KAFKA_RESP_ERR__BEGIN] = { ENUM, # ENUM + 18/*pfx*/, DESC }
367
/**
 * @brief Error code description table.
 *
 * Indexed by (error code - RD_KAFKA_RESP_ERR__BEGIN), see _ERR_DESC().
 * Indexes without an explicit entry are zero-initialized and thus have
 * a NULL description, as do the __BEGIN and __END markers.
 */
static const struct rd_kafka_err_desc rd_kafka_err_descs[] = {
        /* Local (client-side) error codes */
        _ERR_DESC(RD_KAFKA_RESP_ERR__BEGIN, NULL),
        _ERR_DESC(RD_KAFKA_RESP_ERR__BAD_MSG,
                  "Local: Bad message format"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__BAD_COMPRESSION,
                  "Local: Invalid compressed data"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__DESTROY,
                  "Local: Broker handle destroyed"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__FAIL,
                  "Local: Communication failure with broker"), //FIXME: too specific
        _ERR_DESC(RD_KAFKA_RESP_ERR__TRANSPORT,
                  "Local: Broker transport failure"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE,
                  "Local: Critical system resource failure"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__RESOLVE,
                  "Local: Host resolution failure"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__MSG_TIMED_OUT,
                  "Local: Message timed out"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__PARTITION_EOF,
                  "Broker: No more messages"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
                  "Local: Unknown partition"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__FS,
                  "Local: File or filesystem error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC,
                  "Local: Unknown topic"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN,
                  "Local: All broker connections are down"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__INVALID_ARG,
                  "Local: Invalid argument or configuration"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__TIMED_OUT,
                  "Local: Timed out"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__QUEUE_FULL,
                  "Local: Queue full"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__ISR_INSUFF,
                  "Local: ISR count insufficient"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__NODE_UPDATE,
                  "Local: Broker node update"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__SSL,
                  "Local: SSL error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__WAIT_COORD,
                  "Local: Waiting for coordinator"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_GROUP,
                  "Local: Unknown group"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__IN_PROGRESS,
                  "Local: Operation in progress"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS,
                  "Local: Previous operation in progress"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION,
                  "Local: Existing subscription"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
                  "Local: Assign partitions"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
                  "Local: Revoke partitions"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__CONFLICT,
                  "Local: Conflicting use"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__STATE,
                  "Local: Erroneous state"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL,
                  "Local: Unknown protocol"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED,
                  "Local: Not implemented"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__AUTHENTICATION,
                  "Local: Authentication failure"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__NO_OFFSET,
                  "Local: No offset stored"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__OUTDATED,
                  "Local: Outdated"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE,
                  "Local: Timed out in queue"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
                  "Local: Required feature not supported by broker"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__WAIT_CACHE,
                  "Local: Awaiting cache update"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__INTR,
                  "Local: Operation interrupted"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__KEY_SERIALIZATION,
                  "Local: Key serialization error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__VALUE_SERIALIZATION,
                  "Local: Value serialization error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__KEY_DESERIALIZATION,
                  "Local: Key deserialization error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__VALUE_DESERIALIZATION,
                  "Local: Value deserialization error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__PARTIAL,
                  "Local: Partial response"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__READ_ONLY,
                  "Local: Read-only object"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__NOENT,
                  "Local: No such entry"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__UNDERFLOW,
                  "Local: Read underflow"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__INVALID_TYPE,
                  "Local: Invalid type"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__RETRY,
                  "Local: Retry operation"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__PURGE_QUEUE,
                  "Local: Purged in queue"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__PURGE_INFLIGHT,
                  "Local: Purged in flight"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__FATAL,
                  "Local: Fatal error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__INCONSISTENT,
                  "Local: Inconsistent state"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__GAPLESS_GUARANTEE,
                  "Local: Gap-less ordering would not be guaranteed "
                  "if proceeding"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED,
                  "Local: Maximum application poll interval "
                  "(max.poll.interval.ms) exceeded"),

        /* Non-local error codes */
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN,
                  "Unknown broker error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NO_ERROR,
                  "Success"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE,
                  "Broker: Offset out of range"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_MSG,
                  "Broker: Invalid message"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART,
                  "Broker: Unknown topic or partition"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE,
                  "Broker: Invalid message size"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE,
                  "Broker: Leader not available"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
                  "Broker: Not leader for partition"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
                  "Broker: Request timed out"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE,
                  "Broker: Broker not available"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE,
                  "Broker: Replica not available"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE,
                  "Broker: Message size too large"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH,
                  "Broker: StaleControllerEpochCode"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE,
                  "Broker: Offset metadata string too large"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION,
                  "Broker: Broker disconnected before response received"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS,
                  "Broker: Group coordinator load in progress"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE,
                  "Broker: Group coordinator not available"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP,
                  "Broker: Not coordinator for group"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION,
                  "Broker: Invalid topic"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE,
                  "Broker: Message batch larger than configured server "
                  "segment size"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS,
                  "Broker: Not enough in-sync replicas"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND,
                  "Broker: Message(s) written to insufficient number of "
                  "in-sync replicas"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS,
                  "Broker: Invalid required acks value"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
                  "Broker: Specified group generation id is not valid"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL,
                  "Broker: Inconsistent group protocol"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_GROUP_ID,
                  "Broker: Invalid group.id"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID,
                  "Broker: Unknown member"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT,
                  "Broker: Invalid session timeout"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
                  "Broker: Group rebalance in progress"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE,
                  "Broker: Commit offset data size is not valid"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED,
                  "Broker: Topic authorization failed"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,
                  "Broker: Group authorization failed"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED,
                  "Broker: Cluster authorization failed"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TIMESTAMP,
                  "Broker: Invalid timestamp"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_SASL_MECHANISM,
                  "Broker: Unsupported SASL mechanism"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_ILLEGAL_SASL_STATE,
                  "Broker: Request not valid in current SASL state"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION,
                  "Broker: API version not supported"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS,
                  "Broker: Topic already exists"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PARTITIONS,
                  "Broker: Invalid number of partitions"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REPLICATION_FACTOR,
                  "Broker: Invalid replication factor"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REPLICA_ASSIGNMENT,
                  "Broker: Invalid replica assignment"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_CONFIG,
                  "Broker: Configuration is invalid"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_CONTROLLER,
                  "Broker: Not controller for cluster"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REQUEST,
                  "Broker: Invalid request"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT,
                  "Broker: Message format on broker does not support request"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_POLICY_VIOLATION,
                  "Broker: Policy violation"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER,
                  "Broker: Broker received an out of order sequence number"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER,
                  "Broker: Broker received a duplicate sequence number"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH,
                  "Broker: Producer attempted an operation with an old epoch"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TXN_STATE,
                  "Broker: Producer attempted a transactional operation in "
                  "an invalid state"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING,
                  "Broker: Producer attempted to use a producer id which is "
                  "not currently assigned to its transactional id"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT,
                  "Broker: Transaction timeout is larger than the maximum "
                  "value allowed by the broker's max.transaction.timeout.ms"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
                  "Broker: Producer attempted to update a transaction while "
                  "another concurrent operation on the same transaction was "
                  "ongoing"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_TRANSACTION_COORDINATOR_FENCED,
                  "Broker: Indicates that the transaction coordinator sending "
                  "a WriteTxnMarker is no longer the current coordinator for "
                  "a given producer"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED,
                  "Broker: Transactional Id authorization failed"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_SECURITY_DISABLED,
                  "Broker: Security features are disabled"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_OPERATION_NOT_ATTEMPTED,
                  "Broker: Operation not attempted"),
        /* NOTE(review): the descriptions below lack the "Broker: " prefix
         * used by the entries above. Left untouched since these strings
         * are returned to the application at runtime. */
        _ERR_DESC(RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR,
                  "Disk error when trying to access log file on the disk"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_LOG_DIR_NOT_FOUND,
                  "The user-specified log directory is not found "
                  "in the broker config"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_SASL_AUTHENTICATION_FAILED,
                  "SASL Authentication failed"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID,
                  "Unknown Producer Id"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_REASSIGNMENT_IN_PROGRESS,
                  "Partition reassignment is in progress"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_AUTH_DISABLED,
                  "Delegation Token feature is not enabled"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_NOT_FOUND,
                  "Delegation Token is not found on server"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_OWNER_MISMATCH,
                  "Specified Principal is not valid Owner/Renewer"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_REQUEST_NOT_ALLOWED,
                  "Delegation Token requests are not allowed on "
                  "this connection"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_AUTHORIZATION_FAILED,
                  "Delegation Token authorization failed"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_EXPIRED,
                  "Delegation Token is expired"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PRINCIPAL_TYPE,
                  "Supplied principalType is not supported"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NON_EMPTY_GROUP,
                  "The group is not empty"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND,
                  "The group id does not exist"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_FETCH_SESSION_ID_NOT_FOUND,
                  "The fetch session ID was not found"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_FETCH_SESSION_EPOCH,
                  "The fetch session epoch is invalid"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_LISTENER_NOT_FOUND,
                  "No matching listener"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_DELETION_DISABLED,
                  "Topic deletion is disabled"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH,
                  "Leader epoch is older than broker epoch"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_LEADER_EPOCH,
                  "Leader epoch is newer than broker epoch"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_COMPRESSION_TYPE,
                  "Unsupported compression type"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_STALE_BROKER_EPOCH,
                  "Broker epoch has changed"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_OFFSET_NOT_AVAILABLE,
                  "Leader high watermark is not caught up"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_MEMBER_ID_REQUIRED,
                  "Group member needs a valid member ID"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_PREFERRED_LEADER_NOT_AVAILABLE,
                  "Preferred leader was not available"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_MAX_SIZE_REACHED,
                  "Consumer group has reached maximum size"),

        _ERR_DESC(RD_KAFKA_RESP_ERR__END, NULL)
};
659
660
661void rd_kafka_get_err_descs (const struct rd_kafka_err_desc **errdescs,
662 size_t *cntp) {
663 *errdescs = rd_kafka_err_descs;
664 *cntp = RD_ARRAYSIZE(rd_kafka_err_descs);
665}
666
667
668const char *rd_kafka_err2str (rd_kafka_resp_err_t err) {
669 static RD_TLS char ret[32];
670 int idx = err - RD_KAFKA_RESP_ERR__BEGIN;
671
672 if (unlikely(err <= RD_KAFKA_RESP_ERR__BEGIN ||
673 err >= RD_KAFKA_RESP_ERR_END_ALL ||
674 !rd_kafka_err_descs[idx].desc)) {
675 rd_snprintf(ret, sizeof(ret), "Err-%i?", err);
676 return ret;
677 }
678
679 return rd_kafka_err_descs[idx].desc;
680}
681
682
683const char *rd_kafka_err2name (rd_kafka_resp_err_t err) {
684 static RD_TLS char ret[32];
685 int idx = err - RD_KAFKA_RESP_ERR__BEGIN;
686
687 if (unlikely(err <= RD_KAFKA_RESP_ERR__BEGIN ||
688 err >= RD_KAFKA_RESP_ERR_END_ALL ||
689 !rd_kafka_err_descs[idx].desc)) {
690 rd_snprintf(ret, sizeof(ret), "ERR_%i?", err);
691 return ret;
692 }
693
694 return rd_kafka_err_descs[idx].name;
695}
696
697
698rd_kafka_resp_err_t rd_kafka_last_error (void) {
699 return rd_kafka_last_error_code;
700}
701
702
703rd_kafka_resp_err_t rd_kafka_errno2err (int errnox) {
704 switch (errnox)
705 {
706 case EINVAL:
707 return RD_KAFKA_RESP_ERR__INVALID_ARG;
708
709 case EBUSY:
710 return RD_KAFKA_RESP_ERR__CONFLICT;
711
712 case ENOENT:
713 return RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
714
715 case ESRCH:
716 return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
717
718 case ETIMEDOUT:
719 return RD_KAFKA_RESP_ERR__TIMED_OUT;
720
721 case EMSGSIZE:
722 return RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE;
723
724 case ENOBUFS:
725 return RD_KAFKA_RESP_ERR__QUEUE_FULL;
726
727 case ECANCELED:
728 return RD_KAFKA_RESP_ERR__FATAL;
729
730 default:
731 return RD_KAFKA_RESP_ERR__FAIL;
732 }
733}
734
735
736rd_kafka_resp_err_t rd_kafka_fatal_error (rd_kafka_t *rk,
737 char *errstr, size_t errstr_size) {
738 rd_kafka_resp_err_t err;
739
740 if (unlikely((err = rd_atomic32_get(&rk->rk_fatal.err)))) {
741 rd_kafka_rdlock(rk);
742 rd_snprintf(errstr, errstr_size, "%s", rk->rk_fatal.errstr);
743 rd_kafka_rdunlock(rk);
744 }
745
746 return err;
747}
748
749
750/**
751 * @brief Set's the fatal error for this instance.
752 *
753 * @returns 1 if the error was set, or 0 if a previous fatal error
754 * has already been set on this instance.
755 *
756 * @locality any
757 * @locks none
758 */
759int rd_kafka_set_fatal_error (rd_kafka_t *rk, rd_kafka_resp_err_t err,
760 const char *fmt, ...) {
761 va_list ap;
762 char buf[512];
763
764 rd_kafka_wrlock(rk);
765 rk->rk_fatal.cnt++;
766 if (rd_atomic32_get(&rk->rk_fatal.err)) {
767 rd_kafka_wrunlock(rk);
768 rd_kafka_dbg(rk, GENERIC, "FATAL",
769 "Suppressing subsequent fatal error: %s",
770 rd_kafka_err2name(err));
771 return 0;
772 }
773
774 rd_atomic32_set(&rk->rk_fatal.err, err);
775
776 va_start(ap, fmt);
777 rd_vsnprintf(buf, sizeof(buf), fmt, ap);
778 va_end(ap);
779 rk->rk_fatal.errstr = rd_strdup(buf);
780
781 rd_kafka_wrunlock(rk);
782
783 /* If there is an error callback or event handler we
784 * also log the fatal error as it happens.
785 * If there is no error callback the error event
786 * will be automatically logged, and this check here
787 * prevents us from duplicate logs. */
788 if (rk->rk_conf.enabled_events & RD_KAFKA_EVENT_ERROR)
789 rd_kafka_log(rk, LOG_EMERG, "FATAL",
790 "Fatal error: %s: %s",
791 rd_kafka_err2str(err), rk->rk_fatal.errstr);
792 else
793 rd_kafka_dbg(rk, ALL, "FATAL",
794 "Fatal error: %s: %s",
795 rd_kafka_err2str(err), rk->rk_fatal.errstr);
796
797 /* Indicate to the application that a fatal error was raised,
798 * the app should use rd_kafka_fatal_error() to extract the
799 * fatal error code itself. */
800 rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__FATAL,
801 "Fatal error: %s: %s",
802 rd_kafka_err2str(err), rk->rk_fatal.errstr);
803
804
805 /* Purge producer queues, but not in-flight since we'll
806 * want proper delivery status for transmitted requests.
807 * Need NON_BLOCKING to avoid dead-lock if user is
808 * calling purge() at the same time, which could be
809 * waiting for this broker thread to handle its
810 * OP_PURGE request. */
811 if (rk->rk_type == RD_KAFKA_PRODUCER)
812 rd_kafka_purge(rk, RD_KAFKA_PURGE_F_QUEUE|
813 RD_KAFKA_PURGE_F_NON_BLOCKING);
814
815 return 1;
816}
817
818
819rd_kafka_resp_err_t
820rd_kafka_test_fatal_error (rd_kafka_t *rk, rd_kafka_resp_err_t err,
821 const char *reason) {
822 if (rd_kafka_set_fatal_error(rk, err, "test_fatal_error: %s", reason))
823 return RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS;
824 else
825 return RD_KAFKA_RESP_ERR_NO_ERROR;
826}
827
828
829
/**
 * @brief Final destructor for rd_kafka_t, must only be called with refcnt 0.
 *
 * Tears down the instance's sub-systems and queues, destroys its locks
 * and condition variables, frees the instance itself and finally
 * decreases the global instance counter.
 *
 * @remark The instance must be terminating (asserted).
 *
 * @locality application thread
 */
void rd_kafka_destroy_final (rd_kafka_t *rk) {

        rd_kafka_assert(rk, rd_kafka_terminating(rk));

        /* Synchronize state */
        rd_kafka_wrlock(rk);
        rd_kafka_wrunlock(rk);

        rd_kafka_assignors_term(rk);

        rd_kafka_metadata_cache_destroy(rk);

        /* Terminate SASL provider */
        if (rk->rk_conf.sasl.provider)
                rd_kafka_sasl_term(rk);

        rd_kafka_timers_destroy(&rk->rk_timers);

        rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Destroying op queues");

        /* Destroy cgrp */
        if (rk->rk_cgrp) {
                rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                             "Destroying cgrp");
                /* Reset queue forwarding (rep -> cgrp) */
                rd_kafka_q_fwd_set(rk->rk_rep, NULL);
                rd_kafka_cgrp_destroy_final(rk->rk_cgrp);
        }

        /* Purge op-queues */
        rd_kafka_q_destroy_owner(rk->rk_rep);
        rd_kafka_q_destroy_owner(rk->rk_ops);

#if WITH_SSL
        if (rk->rk_conf.ssl.ctx) {
                rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Destroying SSL CTX");
                rd_kafka_ssl_ctx_term(rk);
        }
#endif

        /* It is not safe to log after this point. */
        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Termination done: freeing resources");

        /* Destroy the log queue, if any, and clear the pointer */
        if (rk->rk_logq) {
                rd_kafka_q_destroy_owner(rk->rk_logq);
                rk->rk_logq = NULL;
        }

        /* The current-messages cnd+lock are only destroyed for producers */
        if (rk->rk_type == RD_KAFKA_PRODUCER) {
                cnd_destroy(&rk->rk_curr_msgs.cnd);
                mtx_destroy(&rk->rk_curr_msgs.lock);
        }

        /* Free the fatal error string, if a fatal error was raised */
        if (rk->rk_fatal.errstr) {
                rd_free(rk->rk_fatal.errstr);
                rk->rk_fatal.errstr = NULL;
        }

        cnd_destroy(&rk->rk_broker_state_change_cnd);
        mtx_destroy(&rk->rk_broker_state_change_lock);

        mtx_destroy(&rk->rk_suppress.sparse_connect_lock);

        cnd_destroy(&rk->rk_init_cnd);
        mtx_destroy(&rk->rk_init_lock);

        if (rk->rk_full_metadata)
                rd_kafka_metadata_destroy(rk->rk_full_metadata);
        rd_kafkap_str_destroy(rk->rk_client_id);
        rd_kafkap_str_destroy(rk->rk_group_id);
        rd_kafkap_str_destroy(rk->rk_eos.transactional_id);
        rd_kafka_anyconf_destroy(_RK_GLOBAL, &rk->rk_conf);
        rd_list_destroy(&rk->rk_broker_by_id);

        rd_kafkap_bytes_destroy((rd_kafkap_bytes_t *)rk->rk_null_bytes);
        rwlock_destroy(&rk->rk_lock);

        /* Free the instance itself, then unregister it globally */
        rd_free(rk);
        rd_kafka_global_cnt_decr();
}
916
917
/**
 * @brief Application-triggered destroy of the instance.
 *
 * Sets the termination flags, closes the consumer group (if any),
 * interrupts the timers and wakes up the internal main thread. Unless
 * RD_KAFKA_DESTROY_F_IMMEDIATE is set, it then joins the main thread
 * and calls rd_kafka_destroy_final().
 *
 * @param rk    Client instance to destroy.
 * @param flags RD_KAFKA_DESTROY_F_.. flags.
 *
 * @remark Must not be called from the instance's main or background
 *         thread; doing so is logged and then asserted.
 */
static void rd_kafka_destroy_app (rd_kafka_t *rk, int flags) {
        thrd_t thrd;
#ifndef _MSC_VER
        int term_sig = rk->rk_conf.term_sig;
#endif
        int res;
        char flags_str[256];
        /* Names used by rd_flags2str() to render the flags in the
         * debug log below. */
        static const char *rd_kafka_destroy_flags_names[] = {
                "Terminate",
                "DestroyCalled",
                "Immediate",
                "NoConsumerClose",
                NULL
        };

        /* _F_IMMEDIATE also sets .._NO_CONSUMER_CLOSE */
        if (flags & RD_KAFKA_DESTROY_F_IMMEDIATE)
                flags |= RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE;

        rd_flags2str(flags_str, sizeof(flags_str),
                     rd_kafka_destroy_flags_names, flags);
        rd_kafka_dbg(rk, ALL, "DESTROY", "Terminating instance "
                     "(destroy flags %s (0x%x))",
                     flags ? flags_str : "none", flags);

        /* Make sure destroy is not called from a librdkafka thread
         * since this will most likely cause a deadlock.
         * FIXME: include broker threads (for log_cb) */
        if (thrd_is_current(rk->rk_thread) ||
            thrd_is_current(rk->rk_background.thread)) {
                rd_kafka_log(rk, LOG_EMERG, "BGQUEUE",
                             "Application bug: "
                             "rd_kafka_destroy() called from "
                             "librdkafka owned thread");
                /* !*"..." is always false (the first character of the
                 * literal is non-zero), so this assert always triggers
                 * and carries the message in its expression text. */
                rd_kafka_assert(NULL,
                                !*"Application bug: "
                                "calling rd_kafka_destroy() from "
                                "librdkafka owned thread is prohibited");
        }

        /* Before signaling for general termination, set the destroy
         * flags to hint cgrp how to shut down. */
        rd_atomic32_set(&rk->rk_terminate,
                        flags|RD_KAFKA_DESTROY_F_DESTROY_CALLED);

        /* The legacy/simple consumer lacks an API to close down the consumer*/
        if (rk->rk_cgrp) {
                rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                             "Terminating consumer group handler");
                rd_kafka_consumer_close(rk);
        }

        /* With the consumer closed, terminate the rest of librdkafka. */
        rd_atomic32_set(&rk->rk_terminate, flags|RD_KAFKA_DESTROY_F_TERMINATE);

        rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Interrupting timers");
        rd_kafka_wrlock(rk);
        /* Take a copy of the main thread handle while holding the lock */
        thrd = rk->rk_thread;
        rd_kafka_timers_interrupt(&rk->rk_timers);
        rd_kafka_wrunlock(rk);

        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Sending TERMINATE to internal main thread");
        /* Send op to trigger queue/io wake-up.
         * The op itself is (likely) ignored by the receiver. */
        rd_kafka_q_enq(rk->rk_ops, rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));

#ifndef _MSC_VER
        /* Interrupt main kafka thread to speed up termination. */
        if (term_sig) {
                rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                             "Sending thread kill signal %d", term_sig);
                pthread_kill(thrd, term_sig);
        }
#endif

        /* Immediate destroy: return without joining the main thread
         * and without calling rd_kafka_destroy_final(). */
        if (rd_kafka_destroy_flags_check(rk, RD_KAFKA_DESTROY_F_IMMEDIATE))
                return; /* FIXME: thread resource leak */

        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Joining internal main thread");

        /* A failed join is logged only; final destruction still proceeds */
        if (thrd_join(thrd, &res) != thrd_success)
                rd_kafka_log(rk, LOG_ERR, "DESTROY",
                             "Failed to join internal main thread: %s "
                             "(was process forked?)",
                             rd_strerror(errno));

        rd_kafka_destroy_final(rk);
}
1008
1009
1010/* NOTE: Must only be called by application.
1011 * librdkafka itself must use rd_kafka_destroy0(). */
1012void rd_kafka_destroy (rd_kafka_t *rk) {
1013 rd_kafka_destroy_app(rk, 0);
1014}
1015
1016void rd_kafka_destroy_flags (rd_kafka_t *rk, int flags) {
1017 rd_kafka_destroy_app(rk, flags);
1018}
1019
1020
1021/**
1022 * Main destructor for rd_kafka_t
1023 *
1024 * Locality: rdkafka main thread or application thread during rd_kafka_new()
1025 */
1026static void rd_kafka_destroy_internal (rd_kafka_t *rk) {
1027 rd_kafka_itopic_t *rkt, *rkt_tmp;
1028 rd_kafka_broker_t *rkb, *rkb_tmp;
1029 rd_list_t wait_thrds;
1030 thrd_t *thrd;
1031 int i;
1032
1033 rd_kafka_dbg(rk, ALL, "DESTROY", "Destroy internal");
1034
1035 /* Trigger any state-change waiters (which should check the
1036 * terminate flag whenever they wake up). */
1037 rd_kafka_brokers_broadcast_state_change(rk);
1038
1039 if (rk->rk_background.thread) {
1040 int res;
1041 /* Send op to trigger queue/io wake-up.
1042 * The op itself is (likely) ignored by the receiver. */
1043 rd_kafka_q_enq(rk->rk_background.q,
1044 rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));
1045
1046 rd_kafka_dbg(rk, ALL, "DESTROY",
1047 "Waiting for background queue thread "
1048 "to terminate");
1049 thrd_join(rk->rk_background.thread, &res);
1050 rd_kafka_q_destroy_owner(rk->rk_background.q);
1051 }
1052
1053 /* Call on_destroy() interceptors */
1054 rd_kafka_interceptors_on_destroy(rk);
1055
1056 /* Brokers pick up on rk_terminate automatically. */
1057
1058 /* List of (broker) threads to join to synchronize termination */
1059 rd_list_init(&wait_thrds, rd_atomic32_get(&rk->rk_broker_cnt), NULL);
1060
1061 rd_kafka_wrlock(rk);
1062
1063 rd_kafka_dbg(rk, ALL, "DESTROY", "Removing all topics");
1064 /* Decommission all topics */
1065 TAILQ_FOREACH_SAFE(rkt, &rk->rk_topics, rkt_link, rkt_tmp) {
1066 rd_kafka_wrunlock(rk);
1067 rd_kafka_topic_partitions_remove(rkt);
1068 rd_kafka_wrlock(rk);
1069 }
1070
1071 /* Decommission brokers.
1072 * Broker thread holds a refcount and detects when broker refcounts
1073 * reaches 1 and then decommissions itself. */
1074 TAILQ_FOREACH_SAFE(rkb, &rk->rk_brokers, rkb_link, rkb_tmp) {
1075 /* Add broker's thread to wait_thrds list for later joining */
1076 thrd = malloc(sizeof(*thrd));
1077 *thrd = rkb->rkb_thread;
1078 rd_list_add(&wait_thrds, thrd);
1079 rd_kafka_wrunlock(rk);
1080
1081 rd_kafka_dbg(rk, BROKER, "DESTROY",
1082 "Sending TERMINATE to %s",
1083 rd_kafka_broker_name(rkb));
1084 /* Send op to trigger queue/io wake-up.
1085 * The op itself is (likely) ignored by the broker thread. */
1086 rd_kafka_q_enq(rkb->rkb_ops,
1087 rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));
1088
1089#ifndef _MSC_VER
1090 /* Interrupt IO threads to speed up termination. */
1091 if (rk->rk_conf.term_sig)
1092 pthread_kill(rkb->rkb_thread, rk->rk_conf.term_sig);
1093#endif
1094
1095 rd_kafka_broker_destroy(rkb);
1096
1097 rd_kafka_wrlock(rk);
1098 }
1099
1100 if (rk->rk_clusterid) {
1101 rd_free(rk->rk_clusterid);
1102 rk->rk_clusterid = NULL;
1103 }
1104
1105 rd_kafka_wrunlock(rk);
1106
1107 mtx_lock(&rk->rk_broker_state_change_lock);
1108 /* Purge broker state change waiters */
1109 rd_list_destroy(&rk->rk_broker_state_change_waiters);
1110 mtx_unlock(&rk->rk_broker_state_change_lock);
1111
1112 rd_kafka_dbg(rk, GENERIC, "TERMINATE",
1113 "Purging reply queue");
1114
1115 /* Purge op-queue */
1116 rd_kafka_q_disable(rk->rk_rep);
1117 rd_kafka_q_purge(rk->rk_rep);
1118
1119 /* Loose our special reference to the internal broker. */
1120 mtx_lock(&rk->rk_internal_rkb_lock);
1121 if ((rkb = rk->rk_internal_rkb)) {
1122 rd_kafka_dbg(rk, GENERIC, "TERMINATE",
1123 "Decommissioning internal broker");
1124
1125 /* Send op to trigger queue wake-up. */
1126 rd_kafka_q_enq(rkb->rkb_ops,
1127 rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));
1128
1129 rk->rk_internal_rkb = NULL;
1130 thrd = malloc(sizeof(*thrd));
1131 *thrd = rkb->rkb_thread;
1132 rd_list_add(&wait_thrds, thrd);
1133 }
1134 mtx_unlock(&rk->rk_internal_rkb_lock);
1135 if (rkb)
1136 rd_kafka_broker_destroy(rkb);
1137
1138
1139 rd_kafka_dbg(rk, GENERIC, "TERMINATE",
1140 "Join %d broker thread(s)", rd_list_cnt(&wait_thrds));
1141
1142 /* Join broker threads */
1143 RD_LIST_FOREACH(thrd, &wait_thrds, i) {
1144 int res;
1145 if (thrd_join(*thrd, &res) != thrd_success)
1146 ;
1147 free(thrd);
1148 }
1149
1150 rd_list_destroy(&wait_thrds);
1151}
1152
/**
 * @brief Buffer state for stats emitter
 */
struct _stats_emit {
        char   *buf;      /* Pointer to allocated buffer */
        size_t  size;     /* Current allocated size of buf */
        size_t  of;       /* Current write-offset in buf */
};


/**
 * Stats buffer printf. Requires a (struct _stats_emit *)st variable in the
 * current scope.
 *
 * Appends the formatted string at st->of. If the output does not fit in
 * the remaining space the buffer is grown to at least twice its size and
 * never less than what this write needs (including the terminating nul),
 * then the string is formatted again. Doubling only once is not enough
 * for a single write larger than the doubled remainder: the output would
 * be truncated while st->of still advanced past st->size.
 *
 * NOTE: The arguments are evaluated twice when the buffer needs to grow,
 *       so they must be free of side effects.
 */
#define _st_printf(...) do {                                            \
                ssize_t _r;                                             \
                ssize_t _rem = st->size - st->of;                       \
                _r = rd_snprintf(st->buf+st->of, _rem, __VA_ARGS__);    \
                if (_r >= _rem) {                                       \
                        /* Bytes required to hold this write + nul */   \
                        size_t _need = st->of + (size_t)_r + 1;         \
                        st->size *= 2;                                  \
                        if (st->size < _need)                           \
                                st->size = _need;                       \
                        _rem = st->size - st->of;                       \
                        st->buf = rd_realloc(st->buf, st->size);        \
                        _r = rd_snprintf(st->buf+st->of, _rem, __VA_ARGS__); \
                }                                                       \
                st->of += _r;                                           \
        } while (0)
1177
/**
 * @brief Totals accumulated across all brokers and partitions
 *        while the statistics are being emitted.
 */
struct _stats_total {
        /* Per-broker counters */
        int64_t tx;            /**< Sum of broker.tx */
        int64_t tx_bytes;      /**< Sum of broker.tx_bytes */
        int64_t rx;            /**< Sum of broker.rx */
        int64_t rx_bytes;      /**< Sum of broker.rx_bytes */

        /* Per-partition counters */
        int64_t txmsgs;        /**< Sum of partition.txmsgs */
        int64_t txmsg_bytes;   /**< Sum of partition.txbytes */
        int64_t rxmsgs;        /**< Sum of partition.rxmsgs */
        int64_t rxmsg_bytes;   /**< Sum of partition.rxbytes */
};
1188
1189
1190
1191/**
1192 * @brief Rollover and emit an average window.
1193 */
1194static RD_INLINE void rd_kafka_stats_emit_avg (struct _stats_emit *st,
1195 const char *name,
1196 rd_avg_t *src_avg) {
1197 rd_avg_t avg;
1198
1199 rd_avg_rollover(&avg, src_avg);
1200 _st_printf(
1201 "\"%s\": {"
1202 " \"min\":%"PRId64","
1203 " \"max\":%"PRId64","
1204 " \"avg\":%"PRId64","
1205 " \"sum\":%"PRId64","
1206 " \"stddev\": %"PRId64","
1207 " \"p50\": %"PRId64","
1208 " \"p75\": %"PRId64","
1209 " \"p90\": %"PRId64","
1210 " \"p95\": %"PRId64","
1211 " \"p99\": %"PRId64","
1212 " \"p99_99\": %"PRId64","
1213 " \"outofrange\": %"PRId64","
1214 " \"hdrsize\": %"PRId32","
1215 " \"cnt\":%i "
1216 "}, ",
1217 name,
1218 avg.ra_v.minv,
1219 avg.ra_v.maxv,
1220 avg.ra_v.avg,
1221 avg.ra_v.sum,
1222 (int64_t)avg.ra_hist.stddev,
1223 avg.ra_hist.p50,
1224 avg.ra_hist.p75,
1225 avg.ra_hist.p90,
1226 avg.ra_hist.p95,
1227 avg.ra_hist.p99,
1228 avg.ra_hist.p99_99,
1229 avg.ra_hist.oor,
1230 avg.ra_hist.hdrsize,
1231 avg.ra_v.cnt);
1232 rd_avg_destroy(&avg);
1233}
1234
1235/**
1236 * Emit stats for toppar
1237 */
1238static RD_INLINE void rd_kafka_stats_emit_toppar (struct _stats_emit *st,
1239 struct _stats_total *total,
1240 rd_kafka_toppar_t *rktp,
1241 int first) {
1242 rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
1243 int64_t consumer_lag = -1;
1244 struct offset_stats offs;
1245 int32_t leader_nodeid = -1;
1246
1247 rd_kafka_toppar_lock(rktp);
1248
1249 if (rktp->rktp_leader) {
1250 rd_kafka_broker_lock(rktp->rktp_leader);
1251 leader_nodeid = rktp->rktp_leader->rkb_nodeid;
1252 rd_kafka_broker_unlock(rktp->rktp_leader);
1253 }
1254
1255 /* Grab a copy of the latest finalized offset stats */
1256 offs = rktp->rktp_offsets_fin;
1257
1258 /* Calculate consumer_lag by using the highest offset
1259 * of app_offset (the last message passed to application + 1)
1260 * or the committed_offset (the last message committed by this or
1261 * another consumer).
1262 * Using app_offset allows consumer_lag to be up to date even if
1263 * offsets are not (yet) committed.
1264 */
1265 if (rktp->rktp_hi_offset != RD_KAFKA_OFFSET_INVALID &&
1266 (rktp->rktp_app_offset >= 0 || rktp->rktp_committed_offset >= 0)) {
1267 consumer_lag = rktp->rktp_hi_offset -
1268 RD_MAX(rktp->rktp_app_offset,
1269 rktp->rktp_committed_offset);
1270 if (unlikely(consumer_lag) < 0)
1271 consumer_lag = 0;
1272 }
1273
1274 _st_printf("%s\"%"PRId32"\": { "
1275 "\"partition\":%"PRId32", "
1276 "\"leader\":%"PRId32", "
1277 "\"desired\":%s, "
1278 "\"unknown\":%s, "
1279 "\"msgq_cnt\":%i, "
1280 "\"msgq_bytes\":%"PRIusz", "
1281 "\"xmit_msgq_cnt\":%i, "
1282 "\"xmit_msgq_bytes\":%"PRIusz", "
1283 "\"fetchq_cnt\":%i, "
1284 "\"fetchq_size\":%"PRIu64", "
1285 "\"fetch_state\":\"%s\", "
1286 "\"query_offset\":%"PRId64", "
1287 "\"next_offset\":%"PRId64", "
1288 "\"app_offset\":%"PRId64", "
1289 "\"stored_offset\":%"PRId64", "
1290 "\"commited_offset\":%"PRId64", " /*FIXME: issue #80 */
1291 "\"committed_offset\":%"PRId64", "
1292 "\"eof_offset\":%"PRId64", "
1293 "\"lo_offset\":%"PRId64", "
1294 "\"hi_offset\":%"PRId64", "
1295 "\"consumer_lag\":%"PRId64", "
1296 "\"txmsgs\":%"PRIu64", "
1297 "\"txbytes\":%"PRIu64", "
1298 "\"rxmsgs\":%"PRIu64", "
1299 "\"rxbytes\":%"PRIu64", "
1300 "\"msgs\": %"PRIu64", "
1301 "\"rx_ver_drops\": %"PRIu64", "
1302 "\"msgs_inflight\": %"PRId32", "
1303 "\"next_ack_seq\": %"PRId32", "
1304 "\"next_err_seq\": %"PRId32", "
1305 "\"acked_msgid\": %"PRIu64
1306 "} ",
1307 first ? "" : ", ",
1308 rktp->rktp_partition,
1309 rktp->rktp_partition,
1310 leader_nodeid,
1311 (rktp->rktp_flags&RD_KAFKA_TOPPAR_F_DESIRED)?"true":"false",
1312 (rktp->rktp_flags&RD_KAFKA_TOPPAR_F_UNKNOWN)?"true":"false",
1313 rd_kafka_msgq_len(&rktp->rktp_msgq),
1314 rd_kafka_msgq_size(&rktp->rktp_msgq),
1315 /* FIXME: xmit_msgq is local to the broker thread. */
1316 0,
1317 (size_t)0,
1318 rd_kafka_q_len(rktp->rktp_fetchq),
1319 rd_kafka_q_size(rktp->rktp_fetchq),
1320 rd_kafka_fetch_states[rktp->rktp_fetch_state],
1321 rktp->rktp_query_offset,
1322 offs.fetch_offset,
1323 rktp->rktp_app_offset,
1324 rktp->rktp_stored_offset,
1325 rktp->rktp_committed_offset, /* FIXME: issue #80 */
1326 rktp->rktp_committed_offset,
1327 offs.eof_offset,
1328 rktp->rktp_lo_offset,
1329 rktp->rktp_hi_offset,
1330 consumer_lag,
1331 rd_atomic64_get(&rktp->rktp_c.tx_msgs),
1332 rd_atomic64_get(&rktp->rktp_c.tx_msg_bytes),
1333 rd_atomic64_get(&rktp->rktp_c.rx_msgs),
1334 rd_atomic64_get(&rktp->rktp_c.rx_msg_bytes),
1335 rk->rk_type == RD_KAFKA_PRODUCER ?
1336 rd_atomic64_get(&rktp->rktp_c.producer_enq_msgs) :
1337 rd_atomic64_get(&rktp->rktp_c.rx_msgs), /* legacy, same as rx_msgs */
1338 rd_atomic64_get(&rktp->rktp_c.rx_ver_drops),
1339 rd_atomic32_get(&rktp->rktp_msgs_inflight),
1340 rktp->rktp_eos.next_ack_seq,
1341 rktp->rktp_eos.next_err_seq,
1342 rktp->rktp_eos.acked_msgid);
1343
1344 if (total) {
1345 total->txmsgs += rd_atomic64_get(&rktp->rktp_c.tx_msgs);
1346 total->txmsg_bytes += rd_atomic64_get(&rktp->rktp_c.tx_msg_bytes);
1347 total->rxmsgs += rd_atomic64_get(&rktp->rktp_c.rx_msgs);
1348 total->rxmsg_bytes += rd_atomic64_get(&rktp->rktp_c.rx_msg_bytes);
1349 }
1350
1351 rd_kafka_toppar_unlock(rktp);
1352}
1353
1354/**
1355 * @brief Emit broker request type stats
1356 */
1357static void rd_kafka_stats_emit_broker_reqs (struct _stats_emit *st,
1358 rd_kafka_broker_t *rkb) {
1359 /* Filter out request types that will never be sent by the client. */
1360 static const rd_bool_t filter[4][RD_KAFKAP__NUM] = {
1361 [RD_KAFKA_PRODUCER] = {
1362 [RD_KAFKAP_Fetch] = rd_true,
1363 [RD_KAFKAP_OffsetCommit] = rd_true,
1364 [RD_KAFKAP_OffsetFetch] = rd_true,
1365 [RD_KAFKAP_GroupCoordinator] = rd_true,
1366 [RD_KAFKAP_JoinGroup] = rd_true,
1367 [RD_KAFKAP_Heartbeat] = rd_true,
1368 [RD_KAFKAP_LeaveGroup] = rd_true,
1369 [RD_KAFKAP_SyncGroup] = rd_true
1370 },
1371 [RD_KAFKA_CONSUMER] = {
1372 [RD_KAFKAP_Produce] = rd_true,
1373 [RD_KAFKAP_InitProducerId] = rd_true
1374 },
1375 [2/*any client type*/] = {
1376 [RD_KAFKAP_UpdateMetadata] = rd_true,
1377 [RD_KAFKAP_ControlledShutdown] = rd_true,
1378 [RD_KAFKAP_LeaderAndIsr] = rd_true,
1379 [RD_KAFKAP_StopReplica] = rd_true,
1380 [RD_KAFKAP_OffsetForLeaderEpoch] = rd_true,
1381
1382 /* FIXME: Remove when transaction support is added */
1383 [RD_KAFKAP_AddPartitionsToTxn] = rd_true,
1384 [RD_KAFKAP_AddOffsetsToTxn] = rd_true,
1385 [RD_KAFKAP_EndTxn] = rd_true,
1386
1387 [RD_KAFKAP_WriteTxnMarkers] = rd_true,
1388 [RD_KAFKAP_TxnOffsetCommit] = rd_true,
1389
1390 [RD_KAFKAP_AlterReplicaLogDirs] = rd_true,
1391 [RD_KAFKAP_DescribeLogDirs] = rd_true,
1392
1393 /* FIXME: Remove when re-auth support is added */
1394 [RD_KAFKAP_SaslAuthenticate] = rd_true,
1395
1396 [RD_KAFKAP_CreateDelegationToken] = rd_true,
1397 [RD_KAFKAP_RenewDelegationToken] = rd_true,
1398 [RD_KAFKAP_ExpireDelegationToken] = rd_true,
1399 [RD_KAFKAP_DescribeDelegationToken] = rd_true
1400 },
1401 [3/*hide-unless-non-zero*/] = {
1402 /* Hide Admin requests unless they've been used */
1403 [RD_KAFKAP_CreateTopics] = rd_true,
1404 [RD_KAFKAP_DeleteTopics] = rd_true,
1405 [RD_KAFKAP_DeleteRecords] = rd_true,
1406 [RD_KAFKAP_CreatePartitions] = rd_true,
1407 [RD_KAFKAP_DescribeAcls] = rd_true,
1408 [RD_KAFKAP_CreateAcls] = rd_true,
1409 [RD_KAFKAP_DeleteAcls] = rd_true,
1410 [RD_KAFKAP_DescribeConfigs] = rd_true,
1411 [RD_KAFKAP_AlterConfigs] = rd_true,
1412 [RD_KAFKAP_DeleteGroups] = rd_true,
1413 [RD_KAFKAP_ListGroups] = rd_true,
1414 [RD_KAFKAP_DescribeGroups] = rd_true
1415 }
1416 };
1417 int i;
1418 int cnt = 0;
1419
1420 _st_printf("\"req\": { ");
1421 for (i = 0 ; i < RD_KAFKAP__NUM ; i++) {
1422 int64_t v;
1423
1424 if (filter[rkb->rkb_rk->rk_type][i] || filter[2][i])
1425 continue;
1426
1427 v = rd_atomic64_get(&rkb->rkb_c.reqtype[i]);
1428 if (!v && filter[3][i])
1429 continue; /* Filter out zero values */
1430
1431 _st_printf("%s\"%s\": %"PRId64,
1432 cnt > 0 ? ", " : "",
1433 rd_kafka_ApiKey2str(i), v);
1434
1435 cnt++;
1436 }
1437 _st_printf(" }, ");
1438}
1439
1440
1441/**
1442 * Emit all statistics
1443 */
1444static void rd_kafka_stats_emit_all (rd_kafka_t *rk) {
1445 rd_kafka_broker_t *rkb;
1446 rd_kafka_itopic_t *rkt;
1447 shptr_rd_kafka_toppar_t *s_rktp;
1448 rd_ts_t now;
1449 rd_kafka_op_t *rko;
1450 unsigned int tot_cnt;
1451 size_t tot_size;
1452 rd_kafka_resp_err_t err;
1453 struct _stats_emit stx = { .size = 1024*10 };
1454 struct _stats_emit *st = &stx;
1455 struct _stats_total total = {0};
1456
1457 st->buf = rd_malloc(st->size);
1458
1459
1460 rd_kafka_curr_msgs_get(rk, &tot_cnt, &tot_size);
1461 rd_kafka_rdlock(rk);
1462
1463 now = rd_clock();
1464 _st_printf("{ "
1465 "\"name\": \"%s\", "
1466 "\"client_id\": \"%s\", "
1467 "\"type\": \"%s\", "
1468 "\"ts\":%"PRId64", "
1469 "\"time\":%lli, "
1470 "\"replyq\":%i, "
1471 "\"msg_cnt\":%u, "
1472 "\"msg_size\":%"PRIusz", "
1473 "\"msg_max\":%u, "
1474 "\"msg_size_max\":%"PRIusz", "
1475 "\"simple_cnt\":%i, "
1476 "\"metadata_cache_cnt\":%i, "
1477 "\"brokers\":{ "/*open brokers*/,
1478 rk->rk_name,
1479 rk->rk_conf.client_id_str,
1480 rd_kafka_type2str(rk->rk_type),
1481 now,
1482 (signed long long)time(NULL),
1483 rd_kafka_q_len(rk->rk_rep),
1484 tot_cnt, tot_size,
1485 rk->rk_curr_msgs.max_cnt, rk->rk_curr_msgs.max_size,
1486 rd_atomic32_get(&rk->rk_simple_cnt),
1487 rk->rk_metadata_cache.rkmc_cnt);
1488
1489
1490 TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
1491 rd_kafka_toppar_t *rktp;
1492
1493 rd_kafka_broker_lock(rkb);
1494 _st_printf("%s\"%s\": { "/*open broker*/
1495 "\"name\":\"%s\", "
1496 "\"nodeid\":%"PRId32", "
1497 "\"nodename\":\"%s\", "
1498 "\"source\":\"%s\", "
1499 "\"state\":\"%s\", "
1500 "\"stateage\":%"PRId64", "
1501 "\"outbuf_cnt\":%i, "
1502 "\"outbuf_msg_cnt\":%i, "
1503 "\"waitresp_cnt\":%i, "
1504 "\"waitresp_msg_cnt\":%i, "
1505 "\"tx\":%"PRIu64", "
1506 "\"txbytes\":%"PRIu64", "
1507 "\"txerrs\":%"PRIu64", "
1508 "\"txretries\":%"PRIu64", "
1509 "\"req_timeouts\":%"PRIu64", "
1510 "\"rx\":%"PRIu64", "
1511 "\"rxbytes\":%"PRIu64", "
1512 "\"rxerrs\":%"PRIu64", "
1513 "\"rxcorriderrs\":%"PRIu64", "
1514 "\"rxpartial\":%"PRIu64", "
1515 "\"zbuf_grow\":%"PRIu64", "
1516 "\"buf_grow\":%"PRIu64", "
1517 "\"wakeups\":%"PRIu64", "
1518 "\"connects\":%"PRId32", "
1519 "\"disconnects\":%"PRId32", ",
1520 rkb == TAILQ_FIRST(&rk->rk_brokers) ? "" : ", ",
1521 rkb->rkb_name,
1522 rkb->rkb_name,
1523 rkb->rkb_nodeid,
1524 rkb->rkb_nodename,
1525 rd_kafka_confsource2str(rkb->rkb_source),
1526 rd_kafka_broker_state_names[rkb->rkb_state],
1527 rkb->rkb_ts_state ? now - rkb->rkb_ts_state : 0,
1528 rd_atomic32_get(&rkb->rkb_outbufs.rkbq_cnt),
1529 rd_atomic32_get(&rkb->rkb_outbufs.rkbq_msg_cnt),
1530 rd_atomic32_get(&rkb->rkb_waitresps.rkbq_cnt),
1531 rd_atomic32_get(&rkb->rkb_waitresps.rkbq_msg_cnt),
1532 rd_atomic64_get(&rkb->rkb_c.tx),
1533 rd_atomic64_get(&rkb->rkb_c.tx_bytes),
1534 rd_atomic64_get(&rkb->rkb_c.tx_err),
1535 rd_atomic64_get(&rkb->rkb_c.tx_retries),
1536 rd_atomic64_get(&rkb->rkb_c.req_timeouts),
1537 rd_atomic64_get(&rkb->rkb_c.rx),
1538 rd_atomic64_get(&rkb->rkb_c.rx_bytes),
1539 rd_atomic64_get(&rkb->rkb_c.rx_err),
1540 rd_atomic64_get(&rkb->rkb_c.rx_corrid_err),
1541 rd_atomic64_get(&rkb->rkb_c.rx_partial),
1542 rd_atomic64_get(&rkb->rkb_c.zbuf_grow),
1543 rd_atomic64_get(&rkb->rkb_c.buf_grow),
1544 rd_atomic64_get(&rkb->rkb_c.wakeups),
1545 rd_atomic32_get(&rkb->rkb_c.connects),
1546 rd_atomic32_get(&rkb->rkb_c.disconnects));
1547
1548 total.tx += rd_atomic64_get(&rkb->rkb_c.tx);
1549 total.tx_bytes += rd_atomic64_get(&rkb->rkb_c.tx_bytes);
1550 total.rx += rd_atomic64_get(&rkb->rkb_c.rx);
1551 total.rx_bytes += rd_atomic64_get(&rkb->rkb_c.rx_bytes);
1552
1553 rd_kafka_stats_emit_avg(st, "int_latency",
1554 &rkb->rkb_avg_int_latency);
1555 rd_kafka_stats_emit_avg(st, "outbuf_latency",
1556 &rkb->rkb_avg_outbuf_latency);
1557 rd_kafka_stats_emit_avg(st, "rtt", &rkb->rkb_avg_rtt);
1558 rd_kafka_stats_emit_avg(st, "throttle", &rkb->rkb_avg_throttle);
1559
1560 rd_kafka_stats_emit_broker_reqs(st, rkb);
1561
1562 _st_printf("\"toppars\":{ "/*open toppars*/);
1563
1564 TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink) {
1565 _st_printf("%s\"%.*s-%"PRId32"\": { "
1566 "\"topic\":\"%.*s\", "
1567 "\"partition\":%"PRId32"} ",
1568 rktp==TAILQ_FIRST(&rkb->rkb_toppars)?"":", ",
1569 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1570 rktp->rktp_partition,
1571 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1572 rktp->rktp_partition);
1573 }
1574
1575 rd_kafka_broker_unlock(rkb);
1576
1577 _st_printf("} "/*close toppars*/
1578 "} "/*close broker*/);
1579 }
1580
1581
1582 _st_printf("}, " /* close "brokers" array */
1583 "\"topics\":{ ");
1584
1585 TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
1586 int i, j;
1587
1588 rd_kafka_topic_rdlock(rkt);
1589 _st_printf("%s\"%.*s\": { "
1590 "\"topic\":\"%.*s\", "
1591 "\"metadata_age\":%"PRId64", ",
1592 rkt==TAILQ_FIRST(&rk->rk_topics)?"":", ",
1593 RD_KAFKAP_STR_PR(rkt->rkt_topic),
1594 RD_KAFKAP_STR_PR(rkt->rkt_topic),
1595 rkt->rkt_ts_metadata ?
1596 (rd_clock() - rkt->rkt_ts_metadata)/1000 : 0);
1597
1598 rd_kafka_stats_emit_avg(st, "batchsize",
1599 &rkt->rkt_avg_batchsize);
1600 rd_kafka_stats_emit_avg(st, "batchcnt",
1601 &rkt->rkt_avg_batchcnt);
1602
1603 _st_printf("\"partitions\":{ " /*open partitions*/);
1604
1605 for (i = 0 ; i < rkt->rkt_partition_cnt ; i++)
1606 rd_kafka_stats_emit_toppar(
1607 st, &total,
1608 rd_kafka_toppar_s2i(rkt->rkt_p[i]),
1609 i == 0);
1610
1611 RD_LIST_FOREACH(s_rktp, &rkt->rkt_desp, j)
1612 rd_kafka_stats_emit_toppar(
1613 st, &total,
1614 rd_kafka_toppar_s2i(s_rktp),
1615 i+j == 0);
1616
1617 i += j;
1618
1619 if (rkt->rkt_ua)
1620 rd_kafka_stats_emit_toppar(
1621 st, NULL,
1622 rd_kafka_toppar_s2i(rkt->rkt_ua),
1623 i++ == 0);
1624
1625 rd_kafka_topic_rdunlock(rkt);
1626
1627 _st_printf("} "/*close partitions*/
1628 "} "/*close topic*/);
1629
1630 }
1631 _st_printf("} "/*close topics*/);
1632
1633 if (rk->rk_cgrp) {
1634 rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
1635 _st_printf(", \"cgrp\": { "
1636 "\"state\": \"%s\", "
1637 "\"stateage\": %"PRId64", "
1638 "\"join_state\": \"%s\", "
1639 "\"rebalance_age\": %"PRId64", "
1640 "\"rebalance_cnt\": %d, "
1641 "\"rebalance_reason\": \"%s\", "
1642 "\"assignment_size\": %d }",
1643 rd_kafka_cgrp_state_names[rkcg->rkcg_state],
1644 rkcg->rkcg_ts_statechange ?
1645 (now - rkcg->rkcg_ts_statechange) / 1000 : 0,
1646 rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
1647 rkcg->rkcg_c.ts_rebalance ?
1648 (rd_clock() - rkcg->rkcg_c.ts_rebalance)/1000 : 0,
1649 rkcg->rkcg_c.rebalance_cnt,
1650 rkcg->rkcg_c.rebalance_reason,
1651 rkcg->rkcg_c.assignment_size);
1652 }
1653
1654 if (rd_kafka_is_idempotent(rk)) {
1655 _st_printf(", \"eos\": { "
1656 "\"idemp_state\": \"%s\", "
1657 "\"idemp_stateage\": %"PRId64", "
1658 "\"producer_id\": %"PRId64", "
1659 "\"producer_epoch\": %hd, "
1660 "\"epoch_cnt\": %d "
1661 "}",
1662 rd_kafka_idemp_state2str(rk->rk_eos.idemp_state),
1663 (rd_clock() - rk->rk_eos.ts_idemp_state) / 1000,
1664 rk->rk_eos.pid.id,
1665 rk->rk_eos.pid.epoch,
1666 rk->rk_eos.epoch_cnt);
1667 }
1668
1669 if ((err = rd_atomic32_get(&rk->rk_fatal.err)))
1670 _st_printf(", \"fatal\": { "
1671 "\"error\": \"%s\", "
1672 "\"reason\": \"%s\", "
1673 "\"cnt\": %d "
1674 "}",
1675 rd_kafka_err2str(err),
1676 rk->rk_fatal.errstr,
1677 rk->rk_fatal.cnt);
1678
1679 rd_kafka_rdunlock(rk);
1680
1681 /* Total counters */
1682 _st_printf(", "
1683 "\"tx\":%"PRId64", "
1684 "\"tx_bytes\":%"PRId64", "
1685 "\"rx\":%"PRId64", "
1686 "\"rx_bytes\":%"PRId64", "
1687 "\"txmsgs\":%"PRId64", "
1688 "\"txmsg_bytes\":%"PRId64", "
1689 "\"rxmsgs\":%"PRId64", "
1690 "\"rxmsg_bytes\":%"PRId64,
1691 total.tx,
1692 total.tx_bytes,
1693 total.rx,
1694 total.rx_bytes,
1695 total.txmsgs,
1696 total.txmsg_bytes,
1697 total.rxmsgs,
1698 total.rxmsg_bytes);
1699
1700 _st_printf("}"/*close object*/);
1701
1702
1703 /* Enqueue op for application */
1704 rko = rd_kafka_op_new(RD_KAFKA_OP_STATS);
1705 rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_HIGH);
1706 rko->rko_u.stats.json = st->buf;
1707 rko->rko_u.stats.json_len = st->of;
1708 rd_kafka_q_enq(rk->rk_rep, rko);
1709}
1710
1711
1712/**
1713 * @brief 1 second generic timer.
1714 *
1715 * @locality rdkafka main thread
1716 * @locks none
1717 */
1718static void rd_kafka_1s_tmr_cb (rd_kafka_timers_t *rkts, void *arg) {
1719 rd_kafka_t *rk = rkts->rkts_rk;
1720
1721 /* Scan topic state, message timeouts, etc. */
1722 rd_kafka_topic_scan_all(rk, rd_clock());
1723
1724 /* Sparse connections:
1725 * try to maintain at least one connection to the cluster. */
1726 if (rk->rk_conf.sparse_connections &&
1727 rd_atomic32_get(&rk->rk_broker_up_cnt) == 0)
1728 rd_kafka_connect_any(rk, "no cluster connection");
1729
1730}
1731
1732static void rd_kafka_stats_emit_tmr_cb (rd_kafka_timers_t *rkts, void *arg) {
1733 rd_kafka_t *rk = rkts->rkts_rk;
1734 rd_kafka_stats_emit_all(rk);
1735}
1736
1737
1738/**
1739 * @brief Periodic metadata refresh callback
1740 *
1741 * @locality rdkafka main thread
1742 */
1743static void rd_kafka_metadata_refresh_cb (rd_kafka_timers_t *rkts, void *arg) {
1744 rd_kafka_t *rk = rkts->rkts_rk;
1745 int sparse = 1;
1746
1747 /* Dont do sparse requests if there is a consumer group with an
1748 * active subscription since subscriptions need to be able to match
1749 * on all topics. */
1750 if (rk->rk_type == RD_KAFKA_CONSUMER && rk->rk_cgrp &&
1751 rk->rk_cgrp->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION)
1752 sparse = 0;
1753
1754 if (sparse)
1755 rd_kafka_metadata_refresh_known_topics(
1756 rk, NULL, 1/*force*/, "periodic refresh");
1757 else
1758 rd_kafka_metadata_refresh_all(rk, NULL, "periodic refresh");
1759}
1760
1761
1762
1763/**
1764 * @brief Wait for background threads to initialize.
1765 *
1766 * @returns the number of background threads still not initialized.
1767 *
1768 * @locality app thread calling rd_kafka_new()
1769 * @locks none
1770 */
1771static int rd_kafka_init_wait (rd_kafka_t *rk, int timeout_ms) {
1772 struct timespec tspec;
1773 int ret;
1774
1775 rd_timeout_init_timespec(&tspec, timeout_ms);
1776
1777 mtx_lock(&rk->rk_init_lock);
1778 while (rk->rk_init_wait_cnt > 0 &&
1779 cnd_timedwait_abs(&rk->rk_init_cnd, &rk->rk_init_lock,
1780 &tspec) == thrd_success)
1781 ;
1782 ret = rk->rk_init_wait_cnt;
1783 mtx_unlock(&rk->rk_init_lock);
1784
1785 return ret;
1786}
1787
1788
1789/**
1790 * Main loop for Kafka handler thread.
1791 */
1792static int rd_kafka_thread_main (void *arg) {
1793 rd_kafka_t *rk = arg;
1794 rd_kafka_timer_t tmr_1s = RD_ZERO_INIT;
1795 rd_kafka_timer_t tmr_stats_emit = RD_ZERO_INIT;
1796 rd_kafka_timer_t tmr_metadata_refresh = RD_ZERO_INIT;
1797
1798 rd_kafka_set_thread_name("main");
1799 rd_kafka_set_thread_sysname("rdk:main");
1800
1801 (void)rd_atomic32_add(&rd_kafka_thread_cnt_curr, 1);
1802
1803 /* Acquire lock (which was held by thread creator during creation)
1804 * to synchronise state. */
1805 rd_kafka_wrlock(rk);
1806 rd_kafka_wrunlock(rk);
1807
1808 /* 1 second timer for topic scan and connection checking. */
1809 rd_kafka_timer_start(&rk->rk_timers, &tmr_1s, 1000000,
1810 rd_kafka_1s_tmr_cb, NULL);
1811 if (rk->rk_conf.stats_interval_ms)
1812 rd_kafka_timer_start(&rk->rk_timers, &tmr_stats_emit,
1813 rk->rk_conf.stats_interval_ms * 1000ll,
1814 rd_kafka_stats_emit_tmr_cb, NULL);
1815 if (rk->rk_conf.metadata_refresh_interval_ms > 0)
1816 rd_kafka_timer_start(&rk->rk_timers, &tmr_metadata_refresh,
1817 rk->rk_conf.metadata_refresh_interval_ms *
1818 1000ll,
1819 rd_kafka_metadata_refresh_cb, NULL);
1820
1821 if (rk->rk_cgrp)
1822 rd_kafka_q_fwd_set(rk->rk_cgrp->rkcg_ops, rk->rk_ops);
1823
1824 if (rd_kafka_is_idempotent(rk))
1825 rd_kafka_idemp_init(rk);
1826
1827 mtx_lock(&rk->rk_init_lock);
1828 rk->rk_init_wait_cnt--;
1829 cnd_broadcast(&rk->rk_init_cnd);
1830 mtx_unlock(&rk->rk_init_lock);
1831
1832 while (likely(!rd_kafka_terminating(rk) ||
1833 rd_kafka_q_len(rk->rk_ops))) {
1834 rd_ts_t sleeptime = rd_kafka_timers_next(
1835 &rk->rk_timers, 1000*1000/*1s*/, 1/*lock*/);
1836 rd_kafka_q_serve(rk->rk_ops, (int)(sleeptime / 1000), 0,
1837 RD_KAFKA_Q_CB_CALLBACK, NULL, NULL);
1838 if (rk->rk_cgrp) /* FIXME: move to timer-triggered */
1839 rd_kafka_cgrp_serve(rk->rk_cgrp);
1840 rd_kafka_timers_run(&rk->rk_timers, RD_POLL_NOWAIT);
1841 }
1842
1843 rd_kafka_dbg(rk, GENERIC, "TERMINATE",
1844 "Internal main thread terminating");
1845
1846 if (rd_kafka_is_idempotent(rk))
1847 rd_kafka_idemp_term(rk);
1848
1849 rd_kafka_q_disable(rk->rk_ops);
1850 rd_kafka_q_purge(rk->rk_ops);
1851
1852 rd_kafka_timer_stop(&rk->rk_timers, &tmr_1s, 1);
1853 if (rk->rk_conf.stats_interval_ms)
1854 rd_kafka_timer_stop(&rk->rk_timers, &tmr_stats_emit, 1);
1855 rd_kafka_timer_stop(&rk->rk_timers, &tmr_metadata_refresh, 1);
1856
1857 /* Synchronise state */
1858 rd_kafka_wrlock(rk);
1859 rd_kafka_wrunlock(rk);
1860
1861 rd_kafka_destroy_internal(rk);
1862
1863 rd_kafka_dbg(rk, GENERIC, "TERMINATE",
1864 "Internal main thread termination done");
1865
1866 rd_atomic32_sub(&rd_kafka_thread_cnt_curr, 1);
1867
1868 return 0;
1869}
1870
1871
/**
 * @brief Signal handler that intentionally does nothing.
 */
static void rd_kafka_term_sig_handler (int sig) {
        (void)sig; /* nop */
}
1875
1876
1877rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *app_conf,
1878 char *errstr, size_t errstr_size) {
1879 rd_kafka_t *rk;
1880 static rd_atomic32_t rkid;
1881 rd_kafka_conf_t *conf;
1882 rd_kafka_resp_err_t ret_err = RD_KAFKA_RESP_ERR_NO_ERROR;
1883 int ret_errno = 0;
1884 const char *conf_err;
1885#ifndef _MSC_VER
1886 sigset_t newset, oldset;
1887#endif
1888 char builtin_features[128];
1889 size_t bflen;
1890
1891 rd_kafka_global_init();
1892
1893 /* rd_kafka_new() takes ownership of the provided \p app_conf
1894 * object if rd_kafka_new() succeeds.
1895 * Since \p app_conf is optional we allocate a default configuration
1896 * object here if \p app_conf is NULL.
1897 * The configuration object itself is struct-copied later
1898 * leaving the default *conf pointer to be ready for freeing.
1899 * In case new() fails and app_conf was specified we will clear out
1900 * rk_conf to avoid double-freeing from destroy_internal() and the
1901 * user's eventual call to rd_kafka_conf_destroy().
1902 * This is all a bit tricky but that's the nature of
1903 * legacy interfaces. */
1904 if (!app_conf)
1905 conf = rd_kafka_conf_new();
1906 else
1907 conf = app_conf;
1908
1909 /* Verify and finalize configuration */
1910 if ((conf_err = rd_kafka_conf_finalize(type, conf))) {
1911 /* Incompatible configuration settings */
1912 rd_snprintf(errstr, errstr_size, "%s", conf_err);
1913 if (!app_conf)
1914 rd_kafka_conf_destroy(conf);
1915 rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
1916 return NULL;
1917 }
1918
1919
1920 rd_kafka_global_cnt_incr();
1921
1922 /*
1923 * Set up the handle.
1924 */
1925 rk = rd_calloc(1, sizeof(*rk));
1926
1927 rk->rk_type = type;
1928
1929 /* Struct-copy the config object. */
1930 rk->rk_conf = *conf;
1931 if (!app_conf)
1932 rd_free(conf); /* Free the base config struct only,
1933 * not its fields since they were copied to
1934 * rk_conf just above. Those fields are
1935 * freed from rd_kafka_destroy_internal()
1936 * as the rk itself is destroyed. */
1937
1938 /* Call on_new() interceptors */
1939 rd_kafka_interceptors_on_new(rk, &rk->rk_conf);
1940
1941 rwlock_init(&rk->rk_lock);
1942 mtx_init(&rk->rk_internal_rkb_lock, mtx_plain);
1943
1944 cnd_init(&rk->rk_broker_state_change_cnd);
1945 mtx_init(&rk->rk_broker_state_change_lock, mtx_plain);
1946 rd_list_init(&rk->rk_broker_state_change_waiters, 8,
1947 rd_kafka_enq_once_trigger_destroy);
1948
1949 cnd_init(&rk->rk_init_cnd);
1950 mtx_init(&rk->rk_init_lock, mtx_plain);
1951
1952 rd_interval_init(&rk->rk_suppress.no_idemp_brokers);
1953 rd_interval_init(&rk->rk_suppress.sparse_connect_random);
1954 mtx_init(&rk->rk_suppress.sparse_connect_lock, mtx_plain);
1955
1956 rd_atomic64_init(&rk->rk_ts_last_poll, INT64_MAX);
1957
1958 rk->rk_rep = rd_kafka_q_new(rk);
1959 rk->rk_ops = rd_kafka_q_new(rk);
1960 rk->rk_ops->rkq_serve = rd_kafka_poll_cb;
1961 rk->rk_ops->rkq_opaque = rk;
1962
1963 if (rk->rk_conf.log_queue) {
1964 rk->rk_logq = rd_kafka_q_new(rk);
1965 rk->rk_logq->rkq_serve = rd_kafka_poll_cb;
1966 rk->rk_logq->rkq_opaque = rk;
1967 }
1968
1969 TAILQ_INIT(&rk->rk_brokers);
1970 TAILQ_INIT(&rk->rk_topics);
1971 rd_kafka_timers_init(&rk->rk_timers, rk);
1972 rd_kafka_metadata_cache_init(rk);
1973
1974 if (rk->rk_conf.dr_cb || rk->rk_conf.dr_msg_cb)
1975 rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_DR;
1976 if (rk->rk_conf.rebalance_cb)
1977 rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_REBALANCE;
1978 if (rk->rk_conf.offset_commit_cb)
1979 rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_OFFSET_COMMIT;
1980 if (rk->rk_conf.error_cb)
1981 rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_ERROR;
1982#if WITH_SASL_OAUTHBEARER
1983 if (rk->rk_conf.sasl.enable_oauthbearer_unsecure_jwt &&
1984 !rk->rk_conf.sasl.oauthbearer_token_refresh_cb)
1985 rd_kafka_conf_set_oauthbearer_token_refresh_cb(
1986 &rk->rk_conf,
1987 rd_kafka_oauthbearer_unsecured_token);
1988
1989 if (rk->rk_conf.sasl.oauthbearer_token_refresh_cb)
1990 rk->rk_conf.enabled_events |=
1991 RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH;
1992#endif
1993
1994 rk->rk_controllerid = -1;
1995
1996 /* Admin client defaults */
1997 rk->rk_conf.admin.request_timeout_ms = rk->rk_conf.socket_timeout_ms;
1998
1999 /* Convenience Kafka protocol null bytes */
2000 rk->rk_null_bytes = rd_kafkap_bytes_new(NULL, 0);
2001
2002 if (rk->rk_conf.debug)
2003 rk->rk_conf.log_level = LOG_DEBUG;
2004
2005 rd_snprintf(rk->rk_name, sizeof(rk->rk_name), "%s#%s-%i",
2006 rk->rk_conf.client_id_str, rd_kafka_type2str(rk->rk_type),
2007 rd_atomic32_add(&rkid, 1));
2008
2009 /* Construct clientid kafka string */
2010 rk->rk_client_id = rd_kafkap_str_new(rk->rk_conf.client_id_str,-1);
2011
2012 /* Convert group.id to kafka string (may be NULL) */
2013 rk->rk_group_id = rd_kafkap_str_new(rk->rk_conf.group_id_str,-1);
2014
2015 /* Config fixups */
2016 rk->rk_conf.queued_max_msg_bytes =
2017 (int64_t)rk->rk_conf.queued_max_msg_kbytes * 1000ll;
2018
2019 /* Enable api.version.request=true if fallback.broker.version
2020 * indicates a supporting broker. */
2021 if (rd_kafka_ApiVersion_is_queryable(rk->rk_conf.broker_version_fallback))
2022 rk->rk_conf.api_version_request = 1;
2023
2024 if (rk->rk_type == RD_KAFKA_PRODUCER) {
2025 mtx_init(&rk->rk_curr_msgs.lock, mtx_plain);
2026 cnd_init(&rk->rk_curr_msgs.cnd);
2027 rk->rk_curr_msgs.max_cnt =
2028 rk->rk_conf.queue_buffering_max_msgs;
2029 if ((unsigned long long)rk->rk_conf.queue_buffering_max_kbytes * 1024 >
2030 (unsigned long long)SIZE_MAX)
2031 rk->rk_curr_msgs.max_size = SIZE_MAX;
2032 else
2033 rk->rk_curr_msgs.max_size =
2034 (size_t)rk->rk_conf.queue_buffering_max_kbytes * 1024;
2035 }
2036
2037 if (rd_kafka_assignors_init(rk, errstr, errstr_size) == -1) {
2038 ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
2039 ret_errno = EINVAL;
2040 goto fail;
2041 }
2042
2043 if (rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL ||
2044 rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_PLAINTEXT) {
2045 /* Select SASL provider */
2046 if (rd_kafka_sasl_select_provider(rk,
2047 errstr, errstr_size) == -1) {
2048 ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
2049 ret_errno = EINVAL;
2050 goto fail;
2051 }
2052
2053 /* Initialize SASL provider */
2054 if (rd_kafka_sasl_init(rk, errstr, errstr_size) == -1) {
2055 rk->rk_conf.sasl.provider = NULL;
2056 ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
2057 ret_errno = EINVAL;
2058 goto fail;
2059 }
2060 }
2061
2062#if WITH_SSL
2063 if (rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SSL ||
2064 rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL) {
2065 /* Create SSL context */
2066 if (rd_kafka_ssl_ctx_init(rk, errstr, errstr_size) == -1) {
2067 ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
2068 ret_errno = EINVAL;
2069 goto fail;
2070 }
2071 }
2072#endif
2073
2074 /* Client group, eligible both in consumer and producer mode. */
2075 if (type == RD_KAFKA_CONSUMER &&
2076 RD_KAFKAP_STR_LEN(rk->rk_group_id) > 0)
2077 rk->rk_cgrp = rd_kafka_cgrp_new(rk,
2078 rk->rk_group_id,
2079 rk->rk_client_id);
2080
2081 rk->rk_eos.transactional_id = rd_kafkap_str_new(NULL, 0);
2082
2083#ifndef _MSC_VER
2084 /* Block all signals in newly created threads.
2085 * To avoid race condition we block all signals in the calling
2086 * thread, which the new thread will inherit its sigmask from,
2087 * and then restore the original sigmask of the calling thread when
2088 * we're done creating the thread. */
2089 sigemptyset(&oldset);
2090 sigfillset(&newset);
2091 if (rk->rk_conf.term_sig) {
2092 struct sigaction sa_term = {
2093 .sa_handler = rd_kafka_term_sig_handler
2094 };
2095 sigaction(rk->rk_conf.term_sig, &sa_term, NULL);
2096 }
2097 pthread_sigmask(SIG_SETMASK, &newset, &oldset);
2098#endif
2099
2100 mtx_lock(&rk->rk_init_lock);
2101
2102 /* Create background thread and queue if background_event_cb()
2103 * has been configured.
2104 * Do this before creating the main thread since after
2105 * the main thread is created it is no longer trivial to error
2106 * out from rd_kafka_new(). */
2107 if (rk->rk_conf.background_event_cb) {
2108 /* Hold off background thread until thrd_create() is done. */
2109 rd_kafka_wrlock(rk);
2110
2111 rk->rk_background.q = rd_kafka_q_new(rk);
2112
2113 rk->rk_init_wait_cnt++;
2114
2115 if ((thrd_create(&rk->rk_background.thread,
2116 rd_kafka_background_thread_main, rk)) !=
2117 thrd_success) {
2118 rk->rk_init_wait_cnt--;
2119 ret_err = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
2120 ret_errno = errno;
2121 if (errstr)
2122 rd_snprintf(errstr, errstr_size,
2123 "Failed to create background "
2124 "thread: %s (%i)",
2125 rd_strerror(errno), errno);
2126 rd_kafka_wrunlock(rk);
2127 mtx_unlock(&rk->rk_init_lock);
2128
2129#ifndef _MSC_VER
2130 /* Restore sigmask of caller */
2131 pthread_sigmask(SIG_SETMASK, &oldset, NULL);
2132#endif
2133 goto fail;
2134 }
2135
2136 rd_kafka_wrunlock(rk);
2137 }
2138
2139
2140
2141 /* Lock handle here to synchronise state, i.e., hold off
2142 * the thread until we've finalized the handle. */
2143 rd_kafka_wrlock(rk);
2144
2145 /* Create handler thread */
2146 rk->rk_init_wait_cnt++;
2147 if ((thrd_create(&rk->rk_thread,
2148 rd_kafka_thread_main, rk)) != thrd_success) {
2149 rk->rk_init_wait_cnt--;
2150 ret_err = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
2151 ret_errno = errno;
2152 if (errstr)
2153 rd_snprintf(errstr, errstr_size,
2154 "Failed to create thread: %s (%i)",
2155 rd_strerror(errno), errno);
2156 rd_kafka_wrunlock(rk);
2157 mtx_unlock(&rk->rk_init_lock);
2158#ifndef _MSC_VER
2159 /* Restore sigmask of caller */
2160 pthread_sigmask(SIG_SETMASK, &oldset, NULL);
2161#endif
2162 goto fail;
2163 }
2164
2165 rd_kafka_wrunlock(rk);
2166 mtx_unlock(&rk->rk_init_lock);
2167
2168 /*
2169 * @warning `goto fail` is prohibited past this point
2170 */
2171
2172 mtx_lock(&rk->rk_internal_rkb_lock);
2173 rk->rk_internal_rkb = rd_kafka_broker_add(rk, RD_KAFKA_INTERNAL,
2174 RD_KAFKA_PROTO_PLAINTEXT,
2175 "", 0, RD_KAFKA_NODEID_UA);
2176 mtx_unlock(&rk->rk_internal_rkb_lock);
2177
2178 /* Add initial list of brokers from configuration */
2179 if (rk->rk_conf.brokerlist) {
2180 if (rd_kafka_brokers_add0(rk, rk->rk_conf.brokerlist) == 0)
2181 rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN,
2182 "No brokers configured");
2183 }
2184
2185#ifndef _MSC_VER
2186 /* Restore sigmask of caller */
2187 pthread_sigmask(SIG_SETMASK, &oldset, NULL);
2188#endif
2189
2190 /* Free user supplied conf's base pointer on success,
2191 * but not the actual allocated fields since the struct
2192 * will have been copied in its entirety above. */
2193 if (app_conf)
2194 rd_free(app_conf);
2195 rd_kafka_set_last_error(0, 0);
2196
2197 rd_kafka_conf_warn(rk);
2198
2199 /* Wait for background threads to fully initialize so that
2200 * the client instance is fully functional at the time it is
2201 * returned from the constructor. */
2202 if (rd_kafka_init_wait(rk, 60*1000) != 0) {
2203 /* This should never happen unless there is a bug
2204 * or the OS is not scheduling the background threads.
2205 * Either case there is no point in handling this gracefully
2206 * in the current state since the thread joins are likely
2207 * to hang as well. */
2208 mtx_lock(&rk->rk_init_lock);
2209 rd_kafka_log(rk, LOG_CRIT, "INIT",
2210 "Failed to initialize %s: "
2211 "%d background thread(s) did not initialize "
2212 "within 60 seconds",
2213 rk->rk_name, rk->rk_init_wait_cnt);
2214 if (errstr)
2215 rd_snprintf(errstr, errstr_size,
2216 "Timed out waiting for "
2217 "%d background thread(s) to initialize",
2218 rk->rk_init_wait_cnt);
2219 mtx_unlock(&rk->rk_init_lock);
2220
2221 rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE,
2222 EDEADLK);
2223 return NULL;
2224 }
2225
2226 rk->rk_initialized = 1;
2227
2228 bflen = sizeof(builtin_features);
2229 if (rd_kafka_conf_get(&rk->rk_conf, "builtin.features",
2230 builtin_features, &bflen) !=
2231 RD_KAFKA_CONF_OK)
2232 rd_snprintf(builtin_features, sizeof(builtin_features), "?");
2233 rd_kafka_dbg(rk, ALL, "INIT",
2234 "librdkafka v%s (0x%x) %s initialized "
2235 "(builtin.features %s, %s, debug 0x%x)",
2236 rd_kafka_version_str(), rd_kafka_version(),
2237 rk->rk_name,
2238 builtin_features, BUILT_WITH,
2239 rk->rk_conf.debug);
2240
2241 /* Log warnings for deprecated configuration */
2242 rd_kafka_conf_warn(rk);
2243
2244 return rk;
2245
2246fail:
2247 /*
2248 * Error out and clean up
2249 */
2250
2251 /*
2252 * Tell background thread to terminate and wait for it to return.
2253 */
2254 rd_atomic32_set(&rk->rk_terminate, RD_KAFKA_DESTROY_F_TERMINATE);
2255
2256 /* Terminate SASL provider */
2257 if (rk->rk_conf.sasl.provider)
2258 rd_kafka_sasl_term(rk);
2259
2260 if (rk->rk_background.thread) {
2261 int res;
2262 thrd_join(rk->rk_background.thread, &res);
2263 rd_kafka_q_destroy_owner(rk->rk_background.q);
2264 }
2265
2266 /* If on_new() interceptors have been called we also need
2267 * to allow interceptor clean-up by calling on_destroy() */
2268 rd_kafka_interceptors_on_destroy(rk);
2269
2270 /* If rk_conf is a struct-copy of the application configuration
2271 * we need to avoid rk_conf fields from being freed from
2272 * rd_kafka_destroy_internal() since they belong to app_conf.
2273 * However, there are some internal fields, such as interceptors,
2274 * that belong to rk_conf and thus needs to be cleaned up.
2275 * Legacy APIs, sigh.. */
2276 if (app_conf) {
2277 rd_kafka_assignors_term(rk);
2278 rd_kafka_interceptors_destroy(&rk->rk_conf);
2279 memset(&rk->rk_conf, 0, sizeof(rk->rk_conf));
2280 }
2281
2282 rd_kafka_destroy_internal(rk);
2283 rd_kafka_destroy_final(rk);
2284
2285 rd_kafka_set_last_error(ret_err, ret_errno);
2286
2287 return NULL;
2288}
2289
2290
2291
2292
2293/**
2294 * Counts usage of the legacy/simple consumer (rd_kafka_consume_start() with
2295 * friends) since it does not have an API for stopping the cgrp we will need to
2296 * sort that out automatically in the background when all consumption
2297 * has stopped.
2298 *
2299 * Returns 0 if a High level consumer is already instantiated
2300 * which means a Simple consumer cannot co-operate with it, else 1.
2301 *
2302 * A rd_kafka_t handle can never migrate from simple to high-level, or
2303 * vice versa, so we dont need a ..consumer_del().
2304 */
2305int rd_kafka_simple_consumer_add (rd_kafka_t *rk) {
2306 if (rd_atomic32_get(&rk->rk_simple_cnt) < 0)
2307 return 0;
2308
2309 return (int)rd_atomic32_add(&rk->rk_simple_cnt, 1);
2310}
2311
2312
2313
2314
2315/**
2316 * rktp fetch is split up in these parts:
2317 * * application side:
2318 * * broker side (handled by current leader broker thread for rktp):
2319 * - the fetch state, initial offset, etc.
2320 * - fetching messages, updating fetched offset, etc.
2321 * - offset commits
2322 *
 * Communication between the two is:
2324 * app side -> rdkafka main side: rktp_ops
2325 * broker thread -> app side: rktp_fetchq
2326 *
2327 * There is no shared state between these threads, instead
2328 * state is communicated through the two op queues, and state synchronization
2329 * is performed by version barriers.
2330 *
2331 */
2332
2333static RD_UNUSED
2334int rd_kafka_consume_start0 (rd_kafka_itopic_t *rkt, int32_t partition,
2335 int64_t offset, rd_kafka_q_t *rkq) {
2336 shptr_rd_kafka_toppar_t *s_rktp;
2337
2338 if (partition < 0) {
2339 rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
2340 ESRCH);
2341 return -1;
2342 }
2343
2344 if (!rd_kafka_simple_consumer_add(rkt->rkt_rk)) {
2345 rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
2346 return -1;
2347 }
2348
2349 rd_kafka_topic_wrlock(rkt);
2350 s_rktp = rd_kafka_toppar_desired_add(rkt, partition);
2351 rd_kafka_topic_wrunlock(rkt);
2352
2353 /* Verify offset */
2354 if (offset == RD_KAFKA_OFFSET_BEGINNING ||
2355 offset == RD_KAFKA_OFFSET_END ||
2356 offset <= RD_KAFKA_OFFSET_TAIL_BASE) {
2357 /* logical offsets */
2358
2359 } else if (offset == RD_KAFKA_OFFSET_STORED) {
2360 /* offset manager */
2361
2362 if (rkt->rkt_conf.offset_store_method ==
2363 RD_KAFKA_OFFSET_METHOD_BROKER &&
2364 RD_KAFKAP_STR_IS_NULL(rkt->rkt_rk->rk_group_id)) {
2365 /* Broker based offsets require a group id. */
2366 rd_kafka_toppar_destroy(s_rktp);
2367 rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG,
2368 EINVAL);
2369 return -1;
2370 }
2371
2372 } else if (offset < 0) {
2373 rd_kafka_toppar_destroy(s_rktp);
2374 rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG,
2375 EINVAL);
2376 return -1;
2377
2378 }
2379
2380 rd_kafka_toppar_op_fetch_start(rd_kafka_toppar_s2i(s_rktp), offset,
2381 rkq, RD_KAFKA_NO_REPLYQ);
2382
2383 rd_kafka_toppar_destroy(s_rktp);
2384
2385 rd_kafka_set_last_error(0, 0);
2386 return 0;
2387}
2388
2389
2390
2391
2392int rd_kafka_consume_start (rd_kafka_topic_t *app_rkt, int32_t partition,
2393 int64_t offset) {
2394 rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
2395 rd_kafka_dbg(rkt->rkt_rk, TOPIC, "START",
2396 "Start consuming partition %"PRId32,partition);
2397 return rd_kafka_consume_start0(rkt, partition, offset, NULL);
2398}
2399
2400int rd_kafka_consume_start_queue (rd_kafka_topic_t *app_rkt, int32_t partition,
2401 int64_t offset, rd_kafka_queue_t *rkqu) {
2402 rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
2403
2404 return rd_kafka_consume_start0(rkt, partition, offset, rkqu->rkqu_q);
2405}
2406
2407
2408
2409
2410static RD_UNUSED int rd_kafka_consume_stop0 (rd_kafka_toppar_t *rktp) {
2411 rd_kafka_q_t *tmpq = NULL;
2412 rd_kafka_resp_err_t err;
2413
2414 rd_kafka_topic_wrlock(rktp->rktp_rkt);
2415 rd_kafka_toppar_lock(rktp);
2416 rd_kafka_toppar_desired_del(rktp);
2417 rd_kafka_toppar_unlock(rktp);
2418 rd_kafka_topic_wrunlock(rktp->rktp_rkt);
2419
2420 tmpq = rd_kafka_q_new(rktp->rktp_rkt->rkt_rk);
2421
2422 rd_kafka_toppar_op_fetch_stop(rktp, RD_KAFKA_REPLYQ(tmpq, 0));
2423
2424 /* Synchronisation: Wait for stop reply from broker thread */
2425 err = rd_kafka_q_wait_result(tmpq, RD_POLL_INFINITE);
2426 rd_kafka_q_destroy_owner(tmpq);
2427
2428 rd_kafka_set_last_error(err, err ? EINVAL : 0);
2429
2430 return err ? -1 : 0;
2431}
2432
2433
2434int rd_kafka_consume_stop (rd_kafka_topic_t *app_rkt, int32_t partition) {
2435 rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
2436 shptr_rd_kafka_toppar_t *s_rktp;
2437 int r;
2438
2439 if (partition == RD_KAFKA_PARTITION_UA) {
2440 rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
2441 return -1;
2442 }
2443
2444 rd_kafka_topic_wrlock(rkt);
2445 if (!(s_rktp = rd_kafka_toppar_get(rkt, partition, 0)) &&
2446 !(s_rktp = rd_kafka_toppar_desired_get(rkt, partition))) {
2447 rd_kafka_topic_wrunlock(rkt);
2448 rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
2449 ESRCH);
2450 return -1;
2451 }
2452 rd_kafka_topic_wrunlock(rkt);
2453
2454 r = rd_kafka_consume_stop0(rd_kafka_toppar_s2i(s_rktp));
2455 /* set_last_error() called by stop0() */
2456
2457 rd_kafka_toppar_destroy(s_rktp);
2458
2459 return r;
2460}
2461
2462
2463
2464rd_kafka_resp_err_t rd_kafka_seek (rd_kafka_topic_t *app_rkt,
2465 int32_t partition,
2466 int64_t offset,
2467 int timeout_ms) {
2468 rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
2469 shptr_rd_kafka_toppar_t *s_rktp;
2470 rd_kafka_toppar_t *rktp;
2471 rd_kafka_q_t *tmpq = NULL;
2472 rd_kafka_resp_err_t err;
2473 rd_kafka_replyq_t replyq = RD_KAFKA_NO_REPLYQ;
2474
2475 /* FIXME: simple consumer check */
2476
2477 if (partition == RD_KAFKA_PARTITION_UA)
2478 return RD_KAFKA_RESP_ERR__INVALID_ARG;
2479
2480 rd_kafka_topic_rdlock(rkt);
2481 if (!(s_rktp = rd_kafka_toppar_get(rkt, partition, 0)) &&
2482 !(s_rktp = rd_kafka_toppar_desired_get(rkt, partition))) {
2483 rd_kafka_topic_rdunlock(rkt);
2484 return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
2485 }
2486 rd_kafka_topic_rdunlock(rkt);
2487
2488 if (timeout_ms) {
2489 tmpq = rd_kafka_q_new(rkt->rkt_rk);
2490 replyq = RD_KAFKA_REPLYQ(tmpq, 0);
2491 }
2492
2493 rktp = rd_kafka_toppar_s2i(s_rktp);
2494 if ((err = rd_kafka_toppar_op_seek(rktp, offset, replyq))) {
2495 if (tmpq)
2496 rd_kafka_q_destroy_owner(tmpq);
2497 rd_kafka_toppar_destroy(s_rktp);
2498 return err;
2499 }
2500
2501 rd_kafka_toppar_destroy(s_rktp);
2502
2503 if (tmpq) {
2504 err = rd_kafka_q_wait_result(tmpq, timeout_ms);
2505 rd_kafka_q_destroy_owner(tmpq);
2506 return err;
2507 }
2508
2509 return RD_KAFKA_RESP_ERR_NO_ERROR;
2510}
2511
2512
2513
2514static ssize_t rd_kafka_consume_batch0 (rd_kafka_q_t *rkq,
2515 int timeout_ms,
2516 rd_kafka_message_t **rkmessages,
2517 size_t rkmessages_size) {
2518 /* Populate application's rkmessages array. */
2519 return rd_kafka_q_serve_rkmessages(rkq, timeout_ms,
2520 rkmessages, rkmessages_size);
2521}
2522
2523
2524ssize_t rd_kafka_consume_batch (rd_kafka_topic_t *app_rkt, int32_t partition,
2525 int timeout_ms,
2526 rd_kafka_message_t **rkmessages,
2527 size_t rkmessages_size) {
2528 rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
2529 shptr_rd_kafka_toppar_t *s_rktp;
2530 rd_kafka_toppar_t *rktp;
2531 ssize_t cnt;
2532
2533 /* Get toppar */
2534 rd_kafka_topic_rdlock(rkt);
2535 s_rktp = rd_kafka_toppar_get(rkt, partition, 0/*no ua on miss*/);
2536 if (unlikely(!s_rktp))
2537 s_rktp = rd_kafka_toppar_desired_get(rkt, partition);
2538 rd_kafka_topic_rdunlock(rkt);
2539
2540 if (unlikely(!s_rktp)) {
2541 /* No such toppar known */
2542 rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
2543 ESRCH);
2544 return -1;
2545 }
2546
2547 rktp = rd_kafka_toppar_s2i(s_rktp);
2548
2549 /* Populate application's rkmessages array. */
2550 cnt = rd_kafka_q_serve_rkmessages(rktp->rktp_fetchq, timeout_ms,
2551 rkmessages, rkmessages_size);
2552
2553 rd_kafka_toppar_destroy(s_rktp); /* refcnt from .._get() */
2554
2555 rd_kafka_set_last_error(0, 0);
2556
2557 return cnt;
2558}
2559
2560ssize_t rd_kafka_consume_batch_queue (rd_kafka_queue_t *rkqu,
2561 int timeout_ms,
2562 rd_kafka_message_t **rkmessages,
2563 size_t rkmessages_size) {
2564 /* Populate application's rkmessages array. */
2565 return rd_kafka_consume_batch0(rkqu->rkqu_q, timeout_ms,
2566 rkmessages, rkmessages_size);
2567}
2568
2569
2570struct consume_ctx {
2571 void (*consume_cb) (rd_kafka_message_t *rkmessage, void *opaque);
2572 void *opaque;
2573};
2574
2575
2576/**
2577 * Trampoline for application's consume_cb()
2578 */
2579static rd_kafka_op_res_t
2580rd_kafka_consume_cb (rd_kafka_t *rk,
2581 rd_kafka_q_t *rkq,
2582 rd_kafka_op_t *rko,
2583 rd_kafka_q_cb_type_t cb_type, void *opaque) {
2584 struct consume_ctx *ctx = opaque;
2585 rd_kafka_message_t *rkmessage;
2586
2587 if (unlikely(rd_kafka_op_version_outdated(rko, 0))) {
2588 rd_kafka_op_destroy(rko);
2589 return RD_KAFKA_OP_RES_HANDLED;
2590 }
2591
2592 rkmessage = rd_kafka_message_get(rko);
2593
2594 rd_kafka_op_offset_store(rk, rko, rkmessage);
2595
2596 ctx->consume_cb(rkmessage, ctx->opaque);
2597
2598 rd_kafka_op_destroy(rko);
2599
2600 return RD_KAFKA_OP_RES_HANDLED;
2601}
2602
2603
2604
2605static rd_kafka_op_res_t
2606rd_kafka_consume_callback0 (rd_kafka_q_t *rkq, int timeout_ms, int max_cnt,
2607 void (*consume_cb) (rd_kafka_message_t
2608 *rkmessage,
2609 void *opaque),
2610 void *opaque) {
2611 struct consume_ctx ctx = { .consume_cb = consume_cb, .opaque = opaque };
2612 rd_kafka_op_res_t res;
2613
2614 if (timeout_ms)
2615 rd_kafka_app_poll_blocking(rkq->rkq_rk);
2616
2617 res = rd_kafka_q_serve(rkq, timeout_ms, max_cnt,
2618 RD_KAFKA_Q_CB_RETURN,
2619 rd_kafka_consume_cb, &ctx);
2620
2621 rd_kafka_app_polled(rkq->rkq_rk);
2622
2623 return res;
2624}
2625
2626
2627int rd_kafka_consume_callback (rd_kafka_topic_t *app_rkt, int32_t partition,
2628 int timeout_ms,
2629 void (*consume_cb) (rd_kafka_message_t
2630 *rkmessage,
2631 void *opaque),
2632 void *opaque) {
2633 rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
2634 shptr_rd_kafka_toppar_t *s_rktp;
2635 rd_kafka_toppar_t *rktp;
2636 int r;
2637
2638 /* Get toppar */
2639 rd_kafka_topic_rdlock(rkt);
2640 s_rktp = rd_kafka_toppar_get(rkt, partition, 0/*no ua on miss*/);
2641 if (unlikely(!s_rktp))
2642 s_rktp = rd_kafka_toppar_desired_get(rkt, partition);
2643 rd_kafka_topic_rdunlock(rkt);
2644
2645 if (unlikely(!s_rktp)) {
2646 /* No such toppar known */
2647 rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
2648 ESRCH);
2649 return -1;
2650 }
2651
2652 rktp = rd_kafka_toppar_s2i(s_rktp);
2653 r = rd_kafka_consume_callback0(rktp->rktp_fetchq, timeout_ms,
2654 rkt->rkt_conf.consume_callback_max_msgs,
2655 consume_cb, opaque);
2656
2657 rd_kafka_toppar_destroy(s_rktp);
2658
2659 rd_kafka_set_last_error(0, 0);
2660
2661 return r;
2662}
2663
2664
2665
2666int rd_kafka_consume_callback_queue (rd_kafka_queue_t *rkqu,
2667 int timeout_ms,
2668 void (*consume_cb) (rd_kafka_message_t
2669 *rkmessage,
2670 void *opaque),
2671 void *opaque) {
2672 return rd_kafka_consume_callback0(rkqu->rkqu_q, timeout_ms, 0,
2673 consume_cb, opaque);
2674}
2675
2676
2677/**
2678 * Serve queue 'rkq' and return one message.
2679 * By serving the queue it will also call any registered callbacks
2680 * registered for matching events, this includes consumer_cb()
2681 * in which case no message will be returned.
2682 */
2683static rd_kafka_message_t *rd_kafka_consume0 (rd_kafka_t *rk,
2684 rd_kafka_q_t *rkq,
2685 int timeout_ms) {
2686 rd_kafka_op_t *rko;
2687 rd_kafka_message_t *rkmessage = NULL;
2688 rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);
2689
2690 if (timeout_ms)
2691 rd_kafka_app_poll_blocking(rk);
2692
2693 rd_kafka_yield_thread = 0;
2694 while ((rko = rd_kafka_q_pop(rkq,
2695 rd_timeout_remains(abs_timeout), 0))) {
2696 rd_kafka_op_res_t res;
2697
2698 res = rd_kafka_poll_cb(rk, rkq, rko,
2699 RD_KAFKA_Q_CB_RETURN, NULL);
2700
2701 if (res == RD_KAFKA_OP_RES_PASS)
2702 break;
2703
2704 if (unlikely(res == RD_KAFKA_OP_RES_YIELD ||
2705 rd_kafka_yield_thread)) {
2706 /* Callback called rd_kafka_yield(), we must
2707 * stop dispatching the queue and return. */
2708 rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INTR,
2709 EINTR);
2710 rd_kafka_app_polled(rk);
2711 return NULL;
2712 }
2713
2714 /* Message was handled by callback. */
2715 continue;
2716 }
2717
2718 if (!rko) {
2719 /* Timeout reached with no op returned. */
2720 rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__TIMED_OUT,
2721 ETIMEDOUT);
2722 rd_kafka_app_polled(rk);
2723 return NULL;
2724 }
2725
2726 rd_kafka_assert(rk,
2727 rko->rko_type == RD_KAFKA_OP_FETCH ||
2728 rko->rko_type == RD_KAFKA_OP_CONSUMER_ERR);
2729
2730 /* Get rkmessage from rko */
2731 rkmessage = rd_kafka_message_get(rko);
2732
2733 /* Store offset */
2734 rd_kafka_op_offset_store(rk, rko, rkmessage);
2735
2736 rd_kafka_set_last_error(0, 0);
2737
2738 rd_kafka_app_polled(rk);
2739
2740 return rkmessage;
2741}
2742
2743rd_kafka_message_t *rd_kafka_consume (rd_kafka_topic_t *app_rkt,
2744 int32_t partition,
2745 int timeout_ms) {
2746 rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
2747 shptr_rd_kafka_toppar_t *s_rktp;
2748 rd_kafka_toppar_t *rktp;
2749 rd_kafka_message_t *rkmessage;
2750
2751 rd_kafka_topic_rdlock(rkt);
2752 s_rktp = rd_kafka_toppar_get(rkt, partition, 0/*no ua on miss*/);
2753 if (unlikely(!s_rktp))
2754 s_rktp = rd_kafka_toppar_desired_get(rkt, partition);
2755 rd_kafka_topic_rdunlock(rkt);
2756
2757 if (unlikely(!s_rktp)) {
2758 /* No such toppar known */
2759 rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
2760 ESRCH);
2761 return NULL;
2762 }
2763
2764 rktp = rd_kafka_toppar_s2i(s_rktp);
2765 rkmessage = rd_kafka_consume0(rkt->rkt_rk,
2766 rktp->rktp_fetchq, timeout_ms);
2767
2768 rd_kafka_toppar_destroy(s_rktp); /* refcnt from .._get() */
2769
2770 return rkmessage;
2771}
2772
2773
2774rd_kafka_message_t *rd_kafka_consume_queue (rd_kafka_queue_t *rkqu,
2775 int timeout_ms) {
2776 return rd_kafka_consume0(rkqu->rkqu_rk, rkqu->rkqu_q, timeout_ms);
2777}
2778
2779
2780
2781
2782rd_kafka_resp_err_t rd_kafka_poll_set_consumer (rd_kafka_t *rk) {
2783 rd_kafka_cgrp_t *rkcg;
2784
2785 if (!(rkcg = rd_kafka_cgrp_get(rk)))
2786 return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
2787
2788 rd_kafka_q_fwd_set(rk->rk_rep, rkcg->rkcg_q);
2789 return RD_KAFKA_RESP_ERR_NO_ERROR;
2790}
2791
2792
2793
2794
2795rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk,
2796 int timeout_ms) {
2797 rd_kafka_cgrp_t *rkcg;
2798
2799 if (unlikely(!(rkcg = rd_kafka_cgrp_get(rk)))) {
2800 rd_kafka_message_t *rkmessage = rd_kafka_message_new();
2801 rkmessage->err = RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
2802 return rkmessage;
2803 }
2804
2805 return rd_kafka_consume0(rk, rkcg->rkcg_q, timeout_ms);
2806}
2807
2808
2809rd_kafka_resp_err_t rd_kafka_consumer_close (rd_kafka_t *rk) {
2810 rd_kafka_cgrp_t *rkcg;
2811 rd_kafka_op_t *rko;
2812 rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__TIMED_OUT;
2813 rd_kafka_q_t *rkq;
2814
2815 if (!(rkcg = rd_kafka_cgrp_get(rk)))
2816 return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
2817
2818 rd_kafka_dbg(rk, CONSUMER, "CLOSE", "Closing consumer");
2819
2820 /* Redirect cgrp queue to our temporary queue to make sure
2821 * all posted ops (e.g., rebalance callbacks) are served by
2822 * this function. */
2823 rkq = rd_kafka_q_new(rk);
2824 rd_kafka_q_fwd_set(rkcg->rkcg_q, rkq);
2825
2826 rd_kafka_cgrp_terminate(rkcg, RD_KAFKA_REPLYQ(rkq, 0)); /* async */
2827
2828 /* Disable the queue if termination is immediate or the user
2829 * does not want the blocking consumer_close() behaviour, this will
2830 * cause any ops posted for this queue (such as rebalance) to
2831 * be destroyed.
2832 */
2833 if (rd_kafka_destroy_flags_no_consumer_close(rk)) {
2834 rd_kafka_dbg(rk, CONSUMER, "CLOSE",
2835 "Disabling and purging temporary queue to quench "
2836 "close events");
2837 rd_kafka_q_disable(rkq);
2838 /* Purge ops already enqueued */
2839 rd_kafka_q_purge(rkq);
2840 } else {
2841 rd_kafka_dbg(rk, CONSUMER, "CLOSE",
2842 "Waiting for close events");
2843 while ((rko = rd_kafka_q_pop(rkq, RD_POLL_INFINITE, 0))) {
2844 rd_kafka_op_res_t res;
2845 if ((rko->rko_type & ~RD_KAFKA_OP_FLAGMASK) ==
2846 RD_KAFKA_OP_TERMINATE) {
2847 err = rko->rko_err;
2848 rd_kafka_op_destroy(rko);
2849 break;
2850 }
2851 res = rd_kafka_poll_cb(rk, rkq, rko,
2852 RD_KAFKA_Q_CB_RETURN, NULL);
2853 if (res == RD_KAFKA_OP_RES_PASS)
2854 rd_kafka_op_destroy(rko);
2855 /* Ignore YIELD, we need to finish */
2856 }
2857 }
2858
2859 rd_kafka_q_fwd_set(rkcg->rkcg_q, NULL);
2860
2861 rd_kafka_q_destroy_owner(rkq);
2862
2863 rd_kafka_dbg(rk, CONSUMER, "CLOSE", "Consumer closed");
2864
2865 return err;
2866}
2867
2868
2869
2870rd_kafka_resp_err_t
2871rd_kafka_committed (rd_kafka_t *rk,
2872 rd_kafka_topic_partition_list_t *partitions,
2873 int timeout_ms) {
2874 rd_kafka_q_t *rkq;
2875 rd_kafka_resp_err_t err;
2876 rd_kafka_cgrp_t *rkcg;
2877 rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);
2878
2879 if (!partitions)
2880 return RD_KAFKA_RESP_ERR__INVALID_ARG;
2881
2882 if (!(rkcg = rd_kafka_cgrp_get(rk)))
2883 return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
2884
2885 /* Set default offsets. */
2886 rd_kafka_topic_partition_list_reset_offsets(partitions,
2887 RD_KAFKA_OFFSET_INVALID);
2888
2889 rkq = rd_kafka_q_new(rk);
2890
2891 do {
2892 rd_kafka_op_t *rko;
2893 int state_version = rd_kafka_brokers_get_state_version(rk);
2894
2895 rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH);
2896 rd_kafka_op_set_replyq(rko, rkq, NULL);
2897
2898 /* Issue #827
2899 * Copy partition list to avoid use-after-free if we time out
2900 * here, the app frees the list, and then cgrp starts
2901 * processing the op. */
2902 rko->rko_u.offset_fetch.partitions =
2903 rd_kafka_topic_partition_list_copy(partitions);
2904 rko->rko_u.offset_fetch.do_free = 1;
2905
2906 if (!rd_kafka_q_enq(rkcg->rkcg_ops, rko)) {
2907 err = RD_KAFKA_RESP_ERR__DESTROY;
2908 break;
2909 }
2910
2911 rko = rd_kafka_q_pop(rkq, rd_timeout_remains(abs_timeout), 0);
2912 if (rko) {
2913 if (!(err = rko->rko_err))
2914 rd_kafka_topic_partition_list_update(
2915 partitions,
2916 rko->rko_u.offset_fetch.partitions);
2917 else if ((err == RD_KAFKA_RESP_ERR__WAIT_COORD ||
2918 err == RD_KAFKA_RESP_ERR__TRANSPORT) &&
2919 !rd_kafka_brokers_wait_state_change(
2920 rk, state_version,
2921 rd_timeout_remains(abs_timeout)))
2922 err = RD_KAFKA_RESP_ERR__TIMED_OUT;
2923
2924 rd_kafka_op_destroy(rko);
2925 } else
2926 err = RD_KAFKA_RESP_ERR__TIMED_OUT;
2927 } while (err == RD_KAFKA_RESP_ERR__TRANSPORT ||
2928 err == RD_KAFKA_RESP_ERR__WAIT_COORD);
2929
2930 rd_kafka_q_destroy_owner(rkq);
2931
2932 return err;
2933}
2934
2935
2936
2937rd_kafka_resp_err_t
2938rd_kafka_position (rd_kafka_t *rk,
2939 rd_kafka_topic_partition_list_t *partitions) {
2940 int i;
2941
2942 /* Set default offsets. */
2943 rd_kafka_topic_partition_list_reset_offsets(partitions,
2944 RD_KAFKA_OFFSET_INVALID);
2945
2946 for (i = 0 ; i < partitions->cnt ; i++) {
2947 rd_kafka_topic_partition_t *rktpar = &partitions->elems[i];
2948 shptr_rd_kafka_toppar_t *s_rktp;
2949 rd_kafka_toppar_t *rktp;
2950
2951 if (!(s_rktp = rd_kafka_toppar_get2(rk, rktpar->topic,
2952 rktpar->partition, 0, 1))) {
2953 rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
2954 rktpar->offset = RD_KAFKA_OFFSET_INVALID;
2955 continue;
2956 }
2957
2958 rktp = rd_kafka_toppar_s2i(s_rktp);
2959 rd_kafka_toppar_lock(rktp);
2960 rktpar->offset = rktp->rktp_app_offset;
2961 rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;
2962 rd_kafka_toppar_unlock(rktp);
2963 rd_kafka_toppar_destroy(s_rktp);
2964 }
2965
2966 return RD_KAFKA_RESP_ERR_NO_ERROR;
2967}
2968
2969
2970
2971struct _query_wmark_offsets_state {
2972 rd_kafka_resp_err_t err;
2973 const char *topic;
2974 int32_t partition;
2975 int64_t offsets[2];
2976 int offidx; /* next offset to set from response */
2977 rd_ts_t ts_end;
2978 int state_version; /* Broker state version */
2979};
2980
2981static void rd_kafka_query_wmark_offsets_resp_cb (rd_kafka_t *rk,
2982 rd_kafka_broker_t *rkb,
2983 rd_kafka_resp_err_t err,
2984 rd_kafka_buf_t *rkbuf,
2985 rd_kafka_buf_t *request,
2986 void *opaque) {
2987 struct _query_wmark_offsets_state *state;
2988 rd_kafka_topic_partition_list_t *offsets;
2989 rd_kafka_topic_partition_t *rktpar;
2990
2991 if (err == RD_KAFKA_RESP_ERR__DESTROY) {
2992 /* 'state' has gone out of scope when query_watermark..()
2993 * timed out and returned to the caller. */
2994 return;
2995 }
2996
2997 state = opaque;
2998
2999 offsets = rd_kafka_topic_partition_list_new(1);
3000 err = rd_kafka_handle_Offset(rk, rkb, err, rkbuf, request, offsets);
3001 if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) {
3002 rd_kafka_topic_partition_list_destroy(offsets);
3003 return; /* Retrying */
3004 }
3005
3006 /* Retry if no broker connection is available yet. */
3007 if ((err == RD_KAFKA_RESP_ERR__WAIT_COORD ||
3008 err == RD_KAFKA_RESP_ERR__TRANSPORT) &&
3009 rkb &&
3010 rd_kafka_brokers_wait_state_change(
3011 rkb->rkb_rk, state->state_version,
3012 rd_timeout_remains(state->ts_end))) {
3013 /* Retry */
3014 state->state_version = rd_kafka_brokers_get_state_version(rk);
3015 request->rkbuf_retries = 0;
3016 if (rd_kafka_buf_retry(rkb, request)) {
3017 rd_kafka_topic_partition_list_destroy(offsets);
3018 return; /* Retry in progress */
3019 }
3020 /* FALLTHRU */
3021 }
3022
3023 /* Partition not seen in response. */
3024 if (!(rktpar = rd_kafka_topic_partition_list_find(offsets,
3025 state->topic,
3026 state->partition)))
3027 err = RD_KAFKA_RESP_ERR__BAD_MSG;
3028 else if (rktpar->err)
3029 err = rktpar->err;
3030 else
3031 state->offsets[state->offidx] = rktpar->offset;
3032
3033 state->offidx++;
3034
3035 if (err || state->offidx == 2) /* Error or Done */
3036 state->err = err;
3037
3038 rd_kafka_topic_partition_list_destroy(offsets);
3039}
3040
3041
3042rd_kafka_resp_err_t
3043rd_kafka_query_watermark_offsets (rd_kafka_t *rk, const char *topic,
3044 int32_t partition,
3045 int64_t *low, int64_t *high, int timeout_ms) {
3046 rd_kafka_q_t *rkq;
3047 struct _query_wmark_offsets_state state;
3048 rd_ts_t ts_end = rd_timeout_init(timeout_ms);
3049 rd_kafka_topic_partition_list_t *partitions;
3050 rd_kafka_topic_partition_t *rktpar;
3051 struct rd_kafka_partition_leader *leader;
3052 rd_list_t leaders;
3053 rd_kafka_resp_err_t err;
3054
3055 partitions = rd_kafka_topic_partition_list_new(1);
3056 rktpar = rd_kafka_topic_partition_list_add(partitions,
3057 topic, partition);
3058
3059 rd_list_init(&leaders, partitions->cnt,
3060 (void *)rd_kafka_partition_leader_destroy);
3061
3062 err = rd_kafka_topic_partition_list_query_leaders(rk, partitions,
3063 &leaders, timeout_ms);
3064 if (err) {
3065 rd_list_destroy(&leaders);
3066 rd_kafka_topic_partition_list_destroy(partitions);
3067 return err;
3068 }
3069
3070 leader = rd_list_elem(&leaders, 0);
3071
3072 rkq = rd_kafka_q_new(rk);
3073
3074 /* Due to KAFKA-1588 we need to send a request for each wanted offset,
3075 * in this case one for the low watermark and one for the high. */
3076 state.topic = topic;
3077 state.partition = partition;
3078 state.offsets[0] = RD_KAFKA_OFFSET_BEGINNING;
3079 state.offsets[1] = RD_KAFKA_OFFSET_END;
3080 state.offidx = 0;
3081 state.err = RD_KAFKA_RESP_ERR__IN_PROGRESS;
3082 state.ts_end = ts_end;
3083 state.state_version = rd_kafka_brokers_get_state_version(rk);
3084
3085
3086 rktpar->offset = RD_KAFKA_OFFSET_BEGINNING;
3087 rd_kafka_OffsetRequest(leader->rkb, partitions, 0,
3088 RD_KAFKA_REPLYQ(rkq, 0),
3089 rd_kafka_query_wmark_offsets_resp_cb,
3090 &state);
3091
3092 rktpar->offset = RD_KAFKA_OFFSET_END;
3093 rd_kafka_OffsetRequest(leader->rkb, partitions, 0,
3094 RD_KAFKA_REPLYQ(rkq, 0),
3095 rd_kafka_query_wmark_offsets_resp_cb,
3096 &state);
3097
3098 rd_kafka_topic_partition_list_destroy(partitions);
3099 rd_list_destroy(&leaders);
3100
3101 /* Wait for reply (or timeout) */
3102 while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS &&
3103 rd_kafka_q_serve(rkq, 100, 0, RD_KAFKA_Q_CB_CALLBACK,
3104 rd_kafka_poll_cb, NULL) !=
3105 RD_KAFKA_OP_RES_YIELD)
3106 ;
3107
3108 rd_kafka_q_destroy_owner(rkq);
3109
3110 if (state.err)
3111 return state.err;
3112 else if (state.offidx != 2)
3113 return RD_KAFKA_RESP_ERR__FAIL;
3114
3115 /* We are not certain about the returned order. */
3116 if (state.offsets[0] < state.offsets[1]) {
3117 *low = state.offsets[0];
3118 *high = state.offsets[1];
3119 } else {
3120 *low = state.offsets[1];
3121 *high = state.offsets[0];
3122 }
3123
3124 /* If partition is empty only one offset (the last) will be returned. */
3125 if (*low < 0 && *high >= 0)
3126 *low = *high;
3127
3128 return RD_KAFKA_RESP_ERR_NO_ERROR;
3129}
3130
3131
3132rd_kafka_resp_err_t
3133rd_kafka_get_watermark_offsets (rd_kafka_t *rk, const char *topic,
3134 int32_t partition,
3135 int64_t *low, int64_t *high) {
3136 shptr_rd_kafka_toppar_t *s_rktp;
3137 rd_kafka_toppar_t *rktp;
3138
3139 s_rktp = rd_kafka_toppar_get2(rk, topic, partition, 0, 1);
3140 if (!s_rktp)
3141 return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
3142 rktp = rd_kafka_toppar_s2i(s_rktp);
3143
3144 rd_kafka_toppar_lock(rktp);
3145 *low = rktp->rktp_lo_offset;
3146 *high = rktp->rktp_hi_offset;
3147 rd_kafka_toppar_unlock(rktp);
3148
3149 rd_kafka_toppar_destroy(s_rktp);
3150
3151 return RD_KAFKA_RESP_ERR_NO_ERROR;
3152}
3153
3154
3155/**
3156 * @brief get_offsets_for_times() state
3157 */
3158struct _get_offsets_for_times {
3159 rd_kafka_topic_partition_list_t *results;
3160 rd_kafka_resp_err_t err;
3161 int wait_reply;
3162 int state_version;
3163 rd_ts_t ts_end;
3164};
3165
3166/**
3167 * @brief Handle OffsetRequest responses
3168 */
3169static void rd_kafka_get_offsets_for_times_resp_cb (rd_kafka_t *rk,
3170 rd_kafka_broker_t *rkb,
3171 rd_kafka_resp_err_t err,
3172 rd_kafka_buf_t *rkbuf,
3173 rd_kafka_buf_t *request,
3174 void *opaque) {
3175 struct _get_offsets_for_times *state;
3176
3177 if (err == RD_KAFKA_RESP_ERR__DESTROY) {
3178 /* 'state' has gone out of scope when offsets_for_times()
3179 * timed out and returned to the caller. */
3180 return;
3181 }
3182
3183 state = opaque;
3184
3185 err = rd_kafka_handle_Offset(rk, rkb, err, rkbuf, request,
3186 state->results);
3187 if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS)
3188 return; /* Retrying */
3189
3190 /* Retry if no broker connection is available yet. */
3191 if ((err == RD_KAFKA_RESP_ERR__WAIT_COORD ||
3192 err == RD_KAFKA_RESP_ERR__TRANSPORT) &&
3193 rkb &&
3194 rd_kafka_brokers_wait_state_change(
3195 rkb->rkb_rk, state->state_version,
3196 rd_timeout_remains(state->ts_end))) {
3197 /* Retry */
3198 state->state_version = rd_kafka_brokers_get_state_version(rk);
3199 request->rkbuf_retries = 0;
3200 if (rd_kafka_buf_retry(rkb, request))
3201 return; /* Retry in progress */
3202 /* FALLTHRU */
3203 }
3204
3205 if (err && !state->err)
3206 state->err = err;
3207
3208 state->wait_reply--;
3209}
3210
3211
3212rd_kafka_resp_err_t
3213rd_kafka_offsets_for_times (rd_kafka_t *rk,
3214 rd_kafka_topic_partition_list_t *offsets,
3215 int timeout_ms) {
3216 rd_kafka_q_t *rkq;
3217 struct _get_offsets_for_times state = RD_ZERO_INIT;
3218 rd_ts_t ts_end = rd_timeout_init(timeout_ms);
3219 rd_list_t leaders;
3220 int i;
3221 rd_kafka_resp_err_t err;
3222 struct rd_kafka_partition_leader *leader;
3223 int tmout;
3224
3225 if (offsets->cnt == 0)
3226 return RD_KAFKA_RESP_ERR__INVALID_ARG;
3227
3228 rd_list_init(&leaders, offsets->cnt,
3229 (void *)rd_kafka_partition_leader_destroy);
3230
3231 err = rd_kafka_topic_partition_list_query_leaders(rk, offsets, &leaders,
3232 timeout_ms);
3233 if (err) {
3234 rd_list_destroy(&leaders);
3235 return err;
3236 }
3237
3238
3239 rkq = rd_kafka_q_new(rk);
3240
3241 state.wait_reply = 0;
3242 state.results = rd_kafka_topic_partition_list_new(offsets->cnt);
3243
3244 /* For each leader send a request for its partitions */
3245 RD_LIST_FOREACH(leader, &leaders, i) {
3246 state.wait_reply++;
3247 rd_kafka_OffsetRequest(leader->rkb, leader->partitions, 1,
3248 RD_KAFKA_REPLYQ(rkq, 0),
3249 rd_kafka_get_offsets_for_times_resp_cb,
3250 &state);
3251 }
3252
3253 rd_list_destroy(&leaders);
3254
3255 /* Wait for reply (or timeout) */
3256 while (state.wait_reply > 0 &&
3257 !rd_timeout_expired((tmout = rd_timeout_remains(ts_end))))
3258 rd_kafka_q_serve(rkq, tmout, 0, RD_KAFKA_Q_CB_CALLBACK,
3259 rd_kafka_poll_cb, NULL);
3260
3261 rd_kafka_q_destroy_owner(rkq);
3262
3263 if (state.wait_reply > 0 && !state.err)
3264 state.err = RD_KAFKA_RESP_ERR__TIMED_OUT;
3265
3266 /* Then update the queried partitions. */
3267 if (!state.err)
3268 rd_kafka_topic_partition_list_update(offsets, state.results);
3269
3270 rd_kafka_topic_partition_list_destroy(state.results);
3271
3272 return state.err;
3273}
3274
3275
3276/**
3277 * @brief rd_kafka_poll() (and similar) op callback handler.
3278 * Will either call registered callback depending on cb_type and op type
3279 * or return op to application, if applicable (e.g., fetch message).
3280 *
3281 * @returns RD_KAFKA_OP_RES_HANDLED if op was handled, else one of the
3282 * other res types (such as OP_RES_PASS).
3283 *
3284 * @locality application thread
3285 */
3286rd_kafka_op_res_t
3287rd_kafka_poll_cb (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
3288 rd_kafka_q_cb_type_t cb_type, void *opaque) {
3289 rd_kafka_msg_t *rkm;
3290 rd_kafka_op_res_t res = RD_KAFKA_OP_RES_HANDLED;
3291
3292 /* Special handling for events based on cb_type */
3293 if (cb_type == RD_KAFKA_Q_CB_EVENT &&
3294 rd_kafka_event_setup(rk, rko)) {
3295 /* Return-as-event requested. */
3296 return RD_KAFKA_OP_RES_PASS; /* Return as event */
3297 }
3298
3299 switch ((int)rko->rko_type)
3300 {
3301 case RD_KAFKA_OP_FETCH:
3302 if (!rk->rk_conf.consume_cb ||
3303 cb_type == RD_KAFKA_Q_CB_RETURN ||
3304 cb_type == RD_KAFKA_Q_CB_FORCE_RETURN)
3305 return RD_KAFKA_OP_RES_PASS; /* Dont handle here */
3306 else {
3307 struct consume_ctx ctx = {
3308 .consume_cb = rk->rk_conf.consume_cb,
3309 .opaque = rk->rk_conf.opaque };
3310
3311 return rd_kafka_consume_cb(rk, rkq, rko, cb_type, &ctx);
3312 }
3313 break;
3314
3315 case RD_KAFKA_OP_REBALANCE:
3316 /* If EVENT_REBALANCE is enabled but rebalance_cb isnt
3317 * we need to perform a dummy assign for the application.
3318 * This might happen during termination with consumer_close() */
3319 if (rk->rk_conf.rebalance_cb)
3320 rk->rk_conf.rebalance_cb(
3321 rk, rko->rko_err,
3322 rko->rko_u.rebalance.partitions,
3323 rk->rk_conf.opaque);
3324 else {
3325 rd_kafka_dbg(rk, CGRP, "UNASSIGN",
3326 "Forcing unassign of %d partition(s)",
3327 rko->rko_u.rebalance.partitions ?
3328 rko->rko_u.rebalance.partitions->cnt : 0);
3329 rd_kafka_assign(rk, NULL);
3330 }
3331 break;
3332
3333 case RD_KAFKA_OP_OFFSET_COMMIT | RD_KAFKA_OP_REPLY:
3334 if (!rko->rko_u.offset_commit.cb)
3335 return RD_KAFKA_OP_RES_PASS; /* Dont handle here */
3336 rko->rko_u.offset_commit.cb(
3337 rk, rko->rko_err,
3338 rko->rko_u.offset_commit.partitions,
3339 rko->rko_u.offset_commit.opaque);
3340 break;
3341
3342 case RD_KAFKA_OP_CONSUMER_ERR:
3343 /* rd_kafka_consumer_poll() (_Q_CB_CONSUMER):
3344 * Consumer errors are returned to the application
3345 * as rkmessages, not error callbacks.
3346 *
3347 * rd_kafka_poll() (_Q_CB_GLOBAL):
3348 * convert to ERR op (fallthru)
3349 */
3350 if (cb_type == RD_KAFKA_Q_CB_RETURN ||
3351 cb_type == RD_KAFKA_Q_CB_FORCE_RETURN) {
3352 /* return as message_t to application */
3353 return RD_KAFKA_OP_RES_PASS;
3354 }
3355 /* FALLTHRU */
3356
3357 case RD_KAFKA_OP_ERR:
3358 if (rk->rk_conf.error_cb)
3359 rk->rk_conf.error_cb(rk, rko->rko_err,
3360 rko->rko_u.err.errstr,
3361 rk->rk_conf.opaque);
3362 else {
3363 /* If error string already contains
3364 * the err2str then skip including err2str in
3365 * the printout */
3366 if (rko->rko_u.err.errstr &&
3367 strstr(rko->rko_u.err.errstr,
3368 rd_kafka_err2str(rko->rko_err)))
3369 rd_kafka_log(rk, LOG_ERR, "ERROR",
3370 "%s: %s",
3371 rk->rk_name,
3372 rko->rko_u.err.errstr);
3373 else
3374 rd_kafka_log(rk, LOG_ERR, "ERROR",
3375 "%s: %s: %s",
3376 rk->rk_name,
3377 rko->rko_u.err.errstr,
3378 rd_kafka_err2str(rko->rko_err));
3379 }
3380 break;
3381
3382 case RD_KAFKA_OP_DR:
3383 /* Delivery report:
3384 * call application DR callback for each message. */
3385 while ((rkm = TAILQ_FIRST(&rko->rko_u.dr.msgq.rkmq_msgs))) {
3386 rd_kafka_message_t *rkmessage;
3387
3388 TAILQ_REMOVE(&rko->rko_u.dr.msgq.rkmq_msgs,
3389 rkm, rkm_link);
3390
3391 rkmessage = rd_kafka_message_get_from_rkm(rko, rkm);
3392
3393 if (rk->rk_conf.dr_msg_cb) {
3394 rk->rk_conf.dr_msg_cb(rk, rkmessage,
3395 rk->rk_conf.opaque);
3396
3397 } else {
3398
3399 rk->rk_conf.dr_cb(rk,
3400 rkmessage->payload,
3401 rkmessage->len,
3402 rkmessage->err,
3403 rk->rk_conf.opaque,
3404 rkmessage->_private);
3405 }
3406
3407 rd_kafka_msg_destroy(rk, rkm);
3408
3409 if (unlikely(rd_kafka_yield_thread)) {
3410 /* Callback called yield(),
3411 * re-enqueue the op (if there are any
3412 * remaining messages). */
3413 if (!TAILQ_EMPTY(&rko->rko_u.dr.msgq.
3414 rkmq_msgs))
3415 rd_kafka_q_reenq(rkq, rko);
3416 else
3417 rd_kafka_op_destroy(rko);
3418 return RD_KAFKA_OP_RES_YIELD;
3419 }
3420 }
3421
3422 rd_kafka_msgq_init(&rko->rko_u.dr.msgq);
3423
3424 break;
3425
3426 case RD_KAFKA_OP_THROTTLE:
3427 if (rk->rk_conf.throttle_cb)
3428 rk->rk_conf.throttle_cb(rk, rko->rko_u.throttle.nodename,
3429 rko->rko_u.throttle.nodeid,
3430 rko->rko_u.throttle.
3431 throttle_time,
3432 rk->rk_conf.opaque);
3433 break;
3434
3435 case RD_KAFKA_OP_STATS:
3436 /* Statistics */
3437 if (rk->rk_conf.stats_cb &&
3438 rk->rk_conf.stats_cb(rk, rko->rko_u.stats.json,
3439 rko->rko_u.stats.json_len,
3440 rk->rk_conf.opaque) == 1)
3441 rko->rko_u.stats.json = NULL; /* Application wanted json ptr */
3442 break;
3443
3444 case RD_KAFKA_OP_LOG:
3445 if (likely(rk->rk_conf.log_cb &&
3446 rk->rk_conf.log_level >= rko->rko_u.log.level))
3447 rk->rk_conf.log_cb(rk,
3448 rko->rko_u.log.level,
3449 rko->rko_u.log.fac,
3450 rko->rko_u.log.str);
3451 break;
3452
3453 case RD_KAFKA_OP_TERMINATE:
3454 /* nop: just a wake-up */
3455 break;
3456
3457 case RD_KAFKA_OP_CREATETOPICS:
3458 case RD_KAFKA_OP_DELETETOPICS:
3459 case RD_KAFKA_OP_CREATEPARTITIONS:
3460 case RD_KAFKA_OP_ALTERCONFIGS:
3461 case RD_KAFKA_OP_DESCRIBECONFIGS:
3462 /* Calls op_destroy() from worker callback,
3463 * when the time comes. */
3464 res = rd_kafka_op_call(rk, rkq, rko);
3465 break;
3466
3467 case RD_KAFKA_OP_ADMIN_RESULT:
3468 if (cb_type == RD_KAFKA_Q_CB_RETURN ||
3469 cb_type == RD_KAFKA_Q_CB_FORCE_RETURN)
3470 return RD_KAFKA_OP_RES_PASS; /* Don't handle here */
3471
3472 /* Op is silently destroyed below */
3473 break;
3474
3475 default:
3476 rd_kafka_assert(rk, !*"cant handle op type");
3477 break;
3478 }
3479
3480 if (res == RD_KAFKA_OP_RES_HANDLED)
3481 rd_kafka_op_destroy(rko);
3482
3483 return res;
3484}
3485
3486int rd_kafka_poll (rd_kafka_t *rk, int timeout_ms) {
3487 int r;
3488
3489 if (timeout_ms)
3490 rd_kafka_app_poll_blocking(rk);
3491
3492 r = rd_kafka_q_serve(rk->rk_rep, timeout_ms, 0,
3493 RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL);
3494
3495 rd_kafka_app_polled(rk);
3496
3497 return r;
3498}
3499
3500
3501rd_kafka_event_t *rd_kafka_queue_poll (rd_kafka_queue_t *rkqu, int timeout_ms) {
3502 rd_kafka_op_t *rko;
3503
3504 if (timeout_ms)
3505 rd_kafka_app_poll_blocking(rkqu->rkqu_rk);
3506
3507 rko = rd_kafka_q_pop_serve(rkqu->rkqu_q, timeout_ms, 0,
3508 RD_KAFKA_Q_CB_EVENT, rd_kafka_poll_cb, NULL);
3509
3510 rd_kafka_app_polled(rkqu->rkqu_rk);
3511
3512 if (!rko)
3513 return NULL;
3514
3515 return rko;
3516}
3517
3518int rd_kafka_queue_poll_callback (rd_kafka_queue_t *rkqu, int timeout_ms) {
3519 int r;
3520
3521 if (timeout_ms)
3522 rd_kafka_app_poll_blocking(rkqu->rkqu_rk);
3523
3524 r = rd_kafka_q_serve(rkqu->rkqu_q, timeout_ms, 0,
3525 RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL);
3526
3527 rd_kafka_app_polled(rkqu->rkqu_rk);
3528
3529 return r;
3530}
3531
3532
3533
3534static void rd_kafka_toppar_dump (FILE *fp, const char *indent,
3535 rd_kafka_toppar_t *rktp) {
3536
3537 fprintf(fp, "%s%.*s [%"PRId32"] leader %s\n",
3538 indent,
3539 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
3540 rktp->rktp_partition,
3541 rktp->rktp_leader ?
3542 rktp->rktp_leader->rkb_name : "none");
3543 fprintf(fp,
3544 "%s refcnt %i\n"
3545 "%s msgq: %i messages\n"
3546 "%s xmit_msgq: %i messages\n"
3547 "%s total: %"PRIu64" messages, %"PRIu64" bytes\n",
3548 indent, rd_refcnt_get(&rktp->rktp_refcnt),
3549 indent, rktp->rktp_msgq.rkmq_msg_cnt,
3550 indent, rktp->rktp_xmit_msgq.rkmq_msg_cnt,
3551 indent, rd_atomic64_get(&rktp->rktp_c.tx_msgs),
3552 rd_atomic64_get(&rktp->rktp_c.tx_msg_bytes));
3553}
3554
3555static void rd_kafka_broker_dump (FILE *fp, rd_kafka_broker_t *rkb, int locks) {
3556 rd_kafka_toppar_t *rktp;
3557
3558 if (locks)
3559 rd_kafka_broker_lock(rkb);
3560 fprintf(fp, " rd_kafka_broker_t %p: %s NodeId %"PRId32
3561 " in state %s (for %.3fs)\n",
3562 rkb, rkb->rkb_name, rkb->rkb_nodeid,
3563 rd_kafka_broker_state_names[rkb->rkb_state],
3564 rkb->rkb_ts_state ?
3565 (float)(rd_clock() - rkb->rkb_ts_state) / 1000000.0f :
3566 0.0f);
3567 fprintf(fp, " refcnt %i\n", rd_refcnt_get(&rkb->rkb_refcnt));
3568 fprintf(fp, " outbuf_cnt: %i waitresp_cnt: %i\n",
3569 rd_atomic32_get(&rkb->rkb_outbufs.rkbq_cnt),
3570 rd_atomic32_get(&rkb->rkb_waitresps.rkbq_cnt));
3571 fprintf(fp,
3572 " %"PRIu64 " messages sent, %"PRIu64" bytes, "
3573 "%"PRIu64" errors, %"PRIu64" timeouts\n"
3574 " %"PRIu64 " messages received, %"PRIu64" bytes, "
3575 "%"PRIu64" errors\n"
3576 " %"PRIu64 " messageset transmissions were retried\n",
3577 rd_atomic64_get(&rkb->rkb_c.tx), rd_atomic64_get(&rkb->rkb_c.tx_bytes),
3578 rd_atomic64_get(&rkb->rkb_c.tx_err), rd_atomic64_get(&rkb->rkb_c.req_timeouts),
3579 rd_atomic64_get(&rkb->rkb_c.rx), rd_atomic64_get(&rkb->rkb_c.rx_bytes),
3580 rd_atomic64_get(&rkb->rkb_c.rx_err),
3581 rd_atomic64_get(&rkb->rkb_c.tx_retries));
3582
3583 fprintf(fp, " %i toppars:\n", rkb->rkb_toppar_cnt);
3584 TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink)
3585 rd_kafka_toppar_dump(fp, " ", rktp);
3586 if (locks) {
3587 rd_kafka_broker_unlock(rkb);
3588 }
3589}
3590
3591
3592static void rd_kafka_dump0 (FILE *fp, rd_kafka_t *rk, int locks) {
3593 rd_kafka_broker_t *rkb;
3594 rd_kafka_itopic_t *rkt;
3595 rd_kafka_toppar_t *rktp;
3596 shptr_rd_kafka_toppar_t *s_rktp;
3597 int i;
3598 unsigned int tot_cnt;
3599 size_t tot_size;
3600
3601 rd_kafka_curr_msgs_get(rk, &tot_cnt, &tot_size);
3602
3603 if (locks)
3604 rd_kafka_rdlock(rk);
3605#if ENABLE_DEVEL
3606 fprintf(fp, "rd_kafka_op_cnt: %d\n", rd_atomic32_get(&rd_kafka_op_cnt));
3607#endif
3608 fprintf(fp, "rd_kafka_t %p: %s\n", rk, rk->rk_name);
3609
3610 fprintf(fp, " producer.msg_cnt %u (%"PRIusz" bytes)\n",
3611 tot_cnt, tot_size);
3612 fprintf(fp, " rk_rep reply queue: %i ops\n",
3613 rd_kafka_q_len(rk->rk_rep));
3614
3615 fprintf(fp, " brokers:\n");
3616 if (locks)
3617 mtx_lock(&rk->rk_internal_rkb_lock);
3618 if (rk->rk_internal_rkb)
3619 rd_kafka_broker_dump(fp, rk->rk_internal_rkb, locks);
3620 if (locks)
3621 mtx_unlock(&rk->rk_internal_rkb_lock);
3622
3623 TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
3624 rd_kafka_broker_dump(fp, rkb, locks);
3625 }
3626
3627 fprintf(fp, " cgrp:\n");
3628 if (rk->rk_cgrp) {
3629 rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
3630 fprintf(fp, " %.*s in state %s, flags 0x%x\n",
3631 RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
3632 rd_kafka_cgrp_state_names[rkcg->rkcg_state],
3633 rkcg->rkcg_flags);
3634 fprintf(fp, " coord_id %"PRId32", broker %s\n",
3635 rkcg->rkcg_coord_id,
3636 rkcg->rkcg_curr_coord ?
3637 rd_kafka_broker_name(rkcg->rkcg_curr_coord):"(none)");
3638
3639 fprintf(fp, " toppars:\n");
3640 RD_LIST_FOREACH(s_rktp, &rkcg->rkcg_toppars, i) {
3641 rktp = rd_kafka_toppar_s2i(s_rktp);
3642 fprintf(fp, " %.*s [%"PRId32"] in state %s\n",
3643 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
3644 rktp->rktp_partition,
3645 rd_kafka_fetch_states[rktp->rktp_fetch_state]);
3646 }
3647 }
3648
3649 fprintf(fp, " topics:\n");
3650 TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
3651 fprintf(fp, " %.*s with %"PRId32" partitions, state %s, "
3652 "refcnt %i\n",
3653 RD_KAFKAP_STR_PR(rkt->rkt_topic),
3654 rkt->rkt_partition_cnt,
3655 rd_kafka_topic_state_names[rkt->rkt_state],
3656 rd_refcnt_get(&rkt->rkt_refcnt));
3657 if (rkt->rkt_ua)
3658 rd_kafka_toppar_dump(fp, " ",
3659 rd_kafka_toppar_s2i(rkt->rkt_ua));
3660 if (rd_list_empty(&rkt->rkt_desp)) {
3661 fprintf(fp, " desired partitions:");
3662 RD_LIST_FOREACH(s_rktp, &rkt->rkt_desp, i)
3663 fprintf(fp, " %"PRId32,
3664 rd_kafka_toppar_s2i(s_rktp)->
3665 rktp_partition);
3666 fprintf(fp, "\n");
3667 }
3668 }
3669
3670 fprintf(fp, "\n");
3671 rd_kafka_metadata_cache_dump(fp, rk);
3672
3673 if (locks)
3674 rd_kafka_rdunlock(rk);
3675}
3676
3677void rd_kafka_dump (FILE *fp, rd_kafka_t *rk) {
3678
3679 if (rk)
3680 rd_kafka_dump0(fp, rk, 1/*locks*/);
3681
3682#if ENABLE_SHAREDPTR_DEBUG
3683 rd_shared_ptrs_dump();
3684#endif
3685}
3686
3687
3688
3689const char *rd_kafka_name (const rd_kafka_t *rk) {
3690 return rk->rk_name;
3691}
3692
3693rd_kafka_type_t rd_kafka_type(const rd_kafka_t *rk) {
3694 return rk->rk_type;
3695}
3696
3697
3698char *rd_kafka_memberid (const rd_kafka_t *rk) {
3699 rd_kafka_op_t *rko;
3700 rd_kafka_cgrp_t *rkcg;
3701 char *memberid;
3702
3703 if (!(rkcg = rd_kafka_cgrp_get(rk)))
3704 return NULL;
3705
3706 rko = rd_kafka_op_req2(rkcg->rkcg_ops, RD_KAFKA_OP_NAME);
3707 if (!rko)
3708 return NULL;
3709 memberid = rko->rko_u.name.str;
3710 rko->rko_u.name.str = NULL;
3711 rd_kafka_op_destroy(rko);
3712
3713 return memberid;
3714}
3715
3716
3717char *rd_kafka_clusterid (rd_kafka_t *rk, int timeout_ms) {
3718 rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);
3719
3720 /* ClusterId is returned in Metadata >=V2 responses and
3721 * cached on the rk. If no cached value is available
3722 * it means no metadata has been received yet, or we're
3723 * using a lower protocol version
3724 * (e.g., lack of api.version.request=true). */
3725
3726 while (1) {
3727 int remains_ms;
3728
3729 rd_kafka_rdlock(rk);
3730
3731 if (rk->rk_clusterid) {
3732 /* Cached clusterid available. */
3733 char *ret = rd_strdup(rk->rk_clusterid);
3734 rd_kafka_rdunlock(rk);
3735 return ret;
3736 } else if (rk->rk_ts_metadata > 0) {
3737 /* Metadata received but no clusterid,
3738 * this probably means the broker is too old
3739 * or api.version.request=false. */
3740 rd_kafka_rdunlock(rk);
3741 return NULL;
3742 }
3743
3744 rd_kafka_rdunlock(rk);
3745
3746 /* Wait for up to timeout_ms for a metadata refresh,
3747 * if permitted by application. */
3748 remains_ms = rd_timeout_remains(abs_timeout);
3749 if (rd_timeout_expired(remains_ms))
3750 return NULL;
3751
3752 rd_kafka_metadata_cache_wait_change(rk, remains_ms);
3753 }
3754
3755 return NULL;
3756}
3757
3758
3759int32_t rd_kafka_controllerid (rd_kafka_t *rk, int timeout_ms) {
3760 rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);
3761
3762 /* ControllerId is returned in Metadata >=V1 responses and
3763 * cached on the rk. If no cached value is available
3764 * it means no metadata has been received yet, or we're
3765 * using a lower protocol version
3766 * (e.g., lack of api.version.request=true). */
3767
3768 while (1) {
3769 int remains_ms;
3770 int version;
3771
3772 version = rd_kafka_brokers_get_state_version(rk);
3773
3774 rd_kafka_rdlock(rk);
3775
3776 if (rk->rk_controllerid != -1) {
3777 /* Cached controllerid available. */
3778 rd_kafka_rdunlock(rk);
3779 return rk->rk_controllerid;
3780 } else if (rk->rk_ts_metadata > 0) {
3781 /* Metadata received but no clusterid,
3782 * this probably means the broker is too old
3783 * or api.version.request=false. */
3784 rd_kafka_rdunlock(rk);
3785 return -1;
3786 }
3787
3788 rd_kafka_rdunlock(rk);
3789
3790 /* Wait for up to timeout_ms for a metadata refresh,
3791 * if permitted by application. */
3792 remains_ms = rd_timeout_remains(abs_timeout);
3793 if (rd_timeout_expired(remains_ms))
3794 return -1;
3795
3796 rd_kafka_brokers_wait_state_change(rk, version, remains_ms);
3797 }
3798
3799 return -1;
3800}
3801
3802
3803void *rd_kafka_opaque (const rd_kafka_t *rk) {
3804 return rk->rk_conf.opaque;
3805}
3806
3807
3808int rd_kafka_outq_len (rd_kafka_t *rk) {
3809 return rd_kafka_curr_msgs_cnt(rk) + rd_kafka_q_len(rk->rk_rep) +
3810 (rk->rk_background.q ? rd_kafka_q_len(rk->rk_background.q) : 0);
3811}
3812
3813
3814rd_kafka_resp_err_t rd_kafka_flush (rd_kafka_t *rk, int timeout_ms) {
3815 unsigned int msg_cnt = 0;
3816 int qlen;
3817 rd_ts_t ts_end = rd_timeout_init(timeout_ms);
3818 int tmout;
3819
3820 if (rk->rk_type != RD_KAFKA_PRODUCER)
3821 return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;
3822
3823 rd_kafka_yield_thread = 0;
3824
3825 /* First poll call is non-blocking for the case
3826 * where timeout_ms==RD_POLL_NOWAIT to make sure poll is
3827 * called at least once. */
3828 tmout = RD_POLL_NOWAIT;
3829 do {
3830 rd_kafka_poll(rk, tmout);
3831 } while (((qlen = rd_kafka_q_len(rk->rk_rep)) > 0 ||
3832 (msg_cnt = rd_kafka_curr_msgs_cnt(rk)) > 0) &&
3833 !rd_kafka_yield_thread &&
3834 (tmout = rd_timeout_remains_limit(ts_end, 10)) !=
3835 RD_POLL_NOWAIT);
3836
3837 return qlen + msg_cnt > 0 ? RD_KAFKA_RESP_ERR__TIMED_OUT :
3838 RD_KAFKA_RESP_ERR_NO_ERROR;
3839}
3840
3841
3842
3843rd_kafka_resp_err_t rd_kafka_purge (rd_kafka_t *rk, int purge_flags) {
3844 rd_kafka_broker_t *rkb;
3845 rd_kafka_q_t *tmpq = NULL;
3846 int waitcnt = 0;
3847
3848 if (rk->rk_type != RD_KAFKA_PRODUCER)
3849 return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;
3850
3851 /* Check that future flags are not passed */
3852 if ((purge_flags & ~RD_KAFKA_PURGE_F_MASK) != 0)
3853 return RD_KAFKA_RESP_ERR__INVALID_ARG;
3854
3855 /* Nothing to purge */
3856 if (!purge_flags)
3857 return RD_KAFKA_RESP_ERR_NO_ERROR;
3858
3859 /* Set up a reply queue to wait for broker thread signalling
3860 * completion, unless non-blocking. */
3861 if (!(purge_flags & RD_KAFKA_PURGE_F_NON_BLOCKING))
3862 tmpq = rd_kafka_q_new(rk);
3863
3864 /* Send purge request to all broker threads */
3865 rd_kafka_rdlock(rk);
3866 TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
3867 rd_kafka_broker_purge_queues(rkb, purge_flags,
3868 RD_KAFKA_REPLYQ(tmpq, 0));
3869 waitcnt++;
3870 }
3871 rd_kafka_rdunlock(rk);
3872
3873 /* The internal broker handler may hold unassigned partitions */
3874 mtx_lock(&rk->rk_internal_rkb_lock);
3875 rd_kafka_broker_purge_queues(rk->rk_internal_rkb, purge_flags,
3876 RD_KAFKA_REPLYQ(tmpq, 0));
3877 mtx_unlock(&rk->rk_internal_rkb_lock);
3878 waitcnt++;
3879
3880
3881 if (tmpq) {
3882 /* Wait for responses */
3883 while (waitcnt-- > 0)
3884 rd_kafka_q_wait_result(tmpq, RD_POLL_INFINITE);
3885
3886 rd_kafka_q_destroy_owner(tmpq);
3887 }
3888
3889 /* Purge messages for the UA(-1) partitions (which are not
3890 * handled by a broker thread) */
3891 if (purge_flags & RD_KAFKA_PURGE_F_QUEUE)
3892 rd_kafka_purge_ua_toppar_queues(rk);
3893
3894 return RD_KAFKA_RESP_ERR_NO_ERROR;
3895}
3896
3897
3898
3899
3900/**
3901 * @returns a csv string of purge flags in thread-local storage
3902 */
3903const char *rd_kafka_purge_flags2str (int flags) {
3904 static const char *names[] = { "queue", "inflight", NULL };
3905 static RD_TLS char ret[64];
3906
3907 return rd_flags2str(ret, sizeof(ret), names, flags);
3908}
3909
3910
3911int rd_kafka_version (void) {
3912 return RD_KAFKA_VERSION;
3913}
3914
3915const char *rd_kafka_version_str (void) {
3916 static RD_TLS char ret[128];
3917 size_t of = 0, r;
3918
3919 if (*ret)
3920 return ret;
3921
3922#ifdef LIBRDKAFKA_GIT_VERSION
3923 if (*LIBRDKAFKA_GIT_VERSION) {
3924 of = rd_snprintf(ret, sizeof(ret), "%s",
3925 *LIBRDKAFKA_GIT_VERSION == 'v' ?
3926 LIBRDKAFKA_GIT_VERSION+1 :
3927 LIBRDKAFKA_GIT_VERSION);
3928 if (of > sizeof(ret))
3929 of = sizeof(ret);
3930 }
3931#endif
3932
3933#define _my_sprintf(...) do { \
3934 r = rd_snprintf(ret+of, sizeof(ret)-of, __VA_ARGS__); \
3935 if (r > sizeof(ret)-of) \
3936 r = sizeof(ret)-of; \
3937 of += r; \
3938 } while(0)
3939
3940 if (of == 0) {
3941 int ver = rd_kafka_version();
3942 int prel = (ver & 0xff);
3943 _my_sprintf("%i.%i.%i",
3944 (ver >> 24) & 0xff,
3945 (ver >> 16) & 0xff,
3946 (ver >> 8) & 0xff);
3947 if (prel != 0xff) {
3948 /* pre-builds below 200 are just running numbers,
3949 * above 200 are RC numbers. */
3950 if (prel <= 200)
3951 _my_sprintf("-pre%d", prel);
3952 else
3953 _my_sprintf("-RC%d", prel - 200);
3954 }
3955 }
3956
3957#if ENABLE_DEVEL
3958 _my_sprintf("-devel");
3959#endif
3960
3961#if ENABLE_SHAREDPTR_DEBUG
3962 _my_sprintf("-shptr");
3963#endif
3964
3965#if WITHOUT_OPTIMIZATION
3966 _my_sprintf("-O0");
3967#endif
3968
3969 return ret;
3970}
3971
3972
3973/**
3974 * Assert trampoline to print some debugging information on crash.
3975 */
3976void
3977RD_NORETURN
3978rd_kafka_crash (const char *file, int line, const char *function,
3979 rd_kafka_t *rk, const char *reason) {
3980 fprintf(stderr, "*** %s:%i:%s: %s ***\n",
3981 file, line, function, reason);
3982 if (rk)
3983 rd_kafka_dump0(stderr, rk, 0/*no locks*/);
3984 abort();
3985}
3986
3987
3988
3989
3990
3991struct list_groups_state {
3992 rd_kafka_q_t *q;
3993 rd_kafka_resp_err_t err;
3994 int wait_cnt;
3995 const char *desired_group;
3996 struct rd_kafka_group_list *grplist;
3997 int grplist_size;
3998};
3999
4000static void rd_kafka_DescribeGroups_resp_cb (rd_kafka_t *rk,
4001 rd_kafka_broker_t *rkb,
4002 rd_kafka_resp_err_t err,
4003 rd_kafka_buf_t *reply,
4004 rd_kafka_buf_t *request,
4005 void *opaque) {
4006 struct list_groups_state *state;
4007 const int log_decode_errors = LOG_ERR;
4008 int cnt;
4009
4010 if (err == RD_KAFKA_RESP_ERR__DESTROY) {
4011 /* 'state' has gone out of scope due to list_groups()
4012 * timing out and returning. */
4013 return;
4014 }
4015
4016 state = opaque;
4017 state->wait_cnt--;
4018
4019 if (err)
4020 goto err;
4021
4022 rd_kafka_buf_read_i32(reply, &cnt);
4023
4024 while (cnt-- > 0) {
4025 int16_t ErrorCode;
4026 rd_kafkap_str_t Group, GroupState, ProtoType, Proto;
4027 int MemberCnt;
4028 struct rd_kafka_group_info *gi;
4029
4030 if (state->grplist->group_cnt == state->grplist_size) {
4031 /* Grow group array */
4032 state->grplist_size *= 2;
4033 state->grplist->groups =
4034 rd_realloc(state->grplist->groups,
4035 state->grplist_size *
4036 sizeof(*state->grplist->groups));
4037 }
4038
4039 gi = &state->grplist->groups[state->grplist->group_cnt++];
4040 memset(gi, 0, sizeof(*gi));
4041
4042 rd_kafka_buf_read_i16(reply, &ErrorCode);
4043 rd_kafka_buf_read_str(reply, &Group);
4044 rd_kafka_buf_read_str(reply, &GroupState);
4045 rd_kafka_buf_read_str(reply, &ProtoType);
4046 rd_kafka_buf_read_str(reply, &Proto);
4047 rd_kafka_buf_read_i32(reply, &MemberCnt);
4048
4049 if (MemberCnt > 100000) {
4050 err = RD_KAFKA_RESP_ERR__BAD_MSG;
4051 goto err;
4052 }
4053
4054 rd_kafka_broker_lock(rkb);
4055 gi->broker.id = rkb->rkb_nodeid;
4056 gi->broker.host = rd_strdup(rkb->rkb_origname);
4057 gi->broker.port = rkb->rkb_port;
4058 rd_kafka_broker_unlock(rkb);
4059
4060 gi->err = ErrorCode;
4061 gi->group = RD_KAFKAP_STR_DUP(&Group);
4062 gi->state = RD_KAFKAP_STR_DUP(&GroupState);
4063 gi->protocol_type = RD_KAFKAP_STR_DUP(&ProtoType);
4064 gi->protocol = RD_KAFKAP_STR_DUP(&Proto);
4065
4066 if (MemberCnt > 0)
4067 gi->members =
4068 rd_malloc(MemberCnt * sizeof(*gi->members));
4069
4070 while (MemberCnt-- > 0) {
4071 rd_kafkap_str_t MemberId, ClientId, ClientHost;
4072 rd_kafkap_bytes_t Meta, Assignment;
4073 struct rd_kafka_group_member_info *mi;
4074
4075 mi = &gi->members[gi->member_cnt++];
4076 memset(mi, 0, sizeof(*mi));
4077
4078 rd_kafka_buf_read_str(reply, &MemberId);
4079 rd_kafka_buf_read_str(reply, &ClientId);
4080 rd_kafka_buf_read_str(reply, &ClientHost);
4081 rd_kafka_buf_read_bytes(reply, &Meta);
4082 rd_kafka_buf_read_bytes(reply, &Assignment);
4083
4084 mi->member_id = RD_KAFKAP_STR_DUP(&MemberId);
4085 mi->client_id = RD_KAFKAP_STR_DUP(&ClientId);
4086 mi->client_host = RD_KAFKAP_STR_DUP(&ClientHost);
4087
4088 if (RD_KAFKAP_BYTES_LEN(&Meta) == 0) {
4089 mi->member_metadata_size = 0;
4090 mi->member_metadata = NULL;
4091 } else {
4092 mi->member_metadata_size =
4093 RD_KAFKAP_BYTES_LEN(&Meta);
4094 mi->member_metadata =
4095 rd_memdup(Meta.data,
4096 mi->member_metadata_size);
4097 }
4098
4099 if (RD_KAFKAP_BYTES_LEN(&Assignment) == 0) {
4100 mi->member_assignment_size = 0;
4101 mi->member_assignment = NULL;
4102 } else {
4103 mi->member_assignment_size =
4104 RD_KAFKAP_BYTES_LEN(&Assignment);
4105 mi->member_assignment =
4106 rd_memdup(Assignment.data,
4107 mi->member_assignment_size);
4108 }
4109 }
4110 }
4111
4112err:
4113 state->err = err;
4114 return;
4115
4116 err_parse:
4117 state->err = reply->rkbuf_err;
4118}
4119
4120static void rd_kafka_ListGroups_resp_cb (rd_kafka_t *rk,
4121 rd_kafka_broker_t *rkb,
4122 rd_kafka_resp_err_t err,
4123 rd_kafka_buf_t *reply,
4124 rd_kafka_buf_t *request,
4125 void *opaque) {
4126 struct list_groups_state *state;
4127 const int log_decode_errors = LOG_ERR;
4128 int16_t ErrorCode;
4129 char **grps;
4130 int cnt, grpcnt, i = 0;
4131
4132 if (err == RD_KAFKA_RESP_ERR__DESTROY) {
4133 /* 'state' is no longer in scope because
4134 * list_groups() timed out and returned to the caller.
4135 * We must not touch anything here but simply return. */
4136 return;
4137 }
4138
4139 state = opaque;
4140
4141 state->wait_cnt--;
4142
4143 if (err)
4144 goto err;
4145
4146 rd_kafka_buf_read_i16(reply, &ErrorCode);
4147 if (ErrorCode) {
4148 err = ErrorCode;
4149 goto err;
4150 }
4151
4152 rd_kafka_buf_read_i32(reply, &cnt);
4153
4154 if (state->desired_group)
4155 grpcnt = 1;
4156 else
4157 grpcnt = cnt;
4158
4159 if (cnt == 0 || grpcnt == 0)
4160 return;
4161
4162 grps = rd_malloc(sizeof(*grps) * grpcnt);
4163
4164 while (cnt-- > 0) {
4165 rd_kafkap_str_t grp, proto;
4166
4167 rd_kafka_buf_read_str(reply, &grp);
4168 rd_kafka_buf_read_str(reply, &proto);
4169
4170 if (state->desired_group &&
4171 rd_kafkap_str_cmp_str(&grp, state->desired_group))
4172 continue;
4173
4174 grps[i++] = RD_KAFKAP_STR_DUP(&grp);
4175
4176 if (i == grpcnt)
4177 break;
4178 }
4179
4180 if (i > 0) {
4181 state->wait_cnt++;
4182 rd_kafka_DescribeGroupsRequest(rkb,
4183 (const char **)grps, i,
4184 RD_KAFKA_REPLYQ(state->q, 0),
4185 rd_kafka_DescribeGroups_resp_cb,
4186 state);
4187
4188 while (i-- > 0)
4189 rd_free(grps[i]);
4190 }
4191
4192
4193 rd_free(grps);
4194
4195err:
4196 state->err = err;
4197 return;
4198
4199 err_parse:
4200 state->err = reply->rkbuf_err;
4201}
4202
4203rd_kafka_resp_err_t
4204rd_kafka_list_groups (rd_kafka_t *rk, const char *group,
4205 const struct rd_kafka_group_list **grplistp,
4206 int timeout_ms) {
4207 rd_kafka_broker_t *rkb;
4208 int rkb_cnt = 0;
4209 struct list_groups_state state = RD_ZERO_INIT;
4210 rd_ts_t ts_end = rd_timeout_init(timeout_ms);
4211 int state_version = rd_kafka_brokers_get_state_version(rk);
4212
4213 /* Wait until metadata has been fetched from cluster so
4214 * that we have a full broker list.
4215 * This state only happens during initial client setup, after that
4216 * there'll always be a cached metadata copy. */
4217 rd_kafka_rdlock(rk);
4218 while (!rk->rk_ts_metadata) {
4219 rd_kafka_rdunlock(rk);
4220
4221 if (!rd_kafka_brokers_wait_state_change(
4222 rk, state_version, rd_timeout_remains(ts_end)))
4223 return RD_KAFKA_RESP_ERR__TIMED_OUT;
4224
4225 rd_kafka_rdlock(rk);
4226 }
4227
4228 state.q = rd_kafka_q_new(rk);
4229 state.desired_group = group;
4230 state.grplist = rd_calloc(1, sizeof(*state.grplist));
4231 state.grplist_size = group ? 1 : 32;
4232
4233 state.grplist->groups = rd_malloc(state.grplist_size *
4234 sizeof(*state.grplist->groups));
4235
4236 /* Query each broker for its list of groups */
4237 TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
4238 rd_kafka_broker_lock(rkb);
4239 if (rkb->rkb_nodeid == -1) {
4240 rd_kafka_broker_unlock(rkb);
4241 continue;
4242 }
4243
4244 state.wait_cnt++;
4245 rd_kafka_ListGroupsRequest(rkb,
4246 RD_KAFKA_REPLYQ(state.q, 0),
4247 rd_kafka_ListGroups_resp_cb,
4248 &state);
4249
4250 rkb_cnt++;
4251
4252 rd_kafka_broker_unlock(rkb);
4253
4254 }
4255 rd_kafka_rdunlock(rk);
4256
4257 if (rkb_cnt == 0) {
4258 state.err = RD_KAFKA_RESP_ERR__TRANSPORT;
4259
4260 } else {
4261 int remains;
4262
4263 while (state.wait_cnt > 0 &&
4264 !rd_timeout_expired((remains =
4265 rd_timeout_remains(ts_end)))) {
4266 rd_kafka_q_serve(state.q, remains, 0,
4267 RD_KAFKA_Q_CB_CALLBACK,
4268 rd_kafka_poll_cb, NULL);
4269 /* Ignore yields */
4270 }
4271 }
4272
4273 rd_kafka_q_destroy_owner(state.q);
4274
4275 if (state.wait_cnt > 0 && !state.err) {
4276 if (state.grplist->group_cnt == 0)
4277 state.err = RD_KAFKA_RESP_ERR__TIMED_OUT;
4278 else {
4279 *grplistp = state.grplist;
4280 return RD_KAFKA_RESP_ERR__PARTIAL;
4281 }
4282 }
4283
4284 if (state.err)
4285 rd_kafka_group_list_destroy(state.grplist);
4286 else
4287 *grplistp = state.grplist;
4288
4289 return state.err;
4290}
4291
4292
4293void rd_kafka_group_list_destroy (const struct rd_kafka_group_list *grplist0) {
4294 struct rd_kafka_group_list *grplist =
4295 (struct rd_kafka_group_list *)grplist0;
4296
4297 while (grplist->group_cnt-- > 0) {
4298 struct rd_kafka_group_info *gi;
4299 gi = &grplist->groups[grplist->group_cnt];
4300
4301 if (gi->broker.host)
4302 rd_free(gi->broker.host);
4303 if (gi->group)
4304 rd_free(gi->group);
4305 if (gi->state)
4306 rd_free(gi->state);
4307 if (gi->protocol_type)
4308 rd_free(gi->protocol_type);
4309 if (gi->protocol)
4310 rd_free(gi->protocol);
4311
4312 while (gi->member_cnt-- > 0) {
4313 struct rd_kafka_group_member_info *mi;
4314 mi = &gi->members[gi->member_cnt];
4315
4316 if (mi->member_id)
4317 rd_free(mi->member_id);
4318 if (mi->client_id)
4319 rd_free(mi->client_id);
4320 if (mi->client_host)
4321 rd_free(mi->client_host);
4322 if (mi->member_metadata)
4323 rd_free(mi->member_metadata);
4324 if (mi->member_assignment)
4325 rd_free(mi->member_assignment);
4326 }
4327
4328 if (gi->members)
4329 rd_free(gi->members);
4330 }
4331
4332 if (grplist->groups)
4333 rd_free(grplist->groups);
4334
4335 rd_free(grplist);
4336}
4337
4338
4339
4340const char *rd_kafka_get_debug_contexts(void) {
4341 return RD_KAFKA_DEBUG_CONTEXTS;
4342}
4343
4344
/**
 * Check whether \p path refers to a directory.
 *
 * @returns 1 if stat() succeeds on \p path and reports a directory,
 *          else 0 (including when stat() fails).
 */
int rd_kafka_path_is_dir (const char *path) {
#ifdef _MSC_VER
        struct _stat info;

        if (_stat(path, &info) != 0)
                return 0;

        return (info.st_mode & S_IFDIR) ? 1 : 0;
#else
        struct stat info;

        if (stat(path, &info) != 0)
                return 0;

        return S_ISDIR(info.st_mode) ? 1 : 0;
#endif
}
4354
4355
4356void rd_kafka_mem_free (rd_kafka_t *rk, void *ptr) {
4357 free(ptr);
4358}
4359
4360
/**
 * @returns the calling context's current errno value.
 */
int rd_kafka_errno (void) {
        int current = errno;

        return current;
}
4364
/**
 * Public entry point that runs rd_unittest().
 *
 * @returns whatever rd_unittest() returns.
 */
int rd_kafka_unittest (void) {
        int result = rd_unittest();

        return result;
}
4368
4369
#if ENABLE_SHAREDPTR_DEBUG
/* List walked by rd_shared_ptrs_dump() and the mutex held while
 * walking it. */
struct rd_shptr0_head rd_shared_ptr_debug_list;
mtx_t rd_shared_ptr_debug_mtx;

/**
 * Print the current op count and every entry on
 * rd_shared_ptr_debug_list to stdout.
 *
 * The list is traversed with rd_shared_ptr_debug_mtx held.
 */
void rd_shared_ptrs_dump (void) {
        rd_shptr0_t *sptr;

        printf("################ Current shared pointers ################\n");
        printf("### op_cnt: %d\n", rd_atomic32_get(&rd_kafka_op_cnt));
        mtx_lock(&rd_shared_ptr_debug_mtx);
        LIST_FOREACH(sptr, &rd_shared_ptr_debug_list, link) {
                /* %p requires a void pointer argument, so cast
                 * explicitly rather than passing typed pointers. */
                printf("# shptr ((%s*)%p): object %p refcnt %d: at %s:%d\n",
                       sptr->typename, (void *)sptr, (void *)sptr->obj,
                       rd_refcnt_get(sptr->ref), sptr->func, sptr->line);
        }
        mtx_unlock(&rd_shared_ptr_debug_mtx);
        printf("#########################################################\n");
}
#endif
4388