1 | #include <IO/ReadHelpers.h> |
2 | #include <Processors/Formats/Impl/TSKVRowInputFormat.h> |
3 | #include <Formats/FormatFactory.h> |
4 | #include <DataTypes/DataTypeNullable.h> |
5 | |
6 | |
7 | namespace DB |
8 | { |
9 | |
/// Error codes thrown by the TSKV reader in this file. Only declared here (`extern`), not defined.
namespace ErrorCodes
{
    extern const int CANNOT_PARSE_ESCAPE_SEQUENCE;
    extern const int CANNOT_PARSE_INPUT_ASSERTION_FAILED;
    extern const int CANNOT_READ_ALL_DATA;
    extern const int INCORRECT_DATA;
}
17 | |
18 | |
19 | TSKVRowInputFormat::TSKVRowInputFormat(ReadBuffer & in_, Block , Params params_, const FormatSettings & format_settings_) |
20 | : IRowInputFormat(std::move(header_), in_, std::move(params_)), format_settings(format_settings_), name_map(header_.columns()) |
21 | { |
22 | /// In this format, we assume that column name cannot contain BOM, |
23 | /// so BOM at beginning of stream cannot be confused with name of field, and it is safe to skip it. |
24 | skipBOMIfExists(in); |
25 | |
26 | const auto & sample_block = getPort().getHeader(); |
27 | size_t num_columns = sample_block.columns(); |
28 | for (size_t i = 0; i < num_columns; ++i) |
29 | name_map[sample_block.getByPosition(i).name] = i; /// NOTE You could place names more cache-locally. |
30 | } |
31 | |
32 | |
33 | /** Read the field name in the `tskv` format. |
34 | * Return true if the field is followed by an equal sign, |
35 | * otherwise (field with no value) return false. |
36 | * The reference to the field name will be written to `ref`. |
37 | * A temporary `tmp` buffer can also be used to copy the field name to it. |
38 | * When reading, skips the name and the equal sign after it. |
39 | */ |
40 | static bool readName(ReadBuffer & buf, StringRef & ref, String & tmp) |
41 | { |
42 | tmp.clear(); |
43 | |
44 | while (!buf.eof()) |
45 | { |
46 | const char * next_pos = find_first_symbols<'\t', '\n', '\\', '='>(buf.position(), buf.buffer().end()); |
47 | |
48 | if (next_pos == buf.buffer().end()) |
49 | { |
50 | tmp.append(buf.position(), next_pos - buf.position()); |
51 | buf.next(); |
52 | continue; |
53 | } |
54 | |
55 | /// Came to the end of the name. |
56 | if (*next_pos != '\\') |
57 | { |
58 | bool have_value = *next_pos == '='; |
59 | if (tmp.empty()) |
60 | { |
61 | /// No need to copy data, you can refer directly to the `buf`. |
62 | ref = StringRef(buf.position(), next_pos - buf.position()); |
63 | buf.position() += next_pos + have_value - buf.position(); |
64 | } |
65 | else |
66 | { |
67 | /// Copy the data to a temporary string and return a reference to it. |
68 | tmp.append(buf.position(), next_pos - buf.position()); |
69 | buf.position() += next_pos + have_value - buf.position(); |
70 | ref = StringRef(tmp); |
71 | } |
72 | return have_value; |
73 | } |
74 | /// The name has an escape sequence. |
75 | else |
76 | { |
77 | tmp.append(buf.position(), next_pos - buf.position()); |
78 | buf.position() += next_pos + 1 - buf.position(); |
79 | if (buf.eof()) |
80 | throw Exception("Cannot parse escape sequence" , ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE); |
81 | |
82 | tmp.push_back(parseEscapeSequence(*buf.position())); |
83 | ++buf.position(); |
84 | continue; |
85 | } |
86 | } |
87 | |
88 | throw Exception("Unexpected end of stream while reading key name from TSKV format" , ErrorCodes::CANNOT_READ_ALL_DATA); |
89 | } |
90 | |
91 | |
92 | bool TSKVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext) |
93 | { |
94 | if (in.eof()) |
95 | return false; |
96 | |
97 | auto & = getPort().getHeader(); |
98 | size_t num_columns = columns.size(); |
99 | |
100 | /// Set of columns for which the values were read. The rest will be filled with default values. |
101 | read_columns.assign(num_columns, false); |
102 | seen_columns.assign(num_columns, false); |
103 | |
104 | if (unlikely(*in.position() == '\n')) |
105 | { |
106 | /// An empty string. It is permissible, but it is unclear why. |
107 | ++in.position(); |
108 | } |
109 | else |
110 | { |
111 | while (true) |
112 | { |
113 | StringRef name_ref; |
114 | bool has_value = readName(in, name_ref, name_buf); |
115 | ssize_t index = -1; |
116 | |
117 | if (has_value) |
118 | { |
119 | /// NOTE Optimization is possible by caching the order of fields (which is almost always the same) |
120 | /// and quickly checking for the next expected field, instead of searching the hash table. |
121 | |
122 | auto it = name_map.find(name_ref); |
123 | if (!it) |
124 | { |
125 | if (!format_settings.skip_unknown_fields) |
126 | throw Exception("Unknown field found while parsing TSKV format: " + name_ref.toString(), ErrorCodes::INCORRECT_DATA); |
127 | |
128 | /// If the key is not found, skip the value. |
129 | NullSink sink; |
130 | readEscapedStringInto(sink, in); |
131 | } |
132 | else |
133 | { |
134 | index = it->getMapped(); |
135 | |
136 | if (seen_columns[index]) |
137 | throw Exception("Duplicate field found while parsing TSKV format: " + name_ref.toString(), ErrorCodes::INCORRECT_DATA); |
138 | |
139 | seen_columns[index] = read_columns[index] = true; |
140 | const auto & type = getPort().getHeader().getByPosition(index).type; |
141 | if (format_settings.null_as_default && !type->isNullable()) |
142 | read_columns[index] = DataTypeNullable::deserializeTextEscaped(*columns[index], in, format_settings, type); |
143 | else |
144 | header.getByPosition(index).type->deserializeAsTextEscaped(*columns[index], in, format_settings); |
145 | } |
146 | } |
147 | else |
148 | { |
149 | /// The only thing that can go without value is `tskv` fragment that is ignored. |
150 | if (!(name_ref.size == 4 && 0 == memcmp(name_ref.data, "tskv" , 4))) |
151 | throw Exception("Found field without value while parsing TSKV format: " + name_ref.toString(), ErrorCodes::INCORRECT_DATA); |
152 | } |
153 | |
154 | if (in.eof()) |
155 | { |
156 | throw Exception("Unexpected end of stream after field in TSKV format: " + name_ref.toString(), ErrorCodes::CANNOT_READ_ALL_DATA); |
157 | } |
158 | else if (*in.position() == '\t') |
159 | { |
160 | ++in.position(); |
161 | continue; |
162 | } |
163 | else if (*in.position() == '\n') |
164 | { |
165 | ++in.position(); |
166 | break; |
167 | } |
168 | else |
169 | { |
170 | /// Possibly a garbage was written into column, remove it |
171 | if (index >= 0) |
172 | { |
173 | columns[index]->popBack(1); |
174 | seen_columns[index] = read_columns[index] = false; |
175 | } |
176 | |
177 | throw Exception("Found garbage after field in TSKV format: " + name_ref.toString(), ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED); |
178 | } |
179 | } |
180 | } |
181 | |
182 | /// Fill in the not met columns with default values. |
183 | for (size_t i = 0; i < num_columns; ++i) |
184 | if (!seen_columns[i]) |
185 | header.getByPosition(i).type->insertDefaultInto(*columns[i]); |
186 | |
187 | /// return info about defaults set |
188 | ext.read_columns = read_columns; |
189 | |
190 | return true; |
191 | } |
192 | |
193 | |
194 | void TSKVRowInputFormat::syncAfterError() |
195 | { |
196 | skipToUnescapedNextLineOrEOF(in); |
197 | } |
198 | |
199 | |
200 | void TSKVRowInputFormat::resetParser() |
201 | { |
202 | IRowInputFormat::resetParser(); |
203 | read_columns.clear(); |
204 | seen_columns.clear(); |
205 | name_buf.clear(); |
206 | } |
207 | |
208 | void registerInputFormatProcessorTSKV(FormatFactory & factory) |
209 | { |
210 | factory.registerInputFormatProcessor("TSKV" , []( |
211 | ReadBuffer & buf, |
212 | const Block & sample, |
213 | IRowInputFormat::Params params, |
214 | const FormatSettings & settings) |
215 | { |
216 | return std::make_shared<TSKVRowInputFormat>(buf, sample, std::move(params), settings); |
217 | }); |
218 | } |
219 | |
220 | } |
221 | |