1#include <IO/WriteHelpers.h>
2#include <IO/WriteBufferValidUTF8.h>
3#include <Processors/Formats/Impl/JSONRowOutputFormat.h>
4#include <Formats/FormatFactory.h>
5
6
7namespace DB
8{
9
10JSONRowOutputFormat::JSONRowOutputFormat(WriteBuffer & out_, const Block & header, FormatFactory::WriteCallback callback, const FormatSettings & settings_)
11 : IRowOutputFormat(header, out_, callback), settings(settings_)
12{
13 auto & sample = getPort(PortKind::Main).getHeader();
14 NamesAndTypesList columns(sample.getNamesAndTypesList());
15 fields.assign(columns.begin(), columns.end());
16
17 bool need_validate_utf8 = false;
18 for (size_t i = 0; i < sample.columns(); ++i)
19 {
20 if (!sample.getByPosition(i).type->textCanContainOnlyValidUTF8())
21 need_validate_utf8 = true;
22
23 WriteBufferFromOwnString buf;
24 writeJSONString(fields[i].name, buf, settings);
25
26 fields[i].name = buf.str();
27 }
28
29 if (need_validate_utf8)
30 {
31 validating_ostr = std::make_unique<WriteBufferValidUTF8>(out);
32 ostr = validating_ostr.get();
33 }
34 else
35 ostr = &out;
36}
37
38
39void JSONRowOutputFormat::writePrefix()
40{
41 writeCString("{\n", *ostr);
42 writeCString("\t\"meta\":\n", *ostr);
43 writeCString("\t[\n", *ostr);
44
45 for (size_t i = 0; i < fields.size(); ++i)
46 {
47 writeCString("\t\t{\n", *ostr);
48
49 writeCString("\t\t\t\"name\": ", *ostr);
50 writeString(fields[i].name, *ostr);
51 writeCString(",\n", *ostr);
52 writeCString("\t\t\t\"type\": ", *ostr);
53 writeJSONString(fields[i].type->getName(), *ostr, settings);
54 writeChar('\n', *ostr);
55
56 writeCString("\t\t}", *ostr);
57 if (i + 1 < fields.size())
58 writeChar(',', *ostr);
59 writeChar('\n', *ostr);
60 }
61
62 writeCString("\t],\n", *ostr);
63 writeChar('\n', *ostr);
64 writeCString("\t\"data\":\n", *ostr);
65 writeCString("\t[\n", *ostr);
66}
67
68
69void JSONRowOutputFormat::writeField(const IColumn & column, const IDataType & type, size_t row_num)
70{
71 writeCString("\t\t\t", *ostr);
72 writeString(fields[field_number].name, *ostr);
73 writeCString(": ", *ostr);
74 type.serializeAsTextJSON(column, row_num, *ostr, settings);
75 ++field_number;
76}
77
78void JSONRowOutputFormat::writeTotalsField(const IColumn & column, const IDataType & type, size_t row_num)
79{
80 writeCString("\t\t", *ostr);
81 writeString(fields[field_number].name, *ostr);
82 writeCString(": ", *ostr);
83 type.serializeAsTextJSON(column, row_num, *ostr, settings);
84 ++field_number;
85}
86
87void JSONRowOutputFormat::writeFieldDelimiter()
88{
89 writeCString(",\n", *ostr);
90}
91
92
93void JSONRowOutputFormat::writeRowStartDelimiter()
94{
95 writeCString("\t\t{\n", *ostr);
96}
97
98
99void JSONRowOutputFormat::writeRowEndDelimiter()
100{
101 writeChar('\n', *ostr);
102 writeCString("\t\t}", *ostr);
103 field_number = 0;
104 ++row_count;
105}
106
107
108void JSONRowOutputFormat::writeRowBetweenDelimiter()
109{
110 writeCString(",\n", *ostr);
111}
112
113
114void JSONRowOutputFormat::writeSuffix()
115{
116 writeChar('\n', *ostr);
117 writeCString("\t]", *ostr);
118}
119
120void JSONRowOutputFormat::writeBeforeTotals()
121{
122 writeCString(",\n", *ostr);
123 writeChar('\n', *ostr);
124 writeCString("\t\"totals\":\n", *ostr);
125 writeCString("\t{\n", *ostr);
126}
127
128void JSONRowOutputFormat::writeTotals(const Columns & columns, size_t row_num)
129{
130 size_t num_columns = columns.size();
131
132 for (size_t i = 0; i < num_columns; ++i)
133 {
134 if (i != 0)
135 writeTotalsFieldDelimiter();
136
137 writeTotalsField(*columns[i], *types[i], row_num);
138 }
139}
140
141void JSONRowOutputFormat::writeAfterTotals()
142{
143 writeChar('\n', *ostr);
144 writeCString("\t}", *ostr);
145 field_number = 0;
146}
147
148void JSONRowOutputFormat::writeBeforeExtremes()
149{
150 writeCString(",\n", *ostr);
151 writeChar('\n', *ostr);
152 writeCString("\t\"extremes\":\n", *ostr);
153 writeCString("\t{\n", *ostr);
154}
155
156void JSONRowOutputFormat::writeExtremesElement(const char * title, const Columns & columns, size_t row_num)
157{
158 writeCString("\t\t\"", *ostr);
159 writeCString(title, *ostr);
160 writeCString("\":\n", *ostr);
161 writeCString("\t\t{\n", *ostr);
162
163 size_t extremes_columns = columns.size();
164 for (size_t i = 0; i < extremes_columns; ++i)
165 {
166 if (i != 0)
167 writeFieldDelimiter();
168
169 writeField(*columns[i], *types[i], row_num);
170 }
171
172 writeChar('\n', *ostr);
173 writeCString("\t\t}", *ostr);
174 field_number = 0;
175}
176
177void JSONRowOutputFormat::writeMinExtreme(const Columns & columns, size_t row_num)
178{
179 writeExtremesElement("min", columns, row_num);
180}
181
182void JSONRowOutputFormat::writeMaxExtreme(const Columns & columns, size_t row_num)
183{
184 writeExtremesElement("max", columns, row_num);
185}
186
187void JSONRowOutputFormat::writeAfterExtremes()
188{
189 writeChar('\n', *ostr);
190 writeCString("\t}", *ostr);
191}
192
193void JSONRowOutputFormat::writeLastSuffix()
194{
195 writeCString(",\n\n", *ostr);
196 writeCString("\t\"rows\": ", *ostr);
197 writeIntText(row_count, *ostr);
198
199 writeRowsBeforeLimitAtLeast();
200
201 if (settings.write_statistics)
202 writeStatistics();
203
204 writeChar('\n', *ostr);
205 writeCString("}\n", *ostr);
206 ostr->next();
207}
208
209void JSONRowOutputFormat::writeRowsBeforeLimitAtLeast()
210{
211 if (applied_limit)
212 {
213 writeCString(",\n\n", *ostr);
214 writeCString("\t\"rows_before_limit_at_least\": ", *ostr);
215 writeIntText(rows_before_limit, *ostr);
216 }
217}
218
219void JSONRowOutputFormat::writeStatistics()
220{
221 writeCString(",\n\n", *ostr);
222 writeCString("\t\"statistics\":\n", *ostr);
223 writeCString("\t{\n", *ostr);
224
225 writeCString("\t\t\"elapsed\": ", *ostr);
226 writeText(watch.elapsedSeconds(), *ostr);
227 writeCString(",\n", *ostr);
228 writeCString("\t\t\"rows_read\": ", *ostr);
229 writeText(progress.read_rows.load(), *ostr);
230 writeCString(",\n", *ostr);
231 writeCString("\t\t\"bytes_read\": ", *ostr);
232 writeText(progress.read_bytes.load(), *ostr);
233 writeChar('\n', *ostr);
234
235 writeCString("\t}", *ostr);
236}
237
238void JSONRowOutputFormat::onProgress(const Progress & value)
239{
240 progress.incrementPiecewiseAtomically(value);
241}
242
243
244void registerOutputFormatProcessorJSON(FormatFactory & factory)
245{
246 factory.registerOutputFormatProcessor("JSON", [](
247 WriteBuffer & buf,
248 const Block & sample,
249 FormatFactory::WriteCallback callback,
250 const FormatSettings & format_settings)
251 {
252 return std::make_shared<JSONRowOutputFormat>(buf, sample, callback, format_settings);
253 });
254}
255
256}
257