| 1 | #include <IO/WriteHelpers.h> |
| 2 | #include <IO/WriteBufferValidUTF8.h> |
| 3 | #include <Processors/Formats/Impl/JSONRowOutputFormat.h> |
| 4 | #include <Formats/FormatFactory.h> |
| 5 | |
| 6 | |
| 7 | namespace DB |
| 8 | { |
| 9 | |
| 10 | JSONRowOutputFormat::JSONRowOutputFormat(WriteBuffer & out_, const Block & , FormatFactory::WriteCallback callback, const FormatSettings & settings_) |
| 11 | : IRowOutputFormat(header, out_, callback), settings(settings_) |
| 12 | { |
| 13 | auto & sample = getPort(PortKind::Main).getHeader(); |
| 14 | NamesAndTypesList columns(sample.getNamesAndTypesList()); |
| 15 | fields.assign(columns.begin(), columns.end()); |
| 16 | |
| 17 | bool need_validate_utf8 = false; |
| 18 | for (size_t i = 0; i < sample.columns(); ++i) |
| 19 | { |
| 20 | if (!sample.getByPosition(i).type->textCanContainOnlyValidUTF8()) |
| 21 | need_validate_utf8 = true; |
| 22 | |
| 23 | WriteBufferFromOwnString buf; |
| 24 | writeJSONString(fields[i].name, buf, settings); |
| 25 | |
| 26 | fields[i].name = buf.str(); |
| 27 | } |
| 28 | |
| 29 | if (need_validate_utf8) |
| 30 | { |
| 31 | validating_ostr = std::make_unique<WriteBufferValidUTF8>(out); |
| 32 | ostr = validating_ostr.get(); |
| 33 | } |
| 34 | else |
| 35 | ostr = &out; |
| 36 | } |
| 37 | |
| 38 | |
| 39 | void JSONRowOutputFormat::writePrefix() |
| 40 | { |
| 41 | writeCString("{\n" , *ostr); |
| 42 | writeCString("\t\"meta\":\n" , *ostr); |
| 43 | writeCString("\t[\n" , *ostr); |
| 44 | |
| 45 | for (size_t i = 0; i < fields.size(); ++i) |
| 46 | { |
| 47 | writeCString("\t\t{\n" , *ostr); |
| 48 | |
| 49 | writeCString("\t\t\t\"name\": " , *ostr); |
| 50 | writeString(fields[i].name, *ostr); |
| 51 | writeCString(",\n" , *ostr); |
| 52 | writeCString("\t\t\t\"type\": " , *ostr); |
| 53 | writeJSONString(fields[i].type->getName(), *ostr, settings); |
| 54 | writeChar('\n', *ostr); |
| 55 | |
| 56 | writeCString("\t\t}" , *ostr); |
| 57 | if (i + 1 < fields.size()) |
| 58 | writeChar(',', *ostr); |
| 59 | writeChar('\n', *ostr); |
| 60 | } |
| 61 | |
| 62 | writeCString("\t],\n" , *ostr); |
| 63 | writeChar('\n', *ostr); |
| 64 | writeCString("\t\"data\":\n" , *ostr); |
| 65 | writeCString("\t[\n" , *ostr); |
| 66 | } |
| 67 | |
| 68 | |
| 69 | void JSONRowOutputFormat::writeField(const IColumn & column, const IDataType & type, size_t row_num) |
| 70 | { |
| 71 | writeCString("\t\t\t" , *ostr); |
| 72 | writeString(fields[field_number].name, *ostr); |
| 73 | writeCString(": " , *ostr); |
| 74 | type.serializeAsTextJSON(column, row_num, *ostr, settings); |
| 75 | ++field_number; |
| 76 | } |
| 77 | |
| 78 | void JSONRowOutputFormat::writeTotalsField(const IColumn & column, const IDataType & type, size_t row_num) |
| 79 | { |
| 80 | writeCString("\t\t" , *ostr); |
| 81 | writeString(fields[field_number].name, *ostr); |
| 82 | writeCString(": " , *ostr); |
| 83 | type.serializeAsTextJSON(column, row_num, *ostr, settings); |
| 84 | ++field_number; |
| 85 | } |
| 86 | |
| 87 | void JSONRowOutputFormat::writeFieldDelimiter() |
| 88 | { |
| 89 | writeCString(",\n" , *ostr); |
| 90 | } |
| 91 | |
| 92 | |
| 93 | void JSONRowOutputFormat::writeRowStartDelimiter() |
| 94 | { |
| 95 | writeCString("\t\t{\n" , *ostr); |
| 96 | } |
| 97 | |
| 98 | |
| 99 | void JSONRowOutputFormat::writeRowEndDelimiter() |
| 100 | { |
| 101 | writeChar('\n', *ostr); |
| 102 | writeCString("\t\t}" , *ostr); |
| 103 | field_number = 0; |
| 104 | ++row_count; |
| 105 | } |
| 106 | |
| 107 | |
| 108 | void JSONRowOutputFormat::writeRowBetweenDelimiter() |
| 109 | { |
| 110 | writeCString(",\n" , *ostr); |
| 111 | } |
| 112 | |
| 113 | |
| 114 | void JSONRowOutputFormat::writeSuffix() |
| 115 | { |
| 116 | writeChar('\n', *ostr); |
| 117 | writeCString("\t]" , *ostr); |
| 118 | } |
| 119 | |
| 120 | void JSONRowOutputFormat::writeBeforeTotals() |
| 121 | { |
| 122 | writeCString(",\n" , *ostr); |
| 123 | writeChar('\n', *ostr); |
| 124 | writeCString("\t\"totals\":\n" , *ostr); |
| 125 | writeCString("\t{\n" , *ostr); |
| 126 | } |
| 127 | |
| 128 | void JSONRowOutputFormat::writeTotals(const Columns & columns, size_t row_num) |
| 129 | { |
| 130 | size_t num_columns = columns.size(); |
| 131 | |
| 132 | for (size_t i = 0; i < num_columns; ++i) |
| 133 | { |
| 134 | if (i != 0) |
| 135 | writeTotalsFieldDelimiter(); |
| 136 | |
| 137 | writeTotalsField(*columns[i], *types[i], row_num); |
| 138 | } |
| 139 | } |
| 140 | |
| 141 | void JSONRowOutputFormat::writeAfterTotals() |
| 142 | { |
| 143 | writeChar('\n', *ostr); |
| 144 | writeCString("\t}" , *ostr); |
| 145 | field_number = 0; |
| 146 | } |
| 147 | |
| 148 | void JSONRowOutputFormat::writeBeforeExtremes() |
| 149 | { |
| 150 | writeCString(",\n" , *ostr); |
| 151 | writeChar('\n', *ostr); |
| 152 | writeCString("\t\"extremes\":\n" , *ostr); |
| 153 | writeCString("\t{\n" , *ostr); |
| 154 | } |
| 155 | |
| 156 | void JSONRowOutputFormat::writeExtremesElement(const char * title, const Columns & columns, size_t row_num) |
| 157 | { |
| 158 | writeCString("\t\t\"" , *ostr); |
| 159 | writeCString(title, *ostr); |
| 160 | writeCString("\":\n" , *ostr); |
| 161 | writeCString("\t\t{\n" , *ostr); |
| 162 | |
| 163 | size_t extremes_columns = columns.size(); |
| 164 | for (size_t i = 0; i < extremes_columns; ++i) |
| 165 | { |
| 166 | if (i != 0) |
| 167 | writeFieldDelimiter(); |
| 168 | |
| 169 | writeField(*columns[i], *types[i], row_num); |
| 170 | } |
| 171 | |
| 172 | writeChar('\n', *ostr); |
| 173 | writeCString("\t\t}" , *ostr); |
| 174 | field_number = 0; |
| 175 | } |
| 176 | |
| 177 | void JSONRowOutputFormat::writeMinExtreme(const Columns & columns, size_t row_num) |
| 178 | { |
| 179 | writeExtremesElement("min" , columns, row_num); |
| 180 | } |
| 181 | |
| 182 | void JSONRowOutputFormat::writeMaxExtreme(const Columns & columns, size_t row_num) |
| 183 | { |
| 184 | writeExtremesElement("max" , columns, row_num); |
| 185 | } |
| 186 | |
| 187 | void JSONRowOutputFormat::writeAfterExtremes() |
| 188 | { |
| 189 | writeChar('\n', *ostr); |
| 190 | writeCString("\t}" , *ostr); |
| 191 | } |
| 192 | |
| 193 | void JSONRowOutputFormat::writeLastSuffix() |
| 194 | { |
| 195 | writeCString(",\n\n" , *ostr); |
| 196 | writeCString("\t\"rows\": " , *ostr); |
| 197 | writeIntText(row_count, *ostr); |
| 198 | |
| 199 | writeRowsBeforeLimitAtLeast(); |
| 200 | |
| 201 | if (settings.write_statistics) |
| 202 | writeStatistics(); |
| 203 | |
| 204 | writeChar('\n', *ostr); |
| 205 | writeCString("}\n" , *ostr); |
| 206 | ostr->next(); |
| 207 | } |
| 208 | |
| 209 | void JSONRowOutputFormat::writeRowsBeforeLimitAtLeast() |
| 210 | { |
| 211 | if (applied_limit) |
| 212 | { |
| 213 | writeCString(",\n\n" , *ostr); |
| 214 | writeCString("\t\"rows_before_limit_at_least\": " , *ostr); |
| 215 | writeIntText(rows_before_limit, *ostr); |
| 216 | } |
| 217 | } |
| 218 | |
| 219 | void JSONRowOutputFormat::writeStatistics() |
| 220 | { |
| 221 | writeCString(",\n\n" , *ostr); |
| 222 | writeCString("\t\"statistics\":\n" , *ostr); |
| 223 | writeCString("\t{\n" , *ostr); |
| 224 | |
| 225 | writeCString("\t\t\"elapsed\": " , *ostr); |
| 226 | writeText(watch.elapsedSeconds(), *ostr); |
| 227 | writeCString(",\n" , *ostr); |
| 228 | writeCString("\t\t\"rows_read\": " , *ostr); |
| 229 | writeText(progress.read_rows.load(), *ostr); |
| 230 | writeCString(",\n" , *ostr); |
| 231 | writeCString("\t\t\"bytes_read\": " , *ostr); |
| 232 | writeText(progress.read_bytes.load(), *ostr); |
| 233 | writeChar('\n', *ostr); |
| 234 | |
| 235 | writeCString("\t}" , *ostr); |
| 236 | } |
| 237 | |
| 238 | void JSONRowOutputFormat::onProgress(const Progress & value) |
| 239 | { |
| 240 | progress.incrementPiecewiseAtomically(value); |
| 241 | } |
| 242 | |
| 243 | |
| 244 | void registerOutputFormatProcessorJSON(FormatFactory & factory) |
| 245 | { |
| 246 | factory.registerOutputFormatProcessor("JSON" , []( |
| 247 | WriteBuffer & buf, |
| 248 | const Block & sample, |
| 249 | FormatFactory::WriteCallback callback, |
| 250 | const FormatSettings & format_settings) |
| 251 | { |
| 252 | return std::make_shared<JSONRowOutputFormat>(buf, sample, callback, format_settings); |
| 253 | }); |
| 254 | } |
| 255 | |
| 256 | } |
| 257 | |