1 | #include <Storages/Kafka/ReadBufferFromKafkaConsumer.h> |
2 | |
3 | #include <common/logger_useful.h> |
4 | |
5 | #include <cppkafka/cppkafka.h> |
6 | |
7 | namespace DB |
8 | { |
9 | |
10 | using namespace std::chrono_literals; |
11 | |
12 | ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer( |
13 | ConsumerPtr consumer_, |
14 | Poco::Logger * log_, |
15 | size_t max_batch_size, |
16 | size_t poll_timeout_, |
17 | bool intermediate_commit_, |
18 | const std::atomic<bool> & stopped_) |
19 | : ReadBuffer(nullptr, 0) |
20 | , consumer(consumer_) |
21 | , log(log_) |
22 | , batch_size(max_batch_size) |
23 | , poll_timeout(poll_timeout_) |
24 | , intermediate_commit(intermediate_commit_) |
25 | , stopped(stopped_) |
26 | , current(messages.begin()) |
27 | { |
28 | } |
29 | |
30 | ReadBufferFromKafkaConsumer::~ReadBufferFromKafkaConsumer() |
31 | { |
32 | /// NOTE: see https://github.com/edenhill/librdkafka/issues/2077 |
33 | consumer->unsubscribe(); |
34 | consumer->unassign(); |
35 | while (consumer->get_consumer_queue().next_event(1s)); |
36 | } |
37 | |
38 | void ReadBufferFromKafkaConsumer::commit() |
39 | { |
40 | auto PrintOffsets = [this] (const char * prefix, const cppkafka::TopicPartitionList & offsets) |
41 | { |
42 | for (const auto & topic_part : offsets) |
43 | { |
44 | auto print_special_offset = [&topic_part] |
45 | { |
46 | switch (topic_part.get_offset()) |
47 | { |
48 | case cppkafka::TopicPartition::OFFSET_BEGINNING: return "BEGINNING" ; |
49 | case cppkafka::TopicPartition::OFFSET_END: return "END" ; |
50 | case cppkafka::TopicPartition::OFFSET_STORED: return "STORED" ; |
51 | case cppkafka::TopicPartition::OFFSET_INVALID: return "INVALID" ; |
52 | default: return "" ; |
53 | } |
54 | }; |
55 | |
56 | if (topic_part.get_offset() < 0) |
57 | { |
58 | LOG_TRACE( |
59 | log, |
60 | prefix << " " << print_special_offset() << " (topic: " << topic_part.get_topic() |
61 | << ", partition: " << topic_part.get_partition() << ")" ); |
62 | } |
63 | else |
64 | { |
65 | LOG_TRACE( |
66 | log, |
67 | prefix << " " << topic_part.get_offset() << " (topic: " << topic_part.get_topic() |
68 | << ", partition: " << topic_part.get_partition() << ")" ); |
69 | } |
70 | } |
71 | }; |
72 | |
73 | PrintOffsets("Polled offset" , consumer->get_offsets_position(consumer->get_assignment())); |
74 | |
75 | consumer->async_commit(); |
76 | |
77 | PrintOffsets("Committed offset" , consumer->get_offsets_committed(consumer->get_assignment())); |
78 | |
79 | stalled = false; |
80 | } |
81 | |
82 | void ReadBufferFromKafkaConsumer::subscribe(const Names & topics) |
83 | { |
84 | { |
85 | String message = "Already subscribed to topics:" ; |
86 | for (const auto & topic : consumer->get_subscription()) |
87 | message += " " + topic; |
88 | LOG_TRACE(log, message); |
89 | } |
90 | |
91 | { |
92 | String message = "Already assigned to topics:" ; |
93 | for (const auto & toppar : consumer->get_assignment()) |
94 | message += " " + toppar.get_topic(); |
95 | LOG_TRACE(log, message); |
96 | } |
97 | |
98 | // While we wait for an assignment after subscribtion, we'll poll zero messages anyway. |
99 | // If we're doing a manual select then it's better to get something after a wait, then immediate nothing. |
100 | // But due to the nature of async pause/resume/subscribe we can't guarantee any persistent state: |
101 | // see https://github.com/edenhill/librdkafka/issues/2455 |
102 | while (consumer->get_subscription().empty()) |
103 | { |
104 | stalled = false; |
105 | |
106 | try |
107 | { |
108 | consumer->subscribe(topics); |
109 | if (nextImpl()) |
110 | break; |
111 | |
112 | // FIXME: if we failed to receive "subscribe" response while polling and destroy consumer now, then we may hang up. |
113 | // see https://github.com/edenhill/librdkafka/issues/2077 |
114 | } |
115 | catch (cppkafka::HandleException & e) |
116 | { |
117 | if (e.get_error() == RD_KAFKA_RESP_ERR__TIMED_OUT) |
118 | continue; |
119 | throw; |
120 | } |
121 | } |
122 | |
123 | stalled = false; |
124 | } |
125 | |
126 | void ReadBufferFromKafkaConsumer::unsubscribe() |
127 | { |
128 | LOG_TRACE(log, "Re-joining claimed consumer after failure" ); |
129 | |
130 | messages.clear(); |
131 | current = messages.begin(); |
132 | BufferBase::set(nullptr, 0, 0); |
133 | |
134 | consumer->unsubscribe(); |
135 | } |
136 | |
137 | /// Do commit messages implicitly after we processed the previous batch. |
138 | bool ReadBufferFromKafkaConsumer::nextImpl() |
139 | { |
140 | /// NOTE: ReadBuffer was implemented with an immutable underlying contents in mind. |
141 | /// If we failed to poll any message once - don't try again. |
142 | /// Otherwise, the |poll_timeout| expectations get flawn. |
143 | if (stalled || stopped || !allowed) |
144 | return false; |
145 | |
146 | if (current == messages.end()) |
147 | { |
148 | if (intermediate_commit) |
149 | commit(); |
150 | |
151 | /// Don't drop old messages immediately, since we may need them for virtual columns. |
152 | auto new_messages = consumer->poll_batch(batch_size, std::chrono::milliseconds(poll_timeout)); |
153 | if (new_messages.empty()) |
154 | { |
155 | LOG_TRACE(log, "Stalled" ); |
156 | stalled = true; |
157 | return false; |
158 | } |
159 | messages = std::move(new_messages); |
160 | current = messages.begin(); |
161 | |
162 | LOG_TRACE(log, "Polled batch of " << messages.size() << " messages" ); |
163 | } |
164 | |
165 | if (auto err = current->get_error()) |
166 | { |
167 | ++current; |
168 | |
169 | // TODO: should throw exception instead |
170 | LOG_ERROR(log, "Consumer error: " << err); |
171 | return false; |
172 | } |
173 | |
174 | // XXX: very fishy place with const casting. |
175 | auto new_position = reinterpret_cast<char *>(const_cast<unsigned char *>(current->get_payload().get_data())); |
176 | BufferBase::set(new_position, current->get_payload().get_size(), 0); |
177 | allowed = false; |
178 | |
179 | /// Since we can poll more messages than we already processed - commit only processed messages. |
180 | consumer->store_offset(*current); |
181 | |
182 | ++current; |
183 | |
184 | return true; |
185 | } |
186 | |
187 | } |
188 | |