1 | #include <Processors/IAccumulatingTransform.h> |
---|---|
2 | |
3 | |
4 | namespace DB |
5 | { |
6 | |
7 | IAccumulatingTransform::IAccumulatingTransform(Block input_header, Block output_header) |
8 | : IProcessor({std::move(input_header)}, {std::move(output_header)}), |
9 | input(inputs.front()), output(outputs.front()) |
10 | { |
11 | } |
12 | |
13 | IAccumulatingTransform::Status IAccumulatingTransform::prepare() |
14 | { |
15 | /// Check can output. |
16 | if (output.isFinished()) |
17 | { |
18 | input.close(); |
19 | return Status::Finished; |
20 | } |
21 | |
22 | if (!output.canPush()) |
23 | { |
24 | input.setNotNeeded(); |
25 | return Status::PortFull; |
26 | } |
27 | |
28 | /// Output if has data. |
29 | if (current_output_chunk) |
30 | output.push(std::move(current_output_chunk)); |
31 | |
32 | if (finished_generate) |
33 | { |
34 | output.finish(); |
35 | return Status::Finished; |
36 | } |
37 | |
38 | /// Generate output block. |
39 | if (input.isFinished()) |
40 | { |
41 | finished_input = true; |
42 | return Status::Ready; |
43 | } |
44 | |
45 | /// Close input if flag was set manually. |
46 | if (finished_input) |
47 | { |
48 | input.close(); |
49 | return Status::Ready; |
50 | } |
51 | |
52 | /// Check can input. |
53 | if (!has_input) |
54 | { |
55 | input.setNeeded(); |
56 | if (!input.hasData()) |
57 | return Status::NeedData; |
58 | |
59 | current_input_chunk = input.pull(); |
60 | has_input = true; |
61 | } |
62 | |
63 | return Status::Ready; |
64 | } |
65 | |
66 | void IAccumulatingTransform::work() |
67 | { |
68 | if (!finished_input) |
69 | { |
70 | consume(std::move(current_input_chunk)); |
71 | has_input = false; |
72 | } |
73 | else |
74 | { |
75 | current_output_chunk = generate(); |
76 | if (!current_output_chunk) |
77 | finished_generate = true; |
78 | } |
79 | } |
80 | |
81 | void IAccumulatingTransform::setReadyChunk(Chunk chunk) |
82 | { |
83 | if (current_output_chunk) |
84 | throw Exception("IAccumulatingTransform already has input. Cannot set another chunk. " |
85 | "Probably, setReadyChunk method was called twice per consume().", ErrorCodes::LOGICAL_ERROR); |
86 | |
87 | current_output_chunk = std::move(chunk); |
88 | } |
89 | |
90 | } |
91 | |
92 |