1 | #include <IO/WriteHelpers.h> |
2 | #include <IO/WriteBufferValidUTF8.h> |
3 | #include <Processors/Formats/Impl/XMLRowOutputFormat.h> |
4 | #include <Formats/FormatFactory.h> |
5 | |
6 | |
7 | namespace DB |
8 | { |
9 | |
10 | XMLRowOutputFormat::XMLRowOutputFormat(WriteBuffer & out_, const Block & , FormatFactory::WriteCallback callback, const FormatSettings & format_settings_) |
11 | : IRowOutputFormat(header_, out_, callback), format_settings(format_settings_) |
12 | { |
13 | auto & sample = getPort(PortKind::Main).getHeader(); |
14 | NamesAndTypesList columns(sample.getNamesAndTypesList()); |
15 | fields.assign(columns.begin(), columns.end()); |
16 | field_tag_names.resize(sample.columns()); |
17 | |
18 | bool need_validate_utf8 = false; |
19 | for (size_t i = 0; i < sample.columns(); ++i) |
20 | { |
21 | if (!sample.getByPosition(i).type->textCanContainOnlyValidUTF8()) |
22 | need_validate_utf8 = true; |
23 | |
24 | /// As element names, we will use the column name if it has a valid form, or "field", otherwise. |
25 | /// The condition below is more strict than the XML standard requires. |
26 | bool is_column_name_suitable = true; |
27 | const char * begin = fields[i].name.data(); |
28 | const char * end = begin + fields[i].name.size(); |
29 | for (const char * pos = begin; pos != end; ++pos) |
30 | { |
31 | char c = *pos; |
32 | if (!(isAlphaASCII(c) |
33 | || (pos != begin && isNumericASCII(c)) |
34 | || c == '_' |
35 | || c == '-' |
36 | || c == '.')) |
37 | { |
38 | is_column_name_suitable = false; |
39 | break; |
40 | } |
41 | } |
42 | |
43 | field_tag_names[i] = is_column_name_suitable |
44 | ? fields[i].name |
45 | : "field" ; |
46 | } |
47 | |
48 | if (need_validate_utf8) |
49 | { |
50 | validating_ostr = std::make_unique<WriteBufferValidUTF8>(out); |
51 | ostr = validating_ostr.get(); |
52 | } |
53 | else |
54 | ostr = &out; |
55 | } |
56 | |
57 | |
58 | void XMLRowOutputFormat::writePrefix() |
59 | { |
60 | writeCString("<?xml version='1.0' encoding='UTF-8' ?>\n" , *ostr); |
61 | writeCString("<result>\n" , *ostr); |
62 | writeCString("\t<meta>\n" , *ostr); |
63 | writeCString("\t\t<columns>\n" , *ostr); |
64 | |
65 | for (const auto & field : fields) |
66 | { |
67 | writeCString("\t\t\t<column>\n" , *ostr); |
68 | |
69 | writeCString("\t\t\t\t<name>" , *ostr); |
70 | writeXMLString(field.name, *ostr); |
71 | writeCString("</name>\n" , *ostr); |
72 | writeCString("\t\t\t\t<type>" , *ostr); |
73 | writeXMLString(field.type->getName(), *ostr); |
74 | writeCString("</type>\n" , *ostr); |
75 | |
76 | writeCString("\t\t\t</column>\n" , *ostr); |
77 | } |
78 | |
79 | writeCString("\t\t</columns>\n" , *ostr); |
80 | writeCString("\t</meta>\n" , *ostr); |
81 | writeCString("\t<data>\n" , *ostr); |
82 | } |
83 | |
84 | |
85 | void XMLRowOutputFormat::writeField(const IColumn & column, const IDataType & type, size_t row_num) |
86 | { |
87 | writeCString("\t\t\t<" , *ostr); |
88 | writeString(field_tag_names[field_number], *ostr); |
89 | writeCString(">" , *ostr); |
90 | type.serializeAsTextXML(column, row_num, *ostr, format_settings); |
91 | writeCString("</" , *ostr); |
92 | writeString(field_tag_names[field_number], *ostr); |
93 | writeCString(">\n" , *ostr); |
94 | ++field_number; |
95 | } |
96 | |
97 | |
98 | void XMLRowOutputFormat::writeRowStartDelimiter() |
99 | { |
100 | writeCString("\t\t<row>\n" , *ostr); |
101 | } |
102 | |
103 | |
104 | void XMLRowOutputFormat::writeRowEndDelimiter() |
105 | { |
106 | writeCString("\t\t</row>\n" , *ostr); |
107 | field_number = 0; |
108 | ++row_count; |
109 | } |
110 | |
111 | |
112 | void XMLRowOutputFormat::writeSuffix() |
113 | { |
114 | writeCString("\t</data>\n" , *ostr); |
115 | |
116 | } |
117 | |
118 | |
119 | void XMLRowOutputFormat::writeBeforeTotals() |
120 | { |
121 | writeCString("\t<totals>\n" , *ostr); |
122 | } |
123 | |
124 | void XMLRowOutputFormat::writeTotals(const Columns & columns, size_t row_num) |
125 | { |
126 | size_t totals_columns = columns.size(); |
127 | auto & = getPort(PortKind::Totals).getHeader(); |
128 | for (size_t i = 0; i < totals_columns; ++i) |
129 | { |
130 | const ColumnWithTypeAndName & column = header.safeGetByPosition(i); |
131 | |
132 | writeCString("\t\t<" , *ostr); |
133 | writeString(field_tag_names[i], *ostr); |
134 | writeCString(">" , *ostr); |
135 | column.type->serializeAsTextXML(*columns[i], row_num, *ostr, format_settings); |
136 | writeCString("</" , *ostr); |
137 | writeString(field_tag_names[i], *ostr); |
138 | writeCString(">\n" , *ostr); |
139 | } |
140 | } |
141 | |
142 | void XMLRowOutputFormat::writeAfterTotals() |
143 | { |
144 | writeCString("\t</totals>\n" , *ostr); |
145 | } |
146 | |
147 | |
148 | void XMLRowOutputFormat::writeBeforeExtremes() |
149 | { |
150 | writeCString("\t<extremes>\n" , *ostr); |
151 | } |
152 | |
153 | void XMLRowOutputFormat::writeMinExtreme(const Columns & columns, size_t row_num) |
154 | { |
155 | writeExtremesElement("min" , columns, row_num); |
156 | } |
157 | |
158 | void XMLRowOutputFormat::writeMaxExtreme(const Columns & columns, size_t row_num) |
159 | { |
160 | writeExtremesElement("max" , columns, row_num); |
161 | } |
162 | |
163 | void XMLRowOutputFormat::writeAfterExtremes() |
164 | { |
165 | writeCString("\t</extremes>\n" , *ostr); |
166 | } |
167 | |
168 | void XMLRowOutputFormat::writeExtremesElement(const char * title, const Columns & columns, size_t row_num) |
169 | { |
170 | auto & = getPort(PortKind::Extremes).getHeader(); |
171 | |
172 | writeCString("\t\t<" , *ostr); |
173 | writeCString(title, *ostr); |
174 | writeCString(">\n" , *ostr); |
175 | |
176 | size_t extremes_columns = columns.size(); |
177 | for (size_t i = 0; i < extremes_columns; ++i) |
178 | { |
179 | const ColumnWithTypeAndName & column = header.safeGetByPosition(i); |
180 | |
181 | writeCString("\t\t\t<" , *ostr); |
182 | writeString(field_tag_names[i], *ostr); |
183 | writeCString(">" , *ostr); |
184 | column.type->serializeAsTextXML(*columns[i], row_num, *ostr, format_settings); |
185 | writeCString("</" , *ostr); |
186 | writeString(field_tag_names[i], *ostr); |
187 | writeCString(">\n" , *ostr); |
188 | } |
189 | |
190 | writeCString("\t\t</" , *ostr); |
191 | writeCString(title, *ostr); |
192 | writeCString(">\n" , *ostr); |
193 | } |
194 | |
195 | |
196 | void XMLRowOutputFormat::onProgress(const Progress & value) |
197 | { |
198 | progress.incrementPiecewiseAtomically(value); |
199 | } |
200 | |
201 | void XMLRowOutputFormat::writeLastSuffix() |
202 | { |
203 | |
204 | writeCString("\t<rows>" , *ostr); |
205 | writeIntText(row_count, *ostr); |
206 | writeCString("</rows>\n" , *ostr); |
207 | |
208 | writeRowsBeforeLimitAtLeast(); |
209 | |
210 | if (format_settings.write_statistics) |
211 | writeStatistics(); |
212 | |
213 | writeCString("</result>\n" , *ostr); |
214 | ostr->next(); |
215 | } |
216 | |
217 | void XMLRowOutputFormat::writeRowsBeforeLimitAtLeast() |
218 | { |
219 | if (applied_limit) |
220 | { |
221 | writeCString("\t<rows_before_limit_at_least>" , *ostr); |
222 | writeIntText(rows_before_limit, *ostr); |
223 | writeCString("</rows_before_limit_at_least>\n" , *ostr); |
224 | } |
225 | } |
226 | |
227 | void XMLRowOutputFormat::writeStatistics() |
228 | { |
229 | writeCString("\t<statistics>\n" , *ostr); |
230 | writeCString("\t\t<elapsed>" , *ostr); |
231 | writeText(watch.elapsedSeconds(), *ostr); |
232 | writeCString("</elapsed>\n" , *ostr); |
233 | writeCString("\t\t<rows_read>" , *ostr); |
234 | writeText(progress.read_rows.load(), *ostr); |
235 | writeCString("</rows_read>\n" , *ostr); |
236 | writeCString("\t\t<bytes_read>" , *ostr); |
237 | writeText(progress.read_bytes.load(), *ostr); |
238 | writeCString("</bytes_read>\n" , *ostr); |
239 | writeCString("\t</statistics>\n" , *ostr); |
240 | } |
241 | |
242 | |
243 | void registerOutputFormatProcessorXML(FormatFactory & factory) |
244 | { |
245 | factory.registerOutputFormatProcessor("XML" , []( |
246 | WriteBuffer & buf, |
247 | const Block & sample, |
248 | FormatFactory::WriteCallback callback, |
249 | const FormatSettings & settings) |
250 | { |
251 | return std::make_shared<XMLRowOutputFormat>(buf, sample, callback, settings); |
252 | }); |
253 | } |
254 | |
255 | } |
256 | |