#include <Storages/Kafka/KafkaBlockInputStream.h>

#include <DataStreams/ConvertingBlockInputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Formats/FormatFactory.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>

namespace DB
{
KafkaBlockInputStream::KafkaBlockInputStream(
    StorageKafka & storage_, const Context & context_, const Names & columns, size_t max_block_size_, bool commit_in_suffix_)
    : storage(storage_)
    , context(context_)
    , column_names(columns)
    , max_block_size(max_block_size_)
    , commit_in_suffix(commit_in_suffix_)
    , non_virtual_header(storage.getSampleBlockNonMaterialized()) /// FIXME: add materialized columns support
    , virtual_header(storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition", "_timestamp"}))
{
    context.setSetting("input_format_skip_unknown_fields", 1u); // Always skip unknown fields regardless of the format (JSON or TSKV)
    context.setSetting("input_format_allow_errors_ratio", 0.);
    context.setSetting("input_format_allow_errors_num", storage.skipBroken()); // The number of broken messages to tolerate comes from kafka_skip_broken_messages

    if (!storage.getSchemaName().empty())
        context.setSetting("format_schema", storage.getSchemaName());
}

KafkaBlockInputStream::~KafkaBlockInputStream()
{
    if (!claimed)
        return;

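    // The stream did not reach readSuffixImpl(), so the consumer may be in an
    // inconsistent state (e.g. mid-message); unsubscribe to reset it before
    // returning it to the pool.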
    if (broken)
        buffer->unsubscribe();

    storage.pushReadBuffer(buffer);
}

Block KafkaBlockInputStream::getHeader() const
{
    return storage.getSampleBlockForColumns(column_names);
}

void KafkaBlockInputStream::readPrefixImpl()
{
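    // Claim a consumer buffer from the storage's pool, waiting up to
    // kafka_max_wait_ms for one to become free.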
    auto timeout = std::chrono::milliseconds(context.getSettingsRef().kafka_max_wait_ms.totalMilliseconds());
    buffer = storage.popReadBuffer(timeout);
    claimed = !!buffer;

    if (!buffer)
        return;

    buffer->subscribe(storage.getTopics());

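    // From this point on, the stream is considered broken until readSuffixImpl() completes.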
    broken = true;
}

Block KafkaBlockInputStream::readImpl()
{
    if (!buffer)
        return Block();

    MutableColumns result_columns = non_virtual_header.cloneEmptyColumns();
    MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();

    auto input_format = FormatFactory::instance().getInputFormat(
        storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size);

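    // Wire a standalone input port to the format's output port so that the
    // processor can be driven manually and its chunks pulled by hand.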
    InputPort port(input_format->getPort().getHeader(), input_format.get());
    connect(input_format->getPort(), port);
    port.setNeeded();

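    // Drive the IProcessor state machine manually: run the format while it is
    // Ready, pull produced chunks on PortFull, and stop when it reports
    // Finished (i.e. the end of the current Kafka message).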
    auto read_kafka_message = [&]
    {
        size_t new_rows = 0;

        while (true)
        {
            auto status = input_format->prepare();

            switch (status)
            {
                case IProcessor::Status::Ready:
                    input_format->work();
                    break;

                case IProcessor::Status::Finished:
                    input_format->resetParser();
                    return new_rows;

                case IProcessor::Status::PortFull:
                {
                    auto chunk = port.pull();

                    // Chunk::getNumRows() was returning a bad value before https://github.com/ClickHouse/ClickHouse/pull/8005;
                    // if this is backported, it should go together with #8005.
                    auto chunk_rows = chunk.getNumRows();
                    new_rows += chunk_rows;

                    auto columns = chunk.detachColumns();
                    for (size_t i = 0, s = columns.size(); i < s; ++i)
                    {
                        result_columns[i]->insertRangeFrom(*columns[i], 0, columns[i]->size());
                    }
                    break;
                }
                case IProcessor::Status::NeedData:
                case IProcessor::Status::Async:
                case IProcessor::Status::Wait:
                case IProcessor::Status::ExpandPipeline:
                    throw Exception("Source processor returned status " + IProcessor::statusToName(status), ErrorCodes::LOGICAL_ERROR);
            }
        }
    };

    size_t total_rows = 0;

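    // Accumulate rows message by message until the block is big enough,
    // the time limit is reached, or there is no more data.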
    while (true)
    {
        // Some formats (like RowBinaryWithNamesAndTypes / CSVWithNames)
        // throw an exception from readPrefix when the buffer is empty.
        if (buffer->eof())
            break;

        auto new_rows = read_kafka_message();

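        // All rows parsed from the current message share the same virtual
        // column values, so query them once per message.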
        auto _topic = buffer->currentTopic();
        auto _key = buffer->currentKey();
        auto _offset = buffer->currentOffset();
        auto _partition = buffer->currentPartition();
        auto _timestamp_raw = buffer->currentTimestamp();
        auto _timestamp = _timestamp_raw ? std::chrono::duration_cast<std::chrono::seconds>(_timestamp_raw->get_timestamp()).count()
                                         : 0;
        for (size_t i = 0; i < new_rows; ++i)
        {
            virtual_columns[0]->insert(_topic);
            virtual_columns[1]->insert(_key);
            virtual_columns[2]->insert(_offset);
            virtual_columns[3]->insert(_partition);
            if (_timestamp_raw)
            {
                virtual_columns[4]->insert(_timestamp);
            }
            else
            {
                virtual_columns[4]->insertDefault();
            }
        }

        total_rows += new_rows;
        // Done with the current message; allow the consumer to fetch the next one.
        buffer->allowNext();
        if (!new_rows || total_rows >= max_block_size || !checkTimeLimit())
            break;
    }

    if (total_rows == 0)
        return Block();

    /// MATERIALIZED columns can be added here, but I think they are not needed:
    /// it is misleading to use them here, as columns 'materialized' that way
    /// stay 'ephemeral', i.e. they will not be stored anywhere.
    /// If any extra columns are needed, they can be added using DEFAULT
    /// expressions at the MV level.

    auto result_block = non_virtual_header.cloneWithColumns(std::move(result_columns));
    auto virtual_block = virtual_header.cloneWithColumns(std::move(virtual_columns));

    for (const auto & column : virtual_block.getColumnsWithTypeAndName())
        result_block.insert(column);

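    // Convert the assembled block to match the requested header (by column name).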
    return ConvertingBlockInputStream(
        context,
        std::make_shared<OneBlockInputStream>(result_block),
        getHeader(),
        ConvertingBlockInputStream::MatchColumnsMode::Name)
        .read();
}

void KafkaBlockInputStream::readSuffixImpl()
{
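    // Reaching the suffix means the stream finished normally, so the consumer
    // is no longer considered broken.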
    broken = false;

    if (commit_in_suffix)
        commit();
}

void KafkaBlockInputStream::commit()
{
    if (!buffer)
        return;

    buffer->commit();
}

}