1 | /* |
2 | * librdkafka - Apache Kafka C library |
3 | * |
4 | * Copyright (c) 2014-2018 Magnus Edenhill |
5 | * All rights reserved. |
6 | * |
7 | * Redistribution and use in source and binary forms, with or without |
8 | * modification, are permitted provided that the following conditions are met: |
9 | * |
10 | * 1. Redistributions of source code must retain the above copyright notice, |
11 | * this list of conditions and the following disclaimer. |
12 | * 2. Redistributions in binary form must reproduce the above copyright notice, |
13 | * this list of conditions and the following disclaimer in the documentation |
14 | * and/or other materials provided with the distribution. |
15 | * |
16 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
17 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
18 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
19 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
20 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
21 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
22 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
23 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
24 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
25 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
26 | * POSSIBILITY OF SUCH DAMAGE. |
27 | */ |
28 | |
29 | #ifndef _RDKAFKA_CONF_H_ |
30 | #define _RDKAFKA_CONF_H_ |
31 | |
32 | #include "rdlist.h" |
33 | #include "rdkafka_cert.h" |
34 | |
35 | |
36 | /** |
37 | * Forward declarations |
38 | */ |
39 | struct rd_kafka_transport_s; |
40 | |
41 | |
42 | /** |
43 | * MessageSet compression codecs |
44 | */ |
45 | typedef enum { |
46 | RD_KAFKA_COMPRESSION_NONE, |
47 | RD_KAFKA_COMPRESSION_GZIP = RD_KAFKA_MSG_ATTR_GZIP, |
48 | RD_KAFKA_COMPRESSION_SNAPPY = RD_KAFKA_MSG_ATTR_SNAPPY, |
49 | RD_KAFKA_COMPRESSION_LZ4 = RD_KAFKA_MSG_ATTR_LZ4, |
50 | RD_KAFKA_COMPRESSION_ZSTD = RD_KAFKA_MSG_ATTR_ZSTD, |
51 | RD_KAFKA_COMPRESSION_INHERIT, /* Inherit setting from global conf */ |
52 | RD_KAFKA_COMPRESSION_NUM |
53 | } rd_kafka_compression_t; |
54 | |
55 | static RD_INLINE RD_UNUSED const char * |
56 | rd_kafka_compression2str (rd_kafka_compression_t compr) { |
57 | static const char *names[RD_KAFKA_COMPRESSION_NUM] = { |
58 | [RD_KAFKA_COMPRESSION_NONE] = "none" , |
59 | [RD_KAFKA_COMPRESSION_GZIP] = "gzip" , |
60 | [RD_KAFKA_COMPRESSION_SNAPPY] = "snappy" , |
61 | [RD_KAFKA_COMPRESSION_LZ4] = "lz4" , |
62 | [RD_KAFKA_COMPRESSION_ZSTD] = "zstd" , |
63 | [RD_KAFKA_COMPRESSION_INHERIT] = "inherit" |
64 | }; |
65 | return names[compr]; |
66 | } |
67 | |
68 | /** |
69 | * MessageSet compression levels |
70 | */ |
71 | typedef enum { |
72 | RD_KAFKA_COMPLEVEL_DEFAULT = -1, |
73 | RD_KAFKA_COMPLEVEL_MIN = -1, |
74 | RD_KAFKA_COMPLEVEL_GZIP_MAX = 9, |
75 | RD_KAFKA_COMPLEVEL_LZ4_MAX = 12, |
76 | RD_KAFKA_COMPLEVEL_SNAPPY_MAX = 0, |
77 | RD_KAFKA_COMPLEVEL_ZSTD_MAX = 22, |
78 | RD_KAFKA_COMPLEVEL_MAX = 12 |
79 | } rd_kafka_complevel_t; |
80 | |
81 | typedef enum { |
82 | RD_KAFKA_PROTO_PLAINTEXT, |
83 | RD_KAFKA_PROTO_SSL, |
84 | RD_KAFKA_PROTO_SASL_PLAINTEXT, |
85 | RD_KAFKA_PROTO_SASL_SSL, |
86 | RD_KAFKA_PROTO_NUM, |
87 | } rd_kafka_secproto_t; |
88 | |
89 | |
90 | typedef enum { |
91 | RD_KAFKA_CONFIGURED, |
92 | RD_KAFKA_LEARNED, |
93 | RD_KAFKA_INTERNAL, |
94 | RD_KAFKA_LOGICAL |
95 | } rd_kafka_confsource_t; |
96 | |
97 | static RD_INLINE RD_UNUSED |
98 | const char *rd_kafka_confsource2str (rd_kafka_confsource_t source) { |
99 | static const char *names[] = { |
100 | "configured" , |
101 | "learned" , |
102 | "internal" , |
103 | "logical" |
104 | }; |
105 | |
106 | return names[source]; |
107 | } |
108 | |
109 | |
110 | typedef enum { |
111 | _RK_GLOBAL = 0x1, |
112 | _RK_PRODUCER = 0x2, |
113 | _RK_CONSUMER = 0x4, |
114 | _RK_TOPIC = 0x8, |
115 | _RK_CGRP = 0x10, |
116 | _RK_DEPRECATED = 0x20, |
117 | _RK_HIDDEN = 0x40, |
118 | _RK_HIGH = 0x80, /* High Importance */ |
119 | _RK_MED = 0x100, /* Medium Importance */ |
120 | _RK_EXPERIMENTAL = 0x200, /* Experimental (unsupported) property */ |
121 | _RK_SENSITIVE = 0x400 /* The configuration property's value |
122 | * might contain sensitive information. */ |
123 | } rd_kafka_conf_scope_t; |
124 | |
125 | /**< While the client groups is a generic concept, it is currently |
126 | * only implemented for consumers in librdkafka. */ |
127 | #define _RK_CGRP _RK_CONSUMER |
128 | |
129 | typedef enum { |
130 | _RK_CONF_PROP_SET_REPLACE, /* Replace current value (default) */ |
131 | _RK_CONF_PROP_SET_ADD, /* Add value (S2F) */ |
132 | _RK_CONF_PROP_SET_DEL /* Remove value (S2F) */ |
133 | } rd_kafka_conf_set_mode_t; |
134 | |
135 | |
136 | |
137 | typedef enum { |
138 | RD_KAFKA_OFFSET_METHOD_NONE, |
139 | RD_KAFKA_OFFSET_METHOD_FILE, |
140 | RD_KAFKA_OFFSET_METHOD_BROKER |
141 | } rd_kafka_offset_method_t; |
142 | |
143 | |
144 | typedef enum { |
145 | RD_KAFKA_SSL_ENDPOINT_ID_NONE, |
146 | RD_KAFKA_SSL_ENDPOINT_ID_HTTPS, /**< RFC2818 */ |
147 | } rd_kafka_ssl_endpoint_id_t; |
148 | |
149 | /* Increase in steps of 64 as needed. */ |
150 | #define RD_KAFKA_CONF_PROPS_IDX_MAX (64*24) |
151 | |
152 | /** |
153 | * @struct rd_kafka_anyconf_t |
154 | * @brief The anyconf header must be the first field in the |
155 | * rd_kafka_conf_t and rd_kafka_topic_conf_t structs. |
156 | * It provides a way to track which property has been modified. |
157 | */ |
158 | struct rd_kafka_anyconf_hdr { |
159 | uint64_t modified[RD_KAFKA_CONF_PROPS_IDX_MAX/64]; |
160 | }; |
161 | |
162 | |
163 | /** |
164 | * Optional configuration struct passed to rd_kafka_new*(). |
165 | * |
166 | * The struct is populated ted through string properties |
167 | * by calling rd_kafka_conf_set(). |
168 | * |
169 | */ |
170 | struct rd_kafka_conf_s { |
171 | struct rd_kafka_anyconf_hdr hdr; /**< Must be first field */ |
172 | |
173 | /* |
174 | * Generic configuration |
175 | */ |
176 | int enabled_events; |
177 | int max_msg_size; |
178 | int msg_copy_max_size; |
179 | int recv_max_msg_size; |
180 | int max_inflight; |
181 | int metadata_request_timeout_ms; |
182 | int metadata_refresh_interval_ms; |
183 | int metadata_refresh_fast_cnt; |
184 | int metadata_refresh_fast_interval_ms; |
185 | int metadata_refresh_sparse; |
186 | int metadata_max_age_ms; |
187 | int debug; |
188 | int broker_addr_ttl; |
189 | int broker_addr_family; |
190 | int socket_timeout_ms; |
191 | int socket_blocking_max_ms; |
192 | int socket_sndbuf_size; |
193 | int socket_rcvbuf_size; |
194 | int socket_keepalive; |
195 | int socket_nagle_disable; |
196 | int socket_max_fails; |
197 | char *client_id_str; |
198 | char *brokerlist; |
199 | int stats_interval_ms; |
200 | int term_sig; |
201 | int reconnect_backoff_ms; |
202 | int reconnect_backoff_max_ms; |
203 | int reconnect_jitter_ms; |
204 | int sparse_connections; |
205 | int sparse_connect_intvl; |
206 | int api_version_request; |
207 | int api_version_request_timeout_ms; |
208 | int api_version_fallback_ms; |
209 | char *broker_version_fallback; |
210 | rd_kafka_secproto_t security_protocol; |
211 | |
212 | #if WITH_SSL |
213 | struct { |
214 | SSL_CTX *ctx; |
215 | char *cipher_suites; |
216 | #if OPENSSL_VERSION_NUMBER >= 0x1000200fL && !defined(LIBRESSL_VERSION_NUMBER) |
217 | char *curves_list; |
218 | char *sigalgs_list; |
219 | #endif |
220 | char *key_location; |
221 | char *key_pem; |
222 | rd_kafka_cert_t *key; |
223 | char *key_password; |
224 | char *cert_location; |
225 | char *cert_pem; |
226 | rd_kafka_cert_t *cert; |
227 | char *ca_location; |
228 | rd_kafka_cert_t *ca; |
229 | char *crl_location; |
230 | char *keystore_location; |
231 | char *keystore_password; |
232 | int endpoint_identification; |
233 | int enable_verify; |
234 | int (*cert_verify_cb) (rd_kafka_t *rk, |
235 | const char *broker_name, |
236 | int32_t broker_id, |
237 | int *x509_error, |
238 | int depth, |
239 | const char *buf, size_t size, |
240 | char *errstr, size_t errstr_size, |
241 | void *opaque); |
242 | } ssl; |
243 | #endif |
244 | |
245 | struct { |
246 | const struct rd_kafka_sasl_provider *provider; |
247 | char *principal; |
248 | char *mechanisms; |
249 | char *service_name; |
250 | char *kinit_cmd; |
251 | char *keytab; |
252 | int relogin_min_time; |
253 | char *username; |
254 | char *password; |
255 | #if WITH_SASL_SCRAM |
256 | /* SCRAM EVP-wrapped hash function |
257 | * (return value from EVP_shaX()) */ |
258 | const void/*EVP_MD*/ *scram_evp; |
259 | /* SCRAM direct hash function (e.g., SHA256()) */ |
260 | unsigned char *(*scram_H) (const unsigned char *d, size_t n, |
261 | unsigned char *md); |
262 | /* Hash size */ |
263 | size_t scram_H_size; |
264 | #endif |
265 | #if WITH_SASL_OAUTHBEARER |
266 | char *oauthbearer_config; |
267 | int enable_oauthbearer_unsecure_jwt; |
268 | |
269 | /* SASL/OAUTHBEARER token refresh event callback */ |
270 | void (*oauthbearer_token_refresh_cb) ( |
271 | rd_kafka_t *rk, |
272 | const char *oauthbearer_config, |
273 | void *opaque); |
274 | #endif |
275 | } sasl; |
276 | |
277 | #if WITH_PLUGINS |
278 | char *plugin_paths; |
279 | rd_list_t plugins; |
280 | #endif |
281 | |
282 | /* Interceptors */ |
283 | struct { |
284 | /* rd_kafka_interceptor_method_t lists */ |
285 | rd_list_t on_conf_set; /* on_conf_set interceptors |
286 | * (not copied on conf_dup()) */ |
287 | rd_list_t on_conf_dup; /* .. (not copied) */ |
288 | rd_list_t on_conf_destroy; /* .. (not copied) */ |
289 | rd_list_t on_new; /* .. (copied) */ |
290 | rd_list_t on_destroy; /* .. (copied) */ |
291 | rd_list_t on_send; /* .. (copied) */ |
292 | rd_list_t on_acknowledgement; /* .. (copied) */ |
293 | rd_list_t on_consume; /* .. (copied) */ |
294 | rd_list_t on_commit; /* .. (copied) */ |
295 | rd_list_t on_request_sent; /* .. (copied) */ |
296 | |
297 | /* rd_strtup_t list */ |
298 | rd_list_t config; /* Configuration name=val's |
299 | * handled by interceptors. */ |
300 | } interceptors; |
301 | |
302 | /* Client group configuration */ |
303 | int coord_query_intvl_ms; |
304 | int max_poll_interval_ms; |
305 | |
306 | int builtin_features; |
307 | /* |
308 | * Consumer configuration |
309 | */ |
310 | int check_crcs; |
311 | int queued_min_msgs; |
312 | int queued_max_msg_kbytes; |
313 | int64_t queued_max_msg_bytes; |
314 | int fetch_wait_max_ms; |
315 | int fetch_msg_max_bytes; |
316 | int fetch_max_bytes; |
317 | int fetch_min_bytes; |
318 | int fetch_error_backoff_ms; |
319 | char *group_id_str; |
320 | |
321 | rd_kafka_pattern_list_t *topic_blacklist; |
322 | struct rd_kafka_topic_conf_s *topic_conf; /* Default topic config |
323 | * for automatically |
324 | * subscribed topics. */ |
325 | int enable_auto_commit; |
326 | int enable_auto_offset_store; |
327 | int auto_commit_interval_ms; |
328 | int group_session_timeout_ms; |
329 | int group_heartbeat_intvl_ms; |
330 | rd_kafkap_str_t *group_protocol_type; |
331 | char *partition_assignment_strategy; |
332 | rd_list_t partition_assignors; |
333 | int enabled_assignor_cnt; |
334 | struct rd_kafka_assignor_s *assignor; |
335 | |
336 | void (*rebalance_cb) (rd_kafka_t *rk, |
337 | rd_kafka_resp_err_t err, |
338 | rd_kafka_topic_partition_list_t *partitions, |
339 | void *opaque); |
340 | |
341 | void (*offset_commit_cb) (rd_kafka_t *rk, |
342 | rd_kafka_resp_err_t err, |
343 | rd_kafka_topic_partition_list_t *offsets, |
344 | void *opaque); |
345 | |
346 | rd_kafka_offset_method_t offset_store_method; |
347 | int enable_partition_eof; |
348 | |
349 | /* |
350 | * Producer configuration |
351 | */ |
352 | struct { |
353 | int idempotence; /**< Enable Idempotent Producer */ |
354 | rd_bool_t gapless; /**< Raise fatal error if |
355 | * gapless guarantee can't be |
356 | * satisfied. */ |
357 | } eos; |
358 | int queue_buffering_max_msgs; |
359 | int queue_buffering_max_kbytes; |
360 | int buffering_max_ms; |
361 | int queue_backpressure_thres; |
362 | int max_retries; |
363 | int retry_backoff_ms; |
364 | int batch_num_messages; |
365 | rd_kafka_compression_t compression_codec; |
366 | int dr_err_only; |
367 | |
368 | /* Message delivery report callback. |
369 | * Called once for each produced message, either on |
370 | * successful and acknowledged delivery to the broker in which |
371 | * case 'err' is 0, or if the message could not be delivered |
372 | * in which case 'err' is non-zero (use rd_kafka_err2str() |
373 | * to obtain a human-readable error reason). |
374 | * |
375 | * If the message was produced with neither RD_KAFKA_MSG_F_FREE |
376 | * or RD_KAFKA_MSG_F_COPY set then 'payload' is the original |
377 | * pointer provided to rd_kafka_produce(). |
378 | * rdkafka will not perform any further actions on 'payload' |
379 | * at this point and the application may rd_free the payload data |
380 | * at this point. |
381 | * |
382 | * 'opaque' is 'conf.opaque', while 'msg_opaque' is |
383 | * the opaque pointer provided in the rd_kafka_produce() call. |
384 | */ |
385 | void (*dr_cb) (rd_kafka_t *rk, |
386 | void *payload, size_t len, |
387 | rd_kafka_resp_err_t err, |
388 | void *opaque, void *msg_opaque); |
389 | |
390 | void (*dr_msg_cb) (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, |
391 | void *opaque); |
392 | |
393 | /* Consume callback */ |
394 | void (*consume_cb) (rd_kafka_message_t *rkmessage, void *opaque); |
395 | |
396 | /* Log callback */ |
397 | void (*log_cb) (const rd_kafka_t *rk, int level, |
398 | const char *fac, const char *buf); |
399 | int log_level; |
400 | int log_queue; |
401 | int log_thread_name; |
402 | int log_connection_close; |
403 | |
404 | /* Error callback */ |
405 | void (*error_cb) (rd_kafka_t *rk, int err, |
406 | const char *reason, void *opaque); |
407 | |
408 | /* Throttle callback */ |
409 | void (*throttle_cb) (rd_kafka_t *rk, const char *broker_name, |
410 | int32_t broker_id, int throttle_time_ms, |
411 | void *opaque); |
412 | |
413 | /* Stats callback */ |
414 | int (*stats_cb) (rd_kafka_t *rk, |
415 | char *json, |
416 | size_t json_len, |
417 | void *opaque); |
418 | |
419 | /* Socket creation callback */ |
420 | int (*socket_cb) (int domain, int type, int protocol, void *opaque); |
421 | |
422 | /* Connect callback */ |
423 | int (*connect_cb) (int sockfd, |
424 | const struct sockaddr *addr, |
425 | int addrlen, |
426 | const char *id, |
427 | void *opaque); |
428 | |
429 | /* Close socket callback */ |
430 | int (*closesocket_cb) (int sockfd, void *opaque); |
431 | |
432 | /* File open callback */ |
433 | int (*open_cb) (const char *pathname, int flags, mode_t mode, |
434 | void *opaque); |
435 | |
436 | /* Background queue event callback */ |
437 | void (*background_event_cb) (rd_kafka_t *rk, rd_kafka_event_t *rkev, |
438 | void *opaque); |
439 | |
440 | |
441 | /* Opaque passed to callbacks. */ |
442 | void *opaque; |
443 | |
444 | /* For use with value-less properties. */ |
445 | int dummy; |
446 | |
447 | |
448 | /* Admin client defaults */ |
449 | struct { |
450 | int request_timeout_ms; /* AdminOptions.request_timeout */ |
451 | } admin; |
452 | |
453 | |
454 | /* |
455 | * Unit test pluggable interfaces |
456 | */ |
457 | struct { |
458 | /**< Inject errors in ProduceResponse handler */ |
459 | rd_kafka_resp_err_t (*handle_ProduceResponse) ( |
460 | rd_kafka_t *rk, |
461 | int32_t brokerid, |
462 | uint64_t msgid, |
463 | rd_kafka_resp_err_t err); |
464 | } ut; |
465 | }; |
466 | |
467 | int rd_kafka_socket_cb_linux (int domain, int type, int protocol, void *opaque); |
468 | int rd_kafka_socket_cb_generic (int domain, int type, int protocol, |
469 | void *opaque); |
470 | #ifndef _MSC_VER |
471 | int rd_kafka_open_cb_linux (const char *pathname, int flags, mode_t mode, |
472 | void *opaque); |
473 | #endif |
474 | int rd_kafka_open_cb_generic (const char *pathname, int flags, mode_t mode, |
475 | void *opaque); |
476 | |
477 | |
478 | |
479 | struct rd_kafka_topic_conf_s { |
480 | struct rd_kafka_anyconf_hdr hdr; /**< Must be first field */ |
481 | |
482 | int required_acks; |
483 | int32_t request_timeout_ms; |
484 | int message_timeout_ms; |
485 | |
486 | int32_t (*partitioner) (const rd_kafka_topic_t *rkt, |
487 | const void *keydata, size_t keylen, |
488 | int32_t partition_cnt, |
489 | void *rkt_opaque, |
490 | void *msg_opaque); |
491 | char *partitioner_str; |
492 | |
493 | int queuing_strategy; /* RD_KAFKA_QUEUE_FIFO|LIFO */ |
494 | int (*msg_order_cmp) (const void *a, const void *b); |
495 | |
496 | rd_kafka_compression_t compression_codec; |
497 | rd_kafka_complevel_t compression_level; |
498 | int produce_offset_report; |
499 | |
500 | int consume_callback_max_msgs; |
501 | int auto_commit; |
502 | int auto_commit_interval_ms; |
503 | int auto_offset_reset; |
504 | char *offset_store_path; |
505 | int offset_store_sync_interval_ms; |
506 | |
507 | rd_kafka_offset_method_t offset_store_method; |
508 | |
509 | /* Application provided opaque pointer (this is rkt_opaque) */ |
510 | void *opaque; |
511 | }; |
512 | |
513 | |
514 | |
515 | void rd_kafka_anyconf_destroy (int scope, void *conf); |
516 | |
517 | void rd_kafka_desensitize_str (char *str); |
518 | |
519 | void rd_kafka_conf_desensitize (rd_kafka_conf_t *conf); |
520 | void rd_kafka_topic_conf_desensitize (rd_kafka_topic_conf_t *tconf); |
521 | |
522 | const char *rd_kafka_conf_finalize (rd_kafka_type_t cltype, |
523 | rd_kafka_conf_t *conf); |
524 | const char *rd_kafka_topic_conf_finalize (rd_kafka_type_t cltype, |
525 | const rd_kafka_conf_t *conf, |
526 | rd_kafka_topic_conf_t *tconf); |
527 | |
528 | |
529 | int rd_kafka_conf_warn (rd_kafka_t *rk); |
530 | |
531 | |
532 | #include "rdkafka_confval.h" |
533 | |
534 | int unittest_conf (void); |
535 | |
536 | #endif /* _RDKAFKA_CONF_H_ */ |
537 | |