1 | /* |
2 | * librdkafka - The Apache Kafka C/C++ library |
3 | * |
4 | * Copyright (c) 2015 Magnus Edenhill |
5 | * All rights reserved. |
6 | * |
7 | * Redistribution and use in source and binary forms, with or without |
8 | * modification, are permitted provided that the following conditions are met: |
9 | * |
10 | * 1. Redistributions of source code must retain the above copyright notice, |
11 | * this list of conditions and the following disclaimer. |
12 | * 2. Redistributions in binary form must reproduce the above copyright notice, |
13 | * this list of conditions and the following disclaimer in the documentation |
14 | * and/or other materials provided with the distribution. |
15 | * |
16 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
17 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
18 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
19 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
20 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
21 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
22 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
23 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
24 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
25 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
26 | * POSSIBILITY OF SUCH DAMAGE. |
27 | */ |
28 | #include "rdkafka_int.h" |
29 | #include "rdkafka_assignor.h" |
30 | |
31 | |
32 | |
33 | |
34 | |
35 | |
36 | /** |
37 | * Source: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java |
38 | * |
39 | * The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order |
40 | * and the consumers in lexicographic order. We then divide the number of partitions by the total number of |
41 | * consumers to determine the number of partitions to assign to each consumer. If it does not evenly |
42 | * divide, then the first few consumers will have one extra partition. |
43 | * |
44 | * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions, |
45 | * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2. |
46 | * |
47 | * The assignment will be: |
48 | * C0: [t0p0, t0p1, t1p0, t1p1] |
49 | * C1: [t0p2, t1p2] |
50 | */ |
51 | |
52 | rd_kafka_resp_err_t |
53 | rd_kafka_range_assignor_assign_cb (rd_kafka_t *rk, |
54 | const char *member_id, |
55 | const char *protocol_name, |
56 | const rd_kafka_metadata_t *metadata, |
57 | rd_kafka_group_member_t *members, |
58 | size_t member_cnt, |
59 | rd_kafka_assignor_topic_t **eligible_topics, |
60 | size_t eligible_topic_cnt, |
61 | char *errstr, size_t errstr_size, |
62 | void *opaque) { |
63 | unsigned int ti; |
64 | int i; |
65 | |
66 | /* The range assignor works on a per-topic basis. */ |
67 | for (ti = 0 ; ti < eligible_topic_cnt ; ti++) { |
68 | rd_kafka_assignor_topic_t *eligible_topic = eligible_topics[ti]; |
69 | int numPartitionsPerConsumer; |
70 | int ; |
71 | |
72 | /* For each topic, we lay out the available partitions in |
73 | * numeric order and the consumers in lexicographic order. */ |
74 | rd_list_sort(&eligible_topic->members, |
75 | rd_kafka_group_member_cmp); |
76 | |
77 | /* We then divide the number of partitions by the total number of |
78 | * consumers to determine the number of partitions to assign to |
79 | * each consumer. */ |
80 | numPartitionsPerConsumer = |
81 | eligible_topic->metadata->partition_cnt / |
82 | rd_list_cnt(&eligible_topic->members); |
83 | |
84 | /* If it does not evenly divide, then the first few consumers |
85 | * will have one extra partition. */ |
86 | consumersWithExtraPartition = |
87 | eligible_topic->metadata->partition_cnt % |
88 | rd_list_cnt(&eligible_topic->members); |
89 | |
90 | rd_kafka_dbg(rk, CGRP, "ASSIGN" , |
91 | "range: Topic %s with %d partition(s) and " |
92 | "%d subscribing member(s)" , |
93 | eligible_topic->metadata->topic, |
94 | eligible_topic->metadata->partition_cnt, |
95 | rd_list_cnt(&eligible_topic->members)); |
96 | |
97 | for (i = 0 ; i < rd_list_cnt(&eligible_topic->members) ; i++) { |
98 | rd_kafka_group_member_t *rkgm = |
99 | rd_list_elem(&eligible_topic->members, i); |
100 | int start = numPartitionsPerConsumer * i + |
101 | RD_MIN(i, consumersWithExtraPartition); |
102 | int length = numPartitionsPerConsumer + |
103 | (i + 1 > consumersWithExtraPartition ? 0 : 1); |
104 | |
105 | if (length == 0) |
106 | continue; |
107 | |
108 | rd_kafka_dbg(rk, CGRP, "ASSIGN" , |
109 | "range: Member \"%s\": " |
110 | "assigned topic %s partitions %d..%d" , |
111 | rkgm->rkgm_member_id->str, |
112 | eligible_topic->metadata->topic, |
113 | start, start+length-1); |
114 | rd_kafka_topic_partition_list_add_range( |
115 | rkgm->rkgm_assignment, |
116 | eligible_topic->metadata->topic, |
117 | start, start+length-1); |
118 | } |
119 | } |
120 | |
121 | return 0; |
122 | } |
123 | |
124 | |
125 | |
126 | |