1 | #include <Common/EventCounter.h> |
2 | #include <Common/ThreadPool.h> |
3 | #include <Processors/Executors/ParallelPipelineExecutor.h> |
4 | #include <Processors/Executors/traverse.h> |
5 | |
6 | |
7 | namespace DB |
8 | { |
9 | // |
10 | //ParallelPipelineExecutor::ParallelPipelineExecutor(const std::vector<ProcessorPtr> & processors, ThreadPool & pool) |
11 | // : processors(processors), pool(pool) |
12 | //{ |
13 | //} |
14 | // |
15 | // |
16 | //ParallelPipelineExecutor::Status ParallelPipelineExecutor::prepare() |
17 | //{ |
18 | // current_processor = nullptr; |
19 | // |
20 | // bool has_someone_to_wait = false; |
21 | // |
22 | // for (auto & element : processors) |
23 | // { |
24 | // traverse(*element, |
25 | // [&] (IProcessor & processor) |
26 | // { |
27 | // { |
28 | // std::lock_guard lock(mutex); |
29 | // if (active_processors.count(&processor)) |
30 | // { |
31 | // has_someone_to_wait = true; |
32 | // return Status::Wait; |
33 | // } |
34 | // } |
35 | // |
36 | // Status status = processor.prepare(); |
37 | // |
38 | // if (status == Status::Wait) |
39 | // has_someone_to_wait = true; |
40 | // |
41 | // if (status == Status::Ready || status == Status::Async) |
42 | // { |
43 | // current_processor = &processor; |
44 | // current_status = status; |
45 | // } |
46 | // |
47 | // return status; |
48 | // }); |
49 | // |
50 | // if (current_processor) |
51 | // break; |
52 | // } |
53 | // |
54 | // if (current_processor) |
55 | // return Status::Async; |
56 | // |
57 | // if (has_someone_to_wait) |
58 | // return Status::Wait; |
59 | // |
60 | // for (auto & element : processors) |
61 | // { |
62 | // if (element->prepare() == Status::NeedData) |
63 | // throw Exception("Pipeline stuck: " + element->getName() + " processor needs input data but no one is going to generate it", ErrorCodes::LOGICAL_ERROR); |
64 | // if (element->prepare() == Status::PortFull) |
65 | // throw Exception("Pipeline stuck: " + element->getName() + " processor has data in output port but no one is going to consume it", ErrorCodes::LOGICAL_ERROR); |
66 | // } |
67 | // |
68 | // return Status::Finished; |
69 | //} |
70 | // |
71 | // |
72 | //void ParallelPipelineExecutor::schedule(EventCounter & watch) |
73 | //{ |
74 | // if (!current_processor) |
75 | // throw Exception("Bad pipeline", ErrorCodes::LOGICAL_ERROR); |
76 | // |
77 | // if (current_status == Status::Async) |
78 | // { |
79 | // current_processor->schedule(watch); |
80 | // } |
81 | // else |
82 | // { |
83 | // { |
84 | // std::lock_guard lock(mutex); |
85 | // active_processors.insert(current_processor); |
86 | // } |
87 | // |
88 | // pool.scheduleOrThrowOnError([processor = current_processor, &watch, this] |
89 | // { |
90 | // processor->work(); |
91 | // { |
92 | // std::lock_guard lock(mutex); |
93 | // active_processors.erase(processor); |
94 | // } |
95 | // watch.notify(); |
96 | // }); |
97 | // } |
98 | //} |
99 | |
100 | } |
101 | |
102 | |