1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include "arrow/memory_pool.h"
19
20#include <algorithm> // IWYU pragma: keep
21#include <cstdlib> // IWYU pragma: keep
22#include <cstring> // IWYU pragma: keep
23#include <iostream> // IWYU pragma: keep
24#include <limits>
25#include <memory>
26#include <sstream> // IWYU pragma: keep
27
28#include "arrow/status.h"
29#include "arrow/util/logging.h" // IWYU pragma: keep
30
31#ifdef ARROW_JEMALLOC
32// Needed to support jemalloc 3 and 4
33#define JEMALLOC_MANGLE
34// Explicitly link to our version of jemalloc
35#include "jemalloc_ep/dist/include/jemalloc/jemalloc.h"
36#endif
37
38namespace arrow {
39
// Alignment, in bytes, requested from the underlying allocator for every
// buffer handed out by the allocation helpers in this file.
constexpr size_t kAlignment{64};
41
42namespace {
43
44// A static piece of memory for 0-size allocations, so as to return
45// an aligned non-null pointer.
46alignas(kAlignment) static uint8_t zero_size_area[1];
47
48// Allocate memory according to the alignment requirements for Arrow
49// (as of May 2016 64 bytes)
50Status AllocateAligned(int64_t size, uint8_t** out) {
51 // TODO(emkornfield) find something compatible with windows
52 if (size < 0) {
53 return Status::Invalid("negative malloc size");
54 }
55 if (size == 0) {
56 *out = zero_size_area;
57 return Status::OK();
58 }
59 if (static_cast<uint64_t>(size) >= std::numeric_limits<size_t>::max()) {
60 return Status::CapacityError("malloc size overflows size_t");
61 }
62#ifdef _WIN32
63 // Special code path for Windows
64 *out =
65 reinterpret_cast<uint8_t*>(_aligned_malloc(static_cast<size_t>(size), kAlignment));
66 if (!*out) {
67 return Status::OutOfMemory("malloc of size ", size, " failed");
68 }
69#elif defined(ARROW_JEMALLOC)
70 *out = reinterpret_cast<uint8_t*>(
71 mallocx(static_cast<size_t>(size), MALLOCX_ALIGN(kAlignment)));
72 if (*out == NULL) {
73 return Status::OutOfMemory("malloc of size ", size, " failed");
74 }
75#else
76 const int result = posix_memalign(reinterpret_cast<void**>(out), kAlignment,
77 static_cast<size_t>(size));
78 if (result == ENOMEM) {
79 return Status::OutOfMemory("malloc of size ", size, " failed");
80 }
81
82 if (result == EINVAL) {
83 return Status::Invalid("invalid alignment parameter: ", kAlignment);
84 }
85#endif
86 return Status::OK();
87}
88
89void DeallocateAligned(uint8_t* ptr, int64_t size) {
90 if (ptr == zero_size_area) {
91 DCHECK_EQ(size, 0);
92 } else {
93#ifdef _WIN32
94 _aligned_free(ptr);
95#elif defined(ARROW_JEMALLOC)
96 dallocx(ptr, MALLOCX_ALIGN(kAlignment));
97#else
98 std::free(ptr);
99#endif
100 }
101}
102
103Status ReallocateAligned(int64_t old_size, int64_t new_size, uint8_t** ptr) {
104 uint8_t* previous_ptr = *ptr;
105 if (previous_ptr == zero_size_area) {
106 DCHECK_EQ(old_size, 0);
107 return AllocateAligned(new_size, ptr);
108 }
109 if (new_size == 0) {
110 DeallocateAligned(previous_ptr, old_size);
111 *ptr = zero_size_area;
112 return Status::OK();
113 }
114#ifdef ARROW_JEMALLOC
115 if (new_size < 0) {
116 return Status::Invalid("negative realloc size");
117 }
118 if (static_cast<uint64_t>(new_size) >= std::numeric_limits<size_t>::max()) {
119 return Status::CapacityError("realloc overflows size_t");
120 }
121 *ptr = reinterpret_cast<uint8_t*>(
122 rallocx(*ptr, static_cast<size_t>(new_size), MALLOCX_ALIGN(kAlignment)));
123 if (*ptr == NULL) {
124 *ptr = previous_ptr;
125 return Status::OutOfMemory("realloc of size ", new_size, " failed");
126 }
127#else
128 // Note: We cannot use realloc() here as it doesn't guarantee alignment.
129
130 // Allocate new chunk
131 uint8_t* out = nullptr;
132 RETURN_NOT_OK(AllocateAligned(new_size, &out));
133 DCHECK(out);
134 // Copy contents and release old memory chunk
135 memcpy(out, *ptr, static_cast<size_t>(std::min(new_size, old_size)));
136#ifdef _WIN32
137 _aligned_free(*ptr);
138#else
139 std::free(*ptr);
140#endif // defined(_MSC_VER)
141 *ptr = out;
142#endif // defined(ARROW_JEMALLOC)
143
144 return Status::OK();
145}
146
147} // namespace
148
149MemoryPool::MemoryPool() {}
150
151MemoryPool::~MemoryPool() {}
152
153int64_t MemoryPool::max_memory() const { return -1; }
154
155///////////////////////////////////////////////////////////////////////
156// Default MemoryPool implementation
157
158class DefaultMemoryPool : public MemoryPool {
159 public:
160 ~DefaultMemoryPool() override {}
161
162 Status Allocate(int64_t size, uint8_t** out) override {
163 RETURN_NOT_OK(AllocateAligned(size, out));
164
165 stats_.UpdateAllocatedBytes(size);
166 return Status::OK();
167 }
168
169 Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override {
170 RETURN_NOT_OK(ReallocateAligned(old_size, new_size, ptr));
171
172 stats_.UpdateAllocatedBytes(new_size - old_size);
173 return Status::OK();
174 }
175
176 int64_t bytes_allocated() const override { return stats_.bytes_allocated(); }
177
178 void Free(uint8_t* buffer, int64_t size) override {
179 DeallocateAligned(buffer, size);
180
181 stats_.UpdateAllocatedBytes(-size);
182 }
183
184 int64_t max_memory() const override { return stats_.max_memory(); }
185
186 private:
187 internal::MemoryPoolStats stats_;
188};
189
190std::unique_ptr<MemoryPool> MemoryPool::CreateDefault() {
191 return std::unique_ptr<MemoryPool>(new DefaultMemoryPool);
192}
193
194MemoryPool* default_memory_pool() {
195 static DefaultMemoryPool default_memory_pool_;
196 return &default_memory_pool_;
197}
198
199///////////////////////////////////////////////////////////////////////
200// LoggingMemoryPool implementation
201
202LoggingMemoryPool::LoggingMemoryPool(MemoryPool* pool) : pool_(pool) {}
203
204Status LoggingMemoryPool::Allocate(int64_t size, uint8_t** out) {
205 Status s = pool_->Allocate(size, out);
206 std::cout << "Allocate: size = " << size << std::endl;
207 return s;
208}
209
210Status LoggingMemoryPool::Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) {
211 Status s = pool_->Reallocate(old_size, new_size, ptr);
212 std::cout << "Reallocate: old_size = " << old_size << " - new_size = " << new_size
213 << std::endl;
214 return s;
215}
216
217void LoggingMemoryPool::Free(uint8_t* buffer, int64_t size) {
218 pool_->Free(buffer, size);
219 std::cout << "Free: size = " << size << std::endl;
220}
221
222int64_t LoggingMemoryPool::bytes_allocated() const {
223 int64_t nb_bytes = pool_->bytes_allocated();
224 std::cout << "bytes_allocated: " << nb_bytes << std::endl;
225 return nb_bytes;
226}
227
228int64_t LoggingMemoryPool::max_memory() const {
229 int64_t mem = pool_->max_memory();
230 std::cout << "max_memory: " << mem << std::endl;
231 return mem;
232}
233
234///////////////////////////////////////////////////////////////////////
235// ProxyMemoryPool implementation
236
237class ProxyMemoryPool::ProxyMemoryPoolImpl {
238 public:
239 explicit ProxyMemoryPoolImpl(MemoryPool* pool) : pool_(pool) {}
240
241 Status Allocate(int64_t size, uint8_t** out) {
242 RETURN_NOT_OK(pool_->Allocate(size, out));
243 stats_.UpdateAllocatedBytes(size);
244 return Status::OK();
245 }
246
247 Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) {
248 RETURN_NOT_OK(pool_->Reallocate(old_size, new_size, ptr));
249 stats_.UpdateAllocatedBytes(new_size - old_size);
250 return Status::OK();
251 }
252
253 void Free(uint8_t* buffer, int64_t size) {
254 pool_->Free(buffer, size);
255 stats_.UpdateAllocatedBytes(-size);
256 }
257
258 int64_t bytes_allocated() const { return stats_.bytes_allocated(); }
259
260 int64_t max_memory() const { return stats_.max_memory(); }
261
262 private:
263 MemoryPool* pool_;
264 internal::MemoryPoolStats stats_;
265};
266
267ProxyMemoryPool::ProxyMemoryPool(MemoryPool* pool) {
268 impl_.reset(new ProxyMemoryPoolImpl(pool));
269}
270
271ProxyMemoryPool::~ProxyMemoryPool() {}
272
273Status ProxyMemoryPool::Allocate(int64_t size, uint8_t** out) {
274 return impl_->Allocate(size, out);
275}
276
277Status ProxyMemoryPool::Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) {
278 return impl_->Reallocate(old_size, new_size, ptr);
279}
280
281void ProxyMemoryPool::Free(uint8_t* buffer, int64_t size) {
282 return impl_->Free(buffer, size);
283}
284
285int64_t ProxyMemoryPool::bytes_allocated() const { return impl_->bytes_allocated(); }
286
287int64_t ProxyMemoryPool::max_memory() const { return impl_->max_memory(); }
288
289} // namespace arrow
290