1/*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2018 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29/**
30 * Background queue thread and event handling.
31 *
32 * See rdkafka.h's rd_kafka_conf_set_background_event_cb() for details.
33 */
34
35#include "rd.h"
36#include "rdkafka_int.h"
37#include "rdkafka_event.h"
38
39/**
40 * @brief Call the registered background_event_cb.
41 * @locality rdkafka background queue thread
42 */
43static RD_INLINE void
44rd_kafka_call_background_event_cb (rd_kafka_t *rk, rd_kafka_op_t *rko) {
45 rd_assert(!rk->rk_background.calling);
46 rk->rk_background.calling = 1;
47
48 rk->rk_conf.background_event_cb(rk, rko, rk->rk_conf.opaque);
49
50 rk->rk_background.calling = 0;
51}
52
53
54/**
55 * @brief Background queue handler.
56 *
57 * Triggers the background_event_cb for all event:able ops,
58 * for non-event:able ops:
59 * - call op callback if set, else
60 * - log and discard the op. This is a user error, forwarding non-event
61 * APIs to the background queue.
62 */
63static rd_kafka_op_res_t
64rd_kafka_background_queue_serve (rd_kafka_t *rk,
65 rd_kafka_q_t *rkq,
66 rd_kafka_op_t *rko,
67 rd_kafka_q_cb_type_t cb_type,
68 void *opaque) {
69 rd_kafka_op_res_t res;
70
71 /*
72 * Dispatch Event:able ops to background_event_cb()
73 */
74 if (likely(rd_kafka_event_setup(rk, rko))) {
75 rd_kafka_call_background_event_cb(rk, rko);
76 /* Event must be destroyed by application. */
77 return RD_KAFKA_OP_RES_HANDLED;
78 }
79
80 /*
81 * Handle non-event:able ops through the standard poll_cb that
82 * will trigger type-specific callbacks (and return OP_RES_HANDLED)
83 * or do no handling and return OP_RES_PASS
84 */
85 res = rd_kafka_poll_cb(rk, rkq, rko, RD_KAFKA_Q_CB_CALLBACK, opaque);
86 if (res == RD_KAFKA_OP_RES_HANDLED)
87 return res;
88
89 /* Op was not handled, log and destroy it. */
90 rd_kafka_log(rk, LOG_NOTICE, "BGQUEUE",
91 "No support for handling "
92 "non-event op %s in background queue: discarding",
93 rd_kafka_op2str(rko->rko_type));
94 rd_kafka_op_destroy(rko);
95
96 /* Signal yield to q_serve() (implies that the op was handled). */
97 if (res == RD_KAFKA_OP_RES_YIELD)
98 return res;
99
100 /* Indicate that the op was handled. */
101 return RD_KAFKA_OP_RES_HANDLED;
102}
103
104
105/**
106 * @brief Main loop for background queue thread.
107 */
108int rd_kafka_background_thread_main (void *arg) {
109 rd_kafka_t *rk = arg;
110
111 rd_kafka_set_thread_name("background");
112 rd_kafka_set_thread_sysname("rdk:bg");
113
114 (void)rd_atomic32_add(&rd_kafka_thread_cnt_curr, 1);
115
116 /* Acquire lock (which was held by thread creator during creation)
117 * to synchronise state. */
118 rd_kafka_wrlock(rk);
119 rd_kafka_wrunlock(rk);
120
121 mtx_lock(&rk->rk_init_lock);
122 rk->rk_init_wait_cnt--;
123 cnd_broadcast(&rk->rk_init_cnd);
124 mtx_unlock(&rk->rk_init_lock);
125
126 while (likely(!rd_kafka_terminating(rk))) {
127 rd_kafka_q_serve(rk->rk_background.q, 10*1000, 0,
128 RD_KAFKA_Q_CB_RETURN,
129 rd_kafka_background_queue_serve, NULL);
130 }
131
132 /* Inform the user that they terminated the client before
133 * all outstanding events were handled. */
134 if (rd_kafka_q_len(rk->rk_background.q) > 0)
135 rd_kafka_log(rk, LOG_INFO, "BGQUEUE",
136 "Purging %d unserved events from background queue",
137 rd_kafka_q_len(rk->rk_background.q));
138 rd_kafka_q_disable(rk->rk_background.q);
139 rd_kafka_q_purge(rk->rk_background.q);
140
141 rd_kafka_dbg(rk, GENERIC, "BGQUEUE",
142 "Background queue thread exiting");
143
144 rd_atomic32_sub(&rd_kafka_thread_cnt_curr, 1);
145
146 return 0;
147}
148
149