1 | #include <DataStreams/NativeBlockInputStream.h> |
2 | #include <DataStreams/NativeBlockOutputStream.h> |
3 | #include <Formats/FormatFactory.h> |
4 | #include <Processors/Formats/IInputFormat.h> |
5 | #include <Processors/Formats/IOutputFormat.h> |
6 | |
7 | |
8 | namespace DB |
9 | { |
10 | |
11 | class NativeInputFormatFromNativeBlockInputStream : public IInputFormat |
12 | { |
13 | public: |
14 | NativeInputFormatFromNativeBlockInputStream(const Block & , ReadBuffer & in_) |
15 | : IInputFormat(header, in_) |
16 | , stream(std::make_shared<NativeBlockInputStream>(in, header, 0)) |
17 | { |
18 | } |
19 | |
20 | String getName() const override { return "NativeInputFormatFromNativeBlockInputStream" ; } |
21 | |
22 | protected: |
23 | void resetParser() override |
24 | { |
25 | IInputFormat::resetParser(); |
26 | stream->resetParser(); |
27 | read_prefix = false; |
28 | read_suffix = false; |
29 | } |
30 | |
31 | |
32 | Chunk generate() override |
33 | { |
34 | /// TODO: do something with totals and extremes. |
35 | |
36 | if (!read_prefix) |
37 | { |
38 | stream->readPrefix(); |
39 | read_prefix = true; |
40 | } |
41 | |
42 | auto block = stream->read(); |
43 | if (!block) |
44 | { |
45 | if (!read_suffix) |
46 | { |
47 | stream->readSuffix(); |
48 | read_suffix = true; |
49 | } |
50 | |
51 | return Chunk(); |
52 | } |
53 | |
54 | assertBlocksHaveEqualStructure(getPort().getHeader(), block, getName()); |
55 | block.checkNumberOfRows(); |
56 | |
57 | UInt64 num_rows = block.rows(); |
58 | return Chunk(block.getColumns(), num_rows); |
59 | } |
60 | |
61 | private: |
62 | std::shared_ptr<NativeBlockInputStream> stream; |
63 | bool read_prefix = false; |
64 | bool read_suffix = false; |
65 | }; |
66 | |
67 | |
68 | class NativeOutputFormatFromNativeBlockOutputStream : public IOutputFormat |
69 | { |
70 | public: |
71 | NativeOutputFormatFromNativeBlockOutputStream(const Block & , WriteBuffer & out_) |
72 | : IOutputFormat(header, out_) |
73 | , stream(std::make_shared<NativeBlockOutputStream>(out, 0, header)) |
74 | { |
75 | } |
76 | |
77 | String getName() const override { return "NativeOutputFormatFromNativeBlockOutputStream" ; } |
78 | |
79 | void setRowsBeforeLimit(size_t rows_before_limit) override |
80 | { |
81 | stream->setRowsBeforeLimit(rows_before_limit); |
82 | } |
83 | |
84 | void onProgress(const Progress & progress) override |
85 | { |
86 | stream->onProgress(progress); |
87 | } |
88 | |
89 | std::string getContentType() const override |
90 | { |
91 | return stream->getContentType(); |
92 | } |
93 | |
94 | protected: |
95 | void consume(Chunk chunk) override |
96 | { |
97 | writePrefixIfNot(); |
98 | |
99 | if (chunk) |
100 | { |
101 | |
102 | auto block = getPort(PortKind::Main).getHeader(); |
103 | block.setColumns(chunk.detachColumns()); |
104 | stream->write(block); |
105 | } |
106 | } |
107 | |
108 | void consumeTotals(Chunk chunk) override |
109 | { |
110 | writePrefixIfNot(); |
111 | |
112 | auto block = getPort(PortKind::Totals).getHeader(); |
113 | block.setColumns(chunk.detachColumns()); |
114 | stream->setTotals(block); |
115 | } |
116 | |
117 | void consumeExtremes(Chunk chunk) override |
118 | { |
119 | writePrefixIfNot(); |
120 | |
121 | auto block = getPort(PortKind::Extremes).getHeader(); |
122 | block.setColumns(chunk.detachColumns()); |
123 | stream->setExtremes(block); |
124 | } |
125 | |
126 | void finalize() override |
127 | { |
128 | writePrefixIfNot(); |
129 | writeSuffixIfNot(); |
130 | } |
131 | |
132 | private: |
133 | std::shared_ptr<NativeBlockOutputStream> stream; |
134 | bool prefix_written = false; |
135 | bool suffix_written = false; |
136 | |
137 | void writePrefixIfNot() |
138 | { |
139 | if (!prefix_written) |
140 | stream->writePrefix(); |
141 | |
142 | prefix_written = true; |
143 | } |
144 | |
145 | void writeSuffixIfNot() |
146 | { |
147 | if (!suffix_written) |
148 | stream->writeSuffix(); |
149 | |
150 | suffix_written = true; |
151 | } |
152 | }; |
153 | |
154 | void registerInputFormatProcessorNative(FormatFactory & factory) |
155 | { |
156 | factory.registerInputFormatProcessor("Native" , []( |
157 | ReadBuffer & buf, |
158 | const Block & sample, |
159 | const RowInputFormatParams &, |
160 | const FormatSettings &) |
161 | { |
162 | return std::make_shared<NativeInputFormatFromNativeBlockInputStream>(sample, buf); |
163 | }); |
164 | } |
165 | |
166 | void registerOutputFormatProcessorNative(FormatFactory & factory) |
167 | { |
168 | factory.registerOutputFormatProcessor("Native" , []( |
169 | WriteBuffer & buf, |
170 | const Block & sample, |
171 | FormatFactory::WriteCallback, |
172 | const FormatSettings &) |
173 | { |
174 | return std::make_shared<NativeOutputFormatFromNativeBlockOutputStream>(sample, buf); |
175 | }); |
176 | } |
177 | |
178 | } |
179 | |