1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include "arrow/util/windows_compatibility.h" // IWYU pragma: keep |
19 | |
20 | // sys/mman.h not present in Visual Studio or Cygwin |
21 | #ifdef _WIN32 |
22 | #ifndef NOMINMAX |
23 | #define NOMINMAX |
24 | #endif |
25 | #include "arrow/io/mman.h" |
26 | #undef Realloc |
27 | #undef Free |
28 | #else |
29 | #include <sys/mman.h> |
30 | #include <unistd.h> // IWYU pragma: keep |
31 | #endif |
32 | |
33 | #include <algorithm> |
34 | #include <cerrno> |
35 | #include <cstdint> |
36 | #include <cstring> |
37 | #include <memory> |
38 | #include <mutex> |
39 | #include <sstream> |
40 | #include <string> |
41 | |
42 | // ---------------------------------------------------------------------- |
43 | // Other Arrow includes |
44 | |
45 | #include "arrow/io/file.h" |
46 | #include "arrow/io/interfaces.h" |
47 | |
48 | #include "arrow/buffer.h" |
49 | #include "arrow/memory_pool.h" |
50 | #include "arrow/status.h" |
51 | #include "arrow/util/io-util.h" |
52 | #include "arrow/util/logging.h" |
53 | |
54 | namespace arrow { |
55 | namespace io { |
56 | |
57 | class OSFile { |
58 | public: |
59 | OSFile() : fd_(-1), is_open_(false), size_(-1) {} |
60 | |
61 | ~OSFile() {} |
62 | |
63 | // Note: only one of the Open* methods below may be called on a given instance |
64 | |
65 | Status OpenWritable(const std::string& path, bool truncate, bool append, |
66 | bool write_only) { |
67 | RETURN_NOT_OK(SetFileName(path)); |
68 | |
69 | RETURN_NOT_OK( |
70 | internal::FileOpenWritable(file_name_, write_only, truncate, append, &fd_)); |
71 | is_open_ = true; |
72 | mode_ = write_only ? FileMode::WRITE : FileMode::READWRITE; |
73 | |
74 | if (!truncate) { |
75 | RETURN_NOT_OK(internal::FileGetSize(fd_, &size_)); |
76 | } else { |
77 | size_ = 0; |
78 | } |
79 | return Status::OK(); |
80 | } |
81 | |
82 | // This is different from OpenWritable(string, ...) in that it doesn't |
83 | // truncate nor mandate a seekable file |
84 | Status OpenWritable(int fd) { |
85 | if (!internal::FileGetSize(fd, &size_).ok()) { |
86 | // Non-seekable file |
87 | size_ = -1; |
88 | } |
89 | RETURN_NOT_OK(SetFileName(fd)); |
90 | is_open_ = true; |
91 | mode_ = FileMode::WRITE; |
92 | fd_ = fd; |
93 | return Status::OK(); |
94 | } |
95 | |
96 | Status OpenReadable(const std::string& path) { |
97 | RETURN_NOT_OK(SetFileName(path)); |
98 | |
99 | RETURN_NOT_OK(internal::FileOpenReadable(file_name_, &fd_)); |
100 | RETURN_NOT_OK(internal::FileGetSize(fd_, &size_)); |
101 | |
102 | is_open_ = true; |
103 | mode_ = FileMode::READ; |
104 | return Status::OK(); |
105 | } |
106 | |
107 | Status OpenReadable(int fd) { |
108 | RETURN_NOT_OK(internal::FileGetSize(fd, &size_)); |
109 | RETURN_NOT_OK(SetFileName(fd)); |
110 | is_open_ = true; |
111 | mode_ = FileMode::READ; |
112 | fd_ = fd; |
113 | return Status::OK(); |
114 | } |
115 | |
116 | Status Close() { |
117 | if (is_open_) { |
118 | // Even if closing fails, the fd will likely be closed (perhaps it's |
119 | // already closed). |
120 | is_open_ = false; |
121 | RETURN_NOT_OK(internal::FileClose(fd_)); |
122 | } |
123 | return Status::OK(); |
124 | } |
125 | |
126 | Status Read(int64_t nbytes, int64_t* bytes_read, void* out) { |
127 | return internal::FileRead(fd_, reinterpret_cast<uint8_t*>(out), nbytes, bytes_read); |
128 | } |
129 | |
130 | Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) { |
131 | return internal::FileReadAt(fd_, reinterpret_cast<uint8_t*>(out), position, nbytes, |
132 | bytes_read); |
133 | } |
134 | |
135 | Status Seek(int64_t pos) { |
136 | if (pos < 0) { |
137 | return Status::Invalid("Invalid position" ); |
138 | } |
139 | return internal::FileSeek(fd_, pos); |
140 | } |
141 | |
142 | Status Tell(int64_t* pos) const { return internal::FileTell(fd_, pos); } |
143 | |
144 | Status Write(const void* data, int64_t length) { |
145 | std::lock_guard<std::mutex> guard(lock_); |
146 | if (length < 0) { |
147 | return Status::IOError("Length must be non-negative" ); |
148 | } |
149 | return internal::FileWrite(fd_, reinterpret_cast<const uint8_t*>(data), length); |
150 | } |
151 | |
152 | int fd() const { return fd_; } |
153 | |
154 | bool is_open() const { return is_open_; } |
155 | |
156 | int64_t size() const { return size_; } |
157 | |
158 | FileMode::type mode() const { return mode_; } |
159 | |
160 | std::mutex& lock() { return lock_; } |
161 | |
162 | protected: |
163 | Status SetFileName(const std::string& file_name) { |
164 | return internal::FileNameFromString(file_name, &file_name_); |
165 | } |
166 | Status SetFileName(int fd) { |
167 | std::stringstream ss; |
168 | ss << "<fd " << fd << ">" ; |
169 | return SetFileName(ss.str()); |
170 | } |
171 | |
172 | internal::PlatformFilename file_name_; |
173 | |
174 | std::mutex lock_; |
175 | |
176 | // File descriptor |
177 | int fd_; |
178 | |
179 | FileMode::type mode_; |
180 | |
181 | bool is_open_; |
182 | int64_t size_; |
183 | }; |
184 | |
185 | // ---------------------------------------------------------------------- |
186 | // ReadableFile implementation |
187 | |
188 | class ReadableFile::ReadableFileImpl : public OSFile { |
189 | public: |
190 | explicit ReadableFileImpl(MemoryPool* pool) : OSFile(), pool_(pool) {} |
191 | |
192 | Status Open(const std::string& path) { return OpenReadable(path); } |
193 | Status Open(int fd) { return OpenReadable(fd); } |
194 | |
195 | Status ReadBuffer(int64_t nbytes, std::shared_ptr<Buffer>* out) { |
196 | std::shared_ptr<ResizableBuffer> buffer; |
197 | RETURN_NOT_OK(AllocateResizableBuffer(pool_, nbytes, &buffer)); |
198 | |
199 | int64_t bytes_read = 0; |
200 | RETURN_NOT_OK(Read(nbytes, &bytes_read, buffer->mutable_data())); |
201 | if (bytes_read < nbytes) { |
202 | RETURN_NOT_OK(buffer->Resize(bytes_read)); |
203 | buffer->ZeroPadding(); |
204 | } |
205 | *out = buffer; |
206 | return Status::OK(); |
207 | } |
208 | |
209 | Status ReadBufferAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) { |
210 | std::shared_ptr<ResizableBuffer> buffer; |
211 | RETURN_NOT_OK(AllocateResizableBuffer(pool_, nbytes, &buffer)); |
212 | |
213 | int64_t bytes_read = 0; |
214 | RETURN_NOT_OK(ReadAt(position, nbytes, &bytes_read, buffer->mutable_data())); |
215 | if (bytes_read < nbytes) { |
216 | RETURN_NOT_OK(buffer->Resize(bytes_read)); |
217 | buffer->ZeroPadding(); |
218 | } |
219 | *out = buffer; |
220 | return Status::OK(); |
221 | } |
222 | |
223 | private: |
224 | MemoryPool* pool_; |
225 | }; |
226 | |
227 | ReadableFile::ReadableFile(MemoryPool* pool) { impl_.reset(new ReadableFileImpl(pool)); } |
228 | |
229 | ReadableFile::~ReadableFile() { DCHECK(impl_->Close().ok()); } |
230 | |
231 | Status ReadableFile::Open(const std::string& path, std::shared_ptr<ReadableFile>* file) { |
232 | return Open(path, default_memory_pool(), file); |
233 | } |
234 | |
235 | Status ReadableFile::Open(const std::string& path, MemoryPool* memory_pool, |
236 | std::shared_ptr<ReadableFile>* file) { |
237 | *file = std::shared_ptr<ReadableFile>(new ReadableFile(memory_pool)); |
238 | return (*file)->impl_->Open(path); |
239 | } |
240 | |
241 | Status ReadableFile::Open(int fd, MemoryPool* memory_pool, |
242 | std::shared_ptr<ReadableFile>* file) { |
243 | *file = std::shared_ptr<ReadableFile>(new ReadableFile(memory_pool)); |
244 | return (*file)->impl_->Open(fd); |
245 | } |
246 | |
247 | Status ReadableFile::Open(int fd, std::shared_ptr<ReadableFile>* file) { |
248 | return Open(fd, default_memory_pool(), file); |
249 | } |
250 | |
251 | Status ReadableFile::Close() { return impl_->Close(); } |
252 | |
253 | bool ReadableFile::closed() const { return !impl_->is_open(); } |
254 | |
255 | Status ReadableFile::Tell(int64_t* pos) const { return impl_->Tell(pos); } |
256 | |
257 | Status ReadableFile::Read(int64_t nbytes, int64_t* bytes_read, void* out) { |
258 | std::lock_guard<std::mutex> guard(impl_->lock()); |
259 | return impl_->Read(nbytes, bytes_read, out); |
260 | } |
261 | |
262 | Status ReadableFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, |
263 | void* out) { |
264 | return impl_->ReadAt(position, nbytes, bytes_read, out); |
265 | } |
266 | |
267 | Status ReadableFile::ReadAt(int64_t position, int64_t nbytes, |
268 | std::shared_ptr<Buffer>* out) { |
269 | return impl_->ReadBufferAt(position, nbytes, out); |
270 | } |
271 | |
272 | Status ReadableFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) { |
273 | std::lock_guard<std::mutex> guard(impl_->lock()); |
274 | return impl_->ReadBuffer(nbytes, out); |
275 | } |
276 | |
277 | Status ReadableFile::GetSize(int64_t* size) { |
278 | *size = impl_->size(); |
279 | return Status::OK(); |
280 | } |
281 | |
282 | Status ReadableFile::Seek(int64_t pos) { return impl_->Seek(pos); } |
283 | |
284 | int ReadableFile::file_descriptor() const { return impl_->fd(); } |
285 | |
286 | // ---------------------------------------------------------------------- |
287 | // FileOutputStream |
288 | |
289 | class FileOutputStream::FileOutputStreamImpl : public OSFile { |
290 | public: |
291 | Status Open(const std::string& path, bool append) { |
292 | const bool truncate = !append; |
293 | return OpenWritable(path, truncate, append, true /* write_only */); |
294 | } |
295 | Status Open(int fd) { return OpenWritable(fd); } |
296 | }; |
297 | |
298 | FileOutputStream::FileOutputStream() { impl_.reset(new FileOutputStreamImpl()); } |
299 | |
300 | FileOutputStream::~FileOutputStream() { |
301 | // This can fail; better to explicitly call close |
302 | DCHECK(impl_->Close().ok()); |
303 | } |
304 | |
305 | Status FileOutputStream::Open(const std::string& path, |
306 | std::shared_ptr<OutputStream>* file) { |
307 | return Open(path, false, file); |
308 | } |
309 | |
310 | Status FileOutputStream::Open(const std::string& path, bool append, |
311 | std::shared_ptr<OutputStream>* out) { |
312 | *out = std::shared_ptr<FileOutputStream>(new FileOutputStream()); |
313 | return std::static_pointer_cast<FileOutputStream>(*out)->impl_->Open(path, append); |
314 | } |
315 | |
316 | Status FileOutputStream::Open(int fd, std::shared_ptr<OutputStream>* out) { |
317 | *out = std::shared_ptr<FileOutputStream>(new FileOutputStream()); |
318 | return std::static_pointer_cast<FileOutputStream>(*out)->impl_->Open(fd); |
319 | } |
320 | |
321 | Status FileOutputStream::Open(const std::string& path, |
322 | std::shared_ptr<FileOutputStream>* file) { |
323 | return Open(path, false, file); |
324 | } |
325 | |
326 | Status FileOutputStream::Open(const std::string& path, bool append, |
327 | std::shared_ptr<FileOutputStream>* file) { |
328 | // private ctor |
329 | *file = std::shared_ptr<FileOutputStream>(new FileOutputStream()); |
330 | return (*file)->impl_->Open(path, append); |
331 | } |
332 | |
333 | Status FileOutputStream::Open(int fd, std::shared_ptr<FileOutputStream>* file) { |
334 | *file = std::shared_ptr<FileOutputStream>(new FileOutputStream()); |
335 | return (*file)->impl_->Open(fd); |
336 | } |
337 | |
338 | Status FileOutputStream::Close() { return impl_->Close(); } |
339 | |
340 | bool FileOutputStream::closed() const { return !impl_->is_open(); } |
341 | |
342 | Status FileOutputStream::Tell(int64_t* pos) const { return impl_->Tell(pos); } |
343 | |
344 | Status FileOutputStream::Write(const void* data, int64_t length) { |
345 | return impl_->Write(data, length); |
346 | } |
347 | |
348 | int FileOutputStream::file_descriptor() const { return impl_->fd(); } |
349 | |
350 | // ---------------------------------------------------------------------- |
351 | // Implement MemoryMappedFile as a buffer subclass |
352 | // The class doesn't differentiate between size and capacity |
353 | class MemoryMappedFile::MemoryMap : public MutableBuffer { |
354 | public: |
355 | MemoryMap() : MutableBuffer(nullptr, 0) {} |
356 | |
357 | ~MemoryMap() { |
358 | DCHECK_OK(Close()); |
359 | if (mutable_data_ != nullptr) { |
360 | DCHECK_EQ(munmap(mutable_data_, static_cast<size_t>(size_)), 0); |
361 | } |
362 | } |
363 | |
364 | Status Close() { |
365 | if (file_->is_open()) { |
366 | // NOTE: we don't unmap here, so that buffers exported through Read() |
367 | // remain valid until the MemoryMap object is destroyed |
368 | return file_->Close(); |
369 | } else { |
370 | return Status::OK(); |
371 | } |
372 | } |
373 | |
374 | bool closed() const { return !file_->is_open(); } |
375 | |
376 | Status Open(const std::string& path, FileMode::type mode) { |
377 | file_.reset(new OSFile()); |
378 | |
379 | if (mode != FileMode::READ) { |
380 | // Memory mapping has permission failures if PROT_READ not set |
381 | prot_flags_ = PROT_READ | PROT_WRITE; |
382 | map_mode_ = MAP_SHARED; |
383 | constexpr bool append = false; |
384 | constexpr bool truncate = false; |
385 | constexpr bool write_only = false; |
386 | RETURN_NOT_OK(file_->OpenWritable(path, truncate, append, write_only)); |
387 | |
388 | is_mutable_ = true; |
389 | } else { |
390 | prot_flags_ = PROT_READ; |
391 | map_mode_ = MAP_PRIVATE; // Changes are not to be committed back to the file |
392 | RETURN_NOT_OK(file_->OpenReadable(path)); |
393 | |
394 | is_mutable_ = false; |
395 | } |
396 | |
397 | // Memory mapping fails when file size is 0 |
398 | // delay it until the first resize |
399 | if (file_->size() > 0) { |
400 | RETURN_NOT_OK(InitMMap(file_->size())); |
401 | } |
402 | |
403 | position_ = 0; |
404 | |
405 | return Status::OK(); |
406 | } |
407 | |
408 | // Resize the mmap and file to the specified size. |
409 | Status Resize(const int64_t new_size) { |
410 | if (!writable()) { |
411 | return Status::IOError("Cannot resize a readonly memory map" ); |
412 | } |
413 | |
414 | if (new_size == 0) { |
415 | if (mutable_data_ != nullptr) { |
416 | // just unmap the mmap and truncate the file to 0 size |
417 | if (munmap(mutable_data_, capacity_) != 0) { |
418 | return Status::IOError("Cannot unmap the file" ); |
419 | } |
420 | RETURN_NOT_OK(internal::FileTruncate(file_->fd(), 0)); |
421 | data_ = mutable_data_ = nullptr; |
422 | size_ = capacity_ = 0; |
423 | } |
424 | position_ = 0; |
425 | return Status::OK(); |
426 | } |
427 | |
428 | if (mutable_data_) { |
429 | void* result; |
430 | RETURN_NOT_OK( |
431 | internal::MemoryMapRemap(mutable_data_, size_, new_size, file_->fd(), &result)); |
432 | size_ = capacity_ = new_size; |
433 | data_ = mutable_data_ = static_cast<uint8_t*>(result); |
434 | if (position_ > size_) { |
435 | position_ = size_; |
436 | } |
437 | } else { |
438 | DCHECK_EQ(position_, 0); |
439 | // the mmap is not yet initialized, resize the underlying |
440 | // file, since it might have been 0-sized |
441 | RETURN_NOT_OK(InitMMap(new_size, /*resize_file*/ true)); |
442 | } |
443 | return Status::OK(); |
444 | } |
445 | |
446 | int64_t size() const { return size_; } |
447 | |
448 | Status Seek(int64_t position) { |
449 | if (position < 0) { |
450 | return Status::Invalid("position is out of bounds" ); |
451 | } |
452 | position_ = position; |
453 | return Status::OK(); |
454 | } |
455 | |
456 | int64_t position() { return position_; } |
457 | |
458 | void advance(int64_t nbytes) { position_ = position_ + nbytes; } |
459 | |
460 | uint8_t* head() { return mutable_data_ + position_; } |
461 | |
462 | bool writable() { return file_->mode() != FileMode::READ; } |
463 | |
464 | bool opened() { return file_->is_open(); } |
465 | |
466 | int fd() const { return file_->fd(); } |
467 | |
468 | std::mutex& write_lock() { return file_->lock(); } |
469 | |
470 | std::mutex& resize_lock() { return resize_lock_; } |
471 | |
472 | private: |
473 | // Initialize the mmap and set size, capacity and the data pointers |
474 | Status InitMMap(int64_t initial_size, bool resize_file = false) { |
475 | if (resize_file) { |
476 | RETURN_NOT_OK(internal::FileTruncate(file_->fd(), initial_size)); |
477 | } |
478 | DCHECK(data_ == nullptr && mutable_data_ == nullptr); |
479 | void* result = mmap(nullptr, static_cast<size_t>(initial_size), prot_flags_, |
480 | map_mode_, file_->fd(), 0); |
481 | if (result == MAP_FAILED) { |
482 | return Status::IOError("Memory mapping file failed: " , std::strerror(errno)); |
483 | } |
484 | size_ = capacity_ = initial_size; |
485 | data_ = mutable_data_ = static_cast<uint8_t*>(result); |
486 | |
487 | return Status::OK(); |
488 | } |
489 | std::unique_ptr<OSFile> file_; |
490 | int prot_flags_; |
491 | int map_mode_; |
492 | int64_t position_; |
493 | std::mutex resize_lock_; |
494 | }; |
495 | |
496 | MemoryMappedFile::MemoryMappedFile() {} |
497 | MemoryMappedFile::~MemoryMappedFile() {} |
498 | |
499 | Status MemoryMappedFile::Create(const std::string& path, int64_t size, |
500 | std::shared_ptr<MemoryMappedFile>* out) { |
501 | std::shared_ptr<FileOutputStream> file; |
502 | RETURN_NOT_OK(FileOutputStream::Open(path, &file)); |
503 | |
504 | RETURN_NOT_OK(internal::FileTruncate(file->file_descriptor(), size)); |
505 | |
506 | RETURN_NOT_OK(file->Close()); |
507 | return MemoryMappedFile::Open(path, FileMode::READWRITE, out); |
508 | } |
509 | |
510 | Status MemoryMappedFile::Open(const std::string& path, FileMode::type mode, |
511 | std::shared_ptr<MemoryMappedFile>* out) { |
512 | std::shared_ptr<MemoryMappedFile> result(new MemoryMappedFile()); |
513 | |
514 | result->memory_map_.reset(new MemoryMap()); |
515 | RETURN_NOT_OK(result->memory_map_->Open(path, mode)); |
516 | |
517 | *out = result; |
518 | return Status::OK(); |
519 | } |
520 | |
521 | Status MemoryMappedFile::GetSize(int64_t* size) const { |
522 | *size = memory_map_->size(); |
523 | return Status::OK(); |
524 | } |
525 | |
526 | Status MemoryMappedFile::GetSize(int64_t* size) { |
527 | return static_cast<const MemoryMappedFile*>(this)->GetSize(size); |
528 | } |
529 | |
530 | Status MemoryMappedFile::Tell(int64_t* position) const { |
531 | *position = memory_map_->position(); |
532 | return Status::OK(); |
533 | } |
534 | |
535 | Status MemoryMappedFile::Seek(int64_t position) { return memory_map_->Seek(position); } |
536 | |
537 | Status MemoryMappedFile::Close() { return memory_map_->Close(); } |
538 | |
539 | bool MemoryMappedFile::closed() const { return memory_map_->closed(); } |
540 | |
541 | Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes, |
542 | std::shared_ptr<Buffer>* out) { |
543 | // if the file is writable, we acquire the lock before creating any slices |
544 | // in case a resize is triggered concurrently, otherwise we wouldn't detect |
545 | // a change in the use count |
546 | auto guard_resize = memory_map_->writable() |
547 | ? std::unique_lock<std::mutex>(memory_map_->resize_lock()) |
548 | : std::unique_lock<std::mutex>(); |
549 | nbytes = std::max<int64_t>(0, std::min(nbytes, memory_map_->size() - position)); |
550 | |
551 | if (nbytes > 0) { |
552 | *out = SliceBuffer(memory_map_, position, nbytes); |
553 | } else { |
554 | *out = std::make_shared<Buffer>(nullptr, 0); |
555 | } |
556 | return Status::OK(); |
557 | } |
558 | |
559 | Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, |
560 | void* out) { |
561 | auto guard_resize = memory_map_->writable() |
562 | ? std::unique_lock<std::mutex>(memory_map_->resize_lock()) |
563 | : std::unique_lock<std::mutex>(); |
564 | nbytes = std::max<int64_t>(0, std::min(nbytes, memory_map_->size() - position)); |
565 | if (nbytes > 0) { |
566 | memcpy(out, memory_map_->data() + position, static_cast<size_t>(nbytes)); |
567 | } |
568 | *bytes_read = nbytes; |
569 | return Status::OK(); |
570 | } |
571 | |
572 | Status MemoryMappedFile::Read(int64_t nbytes, int64_t* bytes_read, void* out) { |
573 | RETURN_NOT_OK(ReadAt(memory_map_->position(), nbytes, bytes_read, out)); |
574 | memory_map_->advance(*bytes_read); |
575 | return Status::OK(); |
576 | } |
577 | |
578 | Status MemoryMappedFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) { |
579 | RETURN_NOT_OK(ReadAt(memory_map_->position(), nbytes, out)); |
580 | memory_map_->advance((*out)->size()); |
581 | return Status::OK(); |
582 | } |
583 | |
584 | bool MemoryMappedFile::supports_zero_copy() const { return true; } |
585 | |
586 | Status MemoryMappedFile::WriteAt(int64_t position, const void* data, int64_t nbytes) { |
587 | std::lock_guard<std::mutex> guard(memory_map_->write_lock()); |
588 | |
589 | if (!memory_map_->opened() || !memory_map_->writable()) { |
590 | return Status::IOError("Unable to write" ); |
591 | } |
592 | if (position + nbytes > memory_map_->size()) { |
593 | return Status::Invalid("Cannot write past end of memory map" ); |
594 | } |
595 | |
596 | RETURN_NOT_OK(memory_map_->Seek(position)); |
597 | if (nbytes + memory_map_->position() > memory_map_->size()) { |
598 | return Status::Invalid("Cannot write past end of memory map" ); |
599 | } |
600 | |
601 | return WriteInternal(data, nbytes); |
602 | } |
603 | |
604 | Status MemoryMappedFile::Write(const void* data, int64_t nbytes) { |
605 | std::lock_guard<std::mutex> guard(memory_map_->write_lock()); |
606 | |
607 | if (!memory_map_->opened() || !memory_map_->writable()) { |
608 | return Status::IOError("Unable to write" ); |
609 | } |
610 | if (nbytes + memory_map_->position() > memory_map_->size()) { |
611 | return Status::Invalid("Cannot write past end of memory map" ); |
612 | } |
613 | |
614 | return WriteInternal(data, nbytes); |
615 | } |
616 | |
617 | Status MemoryMappedFile::WriteInternal(const void* data, int64_t nbytes) { |
618 | memcpy(memory_map_->head(), data, static_cast<size_t>(nbytes)); |
619 | memory_map_->advance(nbytes); |
620 | return Status::OK(); |
621 | } |
622 | |
623 | Status MemoryMappedFile::Resize(int64_t new_size) { |
624 | std::unique_lock<std::mutex> write_guard(memory_map_->write_lock(), std::defer_lock); |
625 | std::unique_lock<std::mutex> resize_guard(memory_map_->resize_lock(), std::defer_lock); |
626 | std::lock(write_guard, resize_guard); |
627 | // having both locks, we can check the number of times memory_map_ |
628 | // was borrwed (meaning number of reader still holding a ref to it + 1) |
629 | // and if it's greater than 1, we fail loudly |
630 | if (memory_map_.use_count() > 1) { |
631 | return Status::IOError("Cannot resize memory map while there are active readers" ); |
632 | } |
633 | RETURN_NOT_OK(memory_map_->Resize(new_size)); |
634 | return Status::OK(); |
635 | } |
636 | |
637 | int MemoryMappedFile::file_descriptor() const { return memory_map_->fd(); } |
638 | |
639 | } // namespace io |
640 | } // namespace arrow |
641 | |