1#include <IO/WriteHelpers.h>
2#include <IO/WriteBufferValidUTF8.h>
3#include <Processors/Formats/Impl/JSONEachRowRowOutputFormat.h>
4#include <Formats/FormatFactory.h>
5
6
7namespace DB
8{
9
10
11JSONEachRowRowOutputFormat::JSONEachRowRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & settings_)
12 : IRowOutputFormat(header_, out_, callback), settings(settings_)
13{
14 auto & sample = getPort(PortKind::Main).getHeader();
15 size_t columns = sample.columns();
16 fields.resize(columns);
17
18 for (size_t i = 0; i < columns; ++i)
19 {
20 WriteBufferFromString buf(fields[i]);
21 writeJSONString(sample.getByPosition(i).name, buf, settings);
22 }
23}
24
25
26void JSONEachRowRowOutputFormat::writeField(const IColumn & column, const IDataType & type, size_t row_num)
27{
28 writeString(fields[field_number], out);
29 writeChar(':', out);
30 type.serializeAsTextJSON(column, row_num, out, settings);
31 ++field_number;
32}
33
34
35void JSONEachRowRowOutputFormat::writeFieldDelimiter()
36{
37 writeChar(',', out);
38}
39
40
41void JSONEachRowRowOutputFormat::writeRowStartDelimiter()
42{
43 writeChar('{', out);
44}
45
46
47void JSONEachRowRowOutputFormat::writeRowEndDelimiter()
48{
49 writeCString("}\n", out);
50 field_number = 0;
51}
52
53
54void registerOutputFormatProcessorJSONEachRow(FormatFactory & factory)
55{
56 factory.registerOutputFormatProcessor("JSONEachRow", [](
57 WriteBuffer & buf,
58 const Block & sample,
59 FormatFactory::WriteCallback callback,
60 const FormatSettings & format_settings)
61 {
62 return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, callback, format_settings);
63 });
64}
65
66}
67