1 | #include <Processors/IInflatingTransform.h> |
---|---|
2 | |
3 | namespace DB |
4 | { |
5 | |
6 | IInflatingTransform::IInflatingTransform(Block input_header, Block output_header) |
7 | : IProcessor({std::move(input_header)}, {std::move(output_header)}) |
8 | , input(inputs.front()), output(outputs.front()) |
9 | { |
10 | |
11 | } |
12 | |
13 | IInflatingTransform::Status IInflatingTransform::prepare() |
14 | { |
15 | /// Check can output. |
16 | |
17 | if (output.isFinished()) |
18 | { |
19 | input.close(); |
20 | return Status::Finished; |
21 | } |
22 | |
23 | if (!output.canPush()) |
24 | { |
25 | input.setNotNeeded(); |
26 | return Status::PortFull; |
27 | } |
28 | |
29 | /// Output if has data. |
30 | if (generated) |
31 | { |
32 | output.push(std::move(current_chunk)); |
33 | generated = false; |
34 | } |
35 | |
36 | if (can_generate) |
37 | return Status::Ready; |
38 | |
39 | /// Check can input. |
40 | if (!has_input) |
41 | { |
42 | if (input.isFinished()) |
43 | { |
44 | output.finish(); |
45 | return Status::Finished; |
46 | } |
47 | |
48 | input.setNeeded(); |
49 | |
50 | if (!input.hasData()) |
51 | return Status::NeedData; |
52 | |
53 | current_chunk = input.pull(); |
54 | has_input = true; |
55 | } |
56 | |
57 | /// Now transform. |
58 | return Status::Ready; |
59 | } |
60 | |
61 | void IInflatingTransform::work() |
62 | { |
63 | if (can_generate) |
64 | { |
65 | if (generated) |
66 | throw Exception("IInflatingTransform cannot consume chunk because it already was generated", |
67 | ErrorCodes::LOGICAL_ERROR); |
68 | |
69 | current_chunk = generate(); |
70 | generated = true; |
71 | can_generate = canGenerate(); |
72 | } |
73 | else |
74 | { |
75 | if (!has_input) |
76 | throw Exception("IInflatingTransform cannot consume chunk because it wasn't read", |
77 | ErrorCodes::LOGICAL_ERROR); |
78 | |
79 | consume(std::move(current_chunk)); |
80 | has_input = false; |
81 | can_generate = canGenerate(); |
82 | } |
83 | } |
84 | |
85 | } |
86 |