1 | #include "duckdb/execution/operator/join/physical_nested_loop_join.hpp" |
2 | |
3 | #include "duckdb/common/operator/comparison_operators.hpp" |
4 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
5 | #include "duckdb/execution/expression_executor.hpp" |
6 | #include "duckdb/execution/nested_loop_join.hpp" |
7 | |
8 | using namespace std; |
9 | |
10 | namespace duckdb { |
11 | |
12 | class PhysicalNestedLoopJoinState : public PhysicalComparisonJoinState { |
13 | public: |
14 | PhysicalNestedLoopJoinState(PhysicalOperator *left, PhysicalOperator *right, vector<JoinCondition> &conditions) |
15 | : PhysicalComparisonJoinState(left, right, conditions), right_chunk(0), has_null(false), left_tuple(0), |
16 | right_tuple(0) { |
17 | } |
18 | |
19 | idx_t right_chunk; |
20 | DataChunk left_join_condition; |
21 | ChunkCollection right_data; |
22 | ChunkCollection right_chunks; |
23 | //! Whether or not the RHS of the nested loop join has NULL values |
24 | bool has_null; |
25 | |
26 | idx_t left_tuple; |
27 | idx_t right_tuple; |
28 | |
29 | unique_ptr<bool[]> left_found_match; |
30 | }; |
31 | |
32 | PhysicalNestedLoopJoin::PhysicalNestedLoopJoin(LogicalOperator &op, unique_ptr<PhysicalOperator> left, |
33 | unique_ptr<PhysicalOperator> right, vector<JoinCondition> cond, |
34 | JoinType join_type) |
35 | : PhysicalComparisonJoin(op, PhysicalOperatorType::NESTED_LOOP_JOIN, move(cond), join_type) { |
36 | children.push_back(move(left)); |
37 | children.push_back(move(right)); |
38 | } |
39 | |
40 | static bool HasNullValues(DataChunk &chunk) { |
41 | for (idx_t col_idx = 0; col_idx < chunk.column_count(); col_idx++) { |
42 | VectorData vdata; |
43 | chunk.data[col_idx].Orrify(chunk.size(), vdata); |
44 | |
45 | if (vdata.nullmask->none()) { |
46 | continue; |
47 | } |
48 | for (idx_t i = 0; i < chunk.size(); i++) { |
49 | auto idx = vdata.sel->get_index(i); |
50 | if ((*vdata.nullmask)[idx]) { |
51 | return true; |
52 | } |
53 | } |
54 | } |
55 | return false; |
56 | } |
57 | |
58 | template <bool MATCH> |
59 | void PhysicalJoin::ConstructSemiOrAntiJoinResult(DataChunk &left, DataChunk &result, bool found_match[]) { |
60 | assert(left.column_count() == result.column_count()); |
61 | // create the selection vector from the matches that were found |
62 | idx_t result_count = 0; |
63 | SelectionVector sel(STANDARD_VECTOR_SIZE); |
64 | for (idx_t i = 0; i < left.size(); i++) { |
65 | if (found_match[i] == MATCH) { |
66 | sel.set_index(result_count++, i); |
67 | } |
68 | } |
69 | // construct the final result |
70 | if (result_count > 0) { |
71 | // we only return the columns on the left side |
72 | // project them using the result selection vector |
73 | // reference the columns of the left side from the result |
74 | result.Slice(left, sel, result_count); |
75 | } else { |
76 | result.SetCardinality(0); |
77 | } |
78 | } |
79 | |
80 | void PhysicalJoin::ConstructMarkJoinResult(DataChunk &join_keys, DataChunk &left, DataChunk &result, bool found_match[], |
81 | bool has_null) { |
82 | // for the initial set of columns we just reference the left side |
83 | result.SetCardinality(left); |
84 | for (idx_t i = 0; i < left.column_count(); i++) { |
85 | result.data[i].Reference(left.data[i]); |
86 | } |
87 | auto &mark_vector = result.data.back(); |
88 | mark_vector.vector_type = VectorType::FLAT_VECTOR; |
89 | // first we set the NULL values from the join keys |
90 | // if there is any NULL in the keys, the result is NULL |
91 | auto bool_result = FlatVector::GetData<bool>(mark_vector); |
92 | auto &nullmask = FlatVector::Nullmask(mark_vector); |
93 | for (idx_t col_idx = 0; col_idx < join_keys.column_count(); col_idx++) { |
94 | VectorData jdata; |
95 | join_keys.data[col_idx].Orrify(join_keys.size(), jdata); |
96 | if (jdata.nullmask->any()) { |
97 | for (idx_t i = 0; i < join_keys.size(); i++) { |
98 | auto jidx = jdata.sel->get_index(i); |
99 | nullmask[i] = (*jdata.nullmask)[jidx]; |
100 | } |
101 | } |
102 | } |
103 | // now set the remaining entries to either true or false based on whether a match was found |
104 | if (found_match) { |
105 | for (idx_t i = 0; i < left.size(); i++) { |
106 | bool_result[i] = found_match[i]; |
107 | } |
108 | } else { |
109 | memset(bool_result, 0, sizeof(bool) * left.size()); |
110 | } |
111 | // if the right side contains NULL values, the result of any FALSE becomes NULL |
112 | if (has_null) { |
113 | for (idx_t i = 0; i < left.size(); i++) { |
114 | if (!bool_result[i]) { |
115 | nullmask[i] = true; |
116 | } |
117 | } |
118 | } |
119 | } |
120 | |
121 | void PhysicalNestedLoopJoin::GetChunkInternal(ClientContext &context, DataChunk &chunk, PhysicalOperatorState *state_) { |
122 | auto state = reinterpret_cast<PhysicalNestedLoopJoinState *>(state_); |
123 | |
124 | // first we fully materialize the right child, if we haven't done that yet |
125 | if (state->right_chunks.column_count() == 0) { |
126 | vector<TypeId> condition_types; |
127 | for (auto &cond : conditions) { |
128 | assert(cond.left->return_type == cond.right->return_type); |
129 | condition_types.push_back(cond.left->return_type); |
130 | } |
131 | |
132 | auto right_state = children[1]->GetOperatorState(); |
133 | auto types = children[1]->GetTypes(); |
134 | |
135 | DataChunk new_chunk, right_condition; |
136 | new_chunk.Initialize(types); |
137 | right_condition.Initialize(condition_types); |
138 | do { |
139 | children[1]->GetChunk(context, new_chunk, right_state.get()); |
140 | if (new_chunk.size() == 0) { |
141 | break; |
142 | } |
143 | // resolve the join expression of the right side |
144 | state->rhs_executor.Execute(new_chunk, right_condition); |
145 | |
146 | state->right_data.Append(new_chunk); |
147 | state->right_chunks.Append(right_condition); |
148 | } while (new_chunk.size() > 0); |
149 | |
150 | if (state->right_chunks.count == 0) { |
151 | if ((type == JoinType::INNER || type == JoinType::SEMI)) { |
152 | // empty RHS with INNER or SEMI join means empty result set |
153 | return; |
154 | } |
155 | } else { |
156 | // for the MARK join, we check if there are null values in any of the right chunks |
157 | if (type == JoinType::MARK) { |
158 | for (idx_t i = 0; i < state->right_chunks.chunks.size(); i++) { |
159 | if (HasNullValues(*state->right_chunks.chunks[i])) { |
160 | state->has_null = true; |
161 | } |
162 | } |
163 | } |
164 | // initialize the chunks for the join conditions |
165 | state->left_join_condition.Initialize(condition_types); |
166 | state->right_chunk = state->right_chunks.chunks.size() - 1; |
167 | state->right_tuple = state->right_chunks.chunks[state->right_chunk]->size(); |
168 | } |
169 | } |
170 | |
171 | if (state->right_chunks.count == 0) { |
172 | // empty join, switch on type |
173 | if (type == JoinType::MARK) { |
174 | // pull a chunk from the LHS |
175 | children[0]->GetChunk(context, state->child_chunk, state->child_state.get()); |
176 | if (state->child_chunk.size() == 0) { |
177 | return; |
178 | } |
179 | // RHS empty: set FOUND MATCh vector to false |
180 | chunk.Reference(state->child_chunk); |
181 | auto &mark_vector = chunk.data.back(); |
182 | mark_vector.vector_type = VectorType::CONSTANT_VECTOR; |
183 | mark_vector.SetValue(0, Value::BOOLEAN(false)); |
184 | } else if (type == JoinType::ANTI) { |
185 | // ANTI join, just pull chunk from RHS |
186 | children[0]->GetChunk(context, chunk, state->child_state.get()); |
187 | } else if (type == JoinType::LEFT) { |
188 | children[0]->GetChunk(context, state->child_chunk, state->child_state.get()); |
189 | if (state->child_chunk.size() == 0) { |
190 | return; |
191 | } |
192 | chunk.Reference(state->child_chunk); |
193 | for (idx_t idx = state->child_chunk.column_count(); idx < chunk.column_count(); idx++) { |
194 | chunk.data[idx].vector_type = VectorType::CONSTANT_VECTOR; |
195 | ConstantVector::SetNull(chunk.data[idx], true); |
196 | } |
197 | } else { |
198 | throw Exception("Unhandled type for empty NL join" ); |
199 | } |
200 | return; |
201 | } |
202 | if ((type == JoinType::INNER || type == JoinType::LEFT) && |
203 | state->right_chunk >= state->right_chunks.chunks.size()) { |
204 | return; |
205 | } |
206 | // now that we have fully materialized the right child |
207 | // we have to perform the nested loop join |
208 | do { |
209 | // first check if we have to move to the next child on the right isde |
210 | assert(state->right_chunk < state->right_chunks.chunks.size()); |
211 | if (state->right_tuple >= state->right_chunks.chunks[state->right_chunk]->size()) { |
212 | // we exhausted the chunk on the right |
213 | state->right_chunk++; |
214 | if (state->right_chunk >= state->right_chunks.chunks.size()) { |
215 | // we exhausted all right chunks! |
216 | // move to the next left chunk |
217 | do { |
218 | if (type == JoinType::LEFT) { |
219 | // left join: before we move to the next chunk, see if we need to output any vectors that didn't |
220 | // have a match found |
221 | if (state->left_found_match) { |
222 | SelectionVector remaining_sel(STANDARD_VECTOR_SIZE); |
223 | idx_t remaining_count = 0; |
224 | for (idx_t i = 0; i < state->child_chunk.size(); i++) { |
225 | if (!state->left_found_match[i]) { |
226 | remaining_sel.set_index(remaining_count++, i); |
227 | } |
228 | } |
229 | state->left_found_match.reset(); |
230 | chunk.Slice(state->child_chunk, remaining_sel, remaining_count); |
231 | for (idx_t idx = state->child_chunk.column_count(); idx < chunk.column_count(); idx++) { |
232 | chunk.data[idx].vector_type = VectorType::CONSTANT_VECTOR; |
233 | ConstantVector::SetNull(chunk.data[idx], true); |
234 | } |
235 | } else { |
236 | state->left_found_match = unique_ptr<bool[]>(new bool[STANDARD_VECTOR_SIZE]); |
237 | memset(state->left_found_match.get(), 0, sizeof(bool) * STANDARD_VECTOR_SIZE); |
238 | } |
239 | } |
240 | children[0]->GetChunk(context, state->child_chunk, state->child_state.get()); |
241 | if (state->child_chunk.size() == 0) { |
242 | return; |
243 | } |
244 | |
245 | // resolve the left join condition for the current chunk |
246 | state->lhs_executor.Execute(state->child_chunk, state->left_join_condition); |
247 | } while (state->left_join_condition.size() == 0); |
248 | |
249 | state->right_chunk = 0; |
250 | } |
251 | // move to the start of this chunk |
252 | state->left_tuple = 0; |
253 | state->right_tuple = 0; |
254 | } |
255 | |
256 | switch (type) { |
257 | case JoinType::SEMI: |
258 | case JoinType::ANTI: |
259 | case JoinType::MARK: { |
260 | // MARK, SEMI and ANTI joins are handled separately because they scan the whole RHS in one go |
261 | bool found_match[STANDARD_VECTOR_SIZE] = {false}; |
262 | NestedLoopJoinMark::Perform(state->left_join_condition, state->right_chunks, found_match, conditions); |
263 | if (type == JoinType::MARK) { |
264 | // now construct the mark join result from the found matches |
265 | PhysicalJoin::ConstructMarkJoinResult(state->left_join_condition, state->child_chunk, chunk, |
266 | found_match, state->has_null); |
267 | } else if (type == JoinType::SEMI) { |
268 | // construct the semi join result from the found matches |
269 | PhysicalJoin::ConstructSemiOrAntiJoinResult<true>(state->child_chunk, chunk, found_match); |
270 | } else if (type == JoinType::ANTI) { |
271 | PhysicalJoin::ConstructSemiOrAntiJoinResult<false>(state->child_chunk, chunk, found_match); |
272 | } |
273 | // move to the next LHS chunk in the next iteration |
274 | state->right_tuple = state->right_chunks.chunks[state->right_chunk]->size(); |
275 | state->right_chunk = state->right_chunks.chunks.size() - 1; |
276 | if (chunk.size() > 0) { |
277 | return; |
278 | } else { |
279 | continue; |
280 | } |
281 | } |
282 | default: |
283 | break; |
284 | } |
285 | |
286 | auto &left_chunk = state->child_chunk; |
287 | auto &right_chunk = *state->right_chunks.chunks[state->right_chunk]; |
288 | auto &right_data = *state->right_data.chunks[state->right_chunk]; |
289 | |
290 | // sanity check |
291 | left_chunk.Verify(); |
292 | right_chunk.Verify(); |
293 | right_data.Verify(); |
294 | |
295 | // now perform the join |
296 | switch (type) { |
297 | case JoinType::LEFT: |
298 | case JoinType::INNER: { |
299 | SelectionVector lvector(STANDARD_VECTOR_SIZE), rvector(STANDARD_VECTOR_SIZE); |
300 | idx_t match_count = |
301 | NestedLoopJoinInner::Perform(state->left_tuple, state->right_tuple, state->left_join_condition, |
302 | right_chunk, lvector, rvector, conditions); |
303 | // we have finished resolving the join conditions |
304 | if (match_count == 0) { |
305 | // if there are no results, move on |
306 | continue; |
307 | } |
308 | // we have matching tuples! |
309 | // construct the result |
310 | if (state->left_found_match) { |
311 | for (idx_t i = 0; i < match_count; i++) { |
312 | state->left_found_match[lvector.get_index(i)] = true; |
313 | } |
314 | } |
315 | chunk.Slice(state->child_chunk, lvector, match_count); |
316 | chunk.Slice(right_data, rvector, match_count, state->child_chunk.column_count()); |
317 | break; |
318 | } |
319 | default: |
320 | throw NotImplementedException("Unimplemented type for nested loop join!" ); |
321 | } |
322 | } while (chunk.size() == 0); |
323 | } |
324 | |
325 | unique_ptr<PhysicalOperatorState> PhysicalNestedLoopJoin::GetOperatorState() { |
326 | return make_unique<PhysicalNestedLoopJoinState>(children[0].get(), children[1].get(), conditions); |
327 | } |
328 | |
329 | } // namespace duckdb |
330 | |