1#include "duckdb/common/multi_file_reader.hpp"
2#include "duckdb/function/table_function.hpp"
3#include "duckdb/main/config.hpp"
4#include "duckdb/common/types/value.hpp"
5#include "duckdb/planner/operator/logical_get.hpp"
6#include "duckdb/common/exception.hpp"
7#include "duckdb/function/function_set.hpp"
8#include "duckdb/common/hive_partitioning.hpp"
9
10namespace duckdb {
11
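// registers the named parameters shared by all multi-file scans
// (filename, hive_partitioning, union_by_name) on the given table function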
void MultiFileReader::AddParameters(TableFunction &table_function) {
	table_function.named_parameters["filename"] = LogicalType::BOOLEAN;
	table_function.named_parameters["hive_partitioning"] = LogicalType::BOOLEAN;
	table_function.named_parameters["union_by_name"] = LogicalType::BOOLEAN;
}

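// expands "input" - either a single VARCHAR glob pattern or a LIST of VARCHAR patterns -
// into a concrete list of file paths, honoring enable_external_access and the glob options.
// Illustrative call site (the pattern and reader name are examples, not taken from this file):
//   auto files = MultiFileReader::GetFileList(context, Value("data/*.parquet"), "Parquet",
//                                             FileGlobOptions::DISALLOW_EMPTY);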
vector<string> MultiFileReader::GetFileList(ClientContext &context, const Value &input, const string &name,
                                            FileGlobOptions options) {
	auto &config = DBConfig::GetConfig(context);
	if (!config.options.enable_external_access) {
		throw PermissionException("Scanning %s files is disabled through configuration", name);
	}
	if (input.IsNull()) {
		throw ParserException("%s reader cannot take NULL list as parameter", name);
	}
	FileSystem &fs = FileSystem::GetFileSystem(context);
	vector<string> files;
	if (input.type().id() == LogicalTypeId::VARCHAR) {
		auto file_name = StringValue::Get(input);
		files = fs.GlobFiles(file_name, context, options);
	} else if (input.type().id() == LogicalTypeId::LIST) {
		for (auto &val : ListValue::GetChildren(input)) {
			if (val.IsNull()) {
				throw ParserException("%s reader cannot take NULL input as parameter", name);
			}
			if (val.type().id() != LogicalTypeId::VARCHAR) {
				throw ParserException("%s reader can only take a list of strings as a parameter", name);
			}
			auto glob_files = fs.GlobFiles(StringValue::Get(val), context, options);
			files.insert(files.end(), glob_files.begin(), glob_files.end());
		}
	} else {
		throw InternalException("Unsupported type for MultiFileReader::GetFileList");
	}
	if (files.empty() && options == FileGlobOptions::DISALLOW_EMPTY) {
		throw IOException("%s reader needs at least one file to read", name);
	}
	return files;
}

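// parses a single multi-file option from a named parameter; returns true if the key
// was recognized and consumed, false so the caller can handle it itself.
// Explicitly setting hive_partitioning also disables its auto-detection.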
bool MultiFileReader::ParseOption(const string &key, const Value &val, MultiFileReaderOptions &options) {
	auto loption = StringUtil::Lower(key);
	if (loption == "filename") {
		options.filename = BooleanValue::Get(val);
	} else if (loption == "hive_partitioning") {
		options.hive_partitioning = BooleanValue::Get(val);
		options.auto_detect_hive_partitioning = false;
	} else if (loption == "union_by_name") {
		options.union_by_name = BooleanValue::Get(val);
	} else {
		return false;
	}
	return true;
}

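// prunes the file list by pushing filters on the filename and/or hive partition columns
// down to the file level; returns true if any files were eliminated. For example, a
// filter like year=2024 can skip a path such as "data/year=2023/f.parquet" entirely
// (the path is illustrative).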
bool MultiFileReader::ComplexFilterPushdown(ClientContext &context, vector<string> &files,
                                            const MultiFileReaderOptions &options, LogicalGet &get,
                                            vector<unique_ptr<Expression>> &filters) {
	if (files.empty()) {
		return false;
	}
	if (!options.hive_partitioning && !options.filename) {
		return false;
	}

	unordered_map<string, column_t> column_map;
	for (idx_t i = 0; i < get.column_ids.size(); i++) {
		column_map.insert({get.names[get.column_ids[i]], i});
	}

	auto start_files = files.size();
	HivePartitioning::ApplyFiltersToFileList(context, files, filters, column_map, get.table_index,
	                                         options.hive_partitioning, options.filename);
	// report whether we have pruned any files
	return files.size() != start_files;
}

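// resolves the multi-file options against the bound schema: appends the generated
// "filename" column if requested, and verifies that every file in the glob shares the
// partitioning scheme of the first file before registering one VARCHAR column per
// partition key. For a path like "data/year=2024/month=6/f.parquet" (illustrative),
// the keys "year" and "month" each become a column, overriding same-named file columns.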
MultiFileReaderBindData MultiFileReader::BindOptions(MultiFileReaderOptions &options, const vector<string> &files,
                                                     vector<LogicalType> &return_types, vector<string> &names) {
	MultiFileReaderBindData bind_data;
	// add the generated constant column for the filename
	if (options.filename) {
		if (std::find(names.begin(), names.end(), "filename") != names.end()) {
			throw BinderException("Using filename option on file with column named filename is not supported");
		}
		bind_data.filename_idx = names.size();
		return_types.emplace_back(LogicalType::VARCHAR);
		names.emplace_back("filename");
	}

	// add generated constant columns from the hive partitioning scheme
	if (options.hive_partitioning) {
		D_ASSERT(!files.empty());
		auto partitions = HivePartitioning::Parse(files[0]);
		// verify that all files have the same hive partitioning scheme
		for (auto &f : files) {
			auto file_partitions = HivePartitioning::Parse(f);
			for (auto &part_info : partitions) {
				if (file_partitions.find(part_info.first) == file_partitions.end()) {
					if (options.auto_detect_hive_partitioning) {
						throw BinderException(
						    "Hive partitioning was enabled automatically, but an error was encountered: Hive partition "
						    "mismatch between file \"%s\" and \"%s\": key \"%s\" not found\n\nTo switch off hive "
						    "partition, set: HIVE_PARTITIONING=0",
						    files[0], f, part_info.first);
					}
					throw BinderException(
					    "Hive partition mismatch between file \"%s\" and \"%s\": key \"%s\" not found", files[0], f,
					    part_info.first);
				}
			}
			if (partitions.size() != file_partitions.size()) {
				if (options.auto_detect_hive_partitioning) {
					throw BinderException("Hive partitioning was enabled automatically, but an error was encountered: "
					                      "Hive partition mismatch between file \"%s\" and \"%s\"\n\nTo switch off "
					                      "hive partition, set: HIVE_PARTITIONING=0",
					                      files[0], f);
				}
				throw BinderException("Hive partition mismatch between file \"%s\" and \"%s\"", files[0], f);
			}
		}
		for (auto &part : partitions) {
			idx_t hive_partitioning_index = DConstants::INVALID_INDEX;
			auto lookup = std::find(names.begin(), names.end(), part.first);
			if (lookup != names.end()) {
				// the hive partitioning column also exists in the file - override it
				auto idx = lookup - names.begin();
				hive_partitioning_index = idx;
				return_types[idx] = LogicalType::VARCHAR;
			} else {
				// the hive partitioning column does not exist in the file - add a new column containing the key
				hive_partitioning_index = names.size();
				return_types.emplace_back(LogicalType::VARCHAR);
				names.emplace_back(part.first);
			}
			bind_data.hive_partitioning_indexes.emplace_back(part.first, hive_partitioning_index);
		}
	}
	return bind_data;
}

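// fills reader_data.constant_map with the per-file constant columns for one file:
// the row-id placeholder, the filename, any hive partition values parsed from the path,
// and (under union_by_name) NULLs for global columns missing from this file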
void MultiFileReader::FinalizeBind(const MultiFileReaderOptions &file_options, const MultiFileReaderBindData &options,
                                   const string &filename, const vector<string> &local_names,
                                   const vector<LogicalType> &global_types, const vector<string> &global_names,
                                   const vector<column_t> &global_column_ids, MultiFileReaderData &reader_data) {
	// create a map of name -> column index
	case_insensitive_map_t<idx_t> name_map;
	if (file_options.union_by_name) {
		for (idx_t col_idx = 0; col_idx < local_names.size(); col_idx++) {
			name_map[local_names[col_idx]] = col_idx;
		}
	}
	for (idx_t i = 0; i < global_column_ids.size(); i++) {
		auto column_id = global_column_ids[i];
		if (IsRowIdColumnId(column_id)) {
			// row-id column: emitted as a constant (the value itself is a placeholder)
			reader_data.constant_map.emplace_back(i, Value::BIGINT(42));
			continue;
		}
		if (column_id == options.filename_idx) {
			// filename
			reader_data.constant_map.emplace_back(i, Value(filename));
			continue;
		}
		if (!options.hive_partitioning_indexes.empty()) {
			// hive partition constants
			auto partitions = HivePartitioning::Parse(filename);
			D_ASSERT(partitions.size() == options.hive_partitioning_indexes.size());
			bool found_partition = false;
			for (auto &entry : options.hive_partitioning_indexes) {
				if (column_id == entry.index) {
					reader_data.constant_map.emplace_back(i, Value(partitions[entry.value]));
					found_partition = true;
					break;
				}
			}
			if (found_partition) {
				continue;
			}
		}
		if (file_options.union_by_name) {
			auto &global_name = global_names[column_id];
			auto entry = name_map.find(global_name);
			bool not_present_in_file = entry == name_map.end();
			if (not_present_in_file) {
				// we need to project a column with name "global_name" - but it does not exist in the current file
				// push a NULL value of the specified type
				reader_data.constant_map.emplace_back(i, Value(global_types[column_id]));
				continue;
			}
		}
	}
}

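// matches the global (bound) columns against the columns actually present in one file
// by case-insensitive name. Columns already covered by constant_map are skipped; a name
// that cannot be found raises a schema-mismatch error that suggests union_by_name; a
// name found with a different type is scheduled for a cast via cast_map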
void MultiFileReader::CreateNameMapping(const string &file_name, const vector<LogicalType> &local_types,
                                        const vector<string> &local_names, const vector<LogicalType> &global_types,
                                        const vector<string> &global_names, const vector<column_t> &global_column_ids,
                                        MultiFileReaderData &reader_data, const string &initial_file) {
	D_ASSERT(global_types.size() == global_names.size());
	D_ASSERT(local_types.size() == local_names.size());
	// we have expected types: create a map of name -> column index
	case_insensitive_map_t<idx_t> name_map;
	for (idx_t col_idx = 0; col_idx < local_names.size(); col_idx++) {
		name_map[local_names[col_idx]] = col_idx;
	}
	for (idx_t i = 0; i < global_column_ids.size(); i++) {
		// check if this is a constant column
		bool constant = false;
		for (auto &entry : reader_data.constant_map) {
			if (entry.column_id == i) {
				constant = true;
				break;
			}
		}
		if (constant) {
			// this column is constant for this file
			continue;
		}
		// not constant - look up the column in the name map
		auto global_id = global_column_ids[i];
		if (global_id >= global_types.size()) {
			throw InternalException(
			    "MultiFileReader::CreateNameMapping - global_id is out of range in global_types for this file");
		}
		auto &global_name = global_names[global_id];
		auto entry = name_map.find(global_name);
		if (entry == name_map.end()) {
			string candidate_names;
			for (auto &local_name : local_names) {
				if (!candidate_names.empty()) {
					candidate_names += ", ";
				}
				candidate_names += local_name;
			}
			throw IOException(
			    StringUtil::Format("Failed to read file \"%s\": schema mismatch in glob: column \"%s\" was read from "
			                       "the original file \"%s\", but could not be found in file \"%s\".\nCandidate names: "
			                       "%s\nIf you are trying to "
			                       "read files with different schemas, try setting union_by_name=True",
			                       file_name, global_name, initial_file, file_name, candidate_names));
		}
		// we found the column in the local file - check if the types are the same
		auto local_id = entry->second;
		D_ASSERT(global_id < global_types.size());
		D_ASSERT(local_id < local_types.size());
		auto &global_type = global_types[global_id];
		auto &local_type = local_types[local_id];
		if (global_type != local_type) {
			// the types differ - register a cast from the local type to the global type
			reader_data.cast_map[local_id] = global_type;
		}
		// create the mapping from the global column to the local column
		reader_data.column_mapping.push_back(i);
		reader_data.column_ids.push_back(local_id);
	}
	reader_data.empty_columns = reader_data.column_ids.empty();
}

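// builds the full column mapping for a file and, if table filters are present, a
// filter_map that records for every global column whether it is served by a scanned
// column (index into column_mapping) or by a constant (index into constant_map)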
void MultiFileReader::CreateMapping(const string &file_name, const vector<LogicalType> &local_types,
                                    const vector<string> &local_names, const vector<LogicalType> &global_types,
                                    const vector<string> &global_names, const vector<column_t> &global_column_ids,
                                    optional_ptr<TableFilterSet> filters, MultiFileReaderData &reader_data,
                                    const string &initial_file) {
	CreateNameMapping(file_name, local_types, local_names, global_types, global_names, global_column_ids, reader_data,
	                  initial_file);
	if (filters) {
		reader_data.filter_map.resize(global_types.size());
		for (idx_t c = 0; c < reader_data.column_mapping.size(); c++) {
			auto map_index = reader_data.column_mapping[c];
			reader_data.filter_map[map_index].index = c;
			reader_data.filter_map[map_index].is_constant = false;
		}
		for (idx_t c = 0; c < reader_data.constant_map.size(); c++) {
			auto constant_index = reader_data.constant_map[c].column_id;
			reader_data.filter_map[constant_index].index = c;
			reader_data.filter_map[constant_index].is_constant = true;
		}
	}
}

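// applies the per-file constants to an output chunk by referencing each constant
// Value in its target vector; called after the scan has filled the remaining columns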
void MultiFileReader::FinalizeChunk(const MultiFileReaderBindData &bind_data, const MultiFileReaderData &reader_data,
                                    DataChunk &chunk) {
	// reference all the constants set up in MultiFileReader::FinalizeBind
	for (auto &entry : reader_data.constant_map) {
		chunk.data[entry.column_id].Reference(entry.value);
	}
	chunk.Verify();
}

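// wraps a single-argument (VARCHAR) table function into a set with a second overload
// that accepts LIST(VARCHAR), so both forms work, e.g. read_xyz('one.file') and
// read_xyz(['a.file', 'b.file']) - read_xyz is an illustrative name, not a real function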
TableFunctionSet MultiFileReader::CreateFunctionSet(TableFunction table_function) {
	TableFunctionSet function_set(table_function.name);
	function_set.AddFunction(table_function);
	D_ASSERT(table_function.arguments.size() == 1 && table_function.arguments[0] == LogicalType::VARCHAR);
	table_function.arguments[0] = LogicalType::LIST(LogicalType::VARCHAR);
	function_set.AddFunction(std::move(table_function));
	return function_set;
}

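// (de)serialization of the multi-file options and bind data: fields are written and
// read in the same fixed order through FieldWriter/FieldReader, so each Serialize
// below must be kept in sync with its corresponding Deserialize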
void MultiFileReaderOptions::Serialize(Serializer &serializer) const {
	FieldWriter writer(serializer);
	writer.WriteField<bool>(filename);
	writer.WriteField<bool>(hive_partitioning);
	writer.WriteField<bool>(union_by_name);
	writer.Finalize();
}

MultiFileReaderOptions MultiFileReaderOptions::Deserialize(Deserializer &source) {
	MultiFileReaderOptions result;
	FieldReader reader(source);
	result.filename = reader.ReadRequired<bool>();
	result.hive_partitioning = reader.ReadRequired<bool>();
	result.union_by_name = reader.ReadRequired<bool>();
	reader.Finalize();
	return result;
}

void MultiFileReaderBindData::Serialize(Serializer &serializer) const {
	FieldWriter writer(serializer);
	writer.WriteField(filename_idx);
	writer.WriteRegularSerializableList<HivePartitioningIndex>(hive_partitioning_indexes);
	writer.Finalize();
}

MultiFileReaderBindData MultiFileReaderBindData::Deserialize(Deserializer &source) {
	MultiFileReaderBindData result;
	FieldReader reader(source);
	result.filename_idx = reader.ReadRequired<idx_t>();
	result.hive_partitioning_indexes =
	    reader.ReadRequiredSerializableList<HivePartitioningIndex, HivePartitioningIndex>();
	reader.Finalize();
	return result;
}

HivePartitioningIndex::HivePartitioningIndex(string value_p, idx_t index) : value(std::move(value_p)), index(index) {
}

void HivePartitioningIndex::Serialize(Serializer &serializer) const {
	FieldWriter writer(serializer);
	writer.WriteString(value);
	writer.WriteField<idx_t>(index);
	writer.Finalize();
}

HivePartitioningIndex HivePartitioningIndex::Deserialize(Deserializer &source) {
	FieldReader reader(source);
	auto value = reader.ReadRequired<string>();
	auto index = reader.ReadRequired<idx_t>();
	reader.Finalize();
	return HivePartitioningIndex(std::move(value), index);
}

void MultiFileReaderOptions::AddBatchInfo(BindInfo &bind_info) const {
	bind_info.InsertOption("filename", Value::BOOLEAN(filename));
	bind_info.InsertOption("hive_partitioning", Value::BOOLEAN(hive_partitioning));
	bind_info.InsertOption("union_by_name", Value::BOOLEAN(union_by_name));
}

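// folds one file's schema into the running union schema for union_by_name: a column
// name seen before widens the existing entry via LogicalType::MaxLogicalType (e.g.
// INTEGER combined with DOUBLE widens to DOUBLE), a new name is appended to the union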
void UnionByName::CombineUnionTypes(const vector<string> &col_names, const vector<LogicalType> &sql_types,
                                    vector<LogicalType> &union_col_types, vector<string> &union_col_names,
                                    case_insensitive_map_t<idx_t> &union_names_map) {
	D_ASSERT(col_names.size() == sql_types.size());

	for (idx_t col = 0; col < col_names.size(); ++col) {
		auto union_find = union_names_map.find(col_names[col]);

		if (union_find != union_names_map.end()) {
			// given the same name, the union column's type must be compatible with this column's type
			auto &current_type = union_col_types[union_find->second];
			auto compatible_type = LogicalType::MaxLogicalType(current_type, sql_types[col]);
			union_col_types[union_find->second] = compatible_type;
		} else {
			union_names_map[col_names[col]] = union_col_names.size();
			union_col_names.emplace_back(col_names[col]);
			union_col_types.emplace_back(sql_types[col]);
		}
	}
}

} // namespace duckdb