1 | /* |
2 | * librdkafka - Apache Kafka C library |
3 | * |
4 | * Copyright (c) 2012,2013 Magnus Edenhill |
5 | * All rights reserved. |
6 | * |
7 | * Redistribution and use in source and binary forms, with or without |
8 | * modification, are permitted provided that the following conditions are met: |
9 | * |
10 | * 1. Redistributions of source code must retain the above copyright notice, |
11 | * this list of conditions and the following disclaimer. |
12 | * 2. Redistributions in binary form must reproduce the above copyright notice, |
13 | * this list of conditions and the following disclaimer in the documentation |
14 | * and/or other materials provided with the distribution. |
15 | * |
16 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
17 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
18 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
19 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
20 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
21 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
22 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
23 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
24 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
25 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
26 | * POSSIBILITY OF SUCH DAMAGE. |
27 | */ |
28 | |
29 | #include "rdkafka_int.h" |
30 | #include "rd.h" |
31 | |
32 | #include <stdlib.h> |
33 | #include <ctype.h> |
34 | #include <stddef.h> |
35 | |
36 | #include "rdkafka_int.h" |
37 | #include "rdkafka_feature.h" |
38 | #include "rdkafka_interceptor.h" |
39 | #include "rdkafka_idempotence.h" |
40 | #include "rdkafka_sasl_oauthbearer.h" |
41 | #if WITH_PLUGINS |
42 | #include "rdkafka_plugin.h" |
43 | #endif |
44 | #include "rdunittest.h" |
45 | |
46 | #ifndef _MSC_VER |
47 | #include <netinet/tcp.h> |
48 | #else |
49 | |
50 | #ifndef WIN32_MEAN_AND_LEAN |
51 | #define WIN32_MEAN_AND_LEAN |
52 | #endif |
53 | #include <windows.h> |
54 | #endif |
55 | |
56 | struct rd_kafka_property { |
57 | rd_kafka_conf_scope_t scope; |
58 | const char *name; |
59 | enum { |
60 | _RK_C_STR, |
61 | _RK_C_INT, |
62 | _RK_C_S2I, /* String to Integer mapping. |
63 | * Supports limited canonical str->int mappings |
64 | * using s2i[] */ |
65 | _RK_C_S2F, /* CSV String to Integer flag mapping (OR:ed) */ |
66 | _RK_C_BOOL, |
67 | _RK_C_PTR, /* Only settable through special set functions */ |
68 | _RK_C_PATLIST, /* Pattern list */ |
69 | _RK_C_KSTR, /* Kafka string */ |
70 | _RK_C_ALIAS, /* Alias: points to other property through .sdef */ |
71 | _RK_C_INTERNAL, /* Internal, don't expose to application */ |
72 | _RK_C_INVALID, /* Invalid property, used to catch known |
73 | * but unsupported Java properties. */ |
74 | } type; |
75 | int offset; |
76 | const char *desc; |
77 | int vmin; |
78 | int vmax; |
79 | int vdef; /* Default value (int) */ |
80 | const char *sdef; /* Default value (string) */ |
81 | void *pdef; /* Default value (pointer) */ |
82 | struct { |
83 | int val; |
84 | const char *str; |
85 | } s2i[20]; /* _RK_C_S2I and _RK_C_S2F */ |
86 | |
87 | /* Value validator (STR) */ |
88 | int (*validate) (const struct rd_kafka_property *prop, |
89 | const char *val, int ival); |
90 | |
91 | /* Configuration object constructors and destructor for use when |
92 | * the property value itself is not used, or needs extra care. */ |
93 | void (*ctor) (int scope, void *pconf); |
94 | void (*dtor) (int scope, void *pconf); |
95 | void (*copy) (int scope, void *pdst, const void *psrc, |
96 | void *dstptr, const void *srcptr, |
97 | size_t filter_cnt, const char **filter); |
98 | |
99 | rd_kafka_conf_res_t (*set) (int scope, void *pconf, |
100 | const char *name, const char *value, |
101 | void *dstptr, |
102 | rd_kafka_conf_set_mode_t set_mode, |
103 | char *errstr, size_t errstr_size); |
104 | }; |
105 | |
106 | |
107 | #define _RK(field) offsetof(rd_kafka_conf_t, field) |
108 | #define _RKT(field) offsetof(rd_kafka_topic_conf_t, field) |
109 | |
110 | |
111 | static rd_kafka_conf_res_t |
112 | rd_kafka_anyconf_get0 (const void *conf, const struct rd_kafka_property *prop, |
113 | char *dest, size_t *dest_size); |
114 | |
115 | |
116 | |
117 | |
118 | /** |
119 | * @returns a unique index for property \p prop, using the byte position |
120 | * of the field. |
121 | */ |
122 | static RD_INLINE int rd_kafka_prop2idx (const struct rd_kafka_property *prop) { |
123 | return prop->offset; |
124 | } |
125 | |
126 | |
127 | |
128 | /** |
129 | * @brief Set the property as modified. |
130 | * |
131 | * We do this by mapping the property's conf struct field byte offset |
132 | * to a bit in a bit vector. |
133 | * If the bit is set the property has been modified, otherwise it is |
134 | * at its default unmodified value. |
135 | * |
136 | * \p is_modified 1: set as modified, 0: clear modified |
137 | */ |
138 | static void rd_kafka_anyconf_set_modified (void *conf, |
139 | const struct rd_kafka_property *prop, |
140 | int is_modified) { |
141 | int idx = rd_kafka_prop2idx(prop); |
142 | int bkt = idx / 64; |
143 | uint64_t bit = (uint64_t)1 << (idx % 64); |
144 | struct rd_kafka_anyconf_hdr *confhdr = conf; |
145 | |
146 | rd_assert(idx < RD_KAFKA_CONF_PROPS_IDX_MAX && |
147 | *"Increase RD_KAFKA_CONF_PROPS_IDX_MAX" ); |
148 | |
149 | if (is_modified) |
150 | confhdr->modified[bkt] |= bit; |
151 | else |
152 | confhdr->modified[bkt] &= ~bit; |
153 | } |
154 | |
155 | /** |
156 | * @brief Clear is_modified for all properties. |
157 | * @warning Does NOT clear/reset the value. |
158 | */ |
159 | static void rd_kafka_anyconf_clear_all_is_modified (void *conf) { |
160 | struct rd_kafka_anyconf_hdr *confhdr = conf; |
161 | |
162 | memset(confhdr, 0, sizeof(*confhdr)); |
163 | } |
164 | |
165 | |
166 | /** |
167 | * @returns true of the property has been set/modified, else false. |
168 | */ |
169 | static rd_bool_t |
170 | rd_kafka_anyconf_is_modified (const void *conf, |
171 | const struct rd_kafka_property *prop) { |
172 | int idx = rd_kafka_prop2idx(prop); |
173 | int bkt = idx / 64; |
174 | uint64_t bit = (uint64_t)1 << (idx % 64); |
175 | const struct rd_kafka_anyconf_hdr *confhdr = conf; |
176 | |
177 | return !!(confhdr->modified[bkt] & bit); |
178 | } |
179 | |
180 | |
181 | |
/**
 * @brief Validate the \p broker.version.fallback property value.
 *
 * The check is delegated to rd_kafka_get_legacy_ApiVersions(); \p prop
 * and \p ival are not used.
 *
 * @returns the return value of rd_kafka_get_legacy_ApiVersions() for
 *          \p val.
 */
static int
rd_kafka_conf_validate_broker_version (const struct rd_kafka_property *prop,
                                       const char *val, int ival) {
        /* Out-parameters demanded by the lookup; the results are
         * not used here. */
        struct rd_kafka_ApiVersion *ignored_apis;
        size_t ignored_api_cnt;
        int r;

        r = rd_kafka_get_legacy_ApiVersions(val, &ignored_apis,
                                            &ignored_api_cnt, NULL);
        return r;
}
192 | |
/**
 * @brief Validate that \p val is a single item, i.e., that it contains
 *        none of the delimiters ',' or ' '.
 *
 * \p prop and \p ival are not used.
 *
 * @returns 1 if \p val contains no delimiter (this includes the empty
 *          string), else 0.
 */
static RD_UNUSED int
rd_kafka_conf_validate_single (const struct rd_kafka_property *prop,
                               const char *val, int ival) {
        static const char delimiters[] = ", ";

        return strpbrk(val, delimiters) == NULL;
}
201 | |
/**
 * @brief Validate that \p val names one of the builtin partitioners.
 *
 * The comparison is exact (case sensitive); \p prop and \p ival are
 * not used.
 *
 * @returns 1 if \p val is a known builtin partitioner name, else 0.
 */
static RD_UNUSED int
rd_kafka_conf_validate_partitioner (const struct rd_kafka_property *prop,
                                    const char *val, int ival) {
        static const char *const builtin_partitioners[] = {
                "random",
                "consistent",
                "consistent_random",
                "murmur2",
                "murmur2_random"
        };
        const size_t cnt =
                sizeof(builtin_partitioners) / sizeof(*builtin_partitioners);
        size_t i;

        for (i = 0 ; i < cnt ; i++) {
                if (!strcmp(val, builtin_partitioners[i]))
                        return 1;
        }

        return 0;
}
214 | |
215 | |
216 | /** |
217 | * librdkafka configuration property definitions. |
218 | */ |
219 | static const struct rd_kafka_property rd_kafka_properties[] = { |
220 | /* Global properties */ |
221 | { _RK_GLOBAL, "builtin.features" , _RK_C_S2F, _RK(builtin_features), |
222 | "Indicates the builtin features for this build of librdkafka. " |
223 | "An application can either query this value or attempt to set it " |
224 | "with its list of required features to check for library support." , |
225 | 0, 0x7fffffff, 0xffff, |
226 | .s2i = { |
227 | #if WITH_ZLIB |
228 | { 0x1, "gzip" }, |
229 | #endif |
230 | #if WITH_SNAPPY |
231 | { 0x2, "snappy" }, |
232 | #endif |
233 | #if WITH_SSL |
234 | { 0x4, "ssl" }, |
235 | #endif |
236 | { 0x8, "sasl" }, |
237 | { 0x10, "regex" }, |
238 | { 0x20, "lz4" }, |
239 | #if defined(_MSC_VER) || WITH_SASL_CYRUS |
240 | { 0x40, "sasl_gssapi" }, |
241 | #endif |
242 | { 0x80, "sasl_plain" }, |
243 | #if WITH_SASL_SCRAM |
244 | { 0x100, "sasl_scram" }, |
245 | #endif |
246 | #if WITH_PLUGINS |
247 | { 0x200, "plugins" }, |
248 | #endif |
249 | #if WITH_ZSTD |
250 | { 0x400, "zstd" }, |
251 | #endif |
252 | #if WITH_SASL_OAUTHBEARER |
253 | { 0x800, "sasl_oauthbearer" }, |
254 | #endif |
255 | { 0, NULL } |
256 | } |
257 | }, |
258 | { _RK_GLOBAL, "client.id" , _RK_C_STR, _RK(client_id_str), |
259 | "Client identifier." , |
260 | .sdef = "rdkafka" }, |
261 | { _RK_GLOBAL|_RK_HIGH, "metadata.broker.list" , _RK_C_STR, |
262 | _RK(brokerlist), |
263 | "Initial list of brokers as a CSV list of broker host or host:port. " |
264 | "The application may also use `rd_kafka_brokers_add()` to add " |
265 | "brokers during runtime." }, |
266 | { _RK_GLOBAL|_RK_HIGH, "bootstrap.servers" , _RK_C_ALIAS, 0, |
267 | "See metadata.broker.list" , |
268 | .sdef = "metadata.broker.list" }, |
269 | { _RK_GLOBAL|_RK_MED, "message.max.bytes" , _RK_C_INT, _RK(max_msg_size), |
270 | "Maximum Kafka protocol request message size." , |
271 | 1000, 1000000000, 1000000 }, |
272 | { _RK_GLOBAL, "message.copy.max.bytes" , _RK_C_INT, |
273 | _RK(msg_copy_max_size), |
274 | "Maximum size for message to be copied to buffer. " |
275 | "Messages larger than this will be passed by reference (zero-copy) " |
276 | "at the expense of larger iovecs." , |
277 | 0, 1000000000, 0xffff }, |
278 | { _RK_GLOBAL|_RK_MED, "receive.message.max.bytes" , _RK_C_INT, |
279 | _RK(recv_max_msg_size), |
280 | "Maximum Kafka protocol response message size. " |
281 | "This serves as a safety precaution to avoid memory exhaustion in " |
282 | "case of protocol hickups. " |
283 | "This value must be at least `fetch.max.bytes` + 512 to allow " |
284 | "for protocol overhead; the value is adjusted automatically " |
285 | "unless the configuration property is explicitly set." , |
286 | 1000, INT_MAX, 100000000 }, |
287 | { _RK_GLOBAL, "max.in.flight.requests.per.connection" , _RK_C_INT, |
288 | _RK(max_inflight), |
289 | "Maximum number of in-flight requests per broker connection. " |
290 | "This is a generic property applied to all broker communication, " |
291 | "however it is primarily relevant to produce requests. " |
292 | "In particular, note that other mechanisms limit the number " |
293 | "of outstanding consumer fetch request per broker to one." , |
294 | 1, 1000000, 1000000 }, |
295 | { _RK_GLOBAL, "max.in.flight" , _RK_C_ALIAS, |
296 | .sdef = "max.in.flight.requests.per.connection" }, |
297 | { _RK_GLOBAL, "metadata.request.timeout.ms" , _RK_C_INT, |
298 | _RK(metadata_request_timeout_ms), |
299 | "Non-topic request timeout in milliseconds. " |
300 | "This is for metadata requests, etc." , |
301 | 10, 900*1000, 60*1000}, |
302 | { _RK_GLOBAL, "topic.metadata.refresh.interval.ms" , _RK_C_INT, |
303 | _RK(metadata_refresh_interval_ms), |
304 | "Topic metadata refresh interval in milliseconds. " |
305 | "The metadata is automatically refreshed on error and connect. " |
306 | "Use -1 to disable the intervalled refresh." , |
307 | -1, 3600*1000, 5*60*1000 }, |
308 | { _RK_GLOBAL, "metadata.max.age.ms" , _RK_C_INT, |
309 | _RK(metadata_max_age_ms), |
310 | "Metadata cache max age. " |
311 | "Defaults to topic.metadata.refresh.interval.ms * 3" , |
312 | 1, 24*3600*1000, 5*60*1000 * 3 }, |
313 | { _RK_GLOBAL, "topic.metadata.refresh.fast.interval.ms" , _RK_C_INT, |
314 | _RK(metadata_refresh_fast_interval_ms), |
315 | "When a topic loses its leader a new metadata request will be " |
316 | "enqueued with this initial interval, exponentially increasing " |
317 | "until the topic metadata has been refreshed. " |
318 | "This is used to recover quickly from transitioning leader brokers." , |
319 | 1, 60*1000, 250 }, |
320 | { _RK_GLOBAL|_RK_DEPRECATED, |
321 | "topic.metadata.refresh.fast.cnt" , _RK_C_INT, |
322 | _RK(metadata_refresh_fast_cnt), |
323 | "No longer used." , |
324 | 0, 1000, 10 }, |
325 | { _RK_GLOBAL, "topic.metadata.refresh.sparse" , _RK_C_BOOL, |
326 | _RK(metadata_refresh_sparse), |
327 | "Sparse metadata requests (consumes less network bandwidth)" , |
328 | 0, 1, 1 }, |
329 | { _RK_GLOBAL, "topic.blacklist" , _RK_C_PATLIST, |
330 | _RK(topic_blacklist), |
331 | "Topic blacklist, a comma-separated list of regular expressions " |
332 | "for matching topic names that should be ignored in " |
333 | "broker metadata information as if the topics did not exist." }, |
334 | { _RK_GLOBAL|_RK_MED, "debug" , _RK_C_S2F, _RK(debug), |
335 | "A comma-separated list of debug contexts to enable. " |
336 | "Detailed Producer debugging: broker,topic,msg. " |
337 | "Consumer: consumer,cgrp,topic,fetch" , |
338 | .s2i = { |
339 | { RD_KAFKA_DBG_GENERIC, "generic" }, |
340 | { RD_KAFKA_DBG_BROKER, "broker" }, |
341 | { RD_KAFKA_DBG_TOPIC, "topic" }, |
342 | { RD_KAFKA_DBG_METADATA, "metadata" }, |
343 | { RD_KAFKA_DBG_FEATURE, "feature" }, |
344 | { RD_KAFKA_DBG_QUEUE, "queue" }, |
345 | { RD_KAFKA_DBG_MSG, "msg" }, |
346 | { RD_KAFKA_DBG_PROTOCOL, "protocol" }, |
347 | { RD_KAFKA_DBG_CGRP, "cgrp" }, |
348 | { RD_KAFKA_DBG_SECURITY, "security" }, |
349 | { RD_KAFKA_DBG_FETCH, "fetch" }, |
350 | { RD_KAFKA_DBG_INTERCEPTOR, "interceptor" }, |
351 | { RD_KAFKA_DBG_PLUGIN, "plugin" }, |
352 | { RD_KAFKA_DBG_CONSUMER, "consumer" }, |
353 | { RD_KAFKA_DBG_ADMIN, "admin" }, |
354 | { RD_KAFKA_DBG_EOS, "eos" }, |
355 | { RD_KAFKA_DBG_ALL, "all" } |
356 | } }, |
357 | { _RK_GLOBAL, "socket.timeout.ms" , _RK_C_INT, _RK(socket_timeout_ms), |
358 | "Default timeout for network requests. " |
359 | "Producer: ProduceRequests will use the lesser value of " |
360 | "`socket.timeout.ms` and remaining `message.timeout.ms` for the " |
361 | "first message in the batch. " |
362 | "Consumer: FetchRequests will use " |
363 | "`fetch.wait.max.ms` + `socket.timeout.ms`. " |
364 | "Admin: Admin requests will use `socket.timeout.ms` or explicitly " |
365 | "set `rd_kafka_AdminOptions_set_operation_timeout()` value." , |
366 | 10, 300*1000, 60*1000 }, |
367 | { _RK_GLOBAL|_RK_DEPRECATED, "socket.blocking.max.ms" , _RK_C_INT, |
368 | _RK(socket_blocking_max_ms), |
369 | "No longer used." , |
370 | 1, 60*1000, 1000 }, |
371 | { _RK_GLOBAL, "socket.send.buffer.bytes" , _RK_C_INT, |
372 | _RK(socket_sndbuf_size), |
373 | "Broker socket send buffer size. System default is used if 0." , |
374 | 0, 100000000, 0 }, |
375 | { _RK_GLOBAL, "socket.receive.buffer.bytes" , _RK_C_INT, |
376 | _RK(socket_rcvbuf_size), |
377 | "Broker socket receive buffer size. System default is used if 0." , |
378 | 0, 100000000, 0 }, |
379 | #ifdef SO_KEEPALIVE |
380 | { _RK_GLOBAL, "socket.keepalive.enable" , _RK_C_BOOL, |
381 | _RK(socket_keepalive), |
382 | "Enable TCP keep-alives (SO_KEEPALIVE) on broker sockets" , |
383 | 0, 1, 0 }, |
384 | #endif |
385 | #ifdef TCP_NODELAY |
386 | { _RK_GLOBAL, "socket.nagle.disable" , _RK_C_BOOL, |
387 | _RK(socket_nagle_disable), |
388 | "Disable the Nagle algorithm (TCP_NODELAY) on broker sockets." , |
389 | 0, 1, 0 }, |
390 | #endif |
391 | { _RK_GLOBAL, "socket.max.fails" , _RK_C_INT, |
392 | _RK(socket_max_fails), |
393 | "Disconnect from broker when this number of send failures " |
394 | "(e.g., timed out requests) is reached. Disable with 0. " |
395 | "WARNING: It is highly recommended to leave this setting at " |
396 | "its default value of 1 to avoid the client and broker to " |
397 | "become desynchronized in case of request timeouts. " |
398 | "NOTE: The connection is automatically re-established." , |
399 | 0, 1000000, 1 }, |
400 | { _RK_GLOBAL, "broker.address.ttl" , _RK_C_INT, |
401 | _RK(broker_addr_ttl), |
402 | "How long to cache the broker address resolving " |
403 | "results (milliseconds)." , |
404 | 0, 86400*1000, 1*1000 }, |
405 | { _RK_GLOBAL, "broker.address.family" , _RK_C_S2I, |
406 | _RK(broker_addr_family), |
407 | "Allowed broker IP address families: any, v4, v6" , |
408 | .vdef = AF_UNSPEC, |
409 | .s2i = { |
410 | { AF_UNSPEC, "any" }, |
411 | { AF_INET, "v4" }, |
412 | { AF_INET6, "v6" }, |
413 | } }, |
414 | { _RK_GLOBAL|_RK_MED|_RK_HIDDEN, "enable.sparse.connections" , |
415 | _RK_C_BOOL, |
416 | _RK(sparse_connections), |
417 | "When enabled the client will only connect to brokers " |
418 | "it needs to communicate with. When disabled the client " |
419 | "will maintain connections to all brokers in the cluster." , |
420 | 0, 1, 1 }, |
421 | { _RK_GLOBAL|_RK_DEPRECATED, "reconnect.backoff.jitter.ms" , _RK_C_INT, |
422 | _RK(reconnect_jitter_ms), |
423 | "No longer used. See `reconnect.backoff.ms` and " |
424 | "`reconnect.backoff.max.ms`." , |
425 | 0, 60*60*1000, 0 }, |
426 | { _RK_GLOBAL|_RK_MED, "reconnect.backoff.ms" , _RK_C_INT, |
427 | _RK(reconnect_backoff_ms), |
428 | "The initial time to wait before reconnecting to a broker " |
429 | "after the connection has been closed. " |
430 | "The time is increased exponentially until " |
431 | "`reconnect.backoff.max.ms` is reached. " |
432 | "-25% to +50% jitter is applied to each reconnect backoff. " |
433 | "A value of 0 disables the backoff and reconnects immediately." , |
434 | 0, 60*60*1000, 100 }, |
435 | { _RK_GLOBAL|_RK_MED, "reconnect.backoff.max.ms" , _RK_C_INT, |
436 | _RK(reconnect_backoff_max_ms), |
437 | "The maximum time to wait before reconnecting to a broker " |
438 | "after the connection has been closed." , |
439 | 0, 60*60*1000, 10*1000 }, |
440 | { _RK_GLOBAL|_RK_HIGH, "statistics.interval.ms" , _RK_C_INT, |
441 | _RK(stats_interval_ms), |
442 | "librdkafka statistics emit interval. The application also needs to " |
443 | "register a stats callback using `rd_kafka_conf_set_stats_cb()`. " |
444 | "The granularity is 1000ms. A value of 0 disables statistics." , |
445 | 0, 86400*1000, 0 }, |
446 | { _RK_GLOBAL, "enabled_events" , _RK_C_INT, |
447 | _RK(enabled_events), |
448 | "See `rd_kafka_conf_set_events()`" , |
449 | 0, 0x7fffffff, 0 }, |
450 | { _RK_GLOBAL, "error_cb" , _RK_C_PTR, |
451 | _RK(error_cb), |
452 | "Error callback (set with rd_kafka_conf_set_error_cb())" }, |
453 | { _RK_GLOBAL, "throttle_cb" , _RK_C_PTR, |
454 | _RK(throttle_cb), |
455 | "Throttle callback (set with rd_kafka_conf_set_throttle_cb())" }, |
456 | { _RK_GLOBAL, "stats_cb" , _RK_C_PTR, |
457 | _RK(stats_cb), |
458 | "Statistics callback (set with rd_kafka_conf_set_stats_cb())" }, |
459 | { _RK_GLOBAL, "log_cb" , _RK_C_PTR, |
460 | _RK(log_cb), |
461 | "Log callback (set with rd_kafka_conf_set_log_cb())" , |
462 | .pdef = rd_kafka_log_print }, |
463 | { _RK_GLOBAL, "log_level" , _RK_C_INT, |
464 | _RK(log_level), |
465 | "Logging level (syslog(3) levels)" , |
466 | 0, 7, 6 }, |
467 | { _RK_GLOBAL, "log.queue" , _RK_C_BOOL, _RK(log_queue), |
468 | "Disable spontaneous log_cb from internal librdkafka " |
469 | "threads, instead enqueue log messages on queue set with " |
470 | "`rd_kafka_set_log_queue()` and serve log callbacks or " |
471 | "events through the standard poll APIs. " |
472 | "**NOTE**: Log messages will linger in a temporary queue " |
473 | "until the log queue has been set." , |
474 | 0, 1, 0 }, |
475 | { _RK_GLOBAL, "log.thread.name" , _RK_C_BOOL, |
476 | _RK(log_thread_name), |
477 | "Print internal thread name in log messages " |
478 | "(useful for debugging librdkafka internals)" , |
479 | 0, 1, 1 }, |
480 | { _RK_GLOBAL, "log.connection.close" , _RK_C_BOOL, |
481 | _RK(log_connection_close), |
482 | "Log broker disconnects. " |
483 | "It might be useful to turn this off when interacting with " |
484 | "0.9 brokers with an aggressive `connection.max.idle.ms` value." , |
485 | 0, 1, 1 }, |
486 | { _RK_GLOBAL, "background_event_cb" , _RK_C_PTR, |
487 | _RK(background_event_cb), |
488 | "Background queue event callback " |
489 | "(set with rd_kafka_conf_set_background_event_cb())" }, |
490 | { _RK_GLOBAL, "socket_cb" , _RK_C_PTR, |
491 | _RK(socket_cb), |
492 | "Socket creation callback to provide race-free CLOEXEC" , |
493 | .pdef = |
494 | #ifdef __linux__ |
495 | rd_kafka_socket_cb_linux |
496 | #else |
497 | rd_kafka_socket_cb_generic |
498 | #endif |
499 | }, |
500 | { _RK_GLOBAL, "connect_cb" , _RK_C_PTR, |
501 | _RK(connect_cb), |
502 | "Socket connect callback" , |
503 | }, |
504 | { _RK_GLOBAL, "closesocket_cb" , _RK_C_PTR, |
505 | _RK(closesocket_cb), |
506 | "Socket close callback" , |
507 | }, |
508 | { _RK_GLOBAL, "open_cb" , _RK_C_PTR, |
509 | _RK(open_cb), |
510 | "File open callback to provide race-free CLOEXEC" , |
511 | .pdef = |
512 | #ifdef __linux__ |
513 | rd_kafka_open_cb_linux |
514 | #else |
515 | rd_kafka_open_cb_generic |
516 | #endif |
517 | }, |
518 | { _RK_GLOBAL, "opaque" , _RK_C_PTR, |
519 | _RK(opaque), |
520 | "Application opaque (set with rd_kafka_conf_set_opaque())" }, |
521 | { _RK_GLOBAL, "default_topic_conf" , _RK_C_PTR, |
522 | _RK(topic_conf), |
523 | "Default topic configuration for automatically subscribed topics" }, |
524 | { _RK_GLOBAL, "internal.termination.signal" , _RK_C_INT, |
525 | _RK(term_sig), |
526 | "Signal that librdkafka will use to quickly terminate on " |
527 | "rd_kafka_destroy(). If this signal is not set then there will be a " |
528 | "delay before rd_kafka_wait_destroyed() returns true " |
529 | "as internal threads are timing out their system calls. " |
530 | "If this signal is set however the delay will be minimal. " |
531 | "The application should mask this signal as an internal " |
532 | "signal handler is installed." , |
533 | 0, 128, 0 }, |
534 | { _RK_GLOBAL|_RK_HIGH, "api.version.request" , _RK_C_BOOL, |
535 | _RK(api_version_request), |
536 | "Request broker's supported API versions to adjust functionality to " |
537 | "available protocol features. If set to false, or the " |
538 | "ApiVersionRequest fails, the fallback version " |
539 | "`broker.version.fallback` will be used. " |
540 | "**NOTE**: Depends on broker version >=0.10.0. If the request is not " |
541 | "supported by (an older) broker the `broker.version.fallback` fallback is used." , |
542 | 0, 1, 1 }, |
543 | { _RK_GLOBAL, "api.version.request.timeout.ms" , _RK_C_INT, |
544 | _RK(api_version_request_timeout_ms), |
545 | "Timeout for broker API version requests." , |
546 | 1, 5*60*1000, 10*1000 }, |
547 | { _RK_GLOBAL|_RK_MED, "api.version.fallback.ms" , _RK_C_INT, |
548 | _RK(api_version_fallback_ms), |
549 | "Dictates how long the `broker.version.fallback` fallback is used " |
550 | "in the case the ApiVersionRequest fails. " |
551 | "**NOTE**: The ApiVersionRequest is only issued when a new connection " |
552 | "to the broker is made (such as after an upgrade)." , |
553 | 0, 86400*7*1000, 0 }, |
554 | |
555 | { _RK_GLOBAL|_RK_MED, "broker.version.fallback" , _RK_C_STR, |
556 | _RK(broker_version_fallback), |
557 | "Older broker versions (before 0.10.0) provide no way for a client to query " |
558 | "for supported protocol features " |
559 | "(ApiVersionRequest, see `api.version.request`) making it impossible " |
560 | "for the client to know what features it may use. " |
561 | "As a workaround a user may set this property to the expected broker " |
562 | "version and the client will automatically adjust its feature set " |
563 | "accordingly if the ApiVersionRequest fails (or is disabled). " |
564 | "The fallback broker version will be used for `api.version.fallback.ms`. " |
565 | "Valid values are: 0.9.0, 0.8.2, 0.8.1, 0.8.0. " |
566 | "Any other value >= 0.10, such as 0.10.2.1, " |
567 | "enables ApiVersionRequests." , |
568 | .sdef = "0.10.0" , |
569 | .validate = rd_kafka_conf_validate_broker_version }, |
570 | |
571 | /* Security related global properties */ |
572 | { _RK_GLOBAL|_RK_HIGH, "security.protocol" , _RK_C_S2I, |
573 | _RK(security_protocol), |
574 | "Protocol used to communicate with brokers." , |
575 | .vdef = RD_KAFKA_PROTO_PLAINTEXT, |
576 | .s2i = { |
577 | { RD_KAFKA_PROTO_PLAINTEXT, "plaintext" }, |
578 | #if WITH_SSL |
579 | { RD_KAFKA_PROTO_SSL, "ssl" }, |
580 | #endif |
581 | { RD_KAFKA_PROTO_SASL_PLAINTEXT, "sasl_plaintext" }, |
582 | #if WITH_SSL |
583 | { RD_KAFKA_PROTO_SASL_SSL, "sasl_ssl" }, |
584 | #endif |
585 | { 0, NULL } |
586 | } }, |
587 | |
588 | #if WITH_SSL |
589 | { _RK_GLOBAL, "ssl.cipher.suites" , _RK_C_STR, |
590 | _RK(ssl.cipher_suites), |
591 | "A cipher suite is a named combination of authentication, " |
592 | "encryption, MAC and key exchange algorithm used to negotiate the " |
593 | "security settings for a network connection using TLS or SSL network " |
594 | "protocol. See manual page for `ciphers(1)` and " |
595 | "`SSL_CTX_set_cipher_list(3)." |
596 | }, |
597 | #if OPENSSL_VERSION_NUMBER >= 0x1000200fL && !defined(LIBRESSL_VERSION_NUMBER) |
598 | { _RK_GLOBAL, "ssl.curves.list" , _RK_C_STR, |
599 | _RK(ssl.curves_list), |
600 | "The supported-curves extension in the TLS ClientHello message specifies " |
601 | "the curves (standard/named, or 'explicit' GF(2^k) or GF(p)) the client " |
602 | "is willing to have the server use. See manual page for " |
603 | "`SSL_CTX_set1_curves_list(3)`. OpenSSL >= 1.0.2 required." |
604 | }, |
605 | { _RK_GLOBAL, "ssl.sigalgs.list" , _RK_C_STR, |
606 | _RK(ssl.sigalgs_list), |
607 | "The client uses the TLS ClientHello signature_algorithms extension " |
608 | "to indicate to the server which signature/hash algorithm pairs " |
609 | "may be used in digital signatures. See manual page for " |
610 | "`SSL_CTX_set1_sigalgs_list(3)`. OpenSSL >= 1.0.2 required." |
611 | }, |
612 | #endif |
613 | { _RK_GLOBAL, "ssl.key.location" , _RK_C_STR, |
614 | _RK(ssl.key_location), |
615 | "Path to client's private key (PEM) used for authentication." |
616 | }, |
617 | { _RK_GLOBAL|_RK_SENSITIVE, "ssl.key.password" , _RK_C_STR, |
618 | _RK(ssl.key_password), |
619 | "Private key passphrase (for use with `ssl.key.location` " |
620 | "and `set_ssl_cert()`)" |
621 | }, |
622 | { _RK_GLOBAL|_RK_SENSITIVE, "ssl.key.pem" , _RK_C_STR, |
623 | _RK(ssl.key_pem), |
624 | "Client's private key string (PEM format) used for authentication." |
625 | }, |
626 | { _RK_GLOBAL, "ssl_key" , _RK_C_INTERNAL, |
627 | _RK(ssl.key), |
628 | "Client's private key as set by rd_kafka_conf_set_ssl_cert()" , |
629 | .dtor = rd_kafka_conf_cert_dtor, |
630 | .copy = rd_kafka_conf_cert_copy |
631 | }, |
632 | { _RK_GLOBAL, "ssl.certificate.location" , _RK_C_STR, |
633 | _RK(ssl.cert_location), |
634 | "Path to client's public key (PEM) used for authentication." |
635 | }, |
636 | { _RK_GLOBAL, "ssl.certificate.pem" , _RK_C_STR, |
637 | _RK(ssl.cert_pem), |
638 | "Client's public key string (PEM format) used for authentication." |
639 | }, |
640 | { _RK_GLOBAL, "ssl_certificate" , _RK_C_INTERNAL, |
641 | _RK(ssl.key), |
642 | "Client's public key as set by rd_kafka_conf_set_ssl_cert()" , |
643 | .dtor = rd_kafka_conf_cert_dtor, |
644 | .copy = rd_kafka_conf_cert_copy |
645 | }, |
646 | |
647 | { _RK_GLOBAL, "ssl.ca.location" , _RK_C_STR, |
648 | _RK(ssl.ca_location), |
649 | "File or directory path to CA certificate(s) for verifying " |
650 | "the broker's key." |
651 | }, |
652 | { _RK_GLOBAL, "ssl_ca" , _RK_C_INTERNAL, |
653 | _RK(ssl.ca), |
654 | "CA certificate as set by rd_kafka_conf_set_ssl_cert()" , |
655 | .dtor = rd_kafka_conf_cert_dtor, |
656 | .copy = rd_kafka_conf_cert_copy |
657 | }, |
658 | { _RK_GLOBAL, "ssl.crl.location" , _RK_C_STR, |
659 | _RK(ssl.crl_location), |
660 | "Path to CRL for verifying broker's certificate validity." |
661 | }, |
662 | { _RK_GLOBAL, "ssl.keystore.location" , _RK_C_STR, |
663 | _RK(ssl.keystore_location), |
664 | "Path to client's keystore (PKCS#12) used for authentication." |
665 | }, |
666 | { _RK_GLOBAL|_RK_SENSITIVE, "ssl.keystore.password" , _RK_C_STR, |
667 | _RK(ssl.keystore_password), |
668 | "Client's keystore (PKCS#12) password." |
669 | }, |
670 | { _RK_GLOBAL, "enable.ssl.certificate.verification" , _RK_C_BOOL, |
671 | _RK(ssl.enable_verify), |
672 | "Enable OpenSSL's builtin broker (server) certificate verification. " |
673 | "This verification can be extended by the application by " |
674 | "implementing a certificate_verify_cb." , |
675 | 0, 1, 1 |
676 | }, |
677 | #if OPENSSL_VERSION_NUMBER >= 0x1000200fL |
678 | { _RK_GLOBAL, "ssl.endpoint.identification.algorithm" , _RK_C_S2I, |
679 | _RK(ssl.endpoint_identification), |
680 | "Endpoint identification algorithm to validate broker " |
681 | "hostname using broker certificate. " |
682 | "https - Server (broker) hostname verification as " |
683 | "specified in RFC2818. " |
684 | "none - No endpoint verification. " |
685 | "OpenSSL >= 1.0.2 required." , |
686 | .vdef = RD_KAFKA_SSL_ENDPOINT_ID_NONE, |
687 | .s2i = { |
688 | { RD_KAFKA_SSL_ENDPOINT_ID_NONE, "none" }, |
689 | { RD_KAFKA_SSL_ENDPOINT_ID_HTTPS, "https" } |
690 | } |
691 | }, |
692 | #endif |
693 | { _RK_GLOBAL, "ssl.certificate.verify_cb" , _RK_C_PTR, |
694 | _RK(ssl.cert_verify_cb), |
695 | "Callback to verify the broker certificate chain." |
696 | }, |
697 | #endif /* WITH_SSL */ |
698 | |
699 | /* Point user in the right direction if they try to apply |
700 | * Java client SSL / JAAS properties. */ |
701 | { _RK_GLOBAL, "ssl.truststore.location" , _RK_C_INVALID, |
702 | _RK(dummy), |
703 | "Java TrustStores are not supported, use `ssl.ca.location` " |
704 | "and a certificate file instead. " |
705 | "See https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka for more information." |
706 | }, |
707 | { _RK_GLOBAL, "sasl.jaas.config" , _RK_C_INVALID, |
708 | _RK(dummy), |
709 | "Java JAAS configuration is not supported, see " |
710 | "https://github.com/edenhill/librdkafka/wiki/Using-SASL-with-librdkafka " |
711 | "for more information." |
712 | }, |
713 | |
714 | {_RK_GLOBAL|_RK_HIGH, "sasl.mechanisms" , _RK_C_STR, |
715 | _RK(sasl.mechanisms), |
716 | "SASL mechanism to use for authentication. " |
717 | "Supported: GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER. " |
718 | "**NOTE**: Despite the name only one mechanism must be configured." , |
719 | .sdef = "GSSAPI" , |
720 | .validate = rd_kafka_conf_validate_single }, |
721 | {_RK_GLOBAL|_RK_HIGH, "sasl.mechanism" , _RK_C_ALIAS, |
722 | .sdef = "sasl.mechanisms" }, |
723 | { _RK_GLOBAL, "sasl.kerberos.service.name" , _RK_C_STR, |
724 | _RK(sasl.service_name), |
725 | "Kerberos principal name that Kafka runs as, " |
726 | "not including /hostname@REALM" , |
727 | .sdef = "kafka" }, |
728 | { _RK_GLOBAL, "sasl.kerberos.principal" , _RK_C_STR, |
729 | _RK(sasl.principal), |
730 | "This client's Kerberos principal name. " |
731 | "(Not supported on Windows, will use the logon user's principal)." , |
732 | .sdef = "kafkaclient" }, |
733 | #ifndef _MSC_VER |
734 | { _RK_GLOBAL, "sasl.kerberos.kinit.cmd" , _RK_C_STR, |
735 | _RK(sasl.kinit_cmd), |
736 | "Shell command to refresh or acquire the client's Kerberos ticket. " |
737 | "This command is executed on client creation and every " |
738 | "sasl.kerberos.min.time.before.relogin (0=disable). " |
739 | "%{config.prop.name} is replaced by corresponding config " |
740 | "object value." , |
741 | .sdef = |
742 | /* First attempt to refresh, else acquire. */ |
743 | "kinit -R -t \"%{sasl.kerberos.keytab}\" " |
744 | "-k %{sasl.kerberos.principal} || " |
745 | "kinit -t \"%{sasl.kerberos.keytab}\" -k %{sasl.kerberos.principal}" |
746 | }, |
747 | { _RK_GLOBAL, "sasl.kerberos.keytab" , _RK_C_STR, |
748 | _RK(sasl.keytab), |
749 | "Path to Kerberos keytab file. " |
750 | "This configuration property is only used as a variable in " |
751 | "`sasl.kerberos.kinit.cmd` as " |
752 | "` ... -t \"%{sasl.kerberos.keytab}\"`." }, |
753 | { _RK_GLOBAL, "sasl.kerberos.min.time.before.relogin" , _RK_C_INT, |
754 | _RK(sasl.relogin_min_time), |
755 | "Minimum time in milliseconds between key refresh attempts. " |
756 | "Disable automatic key refresh by setting this property to 0." , |
757 | 0, 86400*1000, 60*1000 }, |
758 | #endif |
759 | { _RK_GLOBAL|_RK_HIGH, "sasl.username" , _RK_C_STR, |
760 | _RK(sasl.username), |
761 | "SASL username for use with the PLAIN and SASL-SCRAM-.. mechanisms" }, |
762 | { _RK_GLOBAL|_RK_HIGH, "sasl.password" , _RK_C_STR, |
763 | _RK(sasl.password), |
764 | "SASL password for use with the PLAIN and SASL-SCRAM-.. mechanism" }, |
765 | #if WITH_SASL_OAUTHBEARER |
766 | { _RK_GLOBAL, "sasl.oauthbearer.config" , _RK_C_STR, |
767 | _RK(sasl.oauthbearer_config), |
768 | "SASL/OAUTHBEARER configuration. The format is " |
769 | "implementation-dependent and must be parsed accordingly. The " |
770 | "default unsecured token implementation (see " |
771 | "https://tools.ietf.org/html/rfc7515#appendix-A.5) recognizes " |
772 | "space-separated name=value pairs with valid names including " |
773 | "principalClaimName, principal, scopeClaimName, scope, and " |
774 | "lifeSeconds. The default value for principalClaimName is \"sub\", " |
775 | "the default value for scopeClaimName is \"scope\", and the default " |
776 | "value for lifeSeconds is 3600. The scope value is CSV format with " |
777 | "the default value being no/empty scope. For example: " |
778 | "`principalClaimName=azp principal=admin scopeClaimName=roles " |
779 | "scope=role1,role2 lifeSeconds=600`. In addition, SASL extensions " |
780 | "can be communicated to the broker via " |
781 | "`extension_<extensionname>=value`. For example: " |
782 | "`principal=admin extension_traceId=123`" }, |
783 | { _RK_GLOBAL, "enable.sasl.oauthbearer.unsecure.jwt" , _RK_C_BOOL, |
784 | _RK(sasl.enable_oauthbearer_unsecure_jwt), |
785 | "Enable the builtin unsecure JWT OAUTHBEARER token handler " |
786 | "if no oauthbearer_refresh_cb has been set. " |
787 | "This builtin handler should only be used for development " |
788 | "or testing, and not in production." , |
789 | 0, 1, 0 }, |
790 | { _RK_GLOBAL, "oauthbearer_token_refresh_cb" , _RK_C_PTR, |
791 | _RK(sasl.oauthbearer_token_refresh_cb), |
792 | "SASL/OAUTHBEARER token refresh callback (set with " |
793 | "rd_kafka_conf_set_oauthbearer_token_refresh_cb(), triggered by " |
794 | "rd_kafka_poll(), et.al. " |
795 | "This callback will be triggered when it is time to refresh " |
796 | "the client's OAUTHBEARER token." }, |
797 | #endif |
798 | |
799 | #if WITH_PLUGINS |
800 | /* Plugins */ |
801 | { _RK_GLOBAL, "plugin.library.paths" , _RK_C_STR, |
802 | _RK(plugin_paths), |
803 | "List of plugin libraries to load (; separated). " |
804 | "The library search path is platform dependent (see dlopen(3) for Unix and LoadLibrary() for Windows). If no filename extension is specified the " |
805 | "platform-specific extension (such as .dll or .so) will be appended automatically." , |
806 | .set = rd_kafka_plugins_conf_set }, |
807 | #endif |
808 | |
809 | /* Interceptors are added through specific API and not exposed |
810 | * as configuration properties. |
811 | * The interceptor property must be defined after plugin.library.paths |
812 | * so that the plugin libraries are properly loaded before |
813 | * interceptors are configured when duplicating configuration objects.*/ |
814 | { _RK_GLOBAL, "interceptors" , _RK_C_INTERNAL, |
815 | _RK(interceptors), |
816 | "Interceptors added through rd_kafka_conf_interceptor_add_..() " |
817 | "and any configuration handled by interceptors." , |
818 | .ctor = rd_kafka_conf_interceptor_ctor, |
819 | .dtor = rd_kafka_conf_interceptor_dtor, |
820 | .copy = rd_kafka_conf_interceptor_copy }, |
821 | |
822 | /* Unit test interfaces. |
823 | * These are not part of the public API and may change at any time. |
824 | * Only to be used by the librdkafka tests. */ |
825 | { _RK_GLOBAL|_RK_HIDDEN, "ut_handle_ProduceResponse" , _RK_C_PTR, |
826 | _RK(ut.handle_ProduceResponse), |
827 | "ProduceResponse handler: " |
828 | "rd_kafka_resp_err_t (*cb) (rd_kafka_t *rk, " |
829 | "int32_t brokerid, uint64_t msgid, rd_kafka_resp_err_t err)" }, |
830 | |
831 | /* Global consumer group properties */ |
832 | { _RK_GLOBAL|_RK_CGRP|_RK_HIGH, "group.id" , _RK_C_STR, |
833 | _RK(group_id_str), |
834 | "Client group id string. All clients sharing the same group.id " |
835 | "belong to the same group." }, |
836 | { _RK_GLOBAL|_RK_CGRP|_RK_MED, "partition.assignment.strategy" , |
837 | _RK_C_STR, |
838 | _RK(partition_assignment_strategy), |
839 | "Name of partition assignment strategy to use when elected " |
840 | "group leader assigns partitions to group members." , |
841 | .sdef = "range,roundrobin" }, |
842 | { _RK_GLOBAL|_RK_CGRP|_RK_HIGH, "session.timeout.ms" , _RK_C_INT, |
843 | _RK(group_session_timeout_ms), |
844 | "Client group session and failure detection timeout. " |
845 | "The consumer sends periodic heartbeats (heartbeat.interval.ms) " |
846 | "to indicate its liveness to the broker. If no hearts are " |
847 | "received by the broker for a group member within the " |
848 | "session timeout, the broker will remove the consumer from " |
849 | "the group and trigger a rebalance. " |
850 | "The allowed range is configured with the **broker** configuration " |
851 | "properties `group.min.session.timeout.ms` and " |
852 | "`group.max.session.timeout.ms`. " |
853 | "Also see `max.poll.interval.ms`." , |
854 | 1, 3600*1000, 10*1000 }, |
855 | { _RK_GLOBAL|_RK_CGRP, "heartbeat.interval.ms" , _RK_C_INT, |
856 | _RK(group_heartbeat_intvl_ms), |
857 | "Group session keepalive heartbeat interval." , |
858 | 1, 3600*1000, 3*1000 }, |
859 | { _RK_GLOBAL|_RK_CGRP, "group.protocol.type" , _RK_C_KSTR, |
860 | _RK(group_protocol_type), |
861 | "Group protocol type" , |
862 | .sdef = "consumer" }, |
863 | { _RK_GLOBAL|_RK_CGRP, "coordinator.query.interval.ms" , _RK_C_INT, |
864 | _RK(coord_query_intvl_ms), |
865 | "How often to query for the current client group coordinator. " |
866 | "If the currently assigned coordinator is down the configured " |
867 | "query interval will be divided by ten to more quickly recover " |
868 | "in case of coordinator reassignment." , |
869 | 1, 3600*1000, 10*60*1000 }, |
870 | { _RK_GLOBAL|_RK_CONSUMER|_RK_HIGH, "max.poll.interval.ms" , _RK_C_INT, |
871 | _RK(max_poll_interval_ms), |
872 | "Maximum allowed time between calls to consume messages " |
873 | "(e.g., rd_kafka_consumer_poll()) for high-level consumers. " |
874 | "If this interval is exceeded the consumer is considered failed " |
875 | "and the group will rebalance in order to reassign the " |
876 | "partitions to another consumer group member. " |
877 | "Warning: Offset commits may be not possible at this point. " |
878 | "Note: It is recommended to set `enable.auto.offset.store=false` " |
879 | "for long-time processing applications and then explicitly store " |
880 | "offsets (using offsets_store()) *after* message processing, to " |
881 | "make sure offsets are not auto-committed prior to processing " |
882 | "has finished. " |
883 | "The interval is checked two times per second. " |
884 | "See KIP-62 for more information." , |
885 | 1, 86400*1000, 300000 |
886 | }, |
887 | |
888 | /* Global consumer properties */ |
889 | { _RK_GLOBAL|_RK_CONSUMER|_RK_HIGH, "enable.auto.commit" , _RK_C_BOOL, |
890 | _RK(enable_auto_commit), |
891 | "Automatically and periodically commit offsets in the background. " |
892 | "Note: setting this to false does not prevent the consumer from " |
893 | "fetching previously committed start offsets. To circumvent this " |
894 | "behaviour set specific start offsets per partition in the call " |
895 | "to assign()." , |
896 | 0, 1, 1 }, |
897 | { _RK_GLOBAL|_RK_CONSUMER|_RK_MED, "auto.commit.interval.ms" , |
898 | _RK_C_INT, |
899 | _RK(auto_commit_interval_ms), |
900 | "The frequency in milliseconds that the consumer offsets " |
901 | "are committed (written) to offset storage. (0 = disable). " |
902 | "This setting is used by the high-level consumer." , |
903 | 0, 86400*1000, 5*1000 }, |
904 | { _RK_GLOBAL|_RK_CONSUMER|_RK_HIGH, "enable.auto.offset.store" , |
905 | _RK_C_BOOL, |
906 | _RK(enable_auto_offset_store), |
907 | "Automatically store offset of last message provided to " |
908 | "application. " |
909 | "The offset store is an in-memory store of the next offset to " |
910 | "(auto-)commit for each partition." , |
911 | 0, 1, 1 }, |
912 | { _RK_GLOBAL|_RK_CONSUMER|_RK_MED, "queued.min.messages" , _RK_C_INT, |
913 | _RK(queued_min_msgs), |
914 | "Minimum number of messages per topic+partition " |
915 | "librdkafka tries to maintain in the local consumer queue." , |
916 | 1, 10000000, 100000 }, |
917 | { _RK_GLOBAL|_RK_CONSUMER|_RK_MED, "queued.max.messages.kbytes" , |
918 | _RK_C_INT, |
919 | _RK(queued_max_msg_kbytes), |
920 | "Maximum number of kilobytes per topic+partition in the " |
921 | "local consumer queue. " |
922 | "This value may be overshot by fetch.message.max.bytes. " |
923 | "This property has higher priority than queued.min.messages." , |
924 | 1, INT_MAX/1024, 0x100000/*1GB*/ }, |
925 | { _RK_GLOBAL|_RK_CONSUMER, "fetch.wait.max.ms" , _RK_C_INT, |
926 | _RK(fetch_wait_max_ms), |
927 | "Maximum time the broker may wait to fill the response " |
928 | "with fetch.min.bytes." , |
929 | 0, 300*1000, 100 }, |
930 | { _RK_GLOBAL|_RK_CONSUMER|_RK_MED, "fetch.message.max.bytes" , |
931 | _RK_C_INT, |
932 | _RK(fetch_msg_max_bytes), |
933 | "Initial maximum number of bytes per topic+partition to request when " |
934 | "fetching messages from the broker. " |
935 | "If the client encounters a message larger than this value " |
936 | "it will gradually try to increase it until the " |
937 | "entire message can be fetched." , |
938 | 1, 1000000000, 1024*1024 }, |
939 | { _RK_GLOBAL|_RK_CONSUMER|_RK_MED, "max.partition.fetch.bytes" , |
940 | _RK_C_ALIAS, |
941 | .sdef = "fetch.message.max.bytes" }, |
942 | { _RK_GLOBAL|_RK_CONSUMER|_RK_MED, "fetch.max.bytes" , _RK_C_INT, |
943 | _RK(fetch_max_bytes), |
944 | "Maximum amount of data the broker shall return for a Fetch request. " |
945 | "Messages are fetched in batches by the consumer and if the first " |
946 | "message batch in the first non-empty partition of the Fetch request " |
947 | "is larger than this value, then the message batch will still be " |
948 | "returned to ensure the consumer can make progress. " |
949 | "The maximum message batch size accepted by the broker is defined " |
950 | "via `message.max.bytes` (broker config) or " |
951 | "`max.message.bytes` (broker topic config). " |
952 | "`fetch.max.bytes` is automatically adjusted upwards to be " |
953 | "at least `message.max.bytes` (consumer config)." , |
954 | 0, INT_MAX-512, 50*1024*1024 /* 50MB */ }, |
955 | { _RK_GLOBAL|_RK_CONSUMER, "fetch.min.bytes" , _RK_C_INT, |
956 | _RK(fetch_min_bytes), |
957 | "Minimum number of bytes the broker responds with. " |
958 | "If fetch.wait.max.ms expires the accumulated data will " |
959 | "be sent to the client regardless of this setting." , |
960 | 1, 100000000, 1 }, |
961 | { _RK_GLOBAL|_RK_CONSUMER|_RK_MED, "fetch.error.backoff.ms" , _RK_C_INT, |
962 | _RK(fetch_error_backoff_ms), |
963 | "How long to postpone the next fetch request for a " |
964 | "topic+partition in case of a fetch error." , |
965 | 0, 300*1000, 500 }, |
966 | { _RK_GLOBAL|_RK_CONSUMER|_RK_DEPRECATED, "offset.store.method" , |
967 | _RK_C_S2I, |
968 | _RK(offset_store_method), |
969 | "Offset commit store method: " |
970 | "'file' - DEPRECATED: local file store (offset.store.path, et.al), " |
971 | "'broker' - broker commit store " |
972 | "(requires Apache Kafka 0.8.2 or later on the broker)." , |
973 | .vdef = RD_KAFKA_OFFSET_METHOD_BROKER, |
974 | .s2i = { |
975 | { RD_KAFKA_OFFSET_METHOD_NONE, "none" }, |
976 | { RD_KAFKA_OFFSET_METHOD_FILE, "file" }, |
977 | { RD_KAFKA_OFFSET_METHOD_BROKER, "broker" } |
978 | } |
979 | }, |
980 | { _RK_GLOBAL|_RK_CONSUMER, "consume_cb" , _RK_C_PTR, |
981 | _RK(consume_cb), |
982 | "Message consume callback (set with rd_kafka_conf_set_consume_cb())" }, |
983 | { _RK_GLOBAL|_RK_CONSUMER, "rebalance_cb" , _RK_C_PTR, |
984 | _RK(rebalance_cb), |
985 | "Called after consumer group has been rebalanced " |
986 | "(set with rd_kafka_conf_set_rebalance_cb())" }, |
987 | { _RK_GLOBAL|_RK_CONSUMER, "offset_commit_cb" , _RK_C_PTR, |
988 | _RK(offset_commit_cb), |
989 | "Offset commit result propagation callback. " |
990 | "(set with rd_kafka_conf_set_offset_commit_cb())" }, |
991 | { _RK_GLOBAL|_RK_CONSUMER, "enable.partition.eof" , _RK_C_BOOL, |
992 | _RK(enable_partition_eof), |
993 | "Emit RD_KAFKA_RESP_ERR__PARTITION_EOF event whenever the " |
994 | "consumer reaches the end of a partition." , |
995 | 0, 1, 0 }, |
996 | { _RK_GLOBAL|_RK_CONSUMER|_RK_MED, "check.crcs" , _RK_C_BOOL, |
997 | _RK(check_crcs), |
998 | "Verify CRC32 of consumed messages, ensuring no on-the-wire or " |
999 | "on-disk corruption to the messages occurred. This check comes " |
1000 | "at slightly increased CPU usage." , |
1001 | 0, 1, 0 }, |
1002 | |
1003 | /* Global producer properties */ |
1004 | { _RK_GLOBAL|_RK_PRODUCER|_RK_HIGH, "enable.idempotence" , _RK_C_BOOL, |
1005 | _RK(eos.idempotence), |
1006 | "When set to `true`, the producer will ensure that messages are " |
1007 | "successfully produced exactly once and in the original produce " |
1008 | "order. " |
1009 | "The following configuration properties are adjusted automatically " |
1010 | "(if not modified by the user) when idempotence is enabled: " |
1011 | "`max.in.flight.requests.per.connection=" |
1012 | RD_KAFKA_IDEMP_MAX_INFLIGHT_STR "` (must be less than or " |
1013 | "equal to " RD_KAFKA_IDEMP_MAX_INFLIGHT_STR "), `retries=INT32_MAX` " |
1014 | "(must be greater than 0), `acks=all`, `queuing.strategy=fifo`. " |
1015 | "Producer instantation will fail if user-supplied configuration " |
1016 | "is incompatible." , |
1017 | 0, 1, 0 }, |
1018 | { _RK_GLOBAL|_RK_PRODUCER|_RK_EXPERIMENTAL, "enable.gapless.guarantee" , |
1019 | _RK_C_BOOL, |
1020 | _RK(eos.gapless), |
1021 | "When set to `true`, any error that could result in a gap " |
1022 | "in the produced message series when a batch of messages fails, " |
1023 | "will raise a fatal error (ERR__GAPLESS_GUARANTEE) and stop " |
1024 | "the producer. " |
1025 | "Messages failing due to `message.timeout.ms` are not covered " |
1026 | "by this guarantee. " |
1027 | "Requires `enable.idempotence=true`." , |
1028 | 0, 1, 0 }, |
1029 | { _RK_GLOBAL|_RK_PRODUCER|_RK_HIGH, "queue.buffering.max.messages" , |
1030 | _RK_C_INT, |
1031 | _RK(queue_buffering_max_msgs), |
1032 | "Maximum number of messages allowed on the producer queue. " |
1033 | "This queue is shared by all topics and partitions." , |
1034 | 1, 10000000, 100000 }, |
1035 | { _RK_GLOBAL|_RK_PRODUCER|_RK_HIGH, "queue.buffering.max.kbytes" , |
1036 | _RK_C_INT, |
1037 | _RK(queue_buffering_max_kbytes), |
1038 | "Maximum total message size sum allowed on the producer queue. " |
1039 | "This queue is shared by all topics and partitions. " |
1040 | "This property has higher priority than queue.buffering.max.messages." , |
1041 | 1, INT_MAX/1024, 0x100000/*1GB*/ }, |
1042 | { _RK_GLOBAL|_RK_PRODUCER|_RK_HIGH, "queue.buffering.max.ms" , |
1043 | _RK_C_INT, |
1044 | _RK(buffering_max_ms), |
1045 | "Delay in milliseconds to wait for messages in the producer queue " |
1046 | "to accumulate before constructing message batches (MessageSets) to " |
1047 | "transmit to brokers. " |
1048 | "A higher value allows larger and more effective " |
1049 | "(less overhead, improved compression) batches of messages to " |
1050 | "accumulate at the expense of increased message delivery latency." , |
1051 | 0, 900*1000, 0 }, |
1052 | { _RK_GLOBAL|_RK_PRODUCER|_RK_HIGH, "linger.ms" , _RK_C_ALIAS, |
1053 | .sdef = "queue.buffering.max.ms" }, |
1054 | { _RK_GLOBAL|_RK_PRODUCER|_RK_HIGH, "message.send.max.retries" , |
1055 | _RK_C_INT, |
1056 | _RK(max_retries), |
1057 | "How many times to retry sending a failing Message. " |
1058 | "**Note:** retrying may cause reordering unless " |
1059 | "`enable.idempotence` is set to true." , |
1060 | 0, 10000000, 2 }, |
1061 | { _RK_GLOBAL | _RK_PRODUCER, "retries" , _RK_C_ALIAS, |
1062 | .sdef = "message.send.max.retries" }, |
1063 | { _RK_GLOBAL|_RK_PRODUCER|_RK_MED, "retry.backoff.ms" , _RK_C_INT, |
1064 | _RK(retry_backoff_ms), |
1065 | "The backoff time in milliseconds before retrying a protocol request." , |
1066 | 1, 300*1000, 100 }, |
1067 | |
1068 | { _RK_GLOBAL|_RK_PRODUCER, "queue.buffering.backpressure.threshold" , |
1069 | _RK_C_INT, _RK(queue_backpressure_thres), |
1070 | "The threshold of outstanding not yet transmitted broker requests " |
1071 | "needed to backpressure the producer's message accumulator. " |
1072 | "If the number of not yet transmitted requests equals or exceeds " |
1073 | "this number, produce request creation that would have otherwise " |
1074 | "been triggered (for example, in accordance with linger.ms) will be " |
1075 | "delayed. A lower number yields larger and more effective batches. " |
1076 | "A higher value can improve latency when using compression on slow " |
1077 | "machines." , |
1078 | 1, 1000000, 1 }, |
1079 | |
1080 | { _RK_GLOBAL|_RK_PRODUCER|_RK_MED, "compression.codec" , _RK_C_S2I, |
1081 | _RK(compression_codec), |
1082 | "compression codec to use for compressing message sets. " |
1083 | "This is the default value for all topics, may be overridden by " |
1084 | "the topic configuration property `compression.codec`. " , |
1085 | .vdef = RD_KAFKA_COMPRESSION_NONE, |
1086 | .s2i = { |
1087 | { RD_KAFKA_COMPRESSION_NONE, "none" }, |
1088 | #if WITH_ZLIB |
1089 | { RD_KAFKA_COMPRESSION_GZIP, "gzip" }, |
1090 | #endif |
1091 | #if WITH_SNAPPY |
1092 | { RD_KAFKA_COMPRESSION_SNAPPY, "snappy" }, |
1093 | #endif |
1094 | { RD_KAFKA_COMPRESSION_LZ4, "lz4" }, |
1095 | #if WITH_ZSTD |
1096 | { RD_KAFKA_COMPRESSION_ZSTD, "zstd" }, |
1097 | #endif |
1098 | { 0 } |
1099 | } }, |
1100 | { _RK_GLOBAL|_RK_PRODUCER|_RK_MED, "compression.type" , _RK_C_ALIAS, |
1101 | .sdef = "compression.codec" }, |
1102 | { _RK_GLOBAL|_RK_PRODUCER|_RK_MED, "batch.num.messages" , _RK_C_INT, |
1103 | _RK(batch_num_messages), |
1104 | "Maximum number of messages batched in one MessageSet. " |
1105 | "The total MessageSet size is also limited by message.max.bytes." , |
1106 | 1, 1000000, 10000 }, |
1107 | { _RK_GLOBAL|_RK_PRODUCER, "delivery.report.only.error" , _RK_C_BOOL, |
1108 | _RK(dr_err_only), |
1109 | "Only provide delivery reports for failed messages." , |
1110 | 0, 1, 0 }, |
1111 | { _RK_GLOBAL|_RK_PRODUCER, "dr_cb" , _RK_C_PTR, |
1112 | _RK(dr_cb), |
1113 | "Delivery report callback (set with rd_kafka_conf_set_dr_cb())" }, |
1114 | { _RK_GLOBAL|_RK_PRODUCER, "dr_msg_cb" , _RK_C_PTR, |
1115 | _RK(dr_msg_cb), |
1116 | "Delivery report callback (set with rd_kafka_conf_set_dr_msg_cb())" }, |
1117 | |
1118 | |
1119 | /* |
1120 | * Topic properties |
1121 | */ |
1122 | |
1123 | /* Topic producer properties */ |
1124 | { _RK_TOPIC|_RK_PRODUCER|_RK_HIGH, "request.required.acks" , _RK_C_INT, |
1125 | _RKT(required_acks), |
1126 | "This field indicates the number of acknowledgements the leader " |
1127 | "broker must receive from ISR brokers before responding to the " |
1128 | "request: " |
1129 | "*0*=Broker does not send any response/ack to client, " |
1130 | "*-1* or *all*=Broker will block until message is committed by all " |
1131 | "in sync replicas (ISRs). If there are less than " |
1132 | "`min.insync.replicas` (broker configuration) in the ISR set the " |
1133 | "produce request will fail." , |
1134 | -1, 1000, -1, |
1135 | .s2i = { |
1136 | { -1, "all" }, |
1137 | } |
1138 | }, |
1139 | { _RK_TOPIC|_RK_PRODUCER|_RK_HIGH, "acks" , _RK_C_ALIAS, |
1140 | .sdef = "request.required.acks" }, |
1141 | |
1142 | { _RK_TOPIC|_RK_PRODUCER|_RK_MED, "request.timeout.ms" , _RK_C_INT, |
1143 | _RKT(request_timeout_ms), |
1144 | "The ack timeout of the producer request in milliseconds. " |
1145 | "This value is only enforced by the broker and relies " |
1146 | "on `request.required.acks` being != 0." , |
1147 | 1, 900*1000, 5*1000 }, |
1148 | { _RK_TOPIC|_RK_PRODUCER|_RK_HIGH, "message.timeout.ms" , _RK_C_INT, |
1149 | _RKT(message_timeout_ms), |
1150 | "Local message timeout. " |
1151 | "This value is only enforced locally and limits the time a " |
1152 | "produced message waits for successful delivery. " |
1153 | "A time of 0 is infinite. " |
1154 | "This is the maximum time librdkafka may use to deliver a message " |
1155 | "(including retries). Delivery error occurs when either the retry " |
1156 | "count or the message timeout are exceeded." , |
1157 | 0, INT32_MAX, 300*1000 }, |
1158 | { _RK_TOPIC|_RK_PRODUCER|_RK_HIGH, "delivery.timeout.ms" , _RK_C_ALIAS, |
1159 | .sdef = "message.timeout.ms" }, |
1160 | { _RK_TOPIC|_RK_PRODUCER|_RK_DEPRECATED|_RK_EXPERIMENTAL, |
1161 | "queuing.strategy" , _RK_C_S2I, |
1162 | _RKT(queuing_strategy), |
1163 | "Producer queuing strategy. FIFO preserves produce ordering, " |
1164 | "while LIFO prioritizes new messages." , |
1165 | .vdef = 0, |
1166 | .s2i = { |
1167 | { RD_KAFKA_QUEUE_FIFO, "fifo" }, |
1168 | { RD_KAFKA_QUEUE_LIFO, "lifo" } |
1169 | } |
1170 | }, |
1171 | { _RK_TOPIC|_RK_PRODUCER|_RK_DEPRECATED, |
1172 | "produce.offset.report" , _RK_C_BOOL, |
1173 | _RKT(produce_offset_report), |
1174 | "No longer used." , |
1175 | 0, 1, 0 }, |
1176 | { _RK_TOPIC|_RK_PRODUCER|_RK_HIGH, "partitioner" , _RK_C_STR, |
1177 | _RKT(partitioner_str), |
1178 | "Partitioner: " |
1179 | "`random` - random distribution, " |
1180 | "`consistent` - CRC32 hash of key (Empty and NULL keys are mapped to single partition), " |
1181 | "`consistent_random` - CRC32 hash of key (Empty and NULL keys are randomly partitioned), " |
1182 | "`murmur2` - Java Producer compatible Murmur2 hash of key (NULL keys are mapped to single partition), " |
1183 | "`murmur2_random` - Java Producer compatible Murmur2 hash of key (NULL keys are randomly partitioned. This is functionally equivalent to the default partitioner in the Java Producer.)." , |
1184 | .sdef = "consistent_random" , |
1185 | .validate = rd_kafka_conf_validate_partitioner }, |
1186 | { _RK_TOPIC|_RK_PRODUCER, "partitioner_cb" , _RK_C_PTR, |
1187 | _RKT(partitioner), |
1188 | "Custom partitioner callback " |
1189 | "(set with rd_kafka_topic_conf_set_partitioner_cb())" }, |
1190 | { _RK_TOPIC|_RK_PRODUCER|_RK_DEPRECATED|_RK_EXPERIMENTAL, |
1191 | "msg_order_cmp" , _RK_C_PTR, |
1192 | _RKT(msg_order_cmp), |
1193 | "Message queue ordering comparator " |
1194 | "(set with rd_kafka_topic_conf_set_msg_order_cmp()). " |
1195 | "Also see `queuing.strategy`." }, |
1196 | { _RK_TOPIC, "opaque" , _RK_C_PTR, |
1197 | _RKT(opaque), |
1198 | "Application opaque (set with rd_kafka_topic_conf_set_opaque())" }, |
1199 | { _RK_TOPIC|_RK_PRODUCER|_RK_HIGH, "compression.codec" , _RK_C_S2I, |
1200 | _RKT(compression_codec), |
1201 | "Compression codec to use for compressing message sets. " |
1202 | "inherit = inherit global compression.codec configuration." , |
1203 | .vdef = RD_KAFKA_COMPRESSION_INHERIT, |
1204 | .s2i = { |
1205 | { RD_KAFKA_COMPRESSION_NONE, "none" }, |
1206 | #if WITH_ZLIB |
1207 | { RD_KAFKA_COMPRESSION_GZIP, "gzip" }, |
1208 | #endif |
1209 | #if WITH_SNAPPY |
1210 | { RD_KAFKA_COMPRESSION_SNAPPY, "snappy" }, |
1211 | #endif |
1212 | { RD_KAFKA_COMPRESSION_LZ4, "lz4" }, |
1213 | #if WITH_ZSTD |
1214 | { RD_KAFKA_COMPRESSION_ZSTD, "zstd" }, |
1215 | #endif |
1216 | { RD_KAFKA_COMPRESSION_INHERIT, "inherit" }, |
1217 | { 0 } |
1218 | } }, |
1219 | { _RK_TOPIC|_RK_PRODUCER|_RK_HIGH, "compression.type" , _RK_C_ALIAS, |
1220 | .sdef = "compression.codec" }, |
1221 | { _RK_TOPIC|_RK_PRODUCER|_RK_MED, "compression.level" , _RK_C_INT, |
1222 | _RKT(compression_level), |
1223 | "Compression level parameter for algorithm selected by configuration " |
1224 | "property `compression.codec`. Higher values will result in better " |
1225 | "compression at the cost of more CPU usage. Usable range is " |
1226 | "algorithm-dependent: [0-9] for gzip; [0-12] for lz4; only 0 for snappy; " |
1227 | "-1 = codec-dependent default compression level." , |
1228 | RD_KAFKA_COMPLEVEL_MIN, |
1229 | RD_KAFKA_COMPLEVEL_MAX, |
1230 | RD_KAFKA_COMPLEVEL_DEFAULT }, |
1231 | |
1232 | |
1233 | /* Topic consumer properties */ |
1234 | { _RK_TOPIC|_RK_CONSUMER|_RK_DEPRECATED, "auto.commit.enable" , |
1235 | _RK_C_BOOL, |
1236 | _RKT(auto_commit), |
1237 | "[**LEGACY PROPERTY:** This property is used by the simple legacy " |
1238 | "consumer only. When using the high-level KafkaConsumer, the global " |
1239 | "`enable.auto.commit` property must be used instead]. " |
1240 | "If true, periodically commit offset of the last message handed " |
1241 | "to the application. This committed offset will be used when the " |
1242 | "process restarts to pick up where it left off. " |
1243 | "If false, the application will have to call " |
1244 | "`rd_kafka_offset_store()` to store an offset (optional). " |
1245 | "**NOTE:** There is currently no zookeeper integration, offsets " |
1246 | "will be written to broker or local file according to " |
1247 | "offset.store.method." , |
1248 | 0, 1, 1 }, |
1249 | { _RK_TOPIC|_RK_CONSUMER, "enable.auto.commit" , _RK_C_ALIAS, |
1250 | .sdef = "auto.commit.enable" }, |
1251 | { _RK_TOPIC|_RK_CONSUMER|_RK_HIGH, "auto.commit.interval.ms" , |
1252 | _RK_C_INT, |
1253 | _RKT(auto_commit_interval_ms), |
1254 | "[**LEGACY PROPERTY:** This setting is used by the simple legacy " |
1255 | "consumer only. When using the high-level KafkaConsumer, the " |
1256 | "global `auto.commit.interval.ms` property must be used instead]. " |
1257 | "The frequency in milliseconds that the consumer offsets " |
1258 | "are committed (written) to offset storage." , |
1259 | 10, 86400*1000, 60*1000 }, |
1260 | { _RK_TOPIC|_RK_CONSUMER|_RK_HIGH, "auto.offset.reset" , _RK_C_S2I, |
1261 | _RKT(auto_offset_reset), |
1262 | "Action to take when there is no initial offset in offset store " |
1263 | "or the desired offset is out of range: " |
1264 | "'smallest','earliest' - automatically reset the offset to the smallest offset, " |
1265 | "'largest','latest' - automatically reset the offset to the largest offset, " |
1266 | "'error' - trigger an error which is retrieved by consuming messages " |
1267 | "and checking 'message->err'." , |
1268 | .vdef = RD_KAFKA_OFFSET_END, |
1269 | .s2i = { |
1270 | { RD_KAFKA_OFFSET_BEGINNING, "smallest" }, |
1271 | { RD_KAFKA_OFFSET_BEGINNING, "earliest" }, |
1272 | { RD_KAFKA_OFFSET_BEGINNING, "beginning" }, |
1273 | { RD_KAFKA_OFFSET_END, "largest" }, |
1274 | { RD_KAFKA_OFFSET_END, "latest" }, |
1275 | { RD_KAFKA_OFFSET_END, "end" }, |
1276 | { RD_KAFKA_OFFSET_INVALID, "error" }, |
1277 | } |
1278 | }, |
1279 | { _RK_TOPIC|_RK_CONSUMER|_RK_DEPRECATED, "offset.store.path" , |
1280 | _RK_C_STR, |
1281 | _RKT(offset_store_path), |
1282 | "Path to local file for storing offsets. If the path is a directory " |
1283 | "a filename will be automatically generated in that directory based " |
1284 | "on the topic and partition. " |
1285 | "File-based offset storage will be removed in a future version." , |
1286 | .sdef = "." }, |
1287 | |
1288 | { _RK_TOPIC|_RK_CONSUMER|_RK_DEPRECATED, |
1289 | "offset.store.sync.interval.ms" , _RK_C_INT, |
1290 | _RKT(offset_store_sync_interval_ms), |
1291 | "fsync() interval for the offset file, in milliseconds. " |
1292 | "Use -1 to disable syncing, and 0 for immediate sync after " |
1293 | "each write. " |
1294 | "File-based offset storage will be removed in a future version." , |
1295 | -1, 86400*1000, -1 }, |
1296 | |
1297 | { _RK_TOPIC|_RK_CONSUMER|_RK_DEPRECATED, "offset.store.method" , |
1298 | _RK_C_S2I, |
1299 | _RKT(offset_store_method), |
1300 | "Offset commit store method: " |
1301 | "'file' - DEPRECATED: local file store (offset.store.path, et.al), " |
1302 | "'broker' - broker commit store " |
1303 | "(requires \"group.id\" to be configured and " |
1304 | "Apache Kafka 0.8.2 or later on the broker.)." , |
1305 | .vdef = RD_KAFKA_OFFSET_METHOD_BROKER, |
1306 | .s2i = { |
1307 | { RD_KAFKA_OFFSET_METHOD_FILE, "file" }, |
1308 | { RD_KAFKA_OFFSET_METHOD_BROKER, "broker" } |
1309 | } |
1310 | }, |
1311 | |
1312 | { _RK_TOPIC|_RK_CONSUMER, "consume.callback.max.messages" , _RK_C_INT, |
1313 | _RKT(consume_callback_max_msgs), |
1314 | "Maximum number of messages to dispatch in " |
1315 | "one `rd_kafka_consume_callback*()` call (0 = unlimited)" , |
1316 | 0, 1000000, 0 }, |
1317 | |
1318 | { 0, /* End */ } |
1319 | }; |
1320 | |
1321 | /** |
1322 | * @returns the property object for \p name in \p scope, or NULL if not found. |
1323 | * @remark does not work with interceptor configs. |
1324 | */ |
1325 | const struct rd_kafka_property * |
1326 | rd_kafka_conf_prop_find (int scope, const char *name) { |
1327 | const struct rd_kafka_property *prop; |
1328 | |
1329 | restart: |
1330 | for (prop = rd_kafka_properties ; prop->name ; prop++) { |
1331 | |
1332 | if (!(prop->scope & scope)) |
1333 | continue; |
1334 | |
1335 | if (strcmp(prop->name, name)) |
1336 | continue; |
1337 | |
1338 | if (prop->type == _RK_C_ALIAS) { |
1339 | /* Caller supplied an alias, restart |
1340 | * search for real name. */ |
1341 | name = prop->sdef; |
1342 | goto restart; |
1343 | } |
1344 | |
1345 | return prop; |
1346 | } |
1347 | |
1348 | return NULL; |
1349 | } |
1350 | |
1351 | /** |
1352 | * @returns rd_true if property has been set/modified, else rd_false. |
1353 | * If \p name is unknown 0 is returned. |
1354 | */ |
1355 | static rd_bool_t rd_kafka_conf_is_modified (const rd_kafka_conf_t *conf, |
1356 | const char *name) { |
1357 | const struct rd_kafka_property *prop; |
1358 | |
1359 | if (!(prop = rd_kafka_conf_prop_find(_RK_GLOBAL, name))) |
1360 | return rd_false; |
1361 | |
1362 | return rd_kafka_anyconf_is_modified(conf, prop); |
1363 | } |
1364 | |
1365 | |
1366 | /** |
1367 | * @returns true if property has been set/modified, else 0. |
1368 | * If \p name is unknown 0 is returned. |
1369 | */ |
1370 | static |
1371 | rd_bool_t rd_kafka_topic_conf_is_modified (const rd_kafka_topic_conf_t *conf, |
1372 | const char *name) { |
1373 | const struct rd_kafka_property *prop; |
1374 | |
1375 | if (!(prop = rd_kafka_conf_prop_find(_RK_TOPIC, name))) |
1376 | return 0; |
1377 | |
1378 | return rd_kafka_anyconf_is_modified(conf, prop); |
1379 | } |
1380 | |
1381 | |
1382 | |
1383 | static rd_kafka_conf_res_t |
1384 | rd_kafka_anyconf_set_prop0 (int scope, void *conf, |
1385 | const struct rd_kafka_property *prop, |
1386 | const char *istr, int ival, rd_kafka_conf_set_mode_t set_mode, |
1387 | char *errstr, size_t errstr_size) { |
1388 | rd_kafka_conf_res_t res; |
1389 | |
1390 | #define _RK_PTR(TYPE,BASE,OFFSET) (TYPE)(void *)(((char *)(BASE))+(OFFSET)) |
1391 | |
1392 | /* Try interceptors first (only for GLOBAL config) */ |
1393 | if (scope & _RK_GLOBAL) { |
1394 | if (prop->type == _RK_C_PTR || prop->type == _RK_C_INTERNAL) |
1395 | res = RD_KAFKA_CONF_UNKNOWN; |
1396 | else |
1397 | res = rd_kafka_interceptors_on_conf_set(conf, |
1398 | prop->name, |
1399 | istr, |
1400 | errstr, |
1401 | errstr_size); |
1402 | if (res != RD_KAFKA_CONF_UNKNOWN) |
1403 | return res; |
1404 | } |
1405 | |
1406 | |
1407 | if (prop->set) { |
1408 | /* Custom setter */ |
1409 | rd_kafka_conf_res_t res; |
1410 | |
1411 | res = prop->set(scope, conf, prop->name, istr, |
1412 | _RK_PTR(void *, conf, prop->offset), |
1413 | set_mode, errstr, errstr_size); |
1414 | |
1415 | if (res != RD_KAFKA_CONF_OK) |
1416 | return res; |
1417 | |
1418 | /* FALLTHRU so that property value is set. */ |
1419 | } |
1420 | |
1421 | switch (prop->type) |
1422 | { |
1423 | case _RK_C_STR: |
1424 | { |
1425 | char **str = _RK_PTR(char **, conf, prop->offset); |
1426 | if (*str) |
1427 | rd_free(*str); |
1428 | if (istr) |
1429 | *str = rd_strdup(istr); |
1430 | else |
1431 | *str = prop->sdef ? rd_strdup(prop->sdef) : NULL; |
1432 | break; |
1433 | } |
1434 | case _RK_C_KSTR: |
1435 | { |
1436 | rd_kafkap_str_t **kstr = _RK_PTR(rd_kafkap_str_t **, conf, |
1437 | prop->offset); |
1438 | if (*kstr) |
1439 | rd_kafkap_str_destroy(*kstr); |
1440 | if (istr) |
1441 | *kstr = rd_kafkap_str_new(istr, -1); |
1442 | else |
1443 | *kstr = prop->sdef ? |
1444 | rd_kafkap_str_new(prop->sdef, -1) : NULL; |
1445 | break; |
1446 | } |
1447 | case _RK_C_PTR: |
1448 | *_RK_PTR(const void **, conf, prop->offset) = istr; |
1449 | break; |
1450 | case _RK_C_BOOL: |
1451 | case _RK_C_INT: |
1452 | case _RK_C_S2I: |
1453 | case _RK_C_S2F: |
1454 | { |
1455 | int *val = _RK_PTR(int *, conf, prop->offset); |
1456 | |
1457 | if (prop->type == _RK_C_S2F) { |
1458 | switch (set_mode) |
1459 | { |
1460 | case _RK_CONF_PROP_SET_REPLACE: |
1461 | *val = ival; |
1462 | break; |
1463 | case _RK_CONF_PROP_SET_ADD: |
1464 | *val |= ival; |
1465 | break; |
1466 | case _RK_CONF_PROP_SET_DEL: |
1467 | *val &= ~ival; |
1468 | break; |
1469 | } |
1470 | } else { |
1471 | /* Single assignment */ |
1472 | *val = ival; |
1473 | |
1474 | } |
1475 | break; |
1476 | } |
1477 | case _RK_C_PATLIST: |
1478 | { |
1479 | /* Split comma-separated list into individual regex expressions |
1480 | * that are verified and then append to the provided list. */ |
1481 | rd_kafka_pattern_list_t **plist; |
1482 | |
1483 | plist = _RK_PTR(rd_kafka_pattern_list_t **, conf, prop->offset); |
1484 | |
1485 | if (*plist) |
1486 | rd_kafka_pattern_list_destroy(*plist); |
1487 | |
1488 | if (istr) { |
1489 | if (!(*plist = |
1490 | rd_kafka_pattern_list_new(istr, |
1491 | errstr, |
1492 | (int)errstr_size))) |
1493 | return RD_KAFKA_CONF_INVALID; |
1494 | } else |
1495 | *plist = NULL; |
1496 | |
1497 | break; |
1498 | } |
1499 | |
1500 | case _RK_C_INTERNAL: |
1501 | /* Probably handled by setter */ |
1502 | break; |
1503 | |
1504 | default: |
1505 | rd_kafka_assert(NULL, !*"unknown conf type" ); |
1506 | } |
1507 | |
1508 | |
1509 | rd_kafka_anyconf_set_modified(conf, prop, 1/*modified*/); |
1510 | return RD_KAFKA_CONF_OK; |
1511 | } |
1512 | |
1513 | |
1514 | /** |
1515 | * @brief Find s2i (string-to-int mapping) entry and return its array index, |
1516 | * or -1 on miss. |
1517 | */ |
1518 | static int rd_kafka_conf_s2i_find (const struct rd_kafka_property *prop, |
1519 | const char *value) { |
1520 | int j; |
1521 | |
1522 | for (j = 0 ; j < (int)RD_ARRAYSIZE(prop->s2i); j++) { |
1523 | if (prop->s2i[j].str && |
1524 | !rd_strcasecmp(prop->s2i[j].str, value)) |
1525 | return j; |
1526 | } |
1527 | |
1528 | return -1; |
1529 | } |
1530 | |
1531 | |
1532 | /** |
1533 | * @brief Set configuration property. |
1534 | * |
1535 | * @param allow_specific Allow rd_kafka_*conf_set_..() to be set, |
1536 | * such as rd_kafka_conf_set_log_cb(). |
1537 | * Should not be allowed from the conf_set() string interface. |
1538 | */ |
1539 | static rd_kafka_conf_res_t |
1540 | rd_kafka_anyconf_set_prop (int scope, void *conf, |
1541 | const struct rd_kafka_property *prop, |
1542 | const char *value, |
1543 | int allow_specific, |
1544 | char *errstr, size_t errstr_size) { |
1545 | int ival; |
1546 | |
1547 | switch (prop->type) |
1548 | { |
1549 | case _RK_C_STR: |
1550 | case _RK_C_KSTR: |
1551 | if (prop->s2i[0].str) { |
1552 | int match; |
1553 | |
1554 | if (!value || |
1555 | (match = rd_kafka_conf_s2i_find(prop, value)) == -1){ |
1556 | rd_snprintf(errstr, errstr_size, |
1557 | "Invalid value for " |
1558 | "configuration property \"%s\": " |
1559 | "%s" , |
1560 | prop->name, value); |
1561 | return RD_KAFKA_CONF_INVALID; |
1562 | } |
1563 | |
1564 | /* Replace value string with canonical form */ |
1565 | value = prop->s2i[match].str; |
1566 | } |
1567 | /* FALLTHRU */ |
1568 | case _RK_C_PATLIST: |
1569 | if (prop->validate && |
1570 | (!value || !prop->validate(prop, value, -1))) { |
1571 | rd_snprintf(errstr, errstr_size, |
1572 | "Invalid value for " |
1573 | "configuration property \"%s\": %s" , |
1574 | prop->name, value); |
1575 | return RD_KAFKA_CONF_INVALID; |
1576 | } |
1577 | |
1578 | return rd_kafka_anyconf_set_prop0(scope, conf, prop, value, 0, |
1579 | _RK_CONF_PROP_SET_REPLACE, |
1580 | errstr, errstr_size); |
1581 | |
1582 | case _RK_C_PTR: |
1583 | /* Allow hidden internal unit test properties to |
1584 | * be set from generic conf_set() interface. */ |
1585 | if (!allow_specific && !(prop->scope & _RK_HIDDEN)) { |
1586 | rd_snprintf(errstr, errstr_size, |
1587 | "Property \"%s\" must be set through " |
1588 | "dedicated .._set_..() function" , |
1589 | prop->name); |
1590 | return RD_KAFKA_CONF_INVALID; |
1591 | } |
1592 | return rd_kafka_anyconf_set_prop0(scope, conf, prop, value, 0, |
1593 | _RK_CONF_PROP_SET_REPLACE, |
1594 | errstr, errstr_size); |
1595 | |
1596 | case _RK_C_BOOL: |
1597 | if (!value) { |
1598 | rd_snprintf(errstr, errstr_size, |
1599 | "Bool configuration property \"%s\" cannot " |
1600 | "be set to empty value" , prop->name); |
1601 | return RD_KAFKA_CONF_INVALID; |
1602 | } |
1603 | |
1604 | |
1605 | if (!rd_strcasecmp(value, "true" ) || |
1606 | !rd_strcasecmp(value, "t" ) || |
1607 | !strcmp(value, "1" )) |
1608 | ival = 1; |
1609 | else if (!rd_strcasecmp(value, "false" ) || |
1610 | !rd_strcasecmp(value, "f" ) || |
1611 | !strcmp(value, "0" )) |
1612 | ival = 0; |
1613 | else { |
1614 | rd_snprintf(errstr, errstr_size, |
1615 | "Expected bool value for \"%s\": " |
1616 | "true or false" , prop->name); |
1617 | return RD_KAFKA_CONF_INVALID; |
1618 | } |
1619 | |
1620 | rd_kafka_anyconf_set_prop0(scope, conf, prop, value, ival, |
1621 | _RK_CONF_PROP_SET_REPLACE, |
1622 | errstr, errstr_size); |
1623 | return RD_KAFKA_CONF_OK; |
1624 | |
1625 | case _RK_C_INT: |
1626 | { |
1627 | const char *end; |
1628 | |
1629 | if (!value) { |
1630 | rd_snprintf(errstr, errstr_size, |
1631 | "Integer configuration " |
1632 | "property \"%s\" cannot be set " |
1633 | "to empty value" , prop->name); |
1634 | return RD_KAFKA_CONF_INVALID; |
1635 | } |
1636 | |
1637 | ival = (int)strtol(value, (char **)&end, 0); |
1638 | if (end == value) { |
1639 | /* Non numeric, check s2i for string mapping */ |
1640 | int match = rd_kafka_conf_s2i_find(prop, value); |
1641 | |
1642 | if (match == -1) { |
1643 | rd_snprintf(errstr, errstr_size, |
1644 | "Invalid value for " |
1645 | "configuration property \"%s\"" , |
1646 | prop->name); |
1647 | return RD_KAFKA_CONF_INVALID; |
1648 | } |
1649 | |
1650 | ival = prop->s2i[match].val; |
1651 | } |
1652 | |
1653 | if (ival < prop->vmin || |
1654 | ival > prop->vmax) { |
1655 | rd_snprintf(errstr, errstr_size, |
1656 | "Configuration property \"%s\" value " |
1657 | "%i is outside allowed range %i..%i\n" , |
1658 | prop->name, ival, |
1659 | prop->vmin, |
1660 | prop->vmax); |
1661 | return RD_KAFKA_CONF_INVALID; |
1662 | } |
1663 | |
1664 | rd_kafka_anyconf_set_prop0(scope, conf, prop, value, ival, |
1665 | _RK_CONF_PROP_SET_REPLACE, |
1666 | errstr, errstr_size); |
1667 | return RD_KAFKA_CONF_OK; |
1668 | } |
1669 | |
1670 | case _RK_C_S2I: |
1671 | case _RK_C_S2F: |
1672 | { |
1673 | int j; |
1674 | const char *next; |
1675 | |
1676 | if (!value) { |
1677 | rd_snprintf(errstr, errstr_size, |
1678 | "Configuration " |
1679 | "property \"%s\" cannot be set " |
1680 | "to empty value" , prop->name); |
1681 | return RD_KAFKA_CONF_INVALID; |
1682 | } |
1683 | |
1684 | next = value; |
1685 | while (next && *next) { |
1686 | const char *s, *t; |
1687 | rd_kafka_conf_set_mode_t set_mode = _RK_CONF_PROP_SET_ADD; /* S2F */ |
1688 | |
1689 | s = next; |
1690 | |
1691 | if (prop->type == _RK_C_S2F && |
1692 | (t = strchr(s, ','))) { |
1693 | /* CSV flag field */ |
1694 | next = t+1; |
1695 | } else { |
1696 | /* Single string */ |
1697 | t = s+strlen(s); |
1698 | next = NULL; |
1699 | } |
1700 | |
1701 | |
1702 | /* Left trim */ |
1703 | while (s < t && isspace((int)*s)) |
1704 | s++; |
1705 | |
1706 | /* Right trim */ |
1707 | while (t > s && isspace((int)*t)) |
1708 | t--; |
1709 | |
1710 | /* S2F: +/- prefix */ |
1711 | if (prop->type == _RK_C_S2F) { |
1712 | if (*s == '+') { |
1713 | set_mode = _RK_CONF_PROP_SET_ADD; |
1714 | s++; |
1715 | } else if (*s == '-') { |
1716 | set_mode = _RK_CONF_PROP_SET_DEL; |
1717 | s++; |
1718 | } |
1719 | } |
1720 | |
1721 | /* Empty string? */ |
1722 | if (s == t) |
1723 | continue; |
1724 | |
1725 | /* Match string to s2i table entry */ |
1726 | for (j = 0 ; j < (int)RD_ARRAYSIZE(prop->s2i); j++) { |
1727 | int new_val; |
1728 | |
1729 | if (!prop->s2i[j].str) |
1730 | continue; |
1731 | |
1732 | if (strlen(prop->s2i[j].str) == (size_t)(t-s) && |
1733 | !rd_strncasecmp(prop->s2i[j].str, s, |
1734 | (int)(t-s))) |
1735 | new_val = prop->s2i[j].val; |
1736 | else |
1737 | continue; |
1738 | |
1739 | rd_kafka_anyconf_set_prop0(scope, conf, prop, |
1740 | value, new_val, |
1741 | set_mode, |
1742 | errstr, errstr_size); |
1743 | |
1744 | if (prop->type == _RK_C_S2F) { |
1745 | /* Flags: OR it in: do next */ |
1746 | break; |
1747 | } else { |
1748 | /* Single assignment */ |
1749 | return RD_KAFKA_CONF_OK; |
1750 | } |
1751 | } |
1752 | |
1753 | /* S2F: Good match: continue with next */ |
1754 | if (j < (int)RD_ARRAYSIZE(prop->s2i)) |
1755 | continue; |
1756 | |
1757 | /* No match */ |
1758 | rd_snprintf(errstr, errstr_size, |
1759 | "Invalid value \"%.*s\" for " |
1760 | "configuration property \"%s\"" , |
1761 | (int)(t-s), s, prop->name); |
1762 | return RD_KAFKA_CONF_INVALID; |
1763 | |
1764 | } |
1765 | return RD_KAFKA_CONF_OK; |
1766 | } |
1767 | |
1768 | case _RK_C_INTERNAL: |
1769 | rd_snprintf(errstr, errstr_size, |
1770 | "Internal property \"%s\" not settable" , |
1771 | prop->name); |
1772 | return RD_KAFKA_CONF_INVALID; |
1773 | |
1774 | case _RK_C_INVALID: |
1775 | rd_snprintf(errstr, errstr_size, "%s" , prop->desc); |
1776 | return RD_KAFKA_CONF_INVALID; |
1777 | |
1778 | default: |
1779 | rd_kafka_assert(NULL, !*"unknown conf type" ); |
1780 | } |
1781 | |
1782 | /* not reachable */ |
1783 | return RD_KAFKA_CONF_INVALID; |
1784 | } |
1785 | |
1786 | |
1787 | |
1788 | static void rd_kafka_defaultconf_set (int scope, void *conf) { |
1789 | const struct rd_kafka_property *prop; |
1790 | |
1791 | for (prop = rd_kafka_properties ; prop->name ; prop++) { |
1792 | if (!(prop->scope & scope)) |
1793 | continue; |
1794 | |
1795 | if (prop->type == _RK_C_ALIAS || prop->type == _RK_C_INVALID) |
1796 | continue; |
1797 | |
1798 | if (prop->ctor) |
1799 | prop->ctor(scope, conf); |
1800 | |
1801 | if (prop->sdef || prop->vdef || prop->pdef) |
1802 | rd_kafka_anyconf_set_prop0(scope, conf, prop, |
1803 | prop->sdef ? |
1804 | prop->sdef : prop->pdef, |
1805 | prop->vdef, |
1806 | _RK_CONF_PROP_SET_REPLACE, |
1807 | NULL, 0); |
1808 | } |
1809 | } |
1810 | |
1811 | rd_kafka_conf_t *rd_kafka_conf_new (void) { |
1812 | rd_kafka_conf_t *conf = rd_calloc(1, sizeof(*conf)); |
1813 | rd_kafka_defaultconf_set(_RK_GLOBAL, conf); |
1814 | rd_kafka_anyconf_clear_all_is_modified(conf); |
1815 | return conf; |
1816 | } |
1817 | |
1818 | rd_kafka_topic_conf_t *rd_kafka_topic_conf_new (void) { |
1819 | rd_kafka_topic_conf_t *tconf = rd_calloc(1, sizeof(*tconf)); |
1820 | rd_kafka_defaultconf_set(_RK_TOPIC, tconf); |
1821 | rd_kafka_anyconf_clear_all_is_modified(tconf); |
1822 | return tconf; |
1823 | } |
1824 | |
1825 | |
1826 | static int rd_kafka_anyconf_set (int scope, void *conf, |
1827 | const char *name, const char *value, |
1828 | char *errstr, size_t errstr_size) { |
1829 | char estmp[1]; |
1830 | const struct rd_kafka_property *prop; |
1831 | rd_kafka_conf_res_t res; |
1832 | |
1833 | if (!errstr) { |
1834 | errstr = estmp; |
1835 | errstr_size = 0; |
1836 | } |
1837 | |
1838 | if (value && !*value) |
1839 | value = NULL; |
1840 | |
1841 | /* Try interceptors first (only for GLOBAL config for now) */ |
1842 | if (scope & _RK_GLOBAL) { |
1843 | res = rd_kafka_interceptors_on_conf_set( |
1844 | (rd_kafka_conf_t *)conf, name, value, |
1845 | errstr, errstr_size); |
1846 | /* Handled (successfully or not) by interceptor. */ |
1847 | if (res != RD_KAFKA_CONF_UNKNOWN) |
1848 | return res; |
1849 | } |
1850 | |
1851 | /* Then global config */ |
1852 | |
1853 | |
1854 | for (prop = rd_kafka_properties ; prop->name ; prop++) { |
1855 | |
1856 | if (!(prop->scope & scope)) |
1857 | continue; |
1858 | |
1859 | if (strcmp(prop->name, name)) |
1860 | continue; |
1861 | |
1862 | if (prop->type == _RK_C_ALIAS) |
1863 | return rd_kafka_anyconf_set(scope, conf, |
1864 | prop->sdef, value, |
1865 | errstr, errstr_size); |
1866 | |
1867 | return rd_kafka_anyconf_set_prop(scope, conf, prop, value, |
1868 | 0/*don't allow specifics*/, |
1869 | errstr, errstr_size); |
1870 | } |
1871 | |
1872 | rd_snprintf(errstr, errstr_size, |
1873 | "No such configuration property: \"%s\"" , name); |
1874 | |
1875 | return RD_KAFKA_CONF_UNKNOWN; |
1876 | } |
1877 | |
1878 | |
/**
 * @brief Set a rd_kafka_*_conf_set_...() specific property, such as
 *        rd_kafka_conf_set_error_cb().
 *
 * Looks up \p NAME in \p SCOPE and stores \p VALUE (passed as an untyped
 * pointer) through rd_kafka_anyconf_set_prop() with allow_specific enabled
 * and no error string buffer.
 *
 * @warning Asserts if \p name is not known or value is incorrect.
 *
 * @remark The macro arguments are parenthesized in the expansion so that
 *         expression arguments are evaluated as a unit. \p SCOPE is
 *         evaluated twice.
 *
 * Implemented as a macro to have rd_assert() print the original function.
 */

#define rd_kafka_anyconf_set_internal(SCOPE,CONF,NAME,VALUE) do {       \
                const struct rd_kafka_property *_prop;                  \
                rd_kafka_conf_res_t _res;                               \
                _prop = rd_kafka_conf_prop_find((SCOPE), (NAME));       \
                rd_assert(_prop && *"invalid property name");           \
                _res = rd_kafka_anyconf_set_prop((SCOPE), (CONF), _prop, \
                                                 (const void *)(VALUE), \
                                                 1/*allow-specifics*/,  \
                                                 NULL, 0);              \
                rd_assert(_res == RD_KAFKA_CONF_OK);                    \
        } while (0)
1900 | |
1901 | |
1902 | rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf, |
1903 | const char *name, |
1904 | const char *value, |
1905 | char *errstr, size_t errstr_size) { |
1906 | rd_kafka_conf_res_t res; |
1907 | |
1908 | res = rd_kafka_anyconf_set(_RK_GLOBAL, conf, name, value, |
1909 | errstr, errstr_size); |
1910 | if (res != RD_KAFKA_CONF_UNKNOWN) |
1911 | return res; |
1912 | |
1913 | /* Fallthru: |
1914 | * If the global property was unknown, try setting it on the |
1915 | * default topic config. */ |
1916 | if (!conf->topic_conf) { |
1917 | /* Create topic config, might be over-written by application |
1918 | * later. */ |
1919 | rd_kafka_conf_set_default_topic_conf(conf, |
1920 | rd_kafka_topic_conf_new()); |
1921 | } |
1922 | |
1923 | return rd_kafka_topic_conf_set(conf->topic_conf, name, value, |
1924 | errstr, errstr_size); |
1925 | } |
1926 | |
1927 | |
1928 | rd_kafka_conf_res_t rd_kafka_topic_conf_set (rd_kafka_topic_conf_t *conf, |
1929 | const char *name, |
1930 | const char *value, |
1931 | char *errstr, size_t errstr_size) { |
1932 | if (!strncmp(name, "topic." , strlen("topic." ))) |
1933 | name += strlen("topic." ); |
1934 | |
1935 | return rd_kafka_anyconf_set(_RK_TOPIC, conf, name, value, |
1936 | errstr, errstr_size); |
1937 | } |
1938 | |
1939 | |
/**
 * @brief Wipe the characters of \p str (everything before the nul-term)
 *        by overwriting them with zero bytes.
 *
 * If the original string was longer than the size of the marker
 * "(REDACTED)" including its terminator, the marker is then written to
 * the start of the buffer.
 */
void rd_kafka_desensitize_str (char *str) {
        static const char redacted[] = "(REDACTED)";
        size_t len;

#ifdef _MSC_VER
        len = strlen(str);
        SecureZeroMemory(str, len);
#else
        /* Write through a volatile pointer, one byte at a time. */
        volatile char *volatile cursor = str;

        while (*cursor) {
                *cursor = '\0';
                cursor++;
        }

        len = (size_t)(cursor - str);
#endif

        if (len > sizeof(redacted))
                memcpy(str, redacted, sizeof(redacted));
}
1963 | |
1964 | |
1965 | |
1966 | |
1967 | /** |
1968 | * @brief Overwrite the value of \p prop, if sensitive. |
1969 | */ |
1970 | static RD_INLINE void |
1971 | rd_kafka_anyconf_prop_desensitize (int scope, void *conf, |
1972 | const struct rd_kafka_property *prop) { |
1973 | if (likely(!(prop->scope & _RK_SENSITIVE))) |
1974 | return; |
1975 | |
1976 | switch (prop->type) |
1977 | { |
1978 | case _RK_C_STR: |
1979 | { |
1980 | char **str = _RK_PTR(char **, conf, prop->offset); |
1981 | if (*str) |
1982 | rd_kafka_desensitize_str(*str); |
1983 | break; |
1984 | } |
1985 | |
1986 | default: |
1987 | rd_assert(!*"BUG: Don't know how to desensitize prop type" ); |
1988 | break; |
1989 | } |
1990 | } |
1991 | |
1992 | |
1993 | /** |
1994 | * @brief Desensitize all sensitive properties in \p conf |
1995 | */ |
1996 | static void rd_kafka_anyconf_desensitize (int scope, void *conf) { |
1997 | const struct rd_kafka_property *prop; |
1998 | |
1999 | for (prop = rd_kafka_properties; prop->name ; prop++) { |
2000 | if (!(prop->scope & scope)) |
2001 | continue; |
2002 | |
2003 | rd_kafka_anyconf_prop_desensitize(scope, conf, prop); |
2004 | } |
2005 | } |
2006 | |
2007 | /** |
2008 | * @brief Overwrite the values of sensitive properties |
2009 | */ |
2010 | void rd_kafka_conf_desensitize (rd_kafka_conf_t *conf) { |
2011 | if (conf->topic_conf) |
2012 | rd_kafka_anyconf_desensitize(_RK_TOPIC, |
2013 | conf->topic_conf); |
2014 | rd_kafka_anyconf_desensitize(_RK_GLOBAL, conf); |
2015 | } |
2016 | |
2017 | /** |
2018 | * @brief Overwrite the values of sensitive properties |
2019 | */ |
2020 | void rd_kafka_topic_conf_desensitize (rd_kafka_topic_conf_t *tconf) { |
2021 | rd_kafka_anyconf_desensitize(_RK_TOPIC, tconf); |
2022 | } |
2023 | |
2024 | |
2025 | static void rd_kafka_anyconf_clear (int scope, void *conf, |
2026 | const struct rd_kafka_property *prop) { |
2027 | |
2028 | rd_kafka_anyconf_prop_desensitize(scope, conf, prop); |
2029 | |
2030 | switch (prop->type) |
2031 | { |
2032 | case _RK_C_STR: |
2033 | { |
2034 | char **str = _RK_PTR(char **, conf, prop->offset); |
2035 | |
2036 | if (*str) { |
2037 | if (prop->set) { |
2038 | prop->set(scope, conf, prop->name, NULL, *str, |
2039 | _RK_CONF_PROP_SET_DEL, NULL, 0); |
2040 | /* FALLTHRU */ |
2041 | } |
2042 | rd_free(*str); |
2043 | *str = NULL; |
2044 | } |
2045 | } |
2046 | break; |
2047 | |
2048 | case _RK_C_KSTR: |
2049 | { |
2050 | rd_kafkap_str_t **kstr = _RK_PTR(rd_kafkap_str_t **, conf, |
2051 | prop->offset); |
2052 | if (*kstr) { |
2053 | rd_kafkap_str_destroy(*kstr); |
2054 | *kstr = NULL; |
2055 | } |
2056 | } |
2057 | break; |
2058 | |
2059 | case _RK_C_PATLIST: |
2060 | { |
2061 | rd_kafka_pattern_list_t **plist; |
2062 | plist = _RK_PTR(rd_kafka_pattern_list_t **, conf, prop->offset); |
2063 | if (*plist) { |
2064 | rd_kafka_pattern_list_destroy(*plist); |
2065 | *plist = NULL; |
2066 | } |
2067 | } |
2068 | break; |
2069 | |
2070 | case _RK_C_PTR: |
2071 | if (_RK_PTR(void *, conf, prop->offset) != NULL) { |
2072 | if (!strcmp(prop->name, "default_topic_conf" )) { |
2073 | rd_kafka_topic_conf_t **tconf; |
2074 | |
2075 | tconf = _RK_PTR(rd_kafka_topic_conf_t **, |
2076 | conf, prop->offset); |
2077 | if (*tconf) { |
2078 | rd_kafka_topic_conf_destroy(*tconf); |
2079 | *tconf = NULL; |
2080 | } |
2081 | } |
2082 | } |
2083 | break; |
2084 | |
2085 | default: |
2086 | break; |
2087 | } |
2088 | |
2089 | if (prop->dtor) |
2090 | prop->dtor(scope, conf); |
2091 | |
2092 | } |
2093 | |
2094 | void rd_kafka_anyconf_destroy (int scope, void *conf) { |
2095 | const struct rd_kafka_property *prop; |
2096 | |
2097 | /* Call on_conf_destroy() interceptors */ |
2098 | if (scope == _RK_GLOBAL) |
2099 | rd_kafka_interceptors_on_conf_destroy(conf); |
2100 | |
2101 | for (prop = rd_kafka_properties; prop->name ; prop++) { |
2102 | if (!(prop->scope & scope)) |
2103 | continue; |
2104 | |
2105 | rd_kafka_anyconf_clear(scope, conf, prop); |
2106 | } |
2107 | } |
2108 | |
2109 | |
2110 | void rd_kafka_conf_destroy (rd_kafka_conf_t *conf) { |
2111 | rd_kafka_anyconf_destroy(_RK_GLOBAL, conf); |
2112 | //FIXME: partition_assignors |
2113 | rd_free(conf); |
2114 | } |
2115 | |
2116 | void rd_kafka_topic_conf_destroy (rd_kafka_topic_conf_t *topic_conf) { |
2117 | rd_kafka_anyconf_destroy(_RK_TOPIC, topic_conf); |
2118 | rd_free(topic_conf); |
2119 | } |
2120 | |
2121 | |
2122 | |
2123 | static void rd_kafka_anyconf_copy (int scope, void *dst, const void *src, |
2124 | size_t filter_cnt, const char **filter) { |
2125 | const struct rd_kafka_property *prop; |
2126 | |
2127 | for (prop = rd_kafka_properties ; prop->name ; prop++) { |
2128 | const char *val = NULL; |
2129 | int ival = 0; |
2130 | char *valstr; |
2131 | size_t valsz; |
2132 | size_t fi; |
2133 | size_t nlen; |
2134 | |
2135 | if (!(prop->scope & scope)) |
2136 | continue; |
2137 | |
2138 | if (prop->type == _RK_C_ALIAS || prop->type == _RK_C_INVALID) |
2139 | continue; |
2140 | |
2141 | /* Skip properties that have not been set, |
2142 | * unless it is an internal one which requires |
2143 | * extra logic, such as the interceptors. */ |
2144 | if (!rd_kafka_anyconf_is_modified(src, prop) && |
2145 | prop->type != _RK_C_INTERNAL) |
2146 | continue; |
2147 | |
2148 | /* Apply filter, if any. */ |
2149 | nlen = strlen(prop->name); |
2150 | for (fi = 0 ; fi < filter_cnt ; fi++) { |
2151 | size_t flen = strlen(filter[fi]); |
2152 | if (nlen >= flen && |
2153 | !strncmp(filter[fi], prop->name, flen)) |
2154 | break; |
2155 | } |
2156 | if (fi < filter_cnt) |
2157 | continue; /* Filter matched */ |
2158 | |
2159 | switch (prop->type) |
2160 | { |
2161 | case _RK_C_STR: |
2162 | case _RK_C_PTR: |
2163 | val = *_RK_PTR(const char **, src, prop->offset); |
2164 | |
2165 | if (!strcmp(prop->name, "default_topic_conf" ) && val) |
2166 | val = (void *)rd_kafka_topic_conf_dup( |
2167 | (const rd_kafka_topic_conf_t *) |
2168 | (void *)val); |
2169 | break; |
2170 | case _RK_C_KSTR: |
2171 | { |
2172 | rd_kafkap_str_t **kstr = _RK_PTR(rd_kafkap_str_t **, |
2173 | src, prop->offset); |
2174 | if (*kstr) |
2175 | val = (*kstr)->str; |
2176 | break; |
2177 | } |
2178 | |
2179 | case _RK_C_BOOL: |
2180 | case _RK_C_INT: |
2181 | case _RK_C_S2I: |
2182 | case _RK_C_S2F: |
2183 | ival = *_RK_PTR(const int *, src, prop->offset); |
2184 | |
2185 | /* Get string representation of configuration value. */ |
2186 | valsz = 0; |
2187 | rd_kafka_anyconf_get0(src, prop, NULL, &valsz); |
2188 | valstr = rd_alloca(valsz); |
2189 | rd_kafka_anyconf_get0(src, prop, valstr, &valsz); |
2190 | val = valstr; |
2191 | break; |
2192 | case _RK_C_PATLIST: |
2193 | { |
2194 | const rd_kafka_pattern_list_t **plist; |
2195 | plist = _RK_PTR(const rd_kafka_pattern_list_t **, |
2196 | src, prop->offset); |
2197 | if (*plist) |
2198 | val = (*plist)->rkpl_orig; |
2199 | break; |
2200 | } |
2201 | case _RK_C_INTERNAL: |
2202 | /* Handled by ->copy() below. */ |
2203 | break; |
2204 | default: |
2205 | continue; |
2206 | } |
2207 | |
2208 | if (prop->copy) |
2209 | prop->copy(scope, dst, src, |
2210 | _RK_PTR(void *, dst, prop->offset), |
2211 | _RK_PTR(const void *, src, prop->offset), |
2212 | filter_cnt, filter); |
2213 | |
2214 | rd_kafka_anyconf_set_prop0(scope, dst, prop, val, ival, |
2215 | _RK_CONF_PROP_SET_REPLACE, NULL, 0); |
2216 | } |
2217 | } |
2218 | |
2219 | |
2220 | rd_kafka_conf_t *rd_kafka_conf_dup (const rd_kafka_conf_t *conf) { |
2221 | rd_kafka_conf_t *new = rd_kafka_conf_new(); |
2222 | |
2223 | rd_kafka_interceptors_on_conf_dup(new, conf, 0, NULL); |
2224 | |
2225 | rd_kafka_anyconf_copy(_RK_GLOBAL, new, conf, 0, NULL); |
2226 | |
2227 | return new; |
2228 | } |
2229 | |
2230 | rd_kafka_conf_t *rd_kafka_conf_dup_filter (const rd_kafka_conf_t *conf, |
2231 | size_t filter_cnt, |
2232 | const char **filter) { |
2233 | rd_kafka_conf_t *new = rd_kafka_conf_new(); |
2234 | |
2235 | rd_kafka_interceptors_on_conf_dup(new, conf, filter_cnt, filter); |
2236 | |
2237 | rd_kafka_anyconf_copy(_RK_GLOBAL, new, conf, filter_cnt, filter); |
2238 | |
2239 | return new; |
2240 | } |
2241 | |
2242 | |
2243 | rd_kafka_topic_conf_t *rd_kafka_topic_conf_dup (const rd_kafka_topic_conf_t |
2244 | *conf) { |
2245 | rd_kafka_topic_conf_t *new = rd_kafka_topic_conf_new(); |
2246 | |
2247 | rd_kafka_anyconf_copy(_RK_TOPIC, new, conf, 0, NULL); |
2248 | |
2249 | return new; |
2250 | } |
2251 | |
2252 | rd_kafka_topic_conf_t *rd_kafka_default_topic_conf_dup (rd_kafka_t *rk) { |
2253 | if (rk->rk_conf.topic_conf) |
2254 | return rd_kafka_topic_conf_dup(rk->rk_conf.topic_conf); |
2255 | else |
2256 | return rd_kafka_topic_conf_new(); |
2257 | } |
2258 | |
2259 | void rd_kafka_conf_set_events (rd_kafka_conf_t *conf, int events) { |
2260 | char tmp[32]; |
2261 | rd_snprintf(tmp, sizeof(tmp), "%d" , events); |
2262 | rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "enabled_events" , tmp); |
2263 | } |
2264 | |
2265 | void |
2266 | rd_kafka_conf_set_background_event_cb (rd_kafka_conf_t *conf, |
2267 | void (*event_cb) (rd_kafka_t *rk, |
2268 | rd_kafka_event_t *rkev, |
2269 | void *opaque)) { |
2270 | rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "background_event_cb" , |
2271 | event_cb); |
2272 | } |
2273 | |
2274 | |
2275 | void rd_kafka_conf_set_dr_cb (rd_kafka_conf_t *conf, |
2276 | void (*dr_cb) (rd_kafka_t *rk, |
2277 | void *payload, size_t len, |
2278 | rd_kafka_resp_err_t err, |
2279 | void *opaque, void *msg_opaque)) { |
2280 | rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "dr_cb" , dr_cb); |
2281 | } |
2282 | |
2283 | |
2284 | void rd_kafka_conf_set_dr_msg_cb (rd_kafka_conf_t *conf, |
2285 | void (*dr_msg_cb) (rd_kafka_t *rk, |
2286 | const rd_kafka_message_t * |
2287 | rkmessage, |
2288 | void *opaque)) { |
2289 | rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "dr_msg_cb" , dr_msg_cb); |
2290 | } |
2291 | |
2292 | |
2293 | void rd_kafka_conf_set_consume_cb (rd_kafka_conf_t *conf, |
2294 | void (*consume_cb) (rd_kafka_message_t * |
2295 | rkmessage, |
2296 | void *opaque)) { |
2297 | rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "consume_cb" , |
2298 | consume_cb); |
2299 | } |
2300 | |
2301 | void rd_kafka_conf_set_rebalance_cb ( |
2302 | rd_kafka_conf_t *conf, |
2303 | void (*rebalance_cb) (rd_kafka_t *rk, |
2304 | rd_kafka_resp_err_t err, |
2305 | rd_kafka_topic_partition_list_t *partitions, |
2306 | void *opaque)) { |
2307 | rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "rebalance_cb" , |
2308 | rebalance_cb); |
2309 | } |
2310 | |
2311 | void rd_kafka_conf_set_offset_commit_cb ( |
2312 | rd_kafka_conf_t *conf, |
2313 | void (*offset_commit_cb) (rd_kafka_t *rk, |
2314 | rd_kafka_resp_err_t err, |
2315 | rd_kafka_topic_partition_list_t *offsets, |
2316 | void *opaque)) { |
2317 | rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "offset_commit_cb" , |
2318 | offset_commit_cb); |
2319 | } |
2320 | |
2321 | |
2322 | |
2323 | void rd_kafka_conf_set_error_cb (rd_kafka_conf_t *conf, |
2324 | void (*error_cb) (rd_kafka_t *rk, int err, |
2325 | const char *reason, |
2326 | void *opaque)) { |
2327 | rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "error_cb" , error_cb); |
2328 | } |
2329 | |
2330 | |
2331 | void rd_kafka_conf_set_throttle_cb (rd_kafka_conf_t *conf, |
2332 | void (*throttle_cb) ( |
2333 | rd_kafka_t *rk, |
2334 | const char *broker_name, |
2335 | int32_t broker_id, |
2336 | int throttle_time_ms, |
2337 | void *opaque)) { |
2338 | rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "throttle_cb" , |
2339 | throttle_cb); |
2340 | } |
2341 | |
2342 | |
2343 | void rd_kafka_conf_set_log_cb (rd_kafka_conf_t *conf, |
2344 | void (*log_cb) (const rd_kafka_t *rk, int level, |
2345 | const char *fac, const char *buf)) { |
2346 | rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "log_cb" , log_cb); |
2347 | } |
2348 | |
2349 | |
2350 | void rd_kafka_conf_set_stats_cb (rd_kafka_conf_t *conf, |
2351 | int (*stats_cb) (rd_kafka_t *rk, |
2352 | char *json, |
2353 | size_t json_len, |
2354 | void *opaque)) { |
2355 | rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "stats_cb" , stats_cb); |
2356 | } |
2357 | |
2358 | void rd_kafka_conf_set_oauthbearer_token_refresh_cb(rd_kafka_conf_t *conf, |
2359 | void (*oauthbearer_token_refresh_cb) ( |
2360 | rd_kafka_t *rk, |
2361 | const char *oauthbearer_config, |
2362 | void *opaque)) { |
2363 | #if WITH_SASL_OAUTHBEARER |
2364 | rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, |
2365 | "oauthbearer_token_refresh_cb" , oauthbearer_token_refresh_cb); |
2366 | #endif |
2367 | } |
2368 | |
2369 | void rd_kafka_conf_set_socket_cb (rd_kafka_conf_t *conf, |
2370 | int (*socket_cb) (int domain, int type, |
2371 | int protocol, |
2372 | void *opaque)) { |
2373 | rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "socket_cb" , |
2374 | socket_cb); |
2375 | } |
2376 | |
2377 | void |
2378 | rd_kafka_conf_set_connect_cb (rd_kafka_conf_t *conf, |
2379 | int (*connect_cb) (int sockfd, |
2380 | const struct sockaddr *addr, |
2381 | int addrlen, |
2382 | const char *id, |
2383 | void *opaque)) { |
2384 | rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "connect_cb" , |
2385 | connect_cb); |
2386 | } |
2387 | |
2388 | void |
2389 | rd_kafka_conf_set_closesocket_cb (rd_kafka_conf_t *conf, |
2390 | int (*closesocket_cb) (int sockfd, |
2391 | void *opaque)) { |
2392 | rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "closesocket_cb" , |
2393 | closesocket_cb); |
2394 | } |
2395 | |
2396 | |
2397 | |
2398 | #ifndef _MSC_VER |
2399 | void rd_kafka_conf_set_open_cb (rd_kafka_conf_t *conf, |
2400 | int (*open_cb) (const char *pathname, |
2401 | int flags, mode_t mode, |
2402 | void *opaque)) { |
2403 | rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "open_cb" , open_cb); |
2404 | } |
2405 | #endif |
2406 | |
2407 | |
2408 | rd_kafka_conf_res_t |
2409 | rd_kafka_conf_set_ssl_cert_verify_cb ( |
2410 | rd_kafka_conf_t *conf, |
2411 | int (*ssl_cert_verify_cb) (rd_kafka_t *rk, |
2412 | const char *broker_name, |
2413 | int32_t broker_id, |
2414 | int *x509_set_error, |
2415 | int depth, |
2416 | const char *buf, size_t size, |
2417 | char *errstr, size_t errstr_size, |
2418 | void *opaque)) { |
2419 | #if !WITH_SSL |
2420 | return RD_KAFKA_CONF_INVALID; |
2421 | #else |
2422 | rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, |
2423 | "ssl.certificate.verify_cb" , |
2424 | ssl_cert_verify_cb); |
2425 | return RD_KAFKA_CONF_OK; |
2426 | #endif |
2427 | } |
2428 | |
2429 | |
2430 | void rd_kafka_conf_set_opaque (rd_kafka_conf_t *conf, void *opaque) { |
2431 | rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "opaque" , opaque); |
2432 | } |
2433 | |
2434 | |
2435 | void rd_kafka_conf_set_default_topic_conf (rd_kafka_conf_t *conf, |
2436 | rd_kafka_topic_conf_t *tconf) { |
2437 | if (conf->topic_conf) |
2438 | rd_kafka_topic_conf_destroy(conf->topic_conf); |
2439 | |
2440 | rd_kafka_anyconf_set_internal(_RK_GLOBAL, conf, "default_topic_conf" , |
2441 | tconf); |
2442 | } |
2443 | |
2444 | |
2445 | void |
2446 | rd_kafka_topic_conf_set_partitioner_cb (rd_kafka_topic_conf_t *topic_conf, |
2447 | int32_t (*partitioner) ( |
2448 | const rd_kafka_topic_t *rkt, |
2449 | const void *keydata, |
2450 | size_t keylen, |
2451 | int32_t partition_cnt, |
2452 | void *rkt_opaque, |
2453 | void *msg_opaque)) { |
2454 | rd_kafka_anyconf_set_internal(_RK_TOPIC, topic_conf, "partitioner_cb" , |
2455 | partitioner); |
2456 | } |
2457 | |
2458 | void |
2459 | rd_kafka_topic_conf_set_msg_order_cmp (rd_kafka_topic_conf_t *topic_conf, |
2460 | int (*msg_order_cmp) ( |
2461 | const rd_kafka_message_t *a, |
2462 | const rd_kafka_message_t *b)) { |
2463 | rd_kafka_anyconf_set_internal(_RK_TOPIC, topic_conf, "msg_order_cmp" , |
2464 | msg_order_cmp); |
2465 | } |
2466 | |
2467 | void rd_kafka_topic_conf_set_opaque (rd_kafka_topic_conf_t *topic_conf, |
2468 | void *opaque) { |
2469 | rd_kafka_anyconf_set_internal(_RK_TOPIC, topic_conf, "opaque" , opaque); |
2470 | } |
2471 | |
2472 | |
2473 | |
2474 | |
2475 | /** |
2476 | * @brief Convert flags \p ival to csv-string using S2F property \p prop. |
2477 | * |
2478 | * This function has two modes: size query and write. |
2479 | * To query for needed size call with dest==NULL, |
2480 | * to write to buffer of size dest_size call with dest!=NULL. |
2481 | * |
2482 | * An \p ival of -1 means all. |
2483 | * |
2484 | * @returns the number of bytes written to \p dest (if not NULL), else the |
2485 | * total number of bytes needed. |
2486 | * |
2487 | */ |
2488 | size_t rd_kafka_conf_flags2str (char *dest, size_t dest_size, const char *delim, |
2489 | const struct rd_kafka_property *prop, |
2490 | int ival) { |
2491 | size_t of = 0; |
2492 | int j; |
2493 | |
2494 | if (dest && dest_size > 0) |
2495 | *dest = '\0'; |
2496 | |
2497 | /* Phase 1: scan for set flags, accumulate needed size. |
2498 | * Phase 2: write to dest */ |
2499 | for (j = 0 ; prop->s2i[j].str ; j++) { |
2500 | if (prop->type == _RK_C_S2F && ival != -1 && |
2501 | (ival & prop->s2i[j].val) != prop->s2i[j].val) |
2502 | continue; |
2503 | else if (prop->type == _RK_C_S2I && |
2504 | ival != -1 && prop->s2i[j].val != ival) |
2505 | continue; |
2506 | |
2507 | if (!dest) |
2508 | of += strlen(prop->s2i[j].str) + (of > 0 ? 1 : 0); |
2509 | else { |
2510 | size_t r; |
2511 | r = rd_snprintf(dest+of, dest_size-of, |
2512 | "%s%s" , |
2513 | of > 0 ? delim:"" , |
2514 | prop->s2i[j].str); |
2515 | if (r > dest_size-of) { |
2516 | r = dest_size-of; |
2517 | break; |
2518 | } |
2519 | of += r; |
2520 | } |
2521 | } |
2522 | |
2523 | return of+1/*nul*/; |
2524 | } |
2525 | |
2526 | |
2527 | /** |
2528 | * Return "original"(re-created) configuration value string |
2529 | */ |
2530 | static rd_kafka_conf_res_t |
2531 | rd_kafka_anyconf_get0 (const void *conf, const struct rd_kafka_property *prop, |
2532 | char *dest, size_t *dest_size) { |
2533 | char tmp[22]; |
2534 | const char *val = NULL; |
2535 | size_t val_len = 0; |
2536 | int j; |
2537 | |
2538 | switch (prop->type) |
2539 | { |
2540 | case _RK_C_STR: |
2541 | val = *_RK_PTR(const char **, conf, prop->offset); |
2542 | break; |
2543 | |
2544 | case _RK_C_KSTR: |
2545 | { |
2546 | const rd_kafkap_str_t **kstr = _RK_PTR(const rd_kafkap_str_t **, |
2547 | conf, prop->offset); |
2548 | if (*kstr) |
2549 | val = (*kstr)->str; |
2550 | break; |
2551 | } |
2552 | |
2553 | case _RK_C_PTR: |
2554 | val = *_RK_PTR(const void **, conf, prop->offset); |
2555 | if (val) { |
2556 | rd_snprintf(tmp, sizeof(tmp), "%p" , (void *)val); |
2557 | val = tmp; |
2558 | } |
2559 | break; |
2560 | |
2561 | case _RK_C_BOOL: |
2562 | val = (*_RK_PTR(int *, conf, prop->offset) ? "true" : "false" ); |
2563 | break; |
2564 | |
2565 | case _RK_C_INT: |
2566 | rd_snprintf(tmp, sizeof(tmp), "%i" , |
2567 | *_RK_PTR(int *, conf, prop->offset)); |
2568 | val = tmp; |
2569 | break; |
2570 | |
2571 | case _RK_C_S2I: |
2572 | for (j = 0 ; j < (int)RD_ARRAYSIZE(prop->s2i); j++) { |
2573 | if (prop->s2i[j].val == |
2574 | *_RK_PTR(int *, conf, prop->offset)) { |
2575 | val = prop->s2i[j].str; |
2576 | break; |
2577 | } |
2578 | } |
2579 | break; |
2580 | |
2581 | case _RK_C_S2F: |
2582 | { |
2583 | const int ival = *_RK_PTR(const int *, conf, prop->offset); |
2584 | |
2585 | val_len = rd_kafka_conf_flags2str(dest, |
2586 | dest ? *dest_size : 0, "," , |
2587 | prop, ival); |
2588 | if (dest) { |
2589 | val_len = 0; |
2590 | val = dest; |
2591 | dest = NULL; |
2592 | } |
2593 | break; |
2594 | } |
2595 | |
2596 | case _RK_C_PATLIST: |
2597 | { |
2598 | const rd_kafka_pattern_list_t **plist; |
2599 | plist = _RK_PTR(const rd_kafka_pattern_list_t **, |
2600 | conf, prop->offset); |
2601 | if (*plist) |
2602 | val = (*plist)->rkpl_orig; |
2603 | break; |
2604 | } |
2605 | |
2606 | default: |
2607 | break; |
2608 | } |
2609 | |
2610 | if (val_len) { |
2611 | *dest_size = val_len+1; |
2612 | return RD_KAFKA_CONF_OK; |
2613 | } |
2614 | |
2615 | if (!val) |
2616 | return RD_KAFKA_CONF_INVALID; |
2617 | |
2618 | val_len = strlen(val); |
2619 | |
2620 | if (dest) { |
2621 | size_t use_len = RD_MIN(val_len, (*dest_size)-1); |
2622 | memcpy(dest, val, use_len); |
2623 | dest[use_len] = '\0'; |
2624 | } |
2625 | |
2626 | /* Return needed size */ |
2627 | *dest_size = val_len+1; |
2628 | |
2629 | return RD_KAFKA_CONF_OK; |
2630 | } |
2631 | |
2632 | |
2633 | static rd_kafka_conf_res_t rd_kafka_anyconf_get (int scope, const void *conf, |
2634 | const char *name, |
2635 | char *dest, size_t *dest_size){ |
2636 | const struct rd_kafka_property *prop; |
2637 | |
2638 | for (prop = rd_kafka_properties; prop->name ; prop++) { |
2639 | |
2640 | if (!(prop->scope & scope) || strcmp(prop->name, name)) |
2641 | continue; |
2642 | |
2643 | if (prop->type == _RK_C_ALIAS) |
2644 | return rd_kafka_anyconf_get(scope, conf, |
2645 | prop->sdef, |
2646 | dest, dest_size); |
2647 | |
2648 | if (rd_kafka_anyconf_get0(conf, prop, dest, dest_size) == |
2649 | RD_KAFKA_CONF_OK) |
2650 | return RD_KAFKA_CONF_OK; |
2651 | } |
2652 | |
2653 | return RD_KAFKA_CONF_UNKNOWN; |
2654 | } |
2655 | |
2656 | rd_kafka_conf_res_t rd_kafka_topic_conf_get (const rd_kafka_topic_conf_t *conf, |
2657 | const char *name, |
2658 | char *dest, size_t *dest_size) { |
2659 | return rd_kafka_anyconf_get(_RK_TOPIC, conf, name, dest, dest_size); |
2660 | } |
2661 | |
2662 | rd_kafka_conf_res_t rd_kafka_conf_get (const rd_kafka_conf_t *conf, |
2663 | const char *name, |
2664 | char *dest, size_t *dest_size) { |
2665 | rd_kafka_conf_res_t res; |
2666 | res = rd_kafka_anyconf_get(_RK_GLOBAL, conf, name, dest, dest_size); |
2667 | if (res != RD_KAFKA_CONF_UNKNOWN || !conf->topic_conf) |
2668 | return res; |
2669 | |
2670 | /* Fallthru: |
2671 | * If the global property was unknown, try getting it from the |
2672 | * default topic config, if any. */ |
2673 | return rd_kafka_topic_conf_get(conf->topic_conf, name, dest, dest_size); |
2674 | } |
2675 | |
2676 | |
2677 | static const char **rd_kafka_anyconf_dump (int scope, const void *conf, |
2678 | size_t *cntp) { |
2679 | const struct rd_kafka_property *prop; |
2680 | char **arr; |
2681 | int cnt = 0; |
2682 | |
2683 | arr = rd_calloc(sizeof(char *), RD_ARRAYSIZE(rd_kafka_properties)*2); |
2684 | |
2685 | for (prop = rd_kafka_properties; prop->name ; prop++) { |
2686 | char *val = NULL; |
2687 | size_t val_size; |
2688 | |
2689 | if (!(prop->scope & scope)) |
2690 | continue; |
2691 | |
2692 | /* Skip aliases, show original property instead. |
2693 | * Skip invalids. */ |
2694 | if (prop->type == _RK_C_ALIAS || prop->type == _RK_C_INVALID) |
2695 | continue; |
2696 | |
2697 | /* Query value size */ |
2698 | if (rd_kafka_anyconf_get0(conf, prop, NULL, &val_size) != |
2699 | RD_KAFKA_CONF_OK) |
2700 | continue; |
2701 | |
2702 | /* Get value */ |
2703 | val = malloc(val_size); |
2704 | rd_kafka_anyconf_get0(conf, prop, val, &val_size); |
2705 | |
2706 | arr[cnt++] = rd_strdup(prop->name); |
2707 | arr[cnt++] = val; |
2708 | } |
2709 | |
2710 | *cntp = cnt; |
2711 | |
2712 | return (const char **)arr; |
2713 | } |
2714 | |
2715 | |
2716 | const char **rd_kafka_conf_dump (rd_kafka_conf_t *conf, size_t *cntp) { |
2717 | return rd_kafka_anyconf_dump(_RK_GLOBAL, conf, cntp); |
2718 | } |
2719 | |
2720 | const char **rd_kafka_topic_conf_dump (rd_kafka_topic_conf_t *conf, |
2721 | size_t *cntp) { |
2722 | return rd_kafka_anyconf_dump(_RK_TOPIC, conf, cntp); |
2723 | } |
2724 | |
/**
 * @brief Free a configuration dump array.
 *
 * @param arr Array of \p cnt strings.
 * @param cnt Number of elements in \p arr.
 *
 * Every non-NULL element is released with rd_free(), followed by the
 * array itself.
 */
void rd_kafka_conf_dump_free (const char **arr, size_t cnt) {
        char **_arr = (char **)arr;
        size_t i; /* same type as cnt, a narrower index could wrap */

        for (i = 0 ; i < cnt ; i++)
                if (_arr[i])
                        rd_free(_arr[i]);

        rd_free(_arr);
}
2735 | |
2736 | void rd_kafka_conf_properties_show (FILE *fp) { |
2737 | const struct rd_kafka_property *prop0; |
2738 | int last = 0; |
2739 | int j; |
2740 | char tmp[512]; |
2741 | const char *dash80 = "----------------------------------------" |
2742 | "----------------------------------------" ; |
2743 | |
2744 | for (prop0 = rd_kafka_properties; prop0->name ; prop0++) { |
2745 | const char *typeinfo = "" ; |
2746 | const char *importance; |
2747 | const struct rd_kafka_property *prop = prop0; |
2748 | |
2749 | /* Skip hidden properties. */ |
2750 | if (prop->scope & _RK_HIDDEN) |
2751 | continue; |
2752 | |
2753 | /* Skip invalid properties. */ |
2754 | if (prop->type == _RK_C_INVALID) |
2755 | continue; |
2756 | |
2757 | if (!(prop->scope & last)) { |
2758 | fprintf(fp, |
2759 | "%s## %s configuration properties\n\n" , |
2760 | last ? "\n\n" :"" , |
2761 | prop->scope == _RK_GLOBAL ? "Global" : "Topic" ); |
2762 | |
2763 | fprintf(fp, |
2764 | "%-40s | %3s | %-15s | %13s | %-10s | %-25s\n" |
2765 | "%.*s-|-%.*s-|-%.*s-|-%.*s:|-%.*s-| -%.*s\n" , |
2766 | "Property" , "C/P" , "Range" , |
2767 | "Default" , "Importance" , "Description" , |
2768 | 40, dash80, 3, dash80, 15, dash80, |
2769 | 13, dash80, 10, dash80, 25, dash80); |
2770 | |
2771 | last = prop->scope & (_RK_GLOBAL|_RK_TOPIC); |
2772 | |
2773 | } |
2774 | |
2775 | fprintf(fp, "%-40s | " , prop->name); |
2776 | |
2777 | /* For aliases, use the aliased property from here on |
2778 | * so that the alias property shows up with proper |
2779 | * ranges, defaults, etc. */ |
2780 | if (prop->type == _RK_C_ALIAS) { |
2781 | prop = rd_kafka_conf_prop_find(prop->scope, |
2782 | prop->sdef); |
2783 | rd_assert(prop && *"BUG: " |
2784 | "alias points to unknown config property" ); |
2785 | } |
2786 | |
2787 | fprintf(fp, "%3s | " , |
2788 | (!(prop->scope & _RK_PRODUCER) == |
2789 | !(prop->scope & _RK_CONSUMER) ? " * " : |
2790 | ((prop->scope & _RK_PRODUCER) ? " P " : |
2791 | (prop->scope & _RK_CONSUMER) ? " C " : "" ))); |
2792 | |
2793 | switch (prop->type) |
2794 | { |
2795 | case _RK_C_STR: |
2796 | case _RK_C_KSTR: |
2797 | typeinfo = "string" ; |
2798 | case _RK_C_PATLIST: |
2799 | if (prop->type == _RK_C_PATLIST) |
2800 | typeinfo = "pattern list" ; |
2801 | if (prop->s2i[0].str) { |
2802 | rd_kafka_conf_flags2str(tmp, sizeof(tmp), ", " , |
2803 | prop, -1); |
2804 | fprintf(fp, "%-15s | %13s" , |
2805 | tmp, prop->sdef ? prop->sdef : "" ); |
2806 | } else { |
2807 | fprintf(fp, "%-15s | %13s" , |
2808 | "" , prop->sdef ? prop->sdef : "" ); |
2809 | } |
2810 | break; |
2811 | case _RK_C_BOOL: |
2812 | typeinfo = "boolean" ; |
2813 | fprintf(fp, "%-15s | %13s" , "true, false" , |
2814 | prop->vdef ? "true" : "false" ); |
2815 | break; |
2816 | case _RK_C_INT: |
2817 | typeinfo = "integer" ; |
2818 | rd_snprintf(tmp, sizeof(tmp), |
2819 | "%d .. %d" , prop->vmin, prop->vmax); |
2820 | fprintf(fp, "%-15s | %13i" , tmp, prop->vdef); |
2821 | break; |
2822 | case _RK_C_S2I: |
2823 | typeinfo = "enum value" ; |
2824 | rd_kafka_conf_flags2str(tmp, sizeof(tmp), ", " , |
2825 | prop, -1); |
2826 | fprintf(fp, "%-15s | " , tmp); |
2827 | |
2828 | for (j = 0 ; j < (int)RD_ARRAYSIZE(prop->s2i); j++) { |
2829 | if (prop->s2i[j].val == prop->vdef) { |
2830 | fprintf(fp, "%13s" , prop->s2i[j].str); |
2831 | break; |
2832 | } |
2833 | } |
2834 | if (j == RD_ARRAYSIZE(prop->s2i)) |
2835 | fprintf(fp, "%13s" , " " ); |
2836 | break; |
2837 | |
2838 | case _RK_C_S2F: |
2839 | typeinfo = "CSV flags" ; |
2840 | /* Dont duplicate builtin.features value in |
2841 | * both Range and Default */ |
2842 | if (!strcmp(prop->name, "builtin.features" )) |
2843 | *tmp = '\0'; |
2844 | else |
2845 | rd_kafka_conf_flags2str(tmp, sizeof(tmp), ", " , |
2846 | prop, -1); |
2847 | fprintf(fp, "%-15s | " , tmp); |
2848 | rd_kafka_conf_flags2str(tmp, sizeof(tmp), ", " , |
2849 | prop, prop->vdef); |
2850 | fprintf(fp, "%13s" , tmp); |
2851 | |
2852 | break; |
2853 | case _RK_C_PTR: |
2854 | typeinfo = "pointer" ; |
2855 | /* FALLTHRU */ |
2856 | default: |
2857 | fprintf(fp, "%-15s | %-13s" , "" , " " ); |
2858 | break; |
2859 | } |
2860 | |
2861 | if (prop->scope & _RK_HIGH) |
2862 | importance = "high" ; |
2863 | else if (prop->scope & _RK_MED) |
2864 | importance = "medium" ; |
2865 | else |
2866 | importance = "low" ; |
2867 | |
2868 | fprintf(fp, " | %-10s | " , importance); |
2869 | |
2870 | if (prop->scope & _RK_EXPERIMENTAL) |
2871 | fprintf(fp, "**EXPERIMENTAL**: " |
2872 | "subject to change or removal. " ); |
2873 | |
2874 | if (prop->scope & _RK_DEPRECATED) |
2875 | fprintf(fp, "**DEPRECATED** " ); |
2876 | |
2877 | /* If the original property is an alias, prefix the |
2878 | * description saying so. */ |
2879 | if (prop0->type == _RK_C_ALIAS) |
2880 | fprintf(fp, "Alias for `%s`: " , prop0->sdef); |
2881 | |
2882 | fprintf(fp, "%s <br>*Type: %s*\n" , prop->desc, |
2883 | typeinfo); |
2884 | } |
2885 | fprintf(fp, "\n" ); |
2886 | fprintf(fp, "### C/P legend: C = Consumer, P = Producer, * = both\n" ); |
2887 | } |
2888 | |
2889 | |
2890 | |
2891 | |
2892 | /** |
2893 | * @name Configuration value methods |
2894 | * |
2895 | * @remark This generic interface will eventually replace the config property |
2896 | * used above. |
2897 | * @{ |
2898 | */ |
2899 | |
2900 | |
2901 | /** |
2902 | * @brief Set up an INT confval. |
2903 | * |
2904 | * @oaram name Property name, must be a const static string (will not be copied) |
2905 | */ |
2906 | void rd_kafka_confval_init_int (rd_kafka_confval_t *confval, |
2907 | const char *name, |
2908 | int vmin, int vmax, int vdef) { |
2909 | confval->name = name; |
2910 | confval->is_enabled = 1; |
2911 | confval->valuetype = RD_KAFKA_CONFVAL_INT; |
2912 | confval->u.INT.vmin = vmin; |
2913 | confval->u.INT.vmax = vmax; |
2914 | confval->u.INT.vdef = vdef; |
2915 | confval->u.INT.v = vdef; |
2916 | } |
2917 | |
2918 | /** |
2919 | * @brief Set up a PTR confval. |
2920 | * |
2921 | * @oaram name Property name, must be a const static string (will not be copied) |
2922 | */ |
2923 | void rd_kafka_confval_init_ptr (rd_kafka_confval_t *confval, |
2924 | const char *name) { |
2925 | confval->name = name; |
2926 | confval->is_enabled = 1; |
2927 | confval->valuetype = RD_KAFKA_CONFVAL_PTR; |
2928 | confval->u.PTR = NULL; |
2929 | } |
2930 | |
2931 | /** |
2932 | * @brief Set up but disable an intval, attempt to set this confval will fail. |
2933 | * |
2934 | * @oaram name Property name, must be a const static string (will not be copied) |
2935 | */ |
2936 | void rd_kafka_confval_disable (rd_kafka_confval_t *confval, const char *name) { |
2937 | confval->name = name; |
2938 | confval->is_enabled = 0; |
2939 | } |
2940 | |
2941 | /** |
2942 | * @brief Set confval's value to \p valuep, verifying the passed |
2943 | * \p valuetype matches (or can be cast to) \p confval's type. |
2944 | * |
2945 | * @param dispname is the display name for the configuration value and is |
2946 | * included in error strings. |
2947 | * @param valuep is a pointer to the value, or NULL to revert to default. |
2948 | * |
2949 | * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the new value was set, or |
2950 | * RD_KAFKA_RESP_ERR__INVALID_ARG if the value was of incorrect type, |
2951 | * out of range, or otherwise not a valid value. |
2952 | */ |
2953 | rd_kafka_resp_err_t |
2954 | rd_kafka_confval_set_type (rd_kafka_confval_t *confval, |
2955 | rd_kafka_confval_type_t valuetype, |
2956 | const void *valuep, |
2957 | char *errstr, size_t errstr_size) { |
2958 | |
2959 | if (!confval->is_enabled) { |
2960 | rd_snprintf(errstr, errstr_size, |
2961 | "\"%s\" is not supported for this operation" , |
2962 | confval->name); |
2963 | return RD_KAFKA_RESP_ERR__INVALID_ARG; |
2964 | } |
2965 | |
2966 | switch (confval->valuetype) |
2967 | { |
2968 | case RD_KAFKA_CONFVAL_INT: |
2969 | { |
2970 | int v; |
2971 | const char *end; |
2972 | |
2973 | if (!valuep) { |
2974 | /* Revert to default */ |
2975 | confval->u.INT.v = confval->u.INT.vdef; |
2976 | confval->is_set = 0; |
2977 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
2978 | } |
2979 | |
2980 | switch (valuetype) |
2981 | { |
2982 | case RD_KAFKA_CONFVAL_INT: |
2983 | v = *(const int *)valuep; |
2984 | break; |
2985 | case RD_KAFKA_CONFVAL_STR: |
2986 | v = (int)strtol((const char *)valuep, (char **)&end, 0); |
2987 | if (end == (const char *)valuep) { |
2988 | rd_snprintf(errstr, errstr_size, |
2989 | "Invalid value type for \"%s\": " |
2990 | "expecting integer" , |
2991 | confval->name); |
2992 | return RD_KAFKA_RESP_ERR__INVALID_TYPE; |
2993 | } |
2994 | default: |
2995 | rd_snprintf(errstr, errstr_size, |
2996 | "Invalid value type for \"%s\": " |
2997 | "expecting integer" , confval->name); |
2998 | return RD_KAFKA_RESP_ERR__INVALID_ARG; |
2999 | } |
3000 | |
3001 | |
3002 | if ((confval->u.INT.vmin || confval->u.INT.vmax) && |
3003 | (v < confval->u.INT.vmin || v > confval->u.INT.vmax)) { |
3004 | rd_snprintf(errstr, errstr_size, |
3005 | "Invalid value type for \"%s\": " |
3006 | "expecting integer in range %d..%d" , |
3007 | confval->name, |
3008 | confval->u.INT.vmin, |
3009 | confval->u.INT.vmax); |
3010 | return RD_KAFKA_RESP_ERR__INVALID_ARG; |
3011 | } |
3012 | |
3013 | confval->u.INT.v = v; |
3014 | confval->is_set = 1; |
3015 | } |
3016 | break; |
3017 | |
3018 | case RD_KAFKA_CONFVAL_STR: |
3019 | { |
3020 | size_t vlen; |
3021 | const char *v = (const char *)valuep; |
3022 | |
3023 | if (!valuep) { |
3024 | confval->is_set = 0; |
3025 | if (confval->u.STR.vdef) |
3026 | confval->u.STR.v = rd_strdup(confval->u.STR. |
3027 | vdef); |
3028 | else |
3029 | confval->u.STR.v = NULL; |
3030 | } |
3031 | |
3032 | if (valuetype != RD_KAFKA_CONFVAL_STR) { |
3033 | rd_snprintf(errstr, errstr_size, |
3034 | "Invalid value type for \"%s\": " |
3035 | "expecting string" , confval->name); |
3036 | return RD_KAFKA_RESP_ERR__INVALID_ARG; |
3037 | } |
3038 | |
3039 | vlen = strlen(v); |
3040 | if ((confval->u.STR.minlen || confval->u.STR.maxlen) && |
3041 | (vlen < confval->u.STR.minlen || |
3042 | vlen > confval->u.STR.maxlen)) { |
3043 | rd_snprintf(errstr, errstr_size, |
3044 | "Invalid value for \"%s\": " |
3045 | "expecting string with length " |
3046 | "%" PRIusz"..%" PRIusz, |
3047 | confval->name, |
3048 | confval->u.STR.minlen, |
3049 | confval->u.STR.maxlen); |
3050 | return RD_KAFKA_RESP_ERR__INVALID_ARG; |
3051 | } |
3052 | |
3053 | if (confval->u.STR.v) |
3054 | rd_free(confval->u.STR.v); |
3055 | |
3056 | confval->u.STR.v = rd_strdup(v); |
3057 | } |
3058 | break; |
3059 | |
3060 | case RD_KAFKA_CONFVAL_PTR: |
3061 | confval->u.PTR = (void *)valuep; |
3062 | break; |
3063 | |
3064 | default: |
3065 | RD_NOTREACHED(); |
3066 | return RD_KAFKA_RESP_ERR__NOENT; |
3067 | } |
3068 | |
3069 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
3070 | } |
3071 | |
3072 | |
3073 | int rd_kafka_confval_get_int (const rd_kafka_confval_t *confval) { |
3074 | rd_assert(confval->valuetype == RD_KAFKA_CONFVAL_INT); |
3075 | return confval->u.INT.v; |
3076 | } |
3077 | |
3078 | |
3079 | const char *rd_kafka_confval_get_str (const rd_kafka_confval_t *confval) { |
3080 | rd_assert(confval->valuetype == RD_KAFKA_CONFVAL_STR); |
3081 | return confval->u.STR.v; |
3082 | } |
3083 | |
3084 | void *rd_kafka_confval_get_ptr (const rd_kafka_confval_t *confval) { |
3085 | rd_assert(confval->valuetype == RD_KAFKA_CONFVAL_PTR); |
3086 | return confval->u.PTR; |
3087 | } |
3088 | |
3089 | |
3090 | /** |
3091 | * @brief Verify configuration \p conf is |
3092 | * correct/non-conflicting and finalize the configuration |
3093 | * settings for use. |
3094 | * |
3095 | * @returns an error string if configuration is incorrect, else NULL. |
3096 | */ |
3097 | const char *rd_kafka_conf_finalize (rd_kafka_type_t cltype, |
3098 | rd_kafka_conf_t *conf) { |
3099 | |
3100 | /* Verify mandatory configuration */ |
3101 | if (!conf->socket_cb) |
3102 | return "Mandatory config property `socket_cb` not set" ; |
3103 | |
3104 | if (!conf->open_cb) |
3105 | return "Mandatory config property `open_cb` not set" ; |
3106 | |
3107 | #if WITH_SSL |
3108 | if (conf->ssl.keystore_location && !conf->ssl.keystore_password) |
3109 | return "`ssl.keystore.password` is mandatory when " |
3110 | "`ssl.keystore.location` is set" ; |
3111 | if (conf->ssl.ca && conf->ssl.ca_location) |
3112 | return "`ssl.ca.location`, and memory-based " |
3113 | "set_ssl_cert(CERT_CA) are mutually exclusive." ; |
3114 | #endif |
3115 | |
3116 | #if WITH_SASL_OAUTHBEARER |
3117 | if (conf->sasl.enable_oauthbearer_unsecure_jwt && |
3118 | conf->sasl.oauthbearer_token_refresh_cb) |
3119 | return "`enable.sasl.oauthbearer.unsecure.jwt` and " |
3120 | "`oauthbearer_token_refresh_cb` are mutually exclusive" ; |
3121 | #endif |
3122 | |
3123 | if (cltype == RD_KAFKA_CONSUMER) { |
3124 | /* Automatically adjust `fetch.max.bytes` to be >= |
3125 | * `message.max.bytes` unless set by user. */ |
3126 | if (rd_kafka_conf_is_modified(conf, "fetch.max.bytes" )) { |
3127 | if (conf->fetch_max_bytes < conf->max_msg_size) |
3128 | return "`fetch.max.bytes` must be >= " |
3129 | "`message.max.bytes`" ; |
3130 | } else { |
3131 | conf->fetch_max_bytes = RD_MAX(conf->fetch_max_bytes, |
3132 | conf->max_msg_size); |
3133 | } |
3134 | |
3135 | /* Automatically adjust 'receive.message.max.bytes' to |
3136 | * be 512 bytes larger than 'fetch.max.bytes' to have enough |
3137 | * room for protocol framing (including topic name), unless |
3138 | * set by user. */ |
3139 | if (rd_kafka_conf_is_modified(conf, |
3140 | "receive.message.max.bytes" )) { |
3141 | if (conf->fetch_max_bytes + 512 > |
3142 | conf->recv_max_msg_size) |
3143 | return "`receive.message.max.bytes` must be >= " |
3144 | "`fetch.max.bytes` + 512" ; |
3145 | } else { |
3146 | conf->recv_max_msg_size = |
3147 | RD_MAX(conf->recv_max_msg_size, |
3148 | conf->fetch_max_bytes + 512); |
3149 | } |
3150 | |
3151 | if (conf->max_poll_interval_ms < |
3152 | conf->group_session_timeout_ms) |
3153 | return "`max.poll.interval.ms`must be >= " |
3154 | "`session.timeout.ms`" ; |
3155 | |
3156 | /* Simplifies rd_kafka_is_idempotent() which is producer-only */ |
3157 | conf->eos.idempotence = 0; |
3158 | |
3159 | } else if (cltype == RD_KAFKA_PRODUCER) { |
3160 | if (conf->eos.idempotence) { |
3161 | /* Adjust configuration values for idempotent producer*/ |
3162 | |
3163 | if (rd_kafka_conf_is_modified(conf, "max.in.flight" )) { |
3164 | if (conf->max_inflight > |
3165 | RD_KAFKA_IDEMP_MAX_INFLIGHT) |
3166 | return "`max.in.flight` must be " |
3167 | "set <= " |
3168 | RD_KAFKA_IDEMP_MAX_INFLIGHT_STR |
3169 | " when `enable.idempotence` " |
3170 | "is true" ; |
3171 | } else { |
3172 | conf->max_inflight = |
3173 | RD_MIN(conf->max_inflight, |
3174 | RD_KAFKA_IDEMP_MAX_INFLIGHT); |
3175 | } |
3176 | |
3177 | |
3178 | if (rd_kafka_conf_is_modified(conf, "retries" )) { |
3179 | if (conf->max_retries < 1) |
3180 | return "`retries` must be set >= 1 " |
3181 | "when `enable.idempotence` is " |
3182 | "true" ; |
3183 | } else { |
3184 | conf->max_retries = INT32_MAX; |
3185 | } |
3186 | |
3187 | |
3188 | if (rd_kafka_conf_is_modified( |
3189 | conf, |
3190 | "queue.buffering.backpressure.threshold" ) |
3191 | && conf->queue_backpressure_thres > 1) |
3192 | return "`queue.buffering.backpressure.threshold` " |
3193 | "must be set to 1 when " |
3194 | "`enable.idempotence` is true" ; |
3195 | else |
3196 | conf->queue_backpressure_thres = 1; |
3197 | |
3198 | /* acks=all and queuing.strategy are set |
3199 | * in topic_conf_finalize() */ |
3200 | |
3201 | } else { |
3202 | if (conf->eos.gapless && |
3203 | rd_kafka_conf_is_modified( |
3204 | conf, "enable.gapless.guarantee" )) |
3205 | return "`enable.gapless.guarantee` requires " |
3206 | "`enable.idempotence` to be enabled" ; |
3207 | } |
3208 | } |
3209 | |
3210 | |
3211 | if (!rd_kafka_conf_is_modified(conf, "metadata.max.age.ms" ) && |
3212 | conf->metadata_refresh_interval_ms > 0) |
3213 | conf->metadata_max_age_ms = |
3214 | conf->metadata_refresh_interval_ms * 3; |
3215 | |
3216 | if (conf->reconnect_backoff_max_ms < conf->reconnect_backoff_ms) |
3217 | return "`reconnect.backoff.max.ms` must be >= " |
3218 | "`reconnect.max.ms`" ; |
3219 | |
3220 | if (conf->sparse_connections) { |
3221 | /* Set sparse connection random selection interval to |
3222 | * 10 < reconnect.backoff.ms / 2 < 1000. */ |
3223 | conf->sparse_connect_intvl = |
3224 | RD_MAX(11, RD_MIN(conf->reconnect_backoff_ms/2, 1000)); |
3225 | } |
3226 | |
3227 | /* Finalize and verify the default.topic.config */ |
3228 | if (conf->topic_conf) |
3229 | return rd_kafka_topic_conf_finalize(cltype, conf, |
3230 | conf->topic_conf); |
3231 | |
3232 | return NULL; |
3233 | } |
3234 | |
3235 | |
3236 | /** |
3237 | * @brief Verify topic configuration \p tconf is |
3238 | * correct/non-conflicting and finalize the configuration |
3239 | * settings for use. |
3240 | * |
3241 | * @returns an error string if configuration is incorrect, else NULL. |
3242 | */ |
3243 | const char *rd_kafka_topic_conf_finalize (rd_kafka_type_t cltype, |
3244 | const rd_kafka_conf_t *conf, |
3245 | rd_kafka_topic_conf_t *tconf) { |
3246 | |
3247 | if (conf->eos.idempotence) { |
3248 | /* Ensure acks=all */ |
3249 | |
3250 | if (rd_kafka_topic_conf_is_modified(tconf, "acks" )) { |
3251 | if (tconf->required_acks != -1) |
3252 | return "`acks` must be set to `all` when " |
3253 | "`enable.idempotence` is true" ; |
3254 | } else { |
3255 | tconf->required_acks = -1; /* all */ |
3256 | } |
3257 | |
3258 | /* Ensure FIFO queueing */ |
3259 | if (rd_kafka_topic_conf_is_modified(tconf, "queuing.strategy" )) { |
3260 | if (tconf->queuing_strategy != RD_KAFKA_QUEUE_FIFO) |
3261 | return "`queuing.strategy` must be set to " |
3262 | "`fifo` when `enable.idempotence` is " |
3263 | "true" ; |
3264 | } else { |
3265 | tconf->queuing_strategy = RD_KAFKA_QUEUE_FIFO; |
3266 | } |
3267 | } |
3268 | |
3269 | |
3270 | if (cltype == RD_KAFKA_PRODUCER) { |
3271 | if (tconf->message_timeout_ms <= conf->buffering_max_ms) |
3272 | return "`message.timeout.ms` must be greater than " |
3273 | "`linger.ms`" ; |
3274 | } |
3275 | |
3276 | |
3277 | return NULL; |
3278 | } |
3279 | |
3280 | |
3281 | /** |
3282 | * @brief Log warnings for set deprecated or experimental |
3283 | * configuration properties. |
3284 | * @returns the number of warnings logged. |
3285 | */ |
3286 | static int rd_kafka_anyconf_warn_deprecated (rd_kafka_t *rk, |
3287 | rd_kafka_conf_scope_t scope, |
3288 | const void *conf) { |
3289 | const struct rd_kafka_property *prop; |
3290 | const int warn_on = _RK_DEPRECATED|_RK_EXPERIMENTAL; |
3291 | int cnt = 0; |
3292 | |
3293 | |
3294 | for (prop = rd_kafka_properties; prop->name ; prop++) { |
3295 | int match = prop->scope & warn_on; |
3296 | |
3297 | if (likely(!(prop->scope & scope) || !match)) |
3298 | continue; |
3299 | |
3300 | if (likely(!rd_kafka_anyconf_is_modified(conf, prop))) |
3301 | continue; |
3302 | |
3303 | rd_kafka_log(rk, LOG_WARNING, "CONFWARN" , |
3304 | "Configuration property %s is %s%s%s: %s" , |
3305 | prop->name, |
3306 | match & _RK_DEPRECATED ? "deprecated" : "" , |
3307 | match == warn_on ? " and " : "" , |
3308 | match & _RK_EXPERIMENTAL ? "experimental" : "" , |
3309 | prop->desc); |
3310 | cnt++; |
3311 | } |
3312 | |
3313 | return cnt; |
3314 | } |
3315 | |
3316 | |
3317 | /** |
3318 | * @brief Log configuration warnings (deprecated configuration properties, |
3319 | * unrecommended combinations, etc). |
3320 | * |
3321 | * @returns the number of warnings logged. |
3322 | * |
3323 | * @locality any |
3324 | * @locks none |
3325 | */ |
3326 | int rd_kafka_conf_warn (rd_kafka_t *rk) { |
3327 | int cnt = 0; |
3328 | |
3329 | cnt = rd_kafka_anyconf_warn_deprecated(rk, _RK_GLOBAL, &rk->rk_conf); |
3330 | if (rk->rk_conf.topic_conf) |
3331 | cnt += rd_kafka_anyconf_warn_deprecated( |
3332 | rk, _RK_TOPIC, rk->rk_conf.topic_conf); |
3333 | |
3334 | /* Additional warnings */ |
3335 | if (rk->rk_type == RD_KAFKA_CONSUMER) { |
3336 | if (rk->rk_conf.fetch_wait_max_ms + 1000 > |
3337 | rk->rk_conf.socket_timeout_ms) |
3338 | rd_kafka_log(rk, LOG_WARNING, |
3339 | "CONFWARN" , |
3340 | "Configuration property " |
3341 | "`fetch.wait.max.ms` (%d) should be " |
3342 | "set lower than `socket.timeout.ms` (%d) " |
3343 | "by at least 1000ms to avoid blocking " |
3344 | "and timing out sub-sequent requests" , |
3345 | rk->rk_conf.fetch_wait_max_ms, |
3346 | rk->rk_conf.socket_timeout_ms); |
3347 | } |
3348 | |
3349 | return cnt; |
3350 | } |
3351 | |
3352 | |
3353 | const rd_kafka_conf_t *rd_kafka_conf (rd_kafka_t *rk) { |
3354 | return &rk->rk_conf; |
3355 | } |
3356 | |
3357 | |
3358 | /** |
3359 | * @brief Unittests |
3360 | */ |
3361 | int unittest_conf (void) { |
3362 | rd_kafka_conf_t *conf; |
3363 | rd_kafka_topic_conf_t *tconf; |
3364 | rd_kafka_conf_res_t res; |
3365 | char errstr[128]; |
3366 | int iteration; |
3367 | const struct rd_kafka_property *prop; |
3368 | |
3369 | conf = rd_kafka_conf_new(); |
3370 | tconf = rd_kafka_topic_conf_new(); |
3371 | |
3372 | res = rd_kafka_conf_set(conf, "unknown.thing" , "foo" , |
3373 | errstr, sizeof(errstr)); |
3374 | RD_UT_ASSERT(res == RD_KAFKA_CONF_UNKNOWN, "fail" ); |
3375 | RD_UT_ASSERT(*errstr, "fail" ); |
3376 | |
3377 | for (iteration = 0 ; iteration < 5 ; iteration++) { |
3378 | int cnt; |
3379 | |
3380 | |
3381 | /* Iterations: |
3382 | * 0 - Check is_modified |
3383 | * 1 - Set every other config property, read back and verify. |
3384 | * 2 - Check is_modified. |
3385 | * 3 - Set all config properties, read back and verify. |
3386 | * 4 - Check is_modified. */ |
3387 | for (prop = rd_kafka_properties, cnt = 0 ; prop->name ; |
3388 | prop++, cnt++) { |
3389 | const char *val; |
3390 | char tmp[64]; |
3391 | int odd = cnt & 1; |
3392 | int do_set = iteration == 3 || (iteration == 1 && odd); |
3393 | char readval[512]; |
3394 | size_t readlen = sizeof(readval); |
3395 | rd_kafka_conf_res_t res2; |
3396 | rd_bool_t is_modified; |
3397 | int exp_is_modified = iteration >= 3 || |
3398 | (iteration > 0 && (do_set || odd)); |
3399 | |
3400 | /* Avoid some special configs */ |
3401 | if (!strcmp(prop->name, "plugin.library.paths" ) || |
3402 | !strcmp(prop->name, "builtin.features" )) |
3403 | continue; |
3404 | |
3405 | switch (prop->type) |
3406 | { |
3407 | case _RK_C_STR: |
3408 | case _RK_C_KSTR: |
3409 | case _RK_C_PATLIST: |
3410 | if (prop->sdef) |
3411 | val = prop->sdef; |
3412 | else |
3413 | val = "test" ; |
3414 | break; |
3415 | |
3416 | case _RK_C_BOOL: |
3417 | val = "true" ; |
3418 | break; |
3419 | |
3420 | case _RK_C_INT: |
3421 | rd_snprintf(tmp, sizeof(tmp), "%d" , prop->vdef); |
3422 | val = tmp; |
3423 | break; |
3424 | |
3425 | case _RK_C_S2F: |
3426 | case _RK_C_S2I: |
3427 | val = prop->s2i[0].str; |
3428 | break; |
3429 | |
3430 | case _RK_C_PTR: |
3431 | case _RK_C_ALIAS: |
3432 | case _RK_C_INVALID: |
3433 | case _RK_C_INTERNAL: |
3434 | default: |
3435 | continue; |
3436 | } |
3437 | |
3438 | |
3439 | if (prop->scope & _RK_GLOBAL) { |
3440 | if (do_set) |
3441 | res = rd_kafka_conf_set(conf, |
3442 | prop->name, val, |
3443 | errstr, |
3444 | sizeof(errstr)); |
3445 | |
3446 | res2 = rd_kafka_conf_get(conf, |
3447 | prop->name, |
3448 | readval, &readlen); |
3449 | |
3450 | is_modified = rd_kafka_conf_is_modified( |
3451 | conf, prop->name); |
3452 | |
3453 | |
3454 | } else if (prop->scope & _RK_TOPIC) { |
3455 | if (do_set) |
3456 | res = rd_kafka_topic_conf_set( |
3457 | tconf, |
3458 | prop->name, val, |
3459 | errstr, sizeof(errstr)); |
3460 | |
3461 | res2 = rd_kafka_topic_conf_get(tconf, |
3462 | prop->name, |
3463 | readval, |
3464 | &readlen); |
3465 | |
3466 | is_modified = rd_kafka_topic_conf_is_modified( |
3467 | tconf, prop->name); |
3468 | |
3469 | } else { |
3470 | RD_NOTREACHED(); |
3471 | } |
3472 | |
3473 | |
3474 | |
3475 | if (do_set) { |
3476 | RD_UT_ASSERT(res == RD_KAFKA_CONF_OK, |
3477 | "conf_set %s failed: %d: %s" , |
3478 | prop->name, res, errstr); |
3479 | RD_UT_ASSERT(res2 == RD_KAFKA_CONF_OK, |
3480 | "conf_get %s failed: %d" , |
3481 | prop->name, res2); |
3482 | |
3483 | RD_UT_ASSERT(!strcmp(readval, val), |
3484 | "conf_get %s " |
3485 | "returned \"%s\": " |
3486 | "expected \"%s\"" , |
3487 | prop->name, readval, val); |
3488 | |
3489 | RD_UT_ASSERT(is_modified, |
3490 | "Property %s was set but " |
3491 | "is_modified=%d" , |
3492 | prop->name, is_modified); |
3493 | |
3494 | } |
3495 | |
3496 | assert(is_modified == exp_is_modified); |
3497 | RD_UT_ASSERT(is_modified == exp_is_modified, |
3498 | "Property %s is_modified=%d, " |
3499 | "exp_is_modified=%d " |
3500 | "(iter %d, odd %d, do_set %d)" , |
3501 | prop->name, is_modified, |
3502 | exp_is_modified, |
3503 | iteration, odd, do_set); |
3504 | } |
3505 | } |
3506 | |
3507 | /* Set an alias and make sure is_modified() works for it. */ |
3508 | res = rd_kafka_conf_set(conf, "max.in.flight" , "19" , NULL, 0); |
3509 | RD_UT_ASSERT(res == RD_KAFKA_CONF_OK, "%d" , res); |
3510 | |
3511 | RD_UT_ASSERT(rd_kafka_conf_is_modified(conf, "max.in.flight" ) == rd_true, |
3512 | "fail" ); |
3513 | RD_UT_ASSERT(rd_kafka_conf_is_modified( |
3514 | conf, |
3515 | "max.in.flight.requests.per.connection" ) == rd_true, |
3516 | "fail" ); |
3517 | |
3518 | rd_kafka_conf_destroy(conf); |
3519 | rd_kafka_topic_conf_destroy(tconf); |
3520 | |
3521 | RD_UT_PASS(); |
3522 | } |
3523 | |
3524 | /**@}*/ |
3525 | |