1 | #include <Processors/ResizeProcessor.h> |
---|---|
2 | |
3 | |
4 | namespace DB |
5 | { |
6 | |
7 | ResizeProcessor::Status ResizeProcessor::prepare() |
8 | { |
9 | bool is_first_output = true; |
10 | auto output_end = current_output; |
11 | |
12 | bool all_outs_full_or_unneeded = true; |
13 | bool all_outs_finished = true; |
14 | |
15 | bool is_first_input = true; |
16 | auto input_end = current_input; |
17 | |
18 | bool all_inputs_finished = true; |
19 | |
20 | auto is_end_input = [&]() { return !is_first_input && current_input == input_end; }; |
21 | auto is_end_output = [&]() { return !is_first_output && current_output == output_end; }; |
22 | |
23 | auto inc_current_input = [&]() |
24 | { |
25 | is_first_input = false; |
26 | ++current_input; |
27 | |
28 | if (current_input == inputs.end()) |
29 | current_input = inputs.begin(); |
30 | }; |
31 | |
32 | auto inc_current_output = [&]() |
33 | { |
34 | is_first_output = false; |
35 | ++current_output; |
36 | |
37 | if (current_output == outputs.end()) |
38 | current_output = outputs.begin(); |
39 | }; |
40 | |
41 | /// Find next output where can push. |
42 | auto get_next_out = [&, this]() -> OutputPorts::iterator |
43 | { |
44 | while (!is_end_output()) |
45 | { |
46 | if (!current_output->isFinished()) |
47 | { |
48 | all_outs_finished = false; |
49 | |
50 | if (current_output->canPush()) |
51 | { |
52 | all_outs_full_or_unneeded = false; |
53 | auto res_output = current_output; |
54 | inc_current_output(); |
55 | return res_output; |
56 | } |
57 | } |
58 | |
59 | inc_current_output(); |
60 | } |
61 | |
62 | return outputs.end(); |
63 | }; |
64 | |
65 | /// Find next input from where can pull. |
66 | auto get_next_input = [&, this]() -> InputPorts::iterator |
67 | { |
68 | while (!is_end_input()) |
69 | { |
70 | if (!current_input->isFinished()) |
71 | { |
72 | all_inputs_finished = false; |
73 | |
74 | current_input->setNeeded(); |
75 | if (current_input->hasData()) |
76 | { |
77 | auto res_input = current_input; |
78 | inc_current_input(); |
79 | return res_input; |
80 | } |
81 | } |
82 | |
83 | inc_current_input(); |
84 | } |
85 | |
86 | return inputs.end(); |
87 | }; |
88 | |
89 | auto get_status_if_no_outputs = [&]() -> Status |
90 | { |
91 | if (all_outs_finished) |
92 | { |
93 | for (auto & in : inputs) |
94 | in.close(); |
95 | |
96 | return Status::Finished; |
97 | } |
98 | |
99 | if (all_outs_full_or_unneeded) |
100 | { |
101 | for (auto & in : inputs) |
102 | in.setNotNeeded(); |
103 | |
104 | return Status::PortFull; |
105 | } |
106 | |
107 | /// Now, we pushed to output, and it must be full. |
108 | return Status::PortFull; |
109 | }; |
110 | |
111 | auto get_status_if_no_inputs = [&]() -> Status |
112 | { |
113 | if (all_inputs_finished) |
114 | { |
115 | for (auto & out : outputs) |
116 | out.finish(); |
117 | |
118 | return Status::Finished; |
119 | } |
120 | |
121 | return Status::NeedData; |
122 | }; |
123 | |
124 | /// Set all inputs needed in order to evenly process them. |
125 | /// Otherwise, in case num_outputs < num_inputs and chunks are consumed faster than produced, |
126 | /// some inputs can be skipped. |
127 | // auto set_all_unprocessed_inputs_needed = [&]() |
128 | // { |
129 | // for (; cur_input != inputs.end(); ++cur_input) |
130 | // if (!cur_input->isFinished()) |
131 | // cur_input->setNeeded(); |
132 | // }; |
133 | |
134 | while (!is_end_input() && !is_end_output()) |
135 | { |
136 | auto output = get_next_out(); |
137 | auto input = get_next_input(); |
138 | |
139 | if (output == outputs.end()) |
140 | return get_status_if_no_outputs(); |
141 | |
142 | |
143 | if (input == inputs.end()) |
144 | return get_status_if_no_inputs(); |
145 | |
146 | output->push(input->pull()); |
147 | } |
148 | |
149 | if (is_end_input()) |
150 | return get_status_if_no_outputs(); |
151 | |
152 | /// cur_input == inputs_end() |
153 | return get_status_if_no_inputs(); |
154 | } |
155 | |
156 | IProcessor::Status ResizeProcessor::prepare(const PortNumbers & updated_inputs, const PortNumbers & updated_outputs) |
157 | { |
158 | if (!initialized) |
159 | { |
160 | initialized = true; |
161 | |
162 | for (auto & input : inputs) |
163 | { |
164 | input.setNeeded(); |
165 | input_ports.push_back({.port = &input, .status = InputStatus::NotActive}); |
166 | } |
167 | |
168 | for (auto & output : outputs) |
169 | output_ports.push_back({.port = &output, .status = OutputStatus::NotActive}); |
170 | } |
171 | |
172 | for (auto & output_number : updated_outputs) |
173 | { |
174 | auto & output = output_ports[output_number]; |
175 | if (output.port->isFinished()) |
176 | { |
177 | if (output.status != OutputStatus::Finished) |
178 | { |
179 | ++num_finished_outputs; |
180 | output.status = OutputStatus::Finished; |
181 | } |
182 | |
183 | continue; |
184 | } |
185 | |
186 | if (output.port->canPush()) |
187 | { |
188 | if (output.status != OutputStatus::NeedData) |
189 | { |
190 | output.status = OutputStatus::NeedData; |
191 | waiting_outputs.push(output_number); |
192 | } |
193 | } |
194 | } |
195 | |
196 | if (num_finished_outputs == outputs.size()) |
197 | { |
198 | for (auto & input : inputs) |
199 | input.close(); |
200 | |
201 | return Status::Finished; |
202 | } |
203 | |
204 | for (auto & input_number : updated_inputs) |
205 | { |
206 | auto & input = input_ports[input_number]; |
207 | if (input.port->isFinished()) |
208 | { |
209 | if (input.status != InputStatus::Finished) |
210 | { |
211 | input.status = InputStatus::Finished; |
212 | ++num_finished_inputs; |
213 | } |
214 | continue; |
215 | } |
216 | |
217 | if (input.port->hasData()) |
218 | { |
219 | if (input.status != InputStatus::HasData) |
220 | { |
221 | input.status = InputStatus::HasData; |
222 | inputs_with_data.push(input_number); |
223 | } |
224 | } |
225 | } |
226 | |
227 | while (!waiting_outputs.empty() && !inputs_with_data.empty()) |
228 | { |
229 | auto & waiting_output = output_ports[waiting_outputs.front()]; |
230 | waiting_outputs.pop(); |
231 | |
232 | auto & input_with_data = input_ports[inputs_with_data.front()]; |
233 | inputs_with_data.pop(); |
234 | |
235 | waiting_output.port->pushData(input_with_data.port->pullData()); |
236 | input_with_data.status = InputStatus::NotActive; |
237 | waiting_output.status = OutputStatus::NotActive; |
238 | |
239 | if (input_with_data.port->isFinished()) |
240 | { |
241 | input_with_data.status = InputStatus::Finished; |
242 | ++num_finished_inputs; |
243 | } |
244 | } |
245 | |
246 | if (num_finished_inputs == inputs.size()) |
247 | { |
248 | for (auto & output : outputs) |
249 | output.finish(); |
250 | |
251 | return Status::Finished; |
252 | } |
253 | |
254 | if (!waiting_outputs.empty()) |
255 | return Status::NeedData; |
256 | |
257 | return Status::PortFull; |
258 | } |
259 | |
260 | } |
261 | |
262 |