1 | #include "WriteBufferToKafkaProducer.h" |
2 | |
3 | namespace DB |
4 | { |
5 | WriteBufferToKafkaProducer::WriteBufferToKafkaProducer( |
6 | ProducerPtr producer_, |
7 | const std::string & topic_, |
8 | std::optional<char> delimiter, |
9 | size_t rows_per_message, |
10 | size_t chunk_size_, |
11 | std::chrono::milliseconds poll_timeout) |
12 | : WriteBuffer(nullptr, 0) |
13 | , producer(producer_) |
14 | , topic(topic_) |
15 | , delim(delimiter) |
16 | , max_rows(rows_per_message) |
17 | , chunk_size(chunk_size_) |
18 | , timeout(poll_timeout) |
19 | { |
20 | } |
21 | |
22 | WriteBufferToKafkaProducer::~WriteBufferToKafkaProducer() |
23 | { |
24 | assert(rows == 0 && chunks.empty()); |
25 | } |
26 | |
27 | void WriteBufferToKafkaProducer::count_row() |
28 | { |
29 | if (++rows % max_rows == 0) |
30 | { |
31 | std::string payload; |
32 | payload.reserve((chunks.size() - 1) * chunk_size + offset()); |
33 | for (auto i = chunks.begin(), e = --chunks.end(); i != e; ++i) |
34 | payload.append(*i); |
35 | int trunk_delim = delim && chunks.back()[offset() - 1] == delim ? 1 : 0; |
36 | payload.append(chunks.back(), 0, offset() - trunk_delim); |
37 | |
38 | while (true) |
39 | { |
40 | try |
41 | { |
42 | producer->produce(cppkafka::MessageBuilder(topic).payload(payload)); |
43 | } |
44 | catch (cppkafka::HandleException & e) |
45 | { |
46 | if (e.get_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL) |
47 | { |
48 | producer->poll(timeout); |
49 | continue; |
50 | } |
51 | throw e; |
52 | } |
53 | |
54 | break; |
55 | } |
56 | |
57 | rows = 0; |
58 | chunks.clear(); |
59 | set(nullptr, 0); |
60 | } |
61 | } |
62 | |
63 | void WriteBufferToKafkaProducer::flush() |
64 | { |
65 | // For unknown reason we may hit some internal timeout when inserting for the first time. |
66 | while (true) |
67 | { |
68 | try |
69 | { |
70 | producer->flush(timeout); |
71 | } |
72 | catch (cppkafka::HandleException & e) |
73 | { |
74 | if (e.get_error() == RD_KAFKA_RESP_ERR__TIMED_OUT) |
75 | continue; |
76 | throw e; |
77 | } |
78 | |
79 | break; |
80 | } |
81 | } |
82 | |
83 | void WriteBufferToKafkaProducer::nextImpl() |
84 | { |
85 | chunks.push_back(std::string()); |
86 | chunks.back().resize(chunk_size); |
87 | set(chunks.back().data(), chunk_size); |
88 | } |
89 | |
90 | } |
91 | |