| 1 | #pragma once |
| 2 | |
| 3 | #include <Core/Names.h> |
| 4 | #include <Core/Types.h> |
| 5 | #include <IO/ReadBuffer.h> |
| 6 | |
| 7 | #include <cppkafka/cppkafka.h> |
| 8 | |
/// Forward declaration: lets this header name Poco::Logger in pointer types
/// without including the Poco logging headers.
namespace Poco { class Logger; }
| 13 | |
| 14 | namespace DB |
| 15 | { |
| 16 | |
| 17 | using ConsumerPtr = std::shared_ptr<cppkafka::Consumer>; |
| 18 | |
| 19 | class ReadBufferFromKafkaConsumer : public ReadBuffer |
| 20 | { |
| 21 | public: |
| 22 | ReadBufferFromKafkaConsumer( |
| 23 | ConsumerPtr consumer_, |
| 24 | Poco::Logger * log_, |
| 25 | size_t max_batch_size, |
| 26 | size_t poll_timeout_, |
| 27 | bool intermediate_commit_, |
| 28 | const std::atomic<bool> & stopped_); |
| 29 | ~ReadBufferFromKafkaConsumer() override; |
| 30 | |
| 31 | void allowNext() { allowed = true; } // Allow to read next message. |
| 32 | void commit(); // Commit all processed messages. |
| 33 | void subscribe(const Names & topics); // Subscribe internal consumer to topics. |
| 34 | void unsubscribe(); // Unsubscribe internal consumer in case of failure. |
| 35 | |
| 36 | auto pollTimeout() const { return poll_timeout; } |
| 37 | |
| 38 | // Return values for the message that's being read. |
| 39 | String currentTopic() const { return current[-1].get_topic(); } |
| 40 | String currentKey() const { return current[-1].get_key(); } |
| 41 | auto currentOffset() const { return current[-1].get_offset(); } |
| 42 | auto currentPartition() const { return current[-1].get_partition(); } |
| 43 | auto currentTimestamp() const { return current[-1].get_timestamp(); } |
| 44 | |
| 45 | private: |
| 46 | using Messages = std::vector<cppkafka::Message>; |
| 47 | |
| 48 | ConsumerPtr consumer; |
| 49 | Poco::Logger * log; |
| 50 | const size_t batch_size = 1; |
| 51 | const size_t poll_timeout = 0; |
| 52 | bool stalled = false; |
| 53 | bool intermediate_commit = true; |
| 54 | bool allowed = true; |
| 55 | |
| 56 | const std::atomic<bool> & stopped; |
| 57 | |
| 58 | Messages messages; |
| 59 | Messages::const_iterator current; |
| 60 | |
| 61 | bool nextImpl() override; |
| 62 | }; |
| 63 | |
| 64 | } |
| 65 | |