1#include <Processors/Formats/IOutputFormat.h>
2#include <IO/WriteBuffer.h>
3
4
5namespace DB
6{
7
8IOutputFormat::IOutputFormat(const Block & header_, WriteBuffer & out_)
9 : IProcessor({header_, header_, header_}, {}), out(out_)
10{
11}
12
13IOutputFormat::Status IOutputFormat::prepare()
14{
15 if (has_input)
16 return Status::Ready;
17
18 for (auto kind : {Main, Totals, Extremes})
19 {
20 auto & input = getPort(kind);
21
22 if (kind != Main && !input.isConnected())
23 continue;
24
25 if (input.isFinished())
26 continue;
27
28 input.setNeeded();
29
30 if (!input.hasData())
31 return Status::NeedData;
32
33 current_chunk = input.pull();
34 current_block_kind = kind;
35 has_input = true;
36 return Status::Ready;
37 }
38
39 finished = true;
40
41 if (!finalized)
42 return Status::Ready;
43
44 return Status::Finished;
45}
46
47void IOutputFormat::work()
48{
49 if (finished && !finalized)
50 {
51 finalize();
52 finalized = true;
53 return;
54 }
55
56 switch (current_block_kind)
57 {
58 case Main:
59 consume(std::move(current_chunk));
60 break;
61 case Totals:
62 consumeTotals(std::move(current_chunk));
63 break;
64 case Extremes:
65 consumeExtremes(std::move(current_chunk));
66 break;
67 }
68
69 has_input = false;
70}
71
72void IOutputFormat::flush()
73{
74 out.next();
75}
76
77}
78
79