1#include <Processors/Executors/TreeExecutorBlockInputStream.h>
2#include <Processors/Sources/SourceWithProgress.h>
3#include <stack>
4
5namespace DB
6{
7
8static void checkProcessorHasSingleOutput(IProcessor * processor)
9{
10 size_t num_outputs = processor->getOutputs().size();
11 if (num_outputs != 1)
12 throw Exception("All processors in TreeExecutorBlockInputStream must have single output, "
13 "but processor with name " + processor->getName() + " has " + std::to_string(num_outputs),
14 ErrorCodes::LOGICAL_ERROR);
15}
16
17/// Check tree invariants (described in TreeExecutor.h).
18/// Collect sources with progress.
19static void validateTree(const Processors & processors, IProcessor * root, std::vector<ISourceWithProgress *> & sources)
20{
21 std::unordered_map<IProcessor *, size_t> index;
22
23 for (auto & processor : processors)
24 {
25 bool is_inserted = index.try_emplace(processor.get(), index.size()).second;
26
27 if (!is_inserted)
28 throw Exception("Duplicate processor in TreeExecutorBlockInputStream with name " + processor->getName(),
29 ErrorCodes::LOGICAL_ERROR);
30 }
31
32 std::vector<bool> is_visited(processors.size(), false);
33 std::stack<IProcessor *> stack;
34
35 stack.push(root);
36
37 while (!stack.empty())
38 {
39 IProcessor * node = stack.top();
40 stack.pop();
41
42 auto it = index.find(node);
43
44 if (it == index.end())
45 throw Exception("Processor with name " + node->getName() + " "
46 "was not mentioned in list passed to TreeExecutorBlockInputStream, "
47 "but was traversed to from other processors.", ErrorCodes::LOGICAL_ERROR);
48
49 size_t position = it->second;
50
51 if (is_visited[position])
52 throw Exception("Processor with name " + node->getName() + " was visited twice while traverse in TreeExecutorBlockInputStream. "
53 "Passed processors are not tree.", ErrorCodes::LOGICAL_ERROR);
54
55 is_visited[position] = true;
56
57 checkProcessorHasSingleOutput(node);
58
59 auto & children = node->getInputs();
60 for (auto & child : children)
61 stack.push(&child.getOutputPort().getProcessor());
62
63 /// Fill sources array.
64 if (children.empty())
65 {
66 if (auto * source = dynamic_cast<ISourceWithProgress *>(node))
67 sources.push_back(source);
68 }
69 }
70
71 for (size_t i = 0; i < is_visited.size(); ++i)
72 if (!is_visited[i])
73 throw Exception("Processor with name " + processors[i]->getName() +
74 " was not visited by traverse in TreeExecutorBlockInputStream.", ErrorCodes::LOGICAL_ERROR);
75}
76
77void TreeExecutorBlockInputStream::init()
78{
79 if (processors.empty())
80 throw Exception("No processors were passed to TreeExecutorBlockInputStream.", ErrorCodes::LOGICAL_ERROR);
81
82 root = &output_port.getProcessor();
83
84 validateTree(processors, root, sources_with_progress);
85
86 input_port = std::make_unique<InputPort>(getHeader(), root);
87 connect(output_port, *input_port);
88 input_port->setNeeded();
89}
90
91void TreeExecutorBlockInputStream::execute()
92{
93 std::stack<IProcessor *> stack;
94 stack.push(root);
95
96 auto prepare_processor = [](IProcessor * processor)
97 {
98 try
99 {
100 return processor->prepare();
101 }
102 catch (Exception & exception)
103 {
104 exception.addMessage(" While executing processor " + processor->getName());
105 throw;
106 }
107 };
108
109 while (!stack.empty())
110 {
111 IProcessor * node = stack.top();
112
113 auto status = prepare_processor(node);
114
115 switch (status)
116 {
117 case IProcessor::Status::NeedData:
118 {
119 auto & inputs = node->getInputs();
120
121 if (inputs.empty())
122 throw Exception("Processors " + node->getName() + " with empty input "
123 "has returned NeedData in TreeExecutorBlockInputStream", ErrorCodes::LOGICAL_ERROR);
124
125 bool all_finished = true;
126
127 for (auto & input : inputs)
128 {
129 if (input.isFinished())
130 continue;
131
132 all_finished = false;
133
134 stack.push(&input.getOutputPort().getProcessor());
135 }
136
137 if (all_finished)
138 throw Exception("Processors " + node->getName() + " has returned NeedData in TreeExecutorBlockInputStream, "
139 "but all it's inputs are finished.", ErrorCodes::LOGICAL_ERROR);
140 break;
141 }
142 case IProcessor::Status::PortFull:
143 {
144 stack.pop();
145 break;
146 }
147 case IProcessor::Status::Finished:
148 {
149 stack.pop();
150 break;
151 }
152 case IProcessor::Status::Ready:
153 {
154 node->work();
155 break;
156 }
157 case IProcessor::Status::Async:
158 case IProcessor::Status::Wait:
159 case IProcessor::Status::ExpandPipeline:
160 {
161 throw Exception("Processor with name " + node->getName() + " "
162 "returned status " + IProcessor::statusToName(status) + " "
163 "which is not supported in TreeExecutorBlockInputStream.", ErrorCodes::LOGICAL_ERROR);
164 }
165 }
166 }
167}
168
169Block TreeExecutorBlockInputStream::readImpl()
170{
171 while (true)
172 {
173 if (input_port->isFinished())
174 return {};
175
176 if (input_port->hasData())
177 return getHeader().cloneWithColumns(input_port->pull().detachColumns());
178
179 execute();
180 }
181}
182
183void TreeExecutorBlockInputStream::setProgressCallback(const ProgressCallback & callback)
184{
185 for (auto & source : sources_with_progress)
186 source->setProgressCallback(callback);
187}
188
189void TreeExecutorBlockInputStream::setProcessListElement(QueryStatus * elem)
190{
191 for (auto & source : sources_with_progress)
192 source->setProcessListElement(elem);
193}
194
195void TreeExecutorBlockInputStream::setLimits(const IBlockInputStream::LocalLimits & limits_)
196{
197 for (auto & source : sources_with_progress)
198 source->setLimits(limits_);
199}
200
201void TreeExecutorBlockInputStream::setQuota(const std::shared_ptr<QuotaContext> & quota_)
202{
203 for (auto & source : sources_with_progress)
204 source->setQuota(quota_);
205}
206
207void TreeExecutorBlockInputStream::addTotalRowsApprox(size_t value)
208{
209 /// Add only for one source.
210 if (!sources_with_progress.empty())
211 sources_with_progress.front()->addTotalRowsApprox(value);
212}
213
214}
215