| 1 | #include <Processors/Executors/SequentialPipelineExecutor.h> |
| 2 | #include <Processors/Executors/traverse.h> |
| 3 | |
| 4 | |
| 5 | namespace DB |
| 6 | { |
| 7 | |
| 8 | //SequentialPipelineExecutor::SequentialPipelineExecutor(const Processors & processors) |
| 9 | // : processors(processors) |
| 10 | //{ |
| 11 | //} |
| 12 | //// NOTE(review): every definition in this namespace is commented out, so this file currently defines no symbols; the reason is not recorded here. |
| 13 | //// prepare() (disabled): calls prepare() on processors via traverse(), remembers one that reported Ready or Async and returns its status; otherwise returns Wait if any processor reported Wait, throws on NeedData/PortFull, else returns Finished. |
| 14 | //SequentialPipelineExecutor::Status SequentialPipelineExecutor::prepare() |
| 15 | //{ |
| 16 | // current_processor = nullptr; |
| 17 | // |
| 18 | // bool has_someone_to_wait = false; |
| 19 | // Status found_status = Status::Finished; |
| 20 | // |
| 21 | // for (auto & element : processors) |
| 22 | // { |
| 23 | // traverse(*element, |
| 24 | // [&] (IProcessor & processor) |
| 25 | // { |
| 26 | // Status status = processor.prepare(); |
| 27 | // |
| 28 | // if (status == Status::Wait) |
| 29 | // has_someone_to_wait = true; |
| 30 | // |
| 31 | // if (status == Status::Ready || status == Status::Async) |
| 32 | // { |
| 33 | // current_processor = &processor; |
| 34 | // found_status = status; |
| 35 | // } |
| 36 | // |
| 37 | // return status; |
| 38 | // }); |
| 39 | // |
| 40 | // if (current_processor) |
| 41 | // break; |
| 42 | // } |
| 43 | // |
| 44 | // if (current_processor) |
| 45 | // return found_status; |
| 46 | // if (has_someone_to_wait) |
| 47 | // return Status::Wait; |
| 48 | //// NOTE(review): the stuck-pipeline check below calls element->prepare() twice per element, and ErrorCodes::LOGICAL_ERROR has no visible declaration in this file - confirm both before re-enabling. |
| 49 | // for (auto & element : processors) |
| 50 | // { |
| 51 | // if (element->prepare() == Status::NeedData) |
| 52 | // throw Exception("Pipeline stuck: " + element->getName() + " processor needs input data but no one is going to generate it", ErrorCodes::LOGICAL_ERROR); |
| 53 | // if (element->prepare() == Status::PortFull) |
| 54 | // throw Exception("Pipeline stuck: " + element->getName() + " processor has data in output port but no one is going to consume it", ErrorCodes::LOGICAL_ERROR); |
| 55 | // } |
| 56 | // |
| 57 | // return Status::Finished; |
| 58 | //} |
| 59 | // |
| 60 | //// work() (disabled): throws "Bad pipeline" when current_processor is null, otherwise forwards to current_processor->work(). |
| 61 | //void SequentialPipelineExecutor::work() |
| 62 | //{ |
| 63 | // if (!current_processor) |
| 64 | // throw Exception("Bad pipeline", ErrorCodes::LOGICAL_ERROR); |
| 65 | // |
| 66 | // current_processor->work(); |
| 67 | //} |
| 68 | // |
| 69 | //// schedule() (disabled): throws "Bad pipeline" when current_processor is null, otherwise forwards the EventCounter to current_processor->schedule(watch). |
| 70 | //void SequentialPipelineExecutor::schedule(EventCounter & watch) |
| 71 | //{ |
| 72 | // if (!current_processor) |
| 73 | // throw Exception("Bad pipeline", ErrorCodes::LOGICAL_ERROR); |
| 74 | // |
| 75 | // current_processor->schedule(watch); |
| 76 | //} |
| 77 | |
| 78 | } |
| 79 | |
| 80 | |