1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18// Read Arrow files and streams
19
20#ifndef ARROW_IPC_READER_H
21#define ARROW_IPC_READER_H
22
23#include <cstdint>
24#include <memory>
25
26#include "arrow/ipc/message.h"
27#include "arrow/record_batch.h"
28#include "arrow/util/visibility.h"
29
30namespace arrow {
31
// Types referenced below only by pointer, reference, or return type; they are
// forward-declared here (alphabetically) rather than pulled in via headers.
class Buffer;
class Schema;
class SparseTensor;
class Status;
class Tensor;

namespace io {

// Input sources consumed by the reader APIs in this header.
class InputStream;
class RandomAccessFile;

}  // namespace io
44
45namespace ipc {
46
// Make ::arrow::RecordBatchReader available as ipc::RecordBatchReader; this is
// an alias for the same type, used as the base class and factory output type
// of the readers below.
using RecordBatchReader = ::arrow::RecordBatchReader;
48
49/// \class RecordBatchStreamReader
50/// \brief Synchronous batch stream reader that reads from io::InputStream
51///
52/// This class reads the schema (plus any dictionaries) as the first messages
53/// in the stream, followed by record batches. For more granular zero-copy
54/// reads see the ReadRecordBatch functions
55class ARROW_EXPORT RecordBatchStreamReader : public RecordBatchReader {
56 public:
57 ~RecordBatchStreamReader() override;
58
59 /// Create batch reader from generic MessageReader
60 ///
61 /// \param[in] message_reader a MessageReader implementation
62 /// \param[out] out the created RecordBatchReader object
63 /// \return Status
64 static Status Open(std::unique_ptr<MessageReader> message_reader,
65 std::shared_ptr<RecordBatchReader>* out);
66
67 /// \brief Record batch stream reader from InputStream
68 ///
69 /// \param[in] stream an input stream instance. Must stay alive throughout
70 /// lifetime of stream reader
71 /// \param[out] out the created RecordBatchStreamReader object
72 /// \return Status
73 static Status Open(io::InputStream* stream, std::shared_ptr<RecordBatchReader>* out);
74
75 /// \brief Open stream and retain ownership of stream object
76 /// \param[in] stream the input stream
77 /// \param[out] out the batch reader
78 /// \return Status
79 static Status Open(const std::shared_ptr<io::InputStream>& stream,
80 std::shared_ptr<RecordBatchReader>* out);
81
82 /// \brief Returns the schema read from the stream
83 std::shared_ptr<Schema> schema() const override;
84
85 Status ReadNext(std::shared_ptr<RecordBatch>* batch) override;
86
87 private:
88 RecordBatchStreamReader();
89
90 class ARROW_NO_EXPORT RecordBatchStreamReaderImpl;
91 std::unique_ptr<RecordBatchStreamReaderImpl> impl_;
92};
93
94/// \brief Reads the record batch file format
95class ARROW_EXPORT RecordBatchFileReader {
96 public:
97 ~RecordBatchFileReader();
98
99 /// \brief Open a RecordBatchFileReader
100 ///
101 /// Open a file-like object that is assumed to be self-contained; i.e., the
102 /// end of the file interface is the end of the Arrow file. Note that there
103 /// can be any amount of data preceding the Arrow-formatted data, because we
104 /// need only locate the end of the Arrow file stream to discover the metadata
105 /// and then proceed to read the data into memory.
106 static Status Open(io::RandomAccessFile* file,
107 std::shared_ptr<RecordBatchFileReader>* reader);
108
109 /// \brief Open a RecordBatchFileReader
110 /// If the file is embedded within some larger file or memory region, you can
111 /// pass the absolute memory offset to the end of the file (which contains the
112 /// metadata footer). The metadata must have been written with memory offsets
113 /// relative to the start of the containing file
114 ///
115 /// \param[in] file the data source
116 /// \param[in] footer_offset the position of the end of the Arrow file
117 /// \param[out] reader the returned reader
118 /// \return Status
119 static Status Open(io::RandomAccessFile* file, int64_t footer_offset,
120 std::shared_ptr<RecordBatchFileReader>* reader);
121
122 /// \brief Version of Open that retains ownership of file
123 ///
124 /// \param[in] file the data source
125 /// \param[out] reader the returned reader
126 /// \return Status
127 static Status Open(const std::shared_ptr<io::RandomAccessFile>& file,
128 std::shared_ptr<RecordBatchFileReader>* reader);
129
130 /// \brief Version of Open that retains ownership of file
131 ///
132 /// \param[in] file the data source
133 /// \param[in] footer_offset the position of the end of the Arrow file
134 /// \param[out] reader the returned reader
135 /// \return Status
136 static Status Open(const std::shared_ptr<io::RandomAccessFile>& file,
137 int64_t footer_offset,
138 std::shared_ptr<RecordBatchFileReader>* reader);
139
140 /// \brief The schema read from the file
141 std::shared_ptr<Schema> schema() const;
142
143 /// \brief Returns the number of record batches in the file
144 int num_record_batches() const;
145
146 /// \brief Return the metadata version from the file metadata
147 MetadataVersion version() const;
148
149 /// \brief Read a particular record batch from the file. Does not copy memory
150 /// if the input source supports zero-copy.
151 ///
152 /// \param[in] i the index of the record batch to return
153 /// \param[out] batch the read batch
154 /// \return Status
155 Status ReadRecordBatch(int i, std::shared_ptr<RecordBatch>* batch);
156
157 private:
158 RecordBatchFileReader();
159
160 class ARROW_NO_EXPORT RecordBatchFileReaderImpl;
161 std::unique_ptr<RecordBatchFileReaderImpl> impl_;
162};
163
164// Generic read functions; does not copy data if the input supports zero copy reads
165
166/// \brief Read Schema from stream serialized as a sequence of one or more IPC
167/// messages
168///
169/// \param[in] stream an InputStream
170/// \param[out] out the output Schema
171/// \return Status
172///
173/// If record batches follow the schema, it is better to use
174/// RecordBatchStreamReader
175ARROW_EXPORT
176Status ReadSchema(io::InputStream* stream, std::shared_ptr<Schema>* out);
177
178/// Read record batch as encapsulated IPC message with metadata size prefix and
179/// header
180///
181/// \param[in] schema the record batch schema
182/// \param[in] stream the file where the batch is located
183/// \param[out] out the read record batch
184/// \return Status
185ARROW_EXPORT
186Status ReadRecordBatch(const std::shared_ptr<Schema>& schema, io::InputStream* stream,
187 std::shared_ptr<RecordBatch>* out);
188
189/// \brief Read record batch from file given metadata and schema
190///
191/// \param[in] metadata a Message containing the record batch metadata
192/// \param[in] schema the record batch schema
193/// \param[in] file a random access file
194/// \param[out] out the read record batch
195/// \return Status
196ARROW_EXPORT
197Status ReadRecordBatch(const Buffer& metadata, const std::shared_ptr<Schema>& schema,
198 io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out);
199
200/// \brief Read record batch from encapsulated Message
201///
202/// \param[in] message a message instance containing metadata and body
203/// \param[in] schema the record batch schema
204/// \param[out] out the resulting RecordBatch
205/// \return Status
206ARROW_EXPORT
207Status ReadRecordBatch(const Message& message, const std::shared_ptr<Schema>& schema,
208 std::shared_ptr<RecordBatch>* out);
209
210/// Read record batch from file given metadata and schema
211///
212/// \param[in] metadata a Message containing the record batch metadata
213/// \param[in] schema the record batch schema
214/// \param[in] file a random access file
215/// \param[in] max_recursion_depth the maximum permitted nesting depth
216/// \param[out] out the read record batch
217/// \return Status
218ARROW_EXPORT
219Status ReadRecordBatch(const Buffer& metadata, const std::shared_ptr<Schema>& schema,
220 int max_recursion_depth, io::RandomAccessFile* file,
221 std::shared_ptr<RecordBatch>* out);
222
223/// \brief Read arrow::Tensor as encapsulated IPC message in file
224///
225/// \param[in] file an InputStream pointed at the start of the message
226/// \param[out] out the read tensor
227/// \return Status
228ARROW_EXPORT
229Status ReadTensor(io::InputStream* file, std::shared_ptr<Tensor>* out);
230
231/// \brief EXPERIMENTAL: Read arrow::Tensor from IPC message
232///
233/// \param[in] message a Message containing the tensor metadata and body
234/// \param[out] out the read tensor
235/// \return Status
236ARROW_EXPORT
237Status ReadTensor(const Message& message, std::shared_ptr<Tensor>* out);
238
239/// \brief EXPERIMETNAL: Read arrow::SparseTensor as encapsulated IPC message in file
240///
241/// \param[in] file an InputStream pointed at the start of the message
242/// \param[out] out the read sparse tensor
243/// \return Status
244ARROW_EXPORT
245Status ReadSparseTensor(io::InputStream* file, std::shared_ptr<SparseTensor>* out);
246
247/// \brief EXPERIMENTAL: Read arrow::SparseTensor from IPC message
248///
249/// \param[in] message a Message containing the tensor metadata and body
250/// \param[out] out the read sparse tensor
251/// \return Status
252ARROW_EXPORT
253Status ReadSparseTensor(const Message& message, std::shared_ptr<SparseTensor>* out);
254
255} // namespace ipc
256} // namespace arrow
257
258#endif // ARROW_IPC_READER_H
259