1#include <IO/ReadHelpers.h>
2#include <IO/Operators.h>
3
4#include <Formats/verbosePrintString.h>
5#include <Processors/Formats/Impl/CSVRowInputFormat.h>
6#include <Formats/FormatFactory.h>
7#include <DataTypes/DataTypeNullable.h>
8#include <DataTypes/DataTypeNothing.h>
9
10
11namespace DB
12{
13
/// Error codes used by this file. They are only declared here (`extern`); the
/// definitions live elsewhere in the project.
/// BAD_ARGUMENTS is thrown by the CSVRowInputFormat constructor for unsupported delimiters.
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int INCORRECT_DATA;
    extern const int LOGICAL_ERROR;
}
19
20
21CSVRowInputFormat::CSVRowInputFormat(const Block & header_, ReadBuffer & in_, const Params & params_,
22 bool with_names_, const FormatSettings & format_settings_)
23 : RowInputFormatWithDiagnosticInfo(header_, in_, params_)
24 , with_names(with_names_)
25 , format_settings(format_settings_)
26{
27
28 const String bad_delimiters = " \t\"'.UL";
29 if (bad_delimiters.find(format_settings.csv.delimiter) != String::npos)
30 throw Exception(String("CSV format may not work correctly with delimiter '") + format_settings.csv.delimiter +
31 "'. Try use CustomSeparated format instead.", ErrorCodes::BAD_ARGUMENTS);
32
33 auto & sample = getPort().getHeader();
34 size_t num_columns = sample.columns();
35
36 data_types.resize(num_columns);
37 column_indexes_by_names.reserve(num_columns);
38
39 for (size_t i = 0; i < num_columns; ++i)
40 {
41 const auto & column_info = sample.getByPosition(i);
42
43 data_types[i] = column_info.type;
44 column_indexes_by_names.emplace(column_info.name, i);
45 }
46}
47
48
49/// Map an input file column to a table column, based on its name.
50void CSVRowInputFormat::addInputColumn(const String & column_name)
51{
52 const auto column_it = column_indexes_by_names.find(column_name);
53 if (column_it == column_indexes_by_names.end())
54 {
55 if (format_settings.skip_unknown_fields)
56 {
57 column_indexes_for_input_fields.push_back(std::nullopt);
58 return;
59 }
60
61 throw Exception(
62 "Unknown field found in CSV header: '" + column_name + "' " +
63 "at position " + std::to_string(column_indexes_for_input_fields.size()) +
64 "\nSet the 'input_format_skip_unknown_fields' parameter explicitly to ignore and proceed",
65 ErrorCodes::INCORRECT_DATA
66 );
67 }
68
69 const auto column_index = column_it->second;
70
71 if (read_columns[column_index])
72 throw Exception("Duplicate field found while parsing CSV header: " + column_name, ErrorCodes::INCORRECT_DATA);
73
74 read_columns[column_index] = true;
75 column_indexes_for_input_fields.emplace_back(column_index);
76}
77
78static void skipEndOfLine(ReadBuffer & in)
79{
80 /// \n (Unix) or \r\n (DOS/Windows) or \n\r (Mac OS Classic)
81
82 if (*in.position() == '\n')
83 {
84 ++in.position();
85 if (!in.eof() && *in.position() == '\r')
86 ++in.position();
87 }
88 else if (*in.position() == '\r')
89 {
90 ++in.position();
91 if (!in.eof() && *in.position() == '\n')
92 ++in.position();
93 else
94 throw Exception("Cannot parse CSV format: found \\r (CR) not followed by \\n (LF)."
95 " Line must end by \\n (LF) or \\r\\n (CR LF) or \\n\\r.", ErrorCodes::INCORRECT_DATA);
96 }
97 else if (!in.eof())
98 throw Exception("Expected end of line", ErrorCodes::INCORRECT_DATA);
99}
100
101
102static void skipDelimiter(ReadBuffer & in, const char delimiter, bool is_last_column)
103{
104 if (is_last_column)
105 {
106 if (in.eof())
107 return;
108
109 /// we support the extra delimiter at the end of the line
110 if (*in.position() == delimiter)
111 {
112 ++in.position();
113 if (in.eof())
114 return;
115 }
116
117 skipEndOfLine(in);
118 }
119 else
120 assertChar(delimiter, in);
121}
122
123
124/// Skip `whitespace` symbols allowed in CSV.
125static inline void skipWhitespacesAndTabs(ReadBuffer & in)
126{
127 while (!in.eof()
128 && (*in.position() == ' '
129 || *in.position() == '\t'))
130 ++in.position();
131}
132
133
134static void skipRow(ReadBuffer & in, const FormatSettings::CSV & settings, size_t num_columns)
135{
136 String tmp;
137 for (size_t i = 0; i < num_columns; ++i)
138 {
139 skipWhitespacesAndTabs(in);
140 readCSVString(tmp, in, settings);
141 skipWhitespacesAndTabs(in);
142
143 skipDelimiter(in, settings.delimiter, i + 1 == num_columns);
144 }
145}
146
147
148void CSVRowInputFormat::readPrefix()
149{
150 /// In this format, we assume, that if first string field contain BOM as value, it will be written in quotes,
151 /// so BOM at beginning of stream cannot be confused with BOM in first string value, and it is safe to skip it.
152 skipBOMIfExists(in);
153
154 size_t num_columns = data_types.size();
155 auto & header = getPort().getHeader();
156
157 if (with_names)
158 {
159 /// This CSV file has a header row with column names. Depending on the
160 /// settings, use it or skip it.
161 if (format_settings.with_names_use_header)
162 {
163 /// Look at the file header to see which columns we have there.
164 /// The missing columns are filled with defaults.
165 read_columns.assign(header.columns(), false);
166 do
167 {
168 String column_name;
169 skipWhitespacesAndTabs(in);
170 readCSVString(column_name, in, format_settings.csv);
171 skipWhitespacesAndTabs(in);
172
173 addInputColumn(column_name);
174 }
175 while (checkChar(format_settings.csv.delimiter, in));
176
177 skipDelimiter(in, format_settings.csv.delimiter, true);
178
179 for (auto read_column : read_columns)
180 {
181 if (!read_column)
182 {
183 have_always_default_columns = true;
184 break;
185 }
186 }
187
188 return;
189 }
190 else
191 skipRow(in, format_settings.csv, num_columns);
192 }
193
194 /// The default: map each column of the file to the column of the table with
195 /// the same index.
196 read_columns.assign(header.columns(), true);
197 column_indexes_for_input_fields.resize(header.columns());
198
199 for (size_t i = 0; i < column_indexes_for_input_fields.size(); ++i)
200 {
201 column_indexes_for_input_fields[i] = i;
202 }
203}
204
205
206bool CSVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext)
207{
208 if (in.eof())
209 return false;
210
211 updateDiagnosticInfo();
212
213 /// Track whether we have to fill any columns in this row with default
214 /// values. If not, we return an empty column mask to the caller, so that
215 /// it doesn't have to check it.
216 bool have_default_columns = have_always_default_columns;
217
218 ext.read_columns.assign(read_columns.size(), true);
219 const auto delimiter = format_settings.csv.delimiter;
220 for (size_t file_column = 0; file_column < column_indexes_for_input_fields.size(); ++file_column)
221 {
222 const auto & table_column = column_indexes_for_input_fields[file_column];
223 const bool is_last_file_column = file_column + 1 == column_indexes_for_input_fields.size();
224
225 if (table_column)
226 {
227 skipWhitespacesAndTabs(in);
228 ext.read_columns[*table_column] = readField(*columns[*table_column], data_types[*table_column], is_last_file_column);
229 if (!ext.read_columns[*table_column])
230 have_default_columns = true;
231 skipWhitespacesAndTabs(in);
232 }
233 else
234 {
235 /// We never read this column from the file, just skip it.
236 String tmp;
237 readCSVString(tmp, in, format_settings.csv);
238 }
239
240 skipDelimiter(in, delimiter, is_last_file_column);
241 }
242
243 if (have_default_columns)
244 {
245 for (size_t i = 0; i < read_columns.size(); i++)
246 {
247 if (!read_columns[i])
248 {
249 /// The column value for this row is going to be overwritten
250 /// with default by the caller, but the general assumption is
251 /// that the column size increases for each row, so we have
252 /// to insert something. Since we do not care about the exact
253 /// value, we do not have to use the default value specified by
254 /// the data type, and can just use IColumn::insertDefault().
255 columns[i]->insertDefault();
256 ext.read_columns[i] = false;
257 }
258 }
259 }
260
261 return true;
262}
263
264bool CSVRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & columns, WriteBuffer & out)
265{
266 const char delimiter = format_settings.csv.delimiter;
267
268 for (size_t file_column = 0; file_column < column_indexes_for_input_fields.size(); ++file_column)
269 {
270 if (file_column == 0 && in.eof())
271 {
272 out << "<End of stream>\n";
273 return false;
274 }
275
276 if (column_indexes_for_input_fields[file_column].has_value())
277 {
278 auto & header = getPort().getHeader();
279 size_t col_idx = column_indexes_for_input_fields[file_column].value();
280 if (!deserializeFieldAndPrintDiagnosticInfo(header.getByPosition(col_idx).name, data_types[col_idx], *columns[col_idx],
281 out, file_column))
282 return false;
283 }
284 else
285 {
286 static const String skipped_column_str = "<SKIPPED COLUMN>";
287 static const DataTypePtr skipped_column_type = std::make_shared<DataTypeNothing>();
288 static const MutableColumnPtr skipped_column = skipped_column_type->createColumn();
289 if (!deserializeFieldAndPrintDiagnosticInfo(skipped_column_str, skipped_column_type, *skipped_column, out, file_column))
290 return false;
291 }
292
293 /// Delimiters
294 if (file_column + 1 == column_indexes_for_input_fields.size())
295 {
296 if (in.eof())
297 return false;
298
299 /// we support the extra delimiter at the end of the line
300 if (*in.position() == delimiter)
301 {
302 ++in.position();
303 if (in.eof())
304 break;
305 }
306
307 if (!in.eof() && *in.position() != '\n' && *in.position() != '\r')
308 {
309 out << "ERROR: There is no line feed. ";
310 verbosePrintString(in.position(), in.position() + 1, out);
311 out << " found instead.\n"
312 " It's like your file has more columns than expected.\n"
313 "And if your file have right number of columns, maybe it have unquoted string value with comma.\n";
314
315 return false;
316 }
317
318 skipEndOfLine(in);
319 }
320 else
321 {
322 try
323 {
324 assertChar(delimiter, in);
325 }
326 catch (const DB::Exception &)
327 {
328 if (*in.position() == '\n' || *in.position() == '\r')
329 {
330 out << "ERROR: Line feed found where delimiter (" << delimiter << ") is expected."
331 " It's like your file has less columns than expected.\n"
332 "And if your file have right number of columns, maybe it have unescaped quotes in values.\n";
333 }
334 else
335 {
336 out << "ERROR: There is no delimiter (" << delimiter << "). ";
337 verbosePrintString(in.position(), in.position() + 1, out);
338 out << " found instead.\n";
339 }
340 return false;
341 }
342 }
343 }
344
345 return true;
346}
347
348
349void CSVRowInputFormat::syncAfterError()
350{
351 skipToNextLineOrEOF(in);
352}
353
354void CSVRowInputFormat::tryDeserializeFiled(const DataTypePtr & type, IColumn & column, size_t file_column,
355 ReadBuffer::Position & prev_pos, ReadBuffer::Position & curr_pos)
356{
357 skipWhitespacesAndTabs(in);
358 prev_pos = in.position();
359
360 if (column_indexes_for_input_fields[file_column])
361 {
362 const bool is_last_file_column = file_column + 1 == column_indexes_for_input_fields.size();
363 readField(column, type, is_last_file_column);
364 }
365 else
366 {
367 String tmp;
368 readCSVString(tmp, in, format_settings.csv);
369 }
370
371 curr_pos = in.position();
372 skipWhitespacesAndTabs(in);
373}
374
375bool CSVRowInputFormat::readField(IColumn & column, const DataTypePtr & type, bool is_last_file_column)
376{
377 const bool at_delimiter = !in.eof() && *in.position() == format_settings.csv.delimiter;
378 const bool at_last_column_line_end = is_last_file_column
379 && (in.eof() || *in.position() == '\n' || *in.position() == '\r');
380
381 /// Note: Tuples are serialized in CSV as separate columns, but with empty_as_default or null_as_default
382 /// only one empty or NULL column will be expected
383 if (format_settings.csv.empty_as_default
384 && (at_delimiter || at_last_column_line_end))
385 {
386 /// Treat empty unquoted column value as default value, if
387 /// specified in the settings. Tuple columns might seem
388 /// problematic, because they are never quoted but still contain
389 /// commas, which might be also used as delimiters. However,
390 /// they do not contain empty unquoted fields, so this check
391 /// works for tuples as well.
392 column.insertDefault();
393 return false;
394 }
395 else if (format_settings.null_as_default && !type->isNullable())
396 {
397 /// If value is null but type is not nullable then use default value instead.
398 return DataTypeNullable::deserializeTextCSV(column, in, format_settings, type);
399 }
400 else
401 {
402 /// Read the column normally.
403 type->deserializeAsTextCSV(column, in, format_settings);
404 return true;
405 }
406}
407
408void CSVRowInputFormat::resetParser()
409{
410 RowInputFormatWithDiagnosticInfo::resetParser();
411 column_indexes_for_input_fields.clear();
412 read_columns.clear();
413 have_always_default_columns = false;
414}
415
416
417void registerInputFormatProcessorCSV(FormatFactory & factory)
418{
419 for (bool with_names : {false, true})
420 {
421 factory.registerInputFormatProcessor(with_names ? "CSVWithNames" : "CSV", [=](
422 ReadBuffer & buf,
423 const Block & sample,
424 IRowInputFormat::Params params,
425 const FormatSettings & settings)
426 {
427 return std::make_shared<CSVRowInputFormat>(sample, buf, params, with_names, settings);
428 });
429 }
430}
431
/// File segmentation engine for CSV: cuts the input into a chunk that ends on a row boundary.
/// It scans forward from in.position() and stops at the first line terminator ('\n', "\n\r",
/// '\r', "\r\n") found once the chunk size reaches `min_chunk_size`. The chunk size is
/// memory.size() plus the bytes between in.position() and the terminator.
/// Double-quoted regions are tracked, with "" treated as an escaped quote, so line breaks
/// inside quoted fields are not taken as row ends. Only '"' is recognised as the quote character.
/// NOTE(review): loadAtPosition/saveUpToPosition are assumed to move already-scanned bytes
/// into `memory` and refill `in` while keeping `pos` valid; confirm against their definitions.
/// Returns the result of a final loadAtPosition, presumably "more input remains".
static bool fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
{
    char * pos = in.position();
    bool quotes = false;
    bool need_more_data = true;

    /// loadAtPosition is evaluated before need_more_data, so it also runs on the final iteration.
    while (loadAtPosition(in, memory, pos) && need_more_data)
    {
        if (quotes)
        {
            /// Inside a quoted field: only a quote can end it.
            pos = find_first_symbols<'"'>(pos, in.buffer().end());
            if (pos == in.buffer().end())
                continue;
            if (*pos == '"')
            {
                ++pos;
                /// "" is an escaped quote and keeps us inside the field; anything else closes it.
                if (loadAtPosition(in, memory, pos) && *pos == '"')
                    ++pos;
                else
                    quotes = false;
            }
        }
        else
        {
            /// Outside quotes: look for the start of a quoted field or a line terminator.
            pos = find_first_symbols<'"', '\r', '\n'>(pos, in.buffer().end());
            if (pos == in.buffer().end())
                continue;
            if (*pos == '"')
            {
                quotes = true;
                ++pos;
            }
            else if (*pos == '\n')
            {
                /// Row boundary: stop after it if the chunk is already big enough.
                if (memory.size() + static_cast<size_t>(pos - in.position()) >= min_chunk_size)
                    need_more_data = false;
                ++pos;
                /// Swallow the '\r' of a "\n\r" terminator.
                if (loadAtPosition(in, memory, pos) && *pos == '\r')
                    ++pos;
            }
            else if (*pos == '\r')
            {
                if (memory.size() + static_cast<size_t>(pos - in.position()) >= min_chunk_size)
                    need_more_data = false;
                ++pos;
                /// Swallow the '\n' of a "\r\n" terminator.
                if (loadAtPosition(in, memory, pos) && *pos == '\n')
                    ++pos;
            }
        }
    }

    saveUpToPosition(in, memory, pos);
    return loadAtPosition(in, memory, pos);
}
486
487void registerFileSegmentationEngineCSV(FormatFactory & factory)
488{
489 factory.registerFileSegmentationEngine("CSV", &fileSegmentationEngineCSVImpl);
490}
491
492}
493