| 1 | /* |
| 2 | * librdkafka - Apache Kafka C library |
| 3 | * |
| 4 | * Copyright (c) 2012-2013, Magnus Edenhill |
| 5 | * All rights reserved. |
| 6 | * |
| 7 | * Redistribution and use in source and binary forms, with or without |
| 8 | * modification, are permitted provided that the following conditions are met: |
| 9 | * |
| 10 | * 1. Redistributions of source code must retain the above copyright notice, |
| 11 | * this list of conditions and the following disclaimer. |
| 12 | * 2. Redistributions in binary form must reproduce the above copyright notice, |
| 13 | * this list of conditions and the following disclaimer in the documentation |
| 14 | * and/or other materials provided with the distribution. |
| 15 | * |
| 16 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
| 17 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| 18 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| 19 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
| 20 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| 21 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| 22 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| 23 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| 24 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| 25 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
| 26 | * POSSIBILITY OF SUCH DAMAGE. |
| 27 | */ |
| 28 | |
| 29 | #ifndef _RDKAFKA_INT_H_ |
| 30 | #define _RDKAFKA_INT_H_ |
| 31 | |
| 32 | #ifndef _MSC_VER |
| 33 | #define _GNU_SOURCE /* for strndup() */ |
| 34 | #include <syslog.h> |
| 35 | #else |
| 36 | typedef int mode_t; |
| 37 | #endif |
| 38 | #include <fcntl.h> |
| 39 | |
| 40 | |
| 41 | #include "rdsysqueue.h" |
| 42 | |
| 43 | #include "rdkafka.h" |
| 44 | #include "rd.h" |
| 45 | #include "rdlog.h" |
| 46 | #include "rdtime.h" |
| 47 | #include "rdaddr.h" |
| 48 | #include "rdinterval.h" |
| 49 | #include "rdavg.h" |
| 50 | #include "rdlist.h" |
| 51 | |
| 52 | #if WITH_SSL |
| 53 | #include <openssl/ssl.h> |
| 54 | #endif |
| 55 | |
| 56 | |
| 57 | |
| 58 | |
| 59 | typedef struct rd_kafka_itopic_s rd_kafka_itopic_t; |
| 60 | typedef struct rd_ikafka_s rd_ikafka_t; |
| 61 | |
| 62 | |
/**
 * @brief Assertion that calls rd_kafka_crash() (declared RD_NORETURN)
 *        when \p cond evaluates to false.
 *
 * The source file, line and function of the call site are passed along
 * together with the stringified condition prefixed by "assert: ".
 *
 * @param rk   Client instance forwarded to rd_kafka_crash(); callers in
 *             this file also pass NULL.
 * @param cond Expression that is expected to be true.
 */
#define rd_kafka_assert(rk, cond) do {                                  \
                if (unlikely(!(cond))) {                                \
                        rd_kafka_crash(__FILE__, __LINE__,              \
                                       __FUNCTION__,                    \
                                       (rk), "assert: " # cond);        \
                }                                                       \
        } while (0)
| 68 | |
| 69 | |
| 70 | void |
| 71 | RD_NORETURN |
| 72 | rd_kafka_crash (const char *file, int line, const char *function, |
| 73 | rd_kafka_t *rk, const char *reason); |
| 74 | |
| 75 | |
| 76 | /* Forward declarations */ |
| 77 | struct rd_kafka_s; |
| 78 | struct rd_kafka_itopic_s; |
| 79 | struct rd_kafka_msg_s; |
| 80 | struct rd_kafka_broker_s; |
| 81 | struct rd_kafka_toppar_s; |
| 82 | |
| 83 | typedef RD_SHARED_PTR_TYPE(, struct rd_kafka_toppar_s) shptr_rd_kafka_toppar_t; |
| 84 | typedef RD_SHARED_PTR_TYPE(, struct rd_kafka_itopic_s) shptr_rd_kafka_itopic_t; |
| 85 | |
| 86 | |
| 87 | |
| 88 | #include "rdkafka_op.h" |
| 89 | #include "rdkafka_queue.h" |
| 90 | #include "rdkafka_msg.h" |
| 91 | #include "rdkafka_proto.h" |
| 92 | #include "rdkafka_buf.h" |
| 93 | #include "rdkafka_pattern.h" |
| 94 | #include "rdkafka_conf.h" |
| 95 | #include "rdkafka_transport.h" |
| 96 | #include "rdkafka_timer.h" |
| 97 | #include "rdkafka_assignor.h" |
| 98 | #include "rdkafka_metadata.h" |
| 99 | |
| 100 | |
/**
 * Protocol level sanity limits.
 * NOTE(review): presumably upper bounds applied to counts read from
 * broker responses; the users are not visible in this header.
 */
#define RD_KAFKAP_BROKERS_MAX      10000
#define RD_KAFKAP_TOPICS_MAX       1000000
#define RD_KAFKAP_PARTITIONS_MAX   100000


/** @returns true if offset \p OFF is negative, i.e. a logical offset. */
#define RD_KAFKA_OFFSET_IS_LOGICAL(OFF)  ((OFF) < 0)
| 110 | |
| 111 | |
| 112 | |
| 113 | |
/**
 * @enum rd_kafka_idemp_state_t
 * @brief Idempotent Producer state.
 *
 * @remark The values are consecutive starting at 0 and are used as the
 *         index into the name table in rd_kafka_idemp_state2str().
 */
typedef enum {
        /** Initial state */
        RD_KAFKA_IDEMP_STATE_INIT        = 0,
        /** Instance is terminating */
        RD_KAFKA_IDEMP_STATE_TERM        = 1,
        /** Request new PID */
        RD_KAFKA_IDEMP_STATE_REQ_PID     = 2,
        /** PID requested, waiting for reply */
        RD_KAFKA_IDEMP_STATE_WAIT_PID    = 3,
        /** New PID assigned */
        RD_KAFKA_IDEMP_STATE_ASSIGNED    = 4,
        /** Wait for outstanding ProduceRequests to finish
         *  before resetting and re-requesting a new PID. */
        RD_KAFKA_IDEMP_STATE_DRAIN_RESET = 5,
        /** Wait for outstanding ProduceRequests to finish
         *  before bumping the current epoch. */
        RD_KAFKA_IDEMP_STATE_DRAIN_BUMP  = 6,
} rd_kafka_idemp_state_t;
| 132 | |
| 133 | /** |
| 134 | * @returns the idemp_state_t string representation |
| 135 | */ |
| 136 | static RD_UNUSED const char * |
| 137 | rd_kafka_idemp_state2str (rd_kafka_idemp_state_t state) { |
| 138 | static const char *names[] = { |
| 139 | "Init" , |
| 140 | "Terminate" , |
| 141 | "RequestPID" , |
| 142 | "WaitPID" , |
| 143 | "Assigned" , |
| 144 | "DrainReset" , |
| 145 | "DrainBump" |
| 146 | }; |
| 147 | return names[state]; |
| 148 | } |
| 149 | |
| 150 | |
| 151 | |
| 152 | |
| 153 | /** |
| 154 | * Kafka handle, internal representation of the application's rd_kafka_t. |
| 155 | */ |
| 156 | |
| 157 | typedef RD_SHARED_PTR_TYPE(shptr_rd_ikafka_s, rd_ikafka_t) shptr_rd_ikafka_t; |
| 158 | |
| 159 | struct rd_kafka_s { |
| 160 | rd_kafka_q_t *rk_rep; /* kafka -> application reply queue */ |
| 161 | rd_kafka_q_t *rk_ops; /* any -> rdkafka main thread ops */ |
| 162 | |
| 163 | TAILQ_HEAD(, rd_kafka_broker_s) rk_brokers; |
| 164 | rd_list_t rk_broker_by_id; /* Fast id lookups. */ |
| 165 | rd_atomic32_t rk_broker_cnt; |
| 166 | /**< Number of brokers in state >= UP */ |
| 167 | rd_atomic32_t rk_broker_up_cnt; |
| 168 | /**< Number of logical brokers in state >= UP, this is a sub-set |
| 169 | * of rk_broker_up_cnt. */ |
| 170 | rd_atomic32_t rk_logical_broker_up_cnt; |
| 171 | /**< Number of brokers that are down, only includes brokers |
| 172 | * that have had at least one connection attempt. */ |
| 173 | rd_atomic32_t rk_broker_down_cnt; |
| 174 | /**< Logical brokers currently without an address. |
| 175 | * Used for calculating ERR__ALL_BROKERS_DOWN. */ |
| 176 | rd_atomic32_t rk_broker_addrless_cnt; |
| 177 | |
| 178 | mtx_t rk_internal_rkb_lock; |
| 179 | rd_kafka_broker_t *rk_internal_rkb; |
| 180 | |
| 181 | /* Broadcasting of broker state changes to wake up |
| 182 | * functions waiting for a state change. */ |
| 183 | cnd_t rk_broker_state_change_cnd; |
| 184 | mtx_t rk_broker_state_change_lock; |
| 185 | int rk_broker_state_change_version; |
| 186 | /* List of (rd_kafka_enq_once_t*) objects waiting for broker |
| 187 | * state changes. Protected by rk_broker_state_change_lock. */ |
| 188 | rd_list_t rk_broker_state_change_waiters; /**< (rd_kafka_enq_once_t*) */ |
| 189 | |
| 190 | TAILQ_HEAD(, rd_kafka_itopic_s) rk_topics; |
| 191 | int rk_topic_cnt; |
| 192 | |
| 193 | struct rd_kafka_cgrp_s *rk_cgrp; |
| 194 | |
| 195 | rd_kafka_conf_t rk_conf; |
| 196 | rd_kafka_q_t *rk_logq; /* Log queue if `log.queue` set */ |
| 197 | char rk_name[128]; |
| 198 | rd_kafkap_str_t *rk_client_id; |
| 199 | rd_kafkap_str_t *rk_group_id; /* Consumer group id */ |
| 200 | |
| 201 | int rk_flags; |
| 202 | rd_atomic32_t rk_terminate; /**< Set to RD_KAFKA_DESTROY_F_.. |
| 203 | * flags when the instance |
| 204 | * is being destroyed. |
| 205 | * The value set is the |
| 206 | * destroy flags from |
| 207 | * rd_kafka_destroy*() and |
| 208 | * the two internal flags shown |
| 209 | * below. |
| 210 | * |
| 211 | * Order: |
| 212 | * 1. user_flags | .._F_DESTROY_CALLED |
| 213 | * is set in rd_kafka_destroy*(). |
| 214 | * 2. consumer_close() is called |
| 215 | * for consumers. |
| 216 | * 3. .._F_TERMINATE is set to |
| 217 | * signal all background threads |
| 218 | * to terminate. |
| 219 | */ |
| 220 | |
| 221 | #define RD_KAFKA_DESTROY_F_TERMINATE 0x1 /**< Internal flag to make sure |
| 222 | * rk_terminate is set to non-zero |
| 223 | * value even if user passed |
| 224 | * no destroy flags. */ |
| 225 | #define RD_KAFKA_DESTROY_F_DESTROY_CALLED 0x2 /**< Application has called |
| 226 | * ..destroy*() and we've |
| 227 | * begun the termination |
| 228 | * process. |
| 229 | * This flag is needed to keep |
| 230 | * rk_terminate from being |
| 231 | * 0 when destroy_flags() |
| 232 | * is called with flags=0 |
| 233 | * and before _F_TERMINATE |
| 234 | * has been set. */ |
| 235 | #define RD_KAFKA_DESTROY_F_IMMEDIATE 0x4 /**< Immediate non-blocking |
| 236 | * destruction without waiting |
| 237 | * for all resources |
| 238 | * to be cleaned up. |
| 239 | * WARNING: Memory and resource |
| 240 | * leaks possible. |
| 241 | * This flag automatically sets |
| 242 | * .._NO_CONSUMER_CLOSE. */ |
| 243 | |
| 244 | |
| 245 | rwlock_t rk_lock; |
| 246 | rd_kafka_type_t rk_type; |
| 247 | struct timeval rk_tv_state_change; |
| 248 | |
| 249 | rd_atomic64_t rk_ts_last_poll; /**< Timestamp of last application |
| 250 | * consumer_poll() call |
| 251 | * (or equivalent). |
| 252 | * Used to enforce |
| 253 | * max.poll.interval.ms. |
| 254 | * Only relevant for consumer. */ |
| 255 | /* First fatal error. */ |
| 256 | struct { |
| 257 | rd_atomic32_t err; /**< rd_kafka_resp_err_t */ |
| 258 | char *errstr; /**< Protected by rk_lock */ |
| 259 | int cnt; /**< Number of errors raised, only |
| 260 | * the first one is stored. */ |
| 261 | } rk_fatal; |
| 262 | |
| 263 | rd_atomic32_t rk_last_throttle; /* Last throttle_time_ms value |
| 264 | * from broker. */ |
| 265 | |
| 266 | /* Locks: rd_kafka_*lock() */ |
| 267 | rd_ts_t rk_ts_metadata; /* Timestamp of most recent |
| 268 | * metadata. */ |
| 269 | |
| 270 | struct rd_kafka_metadata *rk_full_metadata; /* Last full metadata. */ |
| 271 | rd_ts_t rk_ts_full_metadata; /* Timestamp of last full metadata. */ |
| 272 | struct rd_kafka_metadata_cache rk_metadata_cache; /* Metadata cache */ |
| 273 | |
| 274 | char *rk_clusterid; /* ClusterId from metadata */ |
| 275 | int32_t rk_controllerid; /* ControllerId from metadata */ |
| 276 | |
| 277 | /* Simple consumer count: |
| 278 | * >0: Running in legacy / Simple Consumer mode, |
| 279 | * 0: No consumers running |
| 280 | * <0: Running in High level consumer mode */ |
| 281 | rd_atomic32_t rk_simple_cnt; |
| 282 | |
| 283 | /** |
| 284 | * Exactly Once Semantics and Idempotent Producer |
| 285 | * |
| 286 | * @locks rk_lock |
| 287 | */ |
| 288 | struct { |
| 289 | rd_kafka_idemp_state_t idemp_state; /**< Idempotent Producer |
| 290 | * state */ |
| 291 | rd_ts_t ts_idemp_state;/**< Last state change */ |
| 292 | rd_kafka_pid_t pid; /**< Current Producer ID and Epoch */ |
| 293 | int epoch_cnt; /**< Number of times pid/epoch changed */ |
| 294 | rd_atomic32_t inflight_toppar_cnt; /**< Current number of |
| 295 | * toppars with inflight |
| 296 | * requests. */ |
| 297 | rd_kafka_timer_t request_pid_tmr; /**< Timer for pid retrieval*/ |
| 298 | |
| 299 | rd_kafkap_str_t *transactional_id; /**< Transactional Id, |
| 300 | * a null string. */ |
| 301 | } rk_eos; |
| 302 | |
| 303 | const rd_kafkap_bytes_t *rk_null_bytes; |
| 304 | |
| 305 | struct { |
| 306 | mtx_t lock; /* Protects access to this struct */ |
| 307 | cnd_t cnd; /* For waking up blocking injectors */ |
| 308 | unsigned int cnt; /* Current message count */ |
| 309 | size_t size; /* Current message size sum */ |
| 310 | unsigned int max_cnt; /* Max limit */ |
| 311 | size_t max_size; /* Max limit */ |
| 312 | } rk_curr_msgs; |
| 313 | |
| 314 | rd_kafka_timers_t rk_timers; |
| 315 | thrd_t rk_thread; |
| 316 | |
| 317 | int rk_initialized; /**< Will be > 0 when the rd_kafka_t |
| 318 | * instance has been fully initialized. */ |
| 319 | |
| 320 | int rk_init_wait_cnt; /**< Number of background threads that |
| 321 | * need to finish initialization. */ |
| 322 | cnd_t rk_init_cnd; /**< Cond-var used to wait for main thread |
| 323 | * to finish its initialization |
| 324 | * before rd_kafka_new() returns. */ |
| 325 | mtx_t rk_init_lock; /**< Lock for rk_init_wait_cnt and rk_init_cnd */ |
| 326 | |
| 327 | /** |
| 328 | * Background thread and queue, |
| 329 | * enabled by setting `background_event_cb()`. |
| 330 | */ |
| 331 | struct { |
| 332 | rd_kafka_q_t *q; /**< Queue served by background thread. */ |
| 333 | thrd_t thread; /**< Background thread. */ |
| 334 | int calling; /**< Indicates whether the event callback |
| 335 | * is being called, reset back to 0 |
| 336 | * when the callback returns. |
| 337 | * This can be used for troubleshooting |
| 338 | * purposes. */ |
| 339 | } rk_background; |
| 340 | |
| 341 | |
| 342 | /* |
| 343 | * Logs, events or actions to rate limit / suppress |
| 344 | */ |
| 345 | struct { |
| 346 | /**< Log: No brokers support Idempotent Producer */ |
| 347 | rd_interval_t no_idemp_brokers; |
| 348 | |
| 349 | /**< Sparse connections: randomly select broker |
| 350 | * to bring up. This interval should allow |
| 351 | * for a previous connection to be established, |
| 352 | * which varies between different environments: |
| 353 | * Use 10 < reconnect.backoff.jitter.ms / 2 < 1000. |
| 354 | */ |
| 355 | rd_interval_t sparse_connect_random; |
| 356 | /**< Lock for sparse_connect_random */ |
| 357 | mtx_t sparse_connect_lock; |
| 358 | } rk_suppress; |
| 359 | |
| 360 | struct { |
| 361 | void *handle; /**< Provider-specific handle struct pointer. |
| 362 | * Typically assigned in provider's .init() */ |
| 363 | } rk_sasl; |
| 364 | }; |
| 365 | |
/**
 * Convenience wrappers for acquiring and releasing the rk_lock
 * read/write lock of client instance \p rk.
 */
#define rd_kafka_rdlock(rk)    rwlock_rdlock(&(rk)->rk_lock)
#define rd_kafka_rdunlock(rk)  rwlock_rdunlock(&(rk)->rk_lock)
#define rd_kafka_wrlock(rk)    rwlock_wrlock(&(rk)->rk_lock)
#define rd_kafka_wrunlock(rk)  rwlock_wrunlock(&(rk)->rk_lock)
| 370 | |
| 371 | |
| 372 | /** |
| 373 | * @brief Add \p cnt messages and of total size \p size bytes to the |
| 374 | * internal bookkeeping of current message counts. |
| 375 | * If the total message count or size after add would exceed the |
| 376 | * configured limits \c queue.buffering.max.messages and |
| 377 | * \c queue.buffering.max.kbytes then depending on the value of |
| 378 | * \p block the function either blocks until enough space is available |
| 379 | * if \p block is 1, else immediately returns |
| 380 | * RD_KAFKA_RESP_ERR__QUEUE_FULL. |
| 381 | * |
| 382 | * @param rdmtx If non-null and \p block is set and blocking is to ensue, |
| 383 | * then unlock this mutex for the duration of the blocking |
| 384 | * and then reacquire with a read-lock. |
| 385 | */ |
| 386 | static RD_INLINE RD_UNUSED rd_kafka_resp_err_t |
| 387 | rd_kafka_curr_msgs_add (rd_kafka_t *rk, unsigned int cnt, size_t size, |
| 388 | int block, rwlock_t *rdlock) { |
| 389 | |
| 390 | if (rk->rk_type != RD_KAFKA_PRODUCER) |
| 391 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
| 392 | |
| 393 | mtx_lock(&rk->rk_curr_msgs.lock); |
| 394 | while (unlikely(rk->rk_curr_msgs.cnt + cnt > |
| 395 | rk->rk_curr_msgs.max_cnt || |
| 396 | (unsigned long long)(rk->rk_curr_msgs.size + size) > |
| 397 | (unsigned long long)rk->rk_curr_msgs.max_size)) { |
| 398 | if (!block) { |
| 399 | mtx_unlock(&rk->rk_curr_msgs.lock); |
| 400 | return RD_KAFKA_RESP_ERR__QUEUE_FULL; |
| 401 | } |
| 402 | |
| 403 | if (rdlock) |
| 404 | rwlock_rdunlock(rdlock); |
| 405 | |
| 406 | cnd_wait(&rk->rk_curr_msgs.cnd, &rk->rk_curr_msgs.lock); |
| 407 | |
| 408 | if (rdlock) |
| 409 | rwlock_rdlock(rdlock); |
| 410 | |
| 411 | } |
| 412 | |
| 413 | rk->rk_curr_msgs.cnt += cnt; |
| 414 | rk->rk_curr_msgs.size += size; |
| 415 | mtx_unlock(&rk->rk_curr_msgs.lock); |
| 416 | |
| 417 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
| 418 | } |
| 419 | |
| 420 | |
| 421 | /** |
| 422 | * @brief Subtract \p cnt messages of total size \p size from the |
| 423 | * current bookkeeping and broadcast a wakeup on the condvar |
| 424 | * for any waiting & blocking threads. |
| 425 | */ |
| 426 | static RD_INLINE RD_UNUSED void |
| 427 | rd_kafka_curr_msgs_sub (rd_kafka_t *rk, unsigned int cnt, size_t size) { |
| 428 | int broadcast = 0; |
| 429 | |
| 430 | if (rk->rk_type != RD_KAFKA_PRODUCER) |
| 431 | return; |
| 432 | |
| 433 | mtx_lock(&rk->rk_curr_msgs.lock); |
| 434 | rd_kafka_assert(NULL, |
| 435 | rk->rk_curr_msgs.cnt >= cnt && |
| 436 | rk->rk_curr_msgs.size >= size); |
| 437 | |
| 438 | /* If the subtraction would pass one of the thresholds |
| 439 | * broadcast a wake-up to any waiting listeners. */ |
| 440 | if ((rk->rk_curr_msgs.cnt >= rk->rk_curr_msgs.max_cnt && |
| 441 | rk->rk_curr_msgs.cnt - cnt < rk->rk_curr_msgs.max_cnt) || |
| 442 | (rk->rk_curr_msgs.size >= rk->rk_curr_msgs.max_size && |
| 443 | rk->rk_curr_msgs.size - size < rk->rk_curr_msgs.max_size)) |
| 444 | broadcast = 1; |
| 445 | |
| 446 | rk->rk_curr_msgs.cnt -= cnt; |
| 447 | rk->rk_curr_msgs.size -= size; |
| 448 | |
| 449 | if (unlikely(broadcast)) |
| 450 | cnd_broadcast(&rk->rk_curr_msgs.cnd); |
| 451 | |
| 452 | mtx_unlock(&rk->rk_curr_msgs.lock); |
| 453 | } |
| 454 | |
| 455 | static RD_INLINE RD_UNUSED void |
| 456 | rd_kafka_curr_msgs_get (rd_kafka_t *rk, unsigned int *cntp, size_t *sizep) { |
| 457 | if (rk->rk_type != RD_KAFKA_PRODUCER) { |
| 458 | *cntp = 0; |
| 459 | *sizep = 0; |
| 460 | return; |
| 461 | } |
| 462 | |
| 463 | mtx_lock(&rk->rk_curr_msgs.lock); |
| 464 | *cntp = rk->rk_curr_msgs.cnt; |
| 465 | *sizep = rk->rk_curr_msgs.size; |
| 466 | mtx_unlock(&rk->rk_curr_msgs.lock); |
| 467 | } |
| 468 | |
| 469 | static RD_INLINE RD_UNUSED int |
| 470 | rd_kafka_curr_msgs_cnt (rd_kafka_t *rk) { |
| 471 | int cnt; |
| 472 | if (rk->rk_type != RD_KAFKA_PRODUCER) |
| 473 | return 0; |
| 474 | |
| 475 | mtx_lock(&rk->rk_curr_msgs.lock); |
| 476 | cnt = rk->rk_curr_msgs.cnt; |
| 477 | mtx_unlock(&rk->rk_curr_msgs.lock); |
| 478 | |
| 479 | return cnt; |
| 480 | } |
| 481 | |
| 482 | |
| 483 | void rd_kafka_destroy_final (rd_kafka_t *rk); |
| 484 | |
| 485 | void rd_kafka_global_init (void); |
| 486 | |
/**
 * @returns non-zero if the RD_KAFKA_DESTROY_F_TERMINATE flag is set in
 *          rk_terminate, i.e. the \p rk handle is terminating.
 *
 * @remark If consumer_close() is called from destroy*() it runs before
 *         _F_TERMINATE is set and can therefore not rely on
 *         rd_kafka_terminating() to learn that the handle is shutting
 *         down. Such code should instead check that rk_terminate is
 *         non-zero (the _F_DESTROY_CALLED flag will be set).
 */
#define rd_kafka_terminating(rk)                                        \
        (rd_atomic32_get(&(rk)->rk_terminate) &                         \
         RD_KAFKA_DESTROY_F_TERMINATE)

/**
 * @returns the destroy flags currently set in rk_terminate that are
 *          also present in \p flags, which may be a subset of \p flags.
 */
#define rd_kafka_destroy_flags_check(rk,flags)                          \
        (rd_atomic32_get(&(rk)->rk_terminate) & (flags))

/**
 * @returns non-zero if RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE is set, in
 *          which case no consumer callbacks, or standard consumer_close
 *          behaviour, should be triggered.
 */
#define rd_kafka_destroy_flags_no_consumer_close(rk)                    \
        rd_kafka_destroy_flags_check(rk,                                \
                                     RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE)

/**
 * @returns true if rk_simple_cnt is greater than zero, which the
 *          rk_simple_cnt member documents as legacy / Simple Consumer
 *          mode.
 */
#define rd_kafka_is_simple_consumer(rk)                                 \
        (rd_atomic32_get(&(rk)->rk_simple_cnt) > 0)
| 514 | int rd_kafka_simple_consumer_add (rd_kafka_t *rk); |
| 515 | |
| 516 | |
/**
 * @returns the \c eos.idempotence configuration value of \p rk, which is
 *          true when idempotency is enabled (producer only).
 */
#define rd_kafka_is_idempotent(rk)  ((rk)->rk_conf.eos.idempotence)

/** Bitmask applied to purge flags, see rd_kafka_purge_flags2str(). */
#define RD_KAFKA_PURGE_F_MASK  0x7
| 523 | const char *rd_kafka_purge_flags2str (int flags); |
| 524 | |
| 525 | |
| 526 | #include "rdkafka_topic.h" |
| 527 | #include "rdkafka_partition.h" |
| 528 | |
| 529 | |
| 530 | |
| 531 | |
| 532 | |
| 533 | |
| 534 | |
| 535 | |
| 536 | |
| 537 | |
| 538 | |
| 539 | |
| 540 | |
| 541 | |
/**
 * Debug contexts.
 *
 * One bit per context. The rd_kafka_dbg() family of macros pastes the
 * context name onto the RD_KAFKA_DBG_ prefix and tests the resulting
 * bit against the configured debug mask.
 */
#define RD_KAFKA_DBG_NONE         0x0
#define RD_KAFKA_DBG_GENERIC      0x0001
#define RD_KAFKA_DBG_BROKER       0x0002
#define RD_KAFKA_DBG_TOPIC        0x0004
#define RD_KAFKA_DBG_METADATA     0x0008
#define RD_KAFKA_DBG_FEATURE      0x0010
#define RD_KAFKA_DBG_QUEUE        0x0020
#define RD_KAFKA_DBG_MSG          0x0040
#define RD_KAFKA_DBG_PROTOCOL     0x0080
#define RD_KAFKA_DBG_CGRP         0x0100
#define RD_KAFKA_DBG_SECURITY     0x0200
#define RD_KAFKA_DBG_FETCH        0x0400
#define RD_KAFKA_DBG_INTERCEPTOR  0x0800
#define RD_KAFKA_DBG_PLUGIN       0x1000
#define RD_KAFKA_DBG_CONSUMER     0x2000
#define RD_KAFKA_DBG_ADMIN        0x4000
#define RD_KAFKA_DBG_EOS          0x8000
#define RD_KAFKA_DBG_ALL          0xffff
| 563 | |
| 564 | void rd_kafka_log0(const rd_kafka_conf_t *conf, |
| 565 | const rd_kafka_t *rk, const char *, int level, |
| 566 | const char *fac, const char *fmt, ...) RD_FORMAT(printf, |
| 567 | 6, 7); |
| 568 | |
/**
 * @brief Log a printf-formatted message at \p level with facility \p fac
 *        using the configuration of client instance \p rk.
 *
 * @remark \p rk is parenthesized in the expansion so that any pointer
 *         expression (e.g. \c *rkp) may be passed.
 */
#define rd_kafka_log(rk,level,fac,...)                                  \
        rd_kafka_log0(&(rk)->rk_conf, (rk), NULL, level, fac, __VA_ARGS__)

/**
 * @brief Log a LOG_DEBUG message for \p rk, but only if debug context
 *        \p ctx (the suffix of an RD_KAFKA_DBG_.. flag, e.g. \c BROKER)
 *        is set in the instance's \c rk_conf.debug mask.
 */
#define rd_kafka_dbg(rk,ctx,fac,...) do {                               \
                if (unlikely((rk)->rk_conf.debug &                      \
                             (RD_KAFKA_DBG_ ## ctx)))                   \
                        rd_kafka_log0(&(rk)->rk_conf, (rk), NULL,       \
                                      LOG_DEBUG, fac, __VA_ARGS__);     \
        } while (0)

/* dbg() not requiring an rk, just the conf object, for early logging */
#define rd_kafka_dbg0(conf,ctx,fac,...) do {                            \
                if (unlikely((conf)->debug & (RD_KAFKA_DBG_ ## ctx)))   \
                        rd_kafka_log0((conf), NULL, NULL,               \
                                      LOG_DEBUG, fac, __VA_ARGS__);     \
        } while (0)
| 583 | |
/**
 * @brief Log a printf-formatted message on behalf of broker \p rkb,
 *        passing a copy of the broker's log name to rd_kafka_log0().
 *
 * NOTE: The local copy of _logname is needed due to rkb_logname_lock
 *       lock-ordering when logging another broker's name in the message.
 *
 * @remark \p rkb is parenthesized everywhere in the expansion so that
 *         any pointer expression (e.g. \c *rkbp) may be passed.
 */
#define rd_rkb_log(rkb,level,fac,...) do {                              \
                char _logname[RD_KAFKA_NODENAME_SIZE];                  \
                mtx_lock(&(rkb)->rkb_logname_lock);                     \
                strncpy(_logname, (rkb)->rkb_logname,                   \
                        sizeof(_logname)-1);                            \
                /* strncpy() does not terminate on truncation */        \
                _logname[sizeof(_logname)-1] = '\0';                    \
                mtx_unlock(&(rkb)->rkb_logname_lock);                   \
                rd_kafka_log0(&(rkb)->rkb_rk->rk_conf,                  \
                              (rkb)->rkb_rk, _logname,                  \
                              level, fac, __VA_ARGS__);                 \
        } while (0)

/**
 * @brief Log a LOG_DEBUG message for broker \p rkb, but only if debug
 *        context \p ctx (the suffix of an RD_KAFKA_DBG_.. flag) is set
 *        in the debug mask of the broker's client instance.
 */
#define rd_rkb_dbg(rkb,ctx,fac,...) do {                                \
                if (unlikely((rkb)->rkb_rk->rk_conf.debug &             \
                             (RD_KAFKA_DBG_ ## ctx))) {                 \
                        rd_rkb_log(rkb, LOG_DEBUG, fac, __VA_ARGS__);   \
                }                                                       \
        } while (0)
| 603 | |
| 604 | |
| 605 | |
| 606 | extern rd_kafka_resp_err_t RD_TLS rd_kafka_last_error_code; |
| 607 | |
| 608 | static RD_UNUSED RD_INLINE |
| 609 | rd_kafka_resp_err_t rd_kafka_set_last_error (rd_kafka_resp_err_t err, |
| 610 | int errnox) { |
| 611 | if (errnox) { |
| 612 | /* MSVC: |
| 613 | * This is the correct way to set errno on Windows, |
| 614 | * but it is still pointless due to different errnos in |
| 615 | * in different runtimes: |
| 616 | * https://social.msdn.microsoft.com/Forums/vstudio/en-US/b4500c0d-1b69-40c7-9ef5-08da1025b5bf/setting-errno-from-within-a-dll?forum=vclanguage/ |
| 617 | * errno is thus highly deprecated, and buggy, on Windows |
| 618 | * when using librdkafka as a dynamically loaded DLL. */ |
| 619 | rd_set_errno(errnox); |
| 620 | } |
| 621 | rd_kafka_last_error_code = err; |
| 622 | return err; |
| 623 | } |
| 624 | |
| 625 | |
| 626 | int rd_kafka_set_fatal_error (rd_kafka_t *rk, rd_kafka_resp_err_t err, |
| 627 | const char *fmt, ...) RD_FORMAT(printf, 3, 4); |
| 628 | |
| 629 | static RD_INLINE RD_UNUSED rd_kafka_resp_err_t |
| 630 | rd_kafka_fatal_error_code (rd_kafka_t *rk) { |
| 631 | return rd_atomic32_get(&rk->rk_fatal.err); |
| 632 | } |
| 633 | |
| 634 | |
| 635 | extern rd_atomic32_t rd_kafka_thread_cnt_curr; |
| 636 | |
| 637 | void rd_kafka_set_thread_name (const char *fmt, ...); |
| 638 | void rd_kafka_set_thread_sysname (const char *fmt, ...); |
| 639 | |
| 640 | int rd_kafka_path_is_dir (const char *path); |
| 641 | |
| 642 | rd_kafka_op_res_t |
| 643 | rd_kafka_poll_cb (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko, |
| 644 | rd_kafka_q_cb_type_t cb_type, void *opaque); |
| 645 | |
| 646 | rd_kafka_resp_err_t rd_kafka_subscribe_rkt (rd_kafka_itopic_t *rkt); |
| 647 | |
| 648 | |
| 649 | /** |
| 650 | * @returns the number of milliseconds the maximum poll interval |
| 651 | * was exceeded, or 0 if not exceeded. |
| 652 | * |
| 653 | * @remark Only relevant for high-level consumer. |
| 654 | * |
| 655 | * @locality any |
| 656 | * @locks none |
| 657 | */ |
| 658 | static RD_INLINE RD_UNUSED int |
| 659 | rd_kafka_max_poll_exceeded (rd_kafka_t *rk) { |
| 660 | rd_ts_t last_poll = rd_atomic64_get(&rk->rk_ts_last_poll); |
| 661 | int exceeded; |
| 662 | |
| 663 | /* Application is blocked in librdkafka function, see |
| 664 | * rd_kafka_app_poll_blocking(). */ |
| 665 | if (last_poll == INT64_MAX) |
| 666 | return 0; |
| 667 | |
| 668 | exceeded = (int)((rd_clock() - last_poll) / 1000ll) - |
| 669 | rk->rk_conf.max_poll_interval_ms; |
| 670 | |
| 671 | if (unlikely(exceeded > 0)) |
| 672 | return exceeded; |
| 673 | |
| 674 | return 0; |
| 675 | } |
| 676 | |
| 677 | /** |
| 678 | * @brief Call on entry to blocking polling function to indicate |
| 679 | * that the application is blocked waiting for librdkafka |
| 680 | * and that max.poll.interval.ms should not be enforced. |
| 681 | * |
| 682 | * Call app_polled() Upon return from the function calling |
| 683 | * this function to register the application's last time of poll. |
| 684 | * |
| 685 | * @remark Only relevant for high-level consumer. |
| 686 | * |
| 687 | * @locality any |
| 688 | * @locks none |
| 689 | */ |
| 690 | static RD_INLINE RD_UNUSED void |
| 691 | rd_kafka_app_poll_blocking (rd_kafka_t *rk) { |
| 692 | rd_atomic64_set(&rk->rk_ts_last_poll, INT64_MAX); |
| 693 | } |
| 694 | |
| 695 | /** |
| 696 | * @brief Set the last application poll time to now. |
| 697 | * |
| 698 | * @remark Only relevant for high-level consumer. |
| 699 | * |
| 700 | * @locality any |
| 701 | * @locks none |
| 702 | */ |
| 703 | static RD_INLINE RD_UNUSED void |
| 704 | rd_kafka_app_polled (rd_kafka_t *rk) { |
| 705 | rd_atomic64_set(&rk->rk_ts_last_poll, rd_clock()); |
| 706 | } |
| 707 | |
| 708 | |
| 709 | /** |
| 710 | * rdkafka_background.c |
| 711 | */ |
| 712 | int rd_kafka_background_thread_main (void *arg); |
| 713 | |
| 714 | #endif /* _RDKAFKA_INT_H_ */ |
| 715 | |