| 1 | #include "CapnProtoRowInputFormat.h" |
| 2 | #if USE_CAPNP |
| 3 | |
| 4 | #include <Core/Field.h> |
| 5 | #include <IO/ReadBuffer.h> |
| 6 | #include <Interpreters/Context.h> |
| 7 | #include <Formats/FormatFactory.h> |
| 8 | #include <Formats/FormatSchemaInfo.h> |
| 9 | #include <capnp/serialize.h> |
| 10 | #include <capnp/dynamic.h> |
| 11 | #include <capnp/common.h> |
| 12 | #include <boost/algorithm/string.hpp> |
| 13 | #include <boost/range/join.hpp> |
| 14 | #include <common/logger_useful.h> |
| 15 | |
| 16 | |
| 17 | namespace DB |
| 18 | { |
| 19 | |
/// Declarations of error codes that are defined elsewhere in the project.
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int BAD_TYPE_OF_FIELD;
    extern const int LOGICAL_ERROR;
    extern const int THERE_IS_NO_COLUMN;
}
| 27 | |
| 28 | static CapnProtoRowInputFormat::NestedField split(const Block & , size_t i) |
| 29 | { |
| 30 | CapnProtoRowInputFormat::NestedField field = {{}, i}; |
| 31 | |
| 32 | // Remove leading dot in field definition, e.g. ".msg" -> "msg" |
| 33 | String name(header.safeGetByPosition(i).name); |
| 34 | if (!name.empty() && name[0] == '.') |
| 35 | name.erase(0, 1); |
| 36 | |
| 37 | boost::split(field.tokens, name, boost::is_any_of("._" )); |
| 38 | return field; |
| 39 | } |
| 40 | |
| 41 | |
| 42 | static Field convertNodeToField(const capnp::DynamicValue::Reader & value) |
| 43 | { |
| 44 | switch (value.getType()) |
| 45 | { |
| 46 | case capnp::DynamicValue::UNKNOWN: |
| 47 | throw Exception("Unknown field type" , ErrorCodes::BAD_TYPE_OF_FIELD); |
| 48 | case capnp::DynamicValue::VOID: |
| 49 | return Field(); |
| 50 | case capnp::DynamicValue::BOOL: |
| 51 | return value.as<bool>() ? 1u : 0u; |
| 52 | case capnp::DynamicValue::INT: |
| 53 | return value.as<int64_t>(); |
| 54 | case capnp::DynamicValue::UINT: |
| 55 | return value.as<uint64_t>(); |
| 56 | case capnp::DynamicValue::FLOAT: |
| 57 | return value.as<double>(); |
| 58 | case capnp::DynamicValue::TEXT: |
| 59 | { |
| 60 | auto arr = value.as<capnp::Text>(); |
| 61 | return String(arr.begin(), arr.size()); |
| 62 | } |
| 63 | case capnp::DynamicValue::DATA: |
| 64 | { |
| 65 | auto arr = value.as<capnp::Data>().asChars(); |
| 66 | return String(arr.begin(), arr.size()); |
| 67 | } |
| 68 | case capnp::DynamicValue::LIST: |
| 69 | { |
| 70 | auto listValue = value.as<capnp::DynamicList>(); |
| 71 | Array res(listValue.size()); |
| 72 | for (auto i : kj::indices(listValue)) |
| 73 | res[i] = convertNodeToField(listValue[i]); |
| 74 | |
| 75 | return res; |
| 76 | } |
| 77 | case capnp::DynamicValue::ENUM: |
| 78 | return value.as<capnp::DynamicEnum>().getRaw(); |
| 79 | case capnp::DynamicValue::STRUCT: |
| 80 | { |
| 81 | auto structValue = value.as<capnp::DynamicStruct>(); |
| 82 | const auto & fields = structValue.getSchema().getFields(); |
| 83 | |
| 84 | Tuple tuple(fields.size()); |
| 85 | for (auto i : kj::indices(fields)) |
| 86 | tuple[i] = convertNodeToField(structValue.get(fields[i])); |
| 87 | |
| 88 | return tuple; |
| 89 | } |
| 90 | case capnp::DynamicValue::CAPABILITY: |
| 91 | throw Exception("CAPABILITY type not supported" , ErrorCodes::BAD_TYPE_OF_FIELD); |
| 92 | case capnp::DynamicValue::ANY_POINTER: |
| 93 | throw Exception("ANY_POINTER type not supported" , ErrorCodes::BAD_TYPE_OF_FIELD); |
| 94 | } |
| 95 | return Field(); |
| 96 | } |
| 97 | |
| 98 | static capnp::StructSchema::Field getFieldOrThrow(capnp::StructSchema node, const std::string & field) |
| 99 | { |
| 100 | KJ_IF_MAYBE(child, node.findFieldByName(field)) |
| 101 | return *child; |
| 102 | else |
| 103 | throw Exception("Field " + field + " doesn't exist in schema " + node.getShortDisplayName().cStr(), ErrorCodes::THERE_IS_NO_COLUMN); |
| 104 | } |
| 105 | |
| 106 | |
/// Compiles the column list into the flat action program that readRow() replays for every message:
///   PUSH <field> - readRow() pushes the struct stored in <field> of the current struct;
///   POP          - readRow() returns to the enclosing struct;
///   READ <field> - readRow() converts <field> of the current struct and inserts it into `columns`.
/// Adjacent columns sharing a parent path reuse the already-pushed parents, so `sorted_fields` is
/// expected to be sorted by tokens (the constructor does this).
/// A List field stops the descent: consecutive columns resolving to the same List field are merged
/// into one READ whose `columns` are kept in ascending position order.
/// @throws Exception(LOGICAL_ERROR) for a column with no tokens
/// @throws Exception(THERE_IS_NO_COLUMN) if a token names no field (via getFieldOrThrow)
/// @throws Exception(BAD_TYPE_OF_FIELD) if an intermediate token is neither a Struct nor a List
void CapnProtoRowInputFormat::createActions(const NestedFieldList & sorted_fields, capnp::StructSchema reader)
{
    /// Columns in a table can map to fields in Cap'n'Proto or to structs.

    /// Store common parents and their tokens in order to backtrack.
    std::vector<capnp::StructSchema::Field> parents;
    std::vector<std::string> parent_tokens;

    /// Struct in which the next token is looked up: the type of parents.back(), or `reader` at the root.
    capnp::StructSchema cur_reader = reader;

    for (const auto & field : sorted_fields)
    {
        if (field.tokens.empty())
            throw Exception("Logical error in CapnProtoRowInputFormat", ErrorCodes::LOGICAL_ERROR);

        // Backtrack to common parent
        // Pop while the current parent path is not a strict prefix of this field's tokens.
        // The size check is evaluated first, so std::equal never reads past the end of field.tokens.
        while (field.tokens.size() < parent_tokens.size() + 1
            || !std::equal(parent_tokens.begin(), parent_tokens.end(), field.tokens.begin()))
        {
            actions.push_back({Action::POP});
            parents.pop_back();
            parent_tokens.pop_back();

            if (parents.empty())
            {
                cur_reader = reader;
                break;
            }
            else
                cur_reader = parents.back().getType().asStruct();
        }

        // Go forward
        // Descend through every token except the last, emitting a PUSH per struct entered.
        while (parent_tokens.size() + 1 < field.tokens.size())
        {
            const auto & token = field.tokens[parents.size()];
            auto node = getFieldOrThrow(cur_reader, token);
            if (node.getType().isStruct())
            {
                // Descend to field structure
                parents.emplace_back(node);
                parent_tokens.emplace_back(token);
                cur_reader = node.getType().asStruct();
                actions.push_back({Action::PUSH, node});
            }
            else if (node.getType().isList())
            {
                // A List ends the descent; the remaining tokens are not looked up here.
                break; // Collect list
            }
            else
                throw Exception("Field " + token + " is neither Struct nor List", ErrorCodes::BAD_TYPE_OF_FIELD);
        }

        // Read field from the structure
        // This is either the last token or, after the List break above, the List field itself.
        auto node = getFieldOrThrow(cur_reader, field.tokens[parents.size()]);
        if (node.getType().isList() && !actions.empty() && actions.back().field == node)
        {
            // The field list here flattens Nested elements into multiple arrays
            // In order to map Nested types in Cap'nProto back, they need to be collected
            // Since the field names are sorted, the order of field positions must be preserved
            // For example, if the fields are { b @0 :Text, a @1 :Text }, the `a` would come first
            // even though it's position is second.
            // upper_bound + insert keeps `columns` sorted by ascending column position.
            auto & columns = actions.back().columns;
            auto it = std::upper_bound(columns.cbegin(), columns.cend(), field.pos);
            columns.insert(it, field.pos);
        }
        else
        {
            actions.push_back({Action::READ, node, {field.pos}});
        }
    }
}
| 179 | |
| 180 | CapnProtoRowInputFormat::CapnProtoRowInputFormat(ReadBuffer & in_, Block , Params params_, const FormatSchemaInfo & info) |
| 181 | : IRowInputFormat(std::move(header), in_, std::move(params_)), parser(std::make_shared<SchemaParser>()) |
| 182 | { |
| 183 | // Parse the schema and fetch the root object |
| 184 | |
| 185 | #pragma GCC diagnostic push |
| 186 | #pragma GCC diagnostic ignored "-Wdeprecated-declarations" |
| 187 | auto schema = parser->impl.parseDiskFile(info.schemaPath(), info.absoluteSchemaPath(), {}); |
| 188 | #pragma GCC diagnostic pop |
| 189 | |
| 190 | root = schema.getNested(info.messageName()).asStruct(); |
| 191 | |
| 192 | /** |
| 193 | * The schema typically consists of fields in various nested structures. |
| 194 | * Here we gather the list of fields and sort them in a way so that fields in the same structure are adjacent, |
| 195 | * and the nesting level doesn't decrease to make traversal easier. |
| 196 | */ |
| 197 | auto & sample = getPort().getHeader(); |
| 198 | NestedFieldList list; |
| 199 | size_t num_columns = sample.columns(); |
| 200 | for (size_t i = 0; i < num_columns; ++i) |
| 201 | list.push_back(split(sample, i)); |
| 202 | |
| 203 | // Order list first by value of strings then by length of string vector. |
| 204 | std::sort(list.begin(), list.end(), [](const NestedField & a, const NestedField & b) { return a.tokens < b.tokens; }); |
| 205 | createActions(list, root); |
| 206 | } |
| 207 | |
| 208 | kj::Array<capnp::word> CapnProtoRowInputFormat::readMessage() |
| 209 | { |
| 210 | uint32_t segment_count; |
| 211 | in.readStrict(reinterpret_cast<char*>(&segment_count), sizeof(uint32_t)); |
| 212 | |
| 213 | // one for segmentCount and one because segmentCount starts from 0 |
| 214 | const auto prefix_size = (2 + segment_count) * sizeof(uint32_t); |
| 215 | const auto words_prefix_size = (segment_count + 1) / 2 + 1; |
| 216 | auto prefix = kj::heapArray<capnp::word>(words_prefix_size); |
| 217 | auto prefix_chars = prefix.asChars(); |
| 218 | ::memcpy(prefix_chars.begin(), &segment_count, sizeof(uint32_t)); |
| 219 | |
| 220 | // read size of each segment |
| 221 | for (size_t i = 0; i <= segment_count; ++i) |
| 222 | in.readStrict(prefix_chars.begin() + ((i + 1) * sizeof(uint32_t)), sizeof(uint32_t)); |
| 223 | |
| 224 | // calculate size of message |
| 225 | const auto expected_words = capnp::expectedSizeInWordsFromPrefix(prefix); |
| 226 | const auto expected_bytes = expected_words * sizeof(capnp::word); |
| 227 | const auto data_size = expected_bytes - prefix_size; |
| 228 | auto msg = kj::heapArray<capnp::word>(expected_words); |
| 229 | auto msg_chars = msg.asChars(); |
| 230 | |
| 231 | // read full message |
| 232 | ::memcpy(msg_chars.begin(), prefix_chars.begin(), prefix_size); |
| 233 | in.readStrict(msg_chars.begin() + prefix_size, data_size); |
| 234 | |
| 235 | return msg; |
| 236 | } |
| 237 | |
| 238 | bool CapnProtoRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &) |
| 239 | { |
| 240 | if (in.eof()) |
| 241 | return false; |
| 242 | |
| 243 | auto array = readMessage(); |
| 244 | |
| 245 | #if CAPNP_VERSION >= 8000 |
| 246 | capnp::UnalignedFlatArrayMessageReader msg(array); |
| 247 | #else |
| 248 | capnp::FlatArrayMessageReader msg(array); |
| 249 | #endif |
| 250 | std::vector<capnp::DynamicStruct::Reader> stack; |
| 251 | stack.push_back(msg.getRoot<capnp::DynamicStruct>(root)); |
| 252 | |
| 253 | for (auto action : actions) |
| 254 | { |
| 255 | switch (action.type) |
| 256 | { |
| 257 | case Action::READ: |
| 258 | { |
| 259 | Field value = convertNodeToField(stack.back().get(action.field)); |
| 260 | if (action.columns.size() > 1) |
| 261 | { |
| 262 | // Nested columns must be flattened into several arrays |
| 263 | // e.g. Array(Tuple(x ..., y ...)) -> Array(x ...), Array(y ...) |
| 264 | const auto & collected = DB::get<const Array &>(value); |
| 265 | size_t size = collected.size(); |
| 266 | // The flattened array contains an array of a part of the nested tuple |
| 267 | Array flattened(size); |
| 268 | for (size_t column_index = 0; column_index < action.columns.size(); ++column_index) |
| 269 | { |
| 270 | // Populate array with a single tuple elements |
| 271 | for (size_t off = 0; off < size; ++off) |
| 272 | { |
| 273 | const auto & tuple = DB::get<const Tuple &>(collected[off]); |
| 274 | flattened[off] = tuple[column_index]; |
| 275 | } |
| 276 | auto & col = columns[action.columns[column_index]]; |
| 277 | col->insert(flattened); |
| 278 | } |
| 279 | } |
| 280 | else |
| 281 | { |
| 282 | auto & col = columns[action.columns[0]]; |
| 283 | col->insert(value); |
| 284 | } |
| 285 | |
| 286 | break; |
| 287 | } |
| 288 | case Action::POP: |
| 289 | stack.pop_back(); |
| 290 | break; |
| 291 | case Action::PUSH: |
| 292 | stack.push_back(stack.back().get(action.field).as<capnp::DynamicStruct>()); |
| 293 | break; |
| 294 | } |
| 295 | } |
| 296 | |
| 297 | return true; |
| 298 | } |
| 299 | |
| 300 | void registerInputFormatProcessorCapnProto(FormatFactory & factory) |
| 301 | { |
| 302 | factory.registerInputFormatProcessor( |
| 303 | "CapnProto" , |
| 304 | [](ReadBuffer & buf, const Block & sample, IRowInputFormat::Params params, const FormatSettings & settings) |
| 305 | { |
| 306 | return std::make_shared<CapnProtoRowInputFormat>(buf, sample, std::move(params), |
| 307 | FormatSchemaInfo(settings.schema.format_schema, "CapnProto" , true, |
| 308 | settings.schema.is_server, settings.schema.format_schema_path)); |
| 309 | }); |
| 310 | } |
| 311 | |
| 312 | } |
| 313 | |
| 314 | #else |
| 315 | |
namespace DB
{
class FormatFactory;

/// Built without Cap'n Proto support (USE_CAPNP is off): there is no format to register.
void registerInputFormatProcessorCapnProto(FormatFactory & /*factory*/)
{
}
}
| 321 | |
| 322 | #endif // USE_CAPNP |
| 323 | |