1#include <Processors/ResizeProcessor.h>
2
3
4namespace DB
5{
6
7ResizeProcessor::Status ResizeProcessor::prepare()
8{
9 bool is_first_output = true;
10 auto output_end = current_output;
11
12 bool all_outs_full_or_unneeded = true;
13 bool all_outs_finished = true;
14
15 bool is_first_input = true;
16 auto input_end = current_input;
17
18 bool all_inputs_finished = true;
19
20 auto is_end_input = [&]() { return !is_first_input && current_input == input_end; };
21 auto is_end_output = [&]() { return !is_first_output && current_output == output_end; };
22
23 auto inc_current_input = [&]()
24 {
25 is_first_input = false;
26 ++current_input;
27
28 if (current_input == inputs.end())
29 current_input = inputs.begin();
30 };
31
32 auto inc_current_output = [&]()
33 {
34 is_first_output = false;
35 ++current_output;
36
37 if (current_output == outputs.end())
38 current_output = outputs.begin();
39 };
40
41 /// Find next output where can push.
42 auto get_next_out = [&, this]() -> OutputPorts::iterator
43 {
44 while (!is_end_output())
45 {
46 if (!current_output->isFinished())
47 {
48 all_outs_finished = false;
49
50 if (current_output->canPush())
51 {
52 all_outs_full_or_unneeded = false;
53 auto res_output = current_output;
54 inc_current_output();
55 return res_output;
56 }
57 }
58
59 inc_current_output();
60 }
61
62 return outputs.end();
63 };
64
65 /// Find next input from where can pull.
66 auto get_next_input = [&, this]() -> InputPorts::iterator
67 {
68 while (!is_end_input())
69 {
70 if (!current_input->isFinished())
71 {
72 all_inputs_finished = false;
73
74 current_input->setNeeded();
75 if (current_input->hasData())
76 {
77 auto res_input = current_input;
78 inc_current_input();
79 return res_input;
80 }
81 }
82
83 inc_current_input();
84 }
85
86 return inputs.end();
87 };
88
89 auto get_status_if_no_outputs = [&]() -> Status
90 {
91 if (all_outs_finished)
92 {
93 for (auto & in : inputs)
94 in.close();
95
96 return Status::Finished;
97 }
98
99 if (all_outs_full_or_unneeded)
100 {
101 for (auto & in : inputs)
102 in.setNotNeeded();
103
104 return Status::PortFull;
105 }
106
107 /// Now, we pushed to output, and it must be full.
108 return Status::PortFull;
109 };
110
111 auto get_status_if_no_inputs = [&]() -> Status
112 {
113 if (all_inputs_finished)
114 {
115 for (auto & out : outputs)
116 out.finish();
117
118 return Status::Finished;
119 }
120
121 return Status::NeedData;
122 };
123
124 /// Set all inputs needed in order to evenly process them.
125 /// Otherwise, in case num_outputs < num_inputs and chunks are consumed faster than produced,
126 /// some inputs can be skipped.
127// auto set_all_unprocessed_inputs_needed = [&]()
128// {
129// for (; cur_input != inputs.end(); ++cur_input)
130// if (!cur_input->isFinished())
131// cur_input->setNeeded();
132// };
133
134 while (!is_end_input() && !is_end_output())
135 {
136 auto output = get_next_out();
137 auto input = get_next_input();
138
139 if (output == outputs.end())
140 return get_status_if_no_outputs();
141
142
143 if (input == inputs.end())
144 return get_status_if_no_inputs();
145
146 output->push(input->pull());
147 }
148
149 if (is_end_input())
150 return get_status_if_no_outputs();
151
152 /// cur_input == inputs_end()
153 return get_status_if_no_inputs();
154}
155
156IProcessor::Status ResizeProcessor::prepare(const PortNumbers & updated_inputs, const PortNumbers & updated_outputs)
157{
158 if (!initialized)
159 {
160 initialized = true;
161
162 for (auto & input : inputs)
163 {
164 input.setNeeded();
165 input_ports.push_back({.port = &input, .status = InputStatus::NotActive});
166 }
167
168 for (auto & output : outputs)
169 output_ports.push_back({.port = &output, .status = OutputStatus::NotActive});
170 }
171
172 for (auto & output_number : updated_outputs)
173 {
174 auto & output = output_ports[output_number];
175 if (output.port->isFinished())
176 {
177 if (output.status != OutputStatus::Finished)
178 {
179 ++num_finished_outputs;
180 output.status = OutputStatus::Finished;
181 }
182
183 continue;
184 }
185
186 if (output.port->canPush())
187 {
188 if (output.status != OutputStatus::NeedData)
189 {
190 output.status = OutputStatus::NeedData;
191 waiting_outputs.push(output_number);
192 }
193 }
194 }
195
196 if (num_finished_outputs == outputs.size())
197 {
198 for (auto & input : inputs)
199 input.close();
200
201 return Status::Finished;
202 }
203
204 for (auto & input_number : updated_inputs)
205 {
206 auto & input = input_ports[input_number];
207 if (input.port->isFinished())
208 {
209 if (input.status != InputStatus::Finished)
210 {
211 input.status = InputStatus::Finished;
212 ++num_finished_inputs;
213 }
214 continue;
215 }
216
217 if (input.port->hasData())
218 {
219 if (input.status != InputStatus::HasData)
220 {
221 input.status = InputStatus::HasData;
222 inputs_with_data.push(input_number);
223 }
224 }
225 }
226
227 while (!waiting_outputs.empty() && !inputs_with_data.empty())
228 {
229 auto & waiting_output = output_ports[waiting_outputs.front()];
230 waiting_outputs.pop();
231
232 auto & input_with_data = input_ports[inputs_with_data.front()];
233 inputs_with_data.pop();
234
235 waiting_output.port->pushData(input_with_data.port->pullData());
236 input_with_data.status = InputStatus::NotActive;
237 waiting_output.status = OutputStatus::NotActive;
238
239 if (input_with_data.port->isFinished())
240 {
241 input_with_data.status = InputStatus::Finished;
242 ++num_finished_inputs;
243 }
244 }
245
246 if (num_finished_inputs == inputs.size())
247 {
248 for (auto & output : outputs)
249 output.finish();
250
251 return Status::Finished;
252 }
253
254 if (!waiting_outputs.empty())
255 return Status::NeedData;
256
257 return Status::PortFull;
258}
259
260}
261
262