1 | #include "duckdb/execution/operator/persistent/parallel_csv_reader.hpp" |
2 | |
3 | #include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp" |
4 | #include "duckdb/common/file_system.hpp" |
5 | #include "duckdb/common/string_util.hpp" |
6 | #include "duckdb/common/to_string.hpp" |
7 | #include "duckdb/common/types/cast_helpers.hpp" |
8 | #include "duckdb/common/vector_operations/unary_executor.hpp" |
9 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
10 | #include "duckdb/function/scalar/strftime_format.hpp" |
11 | #include "duckdb/main/database.hpp" |
12 | #include "duckdb/parser/column_definition.hpp" |
13 | #include "duckdb/storage/data_table.hpp" |
14 | #include "utf8proc_wrapper.hpp" |
15 | #include "utf8proc.hpp" |
16 | #include "duckdb/parser/keyword_helper.hpp" |
17 | #include "duckdb/function/table/read_csv.hpp" |
18 | #include "duckdb/execution/operator/persistent/csv_line_info.hpp" |
19 | |
20 | #include <algorithm> |
21 | #include <cctype> |
22 | #include <cstring> |
23 | #include <fstream> |
24 | |
25 | namespace duckdb { |
26 | |
27 | ParallelCSVReader::ParallelCSVReader(ClientContext &context, BufferedCSVReaderOptions options_p, |
28 | unique_ptr<CSVBufferRead> buffer_p, idx_t first_pos_first_buffer_p, |
29 | const vector<LogicalType> &requested_types, idx_t file_idx_p) |
30 | : BaseCSVReader(context, std::move(options_p), requested_types), file_idx(file_idx_p), |
31 | first_pos_first_buffer(first_pos_first_buffer_p) { |
32 | Initialize(requested_types); |
33 | SetBufferRead(std::move(buffer_p)); |
34 | if (options.delimiter.size() > 1 || options.escape.size() > 1 || options.quote.size() > 1) { |
35 | throw InternalException("Parallel CSV reader cannot handle CSVs with multi-byte delimiters/escapes/quotes" ); |
36 | } |
37 | } |
38 | |
39 | void ParallelCSVReader::Initialize(const vector<LogicalType> &requested_types) { |
40 | return_types = requested_types; |
41 | InitParseChunk(num_cols: return_types.size()); |
42 | } |
43 | |
44 | bool ParallelCSVReader::NewLineDelimiter(bool carry, bool carry_followed_by_nl, bool first_char) { |
45 | // Set the delimiter if not set yet. |
46 | SetNewLineDelimiter(carry, carry_followed_by_nl); |
47 | D_ASSERT(options.new_line == NewLineIdentifier::SINGLE || options.new_line == NewLineIdentifier::CARRY_ON); |
48 | if (options.new_line == NewLineIdentifier::SINGLE) { |
49 | return (!carry) || (carry && !carry_followed_by_nl); |
50 | } |
51 | return (carry && carry_followed_by_nl) || (!carry && first_char); |
52 | } |
53 | |
54 | void ParallelCSVReader::SkipEmptyLines() { |
55 | idx_t new_pos_buffer = position_buffer; |
56 | if (parse_chunk.data.size() == 1) { |
57 | // Empty lines are null data. |
58 | return; |
59 | } |
60 | for (; new_pos_buffer < end_buffer; new_pos_buffer++) { |
61 | if (StringUtil::CharacterIsNewline(c: (*buffer)[new_pos_buffer])) { |
62 | bool carrier_return = (*buffer)[new_pos_buffer] == '\r'; |
63 | new_pos_buffer++; |
64 | if (carrier_return && new_pos_buffer < buffer_size && (*buffer)[new_pos_buffer] == '\n') { |
65 | position_buffer++; |
66 | } |
67 | if (new_pos_buffer > end_buffer) { |
68 | return; |
69 | } |
70 | position_buffer = new_pos_buffer; |
71 | } else if ((*buffer)[new_pos_buffer] != ' ') { |
72 | return; |
73 | } |
74 | } |
75 | } |
76 | |
77 | bool ParallelCSVReader::SetPosition() { |
78 | if (buffer->buffer->IsCSVFileFirstBuffer() && start_buffer == position_buffer && |
79 | start_buffer == first_pos_first_buffer) { |
80 | start_buffer = buffer->buffer->GetStart(); |
81 | position_buffer = start_buffer; |
82 | verification_positions.beginning_of_first_line = position_buffer; |
83 | verification_positions.end_of_last_line = position_buffer; |
84 | // First buffer doesn't need any setting |
85 | |
86 | if (options.header) { |
87 | for (; position_buffer < end_buffer; position_buffer++) { |
88 | if (StringUtil::CharacterIsNewline(c: (*buffer)[position_buffer])) { |
89 | bool carrier_return = (*buffer)[position_buffer] == '\r'; |
90 | position_buffer++; |
91 | if (carrier_return && position_buffer < buffer_size && (*buffer)[position_buffer] == '\n') { |
92 | position_buffer++; |
93 | } |
94 | if (position_buffer > end_buffer) { |
95 | return false; |
96 | } |
97 | SkipEmptyLines(); |
98 | if (verification_positions.beginning_of_first_line == 0) { |
99 | verification_positions.beginning_of_first_line = position_buffer; |
100 | } |
101 | |
102 | verification_positions.end_of_last_line = position_buffer; |
103 | return true; |
104 | } |
105 | } |
106 | return false; |
107 | } |
108 | SkipEmptyLines(); |
109 | if (verification_positions.beginning_of_first_line == 0) { |
110 | verification_positions.beginning_of_first_line = position_buffer; |
111 | } |
112 | |
113 | verification_positions.end_of_last_line = position_buffer; |
114 | return true; |
115 | } |
116 | |
117 | // We have to move position up to next new line |
118 | idx_t end_buffer_real = end_buffer; |
119 | // Check if we already start in a valid line |
120 | string error_message; |
121 | bool successfully_read_first_line = false; |
122 | while (!successfully_read_first_line) { |
123 | DataChunk first_line_chunk; |
124 | first_line_chunk.Initialize(allocator, types: return_types); |
125 | // Ensure that parse_chunk has no gunk when trying to figure new line |
126 | parse_chunk.Reset(); |
127 | for (; position_buffer < end_buffer; position_buffer++) { |
128 | if (StringUtil::CharacterIsNewline(c: (*buffer)[position_buffer])) { |
129 | bool carriage_return = (*buffer)[position_buffer] == '\r'; |
130 | bool carriage_return_followed = false; |
131 | position_buffer++; |
132 | if (position_buffer < end_buffer) { |
133 | if (carriage_return && (*buffer)[position_buffer] == '\n') { |
134 | carriage_return_followed = true; |
135 | position_buffer++; |
136 | } |
137 | } |
138 | if (NewLineDelimiter(carry: carriage_return, carry_followed_by_nl: carriage_return_followed, first_char: position_buffer - 1 == start_buffer)) { |
139 | break; |
140 | } |
141 | } |
142 | } |
143 | SkipEmptyLines(); |
144 | |
145 | if (position_buffer > buffer_size) { |
146 | break; |
147 | } |
148 | |
149 | if (position_buffer >= end_buffer && !StringUtil::CharacterIsNewline(c: (*buffer)[position_buffer - 1])) { |
150 | break; |
151 | } |
152 | |
153 | if (position_buffer > end_buffer && options.new_line == NewLineIdentifier::CARRY_ON && |
154 | (*buffer)[position_buffer - 1] == '\n') { |
155 | break; |
156 | } |
157 | idx_t position_set = position_buffer; |
158 | start_buffer = position_buffer; |
159 | // We check if we can add this line |
160 | // disable the projection pushdown while reading the first line |
161 | // otherwise the first line parsing can be influenced by which columns we are reading |
162 | auto column_ids = std::move(reader_data.column_ids); |
163 | auto column_mapping = std::move(reader_data.column_mapping); |
164 | InitializeProjection(); |
165 | try { |
166 | successfully_read_first_line = TryParseSimpleCSV(insert_chunk&: first_line_chunk, error_message, try_add_line: true); |
167 | } catch (...) { |
168 | successfully_read_first_line = false; |
169 | } |
170 | // restore the projection pushdown |
171 | reader_data.column_ids = std::move(column_ids); |
172 | reader_data.column_mapping = std::move(column_mapping); |
173 | end_buffer = end_buffer_real; |
174 | start_buffer = position_set; |
175 | if (position_buffer >= end_buffer) { |
176 | if (successfully_read_first_line) { |
177 | position_buffer = position_set; |
178 | } |
179 | break; |
180 | } |
181 | position_buffer = position_set; |
182 | } |
183 | if (verification_positions.beginning_of_first_line == 0) { |
184 | verification_positions.beginning_of_first_line = position_buffer; |
185 | } |
186 | // Ensure that parse_chunk has no gunk when trying to figure new line |
187 | parse_chunk.Reset(); |
188 | |
189 | verification_positions.end_of_last_line = position_buffer; |
190 | finished = false; |
191 | return successfully_read_first_line; |
192 | } |
193 | |
194 | void ParallelCSVReader::SetBufferRead(unique_ptr<CSVBufferRead> buffer_read_p) { |
195 | if (!buffer_read_p->buffer) { |
196 | throw InternalException("ParallelCSVReader::SetBufferRead - CSVBufferRead does not have a buffer to read" ); |
197 | } |
198 | position_buffer = buffer_read_p->buffer_start; |
199 | start_buffer = buffer_read_p->buffer_start; |
200 | end_buffer = buffer_read_p->buffer_end; |
201 | if (buffer_read_p->next_buffer) { |
202 | buffer_size = buffer_read_p->buffer->GetBufferSize() + buffer_read_p->next_buffer->GetBufferSize(); |
203 | } else { |
204 | buffer_size = buffer_read_p->buffer->GetBufferSize(); |
205 | } |
206 | buffer = std::move(buffer_read_p); |
207 | |
208 | reached_remainder_state = false; |
209 | verification_positions.beginning_of_first_line = 0; |
210 | verification_positions.end_of_last_line = 0; |
211 | finished = false; |
212 | D_ASSERT(end_buffer <= buffer_size); |
213 | } |
214 | |
215 | VerificationPositions ParallelCSVReader::GetVerificationPositions() { |
216 | verification_positions.beginning_of_first_line += buffer->buffer->GetCSVGlobalStart(); |
217 | verification_positions.end_of_last_line += buffer->buffer->GetCSVGlobalStart(); |
218 | return verification_positions; |
219 | } |
220 | |
221 | // If BufferRemainder returns false, it means we are done scanning this buffer and should go to the end_state |
222 | bool ParallelCSVReader::BufferRemainder() { |
223 | if (position_buffer >= end_buffer && !reached_remainder_state) { |
224 | // First time we finish the buffer piece we should scan here, we set the variables |
225 | // to allow this piece to be scanned up to the end of the buffer or the next new line |
226 | reached_remainder_state = true; |
227 | // end_buffer is allowed to go to buffer size to finish its last line |
228 | end_buffer = buffer_size; |
229 | } |
230 | if (position_buffer >= end_buffer) { |
231 | // buffer ends, return false |
232 | return false; |
233 | } |
234 | // we can still scan stuff, return true |
235 | return true; |
236 | } |
237 | |
238 | void ParallelCSVReader::VerifyLineLength(idx_t line_size) { |
239 | if (line_size > options.maximum_line_size) { |
240 | throw InvalidInputException("Error in file \"%s\" on line %s: Maximum line size of %llu bytes exceeded!" , |
241 | options.file_path, |
242 | GetLineNumberStr(line_error: parse_chunk.size(), is_line_estimated: linenr_estimated, buffer_idx: buffer->batch_index).c_str(), |
243 | options.maximum_line_size); |
244 | } |
245 | } |
246 | |
247 | bool AllNewLine(string_t value, idx_t column_amount) { |
248 | auto value_str = value.GetString(); |
249 | if (value_str.empty() && column_amount == 1) { |
250 | // This is a one column (empty) |
251 | return false; |
252 | } |
253 | for (idx_t i = 0; i < value.GetSize(); i++) { |
254 | if (!StringUtil::CharacterIsNewline(c: value_str[i])) { |
255 | return false; |
256 | } |
257 | } |
258 | return true; |
259 | } |
260 | |
261 | bool ParallelCSVReader::TryParseSimpleCSV(DataChunk &insert_chunk, string &error_message, bool try_add_line) { |
262 | // If line is not set, we have to figure it out, we assume whatever is in the first line |
263 | if (options.new_line == NewLineIdentifier::NOT_SET) { |
264 | idx_t cur_pos = position_buffer; |
265 | // we can start in the middle of a new line, so move a bit forward. |
266 | while (cur_pos < end_buffer) { |
267 | if (StringUtil::CharacterIsNewline(c: (*buffer)[cur_pos])) { |
268 | cur_pos++; |
269 | } else { |
270 | break; |
271 | } |
272 | } |
273 | for (; cur_pos < end_buffer; cur_pos++) { |
274 | if (StringUtil::CharacterIsNewline(c: (*buffer)[cur_pos])) { |
275 | bool carriage_return = (*buffer)[cur_pos] == '\r'; |
276 | bool carriage_return_followed = false; |
277 | cur_pos++; |
278 | if (cur_pos < end_buffer) { |
279 | if (carriage_return && (*buffer)[cur_pos] == '\n') { |
280 | carriage_return_followed = true; |
281 | cur_pos++; |
282 | } |
283 | } |
284 | SetNewLineDelimiter(carry: carriage_return, carry_followed_by_nl: carriage_return_followed); |
285 | break; |
286 | } |
287 | } |
288 | } |
289 | // used for parsing algorithm |
290 | if (start_buffer == buffer_size) { |
291 | // Nothing to read |
292 | finished = true; |
293 | return true; |
294 | } |
295 | D_ASSERT(end_buffer <= buffer_size); |
296 | bool finished_chunk = false; |
297 | idx_t column = 0; |
298 | idx_t offset = 0; |
299 | bool has_quotes = false; |
300 | |
301 | vector<idx_t> escape_positions; |
302 | if ((start_buffer == buffer->buffer_start || start_buffer == buffer->buffer_end) && !try_add_line) { |
303 | // First time reading this buffer piece |
304 | if (!SetPosition()) { |
305 | finished = true; |
306 | return true; |
307 | } |
308 | } |
309 | if (position_buffer == buffer_size) { |
310 | // Nothing to read |
311 | finished = true; |
312 | return true; |
313 | } |
314 | // Keep track of line size |
315 | idx_t line_start = position_buffer; |
316 | // start parsing the first value |
317 | goto value_start; |
318 | |
319 | value_start : { |
320 | /* state: value_start */ |
321 | if (!BufferRemainder()) { |
322 | goto final_state; |
323 | } |
324 | offset = 0; |
325 | |
326 | // this state parses the first character of a value |
327 | if ((*buffer)[position_buffer] == options.quote[0]) { |
328 | // quote: actual value starts in the next position |
329 | // move to in_quotes state |
330 | start_buffer = position_buffer + 1; |
331 | goto in_quotes; |
332 | } else { |
333 | // no quote, move to normal parsing state |
334 | start_buffer = position_buffer; |
335 | goto normal; |
336 | } |
337 | }; |
338 | |
339 | normal : { |
340 | /* state: normal parsing state */ |
341 | // this state parses the remainder of a non-quoted value until we reach a delimiter or newline |
342 | for (; position_buffer < end_buffer; position_buffer++) { |
343 | auto c = (*buffer)[position_buffer]; |
344 | if (c == options.delimiter[0]) { |
345 | // delimiter: end the value and add it to the chunk |
346 | goto add_value; |
347 | } else if (c == options.quote[0] && try_add_line) { |
348 | return false; |
349 | } else if (StringUtil::CharacterIsNewline(c)) { |
350 | // newline: add row |
351 | if (column > 0 || try_add_line || parse_chunk.data.size() == 1) { |
352 | goto add_row; |
353 | } |
354 | if (column == 0 && position_buffer == start_buffer) { |
355 | start_buffer++; |
356 | } |
357 | } |
358 | } |
359 | if (!BufferRemainder()) { |
360 | goto final_state; |
361 | } else { |
362 | goto normal; |
363 | } |
364 | }; |
365 | |
366 | add_value : { |
367 | /* state: Add value to string vector */ |
368 | AddValue(str_val: buffer->GetValue(start_buffer, position_buffer, offset), column, escape_positions, has_quotes, |
369 | buffer_idx: buffer->local_batch_index); |
370 | // increase position by 1 and move start to the new position |
371 | offset = 0; |
372 | has_quotes = false; |
373 | start_buffer = ++position_buffer; |
374 | if (!BufferRemainder()) { |
375 | goto final_state; |
376 | } |
377 | goto value_start; |
378 | }; |
379 | |
380 | add_row : { |
381 | /* state: Add Row to Parse chunk */ |
382 | // check type of newline (\r or \n) |
383 | bool carriage_return = (*buffer)[position_buffer] == '\r'; |
384 | |
385 | AddValue(str_val: buffer->GetValue(start_buffer, position_buffer, offset), column, escape_positions, has_quotes, |
386 | buffer_idx: buffer->local_batch_index); |
387 | if (try_add_line) { |
388 | bool success = column == insert_chunk.ColumnCount(); |
389 | if (success) { |
390 | idx_t cur_linenr = linenr; |
391 | AddRow(insert_chunk, column, error_message, buffer_idx: buffer->local_batch_index); |
392 | success = Flush(insert_chunk, buffer_idx: buffer->local_batch_index, try_add_line: true); |
393 | linenr = cur_linenr; |
394 | } |
395 | reached_remainder_state = false; |
396 | parse_chunk.Reset(); |
397 | return success; |
398 | } else { |
399 | VerifyLineLength(line_size: position_buffer - line_start); |
400 | line_start = position_buffer; |
401 | finished_chunk = AddRow(insert_chunk, column, error_message, buffer_idx: buffer->local_batch_index); |
402 | } |
403 | // increase position by 1 and move start to the new position |
404 | offset = 0; |
405 | has_quotes = false; |
406 | position_buffer++; |
407 | start_buffer = position_buffer; |
408 | verification_positions.end_of_last_line = position_buffer; |
409 | if (carriage_return) { |
410 | // \r newline, go to special state that parses an optional \n afterwards |
411 | // optionally skips a newline (\n) character, which allows \r\n to be interpreted as a single line |
412 | if (!BufferRemainder()) { |
413 | goto final_state; |
414 | } |
415 | if ((*buffer)[position_buffer] == '\n') { |
416 | if (options.new_line == NewLineIdentifier::SINGLE) { |
417 | error_message = "Wrong NewLine Identifier. Expecting \\r\\n" ; |
418 | return false; |
419 | } |
420 | // newline after carriage return: skip |
421 | // increase position by 1 and move start to the new position |
422 | start_buffer = ++position_buffer; |
423 | |
424 | SkipEmptyLines(); |
425 | verification_positions.end_of_last_line = position_buffer; |
426 | start_buffer = position_buffer; |
427 | if (reached_remainder_state) { |
428 | goto final_state; |
429 | } |
430 | } else { |
431 | if (options.new_line == NewLineIdentifier::CARRY_ON) { |
432 | error_message = "Wrong NewLine Identifier. Expecting \\r or \\n" ; |
433 | return false; |
434 | } |
435 | } |
436 | if (!BufferRemainder()) { |
437 | goto final_state; |
438 | } |
439 | if (reached_remainder_state || finished_chunk) { |
440 | goto final_state; |
441 | } |
442 | goto value_start; |
443 | } else { |
444 | if (options.new_line == NewLineIdentifier::CARRY_ON) { |
445 | error_message = "Wrong NewLine Identifier. Expecting \\r or \\n" ; |
446 | return false; |
447 | } |
448 | if (reached_remainder_state) { |
449 | goto final_state; |
450 | } |
451 | if (!BufferRemainder()) { |
452 | goto final_state; |
453 | } |
454 | SkipEmptyLines(); |
455 | verification_positions.end_of_last_line = position_buffer; |
456 | start_buffer = position_buffer; |
457 | // \n newline, move to value start |
458 | if (finished_chunk) { |
459 | goto final_state; |
460 | } |
461 | goto value_start; |
462 | } |
463 | } |
464 | in_quotes: |
465 | /* state: in_quotes this state parses the remainder of a quoted value*/ |
466 | has_quotes = true; |
467 | position_buffer++; |
468 | for (; position_buffer < end_buffer; position_buffer++) { |
469 | auto c = (*buffer)[position_buffer]; |
470 | if (c == options.quote[0]) { |
471 | // quote: move to unquoted state |
472 | goto unquote; |
473 | } else if (c == options.escape[0]) { |
474 | // escape: store the escaped position and move to handle_escape state |
475 | escape_positions.push_back(x: position_buffer - start_buffer); |
476 | goto handle_escape; |
477 | } |
478 | } |
479 | if (!BufferRemainder()) { |
480 | if (buffer->buffer->IsCSVFileLastBuffer()) { |
481 | if (try_add_line) { |
482 | return false; |
483 | } |
484 | // still in quoted state at the end of the file or at the end of a buffer when running multithreaded, error: |
485 | throw InvalidInputException("Error in file \"%s\" on line %s: unterminated quotes. (%s)" , options.file_path, |
486 | GetLineNumberStr(line_error: linenr, is_line_estimated: linenr_estimated, buffer_idx: buffer->local_batch_index).c_str(), |
487 | options.ToString()); |
488 | } else { |
489 | goto final_state; |
490 | } |
491 | } else { |
492 | position_buffer--; |
493 | goto in_quotes; |
494 | } |
495 | |
496 | unquote : { |
497 | /* state: unquote: this state handles the state directly after we unquote*/ |
498 | // |
499 | // in this state we expect either another quote (entering the quoted state again, and escaping the quote) |
500 | // or a delimiter/newline, ending the current value and moving on to the next value |
501 | position_buffer++; |
502 | if (!BufferRemainder()) { |
503 | offset = 1; |
504 | goto final_state; |
505 | } |
506 | auto c = (*buffer)[position_buffer]; |
507 | if (c == options.quote[0] && (options.escape.empty() || options.escape[0] == options.quote[0])) { |
508 | // escaped quote, return to quoted state and store escape position |
509 | escape_positions.push_back(x: position_buffer - start_buffer); |
510 | goto in_quotes; |
511 | } else if (c == options.delimiter[0]) { |
512 | // delimiter, add value |
513 | offset = 1; |
514 | goto add_value; |
515 | } else if (StringUtil::CharacterIsNewline(c)) { |
516 | offset = 1; |
517 | // FIXME: should this be an assertion? |
518 | D_ASSERT(try_add_line || (!try_add_line && column == parse_chunk.ColumnCount() - 1)); |
519 | goto add_row; |
520 | } else if (position_buffer >= end_buffer) { |
521 | // reached end of buffer |
522 | offset = 1; |
523 | goto final_state; |
524 | } else { |
525 | error_message = StringUtil::Format( |
526 | fmt_str: "Error in file \"%s\" on line %s: quote should be followed by end of value, end of " |
527 | "row or another quote. (%s). " , |
528 | params: options.file_path, params: GetLineNumberStr(line_error: linenr, is_line_estimated: linenr_estimated, buffer_idx: buffer->local_batch_index).c_str(), |
529 | params: options.ToString()); |
530 | return false; |
531 | } |
532 | } |
533 | handle_escape : { |
534 | /* state: handle_escape */ |
535 | // escape should be followed by a quote or another escape character |
536 | position_buffer++; |
537 | if (!BufferRemainder()) { |
538 | goto final_state; |
539 | } |
540 | if (position_buffer >= buffer_size && buffer->buffer->IsCSVFileLastBuffer()) { |
541 | error_message = StringUtil::Format( |
542 | fmt_str: "Error in file \"%s\" on line %s: neither QUOTE nor ESCAPE is proceeded by ESCAPE. (%s)" , params: options.file_path, |
543 | params: GetLineNumberStr(line_error: linenr, is_line_estimated: linenr_estimated, buffer_idx: buffer->local_batch_index).c_str(), params: options.ToString()); |
544 | return false; |
545 | } |
546 | if ((*buffer)[position_buffer] != options.quote[0] && (*buffer)[position_buffer] != options.escape[0]) { |
547 | error_message = StringUtil::Format( |
548 | fmt_str: "Error in file \"%s\" on line %s: neither QUOTE nor ESCAPE is proceeded by ESCAPE. (%s)" , params: options.file_path, |
549 | params: GetLineNumberStr(line_error: linenr, is_line_estimated: linenr_estimated, buffer_idx: buffer->local_batch_index).c_str(), params: options.ToString()); |
550 | return false; |
551 | } |
552 | // escape was followed by quote or escape, go back to quoted state |
553 | goto in_quotes; |
554 | } |
555 | final_state : { |
556 | /* state: final_stage reached after we finished reading the end_buffer of the csv buffer */ |
557 | // reset end buffer |
558 | end_buffer = buffer->buffer_end; |
559 | if (position_buffer == end_buffer) { |
560 | reached_remainder_state = false; |
561 | } |
562 | if (finished_chunk) { |
563 | if (position_buffer >= end_buffer) { |
564 | if (position_buffer == end_buffer && StringUtil::CharacterIsNewline(c: (*buffer)[position_buffer - 1]) && |
565 | position_buffer < buffer_size) { |
566 | // last position is a new line, we still have to go through one more line of this buffer |
567 | finished = false; |
568 | } else { |
569 | finished = true; |
570 | } |
571 | } |
572 | buffer->lines_read += insert_chunk.size(); |
573 | return true; |
574 | } |
575 | // If this is the last buffer, we have to read the last value |
576 | if (buffer->buffer->IsCSVFileLastBuffer() || (buffer->next_buffer && buffer->next_buffer->IsCSVFileLastBuffer())) { |
577 | if (column > 0 || start_buffer != position_buffer || try_add_line || |
578 | (insert_chunk.data.size() == 1 && start_buffer != position_buffer)) { |
579 | // remaining values to be added to the chunk |
580 | auto str_value = buffer->GetValue(start_buffer, position_buffer, offset); |
581 | if (!AllNewLine(value: str_value, column_amount: insert_chunk.data.size()) || offset == 0) { |
582 | AddValue(str_val: str_value, column, escape_positions, has_quotes, buffer_idx: buffer->local_batch_index); |
583 | if (try_add_line) { |
584 | bool success = column == return_types.size(); |
585 | if (success) { |
586 | auto cur_linenr = linenr; |
587 | AddRow(insert_chunk, column, error_message, buffer_idx: buffer->local_batch_index); |
588 | success = Flush(insert_chunk, buffer_idx: buffer->local_batch_index); |
589 | linenr = cur_linenr; |
590 | } |
591 | parse_chunk.Reset(); |
592 | reached_remainder_state = false; |
593 | return success; |
594 | } else { |
595 | VerifyLineLength(line_size: position_buffer - line_start); |
596 | line_start = position_buffer; |
597 | AddRow(insert_chunk, column, error_message, buffer_idx: buffer->local_batch_index); |
598 | verification_positions.end_of_last_line = position_buffer; |
599 | } |
600 | } |
601 | } |
602 | } |
603 | // flush the parsed chunk and finalize parsing |
604 | if (mode == ParserMode::PARSING) { |
605 | Flush(insert_chunk, buffer_idx: buffer->local_batch_index); |
606 | buffer->lines_read += insert_chunk.size(); |
607 | } |
608 | if (position_buffer - verification_positions.end_of_last_line > options.buffer_size) { |
609 | error_message = "Line does not fit in one buffer. Increase the buffer size." ; |
610 | return false; |
611 | } |
612 | end_buffer = buffer_size; |
613 | SkipEmptyLines(); |
614 | end_buffer = buffer->buffer_end; |
615 | verification_positions.end_of_last_line = position_buffer; |
616 | if (position_buffer >= end_buffer) { |
617 | if (position_buffer >= end_buffer) { |
618 | if (position_buffer == end_buffer && StringUtil::CharacterIsNewline(c: (*buffer)[position_buffer - 1]) && |
619 | position_buffer < buffer_size) { |
620 | // last position is a new line, we still have to go through one more line of this buffer |
621 | finished = false; |
622 | } else { |
623 | finished = true; |
624 | } |
625 | } |
626 | } |
627 | return true; |
628 | }; |
629 | } |
630 | |
631 | void ParallelCSVReader::ParseCSV(DataChunk &insert_chunk) { |
632 | string error_message; |
633 | if (!TryParseCSV(mode: ParserMode::PARSING, insert_chunk, error_message)) { |
634 | throw InvalidInputException(error_message); |
635 | } |
636 | } |
637 | |
638 | idx_t ParallelCSVReader::GetLineError(idx_t line_error, idx_t buffer_idx) { |
639 | while (true) { |
640 | if (buffer->line_info->CanItGetLine(file_idx, batch_idx: buffer_idx)) { |
641 | auto cur_start = verification_positions.beginning_of_first_line + buffer->buffer->GetCSVGlobalStart(); |
642 | // line errors are 1-indexed |
643 | return buffer->line_info->GetLine(batch_idx: buffer_idx, line_error, file_idx, cur_start, verify: false); |
644 | } |
645 | } |
646 | } |
647 | |
648 | bool ParallelCSVReader::TryParseCSV(ParserMode mode) { |
649 | DataChunk dummy_chunk; |
650 | string error_message; |
651 | return TryParseCSV(mode, insert_chunk&: dummy_chunk, error_message); |
652 | } |
653 | |
654 | void ParallelCSVReader::ParseCSV(ParserMode mode) { |
655 | DataChunk dummy_chunk; |
656 | string error_message; |
657 | if (!TryParseCSV(mode, insert_chunk&: dummy_chunk, error_message)) { |
658 | throw InvalidInputException(error_message); |
659 | } |
660 | } |
661 | |
662 | bool ParallelCSVReader::TryParseCSV(ParserMode parser_mode, DataChunk &insert_chunk, string &error_message) { |
663 | mode = parser_mode; |
664 | return TryParseSimpleCSV(insert_chunk, error_message); |
665 | } |
666 | |
667 | } // namespace duckdb |
668 | |