1#include <Processors/ISource.h>
2
3
4namespace DB
5{
6
7ISource::ISource(Block header)
8 : IProcessor({}, {std::move(header)}), output(outputs.front())
9{
10}
11
12ISource::Status ISource::prepare()
13{
14 if (finished)
15 {
16 output.finish();
17 return Status::Finished;
18 }
19
20 /// Check can output.
21 if (output.isFinished())
22 return Status::Finished;
23
24 if (!output.canPush())
25 return Status::PortFull;
26
27 if (!has_input)
28 return Status::Ready;
29
30 output.pushData(std::move(current_chunk));
31 has_input = false;
32
33 if (got_exception)
34 {
35 finished = true;
36 output.finish();
37 return Status::Finished;
38 }
39
40 /// Now, we pushed to output, and it must be full.
41 return Status::PortFull;
42}
43
44void ISource::work()
45{
46 try
47 {
48 current_chunk.chunk = generate();
49 if (!current_chunk.chunk)
50 finished = true;
51 else
52 has_input = true;
53 }
54 catch (...)
55 {
56 finished = true;
57 throw;
58 }
59// {
60// current_chunk = std::current_exception();
61// has_input = true;
62// got_exception = true;
63// }
64}
65
66}
67
68