1/*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2018 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29#include "rd.h"
30#include "rdkafka_int.h"
31#include "rdkafka_request.h"
32
33#include <stdarg.h>
34
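/**
 * @name Idempotent Producer logic
 *
 * Tracks the producer's idempotence state (rk->rk_eos.idemp_state):
 * acquiring a Producer ID (REQ_PID -> WAIT_PID -> ASSIGNED), draining
 * partitions with in-flight ProduceRequests before re-requesting the PID
 * (DRAIN_RESET) or bumping its epoch (DRAIN_BUMP), and termination (TERM).
 */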
41
42static void rd_kafka_idemp_restart_request_pid_tmr (rd_kafka_t *rk,
43 rd_bool_t immediate);
44
45
46/**
47 * @brief Set the producer's idempotence state.
48 * @locks rd_kafka_wrlock() MUST be held
49 */
50void rd_kafka_idemp_set_state (rd_kafka_t *rk,
51 rd_kafka_idemp_state_t new_state) {
52
53 if (rk->rk_eos.idemp_state == new_state)
54 return;
55
56 rd_kafka_dbg(rk, EOS, "IDEMPSTATE",
57 "Idempotent producer state change %s -> %s",
58 rd_kafka_idemp_state2str(rk->rk_eos.
59 idemp_state),
60 rd_kafka_idemp_state2str(new_state));
61
62 rk->rk_eos.idemp_state = new_state;
63 rk->rk_eos.ts_idemp_state = rd_clock();
64}
65
66
67
68
69
70
71
72/**
73 * @brief Acquire Pid by looking up a suitable broker and then
74 * sending an InitProducerIdRequest to it.
75 *
76 * @param rkb may be set to specify a broker to use, otherwise a suitable
77 * one is looked up.
78 *
79 * @returns 1 if a request was enqueued, or 0 if no broker was available,
80 * incorrect state, or other error.
81 *
82 * @locality rdkafka main thread
83 * @locks none
84 */
85int rd_kafka_idemp_request_pid (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
86 const char *reason) {
87
88 rd_kafka_resp_err_t err;
89 char errstr[128];
90
91 rd_assert(thrd_is_current(rk->rk_thread));
92
93 if (unlikely(rd_kafka_fatal_error_code(rk))) {
94 /* If a fatal error has been raised we do not
95 * attempt to acquire a new PID. */
96 return 0;
97 }
98
99 rd_kafka_wrlock(rk);
100 if (rk->rk_eos.idemp_state != RD_KAFKA_IDEMP_STATE_REQ_PID) {
101 rd_kafka_wrunlock(rk);
102 return 0;
103 }
104
105 if (!rkb) {
106 rkb = rd_kafka_broker_any(rk, RD_KAFKA_BROKER_STATE_UP,
107 rd_kafka_broker_filter_non_idempotent,
108 NULL, "acquire ProducerID");
109 if (!rkb) {
110 int up_cnt = rd_atomic32_get(&rk->rk_broker_up_cnt);
111 int all_cnt = rd_atomic32_get(&rk->rk_broker_cnt);
112 int err_unsupported =
113 up_cnt > 0 &&
114 rd_interval(&rk->rk_suppress.no_idemp_brokers,
115 5*60*1000000/*5 minutes*/, 0) > 0;
116
117 rd_kafka_wrunlock(rk);
118 rd_kafka_idemp_restart_request_pid_tmr(rk, rd_false);
119
120 if (err_unsupported)
121 rd_kafka_op_err(
122 rk,
123 RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
124 "Idempotent Producer not supported by "
125 "any of the %d broker(s) in state UP: "
126 "requires broker version >= 0.11.0",
127 up_cnt);
128 else if (up_cnt == 0)
129 rd_kafka_dbg(rk, EOS, "PIDBROKER",
130 "No brokers available for "
131 "acquiring Producer ID: "
132 "no brokers are up");
133 else
134 rd_kafka_dbg(rk, EOS, "PIDBROKER",
135 "None of the %d/%d brokers in "
136 "state UP supports "
137 "the Idempotent Producer: "
138 "requires broker "
139 "version >= 0.11.0",
140 up_cnt, all_cnt);
141 return 0;
142 }
143 } else {
144 /* Increase passed broker's refcount so we don't
145 * have to check if rkb should be destroyed or not below
146 * (broker_any() returns a new reference). */
147 rd_kafka_broker_keep(rkb);
148 }
149
150 rd_rkb_dbg(rkb, EOS, "GETPID", "Acquiring ProducerId: %s", reason);
151
152 err = rd_kafka_InitProducerIdRequest(
153 rkb, NULL, -1,
154 errstr, sizeof(errstr),
155 RD_KAFKA_REPLYQ(rk->rk_ops, 0),
156 rd_kafka_handle_InitProducerId, NULL);
157
158 if (!err) {
159 rd_kafka_idemp_set_state(rkb->rkb_rk,
160 RD_KAFKA_IDEMP_STATE_WAIT_PID);
161 rd_kafka_wrunlock(rkb->rkb_rk);
162 rd_kafka_broker_destroy(rkb);
163 return 1;
164 }
165
166 rd_kafka_wrunlock(rkb->rkb_rk);
167
168 rd_rkb_dbg(rkb, EOS, "GETPID",
169 "Can't acquire ProducerId from this broker: %s", errstr);
170 rd_kafka_idemp_restart_request_pid_tmr(rk, rd_false);
171
172 rd_kafka_broker_destroy(rkb);
173
174 return 0;
175}
176
177
178/**
179 * @brief Timed PID retrieval timer callback.
180 */
181static void rd_kafka_idemp_request_pid_tmr_cb (rd_kafka_timers_t *rkts,
182 void *arg) {
183 rd_kafka_t *rk = arg;
184
185 rd_kafka_idemp_request_pid(rk, NULL, "retry timer");
186}
187
188
189/**
190 * @brief Restart the pid retrieval timer.
191 *
192 * @param immediate If true, request a pid as soon as possible,
193 * else use the default interval (500ms).
194 * @locality any
195 * @locks none
196 */
197static void rd_kafka_idemp_restart_request_pid_tmr (rd_kafka_t *rk,
198 rd_bool_t immediate) {
199 rd_kafka_timer_start_oneshot(&rk->rk_timers,
200 &rk->rk_eos.request_pid_tmr,
201 1000 * (immediate ? 1 : 500/*500ms*/),
202 rd_kafka_idemp_request_pid_tmr_cb, rk);
203}
204
205
206/**
207 * @brief Handle failure to acquire a PID from broker.
208 *
209 * @locality rdkafka main thread
210 * @locks none
211 */
212void rd_kafka_idemp_request_pid_failed (rd_kafka_broker_t *rkb,
213 rd_kafka_resp_err_t err) {
214 rd_kafka_t *rk = rkb->rkb_rk;
215
216 rd_rkb_dbg(rkb, EOS, "GETPID",
217 "Failed to acquire PID: %s", rd_kafka_err2str(err));
218
219 if (err == RD_KAFKA_RESP_ERR__DESTROY)
220 return; /* Ignore */
221
222 rd_assert(thrd_is_current(rk->rk_thread));
223
224 /* FIXME: Handle special errors, maybe raise certain errors
225 * to the application (such as UNSUPPORTED_FEATURE) */
226
227 /* Retry request after a short wait. */
228 rd_kafka_idemp_restart_request_pid_tmr(rk, rd_false);
229}
230
231
232/**
233 * @brief Update Producer ID from InitProducerId response.
234 *
235 * @remark If we've already have a PID the new one is ignored.
236 *
237 * @locality rdkafka main thread
238 * @locks none
239 */
240void rd_kafka_idemp_pid_update (rd_kafka_broker_t *rkb,
241 const rd_kafka_pid_t pid) {
242 rd_kafka_t *rk = rkb->rkb_rk;
243
244 rd_kafka_wrlock(rk);
245 if (rk->rk_eos.idemp_state != RD_KAFKA_IDEMP_STATE_WAIT_PID) {
246 rd_rkb_dbg(rkb, EOS, "GETPID",
247 "Ignoring InitProduceId response (%s) "
248 "in state %s",
249 rd_kafka_pid2str(pid),
250 rd_kafka_idemp_state2str(rk->rk_eos.idemp_state));
251 rd_kafka_wrunlock(rk);
252 return;
253 }
254
255 if (!rd_kafka_pid_valid(pid)) {
256 rd_kafka_wrunlock(rk);
257 rd_rkb_log(rkb, LOG_WARNING, "GETPID",
258 "Acquired invalid PID{%"PRId64",%hd}: ignoring",
259 pid.id, pid.epoch);
260 rd_kafka_idemp_request_pid_failed(rkb,
261 RD_KAFKA_RESP_ERR__BAD_MSG);
262 return;
263 }
264
265 if (rd_kafka_pid_valid(rk->rk_eos.pid))
266 rd_kafka_dbg(rk, EOS, "GETPID",
267 "Acquired %s (previous %s)",
268 rd_kafka_pid2str(pid),
269 rd_kafka_pid2str(rk->rk_eos.pid));
270 else
271 rd_kafka_dbg(rk, EOS, "GETPID",
272 "Acquired %s", rd_kafka_pid2str(pid));
273 rk->rk_eos.pid = pid;
274 rk->rk_eos.epoch_cnt++;
275
276 rd_kafka_idemp_set_state(rk, RD_KAFKA_IDEMP_STATE_ASSIGNED);
277
278 rd_kafka_wrunlock(rk);
279
280 /* Wake up all broker threads (that may have messages to send
281 * that were waiting for a Producer ID). */
282 rd_kafka_all_brokers_wakeup(rk, RD_KAFKA_BROKER_STATE_INIT);
283}
284
285
286/**
287 * @brief Call when all partition request queues
288 * are drained to reset and re-request a new PID.
289 *
290 * @locality any
291 * @locks none
292 */
293static void rd_kafka_idemp_drain_done (rd_kafka_t *rk) {
294 rd_bool_t restart_tmr = rd_false;
295 rd_bool_t wakeup_brokers = rd_false;
296
297 rd_kafka_wrlock(rk);
298 if (rk->rk_eos.idemp_state == RD_KAFKA_IDEMP_STATE_DRAIN_RESET) {
299 rd_kafka_dbg(rk, EOS, "DRAIN", "All partitions drained");
300 rd_kafka_idemp_set_state(rk, RD_KAFKA_IDEMP_STATE_REQ_PID);
301 restart_tmr = rd_true;
302
303 } else if (rk->rk_eos.idemp_state == RD_KAFKA_IDEMP_STATE_DRAIN_BUMP &&
304 rd_kafka_pid_valid(rk->rk_eos.pid)) {
305 rk->rk_eos.pid = rd_kafka_pid_bump(rk->rk_eos.pid);
306 rd_kafka_dbg(rk, EOS, "DRAIN",
307 "All partitions drained, bumped epoch to %s",
308 rd_kafka_pid2str(rk->rk_eos.pid));
309 rd_kafka_idemp_set_state(rk, RD_KAFKA_IDEMP_STATE_ASSIGNED);
310 wakeup_brokers = rd_true;
311 }
312 rd_kafka_wrunlock(rk);
313
314 /* Restart timer to eventually trigger a re-request */
315 if (restart_tmr)
316 rd_kafka_idemp_restart_request_pid_tmr(rk, rd_true);
317
318 /* Wake up all broker threads (that may have messages to send
319 * that were waiting for a Producer ID). */
320 if (wakeup_brokers)
321 rd_kafka_all_brokers_wakeup(rk, RD_KAFKA_BROKER_STATE_INIT);
322
323}
324
325/**
326 * @brief Check if in-flight toppars drain is done, if so transition to
327 * next state.
328 *
329 * @locality any
330 * @locks none
331 */
332static RD_INLINE void rd_kafka_idemp_check_drain_done (rd_kafka_t *rk) {
333 if (rd_atomic32_get(&rk->rk_eos.inflight_toppar_cnt) == 0)
334 rd_kafka_idemp_drain_done(rk);
335}
336
337
338/**
339 * @brief Schedule a reset and re-request of PID when the
340 * local ProduceRequest queues have been fully drained.
341 *
342 * The PID is not reset until the queues are fully drained.
343 *
344 * @locality any
345 * @locks none
346 */
347void rd_kafka_idemp_drain_reset (rd_kafka_t *rk) {
348 rd_kafka_wrlock(rk);
349 rd_kafka_dbg(rk, EOS, "DRAIN",
350 "Beginning partition drain for %s reset "
351 "for %d partition(s) with in-flight requests",
352 rd_kafka_pid2str(rk->rk_eos.pid),
353 rd_atomic32_get(&rk->rk_eos.inflight_toppar_cnt));
354 rd_kafka_idemp_set_state(rk, RD_KAFKA_IDEMP_STATE_DRAIN_RESET);
355 rd_kafka_wrunlock(rk);
356
357 /* Check right away if the drain could be done. */
358 rd_kafka_idemp_check_drain_done(rk);
359}
360
361
362/**
363 * @brief Schedule an epoch bump when the local ProduceRequest queues
364 * have been fully drained.
365 *
366 * The PID is not bumped until the queues are fully drained.
367 *
368 * @param fmt is a human-readable reason for the bump
369 *
370 *
371 * @locality any
372 * @locks none
373 */
374void rd_kafka_idemp_drain_epoch_bump (rd_kafka_t *rk, const char *fmt, ...) {
375 va_list ap;
376 char buf[256];
377
378 va_start(ap, fmt);
379 rd_vsnprintf(buf, sizeof(buf), fmt, ap);
380 va_end(ap);
381
382 rd_kafka_wrlock(rk);
383 rd_kafka_dbg(rk, EOS, "DRAIN",
384 "Beginning partition drain for %s epoch bump "
385 "for %d partition(s) with in-flight requests: %s",
386 rd_kafka_pid2str(rk->rk_eos.pid),
387 rd_atomic32_get(&rk->rk_eos.inflight_toppar_cnt), buf);
388 rd_kafka_idemp_set_state(rk, RD_KAFKA_IDEMP_STATE_DRAIN_BUMP);
389 rd_kafka_wrunlock(rk);
390
391 /* Check right away if the drain could be done. */
392 rd_kafka_idemp_check_drain_done(rk);
393}
394
395/**
396 * @brief Mark partition as waiting-to-drain.
397 *
398 * @locks toppar_lock MUST be held
399 * @locality broker thread (leader or not)
400 */
401void rd_kafka_idemp_drain_toppar (rd_kafka_toppar_t *rktp,
402 const char *reason) {
403 if (rktp->rktp_eos.wait_drain)
404 return;
405
406 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, EOS|RD_KAFKA_DBG_TOPIC, "DRAIN",
407 "%.*s [%"PRId32"] beginning partition drain: %s",
408 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
409 rktp->rktp_partition, reason);
410 rktp->rktp_eos.wait_drain = rd_true;
411}
412
413
414/**
415 * @brief Mark partition as no longer having a ProduceRequest in-flight.
416 *
417 * @locality any
418 * @locks none
419 */
420void rd_kafka_idemp_inflight_toppar_sub (rd_kafka_t *rk,
421 rd_kafka_toppar_t *rktp) {
422 int r = rd_atomic32_sub(&rk->rk_eos.inflight_toppar_cnt, 1);
423
424 if (r == 0) {
425 /* Check if we're waiting for the partitions to drain
426 * before resetting the PID, and if so trigger a reset
427 * since this was the last drained one. */
428 rd_kafka_idemp_drain_done(rk);
429 } else {
430 rd_assert(r >= 0);
431 }
432}
433
434
435/**
436 * @brief Mark partition as having a ProduceRequest in-flight.
437 *
438 * @locality toppar handler thread
439 * @locks none
440 */
441void rd_kafka_idemp_inflight_toppar_add (rd_kafka_t *rk,
442 rd_kafka_toppar_t *rktp) {
443 rd_atomic32_add(&rk->rk_eos.inflight_toppar_cnt, 1);
444}
445
446
447/**
448 * @brief Initialize the idempotent producer.
449 *
450 * @remark Must be called from rd_kafka_new() and only once.
451 * @locality application thread
452 * @locks none / not needed from rd_kafka_new()
453 */
454void rd_kafka_idemp_init (rd_kafka_t *rk) {
455 rd_assert(thrd_is_current(rk->rk_thread));
456
457 rd_atomic32_init(&rk->rk_eos.inflight_toppar_cnt, 0);
458
459 rd_kafka_wrlock(rk);
460 rd_kafka_pid_reset(&rk->rk_eos.pid);
461
462 /* There are no available brokers this early, so just set
463 * the state to indicate that we want to acquire a PID as soon
464 * as possible and start the timer. */
465 rd_kafka_idemp_set_state(rk, RD_KAFKA_IDEMP_STATE_REQ_PID);
466 rd_kafka_wrunlock(rk);
467
468 rd_kafka_idemp_restart_request_pid_tmr(rk, rd_false);
469}
470
471
472/**
473 * @brief Terminate and clean up idempotent producer
474 *
475 * @locality rdkafka main thread
476 * @locks rd_kafka_wrlock() MUST be held
477 */
478void rd_kafka_idemp_term (rd_kafka_t *rk) {
479 rd_assert(thrd_is_current(rk->rk_thread));
480
481 rd_kafka_wrlock(rk);
482 rd_kafka_idemp_set_state(rk, RD_KAFKA_IDEMP_STATE_TERM);
483 rd_kafka_wrunlock(rk);
484 rd_kafka_timer_stop(&rk->rk_timers, &rk->rk_eos.request_pid_tmr, 1);
485}
486
487
488