1 | #include <algorithm> |
2 | #include <Common/config.h> |
3 | #include <Common/Exception.h> |
4 | #include <Interpreters/Context.h> |
5 | #include <Core/Settings.h> |
6 | #include <DataStreams/MaterializingBlockOutputStream.h> |
7 | #include <DataStreams/ParallelParsingBlockInputStream.h> |
8 | #include <Formats/FormatSettings.h> |
9 | #include <Formats/FormatFactory.h> |
10 | #include <Processors/Formats/IRowInputFormat.h> |
11 | #include <Processors/Formats/InputStreamFromInputFormat.h> |
12 | #include <Processors/Formats/OutputStreamToOutputFormat.h> |
13 | #include <DataStreams/SquashingBlockOutputStream.h> |
14 | #include <DataStreams/NativeBlockInputStream.h> |
15 | #include <Processors/Formats/Impl/ValuesBlockInputFormat.h> |
16 | #include <Processors/Formats/Impl/MySQLOutputFormat.h> |
17 | |
18 | |
19 | namespace DB |
20 | { |
21 | |
22 | namespace ErrorCodes |
23 | { |
24 | extern const int UNKNOWN_FORMAT; |
25 | extern const int LOGICAL_ERROR; |
26 | extern const int FORMAT_IS_NOT_SUITABLE_FOR_INPUT; |
27 | extern const int FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT; |
28 | } |
29 | |
30 | const FormatFactory::Creators & FormatFactory::getCreators(const String & name) const |
31 | { |
32 | auto it = dict.find(name); |
33 | if (dict.end() != it) |
34 | return it->second; |
35 | throw Exception("Unknown format " + name, ErrorCodes::UNKNOWN_FORMAT); |
36 | } |
37 | |
38 | |
39 | static FormatSettings getInputFormatSetting(const Settings & settings, const Context & context) |
40 | { |
41 | FormatSettings format_settings; |
42 | format_settings.csv.delimiter = settings.format_csv_delimiter; |
43 | format_settings.csv.allow_single_quotes = settings.format_csv_allow_single_quotes; |
44 | format_settings.csv.allow_double_quotes = settings.format_csv_allow_double_quotes; |
45 | format_settings.csv.unquoted_null_literal_as_null = settings.input_format_csv_unquoted_null_literal_as_null; |
46 | format_settings.csv.empty_as_default = settings.input_format_defaults_for_omitted_fields; |
47 | format_settings.null_as_default = settings.input_format_null_as_default; |
48 | format_settings.values.interpret_expressions = settings.input_format_values_interpret_expressions; |
49 | format_settings.values.deduce_templates_of_expressions = settings.input_format_values_deduce_templates_of_expressions; |
50 | format_settings.values.accurate_types_of_literals = settings.input_format_values_accurate_types_of_literals; |
51 | format_settings.with_names_use_header = settings.input_format_with_names_use_header; |
52 | format_settings.skip_unknown_fields = settings.input_format_skip_unknown_fields; |
53 | format_settings.import_nested_json = settings.input_format_import_nested_json; |
54 | format_settings.date_time_input_format = settings.date_time_input_format; |
55 | format_settings.input_allow_errors_num = settings.input_format_allow_errors_num; |
56 | format_settings.input_allow_errors_ratio = settings.input_format_allow_errors_ratio; |
57 | format_settings.template_settings.resultset_format = settings.format_template_resultset; |
58 | format_settings.template_settings.row_format = settings.format_template_row; |
59 | format_settings.template_settings.row_between_delimiter = settings.format_template_rows_between_delimiter; |
60 | format_settings.tsv.empty_as_default = settings.input_format_tsv_empty_as_default; |
61 | format_settings.schema.format_schema = settings.format_schema; |
62 | format_settings.schema.format_schema_path = context.getFormatSchemaPath(); |
63 | format_settings.schema.is_server = context.hasGlobalContext() && (context.getGlobalContext().getApplicationType() == Context::ApplicationType::SERVER); |
64 | format_settings.custom.result_before_delimiter = settings.format_custom_result_before_delimiter; |
65 | format_settings.custom.result_after_delimiter = settings.format_custom_result_after_delimiter; |
66 | format_settings.custom.escaping_rule = settings.format_custom_escaping_rule; |
67 | format_settings.custom.field_delimiter = settings.format_custom_field_delimiter; |
68 | format_settings.custom.row_before_delimiter = settings.format_custom_row_before_delimiter; |
69 | format_settings.custom.row_after_delimiter = settings.format_custom_row_after_delimiter; |
70 | format_settings.custom.row_between_delimiter = settings.format_custom_row_between_delimiter; |
71 | |
72 | return format_settings; |
73 | } |
74 | |
75 | static FormatSettings getOutputFormatSetting(const Settings & settings, const Context & context) |
76 | { |
77 | FormatSettings format_settings; |
78 | format_settings.json.quote_64bit_integers = settings.output_format_json_quote_64bit_integers; |
79 | format_settings.json.quote_denormals = settings.output_format_json_quote_denormals; |
80 | format_settings.json.escape_forward_slashes = settings.output_format_json_escape_forward_slashes; |
81 | format_settings.csv.delimiter = settings.format_csv_delimiter; |
82 | format_settings.csv.allow_single_quotes = settings.format_csv_allow_single_quotes; |
83 | format_settings.csv.allow_double_quotes = settings.format_csv_allow_double_quotes; |
84 | format_settings.pretty.max_rows = settings.output_format_pretty_max_rows; |
85 | format_settings.pretty.max_column_pad_width = settings.output_format_pretty_max_column_pad_width; |
86 | format_settings.pretty.color = settings.output_format_pretty_color; |
87 | format_settings.template_settings.resultset_format = settings.format_template_resultset; |
88 | format_settings.template_settings.row_format = settings.format_template_row; |
89 | format_settings.template_settings.row_between_delimiter = settings.format_template_rows_between_delimiter; |
90 | format_settings.write_statistics = settings.output_format_write_statistics; |
91 | format_settings.parquet.row_group_size = settings.output_format_parquet_row_group_size; |
92 | format_settings.schema.format_schema = settings.format_schema; |
93 | format_settings.schema.format_schema_path = context.getFormatSchemaPath(); |
94 | format_settings.schema.is_server = context.hasGlobalContext() && (context.getGlobalContext().getApplicationType() == Context::ApplicationType::SERVER); |
95 | format_settings.custom.result_before_delimiter = settings.format_custom_result_before_delimiter; |
96 | format_settings.custom.result_after_delimiter = settings.format_custom_result_after_delimiter; |
97 | format_settings.custom.escaping_rule = settings.format_custom_escaping_rule; |
98 | format_settings.custom.field_delimiter = settings.format_custom_field_delimiter; |
99 | format_settings.custom.row_before_delimiter = settings.format_custom_row_before_delimiter; |
100 | format_settings.custom.row_after_delimiter = settings.format_custom_row_after_delimiter; |
101 | format_settings.custom.row_between_delimiter = settings.format_custom_row_between_delimiter; |
102 | |
103 | return format_settings; |
104 | } |
105 | |
106 | |
107 | BlockInputStreamPtr FormatFactory::getInput( |
108 | const String & name, |
109 | ReadBuffer & buf, |
110 | const Block & sample, |
111 | const Context & context, |
112 | UInt64 max_block_size, |
113 | ReadCallback callback) const |
114 | { |
115 | if (name == "Native" ) |
116 | return std::make_shared<NativeBlockInputStream>(buf, sample, 0); |
117 | |
118 | if (!getCreators(name).input_processor_creator) |
119 | { |
120 | const auto & input_getter = getCreators(name).input_creator; |
121 | if (!input_getter) |
122 | throw Exception("Format " + name + " is not suitable for input" , ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT); |
123 | |
124 | const Settings & settings = context.getSettingsRef(); |
125 | FormatSettings format_settings = getInputFormatSetting(settings, context); |
126 | |
127 | return input_getter(buf, sample, max_block_size, callback ? callback : ReadCallback(), format_settings); |
128 | } |
129 | |
130 | const Settings & settings = context.getSettingsRef(); |
131 | const auto & file_segmentation_engine = getCreators(name).file_segmentation_engine; |
132 | |
133 | // Doesn't make sense to use parallel parsing with less than four threads |
134 | // (segmentator + two parsers + reader). |
135 | if (settings.input_format_parallel_parsing |
136 | && file_segmentation_engine |
137 | && settings.max_threads >= 4) |
138 | { |
139 | const auto & input_getter = getCreators(name).input_processor_creator; |
140 | if (!input_getter) |
141 | throw Exception("Format " + name + " is not suitable for input" , ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT); |
142 | |
143 | FormatSettings format_settings = getInputFormatSetting(settings, context); |
144 | |
145 | RowInputFormatParams row_input_format_params; |
146 | row_input_format_params.max_block_size = max_block_size; |
147 | row_input_format_params.allow_errors_num = format_settings.input_allow_errors_num; |
148 | row_input_format_params.allow_errors_ratio = format_settings.input_allow_errors_ratio; |
149 | row_input_format_params.callback = std::move(callback); |
150 | row_input_format_params.max_execution_time = settings.max_execution_time; |
151 | row_input_format_params.timeout_overflow_mode = settings.timeout_overflow_mode; |
152 | |
153 | auto input_creator_params = ParallelParsingBlockInputStream::InputCreatorParams{sample, row_input_format_params, format_settings}; |
154 | ParallelParsingBlockInputStream::Params params{buf, input_getter, |
155 | input_creator_params, file_segmentation_engine, |
156 | static_cast<int>(settings.max_threads), |
157 | settings.min_chunk_bytes_for_parallel_parsing}; |
158 | return std::make_shared<ParallelParsingBlockInputStream>(params); |
159 | } |
160 | |
161 | auto format = getInputFormat(name, buf, sample, context, max_block_size, std::move(callback)); |
162 | return std::make_shared<InputStreamFromInputFormat>(std::move(format)); |
163 | } |
164 | |
165 | |
166 | BlockOutputStreamPtr FormatFactory::getOutput( |
167 | const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback) const |
168 | { |
169 | if (name == "PrettyCompactMonoBlock" ) |
170 | { |
171 | /// TODO: rewrite |
172 | auto format = getOutputFormat("PrettyCompact" , buf, sample, context); |
173 | auto res = std::make_shared<SquashingBlockOutputStream>( |
174 | std::make_shared<OutputStreamToOutputFormat>(format), |
175 | sample, context.getSettingsRef().output_format_pretty_max_rows, 0); |
176 | |
177 | res->disableFlush(); |
178 | |
179 | return std::make_shared<MaterializingBlockOutputStream>(res, sample); |
180 | } |
181 | |
182 | if (!getCreators(name).output_processor_creator) |
183 | { |
184 | const auto & output_getter = getCreators(name).output_creator; |
185 | if (!output_getter) |
186 | throw Exception("Format " + name + " is not suitable for output" , ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT); |
187 | |
188 | const Settings & settings = context.getSettingsRef(); |
189 | FormatSettings format_settings = getOutputFormatSetting(settings, context); |
190 | |
191 | /** Materialization is needed, because formats can use the functions `IDataType`, |
192 | * which only work with full columns. |
193 | */ |
194 | return std::make_shared<MaterializingBlockOutputStream>( |
195 | output_getter(buf, sample, std::move(callback), format_settings), sample); |
196 | } |
197 | |
198 | auto format = getOutputFormat(name, buf, sample, context, std::move(callback)); |
199 | return std::make_shared<MaterializingBlockOutputStream>(std::make_shared<OutputStreamToOutputFormat>(format), sample); |
200 | } |
201 | |
202 | |
203 | InputFormatPtr FormatFactory::getInputFormat( |
204 | const String & name, |
205 | ReadBuffer & buf, |
206 | const Block & sample, |
207 | const Context & context, |
208 | UInt64 max_block_size, |
209 | ReadCallback callback) const |
210 | { |
211 | const auto & input_getter = getCreators(name).input_processor_creator; |
212 | if (!input_getter) |
213 | throw Exception("Format " + name + " is not suitable for input" , ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT); |
214 | |
215 | const Settings & settings = context.getSettingsRef(); |
216 | FormatSettings format_settings = getInputFormatSetting(settings, context); |
217 | |
218 | RowInputFormatParams params; |
219 | params.max_block_size = max_block_size; |
220 | params.allow_errors_num = format_settings.input_allow_errors_num; |
221 | params.allow_errors_ratio = format_settings.input_allow_errors_ratio; |
222 | params.callback = std::move(callback); |
223 | params.max_execution_time = settings.max_execution_time; |
224 | params.timeout_overflow_mode = settings.timeout_overflow_mode; |
225 | |
226 | auto format = input_getter(buf, sample, params, format_settings); |
227 | |
228 | /// It's a kludge. Because I cannot remove context from values format. |
229 | if (auto * values = typeid_cast<ValuesBlockInputFormat *>(format.get())) |
230 | values->setContext(context); |
231 | |
232 | return format; |
233 | } |
234 | |
235 | |
236 | OutputFormatPtr FormatFactory::getOutputFormat( |
237 | const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback) const |
238 | { |
239 | const auto & output_getter = getCreators(name).output_processor_creator; |
240 | if (!output_getter) |
241 | throw Exception("Format " + name + " is not suitable for output" , ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT); |
242 | |
243 | const Settings & settings = context.getSettingsRef(); |
244 | FormatSettings format_settings = getOutputFormatSetting(settings, context); |
245 | |
246 | /** TODO: Materialization is needed, because formats can use the functions `IDataType`, |
247 | * which only work with full columns. |
248 | */ |
249 | auto format = output_getter(buf, sample, std::move(callback), format_settings); |
250 | |
251 | /// It's a kludge. Because I cannot remove context from MySQL format. |
252 | if (auto * mysql = typeid_cast<MySQLOutputFormat *>(format.get())) |
253 | mysql->setContext(context); |
254 | |
255 | return format; |
256 | } |
257 | |
258 | |
259 | void FormatFactory::registerInputFormat(const String & name, InputCreator input_creator) |
260 | { |
261 | auto & target = dict[name].input_creator; |
262 | if (target) |
263 | throw Exception("FormatFactory: Input format " + name + " is already registered" , ErrorCodes::LOGICAL_ERROR); |
264 | target = std::move(input_creator); |
265 | } |
266 | |
267 | void FormatFactory::registerOutputFormat(const String & name, OutputCreator output_creator) |
268 | { |
269 | auto & target = dict[name].output_creator; |
270 | if (target) |
271 | throw Exception("FormatFactory: Output format " + name + " is already registered" , ErrorCodes::LOGICAL_ERROR); |
272 | target = std::move(output_creator); |
273 | } |
274 | |
275 | void FormatFactory::registerInputFormatProcessor(const String & name, InputProcessorCreator input_creator) |
276 | { |
277 | auto & target = dict[name].input_processor_creator; |
278 | if (target) |
279 | throw Exception("FormatFactory: Input format " + name + " is already registered" , ErrorCodes::LOGICAL_ERROR); |
280 | target = std::move(input_creator); |
281 | } |
282 | |
283 | void FormatFactory::registerOutputFormatProcessor(const String & name, OutputProcessorCreator output_creator) |
284 | { |
285 | auto & target = dict[name].output_processor_creator; |
286 | if (target) |
287 | throw Exception("FormatFactory: Output format " + name + " is already registered" , ErrorCodes::LOGICAL_ERROR); |
288 | target = std::move(output_creator); |
289 | } |
290 | |
291 | void FormatFactory::registerFileSegmentationEngine(const String & name, FileSegmentationEngine file_segmentation_engine) |
292 | { |
293 | auto & target = dict[name].file_segmentation_engine; |
294 | if (target) |
295 | throw Exception("FormatFactory: File segmentation engine " + name + " is already registered" , ErrorCodes::LOGICAL_ERROR); |
296 | target = std::move(file_segmentation_engine); |
297 | } |
298 | |
/** Populates the registry with every built-in format by calling each format's
  * register* function with this factory.
  * Those functions are defined outside this file. They presumably call the
  * registerInputFormat / registerOutputFormat / registerInputFormatProcessor /
  * registerOutputFormatProcessor / registerFileSegmentationEngine methods, which throw
  * LOGICAL_ERROR when the same slot of the same format is registered twice.
  * NOTE(review): keep the call order stable; it determines insertion order into `dict`.
  */
FormatFactory::FormatFactory()
{
    /// Native via the stream-based creators (presumably input_creator / output_creator — verify in the registrars).
    registerInputFormatNative(*this);
    registerOutputFormatNative(*this);

    registerOutputFormatProcessorJSONEachRowWithProgress(*this);

    /// Processor-based formats; most of them are registered for both input and output.
    registerInputFormatProcessorNative(*this);
    registerOutputFormatProcessorNative(*this);
    registerInputFormatProcessorRowBinary(*this);
    registerOutputFormatProcessorRowBinary(*this);
    registerInputFormatProcessorTabSeparated(*this);
    registerOutputFormatProcessorTabSeparated(*this);
    registerInputFormatProcessorValues(*this);
    registerOutputFormatProcessorValues(*this);
    registerInputFormatProcessorCSV(*this);
    registerOutputFormatProcessorCSV(*this);
    registerInputFormatProcessorTSKV(*this);
    registerOutputFormatProcessorTSKV(*this);
    registerInputFormatProcessorJSONEachRow(*this);
    registerOutputFormatProcessorJSONEachRow(*this);
    registerInputFormatProcessorJSONCompactEachRow(*this);
    registerOutputFormatProcessorJSONCompactEachRow(*this);
    registerInputFormatProcessorProtobuf(*this);
    registerOutputFormatProcessorProtobuf(*this);
    registerInputFormatProcessorCapnProto(*this);
    registerInputFormatProcessorORC(*this);
    registerInputFormatProcessorParquet(*this);
    registerOutputFormatProcessorParquet(*this);
    registerInputFormatProcessorTemplate(*this);
    registerOutputFormatProcessorTemplate(*this);

    /// Segmentation engines; getInput enables parallel parsing only for formats that have one.
    registerFileSegmentationEngineTabSeparated(*this);
    registerFileSegmentationEngineCSV(*this);
    registerFileSegmentationEngineJSONEachRow(*this);

    registerOutputFormatNull(*this);

    /// Formats registered for output only in this list.
    registerOutputFormatProcessorPretty(*this);
    registerOutputFormatProcessorPrettyCompact(*this);
    registerOutputFormatProcessorPrettySpace(*this);
    registerOutputFormatProcessorVertical(*this);
    registerOutputFormatProcessorJSON(*this);
    registerOutputFormatProcessorJSONCompact(*this);
    registerOutputFormatProcessorXML(*this);
    registerOutputFormatProcessorODBCDriver(*this);
    registerOutputFormatProcessorODBCDriver2(*this);
    registerOutputFormatProcessorNull(*this);
    registerOutputFormatProcessorMySQLWrite(*this);
}
349 | |
350 | FormatFactory & FormatFactory::instance() |
351 | { |
352 | static FormatFactory ret; |
353 | return ret; |
354 | } |
355 | |
356 | } |
357 | |