1//===----------------------------------------------------------------------===//
2// DuckDB
3//
4// duckdb/parallel/pipeline.hpp
5//
6//
7//===----------------------------------------------------------------------===//
8
9#pragma once
10
11#include "duckdb/common/atomic.hpp"
12#include "duckdb/common/unordered_set.hpp"
13#include "duckdb/common/set.hpp"
14#include "duckdb/execution/physical_operator.hpp"
15#include "duckdb/function/table_function.hpp"
16#include "duckdb/parallel/task_scheduler.hpp"
17#include "duckdb/common/reference_map.hpp"
18
19namespace duckdb {
20
// Forward declarations of types that this header only uses by reference or pointer.
class Executor;
class Event;
class MetaPipeline;
// PipelineBuildState refers to Pipeline before the class definition further down in this header;
// declaring it here keeps the header from depending on another include to introduce the name.
class Pipeline;
24
25class PipelineBuildState {
26public:
27 //! How much to increment batch indexes when multiple pipelines share the same source
28 constexpr static idx_t BATCH_INCREMENT = 10000000000000;
29
30public:
31 //! Duplicate eliminated join scan dependencies
32 reference_map_t<const PhysicalOperator, reference<Pipeline>> delim_join_dependencies;
33
34public:
35 void SetPipelineSource(Pipeline &pipeline, PhysicalOperator &op);
36 void SetPipelineSink(Pipeline &pipeline, optional_ptr<PhysicalOperator> op, idx_t sink_pipeline_count);
37 void SetPipelineOperators(Pipeline &pipeline, vector<reference<PhysicalOperator>> operators);
38 void AddPipelineOperator(Pipeline &pipeline, PhysicalOperator &op);
39 shared_ptr<Pipeline> CreateChildPipeline(Executor &executor, Pipeline &pipeline, PhysicalOperator &op);
40
41 optional_ptr<PhysicalOperator> GetPipelineSource(Pipeline &pipeline);
42 optional_ptr<PhysicalOperator> GetPipelineSink(Pipeline &pipeline);
43 vector<reference<PhysicalOperator>> GetPipelineOperators(Pipeline &pipeline);
44};
45
46//! The Pipeline class represents an execution pipeline starting at a
47class Pipeline : public std::enable_shared_from_this<Pipeline> {
48 friend class Executor;
49 friend class PipelineExecutor;
50 friend class PipelineEvent;
51 friend class PipelineFinishEvent;
52 friend class PipelineBuildState;
53 friend class MetaPipeline;
54
55public:
56 explicit Pipeline(Executor &execution_context);
57
58 Executor &executor;
59
60public:
61 ClientContext &GetClientContext();
62
63 void AddDependency(shared_ptr<Pipeline> &pipeline);
64
65 void Ready();
66 void Reset();
67 void ResetSink();
68 void ResetSource(bool force);
69 void ClearSource();
70 void Schedule(shared_ptr<Event> &event);
71
72 //! Finalize this pipeline
73 void Finalize(Event &event);
74
75 string ToString() const;
76 void Print() const;
77 void PrintDependencies() const;
78
79 //! Returns query progress
80 bool GetProgress(double &current_percentage, idx_t &estimated_cardinality);
81
82 //! Returns a list of all operators (including source and sink) involved in this pipeline
83 vector<reference<PhysicalOperator>> GetOperators();
84 vector<const_reference<PhysicalOperator>> GetOperators() const;
85
86 optional_ptr<PhysicalOperator> GetSink() {
87 return sink;
88 }
89
90 optional_ptr<PhysicalOperator> GetSource() {
91 return source;
92 }
93
94 //! Returns whether any of the operators in the pipeline care about preserving order
95 bool IsOrderDependent() const;
96
97 //! Registers a new batch index for a pipeline executor - returns the current minimum batch index
98 idx_t RegisterNewBatchIndex();
99
100 //! Updates the batch index of a pipeline (and returns the new minimum batch index)
101 idx_t UpdateBatchIndex(idx_t old_index, idx_t new_index);
102
103private:
104 //! Whether or not the pipeline has been readied
105 bool ready;
106 //! Whether or not the pipeline has been initialized
107 atomic<bool> initialized;
108 //! The source of this pipeline
109 optional_ptr<PhysicalOperator> source;
110 //! The chain of intermediate operators
111 vector<reference<PhysicalOperator>> operators;
112 //! The sink (i.e. destination) for data; this is e.g. a hash table to-be-built
113 optional_ptr<PhysicalOperator> sink;
114
115 //! The global source state
116 unique_ptr<GlobalSourceState> source_state;
117
118 //! The parent pipelines (i.e. pipelines that are dependent on this pipeline to finish)
119 vector<weak_ptr<Pipeline>> parents;
120 //! The dependencies of this pipeline
121 vector<weak_ptr<Pipeline>> dependencies;
122
123 //! The base batch index of this pipeline
124 idx_t base_batch_index = 0;
125 //! Lock for accessing the set of batch indexes
126 mutex batch_lock;
127 //! The set of batch indexes that are currently being processed
128 //! Despite batch indexes being unique - this is a multiset
129 //! The reason is that when we start a new pipeline we insert the current minimum batch index as a placeholder
130 //! Which leads to duplicate entries in the set of active batch indexes
131 multiset<idx_t> batch_indexes;
132
133private:
134 void ScheduleSequentialTask(shared_ptr<Event> &event);
135 bool LaunchScanTasks(shared_ptr<Event> &event, idx_t max_threads);
136
137 bool ScheduleParallel(shared_ptr<Event> &event);
138};
139
140} // namespace duckdb
141