1/*
2 * Copyright (c) 2017, Matias Fontanini
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are
7 * met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above
12 * copyright notice, this list of conditions and the following disclaimer
13 * in the documentation and/or other materials provided with the
14 * distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 *
28 */
29
#include <algorithm>
#include <cctype>
#include <iostream>
#include <string>
#include "topic_partition_list.h"
#include "topic_partition.h"
#include "exceptions.h"
#include "metadata.h"
36
37using std::vector;
38using std::set;
39using std::ostream;
40using std::string;
41using std::equal;
42
43namespace cppkafka {
44
45TopicPartitionsListPtr convert(const TopicPartitionList& topic_partitions) {
46 TopicPartitionsListPtr handle(rd_kafka_topic_partition_list_new(topic_partitions.size()),
47 &rd_kafka_topic_partition_list_destroy);
48 for (const auto& item : topic_partitions) {
49 rd_kafka_topic_partition_t* new_item = rd_kafka_topic_partition_list_add(
50 handle.get(),
51 item.get_topic().data(),
52 item.get_partition()
53 );
54 new_item->offset = item.get_offset();
55 }
56 return handle;
57}
58
59TopicPartitionList convert(const TopicPartitionsListPtr& topic_partitions) {
60 return convert(topic_partitions.get());
61}
62
63TopicPartitionList convert(rd_kafka_topic_partition_list_t* topic_partitions) {
64 TopicPartitionList output;
65 for (int i = 0; i < topic_partitions->cnt; ++i) {
66 const auto& elem = topic_partitions->elems[i];
67 output.emplace_back(elem.topic, elem.partition, elem.offset);
68 }
69 return output;
70}
71
72TopicPartitionList convert(const std::string& topic,
73 const std::vector<PartitionMetadata>& partition_metadata)
74{
75 TopicPartitionList output;
76 for (const auto& meta : partition_metadata) {
77 output.emplace_back(topic, meta.get_id());
78 }
79 return output;
80}
81
82TopicPartitionsListPtr make_handle(rd_kafka_topic_partition_list_t* handle) {
83 return TopicPartitionsListPtr(handle, &rd_kafka_topic_partition_list_destroy);
84}
85
86TopicPartitionList find_matches(const TopicPartitionList& partitions,
87 const set<string>& topics) {
88 TopicPartitionList subset;
89 for (const auto& partition : partitions) {
90 for (const auto& topic : topics) {
91 if (topic.size() == partition.get_topic().size()) {
92 // compare both strings
93 bool match = equal(topic.begin(), topic.end(), partition.get_topic().begin(),
94 [](char c1, char c2)->bool {
95 return toupper(c1) == toupper(c2);
96 });
97 if (match) {
98 subset.emplace_back(partition);
99 }
100 }
101 }
102 }
103 return subset;
104}
105
106TopicPartitionList find_matches(const TopicPartitionList& partitions,
107 const set<int>& ids) {
108 TopicPartitionList subset;
109 for (const auto& partition : partitions) {
110 if (ids.count(partition.get_partition()) > 0) {
111 subset.emplace_back(partition);
112 }
113 }
114 return subset;
115}
116
117ostream& operator<<(ostream& output, const TopicPartitionList& rhs) {
118 output << "[ ";
119 for (auto iter = rhs.begin(); iter != rhs.end(); ++iter) {
120 if (iter != rhs.begin()) {
121 output << ", ";
122 }
123 output << *iter;
124 }
125 output << " ]";
126 return output;
127}
128
129} // cppkafka
130