//===----------------------------------------------------------------------===//
//                         DuckDB
//
// duckdb/parallel/pipeline.hpp
//
//
//===----------------------------------------------------------------------===//

#pragma once

#include "duckdb/common/atomic.hpp"
#include "duckdb/common/unordered_set.hpp"
#include "duckdb/common/set.hpp"
#include "duckdb/execution/physical_operator.hpp"
#include "duckdb/function/table_function.hpp"
#include "duckdb/parallel/task_scheduler.hpp"
#include "duckdb/common/reference_map.hpp"

namespace duckdb {

class Executor;
class Event;
class MetaPipeline;

class PipelineBuildState {
public:
	//! How much to increment batch indexes when multiple pipelines share the same source
	constexpr static idx_t BATCH_INCREMENT = 10000000000000;
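	// Illustrative note (not from the original header): when several pipelines share the same source,
	// each one is presumably assigned its own base batch index spaced BATCH_INCREMENT apart
	// (0, 10000000000000, 20000000000000, ...), so that batch indexes handed out by different
	// pipelines cannot collide while still remaining globally ordered.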

public:
	//! Duplicate eliminated join scan dependencies
	reference_map_t<const PhysicalOperator, reference<Pipeline>> delim_join_dependencies;

public:
	void SetPipelineSource(Pipeline &pipeline, PhysicalOperator &op);
	void SetPipelineSink(Pipeline &pipeline, optional_ptr<PhysicalOperator> op, idx_t sink_pipeline_count);
	void SetPipelineOperators(Pipeline &pipeline, vector<reference<PhysicalOperator>> operators);
	void AddPipelineOperator(Pipeline &pipeline, PhysicalOperator &op);
	shared_ptr<Pipeline> CreateChildPipeline(Executor &executor, Pipeline &pipeline, PhysicalOperator &op);

	optional_ptr<PhysicalOperator> GetPipelineSource(Pipeline &pipeline);
	optional_ptr<PhysicalOperator> GetPipelineSink(Pipeline &pipeline);
	vector<reference<PhysicalOperator>> GetPipelineOperators(Pipeline &pipeline);
};
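
// Usage sketch (illustrative only, not part of the original header): while translating a physical plan,
// a MetaPipeline uses the build state to wire up a pipeline roughly like this, where scan_op, filter_op
// and sink_op are hypothetical PhysicalOperator instances:
//
//	PipelineBuildState state;
//	state.SetPipelineSource(pipeline, scan_op);                          // e.g. a table scan produces the data
//	state.AddPipelineOperator(pipeline, filter_op);                      // streaming operators are appended in order
//	state.SetPipelineSink(pipeline, &sink_op, /*sink_pipeline_count*/ 0); // e.g. a hash table build consumes the data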

//! The Pipeline class represents an execution pipeline starting at a source, flowing through an (optional)
//! chain of intermediate operators, and ending at a sink
class Pipeline : public std::enable_shared_from_this<Pipeline> {
	friend class Executor;
	friend class PipelineExecutor;
	friend class PipelineEvent;
	friend class PipelineFinishEvent;
	friend class PipelineBuildState;
	friend class MetaPipeline;

public:
	explicit Pipeline(Executor &execution_context);

	Executor &executor;

public:
	ClientContext &GetClientContext();

	void AddDependency(shared_ptr<Pipeline> &pipeline);

	void Ready();
	void Reset();
	void ResetSink();
	void ResetSource(bool force);
	void ClearSource();
	void Schedule(shared_ptr<Event> &event);

	//! Finalize this pipeline
	void Finalize(Event &event);

	string ToString() const;
	void Print() const;
	void PrintDependencies() const;

	//! Returns query progress
	bool GetProgress(double &current_percentage, idx_t &estimated_cardinality);

	//! Returns a list of all operators (including source and sink) involved in this pipeline
	vector<reference<PhysicalOperator>> GetOperators();
	vector<const_reference<PhysicalOperator>> GetOperators() const;

	optional_ptr<PhysicalOperator> GetSink() {
		return sink;
	}

	optional_ptr<PhysicalOperator> GetSource() {
		return source;
	}

	//! Returns whether any of the operators in the pipeline care about preserving order
	bool IsOrderDependent() const;

	//! Registers a new batch index for a pipeline executor - returns the current minimum batch index
	idx_t RegisterNewBatchIndex();

	//! Updates the batch index of a pipeline (and returns the new minimum batch index)
	idx_t UpdateBatchIndex(idx_t old_index, idx_t new_index);
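	// Illustrative note (not from the original header): a pipeline executor presumably calls
	// RegisterNewBatchIndex() once when it starts and UpdateBatchIndex() whenever it advances to a new
	// batch; the returned minimum batch index lets order-dependent sinks know which batches are complete.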

private:
	//! Whether or not the pipeline has been readied
	bool ready;
	//! Whether or not the pipeline has been initialized
	atomic<bool> initialized;
	//! The source of this pipeline
	optional_ptr<PhysicalOperator> source;
	//! The chain of intermediate operators
	vector<reference<PhysicalOperator>> operators;
	//! The sink (i.e. destination) for data; this is e.g. a hash table to-be-built
	optional_ptr<PhysicalOperator> sink;

	//! The global source state
	unique_ptr<GlobalSourceState> source_state;

	//! The parent pipelines (i.e. pipelines that are dependent on this pipeline to finish)
	vector<weak_ptr<Pipeline>> parents;
	//! The dependencies of this pipeline
	vector<weak_ptr<Pipeline>> dependencies;

	//! The base batch index of this pipeline
	idx_t base_batch_index = 0;
	//! Lock for accessing the set of batch indexes
	mutex batch_lock;
	//! The set of batch indexes that are currently being processed
	//! Although batch indexes are unique, this is a multiset: when a new pipeline is started, the current
	//! minimum batch index is inserted as a placeholder, which leads to duplicate entries in the set of
	//! active batch indexes
	multiset<idx_t> batch_indexes;

private:
	void ScheduleSequentialTask(shared_ptr<Event> &event);
	bool LaunchScanTasks(shared_ptr<Event> &event, idx_t max_threads);

	bool ScheduleParallel(shared_ptr<Event> &event);
};
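
// Rough lifecycle sketch (inferred from the declarations above, not part of the original header):
// pipelines are assembled through PipelineBuildState/MetaPipeline, marked executable with Ready(),
// and scheduled by an event via Schedule(), which attempts ScheduleParallel()/LaunchScanTasks() and
// falls back to ScheduleSequentialTask() when the source offers no parallelism; once all tasks have
// finished, a PipelineFinishEvent triggers Finalize() on the sink.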

} // namespace duckdb