1#include <IO/ReadHelpers.h>
2#include <Processors/Formats/Impl/TSKVRowInputFormat.h>
3#include <Formats/FormatFactory.h>
4#include <DataTypes/DataTypeNullable.h>
5
6
7namespace DB
8{
9
/// Error codes thrown by the TSKV parser below; the definitions live elsewhere in the project.
namespace ErrorCodes
{
    extern const int CANNOT_PARSE_ESCAPE_SEQUENCE;
    extern const int CANNOT_PARSE_INPUT_ASSERTION_FAILED;
    extern const int CANNOT_READ_ALL_DATA;
    extern const int INCORRECT_DATA;
}
17
18
19TSKVRowInputFormat::TSKVRowInputFormat(ReadBuffer & in_, Block header_, Params params_, const FormatSettings & format_settings_)
20 : IRowInputFormat(std::move(header_), in_, std::move(params_)), format_settings(format_settings_), name_map(header_.columns())
21{
22 /// In this format, we assume that column name cannot contain BOM,
23 /// so BOM at beginning of stream cannot be confused with name of field, and it is safe to skip it.
24 skipBOMIfExists(in);
25
26 const auto & sample_block = getPort().getHeader();
27 size_t num_columns = sample_block.columns();
28 for (size_t i = 0; i < num_columns; ++i)
29 name_map[sample_block.getByPosition(i).name] = i; /// NOTE You could place names more cache-locally.
30}
31
32
33/** Read the field name in the `tskv` format.
34 * Return true if the field is followed by an equal sign,
35 * otherwise (field with no value) return false.
36 * The reference to the field name will be written to `ref`.
37 * A temporary `tmp` buffer can also be used to copy the field name to it.
38 * When reading, skips the name and the equal sign after it.
39 */
40static bool readName(ReadBuffer & buf, StringRef & ref, String & tmp)
41{
42 tmp.clear();
43
44 while (!buf.eof())
45 {
46 const char * next_pos = find_first_symbols<'\t', '\n', '\\', '='>(buf.position(), buf.buffer().end());
47
48 if (next_pos == buf.buffer().end())
49 {
50 tmp.append(buf.position(), next_pos - buf.position());
51 buf.next();
52 continue;
53 }
54
55 /// Came to the end of the name.
56 if (*next_pos != '\\')
57 {
58 bool have_value = *next_pos == '=';
59 if (tmp.empty())
60 {
61 /// No need to copy data, you can refer directly to the `buf`.
62 ref = StringRef(buf.position(), next_pos - buf.position());
63 buf.position() += next_pos + have_value - buf.position();
64 }
65 else
66 {
67 /// Copy the data to a temporary string and return a reference to it.
68 tmp.append(buf.position(), next_pos - buf.position());
69 buf.position() += next_pos + have_value - buf.position();
70 ref = StringRef(tmp);
71 }
72 return have_value;
73 }
74 /// The name has an escape sequence.
75 else
76 {
77 tmp.append(buf.position(), next_pos - buf.position());
78 buf.position() += next_pos + 1 - buf.position();
79 if (buf.eof())
80 throw Exception("Cannot parse escape sequence", ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE);
81
82 tmp.push_back(parseEscapeSequence(*buf.position()));
83 ++buf.position();
84 continue;
85 }
86 }
87
88 throw Exception("Unexpected end of stream while reading key name from TSKV format", ErrorCodes::CANNOT_READ_ALL_DATA);
89}
90
91
92bool TSKVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext)
93{
94 if (in.eof())
95 return false;
96
97 auto & header = getPort().getHeader();
98 size_t num_columns = columns.size();
99
100 /// Set of columns for which the values were read. The rest will be filled with default values.
101 read_columns.assign(num_columns, false);
102 seen_columns.assign(num_columns, false);
103
104 if (unlikely(*in.position() == '\n'))
105 {
106 /// An empty string. It is permissible, but it is unclear why.
107 ++in.position();
108 }
109 else
110 {
111 while (true)
112 {
113 StringRef name_ref;
114 bool has_value = readName(in, name_ref, name_buf);
115 ssize_t index = -1;
116
117 if (has_value)
118 {
119 /// NOTE Optimization is possible by caching the order of fields (which is almost always the same)
120 /// and quickly checking for the next expected field, instead of searching the hash table.
121
122 auto it = name_map.find(name_ref);
123 if (!it)
124 {
125 if (!format_settings.skip_unknown_fields)
126 throw Exception("Unknown field found while parsing TSKV format: " + name_ref.toString(), ErrorCodes::INCORRECT_DATA);
127
128 /// If the key is not found, skip the value.
129 NullSink sink;
130 readEscapedStringInto(sink, in);
131 }
132 else
133 {
134 index = it->getMapped();
135
136 if (seen_columns[index])
137 throw Exception("Duplicate field found while parsing TSKV format: " + name_ref.toString(), ErrorCodes::INCORRECT_DATA);
138
139 seen_columns[index] = read_columns[index] = true;
140 const auto & type = getPort().getHeader().getByPosition(index).type;
141 if (format_settings.null_as_default && !type->isNullable())
142 read_columns[index] = DataTypeNullable::deserializeTextEscaped(*columns[index], in, format_settings, type);
143 else
144 header.getByPosition(index).type->deserializeAsTextEscaped(*columns[index], in, format_settings);
145 }
146 }
147 else
148 {
149 /// The only thing that can go without value is `tskv` fragment that is ignored.
150 if (!(name_ref.size == 4 && 0 == memcmp(name_ref.data, "tskv", 4)))
151 throw Exception("Found field without value while parsing TSKV format: " + name_ref.toString(), ErrorCodes::INCORRECT_DATA);
152 }
153
154 if (in.eof())
155 {
156 throw Exception("Unexpected end of stream after field in TSKV format: " + name_ref.toString(), ErrorCodes::CANNOT_READ_ALL_DATA);
157 }
158 else if (*in.position() == '\t')
159 {
160 ++in.position();
161 continue;
162 }
163 else if (*in.position() == '\n')
164 {
165 ++in.position();
166 break;
167 }
168 else
169 {
170 /// Possibly a garbage was written into column, remove it
171 if (index >= 0)
172 {
173 columns[index]->popBack(1);
174 seen_columns[index] = read_columns[index] = false;
175 }
176
177 throw Exception("Found garbage after field in TSKV format: " + name_ref.toString(), ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED);
178 }
179 }
180 }
181
182 /// Fill in the not met columns with default values.
183 for (size_t i = 0; i < num_columns; ++i)
184 if (!seen_columns[i])
185 header.getByPosition(i).type->insertDefaultInto(*columns[i]);
186
187 /// return info about defaults set
188 ext.read_columns = read_columns;
189
190 return true;
191}
192
193
194void TSKVRowInputFormat::syncAfterError()
195{
196 skipToUnescapedNextLineOrEOF(in);
197}
198
199
200void TSKVRowInputFormat::resetParser()
201{
202 IRowInputFormat::resetParser();
203 read_columns.clear();
204 seen_columns.clear();
205 name_buf.clear();
206}
207
208void registerInputFormatProcessorTSKV(FormatFactory & factory)
209{
210 factory.registerInputFormatProcessor("TSKV", [](
211 ReadBuffer & buf,
212 const Block & sample,
213 IRowInputFormat::Params params,
214 const FormatSettings & settings)
215 {
216 return std::make_shared<TSKVRowInputFormat>(buf, sample, std::move(params), settings);
217 });
218}
219
220}
221