1/*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2017 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29#include "rd.h"
30#include "rdkafka_int.h"
31#include "rdkafka_msg.h"
32#include "rdkafka_msgset.h"
33#include "rdkafka_topic.h"
34#include "rdkafka_partition.h"
35#include "rdkafka_header.h"
36#include "rdkafka_lz4.h"
37
38#if WITH_ZSTD
39#include "rdkafka_zstd.h"
40#endif
41
42#include "snappy.h"
43#include "rdvarint.h"
44#include "crc32c.h"
45
46
/**
 * @brief ProduceRequest MessageSet writer state.
 *
 * Holds what is needed to build one ProduceRequest carrying a single
 * MessageSet for a single topic partition: the selected protocol
 * versions and features, the backing buffer, buffer offsets of fields
 * that are back-patched after all messages have been written, and running
 * size/timestamp accumulators.
 */
typedef struct rd_kafka_msgset_writer_s {
        rd_kafka_buf_t *msetw_rkbuf;     /* Backing store buffer (refcounted)*/

        int16_t msetw_ApiVersion;        /* ProduceRequest ApiVersion */
        int     msetw_MsgVersion;        /* MsgVersion to construct (0, 1 or 2) */
        int     msetw_features;          /* Protocol features to use */
        rd_kafka_compression_t msetw_compression; /**< Compression type */
        int     msetw_msgcntmax;         /* Max number of messages to send
                                          * in a batch. */
        size_t  msetw_messages_len;      /* Total size of Messages, with Message
                                          * framing but without
                                          * MessageSet header */
        size_t  msetw_messages_kvlen;    /* Total size of Message keys
                                          * and values */

        size_t  msetw_MessageSetSize;    /* Current MessageSetSize value */
        size_t  msetw_of_MessageSetSize; /* offset of MessageSetSize */
        size_t  msetw_of_start;          /* offset of MessageSet */

        int     msetw_relative_offsets;  /* Bool: use relative offsets */

        /* For MessageSet v2 */
        int     msetw_Attributes;        /* MessageSet Attributes */
        int64_t msetw_MaxTimestamp;      /* Maximum timestamp in batch */
        size_t  msetw_of_CRC;            /* offset of MessageSet.CRC */

        rd_kafka_msgbatch_t *msetw_batch; /**< Convenience pointer to
                                           *   rkbuf_u.Produce.batch */

        /* First message information */
        struct {
                size_t of;  /* rkbuf's first message position */
                int64_t timestamp; /* First message's timestamp: the base
                                    * that v2 TimestampDeltas are
                                    * computed against. */
        } msetw_firstmsg;

        rd_kafka_pid_t msetw_pid;       /**< Idempotent producer's
                                         *   current Producer Id */
        rd_kafka_broker_t *msetw_rkb;   /* @warning Not a refcounted
                                         *          reference! */
        rd_kafka_toppar_t *msetw_rktp;  /* @warning Not a refcounted
                                         *          reference! */
        rd_kafka_msgq_t *msetw_msgq;    /**< Input message queue */
} rd_kafka_msgset_writer_t;
90
91
92
93/**
94 * @brief Select ApiVersion and MsgVersion to use based on broker's
95 * feature compatibility.
96 *
97 * @locality broker thread
98 */
99static RD_INLINE void
100rd_kafka_msgset_writer_select_MsgVersion (rd_kafka_msgset_writer_t *msetw) {
101 rd_kafka_broker_t *rkb = msetw->msetw_rkb;
102 rd_kafka_toppar_t *rktp = msetw->msetw_rktp;
103 int16_t min_ApiVersion = 0;
104 int feature;
105 /* Map compression types to required feature and ApiVersion */
106 static const struct {
107 int feature;
108 int16_t ApiVersion;
109 } compr_req[RD_KAFKA_COMPRESSION_NUM] = {
110 [RD_KAFKA_COMPRESSION_LZ4] = { RD_KAFKA_FEATURE_LZ4, 3 },
111#if WITH_ZSTD
112 [RD_KAFKA_COMPRESSION_ZSTD] = { RD_KAFKA_FEATURE_ZSTD, 7 },
113#endif
114 };
115
116 if ((feature = rkb->rkb_features & RD_KAFKA_FEATURE_MSGVER2)) {
117 min_ApiVersion = 3;
118 msetw->msetw_MsgVersion = 2;
119 msetw->msetw_features |= feature;
120 } else if ((feature = rkb->rkb_features & RD_KAFKA_FEATURE_MSGVER1)) {
121 min_ApiVersion = 2;
122 msetw->msetw_MsgVersion = 1;
123 msetw->msetw_features |= feature;
124 } else {
125 if ((feature =
126 rkb->rkb_features & RD_KAFKA_FEATURE_THROTTLETIME)) {
127 min_ApiVersion = 1;
128 msetw->msetw_features |= feature;
129 } else
130 min_ApiVersion = 0;
131 msetw->msetw_MsgVersion = 0;
132 }
133
134 msetw->msetw_compression = rktp->rktp_rkt->rkt_conf.compression_codec;
135
136 /*
137 * Check that the configured compression type is supported
138 * by both client and broker, else disable compression.
139 */
140 if (msetw->msetw_compression &&
141 (rd_kafka_broker_ApiVersion_supported(
142 rkb, RD_KAFKAP_Produce,
143 0, compr_req[msetw->msetw_compression].ApiVersion,
144 NULL) == -1 ||
145 (compr_req[msetw->msetw_compression].feature &&
146 !(msetw->msetw_rkb->rkb_features &
147 compr_req[msetw->msetw_compression].feature)))) {
148 if (unlikely(rd_interval(
149 &rkb->rkb_suppress.unsupported_compression,
150 /* at most once per day */
151 (rd_ts_t)86400 * 1000 * 1000, 0) > 0))
152 rd_rkb_log(rkb, LOG_NOTICE, "COMPRESSION",
153 "%.*s [%"PRId32"]: "
154 "Broker does not support compression "
155 "type %s: not compressing batch",
156 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
157 rktp->rktp_partition,
158 rd_kafka_compression2str(
159 msetw->msetw_compression));
160 else
161 rd_rkb_dbg(rkb, MSG, "PRODUCE",
162 "%.*s [%"PRId32"]: "
163 "Broker does not support compression "
164 "type %s: not compressing batch",
165 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
166 rktp->rktp_partition,
167 rd_kafka_compression2str(
168 msetw->msetw_compression));
169
170 msetw->msetw_compression = RD_KAFKA_COMPRESSION_NONE;
171 } else {
172 /* Broker supports this compression type. */
173 msetw->msetw_features |=
174 compr_req[msetw->msetw_compression].feature;
175
176 if (min_ApiVersion <
177 compr_req[msetw->msetw_compression].ApiVersion)
178 min_ApiVersion =
179 compr_req[msetw->msetw_compression].ApiVersion;
180 }
181
182 /* MsgVersion specific setup. */
183 switch (msetw->msetw_MsgVersion)
184 {
185 case 2:
186 msetw->msetw_relative_offsets = 1; /* OffsetDelta */
187 break;
188 case 1:
189 if (msetw->msetw_compression != RD_KAFKA_COMPRESSION_NONE)
190 msetw->msetw_relative_offsets = 1;
191 break;
192 }
193
194 /* Set the highest ApiVersion supported by us and broker */
195 msetw->msetw_ApiVersion = rd_kafka_broker_ApiVersion_supported(
196 rkb,
197 RD_KAFKAP_Produce, min_ApiVersion, 7, NULL);
198
199 /* It should not be possible to get a lower version than requested,
200 * otherwise the logic in this function is buggy. */
201 rd_assert(msetw->msetw_ApiVersion >= min_ApiVersion);
202}
203
204
205/**
206 * @brief Allocate buffer for messageset writer based on a previously set
207 * up \p msetw.
208 *
209 * Allocate iovecs to hold all headers and messages,
210 * and allocate enough space to allow copies of small messages.
211 * The allocated size is the minimum of message.max.bytes
212 * or queued_bytes + msgcntmax * msg_overhead
213 */
214static void
215rd_kafka_msgset_writer_alloc_buf (rd_kafka_msgset_writer_t *msetw) {
216 rd_kafka_t *rk = msetw->msetw_rkb->rkb_rk;
217 size_t msg_overhead = 0;
218 size_t hdrsize = 0;
219 size_t msgsetsize = 0;
220 size_t bufsize;
221
222 rd_kafka_assert(NULL, !msetw->msetw_rkbuf);
223
224 /* Calculate worst-case buffer size, produce header size,
225 * message size, etc, this isn't critical but avoids unnecesary
226 * extra allocations. The buffer will grow as needed if we get
227 * this wrong.
228 *
229 * ProduceRequest headers go in one iovec:
230 * ProduceRequest v0..2:
231 * RequiredAcks + Timeout +
232 * [Topic + [Partition + MessageSetSize]]
233 *
234 * ProduceRequest v3:
235 * TransactionalId + RequiredAcks + Timeout +
236 * [Topic + [Partition + MessageSetSize + MessageSet]]
237 */
238
239 /*
240 * ProduceRequest header sizes
241 */
242 switch (msetw->msetw_ApiVersion)
243 {
244 case 7:
245 case 6:
246 case 5:
247 case 4:
248 case 3:
249 /* Add TransactionalId */
250 hdrsize += RD_KAFKAP_STR_SIZE(rk->rk_eos.transactional_id);
251 /* FALLTHRU */
252 case 0:
253 case 1:
254 case 2:
255 hdrsize +=
256 /* RequiredAcks + Timeout + TopicCnt */
257 2 + 4 + 4 +
258 /* Topic */
259 RD_KAFKAP_STR_SIZE(msetw->msetw_rktp->
260 rktp_rkt->rkt_topic) +
261 /* PartitionCnt + Partition + MessageSetSize */
262 4 + 4 + 4;
263 msgsetsize += 4; /* MessageSetSize */
264 break;
265
266 default:
267 RD_NOTREACHED();
268 }
269
270 /*
271 * MsgVersion specific sizes:
272 * - (Worst-case) Message overhead: message fields
273 * - MessageSet header size
274 */
275 switch (msetw->msetw_MsgVersion)
276 {
277 case 0:
278 /* MsgVer0 */
279 msg_overhead = RD_KAFKAP_MESSAGE_V0_OVERHEAD;
280 break;
281 case 1:
282 /* MsgVer1 */
283 msg_overhead = RD_KAFKAP_MESSAGE_V1_OVERHEAD;
284 break;
285
286 case 2:
287 /* MsgVer2 uses varints, we calculate for the worst-case. */
288 msg_overhead += RD_KAFKAP_MESSAGE_V2_OVERHEAD;
289
290 /* MessageSet header fields */
291 msgsetsize +=
292 8 /* BaseOffset */ +
293 4 /* Length */ +
294 4 /* PartitionLeaderEpoch */ +
295 1 /* Magic (MsgVersion) */ +
296 4 /* CRC (CRC32C) */ +
297 2 /* Attributes */ +
298 4 /* LastOffsetDelta */ +
299 8 /* BaseTimestamp */ +
300 8 /* MaxTimestamp */ +
301 8 /* ProducerId */ +
302 2 /* ProducerEpoch */ +
303 4 /* BaseSequence */ +
304 4 /* RecordCount */;
305 break;
306
307 default:
308 RD_NOTREACHED();
309 }
310
311 /*
312 * Calculate total buffer size to allocate
313 */
314 bufsize = hdrsize + msgsetsize;
315
316 /* If copying for small payloads is enabled, allocate enough
317 * space for each message to be copied based on this limit.
318 */
319 if (rk->rk_conf.msg_copy_max_size > 0) {
320 size_t queued_bytes = rd_kafka_msgq_size(msetw->msetw_msgq);
321 bufsize += RD_MIN(queued_bytes,
322 (size_t)rk->rk_conf.msg_copy_max_size *
323 msetw->msetw_msgcntmax);
324 }
325
326 /* Add estimed per-message overhead */
327 bufsize += msg_overhead * msetw->msetw_msgcntmax;
328
329 /* Cap allocation at message.max.bytes */
330 if (bufsize > (size_t)rk->rk_conf.max_msg_size)
331 bufsize = (size_t)rk->rk_conf.max_msg_size;
332
333 /*
334 * Allocate iovecs to hold all headers and messages,
335 * and allocate auxilliery space for message headers, etc.
336 */
337 msetw->msetw_rkbuf =
338 rd_kafka_buf_new_request(msetw->msetw_rkb, RD_KAFKAP_Produce,
339 msetw->msetw_msgcntmax/2 + 10,
340 bufsize);
341
342 rd_kafka_buf_ApiVersion_set(msetw->msetw_rkbuf,
343 msetw->msetw_ApiVersion,
344 msetw->msetw_features);
345}
346
347
348/**
349 * @brief Write the MessageSet header.
350 * @remark Must only be called for MsgVersion 2
351 */
352static void
353rd_kafka_msgset_writer_write_MessageSet_v2_header (
354 rd_kafka_msgset_writer_t *msetw) {
355 rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf;
356
357 rd_kafka_assert(NULL, msetw->msetw_ApiVersion >= 3);
358 rd_kafka_assert(NULL, msetw->msetw_MsgVersion == 2);
359
360 /* BaseOffset (also store the offset to the start of
361 * the messageset header fields) */
362 msetw->msetw_of_start = rd_kafka_buf_write_i64(rkbuf, 0);
363
364 /* Length: updated later */
365 rd_kafka_buf_write_i32(rkbuf, 0);
366
367 /* PartitionLeaderEpoch (KIP-101) */
368 rd_kafka_buf_write_i32(rkbuf, 0);
369
370 /* Magic (MsgVersion) */
371 rd_kafka_buf_write_i8(rkbuf, msetw->msetw_MsgVersion);
372
373 /* CRC (CRC32C): updated later.
374 * CRC needs to be done after the entire messageset+messages has
375 * been constructed and the following header fields updated. :(
376 * Save the offset for this position. so it can be udpated later. */
377 msetw->msetw_of_CRC = rd_kafka_buf_write_i32(rkbuf, 0);
378
379 /* Attributes: updated later */
380 rd_kafka_buf_write_i16(rkbuf, 0);
381
382 /* LastOffsetDelta: updated later */
383 rd_kafka_buf_write_i32(rkbuf, 0);
384
385 /* BaseTimestamp: updated later */
386 rd_kafka_buf_write_i64(rkbuf, 0);
387
388 /* MaxTimestamp: updated later */
389 rd_kafka_buf_write_i64(rkbuf, 0);
390
391 /* ProducerId */
392 rd_kafka_buf_write_i64(rkbuf, msetw->msetw_pid.id);
393
394 /* ProducerEpoch */
395 rd_kafka_buf_write_i16(rkbuf, msetw->msetw_pid.epoch);
396
397 /* BaseSequence: updated later in case of Idempotent Producer */
398 rd_kafka_buf_write_i32(rkbuf, -1);
399
400 /* RecordCount: udpated later */
401 rd_kafka_buf_write_i32(rkbuf, 0);
402
403}
404
405
406/**
407 * @brief Write ProduceRequest headers.
408 * When this function returns the msgset is ready for
409 * writing individual messages.
410 * msetw_MessageSetSize will have been set to the messageset header.
411 */
412static void
413rd_kafka_msgset_writer_write_Produce_header (rd_kafka_msgset_writer_t *msetw) {
414
415 rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf;
416 rd_kafka_t *rk = msetw->msetw_rkb->rkb_rk;
417 rd_kafka_itopic_t *rkt = msetw->msetw_rktp->rktp_rkt;
418
419 /* V3: TransactionalId */
420 if (msetw->msetw_ApiVersion >= 3)
421 rd_kafka_buf_write_kstr(rkbuf, rk->rk_eos.transactional_id);
422
423 /* RequiredAcks */
424 rd_kafka_buf_write_i16(rkbuf, rkt->rkt_conf.required_acks);
425
426 /* Timeout */
427 rd_kafka_buf_write_i32(rkbuf, rkt->rkt_conf.request_timeout_ms);
428
429 /* TopicArrayCnt */
430 rd_kafka_buf_write_i32(rkbuf, 1);
431
432 /* Insert topic */
433 rd_kafka_buf_write_kstr(rkbuf, rkt->rkt_topic);
434
435 /* PartitionArrayCnt */
436 rd_kafka_buf_write_i32(rkbuf, 1);
437
438 /* Partition */
439 rd_kafka_buf_write_i32(rkbuf, msetw->msetw_rktp->rktp_partition);
440
441 /* MessageSetSize: Will be finalized later*/
442 msetw->msetw_of_MessageSetSize = rd_kafka_buf_write_i32(rkbuf, 0);
443
444 if (msetw->msetw_MsgVersion == 2) {
445 /* MessageSet v2 header */
446 rd_kafka_msgset_writer_write_MessageSet_v2_header(msetw);
447 msetw->msetw_MessageSetSize = RD_KAFKAP_MSGSET_V2_SIZE;
448 } else {
449 /* Older MessageSet */
450 msetw->msetw_MessageSetSize = RD_KAFKAP_MSGSET_V0_SIZE;
451 }
452}
453
454
455/**
456 * @brief Initialize a ProduceRequest MessageSet writer for
457 * the given broker and partition.
458 *
459 * A new buffer will be allocated to fit the pending messages in queue.
460 *
461 * @returns the number of messages to enqueue
462 *
463 * @remark This currently constructs the entire ProduceRequest, containing
464 * a single outer MessageSet for a single partition.
465 *
466 * @locality broker thread
467 */
468static int rd_kafka_msgset_writer_init (rd_kafka_msgset_writer_t *msetw,
469 rd_kafka_broker_t *rkb,
470 rd_kafka_toppar_t *rktp,
471 rd_kafka_msgq_t *rkmq,
472 rd_kafka_pid_t pid) {
473 int msgcnt = rd_kafka_msgq_len(rkmq);
474
475 if (msgcnt == 0)
476 return 0;
477
478 memset(msetw, 0, sizeof(*msetw));
479
480 msetw->msetw_rktp = rktp;
481 msetw->msetw_rkb = rkb;
482 msetw->msetw_msgq = rkmq;
483 msetw->msetw_pid = pid;
484
485 /* Max number of messages to send in a batch,
486 * limited by current queue size or configured batch size,
487 * whichever is lower. */
488 msetw->msetw_msgcntmax = RD_MIN(msgcnt,
489 rkb->rkb_rk->rk_conf.
490 batch_num_messages);
491 rd_dassert(msetw->msetw_msgcntmax > 0);
492
493 /* Select MsgVersion to use */
494 rd_kafka_msgset_writer_select_MsgVersion(msetw);
495
496 /* Allocate backing buffer */
497 rd_kafka_msgset_writer_alloc_buf(msetw);
498
499 /* Construct first part of Produce header + MessageSet header */
500 rd_kafka_msgset_writer_write_Produce_header(msetw);
501
502 /* The current buffer position is now where the first message
503 * is located.
504 * Record the current buffer position so it can be rewound later
505 * in case of compression. */
506 msetw->msetw_firstmsg.of = rd_buf_write_pos(&msetw->msetw_rkbuf->
507 rkbuf_buf);
508
509 rd_kafka_msgbatch_init(&msetw->msetw_rkbuf->rkbuf_u.Produce.batch,
510 rktp, pid);
511 msetw->msetw_batch = &msetw->msetw_rkbuf->rkbuf_u.Produce.batch;
512
513 return msetw->msetw_msgcntmax;
514}
515
516
517
518/**
519 * @brief Copy or link message payload to buffer.
520 */
521static RD_INLINE void
522rd_kafka_msgset_writer_write_msg_payload (rd_kafka_msgset_writer_t *msetw,
523 const rd_kafka_msg_t *rkm,
524 void (*free_cb)(void *)) {
525 const rd_kafka_t *rk = msetw->msetw_rkb->rkb_rk;
526 rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf;
527
528 /* If payload is below the copy limit and there is still
529 * room in the buffer we'll copy the payload to the buffer,
530 * otherwise we push a reference to the memory. */
531 if (rkm->rkm_len <= (size_t)rk->rk_conf.msg_copy_max_size &&
532 rd_buf_write_remains(&rkbuf->rkbuf_buf) > rkm->rkm_len) {
533 rd_kafka_buf_write(rkbuf,
534 rkm->rkm_payload, rkm->rkm_len);
535 if (free_cb)
536 free_cb(rkm->rkm_payload);
537 } else
538 rd_kafka_buf_push(rkbuf, rkm->rkm_payload, rkm->rkm_len,
539 free_cb);
540}
541
542
543/**
544 * @brief Write message headers to buffer.
545 *
546 * @remark The enveloping HeaderCount varint must already have been written.
547 * @returns the number of bytes written to msetw->msetw_rkbuf
548 */
549static size_t
550rd_kafka_msgset_writer_write_msg_headers (rd_kafka_msgset_writer_t *msetw,
551 const rd_kafka_headers_t *hdrs) {
552 rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf;
553 const rd_kafka_header_t *hdr;
554 int i;
555 size_t start_pos = rd_buf_write_pos(&rkbuf->rkbuf_buf);
556 size_t written;
557
558 RD_LIST_FOREACH(hdr, &hdrs->rkhdrs_list, i) {
559 rd_kafka_buf_write_varint(rkbuf, hdr->rkhdr_name_size);
560 rd_kafka_buf_write(rkbuf,
561 hdr->rkhdr_name, hdr->rkhdr_name_size);
562 rd_kafka_buf_write_varint(rkbuf,
563 hdr->rkhdr_value ?
564 (int64_t)hdr->rkhdr_value_size : -1);
565 rd_kafka_buf_write(rkbuf,
566 hdr->rkhdr_value,
567 hdr->rkhdr_value_size);
568 }
569
570 written = rd_buf_write_pos(&rkbuf->rkbuf_buf) - start_pos;
571 rd_dassert(written == hdrs->rkhdrs_ser_size);
572
573 return written;
574}
575
576
577
578/**
579 * @brief Write message to messageset buffer with MsgVersion 0 or 1.
580 * @returns the number of bytes written.
581 */
582static size_t
583rd_kafka_msgset_writer_write_msg_v0_1 (rd_kafka_msgset_writer_t *msetw,
584 rd_kafka_msg_t *rkm,
585 int64_t Offset,
586 int8_t MsgAttributes,
587 void (*free_cb)(void *)) {
588 rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf;
589 size_t MessageSize;
590 size_t of_Crc;
591
592 /*
593 * MessageSet's (v0 and v1) per-Message header.
594 */
595
596 /* Offset (only relevant for compressed messages on MsgVersion v1) */
597 rd_kafka_buf_write_i64(rkbuf, Offset);
598
599 /* MessageSize */
600 MessageSize =
601 4 + 1 + 1 + /* Crc+MagicByte+Attributes */
602 4 /* KeyLength */ + rkm->rkm_key_len +
603 4 /* ValueLength */ + rkm->rkm_len;
604
605 if (msetw->msetw_MsgVersion == 1)
606 MessageSize += 8; /* Timestamp i64 */
607
608 rd_kafka_buf_write_i32(rkbuf, (int32_t)MessageSize);
609
610 /*
611 * Message
612 */
613 /* Crc: will be updated later */
614 of_Crc = rd_kafka_buf_write_i32(rkbuf, 0);
615
616 /* Start Crc calculation of all buf writes. */
617 rd_kafka_buf_crc_init(rkbuf);
618
619 /* MagicByte */
620 rd_kafka_buf_write_i8(rkbuf, msetw->msetw_MsgVersion);
621
622 /* Attributes */
623 rd_kafka_buf_write_i8(rkbuf, MsgAttributes);
624
625 /* V1: Timestamp */
626 if (msetw->msetw_MsgVersion == 1)
627 rd_kafka_buf_write_i64(rkbuf, rkm->rkm_timestamp);
628
629 /* Message Key */
630 rd_kafka_buf_write_bytes(rkbuf, rkm->rkm_key, rkm->rkm_key_len);
631
632 /* Write or copy Value/payload */
633 if (rkm->rkm_payload) {
634 rd_kafka_buf_write_i32(rkbuf, (int32_t)rkm->rkm_len);
635 rd_kafka_msgset_writer_write_msg_payload(msetw, rkm, free_cb);
636 } else
637 rd_kafka_buf_write_i32(rkbuf, RD_KAFKAP_BYTES_LEN_NULL);
638
639 /* Finalize Crc */
640 rd_kafka_buf_update_u32(rkbuf, of_Crc,
641 rd_kafka_buf_crc_finalize(rkbuf));
642
643
644 /* Return written message size */
645 return 8/*Offset*/ + 4/*MessageSize*/ + MessageSize;
646}
647
/**
 * @brief Write message to messageset buffer with MsgVersion 2.
 *
 * Record layout written, in order:
 *   Length(varint) Attributes(i8) TimestampDelta(varint) OffsetDelta(varint)
 *   KeyLen(varint) [Key] ValueLen(varint) [Value]
 *   HeaderCount(varint) [Headers]
 *
 * TimestampDelta is relative to msetw_firstmsg.timestamp; OffsetDelta is
 * \p Offset as passed by the caller. A NULL key/value is encoded as
 * length RD_KAFKAP_BYTES_LEN_NULL with no data following.
 *
 * @param MsgAttributes Ignored: the per-record Attributes are always 0.
 * @param free_cb       Passed on to the payload writer.
 *
 * @returns the number of bytes written, including the Length varint.
 */
static size_t
rd_kafka_msgset_writer_write_msg_v2 (rd_kafka_msgset_writer_t *msetw,
                                     rd_kafka_msg_t *rkm,
                                     int64_t Offset,
                                     int8_t MsgAttributes,
                                     void (*free_cb)(void *)) {
        rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf;
        size_t MessageSize = 0;
        /* NOTE(review): Length is encoded with rd_uvarint_enc_i64 into a
         * buffer sized for int32; presumably sufficient because record
         * size is bounded well below INT32_MAX — confirm. */
        char varint_Length[RD_UVARINT_ENC_SIZEOF(int32_t)];
        char varint_TimestampDelta[RD_UVARINT_ENC_SIZEOF(int64_t)];
        char varint_OffsetDelta[RD_UVARINT_ENC_SIZEOF(int64_t)];
        char varint_KeyLen[RD_UVARINT_ENC_SIZEOF(int32_t)];
        char varint_ValueLen[RD_UVARINT_ENC_SIZEOF(int32_t)];
        char varint_HeaderCount[RD_UVARINT_ENC_SIZEOF(int32_t)];
        size_t sz_Length;
        size_t sz_TimestampDelta;
        size_t sz_OffsetDelta;
        size_t sz_KeyLen;
        size_t sz_ValueLen;
        size_t sz_HeaderCount;
        int HeaderCount = 0;
        size_t HeaderSize = 0;

        /* Header count and pre-computed serialized header size. */
        if (rkm->rkm_headers) {
                HeaderCount = rkm->rkm_headers->rkhdrs_list.rl_cnt;
                HeaderSize = rkm->rkm_headers->rkhdrs_ser_size;
        }

        /* All varints, except for Length, needs to be pre-built
         * so that the Length field can be set correctly and thus have
         * correct varint encoded width. */

        sz_TimestampDelta = rd_uvarint_enc_i64(
                varint_TimestampDelta, sizeof(varint_TimestampDelta),
                rkm->rkm_timestamp - msetw->msetw_firstmsg.timestamp);
        sz_OffsetDelta = rd_uvarint_enc_i64(
                varint_OffsetDelta, sizeof(varint_OffsetDelta), Offset);
        /* NULL key/value: length -1 (RD_KAFKAP_BYTES_LEN_NULL). */
        sz_KeyLen = rd_uvarint_enc_i32(
                varint_KeyLen, sizeof(varint_KeyLen),
                rkm->rkm_key ? (int32_t)rkm->rkm_key_len :
                (int32_t)RD_KAFKAP_BYTES_LEN_NULL);
        sz_ValueLen = rd_uvarint_enc_i32(
                varint_ValueLen, sizeof(varint_ValueLen),
                rkm->rkm_payload ? (int32_t)rkm->rkm_len :
                (int32_t)RD_KAFKAP_BYTES_LEN_NULL);
        sz_HeaderCount = rd_uvarint_enc_i32(
                varint_HeaderCount, sizeof(varint_HeaderCount),
                (int32_t)HeaderCount);

        /* Calculate MessageSize without length of Length (added later)
         * to store it in Length. */
        MessageSize =
                1 /* MsgAttributes */ +
                sz_TimestampDelta +
                sz_OffsetDelta +
                sz_KeyLen +
                rkm->rkm_key_len +
                sz_ValueLen +
                rkm->rkm_len +
                sz_HeaderCount +
                HeaderSize;

        /* Length */
        sz_Length = rd_uvarint_enc_i64(varint_Length, sizeof(varint_Length),
                                       MessageSize);
        rd_kafka_buf_write(rkbuf, varint_Length, sz_Length);
        /* From here on MessageSize is the total bytes written. */
        MessageSize += sz_Length;

        /* Attributes: The MsgAttributes argument is loosely based on MsgVer0
         * which doesn't apply to MsgVer2, so 0 is always written. */
        rd_kafka_buf_write_i8(rkbuf, 0);

        /* TimestampDelta */
        rd_kafka_buf_write(rkbuf, varint_TimestampDelta, sz_TimestampDelta);

        /* OffsetDelta */
        rd_kafka_buf_write(rkbuf, varint_OffsetDelta, sz_OffsetDelta);

        /* KeyLen */
        rd_kafka_buf_write(rkbuf, varint_KeyLen, sz_KeyLen);

        /* Key (if any) */
        if (rkm->rkm_key)
                rd_kafka_buf_write(rkbuf, rkm->rkm_key, rkm->rkm_key_len);

        /* ValueLen */
        rd_kafka_buf_write(rkbuf, varint_ValueLen, sz_ValueLen);

        /* Write or copy Value/payload */
        if (rkm->rkm_payload)
                rd_kafka_msgset_writer_write_msg_payload(msetw, rkm, free_cb);

        /* HeaderCount */
        rd_kafka_buf_write(rkbuf, varint_HeaderCount, sz_HeaderCount);

        /* Headers array */
        if (rkm->rkm_headers)
                rd_kafka_msgset_writer_write_msg_headers(msetw,
                                                         rkm->rkm_headers);

        /* Return written message size */
        return MessageSize;
}
755
756
757/**
758 * @brief Write message to messageset buffer.
759 * @returns the number of bytes written.
760 */
761static size_t
762rd_kafka_msgset_writer_write_msg (rd_kafka_msgset_writer_t *msetw,
763 rd_kafka_msg_t *rkm,
764 int64_t Offset, int8_t MsgAttributes,
765 void (*free_cb)(void *)) {
766 size_t outlen;
767 size_t (*writer[]) (rd_kafka_msgset_writer_t *,
768 rd_kafka_msg_t *, int64_t, int8_t,
769 void (*)(void *)) = {
770 [0] = rd_kafka_msgset_writer_write_msg_v0_1,
771 [1] = rd_kafka_msgset_writer_write_msg_v0_1,
772 [2] = rd_kafka_msgset_writer_write_msg_v2
773 };
774 size_t actual_written;
775 size_t pre_pos;
776
777 if (likely(rkm->rkm_timestamp))
778 MsgAttributes |= RD_KAFKA_MSG_ATTR_CREATE_TIME;
779
780 pre_pos = rd_buf_write_pos(&msetw->msetw_rkbuf->rkbuf_buf);
781
782 outlen = writer[msetw->msetw_MsgVersion](msetw, rkm,
783 Offset, MsgAttributes,
784 free_cb);
785
786 actual_written = rd_buf_write_pos(&msetw->msetw_rkbuf->rkbuf_buf) -
787 pre_pos;
788 rd_assert(outlen <=
789 rd_kafka_msg_wire_size(rkm, msetw->msetw_MsgVersion));
790 rd_assert(outlen == actual_written);
791
792 return outlen;
793
794}
795
/**
 * @brief Write as many messages from the given message queue to
 *        the messageset as fit.
 *
 * Messages are moved from \p rkmq to the batch's queue and serialized in
 * order. Accumulation stops when any of these happens:
 *   - a message beyond a reconstructed batch's last_msgid is reached,
 *   - msetw_msgcntmax messages have been written,
 *   - the next message's wire size would exceed message.max.bytes,
 *   - a message's retry backoff lies in the future,
 *   - the queue is empty.
 *
 * May not write any messages.
 *
 * @remark \p rkmq must not be empty (asserted).
 *
 * @returns 1 on success or 0 on error (a fatal error has been raised
 *          because a reconstructed batch did not match its original
 *          msgid range).
 */
static int
rd_kafka_msgset_writer_write_msgq (rd_kafka_msgset_writer_t *msetw,
                                   rd_kafka_msgq_t *rkmq) {
        rd_kafka_toppar_t *rktp = msetw->msetw_rktp;
        rd_kafka_broker_t *rkb = msetw->msetw_rkb;
        /* Running request size, starting with the headers already written. */
        size_t len = rd_buf_len(&msetw->msetw_rkbuf->rkbuf_buf);
        size_t max_msg_size = (size_t)msetw->msetw_rkb->rkb_rk->
                rk_conf.max_msg_size;
        rd_ts_t int_latency_base;
        rd_ts_t MaxTimestamp = 0;
        rd_kafka_msg_t *rkm;
        int msgcnt = 0;
        const rd_ts_t now = rd_clock();

        /* Internal latency calculation base.
         * Uses rkm_ts_timeout which is enqueue time + timeout */
        int_latency_base = now +
                ((rd_ts_t) rktp->rktp_rkt->rkt_conf.message_timeout_ms * 1000);

        /* Acquire BaseTimestamp from first message. */
        rkm = TAILQ_FIRST(&rkmq->rkmq_msgs);
        rd_kafka_assert(NULL, rkm);
        msetw->msetw_firstmsg.timestamp = rkm->rkm_timestamp;

        rd_kafka_msgbatch_set_first_msg(msetw->msetw_batch, rkm);

        /*
         * Write as many messages as possible until buffer is full
         * or limit reached.
         */
        do {
                /* A non-zero last_msgid means this batch is being
                 * reconstructed (e.g. for a retry): stop once past its
                 * original last message so the msgid span is preserved. */
                if (unlikely(msetw->msetw_batch->last_msgid &&
                             msetw->msetw_batch->last_msgid <
                             rkm->rkm_u.producer.msgid)) {
                        rd_rkb_dbg(rkb, MSG, "PRODUCE",
                                   "%.*s [%"PRId32"]: "
                                   "Reconstructed MessageSet "
                                   "(%d message(s), %"PRIusz" bytes, "
                                   "MsgIds %"PRIu64"..%"PRIu64")",
                                   RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                                   rktp->rktp_partition,
                                   msgcnt, len,
                                   msetw->msetw_batch->first_msgid,
                                   msetw->msetw_batch->last_msgid);
                        break;
                }

                /* Message count limit, or the worst-case wire size of the
                 * next message would exceed message.max.bytes. */
                if (unlikely(msgcnt == msetw->msetw_msgcntmax ||
                             len + rd_kafka_msg_wire_size(rkm, msetw->
                                                          msetw_MsgVersion) >
                             max_msg_size)) {
                        rd_rkb_dbg(rkb, MSG, "PRODUCE",
                                   "%.*s [%"PRId32"]: "
                                   "No more space in current MessageSet "
                                   "(%i message(s), %"PRIusz" bytes)",
                                   RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                                   rktp->rktp_partition,
                                   msgcnt, len);
                        break;
                }

                if (unlikely(rkm->rkm_u.producer.ts_backoff > now)) {
                        /* Stop accumulation when we've reached
                         * a message with a retry backoff in the future */
                        break;
                }

                /* Move message to buffer's queue */
                rd_kafka_msgq_deq(rkmq, rkm, 1);
                rd_kafka_msgq_enq(&msetw->msetw_batch->msgq, rkm);

                msetw->msetw_messages_kvlen += rkm->rkm_len + rkm->rkm_key_len;

                /* Add internal latency metrics */
                rd_avg_add(&rkb->rkb_avg_int_latency,
                           int_latency_base - rkm->rkm_ts_timeout);

                /* MessageSet v2's .MaxTimestamp field */
                if (unlikely(MaxTimestamp < rkm->rkm_timestamp))
                        MaxTimestamp = rkm->rkm_timestamp;

                /* Write message to buffer.
                 * The message's index in the batch is used as its
                 * (relative) Offset / OffsetDelta. */
                len += rd_kafka_msgset_writer_write_msg(msetw, rkm, msgcnt, 0,
                                                        NULL);

                rd_dassert(len <= max_msg_size);
                msgcnt++;

        } while ((rkm = TAILQ_FIRST(&rkmq->rkmq_msgs)));

        msetw->msetw_MaxTimestamp = MaxTimestamp;

        /* Idempotent Producer:
         * When reconstructing a batch to retry make sure
         * the original message sequence span matches identically
         * or we can't guarantee exactly-once delivery.
         * If this check fails we raise a fatal error since
         * it is unrecoverable and most likely caused by a bug
         * in the client implementation. */
        if (msgcnt > 0 && msetw->msetw_batch->last_msgid) {
                rd_kafka_msg_t *lastmsg;

                lastmsg = rd_kafka_msgq_last(&msetw->msetw_batch->msgq);
                rd_assert(lastmsg);

                if (unlikely(lastmsg->rkm_u.producer.msgid !=
                             msetw->msetw_batch->last_msgid)) {
                        rd_kafka_set_fatal_error(
                                rkb->rkb_rk,
                                RD_KAFKA_RESP_ERR__INCONSISTENT,
                                "Unable to reconstruct MessageSet "
                                "(currently with %d message(s)) "
                                "with msgid range %"PRIu64"..%"PRIu64": "
                                "last message added has msgid %"PRIu64": "
                                "unable to guarantee consistency",
                                msgcnt,
                                msetw->msetw_batch->first_msgid,
                                msetw->msetw_batch->last_msgid,
                                lastmsg->rkm_u.producer.msgid);
                        return 0;
                }
        }
        return 1;
}
928
929
930#if WITH_ZLIB
931/**
932 * @brief Compress messageset using gzip/zlib
933 */
934static int
935rd_kafka_msgset_writer_compress_gzip (rd_kafka_msgset_writer_t *msetw,
936 rd_slice_t *slice,
937 struct iovec *ciov) {
938
939 rd_kafka_broker_t *rkb = msetw->msetw_rkb;
940 rd_kafka_toppar_t *rktp = msetw->msetw_rktp;
941 z_stream strm;
942 size_t len = rd_slice_remains(slice);
943 const void *p;
944 size_t rlen;
945 int r;
946 int comp_level =
947 msetw->msetw_rktp->rktp_rkt->rkt_conf.compression_level;
948
949 memset(&strm, 0, sizeof(strm));
950 r = deflateInit2(&strm, comp_level,
951 Z_DEFLATED, 15+16,
952 8, Z_DEFAULT_STRATEGY);
953 if (r != Z_OK) {
954 rd_rkb_log(rkb, LOG_ERR, "GZIP",
955 "Failed to initialize gzip for "
956 "compressing %"PRIusz" bytes in "
957 "topic %.*s [%"PRId32"]: %s (%i): "
958 "sending uncompressed",
959 len,
960 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
961 rktp->rktp_partition,
962 strm.msg ? strm.msg : "", r);
963 return -1;
964 }
965
966 /* Calculate maximum compressed size and
967 * allocate an output buffer accordingly, being
968 * prefixed with the Message header. */
969 ciov->iov_len = deflateBound(&strm, (uLong)rd_slice_remains(slice));
970 ciov->iov_base = rd_malloc(ciov->iov_len);
971
972 strm.next_out = (void *)ciov->iov_base;
973 strm.avail_out = (uInt)ciov->iov_len;
974
975 /* Iterate through each segment and compress it. */
976 while ((rlen = rd_slice_reader(slice, &p))) {
977
978 strm.next_in = (void *)p;
979 strm.avail_in = (uInt)rlen;
980
981 /* Compress message */
982 if ((r = deflate(&strm, Z_NO_FLUSH) != Z_OK)) {
983 rd_rkb_log(rkb, LOG_ERR, "GZIP",
984 "Failed to gzip-compress "
985 "%"PRIusz" bytes (%"PRIusz" total) for "
986 "topic %.*s [%"PRId32"]: "
987 "%s (%i): "
988 "sending uncompressed",
989 rlen, len,
990 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
991 rktp->rktp_partition,
992 strm.msg ? strm.msg : "", r);
993 deflateEnd(&strm);
994 rd_free(ciov->iov_base);
995 return -1;
996 }
997
998 rd_kafka_assert(rkb->rkb_rk, strm.avail_in == 0);
999 }
1000
1001 /* Finish the compression */
1002 if ((r = deflate(&strm, Z_FINISH)) != Z_STREAM_END) {
1003 rd_rkb_log(rkb, LOG_ERR, "GZIP",
1004 "Failed to finish gzip compression "
1005 " of %"PRIusz" bytes for "
1006 "topic %.*s [%"PRId32"]: "
1007 "%s (%i): "
1008 "sending uncompressed",
1009 len,
1010 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1011 rktp->rktp_partition,
1012 strm.msg ? strm.msg : "", r);
1013 deflateEnd(&strm);
1014 rd_free(ciov->iov_base);
1015 return -1;
1016 }
1017
1018 ciov->iov_len = strm.total_out;
1019
1020 /* Deinitialize compression */
1021 deflateEnd(&strm);
1022
1023 return 0;
1024}
1025#endif
1026
1027
1028#if WITH_SNAPPY
1029/**
1030 * @brief Compress messageset using Snappy
1031 */
1032static int
1033rd_kafka_msgset_writer_compress_snappy (rd_kafka_msgset_writer_t *msetw,
1034 rd_slice_t *slice, struct iovec *ciov) {
1035 rd_kafka_broker_t *rkb = msetw->msetw_rkb;
1036 rd_kafka_toppar_t *rktp = msetw->msetw_rktp;
1037 struct iovec *iov;
1038 size_t iov_max, iov_cnt;
1039 struct snappy_env senv;
1040 size_t len = rd_slice_remains(slice);
1041 int r;
1042
1043 /* Initialize snappy compression environment */
1044 rd_kafka_snappy_init_env_sg(&senv, 1/*iov enable*/);
1045
1046 /* Calculate maximum compressed size and
1047 * allocate an output buffer accordingly. */
1048 ciov->iov_len = rd_kafka_snappy_max_compressed_length(len);
1049 ciov->iov_base = rd_malloc(ciov->iov_len);
1050
1051 iov_max = slice->buf->rbuf_segment_cnt;
1052 iov = rd_alloca(sizeof(*iov) * iov_max);
1053
1054 rd_slice_get_iov(slice, iov, &iov_cnt, iov_max, len);
1055
1056 /* Compress each message */
1057 if ((r = rd_kafka_snappy_compress_iov(&senv, iov, iov_cnt, len,
1058 ciov)) != 0) {
1059 rd_rkb_log(rkb, LOG_ERR, "SNAPPY",
1060 "Failed to snappy-compress "
1061 "%"PRIusz" bytes for "
1062 "topic %.*s [%"PRId32"]: %s: "
1063 "sending uncompressed",
1064 len, RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1065 rktp->rktp_partition,
1066 rd_strerror(-r));
1067 rd_free(ciov->iov_base);
1068 return -1;
1069 }
1070
1071 /* rd_free snappy environment */
1072 rd_kafka_snappy_free_env(&senv);
1073
1074 return 0;
1075}
1076#endif
1077
1078/**
1079 * @brief Compress messageset using LZ4F
1080 */
1081static int
1082rd_kafka_msgset_writer_compress_lz4 (rd_kafka_msgset_writer_t *msetw,
1083 rd_slice_t *slice, struct iovec *ciov) {
1084 rd_kafka_resp_err_t err;
1085 int comp_level =
1086 msetw->msetw_rktp->rktp_rkt->rkt_conf.compression_level;
1087 err = rd_kafka_lz4_compress(msetw->msetw_rkb,
1088 /* Correct or incorrect HC */
1089 msetw->msetw_MsgVersion >= 1 ? 1 : 0,
1090 comp_level,
1091 slice, &ciov->iov_base, &ciov->iov_len);
1092 return (err ? -1 : 0);
1093}
1094
1095#if WITH_ZSTD
1096/**
1097 * @brief Compress messageset using ZSTD
1098 */
1099static int
1100rd_kafka_msgset_writer_compress_zstd (rd_kafka_msgset_writer_t *msetw,
1101 rd_slice_t *slice, struct iovec *ciov) {
1102 rd_kafka_resp_err_t err;
1103 int comp_level =
1104 msetw->msetw_rktp->rktp_rkt->rkt_conf.compression_level;
1105 err = rd_kafka_zstd_compress(msetw->msetw_rkb,
1106 comp_level,
1107 slice, &ciov->iov_base, &ciov->iov_len);
1108 return (err ? -1 : 0);
1109}
1110#endif
1111
1112/**
1113 * @brief Compress the message set.
1114 * @param outlenp in: total uncompressed messages size,
1115 * out (on success): returns the compressed buffer size.
1116 * @returns 0 on success or if -1 if compression failed.
1117 * @remark Compression failures are not critical, we'll just send the
1118 * the messageset uncompressed.
1119 */
1120static int
1121rd_kafka_msgset_writer_compress (rd_kafka_msgset_writer_t *msetw,
1122 size_t *outlenp) {
1123 rd_buf_t *rbuf = &msetw->msetw_rkbuf->rkbuf_buf;
1124 rd_slice_t slice;
1125 size_t len = *outlenp;
1126 struct iovec ciov = RD_ZERO_INIT; /* Compressed output buffer */
1127 int r = -1;
1128 size_t outlen;
1129
1130 rd_assert(rd_buf_len(rbuf) >= msetw->msetw_firstmsg.of + len);
1131
1132 /* Create buffer slice from firstmsg and onwards */
1133 r = rd_slice_init(&slice, rbuf, msetw->msetw_firstmsg.of, len);
1134 rd_assert(r == 0 || !*"invalid firstmsg position");
1135
1136 switch (msetw->msetw_compression)
1137 {
1138#if WITH_ZLIB
1139 case RD_KAFKA_COMPRESSION_GZIP:
1140 r = rd_kafka_msgset_writer_compress_gzip(msetw, &slice, &ciov);
1141 break;
1142#endif
1143
1144#if WITH_SNAPPY
1145 case RD_KAFKA_COMPRESSION_SNAPPY:
1146 r = rd_kafka_msgset_writer_compress_snappy(msetw, &slice,
1147 &ciov);
1148 break;
1149#endif
1150
1151 case RD_KAFKA_COMPRESSION_LZ4:
1152 r = rd_kafka_msgset_writer_compress_lz4(msetw, &slice, &ciov);
1153 break;
1154
1155#if WITH_ZSTD
1156 case RD_KAFKA_COMPRESSION_ZSTD:
1157 r = rd_kafka_msgset_writer_compress_zstd(msetw, &slice, &ciov);
1158 break;
1159#endif
1160
1161 default:
1162 rd_kafka_assert(NULL,
1163 !*"notreached: unsupported compression.codec");
1164 break;
1165 }
1166
1167 if (r == -1) /* Compression failed, send uncompressed */
1168 return -1;
1169
1170
1171 if (unlikely(ciov.iov_len > len)) {
1172 /* If the compressed data is larger than the uncompressed size
1173 * then throw it away and send as uncompressed. */
1174 rd_free(ciov.iov_base);
1175 return -1;
1176 }
1177
1178 /* Set compression codec in MessageSet.Attributes */
1179 msetw->msetw_Attributes |= msetw->msetw_compression;
1180
1181 /* Rewind rkbuf to the pre-message checkpoint (firstmsg)
1182 * and replace the original message(s) with the compressed payload,
1183 * possibly with version dependent enveloping. */
1184 rd_buf_write_seek(rbuf, msetw->msetw_firstmsg.of);
1185
1186 rd_kafka_assert(msetw->msetw_rkb->rkb_rk, ciov.iov_len < INT32_MAX);
1187
1188 if (msetw->msetw_MsgVersion == 2) {
1189 /* MsgVersion 2 has no inner MessageSet header or wrapping
1190 * for compressed messages, just the messages back-to-back,
1191 * so we can push the compressed memory directly to the
1192 * buffer without wrapping it. */
1193 rd_buf_push(rbuf, ciov.iov_base, ciov.iov_len, rd_free);
1194 outlen = ciov.iov_len;
1195
1196 } else {
1197 /* Older MessageSets envelope/wrap the compressed MessageSet
1198 * in an outer Message. */
1199 rd_kafka_msg_t rkm = {
1200 .rkm_len = ciov.iov_len,
1201 .rkm_payload = ciov.iov_base,
1202 .rkm_timestamp = msetw->msetw_firstmsg.timestamp
1203 };
1204 outlen = rd_kafka_msgset_writer_write_msg(
1205 msetw, &rkm, 0,
1206 msetw->msetw_compression,
1207 rd_free/*free for ciov.iov_base*/);
1208 }
1209
1210 *outlenp = outlen;
1211
1212 return 0;
1213}
1214
1215
1216
1217
1218/**
1219 * @brief Calculate MessageSet v2 CRC (CRC32C) when messageset is complete.
1220 */
1221static void
1222rd_kafka_msgset_writer_calc_crc_v2 (rd_kafka_msgset_writer_t *msetw) {
1223 int32_t crc;
1224 rd_slice_t slice;
1225 int r;
1226
1227 r = rd_slice_init(&slice, &msetw->msetw_rkbuf->rkbuf_buf,
1228 msetw->msetw_of_CRC+4,
1229 rd_buf_write_pos(&msetw->msetw_rkbuf->rkbuf_buf) -
1230 msetw->msetw_of_CRC-4);
1231 rd_assert(!r && *"slice_init failed");
1232
1233 /* CRC32C calculation */
1234 crc = rd_slice_crc32c(&slice);
1235
1236 /* Update CRC at MessageSet v2 CRC offset */
1237 rd_kafka_buf_update_i32(msetw->msetw_rkbuf, msetw->msetw_of_CRC, crc);
1238}
1239
1240/**
1241 * @brief Finalize MessageSet v2 header fields.
1242 */
1243static void
1244rd_kafka_msgset_writer_finalize_MessageSet_v2_header (
1245 rd_kafka_msgset_writer_t *msetw) {
1246 rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf;
1247 int msgcnt = rd_kafka_msgq_len(&rkbuf->rkbuf_batch.msgq);
1248
1249 rd_kafka_assert(NULL, msgcnt > 0);
1250 rd_kafka_assert(NULL, msetw->msetw_ApiVersion >= 3);
1251
1252 msetw->msetw_MessageSetSize = RD_KAFKAP_MSGSET_V2_SIZE +
1253 msetw->msetw_messages_len;
1254
1255 /* MessageSet.Length is the same as
1256 * MessageSetSize minus field widths for FirstOffset+Length */
1257 rd_kafka_buf_update_i32(rkbuf, msetw->msetw_of_start +
1258 RD_KAFKAP_MSGSET_V2_OF_Length,
1259 (int32_t)msetw->msetw_MessageSetSize - (8+4));
1260
1261 msetw->msetw_Attributes |= RD_KAFKA_MSG_ATTR_CREATE_TIME;
1262
1263 rd_kafka_buf_update_i16(rkbuf, msetw->msetw_of_start +
1264 RD_KAFKAP_MSGSET_V2_OF_Attributes,
1265 msetw->msetw_Attributes);
1266
1267 rd_kafka_buf_update_i32(rkbuf, msetw->msetw_of_start +
1268 RD_KAFKAP_MSGSET_V2_OF_LastOffsetDelta,
1269 msgcnt-1);
1270
1271 rd_kafka_buf_update_i64(rkbuf, msetw->msetw_of_start +
1272 RD_KAFKAP_MSGSET_V2_OF_BaseTimestamp,
1273 msetw->msetw_firstmsg.timestamp);
1274
1275 rd_kafka_buf_update_i64(rkbuf, msetw->msetw_of_start +
1276 RD_KAFKAP_MSGSET_V2_OF_MaxTimestamp,
1277 msetw->msetw_MaxTimestamp);
1278
1279 rd_kafka_buf_update_i32(rkbuf, msetw->msetw_of_start +
1280 RD_KAFKAP_MSGSET_V2_OF_BaseSequence,
1281 msetw->msetw_batch->first_seq);
1282
1283 rd_kafka_buf_update_i32(rkbuf, msetw->msetw_of_start +
1284 RD_KAFKAP_MSGSET_V2_OF_RecordCount, msgcnt);
1285
1286 rd_kafka_msgset_writer_calc_crc_v2(msetw);
1287}
1288
1289
1290
1291
1292/**
1293 * @brief Finalize the MessageSet header, if applicable.
1294 */
1295static void
1296rd_kafka_msgset_writer_finalize_MessageSet (rd_kafka_msgset_writer_t *msetw) {
1297 rd_dassert(msetw->msetw_messages_len > 0);
1298
1299 if (msetw->msetw_MsgVersion == 2)
1300 rd_kafka_msgset_writer_finalize_MessageSet_v2_header(msetw);
1301 else
1302 msetw->msetw_MessageSetSize = RD_KAFKAP_MSGSET_V0_SIZE +
1303 msetw->msetw_messages_len;
1304
1305 /* Update MessageSetSize */
1306 rd_kafka_buf_update_i32(msetw->msetw_rkbuf,
1307 msetw->msetw_of_MessageSetSize,
1308 (int32_t)msetw->msetw_MessageSetSize);
1309
1310}
1311
1312
1313/**
1314 * @brief Finalize the messageset - call when no more messages are to be
1315 * added to the messageset.
1316 *
1317 * Will compress, update final values, CRCs, etc.
1318 *
1319 * The messageset writer is destroyed and the buffer is returned
1320 * and ready to be transmitted.
1321 *
1322 * @param MessagetSetSizep will be set to the finalized MessageSetSize
1323 *
1324 * @returns the buffer to transmit or NULL if there were no messages
1325 * in messageset.
1326 */
1327static rd_kafka_buf_t *
1328rd_kafka_msgset_writer_finalize (rd_kafka_msgset_writer_t *msetw,
1329 size_t *MessageSetSizep) {
1330 rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf;
1331 rd_kafka_toppar_t *rktp = msetw->msetw_rktp;
1332 size_t len;
1333 int cnt;
1334
1335 /* No messages added, bail out early. */
1336 if (unlikely((cnt =
1337 rd_kafka_msgq_len(&rkbuf->rkbuf_batch.msgq)) == 0)) {
1338 rd_kafka_buf_destroy(rkbuf);
1339 return NULL;
1340 }
1341
1342 /* Total size of messages */
1343 len = rd_buf_write_pos(&msetw->msetw_rkbuf->rkbuf_buf) -
1344 msetw->msetw_firstmsg.of;
1345 rd_assert(len > 0);
1346 rd_assert(len <= (size_t)rktp->rktp_rkt->rkt_rk->rk_conf.max_msg_size);
1347
1348 rd_atomic64_add(&rktp->rktp_c.tx_msgs, cnt);
1349 rd_atomic64_add(&rktp->rktp_c.tx_msg_bytes, msetw->msetw_messages_kvlen);
1350
1351 /* Idempotent Producer:
1352 * Store request's PID for matching on response
1353 * if the instance PID has changed and thus made
1354 * the request obsolete. */
1355 msetw->msetw_rkbuf->rkbuf_u.Produce.batch.pid = msetw->msetw_pid;
1356
1357 /* Compress the message set */
1358 if (msetw->msetw_compression)
1359 rd_kafka_msgset_writer_compress(msetw, &len);
1360
1361 msetw->msetw_messages_len = len;
1362
1363 /* Finalize MessageSet header fields */
1364 rd_kafka_msgset_writer_finalize_MessageSet(msetw);
1365
1366 /* Return final MessageSetSize */
1367 *MessageSetSizep = msetw->msetw_MessageSetSize;
1368
1369 rd_rkb_dbg(msetw->msetw_rkb, MSG, "PRODUCE",
1370 "%s [%"PRId32"]: "
1371 "Produce MessageSet with %i message(s) (%"PRIusz" bytes, "
1372 "ApiVersion %d, MsgVersion %d, MsgId %"PRIu64", "
1373 "BaseSeq %"PRId32", %s)",
1374 rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
1375 cnt, msetw->msetw_MessageSetSize,
1376 msetw->msetw_ApiVersion, msetw->msetw_MsgVersion,
1377 msetw->msetw_batch->first_msgid,
1378 msetw->msetw_batch->first_seq,
1379 rd_kafka_pid2str(msetw->msetw_pid));
1380
1381 rd_kafka_msgq_verify_order(rktp, &msetw->msetw_batch->msgq,
1382 msetw->msetw_batch->first_msgid, rd_false);
1383
1384 rd_kafka_msgbatch_ready_produce(msetw->msetw_batch);
1385
1386 return rkbuf;
1387}
1388
1389
1390/**
1391 * @brief Create ProduceRequest containing as many messages from
1392 * the toppar's transmit queue as possible, limited by configuration,
1393 * size, etc.
1394 *
1395 * @param rkb broker to create buffer for
1396 * @param rktp toppar to transmit messages for
1397 * @param MessagetSetSizep will be set to the final MessageSetSize
1398 *
1399 * @returns the buffer to transmit or NULL if there were no messages
1400 * in messageset.
1401 *
1402 * @locality broker thread
1403 */
1404rd_kafka_buf_t *
1405rd_kafka_msgset_create_ProduceRequest (rd_kafka_broker_t *rkb,
1406 rd_kafka_toppar_t *rktp,
1407 rd_kafka_msgq_t *rkmq,
1408 const rd_kafka_pid_t pid,
1409 size_t *MessageSetSizep) {
1410
1411 rd_kafka_msgset_writer_t msetw;
1412
1413 if (rd_kafka_msgset_writer_init(&msetw, rkb, rktp, rkmq, pid) == 0)
1414 return NULL;
1415
1416 if (!rd_kafka_msgset_writer_write_msgq(&msetw, msetw.msetw_msgq)) {
1417 /* Error while writing messages to MessageSet,
1418 * move all messages back on the xmit queue. */
1419 rd_kafka_msgq_insert_msgq(
1420 rkmq, &msetw.msetw_batch->msgq,
1421 rktp->rktp_rkt->rkt_conf.msg_order_cmp);
1422 }
1423
1424 return rd_kafka_msgset_writer_finalize(&msetw, MessageSetSizep);
1425}
1426