1 | #include <IO/ReadHelpers.h> |
2 | |
3 | #include <Processors/Formats/Impl/JSONEachRowRowInputFormat.h> |
4 | #include <Formats/FormatFactory.h> |
5 | #include <DataTypes/NestedUtils.h> |
6 | #include <DataTypes/DataTypeNullable.h> |
7 | |
8 | namespace DB |
9 | { |
10 | |
/// Error codes raised by this translation unit; only declared here, the
/// definitions are expected to live elsewhere in the project.
namespace ErrorCodes
{
    /// Thrown when the stream ends in the middle of a JSON object.
    extern const int CANNOT_READ_ALL_DATA;
    /// Thrown for unknown or duplicated keys in a row.
    extern const int INCORRECT_DATA;
    /// Thrown when a sentinel column index has an unexpected value.
    extern const int LOGICAL_ERROR;
}
17 | |
namespace
{

/// Sentinel values stored in / returned from the name lookup in place of a real
/// column position. Both are negative when reinterpreted as a signed integer,
/// which is how the row reader tells them apart from valid indices.
enum
{
    /// The key does not match any column name known to the lookup table.
    UNKNOWN_FIELD = static_cast<size_t>(-1),
    /// The key is the table-name prefix of nested columns ("prefix.column").
    NESTED_FIELD = static_cast<size_t>(-2)
};

}
28 | |
29 | |
30 | JSONEachRowRowInputFormat::JSONEachRowRowInputFormat( |
31 | ReadBuffer & in_, const Block & , Params params_, const FormatSettings & format_settings_) |
32 | : IRowInputFormat(header_, in_, std::move(params_)), format_settings(format_settings_), name_map(header_.columns()) |
33 | { |
34 | /// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it. |
35 | skipBOMIfExists(in); |
36 | |
37 | size_t num_columns = getPort().getHeader().columns(); |
38 | for (size_t i = 0; i < num_columns; ++i) |
39 | { |
40 | const String & column_name = columnName(i); |
41 | name_map[column_name] = i; /// NOTE You could place names more cache-locally. |
42 | if (format_settings_.import_nested_json) |
43 | { |
44 | const auto splitted = Nested::splitName(column_name); |
45 | if (!splitted.second.empty()) |
46 | { |
47 | const StringRef table_name(column_name.data(), splitted.first.size()); |
48 | name_map[table_name] = NESTED_FIELD; |
49 | } |
50 | } |
51 | } |
52 | |
53 | prev_positions.resize(num_columns); |
54 | } |
55 | |
56 | const String & JSONEachRowRowInputFormat::columnName(size_t i) const |
57 | { |
58 | return getPort().getHeader().getByPosition(i).name; |
59 | } |
60 | |
61 | inline size_t JSONEachRowRowInputFormat::columnIndex(const StringRef & name, size_t key_index) |
62 | { |
63 | /// Optimization by caching the order of fields (which is almost always the same) |
64 | /// and a quick check to match the next expected field, instead of searching the hash table. |
65 | |
66 | if (prev_positions.size() > key_index |
67 | && prev_positions[key_index] |
68 | && name == prev_positions[key_index]->getKey()) |
69 | { |
70 | return prev_positions[key_index]->getMapped(); |
71 | } |
72 | else |
73 | { |
74 | const auto it = name_map.find(name); |
75 | |
76 | if (it) |
77 | { |
78 | if (key_index < prev_positions.size()) |
79 | prev_positions[key_index] = it; |
80 | |
81 | return it->getMapped(); |
82 | } |
83 | else |
84 | return UNKNOWN_FIELD; |
85 | } |
86 | } |
87 | |
88 | /** Read the field name and convert it to column name |
89 | * (taking into account the current nested name prefix) |
90 | * Resulting StringRef is valid only before next read from buf. |
91 | */ |
92 | StringRef JSONEachRowRowInputFormat::readColumnName(ReadBuffer & buf) |
93 | { |
94 | // This is just an optimization: try to avoid copying the name into current_column_name |
95 | |
96 | if (nested_prefix_length == 0 && buf.position() + 1 < buf.buffer().end()) |
97 | { |
98 | char * next_pos = find_first_symbols<'\\', '"'>(buf.position() + 1, buf.buffer().end()); |
99 | |
100 | if (next_pos != buf.buffer().end() && *next_pos != '\\') |
101 | { |
102 | /// The most likely option is that there is no escape sequence in the key name, and the entire name is placed in the buffer. |
103 | assertChar('"', buf); |
104 | StringRef res(buf.position(), next_pos - buf.position()); |
105 | buf.position() = next_pos + 1; |
106 | return res; |
107 | } |
108 | } |
109 | |
110 | current_column_name.resize(nested_prefix_length); |
111 | readJSONStringInto(current_column_name, buf); |
112 | return current_column_name; |
113 | } |
114 | |
115 | |
/// Consumes the ':' that separates a JSON key from its value, together with any
/// whitespace on either side of it.
/// NOTE(review): assertChar presumably throws when the next character is not ':' -
/// confirm in IO/ReadHelpers.h.
static inline void skipColonDelimeter(ReadBuffer & istr)
{
    skipWhitespaceIfAny(istr);
    assertChar(':', istr);
    skipWhitespaceIfAny(istr);
}
122 | |
123 | void JSONEachRowRowInputFormat::skipUnknownField(const StringRef & name_ref) |
124 | { |
125 | if (!format_settings.skip_unknown_fields) |
126 | throw Exception("Unknown field found while parsing JSONEachRow format: " + name_ref.toString(), ErrorCodes::INCORRECT_DATA); |
127 | |
128 | skipJSONField(in, name_ref); |
129 | } |
130 | |
131 | void JSONEachRowRowInputFormat::readField(size_t index, MutableColumns & columns) |
132 | { |
133 | if (seen_columns[index]) |
134 | throw Exception("Duplicate field found while parsing JSONEachRow format: " + columnName(index), ErrorCodes::INCORRECT_DATA); |
135 | |
136 | try |
137 | { |
138 | seen_columns[index] = read_columns[index] = true; |
139 | const auto & type = getPort().getHeader().getByPosition(index).type; |
140 | if (format_settings.null_as_default && !type->isNullable()) |
141 | read_columns[index] = DataTypeNullable::deserializeTextJSON(*columns[index], in, format_settings, type); |
142 | else |
143 | type->deserializeAsTextJSON(*columns[index], in, format_settings); |
144 | } |
145 | catch (Exception & e) |
146 | { |
147 | e.addMessage("(while read the value of key " + columnName(index) + ")" ); |
148 | throw; |
149 | } |
150 | } |
151 | |
152 | inline bool JSONEachRowRowInputFormat::advanceToNextKey(size_t key_index) |
153 | { |
154 | skipWhitespaceIfAny(in); |
155 | |
156 | if (in.eof()) |
157 | throw Exception("Unexpected end of stream while parsing JSONEachRow format" , ErrorCodes::CANNOT_READ_ALL_DATA); |
158 | else if (*in.position() == '}') |
159 | { |
160 | ++in.position(); |
161 | return false; |
162 | } |
163 | |
164 | if (key_index > 0) |
165 | { |
166 | assertChar(',', in); |
167 | skipWhitespaceIfAny(in); |
168 | } |
169 | return true; |
170 | } |
171 | |
172 | void JSONEachRowRowInputFormat::readJSONObject(MutableColumns & columns) |
173 | { |
174 | assertChar('{', in); |
175 | |
176 | for (size_t key_index = 0; advanceToNextKey(key_index); ++key_index) |
177 | { |
178 | StringRef name_ref = readColumnName(in); |
179 | const size_t column_index = columnIndex(name_ref, key_index); |
180 | |
181 | if (unlikely(ssize_t(column_index) < 0)) |
182 | { |
183 | /// name_ref may point directly to the input buffer |
184 | /// and input buffer may be filled with new data on next read |
185 | /// If we want to use name_ref after another reads from buffer, we must copy it to temporary string. |
186 | |
187 | current_column_name.assign(name_ref.data, name_ref.size); |
188 | name_ref = StringRef(current_column_name); |
189 | |
190 | skipColonDelimeter(in); |
191 | |
192 | if (column_index == UNKNOWN_FIELD) |
193 | skipUnknownField(name_ref); |
194 | else if (column_index == NESTED_FIELD) |
195 | readNestedData(name_ref.toString(), columns); |
196 | else |
197 | throw Exception("Logical error: illegal value of column_index" , ErrorCodes::LOGICAL_ERROR); |
198 | } |
199 | else |
200 | { |
201 | skipColonDelimeter(in); |
202 | readField(column_index, columns); |
203 | } |
204 | } |
205 | } |
206 | |
207 | void JSONEachRowRowInputFormat::readNestedData(const String & name, MutableColumns & columns) |
208 | { |
209 | current_column_name = name; |
210 | current_column_name.push_back('.'); |
211 | nested_prefix_length = current_column_name.size(); |
212 | readJSONObject(columns); |
213 | nested_prefix_length = 0; |
214 | } |
215 | |
216 | |
217 | bool JSONEachRowRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext) |
218 | { |
219 | skipWhitespaceIfAny(in); |
220 | |
221 | /// We consume ;, or \n before scanning a new row, instead scanning to next row at the end. |
222 | /// The reason is that if we want an exact number of rows read with LIMIT x |
223 | /// from a streaming table engine with text data format, like File or Kafka |
224 | /// then seeking to next ;, or \n would trigger reading of an extra row at the end. |
225 | |
226 | /// Semicolon is added for convenience as it could be used at end of INSERT query. |
227 | if (!in.eof() && (*in.position() == ',' || *in.position() == ';')) |
228 | ++in.position(); |
229 | |
230 | skipWhitespaceIfAny(in); |
231 | if (in.eof()) |
232 | return false; |
233 | |
234 | size_t num_columns = columns.size(); |
235 | |
236 | read_columns.assign(num_columns, false); |
237 | seen_columns.assign(num_columns, false); |
238 | |
239 | nested_prefix_length = 0; |
240 | readJSONObject(columns); |
241 | |
242 | auto & = getPort().getHeader(); |
243 | /// Fill non-visited columns with the default values. |
244 | for (size_t i = 0; i < num_columns; ++i) |
245 | if (!seen_columns[i]) |
246 | header.getByPosition(i).type->insertDefaultInto(*columns[i]); |
247 | |
248 | /// return info about defaults set |
249 | ext.read_columns = read_columns; |
250 | return true; |
251 | } |
252 | |
253 | |
/// Error recovery hook: delegates to skipToUnescapedNextLineOrEOF on the input.
/// NOTE(review): presumably this leaves the input at the start of the next line
/// (or at EOF) so parsing can resume with the following row - confirm in IO/ReadHelpers.h.
void JSONEachRowRowInputFormat::syncAfterError()
{
    skipToUnescapedNextLineOrEOF(in);
}
258 | |
259 | void JSONEachRowRowInputFormat::resetParser() |
260 | { |
261 | IRowInputFormat::resetParser(); |
262 | nested_prefix_length = 0; |
263 | read_columns.clear(); |
264 | seen_columns.clear(); |
265 | prev_positions.clear(); |
266 | } |
267 | |
268 | |
269 | void registerInputFormatProcessorJSONEachRow(FormatFactory & factory) |
270 | { |
271 | factory.registerInputFormatProcessor("JSONEachRow" , []( |
272 | ReadBuffer & buf, |
273 | const Block & sample, |
274 | IRowInputFormat::Params params, |
275 | const FormatSettings & settings) |
276 | { |
277 | return std::make_shared<JSONEachRowRowInputFormat>(buf, sample, std::move(params), settings); |
278 | }); |
279 | } |
280 | |
281 | static bool fileSegmentationEngineJSONEachRowImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size) |
282 | { |
283 | skipWhitespaceIfAny(in); |
284 | |
285 | char * pos = in.position(); |
286 | size_t balance = 0; |
287 | bool quotes = false; |
288 | |
289 | while (loadAtPosition(in, memory, pos) && (balance || memory.size() + static_cast<size_t>(pos - in.position()) < min_chunk_size)) |
290 | { |
291 | if (quotes) |
292 | { |
293 | pos = find_first_symbols<'\\', '"'>(pos, in.buffer().end()); |
294 | if (pos == in.buffer().end()) |
295 | continue; |
296 | if (*pos == '\\') |
297 | { |
298 | ++pos; |
299 | if (loadAtPosition(in, memory, pos)) |
300 | ++pos; |
301 | } |
302 | else if (*pos == '"') |
303 | { |
304 | ++pos; |
305 | quotes = false; |
306 | } |
307 | } |
308 | else |
309 | { |
310 | pos = find_first_symbols<'{', '}', '\\', '"'>(pos, in.buffer().end()); |
311 | if (pos == in.buffer().end()) |
312 | continue; |
313 | if (*pos == '{') |
314 | { |
315 | ++balance; |
316 | ++pos; |
317 | } |
318 | else if (*pos == '}') |
319 | { |
320 | --balance; |
321 | ++pos; |
322 | } |
323 | else if (*pos == '\\') |
324 | { |
325 | ++pos; |
326 | if (loadAtPosition(in, memory, pos)) |
327 | ++pos; |
328 | } |
329 | else if (*pos == '"') |
330 | { |
331 | quotes = true; |
332 | ++pos; |
333 | } |
334 | } |
335 | } |
336 | |
337 | saveUpToPosition(in, memory, pos); |
338 | return loadAtPosition(in, memory, pos); |
339 | } |
340 | |
/// Registers fileSegmentationEngineJSONEachRowImpl as the file segmentation engine
/// for the "JSONEachRow" format.
void registerFileSegmentationEngineJSONEachRow(FormatFactory & factory)
{
    factory.registerFileSegmentationEngine("JSONEachRow" , &fileSegmentationEngineJSONEachRowImpl);
}
345 | |
346 | } |
347 | |