1/*
2 * Copyright (c) 2017, Matias Fontanini
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are
7 * met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above
12 * copyright notice, this list of conditions and the following disclaimer
13 * in the documentation and/or other materials provided with the
14 * distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 *
28 */
29
30#include <assert.h>
31#include "metadata.h"
32#include "error.h"
33
34using std::string;
35using std::vector;
36using std::unordered_set;
37
38namespace cppkafka {
39
40// PartitionMetadata
41
42PartitionMetadata::PartitionMetadata(const rd_kafka_metadata_partition& partition)
43: id_(partition.id), error_(partition.err), leader_(partition.leader) {
44 for (int i = 0; i < partition.replica_cnt; ++i) {
45 replicas_.push_back(partition.replicas[i]);
46 }
47 for (int i = 0; i < partition.isr_cnt; ++i) {
48 isrs_.push_back(partition.isrs[i]);
49 }
50}
51
52uint32_t PartitionMetadata::get_id() const {
53 return id_;
54}
55
56Error PartitionMetadata::get_error() const {
57 return error_;
58}
59
60int32_t PartitionMetadata::get_leader() const {
61 return leader_;
62}
63
64const vector<int32_t>& PartitionMetadata::get_replicas() const {
65 return replicas_;
66}
67
68const vector<int32_t>& PartitionMetadata::get_in_sync_replica_brokers() const {
69 return isrs_;
70}
71
72// TopicMetadata
73
74TopicMetadata::TopicMetadata(const rd_kafka_metadata_topic& topic)
75: name_(topic.topic), error_(topic.err) {
76 for (int i = 0; i < topic.partition_cnt; ++i) {
77 partitions_.emplace_back(topic.partitions[i]);
78 }
79}
80
81const string& TopicMetadata::get_name() const {
82 return name_;
83}
84
85Error TopicMetadata::get_error() const {
86 return error_;
87}
88
89const vector<PartitionMetadata>& TopicMetadata::get_partitions() const {
90 return partitions_;
91}
92
93// BrokerMetadata
94
95BrokerMetadata::BrokerMetadata(const rd_kafka_metadata_broker_t& broker)
96: host_(broker.host), id_(broker.id), port_(static_cast<uint16_t>(broker.port)) {
97
98}
99
100const string& BrokerMetadata::get_host() const {
101 return host_;
102}
103
104int32_t BrokerMetadata::get_id() const {
105 return id_;
106}
107
108uint16_t BrokerMetadata::get_port() const {
109 return port_;
110}
111
112// Metadata
113
114void dummy_metadata_destroyer(const rd_kafka_metadata_t*) {
115
116}
117
118Metadata Metadata::make_non_owning(const rd_kafka_metadata_t* handle) {
119 return Metadata(handle, NonOwningTag{});
120}
121
122Metadata::Metadata()
123: handle_(nullptr, nullptr) {
124
125}
126
127Metadata::Metadata(const rd_kafka_metadata_t* handle)
128: handle_(handle, &rd_kafka_metadata_destroy) {
129
130}
131
132Metadata::Metadata(const rd_kafka_metadata_t* handle, NonOwningTag)
133: handle_(handle, &dummy_metadata_destroyer) {
134
135}
136
137vector<BrokerMetadata> Metadata::get_brokers() const {
138 assert(handle_);
139 vector<BrokerMetadata> output;
140 for (int i = 0; i < handle_->broker_cnt; ++i) {
141 const rd_kafka_metadata_broker_t& broker = handle_->brokers[i];
142 output.emplace_back(broker);
143 }
144 return output;
145}
146
147vector<TopicMetadata> Metadata::get_topics() const {
148 assert(handle_);
149 vector<TopicMetadata> output;
150 for (int i = 0; i < handle_->topic_cnt; ++i) {
151 const rd_kafka_metadata_topic_t& topic = handle_->topics[i];
152 output.emplace_back(topic);
153 }
154 return output;
155}
156
157vector<TopicMetadata> Metadata::get_topics(const unordered_set<string>& topics) const {
158 assert(handle_);
159 vector<TopicMetadata> output;
160 for (int i = 0; i < handle_->topic_cnt; ++i) {
161 const rd_kafka_metadata_topic_t& topic = handle_->topics[i];
162 if (topics.count(topic.topic)) {
163 output.emplace_back(topic);
164 }
165 }
166 return output;
167}
168
169vector<TopicMetadata> Metadata::get_topics_prefixed(const string& prefix) const {
170 assert(handle_);
171 vector<TopicMetadata> output;
172 for (int i = 0; i < handle_->topic_cnt; ++i) {
173 const rd_kafka_metadata_topic_t& topic = handle_->topics[i];
174 string topic_name = topic.topic;
175 if (topic_name.find(prefix) == 0) {
176 output.emplace_back(topic);
177 }
178 }
179 return output;
180}
181
182
183Metadata::operator bool() const {
184 return handle_ != nullptr;
185}
186
187const rd_kafka_metadata_t* Metadata::get_handle() const {
188 return handle_.get();
189}
190
191} // cppkafka
192