1/*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012-2013, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29#include "rdkafka_int.h"
30#include "rd.h"
31#include "rdtime.h"
32#include "rdsysqueue.h"
33
34
35static RD_INLINE void rd_kafka_timers_lock (rd_kafka_timers_t *rkts) {
36 mtx_lock(&rkts->rkts_lock);
37}
38
39static RD_INLINE void rd_kafka_timers_unlock (rd_kafka_timers_t *rkts) {
40 mtx_unlock(&rkts->rkts_lock);
41}
42
43
44static RD_INLINE int rd_kafka_timer_started (const rd_kafka_timer_t *rtmr) {
45 return rtmr->rtmr_interval ? 1 : 0;
46}
47
48
49static RD_INLINE int rd_kafka_timer_scheduled (const rd_kafka_timer_t *rtmr) {
50 return rtmr->rtmr_next ? 1 : 0;
51}
52
53
54static int rd_kafka_timer_cmp (const void *_a, const void *_b) {
55 const rd_kafka_timer_t *a = _a, *b = _b;
56 return (int)(a->rtmr_next - b->rtmr_next);
57}
58
59static void rd_kafka_timer_unschedule (rd_kafka_timers_t *rkts,
60 rd_kafka_timer_t *rtmr) {
61 TAILQ_REMOVE(&rkts->rkts_timers, rtmr, rtmr_link);
62 rtmr->rtmr_next = 0;
63}
64
65static void rd_kafka_timer_schedule (rd_kafka_timers_t *rkts,
66 rd_kafka_timer_t *rtmr, int extra_us) {
67 rd_kafka_timer_t *first;
68
69 /* Timer has been stopped */
70 if (!rtmr->rtmr_interval)
71 return;
72
73 /* Timers framework is terminating */
74 if (unlikely(!rkts->rkts_enabled))
75 return;
76
77 rtmr->rtmr_next = rd_clock() + rtmr->rtmr_interval + extra_us;
78
79 if (!(first = TAILQ_FIRST(&rkts->rkts_timers)) ||
80 first->rtmr_next > rtmr->rtmr_next) {
81 TAILQ_INSERT_HEAD(&rkts->rkts_timers, rtmr, rtmr_link);
82 cnd_signal(&rkts->rkts_cond);
83 } else
84 TAILQ_INSERT_SORTED(&rkts->rkts_timers, rtmr,
85 rd_kafka_timer_t *, rtmr_link,
86 rd_kafka_timer_cmp);
87}
88
89/**
90 * @brief Stop a timer that may be started.
91 * If called from inside a timer callback 'lock' must be 0, else 1.
92 *
93 * @returns 1 if the timer was started (before being stopped), else 0.
94 */
95int rd_kafka_timer_stop (rd_kafka_timers_t *rkts, rd_kafka_timer_t *rtmr,
96 int lock) {
97 if (lock)
98 rd_kafka_timers_lock(rkts);
99
100 if (!rd_kafka_timer_started(rtmr)) {
101 if (lock)
102 rd_kafka_timers_unlock(rkts);
103 return 0;
104 }
105
106 if (rd_kafka_timer_scheduled(rtmr))
107 rd_kafka_timer_unschedule(rkts, rtmr);
108
109 rtmr->rtmr_interval = 0;
110
111 if (lock)
112 rd_kafka_timers_unlock(rkts);
113
114 return 1;
115}
116
117
118/**
119 * Start the provided timer with the given interval.
120 * Upon expiration of the interval (us) the callback will be called in the
121 * main rdkafka thread, after callback return the timer will be restarted.
122 *
123 * Use rd_kafka_timer_stop() to stop a timer.
124 */
125void rd_kafka_timer_start0 (rd_kafka_timers_t *rkts,
126 rd_kafka_timer_t *rtmr, rd_ts_t interval,
127 rd_bool_t oneshot,
128 void (*callback) (rd_kafka_timers_t *rkts,
129 void *arg),
130 void *arg) {
131 rd_kafka_timers_lock(rkts);
132
133 rd_kafka_timer_stop(rkts, rtmr, 0/*!lock*/);
134
135 rtmr->rtmr_interval = interval;
136 rtmr->rtmr_callback = callback;
137 rtmr->rtmr_arg = arg;
138 rtmr->rtmr_oneshot = oneshot;
139
140 rd_kafka_timer_schedule(rkts, rtmr, 0);
141
142 rd_kafka_timers_unlock(rkts);
143}
144
145/**
146 * Delay the next timer invocation by 'backoff_us'
147 */
148void rd_kafka_timer_backoff (rd_kafka_timers_t *rkts,
149 rd_kafka_timer_t *rtmr, int backoff_us) {
150 rd_kafka_timers_lock(rkts);
151 if (rd_kafka_timer_scheduled(rtmr))
152 rd_kafka_timer_unschedule(rkts, rtmr);
153 rd_kafka_timer_schedule(rkts, rtmr, backoff_us);
154 rd_kafka_timers_unlock(rkts);
155}
156
157
158/**
159 * @returns the delta time to the next time (>=0) this timer fires, or -1
160 * if timer is stopped.
161 */
162rd_ts_t rd_kafka_timer_next (rd_kafka_timers_t *rkts, rd_kafka_timer_t *rtmr,
163 int do_lock) {
164 rd_ts_t now = rd_clock();
165 rd_ts_t delta = -1;
166
167 if (do_lock)
168 rd_kafka_timers_lock(rkts);
169
170 if (rd_kafka_timer_scheduled(rtmr)) {
171 delta = rtmr->rtmr_next - now;
172 if (delta < 0)
173 delta = 0;
174 }
175
176 if (do_lock)
177 rd_kafka_timers_unlock(rkts);
178
179 return delta;
180}
181
182
183/**
184 * Interrupt rd_kafka_timers_run().
185 * Used for termination.
186 */
187void rd_kafka_timers_interrupt (rd_kafka_timers_t *rkts) {
188 rd_kafka_timers_lock(rkts);
189 cnd_signal(&rkts->rkts_cond);
190 rd_kafka_timers_unlock(rkts);
191}
192
193
194/**
195 * Returns the delta time to the next timer to fire, capped by 'timeout_ms'.
196 */
197rd_ts_t rd_kafka_timers_next (rd_kafka_timers_t *rkts, int timeout_us,
198 int do_lock) {
199 rd_ts_t now = rd_clock();
200 rd_ts_t sleeptime = 0;
201 rd_kafka_timer_t *rtmr;
202
203 if (do_lock)
204 rd_kafka_timers_lock(rkts);
205
206 if (likely((rtmr = TAILQ_FIRST(&rkts->rkts_timers)) != NULL)) {
207 sleeptime = rtmr->rtmr_next - now;
208 if (sleeptime < 0)
209 sleeptime = 0;
210 else if (sleeptime > (rd_ts_t)timeout_us)
211 sleeptime = (rd_ts_t)timeout_us;
212 } else
213 sleeptime = (rd_ts_t)timeout_us;
214
215 if (do_lock)
216 rd_kafka_timers_unlock(rkts);
217
218 return sleeptime;
219}
220
221
222/**
223 * Dispatch timers.
224 * Will block up to 'timeout' microseconds before returning.
225 */
226void rd_kafka_timers_run (rd_kafka_timers_t *rkts, int timeout_us) {
227 rd_ts_t now = rd_clock();
228 rd_ts_t end = now + timeout_us;
229
230 rd_kafka_timers_lock(rkts);
231
232 while (!rd_kafka_terminating(rkts->rkts_rk) && now <= end) {
233 int64_t sleeptime;
234 rd_kafka_timer_t *rtmr;
235
236 if (timeout_us != RD_POLL_NOWAIT) {
237 sleeptime = rd_kafka_timers_next(rkts,
238 timeout_us,
239 0/*no-lock*/);
240
241 if (sleeptime > 0) {
242 cnd_timedwait_ms(&rkts->rkts_cond,
243 &rkts->rkts_lock,
244 (int)(sleeptime / 1000));
245
246 }
247 }
248
249 now = rd_clock();
250
251 while ((rtmr = TAILQ_FIRST(&rkts->rkts_timers)) &&
252 rtmr->rtmr_next <= now) {
253
254 rd_kafka_timer_unschedule(rkts, rtmr);
255
256 /* If timer must only be fired once,
257 * disable it now prior to callback. */
258 if (rtmr->rtmr_oneshot)
259 rtmr->rtmr_interval = 0;
260
261 rd_kafka_timers_unlock(rkts);
262
263 rtmr->rtmr_callback(rkts, rtmr->rtmr_arg);
264
265 rd_kafka_timers_lock(rkts);
266
267 /* Restart timer, unless it has been stopped, or
268 * already reschedueld (start()ed) from callback. */
269 if (rd_kafka_timer_started(rtmr) &&
270 !rd_kafka_timer_scheduled(rtmr))
271 rd_kafka_timer_schedule(rkts, rtmr, 0);
272 }
273
274 if (timeout_us == RD_POLL_NOWAIT) {
275 /* Only iterate once, even if rd_clock doesn't change */
276 break;
277 }
278 }
279
280 rd_kafka_timers_unlock(rkts);
281}
282
283
284void rd_kafka_timers_destroy (rd_kafka_timers_t *rkts) {
285 rd_kafka_timer_t *rtmr;
286
287 rd_kafka_timers_lock(rkts);
288 rkts->rkts_enabled = 0;
289 while ((rtmr = TAILQ_FIRST(&rkts->rkts_timers)))
290 rd_kafka_timer_stop(rkts, rtmr, 0);
291 rd_kafka_assert(rkts->rkts_rk, TAILQ_EMPTY(&rkts->rkts_timers));
292 rd_kafka_timers_unlock(rkts);
293
294 cnd_destroy(&rkts->rkts_cond);
295 mtx_destroy(&rkts->rkts_lock);
296}
297
298void rd_kafka_timers_init (rd_kafka_timers_t *rkts, rd_kafka_t *rk) {
299 memset(rkts, 0, sizeof(*rkts));
300 rkts->rkts_rk = rk;
301 TAILQ_INIT(&rkts->rkts_timers);
302 mtx_init(&rkts->rkts_lock, mtx_plain);
303 cnd_init(&rkts->rkts_cond);
304 rkts->rkts_enabled = 1;
305}
306