#include "duckdb/execution/operator/persistent/buffered_csv_reader.hpp"

#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
#include "duckdb/common/file_system.hpp"
#include "duckdb/common/string_util.hpp"
#include "duckdb/common/to_string.hpp"
#include "duckdb/common/types/cast_helpers.hpp"
#include "duckdb/common/vector_operations/unary_executor.hpp"
#include "duckdb/common/vector_operations/vector_operations.hpp"
#include "duckdb/function/scalar/strftime_format.hpp"
#include "duckdb/main/database.hpp"
#include "duckdb/parser/column_definition.hpp"
#include "duckdb/storage/data_table.hpp"
#include "utf8proc_wrapper.hpp"
#include "utf8proc.hpp"
#include "duckdb/parser/keyword_helper.hpp"
#include "duckdb/main/error_manager.hpp"
#include "duckdb/main/client_data.hpp"

#include <algorithm>
#include <cctype>
#include <cstring>
#include <fstream>
namespace duckdb {

BufferedCSVReader::BufferedCSVReader(ClientContext &context, BufferedCSVReaderOptions options_p,
                                     const vector<LogicalType> &requested_types)
    : BaseCSVReader(context, std::move(options_p), requested_types), buffer_size(0), position(0), start(0) {
    file_handle = OpenCSV(options);
    Initialize(requested_types);
}

BufferedCSVReader::BufferedCSVReader(ClientContext &context, string filename, BufferedCSVReaderOptions options_p,
                                     const vector<LogicalType> &requested_types)
    : BaseCSVReader(context, std::move(options_p), requested_types), buffer_size(0), position(0), start(0) {
    options.file_path = std::move(filename);
    file_handle = OpenCSV(options);
    Initialize(requested_types);
}

enum class QuoteRule : uint8_t { QUOTES_RFC = 0, QUOTES_OTHER = 1, NO_QUOTES = 2 };
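// Quote-rule semantics, as used by the dialect sniffer in DetectDialect below:
//   QUOTES_RFC   - RFC 4180-style quoting, where a quote inside a quoted field
//                  is escaped by doubling it (escape candidate is "")
//   QUOTES_OTHER - quoted fields with a separate escape character such as '\'
//   NO_QUOTES    - fields are never quoted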

static bool StartsWithNumericDate(string &separator, const string &value) {
    auto begin = value.c_str();
    auto end = begin + value.size();

    // StrpTimeFormat::Parse will skip whitespace, so we can too
    auto field1 = std::find_if_not(begin, end, StringUtil::CharacterIsSpace);
    if (field1 == end) {
        return false;
    }

    // first numeric field must start immediately
    if (!StringUtil::CharacterIsDigit(*field1)) {
        return false;
    }
    auto literal1 = std::find_if_not(field1, end, StringUtil::CharacterIsDigit);
    if (literal1 == end) {
        return false;
    }

    // second numeric field must exist
    auto field2 = std::find_if(literal1, end, StringUtil::CharacterIsDigit);
    if (field2 == end) {
        return false;
    }
    auto literal2 = std::find_if_not(field2, end, StringUtil::CharacterIsDigit);
    if (literal2 == end) {
        return false;
    }

    // third numeric field must exist
    auto field3 = std::find_if(literal2, end, StringUtil::CharacterIsDigit);
    if (field3 == end) {
        return false;
    }

    // second literal must match first
    if (((field3 - literal2) != (field2 - literal1)) || strncmp(literal1, literal2, (field2 - literal1)) != 0) {
        return false;
    }

    // copy the literal as the separator, escaping percent signs
    separator.clear();
    while (literal1 < field2) {
        const auto literal_char = *literal1++;
        if (literal_char == '%') {
            separator.push_back(literal_char);
        }
        separator.push_back(literal_char);
    }

    return true;
}
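
// Example: "20/08/2019" has the numeric fields 20, 08 and 2019 with the same
// literal "/" between them, so the function returns true with separator "/".
// Any '%' inside the literal is doubled so the separator can later be spliced
// into a strftime-style format template verbatim.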
96
97string GenerateDateFormat(const string &separator, const char *format_template) {
98 string format_specifier = format_template;
99 auto amount_of_dashes = std::count(first: format_specifier.begin(), last: format_specifier.end(), value: '-');
100 if (!amount_of_dashes) {
101 return format_specifier;
102 }
103 string result;
104 result.reserve(res_arg: format_specifier.size() - amount_of_dashes + (amount_of_dashes * separator.size()));
105 for (auto &character : format_specifier) {
106 if (character == '-') {
107 result += separator;
108 } else {
109 result += character;
110 }
111 }
112 return result;
113}
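
// Example: GenerateDateFormat("/", "%d-%m-%Y") yields "%d/%m/%Y"; a template
// without dashes is returned unchanged.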
114
115TextSearchShiftArray::TextSearchShiftArray() {
116}
117
118TextSearchShiftArray::TextSearchShiftArray(string search_term) : length(search_term.size()) {
119 if (length > 255) {
120 throw InvalidInputException("Size of delimiter/quote/escape in CSV reader is limited to 255 bytes");
121 }
122 // initialize the shifts array
123 shifts = unique_ptr<uint8_t[]>(new uint8_t[length * 255]);
124 memset(s: shifts.get(), c: 0, n: length * 255 * sizeof(uint8_t));
125 // iterate over each of the characters in the array
126 for (idx_t main_idx = 0; main_idx < length; main_idx++) {
127 uint8_t current_char = (uint8_t)search_term[main_idx];
128 // now move over all the remaining positions
129 for (idx_t i = main_idx; i < length; i++) {
130 bool is_match = true;
131 // check if the prefix matches at this position
132 // if it does, we move to this position after encountering the current character
133 for (idx_t j = 0; j < main_idx; j++) {
134 if (search_term[i - main_idx + j] != search_term[j]) {
135 is_match = false;
136 }
137 }
138 if (!is_match) {
139 continue;
140 }
141 shifts[i * 255 + current_char] = main_idx + 1;
142 }
143 }
144}
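
// The shifts table encodes a byte-level matching automaton over the search
// term: state N means "the first N bytes of the term have been matched", and
// entry [N * 255 + byte] holds the state reached after consuming that byte
// (unset entries fall back to state 0). Match() advances one byte at a time;
// reaching state == length signals a complete match. For the term "ab", for
// example, 'a' moves state 0 to 1 and 'b' moves state 1 to 2 (a full match).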
145
146// Helper function to generate column names
147static string GenerateColumnName(const idx_t total_cols, const idx_t col_number, const string &prefix = "column") {
148 int max_digits = NumericHelper::UnsignedLength(value: total_cols - 1);
149 int digits = NumericHelper::UnsignedLength(value: col_number);
150 string leading_zeros = string(max_digits - digits, '0');
151 string value = to_string(val: col_number);
152 return string(prefix + leading_zeros + value);
153}
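
// Example: with 12 columns in total, column 3 is named "column03"; the
// zero-padding makes the generated names sort lexicographically in column order.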

// Helper function for UTF-8 aware space trimming
static string TrimWhitespace(const string &col_name) {
    utf8proc_int32_t codepoint;
    auto str = reinterpret_cast<const utf8proc_uint8_t *>(col_name.c_str());
    idx_t size = col_name.size();
    // Find the first character that is not left trimmed
    idx_t begin = 0;
    while (begin < size) {
        auto bytes = utf8proc_iterate(str + begin, size - begin, &codepoint);
        D_ASSERT(bytes > 0);
        if (utf8proc_category(codepoint) != UTF8PROC_CATEGORY_ZS) {
            break;
        }
        begin += bytes;
    }

    // Find the last character that is not right trimmed
    idx_t end;
    end = begin;
    for (auto next = begin; next < col_name.size();) {
        auto bytes = utf8proc_iterate(str + next, size - next, &codepoint);
        D_ASSERT(bytes > 0);
        next += bytes;
        if (utf8proc_category(codepoint) != UTF8PROC_CATEGORY_ZS) {
            end = next;
        }
    }

    // return the trimmed string
    return col_name.substr(begin, end - begin);
}
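
// Unlike ::isspace-based trimming, this strips any Unicode "space separator"
// (category Zs), e.g. the no-break space U+00A0, while leaving multi-byte
// characters inside the name intact.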

static string NormalizeColumnName(const string &col_name) {
    // normalize UTF8 characters to NFKD
    auto nfkd = utf8proc_NFKD(reinterpret_cast<const utf8proc_uint8_t *>(col_name.c_str()), col_name.size());
    const string col_name_nfkd = string(const_char_ptr_cast(nfkd), strlen(const_char_ptr_cast(nfkd)));
    free(nfkd);

    // only keep ASCII characters 0-9, a-z, A-Z and underscore; replace any whitespace with a regular space
    string col_name_ascii = "";
    for (idx_t i = 0; i < col_name_nfkd.size(); i++) {
        if (col_name_nfkd[i] == '_' || (col_name_nfkd[i] >= '0' && col_name_nfkd[i] <= '9') ||
            (col_name_nfkd[i] >= 'A' && col_name_nfkd[i] <= 'Z') ||
            (col_name_nfkd[i] >= 'a' && col_name_nfkd[i] <= 'z')) {
            col_name_ascii += col_name_nfkd[i];
        } else if (StringUtil::CharacterIsSpace(col_name_nfkd[i])) {
            col_name_ascii += " ";
        }
    }

    // trim whitespace and replace remaining whitespace by _
    string col_name_trimmed = TrimWhitespace(col_name_ascii);
    string col_name_cleaned = "";
    bool in_whitespace = false;
    for (idx_t i = 0; i < col_name_trimmed.size(); i++) {
        if (col_name_trimmed[i] == ' ') {
            if (!in_whitespace) {
                col_name_cleaned += "_";
                in_whitespace = true;
            }
        } else {
            col_name_cleaned += col_name_trimmed[i];
            in_whitespace = false;
        }
    }

    // don't leave the string empty; if not empty, make it lowercase
    if (col_name_cleaned.empty()) {
        col_name_cleaned = "_";
    } else {
        col_name_cleaned = StringUtil::Lower(col_name_cleaned);
    }

    // prepend _ if the name starts with a digit or is a reserved keyword
    if (KeywordHelper::IsKeyword(col_name_cleaned) || (col_name_cleaned[0] >= '0' && col_name_cleaned[0] <= '9')) {
        col_name_cleaned = "_" + col_name_cleaned;
    }
    return col_name_cleaned;
}
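
// Examples: "Total Sales (USD)" normalizes to "total_sales_usd", and
// "1st column" becomes "_1st_column" because names may not start with a digit.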

void BufferedCSVReader::Initialize(const vector<LogicalType> &requested_types) {
    PrepareComplexParser();
    if (options.auto_detect) {
        return_types = SniffCSV(requested_types);
        if (return_types.empty()) {
            throw InvalidInputException("Failed to detect column types from CSV: is the file a valid CSV file?");
        }
        JumpToBeginning(options.skip_rows, options.header);
    } else {
        return_types = requested_types;
        ResetBuffer();
        SkipRowsAndReadHeader(options.skip_rows, options.header);
    }
    InitParseChunk(return_types.size());
}

void BufferedCSVReader::ResetBuffer() {
    buffer.reset();
    buffer_size = 0;
    position = 0;
    start = 0;
    cached_buffers.clear();
}

void BufferedCSVReader::ResetStream() {
    file_handle->Reset();
    linenr = 0;
    linenr_estimated = false;
    bytes_per_line_avg = 0;
    sample_chunk_idx = 0;
    jumping_samples = false;
}

void BufferedCSVReader::JumpToBeginning(idx_t skip_rows = 0, bool skip_header = false) {
    ResetBuffer();
    ResetStream();
    sample_chunk_idx = 0;
    bytes_in_chunk = 0;
    end_of_file_reached = false;
    bom_checked = false;
    SkipRowsAndReadHeader(skip_rows, skip_header);
}

void BufferedCSVReader::SkipRowsAndReadHeader(idx_t skip_rows, bool skip_header) {
    for (idx_t i = 0; i < skip_rows; i++) {
        // ignore skip rows
        string read_line = file_handle->ReadLine();
        linenr++;
    }

    if (skip_header) {
        // ignore the first line as a header line
        InitParseChunk(return_types.size());
        ParseCSV(ParserMode::PARSING_HEADER);
    }
}

void BufferedCSVReader::PrepareComplexParser() {
    delimiter_search = TextSearchShiftArray(options.delimiter);
    escape_search = TextSearchShiftArray(options.escape);
    quote_search = TextSearchShiftArray(options.quote);
}
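
// These three shift arrays drive the multi-byte matching in
// TryParseComplexCSV; the simple parser compares single bytes directly and
// does not use them.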

bool BufferedCSVReader::JumpToNextSample() {
    // get bytes contained in the previously read chunk
    idx_t remaining_bytes_in_buffer = buffer_size - start;
    bytes_in_chunk -= remaining_bytes_in_buffer;
    if (remaining_bytes_in_buffer == 0) {
        return false;
    }

    // assess whether it makes sense to jump, based on the size of the first chunk relative to the entire file
    if (sample_chunk_idx == 0) {
        idx_t bytes_first_chunk = bytes_in_chunk;
        double chunks_fit = (file_handle->FileSize() / (double)bytes_first_chunk);
        jumping_samples = chunks_fit >= options.sample_chunks;

        // jump back to the beginning
        JumpToBeginning(options.skip_rows, options.header);
        sample_chunk_idx++;
        return true;
    }

    if (end_of_file_reached || sample_chunk_idx >= options.sample_chunks) {
        return false;
    }

    // for sources other than plain on-disk files, jumping can be tricky; in that case
    // we just read the chunks consecutively from the stream. TODO: make jumps possible for zip files.
    if (!file_handle->OnDiskFile() || !jumping_samples) {
        sample_chunk_idx++;
        return true;
    }

    // update the average bytes per line
    double bytes_per_line = bytes_in_chunk / (double)options.sample_chunk_size;
    bytes_per_line_avg = ((bytes_per_line_avg * (sample_chunk_idx)) + bytes_per_line) / (sample_chunk_idx + 1);

    // if none of the previous conditions were met, we can jump
    idx_t partition_size = (idx_t)round(file_handle->FileSize() / (double)options.sample_chunks);

    // calculate the offset to the end of the current partition
    int64_t offset = partition_size - bytes_in_chunk - remaining_bytes_in_buffer;
    auto current_pos = file_handle->SeekPosition();

    if (current_pos + offset < file_handle->FileSize()) {
        // set the position in the stream and clear failure bits
        file_handle->Seek(current_pos + offset);

        // estimate linenr
        linenr += (idx_t)round((offset + remaining_bytes_in_buffer) / bytes_per_line_avg);
        linenr_estimated = true;
    } else {
        // seek backwards from the end in the last chunk and hope to catch the end of the file
        // TODO: it would be good to make sure that the end of the file is actually reached, because
        // messy end-lines are quite common. For this case, however, we first need skip_end detection anyway.
        file_handle->Seek(file_handle->FileSize() - bytes_in_chunk);

        // estimate linenr
        linenr = (idx_t)round((file_handle->FileSize() - bytes_in_chunk) / bytes_per_line_avg);
        linenr_estimated = true;
    }

    // reset buffers and parse chunk
    ResetBuffer();

    // seek to the beginning of the next line
    // FIXME: if this jump ends up in a quoted linebreak, we will have a problem
    string read_line = file_handle->ReadLine();
    linenr++;

    sample_chunk_idx++;

    return true;
}
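
// Sampling sketch (values here are illustrative): with a 100 MB file and
// options.sample_chunks = 10, the file is divided into ten ~10 MB partitions
// and one sample chunk is parsed at the start of each, so type detection sees
// rows from the whole file rather than just its head.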

void BufferedCSVReader::DetectDialect(const vector<LogicalType> &requested_types,
                                      BufferedCSVReaderOptions &original_options,
                                      vector<BufferedCSVReaderOptions> &info_candidates, idx_t &best_num_cols) {
    // set up the candidates we consider for delimiter and quote rules based on user input
    vector<string> delim_candidates;
    vector<QuoteRule> quoterule_candidates;
    vector<vector<string>> quote_candidates_map;
    vector<vector<string>> escape_candidates_map = {{""}, {"\\"}, {""}};

    if (options.has_delimiter) {
        // the user provided a delimiter: use that delimiter
        delim_candidates = {options.delimiter};
    } else {
        // no delimiter provided: try standard/common delimiters
        delim_candidates = {",", "|", ";", "\t"};
    }
    if (options.has_quote) {
        // the user provided a quote: use that quote rule
        quote_candidates_map = {{options.quote}, {options.quote}, {options.quote}};
    } else {
        // no quote rule provided: use standard/common quotes
        quote_candidates_map = {{"\""}, {"\"", "'"}, {""}};
    }
    if (options.has_escape) {
        // the user provided an escape: use that escape rule
        if (options.escape.empty()) {
            quoterule_candidates = {QuoteRule::QUOTES_RFC};
        } else {
            quoterule_candidates = {QuoteRule::QUOTES_OTHER};
        }
        escape_candidates_map[static_cast<uint8_t>(quoterule_candidates[0])] = {options.escape};
    } else {
        // no escape provided: try standard/common escapes
        quoterule_candidates = {QuoteRule::QUOTES_RFC, QuoteRule::QUOTES_OTHER, QuoteRule::NO_QUOTES};
    }

    idx_t best_consistent_rows = 0;
    idx_t prev_padding_count = 0;
    for (auto quoterule : quoterule_candidates) {
        const auto &quote_candidates = quote_candidates_map[static_cast<uint8_t>(quoterule)];
        for (const auto &quote : quote_candidates) {
            for (const auto &delim : delim_candidates) {
                const auto &escape_candidates = escape_candidates_map[static_cast<uint8_t>(quoterule)];
                for (const auto &escape : escape_candidates) {
                    BufferedCSVReaderOptions sniff_info = original_options;
                    sniff_info.delimiter = delim;
                    sniff_info.quote = quote;
                    sniff_info.escape = escape;

                    options = sniff_info;
                    PrepareComplexParser();

                    JumpToBeginning(original_options.skip_rows);
                    sniffed_column_counts.clear();
                    if (!TryParseCSV(ParserMode::SNIFFING_DIALECT)) {
                        continue;
                    }

                    idx_t start_row = original_options.skip_rows;
                    idx_t consistent_rows = 0;
                    idx_t num_cols = sniffed_column_counts.empty() ? 0 : sniffed_column_counts[0];
                    idx_t padding_count = 0;
                    bool allow_padding = original_options.null_padding;
                    for (idx_t row = 0; row < sniffed_column_counts.size(); row++) {
                        if (sniffed_column_counts[row] == num_cols) {
                            consistent_rows++;
                        } else if (num_cols < sniffed_column_counts[row] && !original_options.skip_rows_set) {
                            // we use the maximum number of columns that we find
                            num_cols = sniffed_column_counts[row];
                            start_row = row + original_options.skip_rows;
                            consistent_rows = 1;
                            padding_count = 0;
                        } else if (num_cols >= sniffed_column_counts[row] && allow_padding) {
                            // we are missing some columns; we can parse this as long as we add padding
                            padding_count++;
                        }
                    }

                    // score this dialect candidate against the best one found so far
                    consistent_rows += padding_count;
                    bool more_values = (consistent_rows > best_consistent_rows && num_cols >= best_num_cols);
                    bool require_more_padding = padding_count > prev_padding_count;
                    bool require_less_padding = padding_count < prev_padding_count;
                    bool single_column_before = best_num_cols < 2 && num_cols > best_num_cols;
                    bool rows_consistent =
                        start_row + consistent_rows - original_options.skip_rows == sniffed_column_counts.size();
                    bool more_than_one_row = (consistent_rows > 1);
                    bool more_than_one_column = (num_cols > 1);
                    bool start_good = !info_candidates.empty() && (start_row <= info_candidates.front().skip_rows);

                    if (!requested_types.empty() && requested_types.size() != num_cols) {
                        continue;
                    } else if (rows_consistent && (single_column_before || (more_values && !require_more_padding) ||
                                                   (more_than_one_column && require_less_padding))) {
                        sniff_info.skip_rows = start_row;
                        sniff_info.num_cols = num_cols;
                        sniff_info.new_line = options.new_line;
                        best_consistent_rows = consistent_rows;
                        best_num_cols = num_cols;
                        prev_padding_count = padding_count;

                        info_candidates.clear();
                        info_candidates.push_back(sniff_info);
                    } else if (more_than_one_row && more_than_one_column && start_good && rows_consistent &&
                               !require_more_padding) {
                        bool same_quote_is_candidate = false;
                        for (auto &info_candidate : info_candidates) {
                            if (quote.compare(info_candidate.quote) == 0) {
                                same_quote_is_candidate = true;
                            }
                        }
                        if (!same_quote_is_candidate) {
                            sniff_info.skip_rows = start_row;
                            sniff_info.num_cols = num_cols;
                            sniff_info.new_line = options.new_line;
                            info_candidates.push_back(sniff_info);
                        }
                    }
                }
            }
        }
    }
}
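
// With no user-supplied options, the search space enumerated above is:
// QUOTES_RFC (quote '"', escape "") x 4 delimiters, QUOTES_OTHER (quotes '"'
// and '\'', escape '\') x 4 delimiters, and NO_QUOTES x 4 delimiters - 16
// combinations in total. Each candidate is parsed in SNIFFING_DIALECT mode and
// scored by how many rows produce a consistent column count.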

void BufferedCSVReader::DetectCandidateTypes(const vector<LogicalType> &type_candidates,
                                             const map<LogicalTypeId, vector<const char *>> &format_template_candidates,
                                             const vector<BufferedCSVReaderOptions> &info_candidates,
                                             BufferedCSVReaderOptions &original_options, idx_t best_num_cols,
                                             vector<vector<LogicalType>> &best_sql_types_candidates,
                                             std::map<LogicalTypeId, vector<string>> &best_format_candidates,
                                             DataChunk &best_header_row) {
    BufferedCSVReaderOptions best_options;
    idx_t min_varchar_cols = best_num_cols + 1;

    // check which info candidate leads to the minimum number of varchar columns...
    for (const auto &t : format_template_candidates) {
        best_format_candidates[t.first].clear();
    }
    for (auto &info_candidate : info_candidates) {
        options = info_candidate;
        vector<vector<LogicalType>> info_sql_types_candidates(options.num_cols, type_candidates);
        std::map<LogicalTypeId, bool> has_format_candidates;
        std::map<LogicalTypeId, vector<string>> format_candidates;
        for (const auto &t : format_template_candidates) {
            has_format_candidates[t.first] = false;
            format_candidates[t.first].clear();
        }

        // set all return_types to VARCHAR so we can do datatype detection based on VARCHAR values
        return_types.clear();
        return_types.assign(options.num_cols, LogicalType::VARCHAR);

        // jump to the beginning and skip the potential header
        JumpToBeginning(options.skip_rows, true);
        DataChunk header_row;
        header_row.Initialize(allocator, return_types);
        parse_chunk.Copy(header_row);

        if (header_row.size() == 0) {
            continue;
        }

        // init the parse chunk and read the CSV with the info candidate
        InitParseChunk(return_types.size());
        if (!TryParseCSV(ParserMode::SNIFFING_DATATYPES)) {
            continue;
        }
        for (idx_t row_idx = 0; row_idx <= parse_chunk.size(); row_idx++) {
            bool is_header_row = row_idx == 0;
            idx_t row = row_idx - 1;
            for (idx_t col = 0; col < parse_chunk.ColumnCount(); col++) {
                auto &col_type_candidates = info_sql_types_candidates[col];
                while (col_type_candidates.size() > 1) {
                    const auto &sql_type = col_type_candidates.back();
                    // try cast from string to sql_type
                    Value dummy_val;
                    if (is_header_row) {
                        VerifyUTF8(col, 0, header_row, -int64_t(parse_chunk.size()));
                        dummy_val = header_row.GetValue(col, 0);
                    } else {
                        VerifyUTF8(col, row, parse_chunk);
                        dummy_val = parse_chunk.GetValue(col, row);
                    }
                    // try formatting for date types if the user did not specify one and the value starts with numeric fields
                    string separator;
                    if (has_format_candidates.count(sql_type.id()) && !original_options.has_format[sql_type.id()] &&
                        !dummy_val.IsNull() && StartsWithNumericDate(separator, StringValue::Get(dummy_val))) {
                        // generate date format candidates the first time through
                        auto &type_format_candidates = format_candidates[sql_type.id()];
                        const auto had_format_candidates = has_format_candidates[sql_type.id()];
                        if (!has_format_candidates[sql_type.id()]) {
                            has_format_candidates[sql_type.id()] = true;
                            // order by preference
                            auto entry = format_template_candidates.find(sql_type.id());
                            if (entry != format_template_candidates.end()) {
                                const auto &format_template_list = entry->second;
                                for (const auto &t : format_template_list) {
                                    const auto format_string = GenerateDateFormat(separator, t);
                                    // don't parse ISO 8601
                                    if (format_string.find("%Y-%m-%d") == string::npos) {
                                        type_format_candidates.emplace_back(format_string);
                                    }
                                }
                            }
                            // initialise the first candidate
                            options.has_format[sql_type.id()] = true;
                            // all formats are constructed to be valid
                            SetDateFormat(type_format_candidates.back(), sql_type.id());
                        }
                        // check all formats and keep the first one that works
                        StrpTimeFormat::ParseResult result;
                        auto save_format_candidates = type_format_candidates;
                        while (!type_format_candidates.empty()) {
                            // avoid using exceptions for flow control...
                            auto &current_format = options.date_format[sql_type.id()];
                            if (current_format.Parse(StringValue::Get(dummy_val), result)) {
                                break;
                            }
                            // doesn't work - move to the next one
                            type_format_candidates.pop_back();
                            options.has_format[sql_type.id()] = (!type_format_candidates.empty());
                            if (!type_format_candidates.empty()) {
                                SetDateFormat(type_format_candidates.back(), sql_type.id());
                            }
                        }
                        // if none match, then this is not a value of type sql_type,
                        if (type_format_candidates.empty()) {
                            // so restore the candidates that did work,
                            // or throw them out if they were generated by this value.
                            if (had_format_candidates) {
                                type_format_candidates.swap(save_format_candidates);
                                if (!type_format_candidates.empty()) {
                                    SetDateFormat(type_format_candidates.back(), sql_type.id());
                                }
                            } else {
                                has_format_candidates[sql_type.id()] = false;
                            }
                        }
                    }
                    // try cast from string to sql_type
                    if (TryCastValue(dummy_val, sql_type)) {
                        break;
                    } else {
                        col_type_candidates.pop_back();
                    }
                }
            }
            // reset type detection, because the first row could be a header,
            // but only do it if the CSV has more than one line (including the header)
            if (parse_chunk.size() > 0 && is_header_row) {
                info_sql_types_candidates = vector<vector<LogicalType>>(options.num_cols, type_candidates);
                for (auto &f : format_candidates) {
                    f.second.clear();
                }
                for (auto &h : has_format_candidates) {
                    h.second = false;
                }
            }
        }

        idx_t varchar_cols = 0;
        for (idx_t col = 0; col < parse_chunk.ColumnCount(); col++) {
            auto &col_type_candidates = info_sql_types_candidates[col];
            // check the number of varchar columns
            const auto &col_type = col_type_candidates.back();
            if (col_type == LogicalType::VARCHAR) {
                varchar_cols++;
            }
        }

        // it's good if the dialect creates more non-varchar columns, but only if we sacrifice < 30% of best_num_cols.
        if (varchar_cols < min_varchar_cols && parse_chunk.ColumnCount() > (best_num_cols * 0.7)) {
            // we have a new best_options candidate
            best_options = info_candidate;
            min_varchar_cols = varchar_cols;
            best_sql_types_candidates = info_sql_types_candidates;
            best_format_candidates = format_candidates;
            best_header_row.Destroy();
            auto header_row_types = header_row.GetTypes();
            best_header_row.Initialize(allocator, header_row_types);
            header_row.Copy(best_header_row);
        }
    }

    options = best_options;
    for (const auto &best : best_format_candidates) {
        if (!best.second.empty()) {
            SetDateFormat(best.second.back(), best.first);
        }
    }
}
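
// Per-column type candidates behave as a stack: the candidate at the back is
// tried first and a failed cast pops it, falling back to a more general type.
// RefineTypeDetection later resolves a column whose candidate list was never
// narrowed at all to VARCHAR.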

void BufferedCSVReader::DetectHeader(const vector<vector<LogicalType>> &best_sql_types_candidates,
                                     const DataChunk &best_header_row) {
    // information for header detection
    bool first_row_consistent = true;
    bool first_row_nulls = false;

    // check whether the header row is all null and/or consistent with the detected column data types
    first_row_nulls = true;
    for (idx_t col = 0; col < best_sql_types_candidates.size(); col++) {
        auto dummy_val = best_header_row.GetValue(col, 0);
        if (!dummy_val.IsNull()) {
            first_row_nulls = false;
        }

        // try cast to the sql_type of the column
        const auto &sql_type = best_sql_types_candidates[col].back();
        if (!TryCastValue(dummy_val, sql_type)) {
            first_row_consistent = false;
        }
    }

    // update parser info, and read, generate & set col_names based on previous findings
    if (((!first_row_consistent || first_row_nulls) && !options.has_header) || (options.has_header && options.header)) {
        options.header = true;
        case_insensitive_map_t<idx_t> name_collision_count;
        // get the header names from the CSV
        for (idx_t col = 0; col < options.num_cols; col++) {
            const auto &val = best_header_row.GetValue(col, 0);
            string col_name = val.ToString();

            // generate a name if the field is empty
            if (col_name.empty() || val.IsNull()) {
                col_name = GenerateColumnName(options.num_cols, col);
            }

            // normalize names or at least trim whitespace
            if (options.normalize_names) {
                col_name = NormalizeColumnName(col_name);
            } else {
                col_name = TrimWhitespace(col_name);
            }

            // avoid duplicate header names
            const string col_name_raw = col_name;
            while (name_collision_count.find(col_name) != name_collision_count.end()) {
                name_collision_count[col_name] += 1;
                col_name = col_name + "_" + to_string(name_collision_count[col_name]);
            }

            names.push_back(col_name);
            name_collision_count[col_name] = 0;
        }

    } else {
        options.header = false;
        for (idx_t col = 0; col < options.num_cols; col++) {
            string column_name = GenerateColumnName(options.num_cols, col);
            names.push_back(column_name);
        }
    }
    for (idx_t i = 0; i < MinValue<idx_t>(names.size(), options.name_list.size()); i++) {
        names[i] = options.name_list[i];
    }
}
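
// Example of collision handling: a header line "id,id,id" yields the names
// "id", "id_1" and "id_2". The first row is treated as a header when it cannot
// be cast to the detected column types (or is all NULL) and the user did not
// explicitly configure header handling.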

vector<LogicalType> BufferedCSVReader::RefineTypeDetection(const vector<LogicalType> &type_candidates,
                                                           const vector<LogicalType> &requested_types,
                                                           vector<vector<LogicalType>> &best_sql_types_candidates,
                                                           map<LogicalTypeId, vector<string>> &best_format_candidates) {
    // for the type refinement we set the SQL types to VARCHAR for all columns
    return_types.clear();
    return_types.assign(options.num_cols, LogicalType::VARCHAR);

    vector<LogicalType> detected_types;

    // if data types were provided, exit here if the number of columns does not match
    if (!requested_types.empty()) {
        if (requested_types.size() != options.num_cols) {
            throw InvalidInputException(
                "Error while determining column types: found %lld columns but expected %d. (%s)", options.num_cols,
                requested_types.size(), options.ToString());
        } else {
            detected_types = requested_types;
        }
    } else if (options.all_varchar) {
        // return all types as varchar
        detected_types = return_types;
    } else {
        // jump through the rest of the file and continue to refine the sql type guess
        while (JumpToNextSample()) {
            InitParseChunk(return_types.size());
            // if the jump ends up on a bad line, we just skip this chunk
            if (!TryParseCSV(ParserMode::SNIFFING_DATATYPES)) {
                continue;
            }
            for (idx_t col = 0; col < parse_chunk.ColumnCount(); col++) {
                vector<LogicalType> &col_type_candidates = best_sql_types_candidates[col];
                while (col_type_candidates.size() > 1) {
                    const auto &sql_type = col_type_candidates.back();
                    // narrow down the date formats
                    if (best_format_candidates.count(sql_type.id())) {
                        auto &best_type_format_candidates = best_format_candidates[sql_type.id()];
                        auto save_format_candidates = best_type_format_candidates;
                        while (!best_type_format_candidates.empty()) {
                            if (TryCastVector(parse_chunk.data[col], parse_chunk.size(), sql_type)) {
                                break;
                            }
                            // doesn't work - move to the next one
                            best_type_format_candidates.pop_back();
                            options.has_format[sql_type.id()] = (!best_type_format_candidates.empty());
                            if (!best_type_format_candidates.empty()) {
                                SetDateFormat(best_type_format_candidates.back(), sql_type.id());
                            }
                        }
                        // if none match, then this is not a column of type sql_type,
                        if (best_type_format_candidates.empty()) {
                            // so restore the candidates that did work.
                            best_type_format_candidates.swap(save_format_candidates);
                            if (!best_type_format_candidates.empty()) {
                                SetDateFormat(best_type_format_candidates.back(), sql_type.id());
                            }
                        }
                    }

                    if (TryCastVector(parse_chunk.data[col], parse_chunk.size(), sql_type)) {
                        break;
                    } else {
                        col_type_candidates.pop_back();
                    }
                }
            }
        }

        // set the sql types
        for (auto &best_sql_types_candidate : best_sql_types_candidates) {
            LogicalType d_type = best_sql_types_candidate.back();
            if (best_sql_types_candidate.size() == type_candidates.size()) {
                d_type = LogicalType::VARCHAR;
            }
            detected_types.push_back(d_type);
        }
    }

    return detected_types;
}

string BufferedCSVReader::ColumnTypesError(case_insensitive_map_t<idx_t> sql_types_per_column,
                                           const vector<string> &names) {
    for (idx_t i = 0; i < names.size(); i++) {
        auto it = sql_types_per_column.find(names[i]);
        if (it != sql_types_per_column.end()) {
            sql_types_per_column.erase(names[i]);
            continue;
        }
    }
    if (sql_types_per_column.empty()) {
        return string();
    }
    string exception = "COLUMN_TYPES error: Columns with names: ";
    for (auto &col : sql_types_per_column) {
        exception += "\"" + col.first + "\",";
    }
    exception.pop_back();
    exception += " do not exist in the CSV file";
    return exception;
}

vector<LogicalType> BufferedCSVReader::SniffCSV(const vector<LogicalType> &requested_types) {
    for (auto &type : requested_types) {
        // auto detect for blobs not supported: there may be invalid UTF-8 in the file
        if (type.id() == LogicalTypeId::BLOB) {
            return requested_types;
        }
    }

    // #######
    // ### dialect detection
    // #######
    BufferedCSVReaderOptions original_options = options;
    vector<BufferedCSVReaderOptions> info_candidates;
    idx_t best_num_cols = 0;

    DetectDialect(requested_types, original_options, info_candidates, best_num_cols);

    // if no dialect candidate was found, the file was most likely empty and we throw an exception
    if (info_candidates.empty()) {
        throw InvalidInputException(
            "Error in file \"%s\": CSV options could not be auto-detected. Consider setting parser options manually.",
            options.file_path);
    }

    // #######
    // ### type detection (initial)
    // #######

    // format template candidates, ordered by descending specificity (~ from high to low)
    std::map<LogicalTypeId, vector<const char *>> format_template_candidates = {
        {LogicalTypeId::DATE, {"%m-%d-%Y", "%m-%d-%y", "%d-%m-%Y", "%d-%m-%y", "%Y-%m-%d", "%y-%m-%d"}},
        {LogicalTypeId::TIMESTAMP,
         {"%Y-%m-%d %H:%M:%S.%f", "%m-%d-%Y %I:%M:%S %p", "%m-%d-%y %I:%M:%S %p", "%d-%m-%Y %H:%M:%S",
          "%d-%m-%y %H:%M:%S", "%Y-%m-%d %H:%M:%S", "%y-%m-%d %H:%M:%S"}},
    };
    vector<vector<LogicalType>> best_sql_types_candidates;
    map<LogicalTypeId, vector<string>> best_format_candidates;
    DataChunk best_header_row;
    DetectCandidateTypes(options.auto_type_candidates, format_template_candidates, info_candidates, original_options,
                         best_num_cols, best_sql_types_candidates, best_format_candidates, best_header_row);

    if (best_format_candidates.empty() || best_header_row.size() == 0) {
        throw InvalidInputException(
            "Error in file \"%s\": CSV options could not be auto-detected. Consider setting parser options manually.",
            original_options.file_path);
    }

    // #######
    // ### header detection
    // #######
    options.num_cols = best_num_cols;
    DetectHeader(best_sql_types_candidates, best_header_row);
    if (!options.sql_type_list.empty()) {
        // user-defined types were supplied for certain columns
        // override the types
        if (!options.sql_types_per_column.empty()) {
            // types supplied as a name -> value map
            idx_t found = 0;
            for (idx_t i = 0; i < names.size(); i++) {
                auto it = options.sql_types_per_column.find(names[i]);
                if (it != options.sql_types_per_column.end()) {
                    best_sql_types_candidates[i] = {options.sql_type_list[it->second]};
                    found++;
                    continue;
                }
            }
            if (!options.file_options.union_by_name && found < options.sql_types_per_column.size()) {
                string exception = ColumnTypesError(options.sql_types_per_column, names);
                if (!exception.empty()) {
                    throw BinderException(exception);
                }
            }
        } else {
            // types supplied as a list
            if (names.size() < options.sql_type_list.size()) {
                throw BinderException("read_csv: %d types were provided, but CSV file only has %d columns",
                                      options.sql_type_list.size(), names.size());
            }
            for (idx_t i = 0; i < options.sql_type_list.size(); i++) {
                best_sql_types_candidates[i] = {options.sql_type_list[i]};
            }
        }
    }

    // #######
    // ### type detection (refining)
    // #######
    return RefineTypeDetection(options.auto_type_candidates, requested_types, best_sql_types_candidates,
                               best_format_candidates);
}

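// Both parsers below implement the same goto-based state machine, roughly:
//   value_start -> normal (unquoted value) or in_quotes (quoted value)
//   normal      -> add_value (on delimiter), add_row (on newline), final_state (on EOF)
//   in_quotes   -> unquote (on quote) or handle_escape (on escape)
//   unquote     -> add_value, add_row, or back to in_quotes (escaped quote)
//   add_row     -> carriage_return (after \r, to consume an optional \n) or value_start
// final_state adds any pending value/row and, in PARSING mode, flushes the chunk.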
bool BufferedCSVReader::TryParseComplexCSV(DataChunk &insert_chunk, string &error_message) {
    // used for parsing algorithm
    bool finished_chunk = false;
    idx_t column = 0;
    vector<idx_t> escape_positions;
    bool has_quotes = false;
    uint8_t delimiter_pos = 0, escape_pos = 0, quote_pos = 0;
    idx_t offset = 0;
    idx_t line_start = 0;
    // read values into the buffer (if any)
    if (position >= buffer_size) {
        if (!ReadBuffer(start, line_start)) {
            return true;
        }
    }
    // start parsing the first value
    start = position;
    goto value_start;
value_start:
    /* state: value_start */
    // this state parses the first characters of a value
    offset = 0;
    delimiter_pos = 0;
    quote_pos = 0;
    do {
        idx_t count = 0;
        for (; position < buffer_size; position++) {
            quote_search.Match(quote_pos, buffer[position]);
            delimiter_search.Match(delimiter_pos, buffer[position]);
            count++;
            if (delimiter_pos == options.delimiter.size()) {
                // found a delimiter, add the value
                offset = options.delimiter.size() - 1;
                goto add_value;
            } else if (StringUtil::CharacterIsNewline(buffer[position])) {
                // found a newline, add the row
                goto add_row;
            }
            if (count > quote_pos) {
                // did not find a quote directly at the start of the value, stop looking for the quote now
                goto normal;
            }
            if (quote_pos == options.quote.size()) {
                // found a quote, go to quoted loop and skip the initial quote
                start += options.quote.size();
                goto in_quotes;
            }
        }
    } while (ReadBuffer(start, line_start));
    // file ends while scanning for quote/delimiter, go to final state
    goto final_state;
normal:
    /* state: normal parsing state */
    // this state parses the remainder of a non-quoted value until we reach a delimiter or newline
    position++;
    do {
        for (; position < buffer_size; position++) {
            delimiter_search.Match(delimiter_pos, buffer[position]);
            if (delimiter_pos == options.delimiter.size()) {
                offset = options.delimiter.size() - 1;
                goto add_value;
            } else if (StringUtil::CharacterIsNewline(buffer[position])) {
                goto add_row;
            }
        }
    } while (ReadBuffer(start, line_start));
    goto final_state;
add_value:
    AddValue(string_t(buffer.get() + start, position - start - offset), column, escape_positions, has_quotes);
    // increase position by 1 and move start to the new position
    offset = 0;
    has_quotes = false;
    start = ++position;
    if (position >= buffer_size && !ReadBuffer(start, line_start)) {
        // file ends right after delimiter, go to final state
        goto final_state;
    }
    goto value_start;
add_row : {
    // check the type of newline (\r or \n)
    bool carriage_return = buffer[position] == '\r';
    AddValue(string_t(buffer.get() + start, position - start - offset), column, escape_positions, has_quotes);
    finished_chunk = AddRow(insert_chunk, column, error_message);

    if (!error_message.empty()) {
        return false;
    }
    // increase position by 1 and move start to the new position
    offset = 0;
    has_quotes = false;
    position++;
    SkipEmptyLines();
    start = position;
    if (position >= buffer_size && !ReadBuffer(start, line_start)) {
        // file ends right after newline, go to final state
        goto final_state;
    }
    if (carriage_return) {
        // \r newline, go to special state that parses an optional \n afterwards
        goto carriage_return;
    } else {
        // \n newline, move to value start
        if (finished_chunk) {
            return true;
        }
        goto value_start;
    }
}
in_quotes:
    /* state: in_quotes */
    // this state parses the remainder of a quoted value
    quote_pos = 0;
    escape_pos = 0;
    has_quotes = true;
    position++;
    do {
        for (; position < buffer_size; position++) {
            quote_search.Match(quote_pos, buffer[position]);
            escape_search.Match(escape_pos, buffer[position]);
            if (quote_pos == options.quote.size()) {
                goto unquote;
            } else if (escape_pos == options.escape.size()) {
                escape_positions.push_back(position - start - (options.escape.size() - 1));
                goto handle_escape;
            }
        }
    } while (ReadBuffer(start, line_start));
    // still in quoted state at the end of the file, error:
    error_message = StringUtil::Format("Error in file \"%s\" on line %s: unterminated quotes. (%s)", options.file_path,
                                       GetLineNumberStr(linenr, linenr_estimated).c_str(), options.ToString());
    return false;
unquote:
    /* state: unquote */
    // this state handles the state directly after we unquote
    // in this state we expect either another quote (entering the quoted state again, and escaping the quote)
    // or a delimiter/newline, ending the current value and moving on to the next value
    delimiter_pos = 0;
    quote_pos = 0;
    position++;
    if (position >= buffer_size && !ReadBuffer(start, line_start)) {
        // file ends right after unquote, go to final state
        offset = options.quote.size();
        goto final_state;
    }
    if (StringUtil::CharacterIsNewline(buffer[position])) {
        // quote followed by newline, add row
        offset = options.quote.size();
        goto add_row;
    }
    do {
        idx_t count = 0;
        for (; position < buffer_size; position++) {
            quote_search.Match(quote_pos, buffer[position]);
            delimiter_search.Match(delimiter_pos, buffer[position]);
            count++;
            if (count > delimiter_pos && count > quote_pos) {
                error_message = StringUtil::Format(
                    "Error in file \"%s\" on line %s: quote should be followed by end of value, end "
                    "of row or another quote. (%s)",
                    options.file_path, GetLineNumberStr(linenr, linenr_estimated).c_str(), options.ToString());
                return false;
            }
            if (delimiter_pos == options.delimiter.size()) {
                // quote followed by delimiter, add value
                offset = options.quote.size() + options.delimiter.size() - 1;
                goto add_value;
            } else if (quote_pos == options.quote.size() &&
                       (options.escape.empty() || options.escape == options.quote)) {
                // quote followed by quote, go back to quoted state and add to escape
                escape_positions.push_back(position - start - (options.quote.size() - 1));
                goto in_quotes;
            }
        }
    } while (ReadBuffer(start, line_start));
    error_message = StringUtil::Format(
        "Error in file \"%s\" on line %s: quote should be followed by end of value, end of row or another quote. (%s)",
        options.file_path, GetLineNumberStr(linenr, linenr_estimated).c_str(), options.ToString());
    return false;
handle_escape:
    escape_pos = 0;
    quote_pos = 0;
    position++;
    do {
        idx_t count = 0;
        for (; position < buffer_size; position++) {
            quote_search.Match(quote_pos, buffer[position]);
            escape_search.Match(escape_pos, buffer[position]);
            count++;
            if (count > escape_pos && count > quote_pos) {
                error_message = StringUtil::Format(
                    "Error in file \"%s\" on line %s: neither QUOTE nor ESCAPE is preceded by ESCAPE. (%s)",
                    options.file_path, GetLineNumberStr(linenr, linenr_estimated).c_str(), options.ToString());
                return false;
            }
            if (quote_pos == options.quote.size() || escape_pos == options.escape.size()) {
                // found quote or escape: move back to quoted state
                goto in_quotes;
            }
        }
    } while (ReadBuffer(start, line_start));
    error_message =
        StringUtil::Format("Error in file \"%s\" on line %s: neither QUOTE nor ESCAPE is preceded by ESCAPE. (%s)",
                           options.file_path, GetLineNumberStr(linenr, linenr_estimated).c_str(), options.ToString());
    return false;
carriage_return:
    /* state: carriage_return */
    // this state optionally skips a newline (\n) character, which allows \r\n to be interpreted as a single line
    if (buffer[position] == '\n') {
        // newline after carriage return: skip
        start = ++position;
        if (position >= buffer_size && !ReadBuffer(start, line_start)) {
            // file ends right after newline, go to final state
            goto final_state;
        }
    }
    if (finished_chunk) {
        return true;
    }
    goto value_start;
final_state:
    if (finished_chunk) {
        return true;
    }
    if (column > 0 || position > start) {
        // remaining values to be added to the chunk
        AddValue(string_t(buffer.get() + start, position - start - offset), column, escape_positions, has_quotes);
        finished_chunk = AddRow(insert_chunk, column, error_message);
        SkipEmptyLines();
        if (!error_message.empty()) {
            return false;
        }
    }
    // final state, only reached after parsing the file has finished
    // flush the parsed chunk and finalize parsing
    if (mode == ParserMode::PARSING) {
        Flush(insert_chunk);
    }

    end_of_file_reached = true;
    return true;
}

void BufferedCSVReader::SkipEmptyLines() {
    if (parse_chunk.data.size() == 1) {
        // for single-column CSVs, empty lines are valid (null) data, so do not skip them
        return;
    }
    for (; position < buffer_size; position++) {
        if (!StringUtil::CharacterIsNewline(buffer[position])) {
            return;
        }
    }
}

void UpdateMaxLineLength(ClientContext &context, idx_t line_length) {
    if (!context.client_data->debug_set_max_line_length) {
        return;
    }
    if (line_length < context.client_data->debug_max_line_length) {
        return;
    }
    context.client_data->debug_max_line_length = line_length;
}

bool BufferedCSVReader::TryParseSimpleCSV(DataChunk &insert_chunk, string &error_message) {
    // used for parsing algorithm
    bool finished_chunk = false;
    idx_t column = 0;
    idx_t offset = 0;
    bool has_quotes = false;
    vector<idx_t> escape_positions;

    idx_t line_start = position;
    // read values into the buffer (if any)
    if (position >= buffer_size) {
        if (!ReadBuffer(start, line_start)) {
            return true;
        }
    }

    // start parsing the first value
    goto value_start;
value_start:
    offset = 0;
    /* state: value_start */
    // this state parses the first character of a value
    if (buffer[position] == options.quote[0]) {
        // quote: actual value starts in the next position
        // move to in_quotes state
        start = position + 1;
        goto in_quotes;
    } else {
        // no quote, move to normal parsing state
        start = position;
        goto normal;
    }
normal:
    /* state: normal parsing state */
    // this state parses the remainder of a non-quoted value until we reach a delimiter or newline
    do {
        for (; position < buffer_size; position++) {
            if (buffer[position] == options.delimiter[0]) {
                // delimiter: end the value and add it to the chunk
                goto add_value;
            } else if (StringUtil::CharacterIsNewline(buffer[position])) {
                // newline: add row
                goto add_row;
            }
        }
    } while (ReadBuffer(start, line_start));
    // file ends during normal scan: go to end state
    goto final_state;
add_value:
    AddValue(string_t(buffer.get() + start, position - start - offset), column, escape_positions, has_quotes);
    // increase position by 1 and move start to the new position
    offset = 0;
    has_quotes = false;
    start = ++position;
    if (position >= buffer_size && !ReadBuffer(start, line_start)) {
        // file ends right after delimiter, go to final state
        goto final_state;
    }
    goto value_start;
add_row : {
    // check the type of newline (\r or \n)
    bool carriage_return = buffer[position] == '\r';
    AddValue(string_t(buffer.get() + start, position - start - offset), column, escape_positions, has_quotes);
    if (!error_message.empty()) {
        return false;
    }
    finished_chunk = AddRow(insert_chunk, column, error_message);
    UpdateMaxLineLength(context, position - line_start);
    if (!error_message.empty()) {
        return false;
    }
    // increase position by 1 and move start to the new position
    offset = 0;
    has_quotes = false;
    position++;
    start = position;
    line_start = position;
    if (position >= buffer_size && !ReadBuffer(start, line_start)) {
        // file ends right after newline, go to final state
        goto final_state;
    }
    if (carriage_return) {
        // \r newline, go to special state that parses an optional \n afterwards
        goto carriage_return;
    } else {
        SetNewLineDelimiter();
        SkipEmptyLines();

        start = position;
        line_start = position;
        if (position >= buffer_size && !ReadBuffer(start, line_start)) {
            // file ends right after newline, go to final state
            goto final_state;
        }
        // \n newline, move to value start
        if (finished_chunk) {
            return true;
        }
        goto value_start;
    }
}
in_quotes:
    /* state: in_quotes */
    // this state parses the remainder of a quoted value
    has_quotes = true;
    position++;
    do {
        for (; position < buffer_size; position++) {
            if (buffer[position] == options.quote[0]) {
                // quote: move to unquoted state
                goto unquote;
            } else if (buffer[position] == options.escape[0]) {
                // escape: store the escaped position and move to handle_escape state
                escape_positions.push_back(position - start);
                goto handle_escape;
            }
        }
    } while (ReadBuffer(start, line_start));
    // still in quoted state at the end of the file, error:
    throw InvalidInputException("Error in file \"%s\" on line %s: unterminated quotes. (%s)", options.file_path,
                                GetLineNumberStr(linenr, linenr_estimated).c_str(), options.ToString());
unquote:
    /* state: unquote */
    // this state handles the state directly after we unquote
    // in this state we expect either another quote (entering the quoted state again, and escaping the quote)
    // or a delimiter/newline, ending the current value and moving on to the next value
    position++;
    if (position >= buffer_size && !ReadBuffer(start, line_start)) {
        // file ends right after unquote, go to final state
        offset = 1;
        goto final_state;
    }
    if (buffer[position] == options.quote[0] && (options.escape.empty() || options.escape[0] == options.quote[0])) {
        // escaped quote, return to quoted state and store escape position
        escape_positions.push_back(position - start);
        goto in_quotes;
    } else if (buffer[position] == options.delimiter[0]) {
        // delimiter, add value
        offset = 1;
        goto add_value;
    } else if (StringUtil::CharacterIsNewline(buffer[position])) {
        offset = 1;
        goto add_row;
    } else {
        error_message = StringUtil::Format(
            "Error in file \"%s\" on line %s: quote should be followed by end of value, end of "
            "row or another quote. (%s)",
            options.file_path, GetLineNumberStr(linenr, linenr_estimated).c_str(), options.ToString());
        return false;
    }
handle_escape:
    /* state: handle_escape */
    // escape should be followed by a quote or another escape character
    position++;
    if (position >= buffer_size && !ReadBuffer(start, line_start)) {
        error_message = StringUtil::Format(
            "Error in file \"%s\" on line %s: neither QUOTE nor ESCAPE is preceded by ESCAPE. (%s)", options.file_path,
            GetLineNumberStr(linenr, linenr_estimated).c_str(), options.ToString());
        return false;
    }
    if (buffer[position] != options.quote[0] && buffer[position] != options.escape[0]) {
        error_message = StringUtil::Format(
            "Error in file \"%s\" on line %s: neither QUOTE nor ESCAPE is preceded by ESCAPE. (%s)", options.file_path,
            GetLineNumberStr(linenr, linenr_estimated).c_str(), options.ToString());
        return false;
    }
    // escape was followed by quote or escape, go back to quoted state
    goto in_quotes;
carriage_return:
    /* state: carriage_return */
    // this state optionally skips a newline (\n) character, which allows \r\n to be interpreted as a single line
    if (buffer[position] == '\n') {
        SetNewLineDelimiter(true, true);
        // newline after carriage return: skip
        // increase position by 1 and move start to the new position
        start = ++position;
        if (position >= buffer_size && !ReadBuffer(start, line_start)) {
            // file ends right after newline, go to final state
            goto final_state;
        }
    } else {
        SetNewLineDelimiter(true, false);
    }
    if (finished_chunk) {
        return true;
    }
    SkipEmptyLines();
    start = position;
    line_start = position;
    if (position >= buffer_size && !ReadBuffer(start, line_start)) {
        // file ends right after newline, go to final state
        goto final_state;
    }

    goto value_start;
final_state:
    if (finished_chunk) {
        return true;
    }

    if (column > 0 || position > start) {
        // remaining values to be added to the chunk
        AddValue(string_t(buffer.get() + start, position - start - offset), column, escape_positions, has_quotes);
        finished_chunk = AddRow(insert_chunk, column, error_message);
        SkipEmptyLines();
        UpdateMaxLineLength(context, position - line_start);
        if (!error_message.empty()) {
            return false;
        }
    }

    // final state, only reached after parsing the file has finished
    // flush the parsed chunk and finalize parsing
    if (mode == ParserMode::PARSING) {
        Flush(insert_chunk);
    }

    end_of_file_reached = true;
    return true;
}

bool BufferedCSVReader::ReadBuffer(idx_t &start, idx_t &line_start) {
    if (start > buffer_size) {
        return false;
    }
    auto old_buffer = std::move(buffer);

    // the remaining part of the last buffer
    idx_t remaining = buffer_size - start;

    bool large_buffers = mode == ParserMode::PARSING && !file_handle->OnDiskFile() && file_handle->CanSeek();
    idx_t buffer_read_size = large_buffers ? INITIAL_BUFFER_SIZE_LARGE : INITIAL_BUFFER_SIZE;

    while (remaining > buffer_read_size) {
        buffer_read_size *= 2;
    }

    // check the line length
    if (remaining > options.maximum_line_size) {
        throw InvalidInputException("Maximum line size of %llu bytes exceeded on line %s!", options.maximum_line_size,
                                    GetLineNumberStr(linenr, linenr_estimated));
    }

    buffer = make_unsafe_uniq_array<char>(buffer_read_size + remaining + 1);
    buffer_size = remaining + buffer_read_size;
    if (remaining > 0) {
        // remaining from last buffer: copy it here
        memcpy(buffer.get(), old_buffer.get() + start, remaining);
    }
    idx_t read_count = file_handle->Read(buffer.get() + remaining, buffer_read_size);

    bytes_in_chunk += read_count;
    buffer_size = remaining + read_count;
    buffer[buffer_size] = '\0';
    if (old_buffer) {
        cached_buffers.push_back(std::move(old_buffer));
    }
    start = 0;
    position = remaining;
    if (!bom_checked) {
        bom_checked = true;
        if (read_count >= 3 && buffer[0] == '\xEF' && buffer[1] == '\xBB' && buffer[2] == '\xBF') {
            start += 3;
            position += 3;
        }
    }
    line_start = start;

    return read_count > 0;
}
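
// Note on refilling: the unread tail of the old buffer (from `start` onward)
// is copied to the front of the new buffer, so a value that straddles a buffer
// boundary stays contiguous; the read size is doubled until that tail fits,
// and a UTF-8 byte-order mark (EF BB BF) at the very start of the file is
// skipped.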

void BufferedCSVReader::ParseCSV(DataChunk &insert_chunk) {
    string error_message;
    if (!TryParseCSV(ParserMode::PARSING, insert_chunk, error_message)) {
        throw InvalidInputException(error_message);
    }
}

bool BufferedCSVReader::TryParseCSV(ParserMode mode) {
    DataChunk dummy_chunk;
    string error_message;
    return TryParseCSV(mode, dummy_chunk, error_message);
}

void BufferedCSVReader::ParseCSV(ParserMode mode) {
    DataChunk dummy_chunk;
    string error_message;
    if (!TryParseCSV(mode, dummy_chunk, error_message)) {
        throw InvalidInputException(error_message);
    }
}

bool BufferedCSVReader::TryParseCSV(ParserMode parser_mode, DataChunk &insert_chunk, string &error_message) {
    mode = parser_mode;

    if (options.quote.size() <= 1 && options.escape.size() <= 1 && options.delimiter.size() == 1) {
        return TryParseSimpleCSV(insert_chunk, error_message);
    } else {
        return TryParseComplexCSV(insert_chunk, error_message);
    }
}
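
// Dispatch rule: the fast simple parser handles the common case of a
// single-byte delimiter with at most single-byte quote and escape characters;
// any multi-byte delimiter, quote or escape falls back to the complex parser
// and its shift-array matching.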

} // namespace duckdb