1 | #include "CapnProtoRowInputFormat.h" |
2 | #if USE_CAPNP |
3 | |
4 | #include <Core/Field.h> |
5 | #include <IO/ReadBuffer.h> |
6 | #include <Interpreters/Context.h> |
7 | #include <Formats/FormatFactory.h> |
8 | #include <Formats/FormatSchemaInfo.h> |
9 | #include <capnp/serialize.h> |
10 | #include <capnp/dynamic.h> |
11 | #include <capnp/common.h> |
12 | #include <boost/algorithm/string.hpp> |
13 | #include <boost/range/join.hpp> |
14 | #include <common/logger_useful.h> |
15 | |
16 | |
17 | namespace DB |
18 | { |
19 | |
20 | namespace ErrorCodes |
21 | { |
22 | extern const int BAD_TYPE_OF_FIELD; |
23 | extern const int BAD_ARGUMENTS; |
24 | extern const int THERE_IS_NO_COLUMN; |
25 | extern const int LOGICAL_ERROR; |
26 | } |
27 | |
28 | static CapnProtoRowInputFormat::NestedField split(const Block & , size_t i) |
29 | { |
30 | CapnProtoRowInputFormat::NestedField field = {{}, i}; |
31 | |
32 | // Remove leading dot in field definition, e.g. ".msg" -> "msg" |
33 | String name(header.safeGetByPosition(i).name); |
34 | if (!name.empty() && name[0] == '.') |
35 | name.erase(0, 1); |
36 | |
37 | boost::split(field.tokens, name, boost::is_any_of("._" )); |
38 | return field; |
39 | } |
40 | |
41 | |
42 | static Field convertNodeToField(const capnp::DynamicValue::Reader & value) |
43 | { |
44 | switch (value.getType()) |
45 | { |
46 | case capnp::DynamicValue::UNKNOWN: |
47 | throw Exception("Unknown field type" , ErrorCodes::BAD_TYPE_OF_FIELD); |
48 | case capnp::DynamicValue::VOID: |
49 | return Field(); |
50 | case capnp::DynamicValue::BOOL: |
51 | return value.as<bool>() ? 1u : 0u; |
52 | case capnp::DynamicValue::INT: |
53 | return value.as<int64_t>(); |
54 | case capnp::DynamicValue::UINT: |
55 | return value.as<uint64_t>(); |
56 | case capnp::DynamicValue::FLOAT: |
57 | return value.as<double>(); |
58 | case capnp::DynamicValue::TEXT: |
59 | { |
60 | auto arr = value.as<capnp::Text>(); |
61 | return String(arr.begin(), arr.size()); |
62 | } |
63 | case capnp::DynamicValue::DATA: |
64 | { |
65 | auto arr = value.as<capnp::Data>().asChars(); |
66 | return String(arr.begin(), arr.size()); |
67 | } |
68 | case capnp::DynamicValue::LIST: |
69 | { |
70 | auto listValue = value.as<capnp::DynamicList>(); |
71 | Array res(listValue.size()); |
72 | for (auto i : kj::indices(listValue)) |
73 | res[i] = convertNodeToField(listValue[i]); |
74 | |
75 | return res; |
76 | } |
77 | case capnp::DynamicValue::ENUM: |
78 | return value.as<capnp::DynamicEnum>().getRaw(); |
79 | case capnp::DynamicValue::STRUCT: |
80 | { |
81 | auto structValue = value.as<capnp::DynamicStruct>(); |
82 | const auto & fields = structValue.getSchema().getFields(); |
83 | |
84 | Tuple tuple(fields.size()); |
85 | for (auto i : kj::indices(fields)) |
86 | tuple[i] = convertNodeToField(structValue.get(fields[i])); |
87 | |
88 | return tuple; |
89 | } |
90 | case capnp::DynamicValue::CAPABILITY: |
91 | throw Exception("CAPABILITY type not supported" , ErrorCodes::BAD_TYPE_OF_FIELD); |
92 | case capnp::DynamicValue::ANY_POINTER: |
93 | throw Exception("ANY_POINTER type not supported" , ErrorCodes::BAD_TYPE_OF_FIELD); |
94 | } |
95 | return Field(); |
96 | } |
97 | |
98 | static capnp::StructSchema::Field getFieldOrThrow(capnp::StructSchema node, const std::string & field) |
99 | { |
100 | KJ_IF_MAYBE(child, node.findFieldByName(field)) |
101 | return *child; |
102 | else |
103 | throw Exception("Field " + field + " doesn't exist in schema " + node.getShortDisplayName().cStr(), ErrorCodes::THERE_IS_NO_COLUMN); |
104 | } |
105 | |
106 | |
107 | void CapnProtoRowInputFormat::createActions(const NestedFieldList & sorted_fields, capnp::StructSchema reader) |
108 | { |
109 | /// Columns in a table can map to fields in Cap'n'Proto or to structs. |
110 | |
111 | /// Store common parents and their tokens in order to backtrack. |
112 | std::vector<capnp::StructSchema::Field> parents; |
113 | std::vector<std::string> parent_tokens; |
114 | |
115 | capnp::StructSchema cur_reader = reader; |
116 | |
117 | for (const auto & field : sorted_fields) |
118 | { |
119 | if (field.tokens.empty()) |
120 | throw Exception("Logical error in CapnProtoRowInputFormat" , ErrorCodes::LOGICAL_ERROR); |
121 | |
122 | // Backtrack to common parent |
123 | while (field.tokens.size() < parent_tokens.size() + 1 |
124 | || !std::equal(parent_tokens.begin(), parent_tokens.end(), field.tokens.begin())) |
125 | { |
126 | actions.push_back({Action::POP}); |
127 | parents.pop_back(); |
128 | parent_tokens.pop_back(); |
129 | |
130 | if (parents.empty()) |
131 | { |
132 | cur_reader = reader; |
133 | break; |
134 | } |
135 | else |
136 | cur_reader = parents.back().getType().asStruct(); |
137 | } |
138 | |
139 | // Go forward |
140 | while (parent_tokens.size() + 1 < field.tokens.size()) |
141 | { |
142 | const auto & token = field.tokens[parents.size()]; |
143 | auto node = getFieldOrThrow(cur_reader, token); |
144 | if (node.getType().isStruct()) |
145 | { |
146 | // Descend to field structure |
147 | parents.emplace_back(node); |
148 | parent_tokens.emplace_back(token); |
149 | cur_reader = node.getType().asStruct(); |
150 | actions.push_back({Action::PUSH, node}); |
151 | } |
152 | else if (node.getType().isList()) |
153 | { |
154 | break; // Collect list |
155 | } |
156 | else |
157 | throw Exception("Field " + token + " is neither Struct nor List" , ErrorCodes::BAD_TYPE_OF_FIELD); |
158 | } |
159 | |
160 | // Read field from the structure |
161 | auto node = getFieldOrThrow(cur_reader, field.tokens[parents.size()]); |
162 | if (node.getType().isList() && !actions.empty() && actions.back().field == node) |
163 | { |
164 | // The field list here flattens Nested elements into multiple arrays |
165 | // In order to map Nested types in Cap'nProto back, they need to be collected |
166 | // Since the field names are sorted, the order of field positions must be preserved |
167 | // For example, if the fields are { b @0 :Text, a @1 :Text }, the `a` would come first |
168 | // even though it's position is second. |
169 | auto & columns = actions.back().columns; |
170 | auto it = std::upper_bound(columns.cbegin(), columns.cend(), field.pos); |
171 | columns.insert(it, field.pos); |
172 | } |
173 | else |
174 | { |
175 | actions.push_back({Action::READ, node, {field.pos}}); |
176 | } |
177 | } |
178 | } |
179 | |
180 | CapnProtoRowInputFormat::CapnProtoRowInputFormat(ReadBuffer & in_, Block , Params params_, const FormatSchemaInfo & info) |
181 | : IRowInputFormat(std::move(header), in_, std::move(params_)), parser(std::make_shared<SchemaParser>()) |
182 | { |
183 | // Parse the schema and fetch the root object |
184 | |
185 | #pragma GCC diagnostic push |
186 | #pragma GCC diagnostic ignored "-Wdeprecated-declarations" |
187 | auto schema = parser->impl.parseDiskFile(info.schemaPath(), info.absoluteSchemaPath(), {}); |
188 | #pragma GCC diagnostic pop |
189 | |
190 | root = schema.getNested(info.messageName()).asStruct(); |
191 | |
192 | /** |
193 | * The schema typically consists of fields in various nested structures. |
194 | * Here we gather the list of fields and sort them in a way so that fields in the same structure are adjacent, |
195 | * and the nesting level doesn't decrease to make traversal easier. |
196 | */ |
197 | auto & sample = getPort().getHeader(); |
198 | NestedFieldList list; |
199 | size_t num_columns = sample.columns(); |
200 | for (size_t i = 0; i < num_columns; ++i) |
201 | list.push_back(split(sample, i)); |
202 | |
203 | // Order list first by value of strings then by length of string vector. |
204 | std::sort(list.begin(), list.end(), [](const NestedField & a, const NestedField & b) { return a.tokens < b.tokens; }); |
205 | createActions(list, root); |
206 | } |
207 | |
208 | kj::Array<capnp::word> CapnProtoRowInputFormat::readMessage() |
209 | { |
210 | uint32_t segment_count; |
211 | in.readStrict(reinterpret_cast<char*>(&segment_count), sizeof(uint32_t)); |
212 | |
213 | // one for segmentCount and one because segmentCount starts from 0 |
214 | const auto prefix_size = (2 + segment_count) * sizeof(uint32_t); |
215 | const auto words_prefix_size = (segment_count + 1) / 2 + 1; |
216 | auto prefix = kj::heapArray<capnp::word>(words_prefix_size); |
217 | auto prefix_chars = prefix.asChars(); |
218 | ::memcpy(prefix_chars.begin(), &segment_count, sizeof(uint32_t)); |
219 | |
220 | // read size of each segment |
221 | for (size_t i = 0; i <= segment_count; ++i) |
222 | in.readStrict(prefix_chars.begin() + ((i + 1) * sizeof(uint32_t)), sizeof(uint32_t)); |
223 | |
224 | // calculate size of message |
225 | const auto expected_words = capnp::expectedSizeInWordsFromPrefix(prefix); |
226 | const auto expected_bytes = expected_words * sizeof(capnp::word); |
227 | const auto data_size = expected_bytes - prefix_size; |
228 | auto msg = kj::heapArray<capnp::word>(expected_words); |
229 | auto msg_chars = msg.asChars(); |
230 | |
231 | // read full message |
232 | ::memcpy(msg_chars.begin(), prefix_chars.begin(), prefix_size); |
233 | in.readStrict(msg_chars.begin() + prefix_size, data_size); |
234 | |
235 | return msg; |
236 | } |
237 | |
238 | bool CapnProtoRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &) |
239 | { |
240 | if (in.eof()) |
241 | return false; |
242 | |
243 | auto array = readMessage(); |
244 | |
245 | #if CAPNP_VERSION >= 8000 |
246 | capnp::UnalignedFlatArrayMessageReader msg(array); |
247 | #else |
248 | capnp::FlatArrayMessageReader msg(array); |
249 | #endif |
250 | std::vector<capnp::DynamicStruct::Reader> stack; |
251 | stack.push_back(msg.getRoot<capnp::DynamicStruct>(root)); |
252 | |
253 | for (auto action : actions) |
254 | { |
255 | switch (action.type) |
256 | { |
257 | case Action::READ: |
258 | { |
259 | Field value = convertNodeToField(stack.back().get(action.field)); |
260 | if (action.columns.size() > 1) |
261 | { |
262 | // Nested columns must be flattened into several arrays |
263 | // e.g. Array(Tuple(x ..., y ...)) -> Array(x ...), Array(y ...) |
264 | const auto & collected = DB::get<const Array &>(value); |
265 | size_t size = collected.size(); |
266 | // The flattened array contains an array of a part of the nested tuple |
267 | Array flattened(size); |
268 | for (size_t column_index = 0; column_index < action.columns.size(); ++column_index) |
269 | { |
270 | // Populate array with a single tuple elements |
271 | for (size_t off = 0; off < size; ++off) |
272 | { |
273 | const auto & tuple = DB::get<const Tuple &>(collected[off]); |
274 | flattened[off] = tuple[column_index]; |
275 | } |
276 | auto & col = columns[action.columns[column_index]]; |
277 | col->insert(flattened); |
278 | } |
279 | } |
280 | else |
281 | { |
282 | auto & col = columns[action.columns[0]]; |
283 | col->insert(value); |
284 | } |
285 | |
286 | break; |
287 | } |
288 | case Action::POP: |
289 | stack.pop_back(); |
290 | break; |
291 | case Action::PUSH: |
292 | stack.push_back(stack.back().get(action.field).as<capnp::DynamicStruct>()); |
293 | break; |
294 | } |
295 | } |
296 | |
297 | return true; |
298 | } |
299 | |
300 | void registerInputFormatProcessorCapnProto(FormatFactory & factory) |
301 | { |
302 | factory.registerInputFormatProcessor( |
303 | "CapnProto" , |
304 | [](ReadBuffer & buf, const Block & sample, IRowInputFormat::Params params, const FormatSettings & settings) |
305 | { |
306 | return std::make_shared<CapnProtoRowInputFormat>(buf, sample, std::move(params), |
307 | FormatSchemaInfo(settings.schema.format_schema, "CapnProto" , true, |
308 | settings.schema.is_server, settings.schema.format_schema_path)); |
309 | }); |
310 | } |
311 | |
312 | } |
313 | |
314 | #else |
315 | |
namespace DB
{

/// Compiled when USE_CAPNP is off: the registration function is still defined, but it registers nothing.
class FormatFactory;

void registerInputFormatProcessorCapnProto(FormatFactory &)
{
}

}
321 | |
322 | #endif // USE_CAPNP |
323 | |