1 | #include <Processors/Formats/Impl/TemplateBlockOutputFormat.h> |
2 | #include <Formats/FormatFactory.h> |
3 | #include <IO/WriteHelpers.h> |
4 | #include <DataTypes/DataTypesNumber.h> |
5 | #include <Interpreters/Context.h> |
6 | |
7 | |
8 | namespace DB |
9 | { |
10 | |
11 | namespace ErrorCodes |
12 | { |
13 | extern const int SYNTAX_ERROR; |
14 | } |
15 | |
16 | TemplateBlockOutputFormat::TemplateBlockOutputFormat(const Block & , WriteBuffer & out_, const FormatSettings & settings_, |
17 | ParsedTemplateFormatString format_, ParsedTemplateFormatString row_format_, |
18 | std::string row_between_delimiter_) |
19 | : IOutputFormat(header_, out_), settings(settings_), format(std::move(format_)) |
20 | , row_format(std::move(row_format_)), row_between_delimiter(std::move(row_between_delimiter_)) |
21 | { |
22 | auto & sample = getPort(PortKind::Main).getHeader(); |
23 | size_t columns = sample.columns(); |
24 | types.resize(columns); |
25 | for (size_t i = 0; i < columns; ++i) |
26 | types[i] = sample.safeGetByPosition(i).type; |
27 | |
28 | /// Validate format string for whole output |
29 | size_t data_idx = format.format_idx_to_column_idx.size() + 1; |
30 | for (size_t i = 0; i < format.format_idx_to_column_idx.size(); ++i) |
31 | { |
32 | if (!format.format_idx_to_column_idx[i]) |
33 | format.throwInvalidFormat("Output part name cannot be empty." , i); |
34 | switch (*format.format_idx_to_column_idx[i]) |
35 | { |
36 | case static_cast<size_t>(ResultsetPart::Data): |
37 | data_idx = i; |
38 | [[fallthrough]]; |
39 | case static_cast<size_t>(ResultsetPart::Totals): |
40 | case static_cast<size_t>(ResultsetPart::ExtremesMin): |
41 | case static_cast<size_t>(ResultsetPart::ExtremesMax): |
42 | if (format.formats[i] != ColumnFormat::None) |
43 | format.throwInvalidFormat("Serialization type for data, totals, min and max must be empty or None" , i); |
44 | break; |
45 | case static_cast<size_t>(ResultsetPart::Rows): |
46 | case static_cast<size_t>(ResultsetPart::RowsBeforeLimit): |
47 | case static_cast<size_t>(ResultsetPart::TimeElapsed): |
48 | case static_cast<size_t>(ResultsetPart::RowsRead): |
49 | case static_cast<size_t>(ResultsetPart::BytesRead): |
50 | if (format.formats[i] == ColumnFormat::None) |
51 | format.throwInvalidFormat("Serialization type for output part rows, rows_before_limit, time, " |
52 | "rows_read or bytes_read is not specified" , i); |
53 | break; |
54 | default: |
55 | format.throwInvalidFormat("Invalid output part" , i); |
56 | } |
57 | } |
58 | if (data_idx != 0) |
59 | format.throwInvalidFormat("${data} must be the first output part" , 0); |
60 | |
61 | /// Validate format string for rows |
62 | if (row_format.delimiters.size() == 1) |
63 | row_format.throwInvalidFormat("No columns specified" , 0); |
64 | for (size_t i = 0; i < row_format.columnsCount(); ++i) |
65 | { |
66 | if (!row_format.format_idx_to_column_idx[i]) |
67 | row_format.throwInvalidFormat("Cannot skip format field for output, it's a bug." , i); |
68 | if (header_.columns() <= *row_format.format_idx_to_column_idx[i]) |
69 | row_format.throwInvalidFormat("Column index " + std::to_string(*row_format.format_idx_to_column_idx[i]) + |
70 | " must be less then number of columns (" + std::to_string(header_.columns()) + ")" , i); |
71 | if (row_format.formats[i] == ColumnFormat::None) |
72 | row_format.throwInvalidFormat("Serialization type for file column is not specified" , i); |
73 | } |
74 | } |
75 | |
76 | TemplateBlockOutputFormat::ResultsetPart TemplateBlockOutputFormat::stringToResultsetPart(const String & part) |
77 | { |
78 | if (part == "data" ) |
79 | return ResultsetPart::Data; |
80 | else if (part == "totals" ) |
81 | return ResultsetPart::Totals; |
82 | else if (part == "min" ) |
83 | return ResultsetPart::ExtremesMin; |
84 | else if (part == "max" ) |
85 | return ResultsetPart::ExtremesMax; |
86 | else if (part == "rows" ) |
87 | return ResultsetPart::Rows; |
88 | else if (part == "rows_before_limit" ) |
89 | return ResultsetPart::RowsBeforeLimit; |
90 | else if (part == "time" ) |
91 | return ResultsetPart::TimeElapsed; |
92 | else if (part == "rows_read" ) |
93 | return ResultsetPart::RowsRead; |
94 | else if (part == "bytes_read" ) |
95 | return ResultsetPart::BytesRead; |
96 | else |
97 | throw Exception("Unknown output part " + part, ErrorCodes::SYNTAX_ERROR); |
98 | } |
99 | |
100 | void TemplateBlockOutputFormat::writeRow(const Chunk & chunk, size_t row_num) |
101 | { |
102 | size_t columns = row_format.format_idx_to_column_idx.size(); |
103 | for (size_t j = 0; j < columns; ++j) |
104 | { |
105 | writeString(row_format.delimiters[j], out); |
106 | |
107 | size_t col_idx = *row_format.format_idx_to_column_idx[j]; |
108 | serializeField(*chunk.getColumns()[col_idx], *types[col_idx], row_num, row_format.formats[j]); |
109 | } |
110 | writeString(row_format.delimiters[columns], out); |
111 | } |
112 | |
113 | void TemplateBlockOutputFormat::serializeField(const IColumn & column, const IDataType & type, size_t row_num, ColumnFormat col_format) |
114 | { |
115 | switch (col_format) |
116 | { |
117 | case ColumnFormat::Escaped: |
118 | type.serializeAsTextEscaped(column, row_num, out, settings); |
119 | break; |
120 | case ColumnFormat::Quoted: |
121 | type.serializeAsTextQuoted(column, row_num, out, settings); |
122 | break; |
123 | case ColumnFormat::Csv: |
124 | type.serializeAsTextCSV(column, row_num, out, settings); |
125 | break; |
126 | case ColumnFormat::Json: |
127 | type.serializeAsTextJSON(column, row_num, out, settings); |
128 | break; |
129 | case ColumnFormat::Xml: |
130 | type.serializeAsTextXML(column, row_num, out, settings); |
131 | break; |
132 | case ColumnFormat::Raw: |
133 | type.serializeAsText(column, row_num, out, settings); |
134 | break; |
135 | default: |
136 | __builtin_unreachable(); |
137 | } |
138 | } |
139 | |
140 | template <typename U, typename V> void TemplateBlockOutputFormat::writeValue(U value, ColumnFormat col_format) |
141 | { |
142 | auto type = std::make_unique<V>(); |
143 | auto col = type->createColumn(); |
144 | col->insert(value); |
145 | serializeField(*col, *type, 0, col_format); |
146 | } |
147 | |
148 | void TemplateBlockOutputFormat::consume(Chunk chunk) |
149 | { |
150 | doWritePrefix(); |
151 | |
152 | size_t rows = chunk.getNumRows(); |
153 | |
154 | for (size_t i = 0; i < rows; ++i) |
155 | { |
156 | if (row_count) |
157 | writeString(row_between_delimiter, out); |
158 | |
159 | writeRow(chunk, i); |
160 | ++row_count; |
161 | } |
162 | } |
163 | |
164 | void TemplateBlockOutputFormat::doWritePrefix() |
165 | { |
166 | if (need_write_prefix) |
167 | { |
168 | writeString(format.delimiters.front(), out); |
169 | need_write_prefix = false; |
170 | } |
171 | } |
172 | |
173 | void TemplateBlockOutputFormat::finalize() |
174 | { |
175 | if (finalized) |
176 | return; |
177 | |
178 | doWritePrefix(); |
179 | |
180 | size_t parts = format.format_idx_to_column_idx.size(); |
181 | |
182 | for (size_t i = 0; i < parts; ++i) |
183 | { |
184 | auto type = std::make_shared<DataTypeUInt64>(); |
185 | ColumnWithTypeAndName col(type->createColumnConst(1, row_count), type, String("tmp" )); |
186 | switch (static_cast<ResultsetPart>(*format.format_idx_to_column_idx[i])) |
187 | { |
188 | case ResultsetPart::Totals: |
189 | if (!totals) |
190 | format.throwInvalidFormat("Cannot print totals for this request" , i); |
191 | writeRow(totals, 0); |
192 | break; |
193 | case ResultsetPart::ExtremesMin: |
194 | if (!extremes) |
195 | format.throwInvalidFormat("Cannot print extremes for this request" , i); |
196 | writeRow(extremes, 0); |
197 | break; |
198 | case ResultsetPart::ExtremesMax: |
199 | if (!extremes) |
200 | format.throwInvalidFormat("Cannot print extremes for this request" , i); |
201 | writeRow(extremes, 1); |
202 | break; |
203 | case ResultsetPart::Rows: |
204 | writeValue<size_t, DataTypeUInt64>(row_count, format.formats[i]); |
205 | break; |
206 | case ResultsetPart::RowsBeforeLimit: |
207 | if (!rows_before_limit_set) |
208 | format.throwInvalidFormat("Cannot print rows_before_limit for this request" , i); |
209 | writeValue<size_t, DataTypeUInt64>(rows_before_limit, format.formats[i]); |
210 | break; |
211 | case ResultsetPart::TimeElapsed: |
212 | writeValue<double, DataTypeFloat64>(watch.elapsedSeconds(), format.formats[i]); |
213 | break; |
214 | case ResultsetPart::RowsRead: |
215 | writeValue<size_t, DataTypeUInt64>(progress.read_rows.load(), format.formats[i]); |
216 | break; |
217 | case ResultsetPart::BytesRead: |
218 | writeValue<size_t, DataTypeUInt64>(progress.read_bytes.load(), format.formats[i]); |
219 | break; |
220 | default: |
221 | break; |
222 | } |
223 | writeString(format.delimiters[i + 1], out); |
224 | } |
225 | |
226 | finalized = true; |
227 | } |
228 | |
229 | |
230 | void registerOutputFormatProcessorTemplate(FormatFactory & factory) |
231 | { |
232 | factory.registerOutputFormatProcessor("Template" , []( |
233 | WriteBuffer & buf, |
234 | const Block & sample, |
235 | FormatFactory::WriteCallback, |
236 | const FormatSettings & settings) |
237 | { |
238 | ParsedTemplateFormatString resultset_format; |
239 | if (settings.template_settings.resultset_format.empty()) |
240 | { |
241 | /// Default format string: "${data}" |
242 | resultset_format.delimiters.resize(2); |
243 | resultset_format.formats.emplace_back(ParsedTemplateFormatString::ColumnFormat::None); |
244 | resultset_format.format_idx_to_column_idx.emplace_back(0); |
245 | resultset_format.column_names.emplace_back("data" ); |
246 | } |
247 | else |
248 | { |
249 | /// Read format string from file |
250 | resultset_format = ParsedTemplateFormatString( |
251 | FormatSchemaInfo(settings.template_settings.resultset_format, "Template" , false, |
252 | settings.schema.is_server, settings.schema.format_schema_path), |
253 | [&](const String & partName) |
254 | { |
255 | return static_cast<size_t>(TemplateBlockOutputFormat::stringToResultsetPart(partName)); |
256 | }); |
257 | } |
258 | |
259 | ParsedTemplateFormatString row_format = ParsedTemplateFormatString( |
260 | FormatSchemaInfo(settings.template_settings.row_format, "Template" , false, |
261 | settings.schema.is_server, settings.schema.format_schema_path), |
262 | [&](const String & colName) |
263 | { |
264 | return sample.getPositionByName(colName); |
265 | }); |
266 | |
267 | return std::make_shared<TemplateBlockOutputFormat>(sample, buf, settings, resultset_format, row_format, settings.template_settings.row_between_delimiter); |
268 | }); |
269 | |
270 | factory.registerOutputFormatProcessor("CustomSeparated" , []( |
271 | WriteBuffer & buf, |
272 | const Block & sample, |
273 | FormatFactory::WriteCallback, |
274 | const FormatSettings & settings) |
275 | { |
276 | ParsedTemplateFormatString resultset_format = ParsedTemplateFormatString::setupCustomSeparatedResultsetFormat(settings.custom); |
277 | ParsedTemplateFormatString row_format = ParsedTemplateFormatString::setupCustomSeparatedRowFormat(settings.custom, sample); |
278 | |
279 | return std::make_shared<TemplateBlockOutputFormat>(sample, buf, settings, resultset_format, row_format, settings.custom.row_between_delimiter); |
280 | }); |
281 | } |
282 | } |
283 | |