1/*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012-2015, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28#ifndef _RDKAFKA_OP_H_
29#define _RDKAFKA_OP_H_
30
31
32#include "rdkafka_msg.h"
33#include "rdkafka_timer.h"
34#include "rdkafka_admin.h"
35
36
37/* Forward declarations */
38typedef struct rd_kafka_q_s rd_kafka_q_t;
39typedef struct rd_kafka_toppar_s rd_kafka_toppar_t;
40typedef struct rd_kafka_op_s rd_kafka_op_t;
41
42/* One-off reply queue + reply version.
43 * All APIs that take a rd_kafka_replyq_t makes a copy of the
44 * struct as-is and grabs hold of the existing .q refcount.
45 * Think of replyq as a (Q,VERSION) tuple. */
46typedef struct rd_kafka_replyq_s {
47 rd_kafka_q_t *q;
48 int32_t version;
49#if ENABLE_DEVEL
50 char *_id; /* Devel id used for debugging reference leaks.
51 * Is a strdup() of the caller's function name,
52 * which makes for easy debugging with valgrind. */
53#endif
54} rd_kafka_replyq_t;
55
56
57
58
59/**
60 * Flags used by:
61 * - rd_kafka_op_t.rko_flags
62 * - rd_kafka_buf_t.rkbuf_flags
63 */
64#define RD_KAFKA_OP_F_FREE 0x1 /* rd_free payload when done with it */
65#define RD_KAFKA_OP_F_NO_RESPONSE 0x2 /* rkbuf: Not expecting a response */
66#define RD_KAFKA_OP_F_CRC 0x4 /* rkbuf: Perform CRC calculation */
67#define RD_KAFKA_OP_F_BLOCKING 0x8 /* rkbuf: blocking protocol request */
68#define RD_KAFKA_OP_F_REPROCESS 0x10 /* cgrp: Reprocess at a later time. */
69#define RD_KAFKA_OP_F_SENT 0x20 /* rkbuf: request sent on wire */
70
71
72typedef enum {
73 RD_KAFKA_OP_NONE, /* No specific type, use OP_CB */
74 RD_KAFKA_OP_FETCH, /* Kafka thread -> Application */
75 RD_KAFKA_OP_ERR, /* Kafka thread -> Application */
76 RD_KAFKA_OP_CONSUMER_ERR, /* Kafka thread -> Application */
77 RD_KAFKA_OP_DR, /* Kafka thread -> Application
78 * Produce message delivery report */
79 RD_KAFKA_OP_STATS, /* Kafka thread -> Application */
80
81 RD_KAFKA_OP_OFFSET_COMMIT, /* any -> toppar's Broker thread */
82 RD_KAFKA_OP_NODE_UPDATE, /* any -> Broker thread: node update */
83
84 RD_KAFKA_OP_XMIT_BUF, /* transmit buffer: any -> broker thread */
85 RD_KAFKA_OP_RECV_BUF, /* received response buffer: broker thr -> any */
86 RD_KAFKA_OP_XMIT_RETRY, /* retry buffer xmit: any -> broker thread */
87 RD_KAFKA_OP_FETCH_START, /* Application -> toppar's handler thread */
88 RD_KAFKA_OP_FETCH_STOP, /* Application -> toppar's handler thread */
89 RD_KAFKA_OP_SEEK, /* Application -> toppar's handler thread */
90 RD_KAFKA_OP_PAUSE, /* Application -> toppar's handler thread */
91 RD_KAFKA_OP_OFFSET_FETCH, /* Broker -> broker thread: fetch offsets
92 * for topic. */
93
94 RD_KAFKA_OP_PARTITION_JOIN, /* * -> cgrp op: add toppar to cgrp
95 * * -> broker op: add toppar to broker */
96 RD_KAFKA_OP_PARTITION_LEAVE, /* * -> cgrp op: remove toppar from cgrp
97 * * -> broker op: remove toppar from rkb*/
98 RD_KAFKA_OP_REBALANCE, /* broker thread -> app:
99 * group rebalance */
100 RD_KAFKA_OP_TERMINATE, /* For generic use */
101 RD_KAFKA_OP_COORD_QUERY, /* Query for coordinator */
102 RD_KAFKA_OP_SUBSCRIBE, /* New subscription */
103 RD_KAFKA_OP_ASSIGN, /* New assignment */
104 RD_KAFKA_OP_GET_SUBSCRIPTION,/* Get current subscription.
105 * Reuses u.subscribe */
106 RD_KAFKA_OP_GET_ASSIGNMENT, /* Get current assignment.
107 * Reuses u.assign */
108 RD_KAFKA_OP_THROTTLE, /* Throttle info */
109 RD_KAFKA_OP_NAME, /* Request name */
110 RD_KAFKA_OP_OFFSET_RESET, /* Offset reset */
111 RD_KAFKA_OP_METADATA, /* Metadata response */
112 RD_KAFKA_OP_LOG, /* Log */
113 RD_KAFKA_OP_WAKEUP, /* Wake-up signaling */
114 RD_KAFKA_OP_CREATETOPICS, /**< Admin: CreateTopics: u.admin_request*/
115 RD_KAFKA_OP_DELETETOPICS, /**< Admin: DeleteTopics: u.admin_request*/
116 RD_KAFKA_OP_CREATEPARTITIONS,/**< Admin: CreatePartitions: u.admin_request*/
117 RD_KAFKA_OP_ALTERCONFIGS, /**< Admin: AlterConfigs: u.admin_request*/
118 RD_KAFKA_OP_DESCRIBECONFIGS, /**< Admin: DescribeConfigs: u.admin_request*/
119 RD_KAFKA_OP_ADMIN_RESULT, /**< Admin API .._result_t */
120 RD_KAFKA_OP_PURGE, /**< Purge queues */
121 RD_KAFKA_OP_CONNECT, /**< Connect (to broker) */
122 RD_KAFKA_OP_OAUTHBEARER_REFRESH, /**< Refresh OAUTHBEARER token */
123 RD_KAFKA_OP__END
124} rd_kafka_op_type_t;
125
126/* Flags used with op_type_t */
127#define RD_KAFKA_OP_CB (int)(1 << 29) /* Callback op. */
128#define RD_KAFKA_OP_REPLY (int)(1 << 30) /* Reply op. */
129#define RD_KAFKA_OP_FLAGMASK (RD_KAFKA_OP_CB | RD_KAFKA_OP_REPLY)
130
131
132/**
133 * @brief Op/queue priority levels.
134 * @remark Since priority levels alter the FIFO order, pay extra attention
135 * to preserve ordering as deemed necessary.
136 * @remark Priority should only be set on ops destined for application
137 * facing queues (rk_rep, rkcg_q, etc).
138 */
139typedef enum {
140 RD_KAFKA_PRIO_NORMAL = 0, /* Normal bulk, messages, DRs, etc. */
141 RD_KAFKA_PRIO_MEDIUM, /* Prioritize in front of bulk,
142 * still at some scale. e.g. logs, .. */
143 RD_KAFKA_PRIO_HIGH, /* Small scale high priority */
144 RD_KAFKA_PRIO_FLASH /* Micro scale, immediate delivery. */
145} rd_kafka_prio_t;
146
147
148/**
149 * @brief Op handler result
150 *
151 * @remark When returning YIELD from a handler the handler will
152 * need to have made sure to either re-enqueue the op or destroy it
153 * since the caller will not touch the op anymore.
154 */
155typedef enum {
156 RD_KAFKA_OP_RES_PASS, /* Not handled, pass to caller */
157 RD_KAFKA_OP_RES_HANDLED, /* Op was handled (through callbacks) */
158 RD_KAFKA_OP_RES_KEEP, /* Op was handled (through callbacks)
159 * but must not be destroyed by op_handle().
160 * It is NOT PERMITTED to return RES_KEEP
161 * from a callback handling a ERR__DESTROY
162 * event. */
163 RD_KAFKA_OP_RES_YIELD /* Callback called yield */
164} rd_kafka_op_res_t;
165
166
167/**
168 * @brief Queue serve callback call type
169 */
170typedef enum {
171 RD_KAFKA_Q_CB_INVALID, /* dont use */
172 RD_KAFKA_Q_CB_CALLBACK,/* trigger callback based on op */
173 RD_KAFKA_Q_CB_RETURN, /* return op rather than trigger callback
174 * (if possible)*/
175 RD_KAFKA_Q_CB_FORCE_RETURN, /* return op, regardless of callback. */
176 RD_KAFKA_Q_CB_EVENT /* like _Q_CB_RETURN but return event_t:ed op */
177} rd_kafka_q_cb_type_t;
178
179/**
180 * @brief Queue serve callback
181 * @remark See rd_kafka_op_res_t docs for return semantics.
182 */
183typedef rd_kafka_op_res_t
184(rd_kafka_q_serve_cb_t) (rd_kafka_t *rk,
185 struct rd_kafka_q_s *rkq,
186 struct rd_kafka_op_s *rko,
187 rd_kafka_q_cb_type_t cb_type, void *opaque)
188 RD_WARN_UNUSED_RESULT;
189
190/**
191 * @brief Op callback type
192 */
193typedef rd_kafka_op_res_t (rd_kafka_op_cb_t) (rd_kafka_t *rk,
194 rd_kafka_q_t *rkq,
195 struct rd_kafka_op_s *rko)
196 RD_WARN_UNUSED_RESULT;
197
198/* Forward declaration */
199struct rd_kafka_admin_worker_cbs;
200
201
202#define RD_KAFKA_OP_TYPE_ASSERT(rko,type) \
203 rd_kafka_assert(NULL, (rko)->rko_type == (type) && # type)
204
205struct rd_kafka_op_s {
206 TAILQ_ENTRY(rd_kafka_op_s) rko_link;
207
208 rd_kafka_op_type_t rko_type; /* Internal op type */
209 rd_kafka_event_type_t rko_evtype;
210 int rko_flags; /* See RD_KAFKA_OP_F_... above */
211 int32_t rko_version;
212 rd_kafka_resp_err_t rko_err;
213 int32_t rko_len; /* Depends on type, typically the
214 * message length. */
215 rd_kafka_prio_t rko_prio; /**< In-queue priority.
216 * Higher value means higher prio*/
217
218 shptr_rd_kafka_toppar_t *rko_rktp;
219
220 /*
221 * Generic fields
222 */
223
224 /* Indicates request: enqueue reply on rko_replyq.q with .version.
225 * .q is refcounted. */
226 rd_kafka_replyq_t rko_replyq;
227
228 /* Original queue's op serve callback and opaque, if any.
229 * Mainly used for forwarded queues to use the original queue's
230 * serve function from the forwarded position. */
231 rd_kafka_q_serve_cb_t *rko_serve;
232 void *rko_serve_opaque;
233
234 rd_kafka_t *rko_rk;
235
236#if ENABLE_DEVEL
237 const char *rko_source; /**< Where op was created */
238#endif
239
240 /* RD_KAFKA_OP_CB */
241 rd_kafka_op_cb_t *rko_op_cb;
242
243 union {
244 struct {
245 rd_kafka_buf_t *rkbuf;
246 rd_kafka_msg_t rkm;
247 int evidx;
248 } fetch;
249
250 struct {
251 rd_kafka_topic_partition_list_t *partitions;
252 int do_free; /* free .partitions on destroy() */
253 } offset_fetch;
254
255 struct {
256 rd_kafka_topic_partition_list_t *partitions;
257 void (*cb) (rd_kafka_t *rk,
258 rd_kafka_resp_err_t err,
259 rd_kafka_topic_partition_list_t *offsets,
260 void *opaque);
261 void *opaque;
262 int silent_empty; /**< Fail silently if there are no
263 * offsets to commit. */
264 rd_ts_t ts_timeout;
265 char *reason;
266 } offset_commit;
267
268 struct {
269 rd_kafka_topic_partition_list_t *topics;
270 } subscribe; /* also used for GET_SUBSCRIPTION */
271
272 struct {
273 rd_kafka_topic_partition_list_t *partitions;
274 } assign; /* also used for GET_ASSIGNMENT */
275
276 struct {
277 rd_kafka_topic_partition_list_t *partitions;
278 } rebalance;
279
280 struct {
281 char *str;
282 } name;
283
284 struct {
285 int64_t offset;
286 char *errstr;
287 rd_kafka_msg_t rkm;
288 int fatal; /**< This was a ERR__FATAL error that has
289 * been translated to the fatal error
290 * code. */
291 } err; /* used for ERR and CONSUMER_ERR */
292
293 struct {
294 int throttle_time;
295 int32_t nodeid;
296 char *nodename;
297 } throttle;
298
299 struct {
300 char *json;
301 size_t json_len;
302 } stats;
303
304 struct {
305 rd_kafka_buf_t *rkbuf;
306 } xbuf; /* XMIT_BUF and RECV_BUF */
307
308 /* RD_KAFKA_OP_METADATA */
309 struct {
310 rd_kafka_metadata_t *md;
311 int force; /* force request regardless of outstanding
312 * metadata requests. */
313 } metadata;
314
315 struct {
316 shptr_rd_kafka_itopic_t *s_rkt;
317 rd_kafka_msgq_t msgq;
318 rd_kafka_msgq_t msgq2;
319 int do_purge2;
320 } dr;
321
322 struct {
323 int32_t nodeid;
324 char nodename[RD_KAFKA_NODENAME_SIZE];
325 } node;
326
327 struct {
328 int64_t offset;
329 char *reason;
330 } offset_reset;
331
332 struct {
333 int64_t offset;
334 struct rd_kafka_cgrp_s *rkcg;
335 } fetch_start; /* reused for SEEK */
336
337 struct {
338 int pause;
339 int flag;
340 } pause;
341
342 struct {
343 char fac[64];
344 int level;
345 char *str;
346 } log;
347
348 struct {
349 rd_kafka_AdminOptions_t options; /**< Copy of user's
350 * options, or NULL */
351 rd_ts_t abs_timeout; /**< Absolute timeout
352 * for this request. */
353 rd_kafka_timer_t tmr; /**< Timeout timer */
354 struct rd_kafka_enq_once_s *eonce; /**< Enqueue op
355 * only once,
356 * used to
357 * (re)trigger
358 * the request op
359 * upon broker state
360 * changes while
361 * waiting for the
362 * controller, or
363 * due to .tmr
364 * timeout. */
365 rd_list_t args;/**< Type depends on request, e.g.
366 * rd_kafka_NewTopic_t for CreateTopics
367 */
368
369 rd_kafka_buf_t *reply_buf; /**< Protocol reply,
370 * temporary reference not
371 * owned by this rko */
372
373 /**< Worker callbacks, see rdkafka_admin.c */
374 struct rd_kafka_admin_worker_cbs *cbs;
375
376 /** Worker state */
377 enum {
378 RD_KAFKA_ADMIN_STATE_INIT,
379 RD_KAFKA_ADMIN_STATE_WAIT_BROKER,
380 RD_KAFKA_ADMIN_STATE_WAIT_CONTROLLER,
381 RD_KAFKA_ADMIN_STATE_CONSTRUCT_REQUEST,
382 RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE,
383 } state;
384
385 int32_t broker_id; /**< Requested broker id to
386 * communicate with.
387 * Used for AlterConfigs, et.al,
388 * that needs to speak to a
389 * specific broker rather than
390 * the controller.
391 * Defaults to -1:
392 * look up and use controller. */
393
394 /** Application's reply queue */
395 rd_kafka_replyq_t replyq;
396 rd_kafka_event_type_t reply_event_type;
397 } admin_request;
398
399 struct {
400 rd_kafka_op_type_t reqtype; /**< Request op type,
401 * used for logging. */
402
403 char *errstr; /**< Error string, if rko_err
404 * is set, else NULL. */
405
406 rd_list_t results; /**< Type depends on request type:
407 *
408 * (rd_kafka_topic_result_t *):
409 * CreateTopics, DeleteTopics,
410 * CreatePartitions.
411 *
412 * (rd_kafka_ConfigResource_t *):
413 * AlterConfigs, DescribeConfigs
414 */
415
416 void *opaque; /**< Application's opaque as set by
417 * rd_kafka_AdminOptions_set_opaque
418 */
419 } admin_result;
420
421 struct {
422 int flags; /**< purge_flags from rd_kafka_purge() */
423 } purge;
424 } rko_u;
425};
426
427TAILQ_HEAD(rd_kafka_op_head_s, rd_kafka_op_s);
428
429
430
431
432const char *rd_kafka_op2str (rd_kafka_op_type_t type);
433void rd_kafka_op_destroy (rd_kafka_op_t *rko);
434rd_kafka_op_t *rd_kafka_op_new0 (const char *source, rd_kafka_op_type_t type);
435#if ENABLE_DEVEL
436#define _STRINGIFYX(A) #A
437#define _STRINGIFY(A) _STRINGIFYX(A)
438#define rd_kafka_op_new(type) \
439 rd_kafka_op_new0(__FILE__ ":" _STRINGIFY(__LINE__), type)
440#else
441#define rd_kafka_op_new(type) rd_kafka_op_new0(NULL, type)
442#endif
443rd_kafka_op_t *rd_kafka_op_new_reply (rd_kafka_op_t *rko_orig,
444 rd_kafka_resp_err_t err);
445rd_kafka_op_t *rd_kafka_op_new_cb (rd_kafka_t *rk,
446 rd_kafka_op_type_t type,
447 rd_kafka_op_cb_t *cb);
448int rd_kafka_op_reply (rd_kafka_op_t *rko, rd_kafka_resp_err_t err);
449
450#define rd_kafka_op_set_prio(rko,prio) ((rko)->rko_prio = prio)
451
452
453#define rd_kafka_op_err(rk,err,...) do { \
454 if (!((rk)->rk_conf.enabled_events & RD_KAFKA_EVENT_ERROR)) { \
455 rd_kafka_log(rk, LOG_ERR, "ERROR", __VA_ARGS__); \
456 break; \
457 } \
458 rd_kafka_q_op_err((rk)->rk_rep, RD_KAFKA_OP_ERR, err, 0, \
459 NULL, 0, __VA_ARGS__); \
460 } while (0)
461
462void rd_kafka_q_op_err (rd_kafka_q_t *rkq, rd_kafka_op_type_t optype,
463 rd_kafka_resp_err_t err, int32_t version,
464 rd_kafka_toppar_t *rktp, int64_t offset,
465 const char *fmt, ...);
466rd_kafka_op_t *rd_kafka_op_req (rd_kafka_q_t *destq,
467 rd_kafka_op_t *rko,
468 int timeout_ms);
469rd_kafka_op_t *rd_kafka_op_req2 (rd_kafka_q_t *destq, rd_kafka_op_type_t type);
470rd_kafka_resp_err_t rd_kafka_op_err_destroy (rd_kafka_op_t *rko);
471
472rd_kafka_op_res_t rd_kafka_op_call (rd_kafka_t *rk,
473 rd_kafka_q_t *rkq, rd_kafka_op_t *rko)
474 RD_WARN_UNUSED_RESULT;
475
476rd_kafka_op_t *
477rd_kafka_op_new_fetch_msg (rd_kafka_msg_t **rkmp,
478 rd_kafka_toppar_t *rktp,
479 int32_t version,
480 rd_kafka_buf_t *rkbuf,
481 int64_t offset,
482 size_t key_len, const void *key,
483 size_t val_len, const void *val);
484
485void rd_kafka_op_throttle_time (struct rd_kafka_broker_s *rkb,
486 rd_kafka_q_t *rkq,
487 int throttle_time);
488
489
490rd_kafka_op_res_t
491rd_kafka_op_handle (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
492 rd_kafka_q_cb_type_t cb_type, void *opaque,
493 rd_kafka_q_serve_cb_t *callback) RD_WARN_UNUSED_RESULT;
494
495
496extern rd_atomic32_t rd_kafka_op_cnt;
497
498void rd_kafka_op_print (FILE *fp, const char *prefix, rd_kafka_op_t *rko);
499
500void rd_kafka_op_offset_store (rd_kafka_t *rk, rd_kafka_op_t *rko,
501 const rd_kafka_message_t *rkmessage);
502
503#endif /* _RDKAFKA_OP_H_ */
504