1 | /* |
2 | * librdkafka - Apache Kafka C library |
3 | * |
4 | * Copyright (c) 2012,2013 Magnus Edenhill |
5 | * All rights reserved. |
6 | * |
7 | * Redistribution and use in source and binary forms, with or without |
8 | * modification, are permitted provided that the following conditions are met: |
9 | * |
10 | * 1. Redistributions of source code must retain the above copyright notice, |
11 | * this list of conditions and the following disclaimer. |
12 | * 2. Redistributions in binary form must reproduce the above copyright notice, |
13 | * this list of conditions and the following disclaimer in the documentation |
14 | * and/or other materials provided with the distribution. |
15 | * |
16 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
17 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
18 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
19 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
20 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
21 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
22 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
23 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
24 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
25 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
26 | * POSSIBILITY OF SUCH DAMAGE. |
27 | */ |
28 | |
29 | #include "rd.h" |
30 | #include "rdkafka_int.h" |
31 | #include "rdkafka_msg.h" |
32 | #include "rdkafka_topic.h" |
33 | #include "rdkafka_partition.h" |
34 | #include "rdkafka_interceptor.h" |
35 | #include "rdkafka_header.h" |
36 | #include "rdkafka_idempotence.h" |
37 | #include "rdcrc32.h" |
38 | #include "rdmurmur2.h" |
39 | #include "rdrand.h" |
40 | #include "rdtime.h" |
41 | #include "rdsysqueue.h" |
42 | #include "rdunittest.h" |
43 | |
44 | #include <stdarg.h> |
45 | |
46 | void rd_kafka_msg_destroy (rd_kafka_t *rk, rd_kafka_msg_t *rkm) { |
47 | |
48 | if (rkm->rkm_flags & RD_KAFKA_MSG_F_ACCOUNT) { |
49 | rd_dassert(rk || rkm->rkm_rkmessage.rkt); |
50 | rd_kafka_curr_msgs_sub( |
51 | rk ? rk : |
52 | rd_kafka_topic_a2i(rkm->rkm_rkmessage.rkt)->rkt_rk, |
53 | 1, rkm->rkm_len); |
54 | } |
55 | |
56 | if (rkm->rkm_headers) |
57 | rd_kafka_headers_destroy(rkm->rkm_headers); |
58 | |
59 | if (likely(rkm->rkm_rkmessage.rkt != NULL)) |
60 | rd_kafka_topic_destroy0( |
61 | rd_kafka_topic_a2s(rkm->rkm_rkmessage.rkt)); |
62 | |
63 | if (rkm->rkm_flags & RD_KAFKA_MSG_F_FREE && rkm->rkm_payload) |
64 | rd_free(rkm->rkm_payload); |
65 | |
66 | if (rkm->rkm_flags & RD_KAFKA_MSG_F_FREE_RKM) |
67 | rd_free(rkm); |
68 | } |
69 | |
70 | |
71 | |
72 | /** |
73 | * @brief Create a new Producer message, copying the payload as |
74 | * indicated by msgflags. |
75 | * |
76 | * @returns the new message |
77 | */ |
78 | static |
79 | rd_kafka_msg_t *rd_kafka_msg_new00 (rd_kafka_itopic_t *rkt, |
80 | int32_t partition, |
81 | int msgflags, |
82 | char *payload, size_t len, |
83 | const void *key, size_t keylen, |
84 | void *msg_opaque) { |
85 | rd_kafka_msg_t *rkm; |
86 | size_t mlen = sizeof(*rkm); |
87 | char *p; |
88 | |
89 | /* If we are to make a copy of the payload, allocate space for it too */ |
90 | if (msgflags & RD_KAFKA_MSG_F_COPY) { |
91 | msgflags &= ~RD_KAFKA_MSG_F_FREE; |
92 | mlen += len; |
93 | } |
94 | |
95 | mlen += keylen; |
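        /* Single allocation layout (see the copy logic below):
         *   [rd_kafka_msg_t][payload (only if F_COPY)][key] */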
96 | |
97 | /* Note: using rd_malloc here, not rd_calloc, so make sure all fields |
98 | * are properly set up. */ |
99 | rkm = rd_malloc(mlen); |
100 | rkm->rkm_err = 0; |
101 | rkm->rkm_flags = (RD_KAFKA_MSG_F_PRODUCER | |
102 | RD_KAFKA_MSG_F_FREE_RKM | msgflags); |
103 | rkm->rkm_len = len; |
104 | rkm->rkm_opaque = msg_opaque; |
105 | rkm->rkm_rkmessage.rkt = rd_kafka_topic_keep_a(rkt); |
106 | |
107 | rkm->rkm_partition = partition; |
108 | rkm->rkm_offset = RD_KAFKA_OFFSET_INVALID; |
109 | rkm->rkm_timestamp = 0; |
110 | rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE; |
111 | rkm->rkm_status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED; |
112 | rkm->rkm_headers = NULL; |
113 | |
114 | p = (char *)(rkm+1); |
115 | |
116 | if (payload && msgflags & RD_KAFKA_MSG_F_COPY) { |
117 | /* Copy payload to space following the ..msg_t */ |
118 | rkm->rkm_payload = p; |
119 | memcpy(rkm->rkm_payload, payload, len); |
120 | p += len; |
121 | |
122 | } else { |
123 | /* Just point to the provided payload. */ |
124 | rkm->rkm_payload = payload; |
125 | } |
126 | |
127 | if (key) { |
128 | rkm->rkm_key = p; |
129 | rkm->rkm_key_len = keylen; |
130 | memcpy(rkm->rkm_key, key, keylen); |
131 | } else { |
132 | rkm->rkm_key = NULL; |
133 | rkm->rkm_key_len = 0; |
134 | } |
135 | |
136 | |
137 | return rkm; |
138 | } |
139 | |
140 | |
141 | |
142 | |
143 | /** |
144 | * @brief Create a new Producer message. |
145 | * |
146 | * @remark Must only be used by producer code. |
147 | * |
 * Returns the new message on success, or NULL on error, in which case
 * 'errp' (and 'errnop', if non-NULL) are set appropriately.
150 | */ |
151 | static rd_kafka_msg_t *rd_kafka_msg_new0 (rd_kafka_itopic_t *rkt, |
152 | int32_t force_partition, |
153 | int msgflags, |
154 | char *payload, size_t len, |
155 | const void *key, size_t keylen, |
156 | void *msg_opaque, |
157 | rd_kafka_resp_err_t *errp, |
158 | int *errnop, |
159 | rd_kafka_headers_t *hdrs, |
160 | int64_t timestamp, |
161 | rd_ts_t now) { |
162 | rd_kafka_msg_t *rkm; |
163 | size_t hdrs_size = 0; |
164 | |
165 | if (unlikely(!payload)) |
166 | len = 0; |
167 | if (!key) |
168 | keylen = 0; |
169 | if (hdrs) |
170 | hdrs_size = rd_kafka_headers_serialized_size(hdrs); |
171 | |
172 | if (unlikely(len + keylen + hdrs_size > |
173 | (size_t)rkt->rkt_rk->rk_conf.max_msg_size || |
174 | keylen > INT32_MAX)) { |
175 | *errp = RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE; |
176 | if (errnop) |
177 | *errnop = EMSGSIZE; |
178 | return NULL; |
179 | } |
180 | |
181 | if (msgflags & RD_KAFKA_MSG_F_BLOCK) |
182 | *errp = rd_kafka_curr_msgs_add( |
183 | rkt->rkt_rk, 1, len, 1/*block*/, |
184 | (msgflags & RD_KAFKA_MSG_F_RKT_RDLOCKED) ? |
185 | &rkt->rkt_lock : NULL); |
186 | else |
187 | *errp = rd_kafka_curr_msgs_add(rkt->rkt_rk, 1, len, 0, NULL); |
188 | |
189 | if (unlikely(*errp)) { |
190 | if (errnop) |
191 | *errnop = ENOBUFS; |
192 | return NULL; |
193 | } |
194 | |
195 | |
196 | rkm = rd_kafka_msg_new00(rkt, force_partition, |
197 | msgflags|RD_KAFKA_MSG_F_ACCOUNT /* curr_msgs_add() */, |
198 | payload, len, key, keylen, msg_opaque); |
199 | |
200 | memset(&rkm->rkm_u.producer, 0, sizeof(rkm->rkm_u.producer)); |
201 | |
202 | if (timestamp) |
203 | rkm->rkm_timestamp = timestamp; |
204 | else |
205 | rkm->rkm_timestamp = rd_uclock()/1000; |
206 | rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_CREATE_TIME; |
207 | |
208 | if (hdrs) { |
209 | rd_dassert(!rkm->rkm_headers); |
210 | rkm->rkm_headers = hdrs; |
211 | } |
212 | |
213 | rkm->rkm_ts_enq = now; |
214 | |
215 | if (rkt->rkt_conf.message_timeout_ms == 0) { |
216 | rkm->rkm_ts_timeout = INT64_MAX; |
217 | } else { |
218 | rkm->rkm_ts_timeout = now + |
219 | (int64_t) rkt->rkt_conf.message_timeout_ms * 1000; |
220 | } |
221 | |
222 | /* Call interceptor chain for on_send */ |
223 | rd_kafka_interceptors_on_send(rkt->rkt_rk, &rkm->rkm_rkmessage); |
224 | |
225 | return rkm; |
226 | } |
227 | |
228 | |
229 | /** |
 * @brief Produce: creates a new message, runs the partitioner and
 *        enqueues the message on the selected partition.
232 | * |
233 | * @returns 0 on success or -1 on error. |
234 | * |
235 | * If the function returns -1 and RD_KAFKA_MSG_F_FREE was specified, then |
236 | * the memory associated with the payload is still the caller's |
237 | * responsibility. |
238 | * |
239 | * @locks none |
240 | */ |
241 | int rd_kafka_msg_new (rd_kafka_itopic_t *rkt, int32_t force_partition, |
242 | int msgflags, |
243 | char *payload, size_t len, |
244 | const void *key, size_t keylen, |
245 | void *msg_opaque) { |
246 | rd_kafka_msg_t *rkm; |
247 | rd_kafka_resp_err_t err; |
248 | int errnox; |
249 | |
250 | if (unlikely((err = rd_kafka_fatal_error_code(rkt->rkt_rk)))) { |
251 | rd_kafka_set_last_error(err, ECANCELED); |
252 | return -1; |
253 | } |
254 | |
255 | /* Create message */ |
256 | rkm = rd_kafka_msg_new0(rkt, force_partition, msgflags, |
257 | payload, len, key, keylen, msg_opaque, |
258 | &err, &errnox, NULL, 0, rd_clock()); |
259 | if (unlikely(!rkm)) { |
		/* err and errnox were set by msg_new0() */
261 | rd_kafka_set_last_error(err, errnox); |
262 | return -1; |
263 | } |
264 | |
265 | |
266 | /* Partition the message */ |
267 | err = rd_kafka_msg_partitioner(rkt, rkm, 1); |
268 | if (likely(!err)) { |
269 | rd_kafka_set_last_error(0, 0); |
270 | return 0; |
271 | } |
272 | |
273 | /* Interceptor: unroll failing messages by triggering on_ack.. */ |
274 | rkm->rkm_err = err; |
275 | rd_kafka_interceptors_on_acknowledgement(rkt->rkt_rk, |
276 | &rkm->rkm_rkmessage); |
277 | |
278 | /* Handle partitioner failures: it only fails when the application |
279 | * attempts to force a destination partition that does not exist |
280 | * in the cluster. Note we must clear the RD_KAFKA_MSG_F_FREE |
281 | * flag since our contract says we don't free the payload on |
282 | * failure. */ |
283 | |
284 | rkm->rkm_flags &= ~RD_KAFKA_MSG_F_FREE; |
285 | rd_kafka_msg_destroy(rkt->rkt_rk, rkm); |
286 | |
287 | /* Translate error codes to errnos. */ |
288 | if (err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION) |
289 | rd_kafka_set_last_error(err, ESRCH); |
290 | else if (err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC) |
291 | rd_kafka_set_last_error(err, ENOENT); |
292 | else |
293 | rd_kafka_set_last_error(err, EINVAL); /* NOTREACHED */ |
294 | |
295 | return -1; |
296 | } |
297 | |
298 | |
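/**
 * @brief Varargs produce interface.
 *
 * A minimal usage sketch (the topic name, payload and payload_len are
 * application-provided placeholders):
 * @code
 *   rd_kafka_resp_err_t err = rd_kafka_producev(
 *           rk,
 *           RD_KAFKA_V_TOPIC("mytopic"),
 *           RD_KAFKA_V_VALUE(payload, payload_len),
 *           RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
 *           RD_KAFKA_V_END);
 * @endcode
 */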
299 | rd_kafka_resp_err_t rd_kafka_producev (rd_kafka_t *rk, ...) { |
300 | va_list ap; |
301 | rd_kafka_msg_t s_rkm = { |
302 | /* Message defaults */ |
303 | .rkm_partition = RD_KAFKA_PARTITION_UA, |
304 | .rkm_timestamp = 0, /* current time */ |
305 | }; |
306 | rd_kafka_msg_t *rkm = &s_rkm; |
307 | rd_kafka_vtype_t vtype; |
308 | rd_kafka_topic_t *app_rkt; |
309 | shptr_rd_kafka_itopic_t *s_rkt = NULL; |
310 | rd_kafka_itopic_t *rkt; |
311 | rd_kafka_resp_err_t err; |
312 | rd_kafka_headers_t *hdrs = NULL; |
313 | rd_kafka_headers_t *app_hdrs = NULL; /* App-provided headers list */ |
314 | |
315 | if (unlikely((err = rd_kafka_fatal_error_code(rk)))) |
316 | return err; |
317 | |
318 | va_start(ap, rk); |
319 | while (!err && |
320 | (vtype = va_arg(ap, rd_kafka_vtype_t)) != RD_KAFKA_VTYPE_END) { |
321 | switch (vtype) |
322 | { |
323 | case RD_KAFKA_VTYPE_TOPIC: |
324 | s_rkt = rd_kafka_topic_new0(rk, |
325 | va_arg(ap, const char *), |
326 | NULL, NULL, 1); |
327 | break; |
328 | |
329 | case RD_KAFKA_VTYPE_RKT: |
330 | app_rkt = va_arg(ap, rd_kafka_topic_t *); |
331 | s_rkt = rd_kafka_topic_keep( |
332 | rd_kafka_topic_a2i(app_rkt)); |
333 | break; |
334 | |
335 | case RD_KAFKA_VTYPE_PARTITION: |
336 | rkm->rkm_partition = va_arg(ap, int32_t); |
337 | break; |
338 | |
339 | case RD_KAFKA_VTYPE_VALUE: |
340 | rkm->rkm_payload = va_arg(ap, void *); |
341 | rkm->rkm_len = va_arg(ap, size_t); |
342 | break; |
343 | |
344 | case RD_KAFKA_VTYPE_KEY: |
345 | rkm->rkm_key = va_arg(ap, void *); |
346 | rkm->rkm_key_len = va_arg(ap, size_t); |
347 | break; |
348 | |
349 | case RD_KAFKA_VTYPE_OPAQUE: |
350 | rkm->rkm_opaque = va_arg(ap, void *); |
351 | break; |
352 | |
353 | case RD_KAFKA_VTYPE_MSGFLAGS: |
354 | rkm->rkm_flags = va_arg(ap, int); |
355 | break; |
356 | |
357 | case RD_KAFKA_VTYPE_TIMESTAMP: |
358 | rkm->rkm_timestamp = va_arg(ap, int64_t); |
359 | break; |
360 | |
361 | case RD_KAFKA_VTYPE_HEADER: |
362 | { |
363 | const char *name; |
364 | const void *value; |
365 | ssize_t size; |
366 | |
367 | if (unlikely(app_hdrs != NULL)) { |
368 | err = RD_KAFKA_RESP_ERR__CONFLICT; |
369 | break; |
370 | } |
371 | |
372 | if (unlikely(!hdrs)) |
373 | hdrs = rd_kafka_headers_new(8); |
374 | |
375 | name = va_arg(ap, const char *); |
376 | value = va_arg(ap, const void *); |
377 | size = va_arg(ap, ssize_t); |
378 | |
379 | err = rd_kafka_header_add(hdrs, name, -1, value, size); |
380 | } |
381 | break; |
382 | |
383 | case RD_KAFKA_VTYPE_HEADERS: |
384 | if (unlikely(hdrs != NULL)) { |
385 | err = RD_KAFKA_RESP_ERR__CONFLICT; |
386 | break; |
387 | } |
388 | app_hdrs = va_arg(ap, rd_kafka_headers_t *); |
389 | break; |
390 | |
391 | default: |
392 | err = RD_KAFKA_RESP_ERR__INVALID_ARG; |
393 | break; |
394 | } |
395 | } |
396 | |
397 | va_end(ap); |
398 | |
399 | if (unlikely(!s_rkt)) |
400 | return RD_KAFKA_RESP_ERR__INVALID_ARG; |
401 | |
402 | rkt = rd_kafka_topic_s2i(s_rkt); |
403 | |
404 | if (likely(!err)) |
405 | rkm = rd_kafka_msg_new0(rkt, |
406 | rkm->rkm_partition, |
407 | rkm->rkm_flags, |
408 | rkm->rkm_payload, rkm->rkm_len, |
409 | rkm->rkm_key, rkm->rkm_key_len, |
410 | rkm->rkm_opaque, |
411 | &err, NULL, |
412 | app_hdrs ? app_hdrs : hdrs, |
413 | rkm->rkm_timestamp, |
414 | rd_clock()); |
415 | |
416 | if (unlikely(err)) { |
417 | rd_kafka_topic_destroy0(s_rkt); |
418 | if (hdrs) |
419 | rd_kafka_headers_destroy(hdrs); |
420 | return err; |
421 | } |
422 | |
423 | /* Partition the message */ |
424 | err = rd_kafka_msg_partitioner(rkt, rkm, 1); |
425 | if (unlikely(err)) { |
426 | /* Handle partitioner failures: it only fails when |
427 | * the application attempts to force a destination |
428 | * partition that does not exist in the cluster. */ |
429 | |
430 | /* Interceptors: Unroll on_send by on_ack.. */ |
431 | rkm->rkm_err = err; |
432 | rd_kafka_interceptors_on_acknowledgement(rk, |
433 | &rkm->rkm_rkmessage); |
434 | |
435 | /* Note we must clear the RD_KAFKA_MSG_F_FREE |
436 | * flag since our contract says we don't free the payload on |
437 | * failure. */ |
438 | rkm->rkm_flags &= ~RD_KAFKA_MSG_F_FREE; |
439 | |
440 | /* Deassociate application owned headers from message |
441 | * since headers remain in application ownership |
442 | * when producev() fails */ |
443 | if (app_hdrs && app_hdrs == rkm->rkm_headers) |
444 | rkm->rkm_headers = NULL; |
445 | |
446 | rd_kafka_msg_destroy(rk, rkm); |
447 | } |
448 | |
449 | rd_kafka_topic_destroy0(s_rkt); |
450 | |
451 | return err; |
452 | } |
453 | |
454 | |
455 | |
456 | /** |
457 | * @brief Produce a single message. |
458 | * @locality any application thread |
459 | * @locks none |
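 *
 * A minimal usage sketch (rkt, payload and payload_len are
 * application-provided placeholders):
 * @code
 *   if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA,
 *                        RD_KAFKA_MSG_F_COPY,
 *                        payload, payload_len, NULL, 0, NULL) == -1)
 *           fprintf(stderr, "produce failed: %s\n",
 *                   rd_kafka_err2str(rd_kafka_last_error()));
 * @endcode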
460 | */ |
461 | int rd_kafka_produce (rd_kafka_topic_t *rkt, int32_t partition, |
462 | int msgflags, |
463 | void *payload, size_t len, |
464 | const void *key, size_t keylen, |
465 | void *msg_opaque) { |
466 | return rd_kafka_msg_new(rd_kafka_topic_a2i(rkt), partition, |
467 | msgflags, payload, len, |
468 | key, keylen, msg_opaque); |
469 | } |
470 | |
471 | |
472 | |
473 | /** |
474 | * Produce a batch of messages. |
 * Returns the number of messages successfully queued for producing.
476 | * Each message's .err will be set accordingly. |
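 *
 * A minimal usage sketch (buf1/len1 and buf2/len2 are placeholder
 * application buffers):
 * @code
 *   rd_kafka_message_t msgs[2] = {
 *           { .payload = buf1, .len = len1 },
 *           { .payload = buf2, .len = len2 }
 *   };
 *   int queued = rd_kafka_produce_batch(rkt, RD_KAFKA_PARTITION_UA,
 *                                       RD_KAFKA_MSG_F_COPY, msgs, 2);
 *   // On partial failure (queued < 2), inspect each msgs[i].err.
 * @endcode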
477 | */ |
478 | int rd_kafka_produce_batch (rd_kafka_topic_t *app_rkt, int32_t partition, |
479 | int msgflags, |
480 | rd_kafka_message_t *rkmessages, int message_cnt) { |
481 | rd_kafka_msgq_t tmpq = RD_KAFKA_MSGQ_INITIALIZER(tmpq); |
482 | int i; |
483 | int64_t utc_now = rd_uclock() / 1000; |
484 | rd_ts_t now = rd_clock(); |
485 | int good = 0; |
486 | int multiple_partitions = (partition == RD_KAFKA_PARTITION_UA || |
487 | (msgflags & RD_KAFKA_MSG_F_PARTITION)); |
488 | rd_kafka_resp_err_t all_err; |
489 | rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt); |
490 | shptr_rd_kafka_toppar_t *s_rktp = NULL; |
491 | |
492 | /* Propagated per-message below */ |
493 | all_err = rd_kafka_fatal_error_code(rkt->rkt_rk); |
494 | |
495 | rd_kafka_topic_rdlock(rkt); |
496 | if (!multiple_partitions) { |
497 | /* Single partition: look up the rktp once. */ |
498 | s_rktp = rd_kafka_toppar_get_avail(rkt, partition, |
499 | 1/*ua on miss*/, &all_err); |
500 | |
501 | } else { |
502 | /* Indicate to lower-level msg_new..() that rkt is locked |
503 | * so that they may unlock it momentarily if blocking. */ |
504 | msgflags |= RD_KAFKA_MSG_F_RKT_RDLOCKED; |
505 | } |
506 | |
507 | for (i = 0 ; i < message_cnt ; i++) { |
508 | rd_kafka_msg_t *rkm; |
509 | |
510 | /* Propagate error for all messages. */ |
511 | if (unlikely(all_err)) { |
512 | rkmessages[i].err = all_err; |
513 | continue; |
514 | } |
515 | |
516 | /* Create message */ |
517 | rkm = rd_kafka_msg_new0(rkt, |
518 | (msgflags & RD_KAFKA_MSG_F_PARTITION) ? |
519 | rkmessages[i].partition : partition, |
520 | msgflags, |
521 | rkmessages[i].payload, |
522 | rkmessages[i].len, |
523 | rkmessages[i].key, |
524 | rkmessages[i].key_len, |
525 | rkmessages[i]._private, |
526 | &rkmessages[i].err, NULL, |
527 | NULL, utc_now, now); |
528 | if (unlikely(!rkm)) { |
529 | if (rkmessages[i].err == RD_KAFKA_RESP_ERR__QUEUE_FULL) |
530 | all_err = rkmessages[i].err; |
531 | continue; |
532 | } |
533 | |
		/* Three cases here:
		 *  partition==UA:            run the partitioner (slow)
		 *  RD_KAFKA_MSG_F_PARTITION: produce message to the
		 *                            per-message partition
		 *  fixed partition:          enqueue the message directly
		 *                            on that partition's queue */
540 | if (multiple_partitions) { |
541 | if (rkm->rkm_partition == RD_KAFKA_PARTITION_UA) { |
542 | /* Partition the message */ |
543 | rkmessages[i].err = |
544 | rd_kafka_msg_partitioner( |
545 | rkt, rkm, 0/*already locked*/); |
546 | } else { |
547 | if (s_rktp == NULL || |
548 | rkm->rkm_partition != |
549 | rd_kafka_toppar_s2i(s_rktp)-> |
550 | rktp_partition) { |
551 | rd_kafka_resp_err_t err; |
552 | if (s_rktp != NULL) |
553 | rd_kafka_toppar_destroy(s_rktp); |
554 | s_rktp = rd_kafka_toppar_get_avail( |
555 | rkt, rkm->rkm_partition, |
556 | 1/*ua on miss*/, &err); |
557 | |
558 | if (unlikely(!s_rktp)) { |
559 | rkmessages[i].err = err; |
560 | continue; |
561 | } |
562 | } |
563 | rd_kafka_toppar_enq_msg( |
564 | rd_kafka_toppar_s2i(s_rktp), rkm); |
565 | } |
566 | |
567 | if (unlikely(rkmessages[i].err)) { |
568 | /* Interceptors: Unroll on_send by on_ack.. */ |
569 | rd_kafka_interceptors_on_acknowledgement( |
570 | rkt->rkt_rk, &rkmessages[i]); |
571 | |
572 | rd_kafka_msg_destroy(rkt->rkt_rk, rkm); |
573 | continue; |
574 | } |
575 | |
576 | |
577 | } else { |
578 | /* Single destination partition. */ |
579 | rd_kafka_toppar_enq_msg(rd_kafka_toppar_s2i(s_rktp), |
580 | rkm); |
581 | } |
582 | |
583 | rkmessages[i].err = RD_KAFKA_RESP_ERR_NO_ERROR; |
584 | good++; |
585 | } |
586 | |
587 | rd_kafka_topic_rdunlock(rkt); |
588 | if (s_rktp != NULL) |
589 | rd_kafka_toppar_destroy(s_rktp); |
590 | |
591 | return good; |
592 | } |
593 | |
594 | /** |
595 | * @brief Scan \p rkmq for messages that have timed out and remove them from |
596 | * \p rkmq and add to \p timedout queue. |
597 | * |
598 | * @returns the number of messages timed out. |
599 | * |
600 | * @locality any |
601 | * @locks toppar_lock MUST be held |
602 | */ |
603 | int rd_kafka_msgq_age_scan (rd_kafka_toppar_t *rktp, |
604 | rd_kafka_msgq_t *rkmq, |
605 | rd_kafka_msgq_t *timedout, |
606 | rd_ts_t now) { |
607 | rd_kafka_msg_t *rkm, *tmp, *first = NULL; |
608 | int cnt = timedout->rkmq_msg_cnt; |
609 | |
	/* Assume messages are added in time sequential order */
611 | TAILQ_FOREACH_SAFE(rkm, &rkmq->rkmq_msgs, rkm_link, tmp) { |
612 | /* NOTE: this is not true for the deprecated (and soon removed) |
613 | * LIFO queuing strategy. */ |
614 | if (likely(rkm->rkm_ts_timeout > now)) |
615 | break; |
616 | |
617 | if (!first) |
618 | first = rkm; |
619 | |
620 | rd_kafka_msgq_deq(rkmq, rkm, 1); |
621 | rd_kafka_msgq_enq(timedout, rkm); |
622 | } |
623 | |
624 | return timedout->rkmq_msg_cnt - cnt; |
625 | } |
626 | |
627 | |
628 | int |
629 | rd_kafka_msgq_enq_sorted0 (rd_kafka_msgq_t *rkmq, |
630 | rd_kafka_msg_t *rkm, |
631 | int (*order_cmp) (const void *, const void *)) { |
632 | TAILQ_INSERT_SORTED(&rkmq->rkmq_msgs, rkm, rd_kafka_msg_t *, |
633 | rkm_link, order_cmp); |
634 | rkmq->rkmq_msg_bytes += rkm->rkm_len+rkm->rkm_key_len; |
635 | return ++rkmq->rkmq_msg_cnt; |
636 | } |
637 | |
638 | int rd_kafka_msgq_enq_sorted (const rd_kafka_itopic_t *rkt, |
639 | rd_kafka_msgq_t *rkmq, |
640 | rd_kafka_msg_t *rkm) { |
641 | rd_dassert(rkm->rkm_u.producer.msgid != 0); |
642 | return rd_kafka_msgq_enq_sorted0(rkmq, rkm, |
643 | rkt->rkt_conf.msg_order_cmp); |
644 | } |
645 | |
646 | /** |
647 | * @brief Find the insert position (i.e., the previous element) |
648 | * for message \p rkm. |
649 | * |
650 | * @returns the insert position element, or NULL if \p rkm should be |
651 | * added at head of queue. |
652 | */ |
653 | rd_kafka_msg_t *rd_kafka_msgq_find_pos (const rd_kafka_msgq_t *rkmq, |
654 | const rd_kafka_msg_t *rkm, |
655 | int (*cmp) (const void *, |
656 | const void *)) { |
657 | const rd_kafka_msg_t *curr, *last = NULL; |
658 | |
659 | TAILQ_FOREACH(curr, &rkmq->rkmq_msgs, rkm_link) { |
660 | if (cmp(rkm, curr) < 0) |
661 | return (rd_kafka_msg_t *)last; |
662 | last = curr; |
663 | } |
664 | |
665 | return (rd_kafka_msg_t *)last; |
666 | } |
667 | |
668 | |
669 | /** |
670 | * @brief Set per-message metadata for all messages in \p rkmq |
671 | */ |
672 | void rd_kafka_msgq_set_metadata (rd_kafka_msgq_t *rkmq, |
673 | int64_t base_offset, int64_t timestamp, |
674 | rd_kafka_msg_status_t status) { |
675 | rd_kafka_msg_t *rkm; |
676 | |
677 | TAILQ_FOREACH(rkm, &rkmq->rkmq_msgs, rkm_link) { |
678 | rkm->rkm_offset = base_offset++; |
679 | if (timestamp != -1) { |
680 | rkm->rkm_timestamp = timestamp; |
			rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME;
682 | } |
683 | |
684 | /* Don't downgrade a message from any form of PERSISTED |
685 | * to NOT_PERSISTED, since the original cause of indicating |
686 | * PERSISTED can't be changed. |
687 | * E.g., a previous ack or in-flight timeout. */ |
688 | if (unlikely(status == RD_KAFKA_MSG_STATUS_NOT_PERSISTED && |
689 | rkm->rkm_status != RD_KAFKA_MSG_STATUS_NOT_PERSISTED)) |
690 | continue; |
691 | |
692 | rkm->rkm_status = status; |
693 | } |
694 | } |
695 | |
696 | |
697 | /** |
 * @brief Move all messages in \p src to \p dest whose msgid <= last_msgid.
699 | * |
700 | * @remark src must be ordered |
701 | */ |
702 | void rd_kafka_msgq_move_acked (rd_kafka_msgq_t *dest, rd_kafka_msgq_t *src, |
703 | uint64_t last_msgid, |
704 | rd_kafka_msg_status_t status) { |
705 | rd_kafka_msg_t *rkm; |
706 | |
707 | while ((rkm = rd_kafka_msgq_first(src)) && |
708 | rkm->rkm_u.producer.msgid <= last_msgid) { |
709 | rd_kafka_msgq_deq(src, rkm, 1); |
710 | rd_kafka_msgq_enq(dest, rkm); |
711 | |
712 | rkm->rkm_status = status; |
713 | } |
714 | |
715 | rd_kafka_msgq_verify_order(NULL, dest, 0, rd_false); |
716 | rd_kafka_msgq_verify_order(NULL, src, 0, rd_false); |
717 | } |
718 | |
719 | |
720 | |
721 | int32_t rd_kafka_msg_partitioner_random (const rd_kafka_topic_t *rkt, |
722 | const void *key, size_t keylen, |
723 | int32_t partition_cnt, |
724 | void *rkt_opaque, |
725 | void *msg_opaque) { |
726 | int32_t p = rd_jitter(0, partition_cnt-1); |
727 | if (unlikely(!rd_kafka_topic_partition_available(rkt, p))) |
728 | return rd_jitter(0, partition_cnt-1); |
729 | else |
730 | return p; |
731 | } |
732 | |
733 | int32_t rd_kafka_msg_partitioner_consistent (const rd_kafka_topic_t *rkt, |
734 | const void *key, size_t keylen, |
735 | int32_t partition_cnt, |
736 | void *rkt_opaque, |
737 | void *msg_opaque) { |
738 | return rd_crc32(key, keylen) % partition_cnt; |
739 | } |
740 | |
741 | int32_t rd_kafka_msg_partitioner_consistent_random (const rd_kafka_topic_t *rkt, |
742 | const void *key, size_t keylen, |
743 | int32_t partition_cnt, |
744 | void *rkt_opaque, |
745 | void *msg_opaque) { |
746 | if (keylen == 0) |
747 | return rd_kafka_msg_partitioner_random(rkt, |
748 | key, |
749 | keylen, |
750 | partition_cnt, |
751 | rkt_opaque, |
752 | msg_opaque); |
753 | else |
754 | return rd_kafka_msg_partitioner_consistent(rkt, |
755 | key, |
756 | keylen, |
757 | partition_cnt, |
758 | rkt_opaque, |
759 | msg_opaque); |
760 | } |
761 | |
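/**
 * @brief Java Producer compatible murmur2 partitioner.
 *
 * The hash is masked with 0x7fffffff to clear the sign bit (the Java
 * client's toPositive()) so that negative hash values still map to a
 * valid partition.
 */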
762 | int32_t |
763 | rd_kafka_msg_partitioner_murmur2 (const rd_kafka_topic_t *rkt, |
764 | const void *key, size_t keylen, |
765 | int32_t partition_cnt, |
766 | void *rkt_opaque, |
767 | void *msg_opaque) { |
768 | return (rd_murmur2(key, keylen) & 0x7fffffff) % partition_cnt; |
769 | } |
770 | |
771 | int32_t rd_kafka_msg_partitioner_murmur2_random (const rd_kafka_topic_t *rkt, |
772 | const void *key, size_t keylen, |
773 | int32_t partition_cnt, |
774 | void *rkt_opaque, |
775 | void *msg_opaque) { |
776 | if (!key) |
777 | return rd_kafka_msg_partitioner_random(rkt, |
778 | key, |
779 | keylen, |
780 | partition_cnt, |
781 | rkt_opaque, |
782 | msg_opaque); |
783 | else |
784 | return (rd_murmur2(key, keylen) & 0x7fffffff) % partition_cnt; |
785 | } |
786 | |
787 | |
788 | /** |
789 | * Assigns a message to a topic partition using a partitioner. |
790 | * Returns RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION or .._UNKNOWN_TOPIC if |
791 | * partitioning failed, or 0 on success. |
792 | */ |
793 | int rd_kafka_msg_partitioner (rd_kafka_itopic_t *rkt, rd_kafka_msg_t *rkm, |
794 | int do_lock) { |
795 | int32_t partition; |
796 | rd_kafka_toppar_t *rktp_new; |
797 | shptr_rd_kafka_toppar_t *s_rktp_new; |
798 | rd_kafka_resp_err_t err; |
799 | |
800 | if (do_lock) |
801 | rd_kafka_topic_rdlock(rkt); |
802 | |
803 | switch (rkt->rkt_state) |
804 | { |
805 | case RD_KAFKA_TOPIC_S_UNKNOWN: |
806 | /* No metadata received from cluster yet. |
807 | * Put message in UA partition and re-run partitioner when |
808 | * cluster comes up. */ |
809 | partition = RD_KAFKA_PARTITION_UA; |
810 | break; |
811 | |
812 | case RD_KAFKA_TOPIC_S_NOTEXISTS: |
813 | /* Topic not found in cluster. |
814 | * Fail message immediately. */ |
815 | err = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC; |
816 | if (do_lock) |
817 | rd_kafka_topic_rdunlock(rkt); |
818 | return err; |
819 | |
820 | case RD_KAFKA_TOPIC_S_EXISTS: |
821 | /* Topic exists in cluster. */ |
822 | |
823 | /* Topic exists but has no partitions. |
		 * This is usually a transient state following the
825 | * auto-creation of a topic. */ |
826 | if (unlikely(rkt->rkt_partition_cnt == 0)) { |
827 | partition = RD_KAFKA_PARTITION_UA; |
828 | break; |
829 | } |
830 | |
831 | /* Partition not assigned, run partitioner. */ |
832 | if (rkm->rkm_partition == RD_KAFKA_PARTITION_UA) { |
833 | rd_kafka_topic_t *app_rkt; |
834 | /* Provide a temporary app_rkt instance to protect |
835 | * from the case where the application decided to |
836 | * destroy its topic object prior to delivery completion |
837 | * (issue #502). */ |
838 | app_rkt = rd_kafka_topic_keep_a(rkt); |
839 | partition = rkt->rkt_conf. |
840 | partitioner(app_rkt, |
841 | rkm->rkm_key, |
842 | rkm->rkm_key_len, |
843 | rkt->rkt_partition_cnt, |
844 | rkt->rkt_conf.opaque, |
845 | rkm->rkm_opaque); |
846 | rd_kafka_topic_destroy0( |
847 | rd_kafka_topic_a2s(app_rkt)); |
848 | } else |
849 | partition = rkm->rkm_partition; |
850 | |
851 | /* Check that partition exists. */ |
852 | if (partition >= rkt->rkt_partition_cnt) { |
853 | err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION; |
854 | if (do_lock) |
855 | rd_kafka_topic_rdunlock(rkt); |
856 | return err; |
857 | } |
858 | break; |
859 | |
860 | default: |
                rd_kafka_assert(rkt->rkt_rk, !*"NOTREACHED");
862 | break; |
863 | } |
864 | |
865 | /* Get new partition */ |
866 | s_rktp_new = rd_kafka_toppar_get(rkt, partition, 0); |
867 | |
868 | if (unlikely(!s_rktp_new)) { |
869 | /* Unknown topic or partition */ |
870 | if (rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS) |
871 | err = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC; |
872 | else |
873 | err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION; |
874 | |
875 | if (do_lock) |
876 | rd_kafka_topic_rdunlock(rkt); |
877 | |
878 | return err; |
879 | } |
880 | |
881 | rktp_new = rd_kafka_toppar_s2i(s_rktp_new); |
882 | rd_atomic64_add(&rktp_new->rktp_c.producer_enq_msgs, 1); |
883 | |
884 | /* Update message partition */ |
885 | if (rkm->rkm_partition == RD_KAFKA_PARTITION_UA) |
886 | rkm->rkm_partition = partition; |
887 | |
888 | /* Partition is available: enqueue msg on partition's queue */ |
889 | rd_kafka_toppar_enq_msg(rktp_new, rkm); |
890 | if (do_lock) |
891 | rd_kafka_topic_rdunlock(rkt); |
892 | rd_kafka_toppar_destroy(s_rktp_new); /* from _get() */ |
893 | return 0; |
894 | } |
895 | |
896 | |
897 | |
898 | |
899 | /** |
900 | * @name Public message type (rd_kafka_message_t) |
901 | */ |
902 | void rd_kafka_message_destroy (rd_kafka_message_t *rkmessage) { |
903 | rd_kafka_op_t *rko; |
904 | |
905 | if (likely((rko = (rd_kafka_op_t *)rkmessage->_private) != NULL)) |
906 | rd_kafka_op_destroy(rko); |
907 | else { |
908 | rd_kafka_msg_t *rkm = rd_kafka_message2msg(rkmessage); |
909 | rd_kafka_msg_destroy(NULL, rkm); |
910 | } |
911 | } |
912 | |
913 | |
914 | rd_kafka_message_t *rd_kafka_message_new (void) { |
915 | rd_kafka_msg_t *rkm = rd_calloc(1, sizeof(*rkm)); |
916 | return (rd_kafka_message_t *)rkm; |
917 | } |
918 | |
919 | |
920 | /** |
921 | * @brief Set up a rkmessage from an rko for passing to the application. |
922 | * @remark Will trigger on_consume() interceptors if any. |
923 | */ |
924 | static rd_kafka_message_t * |
925 | rd_kafka_message_setup (rd_kafka_op_t *rko, rd_kafka_message_t *rkmessage) { |
926 | rd_kafka_itopic_t *rkt; |
927 | rd_kafka_toppar_t *rktp = NULL; |
928 | |
929 | if (rko->rko_type == RD_KAFKA_OP_DR) { |
930 | rkt = rd_kafka_topic_s2i(rko->rko_u.dr.s_rkt); |
931 | } else { |
932 | if (rko->rko_rktp) { |
933 | rktp = rd_kafka_toppar_s2i(rko->rko_rktp); |
934 | rkt = rktp->rktp_rkt; |
935 | } else |
936 | rkt = NULL; |
937 | |
938 | rkmessage->_private = rko; |
939 | } |
940 | |
941 | |
942 | if (!rkmessage->rkt && rkt) |
943 | rkmessage->rkt = rd_kafka_topic_keep_a(rkt); |
944 | |
945 | if (rktp) |
946 | rkmessage->partition = rktp->rktp_partition; |
947 | |
948 | if (!rkmessage->err) |
949 | rkmessage->err = rko->rko_err; |
950 | |
951 | /* Call on_consume interceptors */ |
952 | switch (rko->rko_type) |
953 | { |
954 | case RD_KAFKA_OP_FETCH: |
955 | if (!rkmessage->err && rkt) |
956 | rd_kafka_interceptors_on_consume(rkt->rkt_rk, |
957 | rkmessage); |
958 | break; |
959 | |
960 | default: |
961 | break; |
962 | } |
963 | |
964 | return rkmessage; |
965 | } |
966 | |
967 | |
968 | |
969 | /** |
970 | * @brief Get rkmessage from rkm (for EVENT_DR) |
971 | * @remark Must only be called just prior to passing a dr to the application. |
972 | */ |
973 | rd_kafka_message_t *rd_kafka_message_get_from_rkm (rd_kafka_op_t *rko, |
974 | rd_kafka_msg_t *rkm) { |
975 | return rd_kafka_message_setup(rko, &rkm->rkm_rkmessage); |
976 | } |
977 | |
978 | /** |
979 | * @brief Convert rko to rkmessage |
980 | * @remark Must only be called just prior to passing a consumed message |
981 | * or event to the application. |
982 | * @remark Will trigger on_consume() interceptors, if any. |
983 | * @returns a rkmessage (bound to the rko). |
984 | */ |
985 | rd_kafka_message_t *rd_kafka_message_get (rd_kafka_op_t *rko) { |
986 | rd_kafka_message_t *rkmessage; |
987 | |
988 | if (!rko) |
989 | return rd_kafka_message_new(); /* empty */ |
990 | |
991 | switch (rko->rko_type) |
992 | { |
993 | case RD_KAFKA_OP_FETCH: |
994 | /* Use embedded rkmessage */ |
995 | rkmessage = &rko->rko_u.fetch.rkm.rkm_rkmessage; |
996 | break; |
997 | |
998 | case RD_KAFKA_OP_ERR: |
999 | case RD_KAFKA_OP_CONSUMER_ERR: |
1000 | rkmessage = &rko->rko_u.err.rkm.rkm_rkmessage; |
1001 | rkmessage->payload = rko->rko_u.err.errstr; |
1002 | rkmessage->len = rkmessage->payload ? |
1003 | strlen(rkmessage->payload) : 0; |
1004 | rkmessage->offset = rko->rko_u.err.offset; |
1005 | break; |
1006 | |
1007 | default: |
                rd_kafka_assert(NULL, !*"unhandled optype");
1009 | RD_NOTREACHED(); |
1010 | return NULL; |
1011 | } |
1012 | |
1013 | return rd_kafka_message_setup(rko, rkmessage); |
1014 | } |
1015 | |
1016 | |
1017 | int64_t rd_kafka_message_timestamp (const rd_kafka_message_t *rkmessage, |
1018 | rd_kafka_timestamp_type_t *tstype) { |
1019 | rd_kafka_msg_t *rkm; |
1020 | |
1021 | if (rkmessage->err) { |
1022 | if (tstype) |
1023 | *tstype = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE; |
1024 | return -1; |
1025 | } |
1026 | |
1027 | rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage); |
1028 | |
1029 | if (tstype) |
1030 | *tstype = rkm->rkm_tstype; |
1031 | |
1032 | return rkm->rkm_timestamp; |
1033 | } |
1034 | |
1035 | |
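/**
 * @brief Returns the produce enqueue-to-now latency in microseconds
 *        (rd_clock() resolution), or -1 if not available.
 */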
1036 | int64_t rd_kafka_message_latency (const rd_kafka_message_t *rkmessage) { |
1037 | rd_kafka_msg_t *rkm; |
1038 | |
1039 | rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage); |
1040 | |
1041 | if (unlikely(!rkm->rkm_ts_enq)) |
1042 | return -1; |
1043 | |
1044 | return rd_clock() - rkm->rkm_ts_enq; |
1045 | } |
1046 | |
1047 | |
1048 | |
1049 | /** |
1050 | * @brief Parse serialized message headers and populate |
1051 | * rkm->rkm_headers (which must be NULL). |
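 *
 * The serialized layout follows the Kafka protocol record Headers:
 * a varint HeaderCount, then for each header a varint KeyLen, the
 * Key bytes, a varint ValueLen (-1 denotes a NULL value) and the
 * Value bytes.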
1052 | */ |
static rd_kafka_resp_err_t rd_kafka_msg_headers_parse (rd_kafka_msg_t *rkm) {
        rd_kafka_buf_t *rkbuf;
        int64_t HeaderCount;
1056 | const int log_decode_errors = 0; |
1057 | rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__BAD_MSG; |
1058 | int i; |
1059 | rd_kafka_headers_t *hdrs = NULL; |
1060 | |
1061 | rd_dassert(!rkm->rkm_headers); |
1062 | |
1063 | if (RD_KAFKAP_BYTES_LEN(&rkm->rkm_u.consumer.binhdrs) == 0) |
1064 | return RD_KAFKA_RESP_ERR__NOENT; |
1065 | |
1066 | rkbuf = rd_kafka_buf_new_shadow(rkm->rkm_u.consumer.binhdrs.data, |
1067 | RD_KAFKAP_BYTES_LEN(&rkm->rkm_u. |
1068 | consumer.binhdrs), |
1069 | NULL); |
1070 | |
1071 | rd_kafka_buf_read_varint(rkbuf, &HeaderCount); |
1072 | |
1073 | if (HeaderCount <= 0) { |
1074 | rd_kafka_buf_destroy(rkbuf); |
1075 | return RD_KAFKA_RESP_ERR__NOENT; |
1076 | } else if (unlikely(HeaderCount > 100000)) { |
1077 | rd_kafka_buf_destroy(rkbuf); |
1078 | return RD_KAFKA_RESP_ERR__BAD_MSG; |
1079 | } |
1080 | |
1081 | hdrs = rd_kafka_headers_new((size_t)HeaderCount); |
1082 | |
1083 | for (i = 0 ; (int64_t)i < HeaderCount ; i++) { |
1084 | int64_t KeyLen, ValueLen; |
1085 | const char *Key, *Value; |
1086 | |
1087 | rd_kafka_buf_read_varint(rkbuf, &KeyLen); |
1088 | rd_kafka_buf_read_ptr(rkbuf, &Key, (size_t)KeyLen); |
1089 | |
1090 | rd_kafka_buf_read_varint(rkbuf, &ValueLen); |
1091 | if (unlikely(ValueLen == -1)) |
1092 | Value = NULL; |
1093 | else |
1094 | rd_kafka_buf_read_ptr(rkbuf, &Value, (size_t)ValueLen); |
1095 | |
1096 | rd_kafka_header_add(hdrs, Key, (ssize_t)KeyLen, |
1097 | Value, (ssize_t)ValueLen); |
1098 | } |
1099 | |
1100 | rkm->rkm_headers = hdrs; |
1101 | |
1102 | rd_kafka_buf_destroy(rkbuf); |
1103 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
1104 | |
1105 | err_parse: |
1106 | err = rkbuf->rkbuf_err; |
1107 | rd_kafka_buf_destroy(rkbuf); |
1108 | if (hdrs) |
1109 | rd_kafka_headers_destroy(hdrs); |
1110 | return err; |
1111 | } |
1112 | |
1113 | |
1114 | |
1115 | |
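/**
 * @brief Get the message's headers, parsing the serialized header data
 *        on first access for consumed messages.
 *
 * A minimal usage sketch ("my-hdr" is a placeholder header name):
 * @code
 *   rd_kafka_headers_t *hdrs;
 *   const void *value;
 *   size_t size;
 *   if (!rd_kafka_message_headers(rkmessage, &hdrs) &&
 *       !rd_kafka_header_get_last(hdrs, "my-hdr", &value, &size))
 *           printf("my-hdr: %.*s\n", (int)size, (const char *)value);
 * @endcode
 */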
rd_kafka_resp_err_t
rd_kafka_message_headers (const rd_kafka_message_t *rkmessage,
                          rd_kafka_headers_t **hdrsp) {
1119 | rd_kafka_msg_t *rkm; |
1120 | rd_kafka_resp_err_t err; |
1121 | |
1122 | rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage); |
1123 | |
1124 | if (rkm->rkm_headers) { |
1125 | *hdrsp = rkm->rkm_headers; |
1126 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
1127 | } |
1128 | |
1129 | /* Producer (rkm_headers will be set if there were any headers) */ |
1130 | if (rkm->rkm_flags & RD_KAFKA_MSG_F_PRODUCER) |
1131 | return RD_KAFKA_RESP_ERR__NOENT; |
1132 | |
1133 | /* Consumer */ |
1134 | |
1135 | /* No previously parsed headers, check if the underlying |
1136 | * protocol message had headers and if so, parse them. */ |
1137 | if (unlikely(!RD_KAFKAP_BYTES_LEN(&rkm->rkm_u.consumer.binhdrs))) |
1138 | return RD_KAFKA_RESP_ERR__NOENT; |
1139 | |
1140 | err = rd_kafka_msg_headers_parse(rkm); |
1141 | if (unlikely(err)) |
1142 | return err; |
1143 | |
1144 | *hdrsp = rkm->rkm_headers; |
1145 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
1146 | } |
1147 | |
1148 | |
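/**
 * @brief Get the message's headers and detach them from the message,
 *        making the application the owner of the headers.
 */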
rd_kafka_resp_err_t
rd_kafka_message_detach_headers (rd_kafka_message_t *rkmessage,
                                 rd_kafka_headers_t **hdrsp) {
1152 | rd_kafka_msg_t *rkm; |
1153 | rd_kafka_resp_err_t err; |
1154 | |
1155 | err = rd_kafka_message_headers(rkmessage, hdrsp); |
1156 | if (err) |
1157 | return err; |
1158 | |
1159 | rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage); |
1160 | rkm->rkm_headers = NULL; |
1161 | |
1162 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
1163 | } |
1164 | |
1165 | |
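/**
 * @brief Replace the message's headers with \p hdrs; any existing
 *        headers are destroyed and ownership of \p hdrs passes to
 *        the message.
 */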
void rd_kafka_message_set_headers (rd_kafka_message_t *rkmessage,
                                   rd_kafka_headers_t *hdrs) {
1168 | rd_kafka_msg_t *rkm; |
1169 | |
1170 | rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage); |
1171 | |
1172 | if (rkm->rkm_headers) { |
1173 | assert(rkm->rkm_headers != hdrs); |
1174 | rd_kafka_headers_destroy(rkm->rkm_headers); |
1175 | } |
1176 | |
1177 | rkm->rkm_headers = hdrs; |
1178 | } |
1179 | |
1180 | |
1181 | |
1182 | rd_kafka_msg_status_t |
1183 | rd_kafka_message_status (const rd_kafka_message_t *rkmessage) { |
1184 | rd_kafka_msg_t *rkm; |
1185 | |
1186 | rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage); |
1187 | |
1188 | return rkm->rkm_status; |
1189 | } |
1190 | |
1191 | |
1192 | void rd_kafka_msgq_dump (FILE *fp, const char *what, rd_kafka_msgq_t *rkmq) { |
1193 | rd_kafka_msg_t *rkm; |
1194 | |
        fprintf(fp, "%s msgq_dump (%d messages, %" PRIusz " bytes):\n", what,
                rd_kafka_msgq_len(rkmq), rd_kafka_msgq_size(rkmq));
        TAILQ_FOREACH(rkm, &rkmq->rkmq_msgs, rkm_link) {
                fprintf(fp, " [%" PRId32 "]@%" PRId64
                        ": rkm msgid %" PRIu64 ": \"%.*s\"\n",
1200 | rkm->rkm_partition, rkm->rkm_offset, |
1201 | rkm->rkm_u.producer.msgid, |
1202 | (int)rkm->rkm_len, (const char *)rkm->rkm_payload); |
1203 | } |
1204 | } |
1205 | |
1206 | |
1207 | |
1208 | |
1209 | /** |
1210 | * @brief Destroy resources associated with msgbatch |
1211 | */ |
1212 | void rd_kafka_msgbatch_destroy (rd_kafka_msgbatch_t *rkmb) { |
1213 | if (rkmb->s_rktp) { |
1214 | rd_kafka_toppar_destroy(rkmb->s_rktp); |
1215 | rkmb->s_rktp = NULL; |
1216 | } |
1217 | |
1218 | rd_assert(RD_KAFKA_MSGQ_EMPTY(&rkmb->msgq)); |
1219 | } |
1220 | |
1221 | |
1222 | /** |
1223 | * @brief Initialize a message batch for the Idempotent Producer. |
1224 | * |
1225 | * @param rkm is the first message in the batch. |
1226 | */ |
1227 | void rd_kafka_msgbatch_init (rd_kafka_msgbatch_t *rkmb, |
1228 | rd_kafka_toppar_t *rktp, |
1229 | rd_kafka_pid_t pid) { |
1230 | memset(rkmb, 0, sizeof(*rkmb)); |
1231 | |
1232 | rkmb->s_rktp = rd_kafka_toppar_keep(rktp); |
1233 | |
1234 | rd_kafka_msgq_init(&rkmb->msgq); |
1235 | |
1236 | rkmb->pid = pid; |
1237 | rkmb->first_seq = -1; |
1238 | } |
1239 | |
1240 | |
1241 | /** |
 * @brief Set the first message in the batch, which is used to set
1243 | * the BaseSequence and keep track of batch reconstruction range. |
1244 | */ |
1245 | void rd_kafka_msgbatch_set_first_msg (rd_kafka_msgbatch_t *rkmb, |
1246 | rd_kafka_msg_t *rkm) { |
1247 | rd_assert(rkmb->first_msgid == 0); |
1248 | |
1249 | if (!rd_kafka_pid_valid(rkmb->pid)) |
1250 | return; |
1251 | |
1252 | rkmb->first_msgid = rkm->rkm_u.producer.msgid; |
1253 | |
1254 | /* Our msgid counter is 64-bits, but the |
1255 | * Kafka protocol's sequence is only 31 (signed), so we'll |
1256 | * need to handle wrapping. */ |
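	/* E.g., msgid (int64_t)INT32_MAX+2 with an epoch base of 0
	 * wraps to sequence 1 (see unittest_msg_seq_wrap() below). */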
1257 | rkmb->first_seq = |
1258 | rd_kafka_seq_wrap(rkm->rkm_u.producer.msgid - |
1259 | rd_kafka_toppar_s2i(rkmb->s_rktp)-> |
1260 | rktp_eos.epoch_base_msgid); |
1261 | |
1262 | /* Check if there is a stored last message |
1263 | * on the first msg, which means an entire |
1264 | * batch of messages are being retried and |
1265 | * we need to maintain the exact messages |
1266 | * of the original batch. |
1267 | * Simply tracking the last message, on |
1268 | * the first message, is sufficient for now. |
1269 | * Will be 0 if not applicable. */ |
1270 | rkmb->last_msgid = rkm->rkm_u.producer.last_msgid; |
1271 | } |
1272 | |
1273 | |
1274 | |
1275 | /** |
1276 | * @brief Message batch is ready to be transmitted. |
1277 | * |
1278 | * @remark This function assumes the batch will be transmitted and increases |
1279 | * the toppar's in-flight count. |
1280 | */ |
1281 | void rd_kafka_msgbatch_ready_produce (rd_kafka_msgbatch_t *rkmb) { |
1282 | rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(rkmb->s_rktp); |
1283 | rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk; |
1284 | |
1285 | /* Keep track of number of requests in-flight per partition, |
1286 | * and the number of partitions with in-flight requests when |
1287 | * idempotent producer - this is used to drain partitions |
1288 | * before resetting the PID. */ |
1289 | if (rd_atomic32_add(&rktp->rktp_msgs_inflight, |
1290 | rd_kafka_msgq_len(&rkmb->msgq)) == |
1291 | rd_kafka_msgq_len(&rkmb->msgq) && |
1292 | rd_kafka_is_idempotent(rk)) |
1293 | rd_kafka_idemp_inflight_toppar_add(rk, rktp); |
1294 | } |
1295 | |
1296 | |
1297 | /** |
1298 | * @brief Verify order (by msgid) in message queue. |
1299 | * For development use only. |
1300 | */ |
1301 | void rd_kafka_msgq_verify_order0 (const char *function, int line, |
1302 | const rd_kafka_toppar_t *rktp, |
1303 | const rd_kafka_msgq_t *rkmq, |
1304 | uint64_t exp_first_msgid, |
1305 | rd_bool_t gapless) { |
1306 | const rd_kafka_msg_t *rkm; |
1307 | uint64_t exp; |
1308 | int errcnt = 0; |
1309 | int cnt = 0; |
        const char *topic = rktp ? rktp->rktp_rkt->rkt_topic->str : "n/a";
1311 | int32_t partition = rktp ? rktp->rktp_partition : -1; |
1312 | |
1313 | if (rd_kafka_msgq_len(rkmq) == 0) |
1314 | return; |
1315 | |
1316 | if (exp_first_msgid) |
1317 | exp = exp_first_msgid; |
1318 | else { |
1319 | exp = rd_kafka_msgq_first(rkmq)->rkm_u.producer.msgid; |
1320 | if (exp == 0) /* message without msgid (e.g., UA partition) */ |
1321 | return; |
1322 | } |
1323 | |
1324 | TAILQ_FOREACH(rkm, &rkmq->rkmq_msgs, rkm_link) { |
1325 | #if 0 |
                printf("%s:%d: %s [%" PRId32 "]: rkm #%d (%p) "
                       "msgid %" PRIu64 "\n",
1328 | function, line, |
1329 | topic, partition, |
1330 | cnt, rkm, rkm->rkm_u.producer.msgid); |
1331 | #endif |
1332 | if (gapless && |
1333 | rkm->rkm_u.producer.msgid != exp) { |
                        printf("%s:%d: %s [%" PRId32 "]: rkm #%d (%p) "
                               "msgid %" PRIu64 ": "
                               "expected msgid %" PRIu64 "\n",
1337 | function, line, |
1338 | topic, partition, |
1339 | cnt, rkm, rkm->rkm_u.producer.msgid, |
1340 | exp); |
1341 | errcnt++; |
1342 | } else if (!gapless && rkm->rkm_u.producer.msgid < exp) { |
                        printf("%s:%d: %s [%" PRId32 "]: rkm #%d (%p) "
                               "msgid %" PRIu64 ": "
                               "expected increased msgid >= %" PRIu64 "\n",
1346 | function, line, |
1347 | topic, partition, |
1348 | cnt, rkm, rkm->rkm_u.producer.msgid, |
1349 | exp); |
1350 | errcnt++; |
1351 | } else |
1352 | exp++; |
1353 | |
1354 | cnt++; |
1355 | } |
1356 | |
1357 | rd_assert(!errcnt); |
1358 | } |
1359 | |
1360 | |
1361 | |
1362 | /** |
1363 | * @name Unit tests |
1364 | */ |
1365 | |
1366 | /** |
1367 | * @brief Unittest: message allocator |
1368 | */ |
1369 | rd_kafka_msg_t *ut_rd_kafka_msg_new (void) { |
1370 | rd_kafka_msg_t *rkm; |
1371 | |
1372 | rkm = rd_calloc(1, sizeof(*rkm)); |
1373 | rkm->rkm_flags = RD_KAFKA_MSG_F_FREE_RKM; |
1374 | rkm->rkm_offset = RD_KAFKA_OFFSET_INVALID; |
1375 | rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE; |
1376 | |
1377 | return rkm; |
1378 | } |
1379 | |
1380 | |
1381 | |
1382 | /** |
1383 | * @brief Unittest: destroy all messages in queue |
1384 | */ |
1385 | void ut_rd_kafka_msgq_purge (rd_kafka_msgq_t *rkmq) { |
1386 | rd_kafka_msg_t *rkm, *tmp; |
1387 | |
1388 | TAILQ_FOREACH_SAFE(rkm, &rkmq->rkmq_msgs, rkm_link, tmp) |
1389 | rd_kafka_msg_destroy(NULL, rkm); |
1390 | |
1391 | |
1392 | rd_kafka_msgq_init(rkmq); |
1393 | } |
1394 | |
1395 | |
1396 | |
1397 | static int ut_verify_msgq_order (const char *what, |
1398 | const rd_kafka_msgq_t *rkmq, |
1399 | int first, int last) { |
1400 | const rd_kafka_msg_t *rkm; |
1401 | uint64_t expected = first; |
1402 | int incr = first < last ? +1 : -1; |
1403 | int fails = 0; |
1404 | int cnt = 0; |
1405 | |
1406 | TAILQ_FOREACH(rkm, &rkmq->rkmq_msgs, rkm_link) { |
1407 | if (rkm->rkm_u.producer.msgid != expected) { |
                        RD_UT_SAY("%s: expected msgid %" PRIu64
                                  " not %" PRIu64 " at index #%d",
1410 | what, expected, |
1411 | rkm->rkm_u.producer.msgid, cnt); |
1412 | fails++; |
1413 | } |
1414 | cnt++; |
1415 | expected += incr; |
1416 | } |
1417 | |
        RD_UT_ASSERT(!fails, "See %d previous failure(s)", fails);
1419 | return fails; |
1420 | } |
1421 | |
1422 | /** |
1423 | * @brief Verify ordering comparator for message queues. |
1424 | */ |
1425 | static int unittest_msgq_order (const char *what, int fifo, |
1426 | int (*cmp) (const void *, const void *)) { |
1427 | rd_kafka_msgq_t rkmq = RD_KAFKA_MSGQ_INITIALIZER(rkmq); |
1428 | rd_kafka_msg_t *rkm; |
1429 | rd_kafka_msgq_t sendq, sendq2; |
1430 | int i; |
1431 | |
        RD_UT_SAY("%s: testing in %s mode", what, fifo ? "FIFO" : "LIFO");
1433 | |
1434 | for (i = 1 ; i <= 6 ; i++) { |
1435 | rkm = ut_rd_kafka_msg_new(); |
1436 | rkm->rkm_u.producer.msgid = i; |
1437 | rd_kafka_msgq_enq_sorted0(&rkmq, rkm, cmp); |
1438 | } |
1439 | |
        if (fifo) {
                if (ut_verify_msgq_order("added", &rkmq, 1, 6))
                        return 1;
        } else {
                if (ut_verify_msgq_order("added", &rkmq, 6, 1))
                        return 1;
        }
1447 | |
1448 | /* Move 3 messages to "send" queue which we then re-insert |
1449 | * in the original queue (i.e., "retry"). */ |
1450 | rd_kafka_msgq_init(&sendq); |
1451 | while (rd_kafka_msgq_len(&sendq) < 3) |
1452 | rd_kafka_msgq_enq(&sendq, rd_kafka_msgq_pop(&rkmq)); |
1453 | |
        if (fifo) {
                if (ut_verify_msgq_order("send removed", &rkmq, 4, 6))
                        return 1;

                if (ut_verify_msgq_order("sendq", &sendq, 1, 3))
                        return 1;
        } else {
                if (ut_verify_msgq_order("send removed", &rkmq, 3, 1))
                        return 1;

                if (ut_verify_msgq_order("sendq", &sendq, 6, 4))
                        return 1;
        }
1467 | |
1468 | /* Retry the messages, which moves them back to sendq |
1469 | * maintaining the original order */ |
1470 | rd_kafka_retry_msgq(&rkmq, &sendq, 1, 1, 0, |
1471 | RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp); |
1472 | |
        RD_UT_ASSERT(rd_kafka_msgq_len(&sendq) == 0,
                     "sendq FIFO should be empty, not contain %d messages",
                     rd_kafka_msgq_len(&sendq));
1476 | |
        if (fifo) {
                if (ut_verify_msgq_order("readded", &rkmq, 1, 6))
                        return 1;
        } else {
                if (ut_verify_msgq_order("readded", &rkmq, 6, 1))
                        return 1;
        }
1484 | |
        /* Move the first 4 messages to the "send" queue, then
         * retry them with max_retries=1, which should now fail for
         * the first 3 messages that were already retried. */
1488 | rd_kafka_msgq_init(&sendq); |
1489 | while (rd_kafka_msgq_len(&sendq) < 4) |
1490 | rd_kafka_msgq_enq(&sendq, rd_kafka_msgq_pop(&rkmq)); |
1491 | |
        if (fifo) {
                if (ut_verify_msgq_order("send removed #2", &rkmq, 5, 6))
                        return 1;

                if (ut_verify_msgq_order("sendq #2", &sendq, 1, 4))
                        return 1;
        } else {
                if (ut_verify_msgq_order("send removed #2", &rkmq, 2, 1))
                        return 1;

                if (ut_verify_msgq_order("sendq #2", &sendq, 6, 3))
                        return 1;
        }
1505 | |
1506 | /* Retry the messages, which should now keep the 3 first messages |
1507 | * on sendq (no more retries) and just number 4 moved back. */ |
1508 | rd_kafka_retry_msgq(&rkmq, &sendq, 1, 1, 0, |
1509 | RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp); |
1510 | |
        if (fifo) {
                if (ut_verify_msgq_order("readded #2", &rkmq, 4, 6))
                        return 1;

                if (ut_verify_msgq_order("no more retries", &sendq, 1, 3))
                        return 1;

        } else {
                if (ut_verify_msgq_order("readded #2", &rkmq, 3, 1))
                        return 1;

                if (ut_verify_msgq_order("no more retries", &sendq, 6, 4))
                        return 1;
        }
1525 | |
1526 | /* Move all messages back on rkmq */ |
1527 | rd_kafka_retry_msgq(&rkmq, &sendq, 0, 1000, 0, |
1528 | RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp); |
1529 | |
1530 | |
        /* Move the first half of the messages to sendq (1,2,3).
         * Move the second half of the messages to sendq2 (4,5,6).
         * Add a new message to rkmq (7).
         * Move the first half back to rkmq (1,2,3,7).
         * Move the second half back to rkmq (1,2,3,4,5,6,7). */
1536 | rd_kafka_msgq_init(&sendq); |
1537 | rd_kafka_msgq_init(&sendq2); |
1538 | |
1539 | while (rd_kafka_msgq_len(&sendq) < 3) |
1540 | rd_kafka_msgq_enq(&sendq, rd_kafka_msgq_pop(&rkmq)); |
1541 | |
1542 | while (rd_kafka_msgq_len(&sendq2) < 3) |
1543 | rd_kafka_msgq_enq(&sendq2, rd_kafka_msgq_pop(&rkmq)); |
1544 | |
1545 | rkm = ut_rd_kafka_msg_new(); |
1546 | rkm->rkm_u.producer.msgid = i; |
1547 | rd_kafka_msgq_enq_sorted0(&rkmq, rkm, cmp); |
1548 | |
1549 | rd_kafka_retry_msgq(&rkmq, &sendq, 0, 1000, 0, |
1550 | RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp); |
1551 | rd_kafka_retry_msgq(&rkmq, &sendq2, 0, 1000, 0, |
1552 | RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp); |
1553 | |
        RD_UT_ASSERT(rd_kafka_msgq_len(&sendq) == 0,
                     "sendq FIFO should be empty, not contain %d messages",
                     rd_kafka_msgq_len(&sendq));
        RD_UT_ASSERT(rd_kafka_msgq_len(&sendq2) == 0,
                     "sendq2 FIFO should be empty, not contain %d messages",
                     rd_kafka_msgq_len(&sendq2));
1560 | |
        if (fifo) {
                if (ut_verify_msgq_order("inject", &rkmq, 1, 7))
                        return 1;
        } else {
                if (ut_verify_msgq_order("readded #2", &rkmq, 7, 1))
                        return 1;
        }
1568 | |
1569 | |
1570 | ut_rd_kafka_msgq_purge(&sendq); |
1571 | ut_rd_kafka_msgq_purge(&sendq2); |
1572 | ut_rd_kafka_msgq_purge(&rkmq); |
1573 | |
1574 | return 0; |
1575 | |
1576 | } |
1577 | |
1578 | /** |
1579 | * @brief Verify that rd_kafka_seq_wrap() works. |
1580 | */ |
1581 | static int unittest_msg_seq_wrap (void) { |
1582 | static const struct exp { |
1583 | int64_t in; |
1584 | int32_t out; |
1585 | } exp[] = { |
1586 | { 0, 0 }, |
1587 | { 1, 1 }, |
1588 | { (int64_t)INT32_MAX+2, 1 }, |
1589 | { (int64_t)INT32_MAX+1, 0 }, |
1590 | { INT32_MAX, INT32_MAX }, |
1591 | { INT32_MAX-1, INT32_MAX-1 }, |
1592 | { INT32_MAX-2, INT32_MAX-2 }, |
1593 | { ((int64_t)1<<33)-2, INT32_MAX-1 }, |
1594 | { ((int64_t)1<<33)-1, INT32_MAX }, |
1595 | { ((int64_t)1<<34), 0 }, |
1596 | { ((int64_t)1<<35)+3, 3 }, |
1597 | { 1710+1229, 2939 }, |
1598 | { -1, -1 }, |
1599 | }; |
1600 | int i; |
1601 | |
1602 | for (i = 0 ; exp[i].in != -1 ; i++) { |
1603 | int32_t wseq = rd_kafka_seq_wrap(exp[i].in); |
1604 | RD_UT_ASSERT(wseq == exp[i].out, |
1605 | "Expected seq_wrap(%" PRId64") -> %" PRId32 |
1606 | ", not %" PRId32, |
1607 | exp[i].in, exp[i].out, wseq); |
1608 | } |
1609 | |
1610 | RD_UT_PASS(); |
1611 | } |
1612 | |
1613 | int unittest_msg (void) { |
1614 | int fails = 0; |
1615 | |
        fails += unittest_msgq_order("FIFO", 1, rd_kafka_msg_cmp_msgid);
1617 | fails += unittest_msg_seq_wrap(); |
1618 | |
1619 | return fails; |
1620 | } |
1621 | |