1/*
2 * librdkafka - The Apache Kafka C/C++ library
3 *
4 * Copyright (c) 2017 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29#include "rdkafka_int.h"
30#include "rdkafka_interceptor.h"
31#include "rdstring.h"
32
/**
 * @brief Interceptor function/method reference.
 *
 * Holds one registered interceptor callback together with the name of
 * the interceptor that registered it and the opaque pointer that is
 * handed back to the callback on each invocation.
 */
typedef struct rd_kafka_interceptor_method_s {
        /* Typed views of the same callback pointer, one per interceptor
         * method kind. */
        union {
                rd_kafka_interceptor_f_on_conf_set_t *on_conf_set;
                rd_kafka_interceptor_f_on_conf_dup_t *on_conf_dup;
                rd_kafka_interceptor_f_on_conf_destroy_t *on_conf_destroy;
                rd_kafka_interceptor_f_on_new_t *on_new;
                rd_kafka_interceptor_f_on_destroy_t *on_destroy;
                rd_kafka_interceptor_f_on_send_t *on_send;
                rd_kafka_interceptor_f_on_acknowledgement_t *on_acknowledgement;
                rd_kafka_interceptor_f_on_consume_t *on_consume;
                rd_kafka_interceptor_f_on_commit_t *on_commit;
                rd_kafka_interceptor_f_on_request_sent_t *on_request_sent;
                void *generic; /* For easy assignment */

        } u;
        char *ic_name;   /* Interceptor name (heap-allocated copy, freed
                          * when the reference is destroyed) */
        void *ic_opaque; /* Opaque pointer passed to the callback */
} rd_kafka_interceptor_method_t;
54
55/**
56 * @brief Destroy interceptor methodtion reference
57 */
58static void
59rd_kafka_interceptor_method_destroy (void *ptr) {
60 rd_kafka_interceptor_method_t *method = ptr;
61 rd_free(method->ic_name);
62 rd_free(method);
63}
64
65
66
67
68
69/**
70 * @brief Handle an interceptor on_... methodtion call failures.
71 */
72static RD_INLINE void
73rd_kafka_interceptor_failed (rd_kafka_t *rk,
74 const rd_kafka_interceptor_method_t *method,
75 const char *method_name, rd_kafka_resp_err_t err,
76 const rd_kafka_message_t *rkmessage,
77 const char *errstr) {
78
79 /* FIXME: Suppress log messages, eventually */
80 if (rkmessage)
81 rd_kafka_log(rk, LOG_WARNING, "ICFAIL",
82 "Interceptor %s failed %s for "
83 "message on %s [%"PRId32"] @ %"PRId64
84 ": %s%s%s",
85 method->ic_name, method_name,
86 rd_kafka_topic_a2i(rkmessage->rkt)->rkt_topic->str,
87 rkmessage->partition,
88 rkmessage->offset,
89 rd_kafka_err2str(err),
90 errstr ? ": " : "",
91 errstr ? errstr : "");
92 else
93 rd_kafka_log(rk, LOG_WARNING, "ICFAIL",
94 "Interceptor %s failed %s: %s%s%s",
95 method->ic_name, method_name,
96 rd_kafka_err2str(err),
97 errstr ? ": " : "",
98 errstr ? errstr : "");
99
100}
101
102
103
104/**
105 * @brief Create interceptor method reference.
106 * Duplicates are rejected
107 */
108static rd_kafka_interceptor_method_t *
109rd_kafka_interceptor_method_new (const char *ic_name,
110 void *func, void *ic_opaque) {
111 rd_kafka_interceptor_method_t *method;
112
113 method = rd_calloc(1, sizeof(*method));
114 method->ic_name = rd_strdup(ic_name);
115 method->ic_opaque = ic_opaque;
116 method->u.generic = func;
117
118 return method;
119}
120
121
122/**
123 * @brief Method comparator to be used for finding, not sorting.
124 */
125static int rd_kafka_interceptor_method_cmp (const void *_a, const void *_b) {
126 const rd_kafka_interceptor_method_t *a = _a, *b = _b;
127
128 if (a->u.generic != b->u.generic)
129 return -1;
130
131 return strcmp(a->ic_name, b->ic_name);
132}
133
134/**
135 * @brief Add interceptor method reference
136 */
137static rd_kafka_resp_err_t
138rd_kafka_interceptor_method_add (rd_list_t *list, const char *ic_name,
139 void *func, void *ic_opaque) {
140 rd_kafka_interceptor_method_t *method;
141 const rd_kafka_interceptor_method_t skel = {
142 .ic_name = (char *)ic_name,
143 .u = { .generic = func }
144 };
145
146 /* Reject same method from same interceptor.
147 * This is needed to avoid duplicate interceptors when configuration
148 * objects are duplicated.
149 * An exception is made for lists with _F_UNIQUE, which is currently
150 * only on_conf_destroy() to allow interceptor cleanup. */
151 if ((list->rl_flags & RD_LIST_F_UNIQUE) &&
152 rd_list_find(list, &skel, rd_kafka_interceptor_method_cmp))
153 return RD_KAFKA_RESP_ERR__CONFLICT;
154
155 method = rd_kafka_interceptor_method_new(ic_name, func, ic_opaque);
156 rd_list_add(list, method);
157
158 return RD_KAFKA_RESP_ERR_NO_ERROR;
159}
160
161/**
162 * @brief Destroy all interceptors
163 * @locality application thread calling rd_kafka_conf_destroy() or
164 * rd_kafka_destroy()
165 */
166void rd_kafka_interceptors_destroy (rd_kafka_conf_t *conf) {
167 rd_list_destroy(&conf->interceptors.on_conf_set);
168 rd_list_destroy(&conf->interceptors.on_conf_dup);
169 rd_list_destroy(&conf->interceptors.on_conf_destroy);
170 rd_list_destroy(&conf->interceptors.on_new);
171 rd_list_destroy(&conf->interceptors.on_destroy);
172 rd_list_destroy(&conf->interceptors.on_send);
173 rd_list_destroy(&conf->interceptors.on_acknowledgement);
174 rd_list_destroy(&conf->interceptors.on_consume);
175 rd_list_destroy(&conf->interceptors.on_commit);
176 rd_list_destroy(&conf->interceptors.on_request_sent);
177
178 /* Interceptor config */
179 rd_list_destroy(&conf->interceptors.config);
180}
181
182
183/**
184 * @brief Initialize interceptor sub-system for config object.
185 * @locality application thread
186 */
187static void
188rd_kafka_interceptors_init (rd_kafka_conf_t *conf) {
189 rd_list_init(&conf->interceptors.on_conf_set, 0,
190 rd_kafka_interceptor_method_destroy)
191 ->rl_flags |= RD_LIST_F_UNIQUE;
192 rd_list_init(&conf->interceptors.on_conf_dup, 0,
193 rd_kafka_interceptor_method_destroy)
194 ->rl_flags |= RD_LIST_F_UNIQUE;
195 /* conf_destroy() allows duplicates entries. */
196 rd_list_init(&conf->interceptors.on_conf_destroy, 0,
197 rd_kafka_interceptor_method_destroy);
198 rd_list_init(&conf->interceptors.on_new, 0,
199 rd_kafka_interceptor_method_destroy)
200 ->rl_flags |= RD_LIST_F_UNIQUE;
201 rd_list_init(&conf->interceptors.on_destroy, 0,
202 rd_kafka_interceptor_method_destroy)
203 ->rl_flags |= RD_LIST_F_UNIQUE;
204 rd_list_init(&conf->interceptors.on_send, 0,
205 rd_kafka_interceptor_method_destroy)
206 ->rl_flags |= RD_LIST_F_UNIQUE;
207 rd_list_init(&conf->interceptors.on_acknowledgement, 0,
208 rd_kafka_interceptor_method_destroy)
209 ->rl_flags |= RD_LIST_F_UNIQUE;
210 rd_list_init(&conf->interceptors.on_consume, 0,
211 rd_kafka_interceptor_method_destroy)
212 ->rl_flags |= RD_LIST_F_UNIQUE;
213 rd_list_init(&conf->interceptors.on_commit, 0,
214 rd_kafka_interceptor_method_destroy)
215 ->rl_flags |= RD_LIST_F_UNIQUE;
216 rd_list_init(&conf->interceptors.on_request_sent, 0,
217 rd_kafka_interceptor_method_destroy)
218 ->rl_flags |= RD_LIST_F_UNIQUE;
219
220 /* Interceptor config */
221 rd_list_init(&conf->interceptors.config, 0,
222 (void (*)(void *))rd_strtup_destroy);
223}
224
225
226
227
228/**
229 * @name Configuration backend
230 */
231
232
233/**
234 * @brief Constructor called when configuration object is created.
235 */
236void rd_kafka_conf_interceptor_ctor (int scope, void *pconf) {
237 rd_kafka_conf_t *conf = pconf;
238 assert(scope == _RK_GLOBAL);
239 rd_kafka_interceptors_init(conf);
240}
241
242/**
243 * @brief Destructor called when configuration object is destroyed.
244 */
245void rd_kafka_conf_interceptor_dtor (int scope, void *pconf) {
246 rd_kafka_conf_t *conf = pconf;
247 assert(scope == _RK_GLOBAL);
248 rd_kafka_interceptors_destroy(conf);
249}
250
251/**
252 * @brief Copy-constructor called when configuration object \p psrcp is
253 * duplicated to \p dstp.
254 * @remark Interceptors are NOT copied, but interceptor config is.
255 *
256 */
257void rd_kafka_conf_interceptor_copy (int scope, void *pdst, const void *psrc,
258 void *dstptr, const void *srcptr,
259 size_t filter_cnt, const char **filter) {
260 rd_kafka_conf_t *dconf = pdst;
261 const rd_kafka_conf_t *sconf = psrc;
262 int i;
263 const rd_strtup_t *confval;
264
265 assert(scope == _RK_GLOBAL);
266
267 /* Apply interceptor configuration values.
268 * on_conf_dup() has already been called for dconf so
269 * on_conf_set() interceptors are already in place and we can
270 * apply the configuration through the standard conf_set() API. */
271 RD_LIST_FOREACH(confval, &sconf->interceptors.config, i) {
272 size_t fi;
273 size_t nlen = strlen(confval->name);
274
275 /* Apply filter */
276 for (fi = 0 ; fi < filter_cnt ; fi++) {
277 size_t flen = strlen(filter[fi]);
278 if (nlen >= flen && !strncmp(filter[fi], confval->name,
279 flen))
280 break;
281 }
282
283 if (fi < filter_cnt)
284 continue; /* Filter matched: ignore property. */
285
286 /* Ignore errors for now */
287 rd_kafka_conf_set(dconf, confval->name, confval->value,
288 NULL, 0);
289 }
290}
291
292
293
294
295/**
296 * @brief Call interceptor on_conf_set methods.
297 * @locality application thread calling rd_kafka_conf_set() and
298 * rd_kafka_conf_dup()
299 */
300rd_kafka_conf_res_t
301rd_kafka_interceptors_on_conf_set (rd_kafka_conf_t *conf,
302 const char *name, const char *val,
303 char *errstr, size_t errstr_size) {
304 rd_kafka_interceptor_method_t *method;
305 int i;
306
307 RD_LIST_FOREACH(method, &conf->interceptors.on_conf_set, i) {
308 rd_kafka_conf_res_t res;
309
310 res = method->u.on_conf_set(conf,
311 name, val, errstr, errstr_size,
312 method->ic_opaque);
313 if (res == RD_KAFKA_CONF_UNKNOWN)
314 continue;
315
316 /* Add successfully handled properties to list of
317 * interceptor config properties so conf_t objects
318 * can be copied. */
319 if (res == RD_KAFKA_CONF_OK)
320 rd_list_add(&conf->interceptors.config,
321 rd_strtup_new(name, val));
322 return res;
323 }
324
325 return RD_KAFKA_CONF_UNKNOWN;
326}
327
328/**
329 * @brief Call interceptor on_conf_dup methods.
330 * @locality application thread calling rd_kafka_conf_dup()
331 */
332void
333rd_kafka_interceptors_on_conf_dup (rd_kafka_conf_t *new_conf,
334 const rd_kafka_conf_t *old_conf,
335 size_t filter_cnt, const char **filter) {
336 rd_kafka_interceptor_method_t *method;
337 int i;
338
339 RD_LIST_FOREACH(method, &old_conf->interceptors.on_conf_dup, i) {
340 /* FIXME: Ignore error for now */
341 method->u.on_conf_dup(new_conf, old_conf,
342 filter_cnt, filter, method->ic_opaque);
343 }
344}
345
346
347/**
348 * @brief Call interceptor on_conf_destroy methods.
349 * @locality application thread calling rd_kafka_conf_destroy(), rd_kafka_new(),
350 * rd_kafka_destroy()
351 */
352void
353rd_kafka_interceptors_on_conf_destroy (rd_kafka_conf_t *conf) {
354 rd_kafka_interceptor_method_t *method;
355 int i;
356
357 RD_LIST_FOREACH(method, &conf->interceptors.on_conf_destroy, i) {
358 /* FIXME: Ignore error for now */
359 method->u.on_conf_destroy(method->ic_opaque);
360 }
361}
362
363
364/**
365 * @brief Call interceptor on_new methods.
366 * @locality application thread calling rd_kafka_new()
367 */
368void
369rd_kafka_interceptors_on_new (rd_kafka_t *rk, const rd_kafka_conf_t *conf) {
370 rd_kafka_interceptor_method_t *method;
371 int i;
372 char errstr[512];
373
374 RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_new, i) {
375 rd_kafka_resp_err_t err;
376
377 err = method->u.on_new(rk, conf, method->ic_opaque,
378 errstr, sizeof(errstr));
379 if (unlikely(err))
380 rd_kafka_interceptor_failed(rk, method, "on_new", err,
381 NULL, errstr);
382 }
383}
384
385
386
387/**
388 * @brief Call interceptor on_destroy methods.
389 * @locality application thread calling rd_kafka_new() or rd_kafka_destroy()
390 */
391void
392rd_kafka_interceptors_on_destroy (rd_kafka_t *rk) {
393 rd_kafka_interceptor_method_t *method;
394 int i;
395
396 RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_destroy, i) {
397 rd_kafka_resp_err_t err;
398
399 err = method->u.on_destroy(rk, method->ic_opaque);
400 if (unlikely(err))
401 rd_kafka_interceptor_failed(rk, method, "on_destroy",
402 err, NULL, NULL);
403 }
404}
405
406
407
408/**
409 * @brief Call interceptor on_send methods.
410 * @locality application thread calling produce()
411 */
412void
413rd_kafka_interceptors_on_send (rd_kafka_t *rk, rd_kafka_message_t *rkmessage) {
414 rd_kafka_interceptor_method_t *method;
415 int i;
416
417 RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_send, i) {
418 rd_kafka_resp_err_t err;
419
420 err = method->u.on_send(rk, rkmessage, method->ic_opaque);
421 if (unlikely(err))
422 rd_kafka_interceptor_failed(rk, method, "on_send", err,
423 rkmessage, NULL);
424 }
425}
426
427
428
429/**
430 * @brief Call interceptor on_acknowledgement methods.
431 * @locality application thread calling poll(), or the broker thread if
432 * if dr callback has been set.
433 */
434void
435rd_kafka_interceptors_on_acknowledgement (rd_kafka_t *rk,
436 rd_kafka_message_t *rkmessage) {
437 rd_kafka_interceptor_method_t *method;
438 int i;
439
440 RD_LIST_FOREACH(method,
441 &rk->rk_conf.interceptors.on_acknowledgement, i) {
442 rd_kafka_resp_err_t err;
443
444 err = method->u.on_acknowledgement(rk, rkmessage,
445 method->ic_opaque);
446 if (unlikely(err))
447 rd_kafka_interceptor_failed(rk, method,
448 "on_acknowledgement", err,
449 rkmessage, NULL);
450 }
451}
452
453
454/**
455 * @brief Call on_acknowledgement methods for all messages in queue.
456 *
457 * @param force_err If non-zero, sets this error on each message.
458 *
459 * @locality broker thread
460 */
461void
462rd_kafka_interceptors_on_acknowledgement_queue (rd_kafka_t *rk,
463 rd_kafka_msgq_t *rkmq,
464 rd_kafka_resp_err_t force_err) {
465 rd_kafka_msg_t *rkm;
466
467 RD_KAFKA_MSGQ_FOREACH(rkm, rkmq) {
468 if (force_err)
469 rkm->rkm_err = force_err;
470 rd_kafka_interceptors_on_acknowledgement(rk,
471 &rkm->rkm_rkmessage);
472 }
473}
474
475
476/**
477 * @brief Call interceptor on_consume methods.
478 * @locality application thread calling poll(), consume() or similar prior to
479 * passing the message to the application.
480 */
481void
482rd_kafka_interceptors_on_consume (rd_kafka_t *rk,
483 rd_kafka_message_t *rkmessage) {
484 rd_kafka_interceptor_method_t *method;
485 int i;
486
487 RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_consume, i) {
488 rd_kafka_resp_err_t err;
489
490 err = method->u.on_consume(rk, rkmessage,
491 method->ic_opaque);
492 if (unlikely(err))
493 rd_kafka_interceptor_failed(rk, method,
494 "on_consume", err,
495 rkmessage, NULL);
496 }
497}
498
499
500/**
501 * @brief Call interceptor on_commit methods.
502 * @locality application thread calling poll(), consume() or similar,
503 * or rdkafka main thread if no commit_cb or handler registered.
504 */
505void
506rd_kafka_interceptors_on_commit (rd_kafka_t *rk,
507 const rd_kafka_topic_partition_list_t *offsets,
508 rd_kafka_resp_err_t err) {
509 rd_kafka_interceptor_method_t *method;
510 int i;
511
512 RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_commit, i) {
513 rd_kafka_resp_err_t ic_err;
514
515 ic_err = method->u.on_commit(rk, offsets, err,
516 method->ic_opaque);
517 if (unlikely(ic_err))
518 rd_kafka_interceptor_failed(rk, method,
519 "on_commit", ic_err, NULL,
520 NULL);
521 }
522}
523
524
525/**
526 * @brief Call interceptor on_request_sent methods
527 * @locality internal broker thread
528 */
529void rd_kafka_interceptors_on_request_sent (rd_kafka_t *rk,
530 int sockfd,
531 const char *brokername,
532 int32_t brokerid,
533 int16_t ApiKey,
534 int16_t ApiVersion,
535 int32_t CorrId,
536 size_t size) {
537 rd_kafka_interceptor_method_t *method;
538 int i;
539
540 RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_request_sent, i) {
541 rd_kafka_resp_err_t ic_err;
542
543 ic_err = method->u.on_request_sent(rk,
544 sockfd,
545 brokername,
546 brokerid,
547 ApiKey,
548 ApiVersion,
549 CorrId,
550 size,
551 method->ic_opaque);
552 if (unlikely(ic_err))
553 rd_kafka_interceptor_failed(rk, method,
554 "on_request_sent",
555 ic_err, NULL, NULL);
556 }
557}
558
559
560/**
561 * @name Public API (backend)
562 * @{
563 */
564
565
566rd_kafka_resp_err_t
567rd_kafka_conf_interceptor_add_on_conf_set (
568 rd_kafka_conf_t *conf, const char *ic_name,
569 rd_kafka_interceptor_f_on_conf_set_t *on_conf_set,
570 void *ic_opaque) {
571 return rd_kafka_interceptor_method_add(&conf->interceptors.on_conf_set,
572 ic_name, (void *)on_conf_set,
573 ic_opaque);
574}
575
576rd_kafka_resp_err_t
577rd_kafka_conf_interceptor_add_on_conf_dup (
578 rd_kafka_conf_t *conf, const char *ic_name,
579 rd_kafka_interceptor_f_on_conf_dup_t *on_conf_dup,
580 void *ic_opaque) {
581 return rd_kafka_interceptor_method_add(&conf->interceptors.on_conf_dup,
582 ic_name, (void *)on_conf_dup,
583 ic_opaque);
584}
585
586rd_kafka_resp_err_t
587rd_kafka_conf_interceptor_add_on_conf_destroy (
588 rd_kafka_conf_t *conf, const char *ic_name,
589 rd_kafka_interceptor_f_on_conf_destroy_t *on_conf_destroy,
590 void *ic_opaque) {
591 return rd_kafka_interceptor_method_add(&conf->interceptors.on_conf_destroy,
592 ic_name, (void *)on_conf_destroy,
593 ic_opaque);
594}
595
596
597
598rd_kafka_resp_err_t
599rd_kafka_conf_interceptor_add_on_new (
600 rd_kafka_conf_t *conf, const char *ic_name,
601 rd_kafka_interceptor_f_on_new_t *on_new,
602 void *ic_opaque) {
603 return rd_kafka_interceptor_method_add(&conf->interceptors.on_new,
604 ic_name, (void *)on_new,
605 ic_opaque);
606}
607
608
609rd_kafka_resp_err_t
610rd_kafka_interceptor_add_on_destroy (
611 rd_kafka_t *rk, const char *ic_name,
612 rd_kafka_interceptor_f_on_destroy_t *on_destroy,
613 void *ic_opaque) {
614 assert(!rk->rk_initialized);
615 return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors.on_destroy,
616 ic_name, (void *)on_destroy,
617 ic_opaque);
618}
619
620rd_kafka_resp_err_t
621rd_kafka_interceptor_add_on_send (
622 rd_kafka_t *rk, const char *ic_name,
623 rd_kafka_interceptor_f_on_send_t *on_send,
624 void *ic_opaque) {
625 assert(!rk->rk_initialized);
626 return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors.on_send,
627 ic_name, (void *)on_send,
628 ic_opaque);
629}
630
631rd_kafka_resp_err_t
632rd_kafka_interceptor_add_on_acknowledgement (
633 rd_kafka_t *rk, const char *ic_name,
634 rd_kafka_interceptor_f_on_acknowledgement_t *on_acknowledgement,
635 void *ic_opaque) {
636 assert(!rk->rk_initialized);
637 return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors.
638 on_acknowledgement,
639 ic_name,
640 (void *)on_acknowledgement,
641 ic_opaque);
642}
643
644
645rd_kafka_resp_err_t
646rd_kafka_interceptor_add_on_consume (
647 rd_kafka_t *rk, const char *ic_name,
648 rd_kafka_interceptor_f_on_consume_t *on_consume,
649 void *ic_opaque) {
650 assert(!rk->rk_initialized);
651 return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors.
652 on_consume,
653 ic_name, (void *)on_consume,
654 ic_opaque);
655}
656
657
658rd_kafka_resp_err_t
659rd_kafka_interceptor_add_on_commit (
660 rd_kafka_t *rk, const char *ic_name,
661 rd_kafka_interceptor_f_on_commit_t *on_commit,
662 void *ic_opaque) {
663 assert(!rk->rk_initialized);
664 return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors.
665 on_commit,
666 ic_name, (void *)on_commit,
667 ic_opaque);
668}
669
670
671rd_kafka_resp_err_t
672rd_kafka_interceptor_add_on_request_sent (
673 rd_kafka_t *rk, const char *ic_name,
674 rd_kafka_interceptor_f_on_request_sent_t *on_request_sent,
675 void *ic_opaque) {
676 assert(!rk->rk_initialized);
677 return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors.
678 on_request_sent,
679 ic_name, (void *)on_request_sent,
680 ic_opaque);
681}
682