1 | /* |
2 | * Copyright 2018-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #pragma once |
18 | |
19 | #include <folly/stats/detail/DigestBuilder.h> |
20 | |
21 | #include <algorithm> |
22 | |
23 | #include <folly/concurrency/CacheLocality.h> |
24 | |
25 | namespace folly { |
26 | namespace detail { |
27 | |
// Slot hint used by DigestBuilder::append(): the index into cpuLocalBuffers_
// that append() tries first. It starts at 0 and is overwritten with the slot
// chosen by AccessSpreader whenever the try-lock on the hinted slot fails.
// FOLLY_TLS presumably makes this thread-local (one hint per thread) --
// confirm against the macro's definition.
// NOTE(review): being a namespace-scope `static`, each translation unit that
// includes this file gets its own copy, and the hint is shared by every
// DigestBuilder instance rather than stored per builder.
static FOLLY_TLS uint32_t tls_lastCpuBufferSlot = 0;
29 | |
30 | template <typename DigestT> |
31 | DigestBuilder<DigestT>::DigestBuilder(size_t bufferSize, size_t digestSize) |
32 | : bufferSize_(bufferSize), digestSize_(digestSize) { |
33 | auto& cl = CacheLocality::system(); |
34 | cpuLocalBuffers_.resize(cl.numCachesByLevel[0]); |
35 | } |
36 | |
37 | template <typename DigestT> |
38 | DigestT DigestBuilder<DigestT>::build() { |
39 | std::vector<std::vector<double>> valuesVec; |
40 | std::vector<std::unique_ptr<DigestT>> digestPtrs; |
41 | valuesVec.reserve(cpuLocalBuffers_.size()); |
42 | digestPtrs.reserve(cpuLocalBuffers_.size()); |
43 | |
44 | for (auto& cpuLocalBuffer : cpuLocalBuffers_) { |
45 | SpinLockGuard g(cpuLocalBuffer.mutex); |
46 | valuesVec.push_back(std::move(cpuLocalBuffer.buffer)); |
47 | if (cpuLocalBuffer.digest) { |
48 | digestPtrs.push_back(std::move(cpuLocalBuffer.digest)); |
49 | } |
50 | } |
51 | |
52 | std::vector<DigestT> digests; |
53 | for (auto& digestPtr : digestPtrs) { |
54 | digests.push_back(std::move(*digestPtr)); |
55 | } |
56 | |
57 | size_t count = 0; |
58 | for (const auto& vec : valuesVec) { |
59 | count += vec.size(); |
60 | } |
61 | if (count) { |
62 | std::vector<double> values; |
63 | values.reserve(count); |
64 | for (const auto& vec : valuesVec) { |
65 | values.insert(values.end(), vec.begin(), vec.end()); |
66 | } |
67 | DigestT digest(digestSize_); |
68 | digests.push_back(digest.merge(values)); |
69 | } |
70 | return DigestT::merge(digests); |
71 | } |
72 | |
73 | template <typename DigestT> |
74 | void DigestBuilder<DigestT>::append(double value) { |
75 | auto& which = tls_lastCpuBufferSlot; |
76 | auto cpuLocalBuf = &cpuLocalBuffers_[which]; |
77 | std::unique_lock<SpinLock> g(cpuLocalBuf->mutex, std::try_to_lock_t()); |
78 | if (!g.owns_lock()) { |
79 | which = AccessSpreader<>::current(cpuLocalBuffers_.size()); |
80 | cpuLocalBuf = &cpuLocalBuffers_[which]; |
81 | g = std::unique_lock<SpinLock>(cpuLocalBuf->mutex); |
82 | } |
83 | cpuLocalBuf->buffer.push_back(value); |
84 | if (cpuLocalBuf->buffer.size() == bufferSize_) { |
85 | if (!cpuLocalBuf->digest) { |
86 | cpuLocalBuf->digest = std::make_unique<DigestT>(digestSize_); |
87 | } |
88 | *cpuLocalBuf->digest = cpuLocalBuf->digest->merge(cpuLocalBuf->buffer); |
89 | cpuLocalBuf->buffer.clear(); |
90 | } |
91 | } |
92 | |
93 | } // namespace detail |
94 | } // namespace folly |
95 | |