| 1 | #include "duckdb/execution/operator/join/physical_piecewise_merge_join.hpp" | 
|---|
| 2 |  | 
|---|
| 3 | #include "duckdb/common/vector_operations/vector_operations.hpp" | 
|---|
| 4 | #include "duckdb/execution/expression_executor.hpp" | 
|---|
| 5 | #include "duckdb/execution/merge_join.hpp" | 
|---|
| 6 | #include "duckdb/common/operator/comparison_operators.hpp" | 
|---|
| 7 |  | 
|---|
| 8 | using namespace duckdb; | 
|---|
| 9 | using namespace std; | 
|---|
| 10 |  | 
|---|
| 11 | static void OrderVector(Vector &vector, idx_t count, MergeOrder &order); | 
|---|
| 12 |  | 
|---|
| 13 | class PhysicalPiecewiseMergeJoinState : public PhysicalComparisonJoinState { | 
|---|
| 14 | public: | 
|---|
| 15 | PhysicalPiecewiseMergeJoinState(PhysicalOperator *left, PhysicalOperator *right, vector<JoinCondition> &conditions) | 
|---|
| 16 | : PhysicalComparisonJoinState(left, right, conditions), initialized(false), left_position(0), right_position(0), | 
|---|
| 17 | right_chunk_index(0), has_null(false) { | 
|---|
| 18 | } | 
|---|
| 19 |  | 
|---|
| 20 | bool initialized; | 
|---|
| 21 | idx_t left_position; | 
|---|
| 22 | idx_t right_position; | 
|---|
| 23 | idx_t right_chunk_index; | 
|---|
| 24 | DataChunk left_chunk; | 
|---|
| 25 | DataChunk join_keys; | 
|---|
| 26 | MergeOrder left_orders; | 
|---|
| 27 | ChunkCollection right_chunks; | 
|---|
| 28 | ChunkCollection right_conditions; | 
|---|
| 29 | vector<MergeOrder> right_orders; | 
|---|
| 30 | bool has_null; | 
|---|
| 31 | }; | 
|---|
| 32 |  | 
|---|
| 33 | PhysicalPiecewiseMergeJoin::PhysicalPiecewiseMergeJoin(LogicalOperator &op, unique_ptr<PhysicalOperator> left, | 
|---|
| 34 | unique_ptr<PhysicalOperator> right, vector<JoinCondition> cond, | 
|---|
| 35 | JoinType join_type) | 
|---|
| 36 | : PhysicalComparisonJoin(op, PhysicalOperatorType::PIECEWISE_MERGE_JOIN, move(cond), join_type) { | 
|---|
| 37 | // for now we only support one condition! | 
|---|
| 38 | assert(conditions.size() == 1); | 
|---|
| 39 | for (auto &cond : conditions) { | 
|---|
| 40 | // COMPARE NOT EQUAL not supported yet with merge join | 
|---|
| 41 | assert(cond.comparison != ExpressionType::COMPARE_NOTEQUAL); | 
|---|
| 42 | assert(cond.left->return_type == cond.right->return_type); | 
|---|
| 43 | join_key_types.push_back(cond.left->return_type); | 
|---|
| 44 | } | 
|---|
| 45 | children.push_back(move(left)); | 
|---|
| 46 | children.push_back(move(right)); | 
|---|
| 47 | } | 
|---|
| 48 |  | 
|---|
| 49 | void PhysicalPiecewiseMergeJoin::GetChunkInternal(ClientContext &context, DataChunk &chunk, | 
|---|
| 50 | PhysicalOperatorState *state_) { | 
|---|
| 51 | auto state = reinterpret_cast<PhysicalPiecewiseMergeJoinState *>(state_); | 
|---|
| 52 | assert(conditions.size() == 1); | 
|---|
| 53 | if (!state->initialized) { | 
|---|
| 54 | // create the sorted pieces | 
|---|
| 55 | auto right_state = children[1]->GetOperatorState(); | 
|---|
| 56 | auto types = children[1]->GetTypes(); | 
|---|
| 57 |  | 
|---|
| 58 | DataChunk right_chunk; | 
|---|
| 59 | right_chunk.Initialize(types); | 
|---|
| 60 | state->join_keys.Initialize(join_key_types); | 
|---|
| 61 | // first fetch the entire right side | 
|---|
| 62 | while (true) { | 
|---|
| 63 | children[1]->GetChunk(context, right_chunk, right_state.get()); | 
|---|
| 64 | if (right_chunk.size() == 0) { | 
|---|
| 65 | break; | 
|---|
| 66 | } | 
|---|
| 67 | // resolve the join keys for this chunk | 
|---|
| 68 | state->rhs_executor.SetChunk(right_chunk); | 
|---|
| 69 |  | 
|---|
| 70 | state->join_keys.Reset(); | 
|---|
| 71 | state->join_keys.SetCardinality(right_chunk); | 
|---|
| 72 | for (idx_t k = 0; k < conditions.size(); k++) { | 
|---|
| 73 | // resolve the join key | 
|---|
| 74 | state->rhs_executor.ExecuteExpression(k, state->join_keys.data[k]); | 
|---|
| 75 | } | 
|---|
| 76 | // append the join keys and the chunk to the chunk collection | 
|---|
| 77 | state->right_chunks.Append(right_chunk); | 
|---|
| 78 | state->right_conditions.Append(state->join_keys); | 
|---|
| 79 | } | 
|---|
| 80 | if (state->right_chunks.count == 0 && (type == JoinType::INNER || type == JoinType::SEMI)) { | 
|---|
| 81 | // empty RHS with INNER or SEMI join means empty result set | 
|---|
| 82 | return; | 
|---|
| 83 | } | 
|---|
| 84 | // now order all the chunks | 
|---|
| 85 | state->right_orders.resize(state->right_conditions.chunks.size()); | 
|---|
| 86 | for (idx_t i = 0; i < state->right_conditions.chunks.size(); i++) { | 
|---|
| 87 | auto &chunk_to_order = *state->right_conditions.chunks[i]; | 
|---|
| 88 | assert(chunk_to_order.column_count() == 1); | 
|---|
| 89 | for (idx_t col_idx = 0; col_idx < chunk_to_order.column_count(); col_idx++) { | 
|---|
| 90 | OrderVector(chunk_to_order.data[col_idx], chunk_to_order.size(), state->right_orders[i]); | 
|---|
| 91 | if (state->right_orders[i].count < chunk_to_order.size()) { | 
|---|
| 92 | // the amount of entries in the order vector is smaller than the amount of entries in the vector | 
|---|
| 93 | // this only happens if there are NULL values in the right-hand side | 
|---|
| 94 | // hence we set the has_null to true (this is required for the MARK join) | 
|---|
| 95 | state->has_null = true; | 
|---|
| 96 | } | 
|---|
| 97 | } | 
|---|
| 98 | } | 
|---|
| 99 | state->right_chunk_index = state->right_orders.size(); | 
|---|
| 100 | state->initialized = true; | 
|---|
| 101 | } | 
|---|
| 102 |  | 
|---|
| 103 | do { | 
|---|
| 104 | // check if we have to fetch a child from the left side | 
|---|
| 105 | if (state->right_chunk_index == state->right_orders.size()) { | 
|---|
| 106 | // fetch the chunk from the left side | 
|---|
| 107 | children[0]->GetChunk(context, state->child_chunk, state->child_state.get()); | 
|---|
| 108 | if (state->child_chunk.size() == 0) { | 
|---|
| 109 | return; | 
|---|
| 110 | } | 
|---|
| 111 |  | 
|---|
| 112 | // resolve the join keys for the left chunk | 
|---|
| 113 | state->join_keys.Reset(); | 
|---|
| 114 | state->lhs_executor.SetChunk(state->child_chunk); | 
|---|
| 115 | state->join_keys.SetCardinality(state->child_chunk); | 
|---|
| 116 | for (idx_t k = 0; k < conditions.size(); k++) { | 
|---|
| 117 | state->lhs_executor.ExecuteExpression(k, state->join_keys.data[k]); | 
|---|
| 118 | // sort by join key | 
|---|
| 119 | OrderVector(state->join_keys.data[k], state->join_keys.size(), state->left_orders); | 
|---|
| 120 | } | 
|---|
| 121 | state->right_chunk_index = 0; | 
|---|
| 122 | state->left_position = 0; | 
|---|
| 123 | state->right_position = 0; | 
|---|
| 124 | } | 
|---|
| 125 |  | 
|---|
| 126 | ScalarMergeInfo left_info(state->left_orders, state->join_keys.data[0].type, state->left_position); | 
|---|
| 127 |  | 
|---|
| 128 | // first check if the join type is MARK, SEMI or ANTI | 
|---|
| 129 | // in this case we loop over the entire right collection immediately | 
|---|
| 130 | // because we can never return more than STANDARD_VECTOR_SIZE rows from a join | 
|---|
| 131 | switch (type) { | 
|---|
| 132 | case JoinType::MARK: { | 
|---|
| 133 | // MARK join | 
|---|
| 134 | if (state->right_chunks.count > 0) { | 
|---|
| 135 | ChunkMergeInfo right_info(state->right_conditions, state->right_orders); | 
|---|
| 136 | // first perform the MARK join | 
|---|
| 137 | // this method uses the LHS to loop over the entire RHS looking for matches | 
|---|
| 138 | MergeJoinMark::Perform(left_info, right_info, conditions[0].comparison); | 
|---|
| 139 | // now construct the mark join result from the found matches | 
|---|
| 140 | PhysicalJoin::ConstructMarkJoinResult(state->join_keys, state->child_chunk, chunk, | 
|---|
| 141 | right_info.found_match, state->has_null); | 
|---|
| 142 | } else { | 
|---|
| 143 | // RHS empty: result is false for everything | 
|---|
| 144 | chunk.Reference(state->child_chunk); | 
|---|
| 145 | auto &mark_vector = chunk.data.back(); | 
|---|
| 146 | mark_vector.vector_type = VectorType::CONSTANT_VECTOR; | 
|---|
| 147 | mark_vector.SetValue(0, Value::BOOLEAN(false)); | 
|---|
| 148 | } | 
|---|
| 149 | state->right_chunk_index = state->right_orders.size(); | 
|---|
| 150 | return; | 
|---|
| 151 | } | 
|---|
| 152 | default: | 
|---|
| 153 | // INNER, LEFT OUTER, etc... join that can return >STANDARD_VECTOR_SIZE entries | 
|---|
| 154 | break; | 
|---|
| 155 | } | 
|---|
| 156 |  | 
|---|
| 157 | // perform the actual merge join | 
|---|
| 158 | auto &right_chunk = *state->right_chunks.chunks[state->right_chunk_index]; | 
|---|
| 159 | auto &right_condition_chunk = *state->right_conditions.chunks[state->right_chunk_index]; | 
|---|
| 160 | auto &right_orders = state->right_orders[state->right_chunk_index]; | 
|---|
| 161 |  | 
|---|
| 162 | ScalarMergeInfo right(right_orders, right_condition_chunk.data[0].type, state->right_position); | 
|---|
| 163 | // perform the merge join | 
|---|
| 164 | switch (type) { | 
|---|
| 165 | case JoinType::INNER: { | 
|---|
| 166 | idx_t result_count = MergeJoinInner::Perform(left_info, right, conditions[0].comparison); | 
|---|
| 167 | if (result_count == 0) { | 
|---|
| 168 | // exhausted this chunk on the right side | 
|---|
| 169 | // move to the next | 
|---|
| 170 | state->right_chunk_index++; | 
|---|
| 171 | state->left_position = 0; | 
|---|
| 172 | state->right_position = 0; | 
|---|
| 173 | } else { | 
|---|
| 174 | chunk.Slice(state->child_chunk, left_info.result, result_count); | 
|---|
| 175 | chunk.Slice(right_chunk, right.result, result_count, state->child_chunk.column_count()); | 
|---|
| 176 | } | 
|---|
| 177 | break; | 
|---|
| 178 | } | 
|---|
| 179 | default: | 
|---|
| 180 | throw NotImplementedException( "Unimplemented join type for merge join"); | 
|---|
| 181 | } | 
|---|
| 182 | } while (chunk.size() == 0); | 
|---|
| 183 | } | 
|---|
| 184 |  | 
|---|
| 185 | unique_ptr<PhysicalOperatorState> PhysicalPiecewiseMergeJoin::GetOperatorState() { | 
|---|
| 186 | return make_unique<PhysicalPiecewiseMergeJoinState>(children[0].get(), children[1].get(), conditions); | 
|---|
| 187 | } | 
|---|
| 188 |  | 
|---|
| 189 | template <class T, class OP> | 
|---|
| 190 | static sel_t templated_quicksort_initial(T *data, const SelectionVector &sel, const SelectionVector ¬_null_sel, | 
|---|
| 191 | idx_t count, SelectionVector &result) { | 
|---|
| 192 | // select pivot | 
|---|
| 193 | auto pivot_idx = not_null_sel.get_index(0); | 
|---|
| 194 | auto dpivot_idx = sel.get_index(pivot_idx); | 
|---|
| 195 | sel_t low = 0, high = count - 1; | 
|---|
| 196 | // now insert elements | 
|---|
| 197 | for (idx_t i = 1; i < count; i++) { | 
|---|
| 198 | auto idx = not_null_sel.get_index(i); | 
|---|
| 199 | auto didx = sel.get_index(idx); | 
|---|
| 200 | if (OP::Operation(data[didx], data[dpivot_idx])) { | 
|---|
| 201 | result.set_index(low++, idx); | 
|---|
| 202 | } else { | 
|---|
| 203 | result.set_index(high--, idx); | 
|---|
| 204 | } | 
|---|
| 205 | } | 
|---|
| 206 | assert(low == high); | 
|---|
| 207 | result.set_index(low, pivot_idx); | 
|---|
| 208 | return low; | 
|---|
| 209 | } | 
|---|
| 210 |  | 
|---|
| 211 | template <class T, class OP> | 
|---|
| 212 | static void templated_quicksort_inplace(T *data, const SelectionVector &sel, idx_t count, SelectionVector &result, | 
|---|
| 213 | sel_t left, sel_t right) { | 
|---|
| 214 | if (left >= right) { | 
|---|
| 215 | return; | 
|---|
| 216 | } | 
|---|
| 217 |  | 
|---|
| 218 | sel_t middle = left + (right - left) / 2; | 
|---|
| 219 | sel_t dpivot_idx = sel.get_index(result.get_index(middle)); | 
|---|
| 220 |  | 
|---|
| 221 | // move the mid point value to the front. | 
|---|
| 222 | sel_t i = left + 1; | 
|---|
| 223 | sel_t j = right; | 
|---|
| 224 |  | 
|---|
| 225 | result.swap(middle, left); | 
|---|
| 226 | while (i <= j) { | 
|---|
| 227 | while (i <= j && (OP::Operation(data[sel.get_index(result.get_index(i))], data[dpivot_idx]))) { | 
|---|
| 228 | i++; | 
|---|
| 229 | } | 
|---|
| 230 |  | 
|---|
| 231 | while (i <= j && !OP::Operation(data[sel.get_index(result.get_index(j))], data[dpivot_idx])) { | 
|---|
| 232 | j--; | 
|---|
| 233 | } | 
|---|
| 234 |  | 
|---|
| 235 | if (i < j) { | 
|---|
| 236 | result.swap(i, j); | 
|---|
| 237 | } | 
|---|
| 238 | } | 
|---|
| 239 | result.swap(i - 1, left); | 
|---|
| 240 | sel_t part = i - 1; | 
|---|
| 241 |  | 
|---|
| 242 | if (part > 0) { | 
|---|
| 243 | templated_quicksort_inplace<T, OP>(data, sel, count, result, left, part - 1); | 
|---|
| 244 | } | 
|---|
| 245 | templated_quicksort_inplace<T, OP>(data, sel, count, result, part + 1, right); | 
|---|
| 246 | } | 
|---|
| 247 |  | 
|---|
| 248 | template <class T, class OP> | 
|---|
| 249 | void templated_quicksort(T *__restrict data, const SelectionVector &sel, const SelectionVector ¬_null_sel, | 
|---|
| 250 | idx_t count, SelectionVector &result) { | 
|---|
| 251 | auto part = templated_quicksort_initial<T, OP>(data, sel, not_null_sel, count, result); | 
|---|
| 252 | if (part > count) { | 
|---|
| 253 | return; | 
|---|
| 254 | } | 
|---|
| 255 | templated_quicksort_inplace<T, OP>(data, sel, count, result, 0, part); | 
|---|
| 256 | templated_quicksort_inplace<T, OP>(data, sel, count, result, part + 1, count - 1); | 
|---|
| 257 | } | 
|---|
| 258 |  | 
|---|
| 259 | template <class T> | 
|---|
| 260 | static void templated_quicksort(VectorData &vdata, const SelectionVector ¬_null_sel, idx_t not_null_count, | 
|---|
| 261 | SelectionVector &result) { | 
|---|
| 262 | if (not_null_count == 0) { | 
|---|
| 263 | return; | 
|---|
| 264 | } | 
|---|
| 265 | templated_quicksort<T, duckdb::LessThanEquals>((T *)vdata.data, *vdata.sel, not_null_sel, not_null_count, result); | 
|---|
| 266 | } | 
|---|
| 267 |  | 
|---|
| 268 | void OrderVector(Vector &vector, idx_t count, MergeOrder &order) { | 
|---|
| 269 | if (count == 0) { | 
|---|
| 270 | order.count = 0; | 
|---|
| 271 | return; | 
|---|
| 272 | } | 
|---|
| 273 | vector.Orrify(count, order.vdata); | 
|---|
| 274 | auto &vdata = order.vdata; | 
|---|
| 275 |  | 
|---|
| 276 | // first filter out all the non-null values | 
|---|
| 277 | SelectionVector not_null(STANDARD_VECTOR_SIZE); | 
|---|
| 278 | idx_t not_null_count = 0; | 
|---|
| 279 | for (idx_t i = 0; i < count; i++) { | 
|---|
| 280 | auto idx = vdata.sel->get_index(i); | 
|---|
| 281 | if (!(*vdata.nullmask)[idx]) { | 
|---|
| 282 | not_null.set_index(not_null_count++, i); | 
|---|
| 283 | } | 
|---|
| 284 | } | 
|---|
| 285 |  | 
|---|
| 286 | order.count = not_null_count; | 
|---|
| 287 | order.order.Initialize(STANDARD_VECTOR_SIZE); | 
|---|
| 288 | switch (vector.type) { | 
|---|
| 289 | case TypeId::BOOL: | 
|---|
| 290 | case TypeId::INT8: | 
|---|
| 291 | templated_quicksort<int8_t>(vdata, not_null, not_null_count, order.order); | 
|---|
| 292 | break; | 
|---|
| 293 | case TypeId::INT16: | 
|---|
| 294 | templated_quicksort<int16_t>(vdata, not_null, not_null_count, order.order); | 
|---|
| 295 | break; | 
|---|
| 296 | case TypeId::INT32: | 
|---|
| 297 | templated_quicksort<int32_t>(vdata, not_null, not_null_count, order.order); | 
|---|
| 298 | break; | 
|---|
| 299 | case TypeId::INT64: | 
|---|
| 300 | templated_quicksort<int64_t>(vdata, not_null, not_null_count, order.order); | 
|---|
| 301 | break; | 
|---|
| 302 | case TypeId::FLOAT: | 
|---|
| 303 | templated_quicksort<float>(vdata, not_null, not_null_count, order.order); | 
|---|
| 304 | break; | 
|---|
| 305 | case TypeId::DOUBLE: | 
|---|
| 306 | templated_quicksort<double>(vdata, not_null, not_null_count, order.order); | 
|---|
| 307 | break; | 
|---|
| 308 | case TypeId::VARCHAR: | 
|---|
| 309 | templated_quicksort<string_t>(vdata, not_null, not_null_count, order.order); | 
|---|
| 310 | break; | 
|---|
| 311 | default: | 
|---|
| 312 | throw NotImplementedException( "Unimplemented type for sort"); | 
|---|
| 313 | } | 
|---|
| 314 | } | 
|---|
| 315 |  | 
|---|