1 | //===----------------------------------------------------------------------===// |
2 | // DuckDB |
3 | // |
4 | // duckdb/parallel/pipeline.hpp |
5 | // |
6 | // |
7 | //===----------------------------------------------------------------------===// |
8 | |
9 | #pragma once |
10 | |
11 | #include "duckdb/common/atomic.hpp" |
12 | #include "duckdb/common/unordered_set.hpp" |
13 | #include "duckdb/common/set.hpp" |
14 | #include "duckdb/execution/physical_operator.hpp" |
15 | #include "duckdb/function/table_function.hpp" |
16 | #include "duckdb/parallel/task_scheduler.hpp" |
17 | #include "duckdb/common/reference_map.hpp" |
18 | |
19 | namespace duckdb { |
20 | |
//! Forward declarations: this header only uses these types by reference, through smart pointers,
//! or in friend declarations, so their full definitions are not required here
class Executor;
class Event;
class MetaPipeline;
//! Pipeline is referenced by PipelineBuildState before its definition further down in this header
class Pipeline;
24 | |
25 | class PipelineBuildState { |
26 | public: |
27 | //! How much to increment batch indexes when multiple pipelines share the same source |
28 | constexpr static idx_t BATCH_INCREMENT = 10000000000000; |
29 | |
30 | public: |
31 | //! Duplicate eliminated join scan dependencies |
32 | reference_map_t<const PhysicalOperator, reference<Pipeline>> delim_join_dependencies; |
33 | |
34 | public: |
35 | void SetPipelineSource(Pipeline &pipeline, PhysicalOperator &op); |
36 | void SetPipelineSink(Pipeline &pipeline, optional_ptr<PhysicalOperator> op, idx_t sink_pipeline_count); |
37 | void SetPipelineOperators(Pipeline &pipeline, vector<reference<PhysicalOperator>> operators); |
38 | void AddPipelineOperator(Pipeline &pipeline, PhysicalOperator &op); |
39 | shared_ptr<Pipeline> CreateChildPipeline(Executor &executor, Pipeline &pipeline, PhysicalOperator &op); |
40 | |
41 | optional_ptr<PhysicalOperator> GetPipelineSource(Pipeline &pipeline); |
42 | optional_ptr<PhysicalOperator> GetPipelineSink(Pipeline &pipeline); |
43 | vector<reference<PhysicalOperator>> GetPipelineOperators(Pipeline &pipeline); |
44 | }; |
45 | |
46 | //! The Pipeline class represents an execution pipeline starting at a |
47 | class Pipeline : public std::enable_shared_from_this<Pipeline> { |
48 | friend class Executor; |
49 | friend class PipelineExecutor; |
50 | friend class PipelineEvent; |
51 | friend class PipelineFinishEvent; |
52 | friend class PipelineBuildState; |
53 | friend class MetaPipeline; |
54 | |
55 | public: |
56 | explicit Pipeline(Executor &execution_context); |
57 | |
58 | Executor &executor; |
59 | |
60 | public: |
61 | ClientContext &GetClientContext(); |
62 | |
63 | void AddDependency(shared_ptr<Pipeline> &pipeline); |
64 | |
65 | void Ready(); |
66 | void Reset(); |
67 | void ResetSink(); |
68 | void ResetSource(bool force); |
69 | void ClearSource(); |
70 | void Schedule(shared_ptr<Event> &event); |
71 | |
72 | //! Finalize this pipeline |
73 | void Finalize(Event &event); |
74 | |
75 | string ToString() const; |
76 | void Print() const; |
77 | void PrintDependencies() const; |
78 | |
79 | //! Returns query progress |
80 | bool GetProgress(double ¤t_percentage, idx_t &estimated_cardinality); |
81 | |
82 | //! Returns a list of all operators (including source and sink) involved in this pipeline |
83 | vector<reference<PhysicalOperator>> GetOperators(); |
84 | vector<const_reference<PhysicalOperator>> GetOperators() const; |
85 | |
86 | optional_ptr<PhysicalOperator> GetSink() { |
87 | return sink; |
88 | } |
89 | |
90 | optional_ptr<PhysicalOperator> GetSource() { |
91 | return source; |
92 | } |
93 | |
94 | //! Returns whether any of the operators in the pipeline care about preserving order |
95 | bool IsOrderDependent() const; |
96 | |
97 | //! Registers a new batch index for a pipeline executor - returns the current minimum batch index |
98 | idx_t RegisterNewBatchIndex(); |
99 | |
100 | //! Updates the batch index of a pipeline (and returns the new minimum batch index) |
101 | idx_t UpdateBatchIndex(idx_t old_index, idx_t new_index); |
102 | |
103 | private: |
104 | //! Whether or not the pipeline has been readied |
105 | bool ready; |
106 | //! Whether or not the pipeline has been initialized |
107 | atomic<bool> initialized; |
108 | //! The source of this pipeline |
109 | optional_ptr<PhysicalOperator> source; |
110 | //! The chain of intermediate operators |
111 | vector<reference<PhysicalOperator>> operators; |
112 | //! The sink (i.e. destination) for data; this is e.g. a hash table to-be-built |
113 | optional_ptr<PhysicalOperator> sink; |
114 | |
115 | //! The global source state |
116 | unique_ptr<GlobalSourceState> source_state; |
117 | |
118 | //! The parent pipelines (i.e. pipelines that are dependent on this pipeline to finish) |
119 | vector<weak_ptr<Pipeline>> parents; |
120 | //! The dependencies of this pipeline |
121 | vector<weak_ptr<Pipeline>> dependencies; |
122 | |
123 | //! The base batch index of this pipeline |
124 | idx_t base_batch_index = 0; |
125 | //! Lock for accessing the set of batch indexes |
126 | mutex batch_lock; |
127 | //! The set of batch indexes that are currently being processed |
128 | //! Despite batch indexes being unique - this is a multiset |
129 | //! The reason is that when we start a new pipeline we insert the current minimum batch index as a placeholder |
130 | //! Which leads to duplicate entries in the set of active batch indexes |
131 | multiset<idx_t> batch_indexes; |
132 | |
133 | private: |
134 | void ScheduleSequentialTask(shared_ptr<Event> &event); |
135 | bool LaunchScanTasks(shared_ptr<Event> &event, idx_t max_threads); |
136 | |
137 | bool ScheduleParallel(shared_ptr<Event> &event); |
138 | }; |
139 | |
140 | } // namespace duckdb |
141 | |