1#include <IO/ReadHelpers.h>
2
3#include <Processors/Formats/Impl/JSONCompactEachRowRowInputFormat.h>
4#include <Formats/FormatFactory.h>
5#include <DataTypes/NestedUtils.h>
6#include <DataTypes/DataTypeNullable.h>
7
8namespace DB
9{
10
namespace ErrorCodes
{
    /// Only the error codes this file actually throws are declared here
    /// (LOGICAL_ERROR was declared but never referenced in this file).
    extern const int INCORRECT_DATA;
    extern const int CANNOT_READ_ALL_DATA;
}
17
18
19JSONCompactEachRowRowInputFormat::JSONCompactEachRowRowInputFormat(ReadBuffer & in_,
20 const Block & header_,
21 Params params_,
22 const FormatSettings & format_settings_,
23 bool with_names_)
24 : IRowInputFormat(header_, in_, std::move(params_)), format_settings(format_settings_), with_names(with_names_)
25{
26 /// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it.
27 skipBOMIfExists(in);
28 auto & sample = getPort().getHeader();
29 size_t num_columns = sample.columns();
30
31 data_types.resize(num_columns);
32 column_indexes_by_names.reserve(num_columns);
33
34 for (size_t i = 0; i < num_columns; ++i)
35 {
36 const auto & column_info = sample.getByPosition(i);
37
38 data_types[i] = column_info.type;
39 column_indexes_by_names.emplace(column_info.name, i);
40 }
41}
42
43void JSONCompactEachRowRowInputFormat::readPrefix()
44{
45 if (with_names)
46 {
47 size_t num_columns = getPort().getHeader().columns();
48 read_columns.assign(num_columns, false);
49
50 assertChar('[', in);
51 do
52 {
53 skipWhitespaceIfAny(in);
54 String column_name;
55 readJSONString(column_name, in);
56 addInputColumn(column_name);
57 skipWhitespaceIfAny(in);
58 }
59 while (checkChar(',', in));
60 assertChar(']', in);
61 skipEndOfLine();
62
63 /// Type checking
64 assertChar('[', in);
65 for (size_t i = 0; i < column_indexes_for_input_fields.size(); ++i)
66 {
67 skipWhitespaceIfAny(in);
68 String data_type;
69 readJSONString(data_type, in);
70
71 if (column_indexes_for_input_fields[i] &&
72 data_types[*column_indexes_for_input_fields[i]]->getName() != data_type)
73 {
74 throw Exception(
75 "Type of '" + getPort().getHeader().getByPosition(*column_indexes_for_input_fields[i]).name
76 + "' must be " + data_types[*column_indexes_for_input_fields[i]]->getName() +
77 ", not " + data_type,
78 ErrorCodes::INCORRECT_DATA
79 );
80 }
81
82 if (i != column_indexes_for_input_fields.size() - 1)
83 assertChar(',', in);
84 skipWhitespaceIfAny(in);
85 }
86 assertChar(']', in);
87 }
88 else
89 {
90 size_t num_columns = getPort().getHeader().columns();
91 read_columns.assign(num_columns, true);
92 column_indexes_for_input_fields.resize(num_columns);
93
94 for (size_t i = 0; i < num_columns; ++i)
95 {
96 column_indexes_for_input_fields[i] = i;
97 }
98 }
99
100 for (size_t i = 0; i < read_columns.size(); ++i)
101 {
102 if (!read_columns[i])
103 {
104 not_seen_columns.emplace_back(i);
105 }
106 }
107}
108
109void JSONCompactEachRowRowInputFormat::addInputColumn(const String & column_name)
110{
111 names_of_columns.emplace_back(column_name);
112
113 const auto column_it = column_indexes_by_names.find(column_name);
114 if (column_it == column_indexes_by_names.end())
115 {
116 if (format_settings.skip_unknown_fields)
117 {
118 column_indexes_for_input_fields.push_back(std::nullopt);
119 return;
120 }
121
122 throw Exception(
123 "Unknown field found in JSONCompactEachRow header: '" + column_name + "' " +
124 "at position " + std::to_string(column_indexes_for_input_fields.size()) +
125 "\nSet the 'input_format_skip_unknown_fields' parameter explicitly to ignore and proceed",
126 ErrorCodes::INCORRECT_DATA
127 );
128 }
129
130 const auto column_index = column_it->second;
131
132 if (read_columns[column_index])
133 throw Exception("Duplicate field found while parsing JSONCompactEachRow header: " + column_name, ErrorCodes::INCORRECT_DATA);
134
135 read_columns[column_index] = true;
136 column_indexes_for_input_fields.emplace_back(column_index);
137}
138
139bool JSONCompactEachRowRowInputFormat::readRow(DB::MutableColumns &columns, DB::RowReadExtension &ext)
140{
141 skipEndOfLine();
142
143 if (in.eof())
144 return false;
145
146 size_t num_columns = columns.size();
147
148 read_columns.assign(num_columns, false);
149
150 assertChar('[', in);
151 for (size_t file_column = 0; file_column < column_indexes_for_input_fields.size(); ++file_column)
152 {
153 const auto & table_column = column_indexes_for_input_fields[file_column];
154 if (table_column)
155 {
156 readField(*table_column, columns);
157 }
158 else
159 {
160 skipJSONField(in, StringRef(names_of_columns[file_column]));
161 }
162
163 skipWhitespaceIfAny(in);
164 if (in.eof())
165 throw Exception("Unexpected end of stream while parsing JSONCompactEachRow format", ErrorCodes::CANNOT_READ_ALL_DATA);
166 if (file_column + 1 != column_indexes_for_input_fields.size())
167 {
168 assertChar(',', in);
169 skipWhitespaceIfAny(in);
170 }
171 }
172 assertChar(']', in);
173
174 for (size_t i = 0; i < not_seen_columns.size(); i++)
175 {
176 columns[not_seen_columns[i]]->insertDefault();
177 }
178
179 ext.read_columns = read_columns;
180 return true;
181}
182
183void JSONCompactEachRowRowInputFormat::skipEndOfLine()
184{
185 skipWhitespaceIfAny(in);
186 if (!in.eof() && (*in.position() == ',' || *in.position() == ';'))
187 ++in.position();
188
189 skipWhitespaceIfAny(in);
190}
191
192void JSONCompactEachRowRowInputFormat::readField(size_t index, MutableColumns & columns)
193{
194 try
195 {
196 read_columns[index] = true;
197 const auto & type = data_types[index];
198 if (format_settings.null_as_default && !type->isNullable())
199 read_columns[index] = DataTypeNullable::deserializeTextJSON(*columns[index], in, format_settings, type);
200 else
201 type->deserializeAsTextJSON(*columns[index], in, format_settings);
202 }
203 catch (Exception & e)
204 {
205 e.addMessage("(while read the value of key " + getPort().getHeader().getByPosition(index).name + ")");
206 throw;
207 }
208}
209
210void JSONCompactEachRowRowInputFormat::syncAfterError()
211{
212 skipToUnescapedNextLineOrEOF(in);
213}
214
215void registerInputFormatProcessorJSONCompactEachRow(FormatFactory & factory)
216{
217 factory.registerInputFormatProcessor("JSONCompactEachRow", [](
218 ReadBuffer & buf,
219 const Block & sample,
220 IRowInputFormat::Params params,
221 const FormatSettings & settings)
222 {
223 return std::make_shared<JSONCompactEachRowRowInputFormat>(buf, sample, std::move(params), settings, false);
224 });
225
226 factory.registerInputFormatProcessor("JSONCompactEachRowWithNamesAndTypes", [](
227 ReadBuffer & buf,
228 const Block & sample,
229 IRowInputFormat::Params params,
230 const FormatSettings & settings)
231 {
232 return std::make_shared<JSONCompactEachRowRowInputFormat>(buf, sample, std::move(params), settings, true);
233 });
234}
235
236}
237