1 | /* |
2 | * librdkafka - Apache Kafka C library |
3 | * |
4 | * Copyright (c) 2012,2013 Magnus Edenhill |
5 | * All rights reserved. |
6 | * |
7 | * Redistribution and use in source and binary forms, with or without |
8 | * modification, are permitted provided that the following conditions are met: |
9 | * |
10 | * 1. Redistributions of source code must retain the above copyright notice, |
11 | * this list of conditions and the following disclaimer. |
12 | * 2. Redistributions in binary form must reproduce the above copyright notice, |
13 | * this list of conditions and the following disclaimer in the documentation |
14 | * and/or other materials provided with the distribution. |
15 | * |
16 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
17 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
18 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
19 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
20 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
21 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
22 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
23 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
24 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
25 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
26 | * POSSIBILITY OF SUCH DAMAGE. |
27 | */ |
28 | |
29 | #ifndef _RDKAFKA_PROTO_H_ |
30 | #define _RDKAFKA_PROTO_H_ |
31 | |
32 | |
33 | #include "rdendian.h" |
34 | #include "rdvarint.h" |
35 | |
36 | |
37 | |
38 | /* |
39 | * Kafka protocol definitions. |
40 | */ |
41 | |
/* Kafka port number (presumably the default broker port -- confirm with
 * callers), available both as an integer and as a string literal. */
#define RD_KAFKA_PORT      9092
#define RD_KAFKA_PORT_STR  "9092"
44 | |
45 | |
/**
 * ApiKey values, as stored in rd_kafkap_reqhdr.ApiKey.
 *
 * RD_KAFKAP_None is a negative placeholder value and RD_KAFKAP__NUM is
 * one past the highest ApiKey defined here.
 */
#define RD_KAFKAP_None                    -1
#define RD_KAFKAP_Produce                 0
#define RD_KAFKAP_Fetch                   1
#define RD_KAFKAP_Offset                  2
#define RD_KAFKAP_Metadata                3
#define RD_KAFKAP_LeaderAndIsr            4
#define RD_KAFKAP_StopReplica             5
#define RD_KAFKAP_UpdateMetadata          6
#define RD_KAFKAP_ControlledShutdown      7
#define RD_KAFKAP_OffsetCommit            8
#define RD_KAFKAP_OffsetFetch             9
#define RD_KAFKAP_GroupCoordinator        10
#define RD_KAFKAP_JoinGroup               11
#define RD_KAFKAP_Heartbeat               12
#define RD_KAFKAP_LeaveGroup              13
#define RD_KAFKAP_SyncGroup               14
#define RD_KAFKAP_DescribeGroups          15
#define RD_KAFKAP_ListGroups              16
#define RD_KAFKAP_SaslHandshake           17
#define RD_KAFKAP_ApiVersion              18
#define RD_KAFKAP_CreateTopics            19
#define RD_KAFKAP_DeleteTopics            20
#define RD_KAFKAP_DeleteRecords           21
#define RD_KAFKAP_InitProducerId          22
#define RD_KAFKAP_OffsetForLeaderEpoch    23
#define RD_KAFKAP_AddPartitionsToTxn      24
#define RD_KAFKAP_AddOffsetsToTxn         25
#define RD_KAFKAP_EndTxn                  26
#define RD_KAFKAP_WriteTxnMarkers         27
#define RD_KAFKAP_TxnOffsetCommit         28
#define RD_KAFKAP_DescribeAcls            29
#define RD_KAFKAP_CreateAcls              30
#define RD_KAFKAP_DeleteAcls              31
#define RD_KAFKAP_DescribeConfigs         32
#define RD_KAFKAP_AlterConfigs            33
#define RD_KAFKAP_AlterReplicaLogDirs     34
#define RD_KAFKAP_DescribeLogDirs         35
#define RD_KAFKAP_SaslAuthenticate        36
#define RD_KAFKAP_CreatePartitions        37
#define RD_KAFKAP_CreateDelegationToken   38
#define RD_KAFKAP_RenewDelegationToken    39
#define RD_KAFKAP_ExpireDelegationToken   40
#define RD_KAFKAP_DescribeDelegationToken 41
#define RD_KAFKAP_DeleteGroups            42
#define RD_KAFKAP__NUM                    43

/**
 * Request header
 */
struct rd_kafkap_reqhdr {
        int32_t Size;
        int16_t ApiKey;      /* One of the RD_KAFKAP_.. ApiKey values */
        int16_t ApiVersion;
        int32_t CorrId;
        /* ClientId follows */
};
101 | |
/* Header sizes in bytes, spelled out as the sum of the field widths. */

/* Request header: Size + ApiKey + ApiVersion + CorrId */
#define RD_KAFKAP_REQHDR_SIZE (4+2+2+4)
/* Response header: Size + CorrId */
#define RD_KAFKAP_RESHDR_SIZE (4+4)
104 | |
/**
 * Response header: two 32-bit fields, Size followed by CorrId.
 */
struct rd_kafkap_reshdr {
        int32_t Size;
        int32_t CorrId;
};
112 | |
113 | |
114 | |
115 | static RD_UNUSED |
116 | const char *rd_kafka_ApiKey2str (int16_t ApiKey) { |
117 | static const char *names[] = { |
118 | [RD_KAFKAP_Produce] = "Produce" , |
119 | [RD_KAFKAP_Fetch] = "Fetch" , |
120 | [RD_KAFKAP_Offset] = "Offset" , |
121 | [RD_KAFKAP_Metadata] = "Metadata" , |
122 | [RD_KAFKAP_LeaderAndIsr] = "LeaderAndIsr" , |
123 | [RD_KAFKAP_StopReplica] = "StopReplica" , |
124 | [RD_KAFKAP_UpdateMetadata] = "UpdateMetadata" , |
125 | [RD_KAFKAP_ControlledShutdown] = "ControlledShutdown" , |
126 | [RD_KAFKAP_OffsetCommit] = "OffsetCommit" , |
127 | [RD_KAFKAP_OffsetFetch] = "OffsetFetch" , |
128 | [RD_KAFKAP_GroupCoordinator] = "GroupCoordinator" , |
129 | [RD_KAFKAP_JoinGroup] = "JoinGroup" , |
130 | [RD_KAFKAP_Heartbeat] = "Heartbeat" , |
131 | [RD_KAFKAP_LeaveGroup] = "LeaveGroup" , |
132 | [RD_KAFKAP_SyncGroup] = "SyncGroup" , |
133 | [RD_KAFKAP_DescribeGroups] = "DescribeGroups" , |
134 | [RD_KAFKAP_ListGroups] = "ListGroups" , |
135 | [RD_KAFKAP_SaslHandshake] = "SaslHandshake" , |
136 | [RD_KAFKAP_ApiVersion] = "ApiVersion" , |
137 | [RD_KAFKAP_CreateTopics] = "CreateTopics" , |
138 | [RD_KAFKAP_DeleteTopics] = "DeleteTopics" , |
139 | [RD_KAFKAP_DeleteRecords] = "DeleteRecords" , |
140 | [RD_KAFKAP_InitProducerId] = "InitProducerId" , |
141 | [RD_KAFKAP_OffsetForLeaderEpoch] = "OffsetForLeaderEpoch" , |
142 | [RD_KAFKAP_AddPartitionsToTxn] = "AddPartitionsToTxn" , |
143 | [RD_KAFKAP_AddOffsetsToTxn] = "AddOffsetsToTxn" , |
144 | [RD_KAFKAP_EndTxn] = "EndTxn" , |
145 | [RD_KAFKAP_WriteTxnMarkers] = "WriteTxnMarkers" , |
146 | [RD_KAFKAP_TxnOffsetCommit] = "TxnOffsetCommit" , |
147 | [RD_KAFKAP_DescribeAcls] = "DescribeAcls" , |
148 | [RD_KAFKAP_CreateAcls] = "CreateAcls" , |
149 | [RD_KAFKAP_DeleteAcls] = "DeleteAcls" , |
150 | [RD_KAFKAP_DescribeConfigs] = "DescribeConfigs" , |
151 | [RD_KAFKAP_AlterConfigs] = "AlterConfigs" , |
152 | [RD_KAFKAP_AlterReplicaLogDirs] = "AlterReplicaLogDirs" , |
153 | [RD_KAFKAP_DescribeLogDirs] = "DescribeLogDirs" , |
154 | [RD_KAFKAP_SaslAuthenticate] = "SaslAuthenticate" , |
155 | [RD_KAFKAP_CreatePartitions] = "CreatePartitions" , |
156 | [RD_KAFKAP_CreateDelegationToken] = "CreateDelegationToken" , |
157 | [RD_KAFKAP_RenewDelegationToken] = "RenewDelegationToken" , |
158 | [RD_KAFKAP_ExpireDelegationToken] = "ExpireDelegationToken" , |
159 | [RD_KAFKAP_DescribeDelegationToken] = "DescribeDelegationToken" , |
160 | [RD_KAFKAP_DeleteGroups] = "DeleteGroups" |
161 | |
162 | }; |
163 | static RD_TLS char ret[32]; |
164 | |
165 | if (ApiKey < 0 || ApiKey >= (int)RD_ARRAYSIZE(names) || |
166 | !names[ApiKey]) { |
167 | rd_snprintf(ret, sizeof(ret), "Unknown-%hd?" , ApiKey); |
168 | return ret; |
169 | } |
170 | |
171 | return names[ApiKey]; |
172 | } |
173 | |
174 | |
175 | |
176 | |
177 | |
178 | |
179 | |
180 | |
/**
 * @brief ApiKey version support tuple: an ApiKey together with a
 *        minimum and a maximum version number.
 */
struct rd_kafka_ApiVersion {
        int16_t ApiKey;  /* ApiKey the version range applies to */
        int16_t MinVer;  /* Minimum version */
        int16_t MaxVer;  /* Maximum version */
};
189 | |
190 | /** |
191 | * @brief ApiVersion.ApiKey comparator. |
192 | */ |
193 | static RD_UNUSED int rd_kafka_ApiVersion_key_cmp (const void *_a, const void *_b) { |
194 | const struct rd_kafka_ApiVersion *a = _a, *b = _b; |
195 | |
196 | return a->ApiKey - b->ApiKey; |
197 | } |
198 | |
199 | |
200 | |
/* Read isolation level values (presumably the protocol's IsolationLevel
 * field -- confirm against the request builders). */
#define RD_KAFKAP_READ_UNCOMMITTED  0
#define RD_KAFKAP_READ_COMMITTED    1
203 | |
204 | |
/**
 * Kafka protocol string with a convenience header in front of it.
 *
 * Serialized format:
 *  { 16-bit length, data.. }
 */
typedef struct rd_kafkap_str_s {
        /* Convenience header (aligned access, host endian) */

        /* Kafka string length: -1=NULL, 0=empty, >0=string */
        int len;

        /* String data: for _new() strings this points just past the
         * serialized length that follows the struct, otherwise into other
         * memory. Not part of the serialized length: any terminating nul. */
        const char *str;
} rd_kafkap_str_t;
219 | |
220 | |
/* Length value marking a Kafka NULL string. */
#define RD_KAFKAP_STR_LEN_NULL -1

/* True if kstr is a Kafka NULL string. */
#define RD_KAFKAP_STR_IS_NULL(kstr) ((kstr)->len == RD_KAFKAP_STR_LEN_NULL)

/* Number of string bytes of a Kafka string: a NULL string counts as 0. */
#define RD_KAFKAP_STR_LEN0(len) ((len) != RD_KAFKAP_STR_LEN_NULL ? (len) : 0)
#define RD_KAFKAP_STR_LEN(kstr) RD_KAFKAP_STR_LEN0((kstr)->len)

/* Serialized size of a Kafka string: 2 length bytes plus the string bytes. */
#define RD_KAFKAP_STR_SIZE0(len) (RD_KAFKAP_STR_LEN0(len) + 2)
#define RD_KAFKAP_STR_SIZE(kstr) RD_KAFKAP_STR_SIZE0((kstr)->len)


/* Pointer to the serialized Kafka string, which is stored directly after
 * the struct: only works for _new() kstrs */
#define RD_KAFKAP_STR_SER(kstr) ((kstr)+1)

/* Expands to the two arguments (precision, pointer) of a "%.*s" format. */
#define RD_KAFKAP_STR_PR(kstr)                          \
        (int)RD_KAFKAP_STR_LEN(kstr), (kstr)->str

/* strndupa() a Kafka string into destptr */
#define RD_KAFKAP_STR_DUPA(destptr,kstr)                                \
        rd_strndupa((destptr), (kstr)->str, RD_KAFKAP_STR_LEN(kstr))

/* strndup() a Kafka string */
#define RD_KAFKAP_STR_DUP(kstr) rd_strndup((kstr)->str, RD_KAFKAP_STR_LEN(kstr))

/* Static initializer producing a Kafka NULL string. */
#define RD_KAFKAP_STR_INITIALIZER { .len = RD_KAFKAP_STR_LEN_NULL, .str = NULL }
249 | |
250 | /** |
251 | * Frees a Kafka string previously allocated with `rd_kafkap_str_new()` |
252 | */ |
253 | static RD_UNUSED void rd_kafkap_str_destroy (rd_kafkap_str_t *kstr) { |
254 | rd_free(kstr); |
255 | } |
256 | |
257 | |
258 | |
259 | /** |
260 | * Allocate a new Kafka string and make a copy of 'str'. |
261 | * If 'len' is -1 the length will be calculated. |
262 | * Supports Kafka NULL strings. |
263 | * Nul-terminates the string, but the trailing \0 is not part of |
264 | * the serialized string. |
265 | */ |
266 | static RD_INLINE RD_UNUSED |
267 | rd_kafkap_str_t *rd_kafkap_str_new (const char *str, int len) { |
268 | rd_kafkap_str_t *kstr; |
269 | int16_t klen; |
270 | |
271 | if (!str) |
272 | len = RD_KAFKAP_STR_LEN_NULL; |
273 | else if (len == -1) |
274 | len = str ? (int)strlen(str) : RD_KAFKAP_STR_LEN_NULL; |
275 | |
276 | kstr = rd_malloc(sizeof(*kstr) + 2 + |
277 | (len == RD_KAFKAP_STR_LEN_NULL ? 0 : len + 1)); |
278 | kstr->len = len; |
279 | |
280 | /* Serialised format: 16-bit string length */ |
281 | klen = htobe16(len); |
282 | memcpy(kstr+1, &klen, 2); |
283 | |
284 | /* Serialised format: non null-terminated string */ |
285 | if (len == RD_KAFKAP_STR_LEN_NULL) |
286 | kstr->str = NULL; |
287 | else { |
288 | kstr->str = ((const char *)(kstr+1))+2; |
289 | memcpy((void *)kstr->str, str, len); |
290 | ((char *)kstr->str)[len] = '\0'; |
291 | } |
292 | |
293 | return kstr; |
294 | } |
295 | |
296 | |
297 | /** |
298 | * Makes a copy of `src`. The copy will be fully allocated and should |
299 | * be freed with rd_kafka_pstr_destroy() |
300 | */ |
301 | static RD_INLINE RD_UNUSED |
302 | rd_kafkap_str_t *rd_kafkap_str_copy (const rd_kafkap_str_t *src) { |
303 | return rd_kafkap_str_new(src->str, src->len); |
304 | } |
305 | |
306 | static RD_INLINE RD_UNUSED int rd_kafkap_str_cmp (const rd_kafkap_str_t *a, |
307 | const rd_kafkap_str_t *b) { |
308 | int minlen = RD_MIN(a->len, b->len); |
309 | int r = memcmp(a->str, b->str, minlen); |
310 | if (r) |
311 | return r; |
312 | else |
313 | return a->len - b->len; |
314 | } |
315 | |
316 | static RD_INLINE RD_UNUSED int rd_kafkap_str_cmp_str (const rd_kafkap_str_t *a, |
317 | const char *str) { |
318 | int len = (int)strlen(str); |
319 | int minlen = RD_MIN(a->len, len); |
320 | int r = memcmp(a->str, str, minlen); |
321 | if (r) |
322 | return r; |
323 | else |
324 | return a->len - len; |
325 | } |
326 | |
327 | static RD_INLINE RD_UNUSED int rd_kafkap_str_cmp_str2 (const char *str, |
328 | const rd_kafkap_str_t *b){ |
329 | int len = (int)strlen(str); |
330 | int minlen = RD_MIN(b->len, len); |
331 | int r = memcmp(str, b->str, minlen); |
332 | if (r) |
333 | return r; |
334 | else |
335 | return len - b->len; |
336 | } |
337 | |
338 | |
339 | |
/**
 * Kafka protocol bytes array with a convenience header in front of it.
 *
 * Serialized format:
 *  { 32-bit length, data.. }
 */
typedef struct rd_kafkap_bytes_s {
        /* Convenience header (aligned access, host endian) */

        /* Kafka bytes length: -1=NULL, 0=empty, >0=data */
        int32_t len;

        /* Payload: for _new() bytes this points past the struct and the
         * serialized length, otherwise into other memory.
         * Not NULL-terminated. */
        const void *data;

        /* Bytes following struct when new()ed */
        const char _data[1];
} rd_kafkap_bytes_t;
355 | |
356 | |
/* Length value marking Kafka NULL bytes. */
#define RD_KAFKAP_BYTES_LEN_NULL -1

/* True if kbytes is a Kafka NULL bytes. */
#define RD_KAFKAP_BYTES_IS_NULL(kbytes)                 \
        ((kbytes)->len == RD_KAFKAP_BYTES_LEN_NULL)

/* Number of payload bytes of a Kafka bytes: NULL bytes count as 0. */
#define RD_KAFKAP_BYTES_LEN0(len) ((len) != RD_KAFKAP_BYTES_LEN_NULL ? (len):0)
#define RD_KAFKAP_BYTES_LEN(kbytes) RD_KAFKAP_BYTES_LEN0((kbytes)->len)

/* Serialized size of a Kafka bytes: 4 length bytes plus the payload. */
#define RD_KAFKAP_BYTES_SIZE0(len) (RD_KAFKAP_BYTES_LEN0(len) + 4)
#define RD_KAFKAP_BYTES_SIZE(kbytes) RD_KAFKAP_BYTES_SIZE0((kbytes)->len)


/* Pointer to the serialized Kafka bytes, which are stored directly after
 * the struct: only works for _new() kbytes */
#define RD_KAFKAP_BYTES_SER(kbytes) ((kbytes)+1)
372 | |
373 | |
374 | /** |
375 | * Frees a Kafka bytes previously allocated with `rd_kafkap_bytes_new()` |
376 | */ |
377 | static RD_UNUSED void rd_kafkap_bytes_destroy (rd_kafkap_bytes_t *kbytes) { |
378 | rd_free(kbytes); |
379 | } |
380 | |
381 | |
382 | /** |
383 | * @brief Allocate a new Kafka bytes and make a copy of 'bytes'. |
384 | * If \p len > 0 but \p bytes is NULL no copying is performed by |
385 | * the bytes structure will be allocated to fit \p size bytes. |
386 | * |
387 | * Supports: |
388 | * - Kafka NULL bytes (bytes==NULL,len==0), |
389 | * - Empty bytes (bytes!=NULL,len==0) |
390 | * - Copy data (bytes!=NULL,len>0) |
391 | * - No-copy, just alloc (bytes==NULL,len>0) |
392 | */ |
393 | static RD_INLINE RD_UNUSED |
394 | rd_kafkap_bytes_t *rd_kafkap_bytes_new (const char *bytes, int32_t len) { |
395 | rd_kafkap_bytes_t *kbytes; |
396 | int32_t klen; |
397 | |
398 | if (!bytes && !len) |
399 | len = RD_KAFKAP_BYTES_LEN_NULL; |
400 | |
401 | kbytes = rd_malloc(sizeof(*kbytes) + 4 + |
402 | (len == RD_KAFKAP_BYTES_LEN_NULL ? 0 : len)); |
403 | kbytes->len = len; |
404 | |
405 | klen = htobe32(len); |
406 | memcpy(kbytes+1, &klen, 4); |
407 | |
408 | if (len == RD_KAFKAP_BYTES_LEN_NULL) |
409 | kbytes->data = NULL; |
410 | else { |
411 | kbytes->data = ((const char *)(kbytes+1))+4; |
412 | if (bytes) |
413 | memcpy((void *)kbytes->data, bytes, len); |
414 | } |
415 | |
416 | return kbytes; |
417 | } |
418 | |
419 | |
420 | /** |
421 | * Makes a copy of `src`. The copy will be fully allocated and should |
422 | * be freed with rd_kafkap_bytes_destroy() |
423 | */ |
424 | static RD_INLINE RD_UNUSED |
425 | rd_kafkap_bytes_t *rd_kafkap_bytes_copy (const rd_kafkap_bytes_t *src) { |
426 | return rd_kafkap_bytes_new(src->data, src->len); |
427 | } |
428 | |
429 | |
430 | static RD_INLINE RD_UNUSED int rd_kafkap_bytes_cmp (const rd_kafkap_bytes_t *a, |
431 | const rd_kafkap_bytes_t *b) { |
432 | int minlen = RD_MIN(a->len, b->len); |
433 | int r = memcmp(a->data, b->data, minlen); |
434 | if (r) |
435 | return r; |
436 | else |
437 | return a->len - b->len; |
438 | } |
439 | |
440 | static RD_INLINE RD_UNUSED |
441 | int rd_kafkap_bytes_cmp_data (const rd_kafkap_bytes_t *a, |
442 | const char *data, int len) { |
443 | int minlen = RD_MIN(a->len, len); |
444 | int r = memcmp(a->data, data, minlen); |
445 | if (r) |
446 | return r; |
447 | else |
448 | return a->len - len; |
449 | } |
450 | |
451 | |
452 | |
453 | |
/* Forward declaration: struct rd_kafka_buf_s is not defined in this file. */
typedef struct rd_kafka_buf_s rd_kafka_buf_t;


/* Node name size (presumably the size of node name buffers, terminator
 * included -- confirm against the users of this constant). */
#define RD_KAFKA_NODENAME_SIZE  256
458 | |
459 | |
460 | |
461 | |
/**
 * @brief Message overheads (worst-case), in bytes.
 */

/**
 * MsgVersion v0..v1
 */

/* Per-message MessageSet framing: Offset + MessageSize */
#define RD_KAFKAP_MESSAGESET_V0_HDR_SIZE (8+4)

/* v0 message header: CRC + Magic + Attr + KeyLen + ValueLen */
#define RD_KAFKAP_MESSAGE_V0_HDR_SIZE    (4+1+1+4+4)

/* v1 message header: CRC + Magic + Attr + Timestamp + KeyLen + ValueLen */
#define RD_KAFKAP_MESSAGE_V1_HDR_SIZE    (4+1+1+8+4+4)

/* Maximum per-message overhead: framing plus message header */
#define RD_KAFKAP_MESSAGE_V0_OVERHEAD                                   \
        (RD_KAFKAP_MESSAGESET_V0_HDR_SIZE + RD_KAFKAP_MESSAGE_V0_HDR_SIZE)
#define RD_KAFKAP_MESSAGE_V1_OVERHEAD                                   \
        (RD_KAFKAP_MESSAGESET_V0_HDR_SIZE + RD_KAFKAP_MESSAGE_V1_HDR_SIZE)

/**
 * MsgVersion v2
 *
 * All fields but Attributes are varints, each accounted for with the
 * size given by RD_UVARINT_ENC_SIZEOF() for its type.
 */
#define RD_KAFKAP_MESSAGE_V2_OVERHEAD                                   \
        (                                                               \
        RD_UVARINT_ENC_SIZEOF(int32_t) + /* Length (varint) */          \
        1 +                              /* Attributes */               \
        RD_UVARINT_ENC_SIZEOF(int64_t) + /* TimestampDelta (varint) */  \
        RD_UVARINT_ENC_SIZEOF(int32_t) + /* OffsetDelta (varint) */     \
        RD_UVARINT_ENC_SIZEOF(int32_t) + /* KeyLen (varint) */          \
        RD_UVARINT_ENC_SIZEOF(int32_t) + /* ValueLen (varint) */        \
        RD_UVARINT_ENC_SIZEOF(int32_t)   /* HeaderCnt (varint) */       \
        )
501 | |
502 | |
503 | |
/**
 * @brief MessageSets are not explicitly versioned but depend on the
 *        Produce/Fetch API version and the encompassed Message versions.
 *        The Message version (MsgVersion, aka MagicByte) is used to
 *        describe the MessageSet version, that is, MsgVersion <= 1 uses
 *        the old MessageSet version (v0?) while MsgVersion 2 uses
 *        MessageSet version v2
 */

/* Old MessageSet header: none */
#define RD_KAFKAP_MSGSET_V0_SIZE                0

/* MessageSet v2 header: total size as the sum of its field widths */
#define RD_KAFKAP_MSGSET_V2_SIZE                (8+4+4+1+4+2+4+8+8+8+2+4+4)

/* Byte offsets for MessageSet v2 fields: each is the sum of the widths
 * of the header fields preceding the named field. */
#define RD_KAFKAP_MSGSET_V2_OF_Length           (8)
#define RD_KAFKAP_MSGSET_V2_OF_CRC              (8+4+4+1)
#define RD_KAFKAP_MSGSET_V2_OF_Attributes       (8+4+4+1+4)
#define RD_KAFKAP_MSGSET_V2_OF_LastOffsetDelta  (8+4+4+1+4+2)
#define RD_KAFKAP_MSGSET_V2_OF_BaseTimestamp    (8+4+4+1+4+2+4)
#define RD_KAFKAP_MSGSET_V2_OF_MaxTimestamp     (8+4+4+1+4+2+4+8)
#define RD_KAFKAP_MSGSET_V2_OF_BaseSequence     (8+4+4+1+4+2+4+8+8+8+2)
#define RD_KAFKAP_MSGSET_V2_OF_RecordCount      (8+4+4+1+4+2+4+8+8+8+2+4)
527 | |
528 | |
529 | |
530 | |
531 | /** |
532 | * @name Producer ID and Epoch for the Idempotent Producer |
533 | * @{ |
534 | * |
535 | */ |
536 | |
/**
 * @brief Producer ID and Epoch pair.
 *
 * An id of -1 marks the PID as invalid (see rd_kafka_pid_valid()).
 */
typedef struct rd_kafka_pid_s {
        int64_t id;     /**< Producer Id */
        int16_t epoch;  /**< Producer Epoch */
} rd_kafka_pid_t;

/** Initializer for an invalid PID: both id and epoch are -1. */
#define RD_KAFKA_PID_INITIALIZER  {-1,-1}
546 | |
/**
 * @returns true if \p PID is valid, that is, its id is not -1.
 *          The epoch is not inspected.
 */
#define rd_kafka_pid_valid(PID)  ((PID).id != -1)
551 | |
552 | /** |
553 | * @brief Check two pids for equality |
554 | */ |
555 | static RD_UNUSED RD_INLINE int rd_kafka_pid_eq (const rd_kafka_pid_t a, |
556 | const rd_kafka_pid_t b) { |
557 | return a.id == b.id && a.epoch == b.epoch; |
558 | } |
559 | |
560 | /** |
561 | * @returns the string representation of a PID in a thread-safe |
562 | * static buffer. |
563 | */ |
564 | static RD_UNUSED const char * |
565 | rd_kafka_pid2str (const rd_kafka_pid_t pid) { |
566 | static RD_TLS char buf[2][64]; |
567 | static RD_TLS int i; |
568 | |
569 | if (!rd_kafka_pid_valid(pid)) |
570 | return "PID{Invalid}" ; |
571 | |
572 | i = (i + 1) % 2; |
573 | |
574 | rd_snprintf(buf[i], sizeof(buf[i]), |
575 | "PID{Id:%" PRId64",Epoch:%hd}" , pid.id, pid.epoch); |
576 | |
577 | return buf[i]; |
578 | } |
579 | |
580 | /** |
581 | * @brief Reset the PID to invalid/init state |
582 | */ |
583 | static RD_UNUSED RD_INLINE void rd_kafka_pid_reset (rd_kafka_pid_t *pid) { |
584 | pid->id = -1; |
585 | pid->epoch = -1; |
586 | } |
587 | |
588 | |
589 | /** |
590 | * @brief Bump the epoch of a valid PID |
591 | */ |
592 | static RD_UNUSED RD_INLINE rd_kafka_pid_t |
593 | rd_kafka_pid_bump (const rd_kafka_pid_t old) { |
594 | rd_kafka_pid_t new = { old.id, ((int)old.epoch + 1) & (int)INT16_MAX }; |
595 | return new; |
596 | } |
597 | |
598 | /**@}*/ |
599 | |
600 | |
601 | #endif /* _RDKAFKA_PROTO_H_ */ |
602 | |