1 | #include <Processors/Executors/SequentialPipelineExecutor.h> |
2 | #include <Processors/Executors/traverse.h> |
3 | |
4 | |
// NOTE: Every line between the braces of this namespace is commented out, so,
// as far as this file shows, it contributes no definitions to namespace DB.
//
// The disabled code below is an implementation of SequentialPipelineExecutor:
//   * a constructor that stores the list of processors;
//   * prepare(): walks each element with traverse(), calling prepare() on the
//     visited processors. It remembers whether any returned Status::Wait and
//     records a processor that returned Status::Ready or Status::Async as
//     current_processor. It returns that processor's status if one was found,
//     otherwise Status::Wait if someone is waiting. Otherwise it throws a
//     "Pipeline stuck" exception for an element reporting NeedData or
//     PortFull, and finally returns Status::Finished;
//   * work() / schedule(): forward to current_processor, throwing
//     "Bad pipeline" when current_processor is null.
//
// NOTE(review): if this code is ever re-enabled, the stuck-pipeline check calls
// element->prepare() twice per element (once per comparison); consider calling
// it once and reusing the result -- confirm whether prepare() is safe to repeat.
// NOTE(review): the disabled code references ErrorCodes::LOGICAL_ERROR, whose
// declaration is not visible in this file -- presumably it must be declared
// before the code is revived; verify against the included headers.
// NOTE(review): which processor ends up selected when several are Ready/Async
// depends on the visiting order and early-stop behaviour of traverse(), which
// is not shown here.
5 | namespace DB |
6 | { |
7 | |
8 | //SequentialPipelineExecutor::SequentialPipelineExecutor(const Processors & processors) |
9 | // : processors(processors) |
10 | //{ |
11 | //} |
12 | // |
13 | // |
14 | //SequentialPipelineExecutor::Status SequentialPipelineExecutor::prepare() |
15 | //{ |
16 | // current_processor = nullptr; |
17 | // |
18 | // bool has_someone_to_wait = false; |
19 | // Status found_status = Status::Finished; |
20 | // |
21 | // for (auto & element : processors) |
22 | // { |
23 | // traverse(*element, |
24 | // [&] (IProcessor & processor) |
25 | // { |
26 | // Status status = processor.prepare(); |
27 | // |
28 | // if (status == Status::Wait) |
29 | // has_someone_to_wait = true; |
30 | // |
31 | // if (status == Status::Ready || status == Status::Async) |
32 | // { |
33 | // current_processor = &processor; |
34 | // found_status = status; |
35 | // } |
36 | // |
37 | // return status; |
38 | // }); |
39 | // |
40 | // if (current_processor) |
41 | // break; |
42 | // } |
43 | // |
44 | // if (current_processor) |
45 | // return found_status; |
46 | // if (has_someone_to_wait) |
47 | // return Status::Wait; |
48 | // |
49 | // for (auto & element : processors) |
50 | // { |
51 | // if (element->prepare() == Status::NeedData) |
52 | // throw Exception("Pipeline stuck: " + element->getName() + " processor needs input data but no one is going to generate it", ErrorCodes::LOGICAL_ERROR); |
53 | // if (element->prepare() == Status::PortFull) |
54 | // throw Exception("Pipeline stuck: " + element->getName() + " processor has data in output port but no one is going to consume it", ErrorCodes::LOGICAL_ERROR); |
55 | // } |
56 | // |
57 | // return Status::Finished; |
58 | //} |
59 | // |
60 | // |
61 | //void SequentialPipelineExecutor::work() |
62 | //{ |
63 | // if (!current_processor) |
64 | // throw Exception("Bad pipeline", ErrorCodes::LOGICAL_ERROR); |
65 | // |
66 | // current_processor->work(); |
67 | //} |
68 | // |
69 | // |
70 | //void SequentialPipelineExecutor::schedule(EventCounter & watch) |
71 | //{ |
72 | // if (!current_processor) |
73 | // throw Exception("Bad pipeline", ErrorCodes::LOGICAL_ERROR); |
74 | // |
75 | // current_processor->schedule(watch); |
76 | //} |
77 | |
78 | } |
79 | |
80 | |