1/*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2016, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29
30#include "rdkafka_int.h"
31#include "rdkafka_feature.h"
32
33#include <stdlib.h>
34
/**
 * @brief Human-readable feature names, indexed by feature bit position.
 *
 * rd_kafka_features2str() emits entry \c i when bit \c (1 << i) is set in
 * the feature bitmask, so the order of this list must follow the bit
 * assignment of the RD_KAFKA_FEATURE_.. flags (presumably declared in
 * rdkafka_feature.h - keep the two in sync when adding a feature).
 *
 * The list is NULL-terminated.
 */
static const char *rd_kafka_feature_names[] = {
        "MsgVer1",
        "ApiVersion",
        "BrokerBalancedConsumer",
        "ThrottleTime",
        "Sasl",
        "SaslHandshake",
        "BrokerGroupCoordinator",
        "LZ4",
        "OffsetTime",
        "MsgVer2",
        "IdempotentProducer",
        "ZSTD",
        "UnitTest",
        NULL /* end-of-list marker */
};
51
52
/**
 * @brief Maps each feature flag to the broker APIs (and version ranges)
 *        it depends on.
 *
 * Consumed by rd_kafka_features_check(), which enables a feature only if
 * every entry in its \c depends list overlaps with what the broker supports.
 */
static const struct rd_kafka_feature_map {
        /* RD_KAFKA_FEATURE_...
         * A value of 0 terminates the map. */
        int feature;

        /* Depends on the following ApiVersions overlapping with
         * what the broker supports.
         * The list is terminated by an entry with ApiKey -1. */
        struct rd_kafka_ApiVersion depends[RD_KAFKAP__NUM];

} rd_kafka_feature_map[] = {
        /**
         * @brief List of features and the ApiVersions they depend on.
         *
         * The dependency list consists of the ApiKey followed by this
         * client's supported minimum and maximum API versions.
         * As long as this list and its versions overlap with the
         * broker supported API versions the feature will be enabled.
         */
        {

                /* @brief >=0.10.0: Message.MagicByte version 1:
                 * Relative offsets (KIP-31) and message timestamps (KIP-32). */
                .feature = RD_KAFKA_FEATURE_MSGVER1,
                .depends = {
                        { RD_KAFKAP_Produce, 2, 2 },
                        { RD_KAFKAP_Fetch, 2, 2 },
                        { -1 },
                },
        },
        {
                /* @brief >=0.11.0: Message.MagicByte version 2 */
                .feature = RD_KAFKA_FEATURE_MSGVER2,
                .depends = {
                        { RD_KAFKAP_Produce, 3, 3 },
                        { RD_KAFKAP_Fetch, 4, 4 },
                        { -1 },
                },
        },
        {

                /* @brief >=0.10.0: ApiVersionQuery support.
                 * @remark This is a bit of a chicken-and-egg problem but
                 *         needs to be set by feature_check() to avoid the
                 *         feature being cleared even when the broker
                 *         supports it. */
                .feature = RD_KAFKA_FEATURE_APIVERSION,
                .depends = {
                        { RD_KAFKAP_ApiVersion, 0, 0 },
                        { -1 },
                },
        },
        {
                /* @brief >=0.8.2.0: Broker-based Group coordinator */
                .feature = RD_KAFKA_FEATURE_BROKER_GROUP_COORD,
                .depends = {
                        { RD_KAFKAP_GroupCoordinator, 0, 0 },
                        { -1 },
                },
        },
        {
                /* @brief >=0.9.0: Broker-based balanced consumer groups. */
                .feature = RD_KAFKA_FEATURE_BROKER_BALANCED_CONSUMER,
                .depends = {
                        { RD_KAFKAP_GroupCoordinator, 0, 0 },
                        { RD_KAFKAP_OffsetCommit, 1, 2 },
                        { RD_KAFKAP_OffsetFetch, 1, 1 },
                        { RD_KAFKAP_JoinGroup, 0, 0 },
                        { RD_KAFKAP_SyncGroup, 0, 0 },
                        { RD_KAFKAP_Heartbeat, 0, 0 },
                        { RD_KAFKAP_LeaveGroup, 0, 0 },
                        { -1 },
                },
        },
        {
                /* @brief >=0.9.0: ThrottleTime */
                .feature = RD_KAFKA_FEATURE_THROTTLETIME,
                .depends = {
                        { RD_KAFKAP_Produce, 1, 2 },
                        { RD_KAFKAP_Fetch, 1, 2 },
                        { -1 },
                },

        },
        {
                /* @brief >=0.9.0: SASL (GSSAPI) authentication.
                 * Since SASL is not using the Kafka protocol
                 * we must use something else to map us to the
                 * proper broker version support:
                 * JoinGroup was released along with SASL in 0.9.0. */
                .feature = RD_KAFKA_FEATURE_SASL_GSSAPI,
                .depends = {
                        { RD_KAFKAP_JoinGroup, 0, 0 },
                        { -1 },
                },
        },
        {
                /* @brief >=0.10.0: SASL mechanism handshake (KIP-43)
                 * to automatically support other mechanisms
                 * than GSSAPI, such as PLAIN. */
                .feature = RD_KAFKA_FEATURE_SASL_HANDSHAKE,
                .depends = {
                        { RD_KAFKAP_SaslHandshake, 0, 0 },
                        { -1 },
                },
        },
        {
                /* @brief >=0.8.2: LZ4 compression.
                 * Since LZ4 initially did not rely on a specific API
                 * type or version (it does in >=0.10.0)
                 * we must use something else to map us to the
                 * proper broker version support:
                 * GroupCoordinator was released in 0.8.2 */
                .feature = RD_KAFKA_FEATURE_LZ4,
                .depends = {
                        { RD_KAFKAP_GroupCoordinator, 0, 0 },
                        { -1 },
                },
        },
        {
                /* @brief >=0.10.1.0: Offset v1 (KIP-79)
                 * Time-based offset requests */
                .feature = RD_KAFKA_FEATURE_OFFSET_TIME,
                .depends = {
                        { RD_KAFKAP_Offset, 1, 1 },
                        { -1 },
                }
        },
        {
                /* @brief >=0.11.0.0: Idempotent Producer */
                .feature = RD_KAFKA_FEATURE_IDEMPOTENT_PRODUCER,
                .depends = {
                        { RD_KAFKAP_InitProducerId, 0, 0 },
                        { -1 },
                }
        },
        {
                /* @brief >=2.1.0-IV2: Support ZStandard Compression Codec
                 * (KIP-110) */
                .feature = RD_KAFKA_FEATURE_ZSTD,
                .depends = {
                        { RD_KAFKAP_Produce, 7, 7 },
                        { RD_KAFKAP_Fetch, 10, 10 },
                        { -1 },
                },
        },
        { .feature = 0 }, /* sentinel: stops the scan in
                           * rd_kafka_features_check() */
};
197
198
199
/**
 * @brief In the absence of KIP-35 support in earlier broker versions we
 *        provide hardcoded lists that correspond to older broker versions.
 */
204
/* >= 0.10.0.0: dummy for all future versions that support ApiVersionRequest.
 *
 * rd_kafka_get_legacy_ApiVersions() returns this list for any version string
 * that matches none of the legacy prefixes, and
 * rd_kafka_ApiVersion_is_queryable() identifies it by address. */
static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_Queryable[] = {
        { RD_KAFKAP_ApiVersion, 0, 0 }
};
209
210
/* =~ 0.9.0
 *
 * Hardcoded API list selected by the "0.9.0" version prefix in
 * rd_kafka_get_legacy_ApiVersions().
 * Entries are positional; presumably { ApiKey, MinVer, MaxVer } - confirm
 * against the struct rd_kafka_ApiVersion declaration. */
static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_0_9_0[] = {
        { RD_KAFKAP_Produce, 0, 1 },
        { RD_KAFKAP_Fetch, 0, 1 },
        { RD_KAFKAP_Offset, 0, 0 },
        { RD_KAFKAP_Metadata, 0, 0 },
        { RD_KAFKAP_OffsetCommit, 0, 2 },
        { RD_KAFKAP_OffsetFetch, 0, 1 },
        { RD_KAFKAP_GroupCoordinator, 0, 0 },
        { RD_KAFKAP_JoinGroup, 0, 0 },
        { RD_KAFKAP_Heartbeat, 0, 0 },
        { RD_KAFKAP_LeaveGroup, 0, 0 },
        { RD_KAFKAP_SyncGroup, 0, 0 },
        { RD_KAFKAP_DescribeGroups, 0, 0 },
        { RD_KAFKAP_ListGroups, 0, 0 }
};
227
/* =~ 0.8.2
 *
 * Hardcoded API list selected by the "0.8.2" version prefix in
 * rd_kafka_get_legacy_ApiVersions().
 * Compared to the 0.8.1 list this adds GroupCoordinator and raises the
 * OffsetFetch maximum version to 1. */
static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_0_8_2[] = {
        { RD_KAFKAP_Produce, 0, 0 },
        { RD_KAFKAP_Fetch, 0, 0 },
        { RD_KAFKAP_Offset, 0, 0 },
        { RD_KAFKAP_Metadata, 0, 0 },
        { RD_KAFKAP_OffsetCommit, 0, 1 },
        { RD_KAFKAP_OffsetFetch, 0, 1 },
        { RD_KAFKAP_GroupCoordinator, 0, 0 }
};
238
/* =~ 0.8.1
 *
 * Hardcoded API list selected by the "0.8.1" version prefix in
 * rd_kafka_get_legacy_ApiVersions().
 * Compared to the 0.8.0 list this adds OffsetCommit and OffsetFetch. */
static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_0_8_1[] = {
        { RD_KAFKAP_Produce, 0, 0 },
        { RD_KAFKAP_Fetch, 0, 0 },
        { RD_KAFKAP_Offset, 0, 0 },
        { RD_KAFKAP_Metadata, 0, 0 },
        { RD_KAFKAP_OffsetCommit, 0, 1 },
        { RD_KAFKAP_OffsetFetch, 0, 0 }
};
248
/* =~ 0.8.0
 *
 * Hardcoded API list selected by the "0.8.0" version prefix in
 * rd_kafka_get_legacy_ApiVersions(): the smallest of the legacy lists,
 * covering only Produce, Fetch, Offset and Metadata at version 0. */
static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_0_8_0[] = {
        { RD_KAFKAP_Produce, 0, 0 },
        { RD_KAFKAP_Fetch, 0, 0 },
        { RD_KAFKAP_Offset, 0, 0 },
        { RD_KAFKAP_Metadata, 0, 0 }
};
256
257
258/**
259 * @brief Returns the ApiVersion list for legacy broker versions that do not
260 * support the ApiVersionQuery request. E.g., brokers <0.10.0.
261 *
262 * @param broker_version Broker version to match (longest prefix matching).
263 * @param use_default If no match is found return the default APIs (but return 0).
264 *
265 * @returns 1 if \p broker_version was recognized: \p *apisp will point to
266 * the ApiVersion list and *api_cntp will be set to its element count.
267 * 0 if \p broker_version was not recognized: \p *apisp remains unchanged.
268 *
269 */
270int rd_kafka_get_legacy_ApiVersions (const char *broker_version,
271 struct rd_kafka_ApiVersion **apisp,
272 size_t *api_cntp, const char *fallback) {
273 static const struct {
274 const char *pfx;
275 struct rd_kafka_ApiVersion *apis;
276 size_t api_cnt;
277 } vermap[] = {
278#define _VERMAP(PFX,APIS) { PFX, APIS, RD_ARRAYSIZE(APIS) }
279 _VERMAP("0.9.0", rd_kafka_ApiVersion_0_9_0),
280 _VERMAP("0.8.2", rd_kafka_ApiVersion_0_8_2),
281 _VERMAP("0.8.1", rd_kafka_ApiVersion_0_8_1),
282 _VERMAP("0.8.0", rd_kafka_ApiVersion_0_8_0),
283 { "0.7.", NULL }, /* Unsupported */
284 { "0.6.", NULL }, /* Unsupported */
285 _VERMAP("", rd_kafka_ApiVersion_Queryable),
286 { NULL }
287 };
288 int i;
289 int fallback_i = -1;
290 int ret = 0;
291
292 *apisp = NULL;
293 *api_cntp = 0;
294
295 for (i = 0 ; vermap[i].pfx ; i++) {
296 if (!strncmp(vermap[i].pfx, broker_version, strlen(vermap[i].pfx))) {
297 if (!vermap[i].apis)
298 return 0;
299 *apisp = vermap[i].apis;
300 *api_cntp = vermap[i].api_cnt;
301 ret = 1;
302 break;
303 } else if (fallback && !strcmp(vermap[i].pfx, fallback))
304 fallback_i = i;
305 }
306
307 if (!*apisp && fallback) {
308 rd_kafka_assert(NULL, fallback_i != -1);
309 *apisp = vermap[fallback_i].apis;
310 *api_cntp = vermap[fallback_i].api_cnt;
311 }
312
313 return ret;
314}
315
316
317/**
318 * @returns 1 if the provided broker version (probably)
319 * supports api.version.request.
320 */
321int rd_kafka_ApiVersion_is_queryable (const char *broker_version) {
322 struct rd_kafka_ApiVersion *apis;
323 size_t api_cnt;
324
325
326 if (!rd_kafka_get_legacy_ApiVersions(broker_version,
327 &apis, &api_cnt, 0))
328 return 0;
329
330 return apis == rd_kafka_ApiVersion_Queryable;
331}
332
333
334
335
336
337/**
338 * @brief Check if match's versions overlaps with \p apis.
339 *
340 * @returns 1 if true, else 0.
341 * @remark \p apis must be sorted using rd_kafka_ApiVersion_key_cmp()
342 */
343static RD_INLINE int
344rd_kafka_ApiVersion_check (const struct rd_kafka_ApiVersion *apis, size_t api_cnt,
345 const struct rd_kafka_ApiVersion *match) {
346 const struct rd_kafka_ApiVersion *api;
347
348 api = bsearch(match, apis, api_cnt, sizeof(*apis),
349 rd_kafka_ApiVersion_key_cmp);
350 if (unlikely(!api))
351 return 0;
352
353 return match->MinVer <= api->MaxVer && api->MinVer <= match->MaxVer;
354}
355
356
357/**
358 * @brief Compare broker's supported API versions to our feature request map
359 * and enable/disable features accordingly.
360 *
361 * @param broker_apis Broker's supported APIs. If NULL the
362 * \p broker.version.fallback configuration property will specify a
363 * default legacy version to use.
364 * @param broker_api_cnt Number of elements in \p broker_apis
365 *
366 * @returns the supported features (bitmask) to enable.
367 */
368int rd_kafka_features_check (rd_kafka_broker_t *rkb,
369 struct rd_kafka_ApiVersion *broker_apis,
370 size_t broker_api_cnt) {
371 int features = 0;
372 int i;
373
374 /* Scan through features. */
375 for (i = 0 ; rd_kafka_feature_map[i].feature != 0 ; i++) {
376 const struct rd_kafka_ApiVersion *match;
377 int fails = 0;
378
379 /* For each feature check that all its API dependencies
380 * can be fullfilled. */
381
382 for (match = &rd_kafka_feature_map[i].depends[0] ;
383 match->ApiKey != -1 ; match++) {
384 int r;
385
386 r = rd_kafka_ApiVersion_check(broker_apis, broker_api_cnt,
387 match);
388
389 rd_rkb_dbg(rkb, FEATURE, "APIVERSION",
390 " Feature %s: %s (%hd..%hd) "
391 "%ssupported by broker",
392 rd_kafka_features2str(rd_kafka_feature_map[i].
393 feature),
394 rd_kafka_ApiKey2str(match->ApiKey),
395 match->MinVer, match->MaxVer,
396 r ? "" : "NOT ");
397
398 fails += !r;
399 }
400
401 rd_rkb_dbg(rkb, FEATURE, "APIVERSION",
402 "%s feature %s",
403 fails ? "Disabling" : "Enabling",
404 rd_kafka_features2str(rd_kafka_feature_map[i].feature));
405
406
407 if (!fails)
408 features |= rd_kafka_feature_map[i].feature;
409 }
410
411 return features;
412}
413
414
415
416/**
417 * @brief Make an allocated and sorted copy of \p src.
418 */
419void
420rd_kafka_ApiVersions_copy (const struct rd_kafka_ApiVersion *src,
421 size_t src_cnt,
422 struct rd_kafka_ApiVersion **dstp,
423 size_t *dst_cntp) {
424 *dstp = rd_memdup(src, sizeof(*src) * src_cnt);
425 *dst_cntp = src_cnt;
426 qsort(*dstp, *dst_cntp, sizeof(**dstp), rd_kafka_ApiVersion_key_cmp);
427}
428
429
430
431
432
433
434/**
435 * @returns a human-readable feature flag string.
436 */
437const char *rd_kafka_features2str (int features) {
438 static RD_TLS char ret[4][256];
439 size_t of = 0;
440 static RD_TLS int reti = 0;
441 int i;
442
443 reti = (reti + 1) % 4;
444
445 *ret[reti] = '\0';
446 for (i = 0 ; rd_kafka_feature_names[i] ; i++) {
447 int r;
448 if (!(features & (1 << i)))
449 continue;
450
451 r = rd_snprintf(ret[reti]+of, sizeof(ret[reti])-of, "%s%s",
452 of == 0 ? "" : ",",
453 rd_kafka_feature_names[i]);
454 if ((size_t)r > sizeof(ret[reti])-of) {
455 /* Out of space */
456 memcpy(&ret[reti][sizeof(ret[reti])-3], "..", 3);
457 break;
458 }
459
460 of += r;
461 }
462
463 return ret[reti];
464}
465