/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012-2013, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */


#define _GNU_SOURCE
#include <errno.h>
#include <string.h>
#include <stdarg.h>
#include <signal.h>
#include <stdlib.h>
#include <sys/stat.h>

#include "rdkafka_int.h"
#include "rdkafka_msg.h"
#include "rdkafka_broker.h"
#include "rdkafka_topic.h"
#include "rdkafka_partition.h"
#include "rdkafka_offset.h"
#include "rdkafka_transport.h"
#include "rdkafka_cgrp.h"
#include "rdkafka_assignor.h"
#include "rdkafka_request.h"
#include "rdkafka_event.h"
#include "rdkafka_sasl.h"
#include "rdkafka_interceptor.h"
#include "rdkafka_idempotence.h"
#include "rdkafka_sasl_oauthbearer.h"
#if WITH_SSL
#include "rdkafka_ssl.h"
#endif

#include "rdtime.h"
#include "crc32c.h"
#include "rdunittest.h"

#ifdef _MSC_VER
#include <sys/types.h>
#include <sys/timeb.h>
#endif



static once_flag rd_kafka_global_init_once = ONCE_FLAG_INIT;

/**
 * @brief Global counter+lock for all active librdkafka instances
 */
mtx_t rd_kafka_global_lock;
int rd_kafka_global_cnt;


/**
 * Last API error code, per thread.
 * Shared among all rd_kafka_t instances.
 */
rd_kafka_resp_err_t RD_TLS rd_kafka_last_error_code;


/**
 * Current number of threads created by rdkafka.
 * This is used in regression tests.
 */
rd_atomic32_t rd_kafka_thread_cnt_curr;

int rd_kafka_thread_cnt (void) {
#if ENABLE_SHAREDPTR_DEBUG
        rd_shared_ptrs_dump();
#endif

        return rd_atomic32_get(&rd_kafka_thread_cnt_curr);
}

/**
 * Current thread's log name (TLS)
 */
static char RD_TLS rd_kafka_thread_name[64] = "app";

void rd_kafka_set_thread_name (const char *fmt, ...) {
        va_list ap;

        va_start(ap, fmt);
        rd_vsnprintf(rd_kafka_thread_name, sizeof(rd_kafka_thread_name),
                     fmt, ap);
        va_end(ap);
}

/**
 * @brief Current thread's system name (TLS)
 *
 * Note the name must be 15 characters or less, because it is passed to
 * pthread_setname_np on Linux which imposes this limit.
 */
static char RD_TLS rd_kafka_thread_sysname[16] = "app";

void rd_kafka_set_thread_sysname (const char *fmt, ...) {
        va_list ap;

        va_start(ap, fmt);
        rd_vsnprintf(rd_kafka_thread_sysname, sizeof(rd_kafka_thread_sysname),
                     fmt, ap);
        va_end(ap);

        thrd_setname(rd_kafka_thread_sysname);
}

static void rd_kafka_global_init0 (void) {
#if ENABLE_SHAREDPTR_DEBUG
        LIST_INIT(&rd_shared_ptr_debug_list);
        mtx_init(&rd_shared_ptr_debug_mtx, mtx_plain);
        atexit(rd_shared_ptrs_dump);
#endif
        mtx_init(&rd_kafka_global_lock, mtx_plain);
#if ENABLE_DEVEL
        rd_atomic32_init(&rd_kafka_op_cnt, 0);
#endif
        crc32c_global_init();
#if WITH_SSL
        /* The configuration interface might need to use
         * OpenSSL to parse keys, prior to any rd_kafka_t
         * object having been created. */
        rd_kafka_ssl_init();
#endif
}

/**
 * @brief Initialize once per process
 */
void rd_kafka_global_init (void) {
        call_once(&rd_kafka_global_init_once, rd_kafka_global_init0);
}

/**
 * @returns the current number of active librdkafka instances
 */
static int rd_kafka_global_cnt_get (void) {
        int r;
        mtx_lock(&rd_kafka_global_lock);
        r = rd_kafka_global_cnt;
        mtx_unlock(&rd_kafka_global_lock);
        return r;
}


/**
 * @brief Increase counter for active librdkafka instances.
 *        If this is the first instance the global constructors will be
 *        called, if any.
 */
static void rd_kafka_global_cnt_incr (void) {
        mtx_lock(&rd_kafka_global_lock);
        rd_kafka_global_cnt++;
        if (rd_kafka_global_cnt == 1) {
                rd_kafka_transport_init();
#if WITH_SSL
                rd_kafka_ssl_init();
#endif
                rd_kafka_sasl_global_init();
        }
        mtx_unlock(&rd_kafka_global_lock);
}

/**
 * @brief Decrease counter for active librdkafka instances.
 *        If this counter reaches 0 the global destructors will be
 *        called, if any.
 */
static void rd_kafka_global_cnt_decr (void) {
        mtx_lock(&rd_kafka_global_lock);
        rd_kafka_assert(NULL, rd_kafka_global_cnt > 0);
        rd_kafka_global_cnt--;
        if (rd_kafka_global_cnt == 0) {
                rd_kafka_sasl_global_term();
#if WITH_SSL
                rd_kafka_ssl_term();
#endif
        }
        mtx_unlock(&rd_kafka_global_lock);
}


/**
 * Wait for all rd_kafka_t objects to be destroyed.
 * Returns 0 if all kafka objects are now destroyed, or -1 if the
 * timeout was reached.
 */
int rd_kafka_wait_destroyed (int timeout_ms) {
        rd_ts_t timeout = rd_clock() + (timeout_ms * 1000);

        while (rd_kafka_thread_cnt() > 0 ||
               rd_kafka_global_cnt_get() > 0) {
                if (rd_clock() >= timeout) {
                        rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__TIMED_OUT,
                                                ETIMEDOUT);
#if ENABLE_SHAREDPTR_DEBUG
                        rd_shared_ptrs_dump();
#endif
                        return -1;
                }
                rd_usleep(25000, NULL); /* 25ms */
        }

        return 0;
}
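
/*
 * Illustrative usage (not part of the library): a typical application
 * shutdown sequence destroys its handle(s) and then waits for all
 * librdkafka threads to exit before the process terminates. The timeout
 * value below is arbitrary.
 *
 *   rd_kafka_destroy(rk);
 *   if (rd_kafka_wait_destroyed(5000) == -1)
 *           fprintf(stderr, "librdkafka threads still alive after 5s\n");
 */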

static void rd_kafka_log_buf (const rd_kafka_conf_t *conf,
                              const rd_kafka_t *rk, int level, const char *fac,
                              const char *buf) {
        if (level > conf->log_level)
                return;
        else if (rk && conf->log_queue) {
                rd_kafka_op_t *rko;

                if (!rk->rk_logq)
                        return; /* Terminating */

                rko = rd_kafka_op_new(RD_KAFKA_OP_LOG);
                rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_MEDIUM);
                rko->rko_u.log.level = level;
                strncpy(rko->rko_u.log.fac, fac,
                        sizeof(rko->rko_u.log.fac) - 1);
                rko->rko_u.log.str = rd_strdup(buf);
                rd_kafka_q_enq(rk->rk_logq, rko);

        } else if (conf->log_cb) {
                conf->log_cb(rk, level, fac, buf);
        }
}

/**
 * @brief Logger
 *
 * @remark conf must be set, but rk may be NULL
 */
void rd_kafka_log0 (const rd_kafka_conf_t *conf,
                    const rd_kafka_t *rk,
                    const char *extra, int level,
                    const char *fac, const char *fmt, ...) {
        char buf[2048];
        va_list ap;
        unsigned int elen = 0;
        unsigned int of = 0;

        if (level > conf->log_level)
                return;

        if (conf->log_thread_name) {
                elen = rd_snprintf(buf, sizeof(buf), "[thrd:%s]: ",
                                   rd_kafka_thread_name);
                if (unlikely(elen >= sizeof(buf)))
                        elen = sizeof(buf);
                of = elen;
        }

        if (extra) {
                elen = rd_snprintf(buf+of, sizeof(buf)-of, "%s: ", extra);
                if (unlikely(elen >= sizeof(buf)-of))
                        elen = sizeof(buf)-of;
                of += elen;
        }

        va_start(ap, fmt);
        rd_vsnprintf(buf+of, sizeof(buf)-of, fmt, ap);
        va_end(ap);

        rd_kafka_log_buf(conf, rk, level, fac, buf);
}
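
/*
 * Illustrative note (not part of the library): rd_kafka_log0() is normally
 * reached through the rd_kafka_log() and rd_kafka_dbg() convenience macros
 * (defined in rdkafka_int.h), which supply the extra/level arguments, e.g.:
 *
 *   rd_kafka_log(rk, LOG_WARNING, "MYFAC", "request took %dms", ms);
 */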

rd_kafka_resp_err_t
rd_kafka_oauthbearer_set_token (rd_kafka_t *rk,
                                const char *token_value,
                                int64_t md_lifetime_ms,
                                const char *md_principal_name,
                                const char **extensions, size_t extension_size,
                                char *errstr, size_t errstr_size) {
#if WITH_SASL_OAUTHBEARER
        return rd_kafka_oauthbearer_set_token0(
                rk, token_value,
                md_lifetime_ms, md_principal_name, extensions, extension_size,
                errstr, errstr_size);
#else
        rd_snprintf(errstr, errstr_size,
                    "librdkafka not built with SASL OAUTHBEARER support");
        return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;
#endif
}
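
/*
 * Illustrative usage (not part of the library): an application's
 * oauthbearer_token_refresh_cb would typically acquire a token and hand
 * it to the client like this. Token acquisition and the variable names
 * (token, expiry_ms) are hypothetical; extensions are key,value string
 * pairs counted per element.
 *
 *   char errstr[512];
 *   const char *extensions[] = { "traceId", "123" };
 *   if (rd_kafka_oauthbearer_set_token(rk, token, expiry_ms, "principal",
 *                                      extensions, 2,
 *                                      errstr, sizeof(errstr)) !=
 *       RD_KAFKA_RESP_ERR_NO_ERROR)
 *           rd_kafka_oauthbearer_set_token_failure(rk, errstr);
 */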

rd_kafka_resp_err_t
rd_kafka_oauthbearer_set_token_failure (rd_kafka_t *rk, const char *errstr) {
#if WITH_SASL_OAUTHBEARER
        return rd_kafka_oauthbearer_set_token_failure0(rk, errstr);
#else
        return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;
#endif
}

void rd_kafka_log_print (const rd_kafka_t *rk, int level,
                         const char *fac, const char *buf) {
        int secs, msecs;
        struct timeval tv;
        rd_gettimeofday(&tv, NULL);
        secs = (int)tv.tv_sec;
        msecs = (int)(tv.tv_usec / 1000);
        fprintf(stderr, "%%%i|%u.%03u|%s|%s| %s\n",
                level, secs, msecs,
                fac, rk ? rk->rk_name : "", buf);
}
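
/*
 * Illustrative sample of the line format emitted above (values are made
 * up): a "%<level>" prefix, a seconds.milliseconds timestamp, the
 * facility and the client instance name, then the message:
 *
 *   %7|1546300800.123|FETCH|rdkafka#consumer-1| Fetch topic test [0] ...
 */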

#ifndef _MSC_VER
void rd_kafka_log_syslog (const rd_kafka_t *rk, int level,
                          const char *fac, const char *buf) {
        static int initialized = 0;

        if (!initialized) {
                openlog("rdkafka", LOG_PID|LOG_CONS, LOG_USER);
                initialized = 1;
        }

        syslog(level, "%s: %s: %s", fac, rk ? rk->rk_name : "", buf);
}
#endif

void rd_kafka_set_logger (rd_kafka_t *rk,
                          void (*func) (const rd_kafka_t *rk, int level,
                                        const char *fac, const char *buf)) {
        rk->rk_conf.log_cb = func;
}
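
/*
 * Illustrative usage (not part of the library): installing a custom
 * logger with the matching signature; the function name is hypothetical.
 *
 *   static void my_logger (const rd_kafka_t *rk, int level,
 *                          const char *fac, const char *buf) {
 *           fprintf(stderr, "[%d] %s: %s\n", level, fac, buf);
 *   }
 *
 *   rd_kafka_set_logger(rk, my_logger);
 */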

void rd_kafka_set_log_level (rd_kafka_t *rk, int level) {
        rk->rk_conf.log_level = level;
}




static const char *rd_kafka_type2str (rd_kafka_type_t type) {
        static const char *types[] = {
                [RD_KAFKA_PRODUCER] = "producer",
                [RD_KAFKA_CONSUMER] = "consumer",
        };
        return types[type];
}

#define _ERR_DESC(ENUM,DESC) \
        [ENUM - RD_KAFKA_RESP_ERR__BEGIN] = { ENUM, # ENUM + 18/*pfx*/, DESC }
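
/* In _ERR_DESC above, "# ENUM" stringifies the enum constant and "+ 18"
 * skips the common "RD_KAFKA_RESP_ERR_" prefix (18 characters), so e.g.
 * RD_KAFKA_RESP_ERR__BAD_MSG is stored with the name "_BAD_MSG"
 * (local error names keep their leading underscore). */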

static const struct rd_kafka_err_desc rd_kafka_err_descs[] = {
        _ERR_DESC(RD_KAFKA_RESP_ERR__BEGIN, NULL),
        _ERR_DESC(RD_KAFKA_RESP_ERR__BAD_MSG,
                  "Local: Bad message format"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__BAD_COMPRESSION,
                  "Local: Invalid compressed data"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__DESTROY,
                  "Local: Broker handle destroyed"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__FAIL,
                  "Local: Communication failure with broker"), //FIXME: too specific
        _ERR_DESC(RD_KAFKA_RESP_ERR__TRANSPORT,
                  "Local: Broker transport failure"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE,
                  "Local: Critical system resource failure"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__RESOLVE,
                  "Local: Host resolution failure"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__MSG_TIMED_OUT,
                  "Local: Message timed out"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__PARTITION_EOF,
                  "Broker: No more messages"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
                  "Local: Unknown partition"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__FS,
                  "Local: File or filesystem error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC,
                  "Local: Unknown topic"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN,
                  "Local: All broker connections are down"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__INVALID_ARG,
                  "Local: Invalid argument or configuration"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__TIMED_OUT,
                  "Local: Timed out"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__QUEUE_FULL,
                  "Local: Queue full"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__ISR_INSUFF,
                  "Local: ISR count insufficient"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__NODE_UPDATE,
                  "Local: Broker node update"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__SSL,
                  "Local: SSL error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__WAIT_COORD,
                  "Local: Waiting for coordinator"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_GROUP,
                  "Local: Unknown group"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__IN_PROGRESS,
                  "Local: Operation in progress"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS,
                  "Local: Previous operation in progress"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION,
                  "Local: Existing subscription"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
                  "Local: Assign partitions"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
                  "Local: Revoke partitions"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__CONFLICT,
                  "Local: Conflicting use"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__STATE,
                  "Local: Erroneous state"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL,
                  "Local: Unknown protocol"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED,
                  "Local: Not implemented"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__AUTHENTICATION,
                  "Local: Authentication failure"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__NO_OFFSET,
                  "Local: No offset stored"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__OUTDATED,
                  "Local: Outdated"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE,
                  "Local: Timed out in queue"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
                  "Local: Required feature not supported by broker"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__WAIT_CACHE,
                  "Local: Awaiting cache update"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__INTR,
                  "Local: Operation interrupted"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__KEY_SERIALIZATION,
                  "Local: Key serialization error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__VALUE_SERIALIZATION,
                  "Local: Value serialization error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__KEY_DESERIALIZATION,
                  "Local: Key deserialization error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__VALUE_DESERIALIZATION,
                  "Local: Value deserialization error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__PARTIAL,
                  "Local: Partial response"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__READ_ONLY,
                  "Local: Read-only object"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__NOENT,
                  "Local: No such entry"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__UNDERFLOW,
                  "Local: Read underflow"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__INVALID_TYPE,
                  "Local: Invalid type"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__RETRY,
                  "Local: Retry operation"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__PURGE_QUEUE,
                  "Local: Purged in queue"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__PURGE_INFLIGHT,
                  "Local: Purged in flight"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__FATAL,
                  "Local: Fatal error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__INCONSISTENT,
                  "Local: Inconsistent state"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__GAPLESS_GUARANTEE,
                  "Local: Gap-less ordering would not be guaranteed "
                  "if proceeding"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED,
                  "Local: Maximum application poll interval "
                  "(max.poll.interval.ms) exceeded"),

        _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN,
                  "Unknown broker error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NO_ERROR,
                  "Success"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE,
                  "Broker: Offset out of range"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_MSG,
                  "Broker: Invalid message"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART,
                  "Broker: Unknown topic or partition"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE,
                  "Broker: Invalid message size"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE,
                  "Broker: Leader not available"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
                  "Broker: Not leader for partition"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
                  "Broker: Request timed out"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE,
                  "Broker: Broker not available"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE,
                  "Broker: Replica not available"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE,
                  "Broker: Message size too large"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH,
                  "Broker: StaleControllerEpochCode"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE,
                  "Broker: Offset metadata string too large"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION,
                  "Broker: Broker disconnected before response received"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS,
                  "Broker: Group coordinator load in progress"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE,
                  "Broker: Group coordinator not available"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP,
                  "Broker: Not coordinator for group"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION,
                  "Broker: Invalid topic"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE,
                  "Broker: Message batch larger than configured server "
                  "segment size"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS,
                  "Broker: Not enough in-sync replicas"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND,
                  "Broker: Message(s) written to insufficient number of "
                  "in-sync replicas"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS,
                  "Broker: Invalid required acks value"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
                  "Broker: Specified group generation id is not valid"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL,
                  "Broker: Inconsistent group protocol"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_GROUP_ID,
                  "Broker: Invalid group.id"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID,
                  "Broker: Unknown member"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT,
                  "Broker: Invalid session timeout"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
                  "Broker: Group rebalance in progress"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE,
                  "Broker: Commit offset data size is not valid"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED,
                  "Broker: Topic authorization failed"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,
                  "Broker: Group authorization failed"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED,
                  "Broker: Cluster authorization failed"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TIMESTAMP,
                  "Broker: Invalid timestamp"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_SASL_MECHANISM,
                  "Broker: Unsupported SASL mechanism"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_ILLEGAL_SASL_STATE,
                  "Broker: Request not valid in current SASL state"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION,
                  "Broker: API version not supported"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS,
                  "Broker: Topic already exists"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PARTITIONS,
                  "Broker: Invalid number of partitions"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REPLICATION_FACTOR,
                  "Broker: Invalid replication factor"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REPLICA_ASSIGNMENT,
                  "Broker: Invalid replica assignment"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_CONFIG,
                  "Broker: Configuration is invalid"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_CONTROLLER,
                  "Broker: Not controller for cluster"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REQUEST,
                  "Broker: Invalid request"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT,
                  "Broker: Message format on broker does not support request"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_POLICY_VIOLATION,
                  "Broker: Policy violation"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER,
                  "Broker: Broker received an out of order sequence number"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER,
                  "Broker: Broker received a duplicate sequence number"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH,
                  "Broker: Producer attempted an operation with an old epoch"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TXN_STATE,
                  "Broker: Producer attempted a transactional operation in "
                  "an invalid state"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING,
                  "Broker: Producer attempted to use a producer id which is "
                  "not currently assigned to its transactional id"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT,
                  "Broker: Transaction timeout is larger than the maximum "
                  "value allowed by the broker's max.transaction.timeout.ms"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
                  "Broker: Producer attempted to update a transaction while "
                  "another concurrent operation on the same transaction was "
                  "ongoing"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_TRANSACTION_COORDINATOR_FENCED,
                  "Broker: Indicates that the transaction coordinator sending "
                  "a WriteTxnMarker is no longer the current coordinator for "
                  "a given producer"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED,
                  "Broker: Transactional Id authorization failed"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_SECURITY_DISABLED,
                  "Broker: Security features are disabled"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_OPERATION_NOT_ATTEMPTED,
                  "Broker: Operation not attempted"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR,
                  "Disk error when trying to access log file on the disk"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_LOG_DIR_NOT_FOUND,
                  "The user-specified log directory is not found "
                  "in the broker config"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_SASL_AUTHENTICATION_FAILED,
                  "SASL Authentication failed"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID,
                  "Unknown Producer Id"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_REASSIGNMENT_IN_PROGRESS,
                  "Partition reassignment is in progress"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_AUTH_DISABLED,
                  "Delegation Token feature is not enabled"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_NOT_FOUND,
                  "Delegation Token is not found on server"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_OWNER_MISMATCH,
                  "Specified Principal is not valid Owner/Renewer"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_REQUEST_NOT_ALLOWED,
                  "Delegation Token requests are not allowed on "
                  "this connection"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_AUTHORIZATION_FAILED,
                  "Delegation Token authorization failed"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_EXPIRED,
                  "Delegation Token is expired"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PRINCIPAL_TYPE,
                  "Supplied principalType is not supported"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NON_EMPTY_GROUP,
                  "The group is not empty"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND,
                  "The group id does not exist"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_FETCH_SESSION_ID_NOT_FOUND,
                  "The fetch session ID was not found"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_FETCH_SESSION_EPOCH,
                  "The fetch session epoch is invalid"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_LISTENER_NOT_FOUND,
                  "No matching listener"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_DELETION_DISABLED,
                  "Topic deletion is disabled"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH,
                  "Leader epoch is older than broker epoch"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_LEADER_EPOCH,
                  "Leader epoch is newer than broker epoch"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_COMPRESSION_TYPE,
                  "Unsupported compression type"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_STALE_BROKER_EPOCH,
                  "Broker epoch has changed"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_OFFSET_NOT_AVAILABLE,
                  "Leader high watermark is not caught up"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_MEMBER_ID_REQUIRED,
                  "Group member needs a valid member ID"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_PREFERRED_LEADER_NOT_AVAILABLE,
                  "Preferred leader was not available"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_MAX_SIZE_REACHED,
                  "Consumer group has reached maximum size"),

        _ERR_DESC(RD_KAFKA_RESP_ERR__END, NULL)
};


void rd_kafka_get_err_descs (const struct rd_kafka_err_desc **errdescs,
                             size_t *cntp) {
        *errdescs = rd_kafka_err_descs;
        *cntp = RD_ARRAYSIZE(rd_kafka_err_descs);
}


const char *rd_kafka_err2str (rd_kafka_resp_err_t err) {
        static RD_TLS char ret[32];
        int idx = err - RD_KAFKA_RESP_ERR__BEGIN;

        if (unlikely(err <= RD_KAFKA_RESP_ERR__BEGIN ||
                     err >= RD_KAFKA_RESP_ERR_END_ALL ||
                     !rd_kafka_err_descs[idx].desc)) {
                rd_snprintf(ret, sizeof(ret), "Err-%i?", err);
                return ret;
        }

        return rd_kafka_err_descs[idx].desc;
}


const char *rd_kafka_err2name (rd_kafka_resp_err_t err) {
        static RD_TLS char ret[32];
        int idx = err - RD_KAFKA_RESP_ERR__BEGIN;

        if (unlikely(err <= RD_KAFKA_RESP_ERR__BEGIN ||
                     err >= RD_KAFKA_RESP_ERR_END_ALL ||
                     !rd_kafka_err_descs[idx].desc)) {
                rd_snprintf(ret, sizeof(ret), "ERR_%i?", err);
                return ret;
        }

        return rd_kafka_err_descs[idx].name;
}
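
/*
 * Illustrative usage (not part of the library): mapping an error to its
 * symbolic name and human-readable description:
 *
 *   rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__TIMED_OUT;
 *   printf("%s: %s\n", rd_kafka_err2name(err), rd_kafka_err2str(err));
 *
 * prints "_TIMED_OUT: Local: Timed out" (the leading underscore comes
 * from the prefix-stripping in _ERR_DESC above).
 */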


rd_kafka_resp_err_t rd_kafka_last_error (void) {
        return rd_kafka_last_error_code;
}


rd_kafka_resp_err_t rd_kafka_errno2err (int errnox) {
        switch (errnox)
        {
        case EINVAL:
                return RD_KAFKA_RESP_ERR__INVALID_ARG;

        case EBUSY:
                return RD_KAFKA_RESP_ERR__CONFLICT;

        case ENOENT:
                return RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;

        case ESRCH:
                return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;

        case ETIMEDOUT:
                return RD_KAFKA_RESP_ERR__TIMED_OUT;

        case EMSGSIZE:
                return RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE;

        case ENOBUFS:
                return RD_KAFKA_RESP_ERR__QUEUE_FULL;

        case ECANCELED:
                return RD_KAFKA_RESP_ERR__FATAL;

        default:
                return RD_KAFKA_RESP_ERR__FAIL;
        }
}
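
/*
 * Illustrative usage (not part of the library): the legacy producer API
 * reports failure through errno, which this function maps back to a
 * librdkafka error code:
 *
 *   if (rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY,
 *                        payload, len, NULL, 0, NULL) == -1)
 *           fprintf(stderr, "produce failed: %s\n",
 *                   rd_kafka_err2str(rd_kafka_errno2err(errno)));
 */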


rd_kafka_resp_err_t rd_kafka_fatal_error (rd_kafka_t *rk,
                                          char *errstr, size_t errstr_size) {
        rd_kafka_resp_err_t err;

        if (unlikely((err = rd_atomic32_get(&rk->rk_fatal.err)))) {
                rd_kafka_rdlock(rk);
                rd_snprintf(errstr, errstr_size, "%s", rk->rk_fatal.errstr);
                rd_kafka_rdunlock(rk);
        }

        return err;
}
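
/*
 * Illustrative usage (not part of the library): when an error callback
 * receives RD_KAFKA_RESP_ERR__FATAL, the application retrieves the
 * underlying fatal error like this:
 *
 *   if (err == RD_KAFKA_RESP_ERR__FATAL) {
 *           char fatalstr[512];
 *           rd_kafka_resp_err_t orig_err =
 *                   rd_kafka_fatal_error(rk, fatalstr, sizeof(fatalstr));
 *           fprintf(stderr, "FATAL %s: %s\n",
 *                   rd_kafka_err2name(orig_err), fatalstr);
 *   }
 */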


/**
 * @brief Sets the fatal error for this instance.
 *
 * @returns 1 if the error was set, or 0 if a previous fatal error
 *          has already been set on this instance.
 *
 * @locality any
 * @locks none
 */
int rd_kafka_set_fatal_error (rd_kafka_t *rk, rd_kafka_resp_err_t err,
                              const char *fmt, ...) {
        va_list ap;
        char buf[512];

        rd_kafka_wrlock(rk);
        rk->rk_fatal.cnt++;
        if (rd_atomic32_get(&rk->rk_fatal.err)) {
                rd_kafka_wrunlock(rk);
                rd_kafka_dbg(rk, GENERIC, "FATAL",
                             "Suppressing subsequent fatal error: %s",
                             rd_kafka_err2name(err));
                return 0;
        }

        rd_atomic32_set(&rk->rk_fatal.err, err);

        va_start(ap, fmt);
        rd_vsnprintf(buf, sizeof(buf), fmt, ap);
        va_end(ap);
        rk->rk_fatal.errstr = rd_strdup(buf);

        rd_kafka_wrunlock(rk);

        /* If there is an error callback or event handler we
         * also log the fatal error as it happens.
         * If there is no error callback the error event
         * will be automatically logged, and this check
         * prevents duplicate logs. */
        if (rk->rk_conf.enabled_events & RD_KAFKA_EVENT_ERROR)
                rd_kafka_log(rk, LOG_EMERG, "FATAL",
                             "Fatal error: %s: %s",
                             rd_kafka_err2str(err), rk->rk_fatal.errstr);
        else
                rd_kafka_dbg(rk, ALL, "FATAL",
                             "Fatal error: %s: %s",
                             rd_kafka_err2str(err), rk->rk_fatal.errstr);

        /* Indicate to the application that a fatal error was raised;
         * the app should use rd_kafka_fatal_error() to extract the
         * fatal error code itself. */
        rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__FATAL,
                        "Fatal error: %s: %s",
                        rd_kafka_err2str(err), rk->rk_fatal.errstr);


        /* Purge producer queues, but not in-flight requests, since we
         * will want proper delivery status for transmitted requests.
         * NON_BLOCKING is needed to avoid a deadlock if the user is
         * calling purge() at the same time, which could be
         * waiting for this broker thread to handle its
         * OP_PURGE request. */
        if (rk->rk_type == RD_KAFKA_PRODUCER)
                rd_kafka_purge(rk, RD_KAFKA_PURGE_F_QUEUE|
                               RD_KAFKA_PURGE_F_NON_BLOCKING);

        return 1;
}


rd_kafka_resp_err_t
rd_kafka_test_fatal_error (rd_kafka_t *rk, rd_kafka_resp_err_t err,
                           const char *reason) {
        /* rd_kafka_set_fatal_error() returns 1 on success, so the
         * previous-error path requires negation. */
        if (!rd_kafka_set_fatal_error(rk, err, "test_fatal_error: %s", reason))
                return RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS;
        else
                return RD_KAFKA_RESP_ERR_NO_ERROR;
}



/**
 * @brief Final destructor for rd_kafka_t, must only be called with refcnt 0.
 *
 * @locality application thread
 */
void rd_kafka_destroy_final (rd_kafka_t *rk) {

        rd_kafka_assert(rk, rd_kafka_terminating(rk));

        /* Synchronize state */
        rd_kafka_wrlock(rk);
        rd_kafka_wrunlock(rk);

        rd_kafka_assignors_term(rk);

        rd_kafka_metadata_cache_destroy(rk);

        /* Terminate SASL provider */
        if (rk->rk_conf.sasl.provider)
                rd_kafka_sasl_term(rk);

        rd_kafka_timers_destroy(&rk->rk_timers);

        rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Destroying op queues");

        /* Destroy cgrp */
        if (rk->rk_cgrp) {
                rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                             "Destroying cgrp");
                /* Reset queue forwarding (rep -> cgrp) */
                rd_kafka_q_fwd_set(rk->rk_rep, NULL);
                rd_kafka_cgrp_destroy_final(rk->rk_cgrp);
        }

        /* Purge op-queues */
        rd_kafka_q_destroy_owner(rk->rk_rep);
        rd_kafka_q_destroy_owner(rk->rk_ops);

#if WITH_SSL
        if (rk->rk_conf.ssl.ctx) {
                rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Destroying SSL CTX");
                rd_kafka_ssl_ctx_term(rk);
        }
#endif

        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Termination done: freeing resources");

        /* The log queue is destroyed next: it is not safe to log
         * through it after this point. */
        if (rk->rk_logq) {
                rd_kafka_q_destroy_owner(rk->rk_logq);
                rk->rk_logq = NULL;
        }

        if (rk->rk_type == RD_KAFKA_PRODUCER) {
                cnd_destroy(&rk->rk_curr_msgs.cnd);
                mtx_destroy(&rk->rk_curr_msgs.lock);
        }

        if (rk->rk_fatal.errstr) {
                rd_free(rk->rk_fatal.errstr);
                rk->rk_fatal.errstr = NULL;
        }

        cnd_destroy(&rk->rk_broker_state_change_cnd);
        mtx_destroy(&rk->rk_broker_state_change_lock);

        mtx_destroy(&rk->rk_suppress.sparse_connect_lock);

        cnd_destroy(&rk->rk_init_cnd);
        mtx_destroy(&rk->rk_init_lock);

        if (rk->rk_full_metadata)
                rd_kafka_metadata_destroy(rk->rk_full_metadata);
        rd_kafkap_str_destroy(rk->rk_client_id);
        rd_kafkap_str_destroy(rk->rk_group_id);
        rd_kafkap_str_destroy(rk->rk_eos.transactional_id);
        rd_kafka_anyconf_destroy(_RK_GLOBAL, &rk->rk_conf);
        rd_list_destroy(&rk->rk_broker_by_id);

        rd_kafkap_bytes_destroy((rd_kafkap_bytes_t *)rk->rk_null_bytes);
        rwlock_destroy(&rk->rk_lock);

        rd_free(rk);
        rd_kafka_global_cnt_decr();
}


static void rd_kafka_destroy_app (rd_kafka_t *rk, int flags) {
        thrd_t thrd;
#ifndef _MSC_VER
        int term_sig = rk->rk_conf.term_sig;
#endif
        int res;
        char flags_str[256];
        static const char *rd_kafka_destroy_flags_names[] = {
                "Terminate",
                "DestroyCalled",
                "Immediate",
                "NoConsumerClose",
                NULL
        };

        /* _F_IMMEDIATE also sets .._NO_CONSUMER_CLOSE */
        if (flags & RD_KAFKA_DESTROY_F_IMMEDIATE)
                flags |= RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE;

        rd_flags2str(flags_str, sizeof(flags_str),
                     rd_kafka_destroy_flags_names, flags);
        rd_kafka_dbg(rk, ALL, "DESTROY", "Terminating instance "
                     "(destroy flags %s (0x%x))",
                     flags ? flags_str : "none", flags);

        /* Make sure destroy is not called from a librdkafka thread
         * since this will most likely cause a deadlock.
         * FIXME: include broker threads (for log_cb) */
        if (thrd_is_current(rk->rk_thread) ||
            thrd_is_current(rk->rk_background.thread)) {
                rd_kafka_log(rk, LOG_EMERG, "BGQUEUE",
                             "Application bug: "
                             "rd_kafka_destroy() called from "
                             "librdkafka owned thread");
                rd_kafka_assert(NULL,
                                !*"Application bug: "
                                "calling rd_kafka_destroy() from a "
                                "librdkafka owned thread is prohibited");
        }

        /* Before signaling for general termination, set the destroy
         * flags to hint cgrp how to shut down. */
        rd_atomic32_set(&rk->rk_terminate,
                        flags|RD_KAFKA_DESTROY_F_DESTROY_CALLED);

        /* The legacy/simple consumer lacks an API to close down
         * the consumer. */
        if (rk->rk_cgrp) {
                rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                             "Terminating consumer group handler");
                rd_kafka_consumer_close(rk);
        }

        /* With the consumer closed, terminate the rest of librdkafka. */
        rd_atomic32_set(&rk->rk_terminate, flags|RD_KAFKA_DESTROY_F_TERMINATE);

        rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Interrupting timers");
        rd_kafka_wrlock(rk);
        thrd = rk->rk_thread;
        rd_kafka_timers_interrupt(&rk->rk_timers);
        rd_kafka_wrunlock(rk);

        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Sending TERMINATE to internal main thread");
        /* Send op to trigger queue/io wake-up.
         * The op itself is (likely) ignored by the receiver. */
        rd_kafka_q_enq(rk->rk_ops, rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));

#ifndef _MSC_VER
        /* Interrupt main kafka thread to speed up termination. */
        if (term_sig) {
                rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                             "Sending thread kill signal %d", term_sig);
                pthread_kill(thrd, term_sig);
        }
#endif

        if (rd_kafka_destroy_flags_check(rk, RD_KAFKA_DESTROY_F_IMMEDIATE))
                return; /* FIXME: thread resource leak */

        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Joining internal main thread");

        if (thrd_join(thrd, &res) != thrd_success)
                rd_kafka_log(rk, LOG_ERR, "DESTROY",
                             "Failed to join internal main thread: %s "
                             "(was the process forked?)",
                             rd_strerror(errno));

        rd_kafka_destroy_final(rk);
}


/* NOTE: Must only be called by the application.
 *       librdkafka itself must use rd_kafka_destroy0(). */
void rd_kafka_destroy (rd_kafka_t *rk) {
        rd_kafka_destroy_app(rk, 0);
}

void rd_kafka_destroy_flags (rd_kafka_t *rk, int flags) {
        rd_kafka_destroy_app(rk, flags);
}
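
/*
 * Illustrative usage (not part of the library): a consumer application
 * that has already called rd_kafka_consumer_close() itself can skip the
 * implicit close during destruction:
 *
 *   rd_kafka_consumer_close(rk);
 *   rd_kafka_destroy_flags(rk, RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);
 */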


/**
 * Main destructor for rd_kafka_t
 *
 * Locality: rdkafka main thread or application thread during rd_kafka_new()
 */
static void rd_kafka_destroy_internal (rd_kafka_t *rk) {
        rd_kafka_itopic_t *rkt, *rkt_tmp;
        rd_kafka_broker_t *rkb, *rkb_tmp;
        rd_list_t wait_thrds;
        thrd_t *thrd;
        int i;

        rd_kafka_dbg(rk, ALL, "DESTROY", "Destroy internal");

        /* Trigger any state-change waiters (which should check the
         * terminate flag whenever they wake up). */
        rd_kafka_brokers_broadcast_state_change(rk);

        if (rk->rk_background.thread) {
                int res;
                /* Send op to trigger queue/io wake-up.
                 * The op itself is (likely) ignored by the receiver. */
                rd_kafka_q_enq(rk->rk_background.q,
                               rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));

                rd_kafka_dbg(rk, ALL, "DESTROY",
                             "Waiting for background queue thread "
                             "to terminate");
                thrd_join(rk->rk_background.thread, &res);
                rd_kafka_q_destroy_owner(rk->rk_background.q);
        }

        /* Call on_destroy() interceptors */
        rd_kafka_interceptors_on_destroy(rk);

        /* Brokers pick up on rk_terminate automatically. */

        /* List of (broker) threads to join to synchronize termination */
        rd_list_init(&wait_thrds, rd_atomic32_get(&rk->rk_broker_cnt), NULL);

        rd_kafka_wrlock(rk);

        rd_kafka_dbg(rk, ALL, "DESTROY", "Removing all topics");
        /* Decommission all topics */
        TAILQ_FOREACH_SAFE(rkt, &rk->rk_topics, rkt_link, rkt_tmp) {
                rd_kafka_wrunlock(rk);
                rd_kafka_topic_partitions_remove(rkt);
                rd_kafka_wrlock(rk);
        }

        /* Decommission brokers.
         * The broker thread holds a refcount, detects when the broker
         * refcount reaches 1 and then decommissions itself. */
        TAILQ_FOREACH_SAFE(rkb, &rk->rk_brokers, rkb_link, rkb_tmp) {
                /* Add broker's thread to wait_thrds list for later joining */
                thrd = malloc(sizeof(*thrd));
                *thrd = rkb->rkb_thread;
                rd_list_add(&wait_thrds, thrd);
                rd_kafka_wrunlock(rk);

                rd_kafka_dbg(rk, BROKER, "DESTROY",
                             "Sending TERMINATE to %s",
                             rd_kafka_broker_name(rkb));
                /* Send op to trigger queue/io wake-up.
                 * The op itself is (likely) ignored by the broker thread. */
                rd_kafka_q_enq(rkb->rkb_ops,
                               rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));

#ifndef _MSC_VER
                /* Interrupt IO threads to speed up termination. */
                if (rk->rk_conf.term_sig)
                        pthread_kill(rkb->rkb_thread, rk->rk_conf.term_sig);
#endif

                rd_kafka_broker_destroy(rkb);

                rd_kafka_wrlock(rk);
        }

        if (rk->rk_clusterid) {
                rd_free(rk->rk_clusterid);
                rk->rk_clusterid = NULL;
        }

        rd_kafka_wrunlock(rk);

        mtx_lock(&rk->rk_broker_state_change_lock);
        /* Purge broker state change waiters */
        rd_list_destroy(&rk->rk_broker_state_change_waiters);
        mtx_unlock(&rk->rk_broker_state_change_lock);

        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Purging reply queue");

        /* Purge op-queue */
        rd_kafka_q_disable(rk->rk_rep);
        rd_kafka_q_purge(rk->rk_rep);

        /* Lose our special reference to the internal broker. */
        mtx_lock(&rk->rk_internal_rkb_lock);
        if ((rkb = rk->rk_internal_rkb)) {
                rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                             "Decommissioning internal broker");

                /* Send op to trigger queue wake-up. */
                rd_kafka_q_enq(rkb->rkb_ops,
                               rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));

                rk->rk_internal_rkb = NULL;
                thrd = malloc(sizeof(*thrd));
                *thrd = rkb->rkb_thread;
                rd_list_add(&wait_thrds, thrd);
        }
        mtx_unlock(&rk->rk_internal_rkb_lock);
        if (rkb)
                rd_kafka_broker_destroy(rkb);


        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Join %d broker thread(s)", rd_list_cnt(&wait_thrds));

        /* Join broker threads */
        RD_LIST_FOREACH(thrd, &wait_thrds, i) {
                int res;
                /* The join result is intentionally ignored:
                 * the thread is gone either way. */
                thrd_join(*thrd, &res);
                free(thrd);
        }

        rd_list_destroy(&wait_thrds);
}

/**
 * @brief Buffer state for stats emitter
 */
struct _stats_emit {
        char *buf;   /* Pointer to allocated buffer */
        size_t size; /* Current allocated size of buf */
        size_t of;   /* Current write-offset in buf */
};


/* Stats buffer printf. Requires a (struct _stats_emit *)st variable in the
 * current scope. */
#define _st_printf(...) do {                                            \
                ssize_t _r;                                             \
                ssize_t _rem = st->size - st->of;                       \
                _r = rd_snprintf(st->buf+st->of, _rem, __VA_ARGS__);    \
                if (_r >= _rem) {                                       \
                        st->size *= 2;                                  \
                        _rem = st->size - st->of;                       \
                        st->buf = rd_realloc(st->buf, st->size);        \
                        _r = rd_snprintf(st->buf+st->of, _rem, __VA_ARGS__); \
                }                                                       \
                st->of += _r;                                           \
        } while (0)
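
/* Note: the retry above relies on rd_snprintf() returning the would-be
 * length on truncation, and assumes a single doubling of the buffer is
 * always sufficient, i.e. that no single _st_printf() call emits more
 * than the current buffer size. */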

struct _stats_total {
        int64_t tx;          /**< broker.tx */
        int64_t tx_bytes;    /**< broker.tx_bytes */
        int64_t rx;          /**< broker.rx */
        int64_t rx_bytes;    /**< broker.rx_bytes */
        int64_t txmsgs;      /**< partition.txmsgs */
        int64_t txmsg_bytes; /**< partition.txbytes */
        int64_t rxmsgs;      /**< partition.rxmsgs */
        int64_t rxmsg_bytes; /**< partition.rxbytes */
};



/**
 * @brief Rollover and emit an average window.
 */
static RD_INLINE void rd_kafka_stats_emit_avg (struct _stats_emit *st,
                                               const char *name,
                                               rd_avg_t *src_avg) {
        rd_avg_t avg;

        rd_avg_rollover(&avg, src_avg);
        _st_printf(
                "\"%s\": {"
                " \"min\":%"PRId64","
                " \"max\":%"PRId64","
                " \"avg\":%"PRId64","
                " \"sum\":%"PRId64","
                " \"stddev\": %"PRId64","
                " \"p50\": %"PRId64","
                " \"p75\": %"PRId64","
                " \"p90\": %"PRId64","
                " \"p95\": %"PRId64","
                " \"p99\": %"PRId64","
                " \"p99_99\": %"PRId64","
                " \"outofrange\": %"PRId64","
                " \"hdrsize\": %"PRId32","
                " \"cnt\":%i "
                "}, ",
                name,
                avg.ra_v.minv,
                avg.ra_v.maxv,
                avg.ra_v.avg,
                avg.ra_v.sum,
                (int64_t)avg.ra_hist.stddev,
                avg.ra_hist.p50,
                avg.ra_hist.p75,
                avg.ra_hist.p90,
                avg.ra_hist.p95,
                avg.ra_hist.p99,
                avg.ra_hist.p99_99,
                avg.ra_hist.oor,
                avg.ra_hist.hdrsize,
                avg.ra_v.cnt);
        rd_avg_destroy(&avg);
}
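
/*
 * Illustrative sample of the JSON fragment emitted above (values are
 * made up), e.g. for the "rtt" window in microseconds:
 *
 *   "rtt": { "min":1200, "max":48000, "avg":9500, "sum":190000,
 *            "stddev": 3100, "p50": 8200, "p75": 10100, "p90": 14000,
 *            "p95": 19000, "p99": 41000, "p99_99": 47000,
 *            "outofrange": 0, "hdrsize": 11376, "cnt":20 },
 */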

/**
 * Emit stats for toppar
 */
static RD_INLINE void rd_kafka_stats_emit_toppar (struct _stats_emit *st,
                                                  struct _stats_total *total,
                                                  rd_kafka_toppar_t *rktp,
                                                  int first) {
        rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
        int64_t consumer_lag = -1;
        struct offset_stats offs;
        int32_t leader_nodeid = -1;

        rd_kafka_toppar_lock(rktp);

        if (rktp->rktp_leader) {
                rd_kafka_broker_lock(rktp->rktp_leader);
                leader_nodeid = rktp->rktp_leader->rkb_nodeid;
                rd_kafka_broker_unlock(rktp->rktp_leader);
        }

        /* Grab a copy of the latest finalized offset stats */
        offs = rktp->rktp_offsets_fin;

        /* Calculate consumer_lag by using the highest offset
         * of app_offset (the last message passed to the application + 1)
         * or committed_offset (the last message committed by this or
         * another consumer).
         * Using app_offset allows consumer_lag to be up to date even if
         * offsets are not (yet) committed.
         */
        if (rktp->rktp_hi_offset != RD_KAFKA_OFFSET_INVALID &&
            (rktp->rktp_app_offset >= 0 || rktp->rktp_committed_offset >= 0)) {
                consumer_lag = rktp->rktp_hi_offset -
                        RD_MAX(rktp->rktp_app_offset,
                               rktp->rktp_committed_offset);
                if (unlikely(consumer_lag < 0))
                        consumer_lag = 0;
        }

        _st_printf("%s\"%"PRId32"\": { "
                   "\"partition\":%"PRId32", "
                   "\"leader\":%"PRId32", "
                   "\"desired\":%s, "
                   "\"unknown\":%s, "
                   "\"msgq_cnt\":%i, "
                   "\"msgq_bytes\":%"PRIusz", "
                   "\"xmit_msgq_cnt\":%i, "
                   "\"xmit_msgq_bytes\":%"PRIusz", "
                   "\"fetchq_cnt\":%i, "
                   "\"fetchq_size\":%"PRIu64", "
                   "\"fetch_state\":\"%s\", "
                   "\"query_offset\":%"PRId64", "
                   "\"next_offset\":%"PRId64", "
                   "\"app_offset\":%"PRId64", "
                   "\"stored_offset\":%"PRId64", "
                   "\"commited_offset\":%"PRId64", " /*FIXME: issue #80 */
                   "\"committed_offset\":%"PRId64", "
                   "\"eof_offset\":%"PRId64", "
                   "\"lo_offset\":%"PRId64", "
                   "\"hi_offset\":%"PRId64", "
                   "\"consumer_lag\":%"PRId64", "
                   "\"txmsgs\":%"PRIu64", "
                   "\"txbytes\":%"PRIu64", "
                   "\"rxmsgs\":%"PRIu64", "
                   "\"rxbytes\":%"PRIu64", "
                   "\"msgs\": %"PRIu64", "
                   "\"rx_ver_drops\": %"PRIu64", "
                   "\"msgs_inflight\": %"PRId32", "
                   "\"next_ack_seq\": %"PRId32", "
                   "\"next_err_seq\": %"PRId32", "
                   "\"acked_msgid\": %"PRIu64
                   "} ",
                   first ? "" : ", ",
                   rktp->rktp_partition,
                   rktp->rktp_partition,
                   leader_nodeid,
                   (rktp->rktp_flags&RD_KAFKA_TOPPAR_F_DESIRED) ?
                   "true" : "false",
                   (rktp->rktp_flags&RD_KAFKA_TOPPAR_F_UNKNOWN) ?
                   "true" : "false",
                   rd_kafka_msgq_len(&rktp->rktp_msgq),
                   rd_kafka_msgq_size(&rktp->rktp_msgq),
                   /* FIXME: xmit_msgq is local to the broker thread. */
                   0,
                   (size_t)0,
                   rd_kafka_q_len(rktp->rktp_fetchq),
                   rd_kafka_q_size(rktp->rktp_fetchq),
                   rd_kafka_fetch_states[rktp->rktp_fetch_state],
                   rktp->rktp_query_offset,
                   offs.fetch_offset,
                   rktp->rktp_app_offset,
                   rktp->rktp_stored_offset,
                   rktp->rktp_committed_offset, /* FIXME: issue #80 */
                   rktp->rktp_committed_offset,
                   offs.eof_offset,
                   rktp->rktp_lo_offset,
                   rktp->rktp_hi_offset,
                   consumer_lag,
                   rd_atomic64_get(&rktp->rktp_c.tx_msgs),
                   rd_atomic64_get(&rktp->rktp_c.tx_msg_bytes),
                   rd_atomic64_get(&rktp->rktp_c.rx_msgs),
                   rd_atomic64_get(&rktp->rktp_c.rx_msg_bytes),
                   rk->rk_type == RD_KAFKA_PRODUCER ?
                   rd_atomic64_get(&rktp->rktp_c.producer_enq_msgs) :
                   /* legacy, same as rx_msgs */
                   rd_atomic64_get(&rktp->rktp_c.rx_msgs),
                   rd_atomic64_get(&rktp->rktp_c.rx_ver_drops),
                   rd_atomic32_get(&rktp->rktp_msgs_inflight),
                   rktp->rktp_eos.next_ack_seq,
                   rktp->rktp_eos.next_err_seq,
                   rktp->rktp_eos.acked_msgid);

        if (total) {
                total->txmsgs      += rd_atomic64_get(&rktp->rktp_c.tx_msgs);
                total->txmsg_bytes += rd_atomic64_get(&rktp->rktp_c.
                                                      tx_msg_bytes);
                total->rxmsgs      += rd_atomic64_get(&rktp->rktp_c.rx_msgs);
                total->rxmsg_bytes += rd_atomic64_get(&rktp->rktp_c.
                                                      rx_msg_bytes);
        }

        rd_kafka_toppar_unlock(rktp);
}

/**
 * @brief Emit broker request type stats
 */
static void rd_kafka_stats_emit_broker_reqs (struct _stats_emit *st,
                                             rd_kafka_broker_t *rkb) {
        /* Filter out request types that will never be sent by the client. */
        static const rd_bool_t filter[4][RD_KAFKAP__NUM] = {
                [RD_KAFKA_PRODUCER] = {
                        [RD_KAFKAP_Fetch] = rd_true,
                        [RD_KAFKAP_OffsetCommit] = rd_true,
                        [RD_KAFKAP_OffsetFetch] = rd_true,
                        [RD_KAFKAP_GroupCoordinator] = rd_true,
                        [RD_KAFKAP_JoinGroup] = rd_true,
                        [RD_KAFKAP_Heartbeat] = rd_true,
                        [RD_KAFKAP_LeaveGroup] = rd_true,
                        [RD_KAFKAP_SyncGroup] = rd_true
                },
                [RD_KAFKA_CONSUMER] = {
                        [RD_KAFKAP_Produce] = rd_true,
                        [RD_KAFKAP_InitProducerId] = rd_true
                },
                [2/*any client type*/] = {
                        [RD_KAFKAP_UpdateMetadata] = rd_true,
                        [RD_KAFKAP_ControlledShutdown] = rd_true,
                        [RD_KAFKAP_LeaderAndIsr] = rd_true,
                        [RD_KAFKAP_StopReplica] = rd_true,
                        [RD_KAFKAP_OffsetForLeaderEpoch] = rd_true,

                        /* FIXME: Remove when transaction support is added */
                        [RD_KAFKAP_AddPartitionsToTxn] = rd_true,
                        [RD_KAFKAP_AddOffsetsToTxn] = rd_true,
                        [RD_KAFKAP_EndTxn] = rd_true,

                        [RD_KAFKAP_WriteTxnMarkers] = rd_true,
                        [RD_KAFKAP_TxnOffsetCommit] = rd_true,

                        [RD_KAFKAP_AlterReplicaLogDirs] = rd_true,
                        [RD_KAFKAP_DescribeLogDirs] = rd_true,

                        /* FIXME: Remove when re-auth support is added */
                        [RD_KAFKAP_SaslAuthenticate] = rd_true,

                        [RD_KAFKAP_CreateDelegationToken] = rd_true,
                        [RD_KAFKAP_RenewDelegationToken] = rd_true,
                        [RD_KAFKAP_ExpireDelegationToken] = rd_true,
                        [RD_KAFKAP_DescribeDelegationToken] = rd_true
                },
                [3/*hide-unless-non-zero*/] = {
                        /* Hide Admin requests unless they've been used */
                        [RD_KAFKAP_CreateTopics] = rd_true,
                        [RD_KAFKAP_DeleteTopics] = rd_true,
                        [RD_KAFKAP_DeleteRecords] = rd_true,
                        [RD_KAFKAP_CreatePartitions] = rd_true,
                        [RD_KAFKAP_DescribeAcls] = rd_true,
                        [RD_KAFKAP_CreateAcls] = rd_true,
                        [RD_KAFKAP_DeleteAcls] = rd_true,
                        [RD_KAFKAP_DescribeConfigs] = rd_true,
                        [RD_KAFKAP_AlterConfigs] = rd_true,
                        [RD_KAFKAP_DeleteGroups] = rd_true,
                        [RD_KAFKAP_ListGroups] = rd_true,
                        [RD_KAFKAP_DescribeGroups] = rd_true
                }
        };
        int i;
        int cnt = 0;

        _st_printf("\"req\": { ");
        for (i = 0 ; i < RD_KAFKAP__NUM ; i++) {
                int64_t v;

                if (filter[rkb->rkb_rk->rk_type][i] || filter[2][i])
                        continue;

                v = rd_atomic64_get(&rkb->rkb_c.reqtype[i]);
                if (!v && filter[3][i])
                        continue; /* Filter out zero values */

                _st_printf("%s\"%s\": %"PRId64,
                           cnt > 0 ? ", " : "",
                           rd_kafka_ApiKey2str(i), v);

                cnt++;
        }
        _st_printf(" }, ");
}
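
/*
 * Illustrative sample of the fragment emitted above for a producer
 * (values are made up): only request types relevant to the client type,
 * plus admin requests that have actually been used, appear:
 *
 *   "req": { "Produce": 1234, "Metadata": 5, "ApiVersion": 2 },
 */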
1439 | |
1440 | |
1441 | /** |
1442 | * Emit all statistics |
1443 | */ |
1444 | static void rd_kafka_stats_emit_all (rd_kafka_t *rk) { |
1445 | rd_kafka_broker_t *rkb; |
1446 | rd_kafka_itopic_t *rkt; |
1447 | shptr_rd_kafka_toppar_t *s_rktp; |
1448 | rd_ts_t now; |
1449 | rd_kafka_op_t *rko; |
1450 | unsigned int tot_cnt; |
1451 | size_t tot_size; |
1452 | rd_kafka_resp_err_t err; |
1453 | struct _stats_emit stx = { .size = 1024*10 }; |
1454 | struct _stats_emit *st = &stx; |
1455 | struct _stats_total total = {0}; |
1456 | |
1457 | st->buf = rd_malloc(st->size); |
1458 | |
1459 | |
1460 | rd_kafka_curr_msgs_get(rk, &tot_cnt, &tot_size); |
1461 | rd_kafka_rdlock(rk); |
1462 | |
1463 | now = rd_clock(); |
1464 | _st_printf("{ " |
1465 | "\"name\": \"%s\", " |
1466 | "\"client_id\": \"%s\", " |
1467 | "\"type\": \"%s\", " |
1468 | "\"ts\":%" PRId64", " |
1469 | "\"time\":%lli, " |
1470 | "\"replyq\":%i, " |
1471 | "\"msg_cnt\":%u, " |
1472 | "\"msg_size\":%" PRIusz", " |
1473 | "\"msg_max\":%u, " |
1474 | "\"msg_size_max\":%" PRIusz", " |
1475 | "\"simple_cnt\":%i, " |
1476 | "\"metadata_cache_cnt\":%i, " |
1477 | "\"brokers\":{ " /*open brokers*/, |
1478 | rk->rk_name, |
1479 | rk->rk_conf.client_id_str, |
1480 | rd_kafka_type2str(rk->rk_type), |
1481 | now, |
1482 | (signed long long)time(NULL), |
1483 | rd_kafka_q_len(rk->rk_rep), |
1484 | tot_cnt, tot_size, |
1485 | rk->rk_curr_msgs.max_cnt, rk->rk_curr_msgs.max_size, |
1486 | rd_atomic32_get(&rk->rk_simple_cnt), |
1487 | rk->rk_metadata_cache.rkmc_cnt); |
1488 | |
1489 | |
1490 | TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) { |
1491 | rd_kafka_toppar_t *rktp; |
1492 | |
1493 | rd_kafka_broker_lock(rkb); |
1494 | _st_printf("%s\"%s\": { " /*open broker*/ |
1495 | "\"name\":\"%s\", " |
1496 | "\"nodeid\":%" PRId32", " |
1497 | "\"nodename\":\"%s\", " |
1498 | "\"source\":\"%s\", " |
1499 | "\"state\":\"%s\", " |
1500 | "\"stateage\":%" PRId64", " |
1501 | "\"outbuf_cnt\":%i, " |
1502 | "\"outbuf_msg_cnt\":%i, " |
1503 | "\"waitresp_cnt\":%i, " |
1504 | "\"waitresp_msg_cnt\":%i, " |
1505 | "\"tx\":%" PRIu64", " |
1506 | "\"txbytes\":%" PRIu64", " |
1507 | "\"txerrs\":%" PRIu64", " |
1508 | "\"txretries\":%" PRIu64", " |
1509 | "\"req_timeouts\":%" PRIu64", " |
1510 | "\"rx\":%" PRIu64", " |
1511 | "\"rxbytes\":%" PRIu64", " |
1512 | "\"rxerrs\":%" PRIu64", " |
1513 | "\"rxcorriderrs\":%" PRIu64", " |
1514 | "\"rxpartial\":%" PRIu64", " |
1515 | "\"zbuf_grow\":%" PRIu64", " |
1516 | "\"buf_grow\":%" PRIu64", " |
1517 | "\"wakeups\":%" PRIu64", " |
1518 | "\"connects\":%" PRId32", " |
1519 | "\"disconnects\":%" PRId32", " , |
1520 | rkb == TAILQ_FIRST(&rk->rk_brokers) ? "" : ", " , |
1521 | rkb->rkb_name, |
1522 | rkb->rkb_name, |
1523 | rkb->rkb_nodeid, |
1524 | rkb->rkb_nodename, |
1525 | rd_kafka_confsource2str(rkb->rkb_source), |
1526 | rd_kafka_broker_state_names[rkb->rkb_state], |
1527 | rkb->rkb_ts_state ? now - rkb->rkb_ts_state : 0, |
1528 | rd_atomic32_get(&rkb->rkb_outbufs.rkbq_cnt), |
1529 | rd_atomic32_get(&rkb->rkb_outbufs.rkbq_msg_cnt), |
1530 | rd_atomic32_get(&rkb->rkb_waitresps.rkbq_cnt), |
1531 | rd_atomic32_get(&rkb->rkb_waitresps.rkbq_msg_cnt), |
1532 | rd_atomic64_get(&rkb->rkb_c.tx), |
1533 | rd_atomic64_get(&rkb->rkb_c.tx_bytes), |
1534 | rd_atomic64_get(&rkb->rkb_c.tx_err), |
1535 | rd_atomic64_get(&rkb->rkb_c.tx_retries), |
1536 | rd_atomic64_get(&rkb->rkb_c.req_timeouts), |
1537 | rd_atomic64_get(&rkb->rkb_c.rx), |
1538 | rd_atomic64_get(&rkb->rkb_c.rx_bytes), |
1539 | rd_atomic64_get(&rkb->rkb_c.rx_err), |
1540 | rd_atomic64_get(&rkb->rkb_c.rx_corrid_err), |
1541 | rd_atomic64_get(&rkb->rkb_c.rx_partial), |
1542 | rd_atomic64_get(&rkb->rkb_c.zbuf_grow), |
1543 | rd_atomic64_get(&rkb->rkb_c.buf_grow), |
1544 | rd_atomic64_get(&rkb->rkb_c.wakeups), |
1545 | rd_atomic32_get(&rkb->rkb_c.connects), |
1546 | rd_atomic32_get(&rkb->rkb_c.disconnects)); |
1547 | |
1548 | total.tx += rd_atomic64_get(&rkb->rkb_c.tx); |
1549 | total.tx_bytes += rd_atomic64_get(&rkb->rkb_c.tx_bytes); |
1550 | total.rx += rd_atomic64_get(&rkb->rkb_c.rx); |
1551 | total.rx_bytes += rd_atomic64_get(&rkb->rkb_c.rx_bytes); |
1552 | |
1553 | rd_kafka_stats_emit_avg(st, "int_latency" , |
1554 | &rkb->rkb_avg_int_latency); |
1555 | rd_kafka_stats_emit_avg(st, "outbuf_latency" , |
1556 | &rkb->rkb_avg_outbuf_latency); |
1557 | rd_kafka_stats_emit_avg(st, "rtt" , &rkb->rkb_avg_rtt); |
1558 | rd_kafka_stats_emit_avg(st, "throttle" , &rkb->rkb_avg_throttle); |
1559 | |
1560 | rd_kafka_stats_emit_broker_reqs(st, rkb); |
1561 | |
1562 | _st_printf("\"toppars\":{ " /*open toppars*/); |
1563 | |
1564 | TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink) { |
1565 | _st_printf("%s\"%.*s-%" PRId32"\": { " |
1566 | "\"topic\":\"%.*s\", " |
1567 | "\"partition\":%" PRId32"} " , |
1568 | rktp==TAILQ_FIRST(&rkb->rkb_toppars)?"" :", " , |
1569 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
1570 | rktp->rktp_partition, |
1571 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
1572 | rktp->rktp_partition); |
1573 | } |
1574 | |
1575 | rd_kafka_broker_unlock(rkb); |
1576 | |
1577 | _st_printf("} " /*close toppars*/ |
1578 | "} " /*close broker*/); |
1579 | } |
1580 | |
1581 | |
1582 | _st_printf("}, " /* close "brokers" array */ |
1583 | "\"topics\":{ " ); |
1584 | |
1585 | TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) { |
1586 | int i, j; |
1587 | |
1588 | rd_kafka_topic_rdlock(rkt); |
1589 | _st_printf("%s\"%.*s\": { " |
1590 | "\"topic\":\"%.*s\", " |
1591 | "\"metadata_age\":%" PRId64", " , |
1592 | rkt==TAILQ_FIRST(&rk->rk_topics)?"" :", " , |
1593 | RD_KAFKAP_STR_PR(rkt->rkt_topic), |
1594 | RD_KAFKAP_STR_PR(rkt->rkt_topic), |
1595 | rkt->rkt_ts_metadata ? |
1596 | (rd_clock() - rkt->rkt_ts_metadata)/1000 : 0); |
1597 | |
1598 | rd_kafka_stats_emit_avg(st, "batchsize" , |
1599 | &rkt->rkt_avg_batchsize); |
1600 | rd_kafka_stats_emit_avg(st, "batchcnt" , |
1601 | &rkt->rkt_avg_batchcnt); |
1602 | |
1603 | _st_printf("\"partitions\":{ " /*open partitions*/); |
1604 | |
1605 | for (i = 0 ; i < rkt->rkt_partition_cnt ; i++) |
1606 | rd_kafka_stats_emit_toppar( |
1607 | st, &total, |
1608 | rd_kafka_toppar_s2i(rkt->rkt_p[i]), |
1609 | i == 0); |
1610 | |
1611 | RD_LIST_FOREACH(s_rktp, &rkt->rkt_desp, j) |
1612 | rd_kafka_stats_emit_toppar( |
1613 | st, &total, |
1614 | rd_kafka_toppar_s2i(s_rktp), |
1615 | i+j == 0); |
1616 | |
1617 | i += j; |
1618 | |
1619 | if (rkt->rkt_ua) |
1620 | rd_kafka_stats_emit_toppar( |
1621 | st, NULL, |
1622 | rd_kafka_toppar_s2i(rkt->rkt_ua), |
1623 | i++ == 0); |
1624 | |
1625 | rd_kafka_topic_rdunlock(rkt); |
1626 | |
1627 | _st_printf("} " /*close partitions*/ |
1628 | "} " /*close topic*/); |
1629 | |
1630 | } |
1631 | _st_printf("} " /*close topics*/); |
1632 | |
1633 | if (rk->rk_cgrp) { |
1634 | rd_kafka_cgrp_t *rkcg = rk->rk_cgrp; |
1635 | _st_printf(", \"cgrp\": { " |
1636 | "\"state\": \"%s\", " |
1637 | "\"stateage\": %" PRId64", " |
1638 | "\"join_state\": \"%s\", " |
1639 | "\"rebalance_age\": %" PRId64", " |
1640 | "\"rebalance_cnt\": %d, " |
1641 | "\"rebalance_reason\": \"%s\", " |
1642 | "\"assignment_size\": %d }" , |
1643 | rd_kafka_cgrp_state_names[rkcg->rkcg_state], |
1644 | rkcg->rkcg_ts_statechange ? |
1645 | (now - rkcg->rkcg_ts_statechange) / 1000 : 0, |
1646 | rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state], |
1647 | rkcg->rkcg_c.ts_rebalance ? |
1648 | (rd_clock() - rkcg->rkcg_c.ts_rebalance)/1000 : 0, |
1649 | rkcg->rkcg_c.rebalance_cnt, |
1650 | rkcg->rkcg_c.rebalance_reason, |
1651 | rkcg->rkcg_c.assignment_size); |
1652 | } |
1653 | |
1654 | if (rd_kafka_is_idempotent(rk)) { |
1655 | _st_printf(", \"eos\": { " |
1656 | "\"idemp_state\": \"%s\", " |
1657 | "\"idemp_stateage\": %" PRId64", " |
1658 | "\"producer_id\": %" PRId64", " |
1659 | "\"producer_epoch\": %hd, " |
1660 | "\"epoch_cnt\": %d " |
1661 | "}" , |
1662 | rd_kafka_idemp_state2str(rk->rk_eos.idemp_state), |
1663 | (rd_clock() - rk->rk_eos.ts_idemp_state) / 1000, |
1664 | rk->rk_eos.pid.id, |
1665 | rk->rk_eos.pid.epoch, |
1666 | rk->rk_eos.epoch_cnt); |
1667 | } |
1668 | |
1669 | if ((err = rd_atomic32_get(&rk->rk_fatal.err))) |
1670 | _st_printf(", \"fatal\": { " |
1671 | "\"error\": \"%s\", " |
1672 | "\"reason\": \"%s\", " |
1673 | "\"cnt\": %d " |
1674 | "}" , |
1675 | rd_kafka_err2str(err), |
1676 | rk->rk_fatal.errstr, |
1677 | rk->rk_fatal.cnt); |
1678 | |
1679 | rd_kafka_rdunlock(rk); |
1680 | |
1681 | /* Total counters */ |
1682 | _st_printf(", " |
1683 | "\"tx\":%" PRId64", " |
1684 | "\"tx_bytes\":%" PRId64", " |
1685 | "\"rx\":%" PRId64", " |
1686 | "\"rx_bytes\":%" PRId64", " |
1687 | "\"txmsgs\":%" PRId64", " |
1688 | "\"txmsg_bytes\":%" PRId64", " |
1689 | "\"rxmsgs\":%" PRId64", " |
1690 | "\"rxmsg_bytes\":%" PRId64, |
1691 | total.tx, |
1692 | total.tx_bytes, |
1693 | total.rx, |
1694 | total.rx_bytes, |
1695 | total.txmsgs, |
1696 | total.txmsg_bytes, |
1697 | total.rxmsgs, |
1698 | total.rxmsg_bytes); |
1699 | |
1700 | _st_printf("}" /*close object*/); |
1701 | |
1702 | |
1703 | /* Enqueue op for application */ |
1704 | rko = rd_kafka_op_new(RD_KAFKA_OP_STATS); |
1705 | rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_HIGH); |
1706 | rko->rko_u.stats.json = st->buf; |
1707 | rko->rko_u.stats.json_len = st->of; |
1708 | rd_kafka_q_enq(rk->rk_rep, rko); |
1709 | } |
1710 | |
1711 | |
1712 | /** |
1713 | * @brief 1 second generic timer. |
1714 | * |
1715 | * @locality rdkafka main thread |
1716 | * @locks none |
1717 | */ |
1718 | static void rd_kafka_1s_tmr_cb (rd_kafka_timers_t *rkts, void *arg) { |
1719 | rd_kafka_t *rk = rkts->rkts_rk; |
1720 | |
1721 | /* Scan topic state, message timeouts, etc. */ |
1722 | rd_kafka_topic_scan_all(rk, rd_clock()); |
1723 | |
1724 | /* Sparse connections: |
1725 | * try to maintain at least one connection to the cluster. */ |
1726 | if (rk->rk_conf.sparse_connections && |
1727 | rd_atomic32_get(&rk->rk_broker_up_cnt) == 0) |
1728 | rd_kafka_connect_any(rk, "no cluster connection" ); |
1729 | |
1730 | } |
1731 | |
1732 | static void rd_kafka_stats_emit_tmr_cb (rd_kafka_timers_t *rkts, void *arg) { |
1733 | rd_kafka_t *rk = rkts->rkts_rk; |
1734 | rd_kafka_stats_emit_all(rk); |
1735 | } |
1736 | |
1737 | |
1738 | /** |
1739 | * @brief Periodic metadata refresh callback |
1740 | * |
1741 | * @locality rdkafka main thread |
1742 | */ |
1743 | static void rd_kafka_metadata_refresh_cb (rd_kafka_timers_t *rkts, void *arg) { |
1744 | rd_kafka_t *rk = rkts->rkts_rk; |
1745 | int sparse = 1; |
1746 | |
/* Don't do sparse requests if there is a consumer group with an
1748 | * active subscription since subscriptions need to be able to match |
1749 | * on all topics. */ |
1750 | if (rk->rk_type == RD_KAFKA_CONSUMER && rk->rk_cgrp && |
1751 | rk->rk_cgrp->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION) |
1752 | sparse = 0; |
1753 | |
1754 | if (sparse) |
1755 | rd_kafka_metadata_refresh_known_topics( |
rk, NULL, 1/*force*/, "periodic refresh");
1757 | else |
rd_kafka_metadata_refresh_all(rk, NULL, "periodic refresh");
1759 | } |
1760 | |
1761 | |
1762 | |
1763 | /** |
1764 | * @brief Wait for background threads to initialize. |
1765 | * |
1766 | * @returns the number of background threads still not initialized. |
1767 | * |
1768 | * @locality app thread calling rd_kafka_new() |
1769 | * @locks none |
1770 | */ |
1771 | static int rd_kafka_init_wait (rd_kafka_t *rk, int timeout_ms) { |
1772 | struct timespec tspec; |
1773 | int ret; |
1774 | |
1775 | rd_timeout_init_timespec(&tspec, timeout_ms); |
1776 | |
1777 | mtx_lock(&rk->rk_init_lock); |
1778 | while (rk->rk_init_wait_cnt > 0 && |
1779 | cnd_timedwait_abs(&rk->rk_init_cnd, &rk->rk_init_lock, |
1780 | &tspec) == thrd_success) |
1781 | ; |
1782 | ret = rk->rk_init_wait_cnt; |
1783 | mtx_unlock(&rk->rk_init_lock); |
1784 | |
1785 | return ret; |
1786 | } |
1787 | |
1788 | |
1789 | /** |
1790 | * Main loop for Kafka handler thread. |
1791 | */ |
1792 | static int rd_kafka_thread_main (void *arg) { |
1793 | rd_kafka_t *rk = arg; |
1794 | rd_kafka_timer_t tmr_1s = RD_ZERO_INIT; |
1795 | rd_kafka_timer_t tmr_stats_emit = RD_ZERO_INIT; |
1796 | rd_kafka_timer_t tmr_metadata_refresh = RD_ZERO_INIT; |
1797 | |
1798 | rd_kafka_set_thread_name("main" ); |
1799 | rd_kafka_set_thread_sysname("rdk:main" ); |
1800 | |
1801 | (void)rd_atomic32_add(&rd_kafka_thread_cnt_curr, 1); |
1802 | |
1803 | /* Acquire lock (which was held by thread creator during creation) |
1804 | * to synchronise state. */ |
1805 | rd_kafka_wrlock(rk); |
1806 | rd_kafka_wrunlock(rk); |
1807 | |
1808 | /* 1 second timer for topic scan and connection checking. */ |
1809 | rd_kafka_timer_start(&rk->rk_timers, &tmr_1s, 1000000, |
1810 | rd_kafka_1s_tmr_cb, NULL); |
1811 | if (rk->rk_conf.stats_interval_ms) |
1812 | rd_kafka_timer_start(&rk->rk_timers, &tmr_stats_emit, |
1813 | rk->rk_conf.stats_interval_ms * 1000ll, |
1814 | rd_kafka_stats_emit_tmr_cb, NULL); |
1815 | if (rk->rk_conf.metadata_refresh_interval_ms > 0) |
1816 | rd_kafka_timer_start(&rk->rk_timers, &tmr_metadata_refresh, |
1817 | rk->rk_conf.metadata_refresh_interval_ms * |
1818 | 1000ll, |
1819 | rd_kafka_metadata_refresh_cb, NULL); |
1820 | |
1821 | if (rk->rk_cgrp) |
1822 | rd_kafka_q_fwd_set(rk->rk_cgrp->rkcg_ops, rk->rk_ops); |
1823 | |
1824 | if (rd_kafka_is_idempotent(rk)) |
1825 | rd_kafka_idemp_init(rk); |
1826 | |
1827 | mtx_lock(&rk->rk_init_lock); |
1828 | rk->rk_init_wait_cnt--; |
1829 | cnd_broadcast(&rk->rk_init_cnd); |
1830 | mtx_unlock(&rk->rk_init_lock); |
1831 | |
1832 | while (likely(!rd_kafka_terminating(rk) || |
1833 | rd_kafka_q_len(rk->rk_ops))) { |
1834 | rd_ts_t sleeptime = rd_kafka_timers_next( |
1835 | &rk->rk_timers, 1000*1000/*1s*/, 1/*lock*/); |
1836 | rd_kafka_q_serve(rk->rk_ops, (int)(sleeptime / 1000), 0, |
1837 | RD_KAFKA_Q_CB_CALLBACK, NULL, NULL); |
1838 | if (rk->rk_cgrp) /* FIXME: move to timer-triggered */ |
1839 | rd_kafka_cgrp_serve(rk->rk_cgrp); |
1840 | rd_kafka_timers_run(&rk->rk_timers, RD_POLL_NOWAIT); |
1841 | } |
1842 | |
rd_kafka_dbg(rk, GENERIC, "TERMINATE",
"Internal main thread terminating");
1845 | |
1846 | if (rd_kafka_is_idempotent(rk)) |
1847 | rd_kafka_idemp_term(rk); |
1848 | |
1849 | rd_kafka_q_disable(rk->rk_ops); |
1850 | rd_kafka_q_purge(rk->rk_ops); |
1851 | |
1852 | rd_kafka_timer_stop(&rk->rk_timers, &tmr_1s, 1); |
1853 | if (rk->rk_conf.stats_interval_ms) |
1854 | rd_kafka_timer_stop(&rk->rk_timers, &tmr_stats_emit, 1); |
1855 | rd_kafka_timer_stop(&rk->rk_timers, &tmr_metadata_refresh, 1); |
1856 | |
1857 | /* Synchronise state */ |
1858 | rd_kafka_wrlock(rk); |
1859 | rd_kafka_wrunlock(rk); |
1860 | |
1861 | rd_kafka_destroy_internal(rk); |
1862 | |
rd_kafka_dbg(rk, GENERIC, "TERMINATE",
"Internal main thread termination done");
1865 | |
1866 | rd_atomic32_sub(&rd_kafka_thread_cnt_curr, 1); |
1867 | |
1868 | return 0; |
1869 | } |
1870 | |
1871 | |
1872 | static void rd_kafka_term_sig_handler (int sig) { |
1873 | /* nop */ |
1874 | } |
1875 | |
1876 | |
1877 | rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *app_conf, |
1878 | char *errstr, size_t errstr_size) { |
1879 | rd_kafka_t *rk; |
1880 | static rd_atomic32_t rkid; |
1881 | rd_kafka_conf_t *conf; |
1882 | rd_kafka_resp_err_t ret_err = RD_KAFKA_RESP_ERR_NO_ERROR; |
1883 | int ret_errno = 0; |
1884 | const char *conf_err; |
1885 | #ifndef _MSC_VER |
1886 | sigset_t newset, oldset; |
1887 | #endif |
1888 | char builtin_features[128]; |
1889 | size_t bflen; |
1890 | |
1891 | rd_kafka_global_init(); |
1892 | |
1893 | /* rd_kafka_new() takes ownership of the provided \p app_conf |
1894 | * object if rd_kafka_new() succeeds. |
1895 | * Since \p app_conf is optional we allocate a default configuration |
1896 | * object here if \p app_conf is NULL. |
1897 | * The configuration object itself is struct-copied later |
1898 | * leaving the default *conf pointer to be ready for freeing. |
1899 | * In case new() fails and app_conf was specified we will clear out |
1900 | * rk_conf to avoid double-freeing from destroy_internal() and the |
1901 | * user's eventual call to rd_kafka_conf_destroy(). |
1902 | * This is all a bit tricky but that's the nature of |
1903 | * legacy interfaces. */ |
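/*
 * Application-side sketch of these ownership rules (illustrative only,
 * not part of librdkafka):
 *
 *   rd_kafka_conf_t *conf = rd_kafka_conf_new();
 *   char errstr[512];
 *   rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
 *                                 errstr, sizeof(errstr));
 *   if (!rk) {
 *           rd_kafka_conf_destroy(conf); // failure: conf still owned by app
 *           return;
 *   }
 *   // success: conf is now owned by rk and must not be destroyed by the app
 */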
1904 | if (!app_conf) |
1905 | conf = rd_kafka_conf_new(); |
1906 | else |
1907 | conf = app_conf; |
1908 | |
1909 | /* Verify and finalize configuration */ |
1910 | if ((conf_err = rd_kafka_conf_finalize(type, conf))) { |
1911 | /* Incompatible configuration settings */ |
rd_snprintf(errstr, errstr_size, "%s", conf_err);
1913 | if (!app_conf) |
1914 | rd_kafka_conf_destroy(conf); |
1915 | rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL); |
1916 | return NULL; |
1917 | } |
1918 | |
1919 | |
1920 | rd_kafka_global_cnt_incr(); |
1921 | |
1922 | /* |
1923 | * Set up the handle. |
1924 | */ |
1925 | rk = rd_calloc(1, sizeof(*rk)); |
1926 | |
1927 | rk->rk_type = type; |
1928 | |
1929 | /* Struct-copy the config object. */ |
1930 | rk->rk_conf = *conf; |
1931 | if (!app_conf) |
1932 | rd_free(conf); /* Free the base config struct only, |
1933 | * not its fields since they were copied to |
1934 | * rk_conf just above. Those fields are |
1935 | * freed from rd_kafka_destroy_internal() |
1936 | * as the rk itself is destroyed. */ |
1937 | |
1938 | /* Call on_new() interceptors */ |
1939 | rd_kafka_interceptors_on_new(rk, &rk->rk_conf); |
1940 | |
1941 | rwlock_init(&rk->rk_lock); |
1942 | mtx_init(&rk->rk_internal_rkb_lock, mtx_plain); |
1943 | |
1944 | cnd_init(&rk->rk_broker_state_change_cnd); |
1945 | mtx_init(&rk->rk_broker_state_change_lock, mtx_plain); |
1946 | rd_list_init(&rk->rk_broker_state_change_waiters, 8, |
1947 | rd_kafka_enq_once_trigger_destroy); |
1948 | |
1949 | cnd_init(&rk->rk_init_cnd); |
1950 | mtx_init(&rk->rk_init_lock, mtx_plain); |
1951 | |
1952 | rd_interval_init(&rk->rk_suppress.no_idemp_brokers); |
1953 | rd_interval_init(&rk->rk_suppress.sparse_connect_random); |
1954 | mtx_init(&rk->rk_suppress.sparse_connect_lock, mtx_plain); |
1955 | |
1956 | rd_atomic64_init(&rk->rk_ts_last_poll, INT64_MAX); |
1957 | |
1958 | rk->rk_rep = rd_kafka_q_new(rk); |
1959 | rk->rk_ops = rd_kafka_q_new(rk); |
1960 | rk->rk_ops->rkq_serve = rd_kafka_poll_cb; |
1961 | rk->rk_ops->rkq_opaque = rk; |
1962 | |
1963 | if (rk->rk_conf.log_queue) { |
1964 | rk->rk_logq = rd_kafka_q_new(rk); |
1965 | rk->rk_logq->rkq_serve = rd_kafka_poll_cb; |
1966 | rk->rk_logq->rkq_opaque = rk; |
1967 | } |
1968 | |
1969 | TAILQ_INIT(&rk->rk_brokers); |
1970 | TAILQ_INIT(&rk->rk_topics); |
1971 | rd_kafka_timers_init(&rk->rk_timers, rk); |
1972 | rd_kafka_metadata_cache_init(rk); |
1973 | |
1974 | if (rk->rk_conf.dr_cb || rk->rk_conf.dr_msg_cb) |
1975 | rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_DR; |
1976 | if (rk->rk_conf.rebalance_cb) |
1977 | rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_REBALANCE; |
1978 | if (rk->rk_conf.offset_commit_cb) |
1979 | rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_OFFSET_COMMIT; |
1980 | if (rk->rk_conf.error_cb) |
1981 | rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_ERROR; |
1982 | #if WITH_SASL_OAUTHBEARER |
1983 | if (rk->rk_conf.sasl.enable_oauthbearer_unsecure_jwt && |
1984 | !rk->rk_conf.sasl.oauthbearer_token_refresh_cb) |
1985 | rd_kafka_conf_set_oauthbearer_token_refresh_cb( |
1986 | &rk->rk_conf, |
1987 | rd_kafka_oauthbearer_unsecured_token); |
1988 | |
1989 | if (rk->rk_conf.sasl.oauthbearer_token_refresh_cb) |
1990 | rk->rk_conf.enabled_events |= |
1991 | RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH; |
1992 | #endif |
1993 | |
1994 | rk->rk_controllerid = -1; |
1995 | |
1996 | /* Admin client defaults */ |
1997 | rk->rk_conf.admin.request_timeout_ms = rk->rk_conf.socket_timeout_ms; |
1998 | |
1999 | /* Convenience Kafka protocol null bytes */ |
2000 | rk->rk_null_bytes = rd_kafkap_bytes_new(NULL, 0); |
2001 | |
2002 | if (rk->rk_conf.debug) |
2003 | rk->rk_conf.log_level = LOG_DEBUG; |
2004 | |
rd_snprintf(rk->rk_name, sizeof(rk->rk_name), "%s#%s-%i",
2006 | rk->rk_conf.client_id_str, rd_kafka_type2str(rk->rk_type), |
2007 | rd_atomic32_add(&rkid, 1)); |
2008 | |
2009 | /* Construct clientid kafka string */ |
2010 | rk->rk_client_id = rd_kafkap_str_new(rk->rk_conf.client_id_str,-1); |
2011 | |
2012 | /* Convert group.id to kafka string (may be NULL) */ |
2013 | rk->rk_group_id = rd_kafkap_str_new(rk->rk_conf.group_id_str,-1); |
2014 | |
2015 | /* Config fixups */ |
2016 | rk->rk_conf.queued_max_msg_bytes = |
2017 | (int64_t)rk->rk_conf.queued_max_msg_kbytes * 1000ll; |
2018 | |
/* Enable api.version.request=true if broker.version.fallback
* indicates a supporting broker. */
2021 | if (rd_kafka_ApiVersion_is_queryable(rk->rk_conf.broker_version_fallback)) |
2022 | rk->rk_conf.api_version_request = 1; |
2023 | |
2024 | if (rk->rk_type == RD_KAFKA_PRODUCER) { |
2025 | mtx_init(&rk->rk_curr_msgs.lock, mtx_plain); |
2026 | cnd_init(&rk->rk_curr_msgs.cnd); |
2027 | rk->rk_curr_msgs.max_cnt = |
2028 | rk->rk_conf.queue_buffering_max_msgs; |
2029 | if ((unsigned long long)rk->rk_conf.queue_buffering_max_kbytes * 1024 > |
2030 | (unsigned long long)SIZE_MAX) |
2031 | rk->rk_curr_msgs.max_size = SIZE_MAX; |
2032 | else |
2033 | rk->rk_curr_msgs.max_size = |
2034 | (size_t)rk->rk_conf.queue_buffering_max_kbytes * 1024; |
2035 | } |
2036 | |
2037 | if (rd_kafka_assignors_init(rk, errstr, errstr_size) == -1) { |
2038 | ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG; |
2039 | ret_errno = EINVAL; |
2040 | goto fail; |
2041 | } |
2042 | |
2043 | if (rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL || |
2044 | rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_PLAINTEXT) { |
2045 | /* Select SASL provider */ |
2046 | if (rd_kafka_sasl_select_provider(rk, |
2047 | errstr, errstr_size) == -1) { |
2048 | ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG; |
2049 | ret_errno = EINVAL; |
2050 | goto fail; |
2051 | } |
2052 | |
2053 | /* Initialize SASL provider */ |
2054 | if (rd_kafka_sasl_init(rk, errstr, errstr_size) == -1) { |
2055 | rk->rk_conf.sasl.provider = NULL; |
2056 | ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG; |
2057 | ret_errno = EINVAL; |
2058 | goto fail; |
2059 | } |
2060 | } |
2061 | |
2062 | #if WITH_SSL |
2063 | if (rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SSL || |
2064 | rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL) { |
2065 | /* Create SSL context */ |
2066 | if (rd_kafka_ssl_ctx_init(rk, errstr, errstr_size) == -1) { |
2067 | ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG; |
2068 | ret_errno = EINVAL; |
2069 | goto fail; |
2070 | } |
2071 | } |
2072 | #endif |
2073 | |
2074 | /* Client group, eligible both in consumer and producer mode. */ |
2075 | if (type == RD_KAFKA_CONSUMER && |
2076 | RD_KAFKAP_STR_LEN(rk->rk_group_id) > 0) |
2077 | rk->rk_cgrp = rd_kafka_cgrp_new(rk, |
2078 | rk->rk_group_id, |
2079 | rk->rk_client_id); |
2080 | |
2081 | rk->rk_eos.transactional_id = rd_kafkap_str_new(NULL, 0); |
2082 | |
2083 | #ifndef _MSC_VER |
2084 | /* Block all signals in newly created threads. |
* To avoid a race condition we block all signals in the calling
2086 | * thread, which the new thread will inherit its sigmask from, |
2087 | * and then restore the original sigmask of the calling thread when |
2088 | * we're done creating the thread. */ |
2089 | sigemptyset(&oldset); |
2090 | sigfillset(&newset); |
2091 | if (rk->rk_conf.term_sig) { |
2092 | struct sigaction sa_term = { |
2093 | .sa_handler = rd_kafka_term_sig_handler |
2094 | }; |
2095 | sigaction(rk->rk_conf.term_sig, &sa_term, NULL); |
2096 | } |
2097 | pthread_sigmask(SIG_SETMASK, &newset, &oldset); |
2098 | #endif |
2099 | |
2100 | mtx_lock(&rk->rk_init_lock); |
2101 | |
2102 | /* Create background thread and queue if background_event_cb() |
2103 | * has been configured. |
2104 | * Do this before creating the main thread since after |
2105 | * the main thread is created it is no longer trivial to error |
2106 | * out from rd_kafka_new(). */ |
2107 | if (rk->rk_conf.background_event_cb) { |
2108 | /* Hold off background thread until thrd_create() is done. */ |
2109 | rd_kafka_wrlock(rk); |
2110 | |
2111 | rk->rk_background.q = rd_kafka_q_new(rk); |
2112 | |
2113 | rk->rk_init_wait_cnt++; |
2114 | |
2115 | if ((thrd_create(&rk->rk_background.thread, |
2116 | rd_kafka_background_thread_main, rk)) != |
2117 | thrd_success) { |
2118 | rk->rk_init_wait_cnt--; |
2119 | ret_err = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE; |
2120 | ret_errno = errno; |
2121 | if (errstr) |
2122 | rd_snprintf(errstr, errstr_size, |
2123 | "Failed to create background " |
2124 | "thread: %s (%i)" , |
2125 | rd_strerror(errno), errno); |
2126 | rd_kafka_wrunlock(rk); |
2127 | mtx_unlock(&rk->rk_init_lock); |
2128 | |
2129 | #ifndef _MSC_VER |
2130 | /* Restore sigmask of caller */ |
2131 | pthread_sigmask(SIG_SETMASK, &oldset, NULL); |
2132 | #endif |
2133 | goto fail; |
2134 | } |
2135 | |
2136 | rd_kafka_wrunlock(rk); |
2137 | } |
2138 | |
2139 | |
2140 | |
2141 | /* Lock handle here to synchronise state, i.e., hold off |
2142 | * the thread until we've finalized the handle. */ |
2143 | rd_kafka_wrlock(rk); |
2144 | |
2145 | /* Create handler thread */ |
2146 | rk->rk_init_wait_cnt++; |
2147 | if ((thrd_create(&rk->rk_thread, |
2148 | rd_kafka_thread_main, rk)) != thrd_success) { |
2149 | rk->rk_init_wait_cnt--; |
2150 | ret_err = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE; |
2151 | ret_errno = errno; |
2152 | if (errstr) |
2153 | rd_snprintf(errstr, errstr_size, |
2154 | "Failed to create thread: %s (%i)" , |
2155 | rd_strerror(errno), errno); |
2156 | rd_kafka_wrunlock(rk); |
2157 | mtx_unlock(&rk->rk_init_lock); |
2158 | #ifndef _MSC_VER |
2159 | /* Restore sigmask of caller */ |
2160 | pthread_sigmask(SIG_SETMASK, &oldset, NULL); |
2161 | #endif |
2162 | goto fail; |
2163 | } |
2164 | |
2165 | rd_kafka_wrunlock(rk); |
2166 | mtx_unlock(&rk->rk_init_lock); |
2167 | |
2168 | /* |
2169 | * @warning `goto fail` is prohibited past this point |
2170 | */ |
2171 | |
2172 | mtx_lock(&rk->rk_internal_rkb_lock); |
2173 | rk->rk_internal_rkb = rd_kafka_broker_add(rk, RD_KAFKA_INTERNAL, |
2174 | RD_KAFKA_PROTO_PLAINTEXT, |
2175 | "" , 0, RD_KAFKA_NODEID_UA); |
2176 | mtx_unlock(&rk->rk_internal_rkb_lock); |
2177 | |
2178 | /* Add initial list of brokers from configuration */ |
2179 | if (rk->rk_conf.brokerlist) { |
2180 | if (rd_kafka_brokers_add0(rk, rk->rk_conf.brokerlist) == 0) |
2181 | rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN, |
2182 | "No brokers configured" ); |
2183 | } |
2184 | |
2185 | #ifndef _MSC_VER |
2186 | /* Restore sigmask of caller */ |
2187 | pthread_sigmask(SIG_SETMASK, &oldset, NULL); |
2188 | #endif |
2189 | |
2190 | /* Free user supplied conf's base pointer on success, |
2191 | * but not the actual allocated fields since the struct |
2192 | * will have been copied in its entirety above. */ |
2193 | if (app_conf) |
2194 | rd_free(app_conf); |
2195 | rd_kafka_set_last_error(0, 0); |
2196 | |
2197 | rd_kafka_conf_warn(rk); |
2198 | |
2199 | /* Wait for background threads to fully initialize so that |
2200 | * the client instance is fully functional at the time it is |
2201 | * returned from the constructor. */ |
2202 | if (rd_kafka_init_wait(rk, 60*1000) != 0) { |
2203 | /* This should never happen unless there is a bug |
2204 | * or the OS is not scheduling the background threads. |
* In either case there is no point in handling this gracefully
2206 | * in the current state since the thread joins are likely |
2207 | * to hang as well. */ |
2208 | mtx_lock(&rk->rk_init_lock); |
rd_kafka_log(rk, LOG_CRIT, "INIT",
"Failed to initialize %s: "
"%d background thread(s) did not initialize "
"within 60 seconds",
2213 | rk->rk_name, rk->rk_init_wait_cnt); |
2214 | if (errstr) |
2215 | rd_snprintf(errstr, errstr_size, |
2216 | "Timed out waiting for " |
2217 | "%d background thread(s) to initialize" , |
2218 | rk->rk_init_wait_cnt); |
2219 | mtx_unlock(&rk->rk_init_lock); |
2220 | |
2221 | rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE, |
2222 | EDEADLK); |
2223 | return NULL; |
2224 | } |
2225 | |
2226 | rk->rk_initialized = 1; |
2227 | |
2228 | bflen = sizeof(builtin_features); |
if (rd_kafka_conf_get(&rk->rk_conf, "builtin.features",
builtin_features, &bflen) !=
RD_KAFKA_CONF_OK)
rd_snprintf(builtin_features, sizeof(builtin_features), "?");
rd_kafka_dbg(rk, ALL, "INIT",
"librdkafka v%s (0x%x) %s initialized "
"(builtin.features %s, %s, debug 0x%x)",
2236 | rd_kafka_version_str(), rd_kafka_version(), |
2237 | rk->rk_name, |
2238 | builtin_features, BUILT_WITH, |
2239 | rk->rk_conf.debug); |
2240 | |
2241 | /* Log warnings for deprecated configuration */ |
2242 | rd_kafka_conf_warn(rk); |
2243 | |
2244 | return rk; |
2245 | |
2246 | fail: |
2247 | /* |
2248 | * Error out and clean up |
2249 | */ |
2250 | |
2251 | /* |
2252 | * Tell background thread to terminate and wait for it to return. |
2253 | */ |
2254 | rd_atomic32_set(&rk->rk_terminate, RD_KAFKA_DESTROY_F_TERMINATE); |
2255 | |
2256 | /* Terminate SASL provider */ |
2257 | if (rk->rk_conf.sasl.provider) |
2258 | rd_kafka_sasl_term(rk); |
2259 | |
2260 | if (rk->rk_background.thread) { |
2261 | int res; |
2262 | thrd_join(rk->rk_background.thread, &res); |
2263 | rd_kafka_q_destroy_owner(rk->rk_background.q); |
2264 | } |
2265 | |
2266 | /* If on_new() interceptors have been called we also need |
2267 | * to allow interceptor clean-up by calling on_destroy() */ |
2268 | rd_kafka_interceptors_on_destroy(rk); |
2269 | |
/* If rk_conf is a struct-copy of the application configuration
* we need to prevent the rk_conf fields from being freed by
* rd_kafka_destroy_internal() since they belong to app_conf.
* However, there are some internal fields, such as interceptors,
* that belong to rk_conf and thus need to be cleaned up.
* Legacy APIs, sigh.. */
2276 | if (app_conf) { |
2277 | rd_kafka_assignors_term(rk); |
2278 | rd_kafka_interceptors_destroy(&rk->rk_conf); |
2279 | memset(&rk->rk_conf, 0, sizeof(rk->rk_conf)); |
2280 | } |
2281 | |
2282 | rd_kafka_destroy_internal(rk); |
2283 | rd_kafka_destroy_final(rk); |
2284 | |
2285 | rd_kafka_set_last_error(ret_err, ret_errno); |
2286 | |
2287 | return NULL; |
2288 | } |
2289 | |
2290 | |
2291 | |
2292 | |
2293 | /** |
* Counts usage of the legacy/simple consumer (rd_kafka_consume_start() and
* friends). Since that API has no call for stopping the cgrp we will need to
* sort that out automatically in the background when all consumption
* has stopped.
*
* Returns 0 if a high-level consumer is already instantiated (in which case
* a simple consumer cannot co-operate with it), else 1.
*
* A rd_kafka_t handle can never migrate from simple to high-level, or
* vice versa, so we don't need a ..consumer_del().
2304 | */ |
2305 | int rd_kafka_simple_consumer_add (rd_kafka_t *rk) { |
2306 | if (rd_atomic32_get(&rk->rk_simple_cnt) < 0) |
2307 | return 0; |
2308 | |
2309 | return (int)rd_atomic32_add(&rk->rk_simple_cnt, 1); |
2310 | } |
2311 | |
2312 | |
2313 | |
2314 | |
2315 | /** |
2316 | * rktp fetch is split up in these parts: |
2317 | * * application side: |
2318 | * * broker side (handled by current leader broker thread for rktp): |
2319 | * - the fetch state, initial offset, etc. |
2320 | * - fetching messages, updating fetched offset, etc. |
2321 | * - offset commits |
2322 | * |
* Communication between the two is:
* app side -> rdkafka main side: rktp_ops
* broker thread -> app side: rktp_fetchq
*
* There is no shared state between these threads; instead
* state is communicated through the two op queues, and state synchronization
* is performed by version barriers.
2330 | * |
2331 | */ |
2332 | |
2333 | static RD_UNUSED |
2334 | int rd_kafka_consume_start0 (rd_kafka_itopic_t *rkt, int32_t partition, |
2335 | int64_t offset, rd_kafka_q_t *rkq) { |
2336 | shptr_rd_kafka_toppar_t *s_rktp; |
2337 | |
2338 | if (partition < 0) { |
2339 | rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION, |
2340 | ESRCH); |
2341 | return -1; |
2342 | } |
2343 | |
2344 | if (!rd_kafka_simple_consumer_add(rkt->rkt_rk)) { |
2345 | rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL); |
2346 | return -1; |
2347 | } |
2348 | |
2349 | rd_kafka_topic_wrlock(rkt); |
2350 | s_rktp = rd_kafka_toppar_desired_add(rkt, partition); |
2351 | rd_kafka_topic_wrunlock(rkt); |
2352 | |
2353 | /* Verify offset */ |
2354 | if (offset == RD_KAFKA_OFFSET_BEGINNING || |
2355 | offset == RD_KAFKA_OFFSET_END || |
2356 | offset <= RD_KAFKA_OFFSET_TAIL_BASE) { |
2357 | /* logical offsets */ |
2358 | |
2359 | } else if (offset == RD_KAFKA_OFFSET_STORED) { |
2360 | /* offset manager */ |
2361 | |
2362 | if (rkt->rkt_conf.offset_store_method == |
2363 | RD_KAFKA_OFFSET_METHOD_BROKER && |
2364 | RD_KAFKAP_STR_IS_NULL(rkt->rkt_rk->rk_group_id)) { |
2365 | /* Broker based offsets require a group id. */ |
2366 | rd_kafka_toppar_destroy(s_rktp); |
2367 | rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, |
2368 | EINVAL); |
2369 | return -1; |
2370 | } |
2371 | |
2372 | } else if (offset < 0) { |
2373 | rd_kafka_toppar_destroy(s_rktp); |
2374 | rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, |
2375 | EINVAL); |
2376 | return -1; |
2377 | |
2378 | } |
2379 | |
2380 | rd_kafka_toppar_op_fetch_start(rd_kafka_toppar_s2i(s_rktp), offset, |
2381 | rkq, RD_KAFKA_NO_REPLYQ); |
2382 | |
2383 | rd_kafka_toppar_destroy(s_rktp); |
2384 | |
2385 | rd_kafka_set_last_error(0, 0); |
2386 | return 0; |
2387 | } |
2388 | |
2389 | |
2390 | |
2391 | |
2392 | int rd_kafka_consume_start (rd_kafka_topic_t *app_rkt, int32_t partition, |
2393 | int64_t offset) { |
2394 | rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt); |
rd_kafka_dbg(rkt->rkt_rk, TOPIC, "START",
"Start consuming partition %" PRId32, partition);
2397 | return rd_kafka_consume_start0(rkt, partition, offset, NULL); |
2398 | } |
2399 | |
2400 | int rd_kafka_consume_start_queue (rd_kafka_topic_t *app_rkt, int32_t partition, |
2401 | int64_t offset, rd_kafka_queue_t *rkqu) { |
2402 | rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt); |
2403 | |
2404 | return rd_kafka_consume_start0(rkt, partition, offset, rkqu->rkqu_q); |
2405 | } |
2406 | |
2407 | |
2408 | |
2409 | |
2410 | static RD_UNUSED int rd_kafka_consume_stop0 (rd_kafka_toppar_t *rktp) { |
2411 | rd_kafka_q_t *tmpq = NULL; |
2412 | rd_kafka_resp_err_t err; |
2413 | |
2414 | rd_kafka_topic_wrlock(rktp->rktp_rkt); |
2415 | rd_kafka_toppar_lock(rktp); |
2416 | rd_kafka_toppar_desired_del(rktp); |
2417 | rd_kafka_toppar_unlock(rktp); |
2418 | rd_kafka_topic_wrunlock(rktp->rktp_rkt); |
2419 | |
2420 | tmpq = rd_kafka_q_new(rktp->rktp_rkt->rkt_rk); |
2421 | |
2422 | rd_kafka_toppar_op_fetch_stop(rktp, RD_KAFKA_REPLYQ(tmpq, 0)); |
2423 | |
2424 | /* Synchronisation: Wait for stop reply from broker thread */ |
2425 | err = rd_kafka_q_wait_result(tmpq, RD_POLL_INFINITE); |
2426 | rd_kafka_q_destroy_owner(tmpq); |
2427 | |
2428 | rd_kafka_set_last_error(err, err ? EINVAL : 0); |
2429 | |
2430 | return err ? -1 : 0; |
2431 | } |
2432 | |
2433 | |
2434 | int rd_kafka_consume_stop (rd_kafka_topic_t *app_rkt, int32_t partition) { |
2435 | rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt); |
2436 | shptr_rd_kafka_toppar_t *s_rktp; |
2437 | int r; |
2438 | |
2439 | if (partition == RD_KAFKA_PARTITION_UA) { |
2440 | rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL); |
2441 | return -1; |
2442 | } |
2443 | |
2444 | rd_kafka_topic_wrlock(rkt); |
2445 | if (!(s_rktp = rd_kafka_toppar_get(rkt, partition, 0)) && |
2446 | !(s_rktp = rd_kafka_toppar_desired_get(rkt, partition))) { |
2447 | rd_kafka_topic_wrunlock(rkt); |
2448 | rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION, |
2449 | ESRCH); |
2450 | return -1; |
2451 | } |
2452 | rd_kafka_topic_wrunlock(rkt); |
2453 | |
2454 | r = rd_kafka_consume_stop0(rd_kafka_toppar_s2i(s_rktp)); |
2455 | /* set_last_error() called by stop0() */ |
2456 | |
2457 | rd_kafka_toppar_destroy(s_rktp); |
2458 | |
2459 | return r; |
2460 | } |
2461 | |
2462 | |
2463 | |
2464 | rd_kafka_resp_err_t rd_kafka_seek (rd_kafka_topic_t *app_rkt, |
2465 | int32_t partition, |
2466 | int64_t offset, |
2467 | int timeout_ms) { |
2468 | rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt); |
2469 | shptr_rd_kafka_toppar_t *s_rktp; |
2470 | rd_kafka_toppar_t *rktp; |
2471 | rd_kafka_q_t *tmpq = NULL; |
2472 | rd_kafka_resp_err_t err; |
2473 | rd_kafka_replyq_t replyq = RD_KAFKA_NO_REPLYQ; |
2474 | |
2475 | /* FIXME: simple consumer check */ |
2476 | |
2477 | if (partition == RD_KAFKA_PARTITION_UA) |
2478 | return RD_KAFKA_RESP_ERR__INVALID_ARG; |
2479 | |
2480 | rd_kafka_topic_rdlock(rkt); |
2481 | if (!(s_rktp = rd_kafka_toppar_get(rkt, partition, 0)) && |
2482 | !(s_rktp = rd_kafka_toppar_desired_get(rkt, partition))) { |
2483 | rd_kafka_topic_rdunlock(rkt); |
2484 | return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION; |
2485 | } |
2486 | rd_kafka_topic_rdunlock(rkt); |
2487 | |
2488 | if (timeout_ms) { |
2489 | tmpq = rd_kafka_q_new(rkt->rkt_rk); |
2490 | replyq = RD_KAFKA_REPLYQ(tmpq, 0); |
2491 | } |
2492 | |
2493 | rktp = rd_kafka_toppar_s2i(s_rktp); |
2494 | if ((err = rd_kafka_toppar_op_seek(rktp, offset, replyq))) { |
2495 | if (tmpq) |
2496 | rd_kafka_q_destroy_owner(tmpq); |
2497 | rd_kafka_toppar_destroy(s_rktp); |
2498 | return err; |
2499 | } |
2500 | |
2501 | rd_kafka_toppar_destroy(s_rktp); |
2502 | |
2503 | if (tmpq) { |
2504 | err = rd_kafka_q_wait_result(tmpq, timeout_ms); |
2505 | rd_kafka_q_destroy_owner(tmpq); |
2506 | return err; |
2507 | } |
2508 | |
2509 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
2510 | } |
2511 | |
2512 | |
2513 | |
2514 | static ssize_t rd_kafka_consume_batch0 (rd_kafka_q_t *rkq, |
2515 | int timeout_ms, |
2516 | rd_kafka_message_t **rkmessages, |
2517 | size_t rkmessages_size) { |
2518 | /* Populate application's rkmessages array. */ |
2519 | return rd_kafka_q_serve_rkmessages(rkq, timeout_ms, |
2520 | rkmessages, rkmessages_size); |
2521 | } |
2522 | |
2523 | |
2524 | ssize_t rd_kafka_consume_batch (rd_kafka_topic_t *app_rkt, int32_t partition, |
2525 | int timeout_ms, |
2526 | rd_kafka_message_t **rkmessages, |
2527 | size_t rkmessages_size) { |
2528 | rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt); |
2529 | shptr_rd_kafka_toppar_t *s_rktp; |
2530 | rd_kafka_toppar_t *rktp; |
2531 | ssize_t cnt; |
2532 | |
2533 | /* Get toppar */ |
2534 | rd_kafka_topic_rdlock(rkt); |
2535 | s_rktp = rd_kafka_toppar_get(rkt, partition, 0/*no ua on miss*/); |
2536 | if (unlikely(!s_rktp)) |
2537 | s_rktp = rd_kafka_toppar_desired_get(rkt, partition); |
2538 | rd_kafka_topic_rdunlock(rkt); |
2539 | |
2540 | if (unlikely(!s_rktp)) { |
2541 | /* No such toppar known */ |
2542 | rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION, |
2543 | ESRCH); |
2544 | return -1; |
2545 | } |
2546 | |
2547 | rktp = rd_kafka_toppar_s2i(s_rktp); |
2548 | |
2549 | /* Populate application's rkmessages array. */ |
2550 | cnt = rd_kafka_q_serve_rkmessages(rktp->rktp_fetchq, timeout_ms, |
2551 | rkmessages, rkmessages_size); |
2552 | |
2553 | rd_kafka_toppar_destroy(s_rktp); /* refcnt from .._get() */ |
2554 | |
2555 | rd_kafka_set_last_error(0, 0); |
2556 | |
2557 | return cnt; |
2558 | } |
2559 | |
2560 | ssize_t rd_kafka_consume_batch_queue (rd_kafka_queue_t *rkqu, |
2561 | int timeout_ms, |
2562 | rd_kafka_message_t **rkmessages, |
2563 | size_t rkmessages_size) { |
2564 | /* Populate application's rkmessages array. */ |
2565 | return rd_kafka_consume_batch0(rkqu->rkqu_q, timeout_ms, |
2566 | rkmessages, rkmessages_size); |
2567 | } |
2568 | |
2569 | |
2570 | struct consume_ctx { |
2571 | void (*consume_cb) (rd_kafka_message_t *rkmessage, void *opaque); |
2572 | void *opaque; |
2573 | }; |
2574 | |
2575 | |
2576 | /** |
2577 | * Trampoline for application's consume_cb() |
2578 | */ |
2579 | static rd_kafka_op_res_t |
2580 | rd_kafka_consume_cb (rd_kafka_t *rk, |
2581 | rd_kafka_q_t *rkq, |
2582 | rd_kafka_op_t *rko, |
2583 | rd_kafka_q_cb_type_t cb_type, void *opaque) { |
2584 | struct consume_ctx *ctx = opaque; |
2585 | rd_kafka_message_t *rkmessage; |
2586 | |
2587 | if (unlikely(rd_kafka_op_version_outdated(rko, 0))) { |
2588 | rd_kafka_op_destroy(rko); |
2589 | return RD_KAFKA_OP_RES_HANDLED; |
2590 | } |
2591 | |
2592 | rkmessage = rd_kafka_message_get(rko); |
2593 | |
2594 | rd_kafka_op_offset_store(rk, rko, rkmessage); |
2595 | |
2596 | ctx->consume_cb(rkmessage, ctx->opaque); |
2597 | |
2598 | rd_kafka_op_destroy(rko); |
2599 | |
2600 | return RD_KAFKA_OP_RES_HANDLED; |
2601 | } |
2602 | |
2603 | |
2604 | |
2605 | static rd_kafka_op_res_t |
2606 | rd_kafka_consume_callback0 (rd_kafka_q_t *rkq, int timeout_ms, int max_cnt, |
2607 | void (*consume_cb) (rd_kafka_message_t |
2608 | *rkmessage, |
2609 | void *opaque), |
2610 | void *opaque) { |
2611 | struct consume_ctx ctx = { .consume_cb = consume_cb, .opaque = opaque }; |
2612 | rd_kafka_op_res_t res; |
2613 | |
2614 | if (timeout_ms) |
2615 | rd_kafka_app_poll_blocking(rkq->rkq_rk); |
2616 | |
2617 | res = rd_kafka_q_serve(rkq, timeout_ms, max_cnt, |
2618 | RD_KAFKA_Q_CB_RETURN, |
2619 | rd_kafka_consume_cb, &ctx); |
2620 | |
2621 | rd_kafka_app_polled(rkq->rkq_rk); |
2622 | |
2623 | return res; |
2624 | } |
2625 | |
2626 | |
2627 | int rd_kafka_consume_callback (rd_kafka_topic_t *app_rkt, int32_t partition, |
2628 | int timeout_ms, |
2629 | void (*consume_cb) (rd_kafka_message_t |
2630 | *rkmessage, |
2631 | void *opaque), |
2632 | void *opaque) { |
2633 | rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt); |
2634 | shptr_rd_kafka_toppar_t *s_rktp; |
2635 | rd_kafka_toppar_t *rktp; |
2636 | int r; |
2637 | |
2638 | /* Get toppar */ |
2639 | rd_kafka_topic_rdlock(rkt); |
2640 | s_rktp = rd_kafka_toppar_get(rkt, partition, 0/*no ua on miss*/); |
2641 | if (unlikely(!s_rktp)) |
2642 | s_rktp = rd_kafka_toppar_desired_get(rkt, partition); |
2643 | rd_kafka_topic_rdunlock(rkt); |
2644 | |
2645 | if (unlikely(!s_rktp)) { |
2646 | /* No such toppar known */ |
2647 | rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION, |
2648 | ESRCH); |
2649 | return -1; |
2650 | } |
2651 | |
2652 | rktp = rd_kafka_toppar_s2i(s_rktp); |
2653 | r = rd_kafka_consume_callback0(rktp->rktp_fetchq, timeout_ms, |
2654 | rkt->rkt_conf.consume_callback_max_msgs, |
2655 | consume_cb, opaque); |
2656 | |
2657 | rd_kafka_toppar_destroy(s_rktp); |
2658 | |
2659 | rd_kafka_set_last_error(0, 0); |
2660 | |
2661 | return r; |
2662 | } |
2663 | |
2664 | |
2665 | |
2666 | int rd_kafka_consume_callback_queue (rd_kafka_queue_t *rkqu, |
2667 | int timeout_ms, |
2668 | void (*consume_cb) (rd_kafka_message_t |
2669 | *rkmessage, |
2670 | void *opaque), |
2671 | void *opaque) { |
2672 | return rd_kafka_consume_callback0(rkqu->rkqu_q, timeout_ms, 0, |
2673 | consume_cb, opaque); |
2674 | } |
2675 | |
2676 | |
2677 | /** |
* Serve queue 'rkq' and return one message.
* Serving the queue will also call any callbacks registered for
* matching events; this includes consume_cb(), in which case no
* message will be returned.
2682 | */ |
2683 | static rd_kafka_message_t *rd_kafka_consume0 (rd_kafka_t *rk, |
2684 | rd_kafka_q_t *rkq, |
2685 | int timeout_ms) { |
2686 | rd_kafka_op_t *rko; |
2687 | rd_kafka_message_t *rkmessage = NULL; |
2688 | rd_ts_t abs_timeout = rd_timeout_init(timeout_ms); |
2689 | |
2690 | if (timeout_ms) |
2691 | rd_kafka_app_poll_blocking(rk); |
2692 | |
2693 | rd_kafka_yield_thread = 0; |
2694 | while ((rko = rd_kafka_q_pop(rkq, |
2695 | rd_timeout_remains(abs_timeout), 0))) { |
2696 | rd_kafka_op_res_t res; |
2697 | |
2698 | res = rd_kafka_poll_cb(rk, rkq, rko, |
2699 | RD_KAFKA_Q_CB_RETURN, NULL); |
2700 | |
2701 | if (res == RD_KAFKA_OP_RES_PASS) |
2702 | break; |
2703 | |
2704 | if (unlikely(res == RD_KAFKA_OP_RES_YIELD || |
2705 | rd_kafka_yield_thread)) { |
2706 | /* Callback called rd_kafka_yield(), we must |
2707 | * stop dispatching the queue and return. */ |
2708 | rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INTR, |
2709 | EINTR); |
2710 | rd_kafka_app_polled(rk); |
2711 | return NULL; |
2712 | } |
2713 | |
2714 | /* Message was handled by callback. */ |
2715 | continue; |
2716 | } |
2717 | |
2718 | if (!rko) { |
2719 | /* Timeout reached with no op returned. */ |
2720 | rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__TIMED_OUT, |
2721 | ETIMEDOUT); |
2722 | rd_kafka_app_polled(rk); |
2723 | return NULL; |
2724 | } |
2725 | |
2726 | rd_kafka_assert(rk, |
2727 | rko->rko_type == RD_KAFKA_OP_FETCH || |
2728 | rko->rko_type == RD_KAFKA_OP_CONSUMER_ERR); |
2729 | |
2730 | /* Get rkmessage from rko */ |
2731 | rkmessage = rd_kafka_message_get(rko); |
2732 | |
2733 | /* Store offset */ |
2734 | rd_kafka_op_offset_store(rk, rko, rkmessage); |
2735 | |
2736 | rd_kafka_set_last_error(0, 0); |
2737 | |
2738 | rd_kafka_app_polled(rk); |
2739 | |
2740 | return rkmessage; |
2741 | } |
2742 | |
2743 | rd_kafka_message_t *rd_kafka_consume (rd_kafka_topic_t *app_rkt, |
2744 | int32_t partition, |
2745 | int timeout_ms) { |
2746 | rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt); |
2747 | shptr_rd_kafka_toppar_t *s_rktp; |
2748 | rd_kafka_toppar_t *rktp; |
2749 | rd_kafka_message_t *rkmessage; |
2750 | |
2751 | rd_kafka_topic_rdlock(rkt); |
2752 | s_rktp = rd_kafka_toppar_get(rkt, partition, 0/*no ua on miss*/); |
2753 | if (unlikely(!s_rktp)) |
2754 | s_rktp = rd_kafka_toppar_desired_get(rkt, partition); |
2755 | rd_kafka_topic_rdunlock(rkt); |
2756 | |
2757 | if (unlikely(!s_rktp)) { |
2758 | /* No such toppar known */ |
2759 | rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION, |
2760 | ESRCH); |
2761 | return NULL; |
2762 | } |
2763 | |
2764 | rktp = rd_kafka_toppar_s2i(s_rktp); |
2765 | rkmessage = rd_kafka_consume0(rkt->rkt_rk, |
2766 | rktp->rktp_fetchq, timeout_ms); |
2767 | |
2768 | rd_kafka_toppar_destroy(s_rktp); /* refcnt from .._get() */ |
2769 | |
2770 | return rkmessage; |
2771 | } |
2772 | |
2773 | |
2774 | rd_kafka_message_t *rd_kafka_consume_queue (rd_kafka_queue_t *rkqu, |
2775 | int timeout_ms) { |
2776 | return rd_kafka_consume0(rkqu->rkqu_rk, rkqu->rkqu_q, timeout_ms); |
2777 | } |
2778 | |
2779 | |
2780 | |
2781 | |
2782 | rd_kafka_resp_err_t rd_kafka_poll_set_consumer (rd_kafka_t *rk) { |
2783 | rd_kafka_cgrp_t *rkcg; |
2784 | |
2785 | if (!(rkcg = rd_kafka_cgrp_get(rk))) |
2786 | return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP; |
2787 | |
2788 | rd_kafka_q_fwd_set(rk->rk_rep, rkcg->rkcg_q); |
2789 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
2790 | } |
2791 | |
2792 | |
2793 | |
2794 | |
2795 | rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk, |
2796 | int timeout_ms) { |
2797 | rd_kafka_cgrp_t *rkcg; |
2798 | |
2799 | if (unlikely(!(rkcg = rd_kafka_cgrp_get(rk)))) { |
2800 | rd_kafka_message_t *rkmessage = rd_kafka_message_new(); |
2801 | rkmessage->err = RD_KAFKA_RESP_ERR__UNKNOWN_GROUP; |
2802 | return rkmessage; |
2803 | } |
2804 | |
2805 | return rd_kafka_consume0(rk, rkcg->rkcg_q, timeout_ms); |
2806 | } |
2807 | |
2808 | |
2809 | rd_kafka_resp_err_t rd_kafka_consumer_close (rd_kafka_t *rk) { |
2810 | rd_kafka_cgrp_t *rkcg; |
2811 | rd_kafka_op_t *rko; |
2812 | rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__TIMED_OUT; |
2813 | rd_kafka_q_t *rkq; |
2814 | |
2815 | if (!(rkcg = rd_kafka_cgrp_get(rk))) |
2816 | return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP; |
2817 | |
rd_kafka_dbg(rk, CONSUMER, "CLOSE", "Closing consumer");
2819 | |
2820 | /* Redirect cgrp queue to our temporary queue to make sure |
2821 | * all posted ops (e.g., rebalance callbacks) are served by |
2822 | * this function. */ |
2823 | rkq = rd_kafka_q_new(rk); |
2824 | rd_kafka_q_fwd_set(rkcg->rkcg_q, rkq); |
2825 | |
2826 | rd_kafka_cgrp_terminate(rkcg, RD_KAFKA_REPLYQ(rkq, 0)); /* async */ |
2827 | |
/* Disable the queue if termination is immediate or the user
* does not want the blocking consumer_close() behaviour; this will
* cause any ops posted for this queue (such as rebalance) to
* be destroyed.
*/
2833 | if (rd_kafka_destroy_flags_no_consumer_close(rk)) { |
rd_kafka_dbg(rk, CONSUMER, "CLOSE",
"Disabling and purging temporary queue to quench "
"close events");
2837 | rd_kafka_q_disable(rkq); |
2838 | /* Purge ops already enqueued */ |
2839 | rd_kafka_q_purge(rkq); |
2840 | } else { |
rd_kafka_dbg(rk, CONSUMER, "CLOSE",
"Waiting for close events");
2843 | while ((rko = rd_kafka_q_pop(rkq, RD_POLL_INFINITE, 0))) { |
2844 | rd_kafka_op_res_t res; |
2845 | if ((rko->rko_type & ~RD_KAFKA_OP_FLAGMASK) == |
2846 | RD_KAFKA_OP_TERMINATE) { |
2847 | err = rko->rko_err; |
2848 | rd_kafka_op_destroy(rko); |
2849 | break; |
2850 | } |
2851 | res = rd_kafka_poll_cb(rk, rkq, rko, |
2852 | RD_KAFKA_Q_CB_RETURN, NULL); |
2853 | if (res == RD_KAFKA_OP_RES_PASS) |
2854 | rd_kafka_op_destroy(rko); |
2855 | /* Ignore YIELD, we need to finish */ |
2856 | } |
2857 | } |
2858 | |
2859 | rd_kafka_q_fwd_set(rkcg->rkcg_q, NULL); |
2860 | |
2861 | rd_kafka_q_destroy_owner(rkq); |
2862 | |
rd_kafka_dbg(rk, CONSUMER, "CLOSE", "Consumer closed");
2864 | |
2865 | return err; |
2866 | } |
2867 | |
2868 | |
2869 | |
2870 | rd_kafka_resp_err_t |
2871 | rd_kafka_committed (rd_kafka_t *rk, |
2872 | rd_kafka_topic_partition_list_t *partitions, |
2873 | int timeout_ms) { |
2874 | rd_kafka_q_t *rkq; |
2875 | rd_kafka_resp_err_t err; |
2876 | rd_kafka_cgrp_t *rkcg; |
2877 | rd_ts_t abs_timeout = rd_timeout_init(timeout_ms); |
2878 | |
2879 | if (!partitions) |
2880 | return RD_KAFKA_RESP_ERR__INVALID_ARG; |
2881 | |
2882 | if (!(rkcg = rd_kafka_cgrp_get(rk))) |
2883 | return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP; |
2884 | |
2885 | /* Set default offsets. */ |
2886 | rd_kafka_topic_partition_list_reset_offsets(partitions, |
2887 | RD_KAFKA_OFFSET_INVALID); |
2888 | |
2889 | rkq = rd_kafka_q_new(rk); |
2890 | |
2891 | do { |
2892 | rd_kafka_op_t *rko; |
2893 | int state_version = rd_kafka_brokers_get_state_version(rk); |
2894 | |
2895 | rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH); |
2896 | rd_kafka_op_set_replyq(rko, rkq, NULL); |
2897 | |
2898 | /* Issue #827 |
2899 | * Copy partition list to avoid use-after-free if we time out |
2900 | * here, the app frees the list, and then cgrp starts |
2901 | * processing the op. */ |
2902 | rko->rko_u.offset_fetch.partitions = |
2903 | rd_kafka_topic_partition_list_copy(partitions); |
2904 | rko->rko_u.offset_fetch.do_free = 1; |
2905 | |
2906 | if (!rd_kafka_q_enq(rkcg->rkcg_ops, rko)) { |
2907 | err = RD_KAFKA_RESP_ERR__DESTROY; |
2908 | break; |
2909 | } |
2910 | |
2911 | rko = rd_kafka_q_pop(rkq, rd_timeout_remains(abs_timeout), 0); |
2912 | if (rko) { |
2913 | if (!(err = rko->rko_err)) |
2914 | rd_kafka_topic_partition_list_update( |
2915 | partitions, |
2916 | rko->rko_u.offset_fetch.partitions); |
2917 | else if ((err == RD_KAFKA_RESP_ERR__WAIT_COORD || |
2918 | err == RD_KAFKA_RESP_ERR__TRANSPORT) && |
2919 | !rd_kafka_brokers_wait_state_change( |
2920 | rk, state_version, |
2921 | rd_timeout_remains(abs_timeout))) |
2922 | err = RD_KAFKA_RESP_ERR__TIMED_OUT; |
2923 | |
2924 | rd_kafka_op_destroy(rko); |
2925 | } else |
2926 | err = RD_KAFKA_RESP_ERR__TIMED_OUT; |
2927 | } while (err == RD_KAFKA_RESP_ERR__TRANSPORT || |
2928 | err == RD_KAFKA_RESP_ERR__WAIT_COORD); |
2929 | |
2930 | rd_kafka_q_destroy_owner(rkq); |
2931 | |
2932 | return err; |
2933 | } |
2934 | |
2935 | |
2936 | |
2937 | rd_kafka_resp_err_t |
2938 | rd_kafka_position (rd_kafka_t *rk, |
2939 | rd_kafka_topic_partition_list_t *partitions) { |
2940 | int i; |
2941 | |
2942 | /* Set default offsets. */ |
2943 | rd_kafka_topic_partition_list_reset_offsets(partitions, |
2944 | RD_KAFKA_OFFSET_INVALID); |
2945 | |
2946 | for (i = 0 ; i < partitions->cnt ; i++) { |
2947 | rd_kafka_topic_partition_t *rktpar = &partitions->elems[i]; |
2948 | shptr_rd_kafka_toppar_t *s_rktp; |
2949 | rd_kafka_toppar_t *rktp; |
2950 | |
2951 | if (!(s_rktp = rd_kafka_toppar_get2(rk, rktpar->topic, |
2952 | rktpar->partition, 0, 1))) { |
2953 | rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION; |
2954 | rktpar->offset = RD_KAFKA_OFFSET_INVALID; |
2955 | continue; |
2956 | } |
2957 | |
2958 | rktp = rd_kafka_toppar_s2i(s_rktp); |
2959 | rd_kafka_toppar_lock(rktp); |
2960 | rktpar->offset = rktp->rktp_app_offset; |
2961 | rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR; |
2962 | rd_kafka_toppar_unlock(rktp); |
2963 | rd_kafka_toppar_destroy(s_rktp); |
2964 | } |
2965 | |
2966 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
2967 | } |
2968 | |
2969 | |
2970 | |
2971 | struct _query_wmark_offsets_state { |
2972 | rd_kafka_resp_err_t err; |
2973 | const char *topic; |
2974 | int32_t partition; |
2975 | int64_t offsets[2]; |
2976 | int offidx; /* next offset to set from response */ |
2977 | rd_ts_t ts_end; |
2978 | int state_version; /* Broker state version */ |
2979 | }; |
2980 | |
2981 | static void rd_kafka_query_wmark_offsets_resp_cb (rd_kafka_t *rk, |
2982 | rd_kafka_broker_t *rkb, |
2983 | rd_kafka_resp_err_t err, |
2984 | rd_kafka_buf_t *rkbuf, |
2985 | rd_kafka_buf_t *request, |
2986 | void *opaque) { |
2987 | struct _query_wmark_offsets_state *state; |
2988 | rd_kafka_topic_partition_list_t *offsets; |
2989 | rd_kafka_topic_partition_t *rktpar; |
2990 | |
2991 | if (err == RD_KAFKA_RESP_ERR__DESTROY) { |
2992 | /* 'state' has gone out of scope when query_watermark..() |
2993 | * timed out and returned to the caller. */ |
2994 | return; |
2995 | } |
2996 | |
2997 | state = opaque; |
2998 | |
2999 | offsets = rd_kafka_topic_partition_list_new(1); |
3000 | err = rd_kafka_handle_Offset(rk, rkb, err, rkbuf, request, offsets); |
3001 | if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) { |
3002 | rd_kafka_topic_partition_list_destroy(offsets); |
3003 | return; /* Retrying */ |
3004 | } |
3005 | |
3006 | /* Retry if no broker connection is available yet. */ |
3007 | if ((err == RD_KAFKA_RESP_ERR__WAIT_COORD || |
3008 | err == RD_KAFKA_RESP_ERR__TRANSPORT) && |
3009 | rkb && |
3010 | rd_kafka_brokers_wait_state_change( |
3011 | rkb->rkb_rk, state->state_version, |
3012 | rd_timeout_remains(state->ts_end))) { |
3013 | /* Retry */ |
3014 | state->state_version = rd_kafka_brokers_get_state_version(rk); |
3015 | request->rkbuf_retries = 0; |
3016 | if (rd_kafka_buf_retry(rkb, request)) { |
3017 | rd_kafka_topic_partition_list_destroy(offsets); |
3018 | return; /* Retry in progress */ |
3019 | } |
3020 | /* FALLTHRU */ |
3021 | } |
3022 | |
3023 | /* Partition not seen in response. */ |
3024 | if (!(rktpar = rd_kafka_topic_partition_list_find(offsets, |
3025 | state->topic, |
3026 | state->partition))) |
3027 | err = RD_KAFKA_RESP_ERR__BAD_MSG; |
3028 | else if (rktpar->err) |
3029 | err = rktpar->err; |
3030 | else |
3031 | state->offsets[state->offidx] = rktpar->offset; |
3032 | |
3033 | state->offidx++; |
3034 | |
3035 | if (err || state->offidx == 2) /* Error or Done */ |
3036 | state->err = err; |
3037 | |
3038 | rd_kafka_topic_partition_list_destroy(offsets); |
3039 | } |
3040 | |
3041 | |
3042 | rd_kafka_resp_err_t |
3043 | rd_kafka_query_watermark_offsets (rd_kafka_t *rk, const char *topic, |
3044 | int32_t partition, |
3045 | int64_t *low, int64_t *high, int timeout_ms) { |
3046 | rd_kafka_q_t *rkq; |
3047 | struct _query_wmark_offsets_state state; |
3048 | rd_ts_t ts_end = rd_timeout_init(timeout_ms); |
3049 | rd_kafka_topic_partition_list_t *partitions; |
3050 | rd_kafka_topic_partition_t *rktpar; |
3051 | struct rd_kafka_partition_leader *leader; |
3052 | rd_list_t leaders; |
3053 | rd_kafka_resp_err_t err; |
3054 | |
3055 | partitions = rd_kafka_topic_partition_list_new(1); |
3056 | rktpar = rd_kafka_topic_partition_list_add(partitions, |
3057 | topic, partition); |
3058 | |
3059 | rd_list_init(&leaders, partitions->cnt, |
3060 | (void *)rd_kafka_partition_leader_destroy); |
3061 | |
3062 | err = rd_kafka_topic_partition_list_query_leaders(rk, partitions, |
3063 | &leaders, timeout_ms); |
3064 | if (err) { |
3065 | rd_list_destroy(&leaders); |
3066 | rd_kafka_topic_partition_list_destroy(partitions); |
3067 | return err; |
3068 | } |
3069 | |
3070 | leader = rd_list_elem(&leaders, 0); |
3071 | |
3072 | rkq = rd_kafka_q_new(rk); |
3073 | |
3074 | /* Due to KAFKA-1588 we need to send a request for each wanted offset, |
3075 | * in this case one for the low watermark and one for the high. */ |
3076 | state.topic = topic; |
3077 | state.partition = partition; |
3078 | state.offsets[0] = RD_KAFKA_OFFSET_BEGINNING; |
3079 | state.offsets[1] = RD_KAFKA_OFFSET_END; |
3080 | state.offidx = 0; |
3081 | state.err = RD_KAFKA_RESP_ERR__IN_PROGRESS; |
3082 | state.ts_end = ts_end; |
3083 | state.state_version = rd_kafka_brokers_get_state_version(rk); |
3084 | |
3085 | |
3086 | rktpar->offset = RD_KAFKA_OFFSET_BEGINNING; |
3087 | rd_kafka_OffsetRequest(leader->rkb, partitions, 0, |
3088 | RD_KAFKA_REPLYQ(rkq, 0), |
3089 | rd_kafka_query_wmark_offsets_resp_cb, |
3090 | &state); |
3091 | |
3092 | rktpar->offset = RD_KAFKA_OFFSET_END; |
3093 | rd_kafka_OffsetRequest(leader->rkb, partitions, 0, |
3094 | RD_KAFKA_REPLYQ(rkq, 0), |
3095 | rd_kafka_query_wmark_offsets_resp_cb, |
3096 | &state); |
3097 | |
3098 | rd_kafka_topic_partition_list_destroy(partitions); |
3099 | rd_list_destroy(&leaders); |
3100 | |
3101 | /* Wait for reply (or timeout) */ |
3102 | while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS && |
3103 | rd_kafka_q_serve(rkq, 100, 0, RD_KAFKA_Q_CB_CALLBACK, |
3104 | rd_kafka_poll_cb, NULL) != |
3105 | RD_KAFKA_OP_RES_YIELD) |
3106 | ; |
3107 | |
3108 | rd_kafka_q_destroy_owner(rkq); |
3109 | |
3110 | if (state.err) |
3111 | return state.err; |
3112 | else if (state.offidx != 2) |
3113 | return RD_KAFKA_RESP_ERR__FAIL; |
3114 | |
3115 | /* We are not certain about the returned order. */ |
3116 | if (state.offsets[0] < state.offsets[1]) { |
3117 | *low = state.offsets[0]; |
3118 | *high = state.offsets[1]; |
3119 | } else { |
3120 | *low = state.offsets[1]; |
3121 | *high = state.offsets[0]; |
3122 | } |
3123 | |
3124 | /* If partition is empty only one offset (the last) will be returned. */ |
3125 | if (*low < 0 && *high >= 0) |
3126 | *low = *high; |
3127 | |
3128 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
3129 | } |
3130 | |
3131 | |
3132 | rd_kafka_resp_err_t |
3133 | rd_kafka_get_watermark_offsets (rd_kafka_t *rk, const char *topic, |
3134 | int32_t partition, |
3135 | int64_t *low, int64_t *high) { |
3136 | shptr_rd_kafka_toppar_t *s_rktp; |
3137 | rd_kafka_toppar_t *rktp; |
3138 | |
3139 | s_rktp = rd_kafka_toppar_get2(rk, topic, partition, 0, 1); |
3140 | if (!s_rktp) |
3141 | return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION; |
3142 | rktp = rd_kafka_toppar_s2i(s_rktp); |
3143 | |
3144 | rd_kafka_toppar_lock(rktp); |
3145 | *low = rktp->rktp_lo_offset; |
3146 | *high = rktp->rktp_hi_offset; |
3147 | rd_kafka_toppar_unlock(rktp); |
3148 | |
3149 | rd_kafka_toppar_destroy(s_rktp); |
3150 | |
3151 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
3152 | } |
3153 | |
3154 | |
3155 | /** |
3156 | * @brief get_offsets_for_times() state |
3157 | */ |
3158 | struct _get_offsets_for_times { |
3159 | rd_kafka_topic_partition_list_t *results; |
3160 | rd_kafka_resp_err_t err; |
3161 | int wait_reply; |
3162 | int state_version; |
3163 | rd_ts_t ts_end; |
3164 | }; |
3165 | |
3166 | /** |
3167 | * @brief Handle OffsetRequest responses |
3168 | */ |
3169 | static void rd_kafka_get_offsets_for_times_resp_cb (rd_kafka_t *rk, |
3170 | rd_kafka_broker_t *rkb, |
3171 | rd_kafka_resp_err_t err, |
3172 | rd_kafka_buf_t *rkbuf, |
3173 | rd_kafka_buf_t *request, |
3174 | void *opaque) { |
3175 | struct _get_offsets_for_times *state; |
3176 | |
3177 | if (err == RD_KAFKA_RESP_ERR__DESTROY) { |
3178 | /* 'state' has gone out of scope when offsets_for_times() |
3179 | * timed out and returned to the caller. */ |
3180 | return; |
3181 | } |
3182 | |
3183 | state = opaque; |
3184 | |
3185 | err = rd_kafka_handle_Offset(rk, rkb, err, rkbuf, request, |
3186 | state->results); |
3187 | if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) |
3188 | return; /* Retrying */ |
3189 | |
3190 | /* Retry if no broker connection is available yet. */ |
3191 | if ((err == RD_KAFKA_RESP_ERR__WAIT_COORD || |
3192 | err == RD_KAFKA_RESP_ERR__TRANSPORT) && |
3193 | rkb && |
3194 | rd_kafka_brokers_wait_state_change( |
3195 | rkb->rkb_rk, state->state_version, |
3196 | rd_timeout_remains(state->ts_end))) { |
3197 | /* Retry */ |
3198 | state->state_version = rd_kafka_brokers_get_state_version(rk); |
3199 | request->rkbuf_retries = 0; |
3200 | if (rd_kafka_buf_retry(rkb, request)) |
3201 | return; /* Retry in progress */ |
3202 | /* FALLTHRU */ |
3203 | } |
3204 | |
3205 | if (err && !state->err) |
3206 | state->err = err; |
3207 | |
3208 | state->wait_reply--; |
3209 | } |
3210 | |
3211 | |
3212 | rd_kafka_resp_err_t |
3213 | rd_kafka_offsets_for_times (rd_kafka_t *rk, |
3214 | rd_kafka_topic_partition_list_t *offsets, |
3215 | int timeout_ms) { |
3216 | rd_kafka_q_t *rkq; |
3217 | struct _get_offsets_for_times state = RD_ZERO_INIT; |
3218 | rd_ts_t ts_end = rd_timeout_init(timeout_ms); |
3219 | rd_list_t leaders; |
3220 | int i; |
3221 | rd_kafka_resp_err_t err; |
3222 | struct rd_kafka_partition_leader *leader; |
3223 | int tmout; |
3224 | |
3225 | if (offsets->cnt == 0) |
3226 | return RD_KAFKA_RESP_ERR__INVALID_ARG; |
3227 | |
3228 | rd_list_init(&leaders, offsets->cnt, |
3229 | (void *)rd_kafka_partition_leader_destroy); |
3230 | |
3231 | err = rd_kafka_topic_partition_list_query_leaders(rk, offsets, &leaders, |
3232 | timeout_ms); |
3233 | if (err) { |
3234 | rd_list_destroy(&leaders); |
3235 | return err; |
3236 | } |
3237 | |
3238 | |
3239 | rkq = rd_kafka_q_new(rk); |
3240 | |
3241 | state.wait_reply = 0; |
3242 | state.results = rd_kafka_topic_partition_list_new(offsets->cnt); |
3243 | |
3244 | /* For each leader send a request for its partitions */ |
3245 | RD_LIST_FOREACH(leader, &leaders, i) { |
3246 | state.wait_reply++; |
3247 | rd_kafka_OffsetRequest(leader->rkb, leader->partitions, 1, |
3248 | RD_KAFKA_REPLYQ(rkq, 0), |
3249 | rd_kafka_get_offsets_for_times_resp_cb, |
3250 | &state); |
3251 | } |
3252 | |
3253 | rd_list_destroy(&leaders); |
3254 | |
3255 | /* Wait for reply (or timeout) */ |
3256 | while (state.wait_reply > 0 && |
3257 | !rd_timeout_expired((tmout = rd_timeout_remains(ts_end)))) |
3258 | rd_kafka_q_serve(rkq, tmout, 0, RD_KAFKA_Q_CB_CALLBACK, |
3259 | rd_kafka_poll_cb, NULL); |
3260 | |
3261 | rd_kafka_q_destroy_owner(rkq); |
3262 | |
3263 | if (state.wait_reply > 0 && !state.err) |
3264 | state.err = RD_KAFKA_RESP_ERR__TIMED_OUT; |
3265 | |
        /* Update the application's partition list with the results. */
3267 | if (!state.err) |
3268 | rd_kafka_topic_partition_list_update(offsets, state.results); |
3269 | |
3270 | rd_kafka_topic_partition_list_destroy(state.results); |
3271 | |
3272 | return state.err; |
3273 | } |
3274 | |
3275 | |
3276 | /** |
3277 | * @brief rd_kafka_poll() (and similar) op callback handler. |
3278 | * Will either call registered callback depending on cb_type and op type |
3279 | * or return op to application, if applicable (e.g., fetch message). |
3280 | * |
3281 | * @returns RD_KAFKA_OP_RES_HANDLED if op was handled, else one of the |
3282 | * other res types (such as OP_RES_PASS). |
3283 | * |
3284 | * @locality application thread |
3285 | */ |
3286 | rd_kafka_op_res_t |
3287 | rd_kafka_poll_cb (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko, |
3288 | rd_kafka_q_cb_type_t cb_type, void *opaque) { |
3289 | rd_kafka_msg_t *rkm; |
3290 | rd_kafka_op_res_t res = RD_KAFKA_OP_RES_HANDLED; |
3291 | |
3292 | /* Special handling for events based on cb_type */ |
3293 | if (cb_type == RD_KAFKA_Q_CB_EVENT && |
3294 | rd_kafka_event_setup(rk, rko)) { |
3295 | /* Return-as-event requested. */ |
3296 | return RD_KAFKA_OP_RES_PASS; /* Return as event */ |
3297 | } |
3298 | |
3299 | switch ((int)rko->rko_type) |
3300 | { |
3301 | case RD_KAFKA_OP_FETCH: |
3302 | if (!rk->rk_conf.consume_cb || |
3303 | cb_type == RD_KAFKA_Q_CB_RETURN || |
3304 | cb_type == RD_KAFKA_Q_CB_FORCE_RETURN) |
                        return RD_KAFKA_OP_RES_PASS; /* Don't handle here */
3306 | else { |
3307 | struct consume_ctx ctx = { |
3308 | .consume_cb = rk->rk_conf.consume_cb, |
3309 | .opaque = rk->rk_conf.opaque }; |
3310 | |
3311 | return rd_kafka_consume_cb(rk, rkq, rko, cb_type, &ctx); |
3312 | } |
3313 | break; |
3314 | |
3315 | case RD_KAFKA_OP_REBALANCE: |
                /* If EVENT_REBALANCE is enabled but no rebalance_cb is set
                 * we need to perform a dummy assign for the application.
                 * This might happen during termination with consumer_close(). */
3319 | if (rk->rk_conf.rebalance_cb) |
3320 | rk->rk_conf.rebalance_cb( |
3321 | rk, rko->rko_err, |
3322 | rko->rko_u.rebalance.partitions, |
3323 | rk->rk_conf.opaque); |
3324 | else { |
3325 | rd_kafka_dbg(rk, CGRP, "UNASSIGN" , |
3326 | "Forcing unassign of %d partition(s)" , |
3327 | rko->rko_u.rebalance.partitions ? |
3328 | rko->rko_u.rebalance.partitions->cnt : 0); |
3329 | rd_kafka_assign(rk, NULL); |
3330 | } |
3331 | break; |
3332 | |
3333 | case RD_KAFKA_OP_OFFSET_COMMIT | RD_KAFKA_OP_REPLY: |
3334 | if (!rko->rko_u.offset_commit.cb) |
3335 | return RD_KAFKA_OP_RES_PASS; /* Dont handle here */ |
3336 | rko->rko_u.offset_commit.cb( |
3337 | rk, rko->rko_err, |
3338 | rko->rko_u.offset_commit.partitions, |
3339 | rko->rko_u.offset_commit.opaque); |
3340 | break; |
3341 | |
3342 | case RD_KAFKA_OP_CONSUMER_ERR: |
3343 | /* rd_kafka_consumer_poll() (_Q_CB_CONSUMER): |
3344 | * Consumer errors are returned to the application |
3345 | * as rkmessages, not error callbacks. |
3346 | * |
3347 | * rd_kafka_poll() (_Q_CB_GLOBAL): |
3348 | * convert to ERR op (fallthru) |
3349 | */ |
3350 | if (cb_type == RD_KAFKA_Q_CB_RETURN || |
3351 | cb_type == RD_KAFKA_Q_CB_FORCE_RETURN) { |
3352 | /* return as message_t to application */ |
3353 | return RD_KAFKA_OP_RES_PASS; |
3354 | } |
3355 | /* FALLTHRU */ |
3356 | |
3357 | case RD_KAFKA_OP_ERR: |
3358 | if (rk->rk_conf.error_cb) |
3359 | rk->rk_conf.error_cb(rk, rko->rko_err, |
3360 | rko->rko_u.err.errstr, |
3361 | rk->rk_conf.opaque); |
3362 | else { |
                        /* If the error string already contains the
                         * err2str() output, skip appending it to
                         * the log line. */
                        if (rko->rko_u.err.errstr &&
                            strstr(rko->rko_u.err.errstr,
                                   rd_kafka_err2str(rko->rko_err)))
                                rd_kafka_log(rk, LOG_ERR, "ERROR",
                                             "%s: %s",
                                             rk->rk_name,
                                             rko->rko_u.err.errstr);
                        else
                                rd_kafka_log(rk, LOG_ERR, "ERROR",
                                             "%s: %s: %s",
                                             rk->rk_name,
                                             rko->rko_u.err.errstr,
                                             rd_kafka_err2str(rko->rko_err));
3379 | } |
3380 | break; |
3381 | |
3382 | case RD_KAFKA_OP_DR: |
3383 | /* Delivery report: |
3384 | * call application DR callback for each message. */ |
3385 | while ((rkm = TAILQ_FIRST(&rko->rko_u.dr.msgq.rkmq_msgs))) { |
3386 | rd_kafka_message_t *rkmessage; |
3387 | |
3388 | TAILQ_REMOVE(&rko->rko_u.dr.msgq.rkmq_msgs, |
3389 | rkm, rkm_link); |
3390 | |
3391 | rkmessage = rd_kafka_message_get_from_rkm(rko, rkm); |
3392 | |
3393 | if (rk->rk_conf.dr_msg_cb) { |
3394 | rk->rk_conf.dr_msg_cb(rk, rkmessage, |
3395 | rk->rk_conf.opaque); |
3396 | |
3397 | } else { |
3398 | |
3399 | rk->rk_conf.dr_cb(rk, |
3400 | rkmessage->payload, |
3401 | rkmessage->len, |
3402 | rkmessage->err, |
3403 | rk->rk_conf.opaque, |
3404 | rkmessage->_private); |
3405 | } |
3406 | |
3407 | rd_kafka_msg_destroy(rk, rkm); |
3408 | |
3409 | if (unlikely(rd_kafka_yield_thread)) { |
3410 | /* Callback called yield(), |
3411 | * re-enqueue the op (if there are any |
3412 | * remaining messages). */ |
3413 | if (!TAILQ_EMPTY(&rko->rko_u.dr.msgq. |
3414 | rkmq_msgs)) |
3415 | rd_kafka_q_reenq(rkq, rko); |
3416 | else |
3417 | rd_kafka_op_destroy(rko); |
3418 | return RD_KAFKA_OP_RES_YIELD; |
3419 | } |
3420 | } |
3421 | |
3422 | rd_kafka_msgq_init(&rko->rko_u.dr.msgq); |
3423 | |
3424 | break; |
3425 | |
3426 | case RD_KAFKA_OP_THROTTLE: |
3427 | if (rk->rk_conf.throttle_cb) |
3428 | rk->rk_conf.throttle_cb(rk, rko->rko_u.throttle.nodename, |
3429 | rko->rko_u.throttle.nodeid, |
3430 | rko->rko_u.throttle. |
3431 | throttle_time, |
3432 | rk->rk_conf.opaque); |
3433 | break; |
3434 | |
3435 | case RD_KAFKA_OP_STATS: |
3436 | /* Statistics */ |
3437 | if (rk->rk_conf.stats_cb && |
3438 | rk->rk_conf.stats_cb(rk, rko->rko_u.stats.json, |
3439 | rko->rko_u.stats.json_len, |
3440 | rk->rk_conf.opaque) == 1) |
                        /* Application took ownership of the json ptr */
                        rko->rko_u.stats.json = NULL;
3442 | break; |
3443 | |
3444 | case RD_KAFKA_OP_LOG: |
3445 | if (likely(rk->rk_conf.log_cb && |
3446 | rk->rk_conf.log_level >= rko->rko_u.log.level)) |
3447 | rk->rk_conf.log_cb(rk, |
3448 | rko->rko_u.log.level, |
3449 | rko->rko_u.log.fac, |
3450 | rko->rko_u.log.str); |
3451 | break; |
3452 | |
3453 | case RD_KAFKA_OP_TERMINATE: |
3454 | /* nop: just a wake-up */ |
3455 | break; |
3456 | |
3457 | case RD_KAFKA_OP_CREATETOPICS: |
3458 | case RD_KAFKA_OP_DELETETOPICS: |
3459 | case RD_KAFKA_OP_CREATEPARTITIONS: |
3460 | case RD_KAFKA_OP_ALTERCONFIGS: |
3461 | case RD_KAFKA_OP_DESCRIBECONFIGS: |
                /* The worker callback will call op_destroy() on the op
                 * when it is done with it. */
3464 | res = rd_kafka_op_call(rk, rkq, rko); |
3465 | break; |
3466 | |
3467 | case RD_KAFKA_OP_ADMIN_RESULT: |
3468 | if (cb_type == RD_KAFKA_Q_CB_RETURN || |
3469 | cb_type == RD_KAFKA_Q_CB_FORCE_RETURN) |
3470 | return RD_KAFKA_OP_RES_PASS; /* Don't handle here */ |
3471 | |
3472 | /* Op is silently destroyed below */ |
3473 | break; |
3474 | |
3475 | default: |
3476 | rd_kafka_assert(rk, !*"cant handle op type" ); |
3477 | break; |
3478 | } |
3479 | |
3480 | if (res == RD_KAFKA_OP_RES_HANDLED) |
3481 | rd_kafka_op_destroy(rko); |
3482 | |
3483 | return res; |
3484 | } |
3485 | |
3486 | int rd_kafka_poll (rd_kafka_t *rk, int timeout_ms) { |
3487 | int r; |
3488 | |
3489 | if (timeout_ms) |
3490 | rd_kafka_app_poll_blocking(rk); |
3491 | |
3492 | r = rd_kafka_q_serve(rk->rk_rep, timeout_ms, 0, |
3493 | RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL); |
3494 | |
3495 | rd_kafka_app_polled(rk); |
3496 | |
3497 | return r; |
3498 | } |
3499 | |
3500 | |
3501 | rd_kafka_event_t *rd_kafka_queue_poll (rd_kafka_queue_t *rkqu, int timeout_ms) { |
3502 | rd_kafka_op_t *rko; |
3503 | |
3504 | if (timeout_ms) |
3505 | rd_kafka_app_poll_blocking(rkqu->rkqu_rk); |
3506 | |
3507 | rko = rd_kafka_q_pop_serve(rkqu->rkqu_q, timeout_ms, 0, |
3508 | RD_KAFKA_Q_CB_EVENT, rd_kafka_poll_cb, NULL); |
3509 | |
3510 | rd_kafka_app_polled(rkqu->rkqu_rk); |
3511 | |
3512 | if (!rko) |
3513 | return NULL; |
3514 | |
3515 | return rko; |
3516 | } |
3517 | |
3518 | int rd_kafka_queue_poll_callback (rd_kafka_queue_t *rkqu, int timeout_ms) { |
3519 | int r; |
3520 | |
3521 | if (timeout_ms) |
3522 | rd_kafka_app_poll_blocking(rkqu->rkqu_rk); |
3523 | |
3524 | r = rd_kafka_q_serve(rkqu->rkqu_q, timeout_ms, 0, |
3525 | RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL); |
3526 | |
3527 | rd_kafka_app_polled(rkqu->rkqu_rk); |
3528 | |
3529 | return r; |
3530 | } |
3531 | |
3532 | |
3533 | |
3534 | static void rd_kafka_toppar_dump (FILE *fp, const char *indent, |
3535 | rd_kafka_toppar_t *rktp) { |
3536 | |
3537 | fprintf(fp, "%s%.*s [%" PRId32"] leader %s\n" , |
3538 | indent, |
3539 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
3540 | rktp->rktp_partition, |
3541 | rktp->rktp_leader ? |
3542 | rktp->rktp_leader->rkb_name : "none" ); |
3543 | fprintf(fp, |
3544 | "%s refcnt %i\n" |
3545 | "%s msgq: %i messages\n" |
3546 | "%s xmit_msgq: %i messages\n" |
3547 | "%s total: %" PRIu64" messages, %" PRIu64" bytes\n" , |
3548 | indent, rd_refcnt_get(&rktp->rktp_refcnt), |
3549 | indent, rktp->rktp_msgq.rkmq_msg_cnt, |
3550 | indent, rktp->rktp_xmit_msgq.rkmq_msg_cnt, |
3551 | indent, rd_atomic64_get(&rktp->rktp_c.tx_msgs), |
3552 | rd_atomic64_get(&rktp->rktp_c.tx_msg_bytes)); |
3553 | } |
3554 | |
3555 | static void rd_kafka_broker_dump (FILE *fp, rd_kafka_broker_t *rkb, int locks) { |
3556 | rd_kafka_toppar_t *rktp; |
3557 | |
3558 | if (locks) |
3559 | rd_kafka_broker_lock(rkb); |
3560 | fprintf(fp, " rd_kafka_broker_t %p: %s NodeId %" PRId32 |
3561 | " in state %s (for %.3fs)\n" , |
3562 | rkb, rkb->rkb_name, rkb->rkb_nodeid, |
3563 | rd_kafka_broker_state_names[rkb->rkb_state], |
3564 | rkb->rkb_ts_state ? |
3565 | (float)(rd_clock() - rkb->rkb_ts_state) / 1000000.0f : |
3566 | 0.0f); |
3567 | fprintf(fp, " refcnt %i\n" , rd_refcnt_get(&rkb->rkb_refcnt)); |
3568 | fprintf(fp, " outbuf_cnt: %i waitresp_cnt: %i\n" , |
3569 | rd_atomic32_get(&rkb->rkb_outbufs.rkbq_cnt), |
3570 | rd_atomic32_get(&rkb->rkb_waitresps.rkbq_cnt)); |
3571 | fprintf(fp, |
3572 | " %" PRIu64 " messages sent, %" PRIu64" bytes, " |
3573 | "%" PRIu64" errors, %" PRIu64" timeouts\n" |
3574 | " %" PRIu64 " messages received, %" PRIu64" bytes, " |
3575 | "%" PRIu64" errors\n" |
3576 | " %" PRIu64 " messageset transmissions were retried\n" , |
3577 | rd_atomic64_get(&rkb->rkb_c.tx), rd_atomic64_get(&rkb->rkb_c.tx_bytes), |
3578 | rd_atomic64_get(&rkb->rkb_c.tx_err), rd_atomic64_get(&rkb->rkb_c.req_timeouts), |
3579 | rd_atomic64_get(&rkb->rkb_c.rx), rd_atomic64_get(&rkb->rkb_c.rx_bytes), |
3580 | rd_atomic64_get(&rkb->rkb_c.rx_err), |
3581 | rd_atomic64_get(&rkb->rkb_c.tx_retries)); |
3582 | |
3583 | fprintf(fp, " %i toppars:\n" , rkb->rkb_toppar_cnt); |
3584 | TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink) |
3585 | rd_kafka_toppar_dump(fp, " " , rktp); |
3586 | if (locks) { |
3587 | rd_kafka_broker_unlock(rkb); |
3588 | } |
3589 | } |
3590 | |
3591 | |
3592 | static void rd_kafka_dump0 (FILE *fp, rd_kafka_t *rk, int locks) { |
3593 | rd_kafka_broker_t *rkb; |
3594 | rd_kafka_itopic_t *rkt; |
3595 | rd_kafka_toppar_t *rktp; |
3596 | shptr_rd_kafka_toppar_t *s_rktp; |
3597 | int i; |
3598 | unsigned int tot_cnt; |
3599 | size_t tot_size; |
3600 | |
3601 | rd_kafka_curr_msgs_get(rk, &tot_cnt, &tot_size); |
3602 | |
3603 | if (locks) |
3604 | rd_kafka_rdlock(rk); |
3605 | #if ENABLE_DEVEL |
3606 | fprintf(fp, "rd_kafka_op_cnt: %d\n" , rd_atomic32_get(&rd_kafka_op_cnt)); |
3607 | #endif |
3608 | fprintf(fp, "rd_kafka_t %p: %s\n" , rk, rk->rk_name); |
3609 | |
3610 | fprintf(fp, " producer.msg_cnt %u (%" PRIusz" bytes)\n" , |
3611 | tot_cnt, tot_size); |
3612 | fprintf(fp, " rk_rep reply queue: %i ops\n" , |
3613 | rd_kafka_q_len(rk->rk_rep)); |
3614 | |
3615 | fprintf(fp, " brokers:\n" ); |
3616 | if (locks) |
3617 | mtx_lock(&rk->rk_internal_rkb_lock); |
3618 | if (rk->rk_internal_rkb) |
3619 | rd_kafka_broker_dump(fp, rk->rk_internal_rkb, locks); |
3620 | if (locks) |
3621 | mtx_unlock(&rk->rk_internal_rkb_lock); |
3622 | |
3623 | TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) { |
3624 | rd_kafka_broker_dump(fp, rkb, locks); |
3625 | } |
3626 | |
3627 | fprintf(fp, " cgrp:\n" ); |
3628 | if (rk->rk_cgrp) { |
3629 | rd_kafka_cgrp_t *rkcg = rk->rk_cgrp; |
3630 | fprintf(fp, " %.*s in state %s, flags 0x%x\n" , |
3631 | RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), |
3632 | rd_kafka_cgrp_state_names[rkcg->rkcg_state], |
3633 | rkcg->rkcg_flags); |
3634 | fprintf(fp, " coord_id %" PRId32", broker %s\n" , |
3635 | rkcg->rkcg_coord_id, |
3636 | rkcg->rkcg_curr_coord ? |
3637 | rd_kafka_broker_name(rkcg->rkcg_curr_coord):"(none)" ); |
3638 | |
3639 | fprintf(fp, " toppars:\n" ); |
3640 | RD_LIST_FOREACH(s_rktp, &rkcg->rkcg_toppars, i) { |
3641 | rktp = rd_kafka_toppar_s2i(s_rktp); |
3642 | fprintf(fp, " %.*s [%" PRId32"] in state %s\n" , |
3643 | RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
3644 | rktp->rktp_partition, |
3645 | rd_kafka_fetch_states[rktp->rktp_fetch_state]); |
3646 | } |
3647 | } |
3648 | |
3649 | fprintf(fp, " topics:\n" ); |
3650 | TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) { |
3651 | fprintf(fp, " %.*s with %" PRId32" partitions, state %s, " |
3652 | "refcnt %i\n" , |
3653 | RD_KAFKAP_STR_PR(rkt->rkt_topic), |
3654 | rkt->rkt_partition_cnt, |
3655 | rd_kafka_topic_state_names[rkt->rkt_state], |
3656 | rd_refcnt_get(&rkt->rkt_refcnt)); |
3657 | if (rkt->rkt_ua) |
3658 | rd_kafka_toppar_dump(fp, " " , |
3659 | rd_kafka_toppar_s2i(rkt->rkt_ua)); |
3660 | if (rd_list_empty(&rkt->rkt_desp)) { |
3661 | fprintf(fp, " desired partitions:" ); |
3662 | RD_LIST_FOREACH(s_rktp, &rkt->rkt_desp, i) |
3663 | fprintf(fp, " %" PRId32, |
3664 | rd_kafka_toppar_s2i(s_rktp)-> |
3665 | rktp_partition); |
3666 | fprintf(fp, "\n" ); |
3667 | } |
3668 | } |
3669 | |
3670 | fprintf(fp, "\n" ); |
3671 | rd_kafka_metadata_cache_dump(fp, rk); |
3672 | |
3673 | if (locks) |
3674 | rd_kafka_rdunlock(rk); |
3675 | } |
3676 | |
3677 | void rd_kafka_dump (FILE *fp, rd_kafka_t *rk) { |
3678 | |
3679 | if (rk) |
3680 | rd_kafka_dump0(fp, rk, 1/*locks*/); |
3681 | |
3682 | #if ENABLE_SHAREDPTR_DEBUG |
3683 | rd_shared_ptrs_dump(); |
3684 | #endif |
3685 | } |
3686 | |
3687 | |
3688 | |
3689 | const char *rd_kafka_name (const rd_kafka_t *rk) { |
3690 | return rk->rk_name; |
3691 | } |
3692 | |
3693 | rd_kafka_type_t rd_kafka_type(const rd_kafka_t *rk) { |
3694 | return rk->rk_type; |
3695 | } |
3696 | |
3697 | |
3698 | char *rd_kafka_memberid (const rd_kafka_t *rk) { |
3699 | rd_kafka_op_t *rko; |
3700 | rd_kafka_cgrp_t *rkcg; |
3701 | char *memberid; |
3702 | |
3703 | if (!(rkcg = rd_kafka_cgrp_get(rk))) |
3704 | return NULL; |
3705 | |
3706 | rko = rd_kafka_op_req2(rkcg->rkcg_ops, RD_KAFKA_OP_NAME); |
3707 | if (!rko) |
3708 | return NULL; |
3709 | memberid = rko->rko_u.name.str; |
3710 | rko->rko_u.name.str = NULL; |
3711 | rd_kafka_op_destroy(rko); |
3712 | |
3713 | return memberid; |
3714 | } |
3715 | |
3716 | |
3717 | char *rd_kafka_clusterid (rd_kafka_t *rk, int timeout_ms) { |
3718 | rd_ts_t abs_timeout = rd_timeout_init(timeout_ms); |
3719 | |
3720 | /* ClusterId is returned in Metadata >=V2 responses and |
3721 | * cached on the rk. If no cached value is available |
3722 | * it means no metadata has been received yet, or we're |
3723 | * using a lower protocol version |
3724 | * (e.g., lack of api.version.request=true). */ |
3725 | |
3726 | while (1) { |
3727 | int remains_ms; |
3728 | |
3729 | rd_kafka_rdlock(rk); |
3730 | |
3731 | if (rk->rk_clusterid) { |
3732 | /* Cached clusterid available. */ |
3733 | char *ret = rd_strdup(rk->rk_clusterid); |
3734 | rd_kafka_rdunlock(rk); |
3735 | return ret; |
3736 | } else if (rk->rk_ts_metadata > 0) { |
3737 | /* Metadata received but no clusterid, |
3738 | * this probably means the broker is too old |
3739 | * or api.version.request=false. */ |
3740 | rd_kafka_rdunlock(rk); |
3741 | return NULL; |
3742 | } |
3743 | |
3744 | rd_kafka_rdunlock(rk); |
3745 | |
3746 | /* Wait for up to timeout_ms for a metadata refresh, |
3747 | * if permitted by application. */ |
3748 | remains_ms = rd_timeout_remains(abs_timeout); |
3749 | if (rd_timeout_expired(remains_ms)) |
3750 | return NULL; |
3751 | |
3752 | rd_kafka_metadata_cache_wait_change(rk, remains_ms); |
3753 | } |
3754 | |
3755 | return NULL; |
3756 | } |
3757 | |
3758 | |
3759 | int32_t rd_kafka_controllerid (rd_kafka_t *rk, int timeout_ms) { |
3760 | rd_ts_t abs_timeout = rd_timeout_init(timeout_ms); |
3761 | |
3762 | /* ControllerId is returned in Metadata >=V1 responses and |
3763 | * cached on the rk. If no cached value is available |
3764 | * it means no metadata has been received yet, or we're |
3765 | * using a lower protocol version |
3766 | * (e.g., lack of api.version.request=true). */ |
3767 | |
3768 | while (1) { |
3769 | int remains_ms; |
3770 | int version; |
3771 | |
3772 | version = rd_kafka_brokers_get_state_version(rk); |
3773 | |
3774 | rd_kafka_rdlock(rk); |
3775 | |
                if (rk->rk_controllerid != -1) {
                        /* Cached controllerid available:
                         * copy it out before dropping the lock. */
                        int32_t controllerid = rk->rk_controllerid;
                        rd_kafka_rdunlock(rk);
                        return controllerid;
                } else if (rk->rk_ts_metadata > 0) {
                        /* Metadata received but no controllerid,
                         * this probably means the broker is too old
                         * or api.version.request=false. */
                        rd_kafka_rdunlock(rk);
                        return -1;
                }
3787 | |
3788 | rd_kafka_rdunlock(rk); |
3789 | |
3790 | /* Wait for up to timeout_ms for a metadata refresh, |
3791 | * if permitted by application. */ |
3792 | remains_ms = rd_timeout_remains(abs_timeout); |
3793 | if (rd_timeout_expired(remains_ms)) |
3794 | return -1; |
3795 | |
3796 | rd_kafka_brokers_wait_state_change(rk, version, remains_ms); |
3797 | } |
3798 | |
3799 | return -1; |
3800 | } |
3801 | |
3802 | |
3803 | void *rd_kafka_opaque (const rd_kafka_t *rk) { |
3804 | return rk->rk_conf.opaque; |
3805 | } |
3806 | |
3807 | |
3808 | int rd_kafka_outq_len (rd_kafka_t *rk) { |
3809 | return rd_kafka_curr_msgs_cnt(rk) + rd_kafka_q_len(rk->rk_rep) + |
3810 | (rk->rk_background.q ? rd_kafka_q_len(rk->rk_background.q) : 0); |
3811 | } |
3812 | |
3813 | |
3814 | rd_kafka_resp_err_t rd_kafka_flush (rd_kafka_t *rk, int timeout_ms) { |
3815 | unsigned int msg_cnt = 0; |
3816 | int qlen; |
3817 | rd_ts_t ts_end = rd_timeout_init(timeout_ms); |
3818 | int tmout; |
3819 | |
3820 | if (rk->rk_type != RD_KAFKA_PRODUCER) |
3821 | return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED; |
3822 | |
3823 | rd_kafka_yield_thread = 0; |
3824 | |
3825 | /* First poll call is non-blocking for the case |
3826 | * where timeout_ms==RD_POLL_NOWAIT to make sure poll is |
3827 | * called at least once. */ |
3828 | tmout = RD_POLL_NOWAIT; |
3829 | do { |
3830 | rd_kafka_poll(rk, tmout); |
3831 | } while (((qlen = rd_kafka_q_len(rk->rk_rep)) > 0 || |
3832 | (msg_cnt = rd_kafka_curr_msgs_cnt(rk)) > 0) && |
3833 | !rd_kafka_yield_thread && |
3834 | (tmout = rd_timeout_remains_limit(ts_end, 10)) != |
3835 | RD_POLL_NOWAIT); |
3836 | |
3837 | return qlen + msg_cnt > 0 ? RD_KAFKA_RESP_ERR__TIMED_OUT : |
3838 | RD_KAFKA_RESP_ERR_NO_ERROR; |
3839 | } |
3840 | |
3841 | |
3842 | |
3843 | rd_kafka_resp_err_t rd_kafka_purge (rd_kafka_t *rk, int purge_flags) { |
3844 | rd_kafka_broker_t *rkb; |
3845 | rd_kafka_q_t *tmpq = NULL; |
3846 | int waitcnt = 0; |
3847 | |
3848 | if (rk->rk_type != RD_KAFKA_PRODUCER) |
3849 | return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED; |
3850 | |
3851 | /* Check that future flags are not passed */ |
3852 | if ((purge_flags & ~RD_KAFKA_PURGE_F_MASK) != 0) |
3853 | return RD_KAFKA_RESP_ERR__INVALID_ARG; |
3854 | |
3855 | /* Nothing to purge */ |
3856 | if (!purge_flags) |
3857 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
3858 | |
3859 | /* Set up a reply queue to wait for broker thread signalling |
3860 | * completion, unless non-blocking. */ |
3861 | if (!(purge_flags & RD_KAFKA_PURGE_F_NON_BLOCKING)) |
3862 | tmpq = rd_kafka_q_new(rk); |
3863 | |
3864 | /* Send purge request to all broker threads */ |
3865 | rd_kafka_rdlock(rk); |
3866 | TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) { |
3867 | rd_kafka_broker_purge_queues(rkb, purge_flags, |
3868 | RD_KAFKA_REPLYQ(tmpq, 0)); |
3869 | waitcnt++; |
3870 | } |
3871 | rd_kafka_rdunlock(rk); |
3872 | |
3873 | /* The internal broker handler may hold unassigned partitions */ |
3874 | mtx_lock(&rk->rk_internal_rkb_lock); |
3875 | rd_kafka_broker_purge_queues(rk->rk_internal_rkb, purge_flags, |
3876 | RD_KAFKA_REPLYQ(tmpq, 0)); |
3877 | mtx_unlock(&rk->rk_internal_rkb_lock); |
3878 | waitcnt++; |
3879 | |
3880 | |
3881 | if (tmpq) { |
3882 | /* Wait for responses */ |
3883 | while (waitcnt-- > 0) |
3884 | rd_kafka_q_wait_result(tmpq, RD_POLL_INFINITE); |
3885 | |
3886 | rd_kafka_q_destroy_owner(tmpq); |
3887 | } |
3888 | |
3889 | /* Purge messages for the UA(-1) partitions (which are not |
3890 | * handled by a broker thread) */ |
3891 | if (purge_flags & RD_KAFKA_PURGE_F_QUEUE) |
3892 | rd_kafka_purge_ua_toppar_queues(rk); |
3893 | |
3894 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
3895 | } |
3896 | |
3897 | |
3898 | |
3899 | |
3900 | /** |
3901 | * @returns a csv string of purge flags in thread-local storage |
3902 | */ |
3903 | const char *rd_kafka_purge_flags2str (int flags) { |
        static const char *names[] = { "queue", "inflight", NULL };
3905 | static RD_TLS char ret[64]; |
3906 | |
3907 | return rd_flags2str(ret, sizeof(ret), names, flags); |
3908 | } |
3909 | |
3910 | |
3911 | int rd_kafka_version (void) { |
3912 | return RD_KAFKA_VERSION; |
3913 | } |
3914 | |
3915 | const char *rd_kafka_version_str (void) { |
3916 | static RD_TLS char ret[128]; |
3917 | size_t of = 0, r; |
3918 | |
3919 | if (*ret) |
3920 | return ret; |
3921 | |
3922 | #ifdef LIBRDKAFKA_GIT_VERSION |
3923 | if (*LIBRDKAFKA_GIT_VERSION) { |
                of = rd_snprintf(ret, sizeof(ret), "%s",
                                 *LIBRDKAFKA_GIT_VERSION == 'v' ?
                                 LIBRDKAFKA_GIT_VERSION+1 :
                                 LIBRDKAFKA_GIT_VERSION);
3928 | if (of > sizeof(ret)) |
3929 | of = sizeof(ret); |
3930 | } |
3931 | #endif |
3932 | |
3933 | #define _my_sprintf(...) do { \ |
3934 | r = rd_snprintf(ret+of, sizeof(ret)-of, __VA_ARGS__); \ |
3935 | if (r > sizeof(ret)-of) \ |
3936 | r = sizeof(ret)-of; \ |
3937 | of += r; \ |
3938 | } while(0) |
3939 | |
        if (of == 0) {
                int ver = rd_kafka_version();
                int prel = (ver & 0xff);
                _my_sprintf("%i.%i.%i",
                            (ver >> 24) & 0xff,
                            (ver >> 16) & 0xff,
                            (ver >> 8) & 0xff);
                if (prel != 0xff) {
                        /* Pre-release builds numbered 200 and below are
                         * plain running numbers, above 200 are RC numbers. */
                        if (prel <= 200)
                                _my_sprintf("-pre%d", prel);
                        else
                                _my_sprintf("-RC%d", prel - 200);
                }
        }
3955 | } |
3956 | |
#if ENABLE_DEVEL
        _my_sprintf("-devel");
#endif

#if ENABLE_SHAREDPTR_DEBUG
        _my_sprintf("-shptr");
#endif

#if WITHOUT_OPTIMIZATION
        _my_sprintf("-O0");
#endif
3968 | |
3969 | return ret; |
3970 | } |
3971 | |
3972 | |
3973 | /** |
3974 | * Assert trampoline to print some debugging information on crash. |
3975 | */ |
3976 | void |
3977 | RD_NORETURN |
3978 | rd_kafka_crash (const char *file, int line, const char *function, |
3979 | rd_kafka_t *rk, const char *reason) { |
3980 | fprintf(stderr, "*** %s:%i:%s: %s ***\n" , |
3981 | file, line, function, reason); |
3982 | if (rk) |
3983 | rd_kafka_dump0(stderr, rk, 0/*no locks*/); |
3984 | abort(); |
3985 | } |
3986 | |
3987 | |
3988 | |
3989 | |
3990 | |
3991 | struct list_groups_state { |
3992 | rd_kafka_q_t *q; |
3993 | rd_kafka_resp_err_t err; |
3994 | int wait_cnt; |
3995 | const char *desired_group; |
3996 | struct rd_kafka_group_list *grplist; |
3997 | int grplist_size; |
3998 | }; |
3999 | |
4000 | static void rd_kafka_DescribeGroups_resp_cb (rd_kafka_t *rk, |
4001 | rd_kafka_broker_t *rkb, |
4002 | rd_kafka_resp_err_t err, |
4003 | rd_kafka_buf_t *reply, |
4004 | rd_kafka_buf_t *request, |
4005 | void *opaque) { |
4006 | struct list_groups_state *state; |
4007 | const int log_decode_errors = LOG_ERR; |
4008 | int cnt; |
4009 | |
4010 | if (err == RD_KAFKA_RESP_ERR__DESTROY) { |
4011 | /* 'state' has gone out of scope due to list_groups() |
4012 | * timing out and returning. */ |
4013 | return; |
4014 | } |
4015 | |
4016 | state = opaque; |
4017 | state->wait_cnt--; |
4018 | |
4019 | if (err) |
4020 | goto err; |
4021 | |
4022 | rd_kafka_buf_read_i32(reply, &cnt); |
4023 | |
4024 | while (cnt-- > 0) { |
4025 | int16_t ErrorCode; |
4026 | rd_kafkap_str_t Group, GroupState, ProtoType, Proto; |
4027 | int MemberCnt; |
4028 | struct rd_kafka_group_info *gi; |
4029 | |
4030 | if (state->grplist->group_cnt == state->grplist_size) { |
4031 | /* Grow group array */ |
4032 | state->grplist_size *= 2; |
4033 | state->grplist->groups = |
4034 | rd_realloc(state->grplist->groups, |
4035 | state->grplist_size * |
4036 | sizeof(*state->grplist->groups)); |
4037 | } |
4038 | |
4039 | gi = &state->grplist->groups[state->grplist->group_cnt++]; |
4040 | memset(gi, 0, sizeof(*gi)); |
4041 | |
4042 | rd_kafka_buf_read_i16(reply, &ErrorCode); |
4043 | rd_kafka_buf_read_str(reply, &Group); |
4044 | rd_kafka_buf_read_str(reply, &GroupState); |
4045 | rd_kafka_buf_read_str(reply, &ProtoType); |
4046 | rd_kafka_buf_read_str(reply, &Proto); |
4047 | rd_kafka_buf_read_i32(reply, &MemberCnt); |
4048 | |
4049 | if (MemberCnt > 100000) { |
4050 | err = RD_KAFKA_RESP_ERR__BAD_MSG; |
4051 | goto err; |
4052 | } |
4053 | |
4054 | rd_kafka_broker_lock(rkb); |
4055 | gi->broker.id = rkb->rkb_nodeid; |
4056 | gi->broker.host = rd_strdup(rkb->rkb_origname); |
4057 | gi->broker.port = rkb->rkb_port; |
4058 | rd_kafka_broker_unlock(rkb); |
4059 | |
4060 | gi->err = ErrorCode; |
4061 | gi->group = RD_KAFKAP_STR_DUP(&Group); |
4062 | gi->state = RD_KAFKAP_STR_DUP(&GroupState); |
4063 | gi->protocol_type = RD_KAFKAP_STR_DUP(&ProtoType); |
4064 | gi->protocol = RD_KAFKAP_STR_DUP(&Proto); |
4065 | |
4066 | if (MemberCnt > 0) |
4067 | gi->members = |
4068 | rd_malloc(MemberCnt * sizeof(*gi->members)); |
4069 | |
4070 | while (MemberCnt-- > 0) { |
4071 | rd_kafkap_str_t MemberId, ClientId, ClientHost; |
4072 | rd_kafkap_bytes_t Meta, Assignment; |
4073 | struct rd_kafka_group_member_info *mi; |
4074 | |
4075 | mi = &gi->members[gi->member_cnt++]; |
4076 | memset(mi, 0, sizeof(*mi)); |
4077 | |
4078 | rd_kafka_buf_read_str(reply, &MemberId); |
4079 | rd_kafka_buf_read_str(reply, &ClientId); |
4080 | rd_kafka_buf_read_str(reply, &ClientHost); |
4081 | rd_kafka_buf_read_bytes(reply, &Meta); |
4082 | rd_kafka_buf_read_bytes(reply, &Assignment); |
4083 | |
4084 | mi->member_id = RD_KAFKAP_STR_DUP(&MemberId); |
4085 | mi->client_id = RD_KAFKAP_STR_DUP(&ClientId); |
4086 | mi->client_host = RD_KAFKAP_STR_DUP(&ClientHost); |
4087 | |
4088 | if (RD_KAFKAP_BYTES_LEN(&Meta) == 0) { |
4089 | mi->member_metadata_size = 0; |
4090 | mi->member_metadata = NULL; |
4091 | } else { |
4092 | mi->member_metadata_size = |
4093 | RD_KAFKAP_BYTES_LEN(&Meta); |
4094 | mi->member_metadata = |
4095 | rd_memdup(Meta.data, |
4096 | mi->member_metadata_size); |
4097 | } |
4098 | |
4099 | if (RD_KAFKAP_BYTES_LEN(&Assignment) == 0) { |
4100 | mi->member_assignment_size = 0; |
4101 | mi->member_assignment = NULL; |
4102 | } else { |
4103 | mi->member_assignment_size = |
4104 | RD_KAFKAP_BYTES_LEN(&Assignment); |
4105 | mi->member_assignment = |
4106 | rd_memdup(Assignment.data, |
4107 | mi->member_assignment_size); |
4108 | } |
4109 | } |
4110 | } |
4111 | |
4112 | err: |
4113 | state->err = err; |
4114 | return; |
4115 | |
4116 | err_parse: |
4117 | state->err = reply->rkbuf_err; |
4118 | } |
4119 | |
4120 | static void rd_kafka_ListGroups_resp_cb (rd_kafka_t *rk, |
4121 | rd_kafka_broker_t *rkb, |
4122 | rd_kafka_resp_err_t err, |
4123 | rd_kafka_buf_t *reply, |
4124 | rd_kafka_buf_t *request, |
4125 | void *opaque) { |
4126 | struct list_groups_state *state; |
4127 | const int log_decode_errors = LOG_ERR; |
4128 | int16_t ErrorCode; |
4129 | char **grps; |
4130 | int cnt, grpcnt, i = 0; |
4131 | |
4132 | if (err == RD_KAFKA_RESP_ERR__DESTROY) { |
4133 | /* 'state' is no longer in scope because |
4134 | * list_groups() timed out and returned to the caller. |
4135 | * We must not touch anything here but simply return. */ |
4136 | return; |
4137 | } |
4138 | |
4139 | state = opaque; |
4140 | |
4141 | state->wait_cnt--; |
4142 | |
4143 | if (err) |
4144 | goto err; |
4145 | |
4146 | rd_kafka_buf_read_i16(reply, &ErrorCode); |
4147 | if (ErrorCode) { |
4148 | err = ErrorCode; |
4149 | goto err; |
4150 | } |
4151 | |
4152 | rd_kafka_buf_read_i32(reply, &cnt); |
4153 | |
4154 | if (state->desired_group) |
4155 | grpcnt = 1; |
4156 | else |
4157 | grpcnt = cnt; |
4158 | |
4159 | if (cnt == 0 || grpcnt == 0) |
4160 | return; |
4161 | |
4162 | grps = rd_malloc(sizeof(*grps) * grpcnt); |
4163 | |
4164 | while (cnt-- > 0) { |
4165 | rd_kafkap_str_t grp, proto; |
4166 | |
4167 | rd_kafka_buf_read_str(reply, &grp); |
4168 | rd_kafka_buf_read_str(reply, &proto); |
4169 | |
4170 | if (state->desired_group && |
4171 | rd_kafkap_str_cmp_str(&grp, state->desired_group)) |
4172 | continue; |
4173 | |
4174 | grps[i++] = RD_KAFKAP_STR_DUP(&grp); |
4175 | |
4176 | if (i == grpcnt) |
4177 | break; |
4178 | } |
4179 | |
4180 | if (i > 0) { |
4181 | state->wait_cnt++; |
4182 | rd_kafka_DescribeGroupsRequest(rkb, |
4183 | (const char **)grps, i, |
4184 | RD_KAFKA_REPLYQ(state->q, 0), |
4185 | rd_kafka_DescribeGroups_resp_cb, |
4186 | state); |
4187 | |
4188 | while (i-- > 0) |
4189 | rd_free(grps[i]); |
4190 | } |
4191 | |
4192 | |
4193 | rd_free(grps); |
4194 | |
4195 | err: |
4196 | state->err = err; |
4197 | return; |
4198 | |
4199 | err_parse: |
4200 | state->err = reply->rkbuf_err; |
4201 | } |
4202 | |
4203 | rd_kafka_resp_err_t |
4204 | rd_kafka_list_groups (rd_kafka_t *rk, const char *group, |
4205 | const struct rd_kafka_group_list **grplistp, |
4206 | int timeout_ms) { |
4207 | rd_kafka_broker_t *rkb; |
4208 | int rkb_cnt = 0; |
4209 | struct list_groups_state state = RD_ZERO_INIT; |
4210 | rd_ts_t ts_end = rd_timeout_init(timeout_ms); |
4211 | int state_version = rd_kafka_brokers_get_state_version(rk); |
4212 | |
        /* Wait until metadata has been fetched from the cluster so
         * that we have a full broker list.
         * This state only occurs during initial client setup; after that
         * there will always be a cached metadata copy. */
4217 | rd_kafka_rdlock(rk); |
4218 | while (!rk->rk_ts_metadata) { |
4219 | rd_kafka_rdunlock(rk); |
4220 | |
4221 | if (!rd_kafka_brokers_wait_state_change( |
4222 | rk, state_version, rd_timeout_remains(ts_end))) |
4223 | return RD_KAFKA_RESP_ERR__TIMED_OUT; |
4224 | |
4225 | rd_kafka_rdlock(rk); |
4226 | } |
4227 | |
4228 | state.q = rd_kafka_q_new(rk); |
4229 | state.desired_group = group; |
4230 | state.grplist = rd_calloc(1, sizeof(*state.grplist)); |
4231 | state.grplist_size = group ? 1 : 32; |
4232 | |
4233 | state.grplist->groups = rd_malloc(state.grplist_size * |
4234 | sizeof(*state.grplist->groups)); |
4235 | |
4236 | /* Query each broker for its list of groups */ |
4237 | TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) { |
4238 | rd_kafka_broker_lock(rkb); |
4239 | if (rkb->rkb_nodeid == -1) { |
4240 | rd_kafka_broker_unlock(rkb); |
4241 | continue; |
4242 | } |
4243 | |
4244 | state.wait_cnt++; |
4245 | rd_kafka_ListGroupsRequest(rkb, |
4246 | RD_KAFKA_REPLYQ(state.q, 0), |
4247 | rd_kafka_ListGroups_resp_cb, |
4248 | &state); |
4249 | |
4250 | rkb_cnt++; |
4251 | |
4252 | rd_kafka_broker_unlock(rkb); |
4253 | |
4254 | } |
4255 | rd_kafka_rdunlock(rk); |
4256 | |
4257 | if (rkb_cnt == 0) { |
4258 | state.err = RD_KAFKA_RESP_ERR__TRANSPORT; |
4259 | |
4260 | } else { |
4261 | int remains; |
4262 | |
4263 | while (state.wait_cnt > 0 && |
4264 | !rd_timeout_expired((remains = |
4265 | rd_timeout_remains(ts_end)))) { |
4266 | rd_kafka_q_serve(state.q, remains, 0, |
4267 | RD_KAFKA_Q_CB_CALLBACK, |
4268 | rd_kafka_poll_cb, NULL); |
4269 | /* Ignore yields */ |
4270 | } |
4271 | } |
4272 | |
4273 | rd_kafka_q_destroy_owner(state.q); |
4274 | |
4275 | if (state.wait_cnt > 0 && !state.err) { |
4276 | if (state.grplist->group_cnt == 0) |
4277 | state.err = RD_KAFKA_RESP_ERR__TIMED_OUT; |
4278 | else { |
4279 | *grplistp = state.grplist; |
4280 | return RD_KAFKA_RESP_ERR__PARTIAL; |
4281 | } |
4282 | } |
4283 | |
4284 | if (state.err) |
4285 | rd_kafka_group_list_destroy(state.grplist); |
4286 | else |
4287 | *grplistp = state.grplist; |
4288 | |
4289 | return state.err; |
4290 | } |
4291 | |
4292 | |
4293 | void rd_kafka_group_list_destroy (const struct rd_kafka_group_list *grplist0) { |
4294 | struct rd_kafka_group_list *grplist = |
4295 | (struct rd_kafka_group_list *)grplist0; |
4296 | |
4297 | while (grplist->group_cnt-- > 0) { |
4298 | struct rd_kafka_group_info *gi; |
4299 | gi = &grplist->groups[grplist->group_cnt]; |
4300 | |
4301 | if (gi->broker.host) |
4302 | rd_free(gi->broker.host); |
4303 | if (gi->group) |
4304 | rd_free(gi->group); |
4305 | if (gi->state) |
4306 | rd_free(gi->state); |
4307 | if (gi->protocol_type) |
4308 | rd_free(gi->protocol_type); |
4309 | if (gi->protocol) |
4310 | rd_free(gi->protocol); |
4311 | |
4312 | while (gi->member_cnt-- > 0) { |
4313 | struct rd_kafka_group_member_info *mi; |
4314 | mi = &gi->members[gi->member_cnt]; |
4315 | |
4316 | if (mi->member_id) |
4317 | rd_free(mi->member_id); |
4318 | if (mi->client_id) |
4319 | rd_free(mi->client_id); |
4320 | if (mi->client_host) |
4321 | rd_free(mi->client_host); |
4322 | if (mi->member_metadata) |
4323 | rd_free(mi->member_metadata); |
4324 | if (mi->member_assignment) |
4325 | rd_free(mi->member_assignment); |
4326 | } |
4327 | |
4328 | if (gi->members) |
4329 | rd_free(gi->members); |
4330 | } |
4331 | |
4332 | if (grplist->groups) |
4333 | rd_free(grplist->groups); |
4334 | |
4335 | rd_free(grplist); |
4336 | } |
4337 | |
4338 | |
4339 | |
4340 | const char *rd_kafka_get_debug_contexts(void) { |
4341 | return RD_KAFKA_DEBUG_CONTEXTS; |
4342 | } |
4343 | |
4344 | |
4345 | int rd_kafka_path_is_dir (const char *path) { |
4346 | #ifdef _MSC_VER |
4347 | struct _stat st; |
4348 | return (_stat(path, &st) == 0 && st.st_mode & S_IFDIR); |
4349 | #else |
4350 | struct stat st; |
4351 | return (stat(path, &st) == 0 && S_ISDIR(st.st_mode)); |
4352 | #endif |
4353 | } |
4354 | |
4355 | |
4356 | void rd_kafka_mem_free (rd_kafka_t *rk, void *ptr) { |
4357 | free(ptr); |
4358 | } |
4359 | |
4360 | |
4361 | int rd_kafka_errno (void) { |
4362 | return errno; |
4363 | } |
4364 | |
4365 | int rd_kafka_unittest (void) { |
4366 | return rd_unittest(); |
4367 | } |
4368 | |
4369 | |
4370 | #if ENABLE_SHAREDPTR_DEBUG |
4371 | struct rd_shptr0_head rd_shared_ptr_debug_list; |
4372 | mtx_t rd_shared_ptr_debug_mtx; |
4373 | |
4374 | void rd_shared_ptrs_dump (void) { |
4375 | rd_shptr0_t *sptr; |
4376 | |
4377 | printf("################ Current shared pointers ################\n" ); |
4378 | printf("### op_cnt: %d\n" , rd_atomic32_get(&rd_kafka_op_cnt)); |
4379 | mtx_lock(&rd_shared_ptr_debug_mtx); |
4380 | LIST_FOREACH(sptr, &rd_shared_ptr_debug_list, link) |
4381 | printf("# shptr ((%s*)%p): object %p refcnt %d: at %s:%d\n" , |
4382 | sptr->typename, sptr, sptr->obj, |
4383 | rd_refcnt_get(sptr->ref), sptr->func, sptr->line); |
4384 | mtx_unlock(&rd_shared_ptr_debug_mtx); |
4385 | printf("#########################################################\n" ); |
4386 | } |
4387 | #endif |
4388 | |