| 1 | #include "duckdb/execution/operator/join/physical_delim_join.hpp" |
| 2 | |
| 3 | #include "duckdb/common/types/column/column_data_collection.hpp" |
| 4 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
| 5 | #include "duckdb/execution/operator/aggregate/physical_hash_aggregate.hpp" |
| 6 | #include "duckdb/execution/operator/scan/physical_column_data_scan.hpp" |
| 7 | #include "duckdb/execution/operator/set/physical_recursive_cte.hpp" |
| 8 | #include "duckdb/parallel/meta_pipeline.hpp" |
| 9 | #include "duckdb/parallel/pipeline.hpp" |
| 10 | #include "duckdb/parallel/thread_context.hpp" |
| 11 | |
| 12 | namespace duckdb { |
| 13 | |
| 14 | PhysicalDelimJoin::PhysicalDelimJoin(vector<LogicalType> types, unique_ptr<PhysicalOperator> original_join, |
| 15 | vector<const_reference<PhysicalOperator>> delim_scans, idx_t estimated_cardinality) |
| 16 | : PhysicalOperator(PhysicalOperatorType::DELIM_JOIN, std::move(types), estimated_cardinality), |
| 17 | join(std::move(original_join)), delim_scans(std::move(delim_scans)) { |
| 18 | D_ASSERT(join->children.size() == 2); |
| 19 | // now for the original join |
| 20 | // we take its left child, this is the side that we will duplicate eliminate |
| 21 | children.push_back(x: std::move(join->children[0])); |
| 22 | |
| 23 | // we replace it with a PhysicalColumnDataScan, that scans the ColumnDataCollection that we keep cached |
| 24 | // the actual chunk collection to scan will be created in the DelimJoinGlobalState |
| 25 | auto cached_chunk_scan = make_uniq<PhysicalColumnDataScan>( |
| 26 | args: children[0]->GetTypes(), args: PhysicalOperatorType::COLUMN_DATA_SCAN, args&: estimated_cardinality); |
| 27 | join->children[0] = std::move(cached_chunk_scan); |
| 28 | } |
| 29 | |
| 30 | vector<const_reference<PhysicalOperator>> PhysicalDelimJoin::GetChildren() const { |
| 31 | vector<const_reference<PhysicalOperator>> result; |
| 32 | for (auto &child : children) { |
| 33 | result.push_back(x: *child); |
| 34 | } |
| 35 | result.push_back(x: *join); |
| 36 | result.push_back(x: *distinct); |
| 37 | return result; |
| 38 | } |
| 39 | |
| 40 | //===--------------------------------------------------------------------===// |
| 41 | // Sink |
| 42 | //===--------------------------------------------------------------------===// |
| 43 | class DelimJoinGlobalState : public GlobalSinkState { |
| 44 | public: |
| 45 | explicit DelimJoinGlobalState(ClientContext &context, const PhysicalDelimJoin &delim_join) |
| 46 | : lhs_data(context, delim_join.children[0]->GetTypes()) { |
| 47 | D_ASSERT(delim_join.delim_scans.size() > 0); |
| 48 | // set up the delim join chunk to scan in the original join |
| 49 | auto &cached_chunk_scan = delim_join.join->children[0]->Cast<PhysicalColumnDataScan>(); |
| 50 | cached_chunk_scan.collection = &lhs_data; |
| 51 | } |
| 52 | |
| 53 | ColumnDataCollection lhs_data; |
| 54 | mutex lhs_lock; |
| 55 | |
| 56 | void Merge(ColumnDataCollection &input) { |
| 57 | lock_guard<mutex> guard(lhs_lock); |
| 58 | lhs_data.Combine(other&: input); |
| 59 | } |
| 60 | }; |
| 61 | |
| 62 | class DelimJoinLocalState : public LocalSinkState { |
| 63 | public: |
| 64 | explicit DelimJoinLocalState(ClientContext &context, const PhysicalDelimJoin &delim_join) |
| 65 | : lhs_data(context, delim_join.children[0]->GetTypes()) { |
| 66 | lhs_data.InitializeAppend(state&: append_state); |
| 67 | } |
| 68 | |
| 69 | unique_ptr<LocalSinkState> distinct_state; |
| 70 | ColumnDataCollection lhs_data; |
| 71 | ColumnDataAppendState append_state; |
| 72 | |
| 73 | void Append(DataChunk &input) { |
| 74 | lhs_data.Append(new_chunk&: input); |
| 75 | } |
| 76 | }; |
| 77 | |
| 78 | unique_ptr<GlobalSinkState> PhysicalDelimJoin::GetGlobalSinkState(ClientContext &context) const { |
| 79 | auto state = make_uniq<DelimJoinGlobalState>(args&: context, args: *this); |
| 80 | distinct->sink_state = distinct->GetGlobalSinkState(context); |
| 81 | if (delim_scans.size() > 1) { |
| 82 | PhysicalHashAggregate::SetMultiScan(*distinct->sink_state); |
| 83 | } |
| 84 | return std::move(state); |
| 85 | } |
| 86 | |
| 87 | unique_ptr<LocalSinkState> PhysicalDelimJoin::GetLocalSinkState(ExecutionContext &context) const { |
| 88 | auto state = make_uniq<DelimJoinLocalState>(args&: context.client, args: *this); |
| 89 | state->distinct_state = distinct->GetLocalSinkState(context); |
| 90 | return std::move(state); |
| 91 | } |
| 92 | |
| 93 | SinkResultType PhysicalDelimJoin::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const { |
| 94 | auto &lstate = input.local_state.Cast<DelimJoinLocalState>(); |
| 95 | lstate.lhs_data.Append(state&: lstate.append_state, new_chunk&: chunk); |
| 96 | OperatorSinkInput distinct_sink_input {.global_state: *distinct->sink_state, .local_state: *lstate.distinct_state, .interrupt_state: input.interrupt_state}; |
| 97 | distinct->Sink(context, chunk, input&: distinct_sink_input); |
| 98 | return SinkResultType::NEED_MORE_INPUT; |
| 99 | } |
| 100 | |
| 101 | void PhysicalDelimJoin::Combine(ExecutionContext &context, GlobalSinkState &state, LocalSinkState &lstate_p) const { |
| 102 | auto &lstate = lstate_p.Cast<DelimJoinLocalState>(); |
| 103 | auto &gstate = state.Cast<DelimJoinGlobalState>(); |
| 104 | gstate.Merge(input&: lstate.lhs_data); |
| 105 | distinct->Combine(context, state&: *distinct->sink_state, lstate&: *lstate.distinct_state); |
| 106 | } |
| 107 | |
| 108 | SinkFinalizeType PhysicalDelimJoin::Finalize(Pipeline &pipeline, Event &event, ClientContext &client, |
| 109 | GlobalSinkState &gstate) const { |
| 110 | // finalize the distinct HT |
| 111 | D_ASSERT(distinct); |
| 112 | distinct->Finalize(pipeline, event, context&: client, gstate&: *distinct->sink_state); |
| 113 | return SinkFinalizeType::READY; |
| 114 | } |
| 115 | |
| 116 | string PhysicalDelimJoin::ParamsToString() const { |
| 117 | return join->ParamsToString(); |
| 118 | } |
| 119 | |
| 120 | //===--------------------------------------------------------------------===// |
| 121 | // Pipeline Construction |
| 122 | //===--------------------------------------------------------------------===// |
| 123 | void PhysicalDelimJoin::BuildPipelines(Pipeline ¤t, MetaPipeline &meta_pipeline) { |
| 124 | op_state.reset(); |
| 125 | sink_state.reset(); |
| 126 | |
| 127 | auto &child_meta_pipeline = meta_pipeline.CreateChildMetaPipeline(current, op&: *this); |
| 128 | child_meta_pipeline.Build(op&: *children[0]); |
| 129 | |
| 130 | if (type == PhysicalOperatorType::DELIM_JOIN) { |
| 131 | // recurse into the actual join |
| 132 | // any pipelines in there depend on the main pipeline |
| 133 | // any scan of the duplicate eliminated data on the RHS depends on this pipeline |
| 134 | // we add an entry to the mapping of (PhysicalOperator*) -> (Pipeline*) |
| 135 | auto &state = meta_pipeline.GetState(); |
| 136 | for (auto &delim_scan : delim_scans) { |
| 137 | state.delim_join_dependencies.insert( |
| 138 | x: make_pair(x&: delim_scan, y: reference<Pipeline>(*child_meta_pipeline.GetBasePipeline()))); |
| 139 | } |
| 140 | join->BuildPipelines(current, meta_pipeline); |
| 141 | } |
| 142 | } |
| 143 | |
| 144 | } // namespace duckdb |
| 145 | |