1#include "KafkaBlockOutputStream.h"
2
3#include <Formats/FormatFactory.h>
4#include <Storages/Kafka/WriteBufferToKafkaProducer.h>
5
6namespace DB
7{
8
9namespace ErrorCodes
10{
11
12extern int CANNOT_CREATE_IO_BUFFER;
13
14}
15
16KafkaBlockOutputStream::KafkaBlockOutputStream(StorageKafka & storage_, const Context & context_) : storage(storage_), context(context_)
17{
18}
19
20KafkaBlockOutputStream::~KafkaBlockOutputStream()
21{
22}
23
24Block KafkaBlockOutputStream::getHeader() const
25{
26 return storage.getSampleBlockNonMaterialized();
27}
28
29void KafkaBlockOutputStream::writePrefix()
30{
31 buffer = storage.createWriteBuffer();
32 if (!buffer)
33 throw Exception("Failed to create Kafka producer!", ErrorCodes::CANNOT_CREATE_IO_BUFFER);
34
35 child = FormatFactory::instance().getOutput(storage.getFormatName(), *buffer, getHeader(), context, [this]{ buffer->count_row(); });
36}
37
38void KafkaBlockOutputStream::write(const Block & block)
39{
40 child->write(block);
41}
42
43void KafkaBlockOutputStream::writeSuffix()
44{
45 child->writeSuffix();
46 flush();
47}
48
49void KafkaBlockOutputStream::flush()
50{
51 if (buffer)
52 buffer->flush();
53}
54
55}
56