1/*
2 * librdkafka - The Apache Kafka C/C++ library
3 *
4 * Copyright (c) 2015 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28#include "rdkafka_int.h"
29#include "rdkafka_assignor.h"
30
31
32/**
33 * Source: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
34 *
35 * The roundrobin assignor lays out all the available partitions and all the
36 * available consumers. It then proceeds to do a roundrobin assignment from
37 * partition to consumer. If the subscriptions of all consumer instances are
38 * identical, then the partitions will be uniformly distributed. (i.e., the
39 * partition ownership counts will be within a delta of exactly one across all
40 * consumers.)
41 *
42 * For example, suppose there are two consumers C0 and C1, two topics t0 and
43 * t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1,
44 * t0p2, t1p0, t1p1, and t1p2.
45 *
46 * The assignment will be:
47 * C0: [t0p0, t0p2, t1p1]
48 * C1: [t0p1, t1p0, t1p2]
49 */
50
51rd_kafka_resp_err_t
52rd_kafka_roundrobin_assignor_assign_cb (rd_kafka_t *rk,
53 const char *member_id,
54 const char *protocol_name,
55 const rd_kafka_metadata_t *metadata,
56 rd_kafka_group_member_t *members,
57 size_t member_cnt,
58 rd_kafka_assignor_topic_t
59 **eligible_topics,
60 size_t eligible_topic_cnt,
61 char *errstr, size_t errstr_size,
62 void *opaque) {
63 unsigned int ti;
64 int next = 0; /* Next member id */
65
66 /* Sort topics by name */
67 qsort(eligible_topics, eligible_topic_cnt, sizeof(*eligible_topics),
68 rd_kafka_assignor_topic_cmp);
69
70 /* Sort members by name */
71 qsort(members, member_cnt, sizeof(*members),
72 rd_kafka_group_member_cmp);
73
74 for (ti = 0 ; ti < eligible_topic_cnt ; ti++) {
75 rd_kafka_assignor_topic_t *eligible_topic = eligible_topics[ti];
76 int partition;
77
78 /* For each topic+partition, assign one member (in a cyclic
79 * iteration) per partition until the partitions are exhausted*/
80 for (partition = 0 ;
81 partition < eligible_topic->metadata->partition_cnt ;
82 partition++) {
83 rd_kafka_group_member_t *rkgm;
84
85 /* Scan through members until we find one with a
86 * subscription to this topic. */
87 while (!rd_kafka_group_member_find_subscription(
88 rk, &members[next],
89 eligible_topic->metadata->topic))
90 next++;
91
92 rkgm = &members[next];
93
94 rd_kafka_dbg(rk, CGRP, "ASSIGN",
95 "roundrobin: Member \"%s\": "
96 "assigned topic %s partition %d",
97 rkgm->rkgm_member_id->str,
98 eligible_topic->metadata->topic,
99 partition);
100
101 rd_kafka_topic_partition_list_add(
102 rkgm->rkgm_assignment,
103 eligible_topic->metadata->topic, partition);
104
105 next = (next+1) % rd_list_cnt(&eligible_topic->members);
106 }
107 }
108
109
110 return 0;
111}
112
113
114
115