1#include "duckdb/execution/aggregate_hashtable.hpp"
2#include "duckdb/execution/operator/join/physical_delim_join.hpp"
3#include "duckdb/execution/operator/join/physical_hash_join.hpp"
4#include "duckdb/execution/operator/projection/physical_projection.hpp"
5#include "duckdb/execution/operator/scan/physical_chunk_scan.hpp"
6#include "duckdb/execution/physical_plan_generator.hpp"
7#include "duckdb/function/aggregate/distributive_functions.hpp"
8#include "duckdb/planner/operator/logical_delim_join.hpp"
9#include "duckdb/planner/expression/bound_aggregate_expression.hpp"
10
11using namespace duckdb;
12using namespace std;
13
14static void GatherDelimScans(PhysicalOperator *op, vector<PhysicalOperator *> &delim_scans) {
15 assert(op);
16 if (op->type == PhysicalOperatorType::DELIM_SCAN) {
17 delim_scans.push_back(op);
18 }
19 for (auto &child : op->children) {
20 GatherDelimScans(child.get(), delim_scans);
21 }
22}
23
24unique_ptr<PhysicalOperator> PhysicalPlanGenerator::CreatePlan(LogicalDelimJoin &op) {
25 // first create the underlying join
26 auto plan = CreatePlan((LogicalComparisonJoin &)op);
27 // this should create a join, not a cross product
28 assert(plan && plan->type != PhysicalOperatorType::CROSS_PRODUCT);
29 // duplicate eliminated join
30 // first gather the scans on the duplicate eliminated data set from the RHS
31 vector<PhysicalOperator *> delim_scans;
32 GatherDelimScans(plan->children[1].get(), delim_scans);
33 if (delim_scans.size() == 0) {
34 // no duplicate eliminated scans in the RHS!
35 // in this case we don't need to create a delim join
36 // just push the normal join
37 return plan;
38 }
39 vector<TypeId> delim_types;
40 for (auto &delim_expr : op.duplicate_eliminated_columns) {
41 delim_types.push_back(delim_expr->return_type);
42 }
43 if (op.join_type == JoinType::MARK) {
44 assert(plan->type == PhysicalOperatorType::HASH_JOIN);
45 auto &hash_join = (PhysicalHashJoin &)*plan;
46 // correlated MARK join
47 if (delim_types.size() + 1 == hash_join.conditions.size()) {
48 // the correlated MARK join has one more condition than the amount of correlated columns
49 // this is the case in a correlated ANY() expression
50 // in this case we need to keep track of additional entries, namely:
51 // - (1) the total amount of elements per group
52 // - (2) the amount of non-null elements per group
53 // we need these to correctly deal with the cases of either:
54 // - (1) the group being empty [in which case the result is always false, even if the comparison is NULL]
55 // - (2) the group containing a NULL value [in which case FALSE becomes NULL]
56 auto &info = hash_join.hash_table->correlated_mark_join_info;
57
58 vector<TypeId> payload_types = {TypeId::INT64, TypeId::INT64}; // COUNT types
59 vector<AggregateFunction> aggregate_functions = {CountStarFun::GetFunction(), CountFun::GetFunction()};
60 vector<BoundAggregateExpression *> correlated_aggregates;
61 for (idx_t i = 0; i < aggregate_functions.size(); ++i) {
62 auto aggr = make_unique<BoundAggregateExpression>(payload_types[i], aggregate_functions[i], false);
63 correlated_aggregates.push_back(&*aggr);
64 info.correlated_aggregates.push_back(move(aggr));
65 }
66 info.correlated_counts =
67 make_unique<SuperLargeHashTable>(1024, delim_types, payload_types, correlated_aggregates);
68 info.correlated_types = delim_types;
69 // FIXME: these can be initialized "empty" (without allocating empty vectors)
70 info.group_chunk.Initialize(delim_types);
71 info.payload_chunk.Initialize(payload_types);
72 info.result_chunk.Initialize(payload_types);
73 }
74 }
75 // now create the duplicate eliminated join
76 auto delim_join = make_unique<PhysicalDelimJoin>(op, move(plan), delim_scans);
77 // we still have to create the DISTINCT clause that is used to generate the duplicate eliminated chunk
78 // we create a ChunkCollectionScan that pulls from the delim_join LHS
79 auto chunk_scan =
80 make_unique<PhysicalChunkScan>(delim_join->children[0]->GetTypes(), PhysicalOperatorType::CHUNK_SCAN);
81 chunk_scan->collection = &delim_join->lhs_data;
82 // now we need to create a projection that projects only the duplicate eliminated columns
83 assert(op.duplicate_eliminated_columns.size() > 0);
84 auto projection = make_unique<PhysicalProjection>(delim_types, move(op.duplicate_eliminated_columns));
85 projection->children.push_back(move(chunk_scan));
86 // finally create the distinct clause on top of the projection
87 delim_join->distinct = CreateDistinct(move(projection));
88 return move(delim_join);
89}
90