1 | #pragma once |
2 | |
3 | #include <Processors/IProcessor.h> |
4 | |
5 | |
6 | namespace DB |
7 | { |
8 | |
9 | /** Has one input and one output. |
10 | * Simply pull a block from input, transform it, and push it to output. |
11 | */ |
12 | class ISimpleTransform : public IProcessor |
13 | { |
14 | protected: |
15 | InputPort & input; |
16 | OutputPort & output; |
17 | |
18 | Port::Data current_data; |
19 | bool has_input = false; |
20 | bool transformed = false; |
21 | bool no_more_data_needed = false; |
22 | const bool skip_empty_chunks; |
23 | |
24 | /// Set input port NotNeeded after chunk was pulled. |
25 | /// Input port will become needed again only after data was transformed. |
26 | /// This allows to escape caching chunks in input port, which can lead to uneven data distribution. |
27 | bool set_input_not_needed_after_read = false; |
28 | |
29 | virtual void transform(Chunk & chunk) = 0; |
30 | void stopReading() { no_more_data_needed = true; } |
31 | |
32 | public: |
33 | ISimpleTransform(Block , Block , bool skip_empty_chunks_); |
34 | |
35 | Status prepare() override; |
36 | void work() override; |
37 | |
38 | InputPort & getInputPort() { return input; } |
39 | OutputPort & getOutputPort() { return output; } |
40 | |
41 | void setInputNotNeededAfterRead(bool value) { set_input_not_needed_after_read = value; } |
42 | }; |
43 | |
44 | } |
45 | |