1/*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012-2015, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29#include <stdarg.h>
30
31#include "rdkafka_int.h"
32#include "rdkafka_op.h"
33#include "rdkafka_topic.h"
34#include "rdkafka_partition.h"
35#include "rdkafka_offset.h"
36
37/* Current number of rd_kafka_op_t */
38rd_atomic32_t rd_kafka_op_cnt;
39
40
41const char *rd_kafka_op2str (rd_kafka_op_type_t type) {
42 int skiplen = 6;
43 static const char *names[] = {
44 [RD_KAFKA_OP_NONE] = "REPLY:NONE",
45 [RD_KAFKA_OP_FETCH] = "REPLY:FETCH",
46 [RD_KAFKA_OP_ERR] = "REPLY:ERR",
47 [RD_KAFKA_OP_CONSUMER_ERR] = "REPLY:CONSUMER_ERR",
48 [RD_KAFKA_OP_DR] = "REPLY:DR",
49 [RD_KAFKA_OP_STATS] = "REPLY:STATS",
50 [RD_KAFKA_OP_OFFSET_COMMIT] = "REPLY:OFFSET_COMMIT",
51 [RD_KAFKA_OP_NODE_UPDATE] = "REPLY:NODE_UPDATE",
52 [RD_KAFKA_OP_XMIT_BUF] = "REPLY:XMIT_BUF",
53 [RD_KAFKA_OP_RECV_BUF] = "REPLY:RECV_BUF",
54 [RD_KAFKA_OP_XMIT_RETRY] = "REPLY:XMIT_RETRY",
55 [RD_KAFKA_OP_FETCH_START] = "REPLY:FETCH_START",
56 [RD_KAFKA_OP_FETCH_STOP] = "REPLY:FETCH_STOP",
57 [RD_KAFKA_OP_SEEK] = "REPLY:SEEK",
58 [RD_KAFKA_OP_PAUSE] = "REPLY:PAUSE",
59 [RD_KAFKA_OP_OFFSET_FETCH] = "REPLY:OFFSET_FETCH",
60 [RD_KAFKA_OP_PARTITION_JOIN] = "REPLY:PARTITION_JOIN",
61 [RD_KAFKA_OP_PARTITION_LEAVE] = "REPLY:PARTITION_LEAVE",
62 [RD_KAFKA_OP_REBALANCE] = "REPLY:REBALANCE",
63 [RD_KAFKA_OP_TERMINATE] = "REPLY:TERMINATE",
64 [RD_KAFKA_OP_COORD_QUERY] = "REPLY:COORD_QUERY",
65 [RD_KAFKA_OP_SUBSCRIBE] = "REPLY:SUBSCRIBE",
66 [RD_KAFKA_OP_ASSIGN] = "REPLY:ASSIGN",
67 [RD_KAFKA_OP_GET_SUBSCRIPTION] = "REPLY:GET_SUBSCRIPTION",
68 [RD_KAFKA_OP_GET_ASSIGNMENT] = "REPLY:GET_ASSIGNMENT",
69 [RD_KAFKA_OP_THROTTLE] = "REPLY:THROTTLE",
70 [RD_KAFKA_OP_NAME] = "REPLY:NAME",
71 [RD_KAFKA_OP_OFFSET_RESET] = "REPLY:OFFSET_RESET",
72 [RD_KAFKA_OP_METADATA] = "REPLY:METADATA",
73 [RD_KAFKA_OP_LOG] = "REPLY:LOG",
74 [RD_KAFKA_OP_WAKEUP] = "REPLY:WAKEUP",
75 [RD_KAFKA_OP_CREATETOPICS] = "REPLY:CREATETOPICS",
76 [RD_KAFKA_OP_DELETETOPICS] = "REPLY:DELETETOPICS",
77 [RD_KAFKA_OP_CREATEPARTITIONS] = "REPLY:CREATEPARTITIONS",
78 [RD_KAFKA_OP_ALTERCONFIGS] = "REPLY:ALTERCONFIGS",
79 [RD_KAFKA_OP_DESCRIBECONFIGS] = "REPLY:DESCRIBECONFIGS",
80 [RD_KAFKA_OP_ADMIN_RESULT] = "REPLY:ADMIN_RESULT",
81 [RD_KAFKA_OP_PURGE] = "REPLY:PURGE",
82 [RD_KAFKA_OP_CONNECT] = "REPLY:CONNECT",
83 [RD_KAFKA_OP_OAUTHBEARER_REFRESH] = "REPLY:OAUTHBEARER_REFRESH"
84 };
85
86 if (type & RD_KAFKA_OP_REPLY)
87 skiplen = 0;
88
89 return names[type & ~RD_KAFKA_OP_FLAGMASK]+skiplen;
90}
91
92
93void rd_kafka_op_print (FILE *fp, const char *prefix, rd_kafka_op_t *rko) {
94 fprintf(fp,
95 "%s((rd_kafka_op_t*)%p)\n"
96 "%s Type: %s (0x%x), Version: %"PRId32"\n",
97 prefix, rko,
98 prefix, rd_kafka_op2str(rko->rko_type), rko->rko_type,
99 rko->rko_version);
100 if (rko->rko_err)
101 fprintf(fp, "%s Error: %s\n",
102 prefix, rd_kafka_err2str(rko->rko_err));
103 if (rko->rko_replyq.q)
104 fprintf(fp, "%s Replyq %p v%d (%s)\n",
105 prefix, rko->rko_replyq.q, rko->rko_replyq.version,
106#if ENABLE_DEVEL
107 rko->rko_replyq._id
108#else
109 ""
110#endif
111 );
112 if (rko->rko_rktp) {
113 rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(rko->rko_rktp);
114 fprintf(fp, "%s ((rd_kafka_toppar_t*)%p) "
115 "%s [%"PRId32"] v%d (shptr %p)\n",
116 prefix, rktp, rktp->rktp_rkt->rkt_topic->str,
117 rktp->rktp_partition,
118 rd_atomic32_get(&rktp->rktp_version), rko->rko_rktp);
119 }
120
121 switch (rko->rko_type & ~RD_KAFKA_OP_FLAGMASK)
122 {
123 case RD_KAFKA_OP_FETCH:
124 fprintf(fp, "%s Offset: %"PRId64"\n",
125 prefix, rko->rko_u.fetch.rkm.rkm_offset);
126 break;
127 case RD_KAFKA_OP_CONSUMER_ERR:
128 fprintf(fp, "%s Offset: %"PRId64"\n",
129 prefix, rko->rko_u.err.offset);
130 /* FALLTHRU */
131 case RD_KAFKA_OP_ERR:
132 fprintf(fp, "%s Reason: %s\n", prefix, rko->rko_u.err.errstr);
133 break;
134 case RD_KAFKA_OP_DR:
135 fprintf(fp, "%s %"PRId32" messages on %s\n", prefix,
136 rko->rko_u.dr.msgq.rkmq_msg_cnt,
137 rko->rko_u.dr.s_rkt ?
138 rd_kafka_topic_s2i(rko->rko_u.dr.s_rkt)->
139 rkt_topic->str : "(n/a)");
140 break;
141 case RD_KAFKA_OP_OFFSET_COMMIT:
142 fprintf(fp, "%s Callback: %p (opaque %p)\n",
143 prefix, rko->rko_u.offset_commit.cb,
144 rko->rko_u.offset_commit.opaque);
145 fprintf(fp, "%s %d partitions\n",
146 prefix,
147 rko->rko_u.offset_commit.partitions ?
148 rko->rko_u.offset_commit.partitions->cnt : 0);
149 break;
150
151 case RD_KAFKA_OP_LOG:
152 fprintf(fp, "%s Log: %%%d %s: %s\n",
153 prefix, rko->rko_u.log.level,
154 rko->rko_u.log.fac,
155 rko->rko_u.log.str);
156 break;
157
158 default:
159 break;
160 }
161}
162
163
164rd_kafka_op_t *rd_kafka_op_new0 (const char *source, rd_kafka_op_type_t type) {
165 rd_kafka_op_t *rko;
166 static const size_t op2size[RD_KAFKA_OP__END] = {
167 [RD_KAFKA_OP_FETCH] = sizeof(rko->rko_u.fetch),
168 [RD_KAFKA_OP_ERR] = sizeof(rko->rko_u.err),
169 [RD_KAFKA_OP_CONSUMER_ERR] = sizeof(rko->rko_u.err),
170 [RD_KAFKA_OP_DR] = sizeof(rko->rko_u.dr),
171 [RD_KAFKA_OP_STATS] = sizeof(rko->rko_u.stats),
172 [RD_KAFKA_OP_OFFSET_COMMIT] = sizeof(rko->rko_u.offset_commit),
173 [RD_KAFKA_OP_NODE_UPDATE] = sizeof(rko->rko_u.node),
174 [RD_KAFKA_OP_XMIT_BUF] = sizeof(rko->rko_u.xbuf),
175 [RD_KAFKA_OP_RECV_BUF] = sizeof(rko->rko_u.xbuf),
176 [RD_KAFKA_OP_XMIT_RETRY] = sizeof(rko->rko_u.xbuf),
177 [RD_KAFKA_OP_FETCH_START] = sizeof(rko->rko_u.fetch_start),
178 [RD_KAFKA_OP_FETCH_STOP] = 0,
179 [RD_KAFKA_OP_SEEK] = sizeof(rko->rko_u.fetch_start),
180 [RD_KAFKA_OP_PAUSE] = sizeof(rko->rko_u.pause),
181 [RD_KAFKA_OP_OFFSET_FETCH] = sizeof(rko->rko_u.offset_fetch),
182 [RD_KAFKA_OP_PARTITION_JOIN] = 0,
183 [RD_KAFKA_OP_PARTITION_LEAVE] = 0,
184 [RD_KAFKA_OP_REBALANCE] = sizeof(rko->rko_u.rebalance),
185 [RD_KAFKA_OP_TERMINATE] = 0,
186 [RD_KAFKA_OP_COORD_QUERY] = 0,
187 [RD_KAFKA_OP_SUBSCRIBE] = sizeof(rko->rko_u.subscribe),
188 [RD_KAFKA_OP_ASSIGN] = sizeof(rko->rko_u.assign),
189 [RD_KAFKA_OP_GET_SUBSCRIPTION] = sizeof(rko->rko_u.subscribe),
190 [RD_KAFKA_OP_GET_ASSIGNMENT] = sizeof(rko->rko_u.assign),
191 [RD_KAFKA_OP_THROTTLE] = sizeof(rko->rko_u.throttle),
192 [RD_KAFKA_OP_NAME] = sizeof(rko->rko_u.name),
193 [RD_KAFKA_OP_OFFSET_RESET] = sizeof(rko->rko_u.offset_reset),
194 [RD_KAFKA_OP_METADATA] = sizeof(rko->rko_u.metadata),
195 [RD_KAFKA_OP_LOG] = sizeof(rko->rko_u.log),
196 [RD_KAFKA_OP_WAKEUP] = 0,
197 [RD_KAFKA_OP_CREATETOPICS] = sizeof(rko->rko_u.admin_request),
198 [RD_KAFKA_OP_DELETETOPICS] = sizeof(rko->rko_u.admin_request),
199 [RD_KAFKA_OP_CREATEPARTITIONS] = sizeof(rko->rko_u.admin_request),
200 [RD_KAFKA_OP_ALTERCONFIGS] = sizeof(rko->rko_u.admin_request),
201 [RD_KAFKA_OP_DESCRIBECONFIGS] = sizeof(rko->rko_u.admin_request),
202 [RD_KAFKA_OP_ADMIN_RESULT] = sizeof(rko->rko_u.admin_result),
203 [RD_KAFKA_OP_PURGE] = sizeof(rko->rko_u.purge),
204 [RD_KAFKA_OP_CONNECT] = 0,
205 [RD_KAFKA_OP_OAUTHBEARER_REFRESH] = 0,
206 };
207 size_t tsize = op2size[type & ~RD_KAFKA_OP_FLAGMASK];
208
209 rko = rd_calloc(1, sizeof(*rko)-sizeof(rko->rko_u)+tsize);
210 rko->rko_type = type;
211
212#if ENABLE_DEVEL
213 rko->rko_source = source;
214 rd_atomic32_add(&rd_kafka_op_cnt, 1);
215#endif
216 return rko;
217}
218
219
220void rd_kafka_op_destroy (rd_kafka_op_t *rko) {
221
222 switch (rko->rko_type & ~RD_KAFKA_OP_FLAGMASK)
223 {
224 case RD_KAFKA_OP_FETCH:
225 rd_kafka_msg_destroy(NULL, &rko->rko_u.fetch.rkm);
226 /* Decrease refcount on rkbuf to eventually rd_free shared buf*/
227 if (rko->rko_u.fetch.rkbuf)
228 rd_kafka_buf_handle_op(rko, RD_KAFKA_RESP_ERR__DESTROY);
229
230 break;
231
232 case RD_KAFKA_OP_OFFSET_FETCH:
233 if (rko->rko_u.offset_fetch.partitions &&
234 rko->rko_u.offset_fetch.do_free)
235 rd_kafka_topic_partition_list_destroy(
236 rko->rko_u.offset_fetch.partitions);
237 break;
238
239 case RD_KAFKA_OP_OFFSET_COMMIT:
240 RD_IF_FREE(rko->rko_u.offset_commit.partitions,
241 rd_kafka_topic_partition_list_destroy);
242 RD_IF_FREE(rko->rko_u.offset_commit.reason, rd_free);
243 break;
244
245 case RD_KAFKA_OP_SUBSCRIBE:
246 case RD_KAFKA_OP_GET_SUBSCRIPTION:
247 RD_IF_FREE(rko->rko_u.subscribe.topics,
248 rd_kafka_topic_partition_list_destroy);
249 break;
250
251 case RD_KAFKA_OP_ASSIGN:
252 case RD_KAFKA_OP_GET_ASSIGNMENT:
253 RD_IF_FREE(rko->rko_u.assign.partitions,
254 rd_kafka_topic_partition_list_destroy);
255 break;
256
257 case RD_KAFKA_OP_REBALANCE:
258 RD_IF_FREE(rko->rko_u.rebalance.partitions,
259 rd_kafka_topic_partition_list_destroy);
260 break;
261
262 case RD_KAFKA_OP_NAME:
263 RD_IF_FREE(rko->rko_u.name.str, rd_free);
264 break;
265
266 case RD_KAFKA_OP_ERR:
267 case RD_KAFKA_OP_CONSUMER_ERR:
268 RD_IF_FREE(rko->rko_u.err.errstr, rd_free);
269 rd_kafka_msg_destroy(NULL, &rko->rko_u.err.rkm);
270 break;
271
272 break;
273
274 case RD_KAFKA_OP_THROTTLE:
275 RD_IF_FREE(rko->rko_u.throttle.nodename, rd_free);
276 break;
277
278 case RD_KAFKA_OP_STATS:
279 RD_IF_FREE(rko->rko_u.stats.json, rd_free);
280 break;
281
282 case RD_KAFKA_OP_XMIT_RETRY:
283 case RD_KAFKA_OP_XMIT_BUF:
284 case RD_KAFKA_OP_RECV_BUF:
285 if (rko->rko_u.xbuf.rkbuf)
286 rd_kafka_buf_handle_op(rko, RD_KAFKA_RESP_ERR__DESTROY);
287
288 RD_IF_FREE(rko->rko_u.xbuf.rkbuf, rd_kafka_buf_destroy);
289 break;
290
291 case RD_KAFKA_OP_DR:
292 rd_kafka_msgq_purge(rko->rko_rk, &rko->rko_u.dr.msgq);
293 if (rko->rko_u.dr.do_purge2)
294 rd_kafka_msgq_purge(rko->rko_rk, &rko->rko_u.dr.msgq2);
295
296 if (rko->rko_u.dr.s_rkt)
297 rd_kafka_topic_destroy0(rko->rko_u.dr.s_rkt);
298 break;
299
300 case RD_KAFKA_OP_OFFSET_RESET:
301 RD_IF_FREE(rko->rko_u.offset_reset.reason, rd_free);
302 break;
303
304 case RD_KAFKA_OP_METADATA:
305 RD_IF_FREE(rko->rko_u.metadata.md, rd_kafka_metadata_destroy);
306 break;
307
308 case RD_KAFKA_OP_LOG:
309 rd_free(rko->rko_u.log.str);
310 break;
311
312 case RD_KAFKA_OP_CREATETOPICS:
313 case RD_KAFKA_OP_DELETETOPICS:
314 case RD_KAFKA_OP_CREATEPARTITIONS:
315 case RD_KAFKA_OP_ALTERCONFIGS:
316 case RD_KAFKA_OP_DESCRIBECONFIGS:
317 rd_kafka_replyq_destroy(&rko->rko_u.admin_request.replyq);
318 rd_list_destroy(&rko->rko_u.admin_request.args);
319 break;
320
321 case RD_KAFKA_OP_ADMIN_RESULT:
322 rd_list_destroy(&rko->rko_u.admin_result.results);
323 RD_IF_FREE(rko->rko_u.admin_result.errstr, rd_free);
324 break;
325
326 default:
327 break;
328 }
329
330 if (rko->rko_type & RD_KAFKA_OP_CB && rko->rko_op_cb) {
331 rd_kafka_op_res_t res;
332 /* Let callback clean up */
333 rko->rko_err = RD_KAFKA_RESP_ERR__DESTROY;
334 res = rko->rko_op_cb(rko->rko_rk, NULL, rko);
335 rd_assert(res != RD_KAFKA_OP_RES_YIELD);
336 rd_assert(res != RD_KAFKA_OP_RES_KEEP);
337 }
338
339 RD_IF_FREE(rko->rko_rktp, rd_kafka_toppar_destroy);
340
341 rd_kafka_replyq_destroy(&rko->rko_replyq);
342
343#if ENABLE_DEVEL
344 if (rd_atomic32_sub(&rd_kafka_op_cnt, 1) < 0)
345 rd_kafka_assert(NULL, !*"rd_kafka_op_cnt < 0");
346#endif
347
348 rd_free(rko);
349}
350
351
352
353
354
355
356
357
358
359
360
361/**
362 * Propagate an error event to the application on a specific queue.
363 * \p optype should be RD_KAFKA_OP_ERR for generic errors and
364 * RD_KAFKA_OP_CONSUMER_ERR for consumer errors.
365 */
366void rd_kafka_q_op_err (rd_kafka_q_t *rkq, rd_kafka_op_type_t optype,
367 rd_kafka_resp_err_t err, int32_t version,
368 rd_kafka_toppar_t *rktp, int64_t offset,
369 const char *fmt, ...) {
370 va_list ap;
371 char buf[2048];
372 rd_kafka_op_t *rko;
373
374 va_start(ap, fmt);
375 rd_vsnprintf(buf, sizeof(buf), fmt, ap);
376 va_end(ap);
377
378 rko = rd_kafka_op_new(optype);
379 rko->rko_version = version;
380 rko->rko_err = err;
381 rko->rko_u.err.offset = offset;
382 rko->rko_u.err.errstr = rd_strdup(buf);
383 if (rktp)
384 rko->rko_rktp = rd_kafka_toppar_keep(rktp);
385
386 rd_kafka_q_enq(rkq, rko);
387}
388
389
390
391/**
392 * Creates a reply opp based on 'rko_orig'.
393 * If 'rko_orig' has rko_op_cb set the reply op will be OR:ed with
394 * RD_KAFKA_OP_CB, else the reply type will be the original rko_type OR:ed
395 * with RD_KAFKA_OP_REPLY.
396 */
397rd_kafka_op_t *rd_kafka_op_new_reply (rd_kafka_op_t *rko_orig,
398 rd_kafka_resp_err_t err) {
399 rd_kafka_op_t *rko;
400
401 rko = rd_kafka_op_new(rko_orig->rko_type |
402 (rko_orig->rko_op_cb ?
403 RD_KAFKA_OP_CB : RD_KAFKA_OP_REPLY));
404 rd_kafka_op_get_reply_version(rko, rko_orig);
405 rko->rko_op_cb = rko_orig->rko_op_cb;
406 rko->rko_err = err;
407 if (rko_orig->rko_rktp)
408 rko->rko_rktp = rd_kafka_toppar_keep(
409 rd_kafka_toppar_s2i(rko_orig->rko_rktp));
410
411 return rko;
412}
413
414
415/**
416 * @brief Create new callback op for type \p type
417 */
418rd_kafka_op_t *rd_kafka_op_new_cb (rd_kafka_t *rk,
419 rd_kafka_op_type_t type,
420 rd_kafka_op_cb_t *cb) {
421 rd_kafka_op_t *rko;
422 rko = rd_kafka_op_new(type | RD_KAFKA_OP_CB);
423 rko->rko_op_cb = cb;
424 rko->rko_rk = rk;
425 return rko;
426}
427
428
429
430/**
431 * @brief Reply to 'rko' re-using the same rko.
432 * If there is no replyq the rko is destroyed.
433 *
434 * @returns 1 if op was enqueued, else 0 and rko is destroyed.
435 */
436int rd_kafka_op_reply (rd_kafka_op_t *rko, rd_kafka_resp_err_t err) {
437
438 if (!rko->rko_replyq.q) {
439 rd_kafka_op_destroy(rko);
440 return 0;
441 }
442
443 rko->rko_type |= (rko->rko_op_cb ? RD_KAFKA_OP_CB : RD_KAFKA_OP_REPLY);
444 rko->rko_err = err;
445
446 return rd_kafka_replyq_enq(&rko->rko_replyq, rko, 0);
447}
448
449
450/**
451 * @brief Send request to queue, wait for response.
452 *
453 * @returns response on success or NULL if destq is disabled.
454 */
455rd_kafka_op_t *rd_kafka_op_req0 (rd_kafka_q_t *destq,
456 rd_kafka_q_t *recvq,
457 rd_kafka_op_t *rko,
458 int timeout_ms) {
459 rd_kafka_op_t *reply;
460
461 /* Indicate to destination where to send reply. */
462 rd_kafka_op_set_replyq(rko, recvq, NULL);
463
464 /* Enqueue op */
465 if (!rd_kafka_q_enq(destq, rko))
466 return NULL;
467
468 /* Wait for reply */
469 reply = rd_kafka_q_pop(recvq, timeout_ms, 0);
470
471 /* May be NULL for timeout */
472 return reply;
473}
474
475/**
476 * Send request to queue, wait for response.
477 * Creates a temporary reply queue.
478 */
479rd_kafka_op_t *rd_kafka_op_req (rd_kafka_q_t *destq,
480 rd_kafka_op_t *rko,
481 int timeout_ms) {
482 rd_kafka_q_t *recvq;
483 rd_kafka_op_t *reply;
484
485 recvq = rd_kafka_q_new(destq->rkq_rk);
486
487 reply = rd_kafka_op_req0(destq, recvq, rko, timeout_ms);
488
489 rd_kafka_q_destroy_owner(recvq);
490
491 return reply;
492}
493
494
495/**
496 * Send simple type-only request to queue, wait for response.
497 */
498rd_kafka_op_t *rd_kafka_op_req2 (rd_kafka_q_t *destq, rd_kafka_op_type_t type) {
499 rd_kafka_op_t *rko;
500
501 rko = rd_kafka_op_new(type);
502 return rd_kafka_op_req(destq, rko, RD_POLL_INFINITE);
503}
504
505/**
506 * Destroys the rko and returns its error.
507 */
508rd_kafka_resp_err_t rd_kafka_op_err_destroy (rd_kafka_op_t *rko) {
509 rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__TIMED_OUT;
510
511 if (rko) {
512 err = rko->rko_err;
513 rd_kafka_op_destroy(rko);
514 }
515 return err;
516}
517
518
519/**
520 * Call op callback
521 */
522rd_kafka_op_res_t rd_kafka_op_call (rd_kafka_t *rk, rd_kafka_q_t *rkq,
523 rd_kafka_op_t *rko) {
524 rd_kafka_op_res_t res;
525 res = rko->rko_op_cb(rk, rkq, rko);
526 if (unlikely(res == RD_KAFKA_OP_RES_YIELD || rd_kafka_yield_thread))
527 return RD_KAFKA_OP_RES_YIELD;
528 if (res != RD_KAFKA_OP_RES_KEEP)
529 rko->rko_op_cb = NULL;
530 return res;
531}
532
533
534/**
535 * @brief Creates a new RD_KAFKA_OP_FETCH op and sets up the
536 * embedded message according to the parameters.
537 *
538 * @param rkmp will be set to the embedded rkm in the rko (for convenience)
539 * @param offset may be updated later if relative offset.
540 */
541rd_kafka_op_t *
542rd_kafka_op_new_fetch_msg (rd_kafka_msg_t **rkmp,
543 rd_kafka_toppar_t *rktp,
544 int32_t version,
545 rd_kafka_buf_t *rkbuf,
546 int64_t offset,
547 size_t key_len, const void *key,
548 size_t val_len, const void *val) {
549 rd_kafka_msg_t *rkm;
550 rd_kafka_op_t *rko;
551
552 rko = rd_kafka_op_new(RD_KAFKA_OP_FETCH);
553 rko->rko_rktp = rd_kafka_toppar_keep(rktp);
554 rko->rko_version = version;
555 rkm = &rko->rko_u.fetch.rkm;
556 *rkmp = rkm;
557
558 /* Since all the ops share the same payload buffer
559 * a refcnt is used on the rkbuf that makes sure all
560 * consume_cb() will have been
561 * called for each of these ops before the rkbuf
562 * and its memory backing buffers are freed. */
563 rko->rko_u.fetch.rkbuf = rkbuf;
564 rd_kafka_buf_keep(rkbuf);
565
566 rkm->rkm_offset = offset;
567
568 rkm->rkm_key = (void *)key;
569 rkm->rkm_key_len = key_len;
570
571 rkm->rkm_payload = (void *)val;
572 rkm->rkm_len = val_len;
573 rko->rko_len = (int32_t)rkm->rkm_len;
574
575 rkm->rkm_partition = rktp->rktp_partition;
576
577 /* Persistence status is always PERSISTED for consumed messages
578 * since we managed to read the message. */
579 rkm->rkm_status = RD_KAFKA_MSG_STATUS_PERSISTED;
580
581 return rko;
582}
583
584
585/**
586 * Enqueue ERR__THROTTLE op, if desired.
587 */
588void rd_kafka_op_throttle_time (rd_kafka_broker_t *rkb,
589 rd_kafka_q_t *rkq,
590 int throttle_time) {
591 rd_kafka_op_t *rko;
592
593 rd_avg_add(&rkb->rkb_avg_throttle, throttle_time);
594
595 /* We send throttle events when:
596 * - throttle_time > 0
597 * - throttle_time == 0 and last throttle_time > 0
598 */
599 if (!rkb->rkb_rk->rk_conf.throttle_cb ||
600 (!throttle_time && !rd_atomic32_get(&rkb->rkb_rk->rk_last_throttle)))
601 return;
602
603 rd_atomic32_set(&rkb->rkb_rk->rk_last_throttle, throttle_time);
604
605 rko = rd_kafka_op_new(RD_KAFKA_OP_THROTTLE);
606 rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_HIGH);
607 rko->rko_u.throttle.nodename = rd_strdup(rkb->rkb_nodename);
608 rko->rko_u.throttle.nodeid = rkb->rkb_nodeid;
609 rko->rko_u.throttle.throttle_time = throttle_time;
610 rd_kafka_q_enq(rkq, rko);
611}
612
613
614/**
615 * @brief Handle standard op types.
616 */
617rd_kafka_op_res_t
618rd_kafka_op_handle_std (rd_kafka_t *rk, rd_kafka_q_t *rkq,
619 rd_kafka_op_t *rko, int cb_type) {
620 if (cb_type == RD_KAFKA_Q_CB_FORCE_RETURN)
621 return RD_KAFKA_OP_RES_PASS;
622 else if (cb_type != RD_KAFKA_Q_CB_EVENT &&
623 rko->rko_type & RD_KAFKA_OP_CB)
624 return rd_kafka_op_call(rk, rkq, rko);
625 else if (rko->rko_type == RD_KAFKA_OP_RECV_BUF) /* Handle Response */
626 rd_kafka_buf_handle_op(rko, rko->rko_err);
627 else if (cb_type != RD_KAFKA_Q_CB_RETURN &&
628 rko->rko_type & RD_KAFKA_OP_REPLY &&
629 rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
630 return RD_KAFKA_OP_RES_HANDLED; /* dest queue was
631 * probably disabled. */
632 else
633 return RD_KAFKA_OP_RES_PASS;
634
635 return RD_KAFKA_OP_RES_HANDLED;
636}
637
638
639/**
640 * @brief Attempt to handle op using its queue's serve callback,
641 * or the passed callback, or op_handle_std(), else do nothing.
642 *
643 * @param rkq is \p rko's queue (which it was unlinked from) with rkq_lock
644 * being held. Callback may re-enqueue the op on this queue
645 * and return YIELD.
646 *
647 * @returns HANDLED if op was handled (and destroyed), PASS if not,
648 * or YIELD if op was handled (maybe destroyed or re-enqueued)
649 * and caller must propagate yield upwards (cancel and return).
650 */
651rd_kafka_op_res_t
652rd_kafka_op_handle (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
653 rd_kafka_q_cb_type_t cb_type, void *opaque,
654 rd_kafka_q_serve_cb_t *callback) {
655 rd_kafka_op_res_t res;
656
657 res = rd_kafka_op_handle_std(rk, rkq, rko, cb_type);
658 if (res == RD_KAFKA_OP_RES_KEEP) {
659 /* Op was handled but must not be destroyed. */
660 return res;
661 } if (res == RD_KAFKA_OP_RES_HANDLED) {
662 rd_kafka_op_destroy(rko);
663 return res;
664 } else if (unlikely(res == RD_KAFKA_OP_RES_YIELD))
665 return res;
666
667 if (rko->rko_serve) {
668 callback = rko->rko_serve;
669 opaque = rko->rko_serve_opaque;
670 rko->rko_serve = NULL;
671 rko->rko_serve_opaque = NULL;
672 }
673
674 if (callback)
675 res = callback(rk, rkq, rko, cb_type, opaque);
676
677 return res;
678}
679
680
681/**
682 * @brief Store offset for fetched message.
683 */
684void rd_kafka_op_offset_store (rd_kafka_t *rk, rd_kafka_op_t *rko,
685 const rd_kafka_message_t *rkmessage) {
686 rd_kafka_toppar_t *rktp;
687
688 if (unlikely(rko->rko_type != RD_KAFKA_OP_FETCH || rko->rko_err))
689 return;
690
691 rktp = rd_kafka_toppar_s2i(rko->rko_rktp);
692
693 if (unlikely(!rk))
694 rk = rktp->rktp_rkt->rkt_rk;
695
696 rd_kafka_toppar_lock(rktp);
697 rktp->rktp_app_offset = rkmessage->offset+1;
698 if (rk->rk_conf.enable_auto_offset_store)
699 rd_kafka_offset_store0(rktp, rkmessage->offset+1, 0/*no lock*/);
700 rd_kafka_toppar_unlock(rktp);
701}
702