/*
 * librdkafka - The Apache Kafka C/C++ library
 *
 * Copyright (c) 2016 Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "rdkafka_int.h"
#include "rdkafka_offset.h"
#include "rdkafka_topic.h"
#include "rdkafka_interceptor.h"

int RD_TLS rd_kafka_yield_thread = 0;

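/**
 * @brief Cancel the current callback dispatcher (e.g. rd_kafka_poll()),
 *        forcing it to return to the application as soon as possible.
 *        Typically called from within a callback; sets the per-thread
 *        yield flag checked by the dispatch loops below.
 */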
void rd_kafka_yield (rd_kafka_t *rk) {
        rd_kafka_yield_thread = 1;
}


/**
 * @brief Check and reset yield flag.
 * @returns rd_true if caller should yield, otherwise rd_false.
 * @remarks rkq_lock MUST be held
 */
static RD_INLINE rd_bool_t rd_kafka_q_check_yield (rd_kafka_q_t *rkq) {
        if (!(rkq->rkq_flags & RD_KAFKA_Q_F_YIELD))
                return rd_false;

        rkq->rkq_flags &= ~RD_KAFKA_Q_F_YIELD;
        return rd_true;
}

/**
 * Destroy a queue. refcnt must be at zero.
 */
void rd_kafka_q_destroy_final (rd_kafka_q_t *rkq) {

        mtx_lock(&rkq->rkq_lock);
        if (unlikely(rkq->rkq_qio != NULL)) {
                rd_free(rkq->rkq_qio);
                rkq->rkq_qio = NULL;
        }
        /* Queue must have been disabled prior to final destruction,
         * this is to catch the case where the queue owner/poll does not
         * use rd_kafka_q_destroy_owner(). */
        rd_dassert(!(rkq->rkq_flags & RD_KAFKA_Q_F_READY));
        rd_kafka_q_disable0(rkq, 0/*no-lock*/); /* for the non-devel case */
        rd_kafka_q_fwd_set0(rkq, NULL, 0/*no-lock*/, 0/*no-fwd-app*/);
        rd_kafka_q_purge0(rkq, 0/*no-lock*/);
        assert(!rkq->rkq_fwdq);
        mtx_unlock(&rkq->rkq_lock);
        mtx_destroy(&rkq->rkq_lock);
        cnd_destroy(&rkq->rkq_cond);

        if (rkq->rkq_flags & RD_KAFKA_Q_F_ALLOCATED)
                rd_free(rkq);
}



/**
 * Initialize a queue.
 */
void rd_kafka_q_init0 (rd_kafka_q_t *rkq, rd_kafka_t *rk,
                       const char *func, int line) {
        rd_kafka_q_reset(rkq);
        rkq->rkq_fwdq   = NULL;
        rkq->rkq_refcnt = 1;
        rkq->rkq_flags  = RD_KAFKA_Q_F_READY;
        rkq->rkq_rk     = rk;
        rkq->rkq_qio    = NULL;
        rkq->rkq_serve  = NULL;
        rkq->rkq_opaque = NULL;
        mtx_init(&rkq->rkq_lock, mtx_plain);
        cnd_init(&rkq->rkq_cond);
#if ENABLE_DEVEL
        rd_snprintf(rkq->rkq_name, sizeof(rkq->rkq_name), "%s:%d", func, line);
#else
        rkq->rkq_name = func;
#endif
}


/**
 * Allocate a new queue and initialize it.
 */
rd_kafka_q_t *rd_kafka_q_new0 (rd_kafka_t *rk, const char *func, int line) {
        rd_kafka_q_t *rkq = rd_malloc(sizeof(*rkq));
        rd_kafka_q_init(rkq, rk);
        rkq->rkq_flags |= RD_KAFKA_Q_F_ALLOCATED;
#if ENABLE_DEVEL
        rd_snprintf(rkq->rkq_name, sizeof(rkq->rkq_name), "%s:%d", func, line);
#else
        rkq->rkq_name = func;
#endif
        return rkq;
}

/**
 * Set/clear forward queue.
 * Queue forwarding enables message routing inside rdkafka.
 * Typical use is to re-route all fetched messages for all partitions
 * to one single queue.
 *
 * All access to rkq_fwdq is protected by rkq_lock.
 */
void rd_kafka_q_fwd_set0 (rd_kafka_q_t *srcq, rd_kafka_q_t *destq,
                          int do_lock, int fwd_app) {

        if (do_lock)
                mtx_lock(&srcq->rkq_lock);
        if (fwd_app)
                srcq->rkq_flags |= RD_KAFKA_Q_F_FWD_APP;
        if (srcq->rkq_fwdq) {
                rd_kafka_q_destroy(srcq->rkq_fwdq);
                srcq->rkq_fwdq = NULL;
        }
        if (destq) {
                rd_kafka_q_keep(destq);

                /* If srcq has ops in queue, append them to destq.
                 * This is an irreversible operation. */
                if (srcq->rkq_qlen > 0) {
                        rd_dassert(destq->rkq_flags & RD_KAFKA_Q_F_READY);
                        rd_kafka_q_concat(destq, srcq);
                }

                srcq->rkq_fwdq = destq;
        }
        if (do_lock)
                mtx_unlock(&srcq->rkq_lock);
}
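
/*
 * Illustrative sketch (not verbatim from this file): the consumer forwards
 * each partition's fetch queue to the consumer group queue so that a single
 * poll serves all assigned partitions, e.g.:
 *
 *   rd_kafka_q_fwd_set(rktp->rktp_fetchq, rkcg->rkcg_q);
 *
 * Forwarding is later cleared by passing NULL as the destination:
 *
 *   rd_kafka_q_fwd_set(rktp->rktp_fetchq, NULL);
 */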

/**
 * Purge all entries from a queue.
 */
int rd_kafka_q_purge0 (rd_kafka_q_t *rkq, int do_lock) {
        rd_kafka_op_t *rko, *next;
        TAILQ_HEAD(, rd_kafka_op_s) tmpq = TAILQ_HEAD_INITIALIZER(tmpq);
        rd_kafka_q_t *fwdq;
        int cnt = 0;

        if (do_lock)
                mtx_lock(&rkq->rkq_lock);

        if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
                if (do_lock)
                        mtx_unlock(&rkq->rkq_lock);
                cnt = rd_kafka_q_purge(fwdq);
                rd_kafka_q_destroy(fwdq);
                return cnt;
        }

        /* Move ops queue to tmpq to avoid lock-order issues caused by
         * locks taken from rd_kafka_op_destroy(). */
        TAILQ_MOVE(&tmpq, &rkq->rkq_q, rko_link);

        /* Zero out queue */
        rd_kafka_q_reset(rkq);

        if (do_lock)
                mtx_unlock(&rkq->rkq_lock);

        /* Destroy the ops */
        next = TAILQ_FIRST(&tmpq);
        while ((rko = next)) {
                next = TAILQ_NEXT(next, rko_link);
                rd_kafka_op_destroy(rko);
                cnt++;
        }

        return cnt;
}


/**
 * Purge all entries from a queue with a rktp version smaller than `version`.
 * This shaves off the head of the queue, up until the first rko with
 * a non-matching rktp or version.
 */
void rd_kafka_q_purge_toppar_version (rd_kafka_q_t *rkq,
                                      rd_kafka_toppar_t *rktp, int version) {
        rd_kafka_op_t *rko, *next;
        TAILQ_HEAD(, rd_kafka_op_s) tmpq = TAILQ_HEAD_INITIALIZER(tmpq);
        int32_t cnt = 0;
        int64_t size = 0;
        rd_kafka_q_t *fwdq;

        mtx_lock(&rkq->rkq_lock);

        if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
                mtx_unlock(&rkq->rkq_lock);
                rd_kafka_q_purge_toppar_version(fwdq, rktp, version);
                rd_kafka_q_destroy(fwdq);
                return;
        }

        /* Move ops to temporary queue and then destroy them from there
         * without locks to avoid lock-ordering problems in op_destroy() */
        while ((rko = TAILQ_FIRST(&rkq->rkq_q)) && rko->rko_rktp &&
               rd_kafka_toppar_s2i(rko->rko_rktp) == rktp &&
               rko->rko_version < version) {
                TAILQ_REMOVE(&rkq->rkq_q, rko, rko_link);
                TAILQ_INSERT_TAIL(&tmpq, rko, rko_link);
                cnt++;
                size += rko->rko_len;
        }


        rkq->rkq_qlen -= cnt;
        rkq->rkq_qsize -= size;
        mtx_unlock(&rkq->rkq_lock);

        next = TAILQ_FIRST(&tmpq);
        while ((rko = next)) {
                next = TAILQ_NEXT(next, rko_link);
                rd_kafka_op_destroy(rko);
        }
}


/**
 * Move 'cnt' entries from 'srcq' to 'dstq'.
 * If 'cnt' == -1 all entries will be moved.
 * Returns the number of entries moved.
 */
int rd_kafka_q_move_cnt (rd_kafka_q_t *dstq, rd_kafka_q_t *srcq,
                         int cnt, int do_locks) {
        rd_kafka_op_t *rko;
        int mcnt = 0;

        if (do_locks) {
                mtx_lock(&srcq->rkq_lock);
                mtx_lock(&dstq->rkq_lock);
        }

        if (!dstq->rkq_fwdq && !srcq->rkq_fwdq) {
                if (cnt > 0 && dstq->rkq_qlen == 0)
                        rd_kafka_q_io_event(dstq);

                /* Optimization, if 'cnt' is equal/larger than all
                 * items of 'srcq' we can move the entire queue. */
                if (cnt == -1 ||
                    cnt >= (int)srcq->rkq_qlen) {
                        mcnt = srcq->rkq_qlen;
                        rd_kafka_q_concat0(dstq, srcq, 0/*no-lock*/);
                } else {
                        while (mcnt < cnt &&
                               (rko = TAILQ_FIRST(&srcq->rkq_q))) {
                                TAILQ_REMOVE(&srcq->rkq_q, rko, rko_link);
                                if (likely(!rko->rko_prio))
                                        TAILQ_INSERT_TAIL(&dstq->rkq_q, rko,
                                                          rko_link);
                                else
                                        TAILQ_INSERT_SORTED(
                                                &dstq->rkq_q, rko,
                                                rd_kafka_op_t *, rko_link,
                                                rd_kafka_op_cmp_prio);

                                srcq->rkq_qlen--;
                                dstq->rkq_qlen++;
                                srcq->rkq_qsize -= rko->rko_len;
                                dstq->rkq_qsize += rko->rko_len;
                                mcnt++;
                        }
                }
        } else
                mcnt = rd_kafka_q_move_cnt(dstq->rkq_fwdq ? dstq->rkq_fwdq : dstq,
                                           srcq->rkq_fwdq ? srcq->rkq_fwdq : srcq,
                                           cnt, do_locks);

        if (do_locks) {
                mtx_unlock(&dstq->rkq_lock);
                mtx_unlock(&srcq->rkq_lock);
        }

        return mcnt;
}


/**
 * Filters out outdated ops.
 */
static RD_INLINE rd_kafka_op_t *rd_kafka_op_filter (rd_kafka_q_t *rkq,
                                                    rd_kafka_op_t *rko,
                                                    int version) {
        if (unlikely(!rko))
                return NULL;

        if (unlikely(rd_kafka_op_version_outdated(rko, version))) {
                rd_kafka_q_deq0(rkq, rko);
                rd_kafka_op_destroy(rko);
                return NULL;
        }

        return rko;
}


/**
 * Pop an op from a queue.
 *
 * Locality: any thread.
 */


/**
 * Serve q like rd_kafka_q_serve() until an op is found that can be returned
 * as an event to the application.
 *
 * @returns the first op that can be returned as an event, or NULL on timeout.
 *
 * Locality: any thread
 */
rd_kafka_op_t *rd_kafka_q_pop_serve (rd_kafka_q_t *rkq, int timeout_ms,
                                     int32_t version,
                                     rd_kafka_q_cb_type_t cb_type,
                                     rd_kafka_q_serve_cb_t *callback,
                                     void *opaque) {
        rd_kafka_op_t *rko;
        rd_kafka_q_t *fwdq;

        rd_dassert(cb_type);

        mtx_lock(&rkq->rkq_lock);

        rd_kafka_yield_thread = 0;
        if (!(fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
                struct timespec timeout_tspec;

                rd_timeout_init_timespec(&timeout_tspec, timeout_ms);

                while (1) {
                        rd_kafka_op_res_t res;

                        /* Filter out outdated ops */
                retry:
                        while ((rko = TAILQ_FIRST(&rkq->rkq_q)) &&
                               !(rko = rd_kafka_op_filter(rkq, rko, version)))
                                ;

                        if (rko) {
                                /* Proper versioned op */
                                rd_kafka_q_deq0(rkq, rko);

                                /* Ops with callbacks are considered handled
                                 * and we move on to the next op, if any.
                                 * Ops w/o callbacks are returned immediately */
                                res = rd_kafka_op_handle(rkq->rkq_rk, rkq, rko,
                                                         cb_type, opaque,
                                                         callback);
                                if (res == RD_KAFKA_OP_RES_HANDLED ||
                                    res == RD_KAFKA_OP_RES_KEEP)
                                        goto retry; /* Next op */
                                else if (unlikely(res ==
                                                  RD_KAFKA_OP_RES_YIELD)) {
                                        /* Callback yielded, unroll */
                                        mtx_unlock(&rkq->rkq_lock);
                                        return NULL;
                                } else
                                        break; /* Proper op, handle below. */
                        }

                        if (unlikely(rd_kafka_q_check_yield(rkq))) {
                                mtx_unlock(&rkq->rkq_lock);
                                return NULL;
                        }

                        if (cnd_timedwait_abs(&rkq->rkq_cond,
                                              &rkq->rkq_lock,
                                              &timeout_tspec) !=
                            thrd_success) {
                                mtx_unlock(&rkq->rkq_lock);
                                return NULL;
                        }
                }

                mtx_unlock(&rkq->rkq_lock);

        } else {
                /* Since the q_pop may block we need to release the parent
                 * queue's lock. */
                mtx_unlock(&rkq->rkq_lock);
                rko = rd_kafka_q_pop_serve(fwdq, timeout_ms, version,
                                           cb_type, callback, opaque);
                rd_kafka_q_destroy(fwdq);
        }


        return rko;
}

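/**
 * Pop an op from a queue: convenience wrapper for rd_kafka_q_pop_serve()
 * with the default RD_KAFKA_Q_CB_RETURN behaviour and no serve callback.
 */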
rd_kafka_op_t *rd_kafka_q_pop (rd_kafka_q_t *rkq, int timeout_ms,
                               int32_t version) {
        return rd_kafka_q_pop_serve(rkq, timeout_ms, version,
                                    RD_KAFKA_Q_CB_RETURN,
                                    NULL, NULL);
}


/**
 * Pop all available ops from a queue and call the provided
 * callback for each op.
 * `max_cnt` limits the number of ops served, 0 = no limit.
 *
 * Returns the number of ops served.
 *
 * Locality: any thread.
 */
int rd_kafka_q_serve (rd_kafka_q_t *rkq, int timeout_ms,
                      int max_cnt, rd_kafka_q_cb_type_t cb_type,
                      rd_kafka_q_serve_cb_t *callback, void *opaque) {
        rd_kafka_t *rk = rkq->rkq_rk;
        rd_kafka_op_t *rko;
        rd_kafka_q_t localq;
        rd_kafka_q_t *fwdq;
        int cnt = 0;
        struct timespec timeout_tspec;

        rd_dassert(cb_type);

        mtx_lock(&rkq->rkq_lock);

        rd_dassert(TAILQ_EMPTY(&rkq->rkq_q) || rkq->rkq_qlen > 0);
        if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
                int ret;
                /* Since the recursive serve may block we need to release
                 * the parent queue's lock. */
                mtx_unlock(&rkq->rkq_lock);
                ret = rd_kafka_q_serve(fwdq, timeout_ms, max_cnt,
                                       cb_type, callback, opaque);
                rd_kafka_q_destroy(fwdq);
                return ret;
        }

        rd_timeout_init_timespec(&timeout_tspec, timeout_ms);

        /* Wait for op */
        while (!(rko = TAILQ_FIRST(&rkq->rkq_q)) &&
               !rd_kafka_q_check_yield(rkq) &&
               cnd_timedwait_abs(&rkq->rkq_cond, &rkq->rkq_lock,
                                 &timeout_tspec) == thrd_success)
                ;

        if (!rko) {
                mtx_unlock(&rkq->rkq_lock);
                return 0;
        }

        /* Move the first `max_cnt` ops. */
        rd_kafka_q_init(&localq, rkq->rkq_rk);
        rd_kafka_q_move_cnt(&localq, rkq, max_cnt == 0 ? -1/*all*/ : max_cnt,
                            0/*no-locks*/);

        mtx_unlock(&rkq->rkq_lock);

        rd_kafka_yield_thread = 0;

        /* Call callback for each op */
        while ((rko = TAILQ_FIRST(&localq.rkq_q))) {
                rd_kafka_op_res_t res;

                rd_kafka_q_deq0(&localq, rko);
                res = rd_kafka_op_handle(rk, &localq, rko, cb_type,
                                         opaque, callback);
                /* op must have been handled */
                rd_kafka_assert(NULL, res != RD_KAFKA_OP_RES_PASS);
                cnt++;

                if (unlikely(res == RD_KAFKA_OP_RES_YIELD ||
                             rd_kafka_yield_thread)) {
                        /* Callback called rd_kafka_yield(), we must
                         * stop our callback dispatching and put the
                         * ops in localq back on the original queue head. */
                        if (!TAILQ_EMPTY(&localq.rkq_q))
                                rd_kafka_q_prepend(rkq, &localq);
                        break;
                }
        }

        rd_kafka_q_destroy_owner(&localq);

        return cnt;
}



/**
 * Populate 'rkmessages' array with messages from 'rkq'.
 * If automatic offset storage is enabled, each message's offset will be
 * stored in the offset store for its toppar.
 *
 * Returns the number of messages added.
 */

int rd_kafka_q_serve_rkmessages (rd_kafka_q_t *rkq, int timeout_ms,
                                 rd_kafka_message_t **rkmessages,
                                 size_t rkmessages_size) {
        unsigned int cnt = 0;
        TAILQ_HEAD(, rd_kafka_op_s) tmpq = TAILQ_HEAD_INITIALIZER(tmpq);
        rd_kafka_op_t *rko, *next;
        rd_kafka_t *rk = rkq->rkq_rk;
        rd_kafka_q_t *fwdq;
        struct timespec timeout_tspec;

        mtx_lock(&rkq->rkq_lock);
        if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
                /* Since the recursive serve may block we need to release
                 * the parent queue's lock. */
                mtx_unlock(&rkq->rkq_lock);
                cnt = rd_kafka_q_serve_rkmessages(fwdq, timeout_ms,
                                                  rkmessages, rkmessages_size);
                rd_kafka_q_destroy(fwdq);
                return cnt;
        }
        mtx_unlock(&rkq->rkq_lock);

        if (timeout_ms)
                rd_kafka_app_poll_blocking(rk);

        rd_timeout_init_timespec(&timeout_tspec, timeout_ms);

        rd_kafka_yield_thread = 0;
        while (cnt < rkmessages_size) {
                rd_kafka_op_res_t res;

                mtx_lock(&rkq->rkq_lock);

                while (!(rko = TAILQ_FIRST(&rkq->rkq_q)) &&
                       !rd_kafka_q_check_yield(rkq) &&
                       cnd_timedwait_abs(&rkq->rkq_cond, &rkq->rkq_lock,
                                         &timeout_tspec) == thrd_success)
                        ;

                if (!rko) {
                        mtx_unlock(&rkq->rkq_lock);
                        break; /* Timed out */
                }

                rd_kafka_q_deq0(rkq, rko);

                mtx_unlock(&rkq->rkq_lock);

                if (rd_kafka_op_version_outdated(rko, 0)) {
                        /* Outdated op, put on discard queue */
                        TAILQ_INSERT_TAIL(&tmpq, rko, rko_link);
                        continue;
                }

                /* Serve non-FETCH callbacks */
                res = rd_kafka_poll_cb(rk, rkq, rko,
                                       RD_KAFKA_Q_CB_RETURN, NULL);
                if (res == RD_KAFKA_OP_RES_KEEP ||
                    res == RD_KAFKA_OP_RES_HANDLED) {
                        /* Callback served, rko is destroyed (if HANDLED). */
                        continue;
                } else if (unlikely(res == RD_KAFKA_OP_RES_YIELD ||
                                    rd_kafka_yield_thread)) {
                        /* Yield. */
                        break;
                }
                rd_dassert(res == RD_KAFKA_OP_RES_PASS);

                /* Store offset (for auto-commit), if enabled. */
                if (!rko->rko_err && rko->rko_type == RD_KAFKA_OP_FETCH) {
                        rd_kafka_toppar_t *rktp;
                        rktp = rd_kafka_toppar_s2i(rko->rko_rktp);
                        rd_kafka_toppar_lock(rktp);
                        rktp->rktp_app_offset =
                                rko->rko_u.fetch.rkm.rkm_offset + 1;
                        if (rktp->rktp_cgrp &&
                            rk->rk_conf.enable_auto_offset_store)
                                rd_kafka_offset_store0(rktp,
                                                       rktp->rktp_app_offset,
                                                       0/* no lock */);
                        rd_kafka_toppar_unlock(rktp);
                }

                /* Get rkmessage from rko and append to array. */
                rkmessages[cnt++] = rd_kafka_message_get(rko);
        }

        /* Destroy outdated ops that were put on the discard queue */
        next = TAILQ_FIRST(&tmpq);
        while (next) {
                rko = next;
                next = TAILQ_NEXT(next, rko_link);
                rd_kafka_op_destroy(rko);
        }

        rd_kafka_app_polled(rk);

        return cnt;
}


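/**
 * @brief Public API: destroy an application queue handle, dropping its
 *        reference to (or, for owned queues, ownership of) the underlying
 *        rd_kafka_q_t.
 */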
void rd_kafka_queue_destroy (rd_kafka_queue_t *rkqu) {
        if (rkqu->rkqu_is_owner)
                rd_kafka_q_destroy_owner(rkqu->rkqu_q);
        else
                rd_kafka_q_destroy(rkqu->rkqu_q);
        rd_free(rkqu);
}

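/**
 * @brief Create an application queue handle (rd_kafka_queue_t) wrapping
 *        the given internal queue \p rkq, acquiring a new reference to it.
 */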
rd_kafka_queue_t *rd_kafka_queue_new0 (rd_kafka_t *rk, rd_kafka_q_t *rkq) {
        rd_kafka_queue_t *rkqu;

        rkqu = rd_calloc(1, sizeof(*rkqu));

        rkqu->rkqu_q = rkq;
        rd_kafka_q_keep(rkq);

        rkqu->rkqu_rk = rk;

        return rkqu;
}

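/**
 * @brief Public API: create a new application-owned queue.
 *        The returned handle owns the underlying queue and must be
 *        released with rd_kafka_queue_destroy().
 */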
rd_kafka_queue_t *rd_kafka_queue_new (rd_kafka_t *rk) {
        rd_kafka_q_t *rkq;
        rd_kafka_queue_t *rkqu;

        rkq = rd_kafka_q_new(rk);
        rkqu = rd_kafka_queue_new0(rk, rkq);
        rd_kafka_q_destroy(rkq); /* Drop the reference from q_new(),
                                  * one is still held by queue_new0() */
        rkqu->rkqu_is_owner = 1;
        return rkqu;
}

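/**
 * @brief Public API: obtain a queue handle for the main (rd_kafka_poll())
 *        queue. Must be released with rd_kafka_queue_destroy().
 */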
rd_kafka_queue_t *rd_kafka_queue_get_main (rd_kafka_t *rk) {
        return rd_kafka_queue_new0(rk, rk->rk_rep);
}

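/**
 * @brief Public API: obtain a queue handle for the consumer group queue,
 *        or NULL if this handle is not a consumer (no consumer group set up).
 */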
rd_kafka_queue_t *rd_kafka_queue_get_consumer (rd_kafka_t *rk) {
        if (!rk->rk_cgrp)
                return NULL;
        return rd_kafka_queue_new0(rk, rk->rk_cgrp->rkcg_q);
}

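/**
 * @brief Public API: obtain a queue handle for the fetch queue of the given
 *        topic+partition, creating the toppar if it does not yet exist.
 *        Returns NULL for producer handles.
 */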
rd_kafka_queue_t *rd_kafka_queue_get_partition (rd_kafka_t *rk,
                                                const char *topic,
                                                int32_t partition) {
        shptr_rd_kafka_toppar_t *s_rktp;
        rd_kafka_toppar_t *rktp;
        rd_kafka_queue_t *result;

        if (rk->rk_type == RD_KAFKA_PRODUCER)
                return NULL;

        s_rktp = rd_kafka_toppar_get2(rk, topic,
                                      partition,
                                      0, /* no ua_on_miss */
                                      1 /* create_on_miss */);

        if (!s_rktp)
                return NULL;

        rktp = rd_kafka_toppar_s2i(s_rktp);
        result = rd_kafka_queue_new0(rk, rktp->rktp_fetchq);
        rd_kafka_toppar_destroy(s_rktp);

        return result;
}

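/**
 * @brief Public API: obtain a queue handle for the background queue,
 *        or NULL if the background thread is not enabled.
 */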
rd_kafka_queue_t *rd_kafka_queue_get_background (rd_kafka_t *rk) {
        if (rk->rk_background.q)
                return rd_kafka_queue_new0(rk, rk->rk_background.q);
        else
                return NULL;
}

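/**
 * @brief Public API: forward log messages to the provided queue,
 *        or back to the main queue if \p rkqu is NULL.
 */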
rd_kafka_resp_err_t rd_kafka_set_log_queue (rd_kafka_t *rk,
                                            rd_kafka_queue_t *rkqu) {
        rd_kafka_q_t *rkq;
        if (!rkqu)
                rkq = rk->rk_rep;
        else
                rkq = rkqu->rkqu_q;
        rd_kafka_q_fwd_set(rk->rk_logq, rkq);
        return RD_KAFKA_RESP_ERR_NO_ERROR;
}

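/**
 * @brief Public API: forward all ops from \p src to \p dst,
 *        or disable forwarding if \p dst is NULL.
 */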
void rd_kafka_queue_forward (rd_kafka_queue_t *src, rd_kafka_queue_t *dst) {
        rd_kafka_q_fwd_set0(src->rkqu_q, dst ? dst->rkqu_q : NULL,
                            1, /* do_lock */
                            1 /* fwd_app */);
}

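/**
 * @brief Public API: current number of ops/messages on the queue.
 */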
size_t rd_kafka_queue_length (rd_kafka_queue_t *rkqu) {
        return (size_t)rd_kafka_q_len(rkqu->rkqu_q);
}

/**
 * @brief Enable fd-based wake-ups for the queue, or disable them if \p fd
 *        is -1.
 */
void rd_kafka_q_io_event_enable (rd_kafka_q_t *rkq, int fd,
                                 const void *payload, size_t size) {
        struct rd_kafka_q_io *qio = NULL;

        if (fd != -1) {
                qio = rd_malloc(sizeof(*qio) + size);
                qio->fd = fd;
                qio->size = size;
                qio->payload = (void *)(qio+1);
                qio->event_cb = NULL;
                qio->event_cb_opaque = NULL;
                memcpy(qio->payload, payload, size);
        }

        mtx_lock(&rkq->rkq_lock);
        if (rkq->rkq_qio) {
                rd_free(rkq->rkq_qio);
                rkq->rkq_qio = NULL;
        }

        if (fd != -1) {
                rkq->rkq_qio = qio;
        }

        mtx_unlock(&rkq->rkq_lock);
}

void rd_kafka_queue_io_event_enable (rd_kafka_queue_t *rkqu, int fd,
                                     const void *payload, size_t size) {
        rd_kafka_q_io_event_enable(rkqu->rkqu_q, fd, payload, size);
}
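
/*
 * Illustrative application-side sketch (assumes POSIX pipe(2) and a
 * consumer instance `rk`): have librdkafka write the payload byte to
 * pipefd[1] when the consumer queue becomes non-empty, so the queue can
 * be multiplexed with poll(2)/select(2):
 *
 *   int pipefd[2];
 *   pipe(pipefd);
 *
 *   rd_kafka_queue_t *rkqu = rd_kafka_queue_get_consumer(rk);
 *   rd_kafka_queue_io_event_enable(rkqu, pipefd[1], "1", 1);
 *
 *   // ... poll(2) on pipefd[0], drain it, then call
 *   //     rd_kafka_consumer_poll() ...
 *
 *   rd_kafka_queue_io_event_enable(rkqu, -1, NULL, 0);  // disable
 *   rd_kafka_queue_destroy(rkqu);
 */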


/**
 * @brief Enable callback-based wake-ups for the queue, or disable them if
 *        \p event_cb is NULL.
 */
void rd_kafka_q_cb_event_enable (rd_kafka_q_t *rkq,
                                 void (*event_cb) (rd_kafka_t *rk,
                                                   void *opaque),
                                 void *opaque) {
        struct rd_kafka_q_io *qio = NULL;

        if (event_cb) {
                qio = rd_malloc(sizeof(*qio));
                qio->fd = -1;
                qio->size = 0;
                qio->payload = NULL;
                qio->event_cb = event_cb;
                qio->event_cb_opaque = opaque;
        }

        mtx_lock(&rkq->rkq_lock);
        if (rkq->rkq_qio) {
                rd_free(rkq->rkq_qio);
                rkq->rkq_qio = NULL;
        }

        if (event_cb) {
                rkq->rkq_qio = qio;
        }

        mtx_unlock(&rkq->rkq_lock);
}

void rd_kafka_queue_cb_event_enable (rd_kafka_queue_t *rkqu,
                                     void (*event_cb) (rd_kafka_t *rk,
                                                       void *opaque),
                                     void *opaque) {
        rd_kafka_q_cb_event_enable(rkqu->rkqu_q, event_cb, opaque);
}


/**
 * Helper: wait for single op on 'rkq', and return its error,
 * or .._TIMED_OUT on timeout.
 */
rd_kafka_resp_err_t rd_kafka_q_wait_result (rd_kafka_q_t *rkq, int timeout_ms) {
        rd_kafka_op_t *rko;
        rd_kafka_resp_err_t err;

        rko = rd_kafka_q_pop(rkq, timeout_ms, 0);
        if (!rko)
                err = RD_KAFKA_RESP_ERR__TIMED_OUT;
        else {
                err = rko->rko_err;
                rd_kafka_op_destroy(rko);
        }

        return err;
}


/**
 * Apply \p callback on each op in queue.
 * If the callback wishes to remove the rko it must do so using
 * rd_kafka_q_deq0().
 *
 * @returns the sum of \p callback() return values.
 * @remark rkq will be locked, callers should take care not to
 *         interact with \p rkq through other means from the callback to avoid
 *         deadlocks.
 */
int rd_kafka_q_apply (rd_kafka_q_t *rkq,
                      int (*callback) (rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
                                       void *opaque),
                      void *opaque) {
        rd_kafka_op_t *rko, *next;
        rd_kafka_q_t *fwdq;
        int cnt = 0;

        mtx_lock(&rkq->rkq_lock);
        if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
                mtx_unlock(&rkq->rkq_lock);
                cnt = rd_kafka_q_apply(fwdq, callback, opaque);
                rd_kafka_q_destroy(fwdq);
                return cnt;
        }

        next = TAILQ_FIRST(&rkq->rkq_q);
        while ((rko = next)) {
                next = TAILQ_NEXT(next, rko_link);
                cnt += callback(rkq, rko, opaque);
        }
        mtx_unlock(&rkq->rkq_lock);

        return cnt;
}

/**
 * @brief Convert relative to absolute offsets and also purge any messages
 *        that are older than \p min_offset.
 * @remark Error ops with ERR__NOT_IMPLEMENTED will not be purged since
 *         they are used to indicate unknown compression codecs and compressed
 *         message sets may have a starting offset lower than what we requested.
 * @remark \p rkq locking is not performed (caller's responsibility)
 * @remark Must NOT be used on fwdq.
 */
void rd_kafka_q_fix_offsets (rd_kafka_q_t *rkq, int64_t min_offset,
                             int64_t base_offset) {
        rd_kafka_op_t *rko, *next;
        int adj_len = 0;
        int64_t adj_size = 0;

        rd_kafka_assert(NULL, !rkq->rkq_fwdq);

        next = TAILQ_FIRST(&rkq->rkq_q);
        while ((rko = next)) {
                next = TAILQ_NEXT(next, rko_link);

                if (unlikely(rko->rko_type != RD_KAFKA_OP_FETCH))
                        continue;

                rko->rko_u.fetch.rkm.rkm_offset += base_offset;

                if (rko->rko_u.fetch.rkm.rkm_offset < min_offset &&
                    rko->rko_err != RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED) {
                        adj_len++;
                        adj_size += rko->rko_len;
                        TAILQ_REMOVE(&rkq->rkq_q, rko, rko_link);
                        rd_kafka_op_destroy(rko);
                        continue;
                }
        }


        rkq->rkq_qlen -= adj_len;
        rkq->rkq_qsize -= adj_size;
}

/**
 * @brief Print information and contents of queue
 */
void rd_kafka_q_dump (FILE *fp, rd_kafka_q_t *rkq) {
        mtx_lock(&rkq->rkq_lock);
        fprintf(fp, "Queue %p \"%s\" (refcnt %d, flags 0x%x, %d ops, "
                "%"PRId64" bytes)\n",
                rkq, rkq->rkq_name, rkq->rkq_refcnt, rkq->rkq_flags,
                rkq->rkq_qlen, rkq->rkq_qsize);

        if (rkq->rkq_qio)
                fprintf(fp, " QIO fd %d\n", rkq->rkq_qio->fd);
        if (rkq->rkq_serve)
                fprintf(fp, " Serve callback %p, opaque %p\n",
                        rkq->rkq_serve, rkq->rkq_opaque);

        if (rkq->rkq_fwdq) {
                fprintf(fp, " Forwarded ->\n");
                rd_kafka_q_dump(fp, rkq->rkq_fwdq);
        } else {
                rd_kafka_op_t *rko;

                if (!TAILQ_EMPTY(&rkq->rkq_q))
                        fprintf(fp, " Queued ops:\n");
                TAILQ_FOREACH(rko, &rkq->rkq_q, rko_link) {
                        fprintf(fp, " %p %s (v%"PRId32", flags 0x%x, "
                                "prio %d, len %"PRId32", source %s, "
                                "replyq %p)\n",
                                rko, rd_kafka_op2str(rko->rko_type),
                                rko->rko_version, rko->rko_flags,
                                rko->rko_prio, rko->rko_len,
#if ENABLE_DEVEL
                                rko->rko_source,
#else
                                "-",
#endif
                                rko->rko_replyq.q);
                }
        }

        mtx_unlock(&rkq->rkq_lock);
}

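/**
 * @brief Generic destructor callback for enq_once objects: triggers the
 *        eonce with error RD_KAFKA_RESP_ERR__DESTROY.
 */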
void rd_kafka_enq_once_trigger_destroy (void *ptr) {
        rd_kafka_enq_once_t *eonce = ptr;

        rd_kafka_enq_once_trigger(eonce, RD_KAFKA_RESP_ERR__DESTROY, "destroy");
}