1#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
2#include "duckdb/execution/operator/persistent/physical_insert.hpp"
3#include "duckdb/execution/physical_plan_generator.hpp"
4#include "duckdb/planner/operator/logical_insert.hpp"
5#include "duckdb/storage/data_table.hpp"
6#include "duckdb/main/config.hpp"
7#include "duckdb/execution/operator/persistent/physical_batch_insert.hpp"
8#include "duckdb/parallel/task_scheduler.hpp"
9#include "duckdb/catalog/duck_catalog.hpp"
10
11namespace duckdb {
12
13static OrderPreservationType OrderPreservationRecursive(PhysicalOperator &op) {
14 if (op.IsSource()) {
15 return op.SourceOrder();
16 }
17 for (auto &child : op.children) {
18 auto child_preservation = OrderPreservationRecursive(op&: *child);
19 if (child_preservation != OrderPreservationType::INSERTION_ORDER) {
20 return child_preservation;
21 }
22 }
23 return OrderPreservationType::INSERTION_ORDER;
24}
25
26bool PhysicalPlanGenerator::PreserveInsertionOrder(ClientContext &context, PhysicalOperator &plan) {
27 auto &config = DBConfig::GetConfig(context);
28
29 auto preservation_type = OrderPreservationRecursive(op&: plan);
30 if (preservation_type == OrderPreservationType::FIXED_ORDER) {
31 // always need to maintain preservation order
32 return true;
33 }
34 if (preservation_type == OrderPreservationType::NO_ORDER) {
35 // never need to preserve order
36 return false;
37 }
38 // preserve insertion order - check flags
39 if (!config.options.preserve_insertion_order) {
40 // preserving insertion order is disabled by config
41 return false;
42 }
43 return true;
44}
45
46bool PhysicalPlanGenerator::PreserveInsertionOrder(PhysicalOperator &plan) {
47 return PreserveInsertionOrder(context, plan);
48}
49
50bool PhysicalPlanGenerator::UseBatchIndex(ClientContext &context, PhysicalOperator &plan) {
51 // TODO: always preserve order if query contains ORDER BY
52 auto &scheduler = TaskScheduler::GetScheduler(context);
53 if (scheduler.NumberOfThreads() == 1) {
54 // batch index usage only makes sense if we are using multiple threads
55 return false;
56 }
57 if (!plan.AllSourcesSupportBatchIndex()) {
58 // batch index is not supported
59 return false;
60 }
61 return true;
62}
63
64bool PhysicalPlanGenerator::UseBatchIndex(PhysicalOperator &plan) {
65 return UseBatchIndex(context, plan);
66}
67
68unique_ptr<PhysicalOperator> DuckCatalog::PlanInsert(ClientContext &context, LogicalInsert &op,
69 unique_ptr<PhysicalOperator> plan) {
70 bool parallel_streaming_insert = !PhysicalPlanGenerator::PreserveInsertionOrder(context, plan&: *plan);
71 bool use_batch_index = PhysicalPlanGenerator::UseBatchIndex(context, plan&: *plan);
72 auto num_threads = TaskScheduler::GetScheduler(context).NumberOfThreads();
73 if (op.return_chunk) {
74 // not supported for RETURNING (yet?)
75 parallel_streaming_insert = false;
76 use_batch_index = false;
77 }
78 if (op.action_type != OnConflictAction::THROW) {
79 // We don't support ON CONFLICT clause in batch insertion operation currently
80 use_batch_index = false;
81 }
82 if (op.action_type == OnConflictAction::UPDATE) {
83 // When we potentially need to perform updates, we have to check that row is not updated twice
84 // that currently needs to be done for every chunk, which would add a huge bottleneck to parallelized insertion
85 parallel_streaming_insert = false;
86 }
87 unique_ptr<PhysicalOperator> insert;
88 if (use_batch_index && !parallel_streaming_insert) {
89 insert = make_uniq<PhysicalBatchInsert>(args&: op.types, args&: op.table, args&: op.column_index_map, args: std::move(op.bound_defaults),
90 args&: op.estimated_cardinality);
91 } else {
92 insert = make_uniq<PhysicalInsert>(
93 args&: op.types, args&: op.table, args&: op.column_index_map, args: std::move(op.bound_defaults), args: std::move(op.expressions),
94 args: std::move(op.set_columns), args: std::move(op.set_types), args&: op.estimated_cardinality, args&: op.return_chunk,
95 args: parallel_streaming_insert && num_threads > 1, args&: op.action_type, args: std::move(op.on_conflict_condition),
96 args: std::move(op.do_update_condition), args: std::move(op.on_conflict_filter), args: std::move(op.columns_to_fetch));
97 }
98 D_ASSERT(plan);
99 insert->children.push_back(x: std::move(plan));
100 return insert;
101}
102
103unique_ptr<PhysicalOperator> PhysicalPlanGenerator::CreatePlan(LogicalInsert &op) {
104 unique_ptr<PhysicalOperator> plan;
105 if (!op.children.empty()) {
106 D_ASSERT(op.children.size() == 1);
107 plan = CreatePlan(op&: *op.children[0]);
108 }
109 dependencies.AddDependency(entry&: op.table);
110 return op.table.catalog.PlanInsert(context, op, plan: std::move(plan));
111}
112
113} // namespace duckdb
114