1#include "duckdb/execution/operator/persistent/base_csv_reader.hpp"
2
3#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
4#include "duckdb/common/file_system.hpp"
5#include "duckdb/common/string_util.hpp"
6#include "duckdb/common/to_string.hpp"
7#include "duckdb/common/types/cast_helpers.hpp"
8#include "duckdb/common/operator/cast_operators.hpp"
9#include "duckdb/common/operator/decimal_cast_operators.hpp"
10#include "duckdb/common/vector_operations/unary_executor.hpp"
11#include "duckdb/common/vector_operations/vector_operations.hpp"
12#include "duckdb/function/scalar/strftime_format.hpp"
13#include "duckdb/main/database.hpp"
14#include "duckdb/parser/column_definition.hpp"
15#include "duckdb/storage/data_table.hpp"
16#include "utf8proc_wrapper.hpp"
17#include "utf8proc.hpp"
18#include "duckdb/parser/keyword_helper.hpp"
19#include "duckdb/main/error_manager.hpp"
20#include "duckdb/execution/operator/persistent/parallel_csv_reader.hpp"
21
22#include <algorithm>
23#include <cctype>
24#include <cstring>
25#include <fstream>
26
27namespace duckdb {
28
29string BaseCSVReader::GetLineNumberStr(idx_t line_error, bool is_line_estimated, idx_t buffer_idx) {
30 // If an error happens during auto-detect it is an estimated line
31 string estimated = (is_line_estimated ? string(" (estimated)") : string(""));
32 return to_string(val: GetLineError(line_error, buffer_idx)) + estimated;
33}
34
// Base constructor: binds the reader to the client context, fetches the file
// system and buffer allocator from that context, and takes ownership of the
// parsed CSV options.
// NOTE(review): requested_types is accepted but unused here — presumably
// consumed by derived readers; confirm against the subclasses.
BaseCSVReader::BaseCSVReader(ClientContext &context_p, BufferedCSVReaderOptions options_p,
                             const vector<LogicalType> &requested_types)
    : context(context_p), fs(FileSystem::GetFileSystem(context)), allocator(BufferAllocator::Get(context)),
      options(std::move(options_p)) {
}
40
// Defined out-of-line (rather than defaulted in the header) — presumably so
// members with types that are incomplete in the header can be destroyed here.
BaseCSVReader::~BaseCSVReader() {
}
43
// Opens the CSV file named in the options with the configured compression,
// returning a file handle; reset support is enabled so the file can be
// re-read from the start.
unique_ptr<CSVFileHandle> BaseCSVReader::OpenCSV(const BufferedCSVReaderOptions &options_p) {
	return CSVFileHandle::OpenFile(fs, allocator, path: options_p.file_path, compression: options_p.compression, enable_reset: true);
}
47
48void BaseCSVReader::InitParseChunk(idx_t num_cols) {
49 // adapt not null info
50 if (options.force_not_null.size() != num_cols) {
51 options.force_not_null.resize(new_size: num_cols, x: false);
52 }
53 if (num_cols == parse_chunk.ColumnCount()) {
54 parse_chunk.Reset();
55 } else {
56 parse_chunk.Destroy();
57
58 // initialize the parse_chunk with a set of VARCHAR types
59 vector<LogicalType> varchar_types(num_cols, LogicalType::VARCHAR);
60 parse_chunk.Initialize(allocator, types: varchar_types);
61 }
62}
63
64void BaseCSVReader::InitializeProjection() {
65 for (idx_t i = 0; i < GetTypes().size(); i++) {
66 reader_data.column_ids.push_back(x: i);
67 reader_data.column_mapping.push_back(x: i);
68 }
69}
70
// Registers a user-supplied strftime-style format for the given type (DATE or
// TIMESTAMP): flags the format as present and pre-parses the specifier into
// the reusable StrTimeFormat object stored in the options.
void BaseCSVReader::SetDateFormat(const string &format_specifier, const LogicalTypeId &sql_type) {
	options.has_format[sql_type] = true;
	auto &date_format = options.date_format[sql_type];
	date_format.format_specifier = format_specifier;
	StrTimeFormat::ParseFormatSpecifier(format_string: date_format.format_specifier, format&: date_format);
}
77
// Probe helper: checks whether a string casts to a decimal with the given
// width/scale. The cast result itself is discarded — only success matters.
struct TryCastDecimalOperator {
	template <class OP, class T>
	static bool Operation(string_t input, uint8_t width, uint8_t scale) {
		T result;
		string error_message;
		return OP::Operation(input, result, &error_message, width, scale);
	}
};
86
// Probe helper: checks whether a string casts to a floating-point value.
// The cast result and error message are discarded — only success matters.
struct TryCastFloatingOperator {
	template <class OP, class T>
	static bool Operation(string_t input) {
		T result;
		string error_message;
		return OP::Operation(input, result, &error_message);
	}
};
95
96bool TryCastDecimalValueCommaSeparated(const string_t &value_str, const LogicalType &sql_type) {
97 auto width = DecimalType::GetWidth(type: sql_type);
98 auto scale = DecimalType::GetScale(type: sql_type);
99 switch (sql_type.InternalType()) {
100 case PhysicalType::INT16:
101 return TryCastDecimalOperator::Operation<TryCastToDecimalCommaSeparated, int16_t>(input: value_str, width, scale);
102 case PhysicalType::INT32:
103 return TryCastDecimalOperator::Operation<TryCastToDecimalCommaSeparated, int32_t>(input: value_str, width, scale);
104 case PhysicalType::INT64:
105 return TryCastDecimalOperator::Operation<TryCastToDecimalCommaSeparated, int64_t>(input: value_str, width, scale);
106 case PhysicalType::INT128:
107 return TryCastDecimalOperator::Operation<TryCastToDecimalCommaSeparated, hugeint_t>(input: value_str, width, scale);
108 default:
109 throw InternalException("Unimplemented physical type for decimal");
110 }
111}
112
113bool TryCastFloatingValueCommaSeparated(const string_t &value_str, const LogicalType &sql_type) {
114 switch (sql_type.InternalType()) {
115 case PhysicalType::DOUBLE:
116 return TryCastFloatingOperator::Operation<TryCastErrorMessageCommaSeparated, double>(input: value_str);
117 case PhysicalType::FLOAT:
118 return TryCastFloatingOperator::Operation<TryCastErrorMessageCommaSeparated, float>(input: value_str);
119 default:
120 throw InternalException("Unimplemented physical type for floating");
121 }
122}
123
// Checks whether a single parsed Value can be cast to sql_type, honoring a
// user-specified date/timestamp format and a non-period decimal separator
// when configured. NULL values are trivially castable. Returns true on
// success; the converted value itself is discarded.
bool BaseCSVReader::TryCastValue(const Value &value, const LogicalType &sql_type) {
	if (value.IsNull()) {
		return true;
	}
	if (options.has_format[LogicalTypeId::DATE] && sql_type.id() == LogicalTypeId::DATE) {
		// a user-specified DATE format takes precedence over the generic cast
		date_t result;
		string error_message;
		return options.date_format[LogicalTypeId::DATE].TryParseDate(str: string_t(StringValue::Get(value)), result,
		                                                             error_message);
	} else if (options.has_format[LogicalTypeId::TIMESTAMP] && sql_type.id() == LogicalTypeId::TIMESTAMP) {
		// a user-specified TIMESTAMP format takes precedence over the generic cast
		timestamp_t result;
		string error_message;
		return options.date_format[LogicalTypeId::TIMESTAMP].TryParseTimestamp(str: string_t(StringValue::Get(value)),
		                                                                       result, error_message);
	} else if (options.decimal_separator != "." && sql_type.id() == LogicalTypeId::DECIMAL) {
		// comma-separated decimals need the dedicated parse path
		return TryCastDecimalValueCommaSeparated(value_str: string_t(StringValue::Get(value)), sql_type);
	} else if (options.decimal_separator != "." &&
	           ((sql_type.id() == LogicalTypeId::FLOAT) || (sql_type.id() == LogicalTypeId::DOUBLE))) {
		return TryCastFloatingValueCommaSeparated(value_str: string_t(StringValue::Get(value)), sql_type);
	} else {
		// fall back to the general (strict) value cast
		Value new_value;
		string error_message;
		return value.TryCastAs(context, target_type: sql_type, new_value, error_message: &error_message, strict: true);
	}
}
149
// Vector-cast operator: parses a string into a date_t using the DATE format
// stored in the CSV options.
struct TryCastDateOperator {
	static bool Operation(BufferedCSVReaderOptions &options, string_t input, date_t &result, string &error_message) {
		return options.date_format[LogicalTypeId::DATE].TryParseDate(str: input, result, error_message);
	}
};
155
// Vector-cast operator: parses a string into a timestamp_t using the
// TIMESTAMP format stored in the CSV options.
struct TryCastTimestampOperator {
	static bool Operation(BufferedCSVReaderOptions &options, string_t input, timestamp_t &result,
	                      string &error_message) {
		return options.date_format[LogicalTypeId::TIMESTAMP].TryParseTimestamp(str: input, result, error_message);
	}
};
162
// Casts a VARCHAR vector to T (date_t/timestamp_t) row by row via OP, using
// the format in the options. Returns false if any row failed to parse; in
// that case line_error holds the chunk-relative index of the LAST failing
// row, and the (indeterminate) result value is still written to the output.
template <class OP, class T>
static bool TemplatedTryCastDateVector(BufferedCSVReaderOptions &options, Vector &input_vector, Vector &result_vector,
                                       idx_t count, string &error_message, idx_t &line_error) {
	D_ASSERT(input_vector.GetType().id() == LogicalTypeId::VARCHAR);
	bool all_converted = true;
	idx_t cur_line = 0;
	UnaryExecutor::Execute<string_t, T>(input_vector, result_vector, count, [&](string_t input) {
		T result;
		if (!OP::Operation(options, input, result, error_message)) {
			line_error = cur_line;
			all_converted = false;
		}
		// counted unconditionally, so line_error is a true row index
		cur_line++;
		return result;
	});
	return all_converted;
}
180
// Non-template entry point: cast a VARCHAR vector to DATE using the
// user-specified date format; reports the failing row index via line_error.
bool TryCastDateVector(BufferedCSVReaderOptions &options, Vector &input_vector, Vector &result_vector, idx_t count,
                       string &error_message, idx_t &line_error) {
	return TemplatedTryCastDateVector<TryCastDateOperator, date_t>(options, input_vector, result_vector, count,
	                                                               error_message, line_error);
}
186
187bool TryCastTimestampVector(BufferedCSVReaderOptions &options, Vector &input_vector, Vector &result_vector, idx_t count,
188 string &error_message) {
189 idx_t line_error;
190 return TemplatedTryCastDateVector<TryCastTimestampOperator, timestamp_t>(options, input_vector, result_vector,
191 count, error_message, line_error);
192}
193
194template <class OP, class T>
195bool TemplatedTryCastFloatingVector(BufferedCSVReaderOptions &options, Vector &input_vector, Vector &result_vector,
196 idx_t count, string &error_message, idx_t &line_error) {
197 D_ASSERT(input_vector.GetType().id() == LogicalTypeId::VARCHAR);
198 bool all_converted = true;
199 idx_t row = 0;
200 UnaryExecutor::Execute<string_t, T>(input_vector, result_vector, count, [&](string_t input) {
201 T result;
202 if (!OP::Operation(input, result, &error_message)) {
203 line_error = row;
204 all_converted = false;
205 } else {
206 row++;
207 }
208 return result;
209 });
210 return all_converted;
211}
212
// Casts a VARCHAR vector to a decimal of the given width/scale row by row via
// OP. Returns false if any row failed; no failing-row index is tracked here.
// NOTE(review): the options parameter is unused — kept, presumably, for
// signature symmetry with the other templated cast helpers.
template <class OP, class T>
bool TemplatedTryCastDecimalVector(BufferedCSVReaderOptions &options, Vector &input_vector, Vector &result_vector,
                                   idx_t count, string &error_message, uint8_t width, uint8_t scale) {
	D_ASSERT(input_vector.GetType().id() == LogicalTypeId::VARCHAR);
	bool all_converted = true;
	UnaryExecutor::Execute<string_t, T>(input_vector, result_vector, count, [&](string_t input) {
		T result;
		if (!OP::Operation(input, result, &error_message, width, scale)) {
			all_converted = false;
		}
		return result;
	});
	return all_converted;
}
227
// Probes whether an entire VARCHAR column of the parse chunk can be cast to
// sql_type (used during type sniffing). The converted values are written to a
// throwaway vector; only success/failure is returned. User-specified
// date/timestamp formats take precedence over the default cast.
bool BaseCSVReader::TryCastVector(Vector &parse_chunk_col, idx_t size, const LogicalType &sql_type) {
	// try vector-cast from string to sql_type
	Vector dummy_result(sql_type);
	if (options.has_format[LogicalTypeId::DATE] && sql_type == LogicalTypeId::DATE) {
		// use the date format to cast the chunk
		string error_message;
		idx_t line_error;
		return TryCastDateVector(options, input_vector&: parse_chunk_col, result_vector&: dummy_result, count: size, error_message, line_error);
	} else if (options.has_format[LogicalTypeId::TIMESTAMP] && sql_type == LogicalTypeId::TIMESTAMP) {
		// use the timestamp format to cast the chunk
		string error_message;
		return TryCastTimestampVector(options, input_vector&: parse_chunk_col, result_vector&: dummy_result, count: size, error_message);
	} else {
		// target type is not varchar: perform a cast
		string error_message;
		return VectorOperations::DefaultTryCast(source&: parse_chunk_col, result&: dummy_result, count: size, error_message: &error_message, strict: true);
	}
}
246
// Appends one parsed field to the current row of parse_chunk (column is the
// in-out cursor and is advanced on success). Handles the empty-row marker,
// trailing delimiters, column overflow, NULL-string matching, and the removal
// of in-field escape characters recorded in escape_positions.
void BaseCSVReader::AddValue(string_t str_val, idx_t &column, vector<idx_t> &escape_positions, bool has_quotes,
                             idx_t buffer_idx) {
	auto length = str_val.GetSize();
	// an empty value in the first column tentatively marks the row as empty;
	// any later value (or a non-empty first value) clears the marker
	if (length == 0 && column == 0) {
		row_empty = true;
	} else {
		row_empty = false;
	}
	if (!return_types.empty() && column == return_types.size() && length == 0) {
		// skip a single trailing delimiter in last column
		return;
	}
	if (mode == ParserMode::SNIFFING_DIALECT) {
		// dialect sniffing only counts columns; nothing is materialized
		column++;
		return;
	}
	if (column >= return_types.size()) {
		// more values than columns: either flag for AddRow (ignore_errors) or fail
		if (options.ignore_errors) {
			error_column_overflow = true;
			return;
		} else {
			throw InvalidInputException(
			    "Error in file \"%s\", on line %s: expected %lld values per row, but got more. (%s)", options.file_path,
			    GetLineNumberStr(line_error: linenr, is_line_estimated: linenr_estimated, buffer_idx).c_str(), return_types.size(),
			    options.ToString());
		}
	}

	// insert the line number into the chunk
	idx_t row_entry = parse_chunk.size();

	// test against null string, but only if the value was not quoted
	if ((!(has_quotes && !options.allow_quoted_nulls) || return_types[column].id() != LogicalTypeId::VARCHAR) &&
	    !options.force_not_null[column] && Equals::Operation(left: str_val, right: string_t(options.null_str))) {
		FlatVector::SetNull(vector&: parse_chunk.data[column], idx: row_entry, is_null: true);
	} else {
		auto &v = parse_chunk.data[column];
		auto parse_data = FlatVector::GetData<string_t>(vector&: v);
		if (!escape_positions.empty()) {
			// remove escape characters (if any)
			string old_val = str_val.GetString();
			string new_val = "";
			idx_t prev_pos = 0;
			for (idx_t i = 0; i < escape_positions.size(); i++) {
				idx_t next_pos = escape_positions[i];
				new_val += old_val.substr(pos: prev_pos, n: next_pos - prev_pos);

				// skip over the escape character itself; when escaping is done by
				// doubling the quote, the quote width applies instead
				if (options.escape.empty() || options.escape == options.quote) {
					prev_pos = next_pos + options.quote.size();
				} else {
					prev_pos = next_pos + options.escape.size();
				}
			}
			new_val += old_val.substr(pos: prev_pos, n: old_val.size() - prev_pos);
			escape_positions.clear();
			// unescaped value is a new string: copy it into the vector's heap
			parse_data[row_entry] = StringVector::AddStringOrBlob(vector&: v, data: string_t(new_val));
		} else {
			// no escapes: the string_t can reference the buffer directly
			parse_data[row_entry] = str_val;
		}
	}

	// move to the next column
	column++;
}
311
// Finalizes the current row. Handles empty rows, column-count mismatches
// (null padding / ignore / error), and per-mode bookkeeping. Returns true
// when the caller should stop and hand off the chunk (sample complete,
// header parsed, or a full STANDARD_VECTOR_SIZE chunk was flushed into
// insert_chunk); otherwise resets the column cursor and returns false.
bool BaseCSVReader::AddRow(DataChunk &insert_chunk, idx_t &column, string &error_message, idx_t buffer_idx) {
	linenr++;

	if (row_empty) {
		row_empty = false;
		if (return_types.size() != 1) {
			// multi-column files skip fully empty rows; single-column files keep
			// them (an empty line is a legitimate empty value there)
			if (mode == ParserMode::PARSING) {
				// NOTE(review): SetNull(..., false) marks the slot as NOT null —
				// presumably clearing a null set earlier for the empty value;
				// confirm against FlatVector::SetNull semantics
				FlatVector::SetNull(vector&: parse_chunk.data[0], idx: parse_chunk.size(), is_null: false);
			}
			column = 0;
			return false;
		}
	}

	// Error forwarded by 'ignore_errors' - originally encountered in 'AddValue'
	if (error_column_overflow) {
		D_ASSERT(options.ignore_errors);
		error_column_overflow = false;
		column = 0;
		return false;
	}

	if (column < return_types.size() && mode != ParserMode::SNIFFING_DIALECT) {
		// row ended with fewer values than columns
		if (options.null_padding) {
			// pad the remaining columns with NULLs
			for (; column < return_types.size(); column++) {
				FlatVector::SetNull(vector&: parse_chunk.data[column], idx: parse_chunk.size(), is_null: true);
			}
		} else if (options.ignore_errors) {
			column = 0;
			return false;
		} else {
			if (mode == ParserMode::SNIFFING_DATATYPES) {
				// during sniffing, report the failure instead of throwing
				error_message = "Error when adding line";
				return false;
			} else {
				throw InvalidInputException(
				    "Error in file \"%s\" on line %s: expected %lld values per row, but got %d.\nParser options:\n%s",
				    options.file_path, GetLineNumberStr(line_error: linenr, is_line_estimated: linenr_estimated, buffer_idx).c_str(),
				    return_types.size(), column, options.ToString());
			}
		}
	}

	if (mode == ParserMode::SNIFFING_DIALECT) {
		// dialect sniffing only records how many columns each row had
		sniffed_column_counts.push_back(x: column);

		if (sniffed_column_counts.size() == options.sample_chunk_size) {
			return true;
		}
	} else {
		parse_chunk.SetCardinality(parse_chunk.size() + 1);
	}

	if (mode == ParserMode::PARSING_HEADER) {
		// the header is a single row; stop after it
		return true;
	}

	if (mode == ParserMode::SNIFFING_DATATYPES && parse_chunk.size() == options.sample_chunk_size) {
		return true;
	}

	if (mode == ParserMode::PARSING && parse_chunk.size() == STANDARD_VECTOR_SIZE) {
		// chunk is full: convert and emit it
		Flush(insert_chunk, buffer_idx);
		return true;
	}

	column = 0;
	return false;
}
381
// Validates that the string at (col_idx, row_idx) of chunk is well-formed
// UTF-8; NULL entries are skipped. Throws InvalidInputException with the
// reconstructed file line number on invalid data. offset adjusts the line
// computation for callers whose chunk does not start at the current linenr.
void BaseCSVReader::VerifyUTF8(idx_t col_idx, idx_t row_idx, DataChunk &chunk, int64_t offset) {
	D_ASSERT(col_idx < chunk.data.size());
	D_ASSERT(row_idx < chunk.size());
	auto &v = chunk.data[col_idx];
	if (FlatVector::IsNull(vector: v, idx: row_idx)) {
		return;
	}

	auto parse_data = FlatVector::GetData<string_t>(vector&: chunk.data[col_idx]);
	auto s = parse_data[row_idx];
	auto utf_type = Utf8Proc::Analyze(s: s.GetData(), len: s.GetSize());
	if (utf_type == UnicodeType::INVALID) {
		// prefer the column name in the message; fall back to its index
		string col_name = to_string(val: col_idx);
		if (col_idx < names.size()) {
			col_name = "\"" + names[col_idx] + "\"";
		}
		// linenr is past the whole chunk, so walk back to this row's line
		int64_t error_line = linenr - (chunk.size() - row_idx) + 1 + offset;
		D_ASSERT(error_line >= 0);
		throw InvalidInputException("Error in file \"%s\" at line %llu in column \"%s\": "
		                            "%s. Parser options:\n%s",
		                            options.file_path, error_line, col_name,
		                            ErrorManager::InvalidUnicodeError(input: s.GetString(), context: "CSV file"), options.ToString());
	}
}
406
407void BaseCSVReader::VerifyUTF8(idx_t col_idx) {
408 D_ASSERT(col_idx < parse_chunk.data.size());
409 for (idx_t i = 0; i < parse_chunk.size(); i++) {
410 VerifyUTF8(col_idx, row_idx: i, chunk&: parse_chunk);
411 }
412}
413
414bool TryCastDecimalVectorCommaSeparated(BufferedCSVReaderOptions &options, Vector &input_vector, Vector &result_vector,
415 idx_t count, string &error_message, const LogicalType &result_type) {
416 auto width = DecimalType::GetWidth(type: result_type);
417 auto scale = DecimalType::GetScale(type: result_type);
418 switch (result_type.InternalType()) {
419 case PhysicalType::INT16:
420 return TemplatedTryCastDecimalVector<TryCastToDecimalCommaSeparated, int16_t>(
421 options, input_vector, result_vector, count, error_message, width, scale);
422 case PhysicalType::INT32:
423 return TemplatedTryCastDecimalVector<TryCastToDecimalCommaSeparated, int32_t>(
424 options, input_vector, result_vector, count, error_message, width, scale);
425 case PhysicalType::INT64:
426 return TemplatedTryCastDecimalVector<TryCastToDecimalCommaSeparated, int64_t>(
427 options, input_vector, result_vector, count, error_message, width, scale);
428 case PhysicalType::INT128:
429 return TemplatedTryCastDecimalVector<TryCastToDecimalCommaSeparated, hugeint_t>(
430 options, input_vector, result_vector, count, error_message, width, scale);
431 default:
432 throw InternalException("Unimplemented physical type for decimal");
433 }
434}
435
436bool TryCastFloatingVectorCommaSeparated(BufferedCSVReaderOptions &options, Vector &input_vector, Vector &result_vector,
437 idx_t count, string &error_message, const LogicalType &result_type,
438 idx_t &line_error) {
439 switch (result_type.InternalType()) {
440 case PhysicalType::DOUBLE:
441 return TemplatedTryCastFloatingVector<TryCastErrorMessageCommaSeparated, double>(
442 options, input_vector, result_vector, count, error_message, line_error);
443 case PhysicalType::FLOAT:
444 return TemplatedTryCastFloatingVector<TryCastErrorMessageCommaSeparated, float>(
445 options, input_vector, result_vector, count, error_message, line_error);
446 default:
447 throw InternalException("Unimplemented physical type for floating");
448 }
449}
450
// Converts the all-VARCHAR parse_chunk into insert_chunk using the target
// column types, applying the reader's column projection/mapping. On a cast
// failure: returns false when try_add_line is set (caller retries row by
// row), drops failing rows when ignore_errors is set, and otherwise throws
// with the reconstructed file line number. Resets parse_chunk on success.
bool BaseCSVReader::Flush(DataChunk &insert_chunk, idx_t buffer_idx, bool try_add_line) {
	if (parse_chunk.size() == 0) {
		return true;
	}

	bool conversion_error_ignored = false;

	// convert the columns in the parsed chunk to the types of the table
	insert_chunk.SetCardinality(parse_chunk);
	if (reader_data.column_ids.empty() && !reader_data.empty_columns) {
		throw InternalException("BaseCSVReader::Flush called on a CSV reader that was not correctly initialized. Call "
		                        "MultiFileReader::InitializeReader or InitializeProjection");
	}
	D_ASSERT(reader_data.column_ids.size() == reader_data.column_mapping.size());
	for (idx_t c = 0; c < reader_data.column_ids.size(); c++) {
		auto col_idx = reader_data.column_ids[c];
		auto result_idx = reader_data.column_mapping[c];
		auto &parse_vector = parse_chunk.data[col_idx];
		auto &result_vector = insert_chunk.data[result_idx];
		auto &type = result_vector.GetType();
		if (type.id() == LogicalTypeId::VARCHAR) {
			// target type is varchar: no need to convert
			// just test that all strings are valid utf-8 strings
			VerifyUTF8(col_idx);
			// reinterpret rather than reference so we can deal with user-defined types
			result_vector.Reinterpret(other&: parse_vector);
		} else {
			string error_message;
			bool success;
			idx_t line_error = 0;
			bool target_type_not_varchar = false;
			// pick the cast path: user date/timestamp formats and comma decimal
			// separators need special handling; everything else uses TryCast
			if (options.has_format[LogicalTypeId::DATE] && type.id() == LogicalTypeId::DATE) {
				// use the date format to cast the chunk
				success = TryCastDateVector(options, input_vector&: parse_vector, result_vector, count: parse_chunk.size(), error_message,
				                            line_error);
			} else if (options.has_format[LogicalTypeId::TIMESTAMP] && type.id() == LogicalTypeId::TIMESTAMP) {
				// use the date format to cast the chunk
				success =
				    TryCastTimestampVector(options, input_vector&: parse_vector, result_vector, count: parse_chunk.size(), error_message);
			} else if (options.decimal_separator != "." &&
			           (type.id() == LogicalTypeId::FLOAT || type.id() == LogicalTypeId::DOUBLE)) {
				success = TryCastFloatingVectorCommaSeparated(options, input_vector&: parse_vector, result_vector, count: parse_chunk.size(),
				                                              error_message, result_type: type, line_error);
			} else if (options.decimal_separator != "." && type.id() == LogicalTypeId::DECIMAL) {
				success = TryCastDecimalVectorCommaSeparated(options, input_vector&: parse_vector, result_vector, count: parse_chunk.size(),
				                                             error_message, result_type: type);
			} else {
				// target type is not varchar: perform a cast
				target_type_not_varchar = true;
				success =
				    VectorOperations::TryCast(context, source&: parse_vector, result&: result_vector, count: parse_chunk.size(), error_message: &error_message);
			}
			if (success) {
				continue;
			}
			if (try_add_line) {
				// caller wants to retry line by line rather than fail the chunk
				return false;
			}
			if (options.ignore_errors) {
				// defer: failing rows are filtered out after all columns are cast
				conversion_error_ignored = true;
				continue;
			}
			string col_name = to_string(val: col_idx);
			if (col_idx < names.size()) {
				col_name = "\"" + names[col_idx] + "\"";
			}

			// figure out the exact line number
			if (target_type_not_varchar) {
				// generic TryCast does not report a row index; find the first row
				// that became NULL in the result without being NULL in the input
				UnifiedVectorFormat inserted_column_data;
				result_vector.ToUnifiedFormat(count: parse_chunk.size(), data&: inserted_column_data);
				for (; line_error < parse_chunk.size(); line_error++) {
					if (!inserted_column_data.validity.RowIsValid(row_idx: line_error) &&
					    !FlatVector::IsNull(vector: parse_vector, idx: line_error)) {
						break;
					}
				}
			}

			idx_t error_line;
			// The line_error must be summed with linenr (All lines emmited from this batch)
			// But subtracted from the parse_chunk
			D_ASSERT(line_error + linenr >= parse_chunk.size());
			line_error += linenr;
			line_error -= parse_chunk.size();

			error_line = GetLineError(line_error, buffer_idx);

			if (options.auto_detect) {
				throw InvalidInputException("%s in column %s, at line %llu.\n\nParser "
				                            "options:\n%s.\n\nConsider either increasing the sample size "
				                            "(SAMPLE_SIZE=X [X rows] or SAMPLE_SIZE=-1 [all rows]), "
				                            "or skipping column conversion (ALL_VARCHAR=1)",
				                            error_message, col_name, error_line, options.ToString());
			} else {
				throw InvalidInputException("%s at line %llu in column %s. Parser options:\n%s ", error_message,
				                            error_line, col_name, options.ToString());
			}
		}
	}
	if (conversion_error_ignored) {
		// keep only rows where no column turned NULL during casting
		D_ASSERT(options.ignore_errors);
		SelectionVector succesful_rows(parse_chunk.size());
		idx_t sel_size = 0;

		for (idx_t row_idx = 0; row_idx < parse_chunk.size(); row_idx++) {
			bool failed = false;
			for (idx_t c = 0; c < reader_data.column_ids.size(); c++) {
				auto col_idx = reader_data.column_ids[c];
				auto result_idx = reader_data.column_mapping[c];

				auto &parse_vector = parse_chunk.data[col_idx];
				auto &result_vector = insert_chunk.data[result_idx];

				// a row failed if the cast produced NULL from a non-NULL input
				bool was_already_null = FlatVector::IsNull(vector: parse_vector, idx: row_idx);
				if (!was_already_null && FlatVector::IsNull(vector: result_vector, idx: row_idx)) {
					failed = true;
					break;
				}
			}
			if (!failed) {
				succesful_rows.set_index(idx: sel_size++, loc: row_idx);
			}
		}
		insert_chunk.Slice(sel_vector: succesful_rows, count: sel_size);
	}
	parse_chunk.Reset();
	return true;
}
580
581void BaseCSVReader::SetNewLineDelimiter(bool carry, bool carry_followed_by_nl) {
582 if ((mode == ParserMode::SNIFFING_DIALECT && !options.has_newline) ||
583 options.new_line == NewLineIdentifier::NOT_SET) {
584 if (options.new_line == NewLineIdentifier::MIX) {
585 return;
586 }
587 NewLineIdentifier this_line_identifier;
588 if (carry) {
589 if (carry_followed_by_nl) {
590 this_line_identifier = NewLineIdentifier::CARRY_ON;
591 } else {
592 this_line_identifier = NewLineIdentifier::SINGLE;
593 }
594 } else {
595 this_line_identifier = NewLineIdentifier::SINGLE;
596 }
597 if (options.new_line == NewLineIdentifier::NOT_SET) {
598 options.new_line = this_line_identifier;
599 return;
600 }
601 if (options.new_line != this_line_identifier) {
602 options.new_line = NewLineIdentifier::MIX;
603 return;
604 }
605 options.new_line = this_line_identifier;
606 }
607}
608} // namespace duckdb
609