1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the MIT license.
3
4#pragma once
5
#include <atomic>
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>
#include <libaio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>

#include "../core/async.h"
#include "../core/status.h"
#include "file_common.h"
17
18namespace FASTER {
19namespace environment {
20
/// Path separator string for this POSIX (Linux) file environment.
constexpr char const* kPathSeparator = "/";
22
23/// The File class encapsulates the OS file handle.
24class File {
25 protected:
26 File()
27 : fd_{ -1 }
28 , device_alignment_{ 0 }
29 , filename_{}
30 , owner_{ false }
31#ifdef IO_STATISTICS
32 , bytes_written_ { 0 }
33 , read_count_{ 0 }
34 , bytes_read_{ 0 }
35#endif
36 {
37 }
38
39 File(const std::string& filename)
40 : fd_{ -1 }
41 , device_alignment_{ 0 }
42 , filename_{ filename }
43 , owner_{ false }
44#ifdef IO_STATISTICS
45 , bytes_written_ { 0 }
46 , read_count_{ 0 }
47 , bytes_read_{ 0 }
48#endif
49 {
50 }
51
52 /// Move constructor.
53 File(File&& other)
54 : fd_{ other.fd_ }
55 , device_alignment_{ other.device_alignment_ }
56 , filename_{ std::move(other.filename_) }
57 , owner_{ other.owner_ }
58#ifdef IO_STATISTICS
59 , bytes_written_ { other.bytes_written_ }
60 , read_count_{ other.read_count_ }
61 , bytes_read_{ other.bytes_read_ }
62#endif
63 {
64 other.owner_ = false;
65 }
66
67 ~File() {
68 if(owner_) {
69 Status s = Close();
70 }
71 }
72
73 /// Move assignment operator.
74 File& operator=(File&& other) {
75 fd_ = other.fd_;
76 device_alignment_ = other.device_alignment_;
77 filename_ = std::move(other.filename_);
78 owner_ = other.owner_;
79#ifdef IO_STATISTICS
80 bytes_written_ = other.bytes_written_;
81 read_count_ = other.read_count_;
82 bytes_read_ = other.bytes_read_;
83#endif
84 other.owner_ = false;
85 return *this;
86 }
87
88 protected:
89 Status Open(int flags, FileCreateDisposition create_disposition, bool* exists = nullptr);
90
91 public:
92 Status Close();
93 Status Delete();
94
95 uint64_t size() const {
96 struct stat stat_buffer;
97 int result = ::fstat(fd_, &stat_buffer);
98 return (result == 0) ? stat_buffer.st_size : 0;
99 }
100
101 size_t device_alignment() const {
102 return device_alignment_;
103 }
104
105 const std::string& filename() const {
106 return filename_;
107 }
108
109#ifdef IO_STATISTICS
110 uint64_t bytes_written() const {
111 return bytes_written_.load();
112 }
113 uint64_t read_count() const {
114 return read_count_.load();
115 }
116 uint64_t bytes_read() const {
117 return bytes_read_.load();
118 }
119#endif
120
121 private:
122 Status GetDeviceAlignment();
123 static int GetCreateDisposition(FileCreateDisposition create_disposition);
124
125 protected:
126 int fd_;
127
128 private:
129 size_t device_alignment_;
130 std::string filename_;
131 bool owner_;
132
133#ifdef IO_STATISTICS
134 protected:
135 std::atomic<uint64_t> bytes_written_;
136 std::atomic<uint64_t> read_count_;
137 std::atomic<uint64_t> bytes_read_;
138#endif
139};
140
141class QueueFile;
142
143/// The QueueIoHandler class encapsulates completions for async file I/O, where the completions
144/// are put on the AIO completion queue.
145class QueueIoHandler {
146 public:
147 typedef QueueFile async_file_t;
148
149 private:
150 constexpr static int kMaxEvents = 128;
151
152 public:
153 QueueIoHandler()
154 : io_object_{ 0 } {
155 }
156 QueueIoHandler(size_t max_threads)
157 : io_object_{ 0 } {
158 int result = ::io_setup(kMaxEvents, &io_object_);
159 assert(result >= 0);
160 }
161
162 /// Move constructor
163 QueueIoHandler(QueueIoHandler&& other) {
164 io_object_ = other.io_object_;
165 other.io_object_ = 0;
166 }
167
168 ~QueueIoHandler() {
169 if(io_object_ != 0)
170 ::io_destroy(io_object_);
171 }
172
173 /// Invoked whenever a Linux AIO completes.
174 static void IoCompletionCallback(io_context_t ctx, struct iocb* iocb, long res, long res2);
175
176 struct IoCallbackContext {
177 IoCallbackContext(FileOperationType operation, int fd, size_t offset, uint32_t length,
178 uint8_t* buffer, IAsyncContext* context_, AsyncIOCallback callback_)
179 : caller_context{ context_ }
180 , callback{ callback_ } {
181 if(FileOperationType::Read == operation) {
182 ::io_prep_pread(&this->parent_iocb, fd, buffer, length, offset);
183 } else {
184 ::io_prep_pwrite(&this->parent_iocb, fd, buffer, length, offset);
185 }
186 ::io_set_callback(&this->parent_iocb, IoCompletionCallback);
187 }
188
189 // WARNING: "parent_iocb" must be the first field in AioCallbackContext. This class is a C-style
190 // subclass of "struct iocb".
191
192 /// The iocb structure for Linux AIO.
193 struct iocb parent_iocb;
194
195 /// Caller callback context.
196 IAsyncContext* caller_context;
197
198 /// The caller's asynchronous callback function
199 AsyncIOCallback callback;
200 };
201
202 inline io_context_t io_object() const {
203 return io_object_;
204 }
205
206 /// Try to execute the next IO completion on the queue, if any.
207 bool TryComplete();
208
209 private:
210 /// The Linux AIO context used for IO completions.
211 io_context_t io_object_;
212};
213
214/// The QueueFile class encapsulates asynchronous reads and writes, using the specified AIO
215/// context.
216class QueueFile : public File {
217 public:
218 QueueFile()
219 : File()
220 , io_object_{ nullptr } {
221 }
222 QueueFile(const std::string& filename)
223 : File(filename)
224 , io_object_{ nullptr } {
225 }
226 /// Move constructor
227 QueueFile(QueueFile&& other)
228 : File(std::move(other))
229 , io_object_{ other.io_object_ } {
230 }
231 /// Move assignment operator.
232 QueueFile& operator=(QueueFile&& other) {
233 File::operator=(std::move(other));
234 io_object_ = other.io_object_;
235 return *this;
236 }
237
238 Status Open(FileCreateDisposition create_disposition, const FileOptions& options,
239 QueueIoHandler* handler, bool* exists = nullptr);
240
241 Status Read(size_t offset, uint32_t length, uint8_t* buffer,
242 IAsyncContext& context, AsyncIOCallback callback) const;
243 Status Write(size_t offset, uint32_t length, const uint8_t* buffer,
244 IAsyncContext& context, AsyncIOCallback callback);
245
246 private:
247 Status ScheduleOperation(FileOperationType operationType, uint8_t* buffer, size_t offset,
248 uint32_t length, IAsyncContext& context, AsyncIOCallback callback);
249
250 io_context_t io_object_;
251};
252
253}
254} // namespace FASTER::environment
255