#include "WriteBufferToKafkaProducer.h"

#include <cassert>

namespace DB
{
WriteBufferToKafkaProducer::WriteBufferToKafkaProducer(
    ProducerPtr producer_,
    const std::string & topic_,
    std::optional<char> delimiter,
    size_t rows_per_message,
    size_t chunk_size_,
    std::chrono::milliseconds poll_timeout)
    : WriteBuffer(nullptr, 0)
    , producer(producer_)
    , topic(topic_)
    , delim(delimiter)
    , max_rows(rows_per_message)
    , chunk_size(chunk_size_)
    , timeout(poll_timeout)
{
}

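/// By the time the buffer is destroyed, everything written must already have been
/// packed into messages: the final count_row() clears both the row counter and the
/// accumulated chunks.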
WriteBufferToKafkaProducer::~WriteBufferToKafkaProducer()
{
    assert(rows == 0 && chunks.empty());
}

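/// Counts one more written row. Once max_rows rows have accumulated, the buffered
/// chunks are concatenated into a single payload and produced to the topic; while
/// the librdkafka queue is full, poll() is called to serve delivery reports and
/// the produce is retried.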
void WriteBufferToKafkaProducer::count_row()
{
    if (++rows % max_rows == 0)
    {
        // Glue all complete chunks together, then the used prefix of the last chunk,
        // dropping a trailing row delimiter so the message does not end with one.
        std::string payload;
        payload.reserve((chunks.size() - 1) * chunk_size + offset());
        for (auto i = chunks.begin(), e = --chunks.end(); i != e; ++i)
            payload.append(*i);
        int trunk_delim = (delim && chunks.back()[offset() - 1] == delim) ? 1 : 0;
        payload.append(chunks.back(), 0, offset() - trunk_delim);

        while (true)
        {
            try
            {
                producer->produce(cppkafka::MessageBuilder(topic).payload(payload));
            }
            catch (cppkafka::HandleException & e)
            {
                if (e.get_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL)
                {
                    // The local producer queue is full: poll to serve delivery
                    // callbacks and free queue slots, then try again.
                    producer->poll(timeout);
                    continue;
                }
                throw;
            }

            break;
        }

        // Start accumulating the next message from scratch; nextImpl() will
        // allocate a new chunk on the next write.
        rows = 0;
        chunks.clear();
        set(nullptr, 0);
    }
}

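/// Waits for all messages queued by count_row() to be delivered, retrying flushes
/// that fail with a local timeout and rethrowing any other error.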
void WriteBufferToKafkaProducer::flush()
{
    // For an unknown reason we may hit an internal timeout when inserting for the first
    // time, so retry the flush until it either succeeds or fails with a different error.
    while (true)
    {
        try
        {
            producer->flush(timeout);
        }
        catch (cppkafka::HandleException & e)
        {
            if (e.get_error() == RD_KAFKA_RESP_ERR__TIMED_OUT)
                continue;
            throw;
        }

        break;
    }
}

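/// Called by WriteBuffer when the current working buffer is exhausted: append another
/// fixed-size chunk and make it the new working buffer.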
void WriteBufferToKafkaProducer::nextImpl()
{
    chunks.push_back(std::string());
    chunks.back().resize(chunk_size);
    set(chunks.back().data(), chunk_size);
}

}