// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

#pragma once

#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdint>
#include <cstdlib>
#include <experimental/filesystem>
#include <mutex>
#include <string>

#include "../core/gc_state.h"
#include "../core/guid.h"
#include "../core/light_epoch.h"
#include "../core/utility.h"
#include "../environment/file.h"

/// Wrapper that exposes files to FASTER. Encapsulates segmented files, etc.
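///
/// The types below are layered as follows (top to bottom, each owning the next):
///   - FileSystemDisk<H, S>          -- the disk abstraction handed to FASTER; owns the
///                                      segmented log file and computes checkpoint paths.
///   - FileSystemSegmentedFile<H, S> -- a logically unbounded log file, split into
///                                      fixed-size segments of S bytes each.
///   - FileSystemSegmentBundle<H>    -- an epoch-protected snapshot of the currently
///                                      open segment files.
///   - FileSystemFile<H>             -- a single file, accessed through I/O handler H.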

namespace FASTER {
namespace device {

template <class H, uint64_t S>
class FileSystemDisk;

template <class H>
class FileSystemFile {
 public:
  typedef H handler_t;
  typedef typename handler_t::async_file_t file_t;

  /// Default constructor
  FileSystemFile()
    : file_{}
    , file_options_{} {
  }

  FileSystemFile(const std::string& filename, const environment::FileOptions& file_options)
    : file_{ filename }
    , file_options_{ file_options } {
  }

  /// Move constructor.
  FileSystemFile(FileSystemFile&& other)
    : file_{ std::move(other.file_) }
    , file_options_{ other.file_options_ } {
  }

  /// Move assignment operator.
  FileSystemFile& operator=(FileSystemFile&& other) {
    file_ = std::move(other.file_);
    file_options_ = other.file_options_;
    return *this;
  }

  Status Open(handler_t* handler) {
    return file_.Open(FASTER::environment::FileCreateDisposition::OpenOrCreate, file_options_,
                      handler, nullptr);
  }
  Status Close() {
    return file_.Close();
  }
  Status Delete() {
    return file_.Delete();
  }
  void Truncate(uint64_t new_begin_offset, GcState::truncate_callback_t callback) {
    // Truncation is a no-op for a single, unsegmented file; just notify the caller.
    if(callback) {
      callback(new_begin_offset);
    }
  }

  Status ReadAsync(uint64_t source, void* dest, uint32_t length,
                   AsyncIOCallback callback, IAsyncContext& context) const {
    return file_.Read(source, length, reinterpret_cast<uint8_t*>(dest), context, callback);
  }
  Status WriteAsync(const void* source, uint64_t dest, uint32_t length,
                    AsyncIOCallback callback, IAsyncContext& context) {
    return file_.Write(dest, length, reinterpret_cast<const uint8_t*>(source), context, callback);
  }

  /// Alignment (in bytes) required for unbuffered I/O against this file's device.
  size_t alignment() const {
    return file_.device_alignment();
  }

 private:
  file_t file_;
  environment::FileOptions file_options_;
};
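
/// Example (a minimal sketch, not part of the API): issuing an asynchronous read through
/// a FileSystemFile. It assumes a handler instance `handler` of type `handler_t`, a
/// sector-aligned `buffer`, a caller-defined IAsyncContext subclass `MyContext`, and the
/// AsyncIOCallback signature (context, result, bytes transferred) from core/async.h:
///
///   FileSystemFile<handler_t> file{ "data.log", environment::FileOptions{} };
///   file.Open(&handler);
///   MyContext context{ buffer };
///   Status s = file.ReadAsync(0 /*offset*/, buffer, 4096,
///                             [](IAsyncContext* ctxt, Status result, size_t bytes) {
///                               CallbackContext<MyContext> context{ ctxt };
///                               // consume context->buffer ...
///                             }, context);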

/// Manages a bundle of segment files. The bundle header and its file_t objects share a
/// single allocation; see size() and files() below.
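///
/// Callers allocate space for the bundle and its trailing file_t array together and
/// construct the bundle in place. This mirrors what FileSystemSegmentedFile::OpenSegment
/// does below (`filename`, `options`, and `handler` are placeholders):
///
///   void* buffer = std::malloc(bundle_t::size(end_segment - begin_segment));
///   bundle_t* bundle = new(buffer) bundle_t{ filename, options, handler,
///                                            begin_segment, end_segment };
///   // bundle->files() points just past the bundle header, at the file_t array.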
template <class H>
class FileSystemSegmentBundle {
 public:
  typedef H handler_t;
  typedef FileSystemFile<handler_t> file_t;
  typedef FileSystemSegmentBundle<handler_t> bundle_t;

  FileSystemSegmentBundle(const std::string& filename,
                          const environment::FileOptions& file_options, handler_t* handler,
                          uint64_t begin_segment_, uint64_t end_segment_)
    : filename_{ filename }
    , file_options_{ file_options }
    , begin_segment{ begin_segment_ }
    , end_segment{ end_segment_ }
    , owner_{ true } {
    for(uint64_t idx = begin_segment; idx < end_segment; ++idx) {
      new(files() + (idx - begin_segment)) file_t{ filename_ + std::to_string(idx),
          file_options_ };
      Status result = file(idx).Open(handler);
      assert(result == Status::Ok);
    }
  }

  FileSystemSegmentBundle(handler_t* handler, uint64_t begin_segment_, uint64_t end_segment_,
                          bundle_t& other)
    : filename_{ std::move(other.filename_) }
    , file_options_{ other.file_options_ }
    , begin_segment{ begin_segment_ }
    , end_segment{ end_segment_ }
    , owner_{ true } {
    assert(end_segment >= other.end_segment);

    uint64_t begin_new = begin_segment;
    uint64_t begin_copy = std::max(begin_segment, other.begin_segment);
    uint64_t end_copy = std::min(end_segment, other.end_segment);
    uint64_t end_new = end_segment;

    for(uint64_t idx = begin_segment; idx < begin_copy; ++idx) {
      new(files() + (idx - begin_segment)) file_t{ filename_ + std::to_string(idx),
          file_options_ };
      Status result = file(idx).Open(handler);
      assert(result == Status::Ok);
    }
    for(uint64_t idx = begin_copy; idx < end_copy; ++idx) {
      // Move file handles for segments already opened.
      new(files() + (idx - begin_segment)) file_t{ std::move(other.file(idx)) };
    }
    for(uint64_t idx = end_copy; idx < end_new; ++idx) {
      new(files() + (idx - begin_segment)) file_t{ filename_ + std::to_string(idx),
          file_options_ };
      Status result = file(idx).Open(handler);
      assert(result == Status::Ok);
    }

    // The new bundle takes ownership of the files; the old bundle must not close them.
    other.owner_ = false;
  }

  ~FileSystemSegmentBundle() {
    if(owner_) {
      for(uint64_t idx = begin_segment; idx < end_segment; ++idx) {
        file(idx).~file_t();
      }
    }
  }

  Status Close() {
    assert(owner_);
    Status result = Status::Ok;
    for(uint64_t idx = begin_segment; idx < end_segment; ++idx) {
      Status r = file(idx).Close();
      if(r != Status::Ok) {
        // We'll report the last error.
        result = r;
      }
    }
    return result;
  }

  Status Delete() {
    assert(owner_);
    Status result = Status::Ok;
    for(uint64_t idx = begin_segment; idx < end_segment; ++idx) {
      Status r = file(idx).Delete();
      if(r != Status::Ok) {
        // We'll report the last error.
        result = r;
      }
    }
    return result;
  }

  /// The file_t array is laid out immediately after the bundle header, in the same
  /// allocation.
  file_t* files() {
    return reinterpret_cast<file_t*>(this + 1);
  }
  file_t& file(uint64_t segment) {
    assert(segment >= begin_segment);
    return files()[segment - begin_segment];
  }
  bool exists(uint64_t segment) const {
    return segment >= begin_segment && segment < end_segment;
  }

  /// Total allocation size needed for a bundle that holds num_segments segment files.
  static constexpr uint64_t size(uint64_t num_segments) {
    return sizeof(bundle_t) + num_segments * sizeof(file_t);
  }

 public:
  const uint64_t begin_segment;
  const uint64_t end_segment;
 private:
  std::string filename_;
  environment::FileOptions file_options_;
  bool owner_;
};

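/// A logically unbounded file, split into fixed-size segments of S bytes each. Segment
/// idx is backed by a file named "<filename><idx>" (e.g. "log.log0", "log.log1", ...).
/// A logical offset maps onto a (segment, offset-within-segment) pair; for example,
/// with S = 2^30 (1 GB):
///
///   uint64_t offset  = 0x80001000;             // logical offset into the log
///   uint64_t segment = offset / kSegmentSize;  //  = 2
///   uint64_t in_seg  = offset % kSegmentSize;  //  = 0x1000
///
/// Reads and writes must not cross a segment boundary.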
template <class H, uint64_t S>
class FileSystemSegmentedFile {
 public:
  typedef H handler_t;
  typedef FileSystemFile<H> file_t;
  typedef FileSystemSegmentBundle<handler_t> bundle_t;

  static constexpr uint64_t kSegmentSize = S;
  static_assert(Utility::IsPowerOfTwo(S), "template parameter S is not a power of two!");

  FileSystemSegmentedFile(const std::string& filename,
                          const environment::FileOptions& file_options, LightEpoch* epoch)
    : begin_segment_{ 0 }
    , files_{ nullptr }
    , handler_{ nullptr }
    , filename_{ filename }
    , file_options_{ file_options }
    , epoch_{ epoch } {
  }

  ~FileSystemSegmentedFile() {
    bundle_t* files = files_.load();
    if(files) {
      files->~bundle_t();
      std::free(files);
    }
  }

  Status Open(handler_t* handler) {
    handler_ = handler;
    return Status::Ok;
  }
  Status Close() {
    return (files_.load()) ? files_.load()->Close() : Status::Ok;
  }
  Status Delete() {
    return (files_.load()) ? files_.load()->Delete() : Status::Ok;
  }
  void Truncate(uint64_t new_begin_offset, GcState::truncate_callback_t callback) {
    uint64_t new_begin_segment = new_begin_offset / kSegmentSize;
    begin_segment_ = new_begin_segment;
    TruncateSegments(new_begin_segment, callback);
  }

  Status ReadAsync(uint64_t source, void* dest, uint32_t length, AsyncIOCallback callback,
                   IAsyncContext& context) const {
    uint64_t segment = source / kSegmentSize;
    assert(source % kSegmentSize + length <= kSegmentSize);

    bundle_t* files = files_.load();

    if(!files || !files->exists(segment)) {
      Status result = const_cast<FileSystemSegmentedFile<H, S>*>(this)->OpenSegment(segment);
      if(result != Status::Ok) {
        return result;
      }
      files = files_.load();
    }
    return files->file(segment).ReadAsync(source % kSegmentSize, dest, length, callback, context);
  }

  Status WriteAsync(const void* source, uint64_t dest, uint32_t length,
                    AsyncIOCallback callback, IAsyncContext& context) {
    uint64_t segment = dest / kSegmentSize;
    assert(dest % kSegmentSize + length <= kSegmentSize);

    bundle_t* files = files_.load();

    if(!files || !files->exists(segment)) {
      Status result = OpenSegment(segment);
      if(result != Status::Ok) {
        return result;
      }
      files = files_.load();
    }
    return files->file(segment).WriteAsync(source, dest % kSegmentSize, length, callback, context);
  }

  size_t alignment() const {
    return 512; // For now, assume all disks have 512-byte alignment.
  }

 private:
  Status OpenSegment(uint64_t segment) {
    class Context : public IAsyncContext {
     public:
      Context(void* files_)
        : files{ files_ } {
      }
      /// The deep-copy constructor.
      Context(const Context& other)
        : files{ other.files } {
      }
     protected:
      Status DeepCopy_Internal(IAsyncContext*& context_copy) final {
        return IAsyncContext::DeepCopy_Internal(*this, context_copy);
      }
     public:
      void* files;
    };

    auto callback = [](IAsyncContext* ctxt) {
      CallbackContext<Context> context{ ctxt };
      std::free(context->files);
    };

    // Only one thread can modify the list of files at a given time.
    std::lock_guard<std::mutex> lock{ mutex_ };
    bundle_t* files = files_.load();

    if(segment < begin_segment_) {
      // The requested segment has been truncated.
      return Status::IOError;
    }
    if(files && files->exists(segment)) {
      // Some other thread already opened this segment for us.
      return Status::Ok;
    }

    if(!files) {
      // First segment opened.
      void* buffer = std::malloc(bundle_t::size(1));
      bundle_t* new_files = new(buffer) bundle_t{ filename_, file_options_, handler_,
          segment, segment + 1 };
      files_.store(new_files);
      return Status::Ok;
    }

    // Expand the list of files_.
    uint64_t new_begin_segment = std::min(files->begin_segment, segment);
    uint64_t new_end_segment = std::max(files->end_segment, segment + 1);
    void* buffer = std::malloc(bundle_t::size(new_end_segment - new_begin_segment));
    bundle_t* new_files = new(buffer) bundle_t{ handler_, new_begin_segment, new_end_segment,
        *files };
    files_.store(new_files);
    // Delete the old list only after all threads have finished looking at it.
    Context context{ files };
    IAsyncContext* context_copy;
    Status result = context.DeepCopy(context_copy);
    assert(result == Status::Ok);
    epoch_->BumpCurrentEpoch(callback, context_copy);
    return Status::Ok;
  }

  void TruncateSegments(uint64_t new_begin_segment, GcState::truncate_callback_t caller_callback) {
    class Context : public IAsyncContext {
     public:
      Context(bundle_t* files_, uint64_t new_begin_segment_,
              GcState::truncate_callback_t caller_callback_)
        : files{ files_ }
        , new_begin_segment{ new_begin_segment_ }
        , caller_callback{ caller_callback_ } {
      }
      /// The deep-copy constructor.
      Context(const Context& other)
        : files{ other.files }
        , new_begin_segment{ other.new_begin_segment }
        , caller_callback{ other.caller_callback } {
      }
     protected:
      Status DeepCopy_Internal(IAsyncContext*& context_copy) final {
        return IAsyncContext::DeepCopy_Internal(*this, context_copy);
      }
     public:
      bundle_t* files;
      uint64_t new_begin_segment;
      GcState::truncate_callback_t caller_callback;
    };

    auto callback = [](IAsyncContext* ctxt) {
      CallbackContext<Context> context{ ctxt };
      for(uint64_t idx = context->files->begin_segment; idx < context->new_begin_segment; ++idx) {
        file_t& file = context->files->file(idx);
        file.Close();
        file.Delete();
      }
      std::free(context->files);
      if(context->caller_callback) {
        context->caller_callback(context->new_begin_segment * kSegmentSize);
      }
    };

    // Only one thread can modify the list of files at a given time.
    std::lock_guard<std::mutex> lock{ mutex_ };
    bundle_t* files = files_.load();
    assert(files);
    if(files->begin_segment >= new_begin_segment) {
      // Segments have already been truncated.
      if(caller_callback) {
        caller_callback(files->begin_segment * kSegmentSize);
      }
      return;
    }

    // Make a copy of the list, excluding the files to be truncated.
    void* buffer = std::malloc(bundle_t::size(files->end_segment - new_begin_segment));
    bundle_t* new_files = new(buffer) bundle_t{ handler_, new_begin_segment, files->end_segment,
        *files };
    files_.store(new_files);
    // Delete the old list only after all threads have finished looking at it.
    Context context{ files, new_begin_segment, caller_callback };
    IAsyncContext* context_copy;
    Status result = context.DeepCopy(context_copy);
    assert(result == Status::Ok);
    epoch_->BumpCurrentEpoch(callback, context_copy);
  }

  std::atomic<uint64_t> begin_segment_;
  std::atomic<bundle_t*> files_;
  handler_t* handler_;
  std::string filename_;
  environment::FileOptions file_options_;
  LightEpoch* epoch_;
  std::mutex mutex_;
};

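/// The disk abstraction that FASTER stores its hybrid log and checkpoints on. A minimal
/// usage sketch (not part of this header; it assumes a Linux build where
/// FASTER::environment::QueueIoHandler is available, and a 1 GB segment size):
///
///   typedef FASTER::device::FileSystemDisk<FASTER::environment::QueueIoHandler,
///                                          1073741824ull> disk_t;
///   FASTER::core::LightEpoch epoch;
///   disk_t disk{ "/tmp/faster/", epoch };
///   uint32_t sector = disk.sector_size();  // alignment required for unbuffered I/O
///   disk_t::log_file_t& log = disk.log();  // the segmented log file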
template <class H, uint64_t S>
class FileSystemDisk {
 public:
  typedef H handler_t;
  typedef FileSystemFile<handler_t> file_t;
  typedef FileSystemSegmentedFile<handler_t, S> log_file_t;

 private:
  /// Ensure that the root path ends with a path separator.
  static std::string NormalizePath(std::string root_path) {
    if(root_path.empty() || root_path.back() != FASTER::environment::kPathSeparator[0]) {
      root_path += FASTER::environment::kPathSeparator;
    }
    return root_path;
  }

 public:
  FileSystemDisk(const std::string& root_path, LightEpoch& epoch, bool enablePrivileges = false,
                 bool unbuffered = true, bool delete_on_close = false)
    : root_path_{ NormalizePath(root_path) }
    , handler_{ 16 /*max threads*/ }
    , default_file_options_{ unbuffered, delete_on_close }
    , log_{ root_path_ + "log.log", default_file_options_, &epoch } {
    Status result = log_.Open(&handler_);
    assert(result == Status::Ok);
  }

  /// Methods required by the (implicit) disk interface.
  uint32_t sector_size() const {
    return static_cast<uint32_t>(log_.alignment());
  }

  const log_file_t& log() const {
    return log_;
  }
  log_file_t& log() {
    return log_;
  }

  std::string relative_index_checkpoint_path(const Guid& token) const {
    std::string retval = "index-checkpoints";
    retval += FASTER::environment::kPathSeparator;
    retval += token.ToString();
    retval += FASTER::environment::kPathSeparator;
    return retval;
  }
  std::string index_checkpoint_path(const Guid& token) const {
    return root_path_ + relative_index_checkpoint_path(token);
  }

  std::string relative_cpr_checkpoint_path(const Guid& token) const {
    std::string retval = "cpr-checkpoints";
    retval += FASTER::environment::kPathSeparator;
    retval += token.ToString();
    retval += FASTER::environment::kPathSeparator;
    return retval;
  }
  std::string cpr_checkpoint_path(const Guid& token) const {
    return root_path_ + relative_cpr_checkpoint_path(token);
  }

  /// (Re)create an empty index-checkpoint directory for the given token, discarding any
  /// previous contents.
  void CreateIndexCheckpointDirectory(const Guid& token) {
    std::string index_dir = index_checkpoint_path(token);
    std::experimental::filesystem::path path{ index_dir };
    try {
      std::experimental::filesystem::remove_all(path);
    } catch(std::experimental::filesystem::filesystem_error&) {
      // Ignore; throws when path doesn't exist yet.
    }
    std::experimental::filesystem::create_directories(path);
  }

  /// (Re)create an empty CPR-checkpoint directory for the given token, discarding any
  /// previous contents.
  void CreateCprCheckpointDirectory(const Guid& token) {
    std::string cpr_dir = cpr_checkpoint_path(token);
    std::experimental::filesystem::path path{ cpr_dir };
    try {
      std::experimental::filesystem::remove_all(path);
    } catch(std::experimental::filesystem::filesystem_error&) {
      // Ignore; throws when path doesn't exist yet.
    }
    std::experimental::filesystem::create_directories(path);
  }

  file_t NewFile(const std::string& relative_path) {
    return file_t{ root_path_ + relative_path, default_file_options_ };
  }

  /// Implementation-specific accessor.
  handler_t& handler() {
    return handler_;
  }

  bool TryComplete() {
    return handler_.TryComplete();
  }

 private:
  std::string root_path_;
  handler_t handler_;

  environment::FileOptions default_file_options_;

  /// Store the log (contains all records).
  log_file_t log_;
};

}
} // namespace FASTER::device
529