1 | #include <IO/WriteHelpers.h> |
2 | #include <IO/WriteBufferValidUTF8.h> |
3 | #include <Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.h> |
4 | #include <Formats/FormatFactory.h> |
5 | |
6 | |
7 | namespace DB |
8 | { |
9 | |
10 | |
11 | JSONCompactEachRowRowOutputFormat::JSONCompactEachRowRowOutputFormat(WriteBuffer & out_, |
12 | const Block & , |
13 | FormatFactory::WriteCallback callback, |
14 | const FormatSettings & settings_, |
15 | bool with_names_) |
16 | : IRowOutputFormat(header_, out_, callback), settings(settings_), with_names(with_names_) |
17 | { |
18 | auto & sample = getPort(PortKind::Main).getHeader(); |
19 | NamesAndTypesList columns(sample.getNamesAndTypesList()); |
20 | fields.assign(columns.begin(), columns.end()); |
21 | } |
22 | |
23 | |
24 | void JSONCompactEachRowRowOutputFormat::writeField(const IColumn & column, const IDataType & type, size_t row_num) |
25 | { |
26 | type.serializeAsTextJSON(column, row_num, out, settings); |
27 | } |
28 | |
29 | |
30 | void JSONCompactEachRowRowOutputFormat::writeFieldDelimiter() |
31 | { |
32 | writeCString(", " , out); |
33 | } |
34 | |
35 | |
36 | void JSONCompactEachRowRowOutputFormat::writeRowStartDelimiter() |
37 | { |
38 | writeChar('[', out); |
39 | } |
40 | |
41 | |
42 | void JSONCompactEachRowRowOutputFormat::writeRowEndDelimiter() |
43 | { |
44 | writeCString("]\n" , out); |
45 | } |
46 | |
47 | void JSONCompactEachRowRowOutputFormat::writeTotals(const Columns & columns, size_t row_num) |
48 | { |
49 | writeChar('\n', out); |
50 | size_t num_columns = columns.size(); |
51 | writeChar('[', out); |
52 | for (size_t i = 0; i < num_columns; ++i) |
53 | { |
54 | if (i != 0) |
55 | JSONCompactEachRowRowOutputFormat::writeFieldDelimiter(); |
56 | |
57 | JSONCompactEachRowRowOutputFormat::writeField(*columns[i], *types[i], row_num); |
58 | } |
59 | writeCString("]\n" , out); |
60 | } |
61 | |
62 | void JSONCompactEachRowRowOutputFormat::writePrefix() |
63 | { |
64 | if (with_names) |
65 | { |
66 | writeChar('[', out); |
67 | for (size_t i = 0; i < fields.size(); ++i) |
68 | { |
69 | writeChar('\"', out); |
70 | writeString(fields[i].name, out); |
71 | writeChar('\"', out); |
72 | if (i != fields.size() - 1) |
73 | writeCString(", " , out); |
74 | } |
75 | writeCString("]\n[" , out); |
76 | for (size_t i = 0; i < fields.size(); ++i) |
77 | { |
78 | writeJSONString(fields[i].type->getName(), out, settings); |
79 | if (i != fields.size() - 1) |
80 | writeCString(", " , out); |
81 | } |
82 | writeCString("]\n" , out); |
83 | } |
84 | } |
85 | |
86 | void JSONCompactEachRowRowOutputFormat::consumeTotals(DB::Chunk chunk) |
87 | { |
88 | if (with_names) |
89 | IRowOutputFormat::consumeTotals(std::move(chunk)); |
90 | } |
91 | |
92 | void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory) |
93 | { |
94 | factory.registerOutputFormatProcessor("JSONCompactEachRow" , []( |
95 | WriteBuffer & buf, |
96 | const Block & sample, |
97 | FormatFactory::WriteCallback callback, |
98 | const FormatSettings & format_settings) |
99 | { |
100 | return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, callback, format_settings, false); |
101 | }); |
102 | |
103 | factory.registerOutputFormatProcessor("JSONCompactEachRowWithNamesAndTypes" , []( |
104 | WriteBuffer &buf, |
105 | const Block &sample, |
106 | FormatFactory::WriteCallback callback, |
107 | const FormatSettings &format_settings) |
108 | { |
109 | return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, callback, format_settings, true); |
110 | }); |
111 | } |
112 | |
113 | |
114 | } |
115 | |