// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
17
// Public API for different memory sharing / IO mechanisms
19
20#ifndef ARROW_IO_MEMORY_H
21#define ARROW_IO_MEMORY_H
22
#include <cstdint>
#include <memory>

#include "arrow/io/interfaces.h"
#include "arrow/memory_pool.h"
#include "arrow/util/string_view.h"
#include "arrow/util/visibility.h"
30
31namespace arrow {
32
33class Buffer;
34class ResizableBuffer;
35class Status;
36
37namespace io {
38
39// \brief An output stream that writes to a resizable buffer
40class ARROW_EXPORT BufferOutputStream : public OutputStream {
41 public:
42 explicit BufferOutputStream(const std::shared_ptr<ResizableBuffer>& buffer);
43
44 /// \brief Create in-memory output stream with indicated capacity using a
45 /// memory pool
46 /// \param[in] initial_capacity the initial allocated internal capacity of
47 /// the OutputStream
48 /// \param[in,out] pool a MemoryPool to use for allocations
49 /// \param[out] out the created stream
50 static Status Create(int64_t initial_capacity, MemoryPool* pool,
51 std::shared_ptr<BufferOutputStream>* out);
52
53 ~BufferOutputStream() override;
54
55 // Implement the OutputStream interface
56 Status Close() override;
57 bool closed() const override;
58 Status Tell(int64_t* position) const override;
59 Status Write(const void* data, int64_t nbytes) override;
60
61 using OutputStream::Write;
62
63 /// Close the stream and return the buffer
64 Status Finish(std::shared_ptr<Buffer>* result);
65
66 /// \brief Initialize state of OutputStream with newly allocated memory and
67 /// set position to 0
68 /// \param[in] initial_capacity the starting allocated capacity
69 /// \param[in,out] pool the memory pool to use for allocations
70 /// \return Status
71 Status Reset(int64_t initial_capacity = 1024, MemoryPool* pool = default_memory_pool());
72
73 int64_t capacity() const { return capacity_; }
74
75 private:
76 BufferOutputStream();
77
78 // Ensures there is sufficient space available to write nbytes
79 Status Reserve(int64_t nbytes);
80
81 std::shared_ptr<ResizableBuffer> buffer_;
82 bool is_open_;
83 int64_t capacity_;
84 int64_t position_;
85 uint8_t* mutable_data_;
86};
87
88// \brief A helper class to tracks the size of allocations
89class ARROW_EXPORT MockOutputStream : public OutputStream {
90 public:
91 MockOutputStream() : extent_bytes_written_(0), is_open_(true) {}
92
93 // Implement the OutputStream interface
94 Status Close() override;
95 bool closed() const override;
96 Status Tell(int64_t* position) const override;
97 Status Write(const void* data, int64_t nbytes) override;
98
99 int64_t GetExtentBytesWritten() const { return extent_bytes_written_; }
100
101 private:
102 int64_t extent_bytes_written_;
103 bool is_open_;
104};
105
106/// \brief Enables random writes into a fixed-size mutable buffer
107class ARROW_EXPORT FixedSizeBufferWriter : public WritableFile {
108 public:
109 /// Input buffer must be mutable, will abort if not
110 explicit FixedSizeBufferWriter(const std::shared_ptr<Buffer>& buffer);
111 ~FixedSizeBufferWriter() override;
112
113 Status Close() override;
114 bool closed() const override;
115 Status Seek(int64_t position) override;
116 Status Tell(int64_t* position) const override;
117 Status Write(const void* data, int64_t nbytes) override;
118 Status WriteAt(int64_t position, const void* data, int64_t nbytes) override;
119
120 void set_memcopy_threads(int num_threads);
121 void set_memcopy_blocksize(int64_t blocksize);
122 void set_memcopy_threshold(int64_t threshold);
123
124 protected:
125 class FixedSizeBufferWriterImpl;
126 std::unique_ptr<FixedSizeBufferWriterImpl> impl_;
127};
128
129/// \class BufferReader
130/// \brief Random access zero-copy reads on an arrow::Buffer
131class ARROW_EXPORT BufferReader : public RandomAccessFile {
132 public:
133 explicit BufferReader(const std::shared_ptr<Buffer>& buffer);
134 explicit BufferReader(const Buffer& buffer);
135 BufferReader(const uint8_t* data, int64_t size);
136
137 /// \brief Instantiate from std::string or arrow::util::string_view. Does not
138 /// own data
139 explicit BufferReader(const util::string_view& data)
140 : BufferReader(reinterpret_cast<const uint8_t*>(data.data()),
141 static_cast<int64_t>(data.size())) {}
142
143 Status Close() override;
144 bool closed() const override;
145 Status Tell(int64_t* position) const override;
146 Status Read(int64_t nbytes, int64_t* bytes_read, void* buffer) override;
147 // Zero copy read
148 Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) override;
149
150 util::string_view Peek(int64_t nbytes) const override;
151
152 bool supports_zero_copy() const override;
153
154 Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
155 void* out) override;
156 Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) override;
157
158 Status GetSize(int64_t* size) override;
159 Status Seek(int64_t position) override;
160
161 std::shared_ptr<Buffer> buffer() const { return buffer_; }
162
163 protected:
164 std::shared_ptr<Buffer> buffer_;
165 const uint8_t* data_;
166 int64_t size_;
167 int64_t position_;
168 bool is_open_;
169};
170
171} // namespace io
172} // namespace arrow
173
174#endif // ARROW_IO_MEMORY_H
175