1 | #pragma once |
2 | |
3 | #include <Core/Names.h> |
4 | #include <Core/Types.h> |
5 | #include <IO/ReadBuffer.h> |
6 | |
7 | #include <cppkafka/cppkafka.h> |
8 | |
9 | namespace Poco |
10 | { |
11 | class Logger; |
12 | } |
13 | |
14 | namespace DB |
15 | { |
16 | |
17 | using ConsumerPtr = std::shared_ptr<cppkafka::Consumer>; |
18 | |
19 | class ReadBufferFromKafkaConsumer : public ReadBuffer |
20 | { |
21 | public: |
22 | ReadBufferFromKafkaConsumer( |
23 | ConsumerPtr consumer_, |
24 | Poco::Logger * log_, |
25 | size_t max_batch_size, |
26 | size_t poll_timeout_, |
27 | bool intermediate_commit_, |
28 | const std::atomic<bool> & stopped_); |
29 | ~ReadBufferFromKafkaConsumer() override; |
30 | |
31 | void allowNext() { allowed = true; } // Allow to read next message. |
32 | void commit(); // Commit all processed messages. |
33 | void subscribe(const Names & topics); // Subscribe internal consumer to topics. |
34 | void unsubscribe(); // Unsubscribe internal consumer in case of failure. |
35 | |
36 | auto pollTimeout() const { return poll_timeout; } |
37 | |
38 | // Return values for the message that's being read. |
39 | String currentTopic() const { return current[-1].get_topic(); } |
40 | String currentKey() const { return current[-1].get_key(); } |
41 | auto currentOffset() const { return current[-1].get_offset(); } |
42 | auto currentPartition() const { return current[-1].get_partition(); } |
43 | auto currentTimestamp() const { return current[-1].get_timestamp(); } |
44 | |
45 | private: |
46 | using Messages = std::vector<cppkafka::Message>; |
47 | |
48 | ConsumerPtr consumer; |
49 | Poco::Logger * log; |
50 | const size_t batch_size = 1; |
51 | const size_t poll_timeout = 0; |
52 | bool stalled = false; |
53 | bool intermediate_commit = true; |
54 | bool allowed = true; |
55 | |
56 | const std::atomic<bool> & stopped; |
57 | |
58 | Messages messages; |
59 | Messages::const_iterator current; |
60 | |
61 | bool nextImpl() override; |
62 | }; |
63 | |
64 | } |
65 | |