| 1 | #include "duckdb/execution/operator/join/physical_blockwise_nl_join.hpp" |
| 2 | |
| 3 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
| 4 | #include "duckdb/execution/expression_executor.hpp" |
| 5 | |
| 6 | using namespace duckdb; |
| 7 | using namespace std; |
| 8 | |
| 9 | class PhysicalBlockwiseNLJoinState : public PhysicalOperatorState { |
| 10 | public: |
| 11 | PhysicalBlockwiseNLJoinState(PhysicalOperator *left, PhysicalOperator *right, Expression &condition) |
| 12 | : PhysicalOperatorState(left), left_position(0), right_position(0), fill_in_rhs(false), |
| 13 | checked_found_match(false), executor(condition) { |
| 14 | assert(left && right); |
| 15 | } |
| 16 | |
| 17 | //! Whether or not a tuple on the LHS has found a match, only used for LEFT OUTER and FULL OUTER joins |
| 18 | unique_ptr<bool[]> lhs_found_match; |
| 19 | //! Whether or not a tuple on the RHS has found a match, only used for FULL OUTER joins |
| 20 | unique_ptr<bool[]> rhs_found_match; |
| 21 | ChunkCollection right_chunks; |
| 22 | idx_t left_position; |
| 23 | idx_t right_position; |
| 24 | bool fill_in_rhs; |
| 25 | bool checked_found_match; |
| 26 | ExpressionExecutor executor; |
| 27 | }; |
| 28 | |
| 29 | PhysicalBlockwiseNLJoin::PhysicalBlockwiseNLJoin(LogicalOperator &op, unique_ptr<PhysicalOperator> left, |
| 30 | unique_ptr<PhysicalOperator> right, unique_ptr<Expression> condition, |
| 31 | JoinType join_type) |
| 32 | : PhysicalJoin(op, PhysicalOperatorType::BLOCKWISE_NL_JOIN, join_type), condition(move(condition)) { |
| 33 | children.push_back(move(left)); |
| 34 | children.push_back(move(right)); |
| 35 | // MARK, SINGLE and RIGHT OUTER joins not handled |
| 36 | assert(join_type != JoinType::MARK); |
| 37 | assert(join_type != JoinType::RIGHT); |
| 38 | assert(join_type != JoinType::SINGLE); |
| 39 | } |
| 40 | |
| 41 | void PhysicalBlockwiseNLJoin::GetChunkInternal(ClientContext &context, DataChunk &chunk, |
| 42 | PhysicalOperatorState *state_) { |
| 43 | auto state = reinterpret_cast<PhysicalBlockwiseNLJoinState *>(state_); |
| 44 | |
| 45 | // first we fully materialize the right child, if we haven't done that yet |
| 46 | if (state->right_chunks.column_count() == 0) { |
| 47 | auto right_state = children[1]->GetOperatorState(); |
| 48 | auto left_types = children[0]->GetTypes(); |
| 49 | auto right_types = children[1]->GetTypes(); |
| 50 | |
| 51 | DataChunk right_chunk; |
| 52 | right_chunk.Initialize(right_types); |
| 53 | while (true) { |
| 54 | children[1]->GetChunk(context, right_chunk, right_state.get()); |
| 55 | if (right_chunk.size() == 0) { |
| 56 | break; |
| 57 | } |
| 58 | state->right_chunks.Append(right_chunk); |
| 59 | } |
| 60 | |
| 61 | if (state->right_chunks.count == 0) { |
| 62 | if ((type == JoinType::INNER || type == JoinType::SEMI)) { |
| 63 | // empty RHS with INNER or SEMI join means empty result set |
| 64 | return; |
| 65 | } |
| 66 | } |
| 67 | // initialize the found_match vectors for the left and right sides |
| 68 | if (type == JoinType::LEFT || type == JoinType::OUTER) { |
| 69 | state->lhs_found_match = unique_ptr<bool[]>(new bool[STANDARD_VECTOR_SIZE]); |
| 70 | } |
| 71 | if (type == JoinType::OUTER) { |
| 72 | state->rhs_found_match = unique_ptr<bool[]>(new bool[state->right_chunks.count]); |
| 73 | memset(state->rhs_found_match.get(), 0, sizeof(bool) * state->right_chunks.count); |
| 74 | } |
| 75 | } |
| 76 | |
| 77 | if (state->right_chunks.count == 0) { |
| 78 | // empty join |
| 79 | assert(type == JoinType::LEFT || type == JoinType::OUTER || type == JoinType::ANTI); |
| 80 | // pull a chunk from the LHS |
| 81 | children[0]->GetChunk(context, state->child_chunk, state->child_state.get()); |
| 82 | if (state->child_chunk.size() == 0) { |
| 83 | return; |
| 84 | } |
| 85 | // fill in the data from the chunk |
| 86 | idx_t i; |
| 87 | for (i = 0; i < state->child_chunk.column_count(); i++) { |
| 88 | chunk.data[i].Reference(state->child_chunk.data[i]); |
| 89 | } |
| 90 | chunk.SetCardinality(state->child_chunk.size()); |
| 91 | if (type == JoinType::LEFT || type == JoinType::OUTER) { |
| 92 | // LEFT OUTER or FULL OUTER join with empty RHS |
| 93 | // fill any columns from the RHS with NULLs |
| 94 | for (; i < chunk.column_count(); i++) { |
| 95 | chunk.data[i].vector_type = VectorType::CONSTANT_VECTOR; |
| 96 | ConstantVector::SetNull(chunk.data[i], true); |
| 97 | } |
| 98 | } |
| 99 | return; |
| 100 | } |
| 101 | |
| 102 | // now perform the actual join |
| 103 | // we construct a combined DataChunk by referencing the LHS and the RHS |
| 104 | // every step that we do not have output results we shift the vectors of the RHS one up or down |
| 105 | // this creates a new "alignment" between the tuples, exhausting all possible O(n^2) combinations |
| 106 | // while allowing us to use vectorized execution for every step |
| 107 | idx_t result_count = 0; |
| 108 | do { |
| 109 | if (state->fill_in_rhs) { |
| 110 | throw NotImplementedException("FIXME: full outer join" ); |
| 111 | } |
| 112 | if (state->left_position >= state->child_chunk.size()) { |
| 113 | // exhausted LHS, have to pull new LHS chunk |
| 114 | if (!state->checked_found_match && state->lhs_found_match) { |
| 115 | // LEFT OUTER JOIN or FULL OUTER JOIN, first check if we need to create extra results because of |
| 116 | // non-matching tuples |
| 117 | SelectionVector sel(STANDARD_VECTOR_SIZE); |
| 118 | for (idx_t i = 0; i < state->child_chunk.size(); i++) { |
| 119 | if (!state->lhs_found_match[i]) { |
| 120 | sel.set_index(result_count++, i); |
| 121 | } |
| 122 | } |
| 123 | if (result_count > 0) { |
| 124 | // have to create the chunk, set the selection vector and count |
| 125 | // for the LHS, reference the child_chunk and set the sel_vector and count |
| 126 | chunk.Slice(state->child_chunk, sel, result_count); |
| 127 | // for the RHS, set the mask to NULL and set the sel_vector and count |
| 128 | for (idx_t i = state->child_chunk.column_count(); i < chunk.column_count(); i++) { |
| 129 | chunk.data[i].vector_type = VectorType::CONSTANT_VECTOR; |
| 130 | ConstantVector::SetNull(chunk.data[i], true); |
| 131 | } |
| 132 | state->checked_found_match = true; |
| 133 | return; |
| 134 | } |
| 135 | } |
| 136 | children[0]->GetChunk(context, state->child_chunk, state->child_state.get()); |
| 137 | // no more data on LHS, if FULL OUTER JOIN iterate over RHS |
| 138 | if (state->child_chunk.size() == 0) { |
| 139 | if (type == JoinType::OUTER) { |
| 140 | state->fill_in_rhs = true; |
| 141 | continue; |
| 142 | } else { |
| 143 | return; |
| 144 | } |
| 145 | } |
| 146 | state->child_chunk.Normalify(); |
| 147 | state->left_position = 0; |
| 148 | state->right_position = 0; |
| 149 | if (state->lhs_found_match) { |
| 150 | state->checked_found_match = false; |
| 151 | memset(state->lhs_found_match.get(), 0, sizeof(bool) * STANDARD_VECTOR_SIZE); |
| 152 | } |
| 153 | } |
| 154 | auto &lchunk = state->child_chunk; |
| 155 | auto &rchunk = *state->right_chunks.chunks[state->right_position]; |
| 156 | |
| 157 | // fill in the current element of the LHS into the chunk |
| 158 | assert(chunk.column_count() == lchunk.column_count() + rchunk.column_count()); |
| 159 | for (idx_t i = 0; i < lchunk.column_count(); i++) { |
| 160 | auto lvalue = lchunk.GetValue(i, state->left_position); |
| 161 | chunk.data[i].Reference(lvalue); |
| 162 | } |
| 163 | // for the RHS we just reference the entire vector |
| 164 | for (idx_t i = 0; i < rchunk.column_count(); i++) { |
| 165 | chunk.data[lchunk.column_count() + i].Reference(rchunk.data[i]); |
| 166 | } |
| 167 | chunk.SetCardinality(rchunk.size()); |
| 168 | |
| 169 | // now perform the computation |
| 170 | SelectionVector match_sel(STANDARD_VECTOR_SIZE); |
| 171 | result_count = state->executor.SelectExpression(chunk, match_sel); |
| 172 | if (result_count > 0) { |
| 173 | // found a match! |
| 174 | // set the match flags in the LHS |
| 175 | if (state->lhs_found_match) { |
| 176 | state->lhs_found_match[state->left_position] = true; |
| 177 | } |
| 178 | chunk.Slice(match_sel, result_count); |
| 179 | |
| 180 | // set the match flags in the RHS |
| 181 | if (state->rhs_found_match) { |
| 182 | for (idx_t i = 0; i < result_count; i++) { |
| 183 | auto idx = match_sel.get_index(i); |
| 184 | state->rhs_found_match[state->right_position * STANDARD_VECTOR_SIZE + idx] = true; |
| 185 | } |
| 186 | } |
| 187 | } else { |
| 188 | // no result: reset the chunk |
| 189 | chunk.Reset(); |
| 190 | } |
| 191 | // move to the next tuple on the LHS |
| 192 | state->left_position++; |
| 193 | if (state->left_position >= state->child_chunk.size()) { |
| 194 | // exhausted the current chunk, move to the next RHS chunk |
| 195 | state->right_position++; |
| 196 | if (state->right_position < state->right_chunks.chunks.size()) { |
| 197 | // we still have chunks left! start over on the LHS |
| 198 | state->left_position = 0; |
| 199 | } |
| 200 | } |
| 201 | } while (result_count == 0); |
| 202 | } |
| 203 | |
| 204 | unique_ptr<PhysicalOperatorState> PhysicalBlockwiseNLJoin::GetOperatorState() { |
| 205 | return make_unique<PhysicalBlockwiseNLJoinState>(children[0].get(), children[1].get(), *condition); |
| 206 | } |
| 207 | |
| 208 | string PhysicalBlockwiseNLJoin::() const { |
| 209 | string = JoinTypeToString(type) + "\n" ; |
| 210 | extra_info += condition->GetName(); |
| 211 | return extra_info; |
| 212 | } |
| 213 | |