1/*
2 * Copyright 2012-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#pragma once
18
19#include <folly/Likely.h>
20#include <folly/stats/BucketedTimeSeries.h>
21#include <glog/logging.h>
22#include <algorithm>
23#include <stdexcept>
24
25namespace folly {
26
27template <typename VT, typename CT>
28BucketedTimeSeries<VT, CT>::BucketedTimeSeries(
29 size_t nBuckets,
30 Duration maxDuration)
31 : firstTime_(Duration(1)), latestTime_(), duration_(maxDuration) {
32 // For tracking all-time data we only use total_, and don't need to bother
33 // with buckets_
34 if (!isAllTime()) {
35 // Round nBuckets down to duration_.count().
36 //
37 // There is no point in having more buckets than our timestamp
38 // granularity: otherwise we would have buckets that could never be used.
39 if (nBuckets > size_t(duration_.count())) {
40 nBuckets = size_t(duration_.count());
41 }
42
43 buckets_.resize(nBuckets, Bucket());
44 }
45}
46
47template <typename VT, typename CT>
48BucketedTimeSeries<VT, CT>::BucketedTimeSeries(
49 TimePoint theFirstTime,
50 TimePoint theLatestTime,
51 Duration maxDuration,
52 const std::vector<Bucket>& bucketsList)
53 : firstTime_(theFirstTime),
54 latestTime_(theLatestTime),
55 duration_(maxDuration),
56 buckets_(bucketsList) {
57 // Come up with the total_ from buckets_ being passed in
58 for (auto const& bucket : buckets_) {
59 total_.add(bucket.sum, bucket.count);
60 }
61
62 // Verify the integrity of the data
63
64 // If firstTime is greater than latestTime, the total count should be 0.
65 // (firstTime being greater than latestTime means that no data points have
66 // ever been added to the time series.)
67 if (firstTime_ > latestTime_ && (total_.sum != 0 || total_.count != 0)) {
68 throw std::invalid_argument(
69 "The total should have been 0 "
70 "if firstTime is greater than lastestTime");
71 }
72
73 // If firstTime is less than or equal to latestTime,
74 // latestTime - firstTime should be less than or equal to the duration.
75 if (firstTime_ <= latestTime_ && latestTime_ - firstTime_ > duration_) {
76 throw std::invalid_argument(
77 "The difference between firstTime and latestTime "
78 "should be less than or equal to the duration");
79 }
80}
81
82template <typename VT, typename CT>
83bool BucketedTimeSeries<VT, CT>::addValue(TimePoint now, const ValueType& val) {
84 return addValueAggregated(now, val, 1);
85}
86
87template <typename VT, typename CT>
88bool BucketedTimeSeries<VT, CT>::addValue(
89 TimePoint now,
90 const ValueType& val,
91 uint64_t times) {
92 return addValueAggregated(now, val * ValueType(times), times);
93}
94
95template <typename VT, typename CT>
96bool BucketedTimeSeries<VT, CT>::addValueAggregated(
97 TimePoint now,
98 const ValueType& total,
99 uint64_t nsamples) {
100 if (isAllTime()) {
101 if (UNLIKELY(empty())) {
102 firstTime_ = now;
103 latestTime_ = now;
104 } else if (now > latestTime_) {
105 latestTime_ = now;
106 } else if (now < firstTime_) {
107 firstTime_ = now;
108 }
109 total_.add(total, nsamples);
110 return true;
111 }
112
113 size_t bucketIdx;
114 if (UNLIKELY(empty())) {
115 // First data point we've ever seen
116 firstTime_ = now;
117 latestTime_ = now;
118 bucketIdx = getBucketIdx(now);
119 } else if (now > latestTime_) {
120 // More recent time. Need to update the buckets.
121 bucketIdx = updateBuckets(now);
122 } else if (LIKELY(now == latestTime_)) {
123 // Current time.
124 bucketIdx = getBucketIdx(now);
125 } else {
126 // An earlier time in the past. We need to check if this time still falls
127 // within our window.
128 if (now < getEarliestTimeNonEmpty()) {
129 return false;
130 }
131 bucketIdx = getBucketIdx(now);
132 }
133
134 total_.add(total, nsamples);
135 buckets_[bucketIdx].add(total, nsamples);
136 return true;
137}
138
139template <typename VT, typename CT>
140size_t BucketedTimeSeries<VT, CT>::update(TimePoint now) {
141 if (empty()) {
142 // This is the first data point.
143 firstTime_ = now;
144 }
145
146 // For all-time data, all we need to do is update latestTime_
147 if (isAllTime()) {
148 latestTime_ = std::max(latestTime_, now);
149 return 0;
150 }
151
152 // Make sure time doesn't go backwards.
153 // If the time is less than or equal to the latest time we have already seen,
154 // we don't need to do anything.
155 if (now <= latestTime_) {
156 return getBucketIdx(latestTime_);
157 }
158
159 return updateBuckets(now);
160}
161
template <typename VT, typename CT>
size_t BucketedTimeSeries<VT, CT>::updateBuckets(TimePoint now) {
  // Advance latestTime_ to `now`, clearing any buckets whose data has aged
  // out of the window, and return the index of the bucket `now` falls into.
  // Callers guarantee now > latestTime_ and !isAllTime().
  //
  // We could cache nextBucketStart as a member variable, so we don't have to
  // recompute it each time update() is called with a new timestamp value.
  // This makes things faster when update() (or addValue()) is called once
  // per second, but slightly slower when update() is called multiple times a
  // second.  We care more about optimizing the cases where addValue() is being
  // called frequently.  If addValue() is only being called once every few
  // seconds, it doesn't matter as much if it is fast.

  // Get info about the bucket that latestTime_ points at
  size_t currentBucket;
  TimePoint currentBucketStart;
  TimePoint nextBucketStart;
  getBucketInfo(
      latestTime_, &currentBucket, &currentBucketStart, &nextBucketStart);

  // Update latestTime_
  latestTime_ = now;

  if (now < nextBucketStart) {
    // We're still in the same bucket.
    // We're done after updating latestTime_.
    return currentBucket;
  } else if (now >= currentBucketStart + duration_) {
    // It's been a while.  We have wrapped, and all of the buckets need to be
    // cleared.
    for (Bucket& bucket : buckets_) {
      bucket.clear();
    }
    total_.clear();
    return getBucketIdx(latestTime_);
  } else {
    // clear all the buckets between the last time and current time, meaning
    // buckets in the range [(currentBucket+1), newBucket].  Note that
    // the bucket (currentBucket+1) is always the oldest bucket we have.  Since
    // our array is circular, loop when we reach the end.
    size_t newBucket = getBucketIdx(now);
    size_t idx = currentBucket;
    while (idx != newBucket) {
      ++idx;
      if (idx >= buckets_.size()) {
        idx = 0;
      }
      // Subtract the expired bucket's contribution from the running total
      // before emptying it, so total_ stays consistent with buckets_.
      total_ -= buckets_[idx];
      buckets_[idx].clear();
    }
    return newBucket;
  }
}
212
213template <typename VT, typename CT>
214void BucketedTimeSeries<VT, CT>::clear() {
215 for (Bucket& bucket : buckets_) {
216 bucket.clear();
217 }
218 total_.clear();
219 // Set firstTime_ larger than latestTime_,
220 // to indicate that the timeseries is empty
221 firstTime_ = TimePoint(Duration(1));
222 latestTime_ = TimePoint();
223}
224
225template <typename VT, typename CT>
226typename CT::time_point BucketedTimeSeries<VT, CT>::getEarliestTime() const {
227 if (empty()) {
228 return TimePoint();
229 }
230 if (isAllTime()) {
231 return firstTime_;
232 }
233
234 // Compute the earliest time we can track
235 TimePoint earliestTime = getEarliestTimeNonEmpty();
236
237 // We're never tracking data before firstTime_
238 earliestTime = std::max(earliestTime, firstTime_);
239
240 return earliestTime;
241}
242
243template <typename VT, typename CT>
244typename CT::time_point BucketedTimeSeries<VT, CT>::getEarliestTimeNonEmpty()
245 const {
246 size_t currentBucket;
247 TimePoint currentBucketStart;
248 TimePoint nextBucketStart;
249 getBucketInfo(
250 latestTime_, &currentBucket, &currentBucketStart, &nextBucketStart);
251
252 // Subtract 1 duration from the start of the next bucket to find the
253 // earliest possible data point we could be tracking.
254 return nextBucketStart - duration_;
255}
256
257template <typename VT, typename CT>
258typename CT::duration BucketedTimeSeries<VT, CT>::elapsed() const {
259 if (empty()) {
260 return Duration(0);
261 }
262
263 // Add 1 since [latestTime_, earliestTime] is an inclusive interval.
264 return latestTime_ - getEarliestTime() + Duration(1);
265}
266
267template <typename VT, typename CT>
268typename CT::duration BucketedTimeSeries<VT, CT>::elapsed(
269 TimePoint start,
270 TimePoint end) const {
271 if (empty()) {
272 return Duration(0);
273 }
274 start = std::max(start, getEarliestTime());
275 end = std::min(end, latestTime_ + Duration(1));
276 end = std::max(start, end);
277 return end - start;
278}
279
280template <typename VT, typename CT>
281VT BucketedTimeSeries<VT, CT>::sum(TimePoint start, TimePoint end) const {
282 ValueType total = ValueType();
283 forEachBucket(
284 start,
285 end,
286 [&](const Bucket& bucket,
287 TimePoint bucketStart,
288 TimePoint nextBucketStart) -> bool {
289 total += this->rangeAdjust(
290 bucketStart, nextBucketStart, start, end, bucket.sum);
291 return true;
292 });
293
294 return total;
295}
296
297template <typename VT, typename CT>
298uint64_t BucketedTimeSeries<VT, CT>::count(TimePoint start, TimePoint end)
299 const {
300 uint64_t sample_count = 0;
301 forEachBucket(
302 start,
303 end,
304 [&](const Bucket& bucket,
305 TimePoint bucketStart,
306 TimePoint nextBucketStart) -> bool {
307 sample_count += this->rangeAdjust(
308 bucketStart, nextBucketStart, start, end, ValueType(bucket.count));
309 return true;
310 });
311
312 return sample_count;
313}
314
315template <typename VT, typename CT>
316template <typename ReturnType>
317ReturnType BucketedTimeSeries<VT, CT>::avg(TimePoint start, TimePoint end)
318 const {
319 ValueType total = ValueType();
320 uint64_t sample_count = 0;
321 forEachBucket(
322 start,
323 end,
324 [&](const Bucket& bucket,
325 TimePoint bucketStart,
326 TimePoint nextBucketStart) -> bool {
327 total += this->rangeAdjust(
328 bucketStart, nextBucketStart, start, end, bucket.sum);
329 sample_count += this->rangeAdjust(
330 bucketStart, nextBucketStart, start, end, ValueType(bucket.count));
331 return true;
332 });
333
334 if (sample_count == 0) {
335 return ReturnType(0);
336 }
337
338 return detail::avgHelper<ReturnType>(total, sample_count);
339}
340
341/*
342 * A note about some of the bucket index calculations below:
343 *
344 * buckets_.size() may not divide evenly into duration_. When this happens,
345 * some buckets will be wider than others. We still want to spread the data
346 * out as evenly as possible among the buckets (as opposed to just making the
347 * last bucket be significantly wider than all of the others).
348 *
349 * To make the division work out, we pretend that the buckets are each
350 * duration_ wide, so that the overall duration becomes
351 * buckets.size() * duration_.
352 *
353 * To transform a real timestamp into the scale used by our buckets,
354 * we have to multiply by buckets_.size(). To figure out which bucket it goes
355 * into, we then divide by duration_.
356 */
357
358template <typename VT, typename CT>
359size_t BucketedTimeSeries<VT, CT>::getBucketIdx(TimePoint time) const {
360 // For all-time data we don't use buckets_. Everything is tracked in total_.
361 DCHECK(!isAllTime());
362
363 auto timeIntoCurrentCycle = (time.time_since_epoch() % duration_);
364 return timeIntoCurrentCycle.count() * buckets_.size() / duration_.count();
365}
366
/*
 * Compute the bucket index for the specified time, as well as the earliest
 * time that falls into this bucket.
 */
template <typename VT, typename CT>
void BucketedTimeSeries<VT, CT>::getBucketInfo(
    TimePoint time,
    size_t* bucketIdx,
    TimePoint* bucketStart,
    TimePoint* nextBucketStart) const {
  typedef typename Duration::rep TimeInt;
  // All-time series have no buckets, so this must not be called on them.
  DCHECK(!isAllTime());

  // Keep these two lines together.  The compiler should be able to compute
  // both the division and modulus with a single operation.
  Duration timeMod = time.time_since_epoch() % duration_;
  TimeInt numFullDurations = time.time_since_epoch() / duration_;

  // Work in the scaled time base described in the note above: multiply by
  // buckets_.size() so each bucket is exactly duration_ wide.
  TimeInt scaledTime = timeMod.count() * TimeInt(buckets_.size());

  // Keep these two lines together.  The compiler should be able to compute
  // both the division and modulus with a single operation.
  *bucketIdx = size_t(scaledTime / duration_.count());
  TimeInt scaledOffsetInBucket = scaledTime % duration_.count();

  // Bucket boundaries in the scaled time base.
  TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
  TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();

  // Convert back to real time, rounding up so boundaries are consistent
  // with the floor division used in getBucketIdx().
  Duration bucketStartMod(
      (scaledBucketStart + buckets_.size() - 1) / buckets_.size());
  Duration nextBucketStartMod(
      (scaledNextBucketStart + buckets_.size() - 1) / buckets_.size());

  // Re-add the whole cycles stripped off by the modulus above.
  TimePoint durationStart(numFullDurations * duration_);
  *bucketStart = bucketStartMod + durationStart;
  *nextBucketStart = nextBucketStartMod + durationStart;
}
404
template <typename VT, typename CT>
template <typename Function>
void BucketedTimeSeries<VT, CT>::forEachBucket(Function fn) const {
  // Invoke fn(bucket, bucketStart, nextBucketStart) on every bucket, oldest
  // first, ending with the bucket containing latestTime_.  Iteration stops
  // early if fn returns false.
  if (isAllTime()) {
    // All-time series have a single conceptual bucket: total_ covering
    // [firstTime_, latestTime_] inclusive.
    fn(total_, firstTime_, latestTime_ + Duration(1));
    return;
  }

  typedef typename Duration::rep TimeInt;

  // Compute durationStart, latestBucketIdx, and scaledNextBucketStart,
  // the same way as in getBucketInfo().
  Duration timeMod = latestTime_.time_since_epoch() % duration_;
  TimeInt numFullDurations = latestTime_.time_since_epoch() / duration_;
  TimePoint durationStart(numFullDurations * duration_);
  TimeInt scaledTime = timeMod.count() * TimeInt(buckets_.size());
  size_t latestBucketIdx = size_t(scaledTime / duration_.count());
  TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
  TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
  TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();

  // Walk through the buckets, starting one past the current bucket.
  // The next bucket is from the previous cycle, so subtract 1 duration
  // from durationStart.
  size_t idx = latestBucketIdx;
  durationStart -= duration_;

  // Boundaries are maintained incrementally in the scaled time base and
  // converted with the same round-up division used by getBucketInfo().
  TimePoint nextBucketStart =
      Duration(
          (scaledNextBucketStart + buckets_.size() - 1) / buckets_.size()) +
      durationStart;
  while (true) {
    ++idx;
    if (idx >= buckets_.size()) {
      // Wrapped around the circular array: we are back in the current
      // cycle, and the next boundary restarts at one bucket width.
      idx = 0;
      durationStart += duration_;
      scaledNextBucketStart = duration_.count();
    } else {
      scaledNextBucketStart += duration_.count();
    }

    TimePoint bucketStart = nextBucketStart;
    nextBucketStart =
        Duration(
            (scaledNextBucketStart + buckets_.size() - 1) / buckets_.size()) +
        durationStart;

    // Should we bother skipping buckets where firstTime_ >= nextBucketStart?
    // For now we go ahead and invoke the function with these buckets.
    // sum and count should always be 0 in these buckets.

    DCHECK_LE(
        bucketStart.time_since_epoch().count(),
        latestTime_.time_since_epoch().count());
    bool ret = fn(buckets_[idx], bucketStart, nextBucketStart);
    if (!ret) {
      break;
    }

    if (idx == latestBucketIdx) {
      // all done
      break;
    }
  }
}
470
471/*
472 * Adjust the input value from the specified bucket to only account
473 * for the desired range.
474 *
475 * For example, if the bucket spans time [10, 20), but we only care about the
476 * range [10, 16), this will return 60% of the input value.
477 */
478template <typename VT, typename CT>
479VT BucketedTimeSeries<VT, CT>::rangeAdjust(
480 TimePoint bucketStart,
481 TimePoint nextBucketStart,
482 TimePoint start,
483 TimePoint end,
484 ValueType input) const {
485 // If nextBucketStart is greater than latestTime_, treat nextBucketStart as
486 // if it were latestTime_. This makes us more accurate when someone is
487 // querying for all of the data up to latestTime_. Even though latestTime_
488 // may only be partially through the bucket, we don't want to adjust
489 // downwards in this case, because the bucket really only has data up to
490 // latestTime_.
491 if (bucketStart <= latestTime_ && nextBucketStart > latestTime_) {
492 nextBucketStart = latestTime_ + Duration(1);
493 }
494
495 if (start <= bucketStart && end >= nextBucketStart) {
496 // The bucket is wholly contained in the [start, end) interval
497 return input;
498 }
499
500 TimePoint intervalStart = std::max(start, bucketStart);
501 TimePoint intervalEnd = std::min(end, nextBucketStart);
502 return input * (intervalEnd - intervalStart) /
503 (nextBucketStart - bucketStart);
504}
505
506template <typename VT, typename CT>
507template <typename Function>
508void BucketedTimeSeries<VT, CT>::forEachBucket(
509 TimePoint start,
510 TimePoint end,
511 Function fn) const {
512 forEachBucket(
513 [&start, &end, &fn](
514 const Bucket& bucket,
515 TimePoint bucketStart,
516 TimePoint nextBucketStart) -> bool {
517 if (start >= nextBucketStart) {
518 return true;
519 }
520 if (end <= bucketStart) {
521 return false;
522 }
523 bool ret = fn(bucket, bucketStart, nextBucketStart);
524 return ret;
525 });
526}
527
528} // namespace folly
529