1#pragma once
2
3#include <Processors/IProcessor.h>
4#include <Processors/Executors/ThreadsQueue.h>
5#include <Common/ThreadPool.h>
6#include <Common/EventCounter.h>
7#include <common/logger_useful.h>
8
9#include <queue>
10#include <stack>
11#include <mutex>
12
13namespace DB
14{
15
16
/// Executes query pipeline.
///
/// Builds an explicit execution graph over a set of IProcessor nodes and runs it
/// on a fixed number of threads. Does not own the processors; it only keeps a
/// reference to the externally owned set, which may grow during execution
/// (see expandPipeline()).
class PipelineExecutor
{
public:
    /// Get pipeline as a set of processors.
    /// Processors should represent full graph. All ports must be connected, all connected nodes are mentioned in set.
    /// Executor doesn't own processors, just stores reference.
    /// During pipeline execution new processors can appear. They will be added to existing set.
    ///
    /// Explicit graph representation is built in constructor. Throws if graph is not correct.
    explicit PipelineExecutor(Processors & processors_);

    /// Execute pipeline in multiple threads. Must be called once.
    /// In case of exception during execution throws any occurred.
    void execute(size_t num_threads);

    String getName() const { return "PipelineExecutor"; }

    const Processors & getProcessors() const { return processors; }

    /// Cancel execution. May be called from another thread.
    void cancel();

private:
    /// Externally owned processor set. Guarded by processors_mutex because new
    /// processors can be appended while worker threads are running.
    Processors & processors;
    std::mutex processors_mutex;

    /// Edge of the execution graph: a connection between two processors' ports.
    struct Edge
    {
        Edge(UInt64 to_, bool backward_,
            UInt64 input_port_number_, UInt64 output_port_number_, std::vector<void *> * update_list)
            : to(to_), backward(backward_)
            , input_port_number(input_port_number_), output_port_number(output_port_number_)
        {
            /// Register this edge as the id reported into update_list when the port changes state.
            update_info.update_list = update_list;
            update_info.id = this;
        }

        /// Index of the neighbour node in `graph`.
        UInt64 to = std::numeric_limits<UInt64>::max();
        /// True for a back edge (towards the producer side); false for a direct edge.
        bool backward;
        /// Port numbers on the two ends of the connection.
        UInt64 input_port_number;
        UInt64 output_port_number;

        /// Edge version is increased when port's state is changed (e.g. when data is pushed). See Port.h for details.
        /// To compare version with prev_version we can decide if neighbour processor need to be prepared.
        Port::UpdateInfo update_info;
    };

    /// Use std::list because new ports can be added to processor during execution.
    /// (std::list keeps existing Edge addresses stable, and update_info.id stores `this`.)
    using Edges = std::list<Edge>;

    /// Status for processor.
    /// Can be owning or not. Owning means that executor who set this status can change node's data and nobody else can.
    enum class ExecStatus
    {
        Idle,      /// prepare returned NeedData or PortFull. Non-owning.
        Preparing, /// some executor is preparing processor, or processor is in task_queue. Owning.
        Executing, /// prepare returned Ready and task is executing. Owning.
        Finished,  /// prepare returned Finished. Non-owning.
        Async      /// prepare returned Async. Owning.
    };

    /// Small structure with context of executing job.
    struct ExecutionState
    {
        /// Exception thrown while executing the job; rethrown to the caller of execute().
        std::exception_ptr exception;
        /// The unit of work for this processor. Presumably set by addJob() — verify.
        std::function<void()> job;

        /// Back-pointers to the processor this state belongs to and its node id.
        IProcessor * processor = nullptr;
        UInt64 processors_id = 0;

        /// Counters for profiling.
        size_t num_executed_jobs = 0;
        UInt64 execution_time_ns = 0;
        UInt64 preparation_time_ns = 0;
    };

    /// Vertex of the execution graph: a processor together with its edges and status.
    struct Node
    {
        IProcessor * processor = nullptr;
        /// Outgoing connections. NOTE(review): direct presumably follows data flow
        /// (output ports) and back points to producers (input ports) — verify in addEdges().
        Edges directEdges;
        Edges backEdges;

        ExecStatus status;
        /// Protects status transitions; several threads may try to prepare the same node.
        std::mutex status_mutex;

        /// Ports whose state changed since the last prepare(). Elements are presumably
        /// the Edge pointers registered via Port::UpdateInfo (stored type-erased) — verify.
        std::vector<void *> post_updated_input_ports;
        std::vector<void *> post_updated_output_ports;

        /// Last state for profiling.
        IProcessor::Status last_processor_status = IProcessor::Status::NeedData;

        /// Heap-allocated so that a pointer to it stays valid in task_queue even if `graph` reallocates.
        std::unique_ptr<ExecutionState> execution_state;

        /// Port numbers to pass to IProcessor::prepare() on the next call.
        IProcessor::PortNumbers updated_input_ports;
        IProcessor::PortNumbers updated_output_ports;

        Node(IProcessor * processor_, UInt64 processor_id)
            : processor(processor_), status(ExecStatus::Idle)
        {
            execution_state = std::make_unique<ExecutionState>();
            execution_state->processor = processor;
            execution_state->processors_id = processor_id;
        }

        /// Custom move constructor is required because std::mutex is not movable.
        /// NOTE(review): edges and the port-update vectors are NOT moved — presumably
        /// nodes are only moved (vector growth) before edges are attached; verify
        /// against buildGraph()/expandPipeline().
        Node(Node && other) noexcept
            : processor(other.processor), status(other.status)
            , execution_state(std::move(other.execution_state))
        {
        }
    };

    using Nodes = std::vector<Node>;

    /// Execution graph; node index corresponds to the id stored in processors_map.
    Nodes graph;

    /// Stack of node ids waiting to be prepared.
    using Stack = std::stack<UInt64>;

    using TaskQueue = std::queue<ExecutionState *>;

    /// Queue with pointers to tasks. Each thread will concurrently read from it until finished flag is set.
    /// Stores processors need to be prepared. Preparing status is already set for them.
    TaskQueue task_queue;

    /// Threads currently sleeping while waiting for a task; guarded by task_queue_mutex.
    ThreadsQueue threads_queue;
    std::mutex task_queue_mutex;

    /// Execution flags checked by worker threads.
    /// NOTE(review): no in-class initializers — presumably set in the constructor; verify.
    std::atomic_bool cancelled;
    std::atomic_bool finished;

    Poco::Logger * log = &Poco::Logger::get("PipelineExecutor");

    /// Things to stop execution to expand pipeline.
    struct ExpandPipelineTask
    {
        ExecutionState * node_to_expand;
        Stack * stack;
        /// Number of threads currently blocked on condvar waiting for the expansion to finish.
        size_t num_waiting_processing_threads = 0;
        std::mutex mutex;
        std::condition_variable condvar;

        ExpandPipelineTask(ExecutionState * node_to_expand_, Stack * stack_)
            : node_to_expand(node_to_expand_), stack(stack_) {}
    };

    /// Number of executors currently processing; presumably used to decide when it is
    /// safe to mutate the graph in doExpandPipeline() — verify.
    std::atomic<size_t> num_processing_executors;
    /// Non-null while some thread is requesting a pipeline expansion.
    std::atomic<ExpandPipelineTask *> expand_pipeline_task;

    /// Context for each thread.
    struct ExecutorContext
    {
        /// Will store context for all expand pipeline tasks (it's easy and we don't expect many).
        /// This can be solved by using atomic shard ptr.
        std::list<ExpandPipelineTask> task_list;

        /// Used to park/wake this worker thread when the task queue is empty.
        std::condition_variable condvar;
        std::mutex mutex;
        bool wake_flag = false;

        /// Tasks pinned to this particular thread (bypass the shared task_queue).
        std::queue<ExecutionState *> pinned_tasks;
    };

    std::vector<std::unique_ptr<ExecutorContext>> executor_contexts;
    std::mutex executor_contexts_mutex;

    /// Processor ptr -> node number
    using ProcessorsMap = std::unordered_map<const IProcessor *, UInt64>;
    ProcessorsMap processors_map;

    /// Graph related methods.
    bool addEdges(UInt64 node);
    void buildGraph();
    void expandPipeline(Stack & stack, UInt64 pid);

    /// Pipeline execution related methods.
    void addChildlessProcessorsToStack(Stack & stack);
    bool tryAddProcessorToStackIfUpdated(Edge & edge, Stack & stack);
    static void addJob(ExecutionState * execution_state);
    // TODO: void addAsyncJob(UInt64 pid);

    /// Prepare processor with pid number.
    /// Check parents and children of current processor and push them to stacks if they also need to be prepared.
    /// If processor wants to be expanded, ExpandPipelineTask from thread_number's execution context will be used.
    bool prepareProcessor(UInt64 pid, Stack & children, Stack & parents, size_t thread_number, bool async);
    void doExpandPipeline(ExpandPipelineTask * task, bool processing);

    /// Main execution machinery: spawn threads and run per-thread loops.
    void executeImpl(size_t num_threads);
    void executeSingleThread(size_t thread_num, size_t num_threads);
    /// Signal all threads to stop (sets the finished flag and wakes sleepers).
    void finish();

    /// Text dump of the graph/state, presumably for logging and diagnostics — verify callers.
    String dumpPipeline() const;
};
209
/// Shared ownership so the executor stays alive for everyone who may touch it,
/// e.g. a thread calling cancel() while another runs execute().
using PipelineExecutorPtr = std::shared_ptr<PipelineExecutor>;
211
212}
213