1/*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2018 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29
30#ifndef _RD_KAFKA_IDEMPOTENCE_H_
31#define _RD_KAFKA_IDEMPOTENCE_H_
32
33
/**
 * @brief The broker maintains a window of the 5 last Produce requests
 *        for a partition to be able to de-duplicate resends.
 *
 * RD_KAFKA_IDEMP_MAX_INFLIGHT_STR is the same value in string form and
 * must be kept in sync with RD_KAFKA_IDEMP_MAX_INFLIGHT.
 */
#define RD_KAFKA_IDEMP_MAX_INFLIGHT 5
#define RD_KAFKA_IDEMP_MAX_INFLIGHT_STR "5" /* For printouts */
40
41/**
42 * @brief Get the current PID if state permits.
43 *
44 * @returns If there is no valid PID or the state
45 * does not permit further PID usage (such as when draining)
46 * then an invalid PID is returned.
47 *
48 * @locality any
49 * @locks none
50 */
51
52static RD_UNUSED RD_INLINE rd_kafka_pid_t
53rd_kafka_idemp_get_pid (rd_kafka_t *rk) {
54 rd_kafka_pid_t pid;
55
56 rd_kafka_rdlock(rk);
57 if (likely(rk->rk_eos.idemp_state == RD_KAFKA_IDEMP_STATE_ASSIGNED))
58 pid = rk->rk_eos.pid;
59 else
60 rd_kafka_pid_reset(&pid);
61 rd_kafka_rdunlock(rk);
62
63 return pid;
64}
65
/*
 * Idempotent producer functions. Only the prototypes are declared in this
 * header; the definitions are not visible here.
 * NOTE(review): the per-function remarks below are derived from names and
 * signatures only; confirm against the definitions.
 */

/* State handling: takes the client instance and the new state value. */
void rd_kafka_idemp_set_state (rd_kafka_t *rk,
                               rd_kafka_idemp_state_t new_state);

/* PID request handling: the failure variant receives the broker and an
 * error code, the update variant receives the broker and a PID by value.
 * rd_kafka_idemp_request_pid() returns an int; its meaning is not visible
 * from this header (TODO confirm). */
void rd_kafka_idemp_request_pid_failed (rd_kafka_broker_t *rkb,
                                        rd_kafka_resp_err_t err);
void rd_kafka_idemp_pid_update (rd_kafka_broker_t *rkb,
                                const rd_kafka_pid_t pid);
int rd_kafka_idemp_request_pid (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
                                const char *reason);

/* Drain handling. rd_kafka_idemp_drain_epoch_bump() is variadic;
 * presumably \p fmt is a printf-style format string (TODO confirm). */
void rd_kafka_idemp_drain_reset (rd_kafka_t *rk);
void rd_kafka_idemp_drain_epoch_bump (rd_kafka_t *rk, const char *fmt, ...);
void rd_kafka_idemp_drain_toppar (rd_kafka_toppar_t *rktp, const char *reason);
void rd_kafka_idemp_check_drain_done (rd_kafka_t *rk);

/* Per-partition in-flight accounting: a matching sub/add pair, each taking
 * the client instance and the partition. */
void rd_kafka_idemp_inflight_toppar_sub (rd_kafka_t *rk,
                                         rd_kafka_toppar_t *rktp);
void rd_kafka_idemp_inflight_toppar_add (rd_kafka_t *rk,
                                         rd_kafka_toppar_t *rktp);

/* Init/term pair, each taking the client instance. */
void rd_kafka_idemp_init (rd_kafka_t *rk);
void rd_kafka_idemp_term (rd_kafka_t *rk);
85
86
87#endif /* _RD_KAFKA_IDEMPOTENCE_H_ */
88