1/*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012,2013 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29#ifndef _RDKAFKA_PROTO_H_
30#define _RDKAFKA_PROTO_H_
31
32
33#include "rdendian.h"
34#include "rdvarint.h"
35
36
37
38/*
39 * Kafka protocol definitions.
40 */
41
/* Port number 9092, provided both as an integer constant and as a
 * string literal. */
#define RD_KAFKA_PORT 9092
#define RD_KAFKA_PORT_STR "9092"
44
45
/**
 * Request header.
 *
 * The RD_KAFKAP_.. request type identifiers (the values carried in
 * the ApiKey field) are defined alongside the field itself.
 * The sum of the field widths (4+2+2+4) is RD_KAFKAP_REQHDR_SIZE.
 */
struct rd_kafkap_reqhdr {
        int32_t Size;
        int16_t ApiKey;
#define RD_KAFKAP_None -1
#define RD_KAFKAP_Produce 0
#define RD_KAFKAP_Fetch 1
#define RD_KAFKAP_Offset 2
#define RD_KAFKAP_Metadata 3
#define RD_KAFKAP_LeaderAndIsr 4
#define RD_KAFKAP_StopReplica 5
#define RD_KAFKAP_UpdateMetadata 6
#define RD_KAFKAP_ControlledShutdown 7
#define RD_KAFKAP_OffsetCommit 8
#define RD_KAFKAP_OffsetFetch 9
#define RD_KAFKAP_GroupCoordinator 10
#define RD_KAFKAP_JoinGroup 11
#define RD_KAFKAP_Heartbeat 12
#define RD_KAFKAP_LeaveGroup 13
#define RD_KAFKAP_SyncGroup 14
#define RD_KAFKAP_DescribeGroups 15
#define RD_KAFKAP_ListGroups 16
#define RD_KAFKAP_SaslHandshake 17
#define RD_KAFKAP_ApiVersion 18
#define RD_KAFKAP_CreateTopics 19
#define RD_KAFKAP_DeleteTopics 20
#define RD_KAFKAP_DeleteRecords 21
#define RD_KAFKAP_InitProducerId 22
#define RD_KAFKAP_OffsetForLeaderEpoch 23
#define RD_KAFKAP_AddPartitionsToTxn 24
#define RD_KAFKAP_AddOffsetsToTxn 25
#define RD_KAFKAP_EndTxn 26
#define RD_KAFKAP_WriteTxnMarkers 27
#define RD_KAFKAP_TxnOffsetCommit 28
#define RD_KAFKAP_DescribeAcls 29
#define RD_KAFKAP_CreateAcls 30
#define RD_KAFKAP_DeleteAcls 31
#define RD_KAFKAP_DescribeConfigs 32
#define RD_KAFKAP_AlterConfigs 33
#define RD_KAFKAP_AlterReplicaLogDirs 34
#define RD_KAFKAP_DescribeLogDirs 35
#define RD_KAFKAP_SaslAuthenticate 36
#define RD_KAFKAP_CreatePartitions 37
#define RD_KAFKAP_CreateDelegationToken 38
#define RD_KAFKAP_RenewDelegationToken 39
#define RD_KAFKAP_ExpireDelegationToken 40
#define RD_KAFKAP_DescribeDelegationToken 41
#define RD_KAFKAP_DeleteGroups 42
/* One past the highest ApiKey listed above (DeleteGroups = 42). */
#define RD_KAFKAP__NUM 43
        int16_t ApiVersion;
        int32_t CorrId;
        /* ClientId follows */
};
101
/* Sum of the rd_kafkap_reqhdr field widths:
 * Size(4) + ApiKey(2) + ApiVersion(2) + CorrId(4).
 * The ClientId that follows the header is not included. */
#define RD_KAFKAP_REQHDR_SIZE (4+2+2+4)
/* Sum of the rd_kafkap_reshdr field widths: Size(4) + CorrId(4). */
#define RD_KAFKAP_RESHDR_SIZE (4+4)
104
/**
 * Response header: two 32-bit fields, matching
 * RD_KAFKAP_RESHDR_SIZE (4+4).
 */
struct rd_kafkap_reshdr {
        int32_t Size;
        int32_t CorrId;
};
112
113
114
115static RD_UNUSED
116const char *rd_kafka_ApiKey2str (int16_t ApiKey) {
117 static const char *names[] = {
118 [RD_KAFKAP_Produce] = "Produce",
119 [RD_KAFKAP_Fetch] = "Fetch",
120 [RD_KAFKAP_Offset] = "Offset",
121 [RD_KAFKAP_Metadata] = "Metadata",
122 [RD_KAFKAP_LeaderAndIsr] = "LeaderAndIsr",
123 [RD_KAFKAP_StopReplica] = "StopReplica",
124 [RD_KAFKAP_UpdateMetadata] = "UpdateMetadata",
125 [RD_KAFKAP_ControlledShutdown] = "ControlledShutdown",
126 [RD_KAFKAP_OffsetCommit] = "OffsetCommit",
127 [RD_KAFKAP_OffsetFetch] = "OffsetFetch",
128 [RD_KAFKAP_GroupCoordinator] = "GroupCoordinator",
129 [RD_KAFKAP_JoinGroup] = "JoinGroup",
130 [RD_KAFKAP_Heartbeat] = "Heartbeat",
131 [RD_KAFKAP_LeaveGroup] = "LeaveGroup",
132 [RD_KAFKAP_SyncGroup] = "SyncGroup",
133 [RD_KAFKAP_DescribeGroups] = "DescribeGroups",
134 [RD_KAFKAP_ListGroups] = "ListGroups",
135 [RD_KAFKAP_SaslHandshake] = "SaslHandshake",
136 [RD_KAFKAP_ApiVersion] = "ApiVersion",
137 [RD_KAFKAP_CreateTopics] = "CreateTopics",
138 [RD_KAFKAP_DeleteTopics] = "DeleteTopics",
139 [RD_KAFKAP_DeleteRecords] = "DeleteRecords",
140 [RD_KAFKAP_InitProducerId] = "InitProducerId",
141 [RD_KAFKAP_OffsetForLeaderEpoch] = "OffsetForLeaderEpoch",
142 [RD_KAFKAP_AddPartitionsToTxn] = "AddPartitionsToTxn",
143 [RD_KAFKAP_AddOffsetsToTxn] = "AddOffsetsToTxn",
144 [RD_KAFKAP_EndTxn] = "EndTxn",
145 [RD_KAFKAP_WriteTxnMarkers] = "WriteTxnMarkers",
146 [RD_KAFKAP_TxnOffsetCommit] = "TxnOffsetCommit",
147 [RD_KAFKAP_DescribeAcls] = "DescribeAcls",
148 [RD_KAFKAP_CreateAcls] = "CreateAcls",
149 [RD_KAFKAP_DeleteAcls] = "DeleteAcls",
150 [RD_KAFKAP_DescribeConfigs] = "DescribeConfigs",
151 [RD_KAFKAP_AlterConfigs] = "AlterConfigs",
152 [RD_KAFKAP_AlterReplicaLogDirs] = "AlterReplicaLogDirs",
153 [RD_KAFKAP_DescribeLogDirs] = "DescribeLogDirs",
154 [RD_KAFKAP_SaslAuthenticate] = "SaslAuthenticate",
155 [RD_KAFKAP_CreatePartitions] = "CreatePartitions",
156 [RD_KAFKAP_CreateDelegationToken] = "CreateDelegationToken",
157 [RD_KAFKAP_RenewDelegationToken] = "RenewDelegationToken",
158 [RD_KAFKAP_ExpireDelegationToken] = "ExpireDelegationToken",
159 [RD_KAFKAP_DescribeDelegationToken] = "DescribeDelegationToken",
160 [RD_KAFKAP_DeleteGroups] = "DeleteGroups"
161
162 };
163 static RD_TLS char ret[32];
164
165 if (ApiKey < 0 || ApiKey >= (int)RD_ARRAYSIZE(names) ||
166 !names[ApiKey]) {
167 rd_snprintf(ret, sizeof(ret), "Unknown-%hd?", ApiKey);
168 return ret;
169 }
170
171 return names[ApiKey];
172}
173
174
175
176
177
178
179
180
/**
 * @brief ApiKey version support tuple.
 */
struct rd_kafka_ApiVersion {
        int16_t ApiKey;   /* Ordering key, see rd_kafka_ApiVersion_key_cmp() */
        int16_t MinVer;   /* NOTE(review): presumably lowest supported
                           * version (inclusive) - confirm with users */
        int16_t MaxVer;   /* NOTE(review): presumably highest supported
                           * version (inclusive) - confirm with users */
};
189
190/**
191 * @brief ApiVersion.ApiKey comparator.
192 */
193static RD_UNUSED int rd_kafka_ApiVersion_key_cmp (const void *_a, const void *_b) {
194 const struct rd_kafka_ApiVersion *a = _a, *b = _b;
195
196 return a->ApiKey - b->ApiKey;
197}
198
199
200
/* NOTE(review): presumably the protocol's isolation level values
 * (0 = read uncommitted, 1 = read committed) - confirm against the
 * users of these constants. */
#define RD_KAFKAP_READ_UNCOMMITTED 0
#define RD_KAFKAP_READ_COMMITTED 1
203
204
/**
 *
 * Kafka protocol string representation prefixed with a convenience header
 *
 * Serialized format:
 *  { int16 length (big-endian, -1 = NULL string), data.. }
 *
 * For strings created by rd_kafkap_str_new() the serialized form is
 * stored directly after this header struct.
 *
 */
typedef struct rd_kafkap_str_s {
        /* convenience header (aligned access, host endian) */
        int len; /* Kafka string length (-1=NULL, 0=empty, >0=string) */
        const char *str; /* points into data[] or other memory,
                          * not NULL-terminated */
} rd_kafkap_str_t;
219
220
/* Length value denoting a Kafka NULL string. */
#define RD_KAFKAP_STR_LEN_NULL -1
/* True if \p kstr is a Kafka NULL string. */
#define RD_KAFKAP_STR_IS_NULL(kstr) ((kstr)->len == RD_KAFKAP_STR_LEN_NULL)

/* Returns the length of the string of a kafka protocol string representation,
 * a NULL string (-1) is reported as length 0.
 * Note: the argument is evaluated more than once. */
#define RD_KAFKAP_STR_LEN0(len) ((len) == RD_KAFKAP_STR_LEN_NULL ? 0 : (len))
#define RD_KAFKAP_STR_LEN(kstr) RD_KAFKAP_STR_LEN0((kstr)->len)

/* Returns the actual size of a kafka protocol string representation:
 * the 2 byte length prefix plus the string length. */
#define RD_KAFKAP_STR_SIZE0(len) (2 + RD_KAFKAP_STR_LEN0(len))
#define RD_KAFKAP_STR_SIZE(kstr) RD_KAFKAP_STR_SIZE0((kstr)->len)


/* Serialized Kafka string: only works for _new() kstrs,
 * which store the serialized form directly after the header struct. */
#define RD_KAFKAP_STR_SER(kstr) ((kstr)+1)

/* Macro suitable for "%.*s" printing.
 * Expands to two comma-separated arguments: the length (0 for a
 * NULL string) and the string pointer. */
#define RD_KAFKAP_STR_PR(kstr) \
        (int)((kstr)->len == RD_KAFKAP_STR_LEN_NULL ? 0 : (kstr)->len), \
        (kstr)->str

/* strndupa() a Kafka string */
#define RD_KAFKAP_STR_DUPA(destptr,kstr) \
        rd_strndupa((destptr), (kstr)->str, RD_KAFKAP_STR_LEN(kstr))

/* strndup() a Kafka string */
#define RD_KAFKAP_STR_DUP(kstr) rd_strndup((kstr)->str, RD_KAFKAP_STR_LEN(kstr))

/* Static initializer producing a Kafka NULL string. */
#define RD_KAFKAP_STR_INITIALIZER { .len = RD_KAFKAP_STR_LEN_NULL, .str = NULL }
249
/**
 * Frees a Kafka string previously allocated with `rd_kafkap_str_new()`.
 *
 * The header and the string data share a single allocation, so one
 * rd_free() call releases both.
 */
static RD_UNUSED void rd_kafkap_str_destroy (rd_kafkap_str_t *kstr) {
        rd_free(kstr);
}
256
257
258
259/**
260 * Allocate a new Kafka string and make a copy of 'str'.
261 * If 'len' is -1 the length will be calculated.
262 * Supports Kafka NULL strings.
263 * Nul-terminates the string, but the trailing \0 is not part of
264 * the serialized string.
265 */
266static RD_INLINE RD_UNUSED
267rd_kafkap_str_t *rd_kafkap_str_new (const char *str, int len) {
268 rd_kafkap_str_t *kstr;
269 int16_t klen;
270
271 if (!str)
272 len = RD_KAFKAP_STR_LEN_NULL;
273 else if (len == -1)
274 len = str ? (int)strlen(str) : RD_KAFKAP_STR_LEN_NULL;
275
276 kstr = rd_malloc(sizeof(*kstr) + 2 +
277 (len == RD_KAFKAP_STR_LEN_NULL ? 0 : len + 1));
278 kstr->len = len;
279
280 /* Serialised format: 16-bit string length */
281 klen = htobe16(len);
282 memcpy(kstr+1, &klen, 2);
283
284 /* Serialised format: non null-terminated string */
285 if (len == RD_KAFKAP_STR_LEN_NULL)
286 kstr->str = NULL;
287 else {
288 kstr->str = ((const char *)(kstr+1))+2;
289 memcpy((void *)kstr->str, str, len);
290 ((char *)kstr->str)[len] = '\0';
291 }
292
293 return kstr;
294}
295
296
297/**
298 * Makes a copy of `src`. The copy will be fully allocated and should
299 * be freed with rd_kafka_pstr_destroy()
300 */
301static RD_INLINE RD_UNUSED
302rd_kafkap_str_t *rd_kafkap_str_copy (const rd_kafkap_str_t *src) {
303 return rd_kafkap_str_new(src->str, src->len);
304}
305
306static RD_INLINE RD_UNUSED int rd_kafkap_str_cmp (const rd_kafkap_str_t *a,
307 const rd_kafkap_str_t *b) {
308 int minlen = RD_MIN(a->len, b->len);
309 int r = memcmp(a->str, b->str, minlen);
310 if (r)
311 return r;
312 else
313 return a->len - b->len;
314}
315
316static RD_INLINE RD_UNUSED int rd_kafkap_str_cmp_str (const rd_kafkap_str_t *a,
317 const char *str) {
318 int len = (int)strlen(str);
319 int minlen = RD_MIN(a->len, len);
320 int r = memcmp(a->str, str, minlen);
321 if (r)
322 return r;
323 else
324 return a->len - len;
325}
326
327static RD_INLINE RD_UNUSED int rd_kafkap_str_cmp_str2 (const char *str,
328 const rd_kafkap_str_t *b){
329 int len = (int)strlen(str);
330 int minlen = RD_MIN(b->len, len);
331 int r = memcmp(str, b->str, minlen);
332 if (r)
333 return r;
334 else
335 return len - b->len;
336}
337
338
339
/**
 *
 * Kafka protocol bytes array representation prefixed with a convenience header
 *
 * Serialized format:
 *  { int32 length (big-endian, -1 = NULL bytes), data.. }
 *
 */
typedef struct rd_kafkap_bytes_s {
        /* convenience header (aligned access, host endian) */
        int32_t len; /* Kafka bytes length (-1=NULL, 0=empty, >0=data) */
        const void *data; /* points just past the struct, or other memory,
                           * not NULL-terminated */
        const char _data[1]; /* Bytes following struct when new()ed.
                              * NOTE(review): rd_kafkap_bytes_new() and
                              * RD_KAFKAP_BYTES_SER() address the serialized
                              * form at (kbytes+1), i.e., after the complete
                              * struct including this member. */
} rd_kafkap_bytes_t;
355
356
/* Length value denoting Kafka NULL bytes. */
#define RD_KAFKAP_BYTES_LEN_NULL -1
/* True if \p kbytes is a Kafka NULL bytes object. */
#define RD_KAFKAP_BYTES_IS_NULL(kbytes) \
        ((kbytes)->len == RD_KAFKAP_BYTES_LEN_NULL)

/* Returns the length of the bytes of a kafka protocol bytes representation,
 * NULL bytes (-1) are reported as length 0.
 * Note: the argument is evaluated more than once. */
#define RD_KAFKAP_BYTES_LEN0(len) ((len) == RD_KAFKAP_BYTES_LEN_NULL ? 0:(len))
#define RD_KAFKAP_BYTES_LEN(kbytes) RD_KAFKAP_BYTES_LEN0((kbytes)->len)

/* Returns the actual size of a kafka protocol bytes representation:
 * the 4 byte length prefix plus the data length. */
#define RD_KAFKAP_BYTES_SIZE0(len) (4 + RD_KAFKAP_BYTES_LEN0(len))
#define RD_KAFKAP_BYTES_SIZE(kbytes) RD_KAFKAP_BYTES_SIZE0((kbytes)->len)


/* Serialized Kafka bytes: only works for _new() kbytes,
 * which store the serialized form directly after the struct. */
#define RD_KAFKAP_BYTES_SER(kbytes) ((kbytes)+1)
372
373
/**
 * Frees a Kafka bytes previously allocated with `rd_kafkap_bytes_new()`.
 *
 * The struct and its data share a single allocation, so one
 * rd_free() call releases both.
 */
static RD_UNUSED void rd_kafkap_bytes_destroy (rd_kafkap_bytes_t *kbytes) {
        rd_free(kbytes);
}
380
381
382/**
383 * @brief Allocate a new Kafka bytes and make a copy of 'bytes'.
384 * If \p len > 0 but \p bytes is NULL no copying is performed by
385 * the bytes structure will be allocated to fit \p size bytes.
386 *
387 * Supports:
388 * - Kafka NULL bytes (bytes==NULL,len==0),
389 * - Empty bytes (bytes!=NULL,len==0)
390 * - Copy data (bytes!=NULL,len>0)
391 * - No-copy, just alloc (bytes==NULL,len>0)
392 */
393static RD_INLINE RD_UNUSED
394rd_kafkap_bytes_t *rd_kafkap_bytes_new (const char *bytes, int32_t len) {
395 rd_kafkap_bytes_t *kbytes;
396 int32_t klen;
397
398 if (!bytes && !len)
399 len = RD_KAFKAP_BYTES_LEN_NULL;
400
401 kbytes = rd_malloc(sizeof(*kbytes) + 4 +
402 (len == RD_KAFKAP_BYTES_LEN_NULL ? 0 : len));
403 kbytes->len = len;
404
405 klen = htobe32(len);
406 memcpy(kbytes+1, &klen, 4);
407
408 if (len == RD_KAFKAP_BYTES_LEN_NULL)
409 kbytes->data = NULL;
410 else {
411 kbytes->data = ((const char *)(kbytes+1))+4;
412 if (bytes)
413 memcpy((void *)kbytes->data, bytes, len);
414 }
415
416 return kbytes;
417}
418
419
420/**
421 * Makes a copy of `src`. The copy will be fully allocated and should
422 * be freed with rd_kafkap_bytes_destroy()
423 */
424static RD_INLINE RD_UNUSED
425rd_kafkap_bytes_t *rd_kafkap_bytes_copy (const rd_kafkap_bytes_t *src) {
426 return rd_kafkap_bytes_new(src->data, src->len);
427}
428
429
430static RD_INLINE RD_UNUSED int rd_kafkap_bytes_cmp (const rd_kafkap_bytes_t *a,
431 const rd_kafkap_bytes_t *b) {
432 int minlen = RD_MIN(a->len, b->len);
433 int r = memcmp(a->data, b->data, minlen);
434 if (r)
435 return r;
436 else
437 return a->len - b->len;
438}
439
440static RD_INLINE RD_UNUSED
441int rd_kafkap_bytes_cmp_data (const rd_kafkap_bytes_t *a,
442 const char *data, int len) {
443 int minlen = RD_MIN(a->len, len);
444 int r = memcmp(a->data, data, minlen);
445 if (r)
446 return r;
447 else
448 return a->len - len;
449}
450
451
452
453
/* Forward declaration: struct rd_kafka_buf_s is not defined in this header. */
typedef struct rd_kafka_buf_s rd_kafka_buf_t;


/* NOTE(review): presumably the buffer size used for node name
 * strings - confirm against the users of this constant. */
#define RD_KAFKA_NODENAME_SIZE 256
458
459
460
461
/**
 * @brief Message overheads (worst-case)
 */

/**
 * MsgVersion v0..v1
 */
/* Offset + MessageSize (= 12 bytes) */
#define RD_KAFKAP_MESSAGESET_V0_HDR_SIZE (8+4)
/* CRC + Magic + Attr + KeyLen + ValueLen (= 14 bytes) */
#define RD_KAFKAP_MESSAGE_V0_HDR_SIZE (4+1+1+4+4)
/* CRC + Magic + Attr + Timestamp + KeyLen + ValueLen (= 22 bytes) */
#define RD_KAFKAP_MESSAGE_V1_HDR_SIZE (4+1+1+8+4+4)
/* Maximum per-message overhead:
 * the MessageSet v0 header plus the respective Message header.
 * Note that the v1 overhead also uses the MessageSet v0 header size. */
#define RD_KAFKAP_MESSAGE_V0_OVERHEAD \
        (RD_KAFKAP_MESSAGESET_V0_HDR_SIZE + RD_KAFKAP_MESSAGE_V0_HDR_SIZE)
#define RD_KAFKAP_MESSAGE_V1_OVERHEAD \
        (RD_KAFKAP_MESSAGESET_V0_HDR_SIZE + RD_KAFKAP_MESSAGE_V1_HDR_SIZE)

/**
 * MsgVersion v2
 *
 * Sum of one fixed Attributes byte and six varint fields, each
 * accounted for with RD_UVARINT_ENC_SIZEOF() of its integer type
 * (NOTE(review): presumably the maximum encoded size of a varint of
 * that type - confirm in rdvarint.h).
 */
#define RD_KAFKAP_MESSAGE_V2_OVERHEAD \
        ( \
        /* Length (varint) */ \
        RD_UVARINT_ENC_SIZEOF(int32_t) + \
        /* Attributes */ \
        1 + \
        /* TimestampDelta (varint) */ \
        RD_UVARINT_ENC_SIZEOF(int64_t) + \
        /* OffsetDelta (varint) */ \
        RD_UVARINT_ENC_SIZEOF(int32_t) + \
        /* KeyLen (varint) */ \
        RD_UVARINT_ENC_SIZEOF(int32_t) + \
        /* ValueLen (varint) */ \
        RD_UVARINT_ENC_SIZEOF(int32_t) + \
        /* HeaderCnt (varint): */ \
        RD_UVARINT_ENC_SIZEOF(int32_t) \
        )
501
502
503
/**
 * @brief MessageSets are not explicitly versioned but depend on the
 *        Produce/Fetch API version and the encompassed Message versions.
 *        We use the Message version (MsgVersion, aka MagicByte) to describe
 *        the MessageSet version, that is, MsgVersion <= 1 uses the old
 *        MessageSet version (v0?) while MsgVersion 2 uses MessageSet version v2
 */

/* Old MessageSet header: none */
#define RD_KAFKAP_MSGSET_V0_SIZE 0

/* MessageSet v2 header: sum of the fixed-width header fields (= 61 bytes) */
#define RD_KAFKAP_MSGSET_V2_SIZE (8+4+4+1+4+2+4+8+8+8+2+4+4)

/* Byte offsets for MessageSet fields.
 * Each offset is the sum of the widths of the preceding header fields,
 * resulting values: Length=8, CRC=17, Attributes=21, LastOffsetDelta=23,
 * BaseTimestamp=27, MaxTimestamp=35, BaseSequence=53, RecordCount=57. */
#define RD_KAFKAP_MSGSET_V2_OF_Length (8)
#define RD_KAFKAP_MSGSET_V2_OF_CRC (8+4+4+1)
#define RD_KAFKAP_MSGSET_V2_OF_Attributes (8+4+4+1+4)
#define RD_KAFKAP_MSGSET_V2_OF_LastOffsetDelta (8+4+4+1+4+2)
#define RD_KAFKAP_MSGSET_V2_OF_BaseTimestamp (8+4+4+1+4+2+4)
#define RD_KAFKAP_MSGSET_V2_OF_MaxTimestamp (8+4+4+1+4+2+4+8)
#define RD_KAFKAP_MSGSET_V2_OF_BaseSequence (8+4+4+1+4+2+4+8+8+8+2)
#define RD_KAFKAP_MSGSET_V2_OF_RecordCount (8+4+4+1+4+2+4+8+8+8+2+4)
527
528
529
530
531/**
532 * @name Producer ID and Epoch for the Idempotent Producer
533 * @{
534 *
535 */
536
/**
 * @brief Producer ID and Epoch
 *
 * An id of -1 denotes an invalid (unassigned) PID,
 * see rd_kafka_pid_valid().
 */
typedef struct rd_kafka_pid_s {
        int64_t id; /**< Producer Id */
        int16_t epoch; /**< Producer Epoch */
} rd_kafka_pid_t;

/* Static initializer for an invalid PID: id = -1, epoch = -1. */
#define RD_KAFKA_PID_INITIALIZER {-1,-1}

/**
 * @returns true if \p PID is valid, i.e., its id is not -1.
 *          Only the id is checked, not the epoch.
 */
#define rd_kafka_pid_valid(PID) ((PID).id != -1)
551
552/**
553 * @brief Check two pids for equality
554 */
555static RD_UNUSED RD_INLINE int rd_kafka_pid_eq (const rd_kafka_pid_t a,
556 const rd_kafka_pid_t b) {
557 return a.id == b.id && a.epoch == b.epoch;
558}
559
560/**
561 * @returns the string representation of a PID in a thread-safe
562 * static buffer.
563 */
564static RD_UNUSED const char *
565rd_kafka_pid2str (const rd_kafka_pid_t pid) {
566 static RD_TLS char buf[2][64];
567 static RD_TLS int i;
568
569 if (!rd_kafka_pid_valid(pid))
570 return "PID{Invalid}";
571
572 i = (i + 1) % 2;
573
574 rd_snprintf(buf[i], sizeof(buf[i]),
575 "PID{Id:%"PRId64",Epoch:%hd}", pid.id, pid.epoch);
576
577 return buf[i];
578}
579
580/**
581 * @brief Reset the PID to invalid/init state
582 */
583static RD_UNUSED RD_INLINE void rd_kafka_pid_reset (rd_kafka_pid_t *pid) {
584 pid->id = -1;
585 pid->epoch = -1;
586}
587
588
589/**
590 * @brief Bump the epoch of a valid PID
591 */
592static RD_UNUSED RD_INLINE rd_kafka_pid_t
593rd_kafka_pid_bump (const rd_kafka_pid_t old) {
594 rd_kafka_pid_t new = { old.id, ((int)old.epoch + 1) & (int)INT16_MAX };
595 return new;
596}
597
598/**@}*/
599
600
601#endif /* _RDKAFKA_PROTO_H_ */
602