| 1 | #include "duckdb/catalog/catalog.hpp" |
| 2 | #include "duckdb/catalog/catalog_entry/copy_function_catalog_entry.hpp" |
| 3 | #include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp" |
| 4 | #include "duckdb/catalog/catalog_entry/table_function_catalog_entry.hpp" |
| 5 | #include "duckdb/common/bind_helpers.hpp" |
| 6 | #include "duckdb/common/filename_pattern.hpp" |
| 7 | #include "duckdb/common/local_file_system.hpp" |
| 8 | #include "duckdb/execution/operator/persistent/parallel_csv_reader.hpp" |
| 9 | #include "duckdb/function/table/read_csv.hpp" |
| 10 | #include "duckdb/main/client_context.hpp" |
| 11 | #include "duckdb/main/database.hpp" |
| 12 | #include "duckdb/parser/expression/columnref_expression.hpp" |
| 13 | #include "duckdb/parser/expression/star_expression.hpp" |
| 14 | #include "duckdb/parser/query_node/select_node.hpp" |
| 15 | #include "duckdb/parser/statement/copy_statement.hpp" |
| 16 | #include "duckdb/parser/statement/insert_statement.hpp" |
| 17 | #include "duckdb/parser/tableref/basetableref.hpp" |
| 18 | #include "duckdb/planner/binder.hpp" |
| 19 | #include "duckdb/planner/operator/logical_copy_to_file.hpp" |
| 20 | #include "duckdb/planner/operator/logical_get.hpp" |
| 21 | #include "duckdb/planner/operator/logical_insert.hpp" |
| 22 | |
| 23 | #include <algorithm> |
| 24 | |
| 25 | namespace duckdb { |
| 26 | |
| 27 | vector<string> GetUniqueNames(const vector<string> &original_names) { |
| 28 | unordered_set<string> name_set; |
| 29 | vector<string> unique_names; |
| 30 | unique_names.reserve(n: original_names.size()); |
| 31 | |
| 32 | for (auto &name : original_names) { |
| 33 | auto insert_result = name_set.insert(x: name); |
| 34 | if (insert_result.second == false) { |
| 35 | // Could not be inserted, name already exists |
| 36 | idx_t index = 1; |
| 37 | string postfixed_name; |
| 38 | while (true) { |
| 39 | postfixed_name = StringUtil::Format(fmt_str: "%s:%d" , params: name, params: index); |
| 40 | auto res = name_set.insert(x: postfixed_name); |
| 41 | if (!res.second) { |
| 42 | index++; |
| 43 | continue; |
| 44 | } |
| 45 | break; |
| 46 | } |
| 47 | unique_names.push_back(x: postfixed_name); |
| 48 | } else { |
| 49 | unique_names.push_back(x: name); |
| 50 | } |
| 51 | } |
| 52 | return unique_names; |
| 53 | } |
| 54 | |
| 55 | BoundStatement Binder::BindCopyTo(CopyStatement &stmt) { |
| 56 | // COPY TO a file |
| 57 | auto &config = DBConfig::GetConfig(context); |
| 58 | if (!config.options.enable_external_access) { |
| 59 | throw PermissionException("COPY TO is disabled by configuration" ); |
| 60 | } |
| 61 | BoundStatement result; |
| 62 | result.types = {LogicalType::BIGINT}; |
| 63 | result.names = {"Count" }; |
| 64 | |
| 65 | // lookup the format in the catalog |
| 66 | auto ©_function = |
| 67 | Catalog::GetEntry<CopyFunctionCatalogEntry>(context, INVALID_CATALOG, DEFAULT_SCHEMA, name: stmt.info->format); |
| 68 | if (copy_function.function.plan) { |
| 69 | // plan rewrite COPY TO |
| 70 | return copy_function.function.plan(*this, stmt); |
| 71 | } |
| 72 | |
| 73 | // bind the select statement |
| 74 | auto select_node = Bind(node&: *stmt.select_statement); |
| 75 | |
| 76 | if (!copy_function.function.copy_to_bind) { |
| 77 | throw NotImplementedException("COPY TO is not supported for FORMAT \"%s\"" , stmt.info->format); |
| 78 | } |
| 79 | bool use_tmp_file = true; |
| 80 | bool overwrite_or_ignore = false; |
| 81 | FilenamePattern filename_pattern; |
| 82 | bool user_set_use_tmp_file = false; |
| 83 | bool per_thread_output = false; |
| 84 | vector<idx_t> partition_cols; |
| 85 | |
| 86 | auto original_options = stmt.info->options; |
| 87 | stmt.info->options.clear(); |
| 88 | |
| 89 | for (auto &option : original_options) { |
| 90 | auto loption = StringUtil::Lower(str: option.first); |
| 91 | if (loption == "use_tmp_file" ) { |
| 92 | use_tmp_file = |
| 93 | option.second.empty() || option.second[0].CastAs(context, target_type: LogicalType::BOOLEAN).GetValue<bool>(); |
| 94 | user_set_use_tmp_file = true; |
| 95 | continue; |
| 96 | } |
| 97 | if (loption == "overwrite_or_ignore" ) { |
| 98 | overwrite_or_ignore = |
| 99 | option.second.empty() || option.second[0].CastAs(context, target_type: LogicalType::BOOLEAN).GetValue<bool>(); |
| 100 | continue; |
| 101 | } |
| 102 | if (loption == "filename_pattern" ) { |
| 103 | if (option.second.empty()) { |
| 104 | throw IOException("FILENAME_PATTERN cannot be empty" ); |
| 105 | } |
| 106 | filename_pattern.SetFilenamePattern( |
| 107 | option.second[0].CastAs(context, target_type: LogicalType::VARCHAR).GetValue<string>()); |
| 108 | continue; |
| 109 | } |
| 110 | |
| 111 | if (loption == "per_thread_output" ) { |
| 112 | per_thread_output = |
| 113 | option.second.empty() || option.second[0].CastAs(context, target_type: LogicalType::BOOLEAN).GetValue<bool>(); |
| 114 | continue; |
| 115 | } |
| 116 | if (loption == "partition_by" ) { |
| 117 | auto converted = ConvertVectorToValue(set: std::move(option.second)); |
| 118 | partition_cols = ParseColumnsOrdered(value: converted, names&: select_node.names, loption); |
| 119 | continue; |
| 120 | } |
| 121 | stmt.info->options[option.first] = option.second; |
| 122 | } |
| 123 | if (user_set_use_tmp_file && per_thread_output) { |
| 124 | throw NotImplementedException("Can't combine USE_TMP_FILE and PER_THREAD_OUTPUT for COPY" ); |
| 125 | } |
| 126 | if (user_set_use_tmp_file && !partition_cols.empty()) { |
| 127 | throw NotImplementedException("Can't combine USE_TMP_FILE and PARTITION_BY for COPY" ); |
| 128 | } |
| 129 | if (per_thread_output && !partition_cols.empty()) { |
| 130 | throw NotImplementedException("Can't combine PER_THREAD_OUTPUT and PARTITION_BY for COPY" ); |
| 131 | } |
| 132 | bool is_remote_file = config.file_system->IsRemoteFile(path: stmt.info->file_path); |
| 133 | if (is_remote_file) { |
| 134 | use_tmp_file = false; |
| 135 | } else { |
| 136 | bool is_file_and_exists = config.file_system->FileExists(filename: stmt.info->file_path); |
| 137 | bool is_stdout = stmt.info->file_path == "/dev/stdout" ; |
| 138 | if (!user_set_use_tmp_file) { |
| 139 | use_tmp_file = is_file_and_exists && !per_thread_output && partition_cols.empty() && !is_stdout; |
| 140 | } |
| 141 | } |
| 142 | |
| 143 | auto unique_column_names = GetUniqueNames(original_names: select_node.names); |
| 144 | |
| 145 | auto function_data = |
| 146 | copy_function.function.copy_to_bind(context, *stmt.info, unique_column_names, select_node.types); |
| 147 | // now create the copy information |
| 148 | auto copy = make_uniq<LogicalCopyToFile>(args&: copy_function.function, args: std::move(function_data)); |
| 149 | copy->file_path = stmt.info->file_path; |
| 150 | copy->use_tmp_file = use_tmp_file; |
| 151 | copy->overwrite_or_ignore = overwrite_or_ignore; |
| 152 | copy->filename_pattern = filename_pattern; |
| 153 | copy->per_thread_output = per_thread_output; |
| 154 | copy->partition_output = !partition_cols.empty(); |
| 155 | copy->partition_columns = std::move(partition_cols); |
| 156 | |
| 157 | copy->names = unique_column_names; |
| 158 | copy->expected_types = select_node.types; |
| 159 | |
| 160 | copy->AddChild(child: std::move(select_node.plan)); |
| 161 | |
| 162 | result.plan = std::move(copy); |
| 163 | |
| 164 | return result; |
| 165 | } |
| 166 | |
| 167 | BoundStatement Binder::BindCopyFrom(CopyStatement &stmt) { |
| 168 | auto &config = DBConfig::GetConfig(context); |
| 169 | if (!config.options.enable_external_access) { |
| 170 | throw PermissionException("COPY FROM is disabled by configuration" ); |
| 171 | } |
| 172 | BoundStatement result; |
| 173 | result.types = {LogicalType::BIGINT}; |
| 174 | result.names = {"Count" }; |
| 175 | |
| 176 | if (stmt.info->table.empty()) { |
| 177 | throw ParserException("COPY FROM requires a table name to be specified" ); |
| 178 | } |
| 179 | // COPY FROM a file |
| 180 | // generate an insert statement for the the to-be-inserted table |
| 181 | InsertStatement insert; |
| 182 | insert.table = stmt.info->table; |
| 183 | insert.schema = stmt.info->schema; |
| 184 | insert.catalog = stmt.info->catalog; |
| 185 | insert.columns = stmt.info->select_list; |
| 186 | |
| 187 | // bind the insert statement to the base table |
| 188 | auto insert_statement = Bind(stmt&: insert); |
| 189 | D_ASSERT(insert_statement.plan->type == LogicalOperatorType::LOGICAL_INSERT); |
| 190 | |
| 191 | auto &bound_insert = insert_statement.plan->Cast<LogicalInsert>(); |
| 192 | |
| 193 | // lookup the format in the catalog |
| 194 | auto &catalog = Catalog::GetSystemCatalog(context); |
| 195 | auto ©_function = catalog.GetEntry<CopyFunctionCatalogEntry>(context, DEFAULT_SCHEMA, name: stmt.info->format); |
| 196 | if (!copy_function.function.copy_from_bind) { |
| 197 | throw NotImplementedException("COPY FROM is not supported for FORMAT \"%s\"" , stmt.info->format); |
| 198 | } |
| 199 | // lookup the table to copy into |
| 200 | BindSchemaOrCatalog(catalog_name&: stmt.info->catalog, schema_name&: stmt.info->schema); |
| 201 | auto &table = |
| 202 | Catalog::GetEntry<TableCatalogEntry>(context, catalog_name: stmt.info->catalog, schema_name: stmt.info->schema, name: stmt.info->table); |
| 203 | vector<string> expected_names; |
| 204 | if (!bound_insert.column_index_map.empty()) { |
| 205 | expected_names.resize(new_size: bound_insert.expected_types.size()); |
| 206 | for (auto &col : table.GetColumns().Physical()) { |
| 207 | auto i = col.Physical(); |
| 208 | if (bound_insert.column_index_map[i] != DConstants::INVALID_INDEX) { |
| 209 | expected_names[bound_insert.column_index_map[i]] = col.Name(); |
| 210 | } |
| 211 | } |
| 212 | } else { |
| 213 | expected_names.reserve(n: bound_insert.expected_types.size()); |
| 214 | for (auto &col : table.GetColumns().Physical()) { |
| 215 | expected_names.push_back(x: col.Name()); |
| 216 | } |
| 217 | } |
| 218 | |
| 219 | auto function_data = |
| 220 | copy_function.function.copy_from_bind(context, *stmt.info, expected_names, bound_insert.expected_types); |
| 221 | auto get = make_uniq<LogicalGet>(args: GenerateTableIndex(), args&: copy_function.function.copy_from_function, |
| 222 | args: std::move(function_data), args&: bound_insert.expected_types, args&: expected_names); |
| 223 | for (idx_t i = 0; i < bound_insert.expected_types.size(); i++) { |
| 224 | get->column_ids.push_back(x: i); |
| 225 | } |
| 226 | insert_statement.plan->children.push_back(x: std::move(get)); |
| 227 | result.plan = std::move(insert_statement.plan); |
| 228 | return result; |
| 229 | } |
| 230 | |
| 231 | BoundStatement Binder::Bind(CopyStatement &stmt) { |
| 232 | if (!stmt.info->is_from && !stmt.select_statement) { |
| 233 | // copy table into file without a query |
| 234 | // generate SELECT * FROM table; |
| 235 | auto ref = make_uniq<BaseTableRef>(); |
| 236 | ref->catalog_name = stmt.info->catalog; |
| 237 | ref->schema_name = stmt.info->schema; |
| 238 | ref->table_name = stmt.info->table; |
| 239 | |
| 240 | auto statement = make_uniq<SelectNode>(); |
| 241 | statement->from_table = std::move(ref); |
| 242 | if (!stmt.info->select_list.empty()) { |
| 243 | for (auto &name : stmt.info->select_list) { |
| 244 | statement->select_list.push_back(x: make_uniq<ColumnRefExpression>(args&: name)); |
| 245 | } |
| 246 | } else { |
| 247 | statement->select_list.push_back(x: make_uniq<StarExpression>()); |
| 248 | } |
| 249 | stmt.select_statement = std::move(statement); |
| 250 | } |
| 251 | properties.allow_stream_result = false; |
| 252 | properties.return_type = StatementReturnType::CHANGED_ROWS; |
| 253 | if (stmt.info->is_from) { |
| 254 | return BindCopyFrom(stmt); |
| 255 | } else { |
| 256 | return BindCopyTo(stmt); |
| 257 | } |
| 258 | } |
| 259 | |
| 260 | } // namespace duckdb |
| 261 | |