1#pragma once
2
3#include <memory>
4#include <Processors/Port.h>
5
6
7class EventCounter;
8
9
10namespace DB
11{
12
13class IProcessor;
14using ProcessorPtr = std::shared_ptr<IProcessor>;
15using Processors = std::vector<ProcessorPtr>;
16
17/** Processor is an element (low level building block) of a query execution pipeline.
18 * It has zero or more input ports and zero or more output ports.
19 *
20 * Blocks of data are transferred over ports.
21 * Each port has fixed structure: names and types of columns and values of constants.
22 *
23 * Processors may pull data from input ports, do some processing and push data to output ports.
24 * Processor may indicate that it requires input data to proceed and indicate that it needs data from some ports.
25 *
26 * Synchronous work must only use CPU - don't do any sleep, IO wait, network wait.
27 *
28 * Processor may want to do work asynchronously (example: fetch data from remote server)
29 * - in this case it will initiate background job and allow to subscribe to it.
30 *
31 * Processor may throw an exception to indicate some runtime error.
32 *
33 * Different ports may have different structure. For example, ports may correspond to different resultsets
34 * or semantically different parts of result.
35 *
36 * Processor may modify its ports (create another processors and connect to them) on the fly.
37 * Example: first execute the subquery; on basis of subquery result
38 * determine how to execute the rest of query and build the corresponding pipeline.
39 *
40 * Processor may simply wait for another processor to execute without transferring any data from it.
41 * For this purpose it should connect its input port to another processor, and indicate need of data.
42 *
43 * Examples:
44 *
45 * Source. Has no input ports and single output port. Generates data itself and pushes it to its output port.
46 *
47 * Sink. Has single input port and no output ports. Consumes data that was passed to its input port.
48 *
49 * Empty source. Immediately says that data on its output port is finished.
50 *
51 * Null sink. Consumes data and does nothing.
52 *
53 * Simple transformation. Has single input and single output port. Pulls data, transforms it and pushes to output port.
54 * Example: expression calculator.
55 * TODO Better to make each function a separate processor. It's better for pipeline analysis. Also keep in mind 'sleep' and 'rand' functions.
56 *
57 * Squashing or filtering transformation. Pulls data, possibly accumulates it, and sometimes pushes it to output port.
58 * Examples: DISTINCT, WHERE, squashing of blocks for INSERT SELECT.
59 *
60 * Accumulating transformation. Pulls and accumulates all data from input until it it exhausted, then pushes data to output port.
61 * Examples: ORDER BY, GROUP BY.
62 *
63 * Limiting transformation. Pulls data from input and passes to output.
64 * When there was enough data, says that it doesn't need data on its input and that data on its output port is finished.
65 *
66 * Resize. Has arbitary number of inputs and arbitary number of outputs.
67 * Pulls data from whatever ready input and pushes it to randomly choosed free output.
68 * Examples:
69 * Union - merge data from number of inputs to one output in arbitary order.
70 * Split - read data from one input and pass it to arbitary output.
71 *
72 * Concat. Has many inputs and only one output. Pulls all data from first input until it is exhausted,
73 * then all data from second input, etc. and pushes all data to output.
74 *
75 * Ordered merge. Has many inputs but only one output. Pulls data from selected input in specific order, merges and pushes it to output.
76 *
77 * Fork. Has one input and many outputs. Pulls data from input and copies it to all outputs.
78 * Used to process multiple queries with common source of data.
79 *
80 * Select. Has one or multiple inputs and one output.
81 * Read blocks from inputs and check that blocks on inputs are "parallel": correspond to each other in number of rows.
82 * Construct a new block by selecting some subset (or all) of columns from inputs.
83 * Example: collect columns - function arguments before function execution.
84 *
85 *
86 * TODO Processors may carry algebraic properties about transformations they do.
87 * For example, that processor doesn't change number of rows; doesn't change order of rows, doesn't change the set of rows, etc.
88 *
89 * TODO Ports may carry algebraic properties about streams of data.
90 * For example, that data comes ordered by specific key; or grouped by specific key; or have unique values of specific key.
91 * And also simple properties, including lower and upper bound on number of rows.
92 *
93 * TODO Processor should have declarative representation, that is able to be serialized and parsed.
94 * Example: read_from_merge_tree(database, table, Columns(a, b, c), Piece(0, 10), Parts(Part('name', MarkRanges(MarkRange(0, 100), ...)), ...))
95 * It's reasonable to have an intermediate language for declaration of pipelines.
96 *
97 * TODO Processor with all its parameters should represent "pure" function on streams of data from its input ports.
98 * It's in question, what kind of "pure" function do we mean.
99 * For example, data streams are considered equal up to order unless ordering properties are stated explicitly.
100 * Another example: we should support the notion of "arbitary N-th of M substream" of full stream of data.
101 */
102
103class IProcessor
104{
105protected:
106 InputPorts inputs;
107 OutputPorts outputs;
108
109public:
110 IProcessor() = default;
111
112 IProcessor(InputPorts inputs_, OutputPorts outputs_)
113 : inputs(std::move(inputs_)), outputs(std::move(outputs_))
114 {
115 for (auto & port : inputs)
116 port.processor = this;
117 for (auto & port : outputs)
118 port.processor = this;
119 }
120
121 virtual String getName() const = 0;
122
123 enum class Status
124 {
125 /// Processor needs some data at its inputs to proceed.
126 /// You need to run another processor to generate required input and then call 'prepare' again.
127 NeedData,
128
129 /// Processor cannot proceed because output port is full or not isNeeded().
130 /// You need to transfer data from output port to the input port of another processor and then call 'prepare' again.
131 PortFull,
132
133 /// All work is done (all data is processed or all output are closed), nothing more to do.
134 Finished,
135
136 /// No one needs data on output ports.
137 /// Unneeded,
138
139 /// You may call 'work' method and processor will do some work synchronously.
140 Ready,
141
142 /// You may call 'schedule' method and processor will initiate some background work.
143 Async,
144
145 /// Processor is doing some work in background.
146 /// You may wait for next event or do something else and then you should call 'prepare' again.
147 Wait,
148
149 /// Processor wants to add other processors to pipeline.
150 /// New processors must be obtained by expandPipeline() call.
151 ExpandPipeline,
152 };
153
154 static std::string statusToName(Status status);
155
156 /** Method 'prepare' is responsible for all cheap ("instantenous": O(1) of data volume, no wait) calculations.
157 *
158 * It may access input and output ports,
159 * indicate the need for work by another processor by returning NeedData or PortFull,
160 * or indicate the absense of work by returning Finished or Unneeded,
161 * it may pull data from input ports and push data to output ports.
162 *
163 * The method is not thread-safe and must be called from a single thread in one moment of time,
164 * even for different connected processors.
165 *
166 * Instead of all long work (CPU calculations or waiting) it should just prepare all required data and return Ready or Async.
167 *
168 * Thread safety and parallel execution:
169 * - no methods (prepare, work, schedule) of single object can be executed in parallel;
170 * - method 'work' can be executed in parallel for different objects, even for connected processors;
171 * - method 'prepare' cannot be executed in parallel even for different objects,
172 * if they are connected (including indirectly) to each other by their ports;
173 */
174 virtual Status prepare()
175 {
176 throw Exception("Method 'prepare' is not implemented for " + getName() + " processor", ErrorCodes::NOT_IMPLEMENTED);
177 }
178
179 using PortNumbers = std::vector<UInt64>;
180
181 /// Optimization for prepare in case we know ports were updated.
182 virtual Status prepare(const PortNumbers & /*updated_input_ports*/, const PortNumbers & /*updated_output_ports*/) { return prepare(); }
183
184 /** You may call this method if 'prepare' returned Ready.
185 * This method cannot access any ports. It should use only data that was prepared by 'prepare' method.
186 *
187 * Method work can be executed in parallel for different processors.
188 */
189 virtual void work()
190 {
191 throw Exception("Method 'work' is not implemented for " + getName() + " processor", ErrorCodes::NOT_IMPLEMENTED);
192 }
193
194 /** You may call this method if 'prepare' returned Async.
195 * This method cannot access any ports. It should use only data that was prepared by 'prepare' method.
196 *
197 * This method should return instantly and fire an event (or many events) when asynchronous job will be done.
198 * When the job is not done, method 'prepare' will return Wait and the user may block and wait for next event before checking again.
199 *
200 * Note that it can fire many events in EventCounter while doing its job,
201 * and you have to wait for next event (or do something else) every time when 'prepare' returned Wait.
202 */
203 virtual void schedule(EventCounter & /*watch*/)
204 {
205 throw Exception("Method 'schedule' is not implemented for " + getName() + " processor", ErrorCodes::NOT_IMPLEMENTED);
206 }
207
208 /** You must call this method if 'prepare' returned ExpandPipeline.
209 * This method cannot access any port, but it can create new ports for current processor.
210 *
211 * Method should return set of new already connected processors.
212 * All added processors must be connected only to each other or current processor.
213 *
214 * Method can't remove or reconnect existing ports, move data from/to port or perform calculations.
215 * 'prepare' should be called again after expanding pipeline.
216 */
217 virtual Processors expandPipeline()
218 {
219 throw Exception("Method 'expandPipeline' is not implemented for " + getName() + " processor", ErrorCodes::NOT_IMPLEMENTED);
220 }
221
222 /// In case if query was cancelled executor will wait till all processors finish their jobs.
223 /// Generally, there is no reason to check this flag. However, it may be reasonable for long operations (e.g. i/o).
224 bool isCancelled() const { return is_cancelled; }
225 void cancel() { is_cancelled = true; }
226
227 virtual ~IProcessor() = default;
228
229 auto & getInputs() { return inputs; }
230 auto & getOutputs() { return outputs; }
231
232 UInt64 getInputPortNumber(const InputPort * input_port) const
233 {
234 UInt64 number = 0;
235 for (auto & port : inputs)
236 {
237 if (&port == input_port)
238 return number;
239
240 ++number;
241 }
242
243 throw Exception("Can't find input port for " + getName() + " processor", ErrorCodes::LOGICAL_ERROR);
244 }
245
246 UInt64 getOutputPortNumber(const OutputPort * output_port) const
247 {
248 UInt64 number = 0;
249 for (auto & port : outputs)
250 {
251 if (&port == output_port)
252 return number;
253
254 ++number;
255 }
256
257 throw Exception("Can't find output port for " + getName() + " processor", ErrorCodes::LOGICAL_ERROR);
258 }
259
260 const auto & getInputs() const { return inputs; }
261 const auto & getOutputs() const { return outputs; }
262
263 /// Debug output.
264 void dump() const;
265
266 /// Used to print pipeline.
267 void setDescription(const std::string & description_) { processor_description = description_; }
268 const std::string & getDescription() const { return processor_description; }
269
270 /// Helpers for pipeline executor.
271 void setStream(size_t value) { stream_number = value; }
272 size_t getStream() const { return stream_number; }
273 constexpr static size_t NO_STREAM = std::numeric_limits<size_t>::max();
274
275private:
276 std::atomic<bool> is_cancelled{false};
277
278 std::string processor_description;
279
280 size_t stream_number = NO_STREAM;
281};
282
283
284
285
286}
287