1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | // C++ object model and user API for interprocess schema messaging |
19 | |
20 | #ifndef ARROW_IPC_MESSAGE_H |
21 | #define ARROW_IPC_MESSAGE_H |
22 | |
23 | #include <cstdint> |
24 | #include <memory> |
25 | #include <string> |
26 | |
27 | #include "arrow/status.h" |
28 | #include "arrow/util/macros.h" |
29 | #include "arrow/util/visibility.h" |
30 | |
31 | namespace arrow { |
32 | |
// Forward declarations: this header only refers to these types through
// pointers and references, so their full definitions are not needed here.
class Buffer;

namespace io {

// Generic file handle (used for alignment checks).
class FileInterface;
// Sources that messages are read from.
class InputStream;
class RandomAccessFile;
// Sink that messages are written to.
class OutputStream;

}  // namespace io
43 | |
44 | namespace ipc { |
45 | |
/// \brief Version of the Flatbuffer metadata format carried by a message
///
/// The trailing comment on each enumerator names the library release(s)
/// associated with that version.
enum class MetadataVersion : char {
  V1 = 0,  ///< 0.1.0
  V2 = 1,  ///< 0.2.0
  V3 = 2,  ///< 0.3.0 to 0.7.1
  V4 = 3   ///< >= 0.8.0
};
59 | |
// Upper bound on schema nesting depth (ARROW-109).
//
// The value is deliberately arbitrary; its purpose is to catch user mistakes.
// Users working with deeply nested schemas are expected to state the maximum
// allowed recursion depth explicitly.
constexpr int kMaxNestingDepth{64};
64 | |
// Read interface classes. We do not fully deserialize the flatbuffers, so that
// the metadata of individual fields can be retrieved from a very large schema
// without deserializing the entire schema.
68 | |
69 | /// \class Message |
70 | /// \brief An IPC message including metadata and body |
71 | class ARROW_EXPORT Message { |
72 | public: |
73 | enum Type { NONE, SCHEMA, DICTIONARY_BATCH, RECORD_BATCH, TENSOR, SPARSE_TENSOR }; |
74 | |
75 | /// \brief Construct message, but do not validate |
76 | /// |
77 | /// Use at your own risk; Message::Open has more metadata validation |
78 | Message(const std::shared_ptr<Buffer>& metadata, const std::shared_ptr<Buffer>& body); |
79 | |
80 | ~Message(); |
81 | |
82 | /// \brief Create and validate a Message instance from two buffers |
83 | /// |
84 | /// \param[in] metadata a buffer containing the Flatbuffer metadata |
85 | /// \param[in] body a buffer containing the message body, which may be null |
86 | /// \param[out] out the created message |
87 | /// \return Status |
88 | static Status Open(const std::shared_ptr<Buffer>& metadata, |
89 | const std::shared_ptr<Buffer>& body, std::unique_ptr<Message>* out); |
90 | |
91 | /// \brief Read message body and create Message given Flatbuffer metadata |
92 | /// \param[in] metadata containing a serialized Message flatbuffer |
93 | /// \param[in] stream an InputStream |
94 | /// \param[out] out the created Message |
95 | /// \return Status |
96 | /// |
97 | /// \note If stream supports zero-copy, this is zero-copy |
98 | static Status ReadFrom(const std::shared_ptr<Buffer>& metadata, io::InputStream* stream, |
99 | std::unique_ptr<Message>* out); |
100 | |
101 | /// \brief Read message body from position in file, and create Message given |
102 | /// the Flatbuffer metadata |
103 | /// \param[in] offset the position in the file where the message body starts. |
104 | /// \param[in] metadata containing a serialized Message flatbuffer |
105 | /// \param[in] file the seekable file interface to read from |
106 | /// \param[out] out the created Message |
107 | /// \return Status |
108 | /// |
109 | /// \note If file supports zero-copy, this is zero-copy |
110 | static Status ReadFrom(const int64_t offset, const std::shared_ptr<Buffer>& metadata, |
111 | io::RandomAccessFile* file, std::unique_ptr<Message>* out); |
112 | |
113 | /// \brief Return true if message type and contents are equal |
114 | /// |
115 | /// \param other another message |
116 | /// \return true if contents equal |
117 | bool Equals(const Message& other) const; |
118 | |
119 | /// \brief the Message metadata |
120 | /// |
121 | /// \return buffer |
122 | std::shared_ptr<Buffer> metadata() const; |
123 | |
124 | /// \brief the Message body, if any |
125 | /// |
126 | /// \return buffer is null if no body |
127 | std::shared_ptr<Buffer> body() const; |
128 | |
129 | /// \brief The expected body length according to the metadata, for |
130 | /// verification purposes |
131 | int64_t body_length() const; |
132 | |
133 | /// \brief The Message type |
134 | Type type() const; |
135 | |
136 | /// \brief The Message metadata version |
137 | MetadataVersion metadata_version() const; |
138 | |
139 | const void* () const; |
140 | |
141 | /// \brief Write length-prefixed metadata and body to output stream |
142 | /// |
143 | /// \param[in] file output stream to write to |
144 | /// \param[in] alignment byte alignment for metadata, usually 8 or |
145 | /// 64. Whether the body is padded depends on the metadata; if the body |
146 | /// buffer is smaller than the size indicated in the metadata, then extra |
147 | /// padding bytes will be written |
148 | /// \param[out] output_length the number of bytes written |
149 | /// \return Status |
150 | Status SerializeTo(io::OutputStream* file, int32_t alignment, |
151 | int64_t* output_length) const; |
152 | |
153 | /// \brief Return true if the Message metadata passes Flatbuffer validation |
154 | bool Verify() const; |
155 | |
156 | private: |
157 | // Hide serialization details from user API |
158 | class MessageImpl; |
159 | std::unique_ptr<MessageImpl> impl_; |
160 | |
161 | ARROW_DISALLOW_COPY_AND_ASSIGN(Message); |
162 | }; |
163 | |
164 | ARROW_EXPORT std::string FormatMessageType(Message::Type type); |
165 | |
166 | /// \brief Abstract interface for a sequence of messages |
167 | /// \since 0.5.0 |
168 | class ARROW_EXPORT MessageReader { |
169 | public: |
170 | virtual ~MessageReader() = default; |
171 | |
172 | /// \brief Create MessageReader that reads from InputStream |
173 | static std::unique_ptr<MessageReader> Open(io::InputStream* stream); |
174 | |
175 | /// \brief Create MessageReader that reads from owned InputStream |
176 | static std::unique_ptr<MessageReader> Open( |
177 | const std::shared_ptr<io::InputStream>& owned_stream); |
178 | |
179 | /// \brief Read next Message from the interface |
180 | /// |
181 | /// \param[out] message an arrow::ipc::Message instance |
182 | /// \return Status |
183 | virtual Status ReadNextMessage(std::unique_ptr<Message>* message) = 0; |
184 | }; |
185 | |
186 | /// \brief Read encapsulated RPC message from position in file |
187 | /// |
188 | /// Read a length-prefixed message flatbuffer starting at the indicated file |
189 | /// offset. If the message has a body with non-zero length, it will also be |
190 | /// read |
191 | /// |
192 | /// The metadata_length includes at least the length prefix and the flatbuffer |
193 | /// |
194 | /// \param[in] offset the position in the file where the message starts. The |
195 | /// first 4 bytes after the offset are the message length |
196 | /// \param[in] metadata_length the total number of bytes to read from file |
197 | /// \param[in] file the seekable file interface to read from |
198 | /// \param[out] message the message read |
199 | /// \return Status success or failure |
200 | ARROW_EXPORT |
201 | Status ReadMessage(const int64_t offset, const int32_t metadata_length, |
202 | io::RandomAccessFile* file, std::unique_ptr<Message>* message); |
203 | |
204 | /// \brief Advance stream to an 8-byte offset if its position is not a multiple |
205 | /// of 8 already |
206 | /// \param[in] stream an input stream |
207 | /// \param[in] alignment the byte multiple for the metadata prefix, usually 8 |
208 | /// or 64, to ensure the body starts on a multiple of that alignment |
209 | /// \return Status |
210 | ARROW_EXPORT |
211 | Status AlignStream(io::InputStream* stream, int32_t alignment = 8); |
212 | |
213 | /// \brief Advance stream to an 8-byte offset if its position is not a multiple |
214 | /// of 8 already |
215 | /// \param[in] stream an output stream |
216 | /// \param[in] alignment the byte multiple for the metadata prefix, usually 8 |
217 | /// or 64, to ensure the body starts on a multiple of that alignment |
218 | /// \return Status |
219 | ARROW_EXPORT |
220 | Status AlignStream(io::OutputStream* stream, int32_t alignment = 8); |
221 | |
222 | /// \brief Return error Status if file position is not a multiple of the |
223 | /// indicated alignment |
224 | ARROW_EXPORT |
225 | Status CheckAligned(io::FileInterface* stream, int32_t alignment = 8); |
226 | |
227 | /// \brief Read encapsulated RPC message (metadata and body) from InputStream |
228 | /// |
229 | /// Read length-prefixed message with as-yet unknown length. Returns null if |
230 | /// there are not enough bytes available or the message length is 0 (e.g. EOS |
231 | /// in a stream) |
232 | ARROW_EXPORT |
233 | Status ReadMessage(io::InputStream* stream, std::unique_ptr<Message>* message); |
234 | |
235 | } // namespace ipc |
236 | } // namespace arrow |
237 | |
238 | #endif // ARROW_IPC_MESSAGE_H |
239 | |