1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#ifndef _WIN32
19#include <fcntl.h> // IWYU pragma: keep
20#include <unistd.h>
21#endif
22
23#include <atomic>
24#include <cstdint>
25#include <cstdio>
26#include <cstdlib>
27#include <cstring>
28#include <memory>
29#include <sstream> // IWYU pragma: keep
30#include <string>
31#include <thread>
32#include <vector>
33
34#include <gtest/gtest.h>
35
36#include "arrow/buffer.h"
37#include "arrow/io/file.h"
38#include "arrow/io/interfaces.h"
39#include "arrow/io/test-common.h"
40#include "arrow/memory_pool.h"
41#include "arrow/status.h"
42#include "arrow/test-util.h"
43#include "arrow/util/io-util.h"
44
45namespace arrow {
46namespace io {
47
48class FileTestFixture : public ::testing::Test {
49 public:
50 void SetUp() {
51 path_ = "arrow-test-io-file.txt";
52 EnsureFileDeleted();
53 }
54
55 void TearDown() { EnsureFileDeleted(); }
56
57 void EnsureFileDeleted() {
58 if (FileExists(path_)) {
59 ARROW_UNUSED(std::remove(path_.c_str()));
60 }
61 }
62
63 protected:
64 std::string path_;
65};
66
67// ----------------------------------------------------------------------
68// File output tests
69
70class TestFileOutputStream : public FileTestFixture {
71 public:
72 void OpenFile(bool append = false) {
73 ASSERT_OK(FileOutputStream::Open(path_, append, &file_));
74 ASSERT_OK(FileOutputStream::Open(path_, append, &stream_));
75 }
76 void OpenFileDescriptor() {
77 internal::PlatformFilename file_name;
78 ASSERT_OK(internal::FileNameFromString(path_, &file_name));
79 int fd_file, fd_stream;
80 ASSERT_OK(internal::FileOpenWritable(file_name, true /* write_only */,
81 false /* truncate */, false /* append */,
82 &fd_file));
83 ASSERT_OK(FileOutputStream::Open(fd_file, &file_));
84 ASSERT_OK(internal::FileOpenWritable(file_name, true /* write_only */,
85 false /* truncate */, false /* append */,
86 &fd_stream));
87 ASSERT_OK(FileOutputStream::Open(fd_stream, &stream_));
88 }
89
90 protected:
91 std::shared_ptr<FileOutputStream> file_;
92 std::shared_ptr<OutputStream> stream_;
93};
94
#if defined(_MSC_VER)
// Every name-based Open() entry point must report Status::Invalid for a file
// name that contains a non-ASCII byte.
TEST_F(TestFileOutputStream, FileNameWideCharConversionRangeException) {
  // A single byte with value 0x80 (127 + 1), i.e. just outside ASCII.
  const std::string bad_name("\x80");

  std::shared_ptr<FileOutputStream> output_file;
  ASSERT_RAISES(Invalid, FileOutputStream::Open(bad_name, &output_file));

  std::shared_ptr<OutputStream> output_stream;
  ASSERT_RAISES(Invalid, FileOutputStream::Open(bad_name, &output_stream));

  std::shared_ptr<ReadableFile> input_file;
  ASSERT_RAISES(Invalid, ReadableFile::Open(bad_name, &input_file));
}
#endif
109
110TEST_F(TestFileOutputStream, DestructorClosesFile) {
111 int fd_file, fd_stream;
112
113 OpenFile();
114 fd_file = file_->file_descriptor();
115 fd_stream = std::static_pointer_cast<FileOutputStream>(stream_)->file_descriptor();
116 ASSERT_FALSE(FileIsClosed(fd_file));
117 file_.reset();
118 ASSERT_TRUE(FileIsClosed(fd_file));
119 ASSERT_FALSE(FileIsClosed(fd_stream));
120 stream_.reset();
121 ASSERT_TRUE(FileIsClosed(fd_stream));
122
123 OpenFileDescriptor();
124 fd_file = file_->file_descriptor();
125 fd_stream = std::static_pointer_cast<FileOutputStream>(stream_)->file_descriptor();
126 ASSERT_FALSE(FileIsClosed(fd_file));
127 file_.reset();
128 ASSERT_TRUE(FileIsClosed(fd_file));
129 ASSERT_FALSE(FileIsClosed(fd_stream));
130 stream_.reset();
131 ASSERT_TRUE(FileIsClosed(fd_stream));
132}
133
134TEST_F(TestFileOutputStream, Close) {
135 OpenFile();
136
137 const char* data = "testdata";
138 ASSERT_OK(file_->Write(data, strlen(data)));
139
140 int fd = file_->file_descriptor();
141 ASSERT_FALSE(file_->closed());
142 ASSERT_OK(file_->Close());
143 ASSERT_TRUE(file_->closed());
144 ASSERT_TRUE(FileIsClosed(fd));
145
146 // Idempotent
147 ASSERT_OK(file_->Close());
148
149 AssertFileContents(path_, data);
150
151 ASSERT_OK(stream_->Write(data, strlen(data)));
152
153 fd = std::static_pointer_cast<FileOutputStream>(stream_)->file_descriptor();
154 ASSERT_FALSE(stream_->closed());
155 ASSERT_OK(stream_->Close());
156 ASSERT_TRUE(stream_->closed());
157 ASSERT_TRUE(FileIsClosed(fd));
158
159 // Idempotent
160 ASSERT_OK(stream_->Close());
161
162 AssertFileContents(path_, data);
163}
164
165TEST_F(TestFileOutputStream, FromFileDescriptor) {
166 OpenFileDescriptor();
167 stream_.reset();
168
169 std::string data1 = "test";
170 ASSERT_OK(file_->Write(data1.data(), data1.size()));
171 int fd = file_->file_descriptor();
172 ASSERT_OK(file_->Close());
173 ASSERT_TRUE(FileIsClosed(fd));
174
175 AssertFileContents(path_, data1);
176
177 // Re-open at end of file
178 internal::PlatformFilename file_name;
179 ASSERT_OK(internal::FileNameFromString(path_, &file_name));
180 ASSERT_OK(internal::FileOpenWritable(file_name, true /* write_only */,
181 false /* truncate */, false /* append */, &fd));
182 ASSERT_OK(internal::FileSeek(fd, 0, SEEK_END));
183 ASSERT_OK(FileOutputStream::Open(fd, &stream_));
184
185 std::string data2 = "data";
186 ASSERT_OK(stream_->Write(data2.data(), data2.size()));
187 ASSERT_OK(stream_->Close());
188
189 AssertFileContents(path_, data1 + data2);
190}
191
192TEST_F(TestFileOutputStream, InvalidWrites) {
193 OpenFile();
194
195 const char* data = "";
196
197 ASSERT_RAISES(IOError, file_->Write(data, -1));
198 ASSERT_RAISES(IOError, stream_->Write(data, -1));
199}
200
201TEST_F(TestFileOutputStream, Tell) {
202 OpenFile();
203
204 int64_t position;
205 ASSERT_OK(file_->Tell(&position));
206 ASSERT_EQ(0, position);
207
208 const char* data = "testdata";
209 ASSERT_OK(file_->Write(data, 8));
210 ASSERT_OK(file_->Tell(&position));
211 ASSERT_EQ(8, position);
212
213 ASSERT_OK(stream_->Tell(&position));
214 ASSERT_EQ(0, position);
215
216 ASSERT_OK(stream_->Write(data, 8));
217 ASSERT_OK(stream_->Tell(&position));
218 ASSERT_EQ(8, position);
219}
220
221TEST_F(TestFileOutputStream, TruncatesNewFile) {
222 ASSERT_OK(FileOutputStream::Open(path_, &file_));
223
224 const char* data = "testdata";
225 ASSERT_OK(file_->Write(data, strlen(data)));
226 ASSERT_OK(file_->Close());
227
228 ASSERT_OK(FileOutputStream::Open(path_, &file_));
229 ASSERT_OK(file_->Close());
230
231 AssertFileContents(path_, "");
232
233 // Same with stream-returning API
234 ASSERT_OK(FileOutputStream::Open(path_, &stream_));
235
236 ASSERT_OK(stream_->Write(data, strlen(data)));
237 ASSERT_OK(stream_->Close());
238
239 ASSERT_OK(FileOutputStream::Open(path_, &stream_));
240 ASSERT_OK(stream_->Close());
241
242 AssertFileContents(path_, "");
243}
244
245TEST_F(TestFileOutputStream, Append) {
246 ASSERT_OK(FileOutputStream::Open(path_, &file_));
247 {
248 const char* data = "test";
249 ASSERT_OK(file_->Write(data, strlen(data)));
250 }
251 ASSERT_OK(file_->Close());
252 ASSERT_OK(FileOutputStream::Open(path_, true /* append */, &file_));
253 {
254 const char* data = "data";
255 ASSERT_OK(file_->Write(data, strlen(data)));
256 }
257 ASSERT_OK(file_->Close());
258 AssertFileContents(path_, "testdata");
259
260 // Same with stream-returning API
261 ASSERT_OK(FileOutputStream::Open(path_, &stream_));
262 {
263 const char* data = "test";
264 ASSERT_OK(stream_->Write(data, strlen(data)));
265 }
266 ASSERT_OK(stream_->Close());
267
268 ASSERT_OK(FileOutputStream::Open(path_, true /* append */, &stream_));
269 {
270 const char* data = "data";
271 ASSERT_OK(stream_->Write(data, strlen(data)));
272 }
273 ASSERT_OK(stream_->Close());
274 AssertFileContents(path_, "testdata");
275}
276
277// ----------------------------------------------------------------------
278// File input tests
279
280class TestReadableFile : public FileTestFixture {
281 public:
282 void OpenFile() { ASSERT_OK(ReadableFile::Open(path_, &file_)); }
283
284 void MakeTestFile() {
285 std::string data = "testdata";
286 std::ofstream stream;
287 stream.open(path_.c_str());
288 stream << data;
289 }
290
291 protected:
292 std::shared_ptr<ReadableFile> file_;
293};
294
295TEST_F(TestReadableFile, DestructorClosesFile) {
296 MakeTestFile();
297
298 int fd;
299 {
300 std::shared_ptr<ReadableFile> file;
301 ASSERT_OK(ReadableFile::Open(path_, &file));
302 fd = file->file_descriptor();
303 }
304 ASSERT_TRUE(FileIsClosed(fd));
305}
306
307TEST_F(TestReadableFile, Close) {
308 MakeTestFile();
309 OpenFile();
310
311 int fd = file_->file_descriptor();
312 ASSERT_FALSE(file_->closed());
313 ASSERT_OK(file_->Close());
314 ASSERT_TRUE(file_->closed());
315
316 ASSERT_TRUE(FileIsClosed(fd));
317
318 // Idempotent
319 ASSERT_OK(file_->Close());
320 ASSERT_TRUE(FileIsClosed(fd));
321}
322
323TEST_F(TestReadableFile, FromFileDescriptor) {
324 MakeTestFile();
325
326 internal::PlatformFilename file_name;
327 int fd = -2;
328 ASSERT_OK(internal::FileNameFromString(path_, &file_name));
329 ASSERT_OK(internal::FileOpenReadable(file_name, &fd));
330 ASSERT_GE(fd, 0);
331 ASSERT_OK(internal::FileSeek(fd, 4));
332
333 ASSERT_OK(ReadableFile::Open(fd, &file_));
334 ASSERT_EQ(file_->file_descriptor(), fd);
335 std::shared_ptr<Buffer> buf;
336 ASSERT_OK(file_->Read(5, &buf));
337 ASSERT_EQ(buf->size(), 4);
338 ASSERT_TRUE(buf->Equals(Buffer("data")));
339
340 ASSERT_FALSE(FileIsClosed(fd));
341 ASSERT_OK(file_->Close());
342 ASSERT_TRUE(FileIsClosed(fd));
343 // Idempotent
344 ASSERT_OK(file_->Close());
345 ASSERT_TRUE(FileIsClosed(fd));
346}
347
348TEST_F(TestReadableFile, Peek) {
349 MakeTestFile();
350 OpenFile();
351
352 // Cannot peek
353 auto view = file_->Peek(4);
354 ASSERT_EQ(0, view.size());
355}
356
357TEST_F(TestReadableFile, SeekTellSize) {
358 MakeTestFile();
359 OpenFile();
360
361 int64_t position;
362 ASSERT_OK(file_->Tell(&position));
363 ASSERT_EQ(0, position);
364
365 ASSERT_OK(file_->Seek(4));
366 ASSERT_OK(file_->Tell(&position));
367 ASSERT_EQ(4, position);
368
369 ASSERT_OK(file_->Seek(100));
370 ASSERT_OK(file_->Tell(&position));
371
372 // Can seek past end of file
373 ASSERT_EQ(100, position);
374
375 int64_t size;
376 ASSERT_OK(file_->GetSize(&size));
377 ASSERT_EQ(8, size);
378
379 ASSERT_OK(file_->Tell(&position));
380 ASSERT_EQ(100, position);
381
382 // does not support zero copy
383 ASSERT_FALSE(file_->supports_zero_copy());
384}
385
386TEST_F(TestReadableFile, Read) {
387 uint8_t buffer[50];
388
389 MakeTestFile();
390 OpenFile();
391
392 int64_t bytes_read;
393 ASSERT_OK(file_->Read(4, &bytes_read, buffer));
394 ASSERT_EQ(4, bytes_read);
395 ASSERT_EQ(0, std::memcmp(buffer, "test", 4));
396
397 ASSERT_OK(file_->Read(10, &bytes_read, buffer));
398 ASSERT_EQ(4, bytes_read);
399 ASSERT_EQ(0, std::memcmp(buffer, "data", 4));
400
401 // Test incomplete read, ARROW-1094
402 std::shared_ptr<Buffer> buf;
403 int64_t size;
404 ASSERT_OK(file_->GetSize(&size));
405
406 ASSERT_OK(file_->Seek(1));
407 ASSERT_OK(file_->Read(size, &buf));
408 ASSERT_EQ(size - 1, buf->size());
409}
410
411TEST_F(TestReadableFile, ReadAt) {
412 uint8_t buffer[50];
413 const char* test_data = "testdata";
414
415 MakeTestFile();
416 OpenFile();
417
418 int64_t bytes_read;
419
420 ASSERT_OK(file_->ReadAt(0, 4, &bytes_read, buffer));
421 ASSERT_EQ(4, bytes_read);
422 ASSERT_EQ(0, std::memcmp(buffer, "test", 4));
423
424 ASSERT_OK(file_->ReadAt(1, 10, &bytes_read, buffer));
425 ASSERT_EQ(7, bytes_read);
426 ASSERT_EQ(0, std::memcmp(buffer, "estdata", 7));
427
428 // Check buffer API
429 std::shared_ptr<Buffer> buffer2;
430
431 ASSERT_OK(file_->ReadAt(2, 5, &buffer2));
432 ASSERT_EQ(5, buffer2->size());
433
434 Buffer expected(reinterpret_cast<const uint8_t*>(test_data + 2), 5);
435 ASSERT_TRUE(buffer2->Equals(expected));
436}
437
438TEST_F(TestReadableFile, NonExistentFile) {
439 std::string path = "0xDEADBEEF.txt";
440 Status s = ReadableFile::Open(path, &file_);
441 ASSERT_TRUE(s.IsIOError());
442
443 std::string message = s.message();
444 ASSERT_NE(std::string::npos, message.find(path));
445}
446
447class MyMemoryPool : public MemoryPool {
448 public:
449 MyMemoryPool() : num_allocations_(0) {}
450
451 Status Allocate(int64_t size, uint8_t** out) override {
452 *out = reinterpret_cast<uint8_t*>(std::malloc(size));
453 ++num_allocations_;
454 return Status::OK();
455 }
456
457 void Free(uint8_t* buffer, int64_t size) override { std::free(buffer); }
458
459 Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override {
460 *ptr = reinterpret_cast<uint8_t*>(std::realloc(*ptr, new_size));
461
462 if (*ptr == NULL) {
463 return Status::OutOfMemory("realloc of size ", new_size, " failed");
464 }
465
466 return Status::OK();
467 }
468
469 int64_t bytes_allocated() const override { return -1; }
470
471 int64_t num_allocations() const { return num_allocations_.load(); }
472
473 private:
474 std::atomic<int64_t> num_allocations_;
475};
476
477TEST_F(TestReadableFile, CustomMemoryPool) {
478 MakeTestFile();
479
480 MyMemoryPool pool;
481 ASSERT_OK(ReadableFile::Open(path_, &pool, &file_));
482
483 std::shared_ptr<Buffer> buffer;
484 ASSERT_OK(file_->ReadAt(0, 4, &buffer));
485 ASSERT_OK(file_->ReadAt(4, 8, &buffer));
486
487 ASSERT_EQ(2, pool.num_allocations());
488}
489
490TEST_F(TestReadableFile, ThreadSafety) {
491 std::string data = "foobar";
492 {
493 std::ofstream stream;
494 stream.open(path_.c_str());
495 stream << data;
496 }
497
498 MyMemoryPool pool;
499 ASSERT_OK(ReadableFile::Open(path_, &pool, &file_));
500
501 std::atomic<int> correct_count(0);
502 int niter = 30000;
503
504 auto ReadData = [&correct_count, &data, &niter, this]() {
505 std::shared_ptr<Buffer> buffer;
506
507 for (int i = 0; i < niter; ++i) {
508 const int offset = i % 3;
509 ASSERT_OK(file_->ReadAt(offset, 3, &buffer));
510 if (0 == memcmp(data.c_str() + offset, buffer->data(), 3)) {
511 correct_count += 1;
512 }
513 }
514 };
515
516 std::thread thread1(ReadData);
517 std::thread thread2(ReadData);
518
519 thread1.join();
520 thread2.join();
521
522 ASSERT_EQ(niter * 2, correct_count);
523}
524
525// ----------------------------------------------------------------------
526// Pipe I/O tests using FileOutputStream
527// (cannot test using ReadableFile as it currently requires seeking)
528
529class TestPipeIO : public ::testing::Test {
530 public:
531 void MakePipe() {
532 int fd[2];
533 ASSERT_OK(internal::CreatePipe(fd));
534 r_ = fd[0];
535 w_ = fd[1];
536 ASSERT_GE(r_, 0);
537 ASSERT_GE(w_, 0);
538 }
539 void ClosePipe() {
540 if (r_ != -1) {
541 ASSERT_OK(internal::FileClose(r_));
542 r_ = -1;
543 }
544 if (w_ != -1) {
545 ASSERT_OK(internal::FileClose(w_));
546 w_ = -1;
547 }
548 }
549 void TearDown() { ClosePipe(); }
550
551 protected:
552 int r_ = -1, w_ = -1;
553};
554
555TEST_F(TestPipeIO, TestWrite) {
556 std::string data1 = "test", data2 = "data!";
557 std::shared_ptr<FileOutputStream> file;
558 uint8_t buffer[10];
559 int64_t bytes_read;
560
561 MakePipe();
562 ASSERT_OK(FileOutputStream::Open(w_, &file));
563 w_ = -1; // now owned by FileOutputStream
564
565 ASSERT_OK(file->Write(data1.data(), data1.size()));
566 ASSERT_OK(internal::FileRead(r_, buffer, 4, &bytes_read));
567 ASSERT_EQ(bytes_read, 4);
568 ASSERT_EQ(0, std::memcmp(buffer, "test", 4));
569
570 ASSERT_OK(file->Write(data2.data(), data2.size()));
571 ASSERT_OK(internal::FileRead(r_, buffer, 4, &bytes_read));
572 ASSERT_EQ(bytes_read, 4);
573 ASSERT_EQ(0, std::memcmp(buffer, "data", 4));
574
575 ASSERT_FALSE(file->closed());
576 ASSERT_OK(file->Close());
577 ASSERT_TRUE(file->closed());
578 ASSERT_OK(internal::FileRead(r_, buffer, 2, &bytes_read));
579 ASSERT_EQ(bytes_read, 1);
580 ASSERT_EQ(0, std::memcmp(buffer, "!", 1));
581 // EOF reached
582 ASSERT_OK(internal::FileRead(r_, buffer, 2, &bytes_read));
583 ASSERT_EQ(bytes_read, 0);
584}
585
586TEST_F(TestPipeIO, ReadableFileFails) {
587 // ReadableFile fails on non-seekable fd
588 std::shared_ptr<ReadableFile> file;
589 ASSERT_RAISES(IOError, ReadableFile::Open(r_, &file));
590}
591
592// ----------------------------------------------------------------------
593// Memory map tests
594
595class TestMemoryMappedFile : public ::testing::Test, public MemoryMapFixture {
596 public:
597 void TearDown() { MemoryMapFixture::TearDown(); }
598};
599
600TEST_F(TestMemoryMappedFile, InvalidUsages) {}
601
602TEST_F(TestMemoryMappedFile, ZeroSizeFlie) {
603 std::string path = "io-memory-map-zero-size";
604 std::shared_ptr<MemoryMappedFile> result;
605 ASSERT_OK(InitMemoryMap(0, path, &result));
606
607 int64_t size = -1;
608 ASSERT_OK(result->Tell(&size));
609 ASSERT_EQ(0, size);
610}
611
612TEST_F(TestMemoryMappedFile, WriteRead) {
613 const int64_t buffer_size = 1024;
614 std::vector<uint8_t> buffer(buffer_size);
615
616 random_bytes(1024, 0, buffer.data());
617
618 const int reps = 5;
619
620 std::string path = "io-memory-map-write-read-test";
621 std::shared_ptr<MemoryMappedFile> result;
622 ASSERT_OK(InitMemoryMap(reps * buffer_size, path, &result));
623
624 int64_t position = 0;
625 std::shared_ptr<Buffer> out_buffer;
626 for (int i = 0; i < reps; ++i) {
627 ASSERT_OK(result->Write(buffer.data(), buffer_size));
628 ASSERT_OK(result->ReadAt(position, buffer_size, &out_buffer));
629
630 ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size));
631
632 position += buffer_size;
633 }
634}
635
636TEST_F(TestMemoryMappedFile, WriteResizeRead) {
637 const int64_t buffer_size = 1024;
638 const int reps = 5;
639 std::vector<std::vector<uint8_t>> buffers(reps);
640 for (auto& b : buffers) {
641 b.resize(buffer_size);
642 random_bytes(buffer_size, 0, b.data());
643 }
644
645 std::string path = "io-memory-map-write-read-test";
646 std::shared_ptr<MemoryMappedFile> result;
647 ASSERT_OK(InitMemoryMap(buffer_size, path, &result));
648
649 int64_t position = 0;
650 std::shared_ptr<Buffer> out_buffer;
651 for (int i = 0; i < reps; ++i) {
652 if (i != 0) {
653 ASSERT_OK(result->Resize(buffer_size * (i + 1)));
654 }
655 ASSERT_OK(result->Write(buffers[i].data(), buffer_size));
656 ASSERT_OK(result->ReadAt(position, buffer_size, &out_buffer));
657
658 ASSERT_EQ(out_buffer->size(), buffer_size);
659 ASSERT_EQ(0, memcmp(out_buffer->data(), buffers[i].data(), buffer_size));
660 out_buffer.reset();
661
662 position += buffer_size;
663 }
664}
665
666TEST_F(TestMemoryMappedFile, GetConstGetSize) {
667 const int64_t buffer_size = 1024;
668 std::vector<uint8_t> buffer(buffer_size);
669 random_bytes(buffer_size, 0, buffer.data());
670
671 std::string path = "io-memory-map-write-read-test";
672 std::shared_ptr<MemoryMappedFile> result;
673 ASSERT_OK(InitMemoryMap(buffer_size, path, &result));
674
675 const auto& const_result = *result;
676 int64_t out_size;
677 ASSERT_OK(const_result.GetSize(&out_size));
678 ASSERT_EQ(buffer_size, out_size);
679}
680
681TEST_F(TestMemoryMappedFile, ResizeRaisesOnExported) {
682 const int64_t buffer_size = 1024;
683 std::vector<uint8_t> buffer(buffer_size);
684 random_bytes(buffer_size, 0, buffer.data());
685
686 std::string path = "io-memory-map-write-read-test";
687 std::shared_ptr<MemoryMappedFile> result;
688 ASSERT_OK(InitMemoryMap(buffer_size, path, &result));
689
690 std::shared_ptr<Buffer> out_buffer1, out_buffer2;
691 ASSERT_OK(result->Write(buffer.data(), buffer_size));
692 ASSERT_OK(result->ReadAt(0, buffer_size, &out_buffer1));
693 ASSERT_OK(result->ReadAt(0, buffer_size, &out_buffer2));
694 ASSERT_EQ(0, memcmp(out_buffer1->data(), buffer.data(), buffer_size));
695 ASSERT_EQ(0, memcmp(out_buffer2->data(), buffer.data(), buffer_size));
696
697 // attempt resize
698 ASSERT_RAISES(IOError, result->Resize(2 * buffer_size));
699
700 out_buffer1.reset();
701
702 ASSERT_RAISES(IOError, result->Resize(2 * buffer_size));
703
704 out_buffer2.reset();
705
706 ASSERT_OK(result->Resize(2 * buffer_size));
707
708 int64_t map_size;
709 ASSERT_OK(result->GetSize(&map_size));
710 ASSERT_EQ(map_size, 2 * buffer_size);
711
712 int64_t file_size;
713 ASSERT_OK(internal::FileGetSize(result->file_descriptor(), &file_size));
714 ASSERT_EQ(file_size, buffer_size * 2);
715}
716
717TEST_F(TestMemoryMappedFile, WriteReadZeroInitSize) {
718 const int64_t buffer_size = 1024;
719 std::vector<uint8_t> buffer(buffer_size);
720 random_bytes(buffer_size, 0, buffer.data());
721
722 std::string path = "io-memory-map-write-read-test";
723 std::shared_ptr<MemoryMappedFile> result;
724 ASSERT_OK(InitMemoryMap(0, path, &result));
725
726 std::shared_ptr<Buffer> out_buffer;
727 ASSERT_OK(result->Resize(buffer_size));
728 ASSERT_OK(result->Write(buffer.data(), buffer_size));
729 ASSERT_OK(result->ReadAt(0, buffer_size, &out_buffer));
730 ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size));
731
732 int64_t map_size;
733 ASSERT_OK(result->GetSize(&map_size));
734 ASSERT_EQ(map_size, buffer_size);
735}
736
737TEST_F(TestMemoryMappedFile, WriteThenShrink) {
738 const int64_t buffer_size = 1024;
739 std::vector<uint8_t> buffer(buffer_size);
740 random_bytes(buffer_size, 0, buffer.data());
741
742 std::string path = "io-memory-map-write-read-test";
743 std::shared_ptr<MemoryMappedFile> result;
744 ASSERT_OK(InitMemoryMap(buffer_size * 2, path, &result));
745
746 std::shared_ptr<Buffer> out_buffer;
747 ASSERT_OK(result->Resize(buffer_size));
748 ASSERT_OK(result->Write(buffer.data(), buffer_size));
749 ASSERT_OK(result->Resize(buffer_size / 2));
750
751 ASSERT_OK(result->ReadAt(0, buffer_size / 2, &out_buffer));
752 ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size / 2));
753
754 int64_t map_size;
755 ASSERT_OK(result->GetSize(&map_size));
756 ASSERT_EQ(map_size, buffer_size / 2);
757
758 int64_t file_size;
759 ASSERT_OK(internal::FileGetSize(result->file_descriptor(), &file_size));
760 ASSERT_EQ(file_size, buffer_size / 2);
761}
762
763TEST_F(TestMemoryMappedFile, WriteThenShrinkToHalfThenWrite) {
764 const int64_t buffer_size = 1024;
765 std::vector<uint8_t> buffer(buffer_size);
766 random_bytes(buffer_size, 0, buffer.data());
767
768 std::string path = "io-memory-map-write-read-test";
769 std::shared_ptr<MemoryMappedFile> result;
770 ASSERT_OK(InitMemoryMap(buffer_size, path, &result));
771
772 std::shared_ptr<Buffer> out_buffer;
773 ASSERT_OK(result->Write(buffer.data(), buffer_size));
774 ASSERT_OK(result->Resize(buffer_size / 2));
775
776 int64_t position;
777 ASSERT_OK(result->Tell(&position));
778 ASSERT_EQ(position, buffer_size / 2);
779
780 ASSERT_OK(result->ReadAt(0, buffer_size / 2, &out_buffer));
781 ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size / 2));
782 out_buffer.reset();
783
784 // should resume writing directly at the seam
785 ASSERT_OK(result->Resize(buffer_size));
786 ASSERT_OK(result->Write(buffer.data() + buffer_size / 2, buffer_size / 2));
787
788 ASSERT_OK(result->ReadAt(0, buffer_size, &out_buffer));
789 ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size));
790
791 int64_t map_size;
792 ASSERT_OK(result->GetSize(&map_size));
793 ASSERT_EQ(map_size, buffer_size);
794
795 int64_t file_size;
796 ASSERT_OK(internal::FileGetSize(result->file_descriptor(), &file_size));
797 ASSERT_EQ(file_size, buffer_size);
798}
799
800TEST_F(TestMemoryMappedFile, ResizeToZeroThanWrite) {
801 const int64_t buffer_size = 1024;
802 std::vector<uint8_t> buffer(buffer_size);
803 random_bytes(buffer_size, 0, buffer.data());
804
805 std::string path = "io-memory-map-write-read-test";
806 std::shared_ptr<MemoryMappedFile> result;
807 ASSERT_OK(InitMemoryMap(buffer_size, path, &result));
808
809 std::shared_ptr<Buffer> out_buffer;
810 // just a sanity check that writing works ook
811 ASSERT_OK(result->Write(buffer.data(), buffer_size));
812 ASSERT_OK(result->ReadAt(0, buffer_size, &out_buffer));
813 ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size));
814 out_buffer.reset();
815
816 ASSERT_OK(result->Resize(0));
817 int64_t mapped_size;
818 ASSERT_OK(result->GetSize(&mapped_size));
819 ASSERT_EQ(mapped_size, 0);
820
821 int64_t position;
822 ASSERT_OK(result->Tell(&position));
823 ASSERT_EQ(position, 0);
824
825 int64_t file_size;
826 ASSERT_OK(internal::FileGetSize(result->file_descriptor(), &file_size));
827 ASSERT_EQ(file_size, 0);
828
829 // provision a vector to the buffer size in case ReadAt decides
830 // to read even though it shouldn't
831 std::vector<uint8_t> should_remain_empty(buffer_size);
832 int64_t bytes_read;
833 ASSERT_OK(result->ReadAt(0, 1, &bytes_read,
834 reinterpret_cast<void*>(should_remain_empty.data())));
835 ASSERT_EQ(bytes_read, 0);
836
837 // just a sanity check that writing works ook
838 ASSERT_OK(result->Resize(buffer_size));
839 ASSERT_OK(result->Write(buffer.data(), buffer_size));
840 ASSERT_OK(result->ReadAt(0, buffer_size, &out_buffer));
841 ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size));
842}
843
844TEST_F(TestMemoryMappedFile, WriteAt) {
845 const int64_t buffer_size = 1024;
846 std::vector<uint8_t> buffer(buffer_size);
847 random_bytes(buffer_size, 0, buffer.data());
848
849 std::string path = "io-memory-map-write-read-test";
850 std::shared_ptr<MemoryMappedFile> result;
851 ASSERT_OK(InitMemoryMap(buffer_size, path, &result));
852
853 ASSERT_OK(result->WriteAt(0, buffer.data(), buffer_size / 2));
854
855 ASSERT_OK(
856 result->WriteAt(buffer_size / 2, buffer.data() + buffer_size / 2, buffer_size / 2));
857
858 std::shared_ptr<Buffer> out_buffer;
859 ASSERT_OK(result->ReadAt(0, buffer_size, &out_buffer));
860
861 ASSERT_EQ(memcmp(out_buffer->data(), buffer.data(), buffer_size), 0);
862}
863
864TEST_F(TestMemoryMappedFile, WriteBeyondEnd) {
865 const int64_t buffer_size = 1024;
866 std::vector<uint8_t> buffer(buffer_size);
867 random_bytes(buffer_size, 0, buffer.data());
868
869 std::string path = "io-memory-map-write-read-test";
870 std::shared_ptr<MemoryMappedFile> result;
871 ASSERT_OK(InitMemoryMap(buffer_size, path, &result));
872
873 ASSERT_OK(result->Seek(1));
874 ASSERT_RAISES(Invalid, result->Write(buffer.data(), buffer_size));
875
876 // The position should remain unchanged afterwards
877 int64_t position;
878 ASSERT_OK(result->Tell(&position));
879 ASSERT_EQ(position, 1);
880}
881
882TEST_F(TestMemoryMappedFile, WriteAtBeyondEnd) {
883 const int64_t buffer_size = 1024;
884 std::vector<uint8_t> buffer(buffer_size);
885 random_bytes(buffer_size, 0, buffer.data());
886
887 std::string path = "io-memory-map-write-read-test";
888 std::shared_ptr<MemoryMappedFile> result;
889 ASSERT_OK(InitMemoryMap(buffer_size, path, &result));
890
891 ASSERT_RAISES(Invalid, result->WriteAt(1, buffer.data(), buffer_size));
892
893 // The position should remain unchanged afterwards
894 int64_t position;
895 ASSERT_OK(result->Tell(&position));
896 ASSERT_EQ(position, 0);
897}
898
899TEST_F(TestMemoryMappedFile, GetSize) {
900 std::string path = "io-memory-map-get-size";
901 std::shared_ptr<MemoryMappedFile> result;
902 ASSERT_OK(InitMemoryMap(16384, path, &result));
903
904 int64_t size = -1;
905 ASSERT_OK(result->GetSize(&size));
906 ASSERT_EQ(16384, size);
907
908 int64_t position = -1;
909 ASSERT_OK(result->Tell(&position));
910 ASSERT_EQ(0, position);
911}
912
913TEST_F(TestMemoryMappedFile, ReadOnly) {
914 const int64_t buffer_size = 1024;
915 std::vector<uint8_t> buffer(buffer_size);
916
917 random_bytes(1024, 0, buffer.data());
918
919 const int reps = 5;
920
921 std::string path = "ipc-read-only-test";
922 std::shared_ptr<MemoryMappedFile> rwmmap;
923 ASSERT_OK(InitMemoryMap(reps * buffer_size, path, &rwmmap));
924
925 int64_t position = 0;
926 for (int i = 0; i < reps; ++i) {
927 ASSERT_OK(rwmmap->Write(buffer.data(), buffer_size));
928 position += buffer_size;
929 }
930 ASSERT_OK(rwmmap->Close());
931
932 std::shared_ptr<MemoryMappedFile> rommap;
933 ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READ, &rommap));
934
935 position = 0;
936 std::shared_ptr<Buffer> out_buffer;
937 for (int i = 0; i < reps; ++i) {
938 ASSERT_OK(rommap->ReadAt(position, buffer_size, &out_buffer));
939
940 ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size));
941 position += buffer_size;
942 }
943 ASSERT_OK(rommap->Close());
944}
945
946TEST_F(TestMemoryMappedFile, DISABLED_ReadWriteOver4GbFile) {
947 // ARROW-1096
948 const int64_t buffer_size = 1000 * 1000;
949 std::vector<uint8_t> buffer(buffer_size);
950
951 random_bytes(buffer_size, 0, buffer.data());
952
953 const int64_t reps = 5000;
954
955 std::string path = "ipc-read-over-4gb-file-test";
956 std::shared_ptr<MemoryMappedFile> rwmmap;
957 ASSERT_OK(InitMemoryMap(reps * buffer_size, path, &rwmmap));
958 AppendFile(path);
959
960 int64_t position = 0;
961 for (int i = 0; i < reps; ++i) {
962 ASSERT_OK(rwmmap->Write(buffer.data(), buffer_size));
963 position += buffer_size;
964 }
965 ASSERT_OK(rwmmap->Close());
966
967 std::shared_ptr<MemoryMappedFile> rommap;
968 ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READ, &rommap));
969
970 position = 0;
971 std::shared_ptr<Buffer> out_buffer;
972 for (int i = 0; i < reps; ++i) {
973 ASSERT_OK(rommap->ReadAt(position, buffer_size, &out_buffer));
974
975 ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size));
976 position += buffer_size;
977 }
978 ASSERT_OK(rommap->Close());
979}
980
981TEST_F(TestMemoryMappedFile, RetainMemoryMapReference) {
982 // ARROW-494
983
984 const int64_t buffer_size = 1024;
985 std::vector<uint8_t> buffer(buffer_size);
986
987 random_bytes(1024, 0, buffer.data());
988
989 std::string path = "ipc-read-only-test";
990 CreateFile(path, buffer_size);
991
992 {
993 std::shared_ptr<MemoryMappedFile> rwmmap;
994 ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READWRITE, &rwmmap));
995 ASSERT_OK(rwmmap->Write(buffer.data(), buffer_size));
996 ASSERT_FALSE(rwmmap->closed());
997 ASSERT_OK(rwmmap->Close());
998 ASSERT_TRUE(rwmmap->closed());
999 }
1000
1001 std::shared_ptr<Buffer> out_buffer;
1002
1003 {
1004 std::shared_ptr<MemoryMappedFile> rommap;
1005 ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READ, &rommap));
1006 ASSERT_OK(rommap->Read(buffer_size, &out_buffer));
1007 ASSERT_FALSE(rommap->closed());
1008 ASSERT_OK(rommap->Close());
1009 ASSERT_TRUE(rommap->closed());
1010 }
1011
1012 // valgrind will catch if memory is unmapped
1013 ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size));
1014}
1015
1016TEST_F(TestMemoryMappedFile, InvalidMode) {
1017 const int64_t buffer_size = 1024;
1018 std::vector<uint8_t> buffer(buffer_size);
1019
1020 random_bytes(1024, 0, buffer.data());
1021
1022 std::string path = "ipc-invalid-mode-test";
1023 CreateFile(path, buffer_size);
1024
1025 std::shared_ptr<MemoryMappedFile> rommap;
1026 ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READ, &rommap));
1027
1028 ASSERT_RAISES(IOError, rommap->Write(buffer.data(), buffer_size));
1029}
1030
1031TEST_F(TestMemoryMappedFile, InvalidFile) {
1032 std::string non_existent_path = "invalid-file-name-asfd";
1033
1034 std::shared_ptr<MemoryMappedFile> result;
1035 ASSERT_RAISES(IOError,
1036 MemoryMappedFile::Open(non_existent_path, FileMode::READ, &result));
1037}
1038
1039TEST_F(TestMemoryMappedFile, CastableToFileInterface) {
1040 std::shared_ptr<MemoryMappedFile> memory_mapped_file;
1041 std::shared_ptr<FileInterface> file = memory_mapped_file;
1042}
1043
1044TEST_F(TestMemoryMappedFile, ThreadSafety) {
1045 std::string data = "foobar";
1046 std::string path = "ipc-multithreading-test";
1047 CreateFile(path, static_cast<int>(data.size()));
1048
1049 std::shared_ptr<MemoryMappedFile> file;
1050 ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READWRITE, &file));
1051 ASSERT_OK(file->Write(data.c_str(), static_cast<int64_t>(data.size())));
1052
1053 std::atomic<int> correct_count(0);
1054 int niter = 10000;
1055
1056 auto ReadData = [&correct_count, &data, &file, &niter]() {
1057 std::shared_ptr<Buffer> buffer;
1058
1059 for (int i = 0; i < niter; ++i) {
1060 ASSERT_OK(file->ReadAt(0, 3, &buffer));
1061 if (0 == memcmp(data.c_str(), buffer->data(), 3)) {
1062 correct_count += 1;
1063 }
1064 }
1065 };
1066
1067 std::thread thread1(ReadData);
1068 std::thread thread2(ReadData);
1069
1070 thread1.join();
1071 thread2.join();
1072
1073 ASSERT_EQ(niter * 2, correct_count);
1074}
1075
1076} // namespace io
1077} // namespace arrow
1078