1#include "duckdb/execution/operator/join/physical_nested_loop_join.hpp"
2
3#include "duckdb/common/operator/comparison_operators.hpp"
4#include "duckdb/common/vector_operations/vector_operations.hpp"
5#include "duckdb/execution/expression_executor.hpp"
6#include "duckdb/execution/nested_loop_join.hpp"
7
8using namespace std;
9
10namespace duckdb {
11
12class PhysicalNestedLoopJoinState : public PhysicalComparisonJoinState {
13public:
14 PhysicalNestedLoopJoinState(PhysicalOperator *left, PhysicalOperator *right, vector<JoinCondition> &conditions)
15 : PhysicalComparisonJoinState(left, right, conditions), right_chunk(0), has_null(false), left_tuple(0),
16 right_tuple(0) {
17 }
18
19 idx_t right_chunk;
20 DataChunk left_join_condition;
21 ChunkCollection right_data;
22 ChunkCollection right_chunks;
23 //! Whether or not the RHS of the nested loop join has NULL values
24 bool has_null;
25
26 idx_t left_tuple;
27 idx_t right_tuple;
28
29 unique_ptr<bool[]> left_found_match;
30};
31
32PhysicalNestedLoopJoin::PhysicalNestedLoopJoin(LogicalOperator &op, unique_ptr<PhysicalOperator> left,
33 unique_ptr<PhysicalOperator> right, vector<JoinCondition> cond,
34 JoinType join_type)
35 : PhysicalComparisonJoin(op, PhysicalOperatorType::NESTED_LOOP_JOIN, move(cond), join_type) {
36 children.push_back(move(left));
37 children.push_back(move(right));
38}
39
40static bool HasNullValues(DataChunk &chunk) {
41 for (idx_t col_idx = 0; col_idx < chunk.column_count(); col_idx++) {
42 VectorData vdata;
43 chunk.data[col_idx].Orrify(chunk.size(), vdata);
44
45 if (vdata.nullmask->none()) {
46 continue;
47 }
48 for (idx_t i = 0; i < chunk.size(); i++) {
49 auto idx = vdata.sel->get_index(i);
50 if ((*vdata.nullmask)[idx]) {
51 return true;
52 }
53 }
54 }
55 return false;
56}
57
58template <bool MATCH>
59void PhysicalJoin::ConstructSemiOrAntiJoinResult(DataChunk &left, DataChunk &result, bool found_match[]) {
60 assert(left.column_count() == result.column_count());
61 // create the selection vector from the matches that were found
62 idx_t result_count = 0;
63 SelectionVector sel(STANDARD_VECTOR_SIZE);
64 for (idx_t i = 0; i < left.size(); i++) {
65 if (found_match[i] == MATCH) {
66 sel.set_index(result_count++, i);
67 }
68 }
69 // construct the final result
70 if (result_count > 0) {
71 // we only return the columns on the left side
72 // project them using the result selection vector
73 // reference the columns of the left side from the result
74 result.Slice(left, sel, result_count);
75 } else {
76 result.SetCardinality(0);
77 }
78}
79
80void PhysicalJoin::ConstructMarkJoinResult(DataChunk &join_keys, DataChunk &left, DataChunk &result, bool found_match[],
81 bool has_null) {
82 // for the initial set of columns we just reference the left side
83 result.SetCardinality(left);
84 for (idx_t i = 0; i < left.column_count(); i++) {
85 result.data[i].Reference(left.data[i]);
86 }
87 auto &mark_vector = result.data.back();
88 mark_vector.vector_type = VectorType::FLAT_VECTOR;
89 // first we set the NULL values from the join keys
90 // if there is any NULL in the keys, the result is NULL
91 auto bool_result = FlatVector::GetData<bool>(mark_vector);
92 auto &nullmask = FlatVector::Nullmask(mark_vector);
93 for (idx_t col_idx = 0; col_idx < join_keys.column_count(); col_idx++) {
94 VectorData jdata;
95 join_keys.data[col_idx].Orrify(join_keys.size(), jdata);
96 if (jdata.nullmask->any()) {
97 for (idx_t i = 0; i < join_keys.size(); i++) {
98 auto jidx = jdata.sel->get_index(i);
99 nullmask[i] = (*jdata.nullmask)[jidx];
100 }
101 }
102 }
103 // now set the remaining entries to either true or false based on whether a match was found
104 if (found_match) {
105 for (idx_t i = 0; i < left.size(); i++) {
106 bool_result[i] = found_match[i];
107 }
108 } else {
109 memset(bool_result, 0, sizeof(bool) * left.size());
110 }
111 // if the right side contains NULL values, the result of any FALSE becomes NULL
112 if (has_null) {
113 for (idx_t i = 0; i < left.size(); i++) {
114 if (!bool_result[i]) {
115 nullmask[i] = true;
116 }
117 }
118 }
119}
120
121void PhysicalNestedLoopJoin::GetChunkInternal(ClientContext &context, DataChunk &chunk, PhysicalOperatorState *state_) {
122 auto state = reinterpret_cast<PhysicalNestedLoopJoinState *>(state_);
123
124 // first we fully materialize the right child, if we haven't done that yet
125 if (state->right_chunks.column_count() == 0) {
126 vector<TypeId> condition_types;
127 for (auto &cond : conditions) {
128 assert(cond.left->return_type == cond.right->return_type);
129 condition_types.push_back(cond.left->return_type);
130 }
131
132 auto right_state = children[1]->GetOperatorState();
133 auto types = children[1]->GetTypes();
134
135 DataChunk new_chunk, right_condition;
136 new_chunk.Initialize(types);
137 right_condition.Initialize(condition_types);
138 do {
139 children[1]->GetChunk(context, new_chunk, right_state.get());
140 if (new_chunk.size() == 0) {
141 break;
142 }
143 // resolve the join expression of the right side
144 state->rhs_executor.Execute(new_chunk, right_condition);
145
146 state->right_data.Append(new_chunk);
147 state->right_chunks.Append(right_condition);
148 } while (new_chunk.size() > 0);
149
150 if (state->right_chunks.count == 0) {
151 if ((type == JoinType::INNER || type == JoinType::SEMI)) {
152 // empty RHS with INNER or SEMI join means empty result set
153 return;
154 }
155 } else {
156 // for the MARK join, we check if there are null values in any of the right chunks
157 if (type == JoinType::MARK) {
158 for (idx_t i = 0; i < state->right_chunks.chunks.size(); i++) {
159 if (HasNullValues(*state->right_chunks.chunks[i])) {
160 state->has_null = true;
161 }
162 }
163 }
164 // initialize the chunks for the join conditions
165 state->left_join_condition.Initialize(condition_types);
166 state->right_chunk = state->right_chunks.chunks.size() - 1;
167 state->right_tuple = state->right_chunks.chunks[state->right_chunk]->size();
168 }
169 }
170
171 if (state->right_chunks.count == 0) {
172 // empty join, switch on type
173 if (type == JoinType::MARK) {
174 // pull a chunk from the LHS
175 children[0]->GetChunk(context, state->child_chunk, state->child_state.get());
176 if (state->child_chunk.size() == 0) {
177 return;
178 }
179 // RHS empty: set FOUND MATCh vector to false
180 chunk.Reference(state->child_chunk);
181 auto &mark_vector = chunk.data.back();
182 mark_vector.vector_type = VectorType::CONSTANT_VECTOR;
183 mark_vector.SetValue(0, Value::BOOLEAN(false));
184 } else if (type == JoinType::ANTI) {
185 // ANTI join, just pull chunk from RHS
186 children[0]->GetChunk(context, chunk, state->child_state.get());
187 } else if (type == JoinType::LEFT) {
188 children[0]->GetChunk(context, state->child_chunk, state->child_state.get());
189 if (state->child_chunk.size() == 0) {
190 return;
191 }
192 chunk.Reference(state->child_chunk);
193 for (idx_t idx = state->child_chunk.column_count(); idx < chunk.column_count(); idx++) {
194 chunk.data[idx].vector_type = VectorType::CONSTANT_VECTOR;
195 ConstantVector::SetNull(chunk.data[idx], true);
196 }
197 } else {
198 throw Exception("Unhandled type for empty NL join");
199 }
200 return;
201 }
202 if ((type == JoinType::INNER || type == JoinType::LEFT) &&
203 state->right_chunk >= state->right_chunks.chunks.size()) {
204 return;
205 }
206 // now that we have fully materialized the right child
207 // we have to perform the nested loop join
208 do {
209 // first check if we have to move to the next child on the right isde
210 assert(state->right_chunk < state->right_chunks.chunks.size());
211 if (state->right_tuple >= state->right_chunks.chunks[state->right_chunk]->size()) {
212 // we exhausted the chunk on the right
213 state->right_chunk++;
214 if (state->right_chunk >= state->right_chunks.chunks.size()) {
215 // we exhausted all right chunks!
216 // move to the next left chunk
217 do {
218 if (type == JoinType::LEFT) {
219 // left join: before we move to the next chunk, see if we need to output any vectors that didn't
220 // have a match found
221 if (state->left_found_match) {
222 SelectionVector remaining_sel(STANDARD_VECTOR_SIZE);
223 idx_t remaining_count = 0;
224 for (idx_t i = 0; i < state->child_chunk.size(); i++) {
225 if (!state->left_found_match[i]) {
226 remaining_sel.set_index(remaining_count++, i);
227 }
228 }
229 state->left_found_match.reset();
230 chunk.Slice(state->child_chunk, remaining_sel, remaining_count);
231 for (idx_t idx = state->child_chunk.column_count(); idx < chunk.column_count(); idx++) {
232 chunk.data[idx].vector_type = VectorType::CONSTANT_VECTOR;
233 ConstantVector::SetNull(chunk.data[idx], true);
234 }
235 } else {
236 state->left_found_match = unique_ptr<bool[]>(new bool[STANDARD_VECTOR_SIZE]);
237 memset(state->left_found_match.get(), 0, sizeof(bool) * STANDARD_VECTOR_SIZE);
238 }
239 }
240 children[0]->GetChunk(context, state->child_chunk, state->child_state.get());
241 if (state->child_chunk.size() == 0) {
242 return;
243 }
244
245 // resolve the left join condition for the current chunk
246 state->lhs_executor.Execute(state->child_chunk, state->left_join_condition);
247 } while (state->left_join_condition.size() == 0);
248
249 state->right_chunk = 0;
250 }
251 // move to the start of this chunk
252 state->left_tuple = 0;
253 state->right_tuple = 0;
254 }
255
256 switch (type) {
257 case JoinType::SEMI:
258 case JoinType::ANTI:
259 case JoinType::MARK: {
260 // MARK, SEMI and ANTI joins are handled separately because they scan the whole RHS in one go
261 bool found_match[STANDARD_VECTOR_SIZE] = {false};
262 NestedLoopJoinMark::Perform(state->left_join_condition, state->right_chunks, found_match, conditions);
263 if (type == JoinType::MARK) {
264 // now construct the mark join result from the found matches
265 PhysicalJoin::ConstructMarkJoinResult(state->left_join_condition, state->child_chunk, chunk,
266 found_match, state->has_null);
267 } else if (type == JoinType::SEMI) {
268 // construct the semi join result from the found matches
269 PhysicalJoin::ConstructSemiOrAntiJoinResult<true>(state->child_chunk, chunk, found_match);
270 } else if (type == JoinType::ANTI) {
271 PhysicalJoin::ConstructSemiOrAntiJoinResult<false>(state->child_chunk, chunk, found_match);
272 }
273 // move to the next LHS chunk in the next iteration
274 state->right_tuple = state->right_chunks.chunks[state->right_chunk]->size();
275 state->right_chunk = state->right_chunks.chunks.size() - 1;
276 if (chunk.size() > 0) {
277 return;
278 } else {
279 continue;
280 }
281 }
282 default:
283 break;
284 }
285
286 auto &left_chunk = state->child_chunk;
287 auto &right_chunk = *state->right_chunks.chunks[state->right_chunk];
288 auto &right_data = *state->right_data.chunks[state->right_chunk];
289
290 // sanity check
291 left_chunk.Verify();
292 right_chunk.Verify();
293 right_data.Verify();
294
295 // now perform the join
296 switch (type) {
297 case JoinType::LEFT:
298 case JoinType::INNER: {
299 SelectionVector lvector(STANDARD_VECTOR_SIZE), rvector(STANDARD_VECTOR_SIZE);
300 idx_t match_count =
301 NestedLoopJoinInner::Perform(state->left_tuple, state->right_tuple, state->left_join_condition,
302 right_chunk, lvector, rvector, conditions);
303 // we have finished resolving the join conditions
304 if (match_count == 0) {
305 // if there are no results, move on
306 continue;
307 }
308 // we have matching tuples!
309 // construct the result
310 if (state->left_found_match) {
311 for (idx_t i = 0; i < match_count; i++) {
312 state->left_found_match[lvector.get_index(i)] = true;
313 }
314 }
315 chunk.Slice(state->child_chunk, lvector, match_count);
316 chunk.Slice(right_data, rvector, match_count, state->child_chunk.column_count());
317 break;
318 }
319 default:
320 throw NotImplementedException("Unimplemented type for nested loop join!");
321 }
322 } while (chunk.size() == 0);
323}
324
325unique_ptr<PhysicalOperatorState> PhysicalNestedLoopJoin::GetOperatorState() {
326 return make_unique<PhysicalNestedLoopJoinState>(children[0].get(), children[1].get(), conditions);
327}
328
329} // namespace duckdb
330