1 | /* |
2 | * librdkafka - The Apache Kafka C/C++ library |
3 | * |
4 | * Copyright (c) 2017 Magnus Edenhill |
5 | * All rights reserved. |
6 | * |
7 | * Redistribution and use in source and binary forms, with or without |
8 | * modification, are permitted provided that the following conditions are met: |
9 | * |
10 | * 1. Redistributions of source code must retain the above copyright notice, |
11 | * this list of conditions and the following disclaimer. |
12 | * 2. Redistributions in binary form must reproduce the above copyright notice, |
13 | * this list of conditions and the following disclaimer in the documentation |
14 | * and/or other materials provided with the distribution. |
15 | * |
16 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
17 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
18 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
19 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
20 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
21 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
22 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
23 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
24 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
25 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
26 | * POSSIBILITY OF SUCH DAMAGE. |
27 | */ |
28 | |
29 | #include "rdkafka_int.h" |
30 | #include "rdkafka_interceptor.h" |
31 | #include "rdstring.h" |
32 | |
33 | /** |
34 | * @brief Interceptor methodtion/method reference |
35 | */ |
36 | typedef struct rd_kafka_interceptor_method_s { |
37 | union { |
38 | rd_kafka_interceptor_f_on_conf_set_t *on_conf_set; |
39 | rd_kafka_interceptor_f_on_conf_dup_t *on_conf_dup; |
40 | rd_kafka_interceptor_f_on_conf_destroy_t *on_conf_destroy; |
41 | rd_kafka_interceptor_f_on_new_t *on_new; |
42 | rd_kafka_interceptor_f_on_destroy_t *on_destroy; |
43 | rd_kafka_interceptor_f_on_send_t *on_send; |
44 | rd_kafka_interceptor_f_on_acknowledgement_t *on_acknowledgement; |
45 | rd_kafka_interceptor_f_on_consume_t *on_consume; |
46 | rd_kafka_interceptor_f_on_commit_t *on_commit; |
47 | rd_kafka_interceptor_f_on_request_sent_t *on_request_sent; |
48 | void *generic; /* For easy assignment */ |
49 | |
50 | } u; |
51 | char *ic_name; |
52 | void *ic_opaque; |
53 | } rd_kafka_interceptor_method_t; |
54 | |
55 | /** |
56 | * @brief Destroy interceptor methodtion reference |
57 | */ |
58 | static void |
59 | rd_kafka_interceptor_method_destroy (void *ptr) { |
60 | rd_kafka_interceptor_method_t *method = ptr; |
61 | rd_free(method->ic_name); |
62 | rd_free(method); |
63 | } |
64 | |
65 | |
66 | |
67 | |
68 | |
69 | /** |
70 | * @brief Handle an interceptor on_... methodtion call failures. |
71 | */ |
72 | static RD_INLINE void |
73 | rd_kafka_interceptor_failed (rd_kafka_t *rk, |
74 | const rd_kafka_interceptor_method_t *method, |
75 | const char *method_name, rd_kafka_resp_err_t err, |
76 | const rd_kafka_message_t *rkmessage, |
77 | const char *errstr) { |
78 | |
79 | /* FIXME: Suppress log messages, eventually */ |
80 | if (rkmessage) |
81 | rd_kafka_log(rk, LOG_WARNING, "ICFAIL" , |
82 | "Interceptor %s failed %s for " |
83 | "message on %s [%" PRId32"] @ %" PRId64 |
84 | ": %s%s%s" , |
85 | method->ic_name, method_name, |
86 | rd_kafka_topic_a2i(rkmessage->rkt)->rkt_topic->str, |
87 | rkmessage->partition, |
88 | rkmessage->offset, |
89 | rd_kafka_err2str(err), |
90 | errstr ? ": " : "" , |
91 | errstr ? errstr : "" ); |
92 | else |
93 | rd_kafka_log(rk, LOG_WARNING, "ICFAIL" , |
94 | "Interceptor %s failed %s: %s%s%s" , |
95 | method->ic_name, method_name, |
96 | rd_kafka_err2str(err), |
97 | errstr ? ": " : "" , |
98 | errstr ? errstr : "" ); |
99 | |
100 | } |
101 | |
102 | |
103 | |
104 | /** |
105 | * @brief Create interceptor method reference. |
106 | * Duplicates are rejected |
107 | */ |
108 | static rd_kafka_interceptor_method_t * |
109 | rd_kafka_interceptor_method_new (const char *ic_name, |
110 | void *func, void *ic_opaque) { |
111 | rd_kafka_interceptor_method_t *method; |
112 | |
113 | method = rd_calloc(1, sizeof(*method)); |
114 | method->ic_name = rd_strdup(ic_name); |
115 | method->ic_opaque = ic_opaque; |
116 | method->u.generic = func; |
117 | |
118 | return method; |
119 | } |
120 | |
121 | |
122 | /** |
123 | * @brief Method comparator to be used for finding, not sorting. |
124 | */ |
125 | static int rd_kafka_interceptor_method_cmp (const void *_a, const void *_b) { |
126 | const rd_kafka_interceptor_method_t *a = _a, *b = _b; |
127 | |
128 | if (a->u.generic != b->u.generic) |
129 | return -1; |
130 | |
131 | return strcmp(a->ic_name, b->ic_name); |
132 | } |
133 | |
134 | /** |
135 | * @brief Add interceptor method reference |
136 | */ |
137 | static rd_kafka_resp_err_t |
138 | rd_kafka_interceptor_method_add (rd_list_t *list, const char *ic_name, |
139 | void *func, void *ic_opaque) { |
140 | rd_kafka_interceptor_method_t *method; |
141 | const rd_kafka_interceptor_method_t skel = { |
142 | .ic_name = (char *)ic_name, |
143 | .u = { .generic = func } |
144 | }; |
145 | |
146 | /* Reject same method from same interceptor. |
147 | * This is needed to avoid duplicate interceptors when configuration |
148 | * objects are duplicated. |
149 | * An exception is made for lists with _F_UNIQUE, which is currently |
150 | * only on_conf_destroy() to allow interceptor cleanup. */ |
151 | if ((list->rl_flags & RD_LIST_F_UNIQUE) && |
152 | rd_list_find(list, &skel, rd_kafka_interceptor_method_cmp)) |
153 | return RD_KAFKA_RESP_ERR__CONFLICT; |
154 | |
155 | method = rd_kafka_interceptor_method_new(ic_name, func, ic_opaque); |
156 | rd_list_add(list, method); |
157 | |
158 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
159 | } |
160 | |
161 | /** |
162 | * @brief Destroy all interceptors |
163 | * @locality application thread calling rd_kafka_conf_destroy() or |
164 | * rd_kafka_destroy() |
165 | */ |
166 | void rd_kafka_interceptors_destroy (rd_kafka_conf_t *conf) { |
167 | rd_list_destroy(&conf->interceptors.on_conf_set); |
168 | rd_list_destroy(&conf->interceptors.on_conf_dup); |
169 | rd_list_destroy(&conf->interceptors.on_conf_destroy); |
170 | rd_list_destroy(&conf->interceptors.on_new); |
171 | rd_list_destroy(&conf->interceptors.on_destroy); |
172 | rd_list_destroy(&conf->interceptors.on_send); |
173 | rd_list_destroy(&conf->interceptors.on_acknowledgement); |
174 | rd_list_destroy(&conf->interceptors.on_consume); |
175 | rd_list_destroy(&conf->interceptors.on_commit); |
176 | rd_list_destroy(&conf->interceptors.on_request_sent); |
177 | |
178 | /* Interceptor config */ |
179 | rd_list_destroy(&conf->interceptors.config); |
180 | } |
181 | |
182 | |
183 | /** |
184 | * @brief Initialize interceptor sub-system for config object. |
185 | * @locality application thread |
186 | */ |
187 | static void |
188 | rd_kafka_interceptors_init (rd_kafka_conf_t *conf) { |
189 | rd_list_init(&conf->interceptors.on_conf_set, 0, |
190 | rd_kafka_interceptor_method_destroy) |
191 | ->rl_flags |= RD_LIST_F_UNIQUE; |
192 | rd_list_init(&conf->interceptors.on_conf_dup, 0, |
193 | rd_kafka_interceptor_method_destroy) |
194 | ->rl_flags |= RD_LIST_F_UNIQUE; |
195 | /* conf_destroy() allows duplicates entries. */ |
196 | rd_list_init(&conf->interceptors.on_conf_destroy, 0, |
197 | rd_kafka_interceptor_method_destroy); |
198 | rd_list_init(&conf->interceptors.on_new, 0, |
199 | rd_kafka_interceptor_method_destroy) |
200 | ->rl_flags |= RD_LIST_F_UNIQUE; |
201 | rd_list_init(&conf->interceptors.on_destroy, 0, |
202 | rd_kafka_interceptor_method_destroy) |
203 | ->rl_flags |= RD_LIST_F_UNIQUE; |
204 | rd_list_init(&conf->interceptors.on_send, 0, |
205 | rd_kafka_interceptor_method_destroy) |
206 | ->rl_flags |= RD_LIST_F_UNIQUE; |
207 | rd_list_init(&conf->interceptors.on_acknowledgement, 0, |
208 | rd_kafka_interceptor_method_destroy) |
209 | ->rl_flags |= RD_LIST_F_UNIQUE; |
210 | rd_list_init(&conf->interceptors.on_consume, 0, |
211 | rd_kafka_interceptor_method_destroy) |
212 | ->rl_flags |= RD_LIST_F_UNIQUE; |
213 | rd_list_init(&conf->interceptors.on_commit, 0, |
214 | rd_kafka_interceptor_method_destroy) |
215 | ->rl_flags |= RD_LIST_F_UNIQUE; |
216 | rd_list_init(&conf->interceptors.on_request_sent, 0, |
217 | rd_kafka_interceptor_method_destroy) |
218 | ->rl_flags |= RD_LIST_F_UNIQUE; |
219 | |
220 | /* Interceptor config */ |
221 | rd_list_init(&conf->interceptors.config, 0, |
222 | (void (*)(void *))rd_strtup_destroy); |
223 | } |
224 | |
225 | |
226 | |
227 | |
228 | /** |
229 | * @name Configuration backend |
230 | */ |
231 | |
232 | |
233 | /** |
234 | * @brief Constructor called when configuration object is created. |
235 | */ |
236 | void rd_kafka_conf_interceptor_ctor (int scope, void *pconf) { |
237 | rd_kafka_conf_t *conf = pconf; |
238 | assert(scope == _RK_GLOBAL); |
239 | rd_kafka_interceptors_init(conf); |
240 | } |
241 | |
242 | /** |
243 | * @brief Destructor called when configuration object is destroyed. |
244 | */ |
245 | void rd_kafka_conf_interceptor_dtor (int scope, void *pconf) { |
246 | rd_kafka_conf_t *conf = pconf; |
247 | assert(scope == _RK_GLOBAL); |
248 | rd_kafka_interceptors_destroy(conf); |
249 | } |
250 | |
251 | /** |
252 | * @brief Copy-constructor called when configuration object \p psrcp is |
253 | * duplicated to \p dstp. |
254 | * @remark Interceptors are NOT copied, but interceptor config is. |
255 | * |
256 | */ |
257 | void rd_kafka_conf_interceptor_copy (int scope, void *pdst, const void *psrc, |
258 | void *dstptr, const void *srcptr, |
259 | size_t filter_cnt, const char **filter) { |
260 | rd_kafka_conf_t *dconf = pdst; |
261 | const rd_kafka_conf_t *sconf = psrc; |
262 | int i; |
263 | const rd_strtup_t *confval; |
264 | |
265 | assert(scope == _RK_GLOBAL); |
266 | |
267 | /* Apply interceptor configuration values. |
268 | * on_conf_dup() has already been called for dconf so |
269 | * on_conf_set() interceptors are already in place and we can |
270 | * apply the configuration through the standard conf_set() API. */ |
271 | RD_LIST_FOREACH(confval, &sconf->interceptors.config, i) { |
272 | size_t fi; |
273 | size_t nlen = strlen(confval->name); |
274 | |
275 | /* Apply filter */ |
276 | for (fi = 0 ; fi < filter_cnt ; fi++) { |
277 | size_t flen = strlen(filter[fi]); |
278 | if (nlen >= flen && !strncmp(filter[fi], confval->name, |
279 | flen)) |
280 | break; |
281 | } |
282 | |
283 | if (fi < filter_cnt) |
284 | continue; /* Filter matched: ignore property. */ |
285 | |
286 | /* Ignore errors for now */ |
287 | rd_kafka_conf_set(dconf, confval->name, confval->value, |
288 | NULL, 0); |
289 | } |
290 | } |
291 | |
292 | |
293 | |
294 | |
295 | /** |
296 | * @brief Call interceptor on_conf_set methods. |
297 | * @locality application thread calling rd_kafka_conf_set() and |
298 | * rd_kafka_conf_dup() |
299 | */ |
300 | rd_kafka_conf_res_t |
301 | rd_kafka_interceptors_on_conf_set (rd_kafka_conf_t *conf, |
302 | const char *name, const char *val, |
303 | char *errstr, size_t errstr_size) { |
304 | rd_kafka_interceptor_method_t *method; |
305 | int i; |
306 | |
307 | RD_LIST_FOREACH(method, &conf->interceptors.on_conf_set, i) { |
308 | rd_kafka_conf_res_t res; |
309 | |
310 | res = method->u.on_conf_set(conf, |
311 | name, val, errstr, errstr_size, |
312 | method->ic_opaque); |
313 | if (res == RD_KAFKA_CONF_UNKNOWN) |
314 | continue; |
315 | |
316 | /* Add successfully handled properties to list of |
317 | * interceptor config properties so conf_t objects |
318 | * can be copied. */ |
319 | if (res == RD_KAFKA_CONF_OK) |
320 | rd_list_add(&conf->interceptors.config, |
321 | rd_strtup_new(name, val)); |
322 | return res; |
323 | } |
324 | |
325 | return RD_KAFKA_CONF_UNKNOWN; |
326 | } |
327 | |
328 | /** |
329 | * @brief Call interceptor on_conf_dup methods. |
330 | * @locality application thread calling rd_kafka_conf_dup() |
331 | */ |
332 | void |
333 | rd_kafka_interceptors_on_conf_dup (rd_kafka_conf_t *new_conf, |
334 | const rd_kafka_conf_t *old_conf, |
335 | size_t filter_cnt, const char **filter) { |
336 | rd_kafka_interceptor_method_t *method; |
337 | int i; |
338 | |
339 | RD_LIST_FOREACH(method, &old_conf->interceptors.on_conf_dup, i) { |
340 | /* FIXME: Ignore error for now */ |
341 | method->u.on_conf_dup(new_conf, old_conf, |
342 | filter_cnt, filter, method->ic_opaque); |
343 | } |
344 | } |
345 | |
346 | |
347 | /** |
348 | * @brief Call interceptor on_conf_destroy methods. |
349 | * @locality application thread calling rd_kafka_conf_destroy(), rd_kafka_new(), |
350 | * rd_kafka_destroy() |
351 | */ |
352 | void |
353 | rd_kafka_interceptors_on_conf_destroy (rd_kafka_conf_t *conf) { |
354 | rd_kafka_interceptor_method_t *method; |
355 | int i; |
356 | |
357 | RD_LIST_FOREACH(method, &conf->interceptors.on_conf_destroy, i) { |
358 | /* FIXME: Ignore error for now */ |
359 | method->u.on_conf_destroy(method->ic_opaque); |
360 | } |
361 | } |
362 | |
363 | |
364 | /** |
365 | * @brief Call interceptor on_new methods. |
366 | * @locality application thread calling rd_kafka_new() |
367 | */ |
368 | void |
369 | rd_kafka_interceptors_on_new (rd_kafka_t *rk, const rd_kafka_conf_t *conf) { |
370 | rd_kafka_interceptor_method_t *method; |
371 | int i; |
372 | char errstr[512]; |
373 | |
374 | RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_new, i) { |
375 | rd_kafka_resp_err_t err; |
376 | |
377 | err = method->u.on_new(rk, conf, method->ic_opaque, |
378 | errstr, sizeof(errstr)); |
379 | if (unlikely(err)) |
380 | rd_kafka_interceptor_failed(rk, method, "on_new" , err, |
381 | NULL, errstr); |
382 | } |
383 | } |
384 | |
385 | |
386 | |
387 | /** |
388 | * @brief Call interceptor on_destroy methods. |
389 | * @locality application thread calling rd_kafka_new() or rd_kafka_destroy() |
390 | */ |
391 | void |
392 | rd_kafka_interceptors_on_destroy (rd_kafka_t *rk) { |
393 | rd_kafka_interceptor_method_t *method; |
394 | int i; |
395 | |
396 | RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_destroy, i) { |
397 | rd_kafka_resp_err_t err; |
398 | |
399 | err = method->u.on_destroy(rk, method->ic_opaque); |
400 | if (unlikely(err)) |
401 | rd_kafka_interceptor_failed(rk, method, "on_destroy" , |
402 | err, NULL, NULL); |
403 | } |
404 | } |
405 | |
406 | |
407 | |
408 | /** |
409 | * @brief Call interceptor on_send methods. |
410 | * @locality application thread calling produce() |
411 | */ |
412 | void |
413 | rd_kafka_interceptors_on_send (rd_kafka_t *rk, rd_kafka_message_t *rkmessage) { |
414 | rd_kafka_interceptor_method_t *method; |
415 | int i; |
416 | |
417 | RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_send, i) { |
418 | rd_kafka_resp_err_t err; |
419 | |
420 | err = method->u.on_send(rk, rkmessage, method->ic_opaque); |
421 | if (unlikely(err)) |
422 | rd_kafka_interceptor_failed(rk, method, "on_send" , err, |
423 | rkmessage, NULL); |
424 | } |
425 | } |
426 | |
427 | |
428 | |
429 | /** |
430 | * @brief Call interceptor on_acknowledgement methods. |
431 | * @locality application thread calling poll(), or the broker thread if |
432 | * if dr callback has been set. |
433 | */ |
434 | void |
435 | rd_kafka_interceptors_on_acknowledgement (rd_kafka_t *rk, |
436 | rd_kafka_message_t *rkmessage) { |
437 | rd_kafka_interceptor_method_t *method; |
438 | int i; |
439 | |
440 | RD_LIST_FOREACH(method, |
441 | &rk->rk_conf.interceptors.on_acknowledgement, i) { |
442 | rd_kafka_resp_err_t err; |
443 | |
444 | err = method->u.on_acknowledgement(rk, rkmessage, |
445 | method->ic_opaque); |
446 | if (unlikely(err)) |
447 | rd_kafka_interceptor_failed(rk, method, |
448 | "on_acknowledgement" , err, |
449 | rkmessage, NULL); |
450 | } |
451 | } |
452 | |
453 | |
454 | /** |
455 | * @brief Call on_acknowledgement methods for all messages in queue. |
456 | * |
457 | * @param force_err If non-zero, sets this error on each message. |
458 | * |
459 | * @locality broker thread |
460 | */ |
461 | void |
462 | rd_kafka_interceptors_on_acknowledgement_queue (rd_kafka_t *rk, |
463 | rd_kafka_msgq_t *rkmq, |
464 | rd_kafka_resp_err_t force_err) { |
465 | rd_kafka_msg_t *rkm; |
466 | |
467 | RD_KAFKA_MSGQ_FOREACH(rkm, rkmq) { |
468 | if (force_err) |
469 | rkm->rkm_err = force_err; |
470 | rd_kafka_interceptors_on_acknowledgement(rk, |
471 | &rkm->rkm_rkmessage); |
472 | } |
473 | } |
474 | |
475 | |
476 | /** |
477 | * @brief Call interceptor on_consume methods. |
478 | * @locality application thread calling poll(), consume() or similar prior to |
479 | * passing the message to the application. |
480 | */ |
481 | void |
482 | rd_kafka_interceptors_on_consume (rd_kafka_t *rk, |
483 | rd_kafka_message_t *rkmessage) { |
484 | rd_kafka_interceptor_method_t *method; |
485 | int i; |
486 | |
487 | RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_consume, i) { |
488 | rd_kafka_resp_err_t err; |
489 | |
490 | err = method->u.on_consume(rk, rkmessage, |
491 | method->ic_opaque); |
492 | if (unlikely(err)) |
493 | rd_kafka_interceptor_failed(rk, method, |
494 | "on_consume" , err, |
495 | rkmessage, NULL); |
496 | } |
497 | } |
498 | |
499 | |
500 | /** |
501 | * @brief Call interceptor on_commit methods. |
502 | * @locality application thread calling poll(), consume() or similar, |
503 | * or rdkafka main thread if no commit_cb or handler registered. |
504 | */ |
505 | void |
506 | rd_kafka_interceptors_on_commit (rd_kafka_t *rk, |
507 | const rd_kafka_topic_partition_list_t *offsets, |
508 | rd_kafka_resp_err_t err) { |
509 | rd_kafka_interceptor_method_t *method; |
510 | int i; |
511 | |
512 | RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_commit, i) { |
513 | rd_kafka_resp_err_t ic_err; |
514 | |
515 | ic_err = method->u.on_commit(rk, offsets, err, |
516 | method->ic_opaque); |
517 | if (unlikely(ic_err)) |
518 | rd_kafka_interceptor_failed(rk, method, |
519 | "on_commit" , ic_err, NULL, |
520 | NULL); |
521 | } |
522 | } |
523 | |
524 | |
525 | /** |
526 | * @brief Call interceptor on_request_sent methods |
527 | * @locality internal broker thread |
528 | */ |
529 | void rd_kafka_interceptors_on_request_sent (rd_kafka_t *rk, |
530 | int sockfd, |
531 | const char *brokername, |
532 | int32_t brokerid, |
533 | int16_t ApiKey, |
534 | int16_t ApiVersion, |
535 | int32_t CorrId, |
536 | size_t size) { |
537 | rd_kafka_interceptor_method_t *method; |
538 | int i; |
539 | |
540 | RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_request_sent, i) { |
541 | rd_kafka_resp_err_t ic_err; |
542 | |
543 | ic_err = method->u.on_request_sent(rk, |
544 | sockfd, |
545 | brokername, |
546 | brokerid, |
547 | ApiKey, |
548 | ApiVersion, |
549 | CorrId, |
550 | size, |
551 | method->ic_opaque); |
552 | if (unlikely(ic_err)) |
553 | rd_kafka_interceptor_failed(rk, method, |
554 | "on_request_sent" , |
555 | ic_err, NULL, NULL); |
556 | } |
557 | } |
558 | |
559 | |
560 | /** |
561 | * @name Public API (backend) |
562 | * @{ |
563 | */ |
564 | |
565 | |
566 | rd_kafka_resp_err_t |
567 | rd_kafka_conf_interceptor_add_on_conf_set ( |
568 | rd_kafka_conf_t *conf, const char *ic_name, |
569 | rd_kafka_interceptor_f_on_conf_set_t *on_conf_set, |
570 | void *ic_opaque) { |
571 | return rd_kafka_interceptor_method_add(&conf->interceptors.on_conf_set, |
572 | ic_name, (void *)on_conf_set, |
573 | ic_opaque); |
574 | } |
575 | |
576 | rd_kafka_resp_err_t |
577 | rd_kafka_conf_interceptor_add_on_conf_dup ( |
578 | rd_kafka_conf_t *conf, const char *ic_name, |
579 | rd_kafka_interceptor_f_on_conf_dup_t *on_conf_dup, |
580 | void *ic_opaque) { |
581 | return rd_kafka_interceptor_method_add(&conf->interceptors.on_conf_dup, |
582 | ic_name, (void *)on_conf_dup, |
583 | ic_opaque); |
584 | } |
585 | |
586 | rd_kafka_resp_err_t |
587 | rd_kafka_conf_interceptor_add_on_conf_destroy ( |
588 | rd_kafka_conf_t *conf, const char *ic_name, |
589 | rd_kafka_interceptor_f_on_conf_destroy_t *on_conf_destroy, |
590 | void *ic_opaque) { |
591 | return rd_kafka_interceptor_method_add(&conf->interceptors.on_conf_destroy, |
592 | ic_name, (void *)on_conf_destroy, |
593 | ic_opaque); |
594 | } |
595 | |
596 | |
597 | |
598 | rd_kafka_resp_err_t |
599 | rd_kafka_conf_interceptor_add_on_new ( |
600 | rd_kafka_conf_t *conf, const char *ic_name, |
601 | rd_kafka_interceptor_f_on_new_t *on_new, |
602 | void *ic_opaque) { |
603 | return rd_kafka_interceptor_method_add(&conf->interceptors.on_new, |
604 | ic_name, (void *)on_new, |
605 | ic_opaque); |
606 | } |
607 | |
608 | |
609 | rd_kafka_resp_err_t |
610 | rd_kafka_interceptor_add_on_destroy ( |
611 | rd_kafka_t *rk, const char *ic_name, |
612 | rd_kafka_interceptor_f_on_destroy_t *on_destroy, |
613 | void *ic_opaque) { |
614 | assert(!rk->rk_initialized); |
615 | return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors.on_destroy, |
616 | ic_name, (void *)on_destroy, |
617 | ic_opaque); |
618 | } |
619 | |
620 | rd_kafka_resp_err_t |
621 | rd_kafka_interceptor_add_on_send ( |
622 | rd_kafka_t *rk, const char *ic_name, |
623 | rd_kafka_interceptor_f_on_send_t *on_send, |
624 | void *ic_opaque) { |
625 | assert(!rk->rk_initialized); |
626 | return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors.on_send, |
627 | ic_name, (void *)on_send, |
628 | ic_opaque); |
629 | } |
630 | |
631 | rd_kafka_resp_err_t |
632 | rd_kafka_interceptor_add_on_acknowledgement ( |
633 | rd_kafka_t *rk, const char *ic_name, |
634 | rd_kafka_interceptor_f_on_acknowledgement_t *on_acknowledgement, |
635 | void *ic_opaque) { |
636 | assert(!rk->rk_initialized); |
637 | return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors. |
638 | on_acknowledgement, |
639 | ic_name, |
640 | (void *)on_acknowledgement, |
641 | ic_opaque); |
642 | } |
643 | |
644 | |
645 | rd_kafka_resp_err_t |
646 | rd_kafka_interceptor_add_on_consume ( |
647 | rd_kafka_t *rk, const char *ic_name, |
648 | rd_kafka_interceptor_f_on_consume_t *on_consume, |
649 | void *ic_opaque) { |
650 | assert(!rk->rk_initialized); |
651 | return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors. |
652 | on_consume, |
653 | ic_name, (void *)on_consume, |
654 | ic_opaque); |
655 | } |
656 | |
657 | |
658 | rd_kafka_resp_err_t |
659 | rd_kafka_interceptor_add_on_commit ( |
660 | rd_kafka_t *rk, const char *ic_name, |
661 | rd_kafka_interceptor_f_on_commit_t *on_commit, |
662 | void *ic_opaque) { |
663 | assert(!rk->rk_initialized); |
664 | return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors. |
665 | on_commit, |
666 | ic_name, (void *)on_commit, |
667 | ic_opaque); |
668 | } |
669 | |
670 | |
671 | rd_kafka_resp_err_t |
672 | rd_kafka_interceptor_add_on_request_sent ( |
673 | rd_kafka_t *rk, const char *ic_name, |
674 | rd_kafka_interceptor_f_on_request_sent_t *on_request_sent, |
675 | void *ic_opaque) { |
676 | assert(!rk->rk_initialized); |
677 | return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors. |
678 | on_request_sent, |
679 | ic_name, (void *)on_request_sent, |
680 | ic_opaque); |
681 | } |
682 | |