1/*
2 * librdkafka - The Apache Kafka C/C++ library
3 *
4 * Copyright (c) 2015 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28#include "rdkafka_int.h"
29#include "rdkafka_assignor.h"
30
31#include <ctype.h>
32
33/**
34 * Clear out and free any memory used by the member, but not the rkgm itself.
35 */
36void rd_kafka_group_member_clear (rd_kafka_group_member_t *rkgm) {
37 if (rkgm->rkgm_subscription)
38 rd_kafka_topic_partition_list_destroy(rkgm->rkgm_subscription);
39
40 if (rkgm->rkgm_assignment)
41 rd_kafka_topic_partition_list_destroy(rkgm->rkgm_assignment);
42
43 rd_list_destroy(&rkgm->rkgm_eligible);
44
45 if (rkgm->rkgm_member_id)
46 rd_kafkap_str_destroy(rkgm->rkgm_member_id);
47
48 if (rkgm->rkgm_userdata)
49 rd_kafkap_bytes_destroy(rkgm->rkgm_userdata);
50
51 if (rkgm->rkgm_member_metadata)
52 rd_kafkap_bytes_destroy(rkgm->rkgm_member_metadata);
53
54 memset(rkgm, 0, sizeof(*rkgm));
55}
56
57
58/**
59 * Member id string comparator (takes rd_kafka_group_member_t *)
60 */
61int rd_kafka_group_member_cmp (const void *_a, const void *_b) {
62 const rd_kafka_group_member_t *a =
63 (const rd_kafka_group_member_t *)_a;
64 const rd_kafka_group_member_t *b =
65 (const rd_kafka_group_member_t *)_b;
66
67 return rd_kafkap_str_cmp(a->rkgm_member_id, b->rkgm_member_id);
68}
69
70
71/**
72 * Returns true if member subscribes to topic, else false.
73 */
74int
75rd_kafka_group_member_find_subscription (rd_kafka_t *rk,
76 const rd_kafka_group_member_t *rkgm,
77 const char *topic) {
78 int i;
79
80 /* Match against member's subscription. */
81 for (i = 0 ; i < rkgm->rkgm_subscription->cnt ; i++) {
82 const rd_kafka_topic_partition_t *rktpar =
83 &rkgm->rkgm_subscription->elems[i];
84
85 if (rd_kafka_topic_partition_match(rk, rkgm, rktpar,
86 topic, NULL))
87 return 1;
88 }
89
90 return 0;
91}
92
93
94
95static rd_kafkap_bytes_t *
96rd_kafka_consumer_protocol_member_metadata_new (
97 const rd_list_t *topics,
98 const void *userdata, size_t userdata_size) {
99 rd_kafka_buf_t *rkbuf;
100 rd_kafkap_bytes_t *kbytes;
101 int i;
102 int topic_cnt = rd_list_cnt(topics);
103 const rd_kafka_topic_info_t *tinfo;
104 size_t len;
105
106 /*
107 * MemberMetadata => Version Subscription AssignmentStrategies
108 * Version => int16
109 * Subscription => Topics UserData
110 * Topics => [String]
111 * UserData => Bytes
112 */
113
114 rkbuf = rd_kafka_buf_new(1, 100 + (topic_cnt * 100) + userdata_size);
115
116 rd_kafka_buf_write_i16(rkbuf, 0);
117 rd_kafka_buf_write_i32(rkbuf, topic_cnt);
118 RD_LIST_FOREACH(tinfo, topics, i)
119 rd_kafka_buf_write_str(rkbuf, tinfo->topic, -1);
120 if (userdata)
121 rd_kafka_buf_write_bytes(rkbuf, userdata, userdata_size);
122 else /* Kafka 0.9.0.0 cant parse NULL bytes, so we provide empty. */
123 rd_kafka_buf_write_bytes(rkbuf, "", 0);
124
125 /* Get binary buffer and allocate a new Kafka Bytes with a copy. */
126 rd_slice_init_full(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf);
127 len = rd_slice_remains(&rkbuf->rkbuf_reader);
128 kbytes = rd_kafkap_bytes_new(NULL, (int32_t)len);
129 rd_slice_read(&rkbuf->rkbuf_reader, (void *)kbytes->data, len);
130 rd_kafka_buf_destroy(rkbuf);
131
132 return kbytes;
133
134}
135
136
137
138
139rd_kafkap_bytes_t *
140rd_kafka_assignor_get_metadata (rd_kafka_assignor_t *rkas,
141 const rd_list_t *topics) {
142 return rd_kafka_consumer_protocol_member_metadata_new(
143 topics, rkas->rkas_userdata,
144 rkas->rkas_userdata_size);
145}
146
147
148
149
150
151/**
152 * Returns 1 if all subscriptions are satifised for this member, else 0.
153 */
154static int rd_kafka_member_subscription_match (
155 rd_kafka_cgrp_t *rkcg,
156 rd_kafka_group_member_t *rkgm,
157 const rd_kafka_metadata_topic_t *topic_metadata,
158 rd_kafka_assignor_topic_t *eligible_topic) {
159 int i;
160 int has_regex = 0;
161 int matched = 0;
162
163 /* Match against member's subscription. */
164 for (i = 0 ; i < rkgm->rkgm_subscription->cnt ; i++) {
165 const rd_kafka_topic_partition_t *rktpar =
166 &rkgm->rkgm_subscription->elems[i];
167 int matched_by_regex = 0;
168
169 if (rd_kafka_topic_partition_match(rkcg->rkcg_rk, rkgm, rktpar,
170 topic_metadata->topic,
171 &matched_by_regex)) {
172 rd_list_add(&rkgm->rkgm_eligible,
173 (void *)topic_metadata);
174 matched++;
175 has_regex += matched_by_regex;
176 }
177 }
178
179 if (matched)
180 rd_list_add(&eligible_topic->members, rkgm);
181
182 if (!has_regex &&
183 rd_list_cnt(&rkgm->rkgm_eligible) == rkgm->rkgm_subscription->cnt)
184 return 1; /* All subscriptions matched */
185 else
186 return 0;
187}
188
189
190static void
191rd_kafka_assignor_topic_destroy (rd_kafka_assignor_topic_t *at) {
192 rd_list_destroy(&at->members);
193 rd_free(at);
194}
195
196int rd_kafka_assignor_topic_cmp (const void *_a, const void *_b) {
197 const rd_kafka_assignor_topic_t *a =
198 *(const rd_kafka_assignor_topic_t * const *)_a;
199 const rd_kafka_assignor_topic_t *b =
200 *(const rd_kafka_assignor_topic_t * const *)_b;
201
202 return !strcmp(a->metadata->topic, b->metadata->topic);
203}
204
205/**
206 * Maps the available topics to the group members' subscriptions
207 * and updates the `member` map with the proper list of eligible topics,
208 * the latter are returned in `eligible_topics`.
209 */
210static void
211rd_kafka_member_subscriptions_map (rd_kafka_cgrp_t *rkcg,
212 rd_list_t *eligible_topics,
213 const rd_kafka_metadata_t *metadata,
214 rd_kafka_group_member_t *members,
215 int member_cnt) {
216 int ti;
217 rd_kafka_assignor_topic_t *eligible_topic = NULL;
218
219 rd_list_init(eligible_topics, RD_MIN(metadata->topic_cnt, 10),
220 (void *)rd_kafka_assignor_topic_destroy);
221
222 /* For each topic in the cluster, scan through the member list
223 * to find matching subscriptions. */
224 for (ti = 0 ; ti < metadata->topic_cnt ; ti++) {
225 int complete_cnt = 0;
226 int i;
227
228 /* Ignore topics in blacklist */
229 if (rkcg->rkcg_rk->rk_conf.topic_blacklist &&
230 rd_kafka_pattern_match(rkcg->rkcg_rk->rk_conf.
231 topic_blacklist,
232 metadata->topics[ti].topic)) {
233 rd_kafka_dbg(rkcg->rkcg_rk, TOPIC, "BLACKLIST",
234 "Assignor ignoring blacklisted "
235 "topic \"%s\"",
236 metadata->topics[ti].topic);
237 continue;
238 }
239
240 if (!eligible_topic)
241 eligible_topic = rd_calloc(1, sizeof(*eligible_topic));
242
243 rd_list_init(&eligible_topic->members, member_cnt, NULL);
244
245 /* For each member: scan through its topic subscription */
246 for (i = 0 ; i < member_cnt ; i++) {
247 /* Match topic against existing metadata,
248 incl regex matching. */
249 if (rd_kafka_member_subscription_match(
250 rkcg, &members[i], &metadata->topics[ti],
251 eligible_topic))
252 complete_cnt++;
253 }
254
255 if (rd_list_empty(&eligible_topic->members)) {
256 rd_list_destroy(&eligible_topic->members);
257 continue;
258 }
259
260 eligible_topic->metadata = &metadata->topics[ti];
261 rd_list_add(eligible_topics, eligible_topic);
262 eligible_topic = NULL;
263
264 if (complete_cnt == (int)member_cnt)
265 break;
266 }
267
268 if (eligible_topic)
269 rd_free(eligible_topic);
270}
271
272
273rd_kafka_resp_err_t
274rd_kafka_assignor_run (rd_kafka_cgrp_t *rkcg,
275 const char *protocol_name,
276 rd_kafka_metadata_t *metadata,
277 rd_kafka_group_member_t *members,
278 int member_cnt,
279 char *errstr, size_t errstr_size) {
280 rd_kafka_resp_err_t err;
281 rd_kafka_assignor_t *rkas;
282 rd_ts_t ts_start = rd_clock();
283 int i;
284 rd_list_t eligible_topics;
285 int j;
286
287 if (!(rkas = rd_kafka_assignor_find(rkcg->rkcg_rk, protocol_name)) ||
288 !rkas->rkas_enabled) {
289 rd_snprintf(errstr, errstr_size,
290 "Unsupported assignor \"%s\"", protocol_name);
291 return RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL;
292 }
293
294
295 /* Map available topics to subscribing members */
296 rd_kafka_member_subscriptions_map(rkcg, &eligible_topics, metadata,
297 members, member_cnt);
298
299
300 if (rkcg->rkcg_rk->rk_conf.debug & RD_KAFKA_DBG_CGRP) {
301 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN",
302 "Group \"%s\" running %s assignment for "
303 "%d member(s):",
304 rkcg->rkcg_group_id->str, protocol_name,
305 member_cnt);
306
307 for (i = 0 ; i < member_cnt ; i++) {
308 const rd_kafka_group_member_t *member = &members[i];
309
310 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN",
311 " Member \"%.*s\"%s with "
312 "%d subscription(s):",
313 RD_KAFKAP_STR_PR(member->rkgm_member_id),
314 !rd_kafkap_str_cmp(member->rkgm_member_id,
315 rkcg->rkcg_member_id) ?
316 " (me)":"",
317 member->rkgm_subscription->cnt);
318 for (j = 0 ; j < member->rkgm_subscription->cnt ; j++) {
319 const rd_kafka_topic_partition_t *p =
320 &member->rkgm_subscription->elems[j];
321 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN",
322 " %s [%"PRId32"]",
323 p->topic, p->partition);
324 }
325 }
326
327
328 }
329
330 /* Call assignors assign callback */
331 err = rkas->rkas_assign_cb(rkcg->rkcg_rk,
332 rkcg->rkcg_member_id->str,
333 protocol_name, metadata,
334 members, member_cnt,
335 (rd_kafka_assignor_topic_t **)
336 eligible_topics.rl_elems,
337 eligible_topics.rl_cnt,
338 errstr, sizeof(errstr),
339 rkas->rkas_opaque);
340
341 if (err) {
342 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN",
343 "Group \"%s\" %s assignment failed "
344 "for %d member(s): %s",
345 rkcg->rkcg_group_id->str, protocol_name,
346 (int)member_cnt, errstr);
347 } else if (rkcg->rkcg_rk->rk_conf.debug & RD_KAFKA_DBG_CGRP) {
348 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN",
349 "Group \"%s\" %s assignment for %d member(s) "
350 "finished in %.3fms:",
351 rkcg->rkcg_group_id->str, protocol_name,
352 (int)member_cnt,
353 (float)(rd_clock() - ts_start)/1000.0f);
354 for (i = 0 ; i < member_cnt ; i++) {
355 const rd_kafka_group_member_t *member = &members[i];
356
357 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN",
358 " Member \"%.*s\"%s assigned "
359 "%d partition(s):",
360 RD_KAFKAP_STR_PR(member->rkgm_member_id),
361 !rd_kafkap_str_cmp(member->rkgm_member_id,
362 rkcg->rkcg_member_id) ?
363 " (me)":"",
364 member->rkgm_assignment->cnt);
365 for (j = 0 ; j < member->rkgm_assignment->cnt ; j++) {
366 const rd_kafka_topic_partition_t *p =
367 &member->rkgm_assignment->elems[j];
368 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN",
369 " %s [%"PRId32"]",
370 p->topic, p->partition);
371 }
372 }
373 }
374
375 rd_list_destroy(&eligible_topics);
376
377 return err;
378}
379
380
381/**
382 * Assignor protocol string comparator
383 */
384static int rd_kafka_assignor_cmp_str (const void *_a, const void *_b) {
385 const char *a = _a;
386 const rd_kafka_assignor_t *b = _b;
387
388 return rd_kafkap_str_cmp_str2(a, b->rkas_protocol_name);
389}
390
391/**
392 * Find assignor by protocol name.
393 *
394 * Locality: any
395 * Locks: none
396 */
397rd_kafka_assignor_t *
398rd_kafka_assignor_find (rd_kafka_t *rk, const char *protocol) {
399 return (rd_kafka_assignor_t *)
400 rd_list_find(&rk->rk_conf.partition_assignors, protocol,
401 rd_kafka_assignor_cmp_str);
402}
403
404
405/**
406 * Destroys an assignor (but does not unlink).
407 */
408static void rd_kafka_assignor_destroy (rd_kafka_assignor_t *rkas) {
409 rd_kafkap_str_destroy(rkas->rkas_protocol_type);
410 rd_kafkap_str_destroy(rkas->rkas_protocol_name);
411 rd_free(rkas);
412}
413
414
415
416/**
417 * Add an assignor, overwriting any previous one with the same protocol_name.
418 */
419static rd_kafka_resp_err_t
420rd_kafka_assignor_add (rd_kafka_t *rk,
421 rd_kafka_assignor_t **rkasp,
422 const char *protocol_type,
423 const char *protocol_name,
424 rd_kafka_resp_err_t (*assign_cb) (
425 rd_kafka_t *rk,
426 const char *member_id,
427 const char *protocol_name,
428 const rd_kafka_metadata_t *metadata,
429 rd_kafka_group_member_t *members,
430 size_t member_cnt,
431 rd_kafka_assignor_topic_t **eligible_topics,
432 size_t eligible_topic_cnt,
433 char *errstr, size_t errstr_size, void *opaque),
434 void *opaque) {
435 rd_kafka_assignor_t *rkas;
436
437 if (rkasp)
438 *rkasp = NULL;
439
440 if (rd_kafkap_str_cmp_str(rk->rk_conf.group_protocol_type,
441 protocol_type))
442 return RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL;
443
444 /* Dont overwrite application assignors */
445 if ((rkas = rd_kafka_assignor_find(rk, protocol_name))) {
446 if (rkasp)
447 *rkasp = rkas;
448 return RD_KAFKA_RESP_ERR__CONFLICT;
449 }
450
451 rkas = rd_calloc(1, sizeof(*rkas));
452
453 rkas->rkas_protocol_name = rd_kafkap_str_new(protocol_name, -1);
454 rkas->rkas_protocol_type = rd_kafkap_str_new(protocol_type, -1);
455 rkas->rkas_assign_cb = assign_cb;
456 rkas->rkas_get_metadata_cb = rd_kafka_assignor_get_metadata;
457 rkas->rkas_opaque = opaque;
458
459 rd_list_add(&rk->rk_conf.partition_assignors, rkas);
460
461 if (rkasp)
462 *rkasp = rkas;
463
464 return RD_KAFKA_RESP_ERR_NO_ERROR;
465}
466
467
/**
 * @brief Right-trim whitespace from \p s in place.
 *
 * Trailing characters for which isspace() is true are dropped by writing a
 * NUL terminator right after the last non-space character. An empty or
 * all-whitespace string becomes "".
 */
static void rtrim (char *s) {
        char *e = s + strlen(s);

        /* Examine e[-1], not *e: *e starts at the NUL terminator and
         * isspace('\0') is false, so testing *e would never trim anything.
         * Stopping at e == s also avoids forming a pointer before the
         * start of the string. The unsigned char cast is required for
         * <ctype.h> functions. */
        while (e > s && isspace((unsigned char)e[-1]))
                e--;

        *e = '\0';
}
480
481
482/**
483 * Initialize assignor list based on configuration.
484 */
485int rd_kafka_assignors_init (rd_kafka_t *rk, char *errstr, size_t errstr_size) {
486 char *wanted;
487 char *s;
488
489 rd_list_init(&rk->rk_conf.partition_assignors, 2,
490 (void *)rd_kafka_assignor_destroy);
491
492 rd_strdupa(&wanted, rk->rk_conf.partition_assignment_strategy);
493
494 s = wanted;
495 while (*s) {
496 rd_kafka_assignor_t *rkas = NULL;
497 char *t;
498
499 /* Left trim */
500 while (*s == ' ' || *s == ',')
501 s++;
502
503 if ((t = strchr(s, ','))) {
504 *t = '\0';
505 t++;
506 } else {
507 t = s + strlen(s);
508 }
509
510 /* Right trim */
511 rtrim(s);
512
513 /* Match builtin consumer assignors */
514 if (!strcmp(s, "range"))
515 rd_kafka_assignor_add(
516 rk, &rkas, "consumer", "range",
517 rd_kafka_range_assignor_assign_cb,
518 NULL);
519 else if (!strcmp(s, "roundrobin"))
520 rd_kafka_assignor_add(
521 rk, &rkas, "consumer", "roundrobin",
522 rd_kafka_roundrobin_assignor_assign_cb,
523 NULL);
524 else {
525 rd_snprintf(errstr, errstr_size,
526 "Unsupported partition.assignment.strategy:"
527 " %s", s);
528 return -1;
529 }
530
531 if (rkas) {
532 if (!rkas->rkas_enabled) {
533 rkas->rkas_enabled = 1;
534 rk->rk_conf.enabled_assignor_cnt++;
535 }
536 }
537
538 s = t;
539 }
540
541 return 0;
542}
543
544
545
546/**
547 * Free assignors
548 */
549void rd_kafka_assignors_term (rd_kafka_t *rk) {
550 rd_list_destroy(&rk->rk_conf.partition_assignors);
551}
552