1#include <DataStreams/NativeBlockInputStream.h>
2#include <DataStreams/NativeBlockOutputStream.h>
3#include <Formats/FormatFactory.h>
4#include <Processors/Formats/IInputFormat.h>
5#include <Processors/Formats/IOutputFormat.h>
6
7
8namespace DB
9{
10
11class NativeInputFormatFromNativeBlockInputStream : public IInputFormat
12{
13public:
14 NativeInputFormatFromNativeBlockInputStream(const Block & header, ReadBuffer & in_)
15 : IInputFormat(header, in_)
16 , stream(std::make_shared<NativeBlockInputStream>(in, header, 0))
17 {
18 }
19
20 String getName() const override { return "NativeInputFormatFromNativeBlockInputStream"; }
21
22protected:
23 void resetParser() override
24 {
25 IInputFormat::resetParser();
26 stream->resetParser();
27 read_prefix = false;
28 read_suffix = false;
29 }
30
31
32 Chunk generate() override
33 {
34 /// TODO: do something with totals and extremes.
35
36 if (!read_prefix)
37 {
38 stream->readPrefix();
39 read_prefix = true;
40 }
41
42 auto block = stream->read();
43 if (!block)
44 {
45 if (!read_suffix)
46 {
47 stream->readSuffix();
48 read_suffix = true;
49 }
50
51 return Chunk();
52 }
53
54 assertBlocksHaveEqualStructure(getPort().getHeader(), block, getName());
55 block.checkNumberOfRows();
56
57 UInt64 num_rows = block.rows();
58 return Chunk(block.getColumns(), num_rows);
59 }
60
61private:
62 std::shared_ptr<NativeBlockInputStream> stream;
63 bool read_prefix = false;
64 bool read_suffix = false;
65};
66
67
68class NativeOutputFormatFromNativeBlockOutputStream : public IOutputFormat
69{
70public:
71 NativeOutputFormatFromNativeBlockOutputStream(const Block & header, WriteBuffer & out_)
72 : IOutputFormat(header, out_)
73 , stream(std::make_shared<NativeBlockOutputStream>(out, 0, header))
74 {
75 }
76
77 String getName() const override { return "NativeOutputFormatFromNativeBlockOutputStream"; }
78
79 void setRowsBeforeLimit(size_t rows_before_limit) override
80 {
81 stream->setRowsBeforeLimit(rows_before_limit);
82 }
83
84 void onProgress(const Progress & progress) override
85 {
86 stream->onProgress(progress);
87 }
88
89 std::string getContentType() const override
90 {
91 return stream->getContentType();
92 }
93
94protected:
95 void consume(Chunk chunk) override
96 {
97 writePrefixIfNot();
98
99 if (chunk)
100 {
101
102 auto block = getPort(PortKind::Main).getHeader();
103 block.setColumns(chunk.detachColumns());
104 stream->write(block);
105 }
106 }
107
108 void consumeTotals(Chunk chunk) override
109 {
110 writePrefixIfNot();
111
112 auto block = getPort(PortKind::Totals).getHeader();
113 block.setColumns(chunk.detachColumns());
114 stream->setTotals(block);
115 }
116
117 void consumeExtremes(Chunk chunk) override
118 {
119 writePrefixIfNot();
120
121 auto block = getPort(PortKind::Extremes).getHeader();
122 block.setColumns(chunk.detachColumns());
123 stream->setExtremes(block);
124 }
125
126 void finalize() override
127 {
128 writePrefixIfNot();
129 writeSuffixIfNot();
130 }
131
132private:
133 std::shared_ptr<NativeBlockOutputStream> stream;
134 bool prefix_written = false;
135 bool suffix_written = false;
136
137 void writePrefixIfNot()
138 {
139 if (!prefix_written)
140 stream->writePrefix();
141
142 prefix_written = true;
143 }
144
145 void writeSuffixIfNot()
146 {
147 if (!suffix_written)
148 stream->writeSuffix();
149
150 suffix_written = true;
151 }
152};
153
154void registerInputFormatProcessorNative(FormatFactory & factory)
155{
156 factory.registerInputFormatProcessor("Native", [](
157 ReadBuffer & buf,
158 const Block & sample,
159 const RowInputFormatParams &,
160 const FormatSettings &)
161 {
162 return std::make_shared<NativeInputFormatFromNativeBlockInputStream>(sample, buf);
163 });
164}
165
166void registerOutputFormatProcessorNative(FormatFactory & factory)
167{
168 factory.registerOutputFormatProcessor("Native", [](
169 WriteBuffer & buf,
170 const Block & sample,
171 FormatFactory::WriteCallback,
172 const FormatSettings &)
173 {
174 return std::make_shared<NativeOutputFormatFromNativeBlockOutputStream>(sample, buf);
175 });
176}
177
178}
179