1#include "duckdb/execution/operator/join/physical_positional_join.hpp"
2
3#include "duckdb/common/vector_operations/vector_operations.hpp"
4#include "duckdb/execution/operator/join/physical_join.hpp"
5
6namespace duckdb {
7
8PhysicalPositionalJoin::PhysicalPositionalJoin(vector<LogicalType> types, unique_ptr<PhysicalOperator> left,
9 unique_ptr<PhysicalOperator> right, idx_t estimated_cardinality)
10 : PhysicalOperator(PhysicalOperatorType::POSITIONAL_JOIN, std::move(types), estimated_cardinality) {
11 children.push_back(x: std::move(left));
12 children.push_back(x: std::move(right));
13}
14
15//===--------------------------------------------------------------------===//
16// Sink
17//===--------------------------------------------------------------------===//
18class PositionalJoinGlobalState : public GlobalSinkState {
19public:
20 explicit PositionalJoinGlobalState(ClientContext &context, const PhysicalPositionalJoin &op)
21 : rhs(context, op.children[1]->GetTypes()), initialized(false), source_offset(0), exhausted(false) {
22 rhs.InitializeAppend(state&: append_state);
23 }
24
25 ColumnDataCollection rhs;
26 ColumnDataAppendState append_state;
27 mutex rhs_lock;
28
29 bool initialized;
30 ColumnDataScanState scan_state;
31 DataChunk source;
32 idx_t source_offset;
33 bool exhausted;
34
35 void InitializeScan();
36 idx_t Refill();
37 idx_t CopyData(DataChunk &output, const idx_t count, const idx_t col_offset);
38 void Execute(DataChunk &input, DataChunk &output);
39 void GetData(DataChunk &output);
40};
41
42unique_ptr<GlobalSinkState> PhysicalPositionalJoin::GetGlobalSinkState(ClientContext &context) const {
43 return make_uniq<PositionalJoinGlobalState>(args&: context, args: *this);
44}
45
46SinkResultType PhysicalPositionalJoin::Sink(ExecutionContext &context, DataChunk &chunk,
47 OperatorSinkInput &input) const {
48 auto &sink = input.global_state.Cast<PositionalJoinGlobalState>();
49 lock_guard<mutex> client_guard(sink.rhs_lock);
50 sink.rhs.Append(state&: sink.append_state, new_chunk&: chunk);
51 return SinkResultType::NEED_MORE_INPUT;
52}
53
54//===--------------------------------------------------------------------===//
55// Operator
56//===--------------------------------------------------------------------===//
57void PositionalJoinGlobalState::InitializeScan() {
58 if (!initialized) {
59 // not initialized yet: initialize the scan
60 initialized = true;
61 rhs.InitializeScanChunk(chunk&: source);
62 rhs.InitializeScan(state&: scan_state);
63 }
64}
65
66idx_t PositionalJoinGlobalState::Refill() {
67 if (source_offset >= source.size()) {
68 if (!exhausted) {
69 source.Reset();
70 rhs.Scan(state&: scan_state, result&: source);
71 }
72 source_offset = 0;
73 }
74
75 const auto available = source.size() - source_offset;
76 if (!available) {
77 if (!exhausted) {
78 source.Reset();
79 for (idx_t i = 0; i < source.ColumnCount(); ++i) {
80 auto &vec = source.data[i];
81 vec.SetVectorType(VectorType::CONSTANT_VECTOR);
82 ConstantVector::SetNull(vector&: vec, is_null: true);
83 }
84 exhausted = true;
85 }
86 }
87
88 return available;
89}
90
91idx_t PositionalJoinGlobalState::CopyData(DataChunk &output, const idx_t count, const idx_t col_offset) {
92 if (!source_offset && (source.size() >= count || exhausted)) {
93 // Fast track: aligned and has enough data
94 for (idx_t i = 0; i < source.ColumnCount(); ++i) {
95 output.data[col_offset + i].Reference(other&: source.data[i]);
96 }
97 source_offset += count;
98 } else {
99 // Copy data
100 for (idx_t target_offset = 0; target_offset < count;) {
101 const auto needed = count - target_offset;
102 const auto available = exhausted ? needed : (source.size() - source_offset);
103 const auto copy_size = MinValue(a: needed, b: available);
104 const auto source_count = source_offset + copy_size;
105 for (idx_t i = 0; i < source.ColumnCount(); ++i) {
106 VectorOperations::Copy(source: source.data[i], target&: output.data[col_offset + i], source_count, source_offset,
107 target_offset);
108 }
109 target_offset += copy_size;
110 source_offset += copy_size;
111 Refill();
112 }
113 }
114
115 return source.ColumnCount();
116}
117
118void PositionalJoinGlobalState::Execute(DataChunk &input, DataChunk &output) {
119 lock_guard<mutex> client_guard(rhs_lock);
120
121 // Reference the input and assume it will be full
122 const auto col_offset = input.ColumnCount();
123 for (idx_t i = 0; i < col_offset; ++i) {
124 output.data[i].Reference(other&: input.data[i]);
125 }
126
127 // Copy or reference the RHS columns
128 const auto count = input.size();
129 InitializeScan();
130 Refill();
131 CopyData(output, count, col_offset);
132
133 output.SetCardinality(count);
134}
135
136OperatorResultType PhysicalPositionalJoin::Execute(ExecutionContext &context, DataChunk &input, DataChunk &chunk,
137 GlobalOperatorState &gstate, OperatorState &state_p) const {
138 auto &sink = sink_state->Cast<PositionalJoinGlobalState>();
139 sink.Execute(input, output&: chunk);
140 return OperatorResultType::NEED_MORE_INPUT;
141}
142
143//===--------------------------------------------------------------------===//
144// Source
145//===--------------------------------------------------------------------===//
146void PositionalJoinGlobalState::GetData(DataChunk &output) {
147 lock_guard<mutex> client_guard(rhs_lock);
148
149 InitializeScan();
150 Refill();
151
152 // LHS exhausted
153 if (exhausted) {
154 // RHS exhausted too, so we are done
155 output.SetCardinality(0);
156 return;
157 }
158
159 // LHS is all NULL
160 const auto col_offset = output.ColumnCount() - source.ColumnCount();
161 for (idx_t i = 0; i < col_offset; ++i) {
162 auto &vec = output.data[i];
163 vec.SetVectorType(VectorType::CONSTANT_VECTOR);
164 ConstantVector::SetNull(vector&: vec, is_null: true);
165 }
166
167 // RHS still has data, so copy it
168 const auto count = MinValue<idx_t>(STANDARD_VECTOR_SIZE, b: source.size() - source_offset);
169 CopyData(output, count, col_offset);
170 output.SetCardinality(count);
171}
172
173SourceResultType PhysicalPositionalJoin::GetData(ExecutionContext &context, DataChunk &result,
174 OperatorSourceInput &input) const {
175 auto &sink = sink_state->Cast<PositionalJoinGlobalState>();
176 sink.GetData(output&: result);
177
178 return result.size() == 0 ? SourceResultType::FINISHED : SourceResultType::HAVE_MORE_OUTPUT;
179}
180
181//===--------------------------------------------------------------------===//
182// Pipeline Construction
183//===--------------------------------------------------------------------===//
184void PhysicalPositionalJoin::BuildPipelines(Pipeline &current, MetaPipeline &meta_pipeline) {
185 PhysicalJoin::BuildJoinPipelines(current, meta_pipeline, op&: *this);
186}
187
188vector<const_reference<PhysicalOperator>> PhysicalPositionalJoin::GetSources() const {
189 auto result = children[0]->GetSources();
190 if (IsSource()) {
191 result.push_back(x: *this);
192 }
193 return result;
194}
195
196} // namespace duckdb
197