| 1 | #include <algorithm> |
| 2 | #include <Common/config.h> |
| 3 | #include <Common/Exception.h> |
| 4 | #include <Interpreters/Context.h> |
| 5 | #include <Core/Settings.h> |
| 6 | #include <DataStreams/MaterializingBlockOutputStream.h> |
| 7 | #include <DataStreams/ParallelParsingBlockInputStream.h> |
| 8 | #include <Formats/FormatSettings.h> |
| 9 | #include <Formats/FormatFactory.h> |
| 10 | #include <Processors/Formats/IRowInputFormat.h> |
| 11 | #include <Processors/Formats/InputStreamFromInputFormat.h> |
| 12 | #include <Processors/Formats/OutputStreamToOutputFormat.h> |
| 13 | #include <DataStreams/SquashingBlockOutputStream.h> |
| 14 | #include <DataStreams/NativeBlockInputStream.h> |
| 15 | #include <Processors/Formats/Impl/ValuesBlockInputFormat.h> |
| 16 | #include <Processors/Formats/Impl/MySQLOutputFormat.h> |
| 17 | |
| 18 | |
| 19 | namespace DB |
| 20 | { |
| 21 | |
namespace ErrorCodes
{
    /// Error codes thrown from this file; they are only declared here and defined elsewhere in the project.
    extern const int FORMAT_IS_NOT_SUITABLE_FOR_INPUT;
    extern const int FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT;
    extern const int LOGICAL_ERROR;
    extern const int UNKNOWN_FORMAT;
}
| 29 | |
| 30 | const FormatFactory::Creators & FormatFactory::getCreators(const String & name) const |
| 31 | { |
| 32 | auto it = dict.find(name); |
| 33 | if (dict.end() != it) |
| 34 | return it->second; |
| 35 | throw Exception("Unknown format " + name, ErrorCodes::UNKNOWN_FORMAT); |
| 36 | } |
| 37 | |
| 38 | |
| 39 | static FormatSettings getInputFormatSetting(const Settings & settings, const Context & context) |
| 40 | { |
| 41 | FormatSettings format_settings; |
| 42 | format_settings.csv.delimiter = settings.format_csv_delimiter; |
| 43 | format_settings.csv.allow_single_quotes = settings.format_csv_allow_single_quotes; |
| 44 | format_settings.csv.allow_double_quotes = settings.format_csv_allow_double_quotes; |
| 45 | format_settings.csv.unquoted_null_literal_as_null = settings.input_format_csv_unquoted_null_literal_as_null; |
| 46 | format_settings.csv.empty_as_default = settings.input_format_defaults_for_omitted_fields; |
| 47 | format_settings.null_as_default = settings.input_format_null_as_default; |
| 48 | format_settings.values.interpret_expressions = settings.input_format_values_interpret_expressions; |
| 49 | format_settings.values.deduce_templates_of_expressions = settings.input_format_values_deduce_templates_of_expressions; |
| 50 | format_settings.values.accurate_types_of_literals = settings.input_format_values_accurate_types_of_literals; |
| 51 | format_settings.with_names_use_header = settings.input_format_with_names_use_header; |
| 52 | format_settings.skip_unknown_fields = settings.input_format_skip_unknown_fields; |
| 53 | format_settings.import_nested_json = settings.input_format_import_nested_json; |
| 54 | format_settings.date_time_input_format = settings.date_time_input_format; |
| 55 | format_settings.input_allow_errors_num = settings.input_format_allow_errors_num; |
| 56 | format_settings.input_allow_errors_ratio = settings.input_format_allow_errors_ratio; |
| 57 | format_settings.template_settings.resultset_format = settings.format_template_resultset; |
| 58 | format_settings.template_settings.row_format = settings.format_template_row; |
| 59 | format_settings.template_settings.row_between_delimiter = settings.format_template_rows_between_delimiter; |
| 60 | format_settings.tsv.empty_as_default = settings.input_format_tsv_empty_as_default; |
| 61 | format_settings.schema.format_schema = settings.format_schema; |
| 62 | format_settings.schema.format_schema_path = context.getFormatSchemaPath(); |
| 63 | format_settings.schema.is_server = context.hasGlobalContext() && (context.getGlobalContext().getApplicationType() == Context::ApplicationType::SERVER); |
| 64 | format_settings.custom.result_before_delimiter = settings.format_custom_result_before_delimiter; |
| 65 | format_settings.custom.result_after_delimiter = settings.format_custom_result_after_delimiter; |
| 66 | format_settings.custom.escaping_rule = settings.format_custom_escaping_rule; |
| 67 | format_settings.custom.field_delimiter = settings.format_custom_field_delimiter; |
| 68 | format_settings.custom.row_before_delimiter = settings.format_custom_row_before_delimiter; |
| 69 | format_settings.custom.row_after_delimiter = settings.format_custom_row_after_delimiter; |
| 70 | format_settings.custom.row_between_delimiter = settings.format_custom_row_between_delimiter; |
| 71 | |
| 72 | return format_settings; |
| 73 | } |
| 74 | |
| 75 | static FormatSettings getOutputFormatSetting(const Settings & settings, const Context & context) |
| 76 | { |
| 77 | FormatSettings format_settings; |
| 78 | format_settings.json.quote_64bit_integers = settings.output_format_json_quote_64bit_integers; |
| 79 | format_settings.json.quote_denormals = settings.output_format_json_quote_denormals; |
| 80 | format_settings.json.escape_forward_slashes = settings.output_format_json_escape_forward_slashes; |
| 81 | format_settings.csv.delimiter = settings.format_csv_delimiter; |
| 82 | format_settings.csv.allow_single_quotes = settings.format_csv_allow_single_quotes; |
| 83 | format_settings.csv.allow_double_quotes = settings.format_csv_allow_double_quotes; |
| 84 | format_settings.pretty.max_rows = settings.output_format_pretty_max_rows; |
| 85 | format_settings.pretty.max_column_pad_width = settings.output_format_pretty_max_column_pad_width; |
| 86 | format_settings.pretty.color = settings.output_format_pretty_color; |
| 87 | format_settings.template_settings.resultset_format = settings.format_template_resultset; |
| 88 | format_settings.template_settings.row_format = settings.format_template_row; |
| 89 | format_settings.template_settings.row_between_delimiter = settings.format_template_rows_between_delimiter; |
| 90 | format_settings.write_statistics = settings.output_format_write_statistics; |
| 91 | format_settings.parquet.row_group_size = settings.output_format_parquet_row_group_size; |
| 92 | format_settings.schema.format_schema = settings.format_schema; |
| 93 | format_settings.schema.format_schema_path = context.getFormatSchemaPath(); |
| 94 | format_settings.schema.is_server = context.hasGlobalContext() && (context.getGlobalContext().getApplicationType() == Context::ApplicationType::SERVER); |
| 95 | format_settings.custom.result_before_delimiter = settings.format_custom_result_before_delimiter; |
| 96 | format_settings.custom.result_after_delimiter = settings.format_custom_result_after_delimiter; |
| 97 | format_settings.custom.escaping_rule = settings.format_custom_escaping_rule; |
| 98 | format_settings.custom.field_delimiter = settings.format_custom_field_delimiter; |
| 99 | format_settings.custom.row_before_delimiter = settings.format_custom_row_before_delimiter; |
| 100 | format_settings.custom.row_after_delimiter = settings.format_custom_row_after_delimiter; |
| 101 | format_settings.custom.row_between_delimiter = settings.format_custom_row_between_delimiter; |
| 102 | |
| 103 | return format_settings; |
| 104 | } |
| 105 | |
| 106 | |
| 107 | BlockInputStreamPtr FormatFactory::getInput( |
| 108 | const String & name, |
| 109 | ReadBuffer & buf, |
| 110 | const Block & sample, |
| 111 | const Context & context, |
| 112 | UInt64 max_block_size, |
| 113 | ReadCallback callback) const |
| 114 | { |
| 115 | if (name == "Native" ) |
| 116 | return std::make_shared<NativeBlockInputStream>(buf, sample, 0); |
| 117 | |
| 118 | if (!getCreators(name).input_processor_creator) |
| 119 | { |
| 120 | const auto & input_getter = getCreators(name).input_creator; |
| 121 | if (!input_getter) |
| 122 | throw Exception("Format " + name + " is not suitable for input" , ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT); |
| 123 | |
| 124 | const Settings & settings = context.getSettingsRef(); |
| 125 | FormatSettings format_settings = getInputFormatSetting(settings, context); |
| 126 | |
| 127 | return input_getter(buf, sample, max_block_size, callback ? callback : ReadCallback(), format_settings); |
| 128 | } |
| 129 | |
| 130 | const Settings & settings = context.getSettingsRef(); |
| 131 | const auto & file_segmentation_engine = getCreators(name).file_segmentation_engine; |
| 132 | |
| 133 | // Doesn't make sense to use parallel parsing with less than four threads |
| 134 | // (segmentator + two parsers + reader). |
| 135 | if (settings.input_format_parallel_parsing |
| 136 | && file_segmentation_engine |
| 137 | && settings.max_threads >= 4) |
| 138 | { |
| 139 | const auto & input_getter = getCreators(name).input_processor_creator; |
| 140 | if (!input_getter) |
| 141 | throw Exception("Format " + name + " is not suitable for input" , ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT); |
| 142 | |
| 143 | FormatSettings format_settings = getInputFormatSetting(settings, context); |
| 144 | |
| 145 | RowInputFormatParams row_input_format_params; |
| 146 | row_input_format_params.max_block_size = max_block_size; |
| 147 | row_input_format_params.allow_errors_num = format_settings.input_allow_errors_num; |
| 148 | row_input_format_params.allow_errors_ratio = format_settings.input_allow_errors_ratio; |
| 149 | row_input_format_params.callback = std::move(callback); |
| 150 | row_input_format_params.max_execution_time = settings.max_execution_time; |
| 151 | row_input_format_params.timeout_overflow_mode = settings.timeout_overflow_mode; |
| 152 | |
| 153 | auto input_creator_params = ParallelParsingBlockInputStream::InputCreatorParams{sample, row_input_format_params, format_settings}; |
| 154 | ParallelParsingBlockInputStream::Params params{buf, input_getter, |
| 155 | input_creator_params, file_segmentation_engine, |
| 156 | static_cast<int>(settings.max_threads), |
| 157 | settings.min_chunk_bytes_for_parallel_parsing}; |
| 158 | return std::make_shared<ParallelParsingBlockInputStream>(params); |
| 159 | } |
| 160 | |
| 161 | auto format = getInputFormat(name, buf, sample, context, max_block_size, std::move(callback)); |
| 162 | return std::make_shared<InputStreamFromInputFormat>(std::move(format)); |
| 163 | } |
| 164 | |
| 165 | |
| 166 | BlockOutputStreamPtr FormatFactory::getOutput( |
| 167 | const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback) const |
| 168 | { |
| 169 | if (name == "PrettyCompactMonoBlock" ) |
| 170 | { |
| 171 | /// TODO: rewrite |
| 172 | auto format = getOutputFormat("PrettyCompact" , buf, sample, context); |
| 173 | auto res = std::make_shared<SquashingBlockOutputStream>( |
| 174 | std::make_shared<OutputStreamToOutputFormat>(format), |
| 175 | sample, context.getSettingsRef().output_format_pretty_max_rows, 0); |
| 176 | |
| 177 | res->disableFlush(); |
| 178 | |
| 179 | return std::make_shared<MaterializingBlockOutputStream>(res, sample); |
| 180 | } |
| 181 | |
| 182 | if (!getCreators(name).output_processor_creator) |
| 183 | { |
| 184 | const auto & output_getter = getCreators(name).output_creator; |
| 185 | if (!output_getter) |
| 186 | throw Exception("Format " + name + " is not suitable for output" , ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT); |
| 187 | |
| 188 | const Settings & settings = context.getSettingsRef(); |
| 189 | FormatSettings format_settings = getOutputFormatSetting(settings, context); |
| 190 | |
| 191 | /** Materialization is needed, because formats can use the functions `IDataType`, |
| 192 | * which only work with full columns. |
| 193 | */ |
| 194 | return std::make_shared<MaterializingBlockOutputStream>( |
| 195 | output_getter(buf, sample, std::move(callback), format_settings), sample); |
| 196 | } |
| 197 | |
| 198 | auto format = getOutputFormat(name, buf, sample, context, std::move(callback)); |
| 199 | return std::make_shared<MaterializingBlockOutputStream>(std::make_shared<OutputStreamToOutputFormat>(format), sample); |
| 200 | } |
| 201 | |
| 202 | |
| 203 | InputFormatPtr FormatFactory::getInputFormat( |
| 204 | const String & name, |
| 205 | ReadBuffer & buf, |
| 206 | const Block & sample, |
| 207 | const Context & context, |
| 208 | UInt64 max_block_size, |
| 209 | ReadCallback callback) const |
| 210 | { |
| 211 | const auto & input_getter = getCreators(name).input_processor_creator; |
| 212 | if (!input_getter) |
| 213 | throw Exception("Format " + name + " is not suitable for input" , ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT); |
| 214 | |
| 215 | const Settings & settings = context.getSettingsRef(); |
| 216 | FormatSettings format_settings = getInputFormatSetting(settings, context); |
| 217 | |
| 218 | RowInputFormatParams params; |
| 219 | params.max_block_size = max_block_size; |
| 220 | params.allow_errors_num = format_settings.input_allow_errors_num; |
| 221 | params.allow_errors_ratio = format_settings.input_allow_errors_ratio; |
| 222 | params.callback = std::move(callback); |
| 223 | params.max_execution_time = settings.max_execution_time; |
| 224 | params.timeout_overflow_mode = settings.timeout_overflow_mode; |
| 225 | |
| 226 | auto format = input_getter(buf, sample, params, format_settings); |
| 227 | |
| 228 | /// It's a kludge. Because I cannot remove context from values format. |
| 229 | if (auto * values = typeid_cast<ValuesBlockInputFormat *>(format.get())) |
| 230 | values->setContext(context); |
| 231 | |
| 232 | return format; |
| 233 | } |
| 234 | |
| 235 | |
| 236 | OutputFormatPtr FormatFactory::getOutputFormat( |
| 237 | const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback) const |
| 238 | { |
| 239 | const auto & output_getter = getCreators(name).output_processor_creator; |
| 240 | if (!output_getter) |
| 241 | throw Exception("Format " + name + " is not suitable for output" , ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT); |
| 242 | |
| 243 | const Settings & settings = context.getSettingsRef(); |
| 244 | FormatSettings format_settings = getOutputFormatSetting(settings, context); |
| 245 | |
| 246 | /** TODO: Materialization is needed, because formats can use the functions `IDataType`, |
| 247 | * which only work with full columns. |
| 248 | */ |
| 249 | auto format = output_getter(buf, sample, std::move(callback), format_settings); |
| 250 | |
| 251 | /// It's a kludge. Because I cannot remove context from MySQL format. |
| 252 | if (auto * mysql = typeid_cast<MySQLOutputFormat *>(format.get())) |
| 253 | mysql->setContext(context); |
| 254 | |
| 255 | return format; |
| 256 | } |
| 257 | |
| 258 | |
| 259 | void FormatFactory::registerInputFormat(const String & name, InputCreator input_creator) |
| 260 | { |
| 261 | auto & target = dict[name].input_creator; |
| 262 | if (target) |
| 263 | throw Exception("FormatFactory: Input format " + name + " is already registered" , ErrorCodes::LOGICAL_ERROR); |
| 264 | target = std::move(input_creator); |
| 265 | } |
| 266 | |
| 267 | void FormatFactory::registerOutputFormat(const String & name, OutputCreator output_creator) |
| 268 | { |
| 269 | auto & target = dict[name].output_creator; |
| 270 | if (target) |
| 271 | throw Exception("FormatFactory: Output format " + name + " is already registered" , ErrorCodes::LOGICAL_ERROR); |
| 272 | target = std::move(output_creator); |
| 273 | } |
| 274 | |
| 275 | void FormatFactory::registerInputFormatProcessor(const String & name, InputProcessorCreator input_creator) |
| 276 | { |
| 277 | auto & target = dict[name].input_processor_creator; |
| 278 | if (target) |
| 279 | throw Exception("FormatFactory: Input format " + name + " is already registered" , ErrorCodes::LOGICAL_ERROR); |
| 280 | target = std::move(input_creator); |
| 281 | } |
| 282 | |
| 283 | void FormatFactory::registerOutputFormatProcessor(const String & name, OutputProcessorCreator output_creator) |
| 284 | { |
| 285 | auto & target = dict[name].output_processor_creator; |
| 286 | if (target) |
| 287 | throw Exception("FormatFactory: Output format " + name + " is already registered" , ErrorCodes::LOGICAL_ERROR); |
| 288 | target = std::move(output_creator); |
| 289 | } |
| 290 | |
| 291 | void FormatFactory::registerFileSegmentationEngine(const String & name, FileSegmentationEngine file_segmentation_engine) |
| 292 | { |
| 293 | auto & target = dict[name].file_segmentation_engine; |
| 294 | if (target) |
| 295 | throw Exception("FormatFactory: File segmentation engine " + name + " is already registered" , ErrorCodes::LOGICAL_ERROR); |
| 296 | target = std::move(file_segmentation_engine); |
| 297 | } |
| 298 | |
/** Populate the factory with every built-in format.
  *
  * The register* helpers called here are not defined in this part of the file. Presumably each one
  * calls registerInputFormat / registerOutputFormat / registerInputFormatProcessor /
  * registerOutputFormatProcessor / registerFileSegmentationEngine on `*this` - verify.
  * Those methods throw LOGICAL_ERROR if the same slot for a name is filled twice, so each
  * (name, kind) pair must be registered by exactly one helper.
  */
FormatFactory::FormatFactory()
{
    /// Stream-based (non-processor) Native input and output.
    registerInputFormatNative(*this);
    registerOutputFormatNative(*this);

    registerOutputFormatProcessorJSONEachRowWithProgress(*this);

    /// Processor-based formats; most register both input and output, some (per helper name) only input.
    registerInputFormatProcessorNative(*this);
    registerOutputFormatProcessorNative(*this);
    registerInputFormatProcessorRowBinary(*this);
    registerOutputFormatProcessorRowBinary(*this);
    registerInputFormatProcessorTabSeparated(*this);
    registerOutputFormatProcessorTabSeparated(*this);
    registerInputFormatProcessorValues(*this);
    registerOutputFormatProcessorValues(*this);
    registerInputFormatProcessorCSV(*this);
    registerOutputFormatProcessorCSV(*this);
    registerInputFormatProcessorTSKV(*this);
    registerOutputFormatProcessorTSKV(*this);
    registerInputFormatProcessorJSONEachRow(*this);
    registerOutputFormatProcessorJSONEachRow(*this);
    registerInputFormatProcessorJSONCompactEachRow(*this);
    registerOutputFormatProcessorJSONCompactEachRow(*this);
    registerInputFormatProcessorProtobuf(*this);
    registerOutputFormatProcessorProtobuf(*this);
    registerInputFormatProcessorCapnProto(*this);
    registerInputFormatProcessorORC(*this);
    registerInputFormatProcessorParquet(*this);
    registerOutputFormatProcessorParquet(*this);
    registerInputFormatProcessorTemplate(*this);
    registerOutputFormatProcessorTemplate(*this);

    /// File segmentation engines: getInput only considers parallel parsing for formats that have one.
    registerFileSegmentationEngineTabSeparated(*this);
    registerFileSegmentationEngineCSV(*this);
    registerFileSegmentationEngineJSONEachRow(*this);

    registerOutputFormatNull(*this);

    /// Output-only processor formats (judging by helper names).
    registerOutputFormatProcessorPretty(*this);
    registerOutputFormatProcessorPrettyCompact(*this);
    registerOutputFormatProcessorPrettySpace(*this);
    registerOutputFormatProcessorVertical(*this);
    registerOutputFormatProcessorJSON(*this);
    registerOutputFormatProcessorJSONCompact(*this);
    registerOutputFormatProcessorXML(*this);
    registerOutputFormatProcessorODBCDriver(*this);
    registerOutputFormatProcessorODBCDriver2(*this);
    registerOutputFormatProcessorNull(*this);
    registerOutputFormatProcessorMySQLWrite(*this);
}
| 349 | |
| 350 | FormatFactory & FormatFactory::instance() |
| 351 | { |
| 352 | static FormatFactory ret; |
| 353 | return ret; |
| 354 | } |
| 355 | |
| 356 | } |
| 357 | |