1/*
2 * Copyright (c) 2017, Matias Fontanini
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are
7 * met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above
12 * copyright notice, this list of conditions and the following disclaimer
13 * in the documentation and/or other materials provided with the
14 * distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 *
28 */
29
30#include "configuration.h"
31#include <vector>
32#include <librdkafka/rdkafka.h>
33#include "exceptions.h"
34#include "message.h"
35#include "producer.h"
36#include "consumer.h"
37
38using std::string;
39using std::map;
40using std::move;
41using std::vector;
42using std::initializer_list;
43using std::chrono::milliseconds;
44using boost::optional;
45
46namespace cppkafka {
47
48// Callback proxies
49
50void delivery_report_callback_proxy(rd_kafka_t*, const rd_kafka_message_t* msg, void *opaque) {
51 Producer* handle = static_cast<Producer*>(opaque);
52 Message message = Message::make_non_owning((rd_kafka_message_t*)msg);
53 CallbackInvoker<Configuration::DeliveryReportCallback>
54 ("delivery report", handle->get_configuration().get_delivery_report_callback(), handle)
55 (*handle, message);
56}
57
58void offset_commit_callback_proxy(rd_kafka_t*, rd_kafka_resp_err_t err,
59 rd_kafka_topic_partition_list_t *offsets, void *opaque) {
60 Consumer* handle = static_cast<Consumer*>(opaque);
61 TopicPartitionList list = offsets ? convert(offsets) : TopicPartitionList{};
62 CallbackInvoker<Configuration::OffsetCommitCallback>
63 ("offset commit", handle->get_configuration().get_offset_commit_callback(), handle)
64 (*handle, err, list);
65}
66
67void error_callback_proxy(rd_kafka_t*, int err, const char *reason, void *opaque) {
68 KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(opaque);
69 CallbackInvoker<Configuration::ErrorCallback>
70 ("error", handle->get_configuration().get_error_callback(), handle)
71 (*handle, err, reason);
72}
73
74void throttle_callback_proxy(rd_kafka_t*, const char* broker_name,
75 int32_t broker_id, int throttle_time_ms, void *opaque) {
76 KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(opaque);
77 CallbackInvoker<Configuration::ThrottleCallback>
78 ("throttle", handle->get_configuration().get_throttle_callback(), handle)
79 (*handle, broker_name, broker_id, milliseconds(throttle_time_ms));
80}
81
82void log_callback_proxy(const rd_kafka_t* h, int level,
83 const char* facility, const char* message) {
84 KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(rd_kafka_opaque(h));
85 CallbackInvoker<Configuration::LogCallback>
86 ("log", handle->get_configuration().get_log_callback(), nullptr)
87 (*handle, level, facility, message);
88}
89
90int stats_callback_proxy(rd_kafka_t*, char *json, size_t json_len, void *opaque) {
91 KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(opaque);
92 CallbackInvoker<Configuration::StatsCallback>
93 ("statistics", handle->get_configuration().get_stats_callback(), handle)
94 (*handle, string(json, json + json_len));
95 return 0;
96}
97
98int socket_callback_proxy(int domain, int type, int protocol, void* opaque) {
99 KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(opaque);
100 return CallbackInvoker<Configuration::SocketCallback>
101 ("socket", handle->get_configuration().get_socket_callback(), handle)
102 (domain, type, protocol);
103}
104
105void background_event_callback_proxy(rd_kafka_t*, rd_kafka_event_t* event_ptr, void *opaque) {
106 KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(opaque);
107 CallbackInvoker<Configuration::BackgroundEventCallback>
108 ("background_event", handle->get_configuration().get_background_event_callback(), handle)
109 (*handle, Event{event_ptr});
110}
111
112// Configuration
113
114Configuration::Configuration()
115: handle_(make_handle(rd_kafka_conf_new())) {
116
117}
118
119Configuration::Configuration(const vector<ConfigurationOption>& options)
120: Configuration() {
121 set(options);
122}
123
124Configuration::Configuration(const initializer_list<ConfigurationOption>& options)
125: Configuration() {
126 set(options);
127}
128
129Configuration::Configuration(rd_kafka_conf_t* ptr)
130: handle_(make_handle(ptr)) {
131
132}
133
134Configuration& Configuration::set(const string& name, const string& value) {
135 char error_buffer[512];
136 rd_kafka_conf_res_t result;
137 result = rd_kafka_conf_set(handle_.get(), name.data(), value.data(), error_buffer,
138 sizeof(error_buffer));
139 if (result != RD_KAFKA_CONF_OK) {
140 throw ConfigException(name, error_buffer);
141 }
142 return *this;
143}
144
145Configuration& Configuration::set_delivery_report_callback(DeliveryReportCallback callback) {
146 delivery_report_callback_ = move(callback);
147 rd_kafka_conf_set_dr_msg_cb(handle_.get(), &delivery_report_callback_proxy);
148 return *this;
149}
150
151Configuration& Configuration::set_offset_commit_callback(OffsetCommitCallback callback) {
152 offset_commit_callback_ = move(callback);
153 rd_kafka_conf_set_offset_commit_cb(handle_.get(), &offset_commit_callback_proxy);
154 return *this;
155}
156
157Configuration& Configuration::set_error_callback(ErrorCallback callback) {
158 error_callback_ = move(callback);
159 rd_kafka_conf_set_error_cb(handle_.get(), &error_callback_proxy);
160 return *this;
161}
162
163Configuration& Configuration::set_throttle_callback(ThrottleCallback callback) {
164 throttle_callback_ = move(callback);
165 rd_kafka_conf_set_throttle_cb(handle_.get(), &throttle_callback_proxy);
166 return *this;
167}
168
169Configuration& Configuration::set_log_callback(LogCallback callback) {
170 log_callback_ = move(callback);
171 rd_kafka_conf_set_log_cb(handle_.get(), &log_callback_proxy);
172 return *this;
173}
174
175Configuration& Configuration::set_stats_callback(StatsCallback callback) {
176 stats_callback_ = move(callback);
177 rd_kafka_conf_set_stats_cb(handle_.get(), &stats_callback_proxy);
178 return *this;
179}
180
181Configuration& Configuration::set_socket_callback(SocketCallback callback) {
182 socket_callback_ = move(callback);
183 rd_kafka_conf_set_socket_cb(handle_.get(), &socket_callback_proxy);
184 return *this;
185}
186
187#if RD_KAFKA_VERSION >= RD_KAFKA_ADMIN_API_SUPPORT_VERSION
188Configuration& Configuration::set_background_event_callback(BackgroundEventCallback callback) {
189 background_event_callback_ = move(callback);
190 rd_kafka_conf_set_background_event_cb(handle_.get(), &background_event_callback_proxy);
191 return *this;
192}
193
194Configuration& Configuration::set_events(int events) {
195 rd_kafka_conf_set_events(handle_.get(), events);
196 return *this;
197}
198#endif
199
200Configuration&
201Configuration::set_default_topic_configuration(TopicConfiguration config) {
202 default_topic_config_ = std::move(config);
203 return *this;
204}
205
206bool Configuration::has_property(const string& name) const {
207 size_t size = 0;
208 return rd_kafka_conf_get(handle_.get(), name.data(), nullptr, &size) == RD_KAFKA_CONF_OK;
209}
210
211rd_kafka_conf_t* Configuration::get_handle() const {
212 return handle_.get();
213}
214
215string Configuration::get(const string& name) const {
216 size_t size = 0;
217 auto result = rd_kafka_conf_get(handle_.get(), name.data(), nullptr, &size);
218 if (result != RD_KAFKA_CONF_OK) {
219 throw ConfigOptionNotFound(name);
220 }
221 vector<char> buffer(size);
222 rd_kafka_conf_get(handle_.get(), name.data(), buffer.data(), &size);
223 return string(buffer.data());
224}
225
226map<string, string> Configuration::get_all() const {
227 size_t count = 0;
228 const char** all = rd_kafka_conf_dump(handle_.get(), &count);
229 map<string, string> output = parse_dump(all, count);
230 rd_kafka_conf_dump_free(all, count);
231 return output;
232}
233
234const Configuration::DeliveryReportCallback& Configuration::get_delivery_report_callback() const {
235 return delivery_report_callback_;
236}
237
238const Configuration::OffsetCommitCallback& Configuration::get_offset_commit_callback() const {
239 return offset_commit_callback_;
240}
241
242const Configuration::ErrorCallback& Configuration::get_error_callback() const {
243 return error_callback_;
244}
245
246const Configuration::ThrottleCallback& Configuration::get_throttle_callback() const {
247 return throttle_callback_;
248}
249
250const Configuration::LogCallback& Configuration::get_log_callback() const {
251 return log_callback_;
252}
253
254const Configuration::StatsCallback& Configuration::get_stats_callback() const {
255 return stats_callback_;
256}
257
258const Configuration::SocketCallback& Configuration::get_socket_callback() const {
259 return socket_callback_;
260}
261
262const Configuration::BackgroundEventCallback&
263Configuration::get_background_event_callback() const {
264 return background_event_callback_;
265}
266
267const optional<TopicConfiguration>& Configuration::get_default_topic_configuration() const {
268 return default_topic_config_;
269}
270
271optional<TopicConfiguration>& Configuration::get_default_topic_configuration() {
272 return default_topic_config_;
273}
274
275Configuration::HandlePtr Configuration::make_handle(rd_kafka_conf_t* ptr) {
276 return HandlePtr(ptr, &rd_kafka_conf_destroy, &rd_kafka_conf_dup);
277}
278
279} // cppkafka
280