1#include <IO/ReadHelpers.h>
2
3#include <Processors/Formats/Impl/JSONEachRowRowInputFormat.h>
4#include <Formats/FormatFactory.h>
5#include <DataTypes/NestedUtils.h>
6#include <DataTypes/DataTypeNullable.h>
7
8namespace DB
9{
10
11namespace ErrorCodes
12{
13 extern const int INCORRECT_DATA;
14 extern const int CANNOT_READ_ALL_DATA;
15 extern const int LOGICAL_ERROR;
16}
17
namespace
{

/// Sentinel results of a column lookup, used in place of a real column position.
/// Both are taken from the very top of the size_t range, so they become negative
/// when the lookup result is reinterpreted as a signed integer.
enum
{
    UNKNOWN_FIELD = static_cast<size_t>(-1),
    NESTED_FIELD = static_cast<size_t>(-2)
};

}
28
29
30JSONEachRowRowInputFormat::JSONEachRowRowInputFormat(
31 ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_)
32 : IRowInputFormat(header_, in_, std::move(params_)), format_settings(format_settings_), name_map(header_.columns())
33{
34 /// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it.
35 skipBOMIfExists(in);
36
37 size_t num_columns = getPort().getHeader().columns();
38 for (size_t i = 0; i < num_columns; ++i)
39 {
40 const String & column_name = columnName(i);
41 name_map[column_name] = i; /// NOTE You could place names more cache-locally.
42 if (format_settings_.import_nested_json)
43 {
44 const auto splitted = Nested::splitName(column_name);
45 if (!splitted.second.empty())
46 {
47 const StringRef table_name(column_name.data(), splitted.first.size());
48 name_map[table_name] = NESTED_FIELD;
49 }
50 }
51 }
52
53 prev_positions.resize(num_columns);
54}
55
56const String & JSONEachRowRowInputFormat::columnName(size_t i) const
57{
58 return getPort().getHeader().getByPosition(i).name;
59}
60
61inline size_t JSONEachRowRowInputFormat::columnIndex(const StringRef & name, size_t key_index)
62{
63 /// Optimization by caching the order of fields (which is almost always the same)
64 /// and a quick check to match the next expected field, instead of searching the hash table.
65
66 if (prev_positions.size() > key_index
67 && prev_positions[key_index]
68 && name == prev_positions[key_index]->getKey())
69 {
70 return prev_positions[key_index]->getMapped();
71 }
72 else
73 {
74 const auto it = name_map.find(name);
75
76 if (it)
77 {
78 if (key_index < prev_positions.size())
79 prev_positions[key_index] = it;
80
81 return it->getMapped();
82 }
83 else
84 return UNKNOWN_FIELD;
85 }
86}
87
88/** Read the field name and convert it to column name
89 * (taking into account the current nested name prefix)
90 * Resulting StringRef is valid only before next read from buf.
91 */
92StringRef JSONEachRowRowInputFormat::readColumnName(ReadBuffer & buf)
93{
94 // This is just an optimization: try to avoid copying the name into current_column_name
95
96 if (nested_prefix_length == 0 && buf.position() + 1 < buf.buffer().end())
97 {
98 char * next_pos = find_first_symbols<'\\', '"'>(buf.position() + 1, buf.buffer().end());
99
100 if (next_pos != buf.buffer().end() && *next_pos != '\\')
101 {
102 /// The most likely option is that there is no escape sequence in the key name, and the entire name is placed in the buffer.
103 assertChar('"', buf);
104 StringRef res(buf.position(), next_pos - buf.position());
105 buf.position() = next_pos + 1;
106 return res;
107 }
108 }
109
110 current_column_name.resize(nested_prefix_length);
111 readJSONStringInto(current_column_name, buf);
112 return current_column_name;
113}
114
115
116static inline void skipColonDelimeter(ReadBuffer & istr)
117{
118 skipWhitespaceIfAny(istr);
119 assertChar(':', istr);
120 skipWhitespaceIfAny(istr);
121}
122
123void JSONEachRowRowInputFormat::skipUnknownField(const StringRef & name_ref)
124{
125 if (!format_settings.skip_unknown_fields)
126 throw Exception("Unknown field found while parsing JSONEachRow format: " + name_ref.toString(), ErrorCodes::INCORRECT_DATA);
127
128 skipJSONField(in, name_ref);
129}
130
131void JSONEachRowRowInputFormat::readField(size_t index, MutableColumns & columns)
132{
133 if (seen_columns[index])
134 throw Exception("Duplicate field found while parsing JSONEachRow format: " + columnName(index), ErrorCodes::INCORRECT_DATA);
135
136 try
137 {
138 seen_columns[index] = read_columns[index] = true;
139 const auto & type = getPort().getHeader().getByPosition(index).type;
140 if (format_settings.null_as_default && !type->isNullable())
141 read_columns[index] = DataTypeNullable::deserializeTextJSON(*columns[index], in, format_settings, type);
142 else
143 type->deserializeAsTextJSON(*columns[index], in, format_settings);
144 }
145 catch (Exception & e)
146 {
147 e.addMessage("(while read the value of key " + columnName(index) + ")");
148 throw;
149 }
150}
151
152inline bool JSONEachRowRowInputFormat::advanceToNextKey(size_t key_index)
153{
154 skipWhitespaceIfAny(in);
155
156 if (in.eof())
157 throw Exception("Unexpected end of stream while parsing JSONEachRow format", ErrorCodes::CANNOT_READ_ALL_DATA);
158 else if (*in.position() == '}')
159 {
160 ++in.position();
161 return false;
162 }
163
164 if (key_index > 0)
165 {
166 assertChar(',', in);
167 skipWhitespaceIfAny(in);
168 }
169 return true;
170}
171
172void JSONEachRowRowInputFormat::readJSONObject(MutableColumns & columns)
173{
174 assertChar('{', in);
175
176 for (size_t key_index = 0; advanceToNextKey(key_index); ++key_index)
177 {
178 StringRef name_ref = readColumnName(in);
179 const size_t column_index = columnIndex(name_ref, key_index);
180
181 if (unlikely(ssize_t(column_index) < 0))
182 {
183 /// name_ref may point directly to the input buffer
184 /// and input buffer may be filled with new data on next read
185 /// If we want to use name_ref after another reads from buffer, we must copy it to temporary string.
186
187 current_column_name.assign(name_ref.data, name_ref.size);
188 name_ref = StringRef(current_column_name);
189
190 skipColonDelimeter(in);
191
192 if (column_index == UNKNOWN_FIELD)
193 skipUnknownField(name_ref);
194 else if (column_index == NESTED_FIELD)
195 readNestedData(name_ref.toString(), columns);
196 else
197 throw Exception("Logical error: illegal value of column_index", ErrorCodes::LOGICAL_ERROR);
198 }
199 else
200 {
201 skipColonDelimeter(in);
202 readField(column_index, columns);
203 }
204 }
205}
206
207void JSONEachRowRowInputFormat::readNestedData(const String & name, MutableColumns & columns)
208{
209 current_column_name = name;
210 current_column_name.push_back('.');
211 nested_prefix_length = current_column_name.size();
212 readJSONObject(columns);
213 nested_prefix_length = 0;
214}
215
216
217bool JSONEachRowRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext)
218{
219 skipWhitespaceIfAny(in);
220
221 /// We consume ;, or \n before scanning a new row, instead scanning to next row at the end.
222 /// The reason is that if we want an exact number of rows read with LIMIT x
223 /// from a streaming table engine with text data format, like File or Kafka
224 /// then seeking to next ;, or \n would trigger reading of an extra row at the end.
225
226 /// Semicolon is added for convenience as it could be used at end of INSERT query.
227 if (!in.eof() && (*in.position() == ',' || *in.position() == ';'))
228 ++in.position();
229
230 skipWhitespaceIfAny(in);
231 if (in.eof())
232 return false;
233
234 size_t num_columns = columns.size();
235
236 read_columns.assign(num_columns, false);
237 seen_columns.assign(num_columns, false);
238
239 nested_prefix_length = 0;
240 readJSONObject(columns);
241
242 auto & header = getPort().getHeader();
243 /// Fill non-visited columns with the default values.
244 for (size_t i = 0; i < num_columns; ++i)
245 if (!seen_columns[i])
246 header.getByPosition(i).type->insertDefaultInto(*columns[i]);
247
248 /// return info about defaults set
249 ext.read_columns = read_columns;
250 return true;
251}
252
253
254void JSONEachRowRowInputFormat::syncAfterError()
255{
256 skipToUnescapedNextLineOrEOF(in);
257}
258
259void JSONEachRowRowInputFormat::resetParser()
260{
261 IRowInputFormat::resetParser();
262 nested_prefix_length = 0;
263 read_columns.clear();
264 seen_columns.clear();
265 prev_positions.clear();
266}
267
268
269void registerInputFormatProcessorJSONEachRow(FormatFactory & factory)
270{
271 factory.registerInputFormatProcessor("JSONEachRow", [](
272 ReadBuffer & buf,
273 const Block & sample,
274 IRowInputFormat::Params params,
275 const FormatSettings & settings)
276 {
277 return std::make_shared<JSONEachRowRowInputFormat>(buf, sample, std::move(params), settings);
278 });
279}
280
281static bool fileSegmentationEngineJSONEachRowImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
282{
283 skipWhitespaceIfAny(in);
284
285 char * pos = in.position();
286 size_t balance = 0;
287 bool quotes = false;
288
289 while (loadAtPosition(in, memory, pos) && (balance || memory.size() + static_cast<size_t>(pos - in.position()) < min_chunk_size))
290 {
291 if (quotes)
292 {
293 pos = find_first_symbols<'\\', '"'>(pos, in.buffer().end());
294 if (pos == in.buffer().end())
295 continue;
296 if (*pos == '\\')
297 {
298 ++pos;
299 if (loadAtPosition(in, memory, pos))
300 ++pos;
301 }
302 else if (*pos == '"')
303 {
304 ++pos;
305 quotes = false;
306 }
307 }
308 else
309 {
310 pos = find_first_symbols<'{', '}', '\\', '"'>(pos, in.buffer().end());
311 if (pos == in.buffer().end())
312 continue;
313 if (*pos == '{')
314 {
315 ++balance;
316 ++pos;
317 }
318 else if (*pos == '}')
319 {
320 --balance;
321 ++pos;
322 }
323 else if (*pos == '\\')
324 {
325 ++pos;
326 if (loadAtPosition(in, memory, pos))
327 ++pos;
328 }
329 else if (*pos == '"')
330 {
331 quotes = true;
332 ++pos;
333 }
334 }
335 }
336
337 saveUpToPosition(in, memory, pos);
338 return loadAtPosition(in, memory, pos);
339}
340
341void registerFileSegmentationEngineJSONEachRow(FormatFactory & factory)
342{
343 factory.registerFileSegmentationEngine("JSONEachRow", &fileSegmentationEngineJSONEachRowImpl);
344}
345
346}
347