1 | /* |
2 | * librdkafka - Apache Kafka C library |
3 | * |
4 | * Copyright (c) 2012-2018 Magnus Edenhill |
5 | * All rights reserved. |
6 | * |
7 | * Redistribution and use in source and binary forms, with or without |
8 | * modification, are permitted provided that the following conditions are met: |
9 | * |
10 | * 1. Redistributions of source code must retain the above copyright notice, |
11 | * this list of conditions and the following disclaimer. |
12 | * 2. Redistributions in binary form must reproduce the above copyright notice, |
13 | * this list of conditions and the following disclaimer in the documentation |
14 | * and/or other materials provided with the distribution. |
15 | * |
16 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
17 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
18 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
19 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
20 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
21 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
22 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
23 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
24 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
25 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
26 | * POSSIBILITY OF SUCH DAMAGE. |
27 | */ |
28 | |
29 | /** |
30 | * @file rdkafka.h |
31 | * @brief Apache Kafka C/C++ consumer and producer client library. |
32 | * |
33 | * rdkafka.h contains the public API for librdkafka. |
34 | * The API is documented in this file as comments prefixing the function, type, |
35 | * enum, define, etc. |
36 | * |
37 | * @sa For the C++ interface see rdkafkacpp.h |
38 | * |
39 | * @tableofcontents |
40 | */ |
41 | |
42 | |
43 | /* @cond NO_DOC */ |
44 | #ifndef _RDKAFKA_H_ |
45 | #define _RDKAFKA_H_ |
46 | |
47 | #include <stdio.h> |
48 | #include <inttypes.h> |
49 | #include <sys/types.h> |
50 | |
51 | #ifdef __cplusplus |
52 | extern "C" { |
53 | #if 0 |
54 | } /* Restore indent */ |
55 | #endif |
56 | #endif |
57 | |
58 | #ifdef _MSC_VER |
59 | #include <basetsd.h> |
60 | #ifndef WIN32_MEAN_AND_LEAN |
61 | #define WIN32_MEAN_AND_LEAN |
62 | #endif |
63 | #include <Winsock2.h> /* for sockaddr, .. */ |
64 | typedef SSIZE_T ssize_t; |
65 | #define RD_UNUSED |
66 | #define RD_INLINE __inline |
67 | #define RD_DEPRECATED __declspec(deprecated) |
68 | #undef RD_EXPORT |
69 | #ifdef LIBRDKAFKA_STATICLIB |
70 | #define RD_EXPORT |
71 | #else |
72 | #ifdef LIBRDKAFKA_EXPORTS |
73 | #define RD_EXPORT __declspec(dllexport) |
74 | #else |
75 | #define RD_EXPORT __declspec(dllimport) |
76 | #endif |
77 | #ifndef LIBRDKAFKA_TYPECHECKS |
78 | #define LIBRDKAFKA_TYPECHECKS 0 |
79 | #endif |
80 | #endif |
81 | |
82 | #else |
83 | #include <sys/socket.h> /* for sockaddr, .. */ |
84 | |
85 | #define RD_UNUSED __attribute__((unused)) |
86 | #define RD_INLINE inline |
87 | #define RD_EXPORT |
88 | #define RD_DEPRECATED __attribute__((deprecated)) |
89 | |
90 | #ifndef LIBRDKAFKA_TYPECHECKS |
91 | #define LIBRDKAFKA_TYPECHECKS 1 |
92 | #endif |
93 | #endif |
94 | |
95 | |
96 | /** |
97 | * @brief Type-checking macros |
98 | * Compile-time checking that \p ARG is of type \p TYPE. |
99 | * @returns \p RET |
100 | */ |
101 | #if LIBRDKAFKA_TYPECHECKS |
102 | #define _LRK_TYPECHECK(RET,TYPE,ARG) \ |
103 | ({ if (0) { TYPE __t RD_UNUSED = (ARG); } RET; }) |
104 | |
105 | #define _LRK_TYPECHECK2(RET,TYPE,ARG,TYPE2,ARG2) \ |
106 | ({ \ |
107 | if (0) { \ |
108 | TYPE __t RD_UNUSED = (ARG); \ |
109 | TYPE2 __t2 RD_UNUSED = (ARG2); \ |
110 | } \ |
111 | RET; }) |
112 | |
113 | #define _LRK_TYPECHECK3(RET,TYPE,ARG,TYPE2,ARG2,TYPE3,ARG3) \ |
114 | ({ \ |
115 | if (0) { \ |
116 | TYPE __t RD_UNUSED = (ARG); \ |
117 | TYPE2 __t2 RD_UNUSED = (ARG2); \ |
118 | TYPE3 __t3 RD_UNUSED = (ARG3); \ |
119 | } \ |
120 | RET; }) |
121 | #else |
122 | #define _LRK_TYPECHECK(RET,TYPE,ARG) (RET) |
123 | #define _LRK_TYPECHECK2(RET,TYPE,ARG,TYPE2,ARG2) (RET) |
124 | #define _LRK_TYPECHECK3(RET,TYPE,ARG,TYPE2,ARG2,TYPE3,ARG3) (RET) |
125 | #endif |
126 | |
127 | /* @endcond */ |
128 | |
129 | |
130 | /** |
131 | * @name librdkafka version |
132 | * @{ |
133 | * |
134 | * |
135 | */ |
136 | |
137 | /** |
138 | * @brief librdkafka version |
139 | * |
140 | * Interpreted as hex \c MM.mm.rr.xx: |
141 | * - MM = Major |
142 | * - mm = minor |
143 | * - rr = revision |
144 | * - xx = pre-release id (0xff is the final release) |
145 | * |
146 | * E.g.: \c 0x000801ff = 0.8.1 |
147 | * |
148 | * @remark This value should only be used during compile time, |
149 | * for runtime checks of version use rd_kafka_version() |
150 | */ |
151 | #define RD_KAFKA_VERSION 0x010100ff |
152 | |
153 | /** |
154 | * @brief Returns the librdkafka version as integer. |
155 | * |
156 | * @returns Version integer. |
157 | * |
158 | * @sa See RD_KAFKA_VERSION for how to parse the integer format. |
159 | * @sa Use rd_kafka_version_str() to retreive the version as a string. |
160 | */ |
161 | RD_EXPORT |
162 | int rd_kafka_version(void); |
163 | |
164 | /** |
165 | * @brief Returns the librdkafka version as string. |
166 | * |
167 | * @returns Version string |
168 | */ |
169 | RD_EXPORT |
170 | const char *rd_kafka_version_str (void); |
171 | |
172 | /**@}*/ |
173 | |
174 | |
175 | /** |
176 | * @name Constants, errors, types |
177 | * @{ |
178 | * |
179 | * |
180 | */ |
181 | |
182 | |
183 | /** |
184 | * @enum rd_kafka_type_t |
185 | * |
186 | * @brief rd_kafka_t handle type. |
187 | * |
188 | * @sa rd_kafka_new() |
189 | */ |
190 | typedef enum rd_kafka_type_t { |
191 | RD_KAFKA_PRODUCER, /**< Producer client */ |
192 | RD_KAFKA_CONSUMER /**< Consumer client */ |
193 | } rd_kafka_type_t; |
194 | |
195 | |
196 | /** |
197 | * @enum Timestamp types |
198 | * |
199 | * @sa rd_kafka_message_timestamp() |
200 | */ |
201 | typedef enum rd_kafka_timestamp_type_t { |
202 | RD_KAFKA_TIMESTAMP_NOT_AVAILABLE, /**< Timestamp not available */ |
203 | RD_KAFKA_TIMESTAMP_CREATE_TIME, /**< Message creation time */ |
204 | RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME /**< Log append time */ |
205 | } rd_kafka_timestamp_type_t; |
206 | |
207 | |
208 | |
209 | /** |
210 | * @brief Retrieve supported debug contexts for use with the \c \"debug\" |
211 | * configuration property. (runtime) |
212 | * |
213 | * @returns Comma-separated list of available debugging contexts. |
214 | */ |
215 | RD_EXPORT |
216 | const char *rd_kafka_get_debug_contexts(void); |
217 | |
218 | /** |
219 | * @brief Supported debug contexts. (compile time) |
220 | * |
221 | * @deprecated This compile time value may be outdated at runtime due to |
222 | * linking another version of the library. |
223 | * Use rd_kafka_get_debug_contexts() instead. |
224 | */ |
225 | #define RD_KAFKA_DEBUG_CONTEXTS \ |
226 | "all,generic,broker,topic,metadata,feature,queue,msg,protocol,cgrp,security,fetch,interceptor,plugin,consumer,admin,eos" |
227 | |
228 | |
229 | /* @cond NO_DOC */ |
230 | /* Private types to provide ABI compatibility */ |
231 | typedef struct rd_kafka_s rd_kafka_t; |
232 | typedef struct rd_kafka_topic_s rd_kafka_topic_t; |
233 | typedef struct rd_kafka_conf_s rd_kafka_conf_t; |
234 | typedef struct rd_kafka_topic_conf_s rd_kafka_topic_conf_t; |
235 | typedef struct rd_kafka_queue_s rd_kafka_queue_t; |
236 | typedef struct rd_kafka_op_s rd_kafka_event_t; |
237 | typedef struct rd_kafka_topic_result_s rd_kafka_topic_result_t; |
238 | /* @endcond */ |
239 | |
240 | |
241 | /** |
242 | * @enum rd_kafka_resp_err_t |
243 | * @brief Error codes. |
244 | * |
245 | * The negative error codes delimited by two underscores |
246 | * (\c RD_KAFKA_RESP_ERR__..) denotes errors internal to librdkafka and are |
247 | * displayed as \c \"Local: \<error string..\>\", while the error codes |
248 | * delimited by a single underscore (\c RD_KAFKA_RESP_ERR_..) denote broker |
249 | * errors and are displayed as \c \"Broker: \<error string..\>\". |
250 | * |
251 | * @sa Use rd_kafka_err2str() to translate an error code a human readable string |
252 | */ |
253 | typedef enum { |
254 | /* Internal errors to rdkafka: */ |
255 | /** Begin internal error codes */ |
256 | RD_KAFKA_RESP_ERR__BEGIN = -200, |
257 | /** Received message is incorrect */ |
258 | RD_KAFKA_RESP_ERR__BAD_MSG = -199, |
259 | /** Bad/unknown compression */ |
260 | RD_KAFKA_RESP_ERR__BAD_COMPRESSION = -198, |
261 | /** Broker is going away */ |
262 | RD_KAFKA_RESP_ERR__DESTROY = -197, |
263 | /** Generic failure */ |
264 | RD_KAFKA_RESP_ERR__FAIL = -196, |
265 | /** Broker transport failure */ |
266 | RD_KAFKA_RESP_ERR__TRANSPORT = -195, |
267 | /** Critical system resource */ |
268 | RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE = -194, |
269 | /** Failed to resolve broker */ |
270 | RD_KAFKA_RESP_ERR__RESOLVE = -193, |
271 | /** Produced message timed out*/ |
272 | RD_KAFKA_RESP_ERR__MSG_TIMED_OUT = -192, |
273 | /** Reached the end of the topic+partition queue on |
274 | * the broker. Not really an error. */ |
275 | RD_KAFKA_RESP_ERR__PARTITION_EOF = -191, |
276 | /** Permanent: Partition does not exist in cluster. */ |
277 | RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION = -190, |
278 | /** File or filesystem error */ |
279 | RD_KAFKA_RESP_ERR__FS = -189, |
280 | /** Permanent: Topic does not exist in cluster. */ |
281 | RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC = -188, |
282 | /** All broker connections are down. */ |
283 | RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN = -187, |
284 | /** Invalid argument, or invalid configuration */ |
285 | RD_KAFKA_RESP_ERR__INVALID_ARG = -186, |
286 | /** Operation timed out */ |
287 | RD_KAFKA_RESP_ERR__TIMED_OUT = -185, |
288 | /** Queue is full */ |
289 | RD_KAFKA_RESP_ERR__QUEUE_FULL = -184, |
290 | /** ISR count < required.acks */ |
291 | RD_KAFKA_RESP_ERR__ISR_INSUFF = -183, |
292 | /** Broker node update */ |
293 | RD_KAFKA_RESP_ERR__NODE_UPDATE = -182, |
294 | /** SSL error */ |
295 | RD_KAFKA_RESP_ERR__SSL = -181, |
296 | /** Waiting for coordinator to become available. */ |
297 | RD_KAFKA_RESP_ERR__WAIT_COORD = -180, |
298 | /** Unknown client group */ |
299 | RD_KAFKA_RESP_ERR__UNKNOWN_GROUP = -179, |
300 | /** Operation in progress */ |
301 | RD_KAFKA_RESP_ERR__IN_PROGRESS = -178, |
302 | /** Previous operation in progress, wait for it to finish. */ |
303 | RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS = -177, |
304 | /** This operation would interfere with an existing subscription */ |
305 | RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION = -176, |
306 | /** Assigned partitions (rebalance_cb) */ |
307 | RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS = -175, |
308 | /** Revoked partitions (rebalance_cb) */ |
309 | RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS = -174, |
310 | /** Conflicting use */ |
311 | RD_KAFKA_RESP_ERR__CONFLICT = -173, |
312 | /** Wrong state */ |
313 | RD_KAFKA_RESP_ERR__STATE = -172, |
314 | /** Unknown protocol */ |
315 | RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL = -171, |
316 | /** Not implemented */ |
317 | RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED = -170, |
318 | /** Authentication failure*/ |
319 | RD_KAFKA_RESP_ERR__AUTHENTICATION = -169, |
320 | /** No stored offset */ |
321 | RD_KAFKA_RESP_ERR__NO_OFFSET = -168, |
322 | /** Outdated */ |
323 | RD_KAFKA_RESP_ERR__OUTDATED = -167, |
324 | /** Timed out in queue */ |
325 | RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE = -166, |
326 | /** Feature not supported by broker */ |
327 | RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE = -165, |
328 | /** Awaiting cache update */ |
329 | RD_KAFKA_RESP_ERR__WAIT_CACHE = -164, |
330 | /** Operation interrupted (e.g., due to yield)) */ |
331 | RD_KAFKA_RESP_ERR__INTR = -163, |
332 | /** Key serialization error */ |
333 | RD_KAFKA_RESP_ERR__KEY_SERIALIZATION = -162, |
334 | /** Value serialization error */ |
335 | RD_KAFKA_RESP_ERR__VALUE_SERIALIZATION = -161, |
336 | /** Key deserialization error */ |
337 | RD_KAFKA_RESP_ERR__KEY_DESERIALIZATION = -160, |
338 | /** Value deserialization error */ |
339 | RD_KAFKA_RESP_ERR__VALUE_DESERIALIZATION = -159, |
340 | /** Partial response */ |
341 | RD_KAFKA_RESP_ERR__PARTIAL = -158, |
342 | /** Modification attempted on read-only object */ |
343 | RD_KAFKA_RESP_ERR__READ_ONLY = -157, |
344 | /** No such entry / item not found */ |
345 | RD_KAFKA_RESP_ERR__NOENT = -156, |
346 | /** Read underflow */ |
347 | RD_KAFKA_RESP_ERR__UNDERFLOW = -155, |
348 | /** Invalid type */ |
349 | RD_KAFKA_RESP_ERR__INVALID_TYPE = -154, |
350 | /** Retry operation */ |
351 | RD_KAFKA_RESP_ERR__RETRY = -153, |
352 | /** Purged in queue */ |
353 | RD_KAFKA_RESP_ERR__PURGE_QUEUE = -152, |
354 | /** Purged in flight */ |
355 | RD_KAFKA_RESP_ERR__PURGE_INFLIGHT = -151, |
356 | /** Fatal error: see rd_kafka_fatal_error() */ |
357 | RD_KAFKA_RESP_ERR__FATAL = -150, |
358 | /** Inconsistent state */ |
359 | RD_KAFKA_RESP_ERR__INCONSISTENT = -149, |
360 | /** Gap-less ordering would not be guaranteed if proceeding */ |
361 | RD_KAFKA_RESP_ERR__GAPLESS_GUARANTEE = -148, |
362 | /** Maximum poll interval exceeded */ |
363 | RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED = -147, |
364 | |
365 | /** End internal error codes */ |
366 | RD_KAFKA_RESP_ERR__END = -100, |
367 | |
368 | /* Kafka broker errors: */ |
369 | /** Unknown broker error */ |
370 | RD_KAFKA_RESP_ERR_UNKNOWN = -1, |
371 | /** Success */ |
372 | RD_KAFKA_RESP_ERR_NO_ERROR = 0, |
373 | /** Offset out of range */ |
374 | RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE = 1, |
375 | /** Invalid message */ |
376 | RD_KAFKA_RESP_ERR_INVALID_MSG = 2, |
377 | /** Unknown topic or partition */ |
378 | RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART = 3, |
379 | /** Invalid message size */ |
380 | RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE = 4, |
381 | /** Leader not available */ |
382 | RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE = 5, |
383 | /** Not leader for partition */ |
384 | RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION = 6, |
385 | /** Request timed out */ |
386 | RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT = 7, |
387 | /** Broker not available */ |
388 | RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE = 8, |
389 | /** Replica not available */ |
390 | RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE = 9, |
391 | /** Message size too large */ |
392 | RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE = 10, |
393 | /** StaleControllerEpochCode */ |
394 | RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH = 11, |
395 | /** Offset metadata string too large */ |
396 | RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE = 12, |
397 | /** Broker disconnected before response received */ |
398 | RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION = 13, |
399 | /** Group coordinator load in progress */ |
400 | RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS = 14, |
401 | /** Group coordinator not available */ |
402 | RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE = 15, |
403 | /** Not coordinator for group */ |
404 | RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP = 16, |
405 | /** Invalid topic */ |
406 | RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION = 17, |
407 | /** Message batch larger than configured server segment size */ |
408 | RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE = 18, |
409 | /** Not enough in-sync replicas */ |
410 | RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS = 19, |
411 | /** Message(s) written to insufficient number of in-sync replicas */ |
412 | RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND = 20, |
413 | /** Invalid required acks value */ |
414 | RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS = 21, |
415 | /** Specified group generation id is not valid */ |
416 | RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION = 22, |
417 | /** Inconsistent group protocol */ |
418 | RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL = 23, |
419 | /** Invalid group.id */ |
420 | RD_KAFKA_RESP_ERR_INVALID_GROUP_ID = 24, |
421 | /** Unknown member */ |
422 | RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID = 25, |
423 | /** Invalid session timeout */ |
424 | RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT = 26, |
425 | /** Group rebalance in progress */ |
426 | RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS = 27, |
427 | /** Commit offset data size is not valid */ |
428 | RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE = 28, |
429 | /** Topic authorization failed */ |
430 | RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED = 29, |
431 | /** Group authorization failed */ |
432 | RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED = 30, |
433 | /** Cluster authorization failed */ |
434 | RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED = 31, |
435 | /** Invalid timestamp */ |
436 | RD_KAFKA_RESP_ERR_INVALID_TIMESTAMP = 32, |
437 | /** Unsupported SASL mechanism */ |
438 | RD_KAFKA_RESP_ERR_UNSUPPORTED_SASL_MECHANISM = 33, |
439 | /** Illegal SASL state */ |
440 | RD_KAFKA_RESP_ERR_ILLEGAL_SASL_STATE = 34, |
441 | /** Unuspported version */ |
442 | RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION = 35, |
443 | /** Topic already exists */ |
444 | RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS = 36, |
445 | /** Invalid number of partitions */ |
446 | RD_KAFKA_RESP_ERR_INVALID_PARTITIONS = 37, |
447 | /** Invalid replication factor */ |
448 | RD_KAFKA_RESP_ERR_INVALID_REPLICATION_FACTOR = 38, |
449 | /** Invalid replica assignment */ |
450 | RD_KAFKA_RESP_ERR_INVALID_REPLICA_ASSIGNMENT = 39, |
451 | /** Invalid config */ |
452 | RD_KAFKA_RESP_ERR_INVALID_CONFIG = 40, |
453 | /** Not controller for cluster */ |
454 | RD_KAFKA_RESP_ERR_NOT_CONTROLLER = 41, |
455 | /** Invalid request */ |
456 | RD_KAFKA_RESP_ERR_INVALID_REQUEST = 42, |
457 | /** Message format on broker does not support request */ |
458 | RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT = 43, |
459 | /** Policy violation */ |
460 | RD_KAFKA_RESP_ERR_POLICY_VIOLATION = 44, |
461 | /** Broker received an out of order sequence number */ |
462 | RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER = 45, |
463 | /** Broker received a duplicate sequence number */ |
464 | RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER = 46, |
465 | /** Producer attempted an operation with an old epoch */ |
466 | RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH = 47, |
467 | /** Producer attempted a transactional operation in an invalid state */ |
468 | RD_KAFKA_RESP_ERR_INVALID_TXN_STATE = 48, |
469 | /** Producer attempted to use a producer id which is not |
470 | * currently assigned to its transactional id */ |
471 | RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING = 49, |
472 | /** Transaction timeout is larger than the maximum |
473 | * value allowed by the broker's max.transaction.timeout.ms */ |
474 | RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT = 50, |
475 | /** Producer attempted to update a transaction while another |
476 | * concurrent operation on the same transaction was ongoing */ |
477 | RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS = 51, |
478 | /** Indicates that the transaction coordinator sending a |
479 | * WriteTxnMarker is no longer the current coordinator for a |
480 | * given producer */ |
481 | RD_KAFKA_RESP_ERR_TRANSACTION_COORDINATOR_FENCED = 52, |
482 | /** Transactional Id authorization failed */ |
483 | RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED = 53, |
484 | /** Security features are disabled */ |
485 | RD_KAFKA_RESP_ERR_SECURITY_DISABLED = 54, |
486 | /** Operation not attempted */ |
487 | RD_KAFKA_RESP_ERR_OPERATION_NOT_ATTEMPTED = 55, |
488 | /** Disk error when trying to access log file on the disk */ |
489 | RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR = 56, |
490 | /** The user-specified log directory is not found in the broker config */ |
491 | RD_KAFKA_RESP_ERR_LOG_DIR_NOT_FOUND = 57, |
492 | /** SASL Authentication failed */ |
493 | RD_KAFKA_RESP_ERR_SASL_AUTHENTICATION_FAILED = 58, |
494 | /** Unknown Producer Id */ |
495 | RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID = 59, |
496 | /** Partition reassignment is in progress */ |
497 | RD_KAFKA_RESP_ERR_REASSIGNMENT_IN_PROGRESS = 60, |
498 | /** Delegation Token feature is not enabled */ |
499 | RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_AUTH_DISABLED = 61, |
500 | /** Delegation Token is not found on server */ |
501 | RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_NOT_FOUND = 62, |
502 | /** Specified Principal is not valid Owner/Renewer */ |
503 | RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_OWNER_MISMATCH = 63, |
504 | /** Delegation Token requests are not allowed on this connection */ |
505 | RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_REQUEST_NOT_ALLOWED = 64, |
506 | /** Delegation Token authorization failed */ |
507 | RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_AUTHORIZATION_FAILED = 65, |
508 | /** Delegation Token is expired */ |
509 | RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_EXPIRED = 66, |
510 | /** Supplied principalType is not supported */ |
511 | RD_KAFKA_RESP_ERR_INVALID_PRINCIPAL_TYPE = 67, |
512 | /** The group is not empty */ |
513 | RD_KAFKA_RESP_ERR_NON_EMPTY_GROUP = 68, |
514 | /** The group id does not exist */ |
515 | RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND = 69, |
516 | /** The fetch session ID was not found */ |
517 | RD_KAFKA_RESP_ERR_FETCH_SESSION_ID_NOT_FOUND = 70, |
518 | /** The fetch session epoch is invalid */ |
519 | RD_KAFKA_RESP_ERR_INVALID_FETCH_SESSION_EPOCH = 71, |
520 | /** No matching listener */ |
521 | RD_KAFKA_RESP_ERR_LISTENER_NOT_FOUND = 72, |
522 | /** Topic deletion is disabled */ |
523 | RD_KAFKA_RESP_ERR_TOPIC_DELETION_DISABLED = 73, |
524 | /** Leader epoch is older than broker epoch */ |
525 | RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH = 74, |
526 | /** Leader epoch is newer than broker epoch */ |
527 | RD_KAFKA_RESP_ERR_UNKNOWN_LEADER_EPOCH = 75, |
528 | /** Unsupported compression type */ |
529 | RD_KAFKA_RESP_ERR_UNSUPPORTED_COMPRESSION_TYPE = 76, |
530 | /** Broker epoch has changed */ |
531 | RD_KAFKA_RESP_ERR_STALE_BROKER_EPOCH = 77, |
532 | /** Leader high watermark is not caught up */ |
533 | RD_KAFKA_RESP_ERR_OFFSET_NOT_AVAILABLE = 78, |
534 | /** Group member needs a valid member ID */ |
535 | RD_KAFKA_RESP_ERR_MEMBER_ID_REQUIRED = 79, |
536 | /** Preferred leader was not available */ |
537 | RD_KAFKA_RESP_ERR_PREFERRED_LEADER_NOT_AVAILABLE = 80, |
538 | /** Consumer group has reached maximum size */ |
539 | RD_KAFKA_RESP_ERR_GROUP_MAX_SIZE_REACHED = 81, |
540 | |
541 | RD_KAFKA_RESP_ERR_END_ALL, |
542 | } rd_kafka_resp_err_t; |
543 | |
544 | |
545 | /** |
546 | * @brief Error code value, name and description. |
547 | * Typically for use with language bindings to automatically expose |
548 | * the full set of librdkafka error codes. |
549 | */ |
550 | struct rd_kafka_err_desc { |
551 | rd_kafka_resp_err_t code;/**< Error code */ |
552 | const char *name; /**< Error name, same as code enum sans prefix */ |
553 | const char *desc; /**< Human readable error description. */ |
554 | }; |
555 | |
556 | |
557 | /** |
558 | * @brief Returns the full list of error codes. |
559 | */ |
560 | RD_EXPORT |
561 | void rd_kafka_get_err_descs (const struct rd_kafka_err_desc **errdescs, |
562 | size_t *cntp); |
563 | |
564 | |
565 | |
566 | |
567 | /** |
568 | * @brief Returns a human readable representation of a kafka error. |
569 | * |
570 | * @param err Error code to translate |
571 | */ |
572 | RD_EXPORT |
573 | const char *rd_kafka_err2str (rd_kafka_resp_err_t err); |
574 | |
575 | |
576 | |
577 | /** |
578 | * @brief Returns the error code name (enum name). |
579 | * |
580 | * @param err Error code to translate |
581 | */ |
582 | RD_EXPORT |
583 | const char *rd_kafka_err2name (rd_kafka_resp_err_t err); |
584 | |
585 | |
586 | /** |
587 | * @brief Returns the last error code generated by a legacy API call |
588 | * in the current thread. |
589 | * |
590 | * The legacy APIs are the ones using errno to propagate error value, namely: |
591 | * - rd_kafka_topic_new() |
592 | * - rd_kafka_consume_start() |
593 | * - rd_kafka_consume_stop() |
594 | * - rd_kafka_consume() |
595 | * - rd_kafka_consume_batch() |
596 | * - rd_kafka_consume_callback() |
597 | * - rd_kafka_consume_queue() |
598 | * - rd_kafka_produce() |
599 | * |
600 | * The main use for this function is to avoid converting system \p errno |
601 | * values to rd_kafka_resp_err_t codes for legacy APIs. |
602 | * |
603 | * @remark The last error is stored per-thread, if multiple rd_kafka_t handles |
604 | * are used in the same application thread the developer needs to |
605 | * make sure rd_kafka_last_error() is called immediately after |
606 | * a failed API call. |
607 | * |
608 | * @remark errno propagation from librdkafka is not safe on Windows |
609 | * and should not be used, use rd_kafka_last_error() instead. |
610 | */ |
611 | RD_EXPORT |
612 | rd_kafka_resp_err_t rd_kafka_last_error (void); |
613 | |
614 | |
615 | /** |
616 | * @brief Converts the system errno value \p errnox to a rd_kafka_resp_err_t |
617 | * error code upon failure from the following functions: |
618 | * - rd_kafka_topic_new() |
619 | * - rd_kafka_consume_start() |
620 | * - rd_kafka_consume_stop() |
621 | * - rd_kafka_consume() |
622 | * - rd_kafka_consume_batch() |
623 | * - rd_kafka_consume_callback() |
624 | * - rd_kafka_consume_queue() |
625 | * - rd_kafka_produce() |
626 | * |
627 | * @param errnox System errno value to convert |
628 | * |
629 | * @returns Appropriate error code for \p errnox |
630 | * |
631 | * @remark A better alternative is to call rd_kafka_last_error() immediately |
632 | * after any of the above functions return -1 or NULL. |
633 | * |
634 | * @deprecated Use rd_kafka_last_error() to retrieve the last error code |
635 | * set by the legacy librdkafka APIs. |
636 | * |
637 | * @sa rd_kafka_last_error() |
638 | */ |
639 | RD_EXPORT RD_DEPRECATED |
640 | rd_kafka_resp_err_t rd_kafka_errno2err(int errnox); |
641 | |
642 | |
643 | /** |
644 | * @brief Returns the thread-local system errno |
645 | * |
646 | * On most platforms this is the same as \p errno but in case of different |
647 | * runtimes between library and application (e.g., Windows static DLLs) |
648 | * this provides a means for exposing the errno librdkafka uses. |
649 | * |
650 | * @remark The value is local to the current calling thread. |
651 | * |
652 | * @deprecated Use rd_kafka_last_error() to retrieve the last error code |
653 | * set by the legacy librdkafka APIs. |
654 | */ |
655 | RD_EXPORT RD_DEPRECATED |
656 | int rd_kafka_errno (void); |
657 | |
658 | |
659 | |
660 | |
661 | /** |
662 | * @brief Returns the first fatal error set on this client instance, |
663 | * or RD_KAFKA_RESP_ERR_NO_ERROR if no fatal error has occurred. |
664 | * |
665 | * This function is to be used with the Idempotent Producer and \c error_cb |
666 | * to detect fatal errors. |
667 | * |
668 | * Generally all errors raised by \c error_cb are to be considered |
669 | * informational and temporary, the client will try to recover from all |
670 | * errors in a graceful fashion (by retrying, etc). |
671 | * |
672 | * However, some errors should logically be considered fatal to retain |
673 | * consistency; in particular a set of errors that may occur when using the |
674 | * Idempotent Producer and the in-order or exactly-once producer guarantees |
675 | * can't be satisfied. |
676 | * |
677 | * @param errstr A human readable error string (nul-terminated) is written to |
678 | * this location that must be of at least \p errstr_size bytes. |
679 | * The \p errstr is only written to if there is a fatal error. |
680 | * |
681 | * |
682 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR if no fatal error has been raised, else |
683 | * any other error code. |
684 | */ |
685 | RD_EXPORT |
686 | rd_kafka_resp_err_t rd_kafka_fatal_error (rd_kafka_t *rk, |
687 | char *errstr, size_t errstr_size); |
688 | |
689 | |
690 | /** |
691 | * @brief Trigger a fatal error for testing purposes. |
692 | * |
693 | * Since there is no practical way to trigger real fatal errors in the |
694 | * idempotent producer, this method allows an application to trigger |
695 | * fabricated fatal errors in tests to check its error handling code. |
696 | * |
697 | * @param err The underlying error code. |
698 | * @param reason A human readable error reason. |
699 | * Will be prefixed with "test_fatal_error: " to differentiate |
700 | * from real fatal errors. |
701 | * |
702 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR if a fatal error was triggered, or |
703 | * RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS if a previous fatal error |
704 | * has already been triggered. |
705 | */ |
706 | RD_EXPORT rd_kafka_resp_err_t |
707 | rd_kafka_test_fatal_error (rd_kafka_t *rk, rd_kafka_resp_err_t err, |
708 | const char *reason); |
709 | |
710 | |
711 | /** |
712 | * @brief Topic+Partition place holder |
713 | * |
714 | * Generic place holder for a Topic+Partition and its related information |
715 | * used for multiple purposes: |
716 | * - consumer offset (see rd_kafka_commit(), et.al.) |
717 | * - group rebalancing callback (rd_kafka_conf_set_rebalance_cb()) |
718 | * - offset commit result callback (rd_kafka_conf_set_offset_commit_cb()) |
719 | */ |
720 | |
721 | /** |
722 | * @brief Generic place holder for a specific Topic+Partition. |
723 | * |
724 | * @sa rd_kafka_topic_partition_list_new() |
725 | */ |
726 | typedef struct rd_kafka_topic_partition_s { |
727 | char *topic; /**< Topic name */ |
728 | int32_t partition; /**< Partition */ |
729 | int64_t offset; /**< Offset */ |
730 | void *metadata; /**< Metadata */ |
731 | size_t metadata_size; /**< Metadata size */ |
732 | void *opaque; /**< Application opaque */ |
733 | rd_kafka_resp_err_t err; /**< Error code, depending on use. */ |
734 | void *_private; /**< INTERNAL USE ONLY, |
735 | * INITIALIZE TO ZERO, DO NOT TOUCH */ |
736 | } rd_kafka_topic_partition_t; |
737 | |
738 | |
739 | /** |
740 | * @brief Destroy a rd_kafka_topic_partition_t. |
741 | * @remark This must not be called for elements in a topic partition list. |
742 | */ |
743 | RD_EXPORT |
744 | void rd_kafka_topic_partition_destroy (rd_kafka_topic_partition_t *rktpar); |
745 | |
746 | |
747 | /** |
748 | * @brief A growable list of Topic+Partitions. |
749 | * |
750 | */ |
751 | typedef struct rd_kafka_topic_partition_list_s { |
752 | int cnt; /**< Current number of elements */ |
753 | int size; /**< Current allocated size */ |
754 | rd_kafka_topic_partition_t *elems; /**< Element array[] */ |
755 | } rd_kafka_topic_partition_list_t; |
756 | |
757 | |
758 | /** |
759 | * @brief Create a new list/vector Topic+Partition container. |
760 | * |
761 | * @param size Initial allocated size used when the expected number of |
762 | * elements is known or can be estimated. |
763 | * Avoids reallocation and possibly relocation of the |
764 | * elems array. |
765 | * |
766 | * @returns A newly allocated Topic+Partition list. |
767 | * |
768 | * @remark Use rd_kafka_topic_partition_list_destroy() to free all resources |
769 | * in use by a list and the list itself. |
770 | * @sa rd_kafka_topic_partition_list_add() |
771 | */ |
772 | RD_EXPORT |
773 | rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_new (int size); |
774 | |
775 | |
776 | /** |
777 | * @brief Free all resources used by the list and the list itself. |
778 | */ |
779 | RD_EXPORT |
780 | void |
781 | rd_kafka_topic_partition_list_destroy (rd_kafka_topic_partition_list_t *rkparlist); |
782 | |
783 | /** |
784 | * @brief Add topic+partition to list |
785 | * |
786 | * @param rktparlist List to extend |
787 | * @param topic Topic name (copied) |
788 | * @param partition Partition id |
789 | * |
790 | * @returns The object which can be used to fill in additionals fields. |
791 | */ |
792 | RD_EXPORT |
793 | rd_kafka_topic_partition_t * |
794 | rd_kafka_topic_partition_list_add (rd_kafka_topic_partition_list_t *rktparlist, |
795 | const char *topic, int32_t partition); |
796 | |
797 | |
798 | /** |
799 | * @brief Add range of partitions from \p start to \p stop inclusive. |
800 | * |
801 | * @param rktparlist List to extend |
802 | * @param topic Topic name (copied) |
803 | * @param start Start partition of range |
804 | * @param stop Last partition of range (inclusive) |
805 | */ |
806 | RD_EXPORT |
807 | void |
808 | rd_kafka_topic_partition_list_add_range (rd_kafka_topic_partition_list_t |
809 | *rktparlist, |
810 | const char *topic, |
811 | int32_t start, int32_t stop); |
812 | |
813 | |
814 | |
815 | /** |
816 | * @brief Delete partition from list. |
817 | * |
818 | * @param rktparlist List to modify |
819 | * @param topic Topic name to match |
820 | * @param partition Partition to match |
821 | * |
822 | * @returns 1 if partition was found (and removed), else 0. |
823 | * |
824 | * @remark Any held indices to elems[] are unusable after this call returns 1. |
825 | */ |
826 | RD_EXPORT |
827 | int |
828 | rd_kafka_topic_partition_list_del (rd_kafka_topic_partition_list_t *rktparlist, |
829 | const char *topic, int32_t partition); |
830 | |
831 | |
832 | /** |
833 | * @brief Delete partition from list by elems[] index. |
834 | * |
835 | * @returns 1 if partition was found (and removed), else 0. |
836 | * |
837 | * @sa rd_kafka_topic_partition_list_del() |
838 | */ |
839 | RD_EXPORT |
840 | int |
841 | rd_kafka_topic_partition_list_del_by_idx ( |
842 | rd_kafka_topic_partition_list_t *rktparlist, |
843 | int idx); |
844 | |
845 | |
846 | /** |
847 | * @brief Make a copy of an existing list. |
848 | * |
849 | * @param src The existing list to copy. |
850 | * |
851 | * @returns A new list fully populated to be identical to \p src |
852 | */ |
853 | RD_EXPORT |
854 | rd_kafka_topic_partition_list_t * |
855 | rd_kafka_topic_partition_list_copy (const rd_kafka_topic_partition_list_t *src); |
856 | |
857 | |
858 | |
859 | |
860 | /** |
861 | * @brief Set offset to \p offset for \p topic and \p partition |
862 | * |
863 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or |
864 | * RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION if \p partition was not found |
865 | * in the list. |
866 | */ |
867 | RD_EXPORT |
868 | rd_kafka_resp_err_t rd_kafka_topic_partition_list_set_offset ( |
869 | rd_kafka_topic_partition_list_t *rktparlist, |
870 | const char *topic, int32_t partition, int64_t offset); |
871 | |
872 | |
873 | |
874 | /** |
875 | * @brief Find element by \p topic and \p partition. |
876 | * |
877 | * @returns a pointer to the first matching element, or NULL if not found. |
878 | */ |
879 | RD_EXPORT |
880 | rd_kafka_topic_partition_t * |
881 | rd_kafka_topic_partition_list_find (rd_kafka_topic_partition_list_t *rktparlist, |
882 | const char *topic, int32_t partition); |
883 | |
884 | |
885 | /** |
886 | * @brief Sort list using comparator \p cmp. |
887 | * |
888 | * If \p cmp is NULL the default comparator will be used that |
889 | * sorts by ascending topic name and partition. |
890 | * |
891 | */ |
892 | RD_EXPORT void |
893 | rd_kafka_topic_partition_list_sort (rd_kafka_topic_partition_list_t *rktparlist, |
894 | int (*cmp) (const void *a, const void *b, |
895 | void *opaque), |
896 | void *opaque); |
897 | |
898 | |
899 | /**@}*/ |
900 | |
901 | |
902 | |
903 | /** |
904 | * @name Var-arg tag types |
905 | * @{ |
906 | * |
907 | */ |
908 | |
909 | /** |
910 | * @enum rd_kafka_vtype_t |
911 | * |
912 | * @brief Var-arg tag types |
913 | * |
914 | * @sa rd_kafka_producev() |
915 | */ |
916 | typedef enum rd_kafka_vtype_t { |
917 | RD_KAFKA_VTYPE_END, /**< va-arg sentinel */ |
918 | RD_KAFKA_VTYPE_TOPIC, /**< (const char *) Topic name */ |
919 | RD_KAFKA_VTYPE_RKT, /**< (rd_kafka_topic_t *) Topic handle */ |
920 | RD_KAFKA_VTYPE_PARTITION, /**< (int32_t) Partition */ |
921 | RD_KAFKA_VTYPE_VALUE, /**< (void *, size_t) Message value (payload)*/ |
922 | RD_KAFKA_VTYPE_KEY, /**< (void *, size_t) Message key */ |
923 | RD_KAFKA_VTYPE_OPAQUE, /**< (void *) Application opaque */ |
924 | RD_KAFKA_VTYPE_MSGFLAGS, /**< (int) RD_KAFKA_MSG_F_.. flags */ |
925 | RD_KAFKA_VTYPE_TIMESTAMP, /**< (int64_t) Milliseconds since epoch UTC */ |
926 | , /**< (const char *, const void *, ssize_t) |
927 | * Message Header */ |
928 | , /**< (rd_kafka_headers_t *) Headers list */ |
929 | } rd_kafka_vtype_t; |
930 | |
931 | |
932 | /** |
933 | * @brief Convenience macros for rd_kafka_vtype_t that takes the |
934 | * correct arguments for each vtype. |
935 | */ |
936 | |
937 | /*! |
938 | * va-arg end sentinel used to terminate the variable argument list |
939 | */ |
940 | #define RD_KAFKA_V_END RD_KAFKA_VTYPE_END |
941 | |
942 | /*! |
943 | * Topic name (const char *) |
944 | */ |
945 | #define RD_KAFKA_V_TOPIC(topic) \ |
946 | _LRK_TYPECHECK(RD_KAFKA_VTYPE_TOPIC, const char *, topic), \ |
947 | (const char *)topic |
948 | /*! |
949 | * Topic object (rd_kafka_topic_t *) |
950 | */ |
951 | #define RD_KAFKA_V_RKT(rkt) \ |
952 | _LRK_TYPECHECK(RD_KAFKA_VTYPE_RKT, rd_kafka_topic_t *, rkt), \ |
953 | (rd_kafka_topic_t *)rkt |
954 | /*! |
955 | * Partition (int32_t) |
956 | */ |
957 | #define RD_KAFKA_V_PARTITION(partition) \ |
958 | _LRK_TYPECHECK(RD_KAFKA_VTYPE_PARTITION, int32_t, partition), \ |
959 | (int32_t)partition |
960 | /*! |
961 | * Message value/payload pointer and length (void *, size_t) |
962 | */ |
963 | #define RD_KAFKA_V_VALUE(VALUE,LEN) \ |
964 | _LRK_TYPECHECK2(RD_KAFKA_VTYPE_VALUE, void *, VALUE, size_t, LEN), \ |
965 | (void *)VALUE, (size_t)LEN |
966 | /*! |
967 | * Message key pointer and length (const void *, size_t) |
968 | */ |
969 | #define RD_KAFKA_V_KEY(KEY,LEN) \ |
970 | _LRK_TYPECHECK2(RD_KAFKA_VTYPE_KEY, const void *, KEY, size_t, LEN), \ |
971 | (void *)KEY, (size_t)LEN |
972 | /*! |
973 | * Message opaque pointer (void *) |
974 | * Same as \c produce(.., msg_opaque), and \c rkmessage->_private . |
975 | */ |
976 | #define RD_KAFKA_V_OPAQUE(opaque) \ |
977 | _LRK_TYPECHECK(RD_KAFKA_VTYPE_OPAQUE, void *, opaque), \ |
978 | (void *)opaque |
979 | /*! |
980 | * Message flags (int) |
981 | * @sa RD_KAFKA_MSG_F_COPY, et.al. |
982 | */ |
983 | #define RD_KAFKA_V_MSGFLAGS(msgflags) \ |
984 | _LRK_TYPECHECK(RD_KAFKA_VTYPE_MSGFLAGS, int, msgflags), \ |
985 | (int)msgflags |
986 | /*! |
987 | * Timestamp in milliseconds since epoch UTC (int64_t). |
988 | * A value of 0 will use the current wall-clock time. |
989 | */ |
990 | #define RD_KAFKA_V_TIMESTAMP(timestamp) \ |
991 | _LRK_TYPECHECK(RD_KAFKA_VTYPE_TIMESTAMP, int64_t, timestamp), \ |
992 | (int64_t)timestamp |
993 | /*! |
994 | * Add Message Header (const char *NAME, const void *VALUE, ssize_t LEN). |
995 | * @sa rd_kafka_header_add() |
996 | * @remark RD_KAFKA_V_HEADER() and RD_KAFKA_V_HEADERS() MUST NOT be mixed |
997 | * in the same call to producev(). |
998 | */ |
999 | #define (NAME,VALUE,LEN) \ |
1000 | _LRK_TYPECHECK3(RD_KAFKA_VTYPE_HEADER, const char *, NAME, \ |
1001 | const void *, VALUE, ssize_t, LEN), \ |
1002 | (const char *)NAME, (const void *)VALUE, (ssize_t)LEN |
1003 | |
1004 | /*! |
1005 | * Message Headers list (rd_kafka_headers_t *). |
1006 | * The message object will assume ownership of the headers (unless producev() |
1007 | * fails). |
1008 | * Any existing headers will be replaced. |
1009 | * @sa rd_kafka_message_set_headers() |
1010 | * @remark RD_KAFKA_V_HEADER() and RD_KAFKA_V_HEADERS() MUST NOT be mixed |
1011 | * in the same call to producev(). |
1012 | */ |
1013 | #define (HDRS) \ |
1014 | _LRK_TYPECHECK(RD_KAFKA_VTYPE_HEADERS, rd_kafka_headers_t *, HDRS), \ |
1015 | (rd_kafka_headers_t *)HDRS |
1016 | |
1017 | |
1018 | /**@}*/ |
1019 | |
1020 | |
1021 | /** |
1022 | * @name Message headers |
1023 | * @{ |
1024 | * |
1025 | * @brief Message headers consist of a list of (string key, binary value) pairs. |
1026 | * Duplicate keys are supported and the order in which keys were |
1027 | * added are retained. |
1028 | * |
1029 | * Header values are considered binary and may have three types of |
1030 | * value: |
1031 | * - proper value with size > 0 and a valid pointer |
1032 | * - empty value with size = 0 and any non-NULL pointer |
1033 | * - null value with size = 0 and a NULL pointer |
1034 | * |
1035 | * Headers require Apache Kafka broker version v0.11.0.0 or later. |
1036 | * |
1037 | * Header operations are O(n). |
1038 | */ |
1039 | |
1040 | typedef struct ; |
1041 | |
1042 | /** |
1043 | * @brief Create a new headers list. |
1044 | * |
1045 | * @param initial_count Preallocate space for this number of headers. |
1046 | * Any number of headers may be added, updated and |
1047 | * removed regardless of the initial count. |
1048 | */ |
1049 | RD_EXPORT rd_kafka_headers_t * (size_t initial_count); |
1050 | |
1051 | /** |
1052 | * @brief Destroy the headers list. The object and any returned value pointers |
1053 | * are not usable after this call. |
1054 | */ |
1055 | RD_EXPORT void (rd_kafka_headers_t *hdrs); |
1056 | |
1057 | /** |
1058 | * @brief Make a copy of headers list \p src. |
1059 | */ |
1060 | RD_EXPORT rd_kafka_headers_t * |
1061 | (const rd_kafka_headers_t *src); |
1062 | |
1063 | /** |
1064 | * @brief Add header with name \p name and value \p val (copied) of size |
1065 | * \p size (not including null-terminator). |
1066 | * |
1067 | * @param name Header name. |
1068 | * @param name_size Header name size (not including the null-terminator). |
1069 | * If -1 the \p name length is automatically acquired using |
1070 | * strlen(). |
1071 | * @param value Pointer to header value, or NULL (set size to 0 or -1). |
1072 | * @param value_size Size of header value. If -1 the \p value is assumed to be a |
1073 | * null-terminated string and the length is automatically |
1074 | * acquired using strlen(). |
1075 | * |
1076 | * @returns RD_KAFKA_RESP_ERR__READ_ONLY if the headers are read-only, |
1077 | * else RD_KAFKA_RESP_ERR_NO_ERROR. |
1078 | */ |
1079 | RD_EXPORT rd_kafka_resp_err_t |
1080 | (rd_kafka_headers_t *hdrs, |
1081 | const char *name, ssize_t name_size, |
1082 | const void *value, ssize_t value_size); |
1083 | |
1084 | /** |
1085 | * @brief Remove all headers for the given key (if any). |
1086 | * |
1087 | * @returns RD_KAFKA_RESP_ERR__READ_ONLY if the headers are read-only, |
1088 | * RD_KAFKA_RESP_ERR__NOENT if no matching headers were found, |
1089 | * else RD_KAFKA_RESP_ERR_NO_ERROR if headers were removed. |
1090 | */ |
1091 | RD_EXPORT rd_kafka_resp_err_t |
1092 | (rd_kafka_headers_t *hdrs, const char *name); |
1093 | |
1094 | |
1095 | /** |
1096 | * @brief Find last header in list \p hdrs matching \p name. |
1097 | * |
1098 | * @param name Header to find (last match). |
1099 | * @param valuep (out) Set to a (null-terminated) const pointer to the value |
1100 | * (may be NULL). |
1101 | * @param sizep (out) Set to the value's size (not including null-terminator). |
1102 | * |
1103 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR if an entry was found, else |
1104 | * RD_KAFKA_RESP_ERR__NOENT. |
1105 | * |
1106 | * @remark The returned pointer in \p valuep includes a trailing null-terminator |
1107 | * that is not accounted for in \p sizep. |
1108 | * @remark The returned pointer is only valid as long as the headers list and |
1109 | * the header item is valid. |
1110 | */ |
1111 | RD_EXPORT rd_kafka_resp_err_t |
1112 | (const rd_kafka_headers_t *hdrs, |
1113 | const char *name, const void **valuep, size_t *sizep); |
1114 | |
1115 | /** |
1116 | * @brief Iterator for headers matching \p name. |
1117 | * |
1118 | * Same semantics as rd_kafka_header_get_last() |
1119 | * |
1120 | * @param hdrs Headers to iterate. |
1121 | * @param idx Iterator index, start at 0 and increment by one for each call |
1122 | * as long as RD_KAFKA_RESP_ERR_NO_ERROR is returned. |
1123 | * @param name Header name to match. |
1124 | * @param valuep (out) Set to a (null-terminated) const pointer to the value |
1125 | * (may be NULL). |
1126 | * @param sizep (out) Set to the value's size (not including null-terminator). |
1127 | */ |
1128 | RD_EXPORT rd_kafka_resp_err_t |
1129 | (const rd_kafka_headers_t *hdrs, size_t idx, |
1130 | const char *name, const void **valuep, size_t *sizep); |
1131 | |
1132 | |
1133 | /** |
1134 | * @brief Iterator for all headers. |
1135 | * |
1136 | * Same semantics as rd_kafka_header_get() |
1137 | * |
1138 | * @sa rd_kafka_header_get() |
1139 | */ |
1140 | RD_EXPORT rd_kafka_resp_err_t |
1141 | (const rd_kafka_headers_t *hdrs, size_t idx, |
1142 | const char **namep, |
1143 | const void **valuep, size_t *sizep); |
1144 | |
1145 | |
1146 | |
1147 | /**@}*/ |
1148 | |
1149 | |
1150 | |
1151 | /** |
1152 | * @name Kafka messages |
1153 | * @{ |
1154 | * |
1155 | */ |
1156 | |
1157 | |
1158 | |
1159 | // FIXME: This doesn't show up in docs for some reason |
1160 | // "Compound rd_kafka_message_t is not documented." |
1161 | |
1162 | /** |
1163 | * @brief A Kafka message as returned by the \c rd_kafka_consume*() family |
1164 | * of functions as well as provided to the Producer \c dr_msg_cb(). |
1165 | * |
1166 | * For the consumer this object has two purposes: |
1167 | * - provide the application with a consumed message. (\c err == 0) |
1168 | * - report per-topic+partition consumer errors (\c err != 0) |
1169 | * |
1170 | * The application must check \c err to decide what action to take. |
1171 | * |
1172 | * When the application is finished with a message it must call |
1173 | * rd_kafka_message_destroy() unless otherwise noted. |
1174 | */ |
1175 | typedef struct rd_kafka_message_s { |
1176 | rd_kafka_resp_err_t err; /**< Non-zero for error signaling. */ |
1177 | rd_kafka_topic_t *rkt; /**< Topic */ |
1178 | int32_t partition; /**< Partition */ |
1179 | void *payload; /**< Producer: original message payload. |
1180 | * Consumer: Depends on the value of \c err : |
1181 | * - \c err==0: Message payload. |
1182 | * - \c err!=0: Error string */ |
1183 | size_t len; /**< Depends on the value of \c err : |
1184 | * - \c err==0: Message payload length |
1185 | * - \c err!=0: Error string length */ |
1186 | void *key; /**< Depends on the value of \c err : |
1187 | * - \c err==0: Optional message key */ |
1188 | size_t key_len; /**< Depends on the value of \c err : |
1189 | * - \c err==0: Optional message key length*/ |
1190 | int64_t offset; /**< Consumer: |
1191 | * - Message offset (or offset for error |
1192 | * if \c err!=0 if applicable). |
1193 | * Producer, dr_msg_cb: |
1194 | * Message offset assigned by broker. |
1195 | * May be RD_KAFKA_OFFSET_INVALID |
1196 | * for retried messages when |
1197 | * idempotence is enabled. */ |
1198 | void *_private; /**< Consume: |
1199 | * - rdkafka private pointer: DO NOT MODIFY |
1200 | * - dr_msg_cb: |
1201 | * msg_opaque from produce() call */ |
1202 | } rd_kafka_message_t; |
1203 | |
1204 | |
1205 | /** |
1206 | * @brief Frees resources for \p rkmessage and hands ownership back to rdkafka. |
1207 | */ |
1208 | RD_EXPORT |
1209 | void rd_kafka_message_destroy(rd_kafka_message_t *rkmessage); |
1210 | |
1211 | |
1212 | |
1213 | |
1214 | /** |
1215 | * @brief Returns the error string for an errored rd_kafka_message_t or NULL if |
1216 | * there was no error. |
1217 | * |
1218 | * @remark This function MUST NOT be used with the producer. |
1219 | */ |
1220 | static RD_INLINE const char * |
1221 | RD_UNUSED |
1222 | rd_kafka_message_errstr(const rd_kafka_message_t *rkmessage) { |
1223 | if (!rkmessage->err) |
1224 | return NULL; |
1225 | |
1226 | if (rkmessage->payload) |
1227 | return (const char *)rkmessage->payload; |
1228 | |
1229 | return rd_kafka_err2str(rkmessage->err); |
1230 | } |
1231 | |
1232 | |
1233 | |
1234 | /** |
1235 | * @brief Returns the message timestamp for a consumed message. |
1236 | * |
1237 | * The timestamp is the number of milliseconds since the epoch (UTC). |
1238 | * |
1239 | * \p tstype (if not NULL) is updated to indicate the type of timestamp. |
1240 | * |
1241 | * @returns message timestamp, or -1 if not available. |
1242 | * |
1243 | * @remark Message timestamps require broker version 0.10.0 or later. |
1244 | */ |
1245 | RD_EXPORT |
1246 | int64_t rd_kafka_message_timestamp (const rd_kafka_message_t *rkmessage, |
1247 | rd_kafka_timestamp_type_t *tstype); |
1248 | |
1249 | |
1250 | |
1251 | /** |
1252 | * @brief Returns the latency for a produced message measured from |
1253 | * the produce() call. |
1254 | * |
1255 | * @returns the latency in microseconds, or -1 if not available. |
1256 | */ |
1257 | RD_EXPORT |
1258 | int64_t rd_kafka_message_latency (const rd_kafka_message_t *rkmessage); |
1259 | |
1260 | |
1261 | /** |
1262 | * @brief Get the message header list. |
1263 | * |
1264 | * The returned pointer in \p *hdrsp is associated with the \p rkmessage and |
1265 | * must not be used after destruction of the message object or the header |
1266 | * list is replaced with rd_kafka_message_set_headers(). |
1267 | * |
1268 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR if headers were returned, |
1269 | * RD_KAFKA_RESP_ERR__NOENT if the message has no headers, |
1270 | * or another error code if the headers could not be parsed. |
1271 | * |
1272 | * @remark Headers require broker version 0.11.0.0 or later. |
1273 | * |
1274 | * @remark As an optimization the raw protocol headers are parsed on |
1275 | * the first call to this function. |
1276 | */ |
1277 | RD_EXPORT rd_kafka_resp_err_t |
1278 | (const rd_kafka_message_t *rkmessage, |
1279 | rd_kafka_headers_t **hdrsp); |
1280 | |
1281 | /** |
1282 | * @brief Get the message header list and detach the list from the message |
1283 | * making the application the owner of the headers. |
1284 | * The application must eventually destroy the headers using |
1285 | * rd_kafka_headers_destroy(). |
1286 | * The message's headers will be set to NULL. |
1287 | * |
1288 | * Otherwise same semantics as rd_kafka_message_headers() |
1289 | * |
1290 | * @sa rd_kafka_message_headers |
1291 | */ |
1292 | RD_EXPORT rd_kafka_resp_err_t |
1293 | (rd_kafka_message_t *rkmessage, |
1294 | rd_kafka_headers_t **hdrsp); |
1295 | |
1296 | |
1297 | /** |
1298 | * @brief Replace the message's current headers with a new list. |
1299 | * |
1300 | * @param hdrs New header list. The message object assumes ownership of |
1301 | * the list, the list will be destroyed automatically with |
1302 | * the message object. |
1303 | * The new headers list may be updated until the message object |
1304 | * is passed or returned to librdkafka. |
1305 | * |
1306 | * @remark The existing headers object, if any, will be destroyed. |
1307 | */ |
1308 | RD_EXPORT |
1309 | void (rd_kafka_message_t *rkmessage, |
1310 | rd_kafka_headers_t *hdrs); |
1311 | |
1312 | |
1313 | /** |
1314 | * @brief Returns the number of header key/value pairs |
1315 | * |
1316 | * @param hdrs Headers to count |
1317 | */ |
1318 | RD_EXPORT size_t (const rd_kafka_headers_t *hdrs); |
1319 | |
1320 | |
1321 | /** |
1322 | * @enum rd_kafka_msg_status_t |
1323 | * @brief Message persistence status can be used by the application to |
1324 | * find out if a produced message was persisted in the topic log. |
1325 | */ |
1326 | typedef enum { |
1327 | /**< Message was never transmitted to the broker, or failed with |
1328 | * an error indicating it was not written to the log. |
1329 | * Application retry risks ordering, but not duplication. */ |
1330 | RD_KAFKA_MSG_STATUS_NOT_PERSISTED = 0, |
1331 | |
1332 | /**< Message was transmitted to broker, but no acknowledgement was |
1333 | * received. |
1334 | * Application retry risks ordering and duplication. */ |
1335 | RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED = 1, |
1336 | |
1337 | /**< Message was written to the log and acknowledged by the broker. */ |
1338 | RD_KAFKA_MSG_STATUS_PERSISTED = 2 |
1339 | } rd_kafka_msg_status_t; |
1340 | |
1341 | |
1342 | /** |
1343 | * @brief Returns the message's persistence status in the topic log. |
1344 | * |
1345 | * @remark The message status is not available in on_acknowledgement |
1346 | * interceptors. |
1347 | */ |
1348 | RD_EXPORT rd_kafka_msg_status_t |
1349 | rd_kafka_message_status (const rd_kafka_message_t *rkmessage); |
1350 | |
1351 | /**@}*/ |
1352 | |
1353 | |
1354 | /** |
1355 | * @name Configuration interface |
1356 | * @{ |
1357 | * |
1358 | * @brief Main/global configuration property interface |
1359 | * |
1360 | */ |
1361 | |
1362 | /** |
1363 | * @enum rd_kafka_conf_res_t |
1364 | * @brief Configuration result type |
1365 | */ |
1366 | typedef enum { |
1367 | RD_KAFKA_CONF_UNKNOWN = -2, /**< Unknown configuration name. */ |
1368 | RD_KAFKA_CONF_INVALID = -1, /**< Invalid configuration value. */ |
1369 | RD_KAFKA_CONF_OK = 0 /**< Configuration okay */ |
1370 | } rd_kafka_conf_res_t; |
1371 | |
1372 | |
1373 | /** |
1374 | * @brief Create configuration object. |
1375 | * |
1376 | * When providing your own configuration to the \c rd_kafka_*_new_*() calls |
1377 | * the rd_kafka_conf_t objects needs to be created with this function |
1378 | * which will set up the defaults. |
1379 | * I.e.: |
1380 | * @code |
1381 | * rd_kafka_conf_t *myconf; |
1382 | * rd_kafka_conf_res_t res; |
1383 | * |
1384 | * myconf = rd_kafka_conf_new(); |
1385 | * res = rd_kafka_conf_set(myconf, "socket.timeout.ms", "600", |
1386 | * errstr, sizeof(errstr)); |
1387 | * if (res != RD_KAFKA_CONF_OK) |
1388 | * die("%s\n", errstr); |
1389 | * |
1390 | * rk = rd_kafka_new(..., myconf); |
1391 | * @endcode |
1392 | * |
1393 | * Please see CONFIGURATION.md for the default settings or use |
1394 | * rd_kafka_conf_properties_show() to provide the information at runtime. |
1395 | * |
1396 | * The properties are identical to the Apache Kafka configuration properties |
1397 | * whenever possible. |
1398 | * |
1399 | * @returns A new rd_kafka_conf_t object with defaults set. |
1400 | * |
1401 | * @sa rd_kafka_conf_set(), rd_kafka_conf_destroy() |
1402 | */ |
1403 | RD_EXPORT |
1404 | rd_kafka_conf_t *rd_kafka_conf_new(void); |
1405 | |
1406 | |
1407 | /** |
1408 | * @brief Destroys a conf object. |
1409 | */ |
1410 | RD_EXPORT |
1411 | void rd_kafka_conf_destroy(rd_kafka_conf_t *conf); |
1412 | |
1413 | |
1414 | /** |
1415 | * @brief Creates a copy/duplicate of configuration object \p conf |
1416 | * |
1417 | * @remark Interceptors are NOT copied to the new configuration object. |
1418 | * @sa rd_kafka_interceptor_f_on_conf_dup |
1419 | */ |
1420 | RD_EXPORT |
1421 | rd_kafka_conf_t *rd_kafka_conf_dup(const rd_kafka_conf_t *conf); |
1422 | |
1423 | |
1424 | /** |
1425 | * @brief Same as rd_kafka_conf_dup() but with an array of property name |
1426 | * prefixes to filter out (ignore) when copying. |
1427 | */ |
1428 | RD_EXPORT |
1429 | rd_kafka_conf_t *rd_kafka_conf_dup_filter (const rd_kafka_conf_t *conf, |
1430 | size_t filter_cnt, |
1431 | const char **filter); |
1432 | |
1433 | |
1434 | |
1435 | /** |
1436 | * @returns the configuration object used by an rd_kafka_t instance. |
1437 | * For use with rd_kafka_conf_get(), et.al., to extract configuration |
1438 | * properties from a running client. |
1439 | * |
1440 | * @remark the returned object is read-only and its lifetime is the same |
1441 | * as the rd_kafka_t object. |
1442 | */ |
1443 | RD_EXPORT |
1444 | const rd_kafka_conf_t *rd_kafka_conf (rd_kafka_t *rk); |
1445 | |
1446 | |
1447 | /** |
1448 | * @brief Sets a configuration property. |
1449 | * |
1450 | * \p conf must have been previously created with rd_kafka_conf_new(). |
1451 | * |
1452 | * Fallthrough: |
1453 | * Topic-level configuration properties may be set using this interface |
1454 | * in which case they are applied on the \c default_topic_conf. |
1455 | * If no \c default_topic_conf has been set one will be created. |
1456 | * Any sub-sequent rd_kafka_conf_set_default_topic_conf() calls will |
1457 | * replace the current default topic configuration. |
1458 | * |
1459 | * @returns \c rd_kafka_conf_res_t to indicate success or failure. |
1460 | * In case of failure \p errstr is updated to contain a human readable |
1461 | * error string. |
1462 | */ |
1463 | RD_EXPORT |
1464 | rd_kafka_conf_res_t rd_kafka_conf_set(rd_kafka_conf_t *conf, |
1465 | const char *name, |
1466 | const char *value, |
1467 | char *errstr, size_t errstr_size); |
1468 | |
1469 | |
1470 | /** |
1471 | * @brief Enable event sourcing. |
1472 | * \p events is a bitmask of \c RD_KAFKA_EVENT_* of events to enable |
1473 | * for consumption by `rd_kafka_queue_poll()`. |
1474 | */ |
1475 | RD_EXPORT |
1476 | void rd_kafka_conf_set_events(rd_kafka_conf_t *conf, int events); |
1477 | |
1478 | |
1479 | /** |
1480 | * @brief Generic event callback to be used with the event API to trigger |
1481 | * callbacks for \c rd_kafka_event_t objects from a background |
1482 | * thread serving the background queue. |
1483 | * |
1484 | * How to use: |
1485 | * 1. First set the event callback on the configuration object with this |
1486 | * function, followed by creating an rd_kafka_t instance |
1487 | * with rd_kafka_new(). |
1488 | * 2. Get the instance's background queue with rd_kafka_queue_get_background() |
1489 | * and pass it as the reply/response queue to an API that takes an |
1490 | * event queue, such as rd_kafka_CreateTopics(). |
1491 | * 3. As the response event is ready and enqueued on the background queue the |
1492 | * event callback will be triggered from the background thread. |
1493 | * 4. Prior to destroying the client instance, loose your reference to the |
1494 | * background queue by calling rd_kafka_queue_destroy(). |
1495 | * |
1496 | * The application must destroy the \c rkev passed to \p event cb using |
1497 | * rd_kafka_event_destroy(). |
1498 | * |
1499 | * The \p event_cb \c opaque argument is the opaque set with |
1500 | * rd_kafka_conf_set_opaque(). |
1501 | * |
1502 | * @remark This callback is a specialized alternative to the poll-based |
1503 | * event API described in the Event interface section. |
1504 | * |
1505 | * @remark The \p event_cb will be called spontaneously from a background |
1506 | * thread completely managed by librdkafka. |
1507 | * Take care to perform proper locking of application objects. |
1508 | * |
1509 | * @warning The application MUST NOT call rd_kafka_destroy() from the |
1510 | * event callback. |
1511 | * |
1512 | * @sa rd_kafka_queue_get_background |
1513 | */ |
1514 | RD_EXPORT void |
1515 | rd_kafka_conf_set_background_event_cb (rd_kafka_conf_t *conf, |
1516 | void (*event_cb) (rd_kafka_t *rk, |
1517 | rd_kafka_event_t *rkev, |
1518 | void *opaque)); |
1519 | |
1520 | |
1521 | /** |
1522 | * @deprecated See rd_kafka_conf_set_dr_msg_cb() |
1523 | */ |
1524 | RD_EXPORT |
1525 | void rd_kafka_conf_set_dr_cb(rd_kafka_conf_t *conf, |
1526 | void (*dr_cb) (rd_kafka_t *rk, |
1527 | void *payload, size_t len, |
1528 | rd_kafka_resp_err_t err, |
1529 | void *opaque, void *msg_opaque)); |
1530 | |
1531 | /** |
1532 | * @brief \b Producer: Set delivery report callback in provided \p conf object. |
1533 | * |
1534 | * The delivery report callback will be called once for each message |
1535 | * accepted by rd_kafka_produce() (et.al) with \p err set to indicate |
1536 | * the result of the produce request. |
1537 | * |
1538 | * The callback is called when a message is succesfully produced or |
1539 | * if librdkafka encountered a permanent failure. |
1540 | * Delivery errors occur when the retry count is exceeded, when the |
1541 | * message.timeout.ms timeout is exceeded or there is a permanent error |
1542 | * like RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART. |
1543 | * |
1544 | * An application must call rd_kafka_poll() at regular intervals to |
1545 | * serve queued delivery report callbacks. |
1546 | * |
1547 | * The broker-assigned offset can be retrieved with \c rkmessage->offset |
1548 | * and the timestamp can be retrieved using rd_kafka_message_timestamp(). |
1549 | * |
1550 | * @remark The Idempotent Producer may return invalid timestamp |
1551 | * (RD_KAFKA_TIMESTAMP_NOT_AVAILABLE), and |
1552 | * and offset (RD_KAFKA_OFFSET_INVALID) for retried messages |
1553 | * that were previously successfully delivered but not properly |
1554 | * acknowledged. |
1555 | */ |
1556 | RD_EXPORT |
1557 | void rd_kafka_conf_set_dr_msg_cb(rd_kafka_conf_t *conf, |
1558 | void (*dr_msg_cb) (rd_kafka_t *rk, |
1559 | const rd_kafka_message_t * |
1560 | rkmessage, |
1561 | void *opaque)); |
1562 | |
1563 | |
1564 | /** |
1565 | * @brief \b Consumer: Set consume callback for use with rd_kafka_consumer_poll() |
1566 | * |
1567 | */ |
1568 | RD_EXPORT |
1569 | void rd_kafka_conf_set_consume_cb (rd_kafka_conf_t *conf, |
1570 | void (*consume_cb) (rd_kafka_message_t * |
1571 | rkmessage, |
1572 | void *opaque)); |
1573 | |
1574 | /** |
1575 | * @brief \b Consumer: Set rebalance callback for use with |
1576 | * coordinated consumer group balancing. |
1577 | * |
1578 | * The \p err field is set to either RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS |
1579 | * or RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS and 'partitions' |
1580 | * contains the full partition set that was either assigned or revoked. |
1581 | * |
1582 | * Registering a \p rebalance_cb turns off librdkafka's automatic |
1583 | * partition assignment/revocation and instead delegates that responsibility |
1584 | * to the application's \p rebalance_cb. |
1585 | * |
1586 | * The rebalance callback is responsible for updating librdkafka's |
1587 | * assignment set based on the two events: RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS |
1588 | * and RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS but should also be able to handle |
1589 | * arbitrary rebalancing failures where \p err is neither of those. |
1590 | * @remark In this latter case (arbitrary error), the application must |
1591 | * call rd_kafka_assign(rk, NULL) to synchronize state. |
1592 | * |
1593 | * Without a rebalance callback this is done automatically by librdkafka |
1594 | * but registering a rebalance callback gives the application flexibility |
1595 | * in performing other operations along with the assigning/revocation, |
1596 | * such as fetching offsets from an alternate location (on assign) |
1597 | * or manually committing offsets (on revoke). |
1598 | * |
1599 | * @remark The \p partitions list is destroyed by librdkafka on return |
1600 | * return from the rebalance_cb and must not be freed or |
1601 | * saved by the application. |
1602 | * |
1603 | * @remark Be careful when modifying the \p partitions list. |
1604 | * Changing this list should only be done to change the initial |
1605 | * offsets for each partition. |
1606 | * But a function like `rd_kafka_position()` might have unexpected |
1607 | * effects for instance when a consumer gets assigned a partition |
1608 | * it used to consume at an earlier rebalance. In this case, the |
1609 | * list of partitions will be updated with the old offset for that |
1610 | * partition. In this case, it is generally better to pass a copy |
1611 | * of the list (see `rd_kafka_topic_partition_list_copy()`). |
1612 | * The result of `rd_kafka_position()` is typically outdated in |
1613 | * RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS. |
1614 | * |
1615 | * The following example shows the application's responsibilities: |
1616 | * @code |
1617 | * static void rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err, |
1618 | * rd_kafka_topic_partition_list_t *partitions, |
1619 | * void *opaque) { |
1620 | * |
1621 | * switch (err) |
1622 | * { |
1623 | * case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: |
1624 | * // application may load offets from arbitrary external |
1625 | * // storage here and update \p partitions |
1626 | * |
1627 | * rd_kafka_assign(rk, partitions); |
1628 | * break; |
1629 | * |
1630 | * case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: |
1631 | * if (manual_commits) // Optional explicit manual commit |
1632 | * rd_kafka_commit(rk, partitions, 0); // sync commit |
1633 | * |
1634 | * rd_kafka_assign(rk, NULL); |
1635 | * break; |
1636 | * |
1637 | * default: |
1638 | * handle_unlikely_error(err); |
1639 | * rd_kafka_assign(rk, NULL); // sync state |
1640 | * break; |
1641 | * } |
1642 | * } |
1643 | * @endcode |
1644 | */ |
1645 | RD_EXPORT |
1646 | void rd_kafka_conf_set_rebalance_cb ( |
1647 | rd_kafka_conf_t *conf, |
1648 | void (*rebalance_cb) (rd_kafka_t *rk, |
1649 | rd_kafka_resp_err_t err, |
1650 | rd_kafka_topic_partition_list_t *partitions, |
1651 | void *opaque)); |
1652 | |
1653 | |
1654 | |
1655 | /** |
1656 | * @brief \b Consumer: Set offset commit callback for use with consumer groups. |
1657 | * |
1658 | * The results of automatic or manual offset commits will be scheduled |
1659 | * for this callback and is served by rd_kafka_consumer_poll(). |
1660 | * |
1661 | * If no partitions had valid offsets to commit this callback will be called |
1662 | * with \p err == RD_KAFKA_RESP_ERR__NO_OFFSET which is not to be considered |
1663 | * an error. |
1664 | * |
1665 | * The \p offsets list contains per-partition information: |
1666 | * - \c offset: committed offset (attempted) |
1667 | * - \c err: commit error |
1668 | */ |
1669 | RD_EXPORT |
1670 | void rd_kafka_conf_set_offset_commit_cb ( |
1671 | rd_kafka_conf_t *conf, |
1672 | void (*offset_commit_cb) (rd_kafka_t *rk, |
1673 | rd_kafka_resp_err_t err, |
1674 | rd_kafka_topic_partition_list_t *offsets, |
1675 | void *opaque)); |
1676 | |
1677 | |
1678 | /** |
1679 | * @brief Set error callback in provided conf object. |
1680 | * |
1681 | * The error callback is used by librdkafka to signal warnings and errors |
1682 | * back to the application. |
1683 | * |
1684 | * These errors should generally be considered informational and non-permanent, |
1685 | * the client will try to recover automatically from all type of errors. |
1686 | * Given that the client and cluster configuration is correct the |
1687 | * application should treat these as temporary errors. |
1688 | * |
1689 | * \p error_cb will be triggered with \c err set to RD_KAFKA_RESP_ERR__FATAL |
1690 | * if a fatal error has been raised; in this case use rd_kafka_fatal_error() to |
1691 | * retrieve the fatal error code and error string, and then begin terminating |
1692 | * the client instance. |
1693 | * |
1694 | * If no \p error_cb is registered, or RD_KAFKA_EVENT_ERROR has not been set |
1695 | * with rd_kafka_conf_set_events, then the errors will be logged instead. |
1696 | */ |
1697 | RD_EXPORT |
1698 | void rd_kafka_conf_set_error_cb(rd_kafka_conf_t *conf, |
1699 | void (*error_cb) (rd_kafka_t *rk, int err, |
1700 | const char *reason, |
1701 | void *opaque)); |
1702 | |
1703 | /** |
1704 | * @brief Set throttle callback. |
1705 | * |
1706 | * The throttle callback is used to forward broker throttle times to the |
1707 | * application for Produce and Fetch (consume) requests. |
1708 | * |
1709 | * Callbacks are triggered whenever a non-zero throttle time is returned by |
1710 | * the broker, or when the throttle time drops back to zero. |
1711 | * |
1712 | * An application must call rd_kafka_poll() or rd_kafka_consumer_poll() at |
1713 | * regular intervals to serve queued callbacks. |
1714 | * |
1715 | * @remark Requires broker version 0.9.0 or later. |
1716 | */ |
1717 | RD_EXPORT |
1718 | void rd_kafka_conf_set_throttle_cb (rd_kafka_conf_t *conf, |
1719 | void (*throttle_cb) ( |
1720 | rd_kafka_t *rk, |
1721 | const char *broker_name, |
1722 | int32_t broker_id, |
1723 | int throttle_time_ms, |
1724 | void *opaque)); |
1725 | |
1726 | |
1727 | /** |
1728 | * @brief Set logger callback. |
1729 | * |
1730 | * The default is to print to stderr, but a syslog logger is also available, |
1731 | * see rd_kafka_log_print and rd_kafka_log_syslog for the builtin alternatives. |
1732 | * Alternatively the application may provide its own logger callback. |
1733 | * Or pass \p func as NULL to disable logging. |
1734 | * |
1735 | * This is the configuration alternative to the deprecated rd_kafka_set_logger() |
1736 | * |
1737 | * @remark The log_cb will be called spontaneously from librdkafka's internal |
1738 | * threads unless logs have been forwarded to a poll queue through |
1739 | * \c rd_kafka_set_log_queue(). |
1740 | * An application MUST NOT call any librdkafka APIs or do any prolonged |
1741 | * work in a non-forwarded \c log_cb. |
1742 | */ |
1743 | RD_EXPORT |
1744 | void rd_kafka_conf_set_log_cb(rd_kafka_conf_t *conf, |
1745 | void (*log_cb) (const rd_kafka_t *rk, int level, |
1746 | const char *fac, const char *buf)); |
1747 | |
1748 | |
1749 | /** |
1750 | * @brief Set statistics callback in provided conf object. |
1751 | * |
1752 | * The statistics callback is triggered from rd_kafka_poll() every |
1753 | * \c statistics.interval.ms (needs to be configured separately). |
1754 | * Function arguments: |
1755 | * - \p rk - Kafka handle |
1756 | * - \p json - String containing the statistics data in JSON format |
1757 | * - \p json_len - Length of \p json string. |
1758 | * - \p opaque - Application-provided opaque. |
1759 | * |
1760 | * For more information on the format of \p json, see |
1761 | * https://github.com/edenhill/librdkafka/wiki/Statistics |
1762 | * |
1763 | * If the application wishes to hold on to the \p json pointer and free |
1764 | * it at a later time it must return 1 from the \p stats_cb. |
1765 | * If the application returns 0 from the \p stats_cb then librdkafka |
1766 | * will immediately free the \p json pointer. |
1767 | * |
1768 | * See STATISTICS.md for a full definition of the JSON object. |
1769 | */ |
1770 | RD_EXPORT |
1771 | void rd_kafka_conf_set_stats_cb(rd_kafka_conf_t *conf, |
1772 | int (*stats_cb) (rd_kafka_t *rk, |
1773 | char *json, |
1774 | size_t json_len, |
1775 | void *opaque)); |
1776 | |
1777 | /** |
1778 | * @brief Set SASL/OAUTHBEARER token refresh callback in provided conf object. |
1779 | * |
1780 | * @param conf the configuration to mutate. |
1781 | * @param oauthbearer_token_refresh_cb the callback to set; callback function |
1782 | * arguments:<br> |
1783 | * \p rk - Kafka handle<br> |
1784 | * \p oauthbearer_config - Value of configuration property |
1785 | * sasl.oauthbearer.config. |
1786 | * \p opaque - Application-provided opaque set via |
1787 | * rd_kafka_conf_set_opaque() |
1788 | * |
1789 | * The SASL/OAUTHBEARER token refresh callback is triggered via rd_kafka_poll() |
1790 | * whenever OAUTHBEARER is the SASL mechanism and a token needs to be retrieved, |
1791 | * typically based on the configuration defined in \c sasl.oauthbearer.config. |
1792 | * |
1793 | * The callback should invoke rd_kafka_oauthbearer_set_token() |
1794 | * or rd_kafka_oauthbearer_set_token_failure() to indicate success |
1795 | * or failure, respectively. |
1796 | * |
1797 | * The refresh operation is eventable and may be received via |
1798 | * rd_kafka_queue_poll() with an event type of |
1799 | * \c RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH. |
1800 | * |
1801 | * Note that before any SASL/OAUTHBEARER broker connection can succeed the |
1802 | * application must call rd_kafka_oauthbearer_set_token() once -- either |
1803 | * directly or, more typically, by invoking either rd_kafka_poll() or |
1804 | * rd_kafka_queue_poll() -- in order to cause retrieval of an initial token to |
1805 | * occur. |
1806 | * |
1807 | * An unsecured JWT refresh handler is provided by librdkafka for development |
1808 | * and testing purposes, it is enabled by setting |
1809 | * the \c enable.sasl.oauthbearer.unsecure.jwt property to true and is |
1810 | * mutually exclusive to using a refresh callback. |
1811 | */ |
1812 | RD_EXPORT |
1813 | void rd_kafka_conf_set_oauthbearer_token_refresh_cb ( |
1814 | rd_kafka_conf_t *conf, |
1815 | void (*oauthbearer_token_refresh_cb) (rd_kafka_t *rk, |
1816 | const char *oauthbearer_config, |
1817 | void *opaque)); |
1818 | |
1819 | /** |
1820 | * @brief Set socket callback. |
1821 | * |
1822 | * The socket callback is responsible for opening a socket |
1823 | * according to the supplied \p domain, \p type and \p protocol. |
1824 | * The socket shall be created with \c CLOEXEC set in a racefree fashion, if |
1825 | * possible. |
1826 | * |
1827 | * Default: |
1828 | * - on linux: racefree CLOEXEC |
1829 | * - others : non-racefree CLOEXEC |
1830 | * |
1831 | * @remark The callback will be called from an internal librdkafka thread. |
1832 | */ |
1833 | RD_EXPORT |
1834 | void rd_kafka_conf_set_socket_cb(rd_kafka_conf_t *conf, |
1835 | int (*socket_cb) (int domain, int type, |
1836 | int protocol, |
1837 | void *opaque)); |
1838 | |
1839 | |
1840 | |
1841 | /** |
1842 | * @brief Set connect callback. |
1843 | * |
1844 | * The connect callback is responsible for connecting socket \p sockfd |
1845 | * to peer address \p addr. |
1846 | * The \p id field contains the broker identifier. |
1847 | * |
1848 | * \p connect_cb shall return 0 on success (socket connected) or an error |
1849 | * number (errno) on error. |
1850 | * |
1851 | * @remark The callback will be called from an internal librdkafka thread. |
1852 | */ |
1853 | RD_EXPORT void |
1854 | rd_kafka_conf_set_connect_cb (rd_kafka_conf_t *conf, |
1855 | int (*connect_cb) (int sockfd, |
1856 | const struct sockaddr *addr, |
1857 | int addrlen, |
1858 | const char *id, |
1859 | void *opaque)); |
1860 | |
1861 | /** |
1862 | * @brief Set close socket callback. |
1863 | * |
1864 | * Close a socket (optionally opened with socket_cb()). |
1865 | * |
1866 | * @remark The callback will be called from an internal librdkafka thread. |
1867 | */ |
1868 | RD_EXPORT void |
1869 | rd_kafka_conf_set_closesocket_cb (rd_kafka_conf_t *conf, |
1870 | int (*closesocket_cb) (int sockfd, |
1871 | void *opaque)); |
1872 | |
1873 | |
1874 | |
1875 | #ifndef _MSC_VER |
1876 | /** |
1877 | * @brief Set open callback. |
1878 | * |
1879 | * The open callback is responsible for opening the file specified by |
1880 | * pathname, flags and mode. |
1881 | * The file shall be opened with \c CLOEXEC set in a racefree fashion, if |
1882 | * possible. |
1883 | * |
1884 | * Default: |
1885 | * - on linux: racefree CLOEXEC |
1886 | * - others : non-racefree CLOEXEC |
1887 | * |
1888 | * @remark The callback will be called from an internal librdkafka thread. |
1889 | */ |
1890 | RD_EXPORT |
1891 | void rd_kafka_conf_set_open_cb (rd_kafka_conf_t *conf, |
1892 | int (*open_cb) (const char *pathname, |
1893 | int flags, mode_t mode, |
1894 | void *opaque)); |
1895 | #endif |
1896 | |
1897 | |
1898 | /** |
1899 | * @brief Sets the verification callback of the broker certificate |
1900 | * |
1901 | * The verification callback is triggered from internal librdkafka threads |
1902 | * upon connecting to a broker. On each connection attempt the callback |
1903 | * will be called for each certificate in the broker's certificate chain, |
1904 | * starting at the root certification, as long as the application callback |
1905 | * returns 1 (valid certificate). |
1906 | * \c broker_name and \c broker_id correspond to the broker the connection |
1907 | * is being made to. |
1908 | * The \c x509_error argument indicates if OpenSSL's verification of |
1909 | * the certificate succeed (0) or failed (an OpenSSL error code). |
1910 | * The application may set the SSL context error code by returning 0 |
1911 | * from the verify callback and providing a non-zero SSL context error code |
1912 | * in \p x509_error. |
1913 | * If the verify callback sets \x509_error to 0, returns 1, and the |
1914 | * original \p x509_error was non-zero, the error on the SSL context will |
1915 | * be cleared. |
1916 | * \p x509_error is always a valid pointer to an int. |
1917 | * |
1918 | * \c depth is the depth of the current certificate in the chain, starting |
1919 | * at the root certificate. |
1920 | * |
1921 | * The certificate itself is passed in binary DER format in \c buf of |
1922 | * size \c size. |
1923 | * |
1924 | * The callback must return 1 if verification succeeds, or |
1925 | * 0 if verification fails and then write a human-readable error message |
1926 | * to \c errstr (limited to \c errstr_size bytes, including nul-term). |
1927 | * |
1928 | * @returns RD_KAFKA_CONF_OK if SSL is supported in this build, else |
1929 | * RD_KAFKA_CONF_INVALID. |
1930 | * |
1931 | * @warning This callback will be called from internal librdkafka threads. |
1932 | * |
1933 | * @remark See <openssl/x509_vfy.h> in the OpenSSL source distribution |
1934 | * for a list of \p x509_error codes. |
1935 | */ |
1936 | RD_EXPORT |
1937 | rd_kafka_conf_res_t rd_kafka_conf_set_ssl_cert_verify_cb ( |
1938 | rd_kafka_conf_t *conf, |
1939 | int (*ssl_cert_verify_cb) (rd_kafka_t *rk, |
1940 | const char *broker_name, |
1941 | int32_t broker_id, |
1942 | int *x509_error, |
1943 | int depth, |
1944 | const char *buf, size_t size, |
1945 | char *errstr, size_t errstr_size, |
1946 | void *opaque)); |
1947 | |
1948 | |
1949 | /** |
1950 | * @enum rd_kafka_cert_type_t |
1951 | * |
1952 | * @brief SSL certificate type |
1953 | * |
1954 | * @sa rd_kafka_conf_set_ssl_cert |
1955 | */ |
1956 | typedef enum rd_kafka_cert_type_t { |
1957 | RD_KAFKA_CERT_PUBLIC_KEY, /**< Client's public key */ |
1958 | RD_KAFKA_CERT_PRIVATE_KEY, /**< Client's private key */ |
1959 | RD_KAFKA_CERT_CA, /**< CA certificate */ |
1960 | RD_KAFKA_CERT__CNT, |
1961 | } rd_kafka_cert_type_t; |
1962 | |
1963 | /** |
1964 | * @enum rd_kafka_cert_enc_t |
1965 | * |
1966 | * @brief SSL certificate encoding |
1967 | * |
1968 | * @sa rd_kafka_conf_set_ssl_cert |
1969 | */ |
1970 | typedef enum rd_kafka_cert_enc_t { |
1971 | RD_KAFKA_CERT_ENC_PKCS12, /**< PKCS#12 */ |
1972 | RD_KAFKA_CERT_ENC_DER, /**< DER / binary X.509 ASN1 */ |
1973 | RD_KAFKA_CERT_ENC_PEM, /**< PEM */ |
1974 | RD_KAFKA_CERT_ENC__CNT, |
1975 | } rd_kafka_cert_enc_t; |
1976 | |
1977 | |
1978 | /** |
1979 | * @brief Set certificate/key \p cert_type from the \p cert_enc encoded |
1980 | * memory at \p buffer of \p size bytes. |
1981 | * |
1982 | * @param conf Configuration object. |
1983 | * @param cert_type Certificate or key type to configure. |
1984 | * @param cert_enc Buffer \p encoding type. |
1985 | * @param buffer Memory pointer to encoded certificate or key. |
1986 | * The memory is not referenced after this function returns. |
1987 | * @param size Size of memory at \p buffer. |
1988 | * @param errstr Memory were a human-readable error string will be written |
1989 | * on failure. |
1990 | * @param errstr_size Size of \p errstr, including space for nul-terminator. |
1991 | * |
1992 | * @returns RD_KAFKA_CONF_OK on success or RD_KAFKA_CONF_INVALID if the |
1993 | * memory in \p buffer is of incorrect encoding, or if librdkafka |
1994 | * was not built with SSL support. |
1995 | * |
1996 | * @remark Calling this method multiple times with the same \p cert_type |
1997 | * will replace the previous value. |
1998 | * |
1999 | * @remark Calling this method with \p buffer set to NULL will clear the |
2000 | * configuration for \p cert_type. |
2001 | * |
2002 | * @remark The private key may require a password, which must be specified |
2003 | * with the `ssl.key.password` configuration property prior to |
2004 | * calling this function. |
2005 | * |
2006 | * @remark Private and public keys in PEM format may also be set with the |
2007 | * `ssl.key.pem` and `ssl.certificate.pem` configuration properties. |
2008 | */ |
2009 | RD_EXPORT rd_kafka_conf_res_t |
2010 | rd_kafka_conf_set_ssl_cert (rd_kafka_conf_t *conf, |
2011 | rd_kafka_cert_type_t cert_type, |
2012 | rd_kafka_cert_enc_t cert_enc, |
2013 | const void *buffer, size_t size, |
2014 | char *errstr, size_t errstr_size); |
2015 | |
2016 | |
2017 | /** |
2018 | * @brief Sets the application's opaque pointer that will be passed to callbacks |
2019 | */ |
2020 | RD_EXPORT |
2021 | void rd_kafka_conf_set_opaque(rd_kafka_conf_t *conf, void *opaque); |
2022 | |
2023 | /** |
2024 | * @brief Retrieves the opaque pointer previously set with rd_kafka_conf_set_opaque() |
2025 | */ |
2026 | RD_EXPORT |
2027 | void *rd_kafka_opaque(const rd_kafka_t *rk); |
2028 | |
2029 | |
2030 | |
2031 | /** |
2032 | * @brief Sets the default topic configuration to use for automatically |
2033 | * subscribed topics (e.g., through pattern-matched topics). |
2034 | * The topic config object is not usable after this call. |
2035 | * |
2036 | * @warning Any topic configuration settings that have been set on the |
2037 | * global rd_kafka_conf_t object will be overwritten by this call |
2038 | * since the implicitly created default topic config object is |
2039 | * replaced by the user-supplied one. |
2040 | * |
2041 | * @deprecated Set default topic level configuration on the |
2042 | * global rd_kafka_conf_t object instead. |
2043 | */ |
2044 | RD_EXPORT |
2045 | void rd_kafka_conf_set_default_topic_conf (rd_kafka_conf_t *conf, |
2046 | rd_kafka_topic_conf_t *tconf); |
2047 | |
2048 | |
2049 | |
2050 | /** |
2051 | * @brief Retrieve configuration value for property \p name. |
2052 | * |
2053 | * If \p dest is non-NULL the value will be written to \p dest with at |
2054 | * most \p dest_size. |
2055 | * |
2056 | * \p *dest_size is updated to the full length of the value, thus if |
2057 | * \p *dest_size initially is smaller than the full length the application |
2058 | * may reallocate \p dest to fit the returned \p *dest_size and try again. |
2059 | * |
2060 | * If \p dest is NULL only the full length of the value is returned. |
2061 | * |
2062 | * Fallthrough: |
2063 | * Topic-level configuration properties from the \c default_topic_conf |
2064 | * may be retrieved using this interface. |
2065 | * |
2066 | * @returns \p RD_KAFKA_CONF_OK if the property name matched, else |
2067 | * \p RD_KAFKA_CONF_UNKNOWN. |
2068 | */ |
2069 | RD_EXPORT |
2070 | rd_kafka_conf_res_t rd_kafka_conf_get (const rd_kafka_conf_t *conf, |
2071 | const char *name, |
2072 | char *dest, size_t *dest_size); |
2073 | |
2074 | |
2075 | /** |
2076 | * @brief Retrieve topic configuration value for property \p name. |
2077 | * |
2078 | * @sa rd_kafka_conf_get() |
2079 | */ |
2080 | RD_EXPORT |
2081 | rd_kafka_conf_res_t rd_kafka_topic_conf_get (const rd_kafka_topic_conf_t *conf, |
2082 | const char *name, |
2083 | char *dest, size_t *dest_size); |
2084 | |
2085 | |
2086 | /** |
2087 | * @brief Dump the configuration properties and values of \p conf to an array |
2088 | * with \"key\", \"value\" pairs. |
2089 | * |
2090 | * The number of entries in the array is returned in \p *cntp. |
2091 | * |
2092 | * The dump must be freed with `rd_kafka_conf_dump_free()`. |
2093 | */ |
2094 | RD_EXPORT |
2095 | const char **rd_kafka_conf_dump(rd_kafka_conf_t *conf, size_t *cntp); |
2096 | |
2097 | |
2098 | /** |
2099 | * @brief Dump the topic configuration properties and values of \p conf |
2100 | * to an array with \"key\", \"value\" pairs. |
2101 | * |
2102 | * The number of entries in the array is returned in \p *cntp. |
2103 | * |
2104 | * The dump must be freed with `rd_kafka_conf_dump_free()`. |
2105 | */ |
2106 | RD_EXPORT |
2107 | const char **rd_kafka_topic_conf_dump(rd_kafka_topic_conf_t *conf, |
2108 | size_t *cntp); |
2109 | |
2110 | /** |
2111 | * @brief Frees a configuration dump returned from `rd_kafka_conf_dump()` or |
2112 | * `rd_kafka_topic_conf_dump(). |
2113 | */ |
2114 | RD_EXPORT |
2115 | void rd_kafka_conf_dump_free(const char **arr, size_t cnt); |
2116 | |
2117 | /** |
2118 | * @brief Prints a table to \p fp of all supported configuration properties, |
2119 | * their default values as well as a description. |
2120 | */ |
2121 | RD_EXPORT |
2122 | void rd_kafka_conf_properties_show(FILE *fp); |
2123 | |
2124 | /**@}*/ |
2125 | |
2126 | |
2127 | /** |
2128 | * @name Topic configuration |
2129 | * @{ |
2130 | * |
2131 | * @brief Topic configuration property interface |
2132 | * |
2133 | */ |
2134 | |
2135 | |
2136 | /** |
2137 | * @brief Create topic configuration object |
2138 | * |
2139 | * @sa Same semantics as for rd_kafka_conf_new(). |
2140 | */ |
2141 | RD_EXPORT |
2142 | rd_kafka_topic_conf_t *rd_kafka_topic_conf_new(void); |
2143 | |
2144 | |
2145 | /** |
2146 | * @brief Creates a copy/duplicate of topic configuration object \p conf. |
2147 | */ |
2148 | RD_EXPORT |
2149 | rd_kafka_topic_conf_t *rd_kafka_topic_conf_dup(const rd_kafka_topic_conf_t |
2150 | *conf); |
2151 | |
2152 | /** |
2153 | * @brief Creates a copy/duplicate of \p rk 's default topic configuration |
2154 | * object. |
2155 | */ |
2156 | RD_EXPORT |
2157 | rd_kafka_topic_conf_t *rd_kafka_default_topic_conf_dup (rd_kafka_t *rk); |
2158 | |
2159 | |
2160 | /** |
2161 | * @brief Destroys a topic conf object. |
2162 | */ |
2163 | RD_EXPORT |
2164 | void rd_kafka_topic_conf_destroy(rd_kafka_topic_conf_t *topic_conf); |
2165 | |
2166 | |
2167 | /** |
2168 | * @brief Sets a single rd_kafka_topic_conf_t value by property name. |
2169 | * |
2170 | * \p topic_conf should have been previously set up |
2171 | * with `rd_kafka_topic_conf_new()`. |
2172 | * |
2173 | * @returns rd_kafka_conf_res_t to indicate success or failure. |
2174 | */ |
2175 | RD_EXPORT |
2176 | rd_kafka_conf_res_t rd_kafka_topic_conf_set(rd_kafka_topic_conf_t *conf, |
2177 | const char *name, |
2178 | const char *value, |
2179 | char *errstr, size_t errstr_size); |
2180 | |
2181 | /** |
2182 | * @brief Sets the application's opaque pointer that will be passed to all topic |
2183 | * callbacks as the \c rkt_opaque argument. |
2184 | */ |
2185 | RD_EXPORT |
2186 | void rd_kafka_topic_conf_set_opaque(rd_kafka_topic_conf_t *conf, void *opaque); |
2187 | |
2188 | |
2189 | /** |
2190 | * @brief \b Producer: Set partitioner callback in provided topic conf object. |
2191 | * |
2192 | * The partitioner may be called in any thread at any time, |
2193 | * it may be called multiple times for the same message/key. |
2194 | * |
2195 | * Partitioner function constraints: |
2196 | * - MUST NOT call any rd_kafka_*() functions except: |
2197 | * rd_kafka_topic_partition_available() |
2198 | * - MUST NOT block or execute for prolonged periods of time. |
2199 | * - MUST return a value between 0 and partition_cnt-1, or the |
2200 | * special \c RD_KAFKA_PARTITION_UA value if partitioning |
2201 | * could not be performed. |
2202 | */ |
2203 | RD_EXPORT |
2204 | void |
2205 | rd_kafka_topic_conf_set_partitioner_cb (rd_kafka_topic_conf_t *topic_conf, |
2206 | int32_t (*partitioner) ( |
2207 | const rd_kafka_topic_t *rkt, |
2208 | const void *keydata, |
2209 | size_t keylen, |
2210 | int32_t partition_cnt, |
2211 | void *rkt_opaque, |
2212 | void *msg_opaque)); |
2213 | |
2214 | |
2215 | /** |
2216 | * @brief \b Producer: Set message queueing order comparator callback. |
2217 | * |
2218 | * The callback may be called in any thread at any time, |
2219 | * it may be called multiple times for the same message. |
2220 | * |
2221 | * Ordering comparator function constraints: |
2222 | * - MUST be stable sort (same input gives same output). |
2223 | * - MUST NOT call any rd_kafka_*() functions. |
2224 | * - MUST NOT block or execute for prolonged periods of time. |
2225 | * |
2226 | * The comparator shall compare the two messages and return: |
2227 | * - < 0 if message \p a should be inserted before message \p b. |
2228 | * - >=0 if message \p a should be inserted after message \p b. |
2229 | * |
2230 | * @remark Insert sorting will be used to enqueue the message in the |
2231 | * correct queue position, this comes at a cost of O(n). |
2232 | * |
2233 | * @remark If `queuing.strategy=fifo` new messages are enqueued to the |
2234 | * tail of the queue regardless of msg_order_cmp, but retried messages |
2235 | * are still affected by msg_order_cmp. |
2236 | * |
2237 | * @warning THIS IS AN EXPERIMENTAL API, SUBJECT TO CHANGE OR REMOVAL, |
2238 | * DO NOT USE IN PRODUCTION. |
2239 | */ |
2240 | RD_EXPORT void |
2241 | rd_kafka_topic_conf_set_msg_order_cmp (rd_kafka_topic_conf_t *topic_conf, |
2242 | int (*msg_order_cmp) ( |
2243 | const rd_kafka_message_t *a, |
2244 | const rd_kafka_message_t *b)); |
2245 | |
2246 | |
2247 | /** |
2248 | * @brief Check if partition is available (has a leader broker). |
2249 | * |
2250 | * @returns 1 if the partition is available, else 0. |
2251 | * |
2252 | * @warning This function must only be called from inside a partitioner function |
2253 | */ |
2254 | RD_EXPORT |
2255 | int rd_kafka_topic_partition_available(const rd_kafka_topic_t *rkt, |
2256 | int32_t partition); |
2257 | |
2258 | |
2259 | /******************************************************************* |
2260 | * * |
2261 | * Partitioners provided by rdkafka * |
2262 | * * |
2263 | *******************************************************************/ |
2264 | |
2265 | /** |
2266 | * @brief Random partitioner. |
2267 | * |
2268 | * Will try not to return unavailable partitions. |
2269 | * |
2270 | * @returns a random partition between 0 and \p partition_cnt - 1. |
2271 | * |
2272 | */ |
2273 | RD_EXPORT |
2274 | int32_t rd_kafka_msg_partitioner_random(const rd_kafka_topic_t *rkt, |
2275 | const void *key, size_t keylen, |
2276 | int32_t partition_cnt, |
2277 | void *opaque, void *msg_opaque); |
2278 | |
2279 | /** |
2280 | * @brief Consistent partitioner. |
2281 | * |
2282 | * Uses consistent hashing to map identical keys onto identical partitions. |
2283 | * |
2284 | * @returns a \"random\" partition between 0 and \p partition_cnt - 1 based on |
2285 | * the CRC value of the key |
2286 | */ |
2287 | RD_EXPORT |
2288 | int32_t rd_kafka_msg_partitioner_consistent (const rd_kafka_topic_t *rkt, |
2289 | const void *key, size_t keylen, |
2290 | int32_t partition_cnt, |
2291 | void *opaque, void *msg_opaque); |
2292 | |
2293 | /** |
2294 | * @brief Consistent-Random partitioner. |
2295 | * |
2296 | * This is the default partitioner. |
2297 | * Uses consistent hashing to map identical keys onto identical partitions, and |
2298 | * messages without keys will be assigned via the random partitioner. |
2299 | * |
2300 | * @returns a \"random\" partition between 0 and \p partition_cnt - 1 based on |
2301 | * the CRC value of the key (if provided) |
2302 | */ |
2303 | RD_EXPORT |
2304 | int32_t rd_kafka_msg_partitioner_consistent_random (const rd_kafka_topic_t *rkt, |
2305 | const void *key, size_t keylen, |
2306 | int32_t partition_cnt, |
2307 | void *opaque, void *msg_opaque); |
2308 | |
2309 | |
2310 | /** |
2311 | * @brief Murmur2 partitioner (Java compatible). |
2312 | * |
2313 | * Uses consistent hashing to map identical keys onto identical partitions |
2314 | * using Java-compatible Murmur2 hashing. |
2315 | * |
2316 | * @returns a partition between 0 and \p partition_cnt - 1. |
2317 | */ |
2318 | RD_EXPORT |
2319 | int32_t rd_kafka_msg_partitioner_murmur2 (const rd_kafka_topic_t *rkt, |
2320 | const void *key, size_t keylen, |
2321 | int32_t partition_cnt, |
2322 | void *rkt_opaque, |
2323 | void *msg_opaque); |
2324 | |
2325 | /** |
2326 | * @brief Consistent-Random Murmur2 partitioner (Java compatible). |
2327 | * |
2328 | * Uses consistent hashing to map identical keys onto identical partitions |
2329 | * using Java-compatible Murmur2 hashing. |
2330 | * Messages without keys will be assigned via the random partitioner. |
2331 | * |
2332 | * @returns a partition between 0 and \p partition_cnt - 1. |
2333 | */ |
2334 | RD_EXPORT |
2335 | int32_t rd_kafka_msg_partitioner_murmur2_random (const rd_kafka_topic_t *rkt, |
2336 | const void *key, size_t keylen, |
2337 | int32_t partition_cnt, |
2338 | void *rkt_opaque, |
2339 | void *msg_opaque); |
2340 | |
2341 | |
2342 | /**@}*/ |
2343 | |
2344 | |
2345 | |
2346 | /** |
2347 | * @name Main Kafka and Topic object handles |
2348 | * @{ |
2349 | * |
2350 | * |
2351 | */ |
2352 | |
2353 | |
2354 | |
2355 | |
2356 | /** |
2357 | * @brief Creates a new Kafka handle and starts its operation according to the |
2358 | * specified \p type (\p RD_KAFKA_CONSUMER or \p RD_KAFKA_PRODUCER). |
2359 | * |
2360 | * \p conf is an optional struct created with `rd_kafka_conf_new()` that will |
2361 | * be used instead of the default configuration. |
2362 | * The \p conf object is freed by this function on success and must not be used |
2363 | * or destroyed by the application sub-sequently. |
2364 | * See `rd_kafka_conf_set()` et.al for more information. |
2365 | * |
2366 | * \p errstr must be a pointer to memory of at least size \p errstr_size where |
2367 | * `rd_kafka_new()` may write a human readable error message in case the |
2368 | * creation of a new handle fails. In which case the function returns NULL. |
2369 | * |
2370 | * @remark \b RD_KAFKA_CONSUMER: When a new \p RD_KAFKA_CONSUMER |
2371 | * rd_kafka_t handle is created it may either operate in the |
2372 | * legacy simple consumer mode using the rd_kafka_consume_start() |
2373 | * interface, or the High-level KafkaConsumer API. |
2374 | * @remark An application must only use one of these groups of APIs on a given |
2375 | * rd_kafka_t RD_KAFKA_CONSUMER handle. |
2376 | |
2377 | * |
2378 | * @returns The Kafka handle on success or NULL on error (see \p errstr) |
2379 | * |
2380 | * @sa To destroy the Kafka handle, use rd_kafka_destroy(). |
2381 | */ |
2382 | RD_EXPORT |
2383 | rd_kafka_t *rd_kafka_new(rd_kafka_type_t type, rd_kafka_conf_t *conf, |
2384 | char *errstr, size_t errstr_size); |
2385 | |
2386 | |
2387 | /** |
2388 | * @brief Destroy Kafka handle. |
2389 | * |
2390 | * @remark This is a blocking operation. |
2391 | * @remark rd_kafka_consumer_close() will be called from this function |
2392 | * if the instance type is RD_KAFKA_CONSUMER, a \c group.id was |
2393 | * configured, and the rd_kafka_consumer_close() was not |
2394 | * explicitly called by the application. This in turn may |
2395 | * trigger consumer callbacks, such as rebalance_cb. |
2396 | * Use rd_kafka_destroy_flags() with |
2397 | * RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE to avoid this behaviour. |
2398 | * |
2399 | * @sa rd_kafka_destroy_flags() |
2400 | */ |
2401 | RD_EXPORT |
2402 | void rd_kafka_destroy(rd_kafka_t *rk); |
2403 | |
2404 | |
2405 | /** |
2406 | * @brief Destroy Kafka handle according to specified destroy flags |
2407 | * |
2408 | */ |
2409 | RD_EXPORT |
2410 | void rd_kafka_destroy_flags (rd_kafka_t *rk, int flags); |
2411 | |
2412 | /** |
2413 | * @brief Flags for rd_kafka_destroy_flags() |
2414 | */ |
2415 | |
2416 | /*! |
2417 | * Don't call consumer_close() to leave group and commit final offsets. |
2418 | * |
2419 | * This also disables consumer callbacks to be called from rd_kafka_destroy*(), |
2420 | * such as rebalance_cb. |
2421 | * |
2422 | * The consumer group handler is still closed internally, but from an |
2423 | * application perspective none of the functionality from consumer_close() |
2424 | * is performed. |
2425 | */ |
2426 | #define RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE 0x8 |
2427 | |
2428 | |
2429 | |
2430 | /** |
2431 | * @brief Returns Kafka handle name. |
2432 | */ |
2433 | RD_EXPORT |
2434 | const char *rd_kafka_name(const rd_kafka_t *rk); |
2435 | |
2436 | |
2437 | /** |
2438 | * @brief Returns Kafka handle type. |
2439 | */ |
2440 | RD_EXPORT |
2441 | rd_kafka_type_t rd_kafka_type(const rd_kafka_t *rk); |
2442 | |
2443 | |
2444 | /** |
2445 | * @brief Returns this client's broker-assigned group member id |
2446 | * |
2447 | * @remark This currently requires the high-level KafkaConsumer |
2448 | * |
2449 | * @returns An allocated string containing the current broker-assigned group |
2450 | * member id, or NULL if not available. |
2451 | * The application must free the string with \p free() or |
2452 | * rd_kafka_mem_free() |
2453 | */ |
2454 | RD_EXPORT |
2455 | char *rd_kafka_memberid (const rd_kafka_t *rk); |
2456 | |
2457 | |
2458 | |
2459 | /** |
2460 | * @brief Returns the ClusterId as reported in broker metadata. |
2461 | * |
2462 | * @param timeout_ms If there is no cached value from metadata retrieval |
2463 | * then this specifies the maximum amount of time |
2464 | * (in milliseconds) the call will block waiting |
2465 | * for metadata to be retrieved. |
2466 | * Use 0 for non-blocking calls. |
2467 | |
2468 | * @remark Requires broker version >=0.10.0 and api.version.request=true. |
2469 | * |
2470 | * @remark The application must free the returned pointer |
2471 | * using rd_kafka_mem_free(). |
2472 | * |
2473 | * @returns a newly allocated string containing the ClusterId, or NULL |
2474 | * if no ClusterId could be retrieved in the allotted timespan. |
2475 | */ |
2476 | RD_EXPORT |
2477 | char *rd_kafka_clusterid (rd_kafka_t *rk, int timeout_ms); |
2478 | |
2479 | |
2480 | /** |
2481 | * @brief Returns the current ControllerId as reported in broker metadata. |
2482 | * |
2483 | * @param timeout_ms If there is no cached value from metadata retrieval |
2484 | * then this specifies the maximum amount of time |
2485 | * (in milliseconds) the call will block waiting |
2486 | * for metadata to be retrieved. |
2487 | * Use 0 for non-blocking calls. |
2488 | |
2489 | * @remark Requires broker version >=0.10.0 and api.version.request=true. |
2490 | * |
2491 | * @returns the controller broker id (>= 0), or -1 if no ControllerId could be |
2492 | * retrieved in the allotted timespan. |
2493 | */ |
2494 | RD_EXPORT |
2495 | int32_t rd_kafka_controllerid (rd_kafka_t *rk, int timeout_ms); |
2496 | |
2497 | |
2498 | /** |
2499 | * @brief Creates a new topic handle for topic named \p topic. |
2500 | * |
2501 | * \p conf is an optional configuration for the topic created with |
2502 | * `rd_kafka_topic_conf_new()` that will be used instead of the default |
2503 | * topic configuration. |
2504 | * The \p conf object is freed by this function and must not be used or |
2505 | * destroyed by the application sub-sequently. |
2506 | * See `rd_kafka_topic_conf_set()` et.al for more information. |
2507 | * |
2508 | * Topic handles are refcounted internally and calling rd_kafka_topic_new() |
2509 | * again with the same topic name will return the previous topic handle |
2510 | * without updating the original handle's configuration. |
2511 | * Applications must eventually call rd_kafka_topic_destroy() for each |
2512 | * succesfull call to rd_kafka_topic_new() to clear up resources. |
2513 | * |
2514 | * @returns the new topic handle or NULL on error (use rd_kafka_errno2err() |
2515 | * to convert system \p errno to an rd_kafka_resp_err_t error code. |
2516 | * |
2517 | * @sa rd_kafka_topic_destroy() |
2518 | */ |
2519 | RD_EXPORT |
2520 | rd_kafka_topic_t *rd_kafka_topic_new(rd_kafka_t *rk, const char *topic, |
2521 | rd_kafka_topic_conf_t *conf); |
2522 | |
2523 | |
2524 | |
2525 | /** |
2526 | * @brief Loose application's topic handle refcount as previously created |
2527 | * with `rd_kafka_topic_new()`. |
2528 | * |
2529 | * @remark Since topic objects are refcounted (both internally and for the app) |
2530 | * the topic object might not actually be destroyed by this call, |
2531 | * but the application must consider the object destroyed. |
2532 | */ |
2533 | RD_EXPORT |
2534 | void rd_kafka_topic_destroy(rd_kafka_topic_t *rkt); |
2535 | |
2536 | |
2537 | /** |
2538 | * @brief Returns the topic name. |
2539 | */ |
2540 | RD_EXPORT |
2541 | const char *rd_kafka_topic_name(const rd_kafka_topic_t *rkt); |
2542 | |
2543 | |
2544 | /** |
2545 | * @brief Get the \p rkt_opaque pointer that was set in the topic configuration. |
2546 | */ |
2547 | RD_EXPORT |
2548 | void *rd_kafka_topic_opaque (const rd_kafka_topic_t *rkt); |
2549 | |
2550 | |
2551 | /** |
2552 | * @brief Unassigned partition. |
2553 | * |
2554 | * The unassigned partition is used by the producer API for messages |
2555 | * that should be partitioned using the configured or default partitioner. |
2556 | */ |
2557 | #define RD_KAFKA_PARTITION_UA ((int32_t)-1) |
2558 | |
2559 | |
2560 | /** |
2561 | * @brief Polls the provided kafka handle for events. |
2562 | * |
2563 | * Events will cause application provided callbacks to be called. |
2564 | * |
2565 | * The \p timeout_ms argument specifies the maximum amount of time |
2566 | * (in milliseconds) that the call will block waiting for events. |
2567 | * For non-blocking calls, provide 0 as \p timeout_ms. |
2568 | * To wait indefinately for an event, provide -1. |
2569 | * |
2570 | * @remark An application should make sure to call poll() at regular |
2571 | * intervals to serve any queued callbacks waiting to be called. |
2572 | * @remark If your producer doesn't have any callback set (in particular |
2573 | * via rd_kafka_conf_set_dr_msg_cb or rd_kafka_conf_set_error_cb) |
2574 | * you might chose not to call poll(), though this is not |
2575 | * recommended. |
2576 | * |
2577 | * Events: |
2578 | * - delivery report callbacks (if dr_cb/dr_msg_cb is configured) [producer] |
2579 | * - error callbacks (rd_kafka_conf_set_error_cb()) [all] |
2580 | * - stats callbacks (rd_kafka_conf_set_stats_cb()) [all] |
2581 | * - throttle callbacks (rd_kafka_conf_set_throttle_cb()) [all] |
2582 | * - OAUTHBEARER token refresh callbacks (rd_kafka_conf_set_oauthbearer_token_refresh_cb()) [all] |
2583 | * |
2584 | * @returns the number of events served. |
2585 | */ |
2586 | RD_EXPORT |
2587 | int rd_kafka_poll(rd_kafka_t *rk, int timeout_ms); |
2588 | |
2589 | |
2590 | /** |
2591 | * @brief Cancels the current callback dispatcher (rd_kafka_poll(), |
2592 | * rd_kafka_consume_callback(), etc). |
2593 | * |
2594 | * A callback may use this to force an immediate return to the calling |
2595 | * code (caller of e.g. rd_kafka_poll()) without processing any further |
2596 | * events. |
2597 | * |
2598 | * @remark This function MUST ONLY be called from within a librdkafka callback. |
2599 | */ |
2600 | RD_EXPORT |
2601 | void rd_kafka_yield (rd_kafka_t *rk); |
2602 | |
2603 | |
2604 | |
2605 | |
2606 | /** |
2607 | * @brief Pause producing or consumption for the provided list of partitions. |
2608 | * |
2609 | * Success or error is returned per-partition \p err in the \p partitions list. |
2610 | * |
2611 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR |
2612 | */ |
2613 | RD_EXPORT rd_kafka_resp_err_t |
2614 | rd_kafka_pause_partitions (rd_kafka_t *rk, |
2615 | rd_kafka_topic_partition_list_t *partitions); |
2616 | |
2617 | |
2618 | |
2619 | /** |
2620 | * @brief Resume producing consumption for the provided list of partitions. |
2621 | * |
2622 | * Success or error is returned per-partition \p err in the \p partitions list. |
2623 | * |
2624 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR |
2625 | */ |
2626 | RD_EXPORT rd_kafka_resp_err_t |
2627 | rd_kafka_resume_partitions (rd_kafka_t *rk, |
2628 | rd_kafka_topic_partition_list_t *partitions); |
2629 | |
2630 | |
2631 | |
2632 | |
2633 | /** |
2634 | * @brief Query broker for low (oldest/beginning) and high (newest/end) offsets |
2635 | * for partition. |
2636 | * |
2637 | * Offsets are returned in \p *low and \p *high respectively. |
2638 | * |
2639 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or an error code on failure. |
2640 | */ |
2641 | RD_EXPORT rd_kafka_resp_err_t |
2642 | rd_kafka_query_watermark_offsets (rd_kafka_t *rk, |
2643 | const char *topic, int32_t partition, |
2644 | int64_t *low, int64_t *high, int timeout_ms); |
2645 | |
2646 | |
2647 | /** |
2648 | * @brief Get last known low (oldest/beginning) and high (newest/end) offsets |
2649 | * for partition. |
2650 | * |
2651 | * The low offset is updated periodically (if statistics.interval.ms is set) |
2652 | * while the high offset is updated on each fetched message set from the broker. |
2653 | * |
2654 | * If there is no cached offset (either low or high, or both) then |
2655 | * RD_KAFKA_OFFSET_INVALID will be returned for the respective offset. |
2656 | * |
2657 | * Offsets are returned in \p *low and \p *high respectively. |
2658 | * |
2659 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or an error code on failure. |
2660 | * |
2661 | * @remark Shall only be used with an active consumer instance. |
2662 | */ |
2663 | RD_EXPORT rd_kafka_resp_err_t |
2664 | rd_kafka_get_watermark_offsets (rd_kafka_t *rk, |
2665 | const char *topic, int32_t partition, |
2666 | int64_t *low, int64_t *high); |
2667 | |
2668 | |
2669 | |
2670 | /** |
2671 | * @brief Look up the offsets for the given partitions by timestamp. |
2672 | * |
2673 | * The returned offset for each partition is the earliest offset whose |
2674 | * timestamp is greater than or equal to the given timestamp in the |
2675 | * corresponding partition. |
2676 | * |
2677 | * The timestamps to query are represented as \c offset in \p offsets |
2678 | * on input, and \c offset will contain the offset on output. |
2679 | * |
2680 | * The function will block for at most \p timeout_ms milliseconds. |
2681 | * |
2682 | * @remark Duplicate Topic+Partitions are not supported. |
2683 | * @remark Per-partition errors may be returned in \c rd_kafka_topic_partition_t.err |
2684 | * |
2685 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR if offsets were be queried (do note |
2686 | * that per-partition errors might be set), |
2687 | * RD_KAFKA_RESP_ERR__TIMED_OUT if not all offsets could be fetched |
2688 | * within \p timeout_ms, |
2689 | * RD_KAFKA_RESP_ERR__INVALID_ARG if the \p offsets list is empty, |
2690 | * RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION if all partitions are unknown, |
2691 | * RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE if unable to query leaders |
2692 | * for the given partitions. |
2693 | */ |
2694 | RD_EXPORT rd_kafka_resp_err_t |
2695 | rd_kafka_offsets_for_times (rd_kafka_t *rk, |
2696 | rd_kafka_topic_partition_list_t *offsets, |
2697 | int timeout_ms); |
2698 | |
2699 | |
2700 | /** |
2701 | * @brief Free pointer returned by librdkafka |
2702 | * |
2703 | * This is typically an abstraction for the free(3) call and makes sure |
2704 | * the application can use the same memory allocator as librdkafka for |
2705 | * freeing pointers returned by librdkafka. |
2706 | * |
2707 | * In standard setups it is usually not necessary to use this interface |
2708 | * rather than the free(3) functione. |
2709 | * |
2710 | * \p rk must be set for memory returned by APIs that take an \c rk argument, |
2711 | * for other APIs pass NULL for \p rk. |
2712 | * |
2713 | * @remark rd_kafka_mem_free() must only be used for pointers returned by APIs |
2714 | * that explicitly mention using this function for freeing. |
2715 | */ |
2716 | RD_EXPORT |
2717 | void rd_kafka_mem_free (rd_kafka_t *rk, void *ptr); |
2718 | |
2719 | |
2720 | /**@}*/ |
2721 | |
2722 | |
2723 | |
2724 | |
2725 | |
2726 | /** |
2727 | * @name Queue API |
2728 | * @{ |
2729 | * |
2730 | * Message queues allows the application to re-route consumed messages |
2731 | * from multiple topic+partitions into one single queue point. |
2732 | * This queue point containing messages from a number of topic+partitions |
2733 | * may then be served by a single rd_kafka_consume*_queue() call, |
2734 | * rather than one call per topic+partition combination. |
2735 | */ |
2736 | |
2737 | |
2738 | /** |
2739 | * @brief Create a new message queue. |
2740 | * |
2741 | * See rd_kafka_consume_start_queue(), rd_kafka_consume_queue(), et.al. |
2742 | */ |
2743 | RD_EXPORT |
2744 | rd_kafka_queue_t *rd_kafka_queue_new(rd_kafka_t *rk); |
2745 | |
2746 | /** |
2747 | * Destroy a queue, purging all of its enqueued messages. |
2748 | */ |
2749 | RD_EXPORT |
2750 | void rd_kafka_queue_destroy(rd_kafka_queue_t *rkqu); |
2751 | |
2752 | |
2753 | /** |
2754 | * @returns a reference to the main librdkafka event queue. |
2755 | * This is the queue served by rd_kafka_poll(). |
2756 | * |
2757 | * Use rd_kafka_queue_destroy() to loose the reference. |
2758 | */ |
2759 | RD_EXPORT |
2760 | rd_kafka_queue_t *rd_kafka_queue_get_main (rd_kafka_t *rk); |
2761 | |
2762 | |
2763 | /** |
2764 | * @returns a reference to the librdkafka consumer queue. |
2765 | * This is the queue served by rd_kafka_consumer_poll(). |
2766 | * |
2767 | * Use rd_kafka_queue_destroy() to loose the reference. |
2768 | * |
2769 | * @remark rd_kafka_queue_destroy() MUST be called on this queue |
2770 | * prior to calling rd_kafka_consumer_close(). |
2771 | */ |
2772 | RD_EXPORT |
2773 | rd_kafka_queue_t *rd_kafka_queue_get_consumer (rd_kafka_t *rk); |
2774 | |
2775 | /** |
2776 | * @returns a reference to the partition's queue, or NULL if |
2777 | * partition is invalid. |
2778 | * |
2779 | * Use rd_kafka_queue_destroy() to loose the reference. |
2780 | * |
2781 | * @remark rd_kafka_queue_destroy() MUST be called on this queue |
2782 | * |
2783 | * @remark This function only works on consumers. |
2784 | */ |
2785 | RD_EXPORT |
2786 | rd_kafka_queue_t *rd_kafka_queue_get_partition (rd_kafka_t *rk, |
2787 | const char *topic, |
2788 | int32_t partition); |
2789 | |
2790 | /** |
2791 | * @returns a reference to the background thread queue, or NULL if the |
2792 | * background queue is not enabled. |
2793 | * |
2794 | * To enable the background thread queue set a generic event handler callback |
2795 | * with rd_kafka_conf_set_background_event_cb() on the client instance |
2796 | * configuration object (rd_kafka_conf_t). |
2797 | * |
2798 | * The background queue is polled and served by librdkafka and MUST NOT be |
2799 | * polled, forwarded, or otherwise managed by the application, it may only |
2800 | * be used as the destination queue passed to queue-enabled APIs, such as |
2801 | * the Admin API. |
2802 | * |
2803 | * The background thread queue provides the application with an automatically |
2804 | * polled queue that triggers the event callback in a background thread, |
2805 | * this background thread is completely managed by librdkafka. |
2806 | * |
2807 | * Use rd_kafka_queue_destroy() to loose the reference. |
2808 | * |
2809 | * @warning The background queue MUST NOT be read from (polled, consumed, etc), |
2810 | * or forwarded from. |
2811 | */ |
2812 | RD_EXPORT |
2813 | rd_kafka_queue_t *rd_kafka_queue_get_background (rd_kafka_t *rk); |
2814 | |
2815 | |
2816 | /** |
2817 | * @brief Forward/re-route queue \p src to \p dst. |
2818 | * If \p dst is \c NULL the forwarding is removed. |
2819 | * |
2820 | * The internal refcounts for both queues are increased. |
2821 | * |
2822 | * @remark Regardless of whether \p dst is NULL or not, after calling this |
2823 | * function, \p src will not forward it's fetch queue to the consumer |
2824 | * queue. |
2825 | */ |
2826 | RD_EXPORT |
2827 | void rd_kafka_queue_forward (rd_kafka_queue_t *src, rd_kafka_queue_t *dst); |
2828 | |
2829 | /** |
2830 | * @brief Forward librdkafka logs (and debug) to the specified queue |
2831 | * for serving with one of the ..poll() calls. |
2832 | * |
2833 | * This allows an application to serve log callbacks (\c log_cb) |
2834 | * in its thread of choice. |
2835 | * |
2836 | * @param rkqu Queue to forward logs to. If the value is NULL the logs |
2837 | * are forwarded to the main queue. |
2838 | * |
2839 | * @remark The configuration property \c log.queue MUST also be set to true. |
2840 | * |
2841 | * @remark librdkafka maintains its own reference to the provided queue. |
2842 | * |
2843 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or an error code on error. |
2844 | */ |
2845 | RD_EXPORT |
2846 | rd_kafka_resp_err_t rd_kafka_set_log_queue (rd_kafka_t *rk, |
2847 | rd_kafka_queue_t *rkqu); |
2848 | |
2849 | |
2850 | /** |
2851 | * @returns the current number of elements in queue. |
2852 | */ |
2853 | RD_EXPORT |
2854 | size_t rd_kafka_queue_length (rd_kafka_queue_t *rkqu); |
2855 | |
2856 | |
2857 | /** |
2858 | * @brief Enable IO event triggering for queue. |
2859 | * |
2860 | * To ease integration with IO based polling loops this API |
2861 | * allows an application to create a separate file-descriptor |
2862 | * that librdkafka will write \p payload (of size \p size) to |
2863 | * whenever a new element is enqueued on a previously empty queue. |
2864 | * |
2865 | * To remove event triggering call with \p fd = -1. |
2866 | * |
2867 | * librdkafka will maintain a copy of the \p payload. |
2868 | * |
2869 | * @remark IO and callback event triggering are mutually exclusive. |
2870 | * @remark When using forwarded queues the IO event must only be enabled |
2871 | * on the final forwarded-to (destination) queue. |
2872 | */ |
2873 | RD_EXPORT |
2874 | void rd_kafka_queue_io_event_enable (rd_kafka_queue_t *rkqu, int fd, |
2875 | const void *payload, size_t size); |
2876 | |
2877 | /** |
2878 | * @brief Enable callback event triggering for queue. |
2879 | * |
2880 | * The callback will be called from an internal librdkafka thread |
2881 | * when a new element is enqueued on a previously empty queue. |
2882 | * |
2883 | * To remove event triggering call with \p event_cb = NULL. |
2884 | * |
2885 | * @remark IO and callback event triggering are mutually exclusive. |
2886 | * @remark Since the callback may be triggered from internal librdkafka |
2887 | * threads, the application must not perform any pro-longed work in |
2888 | * the callback, or call any librdkafka APIs (for the same rd_kafka_t |
2889 | * handle). |
2890 | */ |
2891 | RD_EXPORT |
2892 | void rd_kafka_queue_cb_event_enable (rd_kafka_queue_t *rkqu, |
2893 | void (*event_cb) (rd_kafka_t *rk, |
2894 | void *opaque), |
2895 | void *opaque); |
2896 | |
2897 | /**@}*/ |
2898 | |
2899 | /** |
2900 | * |
2901 | * @name Simple Consumer API (legacy) |
2902 | * @{ |
2903 | * |
2904 | */ |
2905 | |
2906 | |
2907 | #define RD_KAFKA_OFFSET_BEGINNING -2 /**< Start consuming from beginning of |
2908 | * kafka partition queue: oldest msg */ |
2909 | #define RD_KAFKA_OFFSET_END -1 /**< Start consuming from end of kafka |
2910 | * partition queue: next msg */ |
2911 | #define RD_KAFKA_OFFSET_STORED -1000 /**< Start consuming from offset retrieved |
2912 | * from offset store */ |
2913 | #define RD_KAFKA_OFFSET_INVALID -1001 /**< Invalid offset */ |
2914 | |
2915 | |
2916 | /** @cond NO_DOC */ |
2917 | #define RD_KAFKA_OFFSET_TAIL_BASE -2000 /* internal: do not use */ |
2918 | /** @endcond */ |
2919 | |
2920 | /** |
2921 | * @brief Start consuming \p CNT messages from topic's current end offset. |
2922 | * |
2923 | * That is, if current end offset is 12345 and \p CNT is 200, it will start |
2924 | * consuming from offset \c 12345-200 = \c 12145. */ |
2925 | #define RD_KAFKA_OFFSET_TAIL(CNT) (RD_KAFKA_OFFSET_TAIL_BASE - (CNT)) |
2926 | |
2927 | /** |
2928 | * @brief Start consuming messages for topic \p rkt and \p partition |
2929 | * at offset \p offset which may either be an absolute \c (0..N) |
2930 | * or one of the logical offsets: |
2931 | * - RD_KAFKA_OFFSET_BEGINNING |
2932 | * - RD_KAFKA_OFFSET_END |
2933 | * - RD_KAFKA_OFFSET_STORED |
2934 | * - RD_KAFKA_OFFSET_TAIL |
2935 | * |
2936 | * rdkafka will attempt to keep \c queued.min.messages (config property) |
2937 | * messages in the local queue by repeatedly fetching batches of messages |
2938 | * from the broker until the threshold is reached. |
2939 | * |
2940 | * The application shall use one of the `rd_kafka_consume*()` functions |
2941 | * to consume messages from the local queue, each kafka message being |
2942 | * represented as a `rd_kafka_message_t *` object. |
2943 | * |
2944 | * `rd_kafka_consume_start()` must not be called multiple times for the same |
2945 | * topic and partition without stopping consumption first with |
2946 | * `rd_kafka_consume_stop()`. |
2947 | * |
2948 | * @returns 0 on success or -1 on error in which case errno is set accordingly: |
2949 | * - EBUSY - Conflicts with an existing or previous subscription |
2950 | * (RD_KAFKA_RESP_ERR__CONFLICT) |
2951 | * - EINVAL - Invalid offset, or incomplete configuration (lacking group.id) |
2952 | * (RD_KAFKA_RESP_ERR__INVALID_ARG) |
2953 | * - ESRCH - requested \p partition is invalid. |
2954 | * (RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION) |
2955 | * - ENOENT - topic is unknown in the Kafka cluster. |
2956 | * (RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC) |
2957 | * |
2958 | * Use `rd_kafka_errno2err()` to convert sytem \c errno to `rd_kafka_resp_err_t` |
2959 | */ |
2960 | RD_EXPORT |
2961 | int rd_kafka_consume_start(rd_kafka_topic_t *rkt, int32_t partition, |
2962 | int64_t offset); |
2963 | |
2964 | /** |
2965 | * @brief Same as rd_kafka_consume_start() but re-routes incoming messages to |
2966 | * the provided queue \p rkqu (which must have been previously allocated |
2967 | * with `rd_kafka_queue_new()`. |
2968 | * |
2969 | * The application must use one of the `rd_kafka_consume_*_queue()` functions |
2970 | * to receive fetched messages. |
2971 | * |
2972 | * `rd_kafka_consume_start_queue()` must not be called multiple times for the |
2973 | * same topic and partition without stopping consumption first with |
2974 | * `rd_kafka_consume_stop()`. |
2975 | * `rd_kafka_consume_start()` and `rd_kafka_consume_start_queue()` must not |
2976 | * be combined for the same topic and partition. |
2977 | */ |
2978 | RD_EXPORT |
2979 | int rd_kafka_consume_start_queue(rd_kafka_topic_t *rkt, int32_t partition, |
2980 | int64_t offset, rd_kafka_queue_t *rkqu); |
2981 | |
2982 | /** |
2983 | * @brief Stop consuming messages for topic \p rkt and \p partition, purging |
2984 | * all messages currently in the local queue. |
2985 | * |
2986 | * NOTE: To enforce synchronisation this call will block until the internal |
2987 | * fetcher has terminated and offsets are committed to configured |
2988 | * storage method. |
2989 | * |
2990 | * The application needs to be stop all consumers before calling |
2991 | * `rd_kafka_destroy()` on the main object handle. |
2992 | * |
2993 | * @returns 0 on success or -1 on error (see `errno`). |
2994 | */ |
2995 | RD_EXPORT |
2996 | int rd_kafka_consume_stop(rd_kafka_topic_t *rkt, int32_t partition); |
2997 | |
2998 | |
2999 | |
3000 | /** |
3001 | * @brief Seek consumer for topic+partition to \p offset which is either an |
3002 | * absolute or logical offset. |
3003 | * |
3004 | * If \p timeout_ms is not 0 the call will wait this long for the |
3005 | * seek to be performed. If the timeout is reached the internal state |
3006 | * will be unknown and this function returns `RD_KAFKA_RESP_ERR__TIMED_OUT`. |
3007 | * If \p timeout_ms is 0 it will initiate the seek but return |
3008 | * immediately without any error reporting (e.g., async). |
3009 | * |
3010 | * This call triggers a fetch queue barrier flush. |
3011 | * |
3012 | * @returns `RD_KAFKA_RESP_ERR__NO_ERROR` on success else an error code. |
3013 | */ |
3014 | RD_EXPORT |
3015 | rd_kafka_resp_err_t rd_kafka_seek (rd_kafka_topic_t *rkt, |
3016 | int32_t partition, |
3017 | int64_t offset, |
3018 | int timeout_ms); |
3019 | |
3020 | |
3021 | /** |
3022 | * @brief Consume a single message from topic \p rkt and \p partition |
3023 | * |
3024 | * \p timeout_ms is maximum amount of time to wait for a message to be received. |
3025 | * Consumer must have been previously started with `rd_kafka_consume_start()`. |
3026 | * |
3027 | * @returns a message object on success or \c NULL on error. |
3028 | * The message object must be destroyed with `rd_kafka_message_destroy()` |
3029 | * when the application is done with it. |
3030 | * |
3031 | * Errors (when returning NULL): |
3032 | * - ETIMEDOUT - \p timeout_ms was reached with no new messages fetched. |
3033 | * - ENOENT - \p rkt + \p partition is unknown. |
3034 | * (no prior `rd_kafka_consume_start()` call) |
3035 | * |
3036 | * NOTE: The returned message's \c ..->err must be checked for errors. |
3037 | * NOTE: \c ..->err \c == \c RD_KAFKA_RESP_ERR__PARTITION_EOF signals that the |
3038 | * end of the partition has been reached, which should typically not be |
3039 | * considered an error. The application should handle this case |
3040 | * (e.g., ignore). |
3041 | * |
3042 | * @remark on_consume() interceptors may be called from this function prior to |
3043 | * passing message to application. |
3044 | */ |
3045 | RD_EXPORT |
3046 | rd_kafka_message_t *rd_kafka_consume(rd_kafka_topic_t *rkt, int32_t partition, |
3047 | int timeout_ms); |
3048 | |
3049 | |
3050 | |
3051 | /** |
3052 | * @brief Consume up to \p rkmessages_size from topic \p rkt and \p partition |
3053 | * putting a pointer to each message in the application provided |
3054 | * array \p rkmessages (of size \p rkmessages_size entries). |
3055 | * |
3056 | * `rd_kafka_consume_batch()` provides higher throughput performance |
3057 | * than `rd_kafka_consume()`. |
3058 | * |
3059 | * \p timeout_ms is the maximum amount of time to wait for all of |
3060 | * \p rkmessages_size messages to be put into \p rkmessages. |
3061 | * If no messages were available within the timeout period this function |
3062 | * returns 0 and \p rkmessages remains untouched. |
3063 | * This differs somewhat from `rd_kafka_consume()`. |
3064 | * |
3065 | * The message objects must be destroyed with `rd_kafka_message_destroy()` |
3066 | * when the application is done with it. |
3067 | * |
3068 | * @returns the number of rkmessages added in \p rkmessages, |
3069 | * or -1 on error (same error codes as for `rd_kafka_consume()`. |
3070 | * |
3071 | * @sa rd_kafka_consume() |
3072 | * |
3073 | * @remark on_consume() interceptors may be called from this function prior to |
3074 | * passing message to application. |
3075 | */ |
3076 | RD_EXPORT |
3077 | ssize_t rd_kafka_consume_batch(rd_kafka_topic_t *rkt, int32_t partition, |
3078 | int timeout_ms, |
3079 | rd_kafka_message_t **rkmessages, |
3080 | size_t rkmessages_size); |
3081 | |
3082 | |
3083 | |
3084 | /** |
3085 | * @brief Consumes messages from topic \p rkt and \p partition, calling |
3086 | * the provided callback for each consumed messsage. |
3087 | * |
3088 | * `rd_kafka_consume_callback()` provides higher throughput performance |
3089 | * than both `rd_kafka_consume()` and `rd_kafka_consume_batch()`. |
3090 | * |
3091 | * \p timeout_ms is the maximum amount of time to wait for one or more messages |
3092 | * to arrive. |
3093 | * |
3094 | * The provided \p consume_cb function is called for each message, |
3095 | * the application \b MUST \b NOT call `rd_kafka_message_destroy()` on the |
3096 | * provided \p rkmessage. |
3097 | * |
3098 | * The \p opaque argument is passed to the 'consume_cb' as \p opaque. |
3099 | * |
3100 | * @returns the number of messages processed or -1 on error. |
3101 | * |
3102 | * @sa rd_kafka_consume() |
3103 | * |
3104 | * @remark on_consume() interceptors may be called from this function prior to |
3105 | * passing message to application. |
3106 | */ |
3107 | RD_EXPORT |
3108 | int rd_kafka_consume_callback(rd_kafka_topic_t *rkt, int32_t partition, |
3109 | int timeout_ms, |
3110 | void (*consume_cb) (rd_kafka_message_t |
3111 | *rkmessage, |
3112 | void *opaque), |
3113 | void *opaque); |
3114 | |
3115 | |
3116 | /** |
3117 | * @name Simple Consumer API (legacy): Queue consumers |
3118 | * @{ |
3119 | * |
3120 | * The following `..._queue()` functions are analogue to the functions above |
3121 | * but reads messages from the provided queue \p rkqu instead. |
3122 | * \p rkqu must have been previously created with `rd_kafka_queue_new()` |
3123 | * and the topic consumer must have been started with |
3124 | * `rd_kafka_consume_start_queue()` utilising the the same queue. |
3125 | */ |
3126 | |
3127 | /** |
3128 | * @brief Consume from queue |
3129 | * |
3130 | * @sa rd_kafka_consume() |
3131 | */ |
3132 | RD_EXPORT |
3133 | rd_kafka_message_t *rd_kafka_consume_queue(rd_kafka_queue_t *rkqu, |
3134 | int timeout_ms); |
3135 | |
3136 | /** |
3137 | * @brief Consume batch of messages from queue |
3138 | * |
3139 | * @sa rd_kafka_consume_batch() |
3140 | */ |
3141 | RD_EXPORT |
3142 | ssize_t rd_kafka_consume_batch_queue(rd_kafka_queue_t *rkqu, |
3143 | int timeout_ms, |
3144 | rd_kafka_message_t **rkmessages, |
3145 | size_t rkmessages_size); |
3146 | |
3147 | /** |
3148 | * @brief Consume multiple messages from queue with callback |
3149 | * |
3150 | * @sa rd_kafka_consume_callback() |
3151 | */ |
3152 | RD_EXPORT |
3153 | int rd_kafka_consume_callback_queue(rd_kafka_queue_t *rkqu, |
3154 | int timeout_ms, |
3155 | void (*consume_cb) (rd_kafka_message_t |
3156 | *rkmessage, |
3157 | void *opaque), |
3158 | void *opaque); |
3159 | |
3160 | |
3161 | /**@}*/ |
3162 | |
3163 | |
3164 | |
3165 | |
3166 | /** |
3167 | * @name Simple Consumer API (legacy): Topic+partition offset store. |
3168 | * @{ |
3169 | * |
3170 | * If \c auto.commit.enable is true the offset is stored automatically prior to |
3171 | * returning of the message(s) in each of the rd_kafka_consume*() functions |
3172 | * above. |
3173 | */ |
3174 | |
3175 | |
3176 | /** |
3177 | * @brief Store offset \p offset for topic \p rkt partition \p partition. |
3178 | * |
3179 | * The offset will be committed (written) to the offset store according |
3180 | * to \c `auto.commit.interval.ms` or manual offset-less commit() |
3181 | * |
3182 | * @remark \c `enable.auto.offset.store` must be set to "false" when using this API. |
3183 | * |
3184 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or an error code on error. |
3185 | */ |
3186 | RD_EXPORT |
3187 | rd_kafka_resp_err_t rd_kafka_offset_store(rd_kafka_topic_t *rkt, |
3188 | int32_t partition, int64_t offset); |
3189 | |
3190 | |
3191 | /** |
3192 | * @brief Store offsets for next auto-commit for one or more partitions. |
3193 | * |
3194 | * The offset will be committed (written) to the offset store according |
3195 | * to \c `auto.commit.interval.ms` or manual offset-less commit(). |
3196 | * |
3197 | * Per-partition success/error status propagated through each partition's |
3198 | * \c .err field. |
3199 | * |
3200 | * @remark \c `enable.auto.offset.store` must be set to "false" when using this API. |
3201 | * |
3202 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success, or |
3203 | * RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION if none of the |
3204 | * offsets could be stored, or |
3205 | * RD_KAFKA_RESP_ERR__INVALID_ARG if \c enable.auto.offset.store is true. |
3206 | */ |
3207 | RD_EXPORT rd_kafka_resp_err_t |
3208 | rd_kafka_offsets_store(rd_kafka_t *rk, |
3209 | rd_kafka_topic_partition_list_t *offsets); |
3210 | /**@}*/ |
3211 | |
3212 | |
3213 | |
3214 | |
3215 | /** |
3216 | * @name KafkaConsumer (C) |
3217 | * @{ |
3218 | * @brief High-level KafkaConsumer C API |
3219 | * |
3220 | * |
3221 | * |
3222 | */ |
3223 | |
3224 | /** |
3225 | * @brief Subscribe to topic set using balanced consumer groups. |
3226 | * |
3227 | * Wildcard (regex) topics are supported: |
3228 | * any topic name in the \p topics list that is prefixed with \c \"^\" will |
3229 | * be regex-matched to the full list of topics in the cluster and matching |
3230 | * topics will be added to the subscription list. |
3231 | * |
3232 | * The full topic list is retrieved every \c topic.metadata.refresh.interval.ms |
3233 | * to pick up new or delete topics that match the subscription. |
3234 | * If there is any change to the matched topics the consumer will |
3235 | * immediately rejoin the group with the updated set of subscribed topics. |
3236 | * |
3237 | * Regex and full topic names can be mixed in \p topics. |
3238 | * |
3239 | * @remark Only the \c .topic field is used in the supplied \p topics list, |
3240 | * all other fields are ignored. |
3241 | * |
3242 | * @remark subscribe() is an asynchronous method which returns immediately: |
3243 | * background threads will (re)join the group, wait for group rebalance, |
3244 | * issue any registered rebalance_cb, assign() the assigned partitions, |
3245 | * and then start fetching messages. This cycle may take up to |
3246 | * \c session.timeout.ms * 2 or more to complete. |
3247 | * |
3248 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or |
3249 | * RD_KAFKA_RESP_ERR__INVALID_ARG if list is empty, contains invalid |
3250 | * topics or regexes. |
3251 | */ |
3252 | RD_EXPORT rd_kafka_resp_err_t |
3253 | rd_kafka_subscribe (rd_kafka_t *rk, |
3254 | const rd_kafka_topic_partition_list_t *topics); |
3255 | |
3256 | |
3257 | /** |
3258 | * @brief Unsubscribe from the current subscription set. |
3259 | */ |
3260 | RD_EXPORT |
3261 | rd_kafka_resp_err_t rd_kafka_unsubscribe (rd_kafka_t *rk); |
3262 | |
3263 | |
3264 | /** |
3265 | * @brief Returns the current topic subscription |
3266 | * |
3267 | * @returns An error code on failure, otherwise \p topic is updated |
3268 | * to point to a newly allocated topic list (possibly empty). |
3269 | * |
3270 | * @remark The application is responsible for calling |
3271 | * rd_kafka_topic_partition_list_destroy on the returned list. |
3272 | */ |
3273 | RD_EXPORT rd_kafka_resp_err_t |
3274 | rd_kafka_subscription (rd_kafka_t *rk, |
3275 | rd_kafka_topic_partition_list_t **topics); |
3276 | |
3277 | |
3278 | |
3279 | /** |
3280 | * @brief Poll the consumer for messages or events. |
3281 | * |
3282 | * Will block for at most \p timeout_ms milliseconds. |
3283 | * |
3284 | * @remark An application should make sure to call consumer_poll() at regular |
3285 | * intervals, even if no messages are expected, to serve any |
3286 | * queued callbacks waiting to be called. This is especially |
3287 | * important when a rebalance_cb has been registered as it needs |
3288 | * to be called and handled properly to synchronize internal |
3289 | * consumer state. |
3290 | * |
3291 | * @returns A message object which is a proper message if \p ->err is |
3292 | * RD_KAFKA_RESP_ERR_NO_ERROR, or an event or error for any other |
3293 | * value. |
3294 | * |
3295 | * @remark on_consume() interceptors may be called from this function prior to |
3296 | * passing message to application. |
3297 | * |
3298 | * @remark When subscribing to topics the application must call poll at |
3299 | * least every \c max.poll.interval.ms to remain a member of the |
3300 | * consumer group. |
3301 | * |
3302 | * Noteworthy errors returned in \c ->err: |
3303 | * - RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED - application failed to call |
3304 | * poll within `max.poll.interval.ms`. |
3305 | * |
3306 | * @sa rd_kafka_message_t |
3307 | */ |
3308 | RD_EXPORT |
3309 | rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk, int timeout_ms); |
3310 | |
3311 | /** |
3312 | * @brief Close down the KafkaConsumer. |
3313 | * |
3314 | * @remark This call will block until the consumer has revoked its assignment, |
3315 | * calling the \c rebalance_cb if it is configured, committed offsets |
3316 | * to broker, and left the consumer group. |
3317 | * The maximum blocking time is roughly limited to session.timeout.ms. |
3318 | * |
3319 | * @returns An error code indicating if the consumer close was succesful |
3320 | * or not. |
3321 | * |
3322 | * @remark The application still needs to call rd_kafka_destroy() after |
3323 | * this call finishes to clean up the underlying handle resources. |
3324 | * |
3325 | */ |
3326 | RD_EXPORT |
3327 | rd_kafka_resp_err_t rd_kafka_consumer_close (rd_kafka_t *rk); |
3328 | |
3329 | |
3330 | |
3331 | /** |
3332 | * @brief Atomic assignment of partitions to consume. |
3333 | * |
3334 | * The new \p partitions will replace the existing assignment. |
3335 | * |
3336 | * When used from a rebalance callback the application shall pass the |
3337 | * partition list passed to the callback (or a copy of it) (even if the list |
3338 | * is empty) rather than NULL to maintain internal join state. |
3339 | |
3340 | * A zero-length \p partitions will treat the partitions as a valid, |
3341 | * albeit empty, assignment, and maintain internal state, while a \c NULL |
3342 | * value for \p partitions will reset and clear the internal state. |
3343 | */ |
3344 | RD_EXPORT rd_kafka_resp_err_t |
3345 | rd_kafka_assign (rd_kafka_t *rk, |
3346 | const rd_kafka_topic_partition_list_t *partitions); |
3347 | |
3348 | /** |
3349 | * @brief Returns the current partition assignment |
3350 | * |
3351 | * @returns An error code on failure, otherwise \p partitions is updated |
3352 | * to point to a newly allocated partition list (possibly empty). |
3353 | * |
3354 | * @remark The application is responsible for calling |
3355 | * rd_kafka_topic_partition_list_destroy on the returned list. |
3356 | */ |
3357 | RD_EXPORT rd_kafka_resp_err_t |
3358 | rd_kafka_assignment (rd_kafka_t *rk, |
3359 | rd_kafka_topic_partition_list_t **partitions); |
3360 | |
3361 | |
3362 | |
3363 | |
3364 | /** |
3365 | * @brief Commit offsets on broker for the provided list of partitions. |
3366 | * |
3367 | * \p offsets should contain \c topic, \c partition, \c offset and possibly |
3368 | * \c metadata. |
3369 | * If \p offsets is NULL the current partition assignment will be used instead. |
3370 | * |
3371 | * If \p async is false this operation will block until the broker offset commit |
3372 | * is done, returning the resulting success or error code. |
3373 | * |
3374 | * If a rd_kafka_conf_set_offset_commit_cb() offset commit callback has been |
3375 | * configured the callback will be enqueued for a future call to |
3376 | * rd_kafka_poll(), rd_kafka_consumer_poll() or similar. |
3377 | */ |
3378 | RD_EXPORT rd_kafka_resp_err_t |
3379 | rd_kafka_commit (rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *offsets, |
3380 | int async); |
3381 | |
3382 | |
3383 | /** |
3384 | * @brief Commit message's offset on broker for the message's partition. |
3385 | * |
3386 | * @sa rd_kafka_commit |
3387 | */ |
3388 | RD_EXPORT rd_kafka_resp_err_t |
3389 | rd_kafka_commit_message (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, |
3390 | int async); |
3391 | |
3392 | |
3393 | /** |
3394 | * @brief Commit offsets on broker for the provided list of partitions. |
3395 | * |
3396 | * See rd_kafka_commit for \p offsets semantics. |
3397 | * |
3398 | * The result of the offset commit will be posted on the provided \p rkqu queue. |
3399 | * |
3400 | * If the application uses one of the poll APIs (rd_kafka_poll(), |
3401 | * rd_kafka_consumer_poll(), rd_kafka_queue_poll(), ..) to serve the queue |
3402 | * the \p cb callback is required. \p opaque is passed to the callback. |
3403 | * |
3404 | * If using the event API the callback is ignored and the offset commit result |
3405 | * will be returned as an RD_KAFKA_EVENT_COMMIT event. The \p opaque |
3406 | * value will be available with rd_kafka_event_opaque() |
3407 | * |
3408 | * If \p rkqu is NULL a temporary queue will be created and the callback will |
3409 | * be served by this call. |
3410 | * |
3411 | * @sa rd_kafka_commit() |
3412 | * @sa rd_kafka_conf_set_offset_commit_cb() |
3413 | */ |
3414 | RD_EXPORT rd_kafka_resp_err_t |
3415 | rd_kafka_commit_queue (rd_kafka_t *rk, |
3416 | const rd_kafka_topic_partition_list_t *offsets, |
3417 | rd_kafka_queue_t *rkqu, |
3418 | void (*cb) (rd_kafka_t *rk, |
3419 | rd_kafka_resp_err_t err, |
3420 | rd_kafka_topic_partition_list_t *offsets, |
3421 | void *opaque), |
3422 | void *opaque); |
3423 | |
3424 | |
3425 | /** |
3426 | * @brief Retrieve committed offsets for topics+partitions. |
3427 | * |
3428 | * The \p offset field of each requested partition will either be set to |
3429 | * stored offset or to RD_KAFKA_OFFSET_INVALID in case there was no stored |
3430 | * offset for that partition. |
3431 | * |
3432 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success in which case the |
3433 | * \p offset or \p err field of each \p partitions' element is filled |
3434 | * in with the stored offset, or a partition specific error. |
3435 | * Else returns an error code. |
3436 | */ |
3437 | RD_EXPORT rd_kafka_resp_err_t |
3438 | rd_kafka_committed (rd_kafka_t *rk, |
3439 | rd_kafka_topic_partition_list_t *partitions, |
3440 | int timeout_ms); |
3441 | |
3442 | |
3443 | |
3444 | /** |
3445 | * @brief Retrieve current positions (offsets) for topics+partitions. |
3446 | * |
3447 | * The \p offset field of each requested partition will be set to the offset |
3448 | * of the last consumed message + 1, or RD_KAFKA_OFFSET_INVALID in case there was |
3449 | * no previous message. |
3450 | * |
3451 | * @remark In this context the last consumed message is the offset consumed |
3452 | * by the current librdkafka instance and, in case of rebalancing, not |
3453 | * necessarily the last message fetched from the partition. |
3454 | * |
3455 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success in which case the |
3456 | * \p offset or \p err field of each \p partitions' element is filled |
3457 | * in with the stored offset, or a partition specific error. |
3458 | * Else returns an error code. |
3459 | */ |
3460 | RD_EXPORT rd_kafka_resp_err_t |
3461 | rd_kafka_position (rd_kafka_t *rk, |
3462 | rd_kafka_topic_partition_list_t *partitions); |
3463 | |
3464 | |
3465 | /**@}*/ |
3466 | |
3467 | |
3468 | |
3469 | /** |
3470 | * @name Producer API |
3471 | * @{ |
3472 | * |
3473 | * |
3474 | */ |
3475 | |
3476 | |
3477 | /** |
3478 | * @brief Producer message flags |
3479 | */ |
3480 | #define RD_KAFKA_MSG_F_FREE 0x1 /**< Delegate freeing of payload to rdkafka. */ |
3481 | #define RD_KAFKA_MSG_F_COPY 0x2 /**< rdkafka will make a copy of the payload. */ |
3482 | #define RD_KAFKA_MSG_F_BLOCK 0x4 /**< Block produce*() on message queue full. |
3483 | * WARNING: If a delivery report callback |
3484 | * is used the application MUST |
3485 | * call rd_kafka_poll() (or equiv.) |
3486 | * to make sure delivered messages |
3487 | * are drained from the internal |
3488 | * delivery report queue. |
3489 | * Failure to do so will result |
3490 | * in indefinately blocking on |
3491 | * the produce() call when the |
3492 | * message queue is full. */ |
3493 | #define RD_KAFKA_MSG_F_PARTITION 0x8 /**< produce_batch() will honor |
3494 | * per-message partition. */ |
3495 | |
3496 | |
3497 | |
3498 | /** |
3499 | * @brief Produce and send a single message to broker. |
3500 | * |
3501 | * \p rkt is the target topic which must have been previously created with |
3502 | * `rd_kafka_topic_new()`. |
3503 | * |
3504 | * `rd_kafka_produce()` is an asynch non-blocking API. |
3505 | * See `rd_kafka_conf_set_dr_msg_cb` on how to setup a callback to be called |
3506 | * once the delivery status (success or failure) is known. The delivery report |
3507 | * is trigged by the application calling `rd_kafka_poll()` (at regular |
3508 | * intervals) or `rd_kafka_flush()` (at termination). |
3509 | * |
3510 | * Since producing is asynchronous, you should call `rd_kafka_flush()` before |
3511 | * you destroy the producer. Otherwise, any outstanding messages will be |
3512 | * silently discarded. |
3513 | * |
3514 | * When temporary errors occur, librdkafka automatically retries to produce the |
3515 | * messages. Retries are triggered after retry.backoff.ms and when the |
3516 | * leader broker for the given partition is available. Otherwise, librdkafka |
3517 | * falls back to polling the topic metadata to monitor when a new leader is |
3518 | * elected (see the topic.metadata.refresh.fast.interval.ms and |
3519 | * topic.metadata.refresh.interval.ms configurations) and then performs a |
3520 | * retry. A delivery error will occur if the message could not be produced |
3521 | * within message.timeout.ms. |
3522 | * |
3523 | * See the "Message reliability" chapter in INTRODUCTION.md for more |
3524 | * information. |
3525 | * |
3526 | * \p partition is the target partition, either: |
3527 | * - RD_KAFKA_PARTITION_UA (unassigned) for |
3528 | * automatic partitioning using the topic's partitioner function, or |
3529 | * - a fixed partition (0..N) |
3530 | * |
3531 | * \p msgflags is zero or more of the following flags OR:ed together: |
3532 | * RD_KAFKA_MSG_F_BLOCK - block \p produce*() call if |
3533 | * \p queue.buffering.max.messages or |
3534 | * \p queue.buffering.max.kbytes are exceeded. |
3535 | * Messages are considered in-queue from the point they |
3536 | * are accepted by produce() until their corresponding |
3537 | * delivery report callback/event returns. |
3538 | * It is thus a requirement to call |
3539 | * rd_kafka_poll() (or equiv.) from a separate |
3540 | * thread when F_BLOCK is used. |
3541 | * See WARNING on \c RD_KAFKA_MSG_F_BLOCK above. |
3542 | * |
3543 | * RD_KAFKA_MSG_F_FREE - rdkafka will free(3) \p payload when it is done |
3544 | * with it. |
3545 | * RD_KAFKA_MSG_F_COPY - the \p payload data will be copied and the |
3546 | * \p payload pointer will not be used by rdkafka |
3547 | * after the call returns. |
3548 | * RD_KAFKA_MSG_F_PARTITION - produce_batch() will honour per-message |
3549 | * partition, either set manually or by the |
3550 | * configured partitioner. |
3551 | * |
3552 | * .._F_FREE and .._F_COPY are mutually exclusive. |
3553 | * |
3554 | * If the function returns -1 and RD_KAFKA_MSG_F_FREE was specified, then |
3555 | * the memory associated with the payload is still the caller's |
3556 | * responsibility. |
3557 | * |
3558 | * \p payload is the message payload of size \p len bytes. |
3559 | * |
3560 | * \p key is an optional message key of size \p keylen bytes, if non-NULL it |
3561 | * will be passed to the topic partitioner as well as be sent with the |
3562 | * message to the broker and passed on to the consumer. |
3563 | * |
3564 | * \p msg_opaque is an optional application-provided per-message opaque |
3565 | * pointer that will provided in the delivery report callback (`dr_cb`) for |
3566 | * referencing this message. |
3567 | * |
3568 | * @remark on_send() and on_acknowledgement() interceptors may be called |
3569 | * from this function. on_acknowledgement() will only be called if the |
3570 | * message fails partitioning. |
3571 | * |
3572 | * @returns 0 on success or -1 on error in which case errno is set accordingly: |
3573 | * - ENOBUFS - maximum number of outstanding messages has been reached: |
3574 | * "queue.buffering.max.messages" |
3575 | * (RD_KAFKA_RESP_ERR__QUEUE_FULL) |
3576 | * - EMSGSIZE - message is larger than configured max size: |
3577 | * "messages.max.bytes". |
3578 | * (RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE) |
3579 | * - ESRCH - requested \p partition is unknown in the Kafka cluster. |
3580 | * (RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION) |
3581 | * - ENOENT - topic is unknown in the Kafka cluster. |
3582 | * (RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC) |
3583 | * - ECANCELED - fatal error has been raised on producer, see |
3584 | * rd_kafka_fatal_error(). |
3585 | * |
3586 | * @sa Use rd_kafka_errno2err() to convert `errno` to rdkafka error code. |
3587 | */ |
3588 | RD_EXPORT |
3589 | int rd_kafka_produce(rd_kafka_topic_t *rkt, int32_t partition, |
3590 | int msgflags, |
3591 | void *payload, size_t len, |
3592 | const void *key, size_t keylen, |
3593 | void *msg_opaque); |
3594 | |
3595 | |
3596 | /** |
3597 | * @brief Produce and send a single message to broker. |
3598 | * |
3599 | * The message is defined by a va-arg list using \c rd_kafka_vtype_t |
3600 | * tag tuples which must be terminated with a single \c RD_KAFKA_V_END. |
3601 | * |
3602 | * @returns \c RD_KAFKA_RESP_ERR_NO_ERROR on success, else an error code. |
3603 | * \c RD_KAFKA_RESP_ERR__CONFLICT is returned if _V_HEADER and |
3604 | * _V_HEADERS are mixed. |
3605 | * |
3606 | * @sa rd_kafka_produce, RD_KAFKA_V_END |
3607 | */ |
3608 | RD_EXPORT |
3609 | rd_kafka_resp_err_t rd_kafka_producev (rd_kafka_t *rk, ...); |
3610 | |
3611 | |
3612 | /** |
3613 | * @brief Produce multiple messages. |
3614 | * |
3615 | * If partition is RD_KAFKA_PARTITION_UA the configured partitioner will |
3616 | * be run for each message (slower), otherwise the messages will be enqueued |
3617 | * to the specified partition directly (faster). |
3618 | * |
3619 | * The messages are provided in the array \p rkmessages of count \p message_cnt |
3620 | * elements. |
3621 | * The \p partition and \p msgflags are used for all provided messages. |
3622 | * |
3623 | * Honoured \p rkmessages[] fields are: |
3624 | * - payload,len Message payload and length |
3625 | * - key,key_len Optional message key |
3626 | * - _private Message opaque pointer (msg_opaque) |
3627 | * - err Will be set according to success or failure. |
3628 | * Application only needs to check for errors if |
3629 | * return value != \p message_cnt. |
3630 | * |
3631 | * @remark If \c RD_KAFKA_MSG_F_PARTITION is set in \p msgflags, the |
3632 | * \c .partition field of the \p rkmessages is used instead of |
3633 | * \p partition. |
3634 | * |
3635 | * @returns the number of messages succesfully enqueued for producing. |
3636 | * |
3637 | * @remark This interface does NOT support setting message headers on |
3638 | * the provided \p rkmessages. |
3639 | */ |
3640 | RD_EXPORT |
3641 | int rd_kafka_produce_batch(rd_kafka_topic_t *rkt, int32_t partition, |
3642 | int msgflags, |
3643 | rd_kafka_message_t *rkmessages, int message_cnt); |
3644 | |
3645 | |
3646 | |
3647 | |
3648 | /** |
3649 | * @brief Wait until all outstanding produce requests, et.al, are completed. |
3650 | * This should typically be done prior to destroying a producer instance |
3651 | * to make sure all queued and in-flight produce requests are completed |
3652 | * before terminating. |
3653 | * |
3654 | * @remark This function will call rd_kafka_poll() and thus trigger callbacks. |
3655 | * |
3656 | * @returns RD_KAFKA_RESP_ERR__TIMED_OUT if \p timeout_ms was reached before all |
3657 | * outstanding requests were completed, else RD_KAFKA_RESP_ERR_NO_ERROR |
3658 | * |
3659 | * @sa rd_kafka_outq_len() |
3660 | */ |
3661 | RD_EXPORT |
3662 | rd_kafka_resp_err_t rd_kafka_flush (rd_kafka_t *rk, int timeout_ms); |
3663 | |
3664 | |
3665 | |
3666 | /** |
3667 | * @brief Purge messages currently handled by the producer instance. |
3668 | * |
3669 | * @param purge_flags tells which messages should be purged and how. |
3670 | * |
3671 | * The application will need to call rd_kafka_poll() or rd_kafka_flush() |
3672 | * afterwards to serve the delivery report callbacks of the purged messages. |
3673 | * |
3674 | * Messages purged from internal queues fail with the delivery report |
3675 | * error code set to RD_KAFKA_RESP_ERR__PURGE_QUEUE, while purged messages that |
3676 | * are in-flight to or from the broker will fail with the error code set to |
3677 | * RD_KAFKA_RESP_ERR__PURGE_INFLIGHT. |
3678 | * |
3679 | * @warning Purging messages that are in-flight to or from the broker |
3680 | * will ignore any sub-sequent acknowledgement for these messages |
3681 | * received from the broker, effectively making it impossible |
3682 | * for the application to know if the messages were successfully |
3683 | * produced or not. This may result in duplicate messages if the |
3684 | * application retries these messages at a later time. |
3685 | * |
3686 | * @remark This call may block for a short time while background thread |
3687 | * queues are purged. |
3688 | * |
3689 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success, |
3690 | * RD_KAFKA_RESP_ERR__INVALID_ARG if the \p purge flags are invalid |
3691 | * or unknown, |
3692 | * RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED if called on a non-producer |
3693 | * client instance. |
3694 | */ |
3695 | RD_EXPORT |
3696 | rd_kafka_resp_err_t rd_kafka_purge (rd_kafka_t *rk, int purge_flags); |
3697 | |
3698 | |
3699 | /** |
3700 | * @brief Flags for rd_kafka_purge() |
3701 | */ |
3702 | |
3703 | /*! |
3704 | * Purge messages in internal queues. |
3705 | */ |
3706 | #define RD_KAFKA_PURGE_F_QUEUE 0x1 |
3707 | |
3708 | /*! |
3709 | * Purge messages in-flight to or from the broker. |
3710 | * Purging these messages will void any future acknowledgements from the |
3711 | * broker, making it impossible for the application to know if these |
3712 | * messages were successfully delivered or not. |
3713 | * Retrying these messages may lead to duplicates. |
3714 | */ |
3715 | #define RD_KAFKA_PURGE_F_INFLIGHT 0x2 |
3716 | |
3717 | |
3718 | /*! |
3719 | * Don't wait for background thread queue purging to finish. |
3720 | */ |
3721 | #define RD_KAFKA_PURGE_F_NON_BLOCKING 0x4 |
3722 | |
3723 | |
3724 | /**@}*/ |
3725 | |
3726 | |
3727 | /** |
3728 | * @name Metadata API |
3729 | * @{ |
3730 | * |
3731 | * |
3732 | */ |
3733 | |
3734 | |
3735 | /** |
3736 | * @brief Broker information |
3737 | */ |
3738 | typedef struct rd_kafka_metadata_broker { |
3739 | int32_t id; /**< Broker Id */ |
3740 | char *host; /**< Broker hostname */ |
3741 | int port; /**< Broker listening port */ |
3742 | } rd_kafka_metadata_broker_t; |
3743 | |
3744 | /** |
3745 | * @brief Partition information |
3746 | */ |
3747 | typedef struct rd_kafka_metadata_partition { |
3748 | int32_t id; /**< Partition Id */ |
3749 | rd_kafka_resp_err_t err; /**< Partition error reported by broker */ |
3750 | int32_t leader; /**< Leader broker */ |
3751 | int replica_cnt; /**< Number of brokers in \p replicas */ |
3752 | int32_t *replicas; /**< Replica brokers */ |
3753 | int isr_cnt; /**< Number of ISR brokers in \p isrs */ |
3754 | int32_t *isrs; /**< In-Sync-Replica brokers */ |
3755 | } rd_kafka_metadata_partition_t; |
3756 | |
3757 | /** |
3758 | * @brief Topic information |
3759 | */ |
3760 | typedef struct rd_kafka_metadata_topic { |
3761 | char *topic; /**< Topic name */ |
3762 | int partition_cnt; /**< Number of partitions in \p partitions*/ |
3763 | struct rd_kafka_metadata_partition *partitions; /**< Partitions */ |
3764 | rd_kafka_resp_err_t err; /**< Topic error reported by broker */ |
3765 | } rd_kafka_metadata_topic_t; |
3766 | |
3767 | |
3768 | /** |
3769 | * @brief Metadata container |
3770 | */ |
3771 | typedef struct rd_kafka_metadata { |
3772 | int broker_cnt; /**< Number of brokers in \p brokers */ |
3773 | struct rd_kafka_metadata_broker *brokers; /**< Brokers */ |
3774 | |
3775 | int topic_cnt; /**< Number of topics in \p topics */ |
3776 | struct rd_kafka_metadata_topic *topics; /**< Topics */ |
3777 | |
3778 | int32_t orig_broker_id; /**< Broker originating this metadata */ |
3779 | char *orig_broker_name; /**< Name of originating broker */ |
3780 | } rd_kafka_metadata_t; |
3781 | |
3782 | |
3783 | /** |
3784 | * @brief Request Metadata from broker. |
3785 | * |
3786 | * Parameters: |
3787 | * - \p all_topics if non-zero: request info about all topics in cluster, |
3788 | * if zero: only request info about locally known topics. |
3789 | * - \p only_rkt only request info about this topic |
3790 | * - \p metadatap pointer to hold metadata result. |
3791 | * The \p *metadatap pointer must be released |
3792 | * with rd_kafka_metadata_destroy(). |
3793 | * - \p timeout_ms maximum response time before failing. |
3794 | * |
3795 | * Returns RD_KAFKA_RESP_ERR_NO_ERROR on success (in which case *metadatap) |
3796 | * will be set, else RD_KAFKA_RESP_ERR__TIMED_OUT on timeout or |
3797 | * other error code on error. |
3798 | */ |
3799 | RD_EXPORT |
3800 | rd_kafka_resp_err_t |
3801 | rd_kafka_metadata (rd_kafka_t *rk, int all_topics, |
3802 | rd_kafka_topic_t *only_rkt, |
3803 | const struct rd_kafka_metadata **metadatap, |
3804 | int timeout_ms); |
3805 | |
3806 | /** |
3807 | * @brief Release metadata memory. |
3808 | */ |
3809 | RD_EXPORT |
3810 | void rd_kafka_metadata_destroy(const struct rd_kafka_metadata *metadata); |
3811 | |
3812 | |
3813 | /**@}*/ |
3814 | |
3815 | |
3816 | |
3817 | /** |
3818 | * @name Client group information |
3819 | * @{ |
3820 | * |
3821 | * |
3822 | */ |
3823 | |
3824 | |
3825 | /** |
3826 | * @brief Group member information |
3827 | * |
3828 | * For more information on \p member_metadata format, see |
3829 | * https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-GroupMembershipAPI |
3830 | * |
3831 | */ |
3832 | struct rd_kafka_group_member_info { |
3833 | char *member_id; /**< Member id (generated by broker) */ |
3834 | char *client_id; /**< Client's \p client.id */ |
3835 | char *client_host; /**< Client's hostname */ |
3836 | void *member_metadata; /**< Member metadata (binary), |
3837 | * format depends on \p protocol_type. */ |
3838 | int member_metadata_size; /**< Member metadata size in bytes */ |
3839 | void *member_assignment; /**< Member assignment (binary), |
3840 | * format depends on \p protocol_type. */ |
3841 | int member_assignment_size; /**< Member assignment size in bytes */ |
3842 | }; |
3843 | |
3844 | /** |
3845 | * @brief Group information |
3846 | */ |
3847 | struct rd_kafka_group_info { |
3848 | struct rd_kafka_metadata_broker broker; /**< Originating broker info */ |
3849 | char *group; /**< Group name */ |
3850 | rd_kafka_resp_err_t err; /**< Broker-originated error */ |
3851 | char *state; /**< Group state */ |
3852 | char *protocol_type; /**< Group protocol type */ |
3853 | char *protocol; /**< Group protocol */ |
3854 | struct rd_kafka_group_member_info *members; /**< Group members */ |
3855 | int member_cnt; /**< Group member count */ |
3856 | }; |
3857 | |
3858 | /** |
3859 | * @brief List of groups |
3860 | * |
3861 | * @sa rd_kafka_group_list_destroy() to release list memory. |
3862 | */ |
3863 | struct rd_kafka_group_list { |
3864 | struct rd_kafka_group_info *groups; /**< Groups */ |
3865 | int group_cnt; /**< Group count */ |
3866 | }; |
3867 | |
3868 | |
3869 | /** |
3870 | * @brief List and describe client groups in cluster. |
3871 | * |
3872 | * \p group is an optional group name to describe, otherwise (\p NULL) all |
3873 | * groups are returned. |
3874 | * |
3875 | * \p timeout_ms is the (approximate) maximum time to wait for response |
3876 | * from brokers and must be a positive value. |
3877 | * |
3878 | * @returns \c RD_KAFKA_RESP_ERR__NO_ERROR on success and \p grplistp is |
3879 | * updated to point to a newly allocated list of groups. |
3880 | * \c RD_KAFKA_RESP_ERR__PARTIAL if not all brokers responded |
3881 | * in time but at least one group is returned in \p grplistlp. |
3882 | * \c RD_KAFKA_RESP_ERR__TIMED_OUT if no groups were returned in the |
3883 | * given timeframe but not all brokers have yet responded, or |
3884 | * if the list of brokers in the cluster could not be obtained within |
3885 | * the given timeframe. |
3886 | * \c RD_KAFKA_RESP_ERR__TRANSPORT if no brokers were found. |
3887 | * Other error codes may also be returned from the request layer. |
3888 | * |
3889 | * The \p grplistp remains untouched if any error code is returned, |
3890 | * with the exception of RD_KAFKA_RESP_ERR__PARTIAL which behaves |
3891 | * as RD_KAFKA_RESP_ERR__NO_ERROR (success) but with an incomplete |
3892 | * group list. |
3893 | * |
3894 | * @sa Use rd_kafka_group_list_destroy() to release list memory. |
3895 | */ |
3896 | RD_EXPORT |
3897 | rd_kafka_resp_err_t |
3898 | rd_kafka_list_groups (rd_kafka_t *rk, const char *group, |
3899 | const struct rd_kafka_group_list **grplistp, |
3900 | int timeout_ms); |
3901 | |
3902 | /** |
3903 | * @brief Release list memory |
3904 | */ |
3905 | RD_EXPORT |
3906 | void rd_kafka_group_list_destroy (const struct rd_kafka_group_list *grplist); |
3907 | |
3908 | |
3909 | /**@}*/ |
3910 | |
3911 | |
3912 | |
3913 | /** |
3914 | * @name Miscellaneous APIs |
3915 | * @{ |
3916 | * |
3917 | */ |
3918 | |
3919 | |
3920 | /** |
3921 | * @brief Adds one or more brokers to the kafka handle's list of initial |
3922 | * bootstrap brokers. |
3923 | * |
3924 | * Additional brokers will be discovered automatically as soon as rdkafka |
3925 | * connects to a broker by querying the broker metadata. |
3926 | * |
3927 | * If a broker name resolves to multiple addresses (and possibly |
3928 | * address families) all will be used for connection attempts in |
3929 | * round-robin fashion. |
3930 | * |
3931 | * \p brokerlist is a ,-separated list of brokers in the format: |
3932 | * \c \<broker1\>,\<broker2\>,.. |
3933 | * Where each broker is in either the host or URL based format: |
3934 | * \c \<host\>[:\<port\>] |
3935 | * \c \<proto\>://\<host\>[:port] |
3936 | * \c \<proto\> is either \c PLAINTEXT, \c SSL, \c SASL, \c SASL_PLAINTEXT |
3937 | * The two formats can be mixed but ultimately the value of the |
3938 | * `security.protocol` config property decides what brokers are allowed. |
3939 | * |
3940 | * Example: |
3941 | * brokerlist = "broker1:10000,broker2" |
3942 | * brokerlist = "SSL://broker3:9000,ssl://broker2" |
3943 | * |
3944 | * @returns the number of brokers successfully added. |
3945 | * |
3946 | * @remark Brokers may also be defined with the \c metadata.broker.list or |
3947 | * \c bootstrap.servers configuration property (preferred method). |
3948 | */ |
3949 | RD_EXPORT |
3950 | int rd_kafka_brokers_add(rd_kafka_t *rk, const char *brokerlist); |
3951 | |
3952 | |
3953 | |
3954 | |
3955 | /** |
3956 | * @brief Set logger function. |
3957 | * |
3958 | * The default is to print to stderr, but a syslog logger is also available, |
3959 | * see rd_kafka_log_(print|syslog) for the builtin alternatives. |
3960 | * Alternatively the application may provide its own logger callback. |
3961 | * Or pass 'func' as NULL to disable logging. |
3962 | * |
3963 | * @deprecated Use rd_kafka_conf_set_log_cb() |
3964 | * |
3965 | * @remark \p rk may be passed as NULL in the callback. |
3966 | */ |
3967 | RD_EXPORT RD_DEPRECATED |
3968 | void rd_kafka_set_logger(rd_kafka_t *rk, |
3969 | void (*func) (const rd_kafka_t *rk, int level, |
3970 | const char *fac, const char *buf)); |
3971 | |
3972 | |
3973 | /** |
3974 | * @brief Specifies the maximum logging level emitted by |
3975 | * internal kafka logging and debugging. |
3976 | * |
3977 | * @deprecated Set the \c "log_level" configuration property instead. |
3978 | * |
3979 | * @remark If the \p \"debug\" configuration property is set the log level is |
3980 | * automatically adjusted to \c LOG_DEBUG (7). |
3981 | */ |
3982 | RD_EXPORT |
3983 | void rd_kafka_set_log_level(rd_kafka_t *rk, int level); |
3984 | |
3985 | |
3986 | /** |
3987 | * @brief Builtin (default) log sink: print to stderr |
3988 | */ |
3989 | RD_EXPORT |
3990 | void rd_kafka_log_print(const rd_kafka_t *rk, int level, |
3991 | const char *fac, const char *buf); |
3992 | |
3993 | |
3994 | /** |
3995 | * @brief Builtin log sink: print to syslog. |
3996 | */ |
3997 | RD_EXPORT |
3998 | void rd_kafka_log_syslog(const rd_kafka_t *rk, int level, |
3999 | const char *fac, const char *buf); |
4000 | |
4001 | |
4002 | /** |
4003 | * @brief Returns the current out queue length. |
4004 | * |
4005 | * The out queue length is the sum of: |
4006 | * - number of messages waiting to be sent to, or acknowledged by, |
4007 | * the broker. |
4008 | * - number of delivery reports (e.g., dr_msg_cb) waiting to be served |
4009 | * by rd_kafka_poll() or rd_kafka_flush(). |
4010 | * - number of callbacks (e.g., error_cb, stats_cb, etc) waiting to be |
4011 | * served by rd_kafka_poll(), rd_kafka_consumer_poll() or rd_kafka_flush(). |
4012 | * - number of events waiting to be served by background_event_cb() in |
4013 | * the background queue (see rd_kafka_conf_set_background_event_cb). |
4014 | * |
4015 | * An application should wait for the return value of this function to reach |
4016 | * zero before terminating to make sure outstanding messages, |
4017 | * requests (such as offset commits), callbacks and events are fully processed. |
4018 | * See rd_kafka_flush(). |
4019 | * |
4020 | * @returns number of messages and events waiting in queues. |
4021 | * |
4022 | * @sa rd_kafka_flush() |
4023 | */ |
4024 | RD_EXPORT |
4025 | int rd_kafka_outq_len(rd_kafka_t *rk); |
4026 | |
4027 | |
4028 | |
4029 | /** |
4030 | * @brief Dumps rdkafka's internal state for handle \p rk to stream \p fp |
4031 | * |
4032 | * This is only useful for debugging rdkafka, showing state and statistics |
4033 | * for brokers, topics, partitions, etc. |
4034 | */ |
4035 | RD_EXPORT |
4036 | void rd_kafka_dump(FILE *fp, rd_kafka_t *rk); |
4037 | |
4038 | |
4039 | |
4040 | /** |
4041 | * @brief Retrieve the current number of threads in use by librdkafka. |
4042 | * |
4043 | * Used by regression tests. |
4044 | */ |
4045 | RD_EXPORT |
4046 | int rd_kafka_thread_cnt(void); |
4047 | |
4048 | |
4049 | /** |
4050 | * @brief Wait for all rd_kafka_t objects to be destroyed. |
4051 | * |
4052 | * Returns 0 if all kafka objects are now destroyed, or -1 if the |
4053 | * timeout was reached. |
4054 | * |
4055 | * @remark This function is deprecated. |
4056 | */ |
4057 | RD_EXPORT |
4058 | int rd_kafka_wait_destroyed(int timeout_ms); |
4059 | |
4060 | |
4061 | /** |
4062 | * @brief Run librdkafka's built-in unit-tests. |
4063 | * |
4064 | * @returns the number of failures, or 0 if all tests passed. |
4065 | */ |
4066 | RD_EXPORT |
4067 | int rd_kafka_unittest (void); |
4068 | |
4069 | |
4070 | /**@}*/ |
4071 | |
4072 | |
4073 | |
4074 | |
4075 | /** |
4076 | * @name Experimental APIs |
4077 | * @{ |
4078 | */ |
4079 | |
4080 | /** |
4081 | * @brief Redirect the main (rd_kafka_poll()) queue to the KafkaConsumer's |
4082 | * queue (rd_kafka_consumer_poll()). |
4083 | * |
4084 | * @warning It is not permitted to call rd_kafka_poll() after directing the |
4085 | * main queue with rd_kafka_poll_set_consumer(). |
4086 | */ |
4087 | RD_EXPORT |
4088 | rd_kafka_resp_err_t rd_kafka_poll_set_consumer (rd_kafka_t *rk); |
4089 | |
4090 | |
4091 | /**@}*/ |
4092 | |
4093 | /** |
4094 | * @name Event interface |
4095 | * |
4096 | * @brief The event API provides an alternative pollable non-callback interface |
4097 | * to librdkafka's message and event queues. |
4098 | * |
4099 | * @{ |
4100 | */ |
4101 | |
4102 | |
4103 | /** |
4104 | * @brief Event types |
4105 | */ |
4106 | typedef int rd_kafka_event_type_t; |
4107 | #define RD_KAFKA_EVENT_NONE 0x0 |
4108 | #define RD_KAFKA_EVENT_DR 0x1 /**< Producer Delivery report batch */ |
4109 | #define RD_KAFKA_EVENT_FETCH 0x2 /**< Fetched message (consumer) */ |
4110 | #define RD_KAFKA_EVENT_LOG 0x4 /**< Log message */ |
4111 | #define RD_KAFKA_EVENT_ERROR 0x8 /**< Error */ |
4112 | #define RD_KAFKA_EVENT_REBALANCE 0x10 /**< Group rebalance (consumer) */ |
4113 | #define RD_KAFKA_EVENT_OFFSET_COMMIT 0x20 /**< Offset commit result */ |
4114 | #define RD_KAFKA_EVENT_STATS 0x40 /**< Stats */ |
4115 | #define RD_KAFKA_EVENT_CREATETOPICS_RESULT 100 /**< CreateTopics_result_t */ |
4116 | #define RD_KAFKA_EVENT_DELETETOPICS_RESULT 101 /**< DeleteTopics_result_t */ |
4117 | #define RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT 102 /**< CreatePartitions_result_t */ |
4118 | #define RD_KAFKA_EVENT_ALTERCONFIGS_RESULT 103 /**< AlterConfigs_result_t */ |
4119 | #define RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT 104 /**< DescribeConfigs_result_t */ |
4120 | #define RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH 0x100 /**< SASL/OAUTHBEARER |
4121 | token needs to be |
4122 | refreshed */ |
4123 | |
4124 | |
4125 | /** |
4126 | * @returns the event type for the given event. |
4127 | * |
4128 | * @remark As a convenience it is okay to pass \p rkev as NULL in which case |
4129 | * RD_KAFKA_EVENT_NONE is returned. |
4130 | */ |
4131 | RD_EXPORT |
4132 | rd_kafka_event_type_t rd_kafka_event_type (const rd_kafka_event_t *rkev); |
4133 | |
4134 | /** |
4135 | * @returns the event type's name for the given event. |
4136 | * |
4137 | * @remark As a convenience it is okay to pass \p rkev as NULL in which case |
4138 | * the name for RD_KAFKA_EVENT_NONE is returned. |
4139 | */ |
4140 | RD_EXPORT |
4141 | const char *rd_kafka_event_name (const rd_kafka_event_t *rkev); |
4142 | |
4143 | |
4144 | /** |
4145 | * @brief Destroy an event. |
4146 | * |
4147 | * @remark Any references to this event, such as extracted messages, |
4148 | * will not be usable after this call. |
4149 | * |
4150 | * @remark As a convenience it is okay to pass \p rkev as NULL in which case |
4151 | * no action is performed. |
4152 | */ |
4153 | RD_EXPORT |
4154 | void rd_kafka_event_destroy (rd_kafka_event_t *rkev); |
4155 | |
4156 | |
4157 | /** |
4158 | * @returns the next message from an event. |
4159 | * |
4160 | * Call repeatedly until it returns NULL. |
4161 | * |
4162 | * Event types: |
4163 | * - RD_KAFKA_EVENT_FETCH (1 message) |
4164 | * - RD_KAFKA_EVENT_DR (>=1 message(s)) |
4165 | * |
4166 | * @remark The returned message(s) MUST NOT be |
4167 | * freed with rd_kafka_message_destroy(). |
4168 | * |
4169 | * @remark on_consume() interceptor may be called |
4170 | * from this function prior to passing message to application. |
4171 | */ |
4172 | RD_EXPORT |
4173 | const rd_kafka_message_t *rd_kafka_event_message_next (rd_kafka_event_t *rkev); |
4174 | |
4175 | |
4176 | /** |
4177 | * @brief Extacts \p size message(s) from the event into the |
4178 | * pre-allocated array \p rkmessages. |
4179 | * |
4180 | * Event types: |
4181 | * - RD_KAFKA_EVENT_FETCH (1 message) |
4182 | * - RD_KAFKA_EVENT_DR (>=1 message(s)) |
4183 | * |
4184 | * @returns the number of messages extracted. |
4185 | * |
4186 | * @remark on_consume() interceptor may be called |
4187 | * from this function prior to passing message to application. |
4188 | */ |
4189 | RD_EXPORT |
4190 | size_t rd_kafka_event_message_array (rd_kafka_event_t *rkev, |
4191 | const rd_kafka_message_t **rkmessages, |
4192 | size_t size); |
4193 | |
4194 | |
4195 | /** |
4196 | * @returns the number of remaining messages in the event. |
4197 | * |
4198 | * Event types: |
4199 | * - RD_KAFKA_EVENT_FETCH (1 message) |
4200 | * - RD_KAFKA_EVENT_DR (>=1 message(s)) |
4201 | */ |
4202 | RD_EXPORT |
4203 | size_t rd_kafka_event_message_count (rd_kafka_event_t *rkev); |
4204 | |
4205 | |
4206 | /** |
4207 | * @returns the associated configuration string for the event, or NULL |
4208 | * if the configuration property is not set or if |
4209 | * not applicable for the given event type. |
4210 | * |
4211 | * The returned memory is read-only and its lifetime is the same as the |
4212 | * event object. |
4213 | * |
4214 | * Event types: |
4215 | * - RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH: value of sasl.oauthbearer.config |
4216 | */ |
4217 | RD_EXPORT |
4218 | const char *rd_kafka_event_config_string (rd_kafka_event_t *rkev); |
4219 | |
4220 | |
4221 | /** |
4222 | * @returns the error code for the event. |
4223 | * |
4224 | * Use rd_kafka_event_error_is_fatal() to detect if this is a fatal error. |
4225 | * |
4226 | * Event types: |
4227 | * - all |
4228 | */ |
4229 | RD_EXPORT |
4230 | rd_kafka_resp_err_t rd_kafka_event_error (rd_kafka_event_t *rkev); |
4231 | |
4232 | |
4233 | /** |
4234 | * @returns the error string (if any). |
4235 | * An application should check that rd_kafka_event_error() returns |
4236 | * non-zero before calling this function. |
4237 | * |
4238 | * Event types: |
4239 | * - all |
4240 | */ |
4241 | RD_EXPORT |
4242 | const char *rd_kafka_event_error_string (rd_kafka_event_t *rkev); |
4243 | |
4244 | |
4245 | /** |
4246 | * @returns 1 if the error is a fatal error, else 0. |
4247 | * |
4248 | * Event types: |
4249 | * - RD_KAFKA_EVENT_ERROR |
4250 | * |
4251 | * @sa rd_kafka_fatal_error() |
4252 | */ |
4253 | RD_EXPORT |
4254 | int rd_kafka_event_error_is_fatal (rd_kafka_event_t *rkev); |
4255 | |
4256 | |
4257 | /** |
4258 | * @returns the user opaque (if any) |
4259 | * |
4260 | * Event types: |
4261 | * - RD_KAFKA_EVENT_OFFSET_COMMIT |
4262 | * - RD_KAFKA_EVENT_CREATETOPICS_RESULT |
4263 | * - RD_KAFKA_EVENT_DELETETOPICS_RESULT |
4264 | * - RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT |
4265 | * - RD_KAFKA_EVENT_ALTERCONFIGS_RESULT |
4266 | * - RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT |
4267 | */ |
4268 | RD_EXPORT |
4269 | void *rd_kafka_event_opaque (rd_kafka_event_t *rkev); |
4270 | |
4271 | |
4272 | /** |
4273 | * @brief Extract log message from the event. |
4274 | * |
4275 | * Event types: |
4276 | * - RD_KAFKA_EVENT_LOG |
4277 | * |
4278 | * @returns 0 on success or -1 if unsupported event type. |
4279 | */ |
4280 | RD_EXPORT |
4281 | int rd_kafka_event_log (rd_kafka_event_t *rkev, |
4282 | const char **fac, const char **str, int *level); |
4283 | |
4284 | |
4285 | /** |
4286 | * @brief Extract stats from the event. |
4287 | * |
4288 | * Event types: |
4289 | * - RD_KAFKA_EVENT_STATS |
4290 | * |
4291 | * @returns stats json string. |
4292 | * |
4293 | * @remark the returned string will be freed automatically along with the event object |
4294 | * |
4295 | */ |
4296 | RD_EXPORT |
4297 | const char *rd_kafka_event_stats (rd_kafka_event_t *rkev); |
4298 | |
4299 | |
4300 | /** |
4301 | * @returns the topic partition list from the event. |
4302 | * |
4303 | * @remark The list MUST NOT be freed with rd_kafka_topic_partition_list_destroy() |
4304 | * |
4305 | * Event types: |
4306 | * - RD_KAFKA_EVENT_REBALANCE |
4307 | * - RD_KAFKA_EVENT_OFFSET_COMMIT |
4308 | */ |
4309 | RD_EXPORT rd_kafka_topic_partition_list_t * |
4310 | rd_kafka_event_topic_partition_list (rd_kafka_event_t *rkev); |
4311 | |
4312 | |
4313 | /** |
4314 | * @returns a newly allocated topic_partition container, if applicable for the event type, |
4315 | * else NULL. |
4316 | * |
4317 | * @remark The returned pointer MUST be freed with rd_kafka_topic_partition_destroy(). |
4318 | * |
4319 | * Event types: |
4320 | * RD_KAFKA_EVENT_ERROR (for partition level errors) |
4321 | */ |
4322 | RD_EXPORT rd_kafka_topic_partition_t * |
4323 | rd_kafka_event_topic_partition (rd_kafka_event_t *rkev); |
4324 | |
4325 | |
4326 | |
4327 | typedef rd_kafka_event_t rd_kafka_CreateTopics_result_t; |
4328 | typedef rd_kafka_event_t rd_kafka_DeleteTopics_result_t; |
4329 | typedef rd_kafka_event_t rd_kafka_CreatePartitions_result_t; |
4330 | typedef rd_kafka_event_t rd_kafka_AlterConfigs_result_t; |
4331 | typedef rd_kafka_event_t rd_kafka_DescribeConfigs_result_t; |
4332 | |
4333 | /** |
4334 | * @returns the result of a CreateTopics request, or NULL if event is of |
4335 | * different type. |
4336 | * |
4337 | * Event types: |
4338 | * RD_KAFKA_EVENT_CREATETOPICS_RESULT |
4339 | */ |
4340 | RD_EXPORT const rd_kafka_CreateTopics_result_t * |
4341 | rd_kafka_event_CreateTopics_result (rd_kafka_event_t *rkev); |
4342 | |
4343 | /** |
4344 | * @returns the result of a DeleteTopics request, or NULL if event is of |
4345 | * different type. |
4346 | * |
4347 | * Event types: |
4348 | * RD_KAFKA_EVENT_DELETETOPICS_RESULT |
4349 | */ |
4350 | RD_EXPORT const rd_kafka_DeleteTopics_result_t * |
4351 | rd_kafka_event_DeleteTopics_result (rd_kafka_event_t *rkev); |
4352 | |
4353 | /** |
4354 | * @returns the result of a CreatePartitions request, or NULL if event is of |
4355 | * different type. |
4356 | * |
4357 | * Event types: |
4358 | * RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT |
4359 | */ |
4360 | RD_EXPORT const rd_kafka_CreatePartitions_result_t * |
4361 | rd_kafka_event_CreatePartitions_result (rd_kafka_event_t *rkev); |
4362 | |
4363 | /** |
4364 | * @returns the result of a AlterConfigs request, or NULL if event is of |
4365 | * different type. |
4366 | * |
4367 | * Event types: |
4368 | * RD_KAFKA_EVENT_ALTERCONFIGS_RESULT |
4369 | */ |
4370 | RD_EXPORT const rd_kafka_AlterConfigs_result_t * |
4371 | rd_kafka_event_AlterConfigs_result (rd_kafka_event_t *rkev); |
4372 | |
4373 | /** |
4374 | * @returns the result of a DescribeConfigs request, or NULL if event is of |
4375 | * different type. |
4376 | * |
4377 | * Event types: |
4378 | * RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT |
4379 | */ |
4380 | RD_EXPORT const rd_kafka_DescribeConfigs_result_t * |
4381 | rd_kafka_event_DescribeConfigs_result (rd_kafka_event_t *rkev); |
4382 | |
4383 | |
4384 | |
4385 | |
4386 | /** |
4387 | * @brief Poll a queue for an event for max \p timeout_ms. |
4388 | * |
4389 | * @returns an event, or NULL. |
4390 | * |
4391 | * @remark Use rd_kafka_event_destroy() to free the event. |
4392 | * |
4393 | * @sa rd_kafka_conf_set_background_event_cb() |
4394 | */ |
4395 | RD_EXPORT |
4396 | rd_kafka_event_t *rd_kafka_queue_poll (rd_kafka_queue_t *rkqu, int timeout_ms); |
4397 | |
4398 | /** |
4399 | * @brief Poll a queue for events served through callbacks for max \p timeout_ms. |
4400 | * |
4401 | * @returns the number of events served. |
4402 | * |
4403 | * @remark This API must only be used for queues with callbacks registered |
4404 | * for all expected event types. E.g., not a message queue. |
4405 | * |
4406 | * @remark Also see rd_kafka_conf_set_background_event_cb() for triggering |
4407 | * event callbacks from a librdkafka-managed background thread. |
4408 | * |
4409 | * @sa rd_kafka_conf_set_background_event_cb() |
4410 | */ |
4411 | RD_EXPORT |
4412 | int rd_kafka_queue_poll_callback (rd_kafka_queue_t *rkqu, int timeout_ms); |
4413 | |
4414 | |
4415 | /**@}*/ |
4416 | |
4417 | |
4418 | /** |
4419 | * @name Plugin interface |
4420 | * |
4421 | * @brief A plugin interface that allows external runtime-loaded libraries |
4422 | * to integrate with a client instance without modifications to |
4423 | * the application code. |
4424 | * |
4425 | * Plugins are loaded when referenced through the `plugin.library.paths` |
4426 | * configuration property and operates on the \c rd_kafka_conf_t |
4427 | * object prior \c rd_kafka_t instance creation. |
4428 | * |
4429 | * @warning Plugins require the application to link librdkafka dynamically |
4430 | * and not statically. Failure to do so will lead to missing symbols |
4431 | * or finding symbols in another librdkafka library than the |
4432 | * application was linked with. |
4433 | */ |
4434 | |
4435 | |
4436 | /** |
4437 | * @brief Plugin's configuration initializer method called each time the |
4438 | * library is referenced from configuration (even if previously loaded by |
4439 | * another client instance). |
4440 | * |
4441 | * @remark This method MUST be implemented by plugins and have the symbol name |
4442 | * \c conf_init |
4443 | * |
4444 | * @param conf Configuration set up to this point. |
4445 | * @param plug_opaquep Plugin can set this pointer to a per-configuration |
4446 | * opaque pointer. |
4447 | * @param errstr String buffer of size \p errstr_size where plugin must write |
4448 | * a human readable error string in the case the initializer |
4449 | * fails (returns non-zero). |
4450 | * |
4451 | * @remark A plugin may add an on_conf_destroy() interceptor to clean up |
4452 | * plugin-specific resources created in the plugin's conf_init() method. |
4453 | * |
4454 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or an error code on error. |
4455 | */ |
4456 | typedef rd_kafka_resp_err_t |
4457 | (rd_kafka_plugin_f_conf_init_t) (rd_kafka_conf_t *conf, |
4458 | void **plug_opaquep, |
4459 | char *errstr, size_t errstr_size); |
4460 | |
4461 | /**@}*/ |
4462 | |
4463 | |
4464 | |
4465 | /** |
4466 | * @name Interceptors |
4467 | * |
4468 | * @{ |
4469 | * |
4470 | * @brief A callback interface that allows message interception for both |
4471 | * producer and consumer data pipelines. |
4472 | * |
4473 | * Except for the on_new(), on_conf_set(), on_conf_dup() and on_conf_destroy() |
4474 | * interceptors, interceptors are added to the |
4475 | * newly created rd_kafka_t client instance. These interceptors MUST only |
4476 | * be added from on_new() and MUST NOT be added after rd_kafka_new() returns. |
4477 | * |
4478 | * The on_new(), on_conf_set(), on_conf_dup() and on_conf_destroy() interceptors |
4479 | * are added to the configuration object which is later passed to |
4480 | * rd_kafka_new() where on_new() is called to allow addition of |
4481 | * other interceptors. |
4482 | * |
4483 | * Each interceptor reference consists of a display name (ic_name), |
4484 | * a callback function, and an application-specified opaque value that is |
4485 | * passed as-is to the callback. |
4486 | * The ic_name must be unique for the interceptor implementation and is used |
4487 | * to reject duplicate interceptor methods. |
4488 | * |
4489 | * Any number of interceptors can be added and they are called in the order |
4490 | * they were added, unless otherwise noted. |
4491 | * The list of registered interceptor methods are referred to as |
4492 | * interceptor chains. |
4493 | * |
4494 | * @remark Contrary to the Java client the librdkafka interceptor interface |
4495 | * does not support message key and value modification. |
4496 | * Message mutability is discouraged in the Java client and the |
4497 | * combination of serializers and headers cover most use-cases. |
4498 | * |
4499 | * @remark Interceptors are NOT copied to the new configuration on |
4500 | * rd_kafka_conf_dup() since it would be hard for interceptors to |
4501 | * track usage of the interceptor's opaque value. |
4502 | * An interceptor should rely on the plugin, which will be copied |
4503 | * in rd_kafka_conf_conf_dup(), to set up the initial interceptors. |
4504 | * An interceptor should implement the on_conf_dup() method |
4505 | * to manually set up its internal configuration on the newly created |
4506 | * configuration object that is being copied-to based on the |
4507 | * interceptor-specific configuration properties. |
4508 | * conf_dup() should thus be treated the same as conf_init(). |
4509 | * |
4510 | * @remark Interceptors are keyed by the interceptor type (on_..()), the |
4511 | * interceptor name (ic_name) and the interceptor method function. |
4512 | * Duplicates are not allowed and the .._add_on_..() method will |
4513 | * return RD_KAFKA_RESP_ERR__CONFLICT if attempting to add a duplicate |
4514 | * method. |
4515 | * The only exception is on_conf_destroy() which may be added multiple |
4516 | * times by the same interceptor to allow proper cleanup of |
4517 | * interceptor configuration state. |
4518 | */ |
4519 | |
4520 | |
4521 | /** |
4522 | * @brief on_conf_set() is called from rd_kafka_*_conf_set() in the order |
4523 | * the interceptors were added. |
4524 | * |
4525 | * @param ic_opaque The interceptor's opaque pointer specified in ..add..(). |
4526 | * @param name The configuration property to set. |
4527 | * @param val The configuration value to set, or NULL for reverting to default |
4528 | * in which case the previous value should be freed. |
4529 | * @param errstr A human readable error string in case the interceptor fails. |
4530 | * @param errstr_size Maximum space (including \0) in \p errstr. |
4531 | * |
4532 | * @returns RD_KAFKA_CONF_RES_OK if the property was known and successfully |
4533 | * handled by the interceptor, RD_KAFKA_CONF_RES_INVALID if the |
4534 | * property was handled by the interceptor but the value was invalid, |
4535 | * or RD_KAFKA_CONF_RES_UNKNOWN if the interceptor did not handle |
4536 | * this property, in which case the property is passed on on the |
4537 | * interceptor in the chain, finally ending up at the built-in |
4538 | * configuration handler. |
4539 | */ |
4540 | typedef rd_kafka_conf_res_t |
4541 | (rd_kafka_interceptor_f_on_conf_set_t) (rd_kafka_conf_t *conf, |
4542 | const char *name, const char *val, |
4543 | char *errstr, size_t errstr_size, |
4544 | void *ic_opaque); |
4545 | |
4546 | |
4547 | /** |
4548 | * @brief on_conf_dup() is called from rd_kafka_conf_dup() in the |
4549 | * order the interceptors were added and is used to let |
4550 | * an interceptor re-register its conf interecptors with a new |
4551 | * opaque value. |
4552 | * The on_conf_dup() method is called prior to the configuration from |
4553 | * \p old_conf being copied to \p new_conf. |
4554 | * |
4555 | * @param ic_opaque The interceptor's opaque pointer specified in ..add..(). |
4556 | * |
4557 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or an error code |
4558 | * on failure (which is logged but otherwise ignored). |
4559 | * |
4560 | * @remark No on_conf_* interceptors are copied to the new configuration |
4561 | * object on rd_kafka_conf_dup(). |
4562 | */ |
4563 | typedef rd_kafka_resp_err_t |
4564 | (rd_kafka_interceptor_f_on_conf_dup_t) (rd_kafka_conf_t *new_conf, |
4565 | const rd_kafka_conf_t *old_conf, |
4566 | size_t filter_cnt, |
4567 | const char **filter, |
4568 | void *ic_opaque); |
4569 | |
4570 | |
4571 | /** |
4572 | * @brief on_conf_destroy() is called from rd_kafka_*_conf_destroy() in the |
4573 | * order the interceptors were added. |
4574 | * |
4575 | * @param ic_opaque The interceptor's opaque pointer specified in ..add..(). |
4576 | */ |
4577 | typedef rd_kafka_resp_err_t |
4578 | (rd_kafka_interceptor_f_on_conf_destroy_t) (void *ic_opaque); |
4579 | |
4580 | |
4581 | /** |
4582 | * @brief on_new() is called from rd_kafka_new() prior toreturning |
4583 | * the newly created client instance to the application. |
4584 | * |
4585 | * @param rk The client instance. |
4586 | * @param conf The client instance's final configuration. |
4587 | * @param ic_opaque The interceptor's opaque pointer specified in ..add..(). |
4588 | * @param errstr A human readable error string in case the interceptor fails. |
4589 | * @param errstr_size Maximum space (including \0) in \p errstr. |
4590 | * |
4591 | * @returns an error code on failure, the error is logged but otherwise ignored. |
4592 | * |
4593 | * @warning The \p rk client instance will not be fully set up when this |
4594 | * interceptor is called and the interceptor MUST NOT call any |
4595 | * other rk-specific APIs than rd_kafka_interceptor_add..(). |
4596 | * |
4597 | */ |
4598 | typedef rd_kafka_resp_err_t |
4599 | (rd_kafka_interceptor_f_on_new_t) (rd_kafka_t *rk, const rd_kafka_conf_t *conf, |
4600 | void *ic_opaque, |
4601 | char *errstr, size_t errstr_size); |
4602 | |
4603 | |
4604 | /** |
4605 | * @brief on_destroy() is called from rd_kafka_destroy() or (rd_kafka_new() |
4606 | * if rd_kafka_new() fails during initialization). |
4607 | * |
4608 | * @param rk The client instance. |
4609 | * @param ic_opaque The interceptor's opaque pointer specified in ..add..(). |
4610 | */ |
4611 | typedef rd_kafka_resp_err_t |
4612 | (rd_kafka_interceptor_f_on_destroy_t) (rd_kafka_t *rk, void *ic_opaque); |
4613 | |
4614 | |
4615 | |
4616 | |
4617 | /** |
4618 | * @brief on_send() is called from rd_kafka_produce*() (et.al) prior to |
4619 | * the partitioner being called. |
4620 | * |
4621 | * @param rk The client instance. |
4622 | * @param rkmessage The message being produced. Immutable. |
4623 | * @param ic_opaque The interceptor's opaque pointer specified in ..add..(). |
4624 | * |
4625 | * @remark This interceptor is only used by producer instances. |
4626 | * |
4627 | * @remark The \p rkmessage object is NOT mutable and MUST NOT be modified |
4628 | * by the interceptor. |
4629 | * |
4630 | * @remark If the partitioner fails or an unknown partition was specified, |
4631 | * the on_acknowledgement() interceptor chain will be called from |
4632 | * within the rd_kafka_produce*() call to maintain send-acknowledgement |
4633 | * symmetry. |
4634 | * |
4635 | * @returns an error code on failure, the error is logged but otherwise ignored. |
4636 | */ |
4637 | typedef rd_kafka_resp_err_t |
4638 | (rd_kafka_interceptor_f_on_send_t) (rd_kafka_t *rk, |
4639 | rd_kafka_message_t *rkmessage, |
4640 | void *ic_opaque); |
4641 | |
4642 | /** |
4643 | * @brief on_acknowledgement() is called to inform interceptors that a message |
4644 | * was succesfully delivered or permanently failed delivery. |
4645 | * The interceptor chain is called from internal librdkafka background |
4646 | * threads, or rd_kafka_produce*() if the partitioner failed. |
4647 | * |
4648 | * @param rk The client instance. |
4649 | * @param rkmessage The message being produced. Immutable. |
4650 | * @param ic_opaque The interceptor's opaque pointer specified in ..add..(). |
4651 | * |
4652 | * @remark This interceptor is only used by producer instances. |
4653 | * |
4654 | * @remark The \p rkmessage object is NOT mutable and MUST NOT be modified |
4655 | * by the interceptor. |
4656 | * |
4657 | * @warning The on_acknowledgement() method may be called from internal |
4658 | * librdkafka threads. An on_acknowledgement() interceptor MUST NOT |
4659 | * call any librdkafka API's associated with the \p rk, or perform |
4660 | * any blocking or prolonged work. |
4661 | * |
4662 | * @returns an error code on failure, the error is logged but otherwise ignored. |
4663 | */ |
4664 | typedef rd_kafka_resp_err_t |
4665 | (rd_kafka_interceptor_f_on_acknowledgement_t) (rd_kafka_t *rk, |
4666 | rd_kafka_message_t *rkmessage, |
4667 | void *ic_opaque); |
4668 | |
4669 | |
4670 | /** |
4671 | * @brief on_consume() is called just prior to passing the message to the |
4672 | * application in rd_kafka_consumer_poll(), rd_kafka_consume*(), |
4673 | * the event interface, etc. |
4674 | * |
4675 | * @param rk The client instance. |
4676 | * @param rkmessage The message being consumed. Immutable. |
4677 | * @param ic_opaque The interceptor's opaque pointer specified in ..add..(). |
4678 | * |
4679 | * @remark This interceptor is only used by consumer instances. |
4680 | * |
4681 | * @remark The \p rkmessage object is NOT mutable and MUST NOT be modified |
4682 | * by the interceptor. |
4683 | * |
4684 | * @returns an error code on failure, the error is logged but otherwise ignored. |
4685 | */ |
4686 | typedef rd_kafka_resp_err_t |
4687 | (rd_kafka_interceptor_f_on_consume_t) (rd_kafka_t *rk, |
4688 | rd_kafka_message_t *rkmessage, |
4689 | void *ic_opaque); |
4690 | |
4691 | /** |
4692 | * @brief on_commit() is called on completed or failed offset commit. |
4693 | * It is called from internal librdkafka threads. |
4694 | * |
4695 | * @param rk The client instance. |
4696 | * @param offsets List of topic+partition+offset+error that were committed. |
4697 | * The error message of each partition should be checked for |
4698 | * error. |
4699 | * @param ic_opaque The interceptor's opaque pointer specified in ..add..(). |
4700 | * |
4701 | * @remark This interceptor is only used by consumer instances. |
4702 | * |
4703 | * @warning The on_commit() interceptor is called from internal |
4704 | * librdkafka threads. An on_commit() interceptor MUST NOT |
4705 | * call any librdkafka API's associated with the \p rk, or perform |
4706 | * any blocking or prolonged work. |
4707 | * |
4708 | * |
4709 | * @returns an error code on failure, the error is logged but otherwise ignored. |
4710 | */ |
4711 | typedef rd_kafka_resp_err_t |
4712 | (rd_kafka_interceptor_f_on_commit_t) ( |
4713 | rd_kafka_t *rk, |
4714 | const rd_kafka_topic_partition_list_t *offsets, |
4715 | rd_kafka_resp_err_t err, void *ic_opaque); |
4716 | |
4717 | |
4718 | /** |
4719 | * @brief on_request_sent() is called when a request has been fully written |
4720 | * to a broker TCP connections socket. |
4721 | * |
4722 | * @param rk The client instance. |
4723 | * @param sockfd Socket file descriptor. |
4724 | * @param brokername Broker request is being sent to. |
4725 | * @param brokerid Broker request is being sent to. |
4726 | * @param ApiKey Kafka protocol request type. |
4727 | * @param ApiVersion Kafka protocol request type version. |
4728 | * @param Corrid Kafka protocol request correlation id. |
4729 | * @param size Size of request. |
4730 | * @param ic_opaque The interceptor's opaque pointer specified in ..add..(). |
4731 | * |
4732 | * @warning The on_request_sent() interceptor is called from internal |
4733 | * librdkafka broker threads. An on_request_sent() interceptor MUST NOT |
4734 | * call any librdkafka API's associated with the \p rk, or perform |
4735 | * any blocking or prolonged work. |
4736 | * |
4737 | * @returns an error code on failure, the error is logged but otherwise ignored. |
4738 | */ |
4739 | typedef rd_kafka_resp_err_t |
4740 | (rd_kafka_interceptor_f_on_request_sent_t) ( |
4741 | rd_kafka_t *rk, |
4742 | int sockfd, |
4743 | const char *brokername, |
4744 | int32_t brokerid, |
4745 | int16_t ApiKey, |
4746 | int16_t ApiVersion, |
4747 | int32_t CorrId, |
4748 | size_t size, |
4749 | void *ic_opaque); |
4750 | |
4751 | |
4752 | |
4753 | /** |
4754 | * @brief Append an on_conf_set() interceptor. |
4755 | * |
4756 | * @param conf Configuration object. |
4757 | * @param ic_name Interceptor name, used in logging. |
4758 | * @param on_conf_set Function pointer. |
4759 | * @param ic_opaque Opaque value that will be passed to the function. |
4760 | * |
4761 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or RD_KAFKA_RESP_ERR__CONFLICT |
4762 | * if an existing intercepted with the same \p ic_name and function |
4763 | * has already been added to \p conf. |
4764 | */ |
4765 | RD_EXPORT rd_kafka_resp_err_t |
4766 | rd_kafka_conf_interceptor_add_on_conf_set ( |
4767 | rd_kafka_conf_t *conf, const char *ic_name, |
4768 | rd_kafka_interceptor_f_on_conf_set_t *on_conf_set, |
4769 | void *ic_opaque); |
4770 | |
4771 | |
4772 | /** |
4773 | * @brief Append an on_conf_dup() interceptor. |
4774 | * |
4775 | * @param conf Configuration object. |
4776 | * @param ic_name Interceptor name, used in logging. |
4777 | * @param on_conf_dup Function pointer. |
4778 | * @param ic_opaque Opaque value that will be passed to the function. |
4779 | * |
4780 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or RD_KAFKA_RESP_ERR__CONFLICT |
4781 | * if an existing intercepted with the same \p ic_name and function |
4782 | * has already been added to \p conf. |
4783 | */ |
4784 | RD_EXPORT rd_kafka_resp_err_t |
4785 | rd_kafka_conf_interceptor_add_on_conf_dup ( |
4786 | rd_kafka_conf_t *conf, const char *ic_name, |
4787 | rd_kafka_interceptor_f_on_conf_dup_t *on_conf_dup, |
4788 | void *ic_opaque); |
4789 | |
4790 | /** |
4791 | * @brief Append an on_conf_destroy() interceptor. |
4792 | * |
4793 | * @param conf Configuration object. |
4794 | * @param ic_name Interceptor name, used in logging. |
4795 | * @param on_conf_destroy Function pointer. |
4796 | * @param ic_opaque Opaque value that will be passed to the function. |
4797 | * |
4798 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR |
4799 | * |
4800 | * @remark Multiple on_conf_destroy() interceptors are allowed to be added |
4801 | * to the same configuration object. |
4802 | */ |
4803 | RD_EXPORT rd_kafka_resp_err_t |
4804 | rd_kafka_conf_interceptor_add_on_conf_destroy ( |
4805 | rd_kafka_conf_t *conf, const char *ic_name, |
4806 | rd_kafka_interceptor_f_on_conf_destroy_t *on_conf_destroy, |
4807 | void *ic_opaque); |
4808 | |
4809 | |
4810 | /** |
4811 | * @brief Append an on_new() interceptor. |
4812 | * |
4813 | * @param conf Configuration object. |
4814 | * @param ic_name Interceptor name, used in logging. |
4815 | * @param on_send Function pointer. |
4816 | * @param ic_opaque Opaque value that will be passed to the function. |
4817 | * |
4818 | * @remark Since the on_new() interceptor is added to the configuration object |
4819 | * it may be copied by rd_kafka_conf_dup(). |
4820 | * An interceptor implementation must thus be able to handle |
4821 | * the same interceptor,ic_opaque tuple to be used by multiple |
4822 | * client instances. |
4823 | * |
4824 | * @remark An interceptor plugin should check the return value to make sure it |
4825 | * has not already been added. |
4826 | * |
4827 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or RD_KAFKA_RESP_ERR__CONFLICT |
4828 | * if an existing intercepted with the same \p ic_name and function |
4829 | * has already been added to \p conf. |
4830 | */ |
4831 | RD_EXPORT rd_kafka_resp_err_t |
4832 | rd_kafka_conf_interceptor_add_on_new ( |
4833 | rd_kafka_conf_t *conf, const char *ic_name, |
4834 | rd_kafka_interceptor_f_on_new_t *on_new, |
4835 | void *ic_opaque); |
4836 | |
4837 | |
4838 | |
4839 | /** |
4840 | * @brief Append an on_destroy() interceptor. |
4841 | * |
4842 | * @param rk Client instance. |
4843 | * @param ic_name Interceptor name, used in logging. |
4844 | * @param on_destroy Function pointer. |
4845 | * @param ic_opaque Opaque value that will be passed to the function. |
4846 | * |
4847 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or RD_KAFKA_RESP_ERR__CONFLICT |
4848 | * if an existing intercepted with the same \p ic_name and function |
4849 | * has already been added to \p conf. |
4850 | */ |
4851 | RD_EXPORT rd_kafka_resp_err_t |
4852 | rd_kafka_interceptor_add_on_destroy ( |
4853 | rd_kafka_t *rk, const char *ic_name, |
4854 | rd_kafka_interceptor_f_on_destroy_t *on_destroy, |
4855 | void *ic_opaque); |
4856 | |
4857 | |
4858 | /** |
4859 | * @brief Append an on_send() interceptor. |
4860 | * |
4861 | * @param rk Client instance. |
4862 | * @param ic_name Interceptor name, used in logging. |
4863 | * @param on_send Function pointer. |
4864 | * @param ic_opaque Opaque value that will be passed to the function. |
4865 | * |
4866 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or RD_KAFKA_RESP_ERR__CONFLICT |
4867 | * if an existing intercepted with the same \p ic_name and function |
4868 | * has already been added to \p conf. |
4869 | */ |
4870 | RD_EXPORT rd_kafka_resp_err_t |
4871 | rd_kafka_interceptor_add_on_send ( |
4872 | rd_kafka_t *rk, const char *ic_name, |
4873 | rd_kafka_interceptor_f_on_send_t *on_send, |
4874 | void *ic_opaque); |
4875 | |
4876 | /** |
4877 | * @brief Append an on_acknowledgement() interceptor. |
4878 | * |
4879 | * @param rk Client instance. |
4880 | * @param ic_name Interceptor name, used in logging. |
4881 | * @param on_acknowledgement Function pointer. |
4882 | * @param ic_opaque Opaque value that will be passed to the function. |
4883 | * |
4884 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or RD_KAFKA_RESP_ERR__CONFLICT |
4885 | * if an existing intercepted with the same \p ic_name and function |
4886 | * has already been added to \p conf. |
4887 | */ |
4888 | RD_EXPORT rd_kafka_resp_err_t |
4889 | rd_kafka_interceptor_add_on_acknowledgement ( |
4890 | rd_kafka_t *rk, const char *ic_name, |
4891 | rd_kafka_interceptor_f_on_acknowledgement_t *on_acknowledgement, |
4892 | void *ic_opaque); |
4893 | |
4894 | |
4895 | /** |
4896 | * @brief Append an on_consume() interceptor. |
4897 | * |
4898 | * @param rk Client instance. |
4899 | * @param ic_name Interceptor name, used in logging. |
4900 | * @param on_consume Function pointer. |
4901 | * @param ic_opaque Opaque value that will be passed to the function. |
4902 | * |
4903 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or RD_KAFKA_RESP_ERR__CONFLICT |
4904 | * if an existing intercepted with the same \p ic_name and function |
4905 | * has already been added to \p conf. |
4906 | */ |
4907 | RD_EXPORT rd_kafka_resp_err_t |
4908 | rd_kafka_interceptor_add_on_consume ( |
4909 | rd_kafka_t *rk, const char *ic_name, |
4910 | rd_kafka_interceptor_f_on_consume_t *on_consume, |
4911 | void *ic_opaque); |
4912 | |
4913 | |
4914 | /** |
4915 | * @brief Append an on_commit() interceptor. |
4916 | * |
4917 | * @param rk Client instance. |
4918 | * @param ic_name Interceptor name, used in logging. |
4919 | * @param on_commit() Function pointer. |
4920 | * @param ic_opaque Opaque value that will be passed to the function. |
4921 | * |
4922 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or RD_KAFKA_RESP_ERR__CONFLICT |
4923 | * if an existing intercepted with the same \p ic_name and function |
4924 | * has already been added to \p conf. |
4925 | */ |
4926 | RD_EXPORT rd_kafka_resp_err_t |
4927 | rd_kafka_interceptor_add_on_commit ( |
4928 | rd_kafka_t *rk, const char *ic_name, |
4929 | rd_kafka_interceptor_f_on_commit_t *on_commit, |
4930 | void *ic_opaque); |
4931 | |
4932 | |
4933 | /** |
4934 | * @brief Append an on_request_sent() interceptor. |
4935 | * |
4936 | * @param rk Client instance. |
4937 | * @param ic_name Interceptor name, used in logging. |
4938 | * @param on_request_sent() Function pointer. |
4939 | * @param ic_opaque Opaque value that will be passed to the function. |
4940 | * |
4941 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or RD_KAFKA_RESP_ERR__CONFLICT |
4942 | * if an existing intercepted with the same \p ic_name and function |
4943 | * has already been added to \p conf. |
4944 | */ |
4945 | RD_EXPORT rd_kafka_resp_err_t |
4946 | rd_kafka_interceptor_add_on_request_sent ( |
4947 | rd_kafka_t *rk, const char *ic_name, |
4948 | rd_kafka_interceptor_f_on_request_sent_t *on_request_sent, |
4949 | void *ic_opaque); |
4950 | |
4951 | |
4952 | |
4953 | |
4954 | /**@}*/ |
4955 | |
4956 | |
4957 | |
4958 | /** |
4959 | * @name Auxiliary types |
4960 | * |
4961 | * @{ |
4962 | */ |
4963 | |
4964 | |
4965 | |
4966 | /** |
4967 | * @brief Topic result provides per-topic operation result information. |
4968 | * |
4969 | */ |
4970 | |
4971 | /** |
4972 | * @returns the error code for the given topic result. |
4973 | */ |
4974 | RD_EXPORT rd_kafka_resp_err_t |
4975 | rd_kafka_topic_result_error (const rd_kafka_topic_result_t *topicres); |
4976 | |
4977 | /** |
4978 | * @returns the human readable error string for the given topic result, |
4979 | * or NULL if there was no error. |
4980 | * |
4981 | * @remark lifetime of the returned string is the same as the \p topicres. |
4982 | */ |
4983 | RD_EXPORT const char * |
4984 | rd_kafka_topic_result_error_string (const rd_kafka_topic_result_t *topicres); |
4985 | |
4986 | /** |
4987 | * @returns the name of the topic for the given topic result. |
4988 | * @remark lifetime of the returned string is the same as the \p topicres. |
4989 | * |
4990 | */ |
4991 | RD_EXPORT const char * |
4992 | rd_kafka_topic_result_name (const rd_kafka_topic_result_t *topicres); |
4993 | |
4994 | |
4995 | /**@}*/ |
4996 | |
4997 | |
4998 | /** |
4999 | * @name Admin API |
5000 | * |
5001 | * @{ |
5002 | * |
5003 | * @brief The Admin API enables applications to perform administrative |
5004 | * Apache Kafka tasks, such as creating and deleting topics, |
5005 | * altering and reading broker configuration, etc. |
5006 | * |
5007 | * The Admin API is asynchronous and makes use of librdkafka's standard |
5008 | * \c rd_kafka_queue_t queues to propagate the result of an admin operation |
5009 | * back to the application. |
5010 | * The supplied queue may be any queue, such as a temporary single-call queue, |
5011 | * a shared queue used for multiple requests, or even the main queue or |
5012 | * consumer queues. |
5013 | * |
5014 | * Use \c rd_kafka_queue_poll() to collect the result of an admin operation |
5015 | * from the queue of your choice, then extract the admin API-specific result |
5016 | * type by using the corresponding \c rd_kafka_event_CreateTopics_result, |
5017 | * \c rd_kafka_event_DescribeConfigs_result, etc, methods. |
5018 | * Use the getter methods on the \c .._result_t type to extract response |
5019 | * information and finally destroy the result and event by calling |
5020 | * \c rd_kafka_event_destroy(). |
5021 | * |
5022 | * Use rd_kafka_event_error() and rd_kafka_event_error_string() to acquire |
5023 | * the request-level error/success for an Admin API request. |
5024 | * Even if the returned value is \c RD_KAFKA_RESP_ERR_NO_ERROR there |
5025 | * may be individual objects (topics, resources, etc) that have failed. |
5026 | * Extract per-object error information with the corresponding |
5027 | * \c rd_kafka_..._result_topics|resources|..() to check per-object errors. |
5028 | * |
5029 | * Locally triggered errors: |
5030 | * - \c RD_KAFKA_RESP_ERR__TIMED_OUT - (Controller) broker connection did not |
5031 | * become available in the time allowed by AdminOption_set_request_timeout. |
5032 | */ |
5033 | |
5034 | |
5035 | /** |
5036 | * @enum rd_kafka_admin_op_t |
5037 | * |
5038 | * @brief Admin operation enum name for use with rd_kafka_AdminOptions_new() |
5039 | * |
5040 | * @sa rd_kafka_AdminOptions_new() |
5041 | */ |
5042 | typedef enum rd_kafka_admin_op_t { |
5043 | RD_KAFKA_ADMIN_OP_ANY = 0, /**< Default value */ |
5044 | RD_KAFKA_ADMIN_OP_CREATETOPICS, /**< CreateTopics */ |
5045 | RD_KAFKA_ADMIN_OP_DELETETOPICS, /**< DeleteTopics */ |
5046 | RD_KAFKA_ADMIN_OP_CREATEPARTITIONS, /**< CreatePartitions */ |
5047 | RD_KAFKA_ADMIN_OP_ALTERCONFIGS, /**< AlterConfigs */ |
5048 | RD_KAFKA_ADMIN_OP_DESCRIBECONFIGS, /**< DescribeConfigs */ |
5049 | RD_KAFKA_ADMIN_OP__CNT /**< Number of ops defined */ |
5050 | } rd_kafka_admin_op_t; |
5051 | |
5052 | /** |
5053 | * @brief AdminOptions provides a generic mechanism for setting optional |
5054 | * parameters for the Admin API requests. |
5055 | * |
5056 | * @remark Since AdminOptions is decoupled from the actual request type |
5057 | * there is no enforcement to prevent setting unrelated properties, |
5058 | * e.g. setting validate_only on a DescribeConfigs request is allowed |
5059 | * but is silently ignored by DescribeConfigs. |
5060 | * Future versions may introduce such enforcement. |
5061 | */ |
5062 | |
5063 | |
5064 | typedef struct rd_kafka_AdminOptions_s rd_kafka_AdminOptions_t; |
5065 | |
5066 | /** |
5067 | * @brief Create a new AdminOptions object. |
5068 | * |
5069 | * The options object is not modified by the Admin API request APIs, |
5070 | * (e.g. CreateTopics) and may be reused for multiple calls. |
5071 | * |
5072 | * @param rk Client instance. |
5073 | * @param for_api Specifies what Admin API this AdminOptions object will be used |
5074 | * for, which will enforce what AdminOptions_set_..() calls may |
5075 | * be used based on the API, causing unsupported set..() calls |
5076 | * to fail. |
5077 | * Specifying RD_KAFKA_ADMIN_OP_ANY disables the enforcement |
5078 | * allowing any option to be set, even if the option |
5079 | * is not used in a future call to an Admin API method. |
5080 | * |
5081 | * @returns a new AdminOptions object (which must be freed with |
5082 | * rd_kafka_AdminOptions_destroy()), or NULL if \p for_api was set to |
5083 | * an unknown API op type. |
5084 | */ |
5085 | RD_EXPORT rd_kafka_AdminOptions_t * |
5086 | rd_kafka_AdminOptions_new (rd_kafka_t *rk, rd_kafka_admin_op_t for_api); |
5087 | |
5088 | |
5089 | /** |
5090 | * @brief Destroy a AdminOptions object. |
5091 | */ |
5092 | RD_EXPORT void rd_kafka_AdminOptions_destroy (rd_kafka_AdminOptions_t *options); |
5093 | |
5094 | |
5095 | /** |
5096 | * @brief Sets the overall request timeout, including broker lookup, |
5097 | * request transmission, operation time on broker, and response. |
5098 | * |
5099 | * @param timeout_ms Timeout in milliseconds, use -1 for indefinite timeout. |
5100 | * Defaults to `socket.timeout.ms`. |
5101 | * |
5102 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success, or |
5103 | * RD_KAFKA_RESP_ERR__INVALID_ARG if timeout was out of range in which |
5104 | * case an error string will be written \p errstr. |
5105 | * |
5106 | * @remark This option is valid for all Admin API requests. |
5107 | */ |
5108 | RD_EXPORT rd_kafka_resp_err_t |
5109 | rd_kafka_AdminOptions_set_request_timeout (rd_kafka_AdminOptions_t *options, |
5110 | int timeout_ms, |
5111 | char *errstr, size_t errstr_size); |
5112 | |
5113 | |
5114 | /** |
5115 | * @brief Sets the broker's operation timeout, such as the timeout for |
5116 | * CreateTopics to complete the creation of topics on the controller |
5117 | * before returning a result to the application. |
5118 | * |
5119 | * CreateTopics: values <= 0 will return immediately after triggering topic |
5120 | * creation, while > 0 will wait this long for topic creation to propagate |
5121 | * in cluster. Default: 0. |
5122 | * |
5123 | * DeleteTopics: same semantics as CreateTopics. |
5124 | * CreatePartitions: same semantics as CreateTopics. |
5125 | * |
5126 | * |
5127 | * @param timeout_ms Timeout in milliseconds. |
5128 | * |
5129 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success, or |
5130 | * RD_KAFKA_RESP_ERR__INVALID_ARG if timeout was out of range in which |
5131 | * case an error string will be written \p errstr. |
5132 | * |
5133 | * @remark This option is valid for CreateTopics, DeleteTopics and |
5134 | * CreatePartitions. |
5135 | */ |
5136 | RD_EXPORT rd_kafka_resp_err_t |
5137 | rd_kafka_AdminOptions_set_operation_timeout (rd_kafka_AdminOptions_t *options, |
5138 | int timeout_ms, |
5139 | char *errstr, size_t errstr_size); |
5140 | |
5141 | |
5142 | /** |
5143 | * @brief Tell broker to only validate the request, without performing |
5144 | * the requested operation (create topics, etc). |
5145 | * |
5146 | * @param true_or_false Defaults to false. |
5147 | * |
5148 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or an |
5149 | * error code on failure in which case an error string will |
5150 | * be written \p errstr. |
5151 | * |
5152 | * @remark This option is valid for CreateTopics, |
5153 | * CreatePartitions, AlterConfigs. |
5154 | */ |
5155 | RD_EXPORT rd_kafka_resp_err_t |
5156 | rd_kafka_AdminOptions_set_validate_only (rd_kafka_AdminOptions_t *options, |
5157 | int true_or_false, |
5158 | char *errstr, size_t errstr_size); |
5159 | |
5160 | |
5161 | /** |
5162 | * @brief Override what broker the Admin request will be sent to. |
5163 | * |
5164 | * By default, Admin requests are sent to the controller broker, with |
5165 | * the following exceptions: |
5166 | * - AlterConfigs with a BROKER resource are sent to the broker id set |
5167 | * as the resource name. |
5168 | * - DescribeConfigs with a BROKER resource are sent to the broker id set |
5169 | * as the resource name. |
5170 | * |
5171 | * @param broker_id The broker to send the request to. |
5172 | * |
5173 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or an |
5174 | * error code on failure in which case an error string will |
5175 | * be written \p errstr. |
5176 | * |
5177 | * @remark This API should typically not be used, but serves as a workaround |
5178 | * if new resource types are to the broker that the client |
5179 | * does not know where to send. |
5180 | */ |
5181 | RD_EXPORT rd_kafka_resp_err_t |
5182 | rd_kafka_AdminOptions_set_broker (rd_kafka_AdminOptions_t *options, |
5183 | int32_t broker_id, |
5184 | char *errstr, size_t errstr_size); |
5185 | |
5186 | |
5187 | |
5188 | /** |
5189 | * @brief Set application opaque value that can be extracted from the |
5190 | * result event using rd_kafka_event_opaque() |
5191 | */ |
5192 | RD_EXPORT void |
5193 | rd_kafka_AdminOptions_set_opaque (rd_kafka_AdminOptions_t *options, |
5194 | void *opaque); |
5195 | |
5196 | |
5197 | |
5198 | |
5199 | |
5200 | |
5201 | /** |
5202 | * @section CreateTopics - create topics in cluster |
5203 | * |
5204 | * |
5205 | */ |
5206 | |
5207 | |
5208 | typedef struct rd_kafka_NewTopic_s rd_kafka_NewTopic_t; |
5209 | |
5210 | /** |
5211 | * @brief Create a new NewTopic object. This object is later passed to |
5212 | * rd_kafka_CreateTopics(). |
5213 | * |
5214 | * @param topic Topic name to create. |
5215 | * @param num_partitions Number of partitions in topic. |
5216 | * @param replication_factor Default replication factor for the topic's |
5217 | * partitions, or -1 if set_replica_assignment() |
5218 | * will be used. |
5219 | * |
5220 | * @returns a new allocated NewTopic object, or NULL if the input parameters |
5221 | * are invalid. |
5222 | * Use rd_kafka_NewTopic_destroy() to free object when done. |
5223 | */ |
5224 | RD_EXPORT rd_kafka_NewTopic_t * |
5225 | rd_kafka_NewTopic_new (const char *topic, int num_partitions, |
5226 | int replication_factor, |
5227 | char *errstr, size_t errstr_size); |
5228 | |
5229 | /** |
5230 | * @brief Destroy and free a NewTopic object previously created with |
5231 | * rd_kafka_NewTopic_new() |
5232 | */ |
5233 | RD_EXPORT void |
5234 | rd_kafka_NewTopic_destroy (rd_kafka_NewTopic_t *new_topic); |
5235 | |
5236 | |
5237 | /** |
5238 | * @brief Helper function to destroy all NewTopic objects in the \p new_topics |
5239 | * array (of \p new_topic_cnt elements). |
5240 | * The array itself is not freed. |
5241 | */ |
5242 | RD_EXPORT void |
5243 | rd_kafka_NewTopic_destroy_array (rd_kafka_NewTopic_t **new_topics, |
5244 | size_t new_topic_cnt); |
5245 | |
5246 | |
5247 | /** |
5248 | * @brief Set the replica (broker) assignment for \p partition to the |
5249 | * replica set in \p broker_ids (of \p broker_id_cnt elements). |
5250 | * |
5251 | * @remark When this method is used, rd_kafka_NewTopic_new() must have |
5252 | * been called with a \c replication_factor of -1. |
5253 | * |
5254 | * @remark An application must either set the replica assignment for |
5255 | * all new partitions, or none. |
5256 | * |
5257 | * @remark If called, this function must be called consecutively for each |
5258 | * partition, starting at 0. |
5259 | * |
5260 | * @remark Use rd_kafka_metadata() to retrieve the list of brokers |
5261 | * in the cluster. |
5262 | * |
5263 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success, or an error code |
5264 | * if the arguments were invalid. |
5265 | * |
5266 | * @sa rd_kafka_AdminOptions_set_validate_only() |
5267 | */ |
5268 | RD_EXPORT rd_kafka_resp_err_t |
5269 | rd_kafka_NewTopic_set_replica_assignment (rd_kafka_NewTopic_t *new_topic, |
5270 | int32_t partition, |
5271 | int32_t *broker_ids, |
5272 | size_t broker_id_cnt, |
5273 | char *errstr, size_t errstr_size); |
5274 | |
5275 | /** |
5276 | * @brief Set (broker-side) topic configuration name/value pair. |
5277 | * |
5278 | * @remark The name and value are not validated by the client, the validation |
5279 | * takes place on the broker. |
5280 | * |
5281 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success, or an error code |
5282 | * if the arguments were invalid. |
5283 | * |
5284 | * @sa rd_kafka_AdminOptions_set_validate_only() |
5285 | * @sa http://kafka.apache.org/documentation.html#topicconfigs |
5286 | */ |
5287 | RD_EXPORT rd_kafka_resp_err_t |
5288 | rd_kafka_NewTopic_set_config (rd_kafka_NewTopic_t *new_topic, |
5289 | const char *name, const char *value); |
5290 | |
5291 | |
5292 | /** |
5293 | * @brief Create topics in cluster as specified by the \p new_topics |
5294 | * array of size \p new_topic_cnt elements. |
5295 | * |
5296 | * @param new_topics Array of new topics to create. |
5297 | * @param new_topic_cnt Number of elements in \p new_topics array. |
5298 | * @param options Optional admin options, or NULL for defaults. |
5299 | * @param rkqu Queue to emit result on. |
5300 | * |
5301 | * Supported admin options: |
5302 | * - rd_kafka_AdminOptions_set_validate_only() - default false |
5303 | * - rd_kafka_AdminOptions_set_operation_timeout() - default 0 |
5304 | * - rd_kafka_AdminOptions_set_timeout() - default socket.timeout.ms |
5305 | * |
5306 | * @remark The result event type emitted on the supplied queue is of type |
5307 | * \c RD_KAFKA_EVENT_CREATETOPICS_RESULT |
5308 | */ |
5309 | RD_EXPORT void |
5310 | rd_kafka_CreateTopics (rd_kafka_t *rk, |
5311 | rd_kafka_NewTopic_t **new_topics, |
5312 | size_t new_topic_cnt, |
5313 | const rd_kafka_AdminOptions_t *options, |
5314 | rd_kafka_queue_t *rkqu); |
5315 | |
5316 | |
5317 | /** |
5318 | * @brief CreateTopics result type and methods |
5319 | */ |
5320 | |
5321 | /** |
5322 | * @brief Get an array of topic results from a CreateTopics result. |
5323 | * |
5324 | * The returned \p topics life-time is the same as the \p result object. |
5325 | * @param cntp is updated to the number of elements in the array. |
5326 | */ |
5327 | RD_EXPORT const rd_kafka_topic_result_t ** |
5328 | rd_kafka_CreateTopics_result_topics ( |
5329 | const rd_kafka_CreateTopics_result_t *result, |
5330 | size_t *cntp); |
5331 | |
5332 | |
5333 | |
5334 | |
5335 | |
5336 | /** |
5337 | * @section DeleteTopics - delete topics from cluster |
5338 | * |
5339 | * |
5340 | */ |
5341 | |
5342 | typedef struct rd_kafka_DeleteTopic_s rd_kafka_DeleteTopic_t; |
5343 | |
5344 | /** |
5345 | * @brief Create a new DeleteTopic object. This object is later passed to |
5346 | * rd_kafka_DeleteTopics(). |
5347 | * |
5348 | * @param topic Topic name to delete. |
5349 | * |
5350 | * @returns a new allocated DeleteTopic object. |
5351 | * Use rd_kafka_DeleteTopic_destroy() to free object when done. |
5352 | */ |
5353 | RD_EXPORT rd_kafka_DeleteTopic_t * |
5354 | rd_kafka_DeleteTopic_new (const char *topic); |
5355 | |
5356 | /** |
5357 | * @brief Destroy and free a DeleteTopic object previously created with |
5358 | * rd_kafka_DeleteTopic_new() |
5359 | */ |
5360 | RD_EXPORT void |
5361 | rd_kafka_DeleteTopic_destroy (rd_kafka_DeleteTopic_t *del_topic); |
5362 | |
5363 | /** |
5364 | * @brief Helper function to destroy all DeleteTopic objects in |
5365 | * the \p del_topics array (of \p del_topic_cnt elements). |
5366 | * The array itself is not freed. |
5367 | */ |
5368 | RD_EXPORT void |
5369 | rd_kafka_DeleteTopic_destroy_array (rd_kafka_DeleteTopic_t **del_topics, |
5370 | size_t del_topic_cnt); |
5371 | |
5372 | /** |
5373 | * @brief Delete topics from cluster as specified by the \p topics |
5374 | * array of size \p topic_cnt elements. |
5375 | * |
5376 | * @param topics Array of topics to delete. |
5377 | * @param topic_cnt Number of elements in \p topics array. |
5378 | * @param options Optional admin options, or NULL for defaults. |
5379 | * @param rkqu Queue to emit result on. |
5380 | * |
5381 | * @remark The result event type emitted on the supplied queue is of type |
5382 | * \c RD_KAFKA_EVENT_DELETETOPICS_RESULT |
5383 | */ |
5384 | RD_EXPORT |
5385 | void rd_kafka_DeleteTopics (rd_kafka_t *rk, |
5386 | rd_kafka_DeleteTopic_t **del_topics, |
5387 | size_t del_topic_cnt, |
5388 | const rd_kafka_AdminOptions_t *options, |
5389 | rd_kafka_queue_t *rkqu); |
5390 | |
5391 | |
5392 | |
5393 | /** |
5394 | * @brief DeleteTopics result type and methods |
5395 | */ |
5396 | |
5397 | /** |
5398 | * @brief Get an array of topic results from a DeleteTopics result. |
5399 | * |
5400 | * The returned \p topics life-time is the same as the \p result object. |
5401 | * @param cntp is updated to the number of elements in the array. |
5402 | */ |
5403 | RD_EXPORT const rd_kafka_topic_result_t ** |
5404 | rd_kafka_DeleteTopics_result_topics ( |
5405 | const rd_kafka_DeleteTopics_result_t *result, |
5406 | size_t *cntp); |
5407 | |
5408 | |
5409 | |
5410 | |
5411 | |
5412 | |
5413 | /** |
5414 | * @section CreatePartitions - add partitions to topic. |
5415 | * |
5416 | * |
5417 | */ |
5418 | |
5419 | typedef struct rd_kafka_NewPartitions_s rd_kafka_NewPartitions_t; |
5420 | |
5421 | /** |
5422 | * @brief Create a new NewPartitions. This object is later passed to |
5423 | * rd_kafka_CreatePartitions() to increas the number of partitions |
5424 | * to \p new_total_cnt for an existing topic. |
5425 | * |
5426 | * @param topic Topic name to create more partitions for. |
5427 | * @param new_total_cnt Increase the topic's partition count to this value. |
5428 | * |
5429 | * @returns a new allocated NewPartitions object, or NULL if the |
5430 | * input parameters are invalid. |
5431 | * Use rd_kafka_NewPartitions_destroy() to free object when done. |
5432 | */ |
5433 | RD_EXPORT rd_kafka_NewPartitions_t * |
5434 | rd_kafka_NewPartitions_new (const char *topic, size_t new_total_cnt, |
5435 | char *errstr, size_t errstr_size); |
5436 | |
5437 | /** |
5438 | * @brief Destroy and free a NewPartitions object previously created with |
5439 | * rd_kafka_NewPartitions_new() |
5440 | */ |
5441 | RD_EXPORT void |
5442 | rd_kafka_NewPartitions_destroy (rd_kafka_NewPartitions_t *new_parts); |
5443 | |
5444 | /** |
5445 | * @brief Helper function to destroy all NewPartitions objects in the |
5446 | * \p new_parts array (of \p new_parts_cnt elements). |
5447 | * The array itself is not freed. |
5448 | */ |
5449 | RD_EXPORT void |
5450 | rd_kafka_NewPartitions_destroy_array (rd_kafka_NewPartitions_t **new_parts, |
5451 | size_t new_parts_cnt); |
5452 | |
5453 | /** |
5454 | * @brief Set the replica (broker id) assignment for \p new_partition_idx to the |
5455 | * replica set in \p broker_ids (of \p broker_id_cnt elements). |
5456 | * |
5457 | * @remark An application must either set the replica assignment for |
5458 | * all new partitions, or none. |
5459 | * |
5460 | * @remark If called, this function must be called consecutively for each |
5461 | * new partition being created, |
5462 | * where \p new_partition_idx 0 is the first new partition, |
5463 | * 1 is the second, and so on. |
5464 | * |
5465 | * @remark \p broker_id_cnt should match the topic's replication factor. |
5466 | * |
5467 | * @remark Use rd_kafka_metadata() to retrieve the list of brokers |
5468 | * in the cluster. |
5469 | * |
5470 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success, or an error code |
5471 | * if the arguments were invalid. |
5472 | * |
5473 | * @sa rd_kafka_AdminOptions_set_validate_only() |
5474 | */ |
5475 | RD_EXPORT rd_kafka_resp_err_t |
5476 | rd_kafka_NewPartitions_set_replica_assignment (rd_kafka_NewPartitions_t *new_parts, |
5477 | int32_t new_partition_idx, |
5478 | int32_t *broker_ids, |
5479 | size_t broker_id_cnt, |
5480 | char *errstr, |
5481 | size_t errstr_size); |
5482 | |
5483 | |
5484 | /** |
5485 | * @brief Create additional partitions for the given topics, as specified |
5486 | * by the \p new_parts array of size \p new_parts_cnt elements. |
5487 | * |
5488 | * @param new_parts Array of topics for which new partitions are to be created. |
5489 | * @param new_parts_cnt Number of elements in \p new_parts array. |
5490 | * @param options Optional admin options, or NULL for defaults. |
5491 | * @param rkqu Queue to emit result on. |
5492 | * |
5493 | * Supported admin options: |
5494 | * - rd_kafka_AdminOptions_set_validate_only() - default false |
5495 | * - rd_kafka_AdminOptions_set_operation_timeout() - default 0 |
5496 | * - rd_kafka_AdminOptions_set_timeout() - default socket.timeout.ms |
5497 | * |
5498 | * @remark The result event type emitted on the supplied queue is of type |
5499 | * \c RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT |
5500 | */ |
5501 | RD_EXPORT void |
5502 | rd_kafka_CreatePartitions (rd_kafka_t *rk, |
5503 | rd_kafka_NewPartitions_t **new_parts, |
5504 | size_t new_parts_cnt, |
5505 | const rd_kafka_AdminOptions_t *options, |
5506 | rd_kafka_queue_t *rkqu); |
5507 | |
5508 | |
5509 | |
5510 | /** |
5511 | * @brief CreatePartitions result type and methods |
5512 | */ |
5513 | |
5514 | /** |
5515 | * @brief Get an array of topic results from a CreatePartitions result. |
5516 | * |
5517 | * The returned \p topics life-time is the same as the \p result object. |
5518 | * @param cntp is updated to the number of elements in the array. |
5519 | */ |
5520 | RD_EXPORT const rd_kafka_topic_result_t ** |
5521 | rd_kafka_CreatePartitions_result_topics ( |
5522 | const rd_kafka_CreatePartitions_result_t *result, |
5523 | size_t *cntp); |
5524 | |
5525 | |
5526 | |
5527 | |
5528 | |
5529 | /** |
5530 | * @section Cluster, broker, topic configuration entries, sources, etc. |
5531 | * |
5532 | * These entities relate to the cluster, not the local client. |
5533 | * |
5534 | * @sa rd_kafka_conf_set(), et.al. for local client configuration. |
5535 | * |
5536 | */ |
5537 | |
5538 | /** |
5539 | * @enum Apache Kafka config sources |
5540 | */ |
5541 | typedef enum rd_kafka_ConfigSource_t { |
5542 | /**< Source unknown, e.g., in the ConfigEntry used for alter requests |
5543 | * where source is not set */ |
5544 | RD_KAFKA_CONFIG_SOURCE_UNKNOWN_CONFIG = 0, |
5545 | /**< Dynamic topic config that is configured for a specific topic */ |
5546 | RD_KAFKA_CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG = 1, |
5547 | /**< Dynamic broker config that is configured for a specific broker */ |
5548 | RD_KAFKA_CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG = 2, |
5549 | /**< Dynamic broker config that is configured as default for all |
5550 | * brokers in the cluster */ |
5551 | RD_KAFKA_CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG = 3, |
5552 | /**< Static broker config provided as broker properties at startup |
5553 | * (e.g. from server.properties file) */ |
5554 | RD_KAFKA_CONFIG_SOURCE_STATIC_BROKER_CONFIG = 4, |
5555 | /**< Built-in default configuration for configs that have a |
5556 | * default value */ |
5557 | RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG = 5, |
5558 | |
5559 | /**< Number of source types defined */ |
5560 | RD_KAFKA_CONFIG_SOURCE__CNT, |
5561 | } rd_kafka_ConfigSource_t; |
5562 | |
5563 | |
5564 | /** |
5565 | * @returns a string representation of the \p confsource. |
5566 | */ |
5567 | RD_EXPORT const char * |
5568 | rd_kafka_ConfigSource_name (rd_kafka_ConfigSource_t confsource); |
5569 | |
5570 | |
5571 | typedef struct rd_kafka_ConfigEntry_s rd_kafka_ConfigEntry_t; |
5572 | |
5573 | /** |
5574 | * @returns the configuration property name |
5575 | */ |
5576 | RD_EXPORT const char * |
5577 | rd_kafka_ConfigEntry_name (const rd_kafka_ConfigEntry_t *entry); |
5578 | |
5579 | /** |
5580 | * @returns the configuration value, may be NULL for sensitive or unset |
5581 | * properties. |
5582 | */ |
5583 | RD_EXPORT const char * |
5584 | rd_kafka_ConfigEntry_value (const rd_kafka_ConfigEntry_t *entry); |
5585 | |
5586 | /** |
5587 | * @returns the config source. |
5588 | */ |
5589 | RD_EXPORT rd_kafka_ConfigSource_t |
5590 | rd_kafka_ConfigEntry_source (const rd_kafka_ConfigEntry_t *entry); |
5591 | |
5592 | /** |
5593 | * @returns 1 if the config property is read-only on the broker, else 0. |
5594 | * @remark Shall only be used on a DescribeConfigs result, otherwise returns -1. |
5595 | */ |
5596 | RD_EXPORT int |
5597 | rd_kafka_ConfigEntry_is_read_only (const rd_kafka_ConfigEntry_t *entry); |
5598 | |
5599 | /** |
5600 | * @returns 1 if the config property is set to its default value on the broker, |
5601 | * else 0. |
5602 | * @remark Shall only be used on a DescribeConfigs result, otherwise returns -1. |
5603 | */ |
5604 | RD_EXPORT int |
5605 | rd_kafka_ConfigEntry_is_default (const rd_kafka_ConfigEntry_t *entry); |
5606 | |
5607 | /** |
5608 | * @returns 1 if the config property contains sensitive information (such as |
5609 | * security configuration), else 0. |
5610 | * @remark An application should take care not to include the value of |
5611 | * sensitive configuration entries in its output. |
5612 | * @remark Shall only be used on a DescribeConfigs result, otherwise returns -1. |
5613 | */ |
5614 | RD_EXPORT int |
5615 | rd_kafka_ConfigEntry_is_sensitive (const rd_kafka_ConfigEntry_t *entry); |
5616 | |
5617 | /** |
5618 | * @returns 1 if this entry is a synonym, else 0. |
5619 | */ |
5620 | RD_EXPORT int |
5621 | rd_kafka_ConfigEntry_is_synonym (const rd_kafka_ConfigEntry_t *entry); |
5622 | |
5623 | |
5624 | /** |
5625 | * @returns the synonym config entry array. |
5626 | * |
5627 | * @param cntp is updated to the number of elements in the array. |
5628 | * |
5629 | * @remark The lifetime of the returned entry is the same as \p conf . |
5630 | * @remark Shall only be used on a DescribeConfigs result, |
5631 | * otherwise returns NULL. |
5632 | */ |
5633 | RD_EXPORT const rd_kafka_ConfigEntry_t ** |
5634 | rd_kafka_ConfigEntry_synonyms (const rd_kafka_ConfigEntry_t *entry, |
5635 | size_t *cntp); |
5636 | |
5637 | |
5638 | |
5639 | |
5640 | /** |
5641 | * @enum Apache Kafka resource types |
5642 | */ |
5643 | typedef enum rd_kafka_ResourceType_t { |
5644 | RD_KAFKA_RESOURCE_UNKNOWN = 0, /**< Unknown */ |
5645 | RD_KAFKA_RESOURCE_ANY = 1, /**< Any (used for lookups) */ |
5646 | RD_KAFKA_RESOURCE_TOPIC = 2, /**< Topic */ |
5647 | RD_KAFKA_RESOURCE_GROUP = 3, /**< Group */ |
5648 | RD_KAFKA_RESOURCE_BROKER = 4, /**< Broker */ |
5649 | RD_KAFKA_RESOURCE__CNT, /**< Number of resource types defined */ |
5650 | } rd_kafka_ResourceType_t; |
5651 | |
5652 | /** |
5653 | * @returns a string representation of the \p restype |
5654 | */ |
5655 | RD_EXPORT const char * |
5656 | rd_kafka_ResourceType_name (rd_kafka_ResourceType_t restype); |
5657 | |
5658 | typedef struct rd_kafka_ConfigResource_s rd_kafka_ConfigResource_t; |
5659 | |
5660 | |
5661 | RD_EXPORT rd_kafka_ConfigResource_t * |
5662 | rd_kafka_ConfigResource_new (rd_kafka_ResourceType_t restype, |
5663 | const char *resname); |
5664 | |
5665 | /** |
5666 | * @brief Destroy and free a ConfigResource object previously created with |
5667 | * rd_kafka_ConfigResource_new() |
5668 | */ |
5669 | RD_EXPORT void |
5670 | rd_kafka_ConfigResource_destroy (rd_kafka_ConfigResource_t *config); |
5671 | |
5672 | |
5673 | /** |
5674 | * @brief Helper function to destroy all ConfigResource objects in |
5675 | * the \p configs array (of \p config_cnt elements). |
5676 | * The array itself is not freed. |
5677 | */ |
5678 | RD_EXPORT void |
5679 | rd_kafka_ConfigResource_destroy_array (rd_kafka_ConfigResource_t **config, |
5680 | size_t config_cnt); |
5681 | |
5682 | |
5683 | /** |
5684 | * @brief Set configuration name value pair. |
5685 | * |
5686 | * @param name Configuration name, depends on resource type. |
5687 | * @param value Configuration value, depends on resource type and \p name. |
5688 | * Set to \c NULL to revert configuration value to default. |
5689 | * |
5690 | * This will overwrite the current value. |
5691 | * |
5692 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR if config was added to resource, |
5693 | * or RD_KAFKA_RESP_ERR__INVALID_ARG on invalid input. |
5694 | */ |
5695 | RD_EXPORT rd_kafka_resp_err_t |
5696 | rd_kafka_ConfigResource_set_config (rd_kafka_ConfigResource_t *config, |
5697 | const char *name, const char *value); |
5698 | |
5699 | |
5700 | /** |
5701 | * @brief Get an array of config entries from a ConfigResource object. |
5702 | * |
5703 | * The returned object life-times are the same as the \p config object. |
5704 | * |
5705 | * @param cntp is updated to the number of elements in the array. |
5706 | */ |
5707 | RD_EXPORT const rd_kafka_ConfigEntry_t ** |
5708 | rd_kafka_ConfigResource_configs (const rd_kafka_ConfigResource_t *config, |
5709 | size_t *cntp); |
5710 | |
5711 | |
5712 | |
5713 | /** |
5714 | * @returns the ResourceType for \p config |
5715 | */ |
5716 | RD_EXPORT rd_kafka_ResourceType_t |
5717 | rd_kafka_ConfigResource_type (const rd_kafka_ConfigResource_t *config); |
5718 | |
5719 | /** |
5720 | * @returns the name for \p config |
5721 | */ |
5722 | RD_EXPORT const char * |
5723 | rd_kafka_ConfigResource_name (const rd_kafka_ConfigResource_t *config); |
5724 | |
5725 | /** |
5726 | * @returns the error for this resource from an AlterConfigs request |
5727 | */ |
5728 | RD_EXPORT rd_kafka_resp_err_t |
5729 | rd_kafka_ConfigResource_error (const rd_kafka_ConfigResource_t *config); |
5730 | |
5731 | /** |
5732 | * @returns the error string for this resource from an AlterConfigs |
5733 | * request, or NULL if no error. |
5734 | */ |
5735 | RD_EXPORT const char * |
5736 | rd_kafka_ConfigResource_error_string (const rd_kafka_ConfigResource_t *config); |
5737 | |
5738 | |
5739 | /** |
5740 | * @section AlterConfigs - alter cluster configuration. |
5741 | * |
5742 | * |
5743 | */ |
5744 | |
5745 | |
5746 | /** |
5747 | * @brief Update the configuration for the specified resources. |
5748 | * Updates are not transactional so they may succeed for a subset |
5749 | * of the provided resources while the others fail. |
5750 | * The configuration for a particular resource is updated atomically, |
5751 | * replacing values using the provided ConfigEntrys and reverting |
5752 | * unspecified ConfigEntrys to their default values. |
5753 | * |
5754 | * @remark Requires broker version >=0.11.0.0 |
5755 | * |
5756 | * @warning AlterConfigs will replace all existing configuration for |
5757 | * the provided resources with the new configuration given, |
5758 | * reverting all other configuration to their default values. |
5759 | * |
5760 | * @remark Multiple resources and resource types may be set, but at most one |
5761 | * resource of type \c RD_KAFKA_RESOURCE_BROKER is allowed per call |
5762 | * since these resource requests must be sent to the broker specified |
5763 | * in the resource. |
5764 | * |
5765 | */ |
5766 | RD_EXPORT |
5767 | void rd_kafka_AlterConfigs (rd_kafka_t *rk, |
5768 | rd_kafka_ConfigResource_t **configs, |
5769 | size_t config_cnt, |
5770 | const rd_kafka_AdminOptions_t *options, |
5771 | rd_kafka_queue_t *rkqu); |
5772 | |
5773 | |
5774 | /** |
5775 | * @brief AlterConfigs result type and methods |
5776 | */ |
5777 | |
5778 | /** |
5779 | * @brief Get an array of resource results from a AlterConfigs result. |
5780 | * |
5781 | * Use \c rd_kafka_ConfigResource_error() and |
5782 | * \c rd_kafka_ConfigResource_error_string() to extract per-resource error |
5783 | * results on the returned array elements. |
5784 | * |
5785 | * The returned object life-times are the same as the \p result object. |
5786 | * |
5787 | * @param cntp is updated to the number of elements in the array. |
5788 | * |
5789 | * @returns an array of ConfigResource elements, or NULL if not available. |
5790 | */ |
5791 | RD_EXPORT const rd_kafka_ConfigResource_t ** |
5792 | rd_kafka_AlterConfigs_result_resources ( |
5793 | const rd_kafka_AlterConfigs_result_t *result, |
5794 | size_t *cntp); |
5795 | |
5796 | |
5797 | |
5798 | |
5799 | |
5800 | |
5801 | /** |
5802 | * @section DescribeConfigs - retrieve cluster configuration. |
5803 | * |
5804 | * |
5805 | */ |
5806 | |
5807 | |
5808 | /** |
5809 | * @brief Get configuration for the specified resources in \p configs. |
5810 | * |
5811 | * The returned configuration includes default values and the |
5812 | * rd_kafka_ConfigEntry_is_default() or rd_kafka_ConfigEntry_source() |
5813 | * methods may be used to distinguish them from user supplied values. |
5814 | * |
5815 | * The value of config entries where rd_kafka_ConfigEntry_is_sensitive() |
5816 | * is true will always be NULL to avoid disclosing sensitive |
5817 | * information, such as security settings. |
5818 | * |
5819 | * Configuration entries where rd_kafka_ConfigEntry_is_read_only() |
5820 | * is true can't be updated (with rd_kafka_AlterConfigs()). |
5821 | * |
5822 | * Synonym configuration entries are returned if the broker supports |
5823 | * it (broker version >= 1.1.0). See rd_kafka_ConfigEntry_synonyms(). |
5824 | * |
5825 | * @remark Requires broker version >=0.11.0.0 |
5826 | * |
5827 | * @remark Multiple resources and resource types may be requested, but at most |
5828 | * one resource of type \c RD_KAFKA_RESOURCE_BROKER is allowed per call |
5829 | * since these resource requests must be sent to the broker specified |
5830 | * in the resource. |
5831 | */ |
5832 | RD_EXPORT |
5833 | void rd_kafka_DescribeConfigs (rd_kafka_t *rk, |
5834 | rd_kafka_ConfigResource_t **configs, |
5835 | size_t config_cnt, |
5836 | const rd_kafka_AdminOptions_t *options, |
5837 | rd_kafka_queue_t *rkqu); |
5838 | |
5839 | |
5840 | /** |
5841 | * @brief DescribeConfigs result type and methods |
5842 | */ |
5843 | |
5844 | /** |
5845 | * @brief Get an array of resource results from a DescribeConfigs result. |
5846 | * |
5847 | * The returned \p resources life-time is the same as the \p result object. |
5848 | * @param cntp is updated to the number of elements in the array. |
5849 | */ |
5850 | RD_EXPORT const rd_kafka_ConfigResource_t ** |
5851 | rd_kafka_DescribeConfigs_result_resources ( |
5852 | const rd_kafka_DescribeConfigs_result_t *result, |
5853 | size_t *cntp); |
5854 | |
5855 | /**@}*/ |
5856 | |
5857 | |
5858 | |
5859 | /** |
5860 | * @name Security APIs |
5861 | * @{ |
5862 | * |
5863 | */ |
5864 | |
5865 | /** |
5866 | * @brief Set SASL/OAUTHBEARER token and metadata |
5867 | * |
5868 | * @param rk Client instance. |
5869 | * @param token_value the mandatory token value to set, often (but not |
5870 | * necessarily) a JWS compact serialization as per |
5871 | * https://tools.ietf.org/html/rfc7515#section-3.1. |
5872 | * @param md_lifetime_ms when the token expires, in terms of the number of |
5873 | * milliseconds since the epoch. |
5874 | * @param md_principal_name the mandatory Kafka principal name associated |
5875 | * with the token. |
5876 | * @param extensions optional SASL extensions key-value array with |
5877 | * \p extensions_size elements (number of keys * 2), where [i] is the key and |
5878 | * [i+1] is the key's value, to be communicated to the broker |
5879 | * as additional key-value pairs during the initial client response as per |
5880 | * https://tools.ietf.org/html/rfc7628#section-3.1. The key-value pairs are |
5881 | * copied. |
5882 | * @param extension_size the number of SASL extension keys plus values, |
5883 | * which must be a non-negative multiple of 2. |
5884 | * @param errstr A human readable error string (nul-terminated) is written to |
5885 | * this location that must be of at least \p errstr_size bytes. |
5886 | * The \p errstr is only written to if there is an error. |
5887 | * |
5888 | * The SASL/OAUTHBEARER token refresh callback or event handler should invoke |
5889 | * this method upon success. The extension keys must not include the reserved |
5890 | * key "`auth`", and all extension keys and values must conform to the required |
5891 | * format as per https://tools.ietf.org/html/rfc7628#section-3.1: |
5892 | * |
5893 | * key = 1*(ALPHA) |
5894 | * value = *(VCHAR / SP / HTAB / CR / LF ) |
5895 | * |
5896 | * @returns \c RD_KAFKA_RESP_ERR_NO_ERROR on success, otherwise \p errstr set |
5897 | * and:<br> |
5898 | * \c RD_KAFKA_RESP_ERR__INVALID_ARG if any of the arguments are |
5899 | * invalid;<br> |
5900 | * \c RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED if SASL/OAUTHBEARER is not |
5901 | * supported by this build;<br> |
5902 | * \c RD_KAFKA_RESP_ERR__STATE if SASL/OAUTHBEARER is supported but is |
5903 | * not configured as the client's authentication mechanism.<br> |
5904 | * |
5905 | * @sa rd_kafka_oauthbearer_set_token_failure |
5906 | * @sa rd_kafka_conf_set_oauthbearer_token_refresh_cb |
5907 | */ |
5908 | RD_EXPORT |
5909 | rd_kafka_resp_err_t |
5910 | rd_kafka_oauthbearer_set_token (rd_kafka_t *rk, |
5911 | const char *token_value, |
5912 | int64_t md_lifetime_ms, |
5913 | const char *md_principal_name, |
5914 | const char **extensions, size_t extension_size, |
5915 | char *errstr, size_t errstr_size); |
5916 | |
5917 | /** |
5918 | * @brief SASL/OAUTHBEARER token refresh failure indicator. |
5919 | * |
5920 | * @param rk Client instance. |
5921 | * @param errstr mandatory human readable error reason for failing to acquire |
5922 | * a token. |
5923 | * |
5924 | * The SASL/OAUTHBEARER token refresh callback or event handler should invoke |
5925 | * this method upon failure. |
5926 | * |
5927 | * @returns \c RD_KAFKA_RESP_ERR_NO_ERROR on success, otherwise:<br> |
5928 | * \c RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED if SASL/OAUTHBEARER is not |
5929 | * supported by this build;<br> |
5930 | * \c RD_KAFKA_RESP_ERR__STATE if SASL/OAUTHBEARER is supported but is |
5931 | * not configured as the client's authentication mechanism,<br> |
5932 | * \c RD_KAFKA_RESP_ERR__INVALID_ARG if no error string is supplied. |
5933 | * |
5934 | * @sa rd_kafka_oauthbearer_set_token |
5935 | * @sa rd_kafka_conf_set_oauthbearer_token_refresh_cb |
5936 | */ |
5937 | RD_EXPORT |
5938 | rd_kafka_resp_err_t |
5939 | rd_kafka_oauthbearer_set_token_failure (rd_kafka_t *rk, const char *errstr); |
5940 | |
5941 | /**@}*/ |
5942 | |
5943 | |
5944 | #ifdef __cplusplus |
5945 | } |
5946 | #endif |
5947 | #endif /* _RDKAFKA_H_ */ |
5948 | |