| 1 | #include <Processors/Formats/IOutputFormat.h> | 
|---|---|
| 2 | #include <IO/WriteBuffer.h> | 
| 3 | |
| 4 | |
| 5 | namespace DB | 
| 6 | { | 
| 7 | |
| 8 | IOutputFormat::IOutputFormat(const Block & header_, WriteBuffer & out_) | 
| 9 | : IProcessor({header_, header_, header_}, {}), out(out_) | 
| 10 | { | 
| 11 | } | 
| 12 | |
| 13 | IOutputFormat::Status IOutputFormat::prepare() | 
| 14 | { | 
| 15 | if (has_input) | 
| 16 | return Status::Ready; | 
| 17 | |
| 18 | for (auto kind : {Main, Totals, Extremes}) | 
| 19 | { | 
| 20 | auto & input = getPort(kind); | 
| 21 | |
| 22 | if (kind != Main && !input.isConnected()) | 
| 23 | continue; | 
| 24 | |
| 25 | if (input.isFinished()) | 
| 26 | continue; | 
| 27 | |
| 28 | input.setNeeded(); | 
| 29 | |
| 30 | if (!input.hasData()) | 
| 31 | return Status::NeedData; | 
| 32 | |
| 33 | current_chunk = input.pull(); | 
| 34 | current_block_kind = kind; | 
| 35 | has_input = true; | 
| 36 | return Status::Ready; | 
| 37 | } | 
| 38 | |
| 39 | finished = true; | 
| 40 | |
| 41 | if (!finalized) | 
| 42 | return Status::Ready; | 
| 43 | |
| 44 | return Status::Finished; | 
| 45 | } | 
| 46 | |
| 47 | void IOutputFormat::work() | 
| 48 | { | 
| 49 | if (finished && !finalized) | 
| 50 | { | 
| 51 | finalize(); | 
| 52 | finalized = true; | 
| 53 | return; | 
| 54 | } | 
| 55 | |
| 56 | switch (current_block_kind) | 
| 57 | { | 
| 58 | case Main: | 
| 59 | consume(std::move(current_chunk)); | 
| 60 | break; | 
| 61 | case Totals: | 
| 62 | consumeTotals(std::move(current_chunk)); | 
| 63 | break; | 
| 64 | case Extremes: | 
| 65 | consumeExtremes(std::move(current_chunk)); | 
| 66 | break; | 
| 67 | } | 
| 68 | |
| 69 | has_input = false; | 
| 70 | } | 
| 71 | |
| 72 | void IOutputFormat::flush() | 
| 73 | { | 
| 74 | out.next(); | 
| 75 | } | 
| 76 | |
| 77 | } | 
| 78 | |
| 79 | 
