1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include "arrow/io/buffered.h" |
19 | |
20 | #include <algorithm> |
21 | #include <cstring> |
22 | #include <memory> |
23 | #include <mutex> |
24 | #include <utility> |
25 | |
26 | #include "arrow/buffer.h" |
27 | #include "arrow/memory_pool.h" |
28 | #include "arrow/status.h" |
29 | #include "arrow/util/logging.h" |
30 | #include "arrow/util/string_view.h" |
31 | |
32 | namespace arrow { |
33 | namespace io { |
34 | |
35 | // ---------------------------------------------------------------------- |
// BufferedBase (shared buffer state) and BufferedOutputStream implementation
37 | |
38 | class BufferedBase { |
39 | public: |
40 | explicit BufferedBase(MemoryPool* pool) |
41 | : pool_(pool), |
42 | is_open_(true), |
43 | buffer_data_(nullptr), |
44 | buffer_pos_(0), |
45 | buffer_size_(0), |
46 | raw_pos_(-1) {} |
47 | |
48 | bool closed() const { |
49 | std::lock_guard<std::mutex> guard(lock_); |
50 | return !is_open_; |
51 | } |
52 | |
53 | Status ResetBuffer() { |
54 | if (!buffer_) { |
55 | // On first invocation, or if the buffer has been released, we allocate a |
56 | // new buffer |
57 | RETURN_NOT_OK(AllocateResizableBuffer(pool_, buffer_size_, &buffer_)); |
58 | } else if (buffer_->size() != buffer_size_) { |
59 | RETURN_NOT_OK(buffer_->Resize(buffer_size_)); |
60 | } |
61 | buffer_data_ = buffer_->mutable_data(); |
62 | buffer_pos_ = 0; |
63 | return Status::OK(); |
64 | } |
65 | |
66 | Status ResizeBuffer(int64_t new_buffer_size) { |
67 | buffer_size_ = new_buffer_size; |
68 | return ResetBuffer(); |
69 | } |
70 | |
71 | void AppendToBuffer(const void* data, int64_t nbytes) { |
72 | DCHECK_LE(buffer_pos_ + nbytes, buffer_size_); |
73 | std::memcpy(buffer_data_ + buffer_pos_, data, nbytes); |
74 | buffer_pos_ += nbytes; |
75 | } |
76 | |
77 | int64_t buffer_size() const { return buffer_size_; } |
78 | |
79 | protected: |
80 | MemoryPool* pool_; |
81 | bool is_open_; |
82 | |
83 | std::shared_ptr<ResizableBuffer> buffer_; |
84 | uint8_t* buffer_data_; |
85 | int64_t buffer_pos_; |
86 | int64_t buffer_size_; |
87 | |
88 | mutable int64_t raw_pos_; |
89 | mutable std::mutex lock_; |
90 | }; |
91 | |
92 | class BufferedOutputStream::Impl : public BufferedBase { |
93 | public: |
94 | explicit Impl(std::shared_ptr<OutputStream> raw, MemoryPool* pool) |
95 | : BufferedBase(pool), raw_(std::move(raw)) {} |
96 | |
97 | Status Close() { |
98 | std::lock_guard<std::mutex> guard(lock_); |
99 | if (is_open_) { |
100 | Status st = FlushUnlocked(); |
101 | is_open_ = false; |
102 | RETURN_NOT_OK(raw_->Close()); |
103 | return st; |
104 | } |
105 | return Status::OK(); |
106 | } |
107 | |
108 | Status Tell(int64_t* position) const { |
109 | std::lock_guard<std::mutex> guard(lock_); |
110 | if (raw_pos_ == -1) { |
111 | RETURN_NOT_OK(raw_->Tell(&raw_pos_)); |
112 | DCHECK_GE(raw_pos_, 0); |
113 | } |
114 | *position = raw_pos_ + buffer_pos_; |
115 | return Status::OK(); |
116 | } |
117 | |
118 | Status Write(const void* data, int64_t nbytes) { |
119 | std::lock_guard<std::mutex> guard(lock_); |
120 | if (nbytes < 0) { |
121 | return Status::Invalid("write count should be >= 0" ); |
122 | } |
123 | if (nbytes == 0) { |
124 | return Status::OK(); |
125 | } |
126 | if (nbytes + buffer_pos_ >= buffer_size_) { |
127 | RETURN_NOT_OK(FlushUnlocked()); |
128 | DCHECK_EQ(buffer_pos_, 0); |
129 | if (nbytes >= buffer_size_) { |
130 | // Direct write |
131 | return raw_->Write(data, nbytes); |
132 | } |
133 | } |
134 | AppendToBuffer(data, nbytes); |
135 | return Status::OK(); |
136 | } |
137 | |
138 | Status FlushUnlocked() { |
139 | if (buffer_pos_ > 0) { |
140 | // Invalidate cached raw pos |
141 | raw_pos_ = -1; |
142 | RETURN_NOT_OK(raw_->Write(buffer_data_, buffer_pos_)); |
143 | buffer_pos_ = 0; |
144 | } |
145 | return Status::OK(); |
146 | } |
147 | |
148 | Status Flush() { |
149 | std::lock_guard<std::mutex> guard(lock_); |
150 | return FlushUnlocked(); |
151 | } |
152 | |
153 | Status Detach(std::shared_ptr<OutputStream>* raw) { |
154 | RETURN_NOT_OK(Flush()); |
155 | *raw = std::move(raw_); |
156 | is_open_ = false; |
157 | return Status::OK(); |
158 | } |
159 | |
160 | Status SetBufferSize(int64_t new_buffer_size) { |
161 | std::lock_guard<std::mutex> guard(lock_); |
162 | DCHECK_GT(new_buffer_size, 0); |
163 | if (buffer_pos_ >= new_buffer_size) { |
164 | // If the buffer is shrinking, first flush to the raw OutputStream |
165 | RETURN_NOT_OK(FlushUnlocked()); |
166 | } |
167 | return ResizeBuffer(new_buffer_size); |
168 | } |
169 | |
170 | std::shared_ptr<OutputStream> raw() const { return raw_; } |
171 | |
172 | private: |
173 | std::shared_ptr<OutputStream> raw_; |
174 | }; |
175 | |
176 | BufferedOutputStream::BufferedOutputStream(std::shared_ptr<OutputStream> raw, |
177 | MemoryPool* pool) { |
178 | impl_.reset(new Impl(std::move(raw), pool)); |
179 | } |
180 | |
181 | Status BufferedOutputStream::Create(int64_t buffer_size, MemoryPool* pool, |
182 | std::shared_ptr<OutputStream> raw, |
183 | std::shared_ptr<BufferedOutputStream>* out) { |
184 | auto result = std::shared_ptr<BufferedOutputStream>( |
185 | new BufferedOutputStream(std::move(raw), pool)); |
186 | RETURN_NOT_OK(result->SetBufferSize(buffer_size)); |
187 | *out = std::move(result); |
188 | return Status::OK(); |
189 | } |
190 | |
191 | BufferedOutputStream::~BufferedOutputStream() { DCHECK_OK(impl_->Close()); } |
192 | |
193 | Status BufferedOutputStream::SetBufferSize(int64_t new_buffer_size) { |
194 | return impl_->SetBufferSize(new_buffer_size); |
195 | } |
196 | |
197 | int64_t BufferedOutputStream::buffer_size() const { return impl_->buffer_size(); } |
198 | |
199 | Status BufferedOutputStream::Detach(std::shared_ptr<OutputStream>* raw) { |
200 | return impl_->Detach(raw); |
201 | } |
202 | |
203 | Status BufferedOutputStream::Close() { return impl_->Close(); } |
204 | |
205 | bool BufferedOutputStream::closed() const { return impl_->closed(); } |
206 | |
207 | Status BufferedOutputStream::Tell(int64_t* position) const { |
208 | return impl_->Tell(position); |
209 | } |
210 | |
211 | Status BufferedOutputStream::Write(const void* data, int64_t nbytes) { |
212 | return impl_->Write(data, nbytes); |
213 | } |
214 | |
215 | Status BufferedOutputStream::Flush() { return impl_->Flush(); } |
216 | |
217 | std::shared_ptr<OutputStream> BufferedOutputStream::raw() const { return impl_->raw(); } |
218 | |
219 | // ---------------------------------------------------------------------- |
220 | // BufferedInputStream implementation |
221 | |
222 | class BufferedInputStream::Impl : public BufferedBase { |
223 | public: |
224 | Impl(std::shared_ptr<InputStream> raw, MemoryPool* pool) |
225 | : BufferedBase(pool), raw_(std::move(raw)), bytes_buffered_(0) {} |
226 | |
227 | ~Impl() { DCHECK_OK(Close()); } |
228 | |
229 | Status Close() { |
230 | std::lock_guard<std::mutex> guard(lock_); |
231 | if (is_open_) { |
232 | is_open_ = false; |
233 | return raw_->Close(); |
234 | } |
235 | return Status::OK(); |
236 | } |
237 | |
238 | Status Tell(int64_t* position) const { |
239 | std::lock_guard<std::mutex> guard(lock_); |
240 | if (raw_pos_ == -1) { |
241 | RETURN_NOT_OK(raw_->Tell(&raw_pos_)); |
242 | DCHECK_GE(raw_pos_, 0); |
243 | } |
244 | // Shift by bytes_buffered to return semantic stream position |
245 | *position = raw_pos_ - bytes_buffered_; |
246 | return Status::OK(); |
247 | } |
248 | |
249 | Status SetBufferSize(int64_t new_buffer_size) { |
250 | std::lock_guard<std::mutex> guard(lock_); |
251 | DCHECK_GT(new_buffer_size, 0); |
252 | if ((buffer_pos_ + bytes_buffered_) >= new_buffer_size) { |
253 | return Status::Invalid("Cannot shrink read buffer if buffered data remains" ); |
254 | } |
255 | return ResizeBuffer(new_buffer_size); |
256 | } |
257 | |
258 | util::string_view Peek(int64_t nbytes) const { |
259 | int64_t peek_size = std::min(nbytes, bytes_buffered_); |
260 | return util::string_view(reinterpret_cast<const char*>(buffer_data_ + buffer_pos_), |
261 | static_cast<size_t>(peek_size)); |
262 | } |
263 | |
264 | int64_t bytes_buffered() const { return bytes_buffered_; } |
265 | |
266 | int64_t buffer_size() const { return buffer_size_; } |
267 | |
268 | std::shared_ptr<InputStream> Detach() { |
269 | std::lock_guard<std::mutex> guard(lock_); |
270 | is_open_ = false; |
271 | return std::move(raw_); |
272 | } |
273 | |
274 | void RewindBuffer() { |
275 | // Invalidate buffered data, as with a Seek or large Read |
276 | buffer_pos_ = bytes_buffered_ = 0; |
277 | } |
278 | |
279 | Status BufferIfNeeded() { |
280 | if (bytes_buffered_ == 0) { |
281 | // Fill buffer |
282 | if (!buffer_) { |
283 | RETURN_NOT_OK(ResetBuffer()); |
284 | } |
285 | RETURN_NOT_OK(raw_->Read(buffer_size_, &bytes_buffered_, buffer_data_)); |
286 | buffer_pos_ = 0; |
287 | |
288 | // Do not make assumptions about the raw stream position |
289 | raw_pos_ = -1; |
290 | } |
291 | return Status::OK(); |
292 | } |
293 | |
294 | void ConsumeBuffer(int64_t nbytes) { |
295 | buffer_pos_ += nbytes; |
296 | bytes_buffered_ -= nbytes; |
297 | } |
298 | |
299 | Status Read(int64_t nbytes, int64_t* bytes_read, void* out) { |
300 | std::lock_guard<std::mutex> guard(lock_); |
301 | DCHECK_GT(nbytes, 0); |
302 | |
303 | if (nbytes < buffer_size_) { |
304 | // Pre-buffer for small reads |
305 | RETURN_NOT_OK(BufferIfNeeded()); |
306 | } |
307 | |
308 | if (nbytes > bytes_buffered_) { |
309 | // Copy buffered bytes into out, then read rest |
310 | memcpy(out, buffer_data_ + buffer_pos_, bytes_buffered_); |
311 | RETURN_NOT_OK(raw_->Read(nbytes - bytes_buffered_, bytes_read, |
312 | reinterpret_cast<uint8_t*>(out) + bytes_buffered_)); |
313 | // Do not make assumptions about the raw stream position |
314 | raw_pos_ = -1; |
315 | *bytes_read += bytes_buffered_; |
316 | RewindBuffer(); |
317 | } else { |
318 | memcpy(out, buffer_data_ + buffer_pos_, nbytes); |
319 | *bytes_read = nbytes; |
320 | ConsumeBuffer(nbytes); |
321 | } |
322 | return Status::OK(); |
323 | } |
324 | |
325 | Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) { |
326 | std::shared_ptr<ResizableBuffer> buffer; |
327 | RETURN_NOT_OK(AllocateResizableBuffer(pool_, nbytes, &buffer)); |
328 | |
329 | int64_t bytes_read = 0; |
330 | RETURN_NOT_OK(Read(nbytes, &bytes_read, buffer->mutable_data())); |
331 | |
332 | if (bytes_read < nbytes) { |
333 | // Change size but do not reallocate internal capacity |
334 | RETURN_NOT_OK(buffer->Resize(bytes_read, false /* shrink_to_fit */)); |
335 | buffer->ZeroPadding(); |
336 | } |
337 | |
338 | *out = buffer; |
339 | return Status::OK(); |
340 | } |
341 | |
342 | // For providing access to the raw file handles |
343 | std::shared_ptr<InputStream> raw() const { return raw_; } |
344 | |
345 | private: |
346 | std::shared_ptr<InputStream> raw_; |
347 | |
348 | // Number of remaining bytes in the buffer, to be reduced on each read from |
349 | // the buffer |
350 | int64_t bytes_buffered_; |
351 | }; |
352 | |
353 | BufferedInputStream::BufferedInputStream(std::shared_ptr<InputStream> raw, |
354 | MemoryPool* pool) { |
355 | impl_.reset(new Impl(std::move(raw), pool)); |
356 | } |
357 | |
358 | BufferedInputStream::~BufferedInputStream() { DCHECK_OK(impl_->Close()); } |
359 | |
360 | Status BufferedInputStream::Create(int64_t buffer_size, MemoryPool* pool, |
361 | std::shared_ptr<InputStream> raw, |
362 | std::shared_ptr<BufferedInputStream>* out) { |
363 | auto result = |
364 | std::shared_ptr<BufferedInputStream>(new BufferedInputStream(std::move(raw), pool)); |
365 | RETURN_NOT_OK(result->SetBufferSize(buffer_size)); |
366 | *out = std::move(result); |
367 | return Status::OK(); |
368 | } |
369 | |
370 | Status BufferedInputStream::Close() { return impl_->Close(); } |
371 | |
372 | bool BufferedInputStream::closed() const { return impl_->closed(); } |
373 | |
374 | std::shared_ptr<InputStream> BufferedInputStream::Detach() { return impl_->Detach(); } |
375 | |
376 | std::shared_ptr<InputStream> BufferedInputStream::raw() const { return impl_->raw(); } |
377 | |
378 | Status BufferedInputStream::Tell(int64_t* position) const { |
379 | return impl_->Tell(position); |
380 | } |
381 | |
382 | util::string_view BufferedInputStream::Peek(int64_t nbytes) const { |
383 | return impl_->Peek(nbytes); |
384 | } |
385 | |
386 | Status BufferedInputStream::SetBufferSize(int64_t new_buffer_size) { |
387 | return impl_->SetBufferSize(new_buffer_size); |
388 | } |
389 | |
390 | int64_t BufferedInputStream::bytes_buffered() const { return impl_->bytes_buffered(); } |
391 | |
392 | int64_t BufferedInputStream::buffer_size() const { return impl_->buffer_size(); } |
393 | |
394 | Status BufferedInputStream::Read(int64_t nbytes, int64_t* bytes_read, void* out) { |
395 | return impl_->Read(nbytes, bytes_read, out); |
396 | } |
397 | |
398 | Status BufferedInputStream::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) { |
399 | return impl_->Read(nbytes, out); |
400 | } |
401 | |
402 | } // namespace io |
403 | } // namespace arrow |
404 | |