1 | //===----------------------------------------------------------------------===// |
2 | // DuckDB |
3 | // |
4 | // duckdb/parallel/meta_pipeline.hpp |
5 | // |
6 | // |
7 | //===----------------------------------------------------------------------===// |
8 | |
9 | #pragma once |
10 | |
11 | #include "duckdb/execution/physical_operator.hpp" |
12 | |
13 | namespace duckdb { |
14 | |
15 | class PhysicalRecursiveCTE; |
16 | |
17 | struct PipelineFinishGroup { |
18 | explicit PipelineFinishGroup(Pipeline *group_base_p) : group_base(group_base_p) { |
19 | } |
20 | Pipeline *group_base; |
21 | unordered_set<Pipeline *> group_members; |
22 | }; |
23 | |
24 | //! MetaPipeline represents a set of pipelines that all have the same sink |
25 | class MetaPipeline : public std::enable_shared_from_this<MetaPipeline> { |
26 | //! We follow these rules when building: |
27 | //! 1. For joins, build out the blocking side before going down the probe side |
28 | //! - The current streaming pipeline will have a dependency on it (dependency across MetaPipelines) |
29 | //! - Unions of this streaming pipeline will automatically inherit this dependency |
30 | //! 2. Build child pipelines last (e.g., Hash Join becomes source after probe is done: scan HT for FULL OUTER JOIN) |
31 | //! - 'last' means after building out all other pipelines associated with this operator |
32 | //! - The child pipeline automatically has dependencies (within this MetaPipeline) on: |
33 | //! * The 'current' streaming pipeline |
34 | //! * And all pipelines that were added to the MetaPipeline after 'current' |
35 | public: |
36 | //! Create a MetaPipeline with the given sink |
37 | explicit MetaPipeline(Executor &executor, PipelineBuildState &state, PhysicalOperator *sink); |
38 | |
39 | public: |
40 | //! Get the Executor for this MetaPipeline |
41 | Executor &GetExecutor() const; |
42 | //! Get the PipelineBuildState for this MetaPipeline |
43 | PipelineBuildState &GetState() const; |
44 | //! Get the sink operator for this MetaPipeline |
45 | optional_ptr<PhysicalOperator> GetSink() const; |
46 | |
47 | //! Get the initial pipeline of this MetaPipeline |
48 | shared_ptr<Pipeline> &GetBasePipeline(); |
49 | //! Get the pipelines of this MetaPipeline |
50 | void GetPipelines(vector<shared_ptr<Pipeline>> &result, bool recursive); |
51 | //! Get the MetaPipeline children of this MetaPipeline |
52 | void GetMetaPipelines(vector<shared_ptr<MetaPipeline>> &result, bool recursive, bool skip); |
53 | //! Get the dependencies (within this MetaPipeline) of the given Pipeline |
54 | const vector<Pipeline *> *GetDependencies(Pipeline *dependant) const; |
55 | //! Whether this MetaPipeline has a recursive CTE |
56 | bool HasRecursiveCTE() const; |
57 | //! Set the flag that this MetaPipeline is a recursive CTE pipeline |
58 | void SetRecursiveCTE(); |
59 | //! Assign a batch index to the given pipeline |
60 | void AssignNextBatchIndex(Pipeline *pipeline); |
61 | //! Let 'dependant' depend on all pipeline that were created since 'start', |
62 | //! where 'including' determines whether 'start' is added to the dependencies |
63 | void AddDependenciesFrom(Pipeline *dependant, Pipeline *start, bool including); |
64 | //! Make sure that the given pipeline has its own PipelineFinishEvent (e.g., for IEJoin - double Finalize) |
65 | void AddFinishEvent(Pipeline *pipeline); |
66 | //! Whether the pipeline needs its own PipelineFinishEvent |
67 | bool HasFinishEvent(Pipeline *pipeline) const; |
68 | //! Whether this pipeline is part of a PipelineFinishEvent |
69 | optional_ptr<Pipeline> GetFinishGroup(Pipeline *pipeline) const; |
70 | |
71 | public: |
72 | //! Build the MetaPipeline with 'op' as the first operator (excl. the shared sink) |
73 | void Build(PhysicalOperator &op); |
74 | //! Ready all the pipelines (recursively) |
75 | void Ready(); |
76 | |
77 | //! Create an empty pipeline within this MetaPipeline |
78 | Pipeline *CreatePipeline(); |
79 | //! Create a union pipeline (clone of 'current') |
80 | Pipeline *CreateUnionPipeline(Pipeline ¤t, bool order_matters); |
81 | //! Create a child pipeline op 'current' starting at 'op', |
82 | //! where 'last_pipeline' is the last pipeline added before building out 'current' |
83 | void CreateChildPipeline(Pipeline ¤t, PhysicalOperator &op, Pipeline *last_pipeline); |
84 | //! Create a MetaPipeline child that 'current' depends on |
85 | MetaPipeline &CreateChildMetaPipeline(Pipeline ¤t, PhysicalOperator &op); |
86 | |
87 | private: |
88 | //! The executor for all MetaPipelines in the query plan |
89 | Executor &executor; |
90 | //! The PipelineBuildState for all MetaPipelines in the query plan |
91 | PipelineBuildState &state; |
92 | //! The sink of all pipelines within this MetaPipeline |
93 | optional_ptr<PhysicalOperator> sink; |
94 | //! Whether this MetaPipeline is a the recursive pipeline of a recursive CTE |
95 | bool recursive_cte; |
96 | //! All pipelines with a different source, but the same sink |
97 | vector<shared_ptr<Pipeline>> pipelines; |
98 | //! Dependencies within this MetaPipeline |
99 | unordered_map<Pipeline *, vector<Pipeline *>> dependencies; |
100 | //! Other MetaPipelines that this MetaPipeline depends on |
101 | vector<shared_ptr<MetaPipeline>> children; |
102 | //! Next batch index |
103 | idx_t next_batch_index; |
104 | //! Pipelines (other than the base pipeline) that need their own PipelineFinishEvent (e.g., for IEJoin) |
105 | unordered_set<Pipeline *> finish_pipelines; |
106 | //! Mapping from pipeline (e.g., child or union) to finish pipeline |
107 | unordered_map<Pipeline *, Pipeline *> finish_map; |
108 | }; |
109 | |
110 | } // namespace duckdb |
111 | |