1#include <Processors/IInflatingTransform.h>
2
3namespace DB
4{
5
6IInflatingTransform::IInflatingTransform(Block input_header, Block output_header)
7 : IProcessor({std::move(input_header)}, {std::move(output_header)})
8 , input(inputs.front()), output(outputs.front())
9{
10
11}
12
13IInflatingTransform::Status IInflatingTransform::prepare()
14{
15 /// Check can output.
16
17 if (output.isFinished())
18 {
19 input.close();
20 return Status::Finished;
21 }
22
23 if (!output.canPush())
24 {
25 input.setNotNeeded();
26 return Status::PortFull;
27 }
28
29 /// Output if has data.
30 if (generated)
31 {
32 output.push(std::move(current_chunk));
33 generated = false;
34 }
35
36 if (can_generate)
37 return Status::Ready;
38
39 /// Check can input.
40 if (!has_input)
41 {
42 if (input.isFinished())
43 {
44 output.finish();
45 return Status::Finished;
46 }
47
48 input.setNeeded();
49
50 if (!input.hasData())
51 return Status::NeedData;
52
53 current_chunk = input.pull();
54 has_input = true;
55 }
56
57 /// Now transform.
58 return Status::Ready;
59}
60
61void IInflatingTransform::work()
62{
63 if (can_generate)
64 {
65 if (generated)
66 throw Exception("IInflatingTransform cannot consume chunk because it already was generated",
67 ErrorCodes::LOGICAL_ERROR);
68
69 current_chunk = generate();
70 generated = true;
71 can_generate = canGenerate();
72 }
73 else
74 {
75 if (!has_input)
76 throw Exception("IInflatingTransform cannot consume chunk because it wasn't read",
77 ErrorCodes::LOGICAL_ERROR);
78
79 consume(std::move(current_chunk));
80 has_input = false;
81 can_generate = canGenerate();
82 }
83}
84
85}
86