1 | #include <Processors/Pipe.h> |
2 | #include <IO/WriteHelpers.h> |
3 | |
4 | namespace DB |
5 | { |
6 | |
7 | static void checkSingleInput(const IProcessor & transform) |
8 | { |
9 | if (transform.getInputs().size() != 1) |
10 | throw Exception("Processor for pipe should have single input, " |
11 | "but " + transform.getName() + " has " + |
12 | toString(transform.getInputs().size()) + " inputs." , ErrorCodes::LOGICAL_ERROR); |
13 | } |
14 | |
15 | static void checkMultipleInputs(const IProcessor & transform, size_t num_inputs) |
16 | { |
17 | if (transform.getInputs().size() != num_inputs) |
18 | throw Exception("Processor for pipe should have " + toString(num_inputs) + " inputs, " |
19 | "but " + transform.getName() + " has " + |
20 | toString(transform.getInputs().size()) + " inputs." , ErrorCodes::LOGICAL_ERROR); |
21 | } |
22 | |
23 | static void checkSingleOutput(const IProcessor & transform) |
24 | { |
25 | if (transform.getOutputs().size() != 1) |
26 | throw Exception("Processor for pipe should have single output, " |
27 | "but " + transform.getName() + " has " + |
28 | toString(transform.getOutputs().size()) + " outputs." , ErrorCodes::LOGICAL_ERROR); |
29 | } |
30 | |
31 | static void checkSimpleTransform(const IProcessor & transform) |
32 | { |
33 | checkSingleInput(transform); |
34 | checkSingleOutput(transform); |
35 | } |
36 | |
37 | static void checkSource(const IProcessor & source) |
38 | { |
39 | if (!source.getInputs().empty()) |
40 | throw Exception("Source for pipe shouldn't have any input, but " + source.getName() + " has " + |
41 | toString(source.getInputs().size()) + " inputs." , ErrorCodes::LOGICAL_ERROR); |
42 | |
43 | if (source.getOutputs().empty()) |
44 | throw Exception("Source for pipe should have single output, but it doesn't have any" , |
45 | ErrorCodes::LOGICAL_ERROR); |
46 | |
47 | if (source.getOutputs().size() > 2) |
48 | throw Exception("Source for pipe should have single or two outputs, but " + source.getName() + " has " + |
49 | toString(source.getOutputs().size()) + " outputs." , ErrorCodes::LOGICAL_ERROR); |
50 | } |
51 | |
52 | |
53 | Pipe::Pipe(ProcessorPtr source) |
54 | { |
55 | checkSource(*source); |
56 | output_port = &source->getOutputs().front(); |
57 | |
58 | if (source->getOutputs().size() > 1) |
59 | totals = &source->getOutputs().back(); |
60 | |
61 | processors.emplace_back(std::move(source)); |
62 | } |
63 | |
64 | Pipe::Pipe(Pipes && pipes, ProcessorPtr transform) |
65 | { |
66 | checkSingleOutput(*transform); |
67 | checkMultipleInputs(*transform, pipes.size()); |
68 | |
69 | auto it = transform->getInputs().begin(); |
70 | |
71 | for (auto & pipe : pipes) |
72 | { |
73 | connect(*pipe.output_port, *it); |
74 | ++it; |
75 | |
76 | processors.insert(processors.end(), pipe.processors.begin(), pipe.processors.end()); |
77 | } |
78 | |
79 | output_port = &transform->getOutputs().front(); |
80 | processors.emplace_back(std::move(transform)); |
81 | } |
82 | |
83 | void Pipe::addSimpleTransform(ProcessorPtr transform) |
84 | { |
85 | checkSimpleTransform(*transform); |
86 | connect(*output_port, transform->getInputs().front()); |
87 | output_port = &transform->getOutputs().front(); |
88 | processors.emplace_back(std::move(transform)); |
89 | } |
90 | |
91 | void Pipe::setLimits(const ISourceWithProgress::LocalLimits & limits) |
92 | { |
93 | for (auto & processor : processors) |
94 | { |
95 | if (auto * source_with_progress = dynamic_cast<ISourceWithProgress *>(processor.get())) |
96 | source_with_progress->setLimits(limits); |
97 | } |
98 | } |
99 | |
100 | void Pipe::setQuota(const std::shared_ptr<QuotaContext> & quota) |
101 | { |
102 | for (auto & processor : processors) |
103 | { |
104 | if (auto * source_with_progress = dynamic_cast<ISourceWithProgress *>(processor.get())) |
105 | source_with_progress->setQuota(quota); |
106 | } |
107 | } |
108 | |
109 | void Pipe::pinSources(size_t executor_number) |
110 | { |
111 | for (auto & processor : processors) |
112 | { |
113 | if (auto * source = dynamic_cast<ISource *>(processor.get())) |
114 | source->setStream(executor_number); |
115 | } |
116 | } |
117 | |
118 | } |
119 | |