1#include "duckdb/catalog/catalog.hpp"
2#include "duckdb/parser/statement/export_statement.hpp"
3#include "duckdb/planner/binder.hpp"
4#include "duckdb/planner/operator/logical_export.hpp"
5#include "duckdb/catalog/catalog_entry/copy_function_catalog_entry.hpp"
6#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
7#include "duckdb/parser/statement/copy_statement.hpp"
8#include "duckdb/main/client_context.hpp"
9#include "duckdb/main/database.hpp"
10#include "duckdb/common/file_system.hpp"
11#include "duckdb/planner/operator/logical_set_operation.hpp"
12#include "duckdb/parser/parsed_data/exported_table_data.hpp"
13#include "duckdb/parser/constraints/foreign_key_constraint.hpp"
14
15#include "duckdb/common/string_util.hpp"
16#include <algorithm>
17
18namespace duckdb {
19
//! Sanitizes a string so that it only contains lowercase ASCII letters and underscores.
//! - 'a'..'z' are kept as they are
//! - 'A'..'Z' are folded to the matching lowercase letter
//! - every other character (digits, punctuation, whitespace, non-ASCII bytes) becomes '_'
//! The returned string always has the same length as the input.
string SanitizeExportIdentifier(const string &str) {
	// Work on a copy so that the input stays untouched
	string result(str);

	for (auto &c : result) {
		if (c >= 'a' && c <= 'z') {
			// already a lowercase ASCII letter: keep it
			continue;
		}

		if (c >= 'A' && c <= 'Z') {
			// Fold using plain ASCII arithmetic instead of tolower(): tolower() consults the
			// process locale, so its result for an ASCII letter is not guaranteed to be an
			// ASCII lowercase letter, which would break the contract of this function.
			c = static_cast<char>(c - 'A' + 'a');
		} else {
			// anything else is substituted by an underscore
			c = '_';
		}
	}

	return result;
}
43
44bool IsExistMainKeyTable(string &table_name, vector<reference<TableCatalogEntry>> &unordered) {
45 for (idx_t i = 0; i < unordered.size(); i++) {
46 if (unordered[i].get().name == table_name) {
47 return true;
48 }
49 }
50 return false;
51}
52
53void ScanForeignKeyTable(vector<reference<TableCatalogEntry>> &ordered, vector<reference<TableCatalogEntry>> &unordered,
54 bool move_only_pk_table) {
55 for (auto i = unordered.begin(); i != unordered.end();) {
56 auto table_entry = *i;
57 bool move_to_ordered = true;
58 auto &constraints = table_entry.get().GetConstraints();
59 for (idx_t j = 0; j < constraints.size(); j++) {
60 auto &cond = constraints[j];
61 if (cond->type == ConstraintType::FOREIGN_KEY) {
62 auto &fk = cond->Cast<ForeignKeyConstraint>();
63 if ((move_only_pk_table && fk.info.type == ForeignKeyType::FK_TYPE_FOREIGN_KEY_TABLE) ||
64 (!move_only_pk_table && fk.info.type == ForeignKeyType::FK_TYPE_FOREIGN_KEY_TABLE &&
65 IsExistMainKeyTable(table_name&: fk.info.table, unordered))) {
66 move_to_ordered = false;
67 break;
68 }
69 }
70 }
71 if (move_to_ordered) {
72 ordered.push_back(x: table_entry);
73 i = unordered.erase(position: i);
74 } else {
75 i++;
76 }
77 }
78}
79
80void ReorderTableEntries(vector<reference<TableCatalogEntry>> &tables) {
81 vector<reference<TableCatalogEntry>> ordered;
82 vector<reference<TableCatalogEntry>> unordered = tables;
83 ScanForeignKeyTable(ordered, unordered, move_only_pk_table: true);
84 while (!unordered.empty()) {
85 ScanForeignKeyTable(ordered, unordered, move_only_pk_table: false);
86 }
87 tables = ordered;
88}
89
90string CreateFileName(const string &id_suffix, TableCatalogEntry &table, const string &extension) {
91 auto name = SanitizeExportIdentifier(str: table.name);
92 if (table.schema.name == DEFAULT_SCHEMA) {
93 return StringUtil::Format(fmt_str: "%s%s.%s", params: name, params: id_suffix, params: extension);
94 }
95 auto schema = SanitizeExportIdentifier(str: table.schema.name);
96 return StringUtil::Format(fmt_str: "%s_%s%s.%s", params: schema, params: name, params: id_suffix, params: extension);
97}
98
99BoundStatement Binder::Bind(ExportStatement &stmt) {
100 // COPY TO a file
101 auto &config = DBConfig::GetConfig(context);
102 if (!config.options.enable_external_access) {
103 throw PermissionException("COPY TO is disabled through configuration");
104 }
105 BoundStatement result;
106 result.types = {LogicalType::BOOLEAN};
107 result.names = {"Success"};
108
109 // lookup the format in the catalog
110 auto &copy_function =
111 Catalog::GetEntry<CopyFunctionCatalogEntry>(context, INVALID_CATALOG, DEFAULT_SCHEMA, name: stmt.info->format);
112 if (!copy_function.function.copy_to_bind && !copy_function.function.plan) {
113 throw NotImplementedException("COPY TO is not supported for FORMAT \"%s\"", stmt.info->format);
114 }
115
116 // gather a list of all the tables
117 string catalog = stmt.database.empty() ? INVALID_CATALOG : stmt.database;
118 vector<reference<TableCatalogEntry>> tables;
119 auto schemas = Catalog::GetSchemas(context, catalog_name: catalog);
120 for (auto &schema : schemas) {
121 schema.get().Scan(context, type: CatalogType::TABLE_ENTRY, callback: [&](CatalogEntry &entry) {
122 if (entry.type == CatalogType::TABLE_ENTRY) {
123 tables.push_back(x: entry.Cast<TableCatalogEntry>());
124 }
125 });
126 }
127
128 // reorder tables because of foreign key constraint
129 ReorderTableEntries(tables);
130
131 // now generate the COPY statements for each of the tables
132 auto &fs = FileSystem::GetFileSystem(context);
133 unique_ptr<LogicalOperator> child_operator;
134
135 BoundExportData exported_tables;
136
137 unordered_set<string> table_name_index;
138 for (auto &t : tables) {
139 auto &table = t.get();
140 auto info = make_uniq<CopyInfo>();
141 // we copy the options supplied to the EXPORT
142 info->format = stmt.info->format;
143 info->options = stmt.info->options;
144 // set up the file name for the COPY TO
145
146 idx_t id = 0;
147 while (true) {
148 string id_suffix = id == 0 ? string() : "_" + to_string(val: id);
149 auto name = CreateFileName(id_suffix, table, extension: copy_function.function.extension);
150 auto directory = stmt.info->file_path;
151 auto full_path = fs.JoinPath(a: directory, path: name);
152 info->file_path = full_path;
153 auto insert_result = table_name_index.insert(x: info->file_path);
154 if (insert_result.second == true) {
155 // this name was not yet taken: take it
156 break;
157 }
158 id++;
159 }
160 info->is_from = false;
161 info->catalog = catalog;
162 info->schema = table.schema.name;
163 info->table = table.name;
164
165 // We can not export generated columns
166 for (auto &col : table.GetColumns().Physical()) {
167 info->select_list.push_back(x: col.GetName());
168 }
169
170 ExportedTableData exported_data;
171 exported_data.database_name = catalog;
172 exported_data.table_name = info->table;
173 exported_data.schema_name = info->schema;
174
175 exported_data.file_path = info->file_path;
176
177 ExportedTableInfo table_info(table, std::move(exported_data));
178 exported_tables.data.push_back(x: table_info);
179 id++;
180
181 // generate the copy statement and bind it
182 CopyStatement copy_stmt;
183 copy_stmt.info = std::move(info);
184
185 auto copy_binder = Binder::CreateBinder(context, parent: this);
186 auto bound_statement = copy_binder->Bind(stmt&: copy_stmt);
187 if (child_operator) {
188 // use UNION ALL to combine the individual copy statements into a single node
189 auto copy_union =
190 make_uniq<LogicalSetOperation>(args: GenerateTableIndex(), args: 1, args: std::move(child_operator),
191 args: std::move(bound_statement.plan), args: LogicalOperatorType::LOGICAL_UNION);
192 child_operator = std::move(copy_union);
193 } else {
194 child_operator = std::move(bound_statement.plan);
195 }
196 }
197
198 // try to create the directory, if it doesn't exist yet
199 // a bit hacky to do it here, but we need to create the directory BEFORE the copy statements run
200 if (!fs.DirectoryExists(directory: stmt.info->file_path)) {
201 fs.CreateDirectory(directory: stmt.info->file_path);
202 }
203
204 // create the export node
205 auto export_node = make_uniq<LogicalExport>(args&: copy_function.function, args: std::move(stmt.info), args&: exported_tables);
206
207 if (child_operator) {
208 export_node->children.push_back(x: std::move(child_operator));
209 }
210
211 result.plan = std::move(export_node);
212 properties.allow_stream_result = false;
213 properties.return_type = StatementReturnType::NOTHING;
214 return result;
215}
216
217} // namespace duckdb
218