1/*
2 * librdkafka - The Apache Kafka C/C++ library
3 *
4 * Copyright (c) 2016 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29#ifndef _RDKAFKA_QUEUE_H_
30#define _RDKAFKA_QUEUE_H_
31
32#include "rdkafka_op.h"
33#include "rdkafka_int.h"
34
35#ifdef _MSC_VER
36#include <io.h> /* for _write() */
37#endif
38
39/** @brief Queueing strategy */
40#define RD_KAFKA_QUEUE_FIFO 0
41#define RD_KAFKA_QUEUE_LIFO 1
42
43TAILQ_HEAD(rd_kafka_op_tailq, rd_kafka_op_s);
44
45struct rd_kafka_q_s {
46 mtx_t rkq_lock;
47 cnd_t rkq_cond;
48 struct rd_kafka_q_s *rkq_fwdq; /* Forwarded/Routed queue.
49 * Used in place of this queue
50 * for all operations. */
51
52 struct rd_kafka_op_tailq rkq_q; /* TAILQ_HEAD(, rd_kafka_op_s) */
53 int rkq_qlen; /* Number of entries in queue */
54 int64_t rkq_qsize; /* Size of all entries in queue */
55 int rkq_refcnt;
56 int rkq_flags;
57#define RD_KAFKA_Q_F_ALLOCATED 0x1 /* Allocated: rd_free on destroy */
58#define RD_KAFKA_Q_F_READY 0x2 /* Queue is ready to be used.
59 * Flag is cleared on destroy */
60#define RD_KAFKA_Q_F_FWD_APP 0x4 /* Queue is being forwarded by a call
61 * to rd_kafka_queue_forward. */
62#define RD_KAFKA_Q_F_YIELD 0x8 /* Have waiters return even if
63 * no rko was enqueued.
64 * This is used to wake up a waiter
65 * by triggering the cond-var
66 * but without having to enqueue
67 * an op. */
68
69 rd_kafka_t *rkq_rk;
70 struct rd_kafka_q_io *rkq_qio; /* FD-based application signalling */
71
72 /* Op serve callback (optional).
73 * Mainly used for forwarded queues to use the original queue's
74 * serve function from the forwarded position.
75 * Shall return 1 if op was handled, else 0. */
76 rd_kafka_q_serve_cb_t *rkq_serve;
77 void *rkq_opaque;
78
79#if ENABLE_DEVEL
80 char rkq_name[64]; /* Debugging: queue name (FUNC:LINE) */
81#else
82 const char *rkq_name; /* Debugging: queue name (FUNC) */
83#endif
84};
85
86
87/* Application signalling state holder. */
88struct rd_kafka_q_io {
89 /* For FD-based signalling */
90 int fd;
91 void *payload;
92 size_t size;
93 /* For callback-based signalling */
94 void (*event_cb) (rd_kafka_t *rk, void *opaque);
95 void *event_cb_opaque;
96};
97
98
99
100/**
101 * @return true if queue is ready/enabled, else false.
102 * @remark queue luck must be held by caller (if applicable)
103 */
104static RD_INLINE RD_UNUSED
105int rd_kafka_q_ready (rd_kafka_q_t *rkq) {
106 return rkq->rkq_flags & RD_KAFKA_Q_F_READY;
107}
108
109
110
111
112void rd_kafka_q_init0 (rd_kafka_q_t *rkq, rd_kafka_t *rk,
113 const char *func, int line);
114#define rd_kafka_q_init(rkq,rk) rd_kafka_q_init0(rkq,rk,__FUNCTION__,__LINE__)
115rd_kafka_q_t *rd_kafka_q_new0 (rd_kafka_t *rk, const char *func, int line);
116#define rd_kafka_q_new(rk) rd_kafka_q_new0(rk,__FUNCTION__,__LINE__)
117void rd_kafka_q_destroy_final (rd_kafka_q_t *rkq);
118
119#define rd_kafka_q_lock(rkqu) mtx_lock(&(rkqu)->rkq_lock)
120#define rd_kafka_q_unlock(rkqu) mtx_unlock(&(rkqu)->rkq_lock)
121
122static RD_INLINE RD_UNUSED
123rd_kafka_q_t *rd_kafka_q_keep (rd_kafka_q_t *rkq) {
124 mtx_lock(&rkq->rkq_lock);
125 rkq->rkq_refcnt++;
126 mtx_unlock(&rkq->rkq_lock);
127 return rkq;
128}
129
130static RD_INLINE RD_UNUSED
131rd_kafka_q_t *rd_kafka_q_keep_nolock (rd_kafka_q_t *rkq) {
132 rkq->rkq_refcnt++;
133 return rkq;
134}
135
136
137/**
138 * @returns the queue's name (used for debugging)
139 */
140static RD_INLINE RD_UNUSED
141const char *rd_kafka_q_name (rd_kafka_q_t *rkq) {
142 return rkq->rkq_name;
143}
144
145/**
146 * @returns the final destination queue name (after forwarding)
147 * @remark rkq MUST NOT be locked
148 */
149static RD_INLINE RD_UNUSED
150const char *rd_kafka_q_dest_name (rd_kafka_q_t *rkq) {
151 const char *ret;
152 mtx_lock(&rkq->rkq_lock);
153 if (rkq->rkq_fwdq)
154 ret = rd_kafka_q_dest_name(rkq->rkq_fwdq);
155 else
156 ret = rd_kafka_q_name(rkq);
157 mtx_unlock(&rkq->rkq_lock);
158 return ret;
159}
160
161/**
162 * @brief Disable a queue.
163 * Attempting to enqueue ops to the queue will destroy the ops.
164 */
165static RD_INLINE RD_UNUSED
166void rd_kafka_q_disable0 (rd_kafka_q_t *rkq, int do_lock) {
167 if (do_lock)
168 mtx_lock(&rkq->rkq_lock);
169 rkq->rkq_flags &= ~RD_KAFKA_Q_F_READY;
170 if (do_lock)
171 mtx_unlock(&rkq->rkq_lock);
172}
173#define rd_kafka_q_disable(rkq) rd_kafka_q_disable0(rkq, 1/*lock*/)
174
175int rd_kafka_q_purge0 (rd_kafka_q_t *rkq, int do_lock);
176#define rd_kafka_q_purge(rkq) rd_kafka_q_purge0(rkq, 1/*lock*/)
177void rd_kafka_q_purge_toppar_version (rd_kafka_q_t *rkq,
178 rd_kafka_toppar_t *rktp, int version);
179
180/**
181 * @brief Loose reference to queue, when refcount reaches 0 the queue
182 * will be destroyed.
183 *
184 * @param disable Also disable the queue, to be used by owner of the queue.
185 */
186static RD_INLINE RD_UNUSED
187void rd_kafka_q_destroy0 (rd_kafka_q_t *rkq, int disable) {
188 int do_delete = 0;
189
190 if (disable) {
191 /* To avoid recursive locking (from ops being purged
192 * that reference this queue somehow),
193 * we disable the queue and purge it with individual
194 * locking. */
195 rd_kafka_q_disable0(rkq, 1/*lock*/);
196 rd_kafka_q_purge0(rkq, 1/*lock*/);
197 }
198
199 mtx_lock(&rkq->rkq_lock);
200 rd_kafka_assert(NULL, rkq->rkq_refcnt > 0);
201 do_delete = !--rkq->rkq_refcnt;
202 mtx_unlock(&rkq->rkq_lock);
203
204 if (unlikely(do_delete))
205 rd_kafka_q_destroy_final(rkq);
206}
207
208#define rd_kafka_q_destroy(rkq) rd_kafka_q_destroy0(rkq, 0/*dont-disable*/)
209
210/**
211 * @brief Queue destroy method to be used by the owner (poller) of
212 * the queue. The only difference to q_destroy() is that this
213 * method also disables the queue so that any q_enq() operations
214 * will fail.
215 * Failure to disable a queue on the poller when it destroys its
216 * queue reference results in ops being enqueued on the queue
217 * but there is noone left to poll it, possibly resulting in a
218 * hang on termination due to refcounts held by the op.
219 */
220static RD_INLINE RD_UNUSED
221void rd_kafka_q_destroy_owner (rd_kafka_q_t *rkq) {
222 rd_kafka_q_destroy0(rkq, 1/*disable*/);
223}
224
225
226/**
227 * Reset a queue.
228 * WARNING: All messages will be lost and leaked.
229 * NOTE: No locking is performed.
230 */
231static RD_INLINE RD_UNUSED
232void rd_kafka_q_reset (rd_kafka_q_t *rkq) {
233 TAILQ_INIT(&rkq->rkq_q);
234 rd_dassert(TAILQ_EMPTY(&rkq->rkq_q));
235 rkq->rkq_qlen = 0;
236 rkq->rkq_qsize = 0;
237}
238
239
240
241/**
242 * Forward 'srcq' to 'destq'
243 */
244void rd_kafka_q_fwd_set0 (rd_kafka_q_t *srcq, rd_kafka_q_t *destq,
245 int do_lock, int fwd_app);
246#define rd_kafka_q_fwd_set(S,D) rd_kafka_q_fwd_set0(S,D,1/*lock*/,\
247 0/*no fwd_app*/)
248
249/**
250 * @returns the forward queue (if any) with its refcount increased.
251 * @locks rd_kafka_q_lock(rkq) == !do_lock
252 */
253static RD_INLINE RD_UNUSED
254rd_kafka_q_t *rd_kafka_q_fwd_get (rd_kafka_q_t *rkq, int do_lock) {
255 rd_kafka_q_t *fwdq;
256 if (do_lock)
257 mtx_lock(&rkq->rkq_lock);
258
259 if ((fwdq = rkq->rkq_fwdq))
260 rd_kafka_q_keep(fwdq);
261
262 if (do_lock)
263 mtx_unlock(&rkq->rkq_lock);
264
265 return fwdq;
266}
267
268
269/**
270 * @returns true if queue is forwarded, else false.
271 *
272 * @remark Thread-safe.
273 */
274static RD_INLINE RD_UNUSED int rd_kafka_q_is_fwded (rd_kafka_q_t *rkq) {
275 int r;
276 mtx_lock(&rkq->rkq_lock);
277 r = rkq->rkq_fwdq ? 1 : 0;
278 mtx_unlock(&rkq->rkq_lock);
279 return r;
280}
281
282
283
284/**
285 * @brief Trigger an IO event for this queue.
286 *
287 * @remark Queue MUST be locked
288 */
289static RD_INLINE RD_UNUSED
290void rd_kafka_q_io_event (rd_kafka_q_t *rkq) {
291
292 if (likely(!rkq->rkq_qio))
293 return;
294
295 if (rkq->rkq_qio->event_cb) {
296 rkq->rkq_qio->event_cb(rkq->rkq_rk, rkq->rkq_qio->event_cb_opaque);
297 return;
298 }
299
300 /* Ignore errors, not much to do anyway. */
301 if (rd_write(rkq->rkq_qio->fd, rkq->rkq_qio->payload,
302 (int)rkq->rkq_qio->size) == -1)
303 ;
304}
305
306
307/**
308 * @brief rko->rko_prio comparator
309 * @remark: descending order: higher priority takes preceedence.
310 */
311static RD_INLINE RD_UNUSED
312int rd_kafka_op_cmp_prio (const void *_a, const void *_b) {
313 const rd_kafka_op_t *a = _a, *b = _b;
314
315 return b->rko_prio - a->rko_prio;
316}
317
318
319/**
320 * @brief Wake up waiters without enqueuing an op.
321 */
322static RD_INLINE RD_UNUSED void
323rd_kafka_q_yield (rd_kafka_q_t *rkq) {
324 rd_kafka_q_t *fwdq;
325
326 mtx_lock(&rkq->rkq_lock);
327
328 rd_dassert(rkq->rkq_refcnt > 0);
329
330 if (unlikely(!(rkq->rkq_flags & RD_KAFKA_Q_F_READY))) {
331 /* Queue has been disabled */
332 mtx_unlock(&rkq->rkq_lock);
333 return;
334 }
335
336 if (!(fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
337 rkq->rkq_flags |= RD_KAFKA_Q_F_YIELD;
338 cnd_signal(&rkq->rkq_cond);
339 if (rkq->rkq_qlen == 0)
340 rd_kafka_q_io_event(rkq);
341
342 mtx_unlock(&rkq->rkq_lock);
343 } else {
344 mtx_unlock(&rkq->rkq_lock);
345 rd_kafka_q_yield(fwdq);
346 rd_kafka_q_destroy(fwdq);
347 }
348
349
350}
351
352/**
353 * @brief Low-level unprotected enqueue that only performs
354 * the actual queue enqueue and counter updates.
355 * @remark Will not perform locking, signaling, fwdq, READY checking, etc.
356 */
357static RD_INLINE RD_UNUSED void
358rd_kafka_q_enq0 (rd_kafka_q_t *rkq, rd_kafka_op_t *rko, int at_head) {
359 if (likely(!rko->rko_prio))
360 TAILQ_INSERT_TAIL(&rkq->rkq_q, rko, rko_link);
361 else if (at_head)
362 TAILQ_INSERT_HEAD(&rkq->rkq_q, rko, rko_link);
363 else
364 TAILQ_INSERT_SORTED(&rkq->rkq_q, rko, rd_kafka_op_t *,
365 rko_link, rd_kafka_op_cmp_prio);
366 rkq->rkq_qlen++;
367 rkq->rkq_qsize += rko->rko_len;
368}
369
370
371/**
372 * @brief Enqueue \p rko either at head or tail of \p rkq.
373 *
374 * The provided \p rko is either enqueued or destroyed.
375 *
376 * \p orig_destq is the original (outermost) dest queue for which
377 * this op was enqueued, before any queue forwarding has kicked in.
378 * The rko_serve callback from the orig_destq will be set on the rko
379 * if there is no rko_serve callback already set, and the \p rko isn't
380 * failed because the final queue is disabled.
381 *
382 * @returns 1 if op was enqueued or 0 if queue is disabled and
383 * there was no replyq to enqueue on in which case the rko is destroyed.
384 *
385 * @locality any thread.
386 */
387static RD_INLINE RD_UNUSED
388int rd_kafka_q_enq1 (rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
389 rd_kafka_q_t *orig_destq, int at_head, int do_lock) {
390 rd_kafka_q_t *fwdq;
391
392 if (do_lock)
393 mtx_lock(&rkq->rkq_lock);
394
395 rd_dassert(rkq->rkq_refcnt > 0);
396
397 if (unlikely(!(rkq->rkq_flags & RD_KAFKA_Q_F_READY))) {
398 /* Queue has been disabled, reply to and fail the rko. */
399 if (do_lock)
400 mtx_unlock(&rkq->rkq_lock);
401
402 return rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR__DESTROY);
403 }
404
405 if (!(fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
406 if (!rko->rko_serve && orig_destq->rkq_serve) {
407 /* Store original queue's serve callback and opaque
408 * prior to forwarding. */
409 rko->rko_serve = orig_destq->rkq_serve;
410 rko->rko_serve_opaque = orig_destq->rkq_opaque;
411 }
412
413 rd_kafka_q_enq0(rkq, rko, at_head);
414 cnd_signal(&rkq->rkq_cond);
415 if (rkq->rkq_qlen == 1)
416 rd_kafka_q_io_event(rkq);
417
418 if (do_lock)
419 mtx_unlock(&rkq->rkq_lock);
420 } else {
421 if (do_lock)
422 mtx_unlock(&rkq->rkq_lock);
423 rd_kafka_q_enq1(fwdq, rko, orig_destq, at_head, 1/*do lock*/);
424 rd_kafka_q_destroy(fwdq);
425 }
426
427 return 1;
428}
429
430/**
431 * @brief Enqueue the 'rko' op at the tail of the queue 'rkq'.
432 *
433 * The provided 'rko' is either enqueued or destroyed.
434 *
435 * @returns 1 if op was enqueued or 0 if queue is disabled and
436 * there was no replyq to enqueue on in which case the rko is destroyed.
437 *
438 * @locality any thread.
439 * @locks rkq MUST NOT be locked
440 */
441static RD_INLINE RD_UNUSED
442int rd_kafka_q_enq (rd_kafka_q_t *rkq, rd_kafka_op_t *rko) {
443 return rd_kafka_q_enq1(rkq, rko, rkq, 0/*at tail*/, 1/*do lock*/);
444}
445
446
447/**
448 * @brief Re-enqueue rko at head of rkq.
449 *
450 * The provided 'rko' is either enqueued or destroyed.
451 *
452 * @returns 1 if op was enqueued or 0 if queue is disabled and
453 * there was no replyq to enqueue on in which case the rko is destroyed.
454 *
455 * @locality any thread
456 * @locks rkq MUST BE locked
457 */
458static RD_INLINE RD_UNUSED
459int rd_kafka_q_reenq (rd_kafka_q_t *rkq, rd_kafka_op_t *rko) {
460 return rd_kafka_q_enq1(rkq, rko, rkq, 1/*at head*/, 0/*don't lock*/);
461}
462
463
464/**
465 * Dequeue 'rko' from queue 'rkq'.
466 *
467 * NOTE: rkq_lock MUST be held
468 * Locality: any thread
469 */
470static RD_INLINE RD_UNUSED
471void rd_kafka_q_deq0 (rd_kafka_q_t *rkq, rd_kafka_op_t *rko) {
472 rd_dassert(rkq->rkq_qlen > 0 &&
473 rkq->rkq_qsize >= (int64_t)rko->rko_len);
474
475 TAILQ_REMOVE(&rkq->rkq_q, rko, rko_link);
476 rkq->rkq_qlen--;
477 rkq->rkq_qsize -= rko->rko_len;
478}
479
480/**
481 * Concat all elements of 'srcq' onto tail of 'rkq'.
482 * 'rkq' will be be locked (if 'do_lock'==1), but 'srcq' will not.
483 * NOTE: 'srcq' will be reset.
484 *
485 * Locality: any thread.
486 *
487 * @returns 0 if operation was performed or -1 if rkq is disabled.
488 */
489static RD_INLINE RD_UNUSED
490int rd_kafka_q_concat0 (rd_kafka_q_t *rkq, rd_kafka_q_t *srcq, int do_lock) {
491 int r = 0;
492
493 while (srcq->rkq_fwdq) /* Resolve source queue */
494 srcq = srcq->rkq_fwdq;
495 if (unlikely(srcq->rkq_qlen == 0))
496 return 0; /* Don't do anything if source queue is empty */
497
498 if (do_lock)
499 mtx_lock(&rkq->rkq_lock);
500 if (!rkq->rkq_fwdq) {
501 rd_kafka_op_t *rko;
502
503 rd_dassert(TAILQ_EMPTY(&srcq->rkq_q) ||
504 srcq->rkq_qlen > 0);
505 if (unlikely(!(rkq->rkq_flags & RD_KAFKA_Q_F_READY))) {
506 if (do_lock)
507 mtx_unlock(&rkq->rkq_lock);
508 return -1;
509 }
510 /* First insert any prioritized ops from srcq
511 * in the right position in rkq. */
512 while ((rko = TAILQ_FIRST(&srcq->rkq_q)) && rko->rko_prio > 0) {
513 TAILQ_REMOVE(&srcq->rkq_q, rko, rko_link);
514 TAILQ_INSERT_SORTED(&rkq->rkq_q, rko,
515 rd_kafka_op_t *, rko_link,
516 rd_kafka_op_cmp_prio);
517 }
518
519 TAILQ_CONCAT(&rkq->rkq_q, &srcq->rkq_q, rko_link);
520 if (rkq->rkq_qlen == 0)
521 rd_kafka_q_io_event(rkq);
522 rkq->rkq_qlen += srcq->rkq_qlen;
523 rkq->rkq_qsize += srcq->rkq_qsize;
524 cnd_signal(&rkq->rkq_cond);
525
526 rd_kafka_q_reset(srcq);
527 } else
528 r = rd_kafka_q_concat0(rkq->rkq_fwdq ? rkq->rkq_fwdq : rkq,
529 srcq,
530 rkq->rkq_fwdq ? do_lock : 0);
531 if (do_lock)
532 mtx_unlock(&rkq->rkq_lock);
533
534 return r;
535}
536
537#define rd_kafka_q_concat(dstq,srcq) rd_kafka_q_concat0(dstq,srcq,1/*lock*/)
538
539
540/**
541 * @brief Prepend all elements of 'srcq' onto head of 'rkq'.
542 * 'rkq' will be be locked (if 'do_lock'==1), but 'srcq' will not.
543 * 'srcq' will be reset.
544 *
545 * @remark Will not respect priority of ops, srcq will be prepended in its
546 * original form to rkq.
547 *
548 * @locality any thread.
549 */
550static RD_INLINE RD_UNUSED
551void rd_kafka_q_prepend0 (rd_kafka_q_t *rkq, rd_kafka_q_t *srcq,
552 int do_lock) {
553 if (do_lock)
554 mtx_lock(&rkq->rkq_lock);
555 if (!rkq->rkq_fwdq && !srcq->rkq_fwdq) {
556 /* FIXME: prio-aware */
557 /* Concat rkq on srcq */
558 TAILQ_CONCAT(&srcq->rkq_q, &rkq->rkq_q, rko_link);
559 /* Move srcq to rkq */
560 TAILQ_MOVE(&rkq->rkq_q, &srcq->rkq_q, rko_link);
561 if (rkq->rkq_qlen == 0 && srcq->rkq_qlen > 0)
562 rd_kafka_q_io_event(rkq);
563 rkq->rkq_qlen += srcq->rkq_qlen;
564 rkq->rkq_qsize += srcq->rkq_qsize;
565
566 rd_kafka_q_reset(srcq);
567 } else
568 rd_kafka_q_prepend0(rkq->rkq_fwdq ? rkq->rkq_fwdq : rkq,
569 srcq->rkq_fwdq ? srcq->rkq_fwdq : srcq,
570 rkq->rkq_fwdq ? do_lock : 0);
571 if (do_lock)
572 mtx_unlock(&rkq->rkq_lock);
573}
574
575#define rd_kafka_q_prepend(dstq,srcq) rd_kafka_q_prepend0(dstq,srcq,1/*lock*/)
576
577
578/* Returns the number of elements in the queue */
579static RD_INLINE RD_UNUSED
580int rd_kafka_q_len (rd_kafka_q_t *rkq) {
581 int qlen;
582 rd_kafka_q_t *fwdq;
583 mtx_lock(&rkq->rkq_lock);
584 if (!(fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
585 qlen = rkq->rkq_qlen;
586 mtx_unlock(&rkq->rkq_lock);
587 } else {
588 mtx_unlock(&rkq->rkq_lock);
589 qlen = rd_kafka_q_len(fwdq);
590 rd_kafka_q_destroy(fwdq);
591 }
592 return qlen;
593}
594
595/* Returns the total size of elements in the queue */
596static RD_INLINE RD_UNUSED
597uint64_t rd_kafka_q_size (rd_kafka_q_t *rkq) {
598 uint64_t sz;
599 rd_kafka_q_t *fwdq;
600 mtx_lock(&rkq->rkq_lock);
601 if (!(fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
602 sz = rkq->rkq_qsize;
603 mtx_unlock(&rkq->rkq_lock);
604 } else {
605 mtx_unlock(&rkq->rkq_lock);
606 sz = rd_kafka_q_size(fwdq);
607 rd_kafka_q_destroy(fwdq);
608 }
609 return sz;
610}
611
612/**
613 * @brief Construct a temporary on-stack replyq with increased
614 * \p rkq refcount (unless NULL), version, and debug id.
615 */
616static RD_INLINE RD_UNUSED rd_kafka_replyq_t
617rd_kafka_replyq_make (rd_kafka_q_t *rkq, int version, const char *id) {
618 rd_kafka_replyq_t replyq = RD_ZERO_INIT;
619
620 if (rkq) {
621 replyq.q = rd_kafka_q_keep(rkq);
622 replyq.version = version;
623#if ENABLE_DEVEL
624 replyq._id = rd_strdup(id);
625#endif
626 }
627
628 return replyq;
629}
630
631/* Construct temporary on-stack replyq with increased Q refcount and
632 * optional VERSION. */
633#define RD_KAFKA_REPLYQ(Q,VERSION) rd_kafka_replyq_make(Q,VERSION,__FUNCTION__)
634
635/* Construct temporary on-stack replyq for indicating no replyq. */
636#if ENABLE_DEVEL
637#define RD_KAFKA_NO_REPLYQ (rd_kafka_replyq_t){NULL, 0, NULL}
638#else
639#define RD_KAFKA_NO_REPLYQ (rd_kafka_replyq_t){NULL, 0}
640#endif
641
642/**
643 * Set up replyq.
644 * Q refcnt is increased.
645 */
646static RD_INLINE RD_UNUSED void
647rd_kafka_set_replyq (rd_kafka_replyq_t *replyq,
648 rd_kafka_q_t *rkq, int32_t version) {
649 replyq->q = rkq ? rd_kafka_q_keep(rkq) : NULL;
650 replyq->version = version;
651#if ENABLE_DEVEL
652 replyq->_id = rd_strdup(__FUNCTION__);
653#endif
654}
655
656/**
657 * Set rko's replyq with an optional version (versionptr != NULL).
658 * Q refcnt is increased.
659 */
660static RD_INLINE RD_UNUSED void
661rd_kafka_op_set_replyq (rd_kafka_op_t *rko, rd_kafka_q_t *rkq,
662 rd_atomic32_t *versionptr) {
663 rd_kafka_set_replyq(&rko->rko_replyq, rkq,
664 versionptr ? rd_atomic32_get(versionptr) : 0);
665}
666
667/* Set reply rko's version from replyq's version */
668#define rd_kafka_op_get_reply_version(REPLY_RKO, ORIG_RKO) do { \
669 (REPLY_RKO)->rko_version = (ORIG_RKO)->rko_replyq.version; \
670 } while (0)
671
672
673/* Clear replyq holder without decreasing any .q references. */
674static RD_INLINE RD_UNUSED void
675rd_kafka_replyq_clear (rd_kafka_replyq_t *replyq) {
676 memset(replyq, 0, sizeof(*replyq));
677}
678
679/**
680 * @brief Make a copy of \p src in \p dst, with its own queue reference
681 */
682static RD_INLINE RD_UNUSED void
683rd_kafka_replyq_copy (rd_kafka_replyq_t *dst, rd_kafka_replyq_t *src) {
684 dst->version = src->version;
685 dst->q = src->q;
686 if (dst->q)
687 rd_kafka_q_keep(dst->q);
688#if ENABLE_DEVEL
689 if (src->_id)
690 dst->_id = rd_strdup(src->_id);
691 else
692 dst->_id = NULL;
693#endif
694}
695
696
697/**
698 * Clear replyq holder and destroy any .q references.
699 */
700static RD_INLINE RD_UNUSED void
701rd_kafka_replyq_destroy (rd_kafka_replyq_t *replyq) {
702 if (replyq->q)
703 rd_kafka_q_destroy(replyq->q);
704#if ENABLE_DEVEL
705 if (replyq->_id) {
706 rd_free(replyq->_id);
707 replyq->_id = NULL;
708 }
709#endif
710 rd_kafka_replyq_clear(replyq);
711}
712
713
714/**
715 * @brief Wrapper for rd_kafka_q_enq() that takes a replyq,
716 * steals its queue reference, enqueues the op with the replyq version,
717 * and then destroys the queue reference.
718 *
719 * If \p version is non-zero it will be updated, else replyq->version.
720 *
721 * @returns Same as rd_kafka_q_enq()
722 */
723static RD_INLINE RD_UNUSED int
724rd_kafka_replyq_enq (rd_kafka_replyq_t *replyq, rd_kafka_op_t *rko,
725 int version) {
726 rd_kafka_q_t *rkq = replyq->q;
727 int r;
728
729 if (version)
730 rko->rko_version = version;
731 else
732 rko->rko_version = replyq->version;
733
734 /* The replyq queue reference is done after we've enqueued the rko
735 * so clear it here. */
736 replyq->q = NULL; /* destroyed separately below */
737
738#if ENABLE_DEVEL
739 if (replyq->_id) {
740 rd_free(replyq->_id);
741 replyq->_id = NULL;
742 }
743#endif
744
745 /* Retain replyq->version since it is used by buf_callback
746 * when dispatching the callback. */
747
748 r = rd_kafka_q_enq(rkq, rko);
749
750 rd_kafka_q_destroy(rkq);
751
752 return r;
753}
754
755
756
757rd_kafka_op_t *rd_kafka_q_pop_serve (rd_kafka_q_t *rkq, int timeout_ms,
758 int32_t version,
759 rd_kafka_q_cb_type_t cb_type,
760 rd_kafka_q_serve_cb_t *callback,
761 void *opaque);
762rd_kafka_op_t *rd_kafka_q_pop (rd_kafka_q_t *rkq, int timeout_ms,
763 int32_t version);
764int rd_kafka_q_serve (rd_kafka_q_t *rkq, int timeout_ms, int max_cnt,
765 rd_kafka_q_cb_type_t cb_type,
766 rd_kafka_q_serve_cb_t *callback,
767 void *opaque);
768
769
770int rd_kafka_q_move_cnt (rd_kafka_q_t *dstq, rd_kafka_q_t *srcq,
771 int cnt, int do_locks);
772
773int rd_kafka_q_serve_rkmessages (rd_kafka_q_t *rkq, int timeout_ms,
774 rd_kafka_message_t **rkmessages,
775 size_t rkmessages_size);
776rd_kafka_resp_err_t rd_kafka_q_wait_result (rd_kafka_q_t *rkq, int timeout_ms);
777
778int rd_kafka_q_apply (rd_kafka_q_t *rkq,
779 int (*callback) (rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
780 void *opaque),
781 void *opaque);
782
783void rd_kafka_q_fix_offsets (rd_kafka_q_t *rkq, int64_t min_offset,
784 int64_t base_offset);
785
786/**
787 * @returns the last op in the queue matching \p op_type and \p allow_err (bool)
788 * @remark The \p rkq must be properly locked before this call, the returned rko
789 * is not removed from the queue and may thus not be held for longer
790 * than the lock is held.
791 */
792static RD_INLINE RD_UNUSED
793rd_kafka_op_t *rd_kafka_q_last (rd_kafka_q_t *rkq, rd_kafka_op_type_t op_type,
794 int allow_err) {
795 rd_kafka_op_t *rko;
796 TAILQ_FOREACH_REVERSE(rko, &rkq->rkq_q, rd_kafka_op_tailq, rko_link) {
797 if (rko->rko_type == op_type &&
798 (allow_err || !rko->rko_err))
799 return rko;
800 }
801
802 return NULL;
803}
804
805void rd_kafka_q_io_event_enable (rd_kafka_q_t *rkq, int fd,
806 const void *payload, size_t size);
807
808/* Public interface */
809struct rd_kafka_queue_s {
810 rd_kafka_q_t *rkqu_q;
811 rd_kafka_t *rkqu_rk;
812 int rkqu_is_owner; /**< Is owner/creator of rkqu_q */
813};
814
815
816void rd_kafka_q_dump (FILE *fp, rd_kafka_q_t *rkq);
817
818extern int RD_TLS rd_kafka_yield_thread;
819
820
821
822/**
823 * @name Enqueue op once
824 * @{
825 */
826
827/**
828 * @brief Minimal rd_kafka_op_t wrapper that ensures that
829 * the op is only enqueued on the provided queue once.
830 *
831 * Typical use-case is for an op to be triggered from multiple sources,
832 * but at most once, such as from a timer and some other source.
833 */
834typedef struct rd_kafka_enq_once_s {
835 mtx_t lock;
836 int refcnt;
837 rd_kafka_op_t *rko;
838 rd_kafka_replyq_t replyq;
839} rd_kafka_enq_once_t;
840
841
842/**
843 * @brief Allocate and set up a new eonce and set the initial refcount to 1.
844 * @remark This is to be called by the owner of the rko.
845 */
846static RD_INLINE RD_UNUSED
847rd_kafka_enq_once_t *
848rd_kafka_enq_once_new (rd_kafka_op_t *rko, rd_kafka_replyq_t replyq) {
849 rd_kafka_enq_once_t *eonce = rd_calloc(1, sizeof(*eonce));
850 mtx_init(&eonce->lock, mtx_plain);
851 eonce->rko = rko;
852 eonce->replyq = replyq; /* struct copy */
853 eonce->refcnt = 1;
854 return eonce;
855}
856
857/**
858 * @brief Re-enable triggering of a eonce even after it has been triggered
859 * once.
860 *
861 * @remark This is to be called by the owner.
862 */
863static RD_INLINE RD_UNUSED
864void
865rd_kafka_enq_once_reenable (rd_kafka_enq_once_t *eonce,
866 rd_kafka_op_t *rko, rd_kafka_replyq_t replyq) {
867 mtx_lock(&eonce->lock);
868 eonce->rko = rko;
869 rd_kafka_replyq_destroy(&eonce->replyq);
870 eonce->replyq = replyq; /* struct copy */
871 mtx_unlock(&eonce->lock);
872}
873
874
875/**
876 * @brief Free eonce and its resources. Must only be called with refcnt==0
877 * and eonce->lock NOT held.
878 */
879static RD_INLINE RD_UNUSED
880void rd_kafka_enq_once_destroy0 (rd_kafka_enq_once_t *eonce) {
881 /* This must not be called with the rko or replyq still set, which would
882 * indicate that no enqueueing was performed and that the owner
883 * did not clean up, which is a bug. */
884 rd_assert(!eonce->rko);
885 rd_assert(!eonce->replyq.q);
886#if ENABLE_DEVEL
887 rd_assert(!eonce->replyq._id);
888#endif
889 rd_assert(eonce->refcnt == 0);
890
891 mtx_destroy(&eonce->lock);
892 rd_free(eonce);
893}
894
895
896/**
897 * @brief Increment refcount for source (non-owner), such as a timer.
898 *
899 * @param srcdesc a human-readable descriptive string of the source.
900 * May be used for future debugging.
901 */
902static RD_INLINE RD_UNUSED
903void rd_kafka_enq_once_add_source (rd_kafka_enq_once_t *eonce,
904 const char *srcdesc) {
905 mtx_lock(&eonce->lock);
906 eonce->refcnt++;
907 mtx_unlock(&eonce->lock);
908}
909
910
911/**
912 * @brief Decrement refcount for source (non-owner), such as a timer.
913 *
914 * @param srcdesc a human-readable descriptive string of the source.
915 * May be used for future debugging.
916 *
917 * @remark Must only be called from the owner with the owner
918 * still holding its own refcount.
919 * This API is used to undo an add_source() from the
920 * same code.
921 */
922static RD_INLINE RD_UNUSED
923void rd_kafka_enq_once_del_source (rd_kafka_enq_once_t *eonce,
924 const char *srcdesc) {
925 int do_destroy;
926
927 mtx_lock(&eonce->lock);
928 rd_assert(eonce->refcnt > 1);
929 eonce->refcnt--;
930 do_destroy = eonce->refcnt == 0;
931 mtx_unlock(&eonce->lock);
932
933 if (do_destroy) {
934 /* We're the last refcount holder, clean up eonce. */
935 rd_kafka_enq_once_destroy0(eonce);
936 }
937}
938
939/**
940 * @brief Trigger a source's reference where the eonce resides on
941 * an rd_list_t. This is typically used as a free_cb for
942 * rd_list_destroy() and the trigger error code is
943 * always RD_KAFKA_RESP_ERR__DESTROY.
944 */
945void rd_kafka_enq_once_trigger_destroy (void *ptr);
946
947
948/**
949 * @brief Trigger enqueuing of the rko (unless already enqueued)
950 * and drops the source's refcount.
951 *
952 * @remark Must only be called by sources (non-owner).
953 */
954static RD_INLINE RD_UNUSED
955void rd_kafka_enq_once_trigger (rd_kafka_enq_once_t *eonce,
956 rd_kafka_resp_err_t err,
957 const char *srcdesc) {
958 int do_destroy;
959 rd_kafka_op_t *rko = NULL;
960 rd_kafka_replyq_t replyq = RD_ZERO_INIT;
961
962 mtx_lock(&eonce->lock);
963
964 rd_assert(eonce->refcnt > 0);
965 eonce->refcnt--;
966 do_destroy = eonce->refcnt == 0;
967
968 if (eonce->rko) {
969 /* Not already enqueued, do it.
970 * Detach the rko and replyq from the eonce and unlock the eonce
971 * before enqueuing rko on reply to avoid recursive locks
972 * if the replyq has been disabled and the ops
973 * destructor is called (which might then access the eonce
974 * to clean up). */
975 rko = eonce->rko;
976 replyq = eonce->replyq;
977
978 eonce->rko = NULL;
979 rd_kafka_replyq_clear(&eonce->replyq);
980
981 /* Reply is enqueued at the end of this function */
982 }
983 mtx_unlock(&eonce->lock);
984
985 if (do_destroy) {
986 /* We're the last refcount holder, clean up eonce. */
987 rd_kafka_enq_once_destroy0(eonce);
988 }
989
990 if (rko) {
991 rd_kafka_replyq_enq(&replyq, rko, replyq.version);
992 rd_kafka_replyq_destroy(&replyq);
993 }
994}
995
996/**
997 * @brief Destroy eonce, must only be called by the owner.
998 * There may be outstanding refcounts by non-owners after this call
999 */
1000static RD_INLINE RD_UNUSED
1001void rd_kafka_enq_once_destroy (rd_kafka_enq_once_t *eonce) {
1002 int do_destroy;
1003
1004 mtx_lock(&eonce->lock);
1005 rd_assert(eonce->refcnt > 0);
1006 eonce->refcnt--;
1007 do_destroy = eonce->refcnt == 0;
1008
1009 eonce->rko = NULL;
1010 rd_kafka_replyq_destroy(&eonce->replyq);
1011
1012 mtx_unlock(&eonce->lock);
1013
1014 if (do_destroy) {
1015 /* We're the last refcount holder, clean up eonce. */
1016 rd_kafka_enq_once_destroy0(eonce);
1017 }
1018}
1019
1020
1021/**
1022 * @brief Disable the owner's eonce, extracting, resetting and returning
1023 * the \c rko object.
1024 *
1025 * This is the same as rd_kafka_enq_once_destroy() but returning
1026 * the rko.
1027 *
1028 * Use this for owner-thread triggering where the enqueuing of the
1029 * rko on the replyq is not necessary.
1030 *
1031 * @returns the eonce's rko object, if still available, else NULL.
1032 */
1033static RD_INLINE RD_UNUSED
1034rd_kafka_op_t *rd_kafka_enq_once_disable (rd_kafka_enq_once_t *eonce) {
1035 int do_destroy;
1036 rd_kafka_op_t *rko;
1037
1038 mtx_lock(&eonce->lock);
1039 rd_assert(eonce->refcnt > 0);
1040 eonce->refcnt--;
1041 do_destroy = eonce->refcnt == 0;
1042
1043 /* May be NULL */
1044 rko = eonce->rko;
1045 eonce->rko = NULL;
1046 rd_kafka_replyq_destroy(&eonce->replyq);
1047
1048 mtx_unlock(&eonce->lock);
1049
1050 if (do_destroy) {
1051 /* We're the last refcount holder, clean up eonce. */
1052 rd_kafka_enq_once_destroy0(eonce);
1053 }
1054
1055 return rko;
1056}
1057
1058
1059/**@}*/
1060
1061
1062#endif /* _RDKAFKA_QUEUE_H_ */
1063