#include "duckdb/execution/operator/persistent/buffered_csv_reader.hpp"

#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
#include "duckdb/common/file_system.hpp"
#include "duckdb/common/gzip_stream.hpp"
#include "duckdb/common/string_util.hpp"
#include "duckdb/common/vector_operations/vector_operations.hpp"
#include "duckdb/execution/operator/persistent/physical_copy_from_file.hpp"
#include "duckdb/main/database.hpp"
#include "duckdb/parser/column_definition.hpp"
#include "duckdb/storage/data_table.hpp"
#include "utf8proc_wrapper.hpp"

#include <algorithm>
#include <cmath>
#include <cstring>
#include <fstream>
#include <queue>

using namespace duckdb;
using namespace std;

static bool is_newline(char c) {
	return c == '\n' || c == '\r';
}

// Helper function to generate column names
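// Example: with 101 columns in total, GenerateColumnName(101, 5) pads the
// column index to the width of the largest index (100) and yields "column005".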
static string GenerateColumnName(const idx_t total_cols, const idx_t col_number, const string prefix = "column") {
	uint8_t max_digits = total_cols >= 10 ? (int)log10((double)total_cols - 1) + 1 : 1;
	uint8_t digits = col_number >= 10 ? (int)log10((double)col_number) + 1 : 1;
	string leading_zeros = string(max_digits - digits, '0');
	string value = std::to_string(col_number);
	return string(prefix + leading_zeros + value);
}

static string GetLineNumberStr(idx_t linenr, bool linenr_estimated) {
	string estimated = (linenr_estimated ? string(" (estimated)") : string(""));
	return std::to_string(linenr) + estimated;
}

TextSearchShiftArray::TextSearchShiftArray() {
}

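// The shift array is a Knuth-Morris-Pratt-style transition table: for every
// position in the search term and every possible byte value, it stores the
// matcher state to move to after reading that byte. For example, for the
// two-byte delimiter "ab", reading 'a' in state 0 moves to state 1, reading
// 'b' in state 1 moves to state 2 (a full match), reading 'a' in state 1
// stays in state 1 ("a" is again a valid prefix), and any byte without an
// entry resets the matcher to state 0. Matching is then a single table lookup
// per input byte (Match() is assumed to be defined in the header).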
TextSearchShiftArray::TextSearchShiftArray(string search_term) : length(search_term.size()) {
	if (length > 255) {
		throw Exception("Size of delimiter/quote/escape in CSV reader is limited to 255 bytes");
	}
	// initialize the shifts array
	shifts = unique_ptr<uint8_t[]>(new uint8_t[length * 255]);
	memset(shifts.get(), 0, length * 255 * sizeof(uint8_t));
	// iterate over each of the characters in the array
	for (idx_t main_idx = 0; main_idx < length; main_idx++) {
		uint8_t current_char = (uint8_t)search_term[main_idx];
		// now move over all the remaining positions
		for (idx_t i = main_idx; i < length; i++) {
			bool is_match = true;
			// check if the prefix matches at this position
			// if it does, we move to this position after encountering the current character
			for (idx_t j = 0; j < main_idx; j++) {
				if (search_term[i - main_idx + j] != search_term[j]) {
					is_match = false;
				}
			}
			if (!is_match) {
				continue;
			}
			shifts[i * 255 + current_char] = main_idx + 1;
		}
	}
}

BufferedCSVReader::BufferedCSVReader(ClientContext &context, CopyInfo &info, vector<SQLType> requested_types)
    : info(info), buffer_size(0), position(0), start(0) {
	source = OpenCSV(context, info);
	Initialize(requested_types);
}

BufferedCSVReader::BufferedCSVReader(CopyInfo &info, vector<SQLType> requested_types, unique_ptr<istream> ssource)
    : info(info), source(move(ssource)), buffer_size(0), position(0), start(0) {
	Initialize(requested_types);
}

void BufferedCSVReader::Initialize(vector<SQLType> requested_types) {
	if (info.auto_detect) {
		sql_types = SniffCSV(requested_types);
	} else {
		sql_types = requested_types;
	}

	PrepareComplexParser();
	InitParseChunk(sql_types.size());
	SkipHeader();
}

void BufferedCSVReader::PrepareComplexParser() {
	delimiter_search = TextSearchShiftArray(info.delimiter);
	escape_search = TextSearchShiftArray(info.escape);
	quote_search = TextSearchShiftArray(info.quote);
}

unique_ptr<istream> BufferedCSVReader::OpenCSV(ClientContext &context, CopyInfo &info) {
	if (!FileSystem::GetFileSystem(context).FileExists(info.file_path)) {
		throw IOException("File \"%s\" not found", info.file_path.c_str());
	}
	unique_ptr<istream> result;
	// decide based on the extension which stream to use
	if (StringUtil::EndsWith(StringUtil::Lower(info.file_path), ".gz")) {
		result = make_unique<GzipStream>(info.file_path);
		plain_file_source = false;
	} else {
		auto csv_local = make_unique<ifstream>();
		csv_local->open(info.file_path);
		result = move(csv_local);

		// determine the file size
		plain_file_source = true;
		result->seekg(0, result->end);
		file_size = (idx_t)result->tellg();
		result->clear();
		result->seekg(0, result->beg);
	}
	return result;
}
void BufferedCSVReader::SkipHeader() {
	for (idx_t i = 0; i < info.skip_rows; i++) {
		// skip the first skip_rows lines
		string read_line;
		getline(*source, read_line);
		linenr++;
	}

	if (info.header) {
		// skip the first line as a header line
		string read_line;
		getline(*source, read_line);
		linenr++;
	}
}

void BufferedCSVReader::ResetBuffer() {
	buffer.reset();
	buffer_size = 0;
	position = 0;
	start = 0;
	cached_buffers.clear();
}

void BufferedCSVReader::ResetStream() {
	if (!plain_file_source && StringUtil::EndsWith(StringUtil::Lower(info.file_path), ".gz")) {
		// seeking to the beginning appears to not be supported in all compiler/os-scenarios,
		// so we have to create a new stream source here for now
		source = make_unique<GzipStream>(info.file_path);
	} else {
		source->clear();
		source->seekg(0, source->beg);
	}
	linenr = 0;
	linenr_estimated = false;
	bytes_per_line_avg = 0;
	sample_chunk_idx = 0;
	jumping_samples = false;
}

void BufferedCSVReader::ResetParseChunk() {
	bytes_in_chunk = 0;
	parse_chunk.Reset();
}

void BufferedCSVReader::InitParseChunk(idx_t num_cols) {
	// adapt the not-null info
	if (info.force_not_null.size() != num_cols) {
		info.force_not_null.resize(num_cols, false);
	}

	// destroy the previous chunk
	parse_chunk.Destroy();

	// initialize the parse_chunk with a set of VARCHAR types
	vector<TypeId> varchar_types(num_cols, TypeId::VARCHAR);
	parse_chunk.Initialize(varchar_types);
}

void BufferedCSVReader::JumpToBeginning() {
	ResetBuffer();
	ResetStream();
	ResetParseChunk();
	SkipHeader();
}

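// Jumps the stream to the next sample chunk. To keep sniffing cheap on large
// files, the file is divided into MAX_SAMPLE_CHUNKS equally-sized partitions
// and only the first SAMPLE_CHUNK_SIZE rows of each partition are parsed.
// Line numbers after a jump are estimates derived from the running average of
// bytes per line. (MAX_SAMPLE_CHUNKS and SAMPLE_CHUNK_SIZE are assumed to be
// defined in the header.)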
bool BufferedCSVReader::JumpToNextSample() {
	if (source->eof() || sample_chunk_idx >= MAX_SAMPLE_CHUNKS) {
		return false;
	}

	// update the average bytes per line
	double bytes_per_line = bytes_in_chunk / (double)SAMPLE_CHUNK_SIZE;
	bytes_per_line_avg = ((bytes_per_line_avg * sample_chunk_idx) + bytes_per_line) / (sample_chunk_idx + 1);

	// assess if it makes sense to jump, based on the size of the first chunk relative to the size of the entire file
	if (sample_chunk_idx == 0) {
		idx_t bytes_first_chunk = bytes_in_chunk;
		double chunks_fit = (file_size / (double)bytes_first_chunk);
		jumping_samples = chunks_fit >= (MAX_SAMPLE_CHUNKS - 1);
	}

	// for sources other than plain text files, jumping around is tricky. In that case
	// we just read consecutive chunks from the stream. TODO: make jumps possible for gzip streams.
	if (!plain_file_source || !jumping_samples) {
		sample_chunk_idx++;
		ResetParseChunk();
		return true;
	}

	// adjust the value of bytes_in_chunk, based on the current state of the buffer
	idx_t remaining_bytes_in_buffer = buffer_size - start;
	bytes_in_chunk -= remaining_bytes_in_buffer;

	// if none of the previous conditions were met, we can jump
	idx_t partition_size = (idx_t)round(file_size / (double)MAX_SAMPLE_CHUNKS);

	// calculate the offset to the end of the current partition
	int64_t offset = partition_size - bytes_in_chunk - remaining_bytes_in_buffer;
	idx_t current_pos = (idx_t)source->tellg();

	if (current_pos + offset < file_size) {
		// set the position in the stream and clear the failure bits
		source->clear();
		source->seekg(offset, source->cur);

		// estimate linenr
		linenr += (idx_t)round((offset + remaining_bytes_in_buffer) / bytes_per_line_avg);
		linenr_estimated = true;
	} else {
		// seek backwards from the end in the last chunk and hope to catch the end of the file
		// TODO: actually it would be good to make sure that the end of the file is reached, because
		// messy end-lines are quite common. For this case, however, we would first need skip_end detection.
		source->seekg(-bytes_in_chunk, source->end);

		// estimate linenr
		linenr = (idx_t)round((file_size - bytes_in_chunk) / bytes_per_line_avg);
		linenr_estimated = true;
	}

	// reset buffers and internal positions
	ResetBuffer();
	ResetParseChunk();

	// seek to the beginning of the next line
	// FIXME: if this jump ends up in a quoted linebreak, we will have a problem
	string read_line;
	getline(*source, read_line);
	linenr++;

	sample_chunk_idx++;

	return true;
}

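// Sniffs dialect and column types from the file. This proceeds in three
// phases: (1) dialect detection, which tries combinations of quote rule,
// quote, delimiter and escape candidates and keeps the ones that produce the
// most rows with a consistent column count; (2) type detection, which parses
// sample chunks and narrows per-column type candidates by attempting casts;
// (3) header detection, which re-parses the first row and checks whether it
// is consistent with the types detected for the rest of the file.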
vector<SQLType> BufferedCSVReader::SniffCSV(vector<SQLType> requested_types) {
	// TODO: sniff for uncommon (UTF-8) delimiter variants in the first lines and add them to the list
	const vector<string> delim_candidates = {",", "|", ";", "\t"};
	const vector<QuoteRule> quoterule_candidates = {QuoteRule::QUOTES_RFC, QuoteRule::QUOTES_OTHER,
	                                                QuoteRule::NO_QUOTES};
	// quote candidates depend on the quote rule
	const vector<vector<string>> quote_candidates_map = {{"\""}, {"\"", "'"}, {""}};
	// escape candidates also depend on the quote rule.
	// Note: RFC-conforming escapes are handled automatically, and without quotes no escape char is required
	const vector<vector<string>> escape_candidates_map = {{""}, {"\\"}, {""}};

	vector<CopyInfo> info_candidates;
	idx_t best_consistent_rows = 0;
	idx_t best_num_cols = 0;

	// if requested_types were provided, use them already in dialect detection
	// TODO: currently they only serve to solve the edge case of trailing empty delimiters,
	// however, they could be used to solve additional ambiguous scenarios.
	sql_types = requested_types;
	// TODO: add a flag to indicate that no option actually worked and the default (RFC 4180) will be used
	for (QuoteRule quoterule : quoterule_candidates) {
		vector<string> quote_candidates = quote_candidates_map[static_cast<uint8_t>(quoterule)];
		for (const auto &quote : quote_candidates) {
			for (const auto &delim : delim_candidates) {
				vector<string> escape_candidates = escape_candidates_map[static_cast<uint8_t>(quoterule)];
				for (const auto &escape : escape_candidates) {
					CopyInfo sniff_info = info;
					sniff_info.delimiter = delim;
					sniff_info.quote = quote;
					sniff_info.escape = escape;
					info = sniff_info;
					PrepareComplexParser();

					ResetBuffer();
					ResetStream();
					sniffed_column_counts.clear();
					try {
						ParseCSV(ParserMode::SNIFFING_DIALECT);
					} catch (const ParserException &e) {
						continue;
					}

					idx_t start_row = 0;
					idx_t consistent_rows = 0;
					idx_t num_cols = 0;

					for (idx_t row = 0; row < sniffed_column_counts.size(); row++) {
						if (sniffed_column_counts[row] == num_cols) {
							consistent_rows++;
						} else {
							num_cols = sniffed_column_counts[row];
							start_row = row;
							consistent_rows = 1;
						}
					}

					// criteria that decide whether this dialect candidate beats the current best
					bool more_values = (consistent_rows > best_consistent_rows && num_cols >= best_num_cols);
					bool single_column_before = best_num_cols < 2 && num_cols > best_num_cols;
					bool rows_consistent = start_row + consistent_rows == sniffed_column_counts.size();
					bool more_than_one_row = (consistent_rows > 1);
					bool more_than_one_column = (num_cols > 1);
					bool start_good = info_candidates.size() > 0 && (start_row <= info_candidates.front().skip_rows);

					if ((more_values || single_column_before) && rows_consistent) {
						sniff_info.skip_rows = start_row;
						sniff_info.num_cols = num_cols;
						best_consistent_rows = consistent_rows;
						best_num_cols = num_cols;

						info_candidates.clear();
						info_candidates.push_back(sniff_info);
					} else if (more_than_one_row && more_than_one_column && start_good && rows_consistent) {
						bool same_quote_is_candidate = false;
						for (auto &info_candidate : info_candidates) {
							if (quote.compare(info_candidate.quote) == 0) {
								same_quote_is_candidate = true;
							}
						}
						if (!same_quote_is_candidate) {
							sniff_info.skip_rows = start_row;
							sniff_info.num_cols = num_cols;
							info_candidates.push_back(sniff_info);
						}
					}
				}
			}
		}
	}

	// if no candidate was found, the file was most likely empty and we can do no more
	if (info_candidates.size() < 1) {
		return requested_types;
	}

	// type candidates, ordered from least specific (front) to most specific (back)
	vector<SQLType> type_candidates = {SQLType::VARCHAR, SQLType::TIMESTAMP, SQLType::DATE,
	                                   SQLType::TIME,    SQLType::DOUBLE,    /*SQLType::FLOAT,*/ SQLType::BIGINT,
	                                   SQLType::INTEGER, SQLType::SMALLINT,  /*SQLType::TINYINT,*/ SQLType::BOOLEAN,
	                                   SQLType::SQLNULL};
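	// detection starts with the most specific candidate (the back of each
	// per-column vector) and pops a candidate whenever a cast fails, so each
	// column gradually falls back towards VARCHAR, which accepts any value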

	// check which info candidate leads to the minimum amount of non-varchar columns...
	CopyInfo best_info;
	idx_t min_varchar_cols = best_num_cols + 1;
	vector<vector<SQLType>> best_sql_types_candidates;
	for (auto &info_candidate : info_candidates) {
		info = info_candidate;
		vector<vector<SQLType>> info_sql_types_candidates(info.num_cols, type_candidates);

		// set all sql_types to VARCHAR so we can do datatype detection based on VARCHAR values
		sql_types.clear();
		sql_types.assign(info.num_cols, SQLType::VARCHAR);
		InitParseChunk(sql_types.size());

		// detect types in the first chunk
		JumpToBeginning();
		ParseCSV(ParserMode::SNIFFING_DATATYPES);
		for (idx_t row = 0; row < parse_chunk.size(); row++) {
			for (idx_t col = 0; col < parse_chunk.column_count(); col++) {
				vector<SQLType> &col_type_candidates = info_sql_types_candidates[col];
				while (col_type_candidates.size() > 1) {
					const auto &sql_type = col_type_candidates.back();
					// try cast from string to sql_type
					auto dummy_val = parse_chunk.GetValue(col, row);
					try {
						dummy_val.CastAs(SQLType::VARCHAR, sql_type, true);
						break;
					} catch (const Exception &e) {
						col_type_candidates.pop_back();
					}
				}
			}
			// reset type detection for the second row, because the first row could be a header,
			// but only do it if the csv has more than one line
			if (parse_chunk.size() > 1 && row == 0) {
				info_sql_types_candidates = vector<vector<SQLType>>(info.num_cols, type_candidates);
			}
		}

		// check the number of varchar columns
		idx_t varchar_cols = 0;
		for (idx_t col = 0; col < parse_chunk.column_count(); col++) {
			const auto &col_type = info_sql_types_candidates[col].back();
			if (col_type == SQLType::VARCHAR) {
				varchar_cols++;
			}
		}

		// it's good if the dialect creates more non-varchar columns, but only if we sacrifice < 30% of best_num_cols.
		if (varchar_cols < min_varchar_cols && parse_chunk.column_count() > (best_num_cols * 0.7)) {
			// we have a new best_info candidate
			best_info = info_candidate;
			min_varchar_cols = varchar_cols;
			best_sql_types_candidates = info_sql_types_candidates;
		}
	}

	info = best_info;

	// if data types were provided, exit here if the number of columns does not match
	// TODO: we could think about postponing this to see if the csv happens to contain a superset of requested columns
	if (requested_types.size() > 0 && requested_types.size() != info.num_cols) {
		throw ParserException("Error while determining column types: found %lld columns but expected %d",
		                      info.num_cols, requested_types.size());
	}

	// sql_types and parse_chunk have to be in line with the new info
	sql_types.clear();
	sql_types.assign(info.num_cols, SQLType::VARCHAR);
	InitParseChunk(sql_types.size());

	// jump through the rest of the file and continue to refine the sql type guess
	while (JumpToNextSample()) {
		// if the jump ends up on a bad line, we just skip this chunk
		try {
			ParseCSV(ParserMode::SNIFFING_DATATYPES);
		} catch (const ParserException &e) {
			continue;
		}
		for (idx_t col = 0; col < parse_chunk.column_count(); col++) {
			vector<SQLType> &col_type_candidates = best_sql_types_candidates[col];
			while (col_type_candidates.size() > 1) {
				try {
					const auto &sql_type = col_type_candidates.back();
					// try a vector-cast from string to sql_type
					Vector dummy_result(GetInternalType(sql_type));
					VectorOperations::Cast(parse_chunk.data[col], dummy_result, SQLType::VARCHAR, sql_type,
					                       parse_chunk.size(), true);
					break;
				} catch (const Exception &e) {
					col_type_candidates.pop_back();
				}
			}
		}
	}

	// information for header detection
	bool first_row_consistent = true;
	bool first_row_nulls = true;

	// parse the first row again with knowledge from the rest of the file to check
	// whether the first row is consistent with the others or not.
	JumpToBeginning();
	ParseCSV(ParserMode::SNIFFING_DATATYPES);
	if (parse_chunk.size() > 0) {
		for (idx_t col = 0; col < parse_chunk.column_count(); col++) {
			auto dummy_val = parse_chunk.GetValue(col, 0);
			// try cast as SQLNULL
			try {
				dummy_val.CastAs(SQLType::VARCHAR, SQLType::SQLNULL, true);
			} catch (const Exception &e) {
				first_row_nulls = false;
			}
			// try cast to the sql_type of the column
			vector<SQLType> &col_type_candidates = best_sql_types_candidates[col];
			const auto &sql_type = col_type_candidates.back();

			try {
				dummy_val.CastAs(SQLType::VARCHAR, sql_type, true);
			} catch (const Exception &e) {
				first_row_consistent = false;
				break;
			}
		}
	}

	// if all rows are of type string, we will currently make the assumption there is no header.
	// TODO: do some kind of string-distance based consistency metric between the first row and the others
	/*bool all_types_string = true;
	for (idx_t col = 0; col < parse_chunk.column_count(); col++) {
		const auto &col_type = best_sql_types_candidates[col].back();
		all_types_string &= (col_type == SQLType::VARCHAR);
	}*/

	// update parser info, and read, generate & set col_names based on previous findings
	if (!first_row_consistent || first_row_nulls) {
		info.header = true;
		vector<string> t_col_names;
		for (idx_t col = 0; col < parse_chunk.column_count(); col++) {
			const auto &val = parse_chunk.GetValue(col, 0);
			string col_name = val.ToString();
			if (col_name.empty() || val.is_null) {
				col_name = GenerateColumnName(parse_chunk.column_count(), col);
			}
			// we keep column names as they appear in the file, no canonicalization
			// col_name = StringUtil::Lower(col_name);
			t_col_names.push_back(col_name);
		}
		for (idx_t col = 0; col < t_col_names.size(); col++) {
			string col_name = t_col_names[col];
			idx_t exists_n_times = std::count(t_col_names.begin(), t_col_names.end(), col_name);
			idx_t exists_n_times_before = std::count(t_col_names.begin(), t_col_names.begin() + col, col_name);
			if (exists_n_times > 1) {
				col_name = GenerateColumnName(exists_n_times, exists_n_times_before, col_name + "_");
			}
			col_names.push_back(col_name);
		}
	} else {
		info.header = false;
		idx_t total_columns = parse_chunk.column_count();
		for (idx_t col = 0; col < total_columns; col++) {
			string column_name = GenerateColumnName(total_columns, col);
			col_names.push_back(column_name);
		}
	}

	// set sql types
	vector<SQLType> detected_types;
	for (idx_t col = 0; col < best_sql_types_candidates.size(); col++) {
		SQLType d_type = best_sql_types_candidates[col].back();

		if (requested_types.size() > 0) {
			SQLType r_type = requested_types[col];

			// check if the detected types are in line with the provided types
			if (r_type != d_type) {
				if (r_type.IsMoreGenericThan(d_type)) {
					d_type = r_type;
				} else {
					throw ParserException(
					    "Error while sniffing data type for column '%s': Requested column type %s, detected type %s",
					    col_names[col].c_str(), SQLTypeToString(r_type).c_str(), SQLTypeToString(d_type).c_str());
				}
			}
		}

		detected_types.push_back(d_type);
	}

	// back to normal
	ResetBuffer();
	ResetStream();
	ResetParseChunk();
	sniffed_column_counts.clear();

	return detected_types;
}

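// Parses CSV data with multi-byte delimiters/quotes/escapes using the shift
// arrays. Implemented as a goto-based state machine with the states
// value_start, normal, add_value, add_row, in_quotes, unquote, handle_escape,
// carriage_return and final_state; each state scans the buffer and jumps to
// the next state on a delimiter, quote, escape or newline.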
void BufferedCSVReader::ParseComplexCSV(DataChunk &insert_chunk) {
	// used for the parsing algorithm
	bool finished_chunk = false;
	idx_t column = 0;
	vector<idx_t> escape_positions;
	uint8_t delimiter_pos = 0, escape_pos = 0, quote_pos = 0;
	idx_t offset = 0;

	// read values into the buffer (if any)
	if (position >= buffer_size) {
		if (!ReadBuffer(start)) {
			return;
		}
	}
	// start parsing the first value
	start = position;
	goto value_start;
value_start:
	/* state: value_start */
	// this state parses the first characters of a value
	offset = 0;
	delimiter_pos = 0;
	quote_pos = 0;
	do {
		idx_t count = 0;
		for (; position < buffer_size; position++) {
			quote_search.Match(quote_pos, buffer[position]);
			delimiter_search.Match(delimiter_pos, buffer[position]);
			count++;
			if (delimiter_pos == info.delimiter.size()) {
				// found a delimiter, add the value
				offset = info.delimiter.size() - 1;
				goto add_value;
			} else if (is_newline(buffer[position])) {
				// found a newline, add the row
				goto add_row;
			}
			if (count > quote_pos) {
				// did not find a quote directly at the start of the value, stop looking for the quote now
				goto normal;
			}
			if (quote_pos == info.quote.size()) {
				// found a quote, go to quoted loop and skip the initial quote
				start += info.quote.size();
				goto in_quotes;
			}
		}
	} while (ReadBuffer(start));
	// file ends while scanning for quote/delimiter, go to final state
	goto final_state;
normal:
	/* state: normal parsing state */
	// this state parses the remainder of a non-quoted value until we reach a delimiter or newline
	position++;
	do {
		for (; position < buffer_size; position++) {
			delimiter_search.Match(delimiter_pos, buffer[position]);
			if (delimiter_pos == info.delimiter.size()) {
				offset = info.delimiter.size() - 1;
				goto add_value;
			} else if (is_newline(buffer[position])) {
				goto add_row;
			}
		}
	} while (ReadBuffer(start));
	goto final_state;
add_value:
	AddValue(buffer.get() + start, position - start - offset, column, escape_positions);
	// increase position by 1 and move start to the new position
	offset = 0;
	start = ++position;
	if (position >= buffer_size && !ReadBuffer(start)) {
		// file ends right after delimiter, go to final state
		goto final_state;
	}
	goto value_start;
add_row : {
	// check type of newline (\r or \n)
	bool carriage_return = buffer[position] == '\r';
	AddValue(buffer.get() + start, position - start - offset, column, escape_positions);
	finished_chunk = AddRow(insert_chunk, column);
	// increase position by 1 and move start to the new position
	offset = 0;
	start = ++position;
	if (position >= buffer_size && !ReadBuffer(start)) {
		// file ends right after newline, go to final state
		goto final_state;
	}
	if (carriage_return) {
		// \r newline, go to special state that parses an optional \n afterwards
		goto carriage_return;
	} else {
		// \n newline, move to value start
		if (finished_chunk) {
			return;
		}
		goto value_start;
	}
}
in_quotes:
	/* state: in_quotes */
	// this state parses the remainder of a quoted value
	quote_pos = 0;
	escape_pos = 0;
	position++;
	do {
		for (; position < buffer_size; position++) {
			quote_search.Match(quote_pos, buffer[position]);
			escape_search.Match(escape_pos, buffer[position]);
			if (quote_pos == info.quote.size()) {
				goto unquote;
			} else if (escape_pos == info.escape.size()) {
				escape_positions.push_back(position - start - (info.escape.size() - 1));
				goto handle_escape;
			}
		}
	} while (ReadBuffer(start));
	// still in quoted state at the end of the file, error:
	throw ParserException("Error on line %s: unterminated quotes", GetLineNumberStr(linenr, linenr_estimated).c_str());
unquote:
	/* state: unquote */
	// this state handles the state directly after we unquote
	// in this state we expect either another quote (entering the quoted state again, and escaping the quote)
	// or a delimiter/newline, ending the current value and moving on to the next value
	delimiter_pos = 0;
	quote_pos = 0;
	position++;
	if (position >= buffer_size && !ReadBuffer(start)) {
		// file ends right after unquote, go to final state
		offset = info.quote.size();
		goto final_state;
	}
	if (is_newline(buffer[position])) {
		// quote followed by newline, add row
		offset = info.quote.size();
		goto add_row;
	}
	do {
		idx_t count = 0;
		for (; position < buffer_size; position++) {
			quote_search.Match(quote_pos, buffer[position]);
			delimiter_search.Match(delimiter_pos, buffer[position]);
			count++;
			if (count > delimiter_pos && count > quote_pos) {
				throw ParserException(
				    "Error on line %s: quote should be followed by end of value, end of row or another quote",
				    GetLineNumberStr(linenr, linenr_estimated).c_str());
			}
			if (delimiter_pos == info.delimiter.size()) {
				// quote followed by delimiter, add value
				offset = info.quote.size() + info.delimiter.size() - 1;
				goto add_value;
			} else if (quote_pos == info.quote.size() && (info.escape.size() == 0 || info.escape == info.quote)) {
				// quote followed by quote, go back to quoted state and add to escape
				escape_positions.push_back(position - start - (info.quote.size() - 1));
				goto in_quotes;
			}
		}
	} while (ReadBuffer(start));
	throw ParserException("Error on line %s: quote should be followed by end of value, end of row or another quote",
	                      GetLineNumberStr(linenr, linenr_estimated).c_str());
handle_escape:
	escape_pos = 0;
	quote_pos = 0;
	position++;
	do {
		idx_t count = 0;
		for (; position < buffer_size; position++) {
			quote_search.Match(quote_pos, buffer[position]);
			escape_search.Match(escape_pos, buffer[position]);
			count++;
			if (count > escape_pos && count > quote_pos) {
				throw ParserException("Error on line %s: neither QUOTE nor ESCAPE is preceded by ESCAPE",
				                      GetLineNumberStr(linenr, linenr_estimated).c_str());
			}
			if (quote_pos == info.quote.size() || escape_pos == info.escape.size()) {
				// found quote or escape: move back to quoted state
				goto in_quotes;
			}
		}
	} while (ReadBuffer(start));
	throw ParserException("Error on line %s: neither QUOTE nor ESCAPE is preceded by ESCAPE",
	                      GetLineNumberStr(linenr, linenr_estimated).c_str());
carriage_return:
	/* state: carriage_return */
	// this state optionally skips a newline (\n) character, which allows \r\n to be interpreted as a single line
	if (buffer[position] == '\n') {
		// newline after carriage return: skip
		start = ++position;
		if (position >= buffer_size && !ReadBuffer(start)) {
			// file ends right after newline, go to final state
			goto final_state;
		}
	}
	if (finished_chunk) {
		return;
	}
	goto value_start;
final_state:
	if (finished_chunk) {
		return;
	}
	if (column > 0 || position > start) {
		// remaining values to be added to the chunk
		AddValue(buffer.get() + start, position - start - offset, column, escape_positions);
		finished_chunk = AddRow(insert_chunk, column);
	}
	// final stage, only reached after parsing the file is finished
	// flush the parsed chunk and finalize parsing
	if (mode == ParserMode::PARSING) {
		Flush(insert_chunk);
	}
}

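// Fast path for the common case of single-byte delimiter, quote and escape:
// characters are compared directly instead of running the shift-array
// matchers. The state machine mirrors ParseComplexCSV.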
void BufferedCSVReader::ParseSimpleCSV(DataChunk &insert_chunk) {
	// used for the parsing algorithm
	bool finished_chunk = false;
	idx_t column = 0;
	idx_t offset = 0;
	vector<idx_t> escape_positions;

	// read values into the buffer (if any)
	if (position >= buffer_size) {
		if (!ReadBuffer(start)) {
			return;
		}
	}
	// start parsing the first value
	goto value_start;
value_start:
	offset = 0;
	/* state: value_start */
	// this state parses the first character of a value
	if (buffer[position] == info.quote[0]) {
		// quote: actual value starts in the next position
		// move to in_quotes state
		start = position + 1;
		goto in_quotes;
	} else {
		// no quote, move to normal parsing state
		start = position;
		goto normal;
	}
normal:
	/* state: normal parsing state */
	// this state parses the remainder of a non-quoted value until we reach a delimiter or newline
	do {
		for (; position < buffer_size; position++) {
			if (buffer[position] == info.delimiter[0]) {
				// delimiter: end the value and add it to the chunk
				goto add_value;
			} else if (is_newline(buffer[position])) {
				// newline: add row
				goto add_row;
			}
		}
	} while (ReadBuffer(start));
	// file ends during normal scan: go to end state
	goto final_state;
add_value:
	AddValue(buffer.get() + start, position - start - offset, column, escape_positions);
	// increase position by 1 and move start to the new position
	offset = 0;
	start = ++position;
	if (position >= buffer_size && !ReadBuffer(start)) {
		// file ends right after delimiter, go to final state
		goto final_state;
	}
	goto value_start;
add_row : {
	// check type of newline (\r or \n)
	bool carriage_return = buffer[position] == '\r';
	AddValue(buffer.get() + start, position - start - offset, column, escape_positions);
	finished_chunk = AddRow(insert_chunk, column);
	// increase position by 1 and move start to the new position
	offset = 0;
	start = ++position;
	if (position >= buffer_size && !ReadBuffer(start)) {
		// file ends right after newline, go to final state
		goto final_state;
	}
	if (carriage_return) {
		// \r newline, go to special state that parses an optional \n afterwards
		goto carriage_return;
	} else {
		// \n newline, move to value start
		if (finished_chunk) {
			return;
		}
		goto value_start;
	}
}
in_quotes:
	/* state: in_quotes */
	// this state parses the remainder of a quoted value
	position++;
	do {
		for (; position < buffer_size; position++) {
			if (buffer[position] == info.quote[0]) {
				// quote: move to unquoted state
				goto unquote;
			} else if (buffer[position] == info.escape[0]) {
				// escape: store the escaped position and move to handle_escape state
				escape_positions.push_back(position - start);
				goto handle_escape;
			}
		}
	} while (ReadBuffer(start));
	// still in quoted state at the end of the file, error:
	throw ParserException("Error on line %s: unterminated quotes", GetLineNumberStr(linenr, linenr_estimated).c_str());
unquote:
	/* state: unquote */
	// this state handles the state directly after we unquote
	// in this state we expect either another quote (entering the quoted state again, and escaping the quote)
	// or a delimiter/newline, ending the current value and moving on to the next value
	position++;
	if (position >= buffer_size && !ReadBuffer(start)) {
		// file ends right after unquote, go to final state
		offset = 1;
		goto final_state;
	}
	if (buffer[position] == info.quote[0] && (info.escape.size() == 0 || info.escape[0] == info.quote[0])) {
		// escaped quote, return to quoted state and store escape position
		escape_positions.push_back(position - start);
		goto in_quotes;
	} else if (buffer[position] == info.delimiter[0]) {
		// delimiter, add value
		offset = 1;
		goto add_value;
	} else if (is_newline(buffer[position])) {
		offset = 1;
		goto add_row;
	} else {
		throw ParserException(
		    "Error on line %s: quote should be followed by end of value, end of row or another quote",
		    GetLineNumberStr(linenr, linenr_estimated).c_str());
	}
handle_escape:
	/* state: handle_escape */
	// escape should be followed by a quote or another escape character
	position++;
	if (position >= buffer_size && !ReadBuffer(start)) {
		throw ParserException("Error on line %s: neither QUOTE nor ESCAPE is preceded by ESCAPE",
		                      GetLineNumberStr(linenr, linenr_estimated).c_str());
	}
	if (buffer[position] != info.quote[0] && buffer[position] != info.escape[0]) {
		throw ParserException("Error on line %s: neither QUOTE nor ESCAPE is preceded by ESCAPE",
		                      GetLineNumberStr(linenr, linenr_estimated).c_str());
	}
	// escape was followed by quote or escape, go back to quoted state
	goto in_quotes;
carriage_return:
	/* state: carriage_return */
	// this state optionally skips a newline (\n) character, which allows \r\n to be interpreted as a single line
	if (buffer[position] == '\n') {
		// newline after carriage return: skip
		// increase position by 1 and move start to the new position
		start = ++position;
		if (position >= buffer_size && !ReadBuffer(start)) {
			// file ends right after newline, go to final state
			goto final_state;
		}
	}
	if (finished_chunk) {
		return;
	}
	goto value_start;
final_state:
	if (finished_chunk) {
		return;
	}

	if (column > 0 || position > start) {
		// remaining values to be added to the chunk
		AddValue(buffer.get() + start, position - start - offset, column, escape_positions);
		finished_chunk = AddRow(insert_chunk, column);
	}

	// final stage, only reached after parsing the file is finished
	// flush the parsed chunk and finalize parsing
	if (mode == ParserMode::PARSING) {
		Flush(insert_chunk);
	}
}

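// Reads the next block of data into the buffer. Any unfinished value at the
// tail of the previous buffer (from `start` onwards) is copied to the front
// of the new buffer, so values never straddle two buffers. The old buffer is
// kept alive in cached_buffers because parsed values are stored as string_t
// pointers into it; the cache is only cleared once the values have been
// consumed (see ParseCSV below).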
bool BufferedCSVReader::ReadBuffer(idx_t &start) {
	auto old_buffer = move(buffer);

	// the remaining part of the last buffer
	idx_t remaining = buffer_size - start;
	idx_t buffer_read_size = INITIAL_BUFFER_SIZE;
	while (remaining > buffer_read_size) {
		buffer_read_size *= 2;
	}
	if (remaining + buffer_read_size > MAXIMUM_CSV_LINE_SIZE) {
		throw ParserException("Maximum line size of %llu bytes exceeded!", MAXIMUM_CSV_LINE_SIZE);
	}
	buffer = unique_ptr<char[]>(new char[buffer_read_size + remaining + 1]);
	buffer_size = remaining + buffer_read_size;
	if (remaining > 0) {
		// remaining from the last buffer: copy it here
		memcpy(buffer.get(), old_buffer.get() + start, remaining);
	}
	source->read(buffer.get() + remaining, buffer_read_size);

	idx_t read_count = source->eof() ? source->gcount() : buffer_read_size;
	bytes_in_chunk += read_count;
	buffer_size = remaining + read_count;
	buffer[buffer_size] = '\0';
	if (old_buffer) {
		cached_buffers.push_back(move(old_buffer));
	}
	start = 0;
	position = remaining;

	return read_count > 0;
}

void BufferedCSVReader::ParseCSV(DataChunk &insert_chunk) {
	cached_buffers.clear();

	ParseCSV(ParserMode::PARSING, insert_chunk);
}

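// Dispatches to the fast single-byte parser when quote, escape and delimiter
// all fit in one byte; otherwise the shift-array based parser handles the
// multi-byte search terms.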
void BufferedCSVReader::ParseCSV(ParserMode parser_mode, DataChunk &insert_chunk) {
	mode = parser_mode;

	if (info.quote.size() <= 1 && info.escape.size() <= 1 && info.delimiter.size() == 1) {
		ParseSimpleCSV(insert_chunk);
	} else {
		ParseComplexCSV(insert_chunk);
	}
}

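// Stores a single parsed value into the current column of parse_chunk. The
// value is kept as a string_t pointing into the read buffer unless escape
// sequences have to be removed, in which case an unescaped copy is added to
// the vector's string heap.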
void BufferedCSVReader::AddValue(char *str_val, idx_t length, idx_t &column, vector<idx_t> &escape_positions) {
	if (sql_types.size() > 0 && column == sql_types.size() && length == 0) {
		// skip a single trailing delimiter in the last column
		return;
	}
	if (mode == ParserMode::SNIFFING_DIALECT) {
		column++;
		return;
	}
	if (column >= sql_types.size()) {
		throw ParserException("Error on line %s: expected %lld values but got %d",
		                      GetLineNumberStr(linenr, linenr_estimated).c_str(), sql_types.size(), column + 1);
	}

	// the row entry in the chunk that this value is inserted into
	idx_t row_entry = parse_chunk.size();

	str_val[length] = '\0';
	// test against the null string
	if (!info.force_not_null[column] && strcmp(info.null_str.c_str(), str_val) == 0) {
		FlatVector::SetNull(parse_chunk.data[column], row_entry, true);
	} else {
		auto &v = parse_chunk.data[column];
		auto parse_data = FlatVector::GetData<string_t>(v);
		if (escape_positions.size() > 0) {
			// remove escape characters (if any)
			string old_val = str_val;
			string new_val = "";
			idx_t prev_pos = 0;
			for (idx_t i = 0; i < escape_positions.size(); i++) {
				idx_t next_pos = escape_positions[i];
				new_val += old_val.substr(prev_pos, next_pos - prev_pos);

				if (info.escape.size() == 0 || info.escape == info.quote) {
					prev_pos = next_pos + info.quote.size();
				} else {
					prev_pos = next_pos + info.escape.size();
				}
			}
			new_val += old_val.substr(prev_pos, old_val.size() - prev_pos);
			escape_positions.clear();
			parse_data[row_entry] = StringVector::AddString(v, new_val.c_str(), new_val.size());
		} else {
			parse_data[row_entry] = string_t(str_val, length);
		}
	}

	// move to the next column
	column++;
}

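// Finishes the current row and resets the column counter. Returns true when
// the current chunk is full: after SAMPLE_CHUNK_SIZE rows while sniffing the
// dialect or the data types, and after STANDARD_VECTOR_SIZE rows while
// parsing (in which case the chunk is flushed into insert_chunk).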
bool BufferedCSVReader::AddRow(DataChunk &insert_chunk, idx_t &column) {
	if (column < sql_types.size() && mode != ParserMode::SNIFFING_DIALECT) {
		throw ParserException("Error on line %s: expected %lld values but got %d",
		                      GetLineNumberStr(linenr, linenr_estimated).c_str(), sql_types.size(), column);
	}

	if (mode == ParserMode::SNIFFING_DIALECT) {
		sniffed_column_counts.push_back(column);

		if (sniffed_column_counts.size() == SAMPLE_CHUNK_SIZE) {
			return true;
		}
	} else {
		parse_chunk.SetCardinality(parse_chunk.size() + 1);
	}

	if (mode == ParserMode::SNIFFING_DATATYPES && parse_chunk.size() == SAMPLE_CHUNK_SIZE) {
		return true;
	}

	if (mode == ParserMode::PARSING && parse_chunk.size() == STANDARD_VECTOR_SIZE) {
		Flush(insert_chunk);
		return true;
	}

	column = 0;
	linenr++;
	return false;
}

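// Converts the VARCHAR parse chunk into the target types of insert_chunk.
// VARCHAR columns are validated as UTF-8 (and normalized when they contain
// non-ASCII characters) and referenced directly; all other columns are cast
// from their string representation.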
void BufferedCSVReader::Flush(DataChunk &insert_chunk) {
	if (parse_chunk.size() == 0) {
		return;
	}
	// convert the columns in the parsed chunk to the types of the table
	insert_chunk.SetCardinality(parse_chunk);
	for (idx_t col_idx = 0; col_idx < sql_types.size(); col_idx++) {
		if (sql_types[col_idx].id == SQLTypeId::VARCHAR) {
			// target type is varchar: no need to convert
			// just test that all strings are valid utf-8 strings
			auto parse_data = FlatVector::GetData<string_t>(parse_chunk.data[col_idx]);
			for (idx_t i = 0; i < parse_chunk.size(); i++) {
				if (!FlatVector::IsNull(parse_chunk.data[col_idx], i)) {
					auto s = parse_data[i];
					auto utf_type = Utf8Proc::Analyze(s.GetData(), s.GetSize());
					switch (utf_type) {
					case UnicodeType::INVALID:
						throw ParserException("Error on line %s: file is not valid UTF8",
						                      GetLineNumberStr(linenr, linenr_estimated).c_str());
					case UnicodeType::ASCII:
						break;
					case UnicodeType::UNICODE: {
						auto normie = Utf8Proc::Normalize(s.GetData());
						parse_data[i] = StringVector::AddString(parse_chunk.data[col_idx], normie);
						free(normie);
						break;
					}
					}
				}
			}

			insert_chunk.data[col_idx].Reference(parse_chunk.data[col_idx]);
		} else {
			// target type is not varchar: perform a cast
			VectorOperations::Cast(parse_chunk.data[col_idx], insert_chunk.data[col_idx], SQLType::VARCHAR,
			                       sql_types[col_idx], parse_chunk.size());
		}
	}
	parse_chunk.Reset();
}