#pragma once

#include <atomic>
#include <limits>
#include <memory>
#include <string>
#include <vector>

#include <Processors/Port.h>


class EventCounter;


namespace DB
{

class IProcessor;
using ProcessorPtr = std::shared_ptr<IProcessor>;
using Processors = std::vector<ProcessorPtr>;

/** Processor is an element (low level building block) of a query execution pipeline.
  * It has zero or more input ports and zero or more output ports.
  *
  * Blocks of data are transferred over ports.
  * Each port has fixed structure: names and types of columns and values of constants.
  *
  * Processors may pull data from input ports, do some processing and push data to output ports.
  * Processor may indicate that it requires input data to proceed and specify on which of its ports it needs data.
  *
  * Synchronous work must only use CPU - no sleeping, no IO waits, no network waits.
  *
  * Processor may want to do work asynchronously (example: fetch data from a remote server)
  *  - in this case it will initiate a background job and allow subscribing to it.
  *
  * Processor may throw an exception to indicate some runtime error.
  *
  * Different ports may have different structure. For example, ports may correspond to different result sets
  *  or semantically different parts of the result.
  *
  * Processor may modify its ports (create other processors and connect to them) on the fly.
  * Example: first execute the subquery; based on the subquery result,
  *  determine how to execute the rest of the query and build the corresponding pipeline.
  *
  * Processor may simply wait for another processor to execute without transferring any data from it.
  * For this purpose it should connect its input port to another processor and indicate the need for data.
  *
  * Examples:
  *
  * Source. Has no input ports and a single output port. Generates data itself and pushes it to its output port.
  *
  * Sink. Has a single input port and no output ports. Consumes data that was passed to its input port.
  *
  * Empty source. Immediately says that data on its output port is finished.
  *
  * Null sink. Consumes data and does nothing.
  *
  * Simple transformation. Has a single input and a single output port. Pulls data, transforms it and pushes it to the output port.
  * Example: expression calculator.
  * TODO Better to make each function a separate processor. It's better for pipeline analysis. Also keep in mind 'sleep' and 'rand' functions.
  *
  * Squashing or filtering transformation. Pulls data, possibly accumulates it, and sometimes pushes it to the output port.
  * Examples: DISTINCT, WHERE, squashing of blocks for INSERT SELECT.
  *
  * Accumulating transformation. Pulls and accumulates all data from the input until it is exhausted, then pushes data to the output port.
  * Examples: ORDER BY, GROUP BY.
  *
  * Limiting transformation. Pulls data from the input and passes it to the output.
  * When there has been enough data, it says that it doesn't need data on its input and that data on its output port is finished.
  *
  * Resize. Has an arbitrary number of inputs and an arbitrary number of outputs.
  * Pulls data from whichever input is ready and pushes it to a randomly chosen free output.
  * Examples:
  *   Union - merge data from a number of inputs into one output in arbitrary order.
  *   Split - read data from one input and pass it to an arbitrary output.
  *
  * Concat. Has many inputs and only one output. Pulls all data from the first input until it is exhausted,
  *  then all data from the second input, etc., and pushes all data to the output.
  *
  * Ordered merge. Has many inputs but only one output. Pulls data from the selected input in a specific order, merges it and pushes it to the output.
  *
  * Fork. Has one input and many outputs. Pulls data from the input and copies it to all outputs.
  *  Used to process multiple queries with a common source of data.
  *
  * Select. Has one or multiple inputs and one output.
  *  Reads blocks from the inputs and checks that blocks on the inputs are "parallel": that they correspond to each other in the number of rows.
  *  Constructs a new block by selecting some subset (or all) of the columns from the inputs.
  *  Example: collect columns - function arguments before function execution.
  *
  *
  * TODO Processors may carry algebraic properties about the transformations they do.
  * For example, that a processor doesn't change the number of rows; doesn't change the order of rows; doesn't change the set of rows, etc.
  *
  * TODO Ports may carry algebraic properties about streams of data.
  * For example, that data comes ordered by a specific key; or grouped by a specific key; or has unique values of a specific key.
  * And also simple properties, including lower and upper bounds on the number of rows.
  *
  * TODO Processor should have a declarative representation that can be serialized and parsed.
  * Example: read_from_merge_tree(database, table, Columns(a, b, c), Piece(0, 10), Parts(Part('name', MarkRanges(MarkRange(0, 100), ...)), ...))
  * It's reasonable to have an intermediate language for the declaration of pipelines.
  *
  * TODO Processor with all its parameters should represent a "pure" function on the streams of data from its input ports.
  * It's an open question what kind of "pure" function we mean.
  * For example, data streams are considered equal up to order unless ordering properties are stated explicitly.
  * Another example: we should support the notion of an "arbitrary N-th of M substream" of the full stream of data.
  */
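
/** A minimal sketch of how processors might be assembled into a pipeline (illustrative only;
  * SourceProcessor, TransformProcessor and SinkProcessor are hypothetical implementations,
  * and it is assumed that Port.h provides a free function connect(OutputPort &, InputPort &)):
  *
  *     auto source = std::make_shared<SourceProcessor>(header);
  *     auto transform = std::make_shared<TransformProcessor>(header);
  *     auto sink = std::make_shared<SinkProcessor>(header);
  *
  *     connect(source->getOutputs().front(), transform->getInputs().front());
  *     connect(transform->getOutputs().front(), sink->getInputs().front());
  *
  *     Processors processors{source, transform, sink};    /// handed to an executor that drives prepare()/work()
  */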

class IProcessor
{
protected:
    InputPorts inputs;
    OutputPorts outputs;

public:
    IProcessor() = default;

    IProcessor(InputPorts inputs_, OutputPorts outputs_)
        : inputs(std::move(inputs_)), outputs(std::move(outputs_))
    {
        for (auto & port : inputs)
            port.processor = this;
        for (auto & port : outputs)
            port.processor = this;
    }

    virtual String getName() const = 0;

    enum class Status
    {
        /// Processor needs some data at its inputs to proceed.
        /// You need to run another processor to generate the required input and then call 'prepare' again.
        NeedData,

        /// Processor cannot proceed because the output port is full or not isNeeded().
        /// You need to transfer data from the output port to the input port of another processor and then call 'prepare' again.
        PortFull,

        /// All work is done (all data is processed or all outputs are closed), nothing more to do.
        Finished,

        /// No one needs data on output ports.
        /// Unneeded,

        /// You may call the 'work' method and the processor will do some work synchronously.
        Ready,

        /// You may call the 'schedule' method and the processor will initiate some background work.
        Async,

        /// Processor is doing some work in the background.
        /// You may wait for the next event or do something else, and then you should call 'prepare' again.
        Wait,

        /// Processor wants to add other processors to the pipeline.
        /// New processors must be obtained by an expandPipeline() call.
        ExpandPipeline,
    };
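
    /** A rough sketch of how an executor might dispatch on these statuses (illustrative only, not the
      * actual executor; prepareUpstream, prepareDownstream, waitForEvent, addProcessors and markFinished
      * are hypothetical helpers of such an executor, and 'prepare' is called again after each step):
      *
      *     switch (processor.prepare())
      *     {
      *         case IProcessor::Status::NeedData:       prepareUpstream(); break;      /// let an upstream processor fill the input
      *         case IProcessor::Status::PortFull:       prepareDownstream(); break;    /// let a downstream processor drain the output
      *         case IProcessor::Status::Ready:          processor.work(); break;
      *         case IProcessor::Status::Async:          processor.schedule(watch); break;
      *         case IProcessor::Status::Wait:           waitForEvent(); break;
      *         case IProcessor::Status::ExpandPipeline: addProcessors(processor.expandPipeline()); break;
      *         case IProcessor::Status::Finished:       markFinished(processor); break;
      *     }
      */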

    static std::string statusToName(Status status);

    /** Method 'prepare' is responsible for all cheap ("instantaneous": O(1) of data volume, no wait) calculations.
      *
      * It may access input and output ports,
      * indicate the need for work by another processor by returning NeedData or PortFull,
      * or indicate the absence of work by returning Finished or Unneeded;
      * it may pull data from input ports and push data to output ports.
      *
      * The method is not thread-safe and must be called from a single thread at any moment in time,
      * even for different connected processors.
      *
      * Instead of doing any long work (CPU calculations or waiting), it should just prepare all required data and return Ready or Async.
      *
      * Thread safety and parallel execution:
      * - no methods (prepare, work, schedule) of a single object can be executed in parallel;
      * - method 'work' can be executed in parallel for different objects, even for connected processors;
      * - method 'prepare' cannot be executed in parallel even for different objects,
      *   if they are connected (including indirectly) to each other by their ports.
      */
    virtual Status prepare()
    {
        throw Exception("Method 'prepare' is not implemented for " + getName() + " processor", ErrorCodes::NOT_IMPLEMENTED);
    }
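
    /** A minimal sketch of 'prepare' for a simple single-input, single-output transformation
      * (illustrative only; the port methods setNeeded/isFinished/hasData/pull/canPush/push/finish/close
      * are assumptions about the Port API, and current_block/has_transformed_data are hypothetical members):
      *
      *     Status prepare() override
      *     {
      *         auto & input = inputs.front();
      *         auto & output = outputs.front();
      *
      *         if (output.isFinished()) { input.close(); return Status::Finished; }
      *         if (!output.canPush()) return Status::PortFull;
      *
      *         if (has_transformed_data) { output.push(std::move(current_block)); has_transformed_data = false; }
      *
      *         if (input.isFinished()) { output.finish(); return Status::Finished; }
      *
      *         input.setNeeded();
      *         if (!input.hasData()) return Status::NeedData;
      *
      *         current_block = input.pull();
      *         return Status::Ready;    /// 'work' will transform current_block
      *     }
      */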

    using PortNumbers = std::vector<UInt64>;

    /// Optimization of 'prepare' for the case when we know which ports were updated.
    virtual Status prepare(const PortNumbers & /*updated_input_ports*/, const PortNumbers & /*updated_output_ports*/) { return prepare(); }

    /** You may call this method if 'prepare' returned Ready.
      * This method cannot access any ports. It should use only data that was prepared by the 'prepare' method.
      *
      * Method 'work' can be executed in parallel for different processors.
      */
    virtual void work()
    {
        throw Exception("Method 'work' is not implemented for " + getName() + " processor", ErrorCodes::NOT_IMPLEMENTED);
    }
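
    /** Continuing the sketch above, the matching 'work' only touches data already prepared by 'prepare'
      * (illustrative only; transformBlock is a hypothetical function):
      *
      *     void work() override
      *     {
      *         current_block = transformBlock(std::move(current_block));
      *         has_transformed_data = true;    /// the next 'prepare' call will push it to the output
      *     }
      */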

    /** You may call this method if 'prepare' returned Async.
      * This method cannot access any ports. It should use only data that was prepared by the 'prepare' method.
      *
      * This method should return instantly and fire an event (or many events) when the asynchronous job is done.
      * While the job is not done, method 'prepare' will return Wait and the user may block and wait for the next event before checking again.
      *
      * Note that it can fire many events in EventCounter while doing its job,
      * and you have to wait for the next event (or do something else) every time 'prepare' returns Wait.
      */
    virtual void schedule(EventCounter & /*watch*/)
    {
        throw Exception("Method 'schedule' is not implemented for " + getName() + " processor", ErrorCodes::NOT_IMPLEMENTED);
    }
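
    /** A rough sketch of 'schedule' for a processor that fetches data in the background
      * (illustrative only; it is assumed that EventCounter exposes a notify() method,
      * and fetchFromRemoteAsync / received_block are hypothetical):
      *
      *     void schedule(EventCounter & watch) override
      *     {
      *         fetchFromRemoteAsync([this, &watch](Block block)
      *         {
      *             received_block = std::move(block);    /// picked up by the next call to 'prepare'
      *             watch.notify();                       /// wakes up an executor that is waiting after Wait
      *         });
      *     }
      */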

    /** You must call this method if 'prepare' returned ExpandPipeline.
      * This method cannot access any ports, but it can create new ports for the current processor.
      *
      * The method should return a set of new, already connected processors.
      * All added processors must be connected only to each other or to the current processor.
      *
      * The method can't remove or reconnect existing ports, move data from/to a port or perform calculations.
      * 'prepare' should be called again after expanding the pipeline.
      */
    virtual Processors expandPipeline()
    {
        throw Exception("Method 'expandPipeline' is not implemented for " + getName() + " processor", ErrorCodes::NOT_IMPLEMENTED);
    }
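
    /** A rough sketch of 'expandPipeline' (illustrative only; SourceFromSubqueryResult is a hypothetical
      * processor, and it is assumed that a new input port can be added via inputs.emplace_back(...)
      * and connected with connect() from Port.h):
      *
      *     Processors expandPipeline() override
      *     {
      *         auto source = std::make_shared<SourceFromSubqueryResult>(subquery_header);
      *         inputs.emplace_back(subquery_header, this);
      *         connect(source->getOutputs().front(), inputs.back());
      *         return {source};
      *     }
      */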

    /// If the query was cancelled, the executor will wait until all processors finish their jobs.
    /// Generally, there is no reason to check this flag. However, it may be reasonable for long operations (e.g. IO).
    bool isCancelled() const { return is_cancelled; }
    void cancel() { is_cancelled = true; }
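
    /// For example, a long-running 'work' implementation might periodically check the flag
    /// (hasMoreData and processNextChunk are hypothetical): while (!isCancelled() && hasMoreData()) processNextChunk();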

    virtual ~IProcessor() = default;

    auto & getInputs() { return inputs; }
    auto & getOutputs() { return outputs; }

    UInt64 getInputPortNumber(const InputPort * input_port) const
    {
        UInt64 number = 0;
        for (const auto & port : inputs)
        {
            if (&port == input_port)
                return number;

            ++number;
        }

        throw Exception("Can't find input port for " + getName() + " processor", ErrorCodes::LOGICAL_ERROR);
    }

    UInt64 getOutputPortNumber(const OutputPort * output_port) const
    {
        UInt64 number = 0;
        for (const auto & port : outputs)
        {
            if (&port == output_port)
                return number;

            ++number;
        }

        throw Exception("Can't find output port for " + getName() + " processor", ErrorCodes::LOGICAL_ERROR);
    }

    const auto & getInputs() const { return inputs; }
    const auto & getOutputs() const { return outputs; }

    /// Debug output.
    void dump() const;

    /// Used to print pipeline.
    void setDescription(const std::string & description_) { processor_description = description_; }
    const std::string & getDescription() const { return processor_description; }

    /// Helpers for pipeline executor.
    void setStream(size_t value) { stream_number = value; }
    size_t getStream() const { return stream_number; }
    constexpr static size_t NO_STREAM = std::numeric_limits<size_t>::max();

private:
    std::atomic<bool> is_cancelled{false};

    std::string processor_description;

    size_t stream_number = NO_STREAM;
};

}