1#include <Processors/Pipe.h>
2#include <IO/WriteHelpers.h>
3
4namespace DB
5{
6
7static void checkSingleInput(const IProcessor & transform)
8{
9 if (transform.getInputs().size() != 1)
10 throw Exception("Processor for pipe should have single input, "
11 "but " + transform.getName() + " has " +
12 toString(transform.getInputs().size()) + " inputs.", ErrorCodes::LOGICAL_ERROR);
13}
14
15static void checkMultipleInputs(const IProcessor & transform, size_t num_inputs)
16{
17 if (transform.getInputs().size() != num_inputs)
18 throw Exception("Processor for pipe should have " + toString(num_inputs) + " inputs, "
19 "but " + transform.getName() + " has " +
20 toString(transform.getInputs().size()) + " inputs.", ErrorCodes::LOGICAL_ERROR);
21}
22
23static void checkSingleOutput(const IProcessor & transform)
24{
25 if (transform.getOutputs().size() != 1)
26 throw Exception("Processor for pipe should have single output, "
27 "but " + transform.getName() + " has " +
28 toString(transform.getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
29}
30
31static void checkSimpleTransform(const IProcessor & transform)
32{
33 checkSingleInput(transform);
34 checkSingleOutput(transform);
35}
36
37static void checkSource(const IProcessor & source)
38{
39 if (!source.getInputs().empty())
40 throw Exception("Source for pipe shouldn't have any input, but " + source.getName() + " has " +
41 toString(source.getInputs().size()) + " inputs.", ErrorCodes::LOGICAL_ERROR);
42
43 if (source.getOutputs().empty())
44 throw Exception("Source for pipe should have single output, but it doesn't have any",
45 ErrorCodes::LOGICAL_ERROR);
46
47 if (source.getOutputs().size() > 2)
48 throw Exception("Source for pipe should have single or two outputs, but " + source.getName() + " has " +
49 toString(source.getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
50}
51
52
53Pipe::Pipe(ProcessorPtr source)
54{
55 checkSource(*source);
56 output_port = &source->getOutputs().front();
57
58 if (source->getOutputs().size() > 1)
59 totals = &source->getOutputs().back();
60
61 processors.emplace_back(std::move(source));
62}
63
64Pipe::Pipe(Pipes && pipes, ProcessorPtr transform)
65{
66 checkSingleOutput(*transform);
67 checkMultipleInputs(*transform, pipes.size());
68
69 auto it = transform->getInputs().begin();
70
71 for (auto & pipe : pipes)
72 {
73 connect(*pipe.output_port, *it);
74 ++it;
75
76 processors.insert(processors.end(), pipe.processors.begin(), pipe.processors.end());
77 }
78
79 output_port = &transform->getOutputs().front();
80 processors.emplace_back(std::move(transform));
81}
82
83void Pipe::addSimpleTransform(ProcessorPtr transform)
84{
85 checkSimpleTransform(*transform);
86 connect(*output_port, transform->getInputs().front());
87 output_port = &transform->getOutputs().front();
88 processors.emplace_back(std::move(transform));
89}
90
91void Pipe::setLimits(const ISourceWithProgress::LocalLimits & limits)
92{
93 for (auto & processor : processors)
94 {
95 if (auto * source_with_progress = dynamic_cast<ISourceWithProgress *>(processor.get()))
96 source_with_progress->setLimits(limits);
97 }
98}
99
100void Pipe::setQuota(const std::shared_ptr<QuotaContext> & quota)
101{
102 for (auto & processor : processors)
103 {
104 if (auto * source_with_progress = dynamic_cast<ISourceWithProgress *>(processor.get()))
105 source_with_progress->setQuota(quota);
106 }
107}
108
109void Pipe::pinSources(size_t executor_number)
110{
111 for (auto & processor : processors)
112 {
113 if (auto * source = dynamic_cast<ISource *>(processor.get()))
114 source->setStream(executor_number);
115 }
116}
117
118}
119