1#include "duckdb/execution/operator/scan/physical_positional_scan.hpp"
2
3#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
4#include "duckdb/common/string_util.hpp"
5#include "duckdb/parallel/interrupt.hpp"
6#include "duckdb/planner/expression/bound_conjunction_expression.hpp"
7#include "duckdb/transaction/transaction.hpp"
8
9#include <utility>
10
11namespace duckdb {
12
13PhysicalPositionalScan::PhysicalPositionalScan(vector<LogicalType> types, unique_ptr<PhysicalOperator> left,
14 unique_ptr<PhysicalOperator> right)
15 : PhysicalOperator(PhysicalOperatorType::POSITIONAL_SCAN, std::move(types),
16 MaxValue(a: left->estimated_cardinality, b: right->estimated_cardinality)) {
17
18 // Manage the children ourselves
19 if (left->type == PhysicalOperatorType::TABLE_SCAN) {
20 child_tables.emplace_back(args: std::move(left));
21 } else if (left->type == PhysicalOperatorType::POSITIONAL_SCAN) {
22 auto &left_scan = left->Cast<PhysicalPositionalScan>();
23 child_tables = std::move(left_scan.child_tables);
24 } else {
25 throw InternalException("Invalid left input for PhysicalPositionalScan");
26 }
27
28 if (right->type == PhysicalOperatorType::TABLE_SCAN) {
29 child_tables.emplace_back(args: std::move(right));
30 } else if (right->type == PhysicalOperatorType::POSITIONAL_SCAN) {
31 auto &right_scan = right->Cast<PhysicalPositionalScan>();
32 auto &right_tables = right_scan.child_tables;
33 child_tables.reserve(n: child_tables.size() + right_tables.size());
34 std::move(first: right_tables.begin(), last: right_tables.end(), result: std::back_inserter(x&: child_tables));
35 } else {
36 throw InternalException("Invalid right input for PhysicalPositionalScan");
37 }
38}
39
40class PositionalScanGlobalSourceState : public GlobalSourceState {
41public:
42 PositionalScanGlobalSourceState(ClientContext &context, const PhysicalPositionalScan &op) {
43 for (const auto &table : op.child_tables) {
44 global_states.emplace_back(args: table->GetGlobalSourceState(context));
45 }
46 }
47
48 vector<unique_ptr<GlobalSourceState>> global_states;
49
50 idx_t MaxThreads() override {
51 return 1;
52 }
53};
54
55class PositionalTableScanner {
56public:
57 PositionalTableScanner(ExecutionContext &context, PhysicalOperator &table_p, GlobalSourceState &gstate_p)
58 : table(table_p), global_state(gstate_p), source_offset(0), exhausted(false) {
59 local_state = table.GetLocalSourceState(context, gstate&: gstate_p);
60 source.Initialize(allocator&: Allocator::Get(context&: context.client), types: table.types);
61 }
62
63 idx_t Refill(ExecutionContext &context) {
64 if (source_offset >= source.size()) {
65 if (!exhausted) {
66 source.Reset();
67
68 InterruptState interrupt_state;
69 OperatorSourceInput source_input {.global_state: global_state, .local_state: *local_state, .interrupt_state: interrupt_state};
70 auto source_result = table.GetData(context, chunk&: source, input&: source_input);
71 if (source_result == SourceResultType::BLOCKED) {
72 throw NotImplementedException(
73 "Unexpected interrupt from table Source in PositionalTableScanner refill");
74 }
75 }
76 source_offset = 0;
77 }
78
79 const auto available = source.size() - source_offset;
80 if (!available) {
81 if (!exhausted) {
82 source.Reset();
83 for (idx_t i = 0; i < source.ColumnCount(); ++i) {
84 auto &vec = source.data[i];
85 vec.SetVectorType(VectorType::CONSTANT_VECTOR);
86 ConstantVector::SetNull(vector&: vec, is_null: true);
87 }
88 exhausted = true;
89 }
90 }
91
92 return available;
93 }
94
95 idx_t CopyData(ExecutionContext &context, DataChunk &output, const idx_t count, const idx_t col_offset) {
96 if (!source_offset && (source.size() >= count || exhausted)) {
97 // Fast track: aligned and has enough data
98 for (idx_t i = 0; i < source.ColumnCount(); ++i) {
99 output.data[col_offset + i].Reference(other&: source.data[i]);
100 }
101 source_offset += count;
102 } else {
103 // Copy data
104 for (idx_t target_offset = 0; target_offset < count;) {
105 const auto needed = count - target_offset;
106 const auto available = exhausted ? needed : (source.size() - source_offset);
107 const auto copy_size = MinValue(a: needed, b: available);
108 const auto source_count = source_offset + copy_size;
109 for (idx_t i = 0; i < source.ColumnCount(); ++i) {
110 VectorOperations::Copy(source: source.data[i], target&: output.data[col_offset + i], source_count, source_offset,
111 target_offset);
112 }
113 target_offset += copy_size;
114 source_offset += copy_size;
115 Refill(context);
116 }
117 }
118
119 return source.ColumnCount();
120 }
121
122 double GetProgress(ClientContext &context) {
123 return table.GetProgress(context, gstate&: global_state);
124 }
125
126 PhysicalOperator &table;
127 GlobalSourceState &global_state;
128 unique_ptr<LocalSourceState> local_state;
129 DataChunk source;
130 idx_t source_offset;
131 bool exhausted;
132};
133
134class PositionalScanLocalSourceState : public LocalSourceState {
135public:
136 PositionalScanLocalSourceState(ExecutionContext &context, PositionalScanGlobalSourceState &gstate,
137 const PhysicalPositionalScan &op) {
138 for (size_t i = 0; i < op.child_tables.size(); ++i) {
139 auto &child = *op.child_tables[i];
140 auto &global_state = *gstate.global_states[i];
141 scanners.emplace_back(args: make_uniq<PositionalTableScanner>(args&: context, args&: child, args&: global_state));
142 }
143 }
144
145 vector<unique_ptr<PositionalTableScanner>> scanners;
146};
147
148unique_ptr<LocalSourceState> PhysicalPositionalScan::GetLocalSourceState(ExecutionContext &context,
149 GlobalSourceState &gstate) const {
150 return make_uniq<PositionalScanLocalSourceState>(args&: context, args&: gstate.Cast<PositionalScanGlobalSourceState>(), args: *this);
151}
152
153unique_ptr<GlobalSourceState> PhysicalPositionalScan::GetGlobalSourceState(ClientContext &context) const {
154 return make_uniq<PositionalScanGlobalSourceState>(args&: context, args: *this);
155}
156
157SourceResultType PhysicalPositionalScan::GetData(ExecutionContext &context, DataChunk &output,
158 OperatorSourceInput &input) const {
159 auto &lstate = input.local_state.Cast<PositionalScanLocalSourceState>();
160
161 // Find the longest source block
162 idx_t count = 0;
163 for (auto &scanner : lstate.scanners) {
164 count = MaxValue(a: count, b: scanner->Refill(context));
165 }
166
167 // All done?
168 if (!count) {
169 return SourceResultType::FINISHED;
170 }
171
172 // Copy or reference the source columns
173 idx_t col_offset = 0;
174 for (auto &scanner : lstate.scanners) {
175 col_offset += scanner->CopyData(context, output, count, col_offset);
176 }
177
178 output.SetCardinality(count);
179 return SourceResultType::HAVE_MORE_OUTPUT;
180}
181
182double PhysicalPositionalScan::GetProgress(ClientContext &context, GlobalSourceState &gstate_p) const {
183 auto &gstate = gstate_p.Cast<PositionalScanGlobalSourceState>();
184
185 double result = child_tables[0]->GetProgress(context, gstate&: *gstate.global_states[0]);
186 for (size_t t = 1; t < child_tables.size(); ++t) {
187 result = MinValue(a: result, b: child_tables[t]->GetProgress(context, gstate&: *gstate.global_states[t]));
188 }
189
190 return result;
191}
192
193bool PhysicalPositionalScan::Equals(const PhysicalOperator &other_p) const {
194 if (type != other_p.type) {
195 return false;
196 }
197
198 auto &other = other_p.Cast<PhysicalPositionalScan>();
199 if (child_tables.size() != other.child_tables.size()) {
200 return false;
201 }
202 for (size_t i = 0; i < child_tables.size(); ++i) {
203 if (!child_tables[i]->Equals(other: *other.child_tables[i])) {
204 return false;
205 }
206 }
207
208 return true;
209}
210
211} // namespace duckdb
212