// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Implement Arrow streaming binary format

#ifndef ARROW_IPC_WRITER_H
#define ARROW_IPC_WRITER_H

#include <cstdint>
#include <memory>
#include <vector>

#include "arrow/ipc/message.h"
#include "arrow/util/visibility.h"

namespace arrow {

class Buffer;
class MemoryPool;
class RecordBatch;
class Schema;
class Status;
class Table;
class Tensor;
class SparseTensor;

namespace io {

class OutputStream;

}  // namespace io

namespace ipc {

/// \class RecordBatchWriter
/// \brief Abstract interface for writing a stream of record batches
class ARROW_EXPORT RecordBatchWriter {
 public:
  virtual ~RecordBatchWriter();

  /// \brief Write a record batch to the stream
  ///
  /// \param[in] batch the record batch to write to the stream
  /// \param[in] allow_64bit if true, allow field lengths that don't fit
  /// in a signed 32-bit int
  /// \return Status
  virtual Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) = 0;

  /// \brief Write possibly-chunked table by creating sequence of record batches
  /// \param[in] table table to write
  /// \return Status
  Status WriteTable(const Table& table);

  /// \brief Write Table with a particular chunksize
  /// \param[in] table table to write
  /// \param[in] max_chunksize maximum chunk size for table chunks
  /// \return Status
  Status WriteTable(const Table& table, int64_t max_chunksize);

  /// \brief Perform any logic necessary to finish the stream
  ///
  /// \return Status
  virtual Status Close() = 0;

  /// In some cases, writing may require memory allocation. We use the default
  /// memory pool, but provide the option to override
  ///
  /// \param pool the memory pool to use for required allocations
  virtual void set_memory_pool(MemoryPool* pool) = 0;
};

/// \class RecordBatchStreamWriter
/// \brief Synchronous batch stream writer that writes the Arrow streaming
/// format
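///
/// Illustrative usage sketch (not part of this API's documentation): `batch`
/// is an assumed, pre-built std::shared_ptr<RecordBatch>; io::BufferOutputStream
/// and default_memory_pool() come from arrow/io/memory.h and arrow/memory_pool.h;
/// error handling is elided.
///
/// \code
/// std::shared_ptr<io::BufferOutputStream> sink;            // in-memory destination
/// io::BufferOutputStream::Create(4096, default_memory_pool(), &sink);
///
/// std::shared_ptr<ipc::RecordBatchWriter> writer;
/// ipc::RecordBatchStreamWriter::Open(sink.get(), batch->schema(), &writer);
/// writer->WriteRecordBatch(*batch);
/// writer->Close();                           // writes the end-of-stream marker
///
/// std::shared_ptr<Buffer> serialized;
/// sink->Finish(&serialized);                 // the complete stream-format payload
/// \endcode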
class ARROW_EXPORT RecordBatchStreamWriter : public RecordBatchWriter {
 public:
  ~RecordBatchStreamWriter() override;

  /// Create a new writer from stream sink and schema. User is responsible for
  /// closing the actual OutputStream.
  ///
  /// \param[in] sink output stream to write to
  /// \param[in] schema the schema of the record batches to be written
  /// \param[out] out the created stream writer
  /// \return Status
  static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
                     std::shared_ptr<RecordBatchWriter>* out);

  /// \brief Write a record batch to the stream
  ///
  /// \param[in] batch the record batch to write
  /// \param[in] allow_64bit allow array lengths over INT32_MAX - 1
  /// \return Status
  Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override;

  /// \brief Close the stream by writing a 4-byte int32 0 EOS marker
  /// \return Status
  Status Close() override;

  void set_memory_pool(MemoryPool* pool) override;

 protected:
  RecordBatchStreamWriter();
  class ARROW_NO_EXPORT RecordBatchStreamWriterImpl;
  std::unique_ptr<RecordBatchStreamWriterImpl> impl_;
};

/// \brief Writes the Arrow record batch file format
///
/// Implements the random access file format, which structurally is a record
/// batch stream followed by a metadata footer at the end of the file. Magic
/// numbers are written at the start and end of the file.
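///
/// Illustrative sketch of writing the file format to disk: `batch` is an
/// assumed std::shared_ptr<RecordBatch>, io::FileOutputStream is declared in
/// arrow/io/file.h, the path is arbitrary, and error handling is elided.
///
/// \code
/// std::shared_ptr<io::FileOutputStream> file;
/// io::FileOutputStream::Open("/tmp/batches.arrow", &file);
///
/// std::shared_ptr<ipc::RecordBatchWriter> writer;
/// ipc::RecordBatchFileWriter::Open(file.get(), batch->schema(), &writer);
/// writer->WriteRecordBatch(*batch);
/// writer->Close();            // writes the footer and trailing magic number
/// \endcode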
class ARROW_EXPORT RecordBatchFileWriter : public RecordBatchStreamWriter {
 public:
  ~RecordBatchFileWriter() override;

  /// Create a new writer from stream sink and schema
  ///
  /// \param[in] sink output stream to write to
  /// \param[in] schema the schema of the record batches to be written
  /// \param[out] out the created file writer
  /// \return Status
  static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
                     std::shared_ptr<RecordBatchWriter>* out);

  /// \brief Write a record batch to the file
  ///
  /// \param[in] batch the record batch to write
  /// \param[in] allow_64bit allow array lengths over INT32_MAX - 1
  /// \return Status
  Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override;

  /// \brief Close the file stream by writing the file footer and magic number
  /// \return Status
  Status Close() override;

 private:
  RecordBatchFileWriter();
  class ARROW_NO_EXPORT RecordBatchFileWriterImpl;
  std::unique_ptr<RecordBatchFileWriterImpl> file_impl_;
};

/// \brief Low-level API for writing a record batch (without schema) to an OutputStream
///
/// \param[in] batch the record batch to write
/// \param[in] buffer_start_offset the start offset to use in the buffer metadata,
/// generally should be 0
/// \param[in] dst an OutputStream
/// \param[out] metadata_length the size of the length-prefixed flatbuffer
/// including padding to a 64-byte boundary
/// \param[out] body_length the size of the contiguous buffer block plus
/// padding bytes
/// \param[in] pool the memory pool to allocate memory from
/// \param[in] max_recursion_depth the maximum permitted nesting schema depth
/// \param[in] allow_64bit permit field lengths exceeding INT32_MAX. May not be
/// readable by other Arrow implementations
/// \return Status
///
/// Write the RecordBatch (collection of equal-length Arrow arrays) to the
/// output stream in a contiguous block. The record batch metadata is written as
/// a flatbuffer (see format/Message.fbs -- the RecordBatch message type)
/// prefixed by its size, followed by each of the memory buffers in the batch
/// written end to end (with appropriate alignment and padding):
///
/// \code
/// <int32: metadata size> <uint8*: metadata> <buffers ...>
/// \endcode
///
/// Finally, the absolute offsets (relative to the start of the output stream)
/// to the end of the body and end of the metadata / data header (suffixed by
/// the header size) are returned in out-variables
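///
/// Illustrative sketch of the low-level call: `batch` is an assumed
/// std::shared_ptr<RecordBatch>, `dst` an assumed, already-positioned
/// io::OutputStream*; error handling is elided.
///
/// \code
/// int32_t metadata_length = 0;
/// int64_t body_length = 0;
/// ipc::WriteRecordBatch(*batch, /*buffer_start_offset=*/0, dst, &metadata_length,
///                       &body_length, default_memory_pool());
/// \endcode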
ARROW_EXPORT
Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
                        io::OutputStream* dst, int32_t* metadata_length,
                        int64_t* body_length, MemoryPool* pool,
                        int max_recursion_depth = kMaxNestingDepth,
                        bool allow_64bit = false);

/// \brief Serialize record batch as encapsulated IPC message in a new buffer
///
/// \param[in] batch the record batch
/// \param[in] pool a MemoryPool to allocate memory from
/// \param[out] out the serialized message
/// \return Status
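///
/// Illustrative sketch (`batch` is an assumed std::shared_ptr<RecordBatch>;
/// error handling elided):
///
/// \code
/// std::shared_ptr<Buffer> serialized;
/// ipc::SerializeRecordBatch(*batch, default_memory_pool(), &serialized);
/// // `serialized` now holds the encapsulated message: metadata plus body
/// \endcode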
ARROW_EXPORT
Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool,
                            std::shared_ptr<Buffer>* out);

/// \brief Write record batch to OutputStream
///
/// \param[in] batch the record batch to write
/// \param[in] pool a MemoryPool to use for temporary allocations, if needed
/// \param[in] out the OutputStream to write the output to
/// \return Status
///
/// If writing to pre-allocated memory, you can use
/// arrow::ipc::GetRecordBatchSize to compute how much space is required
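///
/// Illustrative sketch of pairing GetRecordBatchSize with a pre-allocated,
/// fixed-size destination. `batch` is an assumed std::shared_ptr<RecordBatch>;
/// AllocateBuffer (arrow/buffer.h) and io::FixedSizeBufferWriter
/// (arrow/io/memory.h) are assumptions from other headers; error handling
/// elided.
///
/// \code
/// int64_t size = 0;
/// ipc::GetRecordBatchSize(*batch, &size);       // bytes needed, incl. metadata
///
/// std::shared_ptr<Buffer> buffer;
/// AllocateBuffer(default_memory_pool(), size, &buffer);
///
/// io::FixedSizeBufferWriter sink(buffer);       // writes into the fixed buffer
/// ipc::SerializeRecordBatch(*batch, default_memory_pool(), &sink);
/// \endcode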
ARROW_EXPORT
Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool,
                            io::OutputStream* out);

/// \brief Serialize schema using stream writer as a sequence of one or more
/// IPC messages
///
/// \param[in] schema the schema to write
/// \param[in] pool a MemoryPool to allocate memory from
/// \param[out] out the serialized schema
/// \return Status
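///
/// Illustrative sketch (assumes `using namespace arrow`; the schema(), field(),
/// int64(), and utf8() factory helpers are declared in arrow/type.h; error
/// handling elided):
///
/// \code
/// auto s = schema({field("id", int64()), field("name", utf8())});
///
/// std::shared_ptr<Buffer> serialized;
/// ipc::SerializeSchema(*s, default_memory_pool(), &serialized);
/// \endcode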
ARROW_EXPORT
Status SerializeSchema(const Schema& schema, MemoryPool* pool,
                       std::shared_ptr<Buffer>* out);

/// \brief Write multiple record batches to OutputStream, including schema
/// \param[in] batches a vector of batches; must all have the same schema
/// \param[in] dst an OutputStream to write to
/// \return Status
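///
/// Illustrative sketch (`batch1` and `batch2` are assumed batches sharing one
/// schema; io::BufferOutputStream comes from arrow/io/memory.h; error handling
/// elided):
///
/// \code
/// std::vector<std::shared_ptr<RecordBatch>> batches = {batch1, batch2};
///
/// std::shared_ptr<io::BufferOutputStream> sink;
/// io::BufferOutputStream::Create(4096, default_memory_pool(), &sink);
/// ipc::WriteRecordBatchStream(batches, sink.get());
/// \endcode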
ARROW_EXPORT
Status WriteRecordBatchStream(const std::vector<std::shared_ptr<RecordBatch>>& batches,
                              io::OutputStream* dst);

/// \brief Compute the number of bytes needed to write a record batch including metadata
///
/// \param[in] batch the record batch to write
/// \param[out] size the size of the complete encapsulated message
/// \return Status
ARROW_EXPORT
Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size);

/// \brief Compute the number of bytes needed to write a tensor including metadata
///
/// \param[in] tensor the tensor to write
/// \param[out] size the size of the complete encapsulated message
/// \return Status
ARROW_EXPORT
Status GetTensorSize(const Tensor& tensor, int64_t* size);

/// \brief EXPERIMENTAL: Convert arrow::Tensor to a Message with minimal memory
/// allocation
///
/// \param[in] tensor the Tensor to write
/// \param[in] pool MemoryPool to allocate space for metadata
/// \param[out] out the resulting Message
/// \return Status
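///
/// Illustrative sketch (`tensor` is an assumed, existing arrow::Tensor; error
/// handling elided):
///
/// \code
/// std::unique_ptr<ipc::Message> message;
/// ipc::GetTensorMessage(tensor, default_memory_pool(), &message);
/// \endcode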
ARROW_EXPORT
Status GetTensorMessage(const Tensor& tensor, MemoryPool* pool,
                        std::unique_ptr<Message>* out);

/// \brief Write arrow::Tensor as a contiguous message.
///
/// The metadata and body are written assuming 64-byte alignment. It is the
/// user's responsibility to ensure that the OutputStream has been aligned
/// to a 64-byte multiple before writing the message.
///
/// The message is written out as follows:
/// \code
/// <metadata size> <metadata> <tensor data>
/// \endcode
///
/// \param[in] tensor the Tensor to write
/// \param[in] dst the OutputStream to write to
/// \param[out] metadata_length the actual metadata length, including padding
/// \param[out] body_length the actual message body length
/// \return Status
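///
/// Illustrative sketch (`tensor` is an assumed arrow::Tensor and `dst` an
/// assumed io::OutputStream* already aligned to a 64-byte multiple; error
/// handling elided):
///
/// \code
/// int32_t metadata_length = 0;
/// int64_t body_length = 0;
/// ipc::WriteTensor(tensor, dst, &metadata_length, &body_length);
/// \endcode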
ARROW_EXPORT
Status WriteTensor(const Tensor& tensor, io::OutputStream* dst, int32_t* metadata_length,
                   int64_t* body_length);

/// \brief EXPERIMENTAL: Write arrow::SparseTensor as a contiguous message.
///
/// The metadata, sparse index, and body are written assuming 64-byte
/// alignment. It is the user's responsibility to ensure that the OutputStream
/// has been aligned to a 64-byte multiple before writing the message.
///
/// \param[in] sparse_tensor the SparseTensor to write
/// \param[in] dst the OutputStream to write to
/// \param[out] metadata_length the actual metadata length, including padding
/// \param[out] body_length the actual message body length
/// \param[in] pool the memory pool to allocate memory from
/// \return Status
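///
/// Illustrative sketch (`sparse_tensor` is an assumed arrow::SparseTensor and
/// `dst` an assumed io::OutputStream* aligned to a 64-byte multiple; error
/// handling elided):
///
/// \code
/// int32_t metadata_length = 0;
/// int64_t body_length = 0;
/// ipc::WriteSparseTensor(sparse_tensor, dst, &metadata_length, &body_length,
///                        default_memory_pool());
/// \endcode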
ARROW_EXPORT
Status WriteSparseTensor(const SparseTensor& sparse_tensor, io::OutputStream* dst,
                         int32_t* metadata_length, int64_t* body_length,
                         MemoryPool* pool);

namespace internal {

// These internal APIs may change without warning or deprecation

// Intermediate data structure with metadata header plus zero or more buffers
// for the message body. This data can either be written out directly as an
// encapsulated IPC message or used with Flight RPCs
struct IpcPayload {
  Message::Type type;
  std::shared_ptr<Buffer> metadata;
  std::vector<std::shared_ptr<Buffer>> body_buffers;
  int64_t body_length;
};

/// \brief Extract IPC payloads from given schema for purposes of wire
/// transport, separate from using the *StreamWriter classes
ARROW_EXPORT
Status GetDictionaryPayloads(const Schema& schema,
                             std::vector<std::unique_ptr<IpcPayload>>* out);

/// \brief Compute IpcPayload for the given record batch
/// \param[in] batch the RecordBatch that is being serialized
/// \param[in,out] pool for any required temporary memory allocations
/// \param[out] out the returned IpcPayload
/// \return Status
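///
/// Illustrative sketch (`batch` is an assumed std::shared_ptr<RecordBatch>;
/// error handling elided):
///
/// \code
/// internal::IpcPayload payload;
/// internal::GetRecordBatchPayload(*batch, default_memory_pool(), &payload);
/// // payload.metadata and payload.body_buffers can now be transported, e.g.
/// // over a Flight RPC, without first concatenating them into a single buffer
/// \endcode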
ARROW_EXPORT
Status GetRecordBatchPayload(const RecordBatch& batch, MemoryPool* pool, IpcPayload* out);

}  // namespace internal

}  // namespace ipc
}  // namespace arrow

#endif  // ARROW_IPC_WRITER_H