1 | #pragma once |
2 | |
3 | #include <Core/Types.h> |
4 | #include <DataStreams/IBlockStream_fwd.h> |
5 | #include <IO/BufferWithOwnMemory.h> |
6 | |
7 | #include <functional> |
8 | #include <memory> |
9 | #include <unordered_map> |
10 | #include <boost/noncopyable.hpp> |
11 | |
12 | |
13 | namespace DB |
14 | { |
15 | |
/// Forward declarations: in this header these types are only named in
/// signatures (by reference or inside std::shared_ptr), so their definitions
/// are not needed here.
class Context;
class Block;
struct FormatSettings;
struct RowInputFormatParams;

/// Buffer types used as the source / destination of the format creators.
class WriteBuffer;
class ReadBuffer;

/// Processor-based format interfaces.
class IProcessor;
class IInputFormat;
class IOutputFormat;

/// Shared-ownership handles for the processor-based interfaces above.
using ProcessorPtr = std::shared_ptr<IProcessor>;
using InputFormatPtr = std::shared_ptr<IInputFormat>;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
33 | |
34 | |
35 | /** Allows to create an IBlockInputStream or IBlockOutputStream by the name of the format. |
36 | * Note: format and compression are independent things. |
37 | */ |
38 | class FormatFactory final : private boost::noncopyable |
39 | { |
40 | public: |
41 | /// This callback allows to perform some additional actions after reading a single row. |
42 | /// It's initial purpose was to extract payload for virtual columns from Kafka Consumer ReadBuffer. |
43 | using ReadCallback = std::function<void()>; |
44 | |
45 | /** Fast reading data from buffer and save result to memory. |
46 | * Reads at least min_chunk_bytes and some more until the end of the chunk, depends on the format. |
47 | * Used in ParallelParsingBlockInputStream. |
48 | */ |
49 | using FileSegmentationEngine = std::function<bool( |
50 | ReadBuffer & buf, |
51 | DB::Memory<> & memory, |
52 | size_t min_chunk_bytes)>; |
53 | |
54 | /// This callback allows to perform some additional actions after writing a single row. |
55 | /// It's initial purpose was to flush Kafka message for each row. |
56 | using WriteCallback = std::function<void()>; |
57 | |
58 | private: |
59 | using InputCreator = std::function<BlockInputStreamPtr( |
60 | ReadBuffer & buf, |
61 | const Block & sample, |
62 | UInt64 max_block_size, |
63 | ReadCallback callback, |
64 | const FormatSettings & settings)>; |
65 | |
66 | using OutputCreator = std::function<BlockOutputStreamPtr( |
67 | WriteBuffer & buf, |
68 | const Block & sample, |
69 | WriteCallback callback, |
70 | const FormatSettings & settings)>; |
71 | |
72 | using InputProcessorCreator = std::function<InputFormatPtr( |
73 | ReadBuffer & buf, |
74 | const Block & , |
75 | const RowInputFormatParams & params, |
76 | const FormatSettings & settings)>; |
77 | |
78 | using OutputProcessorCreator = std::function<OutputFormatPtr( |
79 | WriteBuffer & buf, |
80 | const Block & sample, |
81 | WriteCallback callback, |
82 | const FormatSettings & settings)>; |
83 | |
84 | struct Creators |
85 | { |
86 | InputCreator input_creator; |
87 | OutputCreator output_creator; |
88 | InputProcessorCreator input_processor_creator; |
89 | OutputProcessorCreator output_processor_creator; |
90 | FileSegmentationEngine file_segmentation_engine; |
91 | }; |
92 | |
93 | using FormatsDictionary = std::unordered_map<String, Creators>; |
94 | |
95 | public: |
96 | |
97 | static FormatFactory & instance(); |
98 | |
99 | BlockInputStreamPtr getInput( |
100 | const String & name, |
101 | ReadBuffer & buf, |
102 | const Block & sample, |
103 | const Context & context, |
104 | UInt64 max_block_size, |
105 | ReadCallback callback = {}) const; |
106 | |
107 | BlockOutputStreamPtr getOutput(const String & name, WriteBuffer & buf, |
108 | const Block & sample, const Context & context, WriteCallback callback = {}) const; |
109 | |
110 | InputFormatPtr getInputFormat( |
111 | const String & name, |
112 | ReadBuffer & buf, |
113 | const Block & sample, |
114 | const Context & context, |
115 | UInt64 max_block_size, |
116 | ReadCallback callback = {}) const; |
117 | |
118 | OutputFormatPtr getOutputFormat( |
119 | const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback = {}) const; |
120 | |
121 | /// Register format by its name. |
122 | void registerInputFormat(const String & name, InputCreator input_creator); |
123 | void registerOutputFormat(const String & name, OutputCreator output_creator); |
124 | void registerFileSegmentationEngine(const String & name, FileSegmentationEngine file_segmentation_engine); |
125 | |
126 | void registerInputFormatProcessor(const String & name, InputProcessorCreator input_creator); |
127 | void registerOutputFormatProcessor(const String & name, OutputProcessorCreator output_creator); |
128 | |
129 | const FormatsDictionary & getAllFormats() const |
130 | { |
131 | return dict; |
132 | } |
133 | |
134 | private: |
135 | /// FormatsDictionary dict; |
136 | FormatsDictionary dict; |
137 | |
138 | FormatFactory(); |
139 | |
140 | const Creators & getCreators(const String & name) const; |
141 | }; |
142 | |
143 | /// Formats for both input/output. |
144 | |
145 | void registerInputFormatNative(FormatFactory & factory); |
146 | void registerOutputFormatNative(FormatFactory & factory); |
147 | |
148 | void registerInputFormatProcessorNative(FormatFactory & factory); |
149 | void registerOutputFormatProcessorNative(FormatFactory & factory); |
150 | void registerInputFormatProcessorRowBinary(FormatFactory & factory); |
151 | void registerOutputFormatProcessorRowBinary(FormatFactory & factory); |
152 | void registerInputFormatProcessorTabSeparated(FormatFactory & factory); |
153 | void registerOutputFormatProcessorTabSeparated(FormatFactory & factory); |
154 | void registerInputFormatProcessorValues(FormatFactory & factory); |
155 | void registerOutputFormatProcessorValues(FormatFactory & factory); |
156 | void registerInputFormatProcessorCSV(FormatFactory & factory); |
157 | void registerOutputFormatProcessorCSV(FormatFactory & factory); |
158 | void registerInputFormatProcessorTSKV(FormatFactory & factory); |
159 | void registerOutputFormatProcessorTSKV(FormatFactory & factory); |
160 | void registerInputFormatProcessorJSONEachRow(FormatFactory & factory); |
161 | void registerOutputFormatProcessorJSONEachRow(FormatFactory & factory); |
162 | void registerInputFormatProcessorJSONCompactEachRow(FormatFactory & factory); |
163 | void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory); |
164 | void registerInputFormatProcessorParquet(FormatFactory & factory); |
165 | void registerInputFormatProcessorORC(FormatFactory & factory); |
166 | void registerOutputFormatProcessorParquet(FormatFactory & factory); |
167 | void registerInputFormatProcessorProtobuf(FormatFactory & factory); |
168 | void registerOutputFormatProcessorProtobuf(FormatFactory & factory); |
169 | void registerInputFormatProcessorTemplate(FormatFactory & factory); |
170 | void registerOutputFormatProcessorTemplate(FormatFactory &factory); |
171 | |
172 | /// File Segmentation Engines for parallel reading |
173 | |
174 | void registerFileSegmentationEngineTabSeparated(FormatFactory & factory); |
175 | void registerFileSegmentationEngineCSV(FormatFactory & factory); |
176 | void registerFileSegmentationEngineJSONEachRow(FormatFactory & factory); |
177 | |
178 | /// Output only (presentational) formats. |
179 | |
180 | void registerOutputFormatNull(FormatFactory & factory); |
181 | |
182 | void registerOutputFormatProcessorPretty(FormatFactory & factory); |
183 | void registerOutputFormatProcessorPrettyCompact(FormatFactory & factory); |
184 | void registerOutputFormatProcessorPrettySpace(FormatFactory & factory); |
185 | void registerOutputFormatProcessorVertical(FormatFactory & factory); |
186 | void registerOutputFormatProcessorJSON(FormatFactory & factory); |
187 | void registerOutputFormatProcessorJSONCompact(FormatFactory & factory); |
188 | void registerOutputFormatProcessorJSONEachRowWithProgress(FormatFactory & factory); |
189 | void registerOutputFormatProcessorXML(FormatFactory & factory); |
190 | void registerOutputFormatProcessorODBCDriver(FormatFactory & factory); |
191 | void registerOutputFormatProcessorODBCDriver2(FormatFactory & factory); |
192 | void registerOutputFormatProcessorNull(FormatFactory & factory); |
193 | void registerOutputFormatProcessorMySQLWrite(FormatFactory & factory); |
194 | |
195 | /// Input only formats. |
196 | void registerInputFormatProcessorCapnProto(FormatFactory & factory); |
197 | |
198 | } |
199 | |