//===----------------------------------------------------------------------===//
//                         DuckDB
//
// duckdb/parallel/meta_pipeline.hpp
//
//
//===----------------------------------------------------------------------===//
8
9#pragma once
10
11#include "duckdb/execution/physical_operator.hpp"
12
13namespace duckdb {
14
15class PhysicalRecursiveCTE;
16
17struct PipelineFinishGroup {
18 explicit PipelineFinishGroup(Pipeline *group_base_p) : group_base(group_base_p) {
19 }
20 Pipeline *group_base;
21 unordered_set<Pipeline *> group_members;
22};
23
24//! MetaPipeline represents a set of pipelines that all have the same sink
25class MetaPipeline : public std::enable_shared_from_this<MetaPipeline> {
26 //! We follow these rules when building:
27 //! 1. For joins, build out the blocking side before going down the probe side
28 //! - The current streaming pipeline will have a dependency on it (dependency across MetaPipelines)
29 //! - Unions of this streaming pipeline will automatically inherit this dependency
30 //! 2. Build child pipelines last (e.g., Hash Join becomes source after probe is done: scan HT for FULL OUTER JOIN)
31 //! - 'last' means after building out all other pipelines associated with this operator
32 //! - The child pipeline automatically has dependencies (within this MetaPipeline) on:
33 //! * The 'current' streaming pipeline
34 //! * And all pipelines that were added to the MetaPipeline after 'current'
35public:
36 //! Create a MetaPipeline with the given sink
37 explicit MetaPipeline(Executor &executor, PipelineBuildState &state, PhysicalOperator *sink);
38
39public:
40 //! Get the Executor for this MetaPipeline
41 Executor &GetExecutor() const;
42 //! Get the PipelineBuildState for this MetaPipeline
43 PipelineBuildState &GetState() const;
44 //! Get the sink operator for this MetaPipeline
45 optional_ptr<PhysicalOperator> GetSink() const;
46
47 //! Get the initial pipeline of this MetaPipeline
48 shared_ptr<Pipeline> &GetBasePipeline();
49 //! Get the pipelines of this MetaPipeline
50 void GetPipelines(vector<shared_ptr<Pipeline>> &result, bool recursive);
51 //! Get the MetaPipeline children of this MetaPipeline
52 void GetMetaPipelines(vector<shared_ptr<MetaPipeline>> &result, bool recursive, bool skip);
53 //! Get the dependencies (within this MetaPipeline) of the given Pipeline
54 const vector<Pipeline *> *GetDependencies(Pipeline *dependant) const;
55 //! Whether this MetaPipeline has a recursive CTE
56 bool HasRecursiveCTE() const;
57 //! Set the flag that this MetaPipeline is a recursive CTE pipeline
58 void SetRecursiveCTE();
59 //! Assign a batch index to the given pipeline
60 void AssignNextBatchIndex(Pipeline *pipeline);
61 //! Let 'dependant' depend on all pipeline that were created since 'start',
62 //! where 'including' determines whether 'start' is added to the dependencies
63 void AddDependenciesFrom(Pipeline *dependant, Pipeline *start, bool including);
64 //! Make sure that the given pipeline has its own PipelineFinishEvent (e.g., for IEJoin - double Finalize)
65 void AddFinishEvent(Pipeline *pipeline);
66 //! Whether the pipeline needs its own PipelineFinishEvent
67 bool HasFinishEvent(Pipeline *pipeline) const;
68 //! Whether this pipeline is part of a PipelineFinishEvent
69 optional_ptr<Pipeline> GetFinishGroup(Pipeline *pipeline) const;
70
71public:
72 //! Build the MetaPipeline with 'op' as the first operator (excl. the shared sink)
73 void Build(PhysicalOperator &op);
74 //! Ready all the pipelines (recursively)
75 void Ready();
76
77 //! Create an empty pipeline within this MetaPipeline
78 Pipeline *CreatePipeline();
79 //! Create a union pipeline (clone of 'current')
80 Pipeline *CreateUnionPipeline(Pipeline &current, bool order_matters);
81 //! Create a child pipeline op 'current' starting at 'op',
82 //! where 'last_pipeline' is the last pipeline added before building out 'current'
83 void CreateChildPipeline(Pipeline &current, PhysicalOperator &op, Pipeline *last_pipeline);
84 //! Create a MetaPipeline child that 'current' depends on
85 MetaPipeline &CreateChildMetaPipeline(Pipeline &current, PhysicalOperator &op);
86
87private:
88 //! The executor for all MetaPipelines in the query plan
89 Executor &executor;
90 //! The PipelineBuildState for all MetaPipelines in the query plan
91 PipelineBuildState &state;
92 //! The sink of all pipelines within this MetaPipeline
93 optional_ptr<PhysicalOperator> sink;
94 //! Whether this MetaPipeline is a the recursive pipeline of a recursive CTE
95 bool recursive_cte;
96 //! All pipelines with a different source, but the same sink
97 vector<shared_ptr<Pipeline>> pipelines;
98 //! Dependencies within this MetaPipeline
99 unordered_map<Pipeline *, vector<Pipeline *>> dependencies;
100 //! Other MetaPipelines that this MetaPipeline depends on
101 vector<shared_ptr<MetaPipeline>> children;
102 //! Next batch index
103 idx_t next_batch_index;
104 //! Pipelines (other than the base pipeline) that need their own PipelineFinishEvent (e.g., for IEJoin)
105 unordered_set<Pipeline *> finish_pipelines;
106 //! Mapping from pipeline (e.g., child or union) to finish pipeline
107 unordered_map<Pipeline *, Pipeline *> finish_map;
108};
109
110} // namespace duckdb
111