1#pragma once
2
3#include <IO/WriteBuffer.h>
4
5#include <cppkafka/cppkafka.h>
6
7#include <list>
8
9namespace DB
10{
11
12using ProducerPtr = std::shared_ptr<cppkafka::Producer>;
13
14class WriteBufferToKafkaProducer : public WriteBuffer
15{
16public:
17 WriteBufferToKafkaProducer(
18 ProducerPtr producer_,
19 const std::string & topic_,
20 std::optional<char> delimiter,
21 size_t rows_per_message,
22 size_t chunk_size_,
23 std::chrono::milliseconds poll_timeout);
24 ~WriteBufferToKafkaProducer() override;
25
26 void count_row();
27 void flush();
28
29private:
30 void nextImpl() override;
31
32 ProducerPtr producer;
33 const std::string topic;
34 const std::optional<char> delim;
35 const size_t max_rows;
36 const size_t chunk_size;
37 const std::chrono::milliseconds timeout;
38
39 size_t rows = 0;
40 std::list<std::string> chunks;
41};
42
43}
44