1#include <IO/WriteHelpers.h>
2#include <IO/WriteBufferValidUTF8.h>
3#include <Processors/Formats/Impl/XMLRowOutputFormat.h>
4#include <Formats/FormatFactory.h>
5
6
7namespace DB
8{
9
10XMLRowOutputFormat::XMLRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_)
11 : IRowOutputFormat(header_, out_, callback), format_settings(format_settings_)
12{
13 auto & sample = getPort(PortKind::Main).getHeader();
14 NamesAndTypesList columns(sample.getNamesAndTypesList());
15 fields.assign(columns.begin(), columns.end());
16 field_tag_names.resize(sample.columns());
17
18 bool need_validate_utf8 = false;
19 for (size_t i = 0; i < sample.columns(); ++i)
20 {
21 if (!sample.getByPosition(i).type->textCanContainOnlyValidUTF8())
22 need_validate_utf8 = true;
23
24 /// As element names, we will use the column name if it has a valid form, or "field", otherwise.
25 /// The condition below is more strict than the XML standard requires.
26 bool is_column_name_suitable = true;
27 const char * begin = fields[i].name.data();
28 const char * end = begin + fields[i].name.size();
29 for (const char * pos = begin; pos != end; ++pos)
30 {
31 char c = *pos;
32 if (!(isAlphaASCII(c)
33 || (pos != begin && isNumericASCII(c))
34 || c == '_'
35 || c == '-'
36 || c == '.'))
37 {
38 is_column_name_suitable = false;
39 break;
40 }
41 }
42
43 field_tag_names[i] = is_column_name_suitable
44 ? fields[i].name
45 : "field";
46 }
47
48 if (need_validate_utf8)
49 {
50 validating_ostr = std::make_unique<WriteBufferValidUTF8>(out);
51 ostr = validating_ostr.get();
52 }
53 else
54 ostr = &out;
55}
56
57
58void XMLRowOutputFormat::writePrefix()
59{
60 writeCString("<?xml version='1.0' encoding='UTF-8' ?>\n", *ostr);
61 writeCString("<result>\n", *ostr);
62 writeCString("\t<meta>\n", *ostr);
63 writeCString("\t\t<columns>\n", *ostr);
64
65 for (const auto & field : fields)
66 {
67 writeCString("\t\t\t<column>\n", *ostr);
68
69 writeCString("\t\t\t\t<name>", *ostr);
70 writeXMLString(field.name, *ostr);
71 writeCString("</name>\n", *ostr);
72 writeCString("\t\t\t\t<type>", *ostr);
73 writeXMLString(field.type->getName(), *ostr);
74 writeCString("</type>\n", *ostr);
75
76 writeCString("\t\t\t</column>\n", *ostr);
77 }
78
79 writeCString("\t\t</columns>\n", *ostr);
80 writeCString("\t</meta>\n", *ostr);
81 writeCString("\t<data>\n", *ostr);
82}
83
84
85void XMLRowOutputFormat::writeField(const IColumn & column, const IDataType & type, size_t row_num)
86{
87 writeCString("\t\t\t<", *ostr);
88 writeString(field_tag_names[field_number], *ostr);
89 writeCString(">", *ostr);
90 type.serializeAsTextXML(column, row_num, *ostr, format_settings);
91 writeCString("</", *ostr);
92 writeString(field_tag_names[field_number], *ostr);
93 writeCString(">\n", *ostr);
94 ++field_number;
95}
96
97
98void XMLRowOutputFormat::writeRowStartDelimiter()
99{
100 writeCString("\t\t<row>\n", *ostr);
101}
102
103
104void XMLRowOutputFormat::writeRowEndDelimiter()
105{
106 writeCString("\t\t</row>\n", *ostr);
107 field_number = 0;
108 ++row_count;
109}
110
111
112void XMLRowOutputFormat::writeSuffix()
113{
114 writeCString("\t</data>\n", *ostr);
115
116}
117
118
119void XMLRowOutputFormat::writeBeforeTotals()
120{
121 writeCString("\t<totals>\n", *ostr);
122}
123
124void XMLRowOutputFormat::writeTotals(const Columns & columns, size_t row_num)
125{
126 size_t totals_columns = columns.size();
127 auto & header = getPort(PortKind::Totals).getHeader();
128 for (size_t i = 0; i < totals_columns; ++i)
129 {
130 const ColumnWithTypeAndName & column = header.safeGetByPosition(i);
131
132 writeCString("\t\t<", *ostr);
133 writeString(field_tag_names[i], *ostr);
134 writeCString(">", *ostr);
135 column.type->serializeAsTextXML(*columns[i], row_num, *ostr, format_settings);
136 writeCString("</", *ostr);
137 writeString(field_tag_names[i], *ostr);
138 writeCString(">\n", *ostr);
139 }
140}
141
142void XMLRowOutputFormat::writeAfterTotals()
143{
144 writeCString("\t</totals>\n", *ostr);
145}
146
147
148void XMLRowOutputFormat::writeBeforeExtremes()
149{
150 writeCString("\t<extremes>\n", *ostr);
151}
152
153void XMLRowOutputFormat::writeMinExtreme(const Columns & columns, size_t row_num)
154{
155 writeExtremesElement("min", columns, row_num);
156}
157
158void XMLRowOutputFormat::writeMaxExtreme(const Columns & columns, size_t row_num)
159{
160 writeExtremesElement("max", columns, row_num);
161}
162
163void XMLRowOutputFormat::writeAfterExtremes()
164{
165 writeCString("\t</extremes>\n", *ostr);
166}
167
168void XMLRowOutputFormat::writeExtremesElement(const char * title, const Columns & columns, size_t row_num)
169{
170 auto & header = getPort(PortKind::Extremes).getHeader();
171
172 writeCString("\t\t<", *ostr);
173 writeCString(title, *ostr);
174 writeCString(">\n", *ostr);
175
176 size_t extremes_columns = columns.size();
177 for (size_t i = 0; i < extremes_columns; ++i)
178 {
179 const ColumnWithTypeAndName & column = header.safeGetByPosition(i);
180
181 writeCString("\t\t\t<", *ostr);
182 writeString(field_tag_names[i], *ostr);
183 writeCString(">", *ostr);
184 column.type->serializeAsTextXML(*columns[i], row_num, *ostr, format_settings);
185 writeCString("</", *ostr);
186 writeString(field_tag_names[i], *ostr);
187 writeCString(">\n", *ostr);
188 }
189
190 writeCString("\t\t</", *ostr);
191 writeCString(title, *ostr);
192 writeCString(">\n", *ostr);
193}
194
195
196void XMLRowOutputFormat::onProgress(const Progress & value)
197{
198 progress.incrementPiecewiseAtomically(value);
199}
200
201void XMLRowOutputFormat::writeLastSuffix()
202{
203
204 writeCString("\t<rows>", *ostr);
205 writeIntText(row_count, *ostr);
206 writeCString("</rows>\n", *ostr);
207
208 writeRowsBeforeLimitAtLeast();
209
210 if (format_settings.write_statistics)
211 writeStatistics();
212
213 writeCString("</result>\n", *ostr);
214 ostr->next();
215}
216
217void XMLRowOutputFormat::writeRowsBeforeLimitAtLeast()
218{
219 if (applied_limit)
220 {
221 writeCString("\t<rows_before_limit_at_least>", *ostr);
222 writeIntText(rows_before_limit, *ostr);
223 writeCString("</rows_before_limit_at_least>\n", *ostr);
224 }
225}
226
227void XMLRowOutputFormat::writeStatistics()
228{
229 writeCString("\t<statistics>\n", *ostr);
230 writeCString("\t\t<elapsed>", *ostr);
231 writeText(watch.elapsedSeconds(), *ostr);
232 writeCString("</elapsed>\n", *ostr);
233 writeCString("\t\t<rows_read>", *ostr);
234 writeText(progress.read_rows.load(), *ostr);
235 writeCString("</rows_read>\n", *ostr);
236 writeCString("\t\t<bytes_read>", *ostr);
237 writeText(progress.read_bytes.load(), *ostr);
238 writeCString("</bytes_read>\n", *ostr);
239 writeCString("\t</statistics>\n", *ostr);
240}
241
242
243void registerOutputFormatProcessorXML(FormatFactory & factory)
244{
245 factory.registerOutputFormatProcessor("XML", [](
246 WriteBuffer & buf,
247 const Block & sample,
248 FormatFactory::WriteCallback callback,
249 const FormatSettings & settings)
250 {
251 return std::make_shared<XMLRowOutputFormat>(buf, sample, callback, settings);
252 });
253}
254
255}
256