1/*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012,2013 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
18 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
19 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
20 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
21 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
22 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
23 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
24 * POSSIBILITY OF SUCH DAMAGE.
25 */
26
27#ifndef _RDKAFKA_MSG_H_
28#define _RDKAFKA_MSG_H_
29
30#include "rdsysqueue.h"
31
32#include "rdkafka_proto.h"
33#include "rdkafka_header.h"
34
35
/**
 * @brief Internal RD_KAFKA_MSG_F_.. flags
 */
#define RD_KAFKA_MSG_F_RKT_RDLOCKED 0x100000 /* rkt is rdlock():ed */


/**
 * @brief Message.MsgAttributes for MsgVersion v0..v1,
 *        also used for MessageSet.Attributes for MsgVersion v2.
 *
 * The compression codec is a plain value (not a set of bit flags) that
 * lives in the bits covered by RD_KAFKA_MSG_ATTR_COMPRESSION_MASK.
 * The timestamp type occupies the single bit directly above that mask.
 */
#define RD_KAFKA_MSG_ATTR_GZIP             0x1
#define RD_KAFKA_MSG_ATTR_SNAPPY           0x2
#define RD_KAFKA_MSG_ATTR_LZ4              0x3
#define RD_KAFKA_MSG_ATTR_ZSTD             0x4
#define RD_KAFKA_MSG_ATTR_COMPRESSION_MASK 0x7
#define RD_KAFKA_MSG_ATTR_CREATE_TIME      0x0
#define RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME  0x8


/**
 * @brief MessageSet.Attributes for MsgVersion v2
 *
 * Attributes:
 *  -------------------------------------------------------------------------------------------------
 *  | Unused (6-15) | Control (5) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) |
 *  -------------------------------------------------------------------------------------------------
 */
/* Compression types: same values as RD_KAFKA_MSG_ATTR_.. above */
/* Timestamp type: same values as RD_KAFKA_MSG_ATTR_.. above */
#define RD_KAFKA_MSGSET_V2_ATTR_TRANSACTIONAL 0x10
#define RD_KAFKA_MSGSET_V2_ATTR_CONTROL       0x20
67
68
/**
 * @brief Internal message object wrapping the public rd_kafka_message_t.
 *
 * The public message is embedded as the first member so that a
 * rd_kafka_message_t pointer can be cast back to its enclosing
 * rd_kafka_msg_t (see rd_kafka_message2msg()).
 *
 * The rkm_.. macros defined inside the struct are shorthand accessors
 * for fields of the embedded message and for the producer timestamps.
 */
typedef struct rd_kafka_msg_s {
        rd_kafka_message_t rkm_rkmessage; /* MUST be first field */
        /* Shorthand accessors for the embedded public message fields */
#define rkm_len rkm_rkmessage.len
#define rkm_payload rkm_rkmessage.payload
#define rkm_opaque rkm_rkmessage._private
#define rkm_partition rkm_rkmessage.partition
#define rkm_offset rkm_rkmessage.offset
#define rkm_key rkm_rkmessage.key
#define rkm_key_len rkm_rkmessage.key_len
#define rkm_err rkm_rkmessage.err

        TAILQ_ENTRY(rd_kafka_msg_s) rkm_link; /* Queue linkage, used by
                                               * the rd_kafka_msgq_..
                                               * helpers. */

        int rkm_flags;
        /* @remark These additional flags must not collide with
         *         the RD_KAFKA_MSG_F_* flags in rdkafka.h */
#define RD_KAFKA_MSG_F_FREE_RKM 0x10000 /* msg_t is allocated */
#define RD_KAFKA_MSG_F_ACCOUNT 0x20000 /* accounted for in curr_msgs */
#define RD_KAFKA_MSG_F_PRODUCER 0x40000 /* Producer message */

        rd_kafka_timestamp_type_t rkm_tstype; /* rkm_timestamp type */
        int64_t rkm_timestamp; /* Message format V1.
                                * Meaning of timestamp depends on
                                * message Attribute LogAppendtime (broker)
                                * or CreateTime (producer).
                                * Unit is milliseconds since epoch (UTC).*/


        rd_kafka_headers_t *rkm_headers; /**< Parsed headers list, if any. */

        rd_kafka_msg_status_t rkm_status; /**< Persistence status. Updated in
                                           *   the ProduceResponse handler:
                                           *   this value is always up to date.
                                           */

        /* Producer-specific and consumer-specific state share storage. */
        union {
                struct {
                        rd_ts_t ts_timeout; /* Message timeout */
                        rd_ts_t ts_enq; /* Enqueue/Produce time */
                        rd_ts_t ts_backoff; /* Backoff next Produce until
                                             * this time. */
                        uint64_t msgid; /**< Message sequential id,
                                         *   used to maintain ordering.
                                         *   Starts at 1. */
                        uint64_t last_msgid; /**< On retry this is set
                                              *   on the first message
                                              *   in a batch to point
                                              *   out the last message
                                              *   of the batch so that
                                              *   the batch can be
                                              *   identically reconstructed.
                                              */
                        int retries; /* Number of retries so far */
                } producer;
                /* Shorthand accessors for the producer timestamps */
#define rkm_ts_timeout rkm_u.producer.ts_timeout
#define rkm_ts_enq rkm_u.producer.ts_enq

                struct {
                        rd_kafkap_bytes_t binhdrs; /**< Unparsed
                                                    *   binary headers in
                                                    *   protocol msg */
                } consumer;
        } rkm_u;
} rd_kafka_msg_t;

/* Tail queue head type for lists of rd_kafka_msg_s elements. */
TAILQ_HEAD(rd_kafka_msg_head_s, rd_kafka_msg_s);
135
136
/**
 * @returns the absolute time a message was enqueued (producer).
 *
 * @remark Expands to the rkm_ts_enq field, i.e. rkm_u.producer.ts_enq,
 *         which is part of the producer member of the rkm_u union.
 */
#define rd_kafka_msg_enq_time(rkm) ((rkm)->rkm_ts_enq)
139
140/**
141 * @returns the message's total maximum on-wire size.
142 * @remark Depending on message version (MagicByte) the actual size
143 * may be smaller.
144 */
145static RD_INLINE RD_UNUSED
146size_t rd_kafka_msg_wire_size (const rd_kafka_msg_t *rkm, int MsgVersion) {
147 static const size_t overheads[] = {
148 [0] = RD_KAFKAP_MESSAGE_V0_OVERHEAD,
149 [1] = RD_KAFKAP_MESSAGE_V1_OVERHEAD,
150 [2] = RD_KAFKAP_MESSAGE_V2_OVERHEAD
151 };
152 size_t size;
153 rd_dassert(MsgVersion >= 0 && MsgVersion <= 2);
154
155 size = overheads[MsgVersion] + rkm->rkm_len + rkm->rkm_key_len;
156 if (MsgVersion == 2 && rkm->rkm_headers)
157 size += rd_kafka_headers_serialized_size(rkm->rkm_headers);
158
159 return size;
160}
161
162
163/**
164 * @returns the enveloping rd_kafka_msg_t pointer for a rd_kafka_msg_t
165 * wrapped rd_kafka_message_t.
166 */
167static RD_INLINE RD_UNUSED
168rd_kafka_msg_t *rd_kafka_message2msg (rd_kafka_message_t *rkmessage) {
169 return (rd_kafka_msg_t *)rkmessage;
170}
171
172
173
174
175
/**
 * @brief Message queue with message and byte counters.
 *
 * The counters are maintained by the rd_kafka_msgq_..() helpers in this
 * header: each enqueued message adds 1 to rkmq_msg_cnt and its
 * payload length + key length to rkmq_msg_bytes.
 */
TAILQ_HEAD(rd_kafka_msgs_head_s, rd_kafka_msg_s);
typedef struct rd_kafka_msgq_s {
        struct rd_kafka_msgs_head_s rkmq_msgs; /* TAILQ_HEAD */
        int32_t rkmq_msg_cnt;   /* Number of messages accounted for */
        int64_t rkmq_msg_bytes; /* Payload + key bytes accounted for */
} rd_kafka_msgq_t;
185
/**
 * @brief Static initializer for the rd_kafka_msgq_t variable \p rkmq.
 *
 * Only the list head is initialized explicitly; the members not named in
 * the designated initializer (the counters) are implicitly zeroed.
 */
#define RD_KAFKA_MSGQ_INITIALIZER(rkmq) \
        { .rkmq_msgs = TAILQ_HEAD_INITIALIZER((rkmq).rkmq_msgs) }

/**
 * @brief Iterate over all messages in the queue pointed to by \p head,
 *        assigning each message to \p elm in turn.
 */
#define RD_KAFKA_MSGQ_FOREACH(elm,head) \
        TAILQ_FOREACH(elm, &(head)->rkmq_msgs, rkm_link)

/* @brief Check if queue is empty. Proper locks must be held. */
#define RD_KAFKA_MSGQ_EMPTY(rkmq) TAILQ_EMPTY(&(rkmq)->rkmq_msgs)
194
195/**
196 * Returns the number of messages in the specified queue.
197 */
198static RD_INLINE RD_UNUSED
199int rd_kafka_msgq_len (const rd_kafka_msgq_t *rkmq) {
200 return (int)rkmq->rkmq_msg_cnt;
201}
202
203/**
204 * Returns the total number of bytes in the specified queue.
205 */
206static RD_INLINE RD_UNUSED
207size_t rd_kafka_msgq_size (const rd_kafka_msgq_t *rkmq) {
208 return (size_t)rkmq->rkmq_msg_bytes;
209}
210
211
/**
 * Destroy the message \p rkm.
 *
 * NOTE(review): implemented outside this header; the exact release and
 * accounting semantics should be confirmed in the implementation.
 */
void rd_kafka_msg_destroy (rd_kafka_t *rk, rd_kafka_msg_t *rkm);

/**
 * Create a new message for topic \p rkt from the given payload and key.
 *
 * NOTE(review): implemented outside this header; presumably this is the
 * producer entry point and \p msgflags takes RD_KAFKA_MSG_F_.. flags -
 * confirm ownership of \p payload and the meaning of the return value
 * against the implementation.
 */
int rd_kafka_msg_new (rd_kafka_itopic_t *rkt, int32_t force_partition,
                      int msgflags,
                      char *payload, size_t len,
                      const void *keydata, size_t keylen,
                      void *msg_opaque);
219
220static RD_INLINE RD_UNUSED void rd_kafka_msgq_init (rd_kafka_msgq_t *rkmq) {
221 TAILQ_INIT(&rkmq->rkmq_msgs);
222 rkmq->rkmq_msg_cnt = 0;
223 rkmq->rkmq_msg_bytes = 0;
224}
225
/**
 * Message queue order verification hook.
 *
 * With ENABLE_DEVEL set the macro forwards to
 * rd_kafka_msgq_verify_order0() together with the calling function name
 * and line number, otherwise it expands to an empty statement.
 */
#if ENABLE_DEVEL
#define rd_kafka_msgq_verify_order(rktp,rkmq,exp_first_msgid,gapless) \
        rd_kafka_msgq_verify_order0(__FUNCTION__, __LINE__,            \
                                    rktp, rkmq, exp_first_msgid, gapless)
#else
#define rd_kafka_msgq_verify_order(rktp,rkmq,exp_first_msgid,gapless) \
        do { } while (0)
#endif

/**
 * Backend of rd_kafka_msgq_verify_order(), \p function and \p line
 * identify the call site.
 *
 * NOTE(review): implemented outside this header; the precise checks
 * performed for \p exp_first_msgid and \p gapless should be confirmed
 * in the implementation.
 */
void rd_kafka_msgq_verify_order0 (const char *function, int line,
                                  const struct rd_kafka_toppar_s *rktp,
                                  const rd_kafka_msgq_t *rkmq,
                                  uint64_t exp_first_msgid,
                                  rd_bool_t gapless);
240
241
242/**
243 * Concat all elements of 'src' onto tail of 'dst'.
244 * 'src' will be cleared.
245 * Proper locks for 'src' and 'dst' must be held.
246 */
247static RD_INLINE RD_UNUSED void rd_kafka_msgq_concat (rd_kafka_msgq_t *dst,
248 rd_kafka_msgq_t *src) {
249 TAILQ_CONCAT(&dst->rkmq_msgs, &src->rkmq_msgs, rkm_link);
250 dst->rkmq_msg_cnt += src->rkmq_msg_cnt;
251 dst->rkmq_msg_bytes += src->rkmq_msg_bytes;
252 rd_kafka_msgq_init(src);
253 rd_kafka_msgq_verify_order(NULL, dst, 0, rd_false);
254}
255
256/**
257 * Move queue 'src' to 'dst' (overwrites dst)
258 * Source will be cleared.
259 */
260static RD_INLINE RD_UNUSED void rd_kafka_msgq_move (rd_kafka_msgq_t *dst,
261 rd_kafka_msgq_t *src) {
262 TAILQ_MOVE(&dst->rkmq_msgs, &src->rkmq_msgs, rkm_link);
263 dst->rkmq_msg_cnt = src->rkmq_msg_cnt;
264 dst->rkmq_msg_bytes = src->rkmq_msg_bytes;
265 rd_kafka_msgq_init(src);
266 rd_kafka_msgq_verify_order(NULL, dst, 0, rd_false);
267}
268
269
270/**
271 * @brief Prepend all elements of \ src onto head of \p dst.
272 * \p src will be cleared/re-initialized.
273 *
274 * @locks proper locks for \p src and \p dst MUST be held.
275 */
276static RD_INLINE RD_UNUSED void rd_kafka_msgq_prepend (rd_kafka_msgq_t *dst,
277 rd_kafka_msgq_t *src) {
278 rd_kafka_msgq_concat(src, dst);
279 rd_kafka_msgq_move(dst, src);
280 rd_kafka_msgq_verify_order(NULL, dst, 0, rd_false);
281}
282
283
284/**
285 * rd_free all msgs in msgq and reinitialize the msgq.
286 */
287static RD_INLINE RD_UNUSED void rd_kafka_msgq_purge (rd_kafka_t *rk,
288 rd_kafka_msgq_t *rkmq) {
289 rd_kafka_msg_t *rkm, *next;
290
291 next = TAILQ_FIRST(&rkmq->rkmq_msgs);
292 while (next) {
293 rkm = next;
294 next = TAILQ_NEXT(next, rkm_link);
295
296 rd_kafka_msg_destroy(rk, rkm);
297 }
298
299 rd_kafka_msgq_init(rkmq);
300}
301
302
303/**
304 * Remove message from message queue
305 */
306static RD_INLINE RD_UNUSED
307rd_kafka_msg_t *rd_kafka_msgq_deq (rd_kafka_msgq_t *rkmq,
308 rd_kafka_msg_t *rkm,
309 int do_count) {
310 if (likely(do_count)) {
311 rd_kafka_assert(NULL, rkmq->rkmq_msg_cnt > 0);
312 rd_kafka_assert(NULL, rkmq->rkmq_msg_bytes >=
313 (int64_t)(rkm->rkm_len+rkm->rkm_key_len));
314 rkmq->rkmq_msg_cnt--;
315 rkmq->rkmq_msg_bytes -= rkm->rkm_len+rkm->rkm_key_len;
316 }
317
318 TAILQ_REMOVE(&rkmq->rkmq_msgs, rkm, rkm_link);
319
320 return rkm;
321}
322
323static RD_INLINE RD_UNUSED
324rd_kafka_msg_t *rd_kafka_msgq_pop (rd_kafka_msgq_t *rkmq) {
325 rd_kafka_msg_t *rkm;
326
327 if (((rkm = TAILQ_FIRST(&rkmq->rkmq_msgs))))
328 rd_kafka_msgq_deq(rkmq, rkm, 1);
329
330 return rkm;
331}
332
333
334/**
335 * @returns the first message in the queue, or NULL if empty.
336 *
337 * @locks caller's responsibility
338 */
339static RD_INLINE RD_UNUSED
340rd_kafka_msg_t *rd_kafka_msgq_first (const rd_kafka_msgq_t *rkmq) {
341 return TAILQ_FIRST(&rkmq->rkmq_msgs);
342}
343
344/**
345 * @returns the last message in the queue, or NULL if empty.
346 *
347 * @locks caller's responsibility
348 */
349static RD_INLINE RD_UNUSED
350rd_kafka_msg_t *rd_kafka_msgq_last (const rd_kafka_msgq_t *rkmq) {
351 return TAILQ_LAST(&rkmq->rkmq_msgs, rd_kafka_msgs_head_s);
352}
353
354
355/**
356 * @returns the MsgId of the first message in the queue, or 0 if empty.
357 *
358 * @locks caller's responsibility
359 */
360static RD_INLINE RD_UNUSED
361uint64_t rd_kafka_msgq_first_msgid (const rd_kafka_msgq_t *rkmq) {
362 const rd_kafka_msg_t *rkm = TAILQ_FIRST(&rkmq->rkmq_msgs);
363 if (rkm)
364 return rkm->rkm_u.producer.msgid;
365 else
366 return 0;
367}
368
369
370/**
371 * @brief Message ordering comparator using the message id
372 * number to order messages in ascending order (FIFO).
373 */
374static RD_INLINE
375int rd_kafka_msg_cmp_msgid (const void *_a, const void *_b) {
376 const rd_kafka_msg_t *a = _a, *b = _b;
377
378 rd_dassert(a->rkm_u.producer.msgid);
379
380 if (a->rkm_u.producer.msgid > b->rkm_u.producer.msgid)
381 return 1;
382 else if (a->rkm_u.producer.msgid < b->rkm_u.producer.msgid)
383 return -1;
384 else
385 return 0;
386}
387
388/**
389 * @brief Message ordering comparator using the message id
390 * number to order messages in descending order (LIFO).
391 */
392static RD_INLINE
393int rd_kafka_msg_cmp_msgid_lifo (const void *_a, const void *_b) {
394 const rd_kafka_msg_t *a = _a, *b = _b;
395
396 rd_dassert(a->rkm_u.producer.msgid);
397
398 if (a->rkm_u.producer.msgid < b->rkm_u.producer.msgid)
399 return 1;
400 else if (a->rkm_u.producer.msgid > b->rkm_u.producer.msgid)
401 return -1;
402 else
403 return 0;
404}
405
406
/**
 * @brief Insert message at its sorted position using the msgid.
 *
 * @param order_cmp Comparator with the same signature as
 *                  rd_kafka_msg_cmp_msgid() and
 *                  rd_kafka_msg_cmp_msgid_lifo(); presumably it decides
 *                  the sort order - verify against the implementation.
 *
 * @remark This is an O(n) operation.
 * @warning The message must have a msgid set.
 * @returns the message count of the queue after enqueuing the message.
 */
int
rd_kafka_msgq_enq_sorted0 (rd_kafka_msgq_t *rkmq,
                           rd_kafka_msg_t *rkm,
                           int (*order_cmp) (const void *, const void *));

/**
 * @brief Insert message at its sorted position using the msgid.
 *
 * NOTE(review): this variant takes the topic \p rkt instead of a
 * comparator; presumably the ordering is derived from the topic -
 * verify against the implementation.
 *
 * @remark This is an O(n) operation.
 * @warning The message must have a msgid set.
 * @returns the message count of the queue after enqueuing the message.
 */
int rd_kafka_msgq_enq_sorted (const rd_kafka_itopic_t *rkt,
                              rd_kafka_msgq_t *rkmq,
                              rd_kafka_msg_t *rkm);
427
428/**
429 * Insert message at head of message queue.
430 */
431static RD_INLINE RD_UNUSED void rd_kafka_msgq_insert (rd_kafka_msgq_t *rkmq,
432 rd_kafka_msg_t *rkm) {
433 TAILQ_INSERT_HEAD(&rkmq->rkmq_msgs, rkm, rkm_link);
434 rkmq->rkmq_msg_cnt++;
435 rkmq->rkmq_msg_bytes += rkm->rkm_len+rkm->rkm_key_len;
436}
437
438/**
439 * Append message to tail of message queue.
440 */
441static RD_INLINE RD_UNUSED int rd_kafka_msgq_enq (rd_kafka_msgq_t *rkmq,
442 rd_kafka_msg_t *rkm) {
443 TAILQ_INSERT_TAIL(&rkmq->rkmq_msgs, rkm, rkm_link);
444 rkmq->rkmq_msg_bytes += rkm->rkm_len+rkm->rkm_key_len;
445 return (int)++rkmq->rkmq_msg_cnt;
446}
447
448
449/**
450 * @returns true if the MsgId extents (first, last) in the two queues overlap.
451 */
452static RD_INLINE RD_UNUSED rd_bool_t
453rd_kafka_msgq_overlap (const rd_kafka_msgq_t *a, const rd_kafka_msgq_t *b) {
454 const rd_kafka_msg_t *fa, *la, *fb, *lb;
455
456 if (RD_KAFKA_MSGQ_EMPTY(a) ||
457 RD_KAFKA_MSGQ_EMPTY(b))
458 return rd_false;
459
460 fa = rd_kafka_msgq_first(a);
461 fb = rd_kafka_msgq_first(b);
462 la = rd_kafka_msgq_last(a);
463 lb = rd_kafka_msgq_last(b);
464
465 return (rd_bool_t)
466 (fa->rkm_u.producer.msgid <= lb->rkm_u.producer.msgid &&
467 fb->rkm_u.producer.msgid <= la->rkm_u.producer.msgid);
468}
469
/**
 * Scans a message queue for timed out messages and removes them from
 * 'rkmq' and adds them to 'timedout', returning the number of timed out
 * messages.
 * 'timedout' must be initialized.
 */
int rd_kafka_msgq_age_scan (struct rd_kafka_toppar_s *rktp,
                            rd_kafka_msgq_t *rkmq,
                            rd_kafka_msgq_t *timedout,
                            rd_ts_t now);

/**
 * NOTE(review): implemented outside this header; presumably locates the
 * position in \p rkmq where \p rkm belongs according to \p cmp (which has
 * the same signature as rd_kafka_msg_cmp_msgid()) - confirm what the
 * returned message denotes.
 */
rd_kafka_msg_t *rd_kafka_msgq_find_pos (const rd_kafka_msgq_t *rkmq,
                                        const rd_kafka_msg_t *rkm,
                                        int (*cmp) (const void *,
                                                    const void *));

/**
 * NOTE(review): implemented outside this header; presumably applies
 * offset, timestamp and persistence status to the messages in \p rkmq -
 * confirm how \p base_offset maps to the individual messages.
 */
void rd_kafka_msgq_set_metadata (rd_kafka_msgq_t *rkmq,
                                 int64_t base_offset, int64_t timestamp,
                                 rd_kafka_msg_status_t status);

/**
 * NOTE(review): implemented outside this header; presumably moves the
 * messages of \p src up to and including \p last_msgid to \p dest and
 * sets their status to \p status - confirm against the implementation.
 */
void rd_kafka_msgq_move_acked (rd_kafka_msgq_t *dest, rd_kafka_msgq_t *src,
                               uint64_t last_msgid,
                               rd_kafka_msg_status_t status);

/**
 * NOTE(review): implemented outside this header; presumably assigns a
 * partition of \p rkt to \p rkm, with \p do_lock selecting whether the
 * function acquires the required lock itself - confirm the return value.
 */
int rd_kafka_msg_partitioner (rd_kafka_itopic_t *rkt, rd_kafka_msg_t *rkm,
                              int do_lock);
496
497
/*
 * Public message (rd_kafka_message_t) accessors and constructor.
 *
 * NOTE(review): implemented outside this header; presumably the _get
 * variants return the public message belonging to the op \p rko (and
 * \p rkm) and _new allocates an empty message - confirm ownership of
 * the returned objects against the implementation.
 */
rd_kafka_message_t *rd_kafka_message_get (struct rd_kafka_op_s *rko);
rd_kafka_message_t *rd_kafka_message_get_from_rkm (struct rd_kafka_op_s *rko,
                                                   rd_kafka_msg_t *rkm);
rd_kafka_message_t *rd_kafka_message_new (void);
502
503
504/**
505 * @returns a (possibly) wrapped Kafka protocol message sequence counter
506 * for the non-overflowing \p seq.
507 */
508static RD_INLINE RD_UNUSED int32_t rd_kafka_seq_wrap (int64_t seq) {
509 return (int32_t)(seq & (int64_t)INT32_MAX);
510}
511
/**
 * NOTE(review): implemented outside this header; presumably writes a
 * human readable dump of \p rkmq, labeled \p what, to \p fp.
 */
void rd_kafka_msgq_dump (FILE *fp, const char *what, rd_kafka_msgq_t *rkmq);

/*
 * Unit test helpers and entry point.
 *
 * NOTE(review): implemented outside this header; the ut_ prefix suggests
 * these are intended for unit tests only - confirm before using them
 * elsewhere.
 */
rd_kafka_msg_t *ut_rd_kafka_msg_new (void);
void ut_rd_kafka_msgq_purge (rd_kafka_msgq_t *rkmq);
int unittest_msg (void);
517
518#endif /* _RDKAFKA_MSG_H_ */
519