1#include <IO/WriteHelpers.h>
2#include <IO/WriteBufferValidUTF8.h>
3#include <Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.h>
4#include <Formats/FormatFactory.h>
5
6
7namespace DB
8{
9
10
11JSONCompactEachRowRowOutputFormat::JSONCompactEachRowRowOutputFormat(WriteBuffer & out_,
12 const Block & header_,
13 FormatFactory::WriteCallback callback,
14 const FormatSettings & settings_,
15 bool with_names_)
16 : IRowOutputFormat(header_, out_, callback), settings(settings_), with_names(with_names_)
17{
18 auto & sample = getPort(PortKind::Main).getHeader();
19 NamesAndTypesList columns(sample.getNamesAndTypesList());
20 fields.assign(columns.begin(), columns.end());
21}
22
23
24void JSONCompactEachRowRowOutputFormat::writeField(const IColumn & column, const IDataType & type, size_t row_num)
25{
26 type.serializeAsTextJSON(column, row_num, out, settings);
27}
28
29
30void JSONCompactEachRowRowOutputFormat::writeFieldDelimiter()
31{
32 writeCString(", ", out);
33}
34
35
36void JSONCompactEachRowRowOutputFormat::writeRowStartDelimiter()
37{
38 writeChar('[', out);
39}
40
41
42void JSONCompactEachRowRowOutputFormat::writeRowEndDelimiter()
43{
44 writeCString("]\n", out);
45}
46
47void JSONCompactEachRowRowOutputFormat::writeTotals(const Columns & columns, size_t row_num)
48{
49 writeChar('\n', out);
50 size_t num_columns = columns.size();
51 writeChar('[', out);
52 for (size_t i = 0; i < num_columns; ++i)
53 {
54 if (i != 0)
55 JSONCompactEachRowRowOutputFormat::writeFieldDelimiter();
56
57 JSONCompactEachRowRowOutputFormat::writeField(*columns[i], *types[i], row_num);
58 }
59 writeCString("]\n", out);
60}
61
62void JSONCompactEachRowRowOutputFormat::writePrefix()
63{
64 if (with_names)
65 {
66 writeChar('[', out);
67 for (size_t i = 0; i < fields.size(); ++i)
68 {
69 writeChar('\"', out);
70 writeString(fields[i].name, out);
71 writeChar('\"', out);
72 if (i != fields.size() - 1)
73 writeCString(", ", out);
74 }
75 writeCString("]\n[", out);
76 for (size_t i = 0; i < fields.size(); ++i)
77 {
78 writeJSONString(fields[i].type->getName(), out, settings);
79 if (i != fields.size() - 1)
80 writeCString(", ", out);
81 }
82 writeCString("]\n", out);
83 }
84}
85
86void JSONCompactEachRowRowOutputFormat::consumeTotals(DB::Chunk chunk)
87{
88 if (with_names)
89 IRowOutputFormat::consumeTotals(std::move(chunk));
90}
91
92void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory)
93{
94 factory.registerOutputFormatProcessor("JSONCompactEachRow", [](
95 WriteBuffer & buf,
96 const Block & sample,
97 FormatFactory::WriteCallback callback,
98 const FormatSettings & format_settings)
99 {
100 return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, callback, format_settings, false);
101 });
102
103 factory.registerOutputFormatProcessor("JSONCompactEachRowWithNamesAndTypes", [](
104 WriteBuffer &buf,
105 const Block &sample,
106 FormatFactory::WriteCallback callback,
107 const FormatSettings &format_settings)
108 {
109 return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, callback, format_settings, true);
110 });
111}
112
113
114}
115