1/*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2016 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29
30/**
31 * @brief Converts op type to event type.
32 * @returns the event type, or 0 if the op cannot be mapped to an event.
33 */
34static RD_UNUSED RD_INLINE
35rd_kafka_event_type_t rd_kafka_op2event (rd_kafka_op_type_t optype) {
36 static const rd_kafka_event_type_t map[RD_KAFKA_OP__END] = {
37 [RD_KAFKA_OP_DR] = RD_KAFKA_EVENT_DR,
38 [RD_KAFKA_OP_FETCH] = RD_KAFKA_EVENT_FETCH,
39 [RD_KAFKA_OP_ERR] = RD_KAFKA_EVENT_ERROR,
40 [RD_KAFKA_OP_CONSUMER_ERR] = RD_KAFKA_EVENT_ERROR,
41 [RD_KAFKA_OP_REBALANCE] = RD_KAFKA_EVENT_REBALANCE,
42 [RD_KAFKA_OP_OFFSET_COMMIT] = RD_KAFKA_EVENT_OFFSET_COMMIT,
43 [RD_KAFKA_OP_LOG] = RD_KAFKA_EVENT_LOG,
44 [RD_KAFKA_OP_STATS] = RD_KAFKA_EVENT_STATS,
45 [RD_KAFKA_OP_OAUTHBEARER_REFRESH] = RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH
46 };
47
48 return map[(int)optype & ~RD_KAFKA_OP_FLAGMASK];
49}
50
51
52/**
53 * @brief Attempt to set up an event based on rko.
54 * @returns 1 if op is event:able and set up, else 0.
55 */
56static RD_UNUSED RD_INLINE
57int rd_kafka_event_setup (rd_kafka_t *rk, rd_kafka_op_t *rko) {
58 if (!rko->rko_evtype)
59 rko->rko_evtype = rd_kafka_op2event(rko->rko_type);
60 switch (rko->rko_evtype)
61 {
62 case RD_KAFKA_EVENT_NONE:
63 return 0;
64
65 case RD_KAFKA_EVENT_DR:
66 rko->rko_rk = rk;
67 rd_dassert(!rko->rko_u.dr.do_purge2);
68 rd_kafka_msgq_init(&rko->rko_u.dr.msgq2);
69 rko->rko_u.dr.do_purge2 = 1;
70 return 1;
71
72 case RD_KAFKA_EVENT_ERROR:
73 if (rko->rko_err == RD_KAFKA_RESP_ERR__FATAL) {
74 /* Translate ERR__FATAL to the underlying fatal error
75 * code and string */
76 rd_kafka_resp_err_t ferr;
77 char errstr[512];
78 ferr = rd_kafka_fatal_error(rk, errstr, sizeof(errstr));
79 if (likely(ferr)) {
80 rko->rko_err = ferr;
81 if (rko->rko_u.err.errstr)
82 rd_free(rko->rko_u.err.errstr);
83 rko->rko_u.err.errstr = rd_strdup(errstr);
84 rko->rko_u.err.fatal = 1;
85 }
86 }
87 return 1;
88
89 case RD_KAFKA_EVENT_REBALANCE:
90 case RD_KAFKA_EVENT_LOG:
91 case RD_KAFKA_EVENT_OFFSET_COMMIT:
92 case RD_KAFKA_EVENT_STATS:
93 case RD_KAFKA_EVENT_CREATETOPICS_RESULT:
94 case RD_KAFKA_EVENT_DELETETOPICS_RESULT:
95 case RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT:
96 case RD_KAFKA_EVENT_ALTERCONFIGS_RESULT:
97 case RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT:
98 case RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH:
99 return 1;
100
101 default:
102 return 0;
103 }
104}
105