1 | #include "duckdb/execution/operator/join/physical_positional_join.hpp" |
2 | |
3 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
4 | #include "duckdb/execution/operator/join/physical_join.hpp" |
5 | |
6 | namespace duckdb { |
7 | |
8 | PhysicalPositionalJoin::PhysicalPositionalJoin(vector<LogicalType> types, unique_ptr<PhysicalOperator> left, |
9 | unique_ptr<PhysicalOperator> right, idx_t estimated_cardinality) |
10 | : PhysicalOperator(PhysicalOperatorType::POSITIONAL_JOIN, std::move(types), estimated_cardinality) { |
11 | children.push_back(x: std::move(left)); |
12 | children.push_back(x: std::move(right)); |
13 | } |
14 | |
//===--------------------------------------------------------------------===//
// Sink
//===--------------------------------------------------------------------===//
18 | class PositionalJoinGlobalState : public GlobalSinkState { |
19 | public: |
20 | explicit PositionalJoinGlobalState(ClientContext &context, const PhysicalPositionalJoin &op) |
21 | : rhs(context, op.children[1]->GetTypes()), initialized(false), source_offset(0), exhausted(false) { |
22 | rhs.InitializeAppend(state&: append_state); |
23 | } |
24 | |
25 | ColumnDataCollection rhs; |
26 | ColumnDataAppendState append_state; |
27 | mutex rhs_lock; |
28 | |
29 | bool initialized; |
30 | ColumnDataScanState scan_state; |
31 | DataChunk source; |
32 | idx_t source_offset; |
33 | bool exhausted; |
34 | |
35 | void InitializeScan(); |
36 | idx_t Refill(); |
37 | idx_t CopyData(DataChunk &output, const idx_t count, const idx_t col_offset); |
38 | void Execute(DataChunk &input, DataChunk &output); |
39 | void GetData(DataChunk &output); |
40 | }; |
41 | |
42 | unique_ptr<GlobalSinkState> PhysicalPositionalJoin::GetGlobalSinkState(ClientContext &context) const { |
43 | return make_uniq<PositionalJoinGlobalState>(args&: context, args: *this); |
44 | } |
45 | |
46 | SinkResultType PhysicalPositionalJoin::Sink(ExecutionContext &context, DataChunk &chunk, |
47 | OperatorSinkInput &input) const { |
48 | auto &sink = input.global_state.Cast<PositionalJoinGlobalState>(); |
49 | lock_guard<mutex> client_guard(sink.rhs_lock); |
50 | sink.rhs.Append(state&: sink.append_state, new_chunk&: chunk); |
51 | return SinkResultType::NEED_MORE_INPUT; |
52 | } |
53 | |
//===--------------------------------------------------------------------===//
// Operator
//===--------------------------------------------------------------------===//
57 | void PositionalJoinGlobalState::InitializeScan() { |
58 | if (!initialized) { |
59 | // not initialized yet: initialize the scan |
60 | initialized = true; |
61 | rhs.InitializeScanChunk(chunk&: source); |
62 | rhs.InitializeScan(state&: scan_state); |
63 | } |
64 | } |
65 | |
66 | idx_t PositionalJoinGlobalState::Refill() { |
67 | if (source_offset >= source.size()) { |
68 | if (!exhausted) { |
69 | source.Reset(); |
70 | rhs.Scan(state&: scan_state, result&: source); |
71 | } |
72 | source_offset = 0; |
73 | } |
74 | |
75 | const auto available = source.size() - source_offset; |
76 | if (!available) { |
77 | if (!exhausted) { |
78 | source.Reset(); |
79 | for (idx_t i = 0; i < source.ColumnCount(); ++i) { |
80 | auto &vec = source.data[i]; |
81 | vec.SetVectorType(VectorType::CONSTANT_VECTOR); |
82 | ConstantVector::SetNull(vector&: vec, is_null: true); |
83 | } |
84 | exhausted = true; |
85 | } |
86 | } |
87 | |
88 | return available; |
89 | } |
90 | |
91 | idx_t PositionalJoinGlobalState::CopyData(DataChunk &output, const idx_t count, const idx_t col_offset) { |
92 | if (!source_offset && (source.size() >= count || exhausted)) { |
93 | // Fast track: aligned and has enough data |
94 | for (idx_t i = 0; i < source.ColumnCount(); ++i) { |
95 | output.data[col_offset + i].Reference(other&: source.data[i]); |
96 | } |
97 | source_offset += count; |
98 | } else { |
99 | // Copy data |
100 | for (idx_t target_offset = 0; target_offset < count;) { |
101 | const auto needed = count - target_offset; |
102 | const auto available = exhausted ? needed : (source.size() - source_offset); |
103 | const auto copy_size = MinValue(a: needed, b: available); |
104 | const auto source_count = source_offset + copy_size; |
105 | for (idx_t i = 0; i < source.ColumnCount(); ++i) { |
106 | VectorOperations::Copy(source: source.data[i], target&: output.data[col_offset + i], source_count, source_offset, |
107 | target_offset); |
108 | } |
109 | target_offset += copy_size; |
110 | source_offset += copy_size; |
111 | Refill(); |
112 | } |
113 | } |
114 | |
115 | return source.ColumnCount(); |
116 | } |
117 | |
118 | void PositionalJoinGlobalState::Execute(DataChunk &input, DataChunk &output) { |
119 | lock_guard<mutex> client_guard(rhs_lock); |
120 | |
121 | // Reference the input and assume it will be full |
122 | const auto col_offset = input.ColumnCount(); |
123 | for (idx_t i = 0; i < col_offset; ++i) { |
124 | output.data[i].Reference(other&: input.data[i]); |
125 | } |
126 | |
127 | // Copy or reference the RHS columns |
128 | const auto count = input.size(); |
129 | InitializeScan(); |
130 | Refill(); |
131 | CopyData(output, count, col_offset); |
132 | |
133 | output.SetCardinality(count); |
134 | } |
135 | |
136 | OperatorResultType PhysicalPositionalJoin::Execute(ExecutionContext &context, DataChunk &input, DataChunk &chunk, |
137 | GlobalOperatorState &gstate, OperatorState &state_p) const { |
138 | auto &sink = sink_state->Cast<PositionalJoinGlobalState>(); |
139 | sink.Execute(input, output&: chunk); |
140 | return OperatorResultType::NEED_MORE_INPUT; |
141 | } |
142 | |
//===--------------------------------------------------------------------===//
// Source
//===--------------------------------------------------------------------===//
146 | void PositionalJoinGlobalState::GetData(DataChunk &output) { |
147 | lock_guard<mutex> client_guard(rhs_lock); |
148 | |
149 | InitializeScan(); |
150 | Refill(); |
151 | |
152 | // LHS exhausted |
153 | if (exhausted) { |
154 | // RHS exhausted too, so we are done |
155 | output.SetCardinality(0); |
156 | return; |
157 | } |
158 | |
159 | // LHS is all NULL |
160 | const auto col_offset = output.ColumnCount() - source.ColumnCount(); |
161 | for (idx_t i = 0; i < col_offset; ++i) { |
162 | auto &vec = output.data[i]; |
163 | vec.SetVectorType(VectorType::CONSTANT_VECTOR); |
164 | ConstantVector::SetNull(vector&: vec, is_null: true); |
165 | } |
166 | |
167 | // RHS still has data, so copy it |
168 | const auto count = MinValue<idx_t>(STANDARD_VECTOR_SIZE, b: source.size() - source_offset); |
169 | CopyData(output, count, col_offset); |
170 | output.SetCardinality(count); |
171 | } |
172 | |
173 | SourceResultType PhysicalPositionalJoin::GetData(ExecutionContext &context, DataChunk &result, |
174 | OperatorSourceInput &input) const { |
175 | auto &sink = sink_state->Cast<PositionalJoinGlobalState>(); |
176 | sink.GetData(output&: result); |
177 | |
178 | return result.size() == 0 ? SourceResultType::FINISHED : SourceResultType::HAVE_MORE_OUTPUT; |
179 | } |
180 | |
//===--------------------------------------------------------------------===//
// Pipeline Construction
//===--------------------------------------------------------------------===//
184 | void PhysicalPositionalJoin::BuildPipelines(Pipeline ¤t, MetaPipeline &meta_pipeline) { |
185 | PhysicalJoin::BuildJoinPipelines(current, meta_pipeline, op&: *this); |
186 | } |
187 | |
188 | vector<const_reference<PhysicalOperator>> PhysicalPositionalJoin::GetSources() const { |
189 | auto result = children[0]->GetSources(); |
190 | if (IsSource()) { |
191 | result.push_back(x: *this); |
192 | } |
193 | return result; |
194 | } |
195 | |
196 | } // namespace duckdb |
197 | |