1 | // Copyright (c) Microsoft Corporation. All rights reserved. |
2 | // Licensed under the MIT license. |
3 | |
4 | #pragma once |
5 | |
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdint>
#include <cstdlib>
#include <experimental/filesystem>
#include <mutex>
#include <new>
#include <string>
#include <system_error>
10 | |
11 | #include "../core/gc_state.h" |
12 | #include "../core/guid.h" |
13 | #include "../core/light_epoch.h" |
14 | #include "../core/utility.h" |
15 | #include "../environment/file.h" |
16 | |
/// Wrappers that expose files to FASTER: single files, segmented files (a logical file split into fixed-size segment files), and the FileSystemDisk that ties them together.
18 | |
19 | namespace FASTER { |
20 | namespace device { |
21 | |
22 | template <class H, uint64_t S> |
23 | class FileSystemDisk; |
24 | |
25 | template <class H> |
26 | class FileSystemFile { |
27 | public: |
28 | typedef H handler_t; |
29 | typedef typename handler_t::async_file_t file_t; |
30 | |
31 | /// Default constructor |
32 | FileSystemFile() |
33 | : file_{} |
34 | , file_options_{} { |
35 | } |
36 | |
37 | FileSystemFile(const std::string& filename, const environment::FileOptions& file_options) |
38 | : file_{ filename } |
39 | , file_options_{ file_options } { |
40 | } |
41 | |
42 | /// Move constructor. |
43 | FileSystemFile(FileSystemFile&& other) |
44 | : file_{ std::move(other.file_) } |
45 | , file_options_{ other.file_options_ } { |
46 | } |
47 | |
48 | /// Move assignment operator. |
49 | FileSystemFile& operator=(FileSystemFile&& other) { |
50 | file_ = std::move(other.file_); |
51 | file_options_ = other.file_options_; |
52 | return *this; |
53 | } |
54 | |
55 | Status Open(handler_t* handler) { |
56 | return file_.Open(FASTER::environment::FileCreateDisposition::OpenOrCreate, file_options_, |
57 | handler, nullptr); |
58 | } |
59 | Status Close() { |
60 | return file_.Close(); |
61 | } |
62 | Status Delete() { |
63 | return file_.Delete(); |
64 | } |
65 | void Truncate(uint64_t new_begin_offset, GcState::truncate_callback_t callback) { |
66 | // Truncation is a no-op. |
67 | if(callback) { |
68 | callback(new_begin_offset); |
69 | } |
70 | } |
71 | |
72 | Status ReadAsync(uint64_t source, void* dest, uint32_t length, |
73 | AsyncIOCallback callback, IAsyncContext& context) const { |
74 | return file_.Read(source, length, reinterpret_cast<uint8_t*>(dest), context, callback); |
75 | } |
76 | Status WriteAsync(const void* source, uint64_t dest, uint32_t length, |
77 | AsyncIOCallback callback, IAsyncContext& context) { |
78 | return file_.Write(dest, length, reinterpret_cast<const uint8_t*>(source), context, callback); |
79 | } |
80 | |
81 | size_t alignment() const { |
82 | return file_.device_alignment(); |
83 | } |
84 | |
85 | private: |
86 | file_t file_; |
87 | environment::FileOptions file_options_; |
88 | }; |
89 | |
90 | /// Manages a bundle of segment files. |
91 | template <class H> |
92 | class FileSystemSegmentBundle { |
93 | public: |
94 | typedef H handler_t; |
95 | typedef FileSystemFile<handler_t> file_t; |
96 | typedef FileSystemSegmentBundle<handler_t> bundle_t; |
97 | |
98 | FileSystemSegmentBundle(const std::string& filename, |
99 | const environment::FileOptions& file_options, handler_t* handler, |
100 | uint64_t begin_segment_, uint64_t end_segment_) |
101 | : filename_{ filename } |
102 | , file_options_{ file_options } |
103 | , begin_segment{ begin_segment_ } |
104 | , end_segment{ end_segment_ } |
105 | , owner_{ true } { |
106 | for(uint64_t idx = begin_segment; idx < end_segment; ++idx) { |
107 | new(files() + (idx - begin_segment)) file_t{ filename_ + std::to_string(idx), |
108 | file_options_ }; |
109 | Status result = file(idx).Open(handler); |
110 | assert(result == Status::Ok); |
111 | } |
112 | } |
113 | |
114 | FileSystemSegmentBundle(handler_t* handler, uint64_t begin_segment_, uint64_t end_segment_, |
115 | bundle_t& other) |
116 | : filename_{ std::move(other.filename_) } |
117 | , file_options_{ other.file_options_ } |
118 | , begin_segment{ begin_segment_ } |
119 | , end_segment{ end_segment_ } |
120 | , owner_{ true } { |
121 | assert(end_segment >= other.end_segment); |
122 | |
123 | uint64_t begin_new = begin_segment; |
124 | uint64_t begin_copy = std::max(begin_segment, other.begin_segment); |
125 | uint64_t end_copy = std::min(end_segment, other.end_segment); |
126 | uint64_t end_new = end_segment; |
127 | |
128 | for(uint64_t idx = begin_segment; idx < begin_copy; ++idx) { |
129 | new(files() + (idx - begin_segment)) file_t{ filename_ + std::to_string(idx), |
130 | file_options_ }; |
131 | Status result = file(idx).Open(handler); |
132 | assert(result == Status::Ok); |
133 | } |
134 | for(uint64_t idx = begin_copy; idx < end_copy; ++idx) { |
135 | // Move file handles for segments already opened. |
136 | new(files() + (idx - begin_segment)) file_t{ std::move(other.file(idx)) }; |
137 | } |
138 | for(uint64_t idx = end_copy; idx < end_new; ++idx) { |
139 | new(files() + (idx - begin_segment)) file_t{ filename_ + std::to_string(idx), |
140 | file_options_ }; |
141 | Status result = file(idx).Open(handler); |
142 | assert(result == Status::Ok); |
143 | } |
144 | |
145 | other.owner_ = false; |
146 | } |
147 | |
148 | ~FileSystemSegmentBundle() { |
149 | if(owner_) { |
150 | for(uint64_t idx = begin_segment; idx < end_segment; ++idx) { |
151 | file(idx).~file_t(); |
152 | } |
153 | } |
154 | } |
155 | |
156 | Status Close() { |
157 | assert(owner_); |
158 | Status result = Status::Ok; |
159 | for(uint64_t idx = begin_segment; idx < end_segment; ++idx) { |
160 | Status r = file(idx).Close(); |
161 | if(r != Status::Ok) { |
162 | // We'll report the last error. |
163 | result = r; |
164 | } |
165 | } |
166 | return result; |
167 | } |
168 | |
169 | Status Delete() { |
170 | assert(owner_); |
171 | Status result = Status::Ok; |
172 | for(uint64_t idx = begin_segment; idx < end_segment; ++idx) { |
173 | Status r = file(idx).Delete(); |
174 | if(r != Status::Ok) { |
175 | // We'll report the last error. |
176 | result = r; |
177 | } |
178 | } |
179 | return result; |
180 | } |
181 | |
182 | file_t* files() { |
183 | return reinterpret_cast<file_t*>(this + 1); |
184 | } |
185 | file_t& file(uint64_t segment) { |
186 | assert(segment >= begin_segment); |
187 | return files()[segment - begin_segment]; |
188 | } |
189 | bool exists(uint64_t segment) const { |
190 | return segment >= begin_segment && segment < end_segment; |
191 | } |
192 | |
193 | static constexpr uint64_t size(uint64_t num_segments) { |
194 | return sizeof(bundle_t) + num_segments * sizeof(file_t); |
195 | } |
196 | |
197 | public: |
198 | const uint64_t begin_segment; |
199 | const uint64_t end_segment; |
200 | private: |
201 | std::string filename_; |
202 | environment::FileOptions file_options_; |
203 | bool owner_; |
204 | }; |
205 | |
206 | template <class H, uint64_t S> |
207 | class FileSystemSegmentedFile { |
208 | public: |
209 | typedef H handler_t; |
210 | typedef FileSystemFile<H> file_t; |
211 | typedef FileSystemSegmentBundle<handler_t> bundle_t; |
212 | |
213 | static constexpr uint64_t kSegmentSize = S; |
214 | static_assert(Utility::IsPowerOfTwo(S), "template parameter S is not a power of two!" ); |
215 | |
216 | FileSystemSegmentedFile(const std::string& filename, |
217 | const environment::FileOptions& file_options, LightEpoch* epoch) |
218 | : begin_segment_{ 0 } |
219 | , files_{ nullptr } |
220 | , handler_{ nullptr } |
221 | , filename_{ filename } |
222 | , file_options_{ file_options } |
223 | , epoch_{ epoch } { |
224 | } |
225 | |
226 | ~FileSystemSegmentedFile() { |
227 | bundle_t* files = files_.load(); |
228 | if(files) { |
229 | files->~bundle_t(); |
230 | std::free(files); |
231 | } |
232 | } |
233 | |
234 | Status Open(handler_t* handler) { |
235 | handler_ = handler; |
236 | return Status::Ok; |
237 | } |
238 | Status Close() { |
239 | return (files_) ? files_->Close() : Status::Ok; |
240 | } |
241 | Status Delete() { |
242 | return (files_) ? files_->Delete() : Status::Ok; |
243 | } |
244 | void Truncate(uint64_t new_begin_offset, GcState::truncate_callback_t callback) { |
245 | uint64_t new_begin_segment = new_begin_offset / kSegmentSize; |
246 | begin_segment_ = new_begin_segment; |
247 | TruncateSegments(new_begin_segment, callback); |
248 | } |
249 | |
250 | Status ReadAsync(uint64_t source, void* dest, uint32_t length, AsyncIOCallback callback, |
251 | IAsyncContext& context) const { |
252 | uint64_t segment = source / kSegmentSize; |
253 | assert(source % kSegmentSize + length <= kSegmentSize); |
254 | |
255 | bundle_t* files = files_.load(); |
256 | |
257 | if(!files || !files->exists(segment)) { |
258 | Status result = const_cast<FileSystemSegmentedFile<H, S>*>(this)->OpenSegment(segment); |
259 | if(result != Status::Ok) { |
260 | return result; |
261 | } |
262 | files = files_.load(); |
263 | } |
264 | return files->file(segment).ReadAsync(source % kSegmentSize, dest, length, callback, context); |
265 | } |
266 | |
267 | Status WriteAsync(const void* source, uint64_t dest, uint32_t length, |
268 | AsyncIOCallback callback, IAsyncContext& context) { |
269 | uint64_t segment = dest / kSegmentSize; |
270 | assert(dest % kSegmentSize + length <= kSegmentSize); |
271 | |
272 | bundle_t* files = files_.load(); |
273 | |
274 | if(!files || !files->exists(segment)) { |
275 | Status result = OpenSegment(segment); |
276 | if(result != Status::Ok) { |
277 | return result; |
278 | } |
279 | files = files_.load(); |
280 | } |
281 | return files->file(segment).WriteAsync(source, dest % kSegmentSize, length, callback, context); |
282 | } |
283 | |
284 | size_t alignment() const { |
285 | return 512; // For now, assume all disks have 512-bytes alignment. |
286 | } |
287 | |
288 | private: |
289 | Status OpenSegment(uint64_t segment) { |
290 | class Context : public IAsyncContext { |
291 | public: |
292 | Context(void* files_) |
293 | : files{ files_ } { |
294 | } |
295 | /// The deep-copy constructor. |
296 | Context(const Context& other) |
297 | : files{ other.files} { |
298 | } |
299 | protected: |
300 | Status DeepCopy_Internal(IAsyncContext*& context_copy) final { |
301 | return IAsyncContext::DeepCopy_Internal(*this, context_copy); |
302 | } |
303 | public: |
304 | void* files; |
305 | }; |
306 | |
307 | auto callback = [](IAsyncContext* ctxt) { |
308 | CallbackContext<Context> context{ ctxt }; |
309 | std::free(context->files); |
310 | }; |
311 | |
312 | // Only one thread can modify the list of files at a given time. |
313 | std::lock_guard<std::mutex> lock{ mutex_ }; |
314 | bundle_t* files = files_.load(); |
315 | |
316 | if(segment < begin_segment_) { |
317 | // The requested segment has been truncated. |
318 | return Status::IOError; |
319 | } |
320 | if(files && files->exists(segment)) { |
321 | // Some other thread already opened this segment for us. |
322 | return Status::Ok; |
323 | } |
324 | |
325 | if(!files) { |
326 | // First segment opened. |
327 | void* buffer = std::malloc(bundle_t::size(1)); |
328 | bundle_t* new_files = new(buffer) bundle_t{ filename_, file_options_, handler_, |
329 | segment, segment + 1 }; |
330 | files_.store(new_files); |
331 | return Status::Ok; |
332 | } |
333 | |
334 | // Expand the list of files_. |
335 | uint64_t new_begin_segment = std::min(files->begin_segment, segment); |
336 | uint64_t new_end_segment = std::max(files->end_segment, segment + 1); |
337 | void* buffer = std::malloc(bundle_t::size(new_end_segment - new_begin_segment)); |
338 | bundle_t* new_files = new(buffer) bundle_t{ handler_, new_begin_segment, new_end_segment, |
339 | *files }; |
340 | files_.store(new_files); |
341 | // Delete the old list only after all threads have finished looking at it. |
342 | Context context{ files }; |
343 | IAsyncContext* context_copy; |
344 | Status result = context.DeepCopy(context_copy); |
345 | assert(result == Status::Ok); |
346 | epoch_->BumpCurrentEpoch(callback, context_copy); |
347 | return Status::Ok; |
348 | } |
349 | |
350 | void TruncateSegments(uint64_t new_begin_segment, GcState::truncate_callback_t caller_callback) { |
351 | class Context : public IAsyncContext { |
352 | public: |
353 | Context(bundle_t* files_, uint64_t new_begin_segment_, |
354 | GcState::truncate_callback_t caller_callback_) |
355 | : files{ files_ } |
356 | , new_begin_segment{ new_begin_segment_ } |
357 | , caller_callback{ caller_callback_ } { |
358 | } |
359 | /// The deep-copy constructor. |
360 | Context(const Context& other) |
361 | : files{ other.files } |
362 | , new_begin_segment{ other.new_begin_segment } |
363 | , caller_callback{ other.caller_callback } { |
364 | } |
365 | protected: |
366 | Status DeepCopy_Internal(IAsyncContext*& context_copy) final { |
367 | return IAsyncContext::DeepCopy_Internal(*this, context_copy); |
368 | } |
369 | public: |
370 | bundle_t* files; |
371 | uint64_t new_begin_segment; |
372 | GcState::truncate_callback_t caller_callback; |
373 | }; |
374 | |
375 | auto callback = [](IAsyncContext* ctxt) { |
376 | CallbackContext<Context> context{ ctxt }; |
377 | for(uint64_t idx = context->files->begin_segment; idx < context->new_begin_segment; ++idx) { |
378 | file_t& file = context->files->file(idx); |
379 | file.Close(); |
380 | file.Delete(); |
381 | } |
382 | std::free(context->files); |
383 | if(context->caller_callback) { |
384 | context->caller_callback(context->new_begin_segment * kSegmentSize); |
385 | } |
386 | }; |
387 | |
388 | // Only one thread can modify the list of files at a given time. |
389 | std::lock_guard<std::mutex> lock{ mutex_ }; |
390 | bundle_t* files = files_.load(); |
391 | assert(files); |
392 | if(files->begin_segment >= new_begin_segment) { |
393 | // Segments have already been truncated. |
394 | if(caller_callback) { |
395 | caller_callback(files->begin_segment * kSegmentSize); |
396 | } |
397 | return; |
398 | } |
399 | |
400 | // Make a copy of the list, excluding the files to be truncated. |
401 | void* buffer = std::malloc(bundle_t::size(files->end_segment - new_begin_segment)); |
402 | bundle_t* new_files = new(buffer) bundle_t{ handler_, new_begin_segment, files->end_segment, |
403 | *files }; |
404 | files_.store(new_files); |
405 | // Delete the old list only after all threads have finished looking at it. |
406 | Context context{ files, new_begin_segment, caller_callback }; |
407 | IAsyncContext* context_copy; |
408 | Status result = context.DeepCopy(context_copy); |
409 | assert(result == Status::Ok); |
410 | epoch_->BumpCurrentEpoch(callback, context_copy); |
411 | } |
412 | |
413 | std::atomic<uint64_t> begin_segment_; |
414 | std::atomic<bundle_t*> files_; |
415 | handler_t* handler_; |
416 | std::string filename_; |
417 | environment::FileOptions file_options_; |
418 | LightEpoch* epoch_; |
419 | std::mutex mutex_; |
420 | }; |
421 | |
422 | template <class H, uint64_t S> |
423 | class FileSystemDisk { |
424 | public: |
425 | typedef H handler_t; |
426 | typedef FileSystemFile<handler_t> file_t; |
427 | typedef FileSystemSegmentedFile<handler_t, S> log_file_t; |
428 | |
429 | private: |
430 | static std::string NormalizePath(std::string root_path) { |
431 | if(root_path.empty() || root_path.back() != FASTER::environment::kPathSeparator[0]) { |
432 | root_path += FASTER::environment::kPathSeparator; |
433 | } |
434 | return root_path; |
435 | } |
436 | |
437 | public: |
438 | FileSystemDisk(const std::string& root_path, LightEpoch& epoch, bool enablePrivileges = false, |
439 | bool unbuffered = true, bool delete_on_close = false) |
440 | : root_path_{ NormalizePath(root_path) } |
441 | , handler_{ 16 /*max threads*/ } |
442 | , default_file_options_{ unbuffered, delete_on_close } |
443 | , log_{ root_path_ + "log.log" , default_file_options_, &epoch} { |
444 | Status result = log_.Open(&handler_); |
445 | assert(result == Status::Ok); |
446 | } |
447 | |
448 | /// Methods required by the (implicit) disk interface. |
449 | uint32_t sector_size() const { |
450 | return static_cast<uint32_t>(log_.alignment()); |
451 | } |
452 | |
453 | const log_file_t& log() const { |
454 | return log_; |
455 | } |
456 | log_file_t& log() { |
457 | return log_; |
458 | } |
459 | |
460 | std::string relative_index_checkpoint_path(const Guid& token) const { |
461 | std::string retval = "index-checkpoints" ; |
462 | retval += FASTER::environment::kPathSeparator; |
463 | retval += token.ToString(); |
464 | retval += FASTER::environment::kPathSeparator; |
465 | return retval; |
466 | } |
467 | std::string index_checkpoint_path(const Guid& token) const { |
468 | return root_path_ + relative_index_checkpoint_path(token); |
469 | } |
470 | |
471 | std::string relative_cpr_checkpoint_path(const Guid& token) const { |
472 | std::string retval = "cpr-checkpoints" ; |
473 | retval += FASTER::environment::kPathSeparator; |
474 | retval += token.ToString(); |
475 | retval += FASTER::environment::kPathSeparator; |
476 | return retval; |
477 | } |
478 | std::string cpr_checkpoint_path(const Guid& token) const { |
479 | return root_path_ + relative_cpr_checkpoint_path(token); |
480 | } |
481 | |
482 | void CreateIndexCheckpointDirectory(const Guid& token) { |
483 | std::string index_dir = index_checkpoint_path(token); |
484 | std::experimental::filesystem::path path{ index_dir }; |
485 | try { |
486 | std::experimental::filesystem::remove_all(path); |
487 | } catch(std::experimental::filesystem::filesystem_error&) { |
488 | // Ignore; throws when path doesn't exist yet. |
489 | } |
490 | std::experimental::filesystem::create_directories(path); |
491 | } |
492 | |
493 | void CreateCprCheckpointDirectory(const Guid& token) { |
494 | std::string cpr_dir = cpr_checkpoint_path(token); |
495 | std::experimental::filesystem::path path{ cpr_dir }; |
496 | try { |
497 | std::experimental::filesystem::remove_all(path); |
498 | } catch(std::experimental::filesystem::filesystem_error&) { |
499 | // Ignore; throws when path doesn't exist yet. |
500 | } |
501 | std::experimental::filesystem::create_directories(path); |
502 | } |
503 | |
504 | file_t NewFile(const std::string& relative_path) { |
505 | return file_t{ root_path_ + relative_path, default_file_options_ }; |
506 | } |
507 | |
508 | /// Implementation-specific accessor. |
509 | handler_t& handler() { |
510 | return handler_; |
511 | } |
512 | |
513 | bool TryComplete() { |
514 | return handler_.TryComplete(); |
515 | } |
516 | |
517 | private: |
518 | std::string root_path_; |
519 | handler_t handler_; |
520 | |
521 | environment::FileOptions default_file_options_; |
522 | |
523 | /// Store the log (contains all records). |
524 | log_file_t log_; |
525 | }; |
526 | |
527 | } |
528 | } // namespace FASTER::device |
529 | |