1/*
2 * librdkafka - The Apache Kafka C/C++ library
3 *
4 * Copyright (c) 2015 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28#ifndef _RDKAFKA_ASSIGNOR_H_
29#define _RDKAFKA_ASSIGNOR_H_
30
31
32
33typedef struct rd_kafka_group_member_s {
34 rd_kafka_topic_partition_list_t *rkgm_subscription;
35 rd_kafka_topic_partition_list_t *rkgm_assignment;
36 rd_list_t rkgm_eligible;
37 rd_kafkap_str_t *rkgm_member_id;
38 rd_kafkap_bytes_t *rkgm_userdata;
39 rd_kafkap_bytes_t *rkgm_member_metadata;
40} rd_kafka_group_member_t;
41
42
/**
 * Comparator taking two untyped pointers and returning an int.
 *
 * NOTE(review): the name suggests both arguments point at
 * rd_kafka_group_member_t objects; the sort key is not visible in this
 * header -- confirm against the definition.
 */
int rd_kafka_group_member_cmp (const void *_a,
                               const void *_b);
44
45int
46rd_kafka_group_member_find_subscription (rd_kafka_t *rk,
47 const rd_kafka_group_member_t *rkgm,
48 const char *topic);
49
50
51/**
52 * Structure to hold metadata for a single topic and all its
53 * subscribing members.
54 */
55typedef struct rd_kafka_assignor_topic_s {
56 const rd_kafka_metadata_topic_t *metadata;
57 rd_list_t members; /* rd_kafka_group_member_t * */
58} rd_kafka_assignor_topic_t;
59
60
/**
 * Comparator taking two untyped pointers and returning an int.
 *
 * NOTE(review): the name suggests both arguments point at
 * rd_kafka_assignor_topic_t objects; the sort key is not visible in this
 * header -- confirm against the definition.
 */
int rd_kafka_assignor_topic_cmp (const void *_a,
                                 const void *_b);
62
63
64typedef struct rd_kafka_assignor_s {
65 rd_kafkap_str_t *rkas_protocol_type;
66 rd_kafkap_str_t *rkas_protocol_name;
67
68 const void *rkas_userdata;
69 size_t rkas_userdata_size;
70
71 int rkas_enabled;
72
73 rd_kafka_resp_err_t (*rkas_assign_cb) (
74 rd_kafka_t *rk,
75 const char *member_id,
76 const char *protocol_name,
77 const rd_kafka_metadata_t *metadata,
78 rd_kafka_group_member_t *members,
79 size_t member_cnt,
80 rd_kafka_assignor_topic_t **eligible_topics,
81 size_t eligible_topic_cnt,
82 char *errstr,
83 size_t errstr_size,
84 void *opaque);
85
86 rd_kafkap_bytes_t *(*rkas_get_metadata_cb) (
87 struct rd_kafka_assignor_s *rkpas,
88 const rd_list_t *topics);
89
90
91 void (*rkas_on_assignment_cb) (const char *member_id,
92 rd_kafka_group_member_t
93 *assignment, void *opaque);
94
95 void *rkas_opaque;
96} rd_kafka_assignor_t;
97
98
99rd_kafkap_bytes_t *
100rd_kafka_assignor_get_metadata (rd_kafka_assignor_t *rkpas,
101 const rd_list_t *topics);
102
103
104void rd_kafka_assignor_update_subscription (rd_kafka_assignor_t *rkpas,
105 const rd_kafka_topic_partition_list_t
106 *subscription);
107
108
109rd_kafka_resp_err_t
110rd_kafka_assignor_run (struct rd_kafka_cgrp_s *rkcg,
111 const char *protocol_name,
112 rd_kafka_metadata_t *metadata,
113 rd_kafka_group_member_t *members, int member_cnt,
114 char *errstr, size_t errstr_size);
115
116rd_kafka_assignor_t *
117rd_kafka_assignor_find (rd_kafka_t *rk, const char *protocol);
118
119int rd_kafka_assignors_init (rd_kafka_t *rk, char *errstr, size_t errstr_size);
120void rd_kafka_assignors_term (rd_kafka_t *rk);
121
122
123
124void rd_kafka_group_member_clear (rd_kafka_group_member_t *rkgm);
125
126
127/**
128 * rd_kafka_range_assignor.c
129 */
130rd_kafka_resp_err_t
131rd_kafka_range_assignor_assign_cb (rd_kafka_t *rk,
132 const char *member_id,
133 const char *protocol_name,
134 const rd_kafka_metadata_t *metadata,
135 rd_kafka_group_member_t *members,
136 size_t member_cnt,
137 rd_kafka_assignor_topic_t **eligible_topics,
138 size_t eligible_topic_cnt,
139 char *errstr, size_t errstr_size,
140 void *opaque);
141
142
143/**
144 * rd_kafka_roundrobin_assignor.c
145 */
146rd_kafka_resp_err_t
147rd_kafka_roundrobin_assignor_assign_cb (rd_kafka_t *rk,
148 const char *member_id,
149 const char *protocol_name,
150 const rd_kafka_metadata_t *metadata,
151 rd_kafka_group_member_t *members,
152 size_t member_cnt,
153 rd_kafka_assignor_topic_t
154 **eligible_topics,
155 size_t eligible_topic_cnt,
156 char *errstr, size_t errstr_size,
157 void *opaque);
158
159#endif /* _RDKAFKA_ASSIGNOR_H_ */
160