#include "duckdb/common/multi_file_reader.hpp"
#include "duckdb/function/table_function.hpp"
#include "duckdb/main/config.hpp"
#include "duckdb/common/types/value.hpp"
#include "duckdb/planner/operator/logical_get.hpp"
#include "duckdb/common/exception.hpp"
#include "duckdb/function/function_set.hpp"
#include "duckdb/common/hive_partitioning.hpp"

namespace duckdb {

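// AddParameters registers the named parameters that every multi-file scan supports. As an illustrative usage sketch
// (read_parquet is just one example of a table function that calls this), a caller can then write:
//   SELECT * FROM read_parquet('data/*.parquet', filename = true, hive_partitioning = true);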
void MultiFileReader::AddParameters(TableFunction &table_function) {
	table_function.named_parameters["filename"] = LogicalType::BOOLEAN;
	table_function.named_parameters["hive_partitioning"] = LogicalType::BOOLEAN;
	table_function.named_parameters["union_by_name"] = LogicalType::BOOLEAN;
}

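// GetFileList expands the "files" argument of a multi-file scan into a concrete list of paths. The input is either a
// single VARCHAR glob (e.g. 'data/*.csv') or a LIST of VARCHAR globs (e.g. ['a.csv', 'dir/*.csv']); the patterns here
// are illustrative only. External file access must be enabled, NULL inputs are rejected, and an empty expansion is an
// error when the caller passes FileGlobOptions::DISALLOW_EMPTY.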
vector<string> MultiFileReader::GetFileList(ClientContext &context, const Value &input, const string &name,
                                            FileGlobOptions options) {
	auto &config = DBConfig::GetConfig(context);
	if (!config.options.enable_external_access) {
		throw PermissionException("Scanning %s files is disabled through configuration", name);
	}
	if (input.IsNull()) {
		throw ParserException("%s reader cannot take NULL list as parameter", name);
	}
	FileSystem &fs = FileSystem::GetFileSystem(context);
	vector<string> files;
	if (input.type().id() == LogicalTypeId::VARCHAR) {
		auto file_name = StringValue::Get(input);
		files = fs.GlobFiles(file_name, context, options);
	} else if (input.type().id() == LogicalTypeId::LIST) {
		for (auto &val : ListValue::GetChildren(input)) {
			if (val.IsNull()) {
				throw ParserException("%s reader cannot take NULL input as parameter", name);
			}
			if (val.type().id() != LogicalTypeId::VARCHAR) {
				throw ParserException("%s reader can only take a list of strings as a parameter", name);
			}
			auto glob_files = fs.GlobFiles(StringValue::Get(val), context, options);
			files.insert(files.end(), glob_files.begin(), glob_files.end());
		}
	} else {
		throw InternalException("Unsupported type for MultiFileReader::GetFileList");
	}
	if (files.empty() && options == FileGlobOptions::DISALLOW_EMPTY) {
		throw IOException("%s reader needs at least one file to read", name);
	}
	return files;
}

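// ParseOption consumes one of the named parameters registered in AddParameters, returning false for keys the caller
// must handle itself. Note that explicitly setting hive_partitioning also disables auto-detection, so an explicit
// user choice is never overridden by the detector.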
bool MultiFileReader::ParseOption(const string &key, const Value &val, MultiFileReaderOptions &options) {
	auto loption = StringUtil::Lower(key);
	if (loption == "filename") {
		options.filename = BooleanValue::Get(val);
	} else if (loption == "hive_partitioning") {
		options.hive_partitioning = BooleanValue::Get(val);
		options.auto_detect_hive_partitioning = false;
	} else if (loption == "union_by_name") {
		options.union_by_name = BooleanValue::Get(val);
	} else {
		return false;
	}
	return true;
}

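// ComplexFilterPushdown prunes the file list using filters that touch the generated filename/hive-partition columns,
// before any file is opened. A hedged example: with hive_partitioning enabled and a filter such as WHERE year = 2023,
// a path like data/year=2022/part-0.parquet can be dropped from the list entirely (the matching logic itself lives in
// HivePartitioning::ApplyFiltersToFileList). Returns true if any file was pruned.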
bool MultiFileReader::ComplexFilterPushdown(ClientContext &context, vector<string> &files,
                                            const MultiFileReaderOptions &options, LogicalGet &get,
                                            vector<unique_ptr<Expression>> &filters) {
	if (files.empty()) {
		return false;
	}
	if (!options.hive_partitioning && !options.filename) {
		return false;
	}

	unordered_map<string, column_t> column_map;
	for (idx_t i = 0; i < get.column_ids.size(); i++) {
		column_map.insert({get.names[get.column_ids[i]], i});
	}

	auto start_files = files.size();
	HivePartitioning::ApplyFiltersToFileList(context, files, filters, column_map, get.table_index,
	                                         options.hive_partitioning, options.filename);
	if (files.size() != start_files) {
		// we have pruned files
		return true;
	}
	return false;
}

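// BindOptions appends the generated columns to the scan's schema at bind time: a VARCHAR "filename" column when
// filename=true, and one VARCHAR column per hive partition key. For an illustrative path such as
// data/year=2023/month=1/f.parquet with hive_partitioning=true, this adds "year" and "month" columns; if a column of
// the same name already exists in the file, that column is overridden with VARCHAR instead of being duplicated.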
MultiFileReaderBindData MultiFileReader::BindOptions(MultiFileReaderOptions &options, const vector<string> &files,
                                                     vector<LogicalType> &return_types, vector<string> &names) {
	MultiFileReaderBindData bind_data;
	// add a generated constant column for the filename
	if (options.filename) {
		if (std::find(names.begin(), names.end(), "filename") != names.end()) {
			throw BinderException("Using filename option on file with column named filename is not supported");
		}
		bind_data.filename_idx = names.size();
		return_types.emplace_back(LogicalType::VARCHAR);
		names.emplace_back("filename");
	}

	// add generated constant columns from the hive partitioning scheme
	if (options.hive_partitioning) {
		D_ASSERT(!files.empty());
		auto partitions = HivePartitioning::Parse(files[0]);
		// verify that all files have the same hive partitioning scheme
		for (auto &f : files) {
			auto file_partitions = HivePartitioning::Parse(f);
			for (auto &part_info : partitions) {
				if (file_partitions.find(part_info.first) == file_partitions.end()) {
					if (options.auto_detect_hive_partitioning) {
						throw BinderException(
						    "Hive partitioning was enabled automatically, but an error was encountered: Hive partition "
						    "mismatch between file \"%s\" and \"%s\": key \"%s\" not found\n\nTo switch off hive "
						    "partition, set: HIVE_PARTITIONING=0",
						    files[0], f, part_info.first);
					}
					throw BinderException(
					    "Hive partition mismatch between file \"%s\" and \"%s\": key \"%s\" not found", files[0], f,
					    part_info.first);
				}
			}
			if (partitions.size() != file_partitions.size()) {
				if (options.auto_detect_hive_partitioning) {
					throw BinderException("Hive partitioning was enabled automatically, but an error was encountered: "
					                      "Hive partition mismatch between file \"%s\" and \"%s\"\n\nTo switch off "
					                      "hive partition, set: HIVE_PARTITIONING=0",
					                      files[0], f);
				}
				throw BinderException("Hive partition mismatch between file \"%s\" and \"%s\"", files[0], f);
			}
		}
		for (auto &part : partitions) {
			idx_t hive_partitioning_index = DConstants::INVALID_INDEX;
			auto lookup = std::find(names.begin(), names.end(), part.first);
			if (lookup != names.end()) {
				// the hive partitioning column also exists in the file - override it
				auto idx = lookup - names.begin();
				hive_partitioning_index = idx;
				return_types[idx] = LogicalType::VARCHAR;
			} else {
				// the hive partitioning column does not exist in the file - add a new column containing the key
				hive_partitioning_index = names.size();
				return_types.emplace_back(LogicalType::VARCHAR);
				names.emplace_back(part.first);
			}
			bind_data.hive_partitioning_indexes.emplace_back(part.first, hive_partitioning_index);
		}
	}
	return bind_data;
}

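// FinalizeBind computes the per-file constant columns for one file of the scan: the filename, the hive partition
// values parsed from its path, and (under union_by_name) NULLs for global columns the file lacks. For the
// illustrative path data/year=2023/f.parquet, the "year" column becomes the constant '2023' for every row this file
// produces.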
void MultiFileReader::FinalizeBind(const MultiFileReaderOptions &file_options, const MultiFileReaderBindData &options,
                                   const string &filename, const vector<string> &local_names,
                                   const vector<LogicalType> &global_types, const vector<string> &global_names,
                                   const vector<column_t> &global_column_ids, MultiFileReaderData &reader_data) {
	// create a map of name -> column index
	case_insensitive_map_t<idx_t> name_map;
	if (file_options.union_by_name) {
		for (idx_t col_idx = 0; col_idx < local_names.size(); col_idx++) {
			name_map[local_names[col_idx]] = col_idx;
		}
	}
	for (idx_t i = 0; i < global_column_ids.size(); i++) {
		auto column_id = global_column_ids[i];
		if (IsRowIdColumnId(column_id)) {
			// row-id column: push a constant placeholder value
			reader_data.constant_map.emplace_back(i, Value::BIGINT(42));
			continue;
		}
		if (column_id == options.filename_idx) {
			// filename column: push the name of the current file as a constant
			reader_data.constant_map.emplace_back(i, Value(filename));
			continue;
		}
		if (!options.hive_partitioning_indexes.empty()) {
			// hive partition constants
			auto partitions = HivePartitioning::Parse(filename);
			D_ASSERT(partitions.size() == options.hive_partitioning_indexes.size());
			bool found_partition = false;
			for (auto &entry : options.hive_partitioning_indexes) {
				if (column_id == entry.index) {
					reader_data.constant_map.emplace_back(i, Value(partitions[entry.value]));
					found_partition = true;
					break;
				}
			}
			if (found_partition) {
				continue;
			}
		}
		if (file_options.union_by_name) {
			auto &global_name = global_names[column_id];
			auto entry = name_map.find(global_name);
			bool not_present_in_file = entry == name_map.end();
			if (not_present_in_file) {
				// we need to project a column with the name "global_name" - but it does not exist in the current file
				// push a NULL value of the specified type instead
				reader_data.constant_map.emplace_back(i, Value(global_types[column_id]));
				continue;
			}
		}
	}
}

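// CreateNameMapping matches the file's local columns to the scan's global columns by (case-insensitive) name.
// Columns already handled as per-file constants are skipped, a missing column is a schema-mismatch error, and a
// column whose local type differs from the global type gets an entry in cast_map so the reader casts it after
// scanning.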
void MultiFileReader::CreateNameMapping(const string &file_name, const vector<LogicalType> &local_types,
                                        const vector<string> &local_names, const vector<LogicalType> &global_types,
                                        const vector<string> &global_names, const vector<column_t> &global_column_ids,
                                        MultiFileReaderData &reader_data, const string &initial_file) {
	D_ASSERT(global_types.size() == global_names.size());
	D_ASSERT(local_types.size() == local_names.size());
	// we have expected types: create a map of name -> column index
	case_insensitive_map_t<idx_t> name_map;
	for (idx_t col_idx = 0; col_idx < local_names.size(); col_idx++) {
		name_map[local_names[col_idx]] = col_idx;
	}
	for (idx_t i = 0; i < global_column_ids.size(); i++) {
		// check if this is a constant column
		bool constant = false;
		for (auto &entry : reader_data.constant_map) {
			if (entry.column_id == i) {
				constant = true;
				break;
			}
		}
		if (constant) {
			// this column is constant for this file
			continue;
		}
		// not constant - look up the column in the name map
		auto global_id = global_column_ids[i];
		if (global_id >= global_types.size()) {
			throw InternalException(
			    "MultiFileReader::CreateNameMapping - global_id is out of range in global_types for this file");
		}
		auto &global_name = global_names[global_id];
		auto entry = name_map.find(global_name);
		if (entry == name_map.end()) {
			string candidate_names;
			for (auto &local_name : local_names) {
				if (!candidate_names.empty()) {
					candidate_names += ", ";
				}
				candidate_names += local_name;
			}
			throw IOException(
			    StringUtil::Format("Failed to read file \"%s\": schema mismatch in glob: column \"%s\" was read from "
			                       "the original file \"%s\", but could not be found in file \"%s\".\nCandidate names: "
			                       "%s\nIf you are trying to "
			                       "read files with different schemas, try setting union_by_name=True",
			                       file_name, global_name, initial_file, file_name, candidate_names));
		}
		// we found the column in the local file - if the types do not match, register a cast
		auto local_id = entry->second;
		D_ASSERT(global_id < global_types.size());
		D_ASSERT(local_id < local_types.size());
		auto &global_type = global_types[global_id];
		auto &local_type = local_types[local_id];
		if (global_type != local_type) {
			reader_data.cast_map[local_id] = global_type;
		}
		// create the mapping from the global column index to the local column index
		reader_data.column_mapping.push_back(i);
		reader_data.column_ids.push_back(local_id);
	}
	reader_data.empty_columns = reader_data.column_ids.empty();
}

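// CreateMapping runs the name mapping above and then, if the scan has table filters, builds filter_map: for each
// global column it records whether the value comes from a scanned column or a per-file constant, plus the index into
// the corresponding map, so filters can be evaluated against the right source.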
void MultiFileReader::CreateMapping(const string &file_name, const vector<LogicalType> &local_types,
                                    const vector<string> &local_names, const vector<LogicalType> &global_types,
                                    const vector<string> &global_names, const vector<column_t> &global_column_ids,
                                    optional_ptr<TableFilterSet> filters, MultiFileReaderData &reader_data,
                                    const string &initial_file) {
	CreateNameMapping(file_name, local_types, local_names, global_types, global_names, global_column_ids, reader_data,
	                  initial_file);
	if (filters) {
		reader_data.filter_map.resize(global_types.size());
		for (idx_t c = 0; c < reader_data.column_mapping.size(); c++) {
			auto map_index = reader_data.column_mapping[c];
			reader_data.filter_map[map_index].index = c;
			reader_data.filter_map[map_index].is_constant = false;
		}
		for (idx_t c = 0; c < reader_data.constant_map.size(); c++) {
			auto constant_index = reader_data.constant_map[c].column_id;
			reader_data.filter_map[constant_index].index = c;
			reader_data.filter_map[constant_index].is_constant = true;
		}
	}
}

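// FinalizeChunk fills in the constant columns of a scanned chunk. Referencing the constant Value turns the output
// vector into a constant vector, so no per-row copying is needed.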
void MultiFileReader::FinalizeChunk(const MultiFileReaderBindData &bind_data, const MultiFileReaderData &reader_data,
                                    DataChunk &chunk) {
	// reference all the constants set up in MultiFileReader::FinalizeBind
	for (auto &entry : reader_data.constant_map) {
		chunk.data[entry.column_id].Reference(entry.value);
	}
	chunk.Verify();
}

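// CreateFunctionSet wraps a single-file table function into an overload set that also accepts a list of files: the
// original function must take exactly one VARCHAR argument, and the added overload takes VARCHAR[]. A sketch of the
// resulting call sites (the function name read_foo is hypothetical):
//   SELECT * FROM read_foo('one_file.foo');
//   SELECT * FROM read_foo(['a.foo', 'b.foo']);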
TableFunctionSet MultiFileReader::CreateFunctionSet(TableFunction table_function) {
	TableFunctionSet function_set(table_function.name);
	function_set.AddFunction(table_function);
	D_ASSERT(table_function.arguments.size() == 1 && table_function.arguments[0] == LogicalType::VARCHAR);
	table_function.arguments[0] = LogicalType::LIST(LogicalType::VARCHAR);
	function_set.AddFunction(std::move(table_function));
	return function_set;
}

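// The Serialize/Deserialize pairs below must read fields in exactly the order they were written, since FieldWriter
// and FieldReader are positional; adding a field on one side without the other would mis-read plans serialized with
// the old layout.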
void MultiFileReaderOptions::Serialize(Serializer &serializer) const {
	FieldWriter writer(serializer);
	writer.WriteField<bool>(filename);
	writer.WriteField<bool>(hive_partitioning);
	writer.WriteField<bool>(union_by_name);
	writer.Finalize();
}

MultiFileReaderOptions MultiFileReaderOptions::Deserialize(Deserializer &source) {
	MultiFileReaderOptions result;
	FieldReader reader(source);
	result.filename = reader.ReadRequired<bool>();
	result.hive_partitioning = reader.ReadRequired<bool>();
	result.union_by_name = reader.ReadRequired<bool>();
	reader.Finalize();
	return result;
}

void MultiFileReaderBindData::Serialize(Serializer &serializer) const {
	FieldWriter writer(serializer);
	writer.WriteField(filename_idx);
	writer.WriteRegularSerializableList<HivePartitioningIndex>(hive_partitioning_indexes);
	writer.Finalize();
}

MultiFileReaderBindData MultiFileReaderBindData::Deserialize(Deserializer &source) {
	MultiFileReaderBindData result;
	FieldReader reader(source);
	result.filename_idx = reader.ReadRequired<idx_t>();
	result.hive_partitioning_indexes =
	    reader.ReadRequiredSerializableList<HivePartitioningIndex, HivePartitioningIndex>();
	reader.Finalize();
	return result;
}

HivePartitioningIndex::HivePartitioningIndex(string value_p, idx_t index) : value(std::move(value_p)), index(index) {
}

void HivePartitioningIndex::Serialize(Serializer &serializer) const {
	FieldWriter writer(serializer);
	writer.WriteString(value);
	writer.WriteField<idx_t>(index);
	writer.Finalize();
}

HivePartitioningIndex HivePartitioningIndex::Deserialize(Deserializer &source) {
	FieldReader reader(source);
	auto value = reader.ReadRequired<string>();
	auto index = reader.ReadRequired<idx_t>();
	reader.Finalize();
	return HivePartitioningIndex(std::move(value), index);
}

void MultiFileReaderOptions::AddBatchInfo(BindInfo &bind_info) const {
	bind_info.InsertOption("filename", Value::BOOLEAN(filename));
	bind_info.InsertOption("hive_partitioning", Value::BOOLEAN(hive_partitioning));
	bind_info.InsertOption("union_by_name", Value::BOOLEAN(union_by_name));
}

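// CombineUnionTypes merges one file's schema into the running union schema used by union_by_name. New names are
// appended; for an existing name the union type is widened via LogicalType::MaxLogicalType, so that, for example, a
// column read as INTEGER in one file and VARCHAR in another ends up as VARCHAR in the union schema (the exact
// promotion rules are those of MaxLogicalType).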
void UnionByName::CombineUnionTypes(const vector<string> &col_names, const vector<LogicalType> &sql_types,
                                    vector<LogicalType> &union_col_types, vector<string> &union_col_names,
                                    case_insensitive_map_t<idx_t> &union_names_map) {
	D_ASSERT(col_names.size() == sql_types.size());

	for (idx_t col = 0; col < col_names.size(); ++col) {
		auto union_find = union_names_map.find(col_names[col]);

		if (union_find != union_names_map.end()) {
			// a column with the same name already exists in the union: widen the union column's type so that it is
			// compatible with this column's type
			auto &current_type = union_col_types[union_find->second];
			auto compatible_type = LogicalType::MaxLogicalType(current_type, sql_types[col]);
			union_col_types[union_find->second] = compatible_type;
		} else {
			union_names_map[col_names[col]] = union_col_names.size();
			union_col_names.emplace_back(col_names[col]);
			union_col_types.emplace_back(sql_types[col]);
		}
	}
}

} // namespace duckdb