1#include "duckdb/execution/operator/join/physical_delim_join.hpp"
2
3#include "duckdb/common/types/column/column_data_collection.hpp"
4#include "duckdb/common/vector_operations/vector_operations.hpp"
5#include "duckdb/execution/operator/aggregate/physical_hash_aggregate.hpp"
6#include "duckdb/execution/operator/scan/physical_column_data_scan.hpp"
7#include "duckdb/execution/operator/set/physical_recursive_cte.hpp"
8#include "duckdb/parallel/meta_pipeline.hpp"
9#include "duckdb/parallel/pipeline.hpp"
10#include "duckdb/parallel/thread_context.hpp"
11
12namespace duckdb {
13
14PhysicalDelimJoin::PhysicalDelimJoin(vector<LogicalType> types, unique_ptr<PhysicalOperator> original_join,
15 vector<const_reference<PhysicalOperator>> delim_scans, idx_t estimated_cardinality)
16 : PhysicalOperator(PhysicalOperatorType::DELIM_JOIN, std::move(types), estimated_cardinality),
17 join(std::move(original_join)), delim_scans(std::move(delim_scans)) {
18 D_ASSERT(join->children.size() == 2);
19 // now for the original join
20 // we take its left child, this is the side that we will duplicate eliminate
21 children.push_back(x: std::move(join->children[0]));
22
23 // we replace it with a PhysicalColumnDataScan, that scans the ColumnDataCollection that we keep cached
24 // the actual chunk collection to scan will be created in the DelimJoinGlobalState
25 auto cached_chunk_scan = make_uniq<PhysicalColumnDataScan>(
26 args: children[0]->GetTypes(), args: PhysicalOperatorType::COLUMN_DATA_SCAN, args&: estimated_cardinality);
27 join->children[0] = std::move(cached_chunk_scan);
28}
29
30vector<const_reference<PhysicalOperator>> PhysicalDelimJoin::GetChildren() const {
31 vector<const_reference<PhysicalOperator>> result;
32 for (auto &child : children) {
33 result.push_back(x: *child);
34 }
35 result.push_back(x: *join);
36 result.push_back(x: *distinct);
37 return result;
38}
39
40//===--------------------------------------------------------------------===//
41// Sink
42//===--------------------------------------------------------------------===//
43class DelimJoinGlobalState : public GlobalSinkState {
44public:
45 explicit DelimJoinGlobalState(ClientContext &context, const PhysicalDelimJoin &delim_join)
46 : lhs_data(context, delim_join.children[0]->GetTypes()) {
47 D_ASSERT(delim_join.delim_scans.size() > 0);
48 // set up the delim join chunk to scan in the original join
49 auto &cached_chunk_scan = delim_join.join->children[0]->Cast<PhysicalColumnDataScan>();
50 cached_chunk_scan.collection = &lhs_data;
51 }
52
53 ColumnDataCollection lhs_data;
54 mutex lhs_lock;
55
56 void Merge(ColumnDataCollection &input) {
57 lock_guard<mutex> guard(lhs_lock);
58 lhs_data.Combine(other&: input);
59 }
60};
61
62class DelimJoinLocalState : public LocalSinkState {
63public:
64 explicit DelimJoinLocalState(ClientContext &context, const PhysicalDelimJoin &delim_join)
65 : lhs_data(context, delim_join.children[0]->GetTypes()) {
66 lhs_data.InitializeAppend(state&: append_state);
67 }
68
69 unique_ptr<LocalSinkState> distinct_state;
70 ColumnDataCollection lhs_data;
71 ColumnDataAppendState append_state;
72
73 void Append(DataChunk &input) {
74 lhs_data.Append(new_chunk&: input);
75 }
76};
77
78unique_ptr<GlobalSinkState> PhysicalDelimJoin::GetGlobalSinkState(ClientContext &context) const {
79 auto state = make_uniq<DelimJoinGlobalState>(args&: context, args: *this);
80 distinct->sink_state = distinct->GetGlobalSinkState(context);
81 if (delim_scans.size() > 1) {
82 PhysicalHashAggregate::SetMultiScan(*distinct->sink_state);
83 }
84 return std::move(state);
85}
86
87unique_ptr<LocalSinkState> PhysicalDelimJoin::GetLocalSinkState(ExecutionContext &context) const {
88 auto state = make_uniq<DelimJoinLocalState>(args&: context.client, args: *this);
89 state->distinct_state = distinct->GetLocalSinkState(context);
90 return std::move(state);
91}
92
93SinkResultType PhysicalDelimJoin::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const {
94 auto &lstate = input.local_state.Cast<DelimJoinLocalState>();
95 lstate.lhs_data.Append(state&: lstate.append_state, new_chunk&: chunk);
96 OperatorSinkInput distinct_sink_input {.global_state: *distinct->sink_state, .local_state: *lstate.distinct_state, .interrupt_state: input.interrupt_state};
97 distinct->Sink(context, chunk, input&: distinct_sink_input);
98 return SinkResultType::NEED_MORE_INPUT;
99}
100
101void PhysicalDelimJoin::Combine(ExecutionContext &context, GlobalSinkState &state, LocalSinkState &lstate_p) const {
102 auto &lstate = lstate_p.Cast<DelimJoinLocalState>();
103 auto &gstate = state.Cast<DelimJoinGlobalState>();
104 gstate.Merge(input&: lstate.lhs_data);
105 distinct->Combine(context, state&: *distinct->sink_state, lstate&: *lstate.distinct_state);
106}
107
108SinkFinalizeType PhysicalDelimJoin::Finalize(Pipeline &pipeline, Event &event, ClientContext &client,
109 GlobalSinkState &gstate) const {
110 // finalize the distinct HT
111 D_ASSERT(distinct);
112 distinct->Finalize(pipeline, event, context&: client, gstate&: *distinct->sink_state);
113 return SinkFinalizeType::READY;
114}
115
116string PhysicalDelimJoin::ParamsToString() const {
117 return join->ParamsToString();
118}
119
120//===--------------------------------------------------------------------===//
121// Pipeline Construction
122//===--------------------------------------------------------------------===//
123void PhysicalDelimJoin::BuildPipelines(Pipeline &current, MetaPipeline &meta_pipeline) {
124 op_state.reset();
125 sink_state.reset();
126
127 auto &child_meta_pipeline = meta_pipeline.CreateChildMetaPipeline(current, op&: *this);
128 child_meta_pipeline.Build(op&: *children[0]);
129
130 if (type == PhysicalOperatorType::DELIM_JOIN) {
131 // recurse into the actual join
132 // any pipelines in there depend on the main pipeline
133 // any scan of the duplicate eliminated data on the RHS depends on this pipeline
134 // we add an entry to the mapping of (PhysicalOperator*) -> (Pipeline*)
135 auto &state = meta_pipeline.GetState();
136 for (auto &delim_scan : delim_scans) {
137 state.delim_join_dependencies.insert(
138 x: make_pair(x&: delim_scan, y: reference<Pipeline>(*child_meta_pipeline.GetBasePipeline())));
139 }
140 join->BuildPipelines(current, meta_pipeline);
141 }
142}
143
144} // namespace duckdb
145