1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
// Writer interfaces for the Arrow IPC binary formats (streaming and file)
19 | |
20 | #ifndef ARROW_IPC_WRITER_H |
21 | #define ARROW_IPC_WRITER_H |
22 | |
23 | #include <cstdint> |
24 | #include <memory> |
25 | #include <vector> |
26 | |
27 | #include "arrow/ipc/message.h" |
28 | #include "arrow/util/visibility.h" |
29 | |
30 | namespace arrow { |
31 | |
32 | class Buffer; |
33 | class MemoryPool; |
34 | class RecordBatch; |
35 | class Schema; |
36 | class Status; |
37 | class Table; |
38 | class Tensor; |
39 | class SparseTensor; |
40 | |
41 | namespace io { |
42 | |
43 | class OutputStream; |
44 | |
45 | } // namespace io |
46 | |
47 | namespace ipc { |
48 | |
49 | /// \class RecordBatchWriter |
50 | /// \brief Abstract interface for writing a stream of record batches |
51 | class ARROW_EXPORT RecordBatchWriter { |
52 | public: |
53 | virtual ~RecordBatchWriter(); |
54 | |
55 | /// \brief Write a record batch to the stream |
56 | /// |
57 | /// \param[in] batch the record batch to write to the stream |
58 | /// \param[in] allow_64bit if true, allow field lengths that don't fit |
59 | /// in a signed 32-bit int |
60 | /// \return Status |
61 | virtual Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) = 0; |
62 | |
63 | /// \brief Write possibly-chunked table by creating sequence of record batches |
64 | /// \param[in] table table to write |
65 | /// \return Status |
66 | Status WriteTable(const Table& table); |
67 | |
68 | /// \brief Write Table with a particular chunksize |
69 | /// \param[in] table table to write |
70 | /// \param[in] max_chunksize maximum chunk size for table chunks |
71 | /// \return Status |
72 | Status WriteTable(const Table& table, int64_t max_chunksize); |
73 | |
74 | /// \brief Perform any logic necessary to finish the stream |
75 | /// |
76 | /// \return Status |
77 | virtual Status Close() = 0; |
78 | |
79 | /// In some cases, writing may require memory allocation. We use the default |
80 | /// memory pool, but provide the option to override |
81 | /// |
82 | /// \param pool the memory pool to use for required allocations |
83 | virtual void set_memory_pool(MemoryPool* pool) = 0; |
84 | }; |
85 | |
86 | /// \class RecordBatchStreamWriter |
87 | /// \brief Synchronous batch stream writer that writes the Arrow streaming |
88 | /// format |
89 | class ARROW_EXPORT RecordBatchStreamWriter : public RecordBatchWriter { |
90 | public: |
91 | ~RecordBatchStreamWriter() override; |
92 | |
93 | /// Create a new writer from stream sink and schema. User is responsible for |
94 | /// closing the actual OutputStream. |
95 | /// |
96 | /// \param[in] sink output stream to write to |
97 | /// \param[in] schema the schema of the record batches to be written |
98 | /// \param[out] out the created stream writer |
99 | /// \return Status |
100 | static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema, |
101 | std::shared_ptr<RecordBatchWriter>* out); |
102 | |
103 | /// \brief Write a record batch to the stream |
104 | /// |
105 | /// \param[in] batch the record batch to write |
106 | /// \param[in] allow_64bit allow array lengths over INT32_MAX - 1 |
107 | /// \return Status |
108 | Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override; |
109 | |
110 | /// \brief Close the stream by writing a 4-byte int32 0 EOS market |
111 | /// \return Status |
112 | Status Close() override; |
113 | |
114 | void set_memory_pool(MemoryPool* pool) override; |
115 | |
116 | protected: |
117 | RecordBatchStreamWriter(); |
118 | class ARROW_NO_EXPORT RecordBatchStreamWriterImpl; |
119 | std::unique_ptr<RecordBatchStreamWriterImpl> impl_; |
120 | }; |
121 | |
122 | /// \brief Creates the Arrow record batch file format |
123 | /// |
124 | /// Implements the random access file format, which structurally is a record |
125 | /// batch stream followed by a metadata footer at the end of the file. Magic |
126 | /// numbers are written at the start and end of the file |
127 | class ARROW_EXPORT RecordBatchFileWriter : public RecordBatchStreamWriter { |
128 | public: |
129 | ~RecordBatchFileWriter() override; |
130 | |
131 | /// Create a new writer from stream sink and schema |
132 | /// |
133 | /// \param[in] sink output stream to write to |
134 | /// \param[in] schema the schema of the record batches to be written |
135 | /// \param[out] out the created stream writer |
136 | /// \return Status |
137 | static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema, |
138 | std::shared_ptr<RecordBatchWriter>* out); |
139 | |
140 | /// \brief Write a record batch to the file |
141 | /// |
142 | /// \param[in] batch the record batch to write |
143 | /// \param[in] allow_64bit allow array lengths over INT32_MAX - 1 |
144 | /// \return Status |
145 | Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override; |
146 | |
147 | /// \brief Close the file stream by writing the file footer and magic number |
148 | /// \return Status |
149 | Status Close() override; |
150 | |
151 | private: |
152 | RecordBatchFileWriter(); |
153 | class ARROW_NO_EXPORT RecordBatchFileWriterImpl; |
154 | std::unique_ptr<RecordBatchFileWriterImpl> file_impl_; |
155 | }; |
156 | |
157 | /// \brief Low-level API for writing a record batch (without schema) to an OutputStream |
158 | /// |
159 | /// \param[in] batch the record batch to write |
160 | /// \param[in] buffer_start_offset the start offset to use in the buffer metadata, |
161 | /// generally should be 0 |
162 | /// \param[in] dst an OutputStream |
163 | /// \param[out] metadata_length the size of the length-prefixed flatbuffer |
164 | /// including padding to a 64-byte boundary |
165 | /// \param[out] body_length the size of the contiguous buffer block plus |
166 | /// \param[in] pool the memory pool to allocate memory from |
167 | /// \param[in] max_recursion_depth the maximum permitted nesting schema depth |
168 | /// \param[in] allow_64bit permit field lengths exceeding INT32_MAX. May not be |
169 | /// readable by other Arrow implementations |
170 | /// padding bytes |
171 | /// \return Status |
172 | /// |
173 | /// Write the RecordBatch (collection of equal-length Arrow arrays) to the |
174 | /// output stream in a contiguous block. The record batch metadata is written as |
175 | /// a flatbuffer (see format/Message.fbs -- the RecordBatch message type) |
176 | /// prefixed by its size, followed by each of the memory buffers in the batch |
177 | /// written end to end (with appropriate alignment and padding): |
178 | /// |
179 | /// \code |
180 | /// <int32: metadata size> <uint8*: metadata> <buffers ...> |
181 | /// \endcode |
182 | /// |
183 | /// Finally, the absolute offsets (relative to the start of the output stream) |
184 | /// to the end of the body and end of the metadata / data header (suffixed by |
185 | /// the header size) is returned in out-variables |
186 | ARROW_EXPORT |
187 | Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, |
188 | io::OutputStream* dst, int32_t* metadata_length, |
189 | int64_t* body_length, MemoryPool* pool, |
190 | int max_recursion_depth = kMaxNestingDepth, |
191 | bool allow_64bit = false); |
192 | |
193 | /// \brief Serialize record batch as encapsulated IPC message in a new buffer |
194 | /// |
195 | /// \param[in] batch the record batch |
196 | /// \param[in] pool a MemoryPool to allocate memory from |
197 | /// \param[out] out the serialized message |
198 | /// \return Status |
199 | ARROW_EXPORT |
200 | Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool, |
201 | std::shared_ptr<Buffer>* out); |
202 | |
203 | /// \brief Write record batch to OutputStream |
204 | /// |
205 | /// \param[in] batch the record batch to write |
206 | /// \param[in] pool a MemoryPool to use for temporary allocations, if needed |
207 | /// \param[in] out the OutputStream to write the output to |
208 | /// \return Status |
209 | /// |
210 | /// If writing to pre-allocated memory, you can use |
211 | /// arrow::ipc::GetRecordBatchSize to compute how much space is required |
212 | ARROW_EXPORT |
213 | Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool, |
214 | io::OutputStream* out); |
215 | |
216 | /// \brief Serialize schema using stream writer as a sequence of one or more |
217 | /// IPC messages |
218 | /// |
219 | /// \param[in] schema the schema to write |
220 | /// \param[in] pool a MemoryPool to allocate memory from |
221 | /// \param[out] out the serialized schema |
222 | /// \return Status |
223 | ARROW_EXPORT |
224 | Status SerializeSchema(const Schema& schema, MemoryPool* pool, |
225 | std::shared_ptr<Buffer>* out); |
226 | |
227 | /// \brief Write multiple record batches to OutputStream, including schema |
228 | /// \param[in] batches a vector of batches. Must all have same schema |
229 | /// \param[out] dst an OutputStream |
230 | /// \return Status |
231 | ARROW_EXPORT |
232 | Status WriteRecordBatchStream(const std::vector<std::shared_ptr<RecordBatch>>& batches, |
233 | io::OutputStream* dst); |
234 | |
235 | /// \brief Compute the number of bytes needed to write a record batch including metadata |
236 | /// |
237 | /// \param[in] batch the record batch to write |
238 | /// \param[out] size the size of the complete encapsulated message |
239 | /// \return Status |
240 | ARROW_EXPORT |
241 | Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size); |
242 | |
243 | /// \brief Compute the number of bytes needed to write a tensor including metadata |
244 | /// |
245 | /// \param[in] tensor the tenseor to write |
246 | /// \param[out] size the size of the complete encapsulated message |
247 | /// \return Status |
248 | ARROW_EXPORT |
249 | Status GetTensorSize(const Tensor& tensor, int64_t* size); |
250 | |
251 | /// \brief EXPERIMENTAL: Convert arrow::Tensor to a Message with minimal memory |
252 | /// allocation |
253 | /// |
254 | /// \param[in] tensor the Tensor to write |
255 | /// \param[in] pool MemoryPool to allocate space for metadata |
256 | /// \param[out] out the resulting Message |
257 | /// \return Status |
258 | ARROW_EXPORT |
259 | Status GetTensorMessage(const Tensor& tensor, MemoryPool* pool, |
260 | std::unique_ptr<Message>* out); |
261 | |
262 | /// \brief Write arrow::Tensor as a contiguous message. |
263 | /// |
264 | /// The metadata and body are written assuming 64-byte alignment. It is the |
265 | /// user's responsibility to ensure that the OutputStream has been aligned |
266 | /// to a 64-byte multiple before writing the message. |
267 | /// |
268 | /// The message is written out as followed: |
269 | /// \code |
270 | /// <metadata size> <metadata> <tensor data> |
271 | /// \endcode |
272 | /// |
273 | /// \param[in] tensor the Tensor to write |
274 | /// \param[in] dst the OutputStream to write to |
275 | /// \param[out] metadata_length the actual metadata length, including padding |
276 | /// \param[out] body_length the acutal message body length |
277 | /// \return Status |
278 | ARROW_EXPORT |
279 | Status WriteTensor(const Tensor& tensor, io::OutputStream* dst, int32_t* metadata_length, |
280 | int64_t* body_length); |
281 | |
282 | // \brief EXPERIMENTAL: Write arrow::SparseTensor as a contiguous mesasge. The metadata, |
283 | // sparse index, and body are written assuming 64-byte alignment. It is the |
284 | // user's responsibility to ensure that the OutputStream has been aligned |
285 | // to a 64-byte multiple before writing the message. |
286 | // |
287 | // \param[in] tensor the SparseTensor to write |
288 | // \param[in] dst the OutputStream to write to |
289 | // \param[out] metadata_length the actual metadata length, including padding |
290 | // \param[out] body_length the actual message body length |
291 | ARROW_EXPORT |
292 | Status WriteSparseTensor(const SparseTensor& sparse_tensor, io::OutputStream* dst, |
293 | int32_t* metadata_length, int64_t* body_length, |
294 | MemoryPool* pool); |
295 | |
296 | namespace internal { |
297 | |
298 | // These internal APIs may change without warning or deprecation |
299 | |
// Intermediate data structure with metadata header plus zero or more buffers
// for the message body. This data can either be written out directly as an
// encapsulated IPC message or used with Flight RPCs.
//
// NOTE: none of the fields has a default initializer, so a default-constructed
// IpcPayload leaves `type` and `body_length` indeterminate; producers must
// assign every field before the payload is consumed.
struct IpcPayload {
  // Kind of IPC message this payload encodes
  Message::Type type;
  // Serialized message metadata header
  std::shared_ptr<Buffer> metadata;
  // Zero or more buffers making up the message body
  std::vector<std::shared_ptr<Buffer>> body_buffers;
  // Length of the message body; presumably in bytes and including any
  // inter-buffer padding -- TODO confirm against the writer implementation
  int64_t body_length;
};
309 | |
310 | /// \brief Extract IPC payloads from given schema for purposes of wire |
311 | /// transport, separate from using the *StreamWriter classes |
312 | ARROW_EXPORT |
313 | Status GetDictionaryPayloads(const Schema& schema, |
314 | std::vector<std::unique_ptr<IpcPayload>>* out); |
315 | |
316 | /// \brief Compute IpcPayload for the given record batch |
317 | /// \param[in] batch the RecordBatch that is being serialized |
318 | /// \param[in,out] pool for any required temporary memory allocations |
319 | /// \param[out] out the returned IpcPayload |
320 | /// \return Status |
321 | ARROW_EXPORT |
322 | Status GetRecordBatchPayload(const RecordBatch& batch, MemoryPool* pool, IpcPayload* out); |
323 | |
324 | } // namespace internal |
325 | |
326 | } // namespace ipc |
327 | } // namespace arrow |
328 | |
329 | #endif // ARROW_IPC_WRITER_H |
330 | |