1#include <IO/ReadHelpers.h>
2#include <Interpreters/evaluateConstantExpression.h>
3#include <Interpreters/Context.h>
4#include <Interpreters/convertFieldToType.h>
5#include <Parsers/TokenIterator.h>
6#include <Parsers/ExpressionListParsers.h>
7#include <Processors/Formats/Impl/ValuesBlockInputFormat.h>
8#include <Formats/FormatFactory.h>
9#include <Common/FieldVisitors.h>
10#include <Core/Block.h>
11#include <Common/typeid_cast.h>
12#include <common/find_symbols.h>
13#include <Parsers/ASTLiteral.h>
14#include <DataTypes/DataTypeNullable.h>
15
16
17namespace DB
18{
19
/// Error codes referenced by this translation unit. The values themselves are defined elsewhere;
/// only the declarations are needed here.
namespace ErrorCodes
{
    extern const int CANNOT_PARSE_INPUT_ASSERTION_FAILED;
    extern const int CANNOT_PARSE_QUOTED_STRING;
    extern const int CANNOT_PARSE_NUMBER;
    extern const int CANNOT_PARSE_DATE;
    extern const int CANNOT_PARSE_DATETIME;
    extern const int CANNOT_READ_ARRAY_FROM_TEXT;
    extern const int SYNTAX_ERROR;
    extern const int TYPE_MISMATCH;
    extern const int SUPPORT_IS_DISABLED;
    /// Used by parseExpression() and readSuffix(); declared here explicitly
    /// so this file does not depend on another header happening to declare it.
    extern const int LOGICAL_ERROR;
}
33
34
35ValuesBlockInputFormat::ValuesBlockInputFormat(ReadBuffer & in_, const Block & header_, const RowInputFormatParams & params_,
36 const FormatSettings & format_settings_)
37 : IInputFormat(header_, buf), buf(in_), params(params_),
38 format_settings(format_settings_), num_columns(header_.columns()),
39 parser_type_for_column(num_columns, ParserType::Streaming),
40 attempts_to_deduce_template(num_columns), attempts_to_deduce_template_cached(num_columns),
41 rows_parsed_using_template(num_columns), templates(num_columns), types(header_.getDataTypes())
42{
43 /// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it.
44 skipBOMIfExists(buf);
45}
46
47Chunk ValuesBlockInputFormat::generate()
48{
49 const Block & header = getPort().getHeader();
50 MutableColumns columns = header.cloneEmptyColumns();
51 block_missing_values.clear();
52
53 for (size_t rows_in_block = 0; rows_in_block < params.max_block_size; ++rows_in_block)
54 {
55 try
56 {
57 skipWhitespaceIfAny(buf);
58 if (buf.eof() || *buf.position() == ';')
59 break;
60 readRow(columns, rows_in_block);
61 if (params.callback)
62 params.callback();
63 }
64 catch (Exception & e)
65 {
66 if (isParseError(e.code()))
67 e.addMessage(" at row " + std::to_string(total_rows));
68 throw;
69 }
70 }
71
72 /// Evaluate expressions, which were parsed using templates, if any
73 for (size_t i = 0; i < columns.size(); ++i)
74 {
75 if (!templates[i] || !templates[i]->rowsCount())
76 continue;
77 if (columns[i]->empty())
78 columns[i] = std::move(*templates[i]->evaluateAll(block_missing_values, i)).mutate();
79 else
80 {
81 ColumnPtr evaluated = templates[i]->evaluateAll(block_missing_values, i, columns[i]->size());
82 columns[i]->insertRangeFrom(*evaluated, 0, evaluated->size());
83 }
84 }
85
86 if (columns.empty() || columns[0]->empty())
87 {
88 readSuffix();
89 return {};
90 }
91
92 size_t rows_in_block = columns[0]->size();
93 return Chunk{std::move(columns), rows_in_block};
94}
95
96void ValuesBlockInputFormat::readRow(MutableColumns & columns, size_t row_num)
97{
98 assertChar('(', buf);
99
100 for (size_t column_idx = 0; column_idx < num_columns; ++column_idx)
101 {
102 skipWhitespaceIfAny(buf);
103 PeekableReadBufferCheckpoint checkpoint{buf};
104 bool read;
105
106 /// Parse value using fast streaming parser for literals and slow SQL parser for expressions.
107 /// If there is SQL expression in some row, template of this expression will be deduced,
108 /// so it makes possible to parse the following rows much faster
109 /// if expressions in the following rows have the same structure
110 if (parser_type_for_column[column_idx] == ParserType::Streaming)
111 read = tryReadValue(*columns[column_idx], column_idx);
112 else if (parser_type_for_column[column_idx] == ParserType::BatchTemplate)
113 read = tryParseExpressionUsingTemplate(columns[column_idx], column_idx);
114 else /// if (parser_type_for_column[column_idx] == ParserType::SingleExpressionEvaluation)
115 read = parseExpression(*columns[column_idx], column_idx);
116
117 if (!read)
118 block_missing_values.setBit(column_idx, row_num);
119 /// If read is true, value still may be missing. Bit mask for these values will be copied from ConstantExpressionTemplate later.
120 }
121
122 skipWhitespaceIfAny(buf);
123 if (!buf.eof() && *buf.position() == ',')
124 ++buf.position();
125
126 ++total_rows;
127}
128
129bool ValuesBlockInputFormat::tryParseExpressionUsingTemplate(MutableColumnPtr & column, size_t column_idx)
130{
131 /// Try to parse expression using template if one was successfully deduced while parsing the first row
132 if (templates[column_idx]->parseExpression(buf, format_settings))
133 {
134 ++rows_parsed_using_template[column_idx];
135 return true;
136 }
137
138 /// Expression in the current row is not match template deduced on the first row.
139 /// Evaluate expressions, which were parsed using this template.
140 if (column->empty())
141 column = std::move(*templates[column_idx]->evaluateAll(block_missing_values, column_idx)).mutate();
142 else
143 {
144 ColumnPtr evaluated = templates[column_idx]->evaluateAll(block_missing_values, column_idx, column->size());
145 column->insertRangeFrom(*evaluated, 0, evaluated->size());
146 }
147 /// Do not use this template anymore
148 templates[column_idx].reset();
149 buf.rollbackToCheckpoint();
150
151 /// It will deduce new template or fallback to slow SQL parser
152 return parseExpression(*column, column_idx);
153}
154
155bool ValuesBlockInputFormat::tryReadValue(IColumn & column, size_t column_idx)
156{
157 bool rollback_on_exception = false;
158 try
159 {
160 bool read = true;
161 const auto & type = types[column_idx];
162 if (format_settings.null_as_default && !type->isNullable())
163 read = DataTypeNullable::deserializeTextQuoted(column, buf, format_settings, type);
164 else
165 type->deserializeAsTextQuoted(column, buf, format_settings);
166 rollback_on_exception = true;
167
168 skipWhitespaceIfAny(buf);
169 assertDelimiterAfterValue(column_idx);
170 return read;
171 }
172 catch (const Exception & e)
173 {
174 if (!isParseError(e.code()) && e.code() != ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED)
175 throw;
176 if (rollback_on_exception)
177 column.popBack(1);
178
179 /// Switch to SQL parser and don't try to use streaming parser for complex expressions
180 /// Note: Throwing exceptions for each expression may be very slow because of stacktraces
181 buf.rollbackToCheckpoint();
182 return parseExpression(column, column_idx);
183 }
184}
185
186bool ValuesBlockInputFormat::parseExpression(IColumn & column, size_t column_idx)
187{
188 const Block & header = getPort().getHeader();
189 const IDataType & type = *header.getByPosition(column_idx).type;
190
191 /// We need continuous memory containing the expression to use Lexer
192 skipToNextRow(0, 1);
193 buf.makeContinuousMemoryFromCheckpointToPos();
194 buf.rollbackToCheckpoint();
195
196 Expected expected;
197 Tokens tokens(buf.position(), buf.buffer().end());
198 IParser::Pos token_iterator(tokens);
199 ASTPtr ast;
200
201 bool parsed = parser.parse(token_iterator, ast, expected);
202
203 /// Consider delimiter after value (',' or ')') as part of expression
204 if (column_idx + 1 != num_columns)
205 parsed &= token_iterator->type == TokenType::Comma;
206 else
207 parsed &= token_iterator->type == TokenType::ClosingRoundBracket;
208
209 if (!parsed)
210 throw Exception("Cannot parse expression of type " + type.getName() + " here: "
211 + String(buf.position(), std::min(SHOW_CHARS_ON_SYNTAX_ERROR, buf.buffer().end() - buf.position())),
212 ErrorCodes::SYNTAX_ERROR);
213 ++token_iterator;
214
215 if (parser_type_for_column[column_idx] != ParserType::Streaming && dynamic_cast<const ASTLiteral *>(ast.get()))
216 {
217 /// It's possible that streaming parsing has failed on some row (e.g. because of '+' sign before integer),
218 /// but it still can parse the following rows
219 /// Check if we can use fast streaming parser instead if using templates
220 bool rollback_on_exception = false;
221 bool ok = false;
222 try
223 {
224 header.getByPosition(column_idx).type->deserializeAsTextQuoted(column, buf, format_settings);
225 rollback_on_exception = true;
226 skipWhitespaceIfAny(buf);
227 if (checkDelimiterAfterValue(column_idx))
228 ok = true;
229 }
230 catch (const Exception & e)
231 {
232 if (!isParseError(e.code()))
233 throw;
234 }
235 if (ok)
236 {
237 parser_type_for_column[column_idx] = ParserType::Streaming;
238 return true;
239 }
240 else if (rollback_on_exception)
241 column.popBack(1);
242 }
243
244 parser_type_for_column[column_idx] = ParserType::SingleExpressionEvaluation;
245
246 /// Try to deduce template of expression and use it to parse the following rows
247 if (shouldDeduceNewTemplate(column_idx))
248 {
249 if (templates[column_idx])
250 throw DB::Exception("Template for column " + std::to_string(column_idx) + " already exists and it was not evaluated yet",
251 ErrorCodes::LOGICAL_ERROR);
252 std::exception_ptr exception;
253 try
254 {
255 bool found_in_cache = false;
256 const auto & result_type = header.getByPosition(column_idx).type;
257 const char * delimiter = (column_idx + 1 == num_columns) ? ")" : ",";
258 auto structure = templates_cache.getFromCacheOrConstruct(result_type, format_settings.null_as_default,
259 TokenIterator(tokens), token_iterator,
260 ast, *context, &found_in_cache, delimiter);
261 templates[column_idx].emplace(structure);
262 if (found_in_cache)
263 ++attempts_to_deduce_template_cached[column_idx];
264 else
265 ++attempts_to_deduce_template[column_idx];
266
267 buf.rollbackToCheckpoint();
268 if (templates[column_idx]->parseExpression(buf, format_settings))
269 {
270 ++rows_parsed_using_template[column_idx];
271 parser_type_for_column[column_idx] = ParserType::BatchTemplate;
272 return true;
273 }
274 }
275 catch (...)
276 {
277 exception = std::current_exception();
278 }
279 if (!format_settings.values.interpret_expressions)
280 {
281 if (exception)
282 std::rethrow_exception(exception);
283 else
284 {
285 buf.rollbackToCheckpoint();
286 size_t len = const_cast<char *>(token_iterator->begin) - buf.position();
287 throw Exception("Cannot deduce template of expression: " + std::string(buf.position(), len), ErrorCodes::SYNTAX_ERROR);
288 }
289 }
290 /// Continue parsing without template
291 templates[column_idx].reset();
292 }
293
294 if (!format_settings.values.interpret_expressions)
295 throw Exception("Interpreting expressions is disabled", ErrorCodes::SUPPORT_IS_DISABLED);
296
297 /// Try to evaluate single expression if other parsers don't work
298 buf.position() = const_cast<char *>(token_iterator->begin);
299
300 std::pair<Field, DataTypePtr> value_raw = evaluateConstantExpression(ast, *context);
301 Field value = convertFieldToType(value_raw.first, type, value_raw.second.get());
302
303 /// Check that we are indeed allowed to insert a NULL.
304 if (value.isNull() && !type.isNullable())
305 {
306 if (format_settings.null_as_default)
307 {
308 type.insertDefaultInto(column);
309 return false;
310 }
311 buf.rollbackToCheckpoint();
312 throw Exception{"Cannot insert NULL value into a column of type '" + type.getName() + "'"
313 + " at: " +
314 String(buf.position(), std::min(SHOW_CHARS_ON_SYNTAX_ERROR, buf.buffer().end() - buf.position())),
315 ErrorCodes::TYPE_MISMATCH};
316 }
317
318 column.insert(value);
319 return true;
320}
321
322/// Can be used in fileSegmentationEngine for parallel parsing of Values
323bool ValuesBlockInputFormat::skipToNextRow(size_t min_chunk_bytes, int balance)
324{
325 skipWhitespaceIfAny(buf);
326 if (buf.eof() || *buf.position() == ';')
327 return false;
328 bool quoted = false;
329
330 size_t chunk_begin_buf_count = buf.count();
331 while (!buf.eof() && (balance || buf.count() - chunk_begin_buf_count < min_chunk_bytes))
332 {
333 buf.position() = find_first_symbols<'\\', '\'', ')', '('>(buf.position(), buf.buffer().end());
334 if (buf.position() == buf.buffer().end())
335 continue;
336 if (*buf.position() == '\\')
337 {
338 ++buf.position();
339 if (!buf.eof())
340 ++buf.position();
341 }
342 else if (*buf.position() == '\'')
343 {
344 quoted ^= true;
345 ++buf.position();
346 }
347 else if (*buf.position() == ')')
348 {
349 ++buf.position();
350 if (!quoted)
351 --balance;
352 }
353 else if (*buf.position() == '(')
354 {
355 ++buf.position();
356 if (!quoted)
357 ++balance;
358 }
359 }
360
361 if (!buf.eof() && *buf.position() == ',')
362 ++buf.position();
363 return true;
364}
365
366void ValuesBlockInputFormat::assertDelimiterAfterValue(size_t column_idx)
367{
368 if (unlikely(!checkDelimiterAfterValue(column_idx)))
369 throwAtAssertionFailed((column_idx + 1 == num_columns) ? ")" : ",", buf);
370}
371
372bool ValuesBlockInputFormat::checkDelimiterAfterValue(size_t column_idx)
373{
374 skipWhitespaceIfAny(buf);
375
376 if (likely(column_idx + 1 != num_columns))
377 return checkChar(',', buf);
378 else
379 return checkChar(')', buf);
380}
381
382bool ValuesBlockInputFormat::shouldDeduceNewTemplate(size_t column_idx)
383{
384 if (!format_settings.values.deduce_templates_of_expressions)
385 return false;
386
387 /// TODO better heuristic
388
389 /// Using template from cache is approx 2x faster, than evaluating single expression
390 /// Construction of new template is approx 1.5x slower, than evaluating single expression
391 float attempts_weighted = 1.5 * attempts_to_deduce_template[column_idx] + 0.5 * attempts_to_deduce_template_cached[column_idx];
392
393 constexpr size_t max_attempts = 100;
394 if (attempts_weighted < max_attempts)
395 return true;
396
397 if (rows_parsed_using_template[column_idx] / attempts_weighted > 1)
398 {
399 /// Try again
400 attempts_to_deduce_template[column_idx] = 0;
401 attempts_to_deduce_template_cached[column_idx] = 0;
402 rows_parsed_using_template[column_idx] = 0;
403 return true;
404 }
405 return false;
406}
407
408void ValuesBlockInputFormat::readSuffix()
409{
410 if (buf.hasUnreadData())
411 throw Exception("Unread data in PeekableReadBuffer will be lost. Most likely it's a bug.", ErrorCodes::LOGICAL_ERROR);
412}
413
414void ValuesBlockInputFormat::resetParser()
415{
416 IInputFormat::resetParser();
417 // I'm not resetting parser modes here.
418 // There is a good chance that all messages have the same format.
419 total_rows = 0;
420}
421
422void registerInputFormatProcessorValues(FormatFactory & factory)
423{
424 factory.registerInputFormatProcessor("Values", [](
425 ReadBuffer & buf,
426 const Block & header,
427 const RowInputFormatParams & params,
428 const FormatSettings & settings)
429 {
430 return std::make_shared<ValuesBlockInputFormat>(buf, header, params, settings);
431 });
432}
433
434}
435