1/*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012-2018 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29/**
30 * @file rdkafka.h
31 * @brief Apache Kafka C/C++ consumer and producer client library.
32 *
33 * rdkafka.h contains the public API for librdkafka.
34 * The API is documented in this file as comments prefixing the function, type,
35 * enum, define, etc.
36 *
37 * @sa For the C++ interface see rdkafkacpp.h
38 *
39 * @tableofcontents
40 */
41
42
43/* @cond NO_DOC */
44#ifndef _RDKAFKA_H_
45#define _RDKAFKA_H_
46
47#include <stdio.h>
48#include <inttypes.h>
49#include <sys/types.h>
50
51#ifdef __cplusplus
52extern "C" {
53#if 0
54} /* Restore indent */
55#endif
56#endif
57
#ifdef _MSC_VER
#include <basetsd.h>
/* Keep <Winsock2.h> (and anything that pulls in <windows.h>) lean.
 * NOTE(review): this header historically defined the misspelled
 * WIN32_MEAN_AND_LEAN, which Windows headers ignore; the official macro is
 * WIN32_LEAN_AND_MEAN. Both are defined here: the correct one to actually
 * take effect, the misspelled one retained for backwards compatibility with
 * any application code that happens to test for it. */
#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN
#endif
#ifndef WIN32_MEAN_AND_LEAN
#define WIN32_MEAN_AND_LEAN
#endif
#include <Winsock2.h>  /* for sockaddr, .. */
typedef SSIZE_T ssize_t; /* MSVC does not provide POSIX ssize_t */
#define RD_UNUSED
#define RD_INLINE __inline
#define RD_DEPRECATED __declspec(deprecated)
#undef RD_EXPORT
#ifdef LIBRDKAFKA_STATICLIB
#define RD_EXPORT
#else
#ifdef LIBRDKAFKA_EXPORTS
#define RD_EXPORT __declspec(dllexport)
#else
#define RD_EXPORT __declspec(dllimport)
#endif
#endif

/* The type-check macros rely on GCC/Clang statement expressions and are
 * therefore disabled on MSVC.
 * NOTE(review): previously this default was only set in the DLL branch
 * above, leaving LIBRDKAFKA_TYPECHECKS undefined for static-library builds
 * (harmless only because #if treats an undefined macro as 0). Hoisted here
 * so both MSVC build flavours get an explicit value. */
#ifndef LIBRDKAFKA_TYPECHECKS
#define LIBRDKAFKA_TYPECHECKS 0
#endif

#else
#include <sys/socket.h> /* for sockaddr, .. */

#define RD_UNUSED __attribute__((unused))
#define RD_INLINE inline
#define RD_EXPORT
#define RD_DEPRECATED __attribute__((deprecated))

/* Statement expressions are available: enable compile-time type checks
 * by default (may be overridden by the build system). */
#ifndef LIBRDKAFKA_TYPECHECKS
#define LIBRDKAFKA_TYPECHECKS 1
#endif
#endif
94
95
96/**
97 * @brief Type-checking macros
98 * Compile-time checking that \p ARG is of type \p TYPE.
99 * @returns \p RET
100 */
101#if LIBRDKAFKA_TYPECHECKS
102#define _LRK_TYPECHECK(RET,TYPE,ARG) \
103 ({ if (0) { TYPE __t RD_UNUSED = (ARG); } RET; })
104
105#define _LRK_TYPECHECK2(RET,TYPE,ARG,TYPE2,ARG2) \
106 ({ \
107 if (0) { \
108 TYPE __t RD_UNUSED = (ARG); \
109 TYPE2 __t2 RD_UNUSED = (ARG2); \
110 } \
111 RET; })
112
113#define _LRK_TYPECHECK3(RET,TYPE,ARG,TYPE2,ARG2,TYPE3,ARG3) \
114 ({ \
115 if (0) { \
116 TYPE __t RD_UNUSED = (ARG); \
117 TYPE2 __t2 RD_UNUSED = (ARG2); \
118 TYPE3 __t3 RD_UNUSED = (ARG3); \
119 } \
120 RET; })
121#else
122#define _LRK_TYPECHECK(RET,TYPE,ARG) (RET)
123#define _LRK_TYPECHECK2(RET,TYPE,ARG,TYPE2,ARG2) (RET)
124#define _LRK_TYPECHECK3(RET,TYPE,ARG,TYPE2,ARG2,TYPE3,ARG3) (RET)
125#endif
126
127/* @endcond */
128
129
130/**
131 * @name librdkafka version
132 * @{
133 *
134 *
135 */
136
137/**
138 * @brief librdkafka version
139 *
140 * Interpreted as hex \c MM.mm.rr.xx:
141 * - MM = Major
142 * - mm = minor
143 * - rr = revision
144 * - xx = pre-release id (0xff is the final release)
145 *
146 * E.g.: \c 0x000801ff = 0.8.1
147 *
148 * @remark This value should only be used during compile time,
149 * for runtime checks of version use rd_kafka_version()
150 */
151#define RD_KAFKA_VERSION 0x010100ff
152
153/**
154 * @brief Returns the librdkafka version as integer.
155 *
156 * @returns Version integer.
157 *
158 * @sa See RD_KAFKA_VERSION for how to parse the integer format.
 * @sa Use rd_kafka_version_str() to retrieve the version as a string.
160 */
161RD_EXPORT
162int rd_kafka_version(void);
163
164/**
165 * @brief Returns the librdkafka version as string.
166 *
167 * @returns Version string
168 */
169RD_EXPORT
170const char *rd_kafka_version_str (void);
171
172/**@}*/
173
174
175/**
176 * @name Constants, errors, types
177 * @{
178 *
179 *
180 */
181
182
183/**
184 * @enum rd_kafka_type_t
185 *
186 * @brief rd_kafka_t handle type.
187 *
188 * @sa rd_kafka_new()
189 */
190typedef enum rd_kafka_type_t {
191 RD_KAFKA_PRODUCER, /**< Producer client */
192 RD_KAFKA_CONSUMER /**< Consumer client */
193} rd_kafka_type_t;
194
195
196/**
197 * @enum Timestamp types
198 *
199 * @sa rd_kafka_message_timestamp()
200 */
201typedef enum rd_kafka_timestamp_type_t {
202 RD_KAFKA_TIMESTAMP_NOT_AVAILABLE, /**< Timestamp not available */
203 RD_KAFKA_TIMESTAMP_CREATE_TIME, /**< Message creation time */
204 RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME /**< Log append time */
205} rd_kafka_timestamp_type_t;
206
207
208
209/**
210 * @brief Retrieve supported debug contexts for use with the \c \"debug\"
211 * configuration property. (runtime)
212 *
213 * @returns Comma-separated list of available debugging contexts.
214 */
215RD_EXPORT
216const char *rd_kafka_get_debug_contexts(void);
217
218/**
219 * @brief Supported debug contexts. (compile time)
220 *
221 * @deprecated This compile time value may be outdated at runtime due to
222 * linking another version of the library.
223 * Use rd_kafka_get_debug_contexts() instead.
224 */
225#define RD_KAFKA_DEBUG_CONTEXTS \
226 "all,generic,broker,topic,metadata,feature,queue,msg,protocol,cgrp,security,fetch,interceptor,plugin,consumer,admin,eos"
227
228
/* @cond NO_DOC */
/* Private types to provide ABI compatibility.
 *
 * Only the struct tags are declared here; the definitions live inside
 * librdkafka. Applications therefore always handle these objects through
 * pointers, which lets the internal layout change without breaking the
 * ABI (opaque-pointer pattern). */
typedef struct rd_kafka_s rd_kafka_t;
typedef struct rd_kafka_topic_s rd_kafka_topic_t;
typedef struct rd_kafka_conf_s rd_kafka_conf_t;
typedef struct rd_kafka_topic_conf_s rd_kafka_topic_conf_t;
typedef struct rd_kafka_queue_s rd_kafka_queue_t;
/* Note: the public event type aliases the internal rd_kafka_op_s. */
typedef struct rd_kafka_op_s rd_kafka_event_t;
typedef struct rd_kafka_topic_result_s rd_kafka_topic_result_t;
/* @endcond */
239
240
241/**
242 * @enum rd_kafka_resp_err_t
243 * @brief Error codes.
244 *
245 * The negative error codes delimited by two underscores
246 * (\c RD_KAFKA_RESP_ERR__..) denotes errors internal to librdkafka and are
247 * displayed as \c \"Local: \<error string..\>\", while the error codes
248 * delimited by a single underscore (\c RD_KAFKA_RESP_ERR_..) denote broker
249 * errors and are displayed as \c \"Broker: \<error string..\>\".
250 *
251 * @sa Use rd_kafka_err2str() to translate an error code a human readable string
252 */
253typedef enum {
254 /* Internal errors to rdkafka: */
255 /** Begin internal error codes */
256 RD_KAFKA_RESP_ERR__BEGIN = -200,
257 /** Received message is incorrect */
258 RD_KAFKA_RESP_ERR__BAD_MSG = -199,
259 /** Bad/unknown compression */
260 RD_KAFKA_RESP_ERR__BAD_COMPRESSION = -198,
261 /** Broker is going away */
262 RD_KAFKA_RESP_ERR__DESTROY = -197,
263 /** Generic failure */
264 RD_KAFKA_RESP_ERR__FAIL = -196,
265 /** Broker transport failure */
266 RD_KAFKA_RESP_ERR__TRANSPORT = -195,
267 /** Critical system resource */
268 RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE = -194,
269 /** Failed to resolve broker */
270 RD_KAFKA_RESP_ERR__RESOLVE = -193,
271 /** Produced message timed out*/
272 RD_KAFKA_RESP_ERR__MSG_TIMED_OUT = -192,
273 /** Reached the end of the topic+partition queue on
274 * the broker. Not really an error. */
275 RD_KAFKA_RESP_ERR__PARTITION_EOF = -191,
276 /** Permanent: Partition does not exist in cluster. */
277 RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION = -190,
278 /** File or filesystem error */
279 RD_KAFKA_RESP_ERR__FS = -189,
280 /** Permanent: Topic does not exist in cluster. */
281 RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC = -188,
282 /** All broker connections are down. */
283 RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN = -187,
284 /** Invalid argument, or invalid configuration */
285 RD_KAFKA_RESP_ERR__INVALID_ARG = -186,
286 /** Operation timed out */
287 RD_KAFKA_RESP_ERR__TIMED_OUT = -185,
288 /** Queue is full */
289 RD_KAFKA_RESP_ERR__QUEUE_FULL = -184,
290 /** ISR count < required.acks */
291 RD_KAFKA_RESP_ERR__ISR_INSUFF = -183,
292 /** Broker node update */
293 RD_KAFKA_RESP_ERR__NODE_UPDATE = -182,
294 /** SSL error */
295 RD_KAFKA_RESP_ERR__SSL = -181,
296 /** Waiting for coordinator to become available. */
297 RD_KAFKA_RESP_ERR__WAIT_COORD = -180,
298 /** Unknown client group */
299 RD_KAFKA_RESP_ERR__UNKNOWN_GROUP = -179,
300 /** Operation in progress */
301 RD_KAFKA_RESP_ERR__IN_PROGRESS = -178,
302 /** Previous operation in progress, wait for it to finish. */
303 RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS = -177,
304 /** This operation would interfere with an existing subscription */
305 RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION = -176,
306 /** Assigned partitions (rebalance_cb) */
307 RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS = -175,
308 /** Revoked partitions (rebalance_cb) */
309 RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS = -174,
310 /** Conflicting use */
311 RD_KAFKA_RESP_ERR__CONFLICT = -173,
312 /** Wrong state */
313 RD_KAFKA_RESP_ERR__STATE = -172,
314 /** Unknown protocol */
315 RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL = -171,
316 /** Not implemented */
317 RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED = -170,
318 /** Authentication failure*/
319 RD_KAFKA_RESP_ERR__AUTHENTICATION = -169,
320 /** No stored offset */
321 RD_KAFKA_RESP_ERR__NO_OFFSET = -168,
322 /** Outdated */
323 RD_KAFKA_RESP_ERR__OUTDATED = -167,
324 /** Timed out in queue */
325 RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE = -166,
326 /** Feature not supported by broker */
327 RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE = -165,
328 /** Awaiting cache update */
329 RD_KAFKA_RESP_ERR__WAIT_CACHE = -164,
330 /** Operation interrupted (e.g., due to yield)) */
331 RD_KAFKA_RESP_ERR__INTR = -163,
332 /** Key serialization error */
333 RD_KAFKA_RESP_ERR__KEY_SERIALIZATION = -162,
334 /** Value serialization error */
335 RD_KAFKA_RESP_ERR__VALUE_SERIALIZATION = -161,
336 /** Key deserialization error */
337 RD_KAFKA_RESP_ERR__KEY_DESERIALIZATION = -160,
338 /** Value deserialization error */
339 RD_KAFKA_RESP_ERR__VALUE_DESERIALIZATION = -159,
340 /** Partial response */
341 RD_KAFKA_RESP_ERR__PARTIAL = -158,
342 /** Modification attempted on read-only object */
343 RD_KAFKA_RESP_ERR__READ_ONLY = -157,
344 /** No such entry / item not found */
345 RD_KAFKA_RESP_ERR__NOENT = -156,
346 /** Read underflow */
347 RD_KAFKA_RESP_ERR__UNDERFLOW = -155,
348 /** Invalid type */
349 RD_KAFKA_RESP_ERR__INVALID_TYPE = -154,
350 /** Retry operation */
351 RD_KAFKA_RESP_ERR__RETRY = -153,
352 /** Purged in queue */
353 RD_KAFKA_RESP_ERR__PURGE_QUEUE = -152,
354 /** Purged in flight */
355 RD_KAFKA_RESP_ERR__PURGE_INFLIGHT = -151,
356 /** Fatal error: see rd_kafka_fatal_error() */
357 RD_KAFKA_RESP_ERR__FATAL = -150,
358 /** Inconsistent state */
359 RD_KAFKA_RESP_ERR__INCONSISTENT = -149,
360 /** Gap-less ordering would not be guaranteed if proceeding */
361 RD_KAFKA_RESP_ERR__GAPLESS_GUARANTEE = -148,
362 /** Maximum poll interval exceeded */
363 RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED = -147,
364
365 /** End internal error codes */
366 RD_KAFKA_RESP_ERR__END = -100,
367
368 /* Kafka broker errors: */
369 /** Unknown broker error */
370 RD_KAFKA_RESP_ERR_UNKNOWN = -1,
371 /** Success */
372 RD_KAFKA_RESP_ERR_NO_ERROR = 0,
373 /** Offset out of range */
374 RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE = 1,
375 /** Invalid message */
376 RD_KAFKA_RESP_ERR_INVALID_MSG = 2,
377 /** Unknown topic or partition */
378 RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART = 3,
379 /** Invalid message size */
380 RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE = 4,
381 /** Leader not available */
382 RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE = 5,
383 /** Not leader for partition */
384 RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION = 6,
385 /** Request timed out */
386 RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT = 7,
387 /** Broker not available */
388 RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE = 8,
389 /** Replica not available */
390 RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE = 9,
391 /** Message size too large */
392 RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE = 10,
393 /** StaleControllerEpochCode */
394 RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH = 11,
395 /** Offset metadata string too large */
396 RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE = 12,
397 /** Broker disconnected before response received */
398 RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION = 13,
399 /** Group coordinator load in progress */
400 RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS = 14,
401 /** Group coordinator not available */
402 RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE = 15,
403 /** Not coordinator for group */
404 RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP = 16,
405 /** Invalid topic */
406 RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION = 17,
407 /** Message batch larger than configured server segment size */
408 RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE = 18,
409 /** Not enough in-sync replicas */
410 RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS = 19,
411 /** Message(s) written to insufficient number of in-sync replicas */
412 RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND = 20,
413 /** Invalid required acks value */
414 RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS = 21,
415 /** Specified group generation id is not valid */
416 RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION = 22,
417 /** Inconsistent group protocol */
418 RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL = 23,
419 /** Invalid group.id */
420 RD_KAFKA_RESP_ERR_INVALID_GROUP_ID = 24,
421 /** Unknown member */
422 RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID = 25,
423 /** Invalid session timeout */
424 RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT = 26,
425 /** Group rebalance in progress */
426 RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS = 27,
427 /** Commit offset data size is not valid */
428 RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE = 28,
429 /** Topic authorization failed */
430 RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED = 29,
431 /** Group authorization failed */
432 RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED = 30,
433 /** Cluster authorization failed */
434 RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED = 31,
435 /** Invalid timestamp */
436 RD_KAFKA_RESP_ERR_INVALID_TIMESTAMP = 32,
437 /** Unsupported SASL mechanism */
438 RD_KAFKA_RESP_ERR_UNSUPPORTED_SASL_MECHANISM = 33,
439 /** Illegal SASL state */
440 RD_KAFKA_RESP_ERR_ILLEGAL_SASL_STATE = 34,
441 /** Unuspported version */
442 RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION = 35,
443 /** Topic already exists */
444 RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS = 36,
445 /** Invalid number of partitions */
446 RD_KAFKA_RESP_ERR_INVALID_PARTITIONS = 37,
447 /** Invalid replication factor */
448 RD_KAFKA_RESP_ERR_INVALID_REPLICATION_FACTOR = 38,
449 /** Invalid replica assignment */
450 RD_KAFKA_RESP_ERR_INVALID_REPLICA_ASSIGNMENT = 39,
451 /** Invalid config */
452 RD_KAFKA_RESP_ERR_INVALID_CONFIG = 40,
453 /** Not controller for cluster */
454 RD_KAFKA_RESP_ERR_NOT_CONTROLLER = 41,
455 /** Invalid request */
456 RD_KAFKA_RESP_ERR_INVALID_REQUEST = 42,
457 /** Message format on broker does not support request */
458 RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT = 43,
459 /** Policy violation */
460 RD_KAFKA_RESP_ERR_POLICY_VIOLATION = 44,
461 /** Broker received an out of order sequence number */
462 RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER = 45,
463 /** Broker received a duplicate sequence number */
464 RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER = 46,
465 /** Producer attempted an operation with an old epoch */
466 RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH = 47,
467 /** Producer attempted a transactional operation in an invalid state */
468 RD_KAFKA_RESP_ERR_INVALID_TXN_STATE = 48,
469 /** Producer attempted to use a producer id which is not
470 * currently assigned to its transactional id */
471 RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING = 49,
472 /** Transaction timeout is larger than the maximum
473 * value allowed by the broker's max.transaction.timeout.ms */
474 RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT = 50,
475 /** Producer attempted to update a transaction while another
476 * concurrent operation on the same transaction was ongoing */
477 RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS = 51,
478 /** Indicates that the transaction coordinator sending a
479 * WriteTxnMarker is no longer the current coordinator for a
480 * given producer */
481 RD_KAFKA_RESP_ERR_TRANSACTION_COORDINATOR_FENCED = 52,
482 /** Transactional Id authorization failed */
483 RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED = 53,
484 /** Security features are disabled */
485 RD_KAFKA_RESP_ERR_SECURITY_DISABLED = 54,
486 /** Operation not attempted */
487 RD_KAFKA_RESP_ERR_OPERATION_NOT_ATTEMPTED = 55,
488 /** Disk error when trying to access log file on the disk */
489 RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR = 56,
490 /** The user-specified log directory is not found in the broker config */
491 RD_KAFKA_RESP_ERR_LOG_DIR_NOT_FOUND = 57,
492 /** SASL Authentication failed */
493 RD_KAFKA_RESP_ERR_SASL_AUTHENTICATION_FAILED = 58,
494 /** Unknown Producer Id */
495 RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID = 59,
496 /** Partition reassignment is in progress */
497 RD_KAFKA_RESP_ERR_REASSIGNMENT_IN_PROGRESS = 60,
498 /** Delegation Token feature is not enabled */
499 RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_AUTH_DISABLED = 61,
500 /** Delegation Token is not found on server */
501 RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_NOT_FOUND = 62,
502 /** Specified Principal is not valid Owner/Renewer */
503 RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_OWNER_MISMATCH = 63,
504 /** Delegation Token requests are not allowed on this connection */
505 RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_REQUEST_NOT_ALLOWED = 64,
506 /** Delegation Token authorization failed */
507 RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_AUTHORIZATION_FAILED = 65,
508 /** Delegation Token is expired */
509 RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_EXPIRED = 66,
510 /** Supplied principalType is not supported */
511 RD_KAFKA_RESP_ERR_INVALID_PRINCIPAL_TYPE = 67,
512 /** The group is not empty */
513 RD_KAFKA_RESP_ERR_NON_EMPTY_GROUP = 68,
514 /** The group id does not exist */
515 RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND = 69,
516 /** The fetch session ID was not found */
517 RD_KAFKA_RESP_ERR_FETCH_SESSION_ID_NOT_FOUND = 70,
518 /** The fetch session epoch is invalid */
519 RD_KAFKA_RESP_ERR_INVALID_FETCH_SESSION_EPOCH = 71,
520 /** No matching listener */
521 RD_KAFKA_RESP_ERR_LISTENER_NOT_FOUND = 72,
522 /** Topic deletion is disabled */
523 RD_KAFKA_RESP_ERR_TOPIC_DELETION_DISABLED = 73,
524 /** Leader epoch is older than broker epoch */
525 RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH = 74,
526 /** Leader epoch is newer than broker epoch */
527 RD_KAFKA_RESP_ERR_UNKNOWN_LEADER_EPOCH = 75,
528 /** Unsupported compression type */
529 RD_KAFKA_RESP_ERR_UNSUPPORTED_COMPRESSION_TYPE = 76,
530 /** Broker epoch has changed */
531 RD_KAFKA_RESP_ERR_STALE_BROKER_EPOCH = 77,
532 /** Leader high watermark is not caught up */
533 RD_KAFKA_RESP_ERR_OFFSET_NOT_AVAILABLE = 78,
534 /** Group member needs a valid member ID */
535 RD_KAFKA_RESP_ERR_MEMBER_ID_REQUIRED = 79,
536 /** Preferred leader was not available */
537 RD_KAFKA_RESP_ERR_PREFERRED_LEADER_NOT_AVAILABLE = 80,
538 /** Consumer group has reached maximum size */
539 RD_KAFKA_RESP_ERR_GROUP_MAX_SIZE_REACHED = 81,
540
541 RD_KAFKA_RESP_ERR_END_ALL,
542} rd_kafka_resp_err_t;
543
544
545/**
546 * @brief Error code value, name and description.
547 * Typically for use with language bindings to automatically expose
548 * the full set of librdkafka error codes.
549 */
550struct rd_kafka_err_desc {
551 rd_kafka_resp_err_t code;/**< Error code */
552 const char *name; /**< Error name, same as code enum sans prefix */
553 const char *desc; /**< Human readable error description. */
554};
555
556
557/**
558 * @brief Returns the full list of error codes.
559 */
560RD_EXPORT
561void rd_kafka_get_err_descs (const struct rd_kafka_err_desc **errdescs,
562 size_t *cntp);
563
564
565
566
567/**
568 * @brief Returns a human readable representation of a kafka error.
569 *
570 * @param err Error code to translate
571 */
572RD_EXPORT
573const char *rd_kafka_err2str (rd_kafka_resp_err_t err);
574
575
576
577/**
578 * @brief Returns the error code name (enum name).
579 *
580 * @param err Error code to translate
581 */
582RD_EXPORT
583const char *rd_kafka_err2name (rd_kafka_resp_err_t err);
584
585
586/**
587 * @brief Returns the last error code generated by a legacy API call
588 * in the current thread.
589 *
590 * The legacy APIs are the ones using errno to propagate error value, namely:
591 * - rd_kafka_topic_new()
592 * - rd_kafka_consume_start()
593 * - rd_kafka_consume_stop()
594 * - rd_kafka_consume()
595 * - rd_kafka_consume_batch()
596 * - rd_kafka_consume_callback()
597 * - rd_kafka_consume_queue()
598 * - rd_kafka_produce()
599 *
600 * The main use for this function is to avoid converting system \p errno
601 * values to rd_kafka_resp_err_t codes for legacy APIs.
602 *
603 * @remark The last error is stored per-thread, if multiple rd_kafka_t handles
604 * are used in the same application thread the developer needs to
605 * make sure rd_kafka_last_error() is called immediately after
606 * a failed API call.
607 *
608 * @remark errno propagation from librdkafka is not safe on Windows
609 * and should not be used, use rd_kafka_last_error() instead.
610 */
611RD_EXPORT
612rd_kafka_resp_err_t rd_kafka_last_error (void);
613
614
615/**
616 * @brief Converts the system errno value \p errnox to a rd_kafka_resp_err_t
617 * error code upon failure from the following functions:
618 * - rd_kafka_topic_new()
619 * - rd_kafka_consume_start()
620 * - rd_kafka_consume_stop()
621 * - rd_kafka_consume()
622 * - rd_kafka_consume_batch()
623 * - rd_kafka_consume_callback()
624 * - rd_kafka_consume_queue()
625 * - rd_kafka_produce()
626 *
627 * @param errnox System errno value to convert
628 *
629 * @returns Appropriate error code for \p errnox
630 *
631 * @remark A better alternative is to call rd_kafka_last_error() immediately
632 * after any of the above functions return -1 or NULL.
633 *
634 * @deprecated Use rd_kafka_last_error() to retrieve the last error code
635 * set by the legacy librdkafka APIs.
636 *
637 * @sa rd_kafka_last_error()
638 */
639RD_EXPORT RD_DEPRECATED
640rd_kafka_resp_err_t rd_kafka_errno2err(int errnox);
641
642
643/**
644 * @brief Returns the thread-local system errno
645 *
646 * On most platforms this is the same as \p errno but in case of different
647 * runtimes between library and application (e.g., Windows static DLLs)
648 * this provides a means for exposing the errno librdkafka uses.
649 *
650 * @remark The value is local to the current calling thread.
651 *
652 * @deprecated Use rd_kafka_last_error() to retrieve the last error code
653 * set by the legacy librdkafka APIs.
654 */
655RD_EXPORT RD_DEPRECATED
656int rd_kafka_errno (void);
657
658
659
660
661/**
662 * @brief Returns the first fatal error set on this client instance,
663 * or RD_KAFKA_RESP_ERR_NO_ERROR if no fatal error has occurred.
664 *
665 * This function is to be used with the Idempotent Producer and \c error_cb
666 * to detect fatal errors.
667 *
668 * Generally all errors raised by \c error_cb are to be considered
669 * informational and temporary, the client will try to recover from all
670 * errors in a graceful fashion (by retrying, etc).
671 *
672 * However, some errors should logically be considered fatal to retain
673 * consistency; in particular a set of errors that may occur when using the
674 * Idempotent Producer and the in-order or exactly-once producer guarantees
675 * can't be satisfied.
676 *
677 * @param errstr A human readable error string (nul-terminated) is written to
678 * this location that must be of at least \p errstr_size bytes.
679 * The \p errstr is only written to if there is a fatal error.
680 *
681 *
682 * @returns RD_KAFKA_RESP_ERR_NO_ERROR if no fatal error has been raised, else
683 * any other error code.
684 */
685RD_EXPORT
686rd_kafka_resp_err_t rd_kafka_fatal_error (rd_kafka_t *rk,
687 char *errstr, size_t errstr_size);
688
689
690/**
691 * @brief Trigger a fatal error for testing purposes.
692 *
693 * Since there is no practical way to trigger real fatal errors in the
694 * idempotent producer, this method allows an application to trigger
695 * fabricated fatal errors in tests to check its error handling code.
696 *
697 * @param err The underlying error code.
698 * @param reason A human readable error reason.
699 * Will be prefixed with "test_fatal_error: " to differentiate
700 * from real fatal errors.
701 *
702 * @returns RD_KAFKA_RESP_ERR_NO_ERROR if a fatal error was triggered, or
703 * RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS if a previous fatal error
704 * has already been triggered.
705 */
706RD_EXPORT rd_kafka_resp_err_t
707rd_kafka_test_fatal_error (rd_kafka_t *rk, rd_kafka_resp_err_t err,
708 const char *reason);
709
710
711/**
712 * @brief Topic+Partition place holder
713 *
714 * Generic place holder for a Topic+Partition and its related information
715 * used for multiple purposes:
716 * - consumer offset (see rd_kafka_commit(), et.al.)
717 * - group rebalancing callback (rd_kafka_conf_set_rebalance_cb())
718 * - offset commit result callback (rd_kafka_conf_set_offset_commit_cb())
719 */
720
721/**
722 * @brief Generic place holder for a specific Topic+Partition.
723 *
724 * @sa rd_kafka_topic_partition_list_new()
725 */
726typedef struct rd_kafka_topic_partition_s {
727 char *topic; /**< Topic name */
728 int32_t partition; /**< Partition */
729 int64_t offset; /**< Offset */
730 void *metadata; /**< Metadata */
731 size_t metadata_size; /**< Metadata size */
732 void *opaque; /**< Application opaque */
733 rd_kafka_resp_err_t err; /**< Error code, depending on use. */
734 void *_private; /**< INTERNAL USE ONLY,
735 * INITIALIZE TO ZERO, DO NOT TOUCH */
736} rd_kafka_topic_partition_t;
737
738
739/**
740 * @brief Destroy a rd_kafka_topic_partition_t.
741 * @remark This must not be called for elements in a topic partition list.
742 */
743RD_EXPORT
744void rd_kafka_topic_partition_destroy (rd_kafka_topic_partition_t *rktpar);
745
746
747/**
748 * @brief A growable list of Topic+Partitions.
749 *
750 */
751typedef struct rd_kafka_topic_partition_list_s {
752 int cnt; /**< Current number of elements */
753 int size; /**< Current allocated size */
754 rd_kafka_topic_partition_t *elems; /**< Element array[] */
755} rd_kafka_topic_partition_list_t;
756
757
758/**
759 * @brief Create a new list/vector Topic+Partition container.
760 *
761 * @param size Initial allocated size used when the expected number of
762 * elements is known or can be estimated.
763 * Avoids reallocation and possibly relocation of the
764 * elems array.
765 *
766 * @returns A newly allocated Topic+Partition list.
767 *
768 * @remark Use rd_kafka_topic_partition_list_destroy() to free all resources
769 * in use by a list and the list itself.
770 * @sa rd_kafka_topic_partition_list_add()
771 */
772RD_EXPORT
773rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_new (int size);
774
775
776/**
777 * @brief Free all resources used by the list and the list itself.
778 */
779RD_EXPORT
780void
781rd_kafka_topic_partition_list_destroy (rd_kafka_topic_partition_list_t *rkparlist);
782
783/**
784 * @brief Add topic+partition to list
785 *
786 * @param rktparlist List to extend
787 * @param topic Topic name (copied)
788 * @param partition Partition id
789 *
 * @returns The object which can be used to fill in additional fields.
791 */
792RD_EXPORT
793rd_kafka_topic_partition_t *
794rd_kafka_topic_partition_list_add (rd_kafka_topic_partition_list_t *rktparlist,
795 const char *topic, int32_t partition);
796
797
798/**
799 * @brief Add range of partitions from \p start to \p stop inclusive.
800 *
801 * @param rktparlist List to extend
802 * @param topic Topic name (copied)
803 * @param start Start partition of range
804 * @param stop Last partition of range (inclusive)
805 */
806RD_EXPORT
807void
808rd_kafka_topic_partition_list_add_range (rd_kafka_topic_partition_list_t
809 *rktparlist,
810 const char *topic,
811 int32_t start, int32_t stop);
812
813
814
815/**
816 * @brief Delete partition from list.
817 *
818 * @param rktparlist List to modify
819 * @param topic Topic name to match
820 * @param partition Partition to match
821 *
822 * @returns 1 if partition was found (and removed), else 0.
823 *
824 * @remark Any held indices to elems[] are unusable after this call returns 1.
825 */
826RD_EXPORT
827int
828rd_kafka_topic_partition_list_del (rd_kafka_topic_partition_list_t *rktparlist,
829 const char *topic, int32_t partition);
830
831
832/**
833 * @brief Delete partition from list by elems[] index.
834 *
835 * @returns 1 if partition was found (and removed), else 0.
836 *
837 * @sa rd_kafka_topic_partition_list_del()
838 */
839RD_EXPORT
840int
841rd_kafka_topic_partition_list_del_by_idx (
842 rd_kafka_topic_partition_list_t *rktparlist,
843 int idx);
844
845
846/**
847 * @brief Make a copy of an existing list.
848 *
849 * @param src The existing list to copy.
850 *
851 * @returns A new list fully populated to be identical to \p src
852 */
853RD_EXPORT
854rd_kafka_topic_partition_list_t *
855rd_kafka_topic_partition_list_copy (const rd_kafka_topic_partition_list_t *src);
856
857
858
859
860/**
861 * @brief Set offset to \p offset for \p topic and \p partition
862 *
863 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or
864 * RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION if \p partition was not found
865 * in the list.
866 */
867RD_EXPORT
868rd_kafka_resp_err_t rd_kafka_topic_partition_list_set_offset (
869 rd_kafka_topic_partition_list_t *rktparlist,
870 const char *topic, int32_t partition, int64_t offset);
871
872
873
874/**
875 * @brief Find element by \p topic and \p partition.
876 *
877 * @returns a pointer to the first matching element, or NULL if not found.
878 */
879RD_EXPORT
880rd_kafka_topic_partition_t *
881rd_kafka_topic_partition_list_find (rd_kafka_topic_partition_list_t *rktparlist,
882 const char *topic, int32_t partition);
883
884
885/**
886 * @brief Sort list using comparator \p cmp.
887 *
888 * If \p cmp is NULL the default comparator will be used that
889 * sorts by ascending topic name and partition.
890 *
891 */
892RD_EXPORT void
893rd_kafka_topic_partition_list_sort (rd_kafka_topic_partition_list_t *rktparlist,
894 int (*cmp) (const void *a, const void *b,
895 void *opaque),
896 void *opaque);
897
898
899/**@}*/
900
901
902
903/**
904 * @name Var-arg tag types
905 * @{
906 *
907 */
908
909/**
910 * @enum rd_kafka_vtype_t
911 *
912 * @brief Var-arg tag types
913 *
914 * @sa rd_kafka_producev()
915 */
916typedef enum rd_kafka_vtype_t {
917 RD_KAFKA_VTYPE_END, /**< va-arg sentinel */
918 RD_KAFKA_VTYPE_TOPIC, /**< (const char *) Topic name */
919 RD_KAFKA_VTYPE_RKT, /**< (rd_kafka_topic_t *) Topic handle */
920 RD_KAFKA_VTYPE_PARTITION, /**< (int32_t) Partition */
921 RD_KAFKA_VTYPE_VALUE, /**< (void *, size_t) Message value (payload)*/
922 RD_KAFKA_VTYPE_KEY, /**< (void *, size_t) Message key */
923 RD_KAFKA_VTYPE_OPAQUE, /**< (void *) Application opaque */
924 RD_KAFKA_VTYPE_MSGFLAGS, /**< (int) RD_KAFKA_MSG_F_.. flags */
925 RD_KAFKA_VTYPE_TIMESTAMP, /**< (int64_t) Milliseconds since epoch UTC */
926 RD_KAFKA_VTYPE_HEADER, /**< (const char *, const void *, ssize_t)
927 * Message Header */
928 RD_KAFKA_VTYPE_HEADERS, /**< (rd_kafka_headers_t *) Headers list */
929} rd_kafka_vtype_t;
930
931
932/**
933 * @brief Convenience macros for rd_kafka_vtype_t that takes the
934 * correct arguments for each vtype.
935 */
936
937/*!
938 * va-arg end sentinel used to terminate the variable argument list
939 */
940#define RD_KAFKA_V_END RD_KAFKA_VTYPE_END
941
942/*!
943 * Topic name (const char *)
944 */
945#define RD_KAFKA_V_TOPIC(topic) \
946 _LRK_TYPECHECK(RD_KAFKA_VTYPE_TOPIC, const char *, topic), \
947 (const char *)topic
948/*!
949 * Topic object (rd_kafka_topic_t *)
950 */
951#define RD_KAFKA_V_RKT(rkt) \
952 _LRK_TYPECHECK(RD_KAFKA_VTYPE_RKT, rd_kafka_topic_t *, rkt), \
953 (rd_kafka_topic_t *)rkt
954/*!
955 * Partition (int32_t)
956 */
957#define RD_KAFKA_V_PARTITION(partition) \
958 _LRK_TYPECHECK(RD_KAFKA_VTYPE_PARTITION, int32_t, partition), \
959 (int32_t)partition
960/*!
961 * Message value/payload pointer and length (void *, size_t)
962 */
963#define RD_KAFKA_V_VALUE(VALUE,LEN) \
964 _LRK_TYPECHECK2(RD_KAFKA_VTYPE_VALUE, void *, VALUE, size_t, LEN), \
965 (void *)VALUE, (size_t)LEN
966/*!
967 * Message key pointer and length (const void *, size_t)
968 */
969#define RD_KAFKA_V_KEY(KEY,LEN) \
970 _LRK_TYPECHECK2(RD_KAFKA_VTYPE_KEY, const void *, KEY, size_t, LEN), \
971 (void *)KEY, (size_t)LEN
972/*!
973 * Message opaque pointer (void *)
974 * Same as \c produce(.., msg_opaque), and \c rkmessage->_private .
975 */
976#define RD_KAFKA_V_OPAQUE(opaque) \
977 _LRK_TYPECHECK(RD_KAFKA_VTYPE_OPAQUE, void *, opaque), \
978 (void *)opaque
979/*!
980 * Message flags (int)
981 * @sa RD_KAFKA_MSG_F_COPY, et.al.
982 */
983#define RD_KAFKA_V_MSGFLAGS(msgflags) \
984 _LRK_TYPECHECK(RD_KAFKA_VTYPE_MSGFLAGS, int, msgflags), \
985 (int)msgflags
986/*!
987 * Timestamp in milliseconds since epoch UTC (int64_t).
988 * A value of 0 will use the current wall-clock time.
989 */
990#define RD_KAFKA_V_TIMESTAMP(timestamp) \
991 _LRK_TYPECHECK(RD_KAFKA_VTYPE_TIMESTAMP, int64_t, timestamp), \
992 (int64_t)timestamp
993/*!
994 * Add Message Header (const char *NAME, const void *VALUE, ssize_t LEN).
995 * @sa rd_kafka_header_add()
996 * @remark RD_KAFKA_V_HEADER() and RD_KAFKA_V_HEADERS() MUST NOT be mixed
997 * in the same call to producev().
998 */
999#define RD_KAFKA_V_HEADER(NAME,VALUE,LEN) \
1000 _LRK_TYPECHECK3(RD_KAFKA_VTYPE_HEADER, const char *, NAME, \
1001 const void *, VALUE, ssize_t, LEN), \
1002 (const char *)NAME, (const void *)VALUE, (ssize_t)LEN
1003
1004/*!
1005 * Message Headers list (rd_kafka_headers_t *).
1006 * The message object will assume ownership of the headers (unless producev()
1007 * fails).
1008 * Any existing headers will be replaced.
1009 * @sa rd_kafka_message_set_headers()
1010 * @remark RD_KAFKA_V_HEADER() and RD_KAFKA_V_HEADERS() MUST NOT be mixed
1011 * in the same call to producev().
1012 */
1013#define RD_KAFKA_V_HEADERS(HDRS) \
1014 _LRK_TYPECHECK(RD_KAFKA_VTYPE_HEADERS, rd_kafka_headers_t *, HDRS), \
1015 (rd_kafka_headers_t *)HDRS
1016
1017
1018/**@}*/
1019
1020
1021/**
1022 * @name Message headers
1023 * @{
1024 *
1025 * @brief Message headers consist of a list of (string key, binary value) pairs.
 *        Duplicate keys are supported and the order in which keys were
 *        added is retained.
1028 *
1029 * Header values are considered binary and may have three types of
1030 * value:
1031 * - proper value with size > 0 and a valid pointer
1032 * - empty value with size = 0 and any non-NULL pointer
1033 * - null value with size = 0 and a NULL pointer
1034 *
1035 * Headers require Apache Kafka broker version v0.11.0.0 or later.
1036 *
1037 * Header operations are O(n).
1038 */
1039
/**
 * Opaque handle to a list of message headers.
 */
typedef struct rd_kafka_headers_s rd_kafka_headers_t;

/**
 * @brief Create a new headers list.
 *
 * @param initial_count Preallocate space for this number of headers.
 *                      Any number of headers may be added, updated and
 *                      removed regardless of the initial count.
 *
 * @returns the new headers list.
 */
RD_EXPORT rd_kafka_headers_t *rd_kafka_headers_new (size_t initial_count);

/**
 * @brief Destroy the headers list. The object and any returned value pointers
 *        are not usable after this call.
 */
RD_EXPORT void rd_kafka_headers_destroy (rd_kafka_headers_t *hdrs);

/**
 * @brief Make a copy of headers list \p src.
 *
 * @returns a new headers list identical to \p src.
 */
RD_EXPORT rd_kafka_headers_t *
rd_kafka_headers_copy (const rd_kafka_headers_t *src);

/**
 * @brief Add header with name \p name and value \p val (copied) of size
 *        \p size (not including null-terminator).
 *
 * @param hdrs Headers list to add to.
 * @param name Header name.
 * @param name_size Header name size (not including the null-terminator).
 *                  If -1 the \p name length is automatically acquired using
 *                  strlen().
 * @param value Pointer to header value, or NULL (set size to 0 or -1).
 * @param value_size Size of header value. If -1 the \p value is assumed to be a
 *                   null-terminated string and the length is automatically
 *                   acquired using strlen().
 *
 * @returns RD_KAFKA_RESP_ERR__READ_ONLY if the headers are read-only,
 *          else RD_KAFKA_RESP_ERR_NO_ERROR.
 */
RD_EXPORT rd_kafka_resp_err_t
rd_kafka_header_add (rd_kafka_headers_t *hdrs,
                     const char *name, ssize_t name_size,
                     const void *value, ssize_t value_size);

/**
 * @brief Remove all headers for the given key (if any).
 *
 * @returns RD_KAFKA_RESP_ERR__READ_ONLY if the headers are read-only,
 *          RD_KAFKA_RESP_ERR__NOENT if no matching headers were found,
 *          else RD_KAFKA_RESP_ERR_NO_ERROR if headers were removed.
 */
RD_EXPORT rd_kafka_resp_err_t
rd_kafka_header_remove (rd_kafka_headers_t *hdrs, const char *name);


/**
 * @brief Find last header in list \p hdrs matching \p name.
 *
 * @param hdrs Headers list to search.
 * @param name Header to find (last match).
 * @param valuep (out) Set to a (null-terminated) const pointer to the value
 *               (may be NULL).
 * @param sizep (out) Set to the value's size (not including null-terminator).
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR if an entry was found, else
 *          RD_KAFKA_RESP_ERR__NOENT.
 *
 * @remark The returned pointer in \p valuep includes a trailing null-terminator
 *         that is not accounted for in \p sizep.
 * @remark The returned pointer is only valid as long as the headers list and
 *         the header item is valid.
 */
RD_EXPORT rd_kafka_resp_err_t
rd_kafka_header_get_last (const rd_kafka_headers_t *hdrs,
                          const char *name, const void **valuep, size_t *sizep);

/**
 * @brief Iterator for headers matching \p name.
 *
 *        Same semantics as rd_kafka_header_get_last()
 *
 * @param hdrs   Headers to iterate.
 * @param idx    Iterator index, start at 0 and increment by one for each call
 *               as long as RD_KAFKA_RESP_ERR_NO_ERROR is returned.
 * @param name   Header name to match.
 * @param valuep (out) Set to a (null-terminated) const pointer to the value
 *               (may be NULL).
 * @param sizep  (out) Set to the value's size (not including null-terminator).
 */
RD_EXPORT rd_kafka_resp_err_t
rd_kafka_header_get (const rd_kafka_headers_t *hdrs, size_t idx,
                     const char *name, const void **valuep, size_t *sizep);


/**
 * @brief Iterator for all headers.
 *
 *        Same semantics as rd_kafka_header_get(), but also returns the
 *        header name at each index through \p namep.
 *
 * @sa rd_kafka_header_get()
 */
RD_EXPORT rd_kafka_resp_err_t
rd_kafka_header_get_all (const rd_kafka_headers_t *hdrs, size_t idx,
                         const char **namep,
                         const void **valuep, size_t *sizep);
1144
1145
1146
1147/**@}*/
1148
1149
1150
1151/**
1152 * @name Kafka messages
1153 * @{
1154 *
1155 */
1156
1157
1158
1159// FIXME: This doesn't show up in docs for some reason
1160// "Compound rd_kafka_message_t is not documented."
1161
1162/**
1163 * @brief A Kafka message as returned by the \c rd_kafka_consume*() family
1164 * of functions as well as provided to the Producer \c dr_msg_cb().
1165 *
1166 * For the consumer this object has two purposes:
1167 * - provide the application with a consumed message. (\c err == 0)
1168 * - report per-topic+partition consumer errors (\c err != 0)
1169 *
1170 * The application must check \c err to decide what action to take.
1171 *
1172 * When the application is finished with a message it must call
1173 * rd_kafka_message_destroy() unless otherwise noted.
1174 */
1175typedef struct rd_kafka_message_s {
1176 rd_kafka_resp_err_t err; /**< Non-zero for error signaling. */
1177 rd_kafka_topic_t *rkt; /**< Topic */
1178 int32_t partition; /**< Partition */
1179 void *payload; /**< Producer: original message payload.
1180 * Consumer: Depends on the value of \c err :
1181 * - \c err==0: Message payload.
1182 * - \c err!=0: Error string */
1183 size_t len; /**< Depends on the value of \c err :
1184 * - \c err==0: Message payload length
1185 * - \c err!=0: Error string length */
1186 void *key; /**< Depends on the value of \c err :
1187 * - \c err==0: Optional message key */
1188 size_t key_len; /**< Depends on the value of \c err :
1189 * - \c err==0: Optional message key length*/
1190 int64_t offset; /**< Consumer:
1191 * - Message offset (or offset for error
1192 * if \c err!=0 if applicable).
1193 * Producer, dr_msg_cb:
1194 * Message offset assigned by broker.
1195 * May be RD_KAFKA_OFFSET_INVALID
1196 * for retried messages when
1197 * idempotence is enabled. */
1198 void *_private; /**< Consume:
1199 * - rdkafka private pointer: DO NOT MODIFY
1200 * - dr_msg_cb:
1201 * msg_opaque from produce() call */
1202} rd_kafka_message_t;
1203
1204
1205/**
1206 * @brief Frees resources for \p rkmessage and hands ownership back to rdkafka.
1207 */
1208RD_EXPORT
1209void rd_kafka_message_destroy(rd_kafka_message_t *rkmessage);
1210
1211
1212
1213
1214/**
1215 * @brief Returns the error string for an errored rd_kafka_message_t or NULL if
1216 * there was no error.
1217 *
1218 * @remark This function MUST NOT be used with the producer.
1219 */
1220static RD_INLINE const char *
1221RD_UNUSED
1222rd_kafka_message_errstr(const rd_kafka_message_t *rkmessage) {
1223 if (!rkmessage->err)
1224 return NULL;
1225
1226 if (rkmessage->payload)
1227 return (const char *)rkmessage->payload;
1228
1229 return rd_kafka_err2str(rkmessage->err);
1230}
1231
1232
1233
1234/**
1235 * @brief Returns the message timestamp for a consumed message.
1236 *
1237 * The timestamp is the number of milliseconds since the epoch (UTC).
1238 *
1239 * \p tstype (if not NULL) is updated to indicate the type of timestamp.
1240 *
1241 * @returns message timestamp, or -1 if not available.
1242 *
1243 * @remark Message timestamps require broker version 0.10.0 or later.
1244 */
1245RD_EXPORT
1246int64_t rd_kafka_message_timestamp (const rd_kafka_message_t *rkmessage,
1247 rd_kafka_timestamp_type_t *tstype);
1248
1249
1250
1251/**
1252 * @brief Returns the latency for a produced message measured from
1253 * the produce() call.
1254 *
1255 * @returns the latency in microseconds, or -1 if not available.
1256 */
1257RD_EXPORT
1258int64_t rd_kafka_message_latency (const rd_kafka_message_t *rkmessage);
1259
1260
1261/**
1262 * @brief Get the message header list.
1263 *
1264 * The returned pointer in \p *hdrsp is associated with the \p rkmessage and
1265 * must not be used after destruction of the message object or the header
1266 * list is replaced with rd_kafka_message_set_headers().
1267 *
1268 * @returns RD_KAFKA_RESP_ERR_NO_ERROR if headers were returned,
1269 * RD_KAFKA_RESP_ERR__NOENT if the message has no headers,
1270 * or another error code if the headers could not be parsed.
1271 *
1272 * @remark Headers require broker version 0.11.0.0 or later.
1273 *
1274 * @remark As an optimization the raw protocol headers are parsed on
1275 * the first call to this function.
1276 */
1277RD_EXPORT rd_kafka_resp_err_t
1278rd_kafka_message_headers (const rd_kafka_message_t *rkmessage,
1279 rd_kafka_headers_t **hdrsp);
1280
1281/**
1282 * @brief Get the message header list and detach the list from the message
1283 * making the application the owner of the headers.
1284 * The application must eventually destroy the headers using
1285 * rd_kafka_headers_destroy().
1286 * The message's headers will be set to NULL.
1287 *
1288 * Otherwise same semantics as rd_kafka_message_headers()
1289 *
1290 * @sa rd_kafka_message_headers
1291 */
1292RD_EXPORT rd_kafka_resp_err_t
1293rd_kafka_message_detach_headers (rd_kafka_message_t *rkmessage,
1294 rd_kafka_headers_t **hdrsp);
1295
1296
1297/**
1298 * @brief Replace the message's current headers with a new list.
1299 *
1300 * @param hdrs New header list. The message object assumes ownership of
1301 * the list, the list will be destroyed automatically with
1302 * the message object.
1303 * The new headers list may be updated until the message object
1304 * is passed or returned to librdkafka.
1305 *
1306 * @remark The existing headers object, if any, will be destroyed.
1307 */
1308RD_EXPORT
1309void rd_kafka_message_set_headers (rd_kafka_message_t *rkmessage,
1310 rd_kafka_headers_t *hdrs);
1311
1312
1313/**
1314 * @brief Returns the number of header key/value pairs
1315 *
1316 * @param hdrs Headers to count
1317 */
1318RD_EXPORT size_t rd_kafka_header_cnt (const rd_kafka_headers_t *hdrs);
1319
1320
1321/**
1322 * @enum rd_kafka_msg_status_t
1323 * @brief Message persistence status can be used by the application to
1324 * find out if a produced message was persisted in the topic log.
1325 */
1326typedef enum {
1327 /**< Message was never transmitted to the broker, or failed with
1328 * an error indicating it was not written to the log.
1329 * Application retry risks ordering, but not duplication. */
1330 RD_KAFKA_MSG_STATUS_NOT_PERSISTED = 0,
1331
1332 /**< Message was transmitted to broker, but no acknowledgement was
1333 * received.
1334 * Application retry risks ordering and duplication. */
1335 RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED = 1,
1336
1337 /**< Message was written to the log and acknowledged by the broker. */
1338 RD_KAFKA_MSG_STATUS_PERSISTED = 2
1339} rd_kafka_msg_status_t;
1340
1341
1342/**
1343 * @brief Returns the message's persistence status in the topic log.
1344 *
1345 * @remark The message status is not available in on_acknowledgement
1346 * interceptors.
1347 */
1348RD_EXPORT rd_kafka_msg_status_t
1349rd_kafka_message_status (const rd_kafka_message_t *rkmessage);
1350
1351/**@}*/
1352
1353
1354/**
1355 * @name Configuration interface
1356 * @{
1357 *
1358 * @brief Main/global configuration property interface
1359 *
1360 */
1361
1362/**
1363 * @enum rd_kafka_conf_res_t
1364 * @brief Configuration result type
1365 */
1366typedef enum {
1367 RD_KAFKA_CONF_UNKNOWN = -2, /**< Unknown configuration name. */
1368 RD_KAFKA_CONF_INVALID = -1, /**< Invalid configuration value. */
1369 RD_KAFKA_CONF_OK = 0 /**< Configuration okay */
1370} rd_kafka_conf_res_t;
1371
1372
1373/**
1374 * @brief Create configuration object.
1375 *
1376 * When providing your own configuration to the \c rd_kafka_*_new_*() calls
1377 * the rd_kafka_conf_t objects needs to be created with this function
1378 * which will set up the defaults.
1379 * I.e.:
1380 * @code
1381 * rd_kafka_conf_t *myconf;
1382 * rd_kafka_conf_res_t res;
1383 *
1384 * myconf = rd_kafka_conf_new();
1385 * res = rd_kafka_conf_set(myconf, "socket.timeout.ms", "600",
1386 * errstr, sizeof(errstr));
1387 * if (res != RD_KAFKA_CONF_OK)
1388 * die("%s\n", errstr);
1389 *
1390 * rk = rd_kafka_new(..., myconf);
1391 * @endcode
1392 *
1393 * Please see CONFIGURATION.md for the default settings or use
1394 * rd_kafka_conf_properties_show() to provide the information at runtime.
1395 *
1396 * The properties are identical to the Apache Kafka configuration properties
1397 * whenever possible.
1398 *
1399 * @returns A new rd_kafka_conf_t object with defaults set.
1400 *
1401 * @sa rd_kafka_conf_set(), rd_kafka_conf_destroy()
1402 */
1403RD_EXPORT
1404rd_kafka_conf_t *rd_kafka_conf_new(void);
1405
1406
1407/**
1408 * @brief Destroys a conf object.
1409 */
1410RD_EXPORT
1411void rd_kafka_conf_destroy(rd_kafka_conf_t *conf);
1412
1413
1414/**
1415 * @brief Creates a copy/duplicate of configuration object \p conf
1416 *
1417 * @remark Interceptors are NOT copied to the new configuration object.
1418 * @sa rd_kafka_interceptor_f_on_conf_dup
1419 */
1420RD_EXPORT
1421rd_kafka_conf_t *rd_kafka_conf_dup(const rd_kafka_conf_t *conf);
1422
1423
1424/**
1425 * @brief Same as rd_kafka_conf_dup() but with an array of property name
1426 * prefixes to filter out (ignore) when copying.
1427 */
1428RD_EXPORT
1429rd_kafka_conf_t *rd_kafka_conf_dup_filter (const rd_kafka_conf_t *conf,
1430 size_t filter_cnt,
1431 const char **filter);
1432
1433
1434
1435/**
1436 * @returns the configuration object used by an rd_kafka_t instance.
1437 * For use with rd_kafka_conf_get(), et.al., to extract configuration
1438 * properties from a running client.
1439 *
1440 * @remark the returned object is read-only and its lifetime is the same
1441 * as the rd_kafka_t object.
1442 */
1443RD_EXPORT
1444const rd_kafka_conf_t *rd_kafka_conf (rd_kafka_t *rk);
1445
1446
1447/**
1448 * @brief Sets a configuration property.
1449 *
1450 * \p conf must have been previously created with rd_kafka_conf_new().
1451 *
1452 * Fallthrough:
1453 * Topic-level configuration properties may be set using this interface
1454 * in which case they are applied on the \c default_topic_conf.
1455 * If no \c default_topic_conf has been set one will be created.
1456 * Any sub-sequent rd_kafka_conf_set_default_topic_conf() calls will
1457 * replace the current default topic configuration.
1458 *
1459 * @returns \c rd_kafka_conf_res_t to indicate success or failure.
1460 * In case of failure \p errstr is updated to contain a human readable
1461 * error string.
1462 */
1463RD_EXPORT
1464rd_kafka_conf_res_t rd_kafka_conf_set(rd_kafka_conf_t *conf,
1465 const char *name,
1466 const char *value,
1467 char *errstr, size_t errstr_size);
1468
1469
1470/**
1471 * @brief Enable event sourcing.
1472 * \p events is a bitmask of \c RD_KAFKA_EVENT_* of events to enable
1473 * for consumption by `rd_kafka_queue_poll()`.
1474 */
1475RD_EXPORT
1476void rd_kafka_conf_set_events(rd_kafka_conf_t *conf, int events);
1477
1478
1479/**
1480 * @brief Generic event callback to be used with the event API to trigger
1481 * callbacks for \c rd_kafka_event_t objects from a background
1482 * thread serving the background queue.
1483 *
1484 * How to use:
1485 * 1. First set the event callback on the configuration object with this
1486 * function, followed by creating an rd_kafka_t instance
1487 * with rd_kafka_new().
1488 * 2. Get the instance's background queue with rd_kafka_queue_get_background()
1489 * and pass it as the reply/response queue to an API that takes an
1490 * event queue, such as rd_kafka_CreateTopics().
1491 * 3. As the response event is ready and enqueued on the background queue the
1492 * event callback will be triggered from the background thread.
1493 * 4. Prior to destroying the client instance, loose your reference to the
1494 * background queue by calling rd_kafka_queue_destroy().
1495 *
1496 * The application must destroy the \c rkev passed to \p event cb using
1497 * rd_kafka_event_destroy().
1498 *
1499 * The \p event_cb \c opaque argument is the opaque set with
1500 * rd_kafka_conf_set_opaque().
1501 *
1502 * @remark This callback is a specialized alternative to the poll-based
1503 * event API described in the Event interface section.
1504 *
1505 * @remark The \p event_cb will be called spontaneously from a background
1506 * thread completely managed by librdkafka.
1507 * Take care to perform proper locking of application objects.
1508 *
1509 * @warning The application MUST NOT call rd_kafka_destroy() from the
1510 * event callback.
1511 *
1512 * @sa rd_kafka_queue_get_background
1513 */
1514RD_EXPORT void
1515rd_kafka_conf_set_background_event_cb (rd_kafka_conf_t *conf,
1516 void (*event_cb) (rd_kafka_t *rk,
1517 rd_kafka_event_t *rkev,
1518 void *opaque));
1519
1520
1521/**
1522 * @deprecated See rd_kafka_conf_set_dr_msg_cb()
1523 */
1524RD_EXPORT
1525void rd_kafka_conf_set_dr_cb(rd_kafka_conf_t *conf,
1526 void (*dr_cb) (rd_kafka_t *rk,
1527 void *payload, size_t len,
1528 rd_kafka_resp_err_t err,
1529 void *opaque, void *msg_opaque));
1530
1531/**
1532 * @brief \b Producer: Set delivery report callback in provided \p conf object.
1533 *
1534 * The delivery report callback will be called once for each message
1535 * accepted by rd_kafka_produce() (et.al) with \p err set to indicate
1536 * the result of the produce request.
1537 *
1538 * The callback is called when a message is succesfully produced or
1539 * if librdkafka encountered a permanent failure.
1540 * Delivery errors occur when the retry count is exceeded, when the
1541 * message.timeout.ms timeout is exceeded or there is a permanent error
1542 * like RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART.
1543 *
1544 * An application must call rd_kafka_poll() at regular intervals to
1545 * serve queued delivery report callbacks.
1546 *
1547 * The broker-assigned offset can be retrieved with \c rkmessage->offset
1548 * and the timestamp can be retrieved using rd_kafka_message_timestamp().
1549 *
1550 * @remark The Idempotent Producer may return invalid timestamp
1551 * (RD_KAFKA_TIMESTAMP_NOT_AVAILABLE), and
1552 * and offset (RD_KAFKA_OFFSET_INVALID) for retried messages
1553 * that were previously successfully delivered but not properly
1554 * acknowledged.
1555 */
1556RD_EXPORT
1557void rd_kafka_conf_set_dr_msg_cb(rd_kafka_conf_t *conf,
1558 void (*dr_msg_cb) (rd_kafka_t *rk,
1559 const rd_kafka_message_t *
1560 rkmessage,
1561 void *opaque));
1562
1563
1564/**
1565 * @brief \b Consumer: Set consume callback for use with rd_kafka_consumer_poll()
1566 *
1567 */
1568RD_EXPORT
1569void rd_kafka_conf_set_consume_cb (rd_kafka_conf_t *conf,
1570 void (*consume_cb) (rd_kafka_message_t *
1571 rkmessage,
1572 void *opaque));
1573
1574/**
1575 * @brief \b Consumer: Set rebalance callback for use with
1576 * coordinated consumer group balancing.
1577 *
1578 * The \p err field is set to either RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS
1579 * or RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS and 'partitions'
1580 * contains the full partition set that was either assigned or revoked.
1581 *
1582 * Registering a \p rebalance_cb turns off librdkafka's automatic
1583 * partition assignment/revocation and instead delegates that responsibility
1584 * to the application's \p rebalance_cb.
1585 *
1586 * The rebalance callback is responsible for updating librdkafka's
1587 * assignment set based on the two events: RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS
1588 * and RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS but should also be able to handle
1589 * arbitrary rebalancing failures where \p err is neither of those.
1590 * @remark In this latter case (arbitrary error), the application must
1591 * call rd_kafka_assign(rk, NULL) to synchronize state.
1592 *
1593 * Without a rebalance callback this is done automatically by librdkafka
1594 * but registering a rebalance callback gives the application flexibility
1595 * in performing other operations along with the assigning/revocation,
1596 * such as fetching offsets from an alternate location (on assign)
1597 * or manually committing offsets (on revoke).
1598 *
1599 * @remark The \p partitions list is destroyed by librdkafka on return
1600 * return from the rebalance_cb and must not be freed or
1601 * saved by the application.
1602 *
1603 * @remark Be careful when modifying the \p partitions list.
1604 * Changing this list should only be done to change the initial
1605 * offsets for each partition.
1606 * But a function like `rd_kafka_position()` might have unexpected
1607 * effects for instance when a consumer gets assigned a partition
1608 * it used to consume at an earlier rebalance. In this case, the
1609 * list of partitions will be updated with the old offset for that
1610 * partition. In this case, it is generally better to pass a copy
1611 * of the list (see `rd_kafka_topic_partition_list_copy()`).
1612 * The result of `rd_kafka_position()` is typically outdated in
1613 * RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS.
1614 *
1615 * The following example shows the application's responsibilities:
1616 * @code
1617 * static void rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
1618 * rd_kafka_topic_partition_list_t *partitions,
1619 * void *opaque) {
1620 *
1621 * switch (err)
1622 * {
1623 * case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
1624 * // application may load offets from arbitrary external
1625 * // storage here and update \p partitions
1626 *
1627 * rd_kafka_assign(rk, partitions);
1628 * break;
1629 *
1630 * case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
1631 * if (manual_commits) // Optional explicit manual commit
1632 * rd_kafka_commit(rk, partitions, 0); // sync commit
1633 *
1634 * rd_kafka_assign(rk, NULL);
1635 * break;
1636 *
1637 * default:
1638 * handle_unlikely_error(err);
1639 * rd_kafka_assign(rk, NULL); // sync state
1640 * break;
1641 * }
1642 * }
1643 * @endcode
1644 */
1645RD_EXPORT
1646void rd_kafka_conf_set_rebalance_cb (
1647 rd_kafka_conf_t *conf,
1648 void (*rebalance_cb) (rd_kafka_t *rk,
1649 rd_kafka_resp_err_t err,
1650 rd_kafka_topic_partition_list_t *partitions,
1651 void *opaque));
1652
1653
1654
1655/**
1656 * @brief \b Consumer: Set offset commit callback for use with consumer groups.
1657 *
1658 * The results of automatic or manual offset commits will be scheduled
1659 * for this callback and is served by rd_kafka_consumer_poll().
1660 *
1661 * If no partitions had valid offsets to commit this callback will be called
1662 * with \p err == RD_KAFKA_RESP_ERR__NO_OFFSET which is not to be considered
1663 * an error.
1664 *
1665 * The \p offsets list contains per-partition information:
1666 * - \c offset: committed offset (attempted)
1667 * - \c err: commit error
1668 */
1669RD_EXPORT
1670void rd_kafka_conf_set_offset_commit_cb (
1671 rd_kafka_conf_t *conf,
1672 void (*offset_commit_cb) (rd_kafka_t *rk,
1673 rd_kafka_resp_err_t err,
1674 rd_kafka_topic_partition_list_t *offsets,
1675 void *opaque));
1676
1677
1678/**
1679 * @brief Set error callback in provided conf object.
1680 *
1681 * The error callback is used by librdkafka to signal warnings and errors
1682 * back to the application.
1683 *
1684 * These errors should generally be considered informational and non-permanent,
1685 * the client will try to recover automatically from all type of errors.
1686 * Given that the client and cluster configuration is correct the
1687 * application should treat these as temporary errors.
1688 *
1689 * \p error_cb will be triggered with \c err set to RD_KAFKA_RESP_ERR__FATAL
1690 * if a fatal error has been raised; in this case use rd_kafka_fatal_error() to
1691 * retrieve the fatal error code and error string, and then begin terminating
1692 * the client instance.
1693 *
1694 * If no \p error_cb is registered, or RD_KAFKA_EVENT_ERROR has not been set
1695 * with rd_kafka_conf_set_events, then the errors will be logged instead.
1696 */
1697RD_EXPORT
1698void rd_kafka_conf_set_error_cb(rd_kafka_conf_t *conf,
1699 void (*error_cb) (rd_kafka_t *rk, int err,
1700 const char *reason,
1701 void *opaque));
1702
1703/**
1704 * @brief Set throttle callback.
1705 *
1706 * The throttle callback is used to forward broker throttle times to the
1707 * application for Produce and Fetch (consume) requests.
1708 *
1709 * Callbacks are triggered whenever a non-zero throttle time is returned by
1710 * the broker, or when the throttle time drops back to zero.
1711 *
1712 * An application must call rd_kafka_poll() or rd_kafka_consumer_poll() at
1713 * regular intervals to serve queued callbacks.
1714 *
1715 * @remark Requires broker version 0.9.0 or later.
1716 */
1717RD_EXPORT
1718void rd_kafka_conf_set_throttle_cb (rd_kafka_conf_t *conf,
1719 void (*throttle_cb) (
1720 rd_kafka_t *rk,
1721 const char *broker_name,
1722 int32_t broker_id,
1723 int throttle_time_ms,
1724 void *opaque));
1725
1726
1727/**
1728 * @brief Set logger callback.
1729 *
1730 * The default is to print to stderr, but a syslog logger is also available,
1731 * see rd_kafka_log_print and rd_kafka_log_syslog for the builtin alternatives.
1732 * Alternatively the application may provide its own logger callback.
1733 * Or pass \p func as NULL to disable logging.
1734 *
1735 * This is the configuration alternative to the deprecated rd_kafka_set_logger()
1736 *
1737 * @remark The log_cb will be called spontaneously from librdkafka's internal
1738 * threads unless logs have been forwarded to a poll queue through
1739 * \c rd_kafka_set_log_queue().
1740 * An application MUST NOT call any librdkafka APIs or do any prolonged
1741 * work in a non-forwarded \c log_cb.
1742 */
1743RD_EXPORT
1744void rd_kafka_conf_set_log_cb(rd_kafka_conf_t *conf,
1745 void (*log_cb) (const rd_kafka_t *rk, int level,
1746 const char *fac, const char *buf));
1747
1748
1749/**
1750 * @brief Set statistics callback in provided conf object.
1751 *
1752 * The statistics callback is triggered from rd_kafka_poll() every
1753 * \c statistics.interval.ms (needs to be configured separately).
1754 * Function arguments:
1755 * - \p rk - Kafka handle
1756 * - \p json - String containing the statistics data in JSON format
1757 * - \p json_len - Length of \p json string.
1758 * - \p opaque - Application-provided opaque.
1759 *
1760 * For more information on the format of \p json, see
1761 * https://github.com/edenhill/librdkafka/wiki/Statistics
1762 *
1763 * If the application wishes to hold on to the \p json pointer and free
1764 * it at a later time it must return 1 from the \p stats_cb.
1765 * If the application returns 0 from the \p stats_cb then librdkafka
1766 * will immediately free the \p json pointer.
1767 *
1768 * See STATISTICS.md for a full definition of the JSON object.
1769 */
1770RD_EXPORT
1771void rd_kafka_conf_set_stats_cb(rd_kafka_conf_t *conf,
1772 int (*stats_cb) (rd_kafka_t *rk,
1773 char *json,
1774 size_t json_len,
1775 void *opaque));
1776
1777/**
1778 * @brief Set SASL/OAUTHBEARER token refresh callback in provided conf object.
1779 *
1780 * @param conf the configuration to mutate.
1781 * @param oauthbearer_token_refresh_cb the callback to set; callback function
1782 * arguments:<br>
1783 * \p rk - Kafka handle<br>
1784 * \p oauthbearer_config - Value of configuration property
1785 * sasl.oauthbearer.config.
1786 * \p opaque - Application-provided opaque set via
1787 * rd_kafka_conf_set_opaque()
1788 *
1789 * The SASL/OAUTHBEARER token refresh callback is triggered via rd_kafka_poll()
1790 * whenever OAUTHBEARER is the SASL mechanism and a token needs to be retrieved,
1791 * typically based on the configuration defined in \c sasl.oauthbearer.config.
1792 *
1793 * The callback should invoke rd_kafka_oauthbearer_set_token()
1794 * or rd_kafka_oauthbearer_set_token_failure() to indicate success
1795 * or failure, respectively.
1796 *
1797 * The refresh operation is eventable and may be received via
1798 * rd_kafka_queue_poll() with an event type of
1799 * \c RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH.
1800 *
1801 * Note that before any SASL/OAUTHBEARER broker connection can succeed the
1802 * application must call rd_kafka_oauthbearer_set_token() once -- either
1803 * directly or, more typically, by invoking either rd_kafka_poll() or
1804 * rd_kafka_queue_poll() -- in order to cause retrieval of an initial token to
1805 * occur.
1806 *
1807 * An unsecured JWT refresh handler is provided by librdkafka for development
1808 * and testing purposes, it is enabled by setting
1809 * the \c enable.sasl.oauthbearer.unsecure.jwt property to true and is
1810 * mutually exclusive to using a refresh callback.
1811 */
1812RD_EXPORT
1813void rd_kafka_conf_set_oauthbearer_token_refresh_cb (
1814 rd_kafka_conf_t *conf,
1815 void (*oauthbearer_token_refresh_cb) (rd_kafka_t *rk,
1816 const char *oauthbearer_config,
1817 void *opaque));
1818
1819/**
1820 * @brief Set socket callback.
1821 *
1822 * The socket callback is responsible for opening a socket
1823 * according to the supplied \p domain, \p type and \p protocol.
1824 * The socket shall be created with \c CLOEXEC set in a racefree fashion, if
1825 * possible.
1826 *
1827 * Default:
1828 * - on linux: racefree CLOEXEC
1829 * - others : non-racefree CLOEXEC
1830 *
1831 * @remark The callback will be called from an internal librdkafka thread.
1832 */
1833RD_EXPORT
1834void rd_kafka_conf_set_socket_cb(rd_kafka_conf_t *conf,
1835 int (*socket_cb) (int domain, int type,
1836 int protocol,
1837 void *opaque));
1838
1839
1840
1841/**
1842 * @brief Set connect callback.
1843 *
1844 * The connect callback is responsible for connecting socket \p sockfd
1845 * to peer address \p addr.
1846 * The \p id field contains the broker identifier.
1847 *
1848 * \p connect_cb shall return 0 on success (socket connected) or an error
1849 * number (errno) on error.
1850 *
1851 * @remark The callback will be called from an internal librdkafka thread.
1852 */
1853RD_EXPORT void
1854rd_kafka_conf_set_connect_cb (rd_kafka_conf_t *conf,
1855 int (*connect_cb) (int sockfd,
1856 const struct sockaddr *addr,
1857 int addrlen,
1858 const char *id,
1859 void *opaque));
1860
1861/**
1862 * @brief Set close socket callback.
1863 *
1864 * Close a socket (optionally opened with socket_cb()).
1865 *
1866 * @remark The callback will be called from an internal librdkafka thread.
1867 */
1868RD_EXPORT void
1869rd_kafka_conf_set_closesocket_cb (rd_kafka_conf_t *conf,
1870 int (*closesocket_cb) (int sockfd,
1871 void *opaque));
1872
1873
1874
1875#ifndef _MSC_VER
1876/**
1877 * @brief Set open callback.
1878 *
1879 * The open callback is responsible for opening the file specified by
1880 * pathname, flags and mode.
1881 * The file shall be opened with \c CLOEXEC set in a racefree fashion, if
1882 * possible.
1883 *
1884 * Default:
1885 * - on linux: racefree CLOEXEC
1886 * - others : non-racefree CLOEXEC
1887 *
1888 * @remark The callback will be called from an internal librdkafka thread.
1889 */
1890RD_EXPORT
1891void rd_kafka_conf_set_open_cb (rd_kafka_conf_t *conf,
1892 int (*open_cb) (const char *pathname,
1893 int flags, mode_t mode,
1894 void *opaque));
1895#endif
1896
1897
1898/**
1899 * @brief Sets the verification callback of the broker certificate
1900 *
1901 * The verification callback is triggered from internal librdkafka threads
1902 * upon connecting to a broker. On each connection attempt the callback
1903 * will be called for each certificate in the broker's certificate chain,
 * starting at the root certificate, as long as the application callback
1905 * returns 1 (valid certificate).
1906 * \c broker_name and \c broker_id correspond to the broker the connection
1907 * is being made to.
1908 * The \c x509_error argument indicates if OpenSSL's verification of
 * the certificate succeeded (0) or failed (an OpenSSL error code).
1910 * The application may set the SSL context error code by returning 0
1911 * from the verify callback and providing a non-zero SSL context error code
1912 * in \p x509_error.
 * If the verify callback sets \p x509_error to 0, returns 1, and the
1914 * original \p x509_error was non-zero, the error on the SSL context will
1915 * be cleared.
1916 * \p x509_error is always a valid pointer to an int.
1917 *
1918 * \c depth is the depth of the current certificate in the chain, starting
1919 * at the root certificate.
1920 *
1921 * The certificate itself is passed in binary DER format in \c buf of
1922 * size \c size.
1923 *
1924 * The callback must return 1 if verification succeeds, or
1925 * 0 if verification fails and then write a human-readable error message
1926 * to \c errstr (limited to \c errstr_size bytes, including nul-term).
1927 *
1928 * @returns RD_KAFKA_CONF_OK if SSL is supported in this build, else
1929 * RD_KAFKA_CONF_INVALID.
1930 *
1931 * @warning This callback will be called from internal librdkafka threads.
1932 *
1933 * @remark See <openssl/x509_vfy.h> in the OpenSSL source distribution
1934 * for a list of \p x509_error codes.
1935 */
1936RD_EXPORT
1937rd_kafka_conf_res_t rd_kafka_conf_set_ssl_cert_verify_cb (
1938 rd_kafka_conf_t *conf,
1939 int (*ssl_cert_verify_cb) (rd_kafka_t *rk,
1940 const char *broker_name,
1941 int32_t broker_id,
1942 int *x509_error,
1943 int depth,
1944 const char *buf, size_t size,
1945 char *errstr, size_t errstr_size,
1946 void *opaque));
1947
1948
/**
 * @enum rd_kafka_cert_type_t
 *
 * @brief SSL certificate type
 *
 * @sa rd_kafka_conf_set_ssl_cert
 */
typedef enum rd_kafka_cert_type_t {
        RD_KAFKA_CERT_PUBLIC_KEY,   /**< Client's public key */
        RD_KAFKA_CERT_PRIVATE_KEY,  /**< Client's private key */
        RD_KAFKA_CERT_CA,           /**< CA certificate */
        RD_KAFKA_CERT__CNT,         /**< Number of certificate types:
                                     *   internal sentinel, not a valid type. */
} rd_kafka_cert_type_t;
1962
/**
 * @enum rd_kafka_cert_enc_t
 *
 * @brief SSL certificate encoding
 *
 * @sa rd_kafka_conf_set_ssl_cert
 */
typedef enum rd_kafka_cert_enc_t {
        RD_KAFKA_CERT_ENC_PKCS12,  /**< PKCS#12 */
        RD_KAFKA_CERT_ENC_DER,     /**< DER / binary X.509 ASN1 */
        RD_KAFKA_CERT_ENC_PEM,     /**< PEM */
        RD_KAFKA_CERT_ENC__CNT,    /**< Number of encoding types:
                                    *   internal sentinel, not a valid encoding. */
} rd_kafka_cert_enc_t;
1976
1977
1978/**
1979 * @brief Set certificate/key \p cert_type from the \p cert_enc encoded
1980 * memory at \p buffer of \p size bytes.
1981 *
1982 * @param conf Configuration object.
1983 * @param cert_type Certificate or key type to configure.
1984 * @param cert_enc Buffer \p encoding type.
1985 * @param buffer Memory pointer to encoded certificate or key.
1986 * The memory is not referenced after this function returns.
1987 * @param size Size of memory at \p buffer.
 * @param errstr Memory where a human-readable error string will be written
1989 * on failure.
1990 * @param errstr_size Size of \p errstr, including space for nul-terminator.
1991 *
1992 * @returns RD_KAFKA_CONF_OK on success or RD_KAFKA_CONF_INVALID if the
1993 * memory in \p buffer is of incorrect encoding, or if librdkafka
1994 * was not built with SSL support.
1995 *
1996 * @remark Calling this method multiple times with the same \p cert_type
1997 * will replace the previous value.
1998 *
1999 * @remark Calling this method with \p buffer set to NULL will clear the
2000 * configuration for \p cert_type.
2001 *
2002 * @remark The private key may require a password, which must be specified
2003 * with the `ssl.key.password` configuration property prior to
2004 * calling this function.
2005 *
2006 * @remark Private and public keys in PEM format may also be set with the
2007 * `ssl.key.pem` and `ssl.certificate.pem` configuration properties.
2008 */
2009RD_EXPORT rd_kafka_conf_res_t
2010rd_kafka_conf_set_ssl_cert (rd_kafka_conf_t *conf,
2011 rd_kafka_cert_type_t cert_type,
2012 rd_kafka_cert_enc_t cert_enc,
2013 const void *buffer, size_t size,
2014 char *errstr, size_t errstr_size);
2015
2016
2017/**
2018 * @brief Sets the application's opaque pointer that will be passed to callbacks
2019 */
2020RD_EXPORT
2021void rd_kafka_conf_set_opaque(rd_kafka_conf_t *conf, void *opaque);
2022
2023/**
2024 * @brief Retrieves the opaque pointer previously set with rd_kafka_conf_set_opaque()
2025 */
2026RD_EXPORT
2027void *rd_kafka_opaque(const rd_kafka_t *rk);
2028
2029
2030
2031/**
2032 * @brief Sets the default topic configuration to use for automatically
2033 * subscribed topics (e.g., through pattern-matched topics).
2034 * The topic config object is not usable after this call.
2035 *
2036 * @warning Any topic configuration settings that have been set on the
2037 * global rd_kafka_conf_t object will be overwritten by this call
2038 * since the implicitly created default topic config object is
2039 * replaced by the user-supplied one.
2040 *
2041 * @deprecated Set default topic level configuration on the
2042 * global rd_kafka_conf_t object instead.
2043 */
2044RD_EXPORT
2045void rd_kafka_conf_set_default_topic_conf (rd_kafka_conf_t *conf,
2046 rd_kafka_topic_conf_t *tconf);
2047
2048
2049
2050/**
2051 * @brief Retrieve configuration value for property \p name.
2052 *
2053 * If \p dest is non-NULL the value will be written to \p dest with at
2054 * most \p dest_size.
2055 *
2056 * \p *dest_size is updated to the full length of the value, thus if
2057 * \p *dest_size initially is smaller than the full length the application
2058 * may reallocate \p dest to fit the returned \p *dest_size and try again.
2059 *
2060 * If \p dest is NULL only the full length of the value is returned.
2061 *
2062 * Fallthrough:
2063 * Topic-level configuration properties from the \c default_topic_conf
2064 * may be retrieved using this interface.
2065 *
2066 * @returns \p RD_KAFKA_CONF_OK if the property name matched, else
2067 * \p RD_KAFKA_CONF_UNKNOWN.
2068 */
2069RD_EXPORT
2070rd_kafka_conf_res_t rd_kafka_conf_get (const rd_kafka_conf_t *conf,
2071 const char *name,
2072 char *dest, size_t *dest_size);
2073
2074
2075/**
2076 * @brief Retrieve topic configuration value for property \p name.
2077 *
2078 * @sa rd_kafka_conf_get()
2079 */
2080RD_EXPORT
2081rd_kafka_conf_res_t rd_kafka_topic_conf_get (const rd_kafka_topic_conf_t *conf,
2082 const char *name,
2083 char *dest, size_t *dest_size);
2084
2085
2086/**
2087 * @brief Dump the configuration properties and values of \p conf to an array
2088 * with \"key\", \"value\" pairs.
2089 *
2090 * The number of entries in the array is returned in \p *cntp.
2091 *
2092 * The dump must be freed with `rd_kafka_conf_dump_free()`.
2093 */
2094RD_EXPORT
2095const char **rd_kafka_conf_dump(rd_kafka_conf_t *conf, size_t *cntp);
2096
2097
2098/**
2099 * @brief Dump the topic configuration properties and values of \p conf
2100 * to an array with \"key\", \"value\" pairs.
2101 *
2102 * The number of entries in the array is returned in \p *cntp.
2103 *
2104 * The dump must be freed with `rd_kafka_conf_dump_free()`.
2105 */
2106RD_EXPORT
2107const char **rd_kafka_topic_conf_dump(rd_kafka_topic_conf_t *conf,
2108 size_t *cntp);
2109
2110/**
2111 * @brief Frees a configuration dump returned from `rd_kafka_conf_dump()` or
 *        `rd_kafka_topic_conf_dump()`.
2113 */
2114RD_EXPORT
2115void rd_kafka_conf_dump_free(const char **arr, size_t cnt);
2116
2117/**
2118 * @brief Prints a table to \p fp of all supported configuration properties,
2119 * their default values as well as a description.
2120 */
2121RD_EXPORT
2122void rd_kafka_conf_properties_show(FILE *fp);
2123
2124/**@}*/
2125
2126
2127/**
2128 * @name Topic configuration
2129 * @{
2130 *
2131 * @brief Topic configuration property interface
2132 *
2133 */
2134
2135
2136/**
2137 * @brief Create topic configuration object
2138 *
2139 * @sa Same semantics as for rd_kafka_conf_new().
2140 */
2141RD_EXPORT
2142rd_kafka_topic_conf_t *rd_kafka_topic_conf_new(void);
2143
2144
2145/**
2146 * @brief Creates a copy/duplicate of topic configuration object \p conf.
2147 */
2148RD_EXPORT
2149rd_kafka_topic_conf_t *rd_kafka_topic_conf_dup(const rd_kafka_topic_conf_t
2150 *conf);
2151
2152/**
2153 * @brief Creates a copy/duplicate of \p rk 's default topic configuration
2154 * object.
2155 */
2156RD_EXPORT
2157rd_kafka_topic_conf_t *rd_kafka_default_topic_conf_dup (rd_kafka_t *rk);
2158
2159
2160/**
2161 * @brief Destroys a topic conf object.
2162 */
2163RD_EXPORT
2164void rd_kafka_topic_conf_destroy(rd_kafka_topic_conf_t *topic_conf);
2165
2166
2167/**
2168 * @brief Sets a single rd_kafka_topic_conf_t value by property name.
2169 *
2170 * \p topic_conf should have been previously set up
2171 * with `rd_kafka_topic_conf_new()`.
2172 *
2173 * @returns rd_kafka_conf_res_t to indicate success or failure.
2174 */
2175RD_EXPORT
2176rd_kafka_conf_res_t rd_kafka_topic_conf_set(rd_kafka_topic_conf_t *conf,
2177 const char *name,
2178 const char *value,
2179 char *errstr, size_t errstr_size);
2180
2181/**
2182 * @brief Sets the application's opaque pointer that will be passed to all topic
2183 * callbacks as the \c rkt_opaque argument.
2184 */
2185RD_EXPORT
2186void rd_kafka_topic_conf_set_opaque(rd_kafka_topic_conf_t *conf, void *opaque);
2187
2188
2189/**
2190 * @brief \b Producer: Set partitioner callback in provided topic conf object.
2191 *
2192 * The partitioner may be called in any thread at any time,
2193 * it may be called multiple times for the same message/key.
2194 *
2195 * Partitioner function constraints:
2196 * - MUST NOT call any rd_kafka_*() functions except:
2197 * rd_kafka_topic_partition_available()
2198 * - MUST NOT block or execute for prolonged periods of time.
2199 * - MUST return a value between 0 and partition_cnt-1, or the
2200 * special \c RD_KAFKA_PARTITION_UA value if partitioning
2201 * could not be performed.
2202 */
2203RD_EXPORT
2204void
2205rd_kafka_topic_conf_set_partitioner_cb (rd_kafka_topic_conf_t *topic_conf,
2206 int32_t (*partitioner) (
2207 const rd_kafka_topic_t *rkt,
2208 const void *keydata,
2209 size_t keylen,
2210 int32_t partition_cnt,
2211 void *rkt_opaque,
2212 void *msg_opaque));
2213
2214
2215/**
2216 * @brief \b Producer: Set message queueing order comparator callback.
2217 *
2218 * The callback may be called in any thread at any time,
2219 * it may be called multiple times for the same message.
2220 *
2221 * Ordering comparator function constraints:
2222 * - MUST be stable sort (same input gives same output).
2223 * - MUST NOT call any rd_kafka_*() functions.
2224 * - MUST NOT block or execute for prolonged periods of time.
2225 *
2226 * The comparator shall compare the two messages and return:
2227 * - < 0 if message \p a should be inserted before message \p b.
2228 * - >=0 if message \p a should be inserted after message \p b.
2229 *
2230 * @remark Insert sorting will be used to enqueue the message in the
2231 * correct queue position, this comes at a cost of O(n).
2232 *
2233 * @remark If `queuing.strategy=fifo` new messages are enqueued to the
2234 * tail of the queue regardless of msg_order_cmp, but retried messages
2235 * are still affected by msg_order_cmp.
2236 *
2237 * @warning THIS IS AN EXPERIMENTAL API, SUBJECT TO CHANGE OR REMOVAL,
2238 * DO NOT USE IN PRODUCTION.
2239 */
2240RD_EXPORT void
2241rd_kafka_topic_conf_set_msg_order_cmp (rd_kafka_topic_conf_t *topic_conf,
2242 int (*msg_order_cmp) (
2243 const rd_kafka_message_t *a,
2244 const rd_kafka_message_t *b));
2245
2246
2247/**
2248 * @brief Check if partition is available (has a leader broker).
2249 *
2250 * @returns 1 if the partition is available, else 0.
2251 *
2252 * @warning This function must only be called from inside a partitioner function
2253 */
2254RD_EXPORT
2255int rd_kafka_topic_partition_available(const rd_kafka_topic_t *rkt,
2256 int32_t partition);
2257
2258
2259/*******************************************************************
2260 * *
2261 * Partitioners provided by rdkafka *
2262 * *
2263 *******************************************************************/
2264
2265/**
2266 * @brief Random partitioner.
2267 *
2268 * Will try not to return unavailable partitions.
2269 *
2270 * @returns a random partition between 0 and \p partition_cnt - 1.
2271 *
2272 */
2273RD_EXPORT
2274int32_t rd_kafka_msg_partitioner_random(const rd_kafka_topic_t *rkt,
2275 const void *key, size_t keylen,
2276 int32_t partition_cnt,
2277 void *opaque, void *msg_opaque);
2278
2279/**
2280 * @brief Consistent partitioner.
2281 *
2282 * Uses consistent hashing to map identical keys onto identical partitions.
2283 *
2284 * @returns a \"random\" partition between 0 and \p partition_cnt - 1 based on
2285 * the CRC value of the key
2286 */
2287RD_EXPORT
2288int32_t rd_kafka_msg_partitioner_consistent (const rd_kafka_topic_t *rkt,
2289 const void *key, size_t keylen,
2290 int32_t partition_cnt,
2291 void *opaque, void *msg_opaque);
2292
2293/**
2294 * @brief Consistent-Random partitioner.
2295 *
2296 * This is the default partitioner.
2297 * Uses consistent hashing to map identical keys onto identical partitions, and
2298 * messages without keys will be assigned via the random partitioner.
2299 *
2300 * @returns a \"random\" partition between 0 and \p partition_cnt - 1 based on
2301 * the CRC value of the key (if provided)
2302 */
2303RD_EXPORT
2304int32_t rd_kafka_msg_partitioner_consistent_random (const rd_kafka_topic_t *rkt,
2305 const void *key, size_t keylen,
2306 int32_t partition_cnt,
2307 void *opaque, void *msg_opaque);
2308
2309
2310/**
2311 * @brief Murmur2 partitioner (Java compatible).
2312 *
2313 * Uses consistent hashing to map identical keys onto identical partitions
2314 * using Java-compatible Murmur2 hashing.
2315 *
2316 * @returns a partition between 0 and \p partition_cnt - 1.
2317 */
2318RD_EXPORT
2319int32_t rd_kafka_msg_partitioner_murmur2 (const rd_kafka_topic_t *rkt,
2320 const void *key, size_t keylen,
2321 int32_t partition_cnt,
2322 void *rkt_opaque,
2323 void *msg_opaque);
2324
2325/**
2326 * @brief Consistent-Random Murmur2 partitioner (Java compatible).
2327 *
2328 * Uses consistent hashing to map identical keys onto identical partitions
2329 * using Java-compatible Murmur2 hashing.
2330 * Messages without keys will be assigned via the random partitioner.
2331 *
2332 * @returns a partition between 0 and \p partition_cnt - 1.
2333 */
2334RD_EXPORT
2335int32_t rd_kafka_msg_partitioner_murmur2_random (const rd_kafka_topic_t *rkt,
2336 const void *key, size_t keylen,
2337 int32_t partition_cnt,
2338 void *rkt_opaque,
2339 void *msg_opaque);
2340
2341
2342/**@}*/
2343
2344
2345
2346/**
2347 * @name Main Kafka and Topic object handles
2348 * @{
2349 *
2350 *
2351 */
2352
2353
2354
2355
2356/**
2357 * @brief Creates a new Kafka handle and starts its operation according to the
2358 * specified \p type (\p RD_KAFKA_CONSUMER or \p RD_KAFKA_PRODUCER).
2359 *
2360 * \p conf is an optional struct created with `rd_kafka_conf_new()` that will
2361 * be used instead of the default configuration.
2362 * The \p conf object is freed by this function on success and must not be used
 * or destroyed by the application subsequently.
2364 * See `rd_kafka_conf_set()` et.al for more information.
2365 *
2366 * \p errstr must be a pointer to memory of at least size \p errstr_size where
2367 * `rd_kafka_new()` may write a human readable error message in case the
2368 * creation of a new handle fails. In which case the function returns NULL.
2369 *
2370 * @remark \b RD_KAFKA_CONSUMER: When a new \p RD_KAFKA_CONSUMER
2371 * rd_kafka_t handle is created it may either operate in the
2372 * legacy simple consumer mode using the rd_kafka_consume_start()
2373 * interface, or the High-level KafkaConsumer API.
2374 * @remark An application must only use one of these groups of APIs on a given
2375 * rd_kafka_t RD_KAFKA_CONSUMER handle.
 *
2378 * @returns The Kafka handle on success or NULL on error (see \p errstr)
2379 *
2380 * @sa To destroy the Kafka handle, use rd_kafka_destroy().
2381 */
2382RD_EXPORT
2383rd_kafka_t *rd_kafka_new(rd_kafka_type_t type, rd_kafka_conf_t *conf,
2384 char *errstr, size_t errstr_size);
2385
2386
2387/**
2388 * @brief Destroy Kafka handle.
2389 *
2390 * @remark This is a blocking operation.
2391 * @remark rd_kafka_consumer_close() will be called from this function
2392 * if the instance type is RD_KAFKA_CONSUMER, a \c group.id was
2393 * configured, and the rd_kafka_consumer_close() was not
2394 * explicitly called by the application. This in turn may
2395 * trigger consumer callbacks, such as rebalance_cb.
2396 * Use rd_kafka_destroy_flags() with
2397 * RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE to avoid this behaviour.
2398 *
2399 * @sa rd_kafka_destroy_flags()
2400 */
2401RD_EXPORT
2402void rd_kafka_destroy(rd_kafka_t *rk);
2403
2404
2405/**
2406 * @brief Destroy Kafka handle according to specified destroy flags
2407 *
2408 */
2409RD_EXPORT
2410void rd_kafka_destroy_flags (rd_kafka_t *rk, int flags);
2411
/**
 * @brief Flags for rd_kafka_destroy_flags()
 */

/*!
 * Don't call consumer_close() to leave group and commit final offsets.
 *
 * This also prevents consumer callbacks, such as rebalance_cb, from being
 * invoked from rd_kafka_destroy*().
 *
 * The consumer group handler is still closed internally, but from an
 * application perspective none of the functionality from consumer_close()
 * is performed.
 */
#define RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE 0x8
2427
2428
2429
2430/**
2431 * @brief Returns Kafka handle name.
2432 */
2433RD_EXPORT
2434const char *rd_kafka_name(const rd_kafka_t *rk);
2435
2436
2437/**
2438 * @brief Returns Kafka handle type.
2439 */
2440RD_EXPORT
2441rd_kafka_type_t rd_kafka_type(const rd_kafka_t *rk);
2442
2443
2444/**
2445 * @brief Returns this client's broker-assigned group member id
2446 *
2447 * @remark This currently requires the high-level KafkaConsumer
2448 *
2449 * @returns An allocated string containing the current broker-assigned group
2450 * member id, or NULL if not available.
2451 * The application must free the string with \p free() or
2452 * rd_kafka_mem_free()
2453 */
2454RD_EXPORT
2455char *rd_kafka_memberid (const rd_kafka_t *rk);
2456
2457
2458
2459/**
2460 * @brief Returns the ClusterId as reported in broker metadata.
2461 *
2462 * @param timeout_ms If there is no cached value from metadata retrieval
2463 * then this specifies the maximum amount of time
2464 * (in milliseconds) the call will block waiting
2465 * for metadata to be retrieved.
2466 * Use 0 for non-blocking calls.
 *
2468 * @remark Requires broker version >=0.10.0 and api.version.request=true.
2469 *
2470 * @remark The application must free the returned pointer
2471 * using rd_kafka_mem_free().
2472 *
2473 * @returns a newly allocated string containing the ClusterId, or NULL
2474 * if no ClusterId could be retrieved in the allotted timespan.
2475 */
2476RD_EXPORT
2477char *rd_kafka_clusterid (rd_kafka_t *rk, int timeout_ms);
2478
2479
2480/**
2481 * @brief Returns the current ControllerId as reported in broker metadata.
2482 *
2483 * @param timeout_ms If there is no cached value from metadata retrieval
2484 * then this specifies the maximum amount of time
2485 * (in milliseconds) the call will block waiting
2486 * for metadata to be retrieved.
2487 * Use 0 for non-blocking calls.
 *
2489 * @remark Requires broker version >=0.10.0 and api.version.request=true.
2490 *
2491 * @returns the controller broker id (>= 0), or -1 if no ControllerId could be
2492 * retrieved in the allotted timespan.
2493 */
2494RD_EXPORT
2495int32_t rd_kafka_controllerid (rd_kafka_t *rk, int timeout_ms);
2496
2497
2498/**
2499 * @brief Creates a new topic handle for topic named \p topic.
2500 *
2501 * \p conf is an optional configuration for the topic created with
2502 * `rd_kafka_topic_conf_new()` that will be used instead of the default
2503 * topic configuration.
2504 * The \p conf object is freed by this function and must not be used or
 * destroyed by the application subsequently.
2506 * See `rd_kafka_topic_conf_set()` et.al for more information.
2507 *
2508 * Topic handles are refcounted internally and calling rd_kafka_topic_new()
2509 * again with the same topic name will return the previous topic handle
2510 * without updating the original handle's configuration.
2511 * Applications must eventually call rd_kafka_topic_destroy() for each
 * successful call to rd_kafka_topic_new() to clear up resources.
2513 *
2514 * @returns the new topic handle or NULL on error (use rd_kafka_errno2err()
 *          to convert system \p errno to an rd_kafka_resp_err_t error code).
2516 *
2517 * @sa rd_kafka_topic_destroy()
2518 */
2519RD_EXPORT
2520rd_kafka_topic_t *rd_kafka_topic_new(rd_kafka_t *rk, const char *topic,
2521 rd_kafka_topic_conf_t *conf);
2522
2523
2524
2525/**
 * @brief Lose the application's topic handle refcount as previously created
2527 * with `rd_kafka_topic_new()`.
2528 *
2529 * @remark Since topic objects are refcounted (both internally and for the app)
2530 * the topic object might not actually be destroyed by this call,
2531 * but the application must consider the object destroyed.
2532 */
2533RD_EXPORT
2534void rd_kafka_topic_destroy(rd_kafka_topic_t *rkt);
2535
2536
2537/**
2538 * @brief Returns the topic name.
2539 */
2540RD_EXPORT
2541const char *rd_kafka_topic_name(const rd_kafka_topic_t *rkt);
2542
2543
2544/**
2545 * @brief Get the \p rkt_opaque pointer that was set in the topic configuration.
2546 */
2547RD_EXPORT
2548void *rd_kafka_topic_opaque (const rd_kafka_topic_t *rkt);
2549
2550
/**
 * @brief Unassigned partition.
 *
 * The unassigned partition is used by the producer API for messages
 * that should be partitioned using the configured or default partitioner.
 *
 * Deliberately an out-of-range partition value (-1) so it can never
 * collide with a real partition id.
 */
#define RD_KAFKA_PARTITION_UA  ((int32_t)-1)
2558
2559
2560/**
2561 * @brief Polls the provided kafka handle for events.
2562 *
2563 * Events will cause application provided callbacks to be called.
2564 *
2565 * The \p timeout_ms argument specifies the maximum amount of time
2566 * (in milliseconds) that the call will block waiting for events.
2567 * For non-blocking calls, provide 0 as \p timeout_ms.
 * To wait indefinitely for an event, provide -1.
2569 *
2570 * @remark An application should make sure to call poll() at regular
2571 * intervals to serve any queued callbacks waiting to be called.
2572 * @remark If your producer doesn't have any callback set (in particular
2573 * via rd_kafka_conf_set_dr_msg_cb or rd_kafka_conf_set_error_cb)
 *         you might choose not to call poll(), though this is not
2575 * recommended.
2576 *
2577 * Events:
2578 * - delivery report callbacks (if dr_cb/dr_msg_cb is configured) [producer]
2579 * - error callbacks (rd_kafka_conf_set_error_cb()) [all]
2580 * - stats callbacks (rd_kafka_conf_set_stats_cb()) [all]
2581 * - throttle callbacks (rd_kafka_conf_set_throttle_cb()) [all]
2582 * - OAUTHBEARER token refresh callbacks (rd_kafka_conf_set_oauthbearer_token_refresh_cb()) [all]
2583 *
2584 * @returns the number of events served.
2585 */
2586RD_EXPORT
2587int rd_kafka_poll(rd_kafka_t *rk, int timeout_ms);
2588
2589
2590/**
2591 * @brief Cancels the current callback dispatcher (rd_kafka_poll(),
2592 * rd_kafka_consume_callback(), etc).
2593 *
2594 * A callback may use this to force an immediate return to the calling
2595 * code (caller of e.g. rd_kafka_poll()) without processing any further
2596 * events.
2597 *
2598 * @remark This function MUST ONLY be called from within a librdkafka callback.
2599 */
2600RD_EXPORT
2601void rd_kafka_yield (rd_kafka_t *rk);
2602
2603
2604
2605
2606/**
2607 * @brief Pause producing or consumption for the provided list of partitions.
2608 *
2609 * Success or error is returned per-partition \p err in the \p partitions list.
2610 *
2611 * @returns RD_KAFKA_RESP_ERR_NO_ERROR
2612 */
2613RD_EXPORT rd_kafka_resp_err_t
2614rd_kafka_pause_partitions (rd_kafka_t *rk,
2615 rd_kafka_topic_partition_list_t *partitions);
2616
2617
2618
2619/**
 * @brief Resume producing or consumption for the provided list of partitions.
2621 *
2622 * Success or error is returned per-partition \p err in the \p partitions list.
2623 *
2624 * @returns RD_KAFKA_RESP_ERR_NO_ERROR
2625 */
2626RD_EXPORT rd_kafka_resp_err_t
2627rd_kafka_resume_partitions (rd_kafka_t *rk,
2628 rd_kafka_topic_partition_list_t *partitions);
2629
2630
2631
2632
2633/**
2634 * @brief Query broker for low (oldest/beginning) and high (newest/end) offsets
2635 * for partition.
2636 *
2637 * Offsets are returned in \p *low and \p *high respectively.
2638 *
2639 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or an error code on failure.
2640 */
2641RD_EXPORT rd_kafka_resp_err_t
2642rd_kafka_query_watermark_offsets (rd_kafka_t *rk,
2643 const char *topic, int32_t partition,
2644 int64_t *low, int64_t *high, int timeout_ms);
2645
2646
2647/**
2648 * @brief Get last known low (oldest/beginning) and high (newest/end) offsets
2649 * for partition.
2650 *
2651 * The low offset is updated periodically (if statistics.interval.ms is set)
2652 * while the high offset is updated on each fetched message set from the broker.
2653 *
2654 * If there is no cached offset (either low or high, or both) then
2655 * RD_KAFKA_OFFSET_INVALID will be returned for the respective offset.
2656 *
2657 * Offsets are returned in \p *low and \p *high respectively.
2658 *
2659 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or an error code on failure.
2660 *
2661 * @remark Shall only be used with an active consumer instance.
2662 */
2663RD_EXPORT rd_kafka_resp_err_t
2664rd_kafka_get_watermark_offsets (rd_kafka_t *rk,
2665 const char *topic, int32_t partition,
2666 int64_t *low, int64_t *high);
2667
2668
2669
2670/**
2671 * @brief Look up the offsets for the given partitions by timestamp.
2672 *
2673 * The returned offset for each partition is the earliest offset whose
2674 * timestamp is greater than or equal to the given timestamp in the
2675 * corresponding partition.
2676 *
2677 * The timestamps to query are represented as \c offset in \p offsets
2678 * on input, and \c offset will contain the offset on output.
2679 *
2680 * The function will block for at most \p timeout_ms milliseconds.
2681 *
2682 * @remark Duplicate Topic+Partitions are not supported.
2683 * @remark Per-partition errors may be returned in \c rd_kafka_topic_partition_t.err
2684 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR if offsets were queried (do note
2686 * that per-partition errors might be set),
2687 * RD_KAFKA_RESP_ERR__TIMED_OUT if not all offsets could be fetched
2688 * within \p timeout_ms,
2689 * RD_KAFKA_RESP_ERR__INVALID_ARG if the \p offsets list is empty,
2690 * RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION if all partitions are unknown,
2691 * RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE if unable to query leaders
2692 * for the given partitions.
2693 */
2694RD_EXPORT rd_kafka_resp_err_t
2695rd_kafka_offsets_for_times (rd_kafka_t *rk,
2696 rd_kafka_topic_partition_list_t *offsets,
2697 int timeout_ms);
2698
2699
2700/**
2701 * @brief Free pointer returned by librdkafka
2702 *
2703 * This is typically an abstraction for the free(3) call and makes sure
2704 * the application can use the same memory allocator as librdkafka for
2705 * freeing pointers returned by librdkafka.
2706 *
2707 * In standard setups it is usually not necessary to use this interface
 * rather than the free(3) function.
2709 *
2710 * \p rk must be set for memory returned by APIs that take an \c rk argument,
2711 * for other APIs pass NULL for \p rk.
2712 *
2713 * @remark rd_kafka_mem_free() must only be used for pointers returned by APIs
2714 * that explicitly mention using this function for freeing.
2715 */
2716RD_EXPORT
2717void rd_kafka_mem_free (rd_kafka_t *rk, void *ptr);
2718
2719
2720/**@}*/
2721
2722
2723
2724
2725
2726/**
2727 * @name Queue API
2728 * @{
2729 *
2730 * Message queues allows the application to re-route consumed messages
2731 * from multiple topic+partitions into one single queue point.
2732 * This queue point containing messages from a number of topic+partitions
2733 * may then be served by a single rd_kafka_consume*_queue() call,
2734 * rather than one call per topic+partition combination.
2735 */
2736
2737
2738/**
2739 * @brief Create a new message queue.
2740 *
2741 * See rd_kafka_consume_start_queue(), rd_kafka_consume_queue(), et.al.
2742 */
2743RD_EXPORT
2744rd_kafka_queue_t *rd_kafka_queue_new(rd_kafka_t *rk);
2745
/**
 * @brief Destroy a queue, purging all of its enqueued messages.
 */
2749RD_EXPORT
2750void rd_kafka_queue_destroy(rd_kafka_queue_t *rkqu);
2751
2752
2753/**
2754 * @returns a reference to the main librdkafka event queue.
2755 * This is the queue served by rd_kafka_poll().
2756 *
 * Use rd_kafka_queue_destroy() to lose the reference.
2758 */
2759RD_EXPORT
2760rd_kafka_queue_t *rd_kafka_queue_get_main (rd_kafka_t *rk);
2761
2762
2763/**
2764 * @returns a reference to the librdkafka consumer queue.
2765 * This is the queue served by rd_kafka_consumer_poll().
2766 *
 * Use rd_kafka_queue_destroy() to lose the reference.
2768 *
2769 * @remark rd_kafka_queue_destroy() MUST be called on this queue
2770 * prior to calling rd_kafka_consumer_close().
2771 */
2772RD_EXPORT
2773rd_kafka_queue_t *rd_kafka_queue_get_consumer (rd_kafka_t *rk);
2774
2775/**
2776 * @returns a reference to the partition's queue, or NULL if
2777 * partition is invalid.
2778 *
 * Use rd_kafka_queue_destroy() to lose the reference.
2780 *
2781 * @remark rd_kafka_queue_destroy() MUST be called on this queue
2782 *
2783 * @remark This function only works on consumers.
2784 */
2785RD_EXPORT
2786rd_kafka_queue_t *rd_kafka_queue_get_partition (rd_kafka_t *rk,
2787 const char *topic,
2788 int32_t partition);
2789
2790/**
2791 * @returns a reference to the background thread queue, or NULL if the
2792 * background queue is not enabled.
2793 *
2794 * To enable the background thread queue set a generic event handler callback
2795 * with rd_kafka_conf_set_background_event_cb() on the client instance
2796 * configuration object (rd_kafka_conf_t).
2797 *
2798 * The background queue is polled and served by librdkafka and MUST NOT be
2799 * polled, forwarded, or otherwise managed by the application, it may only
2800 * be used as the destination queue passed to queue-enabled APIs, such as
2801 * the Admin API.
2802 *
2803 * The background thread queue provides the application with an automatically
2804 * polled queue that triggers the event callback in a background thread,
2805 * this background thread is completely managed by librdkafka.
2806 *
 * Use rd_kafka_queue_destroy() to lose the reference.
2808 *
2809 * @warning The background queue MUST NOT be read from (polled, consumed, etc),
2810 * or forwarded from.
2811 */
2812RD_EXPORT
2813rd_kafka_queue_t *rd_kafka_queue_get_background (rd_kafka_t *rk);
2814
2815
2816/**
2817 * @brief Forward/re-route queue \p src to \p dst.
2818 * If \p dst is \c NULL the forwarding is removed.
2819 *
2820 * The internal refcounts for both queues are increased.
2821 *
2822 * @remark Regardless of whether \p dst is NULL or not, after calling this
 *         function, \p src will not forward its fetch queue to the consumer
2824 * queue.
2825 */
2826RD_EXPORT
2827void rd_kafka_queue_forward (rd_kafka_queue_t *src, rd_kafka_queue_t *dst);
2828
2829/**
2830 * @brief Forward librdkafka logs (and debug) to the specified queue
2831 * for serving with one of the ..poll() calls.
2832 *
2833 * This allows an application to serve log callbacks (\c log_cb)
2834 * in its thread of choice.
2835 *
2836 * @param rkqu Queue to forward logs to. If the value is NULL the logs
2837 * are forwarded to the main queue.
2838 *
2839 * @remark The configuration property \c log.queue MUST also be set to true.
2840 *
2841 * @remark librdkafka maintains its own reference to the provided queue.
2842 *
2843 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or an error code on error.
2844 */
2845RD_EXPORT
2846rd_kafka_resp_err_t rd_kafka_set_log_queue (rd_kafka_t *rk,
2847 rd_kafka_queue_t *rkqu);
2848
2849
2850/**
2851 * @returns the current number of elements in queue.
2852 */
2853RD_EXPORT
2854size_t rd_kafka_queue_length (rd_kafka_queue_t *rkqu);
2855
2856
2857/**
2858 * @brief Enable IO event triggering for queue.
2859 *
2860 * To ease integration with IO based polling loops this API
2861 * allows an application to create a separate file-descriptor
2862 * that librdkafka will write \p payload (of size \p size) to
2863 * whenever a new element is enqueued on a previously empty queue.
2864 *
2865 * To remove event triggering call with \p fd = -1.
2866 *
2867 * librdkafka will maintain a copy of the \p payload.
2868 *
2869 * @remark IO and callback event triggering are mutually exclusive.
2870 * @remark When using forwarded queues the IO event must only be enabled
2871 * on the final forwarded-to (destination) queue.
2872 */
2873RD_EXPORT
2874void rd_kafka_queue_io_event_enable (rd_kafka_queue_t *rkqu, int fd,
2875 const void *payload, size_t size);
2876
2877/**
2878 * @brief Enable callback event triggering for queue.
2879 *
2880 * The callback will be called from an internal librdkafka thread
2881 * when a new element is enqueued on a previously empty queue.
2882 *
2883 * To remove event triggering call with \p event_cb = NULL.
2884 *
2885 * @remark IO and callback event triggering are mutually exclusive.
2886 * @remark Since the callback may be triggered from internal librdkafka
 *         threads, the application must not perform any prolonged work in
2888 * the callback, or call any librdkafka APIs (for the same rd_kafka_t
2889 * handle).
2890 */
2891RD_EXPORT
2892void rd_kafka_queue_cb_event_enable (rd_kafka_queue_t *rkqu,
2893 void (*event_cb) (rd_kafka_t *rk,
2894 void *opaque),
2895 void *opaque);
2896
2897/**@}*/
2898
2899/**
2900 *
2901 * @name Simple Consumer API (legacy)
2902 * @{
2903 *
2904 */
2905
2906
2907#define RD_KAFKA_OFFSET_BEGINNING -2 /**< Start consuming from beginning of
2908 * kafka partition queue: oldest msg */
2909#define RD_KAFKA_OFFSET_END -1 /**< Start consuming from end of kafka
2910 * partition queue: next msg */
2911#define RD_KAFKA_OFFSET_STORED -1000 /**< Start consuming from offset retrieved
2912 * from offset store */
2913#define RD_KAFKA_OFFSET_INVALID -1001 /**< Invalid offset */
2914
2915
2916/** @cond NO_DOC */
2917#define RD_KAFKA_OFFSET_TAIL_BASE -2000 /* internal: do not use */
2918/** @endcond */
2919
2920/**
2921 * @brief Start consuming \p CNT messages from topic's current end offset.
2922 *
2923 * That is, if current end offset is 12345 and \p CNT is 200, it will start
2924 * consuming from offset \c 12345-200 = \c 12145. */
2925#define RD_KAFKA_OFFSET_TAIL(CNT) (RD_KAFKA_OFFSET_TAIL_BASE - (CNT))
2926
2927/**
2928 * @brief Start consuming messages for topic \p rkt and \p partition
2929 * at offset \p offset which may either be an absolute \c (0..N)
2930 * or one of the logical offsets:
2931 * - RD_KAFKA_OFFSET_BEGINNING
2932 * - RD_KAFKA_OFFSET_END
2933 * - RD_KAFKA_OFFSET_STORED
2934 * - RD_KAFKA_OFFSET_TAIL
2935 *
2936 * rdkafka will attempt to keep \c queued.min.messages (config property)
2937 * messages in the local queue by repeatedly fetching batches of messages
2938 * from the broker until the threshold is reached.
2939 *
2940 * The application shall use one of the `rd_kafka_consume*()` functions
2941 * to consume messages from the local queue, each kafka message being
2942 * represented as a `rd_kafka_message_t *` object.
2943 *
2944 * `rd_kafka_consume_start()` must not be called multiple times for the same
2945 * topic and partition without stopping consumption first with
2946 * `rd_kafka_consume_stop()`.
2947 *
2948 * @returns 0 on success or -1 on error in which case errno is set accordingly:
2949 * - EBUSY - Conflicts with an existing or previous subscription
2950 * (RD_KAFKA_RESP_ERR__CONFLICT)
2951 * - EINVAL - Invalid offset, or incomplete configuration (lacking group.id)
2952 * (RD_KAFKA_RESP_ERR__INVALID_ARG)
2953 * - ESRCH - requested \p partition is invalid.
2954 * (RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)
2955 * - ENOENT - topic is unknown in the Kafka cluster.
2956 * (RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
2957 *
 * Use `rd_kafka_errno2err()` to convert system \c errno to `rd_kafka_resp_err_t`.
2959 */
2960RD_EXPORT
2961int rd_kafka_consume_start(rd_kafka_topic_t *rkt, int32_t partition,
2962 int64_t offset);
2963
2964/**
2965 * @brief Same as rd_kafka_consume_start() but re-routes incoming messages to
2966 * the provided queue \p rkqu (which must have been previously allocated
 *        with `rd_kafka_queue_new()`).
2968 *
2969 * The application must use one of the `rd_kafka_consume_*_queue()` functions
2970 * to receive fetched messages.
2971 *
2972 * `rd_kafka_consume_start_queue()` must not be called multiple times for the
2973 * same topic and partition without stopping consumption first with
2974 * `rd_kafka_consume_stop()`.
2975 * `rd_kafka_consume_start()` and `rd_kafka_consume_start_queue()` must not
2976 * be combined for the same topic and partition.
2977 */
2978RD_EXPORT
2979int rd_kafka_consume_start_queue(rd_kafka_topic_t *rkt, int32_t partition,
2980 int64_t offset, rd_kafka_queue_t *rkqu);
2981
2982/**
2983 * @brief Stop consuming messages for topic \p rkt and \p partition, purging
2984 * all messages currently in the local queue.
2985 *
2986 * NOTE: To enforce synchronisation this call will block until the internal
2987 * fetcher has terminated and offsets are committed to configured
2988 * storage method.
2989 *
 * The application needs to stop all consumers before calling
2991 * `rd_kafka_destroy()` on the main object handle.
2992 *
2993 * @returns 0 on success or -1 on error (see `errno`).
2994 */
2995RD_EXPORT
2996int rd_kafka_consume_stop(rd_kafka_topic_t *rkt, int32_t partition);
2997
2998
2999
3000/**
3001 * @brief Seek consumer for topic+partition to \p offset which is either an
3002 * absolute or logical offset.
3003 *
3004 * If \p timeout_ms is not 0 the call will wait this long for the
3005 * seek to be performed. If the timeout is reached the internal state
3006 * will be unknown and this function returns `RD_KAFKA_RESP_ERR__TIMED_OUT`.
3007 * If \p timeout_ms is 0 it will initiate the seek but return
3008 * immediately without any error reporting (e.g., async).
3009 *
3010 * This call triggers a fetch queue barrier flush.
3011 *
 * @returns `RD_KAFKA_RESP_ERR_NO_ERROR` on success else an error code.
3013 */
3014RD_EXPORT
3015rd_kafka_resp_err_t rd_kafka_seek (rd_kafka_topic_t *rkt,
3016 int32_t partition,
3017 int64_t offset,
3018 int timeout_ms);
3019
3020
3021/**
3022 * @brief Consume a single message from topic \p rkt and \p partition
3023 *
3024 * \p timeout_ms is maximum amount of time to wait for a message to be received.
3025 * Consumer must have been previously started with `rd_kafka_consume_start()`.
3026 *
3027 * @returns a message object on success or \c NULL on error.
3028 * The message object must be destroyed with `rd_kafka_message_destroy()`
3029 * when the application is done with it.
3030 *
3031 * Errors (when returning NULL):
3032 * - ETIMEDOUT - \p timeout_ms was reached with no new messages fetched.
3033 * - ENOENT - \p rkt + \p partition is unknown.
3034 * (no prior `rd_kafka_consume_start()` call)
3035 *
3036 * NOTE: The returned message's \c ..->err must be checked for errors.
3037 * NOTE: \c ..->err \c == \c RD_KAFKA_RESP_ERR__PARTITION_EOF signals that the
3038 * end of the partition has been reached, which should typically not be
3039 * considered an error. The application should handle this case
3040 * (e.g., ignore).
3041 *
3042 * @remark on_consume() interceptors may be called from this function prior to
3043 * passing message to application.
3044 */
3045RD_EXPORT
3046rd_kafka_message_t *rd_kafka_consume(rd_kafka_topic_t *rkt, int32_t partition,
3047 int timeout_ms);
3048
3049
3050
3051/**
3052 * @brief Consume up to \p rkmessages_size from topic \p rkt and \p partition
3053 * putting a pointer to each message in the application provided
3054 * array \p rkmessages (of size \p rkmessages_size entries).
3055 *
3056 * `rd_kafka_consume_batch()` provides higher throughput performance
3057 * than `rd_kafka_consume()`.
3058 *
3059 * \p timeout_ms is the maximum amount of time to wait for all of
3060 * \p rkmessages_size messages to be put into \p rkmessages.
3061 * If no messages were available within the timeout period this function
3062 * returns 0 and \p rkmessages remains untouched.
3063 * This differs somewhat from `rd_kafka_consume()`.
3064 *
3065 * The message objects must be destroyed with `rd_kafka_message_destroy()`
 * when the application is done with them.
3067 *
3068 * @returns the number of rkmessages added in \p rkmessages,
 *          or -1 on error (same error codes as for `rd_kafka_consume()`).
3070 *
3071 * @sa rd_kafka_consume()
3072 *
3073 * @remark on_consume() interceptors may be called from this function prior to
3074 * passing message to application.
3075 */
3076RD_EXPORT
3077ssize_t rd_kafka_consume_batch(rd_kafka_topic_t *rkt, int32_t partition,
3078 int timeout_ms,
3079 rd_kafka_message_t **rkmessages,
3080 size_t rkmessages_size);
3081
3082
3083
3084/**
3085 * @brief Consumes messages from topic \p rkt and \p partition, calling
 *        the provided callback for each consumed message.
3087 *
3088 * `rd_kafka_consume_callback()` provides higher throughput performance
3089 * than both `rd_kafka_consume()` and `rd_kafka_consume_batch()`.
3090 *
3091 * \p timeout_ms is the maximum amount of time to wait for one or more messages
3092 * to arrive.
3093 *
3094 * The provided \p consume_cb function is called for each message,
3095 * the application \b MUST \b NOT call `rd_kafka_message_destroy()` on the
3096 * provided \p rkmessage.
3097 *
3098 * The \p opaque argument is passed to the 'consume_cb' as \p opaque.
3099 *
3100 * @returns the number of messages processed or -1 on error.
3101 *
3102 * @sa rd_kafka_consume()
3103 *
3104 * @remark on_consume() interceptors may be called from this function prior to
3105 * passing message to application.
3106 */
3107RD_EXPORT
3108int rd_kafka_consume_callback(rd_kafka_topic_t *rkt, int32_t partition,
3109 int timeout_ms,
3110 void (*consume_cb) (rd_kafka_message_t
3111 *rkmessage,
3112 void *opaque),
3113 void *opaque);
3114
3115
3116/**
3117 * @name Simple Consumer API (legacy): Queue consumers
3118 * @{
3119 *
3120 * The following `..._queue()` functions are analogue to the functions above
3121 * but reads messages from the provided queue \p rkqu instead.
3122 * \p rkqu must have been previously created with `rd_kafka_queue_new()`
3123 * and the topic consumer must have been started with
 * `rd_kafka_consume_start_queue()` utilising the same queue.
3125 */
3126
3127/**
3128 * @brief Consume from queue
3129 *
3130 * @sa rd_kafka_consume()
3131 */
3132RD_EXPORT
3133rd_kafka_message_t *rd_kafka_consume_queue(rd_kafka_queue_t *rkqu,
3134 int timeout_ms);
3135
3136/**
3137 * @brief Consume batch of messages from queue
3138 *
3139 * @sa rd_kafka_consume_batch()
3140 */
3141RD_EXPORT
3142ssize_t rd_kafka_consume_batch_queue(rd_kafka_queue_t *rkqu,
3143 int timeout_ms,
3144 rd_kafka_message_t **rkmessages,
3145 size_t rkmessages_size);
3146
3147/**
3148 * @brief Consume multiple messages from queue with callback
3149 *
3150 * @sa rd_kafka_consume_callback()
3151 */
3152RD_EXPORT
3153int rd_kafka_consume_callback_queue(rd_kafka_queue_t *rkqu,
3154 int timeout_ms,
3155 void (*consume_cb) (rd_kafka_message_t
3156 *rkmessage,
3157 void *opaque),
3158 void *opaque);
3159
3160
3161/**@}*/
3162
3163
3164
3165
3166/**
3167 * @name Simple Consumer API (legacy): Topic+partition offset store.
3168 * @{
3169 *
3170 * If \c auto.commit.enable is true the offset is stored automatically prior to
3171 * returning of the message(s) in each of the rd_kafka_consume*() functions
3172 * above.
3173 */
3174
3175
3176/**
3177 * @brief Store offset \p offset for topic \p rkt partition \p partition.
3178 *
3179 * The offset will be committed (written) to the offset store according
3180 * to \c `auto.commit.interval.ms` or manual offset-less commit()
3181 *
3182 * @remark \c `enable.auto.offset.store` must be set to "false" when using this API.
3183 *
3184 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or an error code on error.
3185 */
3186RD_EXPORT
3187rd_kafka_resp_err_t rd_kafka_offset_store(rd_kafka_topic_t *rkt,
3188 int32_t partition, int64_t offset);
3189
3190
3191/**
3192 * @brief Store offsets for next auto-commit for one or more partitions.
3193 *
3194 * The offset will be committed (written) to the offset store according
3195 * to \c `auto.commit.interval.ms` or manual offset-less commit().
3196 *
3197 * Per-partition success/error status propagated through each partition's
3198 * \c .err field.
3199 *
3200 * @remark \c `enable.auto.offset.store` must be set to "false" when using this API.
3201 *
3202 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success, or
3203 * RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION if none of the
3204 * offsets could be stored, or
3205 * RD_KAFKA_RESP_ERR__INVALID_ARG if \c enable.auto.offset.store is true.
3206 */
3207RD_EXPORT rd_kafka_resp_err_t
3208rd_kafka_offsets_store(rd_kafka_t *rk,
3209 rd_kafka_topic_partition_list_t *offsets);
3210/**@}*/
3211
3212
3213
3214
3215/**
3216 * @name KafkaConsumer (C)
3217 * @{
3218 * @brief High-level KafkaConsumer C API
3219 *
3220 *
3221 *
3222 */
3223
3224/**
3225 * @brief Subscribe to topic set using balanced consumer groups.
3226 *
3227 * Wildcard (regex) topics are supported:
3228 * any topic name in the \p topics list that is prefixed with \c \"^\" will
3229 * be regex-matched to the full list of topics in the cluster and matching
3230 * topics will be added to the subscription list.
3231 *
3232 * The full topic list is retrieved every \c topic.metadata.refresh.interval.ms
 * to pick up new or deleted topics that match the subscription.
3234 * If there is any change to the matched topics the consumer will
3235 * immediately rejoin the group with the updated set of subscribed topics.
3236 *
3237 * Regex and full topic names can be mixed in \p topics.
3238 *
3239 * @remark Only the \c .topic field is used in the supplied \p topics list,
3240 * all other fields are ignored.
3241 *
3242 * @remark subscribe() is an asynchronous method which returns immediately:
3243 * background threads will (re)join the group, wait for group rebalance,
3244 * issue any registered rebalance_cb, assign() the assigned partitions,
3245 * and then start fetching messages. This cycle may take up to
3246 * \c session.timeout.ms * 2 or more to complete.
3247 *
3248 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or
3249 * RD_KAFKA_RESP_ERR__INVALID_ARG if list is empty, contains invalid
3250 * topics or regexes.
3251 */
3252RD_EXPORT rd_kafka_resp_err_t
3253rd_kafka_subscribe (rd_kafka_t *rk,
3254 const rd_kafka_topic_partition_list_t *topics);
3255
3256
3257/**
3258 * @brief Unsubscribe from the current subscription set.
3259 */
3260RD_EXPORT
3261rd_kafka_resp_err_t rd_kafka_unsubscribe (rd_kafka_t *rk);
3262
3263
3264/**
3265 * @brief Returns the current topic subscription
3266 *
3267 * @returns An error code on failure, otherwise \p topic is updated
3268 * to point to a newly allocated topic list (possibly empty).
3269 *
3270 * @remark The application is responsible for calling
3271 * rd_kafka_topic_partition_list_destroy on the returned list.
3272 */
3273RD_EXPORT rd_kafka_resp_err_t
3274rd_kafka_subscription (rd_kafka_t *rk,
3275 rd_kafka_topic_partition_list_t **topics);
3276
3277
3278
3279/**
3280 * @brief Poll the consumer for messages or events.
3281 *
3282 * Will block for at most \p timeout_ms milliseconds.
3283 *
3284 * @remark An application should make sure to call consumer_poll() at regular
3285 * intervals, even if no messages are expected, to serve any
3286 * queued callbacks waiting to be called. This is especially
3287 * important when a rebalance_cb has been registered as it needs
3288 * to be called and handled properly to synchronize internal
3289 * consumer state.
3290 *
3291 * @returns A message object which is a proper message if \p ->err is
3292 * RD_KAFKA_RESP_ERR_NO_ERROR, or an event or error for any other
3293 * value.
3294 *
3295 * @remark on_consume() interceptors may be called from this function prior to
3296 * passing message to application.
3297 *
3298 * @remark When subscribing to topics the application must call poll at
3299 * least every \c max.poll.interval.ms to remain a member of the
3300 * consumer group.
3301 *
3302 * Noteworthy errors returned in \c ->err:
3303 * - RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED - application failed to call
3304 * poll within `max.poll.interval.ms`.
3305 *
3306 * @sa rd_kafka_message_t
3307 */
3308RD_EXPORT
3309rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk, int timeout_ms);
3310
3311/**
3312 * @brief Close down the KafkaConsumer.
3313 *
3314 * @remark This call will block until the consumer has revoked its assignment,
3315 * calling the \c rebalance_cb if it is configured, committed offsets
3316 * to broker, and left the consumer group.
3317 * The maximum blocking time is roughly limited to session.timeout.ms.
3318 *
 * @returns An error code indicating if the consumer close was successful
3320 * or not.
3321 *
3322 * @remark The application still needs to call rd_kafka_destroy() after
3323 * this call finishes to clean up the underlying handle resources.
3324 *
3325 */
3326RD_EXPORT
3327rd_kafka_resp_err_t rd_kafka_consumer_close (rd_kafka_t *rk);
3328
3329
3330
3331/**
3332 * @brief Atomic assignment of partitions to consume.
3333 *
3334 * The new \p partitions will replace the existing assignment.
3335 *
3336 * When used from a rebalance callback the application shall pass the
3337 * partition list passed to the callback (or a copy of it) (even if the list
3338 * is empty) rather than NULL to maintain internal join state.
 *
3340 * A zero-length \p partitions will treat the partitions as a valid,
3341 * albeit empty, assignment, and maintain internal state, while a \c NULL
3342 * value for \p partitions will reset and clear the internal state.
3343 */
3344RD_EXPORT rd_kafka_resp_err_t
3345rd_kafka_assign (rd_kafka_t *rk,
3346 const rd_kafka_topic_partition_list_t *partitions);
3347
3348/**
3349 * @brief Returns the current partition assignment
3350 *
3351 * @returns An error code on failure, otherwise \p partitions is updated
3352 * to point to a newly allocated partition list (possibly empty).
3353 *
3354 * @remark The application is responsible for calling
3355 * rd_kafka_topic_partition_list_destroy on the returned list.
3356 */
3357RD_EXPORT rd_kafka_resp_err_t
3358rd_kafka_assignment (rd_kafka_t *rk,
3359 rd_kafka_topic_partition_list_t **partitions);
3360
3361
3362
3363
3364/**
3365 * @brief Commit offsets on broker for the provided list of partitions.
3366 *
3367 * \p offsets should contain \c topic, \c partition, \c offset and possibly
3368 * \c metadata.
3369 * If \p offsets is NULL the current partition assignment will be used instead.
3370 *
3371 * If \p async is false this operation will block until the broker offset commit
3372 * is done, returning the resulting success or error code.
3373 *
3374 * If a rd_kafka_conf_set_offset_commit_cb() offset commit callback has been
3375 * configured the callback will be enqueued for a future call to
3376 * rd_kafka_poll(), rd_kafka_consumer_poll() or similar.
3377 */
3378RD_EXPORT rd_kafka_resp_err_t
3379rd_kafka_commit (rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *offsets,
3380 int async);
3381
3382
3383/**
3384 * @brief Commit message's offset on broker for the message's partition.
3385 *
3386 * @sa rd_kafka_commit
3387 */
3388RD_EXPORT rd_kafka_resp_err_t
3389rd_kafka_commit_message (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
3390 int async);
3391
3392
3393/**
3394 * @brief Commit offsets on broker for the provided list of partitions.
3395 *
3396 * See rd_kafka_commit for \p offsets semantics.
3397 *
3398 * The result of the offset commit will be posted on the provided \p rkqu queue.
3399 *
3400 * If the application uses one of the poll APIs (rd_kafka_poll(),
3401 * rd_kafka_consumer_poll(), rd_kafka_queue_poll(), ..) to serve the queue
3402 * the \p cb callback is required. \p opaque is passed to the callback.
3403 *
3404 * If using the event API the callback is ignored and the offset commit result
3405 * will be returned as an RD_KAFKA_EVENT_COMMIT event. The \p opaque
3406 * value will be available with rd_kafka_event_opaque()
3407 *
3408 * If \p rkqu is NULL a temporary queue will be created and the callback will
3409 * be served by this call.
3410 *
3411 * @sa rd_kafka_commit()
3412 * @sa rd_kafka_conf_set_offset_commit_cb()
3413 */
3414RD_EXPORT rd_kafka_resp_err_t
3415rd_kafka_commit_queue (rd_kafka_t *rk,
3416 const rd_kafka_topic_partition_list_t *offsets,
3417 rd_kafka_queue_t *rkqu,
3418 void (*cb) (rd_kafka_t *rk,
3419 rd_kafka_resp_err_t err,
3420 rd_kafka_topic_partition_list_t *offsets,
3421 void *opaque),
3422 void *opaque);
3423
3424
3425/**
3426 * @brief Retrieve committed offsets for topics+partitions.
3427 *
3428 * The \p offset field of each requested partition will either be set to
3429 * stored offset or to RD_KAFKA_OFFSET_INVALID in case there was no stored
3430 * offset for that partition.
3431 *
3432 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success in which case the
3433 * \p offset or \p err field of each \p partitions' element is filled
3434 * in with the stored offset, or a partition specific error.
3435 * Else returns an error code.
3436 */
3437RD_EXPORT rd_kafka_resp_err_t
3438rd_kafka_committed (rd_kafka_t *rk,
3439 rd_kafka_topic_partition_list_t *partitions,
3440 int timeout_ms);
3441
3442
3443
3444/**
3445 * @brief Retrieve current positions (offsets) for topics+partitions.
3446 *
3447 * The \p offset field of each requested partition will be set to the offset
3448 * of the last consumed message + 1, or RD_KAFKA_OFFSET_INVALID in case there was
3449 * no previous message.
3450 *
3451 * @remark In this context the last consumed message is the offset consumed
3452 * by the current librdkafka instance and, in case of rebalancing, not
3453 * necessarily the last message fetched from the partition.
3454 *
3455 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success in which case the
3456 * \p offset or \p err field of each \p partitions' element is filled
3457 * in with the stored offset, or a partition specific error.
3458 * Else returns an error code.
3459 */
3460RD_EXPORT rd_kafka_resp_err_t
3461rd_kafka_position (rd_kafka_t *rk,
3462 rd_kafka_topic_partition_list_t *partitions);
3463
3464
3465/**@}*/
3466
3467
3468
3469/**
3470 * @name Producer API
3471 * @{
3472 *
3473 *
3474 */
3475
3476
3477/**
3478 * @brief Producer message flags
3479 */
3480#define RD_KAFKA_MSG_F_FREE 0x1 /**< Delegate freeing of payload to rdkafka. */
3481#define RD_KAFKA_MSG_F_COPY 0x2 /**< rdkafka will make a copy of the payload. */
3482#define RD_KAFKA_MSG_F_BLOCK 0x4 /**< Block produce*() on message queue full.
3483 * WARNING: If a delivery report callback
3484 * is used the application MUST
3485 * call rd_kafka_poll() (or equiv.)
3486 * to make sure delivered messages
3487 * are drained from the internal
3488 * delivery report queue.
3489 * Failure to do so will result
                                  * in indefinitely blocking on
3491 * the produce() call when the
3492 * message queue is full. */
3493#define RD_KAFKA_MSG_F_PARTITION 0x8 /**< produce_batch() will honor
3494 * per-message partition. */
3495
3496
3497
3498/**
3499 * @brief Produce and send a single message to broker.
3500 *
3501 * \p rkt is the target topic which must have been previously created with
3502 * `rd_kafka_topic_new()`.
3503 *
3504 * `rd_kafka_produce()` is an asynch non-blocking API.
3505 * See `rd_kafka_conf_set_dr_msg_cb` on how to setup a callback to be called
3506 * once the delivery status (success or failure) is known. The delivery report
3507 * is trigged by the application calling `rd_kafka_poll()` (at regular
3508 * intervals) or `rd_kafka_flush()` (at termination).
3509 *
3510 * Since producing is asynchronous, you should call `rd_kafka_flush()` before
3511 * you destroy the producer. Otherwise, any outstanding messages will be
3512 * silently discarded.
3513 *
3514 * When temporary errors occur, librdkafka automatically retries to produce the
3515 * messages. Retries are triggered after retry.backoff.ms and when the
3516 * leader broker for the given partition is available. Otherwise, librdkafka
3517 * falls back to polling the topic metadata to monitor when a new leader is
3518 * elected (see the topic.metadata.refresh.fast.interval.ms and
3519 * topic.metadata.refresh.interval.ms configurations) and then performs a
3520 * retry. A delivery error will occur if the message could not be produced
3521 * within message.timeout.ms.
3522 *
3523 * See the "Message reliability" chapter in INTRODUCTION.md for more
3524 * information.
3525 *
3526 * \p partition is the target partition, either:
3527 * - RD_KAFKA_PARTITION_UA (unassigned) for
3528 * automatic partitioning using the topic's partitioner function, or
3529 * - a fixed partition (0..N)
3530 *
3531 * \p msgflags is zero or more of the following flags OR:ed together:
3532 * RD_KAFKA_MSG_F_BLOCK - block \p produce*() call if
3533 * \p queue.buffering.max.messages or
3534 * \p queue.buffering.max.kbytes are exceeded.
3535 * Messages are considered in-queue from the point they
3536 * are accepted by produce() until their corresponding
3537 * delivery report callback/event returns.
3538 * It is thus a requirement to call
3539 * rd_kafka_poll() (or equiv.) from a separate
3540 * thread when F_BLOCK is used.
3541 * See WARNING on \c RD_KAFKA_MSG_F_BLOCK above.
3542 *
3543 * RD_KAFKA_MSG_F_FREE - rdkafka will free(3) \p payload when it is done
3544 * with it.
3545 * RD_KAFKA_MSG_F_COPY - the \p payload data will be copied and the
3546 * \p payload pointer will not be used by rdkafka
3547 * after the call returns.
3548 * RD_KAFKA_MSG_F_PARTITION - produce_batch() will honour per-message
3549 * partition, either set manually or by the
3550 * configured partitioner.
3551 *
3552 * .._F_FREE and .._F_COPY are mutually exclusive.
3553 *
3554 * If the function returns -1 and RD_KAFKA_MSG_F_FREE was specified, then
3555 * the memory associated with the payload is still the caller's
3556 * responsibility.
3557 *
3558 * \p payload is the message payload of size \p len bytes.
3559 *
3560 * \p key is an optional message key of size \p keylen bytes, if non-NULL it
3561 * will be passed to the topic partitioner as well as be sent with the
3562 * message to the broker and passed on to the consumer.
3563 *
3564 * \p msg_opaque is an optional application-provided per-message opaque
3565 * pointer that will provided in the delivery report callback (`dr_cb`) for
3566 * referencing this message.
3567 *
3568 * @remark on_send() and on_acknowledgement() interceptors may be called
3569 * from this function. on_acknowledgement() will only be called if the
3570 * message fails partitioning.
3571 *
3572 * @returns 0 on success or -1 on error in which case errno is set accordingly:
3573 * - ENOBUFS - maximum number of outstanding messages has been reached:
3574 * "queue.buffering.max.messages"
3575 * (RD_KAFKA_RESP_ERR__QUEUE_FULL)
3576 * - EMSGSIZE - message is larger than configured max size:
3577 * "messages.max.bytes".
3578 * (RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE)
3579 * - ESRCH - requested \p partition is unknown in the Kafka cluster.
3580 * (RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)
3581 * - ENOENT - topic is unknown in the Kafka cluster.
3582 * (RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
3583 * - ECANCELED - fatal error has been raised on producer, see
3584 * rd_kafka_fatal_error().
3585 *
3586 * @sa Use rd_kafka_errno2err() to convert `errno` to rdkafka error code.
3587 */
3588RD_EXPORT
3589int rd_kafka_produce(rd_kafka_topic_t *rkt, int32_t partition,
3590 int msgflags,
3591 void *payload, size_t len,
3592 const void *key, size_t keylen,
3593 void *msg_opaque);
3594
3595
3596/**
3597 * @brief Produce and send a single message to broker.
3598 *
3599 * The message is defined by a va-arg list using \c rd_kafka_vtype_t
3600 * tag tuples which must be terminated with a single \c RD_KAFKA_V_END.
3601 *
3602 * @returns \c RD_KAFKA_RESP_ERR_NO_ERROR on success, else an error code.
3603 * \c RD_KAFKA_RESP_ERR__CONFLICT is returned if _V_HEADER and
3604 * _V_HEADERS are mixed.
3605 *
3606 * @sa rd_kafka_produce, RD_KAFKA_V_END
3607 */
3608RD_EXPORT
3609rd_kafka_resp_err_t rd_kafka_producev (rd_kafka_t *rk, ...);
3610
3611
3612/**
3613 * @brief Produce multiple messages.
3614 *
3615 * If partition is RD_KAFKA_PARTITION_UA the configured partitioner will
3616 * be run for each message (slower), otherwise the messages will be enqueued
3617 * to the specified partition directly (faster).
3618 *
3619 * The messages are provided in the array \p rkmessages of count \p message_cnt
3620 * elements.
3621 * The \p partition and \p msgflags are used for all provided messages.
3622 *
3623 * Honoured \p rkmessages[] fields are:
3624 * - payload,len Message payload and length
3625 * - key,key_len Optional message key
3626 * - _private Message opaque pointer (msg_opaque)
3627 * - err Will be set according to success or failure.
3628 * Application only needs to check for errors if
3629 * return value != \p message_cnt.
3630 *
3631 * @remark If \c RD_KAFKA_MSG_F_PARTITION is set in \p msgflags, the
3632 * \c .partition field of the \p rkmessages is used instead of
3633 * \p partition.
3634 *
 * @returns the number of messages successfully enqueued for producing.
3636 *
3637 * @remark This interface does NOT support setting message headers on
3638 * the provided \p rkmessages.
3639 */
3640RD_EXPORT
3641int rd_kafka_produce_batch(rd_kafka_topic_t *rkt, int32_t partition,
3642 int msgflags,
3643 rd_kafka_message_t *rkmessages, int message_cnt);
3644
3645
3646
3647
3648/**
 * @brief Wait until all outstanding produce requests, et al., are completed.
3650 * This should typically be done prior to destroying a producer instance
3651 * to make sure all queued and in-flight produce requests are completed
3652 * before terminating.
3653 *
3654 * @remark This function will call rd_kafka_poll() and thus trigger callbacks.
3655 *
3656 * @returns RD_KAFKA_RESP_ERR__TIMED_OUT if \p timeout_ms was reached before all
3657 * outstanding requests were completed, else RD_KAFKA_RESP_ERR_NO_ERROR
3658 *
3659 * @sa rd_kafka_outq_len()
3660 */
3661RD_EXPORT
3662rd_kafka_resp_err_t rd_kafka_flush (rd_kafka_t *rk, int timeout_ms);
3663
3664
3665
3666/**
3667 * @brief Purge messages currently handled by the producer instance.
3668 *
3669 * @param purge_flags tells which messages should be purged and how.
3670 *
3671 * The application will need to call rd_kafka_poll() or rd_kafka_flush()
3672 * afterwards to serve the delivery report callbacks of the purged messages.
3673 *
3674 * Messages purged from internal queues fail with the delivery report
3675 * error code set to RD_KAFKA_RESP_ERR__PURGE_QUEUE, while purged messages that
3676 * are in-flight to or from the broker will fail with the error code set to
3677 * RD_KAFKA_RESP_ERR__PURGE_INFLIGHT.
3678 *
3679 * @warning Purging messages that are in-flight to or from the broker
 *          will ignore any subsequent acknowledgement for these messages
3681 * received from the broker, effectively making it impossible
3682 * for the application to know if the messages were successfully
3683 * produced or not. This may result in duplicate messages if the
3684 * application retries these messages at a later time.
3685 *
3686 * @remark This call may block for a short time while background thread
3687 * queues are purged.
3688 *
3689 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success,
3690 * RD_KAFKA_RESP_ERR__INVALID_ARG if the \p purge flags are invalid
3691 * or unknown,
3692 * RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED if called on a non-producer
3693 * client instance.
3694 */
3695RD_EXPORT
3696rd_kafka_resp_err_t rd_kafka_purge (rd_kafka_t *rk, int purge_flags);
3697
3698
/**
 * @brief Flags for rd_kafka_purge()
 *
 * The flags are bit-masks and may be ORed together to purge multiple
 * categories of messages in one call.
 */

/*!
 * Purge messages in internal queues.
 */
#define RD_KAFKA_PURGE_F_QUEUE 0x1

/*!
 * Purge messages in-flight to or from the broker.
 * Purging these messages will void any future acknowledgements from the
 * broker, making it impossible for the application to know if these
 * messages were successfully delivered or not.
 * Retrying these messages may lead to duplicates.
 */
#define RD_KAFKA_PURGE_F_INFLIGHT 0x2


/*!
 * Don't wait for background thread queue purging to finish.
 */
#define RD_KAFKA_PURGE_F_NON_BLOCKING 0x4
3722
3723
3724/**@}*/
3725
3726
3727/**
3728* @name Metadata API
3729* @{
3730*
3731*
3732*/
3733
3734
/**
 * @brief Broker information
 *
 * @sa rd_kafka_metadata_t
 */
typedef struct rd_kafka_metadata_broker {
        int32_t id;   /**< Broker Id */
        char *host;   /**< Broker hostname */
        int port;     /**< Broker listening port */
} rd_kafka_metadata_broker_t;
3743
/**
 * @brief Partition information
 *
 * @sa rd_kafka_metadata_topic_t
 */
typedef struct rd_kafka_metadata_partition {
        int32_t id;              /**< Partition Id */
        rd_kafka_resp_err_t err; /**< Partition error reported by broker */
        int32_t leader;          /**< Leader broker id */
        int replica_cnt;         /**< Number of brokers in \p replicas */
        int32_t *replicas;       /**< Replica broker ids */
        int isr_cnt;             /**< Number of ISR brokers in \p isrs */
        int32_t *isrs;           /**< In-Sync-Replica broker ids */
} rd_kafka_metadata_partition_t;
3756
/**
 * @brief Topic information
 *
 * @sa rd_kafka_metadata_t
 */
typedef struct rd_kafka_metadata_topic {
        char *topic;       /**< Topic name */
        int partition_cnt; /**< Number of partitions in \p partitions */
        struct rd_kafka_metadata_partition *partitions; /**< Partitions */
        rd_kafka_resp_err_t err; /**< Topic error reported by broker */
} rd_kafka_metadata_topic_t;
3766
3767
/**
 * @brief Metadata container
 *
 * Obtained with rd_kafka_metadata() and released with
 * rd_kafka_metadata_destroy().
 */
typedef struct rd_kafka_metadata {
        int broker_cnt; /**< Number of brokers in \p brokers */
        struct rd_kafka_metadata_broker *brokers; /**< Brokers */

        int topic_cnt; /**< Number of topics in \p topics */
        struct rd_kafka_metadata_topic *topics; /**< Topics */

        int32_t orig_broker_id; /**< Broker originating this metadata */
        char *orig_broker_name; /**< Name of originating broker */
} rd_kafka_metadata_t;
3781
3782
3783/**
3784 * @brief Request Metadata from broker.
3785 *
3786 * Parameters:
3787 * - \p all_topics if non-zero: request info about all topics in cluster,
3788 * if zero: only request info about locally known topics.
3789 * - \p only_rkt only request info about this topic
3790 * - \p metadatap pointer to hold metadata result.
3791 * The \p *metadatap pointer must be released
3792 * with rd_kafka_metadata_destroy().
3793 * - \p timeout_ms maximum response time before failing.
3794 *
 * Returns RD_KAFKA_RESP_ERR_NO_ERROR on success (in which case \p *metadatap
 * will be set), else RD_KAFKA_RESP_ERR__TIMED_OUT on timeout or
 * other error code on error.
3798 */
3799RD_EXPORT
3800rd_kafka_resp_err_t
3801rd_kafka_metadata (rd_kafka_t *rk, int all_topics,
3802 rd_kafka_topic_t *only_rkt,
3803 const struct rd_kafka_metadata **metadatap,
3804 int timeout_ms);
3805
3806/**
3807 * @brief Release metadata memory.
3808 */
3809RD_EXPORT
3810void rd_kafka_metadata_destroy(const struct rd_kafka_metadata *metadata);
3811
3812
3813/**@}*/
3814
3815
3816
3817/**
3818* @name Client group information
3819* @{
3820*
3821*
3822*/
3823
3824
/**
 * @brief Group member information
 *
 * For more information on \p member_metadata format, see
 * https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-GroupMembershipAPI
 *
 * @sa rd_kafka_group_info
 */
struct rd_kafka_group_member_info {
        char *member_id;            /**< Member id (generated by broker) */
        char *client_id;            /**< Client's \p client.id */
        char *client_host;          /**< Client's hostname */
        void *member_metadata;      /**< Member metadata (binary),
                                     *   format depends on \p protocol_type. */
        int member_metadata_size;   /**< Member metadata size in bytes */
        void *member_assignment;    /**< Member assignment (binary),
                                     *   format depends on \p protocol_type. */
        int member_assignment_size; /**< Member assignment size in bytes */
};
3843
/**
 * @brief Group information
 *
 * @sa rd_kafka_list_groups()
 */
struct rd_kafka_group_info {
        struct rd_kafka_metadata_broker broker; /**< Originating broker info */
        char *group;                            /**< Group name */
        rd_kafka_resp_err_t err;                /**< Broker-originated error */
        char *state;                            /**< Group state */
        char *protocol_type;                    /**< Group protocol type */
        char *protocol;                         /**< Group protocol */
        struct rd_kafka_group_member_info *members; /**< Group members */
        int member_cnt;                         /**< Group member count */
};
3857
/**
 * @brief List of groups
 *
 * @sa rd_kafka_group_list_destroy() to release list memory.
 */
struct rd_kafka_group_list {
        struct rd_kafka_group_info *groups; /**< Groups */
        int group_cnt;                      /**< Number of groups in \p groups */
};
3867
3868
3869/**
3870 * @brief List and describe client groups in cluster.
3871 *
3872 * \p group is an optional group name to describe, otherwise (\p NULL) all
3873 * groups are returned.
3874 *
3875 * \p timeout_ms is the (approximate) maximum time to wait for response
3876 * from brokers and must be a positive value.
3877 *
 * @returns \c RD_KAFKA_RESP_ERR_NO_ERROR on success and \p grplistp is
 *           updated to point to a newly allocated list of groups.
 *           \c RD_KAFKA_RESP_ERR__PARTIAL if not all brokers responded
 *           in time but at least one group is returned in \p grplistp.
3882 * \c RD_KAFKA_RESP_ERR__TIMED_OUT if no groups were returned in the
3883 * given timeframe but not all brokers have yet responded, or
3884 * if the list of brokers in the cluster could not be obtained within
3885 * the given timeframe.
3886 * \c RD_KAFKA_RESP_ERR__TRANSPORT if no brokers were found.
3887 * Other error codes may also be returned from the request layer.
3888 *
3889 * The \p grplistp remains untouched if any error code is returned,
3890 * with the exception of RD_KAFKA_RESP_ERR__PARTIAL which behaves
 *          as RD_KAFKA_RESP_ERR_NO_ERROR (success) but with an incomplete
3892 * group list.
3893 *
3894 * @sa Use rd_kafka_group_list_destroy() to release list memory.
3895 */
3896RD_EXPORT
3897rd_kafka_resp_err_t
3898rd_kafka_list_groups (rd_kafka_t *rk, const char *group,
3899 const struct rd_kafka_group_list **grplistp,
3900 int timeout_ms);
3901
3902/**
3903 * @brief Release list memory
3904 */
3905RD_EXPORT
3906void rd_kafka_group_list_destroy (const struct rd_kafka_group_list *grplist);
3907
3908
3909/**@}*/
3910
3911
3912
3913/**
3914 * @name Miscellaneous APIs
3915 * @{
3916 *
3917 */
3918
3919
3920/**
3921 * @brief Adds one or more brokers to the kafka handle's list of initial
3922 * bootstrap brokers.
3923 *
3924 * Additional brokers will be discovered automatically as soon as rdkafka
3925 * connects to a broker by querying the broker metadata.
3926 *
3927 * If a broker name resolves to multiple addresses (and possibly
3928 * address families) all will be used for connection attempts in
3929 * round-robin fashion.
3930 *
3931 * \p brokerlist is a ,-separated list of brokers in the format:
3932 * \c \<broker1\>,\<broker2\>,..
3933 * Where each broker is in either the host or URL based format:
3934 * \c \<host\>[:\<port\>]
3935 * \c \<proto\>://\<host\>[:port]
3936 * \c \<proto\> is either \c PLAINTEXT, \c SSL, \c SASL, \c SASL_PLAINTEXT
3937 * The two formats can be mixed but ultimately the value of the
3938 * `security.protocol` config property decides what brokers are allowed.
3939 *
3940 * Example:
3941 * brokerlist = "broker1:10000,broker2"
3942 * brokerlist = "SSL://broker3:9000,ssl://broker2"
3943 *
3944 * @returns the number of brokers successfully added.
3945 *
3946 * @remark Brokers may also be defined with the \c metadata.broker.list or
3947 * \c bootstrap.servers configuration property (preferred method).
3948 */
3949RD_EXPORT
3950int rd_kafka_brokers_add(rd_kafka_t *rk, const char *brokerlist);
3951
3952
3953
3954
3955/**
3956 * @brief Set logger function.
3957 *
3958 * The default is to print to stderr, but a syslog logger is also available,
3959 * see rd_kafka_log_(print|syslog) for the builtin alternatives.
3960 * Alternatively the application may provide its own logger callback.
3961 * Or pass 'func' as NULL to disable logging.
3962 *
3963 * @deprecated Use rd_kafka_conf_set_log_cb()
3964 *
3965 * @remark \p rk may be passed as NULL in the callback.
3966 */
3967RD_EXPORT RD_DEPRECATED
3968void rd_kafka_set_logger(rd_kafka_t *rk,
3969 void (*func) (const rd_kafka_t *rk, int level,
3970 const char *fac, const char *buf));
3971
3972
3973/**
3974 * @brief Specifies the maximum logging level emitted by
3975 * internal kafka logging and debugging.
3976 *
3977 * @deprecated Set the \c "log_level" configuration property instead.
3978 *
3979 * @remark If the \p \"debug\" configuration property is set the log level is
3980 * automatically adjusted to \c LOG_DEBUG (7).
3981 */
3982RD_EXPORT
3983void rd_kafka_set_log_level(rd_kafka_t *rk, int level);
3984
3985
3986/**
3987 * @brief Builtin (default) log sink: print to stderr
3988 */
3989RD_EXPORT
3990void rd_kafka_log_print(const rd_kafka_t *rk, int level,
3991 const char *fac, const char *buf);
3992
3993
3994/**
3995 * @brief Builtin log sink: print to syslog.
3996 */
3997RD_EXPORT
3998void rd_kafka_log_syslog(const rd_kafka_t *rk, int level,
3999 const char *fac, const char *buf);
4000
4001
4002/**
4003 * @brief Returns the current out queue length.
4004 *
4005 * The out queue length is the sum of:
4006 * - number of messages waiting to be sent to, or acknowledged by,
4007 * the broker.
4008 * - number of delivery reports (e.g., dr_msg_cb) waiting to be served
4009 * by rd_kafka_poll() or rd_kafka_flush().
4010 * - number of callbacks (e.g., error_cb, stats_cb, etc) waiting to be
4011 * served by rd_kafka_poll(), rd_kafka_consumer_poll() or rd_kafka_flush().
4012 * - number of events waiting to be served by background_event_cb() in
4013 * the background queue (see rd_kafka_conf_set_background_event_cb).
4014 *
4015 * An application should wait for the return value of this function to reach
4016 * zero before terminating to make sure outstanding messages,
4017 * requests (such as offset commits), callbacks and events are fully processed.
4018 * See rd_kafka_flush().
4019 *
4020 * @returns number of messages and events waiting in queues.
4021 *
4022 * @sa rd_kafka_flush()
4023 */
4024RD_EXPORT
4025int rd_kafka_outq_len(rd_kafka_t *rk);
4026
4027
4028
4029/**
4030 * @brief Dumps rdkafka's internal state for handle \p rk to stream \p fp
4031 *
4032 * This is only useful for debugging rdkafka, showing state and statistics
4033 * for brokers, topics, partitions, etc.
4034 */
4035RD_EXPORT
4036void rd_kafka_dump(FILE *fp, rd_kafka_t *rk);
4037
4038
4039
4040/**
4041 * @brief Retrieve the current number of threads in use by librdkafka.
4042 *
4043 * Used by regression tests.
4044 */
4045RD_EXPORT
4046int rd_kafka_thread_cnt(void);
4047
4048
4049/**
4050 * @brief Wait for all rd_kafka_t objects to be destroyed.
4051 *
4052 * Returns 0 if all kafka objects are now destroyed, or -1 if the
4053 * timeout was reached.
4054 *
4055 * @remark This function is deprecated.
4056 */
4057RD_EXPORT
4058int rd_kafka_wait_destroyed(int timeout_ms);
4059
4060
4061/**
4062 * @brief Run librdkafka's built-in unit-tests.
4063 *
4064 * @returns the number of failures, or 0 if all tests passed.
4065 */
4066RD_EXPORT
4067int rd_kafka_unittest (void);
4068
4069
4070/**@}*/
4071
4072
4073
4074
4075/**
4076 * @name Experimental APIs
4077 * @{
4078 */
4079
4080/**
4081 * @brief Redirect the main (rd_kafka_poll()) queue to the KafkaConsumer's
4082 * queue (rd_kafka_consumer_poll()).
4083 *
4084 * @warning It is not permitted to call rd_kafka_poll() after directing the
4085 * main queue with rd_kafka_poll_set_consumer().
4086 */
4087RD_EXPORT
4088rd_kafka_resp_err_t rd_kafka_poll_set_consumer (rd_kafka_t *rk);
4089
4090
4091/**@}*/
4092
4093/**
4094 * @name Event interface
4095 *
4096 * @brief The event API provides an alternative pollable non-callback interface
4097 * to librdkafka's message and event queues.
4098 *
4099 * @{
4100 */
4101
4102
/**
 * @brief Event types
 *
 * @remark The low-valued event types (up to 0x100) are bit-flags while the
 *         Admin API result event types use plain sequential values (100..104).
 */
typedef int rd_kafka_event_type_t;
#define RD_KAFKA_EVENT_NONE 0x0           /**< Unset / no event */
#define RD_KAFKA_EVENT_DR 0x1             /**< Producer Delivery report batch */
#define RD_KAFKA_EVENT_FETCH 0x2          /**< Fetched message (consumer) */
#define RD_KAFKA_EVENT_LOG 0x4            /**< Log message */
#define RD_KAFKA_EVENT_ERROR 0x8          /**< Error */
#define RD_KAFKA_EVENT_REBALANCE 0x10     /**< Group rebalance (consumer) */
#define RD_KAFKA_EVENT_OFFSET_COMMIT 0x20 /**< Offset commit result */
#define RD_KAFKA_EVENT_STATS 0x40         /**< Stats */
#define RD_KAFKA_EVENT_CREATETOPICS_RESULT 100 /**< CreateTopics_result_t */
#define RD_KAFKA_EVENT_DELETETOPICS_RESULT 101 /**< DeleteTopics_result_t */
#define RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT 102 /**< CreatePartitions_result_t */
#define RD_KAFKA_EVENT_ALTERCONFIGS_RESULT 103 /**< AlterConfigs_result_t */
#define RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT 104 /**< DescribeConfigs_result_t */
#define RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH 0x100 /**< SASL/OAUTHBEARER
                                                        *   token needs to be
                                                        *   refreshed */
4123
4124
4125/**
4126 * @returns the event type for the given event.
4127 *
4128 * @remark As a convenience it is okay to pass \p rkev as NULL in which case
4129 * RD_KAFKA_EVENT_NONE is returned.
4130 */
4131RD_EXPORT
4132rd_kafka_event_type_t rd_kafka_event_type (const rd_kafka_event_t *rkev);
4133
4134/**
4135 * @returns the event type's name for the given event.
4136 *
4137 * @remark As a convenience it is okay to pass \p rkev as NULL in which case
4138 * the name for RD_KAFKA_EVENT_NONE is returned.
4139 */
4140RD_EXPORT
4141const char *rd_kafka_event_name (const rd_kafka_event_t *rkev);
4142
4143
4144/**
4145 * @brief Destroy an event.
4146 *
4147 * @remark Any references to this event, such as extracted messages,
4148 * will not be usable after this call.
4149 *
4150 * @remark As a convenience it is okay to pass \p rkev as NULL in which case
4151 * no action is performed.
4152 */
4153RD_EXPORT
4154void rd_kafka_event_destroy (rd_kafka_event_t *rkev);
4155
4156
4157/**
4158 * @returns the next message from an event.
4159 *
4160 * Call repeatedly until it returns NULL.
4161 *
4162 * Event types:
4163 * - RD_KAFKA_EVENT_FETCH (1 message)
4164 * - RD_KAFKA_EVENT_DR (>=1 message(s))
4165 *
4166 * @remark The returned message(s) MUST NOT be
4167 * freed with rd_kafka_message_destroy().
4168 *
4169 * @remark on_consume() interceptor may be called
4170 * from this function prior to passing message to application.
4171 */
4172RD_EXPORT
4173const rd_kafka_message_t *rd_kafka_event_message_next (rd_kafka_event_t *rkev);
4174
4175
4176/**
 * @brief Extracts \p size message(s) from the event into the
4178 * pre-allocated array \p rkmessages.
4179 *
4180 * Event types:
4181 * - RD_KAFKA_EVENT_FETCH (1 message)
4182 * - RD_KAFKA_EVENT_DR (>=1 message(s))
4183 *
4184 * @returns the number of messages extracted.
4185 *
4186 * @remark on_consume() interceptor may be called
4187 * from this function prior to passing message to application.
4188 */
4189RD_EXPORT
4190size_t rd_kafka_event_message_array (rd_kafka_event_t *rkev,
4191 const rd_kafka_message_t **rkmessages,
4192 size_t size);
4193
4194
4195/**
4196 * @returns the number of remaining messages in the event.
4197 *
4198 * Event types:
4199 * - RD_KAFKA_EVENT_FETCH (1 message)
4200 * - RD_KAFKA_EVENT_DR (>=1 message(s))
4201 */
4202RD_EXPORT
4203size_t rd_kafka_event_message_count (rd_kafka_event_t *rkev);
4204
4205
4206/**
4207 * @returns the associated configuration string for the event, or NULL
4208 * if the configuration property is not set or if
4209 * not applicable for the given event type.
4210 *
4211 * The returned memory is read-only and its lifetime is the same as the
4212 * event object.
4213 *
4214 * Event types:
4215 * - RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH: value of sasl.oauthbearer.config
4216 */
4217RD_EXPORT
4218const char *rd_kafka_event_config_string (rd_kafka_event_t *rkev);
4219
4220
4221/**
4222 * @returns the error code for the event.
4223 *
4224 * Use rd_kafka_event_error_is_fatal() to detect if this is a fatal error.
4225 *
4226 * Event types:
4227 * - all
4228 */
4229RD_EXPORT
4230rd_kafka_resp_err_t rd_kafka_event_error (rd_kafka_event_t *rkev);
4231
4232
4233/**
4234 * @returns the error string (if any).
4235 * An application should check that rd_kafka_event_error() returns
4236 * non-zero before calling this function.
4237 *
4238 * Event types:
4239 * - all
4240 */
4241RD_EXPORT
4242const char *rd_kafka_event_error_string (rd_kafka_event_t *rkev);
4243
4244
4245/**
4246 * @returns 1 if the error is a fatal error, else 0.
4247 *
4248 * Event types:
4249 * - RD_KAFKA_EVENT_ERROR
4250 *
4251 * @sa rd_kafka_fatal_error()
4252 */
4253RD_EXPORT
4254int rd_kafka_event_error_is_fatal (rd_kafka_event_t *rkev);
4255
4256
4257/**
4258 * @returns the user opaque (if any)
4259 *
4260 * Event types:
4261 * - RD_KAFKA_EVENT_OFFSET_COMMIT
4262 * - RD_KAFKA_EVENT_CREATETOPICS_RESULT
4263 * - RD_KAFKA_EVENT_DELETETOPICS_RESULT
4264 * - RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT
4265 * - RD_KAFKA_EVENT_ALTERCONFIGS_RESULT
4266 * - RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT
4267 */
4268RD_EXPORT
4269void *rd_kafka_event_opaque (rd_kafka_event_t *rkev);
4270
4271
4272/**
4273 * @brief Extract log message from the event.
4274 *
4275 * Event types:
4276 * - RD_KAFKA_EVENT_LOG
4277 *
4278 * @returns 0 on success or -1 if unsupported event type.
4279 */
4280RD_EXPORT
4281int rd_kafka_event_log (rd_kafka_event_t *rkev,
4282 const char **fac, const char **str, int *level);
4283
4284
4285/**
4286 * @brief Extract stats from the event.
4287 *
4288 * Event types:
4289 * - RD_KAFKA_EVENT_STATS
4290 *
4291 * @returns stats json string.
4292 *
4293 * @remark the returned string will be freed automatically along with the event object
4294 *
4295 */
4296RD_EXPORT
4297const char *rd_kafka_event_stats (rd_kafka_event_t *rkev);
4298
4299
4300/**
4301 * @returns the topic partition list from the event.
4302 *
4303 * @remark The list MUST NOT be freed with rd_kafka_topic_partition_list_destroy()
4304 *
4305 * Event types:
4306 * - RD_KAFKA_EVENT_REBALANCE
4307 * - RD_KAFKA_EVENT_OFFSET_COMMIT
4308 */
4309RD_EXPORT rd_kafka_topic_partition_list_t *
4310rd_kafka_event_topic_partition_list (rd_kafka_event_t *rkev);
4311
4312
4313/**
4314 * @returns a newly allocated topic_partition container, if applicable for the event type,
4315 * else NULL.
4316 *
4317 * @remark The returned pointer MUST be freed with rd_kafka_topic_partition_destroy().
4318 *
4319 * Event types:
4320 * RD_KAFKA_EVENT_ERROR (for partition level errors)
4321 */
4322RD_EXPORT rd_kafka_topic_partition_t *
4323rd_kafka_event_topic_partition (rd_kafka_event_t *rkev);
4324
4325
4326
/*! CreateTopics result, carried by RD_KAFKA_EVENT_CREATETOPICS_RESULT events. */
typedef rd_kafka_event_t rd_kafka_CreateTopics_result_t;
/*! DeleteTopics result, carried by RD_KAFKA_EVENT_DELETETOPICS_RESULT events. */
typedef rd_kafka_event_t rd_kafka_DeleteTopics_result_t;
/*! CreatePartitions result, carried by RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT events. */
typedef rd_kafka_event_t rd_kafka_CreatePartitions_result_t;
/*! AlterConfigs result, carried by RD_KAFKA_EVENT_ALTERCONFIGS_RESULT events. */
typedef rd_kafka_event_t rd_kafka_AlterConfigs_result_t;
/*! DescribeConfigs result, carried by RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT events. */
typedef rd_kafka_event_t rd_kafka_DescribeConfigs_result_t;
4332
4333/**
4334 * @returns the result of a CreateTopics request, or NULL if event is of
4335 * different type.
4336 *
4337 * Event types:
4338 * RD_KAFKA_EVENT_CREATETOPICS_RESULT
4339 */
4340RD_EXPORT const rd_kafka_CreateTopics_result_t *
4341rd_kafka_event_CreateTopics_result (rd_kafka_event_t *rkev);
4342
4343/**
4344 * @returns the result of a DeleteTopics request, or NULL if event is of
4345 * different type.
4346 *
4347 * Event types:
4348 * RD_KAFKA_EVENT_DELETETOPICS_RESULT
4349 */
4350RD_EXPORT const rd_kafka_DeleteTopics_result_t *
4351rd_kafka_event_DeleteTopics_result (rd_kafka_event_t *rkev);
4352
4353/**
4354 * @returns the result of a CreatePartitions request, or NULL if event is of
4355 * different type.
4356 *
4357 * Event types:
4358 * RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT
4359 */
4360RD_EXPORT const rd_kafka_CreatePartitions_result_t *
4361rd_kafka_event_CreatePartitions_result (rd_kafka_event_t *rkev);
4362
4363/**
4364 * @returns the result of a AlterConfigs request, or NULL if event is of
4365 * different type.
4366 *
4367 * Event types:
4368 * RD_KAFKA_EVENT_ALTERCONFIGS_RESULT
4369 */
4370RD_EXPORT const rd_kafka_AlterConfigs_result_t *
4371rd_kafka_event_AlterConfigs_result (rd_kafka_event_t *rkev);
4372
4373/**
4374 * @returns the result of a DescribeConfigs request, or NULL if event is of
4375 * different type.
4376 *
4377 * Event types:
4378 * RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT
4379 */
4380RD_EXPORT const rd_kafka_DescribeConfigs_result_t *
4381rd_kafka_event_DescribeConfigs_result (rd_kafka_event_t *rkev);
4382
4383
4384
4385
4386/**
4387 * @brief Poll a queue for an event for max \p timeout_ms.
4388 *
4389 * @returns an event, or NULL.
4390 *
4391 * @remark Use rd_kafka_event_destroy() to free the event.
4392 *
4393 * @sa rd_kafka_conf_set_background_event_cb()
4394 */
4395RD_EXPORT
4396rd_kafka_event_t *rd_kafka_queue_poll (rd_kafka_queue_t *rkqu, int timeout_ms);
4397
4398/**
4399* @brief Poll a queue for events served through callbacks for max \p timeout_ms.
4400*
4401* @returns the number of events served.
4402*
4403* @remark This API must only be used for queues with callbacks registered
4404* for all expected event types. E.g., not a message queue.
4405*
4406* @remark Also see rd_kafka_conf_set_background_event_cb() for triggering
4407* event callbacks from a librdkafka-managed background thread.
4408*
4409* @sa rd_kafka_conf_set_background_event_cb()
4410*/
4411RD_EXPORT
4412int rd_kafka_queue_poll_callback (rd_kafka_queue_t *rkqu, int timeout_ms);
4413
4414
4415/**@}*/
4416
4417
4418/**
4419 * @name Plugin interface
4420 *
4421 * @brief A plugin interface that allows external runtime-loaded libraries
4422 * to integrate with a client instance without modifications to
4423 * the application code.
4424 *
4425 * Plugins are loaded when referenced through the `plugin.library.paths`
4426 * configuration property and operates on the \c rd_kafka_conf_t
4427 * object prior \c rd_kafka_t instance creation.
4428 *
4429 * @warning Plugins require the application to link librdkafka dynamically
4430 * and not statically. Failure to do so will lead to missing symbols
4431 * or finding symbols in another librdkafka library than the
4432 * application was linked with.
4433 */
4434
4435
/**
 * @brief Plugin's configuration initializer method called each time the
 *        library is referenced from configuration (even if previously loaded by
 *        another client instance).
 *
 * @remark This method MUST be implemented by plugins and have the symbol name
 *         \c conf_init
 *
 * @param conf Configuration set up to this point.
 * @param plug_opaquep Plugin can set this pointer to a per-configuration
 *                     opaque pointer.
 * @param errstr String buffer of size \p errstr_size where plugin must write
 *               a human readable error string in the case the initializer
 *               fails (returns non-zero).
 * @param errstr_size Maximum space (including \0) in \p errstr.
 *
 * @remark A plugin may add an on_conf_destroy() interceptor to clean up
 *         plugin-specific resources created in the plugin's conf_init() method.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or an error code on error.
 */
typedef rd_kafka_resp_err_t
(rd_kafka_plugin_f_conf_init_t) (rd_kafka_conf_t *conf,
                                 void **plug_opaquep,
                                 char *errstr, size_t errstr_size);
4460
4461/**@}*/
4462
4463
4464
4465/**
4466 * @name Interceptors
4467 *
4468 * @{
4469 *
4470 * @brief A callback interface that allows message interception for both
4471 * producer and consumer data pipelines.
4472 *
4473 * Except for the on_new(), on_conf_set(), on_conf_dup() and on_conf_destroy()
4474 * interceptors, interceptors are added to the
4475 * newly created rd_kafka_t client instance. These interceptors MUST only
4476 * be added from on_new() and MUST NOT be added after rd_kafka_new() returns.
4477 *
4478 * The on_new(), on_conf_set(), on_conf_dup() and on_conf_destroy() interceptors
4479 * are added to the configuration object which is later passed to
4480 * rd_kafka_new() where on_new() is called to allow addition of
4481 * other interceptors.
4482 *
4483 * Each interceptor reference consists of a display name (ic_name),
4484 * a callback function, and an application-specified opaque value that is
4485 * passed as-is to the callback.
4486 * The ic_name must be unique for the interceptor implementation and is used
4487 * to reject duplicate interceptor methods.
4488 *
4489 * Any number of interceptors can be added and they are called in the order
4490 * they were added, unless otherwise noted.
4491 * The list of registered interceptor methods are referred to as
4492 * interceptor chains.
4493 *
4494 * @remark Contrary to the Java client the librdkafka interceptor interface
4495 * does not support message key and value modification.
4496 * Message mutability is discouraged in the Java client and the
4497 * combination of serializers and headers cover most use-cases.
4498 *
4499 * @remark Interceptors are NOT copied to the new configuration on
4500 * rd_kafka_conf_dup() since it would be hard for interceptors to
4501 * track usage of the interceptor's opaque value.
4502 * An interceptor should rely on the plugin, which will be copied
4503 * in rd_kafka_conf_conf_dup(), to set up the initial interceptors.
4504 * An interceptor should implement the on_conf_dup() method
4505 * to manually set up its internal configuration on the newly created
4506 * configuration object that is being copied-to based on the
4507 * interceptor-specific configuration properties.
4508 * conf_dup() should thus be treated the same as conf_init().
4509 *
4510 * @remark Interceptors are keyed by the interceptor type (on_..()), the
4511 * interceptor name (ic_name) and the interceptor method function.
4512 * Duplicates are not allowed and the .._add_on_..() method will
4513 * return RD_KAFKA_RESP_ERR__CONFLICT if attempting to add a duplicate
4514 * method.
4515 * The only exception is on_conf_destroy() which may be added multiple
4516 * times by the same interceptor to allow proper cleanup of
4517 * interceptor configuration state.
4518 */
4519
4520
/**
 * @brief on_conf_set() is called from rd_kafka_*_conf_set() in the order
 *        the interceptors were added.
 *
 * @param conf Configuration object the property is being set on.
 * @param ic_opaque The interceptor's opaque pointer specified in ..add..().
 * @param name The configuration property to set.
 * @param val The configuration value to set, or NULL for reverting to default
 *            in which case the previous value should be freed.
 * @param errstr A human readable error string in case the interceptor fails.
 * @param errstr_size Maximum space (including \0) in \p errstr.
 *
 * @returns RD_KAFKA_CONF_RES_OK if the property was known and successfully
 *          handled by the interceptor, RD_KAFKA_CONF_RES_INVALID if the
 *          property was handled by the interceptor but the value was invalid,
 *          or RD_KAFKA_CONF_RES_UNKNOWN if the interceptor did not handle
 *          this property, in which case the property is passed on to the
 *          next interceptor in the chain, finally ending up at the built-in
 *          configuration handler.
 */
typedef rd_kafka_conf_res_t
(rd_kafka_interceptor_f_on_conf_set_t) (rd_kafka_conf_t *conf,
                                        const char *name, const char *val,
                                        char *errstr, size_t errstr_size,
                                        void *ic_opaque);
4545
4546
/**
 * @brief on_conf_dup() is called from rd_kafka_conf_dup() in the
 *        order the interceptors were added and is used to let
 *        an interceptor re-register its conf interceptors with a new
 *        opaque value.
 *        The on_conf_dup() method is called prior to the configuration from
 *        \p old_conf being copied to \p new_conf.
 *
 * @param new_conf Newly created configuration object the interceptor may
 *                 register its conf interceptors on.
 * @param old_conf The original configuration object being duplicated.
 * @param filter_cnt Number of property names in \p filter.
 * @param filter Property names. NOTE(review): presumably properties to be
 *               filtered out of the copy — confirm against the
 *               rd_kafka_conf_dup() implementation.
 * @param ic_opaque The interceptor's opaque pointer specified in ..add..().
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or an error code
 *          on failure (which is logged but otherwise ignored).
 *
 * @remark No on_conf_* interceptors are copied to the new configuration
 *         object on rd_kafka_conf_dup().
 */
typedef rd_kafka_resp_err_t
(rd_kafka_interceptor_f_on_conf_dup_t) (rd_kafka_conf_t *new_conf,
                                        const rd_kafka_conf_t *old_conf,
                                        size_t filter_cnt,
                                        const char **filter,
                                        void *ic_opaque);
4569
4570
4571/**
4572 * @brief on_conf_destroy() is called from rd_kafka_*_conf_destroy() in the
4573 * order the interceptors were added.
4574 *
4575 * @param ic_opaque The interceptor's opaque pointer specified in ..add..().
4576 */
4577typedef rd_kafka_resp_err_t
4578(rd_kafka_interceptor_f_on_conf_destroy_t) (void *ic_opaque);
4579
4580
4581/**
 * @brief on_new() is called from rd_kafka_new() prior to returning
4583 * the newly created client instance to the application.
4584 *
4585 * @param rk The client instance.
4586 * @param conf The client instance's final configuration.
4587 * @param ic_opaque The interceptor's opaque pointer specified in ..add..().
4588 * @param errstr A human readable error string in case the interceptor fails.
4589 * @param errstr_size Maximum space (including \0) in \p errstr.
4590 *
4591 * @returns an error code on failure, the error is logged but otherwise ignored.
4592 *
4593 * @warning The \p rk client instance will not be fully set up when this
4594 * interceptor is called and the interceptor MUST NOT call any
4595 * other rk-specific APIs than rd_kafka_interceptor_add..().
4596 *
4597 */
4598typedef rd_kafka_resp_err_t
4599(rd_kafka_interceptor_f_on_new_t) (rd_kafka_t *rk, const rd_kafka_conf_t *conf,
4600 void *ic_opaque,
4601 char *errstr, size_t errstr_size);
4602
4603
4604/**
 * @brief on_destroy() is called from rd_kafka_destroy() or rd_kafka_new()
 *        (if rd_kafka_new() fails during initialization).
4607 *
4608 * @param rk The client instance.
4609 * @param ic_opaque The interceptor's opaque pointer specified in ..add..().
4610 */
4611typedef rd_kafka_resp_err_t
4612(rd_kafka_interceptor_f_on_destroy_t) (rd_kafka_t *rk, void *ic_opaque);
4613
4614
4615
4616
4617/**
4618 * @brief on_send() is called from rd_kafka_produce*() (et.al) prior to
4619 * the partitioner being called.
4620 *
4621 * @param rk The client instance.
4622 * @param rkmessage The message being produced. Immutable.
4623 * @param ic_opaque The interceptor's opaque pointer specified in ..add..().
4624 *
4625 * @remark This interceptor is only used by producer instances.
4626 *
4627 * @remark The \p rkmessage object is NOT mutable and MUST NOT be modified
4628 * by the interceptor.
4629 *
4630 * @remark If the partitioner fails or an unknown partition was specified,
4631 * the on_acknowledgement() interceptor chain will be called from
4632 * within the rd_kafka_produce*() call to maintain send-acknowledgement
4633 * symmetry.
4634 *
4635 * @returns an error code on failure, the error is logged but otherwise ignored.
4636 */
4637typedef rd_kafka_resp_err_t
4638(rd_kafka_interceptor_f_on_send_t) (rd_kafka_t *rk,
4639 rd_kafka_message_t *rkmessage,
4640 void *ic_opaque);
4641
4642/**
4643 * @brief on_acknowledgement() is called to inform interceptors that a message
 *        was successfully delivered or permanently failed delivery.
4645 * The interceptor chain is called from internal librdkafka background
4646 * threads, or rd_kafka_produce*() if the partitioner failed.
4647 *
4648 * @param rk The client instance.
4649 * @param rkmessage The message being produced. Immutable.
4650 * @param ic_opaque The interceptor's opaque pointer specified in ..add..().
4651 *
4652 * @remark This interceptor is only used by producer instances.
4653 *
4654 * @remark The \p rkmessage object is NOT mutable and MUST NOT be modified
4655 * by the interceptor.
4656 *
4657 * @warning The on_acknowledgement() method may be called from internal
4658 * librdkafka threads. An on_acknowledgement() interceptor MUST NOT
4659 * call any librdkafka API's associated with the \p rk, or perform
4660 * any blocking or prolonged work.
4661 *
4662 * @returns an error code on failure, the error is logged but otherwise ignored.
4663 */
4664typedef rd_kafka_resp_err_t
4665(rd_kafka_interceptor_f_on_acknowledgement_t) (rd_kafka_t *rk,
4666 rd_kafka_message_t *rkmessage,
4667 void *ic_opaque);
4668
4669
4670/**
4671 * @brief on_consume() is called just prior to passing the message to the
4672 * application in rd_kafka_consumer_poll(), rd_kafka_consume*(),
4673 * the event interface, etc.
4674 *
4675 * @param rk The client instance.
4676 * @param rkmessage The message being consumed. Immutable.
4677 * @param ic_opaque The interceptor's opaque pointer specified in ..add..().
4678 *
4679 * @remark This interceptor is only used by consumer instances.
4680 *
4681 * @remark The \p rkmessage object is NOT mutable and MUST NOT be modified
4682 * by the interceptor.
4683 *
4684 * @returns an error code on failure, the error is logged but otherwise ignored.
4685 */
4686typedef rd_kafka_resp_err_t
4687(rd_kafka_interceptor_f_on_consume_t) (rd_kafka_t *rk,
4688 rd_kafka_message_t *rkmessage,
4689 void *ic_opaque);
4690
4691/**
4692 * @brief on_commit() is called on completed or failed offset commit.
4693 * It is called from internal librdkafka threads.
4694 *
4695 * @param rk The client instance.
4696 * @param offsets List of topic+partition+offset+error that were committed.
4697 * The error message of each partition should be checked for
4698 * error.
4699 * @param ic_opaque The interceptor's opaque pointer specified in ..add..().
4700 *
4701 * @remark This interceptor is only used by consumer instances.
4702 *
4703 * @warning The on_commit() interceptor is called from internal
4704 * librdkafka threads. An on_commit() interceptor MUST NOT
4705 * call any librdkafka API's associated with the \p rk, or perform
4706 * any blocking or prolonged work.
4707 *
4708 *
4709 * @returns an error code on failure, the error is logged but otherwise ignored.
4710 */
4711typedef rd_kafka_resp_err_t
4712(rd_kafka_interceptor_f_on_commit_t) (
4713 rd_kafka_t *rk,
4714 const rd_kafka_topic_partition_list_t *offsets,
4715 rd_kafka_resp_err_t err, void *ic_opaque);
4716
4717
4718/**
4719 * @brief on_request_sent() is called when a request has been fully written
4720 * to a broker TCP connections socket.
4721 *
4722 * @param rk The client instance.
4723 * @param sockfd Socket file descriptor.
4724 * @param brokername Broker request is being sent to.
4725 * @param brokerid Broker request is being sent to.
4726 * @param ApiKey Kafka protocol request type.
4727 * @param ApiVersion Kafka protocol request type version.
4728 * @param Corrid Kafka protocol request correlation id.
4729 * @param size Size of request.
4730 * @param ic_opaque The interceptor's opaque pointer specified in ..add..().
4731 *
4732 * @warning The on_request_sent() interceptor is called from internal
4733 * librdkafka broker threads. An on_request_sent() interceptor MUST NOT
4734 * call any librdkafka API's associated with the \p rk, or perform
4735 * any blocking or prolonged work.
4736 *
4737 * @returns an error code on failure, the error is logged but otherwise ignored.
4738 */
4739typedef rd_kafka_resp_err_t
4740(rd_kafka_interceptor_f_on_request_sent_t) (
4741 rd_kafka_t *rk,
4742 int sockfd,
4743 const char *brokername,
4744 int32_t brokerid,
4745 int16_t ApiKey,
4746 int16_t ApiVersion,
4747 int32_t CorrId,
4748 size_t size,
4749 void *ic_opaque);
4750
4751
4752
4753/**
4754 * @brief Append an on_conf_set() interceptor.
4755 *
4756 * @param conf Configuration object.
4757 * @param ic_name Interceptor name, used in logging.
4758 * @param on_conf_set Function pointer.
4759 * @param ic_opaque Opaque value that will be passed to the function.
4760 *
4761 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or RD_KAFKA_RESP_ERR__CONFLICT
 *          if an existing interceptor with the same \p ic_name and function
4763 * has already been added to \p conf.
4764 */
4765RD_EXPORT rd_kafka_resp_err_t
4766rd_kafka_conf_interceptor_add_on_conf_set (
4767 rd_kafka_conf_t *conf, const char *ic_name,
4768 rd_kafka_interceptor_f_on_conf_set_t *on_conf_set,
4769 void *ic_opaque);
4770
4771
4772/**
4773 * @brief Append an on_conf_dup() interceptor.
4774 *
4775 * @param conf Configuration object.
4776 * @param ic_name Interceptor name, used in logging.
4777 * @param on_conf_dup Function pointer.
4778 * @param ic_opaque Opaque value that will be passed to the function.
4779 *
4780 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or RD_KAFKA_RESP_ERR__CONFLICT
 *          if an existing interceptor with the same \p ic_name and function
4782 * has already been added to \p conf.
4783 */
4784RD_EXPORT rd_kafka_resp_err_t
4785rd_kafka_conf_interceptor_add_on_conf_dup (
4786 rd_kafka_conf_t *conf, const char *ic_name,
4787 rd_kafka_interceptor_f_on_conf_dup_t *on_conf_dup,
4788 void *ic_opaque);
4789
4790/**
4791 * @brief Append an on_conf_destroy() interceptor.
4792 *
4793 * @param conf Configuration object.
4794 * @param ic_name Interceptor name, used in logging.
4795 * @param on_conf_destroy Function pointer.
4796 * @param ic_opaque Opaque value that will be passed to the function.
4797 *
4798 * @returns RD_KAFKA_RESP_ERR_NO_ERROR
4799 *
4800 * @remark Multiple on_conf_destroy() interceptors are allowed to be added
4801 * to the same configuration object.
4802 */
4803RD_EXPORT rd_kafka_resp_err_t
4804rd_kafka_conf_interceptor_add_on_conf_destroy (
4805 rd_kafka_conf_t *conf, const char *ic_name,
4806 rd_kafka_interceptor_f_on_conf_destroy_t *on_conf_destroy,
4807 void *ic_opaque);
4808
4809
4810/**
4811 * @brief Append an on_new() interceptor.
4812 *
4813 * @param conf Configuration object.
4814 * @param ic_name Interceptor name, used in logging.
 * @param on_new Function pointer.
4816 * @param ic_opaque Opaque value that will be passed to the function.
4817 *
4818 * @remark Since the on_new() interceptor is added to the configuration object
4819 * it may be copied by rd_kafka_conf_dup().
4820 * An interceptor implementation must thus be able to handle
4821 * the same interceptor,ic_opaque tuple to be used by multiple
4822 * client instances.
4823 *
4824 * @remark An interceptor plugin should check the return value to make sure it
4825 * has not already been added.
4826 *
4827 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or RD_KAFKA_RESP_ERR__CONFLICT
 *          if an existing interceptor with the same \p ic_name and function
4829 * has already been added to \p conf.
4830 */
4831RD_EXPORT rd_kafka_resp_err_t
4832rd_kafka_conf_interceptor_add_on_new (
4833 rd_kafka_conf_t *conf, const char *ic_name,
4834 rd_kafka_interceptor_f_on_new_t *on_new,
4835 void *ic_opaque);
4836
4837
4838
4839/**
4840 * @brief Append an on_destroy() interceptor.
4841 *
4842 * @param rk Client instance.
4843 * @param ic_name Interceptor name, used in logging.
4844 * @param on_destroy Function pointer.
4845 * @param ic_opaque Opaque value that will be passed to the function.
4846 *
4847 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or RD_KAFKA_RESP_ERR__CONFLICT
 *          if an existing interceptor with the same \p ic_name and function
 *          has already been added to \p rk.
4850 */
4851RD_EXPORT rd_kafka_resp_err_t
4852rd_kafka_interceptor_add_on_destroy (
4853 rd_kafka_t *rk, const char *ic_name,
4854 rd_kafka_interceptor_f_on_destroy_t *on_destroy,
4855 void *ic_opaque);
4856
4857
4858/**
4859 * @brief Append an on_send() interceptor.
4860 *
4861 * @param rk Client instance.
4862 * @param ic_name Interceptor name, used in logging.
4863 * @param on_send Function pointer.
4864 * @param ic_opaque Opaque value that will be passed to the function.
4865 *
4866 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or RD_KAFKA_RESP_ERR__CONFLICT
 *          if an existing interceptor with the same \p ic_name and function
 *          has already been added to \p rk.
4869 */
4870RD_EXPORT rd_kafka_resp_err_t
4871rd_kafka_interceptor_add_on_send (
4872 rd_kafka_t *rk, const char *ic_name,
4873 rd_kafka_interceptor_f_on_send_t *on_send,
4874 void *ic_opaque);
4875
4876/**
4877 * @brief Append an on_acknowledgement() interceptor.
4878 *
4879 * @param rk Client instance.
4880 * @param ic_name Interceptor name, used in logging.
4881 * @param on_acknowledgement Function pointer.
4882 * @param ic_opaque Opaque value that will be passed to the function.
4883 *
4884 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or RD_KAFKA_RESP_ERR__CONFLICT
 *          if an existing interceptor with the same \p ic_name and function
 *          has already been added to \p rk.
4887 */
4888RD_EXPORT rd_kafka_resp_err_t
4889rd_kafka_interceptor_add_on_acknowledgement (
4890 rd_kafka_t *rk, const char *ic_name,
4891 rd_kafka_interceptor_f_on_acknowledgement_t *on_acknowledgement,
4892 void *ic_opaque);
4893
4894
4895/**
4896 * @brief Append an on_consume() interceptor.
4897 *
4898 * @param rk Client instance.
4899 * @param ic_name Interceptor name, used in logging.
4900 * @param on_consume Function pointer.
4901 * @param ic_opaque Opaque value that will be passed to the function.
4902 *
4903 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or RD_KAFKA_RESP_ERR__CONFLICT
 *          if an existing interceptor with the same \p ic_name and function
 *          has already been added to \p rk.
4906 */
4907RD_EXPORT rd_kafka_resp_err_t
4908rd_kafka_interceptor_add_on_consume (
4909 rd_kafka_t *rk, const char *ic_name,
4910 rd_kafka_interceptor_f_on_consume_t *on_consume,
4911 void *ic_opaque);
4912
4913
4914/**
4915 * @brief Append an on_commit() interceptor.
4916 *
4917 * @param rk Client instance.
4918 * @param ic_name Interceptor name, used in logging.
 * @param on_commit Function pointer.
 * @param ic_opaque Opaque value that will be passed to the function.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or RD_KAFKA_RESP_ERR__CONFLICT
 *          if an existing interceptor with the same \p ic_name and function
 *          has already been added to \p rk.
4925 */
4926RD_EXPORT rd_kafka_resp_err_t
4927rd_kafka_interceptor_add_on_commit (
4928 rd_kafka_t *rk, const char *ic_name,
4929 rd_kafka_interceptor_f_on_commit_t *on_commit,
4930 void *ic_opaque);
4931
4932
4933/**
4934 * @brief Append an on_request_sent() interceptor.
4935 *
4936 * @param rk Client instance.
4937 * @param ic_name Interceptor name, used in logging.
 * @param on_request_sent Function pointer.
 * @param ic_opaque Opaque value that will be passed to the function.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or RD_KAFKA_RESP_ERR__CONFLICT
 *          if an existing interceptor with the same \p ic_name and function
 *          has already been added to \p rk.
4944 */
4945RD_EXPORT rd_kafka_resp_err_t
4946rd_kafka_interceptor_add_on_request_sent (
4947 rd_kafka_t *rk, const char *ic_name,
4948 rd_kafka_interceptor_f_on_request_sent_t *on_request_sent,
4949 void *ic_opaque);
4950
4951
4952
4953
4954/**@}*/
4955
4956
4957
4958/**
4959 * @name Auxiliary types
4960 *
4961 * @{
4962 */
4963
4964
4965
4966/**
4967 * @brief Topic result provides per-topic operation result information.
4968 *
4969 */
4970
4971/**
4972 * @returns the error code for the given topic result.
4973 */
4974RD_EXPORT rd_kafka_resp_err_t
4975rd_kafka_topic_result_error (const rd_kafka_topic_result_t *topicres);
4976
4977/**
4978 * @returns the human readable error string for the given topic result,
4979 * or NULL if there was no error.
4980 *
4981 * @remark lifetime of the returned string is the same as the \p topicres.
4982 */
4983RD_EXPORT const char *
4984rd_kafka_topic_result_error_string (const rd_kafka_topic_result_t *topicres);
4985
4986/**
4987 * @returns the name of the topic for the given topic result.
4988 * @remark lifetime of the returned string is the same as the \p topicres.
4989 *
4990 */
4991RD_EXPORT const char *
4992rd_kafka_topic_result_name (const rd_kafka_topic_result_t *topicres);
4993
4994
4995/**@}*/
4996
4997
4998/**
4999 * @name Admin API
5000 *
5001 * @{
5002 *
5003 * @brief The Admin API enables applications to perform administrative
5004 * Apache Kafka tasks, such as creating and deleting topics,
5005 * altering and reading broker configuration, etc.
5006 *
5007 * The Admin API is asynchronous and makes use of librdkafka's standard
5008 * \c rd_kafka_queue_t queues to propagate the result of an admin operation
5009 * back to the application.
5010 * The supplied queue may be any queue, such as a temporary single-call queue,
5011 * a shared queue used for multiple requests, or even the main queue or
5012 * consumer queues.
5013 *
5014 * Use \c rd_kafka_queue_poll() to collect the result of an admin operation
5015 * from the queue of your choice, then extract the admin API-specific result
5016 * type by using the corresponding \c rd_kafka_event_CreateTopics_result,
5017 * \c rd_kafka_event_DescribeConfigs_result, etc, methods.
5018 * Use the getter methods on the \c .._result_t type to extract response
5019 * information and finally destroy the result and event by calling
5020 * \c rd_kafka_event_destroy().
5021 *
5022 * Use rd_kafka_event_error() and rd_kafka_event_error_string() to acquire
5023 * the request-level error/success for an Admin API request.
5024 * Even if the returned value is \c RD_KAFKA_RESP_ERR_NO_ERROR there
5025 * may be individual objects (topics, resources, etc) that have failed.
5026 * Extract per-object error information with the corresponding
5027 * \c rd_kafka_..._result_topics|resources|..() to check per-object errors.
5028 *
5029 * Locally triggered errors:
5030 * - \c RD_KAFKA_RESP_ERR__TIMED_OUT - (Controller) broker connection did not
 *     become available in the time allowed by
 *     rd_kafka_AdminOptions_set_request_timeout().
5032 */
5033
5034
5035/**
5036 * @enum rd_kafka_admin_op_t
5037 *
5038 * @brief Admin operation enum name for use with rd_kafka_AdminOptions_new()
5039 *
5040 * @sa rd_kafka_AdminOptions_new()
5041 */
5042typedef enum rd_kafka_admin_op_t {
5043 RD_KAFKA_ADMIN_OP_ANY = 0, /**< Default value */
5044 RD_KAFKA_ADMIN_OP_CREATETOPICS, /**< CreateTopics */
5045 RD_KAFKA_ADMIN_OP_DELETETOPICS, /**< DeleteTopics */
5046 RD_KAFKA_ADMIN_OP_CREATEPARTITIONS, /**< CreatePartitions */
5047 RD_KAFKA_ADMIN_OP_ALTERCONFIGS, /**< AlterConfigs */
5048 RD_KAFKA_ADMIN_OP_DESCRIBECONFIGS, /**< DescribeConfigs */
5049 RD_KAFKA_ADMIN_OP__CNT /**< Number of ops defined */
5050} rd_kafka_admin_op_t;
5051
5052/**
5053 * @brief AdminOptions provides a generic mechanism for setting optional
5054 * parameters for the Admin API requests.
5055 *
5056 * @remark Since AdminOptions is decoupled from the actual request type
5057 * there is no enforcement to prevent setting unrelated properties,
5058 * e.g. setting validate_only on a DescribeConfigs request is allowed
5059 * but is silently ignored by DescribeConfigs.
5060 * Future versions may introduce such enforcement.
5061 */
5062
5063
5064typedef struct rd_kafka_AdminOptions_s rd_kafka_AdminOptions_t;
5065
5066/**
5067 * @brief Create a new AdminOptions object.
5068 *
5069 * The options object is not modified by the Admin API request APIs,
5070 * (e.g. CreateTopics) and may be reused for multiple calls.
5071 *
5072 * @param rk Client instance.
5073 * @param for_api Specifies what Admin API this AdminOptions object will be used
5074 * for, which will enforce what AdminOptions_set_..() calls may
5075 * be used based on the API, causing unsupported set..() calls
5076 * to fail.
5077 * Specifying RD_KAFKA_ADMIN_OP_ANY disables the enforcement
5078 * allowing any option to be set, even if the option
5079 * is not used in a future call to an Admin API method.
5080 *
5081 * @returns a new AdminOptions object (which must be freed with
5082 * rd_kafka_AdminOptions_destroy()), or NULL if \p for_api was set to
5083 * an unknown API op type.
5084 */
5085RD_EXPORT rd_kafka_AdminOptions_t *
5086rd_kafka_AdminOptions_new (rd_kafka_t *rk, rd_kafka_admin_op_t for_api);
5087
5088
5089/**
 * @brief Destroy an AdminOptions object.
5091 */
5092RD_EXPORT void rd_kafka_AdminOptions_destroy (rd_kafka_AdminOptions_t *options);
5093
5094
5095/**
5096 * @brief Sets the overall request timeout, including broker lookup,
5097 * request transmission, operation time on broker, and response.
5098 *
5099 * @param timeout_ms Timeout in milliseconds, use -1 for indefinite timeout.
5100 * Defaults to `socket.timeout.ms`.
5101 *
5102 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success, or
5103 * RD_KAFKA_RESP_ERR__INVALID_ARG if timeout was out of range in which
 *          case an error string will be written to \p errstr.
5105 *
5106 * @remark This option is valid for all Admin API requests.
5107 */
5108RD_EXPORT rd_kafka_resp_err_t
5109rd_kafka_AdminOptions_set_request_timeout (rd_kafka_AdminOptions_t *options,
5110 int timeout_ms,
5111 char *errstr, size_t errstr_size);
5112
5113
5114/**
5115 * @brief Sets the broker's operation timeout, such as the timeout for
5116 * CreateTopics to complete the creation of topics on the controller
5117 * before returning a result to the application.
5118 *
5119 * CreateTopics: values <= 0 will return immediately after triggering topic
5120 * creation, while > 0 will wait this long for topic creation to propagate
5121 * in cluster. Default: 0.
5122 *
5123 * DeleteTopics: same semantics as CreateTopics.
5124 * CreatePartitions: same semantics as CreateTopics.
5125 *
5126 *
5127 * @param timeout_ms Timeout in milliseconds.
5128 *
5129 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success, or
5130 * RD_KAFKA_RESP_ERR__INVALID_ARG if timeout was out of range in which
 *          case an error string will be written to \p errstr.
5132 *
5133 * @remark This option is valid for CreateTopics, DeleteTopics and
5134 * CreatePartitions.
5135 */
5136RD_EXPORT rd_kafka_resp_err_t
5137rd_kafka_AdminOptions_set_operation_timeout (rd_kafka_AdminOptions_t *options,
5138 int timeout_ms,
5139 char *errstr, size_t errstr_size);
5140
5141
5142/**
5143 * @brief Tell broker to only validate the request, without performing
5144 * the requested operation (create topics, etc).
5145 *
5146 * @param true_or_false Defaults to false.
5147 *
5148 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or an
5149 * error code on failure in which case an error string will
 *          be written to \p errstr.
5151 *
5152 * @remark This option is valid for CreateTopics,
5153 * CreatePartitions, AlterConfigs.
5154 */
5155RD_EXPORT rd_kafka_resp_err_t
5156rd_kafka_AdminOptions_set_validate_only (rd_kafka_AdminOptions_t *options,
5157 int true_or_false,
5158 char *errstr, size_t errstr_size);
5159
5160
5161/**
5162 * @brief Override what broker the Admin request will be sent to.
5163 *
5164 * By default, Admin requests are sent to the controller broker, with
5165 * the following exceptions:
5166 * - AlterConfigs with a BROKER resource are sent to the broker id set
5167 * as the resource name.
5168 * - DescribeConfigs with a BROKER resource are sent to the broker id set
5169 * as the resource name.
5170 *
5171 * @param broker_id The broker to send the request to.
5172 *
5173 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or an
5174 * error code on failure in which case an error string will
 *          be written to \p errstr.
5176 *
5177 * @remark This API should typically not be used, but serves as a workaround
 *         if new resource types are added to the broker that the client
5179 * does not know where to send.
5180 */
5181RD_EXPORT rd_kafka_resp_err_t
5182rd_kafka_AdminOptions_set_broker (rd_kafka_AdminOptions_t *options,
5183 int32_t broker_id,
5184 char *errstr, size_t errstr_size);
5185
5186
5187
5188/**
5189 * @brief Set application opaque value that can be extracted from the
5190 * result event using rd_kafka_event_opaque()
5191 */
5192RD_EXPORT void
5193rd_kafka_AdminOptions_set_opaque (rd_kafka_AdminOptions_t *options,
5194 void *opaque);
5195
5196
5197
5198
5199
5200
5201/**
5202 * @section CreateTopics - create topics in cluster
5203 *
5204 *
5205 */
5206
5207
5208typedef struct rd_kafka_NewTopic_s rd_kafka_NewTopic_t;
5209
5210/**
5211 * @brief Create a new NewTopic object. This object is later passed to
5212 * rd_kafka_CreateTopics().
5213 *
5214 * @param topic Topic name to create.
5215 * @param num_partitions Number of partitions in topic.
5216 * @param replication_factor Default replication factor for the topic's
5217 * partitions, or -1 if set_replica_assignment()
5218 * will be used.
5219 *
5220 * @returns a new allocated NewTopic object, or NULL if the input parameters
5221 * are invalid.
5222 * Use rd_kafka_NewTopic_destroy() to free object when done.
5223 */
5224RD_EXPORT rd_kafka_NewTopic_t *
5225rd_kafka_NewTopic_new (const char *topic, int num_partitions,
5226 int replication_factor,
5227 char *errstr, size_t errstr_size);
5228
5229/**
5230 * @brief Destroy and free a NewTopic object previously created with
5231 * rd_kafka_NewTopic_new()
5232 */
5233RD_EXPORT void
5234rd_kafka_NewTopic_destroy (rd_kafka_NewTopic_t *new_topic);
5235
5236
5237/**
5238 * @brief Helper function to destroy all NewTopic objects in the \p new_topics
5239 * array (of \p new_topic_cnt elements).
5240 * The array itself is not freed.
5241 */
5242RD_EXPORT void
5243rd_kafka_NewTopic_destroy_array (rd_kafka_NewTopic_t **new_topics,
5244 size_t new_topic_cnt);
5245
5246
5247/**
5248 * @brief Set the replica (broker) assignment for \p partition to the
5249 * replica set in \p broker_ids (of \p broker_id_cnt elements).
5250 *
5251 * @remark When this method is used, rd_kafka_NewTopic_new() must have
5252 * been called with a \c replication_factor of -1.
5253 *
5254 * @remark An application must either set the replica assignment for
5255 * all new partitions, or none.
5256 *
5257 * @remark If called, this function must be called consecutively for each
5258 * partition, starting at 0.
5259 *
5260 * @remark Use rd_kafka_metadata() to retrieve the list of brokers
5261 * in the cluster.
5262 *
5263 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success, or an error code
5264 * if the arguments were invalid.
5265 *
5266 * @sa rd_kafka_AdminOptions_set_validate_only()
5267 */
5268RD_EXPORT rd_kafka_resp_err_t
5269rd_kafka_NewTopic_set_replica_assignment (rd_kafka_NewTopic_t *new_topic,
5270 int32_t partition,
5271 int32_t *broker_ids,
5272 size_t broker_id_cnt,
5273 char *errstr, size_t errstr_size);
5274
5275/**
5276 * @brief Set (broker-side) topic configuration name/value pair.
5277 *
5278 * @remark The name and value are not validated by the client, the validation
5279 * takes place on the broker.
5280 *
5281 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success, or an error code
5282 * if the arguments were invalid.
5283 *
5284 * @sa rd_kafka_AdminOptions_set_validate_only()
5285 * @sa http://kafka.apache.org/documentation.html#topicconfigs
5286 */
5287RD_EXPORT rd_kafka_resp_err_t
5288rd_kafka_NewTopic_set_config (rd_kafka_NewTopic_t *new_topic,
5289 const char *name, const char *value);
5290
5291
5292/**
5293 * @brief Create topics in cluster as specified by the \p new_topics
5294 * array of size \p new_topic_cnt elements.
5295 *
5296 * @param new_topics Array of new topics to create.
5297 * @param new_topic_cnt Number of elements in \p new_topics array.
5298 * @param options Optional admin options, or NULL for defaults.
5299 * @param rkqu Queue to emit result on.
5300 *
5301 * Supported admin options:
5302 * - rd_kafka_AdminOptions_set_validate_only() - default false
5303 * - rd_kafka_AdminOptions_set_operation_timeout() - default 0
 *  - rd_kafka_AdminOptions_set_request_timeout() - default socket.timeout.ms
5305 *
5306 * @remark The result event type emitted on the supplied queue is of type
5307 * \c RD_KAFKA_EVENT_CREATETOPICS_RESULT
5308 */
5309RD_EXPORT void
5310rd_kafka_CreateTopics (rd_kafka_t *rk,
5311 rd_kafka_NewTopic_t **new_topics,
5312 size_t new_topic_cnt,
5313 const rd_kafka_AdminOptions_t *options,
5314 rd_kafka_queue_t *rkqu);
5315
5316
5317/**
5318 * @brief CreateTopics result type and methods
5319 */
5320
5321/**
5322 * @brief Get an array of topic results from a CreateTopics result.
5323 *
5324 * The returned \p topics life-time is the same as the \p result object.
5325 * @param cntp is updated to the number of elements in the array.
5326 */
5327RD_EXPORT const rd_kafka_topic_result_t **
5328rd_kafka_CreateTopics_result_topics (
5329 const rd_kafka_CreateTopics_result_t *result,
5330 size_t *cntp);
5331
5332
5333
5334
5335
5336/**
5337 * @section DeleteTopics - delete topics from cluster
5338 *
5339 *
5340 */
5341
5342typedef struct rd_kafka_DeleteTopic_s rd_kafka_DeleteTopic_t;
5343
5344/**
5345 * @brief Create a new DeleteTopic object. This object is later passed to
5346 * rd_kafka_DeleteTopics().
5347 *
5348 * @param topic Topic name to delete.
5349 *
5350 * @returns a new allocated DeleteTopic object.
5351 * Use rd_kafka_DeleteTopic_destroy() to free object when done.
5352 */
5353RD_EXPORT rd_kafka_DeleteTopic_t *
5354rd_kafka_DeleteTopic_new (const char *topic);
5355
5356/**
5357 * @brief Destroy and free a DeleteTopic object previously created with
5358 * rd_kafka_DeleteTopic_new()
5359 */
5360RD_EXPORT void
5361rd_kafka_DeleteTopic_destroy (rd_kafka_DeleteTopic_t *del_topic);
5362
5363/**
5364 * @brief Helper function to destroy all DeleteTopic objects in
5365 * the \p del_topics array (of \p del_topic_cnt elements).
5366 * The array itself is not freed.
5367 */
5368RD_EXPORT void
5369rd_kafka_DeleteTopic_destroy_array (rd_kafka_DeleteTopic_t **del_topics,
5370 size_t del_topic_cnt);
5371
5372/**
5373 * @brief Delete topics from cluster as specified by the \p topics
5374 * array of size \p topic_cnt elements.
5375 *
5376 * @param topics Array of topics to delete.
5377 * @param topic_cnt Number of elements in \p topics array.
5378 * @param options Optional admin options, or NULL for defaults.
5379 * @param rkqu Queue to emit result on.
5380 *
5381 * @remark The result event type emitted on the supplied queue is of type
5382 * \c RD_KAFKA_EVENT_DELETETOPICS_RESULT
5383 */
5384RD_EXPORT
5385void rd_kafka_DeleteTopics (rd_kafka_t *rk,
5386 rd_kafka_DeleteTopic_t **del_topics,
5387 size_t del_topic_cnt,
5388 const rd_kafka_AdminOptions_t *options,
5389 rd_kafka_queue_t *rkqu);
5390
5391
5392
5393/**
5394 * @brief DeleteTopics result type and methods
5395 */
5396
5397/**
5398 * @brief Get an array of topic results from a DeleteTopics result.
5399 *
5400 * The returned \p topics life-time is the same as the \p result object.
5401 * @param cntp is updated to the number of elements in the array.
5402 */
5403RD_EXPORT const rd_kafka_topic_result_t **
5404rd_kafka_DeleteTopics_result_topics (
5405 const rd_kafka_DeleteTopics_result_t *result,
5406 size_t *cntp);
5407
5408
5409
5410
5411
5412
5413/**
5414 * @section CreatePartitions - add partitions to topic.
5415 *
5416 *
5417 */
5418
5419typedef struct rd_kafka_NewPartitions_s rd_kafka_NewPartitions_t;
5420
5421/**
5422 * @brief Create a new NewPartitions. This object is later passed to
 *        rd_kafka_CreatePartitions() to increase the number of partitions
5424 * to \p new_total_cnt for an existing topic.
5425 *
5426 * @param topic Topic name to create more partitions for.
5427 * @param new_total_cnt Increase the topic's partition count to this value.
5428 *
5429 * @returns a new allocated NewPartitions object, or NULL if the
5430 * input parameters are invalid.
5431 * Use rd_kafka_NewPartitions_destroy() to free object when done.
5432 */
5433RD_EXPORT rd_kafka_NewPartitions_t *
5434rd_kafka_NewPartitions_new (const char *topic, size_t new_total_cnt,
5435 char *errstr, size_t errstr_size);
5436
5437/**
5438 * @brief Destroy and free a NewPartitions object previously created with
5439 * rd_kafka_NewPartitions_new()
5440 */
5441RD_EXPORT void
5442rd_kafka_NewPartitions_destroy (rd_kafka_NewPartitions_t *new_parts);
5443
5444/**
5445 * @brief Helper function to destroy all NewPartitions objects in the
5446 * \p new_parts array (of \p new_parts_cnt elements).
5447 * The array itself is not freed.
5448 */
5449RD_EXPORT void
5450rd_kafka_NewPartitions_destroy_array (rd_kafka_NewPartitions_t **new_parts,
5451 size_t new_parts_cnt);
5452
/**
 * @brief Set the replica (broker id) assignment for \p new_partition_idx to the
 *        replica set in \p broker_ids (of \p broker_id_cnt elements).
 *
 * @param new_parts NewPartitions object the assignment applies to.
 * @param new_partition_idx Index of the new partition, where 0 is the first
 *        new partition (see consecutive-call remark below).
 * @param broker_ids Array of broker ids making up the partition's replica set.
 * @param broker_id_cnt Number of elements in \p broker_ids.
 * @param errstr A human readable error string (nul-terminated) is written to
 *        this location on error; must be of at least \p errstr_size bytes.
 * @param errstr_size Writable size in \p errstr.
 *
 * @remark An application must either set the replica assignment for
 *         all new partitions, or none.
 *
 * @remark If called, this function must be called consecutively for each
 *         new partition being created,
 *         where \p new_partition_idx 0 is the first new partition,
 *         1 is the second, and so on.
 *
 * @remark \p broker_id_cnt should match the topic's replication factor.
 *
 * @remark Use rd_kafka_metadata() to retrieve the list of brokers
 *         in the cluster.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success, or an error code
 *          if the arguments were invalid.
 *
 * @sa rd_kafka_AdminOptions_set_validate_only()
 */
RD_EXPORT rd_kafka_resp_err_t
rd_kafka_NewPartitions_set_replica_assignment (rd_kafka_NewPartitions_t *new_parts,
                                               int32_t new_partition_idx,
                                               int32_t *broker_ids,
                                               size_t broker_id_cnt,
                                               char *errstr,
                                               size_t errstr_size);
5482
5483
/**
 * @brief Create additional partitions for the given topics, as specified
 *        by the \p new_parts array of size \p new_parts_cnt elements.
 *
 * @param rk Client instance.
 * @param new_parts Array of topics for which new partitions are to be created.
 * @param new_parts_cnt Number of elements in \p new_parts array.
 * @param options Optional admin options, or NULL for defaults.
 * @param rkqu Queue to emit result on.
 *
 * Supported admin options:
 *  - rd_kafka_AdminOptions_set_validate_only() - default false
 *  - rd_kafka_AdminOptions_set_operation_timeout() - default 0
 *  - rd_kafka_AdminOptions_set_timeout() - default socket.timeout.ms
 *
 * @remark The result event type emitted on the supplied queue is of type
 *         \c RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT
 */
RD_EXPORT void
rd_kafka_CreatePartitions (rd_kafka_t *rk,
                           rd_kafka_NewPartitions_t **new_parts,
                           size_t new_parts_cnt,
                           const rd_kafka_AdminOptions_t *options,
                           rd_kafka_queue_t *rkqu);
5507
5508
5509
/**
 * @brief CreatePartitions result type and methods
 */

/**
 * @brief Get an array of topic results from a CreatePartitions result.
 *
 * The returned \p topics life-time is the same as the \p result object.
 *
 * @param result Result object to get topic results from.
 * @param cntp is updated to the number of elements in the array.
 *
 * @returns an array of per-topic results (one element per requested topic).
 */
RD_EXPORT const rd_kafka_topic_result_t **
rd_kafka_CreatePartitions_result_topics (
        const rd_kafka_CreatePartitions_result_t *result,
        size_t *cntp);
5524
5525
5526
5527
5528
/**
 * @section Cluster, broker, topic configuration entries, sources, etc.
 *
 * These entities relate to the cluster, not the local client.
 *
 * @sa rd_kafka_conf_set(), et al., for local client configuration.
 *
 */
5537
/**
 * @enum rd_kafka_ConfigSource_t
 * @brief Apache Kafka config sources.
 */
typedef enum rd_kafka_ConfigSource_t {
        /** Source unknown, e.g., in the ConfigEntry used for alter requests
         *  where source is not set */
        RD_KAFKA_CONFIG_SOURCE_UNKNOWN_CONFIG = 0,
        /** Dynamic topic config that is configured for a specific topic */
        RD_KAFKA_CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG = 1,
        /** Dynamic broker config that is configured for a specific broker */
        RD_KAFKA_CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG = 2,
        /** Dynamic broker config that is configured as default for all
         *  brokers in the cluster */
        RD_KAFKA_CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG = 3,
        /** Static broker config provided as broker properties at startup
         *  (e.g. from server.properties file) */
        RD_KAFKA_CONFIG_SOURCE_STATIC_BROKER_CONFIG = 4,
        /** Built-in default configuration for configs that have a
         *  default value */
        RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG = 5,

        /** Number of source types defined */
        RD_KAFKA_CONFIG_SOURCE__CNT,
} rd_kafka_ConfigSource_t;
5562
5563
/**
 * @param confsource Config source to name.
 *
 * @returns a string representation of the \p confsource.
 */
RD_EXPORT const char *
rd_kafka_ConfigSource_name (rd_kafka_ConfigSource_t confsource);
5569
5570
5571typedef struct rd_kafka_ConfigEntry_s rd_kafka_ConfigEntry_t;
5572
/**
 * @param entry Config entry to query.
 *
 * @returns the configuration property name
 */
RD_EXPORT const char *
rd_kafka_ConfigEntry_name (const rd_kafka_ConfigEntry_t *entry);

/**
 * @param entry Config entry to query.
 *
 * @returns the configuration value, may be NULL for sensitive or unset
 *          properties.
 */
RD_EXPORT const char *
rd_kafka_ConfigEntry_value (const rd_kafka_ConfigEntry_t *entry);

/**
 * @param entry Config entry to query.
 *
 * @returns the config source.
 */
RD_EXPORT rd_kafka_ConfigSource_t
rd_kafka_ConfigEntry_source (const rd_kafka_ConfigEntry_t *entry);

/**
 * @param entry Config entry to query.
 *
 * @returns 1 if the config property is read-only on the broker, else 0.
 * @remark Shall only be used on a DescribeConfigs result, otherwise returns -1.
 */
RD_EXPORT int
rd_kafka_ConfigEntry_is_read_only (const rd_kafka_ConfigEntry_t *entry);

/**
 * @param entry Config entry to query.
 *
 * @returns 1 if the config property is set to its default value on the broker,
 *          else 0.
 * @remark Shall only be used on a DescribeConfigs result, otherwise returns -1.
 */
RD_EXPORT int
rd_kafka_ConfigEntry_is_default (const rd_kafka_ConfigEntry_t *entry);

/**
 * @param entry Config entry to query.
 *
 * @returns 1 if the config property contains sensitive information (such as
 *          security configuration), else 0.
 * @remark An application should take care not to include the value of
 *         sensitive configuration entries in its output.
 * @remark Shall only be used on a DescribeConfigs result, otherwise returns -1.
 */
RD_EXPORT int
rd_kafka_ConfigEntry_is_sensitive (const rd_kafka_ConfigEntry_t *entry);

/**
 * @param entry Config entry to query.
 *
 * @returns 1 if this entry is a synonym, else 0.
 */
RD_EXPORT int
rd_kafka_ConfigEntry_is_synonym (const rd_kafka_ConfigEntry_t *entry);


/**
 * @returns the synonym config entry array.
 *
 * @param entry Config entry to get synonyms for.
 * @param cntp is updated to the number of elements in the array.
 *
 * @remark The lifetime of the returned entry is the same as \p conf .
 * @remark Shall only be used on a DescribeConfigs result,
 *         otherwise returns NULL.
 */
RD_EXPORT const rd_kafka_ConfigEntry_t **
rd_kafka_ConfigEntry_synonyms (const rd_kafka_ConfigEntry_t *entry,
                               size_t *cntp);
5636
5637
5638
5639
/**
 * @enum rd_kafka_ResourceType_t
 * @brief Apache Kafka resource types.
 */
typedef enum rd_kafka_ResourceType_t {
        RD_KAFKA_RESOURCE_UNKNOWN = 0, /**< Unknown */
        RD_KAFKA_RESOURCE_ANY = 1,     /**< Any (used for lookups) */
        RD_KAFKA_RESOURCE_TOPIC = 2,   /**< Topic */
        RD_KAFKA_RESOURCE_GROUP = 3,   /**< Group */
        RD_KAFKA_RESOURCE_BROKER = 4,  /**< Broker */
        RD_KAFKA_RESOURCE__CNT,        /**< Number of resource types defined */
} rd_kafka_ResourceType_t;
5651
/**
 * @param restype Resource type to name.
 *
 * @returns a string representation of the \p restype
 */
RD_EXPORT const char *
rd_kafka_ResourceType_name (rd_kafka_ResourceType_t restype);

/**
 * @brief Opaque object representing a cluster resource (e.g., a topic or
 *        broker) together with its configuration entries.
 */
typedef struct rd_kafka_ConfigResource_s rd_kafka_ConfigResource_t;
5659
5660
/**
 * @brief Create a new ConfigResource object.
 *
 * @param restype The resource type (e.g., \c RD_KAFKA_RESOURCE_TOPIC).
 * @param resname The resource name (e.g., the topic name).
 *
 * @returns a newly allocated object.
 *          Use rd_kafka_ConfigResource_destroy() to free object when done.
 */
RD_EXPORT rd_kafka_ConfigResource_t *
rd_kafka_ConfigResource_new (rd_kafka_ResourceType_t restype,
                             const char *resname);
5664
/**
 * @brief Destroy and free a ConfigResource object previously created with
 *        rd_kafka_ConfigResource_new()
 *
 * @param config Object to destroy.
 */
RD_EXPORT void
rd_kafka_ConfigResource_destroy (rd_kafka_ConfigResource_t *config);


/**
 * @brief Helper function to destroy all ConfigResource objects in
 *        the \p configs array (of \p config_cnt elements).
 *        The array itself is not freed.
 *
 * @param config Array of ConfigResource objects to destroy.
 * @param config_cnt Number of elements in \p config.
 */
RD_EXPORT void
rd_kafka_ConfigResource_destroy_array (rd_kafka_ConfigResource_t **config,
                                       size_t config_cnt);
5681
5682
/**
 * @brief Set configuration name value pair.
 *
 * @param config ConfigResource to set the configuration property on.
 * @param name Configuration name, depends on resource type.
 * @param value Configuration value, depends on resource type and \p name.
 *              Set to \c NULL to revert configuration value to default.
 *
 * This will overwrite the current value.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR if config was added to resource,
 *          or RD_KAFKA_RESP_ERR__INVALID_ARG on invalid input.
 */
RD_EXPORT rd_kafka_resp_err_t
rd_kafka_ConfigResource_set_config (rd_kafka_ConfigResource_t *config,
                                    const char *name, const char *value);
5698
5699
/**
 * @brief Get an array of config entries from a ConfigResource object.
 *
 * The returned object life-times are the same as the \p config object.
 *
 * @param config ConfigResource to get config entries from.
 * @param cntp is updated to the number of elements in the array.
 *
 * @returns an array of config entries.
 */
RD_EXPORT const rd_kafka_ConfigEntry_t **
rd_kafka_ConfigResource_configs (const rd_kafka_ConfigResource_t *config,
                                 size_t *cntp);
5710
5711
5712
/**
 * @param config ConfigResource to query.
 *
 * @returns the ResourceType for \p config
 */
RD_EXPORT rd_kafka_ResourceType_t
rd_kafka_ConfigResource_type (const rd_kafka_ConfigResource_t *config);

/**
 * @param config ConfigResource to query.
 *
 * @returns the name for \p config
 */
RD_EXPORT const char *
rd_kafka_ConfigResource_name (const rd_kafka_ConfigResource_t *config);

/**
 * @param config ConfigResource to query.
 *
 * @returns the error for this resource from an AlterConfigs request
 */
RD_EXPORT rd_kafka_resp_err_t
rd_kafka_ConfigResource_error (const rd_kafka_ConfigResource_t *config);

/**
 * @param config ConfigResource to query.
 *
 * @returns the error string for this resource from an AlterConfigs
 *          request, or NULL if no error.
 */
RD_EXPORT const char *
rd_kafka_ConfigResource_error_string (const rd_kafka_ConfigResource_t *config);
5737
5738
5739/**
5740 * @section AlterConfigs - alter cluster configuration.
5741 *
5742 *
5743 */
5744
5745
/**
 * @brief Update the configuration for the specified resources.
 *        Updates are not transactional so they may succeed for a subset
 *        of the provided resources while the others fail.
 *        The configuration for a particular resource is updated atomically,
 *        replacing values using the provided ConfigEntrys and reverting
 *        unspecified ConfigEntrys to their default values.
 *
 * @param rk Client instance.
 * @param configs Array of ConfigResource objects, each carrying the
 *                configuration entries to apply to that resource.
 * @param config_cnt Number of elements in \p configs array.
 * @param options Optional admin options, or NULL for defaults.
 * @param rkqu Queue to emit result on.
 *
 * @remark Requires broker version >=0.11.0.0
 *
 * @warning AlterConfigs will replace all existing configuration for
 *          the provided resources with the new configuration given,
 *          reverting all other configuration to their default values.
 *
 * @remark Multiple resources and resource types may be set, but at most one
 *         resource of type \c RD_KAFKA_RESOURCE_BROKER is allowed per call
 *         since these resource requests must be sent to the broker specified
 *         in the resource.
 *
 */
RD_EXPORT
void rd_kafka_AlterConfigs (rd_kafka_t *rk,
                            rd_kafka_ConfigResource_t **configs,
                            size_t config_cnt,
                            const rd_kafka_AdminOptions_t *options,
                            rd_kafka_queue_t *rkqu);
5772
5773
/**
 * @brief AlterConfigs result type and methods
 */

/**
 * @brief Get an array of resource results from an AlterConfigs result.
 *
 * Use \c rd_kafka_ConfigResource_error() and
 * \c rd_kafka_ConfigResource_error_string() to extract per-resource error
 * results on the returned array elements.
 *
 * The returned object life-times are the same as the \p result object.
 *
 * @param result Result object to get resource results from.
 * @param cntp is updated to the number of elements in the array.
 *
 * @returns an array of ConfigResource elements, or NULL if not available.
 */
RD_EXPORT const rd_kafka_ConfigResource_t **
rd_kafka_AlterConfigs_result_resources (
        const rd_kafka_AlterConfigs_result_t *result,
        size_t *cntp);
5795
5796
5797
5798
5799
5800
5801/**
5802 * @section DescribeConfigs - retrieve cluster configuration.
5803 *
5804 *
5805 */
5806
5807
/**
 * @brief Get configuration for the specified resources in \p configs.
 *
 * The returned configuration includes default values and the
 * rd_kafka_ConfigEntry_is_default() or rd_kafka_ConfigEntry_source()
 * methods may be used to distinguish them from user supplied values.
 *
 * The value of config entries where rd_kafka_ConfigEntry_is_sensitive()
 * is true will always be NULL to avoid disclosing sensitive
 * information, such as security settings.
 *
 * Configuration entries where rd_kafka_ConfigEntry_is_read_only()
 * is true can't be updated (with rd_kafka_AlterConfigs()).
 *
 * Synonym configuration entries are returned if the broker supports
 * it (broker version >= 1.1.0). See rd_kafka_ConfigEntry_synonyms().
 *
 * @param rk Client instance.
 * @param configs Array of ConfigResource objects identifying the resources
 *                to describe.
 * @param config_cnt Number of elements in \p configs array.
 * @param options Optional admin options, or NULL for defaults.
 * @param rkqu Queue to emit result on.
 *
 * @remark Requires broker version >=0.11.0.0
 *
 * @remark Multiple resources and resource types may be requested, but at most
 *         one resource of type \c RD_KAFKA_RESOURCE_BROKER is allowed per call
 *         since these resource requests must be sent to the broker specified
 *         in the resource.
 */
RD_EXPORT
void rd_kafka_DescribeConfigs (rd_kafka_t *rk,
                               rd_kafka_ConfigResource_t **configs,
                               size_t config_cnt,
                               const rd_kafka_AdminOptions_t *options,
                               rd_kafka_queue_t *rkqu);
5838
5839
/**
 * @brief DescribeConfigs result type and methods
 */

/**
 * @brief Get an array of resource results from a DescribeConfigs result.
 *
 * The returned \p resources life-time is the same as the \p result object.
 *
 * @param result Result object to get resource results from.
 * @param cntp is updated to the number of elements in the array.
 *
 * @returns an array of ConfigResource elements (one per requested resource).
 */
RD_EXPORT const rd_kafka_ConfigResource_t **
rd_kafka_DescribeConfigs_result_resources (
        const rd_kafka_DescribeConfigs_result_t *result,
        size_t *cntp);
5854
5855/**@}*/
5856
5857
5858
5859/**
5860 * @name Security APIs
5861 * @{
5862 *
5863 */
5864
/**
 * @brief Set SASL/OAUTHBEARER token and metadata
 *
 * @param rk Client instance.
 * @param token_value the mandatory token value to set, often (but not
 * necessarily) a JWS compact serialization as per
 * https://tools.ietf.org/html/rfc7515#section-3.1.
 * @param md_lifetime_ms when the token expires, in terms of the number of
 * milliseconds since the epoch.
 * @param md_principal_name the mandatory Kafka principal name associated
 * with the token.
 * @param extensions optional SASL extensions key-value array with
 * \p extension_size elements (number of keys * 2), where [i] is the key and
 * [i+1] is the key's value, to be communicated to the broker
 * as additional key-value pairs during the initial client response as per
 * https://tools.ietf.org/html/rfc7628#section-3.1. The key-value pairs are
 * copied.
 * @param extension_size the number of SASL extension keys plus values,
 * which must be a non-negative multiple of 2.
 * @param errstr A human readable error string (nul-terminated) is written to
 * this location that must be of at least \p errstr_size bytes.
 * The \p errstr is only written to if there is an error.
 * @param errstr_size Writable size in \p errstr.
 *
 * The SASL/OAUTHBEARER token refresh callback or event handler should invoke
 * this method upon success. The extension keys must not include the reserved
 * key "`auth`", and all extension keys and values must conform to the required
 * format as per https://tools.ietf.org/html/rfc7628#section-3.1:
 *
 *     key            = 1*(ALPHA)
 *     value          = *(VCHAR / SP / HTAB / CR / LF )
 *
 * @returns \c RD_KAFKA_RESP_ERR_NO_ERROR on success, otherwise \p errstr set
 *              and:<br>
 *          \c RD_KAFKA_RESP_ERR__INVALID_ARG if any of the arguments are
 *              invalid;<br>
 *          \c RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED if SASL/OAUTHBEARER is not
 *              supported by this build;<br>
 *          \c RD_KAFKA_RESP_ERR__STATE if SASL/OAUTHBEARER is supported but is
 *              not configured as the client's authentication mechanism.<br>
 *
 * @sa rd_kafka_oauthbearer_set_token_failure
 * @sa rd_kafka_conf_set_oauthbearer_token_refresh_cb
 */
RD_EXPORT
rd_kafka_resp_err_t
rd_kafka_oauthbearer_set_token (rd_kafka_t *rk,
                                const char *token_value,
                                int64_t md_lifetime_ms,
                                const char *md_principal_name,
                                const char **extensions, size_t extension_size,
                                char *errstr, size_t errstr_size);
5916
/**
 * @brief SASL/OAUTHBEARER token refresh failure indicator.
 *
 * @param rk Client instance.
 * @param errstr mandatory human readable error reason for failing to acquire
 * a token.
 *
 * The SASL/OAUTHBEARER token refresh callback or event handler should invoke
 * this method upon failure to acquire or refresh a token.
 *
 * @returns \c RD_KAFKA_RESP_ERR_NO_ERROR on success, otherwise:<br>
 *          \c RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED if SASL/OAUTHBEARER is not
 *              supported by this build;<br>
 *          \c RD_KAFKA_RESP_ERR__STATE if SASL/OAUTHBEARER is supported but is
 *              not configured as the client's authentication mechanism;<br>
 *          \c RD_KAFKA_RESP_ERR__INVALID_ARG if no error string is supplied.
 *
 * @sa rd_kafka_oauthbearer_set_token
 * @sa rd_kafka_conf_set_oauthbearer_token_refresh_cb
 */
RD_EXPORT
rd_kafka_resp_err_t
rd_kafka_oauthbearer_set_token_failure (rd_kafka_t *rk, const char *errstr);
5940
5941/**@}*/
5942
5943
5944#ifdef __cplusplus
5945}
5946#endif
5947#endif /* _RDKAFKA_H_ */
5948