1#include <Processors/ForkProcessor.h>
2
3
4namespace DB
5{
6
7ForkProcessor::Status ForkProcessor::prepare()
8{
9 auto & input = inputs.front();
10
11 /// Check can output.
12
13 bool all_finished = true;
14 bool all_can_push = true;
15 size_t num_active_outputs = 0;
16
17 for (const auto & output : outputs)
18 {
19 if (!output.isFinished())
20 {
21 all_finished = false;
22 ++num_active_outputs;
23
24 /// The order is important.
25 if (!output.canPush())
26 all_can_push = false;
27 }
28 }
29
30 if (all_finished)
31 {
32 input.close();
33 return Status::Finished;
34 }
35
36 if (!all_can_push)
37 {
38 input.setNotNeeded();
39 return Status::PortFull;
40 }
41
42 /// Check can input.
43
44 if (input.isFinished())
45 {
46 for (auto & output : outputs)
47 output.finish();
48
49 return Status::Finished;
50 }
51
52 input.setNeeded();
53
54 if (!input.hasData())
55 return Status::NeedData;
56
57 /// Move data.
58
59 auto data = input.pull();
60 size_t num_processed_outputs = 0;
61
62 for (auto & output : outputs)
63 {
64 if (!output.isFinished()) /// Skip finished outputs.
65 {
66 ++num_processed_outputs;
67 if (num_processed_outputs == num_active_outputs)
68 output.push(std::move(data)); /// Can push because no full or unneeded outputs.
69 else
70 output.push(data.clone());
71 }
72 }
73
74 /// Now, we pulled from input. It must be empty.
75 return Status::NeedData;
76}
77
78}
79
80
81
82