#include "duckdb/execution/aggregate_hashtable.hpp"
#include "duckdb/execution/operator/join/physical_delim_join.hpp"
#include "duckdb/execution/operator/join/physical_hash_join.hpp"
#include "duckdb/execution/operator/projection/physical_projection.hpp"
#include "duckdb/execution/operator/scan/physical_chunk_scan.hpp"
#include "duckdb/execution/physical_plan_generator.hpp"
#include "duckdb/function/aggregate/distributive_functions.hpp"
#include "duckdb/planner/expression/bound_aggregate_expression.hpp"
#include "duckdb/planner/operator/logical_delim_join.hpp"

using namespace duckdb;
using namespace std;

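// recursively walk the physical plan below "op" and collect all DELIM_SCAN operators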
static void GatherDelimScans(PhysicalOperator *op, vector<PhysicalOperator *> &delim_scans) {
	assert(op);
	if (op->type == PhysicalOperatorType::DELIM_SCAN) {
		delim_scans.push_back(op);
	}
	for (auto &child : op->children) {
		GatherDelimScans(child.get(), delim_scans);
	}
}

unique_ptr<PhysicalOperator> PhysicalPlanGenerator::CreatePlan(LogicalDelimJoin &op) {
	// first create the underlying join
	auto plan = CreatePlan((LogicalComparisonJoin &)op);
	// this should create a join, not a cross product
	assert(plan && plan->type != PhysicalOperatorType::CROSS_PRODUCT);
	// duplicate eliminated join
	// first gather the scans on the duplicate eliminated data set from the RHS
	vector<PhysicalOperator *> delim_scans;
	GatherDelimScans(plan->children[1].get(), delim_scans);
	if (delim_scans.empty()) {
		// no duplicate eliminated scans in the RHS!
		// in this case we don't need to create a delim join
		// just push the normal join
		return plan;
	}
	vector<TypeId> delim_types;
	for (auto &delim_expr : op.duplicate_eliminated_columns) {
		delim_types.push_back(delim_expr->return_type);
	}
	if (op.join_type == JoinType::MARK) {
		assert(plan->type == PhysicalOperatorType::HASH_JOIN);
		auto &hash_join = (PhysicalHashJoin &)*plan;
		// correlated MARK join
		if (delim_types.size() + 1 == hash_join.conditions.size()) {
			// the correlated MARK join has one more condition than the number of correlated columns
			// this is the case in a correlated ANY() expression
			// in this case we need to keep track of additional entries, namely:
			// - (1) the total number of elements per group
			// - (2) the number of non-NULL elements per group
			// we need these to correctly deal with the cases of either:
			// - (1) the group being empty [in which case the result is always false, even if the comparison is NULL]
			// - (2) the group containing a NULL value [in which case FALSE becomes NULL]
			auto &info = hash_join.hash_table->correlated_mark_join_info;

			vector<TypeId> payload_types = {TypeId::INT64, TypeId::INT64}; // COUNT types
			vector<AggregateFunction> aggregate_functions = {CountStarFun::GetFunction(), CountFun::GetFunction()};
			vector<BoundAggregateExpression *> correlated_aggregates;
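			// create the two aggregate expressions: the hash table below takes the aggregates as raw pointers,
			// while ownership of the expressions stays with the correlated mark join info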
			for (idx_t i = 0; i < aggregate_functions.size(); ++i) {
				auto aggr = make_unique<BoundAggregateExpression>(payload_types[i], aggregate_functions[i], false);
				correlated_aggregates.push_back(aggr.get());
				info.correlated_aggregates.push_back(move(aggr));
			}
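			// the aggregate hash table groups on the correlated (duplicate eliminated) columns and maintains the
			// COUNT(*) and COUNT(column) values per group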
			info.correlated_counts =
			    make_unique<SuperLargeHashTable>(1024, delim_types, payload_types, correlated_aggregates);
			info.correlated_types = delim_types;
			// FIXME: these can be initialized "empty" (without allocating empty vectors)
			info.group_chunk.Initialize(delim_types);
			info.payload_chunk.Initialize(payload_types);
			info.result_chunk.Initialize(payload_types);
		}
	}
	// now create the duplicate eliminated join
	auto delim_join = make_unique<PhysicalDelimJoin>(op, move(plan), delim_scans);
	// we still have to create the DISTINCT clause that is used to generate the duplicate eliminated chunk
	// we create a ChunkCollectionScan that pulls from the delim_join LHS
	auto chunk_scan =
	    make_unique<PhysicalChunkScan>(delim_join->children[0]->GetTypes(), PhysicalOperatorType::CHUNK_SCAN);
	chunk_scan->collection = &delim_join->lhs_data;
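	// the chunk scan reads from the LHS data collected by the delim join; the distinct created below aggregates
	// this data into the duplicate eliminated chunk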
	// now we need to create a projection that projects only the duplicate eliminated columns
	assert(!op.duplicate_eliminated_columns.empty());
	auto projection = make_unique<PhysicalProjection>(delim_types, move(op.duplicate_eliminated_columns));
	projection->children.push_back(move(chunk_scan));
	// finally create the distinct clause on top of the projection
	delim_join->distinct = CreateDistinct(move(projection));
	return move(delim_join);
}