1#pragma once
2#include <Processors/IProcessor.h>
3#include <Processors/Sources/SourceWithProgress.h>
4
5namespace DB
6{
7
8class Pipe;
9using Pipes = std::vector<Pipe>;
10
11/// Pipe is a set of processors which represents the part of pipeline with single output.
12/// All processors in pipe are connected. All ports are connected except the output one.
13class Pipe
14{
15public:
16 /// Create from source. It must have no input ports and single output.
17 explicit Pipe(ProcessorPtr source);
18 /// Connect several pipes together with specified transform.
19 /// Transform must have the number of inputs equals to the number of pipes. And single output.
20 /// Will connect pipes outputs with transform inputs automatically.
21 Pipe(Pipes && pipes, ProcessorPtr transform);
22
23 Pipe(const Pipe & other) = delete;
24 Pipe(Pipe && other) = default;
25
26 Pipe & operator=(const Pipe & other) = delete;
27 Pipe & operator=(Pipe && other) = default;
28
29 OutputPort & getPort() const { return *output_port; }
30 const Block & getHeader() const { return output_port->getHeader(); }
31
32 /// Add transform to pipe. It must have single input and single output (is checked).
33 /// Input will be connected with current output port, output port will be updated.
34 void addSimpleTransform(ProcessorPtr transform);
35
36 Processors detachProcessors() && { return std::move(processors); }
37
38 /// Specify quotas and limits for every ISourceWithProgress.
39 void setLimits(const SourceWithProgress::LocalLimits & limits);
40 void setQuota(const std::shared_ptr<QuotaContext> & quota);
41
42 /// Set information about preferred executor number for sources.
43 void pinSources(size_t executor_number);
44
45 void setTotalsPort(OutputPort * totals_) { totals = totals_; }
46 OutputPort * getTotalsPort() const { return totals; }
47
48private:
49 Processors processors;
50 OutputPort * output_port = nullptr;
51 OutputPort * totals = nullptr;
52};
53
54}
55