1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include "arrow/io/buffered.h"
19
20#include <algorithm>
21#include <cstring>
22#include <memory>
23#include <mutex>
24#include <utility>
25
26#include "arrow/buffer.h"
27#include "arrow/memory_pool.h"
28#include "arrow/status.h"
29#include "arrow/util/logging.h"
30#include "arrow/util/string_view.h"
31
32namespace arrow {
33namespace io {
34
35// ----------------------------------------------------------------------
36// BufferedOutputStream implementation
37
38class BufferedBase {
39 public:
40 explicit BufferedBase(MemoryPool* pool)
41 : pool_(pool),
42 is_open_(true),
43 buffer_data_(nullptr),
44 buffer_pos_(0),
45 buffer_size_(0),
46 raw_pos_(-1) {}
47
48 bool closed() const {
49 std::lock_guard<std::mutex> guard(lock_);
50 return !is_open_;
51 }
52
53 Status ResetBuffer() {
54 if (!buffer_) {
55 // On first invocation, or if the buffer has been released, we allocate a
56 // new buffer
57 RETURN_NOT_OK(AllocateResizableBuffer(pool_, buffer_size_, &buffer_));
58 } else if (buffer_->size() != buffer_size_) {
59 RETURN_NOT_OK(buffer_->Resize(buffer_size_));
60 }
61 buffer_data_ = buffer_->mutable_data();
62 buffer_pos_ = 0;
63 return Status::OK();
64 }
65
66 Status ResizeBuffer(int64_t new_buffer_size) {
67 buffer_size_ = new_buffer_size;
68 return ResetBuffer();
69 }
70
71 void AppendToBuffer(const void* data, int64_t nbytes) {
72 DCHECK_LE(buffer_pos_ + nbytes, buffer_size_);
73 std::memcpy(buffer_data_ + buffer_pos_, data, nbytes);
74 buffer_pos_ += nbytes;
75 }
76
77 int64_t buffer_size() const { return buffer_size_; }
78
79 protected:
80 MemoryPool* pool_;
81 bool is_open_;
82
83 std::shared_ptr<ResizableBuffer> buffer_;
84 uint8_t* buffer_data_;
85 int64_t buffer_pos_;
86 int64_t buffer_size_;
87
88 mutable int64_t raw_pos_;
89 mutable std::mutex lock_;
90};
91
92class BufferedOutputStream::Impl : public BufferedBase {
93 public:
94 explicit Impl(std::shared_ptr<OutputStream> raw, MemoryPool* pool)
95 : BufferedBase(pool), raw_(std::move(raw)) {}
96
97 Status Close() {
98 std::lock_guard<std::mutex> guard(lock_);
99 if (is_open_) {
100 Status st = FlushUnlocked();
101 is_open_ = false;
102 RETURN_NOT_OK(raw_->Close());
103 return st;
104 }
105 return Status::OK();
106 }
107
108 Status Tell(int64_t* position) const {
109 std::lock_guard<std::mutex> guard(lock_);
110 if (raw_pos_ == -1) {
111 RETURN_NOT_OK(raw_->Tell(&raw_pos_));
112 DCHECK_GE(raw_pos_, 0);
113 }
114 *position = raw_pos_ + buffer_pos_;
115 return Status::OK();
116 }
117
118 Status Write(const void* data, int64_t nbytes) {
119 std::lock_guard<std::mutex> guard(lock_);
120 if (nbytes < 0) {
121 return Status::Invalid("write count should be >= 0");
122 }
123 if (nbytes == 0) {
124 return Status::OK();
125 }
126 if (nbytes + buffer_pos_ >= buffer_size_) {
127 RETURN_NOT_OK(FlushUnlocked());
128 DCHECK_EQ(buffer_pos_, 0);
129 if (nbytes >= buffer_size_) {
130 // Direct write
131 return raw_->Write(data, nbytes);
132 }
133 }
134 AppendToBuffer(data, nbytes);
135 return Status::OK();
136 }
137
138 Status FlushUnlocked() {
139 if (buffer_pos_ > 0) {
140 // Invalidate cached raw pos
141 raw_pos_ = -1;
142 RETURN_NOT_OK(raw_->Write(buffer_data_, buffer_pos_));
143 buffer_pos_ = 0;
144 }
145 return Status::OK();
146 }
147
148 Status Flush() {
149 std::lock_guard<std::mutex> guard(lock_);
150 return FlushUnlocked();
151 }
152
153 Status Detach(std::shared_ptr<OutputStream>* raw) {
154 RETURN_NOT_OK(Flush());
155 *raw = std::move(raw_);
156 is_open_ = false;
157 return Status::OK();
158 }
159
160 Status SetBufferSize(int64_t new_buffer_size) {
161 std::lock_guard<std::mutex> guard(lock_);
162 DCHECK_GT(new_buffer_size, 0);
163 if (buffer_pos_ >= new_buffer_size) {
164 // If the buffer is shrinking, first flush to the raw OutputStream
165 RETURN_NOT_OK(FlushUnlocked());
166 }
167 return ResizeBuffer(new_buffer_size);
168 }
169
170 std::shared_ptr<OutputStream> raw() const { return raw_; }
171
172 private:
173 std::shared_ptr<OutputStream> raw_;
174};
175
176BufferedOutputStream::BufferedOutputStream(std::shared_ptr<OutputStream> raw,
177 MemoryPool* pool) {
178 impl_.reset(new Impl(std::move(raw), pool));
179}
180
181Status BufferedOutputStream::Create(int64_t buffer_size, MemoryPool* pool,
182 std::shared_ptr<OutputStream> raw,
183 std::shared_ptr<BufferedOutputStream>* out) {
184 auto result = std::shared_ptr<BufferedOutputStream>(
185 new BufferedOutputStream(std::move(raw), pool));
186 RETURN_NOT_OK(result->SetBufferSize(buffer_size));
187 *out = std::move(result);
188 return Status::OK();
189}
190
191BufferedOutputStream::~BufferedOutputStream() { DCHECK_OK(impl_->Close()); }
192
193Status BufferedOutputStream::SetBufferSize(int64_t new_buffer_size) {
194 return impl_->SetBufferSize(new_buffer_size);
195}
196
197int64_t BufferedOutputStream::buffer_size() const { return impl_->buffer_size(); }
198
199Status BufferedOutputStream::Detach(std::shared_ptr<OutputStream>* raw) {
200 return impl_->Detach(raw);
201}
202
203Status BufferedOutputStream::Close() { return impl_->Close(); }
204
205bool BufferedOutputStream::closed() const { return impl_->closed(); }
206
207Status BufferedOutputStream::Tell(int64_t* position) const {
208 return impl_->Tell(position);
209}
210
211Status BufferedOutputStream::Write(const void* data, int64_t nbytes) {
212 return impl_->Write(data, nbytes);
213}
214
215Status BufferedOutputStream::Flush() { return impl_->Flush(); }
216
217std::shared_ptr<OutputStream> BufferedOutputStream::raw() const { return impl_->raw(); }
218
219// ----------------------------------------------------------------------
220// BufferedInputStream implementation
221
222class BufferedInputStream::Impl : public BufferedBase {
223 public:
224 Impl(std::shared_ptr<InputStream> raw, MemoryPool* pool)
225 : BufferedBase(pool), raw_(std::move(raw)), bytes_buffered_(0) {}
226
227 ~Impl() { DCHECK_OK(Close()); }
228
229 Status Close() {
230 std::lock_guard<std::mutex> guard(lock_);
231 if (is_open_) {
232 is_open_ = false;
233 return raw_->Close();
234 }
235 return Status::OK();
236 }
237
238 Status Tell(int64_t* position) const {
239 std::lock_guard<std::mutex> guard(lock_);
240 if (raw_pos_ == -1) {
241 RETURN_NOT_OK(raw_->Tell(&raw_pos_));
242 DCHECK_GE(raw_pos_, 0);
243 }
244 // Shift by bytes_buffered to return semantic stream position
245 *position = raw_pos_ - bytes_buffered_;
246 return Status::OK();
247 }
248
249 Status SetBufferSize(int64_t new_buffer_size) {
250 std::lock_guard<std::mutex> guard(lock_);
251 DCHECK_GT(new_buffer_size, 0);
252 if ((buffer_pos_ + bytes_buffered_) >= new_buffer_size) {
253 return Status::Invalid("Cannot shrink read buffer if buffered data remains");
254 }
255 return ResizeBuffer(new_buffer_size);
256 }
257
258 util::string_view Peek(int64_t nbytes) const {
259 int64_t peek_size = std::min(nbytes, bytes_buffered_);
260 return util::string_view(reinterpret_cast<const char*>(buffer_data_ + buffer_pos_),
261 static_cast<size_t>(peek_size));
262 }
263
264 int64_t bytes_buffered() const { return bytes_buffered_; }
265
266 int64_t buffer_size() const { return buffer_size_; }
267
268 std::shared_ptr<InputStream> Detach() {
269 std::lock_guard<std::mutex> guard(lock_);
270 is_open_ = false;
271 return std::move(raw_);
272 }
273
274 void RewindBuffer() {
275 // Invalidate buffered data, as with a Seek or large Read
276 buffer_pos_ = bytes_buffered_ = 0;
277 }
278
279 Status BufferIfNeeded() {
280 if (bytes_buffered_ == 0) {
281 // Fill buffer
282 if (!buffer_) {
283 RETURN_NOT_OK(ResetBuffer());
284 }
285 RETURN_NOT_OK(raw_->Read(buffer_size_, &bytes_buffered_, buffer_data_));
286 buffer_pos_ = 0;
287
288 // Do not make assumptions about the raw stream position
289 raw_pos_ = -1;
290 }
291 return Status::OK();
292 }
293
294 void ConsumeBuffer(int64_t nbytes) {
295 buffer_pos_ += nbytes;
296 bytes_buffered_ -= nbytes;
297 }
298
299 Status Read(int64_t nbytes, int64_t* bytes_read, void* out) {
300 std::lock_guard<std::mutex> guard(lock_);
301 DCHECK_GT(nbytes, 0);
302
303 if (nbytes < buffer_size_) {
304 // Pre-buffer for small reads
305 RETURN_NOT_OK(BufferIfNeeded());
306 }
307
308 if (nbytes > bytes_buffered_) {
309 // Copy buffered bytes into out, then read rest
310 memcpy(out, buffer_data_ + buffer_pos_, bytes_buffered_);
311 RETURN_NOT_OK(raw_->Read(nbytes - bytes_buffered_, bytes_read,
312 reinterpret_cast<uint8_t*>(out) + bytes_buffered_));
313 // Do not make assumptions about the raw stream position
314 raw_pos_ = -1;
315 *bytes_read += bytes_buffered_;
316 RewindBuffer();
317 } else {
318 memcpy(out, buffer_data_ + buffer_pos_, nbytes);
319 *bytes_read = nbytes;
320 ConsumeBuffer(nbytes);
321 }
322 return Status::OK();
323 }
324
325 Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
326 std::shared_ptr<ResizableBuffer> buffer;
327 RETURN_NOT_OK(AllocateResizableBuffer(pool_, nbytes, &buffer));
328
329 int64_t bytes_read = 0;
330 RETURN_NOT_OK(Read(nbytes, &bytes_read, buffer->mutable_data()));
331
332 if (bytes_read < nbytes) {
333 // Change size but do not reallocate internal capacity
334 RETURN_NOT_OK(buffer->Resize(bytes_read, false /* shrink_to_fit */));
335 buffer->ZeroPadding();
336 }
337
338 *out = buffer;
339 return Status::OK();
340 }
341
342 // For providing access to the raw file handles
343 std::shared_ptr<InputStream> raw() const { return raw_; }
344
345 private:
346 std::shared_ptr<InputStream> raw_;
347
348 // Number of remaining bytes in the buffer, to be reduced on each read from
349 // the buffer
350 int64_t bytes_buffered_;
351};
352
353BufferedInputStream::BufferedInputStream(std::shared_ptr<InputStream> raw,
354 MemoryPool* pool) {
355 impl_.reset(new Impl(std::move(raw), pool));
356}
357
358BufferedInputStream::~BufferedInputStream() { DCHECK_OK(impl_->Close()); }
359
360Status BufferedInputStream::Create(int64_t buffer_size, MemoryPool* pool,
361 std::shared_ptr<InputStream> raw,
362 std::shared_ptr<BufferedInputStream>* out) {
363 auto result =
364 std::shared_ptr<BufferedInputStream>(new BufferedInputStream(std::move(raw), pool));
365 RETURN_NOT_OK(result->SetBufferSize(buffer_size));
366 *out = std::move(result);
367 return Status::OK();
368}
369
370Status BufferedInputStream::Close() { return impl_->Close(); }
371
372bool BufferedInputStream::closed() const { return impl_->closed(); }
373
374std::shared_ptr<InputStream> BufferedInputStream::Detach() { return impl_->Detach(); }
375
376std::shared_ptr<InputStream> BufferedInputStream::raw() const { return impl_->raw(); }
377
378Status BufferedInputStream::Tell(int64_t* position) const {
379 return impl_->Tell(position);
380}
381
382util::string_view BufferedInputStream::Peek(int64_t nbytes) const {
383 return impl_->Peek(nbytes);
384}
385
386Status BufferedInputStream::SetBufferSize(int64_t new_buffer_size) {
387 return impl_->SetBufferSize(new_buffer_size);
388}
389
390int64_t BufferedInputStream::bytes_buffered() const { return impl_->bytes_buffered(); }
391
392int64_t BufferedInputStream::buffer_size() const { return impl_->buffer_size(); }
393
394Status BufferedInputStream::Read(int64_t nbytes, int64_t* bytes_read, void* out) {
395 return impl_->Read(nbytes, bytes_read, out);
396}
397
398Status BufferedInputStream::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
399 return impl_->Read(nbytes, out);
400}
401
402} // namespace io
403} // namespace arrow
404