1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #ifndef _WIN32 |
19 | #include <fcntl.h> // IWYU pragma: keep |
20 | #include <unistd.h> |
21 | #endif |
22 | |
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <memory>
#include <sstream>  // IWYU pragma: keep
#include <string>
#include <thread>
#include <vector>
33 | |
34 | #include <gtest/gtest.h> |
35 | |
36 | #include "arrow/buffer.h" |
37 | #include "arrow/io/file.h" |
38 | #include "arrow/io/interfaces.h" |
39 | #include "arrow/io/test-common.h" |
40 | #include "arrow/memory_pool.h" |
41 | #include "arrow/status.h" |
42 | #include "arrow/test-util.h" |
43 | #include "arrow/util/io-util.h" |
44 | |
45 | namespace arrow { |
46 | namespace io { |
47 | |
48 | class FileTestFixture : public ::testing::Test { |
49 | public: |
50 | void SetUp() { |
51 | path_ = "arrow-test-io-file.txt" ; |
52 | EnsureFileDeleted(); |
53 | } |
54 | |
55 | void TearDown() { EnsureFileDeleted(); } |
56 | |
57 | void EnsureFileDeleted() { |
58 | if (FileExists(path_)) { |
59 | ARROW_UNUSED(std::remove(path_.c_str())); |
60 | } |
61 | } |
62 | |
63 | protected: |
64 | std::string path_; |
65 | }; |
66 | |
67 | // ---------------------------------------------------------------------- |
68 | // File output tests |
69 | |
70 | class TestFileOutputStream : public FileTestFixture { |
71 | public: |
72 | void OpenFile(bool append = false) { |
73 | ASSERT_OK(FileOutputStream::Open(path_, append, &file_)); |
74 | ASSERT_OK(FileOutputStream::Open(path_, append, &stream_)); |
75 | } |
76 | void OpenFileDescriptor() { |
77 | internal::PlatformFilename file_name; |
78 | ASSERT_OK(internal::FileNameFromString(path_, &file_name)); |
79 | int fd_file, fd_stream; |
80 | ASSERT_OK(internal::FileOpenWritable(file_name, true /* write_only */, |
81 | false /* truncate */, false /* append */, |
82 | &fd_file)); |
83 | ASSERT_OK(FileOutputStream::Open(fd_file, &file_)); |
84 | ASSERT_OK(internal::FileOpenWritable(file_name, true /* write_only */, |
85 | false /* truncate */, false /* append */, |
86 | &fd_stream)); |
87 | ASSERT_OK(FileOutputStream::Open(fd_stream, &stream_)); |
88 | } |
89 | |
90 | protected: |
91 | std::shared_ptr<FileOutputStream> file_; |
92 | std::shared_ptr<OutputStream> stream_; |
93 | }; |
94 | |
#if defined(_MSC_VER)
// The test expects a lone non-ASCII byte (0x80, i.e. 127 + 1) to fail the
// filename's wide-char conversion, so every open-by-name entry point reports
// Invalid.
TEST_F(TestFileOutputStream, FileNameWideCharConversionRangeException) {
  const std::string bad_name = "\x80";

  std::shared_ptr<FileOutputStream> out_file;
  ASSERT_RAISES(Invalid, FileOutputStream::Open(bad_name, &out_file));

  std::shared_ptr<OutputStream> out_stream;
  ASSERT_RAISES(Invalid, FileOutputStream::Open(bad_name, &out_stream));

  std::shared_ptr<ReadableFile> in_file;
  ASSERT_RAISES(Invalid, ReadableFile::Open(bad_name, &in_file));
}
#endif
109 | |
110 | TEST_F(TestFileOutputStream, DestructorClosesFile) { |
111 | int fd_file, fd_stream; |
112 | |
113 | OpenFile(); |
114 | fd_file = file_->file_descriptor(); |
115 | fd_stream = std::static_pointer_cast<FileOutputStream>(stream_)->file_descriptor(); |
116 | ASSERT_FALSE(FileIsClosed(fd_file)); |
117 | file_.reset(); |
118 | ASSERT_TRUE(FileIsClosed(fd_file)); |
119 | ASSERT_FALSE(FileIsClosed(fd_stream)); |
120 | stream_.reset(); |
121 | ASSERT_TRUE(FileIsClosed(fd_stream)); |
122 | |
123 | OpenFileDescriptor(); |
124 | fd_file = file_->file_descriptor(); |
125 | fd_stream = std::static_pointer_cast<FileOutputStream>(stream_)->file_descriptor(); |
126 | ASSERT_FALSE(FileIsClosed(fd_file)); |
127 | file_.reset(); |
128 | ASSERT_TRUE(FileIsClosed(fd_file)); |
129 | ASSERT_FALSE(FileIsClosed(fd_stream)); |
130 | stream_.reset(); |
131 | ASSERT_TRUE(FileIsClosed(fd_stream)); |
132 | } |
133 | |
134 | TEST_F(TestFileOutputStream, Close) { |
135 | OpenFile(); |
136 | |
137 | const char* data = "testdata" ; |
138 | ASSERT_OK(file_->Write(data, strlen(data))); |
139 | |
140 | int fd = file_->file_descriptor(); |
141 | ASSERT_FALSE(file_->closed()); |
142 | ASSERT_OK(file_->Close()); |
143 | ASSERT_TRUE(file_->closed()); |
144 | ASSERT_TRUE(FileIsClosed(fd)); |
145 | |
146 | // Idempotent |
147 | ASSERT_OK(file_->Close()); |
148 | |
149 | AssertFileContents(path_, data); |
150 | |
151 | ASSERT_OK(stream_->Write(data, strlen(data))); |
152 | |
153 | fd = std::static_pointer_cast<FileOutputStream>(stream_)->file_descriptor(); |
154 | ASSERT_FALSE(stream_->closed()); |
155 | ASSERT_OK(stream_->Close()); |
156 | ASSERT_TRUE(stream_->closed()); |
157 | ASSERT_TRUE(FileIsClosed(fd)); |
158 | |
159 | // Idempotent |
160 | ASSERT_OK(stream_->Close()); |
161 | |
162 | AssertFileContents(path_, data); |
163 | } |
164 | |
165 | TEST_F(TestFileOutputStream, FromFileDescriptor) { |
166 | OpenFileDescriptor(); |
167 | stream_.reset(); |
168 | |
169 | std::string data1 = "test" ; |
170 | ASSERT_OK(file_->Write(data1.data(), data1.size())); |
171 | int fd = file_->file_descriptor(); |
172 | ASSERT_OK(file_->Close()); |
173 | ASSERT_TRUE(FileIsClosed(fd)); |
174 | |
175 | AssertFileContents(path_, data1); |
176 | |
177 | // Re-open at end of file |
178 | internal::PlatformFilename file_name; |
179 | ASSERT_OK(internal::FileNameFromString(path_, &file_name)); |
180 | ASSERT_OK(internal::FileOpenWritable(file_name, true /* write_only */, |
181 | false /* truncate */, false /* append */, &fd)); |
182 | ASSERT_OK(internal::FileSeek(fd, 0, SEEK_END)); |
183 | ASSERT_OK(FileOutputStream::Open(fd, &stream_)); |
184 | |
185 | std::string data2 = "data" ; |
186 | ASSERT_OK(stream_->Write(data2.data(), data2.size())); |
187 | ASSERT_OK(stream_->Close()); |
188 | |
189 | AssertFileContents(path_, data1 + data2); |
190 | } |
191 | |
192 | TEST_F(TestFileOutputStream, InvalidWrites) { |
193 | OpenFile(); |
194 | |
195 | const char* data = "" ; |
196 | |
197 | ASSERT_RAISES(IOError, file_->Write(data, -1)); |
198 | ASSERT_RAISES(IOError, stream_->Write(data, -1)); |
199 | } |
200 | |
201 | TEST_F(TestFileOutputStream, Tell) { |
202 | OpenFile(); |
203 | |
204 | int64_t position; |
205 | ASSERT_OK(file_->Tell(&position)); |
206 | ASSERT_EQ(0, position); |
207 | |
208 | const char* data = "testdata" ; |
209 | ASSERT_OK(file_->Write(data, 8)); |
210 | ASSERT_OK(file_->Tell(&position)); |
211 | ASSERT_EQ(8, position); |
212 | |
213 | ASSERT_OK(stream_->Tell(&position)); |
214 | ASSERT_EQ(0, position); |
215 | |
216 | ASSERT_OK(stream_->Write(data, 8)); |
217 | ASSERT_OK(stream_->Tell(&position)); |
218 | ASSERT_EQ(8, position); |
219 | } |
220 | |
221 | TEST_F(TestFileOutputStream, TruncatesNewFile) { |
222 | ASSERT_OK(FileOutputStream::Open(path_, &file_)); |
223 | |
224 | const char* data = "testdata" ; |
225 | ASSERT_OK(file_->Write(data, strlen(data))); |
226 | ASSERT_OK(file_->Close()); |
227 | |
228 | ASSERT_OK(FileOutputStream::Open(path_, &file_)); |
229 | ASSERT_OK(file_->Close()); |
230 | |
231 | AssertFileContents(path_, "" ); |
232 | |
233 | // Same with stream-returning API |
234 | ASSERT_OK(FileOutputStream::Open(path_, &stream_)); |
235 | |
236 | ASSERT_OK(stream_->Write(data, strlen(data))); |
237 | ASSERT_OK(stream_->Close()); |
238 | |
239 | ASSERT_OK(FileOutputStream::Open(path_, &stream_)); |
240 | ASSERT_OK(stream_->Close()); |
241 | |
242 | AssertFileContents(path_, "" ); |
243 | } |
244 | |
245 | TEST_F(TestFileOutputStream, Append) { |
246 | ASSERT_OK(FileOutputStream::Open(path_, &file_)); |
247 | { |
248 | const char* data = "test" ; |
249 | ASSERT_OK(file_->Write(data, strlen(data))); |
250 | } |
251 | ASSERT_OK(file_->Close()); |
252 | ASSERT_OK(FileOutputStream::Open(path_, true /* append */, &file_)); |
253 | { |
254 | const char* data = "data" ; |
255 | ASSERT_OK(file_->Write(data, strlen(data))); |
256 | } |
257 | ASSERT_OK(file_->Close()); |
258 | AssertFileContents(path_, "testdata" ); |
259 | |
260 | // Same with stream-returning API |
261 | ASSERT_OK(FileOutputStream::Open(path_, &stream_)); |
262 | { |
263 | const char* data = "test" ; |
264 | ASSERT_OK(stream_->Write(data, strlen(data))); |
265 | } |
266 | ASSERT_OK(stream_->Close()); |
267 | |
268 | ASSERT_OK(FileOutputStream::Open(path_, true /* append */, &stream_)); |
269 | { |
270 | const char* data = "data" ; |
271 | ASSERT_OK(stream_->Write(data, strlen(data))); |
272 | } |
273 | ASSERT_OK(stream_->Close()); |
274 | AssertFileContents(path_, "testdata" ); |
275 | } |
276 | |
277 | // ---------------------------------------------------------------------- |
278 | // File input tests |
279 | |
280 | class TestReadableFile : public FileTestFixture { |
281 | public: |
282 | void OpenFile() { ASSERT_OK(ReadableFile::Open(path_, &file_)); } |
283 | |
284 | void MakeTestFile() { |
285 | std::string data = "testdata" ; |
286 | std::ofstream stream; |
287 | stream.open(path_.c_str()); |
288 | stream << data; |
289 | } |
290 | |
291 | protected: |
292 | std::shared_ptr<ReadableFile> file_; |
293 | }; |
294 | |
295 | TEST_F(TestReadableFile, DestructorClosesFile) { |
296 | MakeTestFile(); |
297 | |
298 | int fd; |
299 | { |
300 | std::shared_ptr<ReadableFile> file; |
301 | ASSERT_OK(ReadableFile::Open(path_, &file)); |
302 | fd = file->file_descriptor(); |
303 | } |
304 | ASSERT_TRUE(FileIsClosed(fd)); |
305 | } |
306 | |
307 | TEST_F(TestReadableFile, Close) { |
308 | MakeTestFile(); |
309 | OpenFile(); |
310 | |
311 | int fd = file_->file_descriptor(); |
312 | ASSERT_FALSE(file_->closed()); |
313 | ASSERT_OK(file_->Close()); |
314 | ASSERT_TRUE(file_->closed()); |
315 | |
316 | ASSERT_TRUE(FileIsClosed(fd)); |
317 | |
318 | // Idempotent |
319 | ASSERT_OK(file_->Close()); |
320 | ASSERT_TRUE(FileIsClosed(fd)); |
321 | } |
322 | |
323 | TEST_F(TestReadableFile, FromFileDescriptor) { |
324 | MakeTestFile(); |
325 | |
326 | internal::PlatformFilename file_name; |
327 | int fd = -2; |
328 | ASSERT_OK(internal::FileNameFromString(path_, &file_name)); |
329 | ASSERT_OK(internal::FileOpenReadable(file_name, &fd)); |
330 | ASSERT_GE(fd, 0); |
331 | ASSERT_OK(internal::FileSeek(fd, 4)); |
332 | |
333 | ASSERT_OK(ReadableFile::Open(fd, &file_)); |
334 | ASSERT_EQ(file_->file_descriptor(), fd); |
335 | std::shared_ptr<Buffer> buf; |
336 | ASSERT_OK(file_->Read(5, &buf)); |
337 | ASSERT_EQ(buf->size(), 4); |
338 | ASSERT_TRUE(buf->Equals(Buffer("data" ))); |
339 | |
340 | ASSERT_FALSE(FileIsClosed(fd)); |
341 | ASSERT_OK(file_->Close()); |
342 | ASSERT_TRUE(FileIsClosed(fd)); |
343 | // Idempotent |
344 | ASSERT_OK(file_->Close()); |
345 | ASSERT_TRUE(FileIsClosed(fd)); |
346 | } |
347 | |
348 | TEST_F(TestReadableFile, Peek) { |
349 | MakeTestFile(); |
350 | OpenFile(); |
351 | |
352 | // Cannot peek |
353 | auto view = file_->Peek(4); |
354 | ASSERT_EQ(0, view.size()); |
355 | } |
356 | |
357 | TEST_F(TestReadableFile, SeekTellSize) { |
358 | MakeTestFile(); |
359 | OpenFile(); |
360 | |
361 | int64_t position; |
362 | ASSERT_OK(file_->Tell(&position)); |
363 | ASSERT_EQ(0, position); |
364 | |
365 | ASSERT_OK(file_->Seek(4)); |
366 | ASSERT_OK(file_->Tell(&position)); |
367 | ASSERT_EQ(4, position); |
368 | |
369 | ASSERT_OK(file_->Seek(100)); |
370 | ASSERT_OK(file_->Tell(&position)); |
371 | |
372 | // Can seek past end of file |
373 | ASSERT_EQ(100, position); |
374 | |
375 | int64_t size; |
376 | ASSERT_OK(file_->GetSize(&size)); |
377 | ASSERT_EQ(8, size); |
378 | |
379 | ASSERT_OK(file_->Tell(&position)); |
380 | ASSERT_EQ(100, position); |
381 | |
382 | // does not support zero copy |
383 | ASSERT_FALSE(file_->supports_zero_copy()); |
384 | } |
385 | |
386 | TEST_F(TestReadableFile, Read) { |
387 | uint8_t buffer[50]; |
388 | |
389 | MakeTestFile(); |
390 | OpenFile(); |
391 | |
392 | int64_t bytes_read; |
393 | ASSERT_OK(file_->Read(4, &bytes_read, buffer)); |
394 | ASSERT_EQ(4, bytes_read); |
395 | ASSERT_EQ(0, std::memcmp(buffer, "test" , 4)); |
396 | |
397 | ASSERT_OK(file_->Read(10, &bytes_read, buffer)); |
398 | ASSERT_EQ(4, bytes_read); |
399 | ASSERT_EQ(0, std::memcmp(buffer, "data" , 4)); |
400 | |
401 | // Test incomplete read, ARROW-1094 |
402 | std::shared_ptr<Buffer> buf; |
403 | int64_t size; |
404 | ASSERT_OK(file_->GetSize(&size)); |
405 | |
406 | ASSERT_OK(file_->Seek(1)); |
407 | ASSERT_OK(file_->Read(size, &buf)); |
408 | ASSERT_EQ(size - 1, buf->size()); |
409 | } |
410 | |
411 | TEST_F(TestReadableFile, ReadAt) { |
412 | uint8_t buffer[50]; |
413 | const char* test_data = "testdata" ; |
414 | |
415 | MakeTestFile(); |
416 | OpenFile(); |
417 | |
418 | int64_t bytes_read; |
419 | |
420 | ASSERT_OK(file_->ReadAt(0, 4, &bytes_read, buffer)); |
421 | ASSERT_EQ(4, bytes_read); |
422 | ASSERT_EQ(0, std::memcmp(buffer, "test" , 4)); |
423 | |
424 | ASSERT_OK(file_->ReadAt(1, 10, &bytes_read, buffer)); |
425 | ASSERT_EQ(7, bytes_read); |
426 | ASSERT_EQ(0, std::memcmp(buffer, "estdata" , 7)); |
427 | |
428 | // Check buffer API |
429 | std::shared_ptr<Buffer> buffer2; |
430 | |
431 | ASSERT_OK(file_->ReadAt(2, 5, &buffer2)); |
432 | ASSERT_EQ(5, buffer2->size()); |
433 | |
434 | Buffer expected(reinterpret_cast<const uint8_t*>(test_data + 2), 5); |
435 | ASSERT_TRUE(buffer2->Equals(expected)); |
436 | } |
437 | |
438 | TEST_F(TestReadableFile, NonExistentFile) { |
439 | std::string path = "0xDEADBEEF.txt" ; |
440 | Status s = ReadableFile::Open(path, &file_); |
441 | ASSERT_TRUE(s.IsIOError()); |
442 | |
443 | std::string message = s.message(); |
444 | ASSERT_NE(std::string::npos, message.find(path)); |
445 | } |
446 | |
447 | class MyMemoryPool : public MemoryPool { |
448 | public: |
449 | MyMemoryPool() : num_allocations_(0) {} |
450 | |
451 | Status Allocate(int64_t size, uint8_t** out) override { |
452 | *out = reinterpret_cast<uint8_t*>(std::malloc(size)); |
453 | ++num_allocations_; |
454 | return Status::OK(); |
455 | } |
456 | |
457 | void Free(uint8_t* buffer, int64_t size) override { std::free(buffer); } |
458 | |
459 | Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override { |
460 | *ptr = reinterpret_cast<uint8_t*>(std::realloc(*ptr, new_size)); |
461 | |
462 | if (*ptr == NULL) { |
463 | return Status::OutOfMemory("realloc of size " , new_size, " failed" ); |
464 | } |
465 | |
466 | return Status::OK(); |
467 | } |
468 | |
469 | int64_t bytes_allocated() const override { return -1; } |
470 | |
471 | int64_t num_allocations() const { return num_allocations_.load(); } |
472 | |
473 | private: |
474 | std::atomic<int64_t> num_allocations_; |
475 | }; |
476 | |
477 | TEST_F(TestReadableFile, CustomMemoryPool) { |
478 | MakeTestFile(); |
479 | |
480 | MyMemoryPool pool; |
481 | ASSERT_OK(ReadableFile::Open(path_, &pool, &file_)); |
482 | |
483 | std::shared_ptr<Buffer> buffer; |
484 | ASSERT_OK(file_->ReadAt(0, 4, &buffer)); |
485 | ASSERT_OK(file_->ReadAt(4, 8, &buffer)); |
486 | |
487 | ASSERT_EQ(2, pool.num_allocations()); |
488 | } |
489 | |
490 | TEST_F(TestReadableFile, ThreadSafety) { |
491 | std::string data = "foobar" ; |
492 | { |
493 | std::ofstream stream; |
494 | stream.open(path_.c_str()); |
495 | stream << data; |
496 | } |
497 | |
498 | MyMemoryPool pool; |
499 | ASSERT_OK(ReadableFile::Open(path_, &pool, &file_)); |
500 | |
501 | std::atomic<int> correct_count(0); |
502 | int niter = 30000; |
503 | |
504 | auto ReadData = [&correct_count, &data, &niter, this]() { |
505 | std::shared_ptr<Buffer> buffer; |
506 | |
507 | for (int i = 0; i < niter; ++i) { |
508 | const int offset = i % 3; |
509 | ASSERT_OK(file_->ReadAt(offset, 3, &buffer)); |
510 | if (0 == memcmp(data.c_str() + offset, buffer->data(), 3)) { |
511 | correct_count += 1; |
512 | } |
513 | } |
514 | }; |
515 | |
516 | std::thread thread1(ReadData); |
517 | std::thread thread2(ReadData); |
518 | |
519 | thread1.join(); |
520 | thread2.join(); |
521 | |
522 | ASSERT_EQ(niter * 2, correct_count); |
523 | } |
524 | |
525 | // ---------------------------------------------------------------------- |
526 | // Pipe I/O tests using FileOutputStream |
527 | // (cannot test using ReadableFile as it currently requires seeking) |
528 | |
529 | class TestPipeIO : public ::testing::Test { |
530 | public: |
531 | void MakePipe() { |
532 | int fd[2]; |
533 | ASSERT_OK(internal::CreatePipe(fd)); |
534 | r_ = fd[0]; |
535 | w_ = fd[1]; |
536 | ASSERT_GE(r_, 0); |
537 | ASSERT_GE(w_, 0); |
538 | } |
539 | void ClosePipe() { |
540 | if (r_ != -1) { |
541 | ASSERT_OK(internal::FileClose(r_)); |
542 | r_ = -1; |
543 | } |
544 | if (w_ != -1) { |
545 | ASSERT_OK(internal::FileClose(w_)); |
546 | w_ = -1; |
547 | } |
548 | } |
549 | void TearDown() { ClosePipe(); } |
550 | |
551 | protected: |
552 | int r_ = -1, w_ = -1; |
553 | }; |
554 | |
555 | TEST_F(TestPipeIO, TestWrite) { |
556 | std::string data1 = "test" , data2 = "data!" ; |
557 | std::shared_ptr<FileOutputStream> file; |
558 | uint8_t buffer[10]; |
559 | int64_t bytes_read; |
560 | |
561 | MakePipe(); |
562 | ASSERT_OK(FileOutputStream::Open(w_, &file)); |
563 | w_ = -1; // now owned by FileOutputStream |
564 | |
565 | ASSERT_OK(file->Write(data1.data(), data1.size())); |
566 | ASSERT_OK(internal::FileRead(r_, buffer, 4, &bytes_read)); |
567 | ASSERT_EQ(bytes_read, 4); |
568 | ASSERT_EQ(0, std::memcmp(buffer, "test" , 4)); |
569 | |
570 | ASSERT_OK(file->Write(data2.data(), data2.size())); |
571 | ASSERT_OK(internal::FileRead(r_, buffer, 4, &bytes_read)); |
572 | ASSERT_EQ(bytes_read, 4); |
573 | ASSERT_EQ(0, std::memcmp(buffer, "data" , 4)); |
574 | |
575 | ASSERT_FALSE(file->closed()); |
576 | ASSERT_OK(file->Close()); |
577 | ASSERT_TRUE(file->closed()); |
578 | ASSERT_OK(internal::FileRead(r_, buffer, 2, &bytes_read)); |
579 | ASSERT_EQ(bytes_read, 1); |
580 | ASSERT_EQ(0, std::memcmp(buffer, "!" , 1)); |
581 | // EOF reached |
582 | ASSERT_OK(internal::FileRead(r_, buffer, 2, &bytes_read)); |
583 | ASSERT_EQ(bytes_read, 0); |
584 | } |
585 | |
586 | TEST_F(TestPipeIO, ReadableFileFails) { |
587 | // ReadableFile fails on non-seekable fd |
588 | std::shared_ptr<ReadableFile> file; |
589 | ASSERT_RAISES(IOError, ReadableFile::Open(r_, &file)); |
590 | } |
591 | |
592 | // ---------------------------------------------------------------------- |
593 | // Memory map tests |
594 | |
595 | class TestMemoryMappedFile : public ::testing::Test, public MemoryMapFixture { |
596 | public: |
597 | void TearDown() { MemoryMapFixture::TearDown(); } |
598 | }; |
599 | |
600 | TEST_F(TestMemoryMappedFile, InvalidUsages) {} |
601 | |
602 | TEST_F(TestMemoryMappedFile, ZeroSizeFlie) { |
603 | std::string path = "io-memory-map-zero-size" ; |
604 | std::shared_ptr<MemoryMappedFile> result; |
605 | ASSERT_OK(InitMemoryMap(0, path, &result)); |
606 | |
607 | int64_t size = -1; |
608 | ASSERT_OK(result->Tell(&size)); |
609 | ASSERT_EQ(0, size); |
610 | } |
611 | |
612 | TEST_F(TestMemoryMappedFile, WriteRead) { |
613 | const int64_t buffer_size = 1024; |
614 | std::vector<uint8_t> buffer(buffer_size); |
615 | |
616 | random_bytes(1024, 0, buffer.data()); |
617 | |
618 | const int reps = 5; |
619 | |
620 | std::string path = "io-memory-map-write-read-test" ; |
621 | std::shared_ptr<MemoryMappedFile> result; |
622 | ASSERT_OK(InitMemoryMap(reps * buffer_size, path, &result)); |
623 | |
624 | int64_t position = 0; |
625 | std::shared_ptr<Buffer> out_buffer; |
626 | for (int i = 0; i < reps; ++i) { |
627 | ASSERT_OK(result->Write(buffer.data(), buffer_size)); |
628 | ASSERT_OK(result->ReadAt(position, buffer_size, &out_buffer)); |
629 | |
630 | ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size)); |
631 | |
632 | position += buffer_size; |
633 | } |
634 | } |
635 | |
636 | TEST_F(TestMemoryMappedFile, WriteResizeRead) { |
637 | const int64_t buffer_size = 1024; |
638 | const int reps = 5; |
639 | std::vector<std::vector<uint8_t>> buffers(reps); |
640 | for (auto& b : buffers) { |
641 | b.resize(buffer_size); |
642 | random_bytes(buffer_size, 0, b.data()); |
643 | } |
644 | |
645 | std::string path = "io-memory-map-write-read-test" ; |
646 | std::shared_ptr<MemoryMappedFile> result; |
647 | ASSERT_OK(InitMemoryMap(buffer_size, path, &result)); |
648 | |
649 | int64_t position = 0; |
650 | std::shared_ptr<Buffer> out_buffer; |
651 | for (int i = 0; i < reps; ++i) { |
652 | if (i != 0) { |
653 | ASSERT_OK(result->Resize(buffer_size * (i + 1))); |
654 | } |
655 | ASSERT_OK(result->Write(buffers[i].data(), buffer_size)); |
656 | ASSERT_OK(result->ReadAt(position, buffer_size, &out_buffer)); |
657 | |
658 | ASSERT_EQ(out_buffer->size(), buffer_size); |
659 | ASSERT_EQ(0, memcmp(out_buffer->data(), buffers[i].data(), buffer_size)); |
660 | out_buffer.reset(); |
661 | |
662 | position += buffer_size; |
663 | } |
664 | } |
665 | |
666 | TEST_F(TestMemoryMappedFile, GetConstGetSize) { |
667 | const int64_t buffer_size = 1024; |
668 | std::vector<uint8_t> buffer(buffer_size); |
669 | random_bytes(buffer_size, 0, buffer.data()); |
670 | |
671 | std::string path = "io-memory-map-write-read-test" ; |
672 | std::shared_ptr<MemoryMappedFile> result; |
673 | ASSERT_OK(InitMemoryMap(buffer_size, path, &result)); |
674 | |
675 | const auto& const_result = *result; |
676 | int64_t out_size; |
677 | ASSERT_OK(const_result.GetSize(&out_size)); |
678 | ASSERT_EQ(buffer_size, out_size); |
679 | } |
680 | |
681 | TEST_F(TestMemoryMappedFile, ResizeRaisesOnExported) { |
682 | const int64_t buffer_size = 1024; |
683 | std::vector<uint8_t> buffer(buffer_size); |
684 | random_bytes(buffer_size, 0, buffer.data()); |
685 | |
686 | std::string path = "io-memory-map-write-read-test" ; |
687 | std::shared_ptr<MemoryMappedFile> result; |
688 | ASSERT_OK(InitMemoryMap(buffer_size, path, &result)); |
689 | |
690 | std::shared_ptr<Buffer> out_buffer1, out_buffer2; |
691 | ASSERT_OK(result->Write(buffer.data(), buffer_size)); |
692 | ASSERT_OK(result->ReadAt(0, buffer_size, &out_buffer1)); |
693 | ASSERT_OK(result->ReadAt(0, buffer_size, &out_buffer2)); |
694 | ASSERT_EQ(0, memcmp(out_buffer1->data(), buffer.data(), buffer_size)); |
695 | ASSERT_EQ(0, memcmp(out_buffer2->data(), buffer.data(), buffer_size)); |
696 | |
697 | // attempt resize |
698 | ASSERT_RAISES(IOError, result->Resize(2 * buffer_size)); |
699 | |
700 | out_buffer1.reset(); |
701 | |
702 | ASSERT_RAISES(IOError, result->Resize(2 * buffer_size)); |
703 | |
704 | out_buffer2.reset(); |
705 | |
706 | ASSERT_OK(result->Resize(2 * buffer_size)); |
707 | |
708 | int64_t map_size; |
709 | ASSERT_OK(result->GetSize(&map_size)); |
710 | ASSERT_EQ(map_size, 2 * buffer_size); |
711 | |
712 | int64_t file_size; |
713 | ASSERT_OK(internal::FileGetSize(result->file_descriptor(), &file_size)); |
714 | ASSERT_EQ(file_size, buffer_size * 2); |
715 | } |
716 | |
717 | TEST_F(TestMemoryMappedFile, WriteReadZeroInitSize) { |
718 | const int64_t buffer_size = 1024; |
719 | std::vector<uint8_t> buffer(buffer_size); |
720 | random_bytes(buffer_size, 0, buffer.data()); |
721 | |
722 | std::string path = "io-memory-map-write-read-test" ; |
723 | std::shared_ptr<MemoryMappedFile> result; |
724 | ASSERT_OK(InitMemoryMap(0, path, &result)); |
725 | |
726 | std::shared_ptr<Buffer> out_buffer; |
727 | ASSERT_OK(result->Resize(buffer_size)); |
728 | ASSERT_OK(result->Write(buffer.data(), buffer_size)); |
729 | ASSERT_OK(result->ReadAt(0, buffer_size, &out_buffer)); |
730 | ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size)); |
731 | |
732 | int64_t map_size; |
733 | ASSERT_OK(result->GetSize(&map_size)); |
734 | ASSERT_EQ(map_size, buffer_size); |
735 | } |
736 | |
737 | TEST_F(TestMemoryMappedFile, WriteThenShrink) { |
738 | const int64_t buffer_size = 1024; |
739 | std::vector<uint8_t> buffer(buffer_size); |
740 | random_bytes(buffer_size, 0, buffer.data()); |
741 | |
742 | std::string path = "io-memory-map-write-read-test" ; |
743 | std::shared_ptr<MemoryMappedFile> result; |
744 | ASSERT_OK(InitMemoryMap(buffer_size * 2, path, &result)); |
745 | |
746 | std::shared_ptr<Buffer> out_buffer; |
747 | ASSERT_OK(result->Resize(buffer_size)); |
748 | ASSERT_OK(result->Write(buffer.data(), buffer_size)); |
749 | ASSERT_OK(result->Resize(buffer_size / 2)); |
750 | |
751 | ASSERT_OK(result->ReadAt(0, buffer_size / 2, &out_buffer)); |
752 | ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size / 2)); |
753 | |
754 | int64_t map_size; |
755 | ASSERT_OK(result->GetSize(&map_size)); |
756 | ASSERT_EQ(map_size, buffer_size / 2); |
757 | |
758 | int64_t file_size; |
759 | ASSERT_OK(internal::FileGetSize(result->file_descriptor(), &file_size)); |
760 | ASSERT_EQ(file_size, buffer_size / 2); |
761 | } |
762 | |
763 | TEST_F(TestMemoryMappedFile, WriteThenShrinkToHalfThenWrite) { |
764 | const int64_t buffer_size = 1024; |
765 | std::vector<uint8_t> buffer(buffer_size); |
766 | random_bytes(buffer_size, 0, buffer.data()); |
767 | |
768 | std::string path = "io-memory-map-write-read-test" ; |
769 | std::shared_ptr<MemoryMappedFile> result; |
770 | ASSERT_OK(InitMemoryMap(buffer_size, path, &result)); |
771 | |
772 | std::shared_ptr<Buffer> out_buffer; |
773 | ASSERT_OK(result->Write(buffer.data(), buffer_size)); |
774 | ASSERT_OK(result->Resize(buffer_size / 2)); |
775 | |
776 | int64_t position; |
777 | ASSERT_OK(result->Tell(&position)); |
778 | ASSERT_EQ(position, buffer_size / 2); |
779 | |
780 | ASSERT_OK(result->ReadAt(0, buffer_size / 2, &out_buffer)); |
781 | ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size / 2)); |
782 | out_buffer.reset(); |
783 | |
784 | // should resume writing directly at the seam |
785 | ASSERT_OK(result->Resize(buffer_size)); |
786 | ASSERT_OK(result->Write(buffer.data() + buffer_size / 2, buffer_size / 2)); |
787 | |
788 | ASSERT_OK(result->ReadAt(0, buffer_size, &out_buffer)); |
789 | ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size)); |
790 | |
791 | int64_t map_size; |
792 | ASSERT_OK(result->GetSize(&map_size)); |
793 | ASSERT_EQ(map_size, buffer_size); |
794 | |
795 | int64_t file_size; |
796 | ASSERT_OK(internal::FileGetSize(result->file_descriptor(), &file_size)); |
797 | ASSERT_EQ(file_size, buffer_size); |
798 | } |
799 | |
800 | TEST_F(TestMemoryMappedFile, ResizeToZeroThanWrite) { |
801 | const int64_t buffer_size = 1024; |
802 | std::vector<uint8_t> buffer(buffer_size); |
803 | random_bytes(buffer_size, 0, buffer.data()); |
804 | |
805 | std::string path = "io-memory-map-write-read-test" ; |
806 | std::shared_ptr<MemoryMappedFile> result; |
807 | ASSERT_OK(InitMemoryMap(buffer_size, path, &result)); |
808 | |
809 | std::shared_ptr<Buffer> out_buffer; |
810 | // just a sanity check that writing works ook |
811 | ASSERT_OK(result->Write(buffer.data(), buffer_size)); |
812 | ASSERT_OK(result->ReadAt(0, buffer_size, &out_buffer)); |
813 | ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size)); |
814 | out_buffer.reset(); |
815 | |
816 | ASSERT_OK(result->Resize(0)); |
817 | int64_t mapped_size; |
818 | ASSERT_OK(result->GetSize(&mapped_size)); |
819 | ASSERT_EQ(mapped_size, 0); |
820 | |
821 | int64_t position; |
822 | ASSERT_OK(result->Tell(&position)); |
823 | ASSERT_EQ(position, 0); |
824 | |
825 | int64_t file_size; |
826 | ASSERT_OK(internal::FileGetSize(result->file_descriptor(), &file_size)); |
827 | ASSERT_EQ(file_size, 0); |
828 | |
829 | // provision a vector to the buffer size in case ReadAt decides |
830 | // to read even though it shouldn't |
831 | std::vector<uint8_t> should_remain_empty(buffer_size); |
832 | int64_t bytes_read; |
833 | ASSERT_OK(result->ReadAt(0, 1, &bytes_read, |
834 | reinterpret_cast<void*>(should_remain_empty.data()))); |
835 | ASSERT_EQ(bytes_read, 0); |
836 | |
837 | // just a sanity check that writing works ook |
838 | ASSERT_OK(result->Resize(buffer_size)); |
839 | ASSERT_OK(result->Write(buffer.data(), buffer_size)); |
840 | ASSERT_OK(result->ReadAt(0, buffer_size, &out_buffer)); |
841 | ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size)); |
842 | } |
843 | |
844 | TEST_F(TestMemoryMappedFile, WriteAt) { |
845 | const int64_t buffer_size = 1024; |
846 | std::vector<uint8_t> buffer(buffer_size); |
847 | random_bytes(buffer_size, 0, buffer.data()); |
848 | |
849 | std::string path = "io-memory-map-write-read-test" ; |
850 | std::shared_ptr<MemoryMappedFile> result; |
851 | ASSERT_OK(InitMemoryMap(buffer_size, path, &result)); |
852 | |
853 | ASSERT_OK(result->WriteAt(0, buffer.data(), buffer_size / 2)); |
854 | |
855 | ASSERT_OK( |
856 | result->WriteAt(buffer_size / 2, buffer.data() + buffer_size / 2, buffer_size / 2)); |
857 | |
858 | std::shared_ptr<Buffer> out_buffer; |
859 | ASSERT_OK(result->ReadAt(0, buffer_size, &out_buffer)); |
860 | |
861 | ASSERT_EQ(memcmp(out_buffer->data(), buffer.data(), buffer_size), 0); |
862 | } |
863 | |
864 | TEST_F(TestMemoryMappedFile, WriteBeyondEnd) { |
865 | const int64_t buffer_size = 1024; |
866 | std::vector<uint8_t> buffer(buffer_size); |
867 | random_bytes(buffer_size, 0, buffer.data()); |
868 | |
869 | std::string path = "io-memory-map-write-read-test" ; |
870 | std::shared_ptr<MemoryMappedFile> result; |
871 | ASSERT_OK(InitMemoryMap(buffer_size, path, &result)); |
872 | |
873 | ASSERT_OK(result->Seek(1)); |
874 | ASSERT_RAISES(Invalid, result->Write(buffer.data(), buffer_size)); |
875 | |
876 | // The position should remain unchanged afterwards |
877 | int64_t position; |
878 | ASSERT_OK(result->Tell(&position)); |
879 | ASSERT_EQ(position, 1); |
880 | } |
881 | |
882 | TEST_F(TestMemoryMappedFile, WriteAtBeyondEnd) { |
883 | const int64_t buffer_size = 1024; |
884 | std::vector<uint8_t> buffer(buffer_size); |
885 | random_bytes(buffer_size, 0, buffer.data()); |
886 | |
887 | std::string path = "io-memory-map-write-read-test" ; |
888 | std::shared_ptr<MemoryMappedFile> result; |
889 | ASSERT_OK(InitMemoryMap(buffer_size, path, &result)); |
890 | |
891 | ASSERT_RAISES(Invalid, result->WriteAt(1, buffer.data(), buffer_size)); |
892 | |
893 | // The position should remain unchanged afterwards |
894 | int64_t position; |
895 | ASSERT_OK(result->Tell(&position)); |
896 | ASSERT_EQ(position, 0); |
897 | } |
898 | |
899 | TEST_F(TestMemoryMappedFile, GetSize) { |
900 | std::string path = "io-memory-map-get-size" ; |
901 | std::shared_ptr<MemoryMappedFile> result; |
902 | ASSERT_OK(InitMemoryMap(16384, path, &result)); |
903 | |
904 | int64_t size = -1; |
905 | ASSERT_OK(result->GetSize(&size)); |
906 | ASSERT_EQ(16384, size); |
907 | |
908 | int64_t position = -1; |
909 | ASSERT_OK(result->Tell(&position)); |
910 | ASSERT_EQ(0, position); |
911 | } |
912 | |
913 | TEST_F(TestMemoryMappedFile, ReadOnly) { |
914 | const int64_t buffer_size = 1024; |
915 | std::vector<uint8_t> buffer(buffer_size); |
916 | |
917 | random_bytes(1024, 0, buffer.data()); |
918 | |
919 | const int reps = 5; |
920 | |
921 | std::string path = "ipc-read-only-test" ; |
922 | std::shared_ptr<MemoryMappedFile> rwmmap; |
923 | ASSERT_OK(InitMemoryMap(reps * buffer_size, path, &rwmmap)); |
924 | |
925 | int64_t position = 0; |
926 | for (int i = 0; i < reps; ++i) { |
927 | ASSERT_OK(rwmmap->Write(buffer.data(), buffer_size)); |
928 | position += buffer_size; |
929 | } |
930 | ASSERT_OK(rwmmap->Close()); |
931 | |
932 | std::shared_ptr<MemoryMappedFile> rommap; |
933 | ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READ, &rommap)); |
934 | |
935 | position = 0; |
936 | std::shared_ptr<Buffer> out_buffer; |
937 | for (int i = 0; i < reps; ++i) { |
938 | ASSERT_OK(rommap->ReadAt(position, buffer_size, &out_buffer)); |
939 | |
940 | ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size)); |
941 | position += buffer_size; |
942 | } |
943 | ASSERT_OK(rommap->Close()); |
944 | } |
945 | |
946 | TEST_F(TestMemoryMappedFile, DISABLED_ReadWriteOver4GbFile) { |
947 | // ARROW-1096 |
948 | const int64_t buffer_size = 1000 * 1000; |
949 | std::vector<uint8_t> buffer(buffer_size); |
950 | |
951 | random_bytes(buffer_size, 0, buffer.data()); |
952 | |
953 | const int64_t reps = 5000; |
954 | |
955 | std::string path = "ipc-read-over-4gb-file-test" ; |
956 | std::shared_ptr<MemoryMappedFile> rwmmap; |
957 | ASSERT_OK(InitMemoryMap(reps * buffer_size, path, &rwmmap)); |
958 | AppendFile(path); |
959 | |
960 | int64_t position = 0; |
961 | for (int i = 0; i < reps; ++i) { |
962 | ASSERT_OK(rwmmap->Write(buffer.data(), buffer_size)); |
963 | position += buffer_size; |
964 | } |
965 | ASSERT_OK(rwmmap->Close()); |
966 | |
967 | std::shared_ptr<MemoryMappedFile> rommap; |
968 | ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READ, &rommap)); |
969 | |
970 | position = 0; |
971 | std::shared_ptr<Buffer> out_buffer; |
972 | for (int i = 0; i < reps; ++i) { |
973 | ASSERT_OK(rommap->ReadAt(position, buffer_size, &out_buffer)); |
974 | |
975 | ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size)); |
976 | position += buffer_size; |
977 | } |
978 | ASSERT_OK(rommap->Close()); |
979 | } |
980 | |
981 | TEST_F(TestMemoryMappedFile, RetainMemoryMapReference) { |
982 | // ARROW-494 |
983 | |
984 | const int64_t buffer_size = 1024; |
985 | std::vector<uint8_t> buffer(buffer_size); |
986 | |
987 | random_bytes(1024, 0, buffer.data()); |
988 | |
989 | std::string path = "ipc-read-only-test" ; |
990 | CreateFile(path, buffer_size); |
991 | |
992 | { |
993 | std::shared_ptr<MemoryMappedFile> rwmmap; |
994 | ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READWRITE, &rwmmap)); |
995 | ASSERT_OK(rwmmap->Write(buffer.data(), buffer_size)); |
996 | ASSERT_FALSE(rwmmap->closed()); |
997 | ASSERT_OK(rwmmap->Close()); |
998 | ASSERT_TRUE(rwmmap->closed()); |
999 | } |
1000 | |
1001 | std::shared_ptr<Buffer> out_buffer; |
1002 | |
1003 | { |
1004 | std::shared_ptr<MemoryMappedFile> rommap; |
1005 | ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READ, &rommap)); |
1006 | ASSERT_OK(rommap->Read(buffer_size, &out_buffer)); |
1007 | ASSERT_FALSE(rommap->closed()); |
1008 | ASSERT_OK(rommap->Close()); |
1009 | ASSERT_TRUE(rommap->closed()); |
1010 | } |
1011 | |
1012 | // valgrind will catch if memory is unmapped |
1013 | ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size)); |
1014 | } |
1015 | |
1016 | TEST_F(TestMemoryMappedFile, InvalidMode) { |
1017 | const int64_t buffer_size = 1024; |
1018 | std::vector<uint8_t> buffer(buffer_size); |
1019 | |
1020 | random_bytes(1024, 0, buffer.data()); |
1021 | |
1022 | std::string path = "ipc-invalid-mode-test" ; |
1023 | CreateFile(path, buffer_size); |
1024 | |
1025 | std::shared_ptr<MemoryMappedFile> rommap; |
1026 | ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READ, &rommap)); |
1027 | |
1028 | ASSERT_RAISES(IOError, rommap->Write(buffer.data(), buffer_size)); |
1029 | } |
1030 | |
1031 | TEST_F(TestMemoryMappedFile, InvalidFile) { |
1032 | std::string non_existent_path = "invalid-file-name-asfd" ; |
1033 | |
1034 | std::shared_ptr<MemoryMappedFile> result; |
1035 | ASSERT_RAISES(IOError, |
1036 | MemoryMappedFile::Open(non_existent_path, FileMode::READ, &result)); |
1037 | } |
1038 | |
1039 | TEST_F(TestMemoryMappedFile, CastableToFileInterface) { |
1040 | std::shared_ptr<MemoryMappedFile> memory_mapped_file; |
1041 | std::shared_ptr<FileInterface> file = memory_mapped_file; |
1042 | } |
1043 | |
1044 | TEST_F(TestMemoryMappedFile, ThreadSafety) { |
1045 | std::string data = "foobar" ; |
1046 | std::string path = "ipc-multithreading-test" ; |
1047 | CreateFile(path, static_cast<int>(data.size())); |
1048 | |
1049 | std::shared_ptr<MemoryMappedFile> file; |
1050 | ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READWRITE, &file)); |
1051 | ASSERT_OK(file->Write(data.c_str(), static_cast<int64_t>(data.size()))); |
1052 | |
1053 | std::atomic<int> correct_count(0); |
1054 | int niter = 10000; |
1055 | |
1056 | auto ReadData = [&correct_count, &data, &file, &niter]() { |
1057 | std::shared_ptr<Buffer> buffer; |
1058 | |
1059 | for (int i = 0; i < niter; ++i) { |
1060 | ASSERT_OK(file->ReadAt(0, 3, &buffer)); |
1061 | if (0 == memcmp(data.c_str(), buffer->data(), 3)) { |
1062 | correct_count += 1; |
1063 | } |
1064 | } |
1065 | }; |
1066 | |
1067 | std::thread thread1(ReadData); |
1068 | std::thread thread2(ReadData); |
1069 | |
1070 | thread1.join(); |
1071 | thread2.join(); |
1072 | |
1073 | ASSERT_EQ(niter * 2, correct_count); |
1074 | } |
1075 | |
1076 | } // namespace io |
1077 | } // namespace arrow |
1078 | |