1 | #include "duckdb/execution/operator/set/physical_union.hpp" |
2 | |
3 | #include "duckdb/parallel/meta_pipeline.hpp" |
4 | #include "duckdb/parallel/pipeline.hpp" |
5 | #include "duckdb/parallel/thread_context.hpp" |
6 | |
7 | namespace duckdb { |
8 | |
9 | PhysicalUnion::PhysicalUnion(vector<LogicalType> types, unique_ptr<PhysicalOperator> top, |
10 | unique_ptr<PhysicalOperator> bottom, idx_t estimated_cardinality) |
11 | : PhysicalOperator(PhysicalOperatorType::UNION, std::move(types), estimated_cardinality) { |
12 | children.push_back(x: std::move(top)); |
13 | children.push_back(x: std::move(bottom)); |
14 | } |
15 | |
16 | //===--------------------------------------------------------------------===// |
17 | // Pipeline Construction |
18 | //===--------------------------------------------------------------------===// |
19 | void PhysicalUnion::BuildPipelines(Pipeline ¤t, MetaPipeline &meta_pipeline) { |
20 | op_state.reset(); |
21 | sink_state.reset(); |
22 | |
23 | // order matters if any of the downstream operators are order dependent, |
24 | // or if the sink preserves order, but does not support batch indices to do so |
25 | auto sink = meta_pipeline.GetSink(); |
26 | bool order_matters = false; |
27 | if (current.IsOrderDependent()) { |
28 | order_matters = true; |
29 | } |
30 | if (sink) { |
31 | if (sink->SinkOrderDependent() || sink->RequiresBatchIndex()) { |
32 | order_matters = true; |
33 | } |
34 | if (!sink->ParallelSink()) { |
35 | order_matters = true; |
36 | } |
37 | } |
38 | |
39 | // create a union pipeline that is identical to 'current' |
40 | auto union_pipeline = meta_pipeline.CreateUnionPipeline(current, order_matters); |
41 | |
42 | // continue with the current pipeline |
43 | children[0]->BuildPipelines(current, meta_pipeline); |
44 | |
45 | if (order_matters) { |
46 | // order matters, so 'union_pipeline' must come after all pipelines created by building out 'current' |
47 | meta_pipeline.AddDependenciesFrom(dependant: union_pipeline, start: union_pipeline, including: false); |
48 | } |
49 | |
50 | // build the union pipeline |
51 | children[1]->BuildPipelines(current&: *union_pipeline, meta_pipeline); |
52 | |
53 | // Assign proper batch index to the union pipeline |
54 | // This needs to happen after the pipelines have been built because unions can be nested |
55 | meta_pipeline.AssignNextBatchIndex(pipeline: union_pipeline); |
56 | } |
57 | |
58 | vector<const_reference<PhysicalOperator>> PhysicalUnion::GetSources() const { |
59 | vector<const_reference<PhysicalOperator>> result; |
60 | for (auto &child : children) { |
61 | auto child_sources = child->GetSources(); |
62 | result.insert(position: result.end(), first: child_sources.begin(), last: child_sources.end()); |
63 | } |
64 | return result; |
65 | } |
66 | |
67 | } // namespace duckdb |
68 | |