| 1 | #pragma once | 
|---|
| 2 |  | 
|---|
#include <atomic>
#include <limits>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include <Processors/Port.h>
|---|
| 5 |  | 
|---|
| 6 |  | 
|---|
/// Forward declaration in the global namespace (outside DB).
/// In this header it is only used by reference in IProcessor::schedule(),
/// so the complete definition is not needed here.
class EventCounter;
|---|
| 8 |  | 
|---|
| 9 |  | 
|---|
| 10 | namespace DB | 
|---|
| 11 | { | 
|---|
| 12 |  | 
|---|
class IProcessor;

/// Shared-ownership handle to a single processor.
using ProcessorPtr = std::shared_ptr<IProcessor>;

/// Sequence of processor handles, e.g. the value returned by IProcessor::expandPipeline().
using Processors = std::vector<std::shared_ptr<IProcessor>>;
|---|
| 16 |  | 
|---|
| 17 | /** Processor is an element (low level building block) of a query execution pipeline. | 
|---|
| 18 | * It has zero or more input ports and zero or more output ports. | 
|---|
| 19 | * | 
|---|
| 20 | * Blocks of data are transferred over ports. | 
|---|
| 21 | * Each port has fixed structure: names and types of columns and values of constants. | 
|---|
| 22 | * | 
|---|
| 23 | * Processors may pull data from input ports, do some processing and push data to output ports. | 
|---|
| 24 | * Processor may indicate that it requires input data to proceed and indicate that it needs data from some ports. | 
|---|
| 25 | * | 
|---|
| 26 | * Synchronous work must only use CPU - don't do any sleep, IO wait, network wait. | 
|---|
| 27 | * | 
|---|
| 28 | * Processor may want to do work asynchronously (example: fetch data from remote server) | 
|---|
| 29 | *  - in this case it will initiate a background job and allow subscribing to it. | 
|---|
| 30 | * | 
|---|
| 31 | * Processor may throw an exception to indicate some runtime error. | 
|---|
| 32 | * | 
|---|
| 33 | * Different ports may have different structures. For example, ports may correspond to different result sets | 
|---|
| 34 | *  or semantically different parts of result. | 
|---|
| 35 | * | 
|---|
| 36 | * Processor may modify its ports (create another processors and connect to them) on the fly. | 
|---|
| 37 | * Example: first execute the subquery; on basis of subquery result | 
|---|
| 38 | *  determine how to execute the rest of query and build the corresponding pipeline. | 
|---|
| 39 | * | 
|---|
| 40 | * Processor may simply wait for another processor to execute without transferring any data from it. | 
|---|
| 41 | * For this purpose it should connect its input port to another processor, and indicate need of data. | 
|---|
| 42 | * | 
|---|
| 43 | * Examples: | 
|---|
| 44 | * | 
|---|
| 45 | * Source. Has no input ports and single output port. Generates data itself and pushes it to its output port. | 
|---|
| 46 | * | 
|---|
| 47 | * Sink. Has single input port and no output ports. Consumes data that was passed to its input port. | 
|---|
| 48 | * | 
|---|
| 49 | * Empty source. Immediately says that data on its output port is finished. | 
|---|
| 50 | * | 
|---|
| 51 | * Null sink. Consumes data and does nothing. | 
|---|
| 52 | * | 
|---|
| 53 | * Simple transformation. Has single input and single output port. Pulls data, transforms it and pushes to output port. | 
|---|
| 54 | * Example: expression calculator. | 
|---|
| 55 | * TODO Better to make each function a separate processor. It's better for pipeline analysis. Also keep in mind 'sleep' and 'rand' functions. | 
|---|
| 56 | * | 
|---|
| 57 | * Squashing or filtering transformation. Pulls data, possibly accumulates it, and sometimes pushes it to output port. | 
|---|
| 58 | * Examples: DISTINCT, WHERE, squashing of blocks for INSERT SELECT. | 
|---|
| 59 | * | 
|---|
| 60 | * Accumulating transformation. Pulls and accumulates all data from input until it is exhausted, then pushes data to output port. | 
|---|
| 61 | * Examples: ORDER BY, GROUP BY. | 
|---|
| 62 | * | 
|---|
| 63 | * Limiting transformation. Pulls data from input and passes to output. | 
|---|
| 64 | * When there was enough data, says that it doesn't need data on its input and that data on its output port is finished. | 
|---|
| 65 | * | 
|---|
| 66 | * Resize. Has an arbitrary number of inputs and an arbitrary number of outputs. | 
|---|
| 67 | * Pulls data from any ready input and pushes it to a randomly chosen free output. | 
|---|
| 68 | * Examples: | 
|---|
| 69 | * Union - merge data from a number of inputs into one output in arbitrary order. | 
|---|
| 70 | * Split - read data from one input and pass it to an arbitrary output. | 
|---|
| 71 | * | 
|---|
| 72 | * Concat. Has many inputs and only one output. Pulls all data from first input until it is exhausted, | 
|---|
| 73 | *  then all data from second input, etc. and pushes all data to output. | 
|---|
| 74 | * | 
|---|
| 75 | * Ordered merge. Has many inputs but only one output. Pulls data from selected input in specific order, merges and pushes it to output. | 
|---|
| 76 | * | 
|---|
| 77 | * Fork. Has one input and many outputs. Pulls data from input and copies it to all outputs. | 
|---|
| 78 | * Used to process multiple queries with common source of data. | 
|---|
| 79 | * | 
|---|
| 80 | * Select. Has one or multiple inputs and one output. | 
|---|
| 81 | * Read blocks from inputs and check that blocks on inputs are "parallel": correspond to each other in number of rows. | 
|---|
| 82 | * Construct a new block by selecting some subset (or all) of columns from inputs. | 
|---|
| 83 | * Example: collect columns - function arguments before function execution. | 
|---|
| 84 | * | 
|---|
| 85 | * | 
|---|
| 86 | * TODO Processors may carry algebraic properties about transformations they do. | 
|---|
| 87 | * For example, that processor doesn't change number of rows; doesn't change order of rows, doesn't change the set of rows, etc. | 
|---|
| 88 | * | 
|---|
| 89 | * TODO Ports may carry algebraic properties about streams of data. | 
|---|
| 90 | * For example, that data comes ordered by specific key; or grouped by specific key; or have unique values of specific key. | 
|---|
| 91 | * And also simple properties, including lower and upper bound on number of rows. | 
|---|
| 92 | * | 
|---|
| 93 | * TODO Processor should have declarative representation, that is able to be serialized and parsed. | 
|---|
| 94 | * Example: read_from_merge_tree(database, table, Columns(a, b, c), Piece(0, 10), Parts(Part('name', MarkRanges(MarkRange(0, 100), ...)), ...)) | 
|---|
| 95 | * It's reasonable to have an intermediate language for declaration of pipelines. | 
|---|
| 96 | * | 
|---|
| 97 | * TODO Processor with all its parameters should represent "pure" function on streams of data from its input ports. | 
|---|
| 98 | * It's in question, what kind of "pure" function do we mean. | 
|---|
| 99 | * For example, data streams are considered equal up to order unless ordering properties are stated explicitly. | 
|---|
| 100 | * Another example: we should support the notion of "arbitrary N-th of M substream" of the full stream of data. | 
|---|
| 101 | */ | 
|---|
| 102 |  | 
|---|
| 103 | class IProcessor | 
|---|
| 104 | { | 
|---|
| 105 | protected: | 
|---|
| 106 | InputPorts inputs; | 
|---|
| 107 | OutputPorts outputs; | 
|---|
| 108 |  | 
|---|
| 109 | public: | 
|---|
| 110 | IProcessor() = default; | 
|---|
| 111 |  | 
|---|
| 112 | IProcessor(InputPorts inputs_, OutputPorts outputs_) | 
|---|
| 113 | : inputs(std::move(inputs_)), outputs(std::move(outputs_)) | 
|---|
| 114 | { | 
|---|
| 115 | for (auto & port : inputs) | 
|---|
| 116 | port.processor = this; | 
|---|
| 117 | for (auto & port : outputs) | 
|---|
| 118 | port.processor = this; | 
|---|
| 119 | } | 
|---|
| 120 |  | 
|---|
| 121 | virtual String getName() const = 0; | 
|---|
| 122 |  | 
|---|
| 123 | enum class Status | 
|---|
| 124 | { | 
|---|
| 125 | /// Processor needs some data at its inputs to proceed. | 
|---|
| 126 | /// You need to run another processor to generate required input and then call 'prepare' again. | 
|---|
| 127 | NeedData, | 
|---|
| 128 |  | 
|---|
| 129 | /// Processor cannot proceed because output port is full or not isNeeded(). | 
|---|
| 130 | /// You need to transfer data from output port to the input port of another processor and then call 'prepare' again. | 
|---|
| 131 | PortFull, | 
|---|
| 132 |  | 
|---|
| 133 | /// All work is done (all data is processed or all output are closed), nothing more to do. | 
|---|
| 134 | Finished, | 
|---|
| 135 |  | 
|---|
| 136 | /// No one needs data on output ports. | 
|---|
| 137 | /// Unneeded, | 
|---|
| 138 |  | 
|---|
| 139 | /// You may call 'work' method and processor will do some work synchronously. | 
|---|
| 140 | Ready, | 
|---|
| 141 |  | 
|---|
| 142 | /// You may call 'schedule' method and processor will initiate some background work. | 
|---|
| 143 | Async, | 
|---|
| 144 |  | 
|---|
| 145 | /// Processor is doing some work in background. | 
|---|
| 146 | /// You may wait for next event or do something else and then you should call 'prepare' again. | 
|---|
| 147 | Wait, | 
|---|
| 148 |  | 
|---|
| 149 | /// Processor wants to add other processors to pipeline. | 
|---|
| 150 | /// New processors must be obtained by expandPipeline() call. | 
|---|
| 151 | ExpandPipeline, | 
|---|
| 152 | }; | 
|---|
| 153 |  | 
|---|
| 154 | static std::string statusToName(Status status); | 
|---|
| 155 |  | 
|---|
| 156 | /** Method 'prepare' is responsible for all cheap ("instantenous": O(1) of data volume, no wait) calculations. | 
|---|
| 157 | * | 
|---|
| 158 | * It may access input and output ports, | 
|---|
| 159 | *  indicate the need for work by another processor by returning NeedData or PortFull, | 
|---|
| 160 | *  or indicate the absense of work by returning Finished or Unneeded, | 
|---|
| 161 | *  it may pull data from input ports and push data to output ports. | 
|---|
| 162 | * | 
|---|
| 163 | * The method is not thread-safe and must be called from a single thread in one moment of time, | 
|---|
| 164 | *  even for different connected processors. | 
|---|
| 165 | * | 
|---|
| 166 | * Instead of all long work (CPU calculations or waiting) it should just prepare all required data and return Ready or Async. | 
|---|
| 167 | * | 
|---|
| 168 | * Thread safety and parallel execution: | 
|---|
| 169 | * - no methods (prepare, work, schedule) of single object can be executed in parallel; | 
|---|
| 170 | * - method 'work' can be executed in parallel for different objects, even for connected processors; | 
|---|
| 171 | * - method 'prepare' cannot be executed in parallel even for different objects, | 
|---|
| 172 | *   if they are connected (including indirectly) to each other by their ports; | 
|---|
| 173 | */ | 
|---|
| 174 | virtual Status prepare() | 
|---|
| 175 | { | 
|---|
| 176 | throw Exception( "Method 'prepare' is not implemented for "+ getName() + " processor", ErrorCodes::NOT_IMPLEMENTED); | 
|---|
| 177 | } | 
|---|
| 178 |  | 
|---|
| 179 | using PortNumbers = std::vector<UInt64>; | 
|---|
| 180 |  | 
|---|
| 181 | /// Optimization for prepare in case we know ports were updated. | 
|---|
| 182 | virtual Status prepare(const PortNumbers & /*updated_input_ports*/, const PortNumbers & /*updated_output_ports*/) { return prepare(); } | 
|---|
| 183 |  | 
|---|
| 184 | /** You may call this method if 'prepare' returned Ready. | 
|---|
| 185 | * This method cannot access any ports. It should use only data that was prepared by 'prepare' method. | 
|---|
| 186 | * | 
|---|
| 187 | * Method work can be executed in parallel for different processors. | 
|---|
| 188 | */ | 
|---|
| 189 | virtual void work() | 
|---|
| 190 | { | 
|---|
| 191 | throw Exception( "Method 'work' is not implemented for "+ getName() + " processor", ErrorCodes::NOT_IMPLEMENTED); | 
|---|
| 192 | } | 
|---|
| 193 |  | 
|---|
| 194 | /** You may call this method if 'prepare' returned Async. | 
|---|
| 195 | * This method cannot access any ports. It should use only data that was prepared by 'prepare' method. | 
|---|
| 196 | * | 
|---|
| 197 | * This method should return instantly and fire an event (or many events) when asynchronous job will be done. | 
|---|
| 198 | * When the job is not done, method 'prepare' will return Wait and the user may block and wait for next event before checking again. | 
|---|
| 199 | * | 
|---|
| 200 | * Note that it can fire many events in EventCounter while doing its job, | 
|---|
| 201 | *  and you have to wait for next event (or do something else) every time when 'prepare' returned Wait. | 
|---|
| 202 | */ | 
|---|
| 203 | virtual void schedule(EventCounter & /*watch*/) | 
|---|
| 204 | { | 
|---|
| 205 | throw Exception( "Method 'schedule' is not implemented for "+ getName() + " processor", ErrorCodes::NOT_IMPLEMENTED); | 
|---|
| 206 | } | 
|---|
| 207 |  | 
|---|
| 208 | /** You must call this method if 'prepare' returned ExpandPipeline. | 
|---|
| 209 | * This method cannot access any port, but it can create new ports for current processor. | 
|---|
| 210 | * | 
|---|
| 211 | * Method should return set of new already connected processors. | 
|---|
| 212 | * All added processors must be connected only to each other or current processor. | 
|---|
| 213 | * | 
|---|
| 214 | * Method can't remove or reconnect existing ports, move data from/to port or perform calculations. | 
|---|
| 215 | * 'prepare' should be called again after expanding pipeline. | 
|---|
| 216 | */ | 
|---|
| 217 | virtual Processors expandPipeline() | 
|---|
| 218 | { | 
|---|
| 219 | throw Exception( "Method 'expandPipeline' is not implemented for "+ getName() + " processor", ErrorCodes::NOT_IMPLEMENTED); | 
|---|
| 220 | } | 
|---|
| 221 |  | 
|---|
| 222 | /// In case if query was cancelled executor will wait till all processors finish their jobs. | 
|---|
| 223 | /// Generally, there is no reason to check this flag. However, it may be reasonable for long operations (e.g. i/o). | 
|---|
| 224 | bool isCancelled() const { return is_cancelled; } | 
|---|
| 225 | void cancel() { is_cancelled = true; } | 
|---|
| 226 |  | 
|---|
| 227 | virtual ~IProcessor() = default; | 
|---|
| 228 |  | 
|---|
| 229 | auto & getInputs() { return inputs; } | 
|---|
| 230 | auto & getOutputs() { return outputs; } | 
|---|
| 231 |  | 
|---|
| 232 | UInt64 getInputPortNumber(const InputPort * input_port) const | 
|---|
| 233 | { | 
|---|
| 234 | UInt64 number = 0; | 
|---|
| 235 | for (auto & port : inputs) | 
|---|
| 236 | { | 
|---|
| 237 | if (&port == input_port) | 
|---|
| 238 | return number; | 
|---|
| 239 |  | 
|---|
| 240 | ++number; | 
|---|
| 241 | } | 
|---|
| 242 |  | 
|---|
| 243 | throw Exception( "Can't find input port for "+ getName() + " processor", ErrorCodes::LOGICAL_ERROR); | 
|---|
| 244 | } | 
|---|
| 245 |  | 
|---|
| 246 | UInt64 getOutputPortNumber(const OutputPort * output_port) const | 
|---|
| 247 | { | 
|---|
| 248 | UInt64 number = 0; | 
|---|
| 249 | for (auto & port : outputs) | 
|---|
| 250 | { | 
|---|
| 251 | if (&port == output_port) | 
|---|
| 252 | return number; | 
|---|
| 253 |  | 
|---|
| 254 | ++number; | 
|---|
| 255 | } | 
|---|
| 256 |  | 
|---|
| 257 | throw Exception( "Can't find output port for "+ getName() + " processor", ErrorCodes::LOGICAL_ERROR); | 
|---|
| 258 | } | 
|---|
| 259 |  | 
|---|
| 260 | const auto & getInputs() const { return inputs; } | 
|---|
| 261 | const auto & getOutputs() const { return outputs; } | 
|---|
| 262 |  | 
|---|
| 263 | /// Debug output. | 
|---|
| 264 | void dump() const; | 
|---|
| 265 |  | 
|---|
| 266 | /// Used to print pipeline. | 
|---|
| 267 | void setDescription(const std::string & description_) { processor_description = description_; } | 
|---|
| 268 | const std::string & getDescription() const { return processor_description; } | 
|---|
| 269 |  | 
|---|
| 270 | /// Helpers for pipeline executor. | 
|---|
| 271 | void setStream(size_t value) { stream_number = value; } | 
|---|
| 272 | size_t getStream() const { return stream_number; } | 
|---|
| 273 | constexpr static size_t NO_STREAM = std::numeric_limits<size_t>::max(); | 
|---|
| 274 |  | 
|---|
| 275 | private: | 
|---|
| 276 | std::atomic<bool> is_cancelled{false}; | 
|---|
| 277 |  | 
|---|
| 278 | std::string processor_description; | 
|---|
| 279 |  | 
|---|
| 280 | size_t stream_number = NO_STREAM; | 
|---|
| 281 | }; | 
|---|
| 282 |  | 
|---|
| 283 |  | 
|---|
| 284 |  | 
|---|
| 285 |  | 
|---|
| 286 | } | 
|---|
| 287 |  | 
|---|