| 1 | #include <Common/EventCounter.h> | 
|---|
| 2 | #include <Common/ThreadPool.h> | 
|---|
| 3 | #include <Processors/Executors/ParallelPipelineExecutor.h> | 
|---|
| 4 | #include <Processors/Executors/traverse.h> | 
|---|
| 5 |  | 
|---|
| 6 |  | 
|---|
| 7 | namespace DB | 
|---|
| 8 | { | 
|---|
| 9 | // | 
|---|
| 10 | //ParallelPipelineExecutor::ParallelPipelineExecutor(const std::vector<ProcessorPtr> & processors, ThreadPool & pool) | 
|---|
| 11 | //    : processors(processors), pool(pool) | 
|---|
| 12 | //{ | 
|---|
| 13 | //} | 
|---|
| 14 | // | 
|---|
| 15 | // | 
|---|
| 16 | //ParallelPipelineExecutor::Status ParallelPipelineExecutor::prepare() | 
|---|
| 17 | //{ | 
|---|
| 18 | //    current_processor = nullptr; | 
|---|
| 19 | // | 
|---|
| 20 | //    bool has_someone_to_wait = false; | 
|---|
| 21 | // | 
|---|
| 22 | //    for (auto & element : processors) | 
|---|
| 23 | //    { | 
|---|
| 24 | //        traverse(*element, | 
|---|
| 25 | //            [&] (IProcessor & processor) | 
|---|
| 26 | //            { | 
|---|
| 27 | //                { | 
|---|
| 28 | //                    std::lock_guard lock(mutex); | 
|---|
| 29 | //                    if (active_processors.count(&processor)) | 
|---|
| 30 | //                    { | 
|---|
| 31 | //                        has_someone_to_wait = true; | 
|---|
| 32 | //                        return Status::Wait; | 
|---|
| 33 | //                    } | 
|---|
| 34 | //                } | 
|---|
| 35 | // | 
|---|
| 36 | //                Status status = processor.prepare(); | 
|---|
| 37 | // | 
|---|
| 38 | //                if (status == Status::Wait) | 
|---|
| 39 | //                    has_someone_to_wait = true; | 
|---|
| 40 | // | 
|---|
| 41 | //                if (status == Status::Ready || status == Status::Async) | 
|---|
| 42 | //                { | 
|---|
| 43 | //                    current_processor = &processor; | 
|---|
| 44 | //                    current_status = status; | 
|---|
| 45 | //                } | 
|---|
| 46 | // | 
|---|
| 47 | //                return status; | 
|---|
| 48 | //            }); | 
|---|
| 49 | // | 
|---|
| 50 | //        if (current_processor) | 
|---|
| 51 | //            break; | 
|---|
| 52 | //    } | 
|---|
| 53 | // | 
|---|
| 54 | //    if (current_processor) | 
|---|
| 55 | //        return Status::Async; | 
|---|
| 56 | // | 
|---|
| 57 | //    if (has_someone_to_wait) | 
|---|
| 58 | //        return Status::Wait; | 
|---|
| 59 | // | 
|---|
| 60 | //    for (auto & element : processors) | 
|---|
| 61 | //    { | 
|---|
| 62 | //        if (element->prepare() == Status::NeedData) | 
|---|
| 63 | //            throw Exception("Pipeline stuck: " + element->getName() + " processor needs input data but no one is going to generate it", ErrorCodes::LOGICAL_ERROR); | 
|---|
| 64 | //        if (element->prepare() == Status::PortFull) | 
|---|
| 65 | //            throw Exception("Pipeline stuck: " + element->getName() + " processor has data in output port but no one is going to consume it", ErrorCodes::LOGICAL_ERROR); | 
|---|
| 66 | //    } | 
|---|
| 67 | // | 
|---|
| 68 | //    return Status::Finished; | 
|---|
| 69 | //} | 
|---|
| 70 | // | 
|---|
| 71 | // | 
|---|
| 72 | //void ParallelPipelineExecutor::schedule(EventCounter & watch) | 
|---|
| 73 | //{ | 
|---|
| 74 | //    if (!current_processor) | 
|---|
| 75 | //        throw Exception("Bad pipeline", ErrorCodes::LOGICAL_ERROR); | 
|---|
| 76 | // | 
|---|
| 77 | //    if (current_status == Status::Async) | 
|---|
| 78 | //    { | 
|---|
| 79 | //        current_processor->schedule(watch); | 
|---|
| 80 | //    } | 
|---|
| 81 | //    else | 
|---|
| 82 | //    { | 
|---|
| 83 | //        { | 
|---|
| 84 | //            std::lock_guard lock(mutex); | 
|---|
| 85 | //            active_processors.insert(current_processor); | 
|---|
| 86 | //        } | 
|---|
| 87 | // | 
|---|
| 88 | //        pool.scheduleOrThrowOnError([processor = current_processor, &watch, this] | 
|---|
| 89 | //        { | 
|---|
| 90 | //            processor->work(); | 
|---|
| 91 | //            { | 
|---|
| 92 | //                std::lock_guard lock(mutex); | 
|---|
| 93 | //                active_processors.erase(processor); | 
|---|
| 94 | //            } | 
|---|
| 95 | //            watch.notify(); | 
|---|
| 96 | //        }); | 
|---|
| 97 | //    } | 
|---|
| 98 | //} | 
|---|
| 99 |  | 
|---|
| 100 | } | 
|---|
| 101 |  | 
|---|
| 102 |  | 
|---|