1 | /* |
2 | * librdkafka - Apache Kafka C library |
3 | * |
4 | * Copyright (c) 2012,2013 Magnus Edenhill |
5 | * All rights reserved. |
6 | * |
7 | * Redistribution and use in source and binary forms, with or without |
8 | * modification, are permitted provided that the following conditions are met: |
9 | * |
10 | * 1. Redistributions of source code must retain the above copyright notice, |
11 | * this list of conditions and the following disclaimer. |
12 | * 2. Redistributions in binary form must reproduce the above copyright notice, |
13 | * this list of conditions and the following disclaimer in the documentation |
14 | * and/or other materials provided with the distribution. |
15 | * PRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
16 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
17 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
18 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
19 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
20 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
21 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
22 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
23 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
24 | * POSSIBILITY OF SUCH DAMAGE. |
25 | */ |
26 | |
27 | #ifndef _RDKAFKA_MSG_H_ |
28 | #define _RDKAFKA_MSG_H_ |
29 | |
30 | #include "rdsysqueue.h" |
31 | |
32 | #include "rdkafka_proto.h" |
33 | #include "rdkafka_header.h" |
34 | |
35 | |
36 | /** |
37 | * @brief Internal RD_KAFKA_MSG_F_.. flags |
38 | */ |
39 | #define RD_KAFKA_MSG_F_RKT_RDLOCKED 0x100000 /* rkt is rdlock():ed */ |
40 | |
41 | |
42 | /** |
43 | * @brief Message.MsgAttributes for MsgVersion v0..v1, |
44 | * also used for MessageSet.Attributes for MsgVersion v2. |
45 | */ |
46 | #define RD_KAFKA_MSG_ATTR_GZIP (1 << 0) |
47 | #define RD_KAFKA_MSG_ATTR_SNAPPY (1 << 1) |
48 | #define RD_KAFKA_MSG_ATTR_LZ4 (3) |
49 | #define RD_KAFKA_MSG_ATTR_ZSTD (4) |
50 | #define RD_KAFKA_MSG_ATTR_COMPRESSION_MASK 0x7 |
51 | #define RD_KAFKA_MSG_ATTR_CREATE_TIME (0 << 3) |
52 | #define RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME (1 << 3) |
53 | |
54 | |
55 | /** |
56 | * @brief MessageSet.Attributes for MsgVersion v2 |
57 | * |
58 | * Attributes: |
59 | * ------------------------------------------------------------------------------------------------- |
60 | * | Unused (6-15) | Control (5) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) | |
61 | * ------------------------------------------------------------------------------------------------- |
62 | */ |
63 | /* Compression types same as MsgVersion 0 above */ |
64 | /* Timestamp type same as MsgVersion 0 above */ |
65 | #define RD_KAFKA_MSGSET_V2_ATTR_TRANSACTIONAL (1 << 4) |
66 | #define RD_KAFKA_MSGSET_V2_ATTR_CONTROL (1 << 5) |
67 | |
68 | |
69 | typedef struct rd_kafka_msg_s { |
70 | rd_kafka_message_t rkm_rkmessage; /* MUST be first field */ |
71 | #define rkm_len rkm_rkmessage.len |
72 | #define rkm_payload rkm_rkmessage.payload |
73 | #define rkm_opaque rkm_rkmessage._private |
74 | #define rkm_partition rkm_rkmessage.partition |
75 | #define rkm_offset rkm_rkmessage.offset |
76 | #define rkm_key rkm_rkmessage.key |
77 | #define rkm_key_len rkm_rkmessage.key_len |
78 | #define rkm_err rkm_rkmessage.err |
79 | |
80 | TAILQ_ENTRY(rd_kafka_msg_s) rkm_link; |
81 | |
82 | int rkm_flags; |
83 | /* @remark These additional flags must not collide with |
84 | * the RD_KAFKA_MSG_F_* flags in rdkafka.h */ |
85 | #define RD_KAFKA_MSG_F_FREE_RKM 0x10000 /* msg_t is allocated */ |
86 | #define RD_KAFKA_MSG_F_ACCOUNT 0x20000 /* accounted for in curr_msgs */ |
87 | #define RD_KAFKA_MSG_F_PRODUCER 0x40000 /* Producer message */ |
88 | |
89 | rd_kafka_timestamp_type_t rkm_tstype; /* rkm_timestamp type */ |
90 | int64_t rkm_timestamp; /* Message format V1. |
91 | * Meaning of timestamp depends on |
92 | * message Attribute LogAppendtime (broker) |
93 | * or CreateTime (producer). |
94 | * Unit is milliseconds since epoch (UTC).*/ |
95 | |
96 | |
97 | rd_kafka_headers_t *; /**< Parsed headers list, if any. */ |
98 | |
99 | rd_kafka_msg_status_t rkm_status; /**< Persistence status. Updated in |
100 | * the ProduceResponse handler: |
101 | * this value is always up to date. |
102 | */ |
103 | |
104 | union { |
105 | struct { |
106 | rd_ts_t ts_timeout; /* Message timeout */ |
107 | rd_ts_t ts_enq; /* Enqueue/Produce time */ |
108 | rd_ts_t ts_backoff; /* Backoff next Produce until |
109 | * this time. */ |
110 | uint64_t msgid; /**< Message sequencial id, |
111 | * used to maintain ordering. |
112 | * Starts at 1. */ |
113 | uint64_t last_msgid; /**< On retry this is set |
114 | * on the first message |
115 | * in a batch to point |
116 | * out the last message |
117 | * of the batch so that |
118 | * the batch can be |
119 | * identically reconstructed. |
120 | */ |
121 | int retries; /* Number of retries so far */ |
122 | } producer; |
123 | #define rkm_ts_timeout rkm_u.producer.ts_timeout |
124 | #define rkm_ts_enq rkm_u.producer.ts_enq |
125 | |
126 | struct { |
127 | rd_kafkap_bytes_t binhdrs; /**< Unparsed |
128 | * binary headers in |
129 | * protocol msg */ |
130 | } consumer; |
131 | } rkm_u; |
132 | } rd_kafka_msg_t; |
133 | |
134 | TAILQ_HEAD(rd_kafka_msg_head_s, rd_kafka_msg_s); |
135 | |
136 | |
137 | /** @returns the absolute time a message was enqueued (producer) */ |
138 | #define rd_kafka_msg_enq_time(rkm) ((rkm)->rkm_ts_enq) |
139 | |
140 | /** |
141 | * @returns the message's total maximum on-wire size. |
142 | * @remark Depending on message version (MagicByte) the actual size |
143 | * may be smaller. |
144 | */ |
145 | static RD_INLINE RD_UNUSED |
146 | size_t rd_kafka_msg_wire_size (const rd_kafka_msg_t *rkm, int MsgVersion) { |
147 | static const size_t overheads[] = { |
148 | [0] = RD_KAFKAP_MESSAGE_V0_OVERHEAD, |
149 | [1] = RD_KAFKAP_MESSAGE_V1_OVERHEAD, |
150 | [2] = RD_KAFKAP_MESSAGE_V2_OVERHEAD |
151 | }; |
152 | size_t size; |
153 | rd_dassert(MsgVersion >= 0 && MsgVersion <= 2); |
154 | |
155 | size = overheads[MsgVersion] + rkm->rkm_len + rkm->rkm_key_len; |
156 | if (MsgVersion == 2 && rkm->rkm_headers) |
157 | size += rd_kafka_headers_serialized_size(rkm->rkm_headers); |
158 | |
159 | return size; |
160 | } |
161 | |
162 | |
163 | /** |
164 | * @returns the enveloping rd_kafka_msg_t pointer for a rd_kafka_msg_t |
165 | * wrapped rd_kafka_message_t. |
166 | */ |
167 | static RD_INLINE RD_UNUSED |
168 | rd_kafka_msg_t *rd_kafka_message2msg (rd_kafka_message_t *rkmessage) { |
169 | return (rd_kafka_msg_t *)rkmessage; |
170 | } |
171 | |
172 | |
173 | |
174 | |
175 | |
176 | /** |
177 | * @brief Message queue with message and byte counters. |
178 | */ |
179 | TAILQ_HEAD(rd_kafka_msgs_head_s, rd_kafka_msg_s); |
180 | typedef struct rd_kafka_msgq_s { |
181 | struct rd_kafka_msgs_head_s rkmq_msgs; /* TAILQ_HEAD */ |
182 | int32_t rkmq_msg_cnt; |
183 | int64_t rkmq_msg_bytes; |
184 | } rd_kafka_msgq_t; |
185 | |
186 | #define RD_KAFKA_MSGQ_INITIALIZER(rkmq) \ |
187 | { .rkmq_msgs = TAILQ_HEAD_INITIALIZER((rkmq).rkmq_msgs) } |
188 | |
189 | #define RD_KAFKA_MSGQ_FOREACH(elm,head) \ |
190 | TAILQ_FOREACH(elm, &(head)->rkmq_msgs, rkm_link) |
191 | |
192 | /* @brief Check if queue is empty. Proper locks must be held. */ |
193 | #define RD_KAFKA_MSGQ_EMPTY(rkmq) TAILQ_EMPTY(&(rkmq)->rkmq_msgs) |
194 | |
195 | /** |
196 | * Returns the number of messages in the specified queue. |
197 | */ |
198 | static RD_INLINE RD_UNUSED |
199 | int rd_kafka_msgq_len (const rd_kafka_msgq_t *rkmq) { |
200 | return (int)rkmq->rkmq_msg_cnt; |
201 | } |
202 | |
203 | /** |
204 | * Returns the total number of bytes in the specified queue. |
205 | */ |
206 | static RD_INLINE RD_UNUSED |
207 | size_t rd_kafka_msgq_size (const rd_kafka_msgq_t *rkmq) { |
208 | return (size_t)rkmq->rkmq_msg_bytes; |
209 | } |
210 | |
211 | |
212 | void rd_kafka_msg_destroy (rd_kafka_t *rk, rd_kafka_msg_t *rkm); |
213 | |
214 | int rd_kafka_msg_new (rd_kafka_itopic_t *rkt, int32_t force_partition, |
215 | int msgflags, |
216 | char *payload, size_t len, |
217 | const void *keydata, size_t keylen, |
218 | void *msg_opaque); |
219 | |
220 | static RD_INLINE RD_UNUSED void rd_kafka_msgq_init (rd_kafka_msgq_t *rkmq) { |
221 | TAILQ_INIT(&rkmq->rkmq_msgs); |
222 | rkmq->rkmq_msg_cnt = 0; |
223 | rkmq->rkmq_msg_bytes = 0; |
224 | } |
225 | |
226 | #if ENABLE_DEVEL |
227 | #define rd_kafka_msgq_verify_order(rktp,rkmq,exp_first_msgid,gapless) \ |
228 | rd_kafka_msgq_verify_order0(__FUNCTION__, __LINE__, \ |
229 | rktp, rkmq, exp_first_msgid, gapless) |
230 | #else |
231 | #define rd_kafka_msgq_verify_order(rktp,rkmq,exp_first_msgid,gapless) \ |
232 | do { } while (0) |
233 | #endif |
234 | |
235 | void rd_kafka_msgq_verify_order0 (const char *function, int line, |
236 | const struct rd_kafka_toppar_s *rktp, |
237 | const rd_kafka_msgq_t *rkmq, |
238 | uint64_t exp_first_msgid, |
239 | rd_bool_t gapless); |
240 | |
241 | |
242 | /** |
243 | * Concat all elements of 'src' onto tail of 'dst'. |
244 | * 'src' will be cleared. |
245 | * Proper locks for 'src' and 'dst' must be held. |
246 | */ |
247 | static RD_INLINE RD_UNUSED void rd_kafka_msgq_concat (rd_kafka_msgq_t *dst, |
248 | rd_kafka_msgq_t *src) { |
249 | TAILQ_CONCAT(&dst->rkmq_msgs, &src->rkmq_msgs, rkm_link); |
250 | dst->rkmq_msg_cnt += src->rkmq_msg_cnt; |
251 | dst->rkmq_msg_bytes += src->rkmq_msg_bytes; |
252 | rd_kafka_msgq_init(src); |
253 | rd_kafka_msgq_verify_order(NULL, dst, 0, rd_false); |
254 | } |
255 | |
256 | /** |
257 | * Move queue 'src' to 'dst' (overwrites dst) |
258 | * Source will be cleared. |
259 | */ |
260 | static RD_INLINE RD_UNUSED void rd_kafka_msgq_move (rd_kafka_msgq_t *dst, |
261 | rd_kafka_msgq_t *src) { |
262 | TAILQ_MOVE(&dst->rkmq_msgs, &src->rkmq_msgs, rkm_link); |
263 | dst->rkmq_msg_cnt = src->rkmq_msg_cnt; |
264 | dst->rkmq_msg_bytes = src->rkmq_msg_bytes; |
265 | rd_kafka_msgq_init(src); |
266 | rd_kafka_msgq_verify_order(NULL, dst, 0, rd_false); |
267 | } |
268 | |
269 | |
270 | /** |
271 | * @brief Prepend all elements of \ src onto head of \p dst. |
272 | * \p src will be cleared/re-initialized. |
273 | * |
274 | * @locks proper locks for \p src and \p dst MUST be held. |
275 | */ |
276 | static RD_INLINE RD_UNUSED void rd_kafka_msgq_prepend (rd_kafka_msgq_t *dst, |
277 | rd_kafka_msgq_t *src) { |
278 | rd_kafka_msgq_concat(src, dst); |
279 | rd_kafka_msgq_move(dst, src); |
280 | rd_kafka_msgq_verify_order(NULL, dst, 0, rd_false); |
281 | } |
282 | |
283 | |
284 | /** |
285 | * rd_free all msgs in msgq and reinitialize the msgq. |
286 | */ |
287 | static RD_INLINE RD_UNUSED void rd_kafka_msgq_purge (rd_kafka_t *rk, |
288 | rd_kafka_msgq_t *rkmq) { |
289 | rd_kafka_msg_t *rkm, *next; |
290 | |
291 | next = TAILQ_FIRST(&rkmq->rkmq_msgs); |
292 | while (next) { |
293 | rkm = next; |
294 | next = TAILQ_NEXT(next, rkm_link); |
295 | |
296 | rd_kafka_msg_destroy(rk, rkm); |
297 | } |
298 | |
299 | rd_kafka_msgq_init(rkmq); |
300 | } |
301 | |
302 | |
303 | /** |
304 | * Remove message from message queue |
305 | */ |
306 | static RD_INLINE RD_UNUSED |
307 | rd_kafka_msg_t *rd_kafka_msgq_deq (rd_kafka_msgq_t *rkmq, |
308 | rd_kafka_msg_t *rkm, |
309 | int do_count) { |
310 | if (likely(do_count)) { |
311 | rd_kafka_assert(NULL, rkmq->rkmq_msg_cnt > 0); |
312 | rd_kafka_assert(NULL, rkmq->rkmq_msg_bytes >= |
313 | (int64_t)(rkm->rkm_len+rkm->rkm_key_len)); |
314 | rkmq->rkmq_msg_cnt--; |
315 | rkmq->rkmq_msg_bytes -= rkm->rkm_len+rkm->rkm_key_len; |
316 | } |
317 | |
318 | TAILQ_REMOVE(&rkmq->rkmq_msgs, rkm, rkm_link); |
319 | |
320 | return rkm; |
321 | } |
322 | |
323 | static RD_INLINE RD_UNUSED |
324 | rd_kafka_msg_t *rd_kafka_msgq_pop (rd_kafka_msgq_t *rkmq) { |
325 | rd_kafka_msg_t *rkm; |
326 | |
327 | if (((rkm = TAILQ_FIRST(&rkmq->rkmq_msgs)))) |
328 | rd_kafka_msgq_deq(rkmq, rkm, 1); |
329 | |
330 | return rkm; |
331 | } |
332 | |
333 | |
334 | /** |
335 | * @returns the first message in the queue, or NULL if empty. |
336 | * |
337 | * @locks caller's responsibility |
338 | */ |
339 | static RD_INLINE RD_UNUSED |
340 | rd_kafka_msg_t *rd_kafka_msgq_first (const rd_kafka_msgq_t *rkmq) { |
341 | return TAILQ_FIRST(&rkmq->rkmq_msgs); |
342 | } |
343 | |
344 | /** |
345 | * @returns the last message in the queue, or NULL if empty. |
346 | * |
347 | * @locks caller's responsibility |
348 | */ |
349 | static RD_INLINE RD_UNUSED |
350 | rd_kafka_msg_t *rd_kafka_msgq_last (const rd_kafka_msgq_t *rkmq) { |
351 | return TAILQ_LAST(&rkmq->rkmq_msgs, rd_kafka_msgs_head_s); |
352 | } |
353 | |
354 | |
355 | /** |
356 | * @returns the MsgId of the first message in the queue, or 0 if empty. |
357 | * |
358 | * @locks caller's responsibility |
359 | */ |
360 | static RD_INLINE RD_UNUSED |
361 | uint64_t rd_kafka_msgq_first_msgid (const rd_kafka_msgq_t *rkmq) { |
362 | const rd_kafka_msg_t *rkm = TAILQ_FIRST(&rkmq->rkmq_msgs); |
363 | if (rkm) |
364 | return rkm->rkm_u.producer.msgid; |
365 | else |
366 | return 0; |
367 | } |
368 | |
369 | |
370 | /** |
371 | * @brief Message ordering comparator using the message id |
372 | * number to order messages in ascending order (FIFO). |
373 | */ |
374 | static RD_INLINE |
375 | int rd_kafka_msg_cmp_msgid (const void *_a, const void *_b) { |
376 | const rd_kafka_msg_t *a = _a, *b = _b; |
377 | |
378 | rd_dassert(a->rkm_u.producer.msgid); |
379 | |
380 | if (a->rkm_u.producer.msgid > b->rkm_u.producer.msgid) |
381 | return 1; |
382 | else if (a->rkm_u.producer.msgid < b->rkm_u.producer.msgid) |
383 | return -1; |
384 | else |
385 | return 0; |
386 | } |
387 | |
388 | /** |
389 | * @brief Message ordering comparator using the message id |
390 | * number to order messages in descending order (LIFO). |
391 | */ |
392 | static RD_INLINE |
393 | int rd_kafka_msg_cmp_msgid_lifo (const void *_a, const void *_b) { |
394 | const rd_kafka_msg_t *a = _a, *b = _b; |
395 | |
396 | rd_dassert(a->rkm_u.producer.msgid); |
397 | |
398 | if (a->rkm_u.producer.msgid < b->rkm_u.producer.msgid) |
399 | return 1; |
400 | else if (a->rkm_u.producer.msgid > b->rkm_u.producer.msgid) |
401 | return -1; |
402 | else |
403 | return 0; |
404 | } |
405 | |
406 | |
407 | /** |
408 | * @brief Insert message at its sorted position using the msgid. |
409 | * @remark This is an O(n) operation. |
410 | * @warning The message must have a msgid set. |
411 | * @returns the message count of the queue after enqueuing the message. |
412 | */ |
413 | int |
414 | rd_kafka_msgq_enq_sorted0 (rd_kafka_msgq_t *rkmq, |
415 | rd_kafka_msg_t *rkm, |
416 | int (*order_cmp) (const void *, const void *)); |
417 | |
418 | /** |
419 | * @brief Insert message at its sorted position using the msgid. |
420 | * @remark This is an O(n) operation. |
421 | * @warning The message must have a msgid set. |
422 | * @returns the message count of the queue after enqueuing the message. |
423 | */ |
424 | int rd_kafka_msgq_enq_sorted (const rd_kafka_itopic_t *rkt, |
425 | rd_kafka_msgq_t *rkmq, |
426 | rd_kafka_msg_t *rkm); |
427 | |
428 | /** |
429 | * Insert message at head of message queue. |
430 | */ |
431 | static RD_INLINE RD_UNUSED void rd_kafka_msgq_insert (rd_kafka_msgq_t *rkmq, |
432 | rd_kafka_msg_t *rkm) { |
433 | TAILQ_INSERT_HEAD(&rkmq->rkmq_msgs, rkm, rkm_link); |
434 | rkmq->rkmq_msg_cnt++; |
435 | rkmq->rkmq_msg_bytes += rkm->rkm_len+rkm->rkm_key_len; |
436 | } |
437 | |
438 | /** |
439 | * Append message to tail of message queue. |
440 | */ |
441 | static RD_INLINE RD_UNUSED int rd_kafka_msgq_enq (rd_kafka_msgq_t *rkmq, |
442 | rd_kafka_msg_t *rkm) { |
443 | TAILQ_INSERT_TAIL(&rkmq->rkmq_msgs, rkm, rkm_link); |
444 | rkmq->rkmq_msg_bytes += rkm->rkm_len+rkm->rkm_key_len; |
445 | return (int)++rkmq->rkmq_msg_cnt; |
446 | } |
447 | |
448 | |
449 | /** |
450 | * @returns true if the MsgId extents (first, last) in the two queues overlap. |
451 | */ |
452 | static RD_INLINE RD_UNUSED rd_bool_t |
453 | rd_kafka_msgq_overlap (const rd_kafka_msgq_t *a, const rd_kafka_msgq_t *b) { |
454 | const rd_kafka_msg_t *fa, *la, *fb, *lb; |
455 | |
456 | if (RD_KAFKA_MSGQ_EMPTY(a) || |
457 | RD_KAFKA_MSGQ_EMPTY(b)) |
458 | return rd_false; |
459 | |
460 | fa = rd_kafka_msgq_first(a); |
461 | fb = rd_kafka_msgq_first(b); |
462 | la = rd_kafka_msgq_last(a); |
463 | lb = rd_kafka_msgq_last(b); |
464 | |
465 | return (rd_bool_t) |
466 | (fa->rkm_u.producer.msgid <= lb->rkm_u.producer.msgid && |
467 | fb->rkm_u.producer.msgid <= la->rkm_u.producer.msgid); |
468 | } |
469 | |
470 | /** |
471 | * Scans a message queue for timed out messages and removes them from |
472 | * 'rkmq' and adds them to 'timedout', returning the number of timed out |
473 | * messages. |
474 | * 'timedout' must be initialized. |
475 | */ |
476 | int rd_kafka_msgq_age_scan (struct rd_kafka_toppar_s *rktp, |
477 | rd_kafka_msgq_t *rkmq, |
478 | rd_kafka_msgq_t *timedout, |
479 | rd_ts_t now); |
480 | |
481 | rd_kafka_msg_t *rd_kafka_msgq_find_pos (const rd_kafka_msgq_t *rkmq, |
482 | const rd_kafka_msg_t *rkm, |
483 | int (*cmp) (const void *, |
484 | const void *)); |
485 | |
486 | void rd_kafka_msgq_set_metadata (rd_kafka_msgq_t *rkmq, |
487 | int64_t base_offset, int64_t timestamp, |
488 | rd_kafka_msg_status_t status); |
489 | |
490 | void rd_kafka_msgq_move_acked (rd_kafka_msgq_t *dest, rd_kafka_msgq_t *src, |
491 | uint64_t last_msgid, |
492 | rd_kafka_msg_status_t status); |
493 | |
494 | int rd_kafka_msg_partitioner (rd_kafka_itopic_t *rkt, rd_kafka_msg_t *rkm, |
495 | int do_lock); |
496 | |
497 | |
498 | rd_kafka_message_t *rd_kafka_message_get (struct rd_kafka_op_s *rko); |
499 | rd_kafka_message_t *rd_kafka_message_get_from_rkm (struct rd_kafka_op_s *rko, |
500 | rd_kafka_msg_t *rkm); |
501 | rd_kafka_message_t *rd_kafka_message_new (void); |
502 | |
503 | |
504 | /** |
505 | * @returns a (possibly) wrapped Kafka protocol message sequence counter |
506 | * for the non-overflowing \p seq. |
507 | */ |
508 | static RD_INLINE RD_UNUSED int32_t rd_kafka_seq_wrap (int64_t seq) { |
509 | return (int32_t)(seq & (int64_t)INT32_MAX); |
510 | } |
511 | |
512 | void rd_kafka_msgq_dump (FILE *fp, const char *what, rd_kafka_msgq_t *rkmq); |
513 | |
514 | rd_kafka_msg_t *ut_rd_kafka_msg_new (void); |
515 | void ut_rd_kafka_msgq_purge (rd_kafka_msgq_t *rkmq); |
516 | int unittest_msg (void); |
517 | |
518 | #endif /* _RDKAFKA_MSG_H_ */ |
519 | |