1#pragma once
2
3#include <DataStreams/IBlockInputStream.h>
4#include <Interpreters/Context.h>
5
6#include <Storages/Kafka/StorageKafka.h>
7#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>
8
9
10namespace DB
11{
12
13class KafkaBlockInputStream : public IBlockInputStream
14{
15public:
16 KafkaBlockInputStream(
17 StorageKafka & storage_, const Context & context_, const Names & columns, size_t max_block_size_, bool commit_in_suffix = true);
18 ~KafkaBlockInputStream() override;
19
20 String getName() const override { return storage.getName(); }
21 Block getHeader() const override;
22
23 void readPrefixImpl() override;
24 Block readImpl() override;
25 void readSuffixImpl() override;
26
27 void commit();
28
29private:
30 StorageKafka & storage;
31 Context context;
32 Names column_names;
33 UInt64 max_block_size;
34
35 ConsumerBufferPtr buffer;
36 bool broken = true, claimed = false, commit_in_suffix;
37 const Block non_virtual_header, virtual_header;
38};
39
40}
41