1/*
2 * Copyright (c) 2017, Matias Fontanini
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are
7 * met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above
12 * copyright notice, this list of conditions and the following disclaimer
13 * in the documentation and/or other materials provided with the
14 * distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 *
28 */
29
30#include "topic_configuration.h"
31#include <vector>
32#include <librdkafka/rdkafka.h>
33#include "exceptions.h"
34#include "topic.h"
35#include "buffer.h"
36#include "detail/callback_invoker.h"
37
38using std::string;
39using std::map;
40using std::vector;
41using std::initializer_list;
42
43namespace cppkafka {
44
45int32_t partitioner_callback_proxy(const rd_kafka_topic_t* handle, const void *key_ptr,
46 size_t key_size, int32_t partition_count,
47 void* topic_opaque, void* message_opaque) {
48 const TopicConfiguration* config = static_cast<TopicConfiguration*>(topic_opaque);
49 const auto& callback = config->get_partitioner_callback();
50 if (callback) {
51 Topic topic = Topic::make_non_owning(const_cast<rd_kafka_topic_t*>(handle));
52 Buffer key(static_cast<const char*>(key_ptr), key_size);
53 return CallbackInvoker<TopicConfiguration::PartitionerCallback>("topic partitioner", callback, nullptr)
54 (topic, key, partition_count);
55 }
56 else {
57 return rd_kafka_msg_partitioner_consistent_random(handle, key_ptr, key_size,
58 partition_count, topic_opaque,
59 message_opaque);
60 }
61}
62
63TopicConfiguration::TopicConfiguration()
64: handle_(make_handle(rd_kafka_topic_conf_new())) {
65
66}
67
68TopicConfiguration::TopicConfiguration(const vector<ConfigurationOption>& options)
69: TopicConfiguration() {
70 set(options);
71}
72
73TopicConfiguration::TopicConfiguration(const initializer_list<ConfigurationOption>& options)
74: TopicConfiguration() {
75 set(options);
76}
77
78TopicConfiguration::TopicConfiguration(rd_kafka_topic_conf_t* ptr)
79: handle_(make_handle(ptr)) {
80
81}
82
83TopicConfiguration& TopicConfiguration::set(const string& name, const string& value) {
84 char error_buffer[512];
85 rd_kafka_conf_res_t result;
86 result = rd_kafka_topic_conf_set(handle_.get(), name.data(), value.data(), error_buffer,
87 sizeof(error_buffer));
88 if (result != RD_KAFKA_CONF_OK) {
89 throw ConfigException(name, error_buffer);
90 }
91 return *this;
92}
93
94TopicConfiguration& TopicConfiguration::set_partitioner_callback(PartitionerCallback callback) {
95 partitioner_callback_ = move(callback);
96 rd_kafka_topic_conf_set_partitioner_cb(handle_.get(), &partitioner_callback_proxy);
97 return *this;
98}
99
100TopicConfiguration& TopicConfiguration::set_as_opaque() {
101 rd_kafka_topic_conf_set_opaque(handle_.get(), this);
102 return *this;
103}
104
105const TopicConfiguration::PartitionerCallback&
106TopicConfiguration::get_partitioner_callback() const {
107 return partitioner_callback_;
108}
109
110bool TopicConfiguration::has_property(const string& name) const {
111 size_t size = 0;
112 return rd_kafka_topic_conf_get(handle_.get(), name.data(), nullptr, &size) == RD_KAFKA_CONF_OK;
113}
114
115string TopicConfiguration::get(const string& name) const {
116 size_t size = 0;
117 auto result = rd_kafka_topic_conf_get(handle_.get(), name.data(), nullptr, &size);
118 if (result != RD_KAFKA_CONF_OK) {
119 throw ConfigOptionNotFound(name);
120 }
121 vector<char> buffer(size);
122 rd_kafka_topic_conf_get(handle_.get(), name.data(), buffer.data(), &size);
123 return string(buffer.data());
124}
125
126map<string, string> TopicConfiguration::get_all() const {
127 size_t count = 0;
128 const char** all = rd_kafka_topic_conf_dump(handle_.get(), &count);
129 map<string, string> output = parse_dump(all, count);
130 rd_kafka_conf_dump_free(all, count);
131 return output;
132}
133
134rd_kafka_topic_conf_t* TopicConfiguration::get_handle() const {
135 return handle_.get();
136}
137
138TopicConfiguration::HandlePtr TopicConfiguration::make_handle(rd_kafka_topic_conf_t* ptr) {
139 return HandlePtr(ptr, &rd_kafka_topic_conf_destroy, &rd_kafka_topic_conf_dup);
140}
141
142} // cppkafka
143