1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include "arrow/io/memory.h" |
19 | |
20 | #include <algorithm> |
21 | #include <cstdint> |
22 | #include <cstring> |
23 | #include <mutex> |
24 | |
25 | #include "arrow/buffer.h" |
26 | #include "arrow/status.h" |
27 | #include "arrow/util/logging.h" |
28 | #include "arrow/util/macros.h" |
29 | #include "arrow/util/memory.h" |
30 | |
31 | namespace arrow { |
32 | namespace io { |
33 | |
34 | // ---------------------------------------------------------------------- |
35 | // OutputStream that writes to resizable buffer |
36 | |
// Lower bound, in bytes, on the capacity chosen by BufferOutputStream::Reserve()
// whenever the stream has to grow its buffer.
static constexpr int64_t kBufferMinimumSize{256};
38 | |
39 | BufferOutputStream::BufferOutputStream() |
40 | : is_open_(false), capacity_(0), position_(0), mutable_data_(nullptr) {} |
41 | |
42 | BufferOutputStream::BufferOutputStream(const std::shared_ptr<ResizableBuffer>& buffer) |
43 | : buffer_(buffer), |
44 | is_open_(true), |
45 | capacity_(buffer->size()), |
46 | position_(0), |
47 | mutable_data_(buffer->mutable_data()) {} |
48 | |
49 | Status BufferOutputStream::Create(int64_t initial_capacity, MemoryPool* pool, |
50 | std::shared_ptr<BufferOutputStream>* out) { |
51 | // ctor is private, so cannot use make_shared |
52 | *out = std::shared_ptr<BufferOutputStream>(new BufferOutputStream); |
53 | return (*out)->Reset(initial_capacity, pool); |
54 | } |
55 | |
56 | Status BufferOutputStream::Reset(int64_t initial_capacity, MemoryPool* pool) { |
57 | RETURN_NOT_OK(AllocateResizableBuffer(pool, initial_capacity, &buffer_)); |
58 | is_open_ = true; |
59 | capacity_ = initial_capacity; |
60 | position_ = 0; |
61 | mutable_data_ = buffer_->mutable_data(); |
62 | return Status::OK(); |
63 | } |
64 | |
65 | BufferOutputStream::~BufferOutputStream() { |
66 | // This can fail, better to explicitly call close |
67 | if (buffer_) { |
68 | DCHECK(Close().ok()); |
69 | } |
70 | } |
71 | |
72 | Status BufferOutputStream::Close() { |
73 | if (is_open_) { |
74 | is_open_ = false; |
75 | if (position_ < capacity_) { |
76 | RETURN_NOT_OK(buffer_->Resize(position_, false)); |
77 | } |
78 | } |
79 | return Status::OK(); |
80 | } |
81 | |
82 | bool BufferOutputStream::closed() const { return !is_open_; } |
83 | |
84 | Status BufferOutputStream::Finish(std::shared_ptr<Buffer>* result) { |
85 | RETURN_NOT_OK(Close()); |
86 | buffer_->ZeroPadding(); |
87 | *result = buffer_; |
88 | buffer_ = nullptr; |
89 | is_open_ = false; |
90 | return Status::OK(); |
91 | } |
92 | |
93 | Status BufferOutputStream::Tell(int64_t* position) const { |
94 | *position = position_; |
95 | return Status::OK(); |
96 | } |
97 | |
98 | Status BufferOutputStream::Write(const void* data, int64_t nbytes) { |
99 | if (ARROW_PREDICT_FALSE(!is_open_)) { |
100 | return Status::IOError("OutputStream is closed" ); |
101 | } |
102 | DCHECK(buffer_); |
103 | RETURN_NOT_OK(Reserve(nbytes)); |
104 | memcpy(mutable_data_ + position_, data, nbytes); |
105 | position_ += nbytes; |
106 | return Status::OK(); |
107 | } |
108 | |
109 | Status BufferOutputStream::Reserve(int64_t nbytes) { |
110 | int64_t new_capacity = capacity_; |
111 | while (position_ + nbytes > new_capacity) { |
112 | new_capacity = std::max(kBufferMinimumSize, new_capacity * 2); |
113 | } |
114 | if (new_capacity > capacity_) { |
115 | RETURN_NOT_OK(buffer_->Resize(new_capacity)); |
116 | capacity_ = new_capacity; |
117 | } |
118 | mutable_data_ = buffer_->mutable_data(); |
119 | return Status::OK(); |
120 | } |
121 | |
122 | // ---------------------------------------------------------------------- |
123 | // OutputStream that doesn't write anything |
124 | |
125 | Status MockOutputStream::Close() { |
126 | is_open_ = false; |
127 | return Status::OK(); |
128 | } |
129 | |
130 | bool MockOutputStream::closed() const { return !is_open_; } |
131 | |
132 | Status MockOutputStream::Tell(int64_t* position) const { |
133 | *position = extent_bytes_written_; |
134 | return Status::OK(); |
135 | } |
136 | |
137 | Status MockOutputStream::Write(const void* data, int64_t nbytes) { |
138 | extent_bytes_written_ += nbytes; |
139 | return Status::OK(); |
140 | } |
141 | |
142 | // ---------------------------------------------------------------------- |
143 | // In-memory buffer writer |
144 | |
// Defaults used by FixedSizeBufferWriterImpl: the parallel copy path is taken
// only when a write is larger than the threshold AND more than one thread is
// configured, so with these defaults every write uses plain memcpy.
static constexpr int kMemcopyDefaultNumThreads{1};
static constexpr int64_t kMemcopyDefaultBlocksize{64};
static constexpr int64_t kMemcopyDefaultThreshold{int64_t{1} << 20};  // 1 MiB
148 | |
149 | class FixedSizeBufferWriter::FixedSizeBufferWriterImpl { |
150 | public: |
151 | /// Input buffer must be mutable, will abort if not |
152 | |
153 | /// Input buffer must be mutable, will abort if not |
154 | explicit FixedSizeBufferWriterImpl(const std::shared_ptr<Buffer>& buffer) |
155 | : is_open_(true), |
156 | memcopy_num_threads_(kMemcopyDefaultNumThreads), |
157 | memcopy_blocksize_(kMemcopyDefaultBlocksize), |
158 | memcopy_threshold_(kMemcopyDefaultThreshold) { |
159 | buffer_ = buffer; |
160 | DCHECK(buffer->is_mutable()) << "Must pass mutable buffer" ; |
161 | mutable_data_ = buffer->mutable_data(); |
162 | size_ = buffer->size(); |
163 | position_ = 0; |
164 | } |
165 | |
166 | Status Close() { |
167 | is_open_ = false; |
168 | return Status::OK(); |
169 | } |
170 | |
171 | bool closed() const { return !is_open_; } |
172 | |
173 | Status Seek(int64_t position) { |
174 | if (position < 0 || position > size_) { |
175 | return Status::IOError("Seek out of bounds" ); |
176 | } |
177 | position_ = position; |
178 | return Status::OK(); |
179 | } |
180 | |
181 | Status Tell(int64_t* position) { |
182 | *position = position_; |
183 | return Status::OK(); |
184 | } |
185 | |
186 | Status Write(const void* data, int64_t nbytes) { |
187 | if (position_ + nbytes > size_) { |
188 | return Status::IOError("Write out of bounds" ); |
189 | } |
190 | if (nbytes > memcopy_threshold_ && memcopy_num_threads_ > 1) { |
191 | internal::parallel_memcopy(mutable_data_ + position_, |
192 | reinterpret_cast<const uint8_t*>(data), nbytes, |
193 | memcopy_blocksize_, memcopy_num_threads_); |
194 | } else { |
195 | memcpy(mutable_data_ + position_, data, nbytes); |
196 | } |
197 | position_ += nbytes; |
198 | return Status::OK(); |
199 | } |
200 | |
201 | Status WriteAt(int64_t position, const void* data, int64_t nbytes) { |
202 | std::lock_guard<std::mutex> guard(lock_); |
203 | RETURN_NOT_OK(Seek(position)); |
204 | return Write(data, nbytes); |
205 | } |
206 | |
207 | void set_memcopy_threads(int num_threads) { memcopy_num_threads_ = num_threads; } |
208 | |
209 | void set_memcopy_blocksize(int64_t blocksize) { memcopy_blocksize_ = blocksize; } |
210 | |
211 | void set_memcopy_threshold(int64_t threshold) { memcopy_threshold_ = threshold; } |
212 | |
213 | private: |
214 | std::mutex lock_; |
215 | std::shared_ptr<Buffer> buffer_; |
216 | uint8_t* mutable_data_; |
217 | int64_t size_; |
218 | int64_t position_; |
219 | bool is_open_; |
220 | |
221 | int memcopy_num_threads_; |
222 | int64_t memcopy_blocksize_; |
223 | int64_t memcopy_threshold_; |
224 | }; |
225 | |
226 | FixedSizeBufferWriter::FixedSizeBufferWriter(const std::shared_ptr<Buffer>& buffer) |
227 | : impl_(new FixedSizeBufferWriterImpl(buffer)) {} |
228 | |
229 | FixedSizeBufferWriter::~FixedSizeBufferWriter() = default; |
230 | |
231 | Status FixedSizeBufferWriter::Close() { return impl_->Close(); } |
232 | |
233 | bool FixedSizeBufferWriter::closed() const { return impl_->closed(); } |
234 | |
235 | Status FixedSizeBufferWriter::Seek(int64_t position) { return impl_->Seek(position); } |
236 | |
237 | Status FixedSizeBufferWriter::Tell(int64_t* position) const { |
238 | return impl_->Tell(position); |
239 | } |
240 | |
241 | Status FixedSizeBufferWriter::Write(const void* data, int64_t nbytes) { |
242 | return impl_->Write(data, nbytes); |
243 | } |
244 | |
245 | Status FixedSizeBufferWriter::WriteAt(int64_t position, const void* data, |
246 | int64_t nbytes) { |
247 | return impl_->WriteAt(position, data, nbytes); |
248 | } |
249 | |
250 | void FixedSizeBufferWriter::set_memcopy_threads(int num_threads) { |
251 | impl_->set_memcopy_threads(num_threads); |
252 | } |
253 | |
254 | void FixedSizeBufferWriter::set_memcopy_blocksize(int64_t blocksize) { |
255 | impl_->set_memcopy_blocksize(blocksize); |
256 | } |
257 | |
258 | void FixedSizeBufferWriter::set_memcopy_threshold(int64_t threshold) { |
259 | impl_->set_memcopy_threshold(threshold); |
260 | } |
261 | |
262 | // ---------------------------------------------------------------------- |
263 | // In-memory buffer reader |
264 | |
265 | BufferReader::BufferReader(const std::shared_ptr<Buffer>& buffer) |
266 | : buffer_(buffer), |
267 | data_(buffer->data()), |
268 | size_(buffer->size()), |
269 | position_(0), |
270 | is_open_(true) {} |
271 | |
272 | BufferReader::BufferReader(const uint8_t* data, int64_t size) |
273 | : buffer_(nullptr), data_(data), size_(size), position_(0), is_open_(true) {} |
274 | |
275 | BufferReader::BufferReader(const Buffer& buffer) |
276 | : BufferReader(buffer.data(), buffer.size()) {} |
277 | |
278 | Status BufferReader::Close() { |
279 | is_open_ = false; |
280 | return Status::OK(); |
281 | } |
282 | |
283 | bool BufferReader::closed() const { return !is_open_; } |
284 | |
285 | Status BufferReader::Tell(int64_t* position) const { |
286 | *position = position_; |
287 | return Status::OK(); |
288 | } |
289 | |
290 | util::string_view BufferReader::Peek(int64_t nbytes) const { |
291 | const int64_t bytes_available = std::min(nbytes, size_ - position_); |
292 | return util::string_view(reinterpret_cast<const char*>(data_) + position_, |
293 | static_cast<size_t>(bytes_available)); |
294 | } |
295 | |
296 | bool BufferReader::supports_zero_copy() const { return true; } |
297 | |
298 | Status BufferReader::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, |
299 | void* buffer) { |
300 | if (nbytes < 0) { |
301 | return Status::IOError("Cannot read a negative number of bytes from BufferReader." ); |
302 | } |
303 | *bytes_read = std::min(nbytes, size_ - position); |
304 | if (*bytes_read) { |
305 | memcpy(buffer, data_ + position, *bytes_read); |
306 | } |
307 | return Status::OK(); |
308 | } |
309 | |
310 | Status BufferReader::ReadAt(int64_t position, int64_t nbytes, |
311 | std::shared_ptr<Buffer>* out) { |
312 | if (nbytes < 0) { |
313 | return Status::IOError("Cannot read a negative number of bytes from BufferReader." ); |
314 | } |
315 | int64_t size = std::min(nbytes, size_ - position); |
316 | |
317 | if (size > 0 && buffer_ != nullptr) { |
318 | *out = SliceBuffer(buffer_, position, size); |
319 | } else { |
320 | *out = std::make_shared<Buffer>(data_ + position, size); |
321 | } |
322 | return Status::OK(); |
323 | } |
324 | |
325 | Status BufferReader::Read(int64_t nbytes, int64_t* bytes_read, void* buffer) { |
326 | RETURN_NOT_OK(ReadAt(position_, nbytes, bytes_read, buffer)); |
327 | position_ += *bytes_read; |
328 | return Status::OK(); |
329 | } |
330 | |
331 | Status BufferReader::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) { |
332 | RETURN_NOT_OK(ReadAt(position_, nbytes, out)); |
333 | position_ += (*out)->size(); |
334 | return Status::OK(); |
335 | } |
336 | |
337 | Status BufferReader::GetSize(int64_t* size) { |
338 | *size = size_; |
339 | return Status::OK(); |
340 | } |
341 | |
342 | Status BufferReader::Seek(int64_t position) { |
343 | if (position < 0 || position > size_) { |
344 | return Status::IOError("Seek out of bounds" ); |
345 | } |
346 | |
347 | position_ = position; |
348 | return Status::OK(); |
349 | } |
350 | |
351 | } // namespace io |
352 | } // namespace arrow |
353 | |