1#include "duckdb/execution/operator/join/physical_blockwise_nl_join.hpp"
2
3#include "duckdb/common/vector_operations/vector_operations.hpp"
4#include "duckdb/execution/expression_executor.hpp"
5
6using namespace duckdb;
7using namespace std;
8
9class PhysicalBlockwiseNLJoinState : public PhysicalOperatorState {
10public:
11 PhysicalBlockwiseNLJoinState(PhysicalOperator *left, PhysicalOperator *right, Expression &condition)
12 : PhysicalOperatorState(left), left_position(0), right_position(0), fill_in_rhs(false),
13 checked_found_match(false), executor(condition) {
14 assert(left && right);
15 }
16
17 //! Whether or not a tuple on the LHS has found a match, only used for LEFT OUTER and FULL OUTER joins
18 unique_ptr<bool[]> lhs_found_match;
19 //! Whether or not a tuple on the RHS has found a match, only used for FULL OUTER joins
20 unique_ptr<bool[]> rhs_found_match;
21 ChunkCollection right_chunks;
22 idx_t left_position;
23 idx_t right_position;
24 bool fill_in_rhs;
25 bool checked_found_match;
26 ExpressionExecutor executor;
27};
28
29PhysicalBlockwiseNLJoin::PhysicalBlockwiseNLJoin(LogicalOperator &op, unique_ptr<PhysicalOperator> left,
30 unique_ptr<PhysicalOperator> right, unique_ptr<Expression> condition,
31 JoinType join_type)
32 : PhysicalJoin(op, PhysicalOperatorType::BLOCKWISE_NL_JOIN, join_type), condition(move(condition)) {
33 children.push_back(move(left));
34 children.push_back(move(right));
35 // MARK, SINGLE and RIGHT OUTER joins not handled
36 assert(join_type != JoinType::MARK);
37 assert(join_type != JoinType::RIGHT);
38 assert(join_type != JoinType::SINGLE);
39}
40
41void PhysicalBlockwiseNLJoin::GetChunkInternal(ClientContext &context, DataChunk &chunk,
42 PhysicalOperatorState *state_) {
43 auto state = reinterpret_cast<PhysicalBlockwiseNLJoinState *>(state_);
44
45 // first we fully materialize the right child, if we haven't done that yet
46 if (state->right_chunks.column_count() == 0) {
47 auto right_state = children[1]->GetOperatorState();
48 auto left_types = children[0]->GetTypes();
49 auto right_types = children[1]->GetTypes();
50
51 DataChunk right_chunk;
52 right_chunk.Initialize(right_types);
53 while (true) {
54 children[1]->GetChunk(context, right_chunk, right_state.get());
55 if (right_chunk.size() == 0) {
56 break;
57 }
58 state->right_chunks.Append(right_chunk);
59 }
60
61 if (state->right_chunks.count == 0) {
62 if ((type == JoinType::INNER || type == JoinType::SEMI)) {
63 // empty RHS with INNER or SEMI join means empty result set
64 return;
65 }
66 }
67 // initialize the found_match vectors for the left and right sides
68 if (type == JoinType::LEFT || type == JoinType::OUTER) {
69 state->lhs_found_match = unique_ptr<bool[]>(new bool[STANDARD_VECTOR_SIZE]);
70 }
71 if (type == JoinType::OUTER) {
72 state->rhs_found_match = unique_ptr<bool[]>(new bool[state->right_chunks.count]);
73 memset(state->rhs_found_match.get(), 0, sizeof(bool) * state->right_chunks.count);
74 }
75 }
76
77 if (state->right_chunks.count == 0) {
78 // empty join
79 assert(type == JoinType::LEFT || type == JoinType::OUTER || type == JoinType::ANTI);
80 // pull a chunk from the LHS
81 children[0]->GetChunk(context, state->child_chunk, state->child_state.get());
82 if (state->child_chunk.size() == 0) {
83 return;
84 }
85 // fill in the data from the chunk
86 idx_t i;
87 for (i = 0; i < state->child_chunk.column_count(); i++) {
88 chunk.data[i].Reference(state->child_chunk.data[i]);
89 }
90 chunk.SetCardinality(state->child_chunk.size());
91 if (type == JoinType::LEFT || type == JoinType::OUTER) {
92 // LEFT OUTER or FULL OUTER join with empty RHS
93 // fill any columns from the RHS with NULLs
94 for (; i < chunk.column_count(); i++) {
95 chunk.data[i].vector_type = VectorType::CONSTANT_VECTOR;
96 ConstantVector::SetNull(chunk.data[i], true);
97 }
98 }
99 return;
100 }
101
102 // now perform the actual join
103 // we construct a combined DataChunk by referencing the LHS and the RHS
104 // every step that we do not have output results we shift the vectors of the RHS one up or down
105 // this creates a new "alignment" between the tuples, exhausting all possible O(n^2) combinations
106 // while allowing us to use vectorized execution for every step
107 idx_t result_count = 0;
108 do {
109 if (state->fill_in_rhs) {
110 throw NotImplementedException("FIXME: full outer join");
111 }
112 if (state->left_position >= state->child_chunk.size()) {
113 // exhausted LHS, have to pull new LHS chunk
114 if (!state->checked_found_match && state->lhs_found_match) {
115 // LEFT OUTER JOIN or FULL OUTER JOIN, first check if we need to create extra results because of
116 // non-matching tuples
117 SelectionVector sel(STANDARD_VECTOR_SIZE);
118 for (idx_t i = 0; i < state->child_chunk.size(); i++) {
119 if (!state->lhs_found_match[i]) {
120 sel.set_index(result_count++, i);
121 }
122 }
123 if (result_count > 0) {
124 // have to create the chunk, set the selection vector and count
125 // for the LHS, reference the child_chunk and set the sel_vector and count
126 chunk.Slice(state->child_chunk, sel, result_count);
127 // for the RHS, set the mask to NULL and set the sel_vector and count
128 for (idx_t i = state->child_chunk.column_count(); i < chunk.column_count(); i++) {
129 chunk.data[i].vector_type = VectorType::CONSTANT_VECTOR;
130 ConstantVector::SetNull(chunk.data[i], true);
131 }
132 state->checked_found_match = true;
133 return;
134 }
135 }
136 children[0]->GetChunk(context, state->child_chunk, state->child_state.get());
137 // no more data on LHS, if FULL OUTER JOIN iterate over RHS
138 if (state->child_chunk.size() == 0) {
139 if (type == JoinType::OUTER) {
140 state->fill_in_rhs = true;
141 continue;
142 } else {
143 return;
144 }
145 }
146 state->child_chunk.Normalify();
147 state->left_position = 0;
148 state->right_position = 0;
149 if (state->lhs_found_match) {
150 state->checked_found_match = false;
151 memset(state->lhs_found_match.get(), 0, sizeof(bool) * STANDARD_VECTOR_SIZE);
152 }
153 }
154 auto &lchunk = state->child_chunk;
155 auto &rchunk = *state->right_chunks.chunks[state->right_position];
156
157 // fill in the current element of the LHS into the chunk
158 assert(chunk.column_count() == lchunk.column_count() + rchunk.column_count());
159 for (idx_t i = 0; i < lchunk.column_count(); i++) {
160 auto lvalue = lchunk.GetValue(i, state->left_position);
161 chunk.data[i].Reference(lvalue);
162 }
163 // for the RHS we just reference the entire vector
164 for (idx_t i = 0; i < rchunk.column_count(); i++) {
165 chunk.data[lchunk.column_count() + i].Reference(rchunk.data[i]);
166 }
167 chunk.SetCardinality(rchunk.size());
168
169 // now perform the computation
170 SelectionVector match_sel(STANDARD_VECTOR_SIZE);
171 result_count = state->executor.SelectExpression(chunk, match_sel);
172 if (result_count > 0) {
173 // found a match!
174 // set the match flags in the LHS
175 if (state->lhs_found_match) {
176 state->lhs_found_match[state->left_position] = true;
177 }
178 chunk.Slice(match_sel, result_count);
179
180 // set the match flags in the RHS
181 if (state->rhs_found_match) {
182 for (idx_t i = 0; i < result_count; i++) {
183 auto idx = match_sel.get_index(i);
184 state->rhs_found_match[state->right_position * STANDARD_VECTOR_SIZE + idx] = true;
185 }
186 }
187 } else {
188 // no result: reset the chunk
189 chunk.Reset();
190 }
191 // move to the next tuple on the LHS
192 state->left_position++;
193 if (state->left_position >= state->child_chunk.size()) {
194 // exhausted the current chunk, move to the next RHS chunk
195 state->right_position++;
196 if (state->right_position < state->right_chunks.chunks.size()) {
197 // we still have chunks left! start over on the LHS
198 state->left_position = 0;
199 }
200 }
201 } while (result_count == 0);
202}
203
204unique_ptr<PhysicalOperatorState> PhysicalBlockwiseNLJoin::GetOperatorState() {
205 return make_unique<PhysicalBlockwiseNLJoinState>(children[0].get(), children[1].get(), *condition);
206}
207
208string PhysicalBlockwiseNLJoin::ExtraRenderInformation() const {
209 string extra_info = JoinTypeToString(type) + "\n";
210 extra_info += condition->GetName();
211 return extra_info;
212}
213