/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012,2013 Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "rdkafka_int.h"
#include "rd.h"

#include <stdlib.h>
#include <ctype.h>
#include <stddef.h>

#include "rdkafka_int.h"
#include "rdkafka_feature.h"
#include "rdkafka_interceptor.h"
#include "rdkafka_idempotence.h"
#include "rdkafka_sasl_oauthbearer.h"
#if WITH_PLUGINS
#include "rdkafka_plugin.h"
#endif
#include "rdunittest.h"

#ifndef _MSC_VER
#include <netinet/tcp.h>
#else

#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN
#endif
#include <windows.h>
#endif

struct rd_kafka_property {
        rd_kafka_conf_scope_t scope;
        const char *name;
        enum {
                _RK_C_STR,
                _RK_C_INT,
                _RK_C_S2I,      /* String to Integer mapping.
                                 * Supports limited canonical str->int mappings
                                 * using s2i[] */
                _RK_C_S2F,      /* CSV String to Integer flag mapping (OR:ed) */
                _RK_C_BOOL,
                _RK_C_PTR,      /* Only settable through special set functions */
                _RK_C_PATLIST,  /* Pattern list */
                _RK_C_KSTR,     /* Kafka string */
                _RK_C_ALIAS,    /* Alias: points to other property through .sdef */
                _RK_C_INTERNAL, /* Internal, don't expose to application */
                _RK_C_INVALID,  /* Invalid property, used to catch known
                                 * but unsupported Java properties. */
        } type;
        int offset;
        const char *desc;
        int vmin;
        int vmax;
        int vdef;               /* Default value (int) */
        const char *sdef;       /* Default value (string) */
        void *pdef;             /* Default value (pointer) */
        struct {
                int val;
                const char *str;
        } s2i[20];              /* _RK_C_S2I and _RK_C_S2F */

        /* Value validator (STR) */
        int (*validate) (const struct rd_kafka_property *prop,
                         const char *val, int ival);

        /* Configuration object constructors and destructor for use when
         * the property value itself is not used, or needs extra care. */
        void (*ctor) (int scope, void *pconf);
        void (*dtor) (int scope, void *pconf);
        void (*copy) (int scope, void *pdst, const void *psrc,
                      void *dstptr, const void *srcptr,
                      size_t filter_cnt, const char **filter);

        rd_kafka_conf_res_t (*set) (int scope, void *pconf,
                                    const char *name, const char *value,
                                    void *dstptr,
                                    rd_kafka_conf_set_mode_t set_mode,
                                    char *errstr, size_t errstr_size);
};
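
/* Illustrative sketch (not part of the build): a minimal property entry
 * using the fields above; the property name and conf field here are
 * hypothetical.
 *
 *   { _RK_GLOBAL, "example.prop", _RK_C_INT, _RK(example_field),
 *     "Example property description.",
 *     0, 100, 10 },
 *
 * Here vmin=0 and vmax=100 bound the accepted value and vdef=10 is the
 * default applied when the property is not explicitly set. */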


#define _RK(field)  offsetof(rd_kafka_conf_t, field)
#define _RKT(field) offsetof(rd_kafka_topic_conf_t, field)


static rd_kafka_conf_res_t
rd_kafka_anyconf_get0 (const void *conf, const struct rd_kafka_property *prop,
                       char *dest, size_t *dest_size);




/**
 * @returns a unique index for property \p prop, using the byte position
 *          of the field.
 */
static RD_INLINE int rd_kafka_prop2idx (const struct rd_kafka_property *prop) {
        return prop->offset;
}



/**
 * @brief Set the property as modified.
 *
 * We do this by mapping the property's conf struct field byte offset
 * to a bit in a bit vector.
 * If the bit is set the property has been modified, otherwise it is
 * at its default unmodified value.
 *
 * \p is_modified 1: set as modified, 0: clear modified
 */
static void rd_kafka_anyconf_set_modified (void *conf,
                                           const struct rd_kafka_property *prop,
                                           int is_modified) {
        int idx = rd_kafka_prop2idx(prop);
        int bkt = idx / 64;
        uint64_t bit = (uint64_t)1 << (idx % 64);
        struct rd_kafka_anyconf_hdr *confhdr = conf;

        rd_assert(idx < RD_KAFKA_CONF_PROPS_IDX_MAX &&
                  *"Increase RD_KAFKA_CONF_PROPS_IDX_MAX");

        if (is_modified)
                confhdr->modified[bkt] |= bit;
        else
                confhdr->modified[bkt] &= ~bit;
}
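
/* Worked example (illustrative): a property whose conf struct field sits
 * at byte offset 130 maps to bucket 130 / 64 = 2 and bit
 * (uint64_t)1 << (130 % 64) = 0x4, so confhdr->modified[2] & 0x4 tracks
 * whether that property has been modified. */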

/**
 * @brief Clear is_modified for all properties.
 * @warning Does NOT clear/reset the value.
 */
static void rd_kafka_anyconf_clear_all_is_modified (void *conf) {
        struct rd_kafka_anyconf_hdr *confhdr = conf;

        memset(confhdr, 0, sizeof(*confhdr));
}


/**
 * @returns true if the property has been set/modified, else false.
 */
static rd_bool_t
rd_kafka_anyconf_is_modified (const void *conf,
                              const struct rd_kafka_property *prop) {
        int idx = rd_kafka_prop2idx(prop);
        int bkt = idx / 64;
        uint64_t bit = (uint64_t)1 << (idx % 64);
        const struct rd_kafka_anyconf_hdr *confhdr = conf;

        return !!(confhdr->modified[bkt] & bit);
}



/**
 * @brief Validate \p broker.version.fallback property.
 */
static int
rd_kafka_conf_validate_broker_version (const struct rd_kafka_property *prop,
                                       const char *val, int ival) {
        struct rd_kafka_ApiVersion *apis;
        size_t api_cnt;
        return rd_kafka_get_legacy_ApiVersions(val, &apis, &api_cnt, NULL);
}

/**
 * @brief Validate that string is a single item, without delimiters (, space).
 */
static RD_UNUSED int
rd_kafka_conf_validate_single (const struct rd_kafka_property *prop,
                               const char *val, int ival) {
        return !strchr(val, ',') && !strchr(val, ' ');
}
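
/* For example (illustrative): "GSSAPI" validates successfully, while
 * "GSSAPI,PLAIN" and "GSSAPI PLAIN" are rejected since only a single
 * item (no ',' or ' ' delimiters) is permitted. */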

/**
 * @brief Validate builtin partitioner string
 */
static RD_UNUSED int
rd_kafka_conf_validate_partitioner (const struct rd_kafka_property *prop,
                                    const char *val, int ival) {
        return !strcmp(val, "random") ||
                !strcmp(val, "consistent") ||
                !strcmp(val, "consistent_random") ||
                !strcmp(val, "murmur2") ||
                !strcmp(val, "murmur2_random");
}


/**
 * librdkafka configuration property definitions.
 */
static const struct rd_kafka_property rd_kafka_properties[] = {
        /* Global properties */
        { _RK_GLOBAL, "builtin.features", _RK_C_S2F, _RK(builtin_features),
          "Indicates the builtin features for this build of librdkafka. "
          "An application can either query this value or attempt to set it "
          "with its list of required features to check for library support.",
          0, 0x7fffffff, 0xffff,
          .s2i = {
#if WITH_ZLIB
                  { 0x1, "gzip" },
#endif
#if WITH_SNAPPY
                  { 0x2, "snappy" },
#endif
#if WITH_SSL
                  { 0x4, "ssl" },
#endif
                  { 0x8, "sasl" },
                  { 0x10, "regex" },
                  { 0x20, "lz4" },
#if defined(_MSC_VER) || WITH_SASL_CYRUS
                  { 0x40, "sasl_gssapi" },
#endif
                  { 0x80, "sasl_plain" },
#if WITH_SASL_SCRAM
                  { 0x100, "sasl_scram" },
#endif
#if WITH_PLUGINS
                  { 0x200, "plugins" },
#endif
#if WITH_ZSTD
                  { 0x400, "zstd" },
#endif
#if WITH_SASL_OAUTHBEARER
                  { 0x800, "sasl_oauthbearer" },
#endif
                  { 0, NULL }
          }
        },
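        /* Worked example (illustrative): _RK_C_S2F values are CSV flag
         * names OR:ed together, so "builtin.features=gzip,ssl" resolves
         * to 0x1 | 0x4 = 0x5 in builds with zlib and SSL support. */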
        { _RK_GLOBAL, "client.id", _RK_C_STR, _RK(client_id_str),
          "Client identifier.",
          .sdef = "rdkafka" },
        { _RK_GLOBAL|_RK_HIGH, "metadata.broker.list", _RK_C_STR,
          _RK(brokerlist),
          "Initial list of brokers as a CSV list of broker host or host:port. "
          "The application may also use `rd_kafka_brokers_add()` to add "
          "brokers during runtime." },
        { _RK_GLOBAL|_RK_HIGH, "bootstrap.servers", _RK_C_ALIAS, 0,
          "See metadata.broker.list",
          .sdef = "metadata.broker.list" },
        { _RK_GLOBAL|_RK_MED, "message.max.bytes", _RK_C_INT, _RK(max_msg_size),
          "Maximum Kafka protocol request message size.",
          1000, 1000000000, 1000000 },
        { _RK_GLOBAL, "message.copy.max.bytes", _RK_C_INT,
          _RK(msg_copy_max_size),
          "Maximum size for message to be copied to buffer. "
          "Messages larger than this will be passed by reference (zero-copy) "
          "at the expense of larger iovecs.",
          0, 1000000000, 0xffff },
        { _RK_GLOBAL|_RK_MED, "receive.message.max.bytes", _RK_C_INT,
          _RK(recv_max_msg_size),
          "Maximum Kafka protocol response message size. "
          "This serves as a safety precaution to avoid memory exhaustion in "
          "case of protocol hiccups. "
          "This value must be at least `fetch.max.bytes` + 512 to allow "
          "for protocol overhead; the value is adjusted automatically "
          "unless the configuration property is explicitly set.",
          1000, INT_MAX, 100000000 },
        { _RK_GLOBAL, "max.in.flight.requests.per.connection", _RK_C_INT,
          _RK(max_inflight),
          "Maximum number of in-flight requests per broker connection. "
          "This is a generic property applied to all broker communication, "
          "however it is primarily relevant to produce requests. "
          "In particular, note that other mechanisms limit the number "
          "of outstanding consumer fetch request per broker to one.",
          1, 1000000, 1000000 },
        { _RK_GLOBAL, "max.in.flight", _RK_C_ALIAS,
          .sdef = "max.in.flight.requests.per.connection" },
        { _RK_GLOBAL, "metadata.request.timeout.ms", _RK_C_INT,
          _RK(metadata_request_timeout_ms),
          "Non-topic request timeout in milliseconds. "
          "This is for metadata requests, etc.",
          10, 900*1000, 60*1000 },
        { _RK_GLOBAL, "topic.metadata.refresh.interval.ms", _RK_C_INT,
          _RK(metadata_refresh_interval_ms),
          "Topic metadata refresh interval in milliseconds. "
          "The metadata is automatically refreshed on error and connect. "
          "Use -1 to disable the intervalled refresh.",
          -1, 3600*1000, 5*60*1000 },
        { _RK_GLOBAL, "metadata.max.age.ms", _RK_C_INT,
          _RK(metadata_max_age_ms),
          "Metadata cache max age. "
          "Defaults to topic.metadata.refresh.interval.ms * 3",
          1, 24*3600*1000, 5*60*1000 * 3 },
        { _RK_GLOBAL, "topic.metadata.refresh.fast.interval.ms", _RK_C_INT,
          _RK(metadata_refresh_fast_interval_ms),
          "When a topic loses its leader a new metadata request will be "
          "enqueued with this initial interval, exponentially increasing "
          "until the topic metadata has been refreshed. "
          "This is used to recover quickly from transitioning leader brokers.",
          1, 60*1000, 250 },
        { _RK_GLOBAL|_RK_DEPRECATED,
          "topic.metadata.refresh.fast.cnt", _RK_C_INT,
          _RK(metadata_refresh_fast_cnt),
          "No longer used.",
          0, 1000, 10 },
        { _RK_GLOBAL, "topic.metadata.refresh.sparse", _RK_C_BOOL,
          _RK(metadata_refresh_sparse),
          "Sparse metadata requests (consumes less network bandwidth)",
          0, 1, 1 },
        { _RK_GLOBAL, "topic.blacklist", _RK_C_PATLIST,
          _RK(topic_blacklist),
          "Topic blacklist, a comma-separated list of regular expressions "
          "for matching topic names that should be ignored in "
          "broker metadata information as if the topics did not exist." },
        { _RK_GLOBAL|_RK_MED, "debug", _RK_C_S2F, _RK(debug),
          "A comma-separated list of debug contexts to enable. "
          "Detailed Producer debugging: broker,topic,msg. "
          "Consumer: consumer,cgrp,topic,fetch",
          .s2i = {
                  { RD_KAFKA_DBG_GENERIC, "generic" },
                  { RD_KAFKA_DBG_BROKER, "broker" },
                  { RD_KAFKA_DBG_TOPIC, "topic" },
                  { RD_KAFKA_DBG_METADATA, "metadata" },
                  { RD_KAFKA_DBG_FEATURE, "feature" },
                  { RD_KAFKA_DBG_QUEUE, "queue" },
                  { RD_KAFKA_DBG_MSG, "msg" },
                  { RD_KAFKA_DBG_PROTOCOL, "protocol" },
                  { RD_KAFKA_DBG_CGRP, "cgrp" },
                  { RD_KAFKA_DBG_SECURITY, "security" },
                  { RD_KAFKA_DBG_FETCH, "fetch" },
                  { RD_KAFKA_DBG_INTERCEPTOR, "interceptor" },
                  { RD_KAFKA_DBG_PLUGIN, "plugin" },
                  { RD_KAFKA_DBG_CONSUMER, "consumer" },
                  { RD_KAFKA_DBG_ADMIN, "admin" },
                  { RD_KAFKA_DBG_EOS, "eos" },
                  { RD_KAFKA_DBG_ALL, "all" }
          } },
        { _RK_GLOBAL, "socket.timeout.ms", _RK_C_INT, _RK(socket_timeout_ms),
          "Default timeout for network requests. "
          "Producer: ProduceRequests will use the lesser value of "
          "`socket.timeout.ms` and remaining `message.timeout.ms` for the "
          "first message in the batch. "
          "Consumer: FetchRequests will use "
          "`fetch.wait.max.ms` + `socket.timeout.ms`. "
          "Admin: Admin requests will use `socket.timeout.ms` or explicitly "
          "set `rd_kafka_AdminOptions_set_operation_timeout()` value.",
          10, 300*1000, 60*1000 },
        { _RK_GLOBAL|_RK_DEPRECATED, "socket.blocking.max.ms", _RK_C_INT,
          _RK(socket_blocking_max_ms),
          "No longer used.",
          1, 60*1000, 1000 },
        { _RK_GLOBAL, "socket.send.buffer.bytes", _RK_C_INT,
          _RK(socket_sndbuf_size),
          "Broker socket send buffer size. System default is used if 0.",
          0, 100000000, 0 },
        { _RK_GLOBAL, "socket.receive.buffer.bytes", _RK_C_INT,
          _RK(socket_rcvbuf_size),
          "Broker socket receive buffer size. System default is used if 0.",
          0, 100000000, 0 },
#ifdef SO_KEEPALIVE
        { _RK_GLOBAL, "socket.keepalive.enable", _RK_C_BOOL,
          _RK(socket_keepalive),
          "Enable TCP keep-alives (SO_KEEPALIVE) on broker sockets",
          0, 1, 0 },
#endif
#ifdef TCP_NODELAY
        { _RK_GLOBAL, "socket.nagle.disable", _RK_C_BOOL,
          _RK(socket_nagle_disable),
          "Disable the Nagle algorithm (TCP_NODELAY) on broker sockets.",
          0, 1, 0 },
#endif
        { _RK_GLOBAL, "socket.max.fails", _RK_C_INT,
          _RK(socket_max_fails),
          "Disconnect from broker when this number of send failures "
          "(e.g., timed out requests) is reached. Disable with 0. "
          "WARNING: It is highly recommended to leave this setting at "
          "its default value of 1 to avoid the client and broker "
          "becoming desynchronized in case of request timeouts. "
          "NOTE: The connection is automatically re-established.",
          0, 1000000, 1 },
        { _RK_GLOBAL, "broker.address.ttl", _RK_C_INT,
          _RK(broker_addr_ttl),
          "How long to cache the broker address resolving "
          "results (milliseconds).",
          0, 86400*1000, 1*1000 },
        { _RK_GLOBAL, "broker.address.family", _RK_C_S2I,
          _RK(broker_addr_family),
          "Allowed broker IP address families: any, v4, v6",
          .vdef = AF_UNSPEC,
          .s2i = {
                  { AF_UNSPEC, "any" },
                  { AF_INET, "v4" },
                  { AF_INET6, "v6" },
          } },
        { _RK_GLOBAL|_RK_MED|_RK_HIDDEN, "enable.sparse.connections",
          _RK_C_BOOL,
          _RK(sparse_connections),
          "When enabled the client will only connect to brokers "
          "it needs to communicate with. When disabled the client "
          "will maintain connections to all brokers in the cluster.",
          0, 1, 1 },
        { _RK_GLOBAL|_RK_DEPRECATED, "reconnect.backoff.jitter.ms", _RK_C_INT,
          _RK(reconnect_jitter_ms),
          "No longer used. See `reconnect.backoff.ms` and "
          "`reconnect.backoff.max.ms`.",
          0, 60*60*1000, 0 },
        { _RK_GLOBAL|_RK_MED, "reconnect.backoff.ms", _RK_C_INT,
          _RK(reconnect_backoff_ms),
          "The initial time to wait before reconnecting to a broker "
          "after the connection has been closed. "
          "The time is increased exponentially until "
          "`reconnect.backoff.max.ms` is reached. "
          "-25% to +50% jitter is applied to each reconnect backoff. "
          "A value of 0 disables the backoff and reconnects immediately.",
          0, 60*60*1000, 100 },
        { _RK_GLOBAL|_RK_MED, "reconnect.backoff.max.ms", _RK_C_INT,
          _RK(reconnect_backoff_max_ms),
          "The maximum time to wait before reconnecting to a broker "
          "after the connection has been closed.",
          0, 60*60*1000, 10*1000 },
        { _RK_GLOBAL|_RK_HIGH, "statistics.interval.ms", _RK_C_INT,
          _RK(stats_interval_ms),
          "librdkafka statistics emit interval. The application also needs to "
          "register a stats callback using `rd_kafka_conf_set_stats_cb()`. "
          "The granularity is 1000ms. A value of 0 disables statistics.",
          0, 86400*1000, 0 },
        { _RK_GLOBAL, "enabled_events", _RK_C_INT,
          _RK(enabled_events),
          "See `rd_kafka_conf_set_events()`",
          0, 0x7fffffff, 0 },
        { _RK_GLOBAL, "error_cb", _RK_C_PTR,
          _RK(error_cb),
          "Error callback (set with rd_kafka_conf_set_error_cb())" },
        { _RK_GLOBAL, "throttle_cb", _RK_C_PTR,
          _RK(throttle_cb),
          "Throttle callback (set with rd_kafka_conf_set_throttle_cb())" },
        { _RK_GLOBAL, "stats_cb", _RK_C_PTR,
          _RK(stats_cb),
          "Statistics callback (set with rd_kafka_conf_set_stats_cb())" },
        { _RK_GLOBAL, "log_cb", _RK_C_PTR,
          _RK(log_cb),
          "Log callback (set with rd_kafka_conf_set_log_cb())",
          .pdef = rd_kafka_log_print },
        { _RK_GLOBAL, "log_level", _RK_C_INT,
          _RK(log_level),
          "Logging level (syslog(3) levels)",
          0, 7, 6 },
        { _RK_GLOBAL, "log.queue", _RK_C_BOOL, _RK(log_queue),
          "Disable spontaneous log_cb from internal librdkafka "
          "threads, instead enqueue log messages on queue set with "
          "`rd_kafka_set_log_queue()` and serve log callbacks or "
          "events through the standard poll APIs. "
          "**NOTE**: Log messages will linger in a temporary queue "
          "until the log queue has been set.",
          0, 1, 0 },
        { _RK_GLOBAL, "log.thread.name", _RK_C_BOOL,
          _RK(log_thread_name),
          "Print internal thread name in log messages "
          "(useful for debugging librdkafka internals)",
          0, 1, 1 },
        { _RK_GLOBAL, "log.connection.close", _RK_C_BOOL,
          _RK(log_connection_close),
          "Log broker disconnects. "
          "It might be useful to turn this off when interacting with "
          "0.9 brokers with an aggressive `connection.max.idle.ms` value.",
          0, 1, 1 },
        { _RK_GLOBAL, "background_event_cb", _RK_C_PTR,
          _RK(background_event_cb),
          "Background queue event callback "
          "(set with rd_kafka_conf_set_background_event_cb())" },
        { _RK_GLOBAL, "socket_cb", _RK_C_PTR,
          _RK(socket_cb),
          "Socket creation callback to provide race-free CLOEXEC",
          .pdef =
#ifdef __linux__
          rd_kafka_socket_cb_linux
#else
          rd_kafka_socket_cb_generic
#endif
        },
        { _RK_GLOBAL, "connect_cb", _RK_C_PTR,
          _RK(connect_cb),
          "Socket connect callback",
        },
        { _RK_GLOBAL, "closesocket_cb", _RK_C_PTR,
          _RK(closesocket_cb),
          "Socket close callback",
        },
        { _RK_GLOBAL, "open_cb", _RK_C_PTR,
          _RK(open_cb),
          "File open callback to provide race-free CLOEXEC",
          .pdef =
#ifdef __linux__
          rd_kafka_open_cb_linux
#else
          rd_kafka_open_cb_generic
#endif
        },
        { _RK_GLOBAL, "opaque", _RK_C_PTR,
          _RK(opaque),
          "Application opaque (set with rd_kafka_conf_set_opaque())" },
        { _RK_GLOBAL, "default_topic_conf", _RK_C_PTR,
          _RK(topic_conf),
          "Default topic configuration for automatically subscribed topics" },
        { _RK_GLOBAL, "internal.termination.signal", _RK_C_INT,
          _RK(term_sig),
          "Signal that librdkafka will use to quickly terminate on "
          "rd_kafka_destroy(). If this signal is not set then there will be a "
          "delay before rd_kafka_wait_destroyed() returns true "
          "as internal threads are timing out their system calls. "
          "If this signal is set however the delay will be minimal. "
          "The application should mask this signal as an internal "
          "signal handler is installed.",
          0, 128, 0 },
        { _RK_GLOBAL|_RK_HIGH, "api.version.request", _RK_C_BOOL,
          _RK(api_version_request),
          "Request broker's supported API versions to adjust functionality to "
          "available protocol features. If set to false, or the "
          "ApiVersionRequest fails, the fallback version "
          "`broker.version.fallback` will be used. "
          "**NOTE**: Depends on broker version >=0.10.0. If the request is "
          "not supported by (an older) broker the `broker.version.fallback` "
          "fallback is used.",
          0, 1, 1 },
        { _RK_GLOBAL, "api.version.request.timeout.ms", _RK_C_INT,
          _RK(api_version_request_timeout_ms),
          "Timeout for broker API version requests.",
          1, 5*60*1000, 10*1000 },
        { _RK_GLOBAL|_RK_MED, "api.version.fallback.ms", _RK_C_INT,
          _RK(api_version_fallback_ms),
          "Dictates how long the `broker.version.fallback` fallback is used "
          "in the case the ApiVersionRequest fails. "
          "**NOTE**: The ApiVersionRequest is only issued when a new "
          "connection to the broker is made (such as after an upgrade).",
          0, 86400*7*1000, 0 },

        { _RK_GLOBAL|_RK_MED, "broker.version.fallback", _RK_C_STR,
          _RK(broker_version_fallback),
          "Older broker versions (before 0.10.0) provide no way for a client "
          "to query for supported protocol features "
          "(ApiVersionRequest, see `api.version.request`) making it impossible "
          "for the client to know what features it may use. "
          "As a workaround a user may set this property to the expected broker "
          "version and the client will automatically adjust its feature set "
          "accordingly if the ApiVersionRequest fails (or is disabled). "
          "The fallback broker version will be used for "
          "`api.version.fallback.ms`. "
          "Valid values are: 0.9.0, 0.8.2, 0.8.1, 0.8.0. "
          "Any other value >= 0.10, such as 0.10.2.1, "
          "enables ApiVersionRequests.",
          .sdef = "0.10.0",
          .validate = rd_kafka_conf_validate_broker_version },

        /* Security related global properties */
        { _RK_GLOBAL|_RK_HIGH, "security.protocol", _RK_C_S2I,
          _RK(security_protocol),
          "Protocol used to communicate with brokers.",
          .vdef = RD_KAFKA_PROTO_PLAINTEXT,
          .s2i = {
                  { RD_KAFKA_PROTO_PLAINTEXT, "plaintext" },
#if WITH_SSL
                  { RD_KAFKA_PROTO_SSL, "ssl" },
#endif
                  { RD_KAFKA_PROTO_SASL_PLAINTEXT, "sasl_plaintext" },
#if WITH_SSL
                  { RD_KAFKA_PROTO_SASL_SSL, "sasl_ssl" },
#endif
                  { 0, NULL }
          } },

#if WITH_SSL
        { _RK_GLOBAL, "ssl.cipher.suites", _RK_C_STR,
          _RK(ssl.cipher_suites),
          "A cipher suite is a named combination of authentication, "
          "encryption, MAC and key exchange algorithm used to negotiate the "
          "security settings for a network connection using TLS or SSL "
          "network protocol. See manual page for `ciphers(1)` and "
          "`SSL_CTX_set_cipher_list(3)`."
        },
#if OPENSSL_VERSION_NUMBER >= 0x1000200fL && !defined(LIBRESSL_VERSION_NUMBER)
        { _RK_GLOBAL, "ssl.curves.list", _RK_C_STR,
          _RK(ssl.curves_list),
          "The supported-curves extension in the TLS ClientHello message "
          "specifies the curves (standard/named, or 'explicit' GF(2^k) or "
          "GF(p)) the client is willing to have the server use. "
          "See manual page for `SSL_CTX_set1_curves_list(3)`. "
          "OpenSSL >= 1.0.2 required."
        },
        { _RK_GLOBAL, "ssl.sigalgs.list", _RK_C_STR,
          _RK(ssl.sigalgs_list),
          "The client uses the TLS ClientHello signature_algorithms extension "
          "to indicate to the server which signature/hash algorithm pairs "
          "may be used in digital signatures. See manual page for "
          "`SSL_CTX_set1_sigalgs_list(3)`. OpenSSL >= 1.0.2 required."
        },
#endif
        { _RK_GLOBAL, "ssl.key.location", _RK_C_STR,
          _RK(ssl.key_location),
          "Path to client's private key (PEM) used for authentication."
        },
        { _RK_GLOBAL|_RK_SENSITIVE, "ssl.key.password", _RK_C_STR,
          _RK(ssl.key_password),
          "Private key passphrase (for use with `ssl.key.location` "
          "and `set_ssl_cert()`)"
        },
        { _RK_GLOBAL|_RK_SENSITIVE, "ssl.key.pem", _RK_C_STR,
          _RK(ssl.key_pem),
          "Client's private key string (PEM format) used for authentication."
        },
        { _RK_GLOBAL, "ssl_key", _RK_C_INTERNAL,
          _RK(ssl.key),
          "Client's private key as set by rd_kafka_conf_set_ssl_cert()",
          .dtor = rd_kafka_conf_cert_dtor,
          .copy = rd_kafka_conf_cert_copy
        },
        { _RK_GLOBAL, "ssl.certificate.location", _RK_C_STR,
          _RK(ssl.cert_location),
          "Path to client's public key (PEM) used for authentication."
        },
        { _RK_GLOBAL, "ssl.certificate.pem", _RK_C_STR,
          _RK(ssl.cert_pem),
          "Client's public key string (PEM format) used for authentication."
        },
        { _RK_GLOBAL, "ssl_certificate", _RK_C_INTERNAL,
          _RK(ssl.key),
          "Client's public key as set by rd_kafka_conf_set_ssl_cert()",
          .dtor = rd_kafka_conf_cert_dtor,
          .copy = rd_kafka_conf_cert_copy
        },

        { _RK_GLOBAL, "ssl.ca.location", _RK_C_STR,
          _RK(ssl.ca_location),
          "File or directory path to CA certificate(s) for verifying "
          "the broker's key."
        },
        { _RK_GLOBAL, "ssl_ca", _RK_C_INTERNAL,
          _RK(ssl.ca),
          "CA certificate as set by rd_kafka_conf_set_ssl_cert()",
          .dtor = rd_kafka_conf_cert_dtor,
          .copy = rd_kafka_conf_cert_copy
        },
        { _RK_GLOBAL, "ssl.crl.location", _RK_C_STR,
          _RK(ssl.crl_location),
          "Path to CRL for verifying broker's certificate validity."
        },
        { _RK_GLOBAL, "ssl.keystore.location", _RK_C_STR,
          _RK(ssl.keystore_location),
          "Path to client's keystore (PKCS#12) used for authentication."
        },
        { _RK_GLOBAL|_RK_SENSITIVE, "ssl.keystore.password", _RK_C_STR,
          _RK(ssl.keystore_password),
          "Client's keystore (PKCS#12) password."
        },
        { _RK_GLOBAL, "enable.ssl.certificate.verification", _RK_C_BOOL,
          _RK(ssl.enable_verify),
          "Enable OpenSSL's builtin broker (server) certificate verification. "
          "This verification can be extended by the application by "
          "implementing a certificate_verify_cb.",
          0, 1, 1
        },
#if OPENSSL_VERSION_NUMBER >= 0x1000200fL
        { _RK_GLOBAL, "ssl.endpoint.identification.algorithm", _RK_C_S2I,
          _RK(ssl.endpoint_identification),
          "Endpoint identification algorithm to validate broker "
          "hostname using broker certificate. "
          "https - Server (broker) hostname verification as "
          "specified in RFC2818. "
          "none - No endpoint verification. "
          "OpenSSL >= 1.0.2 required.",
          .vdef = RD_KAFKA_SSL_ENDPOINT_ID_NONE,
          .s2i = {
                  { RD_KAFKA_SSL_ENDPOINT_ID_NONE, "none" },
                  { RD_KAFKA_SSL_ENDPOINT_ID_HTTPS, "https" }
          }
        },
#endif
        { _RK_GLOBAL, "ssl.certificate.verify_cb", _RK_C_PTR,
          _RK(ssl.cert_verify_cb),
          "Callback to verify the broker certificate chain."
        },
#endif /* WITH_SSL */

        /* Point user in the right direction if they try to apply
         * Java client SSL / JAAS properties. */
        { _RK_GLOBAL, "ssl.truststore.location", _RK_C_INVALID,
          _RK(dummy),
          "Java TrustStores are not supported, use `ssl.ca.location` "
          "and a certificate file instead. "
          "See https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka "
          "for more information."
        },
        { _RK_GLOBAL, "sasl.jaas.config", _RK_C_INVALID,
          _RK(dummy),
          "Java JAAS configuration is not supported, see "
          "https://github.com/edenhill/librdkafka/wiki/Using-SASL-with-librdkafka "
          "for more information."
        },

        { _RK_GLOBAL|_RK_HIGH, "sasl.mechanisms", _RK_C_STR,
          _RK(sasl.mechanisms),
          "SASL mechanism to use for authentication. "
          "Supported: GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER. "
          "**NOTE**: Despite the name, only one mechanism must be configured.",
          .sdef = "GSSAPI",
          .validate = rd_kafka_conf_validate_single },
        { _RK_GLOBAL|_RK_HIGH, "sasl.mechanism", _RK_C_ALIAS,
          .sdef = "sasl.mechanisms" },
        { _RK_GLOBAL, "sasl.kerberos.service.name", _RK_C_STR,
          _RK(sasl.service_name),
          "Kerberos principal name that Kafka runs as, "
          "not including /hostname@REALM",
          .sdef = "kafka" },
        { _RK_GLOBAL, "sasl.kerberos.principal", _RK_C_STR,
          _RK(sasl.principal),
          "This client's Kerberos principal name. "
          "(Not supported on Windows, will use the logon user's principal).",
          .sdef = "kafkaclient" },
#ifndef _MSC_VER
        { _RK_GLOBAL, "sasl.kerberos.kinit.cmd", _RK_C_STR,
          _RK(sasl.kinit_cmd),
          "Shell command to refresh or acquire the client's Kerberos ticket. "
          "This command is executed on client creation and every "
          "sasl.kerberos.min.time.before.relogin (0=disable). "
          "%{config.prop.name} is replaced by corresponding config "
          "object value.",
          .sdef =
          /* First attempt to refresh, else acquire. */
          "kinit -R -t \"%{sasl.kerberos.keytab}\" "
          "-k %{sasl.kerberos.principal} || "
          "kinit -t \"%{sasl.kerberos.keytab}\" -k %{sasl.kerberos.principal}"
        },
        { _RK_GLOBAL, "sasl.kerberos.keytab", _RK_C_STR,
          _RK(sasl.keytab),
          "Path to Kerberos keytab file. "
          "This configuration property is only used as a variable in "
          "`sasl.kerberos.kinit.cmd` as "
          "` ... -t \"%{sasl.kerberos.keytab}\"`." },
        { _RK_GLOBAL, "sasl.kerberos.min.time.before.relogin", _RK_C_INT,
          _RK(sasl.relogin_min_time),
          "Minimum time in milliseconds between key refresh attempts. "
          "Disable automatic key refresh by setting this property to 0.",
          0, 86400*1000, 60*1000 },
#endif
        { _RK_GLOBAL|_RK_HIGH, "sasl.username", _RK_C_STR,
          _RK(sasl.username),
          "SASL username for use with the PLAIN and SASL-SCRAM-.. mechanisms" },
        { _RK_GLOBAL|_RK_HIGH, "sasl.password", _RK_C_STR,
          _RK(sasl.password),
          "SASL password for use with the PLAIN and SASL-SCRAM-.. mechanism" },
#if WITH_SASL_OAUTHBEARER
        { _RK_GLOBAL, "sasl.oauthbearer.config", _RK_C_STR,
          _RK(sasl.oauthbearer_config),
          "SASL/OAUTHBEARER configuration. The format is "
          "implementation-dependent and must be parsed accordingly. The "
          "default unsecured token implementation (see "
          "https://tools.ietf.org/html/rfc7515#appendix-A.5) recognizes "
          "space-separated name=value pairs with valid names including "
          "principalClaimName, principal, scopeClaimName, scope, and "
          "lifeSeconds. The default value for principalClaimName is \"sub\", "
          "the default value for scopeClaimName is \"scope\", and the default "
          "value for lifeSeconds is 3600. The scope value is CSV format with "
          "the default value being no/empty scope. For example: "
          "`principalClaimName=azp principal=admin scopeClaimName=roles "
          "scope=role1,role2 lifeSeconds=600`. In addition, SASL extensions "
          "can be communicated to the broker via "
          "`extension_<extensionname>=value`. For example: "
          "`principal=admin extension_traceId=123`" },
        { _RK_GLOBAL, "enable.sasl.oauthbearer.unsecure.jwt", _RK_C_BOOL,
          _RK(sasl.enable_oauthbearer_unsecure_jwt),
          "Enable the builtin unsecure JWT OAUTHBEARER token handler "
          "if no oauthbearer_refresh_cb has been set. "
          "This builtin handler should only be used for development "
          "or testing, and not in production.",
          0, 1, 0 },
        { _RK_GLOBAL, "oauthbearer_token_refresh_cb", _RK_C_PTR,
          _RK(sasl.oauthbearer_token_refresh_cb),
          "SASL/OAUTHBEARER token refresh callback (set with "
          "rd_kafka_conf_set_oauthbearer_token_refresh_cb(), triggered by "
          "rd_kafka_poll(), et.al.). "
          "This callback will be triggered when it is time to refresh "
          "the client's OAUTHBEARER token." },
#endif

#if WITH_PLUGINS
        /* Plugins */
        { _RK_GLOBAL, "plugin.library.paths", _RK_C_STR,
          _RK(plugin_paths),
          "List of plugin libraries to load (; separated). "
          "The library search path is platform dependent (see dlopen(3) for "
          "Unix and LoadLibrary() for Windows). If no filename extension is "
          "specified the platform-specific extension (such as .dll or .so) "
          "will be appended automatically.",
          .set = rd_kafka_plugins_conf_set },
#endif

        /* Interceptors are added through specific API and not exposed
         * as configuration properties.
         * The interceptor property must be defined after plugin.library.paths
         * so that the plugin libraries are properly loaded before
         * interceptors are configured when duplicating configuration
         * objects. */
        { _RK_GLOBAL, "interceptors", _RK_C_INTERNAL,
          _RK(interceptors),
          "Interceptors added through rd_kafka_conf_interceptor_add_..() "
          "and any configuration handled by interceptors.",
          .ctor = rd_kafka_conf_interceptor_ctor,
          .dtor = rd_kafka_conf_interceptor_dtor,
          .copy = rd_kafka_conf_interceptor_copy },

        /* Unit test interfaces.
         * These are not part of the public API and may change at any time.
         * Only to be used by the librdkafka tests. */
        { _RK_GLOBAL|_RK_HIDDEN, "ut_handle_ProduceResponse", _RK_C_PTR,
          _RK(ut.handle_ProduceResponse),
          "ProduceResponse handler: "
          "rd_kafka_resp_err_t (*cb) (rd_kafka_t *rk, "
          "int32_t brokerid, uint64_t msgid, rd_kafka_resp_err_t err)" },

        /* Global consumer group properties */
        { _RK_GLOBAL|_RK_CGRP|_RK_HIGH, "group.id", _RK_C_STR,
          _RK(group_id_str),
          "Client group id string. All clients sharing the same group.id "
          "belong to the same group." },
        { _RK_GLOBAL|_RK_CGRP|_RK_MED, "partition.assignment.strategy",
          _RK_C_STR,
          _RK(partition_assignment_strategy),
          "Name of partition assignment strategy to use when elected "
          "group leader assigns partitions to group members.",
          .sdef = "range,roundrobin" },
        { _RK_GLOBAL|_RK_CGRP|_RK_HIGH, "session.timeout.ms", _RK_C_INT,
          _RK(group_session_timeout_ms),
          "Client group session and failure detection timeout. "
          "The consumer sends periodic heartbeats (heartbeat.interval.ms) "
          "to indicate its liveness to the broker. If no heartbeats are "
          "received by the broker for a group member within the "
          "session timeout, the broker will remove the consumer from "
          "the group and trigger a rebalance. "
          "The allowed range is configured with the **broker** configuration "
          "properties `group.min.session.timeout.ms` and "
          "`group.max.session.timeout.ms`. "
          "Also see `max.poll.interval.ms`.",
          1, 3600*1000, 10*1000 },
        { _RK_GLOBAL|_RK_CGRP, "heartbeat.interval.ms", _RK_C_INT,
          _RK(group_heartbeat_intvl_ms),
          "Group session keepalive heartbeat interval.",
          1, 3600*1000, 3*1000 },
        { _RK_GLOBAL|_RK_CGRP, "group.protocol.type", _RK_C_KSTR,
          _RK(group_protocol_type),
          "Group protocol type",
          .sdef = "consumer" },
        { _RK_GLOBAL|_RK_CGRP, "coordinator.query.interval.ms", _RK_C_INT,
          _RK(coord_query_intvl_ms),
          "How often to query for the current client group coordinator. "
          "If the currently assigned coordinator is down the configured "
          "query interval will be divided by ten to more quickly recover "
          "in case of coordinator reassignment.",
          1, 3600*1000, 10*60*1000 },
        { _RK_GLOBAL|_RK_CONSUMER|_RK_HIGH, "max.poll.interval.ms", _RK_C_INT,
          _RK(max_poll_interval_ms),
          "Maximum allowed time between calls to consume messages "
          "(e.g., rd_kafka_consumer_poll()) for high-level consumers. "
          "If this interval is exceeded the consumer is considered failed "
          "and the group will rebalance in order to reassign the "
          "partitions to another consumer group member. "
          "Warning: Offset commits may not be possible at this point. "
          "Note: It is recommended to set `enable.auto.offset.store=false` "
          "for long-time processing applications and then explicitly store "
          "offsets (using offsets_store()) *after* message processing, to "
          "make sure offsets are not auto-committed before processing "
          "has finished. "
          "The interval is checked two times per second. "
          "See KIP-62 for more information.",
          1, 86400*1000, 300000
        },

        /* Global consumer properties */
        { _RK_GLOBAL|_RK_CONSUMER|_RK_HIGH, "enable.auto.commit", _RK_C_BOOL,
          _RK(enable_auto_commit),
          "Automatically and periodically commit offsets in the background. "
          "Note: setting this to false does not prevent the consumer from "
          "fetching previously committed start offsets. To circumvent this "
          "behaviour set specific start offsets per partition in the call "
          "to assign().",
          0, 1, 1 },
        { _RK_GLOBAL|_RK_CONSUMER|_RK_MED, "auto.commit.interval.ms",
          _RK_C_INT,
          _RK(auto_commit_interval_ms),
          "The frequency in milliseconds that the consumer offsets "
          "are committed (written) to offset storage. (0 = disable). "
          "This setting is used by the high-level consumer.",
          0, 86400*1000, 5*1000 },
        { _RK_GLOBAL|_RK_CONSUMER|_RK_HIGH, "enable.auto.offset.store",
          _RK_C_BOOL,
          _RK(enable_auto_offset_store),
          "Automatically store offset of last message provided to "
          "application. "
          "The offset store is an in-memory store of the next offset to "
          "(auto-)commit for each partition.",
          0, 1, 1 },
        { _RK_GLOBAL|_RK_CONSUMER|_RK_MED, "queued.min.messages", _RK_C_INT,
          _RK(queued_min_msgs),
          "Minimum number of messages per topic+partition "
          "librdkafka tries to maintain in the local consumer queue.",
          1, 10000000, 100000 },
        { _RK_GLOBAL|_RK_CONSUMER|_RK_MED, "queued.max.messages.kbytes",
          _RK_C_INT,
          _RK(queued_max_msg_kbytes),
          "Maximum number of kilobytes per topic+partition in the "
          "local consumer queue. "
          "This value may be overshot by fetch.message.max.bytes. "
          "This property has higher priority than queued.min.messages.",
          1, INT_MAX/1024, 0x100000/*1GB*/ },
        { _RK_GLOBAL|_RK_CONSUMER, "fetch.wait.max.ms", _RK_C_INT,
          _RK(fetch_wait_max_ms),
          "Maximum time the broker may wait to fill the response "
          "with fetch.min.bytes.",
          0, 300*1000, 100 },
        { _RK_GLOBAL|_RK_CONSUMER|_RK_MED, "fetch.message.max.bytes",
          _RK_C_INT,
          _RK(fetch_msg_max_bytes),
          "Initial maximum number of bytes per topic+partition to request when "
          "fetching messages from the broker. "
          "If the client encounters a message larger than this value "
          "it will gradually try to increase it until the "
          "entire message can be fetched.",
          1, 1000000000, 1024*1024 },
        { _RK_GLOBAL|_RK_CONSUMER|_RK_MED, "max.partition.fetch.bytes",
          _RK_C_ALIAS,
          .sdef = "fetch.message.max.bytes" },
        { _RK_GLOBAL|_RK_CONSUMER|_RK_MED, "fetch.max.bytes", _RK_C_INT,
          _RK(fetch_max_bytes),
          "Maximum amount of data the broker shall return for a Fetch request. "
          "Messages are fetched in batches by the consumer and if the first "
          "message batch in the first non-empty partition of the Fetch request "
          "is larger than this value, then the message batch will still be "
          "returned to ensure the consumer can make progress. "
          "The maximum message batch size accepted by the broker is defined "
          "via `message.max.bytes` (broker config) or "
          "`max.message.bytes` (broker topic config). "
          "`fetch.max.bytes` is automatically adjusted upwards to be "
          "at least `message.max.bytes` (consumer config).",
          0, INT_MAX-512, 50*1024*1024 /* 50MB */ },
        { _RK_GLOBAL|_RK_CONSUMER, "fetch.min.bytes", _RK_C_INT,
          _RK(fetch_min_bytes),
          "Minimum number of bytes the broker responds with. "
          "If fetch.wait.max.ms expires the accumulated data will "
          "be sent to the client regardless of this setting.",
          1, 100000000, 1 },
        { _RK_GLOBAL|_RK_CONSUMER|_RK_MED, "fetch.error.backoff.ms", _RK_C_INT,
          _RK(fetch_error_backoff_ms),
          "How long to postpone the next fetch request for a "
          "topic+partition in case of a fetch error.",
          0, 300*1000, 500 },
        { _RK_GLOBAL|_RK_CONSUMER|_RK_DEPRECATED, "offset.store.method",
          _RK_C_S2I,
          _RK(offset_store_method),
          "Offset commit store method: "
          "'file' - DEPRECATED: local file store (offset.store.path, et.al), "
          "'broker' - broker commit store "
          "(requires Apache Kafka 0.8.2 or later on the broker).",
          .vdef = RD_KAFKA_OFFSET_METHOD_BROKER,
          .s2i = {
                  { RD_KAFKA_OFFSET_METHOD_NONE, "none" },
                  { RD_KAFKA_OFFSET_METHOD_FILE, "file" },
                  { RD_KAFKA_OFFSET_METHOD_BROKER, "broker" }
          }
        },
        { _RK_GLOBAL|_RK_CONSUMER, "consume_cb", _RK_C_PTR,
          _RK(consume_cb),
          "Message consume callback (set with rd_kafka_conf_set_consume_cb())" },
        { _RK_GLOBAL|_RK_CONSUMER, "rebalance_cb", _RK_C_PTR,
          _RK(rebalance_cb),
          "Called after consumer group has been rebalanced "
          "(set with rd_kafka_conf_set_rebalance_cb())" },
        { _RK_GLOBAL|_RK_CONSUMER, "offset_commit_cb", _RK_C_PTR,
          _RK(offset_commit_cb),
          "Offset commit result propagation callback. "
          "(set with rd_kafka_conf_set_offset_commit_cb())" },
        { _RK_GLOBAL|_RK_CONSUMER, "enable.partition.eof", _RK_C_BOOL,
          _RK(enable_partition_eof),
          "Emit RD_KAFKA_RESP_ERR__PARTITION_EOF event whenever the "
          "consumer reaches the end of a partition.",
          0, 1, 0 },
        { _RK_GLOBAL|_RK_CONSUMER|_RK_MED, "check.crcs", _RK_C_BOOL,
          _RK(check_crcs),
          "Verify CRC32 of consumed messages, ensuring no on-the-wire or "
          "on-disk corruption to the messages occurred. This check comes "
          "at slightly increased CPU usage.",
          0, 1, 0 },

        /* Global producer properties */
        { _RK_GLOBAL|_RK_PRODUCER|_RK_HIGH, "enable.idempotence", _RK_C_BOOL,
          _RK(eos.idempotence),
          "When set to `true`, the producer will ensure that messages are "
          "successfully produced exactly once and in the original produce "
          "order. "
          "The following configuration properties are adjusted automatically "
          "(if not modified by the user) when idempotence is enabled: "
          "`max.in.flight.requests.per.connection="
          RD_KAFKA_IDEMP_MAX_INFLIGHT_STR "` (must be less than or "
          "equal to " RD_KAFKA_IDEMP_MAX_INFLIGHT_STR "), `retries=INT32_MAX` "
          "(must be greater than 0), `acks=all`, `queuing.strategy=fifo`. "
          "Producer instantiation will fail if user-supplied configuration "
          "is incompatible.",
          0, 1, 0 },
        { _RK_GLOBAL|_RK_PRODUCER|_RK_EXPERIMENTAL, "enable.gapless.guarantee",
          _RK_C_BOOL,
          _RK(eos.gapless),
          "When set to `true`, any error that could result in a gap "
          "in the produced message series when a batch of messages fails, "
          "will raise a fatal error (ERR__GAPLESS_GUARANTEE) and stop "
          "the producer. "
          "Messages failing due to `message.timeout.ms` are not covered "
          "by this guarantee. "
          "Requires `enable.idempotence=true`.",
          0, 1, 0 },
        { _RK_GLOBAL|_RK_PRODUCER|_RK_HIGH, "queue.buffering.max.messages",
          _RK_C_INT,
          _RK(queue_buffering_max_msgs),
          "Maximum number of messages allowed on the producer queue. "
          "This queue is shared by all topics and partitions.",
          1, 10000000, 100000 },
        { _RK_GLOBAL|_RK_PRODUCER|_RK_HIGH, "queue.buffering.max.kbytes",
          _RK_C_INT,
          _RK(queue_buffering_max_kbytes),
          "Maximum total message size sum allowed on the producer queue. "
          "This queue is shared by all topics and partitions. "
          "This property has higher priority than queue.buffering.max.messages.",
          1, INT_MAX/1024, 0x100000/*1GB*/ },
        { _RK_GLOBAL|_RK_PRODUCER|_RK_HIGH, "queue.buffering.max.ms",
          _RK_C_INT,
          _RK(buffering_max_ms),
          "Delay in milliseconds to wait for messages in the producer queue "
          "to accumulate before constructing message batches (MessageSets) to "
          "transmit to brokers. "
          "A higher value allows larger and more effective "
          "(less overhead, improved compression) batches of messages to "
          "accumulate at the expense of increased message delivery latency.",
          0, 900*1000, 0 },
        { _RK_GLOBAL|_RK_PRODUCER|_RK_HIGH, "linger.ms", _RK_C_ALIAS,
          .sdef = "queue.buffering.max.ms" },
        { _RK_GLOBAL|_RK_PRODUCER|_RK_HIGH, "message.send.max.retries",
          _RK_C_INT,
          _RK(max_retries),
          "How many times to retry sending a failing Message. "
          "**Note:** retrying may cause reordering unless "
          "`enable.idempotence` is set to true.",
          0, 10000000, 2 },
        { _RK_GLOBAL|_RK_PRODUCER, "retries", _RK_C_ALIAS,
          .sdef = "message.send.max.retries" },
        { _RK_GLOBAL|_RK_PRODUCER|_RK_MED, "retry.backoff.ms", _RK_C_INT,
          _RK(retry_backoff_ms),
          "The backoff time in milliseconds before retrying a protocol request.",
          1, 300*1000, 100 },

        { _RK_GLOBAL|_RK_PRODUCER, "queue.buffering.backpressure.threshold",
          _RK_C_INT, _RK(queue_backpressure_thres),
          "The threshold of outstanding not yet transmitted broker requests "
          "needed to backpressure the producer's message accumulator. "
          "If the number of not yet transmitted requests equals or exceeds "
          "this number, produce request creation that would have otherwise "
          "been triggered (for example, in accordance with linger.ms) will be "
          "delayed. A lower number yields larger and more effective batches. "
          "A higher value can improve latency when using compression on slow "
          "machines.",
          1, 1000000, 1 },

        { _RK_GLOBAL|_RK_PRODUCER|_RK_MED, "compression.codec", _RK_C_S2I,
          _RK(compression_codec),
          "Compression codec to use for compressing message sets. "
          "This is the default value for all topics, may be overridden by "
          "the topic configuration property `compression.codec`. ",
          .vdef = RD_KAFKA_COMPRESSION_NONE,
          .s2i = {
                  { RD_KAFKA_COMPRESSION_NONE, "none" },
#if WITH_ZLIB
                  { RD_KAFKA_COMPRESSION_GZIP, "gzip" },
#endif
#if WITH_SNAPPY
                  { RD_KAFKA_COMPRESSION_SNAPPY, "snappy" },
#endif
                  { RD_KAFKA_COMPRESSION_LZ4, "lz4" },
#if WITH_ZSTD
                  { RD_KAFKA_COMPRESSION_ZSTD, "zstd" },
#endif
                  { 0 }
          } },
        { _RK_GLOBAL|_RK_PRODUCER|_RK_MED, "compression.type", _RK_C_ALIAS,
          .sdef = "compression.codec" },
        { _RK_GLOBAL|_RK_PRODUCER|_RK_MED, "batch.num.messages", _RK_C_INT,
          _RK(batch_num_messages),
          "Maximum number of messages batched in one MessageSet. "
          "The total MessageSet size is also limited by message.max.bytes.",
          1, 1000000, 10000 },
        { _RK_GLOBAL|_RK_PRODUCER, "delivery.report.only.error", _RK_C_BOOL,
          _RK(dr_err_only),
          "Only provide delivery reports for failed messages.",
          0, 1, 0 },
        { _RK_GLOBAL|_RK_PRODUCER, "dr_cb", _RK_C_PTR,
          _RK(dr_cb),
          "Delivery report callback (set with rd_kafka_conf_set_dr_cb())" },
        { _RK_GLOBAL|_RK_PRODUCER, "dr_msg_cb", _RK_C_PTR,
          _RK(dr_msg_cb),
          "Delivery report callback (set with rd_kafka_conf_set_dr_msg_cb())" },


        /*
         * Topic properties
         */

        /* Topic producer properties */
        { _RK_TOPIC|_RK_PRODUCER|_RK_HIGH, "request.required.acks", _RK_C_INT,
          _RKT(required_acks),
          "This field indicates the number of acknowledgements the leader "
          "broker must receive from ISR brokers before responding to the "
          "request: "
          "*0*=Broker does not send any response/ack to client, "
          "*-1* or *all*=Broker will block until message is committed by all "
          "in sync replicas (ISRs). If there are fewer than "
          "`min.insync.replicas` (broker configuration) in the ISR set the "
          "produce request will fail.",
          -1, 1000, -1,
          .s2i = {
                  { -1, "all" },
          }
        },
        { _RK_TOPIC|_RK_PRODUCER|_RK_HIGH, "acks", _RK_C_ALIAS,
          .sdef = "request.required.acks" },

        { _RK_TOPIC|_RK_PRODUCER|_RK_MED, "request.timeout.ms", _RK_C_INT,
          _RKT(request_timeout_ms),
          "The ack timeout of the producer request in milliseconds. "
          "This value is only enforced by the broker and relies "
          "on `request.required.acks` being != 0.",
          1, 900*1000, 5*1000 },
        { _RK_TOPIC|_RK_PRODUCER|_RK_HIGH, "message.timeout.ms", _RK_C_INT,
          _RKT(message_timeout_ms),
          "Local message timeout. "
          "This value is only enforced locally and limits the time a "
          "produced message waits for successful delivery. "
          "A time of 0 is infinite. "
          "This is the maximum time librdkafka may use to deliver a message "
          "(including retries). Delivery error occurs when either the retry "
          "count or the message timeout are exceeded.",
          0, INT32_MAX, 300*1000 },
        { _RK_TOPIC|_RK_PRODUCER|_RK_HIGH, "delivery.timeout.ms", _RK_C_ALIAS,
          .sdef = "message.timeout.ms" },
        { _RK_TOPIC|_RK_PRODUCER|_RK_DEPRECATED|_RK_EXPERIMENTAL,
          "queuing.strategy", _RK_C_S2I,
          _RKT(queuing_strategy),
          "Producer queuing strategy. FIFO preserves produce ordering, "
          "while LIFO prioritizes new messages.",
          .vdef = 0,
          .s2i = {
                  { RD_KAFKA_QUEUE_FIFO, "fifo" },
                  { RD_KAFKA_QUEUE_LIFO, "lifo" }
          }
        },
        { _RK_TOPIC|_RK_PRODUCER|_RK_DEPRECATED,
          "produce.offset.report", _RK_C_BOOL,
          _RKT(produce_offset_report),
          "No longer used.",
          0, 1, 0 },
        { _RK_TOPIC|_RK_PRODUCER|_RK_HIGH, "partitioner", _RK_C_STR,
          _RKT(partitioner_str),
          "Partitioner: "
          "`random` - random distribution, "
          "`consistent` - CRC32 hash of key "
          "(Empty and NULL keys are mapped to single partition), "
          "`consistent_random` - CRC32 hash of key "
          "(Empty and NULL keys are randomly partitioned), "
          "`murmur2` - Java Producer compatible Murmur2 hash of key "
          "(NULL keys are mapped to single partition), "
          "`murmur2_random` - Java Producer compatible Murmur2 hash of key "
          "(NULL keys are randomly partitioned. This is functionally "
          "equivalent to the default partitioner in the Java Producer.).",
          .sdef = "consistent_random",
          .validate = rd_kafka_conf_validate_partitioner },
        { _RK_TOPIC|_RK_PRODUCER, "partitioner_cb", _RK_C_PTR,
          _RKT(partitioner),
          "Custom partitioner callback "
          "(set with rd_kafka_topic_conf_set_partitioner_cb())" },
        { _RK_TOPIC|_RK_PRODUCER|_RK_DEPRECATED|_RK_EXPERIMENTAL,
          "msg_order_cmp", _RK_C_PTR,
          _RKT(msg_order_cmp),
          "Message queue ordering comparator "
          "(set with rd_kafka_topic_conf_set_msg_order_cmp()). "
          "Also see `queuing.strategy`." },
        { _RK_TOPIC, "opaque", _RK_C_PTR,
          _RKT(opaque),
          "Application opaque (set with rd_kafka_topic_conf_set_opaque())" },
        { _RK_TOPIC|_RK_PRODUCER|_RK_HIGH, "compression.codec", _RK_C_S2I,
          _RKT(compression_codec),
          "Compression codec to use for compressing message sets. "
          "inherit = inherit global compression.codec configuration.",
          .vdef = RD_KAFKA_COMPRESSION_INHERIT,
          .s2i = {
                  { RD_KAFKA_COMPRESSION_NONE, "none" },
#if WITH_ZLIB
                  { RD_KAFKA_COMPRESSION_GZIP, "gzip" },
#endif
#if WITH_SNAPPY
                  { RD_KAFKA_COMPRESSION_SNAPPY, "snappy" },
#endif
                  { RD_KAFKA_COMPRESSION_LZ4, "lz4" },
#if WITH_ZSTD
                  { RD_KAFKA_COMPRESSION_ZSTD, "zstd" },
#endif
                  { RD_KAFKA_COMPRESSION_INHERIT, "inherit" },
                  { 0 }
          } },
        { _RK_TOPIC|_RK_PRODUCER|_RK_HIGH, "compression.type", _RK_C_ALIAS,
          .sdef = "compression.codec" },
        { _RK_TOPIC|_RK_PRODUCER|_RK_MED, "compression.level", _RK_C_INT,
          _RKT(compression_level),
          "Compression level parameter for algorithm selected by configuration "
          "property `compression.codec`. Higher values will result in better "
          "compression at the cost of more CPU usage. Usable range is "
          "algorithm-dependent: [0-9] for gzip; [0-12] for lz4; only 0 for "
          "snappy; -1 = codec-dependent default compression level.",
          RD_KAFKA_COMPLEVEL_MIN,
          RD_KAFKA_COMPLEVEL_MAX,
          RD_KAFKA_COMPLEVEL_DEFAULT },


        /* Topic consumer properties */
        { _RK_TOPIC|_RK_CONSUMER|_RK_DEPRECATED, "auto.commit.enable",
          _RK_C_BOOL,
          _RKT(auto_commit),
          "[**LEGACY PROPERTY:** This property is used by the simple legacy "
          "consumer only. When using the high-level KafkaConsumer, the global "
          "`enable.auto.commit` property must be used instead]. "
          "If true, periodically commit offset of the last message handed "
          "to the application. This committed offset will be used when the "
          "process restarts to pick up where it left off. "
          "If false, the application will have to call "
          "`rd_kafka_offset_store()` to store an offset (optional). "
          "**NOTE:** There is currently no zookeeper integration, offsets "
          "will be written to broker or local file according to "
          "offset.store.method.",
          0, 1, 1 },
        { _RK_TOPIC|_RK_CONSUMER, "enable.auto.commit", _RK_C_ALIAS,
          .sdef = "auto.commit.enable" },
        { _RK_TOPIC|_RK_CONSUMER|_RK_HIGH, "auto.commit.interval.ms",
          _RK_C_INT,
          _RKT(auto_commit_interval_ms),
          "[**LEGACY PROPERTY:** This setting is used by the simple legacy "
          "consumer only. When using the high-level KafkaConsumer, the "
          "global `auto.commit.interval.ms` property must be used instead]. "
          "The frequency in milliseconds that the consumer offsets "
          "are committed (written) to offset storage.",
          10, 86400*1000, 60*1000 },
        { _RK_TOPIC|_RK_CONSUMER|_RK_HIGH, "auto.offset.reset", _RK_C_S2I,
          _RKT(auto_offset_reset),
          "Action to take when there is no initial offset in offset store "
          "or the desired offset is out of range: "
          "'smallest','earliest' - automatically reset the offset to the "
          "smallest offset, "
          "'largest','latest' - automatically reset the offset to the "
          "largest offset, "
          "'error' - trigger an error which is retrieved by consuming messages "
          "and checking 'message->err'.",
          .vdef = RD_KAFKA_OFFSET_END,
          .s2i = {
                  { RD_KAFKA_OFFSET_BEGINNING, "smallest" },
                  { RD_KAFKA_OFFSET_BEGINNING, "earliest" },
                  { RD_KAFKA_OFFSET_BEGINNING, "beginning" },
                  { RD_KAFKA_OFFSET_END, "largest" },
                  { RD_KAFKA_OFFSET_END, "latest" },
                  { RD_KAFKA_OFFSET_END, "end" },
                  { RD_KAFKA_OFFSET_INVALID, "error" },
          }
        },
        { _RK_TOPIC|_RK_CONSUMER|_RK_DEPRECATED, "offset.store.path",
          _RK_C_STR,
          _RKT(offset_store_path),
          "Path to local file for storing offsets. If the path is a directory "
          "a filename will be automatically generated in that directory based "
          "on the topic and partition. "
          "File-based offset storage will be removed in a future version.",
          .sdef = "." },

        { _RK_TOPIC|_RK_CONSUMER|_RK_DEPRECATED,
          "offset.store.sync.interval.ms", _RK_C_INT,
          _RKT(offset_store_sync_interval_ms),
          "fsync() interval for the offset file, in milliseconds. "
          "Use -1 to disable syncing, and 0 for immediate sync after "
          "each write. "
          "File-based offset storage will be removed in a future version.",
          -1, 86400*1000, -1 },

        { _RK_TOPIC|_RK_CONSUMER|_RK_DEPRECATED, "offset.store.method",
          _RK_C_S2I,
          _RKT(offset_store_method),
          "Offset commit store method: "
          "'file' - DEPRECATED: local file store (offset.store.path, et.al), "
          "'broker' - broker commit store "
          "(requires \"group.id\" to be configured and "
          "Apache Kafka 0.8.2 or later on the broker.).",
          .vdef = RD_KAFKA_OFFSET_METHOD_BROKER,
          .s2i = {
                  { RD_KAFKA_OFFSET_METHOD_FILE, "file" },
                  { RD_KAFKA_OFFSET_METHOD_BROKER, "broker" }
          }
        },

        { _RK_TOPIC|_RK_CONSUMER, "consume.callback.max.messages", _RK_C_INT,
          _RKT(consume_callback_max_msgs),
          "Maximum number of messages to dispatch in "
          "one `rd_kafka_consume_callback*()` call (0 = unlimited)",
          0, 1000000, 0 },

        { 0, /* End */ }
};
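
/* Illustrative application-side usage (a sketch, not part of this file):
 * properties defined in the table above are set by name through the
 * public string interface, which eventually reaches
 * rd_kafka_anyconf_set_prop0() below. The broker address is hypothetical.
 *
 *   char errstr[512];
 *   rd_kafka_conf_t *conf = rd_kafka_conf_new();
 *   if (rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092",
 *                         errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
 *           fprintf(stderr, "%s\n", errstr);
 */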

/**
 * @returns the property object for \p name in \p scope, or NULL if not found.
 * @remark does not work with interceptor configs.
 */
const struct rd_kafka_property *
rd_kafka_conf_prop_find (int scope, const char *name) {
        const struct rd_kafka_property *prop;

 restart:
        for (prop = rd_kafka_properties ; prop->name ; prop++) {

                if (!(prop->scope & scope))
                        continue;

                if (strcmp(prop->name, name))
                        continue;

                if (prop->type == _RK_C_ALIAS) {
                        /* Caller supplied an alias, restart
                         * search for real name. */
                        name = prop->sdef;
                        goto restart;
                }

                return prop;
        }

        return NULL;
}
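
/* Example (illustrative): looking up "bootstrap.servers" in _RK_GLOBAL
 * scope matches an _RK_C_ALIAS entry whose .sdef is
 * "metadata.broker.list", so the search restarts and returns the
 * property object for "metadata.broker.list". */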

/**
 * @returns rd_true if the property has been set/modified, else rd_false.
 *          If \p name is unknown rd_false is returned.
 */
static rd_bool_t rd_kafka_conf_is_modified (const rd_kafka_conf_t *conf,
                                            const char *name) {
        const struct rd_kafka_property *prop;

        if (!(prop = rd_kafka_conf_prop_find(_RK_GLOBAL, name)))
                return rd_false;

        return rd_kafka_anyconf_is_modified(conf, prop);
}


/**
 * @returns rd_true if the property has been set/modified, else rd_false.
 *          If \p name is unknown rd_false is returned.
 */
static
rd_bool_t rd_kafka_topic_conf_is_modified (const rd_kafka_topic_conf_t *conf,
                                           const char *name) {
        const struct rd_kafka_property *prop;

        if (!(prop = rd_kafka_conf_prop_find(_RK_TOPIC, name)))
                return rd_false;

        return rd_kafka_anyconf_is_modified(conf, prop);
}



static rd_kafka_conf_res_t
rd_kafka_anyconf_set_prop0 (int scope, void *conf,
                            const struct rd_kafka_property *prop,
                            const char *istr, int ival,
                            rd_kafka_conf_set_mode_t set_mode,
                            char *errstr, size_t errstr_size) {
        rd_kafka_conf_res_t res;

#define _RK_PTR(TYPE,BASE,OFFSET)  (TYPE)(void *)(((char *)(BASE))+(OFFSET))

        /* Try interceptors first (only for GLOBAL config) */
        if (scope & _RK_GLOBAL) {
                if (prop->type == _RK_C_PTR || prop->type == _RK_C_INTERNAL)
                        res = RD_KAFKA_CONF_UNKNOWN;
                else
                        res = rd_kafka_interceptors_on_conf_set(conf,
                                                                prop->name,
                                                                istr,
                                                                errstr,
                                                                errstr_size);
                if (res != RD_KAFKA_CONF_UNKNOWN)
                        return res;
        }


        if (prop->set) {
                /* Custom setter */
                rd_kafka_conf_res_t res;

                res = prop->set(scope, conf, prop->name, istr,
                                _RK_PTR(void *, conf, prop->offset),
                                set_mode, errstr, errstr_size);

                if (res != RD_KAFKA_CONF_OK)
                        return res;

                /* FALLTHRU so that property value is set. */
        }

        switch (prop->type)
        {
        case _RK_C_STR:
        {
                char **str = _RK_PTR(char **, conf, prop->offset);
                if (*str)
                        rd_free(*str);
                if (istr)
                        *str = rd_strdup(istr);
                else
                        *str = prop->sdef ? rd_strdup(prop->sdef) : NULL;
                break;
        }
        case _RK_C_KSTR:
        {
                rd_kafkap_str_t **kstr = _RK_PTR(rd_kafkap_str_t **, conf,
                                                 prop->offset);
                if (*kstr)
                        rd_kafkap_str_destroy(*kstr);
                if (istr)
                        *kstr = rd_kafkap_str_new(istr, -1);
                else
                        *kstr = prop->sdef ?
                                rd_kafkap_str_new(prop->sdef, -1) : NULL;
                break;
        }
        case _RK_C_PTR:
                *_RK_PTR(const void **, conf, prop->offset) = istr;
                break;
        case _RK_C_BOOL:
        case _RK_C_INT:
        case _RK_C_S2I:
        case _RK_C_S2F:
        {
                int *val = _RK_PTR(int *, conf, prop->offset);

                if (prop->type == _RK_C_S2F) {
                        switch (set_mode)
                        {
                        case _RK_CONF_PROP_SET_REPLACE:
                                *val = ival;
                                break;
                        case _RK_CONF_PROP_SET_ADD:
                                *val |= ival;
                                break;
                        case _RK_CONF_PROP_SET_DEL:
                                *val &= ~ival;
                                break;
                        }
                } else {
                        /* Single assignment */
                        *val = ival;
                }
                break;
        }
        case _RK_C_PATLIST:
        {
                /* Split the comma-separated list into individual regex
                 * expressions that are verified and then appended to the
                 * provided list. */
1481 rd_kafka_pattern_list_t **plist;
1482
1483 plist = _RK_PTR(rd_kafka_pattern_list_t **, conf, prop->offset);
1484
1485 if (*plist)
1486 rd_kafka_pattern_list_destroy(*plist);
1487
1488 if (istr) {
1489 if (!(*plist =
1490 rd_kafka_pattern_list_new(istr,
1491 errstr,
1492 (int)errstr_size)))
1493 return RD_KAFKA_CONF_INVALID;
1494 } else
1495 *plist = NULL;
1496
1497 break;
1498 }
1499
        case _RK_C_INTERNAL:
                /* Value storage is handled by the custom setter above,
                 * if any. */
                break;
1503
1504 default:
1505 rd_kafka_assert(NULL, !*"unknown conf type");
1506 }
1507
1508
1509 rd_kafka_anyconf_set_modified(conf, prop, 1/*modified*/);
1510 return RD_KAFKA_CONF_OK;
1511}
1512
1513
1514/**
1515 * @brief Find s2i (string-to-int mapping) entry and return its array index,
1516 * or -1 on miss.
1517 */
1518static int rd_kafka_conf_s2i_find (const struct rd_kafka_property *prop,
1519 const char *value) {
1520 int j;
1521
1522 for (j = 0 ; j < (int)RD_ARRAYSIZE(prop->s2i); j++) {
1523 if (prop->s2i[j].str &&
1524 !rd_strcasecmp(prop->s2i[j].str, value))
1525 return j;
1526 }
1527
1528 return -1;
1529}
1530
1531
1532/**
1533 * @brief Set configuration property.
1534 *
 * @param allow_specific Allow properties that may only be set through
 *        their dedicated rd_kafka_*conf_set_..() function, such as
 *        rd_kafka_conf_set_log_cb().
 *        Must not be allowed from the string-based conf_set() interface.
1538 */
1539static rd_kafka_conf_res_t
1540rd_kafka_anyconf_set_prop (int scope, void *conf,
1541 const struct rd_kafka_property *prop,
1542 const char *value,
1543 int allow_specific,
1544 char *errstr, size_t errstr_size) {
1545 int ival;
1546
1547 switch (prop->type)
1548 {
1549 case _RK_C_STR:
1550 case _RK_C_KSTR:
1551 if (prop->s2i[0].str) {
1552 int match;
1553
1554 if (!value ||
1555 (match = rd_kafka_conf_s2i_find(prop, value)) == -1){
1556 rd_snprintf(errstr, errstr_size,
1557 "Invalid value for "
1558 "configuration property \"%s\": "
1559 "%s",
1560 prop->name, value);
1561 return RD_KAFKA_CONF_INVALID;
1562 }
1563
1564 /* Replace value string with canonical form */
1565 value = prop->s2i[match].str;
1566 }
1567 /* FALLTHRU */
1568 case _RK_C_PATLIST:
1569 if (prop->validate &&
1570 (!value || !prop->validate(prop, value, -1))) {
1571 rd_snprintf(errstr, errstr_size,
1572 "Invalid value for "
1573 "configuration property \"%s\": %s",
1574 prop->name, value);
1575 return RD_KAFKA_CONF_INVALID;
1576 }
1577
1578 return rd_kafka_anyconf_set_prop0(scope, conf, prop, value, 0,
1579 _RK_CONF_PROP_SET_REPLACE,
1580 errstr, errstr_size);
1581
1582 case _RK_C_PTR:
1583 /* Allow hidden internal unit test properties to
1584 * be set from generic conf_set() interface. */
1585 if (!allow_specific && !(prop->scope & _RK_HIDDEN)) {
1586 rd_snprintf(errstr, errstr_size,
1587 "Property \"%s\" must be set through "
1588 "dedicated .._set_..() function",
1589 prop->name);
1590 return RD_KAFKA_CONF_INVALID;
1591 }
1592 return rd_kafka_anyconf_set_prop0(scope, conf, prop, value, 0,
1593 _RK_CONF_PROP_SET_REPLACE,
1594 errstr, errstr_size);
1595
1596 case _RK_C_BOOL:
1597 if (!value) {
1598 rd_snprintf(errstr, errstr_size,
1599 "Bool configuration property \"%s\" cannot "
1600 "be set to empty value", prop->name);
1601 return RD_KAFKA_CONF_INVALID;
1602 }
1603
1604
1605 if (!rd_strcasecmp(value, "true") ||
1606 !rd_strcasecmp(value, "t") ||
1607 !strcmp(value, "1"))
1608 ival = 1;
1609 else if (!rd_strcasecmp(value, "false") ||
1610 !rd_strcasecmp(value, "f") ||
1611 !strcmp(value, "0"))
1612 ival = 0;
1613 else {
1614 rd_snprintf(errstr, errstr_size,
1615 "Expected bool value for \"%s\": "
1616 "true or false", prop->name);
1617 return RD_KAFKA_CONF_INVALID;
1618 }
1619
1620 rd_kafka_anyconf_set_prop0(scope, conf, prop, value, ival,
1621 _RK_CONF_PROP_SET_REPLACE,
1622 errstr, errstr_size);
1623 return RD_KAFKA_CONF_OK;
1624
1625 case _RK_C_INT:
1626 {
1627 const char *end;
1628
1629 if (!value) {
1630 rd_snprintf(errstr, errstr_size,
1631 "Integer configuration "
1632 "property \"%s\" cannot be set "
1633 "to empty value", prop->name);
1634 return RD_KAFKA_CONF_INVALID;
1635 }
1636
1637 ival = (int)strtol(value, (char **)&end, 0);
1638 if (end == value) {
1639 /* Non numeric, check s2i for string mapping */
1640 int match = rd_kafka_conf_s2i_find(prop, value);
1641
1642 if (match == -1) {
1643 rd_snprintf(errstr, errstr_size,
1644 "Invalid value for "
1645 "configuration property \"%s\"",
1646 prop->name);
1647 return RD_KAFKA_CONF_INVALID;
1648 }
1649
1650 ival = prop->s2i[match].val;
1651 }
1652
1653 if (ival < prop->vmin ||
1654 ival > prop->vmax) {
1655 rd_snprintf(errstr, errstr_size,
1656 "Configuration property \"%s\" value "
1657 "%i is outside allowed range %i..%i\n",
1658 prop->name, ival,
1659 prop->vmin,
1660 prop->vmax);
1661 return RD_KAFKA_CONF_INVALID;
1662 }
1663
1664 rd_kafka_anyconf_set_prop0(scope, conf, prop, value, ival,
1665 _RK_CONF_PROP_SET_REPLACE,
1666 errstr, errstr_size);
1667 return RD_KAFKA_CONF_OK;
1668 }
1669
1670 case _RK_C_S2I:
1671 case _RK_C_S2F:
1672 {
1673 int j;
1674 const char *next;
1675
1676 if (!value) {
1677 rd_snprintf(errstr, errstr_size,
1678 "Configuration "
1679 "property \"%s\" cannot be set "
1680 "to empty value", prop->name);
1681 return RD_KAFKA_CONF_INVALID;
1682 }
1683
1684 next = value;
1685 while (next && *next) {
1686 const char *s, *t;
1687 rd_kafka_conf_set_mode_t set_mode = _RK_CONF_PROP_SET_ADD; /* S2F */
1688
1689 s = next;
1690
1691 if (prop->type == _RK_C_S2F &&
1692 (t = strchr(s, ','))) {
1693 /* CSV flag field */
1694 next = t+1;
1695 } else {
1696 /* Single string */
1697 t = s+strlen(s);
1698 next = NULL;
1699 }
1700
1701
1702 /* Left trim */
1703 while (s < t && isspace((int)*s))
1704 s++;
1705
                        /* Right trim: \p t points just past the token,
                         * so inspect the preceding character. */
                        while (t > s && isspace((int)*(t-1)))
                                t--;
1709
1710 /* S2F: +/- prefix */
1711 if (prop->type == _RK_C_S2F) {
1712 if (*s == '+') {
1713 set_mode = _RK_CONF_PROP_SET_ADD;
1714 s++;
1715 } else if (*s == '-') {
1716 set_mode = _RK_CONF_PROP_SET_DEL;
1717 s++;
1718 }
1719 }
1720
1721 /* Empty string? */
1722 if (s == t)
1723 continue;
1724
1725 /* Match string to s2i table entry */
1726 for (j = 0 ; j < (int)RD_ARRAYSIZE(prop->s2i); j++) {
1727 int new_val;
1728
1729 if (!prop->s2i[j].str)
1730 continue;
1731
1732 if (strlen(prop->s2i[j].str) == (size_t)(t-s) &&
1733 !rd_strncasecmp(prop->s2i[j].str, s,
1734 (int)(t-s)))
1735 new_val = prop->s2i[j].val;
1736 else
1737 continue;
1738
1739 rd_kafka_anyconf_set_prop0(scope, conf, prop,
1740 value, new_val,
1741 set_mode,
1742 errstr, errstr_size);
1743
1744 if (prop->type == _RK_C_S2F) {
1745 /* Flags: OR it in: do next */
1746 break;
1747 } else {
1748 /* Single assignment */
1749 return RD_KAFKA_CONF_OK;
1750 }
1751 }
1752
1753 /* S2F: Good match: continue with next */
1754 if (j < (int)RD_ARRAYSIZE(prop->s2i))
1755 continue;
1756
1757 /* No match */
1758 rd_snprintf(errstr, errstr_size,
1759 "Invalid value \"%.*s\" for "
1760 "configuration property \"%s\"",
1761 (int)(t-s), s, prop->name);
1762 return RD_KAFKA_CONF_INVALID;
1763
1764 }
1765 return RD_KAFKA_CONF_OK;
1766 }
1767
1768 case _RK_C_INTERNAL:
1769 rd_snprintf(errstr, errstr_size,
1770 "Internal property \"%s\" not settable",
1771 prop->name);
1772 return RD_KAFKA_CONF_INVALID;
1773
1774 case _RK_C_INVALID:
1775 rd_snprintf(errstr, errstr_size, "%s", prop->desc);
1776 return RD_KAFKA_CONF_INVALID;
1777
1778 default:
1779 rd_kafka_assert(NULL, !*"unknown conf type");
1780 }
1781
1782 /* not reachable */
1783 return RD_KAFKA_CONF_INVALID;
1784}
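
/*
 * Example (illustrative sketch, assuming an existing rd_kafka_conf_t
 * *conf): S2F properties take a CSV of flag names; per the parser above,
 * each token may carry a '+' (add) or '-' (remove) prefix, and unprefixed
 * tokens are OR:ed in. "debug" is such an S2F property:
 *
 *   char errstr[256];
 *   rd_kafka_conf_set(conf, "debug", "broker,topic",
 *                     errstr, sizeof(errstr));
 *   // Later, drop one context and add another:
 *   rd_kafka_conf_set(conf, "debug", "-broker,+msg",
 *                     errstr, sizeof(errstr));
 */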
1785
1786
1787
1788static void rd_kafka_defaultconf_set (int scope, void *conf) {
1789 const struct rd_kafka_property *prop;
1790
1791 for (prop = rd_kafka_properties ; prop->name ; prop++) {
1792 if (!(prop->scope & scope))
1793 continue;
1794
1795 if (prop->type == _RK_C_ALIAS || prop->type == _RK_C_INVALID)
1796 continue;
1797
1798 if (prop->ctor)
1799 prop->ctor(scope, conf);
1800
1801 if (prop->sdef || prop->vdef || prop->pdef)
1802 rd_kafka_anyconf_set_prop0(scope, conf, prop,
1803 prop->sdef ?
1804 prop->sdef : prop->pdef,
1805 prop->vdef,
1806 _RK_CONF_PROP_SET_REPLACE,
1807 NULL, 0);
1808 }
1809}
1810
1811rd_kafka_conf_t *rd_kafka_conf_new (void) {
1812 rd_kafka_conf_t *conf = rd_calloc(1, sizeof(*conf));
1813 rd_kafka_defaultconf_set(_RK_GLOBAL, conf);
1814 rd_kafka_anyconf_clear_all_is_modified(conf);
1815 return conf;
1816}
1817
1818rd_kafka_topic_conf_t *rd_kafka_topic_conf_new (void) {
1819 rd_kafka_topic_conf_t *tconf = rd_calloc(1, sizeof(*tconf));
1820 rd_kafka_defaultconf_set(_RK_TOPIC, tconf);
1821 rd_kafka_anyconf_clear_all_is_modified(tconf);
1822 return tconf;
1823}
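
/*
 * Example (illustrative sketch): typical life cycle of a configuration
 * object created with the constructors above:
 *
 *   char errstr[512];
 *   rd_kafka_conf_t *conf = rd_kafka_conf_new();
 *
 *   if (rd_kafka_conf_set(conf, "client.id", "example",
 *                         errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
 *           fprintf(stderr, "%s\n", errstr);
 *
 *   // Ownership of conf passes to rd_kafka_new() on success;
 *   // only destroy it if it was never handed to a client instance:
 *   rd_kafka_conf_destroy(conf);
 */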
1824
1825
1826static int rd_kafka_anyconf_set (int scope, void *conf,
1827 const char *name, const char *value,
1828 char *errstr, size_t errstr_size) {
1829 char estmp[1];
1830 const struct rd_kafka_property *prop;
1831 rd_kafka_conf_res_t res;
1832
1833 if (!errstr) {
1834 errstr = estmp;
1835 errstr_size = 0;
1836 }
1837
1838 if (value && !*value)
1839 value = NULL;
1840
1841 /* Try interceptors first (only for GLOBAL config for now) */
1842 if (scope & _RK_GLOBAL) {
1843 res = rd_kafka_interceptors_on_conf_set(
1844 (rd_kafka_conf_t *)conf, name, value,
1845 errstr, errstr_size);
1846 /* Handled (successfully or not) by interceptor. */
1847 if (res != RD_KAFKA_CONF_UNKNOWN)
1848 return res;
1849 }
1850
        /* Then the registered properties for this scope. */
1852
1853
1854 for (prop = rd_kafka_properties ; prop->name ; prop++) {
1855
1856 if (!(prop->scope & scope))
1857 continue;
1858
1859 if (strcmp(prop->name, name))
1860 continue;
1861
1862 if (prop->type == _RK_C_ALIAS)
1863 return rd_kafka_anyconf_set(scope, conf,
1864 prop->sdef, value,
1865 errstr, errstr_size);
1866
1867 return rd_kafka_anyconf_set_prop(scope, conf, prop, value,
1868 0/*don't allow specifics*/,
1869 errstr, errstr_size);
1870 }
1871
1872 rd_snprintf(errstr, errstr_size,
1873 "No such configuration property: \"%s\"", name);
1874
1875 return RD_KAFKA_CONF_UNKNOWN;
1876}
1877
1878
1879/**
1880 * @brief Set a rd_kafka_*_conf_set_...() specific property, such as
1881 * rd_kafka_conf_set_error_cb().
1882 *
1883 * @warning Will not call interceptor's on_conf_set.
1884 * @warning Asserts if \p name is not known or value is incorrect.
1885 *
1886 * Implemented as a macro to have rd_assert() print the original function.
1887 */
1888
1889#define rd_kafka_anyconf_set_internal(SCOPE,CONF,NAME,VALUE) do { \
1890 const struct rd_kafka_property *_prop; \
1891 rd_kafka_conf_res_t _res; \
1892 _prop = rd_kafka_conf_prop_find(SCOPE, NAME); \
1893 rd_assert(_prop && *"invalid property name"); \
1894 _res = rd_kafka_anyconf_set_prop(SCOPE, CONF, _prop, \
1895 (const void *)VALUE, \
1896 1/*allow-specifics*/, \
1897 NULL, 0); \
1898 rd_assert(_res == RD_KAFKA_CONF_OK); \
1899 } while (0)
1900
1901
1902rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf,
1903 const char *name,
1904 const char *value,
1905 char *errstr, size_t errstr_size) {
1906 rd_kafka_conf_res_t res;
1907
1908 res = rd_kafka_anyconf_set(_RK_GLOBAL, conf, name, value,
1909 errstr, errstr_size);
1910 if (res != RD_KAFKA_CONF_UNKNOWN)
1911 return res;
1912
1913 /* Fallthru:
1914 * If the global property was unknown, try setting it on the
1915 * default topic config. */
1916 if (!conf->topic_conf) {
1917 /* Create topic config, might be over-written by application
1918 * later. */
1919 rd_kafka_conf_set_default_topic_conf(conf,
1920 rd_kafka_topic_conf_new());
1921 }
1922
1923 return rd_kafka_topic_conf_set(conf->topic_conf, name, value,
1924 errstr, errstr_size);
1925}
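
/*
 * Example (illustrative sketch, assuming an existing rd_kafka_conf_t
 * *conf): thanks to the fallthru above, topic-level properties such as
 * "acks" may be set directly on the global object and end up on the
 * default topic configuration:
 *
 *   char errstr[512];
 *   rd_kafka_conf_set(conf, "acks", "all", errstr, sizeof(errstr));
 */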
1926
1927
1928rd_kafka_conf_res_t rd_kafka_topic_conf_set (rd_kafka_topic_conf_t *conf,
1929 const char *name,
1930 const char *value,
1931 char *errstr, size_t errstr_size) {
1932 if (!strncmp(name, "topic.", strlen("topic.")))
1933 name += strlen("topic.");
1934
1935 return rd_kafka_anyconf_set(_RK_TOPIC, conf, name, value,
1936 errstr, errstr_size);
1937}
1938
1939
/**
 * @brief Securely overwrite (zero) the contents of \p str up until but
 *        not including the nul-term, then mark the string as
 *        "(REDACTED)" if it is long enough to hold the marker.
 */
1944void rd_kafka_desensitize_str (char *str) {
1945 size_t len;
1946 static const char redacted[] = "(REDACTED)";
1947
1948#ifdef _MSC_VER
1949 len = strlen(str);
1950 SecureZeroMemory(str, len);
1951#else
1952 volatile char *volatile s;
1953
1954 for (s = str ; *s ; s++)
1955 *s = '\0';
1956
1957 len = (size_t)(s - str);
1958#endif
1959
1960 if (len > sizeof(redacted))
1961 memcpy(str, redacted, sizeof(redacted));
1962}
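
/*
 * Example (illustrative sketch): the buffer is zeroed in place and, if
 * large enough, left with a human-readable marker:
 *
 *   char secret[] = "my-top-secret-password";
 *   rd_kafka_desensitize_str(secret);
 *   // secret now reads "(REDACTED)", remaining bytes are zeroed.
 */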
1963
1964
1965
1966
1967/**
1968 * @brief Overwrite the value of \p prop, if sensitive.
1969 */
1970static RD_INLINE void
1971rd_kafka_anyconf_prop_desensitize (int scope, void *conf,
1972 const struct rd_kafka_property *prop) {
1973 if (likely(!(prop->scope & _RK_SENSITIVE)))
1974 return;
1975
1976 switch (prop->type)
1977 {
1978 case _RK_C_STR:
1979 {
1980 char **str = _RK_PTR(char **, conf, prop->offset);
1981 if (*str)
1982 rd_kafka_desensitize_str(*str);
1983 break;
1984 }
1985
1986 default:
1987 rd_assert(!*"BUG: Don't know how to desensitize prop type");
1988 break;
1989 }
1990}
1991
1992
1993/**
1994 * @brief Desensitize all sensitive properties in \p conf
1995 */
1996static void rd_kafka_anyconf_desensitize (int scope, void *conf) {
1997 const struct rd_kafka_property *prop;
1998
1999 for (prop = rd_kafka_properties; prop->name ; prop++) {
2000 if (!(prop->scope & scope))
2001 continue;
2002
2003 rd_kafka_anyconf_prop_desensitize(scope, conf, prop);
2004 }
2005}
2006
2007/**
2008 * @brief Overwrite the values of sensitive properties
2009 */
2010void rd_kafka_conf_desensitize (rd_kafka_conf_t *conf) {
2011 if (conf->topic_conf)
2012 rd_kafka_anyconf_desensitize(_RK_TOPIC,
2013 conf->topic_conf);
2014 rd_kafka_anyconf_desensitize(_RK_GLOBAL, conf);
2015}
2016
2017/**
2018 * @brief Overwrite the values of sensitive properties
2019 */
2020void rd_kafka_topic_conf_desensitize (rd_kafka_topic_conf_t *tconf) {
2021 rd_kafka_anyconf_desensitize(_RK_TOPIC, tconf);
2022}
2023
2024
2025static void rd_kafka_anyconf_clear (int scope, void *conf,
2026 const struct rd_kafka_property *prop) {
2027
2028 rd_kafka_anyconf_prop_desensitize(scope, conf, prop);
2029
2030 switch (prop->type)
2031 {
2032 case _RK_C_STR:
2033 {
2034 char **str = _RK_PTR(char **, conf, prop->offset);
2035
2036 if (*str) {
2037 if (prop->set) {
2038 prop->set(scope, conf, prop->name, NULL, *str,
2039 _RK_CONF_PROP_SET_DEL, NULL, 0);
2040 /* FALLTHRU */
2041 }
2042 rd_free(*str);
2043 *str = NULL;
2044 }
2045 }
2046 break;
2047
2048 case _RK_C_KSTR:
2049 {
2050 rd_kafkap_str_t **kstr = _RK_PTR(rd_kafkap_str_t **, conf,
2051 prop->offset);
2052 if (*kstr) {
2053 rd_kafkap_str_destroy(*kstr);
2054 *kstr = NULL;
2055 }
2056 }
2057 break;
2058
2059 case _RK_C_PATLIST:
2060 {
2061 rd_kafka_pattern_list_t **plist;
2062 plist = _RK_PTR(rd_kafka_pattern_list_t **, conf, prop->offset);
2063 if (*plist) {
2064 rd_kafka_pattern_list_destroy(*plist);
2065 *plist = NULL;
2066 }
2067 }
2068 break;
2069
2070 case _RK_C_PTR:
2071 if (_RK_PTR(void *, conf, prop->offset) != NULL) {
2072 if (!strcmp(prop->name, "default_topic_conf")) {
2073 rd_kafka_topic_conf_t **tconf;
2074
2075 tconf = _RK_PTR(rd_kafka_topic_conf_t **,
2076 conf, prop->offset);
2077 if (*tconf) {
2078 rd_kafka_topic_conf_destroy(*tconf);
2079 *tconf = NULL;
2080 }
2081 }
2082 }
2083 break;
2084
2085 default:
2086 break;
2087 }
2088
2089 if (prop->dtor)
2090 prop->dtor(scope, conf);
2091
2092}
2093
2094void rd_kafka_anyconf_destroy (int scope, void *conf) {
2095 const struct rd_kafka_property *prop;
2096
2097 /* Call on_conf_destroy() interceptors */
2098 if (scope == _RK_GLOBAL)
2099 rd_kafka_interceptors_on_conf_destroy(conf);
2100
2101 for (prop = rd_kafka_properties; prop->name ; prop++) {
2102 if (!(prop->scope & scope))
2103 continue;
2104
2105 rd_kafka_anyconf_clear(scope, conf, prop);
2106 }
2107}
2108
2109
2110void rd_kafka_conf_destroy (rd_kafka_conf_t *conf) {
2111 rd_kafka_anyconf_destroy(_RK_GLOBAL, conf);
        /* FIXME: partition_assignors */
2113 rd_free(conf);
2114}
2115
2116void rd_kafka_topic_conf_destroy (rd_kafka_topic_conf_t *topic_conf) {
2117 rd_kafka_anyconf_destroy(_RK_TOPIC, topic_conf);
2118 rd_free(topic_conf);
2119}
2120
2121
2122
2123static void rd_kafka_anyconf_copy (int scope, void *dst, const void *src,
2124 size_t filter_cnt, const char **filter) {
2125 const struct rd_kafka_property *prop;
2126
2127 for (prop = rd_kafka_properties ; prop->name ; prop++) {
2128 const char *val = NULL;
2129 int ival = 0;
2130 char *valstr;
2131 size_t valsz;
2132 size_t fi;
2133 size_t nlen;
2134
2135 if (!(prop->scope & scope))
2136 continue;
2137
2138 if (prop->type == _RK_C_ALIAS || prop->type == _RK_C_INVALID)
2139 continue;
2140
2141 /* Skip properties that have not been set,
2142 * unless it is an internal one which requires
2143 * extra logic, such as the interceptors. */
2144 if (!rd_kafka_anyconf_is_modified(src, prop) &&
2145 prop->type != _RK_C_INTERNAL)
2146 continue;
2147
2148 /* Apply filter, if any. */
2149 nlen = strlen(prop->name);
2150 for (fi = 0 ; fi < filter_cnt ; fi++) {
2151 size_t flen = strlen(filter[fi]);
2152 if (nlen >= flen &&
2153 !strncmp(filter[fi], prop->name, flen))
2154 break;
2155 }
2156 if (fi < filter_cnt)
2157 continue; /* Filter matched */
2158
2159 switch (prop->type)
2160 {
2161 case _RK_C_STR:
2162 case _RK_C_PTR:
2163 val = *_RK_PTR(const char **, src, prop->offset);
2164
2165 if (!strcmp(prop->name, "default_topic_conf") && val)
2166 val = (void *)rd_kafka_topic_conf_dup(
2167 (const rd_kafka_topic_conf_t *)
2168 (void *)val);
2169 break;
2170 case _RK_C_KSTR:
2171 {
2172 rd_kafkap_str_t **kstr = _RK_PTR(rd_kafkap_str_t **,
2173 src, prop->offset);
2174 if (*kstr)
2175 val = (*kstr)->str;
2176 break;
2177 }
2178
2179 case _RK_C_BOOL:
2180 case _RK_C_INT:
2181 case _RK_C_S2I:
2182 case _RK_C_S2F:
2183 ival = *_RK_PTR(const int *, src, prop->offset);
2184
2185 /* Get string representation of configuration value. */
2186 valsz = 0;
2187 rd_kafka_anyconf_get0(src, prop, NULL, &valsz);
2188 valstr = rd_alloca(valsz);
2189 rd_kafka_anyconf_get0(src, prop, valstr, &valsz);
2190 val = valstr;
2191 break;
2192 case _RK_C_PATLIST:
2193 {
2194 const rd_kafka_pattern_list_t **plist;
2195 plist = _RK_PTR(const rd_kafka_pattern_list_t **,
2196 src, prop->offset);
2197 if (*plist)
2198 val = (*plist)->rkpl_orig;
2199 break;
2200 }
2201 case _RK_C_INTERNAL:
2202 /* Handled by ->copy() below. */
2203 break;
2204 default:
2205 continue;
2206 }
2207
2208 if (prop->copy)
2209 prop->copy(scope, dst, src,
2210 _RK_PTR(void *, dst, prop->offset),
2211 _RK_PTR(const void *, src, prop->offset),
2212 filter_cnt, filter);
2213
2214 rd_kafka_anyconf_set_prop0(scope, dst, prop, val, ival,
2215 _RK_CONF_PROP_SET_REPLACE, NULL, 0);
2216 }
2217}
2218
2219
2220rd_kafka_conf_t *rd_kafka_conf_dup (const rd_kafka_conf_t *conf) {
2221 rd_kafka_conf_t *new = rd_kafka_conf_new();
2222
2223 rd_kafka_interceptors_on_conf_dup(new, conf, 0, NULL);
2224
2225 rd_kafka_anyconf_copy(_RK_GLOBAL, new, conf, 0, NULL);
2226
2227 return new;
2228}
2229
2230rd_kafka_conf_t *rd_kafka_conf_dup_filter (const rd_kafka_conf_t *conf,
2231 size_t filter_cnt,
2232 const char **filter) {
2233 rd_kafka_conf_t *new = rd_kafka_conf_new();
2234
2235 rd_kafka_interceptors_on_conf_dup(new, conf, filter_cnt, filter);
2236
2237 rd_kafka_anyconf_copy(_RK_GLOBAL, new, conf, filter_cnt, filter);
2238
2239 return new;
2240}
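
/*
 * Example (illustrative sketch, assuming an existing rd_kafka_conf_t
 * *conf): the filter entries are matched as name prefixes (see
 * rd_kafka_anyconf_copy() above), so an entire property namespace can
 * be excluded from the duplicate:
 *
 *   const char *filter[] = { "ssl.", "sasl." };
 *   rd_kafka_conf_t *copy =
 *           rd_kafka_conf_dup_filter(conf, RD_ARRAYSIZE(filter), filter);
 */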
2241
2242
2243rd_kafka_topic_conf_t *rd_kafka_topic_conf_dup (const rd_kafka_topic_conf_t
2244 *conf) {
2245 rd_kafka_topic_conf_t *new = rd_kafka_topic_conf_new();
2246
2247 rd_kafka_anyconf_copy(_RK_TOPIC, new, conf, 0, NULL);
2248
2249 return new;
2250}
2251
2252rd_kafka_topic_conf_t *rd_kafka_default_topic_conf_dup (rd_kafka_t *rk) {
2253 if (rk->rk_conf.topic_conf)
2254 return rd_kafka_topic_conf_dup(rk->rk_conf.topic_conf);
2255 else
2256 return rd_kafka_topic_conf_new();
2257}
2258
2259void rd_kafka_conf_set_events (rd_kafka_conf_t *conf, int events) {
2260 char tmp[32];
2261 rd_snprintf(tmp, sizeof(tmp), "%d", events);
2262 rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "enabled_events", tmp);
2263}
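
/*
 * Example (illustrative sketch, assuming an existing rd_kafka_conf_t
 * *conf): enable delivery-report and error events for the event-based
 * API using the public event type flags:
 *
 *   rd_kafka_conf_set_events(conf,
 *                            RD_KAFKA_EVENT_DR | RD_KAFKA_EVENT_ERROR);
 */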
2264
2265void
2266rd_kafka_conf_set_background_event_cb (rd_kafka_conf_t *conf,
2267 void (*event_cb) (rd_kafka_t *rk,
2268 rd_kafka_event_t *rkev,
2269 void *opaque)) {
2270 rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "background_event_cb",
2271 event_cb);
2272}
2273
2274
2275void rd_kafka_conf_set_dr_cb (rd_kafka_conf_t *conf,
2276 void (*dr_cb) (rd_kafka_t *rk,
2277 void *payload, size_t len,
2278 rd_kafka_resp_err_t err,
2279 void *opaque, void *msg_opaque)) {
2280 rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "dr_cb", dr_cb);
2281}
2282
2283
2284void rd_kafka_conf_set_dr_msg_cb (rd_kafka_conf_t *conf,
2285 void (*dr_msg_cb) (rd_kafka_t *rk,
2286 const rd_kafka_message_t *
2287 rkmessage,
2288 void *opaque)) {
2289 rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "dr_msg_cb", dr_msg_cb);
2290}
2291
2292
2293void rd_kafka_conf_set_consume_cb (rd_kafka_conf_t *conf,
2294 void (*consume_cb) (rd_kafka_message_t *
2295 rkmessage,
2296 void *opaque)) {
2297 rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "consume_cb",
2298 consume_cb);
2299}
2300
2301void rd_kafka_conf_set_rebalance_cb (
2302 rd_kafka_conf_t *conf,
2303 void (*rebalance_cb) (rd_kafka_t *rk,
2304 rd_kafka_resp_err_t err,
2305 rd_kafka_topic_partition_list_t *partitions,
2306 void *opaque)) {
2307 rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "rebalance_cb",
2308 rebalance_cb);
2309}
2310
2311void rd_kafka_conf_set_offset_commit_cb (
2312 rd_kafka_conf_t *conf,
2313 void (*offset_commit_cb) (rd_kafka_t *rk,
2314 rd_kafka_resp_err_t err,
2315 rd_kafka_topic_partition_list_t *offsets,
2316 void *opaque)) {
2317 rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "offset_commit_cb",
2318 offset_commit_cb);
2319}
2320
2321
2322
2323void rd_kafka_conf_set_error_cb (rd_kafka_conf_t *conf,
2324 void (*error_cb) (rd_kafka_t *rk, int err,
2325 const char *reason,
2326 void *opaque)) {
2327 rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "error_cb", error_cb);
2328}
2329
2330
2331void rd_kafka_conf_set_throttle_cb (rd_kafka_conf_t *conf,
2332 void (*throttle_cb) (
2333 rd_kafka_t *rk,
2334 const char *broker_name,
2335 int32_t broker_id,
2336 int throttle_time_ms,
2337 void *opaque)) {
2338 rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "throttle_cb",
2339 throttle_cb);
2340}
2341
2342
2343void rd_kafka_conf_set_log_cb (rd_kafka_conf_t *conf,
2344 void (*log_cb) (const rd_kafka_t *rk, int level,
2345 const char *fac, const char *buf)) {
2346 rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "log_cb", log_cb);
2347}
2348
2349
2350void rd_kafka_conf_set_stats_cb (rd_kafka_conf_t *conf,
2351 int (*stats_cb) (rd_kafka_t *rk,
2352 char *json,
2353 size_t json_len,
2354 void *opaque)) {
2355 rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "stats_cb", stats_cb);
2356}
2357
2358void rd_kafka_conf_set_oauthbearer_token_refresh_cb(rd_kafka_conf_t *conf,
2359 void (*oauthbearer_token_refresh_cb) (
2360 rd_kafka_t *rk,
2361 const char *oauthbearer_config,
2362 void *opaque)) {
2363#if WITH_SASL_OAUTHBEARER
2364 rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf,
2365 "oauthbearer_token_refresh_cb", oauthbearer_token_refresh_cb);
2366#endif
2367}
2368
2369void rd_kafka_conf_set_socket_cb (rd_kafka_conf_t *conf,
2370 int (*socket_cb) (int domain, int type,
2371 int protocol,
2372 void *opaque)) {
2373 rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "socket_cb",
2374 socket_cb);
2375}
2376
2377void
2378rd_kafka_conf_set_connect_cb (rd_kafka_conf_t *conf,
2379 int (*connect_cb) (int sockfd,
2380 const struct sockaddr *addr,
2381 int addrlen,
2382 const char *id,
2383 void *opaque)) {
2384 rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "connect_cb",
2385 connect_cb);
2386}
2387
2388void
2389rd_kafka_conf_set_closesocket_cb (rd_kafka_conf_t *conf,
2390 int (*closesocket_cb) (int sockfd,
2391 void *opaque)) {
2392 rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "closesocket_cb",
2393 closesocket_cb);
2394}
2395
2396
2397
2398#ifndef _MSC_VER
2399void rd_kafka_conf_set_open_cb (rd_kafka_conf_t *conf,
2400 int (*open_cb) (const char *pathname,
2401 int flags, mode_t mode,
2402 void *opaque)) {
2403 rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "open_cb", open_cb);
2404}
2405#endif
2406
2407
2408rd_kafka_conf_res_t
2409rd_kafka_conf_set_ssl_cert_verify_cb (
2410 rd_kafka_conf_t *conf,
2411 int (*ssl_cert_verify_cb) (rd_kafka_t *rk,
2412 const char *broker_name,
2413 int32_t broker_id,
2414 int *x509_set_error,
2415 int depth,
2416 const char *buf, size_t size,
2417 char *errstr, size_t errstr_size,
2418 void *opaque)) {
2419#if !WITH_SSL
2420 return RD_KAFKA_CONF_INVALID;
2421#else
2422 rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf,
2423 "ssl.certificate.verify_cb",
2424 ssl_cert_verify_cb);
2425 return RD_KAFKA_CONF_OK;
2426#endif
2427}
2428
2429
2430void rd_kafka_conf_set_opaque (rd_kafka_conf_t *conf, void *opaque) {
2431 rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "opaque", opaque);
2432}
2433
2434
2435void rd_kafka_conf_set_default_topic_conf (rd_kafka_conf_t *conf,
2436 rd_kafka_topic_conf_t *tconf) {
2437 if (conf->topic_conf)
2438 rd_kafka_topic_conf_destroy(conf->topic_conf);
2439
2440 rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "default_topic_conf",
2441 tconf);
2442}
2443
2444
2445void
2446rd_kafka_topic_conf_set_partitioner_cb (rd_kafka_topic_conf_t *topic_conf,
2447 int32_t (*partitioner) (
2448 const rd_kafka_topic_t *rkt,
2449 const void *keydata,
2450 size_t keylen,
2451 int32_t partition_cnt,
2452 void *rkt_opaque,
2453 void *msg_opaque)) {
2454 rd_kafka_anyconf_set_internal(_RK_TOPIC, topic_conf, "partitioner_cb",
2455 partitioner);
2456}
2457
2458void
2459rd_kafka_topic_conf_set_msg_order_cmp (rd_kafka_topic_conf_t *topic_conf,
2460 int (*msg_order_cmp) (
2461 const rd_kafka_message_t *a,
2462 const rd_kafka_message_t *b)) {
2463 rd_kafka_anyconf_set_internal(_RK_TOPIC, topic_conf, "msg_order_cmp",
2464 msg_order_cmp);
2465}
2466
2467void rd_kafka_topic_conf_set_opaque (rd_kafka_topic_conf_t *topic_conf,
2468 void *opaque) {
2469 rd_kafka_anyconf_set_internal(_RK_TOPIC, topic_conf, "opaque", opaque);
2470}
2471
2472
2473
2474
2475/**
2476 * @brief Convert flags \p ival to csv-string using S2F property \p prop.
2477 *
 * This function has two modes: size query and write.
 * To query the needed size, call with dest==NULL;
 * to write to a buffer of size dest_size, call with dest!=NULL.
2481 *
2482 * An \p ival of -1 means all.
2483 *
2484 * @returns the number of bytes written to \p dest (if not NULL), else the
2485 * total number of bytes needed.
2486 *
2487 */
2488size_t rd_kafka_conf_flags2str (char *dest, size_t dest_size, const char *delim,
2489 const struct rd_kafka_property *prop,
2490 int ival) {
2491 size_t of = 0;
2492 int j;
2493
2494 if (dest && dest_size > 0)
2495 *dest = '\0';
2496
2497 /* Phase 1: scan for set flags, accumulate needed size.
2498 * Phase 2: write to dest */
2499 for (j = 0 ; prop->s2i[j].str ; j++) {
2500 if (prop->type == _RK_C_S2F && ival != -1 &&
2501 (ival & prop->s2i[j].val) != prop->s2i[j].val)
2502 continue;
2503 else if (prop->type == _RK_C_S2I &&
2504 ival != -1 && prop->s2i[j].val != ival)
2505 continue;
2506
2507 if (!dest)
2508 of += strlen(prop->s2i[j].str) + (of > 0 ? 1 : 0);
2509 else {
2510 size_t r;
2511 r = rd_snprintf(dest+of, dest_size-of,
2512 "%s%s",
2513 of > 0 ? delim:"",
2514 prop->s2i[j].str);
2515 if (r > dest_size-of) {
2516 r = dest_size-of;
2517 break;
2518 }
2519 of += r;
2520 }
2521 }
2522
2523 return of+1/*nul*/;
2524}
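
/*
 * Example (illustrative sketch, assuming a valid \p prop and \p ival):
 * the two-phase size-query/write pattern, as used indirectly via
 * rd_kafka_anyconf_get0() in rd_kafka_anyconf_copy():
 *
 *   size_t size = rd_kafka_conf_flags2str(NULL, 0, ",", prop, ival);
 *   char *buf = rd_alloca(size);
 *   rd_kafka_conf_flags2str(buf, size, ",", prop, ival);
 */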
2525
2526
/**
 * @brief Render the "original" (re-created) configuration value string
 *        for \p prop into \p dest, or query the needed size when
 *        \p dest is NULL.
 */
2530static rd_kafka_conf_res_t
2531rd_kafka_anyconf_get0 (const void *conf, const struct rd_kafka_property *prop,
2532 char *dest, size_t *dest_size) {
2533 char tmp[22];
2534 const char *val = NULL;
2535 size_t val_len = 0;
2536 int j;
2537
2538 switch (prop->type)
2539 {
2540 case _RK_C_STR:
2541 val = *_RK_PTR(const char **, conf, prop->offset);
2542 break;
2543
2544 case _RK_C_KSTR:
2545 {
2546 const rd_kafkap_str_t **kstr = _RK_PTR(const rd_kafkap_str_t **,
2547 conf, prop->offset);
2548 if (*kstr)
2549 val = (*kstr)->str;
2550 break;
2551 }
2552
2553 case _RK_C_PTR:
2554 val = *_RK_PTR(const void **, conf, prop->offset);
2555 if (val) {
2556 rd_snprintf(tmp, sizeof(tmp), "%p", (void *)val);
2557 val = tmp;
2558 }
2559 break;
2560
2561 case _RK_C_BOOL:
2562 val = (*_RK_PTR(int *, conf, prop->offset) ? "true" : "false");
2563 break;
2564
2565 case _RK_C_INT:
2566 rd_snprintf(tmp, sizeof(tmp), "%i",
2567 *_RK_PTR(int *, conf, prop->offset));
2568 val = tmp;
2569 break;
2570
2571 case _RK_C_S2I:
2572 for (j = 0 ; j < (int)RD_ARRAYSIZE(prop->s2i); j++) {
2573 if (prop->s2i[j].val ==
2574 *_RK_PTR(int *, conf, prop->offset)) {
2575 val = prop->s2i[j].str;
2576 break;
2577 }
2578 }
2579 break;
2580
2581 case _RK_C_S2F:
2582 {
2583 const int ival = *_RK_PTR(const int *, conf, prop->offset);
2584
2585 val_len = rd_kafka_conf_flags2str(dest,
2586 dest ? *dest_size : 0, ",",
2587 prop, ival);
2588 if (dest) {
2589 val_len = 0;
2590 val = dest;
2591 dest = NULL;
2592 }
2593 break;
2594 }
2595
2596 case _RK_C_PATLIST:
2597 {
2598 const rd_kafka_pattern_list_t **plist;
2599 plist = _RK_PTR(const rd_kafka_pattern_list_t **,
2600 conf, prop->offset);
2601 if (*plist)
2602 val = (*plist)->rkpl_orig;
2603 break;
2604 }
2605
2606 default:
2607 break;
2608 }
2609
2610 if (val_len) {
2611 *dest_size = val_len+1;
2612 return RD_KAFKA_CONF_OK;
2613 }
2614
2615 if (!val)
2616 return RD_KAFKA_CONF_INVALID;
2617
2618 val_len = strlen(val);
2619
2620 if (dest) {
2621 size_t use_len = RD_MIN(val_len, (*dest_size)-1);
2622 memcpy(dest, val, use_len);
2623 dest[use_len] = '\0';
2624 }
2625
2626 /* Return needed size */
2627 *dest_size = val_len+1;
2628
2629 return RD_KAFKA_CONF_OK;
2630}
2631
2632
2633static rd_kafka_conf_res_t rd_kafka_anyconf_get (int scope, const void *conf,
2634 const char *name,
2635 char *dest, size_t *dest_size){
2636 const struct rd_kafka_property *prop;
2637
2638 for (prop = rd_kafka_properties; prop->name ; prop++) {
2639
2640 if (!(prop->scope & scope) || strcmp(prop->name, name))
2641 continue;
2642
2643 if (prop->type == _RK_C_ALIAS)
2644 return rd_kafka_anyconf_get(scope, conf,
2645 prop->sdef,
2646 dest, dest_size);
2647
2648 if (rd_kafka_anyconf_get0(conf, prop, dest, dest_size) ==
2649 RD_KAFKA_CONF_OK)
2650 return RD_KAFKA_CONF_OK;
2651 }
2652
2653 return RD_KAFKA_CONF_UNKNOWN;
2654}
2655
2656rd_kafka_conf_res_t rd_kafka_topic_conf_get (const rd_kafka_topic_conf_t *conf,
2657 const char *name,
2658 char *dest, size_t *dest_size) {
2659 return rd_kafka_anyconf_get(_RK_TOPIC, conf, name, dest, dest_size);
2660}
2661
2662rd_kafka_conf_res_t rd_kafka_conf_get (const rd_kafka_conf_t *conf,
2663 const char *name,
2664 char *dest, size_t *dest_size) {
2665 rd_kafka_conf_res_t res;
2666 res = rd_kafka_anyconf_get(_RK_GLOBAL, conf, name, dest, dest_size);
2667 if (res != RD_KAFKA_CONF_UNKNOWN || !conf->topic_conf)
2668 return res;
2669
2670 /* Fallthru:
2671 * If the global property was unknown, try getting it from the
2672 * default topic config, if any. */
2673 return rd_kafka_topic_conf_get(conf->topic_conf, name, dest, dest_size);
2674}
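
/*
 * Example (illustrative sketch, assuming an existing rd_kafka_conf_t
 * *conf): the canonical two-call pattern for reading a value of unknown
 * length through rd_kafka_conf_get():
 *
 *   size_t size;
 *   // First call with dest==NULL: query required size (incl. nul).
 *   if (rd_kafka_conf_get(conf, "client.id", NULL, &size) ==
 *       RD_KAFKA_CONF_OK) {
 *           char *val = rd_malloc(size);
 *           rd_kafka_conf_get(conf, "client.id", val, &size);
 *           // ... use val ...
 *           rd_free(val);
 *   }
 */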
2675
2676
2677static const char **rd_kafka_anyconf_dump (int scope, const void *conf,
2678 size_t *cntp) {
2679 const struct rd_kafka_property *prop;
2680 char **arr;
2681 int cnt = 0;
2682
        arr = rd_calloc(RD_ARRAYSIZE(rd_kafka_properties)*2, sizeof(char *));
2684
2685 for (prop = rd_kafka_properties; prop->name ; prop++) {
2686 char *val = NULL;
2687 size_t val_size;
2688
2689 if (!(prop->scope & scope))
2690 continue;
2691
2692 /* Skip aliases, show original property instead.
2693 * Skip invalids. */
2694 if (prop->type == _RK_C_ALIAS || prop->type == _RK_C_INVALID)
2695 continue;
2696
2697 /* Query value size */
2698 if (rd_kafka_anyconf_get0(conf, prop, NULL, &val_size) !=
2699 RD_KAFKA_CONF_OK)
2700 continue;
2701
                /* Get value (allocated with rd_malloc to match the
                 * rd_free() in rd_kafka_conf_dump_free()). */
                val = rd_malloc(val_size);
                rd_kafka_anyconf_get0(conf, prop, val, &val_size);
2705
2706 arr[cnt++] = rd_strdup(prop->name);
2707 arr[cnt++] = val;
2708 }
2709
2710 *cntp = cnt;
2711
2712 return (const char **)arr;
2713}
2714
2715
2716const char **rd_kafka_conf_dump (rd_kafka_conf_t *conf, size_t *cntp) {
2717 return rd_kafka_anyconf_dump(_RK_GLOBAL, conf, cntp);
2718}
2719
2720const char **rd_kafka_topic_conf_dump (rd_kafka_topic_conf_t *conf,
2721 size_t *cntp) {
2722 return rd_kafka_anyconf_dump(_RK_TOPIC, conf, cntp);
2723}
2724
2725void rd_kafka_conf_dump_free (const char **arr, size_t cnt) {
2726 char **_arr = (char **)arr;
2727 unsigned int i;
2728
2729 for (i = 0 ; i < cnt ; i++)
2730 if (_arr[i])
2731 rd_free(_arr[i]);
2732
2733 rd_free(_arr);
2734}
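
/*
 * Example (illustrative sketch, assuming an existing rd_kafka_conf_t
 * *conf): the dump array alternates between property names and values:
 *
 *   size_t cnt, i;
 *   const char **arr = rd_kafka_conf_dump(conf, &cnt);
 *   for (i = 0 ; i < cnt ; i += 2)
 *           printf("%s = %s\n", arr[i], arr[i+1]);
 *   rd_kafka_conf_dump_free(arr, cnt);
 */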
2735
2736void rd_kafka_conf_properties_show (FILE *fp) {
2737 const struct rd_kafka_property *prop0;
2738 int last = 0;
2739 int j;
2740 char tmp[512];
2741 const char *dash80 = "----------------------------------------"
2742 "----------------------------------------";
2743
2744 for (prop0 = rd_kafka_properties; prop0->name ; prop0++) {
2745 const char *typeinfo = "";
2746 const char *importance;
2747 const struct rd_kafka_property *prop = prop0;
2748
2749 /* Skip hidden properties. */
2750 if (prop->scope & _RK_HIDDEN)
2751 continue;
2752
2753 /* Skip invalid properties. */
2754 if (prop->type == _RK_C_INVALID)
2755 continue;
2756
2757 if (!(prop->scope & last)) {
2758 fprintf(fp,
2759 "%s## %s configuration properties\n\n",
2760 last ? "\n\n":"",
2761 prop->scope == _RK_GLOBAL ? "Global": "Topic");
2762
2763 fprintf(fp,
2764 "%-40s | %3s | %-15s | %13s | %-10s | %-25s\n"
2765 "%.*s-|-%.*s-|-%.*s-|-%.*s:|-%.*s-| -%.*s\n",
2766 "Property", "C/P", "Range",
2767 "Default", "Importance", "Description",
2768 40, dash80, 3, dash80, 15, dash80,
2769 13, dash80, 10, dash80, 25, dash80);
2770
2771 last = prop->scope & (_RK_GLOBAL|_RK_TOPIC);
2772
2773 }
2774
2775 fprintf(fp, "%-40s | ", prop->name);
2776
2777 /* For aliases, use the aliased property from here on
2778 * so that the alias property shows up with proper
2779 * ranges, defaults, etc. */
2780 if (prop->type == _RK_C_ALIAS) {
2781 prop = rd_kafka_conf_prop_find(prop->scope,
2782 prop->sdef);
2783 rd_assert(prop && *"BUG: "
2784 "alias points to unknown config property");
2785 }
2786
2787 fprintf(fp, "%3s | ",
2788 (!(prop->scope & _RK_PRODUCER) ==
2789 !(prop->scope & _RK_CONSUMER) ? " * " :
2790 ((prop->scope & _RK_PRODUCER) ? " P " :
2791 (prop->scope & _RK_CONSUMER) ? " C " : "")));
2792
2793 switch (prop->type)
2794 {
                case _RK_C_STR:
                case _RK_C_KSTR:
                        typeinfo = "string";
                        /* FALLTHRU */
                case _RK_C_PATLIST:
2799 if (prop->type == _RK_C_PATLIST)
2800 typeinfo = "pattern list";
2801 if (prop->s2i[0].str) {
2802 rd_kafka_conf_flags2str(tmp, sizeof(tmp), ", ",
2803 prop, -1);
2804 fprintf(fp, "%-15s | %13s",
2805 tmp, prop->sdef ? prop->sdef : "");
2806 } else {
2807 fprintf(fp, "%-15s | %13s",
2808 "", prop->sdef ? prop->sdef : "");
2809 }
2810 break;
2811 case _RK_C_BOOL:
2812 typeinfo = "boolean";
2813 fprintf(fp, "%-15s | %13s", "true, false",
2814 prop->vdef ? "true" : "false");
2815 break;
2816 case _RK_C_INT:
2817 typeinfo = "integer";
2818 rd_snprintf(tmp, sizeof(tmp),
2819 "%d .. %d", prop->vmin, prop->vmax);
2820 fprintf(fp, "%-15s | %13i", tmp, prop->vdef);
2821 break;
2822 case _RK_C_S2I:
2823 typeinfo = "enum value";
2824 rd_kafka_conf_flags2str(tmp, sizeof(tmp), ", ",
2825 prop, -1);
2826 fprintf(fp, "%-15s | ", tmp);
2827
2828 for (j = 0 ; j < (int)RD_ARRAYSIZE(prop->s2i); j++) {
2829 if (prop->s2i[j].val == prop->vdef) {
2830 fprintf(fp, "%13s", prop->s2i[j].str);
2831 break;
2832 }
2833 }
2834 if (j == RD_ARRAYSIZE(prop->s2i))
2835 fprintf(fp, "%13s", " ");
2836 break;
2837
2838 case _RK_C_S2F:
2839 typeinfo = "CSV flags";
                        /* Don't duplicate the builtin.features value in
                         * both Range and Default */
2842 if (!strcmp(prop->name, "builtin.features"))
2843 *tmp = '\0';
2844 else
2845 rd_kafka_conf_flags2str(tmp, sizeof(tmp), ", ",
2846 prop, -1);
2847 fprintf(fp, "%-15s | ", tmp);
2848 rd_kafka_conf_flags2str(tmp, sizeof(tmp), ", ",
2849 prop, prop->vdef);
2850 fprintf(fp, "%13s", tmp);
2851
2852 break;
2853 case _RK_C_PTR:
2854 typeinfo = "pointer";
2855 /* FALLTHRU */
2856 default:
2857 fprintf(fp, "%-15s | %-13s", "", " ");
2858 break;
2859 }
2860
2861 if (prop->scope & _RK_HIGH)
2862 importance = "high";
2863 else if (prop->scope & _RK_MED)
2864 importance = "medium";
2865 else
2866 importance = "low";
2867
2868 fprintf(fp, " | %-10s | ", importance);
2869
2870 if (prop->scope & _RK_EXPERIMENTAL)
2871 fprintf(fp, "**EXPERIMENTAL**: "
2872 "subject to change or removal. ");
2873
2874 if (prop->scope & _RK_DEPRECATED)
2875 fprintf(fp, "**DEPRECATED** ");
2876
2877 /* If the original property is an alias, prefix the
2878 * description saying so. */
2879 if (prop0->type == _RK_C_ALIAS)
2880 fprintf(fp, "Alias for `%s`: ", prop0->sdef);
2881
2882 fprintf(fp, "%s <br>*Type: %s*\n", prop->desc,
2883 typeinfo);
2884 }
2885 fprintf(fp, "\n");
2886 fprintf(fp, "### C/P legend: C = Consumer, P = Producer, * = both\n");
2887}
2888
2889
2890
2891
2892/**
2893 * @name Configuration value methods
2894 *
 * @remark This generic interface will eventually replace the config
 *         property interface used above.
2897 * @{
2898 */
2899
2900
2901/**
2902 * @brief Set up an INT confval.
2903 *
 * @param name Property name, must be a const static string (will not be copied)
2905 */
2906void rd_kafka_confval_init_int (rd_kafka_confval_t *confval,
2907 const char *name,
2908 int vmin, int vmax, int vdef) {
2909 confval->name = name;
2910 confval->is_enabled = 1;
2911 confval->valuetype = RD_KAFKA_CONFVAL_INT;
2912 confval->u.INT.vmin = vmin;
2913 confval->u.INT.vmax = vmax;
2914 confval->u.INT.vdef = vdef;
2915 confval->u.INT.v = vdef;
2916}
2917
2918/**
2919 * @brief Set up a PTR confval.
2920 *
 * @param name Property name, must be a const static string (will not be copied)
2922 */
2923void rd_kafka_confval_init_ptr (rd_kafka_confval_t *confval,
2924 const char *name) {
2925 confval->name = name;
2926 confval->is_enabled = 1;
2927 confval->valuetype = RD_KAFKA_CONFVAL_PTR;
2928 confval->u.PTR = NULL;
2929}
2930
2931/**
 * @brief Set up but disable an intval; attempts to set this confval will fail.
 *
 * @param name Property name, must be a const static string (will not be copied)
2935 */
2936void rd_kafka_confval_disable (rd_kafka_confval_t *confval, const char *name) {
2937 confval->name = name;
2938 confval->is_enabled = 0;
2939}
2940
2941/**
2942 * @brief Set confval's value to \p valuep, verifying the passed
2943 * \p valuetype matches (or can be cast to) \p confval's type.
2944 *
2945 * @param dispname is the display name for the configuration value and is
2946 * included in error strings.
2947 * @param valuep is a pointer to the value, or NULL to revert to default.
2948 *
2949 * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the new value was set, or
2950 * RD_KAFKA_RESP_ERR__INVALID_ARG if the value was of incorrect type,
2951 * out of range, or otherwise not a valid value.
2952 */
2953rd_kafka_resp_err_t
2954rd_kafka_confval_set_type (rd_kafka_confval_t *confval,
2955 rd_kafka_confval_type_t valuetype,
2956 const void *valuep,
2957 char *errstr, size_t errstr_size) {
2958
2959 if (!confval->is_enabled) {
2960 rd_snprintf(errstr, errstr_size,
2961 "\"%s\" is not supported for this operation",
2962 confval->name);
2963 return RD_KAFKA_RESP_ERR__INVALID_ARG;
2964 }
2965
2966 switch (confval->valuetype)
2967 {
2968 case RD_KAFKA_CONFVAL_INT:
2969 {
2970 int v;
2971 const char *end;
2972
2973 if (!valuep) {
2974 /* Revert to default */
2975 confval->u.INT.v = confval->u.INT.vdef;
2976 confval->is_set = 0;
2977 return RD_KAFKA_RESP_ERR_NO_ERROR;
2978 }
2979
2980 switch (valuetype)
2981 {
2982 case RD_KAFKA_CONFVAL_INT:
2983 v = *(const int *)valuep;
2984 break;
                case RD_KAFKA_CONFVAL_STR:
                        v = (int)strtol((const char *)valuep, (char **)&end, 0);
                        if (end == (const char *)valuep) {
                                rd_snprintf(errstr, errstr_size,
                                            "Invalid value type for \"%s\": "
                                            "expecting integer",
                                            confval->name);
                                return RD_KAFKA_RESP_ERR__INVALID_TYPE;
                        }
                        break;
                default:
2995 rd_snprintf(errstr, errstr_size,
2996 "Invalid value type for \"%s\": "
2997 "expecting integer", confval->name);
2998 return RD_KAFKA_RESP_ERR__INVALID_ARG;
2999 }
3000
3001
3002 if ((confval->u.INT.vmin || confval->u.INT.vmax) &&
3003 (v < confval->u.INT.vmin || v > confval->u.INT.vmax)) {
3004 rd_snprintf(errstr, errstr_size,
3005 "Invalid value type for \"%s\": "
3006 "expecting integer in range %d..%d",
3007 confval->name,
3008 confval->u.INT.vmin,
3009 confval->u.INT.vmax);
3010 return RD_KAFKA_RESP_ERR__INVALID_ARG;
3011 }
3012
3013 confval->u.INT.v = v;
3014 confval->is_set = 1;
3015 }
3016 break;
3017
3018 case RD_KAFKA_CONFVAL_STR:
3019 {
3020 size_t vlen;
3021 const char *v = (const char *)valuep;
3022
                if (!valuep) {
                        /* Revert to default (freeing any previous value
                         * to avoid leaking it). */
                        if (confval->u.STR.v)
                                rd_free(confval->u.STR.v);
                        if (confval->u.STR.vdef)
                                confval->u.STR.v =
                                        rd_strdup(confval->u.STR.vdef);
                        else
                                confval->u.STR.v = NULL;
                        confval->is_set = 0;
                        return RD_KAFKA_RESP_ERR_NO_ERROR;
                }
3031
3032 if (valuetype != RD_KAFKA_CONFVAL_STR) {
3033 rd_snprintf(errstr, errstr_size,
3034 "Invalid value type for \"%s\": "
3035 "expecting string", confval->name);
3036 return RD_KAFKA_RESP_ERR__INVALID_ARG;
3037 }
3038
3039 vlen = strlen(v);
3040 if ((confval->u.STR.minlen || confval->u.STR.maxlen) &&
3041 (vlen < confval->u.STR.minlen ||
3042 vlen > confval->u.STR.maxlen)) {
3043 rd_snprintf(errstr, errstr_size,
3044 "Invalid value for \"%s\": "
3045 "expecting string with length "
3046 "%"PRIusz"..%"PRIusz,
3047 confval->name,
3048 confval->u.STR.minlen,
3049 confval->u.STR.maxlen);
3050 return RD_KAFKA_RESP_ERR__INVALID_ARG;
3051 }
3052
                if (confval->u.STR.v)
                        rd_free(confval->u.STR.v);

                confval->u.STR.v = rd_strdup(v);
                confval->is_set = 1;
        }
3058 break;
3059
3060 case RD_KAFKA_CONFVAL_PTR:
3061 confval->u.PTR = (void *)valuep;
3062 break;
3063
3064 default:
3065 RD_NOTREACHED();
3066 return RD_KAFKA_RESP_ERR__NOENT;
3067 }
3068
3069 return RD_KAFKA_RESP_ERR_NO_ERROR;
3070}
3071
3072
3073int rd_kafka_confval_get_int (const rd_kafka_confval_t *confval) {
3074 rd_assert(confval->valuetype == RD_KAFKA_CONFVAL_INT);
3075 return confval->u.INT.v;
3076}
3077
3078
3079const char *rd_kafka_confval_get_str (const rd_kafka_confval_t *confval) {
3080 rd_assert(confval->valuetype == RD_KAFKA_CONFVAL_STR);
3081 return confval->u.STR.v;
3082}
3083
3084void *rd_kafka_confval_get_ptr (const rd_kafka_confval_t *confval) {
3085 rd_assert(confval->valuetype == RD_KAFKA_CONFVAL_PTR);
3086 return confval->u.PTR;
3087}
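
/*
 * Example (illustrative sketch; "timeout" is a hypothetical confval
 * name): a string value passed to rd_kafka_confval_set_type() on an INT
 * confval is converted with strtol() and range-checked:
 *
 *   rd_kafka_confval_t cv;
 *   char errstr[128];
 *   rd_kafka_confval_init_int(&cv, "timeout", 0, 60000, 5000);
 *
 *   rd_kafka_confval_set_type(&cv, RD_KAFKA_CONFVAL_STR, "10000",
 *                             errstr, sizeof(errstr));
 *   // rd_kafka_confval_get_int(&cv) now returns 10000.
 */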
3088
3089
3090/**
3091 * @brief Verify configuration \p conf is
3092 * correct/non-conflicting and finalize the configuration
3093 * settings for use.
3094 *
3095 * @returns an error string if configuration is incorrect, else NULL.
3096 */
3097const char *rd_kafka_conf_finalize (rd_kafka_type_t cltype,
3098 rd_kafka_conf_t *conf) {
3099
3100 /* Verify mandatory configuration */
3101 if (!conf->socket_cb)
3102 return "Mandatory config property `socket_cb` not set";
3103
3104 if (!conf->open_cb)
3105 return "Mandatory config property `open_cb` not set";
3106
3107#if WITH_SSL
3108 if (conf->ssl.keystore_location && !conf->ssl.keystore_password)
3109 return "`ssl.keystore.password` is mandatory when "
3110 "`ssl.keystore.location` is set";
3111 if (conf->ssl.ca && conf->ssl.ca_location)
                return "`ssl.ca.location` and memory-based "
                        "set_ssl_cert(CERT_CA) are mutually exclusive.";
3114#endif
3115
3116#if WITH_SASL_OAUTHBEARER
3117 if (conf->sasl.enable_oauthbearer_unsecure_jwt &&
3118 conf->sasl.oauthbearer_token_refresh_cb)
3119 return "`enable.sasl.oauthbearer.unsecure.jwt` and "
3120 "`oauthbearer_token_refresh_cb` are mutually exclusive";
3121#endif
3122
3123 if (cltype == RD_KAFKA_CONSUMER) {
3124 /* Automatically adjust `fetch.max.bytes` to be >=
3125 * `message.max.bytes` unless set by user. */
3126 if (rd_kafka_conf_is_modified(conf, "fetch.max.bytes")) {
3127 if (conf->fetch_max_bytes < conf->max_msg_size)
3128 return "`fetch.max.bytes` must be >= "
3129 "`message.max.bytes`";
3130 } else {
3131 conf->fetch_max_bytes = RD_MAX(conf->fetch_max_bytes,
3132 conf->max_msg_size);
3133 }
3134
3135 /* Automatically adjust 'receive.message.max.bytes' to
3136 * be 512 bytes larger than 'fetch.max.bytes' to have enough
3137 * room for protocol framing (including topic name), unless
3138 * set by user. */
3139 if (rd_kafka_conf_is_modified(conf,
3140 "receive.message.max.bytes")) {
3141 if (conf->fetch_max_bytes + 512 >
3142 conf->recv_max_msg_size)
3143 return "`receive.message.max.bytes` must be >= "
3144 "`fetch.max.bytes` + 512";
3145 } else {
3146 conf->recv_max_msg_size =
3147 RD_MAX(conf->recv_max_msg_size,
3148 conf->fetch_max_bytes + 512);
3149 }
3150
3151 if (conf->max_poll_interval_ms <
3152 conf->group_session_timeout_ms)
                        return "`max.poll.interval.ms` must be >= "
3154 "`session.timeout.ms`";
3155
3156 /* Simplifies rd_kafka_is_idempotent() which is producer-only */
3157 conf->eos.idempotence = 0;
3158
3159 } else if (cltype == RD_KAFKA_PRODUCER) {
3160 if (conf->eos.idempotence) {
3161 /* Adjust configuration values for idempotent producer*/
3162
3163 if (rd_kafka_conf_is_modified(conf, "max.in.flight")) {
3164 if (conf->max_inflight >
3165 RD_KAFKA_IDEMP_MAX_INFLIGHT)
3166 return "`max.in.flight` must be "
3167 "set <= "
3168 RD_KAFKA_IDEMP_MAX_INFLIGHT_STR
3169 " when `enable.idempotence` "
3170 "is true";
3171 } else {
3172 conf->max_inflight =
3173 RD_MIN(conf->max_inflight,
3174 RD_KAFKA_IDEMP_MAX_INFLIGHT);
3175 }
3176
3177
3178 if (rd_kafka_conf_is_modified(conf, "retries")) {
3179 if (conf->max_retries < 1)
3180 return "`retries` must be set >= 1 "
3181 "when `enable.idempotence` is "
3182 "true";
3183 } else {
3184 conf->max_retries = INT32_MAX;
3185 }
3186
3187
3188 if (rd_kafka_conf_is_modified(
3189 conf,
3190 "queue.buffering.backpressure.threshold")
3191 && conf->queue_backpressure_thres > 1)
3192 return "`queue.buffering.backpressure.threshold` "
3193 "must be set to 1 when "
3194 "`enable.idempotence` is true";
3195 else
3196 conf->queue_backpressure_thres = 1;
3197
3198 /* acks=all and queuing.strategy are set
3199 * in topic_conf_finalize() */
3200
3201 } else {
3202 if (conf->eos.gapless &&
3203 rd_kafka_conf_is_modified(
3204 conf, "enable.gapless.guarantee"))
3205 return "`enable.gapless.guarantee` requires "
3206 "`enable.idempotence` to be enabled";
3207 }
3208 }
3209
3210
3211 if (!rd_kafka_conf_is_modified(conf, "metadata.max.age.ms") &&
3212 conf->metadata_refresh_interval_ms > 0)
3213 conf->metadata_max_age_ms =
3214 conf->metadata_refresh_interval_ms * 3;
3215
3216 if (conf->reconnect_backoff_max_ms < conf->reconnect_backoff_ms)
                return "`reconnect.backoff.max.ms` must be >= "
                        "`reconnect.backoff.ms`";
3219
3220 if (conf->sparse_connections) {
                /* Set the sparse connection random selection interval to
                 * reconnect.backoff.ms / 2, clamped to the range 11..1000 ms. */
3223 conf->sparse_connect_intvl =
3224 RD_MAX(11, RD_MIN(conf->reconnect_backoff_ms/2, 1000));
3225 }
3226
3227 /* Finalize and verify the default.topic.config */
3228 if (conf->topic_conf)
3229 return rd_kafka_topic_conf_finalize(cltype, conf,
3230 conf->topic_conf);
3231
3232 return NULL;
3233}
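
/*
 * Example (illustrative sketch): finalization adjusts dependent defaults,
 * so only explicitly conflicting settings are rejected. With
 *
 *   rd_kafka_conf_set(conf, "enable.idempotence", "true", NULL, 0);
 *
 * and `max.in.flight` left unset, the finalized `max.in.flight` is capped
 * at RD_KAFKA_IDEMP_MAX_INFLIGHT (see above); explicitly setting it
 * higher makes finalization (performed during client instantiation)
 * return an error string instead.
 */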
3234
3235
3236/**
3237 * @brief Verify topic configuration \p tconf is
3238 * correct/non-conflicting and finalize the configuration
3239 * settings for use.
3240 *
3241 * @returns an error string if configuration is incorrect, else NULL.
3242 */
3243const char *rd_kafka_topic_conf_finalize (rd_kafka_type_t cltype,
3244 const rd_kafka_conf_t *conf,
3245 rd_kafka_topic_conf_t *tconf) {
3246
3247 if (conf->eos.idempotence) {
3248 /* Ensure acks=all */
3249
3250 if (rd_kafka_topic_conf_is_modified(tconf, "acks")) {
3251 if (tconf->required_acks != -1)
3252 return "`acks` must be set to `all` when "
3253 "`enable.idempotence` is true";
3254 } else {
3255 tconf->required_acks = -1; /* all */
3256 }
3257
3258 /* Ensure FIFO queueing */
3259 if (rd_kafka_topic_conf_is_modified(tconf, "queuing.strategy")) {
3260 if (tconf->queuing_strategy != RD_KAFKA_QUEUE_FIFO)
3261 return "`queuing.strategy` must be set to "
3262 "`fifo` when `enable.idempotence` is "
3263 "true";
3264 } else {
3265 tconf->queuing_strategy = RD_KAFKA_QUEUE_FIFO;
3266 }
3267 }
3268
3269
3270 if (cltype == RD_KAFKA_PRODUCER) {
3271 if (tconf->message_timeout_ms <= conf->buffering_max_ms)
3272 return "`message.timeout.ms` must be greater than "
3273 "`linger.ms`";
3274 }
3275
3276
3277 return NULL;
3278}
3279
3280
3281/**
3282 * @brief Log warnings for set deprecated or experimental
3283 * configuration properties.
3284 * @returns the number of warnings logged.
3285 */
3286static int rd_kafka_anyconf_warn_deprecated (rd_kafka_t *rk,
3287 rd_kafka_conf_scope_t scope,
3288 const void *conf) {
3289 const struct rd_kafka_property *prop;
3290 const int warn_on = _RK_DEPRECATED|_RK_EXPERIMENTAL;
3291 int cnt = 0;
3292
3293
3294 for (prop = rd_kafka_properties; prop->name ; prop++) {
3295 int match = prop->scope & warn_on;
3296
3297 if (likely(!(prop->scope & scope) || !match))
3298 continue;
3299
3300 if (likely(!rd_kafka_anyconf_is_modified(conf, prop)))
3301 continue;
3302
3303 rd_kafka_log(rk, LOG_WARNING, "CONFWARN",
3304 "Configuration property %s is %s%s%s: %s",
3305 prop->name,
3306 match & _RK_DEPRECATED ? "deprecated" : "",
3307 match == warn_on ? " and " : "",
3308 match & _RK_EXPERIMENTAL ? "experimental" : "",
3309 prop->desc);
3310 cnt++;
3311 }
3312
3313 return cnt;
3314}
3315
3316
3317/**
3318 * @brief Log configuration warnings (deprecated configuration properties,
3319 * unrecommended combinations, etc).
3320 *
3321 * @returns the number of warnings logged.
3322 *
3323 * @locality any
3324 * @locks none
3325 */
3326int rd_kafka_conf_warn (rd_kafka_t *rk) {
3327 int cnt = 0;
3328
3329 cnt = rd_kafka_anyconf_warn_deprecated(rk, _RK_GLOBAL, &rk->rk_conf);
3330 if (rk->rk_conf.topic_conf)
3331 cnt += rd_kafka_anyconf_warn_deprecated(
3332 rk, _RK_TOPIC, rk->rk_conf.topic_conf);
3333
3334 /* Additional warnings */
3335 if (rk->rk_type == RD_KAFKA_CONSUMER) {
3336 if (rk->rk_conf.fetch_wait_max_ms + 1000 >
3337 rk->rk_conf.socket_timeout_ms)
3338 rd_kafka_log(rk, LOG_WARNING,
3339 "CONFWARN",
3340 "Configuration property "
3341 "`fetch.wait.max.ms` (%d) should be "
3342 "set lower than `socket.timeout.ms` (%d) "
3343 "by at least 1000ms to avoid blocking "
                                     "and timing out subsequent requests",
3345 rk->rk_conf.fetch_wait_max_ms,
3346 rk->rk_conf.socket_timeout_ms);
3347 }
3348
3349 return cnt;
3350}
3351
3352
3353const rd_kafka_conf_t *rd_kafka_conf (rd_kafka_t *rk) {
3354 return &rk->rk_conf;
3355}
3356
3357
3358/**
3359 * @brief Unittests
3360 */
3361int unittest_conf (void) {
3362 rd_kafka_conf_t *conf;
3363 rd_kafka_topic_conf_t *tconf;
3364 rd_kafka_conf_res_t res;
3365 char errstr[128];
3366 int iteration;
3367 const struct rd_kafka_property *prop;
3368
3369 conf = rd_kafka_conf_new();
3370 tconf = rd_kafka_topic_conf_new();
3371
3372 res = rd_kafka_conf_set(conf, "unknown.thing", "foo",
3373 errstr, sizeof(errstr));
3374 RD_UT_ASSERT(res == RD_KAFKA_CONF_UNKNOWN, "fail");
3375 RD_UT_ASSERT(*errstr, "fail");
3376
3377 for (iteration = 0 ; iteration < 5 ; iteration++) {
3378 int cnt;
3379
3380
3381 /* Iterations:
3382 * 0 - Check is_modified
3383 * 1 - Set every other config property, read back and verify.
3384 * 2 - Check is_modified.
3385 * 3 - Set all config properties, read back and verify.
3386 * 4 - Check is_modified. */
3387 for (prop = rd_kafka_properties, cnt = 0 ; prop->name ;
3388 prop++, cnt++) {
3389 const char *val;
3390 char tmp[64];
3391 int odd = cnt & 1;
3392 int do_set = iteration == 3 || (iteration == 1 && odd);
3393 char readval[512];
3394 size_t readlen = sizeof(readval);
3395 rd_kafka_conf_res_t res2;
3396 rd_bool_t is_modified;
3397 int exp_is_modified = iteration >= 3 ||
3398 (iteration > 0 && (do_set || odd));
3399
3400 /* Avoid some special configs */
3401 if (!strcmp(prop->name, "plugin.library.paths") ||
3402 !strcmp(prop->name, "builtin.features"))
3403 continue;
3404
3405 switch (prop->type)
3406 {
3407 case _RK_C_STR:
3408 case _RK_C_KSTR:
3409 case _RK_C_PATLIST:
3410 if (prop->sdef)
3411 val = prop->sdef;
3412 else
3413 val = "test";
3414 break;
3415
3416 case _RK_C_BOOL:
3417 val = "true";
3418 break;
3419
3420 case _RK_C_INT:
3421 rd_snprintf(tmp, sizeof(tmp), "%d", prop->vdef);
3422 val = tmp;
3423 break;
3424
3425 case _RK_C_S2F:
3426 case _RK_C_S2I:
3427 val = prop->s2i[0].str;
3428 break;
3429
3430 case _RK_C_PTR:
3431 case _RK_C_ALIAS:
3432 case _RK_C_INVALID:
3433 case _RK_C_INTERNAL:
3434 default:
3435 continue;
3436 }
3437
3438
3439 if (prop->scope & _RK_GLOBAL) {
3440 if (do_set)
3441 res = rd_kafka_conf_set(conf,
3442 prop->name, val,
3443 errstr,
3444 sizeof(errstr));
3445
3446 res2 = rd_kafka_conf_get(conf,
3447 prop->name,
3448 readval, &readlen);
3449
3450 is_modified = rd_kafka_conf_is_modified(
3451 conf, prop->name);
3452
3453
3454 } else if (prop->scope & _RK_TOPIC) {
3455 if (do_set)
3456 res = rd_kafka_topic_conf_set(
3457 tconf,
3458 prop->name, val,
3459 errstr, sizeof(errstr));
3460
3461 res2 = rd_kafka_topic_conf_get(tconf,
3462 prop->name,
3463 readval,
3464 &readlen);
3465
3466 is_modified = rd_kafka_topic_conf_is_modified(
3467 tconf, prop->name);
3468
3469 } else {
3470 RD_NOTREACHED();
3471 }
3472
3473
3474
3475 if (do_set) {
3476 RD_UT_ASSERT(res == RD_KAFKA_CONF_OK,
3477 "conf_set %s failed: %d: %s",
3478 prop->name, res, errstr);
3479 RD_UT_ASSERT(res2 == RD_KAFKA_CONF_OK,
3480 "conf_get %s failed: %d",
3481 prop->name, res2);
3482
3483 RD_UT_ASSERT(!strcmp(readval, val),
3484 "conf_get %s "
3485 "returned \"%s\": "
3486 "expected \"%s\"",
3487 prop->name, readval, val);
3488
3489 RD_UT_ASSERT(is_modified,
3490 "Property %s was set but "
3491 "is_modified=%d",
3492 prop->name, is_modified);
3493
3494 }
3495
3497 RD_UT_ASSERT(is_modified == exp_is_modified,
3498 "Property %s is_modified=%d, "
3499 "exp_is_modified=%d "
3500 "(iter %d, odd %d, do_set %d)",
3501 prop->name, is_modified,
3502 exp_is_modified,
3503 iteration, odd, do_set);
3504 }
3505 }
3506
3507 /* Set an alias and make sure is_modified() works for it. */
3508 res = rd_kafka_conf_set(conf, "max.in.flight", "19", NULL, 0);
3509 RD_UT_ASSERT(res == RD_KAFKA_CONF_OK, "%d", res);
3510
3511 RD_UT_ASSERT(rd_kafka_conf_is_modified(conf, "max.in.flight") == rd_true,
3512 "fail");
3513 RD_UT_ASSERT(rd_kafka_conf_is_modified(
3514 conf,
3515 "max.in.flight.requests.per.connection") == rd_true,
3516 "fail");
3517
3518 rd_kafka_conf_destroy(conf);
3519 rd_kafka_topic_conf_destroy(tconf);
3520
3521 RD_UT_PASS();
3522}
3523
3524/**@}*/
3525