#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>

#include <common/logger_useful.h>

#include <cppkafka/cppkafka.h>

#include <exception>
#include <utility>
6
7namespace DB
8{
9
10using namespace std::chrono_literals;
11
12ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
13 ConsumerPtr consumer_,
14 Poco::Logger * log_,
15 size_t max_batch_size,
16 size_t poll_timeout_,
17 bool intermediate_commit_,
18 const std::atomic<bool> & stopped_)
19 : ReadBuffer(nullptr, 0)
20 , consumer(consumer_)
21 , log(log_)
22 , batch_size(max_batch_size)
23 , poll_timeout(poll_timeout_)
24 , intermediate_commit(intermediate_commit_)
25 , stopped(stopped_)
26 , current(messages.begin())
27{
28}
29
30ReadBufferFromKafkaConsumer::~ReadBufferFromKafkaConsumer()
31{
32 /// NOTE: see https://github.com/edenhill/librdkafka/issues/2077
33 consumer->unsubscribe();
34 consumer->unassign();
35 while (consumer->get_consumer_queue().next_event(1s));
36}
37
38void ReadBufferFromKafkaConsumer::commit()
39{
40 auto PrintOffsets = [this] (const char * prefix, const cppkafka::TopicPartitionList & offsets)
41 {
42 for (const auto & topic_part : offsets)
43 {
44 auto print_special_offset = [&topic_part]
45 {
46 switch (topic_part.get_offset())
47 {
48 case cppkafka::TopicPartition::OFFSET_BEGINNING: return "BEGINNING";
49 case cppkafka::TopicPartition::OFFSET_END: return "END";
50 case cppkafka::TopicPartition::OFFSET_STORED: return "STORED";
51 case cppkafka::TopicPartition::OFFSET_INVALID: return "INVALID";
52 default: return "";
53 }
54 };
55
56 if (topic_part.get_offset() < 0)
57 {
58 LOG_TRACE(
59 log,
60 prefix << " " << print_special_offset() << " (topic: " << topic_part.get_topic()
61 << ", partition: " << topic_part.get_partition() << ")");
62 }
63 else
64 {
65 LOG_TRACE(
66 log,
67 prefix << " " << topic_part.get_offset() << " (topic: " << topic_part.get_topic()
68 << ", partition: " << topic_part.get_partition() << ")");
69 }
70 }
71 };
72
73 PrintOffsets("Polled offset", consumer->get_offsets_position(consumer->get_assignment()));
74
75 consumer->async_commit();
76
77 PrintOffsets("Committed offset", consumer->get_offsets_committed(consumer->get_assignment()));
78
79 stalled = false;
80}
81
82void ReadBufferFromKafkaConsumer::subscribe(const Names & topics)
83{
84 {
85 String message = "Already subscribed to topics:";
86 for (const auto & topic : consumer->get_subscription())
87 message += " " + topic;
88 LOG_TRACE(log, message);
89 }
90
91 {
92 String message = "Already assigned to topics:";
93 for (const auto & toppar : consumer->get_assignment())
94 message += " " + toppar.get_topic();
95 LOG_TRACE(log, message);
96 }
97
98 // While we wait for an assignment after subscribtion, we'll poll zero messages anyway.
99 // If we're doing a manual select then it's better to get something after a wait, then immediate nothing.
100 // But due to the nature of async pause/resume/subscribe we can't guarantee any persistent state:
101 // see https://github.com/edenhill/librdkafka/issues/2455
102 while (consumer->get_subscription().empty())
103 {
104 stalled = false;
105
106 try
107 {
108 consumer->subscribe(topics);
109 if (nextImpl())
110 break;
111
112 // FIXME: if we failed to receive "subscribe" response while polling and destroy consumer now, then we may hang up.
113 // see https://github.com/edenhill/librdkafka/issues/2077
114 }
115 catch (cppkafka::HandleException & e)
116 {
117 if (e.get_error() == RD_KAFKA_RESP_ERR__TIMED_OUT)
118 continue;
119 throw;
120 }
121 }
122
123 stalled = false;
124}
125
126void ReadBufferFromKafkaConsumer::unsubscribe()
127{
128 LOG_TRACE(log, "Re-joining claimed consumer after failure");
129
130 messages.clear();
131 current = messages.begin();
132 BufferBase::set(nullptr, 0, 0);
133
134 consumer->unsubscribe();
135}
136
137/// Do commit messages implicitly after we processed the previous batch.
138bool ReadBufferFromKafkaConsumer::nextImpl()
139{
140 /// NOTE: ReadBuffer was implemented with an immutable underlying contents in mind.
141 /// If we failed to poll any message once - don't try again.
142 /// Otherwise, the |poll_timeout| expectations get flawn.
143 if (stalled || stopped || !allowed)
144 return false;
145
146 if (current == messages.end())
147 {
148 if (intermediate_commit)
149 commit();
150
151 /// Don't drop old messages immediately, since we may need them for virtual columns.
152 auto new_messages = consumer->poll_batch(batch_size, std::chrono::milliseconds(poll_timeout));
153 if (new_messages.empty())
154 {
155 LOG_TRACE(log, "Stalled");
156 stalled = true;
157 return false;
158 }
159 messages = std::move(new_messages);
160 current = messages.begin();
161
162 LOG_TRACE(log, "Polled batch of " << messages.size() << " messages");
163 }
164
165 if (auto err = current->get_error())
166 {
167 ++current;
168
169 // TODO: should throw exception instead
170 LOG_ERROR(log, "Consumer error: " << err);
171 return false;
172 }
173
174 // XXX: very fishy place with const casting.
175 auto new_position = reinterpret_cast<char *>(const_cast<unsigned char *>(current->get_payload().get_data()));
176 BufferBase::set(new_position, current->get_payload().get_size(), 0);
177 allowed = false;
178
179 /// Since we can poll more messages than we already processed - commit only processed messages.
180 consumer->store_offset(*current);
181
182 ++current;
183
184 return true;
185}
186
187}
188