1 | #pragma once |
2 | |
3 | #include "config_formats.h" |
4 | #if USE_CAPNP |
5 | |
6 | #include <Core/Block.h> |
7 | #include <Processors/Formats/IRowInputFormat.h> |
8 | #include <capnp/schema-parser.h> |
9 | |
10 | namespace DB |
11 | { |
12 | |
13 | class FormatSchemaInfo; |
14 | class ReadBuffer; |
15 | |
16 | /** A stream for reading messages in Cap'n Proto format in given schema. |
17 | * Like Protocol Buffers and Thrift (but unlike JSON or MessagePack), |
18 | * Cap'n Proto messages are strongly-typed and not self-describing. |
19 | * The schema in this case cannot be compiled in, so it uses a runtime schema parser. |
20 | * See https://capnproto.org/cxx.html |
21 | */ |
22 | class CapnProtoRowInputFormat : public IRowInputFormat |
23 | { |
24 | public: |
25 | struct NestedField |
26 | { |
27 | std::vector<std::string> tokens; |
28 | size_t pos; |
29 | }; |
30 | using NestedFieldList = std::vector<NestedField>; |
31 | |
32 | /** schema_dir - base path for schema files |
33 | * schema_file - location of the capnproto schema, e.g. "schema.capnp" |
34 | * root_object - name to the root object, e.g. "Message" |
35 | */ |
36 | CapnProtoRowInputFormat(ReadBuffer & in_, Block , Params params_, const FormatSchemaInfo & info); |
37 | |
38 | String getName() const override { return "CapnProtoRowInputFormat" ; } |
39 | |
40 | bool readRow(MutableColumns & columns, RowReadExtension &) override; |
41 | |
42 | private: |
43 | kj::Array<capnp::word> readMessage(); |
44 | |
45 | // Build a traversal plan from a sorted list of fields |
46 | void createActions(const NestedFieldList & sortedFields, capnp::StructSchema reader); |
47 | |
48 | /* Action for state machine for traversing nested structures. */ |
49 | using BlockPositionList = std::vector<size_t>; |
50 | struct Action |
51 | { |
52 | enum Type { POP, PUSH, READ }; |
53 | Type type; |
54 | capnp::StructSchema::Field field = {}; |
55 | BlockPositionList columns = {}; |
56 | }; |
57 | |
58 | // Wrapper for classes that could throw in destructor |
59 | // https://github.com/capnproto/capnproto/issues/553 |
60 | template <typename T> |
61 | struct DestructorCatcher |
62 | { |
63 | T impl; |
64 | template <typename ... Arg> |
65 | DestructorCatcher(Arg && ... args) : impl(kj::fwd<Arg>(args)...) {} |
66 | ~DestructorCatcher() noexcept try { } catch (...) { return; } |
67 | }; |
68 | using SchemaParser = DestructorCatcher<capnp::SchemaParser>; |
69 | |
70 | std::shared_ptr<SchemaParser> parser; |
71 | capnp::StructSchema root; |
72 | std::vector<Action> actions; |
73 | }; |
74 | |
75 | } |
76 | |
77 | #endif // USE_CAPNP |
78 | |