1#include <Processors/Formats/Impl/TemplateRowInputFormat.h>
2#include <Formats/FormatFactory.h>
3#include <Formats/verbosePrintString.h>
4#include <IO/Operators.h>
5#include <DataTypes/DataTypeNothing.h>
6#include <Interpreters/Context.h>
7#include <DataTypes/DataTypeNullable.h>
8
9namespace DB
10{
11
namespace ErrorCodes
{
/// Error codes used in this file (extern declarations only).
extern const int ATTEMPT_TO_READ_AFTER_EOF;
extern const int CANNOT_READ_ALL_DATA;
extern const int CANNOT_PARSE_ESCAPE_SEQUENCE;
extern const int CANNOT_PARSE_QUOTED_STRING;
extern const int SYNTAX_ERROR;
}
20
21
22TemplateRowInputFormat::TemplateRowInputFormat(const Block & header_, ReadBuffer & in_, const Params & params_,
23 FormatSettings settings_, bool ignore_spaces_,
24 ParsedTemplateFormatString format_, ParsedTemplateFormatString row_format_,
25 std::string row_between_delimiter_)
26 : RowInputFormatWithDiagnosticInfo(header_, buf, params_), buf(in_), data_types(header_.getDataTypes()),
27 settings(std::move(settings_)), ignore_spaces(ignore_spaces_),
28 format(std::move(format_)), row_format(std::move(row_format_)),
29 default_csv_delimiter(settings.csv.delimiter), row_between_delimiter(std::move(row_between_delimiter_))
30{
31 /// Validate format string for result set
32 bool has_data = false;
33 for (size_t i = 0; i < format.columnsCount(); ++i)
34 {
35 if (format.format_idx_to_column_idx[i])
36 {
37 if (*format.format_idx_to_column_idx[i] != 0)
38 format.throwInvalidFormat("Invalid input part", i);
39 if (has_data)
40 format.throwInvalidFormat("${data} can occur only once", i);
41 if (format.formats[i] != ColumnFormat::None)
42 format.throwInvalidFormat("${data} must have empty or None deserialization type", i);
43 has_data = true;
44 format_data_idx = i;
45 }
46 else
47 {
48 if (format.formats[i] == ColumnFormat::Xml || format.formats[i] == ColumnFormat::Raw)
49 format.throwInvalidFormat("XML and Raw deserialization is not supported", i);
50 }
51 }
52
53 /// Validate format string for rows
54 std::vector<UInt8> column_in_format(header_.columns(), false);
55 for (size_t i = 0; i < row_format.columnsCount(); ++i)
56 {
57 if (row_format.formats[i] == ColumnFormat::Xml || row_format.formats[i] == ColumnFormat::Raw)
58 row_format.throwInvalidFormat("XML and Raw deserialization is not supported", i);
59
60 if (row_format.format_idx_to_column_idx[i])
61 {
62 if (header_.columns() <= *row_format.format_idx_to_column_idx[i])
63 row_format.throwInvalidFormat("Column index " + std::to_string(*row_format.format_idx_to_column_idx[i]) +
64 " must be less then number of columns (" + std::to_string(header_.columns()) + ")", i);
65 if (row_format.formats[i] == ColumnFormat::None)
66 row_format.throwInvalidFormat("Column is not skipped, but deserialization type is None", i);
67
68 size_t col_idx = *row_format.format_idx_to_column_idx[i];
69 if (column_in_format[col_idx])
70 row_format.throwInvalidFormat("Duplicate column", i);
71 column_in_format[col_idx] = true;
72 }
73 }
74
75 for (size_t i = 0; i < header_.columns(); ++i)
76 if (!column_in_format[i])
77 always_default_columns.push_back(i);
78}
79
80void TemplateRowInputFormat::readPrefix()
81{
82 size_t last_successfully_parsed_idx = 0;
83 try
84 {
85 tryReadPrefixOrSuffix<void>(last_successfully_parsed_idx, format_data_idx);
86 }
87 catch (Exception & e)
88 {
89 format.throwInvalidFormat(e.message() + " While parsing prefix", last_successfully_parsed_idx);
90 }
91}
92
93/// Asserts delimiters and skips fields in prefix or suffix.
94/// tryReadPrefixOrSuffix<bool>(...) is used in checkForSuffix() to avoid throwing an exception after read of each row
95/// (most likely false will be returned on first call of checkString(...))
96template <typename ReturnType>
97ReturnType TemplateRowInputFormat::tryReadPrefixOrSuffix(size_t & input_part_beg, size_t input_part_end)
98{
99 static constexpr bool throw_exception = std::is_same_v<ReturnType, void>;
100
101 skipSpaces();
102 if constexpr (throw_exception)
103 assertString(format.delimiters[input_part_beg], buf);
104 else
105 {
106 if (likely(!checkString(format.delimiters[input_part_beg], buf)))
107 return ReturnType(false);
108 }
109
110 while (input_part_beg < input_part_end)
111 {
112 skipSpaces();
113 if constexpr (throw_exception)
114 skipField(format.formats[input_part_beg]);
115 else
116 {
117 try
118 {
119 skipField(format.formats[input_part_beg]);
120 }
121 catch (const Exception & e)
122 {
123 if (e.code() != ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF &&
124 e.code() != ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE &&
125 e.code() != ErrorCodes::CANNOT_PARSE_QUOTED_STRING)
126 throw;
127 /// If it's parsing error, then suffix is not found
128 return ReturnType(false);
129 }
130 }
131 ++input_part_beg;
132
133 skipSpaces();
134 if constexpr (throw_exception)
135 assertString(format.delimiters[input_part_beg], buf);
136 else
137 {
138 if (likely(!checkString(format.delimiters[input_part_beg], buf)))
139 return ReturnType(false);
140 }
141 }
142
143 if constexpr (!throw_exception)
144 return ReturnType(true);
145}
146
147bool TemplateRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & extra)
148{
149 /// This function can be called again after it returned false
150 if (unlikely(end_of_stream))
151 return false;
152
153 skipSpaces();
154
155 if (unlikely(checkForSuffix()))
156 {
157 end_of_stream = true;
158 return false;
159 }
160
161 updateDiagnosticInfo();
162
163 if (likely(row_num != 1))
164 assertString(row_between_delimiter, buf);
165
166 extra.read_columns.assign(columns.size(), false);
167
168 for (size_t i = 0; i < row_format.columnsCount(); ++i)
169 {
170 skipSpaces();
171 assertString(row_format.delimiters[i], buf);
172 skipSpaces();
173 if (row_format.format_idx_to_column_idx[i])
174 {
175 size_t col_idx = *row_format.format_idx_to_column_idx[i];
176 extra.read_columns[col_idx] = deserializeField(data_types[col_idx], *columns[col_idx], i);
177 }
178 else
179 skipField(row_format.formats[i]);
180
181 }
182
183 skipSpaces();
184 assertString(row_format.delimiters.back(), buf);
185
186 for (const auto & idx : always_default_columns)
187 data_types[idx]->insertDefaultInto(*columns[idx]);
188
189 return true;
190}
191
192bool TemplateRowInputFormat::deserializeField(const DataTypePtr & type, IColumn & column, size_t file_column)
193{
194 ColumnFormat col_format = row_format.formats[file_column];
195 bool read = true;
196 bool parse_as_nullable = settings.null_as_default && !type->isNullable();
197 try
198 {
199 switch (col_format)
200 {
201 case ColumnFormat::Escaped:
202 if (parse_as_nullable)
203 read = DataTypeNullable::deserializeTextEscaped(column, buf, settings, type);
204 else
205 type->deserializeAsTextEscaped(column, buf, settings);
206 break;
207 case ColumnFormat::Quoted:
208 if (parse_as_nullable)
209 read = DataTypeNullable::deserializeTextQuoted(column, buf, settings, type);
210 else
211 type->deserializeAsTextQuoted(column, buf, settings);
212 break;
213 case ColumnFormat::Csv:
214 /// Will read unquoted string until settings.csv.delimiter
215 settings.csv.delimiter = row_format.delimiters[file_column + 1].empty() ? default_csv_delimiter :
216 row_format.delimiters[file_column + 1].front();
217 if (parse_as_nullable)
218 read = DataTypeNullable::deserializeTextCSV(column, buf, settings, type);
219 else
220 type->deserializeAsTextCSV(column, buf, settings);
221 break;
222 case ColumnFormat::Json:
223 if (parse_as_nullable)
224 read = DataTypeNullable::deserializeTextJSON(column, buf, settings, type);
225 else
226 type->deserializeAsTextJSON(column, buf, settings);
227 break;
228 default:
229 __builtin_unreachable();
230 }
231 }
232 catch (Exception & e)
233 {
234 if (e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF)
235 throwUnexpectedEof();
236 throw;
237 }
238 return read;
239}
240
241void TemplateRowInputFormat::skipField(TemplateRowInputFormat::ColumnFormat col_format)
242{
243 String tmp;
244 constexpr const char * field_name = "<SKIPPED COLUMN>";
245 constexpr size_t field_name_len = 16;
246 try
247 {
248 switch (col_format)
249 {
250 case ColumnFormat::None:
251 /// Empty field, just skip spaces
252 break;
253 case ColumnFormat::Escaped:
254 readEscapedString(tmp, buf);
255 break;
256 case ColumnFormat::Quoted:
257 readQuotedString(tmp, buf);
258 break;
259 case ColumnFormat::Csv:
260 readCSVString(tmp, buf, settings.csv);
261 break;
262 case ColumnFormat::Json:
263 skipJSONField(buf, StringRef(field_name, field_name_len));
264 break;
265 default:
266 __builtin_unreachable();
267 }
268 }
269 catch (Exception & e)
270 {
271 if (e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF)
272 throwUnexpectedEof();
273 throw;
274 }
275}
276
277/// Returns true if all rows have been read i.e. there are only suffix and spaces (if ignore_spaces == true) before EOF.
278/// Otherwise returns false
279bool TemplateRowInputFormat::checkForSuffix()
280{
281 PeekableReadBufferCheckpoint checkpoint{buf};
282 bool suffix_found = false;
283 size_t last_successfully_parsed_idx = format_data_idx + 1;
284 try
285 {
286 suffix_found = tryReadPrefixOrSuffix<bool>(last_successfully_parsed_idx, format.columnsCount());
287 }
288 catch (const Exception & e)
289 {
290 if (e.code() != ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF &&
291 e.code() != ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE &&
292 e.code() != ErrorCodes::CANNOT_PARSE_QUOTED_STRING)
293 throw;
294 }
295
296 if (unlikely(suffix_found))
297 {
298 skipSpaces();
299 if (buf.eof())
300 return true;
301 }
302
303 buf.rollbackToCheckpoint();
304 return false;
305}
306
/// Writes human-readable diagnostics for a row that failed to parse to `out`.
/// readRow() checks for the suffix before every row, so this first explains why the suffix did not match.
/// It then re-parses the row part by part (row_between_delimiter, each delimiter + field, the final delimiter).
/// Returns false as soon as a step fails, or when the suffix check moved the buffer position so that
/// further diagnostics are impossible; returns true if the whole row parsed.
bool TemplateRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & columns, WriteBuffer & out)
{
    out << "Suffix does not match: ";
    size_t last_successfully_parsed_idx = format_data_idx + 1;
    const ReadBuffer::Position row_begin_pos = buf.position();
    bool caught = false;
    try
    {
        /// NOTE(review): the `true` flag presumably rolls the buffer back when the checkpoint goes out of scope;
        /// the row_begin_pos comparison below relies on the position being restored - confirm.
        PeekableReadBufferCheckpoint checkpoint{buf, true};
        tryReadPrefixOrSuffix<void>(last_successfully_parsed_idx, format.columnsCount());
    }
    catch (Exception & e)
    {
        out << e.message() << " Near column " << last_successfully_parsed_idx;
        caught = true;
    }
    if (!caught)
    {
        /// The suffix itself parsed, so the mismatch must be trailing data before EOF
        out << " There is some data after suffix (EOF expected, got ";
        verbosePrintString(buf.position(), std::min(buf.buffer().end(), buf.position() + 16), out);
        out << "). ";
    }
    out << " Format string (from format_schema): \n" << format.dump() << "\n";

    if (row_begin_pos != buf.position())
    {
        /// Pointers to buffer memory were invalidated during checking for suffix
        out << "\nCannot print more diagnostic info.";
        return false;
    }

    out << "\nUsing format string (from format_schema_rows): " << row_format.dump() << "\n";
    out << "\nTrying to parse next row, because suffix does not match:\n";
    try
    {
        /// Mirrors readRow(): the delimiter between rows precedes every row except the first
        if (likely(row_num != 1))
            assertString(row_between_delimiter, buf);
    }
    catch (const DB::Exception &)
    {
        writeErrorStringForWrongDelimiter(out, "delimiter between rows", row_between_delimiter);

        return false;
    }
    for (size_t i = 0; i < row_format.columnsCount(); ++i)
    {
        skipSpaces();
        try
        {
            assertString(row_format.delimiters[i], buf);
        }
        catch (const DB::Exception &)
        {
            writeErrorStringForWrongDelimiter(out, "delimiter before field " + std::to_string(i), row_format.delimiters[i]);
            return false;
        }

        skipSpaces();
        if (row_format.format_idx_to_column_idx[i])
        {
            auto & header = getPort().getHeader();
            size_t col_idx = *row_format.format_idx_to_column_idx[i];
            if (!deserializeFieldAndPrintDiagnosticInfo(header.getByPosition(col_idx).name, data_types[col_idx],
                                                        *columns[col_idx], out, i))
            {
                out << "Maybe it's not possible to deserialize field " + std::to_string(i) +
                       " as " + ParsedTemplateFormatString::formatToString(row_format.formats[i]);
                return false;
            }
        }
        else
        {
            /// Skipped parts are reported under a placeholder name/type; tryDeserializeFiled() only
            /// calls skipField() for them, so this shared static column is not written to.
            static const String skipped_column_str = "<SKIPPED COLUMN>";
            static const DataTypePtr skipped_column_type = std::make_shared<DataTypeNothing>();
            static const MutableColumnPtr skipped_column = skipped_column_type->createColumn();
            if (!deserializeFieldAndPrintDiagnosticInfo(skipped_column_str, skipped_column_type, *skipped_column, out, i))
                return false;
        }
    }

    skipSpaces();
    try
    {
        assertString(row_format.delimiters.back(), buf);
    }
    catch (const DB::Exception &)
    {
        writeErrorStringForWrongDelimiter(out, "delimiter after last field", row_format.delimiters.back());
        return false;
    }

    return true;
}
400
401void TemplateRowInputFormat::writeErrorStringForWrongDelimiter(WriteBuffer & out, const String & description, const String & delim)
402{
403 out << "ERROR: There is no " << description << ": expected ";
404 verbosePrintString(delim.data(), delim.data() + delim.size(), out);
405 out << ", got ";
406 if (buf.eof())
407 out << "<End of stream>";
408 else
409 verbosePrintString(buf.position(), std::min(buf.position() + delim.size() + 10, buf.buffer().end()), out);
410 out << '\n';
411}
412
413void TemplateRowInputFormat::tryDeserializeFiled(const DataTypePtr & type, IColumn & column, size_t file_column,
414 ReadBuffer::Position & prev_pos, ReadBuffer::Position & curr_pos)
415{
416 prev_pos = buf.position();
417 if (row_format.format_idx_to_column_idx[file_column])
418 deserializeField(type, column, file_column);
419 else
420 skipField(row_format.formats[file_column]);
421 curr_pos = buf.position();
422}
423
424bool TemplateRowInputFormat::isGarbageAfterField(size_t, ReadBuffer::Position)
425{
426 /// Garbage will be considered as wrong delimiter
427 return false;
428}
429
430bool TemplateRowInputFormat::allowSyncAfterError() const
431{
432 return !row_format.delimiters.back().empty() || !row_between_delimiter.empty();
433}
434
435void TemplateRowInputFormat::syncAfterError()
436{
437 bool at_beginning_of_row_or_eof = false;
438 while (!at_beginning_of_row_or_eof)
439 {
440 skipToNextDelimiterOrEof(row_format.delimiters.back());
441 if (buf.eof())
442 {
443 end_of_stream = true;
444 return;
445 }
446 buf.ignore(row_format.delimiters.back().size());
447
448 skipSpaces();
449 if (checkForSuffix())
450 return;
451
452 bool last_delimiter_in_row_found = !row_format.delimiters.back().empty();
453
454 if (last_delimiter_in_row_found && checkString(row_between_delimiter, buf))
455 at_beginning_of_row_or_eof = true;
456 else
457 skipToNextDelimiterOrEof(row_between_delimiter);
458
459 if (buf.eof())
460 at_beginning_of_row_or_eof = end_of_stream = true;
461 }
462 /// It can happen that buf.position() is not at the beginning of row
463 /// if some delimiters is similar to row_format.delimiters.back() and row_between_delimiter.
464 /// It will cause another parsing error.
465}
466
467/// Searches for delimiter in input stream and sets buffer position to the beginning of delimiter (if found) or EOF (if not)
468void TemplateRowInputFormat::skipToNextDelimiterOrEof(const String & delimiter)
469{
470 if (delimiter.empty())
471 return;
472
473 while (!buf.eof())
474 {
475 void * pos = memchr(buf.position(), delimiter[0], buf.available());
476 if (!pos)
477 {
478 buf.position() += buf.available();
479 continue;
480 }
481
482 buf.position() = static_cast<ReadBuffer::Position>(pos);
483
484 PeekableReadBufferCheckpoint checkpoint{buf};
485 if (checkString(delimiter, buf))
486 return;
487
488 buf.rollbackToCheckpoint();
489 ++buf.position();
490 }
491}
492
493void TemplateRowInputFormat::throwUnexpectedEof()
494{
495 throw Exception("Unexpected EOF while parsing row " + std::to_string(row_num) + ". "
496 "Maybe last row has wrong format or input doesn't contain specified suffix before EOF.",
497 ErrorCodes::CANNOT_READ_ALL_DATA);
498}
499
500void TemplateRowInputFormat::resetParser()
501{
502 RowInputFormatWithDiagnosticInfo::resetParser();
503 end_of_stream = false;
504}
505
506void registerInputFormatProcessorTemplate(FormatFactory & factory)
507{
508 for (bool ignore_spaces : {false, true})
509 {
510 factory.registerInputFormatProcessor(ignore_spaces ? "TemplateIgnoreSpaces" : "Template", [=](
511 ReadBuffer & buf,
512 const Block & sample,
513 IRowInputFormat::Params params,
514 const FormatSettings & settings)
515 {
516 ParsedTemplateFormatString resultset_format;
517 if (settings.template_settings.resultset_format.empty())
518 {
519 /// Default format string: "${data}"
520 resultset_format.delimiters.resize(2);
521 resultset_format.formats.emplace_back(ParsedTemplateFormatString::ColumnFormat::None);
522 resultset_format.format_idx_to_column_idx.emplace_back(0);
523 resultset_format.column_names.emplace_back("data");
524 }
525 else
526 {
527 /// Read format string from file
528 resultset_format = ParsedTemplateFormatString(
529 FormatSchemaInfo(settings.template_settings.resultset_format, "Template", false,
530 settings.schema.is_server, settings.schema.format_schema_path),
531 [&](const String & partName) -> std::optional<size_t>
532 {
533 if (partName == "data")
534 return 0;
535 throw Exception("Unknown input part " + partName,
536 ErrorCodes::SYNTAX_ERROR);
537 });
538 }
539
540 ParsedTemplateFormatString row_format = ParsedTemplateFormatString(
541 FormatSchemaInfo(settings.template_settings.row_format, "Template", false,
542 settings.schema.is_server, settings.schema.format_schema_path),
543 [&](const String & colName) -> std::optional<size_t>
544 {
545 return sample.getPositionByName(colName);
546 });
547
548 return std::make_shared<TemplateRowInputFormat>(sample, buf, params, settings, ignore_spaces, resultset_format, row_format, settings.template_settings.row_between_delimiter);
549 });
550 }
551
552 for (bool ignore_spaces : {false, true})
553 {
554 factory.registerInputFormatProcessor(ignore_spaces ? "CustomSeparatedIgnoreSpaces" : "CustomSeparated", [=](
555 ReadBuffer & buf,
556 const Block & sample,
557 IRowInputFormat::Params params,
558 const FormatSettings & settings)
559 {
560 ParsedTemplateFormatString resultset_format = ParsedTemplateFormatString::setupCustomSeparatedResultsetFormat(settings.custom);
561 ParsedTemplateFormatString row_format = ParsedTemplateFormatString::setupCustomSeparatedRowFormat(settings.custom, sample);
562
563 return std::make_shared<TemplateRowInputFormat>(sample, buf, params, settings, ignore_spaces, resultset_format, row_format, settings.custom.row_between_delimiter);
564 });
565 }
566}
567
568}
569