1#include <IO/ReadHelpers.h>
2#include <IO/WriteBufferFromString.h>
3#include <IO/Operators.h>
4
5#include <Processors/Formats/Impl/TabSeparatedRowInputFormat.h>
6#include <Formats/verbosePrintString.h>
7#include <Formats/FormatFactory.h>
8#include <DataTypes/DataTypeNothing.h>
9#include <DataTypes/DataTypeNullable.h>
10
11namespace DB
12{
13
namespace ErrorCodes
{
    /// Only error code thrown by this file (malformed header / line separators).
    /// LOGICAL_ERROR was declared here but never used, which ClickHouse's style check rejects.
    extern const int INCORRECT_DATA;
}
19
20
21static void skipTSVRow(ReadBuffer & in, const size_t num_columns)
22{
23 NullSink null_sink;
24
25 for (size_t i = 0; i < num_columns; ++i)
26 {
27 readEscapedStringInto(null_sink, in);
28 assertChar(i == num_columns - 1 ? '\n' : '\t', in);
29 }
30}
31
32
33/** Check for a common error case - usage of Windows line feed.
34 */
35static void checkForCarriageReturn(ReadBuffer & in)
36{
37 if (in.position()[0] == '\r' || (in.position() != in.buffer().begin() && in.position()[-1] == '\r'))
38 throw Exception("\nYou have carriage return (\\r, 0x0D, ASCII 13) at end of first row."
39 "\nIt's like your input data has DOS/Windows style line separators, that are illegal in TabSeparated format."
40 " You must transform your file to Unix format."
41 "\nBut if you really need carriage return at end of string value of last column, you need to escape it as \\r.",
42 ErrorCodes::INCORRECT_DATA);
43}
44
45
46TabSeparatedRowInputFormat::TabSeparatedRowInputFormat(const Block & header_, ReadBuffer & in_, const Params & params_,
47 bool with_names_, bool with_types_, const FormatSettings & format_settings_)
48 : RowInputFormatWithDiagnosticInfo(header_, in_, params_), with_names(with_names_), with_types(with_types_), format_settings(format_settings_)
49{
50 auto & sample = getPort().getHeader();
51 size_t num_columns = sample.columns();
52
53 data_types.resize(num_columns);
54 column_indexes_by_names.reserve(num_columns);
55
56 for (size_t i = 0; i < num_columns; ++i)
57 {
58 const auto & column_info = sample.getByPosition(i);
59
60 data_types[i] = column_info.type;
61 column_indexes_by_names.emplace(column_info.name, i);
62 }
63
64 column_indexes_for_input_fields.reserve(num_columns);
65 read_columns.assign(num_columns, false);
66}
67
68
69void TabSeparatedRowInputFormat::setupAllColumnsByTableSchema()
70{
71 auto & header = getPort().getHeader();
72 read_columns.assign(header.columns(), true);
73 column_indexes_for_input_fields.resize(header.columns());
74
75 for (size_t i = 0; i < column_indexes_for_input_fields.size(); ++i)
76 column_indexes_for_input_fields[i] = i;
77}
78
79
80void TabSeparatedRowInputFormat::addInputColumn(const String & column_name)
81{
82 const auto column_it = column_indexes_by_names.find(column_name);
83 if (column_it == column_indexes_by_names.end())
84 {
85 if (format_settings.skip_unknown_fields)
86 {
87 column_indexes_for_input_fields.push_back(std::nullopt);
88 return;
89 }
90
91 throw Exception(
92 "Unknown field found in TSV header: '" + column_name + "' " +
93 "at position " + std::to_string(column_indexes_for_input_fields.size()) +
94 "\nSet the 'input_format_skip_unknown_fields' parameter explicitly to ignore and proceed",
95 ErrorCodes::INCORRECT_DATA
96 );
97 }
98
99 const auto column_index = column_it->second;
100
101 if (read_columns[column_index])
102 throw Exception("Duplicate field found while parsing TSV header: " + column_name, ErrorCodes::INCORRECT_DATA);
103
104 read_columns[column_index] = true;
105 column_indexes_for_input_fields.emplace_back(column_index);
106}
107
108
109void TabSeparatedRowInputFormat::fillUnreadColumnsWithDefaults(MutableColumns & columns, RowReadExtension & row_read_extension)
110{
111 /// It is safe to memorize this on the first run - the format guarantees this does not change
112 if (unlikely(row_num == 1))
113 {
114 columns_to_fill_with_default_values.clear();
115 for (size_t index = 0; index < read_columns.size(); ++index)
116 if (read_columns[index] == 0)
117 columns_to_fill_with_default_values.push_back(index);
118 }
119
120 for (const auto column_index : columns_to_fill_with_default_values)
121 {
122 data_types[column_index]->insertDefaultInto(*columns[column_index]);
123 row_read_extension.read_columns[column_index] = false;
124 }
125}
126
127
128void TabSeparatedRowInputFormat::readPrefix()
129{
130 if (with_names || with_types)
131 {
132 /// In this format, we assume that column name or type cannot contain BOM,
133 /// so, if format has header,
134 /// then BOM at beginning of stream cannot be confused with name or type of field, and it is safe to skip it.
135 skipBOMIfExists(in);
136 }
137
138 if (with_names)
139 {
140 if (format_settings.with_names_use_header)
141 {
142 String column_name;
143 do
144 {
145 readEscapedString(column_name, in);
146 addInputColumn(column_name);
147 }
148 while (checkChar('\t', in));
149
150 if (!in.eof())
151 {
152 checkForCarriageReturn(in);
153 assertChar('\n', in);
154 }
155 }
156 else
157 {
158 setupAllColumnsByTableSchema();
159 skipTSVRow(in, column_indexes_for_input_fields.size());
160 }
161 }
162 else
163 setupAllColumnsByTableSchema();
164
165 if (with_types)
166 {
167 skipTSVRow(in, column_indexes_for_input_fields.size());
168 }
169}
170
171
172bool TabSeparatedRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext)
173{
174 if (in.eof())
175 return false;
176
177 updateDiagnosticInfo();
178
179 ext.read_columns.assign(read_columns.size(), true);
180 for (size_t file_column = 0; file_column < column_indexes_for_input_fields.size(); ++file_column)
181 {
182 const auto & column_index = column_indexes_for_input_fields[file_column];
183 const bool is_last_file_column = file_column + 1 == column_indexes_for_input_fields.size();
184 if (column_index)
185 {
186 const auto & type = data_types[*column_index];
187 ext.read_columns[*column_index] = readField(*columns[*column_index], type, is_last_file_column);
188 }
189 else
190 {
191 NullSink null_sink;
192 readEscapedStringInto(null_sink, in);
193 }
194
195 /// skip separators
196 if (file_column + 1 < column_indexes_for_input_fields.size())
197 {
198 assertChar('\t', in);
199 }
200 else if (!in.eof())
201 {
202 if (unlikely(row_num == 1))
203 checkForCarriageReturn(in);
204
205 assertChar('\n', in);
206 }
207 }
208
209 fillUnreadColumnsWithDefaults(columns, ext);
210
211 return true;
212}
213
214
215bool TabSeparatedRowInputFormat::readField(IColumn & column, const DataTypePtr & type, bool is_last_file_column)
216{
217 const bool at_delimiter = !is_last_file_column && !in.eof() && *in.position() == '\t';
218 const bool at_last_column_line_end = is_last_file_column && (in.eof() || *in.position() == '\n');
219 if (format_settings.tsv.empty_as_default && (at_delimiter || at_last_column_line_end))
220 {
221 column.insertDefault();
222 return false;
223 }
224 else if (format_settings.null_as_default && !type->isNullable())
225 return DataTypeNullable::deserializeTextEscaped(column, in, format_settings, type);
226 type->deserializeAsTextEscaped(column, in, format_settings);
227 return true;
228}
229
230bool TabSeparatedRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & columns, WriteBuffer & out)
231{
232 for (size_t file_column = 0; file_column < column_indexes_for_input_fields.size(); ++file_column)
233 {
234 if (file_column == 0 && in.eof())
235 {
236 out << "<End of stream>\n";
237 return false;
238 }
239
240 if (column_indexes_for_input_fields[file_column].has_value())
241 {
242 auto & header = getPort().getHeader();
243 size_t col_idx = column_indexes_for_input_fields[file_column].value();
244 if (!deserializeFieldAndPrintDiagnosticInfo(header.getByPosition(col_idx).name, data_types[col_idx], *columns[col_idx],
245 out, file_column))
246 return false;
247 }
248 else
249 {
250 static const String skipped_column_str = "<SKIPPED COLUMN>";
251 static const DataTypePtr skipped_column_type = std::make_shared<DataTypeNothing>();
252 static const MutableColumnPtr skipped_column = skipped_column_type->createColumn();
253 if (!deserializeFieldAndPrintDiagnosticInfo(skipped_column_str, skipped_column_type, *skipped_column, out, file_column))
254 return false;
255 }
256
257 /// Delimiters
258 if (file_column + 1 == column_indexes_for_input_fields.size())
259 {
260 if (!in.eof())
261 {
262 try
263 {
264 assertChar('\n', in);
265 }
266 catch (const DB::Exception &)
267 {
268 if (*in.position() == '\t')
269 {
270 out << "ERROR: Tab found where line feed is expected."
271 " It's like your file has more columns than expected.\n"
272 "And if your file have right number of columns, maybe it have unescaped tab in value.\n";
273 }
274 else if (*in.position() == '\r')
275 {
276 out << "ERROR: Carriage return found where line feed is expected."
277 " It's like your file has DOS/Windows style line separators, that is illegal in TabSeparated format.\n";
278 }
279 else
280 {
281 out << "ERROR: There is no line feed. ";
282 verbosePrintString(in.position(), in.position() + 1, out);
283 out << " found instead.\n";
284 }
285 return false;
286 }
287 }
288 }
289 else
290 {
291 try
292 {
293 assertChar('\t', in);
294 }
295 catch (const DB::Exception &)
296 {
297 if (*in.position() == '\n')
298 {
299 out << "ERROR: Line feed found where tab is expected."
300 " It's like your file has less columns than expected.\n"
301 "And if your file have right number of columns, "
302 "maybe it have unescaped backslash in value before tab, which cause tab has escaped.\n";
303 }
304 else if (*in.position() == '\r')
305 {
306 out << "ERROR: Carriage return found where tab is expected.\n";
307 }
308 else
309 {
310 out << "ERROR: There is no tab. ";
311 verbosePrintString(in.position(), in.position() + 1, out);
312 out << " found instead.\n";
313 }
314 return false;
315 }
316 }
317 }
318
319 return true;
320}
321
322void TabSeparatedRowInputFormat::tryDeserializeFiled(const DataTypePtr & type, IColumn & column, size_t file_column,
323 ReadBuffer::Position & prev_pos, ReadBuffer::Position & curr_pos)
324{
325 prev_pos = in.position();
326 if (column_indexes_for_input_fields[file_column])
327 {
328 const bool is_last_file_column = file_column + 1 == column_indexes_for_input_fields.size();
329 readField(column, type, is_last_file_column);
330 }
331 else
332 {
333 NullSink null_sink;
334 readEscapedStringInto(null_sink, in);
335 }
336 curr_pos = in.position();
337}
338
339void TabSeparatedRowInputFormat::syncAfterError()
340{
341 skipToUnescapedNextLineOrEOF(in);
342}
343
344void TabSeparatedRowInputFormat::resetParser()
345{
346 RowInputFormatWithDiagnosticInfo::resetParser();
347 column_indexes_for_input_fields.clear();
348 read_columns.clear();
349 columns_to_fill_with_default_values.clear();
350}
351
352void registerInputFormatProcessorTabSeparated(FormatFactory & factory)
353{
354 for (auto name : {"TabSeparated", "TSV"})
355 {
356 factory.registerInputFormatProcessor(name, [](
357 ReadBuffer & buf,
358 const Block & sample,
359 IRowInputFormat::Params params,
360 const FormatSettings & settings)
361 {
362 return std::make_shared<TabSeparatedRowInputFormat>(sample, buf, params, false, false, settings);
363 });
364 }
365
366 for (auto name : {"TabSeparatedWithNames", "TSVWithNames"})
367 {
368 factory.registerInputFormatProcessor(name, [](
369 ReadBuffer & buf,
370 const Block & sample,
371 IRowInputFormat::Params params,
372 const FormatSettings & settings)
373 {
374 return std::make_shared<TabSeparatedRowInputFormat>(sample, buf, params, true, false, settings);
375 });
376 }
377
378 for (auto name : {"TabSeparatedWithNamesAndTypes", "TSVWithNamesAndTypes"})
379 {
380 factory.registerInputFormatProcessor(name, [](
381 ReadBuffer & buf,
382 const Block & sample,
383 IRowInputFormat::Params params,
384 const FormatSettings & settings)
385 {
386 return std::make_shared<TabSeparatedRowInputFormat>(sample, buf, params, true, true, settings);
387 });
388 }
389}
390
391static bool fileSegmentationEngineTabSeparatedImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
392{
393 bool need_more_data = true;
394 char * pos = in.position();
395
396 while (loadAtPosition(in, memory, pos) && need_more_data)
397 {
398 pos = find_first_symbols<'\\', '\r', '\n'>(pos, in.buffer().end());
399
400 if (pos == in.buffer().end())
401 continue;
402
403 if (*pos == '\\')
404 {
405 ++pos;
406 if (loadAtPosition(in, memory, pos))
407 ++pos;
408 }
409 else if (*pos == '\n' || *pos == '\r')
410 {
411 if (memory.size() + static_cast<size_t>(pos - in.position()) >= min_chunk_size)
412 need_more_data = false;
413 ++pos;
414 }
415 }
416
417 saveUpToPosition(in, memory, pos);
418
419 return loadAtPosition(in, memory, pos);
420}
421
422void registerFileSegmentationEngineTabSeparated(FormatFactory & factory)
423{
424 // We can use the same segmentation engine for TSKV.
425 for (auto name : {"TabSeparated", "TSV", "TSKV"})
426 {
427 factory.registerFileSegmentationEngine(name, &fileSegmentationEngineTabSeparatedImpl);
428 }
429}
430
431}
432