1#include <algorithm>
2#include <Common/config.h>
3#include <Common/Exception.h>
4#include <Interpreters/Context.h>
5#include <Core/Settings.h>
6#include <DataStreams/MaterializingBlockOutputStream.h>
7#include <DataStreams/ParallelParsingBlockInputStream.h>
8#include <Formats/FormatSettings.h>
9#include <Formats/FormatFactory.h>
10#include <Processors/Formats/IRowInputFormat.h>
11#include <Processors/Formats/InputStreamFromInputFormat.h>
12#include <Processors/Formats/OutputStreamToOutputFormat.h>
13#include <DataStreams/SquashingBlockOutputStream.h>
14#include <DataStreams/NativeBlockInputStream.h>
15#include <Processors/Formats/Impl/ValuesBlockInputFormat.h>
16#include <Processors/Formats/Impl/MySQLOutputFormat.h>
17
18
19namespace DB
20{
21
namespace ErrorCodes
{
    /// Error codes thrown from this file; they are only declared here (extern) and defined elsewhere.
    extern const int FORMAT_IS_NOT_SUITABLE_FOR_INPUT;
    extern const int FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT;
    extern const int LOGICAL_ERROR;
    extern const int UNKNOWN_FORMAT;
}
29
30const FormatFactory::Creators & FormatFactory::getCreators(const String & name) const
31{
32 auto it = dict.find(name);
33 if (dict.end() != it)
34 return it->second;
35 throw Exception("Unknown format " + name, ErrorCodes::UNKNOWN_FORMAT);
36}
37
38
39static FormatSettings getInputFormatSetting(const Settings & settings, const Context & context)
40{
41 FormatSettings format_settings;
42 format_settings.csv.delimiter = settings.format_csv_delimiter;
43 format_settings.csv.allow_single_quotes = settings.format_csv_allow_single_quotes;
44 format_settings.csv.allow_double_quotes = settings.format_csv_allow_double_quotes;
45 format_settings.csv.unquoted_null_literal_as_null = settings.input_format_csv_unquoted_null_literal_as_null;
46 format_settings.csv.empty_as_default = settings.input_format_defaults_for_omitted_fields;
47 format_settings.null_as_default = settings.input_format_null_as_default;
48 format_settings.values.interpret_expressions = settings.input_format_values_interpret_expressions;
49 format_settings.values.deduce_templates_of_expressions = settings.input_format_values_deduce_templates_of_expressions;
50 format_settings.values.accurate_types_of_literals = settings.input_format_values_accurate_types_of_literals;
51 format_settings.with_names_use_header = settings.input_format_with_names_use_header;
52 format_settings.skip_unknown_fields = settings.input_format_skip_unknown_fields;
53 format_settings.import_nested_json = settings.input_format_import_nested_json;
54 format_settings.date_time_input_format = settings.date_time_input_format;
55 format_settings.input_allow_errors_num = settings.input_format_allow_errors_num;
56 format_settings.input_allow_errors_ratio = settings.input_format_allow_errors_ratio;
57 format_settings.template_settings.resultset_format = settings.format_template_resultset;
58 format_settings.template_settings.row_format = settings.format_template_row;
59 format_settings.template_settings.row_between_delimiter = settings.format_template_rows_between_delimiter;
60 format_settings.tsv.empty_as_default = settings.input_format_tsv_empty_as_default;
61 format_settings.schema.format_schema = settings.format_schema;
62 format_settings.schema.format_schema_path = context.getFormatSchemaPath();
63 format_settings.schema.is_server = context.hasGlobalContext() && (context.getGlobalContext().getApplicationType() == Context::ApplicationType::SERVER);
64 format_settings.custom.result_before_delimiter = settings.format_custom_result_before_delimiter;
65 format_settings.custom.result_after_delimiter = settings.format_custom_result_after_delimiter;
66 format_settings.custom.escaping_rule = settings.format_custom_escaping_rule;
67 format_settings.custom.field_delimiter = settings.format_custom_field_delimiter;
68 format_settings.custom.row_before_delimiter = settings.format_custom_row_before_delimiter;
69 format_settings.custom.row_after_delimiter = settings.format_custom_row_after_delimiter;
70 format_settings.custom.row_between_delimiter = settings.format_custom_row_between_delimiter;
71
72 return format_settings;
73}
74
75static FormatSettings getOutputFormatSetting(const Settings & settings, const Context & context)
76{
77 FormatSettings format_settings;
78 format_settings.json.quote_64bit_integers = settings.output_format_json_quote_64bit_integers;
79 format_settings.json.quote_denormals = settings.output_format_json_quote_denormals;
80 format_settings.json.escape_forward_slashes = settings.output_format_json_escape_forward_slashes;
81 format_settings.csv.delimiter = settings.format_csv_delimiter;
82 format_settings.csv.allow_single_quotes = settings.format_csv_allow_single_quotes;
83 format_settings.csv.allow_double_quotes = settings.format_csv_allow_double_quotes;
84 format_settings.pretty.max_rows = settings.output_format_pretty_max_rows;
85 format_settings.pretty.max_column_pad_width = settings.output_format_pretty_max_column_pad_width;
86 format_settings.pretty.color = settings.output_format_pretty_color;
87 format_settings.template_settings.resultset_format = settings.format_template_resultset;
88 format_settings.template_settings.row_format = settings.format_template_row;
89 format_settings.template_settings.row_between_delimiter = settings.format_template_rows_between_delimiter;
90 format_settings.write_statistics = settings.output_format_write_statistics;
91 format_settings.parquet.row_group_size = settings.output_format_parquet_row_group_size;
92 format_settings.schema.format_schema = settings.format_schema;
93 format_settings.schema.format_schema_path = context.getFormatSchemaPath();
94 format_settings.schema.is_server = context.hasGlobalContext() && (context.getGlobalContext().getApplicationType() == Context::ApplicationType::SERVER);
95 format_settings.custom.result_before_delimiter = settings.format_custom_result_before_delimiter;
96 format_settings.custom.result_after_delimiter = settings.format_custom_result_after_delimiter;
97 format_settings.custom.escaping_rule = settings.format_custom_escaping_rule;
98 format_settings.custom.field_delimiter = settings.format_custom_field_delimiter;
99 format_settings.custom.row_before_delimiter = settings.format_custom_row_before_delimiter;
100 format_settings.custom.row_after_delimiter = settings.format_custom_row_after_delimiter;
101 format_settings.custom.row_between_delimiter = settings.format_custom_row_between_delimiter;
102
103 return format_settings;
104}
105
106
107BlockInputStreamPtr FormatFactory::getInput(
108 const String & name,
109 ReadBuffer & buf,
110 const Block & sample,
111 const Context & context,
112 UInt64 max_block_size,
113 ReadCallback callback) const
114{
115 if (name == "Native")
116 return std::make_shared<NativeBlockInputStream>(buf, sample, 0);
117
118 if (!getCreators(name).input_processor_creator)
119 {
120 const auto & input_getter = getCreators(name).input_creator;
121 if (!input_getter)
122 throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);
123
124 const Settings & settings = context.getSettingsRef();
125 FormatSettings format_settings = getInputFormatSetting(settings, context);
126
127 return input_getter(buf, sample, max_block_size, callback ? callback : ReadCallback(), format_settings);
128 }
129
130 const Settings & settings = context.getSettingsRef();
131 const auto & file_segmentation_engine = getCreators(name).file_segmentation_engine;
132
133 // Doesn't make sense to use parallel parsing with less than four threads
134 // (segmentator + two parsers + reader).
135 if (settings.input_format_parallel_parsing
136 && file_segmentation_engine
137 && settings.max_threads >= 4)
138 {
139 const auto & input_getter = getCreators(name).input_processor_creator;
140 if (!input_getter)
141 throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);
142
143 FormatSettings format_settings = getInputFormatSetting(settings, context);
144
145 RowInputFormatParams row_input_format_params;
146 row_input_format_params.max_block_size = max_block_size;
147 row_input_format_params.allow_errors_num = format_settings.input_allow_errors_num;
148 row_input_format_params.allow_errors_ratio = format_settings.input_allow_errors_ratio;
149 row_input_format_params.callback = std::move(callback);
150 row_input_format_params.max_execution_time = settings.max_execution_time;
151 row_input_format_params.timeout_overflow_mode = settings.timeout_overflow_mode;
152
153 auto input_creator_params = ParallelParsingBlockInputStream::InputCreatorParams{sample, row_input_format_params, format_settings};
154 ParallelParsingBlockInputStream::Params params{buf, input_getter,
155 input_creator_params, file_segmentation_engine,
156 static_cast<int>(settings.max_threads),
157 settings.min_chunk_bytes_for_parallel_parsing};
158 return std::make_shared<ParallelParsingBlockInputStream>(params);
159 }
160
161 auto format = getInputFormat(name, buf, sample, context, max_block_size, std::move(callback));
162 return std::make_shared<InputStreamFromInputFormat>(std::move(format));
163}
164
165
166BlockOutputStreamPtr FormatFactory::getOutput(
167 const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback) const
168{
169 if (name == "PrettyCompactMonoBlock")
170 {
171 /// TODO: rewrite
172 auto format = getOutputFormat("PrettyCompact", buf, sample, context);
173 auto res = std::make_shared<SquashingBlockOutputStream>(
174 std::make_shared<OutputStreamToOutputFormat>(format),
175 sample, context.getSettingsRef().output_format_pretty_max_rows, 0);
176
177 res->disableFlush();
178
179 return std::make_shared<MaterializingBlockOutputStream>(res, sample);
180 }
181
182 if (!getCreators(name).output_processor_creator)
183 {
184 const auto & output_getter = getCreators(name).output_creator;
185 if (!output_getter)
186 throw Exception("Format " + name + " is not suitable for output", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT);
187
188 const Settings & settings = context.getSettingsRef();
189 FormatSettings format_settings = getOutputFormatSetting(settings, context);
190
191 /** Materialization is needed, because formats can use the functions `IDataType`,
192 * which only work with full columns.
193 */
194 return std::make_shared<MaterializingBlockOutputStream>(
195 output_getter(buf, sample, std::move(callback), format_settings), sample);
196 }
197
198 auto format = getOutputFormat(name, buf, sample, context, std::move(callback));
199 return std::make_shared<MaterializingBlockOutputStream>(std::make_shared<OutputStreamToOutputFormat>(format), sample);
200}
201
202
203InputFormatPtr FormatFactory::getInputFormat(
204 const String & name,
205 ReadBuffer & buf,
206 const Block & sample,
207 const Context & context,
208 UInt64 max_block_size,
209 ReadCallback callback) const
210{
211 const auto & input_getter = getCreators(name).input_processor_creator;
212 if (!input_getter)
213 throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);
214
215 const Settings & settings = context.getSettingsRef();
216 FormatSettings format_settings = getInputFormatSetting(settings, context);
217
218 RowInputFormatParams params;
219 params.max_block_size = max_block_size;
220 params.allow_errors_num = format_settings.input_allow_errors_num;
221 params.allow_errors_ratio = format_settings.input_allow_errors_ratio;
222 params.callback = std::move(callback);
223 params.max_execution_time = settings.max_execution_time;
224 params.timeout_overflow_mode = settings.timeout_overflow_mode;
225
226 auto format = input_getter(buf, sample, params, format_settings);
227
228 /// It's a kludge. Because I cannot remove context from values format.
229 if (auto * values = typeid_cast<ValuesBlockInputFormat *>(format.get()))
230 values->setContext(context);
231
232 return format;
233}
234
235
236OutputFormatPtr FormatFactory::getOutputFormat(
237 const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback) const
238{
239 const auto & output_getter = getCreators(name).output_processor_creator;
240 if (!output_getter)
241 throw Exception("Format " + name + " is not suitable for output", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT);
242
243 const Settings & settings = context.getSettingsRef();
244 FormatSettings format_settings = getOutputFormatSetting(settings, context);
245
246 /** TODO: Materialization is needed, because formats can use the functions `IDataType`,
247 * which only work with full columns.
248 */
249 auto format = output_getter(buf, sample, std::move(callback), format_settings);
250
251 /// It's a kludge. Because I cannot remove context from MySQL format.
252 if (auto * mysql = typeid_cast<MySQLOutputFormat *>(format.get()))
253 mysql->setContext(context);
254
255 return format;
256}
257
258
259void FormatFactory::registerInputFormat(const String & name, InputCreator input_creator)
260{
261 auto & target = dict[name].input_creator;
262 if (target)
263 throw Exception("FormatFactory: Input format " + name + " is already registered", ErrorCodes::LOGICAL_ERROR);
264 target = std::move(input_creator);
265}
266
267void FormatFactory::registerOutputFormat(const String & name, OutputCreator output_creator)
268{
269 auto & target = dict[name].output_creator;
270 if (target)
271 throw Exception("FormatFactory: Output format " + name + " is already registered", ErrorCodes::LOGICAL_ERROR);
272 target = std::move(output_creator);
273}
274
275void FormatFactory::registerInputFormatProcessor(const String & name, InputProcessorCreator input_creator)
276{
277 auto & target = dict[name].input_processor_creator;
278 if (target)
279 throw Exception("FormatFactory: Input format " + name + " is already registered", ErrorCodes::LOGICAL_ERROR);
280 target = std::move(input_creator);
281}
282
283void FormatFactory::registerOutputFormatProcessor(const String & name, OutputProcessorCreator output_creator)
284{
285 auto & target = dict[name].output_processor_creator;
286 if (target)
287 throw Exception("FormatFactory: Output format " + name + " is already registered", ErrorCodes::LOGICAL_ERROR);
288 target = std::move(output_creator);
289}
290
291void FormatFactory::registerFileSegmentationEngine(const String & name, FileSegmentationEngine file_segmentation_engine)
292{
293 auto & target = dict[name].file_segmentation_engine;
294 if (target)
295 throw Exception("FormatFactory: File segmentation engine " + name + " is already registered", ErrorCodes::LOGICAL_ERROR);
296 target = std::move(file_segmentation_engine);
297}
298
/// Registers all built-in formats into this factory, in the order listed below.
/// The register* helpers are not defined in this file. Presumably each one calls the
/// register*Format* / registerFileSegmentationEngine methods of this class, which throw
/// LOGICAL_ERROR when the same slot is registered twice, so the order decides which
/// duplicate (if any) is reported first.
FormatFactory::FormatFactory()
{
    /// Stream-based Native input/output.
    registerInputFormatNative(*this);
    registerOutputFormatNative(*this);

    registerOutputFormatProcessorJSONEachRowWithProgress(*this);

    /// Processor-based formats (input and/or output).
    registerInputFormatProcessorNative(*this);
    registerOutputFormatProcessorNative(*this);
    registerInputFormatProcessorRowBinary(*this);
    registerOutputFormatProcessorRowBinary(*this);
    registerInputFormatProcessorTabSeparated(*this);
    registerOutputFormatProcessorTabSeparated(*this);
    registerInputFormatProcessorValues(*this);
    registerOutputFormatProcessorValues(*this);
    registerInputFormatProcessorCSV(*this);
    registerOutputFormatProcessorCSV(*this);
    registerInputFormatProcessorTSKV(*this);
    registerOutputFormatProcessorTSKV(*this);
    registerInputFormatProcessorJSONEachRow(*this);
    registerOutputFormatProcessorJSONEachRow(*this);
    registerInputFormatProcessorJSONCompactEachRow(*this);
    registerOutputFormatProcessorJSONCompactEachRow(*this);
    registerInputFormatProcessorProtobuf(*this);
    registerOutputFormatProcessorProtobuf(*this);
    registerInputFormatProcessorCapnProto(*this);
    registerInputFormatProcessorORC(*this);
    registerInputFormatProcessorParquet(*this);
    registerOutputFormatProcessorParquet(*this);
    registerInputFormatProcessorTemplate(*this);
    registerOutputFormatProcessorTemplate(*this);

    /// File segmentation engines; getInput() requires one before it uses parallel parsing.
    registerFileSegmentationEngineTabSeparated(*this);
    registerFileSegmentationEngineCSV(*this);
    registerFileSegmentationEngineJSONEachRow(*this);

    registerOutputFormatNull(*this);

    /// Output-only processor formats.
    registerOutputFormatProcessorPretty(*this);
    registerOutputFormatProcessorPrettyCompact(*this);
    registerOutputFormatProcessorPrettySpace(*this);
    registerOutputFormatProcessorVertical(*this);
    registerOutputFormatProcessorJSON(*this);
    registerOutputFormatProcessorJSONCompact(*this);
    registerOutputFormatProcessorXML(*this);
    registerOutputFormatProcessorODBCDriver(*this);
    registerOutputFormatProcessorODBCDriver2(*this);
    registerOutputFormatProcessorNull(*this);
    registerOutputFormatProcessorMySQLWrite(*this);
}
349
350FormatFactory & FormatFactory::instance()
351{
352 static FormatFactory ret;
353 return ret;
354}
355
356}
357