1 | #include <Processors/Formats/IOutputFormat.h> |
---|---|
2 | #include <IO/WriteBuffer.h> |
3 | |
4 | |
5 | namespace DB |
6 | { |
7 | |
8 | IOutputFormat::IOutputFormat(const Block & header_, WriteBuffer & out_) |
9 | : IProcessor({header_, header_, header_}, {}), out(out_) |
10 | { |
11 | } |
12 | |
13 | IOutputFormat::Status IOutputFormat::prepare() |
14 | { |
15 | if (has_input) |
16 | return Status::Ready; |
17 | |
18 | for (auto kind : {Main, Totals, Extremes}) |
19 | { |
20 | auto & input = getPort(kind); |
21 | |
22 | if (kind != Main && !input.isConnected()) |
23 | continue; |
24 | |
25 | if (input.isFinished()) |
26 | continue; |
27 | |
28 | input.setNeeded(); |
29 | |
30 | if (!input.hasData()) |
31 | return Status::NeedData; |
32 | |
33 | current_chunk = input.pull(); |
34 | current_block_kind = kind; |
35 | has_input = true; |
36 | return Status::Ready; |
37 | } |
38 | |
39 | finished = true; |
40 | |
41 | if (!finalized) |
42 | return Status::Ready; |
43 | |
44 | return Status::Finished; |
45 | } |
46 | |
47 | void IOutputFormat::work() |
48 | { |
49 | if (finished && !finalized) |
50 | { |
51 | finalize(); |
52 | finalized = true; |
53 | return; |
54 | } |
55 | |
56 | switch (current_block_kind) |
57 | { |
58 | case Main: |
59 | consume(std::move(current_chunk)); |
60 | break; |
61 | case Totals: |
62 | consumeTotals(std::move(current_chunk)); |
63 | break; |
64 | case Extremes: |
65 | consumeExtremes(std::move(current_chunk)); |
66 | break; |
67 | } |
68 | |
69 | has_input = false; |
70 | } |
71 | |
72 | void IOutputFormat::flush() |
73 | { |
74 | out.next(); |
75 | } |
76 | |
77 | } |
78 | |
79 |