1/*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2017 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29/**
30 * @name MessageSet reader interface
31 *
32 * Parses FetchResponse for Messages
33 *
34 *
35 * @remark
 * The broker may send partial messages; when this happens we bail out
 * silently and keep the messages that we successfully parsed.
38 *
39 * "A Guide To The Kafka Protocol" states:
40 * "As an optimization the server is allowed to
41 * return a partial message at the end of the
42 * message set.
43 * Clients should handle this case."
44 *
45 * We're handling it by not passing the error upstream.
46 * This is why most err_parse: goto labels (that are called from buf parsing
47 * macros) suppress the error message and why log_decode_errors is off
48 * unless PROTOCOL debugging is enabled.
49 *
50 * When a FetchResponse contains multiple partitions, each partition's
51 * MessageSet may be partial, regardless of the other partitions.
52 * To make sure the next partition can be parsed, each partition parse
53 * uses its own sub-slice of only that partition's MessageSetSize length.
54 */
55
56#include "rd.h"
57#include "rdkafka_int.h"
58#include "rdkafka_msg.h"
59#include "rdkafka_msgset.h"
60#include "rdkafka_topic.h"
61#include "rdkafka_partition.h"
62#include "rdkafka_header.h"
63#include "rdkafka_lz4.h"
64
65#include "rdvarint.h"
66#include "crc32c.h"
67
68#if WITH_ZLIB
69#include "rdgz.h"
70#endif
71#if WITH_SNAPPY
72#include "snappy.h"
73#endif
74#if WITH_ZSTD
75#include "rdkafka_zstd.h"
76#endif
77
78
/**
 * @brief MessageSet v2 header fields, in the order they are read from the
 *        wire by rd_kafka_msgset_reader_v2().
 */
struct msgset_v2_hdr {
        int64_t BaseOffset;           /**< Offset of the first message;
                                       *   per-message offsets are
                                       *   BaseOffset + OffsetDelta. */
        int32_t Length;               /**< Number of bytes following the
                                       *   Length field. */
        int32_t PartitionLeaderEpoch;
        int8_t MagicByte;             /**< MsgVersion */
        int32_t Crc;                  /**< CRC32C of the bytes following
                                       *   the Crc field. */
        int16_t Attributes;           /**< Compression codec, control and
                                       *   timestamp type flags. */
        int32_t LastOffsetDelta;      /**< Last offset =
                                       *   BaseOffset + LastOffsetDelta */
        int64_t BaseTimestamp;        /**< CreateTime base: per-message
                                       *   timestamp = BaseTimestamp +
                                       *   TimestampDelta. */
        int64_t MaxTimestamp;         /**< Used as the timestamp of all
                                       *   messages for LOG_APPEND_TIME. */
        int64_t PID;
        int16_t ProducerEpoch;
        int32_t BaseSequence;
        int32_t RecordCount;
};
94
95
/**
 * @brief MessageSet reader state.
 *
 * Set up with rd_kafka_msgset_reader_init(). A separate (inner) reader is
 * created for the decompressed inner MessageSet of compressed
 * MsgVersion v0..1 messages.
 */
typedef struct rd_kafka_msgset_reader_s {
        rd_kafka_buf_t *msetr_rkbuf;  /**< Response read buffer */

        int msetr_relative_offsets;   /**< Bool: using relative offsets */

        /** Outer/wrapper Message fields. */
        struct {
                int64_t offset;       /**< Relative_offsets: outer message's
                                       *   Offset (last offset) */
                rd_kafka_timestamp_type_t tstype; /**< Compressed
                                                   *   MessageSet's
                                                   *   timestamp type. */
                int64_t timestamp;    /**< ... timestamp*/
        } msetr_outer;

        struct msgset_v2_hdr *msetr_v2_hdr; /**< MessageSet v2 header.
                                             *   Points to the header on the
                                             *   stack of
                                             *   rd_kafka_msgset_reader_v2()
                                             *   while it reads messages,
                                             *   NULL otherwise. */

        const struct rd_kafka_toppar_ver *msetr_tver; /**< Toppar op version of
                                                       *   request. */

        rd_kafka_broker_t *msetr_rkb;  /* @warning Not a refcounted
                                        *          reference! */
        rd_kafka_toppar_t *msetr_rktp; /* @warning Not a refcounted
                                        *          reference! */

        int msetr_msgcnt;              /**< Number of messages in rkq */
        int64_t msetr_msg_bytes;       /**< Number of bytes in rkq
                                        *   (sum of key + value lengths) */
        rd_kafka_q_t msetr_rkq;        /**< Temp Message and error queue */
        rd_kafka_q_t *msetr_par_rkq;   /**< Parent message and error queue,
                                        *   the temp msetr_rkq will be moved
                                        *   to this queue when parsing
                                        *   is done.
                                        *   Refcount is not increased. */

        int64_t msetr_next_offset;     /**< Next offset to fetch after
                                        *   this reader run is done.
                                        *   Optional: only used for special
                                        *   cases where the per-message offset
                                        *   can't be relied on for next
                                        *   fetch offset, such as with
                                        *   compacted topics. */

        int msetr_ctrl_cnt;            /**< Number of control messages
                                        *   or MessageSets received. */

        const char *msetr_srcname;     /**< Optional message source string,
                                        *   used in debug logging to
                                        *   indicate messages were
                                        *   from an inner compressed
                                        *   message set.
                                        *   Not freed (use const memory).
                                        *   Add trailing space. */
} rd_kafka_msgset_reader_t;
149
150
151
/* Forward declarations: rd_kafka_msgset_reader_decompress() recurses into
 * these readers to parse the decompressed payload. */
static rd_kafka_resp_err_t
rd_kafka_msgset_reader_run (rd_kafka_msgset_reader_t *msetr);
static rd_kafka_resp_err_t
rd_kafka_msgset_reader_msgs_v2 (rd_kafka_msgset_reader_t *msetr);
157
158
159/**
160 * @brief Set up a MessageSet reader but don't start reading messages.
161 */
162static void
163rd_kafka_msgset_reader_init (rd_kafka_msgset_reader_t *msetr,
164 rd_kafka_buf_t *rkbuf,
165 rd_kafka_toppar_t *rktp,
166 const struct rd_kafka_toppar_ver *tver,
167 rd_kafka_q_t *par_rkq) {
168
169 memset(msetr, 0, sizeof(*msetr));
170
171 msetr->msetr_rkb = rkbuf->rkbuf_rkb;
172 msetr->msetr_rktp = rktp;
173 msetr->msetr_tver = tver;
174 msetr->msetr_rkbuf = rkbuf;
175 msetr->msetr_srcname = "";
176
177 rkbuf->rkbuf_uflow_mitigation = "truncated response from broker (ok)";
178
179 /* All parsed messages are put on this temporary op
180 * queue first and then moved in one go to the real op queue. */
181 rd_kafka_q_init(&msetr->msetr_rkq, msetr->msetr_rkb->rkb_rk);
182
183 /* Make sure enqueued ops get the correct serve/opaque reflecting the
184 * original queue. */
185 msetr->msetr_rkq.rkq_serve = par_rkq->rkq_serve;
186 msetr->msetr_rkq.rkq_opaque = par_rkq->rkq_opaque;
187
188 /* Keep (non-refcounted) reference to parent queue for
189 * moving the messages and events in msetr_rkq to when
190 * parsing is done. */
191 msetr->msetr_par_rkq = par_rkq;
192}
193
194
195
196
197
198/**
199 * @brief Decompress MessageSet, pass the uncompressed MessageSet to
200 * the MessageSet reader.
201 */
202static rd_kafka_resp_err_t
203rd_kafka_msgset_reader_decompress (rd_kafka_msgset_reader_t *msetr,
204 int MsgVersion, int Attributes,
205 int64_t Timestamp, int64_t Offset,
206 const void *compressed,
207 size_t compressed_size) {
208 struct iovec iov = { .iov_base = NULL, .iov_len = 0 };
209 rd_kafka_toppar_t *rktp = msetr->msetr_rktp;
210 int codec = Attributes & RD_KAFKA_MSG_ATTR_COMPRESSION_MASK;
211 rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
212 rd_kafka_buf_t *rkbufz;
213
214 switch (codec)
215 {
216#if WITH_ZLIB
217 case RD_KAFKA_COMPRESSION_GZIP:
218 {
219 uint64_t outlenx = 0;
220
221 /* Decompress Message payload */
222 iov.iov_base = rd_gz_decompress(compressed, (int)compressed_size,
223 &outlenx);
224 if (unlikely(!iov.iov_base)) {
225 rd_rkb_dbg(msetr->msetr_rkb, MSG, "GZIP",
226 "Failed to decompress Gzip "
227 "message at offset %"PRId64
228 " of %"PRIusz" bytes: "
229 "ignoring message",
230 Offset, compressed_size);
231 err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
232 goto err;
233 }
234
235 iov.iov_len = (size_t)outlenx;
236 }
237 break;
238#endif
239
240#if WITH_SNAPPY
241 case RD_KAFKA_COMPRESSION_SNAPPY:
242 {
243 const char *inbuf = compressed;
244 size_t inlen = compressed_size;
245 int r;
246 static const unsigned char snappy_java_magic[] =
247 { 0x82, 'S','N','A','P','P','Y', 0 };
248 static const size_t snappy_java_hdrlen = 8+4+4;
249
250 /* snappy-java adds its own header (SnappyCodec)
251 * which is not compatible with the official Snappy
252 * implementation.
253 * 8: magic, 4: version, 4: compatible
254 * followed by any number of chunks:
255 * 4: length
256 * ...: snappy-compressed data. */
257 if (likely(inlen > snappy_java_hdrlen + 4 &&
258 !memcmp(inbuf, snappy_java_magic, 8))) {
259 /* snappy-java framing */
260 char errstr[128];
261
262 inbuf = inbuf + snappy_java_hdrlen;
263 inlen -= snappy_java_hdrlen;
264 iov.iov_base = rd_kafka_snappy_java_uncompress(
265 inbuf, inlen,
266 &iov.iov_len,
267 errstr, sizeof(errstr));
268
269 if (unlikely(!iov.iov_base)) {
270 rd_rkb_dbg(msetr->msetr_rkb, MSG, "SNAPPY",
271 "%s [%"PRId32"]: "
272 "Snappy decompression for message "
273 "at offset %"PRId64" failed: %s: "
274 "ignoring message",
275 rktp->rktp_rkt->rkt_topic->str,
276 rktp->rktp_partition,
277 Offset, errstr);
278 err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
279 goto err;
280 }
281
282
283 } else {
284 /* No framing */
285
286 /* Acquire uncompressed length */
287 if (unlikely(!rd_kafka_snappy_uncompressed_length(
288 inbuf, inlen, &iov.iov_len))) {
289 rd_rkb_dbg(msetr->msetr_rkb, MSG, "SNAPPY",
290 "Failed to get length of Snappy "
291 "compressed payload "
292 "for message at offset %"PRId64
293 " (%"PRIusz" bytes): "
294 "ignoring message",
295 Offset, inlen);
296 err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
297 goto err;
298 }
299
300 /* Allocate output buffer for uncompressed data */
301 iov.iov_base = rd_malloc(iov.iov_len);
302 if (unlikely(!iov.iov_base)) {
303 rd_rkb_dbg(msetr->msetr_rkb, MSG, "SNAPPY",
304 "Failed to allocate Snappy "
305 "decompress buffer of size %"PRIusz
306 "for message at offset %"PRId64
307 " (%"PRIusz" bytes): %s: "
308 "ignoring message",
309 iov.iov_len, Offset, inlen,
310 rd_strerror(errno));
311 err = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
312 goto err;
313 }
314
315 /* Uncompress to outbuf */
316 if (unlikely((r = rd_kafka_snappy_uncompress(
317 inbuf, inlen, iov.iov_base)))) {
318 rd_rkb_dbg(msetr->msetr_rkb, MSG, "SNAPPY",
319 "Failed to decompress Snappy "
320 "payload for message at offset "
321 "%"PRId64" (%"PRIusz" bytes): %s: "
322 "ignoring message",
323 Offset, inlen,
324 rd_strerror(-r/*negative errno*/));
325 rd_free(iov.iov_base);
326 err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
327 goto err;
328 }
329 }
330
331 }
332 break;
333#endif
334
335 case RD_KAFKA_COMPRESSION_LZ4:
336 {
337 err = rd_kafka_lz4_decompress(msetr->msetr_rkb,
338 /* Proper HC? */
339 MsgVersion >= 1 ? 1 : 0,
340 Offset,
341 /* @warning Will modify compressed
342 * if no proper HC */
343 (char *)compressed,
344 compressed_size,
345 &iov.iov_base, &iov.iov_len);
346 if (err)
347 goto err;
348 }
349 break;
350
351#if WITH_ZSTD
352 case RD_KAFKA_COMPRESSION_ZSTD:
353 {
354 err = rd_kafka_zstd_decompress(msetr->msetr_rkb,
355 (char *)compressed,
356 compressed_size,
357 &iov.iov_base, &iov.iov_len);
358 if (err)
359 goto err;
360 }
361 break;
362#endif
363
364 default:
365 rd_rkb_dbg(msetr->msetr_rkb, MSG, "CODEC",
366 "%s [%"PRId32"]: Message at offset %"PRId64
367 " with unsupported "
368 "compression codec 0x%x: message ignored",
369 rktp->rktp_rkt->rkt_topic->str,
370 rktp->rktp_partition,
371 Offset, (int)codec);
372
373 err = RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;
374 goto err;
375 }
376
377
378 rd_assert(iov.iov_base);
379
380 /*
381 * Decompression successful
382 */
383
384 /* Create a new buffer pointing to the uncompressed
385 * allocated buffer (outbuf) and let messages keep a reference to
386 * this new buffer. */
387 rkbufz = rd_kafka_buf_new_shadow(iov.iov_base, iov.iov_len, rd_free);
388 rkbufz->rkbuf_rkb = msetr->msetr_rkbuf->rkbuf_rkb;
389 rd_kafka_broker_keep(rkbufz->rkbuf_rkb);
390
391
392 /* In MsgVersion v0..1 the decompressed data contains
393 * an inner MessageSet, pass it to a new MessageSet reader.
394 *
395 * For MsgVersion v2 the decompressed data are the list of messages.
396 */
397
398 if (MsgVersion <= 1) {
399 /* Pass decompressed data (inner Messageset)
400 * to new instance of the MessageSet parser. */
401 rd_kafka_msgset_reader_t inner_msetr;
402 rd_kafka_msgset_reader_init(&inner_msetr,
403 rkbufz,
404 msetr->msetr_rktp,
405 msetr->msetr_tver,
406 &msetr->msetr_rkq);
407
408 inner_msetr.msetr_srcname = "compressed ";
409
410 if (MsgVersion == 1) {
411 /* postproc() will convert relative to
412 * absolute offsets */
413 inner_msetr.msetr_relative_offsets = 1;
414 inner_msetr.msetr_outer.offset = Offset;
415
416 /* Apply single LogAppendTime timestamp for
417 * all messages. */
418 if (Attributes & RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME) {
419 inner_msetr.msetr_outer.tstype =
420 RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME;
421 inner_msetr.msetr_outer.timestamp = Timestamp;
422 }
423 }
424
425 /* Parse the inner MessageSet */
426 err = rd_kafka_msgset_reader_run(&inner_msetr);
427
428 /* Transfer message count from inner to outer */
429 msetr->msetr_msgcnt += inner_msetr.msetr_msgcnt;
430 msetr->msetr_msg_bytes += inner_msetr.msetr_msg_bytes;
431
432
433 } else {
434 /* MsgVersion 2 */
435 rd_kafka_buf_t *orig_rkbuf = msetr->msetr_rkbuf;
436
437 rkbufz->rkbuf_uflow_mitigation =
438 "truncated response from broker (ok)";
439
440 /* Temporarily replace read buffer with uncompressed buffer */
441 msetr->msetr_rkbuf = rkbufz;
442
443 /* Read messages */
444 err = rd_kafka_msgset_reader_msgs_v2(msetr);
445
446 /* Restore original buffer */
447 msetr->msetr_rkbuf = orig_rkbuf;
448 }
449
450 /* Loose our refcnt of the uncompressed rkbuf.
451 * Individual messages/rko's will have their own reference. */
452 rd_kafka_buf_destroy(rkbufz);
453
454 return err;
455
456 err:
457 /* Enqueue error messsage:
458 * Create op and push on temporary queue. */
459 rd_kafka_q_op_err(&msetr->msetr_rkq, RD_KAFKA_OP_CONSUMER_ERR,
460 err, msetr->msetr_tver->version, rktp, Offset,
461 "Decompression (codec 0x%x) of message at %"PRIu64
462 " of %"PRIu64" bytes failed: %s",
463 codec, Offset, compressed_size, rd_kafka_err2str(err));
464
465 return err;
466
467}
468
469
470
/**
 * @brief Message parser for MsgVersion v0..1
 *
 * Reads a single Message (Offset, MessageSize, Crc, MagicByte, Attributes,
 * Timestamp (v1 only), Key, Value) from the reader's current buffer
 * position. Compressed Messages are passed to
 * rd_kafka_msgset_reader_decompress(). Plain Messages are enqueued on the
 * reader's temporary queue. Messages with an absolute offset below the
 * partition's fetch offset are read and dropped.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or on single-message errors,
 *          or any other error code when the MessageSet parser should stop
 *          parsing (such as for partial Messages).
 */
static rd_kafka_resp_err_t
rd_kafka_msgset_reader_msg_v0_1 (rd_kafka_msgset_reader_t *msetr) {
        rd_kafka_buf_t *rkbuf = msetr->msetr_rkbuf;
        rd_kafka_toppar_t *rktp = msetr->msetr_rktp;
        rd_kafka_broker_t *rkb = msetr->msetr_rkb;
        struct {
                int64_t Offset;       /* MessageSet header */
                int32_t MessageSize;  /* MessageSet header */
                uint32_t Crc;
                int8_t MagicByte;     /* MsgVersion */
                int8_t Attributes;
                int64_t Timestamp;    /* v1 */
        } hdr; /* Message header */
        rd_kafkap_bytes_t Key;
        rd_kafkap_bytes_t Value;
        int32_t Value_len;
        rd_kafka_op_t *rko;
        /* Header size following MessageSize:
         * Crc(4) + MagicByte(1) + Attributes(1); Timestamp(8) is added
         * below for v1. */
        size_t hdrsize = 6;
        rd_slice_t crc_slice;
        rd_kafka_msg_t *rkm;
        int relative_offsets = 0;
        const char *reloff_str = "";
        /* Only log decoding errors if protocol debugging enabled. */
        int log_decode_errors = (rkbuf->rkbuf_rkb->rkb_rk->rk_conf.debug &
                                 RD_KAFKA_DBG_PROTOCOL) ? LOG_DEBUG : 0;
        size_t message_end;

        rd_kafka_buf_read_i64(rkbuf, &hdr.Offset);
        rd_kafka_buf_read_i32(rkbuf, &hdr.MessageSize);
        /* Absolute buffer position where this Message ends. */
        message_end = rd_slice_offset(&rkbuf->rkbuf_reader) + hdr.MessageSize;

        rd_kafka_buf_read_i32(rkbuf, &hdr.Crc);
        /* Narrow a copy of the reader (crc_slice) to the rest of the
         * Message after the Crc field, for CRC verification below. */
        if (!rd_slice_narrow_copy_relative(&rkbuf->rkbuf_reader, &crc_slice,
                                           hdr.MessageSize - 4))
                rd_kafka_buf_check_len(rkbuf, hdr.MessageSize - 4);

        rd_kafka_buf_read_i8(rkbuf, &hdr.MagicByte);
        rd_kafka_buf_read_i8(rkbuf, &hdr.Attributes);

        if (hdr.MagicByte == 1) { /* MsgVersion */
                rd_kafka_buf_read_i64(rkbuf, &hdr.Timestamp);
                hdrsize += 8;
                /* MsgVersion 1 has relative offsets for compressed MessageSets*/
                if (!(hdr.Attributes & RD_KAFKA_MSG_ATTR_COMPRESSION_MASK) &&
                    msetr->msetr_relative_offsets) {
                        relative_offsets = 1;
                        reloff_str = "relative ";
                }
        } else
                hdr.Timestamp = 0;

        /* Verify MessageSize */
        if (unlikely(hdr.MessageSize < (ssize_t)hdrsize))
                rd_kafka_buf_parse_fail(rkbuf,
                                        "Message at %soffset %"PRId64
                                        " MessageSize %"PRId32
                                        " < hdrsize %"PRIusz,
                                        reloff_str,
                                        hdr.Offset, hdr.MessageSize, hdrsize);

        /* Early check for partial messages */
        rd_kafka_buf_check_len(rkbuf, hdr.MessageSize - hdrsize);

        if (rkb->rkb_rk->rk_conf.check_crcs) {
                /* Verify CRC32 if desired. */
                uint32_t calc_crc;

                calc_crc = rd_slice_crc32(&crc_slice);
                rd_dassert(rd_slice_remains(&crc_slice) == 0);

                if (unlikely(hdr.Crc != calc_crc)) {
                        /* Propagate CRC error to application and
                         * continue with next message. */
                        rd_kafka_q_op_err(&msetr->msetr_rkq,
                                          RD_KAFKA_OP_CONSUMER_ERR,
                                          RD_KAFKA_RESP_ERR__BAD_MSG,
                                          msetr->msetr_tver->version,
                                          rktp,
                                          hdr.Offset,
                                          "Message at %soffset %"PRId64
                                          " (%"PRId32" bytes) "
                                          "failed CRC32 check "
                                          "(original 0x%"PRIx32" != "
                                          "calculated 0x%"PRIx32")",
                                          reloff_str, hdr.Offset,
                                          hdr.MessageSize, hdr.Crc, calc_crc);
                        /* message_end is an absolute position: skip the
                         * remainder of this Message. */
                        rd_kafka_buf_skip_to(rkbuf, message_end);
                        rd_atomic64_add(&rkb->rkb_c.rx_err, 1);
                        /* Continue with next message */
                        return RD_KAFKA_RESP_ERR_NO_ERROR;
                }
        }


        /* Extract key */
        rd_kafka_buf_read_bytes(rkbuf, &Key);

        /* Extract Value */
        rd_kafka_buf_read_bytes(rkbuf, &Value);
        Value_len = RD_KAFKAP_BYTES_LEN(&Value);

        /* MessageSets may contain offsets earlier than we
         * requested (compressed MessageSets in particular),
         * drop the earlier messages.
         * Note: the inner offset may only be trusted for
         *       absolute offsets. KIP-31 introduced
         *       ApiVersion 2 that maintains relative offsets
         *       of compressed messages and the base offset
         *       in the outer message is the offset of
         *       the *LAST* message in the MessageSet.
         *       This requires us to assign offsets
         *       after all messages have been read from
         *       the messageset, and it also means
         *       we cant perform this offset check here
         *       in that case. */
        if (!relative_offsets &&
            hdr.Offset < rktp->rktp_offsets.fetch_offset)
                return RD_KAFKA_RESP_ERR_NO_ERROR; /* Continue with next msg */

        /* Handle compressed MessageSet: the Value holds the compressed
         * inner MessageSet. */
        if (unlikely(hdr.Attributes & RD_KAFKA_MSG_ATTR_COMPRESSION_MASK))
                return rd_kafka_msgset_reader_decompress(
                        msetr, hdr.MagicByte, hdr.Attributes, hdr.Timestamp,
                        hdr.Offset, Value.data, Value_len);


        /* Pure uncompressed message, this is the innermost
         * handler after all compression and cascaded
         * MessageSets have been peeled off. */

        /* Create op/message container for message. */
        rko = rd_kafka_op_new_fetch_msg(&rkm, rktp, msetr->msetr_tver->version,
                                        rkbuf,
                                        hdr.Offset,
                                        (size_t)RD_KAFKAP_BYTES_LEN(&Key),
                                        RD_KAFKAP_BYTES_IS_NULL(&Key) ?
                                        NULL : Key.data,
                                        (size_t)RD_KAFKAP_BYTES_LEN(&Value),
                                        RD_KAFKAP_BYTES_IS_NULL(&Value) ?
                                        NULL : Value.data);

        /* Assign message timestamp.
         * If message was in a compressed MessageSet and the outer/wrapper
         * Message.Attribute had a LOG_APPEND_TIME set, use the
         * outer timestamp */
        if (msetr->msetr_outer.tstype == RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME) {
                rkm->rkm_timestamp = msetr->msetr_outer.timestamp;
                rkm->rkm_tstype = msetr->msetr_outer.tstype;

        } else if (hdr.MagicByte >= 1 && hdr.Timestamp) {
                rkm->rkm_timestamp = hdr.Timestamp;
                if (hdr.Attributes & RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME)
                        rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME;
                else
                        rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_CREATE_TIME;
        }

        /* Enqueue message on temporary queue */
        rd_kafka_q_enq(&msetr->msetr_rkq, rko);
        msetr->msetr_msgcnt++;
        msetr->msetr_msg_bytes += rkm->rkm_key_len + rkm->rkm_len;

        return RD_KAFKA_RESP_ERR_NO_ERROR; /* Continue */

 err_parse:
        /* Reached from the rd_kafka_buf_*() parsing macros on failure.
         * Count all parse errors as partial message errors. */
        rd_atomic64_add(&msetr->msetr_rkb->rkb_c.rx_partial, 1);
        return rkbuf->rkbuf_err;
}
647
648
649
650
/**
 * @brief Message parser for MsgVersion v2
 *
 * Reads one message (Length, Attributes, TimestampDelta, OffsetDelta, Key,
 * Value and the raw Headers) from the reader's current buffer and enqueues
 * it on the reader's temporary queue. The absolute offset and timestamp are
 * derived from the enclosing MessageSet header in msetr->msetr_v2_hdr,
 * which must be set by the caller.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR to continue with the next message
 *          (including when an outdated message is skipped), or the buffer's
 *          error on parse failure.
 */
static rd_kafka_resp_err_t
rd_kafka_msgset_reader_msg_v2 (rd_kafka_msgset_reader_t *msetr) {
        rd_kafka_buf_t *rkbuf = msetr->msetr_rkbuf;
        rd_kafka_toppar_t *rktp = msetr->msetr_rktp;
        struct {
                int64_t Length;
                int8_t MsgAttributes;
                int64_t TimestampDelta;
                int64_t OffsetDelta;
                int64_t Offset;  /* Absolute offset */
                rd_kafkap_bytes_t Key;
                rd_kafkap_bytes_t Value;
                rd_kafkap_bytes_t Headers;
        } hdr;
        rd_kafka_op_t *rko;
        rd_kafka_msg_t *rkm;
        /* Only log decoding errors if protocol debugging enabled. */
        int log_decode_errors = (rkbuf->rkbuf_rkb->rkb_rk->rk_conf.debug &
                                 RD_KAFKA_DBG_PROTOCOL) ? LOG_DEBUG : 0;
        size_t message_end;

        rd_kafka_buf_read_varint(rkbuf, &hdr.Length);
        /* Absolute buffer position where this message ends. */
        message_end = rd_slice_offset(&rkbuf->rkbuf_reader)+(size_t)hdr.Length;
        rd_kafka_buf_read_i8(rkbuf, &hdr.MsgAttributes);

        rd_kafka_buf_read_varint(rkbuf, &hdr.TimestampDelta);
        rd_kafka_buf_read_varint(rkbuf, &hdr.OffsetDelta);
        hdr.Offset = msetr->msetr_v2_hdr->BaseOffset + hdr.OffsetDelta;

        /* Skip message if outdated */
        if (hdr.Offset < rktp->rktp_offsets.fetch_offset) {
                rd_rkb_dbg(msetr->msetr_rkb, MSG, "MSG",
                           "%s [%"PRId32"]: "
                           "Skip offset %"PRId64" < fetch_offset %"PRId64,
                           rktp->rktp_rkt->rkt_topic->str,
                           rktp->rktp_partition,
                           hdr.Offset, rktp->rktp_offsets.fetch_offset);
                rd_kafka_buf_skip_to(rkbuf, message_end);
                return RD_KAFKA_RESP_ERR_NO_ERROR; /* Continue with next msg */
        }

        rd_kafka_buf_read_bytes_varint(rkbuf, &hdr.Key);

        rd_kafka_buf_read_bytes_varint(rkbuf, &hdr.Value);

        /* We parse the Headers later, just store the size (possibly truncated)
         * and pointer to the headers: everything from here to message_end. */
        hdr.Headers.len = (int32_t)(message_end -
                                    rd_slice_offset(&rkbuf->rkbuf_reader));
        rd_kafka_buf_read_ptr(rkbuf, &hdr.Headers.data, hdr.Headers.len);

        /* Create op/message container for message. */
        rko = rd_kafka_op_new_fetch_msg(&rkm,
                                        rktp, msetr->msetr_tver->version, rkbuf,
                                        hdr.Offset,
                                        (size_t)RD_KAFKAP_BYTES_LEN(&hdr.Key),
                                        RD_KAFKAP_BYTES_IS_NULL(&hdr.Key) ?
                                        NULL : hdr.Key.data,
                                        (size_t)RD_KAFKAP_BYTES_LEN(&hdr.Value),
                                        RD_KAFKAP_BYTES_IS_NULL(&hdr.Value) ?
                                        NULL : hdr.Value.data);

        /* Store pointer to unparsed message headers, they will
         * be parsed on the first access.
         * This pointer points to the rkbuf payload.
         * Note: can't perform struct copy here due to const fields (MSVC) */
        rkm->rkm_u.consumer.binhdrs.len = hdr.Headers.len;
        rkm->rkm_u.consumer.binhdrs.data = hdr.Headers.data;

        /* Set timestamp.
         *
         * When broker assigns the timestamps (LOG_APPEND_TIME) it will
         * assign the same timestamp for all messages in a MessageSet
         * using MaxTimestamp.
         * Otherwise the timestamp is relative to the MessageSet's
         * BaseTimestamp.
         */
        if ((msetr->msetr_v2_hdr->Attributes &
             RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME) ||
            (hdr.MsgAttributes & RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME)) {
                rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME;
                rkm->rkm_timestamp = msetr->msetr_v2_hdr->MaxTimestamp;
        } else {
                rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_CREATE_TIME;
                rkm->rkm_timestamp =
                        msetr->msetr_v2_hdr->BaseTimestamp + hdr.TimestampDelta;
        }


        /* Enqueue message on temporary queue */
        rd_kafka_q_enq(&msetr->msetr_rkq, rko);
        msetr->msetr_msgcnt++;
        msetr->msetr_msg_bytes += rkm->rkm_key_len + rkm->rkm_len;

        return RD_KAFKA_RESP_ERR_NO_ERROR;

 err_parse:
        /* Reached from the rd_kafka_buf_*() parsing macros on failure.
         * Count all parse errors as partial message errors. */
        rd_atomic64_add(&msetr->msetr_rkb->rkb_c.rx_partial, 1);
        return rkbuf->rkbuf_err;
}
753
754
755/**
756 * @brief Read v2 messages from current buffer position.
757 */
758static rd_kafka_resp_err_t
759rd_kafka_msgset_reader_msgs_v2 (rd_kafka_msgset_reader_t *msetr) {
760 while (rd_kafka_buf_read_remain(msetr->msetr_rkbuf)) {
761 rd_kafka_resp_err_t err;
762 err = rd_kafka_msgset_reader_msg_v2(msetr);
763 if (unlikely(err))
764 return err;
765 }
766
767 return RD_KAFKA_RESP_ERR_NO_ERROR;
768}
769
770
771
772/**
773 * @brief MessageSet reader for MsgVersion v2 (FetchRequest v4)
774 */
775static rd_kafka_resp_err_t
776rd_kafka_msgset_reader_v2 (rd_kafka_msgset_reader_t *msetr) {
777 rd_kafka_buf_t *rkbuf = msetr->msetr_rkbuf;
778 rd_kafka_toppar_t *rktp = msetr->msetr_rktp;
779 struct msgset_v2_hdr hdr;
780 rd_slice_t save_slice;
781 rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
782 size_t len_start;
783 size_t payload_size;
784 int64_t LastOffset; /* Last absolute Offset in MessageSet header */
785 /* Only log decoding errors if protocol debugging enabled. */
786 int log_decode_errors = (rkbuf->rkbuf_rkb->rkb_rk->rk_conf.debug &
787 RD_KAFKA_DBG_PROTOCOL) ? LOG_DEBUG : 0;
788
789 rd_kafka_buf_read_i64(rkbuf, &hdr.BaseOffset);
790 rd_kafka_buf_read_i32(rkbuf, &hdr.Length);
791 len_start = rd_slice_offset(&rkbuf->rkbuf_reader);
792
793 if (unlikely(hdr.Length < RD_KAFKAP_MSGSET_V2_SIZE - 8 - 4))
794 rd_kafka_buf_parse_fail(rkbuf,
795 "%s [%"PRId32"] "
796 "MessageSet at offset %"PRId64
797 " length %"PRId32" < header size %d",
798 rktp->rktp_rkt->rkt_topic->str,
799 rktp->rktp_partition,
800 hdr.BaseOffset, hdr.Length,
801 RD_KAFKAP_MSGSET_V2_SIZE - 8 - 4);
802
803 rd_kafka_buf_read_i32(rkbuf, &hdr.PartitionLeaderEpoch);
804 rd_kafka_buf_read_i8(rkbuf, &hdr.MagicByte);
805 rd_kafka_buf_read_i32(rkbuf, &hdr.Crc);
806
807 if (msetr->msetr_rkb->rkb_rk->rk_conf.check_crcs) {
808 /* Verify CRC32C if desired. */
809 uint32_t calc_crc;
810 rd_slice_t crc_slice;
811 size_t crc_len = hdr.Length-4-1-4;
812
813 if (!rd_slice_narrow_copy_relative(
814 &rkbuf->rkbuf_reader,
815 &crc_slice, crc_len))
816 rd_kafka_buf_check_len(rkbuf, crc_len);
817
818 calc_crc = rd_slice_crc32c(&crc_slice);
819
820 if (unlikely((uint32_t)hdr.Crc != calc_crc)) {
821 /* Propagate CRC error to application and
822 * continue with next message. */
823 rd_kafka_q_op_err(&msetr->msetr_rkq,
824 RD_KAFKA_OP_CONSUMER_ERR,
825 RD_KAFKA_RESP_ERR__BAD_MSG,
826 msetr->msetr_tver->version,
827 rktp,
828 hdr.BaseOffset,
829 "MessageSet at offset %"PRId64
830 " (%"PRId32" bytes) "
831 "failed CRC32C check "
832 "(original 0x%"PRIx32" != "
833 "calculated 0x%"PRIx32")",
834 hdr.BaseOffset,
835 hdr.Length, hdr.Crc, calc_crc);
836 rd_kafka_buf_skip_to(rkbuf, crc_len);
837 rd_atomic64_add(&msetr->msetr_rkb->rkb_c.rx_err, 1);
838 return RD_KAFKA_RESP_ERR_NO_ERROR;
839 }
840 }
841
842 rd_kafka_buf_read_i16(rkbuf, &hdr.Attributes);
843 rd_kafka_buf_read_i32(rkbuf, &hdr.LastOffsetDelta);
844 LastOffset = hdr.BaseOffset + hdr.LastOffsetDelta;
845 rd_kafka_buf_read_i64(rkbuf, &hdr.BaseTimestamp);
846 rd_kafka_buf_read_i64(rkbuf, &hdr.MaxTimestamp);
847 rd_kafka_buf_read_i64(rkbuf, &hdr.PID);
848 rd_kafka_buf_read_i16(rkbuf, &hdr.ProducerEpoch);
849 rd_kafka_buf_read_i32(rkbuf, &hdr.BaseSequence);
850 rd_kafka_buf_read_i32(rkbuf, &hdr.RecordCount);
851
852 /* Payload size is hdr.Length - MessageSet headers */
853 payload_size = hdr.Length - (rd_slice_offset(&rkbuf->rkbuf_reader) -
854 len_start);
855
856 if (unlikely(payload_size > rd_kafka_buf_read_remain(rkbuf)))
857 rd_kafka_buf_underflow_fail(rkbuf, payload_size,
858 "%s [%"PRId32"] "
859 "MessageSet at offset %"PRId64
860 " payload size %"PRIusz,
861 rktp->rktp_rkt->rkt_topic->str,
862 rktp->rktp_partition,
863 hdr.BaseOffset, payload_size);
864
865 /* If entire MessageSet contains old outdated offsets, skip it. */
866 if (LastOffset < rktp->rktp_offsets.fetch_offset) {
867 rd_kafka_buf_skip(rkbuf, payload_size);
868 goto done;
869 }
870
871 /* Ignore control messages */
872 if (unlikely((hdr.Attributes & RD_KAFKA_MSGSET_V2_ATTR_CONTROL))) {
873 msetr->msetr_ctrl_cnt++;
874 rd_kafka_buf_skip(rkbuf, payload_size);
875 goto done;
876 }
877
878 msetr->msetr_v2_hdr = &hdr;
879
880 /* Handle compressed MessageSet */
881 if (hdr.Attributes & RD_KAFKA_MSG_ATTR_COMPRESSION_MASK) {
882 const void *compressed;
883
884 compressed = rd_slice_ensure_contig(&rkbuf->rkbuf_reader,
885 payload_size);
886 rd_assert(compressed);
887
888 err = rd_kafka_msgset_reader_decompress(
889 msetr, 2/*MsgVersion v2*/, hdr.Attributes,
890 hdr.BaseTimestamp, hdr.BaseOffset,
891 compressed, payload_size);
892 if (err)
893 goto err;
894
895 } else {
896 /* Read uncompressed messages */
897
898 /* Save original slice, reduce size of the current one to
899 * be limited by the MessageSet.Length, and then start reading
900 * messages until the lesser slice is exhausted. */
901 if (!rd_slice_narrow_relative(&rkbuf->rkbuf_reader,
902 &save_slice, payload_size))
903 rd_kafka_buf_check_len(rkbuf, payload_size);
904
905 /* Read messages */
906 err = rd_kafka_msgset_reader_msgs_v2(msetr);
907
908 /* Restore wider slice */
909 rd_slice_widen(&rkbuf->rkbuf_reader, &save_slice);
910
911 if (unlikely(err))
912 goto err;
913 }
914
915
916 done:
917 /* Set the next fetch offset to the MessageSet header's last offset + 1
918 * to avoid getting stuck on compacted MessageSets where the last
919 * Message in the MessageSet has an Offset < MessageSet header's
920 * last offset. See KAFKA-5443 */
921 msetr->msetr_next_offset = LastOffset + 1;
922
923 msetr->msetr_v2_hdr = NULL;
924
925 return RD_KAFKA_RESP_ERR_NO_ERROR;
926
927 err_parse:
928 /* Count all parse errors as partial message errors. */
929 rd_atomic64_add(&msetr->msetr_rkb->rkb_c.rx_partial, 1);
930 err = rkbuf->rkbuf_err;
931 /* FALLTHRU */
932 err:
933 msetr->msetr_v2_hdr = NULL;
934 return err;
935}
936
937
938/**
939 * @brief Peek into the next MessageSet to find the MsgVersion.
940 *
941 * @param MagicBytep the MsgVersion is returned here on success.
942 *
943 * @returns an error on read underflow or if the MsgVersion is
944 * unsupported.
945 */
946static rd_kafka_resp_err_t
947rd_kafka_msgset_reader_peek_msg_version (rd_kafka_msgset_reader_t *msetr,
948 int8_t *MagicBytep) {
949 rd_kafka_buf_t *rkbuf = msetr->msetr_rkbuf;
950 rd_kafka_toppar_t *rktp = msetr->msetr_rktp;
951 /* Only log decoding errors if protocol debugging enabled. */
952 int log_decode_errors = (rkbuf->rkbuf_rkb->rkb_rk->rk_conf.debug &
953 RD_KAFKA_DBG_PROTOCOL) ? LOG_DEBUG : 0;
954 size_t read_offset = rd_slice_offset(&rkbuf->rkbuf_reader);
955
956 rd_kafka_buf_peek_i8(rkbuf, read_offset+8+4+4, MagicBytep);
957
958 if (unlikely(*MagicBytep < 0 || *MagicBytep > 2)) {
959 int64_t Offset; /* For error logging */
960 int32_t Length;
961
962 rd_kafka_buf_read_i64(rkbuf, &Offset);
963
964 rd_rkb_dbg(msetr->msetr_rkb,
965 MSG | RD_KAFKA_DBG_PROTOCOL | RD_KAFKA_DBG_FETCH,
966 "MAGICBYTE",
967 "%s [%"PRId32"]: "
968 "Unsupported Message(Set) MagicByte %d at "
969 "offset %"PRId64" "
970 "(buffer position %"PRIusz"/%"PRIusz"): skipping",
971 rktp->rktp_rkt->rkt_topic->str,
972 rktp->rktp_partition,
973 (int)*MagicBytep, Offset,
974 read_offset, rd_slice_size(&rkbuf->rkbuf_reader));
975
976 if (Offset >= msetr->msetr_rktp->rktp_offsets.fetch_offset) {
977 rd_kafka_q_op_err(
978 &msetr->msetr_rkq,
979 RD_KAFKA_OP_CONSUMER_ERR,
980 RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED,
981 msetr->msetr_tver->version, rktp, Offset,
982 "Unsupported Message(Set) MagicByte %d "
983 "at offset %"PRId64,
984 (int)*MagicBytep, Offset);
985 /* Skip message(set) */
986 msetr->msetr_rktp->rktp_offsets.fetch_offset = Offset+1;
987 }
988
989 /* Skip this Message(Set).
990 * If the message is malformed, the skip may trigger err_parse
991 * and return ERR__BAD_MSG. */
992 rd_kafka_buf_read_i32(rkbuf, &Length);
993 rd_kafka_buf_skip(rkbuf, Length);
994
995 return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;
996 }
997
998 return RD_KAFKA_RESP_ERR_NO_ERROR;
999
1000 err_parse:
1001 return RD_KAFKA_RESP_ERR__BAD_MSG;
1002}
1003
1004
1005/**
1006 * @brief Parse and read messages from msgset reader buffer.
1007 */
1008static rd_kafka_resp_err_t
1009rd_kafka_msgset_reader (rd_kafka_msgset_reader_t *msetr) {
1010 rd_kafka_buf_t *rkbuf = msetr->msetr_rkbuf;
1011 rd_kafka_resp_err_t (*reader[])
1012 (rd_kafka_msgset_reader_t *) = {
1013 /* Indexed by MsgVersion/MagicByte, pointing to
1014 * a Msg(Set)Version reader */
1015 [0] = rd_kafka_msgset_reader_msg_v0_1,
1016 [1] = rd_kafka_msgset_reader_msg_v0_1,
1017 [2] = rd_kafka_msgset_reader_v2
1018 };
1019 rd_kafka_resp_err_t err;
1020
1021 /* Parse MessageSets until the slice is exhausted or an
1022 * error occurs (typically a partial message). */
1023 do {
1024 int8_t MagicByte;
1025
1026 /* We dont know the MsgVersion at this point, peek where the
1027 * MagicByte resides both in MsgVersion v0..1 and v2 to
1028 * know which MessageSet reader to use. */
1029 err = rd_kafka_msgset_reader_peek_msg_version(msetr,
1030 &MagicByte);
1031 if (unlikely(err)) {
1032 if (err == RD_KAFKA_RESP_ERR__BAD_MSG)
1033 /* Read underflow, not an error.
1034 * Broker may return a partial Fetch response
1035 * due to its use of sendfile(2). */
1036 return RD_KAFKA_RESP_ERR_NO_ERROR;
1037
1038 /* Continue on unsupported MsgVersions, the
1039 * MessageSet will be skipped. */
1040 continue;
1041 }
1042
1043 /* Use MsgVersion-specific reader */
1044 err = reader[(int)MagicByte](msetr);
1045
1046 } while (!err && rd_slice_remains(&rkbuf->rkbuf_reader) > 0);
1047
1048 return err;
1049}
1050
1051
1052
1053/**
1054 * @brief MessageSet post-processing.
1055 *
1056 * @param last_offsetp will be set to the offset of the last message in the set,
1057 * or -1 if not applicable.
1058 */
1059static void rd_kafka_msgset_reader_postproc (rd_kafka_msgset_reader_t *msetr,
1060 int64_t *last_offsetp) {
1061 rd_kafka_op_t *rko;
1062
1063 if (msetr->msetr_relative_offsets) {
1064 /* Update messages to absolute offsets
1065 * and purge any messages older than the current
1066 * fetch offset. */
1067 rd_kafka_q_fix_offsets(&msetr->msetr_rkq,
1068 msetr->msetr_rktp->rktp_offsets.
1069 fetch_offset,
1070 msetr->msetr_outer.offset -
1071 msetr->msetr_msgcnt + 1);
1072 }
1073
1074 rko = rd_kafka_q_last(&msetr->msetr_rkq,
1075 RD_KAFKA_OP_FETCH,
1076 0 /* no error ops */);
1077 if (rko)
1078 *last_offsetp = rko->rko_u.fetch.rkm.rkm_offset;
1079}
1080
1081
1082
1083
1084
1085/**
1086 * @brief Run the MessageSet reader, read messages until buffer is
1087 * exhausted (or error encountered), enqueue parsed messages on
1088 * partition queue.
1089 *
1090 * @returns RD_KAFKA_RESP_ERR_NO_ERROR if MessageSet was successfully
1091 * or partially parsed. When other error codes are returned it
1092 * indicates a semi-permanent error (such as unsupported MsgVersion)
1093 * and the fetcher should back off this partition to avoid
1094 * busy-looping.
1095 */
1096static rd_kafka_resp_err_t
1097rd_kafka_msgset_reader_run (rd_kafka_msgset_reader_t *msetr) {
1098 rd_kafka_toppar_t *rktp = msetr->msetr_rktp;
1099 rd_kafka_resp_err_t err;
1100 int64_t last_offset = -1;
1101
1102 /* Parse MessageSets and messages */
1103 err = rd_kafka_msgset_reader(msetr);
1104
1105 if (unlikely(rd_kafka_q_len(&msetr->msetr_rkq) == 0)) {
1106 /* The message set didn't contain at least one full message
1107 * or no error was posted on the response queue.
1108 * This means the size limit perhaps was too tight,
1109 * increase it automatically.
1110 * If there was at least one control message there
1111 * is probably not a size limit and nothing is done. */
1112 if (msetr->msetr_ctrl_cnt > 0) {
1113 /* Noop */
1114
1115 } else if (rktp->rktp_fetch_msg_max_bytes < (1 << 30)) {
1116 rktp->rktp_fetch_msg_max_bytes *= 2;
1117 rd_rkb_dbg(msetr->msetr_rkb, FETCH, "CONSUME",
1118 "Topic %s [%"PRId32"]: Increasing "
1119 "max fetch bytes to %"PRId32,
1120 rktp->rktp_rkt->rkt_topic->str,
1121 rktp->rktp_partition,
1122 rktp->rktp_fetch_msg_max_bytes);
1123 } else if (!err) {
1124 rd_kafka_q_op_err(
1125 &msetr->msetr_rkq,
1126 RD_KAFKA_OP_CONSUMER_ERR,
1127 RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE,
1128 msetr->msetr_tver->version,
1129 rktp,
1130 rktp->rktp_offsets.fetch_offset,
1131 "Message at offset %"PRId64" "
1132 "might be too large to fetch, try increasing "
1133 "receive.message.max.bytes",
1134 rktp->rktp_offsets.fetch_offset);
1135 }
1136
1137 } else {
1138 /* MessageSet post-processing. */
1139 rd_kafka_msgset_reader_postproc(msetr, &last_offset);
1140
1141 /* Ignore parse errors if there was at least one
1142 * good message since it probably indicates a
1143 * partial response rather than an erroneous one. */
1144 if (err == RD_KAFKA_RESP_ERR__UNDERFLOW &&
1145 msetr->msetr_msgcnt > 0)
1146 err = RD_KAFKA_RESP_ERR_NO_ERROR;
1147 }
1148
1149 rd_rkb_dbg(msetr->msetr_rkb, MSG | RD_KAFKA_DBG_FETCH, "CONSUME",
1150 "Enqueue %i %smessage(s) (%"PRId64" bytes, %d ops) on "
1151 "%s [%"PRId32"] "
1152 "fetch queue (qlen %d, v%d, last_offset %"PRId64
1153 ", %d ctrl msgs)",
1154 msetr->msetr_msgcnt, msetr->msetr_srcname,
1155 msetr->msetr_msg_bytes,
1156 rd_kafka_q_len(&msetr->msetr_rkq),
1157 rktp->rktp_rkt->rkt_topic->str,
1158 rktp->rktp_partition, rd_kafka_q_len(&msetr->msetr_rkq),
1159 msetr->msetr_tver->version, last_offset,
1160 msetr->msetr_ctrl_cnt);
1161
1162 /* Concat all messages&errors onto the parent's queue
1163 * (the partition's fetch queue) */
1164 if (rd_kafka_q_concat(msetr->msetr_par_rkq, &msetr->msetr_rkq) != -1) {
1165 /* Update partition's fetch offset based on
1166 * last message's offest. */
1167 if (likely(last_offset != -1))
1168 rktp->rktp_offsets.fetch_offset = last_offset + 1;
1169 }
1170
1171 /* Adjust next fetch offset if outlier code has indicated
1172 * an even later next offset. */
1173 if (msetr->msetr_next_offset > rktp->rktp_offsets.fetch_offset)
1174 rktp->rktp_offsets.fetch_offset = msetr->msetr_next_offset;
1175
1176 rd_kafka_q_destroy_owner(&msetr->msetr_rkq);
1177
1178 /* Skip remaining part of slice so caller can continue
1179 * with next partition. */
1180 rd_slice_read(&msetr->msetr_rkbuf->rkbuf_reader, NULL,
1181 rd_slice_remains(&msetr->msetr_rkbuf->rkbuf_reader));
1182 return err;
1183}
1184
1185
1186
1187/**
1188 * @brief Parse one MessageSet at the current buffer read position,
1189 * enqueueing messages, propagating errors, etc.
1190 * @remark The current rkbuf_reader slice must be limited to the MessageSet size
1191 *
1192 * @returns see rd_kafka_msgset_reader_run()
1193 */
1194rd_kafka_resp_err_t
1195rd_kafka_msgset_parse (rd_kafka_buf_t *rkbuf,
1196 rd_kafka_buf_t *request,
1197 rd_kafka_toppar_t *rktp,
1198 const struct rd_kafka_toppar_ver *tver) {
1199 rd_kafka_msgset_reader_t msetr;
1200 rd_kafka_resp_err_t err;
1201
1202 rd_kafka_msgset_reader_init(&msetr, rkbuf, rktp, tver,
1203 rktp->rktp_fetchq);
1204
1205 /* Parse and handle the message set */
1206 err = rd_kafka_msgset_reader_run(&msetr);
1207
1208 rd_atomic64_add(&rktp->rktp_c.rx_msgs, msetr.msetr_msgcnt);
1209 rd_atomic64_add(&rktp->rktp_c.rx_msg_bytes, msetr.msetr_msg_bytes);
1210
1211 rd_avg_add(&rktp->rktp_rkt->rkt_avg_batchcnt,
1212 (int64_t)msetr.msetr_msgcnt);
1213 rd_avg_add(&rktp->rktp_rkt->rkt_avg_batchsize,
1214 (int64_t)msetr.msetr_msg_bytes);
1215
1216 return err;
1217
1218}
1219