| 1 | #include "duckdb/execution/operator/set/physical_union.hpp" |
| 2 | |
| 3 | #include "duckdb/parallel/meta_pipeline.hpp" |
| 4 | #include "duckdb/parallel/pipeline.hpp" |
| 5 | #include "duckdb/parallel/thread_context.hpp" |
| 6 | |
| 7 | namespace duckdb { |
| 8 | |
| 9 | PhysicalUnion::PhysicalUnion(vector<LogicalType> types, unique_ptr<PhysicalOperator> top, |
| 10 | unique_ptr<PhysicalOperator> bottom, idx_t estimated_cardinality) |
| 11 | : PhysicalOperator(PhysicalOperatorType::UNION, std::move(types), estimated_cardinality) { |
| 12 | children.push_back(x: std::move(top)); |
| 13 | children.push_back(x: std::move(bottom)); |
| 14 | } |
| 15 | |
| 16 | //===--------------------------------------------------------------------===// |
| 17 | // Pipeline Construction |
| 18 | //===--------------------------------------------------------------------===// |
| 19 | void PhysicalUnion::BuildPipelines(Pipeline ¤t, MetaPipeline &meta_pipeline) { |
| 20 | op_state.reset(); |
| 21 | sink_state.reset(); |
| 22 | |
| 23 | // order matters if any of the downstream operators are order dependent, |
| 24 | // or if the sink preserves order, but does not support batch indices to do so |
| 25 | auto sink = meta_pipeline.GetSink(); |
| 26 | bool order_matters = false; |
| 27 | if (current.IsOrderDependent()) { |
| 28 | order_matters = true; |
| 29 | } |
| 30 | if (sink) { |
| 31 | if (sink->SinkOrderDependent() || sink->RequiresBatchIndex()) { |
| 32 | order_matters = true; |
| 33 | } |
| 34 | if (!sink->ParallelSink()) { |
| 35 | order_matters = true; |
| 36 | } |
| 37 | } |
| 38 | |
| 39 | // create a union pipeline that is identical to 'current' |
| 40 | auto union_pipeline = meta_pipeline.CreateUnionPipeline(current, order_matters); |
| 41 | |
| 42 | // continue with the current pipeline |
| 43 | children[0]->BuildPipelines(current, meta_pipeline); |
| 44 | |
| 45 | if (order_matters) { |
| 46 | // order matters, so 'union_pipeline' must come after all pipelines created by building out 'current' |
| 47 | meta_pipeline.AddDependenciesFrom(dependant: union_pipeline, start: union_pipeline, including: false); |
| 48 | } |
| 49 | |
| 50 | // build the union pipeline |
| 51 | children[1]->BuildPipelines(current&: *union_pipeline, meta_pipeline); |
| 52 | |
| 53 | // Assign proper batch index to the union pipeline |
| 54 | // This needs to happen after the pipelines have been built because unions can be nested |
| 55 | meta_pipeline.AssignNextBatchIndex(pipeline: union_pipeline); |
| 56 | } |
| 57 | |
| 58 | vector<const_reference<PhysicalOperator>> PhysicalUnion::GetSources() const { |
| 59 | vector<const_reference<PhysicalOperator>> result; |
| 60 | for (auto &child : children) { |
| 61 | auto child_sources = child->GetSources(); |
| 62 | result.insert(position: result.end(), first: child_sources.begin(), last: child_sources.end()); |
| 63 | } |
| 64 | return result; |
| 65 | } |
| 66 | |
| 67 | } // namespace duckdb |
| 68 | |