1 | /* |
2 | * Copyright 2012-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #pragma once |
18 | |
19 | #include <folly/stats/detail/BufferedStat.h> |
20 | |
21 | #include <folly/stats/detail/DigestBuilder-defs.h> |
22 | #include <folly/stats/detail/SlidingWindow-defs.h> |
23 | |
24 | namespace folly { |
25 | namespace detail { |
26 | |
27 | template <typename DigestT, typename ClockT> |
28 | BufferedStat<DigestT, ClockT>::BufferedStat( |
29 | typename ClockT::duration bufferDuration, |
30 | size_t bufferSize, |
31 | size_t digestSize) |
32 | : bufferDuration_(bufferDuration), digestBuilder_(bufferSize, digestSize) { |
33 | expiry_.store( |
34 | TimePointHolder(roundUp(ClockT::now())), std::memory_order_relaxed); |
35 | } |
36 | |
37 | template <typename DigestT, typename ClockT> |
38 | void BufferedStat<DigestT, ClockT>::append(double value, TimePoint now) { |
39 | if (UNLIKELY(now > expiry_.load(std::memory_order_relaxed).tp)) { |
40 | std::unique_lock<SharedMutex> g(mutex_, std::try_to_lock_t()); |
41 | if (g.owns_lock()) { |
42 | doUpdate(now, g, UpdateMode::OnExpiry); |
43 | } |
44 | } |
45 | digestBuilder_.append(value); |
46 | } |
47 | |
48 | template <typename DigestT, typename ClockT> |
49 | typename BufferedStat<DigestT, ClockT>::TimePoint |
50 | BufferedStat<DigestT, ClockT>::roundUp(TimePoint t) { |
51 | auto remainder = t.time_since_epoch() % bufferDuration_; |
52 | if (remainder.count() != 0) { |
53 | return t + bufferDuration_ - remainder; |
54 | } |
55 | return t; |
56 | } |
57 | |
58 | template <typename DigestT, typename ClockT> |
59 | std::unique_lock<SharedMutex> BufferedStat<DigestT, ClockT>::updateIfExpired( |
60 | TimePoint now) { |
61 | std::unique_lock<SharedMutex> g(mutex_); |
62 | doUpdate(now, g, UpdateMode::OnExpiry); |
63 | return g; |
64 | } |
65 | |
66 | template <typename DigestT, typename ClockT> |
67 | void BufferedStat<DigestT, ClockT>::flush() { |
68 | std::unique_lock<SharedMutex> g(mutex_); |
69 | doUpdate(ClockT::now(), g, UpdateMode::Now); |
70 | } |
71 | |
72 | template <typename DigestT, typename ClockT> |
73 | void BufferedStat<DigestT, ClockT>::doUpdate( |
74 | TimePoint now, |
75 | const std::unique_lock<SharedMutex>& g, |
76 | UpdateMode updateMode) { |
77 | DCHECK(g.owns_lock()); |
78 | // Check that no other thread has performed the slide after the check |
79 | auto oldExpiry = expiry_.load(std::memory_order_relaxed).tp; |
80 | if (now > oldExpiry || updateMode == UpdateMode::Now) { |
81 | now = roundUp(now); |
82 | expiry_.store(TimePointHolder(now), std::memory_order_relaxed); |
83 | onNewDigest(digestBuilder_.build(), now, oldExpiry, g); |
84 | } |
85 | } |
86 | |
87 | template <typename DigestT, typename ClockT> |
88 | BufferedDigest<DigestT, ClockT>::BufferedDigest( |
89 | typename ClockT::duration bufferDuration, |
90 | size_t bufferSize, |
91 | size_t digestSize) |
92 | : BufferedStat<DigestT, ClockT>(bufferDuration, bufferSize, digestSize), |
93 | digest_(digestSize) {} |
94 | |
95 | template <typename DigestT, typename ClockT> |
96 | DigestT BufferedDigest<DigestT, ClockT>::get(TimePoint now) { |
97 | auto g = this->updateIfExpired(now); |
98 | return digest_; |
99 | } |
100 | |
101 | template <typename DigestT, typename ClockT> |
102 | void BufferedDigest<DigestT, ClockT>::onNewDigest( |
103 | DigestT digest, |
104 | TimePoint /*newExpiry*/, |
105 | TimePoint /*oldExpiry*/, |
106 | const std::unique_lock<SharedMutex>& /*g*/) { |
107 | std::array<DigestT, 2> a{{digest_, std::move(digest)}}; |
108 | digest_ = DigestT::merge(a); |
109 | } |
110 | |
111 | template <typename DigestT, typename ClockT> |
112 | BufferedSlidingWindow<DigestT, ClockT>::BufferedSlidingWindow( |
113 | size_t nBuckets, |
114 | typename ClockT::duration bufferDuration, |
115 | size_t bufferSize, |
116 | size_t digestSize) |
117 | : BufferedStat<DigestT, ClockT>(bufferDuration, bufferSize, digestSize), |
118 | slidingWindow_([=]() { return DigestT(digestSize); }, nBuckets) {} |
119 | |
120 | template <typename DigestT, typename ClockT> |
121 | std::vector<DigestT> BufferedSlidingWindow<DigestT, ClockT>::get( |
122 | TimePoint now) { |
123 | std::vector<DigestT> digests; |
124 | { |
125 | auto g = this->updateIfExpired(now); |
126 | digests = slidingWindow_.get(); |
127 | } |
128 | digests.erase( |
129 | std::remove_if( |
130 | digests.begin(), |
131 | digests.end(), |
132 | [](const DigestT& digest) { return digest.empty(); }), |
133 | digests.end()); |
134 | return digests; |
135 | } |
136 | |
137 | template <typename DigestT, typename ClockT> |
138 | void BufferedSlidingWindow<DigestT, ClockT>::onNewDigest( |
139 | DigestT digest, |
140 | TimePoint newExpiry, |
141 | TimePoint oldExpiry, |
142 | const std::unique_lock<SharedMutex>& /*g*/) { |
143 | if (newExpiry > oldExpiry) { |
144 | auto diff = newExpiry - oldExpiry; |
145 | slidingWindow_.slide(diff / this->bufferDuration_); |
146 | diff -= this->bufferDuration_; |
147 | slidingWindow_.set(diff / this->bufferDuration_, std::move(digest)); |
148 | } else { |
149 | // just update current window |
150 | std::array<DigestT, 2> a{{slidingWindow_.front(), std::move(digest)}}; |
151 | slidingWindow_.set(0 /* current window */, DigestT::merge(a)); |
152 | } |
153 | } |
154 | |
155 | } // namespace detail |
156 | } // namespace folly |
157 | |