1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the MIT license.
3
4#include <cstring>
5#include <sys/ioctl.h>
6#include <linux/fs.h>
7#include <errno.h>
8#include <fcntl.h>
9#include <libgen.h>
10#include <stdio.h>
11#include <time.h>
12#include "file_linux.h"
13
14namespace FASTER {
15namespace environment {
16
17#ifdef _DEBUG
18#define DCHECK_ALIGNMENT(o, l, b) \
19do { \
20 assert(reinterpret_cast<uintptr_t>(b) % device_alignment() == 0); \
21 assert((o) % device_alignment() == 0); \
22 assert((l) % device_alignment() == 0); \
23} while (0)
24#else
25#define DCHECK_ALIGNMENT(o, l, b) do {} while(0)
26#endif
27
28Status File::Open(int flags, FileCreateDisposition create_disposition, bool* exists) {
29 if(exists) {
30 *exists = false;
31 }
32
33 int create_flags = GetCreateDisposition(create_disposition);
34
35 /// Always unbuffered (O_DIRECT).
36 fd_ = ::open(filename_.c_str(), flags | O_RDWR | create_flags, S_IRUSR | S_IWUSR);
37
38 if(exists) {
39 // Let the caller know whether the file we tried to open or create (already) exists.
40 if(create_disposition == FileCreateDisposition::CreateOrTruncate ||
41 create_disposition == FileCreateDisposition::OpenOrCreate) {
42 *exists = (errno == EEXIST);
43 } else if(create_disposition == FileCreateDisposition::OpenExisting) {
44 *exists = (errno != ENOENT);
45 if(!*exists) {
46 // The file doesn't exist. Don't return an error, since the caller is expecting this case.
47 return Status::Ok;
48 }
49 }
50 }
51 if(fd_ == -1) {
52 int error = errno;
53 return Status::IOError;
54 }
55
56 Status result = GetDeviceAlignment();
57 if(result != Status::Ok) {
58 Close();
59 }
60 owner_ = true;
61 return result;
62}
63
64Status File::Close() {
65 if(fd_ != -1) {
66 int result = ::close(fd_);
67 fd_ = -1;
68 if(result == -1) {
69 int error = errno;
70 return Status::IOError;
71 }
72 }
73 owner_ = false;
74 return Status::Ok;
75}
76
77Status File::Delete() {
78 int result = ::remove(filename_.c_str());
79 if(result == -1) {
80 int error = errno;
81 return Status::IOError;
82 }
83 return Status::Ok;
84}
85
86Status File::GetDeviceAlignment() {
87 // For now, just hardcode 512-byte alignment.
88 device_alignment_ = 512;
89 return Status::Ok;
90}
91
92int File::GetCreateDisposition(FileCreateDisposition create_disposition) {
93 switch(create_disposition) {
94 case FileCreateDisposition::CreateOrTruncate:
95 return O_CREAT | O_TRUNC;
96 case FileCreateDisposition::OpenOrCreate:
97 return O_CREAT;
98 case FileCreateDisposition::OpenExisting:
99 return 0;
100 default:
101 assert(false);
102 return 0; // not reached
103 }
104}
105
106void QueueIoHandler::IoCompletionCallback(io_context_t ctx, struct iocb* iocb, long res,
107 long res2) {
108 auto callback_context = make_context_unique_ptr<IoCallbackContext>(
109 reinterpret_cast<IoCallbackContext*>(iocb));
110 size_t bytes_transferred;
111 Status return_status;
112 if(res < 0) {
113 return_status = Status::IOError;
114 bytes_transferred = 0;
115 } else {
116 return_status = Status::Ok;
117 bytes_transferred = res;
118 }
119 callback_context->callback(callback_context->caller_context, return_status, bytes_transferred);
120}
121
122bool QueueIoHandler::TryComplete() {
123 struct timespec timeout;
124 std::memset(&timeout, 0, sizeof(timeout));
125 struct io_event events[1];
126 int result = ::io_getevents(io_object_, 1, 1, events, &timeout);
127 if(result == 1) {
128 io_callback_t callback = reinterpret_cast<io_callback_t>(events[0].data);
129 callback(io_object_, events[0].obj, events[0].res, events[0].res2);
130 return true;
131 } else {
132 return false;
133 }
134}
135
136Status QueueFile::Open(FileCreateDisposition create_disposition, const FileOptions& options,
137 QueueIoHandler* handler, bool* exists) {
138 int flags = 0;
139 if(options.unbuffered) {
140 flags |= O_DIRECT;
141 }
142 RETURN_NOT_OK(File::Open(flags, create_disposition, exists));
143 if(exists && !*exists) {
144 return Status::Ok;
145 }
146
147 io_object_ = handler->io_object();
148 return Status::Ok;
149}
150
151Status QueueFile::Read(size_t offset, uint32_t length, uint8_t* buffer,
152 IAsyncContext& context, AsyncIOCallback callback) const {
153 DCHECK_ALIGNMENT(offset, length, buffer);
154#ifdef IO_STATISTICS
155 ++read_count_;
156 bytes_read_ += length;
157#endif
158 return const_cast<QueueFile*>(this)->ScheduleOperation(FileOperationType::Read, buffer,
159 offset, length, context, callback);
160}
161
162Status QueueFile::Write(size_t offset, uint32_t length, const uint8_t* buffer,
163 IAsyncContext& context, AsyncIOCallback callback) {
164 DCHECK_ALIGNMENT(offset, length, buffer);
165#ifdef IO_STATISTICS
166 bytes_written_ += length;
167#endif
168 return ScheduleOperation(FileOperationType::Write, const_cast<uint8_t*>(buffer), offset, length,
169 context, callback);
170}
171
172Status QueueFile::ScheduleOperation(FileOperationType operationType, uint8_t* buffer,
173 size_t offset, uint32_t length, IAsyncContext& context,
174 AsyncIOCallback callback) {
175 auto io_context = alloc_context<QueueIoHandler::IoCallbackContext>(sizeof(
176 QueueIoHandler::IoCallbackContext));
177 if(!io_context.get()) return Status::OutOfMemory;
178
179 IAsyncContext* caller_context_copy;
180 RETURN_NOT_OK(context.DeepCopy(caller_context_copy));
181
182 new(io_context.get()) QueueIoHandler::IoCallbackContext(operationType, fd_, offset, length,
183 buffer, caller_context_copy, callback);
184
185 struct iocb* iocbs[1];
186 iocbs[0] = reinterpret_cast<struct iocb*>(io_context.get());
187
188 int result = ::io_submit(io_object_, 1, iocbs);
189 if(result != 1) {
190 return Status::IOError;
191 }
192
193 io_context.release();
194 return Status::Ok;
195}
196
197#undef DCHECK_ALIGNMENT
198
199}
200} // namespace FASTER::environment
201