1#include <Processors/IAccumulatingTransform.h>
2
3
4namespace DB
5{
6
7IAccumulatingTransform::IAccumulatingTransform(Block input_header, Block output_header)
8 : IProcessor({std::move(input_header)}, {std::move(output_header)}),
9 input(inputs.front()), output(outputs.front())
10{
11}
12
13IAccumulatingTransform::Status IAccumulatingTransform::prepare()
14{
15 /// Check can output.
16 if (output.isFinished())
17 {
18 input.close();
19 return Status::Finished;
20 }
21
22 if (!output.canPush())
23 {
24 input.setNotNeeded();
25 return Status::PortFull;
26 }
27
28 /// Output if has data.
29 if (current_output_chunk)
30 output.push(std::move(current_output_chunk));
31
32 if (finished_generate)
33 {
34 output.finish();
35 return Status::Finished;
36 }
37
38 /// Generate output block.
39 if (input.isFinished())
40 {
41 finished_input = true;
42 return Status::Ready;
43 }
44
45 /// Close input if flag was set manually.
46 if (finished_input)
47 {
48 input.close();
49 return Status::Ready;
50 }
51
52 /// Check can input.
53 if (!has_input)
54 {
55 input.setNeeded();
56 if (!input.hasData())
57 return Status::NeedData;
58
59 current_input_chunk = input.pull();
60 has_input = true;
61 }
62
63 return Status::Ready;
64}
65
66void IAccumulatingTransform::work()
67{
68 if (!finished_input)
69 {
70 consume(std::move(current_input_chunk));
71 has_input = false;
72 }
73 else
74 {
75 current_output_chunk = generate();
76 if (!current_output_chunk)
77 finished_generate = true;
78 }
79}
80
81void IAccumulatingTransform::setReadyChunk(Chunk chunk)
82{
83 if (current_output_chunk)
84 throw Exception("IAccumulatingTransform already has input. Cannot set another chunk. "
85 "Probably, setReadyChunk method was called twice per consume().", ErrorCodes::LOGICAL_ERROR);
86
87 current_output_chunk = std::move(chunk);
88}
89
90}
91
92