| 1 | #include "duckdb/execution/operator/join/physical_piecewise_merge_join.hpp" |
| 2 | |
| 3 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
| 4 | #include "duckdb/execution/expression_executor.hpp" |
| 5 | #include "duckdb/execution/merge_join.hpp" |
| 6 | #include "duckdb/common/operator/comparison_operators.hpp" |
| 7 | |
| 8 | using namespace duckdb; |
| 9 | using namespace std; |
| 10 | |
| 11 | static void OrderVector(Vector &vector, idx_t count, MergeOrder &order); |
| 12 | |
| 13 | class PhysicalPiecewiseMergeJoinState : public PhysicalComparisonJoinState { |
| 14 | public: |
| 15 | PhysicalPiecewiseMergeJoinState(PhysicalOperator *left, PhysicalOperator *right, vector<JoinCondition> &conditions) |
| 16 | : PhysicalComparisonJoinState(left, right, conditions), initialized(false), left_position(0), right_position(0), |
| 17 | right_chunk_index(0), has_null(false) { |
| 18 | } |
| 19 | |
| 20 | bool initialized; |
| 21 | idx_t left_position; |
| 22 | idx_t right_position; |
| 23 | idx_t right_chunk_index; |
| 24 | DataChunk left_chunk; |
| 25 | DataChunk join_keys; |
| 26 | MergeOrder left_orders; |
| 27 | ChunkCollection right_chunks; |
| 28 | ChunkCollection right_conditions; |
| 29 | vector<MergeOrder> right_orders; |
| 30 | bool has_null; |
| 31 | }; |
| 32 | |
| 33 | PhysicalPiecewiseMergeJoin::PhysicalPiecewiseMergeJoin(LogicalOperator &op, unique_ptr<PhysicalOperator> left, |
| 34 | unique_ptr<PhysicalOperator> right, vector<JoinCondition> cond, |
| 35 | JoinType join_type) |
| 36 | : PhysicalComparisonJoin(op, PhysicalOperatorType::PIECEWISE_MERGE_JOIN, move(cond), join_type) { |
| 37 | // for now we only support one condition! |
| 38 | assert(conditions.size() == 1); |
| 39 | for (auto &cond : conditions) { |
| 40 | // COMPARE NOT EQUAL not supported yet with merge join |
| 41 | assert(cond.comparison != ExpressionType::COMPARE_NOTEQUAL); |
| 42 | assert(cond.left->return_type == cond.right->return_type); |
| 43 | join_key_types.push_back(cond.left->return_type); |
| 44 | } |
| 45 | children.push_back(move(left)); |
| 46 | children.push_back(move(right)); |
| 47 | } |
| 48 | |
| 49 | void PhysicalPiecewiseMergeJoin::GetChunkInternal(ClientContext &context, DataChunk &chunk, |
| 50 | PhysicalOperatorState *state_) { |
| 51 | auto state = reinterpret_cast<PhysicalPiecewiseMergeJoinState *>(state_); |
| 52 | assert(conditions.size() == 1); |
| 53 | if (!state->initialized) { |
| 54 | // create the sorted pieces |
| 55 | auto right_state = children[1]->GetOperatorState(); |
| 56 | auto types = children[1]->GetTypes(); |
| 57 | |
| 58 | DataChunk right_chunk; |
| 59 | right_chunk.Initialize(types); |
| 60 | state->join_keys.Initialize(join_key_types); |
| 61 | // first fetch the entire right side |
| 62 | while (true) { |
| 63 | children[1]->GetChunk(context, right_chunk, right_state.get()); |
| 64 | if (right_chunk.size() == 0) { |
| 65 | break; |
| 66 | } |
| 67 | // resolve the join keys for this chunk |
| 68 | state->rhs_executor.SetChunk(right_chunk); |
| 69 | |
| 70 | state->join_keys.Reset(); |
| 71 | state->join_keys.SetCardinality(right_chunk); |
| 72 | for (idx_t k = 0; k < conditions.size(); k++) { |
| 73 | // resolve the join key |
| 74 | state->rhs_executor.ExecuteExpression(k, state->join_keys.data[k]); |
| 75 | } |
| 76 | // append the join keys and the chunk to the chunk collection |
| 77 | state->right_chunks.Append(right_chunk); |
| 78 | state->right_conditions.Append(state->join_keys); |
| 79 | } |
| 80 | if (state->right_chunks.count == 0 && (type == JoinType::INNER || type == JoinType::SEMI)) { |
| 81 | // empty RHS with INNER or SEMI join means empty result set |
| 82 | return; |
| 83 | } |
| 84 | // now order all the chunks |
| 85 | state->right_orders.resize(state->right_conditions.chunks.size()); |
| 86 | for (idx_t i = 0; i < state->right_conditions.chunks.size(); i++) { |
| 87 | auto &chunk_to_order = *state->right_conditions.chunks[i]; |
| 88 | assert(chunk_to_order.column_count() == 1); |
| 89 | for (idx_t col_idx = 0; col_idx < chunk_to_order.column_count(); col_idx++) { |
| 90 | OrderVector(chunk_to_order.data[col_idx], chunk_to_order.size(), state->right_orders[i]); |
| 91 | if (state->right_orders[i].count < chunk_to_order.size()) { |
| 92 | // the amount of entries in the order vector is smaller than the amount of entries in the vector |
| 93 | // this only happens if there are NULL values in the right-hand side |
| 94 | // hence we set the has_null to true (this is required for the MARK join) |
| 95 | state->has_null = true; |
| 96 | } |
| 97 | } |
| 98 | } |
| 99 | state->right_chunk_index = state->right_orders.size(); |
| 100 | state->initialized = true; |
| 101 | } |
| 102 | |
| 103 | do { |
| 104 | // check if we have to fetch a child from the left side |
| 105 | if (state->right_chunk_index == state->right_orders.size()) { |
| 106 | // fetch the chunk from the left side |
| 107 | children[0]->GetChunk(context, state->child_chunk, state->child_state.get()); |
| 108 | if (state->child_chunk.size() == 0) { |
| 109 | return; |
| 110 | } |
| 111 | |
| 112 | // resolve the join keys for the left chunk |
| 113 | state->join_keys.Reset(); |
| 114 | state->lhs_executor.SetChunk(state->child_chunk); |
| 115 | state->join_keys.SetCardinality(state->child_chunk); |
| 116 | for (idx_t k = 0; k < conditions.size(); k++) { |
| 117 | state->lhs_executor.ExecuteExpression(k, state->join_keys.data[k]); |
| 118 | // sort by join key |
| 119 | OrderVector(state->join_keys.data[k], state->join_keys.size(), state->left_orders); |
| 120 | } |
| 121 | state->right_chunk_index = 0; |
| 122 | state->left_position = 0; |
| 123 | state->right_position = 0; |
| 124 | } |
| 125 | |
| 126 | ScalarMergeInfo left_info(state->left_orders, state->join_keys.data[0].type, state->left_position); |
| 127 | |
| 128 | // first check if the join type is MARK, SEMI or ANTI |
| 129 | // in this case we loop over the entire right collection immediately |
| 130 | // because we can never return more than STANDARD_VECTOR_SIZE rows from a join |
| 131 | switch (type) { |
| 132 | case JoinType::MARK: { |
| 133 | // MARK join |
| 134 | if (state->right_chunks.count > 0) { |
| 135 | ChunkMergeInfo right_info(state->right_conditions, state->right_orders); |
| 136 | // first perform the MARK join |
| 137 | // this method uses the LHS to loop over the entire RHS looking for matches |
| 138 | MergeJoinMark::Perform(left_info, right_info, conditions[0].comparison); |
| 139 | // now construct the mark join result from the found matches |
| 140 | PhysicalJoin::ConstructMarkJoinResult(state->join_keys, state->child_chunk, chunk, |
| 141 | right_info.found_match, state->has_null); |
| 142 | } else { |
| 143 | // RHS empty: result is false for everything |
| 144 | chunk.Reference(state->child_chunk); |
| 145 | auto &mark_vector = chunk.data.back(); |
| 146 | mark_vector.vector_type = VectorType::CONSTANT_VECTOR; |
| 147 | mark_vector.SetValue(0, Value::BOOLEAN(false)); |
| 148 | } |
| 149 | state->right_chunk_index = state->right_orders.size(); |
| 150 | return; |
| 151 | } |
| 152 | default: |
| 153 | // INNER, LEFT OUTER, etc... join that can return >STANDARD_VECTOR_SIZE entries |
| 154 | break; |
| 155 | } |
| 156 | |
| 157 | // perform the actual merge join |
| 158 | auto &right_chunk = *state->right_chunks.chunks[state->right_chunk_index]; |
| 159 | auto &right_condition_chunk = *state->right_conditions.chunks[state->right_chunk_index]; |
| 160 | auto &right_orders = state->right_orders[state->right_chunk_index]; |
| 161 | |
| 162 | ScalarMergeInfo right(right_orders, right_condition_chunk.data[0].type, state->right_position); |
| 163 | // perform the merge join |
| 164 | switch (type) { |
| 165 | case JoinType::INNER: { |
| 166 | idx_t result_count = MergeJoinInner::Perform(left_info, right, conditions[0].comparison); |
| 167 | if (result_count == 0) { |
| 168 | // exhausted this chunk on the right side |
| 169 | // move to the next |
| 170 | state->right_chunk_index++; |
| 171 | state->left_position = 0; |
| 172 | state->right_position = 0; |
| 173 | } else { |
| 174 | chunk.Slice(state->child_chunk, left_info.result, result_count); |
| 175 | chunk.Slice(right_chunk, right.result, result_count, state->child_chunk.column_count()); |
| 176 | } |
| 177 | break; |
| 178 | } |
| 179 | default: |
| 180 | throw NotImplementedException("Unimplemented join type for merge join" ); |
| 181 | } |
| 182 | } while (chunk.size() == 0); |
| 183 | } |
| 184 | |
| 185 | unique_ptr<PhysicalOperatorState> PhysicalPiecewiseMergeJoin::GetOperatorState() { |
| 186 | return make_unique<PhysicalPiecewiseMergeJoinState>(children[0].get(), children[1].get(), conditions); |
| 187 | } |
| 188 | |
| 189 | template <class T, class OP> |
| 190 | static sel_t templated_quicksort_initial(T *data, const SelectionVector &sel, const SelectionVector ¬_null_sel, |
| 191 | idx_t count, SelectionVector &result) { |
| 192 | // select pivot |
| 193 | auto pivot_idx = not_null_sel.get_index(0); |
| 194 | auto dpivot_idx = sel.get_index(pivot_idx); |
| 195 | sel_t low = 0, high = count - 1; |
| 196 | // now insert elements |
| 197 | for (idx_t i = 1; i < count; i++) { |
| 198 | auto idx = not_null_sel.get_index(i); |
| 199 | auto didx = sel.get_index(idx); |
| 200 | if (OP::Operation(data[didx], data[dpivot_idx])) { |
| 201 | result.set_index(low++, idx); |
| 202 | } else { |
| 203 | result.set_index(high--, idx); |
| 204 | } |
| 205 | } |
| 206 | assert(low == high); |
| 207 | result.set_index(low, pivot_idx); |
| 208 | return low; |
| 209 | } |
| 210 | |
//! Recursive partitioning step of the quicksort over result[left..right] (inclusive bounds).
//!
//! result holds positions; a position p refers to the value data[sel.get_index(p)]. The middle entry of the range
//! is used as pivot; entries for which OP(value, pivot_value) holds end up before it, the others after it, and
//! both sides are then processed recursively. `count` is not read here; it is only forwarded to the recursion.
template <class T, class OP>
static void templated_quicksort_inplace(T *data, const SelectionVector &sel, idx_t count, SelectionVector &result,
                                        sel_t left, sel_t right) {
	// a range with at most one entry needs no work
	if (left >= right) {
		return;
	}

	// pick the middle entry as pivot and remember the data index of its value
	sel_t middle = left + (right - left) / 2;
	sel_t dpivot_idx = sel.get_index(result.get_index(middle));

	// move the mid point value to the front.
	sel_t i = left + 1;
	sel_t j = right;

	result.swap(middle, left);
	while (i <= j) {
		// advance i past entries that satisfy OP(entry, pivot)
		while (i <= j && (OP::Operation(data[sel.get_index(result.get_index(i))], data[dpivot_idx]))) {
			i++;
		}

		// move j back past entries that do not satisfy OP(entry, pivot)
		while (i <= j && !OP::Operation(data[sel.get_index(result.get_index(j))], data[dpivot_idx])) {
			j--;
		}

		// both scans stopped on entries that belong on the other side: exchange them
		if (i < j) {
			result.swap(i, j);
		}
	}
	// move the pivot (parked at `left`) to the boundary between the two partitions
	result.swap(i - 1, left);
	sel_t part = i - 1;

	// recurse on both sides of the pivot; the part > 0 check prevents unsigned underflow of part - 1
	if (part > 0) {
		templated_quicksort_inplace<T, OP>(data, sel, count, result, left, part - 1);
	}
	templated_quicksort_inplace<T, OP>(data, sel, count, result, part + 1, right);
}
| 247 | |
| 248 | template <class T, class OP> |
| 249 | void templated_quicksort(T *__restrict data, const SelectionVector &sel, const SelectionVector ¬_null_sel, |
| 250 | idx_t count, SelectionVector &result) { |
| 251 | auto part = templated_quicksort_initial<T, OP>(data, sel, not_null_sel, count, result); |
| 252 | if (part > count) { |
| 253 | return; |
| 254 | } |
| 255 | templated_quicksort_inplace<T, OP>(data, sel, count, result, 0, part); |
| 256 | templated_quicksort_inplace<T, OP>(data, sel, count, result, part + 1, count - 1); |
| 257 | } |
| 258 | |
| 259 | template <class T> |
| 260 | static void templated_quicksort(VectorData &vdata, const SelectionVector ¬_null_sel, idx_t not_null_count, |
| 261 | SelectionVector &result) { |
| 262 | if (not_null_count == 0) { |
| 263 | return; |
| 264 | } |
| 265 | templated_quicksort<T, duckdb::LessThanEquals>((T *)vdata.data, *vdata.sel, not_null_sel, not_null_count, result); |
| 266 | } |
| 267 | |
| 268 | void OrderVector(Vector &vector, idx_t count, MergeOrder &order) { |
| 269 | if (count == 0) { |
| 270 | order.count = 0; |
| 271 | return; |
| 272 | } |
| 273 | vector.Orrify(count, order.vdata); |
| 274 | auto &vdata = order.vdata; |
| 275 | |
| 276 | // first filter out all the non-null values |
| 277 | SelectionVector not_null(STANDARD_VECTOR_SIZE); |
| 278 | idx_t not_null_count = 0; |
| 279 | for (idx_t i = 0; i < count; i++) { |
| 280 | auto idx = vdata.sel->get_index(i); |
| 281 | if (!(*vdata.nullmask)[idx]) { |
| 282 | not_null.set_index(not_null_count++, i); |
| 283 | } |
| 284 | } |
| 285 | |
| 286 | order.count = not_null_count; |
| 287 | order.order.Initialize(STANDARD_VECTOR_SIZE); |
| 288 | switch (vector.type) { |
| 289 | case TypeId::BOOL: |
| 290 | case TypeId::INT8: |
| 291 | templated_quicksort<int8_t>(vdata, not_null, not_null_count, order.order); |
| 292 | break; |
| 293 | case TypeId::INT16: |
| 294 | templated_quicksort<int16_t>(vdata, not_null, not_null_count, order.order); |
| 295 | break; |
| 296 | case TypeId::INT32: |
| 297 | templated_quicksort<int32_t>(vdata, not_null, not_null_count, order.order); |
| 298 | break; |
| 299 | case TypeId::INT64: |
| 300 | templated_quicksort<int64_t>(vdata, not_null, not_null_count, order.order); |
| 301 | break; |
| 302 | case TypeId::FLOAT: |
| 303 | templated_quicksort<float>(vdata, not_null, not_null_count, order.order); |
| 304 | break; |
| 305 | case TypeId::DOUBLE: |
| 306 | templated_quicksort<double>(vdata, not_null, not_null_count, order.order); |
| 307 | break; |
| 308 | case TypeId::VARCHAR: |
| 309 | templated_quicksort<string_t>(vdata, not_null, not_null_count, order.order); |
| 310 | break; |
| 311 | default: |
| 312 | throw NotImplementedException("Unimplemented type for sort" ); |
| 313 | } |
| 314 | } |
| 315 | |