1 | /* |
2 | * librdkafka - Apache Kafka C library |
3 | * |
4 | * Copyright (c) 2012-2015, Magnus Edenhill |
5 | * All rights reserved. |
6 | * |
7 | * Redistribution and use in source and binary forms, with or without |
8 | * modification, are permitted provided that the following conditions are met: |
9 | * |
10 | * 1. Redistributions of source code must retain the above copyright notice, |
11 | * this list of conditions and the following disclaimer. |
12 | * 2. Redistributions in binary form must reproduce the above copyright notice, |
13 | * this list of conditions and the following disclaimer in the documentation |
14 | * and/or other materials provided with the distribution. |
15 | * |
16 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
17 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
18 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
19 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
20 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
21 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
22 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
23 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
24 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
25 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
26 | * POSSIBILITY OF SUCH DAMAGE. |
27 | */ |
28 | |
29 | |
/**
 * This is the high-level consumer API, which is mutually exclusive
 * with the legacy simple consumer.
 * Only one of these interfaces may be used on a given rd_kafka_t handle.
 */
35 | |
36 | #include "rdkafka_int.h" |
37 | |
38 | |
39 | rd_kafka_resp_err_t rd_kafka_unsubscribe (rd_kafka_t *rk) { |
40 | rd_kafka_cgrp_t *rkcg; |
41 | |
42 | if (!(rkcg = rd_kafka_cgrp_get(rk))) |
43 | return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP; |
44 | |
45 | return rd_kafka_op_err_destroy(rd_kafka_op_req2(rkcg->rkcg_ops, |
46 | RD_KAFKA_OP_SUBSCRIBE)); |
47 | } |
48 | |
49 | |
50 | /** @returns 1 if the topic is invalid (bad regex, empty), else 0 if valid. */ |
51 | static size_t _invalid_topic_cb (const rd_kafka_topic_partition_t *rktpar, |
52 | void *opaque) { |
53 | rd_regex_t *re; |
54 | char errstr[1]; |
55 | |
56 | if (!*rktpar->topic) |
57 | return 1; |
58 | |
59 | if (*rktpar->topic != '^') |
60 | return 0; |
61 | |
62 | if (!(re = rd_regex_comp(rktpar->topic, errstr, sizeof(errstr)))) |
63 | return 1; |
64 | |
65 | rd_regex_destroy(re); |
66 | |
67 | return 0; |
68 | } |
69 | |
70 | |
71 | rd_kafka_resp_err_t |
72 | rd_kafka_subscribe (rd_kafka_t *rk, |
73 | const rd_kafka_topic_partition_list_t *topics) { |
74 | |
75 | rd_kafka_op_t *rko; |
76 | rd_kafka_cgrp_t *rkcg; |
77 | |
78 | if (!(rkcg = rd_kafka_cgrp_get(rk))) |
79 | return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP; |
80 | |
81 | /* Validate topics */ |
82 | if (topics->cnt == 0 || |
83 | rd_kafka_topic_partition_list_sum(topics, |
84 | _invalid_topic_cb, NULL) > 0) |
85 | return RD_KAFKA_RESP_ERR__INVALID_ARG; |
86 | |
87 | rko = rd_kafka_op_new(RD_KAFKA_OP_SUBSCRIBE); |
88 | rko->rko_u.subscribe.topics = rd_kafka_topic_partition_list_copy(topics); |
89 | |
90 | return rd_kafka_op_err_destroy( |
91 | rd_kafka_op_req(rkcg->rkcg_ops, rko, RD_POLL_INFINITE)); |
92 | } |
93 | |
94 | |
95 | rd_kafka_resp_err_t |
96 | rd_kafka_assign (rd_kafka_t *rk, |
97 | const rd_kafka_topic_partition_list_t *partitions) { |
98 | rd_kafka_op_t *rko; |
99 | rd_kafka_cgrp_t *rkcg; |
100 | |
101 | if (!(rkcg = rd_kafka_cgrp_get(rk))) |
102 | return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP; |
103 | |
104 | rko = rd_kafka_op_new(RD_KAFKA_OP_ASSIGN); |
105 | if (partitions) |
106 | rko->rko_u.assign.partitions = |
107 | rd_kafka_topic_partition_list_copy(partitions); |
108 | |
109 | return rd_kafka_op_err_destroy( |
110 | rd_kafka_op_req(rkcg->rkcg_ops, rko, RD_POLL_INFINITE)); |
111 | } |
112 | |
113 | |
114 | |
115 | rd_kafka_resp_err_t |
116 | rd_kafka_assignment (rd_kafka_t *rk, |
117 | rd_kafka_topic_partition_list_t **partitions) { |
118 | rd_kafka_op_t *rko; |
119 | rd_kafka_resp_err_t err; |
120 | rd_kafka_cgrp_t *rkcg; |
121 | |
122 | if (!(rkcg = rd_kafka_cgrp_get(rk))) |
123 | return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP; |
124 | |
125 | rko = rd_kafka_op_req2(rkcg->rkcg_ops, RD_KAFKA_OP_GET_ASSIGNMENT); |
126 | if (!rko) |
127 | return RD_KAFKA_RESP_ERR__TIMED_OUT; |
128 | |
129 | err = rko->rko_err; |
130 | |
131 | *partitions = rko->rko_u.assign.partitions; |
132 | rko->rko_u.assign.partitions = NULL; |
133 | rd_kafka_op_destroy(rko); |
134 | |
135 | if (!*partitions && !err) { |
136 | /* Create an empty list for convenience of the caller */ |
137 | *partitions = rd_kafka_topic_partition_list_new(0); |
138 | } |
139 | |
140 | return err; |
141 | } |
142 | |
143 | rd_kafka_resp_err_t |
144 | rd_kafka_subscription (rd_kafka_t *rk, |
145 | rd_kafka_topic_partition_list_t **topics){ |
146 | rd_kafka_op_t *rko; |
147 | rd_kafka_resp_err_t err; |
148 | rd_kafka_cgrp_t *rkcg; |
149 | |
150 | if (!(rkcg = rd_kafka_cgrp_get(rk))) |
151 | return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP; |
152 | |
153 | rko = rd_kafka_op_req2(rkcg->rkcg_ops, RD_KAFKA_OP_GET_SUBSCRIPTION); |
154 | if (!rko) |
155 | return RD_KAFKA_RESP_ERR__TIMED_OUT; |
156 | |
157 | err = rko->rko_err; |
158 | |
159 | *topics = rko->rko_u.subscribe.topics; |
160 | rko->rko_u.subscribe.topics = NULL; |
161 | rd_kafka_op_destroy(rko); |
162 | |
163 | if (!*topics && !err) { |
164 | /* Create an empty list for convenience of the caller */ |
165 | *topics = rd_kafka_topic_partition_list_new(0); |
166 | } |
167 | |
168 | return err; |
169 | } |
170 | |
171 | |
172 | rd_kafka_resp_err_t |
173 | rd_kafka_pause_partitions (rd_kafka_t *rk, |
174 | rd_kafka_topic_partition_list_t *partitions) { |
175 | return rd_kafka_toppars_pause_resume(rk, 1, RD_KAFKA_TOPPAR_F_APP_PAUSE, |
176 | partitions); |
177 | } |
178 | |
179 | |
180 | rd_kafka_resp_err_t |
181 | rd_kafka_resume_partitions (rd_kafka_t *rk, |
182 | rd_kafka_topic_partition_list_t *partitions) { |
183 | return rd_kafka_toppars_pause_resume(rk, 0, RD_KAFKA_TOPPAR_F_APP_PAUSE, |
184 | partitions); |
185 | } |
186 | |
187 | |