1#include "CapnProtoRowInputFormat.h"
2#if USE_CAPNP
3
4#include <Core/Field.h>
5#include <IO/ReadBuffer.h>
6#include <Interpreters/Context.h>
7#include <Formats/FormatFactory.h>
8#include <Formats/FormatSchemaInfo.h>
9#include <capnp/serialize.h>
10#include <capnp/dynamic.h>
11#include <capnp/common.h>
12#include <boost/algorithm/string.hpp>
13#include <boost/range/join.hpp>
14#include <common/logger_useful.h>
15
16
17namespace DB
18{
19
/// Error codes thrown from this translation unit; only declared here (extern), listed alphabetically.
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int BAD_TYPE_OF_FIELD;
    extern const int LOGICAL_ERROR;
    extern const int THERE_IS_NO_COLUMN;
}
27
28static CapnProtoRowInputFormat::NestedField split(const Block & header, size_t i)
29{
30 CapnProtoRowInputFormat::NestedField field = {{}, i};
31
32 // Remove leading dot in field definition, e.g. ".msg" -> "msg"
33 String name(header.safeGetByPosition(i).name);
34 if (!name.empty() && name[0] == '.')
35 name.erase(0, 1);
36
37 boost::split(field.tokens, name, boost::is_any_of("._"));
38 return field;
39}
40
41
42static Field convertNodeToField(const capnp::DynamicValue::Reader & value)
43{
44 switch (value.getType())
45 {
46 case capnp::DynamicValue::UNKNOWN:
47 throw Exception("Unknown field type", ErrorCodes::BAD_TYPE_OF_FIELD);
48 case capnp::DynamicValue::VOID:
49 return Field();
50 case capnp::DynamicValue::BOOL:
51 return value.as<bool>() ? 1u : 0u;
52 case capnp::DynamicValue::INT:
53 return value.as<int64_t>();
54 case capnp::DynamicValue::UINT:
55 return value.as<uint64_t>();
56 case capnp::DynamicValue::FLOAT:
57 return value.as<double>();
58 case capnp::DynamicValue::TEXT:
59 {
60 auto arr = value.as<capnp::Text>();
61 return String(arr.begin(), arr.size());
62 }
63 case capnp::DynamicValue::DATA:
64 {
65 auto arr = value.as<capnp::Data>().asChars();
66 return String(arr.begin(), arr.size());
67 }
68 case capnp::DynamicValue::LIST:
69 {
70 auto listValue = value.as<capnp::DynamicList>();
71 Array res(listValue.size());
72 for (auto i : kj::indices(listValue))
73 res[i] = convertNodeToField(listValue[i]);
74
75 return res;
76 }
77 case capnp::DynamicValue::ENUM:
78 return value.as<capnp::DynamicEnum>().getRaw();
79 case capnp::DynamicValue::STRUCT:
80 {
81 auto structValue = value.as<capnp::DynamicStruct>();
82 const auto & fields = structValue.getSchema().getFields();
83
84 Tuple tuple(fields.size());
85 for (auto i : kj::indices(fields))
86 tuple[i] = convertNodeToField(structValue.get(fields[i]));
87
88 return tuple;
89 }
90 case capnp::DynamicValue::CAPABILITY:
91 throw Exception("CAPABILITY type not supported", ErrorCodes::BAD_TYPE_OF_FIELD);
92 case capnp::DynamicValue::ANY_POINTER:
93 throw Exception("ANY_POINTER type not supported", ErrorCodes::BAD_TYPE_OF_FIELD);
94 }
95 return Field();
96}
97
98static capnp::StructSchema::Field getFieldOrThrow(capnp::StructSchema node, const std::string & field)
99{
100 KJ_IF_MAYBE(child, node.findFieldByName(field))
101 return *child;
102 else
103 throw Exception("Field " + field + " doesn't exist in schema " + node.getShortDisplayName().cStr(), ErrorCodes::THERE_IS_NO_COLUMN);
104}
105
106
107void CapnProtoRowInputFormat::createActions(const NestedFieldList & sorted_fields, capnp::StructSchema reader)
108{
109 /// Columns in a table can map to fields in Cap'n'Proto or to structs.
110
111 /// Store common parents and their tokens in order to backtrack.
112 std::vector<capnp::StructSchema::Field> parents;
113 std::vector<std::string> parent_tokens;
114
115 capnp::StructSchema cur_reader = reader;
116
117 for (const auto & field : sorted_fields)
118 {
119 if (field.tokens.empty())
120 throw Exception("Logical error in CapnProtoRowInputFormat", ErrorCodes::LOGICAL_ERROR);
121
122 // Backtrack to common parent
123 while (field.tokens.size() < parent_tokens.size() + 1
124 || !std::equal(parent_tokens.begin(), parent_tokens.end(), field.tokens.begin()))
125 {
126 actions.push_back({Action::POP});
127 parents.pop_back();
128 parent_tokens.pop_back();
129
130 if (parents.empty())
131 {
132 cur_reader = reader;
133 break;
134 }
135 else
136 cur_reader = parents.back().getType().asStruct();
137 }
138
139 // Go forward
140 while (parent_tokens.size() + 1 < field.tokens.size())
141 {
142 const auto & token = field.tokens[parents.size()];
143 auto node = getFieldOrThrow(cur_reader, token);
144 if (node.getType().isStruct())
145 {
146 // Descend to field structure
147 parents.emplace_back(node);
148 parent_tokens.emplace_back(token);
149 cur_reader = node.getType().asStruct();
150 actions.push_back({Action::PUSH, node});
151 }
152 else if (node.getType().isList())
153 {
154 break; // Collect list
155 }
156 else
157 throw Exception("Field " + token + " is neither Struct nor List", ErrorCodes::BAD_TYPE_OF_FIELD);
158 }
159
160 // Read field from the structure
161 auto node = getFieldOrThrow(cur_reader, field.tokens[parents.size()]);
162 if (node.getType().isList() && !actions.empty() && actions.back().field == node)
163 {
164 // The field list here flattens Nested elements into multiple arrays
165 // In order to map Nested types in Cap'nProto back, they need to be collected
166 // Since the field names are sorted, the order of field positions must be preserved
167 // For example, if the fields are { b @0 :Text, a @1 :Text }, the `a` would come first
168 // even though it's position is second.
169 auto & columns = actions.back().columns;
170 auto it = std::upper_bound(columns.cbegin(), columns.cend(), field.pos);
171 columns.insert(it, field.pos);
172 }
173 else
174 {
175 actions.push_back({Action::READ, node, {field.pos}});
176 }
177 }
178}
179
180CapnProtoRowInputFormat::CapnProtoRowInputFormat(ReadBuffer & in_, Block header, Params params_, const FormatSchemaInfo & info)
181 : IRowInputFormat(std::move(header), in_, std::move(params_)), parser(std::make_shared<SchemaParser>())
182{
183 // Parse the schema and fetch the root object
184
185#pragma GCC diagnostic push
186#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
187 auto schema = parser->impl.parseDiskFile(info.schemaPath(), info.absoluteSchemaPath(), {});
188#pragma GCC diagnostic pop
189
190 root = schema.getNested(info.messageName()).asStruct();
191
192 /**
193 * The schema typically consists of fields in various nested structures.
194 * Here we gather the list of fields and sort them in a way so that fields in the same structure are adjacent,
195 * and the nesting level doesn't decrease to make traversal easier.
196 */
197 auto & sample = getPort().getHeader();
198 NestedFieldList list;
199 size_t num_columns = sample.columns();
200 for (size_t i = 0; i < num_columns; ++i)
201 list.push_back(split(sample, i));
202
203 // Order list first by value of strings then by length of string vector.
204 std::sort(list.begin(), list.end(), [](const NestedField & a, const NestedField & b) { return a.tokens < b.tokens; });
205 createActions(list, root);
206}
207
208kj::Array<capnp::word> CapnProtoRowInputFormat::readMessage()
209{
210 uint32_t segment_count;
211 in.readStrict(reinterpret_cast<char*>(&segment_count), sizeof(uint32_t));
212
213 // one for segmentCount and one because segmentCount starts from 0
214 const auto prefix_size = (2 + segment_count) * sizeof(uint32_t);
215 const auto words_prefix_size = (segment_count + 1) / 2 + 1;
216 auto prefix = kj::heapArray<capnp::word>(words_prefix_size);
217 auto prefix_chars = prefix.asChars();
218 ::memcpy(prefix_chars.begin(), &segment_count, sizeof(uint32_t));
219
220 // read size of each segment
221 for (size_t i = 0; i <= segment_count; ++i)
222 in.readStrict(prefix_chars.begin() + ((i + 1) * sizeof(uint32_t)), sizeof(uint32_t));
223
224 // calculate size of message
225 const auto expected_words = capnp::expectedSizeInWordsFromPrefix(prefix);
226 const auto expected_bytes = expected_words * sizeof(capnp::word);
227 const auto data_size = expected_bytes - prefix_size;
228 auto msg = kj::heapArray<capnp::word>(expected_words);
229 auto msg_chars = msg.asChars();
230
231 // read full message
232 ::memcpy(msg_chars.begin(), prefix_chars.begin(), prefix_size);
233 in.readStrict(msg_chars.begin() + prefix_size, data_size);
234
235 return msg;
236}
237
238bool CapnProtoRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
239{
240 if (in.eof())
241 return false;
242
243 auto array = readMessage();
244
245#if CAPNP_VERSION >= 8000
246 capnp::UnalignedFlatArrayMessageReader msg(array);
247#else
248 capnp::FlatArrayMessageReader msg(array);
249#endif
250 std::vector<capnp::DynamicStruct::Reader> stack;
251 stack.push_back(msg.getRoot<capnp::DynamicStruct>(root));
252
253 for (auto action : actions)
254 {
255 switch (action.type)
256 {
257 case Action::READ:
258 {
259 Field value = convertNodeToField(stack.back().get(action.field));
260 if (action.columns.size() > 1)
261 {
262 // Nested columns must be flattened into several arrays
263 // e.g. Array(Tuple(x ..., y ...)) -> Array(x ...), Array(y ...)
264 const auto & collected = DB::get<const Array &>(value);
265 size_t size = collected.size();
266 // The flattened array contains an array of a part of the nested tuple
267 Array flattened(size);
268 for (size_t column_index = 0; column_index < action.columns.size(); ++column_index)
269 {
270 // Populate array with a single tuple elements
271 for (size_t off = 0; off < size; ++off)
272 {
273 const auto & tuple = DB::get<const Tuple &>(collected[off]);
274 flattened[off] = tuple[column_index];
275 }
276 auto & col = columns[action.columns[column_index]];
277 col->insert(flattened);
278 }
279 }
280 else
281 {
282 auto & col = columns[action.columns[0]];
283 col->insert(value);
284 }
285
286 break;
287 }
288 case Action::POP:
289 stack.pop_back();
290 break;
291 case Action::PUSH:
292 stack.push_back(stack.back().get(action.field).as<capnp::DynamicStruct>());
293 break;
294 }
295 }
296
297 return true;
298}
299
300void registerInputFormatProcessorCapnProto(FormatFactory & factory)
301{
302 factory.registerInputFormatProcessor(
303 "CapnProto",
304 [](ReadBuffer & buf, const Block & sample, IRowInputFormat::Params params, const FormatSettings & settings)
305 {
306 return std::make_shared<CapnProtoRowInputFormat>(buf, sample, std::move(params),
307 FormatSchemaInfo(settings.schema.format_schema, "CapnProto", true,
308 settings.schema.is_server, settings.schema.format_schema_path));
309 });
310}
311
312}
313
314#else
315
namespace DB
{

class FormatFactory;

/// Stub used when USE_CAPNP is not enabled: nothing is registered.
void registerInputFormatProcessorCapnProto(FormatFactory & /*factory*/)
{
}

}
321
322#endif // USE_CAPNP
323