1 | #include <Processors/Executors/TreeExecutorBlockInputStream.h> |
2 | #include <Processors/Sources/SourceWithProgress.h> |
3 | #include <stack> |
4 | |
5 | namespace DB |
6 | { |
7 | |
8 | static void checkProcessorHasSingleOutput(IProcessor * processor) |
9 | { |
10 | size_t num_outputs = processor->getOutputs().size(); |
11 | if (num_outputs != 1) |
12 | throw Exception("All processors in TreeExecutorBlockInputStream must have single output, " |
13 | "but processor with name " + processor->getName() + " has " + std::to_string(num_outputs), |
14 | ErrorCodes::LOGICAL_ERROR); |
15 | } |
16 | |
17 | /// Check tree invariants (described in TreeExecutor.h). |
18 | /// Collect sources with progress. |
19 | static void validateTree(const Processors & processors, IProcessor * root, std::vector<ISourceWithProgress *> & sources) |
20 | { |
21 | std::unordered_map<IProcessor *, size_t> index; |
22 | |
23 | for (auto & processor : processors) |
24 | { |
25 | bool is_inserted = index.try_emplace(processor.get(), index.size()).second; |
26 | |
27 | if (!is_inserted) |
28 | throw Exception("Duplicate processor in TreeExecutorBlockInputStream with name " + processor->getName(), |
29 | ErrorCodes::LOGICAL_ERROR); |
30 | } |
31 | |
32 | std::vector<bool> is_visited(processors.size(), false); |
33 | std::stack<IProcessor *> stack; |
34 | |
35 | stack.push(root); |
36 | |
37 | while (!stack.empty()) |
38 | { |
39 | IProcessor * node = stack.top(); |
40 | stack.pop(); |
41 | |
42 | auto it = index.find(node); |
43 | |
44 | if (it == index.end()) |
45 | throw Exception("Processor with name " + node->getName() + " " |
46 | "was not mentioned in list passed to TreeExecutorBlockInputStream, " |
47 | "but was traversed to from other processors." , ErrorCodes::LOGICAL_ERROR); |
48 | |
49 | size_t position = it->second; |
50 | |
51 | if (is_visited[position]) |
52 | throw Exception("Processor with name " + node->getName() + " was visited twice while traverse in TreeExecutorBlockInputStream. " |
53 | "Passed processors are not tree." , ErrorCodes::LOGICAL_ERROR); |
54 | |
55 | is_visited[position] = true; |
56 | |
57 | checkProcessorHasSingleOutput(node); |
58 | |
59 | auto & children = node->getInputs(); |
60 | for (auto & child : children) |
61 | stack.push(&child.getOutputPort().getProcessor()); |
62 | |
63 | /// Fill sources array. |
64 | if (children.empty()) |
65 | { |
66 | if (auto * source = dynamic_cast<ISourceWithProgress *>(node)) |
67 | sources.push_back(source); |
68 | } |
69 | } |
70 | |
71 | for (size_t i = 0; i < is_visited.size(); ++i) |
72 | if (!is_visited[i]) |
73 | throw Exception("Processor with name " + processors[i]->getName() + |
74 | " was not visited by traverse in TreeExecutorBlockInputStream." , ErrorCodes::LOGICAL_ERROR); |
75 | } |
76 | |
77 | void TreeExecutorBlockInputStream::init() |
78 | { |
79 | if (processors.empty()) |
80 | throw Exception("No processors were passed to TreeExecutorBlockInputStream." , ErrorCodes::LOGICAL_ERROR); |
81 | |
82 | root = &output_port.getProcessor(); |
83 | |
84 | validateTree(processors, root, sources_with_progress); |
85 | |
86 | input_port = std::make_unique<InputPort>(getHeader(), root); |
87 | connect(output_port, *input_port); |
88 | input_port->setNeeded(); |
89 | } |
90 | |
91 | void TreeExecutorBlockInputStream::execute() |
92 | { |
93 | std::stack<IProcessor *> stack; |
94 | stack.push(root); |
95 | |
96 | auto prepare_processor = [](IProcessor * processor) |
97 | { |
98 | try |
99 | { |
100 | return processor->prepare(); |
101 | } |
102 | catch (Exception & exception) |
103 | { |
104 | exception.addMessage(" While executing processor " + processor->getName()); |
105 | throw; |
106 | } |
107 | }; |
108 | |
109 | while (!stack.empty()) |
110 | { |
111 | IProcessor * node = stack.top(); |
112 | |
113 | auto status = prepare_processor(node); |
114 | |
115 | switch (status) |
116 | { |
117 | case IProcessor::Status::NeedData: |
118 | { |
119 | auto & inputs = node->getInputs(); |
120 | |
121 | if (inputs.empty()) |
122 | throw Exception("Processors " + node->getName() + " with empty input " |
123 | "has returned NeedData in TreeExecutorBlockInputStream" , ErrorCodes::LOGICAL_ERROR); |
124 | |
125 | bool all_finished = true; |
126 | |
127 | for (auto & input : inputs) |
128 | { |
129 | if (input.isFinished()) |
130 | continue; |
131 | |
132 | all_finished = false; |
133 | |
134 | stack.push(&input.getOutputPort().getProcessor()); |
135 | } |
136 | |
137 | if (all_finished) |
138 | throw Exception("Processors " + node->getName() + " has returned NeedData in TreeExecutorBlockInputStream, " |
139 | "but all it's inputs are finished." , ErrorCodes::LOGICAL_ERROR); |
140 | break; |
141 | } |
142 | case IProcessor::Status::PortFull: |
143 | { |
144 | stack.pop(); |
145 | break; |
146 | } |
147 | case IProcessor::Status::Finished: |
148 | { |
149 | stack.pop(); |
150 | break; |
151 | } |
152 | case IProcessor::Status::Ready: |
153 | { |
154 | node->work(); |
155 | break; |
156 | } |
157 | case IProcessor::Status::Async: |
158 | case IProcessor::Status::Wait: |
159 | case IProcessor::Status::ExpandPipeline: |
160 | { |
161 | throw Exception("Processor with name " + node->getName() + " " |
162 | "returned status " + IProcessor::statusToName(status) + " " |
163 | "which is not supported in TreeExecutorBlockInputStream." , ErrorCodes::LOGICAL_ERROR); |
164 | } |
165 | } |
166 | } |
167 | } |
168 | |
169 | Block TreeExecutorBlockInputStream::readImpl() |
170 | { |
171 | while (true) |
172 | { |
173 | if (input_port->isFinished()) |
174 | return {}; |
175 | |
176 | if (input_port->hasData()) |
177 | return getHeader().cloneWithColumns(input_port->pull().detachColumns()); |
178 | |
179 | execute(); |
180 | } |
181 | } |
182 | |
183 | void TreeExecutorBlockInputStream::setProgressCallback(const ProgressCallback & callback) |
184 | { |
185 | for (auto & source : sources_with_progress) |
186 | source->setProgressCallback(callback); |
187 | } |
188 | |
189 | void TreeExecutorBlockInputStream::setProcessListElement(QueryStatus * elem) |
190 | { |
191 | for (auto & source : sources_with_progress) |
192 | source->setProcessListElement(elem); |
193 | } |
194 | |
195 | void TreeExecutorBlockInputStream::setLimits(const IBlockInputStream::LocalLimits & limits_) |
196 | { |
197 | for (auto & source : sources_with_progress) |
198 | source->setLimits(limits_); |
199 | } |
200 | |
201 | void TreeExecutorBlockInputStream::setQuota(const std::shared_ptr<QuotaContext> & quota_) |
202 | { |
203 | for (auto & source : sources_with_progress) |
204 | source->setQuota(quota_); |
205 | } |
206 | |
207 | void TreeExecutorBlockInputStream::addTotalRowsApprox(size_t value) |
208 | { |
209 | /// Add only for one source. |
210 | if (!sources_with_progress.empty()) |
211 | sources_with_progress.front()->addTotalRowsApprox(value); |
212 | } |
213 | |
214 | } |
215 | |