| 1 | #include <IO/WriteHelpers.h> |
| 2 | #include <IO/WriteBufferValidUTF8.h> |
| 3 | #include <Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.h> |
| 4 | #include <Formats/FormatFactory.h> |
| 5 | |
| 6 | |
| 7 | namespace DB |
| 8 | { |
| 9 | |
| 10 | |
| 11 | JSONCompactEachRowRowOutputFormat::JSONCompactEachRowRowOutputFormat(WriteBuffer & out_, |
| 12 | const Block & , |
| 13 | FormatFactory::WriteCallback callback, |
| 14 | const FormatSettings & settings_, |
| 15 | bool with_names_) |
| 16 | : IRowOutputFormat(header_, out_, callback), settings(settings_), with_names(with_names_) |
| 17 | { |
| 18 | auto & sample = getPort(PortKind::Main).getHeader(); |
| 19 | NamesAndTypesList columns(sample.getNamesAndTypesList()); |
| 20 | fields.assign(columns.begin(), columns.end()); |
| 21 | } |
| 22 | |
| 23 | |
| 24 | void JSONCompactEachRowRowOutputFormat::writeField(const IColumn & column, const IDataType & type, size_t row_num) |
| 25 | { |
| 26 | type.serializeAsTextJSON(column, row_num, out, settings); |
| 27 | } |
| 28 | |
| 29 | |
| 30 | void JSONCompactEachRowRowOutputFormat::writeFieldDelimiter() |
| 31 | { |
| 32 | writeCString(", " , out); |
| 33 | } |
| 34 | |
| 35 | |
| 36 | void JSONCompactEachRowRowOutputFormat::writeRowStartDelimiter() |
| 37 | { |
| 38 | writeChar('[', out); |
| 39 | } |
| 40 | |
| 41 | |
| 42 | void JSONCompactEachRowRowOutputFormat::writeRowEndDelimiter() |
| 43 | { |
| 44 | writeCString("]\n" , out); |
| 45 | } |
| 46 | |
| 47 | void JSONCompactEachRowRowOutputFormat::writeTotals(const Columns & columns, size_t row_num) |
| 48 | { |
| 49 | writeChar('\n', out); |
| 50 | size_t num_columns = columns.size(); |
| 51 | writeChar('[', out); |
| 52 | for (size_t i = 0; i < num_columns; ++i) |
| 53 | { |
| 54 | if (i != 0) |
| 55 | JSONCompactEachRowRowOutputFormat::writeFieldDelimiter(); |
| 56 | |
| 57 | JSONCompactEachRowRowOutputFormat::writeField(*columns[i], *types[i], row_num); |
| 58 | } |
| 59 | writeCString("]\n" , out); |
| 60 | } |
| 61 | |
| 62 | void JSONCompactEachRowRowOutputFormat::writePrefix() |
| 63 | { |
| 64 | if (with_names) |
| 65 | { |
| 66 | writeChar('[', out); |
| 67 | for (size_t i = 0; i < fields.size(); ++i) |
| 68 | { |
| 69 | writeChar('\"', out); |
| 70 | writeString(fields[i].name, out); |
| 71 | writeChar('\"', out); |
| 72 | if (i != fields.size() - 1) |
| 73 | writeCString(", " , out); |
| 74 | } |
| 75 | writeCString("]\n[" , out); |
| 76 | for (size_t i = 0; i < fields.size(); ++i) |
| 77 | { |
| 78 | writeJSONString(fields[i].type->getName(), out, settings); |
| 79 | if (i != fields.size() - 1) |
| 80 | writeCString(", " , out); |
| 81 | } |
| 82 | writeCString("]\n" , out); |
| 83 | } |
| 84 | } |
| 85 | |
| 86 | void JSONCompactEachRowRowOutputFormat::consumeTotals(DB::Chunk chunk) |
| 87 | { |
| 88 | if (with_names) |
| 89 | IRowOutputFormat::consumeTotals(std::move(chunk)); |
| 90 | } |
| 91 | |
| 92 | void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory) |
| 93 | { |
| 94 | factory.registerOutputFormatProcessor("JSONCompactEachRow" , []( |
| 95 | WriteBuffer & buf, |
| 96 | const Block & sample, |
| 97 | FormatFactory::WriteCallback callback, |
| 98 | const FormatSettings & format_settings) |
| 99 | { |
| 100 | return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, callback, format_settings, false); |
| 101 | }); |
| 102 | |
| 103 | factory.registerOutputFormatProcessor("JSONCompactEachRowWithNamesAndTypes" , []( |
| 104 | WriteBuffer &buf, |
| 105 | const Block &sample, |
| 106 | FormatFactory::WriteCallback callback, |
| 107 | const FormatSettings &format_settings) |
| 108 | { |
| 109 | return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, callback, format_settings, true); |
| 110 | }); |
| 111 | } |
| 112 | |
| 113 | |
| 114 | } |
| 115 | |