#include <Common/EventCounter.h>
#include <Common/ThreadPool.h>
#include <Processors/Executors/ParallelPipelineExecutor.h>
#include <Processors/Executors/traverse.h>


namespace DB
{
//
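// Draft implementation of a parallel executor, currently disabled. It repeatedly
// asks the processor graph which processor can run (prepare) and dispatches its
// work() to a shared ThreadPool (schedule), using active_processors to avoid
// running the same processor concurrently.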
//ParallelPipelineExecutor::ParallelPipelineExecutor(const std::vector<ProcessorPtr> & processors, ThreadPool & pool)
//    : processors(processors), pool(pool)
//{
//}
//
//
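// prepare() traverses every processor graph, skipping processors whose work() is
// still running on the pool (those listed in active_processors). The first processor
// found in Ready or Async state is remembered in current_processor and Async is
// returned so that the caller invokes schedule(). If nothing is runnable but some
// processor still waits for an external event, Wait is returned; otherwise the
// pipeline is either Finished or stuck, in which case a LOGICAL_ERROR is thrown.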
//ParallelPipelineExecutor::Status ParallelPipelineExecutor::prepare()
//{
//    current_processor = nullptr;
//
//    bool has_someone_to_wait = false;
//
//    for (auto & element : processors)
//    {
//        traverse(*element,
//            [&] (IProcessor & processor)
//            {
//                {
//                    std::lock_guard lock(mutex);
//                    if (active_processors.count(&processor))
//                    {
//                        has_someone_to_wait = true;
//                        return Status::Wait;
//                    }
//                }
//
//                Status status = processor.prepare();
//
//                if (status == Status::Wait)
//                    has_someone_to_wait = true;
//
//                if (status == Status::Ready || status == Status::Async)
//                {
//                    current_processor = &processor;
//                    current_status = status;
//                }
//
//                return status;
//            });
//
//        if (current_processor)
//            break;
//    }
//
//    if (current_processor)
//        return Status::Async;
//
//    if (has_someone_to_wait)
//        return Status::Wait;
//
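//    /// Nothing is runnable and nothing is waiting: the pipeline must be finished.
//    /// If any processor still reports NeedData or PortFull, the graph is mis-wired.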
//    for (auto & element : processors)
//    {
//        /// Call prepare() only once per processor: a second call could change its state.
//        Status status = element->prepare();
//        if (status == Status::NeedData)
//            throw Exception("Pipeline stuck: " + element->getName() + " processor needs input data but no one is going to generate it", ErrorCodes::LOGICAL_ERROR);
//        if (status == Status::PortFull)
//            throw Exception("Pipeline stuck: " + element->getName() + " processor has data in output port but no one is going to consume it", ErrorCodes::LOGICAL_ERROR);
//    }
//
//    return Status::Finished;
//}
//
//
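// schedule() either lets an Async processor register itself with the EventCounter,
// or submits work() of a Ready processor to the thread pool. The processor is added
// to active_processors for the duration of work() so that concurrent prepare() calls
// skip it, and the watch is notified when the job completes.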
//void ParallelPipelineExecutor::schedule(EventCounter & watch)
//{
//    if (!current_processor)
//        throw Exception("Bad pipeline", ErrorCodes::LOGICAL_ERROR);
//
//    if (current_status == Status::Async)
//    {
//        current_processor->schedule(watch);
//    }
//    else
//    {
//        {
//            std::lock_guard lock(mutex);
//            active_processors.insert(current_processor);
//        }
//
//        pool.scheduleOrThrowOnError([processor = current_processor, &watch, this]
//        {
//            processor->work();
//            {
//                std::lock_guard lock(mutex);
//                active_processors.erase(processor);
//            }
//            watch.notify();
//        });
//    }
//}

}