#include <Storages/Kafka/KafkaBlockInputStream.h>

#include <DataStreams/ConvertingBlockInputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Formats/FormatFactory.h>
#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>

namespace DB
{
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}

KafkaBlockInputStream::KafkaBlockInputStream(
    StorageKafka & storage_, const Context & context_, const Names & columns, size_t max_block_size_, bool commit_in_suffix_)
    : storage(storage_)
    , context(context_)
    , column_names(columns)
    , max_block_size(max_block_size_)
    , commit_in_suffix(commit_in_suffix_)
    , non_virtual_header(storage.getSampleBlockNonMaterialized()) /// FIXME: add materialized columns support
    , virtual_header(storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition", "_timestamp"}))
{
    context.setSetting("input_format_skip_unknown_fields", 1u); // Always skip unknown fields regardless of the format (JSON or TSKV)
    context.setSetting("input_format_allow_errors_ratio", 0.);
    context.setSetting("input_format_allow_errors_num", storage.skipBroken());

    if (!storage.getSchemaName().empty())
        context.setSetting("format_schema", storage.getSchemaName());
}

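/// Return the consumer's read buffer to the storage pool on destruction. If the stream did not
/// reach readSuffixImpl() (e.g. an exception was thrown mid-read), `broken` is still set and the
/// consumer unsubscribes first, discarding its possibly inconsistent assignment state.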
KafkaBlockInputStream::~KafkaBlockInputStream()
{
    if (!claimed)
        return;

    if (broken)
        buffer->unsubscribe();

    storage.pushReadBuffer(buffer);
}

Block KafkaBlockInputStream::getHeader() const
{
    return storage.getSampleBlockForColumns(column_names);
}

void KafkaBlockInputStream::readPrefixImpl()
{
    auto timeout = std::chrono::milliseconds(context.getSettingsRef().kafka_max_wait_ms.totalMilliseconds());
    buffer = storage.popReadBuffer(timeout);
    claimed = !!buffer;

    if (!buffer)
        return;

    buffer->subscribe(storage.getTopics());

    // Consider the consumer state broken until the stream is read to the end (see readSuffixImpl).
    broken = true;
}

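/// Reads at most max_block_size rows from Kafka. Messages are parsed one by one with the input
/// format, the per-message virtual column values (_topic, _key, _offset, _partition, _timestamp)
/// are replicated for every row produced from that message, and the resulting block is converted
/// to the header expected by the caller.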
Block KafkaBlockInputStream::readImpl()
{
    if (!buffer)
        return Block();

    MutableColumns result_columns = non_virtual_header.cloneEmptyColumns();
    MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();

    auto input_format = FormatFactory::instance().getInputFormat(
        storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size);

    InputPort port(input_format->getPort().getHeader(), input_format.get());
    connect(input_format->getPort(), port);
    port.setNeeded();

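    // Drive the IInputFormat processor by hand: call work() while it is Ready, pull a chunk from
    // the output port when it is full, and stop when the format reports Finished (one Kafka
    // message has been fully parsed). Returns the number of rows appended to result_columns.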
    auto read_kafka_message = [&]
    {
        size_t new_rows = 0;

        while (true)
        {
            auto status = input_format->prepare();

            switch (status)
            {
                case IProcessor::Status::Ready:
                    input_format->work();
                    break;

                case IProcessor::Status::Finished:
                    input_format->resetParser();
                    return new_rows;

                case IProcessor::Status::PortFull:
                {
                    auto chunk = port.pull();

                    // The row count was returned incorrectly before https://github.com/ClickHouse/ClickHouse/pull/8005;
                    // if this is backported, it should go together with #8005.
                    auto chunk_rows = chunk.getNumRows();
                    new_rows += chunk_rows;

                    auto columns = chunk.detachColumns();
                    for (size_t i = 0, s = columns.size(); i < s; ++i)
                    {
                        result_columns[i]->insertRangeFrom(*columns[i], 0, columns[i]->size());
                    }
                    break;
                }
                case IProcessor::Status::NeedData:
                case IProcessor::Status::Async:
                case IProcessor::Status::Wait:
                case IProcessor::Status::ExpandPipeline:
                    throw Exception("Source processor returned status " + IProcessor::statusToName(status), ErrorCodes::LOGICAL_ERROR);
            }
        }
    };

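    // Accumulate rows message by message until the block is full, the time limit is reached,
    // or the consumer has no more data (an empty message also stops the loop).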
    size_t total_rows = 0;

    while (true)
    {
        // Some formats (like RowBinaryWithNamesAndTypes / CSVWithNames)
        // throw an exception from readPrefix when the buffer is empty.
        if (buffer->eof())
            break;

        auto new_rows = read_kafka_message();

        auto _topic = buffer->currentTopic();
        auto _key = buffer->currentKey();
        auto _offset = buffer->currentOffset();
        auto _partition = buffer->currentPartition();
        auto _timestamp_raw = buffer->currentTimestamp();
        auto _timestamp = _timestamp_raw ? std::chrono::duration_cast<std::chrono::seconds>(_timestamp_raw->get_timestamp()).count()
                                         : 0;
        for (size_t i = 0; i < new_rows; ++i)
        {
            virtual_columns[0]->insert(_topic);
            virtual_columns[1]->insert(_key);
            virtual_columns[2]->insert(_offset);
            virtual_columns[3]->insert(_partition);
            if (_timestamp_raw)
            {
                virtual_columns[4]->insert(_timestamp);
            }
            else
            {
                virtual_columns[4]->insertDefault();
            }
        }

        total_rows += new_rows;
        buffer->allowNext();
        if (!new_rows || total_rows >= max_block_size || !checkTimeLimit())
            break;
    }

    if (total_rows == 0)
        return Block();

    /// MATERIALIZED columns could be added here, but they are not needed,
    /// and it would be misleading to use them: columns "materialized" this way
    /// stay ephemeral, i.e. they will not be stored anywhere.
    /// If any extra columns are needed, they can be added at the MV level using DEFAULT expressions.

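    // Glue the parsed columns and the virtual columns into one block, then let
    // ConvertingBlockInputStream reorder it to match the header requested by the caller.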
    auto result_block = non_virtual_header.cloneWithColumns(std::move(result_columns));
    auto virtual_block = virtual_header.cloneWithColumns(std::move(virtual_columns));

    for (const auto & column : virtual_block.getColumnsWithTypeAndName())
        result_block.insert(column);

    return ConvertingBlockInputStream(
               context,
               std::make_shared<OneBlockInputStream>(result_block),
               getHeader(),
               ConvertingBlockInputStream::MatchColumnsMode::Name)
        .read();
}

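/// The stream was read to the end, so the consumer state is known to be consistent;
/// commit the offsets now if this stream was created with commit_in_suffix.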
void KafkaBlockInputStream::readSuffixImpl()
{
    broken = false;

    if (commit_in_suffix)
        commit();
}

void KafkaBlockInputStream::commit()
{
    if (!buffer)
        return;

    buffer->commit();
}

}