1#include "duckdb/execution/operator/join/physical_join.hpp"
2
3#include "duckdb/execution/operator/join/physical_hash_join.hpp"
4#include "duckdb/parallel/meta_pipeline.hpp"
5#include "duckdb/parallel/pipeline.hpp"
6
7namespace duckdb {
8
9PhysicalJoin::PhysicalJoin(LogicalOperator &op, PhysicalOperatorType type, JoinType join_type,
10 idx_t estimated_cardinality)
11 : CachingPhysicalOperator(type, op.types, estimated_cardinality), join_type(join_type) {
12}
13
14bool PhysicalJoin::EmptyResultIfRHSIsEmpty() const {
15 // empty RHS with INNER, RIGHT or SEMI join means empty result set
16 switch (join_type) {
17 case JoinType::INNER:
18 case JoinType::RIGHT:
19 case JoinType::SEMI:
20 return true;
21 default:
22 return false;
23 }
24}
25
26//===--------------------------------------------------------------------===//
27// Pipeline Construction
28//===--------------------------------------------------------------------===//
29void PhysicalJoin::BuildJoinPipelines(Pipeline &current, MetaPipeline &meta_pipeline, PhysicalOperator &op) {
30 op.op_state.reset();
31 op.sink_state.reset();
32
33 // 'current' is the probe pipeline: add this operator
34 auto &state = meta_pipeline.GetState();
35 state.AddPipelineOperator(pipeline&: current, op);
36
37 // save the last added pipeline to set up dependencies later (in case we need to add a child pipeline)
38 vector<shared_ptr<Pipeline>> pipelines_so_far;
39 meta_pipeline.GetPipelines(result&: pipelines_so_far, recursive: false);
40 auto last_pipeline = pipelines_so_far.back().get();
41
42 // on the RHS (build side), we construct a child MetaPipeline with this operator as its sink
43 auto &child_meta_pipeline = meta_pipeline.CreateChildMetaPipeline(current, op);
44 child_meta_pipeline.Build(op&: *op.children[1]);
45
46 // continue building the current pipeline on the LHS (probe side)
47 op.children[0]->BuildPipelines(current, meta_pipeline);
48
49 switch (op.type) {
50 case PhysicalOperatorType::POSITIONAL_JOIN:
51 // Positional joins are always outer
52 meta_pipeline.CreateChildPipeline(current, op, last_pipeline);
53 return;
54 case PhysicalOperatorType::CROSS_PRODUCT:
55 return;
56 default:
57 break;
58 }
59
60 // Join can become a source operator if it's RIGHT/OUTER, or if the hash join goes out-of-core
61 bool add_child_pipeline = false;
62 auto &join_op = op.Cast<PhysicalJoin>();
63 if (IsRightOuterJoin(type: join_op.join_type) || join_op.type == PhysicalOperatorType::HASH_JOIN) {
64 add_child_pipeline = true;
65 }
66
67 if (add_child_pipeline) {
68 meta_pipeline.CreateChildPipeline(current, op, last_pipeline);
69 }
70}
71
72void PhysicalJoin::BuildPipelines(Pipeline &current, MetaPipeline &meta_pipeline) {
73 PhysicalJoin::BuildJoinPipelines(current, meta_pipeline, op&: *this);
74}
75
76vector<const_reference<PhysicalOperator>> PhysicalJoin::GetSources() const {
77 auto result = children[0]->GetSources();
78 if (IsSource()) {
79 result.push_back(x: *this);
80 }
81 return result;
82}
83
84} // namespace duckdb
85