1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include "arrow/io/memory.h"
19
20#include <algorithm>
21#include <cstdint>
22#include <cstring>
23#include <mutex>
24
25#include "arrow/buffer.h"
26#include "arrow/status.h"
27#include "arrow/util/logging.h"
28#include "arrow/util/macros.h"
29#include "arrow/util/memory.h"
30
31namespace arrow {
32namespace io {
33
34// ----------------------------------------------------------------------
35// OutputStream that writes to resizable buffer
36
// Lower bound on the capacity BufferOutputStream::Reserve grows to. The first
// growth of an empty stream jumps straight to this size, so a run of small
// writes does not trigger a series of tiny reallocations.
static constexpr int64_t kBufferMinimumSize{256};
38
39BufferOutputStream::BufferOutputStream()
40 : is_open_(false), capacity_(0), position_(0), mutable_data_(nullptr) {}
41
42BufferOutputStream::BufferOutputStream(const std::shared_ptr<ResizableBuffer>& buffer)
43 : buffer_(buffer),
44 is_open_(true),
45 capacity_(buffer->size()),
46 position_(0),
47 mutable_data_(buffer->mutable_data()) {}
48
49Status BufferOutputStream::Create(int64_t initial_capacity, MemoryPool* pool,
50 std::shared_ptr<BufferOutputStream>* out) {
51 // ctor is private, so cannot use make_shared
52 *out = std::shared_ptr<BufferOutputStream>(new BufferOutputStream);
53 return (*out)->Reset(initial_capacity, pool);
54}
55
56Status BufferOutputStream::Reset(int64_t initial_capacity, MemoryPool* pool) {
57 RETURN_NOT_OK(AllocateResizableBuffer(pool, initial_capacity, &buffer_));
58 is_open_ = true;
59 capacity_ = initial_capacity;
60 position_ = 0;
61 mutable_data_ = buffer_->mutable_data();
62 return Status::OK();
63}
64
65BufferOutputStream::~BufferOutputStream() {
66 // This can fail, better to explicitly call close
67 if (buffer_) {
68 DCHECK(Close().ok());
69 }
70}
71
72Status BufferOutputStream::Close() {
73 if (is_open_) {
74 is_open_ = false;
75 if (position_ < capacity_) {
76 RETURN_NOT_OK(buffer_->Resize(position_, false));
77 }
78 }
79 return Status::OK();
80}
81
82bool BufferOutputStream::closed() const { return !is_open_; }
83
84Status BufferOutputStream::Finish(std::shared_ptr<Buffer>* result) {
85 RETURN_NOT_OK(Close());
86 buffer_->ZeroPadding();
87 *result = buffer_;
88 buffer_ = nullptr;
89 is_open_ = false;
90 return Status::OK();
91}
92
93Status BufferOutputStream::Tell(int64_t* position) const {
94 *position = position_;
95 return Status::OK();
96}
97
98Status BufferOutputStream::Write(const void* data, int64_t nbytes) {
99 if (ARROW_PREDICT_FALSE(!is_open_)) {
100 return Status::IOError("OutputStream is closed");
101 }
102 DCHECK(buffer_);
103 RETURN_NOT_OK(Reserve(nbytes));
104 memcpy(mutable_data_ + position_, data, nbytes);
105 position_ += nbytes;
106 return Status::OK();
107}
108
109Status BufferOutputStream::Reserve(int64_t nbytes) {
110 int64_t new_capacity = capacity_;
111 while (position_ + nbytes > new_capacity) {
112 new_capacity = std::max(kBufferMinimumSize, new_capacity * 2);
113 }
114 if (new_capacity > capacity_) {
115 RETURN_NOT_OK(buffer_->Resize(new_capacity));
116 capacity_ = new_capacity;
117 }
118 mutable_data_ = buffer_->mutable_data();
119 return Status::OK();
120}
121
122// ----------------------------------------------------------------------
123// OutputStream that doesn't write anything
124
125Status MockOutputStream::Close() {
126 is_open_ = false;
127 return Status::OK();
128}
129
130bool MockOutputStream::closed() const { return !is_open_; }
131
132Status MockOutputStream::Tell(int64_t* position) const {
133 *position = extent_bytes_written_;
134 return Status::OK();
135}
136
137Status MockOutputStream::Write(const void* data, int64_t nbytes) {
138 extent_bytes_written_ += nbytes;
139 return Status::OK();
140}
141
142// ----------------------------------------------------------------------
143// In-memory buffer writer
144
// Default copy settings for FixedSizeBufferWriter. With a single thread every
// write uses plain memcpy. If more threads are configured, writes larger than
// the threshold (1 MiB) go through internal::parallel_memcopy with this block
// size.
static constexpr int kMemcopyDefaultNumThreads{1};
static constexpr int64_t kMemcopyDefaultBlocksize{64};
static constexpr int64_t kMemcopyDefaultThreshold{int64_t{1} << 20};
148
149class FixedSizeBufferWriter::FixedSizeBufferWriterImpl {
150 public:
151 /// Input buffer must be mutable, will abort if not
152
153 /// Input buffer must be mutable, will abort if not
154 explicit FixedSizeBufferWriterImpl(const std::shared_ptr<Buffer>& buffer)
155 : is_open_(true),
156 memcopy_num_threads_(kMemcopyDefaultNumThreads),
157 memcopy_blocksize_(kMemcopyDefaultBlocksize),
158 memcopy_threshold_(kMemcopyDefaultThreshold) {
159 buffer_ = buffer;
160 DCHECK(buffer->is_mutable()) << "Must pass mutable buffer";
161 mutable_data_ = buffer->mutable_data();
162 size_ = buffer->size();
163 position_ = 0;
164 }
165
166 Status Close() {
167 is_open_ = false;
168 return Status::OK();
169 }
170
171 bool closed() const { return !is_open_; }
172
173 Status Seek(int64_t position) {
174 if (position < 0 || position > size_) {
175 return Status::IOError("Seek out of bounds");
176 }
177 position_ = position;
178 return Status::OK();
179 }
180
181 Status Tell(int64_t* position) {
182 *position = position_;
183 return Status::OK();
184 }
185
186 Status Write(const void* data, int64_t nbytes) {
187 if (position_ + nbytes > size_) {
188 return Status::IOError("Write out of bounds");
189 }
190 if (nbytes > memcopy_threshold_ && memcopy_num_threads_ > 1) {
191 internal::parallel_memcopy(mutable_data_ + position_,
192 reinterpret_cast<const uint8_t*>(data), nbytes,
193 memcopy_blocksize_, memcopy_num_threads_);
194 } else {
195 memcpy(mutable_data_ + position_, data, nbytes);
196 }
197 position_ += nbytes;
198 return Status::OK();
199 }
200
201 Status WriteAt(int64_t position, const void* data, int64_t nbytes) {
202 std::lock_guard<std::mutex> guard(lock_);
203 RETURN_NOT_OK(Seek(position));
204 return Write(data, nbytes);
205 }
206
207 void set_memcopy_threads(int num_threads) { memcopy_num_threads_ = num_threads; }
208
209 void set_memcopy_blocksize(int64_t blocksize) { memcopy_blocksize_ = blocksize; }
210
211 void set_memcopy_threshold(int64_t threshold) { memcopy_threshold_ = threshold; }
212
213 private:
214 std::mutex lock_;
215 std::shared_ptr<Buffer> buffer_;
216 uint8_t* mutable_data_;
217 int64_t size_;
218 int64_t position_;
219 bool is_open_;
220
221 int memcopy_num_threads_;
222 int64_t memcopy_blocksize_;
223 int64_t memcopy_threshold_;
224};
225
226FixedSizeBufferWriter::FixedSizeBufferWriter(const std::shared_ptr<Buffer>& buffer)
227 : impl_(new FixedSizeBufferWriterImpl(buffer)) {}
228
229FixedSizeBufferWriter::~FixedSizeBufferWriter() = default;
230
231Status FixedSizeBufferWriter::Close() { return impl_->Close(); }
232
233bool FixedSizeBufferWriter::closed() const { return impl_->closed(); }
234
235Status FixedSizeBufferWriter::Seek(int64_t position) { return impl_->Seek(position); }
236
237Status FixedSizeBufferWriter::Tell(int64_t* position) const {
238 return impl_->Tell(position);
239}
240
241Status FixedSizeBufferWriter::Write(const void* data, int64_t nbytes) {
242 return impl_->Write(data, nbytes);
243}
244
245Status FixedSizeBufferWriter::WriteAt(int64_t position, const void* data,
246 int64_t nbytes) {
247 return impl_->WriteAt(position, data, nbytes);
248}
249
250void FixedSizeBufferWriter::set_memcopy_threads(int num_threads) {
251 impl_->set_memcopy_threads(num_threads);
252}
253
254void FixedSizeBufferWriter::set_memcopy_blocksize(int64_t blocksize) {
255 impl_->set_memcopy_blocksize(blocksize);
256}
257
258void FixedSizeBufferWriter::set_memcopy_threshold(int64_t threshold) {
259 impl_->set_memcopy_threshold(threshold);
260}
261
262// ----------------------------------------------------------------------
263// In-memory buffer reader
264
265BufferReader::BufferReader(const std::shared_ptr<Buffer>& buffer)
266 : buffer_(buffer),
267 data_(buffer->data()),
268 size_(buffer->size()),
269 position_(0),
270 is_open_(true) {}
271
272BufferReader::BufferReader(const uint8_t* data, int64_t size)
273 : buffer_(nullptr), data_(data), size_(size), position_(0), is_open_(true) {}
274
275BufferReader::BufferReader(const Buffer& buffer)
276 : BufferReader(buffer.data(), buffer.size()) {}
277
278Status BufferReader::Close() {
279 is_open_ = false;
280 return Status::OK();
281}
282
283bool BufferReader::closed() const { return !is_open_; }
284
285Status BufferReader::Tell(int64_t* position) const {
286 *position = position_;
287 return Status::OK();
288}
289
290util::string_view BufferReader::Peek(int64_t nbytes) const {
291 const int64_t bytes_available = std::min(nbytes, size_ - position_);
292 return util::string_view(reinterpret_cast<const char*>(data_) + position_,
293 static_cast<size_t>(bytes_available));
294}
295
296bool BufferReader::supports_zero_copy() const { return true; }
297
298Status BufferReader::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
299 void* buffer) {
300 if (nbytes < 0) {
301 return Status::IOError("Cannot read a negative number of bytes from BufferReader.");
302 }
303 *bytes_read = std::min(nbytes, size_ - position);
304 if (*bytes_read) {
305 memcpy(buffer, data_ + position, *bytes_read);
306 }
307 return Status::OK();
308}
309
310Status BufferReader::ReadAt(int64_t position, int64_t nbytes,
311 std::shared_ptr<Buffer>* out) {
312 if (nbytes < 0) {
313 return Status::IOError("Cannot read a negative number of bytes from BufferReader.");
314 }
315 int64_t size = std::min(nbytes, size_ - position);
316
317 if (size > 0 && buffer_ != nullptr) {
318 *out = SliceBuffer(buffer_, position, size);
319 } else {
320 *out = std::make_shared<Buffer>(data_ + position, size);
321 }
322 return Status::OK();
323}
324
325Status BufferReader::Read(int64_t nbytes, int64_t* bytes_read, void* buffer) {
326 RETURN_NOT_OK(ReadAt(position_, nbytes, bytes_read, buffer));
327 position_ += *bytes_read;
328 return Status::OK();
329}
330
331Status BufferReader::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
332 RETURN_NOT_OK(ReadAt(position_, nbytes, out));
333 position_ += (*out)->size();
334 return Status::OK();
335}
336
337Status BufferReader::GetSize(int64_t* size) {
338 *size = size_;
339 return Status::OK();
340}
341
342Status BufferReader::Seek(int64_t position) {
343 if (position < 0 || position > size_) {
344 return Status::IOError("Seek out of bounds");
345 }
346
347 position_ = position;
348 return Status::OK();
349}
350
351} // namespace io
352} // namespace arrow
353