1#pragma once
2
3#include <Core/Types.h>
4#include <DataStreams/IBlockStream_fwd.h>
5#include <IO/BufferWithOwnMemory.h>
6
7#include <functional>
8#include <memory>
9#include <unordered_map>
10#include <boost/noncopyable.hpp>
11
12
13namespace DB
14{
15
/// Types that this header uses only by reference, so a forward declaration is enough.
class Block;
class Context;
class ReadBuffer;
class WriteBuffer;

struct FormatSettings;
struct RowInputFormatParams;

/// Processor interfaces; only shared pointers to them are named here.
class IProcessor;
class IInputFormat;
class IOutputFormat;

/// Shared-ownership handles to the interfaces declared above.
using ProcessorPtr = std::shared_ptr<IProcessor>;
using InputFormatPtr = std::shared_ptr<IInputFormat>;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
33
34
35/** Allows to create an IBlockInputStream or IBlockOutputStream by the name of the format.
36 * Note: format and compression are independent things.
37 */
38class FormatFactory final : private boost::noncopyable
39{
40public:
41 /// This callback allows to perform some additional actions after reading a single row.
42 /// It's initial purpose was to extract payload for virtual columns from Kafka Consumer ReadBuffer.
43 using ReadCallback = std::function<void()>;
44
45 /** Fast reading data from buffer and save result to memory.
46 * Reads at least min_chunk_bytes and some more until the end of the chunk, depends on the format.
47 * Used in ParallelParsingBlockInputStream.
48 */
49 using FileSegmentationEngine = std::function<bool(
50 ReadBuffer & buf,
51 DB::Memory<> & memory,
52 size_t min_chunk_bytes)>;
53
54 /// This callback allows to perform some additional actions after writing a single row.
55 /// It's initial purpose was to flush Kafka message for each row.
56 using WriteCallback = std::function<void()>;
57
58private:
59 using InputCreator = std::function<BlockInputStreamPtr(
60 ReadBuffer & buf,
61 const Block & sample,
62 UInt64 max_block_size,
63 ReadCallback callback,
64 const FormatSettings & settings)>;
65
66 using OutputCreator = std::function<BlockOutputStreamPtr(
67 WriteBuffer & buf,
68 const Block & sample,
69 WriteCallback callback,
70 const FormatSettings & settings)>;
71
72 using InputProcessorCreator = std::function<InputFormatPtr(
73 ReadBuffer & buf,
74 const Block & header,
75 const RowInputFormatParams & params,
76 const FormatSettings & settings)>;
77
78 using OutputProcessorCreator = std::function<OutputFormatPtr(
79 WriteBuffer & buf,
80 const Block & sample,
81 WriteCallback callback,
82 const FormatSettings & settings)>;
83
84 struct Creators
85 {
86 InputCreator input_creator;
87 OutputCreator output_creator;
88 InputProcessorCreator input_processor_creator;
89 OutputProcessorCreator output_processor_creator;
90 FileSegmentationEngine file_segmentation_engine;
91 };
92
93 using FormatsDictionary = std::unordered_map<String, Creators>;
94
95public:
96
97 static FormatFactory & instance();
98
99 BlockInputStreamPtr getInput(
100 const String & name,
101 ReadBuffer & buf,
102 const Block & sample,
103 const Context & context,
104 UInt64 max_block_size,
105 ReadCallback callback = {}) const;
106
107 BlockOutputStreamPtr getOutput(const String & name, WriteBuffer & buf,
108 const Block & sample, const Context & context, WriteCallback callback = {}) const;
109
110 InputFormatPtr getInputFormat(
111 const String & name,
112 ReadBuffer & buf,
113 const Block & sample,
114 const Context & context,
115 UInt64 max_block_size,
116 ReadCallback callback = {}) const;
117
118 OutputFormatPtr getOutputFormat(
119 const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback = {}) const;
120
121 /// Register format by its name.
122 void registerInputFormat(const String & name, InputCreator input_creator);
123 void registerOutputFormat(const String & name, OutputCreator output_creator);
124 void registerFileSegmentationEngine(const String & name, FileSegmentationEngine file_segmentation_engine);
125
126 void registerInputFormatProcessor(const String & name, InputProcessorCreator input_creator);
127 void registerOutputFormatProcessor(const String & name, OutputProcessorCreator output_creator);
128
129 const FormatsDictionary & getAllFormats() const
130 {
131 return dict;
132 }
133
134private:
135 /// FormatsDictionary dict;
136 FormatsDictionary dict;
137
138 FormatFactory();
139
140 const Creators & getCreators(const String & name) const;
141};
142
143/// Formats for both input/output.
144
145void registerInputFormatNative(FormatFactory & factory);
146void registerOutputFormatNative(FormatFactory & factory);
147
148void registerInputFormatProcessorNative(FormatFactory & factory);
149void registerOutputFormatProcessorNative(FormatFactory & factory);
150void registerInputFormatProcessorRowBinary(FormatFactory & factory);
151void registerOutputFormatProcessorRowBinary(FormatFactory & factory);
152void registerInputFormatProcessorTabSeparated(FormatFactory & factory);
153void registerOutputFormatProcessorTabSeparated(FormatFactory & factory);
154void registerInputFormatProcessorValues(FormatFactory & factory);
155void registerOutputFormatProcessorValues(FormatFactory & factory);
156void registerInputFormatProcessorCSV(FormatFactory & factory);
157void registerOutputFormatProcessorCSV(FormatFactory & factory);
158void registerInputFormatProcessorTSKV(FormatFactory & factory);
159void registerOutputFormatProcessorTSKV(FormatFactory & factory);
160void registerInputFormatProcessorJSONEachRow(FormatFactory & factory);
161void registerOutputFormatProcessorJSONEachRow(FormatFactory & factory);
162void registerInputFormatProcessorJSONCompactEachRow(FormatFactory & factory);
163void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory);
164void registerInputFormatProcessorParquet(FormatFactory & factory);
165void registerInputFormatProcessorORC(FormatFactory & factory);
166void registerOutputFormatProcessorParquet(FormatFactory & factory);
167void registerInputFormatProcessorProtobuf(FormatFactory & factory);
168void registerOutputFormatProcessorProtobuf(FormatFactory & factory);
169void registerInputFormatProcessorTemplate(FormatFactory & factory);
170void registerOutputFormatProcessorTemplate(FormatFactory &factory);
171
172/// File Segmentation Engines for parallel reading
173
174void registerFileSegmentationEngineTabSeparated(FormatFactory & factory);
175void registerFileSegmentationEngineCSV(FormatFactory & factory);
176void registerFileSegmentationEngineJSONEachRow(FormatFactory & factory);
177
178/// Output only (presentational) formats.
179
180void registerOutputFormatNull(FormatFactory & factory);
181
182void registerOutputFormatProcessorPretty(FormatFactory & factory);
183void registerOutputFormatProcessorPrettyCompact(FormatFactory & factory);
184void registerOutputFormatProcessorPrettySpace(FormatFactory & factory);
185void registerOutputFormatProcessorVertical(FormatFactory & factory);
186void registerOutputFormatProcessorJSON(FormatFactory & factory);
187void registerOutputFormatProcessorJSONCompact(FormatFactory & factory);
188void registerOutputFormatProcessorJSONEachRowWithProgress(FormatFactory & factory);
189void registerOutputFormatProcessorXML(FormatFactory & factory);
190void registerOutputFormatProcessorODBCDriver(FormatFactory & factory);
191void registerOutputFormatProcessorODBCDriver2(FormatFactory & factory);
192void registerOutputFormatProcessorNull(FormatFactory & factory);
193void registerOutputFormatProcessorMySQLWrite(FormatFactory & factory);
194
195/// Input only formats.
196void registerInputFormatProcessorCapnProto(FormatFactory & factory);
197
198}
199