1/*
2 * Copyright 2012-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#pragma once
18
19#include <folly/stats/detail/BufferedStat.h>
20
21#include <folly/stats/detail/DigestBuilder-defs.h>
22#include <folly/stats/detail/SlidingWindow-defs.h>
23
24namespace folly {
25namespace detail {
26
27template <typename DigestT, typename ClockT>
28BufferedStat<DigestT, ClockT>::BufferedStat(
29 typename ClockT::duration bufferDuration,
30 size_t bufferSize,
31 size_t digestSize)
32 : bufferDuration_(bufferDuration), digestBuilder_(bufferSize, digestSize) {
33 expiry_.store(
34 TimePointHolder(roundUp(ClockT::now())), std::memory_order_relaxed);
35}
36
37template <typename DigestT, typename ClockT>
38void BufferedStat<DigestT, ClockT>::append(double value, TimePoint now) {
39 if (UNLIKELY(now > expiry_.load(std::memory_order_relaxed).tp)) {
40 std::unique_lock<SharedMutex> g(mutex_, std::try_to_lock_t());
41 if (g.owns_lock()) {
42 doUpdate(now, g, UpdateMode::OnExpiry);
43 }
44 }
45 digestBuilder_.append(value);
46}
47
48template <typename DigestT, typename ClockT>
49typename BufferedStat<DigestT, ClockT>::TimePoint
50BufferedStat<DigestT, ClockT>::roundUp(TimePoint t) {
51 auto remainder = t.time_since_epoch() % bufferDuration_;
52 if (remainder.count() != 0) {
53 return t + bufferDuration_ - remainder;
54 }
55 return t;
56}
57
58template <typename DigestT, typename ClockT>
59std::unique_lock<SharedMutex> BufferedStat<DigestT, ClockT>::updateIfExpired(
60 TimePoint now) {
61 std::unique_lock<SharedMutex> g(mutex_);
62 doUpdate(now, g, UpdateMode::OnExpiry);
63 return g;
64}
65
66template <typename DigestT, typename ClockT>
67void BufferedStat<DigestT, ClockT>::flush() {
68 std::unique_lock<SharedMutex> g(mutex_);
69 doUpdate(ClockT::now(), g, UpdateMode::Now);
70}
71
72template <typename DigestT, typename ClockT>
73void BufferedStat<DigestT, ClockT>::doUpdate(
74 TimePoint now,
75 const std::unique_lock<SharedMutex>& g,
76 UpdateMode updateMode) {
77 DCHECK(g.owns_lock());
78 // Check that no other thread has performed the slide after the check
79 auto oldExpiry = expiry_.load(std::memory_order_relaxed).tp;
80 if (now > oldExpiry || updateMode == UpdateMode::Now) {
81 now = roundUp(now);
82 expiry_.store(TimePointHolder(now), std::memory_order_relaxed);
83 onNewDigest(digestBuilder_.build(), now, oldExpiry, g);
84 }
85}
86
87template <typename DigestT, typename ClockT>
88BufferedDigest<DigestT, ClockT>::BufferedDigest(
89 typename ClockT::duration bufferDuration,
90 size_t bufferSize,
91 size_t digestSize)
92 : BufferedStat<DigestT, ClockT>(bufferDuration, bufferSize, digestSize),
93 digest_(digestSize) {}
94
95template <typename DigestT, typename ClockT>
96DigestT BufferedDigest<DigestT, ClockT>::get(TimePoint now) {
97 auto g = this->updateIfExpired(now);
98 return digest_;
99}
100
101template <typename DigestT, typename ClockT>
102void BufferedDigest<DigestT, ClockT>::onNewDigest(
103 DigestT digest,
104 TimePoint /*newExpiry*/,
105 TimePoint /*oldExpiry*/,
106 const std::unique_lock<SharedMutex>& /*g*/) {
107 std::array<DigestT, 2> a{{digest_, std::move(digest)}};
108 digest_ = DigestT::merge(a);
109}
110
111template <typename DigestT, typename ClockT>
112BufferedSlidingWindow<DigestT, ClockT>::BufferedSlidingWindow(
113 size_t nBuckets,
114 typename ClockT::duration bufferDuration,
115 size_t bufferSize,
116 size_t digestSize)
117 : BufferedStat<DigestT, ClockT>(bufferDuration, bufferSize, digestSize),
118 slidingWindow_([=]() { return DigestT(digestSize); }, nBuckets) {}
119
120template <typename DigestT, typename ClockT>
121std::vector<DigestT> BufferedSlidingWindow<DigestT, ClockT>::get(
122 TimePoint now) {
123 std::vector<DigestT> digests;
124 {
125 auto g = this->updateIfExpired(now);
126 digests = slidingWindow_.get();
127 }
128 digests.erase(
129 std::remove_if(
130 digests.begin(),
131 digests.end(),
132 [](const DigestT& digest) { return digest.empty(); }),
133 digests.end());
134 return digests;
135}
136
137template <typename DigestT, typename ClockT>
138void BufferedSlidingWindow<DigestT, ClockT>::onNewDigest(
139 DigestT digest,
140 TimePoint newExpiry,
141 TimePoint oldExpiry,
142 const std::unique_lock<SharedMutex>& /*g*/) {
143 if (newExpiry > oldExpiry) {
144 auto diff = newExpiry - oldExpiry;
145 slidingWindow_.slide(diff / this->bufferDuration_);
146 diff -= this->bufferDuration_;
147 slidingWindow_.set(diff / this->bufferDuration_, std::move(digest));
148 } else {
149 // just update current window
150 std::array<DigestT, 2> a{{slidingWindow_.front(), std::move(digest)}};
151 slidingWindow_.set(0 /* current window */, DigestT::merge(a));
152 }
153}
154
155} // namespace detail
156} // namespace folly
157