1 | #include "duckdb/parallel/meta_pipeline.hpp" |
2 | |
3 | #include "duckdb/execution/executor.hpp" |
4 | #include "duckdb/execution/operator/set/physical_recursive_cte.hpp" |
5 | |
6 | namespace duckdb { |
7 | |
// Construct a MetaPipeline: a group of pipelines that all share the same sink.
// Immediately creates the base pipeline (pipelines[0]) via CreatePipeline().
MetaPipeline::MetaPipeline(Executor &executor_p, PipelineBuildState &state_p, PhysicalOperator *sink_p)
    : executor(executor_p), state(state_p), sink(sink_p), recursive_cte(false), next_batch_index(0) {
	CreatePipeline();
}
12 | |
// Returns the Executor this MetaPipeline belongs to.
Executor &MetaPipeline::GetExecutor() const {
	return executor;
}
16 | |
// Returns the PipelineBuildState shared by the whole pipeline-build process.
PipelineBuildState &MetaPipeline::GetState() const {
	return state;
}
20 | |
// Returns the sink operator shared by all pipelines in this MetaPipeline (may be null for the root).
optional_ptr<PhysicalOperator> MetaPipeline::GetSink() const {
	return sink;
}
24 | |
// Returns the base pipeline, i.e. the first pipeline created in the constructor.
shared_ptr<Pipeline> &MetaPipeline::GetBasePipeline() {
	return pipelines[0];
}
28 | |
29 | void MetaPipeline::GetPipelines(vector<shared_ptr<Pipeline>> &result, bool recursive) { |
30 | result.insert(position: result.end(), first: pipelines.begin(), last: pipelines.end()); |
31 | if (recursive) { |
32 | for (auto &child : children) { |
33 | child->GetPipelines(result, recursive: true); |
34 | } |
35 | } |
36 | } |
37 | |
38 | void MetaPipeline::GetMetaPipelines(vector<shared_ptr<MetaPipeline>> &result, bool recursive, bool skip) { |
39 | if (!skip) { |
40 | result.push_back(x: shared_from_this()); |
41 | } |
42 | if (recursive) { |
43 | for (auto &child : children) { |
44 | child->GetMetaPipelines(result, recursive: true, skip: false); |
45 | } |
46 | } |
47 | } |
48 | |
49 | const vector<Pipeline *> *MetaPipeline::GetDependencies(Pipeline *dependant) const { |
50 | auto it = dependencies.find(x: dependant); |
51 | if (it == dependencies.end()) { |
52 | return nullptr; |
53 | } else { |
54 | return &it->second; |
55 | } |
56 | } |
57 | |
// Whether this MetaPipeline is part of a recursive CTE.
bool MetaPipeline::HasRecursiveCTE() const {
	return recursive_cte;
}
61 | |
// Marks this MetaPipeline as being part of a recursive CTE
// (propagated to children in CreateChildMetaPipeline).
void MetaPipeline::SetRecursiveCTE() {
	recursive_cte = true;
}
65 | |
// Assigns the next base batch index to 'pipeline'; indices are spaced
// BATCH_INCREMENT apart so each pipeline owns a contiguous range of batch indices.
void MetaPipeline::AssignNextBatchIndex(Pipeline *pipeline) {
	pipeline->base_batch_index = next_batch_index++ * PipelineBuildState::BATCH_INCREMENT;
}
69 | |
70 | void MetaPipeline::Build(PhysicalOperator &op) { |
71 | D_ASSERT(pipelines.size() == 1); |
72 | D_ASSERT(children.empty()); |
73 | op.BuildPipelines(current&: *pipelines.back(), meta_pipeline&: *this); |
74 | } |
75 | |
76 | void MetaPipeline::Ready() { |
77 | for (auto &pipeline : pipelines) { |
78 | pipeline->Ready(); |
79 | } |
80 | for (auto &child : children) { |
81 | child->Ready(); |
82 | } |
83 | } |
84 | |
85 | MetaPipeline &MetaPipeline::CreateChildMetaPipeline(Pipeline ¤t, PhysicalOperator &op) { |
86 | children.push_back(x: make_shared<MetaPipeline>(args&: executor, args&: state, args: &op)); |
87 | auto child_meta_pipeline = children.back().get(); |
88 | // child MetaPipeline must finish completely before this MetaPipeline can start |
89 | current.AddDependency(pipeline&: child_meta_pipeline->GetBasePipeline()); |
90 | // child meta pipeline is part of the recursive CTE too |
91 | child_meta_pipeline->recursive_cte = recursive_cte; |
92 | return *child_meta_pipeline; |
93 | } |
94 | |
95 | Pipeline *MetaPipeline::CreatePipeline() { |
96 | pipelines.emplace_back(args: make_shared<Pipeline>(args&: executor)); |
97 | state.SetPipelineSink(pipeline&: *pipelines.back(), op: sink, sink_pipeline_count: next_batch_index++); |
98 | return pipelines.back().get(); |
99 | } |
100 | |
101 | void MetaPipeline::AddDependenciesFrom(Pipeline *dependant, Pipeline *start, bool including) { |
102 | // find 'start' |
103 | auto it = pipelines.begin(); |
104 | for (; it->get() != start; it++) { |
105 | } |
106 | |
107 | if (!including) { |
108 | it++; |
109 | } |
110 | |
111 | // collect pipelines that were created from then |
112 | vector<Pipeline *> created_pipelines; |
113 | for (; it != pipelines.end(); it++) { |
114 | if (it->get() == dependant) { |
115 | // cannot depend on itself |
116 | continue; |
117 | } |
118 | created_pipelines.push_back(x: it->get()); |
119 | } |
120 | |
121 | // add them to the dependencies |
122 | auto &deps = dependencies[dependant]; |
123 | deps.insert(position: deps.begin(), first: created_pipelines.begin(), last: created_pipelines.end()); |
124 | } |
125 | |
126 | void MetaPipeline::AddFinishEvent(Pipeline *pipeline) { |
127 | D_ASSERT(finish_pipelines.find(pipeline) == finish_pipelines.end()); |
128 | finish_pipelines.insert(x: pipeline); |
129 | |
130 | // add all pipelines that were added since 'pipeline' was added (including 'pipeline') to the finish group |
131 | auto it = pipelines.begin(); |
132 | for (; it->get() != pipeline; it++) { |
133 | } |
134 | it++; |
135 | for (; it != pipelines.end(); it++) { |
136 | finish_map.emplace(args: it->get(), args&: pipeline); |
137 | } |
138 | } |
139 | |
140 | bool MetaPipeline::HasFinishEvent(Pipeline *pipeline) const { |
141 | return finish_pipelines.find(x: pipeline) != finish_pipelines.end(); |
142 | } |
143 | |
144 | optional_ptr<Pipeline> MetaPipeline::GetFinishGroup(Pipeline *pipeline) const { |
145 | auto it = finish_map.find(x: pipeline); |
146 | return it == finish_map.end() ? nullptr : it->second; |
147 | } |
148 | |
149 | Pipeline *MetaPipeline::CreateUnionPipeline(Pipeline ¤t, bool order_matters) { |
150 | // create the union pipeline (batch index 0, should be set correctly afterwards) |
151 | auto union_pipeline = CreatePipeline(); |
152 | state.SetPipelineOperators(pipeline&: *union_pipeline, operators: state.GetPipelineOperators(pipeline&: current)); |
153 | state.SetPipelineSink(pipeline&: *union_pipeline, op: sink, sink_pipeline_count: 0); |
154 | |
155 | // 'union_pipeline' inherits ALL dependencies of 'current' (within this MetaPipeline, and across MetaPipelines) |
156 | union_pipeline->dependencies = current.dependencies; |
157 | auto current_deps = GetDependencies(dependant: ¤t); |
158 | if (current_deps) { |
159 | dependencies[union_pipeline] = *current_deps; |
160 | } |
161 | |
162 | if (order_matters) { |
163 | // if we need to preserve order, or if the sink is not parallel, we set a dependency |
164 | dependencies[union_pipeline].push_back(x: ¤t); |
165 | } |
166 | |
167 | return union_pipeline; |
168 | } |
169 | |
170 | void MetaPipeline::CreateChildPipeline(Pipeline ¤t, PhysicalOperator &op, Pipeline *last_pipeline) { |
171 | // rule 2: 'current' must be fully built (down to the source) before creating the child pipeline |
172 | D_ASSERT(current.source); |
173 | |
174 | // create the child pipeline (same batch index) |
175 | pipelines.emplace_back(args: state.CreateChildPipeline(executor, pipeline&: current, op)); |
176 | auto child_pipeline = pipelines.back().get(); |
177 | child_pipeline->base_batch_index = current.base_batch_index; |
178 | |
179 | // child pipeline has a dependency (within this MetaPipeline on all pipelines that were scheduled |
180 | // between 'current' and now (including 'current') - set them up |
181 | dependencies[child_pipeline].push_back(x: ¤t); |
182 | AddDependenciesFrom(dependant: child_pipeline, start: last_pipeline, including: false); |
183 | D_ASSERT(!GetDependencies(child_pipeline)->empty()); |
184 | } |
185 | |
186 | } // namespace duckdb |
187 | |