1 | /* |
2 | * Copyright 2012-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #pragma once |
18 | |
19 | #include <folly/Likely.h> |
20 | #include <folly/stats/BucketedTimeSeries.h> |
21 | #include <glog/logging.h> |
22 | #include <algorithm> |
23 | #include <stdexcept> |
24 | |
25 | namespace folly { |
26 | |
27 | template <typename VT, typename CT> |
28 | BucketedTimeSeries<VT, CT>::BucketedTimeSeries( |
29 | size_t nBuckets, |
30 | Duration maxDuration) |
31 | : firstTime_(Duration(1)), latestTime_(), duration_(maxDuration) { |
32 | // For tracking all-time data we only use total_, and don't need to bother |
33 | // with buckets_ |
34 | if (!isAllTime()) { |
35 | // Round nBuckets down to duration_.count(). |
36 | // |
37 | // There is no point in having more buckets than our timestamp |
38 | // granularity: otherwise we would have buckets that could never be used. |
39 | if (nBuckets > size_t(duration_.count())) { |
40 | nBuckets = size_t(duration_.count()); |
41 | } |
42 | |
43 | buckets_.resize(nBuckets, Bucket()); |
44 | } |
45 | } |
46 | |
47 | template <typename VT, typename CT> |
48 | BucketedTimeSeries<VT, CT>::BucketedTimeSeries( |
49 | TimePoint theFirstTime, |
50 | TimePoint theLatestTime, |
51 | Duration maxDuration, |
52 | const std::vector<Bucket>& bucketsList) |
53 | : firstTime_(theFirstTime), |
54 | latestTime_(theLatestTime), |
55 | duration_(maxDuration), |
56 | buckets_(bucketsList) { |
57 | // Come up with the total_ from buckets_ being passed in |
58 | for (auto const& bucket : buckets_) { |
59 | total_.add(bucket.sum, bucket.count); |
60 | } |
61 | |
62 | // Verify the integrity of the data |
63 | |
64 | // If firstTime is greater than latestTime, the total count should be 0. |
65 | // (firstTime being greater than latestTime means that no data points have |
66 | // ever been added to the time series.) |
67 | if (firstTime_ > latestTime_ && (total_.sum != 0 || total_.count != 0)) { |
68 | throw std::invalid_argument( |
69 | "The total should have been 0 " |
70 | "if firstTime is greater than lastestTime" ); |
71 | } |
72 | |
73 | // If firstTime is less than or equal to latestTime, |
74 | // latestTime - firstTime should be less than or equal to the duration. |
75 | if (firstTime_ <= latestTime_ && latestTime_ - firstTime_ > duration_) { |
76 | throw std::invalid_argument( |
77 | "The difference between firstTime and latestTime " |
78 | "should be less than or equal to the duration" ); |
79 | } |
80 | } |
81 | |
82 | template <typename VT, typename CT> |
83 | bool BucketedTimeSeries<VT, CT>::addValue(TimePoint now, const ValueType& val) { |
84 | return addValueAggregated(now, val, 1); |
85 | } |
86 | |
87 | template <typename VT, typename CT> |
88 | bool BucketedTimeSeries<VT, CT>::addValue( |
89 | TimePoint now, |
90 | const ValueType& val, |
91 | uint64_t times) { |
92 | return addValueAggregated(now, val * ValueType(times), times); |
93 | } |
94 | |
95 | template <typename VT, typename CT> |
96 | bool BucketedTimeSeries<VT, CT>::addValueAggregated( |
97 | TimePoint now, |
98 | const ValueType& total, |
99 | uint64_t nsamples) { |
100 | if (isAllTime()) { |
101 | if (UNLIKELY(empty())) { |
102 | firstTime_ = now; |
103 | latestTime_ = now; |
104 | } else if (now > latestTime_) { |
105 | latestTime_ = now; |
106 | } else if (now < firstTime_) { |
107 | firstTime_ = now; |
108 | } |
109 | total_.add(total, nsamples); |
110 | return true; |
111 | } |
112 | |
113 | size_t bucketIdx; |
114 | if (UNLIKELY(empty())) { |
115 | // First data point we've ever seen |
116 | firstTime_ = now; |
117 | latestTime_ = now; |
118 | bucketIdx = getBucketIdx(now); |
119 | } else if (now > latestTime_) { |
120 | // More recent time. Need to update the buckets. |
121 | bucketIdx = updateBuckets(now); |
122 | } else if (LIKELY(now == latestTime_)) { |
123 | // Current time. |
124 | bucketIdx = getBucketIdx(now); |
125 | } else { |
126 | // An earlier time in the past. We need to check if this time still falls |
127 | // within our window. |
128 | if (now < getEarliestTimeNonEmpty()) { |
129 | return false; |
130 | } |
131 | bucketIdx = getBucketIdx(now); |
132 | } |
133 | |
134 | total_.add(total, nsamples); |
135 | buckets_[bucketIdx].add(total, nsamples); |
136 | return true; |
137 | } |
138 | |
139 | template <typename VT, typename CT> |
140 | size_t BucketedTimeSeries<VT, CT>::update(TimePoint now) { |
141 | if (empty()) { |
142 | // This is the first data point. |
143 | firstTime_ = now; |
144 | } |
145 | |
146 | // For all-time data, all we need to do is update latestTime_ |
147 | if (isAllTime()) { |
148 | latestTime_ = std::max(latestTime_, now); |
149 | return 0; |
150 | } |
151 | |
152 | // Make sure time doesn't go backwards. |
153 | // If the time is less than or equal to the latest time we have already seen, |
154 | // we don't need to do anything. |
155 | if (now <= latestTime_) { |
156 | return getBucketIdx(latestTime_); |
157 | } |
158 | |
159 | return updateBuckets(now); |
160 | } |
161 | |
162 | template <typename VT, typename CT> |
163 | size_t BucketedTimeSeries<VT, CT>::updateBuckets(TimePoint now) { |
164 | // We could cache nextBucketStart as a member variable, so we don't have to |
165 | // recompute it each time update() is called with a new timestamp value. |
166 | // This makes things faster when update() (or addValue()) is called once |
167 | // per second, but slightly slower when update() is called multiple times a |
168 | // second. We care more about optimizing the cases where addValue() is being |
169 | // called frequently. If addValue() is only being called once every few |
170 | // seconds, it doesn't matter as much if it is fast. |
171 | |
172 | // Get info about the bucket that latestTime_ points at |
173 | size_t currentBucket; |
174 | TimePoint currentBucketStart; |
175 | TimePoint nextBucketStart; |
176 | getBucketInfo( |
177 | latestTime_, ¤tBucket, ¤tBucketStart, &nextBucketStart); |
178 | |
179 | // Update latestTime_ |
180 | latestTime_ = now; |
181 | |
182 | if (now < nextBucketStart) { |
183 | // We're still in the same bucket. |
184 | // We're done after updating latestTime_. |
185 | return currentBucket; |
186 | } else if (now >= currentBucketStart + duration_) { |
187 | // It's been a while. We have wrapped, and all of the buckets need to be |
188 | // cleared. |
189 | for (Bucket& bucket : buckets_) { |
190 | bucket.clear(); |
191 | } |
192 | total_.clear(); |
193 | return getBucketIdx(latestTime_); |
194 | } else { |
195 | // clear all the buckets between the last time and current time, meaning |
196 | // buckets in the range [(currentBucket+1), newBucket]. Note that |
197 | // the bucket (currentBucket+1) is always the oldest bucket we have. Since |
198 | // our array is circular, loop when we reach the end. |
199 | size_t newBucket = getBucketIdx(now); |
200 | size_t idx = currentBucket; |
201 | while (idx != newBucket) { |
202 | ++idx; |
203 | if (idx >= buckets_.size()) { |
204 | idx = 0; |
205 | } |
206 | total_ -= buckets_[idx]; |
207 | buckets_[idx].clear(); |
208 | } |
209 | return newBucket; |
210 | } |
211 | } |
212 | |
213 | template <typename VT, typename CT> |
214 | void BucketedTimeSeries<VT, CT>::clear() { |
215 | for (Bucket& bucket : buckets_) { |
216 | bucket.clear(); |
217 | } |
218 | total_.clear(); |
219 | // Set firstTime_ larger than latestTime_, |
220 | // to indicate that the timeseries is empty |
221 | firstTime_ = TimePoint(Duration(1)); |
222 | latestTime_ = TimePoint(); |
223 | } |
224 | |
225 | template <typename VT, typename CT> |
226 | typename CT::time_point BucketedTimeSeries<VT, CT>::getEarliestTime() const { |
227 | if (empty()) { |
228 | return TimePoint(); |
229 | } |
230 | if (isAllTime()) { |
231 | return firstTime_; |
232 | } |
233 | |
234 | // Compute the earliest time we can track |
235 | TimePoint earliestTime = getEarliestTimeNonEmpty(); |
236 | |
237 | // We're never tracking data before firstTime_ |
238 | earliestTime = std::max(earliestTime, firstTime_); |
239 | |
240 | return earliestTime; |
241 | } |
242 | |
243 | template <typename VT, typename CT> |
244 | typename CT::time_point BucketedTimeSeries<VT, CT>::getEarliestTimeNonEmpty() |
245 | const { |
246 | size_t currentBucket; |
247 | TimePoint currentBucketStart; |
248 | TimePoint nextBucketStart; |
249 | getBucketInfo( |
250 | latestTime_, ¤tBucket, ¤tBucketStart, &nextBucketStart); |
251 | |
252 | // Subtract 1 duration from the start of the next bucket to find the |
253 | // earliest possible data point we could be tracking. |
254 | return nextBucketStart - duration_; |
255 | } |
256 | |
257 | template <typename VT, typename CT> |
258 | typename CT::duration BucketedTimeSeries<VT, CT>::elapsed() const { |
259 | if (empty()) { |
260 | return Duration(0); |
261 | } |
262 | |
263 | // Add 1 since [latestTime_, earliestTime] is an inclusive interval. |
264 | return latestTime_ - getEarliestTime() + Duration(1); |
265 | } |
266 | |
267 | template <typename VT, typename CT> |
268 | typename CT::duration BucketedTimeSeries<VT, CT>::elapsed( |
269 | TimePoint start, |
270 | TimePoint end) const { |
271 | if (empty()) { |
272 | return Duration(0); |
273 | } |
274 | start = std::max(start, getEarliestTime()); |
275 | end = std::min(end, latestTime_ + Duration(1)); |
276 | end = std::max(start, end); |
277 | return end - start; |
278 | } |
279 | |
280 | template <typename VT, typename CT> |
281 | VT BucketedTimeSeries<VT, CT>::sum(TimePoint start, TimePoint end) const { |
282 | ValueType total = ValueType(); |
283 | forEachBucket( |
284 | start, |
285 | end, |
286 | [&](const Bucket& bucket, |
287 | TimePoint bucketStart, |
288 | TimePoint nextBucketStart) -> bool { |
289 | total += this->rangeAdjust( |
290 | bucketStart, nextBucketStart, start, end, bucket.sum); |
291 | return true; |
292 | }); |
293 | |
294 | return total; |
295 | } |
296 | |
297 | template <typename VT, typename CT> |
298 | uint64_t BucketedTimeSeries<VT, CT>::count(TimePoint start, TimePoint end) |
299 | const { |
300 | uint64_t sample_count = 0; |
301 | forEachBucket( |
302 | start, |
303 | end, |
304 | [&](const Bucket& bucket, |
305 | TimePoint bucketStart, |
306 | TimePoint nextBucketStart) -> bool { |
307 | sample_count += this->rangeAdjust( |
308 | bucketStart, nextBucketStart, start, end, ValueType(bucket.count)); |
309 | return true; |
310 | }); |
311 | |
312 | return sample_count; |
313 | } |
314 | |
315 | template <typename VT, typename CT> |
316 | template <typename ReturnType> |
317 | ReturnType BucketedTimeSeries<VT, CT>::avg(TimePoint start, TimePoint end) |
318 | const { |
319 | ValueType total = ValueType(); |
320 | uint64_t sample_count = 0; |
321 | forEachBucket( |
322 | start, |
323 | end, |
324 | [&](const Bucket& bucket, |
325 | TimePoint bucketStart, |
326 | TimePoint nextBucketStart) -> bool { |
327 | total += this->rangeAdjust( |
328 | bucketStart, nextBucketStart, start, end, bucket.sum); |
329 | sample_count += this->rangeAdjust( |
330 | bucketStart, nextBucketStart, start, end, ValueType(bucket.count)); |
331 | return true; |
332 | }); |
333 | |
334 | if (sample_count == 0) { |
335 | return ReturnType(0); |
336 | } |
337 | |
338 | return detail::avgHelper<ReturnType>(total, sample_count); |
339 | } |
340 | |
/*
 * A note about some of the bucket index calculations below:
 *
 * buckets_.size() may not divide evenly into duration_. When this happens,
 * some buckets will be wider than others. We still want to spread the data
 * out as evenly as possible among the buckets (as opposed to just making the
 * last bucket be significantly wider than all of the others).
 *
 * To make the division work out, we pretend that the buckets are each
 * duration_ wide, so that the overall duration becomes
 * buckets_.size() * duration_.
 *
 * To transform a real timestamp into the scale used by our buckets,
 * we have to multiply by buckets_.size(). To figure out which bucket it goes
 * into, we then divide by duration_.
 */
357 | |
358 | template <typename VT, typename CT> |
359 | size_t BucketedTimeSeries<VT, CT>::getBucketIdx(TimePoint time) const { |
360 | // For all-time data we don't use buckets_. Everything is tracked in total_. |
361 | DCHECK(!isAllTime()); |
362 | |
363 | auto timeIntoCurrentCycle = (time.time_since_epoch() % duration_); |
364 | return timeIntoCurrentCycle.count() * buckets_.size() / duration_.count(); |
365 | } |
366 | |
367 | /* |
368 | * Compute the bucket index for the specified time, as well as the earliest |
369 | * time that falls into this bucket. |
370 | */ |
371 | template <typename VT, typename CT> |
372 | void BucketedTimeSeries<VT, CT>::getBucketInfo( |
373 | TimePoint time, |
374 | size_t* bucketIdx, |
375 | TimePoint* bucketStart, |
376 | TimePoint* nextBucketStart) const { |
377 | typedef typename Duration::rep TimeInt; |
378 | DCHECK(!isAllTime()); |
379 | |
380 | // Keep these two lines together. The compiler should be able to compute |
381 | // both the division and modulus with a single operation. |
382 | Duration timeMod = time.time_since_epoch() % duration_; |
383 | TimeInt numFullDurations = time.time_since_epoch() / duration_; |
384 | |
385 | TimeInt scaledTime = timeMod.count() * TimeInt(buckets_.size()); |
386 | |
387 | // Keep these two lines together. The compiler should be able to compute |
388 | // both the division and modulus with a single operation. |
389 | *bucketIdx = size_t(scaledTime / duration_.count()); |
390 | TimeInt scaledOffsetInBucket = scaledTime % duration_.count(); |
391 | |
392 | TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket; |
393 | TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count(); |
394 | |
395 | Duration bucketStartMod( |
396 | (scaledBucketStart + buckets_.size() - 1) / buckets_.size()); |
397 | Duration nextBucketStartMod( |
398 | (scaledNextBucketStart + buckets_.size() - 1) / buckets_.size()); |
399 | |
400 | TimePoint durationStart(numFullDurations * duration_); |
401 | *bucketStart = bucketStartMod + durationStart; |
402 | *nextBucketStart = nextBucketStartMod + durationStart; |
403 | } |
404 | |
405 | template <typename VT, typename CT> |
406 | template <typename Function> |
407 | void BucketedTimeSeries<VT, CT>::forEachBucket(Function fn) const { |
408 | if (isAllTime()) { |
409 | fn(total_, firstTime_, latestTime_ + Duration(1)); |
410 | return; |
411 | } |
412 | |
413 | typedef typename Duration::rep TimeInt; |
414 | |
415 | // Compute durationStart, latestBucketIdx, and scaledNextBucketStart, |
416 | // the same way as in getBucketInfo(). |
417 | Duration timeMod = latestTime_.time_since_epoch() % duration_; |
418 | TimeInt numFullDurations = latestTime_.time_since_epoch() / duration_; |
419 | TimePoint durationStart(numFullDurations * duration_); |
420 | TimeInt scaledTime = timeMod.count() * TimeInt(buckets_.size()); |
421 | size_t latestBucketIdx = size_t(scaledTime / duration_.count()); |
422 | TimeInt scaledOffsetInBucket = scaledTime % duration_.count(); |
423 | TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket; |
424 | TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count(); |
425 | |
426 | // Walk through the buckets, starting one past the current bucket. |
427 | // The next bucket is from the previous cycle, so subtract 1 duration |
428 | // from durationStart. |
429 | size_t idx = latestBucketIdx; |
430 | durationStart -= duration_; |
431 | |
432 | TimePoint nextBucketStart = |
433 | Duration( |
434 | (scaledNextBucketStart + buckets_.size() - 1) / buckets_.size()) + |
435 | durationStart; |
436 | while (true) { |
437 | ++idx; |
438 | if (idx >= buckets_.size()) { |
439 | idx = 0; |
440 | durationStart += duration_; |
441 | scaledNextBucketStart = duration_.count(); |
442 | } else { |
443 | scaledNextBucketStart += duration_.count(); |
444 | } |
445 | |
446 | TimePoint bucketStart = nextBucketStart; |
447 | nextBucketStart = |
448 | Duration( |
449 | (scaledNextBucketStart + buckets_.size() - 1) / buckets_.size()) + |
450 | durationStart; |
451 | |
452 | // Should we bother skipping buckets where firstTime_ >= nextBucketStart? |
453 | // For now we go ahead and invoke the function with these buckets. |
454 | // sum and count should always be 0 in these buckets. |
455 | |
456 | DCHECK_LE( |
457 | bucketStart.time_since_epoch().count(), |
458 | latestTime_.time_since_epoch().count()); |
459 | bool ret = fn(buckets_[idx], bucketStart, nextBucketStart); |
460 | if (!ret) { |
461 | break; |
462 | } |
463 | |
464 | if (idx == latestBucketIdx) { |
465 | // all done |
466 | break; |
467 | } |
468 | } |
469 | } |
470 | |
471 | /* |
472 | * Adjust the input value from the specified bucket to only account |
473 | * for the desired range. |
474 | * |
475 | * For example, if the bucket spans time [10, 20), but we only care about the |
476 | * range [10, 16), this will return 60% of the input value. |
477 | */ |
478 | template <typename VT, typename CT> |
479 | VT BucketedTimeSeries<VT, CT>::rangeAdjust( |
480 | TimePoint bucketStart, |
481 | TimePoint nextBucketStart, |
482 | TimePoint start, |
483 | TimePoint end, |
484 | ValueType input) const { |
485 | // If nextBucketStart is greater than latestTime_, treat nextBucketStart as |
486 | // if it were latestTime_. This makes us more accurate when someone is |
487 | // querying for all of the data up to latestTime_. Even though latestTime_ |
488 | // may only be partially through the bucket, we don't want to adjust |
489 | // downwards in this case, because the bucket really only has data up to |
490 | // latestTime_. |
491 | if (bucketStart <= latestTime_ && nextBucketStart > latestTime_) { |
492 | nextBucketStart = latestTime_ + Duration(1); |
493 | } |
494 | |
495 | if (start <= bucketStart && end >= nextBucketStart) { |
496 | // The bucket is wholly contained in the [start, end) interval |
497 | return input; |
498 | } |
499 | |
500 | TimePoint intervalStart = std::max(start, bucketStart); |
501 | TimePoint intervalEnd = std::min(end, nextBucketStart); |
502 | return input * (intervalEnd - intervalStart) / |
503 | (nextBucketStart - bucketStart); |
504 | } |
505 | |
506 | template <typename VT, typename CT> |
507 | template <typename Function> |
508 | void BucketedTimeSeries<VT, CT>::forEachBucket( |
509 | TimePoint start, |
510 | TimePoint end, |
511 | Function fn) const { |
512 | forEachBucket( |
513 | [&start, &end, &fn]( |
514 | const Bucket& bucket, |
515 | TimePoint bucketStart, |
516 | TimePoint nextBucketStart) -> bool { |
517 | if (start >= nextBucketStart) { |
518 | return true; |
519 | } |
520 | if (end <= bucketStart) { |
521 | return false; |
522 | } |
523 | bool ret = fn(bucket, bucketStart, nextBucketStart); |
524 | return ret; |
525 | }); |
526 | } |
527 | |
528 | } // namespace folly |
529 | |