1#include "group_information.h"
2#include <cstring>
3#include <algorithm>
4#include "topic_partition.h"
5#include "exceptions.h"
6#include "detail/endianness.h"
7
8using std::string;
9using std::vector;
10using std::memcpy;
11using std::distance;
12
13namespace cppkafka {
14
15// MemberAssignmentInformation
16MemberAssignmentInformation::MemberAssignmentInformation(const vector<uint8_t>& data) {
17 const char* error_msg = "Message is malformed";
18 // Version + topic list size
19 if (data.size() < sizeof(uint16_t) + sizeof(uint32_t)) {
20 throw ParseException(error_msg);
21 }
22 const uint8_t* ptr = data.data();
23 const uint8_t* end = ptr + data.size();
24 memcpy(&version_, ptr, sizeof(version_));
25 version_ = be16toh(version_);
26 ptr += sizeof(version_);
27
28 uint32_t total_topics;
29 memcpy(&total_topics, ptr, sizeof(total_topics));
30 total_topics = be32toh(total_topics);
31 ptr += sizeof(total_topics);
32
33 for (uint32_t i = 0; i != total_topics; ++i) {
34 if (ptr + sizeof(uint16_t) > end) {
35 throw ParseException(error_msg);
36 }
37 uint16_t topic_length;
38 memcpy(&topic_length, ptr, sizeof(topic_length));
39 topic_length = be16toh(topic_length);
40 ptr += sizeof(topic_length);
41
42 // Check for string length + size of partitions list
43 if (topic_length > distance(ptr, end) + sizeof(uint32_t)) {
44 throw ParseException(error_msg);
45 }
46 string topic_name(ptr, ptr + topic_length);
47 ptr += topic_length;
48
49 uint32_t total_partitions;
50 memcpy(&total_partitions, ptr, sizeof(total_partitions));
51 total_partitions = be32toh(total_partitions);
52 ptr += sizeof(total_partitions);
53
54 if (ptr + total_partitions * sizeof(uint32_t) > end) {
55 throw ParseException(error_msg);
56 }
57 for (uint32_t j = 0; j < total_partitions; ++j) {
58 uint32_t partition;
59 memcpy(&partition, ptr, sizeof(partition));
60 partition = be32toh(partition);
61 ptr += sizeof(partition);
62
63 topic_partitions_.emplace_back(topic_name, partition);
64 }
65 }
66}
67
68uint16_t MemberAssignmentInformation::get_version() const {
69 return version_;
70}
71
72const TopicPartitionList& MemberAssignmentInformation::get_topic_partitions() const {
73 return topic_partitions_;
74}
75
76// GroupMemberInformation
77
78GroupMemberInformation::GroupMemberInformation(const rd_kafka_group_member_info& info)
79: member_id_(info.member_id), client_id_(info.client_id), client_host_(info.client_host),
80 member_metadata_((uint8_t*)info.member_metadata,
81 (uint8_t*)info.member_metadata + info.member_metadata_size),
82 member_assignment_((uint8_t*)info.member_assignment,
83 (uint8_t*)info.member_assignment + info.member_assignment_size) {
84
85}
86
87const string& GroupMemberInformation::get_member_id() const {
88 return member_id_;
89}
90
91const string& GroupMemberInformation::get_client_id() const {
92 return client_id_;
93}
94
95const string& GroupMemberInformation::get_client_host() const {
96 return client_host_;
97}
98
99const vector<uint8_t>& GroupMemberInformation::get_member_metadata() const {
100 return member_metadata_;
101}
102
103const vector<uint8_t>& GroupMemberInformation::get_member_assignment() const {
104 return member_assignment_;
105}
106
107// GroupInformation
108
109GroupInformation::GroupInformation(const rd_kafka_group_info& info)
110: broker_(info.broker), name_(info.group), error_(info.err), state_(info.state),
111 protocol_type_(info.protocol_type), protocol_(info.protocol) {
112 for (int i = 0; i < info.member_cnt; ++i) {
113 members_.emplace_back(info.members[i]);
114 }
115}
116
117const BrokerMetadata& GroupInformation::get_broker() const {
118 return broker_;
119}
120
121const string& GroupInformation::get_name() const {
122 return name_;
123}
124
125Error GroupInformation::get_error() const {
126 return error_;
127}
128
129const string& GroupInformation::get_state() const {
130 return state_;
131}
132
133const string& GroupInformation::get_protocol_type() const {
134 return protocol_type_;
135}
136
137const string& GroupInformation::get_protocol() const {
138 return protocol_;
139}
140
141const vector<GroupMemberInformation>& GroupInformation::get_members() const {
142 return members_;
143}
144
145} // cppkafka
146