1/*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2016 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29#include "rdkafka_int.h"
30#include "rdkafka_event.h"
31#include "rd.h"
32
33rd_kafka_event_type_t rd_kafka_event_type (const rd_kafka_event_t *rkev) {
34 return rkev ? rkev->rko_evtype : RD_KAFKA_EVENT_NONE;
35}
36
37const char *rd_kafka_event_name (const rd_kafka_event_t *rkev) {
38 switch (rkev ? rkev->rko_evtype : RD_KAFKA_EVENT_NONE)
39 {
40 case RD_KAFKA_EVENT_NONE:
41 return "(NONE)";
42 case RD_KAFKA_EVENT_DR:
43 return "DeliveryReport";
44 case RD_KAFKA_EVENT_FETCH:
45 return "Fetch";
46 case RD_KAFKA_EVENT_LOG:
47 return "Log";
48 case RD_KAFKA_EVENT_ERROR:
49 return "Error";
50 case RD_KAFKA_EVENT_REBALANCE:
51 return "Rebalance";
52 case RD_KAFKA_EVENT_OFFSET_COMMIT:
53 return "OffsetCommit";
54 case RD_KAFKA_EVENT_STATS:
55 return "Stats";
56 case RD_KAFKA_EVENT_CREATETOPICS_RESULT:
57 return "CreateTopicsResult";
58 case RD_KAFKA_EVENT_DELETETOPICS_RESULT:
59 return "DeleteTopicsResult";
60 case RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT:
61 return "CreatePartitionsResult";
62 case RD_KAFKA_EVENT_ALTERCONFIGS_RESULT:
63 return "AlterConfigsResult";
64 case RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT:
65 return "DescribeConfigsResult";
66 case RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH:
67 return "SaslOAuthBearerTokenRefresh";
68 default:
69 return "?unknown?";
70 }
71}
72
73
74
75
76void rd_kafka_event_destroy (rd_kafka_event_t *rkev) {
77 if (unlikely(!rkev))
78 return;
79 rd_kafka_op_destroy(rkev);
80}
81
82
83/**
84 * @returns the next message from the event's message queue.
85 * @remark messages will be freed automatically when event is destroyed,
86 * application MUST NOT call rd_kafka_message_destroy()
87 */
88const rd_kafka_message_t *
89rd_kafka_event_message_next (rd_kafka_event_t *rkev) {
90 rd_kafka_op_t *rko = rkev;
91 rd_kafka_msg_t *rkm;
92 rd_kafka_msgq_t *rkmq, *rkmq2;
93 rd_kafka_message_t *rkmessage;
94
95 switch (rkev->rko_type)
96 {
97 case RD_KAFKA_OP_DR:
98 rkmq = &rko->rko_u.dr.msgq;
99 rkmq2 = &rko->rko_u.dr.msgq2;
100 break;
101
102 case RD_KAFKA_OP_FETCH:
103 /* Just one message */
104 if (rko->rko_u.fetch.evidx++ > 0)
105 return NULL;
106
107 rkmessage = rd_kafka_message_get(rko);
108 if (unlikely(!rkmessage))
109 return NULL;
110
111 /* Store offset */
112 rd_kafka_op_offset_store(NULL, rko, rkmessage);
113
114 return rkmessage;
115
116
117 default:
118 return NULL;
119 }
120
121 if (unlikely(!(rkm = TAILQ_FIRST(&rkmq->rkmq_msgs))))
122 return NULL;
123
124 rd_kafka_msgq_deq(rkmq, rkm, 1);
125
126 /* Put rkm on secondary message queue which will be purged later. */
127 rd_kafka_msgq_enq(rkmq2, rkm);
128
129 return rd_kafka_message_get_from_rkm(rko, rkm);
130}
131
132
133size_t rd_kafka_event_message_array (rd_kafka_event_t *rkev,
134 const rd_kafka_message_t **rkmessages, size_t size) {
135 size_t cnt = 0;
136 const rd_kafka_message_t *rkmessage;
137
138 while ((rkmessage = rd_kafka_event_message_next(rkev)))
139 rkmessages[cnt++] = rkmessage;
140
141 return cnt;
142}
143
144
145size_t rd_kafka_event_message_count (rd_kafka_event_t *rkev) {
146 switch (rkev->rko_evtype)
147 {
148 case RD_KAFKA_EVENT_DR:
149 return (size_t)rkev->rko_u.dr.msgq.rkmq_msg_cnt;
150 case RD_KAFKA_EVENT_FETCH:
151 return 1;
152 default:
153 return 0;
154 }
155}
156
157
158const char *rd_kafka_event_config_string (rd_kafka_event_t *rkev) {
159 switch (rkev->rko_evtype)
160 {
161#if WITH_SASL_OAUTHBEARER
162 case RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH:
163 return rkev->rko_rk->rk_conf.sasl.oauthbearer_config;
164#endif
165 default:
166 return NULL;
167 }
168}
169
170rd_kafka_resp_err_t rd_kafka_event_error (rd_kafka_event_t *rkev) {
171 return rkev->rko_err;
172}
173
174const char *rd_kafka_event_error_string (rd_kafka_event_t *rkev) {
175 switch (rkev->rko_type)
176 {
177 case RD_KAFKA_OP_ERR:
178 case RD_KAFKA_OP_CONSUMER_ERR:
179 if (rkev->rko_u.err.errstr)
180 return rkev->rko_u.err.errstr;
181 break;
182 case RD_KAFKA_OP_ADMIN_RESULT:
183 if (rkev->rko_u.admin_result.errstr)
184 return rkev->rko_u.admin_result.errstr;
185 break;
186 default:
187 break;
188 }
189
190 return rd_kafka_err2str(rkev->rko_err);
191}
192
193int rd_kafka_event_error_is_fatal (rd_kafka_event_t *rkev) {
194 return rkev->rko_u.err.fatal;
195}
196
197
198void *rd_kafka_event_opaque (rd_kafka_event_t *rkev) {
199 switch (rkev->rko_type & ~RD_KAFKA_OP_FLAGMASK)
200 {
201 case RD_KAFKA_OP_OFFSET_COMMIT:
202 return rkev->rko_u.offset_commit.opaque;
203 case RD_KAFKA_OP_ADMIN_RESULT:
204 return rkev->rko_u.admin_result.opaque;
205 default:
206 return NULL;
207 }
208}
209
210
211int rd_kafka_event_log (rd_kafka_event_t *rkev, const char **fac,
212 const char **str, int *level) {
213 if (unlikely(rkev->rko_evtype != RD_KAFKA_EVENT_LOG))
214 return -1;
215
216 if (likely(fac != NULL))
217 *fac = rkev->rko_u.log.fac;
218 if (likely(str != NULL))
219 *str = rkev->rko_u.log.str;
220 if (likely(level != NULL))
221 *level = rkev->rko_u.log.level;
222
223 return 0;
224}
225
226const char *rd_kafka_event_stats (rd_kafka_event_t *rkev) {
227 return rkev->rko_u.stats.json;
228}
229
230rd_kafka_topic_partition_list_t *
231rd_kafka_event_topic_partition_list (rd_kafka_event_t *rkev) {
232 switch (rkev->rko_evtype)
233 {
234 case RD_KAFKA_EVENT_REBALANCE:
235 return rkev->rko_u.rebalance.partitions;
236 case RD_KAFKA_EVENT_OFFSET_COMMIT:
237 return rkev->rko_u.offset_commit.partitions;
238 default:
239 return NULL;
240 }
241}
242
243
244rd_kafka_topic_partition_t *
245rd_kafka_event_topic_partition (rd_kafka_event_t *rkev) {
246 rd_kafka_topic_partition_t *rktpar;
247
248 if (unlikely(!rkev->rko_rktp))
249 return NULL;
250
251 rktpar = rd_kafka_topic_partition_new_from_rktp(
252 rd_kafka_toppar_s2i(rkev->rko_rktp));
253
254 switch (rkev->rko_type)
255 {
256 case RD_KAFKA_OP_ERR:
257 case RD_KAFKA_OP_CONSUMER_ERR:
258 rktpar->offset = rkev->rko_u.err.offset;
259 break;
260 default:
261 break;
262 }
263
264 rktpar->err = rkev->rko_err;
265
266 return rktpar;
267
268}
269
270
271
272const rd_kafka_CreateTopics_result_t *
273rd_kafka_event_CreateTopics_result (rd_kafka_event_t *rkev) {
274 if (!rkev || rkev->rko_evtype != RD_KAFKA_EVENT_CREATETOPICS_RESULT)
275 return NULL;
276 else
277 return (const rd_kafka_CreateTopics_result_t *)rkev;
278}
279
280
281const rd_kafka_DeleteTopics_result_t *
282rd_kafka_event_DeleteTopics_result (rd_kafka_event_t *rkev) {
283 if (!rkev || rkev->rko_evtype != RD_KAFKA_EVENT_DELETETOPICS_RESULT)
284 return NULL;
285 else
286 return (const rd_kafka_DeleteTopics_result_t *)rkev;
287}
288
289
290const rd_kafka_CreatePartitions_result_t *
291rd_kafka_event_CreatePartitions_result (rd_kafka_event_t *rkev) {
292 if (!rkev || rkev->rko_evtype != RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT)
293 return NULL;
294 else
295 return (const rd_kafka_CreatePartitions_result_t *)rkev;
296}
297
298
299const rd_kafka_AlterConfigs_result_t *
300rd_kafka_event_AlterConfigs_result (rd_kafka_event_t *rkev) {
301 if (!rkev || rkev->rko_evtype != RD_KAFKA_EVENT_ALTERCONFIGS_RESULT)
302 return NULL;
303 else
304 return (const rd_kafka_AlterConfigs_result_t *)rkev;
305}
306
307
308const rd_kafka_DescribeConfigs_result_t *
309rd_kafka_event_DescribeConfigs_result (rd_kafka_event_t *rkev) {
310 if (!rkev || rkev->rko_evtype != RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT)
311 return NULL;
312 else
313 return (const rd_kafka_DescribeConfigs_result_t *)rkev;
314}
315