1 | /* |
2 | * Copyright (c) 2017, Matias Fontanini |
3 | * All rights reserved. |
4 | * |
5 | * Redistribution and use in source and binary forms, with or without |
6 | * modification, are permitted provided that the following conditions are |
7 | * met: |
8 | * |
9 | * * Redistributions of source code must retain the above copyright |
10 | * notice, this list of conditions and the following disclaimer. |
11 | * * Redistributions in binary form must reproduce the above |
12 | * copyright notice, this list of conditions and the following disclaimer |
13 | * in the documentation and/or other materials provided with the |
14 | * distribution. |
15 | * |
16 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
17 | * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
18 | * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
19 | * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
20 | * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
21 | * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
22 | * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
23 | * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
24 | * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
25 | * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
26 | * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
27 | * |
28 | */ |
29 | #include <sstream> |
30 | #include <algorithm> |
31 | #include <cctype> |
32 | #include "macros.h" |
33 | #include "consumer.h" |
34 | #include "exceptions.h" |
35 | #include "logging.h" |
36 | #include "configuration.h" |
37 | #include "topic_partition_list.h" |
38 | #include "detail/callback_invoker.h" |
39 | |
40 | using std::vector; |
41 | using std::string; |
42 | using std::move; |
43 | using std::make_tuple; |
44 | using std::ostringstream; |
45 | using std::chrono::milliseconds; |
46 | using std::toupper; |
47 | using std::equal; |
48 | using std::allocator; |
49 | |
50 | namespace cppkafka { |
51 | |
52 | void Consumer::rebalance_proxy(rd_kafka_t*, rd_kafka_resp_err_t error, |
53 | rd_kafka_topic_partition_list_t *partitions, void *opaque) { |
54 | TopicPartitionList list = convert(partitions); |
55 | static_cast<Consumer*>(opaque)->handle_rebalance(error, list); |
56 | } |
57 | |
58 | Consumer::Consumer(Configuration config) |
59 | : KafkaHandleBase(move(config)) { |
60 | char error_buffer[512]; |
61 | rd_kafka_conf_t* config_handle = get_configuration_handle(); |
62 | // Set ourselves as the opaque pointer |
63 | rd_kafka_conf_set_opaque(config_handle, this); |
64 | rd_kafka_conf_set_rebalance_cb(config_handle, &Consumer::rebalance_proxy); |
65 | rd_kafka_t* ptr = rd_kafka_new(RD_KAFKA_CONSUMER, |
66 | rd_kafka_conf_dup(config_handle), |
67 | error_buffer, sizeof(error_buffer)); |
68 | if (!ptr) { |
69 | throw Exception("Failed to create consumer handle: " + string(error_buffer)); |
70 | } |
71 | rd_kafka_poll_set_consumer(ptr); |
72 | set_handle(ptr); |
73 | } |
74 | |
75 | Consumer::~Consumer() { |
76 | try { |
77 | // make sure to destroy the function closures. in case they hold kafka |
78 | // objects, they will need to be destroyed before we destroy the handle |
79 | assignment_callback_ = nullptr; |
80 | revocation_callback_ = nullptr; |
81 | rebalance_error_callback_ = nullptr; |
82 | close(); |
83 | } |
84 | catch (const HandleException& ex) { |
85 | ostringstream error_msg; |
86 | error_msg << "Failed to close consumer [" << get_name() << "]: " << ex.what(); |
87 | CallbackInvoker<Configuration::ErrorCallback> error_cb("error" , get_configuration().get_error_callback(), this); |
88 | CallbackInvoker<Configuration::LogCallback> logger_cb("log" , get_configuration().get_log_callback(), nullptr); |
89 | if (error_cb) { |
90 | error_cb(*this, static_cast<int>(ex.get_error().get_error()), error_msg.str()); |
91 | } |
92 | else if (logger_cb) { |
93 | logger_cb(*this, static_cast<int>(LogLevel::LogErr), "cppkafka" , error_msg.str()); |
94 | } |
95 | else { |
96 | rd_kafka_log_print(get_handle(), static_cast<int>(LogLevel::LogErr), "cppkafka" , error_msg.str().c_str()); |
97 | } |
98 | } |
99 | } |
100 | |
101 | void Consumer::set_assignment_callback(AssignmentCallback callback) { |
102 | assignment_callback_ = move(callback); |
103 | } |
104 | |
105 | void Consumer::set_revocation_callback(RevocationCallback callback) { |
106 | revocation_callback_ = move(callback); |
107 | } |
108 | |
109 | void Consumer::set_rebalance_error_callback(RebalanceErrorCallback callback) { |
110 | rebalance_error_callback_ = move(callback); |
111 | } |
112 | |
113 | void Consumer::subscribe(const vector<string>& topics) { |
114 | TopicPartitionList topic_partitions(topics.begin(), topics.end()); |
115 | TopicPartitionsListPtr topic_list_handle = convert(topic_partitions); |
116 | rd_kafka_resp_err_t error = rd_kafka_subscribe(get_handle(), topic_list_handle.get()); |
117 | check_error(error); |
118 | } |
119 | |
120 | void Consumer::unsubscribe() { |
121 | rd_kafka_resp_err_t error = rd_kafka_unsubscribe(get_handle()); |
122 | check_error(error); |
123 | } |
124 | |
125 | void Consumer::assign(const TopicPartitionList& topic_partitions) { |
126 | rd_kafka_resp_err_t error; |
127 | if (topic_partitions.empty()) { |
128 | error = rd_kafka_assign(get_handle(), nullptr); |
129 | check_error(error); |
130 | } |
131 | else { |
132 | TopicPartitionsListPtr topic_list_handle = convert(topic_partitions); |
133 | error = rd_kafka_assign(get_handle(), topic_list_handle.get()); |
134 | check_error(error, topic_list_handle.get()); |
135 | } |
136 | } |
137 | |
138 | void Consumer::unassign() { |
139 | rd_kafka_resp_err_t error = rd_kafka_assign(get_handle(), nullptr); |
140 | check_error(error); |
141 | } |
142 | |
143 | void Consumer::pause() { |
144 | pause_partitions(get_assignment()); |
145 | } |
146 | |
147 | void Consumer::resume() { |
148 | resume_partitions(get_assignment()); |
149 | } |
150 | |
151 | void Consumer::commit() { |
152 | commit(nullptr, false); |
153 | } |
154 | |
155 | void Consumer::async_commit() { |
156 | commit(nullptr, true); |
157 | } |
158 | |
159 | void Consumer::commit(const Message& msg) { |
160 | commit(msg, false); |
161 | } |
162 | |
163 | void Consumer::async_commit(const Message& msg) { |
164 | commit(msg, true); |
165 | } |
166 | |
167 | void Consumer::commit(const TopicPartitionList& topic_partitions) { |
168 | commit(&topic_partitions, false); |
169 | } |
170 | |
171 | void Consumer::async_commit(const TopicPartitionList& topic_partitions) { |
172 | commit(&topic_partitions, true); |
173 | } |
174 | |
175 | KafkaHandleBase::OffsetTuple Consumer::get_offsets(const TopicPartition& topic_partition) const { |
176 | int64_t low; |
177 | int64_t high; |
178 | const string& topic = topic_partition.get_topic(); |
179 | const int partition = topic_partition.get_partition(); |
180 | rd_kafka_resp_err_t result = rd_kafka_get_watermark_offsets(get_handle(), topic.data(), |
181 | partition, &low, &high); |
182 | check_error(result); |
183 | return make_tuple(low, high); |
184 | } |
185 | |
186 | TopicPartitionList |
187 | Consumer::get_offsets_committed(const TopicPartitionList& topic_partitions) const { |
188 | TopicPartitionsListPtr topic_list_handle = convert(topic_partitions); |
189 | rd_kafka_resp_err_t error = rd_kafka_committed(get_handle(), topic_list_handle.get(), |
190 | static_cast<int>(get_timeout().count())); |
191 | check_error(error, topic_list_handle.get()); |
192 | return convert(topic_list_handle); |
193 | } |
194 | |
195 | TopicPartitionList |
196 | Consumer::get_offsets_position(const TopicPartitionList& topic_partitions) const { |
197 | TopicPartitionsListPtr topic_list_handle = convert(topic_partitions); |
198 | rd_kafka_resp_err_t error = rd_kafka_position(get_handle(), topic_list_handle.get()); |
199 | check_error(error, topic_list_handle.get()); |
200 | return convert(topic_list_handle); |
201 | } |
202 | |
203 | #if (RD_KAFKA_VERSION >= RD_KAFKA_STORE_OFFSETS_SUPPORT_VERSION) |
204 | void Consumer::store_consumed_offsets() const { |
205 | store_offsets(get_offsets_position(get_assignment())); |
206 | } |
207 | |
208 | void Consumer::store_offsets(const TopicPartitionList& topic_partitions) const { |
209 | TopicPartitionsListPtr topic_list_handle = convert(topic_partitions); |
210 | rd_kafka_resp_err_t error = rd_kafka_offsets_store(get_handle(), topic_list_handle.get()); |
211 | check_error(error, topic_list_handle.get()); |
212 | } |
213 | #endif |
214 | |
215 | void Consumer::store_offset(const Message& msg) const { |
216 | rd_kafka_resp_err_t error = rd_kafka_offset_store(msg.get_handle()->rkt, msg.get_partition(), msg.get_offset()); |
217 | check_error(error); |
218 | } |
219 | |
220 | vector<string> Consumer::get_subscription() const { |
221 | rd_kafka_resp_err_t error; |
222 | rd_kafka_topic_partition_list_t* list = nullptr; |
223 | error = rd_kafka_subscription(get_handle(), &list); |
224 | check_error(error); |
225 | |
226 | auto handle = make_handle(list); |
227 | vector<string> output; |
228 | for (const auto& topic_partition : convert(handle)) { |
229 | output.push_back(topic_partition.get_topic()); |
230 | } |
231 | return output; |
232 | } |
233 | |
234 | TopicPartitionList Consumer::get_assignment() const { |
235 | rd_kafka_resp_err_t error; |
236 | rd_kafka_topic_partition_list_t* list = nullptr; |
237 | error = rd_kafka_assignment(get_handle(), &list); |
238 | check_error(error); |
239 | return convert(make_handle(list)); |
240 | } |
241 | |
242 | string Consumer::get_member_id() const { |
243 | return rd_kafka_memberid(get_handle()); |
244 | } |
245 | |
246 | const Consumer::AssignmentCallback& Consumer::get_assignment_callback() const { |
247 | return assignment_callback_; |
248 | } |
249 | |
250 | const Consumer::RevocationCallback& Consumer::get_revocation_callback() const { |
251 | return revocation_callback_; |
252 | } |
253 | |
254 | const Consumer::RebalanceErrorCallback& Consumer::get_rebalance_error_callback() const { |
255 | return rebalance_error_callback_; |
256 | } |
257 | |
258 | Message Consumer::poll() { |
259 | return poll(get_timeout()); |
260 | } |
261 | |
262 | Message Consumer::poll(milliseconds timeout) { |
263 | return rd_kafka_consumer_poll(get_handle(), static_cast<int>(timeout.count())); |
264 | } |
265 | |
266 | std::vector<Message> Consumer::poll_batch(size_t max_batch_size) { |
267 | return poll_batch(max_batch_size, get_timeout(), allocator<Message>()); |
268 | } |
269 | |
270 | std::vector<Message> Consumer::poll_batch(size_t max_batch_size, milliseconds timeout) { |
271 | return poll_batch(max_batch_size, timeout, allocator<Message>()); |
272 | } |
273 | |
274 | Queue Consumer::get_main_queue() const { |
275 | Queue queue = Queue::make_queue(rd_kafka_queue_get_main(get_handle())); |
276 | queue.disable_queue_forwarding(); |
277 | return queue; |
278 | } |
279 | |
280 | Queue Consumer::get_consumer_queue() const { |
281 | return Queue::make_queue(rd_kafka_queue_get_consumer(get_handle())); |
282 | } |
283 | |
284 | Queue Consumer::get_partition_queue(const TopicPartition& partition) const { |
285 | Queue queue = Queue::make_queue(rd_kafka_queue_get_partition(get_handle(), |
286 | partition.get_topic().c_str(), |
287 | partition.get_partition())); |
288 | queue.disable_queue_forwarding(); |
289 | return queue; |
290 | } |
291 | |
292 | void Consumer::close() { |
293 | rd_kafka_resp_err_t error = rd_kafka_consumer_close(get_handle()); |
294 | check_error(error); |
295 | } |
296 | |
297 | void Consumer::commit(const Message& msg, bool async) { |
298 | rd_kafka_resp_err_t error; |
299 | error = rd_kafka_commit_message(get_handle(), msg.get_handle(), async ? 1 : 0); |
300 | check_error(error); |
301 | } |
302 | |
303 | void Consumer::commit(const TopicPartitionList* topic_partitions, bool async) { |
304 | rd_kafka_resp_err_t error; |
305 | if (topic_partitions == nullptr) { |
306 | error = rd_kafka_commit(get_handle(), nullptr, async ? 1 : 0); |
307 | check_error(error); |
308 | } |
309 | else { |
310 | TopicPartitionsListPtr topic_list_handle = convert(*topic_partitions); |
311 | error = rd_kafka_commit(get_handle(), topic_list_handle.get(), async ? 1 : 0); |
312 | check_error(error, topic_list_handle.get()); |
313 | } |
314 | } |
315 | |
316 | void Consumer::handle_rebalance(rd_kafka_resp_err_t error, |
317 | TopicPartitionList& topic_partitions) { |
318 | if (error == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) { |
319 | CallbackInvoker<AssignmentCallback>("assignment" , assignment_callback_, this)(topic_partitions); |
320 | assign(topic_partitions); |
321 | } |
322 | else if (error == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS) { |
323 | CallbackInvoker<RevocationCallback>("revocation" , revocation_callback_, this)(topic_partitions); |
324 | unassign(); |
325 | } |
326 | else { |
327 | CallbackInvoker<RebalanceErrorCallback>("rebalance error" , rebalance_error_callback_, this)(error); |
328 | unassign(); |
329 | } |
330 | } |
331 | |
332 | } // cppkafka |
333 | |