1#include "duckdb/parallel/meta_pipeline.hpp"
2
3#include "duckdb/execution/executor.hpp"
4#include "duckdb/execution/operator/set/physical_recursive_cte.hpp"
5
6namespace duckdb {
7
MetaPipeline::MetaPipeline(Executor &executor_p, PipelineBuildState &state_p, PhysicalOperator *sink_p)
    : executor(executor_p), state(state_p), sink(sink_p), recursive_cte(false), next_batch_index(0) {
	CreatePipeline();
}

Executor &MetaPipeline::GetExecutor() const {
	return executor;
}

PipelineBuildState &MetaPipeline::GetState() const {
	return state;
}

optional_ptr<PhysicalOperator> MetaPipeline::GetSink() const {
	return sink;
}

shared_ptr<Pipeline> &MetaPipeline::GetBasePipeline() {
	return pipelines[0];
}

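// collect all pipelines of this MetaPipeline, optionally recursing into the child MetaPipelines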
void MetaPipeline::GetPipelines(vector<shared_ptr<Pipeline>> &result, bool recursive) {
	result.insert(result.end(), pipelines.begin(), pipelines.end());
	if (recursive) {
		for (auto &child : children) {
			child->GetPipelines(result, true);
		}
	}
}

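// collect this MetaPipeline (unless 'skip' is set) and, if 'recursive', all child MetaPipelines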
void MetaPipeline::GetMetaPipelines(vector<shared_ptr<MetaPipeline>> &result, bool recursive, bool skip) {
	if (!skip) {
		result.push_back(shared_from_this());
	}
	if (recursive) {
		for (auto &child : children) {
			child->GetMetaPipelines(result, true, false);
		}
	}
}

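// return the intra-MetaPipeline dependencies of 'dependant', or nullptr if it has none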
const vector<Pipeline *> *MetaPipeline::GetDependencies(Pipeline *dependant) const {
	auto it = dependencies.find(dependant);
	if (it == dependencies.end()) {
		return nullptr;
	} else {
		return &it->second;
	}
}

bool MetaPipeline::HasRecursiveCTE() const {
	return recursive_cte;
}

void MetaPipeline::SetRecursiveCTE() {
	recursive_cte = true;
}

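// assign the next base batch index to 'pipeline'; indices are spaced by BATCH_INCREMENT so that the
// batch indices handed out by different pipelines of this MetaPipeline do not overlap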
void MetaPipeline::AssignNextBatchIndex(Pipeline *pipeline) {
	pipeline->base_batch_index = next_batch_index++ * PipelineBuildState::BATCH_INCREMENT;
}

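// recursively build the pipelines of this MetaPipeline, starting from 'op' and walking down to the source(s)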
void MetaPipeline::Build(PhysicalOperator &op) {
	D_ASSERT(pipelines.size() == 1);
	D_ASSERT(children.empty());
	op.BuildPipelines(*pipelines.back(), *this);
}

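// mark all pipelines of this MetaPipeline as ready, then do the same for all child MetaPipelines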
void MetaPipeline::Ready() {
	for (auto &pipeline : pipelines) {
		pipeline->Ready();
	}
	for (auto &child : children) {
		child->Ready();
	}
}

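// create a child MetaPipeline with 'op' as its sink; 'current' cannot start until the child has finished.
// Typical usage (a sketch, from an operator's BuildPipelines override - the child index shown is hypothetical):
//   auto &child_meta_pipeline = meta_pipeline.CreateChildMetaPipeline(current, *this);
//   child_meta_pipeline.Build(*children[0]);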
MetaPipeline &MetaPipeline::CreateChildMetaPipeline(Pipeline &current, PhysicalOperator &op) {
	children.push_back(make_shared<MetaPipeline>(executor, state, &op));
	auto child_meta_pipeline = children.back().get();
	// child MetaPipeline must finish completely before this MetaPipeline can start
	current.AddDependency(child_meta_pipeline->GetBasePipeline());
	// child meta pipeline is part of the recursive CTE too
	child_meta_pipeline->recursive_cte = recursive_cte;
	return *child_meta_pipeline;
}

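// create a new pipeline in this MetaPipeline, with the shared sink and the next batch index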
Pipeline *MetaPipeline::CreatePipeline() {
	pipelines.emplace_back(make_shared<Pipeline>(executor));
	state.SetPipelineSink(*pipelines.back(), sink, next_batch_index++);
	return pipelines.back().get();
}

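// make 'dependant' depend on all pipelines in this MetaPipeline created from 'start' onwards
// (optionally including 'start' itself)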
void MetaPipeline::AddDependenciesFrom(Pipeline *dependant, Pipeline *start, bool including) {
	// find 'start'
	auto it = pipelines.begin();
	for (; it->get() != start; it++) {
	}

	if (!including) {
		it++;
	}

	// collect all pipelines that were created from 'start' onwards
	vector<Pipeline *> created_pipelines;
	for (; it != pipelines.end(); it++) {
		if (it->get() == dependant) {
			// cannot depend on itself
			continue;
		}
		created_pipelines.push_back(it->get());
	}

	// add them to the dependencies
	auto &deps = dependencies[dependant];
	deps.insert(deps.begin(), created_pipelines.begin(), created_pipelines.end());
}

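// mark 'pipeline' as having its own finish event; all pipelines added to this MetaPipeline after it
// are mapped to 'pipeline' as their finish group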
void MetaPipeline::AddFinishEvent(Pipeline *pipeline) {
	D_ASSERT(finish_pipelines.find(pipeline) == finish_pipelines.end());
	finish_pipelines.insert(pipeline);

	// add all pipelines that were added since 'pipeline' was added (including 'pipeline') to the finish group
	auto it = pipelines.begin();
	for (; it->get() != pipeline; it++) {
	}
	it++;
	for (; it != pipelines.end(); it++) {
		finish_map.emplace(it->get(), pipeline);
	}
}

bool MetaPipeline::HasFinishEvent(Pipeline *pipeline) const {
	return finish_pipelines.find(pipeline) != finish_pipelines.end();
}

optional_ptr<Pipeline> MetaPipeline::GetFinishGroup(Pipeline *pipeline) const {
	auto it = finish_map.find(pipeline);
	return it == finish_map.end() ? nullptr : it->second;
}

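// create a pipeline that shares the operators and sink of 'current' but has its own source (e.g., for a union);
// if 'order_matters', it must also wait for 'current' to finish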
Pipeline *MetaPipeline::CreateUnionPipeline(Pipeline &current, bool order_matters) {
	// create the union pipeline (batch index 0, should be set correctly afterwards)
	auto union_pipeline = CreatePipeline();
	state.SetPipelineOperators(*union_pipeline, state.GetPipelineOperators(current));
	state.SetPipelineSink(*union_pipeline, sink, 0);

	// 'union_pipeline' inherits ALL dependencies of 'current' (within this MetaPipeline, and across MetaPipelines)
	union_pipeline->dependencies = current.dependencies;
	auto current_deps = GetDependencies(&current);
	if (current_deps) {
		dependencies[union_pipeline] = *current_deps;
	}

	if (order_matters) {
		// if we need to preserve order, or if the sink is not parallel, we set a dependency
		dependencies[union_pipeline].push_back(&current);
	}

	return union_pipeline;
}

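// create a child pipeline of 'current' that has 'op' as its source and the same sink; the child pipeline
// depends on 'current' and on every pipeline created after 'last_pipeline' within this MetaPipeline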
void MetaPipeline::CreateChildPipeline(Pipeline &current, PhysicalOperator &op, Pipeline *last_pipeline) {
	// rule 2: 'current' must be fully built (down to the source) before creating the child pipeline
	D_ASSERT(current.source);

	// create the child pipeline (same batch index)
	pipelines.emplace_back(state.CreateChildPipeline(executor, current, op));
	auto child_pipeline = pipelines.back().get();
	child_pipeline->base_batch_index = current.base_batch_index;

	// the child pipeline depends (within this MetaPipeline) on all pipelines that were scheduled
	// between 'current' and now (including 'current') - set them up
	dependencies[child_pipeline].push_back(&current);
	AddDependenciesFrom(child_pipeline, last_pipeline, false);
	D_ASSERT(!GetDependencies(child_pipeline)->empty());
}

} // namespace duckdb