1#include "duckdb/execution/operator/persistent/physical_export.hpp"
2
3#include "duckdb/catalog/catalog.hpp"
4#include "duckdb/catalog/catalog_entry/schema_catalog_entry.hpp"
5#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
6#include "duckdb/common/file_system.hpp"
7#include "duckdb/common/string_util.hpp"
8#include "duckdb/parallel/meta_pipeline.hpp"
9#include "duckdb/parallel/pipeline.hpp"
10#include "duckdb/parser/keyword_helper.hpp"
11#include "duckdb/transaction/transaction.hpp"
12
13#include <algorithm>
14#include <sstream>
15
16namespace duckdb {
17
18using std::stringstream;
19
20static void WriteCatalogEntries(stringstream &ss, vector<reference<CatalogEntry>> &entries) {
21 for (auto &entry : entries) {
22 if (entry.get().internal) {
23 continue;
24 }
25 ss << entry.get().ToSQL() << std::endl;
26 }
27 ss << std::endl;
28}
29
30static void WriteStringStreamToFile(FileSystem &fs, stringstream &ss, const string &path) {
31 auto ss_string = ss.str();
32 auto handle = fs.OpenFile(path, flags: FileFlags::FILE_FLAGS_WRITE | FileFlags::FILE_FLAGS_FILE_CREATE_NEW,
33 lock: FileLockType::WRITE_LOCK);
34 fs.Write(handle&: *handle, buffer: (void *)ss_string.c_str(), nr_bytes: ss_string.size());
35 handle.reset();
36}
37
38static void WriteValueAsSQL(stringstream &ss, Value &val) {
39 if (val.type().IsNumeric()) {
40 ss << val.ToString();
41 } else {
42 ss << "'" << val.ToString() << "'";
43 }
44}
45
46static void WriteCopyStatement(FileSystem &fs, stringstream &ss, CopyInfo &info, ExportedTableData &exported_table,
47 CopyFunction const &function) {
48 ss << "COPY ";
49
50 if (exported_table.schema_name != DEFAULT_SCHEMA) {
51 ss << KeywordHelper::WriteOptionallyQuoted(text: exported_table.schema_name) << ".";
52 }
53
54 ss << StringUtil::Format(fmt_str: "%s FROM %s (", params: SQLIdentifier(exported_table.table_name),
55 params: SQLString(exported_table.file_path));
56
57 // write the copy options
58 ss << "FORMAT '" << info.format << "'";
59 if (info.format == "csv") {
60 // insert default csv options, if not specified
61 if (info.options.find(x: "header") == info.options.end()) {
62 info.options["header"].push_back(x: Value::INTEGER(value: 0));
63 }
64 if (info.options.find(x: "delimiter") == info.options.end() && info.options.find(x: "sep") == info.options.end() &&
65 info.options.find(x: "delim") == info.options.end()) {
66 info.options["delimiter"].push_back(x: Value(","));
67 }
68 if (info.options.find(x: "quote") == info.options.end()) {
69 info.options["quote"].push_back(x: Value("\""));
70 }
71 }
72 for (auto &copy_option : info.options) {
73 if (copy_option.first == "force_quote") {
74 continue;
75 }
76 ss << ", " << copy_option.first << " ";
77 if (copy_option.second.size() == 1) {
78 WriteValueAsSQL(ss, val&: copy_option.second[0]);
79 } else {
80 // FIXME handle multiple options
81 throw NotImplementedException("FIXME: serialize list of options");
82 }
83 }
84 ss << ");" << std::endl;
85}
86
87//===--------------------------------------------------------------------===//
88// Source
89//===--------------------------------------------------------------------===//
90class ExportSourceState : public GlobalSourceState {
91public:
92 ExportSourceState() : finished(false) {
93 }
94
95 bool finished;
96};
97
98unique_ptr<GlobalSourceState> PhysicalExport::GetGlobalSourceState(ClientContext &context) const {
99 return make_uniq<ExportSourceState>();
100}
101
102SourceResultType PhysicalExport::GetData(ExecutionContext &context, DataChunk &chunk,
103 OperatorSourceInput &input) const {
104 auto &state = input.global_state.Cast<ExportSourceState>();
105 if (state.finished) {
106 return SourceResultType::FINISHED;
107 }
108
109 auto &ccontext = context.client;
110 auto &fs = FileSystem::GetFileSystem(context&: ccontext);
111
112 // gather all catalog types to export
113 vector<reference<CatalogEntry>> schemas;
114 vector<reference<CatalogEntry>> custom_types;
115 vector<reference<CatalogEntry>> sequences;
116 vector<reference<CatalogEntry>> tables;
117 vector<reference<CatalogEntry>> views;
118 vector<reference<CatalogEntry>> indexes;
119 vector<reference<CatalogEntry>> macros;
120
121 auto schema_list = Catalog::GetSchemas(context&: ccontext, catalog_name: info->catalog);
122 for (auto &schema_p : schema_list) {
123 auto &schema = schema_p.get();
124 if (!schema.internal) {
125 schemas.push_back(x: schema);
126 }
127 schema.Scan(context&: context.client, type: CatalogType::TABLE_ENTRY, callback: [&](CatalogEntry &entry) {
128 if (entry.internal) {
129 return;
130 }
131 if (entry.type != CatalogType::TABLE_ENTRY) {
132 views.push_back(x: entry);
133 }
134 });
135 schema.Scan(context&: context.client, type: CatalogType::SEQUENCE_ENTRY,
136 callback: [&](CatalogEntry &entry) { sequences.push_back(x: entry); });
137 schema.Scan(context&: context.client, type: CatalogType::TYPE_ENTRY,
138 callback: [&](CatalogEntry &entry) { custom_types.push_back(x: entry); });
139 schema.Scan(context&: context.client, type: CatalogType::INDEX_ENTRY, callback: [&](CatalogEntry &entry) { indexes.push_back(x: entry); });
140 schema.Scan(context&: context.client, type: CatalogType::MACRO_ENTRY, callback: [&](CatalogEntry &entry) {
141 if (!entry.internal && entry.type == CatalogType::MACRO_ENTRY) {
142 macros.push_back(x: entry);
143 }
144 });
145 schema.Scan(context&: context.client, type: CatalogType::TABLE_MACRO_ENTRY, callback: [&](CatalogEntry &entry) {
146 if (!entry.internal && entry.type == CatalogType::TABLE_MACRO_ENTRY) {
147 macros.push_back(x: entry);
148 }
149 });
150 }
151
152 // consider the order of tables because of foreign key constraint
153 for (idx_t i = 0; i < exported_tables.data.size(); i++) {
154 tables.push_back(x: exported_tables.data[i].entry);
155 }
156
157 // order macro's by timestamp so nested macro's are imported nicely
158 sort(first: macros.begin(), last: macros.end(), comp: [](const reference<CatalogEntry> &lhs, const reference<CatalogEntry> &rhs) {
159 return lhs.get().oid < rhs.get().oid;
160 });
161
162 // write the schema.sql file
163 // export order is SCHEMA -> SEQUENCE -> TABLE -> VIEW -> INDEX
164
165 stringstream ss;
166 WriteCatalogEntries(ss, entries&: schemas);
167 WriteCatalogEntries(ss, entries&: custom_types);
168 WriteCatalogEntries(ss, entries&: sequences);
169 WriteCatalogEntries(ss, entries&: tables);
170 WriteCatalogEntries(ss, entries&: views);
171 WriteCatalogEntries(ss, entries&: indexes);
172 WriteCatalogEntries(ss, entries&: macros);
173
174 WriteStringStreamToFile(fs, ss, path: fs.JoinPath(a: info->file_path, path: "schema.sql"));
175
176 // write the load.sql file
177 // for every table, we write COPY INTO statement with the specified options
178 stringstream load_ss;
179 for (idx_t i = 0; i < exported_tables.data.size(); i++) {
180 auto exported_table_info = exported_tables.data[i].table_data;
181 WriteCopyStatement(fs, ss&: load_ss, info&: *info, exported_table&: exported_table_info, function);
182 }
183 WriteStringStreamToFile(fs, ss&: load_ss, path: fs.JoinPath(a: info->file_path, path: "load.sql"));
184 state.finished = true;
185
186 return SourceResultType::FINISHED;
187}
188
189//===--------------------------------------------------------------------===//
190// Sink
191//===--------------------------------------------------------------------===//
192SinkResultType PhysicalExport::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const {
193 // nop
194 return SinkResultType::NEED_MORE_INPUT;
195}
196
197//===--------------------------------------------------------------------===//
198// Pipeline Construction
199//===--------------------------------------------------------------------===//
200void PhysicalExport::BuildPipelines(Pipeline &current, MetaPipeline &meta_pipeline) {
201 // EXPORT has an optional child
202 // we only need to schedule child pipelines if there is a child
203 auto &state = meta_pipeline.GetState();
204 state.SetPipelineSource(pipeline&: current, op&: *this);
205 if (children.empty()) {
206 return;
207 }
208 PhysicalOperator::BuildPipelines(current, meta_pipeline);
209}
210
211vector<const_reference<PhysicalOperator>> PhysicalExport::GetSources() const {
212 return {*this};
213}
214
215} // namespace duckdb
216