1 | #pragma once |
2 | #include <Processors/IProcessor.h> |
3 | #include <Processors/Sources/SourceWithProgress.h> |
4 | |
5 | namespace DB |
6 | { |
7 | |
8 | class Pipe; |
9 | using Pipes = std::vector<Pipe>; |
10 | |
11 | /// Pipe is a set of processors which represents the part of pipeline with single output. |
12 | /// All processors in pipe are connected. All ports are connected except the output one. |
13 | class Pipe |
14 | { |
15 | public: |
16 | /// Create from source. It must have no input ports and single output. |
17 | explicit Pipe(ProcessorPtr source); |
18 | /// Connect several pipes together with specified transform. |
19 | /// Transform must have the number of inputs equals to the number of pipes. And single output. |
20 | /// Will connect pipes outputs with transform inputs automatically. |
21 | Pipe(Pipes && pipes, ProcessorPtr transform); |
22 | |
23 | Pipe(const Pipe & other) = delete; |
24 | Pipe(Pipe && other) = default; |
25 | |
26 | Pipe & operator=(const Pipe & other) = delete; |
27 | Pipe & operator=(Pipe && other) = default; |
28 | |
29 | OutputPort & getPort() const { return *output_port; } |
30 | const Block & () const { return output_port->getHeader(); } |
31 | |
32 | /// Add transform to pipe. It must have single input and single output (is checked). |
33 | /// Input will be connected with current output port, output port will be updated. |
34 | void addSimpleTransform(ProcessorPtr transform); |
35 | |
36 | Processors detachProcessors() && { return std::move(processors); } |
37 | |
38 | /// Specify quotas and limits for every ISourceWithProgress. |
39 | void setLimits(const SourceWithProgress::LocalLimits & limits); |
40 | void setQuota(const std::shared_ptr<QuotaContext> & quota); |
41 | |
42 | /// Set information about preferred executor number for sources. |
43 | void pinSources(size_t executor_number); |
44 | |
45 | void setTotalsPort(OutputPort * totals_) { totals = totals_; } |
46 | OutputPort * getTotalsPort() const { return totals; } |
47 | |
48 | private: |
49 | Processors processors; |
50 | OutputPort * output_port = nullptr; |
51 | OutputPort * totals = nullptr; |
52 | }; |
53 | |
54 | } |
55 | |