// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

#pragma once

#include <cassert>
#include <cstdint>
#include <cstring>

#include "alloc.h"
#include "utility.h"

#ifdef _WIN32
#include <intrin.h>
#pragma intrinsic(_BitScanReverse)

/// Microsoft's concurrency::concurrent_queue is based on Intel's tbb::concurrent_queue.
#include <concurrent_queue.h>
template <typename T>
using concurrent_queue = concurrency::concurrent_queue<T>;
#else
namespace FASTER {
/// Convert GCC's __builtin_clz() to Microsoft's _BitScanReverse.
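/// For example, _BitScanReverse(&index, 0x10) sets index to 4 (the bit position of the
/// most-significant set bit) and returns a nonzero value.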
inline uint8_t _BitScanReverse(unsigned long* index, uint32_t mask) {
  if(mask == 0) {
    // Match _BitScanReverse: when the mask is zero, *index is left undefined and the
    // return value is zero. (__builtin_clz(0) is undefined behavior, so don't call it.)
    return 0;
  }
  *index = 31 - __builtin_clz(mask);
  return 1;
}
} // namespace FASTER

#include <tbb/concurrent_queue.h>
template <typename T>
using concurrent_queue = tbb::concurrent_queue<T>;
#endif

namespace FASTER {
namespace core {

/// A buffer pool used for file I/Os.

class NativeSectorAlignedBufferPool;

/// A sector-aligned memory block, along with offsets into the block.
class SectorAlignedMemory {
 public:
  /// Default constructor.
  SectorAlignedMemory()
    : buffer_{ nullptr }
    , valid_offset{ 0 }
    , required_bytes{ 0 }
    , available_bytes{ 0 }
    , level_{ 0 }
    , pool_{ nullptr } {
  }
  SectorAlignedMemory(uint8_t* buffer, uint32_t level, NativeSectorAlignedBufferPool* pool)
    : buffer_{ buffer }
    , valid_offset{ 0 }
    , required_bytes{ 0 }
    , available_bytes{ 0 }
    , level_{ level }
    , pool_{ pool } {
  }
  /// No copy constructor.
  SectorAlignedMemory(const SectorAlignedMemory&) = delete;
  /// Move constructor.
  SectorAlignedMemory(SectorAlignedMemory&& other)
    : buffer_{ other.buffer_ }
    , valid_offset{ other.valid_offset }
    , required_bytes{ other.required_bytes }
    , available_bytes{ other.available_bytes }
    , level_{ other.level_ }
    , pool_{ other.pool_ } {
    other.buffer_ = nullptr;
    other.pool_ = nullptr;
  }

  inline ~SectorAlignedMemory();

  /// Move assignment operator.
  inline SectorAlignedMemory& operator=(SectorAlignedMemory&& other);

  inline void CopyValidBytesToAddress(uint8_t* pt) const {
    std::memcpy(pt, &buffer_[valid_offset], required_bytes);
  }
  inline uint8_t* GetValidPointer() {
    return &buffer_[valid_offset];
  }
  inline uint8_t* buffer() {
    return buffer_;
  }

 private:
  uint8_t* buffer_;
 public:
  /// Offset, within buffer_, at which the valid data begins (see GetValidPointer()).
  uint32_t valid_offset;
  /// Number of valid bytes at valid_offset; copied by CopyValidBytesToAddress().
  uint32_t required_bytes;
  /// Number of bytes available at valid_offset (set by callers; unused within this header).
  uint32_t available_bytes;
 private:
  uint32_t level_;
  NativeSectorAlignedBufferPool* pool_;
};
static_assert(sizeof(SectorAlignedMemory) == 32, "sizeof(SectorAlignedMemory) != 32");

/// An aligned buffer pool is a pool of sector-aligned memory.
/// Internally, it is organized as an array of concurrent queues, where each queue caches
/// buffers of one size: queue_[i] contains memory segments, each of size (2^i * sectorSize).
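/// For example, with a (hypothetical) 512-byte sector size, queue_[0] caches 512-byte
/// buffers, queue_[1] caches 1024-byte buffers, and queue_[3] caches 4096-byte buffers.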
class NativeSectorAlignedBufferPool {
 private:
  static constexpr uint32_t kLevels = 32;

 public:
  NativeSectorAlignedBufferPool(uint32_t recordSize, uint32_t sectorSize)
    : record_size_{ recordSize }
    , sector_size_{ sectorSize } {
  }

  inline void Return(uint32_t level, uint8_t* buffer) {
    assert(level < kLevels);
    queue_[level].push(buffer);
  }
  inline SectorAlignedMemory Get(uint32_t numRecords);

 private:
  uint32_t Level(uint32_t sectors) {
    assert(sectors > 0);
    if(sectors == 1) {
      return 0;
    }
    // BSR returns the index k of the most-significant 1 bit of (sectors - 1). So
    // 2^(k+1) > (sectors - 1) >= 2^k, which means 2^(k+1) >= sectors > 2^k.
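    // For example, sectors == 5 gives BSR(4) == 2, so Level() returns 3 and the caller is
    // served from queue_[3] (buffers of 2^3 == 8 sectors, the smallest power of two >= 5).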
    unsigned long k;
    _BitScanReverse(&k, sectors - 1);
    return k + 1;
  }

  uint32_t record_size_;
  uint32_t sector_size_;
  /// Level 0 caches memory allocations of size (sectorSize); level n caches allocations of
  /// size (sectorSize * 2^n).
  concurrent_queue<uint8_t*> queue_[kLevels];
};
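
/// Usage sketch (illustrative only; the record and sector sizes below are hypothetical):
///
///   NativeSectorAlignedBufferPool pool{ 64 /*recordSize*/, 512 /*sectorSize*/ };
///   SectorAlignedMemory memory = pool.Get(10);   // 10 * 64 = 640 bytes -> 2 sectors -> level 1
///   uint8_t* io_target = memory.buffer();        // sector-aligned; suitable for file I/O
///   // ... read into io_target, then consume memory.GetValidPointer() ...
///   // The buffer is returned to the pool when `memory` goes out of scope.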

/// Implementations.
inline SectorAlignedMemory& SectorAlignedMemory::operator=(SectorAlignedMemory&& other) {
  if(buffer_ == other.buffer_) {
    // Self-assignment is a no-op.
    return *this;
  }
  if(buffer_ != nullptr) {
    // Return our buffer to the pool, before taking ownership of a new buffer.
    pool_->Return(level_, buffer_);
  }
  buffer_ = other.buffer_;
  valid_offset = other.valid_offset;
  required_bytes = other.required_bytes;
  available_bytes = other.available_bytes;
  level_ = other.level_;
  pool_ = other.pool_;

  // We own the buffer now; other SectorAlignedMemory does not.
  other.buffer_ = nullptr;
  other.pool_ = nullptr;
  return *this;
}

inline SectorAlignedMemory::~SectorAlignedMemory() {
  if(buffer_) {
    pool_->Return(level_, buffer_);
  }
}

inline SectorAlignedMemory NativeSectorAlignedBufferPool::Get(uint32_t numRecords) {
  // How many sectors do we need?
  uint32_t sectors_required = (numRecords * record_size_ + sector_size_ - 1) / sector_size_;
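  // For example (hypothetical sizes): 10 records of 64 bytes with 512-byte sectors need
  // ceil(640 / 512) == 2 sectors, which Level() rounds up to the level-1 (2-sector) queue.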
  uint32_t level = Level(sectors_required);
  uint8_t* buffer;
  if(queue_[level].try_pop(buffer)) {
    // Reuse a cached buffer of the right size.
    return SectorAlignedMemory{ buffer, level, this };
  } else {
    // No cached buffer at this level; allocate a new sector-aligned block of
    // (sector_size_ * 2^level) bytes.
    buffer = reinterpret_cast<uint8_t*>(aligned_alloc(sector_size_,
                                        sector_size_ * (1 << level)));
    return SectorAlignedMemory{ buffer, level, this };
  }
}

} // namespace core
} // namespace FASTER