| 1 | #pragma once |
| 2 | #include <Processors/IProcessor.h> |
| 3 | #include <Processors/Sources/SourceWithProgress.h> |
| 4 | |
| 5 | namespace DB |
| 6 | { |
| 7 | |
| 8 | class Pipe; |
| 9 | using Pipes = std::vector<Pipe>; |
| 10 | |
| 11 | /// Pipe is a set of processors which represents the part of pipeline with single output. |
| 12 | /// All processors in pipe are connected. All ports are connected except the output one. |
| 13 | class Pipe |
| 14 | { |
| 15 | public: |
| 16 | /// Create from source. It must have no input ports and single output. |
| 17 | explicit Pipe(ProcessorPtr source); |
| 18 | /// Connect several pipes together with specified transform. |
| 19 | /// Transform must have the number of inputs equals to the number of pipes. And single output. |
| 20 | /// Will connect pipes outputs with transform inputs automatically. |
| 21 | Pipe(Pipes && pipes, ProcessorPtr transform); |
| 22 | |
| 23 | Pipe(const Pipe & other) = delete; |
| 24 | Pipe(Pipe && other) = default; |
| 25 | |
| 26 | Pipe & operator=(const Pipe & other) = delete; |
| 27 | Pipe & operator=(Pipe && other) = default; |
| 28 | |
| 29 | OutputPort & getPort() const { return *output_port; } |
| 30 | const Block & () const { return output_port->getHeader(); } |
| 31 | |
| 32 | /// Add transform to pipe. It must have single input and single output (is checked). |
| 33 | /// Input will be connected with current output port, output port will be updated. |
| 34 | void addSimpleTransform(ProcessorPtr transform); |
| 35 | |
| 36 | Processors detachProcessors() && { return std::move(processors); } |
| 37 | |
| 38 | /// Specify quotas and limits for every ISourceWithProgress. |
| 39 | void setLimits(const SourceWithProgress::LocalLimits & limits); |
| 40 | void setQuota(const std::shared_ptr<QuotaContext> & quota); |
| 41 | |
| 42 | /// Set information about preferred executor number for sources. |
| 43 | void pinSources(size_t executor_number); |
| 44 | |
| 45 | void setTotalsPort(OutputPort * totals_) { totals = totals_; } |
| 46 | OutputPort * getTotalsPort() const { return totals; } |
| 47 | |
| 48 | private: |
| 49 | Processors processors; |
| 50 | OutputPort * output_port = nullptr; |
| 51 | OutputPort * totals = nullptr; |
| 52 | }; |
| 53 | |
| 54 | } |
| 55 | |