1#include <Processors/Executors/SequentialPipelineExecutor.h>
2#include <Processors/Executors/traverse.h>
3
4
5namespace DB
6{
7
/// NOTE: every definition of SequentialPipelineExecutor below is commented out,
/// so this namespace currently contains no live code in this file.
///
/// Summary of the disabled implementation, as written below:
///  - The constructor stores the given list of processors.
///  - prepare() resets current_processor, then traverses each processor via traverse(),
///    calling prepare() on every visited processor. A Ready or Async status is recorded
///    in current_processor / found_status; the outer loop stops after the first element
///    whose traversal set current_processor. If none was recorded but some processor
///    returned Wait, the result is Wait. Otherwise each top-level processor is checked
///    for NeedData / PortFull (reported as a "Pipeline stuck" exception) and the result
///    is Finished.
///  - work() and schedule() forward to current_processor and throw "Bad pipeline"
///    when no processor was selected.
///
/// NOTE(review): if this code is re-enabled, the stuck-pipeline loop calls
/// element->prepare() twice per element; confirm whether the second call is intended
/// or whether the status should be stored once and compared.
/// NOTE(review): Exception and ErrorCodes::LOGICAL_ERROR are used below, but no
/// declaration of them is visible in this file; presumably they would have to come
/// from the included headers -- verify before re-enabling.
8//SequentialPipelineExecutor::SequentialPipelineExecutor(const Processors & processors)
9// : processors(processors)
10//{
11//}
12//
13//
14//SequentialPipelineExecutor::Status SequentialPipelineExecutor::prepare()
15//{
16// current_processor = nullptr;
17//
18// bool has_someone_to_wait = false;
19// Status found_status = Status::Finished;
20//
21// for (auto & element : processors)
22// {
23// traverse(*element,
24// [&] (IProcessor & processor)
25// {
26// Status status = processor.prepare();
27//
28// if (status == Status::Wait)
29// has_someone_to_wait = true;
30//
31// if (status == Status::Ready || status == Status::Async)
32// {
33// current_processor = &processor;
34// found_status = status;
35// }
36//
37// return status;
38// });
39//
40// if (current_processor)
41// break;
42// }
43//
44// if (current_processor)
45// return found_status;
46// if (has_someone_to_wait)
47// return Status::Wait;
48//
49// for (auto & element : processors)
50// {
51// if (element->prepare() == Status::NeedData)
52// throw Exception("Pipeline stuck: " + element->getName() + " processor needs input data but no one is going to generate it", ErrorCodes::LOGICAL_ERROR);
53// if (element->prepare() == Status::PortFull)
54// throw Exception("Pipeline stuck: " + element->getName() + " processor has data in output port but no one is going to consume it", ErrorCodes::LOGICAL_ERROR);
55// }
56//
57// return Status::Finished;
58//}
59//
60//
61//void SequentialPipelineExecutor::work()
62//{
63// if (!current_processor)
64// throw Exception("Bad pipeline", ErrorCodes::LOGICAL_ERROR);
65//
66// current_processor->work();
67//}
68//
69//
70//void SequentialPipelineExecutor::schedule(EventCounter & watch)
71//{
72// if (!current_processor)
73// throw Exception("Bad pipeline", ErrorCodes::LOGICAL_ERROR);
74//
75// current_processor->schedule(watch);
76//}
77
78}
79
80