1#include "duckdb/catalog/catalog.hpp"
2#include "duckdb/catalog/catalog_entry/copy_function_catalog_entry.hpp"
3#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
4#include "duckdb/catalog/catalog_entry/table_function_catalog_entry.hpp"
5#include "duckdb/common/bind_helpers.hpp"
6#include "duckdb/common/filename_pattern.hpp"
7#include "duckdb/common/local_file_system.hpp"
8#include "duckdb/execution/operator/persistent/parallel_csv_reader.hpp"
9#include "duckdb/function/table/read_csv.hpp"
10#include "duckdb/main/client_context.hpp"
11#include "duckdb/main/database.hpp"
12#include "duckdb/parser/expression/columnref_expression.hpp"
13#include "duckdb/parser/expression/star_expression.hpp"
14#include "duckdb/parser/query_node/select_node.hpp"
15#include "duckdb/parser/statement/copy_statement.hpp"
16#include "duckdb/parser/statement/insert_statement.hpp"
17#include "duckdb/parser/tableref/basetableref.hpp"
18#include "duckdb/planner/binder.hpp"
19#include "duckdb/planner/operator/logical_copy_to_file.hpp"
20#include "duckdb/planner/operator/logical_get.hpp"
21#include "duckdb/planner/operator/logical_insert.hpp"
22
23#include <algorithm>
24
25namespace duckdb {
26
27vector<string> GetUniqueNames(const vector<string> &original_names) {
28 unordered_set<string> name_set;
29 vector<string> unique_names;
30 unique_names.reserve(n: original_names.size());
31
32 for (auto &name : original_names) {
33 auto insert_result = name_set.insert(x: name);
34 if (insert_result.second == false) {
35 // Could not be inserted, name already exists
36 idx_t index = 1;
37 string postfixed_name;
38 while (true) {
39 postfixed_name = StringUtil::Format(fmt_str: "%s:%d", params: name, params: index);
40 auto res = name_set.insert(x: postfixed_name);
41 if (!res.second) {
42 index++;
43 continue;
44 }
45 break;
46 }
47 unique_names.push_back(x: postfixed_name);
48 } else {
49 unique_names.push_back(x: name);
50 }
51 }
52 return unique_names;
53}
54
55BoundStatement Binder::BindCopyTo(CopyStatement &stmt) {
56 // COPY TO a file
57 auto &config = DBConfig::GetConfig(context);
58 if (!config.options.enable_external_access) {
59 throw PermissionException("COPY TO is disabled by configuration");
60 }
61 BoundStatement result;
62 result.types = {LogicalType::BIGINT};
63 result.names = {"Count"};
64
65 // lookup the format in the catalog
66 auto &copy_function =
67 Catalog::GetEntry<CopyFunctionCatalogEntry>(context, INVALID_CATALOG, DEFAULT_SCHEMA, name: stmt.info->format);
68 if (copy_function.function.plan) {
69 // plan rewrite COPY TO
70 return copy_function.function.plan(*this, stmt);
71 }
72
73 // bind the select statement
74 auto select_node = Bind(node&: *stmt.select_statement);
75
76 if (!copy_function.function.copy_to_bind) {
77 throw NotImplementedException("COPY TO is not supported for FORMAT \"%s\"", stmt.info->format);
78 }
79 bool use_tmp_file = true;
80 bool overwrite_or_ignore = false;
81 FilenamePattern filename_pattern;
82 bool user_set_use_tmp_file = false;
83 bool per_thread_output = false;
84 vector<idx_t> partition_cols;
85
86 auto original_options = stmt.info->options;
87 stmt.info->options.clear();
88
89 for (auto &option : original_options) {
90 auto loption = StringUtil::Lower(str: option.first);
91 if (loption == "use_tmp_file") {
92 use_tmp_file =
93 option.second.empty() || option.second[0].CastAs(context, target_type: LogicalType::BOOLEAN).GetValue<bool>();
94 user_set_use_tmp_file = true;
95 continue;
96 }
97 if (loption == "overwrite_or_ignore") {
98 overwrite_or_ignore =
99 option.second.empty() || option.second[0].CastAs(context, target_type: LogicalType::BOOLEAN).GetValue<bool>();
100 continue;
101 }
102 if (loption == "filename_pattern") {
103 if (option.second.empty()) {
104 throw IOException("FILENAME_PATTERN cannot be empty");
105 }
106 filename_pattern.SetFilenamePattern(
107 option.second[0].CastAs(context, target_type: LogicalType::VARCHAR).GetValue<string>());
108 continue;
109 }
110
111 if (loption == "per_thread_output") {
112 per_thread_output =
113 option.second.empty() || option.second[0].CastAs(context, target_type: LogicalType::BOOLEAN).GetValue<bool>();
114 continue;
115 }
116 if (loption == "partition_by") {
117 auto converted = ConvertVectorToValue(set: std::move(option.second));
118 partition_cols = ParseColumnsOrdered(value: converted, names&: select_node.names, loption);
119 continue;
120 }
121 stmt.info->options[option.first] = option.second;
122 }
123 if (user_set_use_tmp_file && per_thread_output) {
124 throw NotImplementedException("Can't combine USE_TMP_FILE and PER_THREAD_OUTPUT for COPY");
125 }
126 if (user_set_use_tmp_file && !partition_cols.empty()) {
127 throw NotImplementedException("Can't combine USE_TMP_FILE and PARTITION_BY for COPY");
128 }
129 if (per_thread_output && !partition_cols.empty()) {
130 throw NotImplementedException("Can't combine PER_THREAD_OUTPUT and PARTITION_BY for COPY");
131 }
132 bool is_remote_file = config.file_system->IsRemoteFile(path: stmt.info->file_path);
133 if (is_remote_file) {
134 use_tmp_file = false;
135 } else {
136 bool is_file_and_exists = config.file_system->FileExists(filename: stmt.info->file_path);
137 bool is_stdout = stmt.info->file_path == "/dev/stdout";
138 if (!user_set_use_tmp_file) {
139 use_tmp_file = is_file_and_exists && !per_thread_output && partition_cols.empty() && !is_stdout;
140 }
141 }
142
143 auto unique_column_names = GetUniqueNames(original_names: select_node.names);
144
145 auto function_data =
146 copy_function.function.copy_to_bind(context, *stmt.info, unique_column_names, select_node.types);
147 // now create the copy information
148 auto copy = make_uniq<LogicalCopyToFile>(args&: copy_function.function, args: std::move(function_data));
149 copy->file_path = stmt.info->file_path;
150 copy->use_tmp_file = use_tmp_file;
151 copy->overwrite_or_ignore = overwrite_or_ignore;
152 copy->filename_pattern = filename_pattern;
153 copy->per_thread_output = per_thread_output;
154 copy->partition_output = !partition_cols.empty();
155 copy->partition_columns = std::move(partition_cols);
156
157 copy->names = unique_column_names;
158 copy->expected_types = select_node.types;
159
160 copy->AddChild(child: std::move(select_node.plan));
161
162 result.plan = std::move(copy);
163
164 return result;
165}
166
167BoundStatement Binder::BindCopyFrom(CopyStatement &stmt) {
168 auto &config = DBConfig::GetConfig(context);
169 if (!config.options.enable_external_access) {
170 throw PermissionException("COPY FROM is disabled by configuration");
171 }
172 BoundStatement result;
173 result.types = {LogicalType::BIGINT};
174 result.names = {"Count"};
175
176 if (stmt.info->table.empty()) {
177 throw ParserException("COPY FROM requires a table name to be specified");
178 }
179 // COPY FROM a file
180 // generate an insert statement for the the to-be-inserted table
181 InsertStatement insert;
182 insert.table = stmt.info->table;
183 insert.schema = stmt.info->schema;
184 insert.catalog = stmt.info->catalog;
185 insert.columns = stmt.info->select_list;
186
187 // bind the insert statement to the base table
188 auto insert_statement = Bind(stmt&: insert);
189 D_ASSERT(insert_statement.plan->type == LogicalOperatorType::LOGICAL_INSERT);
190
191 auto &bound_insert = insert_statement.plan->Cast<LogicalInsert>();
192
193 // lookup the format in the catalog
194 auto &catalog = Catalog::GetSystemCatalog(context);
195 auto &copy_function = catalog.GetEntry<CopyFunctionCatalogEntry>(context, DEFAULT_SCHEMA, name: stmt.info->format);
196 if (!copy_function.function.copy_from_bind) {
197 throw NotImplementedException("COPY FROM is not supported for FORMAT \"%s\"", stmt.info->format);
198 }
199 // lookup the table to copy into
200 BindSchemaOrCatalog(catalog_name&: stmt.info->catalog, schema_name&: stmt.info->schema);
201 auto &table =
202 Catalog::GetEntry<TableCatalogEntry>(context, catalog_name: stmt.info->catalog, schema_name: stmt.info->schema, name: stmt.info->table);
203 vector<string> expected_names;
204 if (!bound_insert.column_index_map.empty()) {
205 expected_names.resize(new_size: bound_insert.expected_types.size());
206 for (auto &col : table.GetColumns().Physical()) {
207 auto i = col.Physical();
208 if (bound_insert.column_index_map[i] != DConstants::INVALID_INDEX) {
209 expected_names[bound_insert.column_index_map[i]] = col.Name();
210 }
211 }
212 } else {
213 expected_names.reserve(n: bound_insert.expected_types.size());
214 for (auto &col : table.GetColumns().Physical()) {
215 expected_names.push_back(x: col.Name());
216 }
217 }
218
219 auto function_data =
220 copy_function.function.copy_from_bind(context, *stmt.info, expected_names, bound_insert.expected_types);
221 auto get = make_uniq<LogicalGet>(args: GenerateTableIndex(), args&: copy_function.function.copy_from_function,
222 args: std::move(function_data), args&: bound_insert.expected_types, args&: expected_names);
223 for (idx_t i = 0; i < bound_insert.expected_types.size(); i++) {
224 get->column_ids.push_back(x: i);
225 }
226 insert_statement.plan->children.push_back(x: std::move(get));
227 result.plan = std::move(insert_statement.plan);
228 return result;
229}
230
231BoundStatement Binder::Bind(CopyStatement &stmt) {
232 if (!stmt.info->is_from && !stmt.select_statement) {
233 // copy table into file without a query
234 // generate SELECT * FROM table;
235 auto ref = make_uniq<BaseTableRef>();
236 ref->catalog_name = stmt.info->catalog;
237 ref->schema_name = stmt.info->schema;
238 ref->table_name = stmt.info->table;
239
240 auto statement = make_uniq<SelectNode>();
241 statement->from_table = std::move(ref);
242 if (!stmt.info->select_list.empty()) {
243 for (auto &name : stmt.info->select_list) {
244 statement->select_list.push_back(x: make_uniq<ColumnRefExpression>(args&: name));
245 }
246 } else {
247 statement->select_list.push_back(x: make_uniq<StarExpression>());
248 }
249 stmt.select_statement = std::move(statement);
250 }
251 properties.allow_stream_result = false;
252 properties.return_type = StatementReturnType::CHANGED_ROWS;
253 if (stmt.info->is_from) {
254 return BindCopyFrom(stmt);
255 } else {
256 return BindCopyTo(stmt);
257 }
258}
259
260} // namespace duckdb
261