| 1 | #include "WriteBufferToKafkaProducer.h" |
| 2 | |
| 3 | namespace DB |
| 4 | { |
| 5 | WriteBufferToKafkaProducer::WriteBufferToKafkaProducer( |
| 6 | ProducerPtr producer_, |
| 7 | const std::string & topic_, |
| 8 | std::optional<char> delimiter, |
| 9 | size_t rows_per_message, |
| 10 | size_t chunk_size_, |
| 11 | std::chrono::milliseconds poll_timeout) |
| 12 | : WriteBuffer(nullptr, 0) |
| 13 | , producer(producer_) |
| 14 | , topic(topic_) |
| 15 | , delim(delimiter) |
| 16 | , max_rows(rows_per_message) |
| 17 | , chunk_size(chunk_size_) |
| 18 | , timeout(poll_timeout) |
| 19 | { |
| 20 | } |
| 21 | |
| 22 | WriteBufferToKafkaProducer::~WriteBufferToKafkaProducer() |
| 23 | { |
| 24 | assert(rows == 0 && chunks.empty()); |
| 25 | } |
| 26 | |
| 27 | void WriteBufferToKafkaProducer::count_row() |
| 28 | { |
| 29 | if (++rows % max_rows == 0) |
| 30 | { |
| 31 | std::string payload; |
| 32 | payload.reserve((chunks.size() - 1) * chunk_size + offset()); |
| 33 | for (auto i = chunks.begin(), e = --chunks.end(); i != e; ++i) |
| 34 | payload.append(*i); |
| 35 | int trunk_delim = delim && chunks.back()[offset() - 1] == delim ? 1 : 0; |
| 36 | payload.append(chunks.back(), 0, offset() - trunk_delim); |
| 37 | |
| 38 | while (true) |
| 39 | { |
| 40 | try |
| 41 | { |
| 42 | producer->produce(cppkafka::MessageBuilder(topic).payload(payload)); |
| 43 | } |
| 44 | catch (cppkafka::HandleException & e) |
| 45 | { |
| 46 | if (e.get_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL) |
| 47 | { |
| 48 | producer->poll(timeout); |
| 49 | continue; |
| 50 | } |
| 51 | throw e; |
| 52 | } |
| 53 | |
| 54 | break; |
| 55 | } |
| 56 | |
| 57 | rows = 0; |
| 58 | chunks.clear(); |
| 59 | set(nullptr, 0); |
| 60 | } |
| 61 | } |
| 62 | |
| 63 | void WriteBufferToKafkaProducer::flush() |
| 64 | { |
| 65 | // For unknown reason we may hit some internal timeout when inserting for the first time. |
| 66 | while (true) |
| 67 | { |
| 68 | try |
| 69 | { |
| 70 | producer->flush(timeout); |
| 71 | } |
| 72 | catch (cppkafka::HandleException & e) |
| 73 | { |
| 74 | if (e.get_error() == RD_KAFKA_RESP_ERR__TIMED_OUT) |
| 75 | continue; |
| 76 | throw e; |
| 77 | } |
| 78 | |
| 79 | break; |
| 80 | } |
| 81 | } |
| 82 | |
| 83 | void WriteBufferToKafkaProducer::nextImpl() |
| 84 | { |
| 85 | chunks.push_back(std::string()); |
| 86 | chunks.back().resize(chunk_size); |
| 87 | set(chunks.back().data(), chunk_size); |
| 88 | } |
| 89 | |
| 90 | } |
| 91 | |