1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include "arrow/util/windows_compatibility.h" // IWYU pragma: keep
19
20// sys/mman.h not present in Visual Studio or Cygwin
21#ifdef _WIN32
22#ifndef NOMINMAX
23#define NOMINMAX
24#endif
25#include "arrow/io/mman.h"
26#undef Realloc
27#undef Free
28#else
29#include <sys/mman.h>
30#include <unistd.h> // IWYU pragma: keep
31#endif
32
33#include <algorithm>
34#include <cerrno>
35#include <cstdint>
36#include <cstring>
37#include <memory>
38#include <mutex>
39#include <sstream>
40#include <string>
41
42// ----------------------------------------------------------------------
43// Other Arrow includes
44
45#include "arrow/io/file.h"
46#include "arrow/io/interfaces.h"
47
48#include "arrow/buffer.h"
49#include "arrow/memory_pool.h"
50#include "arrow/status.h"
51#include "arrow/util/io-util.h"
52#include "arrow/util/logging.h"
53
54namespace arrow {
55namespace io {
56
57class OSFile {
58 public:
59 OSFile() : fd_(-1), is_open_(false), size_(-1) {}
60
61 ~OSFile() {}
62
63 // Note: only one of the Open* methods below may be called on a given instance
64
65 Status OpenWritable(const std::string& path, bool truncate, bool append,
66 bool write_only) {
67 RETURN_NOT_OK(SetFileName(path));
68
69 RETURN_NOT_OK(
70 internal::FileOpenWritable(file_name_, write_only, truncate, append, &fd_));
71 is_open_ = true;
72 mode_ = write_only ? FileMode::WRITE : FileMode::READWRITE;
73
74 if (!truncate) {
75 RETURN_NOT_OK(internal::FileGetSize(fd_, &size_));
76 } else {
77 size_ = 0;
78 }
79 return Status::OK();
80 }
81
82 // This is different from OpenWritable(string, ...) in that it doesn't
83 // truncate nor mandate a seekable file
84 Status OpenWritable(int fd) {
85 if (!internal::FileGetSize(fd, &size_).ok()) {
86 // Non-seekable file
87 size_ = -1;
88 }
89 RETURN_NOT_OK(SetFileName(fd));
90 is_open_ = true;
91 mode_ = FileMode::WRITE;
92 fd_ = fd;
93 return Status::OK();
94 }
95
96 Status OpenReadable(const std::string& path) {
97 RETURN_NOT_OK(SetFileName(path));
98
99 RETURN_NOT_OK(internal::FileOpenReadable(file_name_, &fd_));
100 RETURN_NOT_OK(internal::FileGetSize(fd_, &size_));
101
102 is_open_ = true;
103 mode_ = FileMode::READ;
104 return Status::OK();
105 }
106
107 Status OpenReadable(int fd) {
108 RETURN_NOT_OK(internal::FileGetSize(fd, &size_));
109 RETURN_NOT_OK(SetFileName(fd));
110 is_open_ = true;
111 mode_ = FileMode::READ;
112 fd_ = fd;
113 return Status::OK();
114 }
115
116 Status Close() {
117 if (is_open_) {
118 // Even if closing fails, the fd will likely be closed (perhaps it's
119 // already closed).
120 is_open_ = false;
121 RETURN_NOT_OK(internal::FileClose(fd_));
122 }
123 return Status::OK();
124 }
125
126 Status Read(int64_t nbytes, int64_t* bytes_read, void* out) {
127 return internal::FileRead(fd_, reinterpret_cast<uint8_t*>(out), nbytes, bytes_read);
128 }
129
130 Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
131 return internal::FileReadAt(fd_, reinterpret_cast<uint8_t*>(out), position, nbytes,
132 bytes_read);
133 }
134
135 Status Seek(int64_t pos) {
136 if (pos < 0) {
137 return Status::Invalid("Invalid position");
138 }
139 return internal::FileSeek(fd_, pos);
140 }
141
142 Status Tell(int64_t* pos) const { return internal::FileTell(fd_, pos); }
143
144 Status Write(const void* data, int64_t length) {
145 std::lock_guard<std::mutex> guard(lock_);
146 if (length < 0) {
147 return Status::IOError("Length must be non-negative");
148 }
149 return internal::FileWrite(fd_, reinterpret_cast<const uint8_t*>(data), length);
150 }
151
152 int fd() const { return fd_; }
153
154 bool is_open() const { return is_open_; }
155
156 int64_t size() const { return size_; }
157
158 FileMode::type mode() const { return mode_; }
159
160 std::mutex& lock() { return lock_; }
161
162 protected:
163 Status SetFileName(const std::string& file_name) {
164 return internal::FileNameFromString(file_name, &file_name_);
165 }
166 Status SetFileName(int fd) {
167 std::stringstream ss;
168 ss << "<fd " << fd << ">";
169 return SetFileName(ss.str());
170 }
171
172 internal::PlatformFilename file_name_;
173
174 std::mutex lock_;
175
176 // File descriptor
177 int fd_;
178
179 FileMode::type mode_;
180
181 bool is_open_;
182 int64_t size_;
183};
184
185// ----------------------------------------------------------------------
186// ReadableFile implementation
187
188class ReadableFile::ReadableFileImpl : public OSFile {
189 public:
190 explicit ReadableFileImpl(MemoryPool* pool) : OSFile(), pool_(pool) {}
191
192 Status Open(const std::string& path) { return OpenReadable(path); }
193 Status Open(int fd) { return OpenReadable(fd); }
194
195 Status ReadBuffer(int64_t nbytes, std::shared_ptr<Buffer>* out) {
196 std::shared_ptr<ResizableBuffer> buffer;
197 RETURN_NOT_OK(AllocateResizableBuffer(pool_, nbytes, &buffer));
198
199 int64_t bytes_read = 0;
200 RETURN_NOT_OK(Read(nbytes, &bytes_read, buffer->mutable_data()));
201 if (bytes_read < nbytes) {
202 RETURN_NOT_OK(buffer->Resize(bytes_read));
203 buffer->ZeroPadding();
204 }
205 *out = buffer;
206 return Status::OK();
207 }
208
209 Status ReadBufferAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
210 std::shared_ptr<ResizableBuffer> buffer;
211 RETURN_NOT_OK(AllocateResizableBuffer(pool_, nbytes, &buffer));
212
213 int64_t bytes_read = 0;
214 RETURN_NOT_OK(ReadAt(position, nbytes, &bytes_read, buffer->mutable_data()));
215 if (bytes_read < nbytes) {
216 RETURN_NOT_OK(buffer->Resize(bytes_read));
217 buffer->ZeroPadding();
218 }
219 *out = buffer;
220 return Status::OK();
221 }
222
223 private:
224 MemoryPool* pool_;
225};
226
227ReadableFile::ReadableFile(MemoryPool* pool) { impl_.reset(new ReadableFileImpl(pool)); }
228
229ReadableFile::~ReadableFile() { DCHECK(impl_->Close().ok()); }
230
231Status ReadableFile::Open(const std::string& path, std::shared_ptr<ReadableFile>* file) {
232 return Open(path, default_memory_pool(), file);
233}
234
235Status ReadableFile::Open(const std::string& path, MemoryPool* memory_pool,
236 std::shared_ptr<ReadableFile>* file) {
237 *file = std::shared_ptr<ReadableFile>(new ReadableFile(memory_pool));
238 return (*file)->impl_->Open(path);
239}
240
241Status ReadableFile::Open(int fd, MemoryPool* memory_pool,
242 std::shared_ptr<ReadableFile>* file) {
243 *file = std::shared_ptr<ReadableFile>(new ReadableFile(memory_pool));
244 return (*file)->impl_->Open(fd);
245}
246
247Status ReadableFile::Open(int fd, std::shared_ptr<ReadableFile>* file) {
248 return Open(fd, default_memory_pool(), file);
249}
250
251Status ReadableFile::Close() { return impl_->Close(); }
252
253bool ReadableFile::closed() const { return !impl_->is_open(); }
254
255Status ReadableFile::Tell(int64_t* pos) const { return impl_->Tell(pos); }
256
257Status ReadableFile::Read(int64_t nbytes, int64_t* bytes_read, void* out) {
258 std::lock_guard<std::mutex> guard(impl_->lock());
259 return impl_->Read(nbytes, bytes_read, out);
260}
261
262Status ReadableFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
263 void* out) {
264 return impl_->ReadAt(position, nbytes, bytes_read, out);
265}
266
267Status ReadableFile::ReadAt(int64_t position, int64_t nbytes,
268 std::shared_ptr<Buffer>* out) {
269 return impl_->ReadBufferAt(position, nbytes, out);
270}
271
272Status ReadableFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
273 std::lock_guard<std::mutex> guard(impl_->lock());
274 return impl_->ReadBuffer(nbytes, out);
275}
276
277Status ReadableFile::GetSize(int64_t* size) {
278 *size = impl_->size();
279 return Status::OK();
280}
281
282Status ReadableFile::Seek(int64_t pos) { return impl_->Seek(pos); }
283
284int ReadableFile::file_descriptor() const { return impl_->fd(); }
285
286// ----------------------------------------------------------------------
287// FileOutputStream
288
289class FileOutputStream::FileOutputStreamImpl : public OSFile {
290 public:
291 Status Open(const std::string& path, bool append) {
292 const bool truncate = !append;
293 return OpenWritable(path, truncate, append, true /* write_only */);
294 }
295 Status Open(int fd) { return OpenWritable(fd); }
296};
297
298FileOutputStream::FileOutputStream() { impl_.reset(new FileOutputStreamImpl()); }
299
300FileOutputStream::~FileOutputStream() {
301 // This can fail; better to explicitly call close
302 DCHECK(impl_->Close().ok());
303}
304
305Status FileOutputStream::Open(const std::string& path,
306 std::shared_ptr<OutputStream>* file) {
307 return Open(path, false, file);
308}
309
310Status FileOutputStream::Open(const std::string& path, bool append,
311 std::shared_ptr<OutputStream>* out) {
312 *out = std::shared_ptr<FileOutputStream>(new FileOutputStream());
313 return std::static_pointer_cast<FileOutputStream>(*out)->impl_->Open(path, append);
314}
315
316Status FileOutputStream::Open(int fd, std::shared_ptr<OutputStream>* out) {
317 *out = std::shared_ptr<FileOutputStream>(new FileOutputStream());
318 return std::static_pointer_cast<FileOutputStream>(*out)->impl_->Open(fd);
319}
320
321Status FileOutputStream::Open(const std::string& path,
322 std::shared_ptr<FileOutputStream>* file) {
323 return Open(path, false, file);
324}
325
326Status FileOutputStream::Open(const std::string& path, bool append,
327 std::shared_ptr<FileOutputStream>* file) {
328 // private ctor
329 *file = std::shared_ptr<FileOutputStream>(new FileOutputStream());
330 return (*file)->impl_->Open(path, append);
331}
332
333Status FileOutputStream::Open(int fd, std::shared_ptr<FileOutputStream>* file) {
334 *file = std::shared_ptr<FileOutputStream>(new FileOutputStream());
335 return (*file)->impl_->Open(fd);
336}
337
338Status FileOutputStream::Close() { return impl_->Close(); }
339
340bool FileOutputStream::closed() const { return !impl_->is_open(); }
341
342Status FileOutputStream::Tell(int64_t* pos) const { return impl_->Tell(pos); }
343
344Status FileOutputStream::Write(const void* data, int64_t length) {
345 return impl_->Write(data, length);
346}
347
348int FileOutputStream::file_descriptor() const { return impl_->fd(); }
349
350// ----------------------------------------------------------------------
351// Implement MemoryMappedFile as a buffer subclass
352// The class doesn't differentiate between size and capacity
353class MemoryMappedFile::MemoryMap : public MutableBuffer {
354 public:
355 MemoryMap() : MutableBuffer(nullptr, 0) {}
356
357 ~MemoryMap() {
358 DCHECK_OK(Close());
359 if (mutable_data_ != nullptr) {
360 DCHECK_EQ(munmap(mutable_data_, static_cast<size_t>(size_)), 0);
361 }
362 }
363
364 Status Close() {
365 if (file_->is_open()) {
366 // NOTE: we don't unmap here, so that buffers exported through Read()
367 // remain valid until the MemoryMap object is destroyed
368 return file_->Close();
369 } else {
370 return Status::OK();
371 }
372 }
373
374 bool closed() const { return !file_->is_open(); }
375
376 Status Open(const std::string& path, FileMode::type mode) {
377 file_.reset(new OSFile());
378
379 if (mode != FileMode::READ) {
380 // Memory mapping has permission failures if PROT_READ not set
381 prot_flags_ = PROT_READ | PROT_WRITE;
382 map_mode_ = MAP_SHARED;
383 constexpr bool append = false;
384 constexpr bool truncate = false;
385 constexpr bool write_only = false;
386 RETURN_NOT_OK(file_->OpenWritable(path, truncate, append, write_only));
387
388 is_mutable_ = true;
389 } else {
390 prot_flags_ = PROT_READ;
391 map_mode_ = MAP_PRIVATE; // Changes are not to be committed back to the file
392 RETURN_NOT_OK(file_->OpenReadable(path));
393
394 is_mutable_ = false;
395 }
396
397 // Memory mapping fails when file size is 0
398 // delay it until the first resize
399 if (file_->size() > 0) {
400 RETURN_NOT_OK(InitMMap(file_->size()));
401 }
402
403 position_ = 0;
404
405 return Status::OK();
406 }
407
408 // Resize the mmap and file to the specified size.
409 Status Resize(const int64_t new_size) {
410 if (!writable()) {
411 return Status::IOError("Cannot resize a readonly memory map");
412 }
413
414 if (new_size == 0) {
415 if (mutable_data_ != nullptr) {
416 // just unmap the mmap and truncate the file to 0 size
417 if (munmap(mutable_data_, capacity_) != 0) {
418 return Status::IOError("Cannot unmap the file");
419 }
420 RETURN_NOT_OK(internal::FileTruncate(file_->fd(), 0));
421 data_ = mutable_data_ = nullptr;
422 size_ = capacity_ = 0;
423 }
424 position_ = 0;
425 return Status::OK();
426 }
427
428 if (mutable_data_) {
429 void* result;
430 RETURN_NOT_OK(
431 internal::MemoryMapRemap(mutable_data_, size_, new_size, file_->fd(), &result));
432 size_ = capacity_ = new_size;
433 data_ = mutable_data_ = static_cast<uint8_t*>(result);
434 if (position_ > size_) {
435 position_ = size_;
436 }
437 } else {
438 DCHECK_EQ(position_, 0);
439 // the mmap is not yet initialized, resize the underlying
440 // file, since it might have been 0-sized
441 RETURN_NOT_OK(InitMMap(new_size, /*resize_file*/ true));
442 }
443 return Status::OK();
444 }
445
446 int64_t size() const { return size_; }
447
448 Status Seek(int64_t position) {
449 if (position < 0) {
450 return Status::Invalid("position is out of bounds");
451 }
452 position_ = position;
453 return Status::OK();
454 }
455
456 int64_t position() { return position_; }
457
458 void advance(int64_t nbytes) { position_ = position_ + nbytes; }
459
460 uint8_t* head() { return mutable_data_ + position_; }
461
462 bool writable() { return file_->mode() != FileMode::READ; }
463
464 bool opened() { return file_->is_open(); }
465
466 int fd() const { return file_->fd(); }
467
468 std::mutex& write_lock() { return file_->lock(); }
469
470 std::mutex& resize_lock() { return resize_lock_; }
471
472 private:
473 // Initialize the mmap and set size, capacity and the data pointers
474 Status InitMMap(int64_t initial_size, bool resize_file = false) {
475 if (resize_file) {
476 RETURN_NOT_OK(internal::FileTruncate(file_->fd(), initial_size));
477 }
478 DCHECK(data_ == nullptr && mutable_data_ == nullptr);
479 void* result = mmap(nullptr, static_cast<size_t>(initial_size), prot_flags_,
480 map_mode_, file_->fd(), 0);
481 if (result == MAP_FAILED) {
482 return Status::IOError("Memory mapping file failed: ", std::strerror(errno));
483 }
484 size_ = capacity_ = initial_size;
485 data_ = mutable_data_ = static_cast<uint8_t*>(result);
486
487 return Status::OK();
488 }
489 std::unique_ptr<OSFile> file_;
490 int prot_flags_;
491 int map_mode_;
492 int64_t position_;
493 std::mutex resize_lock_;
494};
495
496MemoryMappedFile::MemoryMappedFile() {}
497MemoryMappedFile::~MemoryMappedFile() {}
498
499Status MemoryMappedFile::Create(const std::string& path, int64_t size,
500 std::shared_ptr<MemoryMappedFile>* out) {
501 std::shared_ptr<FileOutputStream> file;
502 RETURN_NOT_OK(FileOutputStream::Open(path, &file));
503
504 RETURN_NOT_OK(internal::FileTruncate(file->file_descriptor(), size));
505
506 RETURN_NOT_OK(file->Close());
507 return MemoryMappedFile::Open(path, FileMode::READWRITE, out);
508}
509
510Status MemoryMappedFile::Open(const std::string& path, FileMode::type mode,
511 std::shared_ptr<MemoryMappedFile>* out) {
512 std::shared_ptr<MemoryMappedFile> result(new MemoryMappedFile());
513
514 result->memory_map_.reset(new MemoryMap());
515 RETURN_NOT_OK(result->memory_map_->Open(path, mode));
516
517 *out = result;
518 return Status::OK();
519}
520
521Status MemoryMappedFile::GetSize(int64_t* size) const {
522 *size = memory_map_->size();
523 return Status::OK();
524}
525
526Status MemoryMappedFile::GetSize(int64_t* size) {
527 return static_cast<const MemoryMappedFile*>(this)->GetSize(size);
528}
529
530Status MemoryMappedFile::Tell(int64_t* position) const {
531 *position = memory_map_->position();
532 return Status::OK();
533}
534
535Status MemoryMappedFile::Seek(int64_t position) { return memory_map_->Seek(position); }
536
537Status MemoryMappedFile::Close() { return memory_map_->Close(); }
538
539bool MemoryMappedFile::closed() const { return memory_map_->closed(); }
540
541Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes,
542 std::shared_ptr<Buffer>* out) {
543 // if the file is writable, we acquire the lock before creating any slices
544 // in case a resize is triggered concurrently, otherwise we wouldn't detect
545 // a change in the use count
546 auto guard_resize = memory_map_->writable()
547 ? std::unique_lock<std::mutex>(memory_map_->resize_lock())
548 : std::unique_lock<std::mutex>();
549 nbytes = std::max<int64_t>(0, std::min(nbytes, memory_map_->size() - position));
550
551 if (nbytes > 0) {
552 *out = SliceBuffer(memory_map_, position, nbytes);
553 } else {
554 *out = std::make_shared<Buffer>(nullptr, 0);
555 }
556 return Status::OK();
557}
558
559Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
560 void* out) {
561 auto guard_resize = memory_map_->writable()
562 ? std::unique_lock<std::mutex>(memory_map_->resize_lock())
563 : std::unique_lock<std::mutex>();
564 nbytes = std::max<int64_t>(0, std::min(nbytes, memory_map_->size() - position));
565 if (nbytes > 0) {
566 memcpy(out, memory_map_->data() + position, static_cast<size_t>(nbytes));
567 }
568 *bytes_read = nbytes;
569 return Status::OK();
570}
571
572Status MemoryMappedFile::Read(int64_t nbytes, int64_t* bytes_read, void* out) {
573 RETURN_NOT_OK(ReadAt(memory_map_->position(), nbytes, bytes_read, out));
574 memory_map_->advance(*bytes_read);
575 return Status::OK();
576}
577
578Status MemoryMappedFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
579 RETURN_NOT_OK(ReadAt(memory_map_->position(), nbytes, out));
580 memory_map_->advance((*out)->size());
581 return Status::OK();
582}
583
584bool MemoryMappedFile::supports_zero_copy() const { return true; }
585
586Status MemoryMappedFile::WriteAt(int64_t position, const void* data, int64_t nbytes) {
587 std::lock_guard<std::mutex> guard(memory_map_->write_lock());
588
589 if (!memory_map_->opened() || !memory_map_->writable()) {
590 return Status::IOError("Unable to write");
591 }
592 if (position + nbytes > memory_map_->size()) {
593 return Status::Invalid("Cannot write past end of memory map");
594 }
595
596 RETURN_NOT_OK(memory_map_->Seek(position));
597 if (nbytes + memory_map_->position() > memory_map_->size()) {
598 return Status::Invalid("Cannot write past end of memory map");
599 }
600
601 return WriteInternal(data, nbytes);
602}
603
604Status MemoryMappedFile::Write(const void* data, int64_t nbytes) {
605 std::lock_guard<std::mutex> guard(memory_map_->write_lock());
606
607 if (!memory_map_->opened() || !memory_map_->writable()) {
608 return Status::IOError("Unable to write");
609 }
610 if (nbytes + memory_map_->position() > memory_map_->size()) {
611 return Status::Invalid("Cannot write past end of memory map");
612 }
613
614 return WriteInternal(data, nbytes);
615}
616
617Status MemoryMappedFile::WriteInternal(const void* data, int64_t nbytes) {
618 memcpy(memory_map_->head(), data, static_cast<size_t>(nbytes));
619 memory_map_->advance(nbytes);
620 return Status::OK();
621}
622
623Status MemoryMappedFile::Resize(int64_t new_size) {
624 std::unique_lock<std::mutex> write_guard(memory_map_->write_lock(), std::defer_lock);
625 std::unique_lock<std::mutex> resize_guard(memory_map_->resize_lock(), std::defer_lock);
626 std::lock(write_guard, resize_guard);
627 // having both locks, we can check the number of times memory_map_
628 // was borrwed (meaning number of reader still holding a ref to it + 1)
629 // and if it's greater than 1, we fail loudly
630 if (memory_map_.use_count() > 1) {
631 return Status::IOError("Cannot resize memory map while there are active readers");
632 }
633 RETURN_NOT_OK(memory_map_->Resize(new_size));
634 return Status::OK();
635}
636
637int MemoryMappedFile::file_descriptor() const { return memory_map_->fd(); }
638
639} // namespace io
640} // namespace arrow
641