#include "duckdb/function/table/read_csv.hpp"
#include "duckdb/function/function_set.hpp"
#include "duckdb/main/client_context.hpp"
#include "duckdb/main/database.hpp"
#include "duckdb/common/string_util.hpp"
#include "duckdb/common/enum_util.hpp"
#include "duckdb/common/union_by_name.hpp"
#include "duckdb/main/config.hpp"
#include "duckdb/parser/expression/constant_expression.hpp"
#include "duckdb/parser/expression/function_expression.hpp"
#include "duckdb/parser/tableref/table_function_ref.hpp"
#include "duckdb/planner/operator/logical_get.hpp"
#include "duckdb/main/extension_helper.hpp"
#include "duckdb/common/multi_file_reader.hpp"
#include "duckdb/main/client_data.hpp"
#include "duckdb/execution/operator/persistent/csv_line_info.hpp"
#include <limits>

namespace duckdb {

unique_ptr<CSVFileHandle> ReadCSV::OpenCSV(const string &file_path, FileCompressionType compression,
                                           ClientContext &context) {
	auto &fs = FileSystem::GetFileSystem(context);
	auto &allocator = BufferAllocator::Get(context);
	return CSVFileHandle::OpenFile(fs, allocator, file_path, compression, false);
}

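//! Finalizes the bind data: decides whether this CSV scan can run in parallel or must be single-threaded.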
void ReadCSVData::FinalizeRead(ClientContext &context) {
	BaseCSVData::Finalize();
	// Here we identify whether we can run this CSV file in parallel or not.
	bool null_or_empty = options.delimiter.empty() || options.escape.empty() || options.quote.empty() ||
	                     options.delimiter[0] == '\0' || options.escape[0] == '\0' || options.quote[0] == '\0';
	bool complex_options = options.delimiter.size() > 1 || options.escape.size() > 1 || options.quote.size() > 1;
	bool not_supported_options = options.null_padding;

	auto number_of_threads = TaskScheduler::GetScheduler(context).NumberOfThreads();
	if (options.parallel_mode != ParallelMode::PARALLEL && int64_t(files.size() * 2) >= number_of_threads) {
		single_threaded = true;
	}
	if (options.parallel_mode == ParallelMode::SINGLE_THREADED || null_or_empty || not_supported_options ||
	    complex_options || options.new_line == NewLineIdentifier::MIX) {
		// not supported for parallel CSV reading
		single_threaded = true;
	}
}

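//! Returns the specificity weight used to order auto_type_candidates; throws for types that are not accepted.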
uint8_t GetCandidateSpecificity(const LogicalType &candidate_type) {
	//! Constant map of accepted auto_type candidates and their specificity weights
	const duckdb::unordered_map<uint8_t, uint8_t> auto_type_candidates_specificity {
	    {(uint8_t)LogicalTypeId::VARCHAR, 0},  {(uint8_t)LogicalTypeId::TIMESTAMP, 1},
	    {(uint8_t)LogicalTypeId::DATE, 2},     {(uint8_t)LogicalTypeId::TIME, 3},
	    {(uint8_t)LogicalTypeId::DOUBLE, 4},   {(uint8_t)LogicalTypeId::FLOAT, 5},
	    {(uint8_t)LogicalTypeId::BIGINT, 6},   {(uint8_t)LogicalTypeId::INTEGER, 7},
	    {(uint8_t)LogicalTypeId::SMALLINT, 8}, {(uint8_t)LogicalTypeId::TINYINT, 9},
	    {(uint8_t)LogicalTypeId::BOOLEAN, 10}, {(uint8_t)LogicalTypeId::SQLNULL, 11}};

	auto id = (uint8_t)candidate_type.id();
	auto it = auto_type_candidates_specificity.find(id);
	if (it == auto_type_candidates_specificity.end()) {
		throw BinderException("Auto Type Candidate of type %s is not accepted as a valid input",
		                      EnumUtil::ToString(candidate_type.id()));
	}
	return it->second;
}

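//! Bind function for read_csv: parses the named parameters, optionally auto-detects the schema with an initial
//! reader, and sets up the multi-file reader bind data.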
static unique_ptr<FunctionData> ReadCSVBind(ClientContext &context, TableFunctionBindInput &input,
                                            vector<LogicalType> &return_types, vector<string> &names) {
	auto result = make_uniq<ReadCSVData>();
	auto &options = result->options;
	result->files = MultiFileReader::GetFileList(context, input.inputs[0], "CSV");

	bool explicitly_set_columns = false;
	for (auto &kv : input.named_parameters) {
		if (MultiFileReader::ParseOption(kv.first, kv.second, options.file_options)) {
			continue;
		}
		auto loption = StringUtil::Lower(kv.first);
		if (loption == "columns") {
			explicitly_set_columns = true;
			auto &child_type = kv.second.type();
			if (child_type.id() != LogicalTypeId::STRUCT) {
				throw BinderException("read_csv columns requires a struct as input");
			}
			auto &struct_children = StructValue::GetChildren(kv.second);
			D_ASSERT(StructType::GetChildCount(child_type) == struct_children.size());
			for (idx_t i = 0; i < struct_children.size(); i++) {
				auto &name = StructType::GetChildName(child_type, i);
				auto &val = struct_children[i];
				names.push_back(name);
				if (val.type().id() != LogicalTypeId::VARCHAR) {
					throw BinderException("read_csv requires a type specification as string");
				}
				return_types.emplace_back(TransformStringToLogicalType(StringValue::Get(val), context));
			}
			if (names.empty()) {
				throw BinderException("read_csv requires at least a single column as input!");
			}
		} else if (loption == "auto_type_candidates") {
			options.auto_type_candidates.clear();
			map<uint8_t, LogicalType> candidate_types;
			// We always have the extremes of Null and Varchar, so we can default to varchar if the
			// sniffer is not able to confidently detect that column type
			candidate_types[GetCandidateSpecificity(LogicalType::VARCHAR)] = LogicalType::VARCHAR;
			candidate_types[GetCandidateSpecificity(LogicalType::SQLNULL)] = LogicalType::SQLNULL;

			auto &child_type = kv.second.type();
			if (child_type.id() != LogicalTypeId::LIST) {
				throw BinderException("read_csv auto_types requires a list as input");
			}
			auto &list_children = ListValue::GetChildren(kv.second);
			if (list_children.empty()) {
				throw BinderException("auto_type_candidates requires at least one type");
			}
			for (auto &child : list_children) {
				if (child.type().id() != LogicalTypeId::VARCHAR) {
					throw BinderException("auto_type_candidates requires a type specification as string");
				}
				auto candidate_type = TransformStringToLogicalType(StringValue::Get(child), context);
				candidate_types[GetCandidateSpecificity(candidate_type)] = candidate_type;
			}
			for (auto &candidate_type : candidate_types) {
				options.auto_type_candidates.emplace_back(candidate_type.second);
			}
		} else if (loption == "column_names" || loption == "names") {
			if (!options.name_list.empty()) {
				throw BinderException("read_csv_auto column_names/names can only be supplied once");
			}
			if (kv.second.IsNull()) {
				throw BinderException("read_csv_auto %s cannot be NULL", kv.first);
			}
			auto &children = ListValue::GetChildren(kv.second);
			for (auto &child : children) {
				options.name_list.push_back(StringValue::Get(child));
			}
		} else if (loption == "column_types" || loption == "types" || loption == "dtypes") {
			auto &child_type = kv.second.type();
			if (child_type.id() != LogicalTypeId::STRUCT && child_type.id() != LogicalTypeId::LIST) {
				throw BinderException("read_csv_auto %s requires a struct or list as input", kv.first);
			}
			if (!options.sql_type_list.empty()) {
				throw BinderException("read_csv_auto column_types/types/dtypes can only be supplied once");
			}
			vector<string> sql_type_names;
			if (child_type.id() == LogicalTypeId::STRUCT) {
				auto &struct_children = StructValue::GetChildren(kv.second);
				D_ASSERT(StructType::GetChildCount(child_type) == struct_children.size());
				for (idx_t i = 0; i < struct_children.size(); i++) {
					auto &name = StructType::GetChildName(child_type, i);
					auto &val = struct_children[i];
					if (val.type().id() != LogicalTypeId::VARCHAR) {
						throw BinderException("read_csv_auto %s requires a type specification as string", kv.first);
					}
					sql_type_names.push_back(StringValue::Get(val));
					options.sql_types_per_column[name] = i;
				}
			} else {
				auto &list_child = ListType::GetChildType(child_type);
				if (list_child.id() != LogicalTypeId::VARCHAR) {
					throw BinderException("read_csv_auto %s requires a list of types (varchar) as input", kv.first);
				}
				auto &children = ListValue::GetChildren(kv.second);
				for (auto &child : children) {
					sql_type_names.push_back(StringValue::Get(child));
				}
			}
			options.sql_type_list.reserve(sql_type_names.size());
			for (auto &sql_type : sql_type_names) {
				auto def_type = TransformStringToLogicalType(sql_type);
				if (def_type.id() == LogicalTypeId::USER) {
					throw BinderException("Unrecognized type \"%s\" for read_csv_auto %s definition", sql_type,
					                      kv.first);
				}
				options.sql_type_list.push_back(std::move(def_type));
			}
		} else if (loption == "all_varchar") {
			options.all_varchar = BooleanValue::Get(kv.second);
		} else if (loption == "normalize_names") {
			options.normalize_names = BooleanValue::Get(kv.second);
		} else {
			options.SetReadOption(loption, kv.second, names);
		}
	}
	if (options.file_options.auto_detect_hive_partitioning) {
		options.file_options.hive_partitioning = MultiFileReaderOptions::AutoDetectHivePartitioning(result->files);
	}

	if (!options.auto_detect && return_types.empty()) {
		throw BinderException("read_csv requires columns to be specified through the 'columns' option. Use "
		                      "read_csv_auto or set read_csv(..., "
		                      "AUTO_DETECT=TRUE) to automatically guess columns.");
	}
	if (options.auto_detect) {
		options.file_path = result->files[0];
		auto initial_reader = make_uniq<BufferedCSVReader>(context, options);
		return_types.assign(initial_reader->return_types.begin(), initial_reader->return_types.end());
		if (names.empty()) {
			names.assign(initial_reader->names.begin(), initial_reader->names.end());
		} else {
			if (explicitly_set_columns) {
				// The user has influenced the names, can't assume they are valid anymore
				if (return_types.size() != names.size()) {
					throw BinderException("The amount of names specified (%d) and the observed amount of types (%d) in "
					                      "the file don't match",
					                      names.size(), return_types.size());
				}
			} else {
				D_ASSERT(return_types.size() == names.size());
			}
			initial_reader->names = names;
		}
		options = initial_reader->options;
		result->initial_reader = std::move(initial_reader);
	} else {
		D_ASSERT(return_types.size() == names.size());
	}
	result->csv_types = return_types;
	result->csv_names = names;

	if (options.file_options.union_by_name) {
		result->reader_bind =
		    MultiFileReader::BindUnionReader<BufferedCSVReader>(context, return_types, names, *result, options);
		if (result->union_readers.size() > 1) {
			result->column_info.emplace_back(result->csv_names, result->csv_types);
			for (idx_t i = 1; i < result->union_readers.size(); i++) {
				result->column_info.emplace_back(result->union_readers[i]->names,
				                                 result->union_readers[i]->return_types);
			}
		}
		if (!options.sql_types_per_column.empty()) {
			auto exception = BufferedCSVReader::ColumnTypesError(options.sql_types_per_column, names);
			if (!exception.empty()) {
				throw BinderException(exception);
			}
		}
	} else {
		result->reader_bind = MultiFileReader::BindOptions(options.file_options, result->files, return_types, names);
	}
	result->return_types = return_types;
	result->return_names = names;
	result->FinalizeRead(context);
	return std::move(result);
}

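//! Bind function for read_csv_auto: forces auto_detect=true and delegates to ReadCSVBind.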
static unique_ptr<FunctionData> ReadCSVAutoBind(ClientContext &context, TableFunctionBindInput &input,
                                                vector<LogicalType> &return_types, vector<string> &names) {
	input.named_parameters["auto_detect"] = Value::BOOLEAN(true);
	return ReadCSVBind(context, input, return_types, names);
}

//===--------------------------------------------------------------------===//
// Parallel CSV Reader CSV Global State
//===--------------------------------------------------------------------===//

struct ParallelCSVGlobalState : public GlobalTableFunctionState {
public:
	ParallelCSVGlobalState(ClientContext &context, unique_ptr<CSVFileHandle> file_handle_p,
	                       const vector<string> &files_path_p, idx_t system_threads_p, idx_t buffer_size_p,
	                       idx_t rows_to_skip, bool force_parallelism_p, vector<column_t> column_ids_p, bool has_header)
	    : file_handle(std::move(file_handle_p)), system_threads(system_threads_p), buffer_size(buffer_size_p),
	      force_parallelism(force_parallelism_p), column_ids(std::move(column_ids_p)),
	      line_info(main_mutex, batch_to_tuple_end, tuple_start, tuple_end) {
		file_handle->DisableReset();
		current_file_path = files_path_p[0];
		line_info.lines_read[0] = rows_to_skip;
		if (has_header) {
			line_info.lines_read[0]++;
		}
		file_size = file_handle->FileSize();
		first_file_size = file_size;
		on_disk_file = file_handle->OnDiskFile();
		bytes_read = 0;
		if (buffer_size < file_size || file_size == 0) {
			bytes_per_local_state = buffer_size / ParallelCSVGlobalState::MaxThreads();
		} else {
			bytes_per_local_state = file_size / MaxThreads();
		}
		if (bytes_per_local_state == 0) {
			// In practice, I think this won't happen, it only happens because we are mocking up test scenarios
			// this needs to be at least one.
			bytes_per_local_state = 1;
		}
		for (idx_t i = 0; i < rows_to_skip; i++) {
			file_handle->ReadLine();
		}
		first_position = current_csv_position;
		current_buffer = make_shared<CSVBuffer>(context, buffer_size, *file_handle, current_csv_position, file_number);
		next_buffer = shared_ptr<CSVBuffer>(
		    current_buffer->Next(*file_handle, buffer_size, current_csv_position, file_number).release());
		running_threads = MaxThreads();

		// Initialize all the book-keeping variables
		auto file_count = files_path_p.size();
		line_info.current_batches.resize(file_count);
		tuple_start.resize(file_count);
		tuple_end.resize(file_count);
		tuple_end_to_batch.resize(file_count);
		batch_to_tuple_end.resize(file_count);
	}
	ParallelCSVGlobalState() : line_info(main_mutex, batch_to_tuple_end, tuple_start, tuple_end) {
		running_threads = MaxThreads();
	}

	~ParallelCSVGlobalState() override {
	}

	//! How many bytes were read up to this point
	atomic<idx_t> bytes_read;
	//! Size of current file
	idx_t file_size;

public:
	idx_t MaxThreads() const override;
	//! Updates the CSV reader with the next buffer to read. Returns false if no more buffers are available.
	bool Next(ClientContext &context, const ReadCSVData &bind_data, unique_ptr<ParallelCSVReader> &reader);
	//! Verify if the CSV File was read correctly
	void Verify();

	void UpdateVerification(VerificationPositions positions, idx_t file_number, idx_t batch_idx);

	void UpdateLinesRead(CSVBufferRead &buffer_read, idx_t file_idx);

	void IncrementThread();

	void DecrementThread();

	bool Finished();

	double GetProgress(const ReadCSVData &bind_data) const {
		idx_t total_files = bind_data.files.size();

		// get the progress WITHIN the current file
		double progress;
		if (file_size == 0) {
			progress = 1.0;
		} else {
			progress = double(bytes_read) / double(file_size);
		}
		// now get the total percentage of files read
		double percentage = double(file_index - 1) / total_files;
		percentage += (double(1) / double(total_files)) * progress;
		return percentage * 100;
	}

private:
	//! File Handle for current file
	unique_ptr<CSVFileHandle> file_handle;
	shared_ptr<CSVBuffer> current_buffer;
	shared_ptr<CSVBuffer> next_buffer;

	//! The index of the next file to read (i.e. current file + 1)
	idx_t file_index = 1;
	string current_file_path;

	//! Mutex to lock when getting next batch of bytes (Parallel Only)
	mutex main_mutex;
	//! Byte position in the current buffer where the next thread will start reading
	idx_t next_byte = 0;
	//! How many bytes we should execute per local state
	idx_t bytes_per_local_state;
	//! Size of first file
	idx_t first_file_size;
	//! Whether or not this is an on-disk file
	bool on_disk_file = true;
	//! Maximum number of threads in DuckDB
	idx_t system_threads;
	//! Size of the buffers
	idx_t buffer_size;
	//! Current batch index
	idx_t batch_index = 0;
	idx_t local_batch_index = 0;

	//! Forces parallelism for small CSV Files, should only be used for testing.
	bool force_parallelism = false;
	//! Current (Global) position of CSV
	idx_t current_csv_position = 0;
	//! First Position of First Buffer
	idx_t first_position = 0;
	//! Current File Number
	idx_t file_number = 0;
	idx_t max_tuple_end = 0;
	//! The vector stores positions where threads ended the last line they read in the CSV File, and the set stores
	//! positions where they started reading the first line.
	vector<vector<idx_t>> tuple_end;
	vector<set<idx_t>> tuple_start;
	//! Tuple end to batch
	vector<unordered_map<idx_t, idx_t>> tuple_end_to_batch;
	//! Batch to Tuple End
	vector<unordered_map<idx_t, idx_t>> batch_to_tuple_end;
	idx_t running_threads = 0;
	//! The column ids to read
	vector<column_t> column_ids;
	//! Line Info used in error messages
	LineInfo line_info;
};

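//! Estimates the maximum number of usable threads: roughly one thread per MB of input,
//! capped at the number of system threads.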
idx_t ParallelCSVGlobalState::MaxThreads() const {
	if (force_parallelism || !on_disk_file) {
		return system_threads;
	}
	idx_t one_mb = 1000000; // We initialize at most one thread per MB
	idx_t threads_per_mb = first_file_size / one_mb + 1;
	if (threads_per_mb < system_threads || threads_per_mb == 1) {
		return threads_per_mb;
	}

	return system_threads;
}

void ParallelCSVGlobalState::IncrementThread() {
	lock_guard<mutex> parallel_lock(main_mutex);
	running_threads++;
}

void ParallelCSVGlobalState::DecrementThread() {
	lock_guard<mutex> parallel_lock(main_mutex);
	D_ASSERT(running_threads > 0);
	running_threads--;
}

bool ParallelCSVGlobalState::Finished() {
	lock_guard<mutex> parallel_lock(main_mutex);
	return running_threads == 0;
}

void ParallelCSVGlobalState::Verify() {
	// All threads are done, so we run the verification code to check that the file was read correctly
	lock_guard<mutex> parallel_lock(main_mutex);
	if (running_threads == 0) {
		D_ASSERT(tuple_end.size() == tuple_start.size());
		for (idx_t i = 0; i < tuple_start.size(); i++) {
			auto &current_tuple_end = tuple_end[i];
			auto &current_tuple_start = tuple_start[i];
			// figure out max value of last_pos
			if (current_tuple_end.empty()) {
				return;
			}
			auto max_value = *max_element(std::begin(current_tuple_end), std::end(current_tuple_end));
			for (idx_t tpl_idx = 0; tpl_idx < current_tuple_end.size(); tpl_idx++) {
				auto last_pos = current_tuple_end[tpl_idx];
				auto first_pos = current_tuple_start.find(last_pos);
				if (first_pos == current_tuple_start.end()) {
					// this might be necessary due to carriage returns outside buffer scopes.
					first_pos = current_tuple_start.find(last_pos + 1);
				}
				if (first_pos == current_tuple_start.end() && last_pos != max_value) {
					auto batch_idx = tuple_end_to_batch[i][last_pos];
					auto problematic_line = line_info.GetLine(batch_idx);
					throw InvalidInputException(
					    "CSV File not supported for multithreading. This can be a problematic line in your CSV File or "
					    "that this CSV can't be read in Parallel. Please, inspect if the line %llu is correct. If so, "
					    "please run single-threaded CSV Reading by setting parallel=false in the read_csv call.",
					    problematic_line);
				}
			}
		}
	}
}

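//! Verifies that the batches processed so far line up: every batch must end exactly where the next one starts.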
void LineInfo::Verify(idx_t file_idx, idx_t batch_idx, idx_t cur_first_pos) {
	auto &tuple_start_set = tuple_start[file_idx];
	auto &processed_batches = batch_to_tuple_end[file_idx];
	auto &tuple_end_vec = tuple_end[file_idx];
	bool has_error = false;
	idx_t problematic_line;
	if (batch_idx == 0 || tuple_start_set.empty()) {
		return;
	}
	for (idx_t cur_batch = 0; cur_batch < batch_idx - 1; cur_batch++) {
		auto cur_end = tuple_end_vec[processed_batches[cur_batch]];
		auto first_pos = tuple_start_set.find(cur_end);
		if (first_pos == tuple_start_set.end()) {
			has_error = true;
			problematic_line = GetLine(cur_batch);
			break;
		}
	}
	if (!has_error) {
		auto cur_end = tuple_end_vec[processed_batches[batch_idx - 1]];
		if (cur_end != cur_first_pos) {
			has_error = true;
			problematic_line = GetLine(batch_idx);
		}
	}
	if (has_error) {
		throw InvalidInputException(
		    "CSV File not supported for multithreading. This can be a problematic line in your CSV File or "
		    "that this CSV can't be read in Parallel. Please, inspect if the line %llu is correct. If so, "
		    "please run single-threaded CSV Reading by setting parallel=false in the read_csv call.",
		    problematic_line);
	}
}

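//! Hands out the next chunk of buffer bytes to a reader and advances to the next file when the current one is
//! exhausted; returns false once all files are done.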
bool ParallelCSVGlobalState::Next(ClientContext &context, const ReadCSVData &bind_data,
                                  unique_ptr<ParallelCSVReader> &reader) {
	lock_guard<mutex> parallel_lock(main_mutex);
	if (!current_buffer) {
		// This means we are done with the current file, we need to go to the next one (if it exists).
		if (file_index < bind_data.files.size()) {
			current_file_path = bind_data.files[file_index++];
			file_handle = ReadCSV::OpenCSV(current_file_path, bind_data.options.compression, context);
			current_csv_position = 0;
			file_number++;
			local_batch_index = 0;
			current_buffer =
			    make_shared<CSVBuffer>(context, buffer_size, *file_handle, current_csv_position, file_number);
			next_buffer = shared_ptr<CSVBuffer>(
			    current_buffer->Next(*file_handle, buffer_size, current_csv_position, file_number).release());
		} else {
			// We are done scanning.
			reader.reset();
			return false;
		}
	}
	// set up the current buffer
	line_info.current_batches.back().insert(local_batch_index);
	auto result = make_uniq<CSVBufferRead>(current_buffer, next_buffer, next_byte, next_byte + bytes_per_local_state,
	                                       batch_index++, local_batch_index++, &line_info);
	// advance the byte index within the current buffer to the start of the next chunk
	next_byte += bytes_per_local_state;
	if (next_byte >= current_buffer->GetBufferSize()) {
		// We replace the current buffer with the next buffer
		next_byte = 0;
		bytes_read += current_buffer->GetBufferSize();
		current_buffer = next_buffer;
		if (next_buffer) {
			// Next buffer gets the next-next buffer
			next_buffer = shared_ptr<CSVBuffer>(
			    next_buffer->Next(*file_handle, buffer_size, current_csv_position, file_number).release());
		}
	}
	if (!reader || reader->options.file_path != current_file_path) {
		// we either don't have a reader, or the reader was created for a different file
		// we need to create a new reader and instantiate it
		if (file_index > 0 && file_index <= bind_data.union_readers.size() && bind_data.union_readers[file_index - 1]) {
			// we are doing UNION BY NAME - fetch the options from the union reader for this file
			auto &union_reader = *bind_data.union_readers[file_index - 1];
			reader = make_uniq<ParallelCSVReader>(context, union_reader.options, std::move(result), first_position,
			                                      union_reader.GetTypes(), file_index - 1);
			reader->names = union_reader.GetNames();
		} else if (file_index <= bind_data.column_info.size()) {
			// Serialized Union By Name
			reader = make_uniq<ParallelCSVReader>(context, bind_data.options, std::move(result), first_position,
			                                      bind_data.column_info[file_index - 1].types, file_index - 1);
			reader->names = bind_data.column_info[file_index - 1].names;
		} else {
			// regular file - use the standard options
			reader = make_uniq<ParallelCSVReader>(context, bind_data.options, std::move(result), first_position,
			                                      bind_data.csv_types, file_index - 1);
			reader->names = bind_data.csv_names;
		}
		reader->options.file_path = current_file_path;
		MultiFileReader::InitializeReader(*reader, bind_data.options.file_options, bind_data.reader_bind,
		                                  bind_data.return_types, bind_data.return_names, column_ids, nullptr,
		                                  bind_data.files.front());
	} else {
		// update the current reader
		reader->SetBufferRead(std::move(result));
	}
	return true;
}

void ParallelCSVGlobalState::UpdateVerification(VerificationPositions positions, idx_t file_number_p, idx_t batch_idx) {
	lock_guard<mutex> parallel_lock(main_mutex);
	if (positions.end_of_last_line > max_tuple_end) {
		max_tuple_end = positions.end_of_last_line;
	}
	tuple_end_to_batch[file_number_p][positions.end_of_last_line] = batch_idx;
	batch_to_tuple_end[file_number_p][batch_idx] = tuple_end[file_number_p].size();
	tuple_start[file_number_p].insert(positions.beginning_of_first_line);
	tuple_end[file_number_p].push_back(positions.end_of_last_line);
}

void ParallelCSVGlobalState::UpdateLinesRead(CSVBufferRead &buffer_read, idx_t file_idx) {
	auto batch_idx = buffer_read.local_batch_index;
	auto lines_read = buffer_read.lines_read;
	lock_guard<mutex> parallel_lock(main_mutex);
	line_info.current_batches[file_idx].erase(batch_idx);
	line_info.lines_read[batch_idx] += lines_read;
}

bool LineInfo::CanItGetLine(idx_t file_idx, idx_t batch_idx) {
	lock_guard<mutex> parallel_lock(main_mutex);
	if (current_batches.empty() || done) {
		return true;
	}
	if (file_idx >= current_batches.size() || current_batches[file_idx].empty()) {
		return true;
	}
	auto min_value = *current_batches[file_idx].begin();
	if (min_value >= batch_idx) {
		return true;
	}
	return false;
}

// Returns the 1-indexed line number
idx_t LineInfo::GetLine(idx_t batch_idx, idx_t line_error, idx_t file_idx, idx_t cur_start, bool verify) {
	unique_ptr<lock_guard<mutex>> parallel_lock;
	if (!verify) {
		parallel_lock = duckdb::make_uniq<lock_guard<mutex>>(main_mutex);
	}
	idx_t line_count = 0;
	if (done) {
		// line count is 0-indexed, but we want to return 1-indexed
		return first_line + 1;
	}
	for (idx_t i = 0; i <= batch_idx; i++) {
		if (lines_read.find(i) == lines_read.end() && i != batch_idx) {
			throw InternalException("Missing batch index on Parallel CSV Reader GetLine");
		}
		line_count += lines_read[i];
	}

	// before we are done, if this is not a call in Verify() we must check Verify up to this batch
	if (!verify) {
		Verify(file_idx, batch_idx, cur_start);
	}
	done = true;
	first_line = line_count + line_error;
	// line count is 0-indexed, but we want to return 1-indexed
	return first_line + 1;
}

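//! Initializes the global state of the parallel CSV scan, reusing the file handle of the initial (sniffing)
//! reader when one is available.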
static unique_ptr<GlobalTableFunctionState> ParallelCSVInitGlobal(ClientContext &context,
                                                                  TableFunctionInitInput &input) {
	auto &bind_data = input.bind_data->CastNoConst<ReadCSVData>();
	if (bind_data.files.empty()) {
		// This can happen when a filename based filter pushdown has eliminated all possible files for this scan.
		return make_uniq<ParallelCSVGlobalState>();
	}
	unique_ptr<CSVFileHandle> file_handle;

	bind_data.options.file_path = bind_data.files[0];

	if (bind_data.initial_reader) {
		file_handle = std::move(bind_data.initial_reader->file_handle);
		file_handle->Reset();
		file_handle->DisableReset();
		bind_data.initial_reader.reset();
	} else {
		file_handle = ReadCSV::OpenCSV(bind_data.options.file_path, bind_data.options.compression, context);
	}
	return make_uniq<ParallelCSVGlobalState>(
	    context, std::move(file_handle), bind_data.files, context.db->NumberOfThreads(), bind_data.options.buffer_size,
	    bind_data.options.skip_rows, ClientConfig::GetConfig(context).verify_parallelism, input.column_ids,
	    bind_data.options.header && bind_data.options.has_header);
}

//===--------------------------------------------------------------------===//
// Read CSV Local State
//===--------------------------------------------------------------------===//
struct ParallelCSVLocalState : public LocalTableFunctionState {
public:
	explicit ParallelCSVLocalState(unique_ptr<ParallelCSVReader> csv_reader_p) : csv_reader(std::move(csv_reader_p)) {
	}

	//! The CSV reader
	unique_ptr<ParallelCSVReader> csv_reader;
	CSVBufferRead previous_buffer;
	bool done = false;
};

unique_ptr<LocalTableFunctionState> ParallelReadCSVInitLocal(ExecutionContext &context, TableFunctionInitInput &input,
                                                             GlobalTableFunctionState *global_state_p) {
	auto &csv_data = input.bind_data->Cast<ReadCSVData>();
	auto &global_state = global_state_p->Cast<ParallelCSVGlobalState>();
	unique_ptr<ParallelCSVReader> csv_reader;
	auto has_next = global_state.Next(context.client, csv_data, csv_reader);
	if (!has_next) {
		global_state.DecrementThread();
		csv_reader.reset();
	}
	return make_uniq<ParallelCSVLocalState>(std::move(csv_reader));
}

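//! Scan function of the parallel CSV reader: keeps parsing and fetching new buffer chunks until an output chunk
//! is produced or the scan is finished.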
static void ParallelReadCSVFunction(ClientContext &context, TableFunctionInput &data_p, DataChunk &output) {
	auto &bind_data = data_p.bind_data->Cast<ReadCSVData>();
	auto &csv_global_state = data_p.global_state->Cast<ParallelCSVGlobalState>();
	auto &csv_local_state = data_p.local_state->Cast<ParallelCSVLocalState>();

	if (!csv_local_state.csv_reader) {
		// no csv_reader was set, this can happen when a filename-based filter has filtered out all possible files
		return;
	}

	do {
		if (output.size() != 0) {
			MultiFileReader::FinalizeChunk(bind_data.reader_bind, csv_local_state.csv_reader->reader_data, output);
			break;
		}
		if (csv_local_state.csv_reader->finished) {
			auto verification_updates = csv_local_state.csv_reader->GetVerificationPositions();
			csv_global_state.UpdateVerification(verification_updates,
			                                    csv_local_state.csv_reader->buffer->buffer->GetFileNumber(),
			                                    csv_local_state.csv_reader->buffer->local_batch_index);
			csv_global_state.UpdateLinesRead(*csv_local_state.csv_reader->buffer, csv_local_state.csv_reader->file_idx);
			auto has_next = csv_global_state.Next(context, bind_data, csv_local_state.csv_reader);
			if (csv_local_state.csv_reader) {
				csv_local_state.csv_reader->linenr = 0;
			}
			if (!has_next) {
				csv_global_state.DecrementThread();
				break;
			}
		}
		csv_local_state.csv_reader->ParseCSV(output);

	} while (true);
	if (csv_global_state.Finished()) {
		csv_global_state.Verify();
	}
}

//===--------------------------------------------------------------------===//
// Single-Threaded CSV Reader
//===--------------------------------------------------------------------===//
struct SingleThreadedCSVState : public GlobalTableFunctionState {
	explicit SingleThreadedCSVState(idx_t total_files) : total_files(total_files), next_file(0), progress_in_files(0) {
	}

	mutex csv_lock;
	unique_ptr<BufferedCSVReader> initial_reader;
	//! The total number of files to read from
	idx_t total_files;
	//! The index of the next file to read (i.e. current file + 1)
	atomic<idx_t> next_file;
	//! How far along we are in reading the current set of open files
	//! This goes from [0...next_file] * 100
	atomic<idx_t> progress_in_files;
	//! The set of SQL types
	vector<LogicalType> csv_types;
	//! The set of SQL names to be read from the file
	vector<string> csv_names;
	//! The column ids to read
	vector<column_t> column_ids;

	idx_t MaxThreads() const override {
		return total_files;
	}

	double GetProgress(const ReadCSVData &bind_data) const {
		D_ASSERT(total_files == bind_data.files.size());
		D_ASSERT(progress_in_files <= total_files * 100);
		return (double(progress_in_files) / double(total_files));
	}

	unique_ptr<BufferedCSVReader> GetCSVReader(ClientContext &context, ReadCSVData &bind_data, idx_t &file_index,
	                                           idx_t &total_size) {
		auto reader = GetCSVReaderInternal(context, bind_data, file_index, total_size);
		if (reader) {
			reader->file_handle->DisableReset();
		}
		return reader;
	}

private:
	unique_ptr<BufferedCSVReader> GetCSVReaderInternal(ClientContext &context, ReadCSVData &bind_data,
	                                                   idx_t &file_index, idx_t &total_size) {
		BufferedCSVReaderOptions options;
		{
			lock_guard<mutex> l(csv_lock);
			if (initial_reader) {
				total_size = initial_reader->file_handle ? initial_reader->file_handle->FileSize() : 0;
				return std::move(initial_reader);
			}
			if (next_file >= total_files) {
				return nullptr;
			}
			options = bind_data.options;
			file_index = next_file;
			next_file++;
		}
		// reuse the csv_readers that were created during binding
		unique_ptr<BufferedCSVReader> result;
		if (file_index < bind_data.union_readers.size() && bind_data.union_readers[file_index]) {
			result = std::move(bind_data.union_readers[file_index]);
		} else {
			auto union_by_name = options.file_options.union_by_name;
			options.file_path = bind_data.files[file_index];
			result = make_uniq<BufferedCSVReader>(context, std::move(options), csv_types);
			if (!union_by_name) {
				result->names = csv_names;
			}
			MultiFileReader::InitializeReader(*result, bind_data.options.file_options, bind_data.reader_bind,
			                                  bind_data.return_types, bind_data.return_names, column_ids, nullptr,
			                                  bind_data.files.front());
		}
		total_size = result->file_handle->FileSize();
		return result;
	}
};

struct SingleThreadedCSVLocalState : public LocalTableFunctionState {
public:
	explicit SingleThreadedCSVLocalState() : bytes_read(0), total_size(0), current_progress(0), file_index(0) {
	}

	//! The CSV reader
	unique_ptr<BufferedCSVReader> csv_reader;
	//! The current amount of bytes read by this reader
	idx_t bytes_read;
	//! The total amount of bytes in the file
	idx_t total_size;
	//! The current progress from 0..100
	idx_t current_progress;
	//! The file index of this reader
	idx_t file_index;
};

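//! Initializes the global state of the single-threaded CSV scan and sets up the initial and union readers.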
static unique_ptr<GlobalTableFunctionState> SingleThreadedCSVInit(ClientContext &context,
                                                                  TableFunctionInitInput &input) {
	auto &bind_data = input.bind_data->CastNoConst<ReadCSVData>();
	auto result = make_uniq<SingleThreadedCSVState>(bind_data.files.size());
	if (bind_data.files.empty()) {
		// This can happen when a filename based filter pushdown has eliminated all possible files for this scan.
		return std::move(result);
	} else {
		bind_data.options.file_path = bind_data.files[0];
		if (bind_data.initial_reader) {
			// If this is a pipe and an initial reader already exists due to read_csv_auto,
			// we must re-use it, since we can't restart the reader because it is a pipe.
			result->initial_reader = std::move(bind_data.initial_reader);
		} else {
			result->initial_reader = make_uniq<BufferedCSVReader>(context, bind_data.options, bind_data.csv_types);
		}
		if (!bind_data.options.file_options.union_by_name) {
			result->initial_reader->names = bind_data.csv_names;
		}
		if (bind_data.options.auto_detect) {
			bind_data.options = result->initial_reader->options;
		}
	}
	MultiFileReader::InitializeReader(*result->initial_reader, bind_data.options.file_options, bind_data.reader_bind,
	                                  bind_data.return_types, bind_data.return_names, input.column_ids, input.filters,
	                                  bind_data.files.front());
	for (auto &reader : bind_data.union_readers) {
		if (!reader) {
			continue;
		}
		MultiFileReader::InitializeReader(*reader, bind_data.options.file_options, bind_data.reader_bind,
		                                  bind_data.return_types, bind_data.return_names, input.column_ids,
		                                  input.filters, bind_data.files.front());
	}
	result->column_ids = input.column_ids;

	if (!bind_data.options.file_options.union_by_name) {
		// if we are reading multiple files - run auto-detect only on the first file
		// UNLESS union by name is turned on - in that case we assume that different files have different schemas
		// as such, we need to re-run the auto detection on each file
		bind_data.options.auto_detect = false;
	}
	result->csv_types = bind_data.csv_types;
	result->csv_names = bind_data.csv_names;
	result->next_file = 1;
	return std::move(result);
}

unique_ptr<LocalTableFunctionState> SingleThreadedReadCSVInitLocal(ExecutionContext &context,
                                                                   TableFunctionInitInput &input,
                                                                   GlobalTableFunctionState *global_state_p) {
	auto &bind_data = input.bind_data->CastNoConst<ReadCSVData>();
	auto &data = global_state_p->Cast<SingleThreadedCSVState>();
	auto result = make_uniq<SingleThreadedCSVLocalState>();
	result->csv_reader = data.GetCSVReader(context.client, bind_data, result->file_index, result->total_size);
	return std::move(result);
}

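//! Scan function of the single-threaded CSV reader: parses the current file, tracks progress, and moves on to the
//! next file once the current one is exhausted.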
static void SingleThreadedCSVFunction(ClientContext &context, TableFunctionInput &data_p, DataChunk &output) {
	auto &bind_data = data_p.bind_data->CastNoConst<ReadCSVData>();
	auto &data = data_p.global_state->Cast<SingleThreadedCSVState>();
	auto &lstate = data_p.local_state->Cast<SingleThreadedCSVLocalState>();
	if (!lstate.csv_reader) {
		// no csv_reader was set, this can happen when a filename-based filter has filtered out all possible files
		return;
	}

	do {
		lstate.csv_reader->ParseCSV(output);
		// update the number of bytes read
		D_ASSERT(lstate.bytes_read <= lstate.csv_reader->bytes_in_chunk);
		auto bytes_read = MinValue<idx_t>(lstate.total_size, lstate.csv_reader->bytes_in_chunk);
		auto current_progress = lstate.total_size == 0 ? 100 : 100 * bytes_read / lstate.total_size;
		if (current_progress > lstate.current_progress) {
			if (current_progress > 100) {
				throw InternalException("Progress should never exceed 100");
			}
			data.progress_in_files += current_progress - lstate.current_progress;
			lstate.current_progress = current_progress;
		}
		if (output.size() == 0) {
			// exhausted this file, but we might have more files we can read
			auto csv_reader = data.GetCSVReader(context, bind_data, lstate.file_index, lstate.total_size);
			// add any left-over progress for this file to the progress bar
			if (lstate.current_progress < 100) {
				data.progress_in_files += 100 - lstate.current_progress;
			}
			// reset the current progress
			lstate.current_progress = 0;
			lstate.bytes_read = 0;
			lstate.csv_reader = std::move(csv_reader);
			if (!lstate.csv_reader) {
				// no more files - we are done
				return;
			}
			lstate.bytes_read = 0;
		} else {
			MultiFileReader::FinalizeChunk(bind_data.reader_bind, lstate.csv_reader->reader_data, output);
			break;
		}
	} while (true);
}

//===--------------------------------------------------------------------===//
// Read CSV Functions
//===--------------------------------------------------------------------===//
static unique_ptr<GlobalTableFunctionState> ReadCSVInitGlobal(ClientContext &context, TableFunctionInitInput &input) {
	auto &bind_data = input.bind_data->Cast<ReadCSVData>();
	if (bind_data.single_threaded) {
		return SingleThreadedCSVInit(context, input);
	} else {
		return ParallelCSVInitGlobal(context, input);
	}
}

unique_ptr<LocalTableFunctionState> ReadCSVInitLocal(ExecutionContext &context, TableFunctionInitInput &input,
                                                     GlobalTableFunctionState *global_state_p) {
	auto &csv_data = input.bind_data->Cast<ReadCSVData>();
	if (csv_data.single_threaded) {
		return SingleThreadedReadCSVInitLocal(context, input, global_state_p);
	} else {
		return ParallelReadCSVInitLocal(context, input, global_state_p);
	}
}

static void ReadCSVFunction(ClientContext &context, TableFunctionInput &data_p, DataChunk &output) {
	auto &bind_data = data_p.bind_data->Cast<ReadCSVData>();
	if (bind_data.single_threaded) {
		SingleThreadedCSVFunction(context, data_p, output);
	} else {
		ParallelReadCSVFunction(context, data_p, output);
	}
}

static idx_t CSVReaderGetBatchIndex(ClientContext &context, const FunctionData *bind_data_p,
                                    LocalTableFunctionState *local_state, GlobalTableFunctionState *global_state) {
	auto &bind_data = bind_data_p->Cast<ReadCSVData>();
	if (bind_data.single_threaded) {
		auto &data = local_state->Cast<SingleThreadedCSVLocalState>();
		return data.file_index;
	}
	auto &data = local_state->Cast<ParallelCSVLocalState>();
	return data.csv_reader->buffer->batch_index;
}

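//! Registers the named parameters accepted by read_csv and read_csv_auto.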
static void ReadCSVAddNamedParameters(TableFunction &table_function) {
	table_function.named_parameters["sep"] = LogicalType::VARCHAR;
	table_function.named_parameters["delim"] = LogicalType::VARCHAR;
	table_function.named_parameters["quote"] = LogicalType::VARCHAR;
	table_function.named_parameters["new_line"] = LogicalType::VARCHAR;
	table_function.named_parameters["escape"] = LogicalType::VARCHAR;
	table_function.named_parameters["nullstr"] = LogicalType::VARCHAR;
	table_function.named_parameters["columns"] = LogicalType::ANY;
	table_function.named_parameters["auto_type_candidates"] = LogicalType::ANY;
	table_function.named_parameters["header"] = LogicalType::BOOLEAN;
	table_function.named_parameters["auto_detect"] = LogicalType::BOOLEAN;
	table_function.named_parameters["sample_size"] = LogicalType::BIGINT;
	table_function.named_parameters["sample_chunk_size"] = LogicalType::BIGINT;
	table_function.named_parameters["sample_chunks"] = LogicalType::BIGINT;
	table_function.named_parameters["all_varchar"] = LogicalType::BOOLEAN;
	table_function.named_parameters["dateformat"] = LogicalType::VARCHAR;
	table_function.named_parameters["timestampformat"] = LogicalType::VARCHAR;
	table_function.named_parameters["normalize_names"] = LogicalType::BOOLEAN;
	table_function.named_parameters["compression"] = LogicalType::VARCHAR;
	table_function.named_parameters["skip"] = LogicalType::BIGINT;
	table_function.named_parameters["max_line_size"] = LogicalType::VARCHAR;
	table_function.named_parameters["maximum_line_size"] = LogicalType::VARCHAR;
	table_function.named_parameters["ignore_errors"] = LogicalType::BOOLEAN;
	table_function.named_parameters["buffer_size"] = LogicalType::UBIGINT;
	table_function.named_parameters["decimal_separator"] = LogicalType::VARCHAR;
	table_function.named_parameters["parallel"] = LogicalType::BOOLEAN;
	table_function.named_parameters["null_padding"] = LogicalType::BOOLEAN;
	table_function.named_parameters["allow_quoted_nulls"] = LogicalType::BOOLEAN;
	table_function.named_parameters["column_types"] = LogicalType::ANY;
	table_function.named_parameters["dtypes"] = LogicalType::ANY;
	table_function.named_parameters["types"] = LogicalType::ANY;
	table_function.named_parameters["names"] = LogicalType::LIST(LogicalType::VARCHAR);
	table_function.named_parameters["column_names"] = LogicalType::LIST(LogicalType::VARCHAR);
	MultiFileReader::AddParameters(table_function);
}

double CSVReaderProgress(ClientContext &context, const FunctionData *bind_data_p,
                         const GlobalTableFunctionState *global_state) {
	auto &bind_data = bind_data_p->Cast<ReadCSVData>();
	if (bind_data.single_threaded) {
		auto &data = global_state->Cast<SingleThreadedCSVState>();
		return data.GetProgress(bind_data);
	} else {
		auto &data = global_state->Cast<ParallelCSVGlobalState>();
		return data.GetProgress(bind_data);
	}
}

void CSVComplexFilterPushdown(ClientContext &context, LogicalGet &get, FunctionData *bind_data_p,
                              vector<unique_ptr<Expression>> &filters) {
	auto &data = bind_data_p->Cast<ReadCSVData>();
	auto reset_reader =
	    MultiFileReader::ComplexFilterPushdown(context, data.files, data.options.file_options, get, filters);
	if (reset_reader) {
		MultiFileReader::PruneReaders(data);
	}
}

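//! Cardinality estimate: file size divided by a rough estimated row width, with a constant fallback when the size
//! is unknown.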
unique_ptr<NodeStatistics> CSVReaderCardinality(ClientContext &context, const FunctionData *bind_data_p) {
	auto &bind_data = bind_data_p->Cast<ReadCSVData>();
	idx_t per_file_cardinality = 0;
	if (bind_data.initial_reader && bind_data.initial_reader->file_handle) {
		auto estimated_row_width = (bind_data.csv_types.size() * 5);
		per_file_cardinality = bind_data.initial_reader->file_handle->FileSize() / estimated_row_width;
	} else {
		// determined through the scientific method as the average amount of rows in a CSV file
		per_file_cardinality = 42;
	}
	return make_uniq<NodeStatistics>(bind_data.files.size() * per_file_cardinality);
}

void BufferedCSVReaderOptions::Serialize(FieldWriter &writer) const {
	// common options
	writer.WriteField<bool>(has_delimiter);
	writer.WriteString(delimiter);
	writer.WriteField<bool>(has_quote);
	writer.WriteString(quote);
	writer.WriteField<bool>(has_escape);
	writer.WriteString(escape);
	writer.WriteField<bool>(has_header);
	writer.WriteField<bool>(header);
	writer.WriteField<bool>(ignore_errors);
	writer.WriteField<idx_t>(num_cols);
	writer.WriteField<idx_t>(buffer_sample_size);
	writer.WriteString(null_str);
	writer.WriteField<FileCompressionType>(compression);
	writer.WriteField<NewLineIdentifier>(new_line);
	writer.WriteField<bool>(allow_quoted_nulls);
	// read options
	writer.WriteField<idx_t>(skip_rows);
	writer.WriteField<bool>(skip_rows_set);
	writer.WriteField<idx_t>(maximum_line_size);
	writer.WriteField<bool>(normalize_names);
	writer.WriteListNoReference<bool>(force_not_null);
	writer.WriteField<bool>(all_varchar);
	writer.WriteField<idx_t>(sample_chunk_size);
	writer.WriteField<idx_t>(sample_chunks);
	writer.WriteField<bool>(auto_detect);
	writer.WriteString(file_path);
	writer.WriteString(decimal_separator);
	writer.WriteField<bool>(null_padding);
	writer.WriteField<idx_t>(buffer_size);
	writer.WriteSerializable(file_options);
	// write options
	writer.WriteListNoReference<bool>(force_quote);
	// FIXME: serialize date_format / has_format
	vector<string> csv_formats;
	for (auto &format : date_format) {
		csv_formats.push_back(format.second.format_specifier);
	}
	writer.WriteList<string>(csv_formats);
}

void BufferedCSVReaderOptions::Deserialize(FieldReader &reader) {
	// common options
	has_delimiter = reader.ReadRequired<bool>();
	delimiter = reader.ReadRequired<string>();
	has_quote = reader.ReadRequired<bool>();
	quote = reader.ReadRequired<string>();
	has_escape = reader.ReadRequired<bool>();
	escape = reader.ReadRequired<string>();
	has_header = reader.ReadRequired<bool>();
	header = reader.ReadRequired<bool>();
	ignore_errors = reader.ReadRequired<bool>();
	num_cols = reader.ReadRequired<idx_t>();
	buffer_sample_size = reader.ReadRequired<idx_t>();
	null_str = reader.ReadRequired<string>();
	compression = reader.ReadRequired<FileCompressionType>();
	new_line = reader.ReadRequired<NewLineIdentifier>();
	allow_quoted_nulls = reader.ReadRequired<bool>();
	// read options
	skip_rows = reader.ReadRequired<idx_t>();
	skip_rows_set = reader.ReadRequired<bool>();
	maximum_line_size = reader.ReadRequired<idx_t>();
	normalize_names = reader.ReadRequired<bool>();
	force_not_null = reader.ReadRequiredList<bool>();
	all_varchar = reader.ReadRequired<bool>();
	sample_chunk_size = reader.ReadRequired<idx_t>();
	sample_chunks = reader.ReadRequired<idx_t>();
	auto_detect = reader.ReadRequired<bool>();
	file_path = reader.ReadRequired<string>();
	decimal_separator = reader.ReadRequired<string>();
	null_padding = reader.ReadRequired<bool>();
	buffer_size = reader.ReadRequired<idx_t>();
	file_options = reader.ReadRequiredSerializable<MultiFileReaderOptions, MultiFileReaderOptions>();
	// write options
	force_quote = reader.ReadRequiredList<bool>();
	auto formats = reader.ReadRequiredList<string>();
	vector<LogicalTypeId> format_types {LogicalTypeId::DATE, LogicalTypeId::TIMESTAMP};
	for (idx_t f_idx = 0; f_idx < formats.size(); f_idx++) {
		auto &format = formats[f_idx];
		auto &type = format_types[f_idx];
		if (format.empty()) {
			continue;
		}
		has_format[type] = true;
		StrTimeFormat::ParseFormatSpecifier(format, date_format[type]);
	}
}

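//! (De)serialization of the read_csv bind data, used when serializing query plans.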
static void CSVReaderSerialize(FieldWriter &writer, const FunctionData *bind_data_p, const TableFunction &function) {
	auto &bind_data = bind_data_p->Cast<ReadCSVData>();
	writer.WriteString(function.extra_info);
	writer.WriteList<string>(bind_data.files);
	writer.WriteRegularSerializableList<LogicalType>(bind_data.csv_types);
	writer.WriteList<string>(bind_data.csv_names);
	writer.WriteRegularSerializableList<LogicalType>(bind_data.return_types);
	writer.WriteList<string>(bind_data.return_names);
	writer.WriteField<idx_t>(bind_data.filename_col_idx);
	writer.WriteField<idx_t>(bind_data.hive_partition_col_idx);
	bind_data.options.Serialize(writer);
	writer.WriteField<bool>(bind_data.single_threaded);
	writer.WriteSerializable(bind_data.reader_bind);
	writer.WriteField<uint32_t>(bind_data.column_info.size());
	for (auto &col : bind_data.column_info) {
		col.Serialize(writer);
	}
}

static unique_ptr<FunctionData> CSVReaderDeserialize(PlanDeserializationState &state, FieldReader &reader,
                                                     TableFunction &function) {
	function.extra_info = reader.ReadRequired<string>();
	auto result_data = make_uniq<ReadCSVData>();
	result_data->files = reader.ReadRequiredList<string>();
	result_data->csv_types = reader.ReadRequiredSerializableList<LogicalType, LogicalType>();
	result_data->csv_names = reader.ReadRequiredList<string>();
	result_data->return_types = reader.ReadRequiredSerializableList<LogicalType, LogicalType>();
	result_data->return_names = reader.ReadRequiredList<string>();
	result_data->filename_col_idx = reader.ReadRequired<idx_t>();
	result_data->hive_partition_col_idx = reader.ReadRequired<idx_t>();
	result_data->options.Deserialize(reader);
	result_data->single_threaded = reader.ReadField<bool>(true);
	result_data->reader_bind = reader.ReadRequiredSerializable<MultiFileReaderBindData, MultiFileReaderBindData>();
	uint32_t file_number = reader.ReadRequired<uint32_t>();
	for (idx_t i = 0; i < file_number; i++) {
		result_data->column_info.emplace_back(ColumnInfo::Deserialize(reader));
	}
	return std::move(result_data);
}

TableFunction ReadCSVTableFunction::GetFunction() {
	TableFunction read_csv("read_csv", {LogicalType::VARCHAR}, ReadCSVFunction, ReadCSVBind, ReadCSVInitGlobal,
	                       ReadCSVInitLocal);
	read_csv.table_scan_progress = CSVReaderProgress;
	read_csv.pushdown_complex_filter = CSVComplexFilterPushdown;
	read_csv.serialize = CSVReaderSerialize;
	read_csv.deserialize = CSVReaderDeserialize;
	read_csv.get_batch_index = CSVReaderGetBatchIndex;
	read_csv.cardinality = CSVReaderCardinality;
	read_csv.projection_pushdown = true;
	ReadCSVAddNamedParameters(read_csv);
	return read_csv;
}

TableFunction ReadCSVTableFunction::GetAutoFunction() {
	auto read_csv_auto = ReadCSVTableFunction::GetFunction();
	read_csv_auto.name = "read_csv_auto";
	read_csv_auto.bind = ReadCSVAutoBind;
	return read_csv_auto;
}

void ReadCSVTableFunction::RegisterFunction(BuiltinFunctions &set) {
	set.AddFunction(MultiFileReader::CreateFunctionSet(ReadCSVTableFunction::GetFunction()));
	set.AddFunction(MultiFileReader::CreateFunctionSet(ReadCSVTableFunction::GetAutoFunction()));
}

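//! Replacement scan that turns a bare 'file.csv' table reference into a read_csv_auto call.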
unique_ptr<TableRef> ReadCSVReplacement(ClientContext &context, const string &table_name, ReplacementScanData *data) {
	auto lower_name = StringUtil::Lower(table_name);
	// remove any compression
	if (StringUtil::EndsWith(lower_name, ".gz")) {
		lower_name = lower_name.substr(0, lower_name.size() - 3);
	} else if (StringUtil::EndsWith(lower_name, ".zst")) {
		lower_name = lower_name.substr(0, lower_name.size() - 4);
	}
	if (!StringUtil::EndsWith(lower_name, ".csv") && !StringUtil::Contains(lower_name, ".csv?") &&
	    !StringUtil::EndsWith(lower_name, ".tsv") && !StringUtil::Contains(lower_name, ".tsv?")) {
		return nullptr;
	}
	auto table_function = make_uniq<TableFunctionRef>();
	vector<unique_ptr<ParsedExpression>> children;
	children.push_back(make_uniq<ConstantExpression>(Value(table_name)));
	table_function->function = make_uniq<FunctionExpression>("read_csv_auto", std::move(children));

	if (!FileSystem::HasGlob(table_name)) {
		table_function->alias = FileSystem::ExtractBaseName(table_name);
	}

	return std::move(table_function);
}

void BuiltinFunctions::RegisterReadFunctions() {
	CSVCopyFunction::RegisterFunction(*this);
	ReadCSVTableFunction::RegisterFunction(*this);
	auto &config = DBConfig::GetConfig(*transaction.db);
	config.replacement_scans.emplace_back(ReadCSVReplacement);
}

} // namespace duckdb