#include "duckdb/common/multi_file_reader.hpp"
#include "duckdb/function/table_function.hpp"
#include "duckdb/main/config.hpp"
#include "duckdb/common/types/value.hpp"
#include "duckdb/planner/operator/logical_get.hpp"
#include "duckdb/common/exception.hpp"
#include "duckdb/function/function_set.hpp"
#include "duckdb/common/hive_partitioning.hpp"

namespace duckdb {

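// Registers the named parameters that every multi-file scan exposes: "filename" adds a column with the
// source file of each row, "hive_partitioning" reads partition keys from the directory structure, and
// "union_by_name" unifies schemas across files by column name instead of by position.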
void MultiFileReader::AddParameters(TableFunction &table_function) {
	table_function.named_parameters["filename"] = LogicalType::BOOLEAN;
	table_function.named_parameters["hive_partitioning"] = LogicalType::BOOLEAN;
	table_function.named_parameters["union_by_name"] = LogicalType::BOOLEAN;
}

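// Expands the table function's file argument - either a single VARCHAR pattern or a LIST of VARCHAR
// patterns - into a concrete list of files via filesystem globbing. Globbing requires external access
// to be enabled in the database configuration.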
vector<string> MultiFileReader::GetFileList(ClientContext &context, const Value &input, const string &name,
                                            FileGlobOptions options) {
	auto &config = DBConfig::GetConfig(context);
	if (!config.options.enable_external_access) {
		throw PermissionException("Scanning %s files is disabled through configuration", name);
	}
	if (input.IsNull()) {
		throw ParserException("%s reader cannot take NULL list as parameter", name);
	}
	FileSystem &fs = FileSystem::GetFileSystem(context);
	vector<string> files;
	if (input.type().id() == LogicalTypeId::VARCHAR) {
		auto file_name = StringValue::Get(input);
		files = fs.GlobFiles(file_name, context, options);
	} else if (input.type().id() == LogicalTypeId::LIST) {
		for (auto &val : ListValue::GetChildren(input)) {
			if (val.IsNull()) {
				throw ParserException("%s reader cannot take NULL input as parameter", name);
			}
			if (val.type().id() != LogicalTypeId::VARCHAR) {
				throw ParserException("%s reader can only take a list of strings as a parameter", name);
			}
			auto glob_files = fs.GlobFiles(StringValue::Get(val), context, options);
			files.insert(files.end(), glob_files.begin(), glob_files.end());
		}
	} else {
		throw InternalException("Unsupported type for MultiFileReader::GetFileList");
	}
	if (files.empty() && options == FileGlobOptions::DISALLOW_EMPTY) {
		throw IOException("%s reader needs at least one file to read", name);
	}
	return files;
}

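// Parses a single named parameter into the shared multi-file options. Returns false for keys that are
// not multi-file options so the caller can handle its function-specific parameters itself. Explicitly
// setting hive_partitioning switches off auto-detection of the partitioning scheme.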
bool MultiFileReader::ParseOption(const string &key, const Value &val, MultiFileReaderOptions &options) {
	auto loption = StringUtil::Lower(key);
	if (loption == "filename") {
		options.filename = BooleanValue::Get(val);
	} else if (loption == "hive_partitioning") {
		options.hive_partitioning = BooleanValue::Get(val);
		options.auto_detect_hive_partitioning = false;
	} else if (loption == "union_by_name") {
		options.union_by_name = BooleanValue::Get(val);
	} else {
		return false;
	}
	return true;
}

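// Prunes the file list using filters that were pushed down onto the filename or hive partition columns,
// e.g. a filter "year = 2023" can eliminate every file under a ".../year=2022/" directory without
// opening it. Returns true if any files were removed from the list.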
bool MultiFileReader::ComplexFilterPushdown(ClientContext &context, vector<string> &files,
                                            const MultiFileReaderOptions &options, LogicalGet &get,
                                            vector<unique_ptr<Expression>> &filters) {
	if (files.empty()) {
		return false;
	}
	if (!options.hive_partitioning && !options.filename) {
		return false;
	}

	unordered_map<string, column_t> column_map;
	for (idx_t i = 0; i < get.column_ids.size(); i++) {
		column_map.insert({get.names[get.column_ids[i]], i});
	}

	auto start_files = files.size();
	HivePartitioning::ApplyFiltersToFileList(context, files, filters, column_map, get.table_index,
	                                         options.hive_partitioning, options.filename);
	if (files.size() != start_files) {
		// we have pruned files
		return true;
	}
	return false;
}

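// Adds the generated columns to the bound schema: a VARCHAR "filename" column if requested, and one
// VARCHAR column per hive partition key. Partition keys are parsed from the directory names, e.g. the
// path "data/year=2023/month=1/file.parquet" contributes the keys "year" and "month". All files must
// agree on the set of partition keys.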
MultiFileReaderBindData MultiFileReader::BindOptions(MultiFileReaderOptions &options, const vector<string> &files,
                                                     vector<LogicalType> &return_types, vector<string> &names) {
	MultiFileReaderBindData bind_data;
	// add the generated constant column for the filename
	if (options.filename) {
		if (std::find(names.begin(), names.end(), "filename") != names.end()) {
			throw BinderException("Using the filename option on a file that has a column named \"filename\" is not supported");
		}
		bind_data.filename_idx = names.size();
		return_types.emplace_back(LogicalType::VARCHAR);
		names.emplace_back("filename");
	}

	// add the generated constant columns from the hive partitioning scheme
	if (options.hive_partitioning) {
		D_ASSERT(!files.empty());
		auto partitions = HivePartitioning::Parse(files[0]);
		// verify that all files have the same hive partitioning scheme
		for (auto &f : files) {
			auto file_partitions = HivePartitioning::Parse(f);
			for (auto &part_info : partitions) {
				if (file_partitions.find(part_info.first) == file_partitions.end()) {
					if (options.auto_detect_hive_partitioning) {
						throw BinderException(
						    "Hive partitioning was enabled automatically, but an error was encountered: Hive partition "
						    "mismatch between file \"%s\" and \"%s\": key \"%s\" not found\n\nTo switch off hive "
						    "partitioning, set: HIVE_PARTITIONING=0",
						    files[0], f, part_info.first);
					}
					throw BinderException(
					    "Hive partition mismatch between file \"%s\" and \"%s\": key \"%s\" not found", files[0], f,
					    part_info.first);
				}
			}
			if (partitions.size() != file_partitions.size()) {
				if (options.auto_detect_hive_partitioning) {
					throw BinderException("Hive partitioning was enabled automatically, but an error was encountered: "
					                      "Hive partition mismatch between file \"%s\" and \"%s\"\n\nTo switch off "
					                      "hive partitioning, set: HIVE_PARTITIONING=0",
					                      files[0], f);
				}
				throw BinderException("Hive partition mismatch between file \"%s\" and \"%s\"", files[0], f);
			}
		}
		for (auto &part : partitions) {
			idx_t hive_partitioning_index = DConstants::INVALID_INDEX;
			auto lookup = std::find(names.begin(), names.end(), part.first);
			if (lookup != names.end()) {
				// the hive partitioning column also exists in the file - override it
				auto idx = lookup - names.begin();
				hive_partitioning_index = idx;
				return_types[idx] = LogicalType::VARCHAR;
			} else {
				// the hive partitioning column does not exist in the file - add a new column containing the key
				hive_partitioning_index = names.size();
				return_types.emplace_back(LogicalType::VARCHAR);
				names.emplace_back(part.first);
			}
			bind_data.hive_partitioning_indexes.emplace_back(part.first, hive_partitioning_index);
		}
	}
	return bind_data;
}

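// Computes the per-file constant values for a file that is about to be scanned: the filename column,
// the hive partition values parsed from this file's path, and - under union_by_name - a NULL of the
// global type for every column that is missing from this file.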
void MultiFileReader::FinalizeBind(const MultiFileReaderOptions &file_options, const MultiFileReaderBindData &options,
                                   const string &filename, const vector<string> &local_names,
                                   const vector<LogicalType> &global_types, const vector<string> &global_names,
                                   const vector<column_t> &global_column_ids, MultiFileReaderData &reader_data) {
	// create a map of name -> column index
	case_insensitive_map_t<idx_t> name_map;
	if (file_options.union_by_name) {
		for (idx_t col_idx = 0; col_idx < local_names.size(); col_idx++) {
			name_map[local_names[col_idx]] = col_idx;
		}
	}
	for (idx_t i = 0; i < global_column_ids.size(); i++) {
		auto column_id = global_column_ids[i];
		if (IsRowIdColumnId(column_id)) {
			// row-id
			reader_data.constant_map.emplace_back(i, Value::BIGINT(42));
			continue;
		}
		if (column_id == options.filename_idx) {
			// filename
			reader_data.constant_map.emplace_back(i, Value(filename));
			continue;
		}
		if (!options.hive_partitioning_indexes.empty()) {
			// hive partition constants
			auto partitions = HivePartitioning::Parse(filename);
			D_ASSERT(partitions.size() == options.hive_partitioning_indexes.size());
			bool found_partition = false;
			for (auto &entry : options.hive_partitioning_indexes) {
				if (column_id == entry.index) {
					reader_data.constant_map.emplace_back(i, Value(partitions[entry.value]));
					found_partition = true;
					break;
				}
			}
			if (found_partition) {
				continue;
			}
		}
		if (file_options.union_by_name) {
			auto &global_name = global_names[column_id];
			auto entry = name_map.find(global_name);
			bool not_present_in_file = entry == name_map.end();
			if (not_present_in_file) {
				// we need to project a column with name "global_name" - but it does not exist in the current file
				// push a NULL value of the specified type
				reader_data.constant_map.emplace_back(i, Value(global_types[column_id]));
				continue;
			}
		}
	}
}

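// Maps the global (bound) columns onto the columns of the current file by name. Columns whose local
// type differs from the global type get an entry in the cast map; columns that are constant for this
// file (set up in FinalizeBind above) are skipped. A column that cannot be found at all is a schema
// mismatch error.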
void MultiFileReader::CreateNameMapping(const string &file_name, const vector<LogicalType> &local_types,
                                        const vector<string> &local_names, const vector<LogicalType> &global_types,
                                        const vector<string> &global_names, const vector<column_t> &global_column_ids,
                                        MultiFileReaderData &reader_data, const string &initial_file) {
	D_ASSERT(global_types.size() == global_names.size());
	D_ASSERT(local_types.size() == local_names.size());
	// we have expected types: create a map of name -> column index
	case_insensitive_map_t<idx_t> name_map;
	for (idx_t col_idx = 0; col_idx < local_names.size(); col_idx++) {
		name_map[local_names[col_idx]] = col_idx;
	}
	for (idx_t i = 0; i < global_column_ids.size(); i++) {
		// check if this is a constant column
		bool constant = false;
		for (auto &entry : reader_data.constant_map) {
			if (entry.column_id == i) {
				constant = true;
				break;
			}
		}
		if (constant) {
			// this column is constant for this file
			continue;
		}
		// not constant - look up the column in the name map
		auto global_id = global_column_ids[i];
		if (global_id >= global_types.size()) {
			throw InternalException(
			    "MultiFileReader::CreateNameMapping - global_id is out of range in global_types for this file");
		}
		auto &global_name = global_names[global_id];
		auto entry = name_map.find(global_name);
		if (entry == name_map.end()) {
			string candidate_names;
			for (auto &local_name : local_names) {
				if (!candidate_names.empty()) {
					candidate_names += ", ";
				}
				candidate_names += local_name;
			}
			throw IOException(
			    StringUtil::Format("Failed to read file \"%s\": schema mismatch in glob: column \"%s\" was read from "
			                       "the original file \"%s\", but could not be found in file \"%s\".\nCandidate names: "
			                       "%s\nIf you are trying to "
			                       "read files with different schemas, try setting union_by_name=True",
			                       file_name, global_name, initial_file, file_name, candidate_names));
		}
		// we found the column in the local file - check if the types are the same
		auto local_id = entry->second;
		D_ASSERT(global_id < global_types.size());
		D_ASSERT(local_id < local_types.size());
		auto &global_type = global_types[global_id];
		auto &local_type = local_types[local_id];
		if (global_type != local_type) {
			// the types differ - the local column must be cast to the global type
			reader_data.cast_map[local_id] = global_type;
		}
		// create the mapping from the global column to the local column
		reader_data.column_mapping.push_back(i);
		reader_data.column_ids.push_back(local_id);
	}
	reader_data.empty_columns = reader_data.column_ids.empty();
}

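// Builds the complete mapping for one file: the name-based column mapping above, plus a filter map
// that records, for every global column with a table filter, whether the value comes from a scanned
// column or from a per-file constant.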
void MultiFileReader::CreateMapping(const string &file_name, const vector<LogicalType> &local_types,
                                    const vector<string> &local_names, const vector<LogicalType> &global_types,
                                    const vector<string> &global_names, const vector<column_t> &global_column_ids,
                                    optional_ptr<TableFilterSet> filters, MultiFileReaderData &reader_data,
                                    const string &initial_file) {
	CreateNameMapping(file_name, local_types, local_names, global_types, global_names, global_column_ids, reader_data,
	                  initial_file);
	if (filters) {
		reader_data.filter_map.resize(global_types.size());
		for (idx_t c = 0; c < reader_data.column_mapping.size(); c++) {
			auto map_index = reader_data.column_mapping[c];
			reader_data.filter_map[map_index].index = c;
			reader_data.filter_map[map_index].is_constant = false;
		}
		for (idx_t c = 0; c < reader_data.constant_map.size(); c++) {
			auto constant_index = reader_data.constant_map[c].column_id;
			reader_data.filter_map[constant_index].index = c;
			reader_data.filter_map[constant_index].is_constant = true;
		}
	}
}

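// References the per-file constants (filename, hive partition values, NULL padding) into the output
// chunk after the scanner has filled in the regular columns.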
void MultiFileReader::FinalizeChunk(const MultiFileReaderBindData &bind_data, const MultiFileReaderData &reader_data,
                                    DataChunk &chunk) {
	// reference all the constants set up in MultiFileReader::FinalizeBind
	for (auto &entry : reader_data.constant_map) {
		chunk.data[entry.column_id].Reference(entry.value);
	}
	chunk.Verify();
}

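// Wraps a single-pattern table function into an overload set that also accepts a LIST of patterns, so
// the function can be invoked both as fn('file.ext') and as fn(['a.ext', 'b.ext']).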
TableFunctionSet MultiFileReader::CreateFunctionSet(TableFunction table_function) {
	TableFunctionSet function_set(table_function.name);
	function_set.AddFunction(table_function);
	D_ASSERT(table_function.arguments.size() == 1 && table_function.arguments[0] == LogicalType::VARCHAR);
	table_function.arguments[0] = LogicalType::LIST(LogicalType::VARCHAR);
	function_set.AddFunction(std::move(table_function));
	return function_set;
}

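// (De)serialization of the multi-file options and bind data. FieldWriter/FieldReader impose no schema
// of their own, so the fields must be read back in exactly the order they were written.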
void MultiFileReaderOptions::Serialize(Serializer &serializer) const {
	FieldWriter writer(serializer);
	writer.WriteField<bool>(filename);
	writer.WriteField<bool>(hive_partitioning);
	writer.WriteField<bool>(union_by_name);
	writer.Finalize();
}

MultiFileReaderOptions MultiFileReaderOptions::Deserialize(Deserializer &source) {
	MultiFileReaderOptions result;
	FieldReader reader(source);
	result.filename = reader.ReadRequired<bool>();
	result.hive_partitioning = reader.ReadRequired<bool>();
	result.union_by_name = reader.ReadRequired<bool>();
	reader.Finalize();
	return result;
}

void MultiFileReaderBindData::Serialize(Serializer &serializer) const {
	FieldWriter writer(serializer);
	writer.WriteField(filename_idx);
	writer.WriteRegularSerializableList<HivePartitioningIndex>(hive_partitioning_indexes);
	writer.Finalize();
}

MultiFileReaderBindData MultiFileReaderBindData::Deserialize(Deserializer &source) {
	MultiFileReaderBindData result;
	FieldReader reader(source);
	result.filename_idx = reader.ReadRequired<idx_t>();
	result.hive_partitioning_indexes =
	    reader.ReadRequiredSerializableList<HivePartitioningIndex, HivePartitioningIndex>();
	reader.Finalize();
	return result;
}

HivePartitioningIndex::HivePartitioningIndex(string value_p, idx_t index) : value(std::move(value_p)), index(index) {
}

void HivePartitioningIndex::Serialize(Serializer &serializer) const {
	FieldWriter writer(serializer);
	writer.WriteString(value);
	writer.WriteField<idx_t>(index);
	writer.Finalize();
}

HivePartitioningIndex HivePartitioningIndex::Deserialize(Deserializer &source) {
	FieldReader reader(source);
	auto value = reader.ReadRequired<string>();
	auto index = reader.ReadRequired<idx_t>();
	reader.Finalize();
	return HivePartitioningIndex(std::move(value), index);
}

void MultiFileReaderOptions::AddBatchInfo(BindInfo &bind_info) const {
	bind_info.InsertOption("filename", Value::BOOLEAN(filename));
	bind_info.InsertOption("hive_partitioning", Value::BOOLEAN(hive_partitioning));
	bind_info.InsertOption("union_by_name", Value::BOOLEAN(union_by_name));
}

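// Merges one file's schema into the running union schema built up across files. A column name that is
// already present keeps its slot and its type is widened to the max of the two types (e.g. INTEGER and
// BIGINT combine to BIGINT); a new name is appended as a fresh column.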
void UnionByName::CombineUnionTypes(const vector<string> &col_names, const vector<LogicalType> &sql_types,
                                    vector<LogicalType> &union_col_types, vector<string> &union_col_names,
                                    case_insensitive_map_t<idx_t> &union_names_map) {
	D_ASSERT(col_names.size() == sql_types.size());

	for (idx_t col = 0; col < col_names.size(); ++col) {
		auto union_find = union_names_map.find(col_names[col]);

		if (union_find != union_names_map.end()) {
			// the column already exists in the union: its type must be compatible with both inputs
			auto &current_type = union_col_types[union_find->second];
			union_col_types[union_find->second] = LogicalType::MaxLogicalType(current_type, sql_types[col]);
		} else {
			union_names_map[col_names[col]] = union_col_names.size();
			union_col_names.emplace_back(col_names[col]);
			union_col_types.emplace_back(sql_types[col]);
		}
	}
}

} // namespace duckdb