1 | #include <IO/WriteHelpers.h> |
2 | #include <IO/WriteBufferValidUTF8.h> |
3 | #include <Processors/Formats/Impl/JSONRowOutputFormat.h> |
4 | #include <Formats/FormatFactory.h> |
5 | |
6 | |
7 | namespace DB |
8 | { |
9 | |
10 | JSONRowOutputFormat::JSONRowOutputFormat(WriteBuffer & out_, const Block & , FormatFactory::WriteCallback callback, const FormatSettings & settings_) |
11 | : IRowOutputFormat(header, out_, callback), settings(settings_) |
12 | { |
13 | auto & sample = getPort(PortKind::Main).getHeader(); |
14 | NamesAndTypesList columns(sample.getNamesAndTypesList()); |
15 | fields.assign(columns.begin(), columns.end()); |
16 | |
17 | bool need_validate_utf8 = false; |
18 | for (size_t i = 0; i < sample.columns(); ++i) |
19 | { |
20 | if (!sample.getByPosition(i).type->textCanContainOnlyValidUTF8()) |
21 | need_validate_utf8 = true; |
22 | |
23 | WriteBufferFromOwnString buf; |
24 | writeJSONString(fields[i].name, buf, settings); |
25 | |
26 | fields[i].name = buf.str(); |
27 | } |
28 | |
29 | if (need_validate_utf8) |
30 | { |
31 | validating_ostr = std::make_unique<WriteBufferValidUTF8>(out); |
32 | ostr = validating_ostr.get(); |
33 | } |
34 | else |
35 | ostr = &out; |
36 | } |
37 | |
38 | |
39 | void JSONRowOutputFormat::writePrefix() |
40 | { |
41 | writeCString("{\n" , *ostr); |
42 | writeCString("\t\"meta\":\n" , *ostr); |
43 | writeCString("\t[\n" , *ostr); |
44 | |
45 | for (size_t i = 0; i < fields.size(); ++i) |
46 | { |
47 | writeCString("\t\t{\n" , *ostr); |
48 | |
49 | writeCString("\t\t\t\"name\": " , *ostr); |
50 | writeString(fields[i].name, *ostr); |
51 | writeCString(",\n" , *ostr); |
52 | writeCString("\t\t\t\"type\": " , *ostr); |
53 | writeJSONString(fields[i].type->getName(), *ostr, settings); |
54 | writeChar('\n', *ostr); |
55 | |
56 | writeCString("\t\t}" , *ostr); |
57 | if (i + 1 < fields.size()) |
58 | writeChar(',', *ostr); |
59 | writeChar('\n', *ostr); |
60 | } |
61 | |
62 | writeCString("\t],\n" , *ostr); |
63 | writeChar('\n', *ostr); |
64 | writeCString("\t\"data\":\n" , *ostr); |
65 | writeCString("\t[\n" , *ostr); |
66 | } |
67 | |
68 | |
69 | void JSONRowOutputFormat::writeField(const IColumn & column, const IDataType & type, size_t row_num) |
70 | { |
71 | writeCString("\t\t\t" , *ostr); |
72 | writeString(fields[field_number].name, *ostr); |
73 | writeCString(": " , *ostr); |
74 | type.serializeAsTextJSON(column, row_num, *ostr, settings); |
75 | ++field_number; |
76 | } |
77 | |
78 | void JSONRowOutputFormat::writeTotalsField(const IColumn & column, const IDataType & type, size_t row_num) |
79 | { |
80 | writeCString("\t\t" , *ostr); |
81 | writeString(fields[field_number].name, *ostr); |
82 | writeCString(": " , *ostr); |
83 | type.serializeAsTextJSON(column, row_num, *ostr, settings); |
84 | ++field_number; |
85 | } |
86 | |
87 | void JSONRowOutputFormat::writeFieldDelimiter() |
88 | { |
89 | writeCString(",\n" , *ostr); |
90 | } |
91 | |
92 | |
93 | void JSONRowOutputFormat::writeRowStartDelimiter() |
94 | { |
95 | writeCString("\t\t{\n" , *ostr); |
96 | } |
97 | |
98 | |
99 | void JSONRowOutputFormat::writeRowEndDelimiter() |
100 | { |
101 | writeChar('\n', *ostr); |
102 | writeCString("\t\t}" , *ostr); |
103 | field_number = 0; |
104 | ++row_count; |
105 | } |
106 | |
107 | |
108 | void JSONRowOutputFormat::writeRowBetweenDelimiter() |
109 | { |
110 | writeCString(",\n" , *ostr); |
111 | } |
112 | |
113 | |
114 | void JSONRowOutputFormat::writeSuffix() |
115 | { |
116 | writeChar('\n', *ostr); |
117 | writeCString("\t]" , *ostr); |
118 | } |
119 | |
120 | void JSONRowOutputFormat::writeBeforeTotals() |
121 | { |
122 | writeCString(",\n" , *ostr); |
123 | writeChar('\n', *ostr); |
124 | writeCString("\t\"totals\":\n" , *ostr); |
125 | writeCString("\t{\n" , *ostr); |
126 | } |
127 | |
128 | void JSONRowOutputFormat::writeTotals(const Columns & columns, size_t row_num) |
129 | { |
130 | size_t num_columns = columns.size(); |
131 | |
132 | for (size_t i = 0; i < num_columns; ++i) |
133 | { |
134 | if (i != 0) |
135 | writeTotalsFieldDelimiter(); |
136 | |
137 | writeTotalsField(*columns[i], *types[i], row_num); |
138 | } |
139 | } |
140 | |
141 | void JSONRowOutputFormat::writeAfterTotals() |
142 | { |
143 | writeChar('\n', *ostr); |
144 | writeCString("\t}" , *ostr); |
145 | field_number = 0; |
146 | } |
147 | |
148 | void JSONRowOutputFormat::writeBeforeExtremes() |
149 | { |
150 | writeCString(",\n" , *ostr); |
151 | writeChar('\n', *ostr); |
152 | writeCString("\t\"extremes\":\n" , *ostr); |
153 | writeCString("\t{\n" , *ostr); |
154 | } |
155 | |
156 | void JSONRowOutputFormat::writeExtremesElement(const char * title, const Columns & columns, size_t row_num) |
157 | { |
158 | writeCString("\t\t\"" , *ostr); |
159 | writeCString(title, *ostr); |
160 | writeCString("\":\n" , *ostr); |
161 | writeCString("\t\t{\n" , *ostr); |
162 | |
163 | size_t extremes_columns = columns.size(); |
164 | for (size_t i = 0; i < extremes_columns; ++i) |
165 | { |
166 | if (i != 0) |
167 | writeFieldDelimiter(); |
168 | |
169 | writeField(*columns[i], *types[i], row_num); |
170 | } |
171 | |
172 | writeChar('\n', *ostr); |
173 | writeCString("\t\t}" , *ostr); |
174 | field_number = 0; |
175 | } |
176 | |
177 | void JSONRowOutputFormat::writeMinExtreme(const Columns & columns, size_t row_num) |
178 | { |
179 | writeExtremesElement("min" , columns, row_num); |
180 | } |
181 | |
182 | void JSONRowOutputFormat::writeMaxExtreme(const Columns & columns, size_t row_num) |
183 | { |
184 | writeExtremesElement("max" , columns, row_num); |
185 | } |
186 | |
187 | void JSONRowOutputFormat::writeAfterExtremes() |
188 | { |
189 | writeChar('\n', *ostr); |
190 | writeCString("\t}" , *ostr); |
191 | } |
192 | |
193 | void JSONRowOutputFormat::writeLastSuffix() |
194 | { |
195 | writeCString(",\n\n" , *ostr); |
196 | writeCString("\t\"rows\": " , *ostr); |
197 | writeIntText(row_count, *ostr); |
198 | |
199 | writeRowsBeforeLimitAtLeast(); |
200 | |
201 | if (settings.write_statistics) |
202 | writeStatistics(); |
203 | |
204 | writeChar('\n', *ostr); |
205 | writeCString("}\n" , *ostr); |
206 | ostr->next(); |
207 | } |
208 | |
209 | void JSONRowOutputFormat::writeRowsBeforeLimitAtLeast() |
210 | { |
211 | if (applied_limit) |
212 | { |
213 | writeCString(",\n\n" , *ostr); |
214 | writeCString("\t\"rows_before_limit_at_least\": " , *ostr); |
215 | writeIntText(rows_before_limit, *ostr); |
216 | } |
217 | } |
218 | |
219 | void JSONRowOutputFormat::writeStatistics() |
220 | { |
221 | writeCString(",\n\n" , *ostr); |
222 | writeCString("\t\"statistics\":\n" , *ostr); |
223 | writeCString("\t{\n" , *ostr); |
224 | |
225 | writeCString("\t\t\"elapsed\": " , *ostr); |
226 | writeText(watch.elapsedSeconds(), *ostr); |
227 | writeCString(",\n" , *ostr); |
228 | writeCString("\t\t\"rows_read\": " , *ostr); |
229 | writeText(progress.read_rows.load(), *ostr); |
230 | writeCString(",\n" , *ostr); |
231 | writeCString("\t\t\"bytes_read\": " , *ostr); |
232 | writeText(progress.read_bytes.load(), *ostr); |
233 | writeChar('\n', *ostr); |
234 | |
235 | writeCString("\t}" , *ostr); |
236 | } |
237 | |
238 | void JSONRowOutputFormat::onProgress(const Progress & value) |
239 | { |
240 | progress.incrementPiecewiseAtomically(value); |
241 | } |
242 | |
243 | |
244 | void registerOutputFormatProcessorJSON(FormatFactory & factory) |
245 | { |
246 | factory.registerOutputFormatProcessor("JSON" , []( |
247 | WriteBuffer & buf, |
248 | const Block & sample, |
249 | FormatFactory::WriteCallback callback, |
250 | const FormatSettings & format_settings) |
251 | { |
252 | return std::make_shared<JSONRowOutputFormat>(buf, sample, callback, format_settings); |
253 | }); |
254 | } |
255 | |
256 | } |
257 | |