1#pragma once
2
3#include "config_formats.h"
4#if USE_CAPNP
5
6#include <Core/Block.h>
7#include <Processors/Formats/IRowInputFormat.h>
8#include <capnp/schema-parser.h>
9
10namespace DB
11{
12
13class FormatSchemaInfo;
14class ReadBuffer;
15
/** A row input format that reads messages in Cap'n Proto format according to a given schema.
  * Like Protocol Buffers and Thrift (but unlike JSON or MessagePack),
  * Cap'n Proto messages are strongly typed and not self-describing.
  * The schema cannot be compiled in here, so a runtime schema parser is used.
  * See https://capnproto.org/cxx.html
  */
22class CapnProtoRowInputFormat : public IRowInputFormat
23{
24public:
25 struct NestedField
26 {
27 std::vector<std::string> tokens;
28 size_t pos;
29 };
30 using NestedFieldList = std::vector<NestedField>;
31
32 /** schema_dir - base path for schema files
33 * schema_file - location of the capnproto schema, e.g. "schema.capnp"
34 * root_object - name to the root object, e.g. "Message"
35 */
36 CapnProtoRowInputFormat(ReadBuffer & in_, Block header, Params params_, const FormatSchemaInfo & info);
37
38 String getName() const override { return "CapnProtoRowInputFormat"; }
39
40 bool readRow(MutableColumns & columns, RowReadExtension &) override;
41
42private:
43 kj::Array<capnp::word> readMessage();
44
45 // Build a traversal plan from a sorted list of fields
46 void createActions(const NestedFieldList & sortedFields, capnp::StructSchema reader);
47
48 /* Action for state machine for traversing nested structures. */
49 using BlockPositionList = std::vector<size_t>;
50 struct Action
51 {
52 enum Type { POP, PUSH, READ };
53 Type type;
54 capnp::StructSchema::Field field = {};
55 BlockPositionList columns = {};
56 };
57
58 // Wrapper for classes that could throw in destructor
59 // https://github.com/capnproto/capnproto/issues/553
60 template <typename T>
61 struct DestructorCatcher
62 {
63 T impl;
64 template <typename ... Arg>
65 DestructorCatcher(Arg && ... args) : impl(kj::fwd<Arg>(args)...) {}
66 ~DestructorCatcher() noexcept try { } catch (...) { return; }
67 };
68 using SchemaParser = DestructorCatcher<capnp::SchemaParser>;
69
70 std::shared_ptr<SchemaParser> parser;
71 capnp::StructSchema root;
72 std::vector<Action> actions;
73};
74
75}
76
77#endif // USE_CAPNP
78