// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

#pragma once

#include <cassert>
#include <cstdint>
#include <cstring>

#include "alloc.h"
#include "utility.h"

#ifdef _WIN32
#include <intrin.h>
#pragma intrinsic(_BitScanReverse)

/// Microsoft's concurrency::concurrent_queue is based on Intel's tbb::concurrent_queue.
#include <concurrent_queue.h>
template <typename T>
using concurrent_queue = concurrency::concurrent_queue<T>;
#else
namespace FASTER {
/// Convert GCC's __builtin_clz() to Microsoft's _BitScanReverse.
inline uint8_t _BitScanReverse(unsigned long* index, uint32_t mask) {
  if(mask == 0) {
    // __builtin_clz() is undefined for 0; like BSR, report that no set bit was found.
    return 0;
  }
  *index = 31 - __builtin_clz(mask);
  return 1;
}
}

#include <tbb/concurrent_queue.h>
template <typename T>
using concurrent_queue = tbb::concurrent_queue<T>;
#endif
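
// Illustrative check of the shim's contract (not compiled into this header):
// the most-significant set bit of 0x14 (0b10100) is bit 4, so on either code
// path above,
//   unsigned long index;
//   _BitScanReverse(&index, 0x14);  // returns nonzero and sets index = 4.
//   _BitScanReverse(&index, 0);     // returns 0; index is left unspecified.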

namespace FASTER {
namespace core {

/// A buffer pool used for file I/Os.

class NativeSectorAlignedBufferPool;

/// A sector-aligned memory block, along with offsets into the block.
class SectorAlignedMemory {
 public:
  /// Default constructor.
  SectorAlignedMemory()
    : buffer_{ nullptr }
    , valid_offset{ 0 }
    , required_bytes{ 0 }
    , available_bytes{ 0 }
    , level_{ 0 }
    , pool_{ nullptr } {
  }
  SectorAlignedMemory(uint8_t* buffer, uint32_t level, NativeSectorAlignedBufferPool* pool)
    : buffer_{ buffer }
    , valid_offset{ 0 }
    , required_bytes{ 0 }
    , available_bytes{ 0 }
    , level_{ level }
    , pool_{ pool } {
  }
  /// No copy constructor.
  SectorAlignedMemory(const SectorAlignedMemory&) = delete;
  /// Move constructor.
  SectorAlignedMemory(SectorAlignedMemory&& other)
    : buffer_{ other.buffer_ }
    , valid_offset{ other.valid_offset }
    , required_bytes{ other.required_bytes }
    , available_bytes{ other.available_bytes }
    , level_{ other.level_ }
    , pool_{ other.pool_ } {
    other.buffer_ = nullptr;
    other.pool_ = nullptr;
  }

  inline ~SectorAlignedMemory();

  /// Move assignment operator.
  inline SectorAlignedMemory& operator=(SectorAlignedMemory&& other);

  inline void CopyValidBytesToAddress(uint8_t* pt) const {
    std::memcpy(pt, &buffer_[valid_offset], required_bytes);
  }
  inline uint8_t* GetValidPointer() {
    return &buffer_[valid_offset];
  }
  inline uint8_t* buffer() {
    return buffer_;
  }

 private:
  uint8_t* buffer_;
 public:
  uint32_t valid_offset;
  uint32_t required_bytes;
  uint32_t available_bytes;
 private:
  uint32_t level_;
  NativeSectorAlignedBufferPool* pool_;
};
static_assert(sizeof(SectorAlignedMemory) == 32, "sizeof(SectorAlignedMemory) != 32");
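
/// Sketch of the intended use of the offsets (an assumption inferred from the
/// fields above, not spelled out in this header): an unbuffered read must be
/// sector-aligned, so the requested payload usually starts mid-block.
/// valid_offset says where it starts, required_bytes how many bytes were
/// requested, and available_bytes how many usable bytes follow valid_offset;
/// CopyValidBytesToAddress() then copies exactly the requested payload out of
/// the aligned block.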

/// Aligned buffer pool is a pool of memory.
/// Internally, it is organized as an array of concurrent queues, where each
/// concurrent queue holds memory segments whose sizes fall in a particular
/// range: queue_[i] contains segments of size (2^i * sectorSize) each.
class NativeSectorAlignedBufferPool {
 private:
  static constexpr uint32_t kLevels = 32;

 public:
  NativeSectorAlignedBufferPool(uint32_t recordSize, uint32_t sectorSize)
    : record_size_{ recordSize }
    , sector_size_{ sectorSize } {
  }

  inline void Return(uint32_t level, uint8_t* buffer) {
    assert(level < kLevels);
    queue_[level].push(buffer);
  }
  inline SectorAlignedMemory Get(uint32_t numRecords);

 private:
  uint32_t Level(uint32_t sectors) {
    assert(sectors > 0);
    if(sectors == 1) {
      return 0;
    }
    // BSR returns the index k of the most-significant 1 bit. So 2^(k+1) > (sectors - 1) >= 2^k,
    // which means 2^(k+1) >= sectors > 2^k.
    unsigned long k;
    _BitScanReverse(&k, sectors - 1);
    return k + 1;
  }
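  // For example: Level(1) == 0, Level(2) == 1, Level(3) == Level(4) == 2, and
  // Level(5) == 3. In other words, Level() computes ceil(log2(sectors)),
  // rounding each request up to the nearest power-of-two number of sectors.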

  uint32_t record_size_;
  uint32_t sector_size_;
  /// queue_[n] caches memory allocations of size (sectorSize * 2^n); level 0
  /// therefore caches single-sector allocations.
  concurrent_queue<uint8_t*> queue_[kLevels];
};
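
/// Illustrative usage (the parameter values are assumptions, not defaults):
///   NativeSectorAlignedBufferPool pool{ /*recordSize=*/ 8, /*sectorSize=*/ 512 };
///   SectorAlignedMemory memory = pool.Get(/*numRecords=*/ 100);
///   // memory.buffer() is 512-byte aligned; when `memory` goes out of scope,
///   // its block is returned to the pool for reuse rather than freed.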

/// Implementations.
inline SectorAlignedMemory& SectorAlignedMemory::operator=(SectorAlignedMemory&& other) {
  if(buffer_ == other.buffer_) {
    // Self-assignment is a no-op.
    return *this;
  }
  if(buffer_ != nullptr) {
    // Return our buffer to the pool, before taking ownership of a new buffer.
    pool_->Return(level_, buffer_);
  }
  buffer_ = other.buffer_;
  valid_offset = other.valid_offset;
  required_bytes = other.required_bytes;
  available_bytes = other.available_bytes;
  level_ = other.level_;
  pool_ = other.pool_;

  // We own the buffer now; other SectorAlignedMemory does not.
  other.buffer_ = nullptr;
  other.pool_ = nullptr;
  return *this;
}
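
// Illustrative consequence of the operator above (hypothetical names): after
//   dst = std::move(src);
// dst's previous block, if any, has been pushed back onto its pool level, dst
// owns src's block, and src is left empty (null buffer and pool), so its
// destructor is a no-op.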

inline SectorAlignedMemory::~SectorAlignedMemory() {
  if(buffer_) {
    pool_->Return(level_, buffer_);
  }
}

inline SectorAlignedMemory NativeSectorAlignedBufferPool::Get(uint32_t numRecords) {
  // How many sectors do we need?
  uint32_t sectors_required = (numRecords * record_size_ + sector_size_ - 1) / sector_size_;
  uint32_t level = Level(sectors_required);
  uint8_t* buffer;
  if(queue_[level].try_pop(buffer)) {
    return SectorAlignedMemory{ buffer, level, this };
  }
  // Nothing cached at this level; allocate a fresh aligned block. Note the
  // unsigned literal: a signed (1 << level) would overflow at level 31.
  buffer = reinterpret_cast<uint8_t*>(aligned_alloc(sector_size_,
           sector_size_ * (UINT64_C(1) << level)));
  return SectorAlignedMemory{ buffer, level, this };
}
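
// Worked example of the sizing math above (values are assumptions): with
// record_size_ == 8 and sector_size_ == 512, Get(100) needs 800 bytes, so
// sectors_required == (800 + 511) / 512 == 2, level == Level(2) == 1, and a
// cache miss allocates 512 * 2 == 1024 sector-aligned bytes.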

} // namespace core
} // namespace FASTER