1 | #include "duckdb/execution/operator/join/physical_delim_join.hpp" |
2 | |
3 | #include "duckdb/common/types/column/column_data_collection.hpp" |
4 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
5 | #include "duckdb/execution/operator/aggregate/physical_hash_aggregate.hpp" |
6 | #include "duckdb/execution/operator/scan/physical_column_data_scan.hpp" |
7 | #include "duckdb/execution/operator/set/physical_recursive_cte.hpp" |
8 | #include "duckdb/parallel/meta_pipeline.hpp" |
9 | #include "duckdb/parallel/pipeline.hpp" |
10 | #include "duckdb/parallel/thread_context.hpp" |
11 | |
12 | namespace duckdb { |
13 | |
14 | PhysicalDelimJoin::PhysicalDelimJoin(vector<LogicalType> types, unique_ptr<PhysicalOperator> original_join, |
15 | vector<const_reference<PhysicalOperator>> delim_scans, idx_t estimated_cardinality) |
16 | : PhysicalOperator(PhysicalOperatorType::DELIM_JOIN, std::move(types), estimated_cardinality), |
17 | join(std::move(original_join)), delim_scans(std::move(delim_scans)) { |
18 | D_ASSERT(join->children.size() == 2); |
19 | // now for the original join |
20 | // we take its left child, this is the side that we will duplicate eliminate |
21 | children.push_back(x: std::move(join->children[0])); |
22 | |
23 | // we replace it with a PhysicalColumnDataScan, that scans the ColumnDataCollection that we keep cached |
24 | // the actual chunk collection to scan will be created in the DelimJoinGlobalState |
25 | auto cached_chunk_scan = make_uniq<PhysicalColumnDataScan>( |
26 | args: children[0]->GetTypes(), args: PhysicalOperatorType::COLUMN_DATA_SCAN, args&: estimated_cardinality); |
27 | join->children[0] = std::move(cached_chunk_scan); |
28 | } |
29 | |
30 | vector<const_reference<PhysicalOperator>> PhysicalDelimJoin::GetChildren() const { |
31 | vector<const_reference<PhysicalOperator>> result; |
32 | for (auto &child : children) { |
33 | result.push_back(x: *child); |
34 | } |
35 | result.push_back(x: *join); |
36 | result.push_back(x: *distinct); |
37 | return result; |
38 | } |
39 | |
40 | //===--------------------------------------------------------------------===// |
41 | // Sink |
42 | //===--------------------------------------------------------------------===// |
43 | class DelimJoinGlobalState : public GlobalSinkState { |
44 | public: |
45 | explicit DelimJoinGlobalState(ClientContext &context, const PhysicalDelimJoin &delim_join) |
46 | : lhs_data(context, delim_join.children[0]->GetTypes()) { |
47 | D_ASSERT(delim_join.delim_scans.size() > 0); |
48 | // set up the delim join chunk to scan in the original join |
49 | auto &cached_chunk_scan = delim_join.join->children[0]->Cast<PhysicalColumnDataScan>(); |
50 | cached_chunk_scan.collection = &lhs_data; |
51 | } |
52 | |
53 | ColumnDataCollection lhs_data; |
54 | mutex lhs_lock; |
55 | |
56 | void Merge(ColumnDataCollection &input) { |
57 | lock_guard<mutex> guard(lhs_lock); |
58 | lhs_data.Combine(other&: input); |
59 | } |
60 | }; |
61 | |
62 | class DelimJoinLocalState : public LocalSinkState { |
63 | public: |
64 | explicit DelimJoinLocalState(ClientContext &context, const PhysicalDelimJoin &delim_join) |
65 | : lhs_data(context, delim_join.children[0]->GetTypes()) { |
66 | lhs_data.InitializeAppend(state&: append_state); |
67 | } |
68 | |
69 | unique_ptr<LocalSinkState> distinct_state; |
70 | ColumnDataCollection lhs_data; |
71 | ColumnDataAppendState append_state; |
72 | |
73 | void Append(DataChunk &input) { |
74 | lhs_data.Append(new_chunk&: input); |
75 | } |
76 | }; |
77 | |
78 | unique_ptr<GlobalSinkState> PhysicalDelimJoin::GetGlobalSinkState(ClientContext &context) const { |
79 | auto state = make_uniq<DelimJoinGlobalState>(args&: context, args: *this); |
80 | distinct->sink_state = distinct->GetGlobalSinkState(context); |
81 | if (delim_scans.size() > 1) { |
82 | PhysicalHashAggregate::SetMultiScan(*distinct->sink_state); |
83 | } |
84 | return std::move(state); |
85 | } |
86 | |
87 | unique_ptr<LocalSinkState> PhysicalDelimJoin::GetLocalSinkState(ExecutionContext &context) const { |
88 | auto state = make_uniq<DelimJoinLocalState>(args&: context.client, args: *this); |
89 | state->distinct_state = distinct->GetLocalSinkState(context); |
90 | return std::move(state); |
91 | } |
92 | |
93 | SinkResultType PhysicalDelimJoin::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const { |
94 | auto &lstate = input.local_state.Cast<DelimJoinLocalState>(); |
95 | lstate.lhs_data.Append(state&: lstate.append_state, new_chunk&: chunk); |
96 | OperatorSinkInput distinct_sink_input {.global_state: *distinct->sink_state, .local_state: *lstate.distinct_state, .interrupt_state: input.interrupt_state}; |
97 | distinct->Sink(context, chunk, input&: distinct_sink_input); |
98 | return SinkResultType::NEED_MORE_INPUT; |
99 | } |
100 | |
101 | void PhysicalDelimJoin::Combine(ExecutionContext &context, GlobalSinkState &state, LocalSinkState &lstate_p) const { |
102 | auto &lstate = lstate_p.Cast<DelimJoinLocalState>(); |
103 | auto &gstate = state.Cast<DelimJoinGlobalState>(); |
104 | gstate.Merge(input&: lstate.lhs_data); |
105 | distinct->Combine(context, state&: *distinct->sink_state, lstate&: *lstate.distinct_state); |
106 | } |
107 | |
108 | SinkFinalizeType PhysicalDelimJoin::Finalize(Pipeline &pipeline, Event &event, ClientContext &client, |
109 | GlobalSinkState &gstate) const { |
110 | // finalize the distinct HT |
111 | D_ASSERT(distinct); |
112 | distinct->Finalize(pipeline, event, context&: client, gstate&: *distinct->sink_state); |
113 | return SinkFinalizeType::READY; |
114 | } |
115 | |
116 | string PhysicalDelimJoin::ParamsToString() const { |
117 | return join->ParamsToString(); |
118 | } |
119 | |
120 | //===--------------------------------------------------------------------===// |
121 | // Pipeline Construction |
122 | //===--------------------------------------------------------------------===// |
123 | void PhysicalDelimJoin::BuildPipelines(Pipeline ¤t, MetaPipeline &meta_pipeline) { |
124 | op_state.reset(); |
125 | sink_state.reset(); |
126 | |
127 | auto &child_meta_pipeline = meta_pipeline.CreateChildMetaPipeline(current, op&: *this); |
128 | child_meta_pipeline.Build(op&: *children[0]); |
129 | |
130 | if (type == PhysicalOperatorType::DELIM_JOIN) { |
131 | // recurse into the actual join |
132 | // any pipelines in there depend on the main pipeline |
133 | // any scan of the duplicate eliminated data on the RHS depends on this pipeline |
134 | // we add an entry to the mapping of (PhysicalOperator*) -> (Pipeline*) |
135 | auto &state = meta_pipeline.GetState(); |
136 | for (auto &delim_scan : delim_scans) { |
137 | state.delim_join_dependencies.insert( |
138 | x: make_pair(x&: delim_scan, y: reference<Pipeline>(*child_meta_pipeline.GetBasePipeline()))); |
139 | } |
140 | join->BuildPipelines(current, meta_pipeline); |
141 | } |
142 | } |
143 | |
144 | } // namespace duckdb |
145 | |