#include "duckdb/execution/aggregate_hashtable.hpp"
#include "duckdb/execution/operator/join/physical_delim_join.hpp"
#include "duckdb/execution/operator/join/physical_hash_join.hpp"
#include "duckdb/execution/operator/projection/physical_projection.hpp"
#include "duckdb/execution/operator/scan/physical_chunk_scan.hpp"
#include "duckdb/execution/physical_plan_generator.hpp"
#include "duckdb/function/aggregate/distributive_functions.hpp"
#include "duckdb/planner/operator/logical_delim_join.hpp"
#include "duckdb/planner/expression/bound_aggregate_expression.hpp"

using namespace duckdb;
using namespace std;

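// recursively gather all DELIM_SCAN operators below the given operator into delim_scans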
static void GatherDelimScans(PhysicalOperator *op, vector<PhysicalOperator *> &delim_scans) {
	assert(op);
	if (op->type == PhysicalOperatorType::DELIM_SCAN) {
		delim_scans.push_back(op);
	}
	for (auto &child : op->children) {
		GatherDelimScans(child.get(), delim_scans);
	}
}

unique_ptr<PhysicalOperator> PhysicalPlanGenerator::CreatePlan(LogicalDelimJoin &op) {
	// first create the underlying join
	auto plan = CreatePlan((LogicalComparisonJoin &)op);
	// this should create a join, not a cross product
	assert(plan && plan->type != PhysicalOperatorType::CROSS_PRODUCT);
	// duplicate eliminated join
	// first gather the scans on the duplicate eliminated data set from the RHS
	vector<PhysicalOperator *> delim_scans;
	GatherDelimScans(plan->children[1].get(), delim_scans);
	if (delim_scans.empty()) {
		// no duplicate eliminated scans in the RHS!
		// in this case we don't need to create a delim join
		// just push the normal join
		return plan;
	}
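	// collect the types of the duplicate eliminated columns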
	vector<TypeId> delim_types;
	for (auto &delim_expr : op.duplicate_eliminated_columns) {
		delim_types.push_back(delim_expr->return_type);
	}
	if (op.join_type == JoinType::MARK) {
		assert(plan->type == PhysicalOperatorType::HASH_JOIN);
		auto &hash_join = (PhysicalHashJoin &)*plan;
		// correlated MARK join
		if (delim_types.size() + 1 == hash_join.conditions.size()) {
			// the correlated MARK join has one more condition than the number of correlated columns
			// this is the case in a correlated ANY() expression
			// in this case we need to keep track of additional entries, namely:
			// - (1) the total number of elements per group
			// - (2) the number of non-NULL elements per group
			// we need these to correctly deal with the cases of either:
			// - (1) the group being empty [in which case the result is always false, even if the comparison is NULL]
			// - (2) the group containing a NULL value [in which case FALSE becomes NULL]
			auto &info = hash_join.hash_table->correlated_mark_join_info;

			vector<TypeId> payload_types = {TypeId::INT64, TypeId::INT64}; // COUNT types
			vector<AggregateFunction> aggregate_functions = {CountStarFun::GetFunction(), CountFun::GetFunction()};
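			// create the COUNT(*) and COUNT(expr) aggregate expressions; the hash table gets raw pointers,
			// while ownership of the expressions stays with the correlated mark join info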
			vector<BoundAggregateExpression *> correlated_aggregates;
			for (idx_t i = 0; i < aggregate_functions.size(); ++i) {
				auto aggr = make_unique<BoundAggregateExpression>(payload_types[i], aggregate_functions[i], false);
				correlated_aggregates.push_back(&*aggr);
				info.correlated_aggregates.push_back(move(aggr));
			}
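			// the per-group counts are tracked in an aggregate hash table keyed on the correlated columns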
			info.correlated_counts =
			    make_unique<SuperLargeHashTable>(1024, delim_types, payload_types, correlated_aggregates);
			info.correlated_types = delim_types;
			// FIXME: these can be initialized "empty" (without allocating empty vectors)
			info.group_chunk.Initialize(delim_types);
			info.payload_chunk.Initialize(payload_types);
			info.result_chunk.Initialize(payload_types);
		}
	}
	// now create the duplicate eliminated join
	auto delim_join = make_unique<PhysicalDelimJoin>(op, move(plan), delim_scans);
	// we still have to create the DISTINCT clause that is used to generate the duplicate eliminated chunk
	// we create a ChunkCollectionScan that pulls from the delim_join LHS
	auto chunk_scan =
	    make_unique<PhysicalChunkScan>(delim_join->children[0]->GetTypes(), PhysicalOperatorType::CHUNK_SCAN);
	chunk_scan->collection = &delim_join->lhs_data;
	// now we need to create a projection that projects only the duplicate eliminated columns
	assert(op.duplicate_eliminated_columns.size() > 0);
	auto projection = make_unique<PhysicalProjection>(delim_types, move(op.duplicate_eliminated_columns));
	projection->children.push_back(move(chunk_scan));
	// finally create the distinct clause on top of the projection
	delim_join->distinct = CreateDistinct(move(projection));
	return move(delim_join);
}