1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18// C++ object model and user API for interprocess schema messaging
19
20#ifndef ARROW_IPC_MESSAGE_H
21#define ARROW_IPC_MESSAGE_H
22
23#include <cstdint>
24#include <memory>
25#include <string>
26
27#include "arrow/status.h"
28#include "arrow/util/macros.h"
29#include "arrow/util/visibility.h"
30
31namespace arrow {
32
33class Buffer;
34
35namespace io {
36
37class FileInterface;
38class InputStream;
39class OutputStream;
40class RandomAccessFile;
41
42} // namespace io
43
44namespace ipc {
45
/// \brief Version of the IPC metadata format
///
/// Enumerators are numbered explicitly from 0 so their underlying `char`
/// values are visible at a glance; each trailing note gives the Arrow
/// release range that used that version.
enum class MetadataVersion : char {
  V1 = 0,  ///< 0.1.0
  V2 = 1,  ///< 0.2.0
  V3 = 2,  ///< 0.3.0 to 0.7.1
  V4 = 3   ///< >= 0.8.0
};
59
// ARROW-109: default upper bound on schema nesting depth. The value was
// chosen arbitrarily to help catch user mistakes; for deeply nested schemas
// the user is expected to state the maximum allowed recursion depth
// explicitly.
static constexpr int kMaxNestingDepth{64};
64
// Read interface classes. We do not fully deserialize the flatbuffers so that
// the metadata of individual fields can be retrieved from a very large schema
// without having to deserialize the entire schema.
68
69/// \class Message
70/// \brief An IPC message including metadata and body
71class ARROW_EXPORT Message {
72 public:
73 enum Type { NONE, SCHEMA, DICTIONARY_BATCH, RECORD_BATCH, TENSOR, SPARSE_TENSOR };
74
75 /// \brief Construct message, but do not validate
76 ///
77 /// Use at your own risk; Message::Open has more metadata validation
78 Message(const std::shared_ptr<Buffer>& metadata, const std::shared_ptr<Buffer>& body);
79
80 ~Message();
81
82 /// \brief Create and validate a Message instance from two buffers
83 ///
84 /// \param[in] metadata a buffer containing the Flatbuffer metadata
85 /// \param[in] body a buffer containing the message body, which may be null
86 /// \param[out] out the created message
87 /// \return Status
88 static Status Open(const std::shared_ptr<Buffer>& metadata,
89 const std::shared_ptr<Buffer>& body, std::unique_ptr<Message>* out);
90
91 /// \brief Read message body and create Message given Flatbuffer metadata
92 /// \param[in] metadata containing a serialized Message flatbuffer
93 /// \param[in] stream an InputStream
94 /// \param[out] out the created Message
95 /// \return Status
96 ///
97 /// \note If stream supports zero-copy, this is zero-copy
98 static Status ReadFrom(const std::shared_ptr<Buffer>& metadata, io::InputStream* stream,
99 std::unique_ptr<Message>* out);
100
101 /// \brief Read message body from position in file, and create Message given
102 /// the Flatbuffer metadata
103 /// \param[in] offset the position in the file where the message body starts.
104 /// \param[in] metadata containing a serialized Message flatbuffer
105 /// \param[in] file the seekable file interface to read from
106 /// \param[out] out the created Message
107 /// \return Status
108 ///
109 /// \note If file supports zero-copy, this is zero-copy
110 static Status ReadFrom(const int64_t offset, const std::shared_ptr<Buffer>& metadata,
111 io::RandomAccessFile* file, std::unique_ptr<Message>* out);
112
113 /// \brief Return true if message type and contents are equal
114 ///
115 /// \param other another message
116 /// \return true if contents equal
117 bool Equals(const Message& other) const;
118
119 /// \brief the Message metadata
120 ///
121 /// \return buffer
122 std::shared_ptr<Buffer> metadata() const;
123
124 /// \brief the Message body, if any
125 ///
126 /// \return buffer is null if no body
127 std::shared_ptr<Buffer> body() const;
128
129 /// \brief The expected body length according to the metadata, for
130 /// verification purposes
131 int64_t body_length() const;
132
133 /// \brief The Message type
134 Type type() const;
135
136 /// \brief The Message metadata version
137 MetadataVersion metadata_version() const;
138
139 const void* header() const;
140
141 /// \brief Write length-prefixed metadata and body to output stream
142 ///
143 /// \param[in] file output stream to write to
144 /// \param[in] alignment byte alignment for metadata, usually 8 or
145 /// 64. Whether the body is padded depends on the metadata; if the body
146 /// buffer is smaller than the size indicated in the metadata, then extra
147 /// padding bytes will be written
148 /// \param[out] output_length the number of bytes written
149 /// \return Status
150 Status SerializeTo(io::OutputStream* file, int32_t alignment,
151 int64_t* output_length) const;
152
153 /// \brief Return true if the Message metadata passes Flatbuffer validation
154 bool Verify() const;
155
156 private:
157 // Hide serialization details from user API
158 class MessageImpl;
159 std::unique_ptr<MessageImpl> impl_;
160
161 ARROW_DISALLOW_COPY_AND_ASSIGN(Message);
162};
163
164ARROW_EXPORT std::string FormatMessageType(Message::Type type);
165
166/// \brief Abstract interface for a sequence of messages
167/// \since 0.5.0
168class ARROW_EXPORT MessageReader {
169 public:
170 virtual ~MessageReader() = default;
171
172 /// \brief Create MessageReader that reads from InputStream
173 static std::unique_ptr<MessageReader> Open(io::InputStream* stream);
174
175 /// \brief Create MessageReader that reads from owned InputStream
176 static std::unique_ptr<MessageReader> Open(
177 const std::shared_ptr<io::InputStream>& owned_stream);
178
179 /// \brief Read next Message from the interface
180 ///
181 /// \param[out] message an arrow::ipc::Message instance
182 /// \return Status
183 virtual Status ReadNextMessage(std::unique_ptr<Message>* message) = 0;
184};
185
186/// \brief Read encapsulated RPC message from position in file
187///
188/// Read a length-prefixed message flatbuffer starting at the indicated file
189/// offset. If the message has a body with non-zero length, it will also be
190/// read
191///
192/// The metadata_length includes at least the length prefix and the flatbuffer
193///
194/// \param[in] offset the position in the file where the message starts. The
195/// first 4 bytes after the offset are the message length
196/// \param[in] metadata_length the total number of bytes to read from file
197/// \param[in] file the seekable file interface to read from
198/// \param[out] message the message read
199/// \return Status success or failure
200ARROW_EXPORT
201Status ReadMessage(const int64_t offset, const int32_t metadata_length,
202 io::RandomAccessFile* file, std::unique_ptr<Message>* message);
203
204/// \brief Advance stream to an 8-byte offset if its position is not a multiple
205/// of 8 already
206/// \param[in] stream an input stream
207/// \param[in] alignment the byte multiple for the metadata prefix, usually 8
208/// or 64, to ensure the body starts on a multiple of that alignment
209/// \return Status
210ARROW_EXPORT
211Status AlignStream(io::InputStream* stream, int32_t alignment = 8);
212
213/// \brief Advance stream to an 8-byte offset if its position is not a multiple
214/// of 8 already
215/// \param[in] stream an output stream
216/// \param[in] alignment the byte multiple for the metadata prefix, usually 8
217/// or 64, to ensure the body starts on a multiple of that alignment
218/// \return Status
219ARROW_EXPORT
220Status AlignStream(io::OutputStream* stream, int32_t alignment = 8);
221
222/// \brief Return error Status if file position is not a multiple of the
223/// indicated alignment
224ARROW_EXPORT
225Status CheckAligned(io::FileInterface* stream, int32_t alignment = 8);
226
227/// \brief Read encapsulated RPC message (metadata and body) from InputStream
228///
229/// Read length-prefixed message with as-yet unknown length. Returns null if
230/// there are not enough bytes available or the message length is 0 (e.g. EOS
231/// in a stream)
232ARROW_EXPORT
233Status ReadMessage(io::InputStream* stream, std::unique_ptr<Message>* message);
234
235} // namespace ipc
236} // namespace arrow
237
238#endif // ARROW_IPC_MESSAGE_H
239