1/*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2014-2018 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29#ifndef _RDKAFKA_CONF_H_
30#define _RDKAFKA_CONF_H_
31
32#include "rdlist.h"
33#include "rdkafka_cert.h"
34
35
/**
 * Forward declarations
 *
 * Only the struct tag is introduced here; the definition lives outside
 * this header.
 */
struct rd_kafka_transport_s;
40
41
42/**
43 * MessageSet compression codecs
44 */
45typedef enum {
46 RD_KAFKA_COMPRESSION_NONE,
47 RD_KAFKA_COMPRESSION_GZIP = RD_KAFKA_MSG_ATTR_GZIP,
48 RD_KAFKA_COMPRESSION_SNAPPY = RD_KAFKA_MSG_ATTR_SNAPPY,
49 RD_KAFKA_COMPRESSION_LZ4 = RD_KAFKA_MSG_ATTR_LZ4,
50 RD_KAFKA_COMPRESSION_ZSTD = RD_KAFKA_MSG_ATTR_ZSTD,
51 RD_KAFKA_COMPRESSION_INHERIT, /* Inherit setting from global conf */
52 RD_KAFKA_COMPRESSION_NUM
53} rd_kafka_compression_t;
54
55static RD_INLINE RD_UNUSED const char *
56rd_kafka_compression2str (rd_kafka_compression_t compr) {
57 static const char *names[RD_KAFKA_COMPRESSION_NUM] = {
58 [RD_KAFKA_COMPRESSION_NONE] = "none",
59 [RD_KAFKA_COMPRESSION_GZIP] = "gzip",
60 [RD_KAFKA_COMPRESSION_SNAPPY] = "snappy",
61 [RD_KAFKA_COMPRESSION_LZ4] = "lz4",
62 [RD_KAFKA_COMPRESSION_ZSTD] = "zstd",
63 [RD_KAFKA_COMPRESSION_INHERIT] = "inherit"
64 };
65 return names[compr];
66}
67
/**
 * MessageSet compression levels
 *
 * RD_KAFKA_COMPLEVEL_DEFAULT and RD_KAFKA_COMPLEVEL_MIN share the
 * value -1.
 *
 * NOTE(review): RD_KAFKA_COMPLEVEL_MAX (12) is lower than
 *               RD_KAFKA_COMPLEVEL_ZSTD_MAX (22); presumably the generic
 *               maximum bounds the configurable range - confirm against
 *               the property table before changing either value.
 */
typedef enum {
        RD_KAFKA_COMPLEVEL_DEFAULT    = -1,
        RD_KAFKA_COMPLEVEL_MIN        = -1,
        RD_KAFKA_COMPLEVEL_GZIP_MAX   = 9,
        RD_KAFKA_COMPLEVEL_LZ4_MAX    = 12,
        RD_KAFKA_COMPLEVEL_SNAPPY_MAX = 0,
        RD_KAFKA_COMPLEVEL_ZSTD_MAX   = 22,
        RD_KAFKA_COMPLEVEL_MAX        = 12
} rd_kafka_complevel_t;
80
/**
 * Security protocols, this is the type of the
 * rd_kafka_conf_s.security_protocol field.
 */
typedef enum {
        RD_KAFKA_PROTO_PLAINTEXT,
        RD_KAFKA_PROTO_SSL,
        RD_KAFKA_PROTO_SASL_PLAINTEXT,
        RD_KAFKA_PROTO_SASL_SSL,
        RD_KAFKA_PROTO_NUM,  /* Number of protocols above */
} rd_kafka_secproto_t;
88
89
/**
 * Configuration sources.
 *
 * The enumerator order must match the name table in
 * rd_kafka_confsource2str().
 */
typedef enum {
        RD_KAFKA_CONFIGURED,
        RD_KAFKA_LEARNED,
        RD_KAFKA_INTERNAL,
        RD_KAFKA_LOGICAL
} rd_kafka_confsource_t;
96
97static RD_INLINE RD_UNUSED
98const char *rd_kafka_confsource2str (rd_kafka_confsource_t source) {
99 static const char *names[] = {
100 "configured",
101 "learned",
102 "internal",
103 "logical"
104 };
105
106 return names[source];
107}
108
109
/**
 * Configuration property scope and attribute flags.
 *
 * Each enumerator occupies its own bit so that values may be OR:ed
 * together.
 *
 * Note that a _RK_CGRP macro is defined directly after this enum;
 * from that point on the name _RK_CGRP expands to _RK_CONSUMER
 * rather than to the 0x10 enumerator declared here.
 */
typedef enum {
        _RK_GLOBAL       = 0x1,
        _RK_PRODUCER     = 0x2,
        _RK_CONSUMER     = 0x4,
        _RK_TOPIC        = 0x8,
        _RK_CGRP         = 0x10,
        _RK_DEPRECATED   = 0x20,
        _RK_HIDDEN       = 0x40,
        _RK_HIGH         = 0x80,  /* High Importance */
        _RK_MED          = 0x100, /* Medium Importance */
        _RK_EXPERIMENTAL = 0x200, /* Experimental (unsupported) property */
        _RK_SENSITIVE    = 0x400  /* The configuration property's value
                                   * might contain sensitive information. */
} rd_kafka_conf_scope_t;
124
/**
 * While client groups are a generic concept, they are currently
 * only implemented for consumers in librdkafka.
 *
 * This macro makes _RK_CGRP an alias for _RK_CONSUMER for all code
 * following this point.
 */
#define _RK_CGRP _RK_CONSUMER
128
/**
 * Modes for how a configuration property value is applied.
 *
 * NOTE(review): "S2F" presumably refers to string-to-flags type
 *               properties - confirm against the property table.
 */
typedef enum {
        _RK_CONF_PROP_SET_REPLACE,  /* Replace current value (default) */
        _RK_CONF_PROP_SET_ADD,      /* Add value (S2F) */
        _RK_CONF_PROP_SET_DEL       /* Remove value (S2F) */
} rd_kafka_conf_set_mode_t;
134
135
136
/**
 * Offset store methods, this is the type of the offset_store_method
 * field in both rd_kafka_conf_s and rd_kafka_topic_conf_s.
 */
typedef enum {
        RD_KAFKA_OFFSET_METHOD_NONE,
        RD_KAFKA_OFFSET_METHOD_FILE,
        RD_KAFKA_OFFSET_METHOD_BROKER
} rd_kafka_offset_method_t;
142
143
/**
 * SSL endpoint identification methods.
 *
 * NOTE(review): presumably the value set of the
 *               ssl.endpoint_identification field in rd_kafka_conf_s,
 *               which is declared as a plain int - confirm.
 */
typedef enum {
        RD_KAFKA_SSL_ENDPOINT_ID_NONE,
        RD_KAFKA_SSL_ENDPOINT_ID_HTTPS,  /**< RFC2818 */
} rd_kafka_ssl_endpoint_id_t;
148
/* Increase in steps of 64 as needed.
 * The value is divided by 64 to size the modified[] array of 64-bit
 * words in struct rd_kafka_anyconf_hdr, so it must remain a
 * multiple of 64. */
#define RD_KAFKA_CONF_PROPS_IDX_MAX (64*24)

/**
 * @struct rd_kafka_anyconf_hdr
 * @brief The anyconf header must be the first field in the
 *        rd_kafka_conf_t and rd_kafka_topic_conf_t structs.
 *        It provides a way to track which property has been modified.
 */
struct rd_kafka_anyconf_hdr {
        /** RD_KAFKA_CONF_PROPS_IDX_MAX bits of modification tracking */
        uint64_t modified[RD_KAFKA_CONF_PROPS_IDX_MAX/64];
};
161
162
163/**
164 * Optional configuration struct passed to rd_kafka_new*().
165 *
166 * The struct is populated ted through string properties
167 * by calling rd_kafka_conf_set().
168 *
169 */
170struct rd_kafka_conf_s {
171 struct rd_kafka_anyconf_hdr hdr; /**< Must be first field */
172
173 /*
174 * Generic configuration
175 */
176 int enabled_events;
177 int max_msg_size;
178 int msg_copy_max_size;
179 int recv_max_msg_size;
180 int max_inflight;
181 int metadata_request_timeout_ms;
182 int metadata_refresh_interval_ms;
183 int metadata_refresh_fast_cnt;
184 int metadata_refresh_fast_interval_ms;
185 int metadata_refresh_sparse;
186 int metadata_max_age_ms;
187 int debug;
188 int broker_addr_ttl;
189 int broker_addr_family;
190 int socket_timeout_ms;
191 int socket_blocking_max_ms;
192 int socket_sndbuf_size;
193 int socket_rcvbuf_size;
194 int socket_keepalive;
195 int socket_nagle_disable;
196 int socket_max_fails;
197 char *client_id_str;
198 char *brokerlist;
199 int stats_interval_ms;
200 int term_sig;
201 int reconnect_backoff_ms;
202 int reconnect_backoff_max_ms;
203 int reconnect_jitter_ms;
204 int sparse_connections;
205 int sparse_connect_intvl;
206 int api_version_request;
207 int api_version_request_timeout_ms;
208 int api_version_fallback_ms;
209 char *broker_version_fallback;
210 rd_kafka_secproto_t security_protocol;
211
212#if WITH_SSL
213 struct {
214 SSL_CTX *ctx;
215 char *cipher_suites;
216#if OPENSSL_VERSION_NUMBER >= 0x1000200fL && !defined(LIBRESSL_VERSION_NUMBER)
217 char *curves_list;
218 char *sigalgs_list;
219#endif
220 char *key_location;
221 char *key_pem;
222 rd_kafka_cert_t *key;
223 char *key_password;
224 char *cert_location;
225 char *cert_pem;
226 rd_kafka_cert_t *cert;
227 char *ca_location;
228 rd_kafka_cert_t *ca;
229 char *crl_location;
230 char *keystore_location;
231 char *keystore_password;
232 int endpoint_identification;
233 int enable_verify;
234 int (*cert_verify_cb) (rd_kafka_t *rk,
235 const char *broker_name,
236 int32_t broker_id,
237 int *x509_error,
238 int depth,
239 const char *buf, size_t size,
240 char *errstr, size_t errstr_size,
241 void *opaque);
242 } ssl;
243#endif
244
245 struct {
246 const struct rd_kafka_sasl_provider *provider;
247 char *principal;
248 char *mechanisms;
249 char *service_name;
250 char *kinit_cmd;
251 char *keytab;
252 int relogin_min_time;
253 char *username;
254 char *password;
255#if WITH_SASL_SCRAM
256 /* SCRAM EVP-wrapped hash function
257 * (return value from EVP_shaX()) */
258 const void/*EVP_MD*/ *scram_evp;
259 /* SCRAM direct hash function (e.g., SHA256()) */
260 unsigned char *(*scram_H) (const unsigned char *d, size_t n,
261 unsigned char *md);
262 /* Hash size */
263 size_t scram_H_size;
264#endif
265#if WITH_SASL_OAUTHBEARER
266 char *oauthbearer_config;
267 int enable_oauthbearer_unsecure_jwt;
268
269 /* SASL/OAUTHBEARER token refresh event callback */
270 void (*oauthbearer_token_refresh_cb) (
271 rd_kafka_t *rk,
272 const char *oauthbearer_config,
273 void *opaque);
274#endif
275 } sasl;
276
277#if WITH_PLUGINS
278 char *plugin_paths;
279 rd_list_t plugins;
280#endif
281
282 /* Interceptors */
283 struct {
284 /* rd_kafka_interceptor_method_t lists */
285 rd_list_t on_conf_set; /* on_conf_set interceptors
286 * (not copied on conf_dup()) */
287 rd_list_t on_conf_dup; /* .. (not copied) */
288 rd_list_t on_conf_destroy; /* .. (not copied) */
289 rd_list_t on_new; /* .. (copied) */
290 rd_list_t on_destroy; /* .. (copied) */
291 rd_list_t on_send; /* .. (copied) */
292 rd_list_t on_acknowledgement; /* .. (copied) */
293 rd_list_t on_consume; /* .. (copied) */
294 rd_list_t on_commit; /* .. (copied) */
295 rd_list_t on_request_sent; /* .. (copied) */
296
297 /* rd_strtup_t list */
298 rd_list_t config; /* Configuration name=val's
299 * handled by interceptors. */
300 } interceptors;
301
302 /* Client group configuration */
303 int coord_query_intvl_ms;
304 int max_poll_interval_ms;
305
306 int builtin_features;
307 /*
308 * Consumer configuration
309 */
310 int check_crcs;
311 int queued_min_msgs;
312 int queued_max_msg_kbytes;
313 int64_t queued_max_msg_bytes;
314 int fetch_wait_max_ms;
315 int fetch_msg_max_bytes;
316 int fetch_max_bytes;
317 int fetch_min_bytes;
318 int fetch_error_backoff_ms;
319 char *group_id_str;
320
321 rd_kafka_pattern_list_t *topic_blacklist;
322 struct rd_kafka_topic_conf_s *topic_conf; /* Default topic config
323 * for automatically
324 * subscribed topics. */
325 int enable_auto_commit;
326 int enable_auto_offset_store;
327 int auto_commit_interval_ms;
328 int group_session_timeout_ms;
329 int group_heartbeat_intvl_ms;
330 rd_kafkap_str_t *group_protocol_type;
331 char *partition_assignment_strategy;
332 rd_list_t partition_assignors;
333 int enabled_assignor_cnt;
334 struct rd_kafka_assignor_s *assignor;
335
336 void (*rebalance_cb) (rd_kafka_t *rk,
337 rd_kafka_resp_err_t err,
338 rd_kafka_topic_partition_list_t *partitions,
339 void *opaque);
340
341 void (*offset_commit_cb) (rd_kafka_t *rk,
342 rd_kafka_resp_err_t err,
343 rd_kafka_topic_partition_list_t *offsets,
344 void *opaque);
345
346 rd_kafka_offset_method_t offset_store_method;
347 int enable_partition_eof;
348
349 /*
350 * Producer configuration
351 */
352 struct {
353 int idempotence; /**< Enable Idempotent Producer */
354 rd_bool_t gapless; /**< Raise fatal error if
355 * gapless guarantee can't be
356 * satisfied. */
357 } eos;
358 int queue_buffering_max_msgs;
359 int queue_buffering_max_kbytes;
360 int buffering_max_ms;
361 int queue_backpressure_thres;
362 int max_retries;
363 int retry_backoff_ms;
364 int batch_num_messages;
365 rd_kafka_compression_t compression_codec;
366 int dr_err_only;
367
368 /* Message delivery report callback.
369 * Called once for each produced message, either on
370 * successful and acknowledged delivery to the broker in which
371 * case 'err' is 0, or if the message could not be delivered
372 * in which case 'err' is non-zero (use rd_kafka_err2str()
373 * to obtain a human-readable error reason).
374 *
375 * If the message was produced with neither RD_KAFKA_MSG_F_FREE
376 * or RD_KAFKA_MSG_F_COPY set then 'payload' is the original
377 * pointer provided to rd_kafka_produce().
378 * rdkafka will not perform any further actions on 'payload'
379 * at this point and the application may rd_free the payload data
380 * at this point.
381 *
382 * 'opaque' is 'conf.opaque', while 'msg_opaque' is
383 * the opaque pointer provided in the rd_kafka_produce() call.
384 */
385 void (*dr_cb) (rd_kafka_t *rk,
386 void *payload, size_t len,
387 rd_kafka_resp_err_t err,
388 void *opaque, void *msg_opaque);
389
390 void (*dr_msg_cb) (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
391 void *opaque);
392
393 /* Consume callback */
394 void (*consume_cb) (rd_kafka_message_t *rkmessage, void *opaque);
395
396 /* Log callback */
397 void (*log_cb) (const rd_kafka_t *rk, int level,
398 const char *fac, const char *buf);
399 int log_level;
400 int log_queue;
401 int log_thread_name;
402 int log_connection_close;
403
404 /* Error callback */
405 void (*error_cb) (rd_kafka_t *rk, int err,
406 const char *reason, void *opaque);
407
408 /* Throttle callback */
409 void (*throttle_cb) (rd_kafka_t *rk, const char *broker_name,
410 int32_t broker_id, int throttle_time_ms,
411 void *opaque);
412
413 /* Stats callback */
414 int (*stats_cb) (rd_kafka_t *rk,
415 char *json,
416 size_t json_len,
417 void *opaque);
418
419 /* Socket creation callback */
420 int (*socket_cb) (int domain, int type, int protocol, void *opaque);
421
422 /* Connect callback */
423 int (*connect_cb) (int sockfd,
424 const struct sockaddr *addr,
425 int addrlen,
426 const char *id,
427 void *opaque);
428
429 /* Close socket callback */
430 int (*closesocket_cb) (int sockfd, void *opaque);
431
432 /* File open callback */
433 int (*open_cb) (const char *pathname, int flags, mode_t mode,
434 void *opaque);
435
436 /* Background queue event callback */
437 void (*background_event_cb) (rd_kafka_t *rk, rd_kafka_event_t *rkev,
438 void *opaque);
439
440
441 /* Opaque passed to callbacks. */
442 void *opaque;
443
444 /* For use with value-less properties. */
445 int dummy;
446
447
448 /* Admin client defaults */
449 struct {
450 int request_timeout_ms; /* AdminOptions.request_timeout */
451 } admin;
452
453
454 /*
455 * Unit test pluggable interfaces
456 */
457 struct {
458 /**< Inject errors in ProduceResponse handler */
459 rd_kafka_resp_err_t (*handle_ProduceResponse) (
460 rd_kafka_t *rk,
461 int32_t brokerid,
462 uint64_t msgid,
463 rd_kafka_resp_err_t err);
464 } ut;
465};
466
/**
 * Socket creation callback implementations.
 * The signatures match the socket_cb member of rd_kafka_conf_s.
 */
int rd_kafka_socket_cb_linux (int domain, int type, int protocol, void *opaque);
int rd_kafka_socket_cb_generic (int domain, int type, int protocol,
                                void *opaque);

/**
 * File open callback implementations.
 * The signatures match the open_cb member of rd_kafka_conf_s.
 * The _linux variant is not declared when building with MSVC.
 */
#ifndef _MSC_VER
int rd_kafka_open_cb_linux (const char *pathname, int flags, mode_t mode,
                            void *opaque);
#endif
int rd_kafka_open_cb_generic (const char *pathname, int flags, mode_t mode,
                              void *opaque);
476
477
478
479struct rd_kafka_topic_conf_s {
480 struct rd_kafka_anyconf_hdr hdr; /**< Must be first field */
481
482 int required_acks;
483 int32_t request_timeout_ms;
484 int message_timeout_ms;
485
486 int32_t (*partitioner) (const rd_kafka_topic_t *rkt,
487 const void *keydata, size_t keylen,
488 int32_t partition_cnt,
489 void *rkt_opaque,
490 void *msg_opaque);
491 char *partitioner_str;
492
493 int queuing_strategy; /* RD_KAFKA_QUEUE_FIFO|LIFO */
494 int (*msg_order_cmp) (const void *a, const void *b);
495
496 rd_kafka_compression_t compression_codec;
497 rd_kafka_complevel_t compression_level;
498 int produce_offset_report;
499
500 int consume_callback_max_msgs;
501 int auto_commit;
502 int auto_commit_interval_ms;
503 int auto_offset_reset;
504 char *offset_store_path;
505 int offset_store_sync_interval_ms;
506
507 rd_kafka_offset_method_t offset_store_method;
508
509 /* Application provided opaque pointer (this is rkt_opaque) */
510 void *opaque;
511};
512
513
514
515void rd_kafka_anyconf_destroy (int scope, void *conf);
516
517void rd_kafka_desensitize_str (char *str);
518
519void rd_kafka_conf_desensitize (rd_kafka_conf_t *conf);
520void rd_kafka_topic_conf_desensitize (rd_kafka_topic_conf_t *tconf);
521
522const char *rd_kafka_conf_finalize (rd_kafka_type_t cltype,
523 rd_kafka_conf_t *conf);
524const char *rd_kafka_topic_conf_finalize (rd_kafka_type_t cltype,
525 const rd_kafka_conf_t *conf,
526 rd_kafka_topic_conf_t *tconf);
527
528
529int rd_kafka_conf_warn (rd_kafka_t *rk);
530
531
532#include "rdkafka_confval.h"
533
/* Configuration unit tests entry point (implemented outside this header). */
int unittest_conf (void);
535
536#endif /* _RDKAFKA_CONF_H_ */
537