1 | #include "duckdb/function/table/read_csv.hpp" |
2 | #include "duckdb/function/function_set.hpp" |
3 | #include "duckdb/main/client_context.hpp" |
4 | #include "duckdb/main/database.hpp" |
5 | #include "duckdb/common/string_util.hpp" |
6 | #include "duckdb/common/enum_util.hpp" |
7 | #include "duckdb/common/union_by_name.hpp" |
8 | #include "duckdb/main/config.hpp" |
9 | #include "duckdb/parser/expression/constant_expression.hpp" |
10 | #include "duckdb/parser/expression/function_expression.hpp" |
11 | #include "duckdb/parser/tableref/table_function_ref.hpp" |
12 | #include "duckdb/planner/operator/logical_get.hpp" |
13 | #include "duckdb/main/extension_helper.hpp" |
14 | #include "duckdb/common/multi_file_reader.hpp" |
15 | #include "duckdb/main/client_data.hpp" |
16 | #include "duckdb/execution/operator/persistent/csv_line_info.hpp" |
17 | #include <limits> |
18 | |
19 | namespace duckdb { |
20 | |
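//! Open a CSV file through the database's virtual file system, using the client's buffer allocator for read buffers.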
21 | unique_ptr<CSVFileHandle> ReadCSV::OpenCSV(const string &file_path, FileCompressionType compression, |
22 | ClientContext &context) { |
23 | auto &fs = FileSystem::GetFileSystem(context); |
24 | auto &allocator = BufferAllocator::Get(context); |
return CSVFileHandle::OpenFile(fs, allocator, file_path, compression, false);
26 | } |
27 | |
28 | void ReadCSVData::FinalizeRead(ClientContext &context) { |
29 | BaseCSVData::Finalize(); |
// Here we identify whether we can read this CSV file in parallel or not.
31 | bool null_or_empty = options.delimiter.empty() || options.escape.empty() || options.quote.empty() || |
32 | options.delimiter[0] == '\0' || options.escape[0] == '\0' || options.quote[0] == '\0'; |
33 | bool complex_options = options.delimiter.size() > 1 || options.escape.size() > 1 || options.quote.size() > 1; |
34 | bool not_supported_options = options.null_padding; |
35 | |
36 | auto number_of_threads = TaskScheduler::GetScheduler(context).NumberOfThreads(); |
37 | if (options.parallel_mode != ParallelMode::PARALLEL && int64_t(files.size() * 2) >= number_of_threads) { |
38 | single_threaded = true; |
39 | } |
40 | if (options.parallel_mode == ParallelMode::SINGLE_THREADED || null_or_empty || not_supported_options || |
41 | complex_options || options.new_line == NewLineIdentifier::MIX) { |
42 | // not supported for parallel CSV reading |
43 | single_threaded = true; |
44 | } |
45 | } |
46 | |
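//! Returns the specificity weight of an auto-detection candidate type (lower is more general, e.g. VARCHAR); throws if the type is not an accepted candidate.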
47 | uint8_t GetCandidateSpecificity(const LogicalType &candidate_type) { |
//! Constant map of accepted auto_type candidates and their specificity weights
49 | const duckdb::unordered_map<uint8_t, uint8_t> auto_type_candidates_specificity { |
50 | {(uint8_t)LogicalTypeId::VARCHAR, 0}, {(uint8_t)LogicalTypeId::TIMESTAMP, 1}, |
51 | {(uint8_t)LogicalTypeId::DATE, 2}, {(uint8_t)LogicalTypeId::TIME, 3}, |
52 | {(uint8_t)LogicalTypeId::DOUBLE, 4}, {(uint8_t)LogicalTypeId::FLOAT, 5}, |
53 | {(uint8_t)LogicalTypeId::BIGINT, 6}, {(uint8_t)LogicalTypeId::INTEGER, 7}, |
54 | {(uint8_t)LogicalTypeId::SMALLINT, 8}, {(uint8_t)LogicalTypeId::TINYINT, 9}, |
55 | {(uint8_t)LogicalTypeId::BOOLEAN, 10}, {(uint8_t)LogicalTypeId::SQLNULL, 11}}; |
56 | |
57 | auto id = (uint8_t)candidate_type.id(); |
auto it = auto_type_candidates_specificity.find(id);
if (it == auto_type_candidates_specificity.end()) {
throw BinderException("Auto Type Candidate of type %s is not accepted as a valid input",
EnumUtil::ToString(candidate_type.id()));
62 | } |
63 | return it->second; |
64 | } |
65 | |
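//! Bind function for read_csv: parses the named parameters, optionally sniffs the schema of the first file, and sets up the multi-file reader bind data.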
66 | static unique_ptr<FunctionData> ReadCSVBind(ClientContext &context, TableFunctionBindInput &input, |
67 | vector<LogicalType> &return_types, vector<string> &names) { |
68 | auto result = make_uniq<ReadCSVData>(); |
69 | auto &options = result->options; |
result->files = MultiFileReader::GetFileList(context, input.inputs[0], "CSV");
71 | |
72 | bool explicitly_set_columns = false; |
73 | for (auto &kv : input.named_parameters) { |
if (MultiFileReader::ParseOption(kv.first, kv.second, options.file_options)) {
continue;
}
auto loption = StringUtil::Lower(kv.first);
if (loption == "columns") {
explicitly_set_columns = true;
auto &child_type = kv.second.type();
if (child_type.id() != LogicalTypeId::STRUCT) {
throw BinderException("read_csv columns requires a struct as input");
}
auto &struct_children = StructValue::GetChildren(kv.second);
D_ASSERT(StructType::GetChildCount(child_type) == struct_children.size());
for (idx_t i = 0; i < struct_children.size(); i++) {
auto &name = StructType::GetChildName(child_type, i);
auto &val = struct_children[i];
names.push_back(name);
if (val.type().id() != LogicalTypeId::VARCHAR) {
throw BinderException("read_csv requires a type specification as string");
}
return_types.emplace_back(TransformStringToLogicalType(StringValue::Get(val), context));
}
if (names.empty()) {
throw BinderException("read_csv requires at least a single column as input!");
}
} else if (loption == "auto_type_candidates") {
99 | options.auto_type_candidates.clear(); |
100 | map<uint8_t, LogicalType> candidate_types; |
101 | // We always have the extremes of Null and Varchar, so we can default to varchar if the |
102 | // sniffer is not able to confidently detect that column type |
candidate_types[GetCandidateSpecificity(LogicalType::VARCHAR)] = LogicalType::VARCHAR;
candidate_types[GetCandidateSpecificity(LogicalType::SQLNULL)] = LogicalType::SQLNULL;
105 | |
106 | auto &child_type = kv.second.type(); |
107 | if (child_type.id() != LogicalTypeId::LIST) { |
108 | throw BinderException("read_csv auto_types requires a list as input" ); |
109 | } |
110 | auto &list_children = ListValue::GetChildren(value: kv.second); |
111 | if (list_children.empty()) { |
112 | throw BinderException("auto_type_candidates requires at least one type" ); |
113 | } |
114 | for (auto &child : list_children) { |
115 | if (child.type().id() != LogicalTypeId::VARCHAR) { |
116 | throw BinderException("auto_type_candidates requires a type specification as string" ); |
117 | } |
118 | auto candidate_type = TransformStringToLogicalType(str: StringValue::Get(value: child), context); |
119 | candidate_types[GetCandidateSpecificity(candidate_type)] = candidate_type; |
120 | } |
121 | for (auto &candidate_type : candidate_types) { |
122 | options.auto_type_candidates.emplace_back(args&: candidate_type.second); |
123 | } |
124 | } else if (loption == "column_names" || loption == "names" ) { |
125 | if (!options.name_list.empty()) { |
126 | throw BinderException("read_csv_auto column_names/names can only be supplied once" ); |
127 | } |
128 | if (kv.second.IsNull()) { |
129 | throw BinderException("read_csv_auto %s cannot be NULL" , kv.first); |
130 | } |
131 | auto &children = ListValue::GetChildren(value: kv.second); |
132 | for (auto &child : children) { |
133 | options.name_list.push_back(x: StringValue::Get(value: child)); |
134 | } |
135 | } else if (loption == "column_types" || loption == "types" || loption == "dtypes" ) { |
136 | auto &child_type = kv.second.type(); |
137 | if (child_type.id() != LogicalTypeId::STRUCT && child_type.id() != LogicalTypeId::LIST) { |
138 | throw BinderException("read_csv_auto %s requires a struct or list as input" , kv.first); |
139 | } |
140 | if (!options.sql_type_list.empty()) { |
141 | throw BinderException("read_csv_auto column_types/types/dtypes can only be supplied once" ); |
142 | } |
143 | vector<string> sql_type_names; |
144 | if (child_type.id() == LogicalTypeId::STRUCT) { |
145 | auto &struct_children = StructValue::GetChildren(value: kv.second); |
146 | D_ASSERT(StructType::GetChildCount(child_type) == struct_children.size()); |
147 | for (idx_t i = 0; i < struct_children.size(); i++) { |
148 | auto &name = StructType::GetChildName(type: child_type, index: i); |
149 | auto &val = struct_children[i]; |
150 | if (val.type().id() != LogicalTypeId::VARCHAR) { |
151 | throw BinderException("read_csv_auto %s requires a type specification as string" , kv.first); |
152 | } |
153 | sql_type_names.push_back(x: StringValue::Get(value: val)); |
154 | options.sql_types_per_column[name] = i; |
155 | } |
156 | } else { |
157 | auto &list_child = ListType::GetChildType(type: child_type); |
158 | if (list_child.id() != LogicalTypeId::VARCHAR) { |
159 | throw BinderException("read_csv_auto %s requires a list of types (varchar) as input" , kv.first); |
160 | } |
161 | auto &children = ListValue::GetChildren(value: kv.second); |
162 | for (auto &child : children) { |
163 | sql_type_names.push_back(x: StringValue::Get(value: child)); |
164 | } |
165 | } |
166 | options.sql_type_list.reserve(n: sql_type_names.size()); |
167 | for (auto &sql_type : sql_type_names) { |
168 | auto def_type = TransformStringToLogicalType(str: sql_type); |
169 | if (def_type.id() == LogicalTypeId::USER) { |
170 | throw BinderException("Unrecognized type \"%s\" for read_csv_auto %s definition" , sql_type, |
171 | kv.first); |
172 | } |
173 | options.sql_type_list.push_back(x: std::move(def_type)); |
174 | } |
175 | } else if (loption == "all_varchar" ) { |
176 | options.all_varchar = BooleanValue::Get(value: kv.second); |
177 | } else if (loption == "normalize_names" ) { |
178 | options.normalize_names = BooleanValue::Get(value: kv.second); |
179 | } else { |
180 | options.SetReadOption(loption, value: kv.second, expected_names&: names); |
181 | } |
182 | } |
183 | if (options.file_options.auto_detect_hive_partitioning) { |
options.file_options.hive_partitioning = MultiFileReaderOptions::AutoDetectHivePartitioning(result->files);
185 | } |
186 | |
187 | if (!options.auto_detect && return_types.empty()) { |
188 | throw BinderException("read_csv requires columns to be specified through the 'columns' option. Use " |
189 | "read_csv_auto or set read_csv(..., " |
190 | "AUTO_DETECT=TRUE) to automatically guess columns." ); |
191 | } |
192 | if (options.auto_detect) { |
193 | options.file_path = result->files[0]; |
auto initial_reader = make_uniq<BufferedCSVReader>(context, options);
return_types.assign(initial_reader->return_types.begin(), initial_reader->return_types.end());
if (names.empty()) {
names.assign(initial_reader->names.begin(), initial_reader->names.end());
198 | } else { |
199 | if (explicitly_set_columns) { |
200 | // The user has influenced the names, can't assume they are valid anymore |
201 | if (return_types.size() != names.size()) { |
202 | throw BinderException("The amount of names specified (%d) and the observed amount of types (%d) in " |
203 | "the file don't match" , |
204 | names.size(), return_types.size()); |
205 | } |
206 | } else { |
207 | D_ASSERT(return_types.size() == names.size()); |
208 | } |
209 | initial_reader->names = names; |
210 | } |
211 | options = initial_reader->options; |
212 | result->initial_reader = std::move(initial_reader); |
213 | } else { |
214 | D_ASSERT(return_types.size() == names.size()); |
215 | } |
216 | result->csv_types = return_types; |
217 | result->csv_names = names; |
218 | |
219 | if (options.file_options.union_by_name) { |
220 | result->reader_bind = |
MultiFileReader::BindUnionReader<BufferedCSVReader>(context, return_types, names, *result, options);
if (result->union_readers.size() > 1) {
result->column_info.emplace_back(result->csv_names, result->csv_types);
for (idx_t i = 1; i < result->union_readers.size(); i++) {
result->column_info.emplace_back(result->union_readers[i]->names,
result->union_readers[i]->return_types);
}
}
if (!options.sql_types_per_column.empty()) {
auto exception = BufferedCSVReader::ColumnTypesError(options.sql_types_per_column, names);
if (!exception.empty()) {
throw BinderException(exception);
}
}
} else {
result->reader_bind = MultiFileReader::BindOptions(options.file_options, result->files, return_types, names);
237 | } |
238 | result->return_types = return_types; |
239 | result->return_names = names; |
240 | result->FinalizeRead(context); |
241 | return std::move(result); |
242 | } |
243 | |
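//! read_csv_auto is read_csv with auto_detect forced to true.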
244 | static unique_ptr<FunctionData> ReadCSVAutoBind(ClientContext &context, TableFunctionBindInput &input, |
245 | vector<LogicalType> &return_types, vector<string> &names) { |
246 | input.named_parameters["auto_detect" ] = Value::BOOLEAN(value: true); |
247 | return ReadCSVBind(context, input, return_types, names); |
248 | } |
249 | |
250 | //===--------------------------------------------------------------------===// |
251 | // Parallel CSV Reader CSV Global State |
252 | //===--------------------------------------------------------------------===// |
253 | |
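//! Global state for the parallel CSV scan: owns the file handle and CSV buffers, hands out byte ranges to local readers, and tracks tuple boundaries for verification.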
254 | struct ParallelCSVGlobalState : public GlobalTableFunctionState { |
255 | public: |
256 | ParallelCSVGlobalState(ClientContext &context, unique_ptr<CSVFileHandle> file_handle_p, |
257 | const vector<string> &files_path_p, idx_t system_threads_p, idx_t buffer_size_p, |
idx_t rows_to_skip, bool force_parallelism_p, vector<column_t> column_ids_p, bool has_header)
259 | : file_handle(std::move(file_handle_p)), system_threads(system_threads_p), buffer_size(buffer_size_p), |
260 | force_parallelism(force_parallelism_p), column_ids(std::move(column_ids_p)), |
261 | line_info(main_mutex, batch_to_tuple_end, tuple_start, tuple_end) { |
262 | file_handle->DisableReset(); |
263 | current_file_path = files_path_p[0]; |
264 | line_info.lines_read[0] = rows_to_skip; |
265 | if (has_header) { |
266 | line_info.lines_read[0]++; |
267 | } |
268 | file_size = file_handle->FileSize(); |
269 | first_file_size = file_size; |
270 | on_disk_file = file_handle->OnDiskFile(); |
271 | bytes_read = 0; |
272 | if (buffer_size < file_size || file_size == 0) { |
273 | bytes_per_local_state = buffer_size / ParallelCSVGlobalState::MaxThreads(); |
274 | } else { |
275 | bytes_per_local_state = file_size / MaxThreads(); |
276 | } |
277 | if (bytes_per_local_state == 0) { |
// In practice this should not happen; it only occurs when mocking up test scenarios.
// Either way, this value needs to be at least one.
280 | bytes_per_local_state = 1; |
281 | } |
282 | for (idx_t i = 0; i < rows_to_skip; i++) { |
283 | file_handle->ReadLine(); |
284 | } |
285 | first_position = current_csv_position; |
current_buffer = make_shared<CSVBuffer>(context, buffer_size, *file_handle, current_csv_position, file_number);
next_buffer = shared_ptr<CSVBuffer>(
current_buffer->Next(*file_handle, buffer_size, current_csv_position, file_number).release());
289 | running_threads = MaxThreads(); |
290 | |
291 | // Initialize all the book-keeping variables |
292 | auto file_count = files_path_p.size(); |
line_info.current_batches.resize(file_count);
tuple_start.resize(file_count);
tuple_end.resize(file_count);
tuple_end_to_batch.resize(file_count);
batch_to_tuple_end.resize(file_count);
298 | } |
299 | ParallelCSVGlobalState() : line_info(main_mutex, batch_to_tuple_end, tuple_start, tuple_end) { |
300 | running_threads = MaxThreads(); |
301 | } |
302 | |
303 | ~ParallelCSVGlobalState() override { |
304 | } |
305 | |
306 | //! How many bytes were read up to this point |
307 | atomic<idx_t> bytes_read; |
308 | //! Size of current file |
309 | idx_t file_size; |
310 | |
311 | public: |
312 | idx_t MaxThreads() const override; |
313 | //! Updates the CSV reader with the next buffer to read. Returns false if no more buffers are available. |
314 | bool Next(ClientContext &context, const ReadCSVData &bind_data, unique_ptr<ParallelCSVReader> &reader); |
315 | //! Verify if the CSV File was read correctly |
316 | void Verify(); |
317 | |
318 | void UpdateVerification(VerificationPositions positions, idx_t file_number, idx_t batch_idx); |
319 | |
320 | void UpdateLinesRead(CSVBufferRead &buffer_read, idx_t file_idx); |
321 | |
322 | void IncrementThread(); |
323 | |
324 | void DecrementThread(); |
325 | |
326 | bool Finished(); |
327 | |
328 | double GetProgress(const ReadCSVData &bind_data) const { |
329 | idx_t total_files = bind_data.files.size(); |
330 | |
331 | // get the progress WITHIN the current file |
332 | double progress; |
333 | if (file_size == 0) { |
334 | progress = 1.0; |
335 | } else { |
336 | progress = double(bytes_read) / double(file_size); |
337 | } |
338 | // now get the total percentage of files read |
339 | double percentage = double(file_index - 1) / total_files; |
340 | percentage += (double(1) / double(total_files)) * progress; |
341 | return percentage * 100; |
342 | } |
343 | |
344 | private: |
345 | //! File Handle for current file |
346 | unique_ptr<CSVFileHandle> file_handle; |
347 | shared_ptr<CSVBuffer> current_buffer; |
348 | shared_ptr<CSVBuffer> next_buffer; |
349 | |
350 | //! The index of the next file to read (i.e. current file + 1) |
351 | idx_t file_index = 1; |
352 | string current_file_path; |
353 | |
354 | //! Mutex to lock when getting next batch of bytes (Parallel Only) |
355 | mutex main_mutex; |
//! Byte offset within the current buffer where the next local state should start reading
357 | idx_t next_byte = 0; |
358 | //! How many bytes we should execute per local state |
359 | idx_t bytes_per_local_state; |
360 | //! Size of first file |
361 | idx_t first_file_size; |
362 | //! Whether or not this is an on-disk file |
363 | bool on_disk_file = true; |
//! Maximum number of threads available to DuckDB
365 | idx_t system_threads; |
366 | //! Size of the buffers |
367 | idx_t buffer_size; |
368 | //! Current batch index |
369 | idx_t batch_index = 0; |
370 | idx_t local_batch_index = 0; |
371 | |
372 | //! Forces parallelism for small CSV Files, should only be used for testing. |
373 | bool force_parallelism = false; |
374 | //! Current (Global) position of CSV |
375 | idx_t current_csv_position = 0; |
376 | //! First Position of First Buffer |
377 | idx_t first_position = 0; |
378 | //! Current File Number |
379 | idx_t file_number = 0; |
380 | idx_t max_tuple_end = 0; |
//! The vector stores positions where threads ended the last line they read in the CSV file, and the set stores
//! positions where they started reading the first line.
383 | vector<vector<idx_t>> tuple_end; |
384 | vector<set<idx_t>> tuple_start; |
385 | //! Tuple end to batch |
386 | vector<unordered_map<idx_t, idx_t>> tuple_end_to_batch; |
387 | //! Batch to Tuple End |
388 | vector<unordered_map<idx_t, idx_t>> batch_to_tuple_end; |
389 | idx_t running_threads = 0; |
390 | //! The column ids to read |
391 | vector<column_t> column_ids; |
392 | //! Line Info used in error messages |
393 | LineInfo line_info; |
394 | }; |
395 | |
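//! Caps the number of threads at roughly one per MB of the first file, unless parallelism is forced or the file is not stored on disk.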
396 | idx_t ParallelCSVGlobalState::MaxThreads() const { |
397 | if (force_parallelism || !on_disk_file) { |
398 | return system_threads; |
399 | } |
idx_t one_mb = 1000000; // We initialize at most one thread per MB
401 | idx_t threads_per_mb = first_file_size / one_mb + 1; |
402 | if (threads_per_mb < system_threads || threads_per_mb == 1) { |
403 | return threads_per_mb; |
404 | } |
405 | |
406 | return system_threads; |
407 | } |
408 | |
409 | void ParallelCSVGlobalState::IncrementThread() { |
410 | lock_guard<mutex> parallel_lock(main_mutex); |
411 | running_threads++; |
412 | } |
413 | |
414 | void ParallelCSVGlobalState::DecrementThread() { |
415 | lock_guard<mutex> parallel_lock(main_mutex); |
416 | D_ASSERT(running_threads > 0); |
417 | running_threads--; |
418 | } |
419 | |
420 | bool ParallelCSVGlobalState::Finished() { |
421 | lock_guard<mutex> parallel_lock(main_mutex); |
422 | return running_threads == 0; |
423 | } |
424 | |
425 | void ParallelCSVGlobalState::Verify() { |
426 | // All threads are done, we run some magic sweet verification code |
427 | lock_guard<mutex> parallel_lock(main_mutex); |
428 | if (running_threads == 0) { |
429 | D_ASSERT(tuple_end.size() == tuple_start.size()); |
430 | for (idx_t i = 0; i < tuple_start.size(); i++) { |
431 | auto ¤t_tuple_end = tuple_end[i]; |
432 | auto ¤t_tuple_start = tuple_start[i]; |
433 | // figure out max value of last_pos |
434 | if (current_tuple_end.empty()) { |
435 | return; |
436 | } |
auto max_value = *max_element(std::begin(current_tuple_end), std::end(current_tuple_end));
for (idx_t tpl_idx = 0; tpl_idx < current_tuple_end.size(); tpl_idx++) {
auto last_pos = current_tuple_end[tpl_idx];
auto first_pos = current_tuple_start.find(last_pos);
if (first_pos == current_tuple_start.end()) {
// this might be necessary due to carriage returns outside buffer scopes.
first_pos = current_tuple_start.find(last_pos + 1);
444 | } |
445 | if (first_pos == current_tuple_start.end() && last_pos != max_value) { |
446 | auto batch_idx = tuple_end_to_batch[i][last_pos]; |
447 | auto problematic_line = line_info.GetLine(batch_idx); |
throw InvalidInputException(
"CSV File not supported for multithreading. This can be due to a problematic line in your CSV file, or "
"because this CSV cannot be read in parallel. Please inspect whether line %llu is correct. If so, "
"please run single-threaded CSV reading by setting parallel=false in the read_csv call.",
problematic_line);
453 | } |
454 | } |
455 | } |
456 | } |
457 | } |
458 | |
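//! Verifies that every batch before batch_idx ends exactly where a later batch starts; any mismatch means the file cannot be read in parallel.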
459 | void LineInfo::Verify(idx_t file_idx, idx_t batch_idx, idx_t cur_first_pos) { |
460 | auto &tuple_start_set = tuple_start[file_idx]; |
461 | auto &processed_batches = batch_to_tuple_end[file_idx]; |
462 | auto &tuple_end_vec = tuple_end[file_idx]; |
463 | bool has_error = false; |
464 | idx_t problematic_line; |
465 | if (batch_idx == 0 || tuple_start_set.empty()) { |
466 | return; |
467 | } |
468 | for (idx_t cur_batch = 0; cur_batch < batch_idx - 1; cur_batch++) { |
469 | auto cur_end = tuple_end_vec[processed_batches[cur_batch]]; |
auto first_pos = tuple_start_set.find(cur_end);
if (first_pos == tuple_start_set.end()) {
has_error = true;
problematic_line = GetLine(cur_batch);
474 | break; |
475 | } |
476 | } |
477 | if (!has_error) { |
478 | auto cur_end = tuple_end_vec[processed_batches[batch_idx - 1]]; |
479 | if (cur_end != cur_first_pos) { |
480 | has_error = true; |
481 | problematic_line = GetLine(batch_idx); |
482 | } |
483 | } |
484 | if (has_error) { |
throw InvalidInputException(
"CSV File not supported for multithreading. This can be due to a problematic line in your CSV file, or "
"because this CSV cannot be read in parallel. Please inspect whether line %llu is correct. If so, "
"please run single-threaded CSV reading by setting parallel=false in the read_csv call.",
problematic_line);
490 | } |
491 | } |
492 | |
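//! Assigns the next byte range (and its buffers) to a reader, moving on to the next file when the current one is exhausted; returns false once all files have been consumed.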
493 | bool ParallelCSVGlobalState::Next(ClientContext &context, const ReadCSVData &bind_data, |
494 | unique_ptr<ParallelCSVReader> &reader) { |
495 | lock_guard<mutex> parallel_lock(main_mutex); |
496 | if (!current_buffer) { |
// This means we are done with the current file; we need to move on to the next one (if it exists).
498 | if (file_index < bind_data.files.size()) { |
499 | current_file_path = bind_data.files[file_index++]; |
file_handle = ReadCSV::OpenCSV(current_file_path, bind_data.options.compression, context);
current_csv_position = 0;
file_number++;
local_batch_index = 0;
current_buffer =
make_shared<CSVBuffer>(context, buffer_size, *file_handle, current_csv_position, file_number);
next_buffer = shared_ptr<CSVBuffer>(
current_buffer->Next(*file_handle, buffer_size, current_csv_position, file_number).release());
508 | } else { |
509 | // We are done scanning. |
510 | reader.reset(); |
511 | return false; |
512 | } |
513 | } |
514 | // set up the current buffer |
line_info.current_batches.back().insert(local_batch_index);
auto result = make_uniq<CSVBufferRead>(current_buffer, next_buffer, next_byte, next_byte + bytes_per_local_state,
batch_index++, local_batch_index++, &line_info);
518 | // move the byte index of the CSV reader to the next buffer |
519 | next_byte += bytes_per_local_state; |
520 | if (next_byte >= current_buffer->GetBufferSize()) { |
521 | // We replace the current buffer with the next buffer |
522 | next_byte = 0; |
523 | bytes_read += current_buffer->GetBufferSize(); |
524 | current_buffer = next_buffer; |
525 | if (next_buffer) { |
526 | // Next buffer gets the next-next buffer |
next_buffer = shared_ptr<CSVBuffer>(
next_buffer->Next(*file_handle, buffer_size, current_csv_position, file_number).release());
529 | } |
530 | } |
531 | if (!reader || reader->options.file_path != current_file_path) { |
532 | // we either don't have a reader, or the reader was created for a different file |
533 | // we need to create a new reader and instantiate it |
534 | if (file_index > 0 && file_index <= bind_data.union_readers.size() && bind_data.union_readers[file_index - 1]) { |
535 | // we are doing UNION BY NAME - fetch the options from the union reader for this file |
536 | auto &union_reader = *bind_data.union_readers[file_index - 1]; |
reader = make_uniq<ParallelCSVReader>(context, union_reader.options, std::move(result), first_position,
union_reader.GetTypes(), file_index - 1);
reader->names = union_reader.GetNames();
} else if (file_index <= bind_data.column_info.size()) {
// Serialized Union By name
reader = make_uniq<ParallelCSVReader>(context, bind_data.options, std::move(result), first_position,
bind_data.column_info[file_index - 1].types, file_index - 1);
reader->names = bind_data.column_info[file_index - 1].names;
} else {
// regular file - use the standard options
reader = make_uniq<ParallelCSVReader>(context, bind_data.options, std::move(result), first_position,
bind_data.csv_types, file_index - 1);
reader->names = bind_data.csv_names;
}
reader->options.file_path = current_file_path;
MultiFileReader::InitializeReader(*reader, bind_data.options.file_options, bind_data.reader_bind,
bind_data.return_types, bind_data.return_names, column_ids, nullptr,
bind_data.files.front());
555 | } else { |
556 | // update the current reader |
557 | reader->SetBufferRead(std::move(result)); |
558 | } |
559 | return true; |
560 | } |
561 | void ParallelCSVGlobalState::UpdateVerification(VerificationPositions positions, idx_t file_number_p, idx_t batch_idx) { |
562 | lock_guard<mutex> parallel_lock(main_mutex); |
563 | if (positions.end_of_last_line > max_tuple_end) { |
564 | max_tuple_end = positions.end_of_last_line; |
565 | } |
566 | tuple_end_to_batch[file_number_p][positions.end_of_last_line] = batch_idx; |
567 | batch_to_tuple_end[file_number_p][batch_idx] = tuple_end[file_number_p].size(); |
tuple_start[file_number_p].insert(positions.beginning_of_first_line);
tuple_end[file_number_p].push_back(positions.end_of_last_line);
570 | } |
571 | |
572 | void ParallelCSVGlobalState::UpdateLinesRead(CSVBufferRead &buffer_read, idx_t file_idx) { |
573 | auto batch_idx = buffer_read.local_batch_index; |
574 | auto lines_read = buffer_read.lines_read; |
575 | lock_guard<mutex> parallel_lock(main_mutex); |
line_info.current_batches[file_idx].erase(batch_idx);
577 | line_info.lines_read[batch_idx] += lines_read; |
578 | } |
579 | |
580 | bool LineInfo::CanItGetLine(idx_t file_idx, idx_t batch_idx) { |
581 | lock_guard<mutex> parallel_lock(main_mutex); |
582 | if (current_batches.empty() || done) { |
583 | return true; |
584 | } |
585 | if (file_idx >= current_batches.size() || current_batches[file_idx].empty()) { |
586 | return true; |
587 | } |
588 | auto min_value = *current_batches[file_idx].begin(); |
589 | if (min_value >= batch_idx) { |
590 | return true; |
591 | } |
592 | return false; |
593 | } |
594 | |
595 | // Returns the 1-indexed line number |
596 | idx_t LineInfo::GetLine(idx_t batch_idx, idx_t line_error, idx_t file_idx, idx_t cur_start, bool verify) { |
597 | unique_ptr<lock_guard<mutex>> parallel_lock; |
598 | if (!verify) { |
parallel_lock = duckdb::make_uniq<lock_guard<mutex>>(main_mutex);
600 | } |
601 | idx_t line_count = 0; |
602 | if (done) { |
603 | // line count is 0-indexed, but we want to return 1-indexed |
604 | return first_line + 1; |
605 | } |
606 | for (idx_t i = 0; i <= batch_idx; i++) { |
if (lines_read.find(i) == lines_read.end() && i != batch_idx) {
throw InternalException("Missing batch index on Parallel CSV Reader GetLine");
609 | } |
610 | line_count += lines_read[i]; |
611 | } |
612 | |
613 | // before we are done, if this is not a call in Verify() we must check Verify up to this batch |
614 | if (!verify) { |
Verify(file_idx, batch_idx, cur_start);
616 | } |
617 | done = true; |
618 | first_line = line_count + line_error; |
619 | // line count is 0-indexed, but we want to return 1-indexed |
620 | return first_line + 1; |
621 | } |
622 | |
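//! Initializes the global state for a parallel CSV scan, reusing the file handle of the reader created during binding when possible.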
623 | static unique_ptr<GlobalTableFunctionState> ParallelCSVInitGlobal(ClientContext &context, |
624 | TableFunctionInitInput &input) { |
625 | auto &bind_data = input.bind_data->CastNoConst<ReadCSVData>(); |
626 | if (bind_data.files.empty()) { |
627 | // This can happen when a filename based filter pushdown has eliminated all possible files for this scan. |
628 | return make_uniq<ParallelCSVGlobalState>(); |
629 | } |
630 | unique_ptr<CSVFileHandle> file_handle; |
631 | |
632 | bind_data.options.file_path = bind_data.files[0]; |
633 | |
634 | if (bind_data.initial_reader) { |
635 | file_handle = std::move(bind_data.initial_reader->file_handle); |
636 | file_handle->Reset(); |
637 | file_handle->DisableReset(); |
638 | bind_data.initial_reader.reset(); |
639 | } else { |
file_handle = ReadCSV::OpenCSV(bind_data.options.file_path, bind_data.options.compression, context);
}
return make_uniq<ParallelCSVGlobalState>(
context, std::move(file_handle), bind_data.files, context.db->NumberOfThreads(), bind_data.options.buffer_size,
bind_data.options.skip_rows, ClientConfig::GetConfig(context).verify_parallelism, input.column_ids,
bind_data.options.header && bind_data.options.has_header);
646 | } |
647 | |
648 | //===--------------------------------------------------------------------===// |
649 | // Read CSV Local State |
650 | //===--------------------------------------------------------------------===// |
651 | struct ParallelCSVLocalState : public LocalTableFunctionState { |
652 | public: |
653 | explicit ParallelCSVLocalState(unique_ptr<ParallelCSVReader> csv_reader_p) : csv_reader(std::move(csv_reader_p)) { |
654 | } |
655 | |
656 | //! The CSV reader |
657 | unique_ptr<ParallelCSVReader> csv_reader; |
658 | CSVBufferRead previous_buffer; |
659 | bool done = false; |
660 | }; |
661 | |
662 | unique_ptr<LocalTableFunctionState> ParallelReadCSVInitLocal(ExecutionContext &context, TableFunctionInitInput &input, |
663 | GlobalTableFunctionState *global_state_p) { |
664 | auto &csv_data = input.bind_data->Cast<ReadCSVData>(); |
665 | auto &global_state = global_state_p->Cast<ParallelCSVGlobalState>(); |
666 | unique_ptr<ParallelCSVReader> csv_reader; |
auto has_next = global_state.Next(context.client, csv_data, csv_reader);
if (!has_next) {
global_state.DecrementThread();
csv_reader.reset();
}
return make_uniq<ParallelCSVLocalState>(std::move(csv_reader));
673 | } |
674 | |
675 | static void ParallelReadCSVFunction(ClientContext &context, TableFunctionInput &data_p, DataChunk &output) { |
676 | auto &bind_data = data_p.bind_data->Cast<ReadCSVData>(); |
677 | auto &csv_global_state = data_p.global_state->Cast<ParallelCSVGlobalState>(); |
678 | auto &csv_local_state = data_p.local_state->Cast<ParallelCSVLocalState>(); |
679 | |
680 | if (!csv_local_state.csv_reader) { |
681 | // no csv_reader was set, this can happen when a filename-based filter has filtered out all possible files |
682 | return; |
683 | } |
684 | |
685 | do { |
686 | if (output.size() != 0) { |
MultiFileReader::FinalizeChunk(bind_data.reader_bind, csv_local_state.csv_reader->reader_data, output);
688 | break; |
689 | } |
690 | if (csv_local_state.csv_reader->finished) { |
691 | auto verification_updates = csv_local_state.csv_reader->GetVerificationPositions(); |
csv_global_state.UpdateVerification(verification_updates,
csv_local_state.csv_reader->buffer->buffer->GetFileNumber(),
csv_local_state.csv_reader->buffer->local_batch_index);
csv_global_state.UpdateLinesRead(*csv_local_state.csv_reader->buffer, csv_local_state.csv_reader->file_idx);
auto has_next = csv_global_state.Next(context, bind_data, csv_local_state.csv_reader);
697 | if (csv_local_state.csv_reader) { |
698 | csv_local_state.csv_reader->linenr = 0; |
699 | } |
700 | if (!has_next) { |
701 | csv_global_state.DecrementThread(); |
702 | break; |
703 | } |
704 | } |
csv_local_state.csv_reader->ParseCSV(output);
706 | |
707 | } while (true); |
708 | if (csv_global_state.Finished()) { |
709 | csv_global_state.Verify(); |
710 | } |
711 | } |
712 | |
713 | //===--------------------------------------------------------------------===// |
714 | // Single-Threaded CSV Reader |
715 | //===--------------------------------------------------------------------===// |
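//! Global state for the single-threaded CSV scan: files are handed out one at a time, and each file is read by its own BufferedCSVReader.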
716 | struct SingleThreadedCSVState : public GlobalTableFunctionState { |
717 | explicit SingleThreadedCSVState(idx_t total_files) : total_files(total_files), next_file(0), progress_in_files(0) { |
718 | } |
719 | |
720 | mutex csv_lock; |
721 | unique_ptr<BufferedCSVReader> initial_reader; |
722 | //! The total number of files to read from |
723 | idx_t total_files; |
724 | //! The index of the next file to read (i.e. current file + 1) |
725 | atomic<idx_t> next_file; |
726 | //! How far along we are in reading the current set of open files |
727 | //! This goes from [0...next_file] * 100 |
728 | atomic<idx_t> progress_in_files; |
729 | //! The set of SQL types |
730 | vector<LogicalType> csv_types; |
731 | //! The set of SQL names to be read from the file |
732 | vector<string> csv_names; |
733 | //! The column ids to read |
734 | vector<column_t> column_ids; |
735 | |
736 | idx_t MaxThreads() const override { |
737 | return total_files; |
738 | } |
739 | |
740 | double GetProgress(const ReadCSVData &bind_data) const { |
741 | D_ASSERT(total_files == bind_data.files.size()); |
742 | D_ASSERT(progress_in_files <= total_files * 100); |
743 | return (double(progress_in_files) / double(total_files)); |
744 | } |
745 | |
746 | unique_ptr<BufferedCSVReader> GetCSVReader(ClientContext &context, ReadCSVData &bind_data, idx_t &file_index, |
747 | idx_t &total_size) { |
748 | auto reader = GetCSVReaderInternal(context, bind_data, file_index, total_size); |
749 | if (reader) { |
750 | reader->file_handle->DisableReset(); |
751 | } |
752 | return reader; |
753 | } |
754 | |
755 | private: |
756 | unique_ptr<BufferedCSVReader> GetCSVReaderInternal(ClientContext &context, ReadCSVData &bind_data, |
757 | idx_t &file_index, idx_t &total_size) { |
758 | BufferedCSVReaderOptions options; |
759 | { |
760 | lock_guard<mutex> l(csv_lock); |
761 | if (initial_reader) { |
762 | total_size = initial_reader->file_handle ? initial_reader->file_handle->FileSize() : 0; |
763 | return std::move(initial_reader); |
764 | } |
765 | if (next_file >= total_files) { |
766 | return nullptr; |
767 | } |
768 | options = bind_data.options; |
769 | file_index = next_file; |
770 | next_file++; |
771 | } |
// reuse a CSV reader that was created during binding
773 | unique_ptr<BufferedCSVReader> result; |
774 | if (file_index < bind_data.union_readers.size() && bind_data.union_readers[file_index]) { |
775 | result = std::move(bind_data.union_readers[file_index]); |
776 | } else { |
777 | auto union_by_name = options.file_options.union_by_name; |
778 | options.file_path = bind_data.files[file_index]; |
result = make_uniq<BufferedCSVReader>(context, std::move(options), csv_types);
if (!union_by_name) {
result->names = csv_names;
}
MultiFileReader::InitializeReader(*result, bind_data.options.file_options, bind_data.reader_bind,
bind_data.return_types, bind_data.return_names, column_ids, nullptr,
bind_data.files.front());
786 | } |
787 | total_size = result->file_handle->FileSize(); |
788 | return result; |
789 | } |
790 | }; |
791 | |
792 | struct SingleThreadedCSVLocalState : public LocalTableFunctionState { |
793 | public: |
794 | explicit SingleThreadedCSVLocalState() : bytes_read(0), total_size(0), current_progress(0), file_index(0) { |
795 | } |
796 | |
797 | //! The CSV reader |
798 | unique_ptr<BufferedCSVReader> csv_reader; |
799 | //! The current amount of bytes read by this reader |
800 | idx_t bytes_read; |
801 | //! The total amount of bytes in the file |
802 | idx_t total_size; |
803 | //! The current progress from 0..100 |
804 | idx_t current_progress; |
805 | //! The file index of this reader |
806 | idx_t file_index; |
807 | }; |
808 | |
809 | static unique_ptr<GlobalTableFunctionState> SingleThreadedCSVInit(ClientContext &context, |
810 | TableFunctionInitInput &input) { |
811 | auto &bind_data = input.bind_data->CastNoConst<ReadCSVData>(); |
auto result = make_uniq<SingleThreadedCSVState>(bind_data.files.size());
813 | if (bind_data.files.empty()) { |
814 | // This can happen when a filename based filter pushdown has eliminated all possible files for this scan. |
815 | return std::move(result); |
816 | } else { |
817 | bind_data.options.file_path = bind_data.files[0]; |
818 | if (bind_data.initial_reader) { |
// If this is a pipe and an initial reader already exists due to read_csv_auto,
// we must reuse it, since we cannot restart the reader because it is a pipe.
result->initial_reader = std::move(bind_data.initial_reader);
} else {
result->initial_reader = make_uniq<BufferedCSVReader>(context, bind_data.options, bind_data.csv_types);
824 | } |
825 | if (!bind_data.options.file_options.union_by_name) { |
826 | result->initial_reader->names = bind_data.csv_names; |
827 | } |
828 | if (bind_data.options.auto_detect) { |
829 | bind_data.options = result->initial_reader->options; |
830 | } |
831 | } |
MultiFileReader::InitializeReader(*result->initial_reader, bind_data.options.file_options, bind_data.reader_bind,
bind_data.return_types, bind_data.return_names, input.column_ids, input.filters,
bind_data.files.front());
for (auto &reader : bind_data.union_readers) {
if (!reader) {
continue;
}
MultiFileReader::InitializeReader(*reader, bind_data.options.file_options, bind_data.reader_bind,
bind_data.return_types, bind_data.return_names, input.column_ids,
input.filters, bind_data.files.front());
842 | } |
843 | result->column_ids = input.column_ids; |
844 | |
845 | if (!bind_data.options.file_options.union_by_name) { |
846 | // if we are reading multiple files - run auto-detect only on the first file |
847 | // UNLESS union by name is turned on - in that case we assume that different files have different schemas |
848 | // as such, we need to re-run the auto detection on each file |
849 | bind_data.options.auto_detect = false; |
850 | } |
851 | result->csv_types = bind_data.csv_types; |
852 | result->csv_names = bind_data.csv_names; |
853 | result->next_file = 1; |
854 | return std::move(result); |
855 | } |
856 | |
857 | unique_ptr<LocalTableFunctionState> SingleThreadedReadCSVInitLocal(ExecutionContext &context, |
858 | TableFunctionInitInput &input, |
859 | GlobalTableFunctionState *global_state_p) { |
860 | auto &bind_data = input.bind_data->CastNoConst<ReadCSVData>(); |
861 | auto &data = global_state_p->Cast<SingleThreadedCSVState>(); |
862 | auto result = make_uniq<SingleThreadedCSVLocalState>(); |
result->csv_reader = data.GetCSVReader(context.client, bind_data, result->file_index, result->total_size);
864 | return std::move(result); |
865 | } |
866 | |
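//! Reads the next chunk with the current BufferedCSVReader, updating the progress counters and moving to the next file once the current one is exhausted.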
867 | static void SingleThreadedCSVFunction(ClientContext &context, TableFunctionInput &data_p, DataChunk &output) { |
868 | auto &bind_data = data_p.bind_data->CastNoConst<ReadCSVData>(); |
869 | auto &data = data_p.global_state->Cast<SingleThreadedCSVState>(); |
870 | auto &lstate = data_p.local_state->Cast<SingleThreadedCSVLocalState>(); |
871 | if (!lstate.csv_reader) { |
872 | // no csv_reader was set, this can happen when a filename-based filter has filtered out all possible files |
873 | return; |
874 | } |
875 | |
876 | do { |
lstate.csv_reader->ParseCSV(output);
// update the number of bytes read
D_ASSERT(lstate.bytes_read <= lstate.csv_reader->bytes_in_chunk);
auto bytes_read = MinValue<idx_t>(lstate.total_size, lstate.csv_reader->bytes_in_chunk);
auto current_progress = lstate.total_size == 0 ? 100 : 100 * bytes_read / lstate.total_size;
if (current_progress > lstate.current_progress) {
if (current_progress > 100) {
throw InternalException("Progress should never exceed 100");
885 | } |
886 | data.progress_in_files += current_progress - lstate.current_progress; |
887 | lstate.current_progress = current_progress; |
888 | } |
889 | if (output.size() == 0) { |
890 | // exhausted this file, but we might have more files we can read |
auto csv_reader = data.GetCSVReader(context, bind_data, lstate.file_index, lstate.total_size);
892 | // add any left-over progress for this file to the progress bar |
893 | if (lstate.current_progress < 100) { |
894 | data.progress_in_files += 100 - lstate.current_progress; |
895 | } |
896 | // reset the current progress |
897 | lstate.current_progress = 0; |
898 | lstate.bytes_read = 0; |
899 | lstate.csv_reader = std::move(csv_reader); |
900 | if (!lstate.csv_reader) { |
901 | // no more files - we are done |
902 | return; |
903 | } |
904 | lstate.bytes_read = 0; |
905 | } else { |
MultiFileReader::FinalizeChunk(bind_data.reader_bind, lstate.csv_reader->reader_data, output);
907 | break; |
908 | } |
909 | } while (true); |
910 | } |
911 | |
912 | //===--------------------------------------------------------------------===// |
913 | // Read CSV Functions |
914 | //===--------------------------------------------------------------------===// |
915 | static unique_ptr<GlobalTableFunctionState> ReadCSVInitGlobal(ClientContext &context, TableFunctionInitInput &input) { |
916 | auto &bind_data = input.bind_data->Cast<ReadCSVData>(); |
917 | if (bind_data.single_threaded) { |
918 | return SingleThreadedCSVInit(context, input); |
919 | } else { |
920 | return ParallelCSVInitGlobal(context, input); |
921 | } |
922 | } |
923 | |
924 | unique_ptr<LocalTableFunctionState> ReadCSVInitLocal(ExecutionContext &context, TableFunctionInitInput &input, |
925 | GlobalTableFunctionState *global_state_p) { |
926 | auto &csv_data = input.bind_data->Cast<ReadCSVData>(); |
927 | if (csv_data.single_threaded) { |
928 | return SingleThreadedReadCSVInitLocal(context, input, global_state_p); |
929 | } else { |
930 | return ParallelReadCSVInitLocal(context, input, global_state_p); |
931 | } |
932 | } |
933 | |
934 | static void ReadCSVFunction(ClientContext &context, TableFunctionInput &data_p, DataChunk &output) { |
935 | auto &bind_data = data_p.bind_data->Cast<ReadCSVData>(); |
936 | if (bind_data.single_threaded) { |
937 | SingleThreadedCSVFunction(context, data_p, output); |
938 | } else { |
939 | ParallelReadCSVFunction(context, data_p, output); |
940 | } |
941 | } |
942 | |
943 | static idx_t CSVReaderGetBatchIndex(ClientContext &context, const FunctionData *bind_data_p, |
944 | LocalTableFunctionState *local_state, GlobalTableFunctionState *global_state) { |
945 | auto &bind_data = bind_data_p->Cast<ReadCSVData>(); |
946 | if (bind_data.single_threaded) { |
947 | auto &data = local_state->Cast<SingleThreadedCSVLocalState>(); |
948 | return data.file_index; |
949 | } |
950 | auto &data = local_state->Cast<ParallelCSVLocalState>(); |
951 | return data.csv_reader->buffer->batch_index; |
952 | } |
953 | |
954 | static void ReadCSVAddNamedParameters(TableFunction &table_function) { |
955 | table_function.named_parameters["sep" ] = LogicalType::VARCHAR; |
956 | table_function.named_parameters["delim" ] = LogicalType::VARCHAR; |
957 | table_function.named_parameters["quote" ] = LogicalType::VARCHAR; |
958 | table_function.named_parameters["new_line" ] = LogicalType::VARCHAR; |
959 | table_function.named_parameters["escape" ] = LogicalType::VARCHAR; |
960 | table_function.named_parameters["nullstr" ] = LogicalType::VARCHAR; |
961 | table_function.named_parameters["columns" ] = LogicalType::ANY; |
962 | table_function.named_parameters["auto_type_candidates" ] = LogicalType::ANY; |
963 | table_function.named_parameters["header" ] = LogicalType::BOOLEAN; |
964 | table_function.named_parameters["auto_detect" ] = LogicalType::BOOLEAN; |
965 | table_function.named_parameters["sample_size" ] = LogicalType::BIGINT; |
966 | table_function.named_parameters["sample_chunk_size" ] = LogicalType::BIGINT; |
967 | table_function.named_parameters["sample_chunks" ] = LogicalType::BIGINT; |
968 | table_function.named_parameters["all_varchar" ] = LogicalType::BOOLEAN; |
969 | table_function.named_parameters["dateformat" ] = LogicalType::VARCHAR; |
970 | table_function.named_parameters["timestampformat" ] = LogicalType::VARCHAR; |
971 | table_function.named_parameters["normalize_names" ] = LogicalType::BOOLEAN; |
972 | table_function.named_parameters["compression" ] = LogicalType::VARCHAR; |
973 | table_function.named_parameters["skip" ] = LogicalType::BIGINT; |
974 | table_function.named_parameters["max_line_size" ] = LogicalType::VARCHAR; |
975 | table_function.named_parameters["maximum_line_size" ] = LogicalType::VARCHAR; |
976 | table_function.named_parameters["ignore_errors" ] = LogicalType::BOOLEAN; |
977 | table_function.named_parameters["buffer_size" ] = LogicalType::UBIGINT; |
978 | table_function.named_parameters["decimal_separator" ] = LogicalType::VARCHAR; |
979 | table_function.named_parameters["parallel" ] = LogicalType::BOOLEAN; |
980 | table_function.named_parameters["null_padding" ] = LogicalType::BOOLEAN; |
981 | table_function.named_parameters["allow_quoted_nulls" ] = LogicalType::BOOLEAN; |
982 | table_function.named_parameters["column_types" ] = LogicalType::ANY; |
983 | table_function.named_parameters["dtypes" ] = LogicalType::ANY; |
984 | table_function.named_parameters["types" ] = LogicalType::ANY; |
985 | table_function.named_parameters["names" ] = LogicalType::LIST(child: LogicalType::VARCHAR); |
986 | table_function.named_parameters["column_names" ] = LogicalType::LIST(child: LogicalType::VARCHAR); |
987 | MultiFileReader::AddParameters(table_function); |
988 | } |
989 | |
990 | double CSVReaderProgress(ClientContext &context, const FunctionData *bind_data_p, |
991 | const GlobalTableFunctionState *global_state) { |
992 | auto &bind_data = bind_data_p->Cast<ReadCSVData>(); |
993 | if (bind_data.single_threaded) { |
994 | auto &data = global_state->Cast<SingleThreadedCSVState>(); |
995 | return data.GetProgress(bind_data); |
996 | } else { |
997 | auto &data = global_state->Cast<ParallelCSVGlobalState>(); |
998 | return data.GetProgress(bind_data); |
999 | } |
1000 | } |
1001 | |
1002 | void CSVComplexFilterPushdown(ClientContext &context, LogicalGet &get, FunctionData *bind_data_p, |
1003 | vector<unique_ptr<Expression>> &filters) { |
1004 | auto &data = bind_data_p->Cast<ReadCSVData>(); |
1005 | auto reset_reader = |
MultiFileReader::ComplexFilterPushdown(context, data.files, data.options.file_options, get, filters);
1007 | if (reset_reader) { |
1008 | MultiFileReader::PruneReaders(data); |
1009 | } |
1010 | } |
1011 | |
1012 | unique_ptr<NodeStatistics> CSVReaderCardinality(ClientContext &context, const FunctionData *bind_data_p) { |
1013 | auto &bind_data = bind_data_p->Cast<ReadCSVData>(); |
1014 | idx_t per_file_cardinality = 0; |
1015 | if (bind_data.initial_reader && bind_data.initial_reader->file_handle) { |
1016 | auto estimated_row_width = (bind_data.csv_types.size() * 5); |
1017 | per_file_cardinality = bind_data.initial_reader->file_handle->FileSize() / estimated_row_width; |
1018 | } else { |
1019 | // determined through the scientific method as the average amount of rows in a CSV file |
1020 | per_file_cardinality = 42; |
1021 | } |
return make_uniq<NodeStatistics>(bind_data.files.size() * per_file_cardinality);
1023 | } |
1024 | |
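//! Note: the write order below must exactly match the read order in BufferedCSVReaderOptions::Deserialize.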
1025 | void BufferedCSVReaderOptions::Serialize(FieldWriter &writer) const { |
1026 | // common options |
writer.WriteField<bool>(has_delimiter);
writer.WriteString(delimiter);
writer.WriteField<bool>(has_quote);
writer.WriteString(quote);
writer.WriteField<bool>(has_escape);
writer.WriteString(escape);
writer.WriteField<bool>(has_header);
writer.WriteField<bool>(header);
writer.WriteField<bool>(ignore_errors);
writer.WriteField<idx_t>(num_cols);
writer.WriteField<idx_t>(buffer_sample_size);
writer.WriteString(null_str);
writer.WriteField<FileCompressionType>(compression);
writer.WriteField<NewLineIdentifier>(new_line);
writer.WriteField<bool>(allow_quoted_nulls);
// read options
writer.WriteField<idx_t>(skip_rows);
writer.WriteField<bool>(skip_rows_set);
writer.WriteField<idx_t>(maximum_line_size);
writer.WriteField<bool>(normalize_names);
writer.WriteListNoReference<bool>(force_not_null);
writer.WriteField<bool>(all_varchar);
writer.WriteField<idx_t>(sample_chunk_size);
writer.WriteField<idx_t>(sample_chunks);
writer.WriteField<bool>(auto_detect);
writer.WriteString(file_path);
writer.WriteString(decimal_separator);
writer.WriteField<bool>(null_padding);
writer.WriteField<idx_t>(buffer_size);
writer.WriteSerializable(file_options);
// write options
writer.WriteListNoReference<bool>(force_quote);
// FIXME: serialize date_format / has_format
vector<string> csv_formats;
for (auto &format : date_format) {
csv_formats.push_back(format.second.format_specifier);
}
writer.WriteList<string>(csv_formats);
1065 | } |
1066 | |
1067 | void BufferedCSVReaderOptions::Deserialize(FieldReader &reader) { |
1068 | // common options |
1069 | has_delimiter = reader.ReadRequired<bool>(); |
1070 | delimiter = reader.ReadRequired<string>(); |
1071 | has_quote = reader.ReadRequired<bool>(); |
1072 | quote = reader.ReadRequired<string>(); |
1073 | has_escape = reader.ReadRequired<bool>(); |
1074 | escape = reader.ReadRequired<string>(); |
1075 | has_header = reader.ReadRequired<bool>(); |
1076 | header = reader.ReadRequired<bool>(); |
1077 | ignore_errors = reader.ReadRequired<bool>(); |
1078 | num_cols = reader.ReadRequired<idx_t>(); |
1079 | buffer_sample_size = reader.ReadRequired<idx_t>(); |
1080 | null_str = reader.ReadRequired<string>(); |
1081 | compression = reader.ReadRequired<FileCompressionType>(); |
1082 | new_line = reader.ReadRequired<NewLineIdentifier>(); |
1083 | allow_quoted_nulls = reader.ReadRequired<bool>(); |
1084 | // read options |
1085 | skip_rows = reader.ReadRequired<idx_t>(); |
1086 | skip_rows_set = reader.ReadRequired<bool>(); |
1087 | maximum_line_size = reader.ReadRequired<idx_t>(); |
1088 | normalize_names = reader.ReadRequired<bool>(); |
1089 | force_not_null = reader.ReadRequiredList<bool>(); |
1090 | all_varchar = reader.ReadRequired<bool>(); |
1091 | sample_chunk_size = reader.ReadRequired<idx_t>(); |
1092 | sample_chunks = reader.ReadRequired<idx_t>(); |
1093 | auto_detect = reader.ReadRequired<bool>(); |
1094 | file_path = reader.ReadRequired<string>(); |
1095 | decimal_separator = reader.ReadRequired<string>(); |
1096 | null_padding = reader.ReadRequired<bool>(); |
1097 | buffer_size = reader.ReadRequired<idx_t>(); |
1098 | file_options = reader.ReadRequiredSerializable<MultiFileReaderOptions, MultiFileReaderOptions>(); |
1099 | // write options |
1100 | force_quote = reader.ReadRequiredList<bool>(); |
1101 | auto formats = reader.ReadRequiredList<string>(); |
1102 | vector<LogicalTypeId> format_types {LogicalTypeId::DATE, LogicalTypeId::TIMESTAMP}; |
1103 | for (idx_t f_idx = 0; f_idx < formats.size(); f_idx++) { |
1104 | auto &format = formats[f_idx]; |
1105 | auto &type = format_types[f_idx]; |
1106 | if (format.empty()) { |
1107 | continue; |
1108 | } |
1109 | has_format[type] = true; |
StrTimeFormat::ParseFormatSpecifier(format, date_format[type]);
1111 | } |
1112 | } |
1113 | |
1114 | static void CSVReaderSerialize(FieldWriter &writer, const FunctionData *bind_data_p, const TableFunction &function) { |
1115 | auto &bind_data = bind_data_p->Cast<ReadCSVData>(); |
writer.WriteString(function.extra_info);
writer.WriteList<string>(bind_data.files);
writer.WriteRegularSerializableList<LogicalType>(bind_data.csv_types);
writer.WriteList<string>(bind_data.csv_names);
writer.WriteRegularSerializableList<LogicalType>(bind_data.return_types);
writer.WriteList<string>(bind_data.return_names);
writer.WriteField<idx_t>(bind_data.filename_col_idx);
writer.WriteField<idx_t>(bind_data.hive_partition_col_idx);
bind_data.options.Serialize(writer);
writer.WriteField<bool>(bind_data.single_threaded);
writer.WriteSerializable(bind_data.reader_bind);
writer.WriteField<uint32_t>(bind_data.column_info.size());
1128 | for (auto &col : bind_data.column_info) { |
1129 | col.Serialize(writer); |
1130 | } |
1131 | } |
1132 | |
1133 | static unique_ptr<FunctionData> CSVReaderDeserialize(PlanDeserializationState &state, FieldReader &reader, |
1134 | TableFunction &function) { |
1135 | function.extra_info = reader.ReadRequired<string>(); |
1136 | auto result_data = make_uniq<ReadCSVData>(); |
1137 | result_data->files = reader.ReadRequiredList<string>(); |
1138 | result_data->csv_types = reader.ReadRequiredSerializableList<LogicalType, LogicalType>(); |
1139 | result_data->csv_names = reader.ReadRequiredList<string>(); |
1140 | result_data->return_types = reader.ReadRequiredSerializableList<LogicalType, LogicalType>(); |
1141 | result_data->return_names = reader.ReadRequiredList<string>(); |
1142 | result_data->filename_col_idx = reader.ReadRequired<idx_t>(); |
1143 | result_data->hive_partition_col_idx = reader.ReadRequired<idx_t>(); |
1144 | result_data->options.Deserialize(reader); |
result_data->single_threaded = reader.ReadField<bool>(true);
result_data->reader_bind = reader.ReadRequiredSerializable<MultiFileReaderBindData, MultiFileReaderBindData>();
uint32_t file_number = reader.ReadRequired<uint32_t>();
for (idx_t i = 0; i < file_number; i++) {
result_data->column_info.emplace_back(ColumnInfo::Deserialize(reader));
1150 | } |
1151 | return std::move(result_data); |
1152 | } |
1153 | |
1154 | TableFunction ReadCSVTableFunction::GetFunction() { |
1155 | TableFunction read_csv("read_csv" , {LogicalType::VARCHAR}, ReadCSVFunction, ReadCSVBind, ReadCSVInitGlobal, |
1156 | ReadCSVInitLocal); |
1157 | read_csv.table_scan_progress = CSVReaderProgress; |
1158 | read_csv.pushdown_complex_filter = CSVComplexFilterPushdown; |
1159 | read_csv.serialize = CSVReaderSerialize; |
1160 | read_csv.deserialize = CSVReaderDeserialize; |
1161 | read_csv.get_batch_index = CSVReaderGetBatchIndex; |
1162 | read_csv.cardinality = CSVReaderCardinality; |
1163 | read_csv.projection_pushdown = true; |
ReadCSVAddNamedParameters(read_csv);
1165 | return read_csv; |
1166 | } |
1167 | |
1168 | TableFunction ReadCSVTableFunction::GetAutoFunction() { |
1169 | auto read_csv_auto = ReadCSVTableFunction::GetFunction(); |
read_csv_auto.name = "read_csv_auto";
1171 | read_csv_auto.bind = ReadCSVAutoBind; |
1172 | return read_csv_auto; |
1173 | } |
1174 | |
1175 | void ReadCSVTableFunction::RegisterFunction(BuiltinFunctions &set) { |
set.AddFunction(MultiFileReader::CreateFunctionSet(ReadCSVTableFunction::GetFunction()));
set.AddFunction(MultiFileReader::CreateFunctionSet(ReadCSVTableFunction::GetAutoFunction()));
1178 | } |
1179 | |
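//! Replacement scan: rewrites a bare '.csv'/'.tsv' (optionally .gz or .zst compressed) table name into a read_csv_auto(...) table function call, so files can be queried directly by name.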
1180 | unique_ptr<TableRef> ReadCSVReplacement(ClientContext &context, const string &table_name, ReplacementScanData *data) { |
auto lower_name = StringUtil::Lower(table_name);
// remove any compression
if (StringUtil::EndsWith(lower_name, ".gz")) {
lower_name = lower_name.substr(0, lower_name.size() - 3);
} else if (StringUtil::EndsWith(lower_name, ".zst")) {
lower_name = lower_name.substr(0, lower_name.size() - 4);
}
if (!StringUtil::EndsWith(lower_name, ".csv") && !StringUtil::Contains(lower_name, ".csv?") &&
!StringUtil::EndsWith(lower_name, ".tsv") && !StringUtil::Contains(lower_name, ".tsv?")) {
return nullptr;
}
auto table_function = make_uniq<TableFunctionRef>();
vector<unique_ptr<ParsedExpression>> children;
children.push_back(make_uniq<ConstantExpression>(Value(table_name)));
table_function->function = make_uniq<FunctionExpression>("read_csv_auto", std::move(children));

if (!FileSystem::HasGlob(table_name)) {
table_function->alias = FileSystem::ExtractBaseName(table_name);
1199 | } |
1200 | |
1201 | return std::move(table_function); |
1202 | } |
1203 | |
1204 | void BuiltinFunctions::RegisterReadFunctions() { |
CSVCopyFunction::RegisterFunction(*this);
ReadCSVTableFunction::RegisterFunction(*this);
auto &config = DBConfig::GetConfig(*transaction.db);
config.replacement_scans.emplace_back(ReadCSVReplacement);
1209 | } |
1210 | |
1211 | } // namespace duckdb |
1212 | |