| 1 | #include <IO/ReadHelpers.h> |
| 2 | |
| 3 | #include <Processors/Formats/Impl/JSONEachRowRowInputFormat.h> |
| 4 | #include <Formats/FormatFactory.h> |
| 5 | #include <DataTypes/NestedUtils.h> |
| 6 | #include <DataTypes/DataTypeNullable.h> |
| 7 | |
| 8 | namespace DB |
| 9 | { |
| 10 | |
/// Error codes thrown from this file (declared here, defined elsewhere).
namespace ErrorCodes
{
    extern const int CANNOT_READ_ALL_DATA;  /// stream ended inside a JSON object
    extern const int INCORRECT_DATA;        /// unknown or duplicate key
    extern const int LOGICAL_ERROR;         /// impossible sentinel column index
}
| 17 | |
namespace
{

/// Sentinel "column positions" stored in name_map and returned by columnIndex().
/// They sit at the very top of the size_t range, so they can never collide with a real
/// column position, and they become negative when reinterpreted as a signed size.
constexpr size_t UNKNOWN_FIELD = static_cast<size_t>(-1);  /// key matches no column
constexpr size_t NESTED_FIELD = static_cast<size_t>(-2);   /// key is the table part of a Nested column name

}
| 28 | |
| 29 | |
| 30 | JSONEachRowRowInputFormat::JSONEachRowRowInputFormat( |
| 31 | ReadBuffer & in_, const Block & , Params params_, const FormatSettings & format_settings_) |
| 32 | : IRowInputFormat(header_, in_, std::move(params_)), format_settings(format_settings_), name_map(header_.columns()) |
| 33 | { |
| 34 | /// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it. |
| 35 | skipBOMIfExists(in); |
| 36 | |
| 37 | size_t num_columns = getPort().getHeader().columns(); |
| 38 | for (size_t i = 0; i < num_columns; ++i) |
| 39 | { |
| 40 | const String & column_name = columnName(i); |
| 41 | name_map[column_name] = i; /// NOTE You could place names more cache-locally. |
| 42 | if (format_settings_.import_nested_json) |
| 43 | { |
| 44 | const auto splitted = Nested::splitName(column_name); |
| 45 | if (!splitted.second.empty()) |
| 46 | { |
| 47 | const StringRef table_name(column_name.data(), splitted.first.size()); |
| 48 | name_map[table_name] = NESTED_FIELD; |
| 49 | } |
| 50 | } |
| 51 | } |
| 52 | |
| 53 | prev_positions.resize(num_columns); |
| 54 | } |
| 55 | |
| 56 | const String & JSONEachRowRowInputFormat::columnName(size_t i) const |
| 57 | { |
| 58 | return getPort().getHeader().getByPosition(i).name; |
| 59 | } |
| 60 | |
| 61 | inline size_t JSONEachRowRowInputFormat::columnIndex(const StringRef & name, size_t key_index) |
| 62 | { |
| 63 | /// Optimization by caching the order of fields (which is almost always the same) |
| 64 | /// and a quick check to match the next expected field, instead of searching the hash table. |
| 65 | |
| 66 | if (prev_positions.size() > key_index |
| 67 | && prev_positions[key_index] |
| 68 | && name == prev_positions[key_index]->getKey()) |
| 69 | { |
| 70 | return prev_positions[key_index]->getMapped(); |
| 71 | } |
| 72 | else |
| 73 | { |
| 74 | const auto it = name_map.find(name); |
| 75 | |
| 76 | if (it) |
| 77 | { |
| 78 | if (key_index < prev_positions.size()) |
| 79 | prev_positions[key_index] = it; |
| 80 | |
| 81 | return it->getMapped(); |
| 82 | } |
| 83 | else |
| 84 | return UNKNOWN_FIELD; |
| 85 | } |
| 86 | } |
| 87 | |
| 88 | /** Read the field name and convert it to column name |
| 89 | * (taking into account the current nested name prefix) |
| 90 | * Resulting StringRef is valid only before next read from buf. |
| 91 | */ |
| 92 | StringRef JSONEachRowRowInputFormat::readColumnName(ReadBuffer & buf) |
| 93 | { |
| 94 | // This is just an optimization: try to avoid copying the name into current_column_name |
| 95 | |
| 96 | if (nested_prefix_length == 0 && buf.position() + 1 < buf.buffer().end()) |
| 97 | { |
| 98 | char * next_pos = find_first_symbols<'\\', '"'>(buf.position() + 1, buf.buffer().end()); |
| 99 | |
| 100 | if (next_pos != buf.buffer().end() && *next_pos != '\\') |
| 101 | { |
| 102 | /// The most likely option is that there is no escape sequence in the key name, and the entire name is placed in the buffer. |
| 103 | assertChar('"', buf); |
| 104 | StringRef res(buf.position(), next_pos - buf.position()); |
| 105 | buf.position() = next_pos + 1; |
| 106 | return res; |
| 107 | } |
| 108 | } |
| 109 | |
| 110 | current_column_name.resize(nested_prefix_length); |
| 111 | readJSONStringInto(current_column_name, buf); |
| 112 | return current_column_name; |
| 113 | } |
| 114 | |
| 115 | |
| 116 | static inline void skipColonDelimeter(ReadBuffer & istr) |
| 117 | { |
| 118 | skipWhitespaceIfAny(istr); |
| 119 | assertChar(':', istr); |
| 120 | skipWhitespaceIfAny(istr); |
| 121 | } |
| 122 | |
| 123 | void JSONEachRowRowInputFormat::skipUnknownField(const StringRef & name_ref) |
| 124 | { |
| 125 | if (!format_settings.skip_unknown_fields) |
| 126 | throw Exception("Unknown field found while parsing JSONEachRow format: " + name_ref.toString(), ErrorCodes::INCORRECT_DATA); |
| 127 | |
| 128 | skipJSONField(in, name_ref); |
| 129 | } |
| 130 | |
| 131 | void JSONEachRowRowInputFormat::readField(size_t index, MutableColumns & columns) |
| 132 | { |
| 133 | if (seen_columns[index]) |
| 134 | throw Exception("Duplicate field found while parsing JSONEachRow format: " + columnName(index), ErrorCodes::INCORRECT_DATA); |
| 135 | |
| 136 | try |
| 137 | { |
| 138 | seen_columns[index] = read_columns[index] = true; |
| 139 | const auto & type = getPort().getHeader().getByPosition(index).type; |
| 140 | if (format_settings.null_as_default && !type->isNullable()) |
| 141 | read_columns[index] = DataTypeNullable::deserializeTextJSON(*columns[index], in, format_settings, type); |
| 142 | else |
| 143 | type->deserializeAsTextJSON(*columns[index], in, format_settings); |
| 144 | } |
| 145 | catch (Exception & e) |
| 146 | { |
| 147 | e.addMessage("(while read the value of key " + columnName(index) + ")" ); |
| 148 | throw; |
| 149 | } |
| 150 | } |
| 151 | |
| 152 | inline bool JSONEachRowRowInputFormat::advanceToNextKey(size_t key_index) |
| 153 | { |
| 154 | skipWhitespaceIfAny(in); |
| 155 | |
| 156 | if (in.eof()) |
| 157 | throw Exception("Unexpected end of stream while parsing JSONEachRow format" , ErrorCodes::CANNOT_READ_ALL_DATA); |
| 158 | else if (*in.position() == '}') |
| 159 | { |
| 160 | ++in.position(); |
| 161 | return false; |
| 162 | } |
| 163 | |
| 164 | if (key_index > 0) |
| 165 | { |
| 166 | assertChar(',', in); |
| 167 | skipWhitespaceIfAny(in); |
| 168 | } |
| 169 | return true; |
| 170 | } |
| 171 | |
| 172 | void JSONEachRowRowInputFormat::readJSONObject(MutableColumns & columns) |
| 173 | { |
| 174 | assertChar('{', in); |
| 175 | |
| 176 | for (size_t key_index = 0; advanceToNextKey(key_index); ++key_index) |
| 177 | { |
| 178 | StringRef name_ref = readColumnName(in); |
| 179 | const size_t column_index = columnIndex(name_ref, key_index); |
| 180 | |
| 181 | if (unlikely(ssize_t(column_index) < 0)) |
| 182 | { |
| 183 | /// name_ref may point directly to the input buffer |
| 184 | /// and input buffer may be filled with new data on next read |
| 185 | /// If we want to use name_ref after another reads from buffer, we must copy it to temporary string. |
| 186 | |
| 187 | current_column_name.assign(name_ref.data, name_ref.size); |
| 188 | name_ref = StringRef(current_column_name); |
| 189 | |
| 190 | skipColonDelimeter(in); |
| 191 | |
| 192 | if (column_index == UNKNOWN_FIELD) |
| 193 | skipUnknownField(name_ref); |
| 194 | else if (column_index == NESTED_FIELD) |
| 195 | readNestedData(name_ref.toString(), columns); |
| 196 | else |
| 197 | throw Exception("Logical error: illegal value of column_index" , ErrorCodes::LOGICAL_ERROR); |
| 198 | } |
| 199 | else |
| 200 | { |
| 201 | skipColonDelimeter(in); |
| 202 | readField(column_index, columns); |
| 203 | } |
| 204 | } |
| 205 | } |
| 206 | |
| 207 | void JSONEachRowRowInputFormat::readNestedData(const String & name, MutableColumns & columns) |
| 208 | { |
| 209 | current_column_name = name; |
| 210 | current_column_name.push_back('.'); |
| 211 | nested_prefix_length = current_column_name.size(); |
| 212 | readJSONObject(columns); |
| 213 | nested_prefix_length = 0; |
| 214 | } |
| 215 | |
| 216 | |
| 217 | bool JSONEachRowRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext) |
| 218 | { |
| 219 | skipWhitespaceIfAny(in); |
| 220 | |
| 221 | /// We consume ;, or \n before scanning a new row, instead scanning to next row at the end. |
| 222 | /// The reason is that if we want an exact number of rows read with LIMIT x |
| 223 | /// from a streaming table engine with text data format, like File or Kafka |
| 224 | /// then seeking to next ;, or \n would trigger reading of an extra row at the end. |
| 225 | |
| 226 | /// Semicolon is added for convenience as it could be used at end of INSERT query. |
| 227 | if (!in.eof() && (*in.position() == ',' || *in.position() == ';')) |
| 228 | ++in.position(); |
| 229 | |
| 230 | skipWhitespaceIfAny(in); |
| 231 | if (in.eof()) |
| 232 | return false; |
| 233 | |
| 234 | size_t num_columns = columns.size(); |
| 235 | |
| 236 | read_columns.assign(num_columns, false); |
| 237 | seen_columns.assign(num_columns, false); |
| 238 | |
| 239 | nested_prefix_length = 0; |
| 240 | readJSONObject(columns); |
| 241 | |
| 242 | auto & = getPort().getHeader(); |
| 243 | /// Fill non-visited columns with the default values. |
| 244 | for (size_t i = 0; i < num_columns; ++i) |
| 245 | if (!seen_columns[i]) |
| 246 | header.getByPosition(i).type->insertDefaultInto(*columns[i]); |
| 247 | |
| 248 | /// return info about defaults set |
| 249 | ext.read_columns = read_columns; |
| 250 | return true; |
| 251 | } |
| 252 | |
| 253 | |
| 254 | void JSONEachRowRowInputFormat::syncAfterError() |
| 255 | { |
| 256 | skipToUnescapedNextLineOrEOF(in); |
| 257 | } |
| 258 | |
| 259 | void JSONEachRowRowInputFormat::resetParser() |
| 260 | { |
| 261 | IRowInputFormat::resetParser(); |
| 262 | nested_prefix_length = 0; |
| 263 | read_columns.clear(); |
| 264 | seen_columns.clear(); |
| 265 | prev_positions.clear(); |
| 266 | } |
| 267 | |
| 268 | |
| 269 | void registerInputFormatProcessorJSONEachRow(FormatFactory & factory) |
| 270 | { |
| 271 | factory.registerInputFormatProcessor("JSONEachRow" , []( |
| 272 | ReadBuffer & buf, |
| 273 | const Block & sample, |
| 274 | IRowInputFormat::Params params, |
| 275 | const FormatSettings & settings) |
| 276 | { |
| 277 | return std::make_shared<JSONEachRowRowInputFormat>(buf, sample, std::move(params), settings); |
| 278 | }); |
| 279 | } |
| 280 | |
| 281 | static bool fileSegmentationEngineJSONEachRowImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size) |
| 282 | { |
| 283 | skipWhitespaceIfAny(in); |
| 284 | |
| 285 | char * pos = in.position(); |
| 286 | size_t balance = 0; |
| 287 | bool quotes = false; |
| 288 | |
| 289 | while (loadAtPosition(in, memory, pos) && (balance || memory.size() + static_cast<size_t>(pos - in.position()) < min_chunk_size)) |
| 290 | { |
| 291 | if (quotes) |
| 292 | { |
| 293 | pos = find_first_symbols<'\\', '"'>(pos, in.buffer().end()); |
| 294 | if (pos == in.buffer().end()) |
| 295 | continue; |
| 296 | if (*pos == '\\') |
| 297 | { |
| 298 | ++pos; |
| 299 | if (loadAtPosition(in, memory, pos)) |
| 300 | ++pos; |
| 301 | } |
| 302 | else if (*pos == '"') |
| 303 | { |
| 304 | ++pos; |
| 305 | quotes = false; |
| 306 | } |
| 307 | } |
| 308 | else |
| 309 | { |
| 310 | pos = find_first_symbols<'{', '}', '\\', '"'>(pos, in.buffer().end()); |
| 311 | if (pos == in.buffer().end()) |
| 312 | continue; |
| 313 | if (*pos == '{') |
| 314 | { |
| 315 | ++balance; |
| 316 | ++pos; |
| 317 | } |
| 318 | else if (*pos == '}') |
| 319 | { |
| 320 | --balance; |
| 321 | ++pos; |
| 322 | } |
| 323 | else if (*pos == '\\') |
| 324 | { |
| 325 | ++pos; |
| 326 | if (loadAtPosition(in, memory, pos)) |
| 327 | ++pos; |
| 328 | } |
| 329 | else if (*pos == '"') |
| 330 | { |
| 331 | quotes = true; |
| 332 | ++pos; |
| 333 | } |
| 334 | } |
| 335 | } |
| 336 | |
| 337 | saveUpToPosition(in, memory, pos); |
| 338 | return loadAtPosition(in, memory, pos); |
| 339 | } |
| 340 | |
| 341 | void registerFileSegmentationEngineJSONEachRow(FormatFactory & factory) |
| 342 | { |
| 343 | factory.registerFileSegmentationEngine("JSONEachRow" , &fileSegmentationEngineJSONEachRowImpl); |
| 344 | } |
| 345 | |
| 346 | } |
| 347 | |