1 | /* |
2 | * librdkafka - Apache Kafka C library |
3 | * |
4 | * Copyright (c) 2012-2015, Magnus Edenhill |
5 | * All rights reserved. |
6 | * |
7 | * Redistribution and use in source and binary forms, with or without |
8 | * modification, are permitted provided that the following conditions are met: |
9 | * |
10 | * 1. Redistributions of source code must retain the above copyright notice, |
11 | * this list of conditions and the following disclaimer. |
12 | * 2. Redistributions in binary form must reproduce the above copyright notice, |
13 | * this list of conditions and the following disclaimer in the documentation |
14 | * and/or other materials provided with the distribution. |
15 | * |
16 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
17 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
18 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
19 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
20 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
21 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
22 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
23 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
24 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
25 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
26 | * POSSIBILITY OF SUCH DAMAGE. |
27 | */ |
28 | |
29 | #include <stdarg.h> |
30 | |
31 | #include "rdkafka_int.h" |
32 | #include "rdkafka_op.h" |
33 | #include "rdkafka_topic.h" |
34 | #include "rdkafka_partition.h" |
35 | #include "rdkafka_offset.h" |
36 | |
37 | /* Current number of rd_kafka_op_t */ |
38 | rd_atomic32_t rd_kafka_op_cnt; |
39 | |
40 | |
41 | const char *rd_kafka_op2str (rd_kafka_op_type_t type) { |
42 | int skiplen = 6; |
43 | static const char *names[] = { |
44 | [RD_KAFKA_OP_NONE] = "REPLY:NONE" , |
45 | [RD_KAFKA_OP_FETCH] = "REPLY:FETCH" , |
46 | [RD_KAFKA_OP_ERR] = "REPLY:ERR" , |
47 | [RD_KAFKA_OP_CONSUMER_ERR] = "REPLY:CONSUMER_ERR" , |
48 | [RD_KAFKA_OP_DR] = "REPLY:DR" , |
49 | [RD_KAFKA_OP_STATS] = "REPLY:STATS" , |
50 | [RD_KAFKA_OP_OFFSET_COMMIT] = "REPLY:OFFSET_COMMIT" , |
51 | [RD_KAFKA_OP_NODE_UPDATE] = "REPLY:NODE_UPDATE" , |
52 | [RD_KAFKA_OP_XMIT_BUF] = "REPLY:XMIT_BUF" , |
53 | [RD_KAFKA_OP_RECV_BUF] = "REPLY:RECV_BUF" , |
54 | [RD_KAFKA_OP_XMIT_RETRY] = "REPLY:XMIT_RETRY" , |
55 | [RD_KAFKA_OP_FETCH_START] = "REPLY:FETCH_START" , |
56 | [RD_KAFKA_OP_FETCH_STOP] = "REPLY:FETCH_STOP" , |
57 | [RD_KAFKA_OP_SEEK] = "REPLY:SEEK" , |
58 | [RD_KAFKA_OP_PAUSE] = "REPLY:PAUSE" , |
59 | [RD_KAFKA_OP_OFFSET_FETCH] = "REPLY:OFFSET_FETCH" , |
60 | [RD_KAFKA_OP_PARTITION_JOIN] = "REPLY:PARTITION_JOIN" , |
61 | [RD_KAFKA_OP_PARTITION_LEAVE] = "REPLY:PARTITION_LEAVE" , |
62 | [RD_KAFKA_OP_REBALANCE] = "REPLY:REBALANCE" , |
63 | [RD_KAFKA_OP_TERMINATE] = "REPLY:TERMINATE" , |
64 | [RD_KAFKA_OP_COORD_QUERY] = "REPLY:COORD_QUERY" , |
65 | [RD_KAFKA_OP_SUBSCRIBE] = "REPLY:SUBSCRIBE" , |
66 | [RD_KAFKA_OP_ASSIGN] = "REPLY:ASSIGN" , |
67 | [RD_KAFKA_OP_GET_SUBSCRIPTION] = "REPLY:GET_SUBSCRIPTION" , |
68 | [RD_KAFKA_OP_GET_ASSIGNMENT] = "REPLY:GET_ASSIGNMENT" , |
69 | [RD_KAFKA_OP_THROTTLE] = "REPLY:THROTTLE" , |
70 | [RD_KAFKA_OP_NAME] = "REPLY:NAME" , |
71 | [RD_KAFKA_OP_OFFSET_RESET] = "REPLY:OFFSET_RESET" , |
72 | [RD_KAFKA_OP_METADATA] = "REPLY:METADATA" , |
73 | [RD_KAFKA_OP_LOG] = "REPLY:LOG" , |
74 | [RD_KAFKA_OP_WAKEUP] = "REPLY:WAKEUP" , |
75 | [RD_KAFKA_OP_CREATETOPICS] = "REPLY:CREATETOPICS" , |
76 | [RD_KAFKA_OP_DELETETOPICS] = "REPLY:DELETETOPICS" , |
77 | [RD_KAFKA_OP_CREATEPARTITIONS] = "REPLY:CREATEPARTITIONS" , |
78 | [RD_KAFKA_OP_ALTERCONFIGS] = "REPLY:ALTERCONFIGS" , |
79 | [RD_KAFKA_OP_DESCRIBECONFIGS] = "REPLY:DESCRIBECONFIGS" , |
80 | [RD_KAFKA_OP_ADMIN_RESULT] = "REPLY:ADMIN_RESULT" , |
81 | [RD_KAFKA_OP_PURGE] = "REPLY:PURGE" , |
82 | [RD_KAFKA_OP_CONNECT] = "REPLY:CONNECT" , |
83 | [RD_KAFKA_OP_OAUTHBEARER_REFRESH] = "REPLY:OAUTHBEARER_REFRESH" |
84 | }; |
85 | |
86 | if (type & RD_KAFKA_OP_REPLY) |
87 | skiplen = 0; |
88 | |
89 | return names[type & ~RD_KAFKA_OP_FLAGMASK]+skiplen; |
90 | } |
91 | |
92 | |
93 | void rd_kafka_op_print (FILE *fp, const char *prefix, rd_kafka_op_t *rko) { |
94 | fprintf(fp, |
95 | "%s((rd_kafka_op_t*)%p)\n" |
96 | "%s Type: %s (0x%x), Version: %" PRId32"\n" , |
97 | prefix, rko, |
98 | prefix, rd_kafka_op2str(rko->rko_type), rko->rko_type, |
99 | rko->rko_version); |
100 | if (rko->rko_err) |
101 | fprintf(fp, "%s Error: %s\n" , |
102 | prefix, rd_kafka_err2str(rko->rko_err)); |
103 | if (rko->rko_replyq.q) |
104 | fprintf(fp, "%s Replyq %p v%d (%s)\n" , |
105 | prefix, rko->rko_replyq.q, rko->rko_replyq.version, |
106 | #if ENABLE_DEVEL |
107 | rko->rko_replyq._id |
108 | #else |
109 | "" |
110 | #endif |
111 | ); |
112 | if (rko->rko_rktp) { |
113 | rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(rko->rko_rktp); |
114 | fprintf(fp, "%s ((rd_kafka_toppar_t*)%p) " |
115 | "%s [%" PRId32"] v%d (shptr %p)\n" , |
116 | prefix, rktp, rktp->rktp_rkt->rkt_topic->str, |
117 | rktp->rktp_partition, |
118 | rd_atomic32_get(&rktp->rktp_version), rko->rko_rktp); |
119 | } |
120 | |
121 | switch (rko->rko_type & ~RD_KAFKA_OP_FLAGMASK) |
122 | { |
123 | case RD_KAFKA_OP_FETCH: |
124 | fprintf(fp, "%s Offset: %" PRId64"\n" , |
125 | prefix, rko->rko_u.fetch.rkm.rkm_offset); |
126 | break; |
127 | case RD_KAFKA_OP_CONSUMER_ERR: |
128 | fprintf(fp, "%s Offset: %" PRId64"\n" , |
129 | prefix, rko->rko_u.err.offset); |
130 | /* FALLTHRU */ |
131 | case RD_KAFKA_OP_ERR: |
132 | fprintf(fp, "%s Reason: %s\n" , prefix, rko->rko_u.err.errstr); |
133 | break; |
134 | case RD_KAFKA_OP_DR: |
135 | fprintf(fp, "%s %" PRId32" messages on %s\n" , prefix, |
136 | rko->rko_u.dr.msgq.rkmq_msg_cnt, |
137 | rko->rko_u.dr.s_rkt ? |
138 | rd_kafka_topic_s2i(rko->rko_u.dr.s_rkt)-> |
139 | rkt_topic->str : "(n/a)" ); |
140 | break; |
141 | case RD_KAFKA_OP_OFFSET_COMMIT: |
142 | fprintf(fp, "%s Callback: %p (opaque %p)\n" , |
143 | prefix, rko->rko_u.offset_commit.cb, |
144 | rko->rko_u.offset_commit.opaque); |
145 | fprintf(fp, "%s %d partitions\n" , |
146 | prefix, |
147 | rko->rko_u.offset_commit.partitions ? |
148 | rko->rko_u.offset_commit.partitions->cnt : 0); |
149 | break; |
150 | |
151 | case RD_KAFKA_OP_LOG: |
152 | fprintf(fp, "%s Log: %%%d %s: %s\n" , |
153 | prefix, rko->rko_u.log.level, |
154 | rko->rko_u.log.fac, |
155 | rko->rko_u.log.str); |
156 | break; |
157 | |
158 | default: |
159 | break; |
160 | } |
161 | } |
162 | |
163 | |
164 | rd_kafka_op_t *rd_kafka_op_new0 (const char *source, rd_kafka_op_type_t type) { |
165 | rd_kafka_op_t *rko; |
166 | static const size_t op2size[RD_KAFKA_OP__END] = { |
167 | [RD_KAFKA_OP_FETCH] = sizeof(rko->rko_u.fetch), |
168 | [RD_KAFKA_OP_ERR] = sizeof(rko->rko_u.err), |
169 | [RD_KAFKA_OP_CONSUMER_ERR] = sizeof(rko->rko_u.err), |
170 | [RD_KAFKA_OP_DR] = sizeof(rko->rko_u.dr), |
171 | [RD_KAFKA_OP_STATS] = sizeof(rko->rko_u.stats), |
172 | [RD_KAFKA_OP_OFFSET_COMMIT] = sizeof(rko->rko_u.offset_commit), |
173 | [RD_KAFKA_OP_NODE_UPDATE] = sizeof(rko->rko_u.node), |
174 | [RD_KAFKA_OP_XMIT_BUF] = sizeof(rko->rko_u.xbuf), |
175 | [RD_KAFKA_OP_RECV_BUF] = sizeof(rko->rko_u.xbuf), |
176 | [RD_KAFKA_OP_XMIT_RETRY] = sizeof(rko->rko_u.xbuf), |
177 | [RD_KAFKA_OP_FETCH_START] = sizeof(rko->rko_u.fetch_start), |
178 | [RD_KAFKA_OP_FETCH_STOP] = 0, |
179 | [RD_KAFKA_OP_SEEK] = sizeof(rko->rko_u.fetch_start), |
180 | [RD_KAFKA_OP_PAUSE] = sizeof(rko->rko_u.pause), |
181 | [RD_KAFKA_OP_OFFSET_FETCH] = sizeof(rko->rko_u.offset_fetch), |
182 | [RD_KAFKA_OP_PARTITION_JOIN] = 0, |
183 | [RD_KAFKA_OP_PARTITION_LEAVE] = 0, |
184 | [RD_KAFKA_OP_REBALANCE] = sizeof(rko->rko_u.rebalance), |
185 | [RD_KAFKA_OP_TERMINATE] = 0, |
186 | [RD_KAFKA_OP_COORD_QUERY] = 0, |
187 | [RD_KAFKA_OP_SUBSCRIBE] = sizeof(rko->rko_u.subscribe), |
188 | [RD_KAFKA_OP_ASSIGN] = sizeof(rko->rko_u.assign), |
189 | [RD_KAFKA_OP_GET_SUBSCRIPTION] = sizeof(rko->rko_u.subscribe), |
190 | [RD_KAFKA_OP_GET_ASSIGNMENT] = sizeof(rko->rko_u.assign), |
191 | [RD_KAFKA_OP_THROTTLE] = sizeof(rko->rko_u.throttle), |
192 | [RD_KAFKA_OP_NAME] = sizeof(rko->rko_u.name), |
193 | [RD_KAFKA_OP_OFFSET_RESET] = sizeof(rko->rko_u.offset_reset), |
194 | [RD_KAFKA_OP_METADATA] = sizeof(rko->rko_u.metadata), |
195 | [RD_KAFKA_OP_LOG] = sizeof(rko->rko_u.log), |
196 | [RD_KAFKA_OP_WAKEUP] = 0, |
197 | [RD_KAFKA_OP_CREATETOPICS] = sizeof(rko->rko_u.admin_request), |
198 | [RD_KAFKA_OP_DELETETOPICS] = sizeof(rko->rko_u.admin_request), |
199 | [RD_KAFKA_OP_CREATEPARTITIONS] = sizeof(rko->rko_u.admin_request), |
200 | [RD_KAFKA_OP_ALTERCONFIGS] = sizeof(rko->rko_u.admin_request), |
201 | [RD_KAFKA_OP_DESCRIBECONFIGS] = sizeof(rko->rko_u.admin_request), |
202 | [RD_KAFKA_OP_ADMIN_RESULT] = sizeof(rko->rko_u.admin_result), |
203 | [RD_KAFKA_OP_PURGE] = sizeof(rko->rko_u.purge), |
204 | [RD_KAFKA_OP_CONNECT] = 0, |
205 | [RD_KAFKA_OP_OAUTHBEARER_REFRESH] = 0, |
206 | }; |
207 | size_t tsize = op2size[type & ~RD_KAFKA_OP_FLAGMASK]; |
208 | |
209 | rko = rd_calloc(1, sizeof(*rko)-sizeof(rko->rko_u)+tsize); |
210 | rko->rko_type = type; |
211 | |
212 | #if ENABLE_DEVEL |
213 | rko->rko_source = source; |
214 | rd_atomic32_add(&rd_kafka_op_cnt, 1); |
215 | #endif |
216 | return rko; |
217 | } |
218 | |
219 | |
220 | void rd_kafka_op_destroy (rd_kafka_op_t *rko) { |
221 | |
222 | switch (rko->rko_type & ~RD_KAFKA_OP_FLAGMASK) |
223 | { |
224 | case RD_KAFKA_OP_FETCH: |
225 | rd_kafka_msg_destroy(NULL, &rko->rko_u.fetch.rkm); |
226 | /* Decrease refcount on rkbuf to eventually rd_free shared buf*/ |
227 | if (rko->rko_u.fetch.rkbuf) |
228 | rd_kafka_buf_handle_op(rko, RD_KAFKA_RESP_ERR__DESTROY); |
229 | |
230 | break; |
231 | |
232 | case RD_KAFKA_OP_OFFSET_FETCH: |
233 | if (rko->rko_u.offset_fetch.partitions && |
234 | rko->rko_u.offset_fetch.do_free) |
235 | rd_kafka_topic_partition_list_destroy( |
236 | rko->rko_u.offset_fetch.partitions); |
237 | break; |
238 | |
239 | case RD_KAFKA_OP_OFFSET_COMMIT: |
240 | RD_IF_FREE(rko->rko_u.offset_commit.partitions, |
241 | rd_kafka_topic_partition_list_destroy); |
242 | RD_IF_FREE(rko->rko_u.offset_commit.reason, rd_free); |
243 | break; |
244 | |
245 | case RD_KAFKA_OP_SUBSCRIBE: |
246 | case RD_KAFKA_OP_GET_SUBSCRIPTION: |
247 | RD_IF_FREE(rko->rko_u.subscribe.topics, |
248 | rd_kafka_topic_partition_list_destroy); |
249 | break; |
250 | |
251 | case RD_KAFKA_OP_ASSIGN: |
252 | case RD_KAFKA_OP_GET_ASSIGNMENT: |
253 | RD_IF_FREE(rko->rko_u.assign.partitions, |
254 | rd_kafka_topic_partition_list_destroy); |
255 | break; |
256 | |
257 | case RD_KAFKA_OP_REBALANCE: |
258 | RD_IF_FREE(rko->rko_u.rebalance.partitions, |
259 | rd_kafka_topic_partition_list_destroy); |
260 | break; |
261 | |
262 | case RD_KAFKA_OP_NAME: |
263 | RD_IF_FREE(rko->rko_u.name.str, rd_free); |
264 | break; |
265 | |
266 | case RD_KAFKA_OP_ERR: |
267 | case RD_KAFKA_OP_CONSUMER_ERR: |
268 | RD_IF_FREE(rko->rko_u.err.errstr, rd_free); |
269 | rd_kafka_msg_destroy(NULL, &rko->rko_u.err.rkm); |
270 | break; |
271 | |
272 | break; |
273 | |
274 | case RD_KAFKA_OP_THROTTLE: |
275 | RD_IF_FREE(rko->rko_u.throttle.nodename, rd_free); |
276 | break; |
277 | |
278 | case RD_KAFKA_OP_STATS: |
279 | RD_IF_FREE(rko->rko_u.stats.json, rd_free); |
280 | break; |
281 | |
282 | case RD_KAFKA_OP_XMIT_RETRY: |
283 | case RD_KAFKA_OP_XMIT_BUF: |
284 | case RD_KAFKA_OP_RECV_BUF: |
285 | if (rko->rko_u.xbuf.rkbuf) |
286 | rd_kafka_buf_handle_op(rko, RD_KAFKA_RESP_ERR__DESTROY); |
287 | |
288 | RD_IF_FREE(rko->rko_u.xbuf.rkbuf, rd_kafka_buf_destroy); |
289 | break; |
290 | |
291 | case RD_KAFKA_OP_DR: |
292 | rd_kafka_msgq_purge(rko->rko_rk, &rko->rko_u.dr.msgq); |
293 | if (rko->rko_u.dr.do_purge2) |
294 | rd_kafka_msgq_purge(rko->rko_rk, &rko->rko_u.dr.msgq2); |
295 | |
296 | if (rko->rko_u.dr.s_rkt) |
297 | rd_kafka_topic_destroy0(rko->rko_u.dr.s_rkt); |
298 | break; |
299 | |
300 | case RD_KAFKA_OP_OFFSET_RESET: |
301 | RD_IF_FREE(rko->rko_u.offset_reset.reason, rd_free); |
302 | break; |
303 | |
304 | case RD_KAFKA_OP_METADATA: |
305 | RD_IF_FREE(rko->rko_u.metadata.md, rd_kafka_metadata_destroy); |
306 | break; |
307 | |
308 | case RD_KAFKA_OP_LOG: |
309 | rd_free(rko->rko_u.log.str); |
310 | break; |
311 | |
312 | case RD_KAFKA_OP_CREATETOPICS: |
313 | case RD_KAFKA_OP_DELETETOPICS: |
314 | case RD_KAFKA_OP_CREATEPARTITIONS: |
315 | case RD_KAFKA_OP_ALTERCONFIGS: |
316 | case RD_KAFKA_OP_DESCRIBECONFIGS: |
317 | rd_kafka_replyq_destroy(&rko->rko_u.admin_request.replyq); |
318 | rd_list_destroy(&rko->rko_u.admin_request.args); |
319 | break; |
320 | |
321 | case RD_KAFKA_OP_ADMIN_RESULT: |
322 | rd_list_destroy(&rko->rko_u.admin_result.results); |
323 | RD_IF_FREE(rko->rko_u.admin_result.errstr, rd_free); |
324 | break; |
325 | |
326 | default: |
327 | break; |
328 | } |
329 | |
330 | if (rko->rko_type & RD_KAFKA_OP_CB && rko->rko_op_cb) { |
331 | rd_kafka_op_res_t res; |
332 | /* Let callback clean up */ |
333 | rko->rko_err = RD_KAFKA_RESP_ERR__DESTROY; |
334 | res = rko->rko_op_cb(rko->rko_rk, NULL, rko); |
335 | rd_assert(res != RD_KAFKA_OP_RES_YIELD); |
336 | rd_assert(res != RD_KAFKA_OP_RES_KEEP); |
337 | } |
338 | |
339 | RD_IF_FREE(rko->rko_rktp, rd_kafka_toppar_destroy); |
340 | |
341 | rd_kafka_replyq_destroy(&rko->rko_replyq); |
342 | |
343 | #if ENABLE_DEVEL |
344 | if (rd_atomic32_sub(&rd_kafka_op_cnt, 1) < 0) |
345 | rd_kafka_assert(NULL, !*"rd_kafka_op_cnt < 0" ); |
346 | #endif |
347 | |
348 | rd_free(rko); |
349 | } |
350 | |
351 | |
352 | |
353 | |
354 | |
355 | |
356 | |
357 | |
358 | |
359 | |
360 | |
361 | /** |
362 | * Propagate an error event to the application on a specific queue. |
363 | * \p optype should be RD_KAFKA_OP_ERR for generic errors and |
364 | * RD_KAFKA_OP_CONSUMER_ERR for consumer errors. |
365 | */ |
366 | void rd_kafka_q_op_err (rd_kafka_q_t *rkq, rd_kafka_op_type_t optype, |
367 | rd_kafka_resp_err_t err, int32_t version, |
368 | rd_kafka_toppar_t *rktp, int64_t offset, |
369 | const char *fmt, ...) { |
370 | va_list ap; |
371 | char buf[2048]; |
372 | rd_kafka_op_t *rko; |
373 | |
374 | va_start(ap, fmt); |
375 | rd_vsnprintf(buf, sizeof(buf), fmt, ap); |
376 | va_end(ap); |
377 | |
378 | rko = rd_kafka_op_new(optype); |
379 | rko->rko_version = version; |
380 | rko->rko_err = err; |
381 | rko->rko_u.err.offset = offset; |
382 | rko->rko_u.err.errstr = rd_strdup(buf); |
383 | if (rktp) |
384 | rko->rko_rktp = rd_kafka_toppar_keep(rktp); |
385 | |
386 | rd_kafka_q_enq(rkq, rko); |
387 | } |
388 | |
389 | |
390 | |
391 | /** |
392 | * Creates a reply opp based on 'rko_orig'. |
393 | * If 'rko_orig' has rko_op_cb set the reply op will be OR:ed with |
394 | * RD_KAFKA_OP_CB, else the reply type will be the original rko_type OR:ed |
395 | * with RD_KAFKA_OP_REPLY. |
396 | */ |
397 | rd_kafka_op_t *rd_kafka_op_new_reply (rd_kafka_op_t *rko_orig, |
398 | rd_kafka_resp_err_t err) { |
399 | rd_kafka_op_t *rko; |
400 | |
401 | rko = rd_kafka_op_new(rko_orig->rko_type | |
402 | (rko_orig->rko_op_cb ? |
403 | RD_KAFKA_OP_CB : RD_KAFKA_OP_REPLY)); |
404 | rd_kafka_op_get_reply_version(rko, rko_orig); |
405 | rko->rko_op_cb = rko_orig->rko_op_cb; |
406 | rko->rko_err = err; |
407 | if (rko_orig->rko_rktp) |
408 | rko->rko_rktp = rd_kafka_toppar_keep( |
409 | rd_kafka_toppar_s2i(rko_orig->rko_rktp)); |
410 | |
411 | return rko; |
412 | } |
413 | |
414 | |
415 | /** |
416 | * @brief Create new callback op for type \p type |
417 | */ |
418 | rd_kafka_op_t *rd_kafka_op_new_cb (rd_kafka_t *rk, |
419 | rd_kafka_op_type_t type, |
420 | rd_kafka_op_cb_t *cb) { |
421 | rd_kafka_op_t *rko; |
422 | rko = rd_kafka_op_new(type | RD_KAFKA_OP_CB); |
423 | rko->rko_op_cb = cb; |
424 | rko->rko_rk = rk; |
425 | return rko; |
426 | } |
427 | |
428 | |
429 | |
430 | /** |
431 | * @brief Reply to 'rko' re-using the same rko. |
432 | * If there is no replyq the rko is destroyed. |
433 | * |
434 | * @returns 1 if op was enqueued, else 0 and rko is destroyed. |
435 | */ |
436 | int rd_kafka_op_reply (rd_kafka_op_t *rko, rd_kafka_resp_err_t err) { |
437 | |
438 | if (!rko->rko_replyq.q) { |
439 | rd_kafka_op_destroy(rko); |
440 | return 0; |
441 | } |
442 | |
443 | rko->rko_type |= (rko->rko_op_cb ? RD_KAFKA_OP_CB : RD_KAFKA_OP_REPLY); |
444 | rko->rko_err = err; |
445 | |
446 | return rd_kafka_replyq_enq(&rko->rko_replyq, rko, 0); |
447 | } |
448 | |
449 | |
450 | /** |
451 | * @brief Send request to queue, wait for response. |
452 | * |
453 | * @returns response on success or NULL if destq is disabled. |
454 | */ |
455 | rd_kafka_op_t *rd_kafka_op_req0 (rd_kafka_q_t *destq, |
456 | rd_kafka_q_t *recvq, |
457 | rd_kafka_op_t *rko, |
458 | int timeout_ms) { |
459 | rd_kafka_op_t *reply; |
460 | |
461 | /* Indicate to destination where to send reply. */ |
462 | rd_kafka_op_set_replyq(rko, recvq, NULL); |
463 | |
464 | /* Enqueue op */ |
465 | if (!rd_kafka_q_enq(destq, rko)) |
466 | return NULL; |
467 | |
468 | /* Wait for reply */ |
469 | reply = rd_kafka_q_pop(recvq, timeout_ms, 0); |
470 | |
471 | /* May be NULL for timeout */ |
472 | return reply; |
473 | } |
474 | |
475 | /** |
476 | * Send request to queue, wait for response. |
477 | * Creates a temporary reply queue. |
478 | */ |
479 | rd_kafka_op_t *rd_kafka_op_req (rd_kafka_q_t *destq, |
480 | rd_kafka_op_t *rko, |
481 | int timeout_ms) { |
482 | rd_kafka_q_t *recvq; |
483 | rd_kafka_op_t *reply; |
484 | |
485 | recvq = rd_kafka_q_new(destq->rkq_rk); |
486 | |
487 | reply = rd_kafka_op_req0(destq, recvq, rko, timeout_ms); |
488 | |
489 | rd_kafka_q_destroy_owner(recvq); |
490 | |
491 | return reply; |
492 | } |
493 | |
494 | |
495 | /** |
496 | * Send simple type-only request to queue, wait for response. |
497 | */ |
498 | rd_kafka_op_t *rd_kafka_op_req2 (rd_kafka_q_t *destq, rd_kafka_op_type_t type) { |
499 | rd_kafka_op_t *rko; |
500 | |
501 | rko = rd_kafka_op_new(type); |
502 | return rd_kafka_op_req(destq, rko, RD_POLL_INFINITE); |
503 | } |
504 | |
505 | /** |
506 | * Destroys the rko and returns its error. |
507 | */ |
508 | rd_kafka_resp_err_t rd_kafka_op_err_destroy (rd_kafka_op_t *rko) { |
509 | rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__TIMED_OUT; |
510 | |
511 | if (rko) { |
512 | err = rko->rko_err; |
513 | rd_kafka_op_destroy(rko); |
514 | } |
515 | return err; |
516 | } |
517 | |
518 | |
519 | /** |
520 | * Call op callback |
521 | */ |
522 | rd_kafka_op_res_t rd_kafka_op_call (rd_kafka_t *rk, rd_kafka_q_t *rkq, |
523 | rd_kafka_op_t *rko) { |
524 | rd_kafka_op_res_t res; |
525 | res = rko->rko_op_cb(rk, rkq, rko); |
526 | if (unlikely(res == RD_KAFKA_OP_RES_YIELD || rd_kafka_yield_thread)) |
527 | return RD_KAFKA_OP_RES_YIELD; |
528 | if (res != RD_KAFKA_OP_RES_KEEP) |
529 | rko->rko_op_cb = NULL; |
530 | return res; |
531 | } |
532 | |
533 | |
534 | /** |
535 | * @brief Creates a new RD_KAFKA_OP_FETCH op and sets up the |
536 | * embedded message according to the parameters. |
537 | * |
538 | * @param rkmp will be set to the embedded rkm in the rko (for convenience) |
539 | * @param offset may be updated later if relative offset. |
540 | */ |
541 | rd_kafka_op_t * |
542 | rd_kafka_op_new_fetch_msg (rd_kafka_msg_t **rkmp, |
543 | rd_kafka_toppar_t *rktp, |
544 | int32_t version, |
545 | rd_kafka_buf_t *rkbuf, |
546 | int64_t offset, |
547 | size_t key_len, const void *key, |
548 | size_t val_len, const void *val) { |
549 | rd_kafka_msg_t *rkm; |
550 | rd_kafka_op_t *rko; |
551 | |
552 | rko = rd_kafka_op_new(RD_KAFKA_OP_FETCH); |
553 | rko->rko_rktp = rd_kafka_toppar_keep(rktp); |
554 | rko->rko_version = version; |
555 | rkm = &rko->rko_u.fetch.rkm; |
556 | *rkmp = rkm; |
557 | |
558 | /* Since all the ops share the same payload buffer |
559 | * a refcnt is used on the rkbuf that makes sure all |
560 | * consume_cb() will have been |
561 | * called for each of these ops before the rkbuf |
562 | * and its memory backing buffers are freed. */ |
563 | rko->rko_u.fetch.rkbuf = rkbuf; |
564 | rd_kafka_buf_keep(rkbuf); |
565 | |
566 | rkm->rkm_offset = offset; |
567 | |
568 | rkm->rkm_key = (void *)key; |
569 | rkm->rkm_key_len = key_len; |
570 | |
571 | rkm->rkm_payload = (void *)val; |
572 | rkm->rkm_len = val_len; |
573 | rko->rko_len = (int32_t)rkm->rkm_len; |
574 | |
575 | rkm->rkm_partition = rktp->rktp_partition; |
576 | |
577 | /* Persistence status is always PERSISTED for consumed messages |
578 | * since we managed to read the message. */ |
579 | rkm->rkm_status = RD_KAFKA_MSG_STATUS_PERSISTED; |
580 | |
581 | return rko; |
582 | } |
583 | |
584 | |
585 | /** |
586 | * Enqueue ERR__THROTTLE op, if desired. |
587 | */ |
588 | void rd_kafka_op_throttle_time (rd_kafka_broker_t *rkb, |
589 | rd_kafka_q_t *rkq, |
590 | int throttle_time) { |
591 | rd_kafka_op_t *rko; |
592 | |
593 | rd_avg_add(&rkb->rkb_avg_throttle, throttle_time); |
594 | |
595 | /* We send throttle events when: |
596 | * - throttle_time > 0 |
597 | * - throttle_time == 0 and last throttle_time > 0 |
598 | */ |
599 | if (!rkb->rkb_rk->rk_conf.throttle_cb || |
600 | (!throttle_time && !rd_atomic32_get(&rkb->rkb_rk->rk_last_throttle))) |
601 | return; |
602 | |
603 | rd_atomic32_set(&rkb->rkb_rk->rk_last_throttle, throttle_time); |
604 | |
605 | rko = rd_kafka_op_new(RD_KAFKA_OP_THROTTLE); |
606 | rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_HIGH); |
607 | rko->rko_u.throttle.nodename = rd_strdup(rkb->rkb_nodename); |
608 | rko->rko_u.throttle.nodeid = rkb->rkb_nodeid; |
609 | rko->rko_u.throttle.throttle_time = throttle_time; |
610 | rd_kafka_q_enq(rkq, rko); |
611 | } |
612 | |
613 | |
614 | /** |
615 | * @brief Handle standard op types. |
616 | */ |
617 | rd_kafka_op_res_t |
618 | rd_kafka_op_handle_std (rd_kafka_t *rk, rd_kafka_q_t *rkq, |
619 | rd_kafka_op_t *rko, int cb_type) { |
620 | if (cb_type == RD_KAFKA_Q_CB_FORCE_RETURN) |
621 | return RD_KAFKA_OP_RES_PASS; |
622 | else if (cb_type != RD_KAFKA_Q_CB_EVENT && |
623 | rko->rko_type & RD_KAFKA_OP_CB) |
624 | return rd_kafka_op_call(rk, rkq, rko); |
625 | else if (rko->rko_type == RD_KAFKA_OP_RECV_BUF) /* Handle Response */ |
626 | rd_kafka_buf_handle_op(rko, rko->rko_err); |
627 | else if (cb_type != RD_KAFKA_Q_CB_RETURN && |
628 | rko->rko_type & RD_KAFKA_OP_REPLY && |
629 | rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) |
630 | return RD_KAFKA_OP_RES_HANDLED; /* dest queue was |
631 | * probably disabled. */ |
632 | else |
633 | return RD_KAFKA_OP_RES_PASS; |
634 | |
635 | return RD_KAFKA_OP_RES_HANDLED; |
636 | } |
637 | |
638 | |
639 | /** |
640 | * @brief Attempt to handle op using its queue's serve callback, |
641 | * or the passed callback, or op_handle_std(), else do nothing. |
642 | * |
643 | * @param rkq is \p rko's queue (which it was unlinked from) with rkq_lock |
644 | * being held. Callback may re-enqueue the op on this queue |
645 | * and return YIELD. |
646 | * |
647 | * @returns HANDLED if op was handled (and destroyed), PASS if not, |
648 | * or YIELD if op was handled (maybe destroyed or re-enqueued) |
649 | * and caller must propagate yield upwards (cancel and return). |
650 | */ |
651 | rd_kafka_op_res_t |
652 | rd_kafka_op_handle (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko, |
653 | rd_kafka_q_cb_type_t cb_type, void *opaque, |
654 | rd_kafka_q_serve_cb_t *callback) { |
655 | rd_kafka_op_res_t res; |
656 | |
657 | res = rd_kafka_op_handle_std(rk, rkq, rko, cb_type); |
658 | if (res == RD_KAFKA_OP_RES_KEEP) { |
659 | /* Op was handled but must not be destroyed. */ |
660 | return res; |
661 | } if (res == RD_KAFKA_OP_RES_HANDLED) { |
662 | rd_kafka_op_destroy(rko); |
663 | return res; |
664 | } else if (unlikely(res == RD_KAFKA_OP_RES_YIELD)) |
665 | return res; |
666 | |
667 | if (rko->rko_serve) { |
668 | callback = rko->rko_serve; |
669 | opaque = rko->rko_serve_opaque; |
670 | rko->rko_serve = NULL; |
671 | rko->rko_serve_opaque = NULL; |
672 | } |
673 | |
674 | if (callback) |
675 | res = callback(rk, rkq, rko, cb_type, opaque); |
676 | |
677 | return res; |
678 | } |
679 | |
680 | |
681 | /** |
682 | * @brief Store offset for fetched message. |
683 | */ |
684 | void rd_kafka_op_offset_store (rd_kafka_t *rk, rd_kafka_op_t *rko, |
685 | const rd_kafka_message_t *rkmessage) { |
686 | rd_kafka_toppar_t *rktp; |
687 | |
688 | if (unlikely(rko->rko_type != RD_KAFKA_OP_FETCH || rko->rko_err)) |
689 | return; |
690 | |
691 | rktp = rd_kafka_toppar_s2i(rko->rko_rktp); |
692 | |
693 | if (unlikely(!rk)) |
694 | rk = rktp->rktp_rkt->rkt_rk; |
695 | |
696 | rd_kafka_toppar_lock(rktp); |
697 | rktp->rktp_app_offset = rkmessage->offset+1; |
698 | if (rk->rk_conf.enable_auto_offset_store) |
699 | rd_kafka_offset_store0(rktp, rkmessage->offset+1, 0/*no lock*/); |
700 | rd_kafka_toppar_unlock(rktp); |
701 | } |
702 | |