| 1 | #include <Processors/ResizeProcessor.h> |
|---|---|
| 2 | |
| 3 | |
| 4 | namespace DB |
| 5 | { |
| 6 | |
| 7 | ResizeProcessor::Status ResizeProcessor::prepare() |
| 8 | { |
| 9 | bool is_first_output = true; |
| 10 | auto output_end = current_output; |
| 11 | |
| 12 | bool all_outs_full_or_unneeded = true; |
| 13 | bool all_outs_finished = true; |
| 14 | |
| 15 | bool is_first_input = true; |
| 16 | auto input_end = current_input; |
| 17 | |
| 18 | bool all_inputs_finished = true; |
| 19 | |
| 20 | auto is_end_input = [&]() { return !is_first_input && current_input == input_end; }; |
| 21 | auto is_end_output = [&]() { return !is_first_output && current_output == output_end; }; |
| 22 | |
| 23 | auto inc_current_input = [&]() |
| 24 | { |
| 25 | is_first_input = false; |
| 26 | ++current_input; |
| 27 | |
| 28 | if (current_input == inputs.end()) |
| 29 | current_input = inputs.begin(); |
| 30 | }; |
| 31 | |
| 32 | auto inc_current_output = [&]() |
| 33 | { |
| 34 | is_first_output = false; |
| 35 | ++current_output; |
| 36 | |
| 37 | if (current_output == outputs.end()) |
| 38 | current_output = outputs.begin(); |
| 39 | }; |
| 40 | |
| 41 | /// Find next output where can push. |
| 42 | auto get_next_out = [&, this]() -> OutputPorts::iterator |
| 43 | { |
| 44 | while (!is_end_output()) |
| 45 | { |
| 46 | if (!current_output->isFinished()) |
| 47 | { |
| 48 | all_outs_finished = false; |
| 49 | |
| 50 | if (current_output->canPush()) |
| 51 | { |
| 52 | all_outs_full_or_unneeded = false; |
| 53 | auto res_output = current_output; |
| 54 | inc_current_output(); |
| 55 | return res_output; |
| 56 | } |
| 57 | } |
| 58 | |
| 59 | inc_current_output(); |
| 60 | } |
| 61 | |
| 62 | return outputs.end(); |
| 63 | }; |
| 64 | |
| 65 | /// Find next input from where can pull. |
| 66 | auto get_next_input = [&, this]() -> InputPorts::iterator |
| 67 | { |
| 68 | while (!is_end_input()) |
| 69 | { |
| 70 | if (!current_input->isFinished()) |
| 71 | { |
| 72 | all_inputs_finished = false; |
| 73 | |
| 74 | current_input->setNeeded(); |
| 75 | if (current_input->hasData()) |
| 76 | { |
| 77 | auto res_input = current_input; |
| 78 | inc_current_input(); |
| 79 | return res_input; |
| 80 | } |
| 81 | } |
| 82 | |
| 83 | inc_current_input(); |
| 84 | } |
| 85 | |
| 86 | return inputs.end(); |
| 87 | }; |
| 88 | |
| 89 | auto get_status_if_no_outputs = [&]() -> Status |
| 90 | { |
| 91 | if (all_outs_finished) |
| 92 | { |
| 93 | for (auto & in : inputs) |
| 94 | in.close(); |
| 95 | |
| 96 | return Status::Finished; |
| 97 | } |
| 98 | |
| 99 | if (all_outs_full_or_unneeded) |
| 100 | { |
| 101 | for (auto & in : inputs) |
| 102 | in.setNotNeeded(); |
| 103 | |
| 104 | return Status::PortFull; |
| 105 | } |
| 106 | |
| 107 | /// Now, we pushed to output, and it must be full. |
| 108 | return Status::PortFull; |
| 109 | }; |
| 110 | |
| 111 | auto get_status_if_no_inputs = [&]() -> Status |
| 112 | { |
| 113 | if (all_inputs_finished) |
| 114 | { |
| 115 | for (auto & out : outputs) |
| 116 | out.finish(); |
| 117 | |
| 118 | return Status::Finished; |
| 119 | } |
| 120 | |
| 121 | return Status::NeedData; |
| 122 | }; |
| 123 | |
| 124 | /// Set all inputs needed in order to evenly process them. |
| 125 | /// Otherwise, in case num_outputs < num_inputs and chunks are consumed faster than produced, |
| 126 | /// some inputs can be skipped. |
| 127 | // auto set_all_unprocessed_inputs_needed = [&]() |
| 128 | // { |
| 129 | // for (; cur_input != inputs.end(); ++cur_input) |
| 130 | // if (!cur_input->isFinished()) |
| 131 | // cur_input->setNeeded(); |
| 132 | // }; |
| 133 | |
| 134 | while (!is_end_input() && !is_end_output()) |
| 135 | { |
| 136 | auto output = get_next_out(); |
| 137 | auto input = get_next_input(); |
| 138 | |
| 139 | if (output == outputs.end()) |
| 140 | return get_status_if_no_outputs(); |
| 141 | |
| 142 | |
| 143 | if (input == inputs.end()) |
| 144 | return get_status_if_no_inputs(); |
| 145 | |
| 146 | output->push(input->pull()); |
| 147 | } |
| 148 | |
| 149 | if (is_end_input()) |
| 150 | return get_status_if_no_outputs(); |
| 151 | |
| 152 | /// cur_input == inputs_end() |
| 153 | return get_status_if_no_inputs(); |
| 154 | } |
| 155 | |
| 156 | IProcessor::Status ResizeProcessor::prepare(const PortNumbers & updated_inputs, const PortNumbers & updated_outputs) |
| 157 | { |
| 158 | if (!initialized) |
| 159 | { |
| 160 | initialized = true; |
| 161 | |
| 162 | for (auto & input : inputs) |
| 163 | { |
| 164 | input.setNeeded(); |
| 165 | input_ports.push_back({.port = &input, .status = InputStatus::NotActive}); |
| 166 | } |
| 167 | |
| 168 | for (auto & output : outputs) |
| 169 | output_ports.push_back({.port = &output, .status = OutputStatus::NotActive}); |
| 170 | } |
| 171 | |
| 172 | for (auto & output_number : updated_outputs) |
| 173 | { |
| 174 | auto & output = output_ports[output_number]; |
| 175 | if (output.port->isFinished()) |
| 176 | { |
| 177 | if (output.status != OutputStatus::Finished) |
| 178 | { |
| 179 | ++num_finished_outputs; |
| 180 | output.status = OutputStatus::Finished; |
| 181 | } |
| 182 | |
| 183 | continue; |
| 184 | } |
| 185 | |
| 186 | if (output.port->canPush()) |
| 187 | { |
| 188 | if (output.status != OutputStatus::NeedData) |
| 189 | { |
| 190 | output.status = OutputStatus::NeedData; |
| 191 | waiting_outputs.push(output_number); |
| 192 | } |
| 193 | } |
| 194 | } |
| 195 | |
| 196 | if (num_finished_outputs == outputs.size()) |
| 197 | { |
| 198 | for (auto & input : inputs) |
| 199 | input.close(); |
| 200 | |
| 201 | return Status::Finished; |
| 202 | } |
| 203 | |
| 204 | for (auto & input_number : updated_inputs) |
| 205 | { |
| 206 | auto & input = input_ports[input_number]; |
| 207 | if (input.port->isFinished()) |
| 208 | { |
| 209 | if (input.status != InputStatus::Finished) |
| 210 | { |
| 211 | input.status = InputStatus::Finished; |
| 212 | ++num_finished_inputs; |
| 213 | } |
| 214 | continue; |
| 215 | } |
| 216 | |
| 217 | if (input.port->hasData()) |
| 218 | { |
| 219 | if (input.status != InputStatus::HasData) |
| 220 | { |
| 221 | input.status = InputStatus::HasData; |
| 222 | inputs_with_data.push(input_number); |
| 223 | } |
| 224 | } |
| 225 | } |
| 226 | |
| 227 | while (!waiting_outputs.empty() && !inputs_with_data.empty()) |
| 228 | { |
| 229 | auto & waiting_output = output_ports[waiting_outputs.front()]; |
| 230 | waiting_outputs.pop(); |
| 231 | |
| 232 | auto & input_with_data = input_ports[inputs_with_data.front()]; |
| 233 | inputs_with_data.pop(); |
| 234 | |
| 235 | waiting_output.port->pushData(input_with_data.port->pullData()); |
| 236 | input_with_data.status = InputStatus::NotActive; |
| 237 | waiting_output.status = OutputStatus::NotActive; |
| 238 | |
| 239 | if (input_with_data.port->isFinished()) |
| 240 | { |
| 241 | input_with_data.status = InputStatus::Finished; |
| 242 | ++num_finished_inputs; |
| 243 | } |
| 244 | } |
| 245 | |
| 246 | if (num_finished_inputs == inputs.size()) |
| 247 | { |
| 248 | for (auto & output : outputs) |
| 249 | output.finish(); |
| 250 | |
| 251 | return Status::Finished; |
| 252 | } |
| 253 | |
| 254 | if (!waiting_outputs.empty()) |
| 255 | return Status::NeedData; |
| 256 | |
| 257 | return Status::PortFull; |
| 258 | } |
| 259 | |
| 260 | } |
| 261 | |
| 262 |