/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012-2013, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#ifndef _RDKAFKA_INT_H_
#define _RDKAFKA_INT_H_

#ifndef _MSC_VER
#define _GNU_SOURCE  /* for strndup() */
#include <syslog.h>
#else
typedef int mode_t;
#endif
#include <fcntl.h>


#include "rdsysqueue.h"

#include "rdkafka.h"
#include "rd.h"
#include "rdlog.h"
#include "rdtime.h"
#include "rdaddr.h"
#include "rdinterval.h"
#include "rdavg.h"
#include "rdlist.h"

#if WITH_SSL
#include <openssl/ssl.h>
#endif



typedef struct rd_kafka_itopic_s rd_kafka_itopic_t;
typedef struct rd_ikafka_s rd_ikafka_t;


#define rd_kafka_assert(rk, cond) do {                                  \
                if (unlikely(!(cond)))                                  \
                        rd_kafka_crash(__FILE__, __LINE__, __FUNCTION__, \
                                       (rk), "assert: " # cond);        \
        } while (0)


void
RD_NORETURN
rd_kafka_crash (const char *file, int line, const char *function,
                rd_kafka_t *rk, const char *reason);


/* Forward declarations */
struct rd_kafka_s;
struct rd_kafka_itopic_s;
struct rd_kafka_msg_s;
struct rd_kafka_broker_s;
struct rd_kafka_toppar_s;

typedef RD_SHARED_PTR_TYPE(, struct rd_kafka_toppar_s) shptr_rd_kafka_toppar_t;
typedef RD_SHARED_PTR_TYPE(, struct rd_kafka_itopic_s) shptr_rd_kafka_itopic_t;



#include "rdkafka_op.h"
#include "rdkafka_queue.h"
#include "rdkafka_msg.h"
#include "rdkafka_proto.h"
#include "rdkafka_buf.h"
#include "rdkafka_pattern.h"
#include "rdkafka_conf.h"
#include "rdkafka_transport.h"
#include "rdkafka_timer.h"
#include "rdkafka_assignor.h"
#include "rdkafka_metadata.h"


/**
 * Protocol level sanity limits
 */
#define RD_KAFKAP_BROKERS_MAX      10000
#define RD_KAFKAP_TOPICS_MAX     1000000
#define RD_KAFKAP_PARTITIONS_MAX  100000


#define RD_KAFKA_OFFSET_IS_LOGICAL(OFF)  ((OFF) < 0)
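
/* All logical offsets are negative: the public RD_KAFKA_OFFSET_* constants
 * from rdkafka.h (RD_KAFKA_OFFSET_BEGINNING = -2, RD_KAFKA_OFFSET_END = -1,
 * RD_KAFKA_OFFSET_STORED = -1000, RD_KAFKA_OFFSET_INVALID = -1001) all
 * satisfy this predicate. Illustrative sketch, where
 * resolve_logical_offset() is a hypothetical helper:
 *
 *   if (RD_KAFKA_OFFSET_IS_LOGICAL(offset))
 *           offset = resolve_logical_offset(rktp, offset);
 */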



/**
 * @enum rd_kafka_idemp_state_t
 * @brief Idempotent Producer state
 */
typedef enum {
        RD_KAFKA_IDEMP_STATE_INIT,        /**< Initial state */
        RD_KAFKA_IDEMP_STATE_TERM,        /**< Instance is terminating */
        RD_KAFKA_IDEMP_STATE_REQ_PID,     /**< Request new PID */
        RD_KAFKA_IDEMP_STATE_WAIT_PID,    /**< PID requested, waiting
                                           *   for reply */
        RD_KAFKA_IDEMP_STATE_ASSIGNED,    /**< New PID assigned */
        RD_KAFKA_IDEMP_STATE_DRAIN_RESET, /**< Wait for outstanding
                                           *   ProduceRequests to finish
                                           *   before resetting and
                                           *   re-requesting a new PID. */
        RD_KAFKA_IDEMP_STATE_DRAIN_BUMP,  /**< Wait for outstanding
                                           *   ProduceRequests to finish
                                           *   before bumping the current
                                           *   epoch. */
} rd_kafka_idemp_state_t;

/**
 * @returns the rd_kafka_idemp_state_t string representation
 */
static RD_UNUSED const char *
rd_kafka_idemp_state2str (rd_kafka_idemp_state_t state) {
        static const char *names[] = {
                "Init",
                "Terminate",
                "RequestPID",
                "WaitPID",
                "Assigned",
                "DrainReset",
                "DrainBump"
        };
        return names[state];
}
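
/* Example: rd_kafka_idemp_state2str(RD_KAFKA_IDEMP_STATE_DRAIN_BUMP)
 * returns "DrainBump".
 * The names[] array above must be kept in sync with the order of the
 * rd_kafka_idemp_state_t enum. */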



/**
 * Kafka handle, internal representation of the application's rd_kafka_t.
 */

typedef RD_SHARED_PTR_TYPE(shptr_rd_ikafka_s, rd_ikafka_t) shptr_rd_ikafka_t;

struct rd_kafka_s {
        rd_kafka_q_t *rk_rep;   /* kafka -> application reply queue */
        rd_kafka_q_t *rk_ops;   /* any -> rdkafka main thread ops */

        TAILQ_HEAD(, rd_kafka_broker_s) rk_brokers;
        rd_list_t     rk_broker_by_id;  /* Fast id lookups. */
        rd_atomic32_t rk_broker_cnt;
        /** Number of brokers in state >= UP */
        rd_atomic32_t rk_broker_up_cnt;
        /** Number of logical brokers in state >= UP; a subset
         *  of rk_broker_up_cnt. */
        rd_atomic32_t rk_logical_broker_up_cnt;
        /** Number of brokers that are down; only includes brokers
         *  that have had at least one connection attempt. */
        rd_atomic32_t rk_broker_down_cnt;
        /** Logical brokers currently without an address.
         *  Used for calculating ERR__ALL_BROKERS_DOWN. */
        rd_atomic32_t rk_broker_addrless_cnt;

        mtx_t              rk_internal_rkb_lock;
        rd_kafka_broker_t *rk_internal_rkb;

        /* Broadcasting of broker state changes to wake up
         * functions waiting for a state change. */
        cnd_t rk_broker_state_change_cnd;
        mtx_t rk_broker_state_change_lock;
        int   rk_broker_state_change_version;
        /* List of (rd_kafka_enq_once_t *) objects waiting for broker
         * state changes. Protected by rk_broker_state_change_lock. */
        rd_list_t rk_broker_state_change_waiters;

        TAILQ_HEAD(, rd_kafka_itopic_s) rk_topics;
        int rk_topic_cnt;

        struct rd_kafka_cgrp_s *rk_cgrp;

        rd_kafka_conf_t  rk_conf;
        rd_kafka_q_t    *rk_logq;      /* Log queue if `log.queue` set */
        char             rk_name[128];
        rd_kafkap_str_t *rk_client_id;
        rd_kafkap_str_t *rk_group_id;  /* Consumer group id */

        int           rk_flags;
        rd_atomic32_t rk_terminate; /**< Set to RD_KAFKA_DESTROY_F_..
                                     *   flags when the instance
                                     *   is being destroyed.
                                     *   The value set is the
                                     *   destroy flags from
                                     *   rd_kafka_destroy*() plus
                                     *   the two internal flags shown
                                     *   below.
                                     *
                                     *   Order:
                                     *   1. user_flags | .._F_DESTROY_CALLED
                                     *      is set in rd_kafka_destroy*().
                                     *   2. consumer_close() is called
                                     *      for consumers.
                                     *   3. .._F_TERMINATE is set to
                                     *      signal all background threads
                                     *      to terminate.
                                     */

#define RD_KAFKA_DESTROY_F_TERMINATE 0x1       /**< Internal flag to make sure
                                                *   rk_terminate is set to a
                                                *   non-zero value even if the
                                                *   user passed no destroy
                                                *   flags. */
#define RD_KAFKA_DESTROY_F_DESTROY_CALLED 0x2  /**< Application has called
                                                *   ..destroy*() and we've
                                                *   begun the termination
                                                *   process.
                                                *   This flag keeps
                                                *   rk_terminate from being 0
                                                *   when destroy_flags() is
                                                *   called with flags=0,
                                                *   before _F_TERMINATE has
                                                *   been set. */
#define RD_KAFKA_DESTROY_F_IMMEDIATE 0x4       /**< Immediate non-blocking
                                                *   destruction without waiting
                                                *   for all resources
                                                *   to be cleaned up.
                                                *   WARNING: Memory and
                                                *   resource leaks possible.
                                                *   This flag automatically
                                                *   sets
                                                *   .._NO_CONSUMER_CLOSE. */

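/* Illustrative termination sequence, following the "Order" documented
 * above (a sketch of the destroy path, not verbatim code):
 *
 *   rd_kafka_destroy(rk);   // or rd_kafka_destroy_flags(rk, user_flags)
 *     1. rk_terminate |= user_flags | RD_KAFKA_DESTROY_F_DESTROY_CALLED;
 *     2. rd_kafka_consumer_close(rk);                  // consumers only
 *     3. rk_terminate |= RD_KAFKA_DESTROY_F_TERMINATE; // threads exit
 */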

        rwlock_t        rk_lock;
        rd_kafka_type_t rk_type;
        struct timeval  rk_tv_state_change;

        rd_atomic64_t   rk_ts_last_poll; /**< Timestamp of last application
                                          *   consumer_poll() call
                                          *   (or equivalent).
                                          *   Used to enforce
                                          *   max.poll.interval.ms.
                                          *   Only relevant for consumer. */
        /* First fatal error. */
        struct {
                rd_atomic32_t err;    /**< rd_kafka_resp_err_t */
                char         *errstr; /**< Protected by rk_lock */
                int           cnt;    /**< Number of errors raised, only
                                       *   the first one is stored. */
        } rk_fatal;

        rd_atomic32_t rk_last_throttle; /* Last throttle_time_ms value
                                         * from broker. */

        /* Locks: rd_kafka_*lock() */
        rd_ts_t rk_ts_metadata; /* Timestamp of most recent
                                 * metadata. */

        struct rd_kafka_metadata *rk_full_metadata;    /* Last full metadata. */
        rd_ts_t                   rk_ts_full_metadata; /* Timestamp of .. */
        struct rd_kafka_metadata_cache rk_metadata_cache; /* Metadata cache */

        char   *rk_clusterid;    /* ClusterId from metadata */
        int32_t rk_controllerid; /* ControllerId from metadata */

        /* Simple consumer count:
         *  >0: Running in legacy / Simple Consumer mode,
         *   0: No consumers running,
         *  <0: Running in High level consumer mode. */
        rd_atomic32_t rk_simple_cnt;

        /**
         * Exactly Once Semantics and Idempotent Producer
         *
         * @locks rk_lock
         */
        struct {
                rd_kafka_idemp_state_t idemp_state; /**< Idempotent Producer
                                                     *   state */
                rd_ts_t ts_idemp_state; /**< Last state change */
                rd_kafka_pid_t pid;     /**< Current Producer ID and Epoch */
                int epoch_cnt;          /**< Number of times pid/epoch
                                         *   changed */
                rd_atomic32_t inflight_toppar_cnt; /**< Current number of
                                                    *   toppars with in-flight
                                                    *   requests. */
                rd_kafka_timer_t request_pid_tmr; /**< Timer for PID
                                                   *   retrieval */

                rd_kafkap_str_t *transactional_id; /**< Transactional Id,
                                                    *   a null string. */
        } rk_eos;

        const rd_kafkap_bytes_t *rk_null_bytes;

        struct {
                mtx_t        lock;     /* Protects access to this struct */
                cnd_t        cnd;      /* For waking up blocking injectors */
                unsigned int cnt;      /* Current message count */
                size_t       size;     /* Current message size sum */
                unsigned int max_cnt;  /* Max limit */
                size_t       max_size; /* Max limit */
        } rk_curr_msgs;

        rd_kafka_timers_t rk_timers;
        thrd_t            rk_thread;

        int rk_initialized;   /**< Will be > 0 when the rd_kafka_t
                               *   instance has been fully initialized. */

        int rk_init_wait_cnt; /**< Number of background threads that
                               *   need to finish initialization. */
        cnd_t rk_init_cnd;    /**< Cond-var used to wait for the main thread
                               *   to finish its initialization before
                               *   rd_kafka_new() returns. */
        mtx_t rk_init_lock;   /**< Lock for rk_init_wait_cnt and
                               *   rk_init_cnd */

        /**
         * Background thread and queue,
         * enabled by setting the `background_event_cb` on the conf object.
         */
        struct {
                rd_kafka_q_t *q;      /**< Queue served by background
                                       *   thread. */
                thrd_t thread;        /**< Background thread. */
                int calling;          /**< Indicates whether the event callback
                                       *   is being called; reset back to 0
                                       *   when the callback returns.
                                       *   This can be used for
                                       *   troubleshooting purposes. */
        } rk_background;


        /*
         * Logs, events or actions to rate limit / suppress
         */
        struct {
                /** Log: No brokers support Idempotent Producer */
                rd_interval_t no_idemp_brokers;

                /** Sparse connections: randomly select a broker
                 *  to bring up. This interval should allow
                 *  for a previous connection to be established,
                 *  which varies between different environments:
                 *  Use 10 < reconnect.backoff.jitter.ms / 2 < 1000.
                 */
                rd_interval_t sparse_connect_random;
                /** Lock for sparse_connect_random */
                mtx_t sparse_connect_lock;
        } rk_suppress;

        struct {
                void *handle; /**< Provider-specific handle struct pointer.
                               *   Typically assigned in the provider's
                               *   .init() */
        } rk_sasl;
};

#define rd_kafka_wrlock(rk)   rwlock_wrlock(&(rk)->rk_lock)
#define rd_kafka_rdlock(rk)   rwlock_rdlock(&(rk)->rk_lock)
#define rd_kafka_rdunlock(rk) rwlock_rdunlock(&(rk)->rk_lock)
#define rd_kafka_wrunlock(rk) rwlock_wrunlock(&(rk)->rk_lock)
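
/* Usage sketch of the handle rwlock (the field accesses shown are
 * illustrative):
 *
 *   rd_kafka_rdlock(rk);
 *   controllerid = rk->rk_controllerid;      // shared state: read lock
 *   rd_kafka_rdunlock(rk);
 *
 *   rd_kafka_wrlock(rk);
 *   rk->rk_controllerid = new_controllerid;  // mutation: write lock
 *   rd_kafka_wrunlock(rk);
 */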


/**
 * @brief Add \p cnt messages of total size \p size bytes to the
 *        internal bookkeeping of current message counts.
 *        If the total message count or size after the addition would exceed
 *        the configured limits \c queue.buffering.max.messages and
 *        \c queue.buffering.max.kbytes, then, if \p block is 1, the
 *        function blocks until enough space is available; otherwise it
 *        immediately returns RD_KAFKA_RESP_ERR__QUEUE_FULL.
 *
 * @param rdlock If non-null and \p block is set and blocking is to ensue,
 *               then unlock this mutex for the duration of the blocking
 *               and then reacquire with a read-lock.
 */
static RD_INLINE RD_UNUSED rd_kafka_resp_err_t
rd_kafka_curr_msgs_add (rd_kafka_t *rk, unsigned int cnt, size_t size,
                        int block, rwlock_t *rdlock) {

        if (rk->rk_type != RD_KAFKA_PRODUCER)
                return RD_KAFKA_RESP_ERR_NO_ERROR;

        mtx_lock(&rk->rk_curr_msgs.lock);
        while (unlikely(rk->rk_curr_msgs.cnt + cnt >
                        rk->rk_curr_msgs.max_cnt ||
                        (unsigned long long)(rk->rk_curr_msgs.size + size) >
                        (unsigned long long)rk->rk_curr_msgs.max_size)) {
                if (!block) {
                        mtx_unlock(&rk->rk_curr_msgs.lock);
                        return RD_KAFKA_RESP_ERR__QUEUE_FULL;
                }

                if (rdlock)
                        rwlock_rdunlock(rdlock);

                cnd_wait(&rk->rk_curr_msgs.cnd, &rk->rk_curr_msgs.lock);

                if (rdlock)
                        rwlock_rdlock(rdlock);
        }

        rk->rk_curr_msgs.cnt  += cnt;
        rk->rk_curr_msgs.size += size;
        mtx_unlock(&rk->rk_curr_msgs.lock);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
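
/* Hedged usage sketch: a produce path would typically reserve space before
 * enqueuing a message and release it when the message leaves the queue
 * (the call site shown is illustrative, not the actual produce code):
 *
 *   err = rd_kafka_curr_msgs_add(rk, 1, len,
 *                                (msgflags & RD_KAFKA_MSG_F_BLOCK) ? 1 : 0,
 *                                NULL);
 *   if (err)
 *           return err;  // RD_KAFKA_RESP_ERR__QUEUE_FULL when non-blocking
 *   ...
 *   rd_kafka_curr_msgs_sub(rk, 1, len);  // on delivery or permanent failure
 */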


/**
 * @brief Subtract \p cnt messages of total size \p size from the
 *        current bookkeeping and broadcast a wakeup on the condvar
 *        for any waiting & blocking threads.
 */
static RD_INLINE RD_UNUSED void
rd_kafka_curr_msgs_sub (rd_kafka_t *rk, unsigned int cnt, size_t size) {
        int broadcast = 0;

        if (rk->rk_type != RD_KAFKA_PRODUCER)
                return;

        mtx_lock(&rk->rk_curr_msgs.lock);
        rd_kafka_assert(NULL,
                        rk->rk_curr_msgs.cnt >= cnt &&
                        rk->rk_curr_msgs.size >= size);

        /* If the subtraction would pass one of the thresholds
         * broadcast a wake-up to any waiting listeners. */
        if ((rk->rk_curr_msgs.cnt >= rk->rk_curr_msgs.max_cnt &&
             rk->rk_curr_msgs.cnt - cnt < rk->rk_curr_msgs.max_cnt) ||
            (rk->rk_curr_msgs.size >= rk->rk_curr_msgs.max_size &&
             rk->rk_curr_msgs.size - size < rk->rk_curr_msgs.max_size))
                broadcast = 1;

        rk->rk_curr_msgs.cnt  -= cnt;
        rk->rk_curr_msgs.size -= size;

        if (unlikely(broadcast))
                cnd_broadcast(&rk->rk_curr_msgs.cnd);

        mtx_unlock(&rk->rk_curr_msgs.lock);
}

static RD_INLINE RD_UNUSED void
rd_kafka_curr_msgs_get (rd_kafka_t *rk, unsigned int *cntp, size_t *sizep) {
        if (rk->rk_type != RD_KAFKA_PRODUCER) {
                *cntp  = 0;
                *sizep = 0;
                return;
        }

        mtx_lock(&rk->rk_curr_msgs.lock);
        *cntp  = rk->rk_curr_msgs.cnt;
        *sizep = rk->rk_curr_msgs.size;
        mtx_unlock(&rk->rk_curr_msgs.lock);
}

static RD_INLINE RD_UNUSED int
rd_kafka_curr_msgs_cnt (rd_kafka_t *rk) {
        int cnt;
        if (rk->rk_type != RD_KAFKA_PRODUCER)
                return 0;

        mtx_lock(&rk->rk_curr_msgs.lock);
        cnt = rk->rk_curr_msgs.cnt;
        mtx_unlock(&rk->rk_curr_msgs.lock);

        return cnt;
}

void rd_kafka_destroy_final (rd_kafka_t *rk);

void rd_kafka_global_init (void);

/**
 * @returns true if \p rk handle is terminating.
 *
 * @remark If consumer_close() is called from destroy*() it will be
 *         called prior to _F_TERMINATE being set and will thus not
 *         be able to use rd_kafka_terminating() to know it is shutting down.
 *         That code should instead just check that rk_terminate is non-zero
 *         (the _F_DESTROY_CALLED flag will be set).
 */
#define rd_kafka_terminating(rk) (rd_atomic32_get(&(rk)->rk_terminate) & \
                                  RD_KAFKA_DESTROY_F_TERMINATE)
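
/* Typical use in an internal thread loop (sketch; the loop body is
 * illustrative):
 *
 *   while (!rd_kafka_terminating(rk)) {
 *           ... serve queues / io ...
 *   }
 */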

/**
 * @returns the destroy flags set matching \p flags, which might be
 *          a subset of the flags.
 */
#define rd_kafka_destroy_flags_check(rk,flags) \
        (rd_atomic32_get(&(rk)->rk_terminate) & (flags))

/**
 * @returns true if consumer callbacks and the standard consumer_close
 *          behaviour should not be triggered. */
#define rd_kafka_destroy_flags_no_consumer_close(rk) \
        rd_kafka_destroy_flags_check(rk, RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE)

#define rd_kafka_is_simple_consumer(rk) \
        (rd_atomic32_get(&(rk)->rk_simple_cnt) > 0)
int rd_kafka_simple_consumer_add (rd_kafka_t *rk);


/**
 * @returns true if idempotency is enabled (producer only).
 */
#define rd_kafka_is_idempotent(rk) ((rk)->rk_conf.eos.idempotence)

#define RD_KAFKA_PURGE_F_MASK 0x7
const char *rd_kafka_purge_flags2str (int flags);
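
/* RD_KAFKA_PURGE_F_MASK covers the public purge flags from rdkafka.h:
 * RD_KAFKA_PURGE_F_QUEUE (0x1), RD_KAFKA_PURGE_F_INFLIGHT (0x2) and
 * RD_KAFKA_PURGE_F_NON_BLOCKING (0x4). Example application call:
 *
 *   rd_kafka_purge(rk, RD_KAFKA_PURGE_F_QUEUE | RD_KAFKA_PURGE_F_INFLIGHT);
 */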


#include "rdkafka_topic.h"
#include "rdkafka_partition.h"



/**
 * Debug contexts
 */
#define RD_KAFKA_DBG_GENERIC     0x1
#define RD_KAFKA_DBG_BROKER      0x2
#define RD_KAFKA_DBG_TOPIC       0x4
#define RD_KAFKA_DBG_METADATA    0x8
#define RD_KAFKA_DBG_FEATURE     0x10
#define RD_KAFKA_DBG_QUEUE       0x20
#define RD_KAFKA_DBG_MSG         0x40
#define RD_KAFKA_DBG_PROTOCOL    0x80
#define RD_KAFKA_DBG_CGRP        0x100
#define RD_KAFKA_DBG_SECURITY    0x200
#define RD_KAFKA_DBG_FETCH       0x400
#define RD_KAFKA_DBG_INTERCEPTOR 0x800
#define RD_KAFKA_DBG_PLUGIN      0x1000
#define RD_KAFKA_DBG_CONSUMER    0x2000
#define RD_KAFKA_DBG_ADMIN       0x4000
#define RD_KAFKA_DBG_EOS         0x8000
#define RD_KAFKA_DBG_ALL         0xffff
#define RD_KAFKA_DBG_NONE        0x0
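
/* These bits map one-to-one to the `debug` configuration property, e.g.
 * "debug=broker,protocol" yields:
 *
 *   rk->rk_conf.debug == (RD_KAFKA_DBG_BROKER | RD_KAFKA_DBG_PROTOCOL)
 */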

void rd_kafka_log0 (const rd_kafka_conf_t *conf,
                    const rd_kafka_t *rk, const char *extra, int level,
                    const char *fac, const char *fmt, ...)
        RD_FORMAT(printf, 6, 7);

#define rd_kafka_log(rk,level,fac,...) \
        rd_kafka_log0(&(rk)->rk_conf, rk, NULL, level, fac, __VA_ARGS__)
#define rd_kafka_dbg(rk,ctx,fac,...) do {                               \
                if (unlikely((rk)->rk_conf.debug & (RD_KAFKA_DBG_ ## ctx))) \
                        rd_kafka_log0(&(rk)->rk_conf, rk, NULL,         \
                                      LOG_DEBUG, fac, __VA_ARGS__);     \
        } while (0)

/* dbg() not requiring an rk, just the conf object, for early logging */
#define rd_kafka_dbg0(conf,ctx,fac,...) do {                            \
                if (unlikely((conf)->debug & (RD_KAFKA_DBG_ ## ctx)))   \
                        rd_kafka_log0(conf, NULL, NULL,                 \
                                      LOG_DEBUG, fac, __VA_ARGS__);     \
        } while (0)
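
/* Example (facility string and format are illustrative):
 *
 *   rd_kafka_dbg(rk, METADATA, "METADATA",
 *                "Requesting metadata for %d topic(s)", topic_cnt);
 *
 * The log call is skipped at runtime unless the METADATA bit is set in
 * rk_conf.debug (i.e. "metadata" is included in the `debug` property). */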

/* NOTE: The local copy of _logname is needed due to rkb_logname_lock
 * lock-ordering when logging another broker's name in the message. */
#define rd_rkb_log(rkb,level,fac,...) do {                              \
                char _logname[RD_KAFKA_NODENAME_SIZE];                  \
                mtx_lock(&(rkb)->rkb_logname_lock);                     \
                strncpy(_logname, (rkb)->rkb_logname, sizeof(_logname)-1); \
                _logname[RD_KAFKA_NODENAME_SIZE-1] = '\0';              \
                mtx_unlock(&(rkb)->rkb_logname_lock);                   \
                rd_kafka_log0(&(rkb)->rkb_rk->rk_conf,                  \
                              (rkb)->rkb_rk, _logname,                  \
                              level, fac, __VA_ARGS__);                 \
        } while (0)

#define rd_rkb_dbg(rkb,ctx,fac,...) do {                                \
                if (unlikely((rkb)->rkb_rk->rk_conf.debug &             \
                             (RD_KAFKA_DBG_ ## ctx))) {                 \
                        rd_rkb_log(rkb, LOG_DEBUG, fac, __VA_ARGS__);   \
                }                                                       \
        } while (0)



extern rd_kafka_resp_err_t RD_TLS rd_kafka_last_error_code;

static RD_UNUSED RD_INLINE
rd_kafka_resp_err_t rd_kafka_set_last_error (rd_kafka_resp_err_t err,
                                             int errnox) {
        if (errnox) {
                /* MSVC:
                 * This is the correct way to set errno on Windows,
                 * but it is still pointless due to different errnos in
                 * different runtimes:
                 * https://social.msdn.microsoft.com/Forums/vstudio/en-US/b4500c0d-1b69-40c7-9ef5-08da1025b5bf/setting-errno-from-within-a-dll?forum=vclanguage/
                 * errno is thus unreliable, and effectively deprecated,
                 * on Windows when using librdkafka as a dynamically
                 * loaded DLL. */
                rd_set_errno(errnox);
        }
        rd_kafka_last_error_code = err;
        return err;
}
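
/* Typical use in a public API entry point (sketch; the validation shown
 * is illustrative):
 *
 *   if (!topic)
 *           return rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG,
 *                                          EINVAL);
 */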


int rd_kafka_set_fatal_error (rd_kafka_t *rk, rd_kafka_resp_err_t err,
                              const char *fmt, ...) RD_FORMAT(printf, 3, 4);

static RD_INLINE RD_UNUSED rd_kafka_resp_err_t
rd_kafka_fatal_error_code (rd_kafka_t *rk) {
        return rd_atomic32_get(&rk->rk_fatal.err);
}


extern rd_atomic32_t rd_kafka_thread_cnt_curr;

void rd_kafka_set_thread_name (const char *fmt, ...);
void rd_kafka_set_thread_sysname (const char *fmt, ...);

int rd_kafka_path_is_dir (const char *path);

rd_kafka_op_res_t
rd_kafka_poll_cb (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
                  rd_kafka_q_cb_type_t cb_type, void *opaque);

rd_kafka_resp_err_t rd_kafka_subscribe_rkt (rd_kafka_itopic_t *rkt);

/**
 * @returns the number of milliseconds the maximum poll interval
 *          was exceeded, or 0 if not exceeded.
 *
 * @remark Only relevant for high-level consumer.
 *
 * @locality any
 * @locks none
 */
static RD_INLINE RD_UNUSED int
rd_kafka_max_poll_exceeded (rd_kafka_t *rk) {
        rd_ts_t last_poll = rd_atomic64_get(&rk->rk_ts_last_poll);
        int exceeded;

        /* Application is blocked in a librdkafka function, see
         * rd_kafka_app_poll_blocking(). */
        if (last_poll == INT64_MAX)
                return 0;

        exceeded = (int)((rd_clock() - last_poll) / 1000ll) -
                rk->rk_conf.max_poll_interval_ms;

        if (unlikely(exceeded > 0))
                return exceeded;

        return 0;
}
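
/* Worked example: with max.poll.interval.ms=300000 and the last poll
 * 305 seconds ago, rd_clock() - last_poll is 305000000 us; dividing by
 * 1000 gives 305000 ms, and subtracting 300000 yields 5000: the poll
 * interval was exceeded by 5 seconds. */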

/**
 * @brief Call on entry to a blocking polling function to indicate
 *        that the application is blocked waiting for librdkafka
 *        and that max.poll.interval.ms should not be enforced.
 *
 *        Call rd_kafka_app_polled() upon return from the blocking
 *        function to register the application's last time of poll.
 *
 * @remark Only relevant for high-level consumer.
 *
 * @locality any
 * @locks none
 */
static RD_INLINE RD_UNUSED void
rd_kafka_app_poll_blocking (rd_kafka_t *rk) {
        rd_atomic64_set(&rk->rk_ts_last_poll, INT64_MAX);
}

/**
 * @brief Set the last application poll time to now.
 *
 * @remark Only relevant for high-level consumer.
 *
 * @locality any
 * @locks none
 */
static RD_INLINE RD_UNUSED void
rd_kafka_app_polled (rd_kafka_t *rk) {
        rd_atomic64_set(&rk->rk_ts_last_poll, rd_clock());
}
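
/* The two functions above bracket a blocking call (usage sketch; the
 * blocking operation shown is illustrative):
 *
 *   rd_kafka_app_poll_blocking(rk);   // suspend max.poll.interval.ms
 *   ... block on queue pop / cnd_timedwait ...
 *   rd_kafka_app_polled(rk);          // register application poll time
 */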


/**
 * rdkafka_background.c
 */
int rd_kafka_background_thread_main (void *arg);

#endif /* _RDKAFKA_INT_H_ */