1 | /* |
2 | * Copyright (c) 2017, Matias Fontanini |
3 | * All rights reserved. |
4 | * |
5 | * Redistribution and use in source and binary forms, with or without |
6 | * modification, are permitted provided that the following conditions are |
7 | * met: |
8 | * |
9 | * * Redistributions of source code must retain the above copyright |
10 | * notice, this list of conditions and the following disclaimer. |
11 | * * Redistributions in binary form must reproduce the above |
12 | * copyright notice, this list of conditions and the following disclaimer |
13 | * in the documentation and/or other materials provided with the |
14 | * distribution. |
15 | * |
16 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
17 | * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
18 | * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
19 | * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
20 | * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
21 | * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
22 | * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
23 | * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
24 | * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
25 | * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
26 | * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
27 | * |
28 | */ |
29 | |
30 | #include "configuration.h" |
31 | #include <vector> |
32 | #include <librdkafka/rdkafka.h> |
33 | #include "exceptions.h" |
34 | #include "message.h" |
35 | #include "producer.h" |
36 | #include "consumer.h" |
37 | |
38 | using std::string; |
39 | using std::map; |
40 | using std::move; |
41 | using std::vector; |
42 | using std::initializer_list; |
43 | using std::chrono::milliseconds; |
44 | using boost::optional; |
45 | |
46 | namespace cppkafka { |
47 | |
48 | // Callback proxies |
49 | |
50 | void delivery_report_callback_proxy(rd_kafka_t*, const rd_kafka_message_t* msg, void *opaque) { |
51 | Producer* handle = static_cast<Producer*>(opaque); |
52 | Message message = Message::make_non_owning((rd_kafka_message_t*)msg); |
53 | CallbackInvoker<Configuration::DeliveryReportCallback> |
54 | ("delivery report" , handle->get_configuration().get_delivery_report_callback(), handle) |
55 | (*handle, message); |
56 | } |
57 | |
58 | void offset_commit_callback_proxy(rd_kafka_t*, rd_kafka_resp_err_t err, |
59 | rd_kafka_topic_partition_list_t *offsets, void *opaque) { |
60 | Consumer* handle = static_cast<Consumer*>(opaque); |
61 | TopicPartitionList list = offsets ? convert(offsets) : TopicPartitionList{}; |
62 | CallbackInvoker<Configuration::OffsetCommitCallback> |
63 | ("offset commit" , handle->get_configuration().get_offset_commit_callback(), handle) |
64 | (*handle, err, list); |
65 | } |
66 | |
67 | void error_callback_proxy(rd_kafka_t*, int err, const char *reason, void *opaque) { |
68 | KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(opaque); |
69 | CallbackInvoker<Configuration::ErrorCallback> |
70 | ("error" , handle->get_configuration().get_error_callback(), handle) |
71 | (*handle, err, reason); |
72 | } |
73 | |
74 | void throttle_callback_proxy(rd_kafka_t*, const char* broker_name, |
75 | int32_t broker_id, int throttle_time_ms, void *opaque) { |
76 | KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(opaque); |
77 | CallbackInvoker<Configuration::ThrottleCallback> |
78 | ("throttle" , handle->get_configuration().get_throttle_callback(), handle) |
79 | (*handle, broker_name, broker_id, milliseconds(throttle_time_ms)); |
80 | } |
81 | |
82 | void log_callback_proxy(const rd_kafka_t* h, int level, |
83 | const char* facility, const char* message) { |
84 | KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(rd_kafka_opaque(h)); |
85 | CallbackInvoker<Configuration::LogCallback> |
86 | ("log" , handle->get_configuration().get_log_callback(), nullptr) |
87 | (*handle, level, facility, message); |
88 | } |
89 | |
90 | int stats_callback_proxy(rd_kafka_t*, char *json, size_t json_len, void *opaque) { |
91 | KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(opaque); |
92 | CallbackInvoker<Configuration::StatsCallback> |
93 | ("statistics" , handle->get_configuration().get_stats_callback(), handle) |
94 | (*handle, string(json, json + json_len)); |
95 | return 0; |
96 | } |
97 | |
98 | int socket_callback_proxy(int domain, int type, int protocol, void* opaque) { |
99 | KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(opaque); |
100 | return CallbackInvoker<Configuration::SocketCallback> |
101 | ("socket" , handle->get_configuration().get_socket_callback(), handle) |
102 | (domain, type, protocol); |
103 | } |
104 | |
105 | void background_event_callback_proxy(rd_kafka_t*, rd_kafka_event_t* event_ptr, void *opaque) { |
106 | KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(opaque); |
107 | CallbackInvoker<Configuration::BackgroundEventCallback> |
108 | ("background_event" , handle->get_configuration().get_background_event_callback(), handle) |
109 | (*handle, Event{event_ptr}); |
110 | } |
111 | |
112 | // Configuration |
113 | |
114 | Configuration::Configuration() |
115 | : handle_(make_handle(rd_kafka_conf_new())) { |
116 | |
117 | } |
118 | |
119 | Configuration::Configuration(const vector<ConfigurationOption>& options) |
120 | : Configuration() { |
121 | set(options); |
122 | } |
123 | |
124 | Configuration::Configuration(const initializer_list<ConfigurationOption>& options) |
125 | : Configuration() { |
126 | set(options); |
127 | } |
128 | |
129 | Configuration::Configuration(rd_kafka_conf_t* ptr) |
130 | : handle_(make_handle(ptr)) { |
131 | |
132 | } |
133 | |
134 | Configuration& Configuration::set(const string& name, const string& value) { |
135 | char error_buffer[512]; |
136 | rd_kafka_conf_res_t result; |
137 | result = rd_kafka_conf_set(handle_.get(), name.data(), value.data(), error_buffer, |
138 | sizeof(error_buffer)); |
139 | if (result != RD_KAFKA_CONF_OK) { |
140 | throw ConfigException(name, error_buffer); |
141 | } |
142 | return *this; |
143 | } |
144 | |
145 | Configuration& Configuration::set_delivery_report_callback(DeliveryReportCallback callback) { |
146 | delivery_report_callback_ = move(callback); |
147 | rd_kafka_conf_set_dr_msg_cb(handle_.get(), &delivery_report_callback_proxy); |
148 | return *this; |
149 | } |
150 | |
151 | Configuration& Configuration::set_offset_commit_callback(OffsetCommitCallback callback) { |
152 | offset_commit_callback_ = move(callback); |
153 | rd_kafka_conf_set_offset_commit_cb(handle_.get(), &offset_commit_callback_proxy); |
154 | return *this; |
155 | } |
156 | |
157 | Configuration& Configuration::set_error_callback(ErrorCallback callback) { |
158 | error_callback_ = move(callback); |
159 | rd_kafka_conf_set_error_cb(handle_.get(), &error_callback_proxy); |
160 | return *this; |
161 | } |
162 | |
163 | Configuration& Configuration::set_throttle_callback(ThrottleCallback callback) { |
164 | throttle_callback_ = move(callback); |
165 | rd_kafka_conf_set_throttle_cb(handle_.get(), &throttle_callback_proxy); |
166 | return *this; |
167 | } |
168 | |
169 | Configuration& Configuration::set_log_callback(LogCallback callback) { |
170 | log_callback_ = move(callback); |
171 | rd_kafka_conf_set_log_cb(handle_.get(), &log_callback_proxy); |
172 | return *this; |
173 | } |
174 | |
175 | Configuration& Configuration::set_stats_callback(StatsCallback callback) { |
176 | stats_callback_ = move(callback); |
177 | rd_kafka_conf_set_stats_cb(handle_.get(), &stats_callback_proxy); |
178 | return *this; |
179 | } |
180 | |
181 | Configuration& Configuration::set_socket_callback(SocketCallback callback) { |
182 | socket_callback_ = move(callback); |
183 | rd_kafka_conf_set_socket_cb(handle_.get(), &socket_callback_proxy); |
184 | return *this; |
185 | } |
186 | |
187 | #if RD_KAFKA_VERSION >= RD_KAFKA_ADMIN_API_SUPPORT_VERSION |
188 | Configuration& Configuration::set_background_event_callback(BackgroundEventCallback callback) { |
189 | background_event_callback_ = move(callback); |
190 | rd_kafka_conf_set_background_event_cb(handle_.get(), &background_event_callback_proxy); |
191 | return *this; |
192 | } |
193 | |
194 | Configuration& Configuration::set_events(int events) { |
195 | rd_kafka_conf_set_events(handle_.get(), events); |
196 | return *this; |
197 | } |
198 | #endif |
199 | |
200 | Configuration& |
201 | Configuration::set_default_topic_configuration(TopicConfiguration config) { |
202 | default_topic_config_ = std::move(config); |
203 | return *this; |
204 | } |
205 | |
206 | bool Configuration::has_property(const string& name) const { |
207 | size_t size = 0; |
208 | return rd_kafka_conf_get(handle_.get(), name.data(), nullptr, &size) == RD_KAFKA_CONF_OK; |
209 | } |
210 | |
211 | rd_kafka_conf_t* Configuration::get_handle() const { |
212 | return handle_.get(); |
213 | } |
214 | |
215 | string Configuration::get(const string& name) const { |
216 | size_t size = 0; |
217 | auto result = rd_kafka_conf_get(handle_.get(), name.data(), nullptr, &size); |
218 | if (result != RD_KAFKA_CONF_OK) { |
219 | throw ConfigOptionNotFound(name); |
220 | } |
221 | vector<char> buffer(size); |
222 | rd_kafka_conf_get(handle_.get(), name.data(), buffer.data(), &size); |
223 | return string(buffer.data()); |
224 | } |
225 | |
226 | map<string, string> Configuration::get_all() const { |
227 | size_t count = 0; |
228 | const char** all = rd_kafka_conf_dump(handle_.get(), &count); |
229 | map<string, string> output = parse_dump(all, count); |
230 | rd_kafka_conf_dump_free(all, count); |
231 | return output; |
232 | } |
233 | |
234 | const Configuration::DeliveryReportCallback& Configuration::get_delivery_report_callback() const { |
235 | return delivery_report_callback_; |
236 | } |
237 | |
238 | const Configuration::OffsetCommitCallback& Configuration::get_offset_commit_callback() const { |
239 | return offset_commit_callback_; |
240 | } |
241 | |
242 | const Configuration::ErrorCallback& Configuration::get_error_callback() const { |
243 | return error_callback_; |
244 | } |
245 | |
246 | const Configuration::ThrottleCallback& Configuration::get_throttle_callback() const { |
247 | return throttle_callback_; |
248 | } |
249 | |
250 | const Configuration::LogCallback& Configuration::get_log_callback() const { |
251 | return log_callback_; |
252 | } |
253 | |
254 | const Configuration::StatsCallback& Configuration::get_stats_callback() const { |
255 | return stats_callback_; |
256 | } |
257 | |
258 | const Configuration::SocketCallback& Configuration::get_socket_callback() const { |
259 | return socket_callback_; |
260 | } |
261 | |
262 | const Configuration::BackgroundEventCallback& |
263 | Configuration::get_background_event_callback() const { |
264 | return background_event_callback_; |
265 | } |
266 | |
267 | const optional<TopicConfiguration>& Configuration::get_default_topic_configuration() const { |
268 | return default_topic_config_; |
269 | } |
270 | |
271 | optional<TopicConfiguration>& Configuration::get_default_topic_configuration() { |
272 | return default_topic_config_; |
273 | } |
274 | |
275 | Configuration::HandlePtr Configuration::make_handle(rd_kafka_conf_t* ptr) { |
276 | return HandlePtr(ptr, &rd_kafka_conf_destroy, &rd_kafka_conf_dup); |
277 | } |
278 | |
279 | } // cppkafka |
280 | |