1#include "duckdb/execution/operator/set/physical_union.hpp"
2
3#include "duckdb/parallel/meta_pipeline.hpp"
4#include "duckdb/parallel/pipeline.hpp"
5#include "duckdb/parallel/thread_context.hpp"
6
7namespace duckdb {
8
9PhysicalUnion::PhysicalUnion(vector<LogicalType> types, unique_ptr<PhysicalOperator> top,
10 unique_ptr<PhysicalOperator> bottom, idx_t estimated_cardinality)
11 : PhysicalOperator(PhysicalOperatorType::UNION, std::move(types), estimated_cardinality) {
12 children.push_back(x: std::move(top));
13 children.push_back(x: std::move(bottom));
14}
15
16//===--------------------------------------------------------------------===//
17// Pipeline Construction
18//===--------------------------------------------------------------------===//
19void PhysicalUnion::BuildPipelines(Pipeline &current, MetaPipeline &meta_pipeline) {
20 op_state.reset();
21 sink_state.reset();
22
23 // order matters if any of the downstream operators are order dependent,
24 // or if the sink preserves order, but does not support batch indices to do so
25 auto sink = meta_pipeline.GetSink();
26 bool order_matters = false;
27 if (current.IsOrderDependent()) {
28 order_matters = true;
29 }
30 if (sink) {
31 if (sink->SinkOrderDependent() || sink->RequiresBatchIndex()) {
32 order_matters = true;
33 }
34 if (!sink->ParallelSink()) {
35 order_matters = true;
36 }
37 }
38
39 // create a union pipeline that is identical to 'current'
40 auto union_pipeline = meta_pipeline.CreateUnionPipeline(current, order_matters);
41
42 // continue with the current pipeline
43 children[0]->BuildPipelines(current, meta_pipeline);
44
45 if (order_matters) {
46 // order matters, so 'union_pipeline' must come after all pipelines created by building out 'current'
47 meta_pipeline.AddDependenciesFrom(dependant: union_pipeline, start: union_pipeline, including: false);
48 }
49
50 // build the union pipeline
51 children[1]->BuildPipelines(current&: *union_pipeline, meta_pipeline);
52
53 // Assign proper batch index to the union pipeline
54 // This needs to happen after the pipelines have been built because unions can be nested
55 meta_pipeline.AssignNextBatchIndex(pipeline: union_pipeline);
56}
57
58vector<const_reference<PhysicalOperator>> PhysicalUnion::GetSources() const {
59 vector<const_reference<PhysicalOperator>> result;
60 for (auto &child : children) {
61 auto child_sources = child->GetSources();
62 result.insert(position: result.end(), first: child_sources.begin(), last: child_sources.end());
63 }
64 return result;
65}
66
67} // namespace duckdb
68