1#include "duckdb/catalog/catalog_entry/scalar_function_catalog_entry.hpp"
2#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
3#include "duckdb/execution/operator/schema/physical_create_table.hpp"
4#include "duckdb/execution/physical_plan_generator.hpp"
5#include "duckdb/parser/parsed_data/create_table_info.hpp"
6#include "duckdb/execution/operator/persistent/physical_insert.hpp"
7#include "duckdb/planner/expression/bound_function_expression.hpp"
8#include "duckdb/planner/operator/logical_create_table.hpp"
9#include "duckdb/main/config.hpp"
10#include "duckdb/execution/operator/persistent/physical_batch_insert.hpp"
11#include "duckdb/planner/constraints/bound_check_constraint.hpp"
12#include "duckdb/parallel/task_scheduler.hpp"
13#include "duckdb/catalog/duck_catalog.hpp"
14
15namespace duckdb {
16
17unique_ptr<PhysicalOperator> DuckCatalog::PlanCreateTableAs(ClientContext &context, LogicalCreateTable &op,
18 unique_ptr<PhysicalOperator> plan) {
19 bool parallel_streaming_insert = !PhysicalPlanGenerator::PreserveInsertionOrder(context, plan&: *plan);
20 bool use_batch_index = PhysicalPlanGenerator::UseBatchIndex(context, plan&: *plan);
21 auto num_threads = TaskScheduler::GetScheduler(context).NumberOfThreads();
22 unique_ptr<PhysicalOperator> create;
23 if (!parallel_streaming_insert && use_batch_index) {
24 create = make_uniq<PhysicalBatchInsert>(args&: op, args&: op.schema, args: std::move(op.info), args&: op.estimated_cardinality);
25
26 } else {
27 create = make_uniq<PhysicalInsert>(args&: op, args&: op.schema, args: std::move(op.info), args&: op.estimated_cardinality,
28 args: parallel_streaming_insert && num_threads > 1);
29 }
30
31 D_ASSERT(op.children.size() == 1);
32 create->children.push_back(x: std::move(plan));
33 return create;
34}
35
36unique_ptr<PhysicalOperator> PhysicalPlanGenerator::CreatePlan(LogicalCreateTable &op) {
37 const auto &create_info = op.info->base->Cast<CreateTableInfo>();
38 auto &catalog = op.info->schema.catalog;
39 auto existing_entry = catalog.GetEntry<TableCatalogEntry>(context, schema_name: create_info.schema, name: create_info.table,
40 if_not_found: OnEntryNotFound::RETURN_NULL);
41 bool replace = op.info->Base().on_conflict == OnCreateConflict::REPLACE_ON_CONFLICT;
42 if ((!existing_entry || replace) && !op.children.empty()) {
43 auto plan = CreatePlan(op&: *op.children[0]);
44 return op.schema.catalog.PlanCreateTableAs(context, op, plan: std::move(plan));
45 } else {
46 return make_uniq<PhysicalCreateTable>(args&: op, args&: op.schema, args: std::move(op.info), args&: op.estimated_cardinality);
47 }
48}
49
50} // namespace duckdb
51