| 1 | #include "duckdb/parallel/meta_pipeline.hpp" |
| 2 | |
| 3 | #include "duckdb/execution/executor.hpp" |
| 4 | #include "duckdb/execution/operator/set/physical_recursive_cte.hpp" |
| 5 | |
| 6 | namespace duckdb { |
| 7 | |
// Constructs a MetaPipeline: a group of pipelines that all share the same sink.
// Starts with recursive_cte disabled and batch index 0, and immediately creates the base pipeline.
// NOTE(review): 'sink_p' is stored as-is and may be null — presumably for a sink-less root; confirm with callers.
MetaPipeline::MetaPipeline(Executor &executor_p, PipelineBuildState &state_p, PhysicalOperator *sink_p)
    : executor(executor_p), state(state_p), sink(sink_p), recursive_cte(false), next_batch_index(0) {
	CreatePipeline();
}
| 12 | |
// Returns the Executor this MetaPipeline was created with
Executor &MetaPipeline::GetExecutor() const {
	return executor;
}
| 16 | |
// Returns the shared PipelineBuildState used while building pipelines
PipelineBuildState &MetaPipeline::GetState() const {
	return state;
}
| 20 | |
// Returns the sink operator shared by all pipelines in this MetaPipeline (may be empty)
optional_ptr<PhysicalOperator> MetaPipeline::GetSink() const {
	return sink;
}
| 24 | |
// Returns the base pipeline, i.e. the first pipeline created in the constructor
shared_ptr<Pipeline> &MetaPipeline::GetBasePipeline() {
	return pipelines[0];
}
| 28 | |
| 29 | void MetaPipeline::GetPipelines(vector<shared_ptr<Pipeline>> &result, bool recursive) { |
| 30 | result.insert(position: result.end(), first: pipelines.begin(), last: pipelines.end()); |
| 31 | if (recursive) { |
| 32 | for (auto &child : children) { |
| 33 | child->GetPipelines(result, recursive: true); |
| 34 | } |
| 35 | } |
| 36 | } |
| 37 | |
| 38 | void MetaPipeline::GetMetaPipelines(vector<shared_ptr<MetaPipeline>> &result, bool recursive, bool skip) { |
| 39 | if (!skip) { |
| 40 | result.push_back(x: shared_from_this()); |
| 41 | } |
| 42 | if (recursive) { |
| 43 | for (auto &child : children) { |
| 44 | child->GetMetaPipelines(result, recursive: true, skip: false); |
| 45 | } |
| 46 | } |
| 47 | } |
| 48 | |
| 49 | const vector<Pipeline *> *MetaPipeline::GetDependencies(Pipeline *dependant) const { |
| 50 | auto it = dependencies.find(x: dependant); |
| 51 | if (it == dependencies.end()) { |
| 52 | return nullptr; |
| 53 | } else { |
| 54 | return &it->second; |
| 55 | } |
| 56 | } |
| 57 | |
// Whether this MetaPipeline is part of a recursive CTE (set via SetRecursiveCTE,
// and propagated to children in CreateChildMetaPipeline)
bool MetaPipeline::HasRecursiveCTE() const {
	return recursive_cte;
}
| 61 | |
// Marks this MetaPipeline as being part of a recursive CTE (one-way flag; never cleared here)
void MetaPipeline::SetRecursiveCTE() {
	recursive_cte = true;
}
| 65 | |
| 66 | void MetaPipeline::AssignNextBatchIndex(Pipeline *pipeline) { |
| 67 | pipeline->base_batch_index = next_batch_index++ * PipelineBuildState::BATCH_INCREMENT; |
| 68 | } |
| 69 | |
| 70 | void MetaPipeline::Build(PhysicalOperator &op) { |
| 71 | D_ASSERT(pipelines.size() == 1); |
| 72 | D_ASSERT(children.empty()); |
| 73 | op.BuildPipelines(current&: *pipelines.back(), meta_pipeline&: *this); |
| 74 | } |
| 75 | |
| 76 | void MetaPipeline::Ready() { |
| 77 | for (auto &pipeline : pipelines) { |
| 78 | pipeline->Ready(); |
| 79 | } |
| 80 | for (auto &child : children) { |
| 81 | child->Ready(); |
| 82 | } |
| 83 | } |
| 84 | |
| 85 | MetaPipeline &MetaPipeline::CreateChildMetaPipeline(Pipeline ¤t, PhysicalOperator &op) { |
| 86 | children.push_back(x: make_shared<MetaPipeline>(args&: executor, args&: state, args: &op)); |
| 87 | auto child_meta_pipeline = children.back().get(); |
| 88 | // child MetaPipeline must finish completely before this MetaPipeline can start |
| 89 | current.AddDependency(pipeline&: child_meta_pipeline->GetBasePipeline()); |
| 90 | // child meta pipeline is part of the recursive CTE too |
| 91 | child_meta_pipeline->recursive_cte = recursive_cte; |
| 92 | return *child_meta_pipeline; |
| 93 | } |
| 94 | |
| 95 | Pipeline *MetaPipeline::CreatePipeline() { |
| 96 | pipelines.emplace_back(args: make_shared<Pipeline>(args&: executor)); |
| 97 | state.SetPipelineSink(pipeline&: *pipelines.back(), op: sink, sink_pipeline_count: next_batch_index++); |
| 98 | return pipelines.back().get(); |
| 99 | } |
| 100 | |
| 101 | void MetaPipeline::AddDependenciesFrom(Pipeline *dependant, Pipeline *start, bool including) { |
| 102 | // find 'start' |
| 103 | auto it = pipelines.begin(); |
| 104 | for (; it->get() != start; it++) { |
| 105 | } |
| 106 | |
| 107 | if (!including) { |
| 108 | it++; |
| 109 | } |
| 110 | |
| 111 | // collect pipelines that were created from then |
| 112 | vector<Pipeline *> created_pipelines; |
| 113 | for (; it != pipelines.end(); it++) { |
| 114 | if (it->get() == dependant) { |
| 115 | // cannot depend on itself |
| 116 | continue; |
| 117 | } |
| 118 | created_pipelines.push_back(x: it->get()); |
| 119 | } |
| 120 | |
| 121 | // add them to the dependencies |
| 122 | auto &deps = dependencies[dependant]; |
| 123 | deps.insert(position: deps.begin(), first: created_pipelines.begin(), last: created_pipelines.end()); |
| 124 | } |
| 125 | |
| 126 | void MetaPipeline::AddFinishEvent(Pipeline *pipeline) { |
| 127 | D_ASSERT(finish_pipelines.find(pipeline) == finish_pipelines.end()); |
| 128 | finish_pipelines.insert(x: pipeline); |
| 129 | |
| 130 | // add all pipelines that were added since 'pipeline' was added (including 'pipeline') to the finish group |
| 131 | auto it = pipelines.begin(); |
| 132 | for (; it->get() != pipeline; it++) { |
| 133 | } |
| 134 | it++; |
| 135 | for (; it != pipelines.end(); it++) { |
| 136 | finish_map.emplace(args: it->get(), args&: pipeline); |
| 137 | } |
| 138 | } |
| 139 | |
| 140 | bool MetaPipeline::HasFinishEvent(Pipeline *pipeline) const { |
| 141 | return finish_pipelines.find(x: pipeline) != finish_pipelines.end(); |
| 142 | } |
| 143 | |
| 144 | optional_ptr<Pipeline> MetaPipeline::GetFinishGroup(Pipeline *pipeline) const { |
| 145 | auto it = finish_map.find(x: pipeline); |
| 146 | return it == finish_map.end() ? nullptr : it->second; |
| 147 | } |
| 148 | |
| 149 | Pipeline *MetaPipeline::CreateUnionPipeline(Pipeline ¤t, bool order_matters) { |
| 150 | // create the union pipeline (batch index 0, should be set correctly afterwards) |
| 151 | auto union_pipeline = CreatePipeline(); |
| 152 | state.SetPipelineOperators(pipeline&: *union_pipeline, operators: state.GetPipelineOperators(pipeline&: current)); |
| 153 | state.SetPipelineSink(pipeline&: *union_pipeline, op: sink, sink_pipeline_count: 0); |
| 154 | |
| 155 | // 'union_pipeline' inherits ALL dependencies of 'current' (within this MetaPipeline, and across MetaPipelines) |
| 156 | union_pipeline->dependencies = current.dependencies; |
| 157 | auto current_deps = GetDependencies(dependant: ¤t); |
| 158 | if (current_deps) { |
| 159 | dependencies[union_pipeline] = *current_deps; |
| 160 | } |
| 161 | |
| 162 | if (order_matters) { |
| 163 | // if we need to preserve order, or if the sink is not parallel, we set a dependency |
| 164 | dependencies[union_pipeline].push_back(x: ¤t); |
| 165 | } |
| 166 | |
| 167 | return union_pipeline; |
| 168 | } |
| 169 | |
| 170 | void MetaPipeline::CreateChildPipeline(Pipeline ¤t, PhysicalOperator &op, Pipeline *last_pipeline) { |
| 171 | // rule 2: 'current' must be fully built (down to the source) before creating the child pipeline |
| 172 | D_ASSERT(current.source); |
| 173 | |
| 174 | // create the child pipeline (same batch index) |
| 175 | pipelines.emplace_back(args: state.CreateChildPipeline(executor, pipeline&: current, op)); |
| 176 | auto child_pipeline = pipelines.back().get(); |
| 177 | child_pipeline->base_batch_index = current.base_batch_index; |
| 178 | |
| 179 | // child pipeline has a dependency (within this MetaPipeline on all pipelines that were scheduled |
| 180 | // between 'current' and now (including 'current') - set them up |
| 181 | dependencies[child_pipeline].push_back(x: ¤t); |
| 182 | AddDependenciesFrom(dependant: child_pipeline, start: last_pipeline, including: false); |
| 183 | D_ASSERT(!GetDependencies(child_pipeline)->empty()); |
| 184 | } |
| 185 | |
| 186 | } // namespace duckdb |
| 187 | |