1/*
2 * Copyright (c) 2017, Matias Fontanini
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are
7 * met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above
12 * copyright notice, this list of conditions and the following disclaimer
13 * in the documentation and/or other materials provided with the
14 * distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 *
28 */
29
30#include "kafka_handle_base.h"
31#include "metadata.h"
32#include "group_information.h"
33#include "exceptions.h"
34#include "topic.h"
35#include "topic_partition_list.h"
36
37using std::string;
38using std::vector;
39using std::move;
40using std::make_tuple;
41using std::lock_guard;
42using std::mutex;
43using std::exception;
44using std::chrono::milliseconds;
45
46namespace cppkafka {
47
48const milliseconds KafkaHandleBase::DEFAULT_TIMEOUT{1000};
49
50KafkaHandleBase::KafkaHandleBase(Configuration config)
51: timeout_ms_(DEFAULT_TIMEOUT), config_(move(config)), handle_(nullptr, nullptr) {
52 auto& maybe_config = config_.get_default_topic_configuration();
53 if (maybe_config) {
54 maybe_config->set_as_opaque();
55 auto conf_handle = rd_kafka_topic_conf_dup(maybe_config->get_handle());
56 rd_kafka_conf_set_default_topic_conf(config_.get_handle(), conf_handle);
57 }
58}
59
60void KafkaHandleBase::pause_partitions(const TopicPartitionList& topic_partitions) {
61 TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
62 rd_kafka_resp_err_t error = rd_kafka_pause_partitions(get_handle(),
63 topic_list_handle.get());
64 check_error(error, topic_list_handle.get());
65}
66
67void KafkaHandleBase::pause(const std::string& topic) {
68 pause_partitions(convert(topic, get_metadata(get_topic(topic)).get_partitions()));
69}
70
71void KafkaHandleBase::resume_partitions(const TopicPartitionList& topic_partitions) {
72 TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
73 rd_kafka_resp_err_t error = rd_kafka_resume_partitions(get_handle(),
74 topic_list_handle.get());
75 check_error(error, topic_list_handle.get());
76}
77
78void KafkaHandleBase::resume(const std::string& topic) {
79 resume_partitions(convert(topic, get_metadata(get_topic(topic)).get_partitions()));
80}
81
82void KafkaHandleBase::set_timeout(milliseconds timeout) {
83 timeout_ms_ = timeout;
84}
85
86void KafkaHandleBase::set_log_level(LogLevel level) {
87 rd_kafka_set_log_level(handle_.get(), static_cast<int>(level));
88}
89
90void KafkaHandleBase::add_brokers(const string& brokers) {
91 rd_kafka_brokers_add(handle_.get(), brokers.data());
92}
93
94rd_kafka_t* KafkaHandleBase::get_handle() const {
95 return handle_.get();
96}
97
98Topic KafkaHandleBase::get_topic(const string& name) {
99 save_topic_config(name, TopicConfiguration{});
100 return get_topic(name, nullptr);
101}
102
103Topic KafkaHandleBase::get_topic(const string& name, TopicConfiguration config) {
104 auto handle = config.get_handle();
105 save_topic_config(name, move(config));
106 return get_topic(name, rd_kafka_topic_conf_dup(handle));
107}
108
109KafkaHandleBase::OffsetTuple
110KafkaHandleBase::query_offsets(const TopicPartition& topic_partition) const {
111 int64_t low;
112 int64_t high;
113 const string& topic = topic_partition.get_topic();
114 const int partition = topic_partition.get_partition();
115 const int timeout = static_cast<int>(timeout_ms_.count());
116 rd_kafka_resp_err_t result = rd_kafka_query_watermark_offsets(handle_.get(), topic.data(),
117 partition, &low, &high,
118 timeout);
119 check_error(result);
120 return make_tuple(low, high);
121}
122
123Metadata KafkaHandleBase::get_metadata(bool all_topics) const {
124 return get_metadata(all_topics, nullptr);
125}
126
127TopicMetadata KafkaHandleBase::get_metadata(const Topic& topic) const {
128 Metadata md = get_metadata(false, topic.get_handle());
129 auto topics = md.get_topics();
130 if (topics.empty()) {
131 throw ElementNotFound("topic metadata", topic.get_name());
132 }
133 return topics.front();
134}
135
136GroupInformation KafkaHandleBase::get_consumer_group(const string& name) {
137 auto result = fetch_consumer_groups(name.c_str());
138 if (result.empty()) {
139 throw ElementNotFound("consumer group information", name);
140 }
141 return move(result[0]);
142}
143
144vector<GroupInformation> KafkaHandleBase::get_consumer_groups() {
145 return fetch_consumer_groups(nullptr);
146}
147
148TopicPartitionList
149KafkaHandleBase::get_offsets_for_times(const TopicPartitionsTimestampsMap& queries) const {
150 TopicPartitionList topic_partitions;
151 for (const auto& query : queries) {
152 const TopicPartition& topic_partition = query.first;
153 topic_partitions.emplace_back(topic_partition.get_topic(), topic_partition.get_partition(),
154 query.second.count());
155 }
156 TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
157 const int timeout = static_cast<int>(timeout_ms_.count());
158 rd_kafka_resp_err_t result = rd_kafka_offsets_for_times(handle_.get(), topic_list_handle.get(),
159 timeout);
160 check_error(result, topic_list_handle.get());
161 return convert(topic_list_handle);
162}
163
164string KafkaHandleBase::get_name() const {
165 return rd_kafka_name(handle_.get());
166}
167
168milliseconds KafkaHandleBase::get_timeout() const {
169 return timeout_ms_;
170}
171
172const Configuration& KafkaHandleBase::get_configuration() const {
173 return config_;
174}
175
176int KafkaHandleBase::get_out_queue_length() const {
177 return rd_kafka_outq_len(handle_.get());
178}
179
180void KafkaHandleBase::yield() const {
181 rd_kafka_yield(handle_.get());
182}
183
184void KafkaHandleBase::set_handle(rd_kafka_t* handle) {
185 handle_ = HandlePtr(handle, &rd_kafka_destroy);
186}
187
188Topic KafkaHandleBase::get_topic(const string& name, rd_kafka_topic_conf_t* conf) {
189 rd_kafka_topic_t* topic = rd_kafka_topic_new(get_handle(), name.data(), conf);
190 if (!topic) {
191 throw HandleException(rd_kafka_last_error());
192 }
193 return Topic(topic);
194}
195
196Metadata KafkaHandleBase::get_metadata(bool all_topics, rd_kafka_topic_t* topic_ptr) const {
197 const rd_kafka_metadata_t* metadata;
198 const int timeout = static_cast<int>(timeout_ms_.count());
199 rd_kafka_resp_err_t error = rd_kafka_metadata(get_handle(), !!all_topics,
200 topic_ptr, &metadata, timeout);
201 check_error(error);
202 return Metadata(metadata);
203}
204
205vector<GroupInformation> KafkaHandleBase::fetch_consumer_groups(const char* name) {
206 const rd_kafka_group_list* list = nullptr;
207 const int timeout = static_cast<int>(timeout_ms_.count());
208 auto result = rd_kafka_list_groups(get_handle(), name, &list, timeout);
209 check_error(result);
210
211 // Wrap this in a unique_ptr so it gets auto deleted
212 using GroupHandle = std::unique_ptr<const rd_kafka_group_list,
213 decltype(&rd_kafka_group_list_destroy)>;
214 GroupHandle group_handle(list, &rd_kafka_group_list_destroy);
215
216 vector<GroupInformation> groups;
217 for (int i = 0; i < list->group_cnt; ++i) {
218 groups.emplace_back(list->groups[i]);
219 }
220 return groups;
221}
222
223void KafkaHandleBase::save_topic_config(const string& topic_name, TopicConfiguration config) {
224 lock_guard<mutex> _(topic_configurations_mutex_);
225 auto iter = topic_configurations_.emplace(topic_name, move(config)).first;
226 iter->second.set_as_opaque();
227}
228
229void KafkaHandleBase::check_error(rd_kafka_resp_err_t error) const {
230 if (error != RD_KAFKA_RESP_ERR_NO_ERROR) {
231 throw HandleException(error);
232 }
233}
234
235void KafkaHandleBase::check_error(rd_kafka_resp_err_t error,
236 const rd_kafka_topic_partition_list_t* list_ptr) const {
237 if (error != RD_KAFKA_RESP_ERR_NO_ERROR) {
238 throw HandleException(error);
239 }
240 if (list_ptr) {
241 //check if any partition has errors
242 for (int i = 0; i < list_ptr->cnt; ++i) {
243 if (list_ptr->elems[i].err != RD_KAFKA_RESP_ERR_NO_ERROR) {
244 throw HandleException(list_ptr->elems[i].err);
245 }
246 }
247 }
248}
249
250rd_kafka_conf_t* KafkaHandleBase::get_configuration_handle() {
251 return config_.get_handle();
252}
253
254} // cppkafka
255