1#include <Processors/Formats/Impl/TemplateBlockOutputFormat.h>
2#include <Formats/FormatFactory.h>
3#include <IO/WriteHelpers.h>
4#include <DataTypes/DataTypesNumber.h>
5#include <Interpreters/Context.h>
6
7
8namespace DB
9{
10
11namespace ErrorCodes
12{
13 extern const int SYNTAX_ERROR;
14}
15
16TemplateBlockOutputFormat::TemplateBlockOutputFormat(const Block & header_, WriteBuffer & out_, const FormatSettings & settings_,
17 ParsedTemplateFormatString format_, ParsedTemplateFormatString row_format_,
18 std::string row_between_delimiter_)
19 : IOutputFormat(header_, out_), settings(settings_), format(std::move(format_))
20 , row_format(std::move(row_format_)), row_between_delimiter(std::move(row_between_delimiter_))
21{
22 auto & sample = getPort(PortKind::Main).getHeader();
23 size_t columns = sample.columns();
24 types.resize(columns);
25 for (size_t i = 0; i < columns; ++i)
26 types[i] = sample.safeGetByPosition(i).type;
27
28 /// Validate format string for whole output
29 size_t data_idx = format.format_idx_to_column_idx.size() + 1;
30 for (size_t i = 0; i < format.format_idx_to_column_idx.size(); ++i)
31 {
32 if (!format.format_idx_to_column_idx[i])
33 format.throwInvalidFormat("Output part name cannot be empty.", i);
34 switch (*format.format_idx_to_column_idx[i])
35 {
36 case static_cast<size_t>(ResultsetPart::Data):
37 data_idx = i;
38 [[fallthrough]];
39 case static_cast<size_t>(ResultsetPart::Totals):
40 case static_cast<size_t>(ResultsetPart::ExtremesMin):
41 case static_cast<size_t>(ResultsetPart::ExtremesMax):
42 if (format.formats[i] != ColumnFormat::None)
43 format.throwInvalidFormat("Serialization type for data, totals, min and max must be empty or None", i);
44 break;
45 case static_cast<size_t>(ResultsetPart::Rows):
46 case static_cast<size_t>(ResultsetPart::RowsBeforeLimit):
47 case static_cast<size_t>(ResultsetPart::TimeElapsed):
48 case static_cast<size_t>(ResultsetPart::RowsRead):
49 case static_cast<size_t>(ResultsetPart::BytesRead):
50 if (format.formats[i] == ColumnFormat::None)
51 format.throwInvalidFormat("Serialization type for output part rows, rows_before_limit, time, "
52 "rows_read or bytes_read is not specified", i);
53 break;
54 default:
55 format.throwInvalidFormat("Invalid output part", i);
56 }
57 }
58 if (data_idx != 0)
59 format.throwInvalidFormat("${data} must be the first output part", 0);
60
61 /// Validate format string for rows
62 if (row_format.delimiters.size() == 1)
63 row_format.throwInvalidFormat("No columns specified", 0);
64 for (size_t i = 0; i < row_format.columnsCount(); ++i)
65 {
66 if (!row_format.format_idx_to_column_idx[i])
67 row_format.throwInvalidFormat("Cannot skip format field for output, it's a bug.", i);
68 if (header_.columns() <= *row_format.format_idx_to_column_idx[i])
69 row_format.throwInvalidFormat("Column index " + std::to_string(*row_format.format_idx_to_column_idx[i]) +
70 " must be less then number of columns (" + std::to_string(header_.columns()) + ")", i);
71 if (row_format.formats[i] == ColumnFormat::None)
72 row_format.throwInvalidFormat("Serialization type for file column is not specified", i);
73 }
74}
75
76TemplateBlockOutputFormat::ResultsetPart TemplateBlockOutputFormat::stringToResultsetPart(const String & part)
77{
78 if (part == "data")
79 return ResultsetPart::Data;
80 else if (part == "totals")
81 return ResultsetPart::Totals;
82 else if (part == "min")
83 return ResultsetPart::ExtremesMin;
84 else if (part == "max")
85 return ResultsetPart::ExtremesMax;
86 else if (part == "rows")
87 return ResultsetPart::Rows;
88 else if (part == "rows_before_limit")
89 return ResultsetPart::RowsBeforeLimit;
90 else if (part == "time")
91 return ResultsetPart::TimeElapsed;
92 else if (part == "rows_read")
93 return ResultsetPart::RowsRead;
94 else if (part == "bytes_read")
95 return ResultsetPart::BytesRead;
96 else
97 throw Exception("Unknown output part " + part, ErrorCodes::SYNTAX_ERROR);
98}
99
100void TemplateBlockOutputFormat::writeRow(const Chunk & chunk, size_t row_num)
101{
102 size_t columns = row_format.format_idx_to_column_idx.size();
103 for (size_t j = 0; j < columns; ++j)
104 {
105 writeString(row_format.delimiters[j], out);
106
107 size_t col_idx = *row_format.format_idx_to_column_idx[j];
108 serializeField(*chunk.getColumns()[col_idx], *types[col_idx], row_num, row_format.formats[j]);
109 }
110 writeString(row_format.delimiters[columns], out);
111}
112
113void TemplateBlockOutputFormat::serializeField(const IColumn & column, const IDataType & type, size_t row_num, ColumnFormat col_format)
114{
115 switch (col_format)
116 {
117 case ColumnFormat::Escaped:
118 type.serializeAsTextEscaped(column, row_num, out, settings);
119 break;
120 case ColumnFormat::Quoted:
121 type.serializeAsTextQuoted(column, row_num, out, settings);
122 break;
123 case ColumnFormat::Csv:
124 type.serializeAsTextCSV(column, row_num, out, settings);
125 break;
126 case ColumnFormat::Json:
127 type.serializeAsTextJSON(column, row_num, out, settings);
128 break;
129 case ColumnFormat::Xml:
130 type.serializeAsTextXML(column, row_num, out, settings);
131 break;
132 case ColumnFormat::Raw:
133 type.serializeAsText(column, row_num, out, settings);
134 break;
135 default:
136 __builtin_unreachable();
137 }
138}
139
140template <typename U, typename V> void TemplateBlockOutputFormat::writeValue(U value, ColumnFormat col_format)
141{
142 auto type = std::make_unique<V>();
143 auto col = type->createColumn();
144 col->insert(value);
145 serializeField(*col, *type, 0, col_format);
146}
147
148void TemplateBlockOutputFormat::consume(Chunk chunk)
149{
150 doWritePrefix();
151
152 size_t rows = chunk.getNumRows();
153
154 for (size_t i = 0; i < rows; ++i)
155 {
156 if (row_count)
157 writeString(row_between_delimiter, out);
158
159 writeRow(chunk, i);
160 ++row_count;
161 }
162}
163
164void TemplateBlockOutputFormat::doWritePrefix()
165{
166 if (need_write_prefix)
167 {
168 writeString(format.delimiters.front(), out);
169 need_write_prefix = false;
170 }
171}
172
173void TemplateBlockOutputFormat::finalize()
174{
175 if (finalized)
176 return;
177
178 doWritePrefix();
179
180 size_t parts = format.format_idx_to_column_idx.size();
181
182 for (size_t i = 0; i < parts; ++i)
183 {
184 auto type = std::make_shared<DataTypeUInt64>();
185 ColumnWithTypeAndName col(type->createColumnConst(1, row_count), type, String("tmp"));
186 switch (static_cast<ResultsetPart>(*format.format_idx_to_column_idx[i]))
187 {
188 case ResultsetPart::Totals:
189 if (!totals)
190 format.throwInvalidFormat("Cannot print totals for this request", i);
191 writeRow(totals, 0);
192 break;
193 case ResultsetPart::ExtremesMin:
194 if (!extremes)
195 format.throwInvalidFormat("Cannot print extremes for this request", i);
196 writeRow(extremes, 0);
197 break;
198 case ResultsetPart::ExtremesMax:
199 if (!extremes)
200 format.throwInvalidFormat("Cannot print extremes for this request", i);
201 writeRow(extremes, 1);
202 break;
203 case ResultsetPart::Rows:
204 writeValue<size_t, DataTypeUInt64>(row_count, format.formats[i]);
205 break;
206 case ResultsetPart::RowsBeforeLimit:
207 if (!rows_before_limit_set)
208 format.throwInvalidFormat("Cannot print rows_before_limit for this request", i);
209 writeValue<size_t, DataTypeUInt64>(rows_before_limit, format.formats[i]);
210 break;
211 case ResultsetPart::TimeElapsed:
212 writeValue<double, DataTypeFloat64>(watch.elapsedSeconds(), format.formats[i]);
213 break;
214 case ResultsetPart::RowsRead:
215 writeValue<size_t, DataTypeUInt64>(progress.read_rows.load(), format.formats[i]);
216 break;
217 case ResultsetPart::BytesRead:
218 writeValue<size_t, DataTypeUInt64>(progress.read_bytes.load(), format.formats[i]);
219 break;
220 default:
221 break;
222 }
223 writeString(format.delimiters[i + 1], out);
224 }
225
226 finalized = true;
227}
228
229
230void registerOutputFormatProcessorTemplate(FormatFactory & factory)
231{
232 factory.registerOutputFormatProcessor("Template", [](
233 WriteBuffer & buf,
234 const Block & sample,
235 FormatFactory::WriteCallback,
236 const FormatSettings & settings)
237 {
238 ParsedTemplateFormatString resultset_format;
239 if (settings.template_settings.resultset_format.empty())
240 {
241 /// Default format string: "${data}"
242 resultset_format.delimiters.resize(2);
243 resultset_format.formats.emplace_back(ParsedTemplateFormatString::ColumnFormat::None);
244 resultset_format.format_idx_to_column_idx.emplace_back(0);
245 resultset_format.column_names.emplace_back("data");
246 }
247 else
248 {
249 /// Read format string from file
250 resultset_format = ParsedTemplateFormatString(
251 FormatSchemaInfo(settings.template_settings.resultset_format, "Template", false,
252 settings.schema.is_server, settings.schema.format_schema_path),
253 [&](const String & partName)
254 {
255 return static_cast<size_t>(TemplateBlockOutputFormat::stringToResultsetPart(partName));
256 });
257 }
258
259 ParsedTemplateFormatString row_format = ParsedTemplateFormatString(
260 FormatSchemaInfo(settings.template_settings.row_format, "Template", false,
261 settings.schema.is_server, settings.schema.format_schema_path),
262 [&](const String & colName)
263 {
264 return sample.getPositionByName(colName);
265 });
266
267 return std::make_shared<TemplateBlockOutputFormat>(sample, buf, settings, resultset_format, row_format, settings.template_settings.row_between_delimiter);
268 });
269
270 factory.registerOutputFormatProcessor("CustomSeparated", [](
271 WriteBuffer & buf,
272 const Block & sample,
273 FormatFactory::WriteCallback,
274 const FormatSettings & settings)
275 {
276 ParsedTemplateFormatString resultset_format = ParsedTemplateFormatString::setupCustomSeparatedResultsetFormat(settings.custom);
277 ParsedTemplateFormatString row_format = ParsedTemplateFormatString::setupCustomSeparatedRowFormat(settings.custom, sample);
278
279 return std::make_shared<TemplateBlockOutputFormat>(sample, buf, settings, resultset_format, row_format, settings.custom.row_between_delimiter);
280 });
281}
282}
283