/*
 * Copyright (c) 2017, Matias Fontanini
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 * * Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 * * Redistributions in binary form must reproduce the above
 *   copyright notice, this list of conditions and the following disclaimer
 *   in the documentation and/or other materials provided with the
 *   distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */
#include <sstream>
#include <algorithm>
#include <cctype>
#include "macros.h"
#include "consumer.h"
#include "exceptions.h"
#include "logging.h"
#include "configuration.h"
#include "topic_partition_list.h"
#include "detail/callback_invoker.h"

using std::vector;
using std::string;
using std::move;
using std::make_tuple;
using std::ostringstream;
using std::chrono::milliseconds;
using std::toupper;
using std::equal;
using std::allocator;

namespace cppkafka {

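// Static trampoline registered as librdkafka's rebalance callback. librdkafka
// hands back the opaque pointer set on the configuration, which is the owning
// Consumer instance, so the event is simply forwarded to it.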
void Consumer::rebalance_proxy(rd_kafka_t*, rd_kafka_resp_err_t error,
                               rd_kafka_topic_partition_list_t *partitions, void *opaque) {
    TopicPartitionList list = convert(partitions);
    static_cast<Consumer*>(opaque)->handle_rebalance(error, list);
}

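// Builds the underlying rd_kafka_t consumer handle from the provided
// configuration and redirects the main event queue to the consumer queue,
// so rd_kafka_consumer_poll serves both messages and events.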
Consumer::Consumer(Configuration config)
: KafkaHandleBase(move(config)) {
    char error_buffer[512];
    rd_kafka_conf_t* config_handle = get_configuration_handle();
    // Set ourselves as the opaque pointer
    rd_kafka_conf_set_opaque(config_handle, this);
    rd_kafka_conf_set_rebalance_cb(config_handle, &Consumer::rebalance_proxy);
    rd_kafka_t* ptr = rd_kafka_new(RD_KAFKA_CONSUMER,
                                   rd_kafka_conf_dup(config_handle),
                                   error_buffer, sizeof(error_buffer));
    if (!ptr) {
        throw Exception("Failed to create consumer handle: " + string(error_buffer));
    }
    rd_kafka_poll_set_consumer(ptr);
    set_handle(ptr);
}

Consumer::~Consumer() {
    try {
        // Make sure to destroy the function closures first: if they hold kafka
        // objects, those need to be destroyed before we destroy the handle.
        assignment_callback_ = nullptr;
        revocation_callback_ = nullptr;
        rebalance_error_callback_ = nullptr;
        close();
    }
    catch (const HandleException& ex) {
        ostringstream error_msg;
        error_msg << "Failed to close consumer [" << get_name() << "]: " << ex.what();
        CallbackInvoker<Configuration::ErrorCallback> error_cb("error", get_configuration().get_error_callback(), this);
        CallbackInvoker<Configuration::LogCallback> logger_cb("log", get_configuration().get_log_callback(), nullptr);
        if (error_cb) {
            error_cb(*this, static_cast<int>(ex.get_error().get_error()), error_msg.str());
        }
        else if (logger_cb) {
            logger_cb(*this, static_cast<int>(LogLevel::LogErr), "cppkafka", error_msg.str());
        }
        else {
            rd_kafka_log_print(get_handle(), static_cast<int>(LogLevel::LogErr), "cppkafka", error_msg.str().c_str());
        }
    }
}

void Consumer::set_assignment_callback(AssignmentCallback callback) {
    assignment_callback_ = move(callback);
}

void Consumer::set_revocation_callback(RevocationCallback callback) {
    revocation_callback_ = move(callback);
}

void Consumer::set_rebalance_error_callback(RebalanceErrorCallback callback) {
    rebalance_error_callback_ = move(callback);
}

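// Subscribes to the given topics. The actual partition assignment is
// negotiated by the group coordinator and delivered later through the
// rebalance callback (see handle_rebalance below).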
void Consumer::subscribe(const vector<string>& topics) {
    TopicPartitionList topic_partitions(topics.begin(), topics.end());
    TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
    rd_kafka_resp_err_t error = rd_kafka_subscribe(get_handle(), topic_list_handle.get());
    check_error(error);
}

void Consumer::unsubscribe() {
    rd_kafka_resp_err_t error = rd_kafka_unsubscribe(get_handle());
    check_error(error);
}

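// Manually assigns the given partitions to this consumer. Passing an empty
// list clears the current assignment, which is also what unassign() does.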
void Consumer::assign(const TopicPartitionList& topic_partitions) {
    rd_kafka_resp_err_t error;
    if (topic_partitions.empty()) {
        error = rd_kafka_assign(get_handle(), nullptr);
        check_error(error);
    }
    else {
        TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
        error = rd_kafka_assign(get_handle(), topic_list_handle.get());
        check_error(error, topic_list_handle.get());
    }
}

void Consumer::unassign() {
    rd_kafka_resp_err_t error = rd_kafka_assign(get_handle(), nullptr);
    check_error(error);
}

void Consumer::pause() {
    pause_partitions(get_assignment());
}

void Consumer::resume() {
    resume_partitions(get_assignment());
}

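// Commit overloads: the parameterless versions commit the offsets of the
// current assignment, the Message versions commit based on that message's
// offset, and the TopicPartitionList versions commit the given offsets. The
// async_* variants return without waiting for the broker's acknowledgement.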
void Consumer::commit() {
    commit(nullptr, false);
}

void Consumer::async_commit() {
    commit(nullptr, true);
}

void Consumer::commit(const Message& msg) {
    commit(msg, false);
}

void Consumer::async_commit(const Message& msg) {
    commit(msg, true);
}

void Consumer::commit(const TopicPartitionList& topic_partitions) {
    commit(&topic_partitions, false);
}

void Consumer::async_commit(const TopicPartitionList& topic_partitions) {
    commit(&topic_partitions, true);
}

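// Returns the low and high watermark offsets for the given partition as
// currently cached by the client (no broker round trip is made here).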
KafkaHandleBase::OffsetTuple Consumer::get_offsets(const TopicPartition& topic_partition) const {
    int64_t low;
    int64_t high;
    const string& topic = topic_partition.get_topic();
    const int partition = topic_partition.get_partition();
    rd_kafka_resp_err_t result = rd_kafka_get_watermark_offsets(get_handle(), topic.data(),
                                                                partition, &low, &high);
    check_error(result);
    return make_tuple(low, high);
}

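// Fetches the committed offsets for the given partitions from the broker,
// blocking for at most the configured timeout.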
TopicPartitionList
Consumer::get_offsets_committed(const TopicPartitionList& topic_partitions) const {
    TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
    rd_kafka_resp_err_t error = rd_kafka_committed(get_handle(), topic_list_handle.get(),
                                                   static_cast<int>(get_timeout().count()));
    check_error(error, topic_list_handle.get());
    return convert(topic_list_handle);
}

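// Returns the current consume position for the given partitions. This is a
// local operation and does not contact the broker.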
TopicPartitionList
Consumer::get_offsets_position(const TopicPartitionList& topic_partitions) const {
    TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
    rd_kafka_resp_err_t error = rd_kafka_position(get_handle(), topic_list_handle.get());
    check_error(error, topic_list_handle.get());
    return convert(topic_list_handle);
}

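// Offset storing is only compiled in when the librdkafka version in use
// provides rd_kafka_offsets_store. Stored offsets are picked up by the next
// (automatic or manual) commit.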
#if (RD_KAFKA_VERSION >= RD_KAFKA_STORE_OFFSETS_SUPPORT_VERSION)
void Consumer::store_consumed_offsets() const {
    store_offsets(get_offsets_position(get_assignment()));
}

void Consumer::store_offsets(const TopicPartitionList& topic_partitions) const {
    TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
    rd_kafka_resp_err_t error = rd_kafka_offsets_store(get_handle(), topic_list_handle.get());
    check_error(error, topic_list_handle.get());
}
#endif

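// Stores the offset of a single message in the offset store of its topic and
// partition.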
void Consumer::store_offset(const Message& msg) const {
    rd_kafka_resp_err_t error = rd_kafka_offset_store(msg.get_handle()->rkt, msg.get_partition(), msg.get_offset());
    check_error(error);
}

vector<string> Consumer::get_subscription() const {
    rd_kafka_resp_err_t error;
    rd_kafka_topic_partition_list_t* list = nullptr;
    error = rd_kafka_subscription(get_handle(), &list);
    check_error(error);

    auto handle = make_handle(list);
    vector<string> output;
    for (const auto& topic_partition : convert(handle)) {
        output.push_back(topic_partition.get_topic());
    }
    return output;
}

TopicPartitionList Consumer::get_assignment() const {
    rd_kafka_resp_err_t error;
    rd_kafka_topic_partition_list_t* list = nullptr;
    error = rd_kafka_assignment(get_handle(), &list);
    check_error(error);
    return convert(make_handle(list));
}

string Consumer::get_member_id() const {
    return rd_kafka_memberid(get_handle());
}

const Consumer::AssignmentCallback& Consumer::get_assignment_callback() const {
    return assignment_callback_;
}

const Consumer::RevocationCallback& Consumer::get_revocation_callback() const {
    return revocation_callback_;
}

const Consumer::RebalanceErrorCallback& Consumer::get_rebalance_error_callback() const {
    return rebalance_error_callback_;
}

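// Polls the consumer for a single message or event. The returned Message
// evaluates to false if nothing was available within the timeout.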
Message Consumer::poll() {
    return poll(get_timeout());
}

Message Consumer::poll(milliseconds timeout) {
    return rd_kafka_consumer_poll(get_handle(), static_cast<int>(timeout.count()));
}

std::vector<Message> Consumer::poll_batch(size_t max_batch_size) {
    return poll_batch(max_batch_size, get_timeout(), allocator<Message>());
}

std::vector<Message> Consumer::poll_batch(size_t max_batch_size, milliseconds timeout) {
    return poll_batch(max_batch_size, timeout, allocator<Message>());
}

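// Queue accessors. Forwarding is disabled on the main and partition queues so
// that their events and messages are served from the returned queue rather
// than being forwarded to the consumer queue.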
Queue Consumer::get_main_queue() const {
    Queue queue = Queue::make_queue(rd_kafka_queue_get_main(get_handle()));
    queue.disable_queue_forwarding();
    return queue;
}

Queue Consumer::get_consumer_queue() const {
    return Queue::make_queue(rd_kafka_queue_get_consumer(get_handle()));
}

Queue Consumer::get_partition_queue(const TopicPartition& partition) const {
    Queue queue = Queue::make_queue(rd_kafka_queue_get_partition(get_handle(),
                                                                 partition.get_topic().c_str(),
                                                                 partition.get_partition()));
    queue.disable_queue_forwarding();
    return queue;
}

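// Closes the consumer: revokes the current assignment (invoking the rebalance
// callback if one is set) and leaves the consumer group.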
void Consumer::close() {
    rd_kafka_resp_err_t error = rd_kafka_consumer_close(get_handle());
    check_error(error);
}

void Consumer::commit(const Message& msg, bool async) {
    rd_kafka_resp_err_t error;
    error = rd_kafka_commit_message(get_handle(), msg.get_handle(), async ? 1 : 0);
    check_error(error);
}

void Consumer::commit(const TopicPartitionList* topic_partitions, bool async) {
    rd_kafka_resp_err_t error;
    if (topic_partitions == nullptr) {
        error = rd_kafka_commit(get_handle(), nullptr, async ? 1 : 0);
        check_error(error);
    }
    else {
        TopicPartitionsListPtr topic_list_handle = convert(*topic_partitions);
        error = rd_kafka_commit(get_handle(), topic_list_handle.get(), async ? 1 : 0);
        check_error(error, topic_list_handle.get());
    }
}

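// Dispatches a rebalance event coming from rebalance_proxy: on assignment the
// partitions are handed to the user callback and then assigned; on revocation
// (or any rebalance error) the user callback is invoked and the assignment is
// cleared.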
void Consumer::handle_rebalance(rd_kafka_resp_err_t error,
                                TopicPartitionList& topic_partitions) {
    if (error == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
        CallbackInvoker<AssignmentCallback>("assignment", assignment_callback_, this)(topic_partitions);
        assign(topic_partitions);
    }
    else if (error == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS) {
        CallbackInvoker<RevocationCallback>("revocation", revocation_callback_, this)(topic_partitions);
        unassign();
    }
    else {
        CallbackInvoker<RebalanceErrorCallback>("rebalance error", rebalance_error_callback_, this)(error);
        unassign();
    }
}

} // cppkafka