1 | #include <IO/ReadHelpers.h> |
2 | #include <IO/Operators.h> |
3 | |
4 | #include <Formats/verbosePrintString.h> |
5 | #include <Processors/Formats/Impl/CSVRowInputFormat.h> |
6 | #include <Formats/FormatFactory.h> |
7 | #include <DataTypes/DataTypeNullable.h> |
8 | #include <DataTypes/DataTypeNothing.h> |
9 | |
10 | |
11 | namespace DB |
12 | { |
13 | |
namespace ErrorCodes
{
    /// Error codes referenced by this file (declared extern; defined elsewhere in the project).
    extern const int BAD_ARGUMENTS;
    extern const int INCORRECT_DATA;
    extern const int LOGICAL_ERROR;
}
19 | |
20 | |
21 | CSVRowInputFormat::CSVRowInputFormat(const Block & , ReadBuffer & in_, const Params & params_, |
22 | bool with_names_, const FormatSettings & format_settings_) |
23 | : RowInputFormatWithDiagnosticInfo(header_, in_, params_) |
24 | , with_names(with_names_) |
25 | , format_settings(format_settings_) |
26 | { |
27 | |
28 | const String bad_delimiters = " \t\"'.UL" ; |
29 | if (bad_delimiters.find(format_settings.csv.delimiter) != String::npos) |
30 | throw Exception(String("CSV format may not work correctly with delimiter '" ) + format_settings.csv.delimiter + |
31 | "'. Try use CustomSeparated format instead." , ErrorCodes::BAD_ARGUMENTS); |
32 | |
33 | auto & sample = getPort().getHeader(); |
34 | size_t num_columns = sample.columns(); |
35 | |
36 | data_types.resize(num_columns); |
37 | column_indexes_by_names.reserve(num_columns); |
38 | |
39 | for (size_t i = 0; i < num_columns; ++i) |
40 | { |
41 | const auto & column_info = sample.getByPosition(i); |
42 | |
43 | data_types[i] = column_info.type; |
44 | column_indexes_by_names.emplace(column_info.name, i); |
45 | } |
46 | } |
47 | |
48 | |
49 | /// Map an input file column to a table column, based on its name. |
50 | void CSVRowInputFormat::addInputColumn(const String & column_name) |
51 | { |
52 | const auto column_it = column_indexes_by_names.find(column_name); |
53 | if (column_it == column_indexes_by_names.end()) |
54 | { |
55 | if (format_settings.skip_unknown_fields) |
56 | { |
57 | column_indexes_for_input_fields.push_back(std::nullopt); |
58 | return; |
59 | } |
60 | |
61 | throw Exception( |
62 | "Unknown field found in CSV header: '" + column_name + "' " + |
63 | "at position " + std::to_string(column_indexes_for_input_fields.size()) + |
64 | "\nSet the 'input_format_skip_unknown_fields' parameter explicitly to ignore and proceed" , |
65 | ErrorCodes::INCORRECT_DATA |
66 | ); |
67 | } |
68 | |
69 | const auto column_index = column_it->second; |
70 | |
71 | if (read_columns[column_index]) |
72 | throw Exception("Duplicate field found while parsing CSV header: " + column_name, ErrorCodes::INCORRECT_DATA); |
73 | |
74 | read_columns[column_index] = true; |
75 | column_indexes_for_input_fields.emplace_back(column_index); |
76 | } |
77 | |
78 | static void skipEndOfLine(ReadBuffer & in) |
79 | { |
80 | /// \n (Unix) or \r\n (DOS/Windows) or \n\r (Mac OS Classic) |
81 | |
82 | if (*in.position() == '\n') |
83 | { |
84 | ++in.position(); |
85 | if (!in.eof() && *in.position() == '\r') |
86 | ++in.position(); |
87 | } |
88 | else if (*in.position() == '\r') |
89 | { |
90 | ++in.position(); |
91 | if (!in.eof() && *in.position() == '\n') |
92 | ++in.position(); |
93 | else |
94 | throw Exception("Cannot parse CSV format: found \\r (CR) not followed by \\n (LF)." |
95 | " Line must end by \\n (LF) or \\r\\n (CR LF) or \\n\\r." , ErrorCodes::INCORRECT_DATA); |
96 | } |
97 | else if (!in.eof()) |
98 | throw Exception("Expected end of line" , ErrorCodes::INCORRECT_DATA); |
99 | } |
100 | |
101 | |
102 | static void skipDelimiter(ReadBuffer & in, const char delimiter, bool is_last_column) |
103 | { |
104 | if (is_last_column) |
105 | { |
106 | if (in.eof()) |
107 | return; |
108 | |
109 | /// we support the extra delimiter at the end of the line |
110 | if (*in.position() == delimiter) |
111 | { |
112 | ++in.position(); |
113 | if (in.eof()) |
114 | return; |
115 | } |
116 | |
117 | skipEndOfLine(in); |
118 | } |
119 | else |
120 | assertChar(delimiter, in); |
121 | } |
122 | |
123 | |
124 | /// Skip `whitespace` symbols allowed in CSV. |
125 | static inline void skipWhitespacesAndTabs(ReadBuffer & in) |
126 | { |
127 | while (!in.eof() |
128 | && (*in.position() == ' ' |
129 | || *in.position() == '\t')) |
130 | ++in.position(); |
131 | } |
132 | |
133 | |
134 | static void skipRow(ReadBuffer & in, const FormatSettings::CSV & settings, size_t num_columns) |
135 | { |
136 | String tmp; |
137 | for (size_t i = 0; i < num_columns; ++i) |
138 | { |
139 | skipWhitespacesAndTabs(in); |
140 | readCSVString(tmp, in, settings); |
141 | skipWhitespacesAndTabs(in); |
142 | |
143 | skipDelimiter(in, settings.delimiter, i + 1 == num_columns); |
144 | } |
145 | } |
146 | |
147 | |
148 | void CSVRowInputFormat::readPrefix() |
149 | { |
150 | /// In this format, we assume, that if first string field contain BOM as value, it will be written in quotes, |
151 | /// so BOM at beginning of stream cannot be confused with BOM in first string value, and it is safe to skip it. |
152 | skipBOMIfExists(in); |
153 | |
154 | size_t num_columns = data_types.size(); |
155 | auto & = getPort().getHeader(); |
156 | |
157 | if (with_names) |
158 | { |
159 | /// This CSV file has a header row with column names. Depending on the |
160 | /// settings, use it or skip it. |
161 | if (format_settings.with_names_use_header) |
162 | { |
163 | /// Look at the file header to see which columns we have there. |
164 | /// The missing columns are filled with defaults. |
165 | read_columns.assign(header.columns(), false); |
166 | do |
167 | { |
168 | String column_name; |
169 | skipWhitespacesAndTabs(in); |
170 | readCSVString(column_name, in, format_settings.csv); |
171 | skipWhitespacesAndTabs(in); |
172 | |
173 | addInputColumn(column_name); |
174 | } |
175 | while (checkChar(format_settings.csv.delimiter, in)); |
176 | |
177 | skipDelimiter(in, format_settings.csv.delimiter, true); |
178 | |
179 | for (auto read_column : read_columns) |
180 | { |
181 | if (!read_column) |
182 | { |
183 | have_always_default_columns = true; |
184 | break; |
185 | } |
186 | } |
187 | |
188 | return; |
189 | } |
190 | else |
191 | skipRow(in, format_settings.csv, num_columns); |
192 | } |
193 | |
194 | /// The default: map each column of the file to the column of the table with |
195 | /// the same index. |
196 | read_columns.assign(header.columns(), true); |
197 | column_indexes_for_input_fields.resize(header.columns()); |
198 | |
199 | for (size_t i = 0; i < column_indexes_for_input_fields.size(); ++i) |
200 | { |
201 | column_indexes_for_input_fields[i] = i; |
202 | } |
203 | } |
204 | |
205 | |
206 | bool CSVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext) |
207 | { |
208 | if (in.eof()) |
209 | return false; |
210 | |
211 | updateDiagnosticInfo(); |
212 | |
213 | /// Track whether we have to fill any columns in this row with default |
214 | /// values. If not, we return an empty column mask to the caller, so that |
215 | /// it doesn't have to check it. |
216 | bool have_default_columns = have_always_default_columns; |
217 | |
218 | ext.read_columns.assign(read_columns.size(), true); |
219 | const auto delimiter = format_settings.csv.delimiter; |
220 | for (size_t file_column = 0; file_column < column_indexes_for_input_fields.size(); ++file_column) |
221 | { |
222 | const auto & table_column = column_indexes_for_input_fields[file_column]; |
223 | const bool is_last_file_column = file_column + 1 == column_indexes_for_input_fields.size(); |
224 | |
225 | if (table_column) |
226 | { |
227 | skipWhitespacesAndTabs(in); |
228 | ext.read_columns[*table_column] = readField(*columns[*table_column], data_types[*table_column], is_last_file_column); |
229 | if (!ext.read_columns[*table_column]) |
230 | have_default_columns = true; |
231 | skipWhitespacesAndTabs(in); |
232 | } |
233 | else |
234 | { |
235 | /// We never read this column from the file, just skip it. |
236 | String tmp; |
237 | readCSVString(tmp, in, format_settings.csv); |
238 | } |
239 | |
240 | skipDelimiter(in, delimiter, is_last_file_column); |
241 | } |
242 | |
243 | if (have_default_columns) |
244 | { |
245 | for (size_t i = 0; i < read_columns.size(); i++) |
246 | { |
247 | if (!read_columns[i]) |
248 | { |
249 | /// The column value for this row is going to be overwritten |
250 | /// with default by the caller, but the general assumption is |
251 | /// that the column size increases for each row, so we have |
252 | /// to insert something. Since we do not care about the exact |
253 | /// value, we do not have to use the default value specified by |
254 | /// the data type, and can just use IColumn::insertDefault(). |
255 | columns[i]->insertDefault(); |
256 | ext.read_columns[i] = false; |
257 | } |
258 | } |
259 | } |
260 | |
261 | return true; |
262 | } |
263 | |
264 | bool CSVRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & columns, WriteBuffer & out) |
265 | { |
266 | const char delimiter = format_settings.csv.delimiter; |
267 | |
268 | for (size_t file_column = 0; file_column < column_indexes_for_input_fields.size(); ++file_column) |
269 | { |
270 | if (file_column == 0 && in.eof()) |
271 | { |
272 | out << "<End of stream>\n" ; |
273 | return false; |
274 | } |
275 | |
276 | if (column_indexes_for_input_fields[file_column].has_value()) |
277 | { |
278 | auto & = getPort().getHeader(); |
279 | size_t col_idx = column_indexes_for_input_fields[file_column].value(); |
280 | if (!deserializeFieldAndPrintDiagnosticInfo(header.getByPosition(col_idx).name, data_types[col_idx], *columns[col_idx], |
281 | out, file_column)) |
282 | return false; |
283 | } |
284 | else |
285 | { |
286 | static const String skipped_column_str = "<SKIPPED COLUMN>" ; |
287 | static const DataTypePtr skipped_column_type = std::make_shared<DataTypeNothing>(); |
288 | static const MutableColumnPtr skipped_column = skipped_column_type->createColumn(); |
289 | if (!deserializeFieldAndPrintDiagnosticInfo(skipped_column_str, skipped_column_type, *skipped_column, out, file_column)) |
290 | return false; |
291 | } |
292 | |
293 | /// Delimiters |
294 | if (file_column + 1 == column_indexes_for_input_fields.size()) |
295 | { |
296 | if (in.eof()) |
297 | return false; |
298 | |
299 | /// we support the extra delimiter at the end of the line |
300 | if (*in.position() == delimiter) |
301 | { |
302 | ++in.position(); |
303 | if (in.eof()) |
304 | break; |
305 | } |
306 | |
307 | if (!in.eof() && *in.position() != '\n' && *in.position() != '\r') |
308 | { |
309 | out << "ERROR: There is no line feed. " ; |
310 | verbosePrintString(in.position(), in.position() + 1, out); |
311 | out << " found instead.\n" |
312 | " It's like your file has more columns than expected.\n" |
313 | "And if your file have right number of columns, maybe it have unquoted string value with comma.\n" ; |
314 | |
315 | return false; |
316 | } |
317 | |
318 | skipEndOfLine(in); |
319 | } |
320 | else |
321 | { |
322 | try |
323 | { |
324 | assertChar(delimiter, in); |
325 | } |
326 | catch (const DB::Exception &) |
327 | { |
328 | if (*in.position() == '\n' || *in.position() == '\r') |
329 | { |
330 | out << "ERROR: Line feed found where delimiter (" << delimiter << ") is expected." |
331 | " It's like your file has less columns than expected.\n" |
332 | "And if your file have right number of columns, maybe it have unescaped quotes in values.\n" ; |
333 | } |
334 | else |
335 | { |
336 | out << "ERROR: There is no delimiter (" << delimiter << "). " ; |
337 | verbosePrintString(in.position(), in.position() + 1, out); |
338 | out << " found instead.\n" ; |
339 | } |
340 | return false; |
341 | } |
342 | } |
343 | } |
344 | |
345 | return true; |
346 | } |
347 | |
348 | |
349 | void CSVRowInputFormat::syncAfterError() |
350 | { |
351 | skipToNextLineOrEOF(in); |
352 | } |
353 | |
354 | void CSVRowInputFormat::tryDeserializeFiled(const DataTypePtr & type, IColumn & column, size_t file_column, |
355 | ReadBuffer::Position & prev_pos, ReadBuffer::Position & curr_pos) |
356 | { |
357 | skipWhitespacesAndTabs(in); |
358 | prev_pos = in.position(); |
359 | |
360 | if (column_indexes_for_input_fields[file_column]) |
361 | { |
362 | const bool is_last_file_column = file_column + 1 == column_indexes_for_input_fields.size(); |
363 | readField(column, type, is_last_file_column); |
364 | } |
365 | else |
366 | { |
367 | String tmp; |
368 | readCSVString(tmp, in, format_settings.csv); |
369 | } |
370 | |
371 | curr_pos = in.position(); |
372 | skipWhitespacesAndTabs(in); |
373 | } |
374 | |
375 | bool CSVRowInputFormat::readField(IColumn & column, const DataTypePtr & type, bool is_last_file_column) |
376 | { |
377 | const bool at_delimiter = !in.eof() && *in.position() == format_settings.csv.delimiter; |
378 | const bool at_last_column_line_end = is_last_file_column |
379 | && (in.eof() || *in.position() == '\n' || *in.position() == '\r'); |
380 | |
381 | /// Note: Tuples are serialized in CSV as separate columns, but with empty_as_default or null_as_default |
382 | /// only one empty or NULL column will be expected |
383 | if (format_settings.csv.empty_as_default |
384 | && (at_delimiter || at_last_column_line_end)) |
385 | { |
386 | /// Treat empty unquoted column value as default value, if |
387 | /// specified in the settings. Tuple columns might seem |
388 | /// problematic, because they are never quoted but still contain |
389 | /// commas, which might be also used as delimiters. However, |
390 | /// they do not contain empty unquoted fields, so this check |
391 | /// works for tuples as well. |
392 | column.insertDefault(); |
393 | return false; |
394 | } |
395 | else if (format_settings.null_as_default && !type->isNullable()) |
396 | { |
397 | /// If value is null but type is not nullable then use default value instead. |
398 | return DataTypeNullable::deserializeTextCSV(column, in, format_settings, type); |
399 | } |
400 | else |
401 | { |
402 | /// Read the column normally. |
403 | type->deserializeAsTextCSV(column, in, format_settings); |
404 | return true; |
405 | } |
406 | } |
407 | |
408 | void CSVRowInputFormat::resetParser() |
409 | { |
410 | RowInputFormatWithDiagnosticInfo::resetParser(); |
411 | column_indexes_for_input_fields.clear(); |
412 | read_columns.clear(); |
413 | have_always_default_columns = false; |
414 | } |
415 | |
416 | |
417 | void registerInputFormatProcessorCSV(FormatFactory & factory) |
418 | { |
419 | for (bool with_names : {false, true}) |
420 | { |
421 | factory.registerInputFormatProcessor(with_names ? "CSVWithNames" : "CSV" , [=]( |
422 | ReadBuffer & buf, |
423 | const Block & sample, |
424 | IRowInputFormat::Params params, |
425 | const FormatSettings & settings) |
426 | { |
427 | return std::make_shared<CSVRowInputFormat>(sample, buf, params, with_names, settings); |
428 | }); |
429 | } |
430 | } |
431 | |
/// Cut the next chunk of a CSV stream on a row boundary.
///
/// Scans forward from in.position(), tracking whether the scan is inside a double-quoted field
/// (inside quotes, a doubled "" is an escaped quote and does not close the field). At each line
/// terminator outside quotes it checks whether the accumulated size (memory.size() plus the bytes
/// between in.position() and pos) has reached min_chunk_size. If so, it consumes that terminator
/// ("\n", "\n\r", "\r" or "\r\n") and stops. The scanned bytes are then saved via saveUpToPosition.
///
/// @return the result of a final loadAtPosition. NOTE(review): presumably true while more input
///         remains after this chunk — confirm against loadAtPosition.
static bool fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
{
    char * pos = in.position();
    bool quotes = false;          /// the scan is currently inside a double-quoted field
    bool need_more_data = true;   /// no row boundary past min_chunk_size has been reached yet

    /// NOTE(review): loadAtPosition presumably stashes the scanned bytes into `memory` and refills `in`
    /// when `pos` reaches the buffer end, returning false at end of stream — confirm.
    while (loadAtPosition(in, memory, pos) && need_more_data)
    {
        if (quotes)
        {
            pos = find_first_symbols<'"'>(pos, in.buffer().end());
            if (pos == in.buffer().end())
                continue;   /// no quote in the rest of this buffer: reload and keep scanning
            if (*pos == '"')
            {
                ++pos;
                /// "" inside a quoted field is an escaped quote; any other byte means the field is closed.
                if (loadAtPosition(in, memory, pos) && *pos == '"')
                    ++pos;
                else
                    quotes = false;
            }
        }
        else
        {
            pos = find_first_symbols<'"', '\r', '\n'>(pos, in.buffer().end());
            if (pos == in.buffer().end())
                continue;   /// nothing interesting in the rest of this buffer: reload and keep scanning
            if (*pos == '"')
            {
                quotes = true;
                ++pos;
            }
            else if (*pos == '\n')
            {
                /// Row boundary: finish the chunk here once it is large enough.
                if (memory.size() + static_cast<size_t>(pos - in.position()) >= min_chunk_size)
                    need_more_data = false;
                ++pos;
                /// Also consume the '\r' of a "\n\r" terminator.
                if (loadAtPosition(in, memory, pos) && *pos == '\r')
                    ++pos;
            }
            else if (*pos == '\r')
            {
                /// Row boundary: finish the chunk here once it is large enough.
                if (memory.size() + static_cast<size_t>(pos - in.position()) >= min_chunk_size)
                    need_more_data = false;
                ++pos;
                /// Also consume the '\n' of a "\r\n" terminator.
                if (loadAtPosition(in, memory, pos) && *pos == '\n')
                    ++pos;
            }
        }
    }

    saveUpToPosition(in, memory, pos);
    return loadAtPosition(in, memory, pos);
}
486 | |
487 | void registerFileSegmentationEngineCSV(FormatFactory & factory) |
488 | { |
489 | factory.registerFileSegmentationEngine("CSV" , &fileSegmentationEngineCSVImpl); |
490 | } |
491 | |
492 | } |
493 | |