1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | // Public API for different memory sharing / IO mechanisms |
19 | |
20 | #ifndef ARROW_IO_MEMORY_H |
21 | #define ARROW_IO_MEMORY_H |
22 | |
23 | #include <cstdint> |
24 | #include <memory> |
25 | |
26 | #include "arrow/io/interfaces.h" |
27 | #include "arrow/memory_pool.h" |
28 | #include "arrow/util/string_view.h" |
29 | #include "arrow/util/visibility.h" |
30 | |
31 | namespace arrow { |
32 | |
33 | class Buffer; |
34 | class ResizableBuffer; |
35 | class Status; |
36 | |
37 | namespace io { |
38 | |
39 | // \brief An output stream that writes to a resizable buffer |
40 | class ARROW_EXPORT BufferOutputStream : public OutputStream { |
41 | public: |
42 | explicit BufferOutputStream(const std::shared_ptr<ResizableBuffer>& buffer); |
43 | |
44 | /// \brief Create in-memory output stream with indicated capacity using a |
45 | /// memory pool |
46 | /// \param[in] initial_capacity the initial allocated internal capacity of |
47 | /// the OutputStream |
48 | /// \param[in,out] pool a MemoryPool to use for allocations |
49 | /// \param[out] out the created stream |
50 | static Status Create(int64_t initial_capacity, MemoryPool* pool, |
51 | std::shared_ptr<BufferOutputStream>* out); |
52 | |
53 | ~BufferOutputStream() override; |
54 | |
55 | // Implement the OutputStream interface |
56 | Status Close() override; |
57 | bool closed() const override; |
58 | Status Tell(int64_t* position) const override; |
59 | Status Write(const void* data, int64_t nbytes) override; |
60 | |
61 | using OutputStream::Write; |
62 | |
63 | /// Close the stream and return the buffer |
64 | Status Finish(std::shared_ptr<Buffer>* result); |
65 | |
66 | /// \brief Initialize state of OutputStream with newly allocated memory and |
67 | /// set position to 0 |
68 | /// \param[in] initial_capacity the starting allocated capacity |
69 | /// \param[in,out] pool the memory pool to use for allocations |
70 | /// \return Status |
71 | Status Reset(int64_t initial_capacity = 1024, MemoryPool* pool = default_memory_pool()); |
72 | |
73 | int64_t capacity() const { return capacity_; } |
74 | |
75 | private: |
76 | BufferOutputStream(); |
77 | |
78 | // Ensures there is sufficient space available to write nbytes |
79 | Status Reserve(int64_t nbytes); |
80 | |
81 | std::shared_ptr<ResizableBuffer> buffer_; |
82 | bool is_open_; |
83 | int64_t capacity_; |
84 | int64_t position_; |
85 | uint8_t* mutable_data_; |
86 | }; |
87 | |
88 | // \brief A helper class to tracks the size of allocations |
89 | class ARROW_EXPORT MockOutputStream : public OutputStream { |
90 | public: |
91 | MockOutputStream() : extent_bytes_written_(0), is_open_(true) {} |
92 | |
93 | // Implement the OutputStream interface |
94 | Status Close() override; |
95 | bool closed() const override; |
96 | Status Tell(int64_t* position) const override; |
97 | Status Write(const void* data, int64_t nbytes) override; |
98 | |
99 | int64_t GetExtentBytesWritten() const { return extent_bytes_written_; } |
100 | |
101 | private: |
102 | int64_t extent_bytes_written_; |
103 | bool is_open_; |
104 | }; |
105 | |
106 | /// \brief Enables random writes into a fixed-size mutable buffer |
107 | class ARROW_EXPORT FixedSizeBufferWriter : public WritableFile { |
108 | public: |
109 | /// Input buffer must be mutable, will abort if not |
110 | explicit FixedSizeBufferWriter(const std::shared_ptr<Buffer>& buffer); |
111 | ~FixedSizeBufferWriter() override; |
112 | |
113 | Status Close() override; |
114 | bool closed() const override; |
115 | Status Seek(int64_t position) override; |
116 | Status Tell(int64_t* position) const override; |
117 | Status Write(const void* data, int64_t nbytes) override; |
118 | Status WriteAt(int64_t position, const void* data, int64_t nbytes) override; |
119 | |
120 | void set_memcopy_threads(int num_threads); |
121 | void set_memcopy_blocksize(int64_t blocksize); |
122 | void set_memcopy_threshold(int64_t threshold); |
123 | |
124 | protected: |
125 | class FixedSizeBufferWriterImpl; |
126 | std::unique_ptr<FixedSizeBufferWriterImpl> impl_; |
127 | }; |
128 | |
129 | /// \class BufferReader |
130 | /// \brief Random access zero-copy reads on an arrow::Buffer |
131 | class ARROW_EXPORT BufferReader : public RandomAccessFile { |
132 | public: |
133 | explicit BufferReader(const std::shared_ptr<Buffer>& buffer); |
134 | explicit BufferReader(const Buffer& buffer); |
135 | BufferReader(const uint8_t* data, int64_t size); |
136 | |
137 | /// \brief Instantiate from std::string or arrow::util::string_view. Does not |
138 | /// own data |
139 | explicit BufferReader(const util::string_view& data) |
140 | : BufferReader(reinterpret_cast<const uint8_t*>(data.data()), |
141 | static_cast<int64_t>(data.size())) {} |
142 | |
143 | Status Close() override; |
144 | bool closed() const override; |
145 | Status Tell(int64_t* position) const override; |
146 | Status Read(int64_t nbytes, int64_t* bytes_read, void* buffer) override; |
147 | // Zero copy read |
148 | Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) override; |
149 | |
150 | util::string_view Peek(int64_t nbytes) const override; |
151 | |
152 | bool supports_zero_copy() const override; |
153 | |
154 | Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, |
155 | void* out) override; |
156 | Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) override; |
157 | |
158 | Status GetSize(int64_t* size) override; |
159 | Status Seek(int64_t position) override; |
160 | |
161 | std::shared_ptr<Buffer> buffer() const { return buffer_; } |
162 | |
163 | protected: |
164 | std::shared_ptr<Buffer> buffer_; |
165 | const uint8_t* data_; |
166 | int64_t size_; |
167 | int64_t position_; |
168 | bool is_open_; |
169 | }; |
170 | |
171 | } // namespace io |
172 | } // namespace arrow |
173 | |
174 | #endif // ARROW_IO_MEMORY_H |
175 | |