1#pragma once
2
3#include <Core/Names.h>
4#include <Core/Types.h>
5#include <IO/ReadBuffer.h>
6
7#include <cppkafka/cppkafka.h>
8
9namespace Poco
10{
11 class Logger;
12}
13
14namespace DB
15{
16
17using ConsumerPtr = std::shared_ptr<cppkafka::Consumer>;
18
19class ReadBufferFromKafkaConsumer : public ReadBuffer
20{
21public:
22 ReadBufferFromKafkaConsumer(
23 ConsumerPtr consumer_,
24 Poco::Logger * log_,
25 size_t max_batch_size,
26 size_t poll_timeout_,
27 bool intermediate_commit_,
28 const std::atomic<bool> & stopped_);
29 ~ReadBufferFromKafkaConsumer() override;
30
31 void allowNext() { allowed = true; } // Allow to read next message.
32 void commit(); // Commit all processed messages.
33 void subscribe(const Names & topics); // Subscribe internal consumer to topics.
34 void unsubscribe(); // Unsubscribe internal consumer in case of failure.
35
36 auto pollTimeout() const { return poll_timeout; }
37
38 // Return values for the message that's being read.
39 String currentTopic() const { return current[-1].get_topic(); }
40 String currentKey() const { return current[-1].get_key(); }
41 auto currentOffset() const { return current[-1].get_offset(); }
42 auto currentPartition() const { return current[-1].get_partition(); }
43 auto currentTimestamp() const { return current[-1].get_timestamp(); }
44
45private:
46 using Messages = std::vector<cppkafka::Message>;
47
48 ConsumerPtr consumer;
49 Poco::Logger * log;
50 const size_t batch_size = 1;
51 const size_t poll_timeout = 0;
52 bool stalled = false;
53 bool intermediate_commit = true;
54 bool allowed = true;
55
56 const std::atomic<bool> & stopped;
57
58 Messages messages;
59 Messages::const_iterator current;
60
61 bool nextImpl() override;
62};
63
64}
65