1#pragma once
2
3#include <string>
4#include <Processors/IProcessor.h>
5#include <IO/Progress.h>
6
7
8namespace DB
9{
10
11class WriteBuffer;
12
/** Output format has three inputs and no outputs. It writes data to WriteBuffer.
 *
 * First input is for main resultset, second is for "totals" and third is for "extremes".
 * It's not necessary to connect "totals" or "extremes" ports (they may remain dangling).
 *
 * Data from input ports is pulled in order: first from the main input, then totals, then extremes.
 *
 * By default, data for "totals" and "extremes" is ignored.
 */
22class IOutputFormat : public IProcessor
23{
24public:
25 enum PortKind { Main = 0, Totals = 1, Extremes = 2 };
26
27protected:
28 WriteBuffer & out;
29
30 Chunk current_chunk;
31 PortKind current_block_kind = PortKind::Main;
32 bool has_input = false;
33 bool finished = false;
34 bool finalized = false;
35
36 virtual void consume(Chunk) = 0;
37 virtual void consumeTotals(Chunk) {}
38 virtual void consumeExtremes(Chunk) {}
39 virtual void finalize() {}
40
41public:
42 IOutputFormat(const Block & header_, WriteBuffer & out_);
43
44 Status prepare() override;
45 void work() override;
46
47 /// Flush output buffers if any.
48 virtual void flush();
49
50 /// Value for rows_before_limit_at_least field.
51 virtual void setRowsBeforeLimit(size_t /*rows_before_limit*/) {}
52
53 /// Notify about progress. Method could be called from different threads.
54 /// Passed value are delta, that must be summarized.
55 virtual void onProgress(const Progress & /*progress*/) {}
56
57 /// Content-Type to set when sending HTTP response.
58 virtual std::string getContentType() const { return "text/plain; charset=UTF-8"; }
59
60 InputPort & getPort(PortKind kind) { return *std::next(inputs.begin(), kind); }
61
62public:
63 /// Compatible to IBlockOutputStream interface
64
65 void write(const Block & block) { consume(Chunk(block.getColumns(), block.rows())); }
66
67 virtual void doWritePrefix() {}
68 virtual void doWriteSuffix() { finalize(); }
69
70 void setTotals(const Block & totals) { consumeTotals(Chunk(totals.getColumns(), totals.rows())); }
71 void setExtremes(const Block & extremes) { consumeExtremes(Chunk(extremes.getColumns(), extremes.rows())); }
72};
73}
74
75