1/*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012-2013, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29#ifndef _RDKAFKA_INT_H_
30#define _RDKAFKA_INT_H_
31
32#ifndef _MSC_VER
33#define _GNU_SOURCE /* for strndup() */
34#include <syslog.h>
35#else
36typedef int mode_t;
37#endif
38#include <fcntl.h>
39
40
41#include "rdsysqueue.h"
42
43#include "rdkafka.h"
44#include "rd.h"
45#include "rdlog.h"
46#include "rdtime.h"
47#include "rdaddr.h"
48#include "rdinterval.h"
49#include "rdavg.h"
50#include "rdlist.h"
51
52#if WITH_SSL
53#include <openssl/ssl.h>
54#endif
55
56
57
58
/* Short type names for the internal topic and internal kafka handle
 * structs. Only the struct tags are needed here; the struct bodies are
 * not defined in this part of the header. */
typedef struct rd_kafka_itopic_s rd_kafka_itopic_t;
typedef struct rd_ikafka_s rd_ikafka_t;
61
62
/**
 * @brief Call rd_kafka_crash() if \p cond evaluates to false.
 *
 * The call site's file, line and function name, the \p rk handle and the
 * stringified condition (prefixed with "assert: ") are passed to
 * rd_kafka_crash().
 *
 * @remark \p cond is evaluated exactly once, \p rk only on failure.
 */
#define rd_kafka_assert(rk, cond) do { \
                if (unlikely(!(cond))) \
                        rd_kafka_crash(__FILE__,__LINE__, __FUNCTION__, \
                                       (rk), "assert: " # cond); \
        } while (0)
68
69
/**
 * @brief Crash handler invoked by rd_kafka_assert() on a failed assertion.
 *
 * Declared RD_NORETURN: callers may assume control does not come back.
 *
 * @param file     Source file of the call site (__FILE__).
 * @param line     Source line of the call site (__LINE__).
 * @param function Function name of the call site.
 * @param rk       Client instance, rd_kafka_assert() callers in this header
 *                 also pass NULL.
 * @param reason   Human readable reason, e.g. "assert: <condition>".
 */
void
RD_NORETURN
rd_kafka_crash (const char *file, int line, const char *function,
                rd_kafka_t *rk, const char *reason);
74
75
/* Forward declarations of structs that are referenced by the
 * rdkafka_*.h headers included further down. */
struct rd_kafka_s;
struct rd_kafka_itopic_s;
struct rd_kafka_msg_s;
struct rd_kafka_broker_s;
struct rd_kafka_toppar_s;

/* Shared pointer types for toppar and itopic objects, declared through
 * the RD_SHARED_PTR_TYPE() helper macro. */
typedef RD_SHARED_PTR_TYPE(, struct rd_kafka_toppar_s) shptr_rd_kafka_toppar_t;
typedef RD_SHARED_PTR_TYPE(, struct rd_kafka_itopic_s) shptr_rd_kafka_itopic_t;
85
86
87
88#include "rdkafka_op.h"
89#include "rdkafka_queue.h"
90#include "rdkafka_msg.h"
91#include "rdkafka_proto.h"
92#include "rdkafka_buf.h"
93#include "rdkafka_pattern.h"
94#include "rdkafka_conf.h"
95#include "rdkafka_transport.h"
96#include "rdkafka_timer.h"
97#include "rdkafka_assignor.h"
98#include "rdkafka_metadata.h"
99
100
/**
 * Protocol level sanity limits: upper bounds for the number of brokers,
 * topics and partitions.
 */
#define RD_KAFKAP_BROKERS_MAX 10000
#define RD_KAFKAP_TOPICS_MAX 1000000
#define RD_KAFKAP_PARTITIONS_MAX 100000


/**
 * @returns true if offset \p OFF is negative, which this macro treats as
 *          a logical (as opposed to absolute) offset.
 */
#define RD_KAFKA_OFFSET_IS_LOGICAL(OFF) ((OFF) < 0)
110
111
112
113
/**
 * @enum Idempotent Producer state
 *
 * @remark rd_kafka_idemp_state2str() indexes its name table by these
 *         values: keep the enumerators and that table in the same order.
 */
typedef enum {
        RD_KAFKA_IDEMP_STATE_INIT, /**< Initial state */
        RD_KAFKA_IDEMP_STATE_TERM, /**< Instance is terminating */
        RD_KAFKA_IDEMP_STATE_REQ_PID, /**< Request new PID */
        RD_KAFKA_IDEMP_STATE_WAIT_PID, /**< PID requested, waiting for reply */
        RD_KAFKA_IDEMP_STATE_ASSIGNED, /**< New PID assigned */
        RD_KAFKA_IDEMP_STATE_DRAIN_RESET, /**< Wait for outstanding
                                           * ProduceRequests to finish
                                           * before resetting and
                                           * re-requesting a new PID. */
        RD_KAFKA_IDEMP_STATE_DRAIN_BUMP, /**< Wait for outstanding
                                          * ProduceRequests to finish
                                          * before bumping the current
                                          * epoch. */
} rd_kafka_idemp_state_t;
132
133/**
134 * @returns the idemp_state_t string representation
135 */
136static RD_UNUSED const char *
137rd_kafka_idemp_state2str (rd_kafka_idemp_state_t state) {
138 static const char *names[] = {
139 "Init",
140 "Terminate",
141 "RequestPID",
142 "WaitPID",
143 "Assigned",
144 "DrainReset",
145 "DrainBump"
146 };
147 return names[state];
148}
149
150
151
152
/**
 * Shared pointer type for rd_ikafka_t.
 */

typedef RD_SHARED_PTR_TYPE(shptr_rd_ikafka_s, rd_ikafka_t) shptr_rd_ikafka_t;

/**
 * Kafka handle, internal representation of the application's rd_kafka_t.
 */
struct rd_kafka_s {
        rd_kafka_q_t *rk_rep; /* kafka -> application reply queue */
        rd_kafka_q_t *rk_ops; /* any -> rdkafka main thread ops */

        TAILQ_HEAD(, rd_kafka_broker_s) rk_brokers;
        rd_list_t rk_broker_by_id; /* Fast id lookups. */
        rd_atomic32_t rk_broker_cnt;
        /** Number of brokers in state >= UP */
        rd_atomic32_t rk_broker_up_cnt;
        /** Number of logical brokers in state >= UP, this is a sub-set
         *  of rk_broker_up_cnt. */
        rd_atomic32_t rk_logical_broker_up_cnt;
        /** Number of brokers that are down, only includes brokers
         *  that have had at least one connection attempt. */
        rd_atomic32_t rk_broker_down_cnt;
        /** Logical brokers currently without an address.
         *  Used for calculating ERR__ALL_BROKERS_DOWN. */
        rd_atomic32_t rk_broker_addrless_cnt;

        mtx_t rk_internal_rkb_lock;
        rd_kafka_broker_t *rk_internal_rkb;

        /* Broadcasting of broker state changes to wake up
         * functions waiting for a state change. */
        cnd_t rk_broker_state_change_cnd;
        mtx_t rk_broker_state_change_lock;
        int rk_broker_state_change_version;
        /* List of (rd_kafka_enq_once_t*) objects waiting for broker
         * state changes. Protected by rk_broker_state_change_lock. */
        rd_list_t rk_broker_state_change_waiters; /**< (rd_kafka_enq_once_t*) */

        TAILQ_HEAD(, rd_kafka_itopic_s) rk_topics;
        int rk_topic_cnt;

        struct rd_kafka_cgrp_s *rk_cgrp;

        rd_kafka_conf_t rk_conf;
        rd_kafka_q_t *rk_logq; /* Log queue if `log.queue` set */
        char rk_name[128];
        rd_kafkap_str_t *rk_client_id;
        rd_kafkap_str_t *rk_group_id; /* Consumer group id */

        int rk_flags;
        rd_atomic32_t rk_terminate; /**< Set to RD_KAFKA_DESTROY_F_..
                                     * flags when the instance
                                     * is being destroyed.
                                     * The value set is the
                                     * destroy flags from
                                     * rd_kafka_destroy*() and
                                     * the two internal flags shown
                                     * below.
                                     *
                                     * Order:
                                     * 1. user_flags | .._F_DESTROY_CALLED
                                     * is set in rd_kafka_destroy*().
                                     * 2. consumer_close() is called
                                     * for consumers.
                                     * 3. .._F_TERMINATE is set to
                                     * signal all background threads
                                     * to terminate.
                                     */

/** Internal flag to make sure rk_terminate is set to a non-zero
 *  value even if the user passed no destroy flags. */
#define RD_KAFKA_DESTROY_F_TERMINATE 0x1
/** Application has called ..destroy*() and we've begun the
 *  termination process.
 *  This flag is needed to prevent rk_terminate from being 0 when
 *  destroy_flags() is called with flags=0 and _F_TERMINATE has not
 *  yet been set. */
#define RD_KAFKA_DESTROY_F_DESTROY_CALLED 0x2
/** Immediate non-blocking destruction without waiting for all
 *  resources to be cleaned up.
 *  WARNING: Memory and resource leaks possible.
 *  This flag automatically sets .._NO_CONSUMER_CLOSE. */
#define RD_KAFKA_DESTROY_F_IMMEDIATE 0x4


        rwlock_t rk_lock;
        rd_kafka_type_t rk_type;
        struct timeval rk_tv_state_change;

        rd_atomic64_t rk_ts_last_poll; /**< Timestamp of last application
                                        * consumer_poll() call
                                        * (or equivalent).
                                        * Used to enforce
                                        * max.poll.interval.ms.
                                        * Only relevant for consumer. */
        /* First fatal error. */
        struct {
                rd_atomic32_t err; /**< rd_kafka_resp_err_t */
                char *errstr; /**< Protected by rk_lock */
                int cnt; /**< Number of errors raised, only
                          * the first one is stored. */
        } rk_fatal;

        rd_atomic32_t rk_last_throttle; /* Last throttle_time_ms value
                                         * from broker. */

        /* Locks: rd_kafka_*lock() */
        rd_ts_t rk_ts_metadata; /* Timestamp of most recent
                                 * metadata. */

        struct rd_kafka_metadata *rk_full_metadata; /* Last full metadata. */
        rd_ts_t rk_ts_full_metadata; /* Timestamp of last full metadata. */
        struct rd_kafka_metadata_cache rk_metadata_cache; /* Metadata cache */

        char *rk_clusterid; /* ClusterId from metadata */
        int32_t rk_controllerid; /* ControllerId from metadata */

        /* Simple consumer count:
         * >0: Running in legacy / Simple Consumer mode,
         * 0: No consumers running
         * <0: Running in High level consumer mode */
        rd_atomic32_t rk_simple_cnt;

        /**
         * Exactly Once Semantics and Idempotent Producer
         *
         * @locks rk_lock
         */
        struct {
                rd_kafka_idemp_state_t idemp_state; /**< Idempotent Producer
                                                     * state */
                rd_ts_t ts_idemp_state;/**< Last state change */
                rd_kafka_pid_t pid; /**< Current Producer ID and Epoch */
                int epoch_cnt; /**< Number of times pid/epoch changed */
                rd_atomic32_t inflight_toppar_cnt; /**< Current number of
                                                    * toppars with inflight
                                                    * requests. */
                rd_kafka_timer_t request_pid_tmr; /**< Timer for pid retrieval*/

                rd_kafkap_str_t *transactional_id; /**< Transactional Id,
                                                    * a null string. */
        } rk_eos;

        const rd_kafkap_bytes_t *rk_null_bytes;

        /* Bookkeeping of current message count and size, see the
         * rd_kafka_curr_msgs_*() functions. */
        struct {
                mtx_t lock; /* Protects access to this struct */
                cnd_t cnd; /* For waking up blocking injectors */
                unsigned int cnt; /* Current message count */
                size_t size; /* Current message size sum */
                unsigned int max_cnt; /* Max limit */
                size_t max_size; /* Max limit */
        } rk_curr_msgs;

        rd_kafka_timers_t rk_timers;
        thrd_t rk_thread;

        int rk_initialized; /**< Will be > 0 when the rd_kafka_t
                             * instance has been fully initialized. */

        int rk_init_wait_cnt; /**< Number of background threads that
                               * need to finish initialization. */
        cnd_t rk_init_cnd; /**< Cond-var used to wait for main thread
                            * to finish its initialization
                            * before rd_kafka_new() returns. */
        mtx_t rk_init_lock; /**< Lock for rk_init_wait_cnt and
                             * rk_init_cnd */

        /**
         * Background thread and queue,
         * enabled by setting `background_event_cb()`.
         */
        struct {
                rd_kafka_q_t *q; /**< Queue served by background thread. */
                thrd_t thread; /**< Background thread. */
                int calling; /**< Indicates whether the event callback
                              * is being called, reset back to 0
                              * when the callback returns.
                              * This can be used for troubleshooting
                              * purposes. */
        } rk_background;


        /*
         * Logs, events or actions to rate limit / suppress
         */
        struct {
                /** Log: No brokers support Idempotent Producer */
                rd_interval_t no_idemp_brokers;

                /** Sparse connections: randomly select broker
                 *  to bring up. This interval should allow
                 *  for a previous connection to be established,
                 *  which varies between different environments:
                 *  Use 10 < reconnect.backoff.jitter.ms / 2 < 1000.
                 */
                rd_interval_t sparse_connect_random;
                /** Lock for sparse_connect_random */
                mtx_t sparse_connect_lock;
        } rk_suppress;

        struct {
                void *handle; /**< Provider-specific handle struct pointer.
                               * Typically assigned in provider's .init() */
        } rk_sasl;
};
365
/* Acquire/release the instance's rk_lock rwlock in write (wr) or
 * read (rd) mode. Each lock call must be paired with the unlock call of
 * the same mode. */
#define rd_kafka_wrlock(rk) rwlock_wrlock(&(rk)->rk_lock)
#define rd_kafka_rdlock(rk) rwlock_rdlock(&(rk)->rk_lock)
#define rd_kafka_rdunlock(rk) rwlock_rdunlock(&(rk)->rk_lock)
#define rd_kafka_wrunlock(rk) rwlock_wrunlock(&(rk)->rk_lock)
370
371
372/**
373 * @brief Add \p cnt messages and of total size \p size bytes to the
374 * internal bookkeeping of current message counts.
375 * If the total message count or size after add would exceed the
376 * configured limits \c queue.buffering.max.messages and
377 * \c queue.buffering.max.kbytes then depending on the value of
378 * \p block the function either blocks until enough space is available
379 * if \p block is 1, else immediately returns
380 * RD_KAFKA_RESP_ERR__QUEUE_FULL.
381 *
382 * @param rdmtx If non-null and \p block is set and blocking is to ensue,
383 * then unlock this mutex for the duration of the blocking
384 * and then reacquire with a read-lock.
385 */
386static RD_INLINE RD_UNUSED rd_kafka_resp_err_t
387rd_kafka_curr_msgs_add (rd_kafka_t *rk, unsigned int cnt, size_t size,
388 int block, rwlock_t *rdlock) {
389
390 if (rk->rk_type != RD_KAFKA_PRODUCER)
391 return RD_KAFKA_RESP_ERR_NO_ERROR;
392
393 mtx_lock(&rk->rk_curr_msgs.lock);
394 while (unlikely(rk->rk_curr_msgs.cnt + cnt >
395 rk->rk_curr_msgs.max_cnt ||
396 (unsigned long long)(rk->rk_curr_msgs.size + size) >
397 (unsigned long long)rk->rk_curr_msgs.max_size)) {
398 if (!block) {
399 mtx_unlock(&rk->rk_curr_msgs.lock);
400 return RD_KAFKA_RESP_ERR__QUEUE_FULL;
401 }
402
403 if (rdlock)
404 rwlock_rdunlock(rdlock);
405
406 cnd_wait(&rk->rk_curr_msgs.cnd, &rk->rk_curr_msgs.lock);
407
408 if (rdlock)
409 rwlock_rdlock(rdlock);
410
411 }
412
413 rk->rk_curr_msgs.cnt += cnt;
414 rk->rk_curr_msgs.size += size;
415 mtx_unlock(&rk->rk_curr_msgs.lock);
416
417 return RD_KAFKA_RESP_ERR_NO_ERROR;
418}
419
420
421/**
422 * @brief Subtract \p cnt messages of total size \p size from the
423 * current bookkeeping and broadcast a wakeup on the condvar
424 * for any waiting & blocking threads.
425 */
426static RD_INLINE RD_UNUSED void
427rd_kafka_curr_msgs_sub (rd_kafka_t *rk, unsigned int cnt, size_t size) {
428 int broadcast = 0;
429
430 if (rk->rk_type != RD_KAFKA_PRODUCER)
431 return;
432
433 mtx_lock(&rk->rk_curr_msgs.lock);
434 rd_kafka_assert(NULL,
435 rk->rk_curr_msgs.cnt >= cnt &&
436 rk->rk_curr_msgs.size >= size);
437
438 /* If the subtraction would pass one of the thresholds
439 * broadcast a wake-up to any waiting listeners. */
440 if ((rk->rk_curr_msgs.cnt >= rk->rk_curr_msgs.max_cnt &&
441 rk->rk_curr_msgs.cnt - cnt < rk->rk_curr_msgs.max_cnt) ||
442 (rk->rk_curr_msgs.size >= rk->rk_curr_msgs.max_size &&
443 rk->rk_curr_msgs.size - size < rk->rk_curr_msgs.max_size))
444 broadcast = 1;
445
446 rk->rk_curr_msgs.cnt -= cnt;
447 rk->rk_curr_msgs.size -= size;
448
449 if (unlikely(broadcast))
450 cnd_broadcast(&rk->rk_curr_msgs.cnd);
451
452 mtx_unlock(&rk->rk_curr_msgs.lock);
453}
454
455static RD_INLINE RD_UNUSED void
456rd_kafka_curr_msgs_get (rd_kafka_t *rk, unsigned int *cntp, size_t *sizep) {
457 if (rk->rk_type != RD_KAFKA_PRODUCER) {
458 *cntp = 0;
459 *sizep = 0;
460 return;
461 }
462
463 mtx_lock(&rk->rk_curr_msgs.lock);
464 *cntp = rk->rk_curr_msgs.cnt;
465 *sizep = rk->rk_curr_msgs.size;
466 mtx_unlock(&rk->rk_curr_msgs.lock);
467}
468
469static RD_INLINE RD_UNUSED int
470rd_kafka_curr_msgs_cnt (rd_kafka_t *rk) {
471 int cnt;
472 if (rk->rk_type != RD_KAFKA_PRODUCER)
473 return 0;
474
475 mtx_lock(&rk->rk_curr_msgs.lock);
476 cnt = rk->rk_curr_msgs.cnt;
477 mtx_unlock(&rk->rk_curr_msgs.lock);
478
479 return cnt;
480}
481
482
/* NOTE(review): implemented outside this header; presumably the last
 * step of destroying \p rk - confirm against the implementation. */
void rd_kafka_destroy_final (rd_kafka_t *rk);

/* NOTE(review): implemented outside this header; presumably performs
 * process-wide library initialization - confirm against the
 * implementation. */
void rd_kafka_global_init (void);
486
/**
 * @returns true if \p rk handle is terminating, i.e., the
 *          RD_KAFKA_DESTROY_F_TERMINATE flag is set in rk_terminate.
 *
 * @remark If consumer_close() is called from destroy*() it will be
 *         called prior to _F_TERMINATE being set and will thus not
 *         be able to use rd_kafka_terminating() to know it is shutting down.
 *         That code should instead just check that rk_terminate is non-zero
 *         (the _F_DESTROY_CALLED flag will be set).
 */
#define rd_kafka_terminating(rk) (rd_atomic32_get(&(rk)->rk_terminate) & \
                                  RD_KAFKA_DESTROY_F_TERMINATE)

/**
 * @returns the destroy flags set in rk_terminate that match \p flags,
 *          which might be a subset of \p flags (0 if none match).
 */
#define rd_kafka_destroy_flags_check(rk,flags) \
        (rd_atomic32_get(&(rk)->rk_terminate) & (flags))

/**
 * @returns true if no consumer callbacks, or standard consumer_close
 *          behaviour, should be triggered, i.e., the
 *          RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE destroy flag is set. */
#define rd_kafka_destroy_flags_no_consumer_close(rk) \
        rd_kafka_destroy_flags_check(rk, RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE)

/**
 * @returns true if rk_simple_cnt is greater than zero, which denotes
 *          legacy / Simple Consumer mode.
 */
#define rd_kafka_is_simple_consumer(rk) \
        (rd_atomic32_get(&(rk)->rk_simple_cnt) > 0)
/* NOTE(review): implemented outside this header; presumably registers a
 * simple consumer in rk_simple_cnt - confirm return value semantics. */
int rd_kafka_simple_consumer_add (rd_kafka_t *rk);
515
516
/**
 * @returns true if idempotency is enabled (producer only), as given by
 *          the instance configuration's eos.idempotence field.
 */
#define rd_kafka_is_idempotent(rk) ((rk)->rk_conf.eos.idempotence)

/** Bitmask for purge flags (0x7, i.e. the three lowest bits). */
#define RD_KAFKA_PURGE_F_MASK 0x7
/* NOTE(review): implemented outside this header; by its name it returns
 * a string representation of the purge \p flags - confirm the lifetime
 * of the returned string. */
const char *rd_kafka_purge_flags2str (int flags);
524
525
526#include "rdkafka_topic.h"
527#include "rdkafka_partition.h"
528
529
530
531
532
533
534
535
536
537
538
539
540
541
/**
 * Debug contexts
 *
 * One bit per context. The rd_kafka_dbg(), rd_kafka_dbg0() and
 * rd_rkb_dbg() macros take the context name without the RD_KAFKA_DBG_
 * prefix (e.g. BROKER) and test the resulting bit against the configured
 * debug bitmask.
 */
#define RD_KAFKA_DBG_GENERIC 0x1
#define RD_KAFKA_DBG_BROKER 0x2
#define RD_KAFKA_DBG_TOPIC 0x4
#define RD_KAFKA_DBG_METADATA 0x8
#define RD_KAFKA_DBG_FEATURE 0x10
#define RD_KAFKA_DBG_QUEUE 0x20
#define RD_KAFKA_DBG_MSG 0x40
#define RD_KAFKA_DBG_PROTOCOL 0x80
#define RD_KAFKA_DBG_CGRP 0x100
#define RD_KAFKA_DBG_SECURITY 0x200
#define RD_KAFKA_DBG_FETCH 0x400
#define RD_KAFKA_DBG_INTERCEPTOR 0x800
#define RD_KAFKA_DBG_PLUGIN 0x1000
#define RD_KAFKA_DBG_CONSUMER 0x2000
#define RD_KAFKA_DBG_ADMIN 0x4000
#define RD_KAFKA_DBG_EOS 0x8000
#define RD_KAFKA_DBG_ALL 0xffff /* All of the context bits above */
#define RD_KAFKA_DBG_NONE 0x0 /* No context bits */
563
/**
 * @brief Low-level log function used by the rd_kafka_log(),
 *        rd_kafka_dbg*() and rd_rkb_log() macros.
 *
 * @param conf  Configuration object.
 * @param rk    Client instance, the rd_kafka_dbg0() macro passes NULL.
 * @param extra Optional extra string, rd_rkb_log() passes the broker's
 *              log name, the other macros pass NULL.
 * @param level Log level, e.g., LOG_DEBUG.
 * @param fac   Log facility string.
 * @param fmt   printf-style format string followed by its arguments
 *              (checked by the compiler through RD_FORMAT).
 */
void rd_kafka_log0(const rd_kafka_conf_t *conf,
                   const rd_kafka_t *rk, const char *extra, int level,
                   const char *fac, const char *fmt, ...) RD_FORMAT(printf,
                                                                   6, 7);
568
/**
 * @brief Log a message for client instance \p rk using its configuration.
 *
 * \p level and \p fac, followed by a printf-style format string and its
 * arguments, are forwarded to rd_kafka_log0().
 *
 * @remark \p rk is expanded more than once: avoid side effects in it.
 */
#define rd_kafka_log(rk,level,fac,...) \
        rd_kafka_log0(&(rk)->rk_conf, (rk), NULL, level, fac, __VA_ARGS__)
/**
 * @brief Log a LOG_DEBUG message for client instance \p rk, but only if
 *        debug context \p ctx is enabled in the instance's configuration.
 *
 * \p ctx is the context name without the RD_KAFKA_DBG_ prefix.
 *
 * @remark \p rk is expanded more than once: avoid side effects in it.
 */
#define rd_kafka_dbg(rk,ctx,fac,...) do { \
                if (unlikely((rk)->rk_conf.debug & (RD_KAFKA_DBG_ ## ctx))) \
                        rd_kafka_log0(&(rk)->rk_conf,(rk),NULL, \
                                      LOG_DEBUG,fac,__VA_ARGS__); \
        } while (0)
576
/* dbg() not requiring an rk, just the conf object, for early logging.
 * Logs at LOG_DEBUG level with a NULL rk if debug context \p ctx
 * (name without the RD_KAFKA_DBG_ prefix) is enabled in \p conf. */
#define rd_kafka_dbg0(conf,ctx,fac,...) do { \
                if (unlikely((conf)->debug & (RD_KAFKA_DBG_ ## ctx))) \
                        rd_kafka_log0(conf,NULL,NULL, \
                                      LOG_DEBUG,fac,__VA_ARGS__); \
        } while (0)
583
/**
 * @brief Log a message for broker \p rkb, with the broker's log name
 *        passed as the extra string to rd_kafka_log0().
 *
 * NOTE: The local copy of _logname is needed due to rkb_logname_lock
 *       lock-ordering when logging another broker's name in the message:
 *       the lock is only held while copying, not while logging.
 *
 * @remark \p rkb is expanded more than once: avoid side effects in it.
 */
#define rd_rkb_log(rkb,level,fac,...) do { \
                char _logname[RD_KAFKA_NODENAME_SIZE]; \
                mtx_lock(&(rkb)->rkb_logname_lock); \
                strncpy(_logname, (rkb)->rkb_logname, sizeof(_logname)-1); \
                /* strncpy() does not terminate on truncation */ \
                _logname[sizeof(_logname)-1] = '\0'; \
                mtx_unlock(&(rkb)->rkb_logname_lock); \
                rd_kafka_log0(&(rkb)->rkb_rk->rk_conf, \
                              (rkb)->rkb_rk, _logname, \
                              level, fac, __VA_ARGS__); \
        } while (0)
596
/* Log a LOG_DEBUG message for broker \p rkb through rd_rkb_log(), but
 * only if debug context \p ctx (name without the RD_KAFKA_DBG_ prefix)
 * is enabled in the configuration of the broker's client instance. */
#define rd_rkb_dbg(rkb,ctx,fac,...) do { \
                if (unlikely((rkb)->rkb_rk->rk_conf.debug & \
                             (RD_KAFKA_DBG_ ## ctx))) { \
                        rd_rkb_log(rkb, LOG_DEBUG, fac, __VA_ARGS__); \
                } \
        } while (0)
603
604
605
/* Last error code as set by rd_kafka_set_last_error().
 * Declared with RD_TLS storage (presumably thread-local - confirm). */
extern rd_kafka_resp_err_t RD_TLS rd_kafka_last_error_code;
607
608static RD_UNUSED RD_INLINE
609rd_kafka_resp_err_t rd_kafka_set_last_error (rd_kafka_resp_err_t err,
610 int errnox) {
611 if (errnox) {
612 /* MSVC:
613 * This is the correct way to set errno on Windows,
614 * but it is still pointless due to different errnos in
615 * in different runtimes:
616 * https://social.msdn.microsoft.com/Forums/vstudio/en-US/b4500c0d-1b69-40c7-9ef5-08da1025b5bf/setting-errno-from-within-a-dll?forum=vclanguage/
617 * errno is thus highly deprecated, and buggy, on Windows
618 * when using librdkafka as a dynamically loaded DLL. */
619 rd_set_errno(errnox);
620 }
621 rd_kafka_last_error_code = err;
622 return err;
623}
624
625
/**
 * Raise a fatal error \p err for \p rk with a printf-style formatted
 * reason (\p fmt and its arguments, checked through RD_FORMAT).
 *
 * NOTE(review): implemented outside this header; presumably records the
 * error in rk_fatal - confirm the meaning of the return value.
 */
int rd_kafka_set_fatal_error (rd_kafka_t *rk, rd_kafka_resp_err_t err,
                              const char *fmt, ...) RD_FORMAT(printf, 3, 4);
628
629static RD_INLINE RD_UNUSED rd_kafka_resp_err_t
630rd_kafka_fatal_error_code (rd_kafka_t *rk) {
631 return rd_atomic32_get(&rk->rk_fatal.err);
632}
633
634
/* Global atomic thread counter, defined outside this header.
 * NOTE(review): by its name the current number of librdkafka threads -
 * confirm. */
extern rd_atomic32_t rd_kafka_thread_cnt_curr;

/* Set the calling thread's name / system name from a printf-style format.
 * NOTE(review): implemented outside this header; no RD_FORMAT checking is
 * declared for these two functions. */
void rd_kafka_set_thread_name (const char *fmt, ...);
void rd_kafka_set_thread_sysname (const char *fmt, ...);

/* NOTE(review): by its name returns non-zero if \p path is a directory -
 * confirm against the implementation. */
int rd_kafka_path_is_dir (const char *path);

/* Queue op callback for polling.
 * NOTE(review): implemented outside this header; semantics of \p cb_type
 * and the returned rd_kafka_op_res_t should be confirmed there. */
rd_kafka_op_res_t
rd_kafka_poll_cb (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
                  rd_kafka_q_cb_type_t cb_type, void *opaque);

/* NOTE(review): implemented outside this header; by its name subscribes
 * to topic \p rkt - confirm. */
rd_kafka_resp_err_t rd_kafka_subscribe_rkt (rd_kafka_itopic_t *rkt);
647
648
649/**
650 * @returns the number of milliseconds the maximum poll interval
651 * was exceeded, or 0 if not exceeded.
652 *
653 * @remark Only relevant for high-level consumer.
654 *
655 * @locality any
656 * @locks none
657 */
658static RD_INLINE RD_UNUSED int
659rd_kafka_max_poll_exceeded (rd_kafka_t *rk) {
660 rd_ts_t last_poll = rd_atomic64_get(&rk->rk_ts_last_poll);
661 int exceeded;
662
663 /* Application is blocked in librdkafka function, see
664 * rd_kafka_app_poll_blocking(). */
665 if (last_poll == INT64_MAX)
666 return 0;
667
668 exceeded = (int)((rd_clock() - last_poll) / 1000ll) -
669 rk->rk_conf.max_poll_interval_ms;
670
671 if (unlikely(exceeded > 0))
672 return exceeded;
673
674 return 0;
675}
676
677/**
678 * @brief Call on entry to blocking polling function to indicate
679 * that the application is blocked waiting for librdkafka
680 * and that max.poll.interval.ms should not be enforced.
681 *
682 * Call app_polled() Upon return from the function calling
683 * this function to register the application's last time of poll.
684 *
685 * @remark Only relevant for high-level consumer.
686 *
687 * @locality any
688 * @locks none
689 */
690static RD_INLINE RD_UNUSED void
691rd_kafka_app_poll_blocking (rd_kafka_t *rk) {
692 rd_atomic64_set(&rk->rk_ts_last_poll, INT64_MAX);
693}
694
695/**
696 * @brief Set the last application poll time to now.
697 *
698 * @remark Only relevant for high-level consumer.
699 *
700 * @locality any
701 * @locks none
702 */
703static RD_INLINE RD_UNUSED void
704rd_kafka_app_polled (rd_kafka_t *rk) {
705 rd_atomic64_set(&rk->rk_ts_last_poll, rd_clock());
706}
707
708
/**
 * rdkafka_background.c
 *
 * Background thread main function.
 * NOTE(review): implemented in rdkafka_background.c; the type passed in
 * \p arg and the meaning of the return value are not visible from here -
 * confirm there.
 */
int rd_kafka_background_thread_main (void *arg);
713
714#endif /* _RDKAFKA_INT_H_ */
715