#pragma once

#include <Processors/IProcessor.h>
#include <Processors/Executors/ThreadsQueue.h>
#include <Common/ThreadPool.h>
#include <Common/EventCounter.h>
#include <common/logger_useful.h>

#include <atomic>
#include <condition_variable>
#include <exception>
#include <functional>
#include <limits>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <stack>
#include <unordered_map>
#include <vector>

namespace DB
{

/// Executes query pipeline.
class PipelineExecutor
{
public:
    /// Get pipeline as a set of processors.
    /// Processors should represent a full graph. All ports must be connected,
    /// and every connected processor must be present in the set.
    /// The executor doesn't own the processors; it only stores references.
    /// During pipeline execution new processors can appear. They will be added to the existing set.
    ///
    /// An explicit graph representation is built in the constructor. Throws if the graph is not valid.
    explicit PipelineExecutor(Processors & processors_);

    /// Execute pipeline in multiple threads. Must be called only once.
    /// If an exception occurs during execution, it is rethrown.
    void execute(size_t num_threads);

    String getName() const { return "PipelineExecutor"; }

    const Processors & getProcessors() const { return processors; }

    /// Cancel execution. May be called from another thread.
    void cancel();
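
    /// A minimal usage sketch (illustrative only; assumes `processors` is a fully
    /// connected Processors set built elsewhere):
    ///
    ///     PipelineExecutor executor(processors);  /// builds the graph, may throw
    ///     executor.execute(4);                    /// runs on 4 threads, rethrows any exception
    ///
    /// cancel() may be invoked concurrently from another thread to stop execution early.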

private:
    Processors & processors;
    std::mutex processors_mutex;

    struct Edge
    {
        Edge(UInt64 to_, bool backward_,
             UInt64 input_port_number_, UInt64 output_port_number_, std::vector<void *> * update_list)
            : to(to_), backward(backward_)
            , input_port_number(input_port_number_), output_port_number(output_port_number_)
        {
            update_info.update_list = update_list;
            update_info.id = this;
        }

        UInt64 to = std::numeric_limits<UInt64>::max();
        bool backward;
        UInt64 input_port_number;
        UInt64 output_port_number;

        /// The edge version is increased when the port's state changes (e.g. when data is pushed). See Port.h for details.
        /// By comparing version with prev_version we can decide whether the neighbour processor needs to be prepared.
        Port::UpdateInfo update_info;
    };
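
    /// A sketch of the intended version check (assuming UpdateInfo carries the
    /// `version` and `prev_version` counters mentioned above; see Port.h):
    ///
    ///     if (edge.update_info.version != edge.update_info.prev_version)
    ///     {
    ///         edge.update_info.prev_version = edge.update_info.version;
    ///         /// The neighbour's port changed, so graph[edge.to] must be prepared again.
    ///     }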

    /// Use std::list because new ports can be added to a processor during execution,
    /// and std::list does not invalidate references to existing elements on insertion.
    using Edges = std::list<Edge>;

    /// Status for a processor.
    /// Can be owning or not. Owning means that the executor which set this status may change the node's data, and nobody else can.
    enum class ExecStatus
    {
        Idle,       /// prepare returned NeedData or PortFull. Non-owning.
        Preparing,  /// some executor is preparing the processor, or the processor is in the task_queue. Owning.
        Executing,  /// prepare returned Ready and the task is executing. Owning.
        Finished,   /// prepare returned Finished. Non-owning.
        Async       /// prepare returned Async. Owning.
    };
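
    /// A sketch of the typical flow implied by the states above (not an exhaustive
    /// transition table):
    ///
    ///     Idle -> Preparing -> Executing -> Idle / Finished
    ///
    /// where Preparing means prepare() is scheduled or running, and the Owning
    /// states guarantee exclusive access to the node's data.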

    /// Small structure with the context of an executing job.
    struct ExecutionState
    {
        std::exception_ptr exception;
        std::function<void()> job;

        IProcessor * processor = nullptr;
        UInt64 processors_id = 0;

        /// Counters for profiling.
        size_t num_executed_jobs = 0;
        UInt64 execution_time_ns = 0;
        UInt64 preparation_time_ns = 0;
    };

    struct Node
    {
        IProcessor * processor = nullptr;

        /// Direct edges go through output ports (to consumers);
        /// back edges go through input ports (to producers).
        Edges directEdges;
        Edges backEdges;

        ExecStatus status;
        std::mutex status_mutex;

        std::vector<void *> post_updated_input_ports;
        std::vector<void *> post_updated_output_ports;

        /// Last state for profiling.
        IProcessor::Status last_processor_status = IProcessor::Status::NeedData;

        std::unique_ptr<ExecutionState> execution_state;

        IProcessor::PortNumbers updated_input_ports;
        IProcessor::PortNumbers updated_output_ports;

        Node(IProcessor * processor_, UInt64 processor_id)
            : processor(processor_), status(ExecStatus::Idle)
        {
            execution_state = std::make_unique<ExecutionState>();
            execution_state->processor = processor;
            execution_state->processors_id = processor_id;
        }

        /// Only processor, status and execution_state are moved; the edge lists and the
        /// status mutex are left default-constructed (std::mutex is not movable),
        /// so a moved-to node starts with empty edges and a fresh mutex.
        Node(Node && other) noexcept
            : processor(other.processor), status(other.status)
            , execution_state(std::move(other.execution_state))
        {
        }
    };

    using Nodes = std::vector<Node>;

    Nodes graph;

    using Stack = std::stack<UInt64>;

    using TaskQueue = std::queue<ExecutionState *>;

    /// Queue with pointers to tasks. Each thread concurrently reads from it until the finished flag is set.
    /// Stores processors that need to be prepared. The Preparing status is already set for them.
    TaskQueue task_queue;
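
    /// A sketch of the per-thread consumption loop (simplified from what
    /// executeSingleThread is expected to do; wake-ups via ExecutorContext
    /// are omitted):
    ///
    ///     while (!finished)
    ///     {
    ///         ExecutionState * state = nullptr;
    ///         {
    ///             std::lock_guard lock(task_queue_mutex);
    ///             if (!task_queue.empty())
    ///             {
    ///                 state = task_queue.front();
    ///                 task_queue.pop();
    ///             }
    ///         }
    ///         if (state)
    ///             state->job();  /// run the job prepared by addJob()
    ///     }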

    ThreadsQueue threads_queue;
    std::mutex task_queue_mutex;

    std::atomic_bool cancelled{false};
    std::atomic_bool finished{false};

    Poco::Logger * log = &Poco::Logger::get("PipelineExecutor");

    /// Context used to stop execution in order to expand the pipeline.
    struct ExpandPipelineTask
    {
        ExecutionState * node_to_expand;
        Stack * stack;
        size_t num_waiting_processing_threads = 0;
        std::mutex mutex;
        std::condition_variable condvar;

        ExpandPipelineTask(ExecutionState * node_to_expand_, Stack * stack_)
            : node_to_expand(node_to_expand_), stack(stack_) {}
    };
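
    /// A sketch of how a processing thread is expected to join an expansion
    /// (simplified; the real synchronization lives in doExpandPipeline, and
    /// `expansion_is_done` is a stand-in predicate):
    ///
    ///     std::unique_lock lock(task->mutex);
    ///     ++task->num_waiting_processing_threads;
    ///     task->condvar.wait(lock, [&] { return expansion_is_done; });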

    std::atomic<size_t> num_processing_executors{0};
    std::atomic<ExpandPipelineTask *> expand_pipeline_task{nullptr};

    /// Context for each thread.
    struct ExecutorContext
    {
        /// Will store context for all expand-pipeline tasks (it's easy and we don't expect many).
        /// This could be solved by using an atomic shared ptr.
        std::list<ExpandPipelineTask> task_list;

        std::condition_variable condvar;
        std::mutex mutex;
        bool wake_flag = false;

        std::queue<ExecutionState *> pinned_tasks;
    };

    std::vector<std::unique_ptr<ExecutorContext>> executor_contexts;
    std::mutex executor_contexts_mutex;

    /// Processor ptr -> node number
    using ProcessorsMap = std::unordered_map<const IProcessor *, UInt64>;
    ProcessorsMap processors_map;
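
    /// A sketch of how a newly created processor might be registered during
    /// expandPipeline (hypothetical, for illustration only; `new_processor`
    /// stands for a ProcessorPtr produced by the expanding processor):
    ///
    ///     UInt64 node_id = graph.size();
    ///     graph.emplace_back(new_processor.get(), node_id);
    ///     processors_map[new_processor.get()] = node_id;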

    /// Graph related methods.
    bool addEdges(UInt64 node);
    void buildGraph();
    void expandPipeline(Stack & stack, UInt64 pid);

    /// Pipeline execution related methods.
    void addChildlessProcessorsToStack(Stack & stack);
    bool tryAddProcessorToStackIfUpdated(Edge & edge, Stack & stack);
    static void addJob(ExecutionState * execution_state);
    // TODO: void addAsyncJob(UInt64 pid);

    /// Prepare the processor with number pid.
    /// Check the parents and children of the current processor and push them onto the stacks if they also need to be prepared.
    /// If the processor wants to be expanded, the ExpandPipelineTask from thread_number's execution context will be used.
    bool prepareProcessor(UInt64 pid, Stack & children, Stack & parents, size_t thread_number, bool async);
    void doExpandPipeline(ExpandPipelineTask * task, bool processing);

    void executeImpl(size_t num_threads);
    void executeSingleThread(size_t thread_num, size_t num_threads);
    void finish();

    String dumpPipeline() const;
};

using PipelineExecutorPtr = std::shared_ptr<PipelineExecutor>;

}