1#include "duckdb/execution/operator/persistent/parallel_csv_reader.hpp"
2
3#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
4#include "duckdb/common/file_system.hpp"
5#include "duckdb/common/string_util.hpp"
6#include "duckdb/common/to_string.hpp"
7#include "duckdb/common/types/cast_helpers.hpp"
8#include "duckdb/common/vector_operations/unary_executor.hpp"
9#include "duckdb/common/vector_operations/vector_operations.hpp"
10#include "duckdb/function/scalar/strftime_format.hpp"
11#include "duckdb/main/database.hpp"
12#include "duckdb/parser/column_definition.hpp"
13#include "duckdb/storage/data_table.hpp"
14#include "utf8proc_wrapper.hpp"
15#include "utf8proc.hpp"
16#include "duckdb/parser/keyword_helper.hpp"
17#include "duckdb/function/table/read_csv.hpp"
18#include "duckdb/execution/operator/persistent/csv_line_info.hpp"
19
20#include <algorithm>
21#include <cctype>
22#include <cstring>
23#include <fstream>
24
25namespace duckdb {
26
27ParallelCSVReader::ParallelCSVReader(ClientContext &context, BufferedCSVReaderOptions options_p,
28 unique_ptr<CSVBufferRead> buffer_p, idx_t first_pos_first_buffer_p,
29 const vector<LogicalType> &requested_types, idx_t file_idx_p)
30 : BaseCSVReader(context, std::move(options_p), requested_types), file_idx(file_idx_p),
31 first_pos_first_buffer(first_pos_first_buffer_p) {
32 Initialize(requested_types);
33 SetBufferRead(std::move(buffer_p));
34 if (options.delimiter.size() > 1 || options.escape.size() > 1 || options.quote.size() > 1) {
35 throw InternalException("Parallel CSV reader cannot handle CSVs with multi-byte delimiters/escapes/quotes");
36 }
37}
38
39void ParallelCSVReader::Initialize(const vector<LogicalType> &requested_types) {
40 return_types = requested_types;
41 InitParseChunk(num_cols: return_types.size());
42}
43
44bool ParallelCSVReader::NewLineDelimiter(bool carry, bool carry_followed_by_nl, bool first_char) {
45 // Set the delimiter if not set yet.
46 SetNewLineDelimiter(carry, carry_followed_by_nl);
47 D_ASSERT(options.new_line == NewLineIdentifier::SINGLE || options.new_line == NewLineIdentifier::CARRY_ON);
48 if (options.new_line == NewLineIdentifier::SINGLE) {
49 return (!carry) || (carry && !carry_followed_by_nl);
50 }
51 return (carry && carry_followed_by_nl) || (!carry && first_char);
52}
53
54void ParallelCSVReader::SkipEmptyLines() {
55 idx_t new_pos_buffer = position_buffer;
56 if (parse_chunk.data.size() == 1) {
57 // Empty lines are null data.
58 return;
59 }
60 for (; new_pos_buffer < end_buffer; new_pos_buffer++) {
61 if (StringUtil::CharacterIsNewline(c: (*buffer)[new_pos_buffer])) {
62 bool carrier_return = (*buffer)[new_pos_buffer] == '\r';
63 new_pos_buffer++;
64 if (carrier_return && new_pos_buffer < buffer_size && (*buffer)[new_pos_buffer] == '\n') {
65 position_buffer++;
66 }
67 if (new_pos_buffer > end_buffer) {
68 return;
69 }
70 position_buffer = new_pos_buffer;
71 } else if ((*buffer)[new_pos_buffer] != ' ') {
72 return;
73 }
74 }
75}
76
77bool ParallelCSVReader::SetPosition() {
78 if (buffer->buffer->IsCSVFileFirstBuffer() && start_buffer == position_buffer &&
79 start_buffer == first_pos_first_buffer) {
80 start_buffer = buffer->buffer->GetStart();
81 position_buffer = start_buffer;
82 verification_positions.beginning_of_first_line = position_buffer;
83 verification_positions.end_of_last_line = position_buffer;
84 // First buffer doesn't need any setting
85
86 if (options.header) {
87 for (; position_buffer < end_buffer; position_buffer++) {
88 if (StringUtil::CharacterIsNewline(c: (*buffer)[position_buffer])) {
89 bool carrier_return = (*buffer)[position_buffer] == '\r';
90 position_buffer++;
91 if (carrier_return && position_buffer < buffer_size && (*buffer)[position_buffer] == '\n') {
92 position_buffer++;
93 }
94 if (position_buffer > end_buffer) {
95 return false;
96 }
97 SkipEmptyLines();
98 if (verification_positions.beginning_of_first_line == 0) {
99 verification_positions.beginning_of_first_line = position_buffer;
100 }
101
102 verification_positions.end_of_last_line = position_buffer;
103 return true;
104 }
105 }
106 return false;
107 }
108 SkipEmptyLines();
109 if (verification_positions.beginning_of_first_line == 0) {
110 verification_positions.beginning_of_first_line = position_buffer;
111 }
112
113 verification_positions.end_of_last_line = position_buffer;
114 return true;
115 }
116
117 // We have to move position up to next new line
118 idx_t end_buffer_real = end_buffer;
119 // Check if we already start in a valid line
120 string error_message;
121 bool successfully_read_first_line = false;
122 while (!successfully_read_first_line) {
123 DataChunk first_line_chunk;
124 first_line_chunk.Initialize(allocator, types: return_types);
125 // Ensure that parse_chunk has no gunk when trying to figure new line
126 parse_chunk.Reset();
127 for (; position_buffer < end_buffer; position_buffer++) {
128 if (StringUtil::CharacterIsNewline(c: (*buffer)[position_buffer])) {
129 bool carriage_return = (*buffer)[position_buffer] == '\r';
130 bool carriage_return_followed = false;
131 position_buffer++;
132 if (position_buffer < end_buffer) {
133 if (carriage_return && (*buffer)[position_buffer] == '\n') {
134 carriage_return_followed = true;
135 position_buffer++;
136 }
137 }
138 if (NewLineDelimiter(carry: carriage_return, carry_followed_by_nl: carriage_return_followed, first_char: position_buffer - 1 == start_buffer)) {
139 break;
140 }
141 }
142 }
143 SkipEmptyLines();
144
145 if (position_buffer > buffer_size) {
146 break;
147 }
148
149 if (position_buffer >= end_buffer && !StringUtil::CharacterIsNewline(c: (*buffer)[position_buffer - 1])) {
150 break;
151 }
152
153 if (position_buffer > end_buffer && options.new_line == NewLineIdentifier::CARRY_ON &&
154 (*buffer)[position_buffer - 1] == '\n') {
155 break;
156 }
157 idx_t position_set = position_buffer;
158 start_buffer = position_buffer;
159 // We check if we can add this line
160 // disable the projection pushdown while reading the first line
161 // otherwise the first line parsing can be influenced by which columns we are reading
162 auto column_ids = std::move(reader_data.column_ids);
163 auto column_mapping = std::move(reader_data.column_mapping);
164 InitializeProjection();
165 try {
166 successfully_read_first_line = TryParseSimpleCSV(insert_chunk&: first_line_chunk, error_message, try_add_line: true);
167 } catch (...) {
168 successfully_read_first_line = false;
169 }
170 // restore the projection pushdown
171 reader_data.column_ids = std::move(column_ids);
172 reader_data.column_mapping = std::move(column_mapping);
173 end_buffer = end_buffer_real;
174 start_buffer = position_set;
175 if (position_buffer >= end_buffer) {
176 if (successfully_read_first_line) {
177 position_buffer = position_set;
178 }
179 break;
180 }
181 position_buffer = position_set;
182 }
183 if (verification_positions.beginning_of_first_line == 0) {
184 verification_positions.beginning_of_first_line = position_buffer;
185 }
186 // Ensure that parse_chunk has no gunk when trying to figure new line
187 parse_chunk.Reset();
188
189 verification_positions.end_of_last_line = position_buffer;
190 finished = false;
191 return successfully_read_first_line;
192}
193
194void ParallelCSVReader::SetBufferRead(unique_ptr<CSVBufferRead> buffer_read_p) {
195 if (!buffer_read_p->buffer) {
196 throw InternalException("ParallelCSVReader::SetBufferRead - CSVBufferRead does not have a buffer to read");
197 }
198 position_buffer = buffer_read_p->buffer_start;
199 start_buffer = buffer_read_p->buffer_start;
200 end_buffer = buffer_read_p->buffer_end;
201 if (buffer_read_p->next_buffer) {
202 buffer_size = buffer_read_p->buffer->GetBufferSize() + buffer_read_p->next_buffer->GetBufferSize();
203 } else {
204 buffer_size = buffer_read_p->buffer->GetBufferSize();
205 }
206 buffer = std::move(buffer_read_p);
207
208 reached_remainder_state = false;
209 verification_positions.beginning_of_first_line = 0;
210 verification_positions.end_of_last_line = 0;
211 finished = false;
212 D_ASSERT(end_buffer <= buffer_size);
213}
214
215VerificationPositions ParallelCSVReader::GetVerificationPositions() {
216 verification_positions.beginning_of_first_line += buffer->buffer->GetCSVGlobalStart();
217 verification_positions.end_of_last_line += buffer->buffer->GetCSVGlobalStart();
218 return verification_positions;
219}
220
221// If BufferRemainder returns false, it means we are done scanning this buffer and should go to the end_state
222bool ParallelCSVReader::BufferRemainder() {
223 if (position_buffer >= end_buffer && !reached_remainder_state) {
224 // First time we finish the buffer piece we should scan here, we set the variables
225 // to allow this piece to be scanned up to the end of the buffer or the next new line
226 reached_remainder_state = true;
227 // end_buffer is allowed to go to buffer size to finish its last line
228 end_buffer = buffer_size;
229 }
230 if (position_buffer >= end_buffer) {
231 // buffer ends, return false
232 return false;
233 }
234 // we can still scan stuff, return true
235 return true;
236}
237
238void ParallelCSVReader::VerifyLineLength(idx_t line_size) {
239 if (line_size > options.maximum_line_size) {
240 throw InvalidInputException("Error in file \"%s\" on line %s: Maximum line size of %llu bytes exceeded!",
241 options.file_path,
242 GetLineNumberStr(line_error: parse_chunk.size(), is_line_estimated: linenr_estimated, buffer_idx: buffer->batch_index).c_str(),
243 options.maximum_line_size);
244 }
245}
246
247bool AllNewLine(string_t value, idx_t column_amount) {
248 auto value_str = value.GetString();
249 if (value_str.empty() && column_amount == 1) {
250 // This is a one column (empty)
251 return false;
252 }
253 for (idx_t i = 0; i < value.GetSize(); i++) {
254 if (!StringUtil::CharacterIsNewline(c: value_str[i])) {
255 return false;
256 }
257 }
258 return true;
259}
260
261bool ParallelCSVReader::TryParseSimpleCSV(DataChunk &insert_chunk, string &error_message, bool try_add_line) {
262 // If line is not set, we have to figure it out, we assume whatever is in the first line
263 if (options.new_line == NewLineIdentifier::NOT_SET) {
264 idx_t cur_pos = position_buffer;
265 // we can start in the middle of a new line, so move a bit forward.
266 while (cur_pos < end_buffer) {
267 if (StringUtil::CharacterIsNewline(c: (*buffer)[cur_pos])) {
268 cur_pos++;
269 } else {
270 break;
271 }
272 }
273 for (; cur_pos < end_buffer; cur_pos++) {
274 if (StringUtil::CharacterIsNewline(c: (*buffer)[cur_pos])) {
275 bool carriage_return = (*buffer)[cur_pos] == '\r';
276 bool carriage_return_followed = false;
277 cur_pos++;
278 if (cur_pos < end_buffer) {
279 if (carriage_return && (*buffer)[cur_pos] == '\n') {
280 carriage_return_followed = true;
281 cur_pos++;
282 }
283 }
284 SetNewLineDelimiter(carry: carriage_return, carry_followed_by_nl: carriage_return_followed);
285 break;
286 }
287 }
288 }
289 // used for parsing algorithm
290 if (start_buffer == buffer_size) {
291 // Nothing to read
292 finished = true;
293 return true;
294 }
295 D_ASSERT(end_buffer <= buffer_size);
296 bool finished_chunk = false;
297 idx_t column = 0;
298 idx_t offset = 0;
299 bool has_quotes = false;
300
301 vector<idx_t> escape_positions;
302 if ((start_buffer == buffer->buffer_start || start_buffer == buffer->buffer_end) && !try_add_line) {
303 // First time reading this buffer piece
304 if (!SetPosition()) {
305 finished = true;
306 return true;
307 }
308 }
309 if (position_buffer == buffer_size) {
310 // Nothing to read
311 finished = true;
312 return true;
313 }
314 // Keep track of line size
315 idx_t line_start = position_buffer;
316 // start parsing the first value
317 goto value_start;
318
319value_start : {
320 /* state: value_start */
321 if (!BufferRemainder()) {
322 goto final_state;
323 }
324 offset = 0;
325
326 // this state parses the first character of a value
327 if ((*buffer)[position_buffer] == options.quote[0]) {
328 // quote: actual value starts in the next position
329 // move to in_quotes state
330 start_buffer = position_buffer + 1;
331 goto in_quotes;
332 } else {
333 // no quote, move to normal parsing state
334 start_buffer = position_buffer;
335 goto normal;
336 }
337};
338
339normal : {
340 /* state: normal parsing state */
341 // this state parses the remainder of a non-quoted value until we reach a delimiter or newline
342 for (; position_buffer < end_buffer; position_buffer++) {
343 auto c = (*buffer)[position_buffer];
344 if (c == options.delimiter[0]) {
345 // delimiter: end the value and add it to the chunk
346 goto add_value;
347 } else if (c == options.quote[0] && try_add_line) {
348 return false;
349 } else if (StringUtil::CharacterIsNewline(c)) {
350 // newline: add row
351 if (column > 0 || try_add_line || parse_chunk.data.size() == 1) {
352 goto add_row;
353 }
354 if (column == 0 && position_buffer == start_buffer) {
355 start_buffer++;
356 }
357 }
358 }
359 if (!BufferRemainder()) {
360 goto final_state;
361 } else {
362 goto normal;
363 }
364};
365
366add_value : {
367 /* state: Add value to string vector */
368 AddValue(str_val: buffer->GetValue(start_buffer, position_buffer, offset), column, escape_positions, has_quotes,
369 buffer_idx: buffer->local_batch_index);
370 // increase position by 1 and move start to the new position
371 offset = 0;
372 has_quotes = false;
373 start_buffer = ++position_buffer;
374 if (!BufferRemainder()) {
375 goto final_state;
376 }
377 goto value_start;
378};
379
380add_row : {
381 /* state: Add Row to Parse chunk */
382 // check type of newline (\r or \n)
383 bool carriage_return = (*buffer)[position_buffer] == '\r';
384
385 AddValue(str_val: buffer->GetValue(start_buffer, position_buffer, offset), column, escape_positions, has_quotes,
386 buffer_idx: buffer->local_batch_index);
387 if (try_add_line) {
388 bool success = column == insert_chunk.ColumnCount();
389 if (success) {
390 idx_t cur_linenr = linenr;
391 AddRow(insert_chunk, column, error_message, buffer_idx: buffer->local_batch_index);
392 success = Flush(insert_chunk, buffer_idx: buffer->local_batch_index, try_add_line: true);
393 linenr = cur_linenr;
394 }
395 reached_remainder_state = false;
396 parse_chunk.Reset();
397 return success;
398 } else {
399 VerifyLineLength(line_size: position_buffer - line_start);
400 line_start = position_buffer;
401 finished_chunk = AddRow(insert_chunk, column, error_message, buffer_idx: buffer->local_batch_index);
402 }
403 // increase position by 1 and move start to the new position
404 offset = 0;
405 has_quotes = false;
406 position_buffer++;
407 start_buffer = position_buffer;
408 verification_positions.end_of_last_line = position_buffer;
409 if (carriage_return) {
410 // \r newline, go to special state that parses an optional \n afterwards
411 // optionally skips a newline (\n) character, which allows \r\n to be interpreted as a single line
412 if (!BufferRemainder()) {
413 goto final_state;
414 }
415 if ((*buffer)[position_buffer] == '\n') {
416 if (options.new_line == NewLineIdentifier::SINGLE) {
417 error_message = "Wrong NewLine Identifier. Expecting \\r\\n";
418 return false;
419 }
420 // newline after carriage return: skip
421 // increase position by 1 and move start to the new position
422 start_buffer = ++position_buffer;
423
424 SkipEmptyLines();
425 verification_positions.end_of_last_line = position_buffer;
426 start_buffer = position_buffer;
427 if (reached_remainder_state) {
428 goto final_state;
429 }
430 } else {
431 if (options.new_line == NewLineIdentifier::CARRY_ON) {
432 error_message = "Wrong NewLine Identifier. Expecting \\r or \\n";
433 return false;
434 }
435 }
436 if (!BufferRemainder()) {
437 goto final_state;
438 }
439 if (reached_remainder_state || finished_chunk) {
440 goto final_state;
441 }
442 goto value_start;
443 } else {
444 if (options.new_line == NewLineIdentifier::CARRY_ON) {
445 error_message = "Wrong NewLine Identifier. Expecting \\r or \\n";
446 return false;
447 }
448 if (reached_remainder_state) {
449 goto final_state;
450 }
451 if (!BufferRemainder()) {
452 goto final_state;
453 }
454 SkipEmptyLines();
455 verification_positions.end_of_last_line = position_buffer;
456 start_buffer = position_buffer;
457 // \n newline, move to value start
458 if (finished_chunk) {
459 goto final_state;
460 }
461 goto value_start;
462 }
463}
464in_quotes:
465 /* state: in_quotes this state parses the remainder of a quoted value*/
466 has_quotes = true;
467 position_buffer++;
468 for (; position_buffer < end_buffer; position_buffer++) {
469 auto c = (*buffer)[position_buffer];
470 if (c == options.quote[0]) {
471 // quote: move to unquoted state
472 goto unquote;
473 } else if (c == options.escape[0]) {
474 // escape: store the escaped position and move to handle_escape state
475 escape_positions.push_back(x: position_buffer - start_buffer);
476 goto handle_escape;
477 }
478 }
479 if (!BufferRemainder()) {
480 if (buffer->buffer->IsCSVFileLastBuffer()) {
481 if (try_add_line) {
482 return false;
483 }
484 // still in quoted state at the end of the file or at the end of a buffer when running multithreaded, error:
485 throw InvalidInputException("Error in file \"%s\" on line %s: unterminated quotes. (%s)", options.file_path,
486 GetLineNumberStr(line_error: linenr, is_line_estimated: linenr_estimated, buffer_idx: buffer->local_batch_index).c_str(),
487 options.ToString());
488 } else {
489 goto final_state;
490 }
491 } else {
492 position_buffer--;
493 goto in_quotes;
494 }
495
496unquote : {
497 /* state: unquote: this state handles the state directly after we unquote*/
498 //
499 // in this state we expect either another quote (entering the quoted state again, and escaping the quote)
500 // or a delimiter/newline, ending the current value and moving on to the next value
501 position_buffer++;
502 if (!BufferRemainder()) {
503 offset = 1;
504 goto final_state;
505 }
506 auto c = (*buffer)[position_buffer];
507 if (c == options.quote[0] && (options.escape.empty() || options.escape[0] == options.quote[0])) {
508 // escaped quote, return to quoted state and store escape position
509 escape_positions.push_back(x: position_buffer - start_buffer);
510 goto in_quotes;
511 } else if (c == options.delimiter[0]) {
512 // delimiter, add value
513 offset = 1;
514 goto add_value;
515 } else if (StringUtil::CharacterIsNewline(c)) {
516 offset = 1;
517 // FIXME: should this be an assertion?
518 D_ASSERT(try_add_line || (!try_add_line && column == parse_chunk.ColumnCount() - 1));
519 goto add_row;
520 } else if (position_buffer >= end_buffer) {
521 // reached end of buffer
522 offset = 1;
523 goto final_state;
524 } else {
525 error_message = StringUtil::Format(
526 fmt_str: "Error in file \"%s\" on line %s: quote should be followed by end of value, end of "
527 "row or another quote. (%s). ",
528 params: options.file_path, params: GetLineNumberStr(line_error: linenr, is_line_estimated: linenr_estimated, buffer_idx: buffer->local_batch_index).c_str(),
529 params: options.ToString());
530 return false;
531 }
532}
533handle_escape : {
534 /* state: handle_escape */
535 // escape should be followed by a quote or another escape character
536 position_buffer++;
537 if (!BufferRemainder()) {
538 goto final_state;
539 }
540 if (position_buffer >= buffer_size && buffer->buffer->IsCSVFileLastBuffer()) {
541 error_message = StringUtil::Format(
542 fmt_str: "Error in file \"%s\" on line %s: neither QUOTE nor ESCAPE is proceeded by ESCAPE. (%s)", params: options.file_path,
543 params: GetLineNumberStr(line_error: linenr, is_line_estimated: linenr_estimated, buffer_idx: buffer->local_batch_index).c_str(), params: options.ToString());
544 return false;
545 }
546 if ((*buffer)[position_buffer] != options.quote[0] && (*buffer)[position_buffer] != options.escape[0]) {
547 error_message = StringUtil::Format(
548 fmt_str: "Error in file \"%s\" on line %s: neither QUOTE nor ESCAPE is proceeded by ESCAPE. (%s)", params: options.file_path,
549 params: GetLineNumberStr(line_error: linenr, is_line_estimated: linenr_estimated, buffer_idx: buffer->local_batch_index).c_str(), params: options.ToString());
550 return false;
551 }
552 // escape was followed by quote or escape, go back to quoted state
553 goto in_quotes;
554}
555final_state : {
556 /* state: final_stage reached after we finished reading the end_buffer of the csv buffer */
557 // reset end buffer
558 end_buffer = buffer->buffer_end;
559 if (position_buffer == end_buffer) {
560 reached_remainder_state = false;
561 }
562 if (finished_chunk) {
563 if (position_buffer >= end_buffer) {
564 if (position_buffer == end_buffer && StringUtil::CharacterIsNewline(c: (*buffer)[position_buffer - 1]) &&
565 position_buffer < buffer_size) {
566 // last position is a new line, we still have to go through one more line of this buffer
567 finished = false;
568 } else {
569 finished = true;
570 }
571 }
572 buffer->lines_read += insert_chunk.size();
573 return true;
574 }
575 // If this is the last buffer, we have to read the last value
576 if (buffer->buffer->IsCSVFileLastBuffer() || (buffer->next_buffer && buffer->next_buffer->IsCSVFileLastBuffer())) {
577 if (column > 0 || start_buffer != position_buffer || try_add_line ||
578 (insert_chunk.data.size() == 1 && start_buffer != position_buffer)) {
579 // remaining values to be added to the chunk
580 auto str_value = buffer->GetValue(start_buffer, position_buffer, offset);
581 if (!AllNewLine(value: str_value, column_amount: insert_chunk.data.size()) || offset == 0) {
582 AddValue(str_val: str_value, column, escape_positions, has_quotes, buffer_idx: buffer->local_batch_index);
583 if (try_add_line) {
584 bool success = column == return_types.size();
585 if (success) {
586 auto cur_linenr = linenr;
587 AddRow(insert_chunk, column, error_message, buffer_idx: buffer->local_batch_index);
588 success = Flush(insert_chunk, buffer_idx: buffer->local_batch_index);
589 linenr = cur_linenr;
590 }
591 parse_chunk.Reset();
592 reached_remainder_state = false;
593 return success;
594 } else {
595 VerifyLineLength(line_size: position_buffer - line_start);
596 line_start = position_buffer;
597 AddRow(insert_chunk, column, error_message, buffer_idx: buffer->local_batch_index);
598 verification_positions.end_of_last_line = position_buffer;
599 }
600 }
601 }
602 }
603 // flush the parsed chunk and finalize parsing
604 if (mode == ParserMode::PARSING) {
605 Flush(insert_chunk, buffer_idx: buffer->local_batch_index);
606 buffer->lines_read += insert_chunk.size();
607 }
608 if (position_buffer - verification_positions.end_of_last_line > options.buffer_size) {
609 error_message = "Line does not fit in one buffer. Increase the buffer size.";
610 return false;
611 }
612 end_buffer = buffer_size;
613 SkipEmptyLines();
614 end_buffer = buffer->buffer_end;
615 verification_positions.end_of_last_line = position_buffer;
616 if (position_buffer >= end_buffer) {
617 if (position_buffer >= end_buffer) {
618 if (position_buffer == end_buffer && StringUtil::CharacterIsNewline(c: (*buffer)[position_buffer - 1]) &&
619 position_buffer < buffer_size) {
620 // last position is a new line, we still have to go through one more line of this buffer
621 finished = false;
622 } else {
623 finished = true;
624 }
625 }
626 }
627 return true;
628};
629}
630
631void ParallelCSVReader::ParseCSV(DataChunk &insert_chunk) {
632 string error_message;
633 if (!TryParseCSV(mode: ParserMode::PARSING, insert_chunk, error_message)) {
634 throw InvalidInputException(error_message);
635 }
636}
637
638idx_t ParallelCSVReader::GetLineError(idx_t line_error, idx_t buffer_idx) {
639 while (true) {
640 if (buffer->line_info->CanItGetLine(file_idx, batch_idx: buffer_idx)) {
641 auto cur_start = verification_positions.beginning_of_first_line + buffer->buffer->GetCSVGlobalStart();
642 // line errors are 1-indexed
643 return buffer->line_info->GetLine(batch_idx: buffer_idx, line_error, file_idx, cur_start, verify: false);
644 }
645 }
646}
647
648bool ParallelCSVReader::TryParseCSV(ParserMode mode) {
649 DataChunk dummy_chunk;
650 string error_message;
651 return TryParseCSV(mode, insert_chunk&: dummy_chunk, error_message);
652}
653
654void ParallelCSVReader::ParseCSV(ParserMode mode) {
655 DataChunk dummy_chunk;
656 string error_message;
657 if (!TryParseCSV(mode, insert_chunk&: dummy_chunk, error_message)) {
658 throw InvalidInputException(error_message);
659 }
660}
661
662bool ParallelCSVReader::TryParseCSV(ParserMode parser_mode, DataChunk &insert_chunk, string &error_message) {
663 mode = parser_mode;
664 return TryParseSimpleCSV(insert_chunk, error_message);
665}
666
667} // namespace duckdb
668