1 | #include "duckdb/catalog/catalog.hpp" |
2 | #include "duckdb/catalog/catalog_entry/copy_function_catalog_entry.hpp" |
3 | #include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp" |
4 | #include "duckdb/catalog/catalog_entry/table_function_catalog_entry.hpp" |
5 | #include "duckdb/common/bind_helpers.hpp" |
6 | #include "duckdb/common/filename_pattern.hpp" |
7 | #include "duckdb/common/local_file_system.hpp" |
8 | #include "duckdb/execution/operator/persistent/parallel_csv_reader.hpp" |
9 | #include "duckdb/function/table/read_csv.hpp" |
10 | #include "duckdb/main/client_context.hpp" |
11 | #include "duckdb/main/database.hpp" |
12 | #include "duckdb/parser/expression/columnref_expression.hpp" |
13 | #include "duckdb/parser/expression/star_expression.hpp" |
14 | #include "duckdb/parser/query_node/select_node.hpp" |
15 | #include "duckdb/parser/statement/copy_statement.hpp" |
16 | #include "duckdb/parser/statement/insert_statement.hpp" |
17 | #include "duckdb/parser/tableref/basetableref.hpp" |
18 | #include "duckdb/planner/binder.hpp" |
19 | #include "duckdb/planner/operator/logical_copy_to_file.hpp" |
20 | #include "duckdb/planner/operator/logical_get.hpp" |
21 | #include "duckdb/planner/operator/logical_insert.hpp" |
22 | |
23 | #include <algorithm> |
24 | |
25 | namespace duckdb { |
26 | |
27 | vector<string> GetUniqueNames(const vector<string> &original_names) { |
28 | unordered_set<string> name_set; |
29 | vector<string> unique_names; |
30 | unique_names.reserve(n: original_names.size()); |
31 | |
32 | for (auto &name : original_names) { |
33 | auto insert_result = name_set.insert(x: name); |
34 | if (insert_result.second == false) { |
35 | // Could not be inserted, name already exists |
36 | idx_t index = 1; |
37 | string postfixed_name; |
38 | while (true) { |
39 | postfixed_name = StringUtil::Format(fmt_str: "%s:%d" , params: name, params: index); |
40 | auto res = name_set.insert(x: postfixed_name); |
41 | if (!res.second) { |
42 | index++; |
43 | continue; |
44 | } |
45 | break; |
46 | } |
47 | unique_names.push_back(x: postfixed_name); |
48 | } else { |
49 | unique_names.push_back(x: name); |
50 | } |
51 | } |
52 | return unique_names; |
53 | } |
54 | |
55 | BoundStatement Binder::BindCopyTo(CopyStatement &stmt) { |
56 | // COPY TO a file |
57 | auto &config = DBConfig::GetConfig(context); |
58 | if (!config.options.enable_external_access) { |
59 | throw PermissionException("COPY TO is disabled by configuration" ); |
60 | } |
61 | BoundStatement result; |
62 | result.types = {LogicalType::BIGINT}; |
63 | result.names = {"Count" }; |
64 | |
65 | // lookup the format in the catalog |
66 | auto ©_function = |
67 | Catalog::GetEntry<CopyFunctionCatalogEntry>(context, INVALID_CATALOG, DEFAULT_SCHEMA, name: stmt.info->format); |
68 | if (copy_function.function.plan) { |
69 | // plan rewrite COPY TO |
70 | return copy_function.function.plan(*this, stmt); |
71 | } |
72 | |
73 | // bind the select statement |
74 | auto select_node = Bind(node&: *stmt.select_statement); |
75 | |
76 | if (!copy_function.function.copy_to_bind) { |
77 | throw NotImplementedException("COPY TO is not supported for FORMAT \"%s\"" , stmt.info->format); |
78 | } |
79 | bool use_tmp_file = true; |
80 | bool overwrite_or_ignore = false; |
81 | FilenamePattern filename_pattern; |
82 | bool user_set_use_tmp_file = false; |
83 | bool per_thread_output = false; |
84 | vector<idx_t> partition_cols; |
85 | |
86 | auto original_options = stmt.info->options; |
87 | stmt.info->options.clear(); |
88 | |
89 | for (auto &option : original_options) { |
90 | auto loption = StringUtil::Lower(str: option.first); |
91 | if (loption == "use_tmp_file" ) { |
92 | use_tmp_file = |
93 | option.second.empty() || option.second[0].CastAs(context, target_type: LogicalType::BOOLEAN).GetValue<bool>(); |
94 | user_set_use_tmp_file = true; |
95 | continue; |
96 | } |
97 | if (loption == "overwrite_or_ignore" ) { |
98 | overwrite_or_ignore = |
99 | option.second.empty() || option.second[0].CastAs(context, target_type: LogicalType::BOOLEAN).GetValue<bool>(); |
100 | continue; |
101 | } |
102 | if (loption == "filename_pattern" ) { |
103 | if (option.second.empty()) { |
104 | throw IOException("FILENAME_PATTERN cannot be empty" ); |
105 | } |
106 | filename_pattern.SetFilenamePattern( |
107 | option.second[0].CastAs(context, target_type: LogicalType::VARCHAR).GetValue<string>()); |
108 | continue; |
109 | } |
110 | |
111 | if (loption == "per_thread_output" ) { |
112 | per_thread_output = |
113 | option.second.empty() || option.second[0].CastAs(context, target_type: LogicalType::BOOLEAN).GetValue<bool>(); |
114 | continue; |
115 | } |
116 | if (loption == "partition_by" ) { |
117 | auto converted = ConvertVectorToValue(set: std::move(option.second)); |
118 | partition_cols = ParseColumnsOrdered(value: converted, names&: select_node.names, loption); |
119 | continue; |
120 | } |
121 | stmt.info->options[option.first] = option.second; |
122 | } |
123 | if (user_set_use_tmp_file && per_thread_output) { |
124 | throw NotImplementedException("Can't combine USE_TMP_FILE and PER_THREAD_OUTPUT for COPY" ); |
125 | } |
126 | if (user_set_use_tmp_file && !partition_cols.empty()) { |
127 | throw NotImplementedException("Can't combine USE_TMP_FILE and PARTITION_BY for COPY" ); |
128 | } |
129 | if (per_thread_output && !partition_cols.empty()) { |
130 | throw NotImplementedException("Can't combine PER_THREAD_OUTPUT and PARTITION_BY for COPY" ); |
131 | } |
132 | bool is_remote_file = config.file_system->IsRemoteFile(path: stmt.info->file_path); |
133 | if (is_remote_file) { |
134 | use_tmp_file = false; |
135 | } else { |
136 | bool is_file_and_exists = config.file_system->FileExists(filename: stmt.info->file_path); |
137 | bool is_stdout = stmt.info->file_path == "/dev/stdout" ; |
138 | if (!user_set_use_tmp_file) { |
139 | use_tmp_file = is_file_and_exists && !per_thread_output && partition_cols.empty() && !is_stdout; |
140 | } |
141 | } |
142 | |
143 | auto unique_column_names = GetUniqueNames(original_names: select_node.names); |
144 | |
145 | auto function_data = |
146 | copy_function.function.copy_to_bind(context, *stmt.info, unique_column_names, select_node.types); |
147 | // now create the copy information |
148 | auto copy = make_uniq<LogicalCopyToFile>(args&: copy_function.function, args: std::move(function_data)); |
149 | copy->file_path = stmt.info->file_path; |
150 | copy->use_tmp_file = use_tmp_file; |
151 | copy->overwrite_or_ignore = overwrite_or_ignore; |
152 | copy->filename_pattern = filename_pattern; |
153 | copy->per_thread_output = per_thread_output; |
154 | copy->partition_output = !partition_cols.empty(); |
155 | copy->partition_columns = std::move(partition_cols); |
156 | |
157 | copy->names = unique_column_names; |
158 | copy->expected_types = select_node.types; |
159 | |
160 | copy->AddChild(child: std::move(select_node.plan)); |
161 | |
162 | result.plan = std::move(copy); |
163 | |
164 | return result; |
165 | } |
166 | |
167 | BoundStatement Binder::BindCopyFrom(CopyStatement &stmt) { |
168 | auto &config = DBConfig::GetConfig(context); |
169 | if (!config.options.enable_external_access) { |
170 | throw PermissionException("COPY FROM is disabled by configuration" ); |
171 | } |
172 | BoundStatement result; |
173 | result.types = {LogicalType::BIGINT}; |
174 | result.names = {"Count" }; |
175 | |
176 | if (stmt.info->table.empty()) { |
177 | throw ParserException("COPY FROM requires a table name to be specified" ); |
178 | } |
179 | // COPY FROM a file |
180 | // generate an insert statement for the the to-be-inserted table |
181 | InsertStatement insert; |
182 | insert.table = stmt.info->table; |
183 | insert.schema = stmt.info->schema; |
184 | insert.catalog = stmt.info->catalog; |
185 | insert.columns = stmt.info->select_list; |
186 | |
187 | // bind the insert statement to the base table |
188 | auto insert_statement = Bind(stmt&: insert); |
189 | D_ASSERT(insert_statement.plan->type == LogicalOperatorType::LOGICAL_INSERT); |
190 | |
191 | auto &bound_insert = insert_statement.plan->Cast<LogicalInsert>(); |
192 | |
193 | // lookup the format in the catalog |
194 | auto &catalog = Catalog::GetSystemCatalog(context); |
195 | auto ©_function = catalog.GetEntry<CopyFunctionCatalogEntry>(context, DEFAULT_SCHEMA, name: stmt.info->format); |
196 | if (!copy_function.function.copy_from_bind) { |
197 | throw NotImplementedException("COPY FROM is not supported for FORMAT \"%s\"" , stmt.info->format); |
198 | } |
199 | // lookup the table to copy into |
200 | BindSchemaOrCatalog(catalog_name&: stmt.info->catalog, schema_name&: stmt.info->schema); |
201 | auto &table = |
202 | Catalog::GetEntry<TableCatalogEntry>(context, catalog_name: stmt.info->catalog, schema_name: stmt.info->schema, name: stmt.info->table); |
203 | vector<string> expected_names; |
204 | if (!bound_insert.column_index_map.empty()) { |
205 | expected_names.resize(new_size: bound_insert.expected_types.size()); |
206 | for (auto &col : table.GetColumns().Physical()) { |
207 | auto i = col.Physical(); |
208 | if (bound_insert.column_index_map[i] != DConstants::INVALID_INDEX) { |
209 | expected_names[bound_insert.column_index_map[i]] = col.Name(); |
210 | } |
211 | } |
212 | } else { |
213 | expected_names.reserve(n: bound_insert.expected_types.size()); |
214 | for (auto &col : table.GetColumns().Physical()) { |
215 | expected_names.push_back(x: col.Name()); |
216 | } |
217 | } |
218 | |
219 | auto function_data = |
220 | copy_function.function.copy_from_bind(context, *stmt.info, expected_names, bound_insert.expected_types); |
221 | auto get = make_uniq<LogicalGet>(args: GenerateTableIndex(), args&: copy_function.function.copy_from_function, |
222 | args: std::move(function_data), args&: bound_insert.expected_types, args&: expected_names); |
223 | for (idx_t i = 0; i < bound_insert.expected_types.size(); i++) { |
224 | get->column_ids.push_back(x: i); |
225 | } |
226 | insert_statement.plan->children.push_back(x: std::move(get)); |
227 | result.plan = std::move(insert_statement.plan); |
228 | return result; |
229 | } |
230 | |
231 | BoundStatement Binder::Bind(CopyStatement &stmt) { |
232 | if (!stmt.info->is_from && !stmt.select_statement) { |
233 | // copy table into file without a query |
234 | // generate SELECT * FROM table; |
235 | auto ref = make_uniq<BaseTableRef>(); |
236 | ref->catalog_name = stmt.info->catalog; |
237 | ref->schema_name = stmt.info->schema; |
238 | ref->table_name = stmt.info->table; |
239 | |
240 | auto statement = make_uniq<SelectNode>(); |
241 | statement->from_table = std::move(ref); |
242 | if (!stmt.info->select_list.empty()) { |
243 | for (auto &name : stmt.info->select_list) { |
244 | statement->select_list.push_back(x: make_uniq<ColumnRefExpression>(args&: name)); |
245 | } |
246 | } else { |
247 | statement->select_list.push_back(x: make_uniq<StarExpression>()); |
248 | } |
249 | stmt.select_statement = std::move(statement); |
250 | } |
251 | properties.allow_stream_result = false; |
252 | properties.return_type = StatementReturnType::CHANGED_ROWS; |
253 | if (stmt.info->is_from) { |
254 | return BindCopyFrom(stmt); |
255 | } else { |
256 | return BindCopyTo(stmt); |
257 | } |
258 | } |
259 | |
260 | } // namespace duckdb |
261 | |