1 | #include "duckdb/execution/operator/persistent/buffered_csv_reader.hpp" |
2 | |
3 | #include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp" |
4 | #include "duckdb/common/file_system.hpp" |
5 | #include "duckdb/common/string_util.hpp" |
6 | #include "duckdb/common/to_string.hpp" |
7 | #include "duckdb/common/types/cast_helpers.hpp" |
8 | #include "duckdb/common/vector_operations/unary_executor.hpp" |
9 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
10 | #include "duckdb/function/scalar/strftime_format.hpp" |
11 | #include "duckdb/main/database.hpp" |
12 | #include "duckdb/parser/column_definition.hpp" |
13 | #include "duckdb/storage/data_table.hpp" |
14 | #include "utf8proc_wrapper.hpp" |
15 | #include "utf8proc.hpp" |
16 | #include "duckdb/parser/keyword_helper.hpp" |
17 | #include "duckdb/main/error_manager.hpp" |
18 | #include "duckdb/main/client_data.hpp" |
19 | |
20 | #include <algorithm> |
21 | #include <cctype> |
22 | #include <cstring> |
23 | #include <fstream> |
24 | |
25 | namespace duckdb { |
26 | |
27 | BufferedCSVReader::BufferedCSVReader(ClientContext &context, BufferedCSVReaderOptions options_p, |
28 | const vector<LogicalType> &requested_types) |
29 | : BaseCSVReader(context, std::move(options_p), requested_types), buffer_size(0), position(0), start(0) { |
30 | file_handle = OpenCSV(options_p: options); |
31 | Initialize(requested_types); |
32 | } |
33 | |
34 | BufferedCSVReader::BufferedCSVReader(ClientContext &context, string filename, BufferedCSVReaderOptions options_p, |
35 | const vector<LogicalType> &requested_types) |
36 | : BaseCSVReader(context, std::move(options_p), requested_types), buffer_size(0), position(0), start(0) { |
37 | options.file_path = std::move(filename); |
38 | file_handle = OpenCSV(options_p: options); |
39 | Initialize(requested_types); |
40 | } |
41 | |
// Quoting rules tried by the dialect sniffer. The underlying values are used directly as indices
// into DetectDialect's per-rule quote/escape candidate tables, so they must remain 0, 1 and 2.
enum class QuoteRule : uint8_t {
	QUOTES_RFC = 0,   // paired with the empty escape candidate
	QUOTES_OTHER = 1, // paired with the backslash escape candidate
	NO_QUOTES = 2     // paired with the empty quote candidate
};
43 | |
44 | static bool StartsWithNumericDate(string &separator, const string &value) { |
45 | auto begin = value.c_str(); |
46 | auto end = begin + value.size(); |
47 | |
48 | // StrpTimeFormat::Parse will skip whitespace, so we can too |
49 | auto field1 = std::find_if_not(first: begin, last: end, pred: StringUtil::CharacterIsSpace); |
50 | if (field1 == end) { |
51 | return false; |
52 | } |
53 | |
54 | // first numeric field must start immediately |
55 | if (!StringUtil::CharacterIsDigit(c: *field1)) { |
56 | return false; |
57 | } |
58 | auto literal1 = std::find_if_not(first: field1, last: end, pred: StringUtil::CharacterIsDigit); |
59 | if (literal1 == end) { |
60 | return false; |
61 | } |
62 | |
63 | // second numeric field must exist |
64 | auto field2 = std::find_if(first: literal1, last: end, pred: StringUtil::CharacterIsDigit); |
65 | if (field2 == end) { |
66 | return false; |
67 | } |
68 | auto literal2 = std::find_if_not(first: field2, last: end, pred: StringUtil::CharacterIsDigit); |
69 | if (literal2 == end) { |
70 | return false; |
71 | } |
72 | |
73 | // third numeric field must exist |
74 | auto field3 = std::find_if(first: literal2, last: end, pred: StringUtil::CharacterIsDigit); |
75 | if (field3 == end) { |
76 | return false; |
77 | } |
78 | |
79 | // second literal must match first |
80 | if (((field3 - literal2) != (field2 - literal1)) || strncmp(s1: literal1, s2: literal2, n: (field2 - literal1)) != 0) { |
81 | return false; |
82 | } |
83 | |
84 | // copy the literal as the separator, escaping percent signs |
85 | separator.clear(); |
86 | while (literal1 < field2) { |
87 | const auto literal_char = *literal1++; |
88 | if (literal_char == '%') { |
89 | separator.push_back(c: literal_char); |
90 | } |
91 | separator.push_back(c: literal_char); |
92 | } |
93 | |
94 | return true; |
95 | } |
96 | |
// Instantiates a date/time format template by replacing every '-' placeholder with `separator`,
// e.g. ("/", "%m-%d-%Y") -> "%m/%d/%Y". The separator is inserted verbatim, so any '%' it contains
// must already be escaped by the caller. Templates without '-' are returned unchanged.
string GenerateDateFormat(const string &separator, const char *format_template) {
	string format_specifier = format_template;
	const auto amount_of_dashes =
	    static_cast<string::size_type>(std::count(format_specifier.begin(), format_specifier.end(), '-'));
	if (amount_of_dashes == 0) {
		return format_specifier;
	}
	string result;
	result.reserve(format_specifier.size() - amount_of_dashes + amount_of_dashes * separator.size());
	for (const char character : format_specifier) {
		if (character == '-') {
			result += separator;
		} else {
			result += character;
		}
	}
	return result;
}
114 | |
115 | TextSearchShiftArray::TextSearchShiftArray() { |
116 | } |
117 | |
118 | TextSearchShiftArray::TextSearchShiftArray(string search_term) : length(search_term.size()) { |
119 | if (length > 255) { |
120 | throw InvalidInputException("Size of delimiter/quote/escape in CSV reader is limited to 255 bytes" ); |
121 | } |
122 | // initialize the shifts array |
123 | shifts = unique_ptr<uint8_t[]>(new uint8_t[length * 255]); |
124 | memset(s: shifts.get(), c: 0, n: length * 255 * sizeof(uint8_t)); |
125 | // iterate over each of the characters in the array |
126 | for (idx_t main_idx = 0; main_idx < length; main_idx++) { |
127 | uint8_t current_char = (uint8_t)search_term[main_idx]; |
128 | // now move over all the remaining positions |
129 | for (idx_t i = main_idx; i < length; i++) { |
130 | bool is_match = true; |
131 | // check if the prefix matches at this position |
132 | // if it does, we move to this position after encountering the current character |
133 | for (idx_t j = 0; j < main_idx; j++) { |
134 | if (search_term[i - main_idx + j] != search_term[j]) { |
135 | is_match = false; |
136 | } |
137 | } |
138 | if (!is_match) { |
139 | continue; |
140 | } |
141 | shifts[i * 255 + current_char] = main_idx + 1; |
142 | } |
143 | } |
144 | } |
145 | |
146 | // Helper function to generate column names |
147 | static string GenerateColumnName(const idx_t total_cols, const idx_t col_number, const string &prefix = "column" ) { |
148 | int max_digits = NumericHelper::UnsignedLength(value: total_cols - 1); |
149 | int digits = NumericHelper::UnsignedLength(value: col_number); |
150 | string leading_zeros = string(max_digits - digits, '0'); |
151 | string value = to_string(val: col_number); |
152 | return string(prefix + leading_zeros + value); |
153 | } |
154 | |
155 | // Helper function for UTF-8 aware space trimming |
156 | static string TrimWhitespace(const string &col_name) { |
157 | utf8proc_int32_t codepoint; |
158 | auto str = reinterpret_cast<const utf8proc_uint8_t *>(col_name.c_str()); |
159 | idx_t size = col_name.size(); |
160 | // Find the first character that is not left trimmed |
161 | idx_t begin = 0; |
162 | while (begin < size) { |
163 | auto bytes = utf8proc_iterate(str: str + begin, strlen: size - begin, codepoint_ref: &codepoint); |
164 | D_ASSERT(bytes > 0); |
165 | if (utf8proc_category(codepoint) != UTF8PROC_CATEGORY_ZS) { |
166 | break; |
167 | } |
168 | begin += bytes; |
169 | } |
170 | |
171 | // Find the last character that is not right trimmed |
172 | idx_t end; |
173 | end = begin; |
174 | for (auto next = begin; next < col_name.size();) { |
175 | auto bytes = utf8proc_iterate(str: str + next, strlen: size - next, codepoint_ref: &codepoint); |
176 | D_ASSERT(bytes > 0); |
177 | next += bytes; |
178 | if (utf8proc_category(codepoint) != UTF8PROC_CATEGORY_ZS) { |
179 | end = next; |
180 | } |
181 | } |
182 | |
183 | // return the trimmed string |
184 | return col_name.substr(pos: begin, n: end - begin); |
185 | } |
186 | |
187 | static string NormalizeColumnName(const string &col_name) { |
188 | // normalize UTF8 characters to NFKD |
189 | auto nfkd = utf8proc_NFKD(str: reinterpret_cast<const utf8proc_uint8_t *>(col_name.c_str()), len: col_name.size()); |
190 | const string col_name_nfkd = string(const_char_ptr_cast(src: nfkd), strlen(s: const_char_ptr_cast(src: nfkd))); |
191 | free(ptr: nfkd); |
192 | |
193 | // only keep ASCII characters 0-9 a-z A-Z and replace spaces with regular whitespace |
194 | string col_name_ascii = "" ; |
195 | for (idx_t i = 0; i < col_name_nfkd.size(); i++) { |
196 | if (col_name_nfkd[i] == '_' || (col_name_nfkd[i] >= '0' && col_name_nfkd[i] <= '9') || |
197 | (col_name_nfkd[i] >= 'A' && col_name_nfkd[i] <= 'Z') || |
198 | (col_name_nfkd[i] >= 'a' && col_name_nfkd[i] <= 'z')) { |
199 | col_name_ascii += col_name_nfkd[i]; |
200 | } else if (StringUtil::CharacterIsSpace(c: col_name_nfkd[i])) { |
201 | col_name_ascii += " " ; |
202 | } |
203 | } |
204 | |
205 | // trim whitespace and replace remaining whitespace by _ |
206 | string col_name_trimmed = TrimWhitespace(col_name: col_name_ascii); |
207 | string col_name_cleaned = "" ; |
208 | bool in_whitespace = false; |
209 | for (idx_t i = 0; i < col_name_trimmed.size(); i++) { |
210 | if (col_name_trimmed[i] == ' ') { |
211 | if (!in_whitespace) { |
212 | col_name_cleaned += "_" ; |
213 | in_whitespace = true; |
214 | } |
215 | } else { |
216 | col_name_cleaned += col_name_trimmed[i]; |
217 | in_whitespace = false; |
218 | } |
219 | } |
220 | |
221 | // don't leave string empty; if not empty, make lowercase |
222 | if (col_name_cleaned.empty()) { |
223 | col_name_cleaned = "_" ; |
224 | } else { |
225 | col_name_cleaned = StringUtil::Lower(str: col_name_cleaned); |
226 | } |
227 | |
228 | // prepend _ if name starts with a digit or is a reserved keyword |
229 | if (KeywordHelper::IsKeyword(text: col_name_cleaned) || (col_name_cleaned[0] >= '0' && col_name_cleaned[0] <= '9')) { |
230 | col_name_cleaned = "_" + col_name_cleaned; |
231 | } |
232 | return col_name_cleaned; |
233 | } |
234 | |
235 | void BufferedCSVReader::Initialize(const vector<LogicalType> &requested_types) { |
236 | PrepareComplexParser(); |
237 | if (options.auto_detect) { |
238 | return_types = SniffCSV(requested_types); |
239 | if (return_types.empty()) { |
240 | throw InvalidInputException("Failed to detect column types from CSV: is the file a valid CSV file?" ); |
241 | } |
242 | JumpToBeginning(skip_rows: options.skip_rows, skip_header: options.header); |
243 | } else { |
244 | return_types = requested_types; |
245 | ResetBuffer(); |
246 | SkipRowsAndReadHeader(skip_rows: options.skip_rows, skip_header: options.header); |
247 | } |
248 | InitParseChunk(num_cols: return_types.size()); |
249 | } |
250 | |
251 | void BufferedCSVReader::ResetBuffer() { |
252 | buffer.reset(); |
253 | buffer_size = 0; |
254 | position = 0; |
255 | start = 0; |
256 | cached_buffers.clear(); |
257 | } |
258 | |
259 | void BufferedCSVReader::ResetStream() { |
260 | file_handle->Reset(); |
261 | linenr = 0; |
262 | linenr_estimated = false; |
263 | bytes_per_line_avg = 0; |
264 | sample_chunk_idx = 0; |
265 | jumping_samples = false; |
266 | } |
267 | |
268 | void BufferedCSVReader::JumpToBeginning(idx_t skip_rows = 0, bool = false) { |
269 | ResetBuffer(); |
270 | ResetStream(); |
271 | sample_chunk_idx = 0; |
272 | bytes_in_chunk = 0; |
273 | end_of_file_reached = false; |
274 | bom_checked = false; |
275 | SkipRowsAndReadHeader(skip_rows, skip_header); |
276 | } |
277 | |
278 | void BufferedCSVReader::SkipRowsAndReadHeader(idx_t skip_rows, bool ) { |
279 | for (idx_t i = 0; i < skip_rows; i++) { |
280 | // ignore skip rows |
281 | string read_line = file_handle->ReadLine(); |
282 | linenr++; |
283 | } |
284 | |
285 | if (skip_header) { |
286 | // ignore the first line as a header line |
287 | InitParseChunk(num_cols: return_types.size()); |
288 | ParseCSV(mode: ParserMode::PARSING_HEADER); |
289 | } |
290 | } |
291 | |
292 | void BufferedCSVReader::PrepareComplexParser() { |
293 | delimiter_search = TextSearchShiftArray(options.delimiter); |
294 | escape_search = TextSearchShiftArray(options.escape); |
295 | quote_search = TextSearchShiftArray(options.quote); |
296 | } |
297 | |
298 | bool BufferedCSVReader::JumpToNextSample() { |
299 | // get bytes contained in the previously read chunk |
300 | idx_t remaining_bytes_in_buffer = buffer_size - start; |
301 | bytes_in_chunk -= remaining_bytes_in_buffer; |
302 | if (remaining_bytes_in_buffer == 0) { |
303 | return false; |
304 | } |
305 | |
306 | // assess if it makes sense to jump, based on size of the first chunk relative to size of the entire file |
307 | if (sample_chunk_idx == 0) { |
308 | idx_t bytes_first_chunk = bytes_in_chunk; |
309 | double chunks_fit = (file_handle->FileSize() / (double)bytes_first_chunk); |
310 | jumping_samples = chunks_fit >= options.sample_chunks; |
311 | |
312 | // jump back to the beginning |
313 | JumpToBeginning(skip_rows: options.skip_rows, skip_header: options.header); |
314 | sample_chunk_idx++; |
315 | return true; |
316 | } |
317 | |
318 | if (end_of_file_reached || sample_chunk_idx >= options.sample_chunks) { |
319 | return false; |
320 | } |
321 | |
322 | // if we deal with any other sources than plaintext files, jumping_samples can be tricky. In that case |
323 | // we just read x continuous chunks from the stream TODO: make jumps possible for zipfiles. |
324 | if (!file_handle->OnDiskFile() || !jumping_samples) { |
325 | sample_chunk_idx++; |
326 | return true; |
327 | } |
328 | |
329 | // update average bytes per line |
330 | double bytes_per_line = bytes_in_chunk / (double)options.sample_chunk_size; |
331 | bytes_per_line_avg = ((bytes_per_line_avg * (sample_chunk_idx)) + bytes_per_line) / (sample_chunk_idx + 1); |
332 | |
333 | // if none of the previous conditions were met, we can jump |
334 | idx_t partition_size = (idx_t)round(x: file_handle->FileSize() / (double)options.sample_chunks); |
335 | |
336 | // calculate offset to end of the current partition |
337 | int64_t offset = partition_size - bytes_in_chunk - remaining_bytes_in_buffer; |
338 | auto current_pos = file_handle->SeekPosition(); |
339 | |
340 | if (current_pos + offset < file_handle->FileSize()) { |
341 | // set position in stream and clear failure bits |
342 | file_handle->Seek(position: current_pos + offset); |
343 | |
344 | // estimate linenr |
345 | linenr += (idx_t)round(x: (offset + remaining_bytes_in_buffer) / bytes_per_line_avg); |
346 | linenr_estimated = true; |
347 | } else { |
348 | // seek backwards from the end in last chunk and hope to catch the end of the file |
349 | // TODO: actually it would be good to make sure that the end of file is being reached, because |
350 | // messy end-lines are quite common. For this case, however, we first need a skip_end detection anyways. |
351 | file_handle->Seek(position: file_handle->FileSize() - bytes_in_chunk); |
352 | |
353 | // estimate linenr |
354 | linenr = (idx_t)round(x: (file_handle->FileSize() - bytes_in_chunk) / bytes_per_line_avg); |
355 | linenr_estimated = true; |
356 | } |
357 | |
358 | // reset buffers and parse chunk |
359 | ResetBuffer(); |
360 | |
361 | // seek beginning of next line |
362 | // FIXME: if this jump ends up in a quoted linebreak, we will have a problem |
363 | string read_line = file_handle->ReadLine(); |
364 | linenr++; |
365 | |
366 | sample_chunk_idx++; |
367 | |
368 | return true; |
369 | } |
370 | |
371 | void BufferedCSVReader::DetectDialect(const vector<LogicalType> &requested_types, |
372 | BufferedCSVReaderOptions &original_options, |
373 | vector<BufferedCSVReaderOptions> &info_candidates, idx_t &best_num_cols) { |
374 | // set up the candidates we consider for delimiter and quote rules based on user input |
375 | vector<string> delim_candidates; |
376 | vector<QuoteRule> quoterule_candidates; |
377 | vector<vector<string>> quote_candidates_map; |
378 | vector<vector<string>> escape_candidates_map = {{"" }, {"\\" }, {"" }}; |
379 | |
380 | if (options.has_delimiter) { |
381 | // user provided a delimiter: use that delimiter |
382 | delim_candidates = {options.delimiter}; |
383 | } else { |
384 | // no delimiter provided: try standard/common delimiters |
385 | delim_candidates = {"," , "|" , ";" , "\t" }; |
386 | } |
387 | if (options.has_quote) { |
388 | // user provided quote: use that quote rule |
389 | quote_candidates_map = {{options.quote}, {options.quote}, {options.quote}}; |
390 | } else { |
391 | // no quote rule provided: use standard/common quotes |
392 | quote_candidates_map = {{"\"" }, {"\"" , "'" }, {"" }}; |
393 | } |
394 | if (options.has_escape) { |
395 | // user provided escape: use that escape rule |
396 | if (options.escape.empty()) { |
397 | quoterule_candidates = {QuoteRule::QUOTES_RFC}; |
398 | } else { |
399 | quoterule_candidates = {QuoteRule::QUOTES_OTHER}; |
400 | } |
401 | escape_candidates_map[static_cast<uint8_t>(quoterule_candidates[0])] = {options.escape}; |
402 | } else { |
403 | // no escape provided: try standard/common escapes |
404 | quoterule_candidates = {QuoteRule::QUOTES_RFC, QuoteRule::QUOTES_OTHER, QuoteRule::NO_QUOTES}; |
405 | } |
406 | |
407 | idx_t best_consistent_rows = 0; |
408 | idx_t prev_padding_count = 0; |
409 | for (auto quoterule : quoterule_candidates) { |
410 | const auto "e_candidates = quote_candidates_map[static_cast<uint8_t>(quoterule)]; |
411 | for (const auto "e : quote_candidates) { |
412 | for (const auto &delim : delim_candidates) { |
413 | const auto &escape_candidates = escape_candidates_map[static_cast<uint8_t>(quoterule)]; |
414 | for (const auto &escape : escape_candidates) { |
415 | BufferedCSVReaderOptions sniff_info = original_options; |
416 | sniff_info.delimiter = delim; |
417 | sniff_info.quote = quote; |
418 | sniff_info.escape = escape; |
419 | |
420 | options = sniff_info; |
421 | PrepareComplexParser(); |
422 | |
423 | JumpToBeginning(skip_rows: original_options.skip_rows); |
424 | sniffed_column_counts.clear(); |
425 | if (!TryParseCSV(mode: ParserMode::SNIFFING_DIALECT)) { |
426 | continue; |
427 | } |
428 | |
429 | idx_t start_row = original_options.skip_rows; |
430 | idx_t consistent_rows = 0; |
431 | idx_t num_cols = sniffed_column_counts.empty() ? 0 : sniffed_column_counts[0]; |
432 | idx_t padding_count = 0; |
433 | bool allow_padding = original_options.null_padding; |
434 | for (idx_t row = 0; row < sniffed_column_counts.size(); row++) { |
435 | if (sniffed_column_counts[row] == num_cols) { |
436 | consistent_rows++; |
437 | } else if (num_cols < sniffed_column_counts[row] && !original_options.skip_rows_set) { |
438 | // we use the maximum amount of num_cols that we find |
439 | num_cols = sniffed_column_counts[row]; |
440 | start_row = row + original_options.skip_rows; |
441 | consistent_rows = 1; |
442 | padding_count = 0; |
443 | } else if (num_cols >= sniffed_column_counts[row] && allow_padding) { |
444 | // we are missing some columns, we can parse this as long as we add padding |
445 | padding_count++; |
446 | } |
447 | } |
448 | |
449 | // some logic |
450 | consistent_rows += padding_count; |
451 | bool more_values = (consistent_rows > best_consistent_rows && num_cols >= best_num_cols); |
452 | bool require_more_padding = padding_count > prev_padding_count; |
453 | bool require_less_padding = padding_count < prev_padding_count; |
454 | bool single_column_before = best_num_cols < 2 && num_cols > best_num_cols; |
455 | bool rows_consistent = |
456 | start_row + consistent_rows - original_options.skip_rows == sniffed_column_counts.size(); |
457 | bool more_than_one_row = (consistent_rows > 1); |
458 | bool more_than_one_column = (num_cols > 1); |
459 | bool start_good = !info_candidates.empty() && (start_row <= info_candidates.front().skip_rows); |
460 | |
461 | if (!requested_types.empty() && requested_types.size() != num_cols) { |
462 | continue; |
463 | } else if (rows_consistent && (single_column_before || (more_values && !require_more_padding) || |
464 | (more_than_one_column && require_less_padding))) { |
465 | sniff_info.skip_rows = start_row; |
466 | sniff_info.num_cols = num_cols; |
467 | sniff_info.new_line = options.new_line; |
468 | best_consistent_rows = consistent_rows; |
469 | best_num_cols = num_cols; |
470 | prev_padding_count = padding_count; |
471 | |
472 | info_candidates.clear(); |
473 | info_candidates.push_back(x: sniff_info); |
474 | } else if (more_than_one_row && more_than_one_column && start_good && rows_consistent && |
475 | !require_more_padding) { |
476 | bool same_quote_is_candidate = false; |
477 | for (auto &info_candidate : info_candidates) { |
478 | if (quote.compare(str: info_candidate.quote) == 0) { |
479 | same_quote_is_candidate = true; |
480 | } |
481 | } |
482 | if (!same_quote_is_candidate) { |
483 | sniff_info.skip_rows = start_row; |
484 | sniff_info.num_cols = num_cols; |
485 | sniff_info.new_line = options.new_line; |
486 | info_candidates.push_back(x: sniff_info); |
487 | } |
488 | } |
489 | } |
490 | } |
491 | } |
492 | } |
493 | } |
494 | |
495 | void BufferedCSVReader::DetectCandidateTypes(const vector<LogicalType> &type_candidates, |
496 | const map<LogicalTypeId, vector<const char *>> &format_template_candidates, |
497 | const vector<BufferedCSVReaderOptions> &info_candidates, |
498 | BufferedCSVReaderOptions &original_options, idx_t best_num_cols, |
499 | vector<vector<LogicalType>> &best_sql_types_candidates, |
500 | std::map<LogicalTypeId, vector<string>> &best_format_candidates, |
501 | DataChunk &) { |
502 | BufferedCSVReaderOptions best_options; |
503 | idx_t min_varchar_cols = best_num_cols + 1; |
504 | |
505 | // check which info candidate leads to minimum amount of non-varchar columns... |
506 | for (const auto &t : format_template_candidates) { |
507 | best_format_candidates[t.first].clear(); |
508 | } |
509 | for (auto &info_candidate : info_candidates) { |
510 | options = info_candidate; |
511 | vector<vector<LogicalType>> info_sql_types_candidates(options.num_cols, type_candidates); |
512 | std::map<LogicalTypeId, bool> has_format_candidates; |
513 | std::map<LogicalTypeId, vector<string>> format_candidates; |
514 | for (const auto &t : format_template_candidates) { |
515 | has_format_candidates[t.first] = false; |
516 | format_candidates[t.first].clear(); |
517 | } |
518 | |
519 | // set all return_types to VARCHAR so we can do datatype detection based on VARCHAR values |
520 | return_types.clear(); |
521 | return_types.assign(n: options.num_cols, val: LogicalType::VARCHAR); |
522 | |
523 | // jump to beginning and skip potential header |
524 | JumpToBeginning(skip_rows: options.skip_rows, skip_header: true); |
525 | DataChunk ; |
526 | header_row.Initialize(allocator, types: return_types); |
527 | parse_chunk.Copy(other&: header_row); |
528 | |
529 | if (header_row.size() == 0) { |
530 | continue; |
531 | } |
532 | |
533 | // init parse chunk and read csv with info candidate |
534 | InitParseChunk(num_cols: return_types.size()); |
535 | if (!TryParseCSV(mode: ParserMode::SNIFFING_DATATYPES)) { |
536 | continue; |
537 | } |
538 | for (idx_t row_idx = 0; row_idx <= parse_chunk.size(); row_idx++) { |
539 | bool = row_idx == 0; |
540 | idx_t row = row_idx - 1; |
541 | for (idx_t col = 0; col < parse_chunk.ColumnCount(); col++) { |
542 | auto &col_type_candidates = info_sql_types_candidates[col]; |
543 | while (col_type_candidates.size() > 1) { |
544 | const auto &sql_type = col_type_candidates.back(); |
545 | // try cast from string to sql_type |
546 | Value dummy_val; |
547 | if (is_header_row) { |
548 | VerifyUTF8(col_idx: col, row_idx: 0, chunk&: header_row, offset: -int64_t(parse_chunk.size())); |
549 | dummy_val = header_row.GetValue(col_idx: col, index: 0); |
550 | } else { |
551 | VerifyUTF8(col_idx: col, row_idx: row, chunk&: parse_chunk); |
552 | dummy_val = parse_chunk.GetValue(col_idx: col, index: row); |
553 | } |
554 | // try formatting for date types if the user did not specify one and it starts with numeric values. |
555 | string separator; |
556 | if (has_format_candidates.count(x: sql_type.id()) && !original_options.has_format[sql_type.id()] && |
557 | !dummy_val.IsNull() && StartsWithNumericDate(separator, value: StringValue::Get(value: dummy_val))) { |
558 | // generate date format candidates the first time through |
559 | auto &type_format_candidates = format_candidates[sql_type.id()]; |
560 | const auto had_format_candidates = has_format_candidates[sql_type.id()]; |
561 | if (!has_format_candidates[sql_type.id()]) { |
562 | has_format_candidates[sql_type.id()] = true; |
563 | // order by preference |
564 | auto entry = format_template_candidates.find(x: sql_type.id()); |
565 | if (entry != format_template_candidates.end()) { |
566 | const auto &format_template_list = entry->second; |
567 | for (const auto &t : format_template_list) { |
568 | const auto format_string = GenerateDateFormat(separator, format_template: t); |
569 | // don't parse ISO 8601 |
570 | if (format_string.find(s: "%Y-%m-%d" ) == string::npos) { |
571 | type_format_candidates.emplace_back(args: format_string); |
572 | } |
573 | } |
574 | } |
575 | // initialise the first candidate |
576 | options.has_format[sql_type.id()] = true; |
577 | // all formats are constructed to be valid |
578 | SetDateFormat(format_specifier: type_format_candidates.back(), sql_type: sql_type.id()); |
579 | } |
580 | // check all formats and keep the first one that works |
581 | StrpTimeFormat::ParseResult result; |
582 | auto save_format_candidates = type_format_candidates; |
583 | while (!type_format_candidates.empty()) { |
584 | // avoid using exceptions for flow control... |
585 | auto ¤t_format = options.date_format[sql_type.id()]; |
586 | if (current_format.Parse(str: StringValue::Get(value: dummy_val), result)) { |
587 | break; |
588 | } |
589 | // doesn't work - move to the next one |
590 | type_format_candidates.pop_back(); |
591 | options.has_format[sql_type.id()] = (!type_format_candidates.empty()); |
592 | if (!type_format_candidates.empty()) { |
593 | SetDateFormat(format_specifier: type_format_candidates.back(), sql_type: sql_type.id()); |
594 | } |
595 | } |
596 | // if none match, then this is not a value of type sql_type, |
597 | if (type_format_candidates.empty()) { |
598 | // so restore the candidates that did work. |
599 | // or throw them out if they were generated by this value. |
600 | if (had_format_candidates) { |
601 | type_format_candidates.swap(x&: save_format_candidates); |
602 | if (!type_format_candidates.empty()) { |
603 | SetDateFormat(format_specifier: type_format_candidates.back(), sql_type: sql_type.id()); |
604 | } |
605 | } else { |
606 | has_format_candidates[sql_type.id()] = false; |
607 | } |
608 | } |
609 | } |
610 | // try cast from string to sql_type |
611 | if (TryCastValue(value: dummy_val, sql_type)) { |
612 | break; |
613 | } else { |
614 | col_type_candidates.pop_back(); |
615 | } |
616 | } |
617 | } |
618 | // reset type detection, because first row could be header, |
619 | // but only do it if csv has more than one line (including header) |
620 | if (parse_chunk.size() > 0 && is_header_row) { |
621 | info_sql_types_candidates = vector<vector<LogicalType>>(options.num_cols, type_candidates); |
622 | for (auto &f : format_candidates) { |
623 | f.second.clear(); |
624 | } |
625 | for (auto &h : has_format_candidates) { |
626 | h.second = false; |
627 | } |
628 | } |
629 | } |
630 | |
631 | idx_t varchar_cols = 0; |
632 | for (idx_t col = 0; col < parse_chunk.ColumnCount(); col++) { |
633 | auto &col_type_candidates = info_sql_types_candidates[col]; |
634 | // check number of varchar columns |
635 | const auto &col_type = col_type_candidates.back(); |
636 | if (col_type == LogicalType::VARCHAR) { |
637 | varchar_cols++; |
638 | } |
639 | } |
640 | |
641 | // it's good if the dialect creates more non-varchar columns, but only if we sacrifice < 30% of best_num_cols. |
642 | if (varchar_cols < min_varchar_cols && parse_chunk.ColumnCount() > (best_num_cols * 0.7)) { |
643 | // we have a new best_options candidate |
644 | best_options = info_candidate; |
645 | min_varchar_cols = varchar_cols; |
646 | best_sql_types_candidates = info_sql_types_candidates; |
647 | best_format_candidates = format_candidates; |
648 | best_header_row.Destroy(); |
649 | auto = header_row.GetTypes(); |
650 | best_header_row.Initialize(allocator, types: header_row_types); |
651 | header_row.Copy(other&: best_header_row); |
652 | } |
653 | } |
654 | |
655 | options = best_options; |
656 | for (const auto &best : best_format_candidates) { |
657 | if (!best.second.empty()) { |
658 | SetDateFormat(format_specifier: best.second.back(), sql_type: best.first); |
659 | } |
660 | } |
661 | } |
662 | |
663 | void BufferedCSVReader::(const vector<vector<LogicalType>> &best_sql_types_candidates, |
664 | const DataChunk &) { |
665 | // information for header detection |
666 | bool first_row_consistent = true; |
667 | bool first_row_nulls = false; |
668 | |
669 | // check if header row is all null and/or consistent with detected column data types |
670 | first_row_nulls = true; |
671 | for (idx_t col = 0; col < best_sql_types_candidates.size(); col++) { |
672 | auto dummy_val = best_header_row.GetValue(col_idx: col, index: 0); |
673 | if (!dummy_val.IsNull()) { |
674 | first_row_nulls = false; |
675 | } |
676 | |
677 | // try cast to sql_type of column |
678 | const auto &sql_type = best_sql_types_candidates[col].back(); |
679 | if (!TryCastValue(value: dummy_val, sql_type)) { |
680 | first_row_consistent = false; |
681 | } |
682 | } |
683 | |
684 | // update parser info, and read, generate & set col_names based on previous findings |
685 | if (((!first_row_consistent || first_row_nulls) && !options.has_header) || (options.has_header && options.header)) { |
686 | options.header = true; |
687 | case_insensitive_map_t<idx_t> name_collision_count; |
688 | // get header names from CSV |
689 | for (idx_t col = 0; col < options.num_cols; col++) { |
690 | const auto &val = best_header_row.GetValue(col_idx: col, index: 0); |
691 | string col_name = val.ToString(); |
692 | |
693 | // generate name if field is empty |
694 | if (col_name.empty() || val.IsNull()) { |
695 | col_name = GenerateColumnName(total_cols: options.num_cols, col_number: col); |
696 | } |
697 | |
698 | // normalize names or at least trim whitespace |
699 | if (options.normalize_names) { |
700 | col_name = NormalizeColumnName(col_name); |
701 | } else { |
702 | col_name = TrimWhitespace(col_name); |
703 | } |
704 | |
705 | // avoid duplicate header names |
706 | const string col_name_raw = col_name; |
707 | while (name_collision_count.find(x: col_name) != name_collision_count.end()) { |
708 | name_collision_count[col_name] += 1; |
709 | col_name = col_name + "_" + to_string(val: name_collision_count[col_name]); |
710 | } |
711 | |
712 | names.push_back(x: col_name); |
713 | name_collision_count[col_name] = 0; |
714 | } |
715 | |
716 | } else { |
717 | options.header = false; |
718 | for (idx_t col = 0; col < options.num_cols; col++) { |
719 | string column_name = GenerateColumnName(total_cols: options.num_cols, col_number: col); |
720 | names.push_back(x: column_name); |
721 | } |
722 | } |
723 | for (idx_t i = 0; i < MinValue<idx_t>(a: names.size(), b: options.name_list.size()); i++) { |
724 | names[i] = options.name_list[i]; |
725 | } |
726 | } |
727 | |
// Determines the final SQL type of every column.
// - If `requested_types` is non-empty it is returned as-is, after checking that its size equals options.num_cols.
// - If options.all_varchar is set, every column is VARCHAR.
// - Otherwise the reader jumps through further samples of the file (JumpToNextSample) and narrows each column's
//   candidate list in `best_sql_types_candidates`: candidates that fail TryCastVector on the sampled values are
//   popped from the back. Date/timestamp types that have entries in `best_format_candidates` also get their format
//   list narrowed the same way.
// A column whose candidate list was never narrowed (still the same size as `type_candidates`) is reported as VARCHAR.
// Side effects: resets `return_types` to all-VARCHAR (num_cols entries), may update options.has_format and the active
// date formats via SetDateFormat, and modifies `best_sql_types_candidates` / `best_format_candidates` in place.
// Throws InvalidInputException when requested_types.size() != options.num_cols.
vector<LogicalType> BufferedCSVReader::RefineTypeDetection(const vector<LogicalType> &type_candidates,
                                                           const vector<LogicalType> &requested_types,
                                                           vector<vector<LogicalType>> &best_sql_types_candidates,
                                                           map<LogicalTypeId, vector<string>> &best_format_candidates) {
	// for the type refine we set the SQL types to VARCHAR for all columns
	return_types.clear();
	return_types.assign(n: options.num_cols, val: LogicalType::VARCHAR);

	vector<LogicalType> detected_types;

	// if data types were provided, exit here if number of columns does not match
	if (!requested_types.empty()) {
		if (requested_types.size() != options.num_cols) {
			throw InvalidInputException(
			    "Error while determining column types: found %lld columns but expected %d. (%s)", options.num_cols,
			    requested_types.size(), options.ToString());
		} else {
			detected_types = requested_types;
		}
	} else if (options.all_varchar) {
		// return all types varchar
		detected_types = return_types;
	} else {
		// jump through the rest of the file and continue to refine the sql type guess
		while (JumpToNextSample()) {
			InitParseChunk(num_cols: return_types.size());
			// if jump ends up a bad line, we just skip this chunk
			if (!TryParseCSV(mode: ParserMode::SNIFFING_DATATYPES)) {
				continue;
			}
			for (idx_t col = 0; col < parse_chunk.ColumnCount(); col++) {
				vector<LogicalType> &col_type_candidates = best_sql_types_candidates[col];
				// the first (front) candidate is never removed: narrowing stops when one candidate is left
				while (col_type_candidates.size() > 1) {
					// sql_type refers into col_type_candidates; it is not used after the pop_back below
					const auto &sql_type = col_type_candidates.back();
					// narrow down the date formats
					if (best_format_candidates.count(x: sql_type.id())) {
						auto &best_type_format_candidates = best_format_candidates[sql_type.id()];
						// keep a copy so the list can be restored if no format fits this sample
						auto save_format_candidates = best_type_format_candidates;
						// try formats from the back of the list; drop each one that fails and activate the next
						while (!best_type_format_candidates.empty()) {
							if (TryCastVector(parse_chunk_col&: parse_chunk.data[col], size: parse_chunk.size(), sql_type)) {
								break;
							}
							// doesn't work - move to the next one
							best_type_format_candidates.pop_back();
							options.has_format[sql_type.id()] = (!best_type_format_candidates.empty());
							if (!best_type_format_candidates.empty()) {
								SetDateFormat(format_specifier: best_type_format_candidates.back(), sql_type: sql_type.id());
							}
						}
						// if none match, then this is not a column of type sql_type,
						if (best_type_format_candidates.empty()) {
							// so restore the candidates that did work.
							// (the restored list is the one from before this sample; its last format is re-activated)
							best_type_format_candidates.swap(x&: save_format_candidates);
							if (!best_type_format_candidates.empty()) {
								SetDateFormat(format_specifier: best_type_format_candidates.back(), sql_type: sql_type.id());
							}
						}
					}

					// keep the type if the whole sampled column casts to it, otherwise fall back to the next candidate
					if (TryCastVector(parse_chunk_col&: parse_chunk.data[col], size: parse_chunk.size(), sql_type)) {
						break;
					} else {
						col_type_candidates.pop_back();
					}
				}
			}
		}

		// set sql types
		for (auto &best_sql_types_candidate : best_sql_types_candidates) {
			LogicalType d_type = best_sql_types_candidate.back();
			// no candidate was ever eliminated for this column: fall back to VARCHAR
			if (best_sql_types_candidate.size() == type_candidates.size()) {
				d_type = LogicalType::VARCHAR;
			}
			detected_types.push_back(x: d_type);
		}
	}

	return detected_types;
}
808 | |
809 | string BufferedCSVReader::ColumnTypesError(case_insensitive_map_t<idx_t> sql_types_per_column, |
810 | const vector<string> &names) { |
811 | for (idx_t i = 0; i < names.size(); i++) { |
812 | auto it = sql_types_per_column.find(x: names[i]); |
813 | if (it != sql_types_per_column.end()) { |
814 | sql_types_per_column.erase(x: names[i]); |
815 | continue; |
816 | } |
817 | } |
818 | if (sql_types_per_column.empty()) { |
819 | return string(); |
820 | } |
821 | string exception = "COLUMN_TYPES error: Columns with names: " ; |
822 | for (auto &col : sql_types_per_column) { |
823 | exception += "\"" + col.first + "\"," ; |
824 | } |
825 | exception.pop_back(); |
826 | exception += " do not exist in the CSV File" ; |
827 | return exception; |
828 | } |
829 | |
830 | vector<LogicalType> BufferedCSVReader::SniffCSV(const vector<LogicalType> &requested_types) { |
831 | for (auto &type : requested_types) { |
832 | // auto detect for blobs not supported: there may be invalid UTF-8 in the file |
833 | if (type.id() == LogicalTypeId::BLOB) { |
834 | return requested_types; |
835 | } |
836 | } |
837 | |
838 | // ####### |
839 | // ### dialect detection |
840 | // ####### |
841 | BufferedCSVReaderOptions original_options = options; |
842 | vector<BufferedCSVReaderOptions> info_candidates; |
843 | idx_t best_num_cols = 0; |
844 | |
845 | DetectDialect(requested_types, original_options, info_candidates, best_num_cols); |
846 | |
847 | // if no dialect candidate was found, then file was most likely empty and we throw an exception |
848 | if (info_candidates.empty()) { |
849 | throw InvalidInputException( |
850 | "Error in file \"%s\": CSV options could not be auto-detected. Consider setting parser options manually." , |
851 | options.file_path); |
852 | } |
853 | |
854 | // ####### |
855 | // ### type detection (initial) |
856 | // ####### |
857 | |
858 | // format template candidates, ordered by descending specificity (~ from high to low) |
859 | std::map<LogicalTypeId, vector<const char *>> format_template_candidates = { |
860 | {LogicalTypeId::DATE, {"%m-%d-%Y" , "%m-%d-%y" , "%d-%m-%Y" , "%d-%m-%y" , "%Y-%m-%d" , "%y-%m-%d" }}, |
861 | {LogicalTypeId::TIMESTAMP, |
862 | {"%Y-%m-%d %H:%M:%S.%f" , "%m-%d-%Y %I:%M:%S %p" , "%m-%d-%y %I:%M:%S %p" , "%d-%m-%Y %H:%M:%S" , |
863 | "%d-%m-%y %H:%M:%S" , "%Y-%m-%d %H:%M:%S" , "%y-%m-%d %H:%M:%S" }}, |
864 | }; |
865 | vector<vector<LogicalType>> best_sql_types_candidates; |
866 | map<LogicalTypeId, vector<string>> best_format_candidates; |
867 | DataChunk ; |
868 | DetectCandidateTypes(type_candidates: options.auto_type_candidates, format_template_candidates, info_candidates, original_options, |
869 | best_num_cols, best_sql_types_candidates, best_format_candidates, best_header_row); |
870 | |
871 | if (best_format_candidates.empty() || best_header_row.size() == 0) { |
872 | throw InvalidInputException( |
873 | "Error in file \"%s\": CSV options could not be auto-detected. Consider setting parser options manually." , |
874 | original_options.file_path); |
875 | } |
876 | |
877 | // ####### |
878 | // ### header detection |
879 | // ####### |
880 | options.num_cols = best_num_cols; |
881 | DetectHeader(best_sql_types_candidates, best_header_row); |
882 | if (!options.sql_type_list.empty()) { |
883 | // user-defined types were supplied for certain columns |
884 | // override the types |
885 | if (!options.sql_types_per_column.empty()) { |
886 | // types supplied as name -> value map |
887 | idx_t found = 0; |
888 | for (idx_t i = 0; i < names.size(); i++) { |
889 | auto it = options.sql_types_per_column.find(x: names[i]); |
890 | if (it != options.sql_types_per_column.end()) { |
891 | best_sql_types_candidates[i] = {options.sql_type_list[it->second]}; |
892 | found++; |
893 | continue; |
894 | } |
895 | } |
896 | if (!options.file_options.union_by_name && found < options.sql_types_per_column.size()) { |
897 | string exception = ColumnTypesError(sql_types_per_column: options.sql_types_per_column, names); |
898 | if (!exception.empty()) { |
899 | throw BinderException(exception); |
900 | } |
901 | } |
902 | } else { |
903 | // types supplied as list |
904 | if (names.size() < options.sql_type_list.size()) { |
905 | throw BinderException("read_csv: %d types were provided, but CSV file only has %d columns" , |
906 | options.sql_type_list.size(), names.size()); |
907 | } |
908 | for (idx_t i = 0; i < options.sql_type_list.size(); i++) { |
909 | best_sql_types_candidates[i] = {options.sql_type_list[i]}; |
910 | } |
911 | } |
912 | } |
913 | |
914 | // ####### |
915 | // ### type detection (refining) |
916 | // ####### |
917 | return RefineTypeDetection(type_candidates: options.auto_type_candidates, requested_types, best_sql_types_candidates, |
918 | best_format_candidates); |
919 | } |
920 | |
// Parser for dialects the single-byte fast path cannot handle: TryParseCSV dispatches here when the delimiter is not
// exactly one byte or the quote/escape is longer than one byte.
// Delimiter, quote and escape are recognised incrementally. delimiter_search / quote_search / escape_search advance
// the match progress stored in delimiter_pos / quote_pos / escape_pos, and a full match is detected by comparing that
// progress with the option's length.
// Control flow is a goto-based state machine with the states value_start, normal, add_value, add_row, in_quotes,
// unquote, handle_escape, carriage_return and final_state.
// Returns true when a chunk was finished or the input is exhausted. In the exhausted case end_of_file_reached is set
// and, in PARSING mode, insert_chunk is flushed.
// Returns false with error_message set on unterminated quotes, an invalid byte after a closing quote or an escape, or
// when AddRow reports an error.
// NOTE(review): unlike TryParseSimpleCSV this path calls neither UpdateMaxLineLength nor SetNewLineDelimiter, and
// line_start is only written by ReadBuffer. Confirm this is intended.
bool BufferedCSVReader::TryParseComplexCSV(DataChunk &insert_chunk, string &error_message) {
	// used for parsing algorithm
	bool finished_chunk = false;
	idx_t column = 0;
	// offsets (relative to `start`) of escape / doubled-quote bytes inside the current value, handed to AddValue
	vector<idx_t> escape_positions;
	bool has_quotes = false;
	// match progress of the multi-byte delimiter / escape / quote patterns
	uint8_t delimiter_pos = 0, escape_pos = 0, quote_pos = 0;
	// number of trailing bytes (rest of the delimiter and/or the closing quote) excluded from the value when adding it
	idx_t offset = 0;
	idx_t line_start = 0;
	// read values into the buffer (if any)
	if (position >= buffer_size) {
		if (!ReadBuffer(start, line_start)) {
			return true;
		}
	}
	// start parsing the first value
	start = position;
	goto value_start;
value_start:
	/* state: value_start */
	// this state parses the first characters of a value
	offset = 0;
	delimiter_pos = 0;
	quote_pos = 0;
	do {
		// bytes examined in this state since the last (re)fill of the buffer
		idx_t count = 0;
		for (; position < buffer_size; position++) {
			quote_search.Match(position&: quote_pos, byte_value: buffer[position]);
			delimiter_search.Match(position&: delimiter_pos, byte_value: buffer[position]);
			count++;
			if (delimiter_pos == options.delimiter.size()) {
				// found a delimiter, add the value
				offset = options.delimiter.size() - 1;
				goto add_value;
			} else if (StringUtil::CharacterIsNewline(c: buffer[position])) {
				// found a newline, add the row
				goto add_row;
			}
			if (count > quote_pos) {
				// did not find a quote directly at the start of the value, stop looking for the quote now
				goto normal;
			}
			if (quote_pos == options.quote.size()) {
				// found a quote, go to quoted loop and skip the initial quote
				start += options.quote.size();
				goto in_quotes;
			}
		}
	} while (ReadBuffer(start, line_start));
	// file ends while scanning for quote/delimiter, go to final state
	goto final_state;
normal:
	/* state: normal parsing state */
	// this state parses the remainder of a non-quoted value until we reach a delimiter or newline
	position++;
	do {
		for (; position < buffer_size; position++) {
			delimiter_search.Match(position&: delimiter_pos, byte_value: buffer[position]);
			if (delimiter_pos == options.delimiter.size()) {
				offset = options.delimiter.size() - 1;
				goto add_value;
			} else if (StringUtil::CharacterIsNewline(c: buffer[position])) {
				goto add_row;
			}
		}
	} while (ReadBuffer(start, line_start));
	goto final_state;
add_value:
	AddValue(str_val: string_t(buffer.get() + start, position - start - offset), column, escape_positions, has_quotes);
	// increase position by 1 and move start to the new position
	offset = 0;
	has_quotes = false;
	start = ++position;
	if (position >= buffer_size && !ReadBuffer(start, line_start)) {
		// file ends right after delimiter, go to final state
		goto final_state;
	}
	goto value_start;
add_row : {
	// check type of newline (\r or \n)
	bool carriage_return = buffer[position] == '\r';
	AddValue(str_val: string_t(buffer.get() + start, position - start - offset), column, escape_positions, has_quotes);
	finished_chunk = AddRow(insert_chunk, column, error_message);

	if (!error_message.empty()) {
		return false;
	}
	// increase position by 1 and move start to the new position
	offset = 0;
	has_quotes = false;
	position++;
	// NOTE: SkipEmptyLines() skips newline bytes (unless there is a single column), so the '\n' of a "\r\n" pair is
	// usually already consumed here, before the carriage_return state inspects buffer[position].
	SkipEmptyLines();
	start = position;
	if (position >= buffer_size && !ReadBuffer(start, line_start)) {
		// file ends right after newline, go to final state
		goto final_state;
	}
	if (carriage_return) {
		// \r newline, go to special state that parses an optional \n afterwards
		goto carriage_return;
	} else {
		// \n newline, move to value start
		if (finished_chunk) {
			return true;
		}
		goto value_start;
	}
}
in_quotes:
	/* state: in_quotes */
	// this state parses the remainder of a quoted value
	quote_pos = 0;
	escape_pos = 0;
	has_quotes = true;
	position++;
	do {
		for (; position < buffer_size; position++) {
			quote_search.Match(position&: quote_pos, byte_value: buffer[position]);
			escape_search.Match(position&: escape_pos, byte_value: buffer[position]);
			if (quote_pos == options.quote.size()) {
				goto unquote;
			} else if (escape_pos == options.escape.size()) {
				// NOTE(review): if options.escape is empty, escape.size() is 0 and escape_pos starts at 0, so this branch
				// would fire on the first non-quote byte unless escape_search.Match or option validation prevents it.
				// Confirm.
				escape_positions.push_back(x: position - start - (options.escape.size() - 1));
				goto handle_escape;
			}
		}
	} while (ReadBuffer(start, line_start));
	// still in quoted state at the end of the file, error:
	error_message = StringUtil::Format(fmt_str: "Error in file \"%s\" on line %s: unterminated quotes. (%s)", params: options.file_path,
	                                   params: GetLineNumberStr(line_error: linenr, is_line_estimated: linenr_estimated).c_str(), params: options.ToString());
	return false;
unquote:
	/* state: unquote */
	// this state handles the state directly after we unquote
	// in this state we expect either another quote (entering the quoted state again, and escaping the quote)
	// or a delimiter/newline, ending the current value and moving on to the next value
	delimiter_pos = 0;
	quote_pos = 0;
	position++;
	if (position >= buffer_size && !ReadBuffer(start, line_start)) {
		// file ends right after unquote, go to final state
		offset = options.quote.size();
		goto final_state;
	}
	if (StringUtil::CharacterIsNewline(c: buffer[position])) {
		// quote followed by newline, add row
		offset = options.quote.size();
		goto add_row;
	}
	do {
		idx_t count = 0;
		for (; position < buffer_size; position++) {
			quote_search.Match(position&: quote_pos, byte_value: buffer[position]);
			delimiter_search.Match(position&: delimiter_pos, byte_value: buffer[position]);
			count++;
			// the bytes seen after the closing quote are no longer a (partial) delimiter or quote: invalid input
			if (count > delimiter_pos && count > quote_pos) {
				error_message = StringUtil::Format(
				    fmt_str: "Error in file \"%s\" on line %s: quote should be followed by end of value, end "
				    "of row or another quote. (%s)",
				    params: options.file_path, params: GetLineNumberStr(line_error: linenr, is_line_estimated: linenr_estimated).c_str(), params: options.ToString());
				return false;
			}
			if (delimiter_pos == options.delimiter.size()) {
				// quote followed by delimiter, add value
				offset = options.quote.size() + options.delimiter.size() - 1;
				goto add_value;
			} else if (quote_pos == options.quote.size() &&
			           (options.escape.empty() || options.escape == options.quote)) {
				// quote followed by quote, go back to quoted state and add to escape
				escape_positions.push_back(x: position - start - (options.quote.size() - 1));
				goto in_quotes;
			}
		}
	} while (ReadBuffer(start, line_start));
	error_message = StringUtil::Format(
	    fmt_str: "Error in file \"%s\" on line %s: quote should be followed by end of value, end of row or another quote. (%s)",
	    params: options.file_path, params: GetLineNumberStr(line_error: linenr, is_line_estimated: linenr_estimated).c_str(), params: options.ToString());
	return false;
handle_escape:
	/* state: handle_escape */
	// an escape must be followed by a (complete) quote or escape sequence
	escape_pos = 0;
	quote_pos = 0;
	position++;
	do {
		idx_t count = 0;
		for (; position < buffer_size; position++) {
			quote_search.Match(position&: quote_pos, byte_value: buffer[position]);
			escape_search.Match(position&: escape_pos, byte_value: buffer[position]);
			count++;
			if (count > escape_pos && count > quote_pos) {
				error_message = StringUtil::Format(
				    fmt_str: "Error in file \"%s\" on line %s: neither QUOTE nor ESCAPE is proceeded by ESCAPE. (%s)",
				    params: options.file_path, params: GetLineNumberStr(line_error: linenr, is_line_estimated: linenr_estimated).c_str(), params: options.ToString());
				return false;
			}
			if (quote_pos == options.quote.size() || escape_pos == options.escape.size()) {
				// found quote or escape: move back to quoted state
				goto in_quotes;
			}
		}
	} while (ReadBuffer(start, line_start));
	error_message =
	    StringUtil::Format(fmt_str: "Error in file \"%s\" on line %s: neither QUOTE nor ESCAPE is proceeded by ESCAPE. (%s)",
	                       params: options.file_path, params: GetLineNumberStr(line_error: linenr, is_line_estimated: linenr_estimated).c_str(), params: options.ToString());
	return false;
carriage_return:
	/* state: carriage_return */
	// this stage optionally skips a newline (\n) character, which allows \r\n to be interpreted as a single line
	if (buffer[position] == '\n') {
		// newline after carriage return: skip
		start = ++position;
		if (position >= buffer_size && !ReadBuffer(start, line_start)) {
			// file ends right after newline, go to final state
			goto final_state;
		}
	}
	if (finished_chunk) {
		return true;
	}
	goto value_start;
final_state:
	if (finished_chunk) {
		return true;
	}
	if (column > 0 || position > start) {
		// remaining values to be added to the chunk
		AddValue(str_val: string_t(buffer.get() + start, position - start - offset), column, escape_positions, has_quotes);
		finished_chunk = AddRow(insert_chunk, column, error_message);
		SkipEmptyLines();
		if (!error_message.empty()) {
			return false;
		}
	}
	// final stage, only reached after parsing the file is finished
	// flush the parsed chunk and finalize parsing
	if (mode == ParserMode::PARSING) {
		Flush(insert_chunk);
	}

	end_of_file_reached = true;
	return true;
}
1162 | |
1163 | void BufferedCSVReader::SkipEmptyLines() { |
1164 | if (parse_chunk.data.size() == 1) { |
1165 | // Empty lines are null data. |
1166 | return; |
1167 | } |
1168 | for (; position < buffer_size; position++) { |
1169 | if (!StringUtil::CharacterIsNewline(c: buffer[position])) { |
1170 | return; |
1171 | } |
1172 | } |
1173 | } |
1174 | |
1175 | void UpdateMaxLineLength(ClientContext &context, idx_t line_length) { |
1176 | if (!context.client_data->debug_set_max_line_length) { |
1177 | return; |
1178 | } |
1179 | if (line_length < context.client_data->debug_max_line_length) { |
1180 | return; |
1181 | } |
1182 | context.client_data->debug_max_line_length = line_length; |
1183 | } |
1184 | |
1185 | bool BufferedCSVReader::TryParseSimpleCSV(DataChunk &insert_chunk, string &error_message) { |
1186 | // used for parsing algorithm |
1187 | bool finished_chunk = false; |
1188 | idx_t column = 0; |
1189 | idx_t offset = 0; |
1190 | bool has_quotes = false; |
1191 | vector<idx_t> escape_positions; |
1192 | |
1193 | idx_t line_start = position; |
1194 | // read values into the buffer (if any) |
1195 | if (position >= buffer_size) { |
1196 | if (!ReadBuffer(start, line_start)) { |
1197 | return true; |
1198 | } |
1199 | } |
1200 | |
1201 | // start parsing the first value |
1202 | goto value_start; |
1203 | value_start: |
1204 | offset = 0; |
1205 | /* state: value_start */ |
1206 | // this state parses the first character of a value |
1207 | if (buffer[position] == options.quote[0]) { |
1208 | // quote: actual value starts in the next position |
1209 | // move to in_quotes state |
1210 | start = position + 1; |
1211 | goto in_quotes; |
1212 | } else { |
1213 | // no quote, move to normal parsing state |
1214 | start = position; |
1215 | goto normal; |
1216 | } |
1217 | normal: |
1218 | /* state: normal parsing state */ |
1219 | // this state parses the remainder of a non-quoted value until we reach a delimiter or newline |
1220 | do { |
1221 | for (; position < buffer_size; position++) { |
1222 | if (buffer[position] == options.delimiter[0]) { |
1223 | // delimiter: end the value and add it to the chunk |
1224 | goto add_value; |
1225 | } else if (StringUtil::CharacterIsNewline(c: buffer[position])) { |
1226 | // newline: add row |
1227 | goto add_row; |
1228 | } |
1229 | } |
1230 | } while (ReadBuffer(start, line_start)); |
1231 | // file ends during normal scan: go to end state |
1232 | goto final_state; |
1233 | add_value: |
1234 | AddValue(str_val: string_t(buffer.get() + start, position - start - offset), column, escape_positions, has_quotes); |
1235 | // increase position by 1 and move start to the new position |
1236 | offset = 0; |
1237 | has_quotes = false; |
1238 | start = ++position; |
1239 | if (position >= buffer_size && !ReadBuffer(start, line_start)) { |
1240 | // file ends right after delimiter, go to final state |
1241 | goto final_state; |
1242 | } |
1243 | goto value_start; |
1244 | add_row : { |
1245 | // check type of newline (\r or \n) |
1246 | bool carriage_return = buffer[position] == '\r'; |
1247 | AddValue(str_val: string_t(buffer.get() + start, position - start - offset), column, escape_positions, has_quotes); |
1248 | if (!error_message.empty()) { |
1249 | return false; |
1250 | } |
1251 | finished_chunk = AddRow(insert_chunk, column, error_message); |
1252 | UpdateMaxLineLength(context, line_length: position - line_start); |
1253 | if (!error_message.empty()) { |
1254 | return false; |
1255 | } |
1256 | // increase position by 1 and move start to the new position |
1257 | offset = 0; |
1258 | has_quotes = false; |
1259 | position++; |
1260 | start = position; |
1261 | line_start = position; |
1262 | if (position >= buffer_size && !ReadBuffer(start, line_start)) { |
1263 | // file ends right after delimiter, go to final state |
1264 | goto final_state; |
1265 | } |
1266 | if (carriage_return) { |
1267 | // \r newline, go to special state that parses an optional \n afterwards |
1268 | goto carriage_return; |
1269 | } else { |
1270 | SetNewLineDelimiter(); |
1271 | SkipEmptyLines(); |
1272 | |
1273 | start = position; |
1274 | line_start = position; |
1275 | if (position >= buffer_size && !ReadBuffer(start, line_start)) { |
1276 | // file ends right after delimiter, go to final state |
1277 | goto final_state; |
1278 | } |
1279 | // \n newline, move to value start |
1280 | if (finished_chunk) { |
1281 | return true; |
1282 | } |
1283 | goto value_start; |
1284 | } |
1285 | } |
1286 | in_quotes: |
1287 | /* state: in_quotes */ |
1288 | // this state parses the remainder of a quoted value |
1289 | has_quotes = true; |
1290 | position++; |
1291 | do { |
1292 | for (; position < buffer_size; position++) { |
1293 | if (buffer[position] == options.quote[0]) { |
1294 | // quote: move to unquoted state |
1295 | goto unquote; |
1296 | } else if (buffer[position] == options.escape[0]) { |
1297 | // escape: store the escaped position and move to handle_escape state |
1298 | escape_positions.push_back(x: position - start); |
1299 | goto handle_escape; |
1300 | } |
1301 | } |
1302 | } while (ReadBuffer(start, line_start)); |
1303 | // still in quoted state at the end of the file, error: |
1304 | throw InvalidInputException("Error in file \"%s\" on line %s: unterminated quotes. (%s)" , options.file_path, |
1305 | GetLineNumberStr(line_error: linenr, is_line_estimated: linenr_estimated).c_str(), options.ToString()); |
1306 | unquote: |
1307 | /* state: unquote */ |
1308 | // this state handles the state directly after we unquote |
1309 | // in this state we expect either another quote (entering the quoted state again, and escaping the quote) |
1310 | // or a delimiter/newline, ending the current value and moving on to the next value |
1311 | position++; |
1312 | if (position >= buffer_size && !ReadBuffer(start, line_start)) { |
1313 | // file ends right after unquote, go to final state |
1314 | offset = 1; |
1315 | goto final_state; |
1316 | } |
1317 | if (buffer[position] == options.quote[0] && (options.escape.empty() || options.escape[0] == options.quote[0])) { |
1318 | // escaped quote, return to quoted state and store escape position |
1319 | escape_positions.push_back(x: position - start); |
1320 | goto in_quotes; |
1321 | } else if (buffer[position] == options.delimiter[0]) { |
1322 | // delimiter, add value |
1323 | offset = 1; |
1324 | goto add_value; |
1325 | } else if (StringUtil::CharacterIsNewline(c: buffer[position])) { |
1326 | offset = 1; |
1327 | goto add_row; |
1328 | } else { |
1329 | error_message = StringUtil::Format( |
1330 | fmt_str: "Error in file \"%s\" on line %s: quote should be followed by end of value, end of " |
1331 | "row or another quote. (%s)" , |
1332 | params: options.file_path, params: GetLineNumberStr(line_error: linenr, is_line_estimated: linenr_estimated).c_str(), params: options.ToString()); |
1333 | return false; |
1334 | } |
1335 | handle_escape: |
1336 | /* state: handle_escape */ |
1337 | // escape should be followed by a quote or another escape character |
1338 | position++; |
1339 | if (position >= buffer_size && !ReadBuffer(start, line_start)) { |
1340 | error_message = StringUtil::Format( |
1341 | fmt_str: "Error in file \"%s\" on line %s: neither QUOTE nor ESCAPE is proceeded by ESCAPE. (%s)" , params: options.file_path, |
1342 | params: GetLineNumberStr(line_error: linenr, is_line_estimated: linenr_estimated).c_str(), params: options.ToString()); |
1343 | return false; |
1344 | } |
1345 | if (buffer[position] != options.quote[0] && buffer[position] != options.escape[0]) { |
1346 | error_message = StringUtil::Format( |
1347 | fmt_str: "Error in file \"%s\" on line %s: neither QUOTE nor ESCAPE is proceeded by ESCAPE. (%s)" , params: options.file_path, |
1348 | params: GetLineNumberStr(line_error: linenr, is_line_estimated: linenr_estimated).c_str(), params: options.ToString()); |
1349 | return false; |
1350 | } |
1351 | // escape was followed by quote or escape, go back to quoted state |
1352 | goto in_quotes; |
1353 | carriage_return: |
1354 | /* state: carriage_return */ |
1355 | // this stage optionally skips a newline (\n) character, which allows \r\n to be interpreted as a single line |
1356 | if (buffer[position] == '\n') { |
1357 | SetNewLineDelimiter(carry: true, carry_followed_by_nl: true); |
1358 | // newline after carriage return: skip |
1359 | // increase position by 1 and move start to the new position |
1360 | start = ++position; |
1361 | if (position >= buffer_size && !ReadBuffer(start, line_start)) { |
1362 | // file ends right after delimiter, go to final state |
1363 | goto final_state; |
1364 | } |
1365 | } else { |
1366 | SetNewLineDelimiter(carry: true, carry_followed_by_nl: false); |
1367 | } |
1368 | if (finished_chunk) { |
1369 | return true; |
1370 | } |
1371 | SkipEmptyLines(); |
1372 | start = position; |
1373 | line_start = position; |
1374 | if (position >= buffer_size && !ReadBuffer(start, line_start)) { |
1375 | // file ends right after delimiter, go to final state |
1376 | goto final_state; |
1377 | } |
1378 | |
1379 | goto value_start; |
1380 | final_state: |
1381 | if (finished_chunk) { |
1382 | return true; |
1383 | } |
1384 | |
1385 | if (column > 0 || position > start) { |
1386 | // remaining values to be added to the chunk |
1387 | AddValue(str_val: string_t(buffer.get() + start, position - start - offset), column, escape_positions, has_quotes); |
1388 | finished_chunk = AddRow(insert_chunk, column, error_message); |
1389 | SkipEmptyLines(); |
1390 | UpdateMaxLineLength(context, line_length: position - line_start); |
1391 | if (!error_message.empty()) { |
1392 | return false; |
1393 | } |
1394 | } |
1395 | |
1396 | // final stage, only reached after parsing the file is finished |
1397 | // flush the parsed chunk and finalize parsing |
1398 | if (mode == ParserMode::PARSING) { |
1399 | Flush(insert_chunk); |
1400 | } |
1401 | |
1402 | end_of_file_reached = true; |
1403 | return true; |
1404 | } |
1405 | |
1406 | bool BufferedCSVReader::ReadBuffer(idx_t &start, idx_t &line_start) { |
1407 | if (start > buffer_size) { |
1408 | return false; |
1409 | } |
1410 | auto old_buffer = std::move(buffer); |
1411 | |
1412 | // the remaining part of the last buffer |
1413 | idx_t remaining = buffer_size - start; |
1414 | |
1415 | bool large_buffers = mode == ParserMode::PARSING && !file_handle->OnDiskFile() && file_handle->CanSeek(); |
1416 | idx_t buffer_read_size = large_buffers ? INITIAL_BUFFER_SIZE_LARGE : INITIAL_BUFFER_SIZE; |
1417 | |
1418 | while (remaining > buffer_read_size) { |
1419 | buffer_read_size *= 2; |
1420 | } |
1421 | |
1422 | // Check line length |
1423 | if (remaining > options.maximum_line_size) { |
1424 | throw InvalidInputException("Maximum line size of %llu bytes exceeded on line %s!" , options.maximum_line_size, |
1425 | GetLineNumberStr(line_error: linenr, is_line_estimated: linenr_estimated)); |
1426 | } |
1427 | |
1428 | buffer = make_unsafe_uniq_array<char>(n: buffer_read_size + remaining + 1); |
1429 | buffer_size = remaining + buffer_read_size; |
1430 | if (remaining > 0) { |
1431 | // remaining from last buffer: copy it here |
1432 | memcpy(dest: buffer.get(), src: old_buffer.get() + start, n: remaining); |
1433 | } |
1434 | idx_t read_count = file_handle->Read(buffer: buffer.get() + remaining, nr_bytes: buffer_read_size); |
1435 | |
1436 | bytes_in_chunk += read_count; |
1437 | buffer_size = remaining + read_count; |
1438 | buffer[buffer_size] = '\0'; |
1439 | if (old_buffer) { |
1440 | cached_buffers.push_back(x: std::move(old_buffer)); |
1441 | } |
1442 | start = 0; |
1443 | position = remaining; |
1444 | if (!bom_checked) { |
1445 | bom_checked = true; |
1446 | if (read_count >= 3 && buffer[0] == '\xEF' && buffer[1] == '\xBB' && buffer[2] == '\xBF') { |
1447 | start += 3; |
1448 | position += 3; |
1449 | } |
1450 | } |
1451 | line_start = start; |
1452 | |
1453 | return read_count > 0; |
1454 | } |
1455 | |
1456 | void BufferedCSVReader::ParseCSV(DataChunk &insert_chunk) { |
1457 | string error_message; |
1458 | if (!TryParseCSV(mode: ParserMode::PARSING, insert_chunk, error_message)) { |
1459 | throw InvalidInputException(error_message); |
1460 | } |
1461 | } |
1462 | |
1463 | bool BufferedCSVReader::TryParseCSV(ParserMode mode) { |
1464 | DataChunk dummy_chunk; |
1465 | string error_message; |
1466 | return TryParseCSV(mode, insert_chunk&: dummy_chunk, error_message); |
1467 | } |
1468 | |
1469 | void BufferedCSVReader::ParseCSV(ParserMode mode) { |
1470 | DataChunk dummy_chunk; |
1471 | string error_message; |
1472 | if (!TryParseCSV(mode, insert_chunk&: dummy_chunk, error_message)) { |
1473 | throw InvalidInputException(error_message); |
1474 | } |
1475 | } |
1476 | |
1477 | bool BufferedCSVReader::TryParseCSV(ParserMode parser_mode, DataChunk &insert_chunk, string &error_message) { |
1478 | mode = parser_mode; |
1479 | |
1480 | if (options.quote.size() <= 1 && options.escape.size() <= 1 && options.delimiter.size() == 1) { |
1481 | return TryParseSimpleCSV(insert_chunk, error_message); |
1482 | } else { |
1483 | return TryParseComplexCSV(insert_chunk, error_message); |
1484 | } |
1485 | } |
1486 | |
1487 | } // namespace duckdb |
1488 | |