1 | #include "duckdb/execution/operator/join/physical_blockwise_nl_join.hpp" |
2 | |
3 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
4 | #include "duckdb/execution/expression_executor.hpp" |
5 | |
6 | using namespace duckdb; |
7 | using namespace std; |
8 | |
9 | class PhysicalBlockwiseNLJoinState : public PhysicalOperatorState { |
10 | public: |
11 | PhysicalBlockwiseNLJoinState(PhysicalOperator *left, PhysicalOperator *right, Expression &condition) |
12 | : PhysicalOperatorState(left), left_position(0), right_position(0), fill_in_rhs(false), |
13 | checked_found_match(false), executor(condition) { |
14 | assert(left && right); |
15 | } |
16 | |
17 | //! Whether or not a tuple on the LHS has found a match, only used for LEFT OUTER and FULL OUTER joins |
18 | unique_ptr<bool[]> lhs_found_match; |
19 | //! Whether or not a tuple on the RHS has found a match, only used for FULL OUTER joins |
20 | unique_ptr<bool[]> rhs_found_match; |
21 | ChunkCollection right_chunks; |
22 | idx_t left_position; |
23 | idx_t right_position; |
24 | bool fill_in_rhs; |
25 | bool checked_found_match; |
26 | ExpressionExecutor executor; |
27 | }; |
28 | |
29 | PhysicalBlockwiseNLJoin::PhysicalBlockwiseNLJoin(LogicalOperator &op, unique_ptr<PhysicalOperator> left, |
30 | unique_ptr<PhysicalOperator> right, unique_ptr<Expression> condition, |
31 | JoinType join_type) |
32 | : PhysicalJoin(op, PhysicalOperatorType::BLOCKWISE_NL_JOIN, join_type), condition(move(condition)) { |
33 | children.push_back(move(left)); |
34 | children.push_back(move(right)); |
35 | // MARK, SINGLE and RIGHT OUTER joins not handled |
36 | assert(join_type != JoinType::MARK); |
37 | assert(join_type != JoinType::RIGHT); |
38 | assert(join_type != JoinType::SINGLE); |
39 | } |
40 | |
41 | void PhysicalBlockwiseNLJoin::GetChunkInternal(ClientContext &context, DataChunk &chunk, |
42 | PhysicalOperatorState *state_) { |
43 | auto state = reinterpret_cast<PhysicalBlockwiseNLJoinState *>(state_); |
44 | |
45 | // first we fully materialize the right child, if we haven't done that yet |
46 | if (state->right_chunks.column_count() == 0) { |
47 | auto right_state = children[1]->GetOperatorState(); |
48 | auto left_types = children[0]->GetTypes(); |
49 | auto right_types = children[1]->GetTypes(); |
50 | |
51 | DataChunk right_chunk; |
52 | right_chunk.Initialize(right_types); |
53 | while (true) { |
54 | children[1]->GetChunk(context, right_chunk, right_state.get()); |
55 | if (right_chunk.size() == 0) { |
56 | break; |
57 | } |
58 | state->right_chunks.Append(right_chunk); |
59 | } |
60 | |
61 | if (state->right_chunks.count == 0) { |
62 | if ((type == JoinType::INNER || type == JoinType::SEMI)) { |
63 | // empty RHS with INNER or SEMI join means empty result set |
64 | return; |
65 | } |
66 | } |
67 | // initialize the found_match vectors for the left and right sides |
68 | if (type == JoinType::LEFT || type == JoinType::OUTER) { |
69 | state->lhs_found_match = unique_ptr<bool[]>(new bool[STANDARD_VECTOR_SIZE]); |
70 | } |
71 | if (type == JoinType::OUTER) { |
72 | state->rhs_found_match = unique_ptr<bool[]>(new bool[state->right_chunks.count]); |
73 | memset(state->rhs_found_match.get(), 0, sizeof(bool) * state->right_chunks.count); |
74 | } |
75 | } |
76 | |
77 | if (state->right_chunks.count == 0) { |
78 | // empty join |
79 | assert(type == JoinType::LEFT || type == JoinType::OUTER || type == JoinType::ANTI); |
80 | // pull a chunk from the LHS |
81 | children[0]->GetChunk(context, state->child_chunk, state->child_state.get()); |
82 | if (state->child_chunk.size() == 0) { |
83 | return; |
84 | } |
85 | // fill in the data from the chunk |
86 | idx_t i; |
87 | for (i = 0; i < state->child_chunk.column_count(); i++) { |
88 | chunk.data[i].Reference(state->child_chunk.data[i]); |
89 | } |
90 | chunk.SetCardinality(state->child_chunk.size()); |
91 | if (type == JoinType::LEFT || type == JoinType::OUTER) { |
92 | // LEFT OUTER or FULL OUTER join with empty RHS |
93 | // fill any columns from the RHS with NULLs |
94 | for (; i < chunk.column_count(); i++) { |
95 | chunk.data[i].vector_type = VectorType::CONSTANT_VECTOR; |
96 | ConstantVector::SetNull(chunk.data[i], true); |
97 | } |
98 | } |
99 | return; |
100 | } |
101 | |
102 | // now perform the actual join |
103 | // we construct a combined DataChunk by referencing the LHS and the RHS |
104 | // every step that we do not have output results we shift the vectors of the RHS one up or down |
105 | // this creates a new "alignment" between the tuples, exhausting all possible O(n^2) combinations |
106 | // while allowing us to use vectorized execution for every step |
107 | idx_t result_count = 0; |
108 | do { |
109 | if (state->fill_in_rhs) { |
110 | throw NotImplementedException("FIXME: full outer join" ); |
111 | } |
112 | if (state->left_position >= state->child_chunk.size()) { |
113 | // exhausted LHS, have to pull new LHS chunk |
114 | if (!state->checked_found_match && state->lhs_found_match) { |
115 | // LEFT OUTER JOIN or FULL OUTER JOIN, first check if we need to create extra results because of |
116 | // non-matching tuples |
117 | SelectionVector sel(STANDARD_VECTOR_SIZE); |
118 | for (idx_t i = 0; i < state->child_chunk.size(); i++) { |
119 | if (!state->lhs_found_match[i]) { |
120 | sel.set_index(result_count++, i); |
121 | } |
122 | } |
123 | if (result_count > 0) { |
124 | // have to create the chunk, set the selection vector and count |
125 | // for the LHS, reference the child_chunk and set the sel_vector and count |
126 | chunk.Slice(state->child_chunk, sel, result_count); |
127 | // for the RHS, set the mask to NULL and set the sel_vector and count |
128 | for (idx_t i = state->child_chunk.column_count(); i < chunk.column_count(); i++) { |
129 | chunk.data[i].vector_type = VectorType::CONSTANT_VECTOR; |
130 | ConstantVector::SetNull(chunk.data[i], true); |
131 | } |
132 | state->checked_found_match = true; |
133 | return; |
134 | } |
135 | } |
136 | children[0]->GetChunk(context, state->child_chunk, state->child_state.get()); |
137 | // no more data on LHS, if FULL OUTER JOIN iterate over RHS |
138 | if (state->child_chunk.size() == 0) { |
139 | if (type == JoinType::OUTER) { |
140 | state->fill_in_rhs = true; |
141 | continue; |
142 | } else { |
143 | return; |
144 | } |
145 | } |
146 | state->child_chunk.Normalify(); |
147 | state->left_position = 0; |
148 | state->right_position = 0; |
149 | if (state->lhs_found_match) { |
150 | state->checked_found_match = false; |
151 | memset(state->lhs_found_match.get(), 0, sizeof(bool) * STANDARD_VECTOR_SIZE); |
152 | } |
153 | } |
154 | auto &lchunk = state->child_chunk; |
155 | auto &rchunk = *state->right_chunks.chunks[state->right_position]; |
156 | |
157 | // fill in the current element of the LHS into the chunk |
158 | assert(chunk.column_count() == lchunk.column_count() + rchunk.column_count()); |
159 | for (idx_t i = 0; i < lchunk.column_count(); i++) { |
160 | auto lvalue = lchunk.GetValue(i, state->left_position); |
161 | chunk.data[i].Reference(lvalue); |
162 | } |
163 | // for the RHS we just reference the entire vector |
164 | for (idx_t i = 0; i < rchunk.column_count(); i++) { |
165 | chunk.data[lchunk.column_count() + i].Reference(rchunk.data[i]); |
166 | } |
167 | chunk.SetCardinality(rchunk.size()); |
168 | |
169 | // now perform the computation |
170 | SelectionVector match_sel(STANDARD_VECTOR_SIZE); |
171 | result_count = state->executor.SelectExpression(chunk, match_sel); |
172 | if (result_count > 0) { |
173 | // found a match! |
174 | // set the match flags in the LHS |
175 | if (state->lhs_found_match) { |
176 | state->lhs_found_match[state->left_position] = true; |
177 | } |
178 | chunk.Slice(match_sel, result_count); |
179 | |
180 | // set the match flags in the RHS |
181 | if (state->rhs_found_match) { |
182 | for (idx_t i = 0; i < result_count; i++) { |
183 | auto idx = match_sel.get_index(i); |
184 | state->rhs_found_match[state->right_position * STANDARD_VECTOR_SIZE + idx] = true; |
185 | } |
186 | } |
187 | } else { |
188 | // no result: reset the chunk |
189 | chunk.Reset(); |
190 | } |
191 | // move to the next tuple on the LHS |
192 | state->left_position++; |
193 | if (state->left_position >= state->child_chunk.size()) { |
194 | // exhausted the current chunk, move to the next RHS chunk |
195 | state->right_position++; |
196 | if (state->right_position < state->right_chunks.chunks.size()) { |
197 | // we still have chunks left! start over on the LHS |
198 | state->left_position = 0; |
199 | } |
200 | } |
201 | } while (result_count == 0); |
202 | } |
203 | |
204 | unique_ptr<PhysicalOperatorState> PhysicalBlockwiseNLJoin::GetOperatorState() { |
205 | return make_unique<PhysicalBlockwiseNLJoinState>(children[0].get(), children[1].get(), *condition); |
206 | } |
207 | |
208 | string PhysicalBlockwiseNLJoin::() const { |
209 | string = JoinTypeToString(type) + "\n" ; |
210 | extra_info += condition->GetName(); |
211 | return extra_info; |
212 | } |
213 | |