// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef ARROW_UTIL_MEMORY_H
#define ARROW_UTIL_MEMORY_H

#include <cstdint>  // uint8_t, int64_t, uintptr_t
#include <cstring>  // memcpy
#include <future>   // std::future
#include <vector>

#include "arrow/util/thread-pool.h"

namespace arrow {
namespace internal {

// Applies a bitwise AND to the numeric value of `address`. With a mask of
// ~(block_size - 1), where block_size is a power of two, this rounds the
// pointer down to the previous block_size-aligned boundary.
inline const uint8_t* pointer_logical_and(const uint8_t* address, uintptr_t bits) {
  uintptr_t value = reinterpret_cast<uintptr_t>(address);
  return reinterpret_cast<const uint8_t*>(value & bits);
}
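
// Illustration (hypothetical values): with block_size = 64 the mask
// ~(block_size - 1) is ~0x3F, so an address whose value is 0x1017 maps to
// 0x1000, the start of the 64-byte block that contains it.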

// This wrapper exists only to avoid a MinGW-w64 32-bit crash.
// See also: https://sourceforge.net/p/mingw-w64/bugs/767/
inline void* wrap_memcpy(void* dst, const void* src, size_t n) {
  return memcpy(dst, src, n);
}

// A helper function that performs memcpy with multiple threads. This is
// required to saturate the memory bandwidth of modern CPUs.
inline void parallel_memcopy(uint8_t* dst, const uint8_t* src, int64_t nbytes,
                             uintptr_t block_size, int num_threads) {
  // XXX This function really uses `num_threads + 1` threads: the calling
  // thread copies the unaligned prefix and suffix itself (see below).
  auto pool = GetCpuThreadPool();

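  // Align the bulk of the copy to block_size boundaries of the source:
  // `left` is `src` rounded up to the next block boundary, `right` is
  // `src + nbytes` rounded down. This assumes block_size is a power of two.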
  const uint8_t* left = pointer_logical_and(src + block_size - 1, ~(block_size - 1));
  const uint8_t* right = pointer_logical_and(src + nbytes, ~(block_size - 1));
  int64_t num_blocks = (right - left) / block_size;

  // Pull `right` back so that the aligned region is an exact multiple of
  // `num_threads` blocks; the bytes dropped here become part of the suffix.
  right = right - (num_blocks % num_threads) * block_size;

  // Now we divide these blocks between the available threads. The leftover
  // prefix and suffix are handled separately by the calling thread.
  int64_t chunk_size = (right - left) / num_threads;
  int64_t prefix = left - src;
  int64_t suffix = src + nbytes - right;
  // Now the data layout is | prefix | k * num_threads * block_size | suffix |.
  // We have chunk_size = k * block_size, therefore the data layout is
  // | prefix | num_threads * chunk_size | suffix |.
  // Each thread gets a "chunk" of k blocks.
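  // Worked example with hypothetical numbers: block_size = 64, num_threads = 4,
  // src = 0x1010, nbytes = 1000. Then left = 0x1040 (prefix = 48 bytes), the
  // initial right = 0x13F8 & ~0x3F = 0x13C0, and num_blocks = 14. Since
  // 14 % 4 = 2, right is pulled back to 0x1340, chunk_size = 768 / 4 = 192
  // (k = 3 blocks per thread), and suffix = 0x13F8 - 0x1340 = 184. The pieces
  // add up: 48 + 4 * 192 + 184 = 1000 bytes.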

  // Start all parallel memcpy tasks and handle the leftovers while the pool
  // threads run.
  std::vector<std::future<void*>> futures;

  for (int i = 0; i < num_threads; i++) {
    futures.emplace_back(pool->Submit(wrap_memcpy, dst + prefix + i * chunk_size,
                                      left + i * chunk_size, chunk_size));
  }
  // The calling thread copies the unaligned prefix and suffix itself.
  memcpy(dst, src, prefix);
  memcpy(dst + prefix + num_threads * chunk_size, right, suffix);

  // Wait for all pool tasks to finish before returning.
  for (auto& fut : futures) {
    fut.get();
  }
}
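
// A minimal usage sketch (illustrative only; the buffer size, 1 MiB block
// size, and thread count below are arbitrary, and the copy should be large
// relative to num_threads * block_size for the split to pay off):
//
//   std::vector<uint8_t> src(64 * 1024 * 1024);
//   std::vector<uint8_t> dst(src.size());
//   arrow::internal::parallel_memcopy(dst.data(), src.data(),
//                                     static_cast<int64_t>(src.size()),
//                                     /*block_size=*/1 << 20, /*num_threads=*/4);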

}  // namespace internal
}  // namespace arrow

#endif  // ARROW_UTIL_MEMORY_H