1#include <Processors/ConcatProcessor.h>
2
3
4namespace DB
5{
6
7ConcatProcessor::Status ConcatProcessor::prepare()
8{
9 auto & output = outputs.front();
10
11 /// Check can output.
12
13 if (output.isFinished())
14 {
15 for (; current_input != inputs.end(); ++current_input)
16 current_input->close();
17
18 return Status::Finished;
19 }
20
21 if (!output.isNeeded())
22 {
23 if (current_input != inputs.end())
24 current_input->setNotNeeded();
25
26 return Status::PortFull;
27 }
28
29 if (!output.canPush())
30 return Status::PortFull;
31
32 /// Check can input.
33
34 if (current_input == inputs.end())
35 return Status::Finished;
36
37 if (current_input->isFinished())
38 {
39 ++current_input;
40 if (current_input == inputs.end())
41 {
42 output.finish();
43 return Status::Finished;
44 }
45 }
46
47 auto & input = *current_input;
48
49 input.setNeeded();
50
51 if (!input.hasData())
52 return Status::NeedData;
53
54 /// Move data.
55 output.push(input.pull());
56
57 /// Now, we pushed to output, and it must be full.
58 return Status::PortFull;
59}
60
61}
62
63
64