1 | #include <Processors/ISource.h> |
---|---|
2 | |
3 | |
4 | namespace DB |
5 | { |
6 | |
7 | ISource::ISource(Block header) |
8 | : IProcessor({}, {std::move(header)}), output(outputs.front()) |
9 | { |
10 | } |
11 | |
12 | ISource::Status ISource::prepare() |
13 | { |
14 | if (finished) |
15 | { |
16 | output.finish(); |
17 | return Status::Finished; |
18 | } |
19 | |
20 | /// Check can output. |
21 | if (output.isFinished()) |
22 | return Status::Finished; |
23 | |
24 | if (!output.canPush()) |
25 | return Status::PortFull; |
26 | |
27 | if (!has_input) |
28 | return Status::Ready; |
29 | |
30 | output.pushData(std::move(current_chunk)); |
31 | has_input = false; |
32 | |
33 | if (got_exception) |
34 | { |
35 | finished = true; |
36 | output.finish(); |
37 | return Status::Finished; |
38 | } |
39 | |
40 | /// Now, we pushed to output, and it must be full. |
41 | return Status::PortFull; |
42 | } |
43 | |
44 | void ISource::work() |
45 | { |
46 | try |
47 | { |
48 | current_chunk.chunk = generate(); |
49 | if (!current_chunk.chunk) |
50 | finished = true; |
51 | else |
52 | has_input = true; |
53 | } |
54 | catch (...) |
55 | { |
56 | finished = true; |
57 | throw; |
58 | } |
59 | // { |
60 | // current_chunk = std::current_exception(); |
61 | // has_input = true; |
62 | // got_exception = true; |
63 | // } |
64 | } |
65 | |
66 | } |
67 | |
68 |