| 1 | /* |
| 2 | * Copyright (c) 2017, Matias Fontanini |
| 3 | * All rights reserved. |
| 4 | * |
| 5 | * Redistribution and use in source and binary forms, with or without |
| 6 | * modification, are permitted provided that the following conditions are |
| 7 | * met: |
| 8 | * |
| 9 | * * Redistributions of source code must retain the above copyright |
| 10 | * notice, this list of conditions and the following disclaimer. |
| 11 | * * Redistributions in binary form must reproduce the above |
| 12 | * copyright notice, this list of conditions and the following disclaimer |
| 13 | * in the documentation and/or other materials provided with the |
| 14 | * distribution. |
| 15 | * |
| 16 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| 17 | * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| 18 | * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| 19 | * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| 20 | * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| 21 | * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| 22 | * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| 23 | * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| 24 | * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| 25 | * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| 26 | * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| 27 | * |
| 28 | */ |
| 29 | #include <sstream> |
| 30 | #include <algorithm> |
| 31 | #include <cctype> |
| 32 | #include "macros.h" |
| 33 | #include "consumer.h" |
| 34 | #include "exceptions.h" |
| 35 | #include "logging.h" |
| 36 | #include "configuration.h" |
| 37 | #include "topic_partition_list.h" |
| 38 | #include "detail/callback_invoker.h" |
| 39 | |
| 40 | using std::vector; |
| 41 | using std::string; |
| 42 | using std::move; |
| 43 | using std::make_tuple; |
| 44 | using std::ostringstream; |
| 45 | using std::chrono::milliseconds; |
| 46 | using std::toupper; |
| 47 | using std::equal; |
| 48 | using std::allocator; |
| 49 | |
| 50 | namespace cppkafka { |
| 51 | |
| 52 | void Consumer::rebalance_proxy(rd_kafka_t*, rd_kafka_resp_err_t error, |
| 53 | rd_kafka_topic_partition_list_t *partitions, void *opaque) { |
| 54 | TopicPartitionList list = convert(partitions); |
| 55 | static_cast<Consumer*>(opaque)->handle_rebalance(error, list); |
| 56 | } |
| 57 | |
| 58 | Consumer::Consumer(Configuration config) |
| 59 | : KafkaHandleBase(move(config)) { |
| 60 | char error_buffer[512]; |
| 61 | rd_kafka_conf_t* config_handle = get_configuration_handle(); |
| 62 | // Set ourselves as the opaque pointer |
| 63 | rd_kafka_conf_set_opaque(config_handle, this); |
| 64 | rd_kafka_conf_set_rebalance_cb(config_handle, &Consumer::rebalance_proxy); |
| 65 | rd_kafka_t* ptr = rd_kafka_new(RD_KAFKA_CONSUMER, |
| 66 | rd_kafka_conf_dup(config_handle), |
| 67 | error_buffer, sizeof(error_buffer)); |
| 68 | if (!ptr) { |
| 69 | throw Exception("Failed to create consumer handle: " + string(error_buffer)); |
| 70 | } |
| 71 | rd_kafka_poll_set_consumer(ptr); |
| 72 | set_handle(ptr); |
| 73 | } |
| 74 | |
| 75 | Consumer::~Consumer() { |
| 76 | try { |
| 77 | // make sure to destroy the function closures. in case they hold kafka |
| 78 | // objects, they will need to be destroyed before we destroy the handle |
| 79 | assignment_callback_ = nullptr; |
| 80 | revocation_callback_ = nullptr; |
| 81 | rebalance_error_callback_ = nullptr; |
| 82 | close(); |
| 83 | } |
| 84 | catch (const HandleException& ex) { |
| 85 | ostringstream error_msg; |
| 86 | error_msg << "Failed to close consumer [" << get_name() << "]: " << ex.what(); |
| 87 | CallbackInvoker<Configuration::ErrorCallback> error_cb("error" , get_configuration().get_error_callback(), this); |
| 88 | CallbackInvoker<Configuration::LogCallback> logger_cb("log" , get_configuration().get_log_callback(), nullptr); |
| 89 | if (error_cb) { |
| 90 | error_cb(*this, static_cast<int>(ex.get_error().get_error()), error_msg.str()); |
| 91 | } |
| 92 | else if (logger_cb) { |
| 93 | logger_cb(*this, static_cast<int>(LogLevel::LogErr), "cppkafka" , error_msg.str()); |
| 94 | } |
| 95 | else { |
| 96 | rd_kafka_log_print(get_handle(), static_cast<int>(LogLevel::LogErr), "cppkafka" , error_msg.str().c_str()); |
| 97 | } |
| 98 | } |
| 99 | } |
| 100 | |
| 101 | void Consumer::set_assignment_callback(AssignmentCallback callback) { |
| 102 | assignment_callback_ = move(callback); |
| 103 | } |
| 104 | |
| 105 | void Consumer::set_revocation_callback(RevocationCallback callback) { |
| 106 | revocation_callback_ = move(callback); |
| 107 | } |
| 108 | |
| 109 | void Consumer::set_rebalance_error_callback(RebalanceErrorCallback callback) { |
| 110 | rebalance_error_callback_ = move(callback); |
| 111 | } |
| 112 | |
| 113 | void Consumer::subscribe(const vector<string>& topics) { |
| 114 | TopicPartitionList topic_partitions(topics.begin(), topics.end()); |
| 115 | TopicPartitionsListPtr topic_list_handle = convert(topic_partitions); |
| 116 | rd_kafka_resp_err_t error = rd_kafka_subscribe(get_handle(), topic_list_handle.get()); |
| 117 | check_error(error); |
| 118 | } |
| 119 | |
| 120 | void Consumer::unsubscribe() { |
| 121 | rd_kafka_resp_err_t error = rd_kafka_unsubscribe(get_handle()); |
| 122 | check_error(error); |
| 123 | } |
| 124 | |
| 125 | void Consumer::assign(const TopicPartitionList& topic_partitions) { |
| 126 | rd_kafka_resp_err_t error; |
| 127 | if (topic_partitions.empty()) { |
| 128 | error = rd_kafka_assign(get_handle(), nullptr); |
| 129 | check_error(error); |
| 130 | } |
| 131 | else { |
| 132 | TopicPartitionsListPtr topic_list_handle = convert(topic_partitions); |
| 133 | error = rd_kafka_assign(get_handle(), topic_list_handle.get()); |
| 134 | check_error(error, topic_list_handle.get()); |
| 135 | } |
| 136 | } |
| 137 | |
| 138 | void Consumer::unassign() { |
| 139 | rd_kafka_resp_err_t error = rd_kafka_assign(get_handle(), nullptr); |
| 140 | check_error(error); |
| 141 | } |
| 142 | |
| 143 | void Consumer::pause() { |
| 144 | pause_partitions(get_assignment()); |
| 145 | } |
| 146 | |
| 147 | void Consumer::resume() { |
| 148 | resume_partitions(get_assignment()); |
| 149 | } |
| 150 | |
| 151 | void Consumer::commit() { |
| 152 | commit(nullptr, false); |
| 153 | } |
| 154 | |
| 155 | void Consumer::async_commit() { |
| 156 | commit(nullptr, true); |
| 157 | } |
| 158 | |
| 159 | void Consumer::commit(const Message& msg) { |
| 160 | commit(msg, false); |
| 161 | } |
| 162 | |
| 163 | void Consumer::async_commit(const Message& msg) { |
| 164 | commit(msg, true); |
| 165 | } |
| 166 | |
| 167 | void Consumer::commit(const TopicPartitionList& topic_partitions) { |
| 168 | commit(&topic_partitions, false); |
| 169 | } |
| 170 | |
| 171 | void Consumer::async_commit(const TopicPartitionList& topic_partitions) { |
| 172 | commit(&topic_partitions, true); |
| 173 | } |
| 174 | |
| 175 | KafkaHandleBase::OffsetTuple Consumer::get_offsets(const TopicPartition& topic_partition) const { |
| 176 | int64_t low; |
| 177 | int64_t high; |
| 178 | const string& topic = topic_partition.get_topic(); |
| 179 | const int partition = topic_partition.get_partition(); |
| 180 | rd_kafka_resp_err_t result = rd_kafka_get_watermark_offsets(get_handle(), topic.data(), |
| 181 | partition, &low, &high); |
| 182 | check_error(result); |
| 183 | return make_tuple(low, high); |
| 184 | } |
| 185 | |
| 186 | TopicPartitionList |
| 187 | Consumer::get_offsets_committed(const TopicPartitionList& topic_partitions) const { |
| 188 | TopicPartitionsListPtr topic_list_handle = convert(topic_partitions); |
| 189 | rd_kafka_resp_err_t error = rd_kafka_committed(get_handle(), topic_list_handle.get(), |
| 190 | static_cast<int>(get_timeout().count())); |
| 191 | check_error(error, topic_list_handle.get()); |
| 192 | return convert(topic_list_handle); |
| 193 | } |
| 194 | |
| 195 | TopicPartitionList |
| 196 | Consumer::get_offsets_position(const TopicPartitionList& topic_partitions) const { |
| 197 | TopicPartitionsListPtr topic_list_handle = convert(topic_partitions); |
| 198 | rd_kafka_resp_err_t error = rd_kafka_position(get_handle(), topic_list_handle.get()); |
| 199 | check_error(error, topic_list_handle.get()); |
| 200 | return convert(topic_list_handle); |
| 201 | } |
| 202 | |
| 203 | #if (RD_KAFKA_VERSION >= RD_KAFKA_STORE_OFFSETS_SUPPORT_VERSION) |
| 204 | void Consumer::store_consumed_offsets() const { |
| 205 | store_offsets(get_offsets_position(get_assignment())); |
| 206 | } |
| 207 | |
| 208 | void Consumer::store_offsets(const TopicPartitionList& topic_partitions) const { |
| 209 | TopicPartitionsListPtr topic_list_handle = convert(topic_partitions); |
| 210 | rd_kafka_resp_err_t error = rd_kafka_offsets_store(get_handle(), topic_list_handle.get()); |
| 211 | check_error(error, topic_list_handle.get()); |
| 212 | } |
| 213 | #endif |
| 214 | |
| 215 | void Consumer::store_offset(const Message& msg) const { |
| 216 | rd_kafka_resp_err_t error = rd_kafka_offset_store(msg.get_handle()->rkt, msg.get_partition(), msg.get_offset()); |
| 217 | check_error(error); |
| 218 | } |
| 219 | |
| 220 | vector<string> Consumer::get_subscription() const { |
| 221 | rd_kafka_resp_err_t error; |
| 222 | rd_kafka_topic_partition_list_t* list = nullptr; |
| 223 | error = rd_kafka_subscription(get_handle(), &list); |
| 224 | check_error(error); |
| 225 | |
| 226 | auto handle = make_handle(list); |
| 227 | vector<string> output; |
| 228 | for (const auto& topic_partition : convert(handle)) { |
| 229 | output.push_back(topic_partition.get_topic()); |
| 230 | } |
| 231 | return output; |
| 232 | } |
| 233 | |
| 234 | TopicPartitionList Consumer::get_assignment() const { |
| 235 | rd_kafka_resp_err_t error; |
| 236 | rd_kafka_topic_partition_list_t* list = nullptr; |
| 237 | error = rd_kafka_assignment(get_handle(), &list); |
| 238 | check_error(error); |
| 239 | return convert(make_handle(list)); |
| 240 | } |
| 241 | |
| 242 | string Consumer::get_member_id() const { |
| 243 | return rd_kafka_memberid(get_handle()); |
| 244 | } |
| 245 | |
| 246 | const Consumer::AssignmentCallback& Consumer::get_assignment_callback() const { |
| 247 | return assignment_callback_; |
| 248 | } |
| 249 | |
| 250 | const Consumer::RevocationCallback& Consumer::get_revocation_callback() const { |
| 251 | return revocation_callback_; |
| 252 | } |
| 253 | |
| 254 | const Consumer::RebalanceErrorCallback& Consumer::get_rebalance_error_callback() const { |
| 255 | return rebalance_error_callback_; |
| 256 | } |
| 257 | |
| 258 | Message Consumer::poll() { |
| 259 | return poll(get_timeout()); |
| 260 | } |
| 261 | |
| 262 | Message Consumer::poll(milliseconds timeout) { |
| 263 | return rd_kafka_consumer_poll(get_handle(), static_cast<int>(timeout.count())); |
| 264 | } |
| 265 | |
| 266 | std::vector<Message> Consumer::poll_batch(size_t max_batch_size) { |
| 267 | return poll_batch(max_batch_size, get_timeout(), allocator<Message>()); |
| 268 | } |
| 269 | |
| 270 | std::vector<Message> Consumer::poll_batch(size_t max_batch_size, milliseconds timeout) { |
| 271 | return poll_batch(max_batch_size, timeout, allocator<Message>()); |
| 272 | } |
| 273 | |
| 274 | Queue Consumer::get_main_queue() const { |
| 275 | Queue queue = Queue::make_queue(rd_kafka_queue_get_main(get_handle())); |
| 276 | queue.disable_queue_forwarding(); |
| 277 | return queue; |
| 278 | } |
| 279 | |
| 280 | Queue Consumer::get_consumer_queue() const { |
| 281 | return Queue::make_queue(rd_kafka_queue_get_consumer(get_handle())); |
| 282 | } |
| 283 | |
| 284 | Queue Consumer::get_partition_queue(const TopicPartition& partition) const { |
| 285 | Queue queue = Queue::make_queue(rd_kafka_queue_get_partition(get_handle(), |
| 286 | partition.get_topic().c_str(), |
| 287 | partition.get_partition())); |
| 288 | queue.disable_queue_forwarding(); |
| 289 | return queue; |
| 290 | } |
| 291 | |
| 292 | void Consumer::close() { |
| 293 | rd_kafka_resp_err_t error = rd_kafka_consumer_close(get_handle()); |
| 294 | check_error(error); |
| 295 | } |
| 296 | |
| 297 | void Consumer::commit(const Message& msg, bool async) { |
| 298 | rd_kafka_resp_err_t error; |
| 299 | error = rd_kafka_commit_message(get_handle(), msg.get_handle(), async ? 1 : 0); |
| 300 | check_error(error); |
| 301 | } |
| 302 | |
| 303 | void Consumer::commit(const TopicPartitionList* topic_partitions, bool async) { |
| 304 | rd_kafka_resp_err_t error; |
| 305 | if (topic_partitions == nullptr) { |
| 306 | error = rd_kafka_commit(get_handle(), nullptr, async ? 1 : 0); |
| 307 | check_error(error); |
| 308 | } |
| 309 | else { |
| 310 | TopicPartitionsListPtr topic_list_handle = convert(*topic_partitions); |
| 311 | error = rd_kafka_commit(get_handle(), topic_list_handle.get(), async ? 1 : 0); |
| 312 | check_error(error, topic_list_handle.get()); |
| 313 | } |
| 314 | } |
| 315 | |
| 316 | void Consumer::handle_rebalance(rd_kafka_resp_err_t error, |
| 317 | TopicPartitionList& topic_partitions) { |
| 318 | if (error == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) { |
| 319 | CallbackInvoker<AssignmentCallback>("assignment" , assignment_callback_, this)(topic_partitions); |
| 320 | assign(topic_partitions); |
| 321 | } |
| 322 | else if (error == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS) { |
| 323 | CallbackInvoker<RevocationCallback>("revocation" , revocation_callback_, this)(topic_partitions); |
| 324 | unassign(); |
| 325 | } |
| 326 | else { |
| 327 | CallbackInvoker<RebalanceErrorCallback>("rebalance error" , rebalance_error_callback_, this)(error); |
| 328 | unassign(); |
| 329 | } |
| 330 | } |
| 331 | |
| 332 | } // cppkafka |
| 333 | |