1#include <Common/EventCounter.h>
2#include <Common/ThreadPool.h>
3#include <Processors/Executors/ParallelPipelineExecutor.h>
4#include <Processors/Executors/traverse.h>
5
6
7namespace DB
8{
9// NOTE: everything between the braces of this namespace is commented out, so this file currently adds no definitions to namespace DB.
// The disabled text is kept below as-is; the summaries describe only what that disabled code would do if it were re-enabled.
//
// Disabled constructor: initializes the members `processors` and `pool` from the arguments of the same names.
10//ParallelPipelineExecutor::ParallelPipelineExecutor(const std::vector<ProcessorPtr> & processors, ThreadPool & pool)
11// : processors(processors), pool(pool)
12//{
13//}
14//
15// Disabled prepare(): picks the next processor to run and reports the overall executor status.
//  - Clears `current_processor`, then calls traverse() on each element of `processors` with a callback.
//  - The callback returns Status::Wait (and sets has_someone_to_wait) for a processor that is present in
//    `active_processors`; that lookup is done under `mutex`.
//  - Otherwise the callback calls processor.prepare(), records a Wait status in has_someone_to_wait, and stores the
//    processor and its status in `current_processor` / `current_status` when the status is Ready or Async.
//  - The outer loop over `processors` stops as soon as `current_processor` is set.
//  - Result: Status::Async if a processor was selected; Status::Wait if something reported Wait; otherwise an
//    exception for any element whose prepare() returns NeedData or PortFull; otherwise Status::Finished.
// NOTE(review): the callback overwrites `current_processor` on every Ready/Async status it sees, so which processor
// is finally selected depends on how traverse() continues after the callback returns - confirm against traverse.h.
// NOTE(review): the final "pipeline stuck" check calls element->prepare() twice per element; if this code is revived,
// consider calling it once and comparing the stored status - presumably prepare() is not free of side effects; verify.
16//ParallelPipelineExecutor::Status ParallelPipelineExecutor::prepare()
17//{
18// current_processor = nullptr;
19//
20// bool has_someone_to_wait = false;
21//
22// for (auto & element : processors)
23// {
24// traverse(*element,
25// [&] (IProcessor & processor)
26// {
27// {
28// std::lock_guard lock(mutex);
29// if (active_processors.count(&processor))
30// {
31// has_someone_to_wait = true;
32// return Status::Wait;
33// }
34// }
35//
36// Status status = processor.prepare();
37//
38// if (status == Status::Wait)
39// has_someone_to_wait = true;
40//
41// if (status == Status::Ready || status == Status::Async)
42// {
43// current_processor = &processor;
44// current_status = status;
45// }
46//
47// return status;
48// });
49//
50// if (current_processor)
51// break;
52// }
53//
54// if (current_processor)
55// return Status::Async;
56//
57// if (has_someone_to_wait)
58// return Status::Wait;
59//
60// for (auto & element : processors)
61// {
62// if (element->prepare() == Status::NeedData)
63// throw Exception("Pipeline stuck: " + element->getName() + " processor needs input data but no one is going to generate it", ErrorCodes::LOGICAL_ERROR);
64// if (element->prepare() == Status::PortFull)
65// throw Exception("Pipeline stuck: " + element->getName() + " processor has data in output port but no one is going to consume it", ErrorCodes::LOGICAL_ERROR);
66// }
67//
68// return Status::Finished;
69//}
70//
71// Disabled schedule(): runs the processor chosen by prepare().
//  - Throws a LOGICAL_ERROR exception ("Bad pipeline") when `current_processor` is null.
//  - If `current_status` is Status::Async, forwards to current_processor->schedule(watch).
//  - Otherwise inserts the processor into `active_processors` under `mutex` and submits a task to `pool` that calls
//    processor->work(), erases the processor from `active_processors` under `mutex`, and then calls watch.notify().
// NOTE(review): the pool task captures `watch` by reference and `this` by pointer, so both presumably have to outlive
// the task - confirm with the caller before re-enabling.
// NOTE(review): nothing in the task catches exceptions, so if work() throws, the erase and watch.notify() steps are
// skipped; how the pool then reports the failure is not visible here - verify.
72//void ParallelPipelineExecutor::schedule(EventCounter & watch)
73//{
74// if (!current_processor)
75// throw Exception("Bad pipeline", ErrorCodes::LOGICAL_ERROR);
76//
77// if (current_status == Status::Async)
78// {
79// current_processor->schedule(watch);
80// }
81// else
82// {
83// {
84// std::lock_guard lock(mutex);
85// active_processors.insert(current_processor);
86// }
87//
88// pool.scheduleOrThrowOnError([processor = current_processor, &watch, this]
89// {
90// processor->work();
91// {
92// std::lock_guard lock(mutex);
93// active_processors.erase(processor);
94// }
95// watch.notify();
96// });
97// }
98//}
99
100}
101
102