1/*
2 * hist_track.c
3 *
4 * Copyright (C) 2012-2016 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23/*
 * A histogram that can also cache periodic snapshots of its bucket counts,
 * for time-sliced reporting.
25 */
26
27
28//==========================================================
29// Includes
30//
31
32#include "hist_track.h"
33
34#include <stdbool.h>
35#include <stdint.h>
36#include <stdio.h>
37#include <stdlib.h>
38#include <string.h>
39#include <time.h>
40
41#include <aerospike/as_atomic.h>
42#include <citrusleaf/alloc.h>
43
44#include "cf_mutex.h"
45#include "dynbuf.h"
46#include "fault.h"
47#include "hist.h"
48
49
50//==========================================================
51// Private "Class Members"
52//
53
54//------------------------------------------------
55// Constants
56//
57
// More than one day of 10 second slices uses too much memory.
// (static - these are private to this file and must not claim global symbol
// names.)
static const uint32_t MAX_NUM_ROWS = (24 * 60 * 60) / 10;

// Caching this few is legal but silly.
static const uint32_t MIN_NUM_ROWS = 2;

// Don't track/report thresholds with a larger bucket index than this.
// This corresponds to the 32 second threshold - that should be big enough.
#define MAX_BUCKET 15

// Don't track/report more than this many thresholds.
// This could in principle be less than (MAX_BUCKET + 1), e.g. it could be
// 4, and we could track buckets 0, 5, 10, 15.
#define MAX_NUM_COLS (MAX_BUCKET + 1)

// Buckets tracked when cf_hist_track_start() is given no thresholds.
// For our standard latency histograms, 0: >1ms, 3: >8ms, 6: >64ms.
#define DEFAULT_NUM_COLS 3
static const uint32_t default_buckets[DEFAULT_NUM_COLS] = { 0, 3, 6 };

// No output line can be longer than this.
#define MAX_FORMATTED_ROW_SIZE 512
#define MAX_FORMATTED_SETTINGS_SIZE 512
80
81//------------------------------------------------
82// Data
83//
84
// One cached snapshot, written by cf_hist_track_dump(). overs[] is a flexible
// array member with one entry per tracked column, so a row really occupies
// cf_hist_track.row_size bytes, not sizeof(row).
typedef struct row_s {
	// Seconds since the epoch (from time(), truncated to 32 bits) when saved.
	uint32_t timestamp;
	// Sum of all histogram bucket counts at snapshot time.
	uint64_t total;
	// overs[i] is total minus the sum of counts in buckets 0 thru buckets[i].
	uint64_t overs[];
} row;

struct cf_hist_track_s {
	// Base Histogram (must be first)
	// - this object is handed to histogram_*() functions as (histogram*)this.
	histogram hist;

	// Tracking-related
	// Circular row cache: num_rows slots of row_size bytes each, indexed via
	// get_row(). NULL when tracking is off.
	row* rows;
	// sizeof(row) plus num_cols uint64_t "overs" entries.
	size_t row_size;
	// Capacity of the cache, in rows.
	uint32_t num_rows;
	// Rows written since cf_hist_track_start() - the next row goes in slot
	// write_row_n % num_rows.
	uint32_t write_row_n;
	// Row count of the oldest row still cached - advances once the cache
	// wraps.
	uint32_t oldest_row_n;
	// Held by every function that reads or writes rows and the fields above.
	cf_mutex rows_lock;
	// Minimum number of seconds between cached rows.
	uint32_t slice_sec;
	// Strictly increasing histogram bucket indexes whose "overs" are tracked.
	uint32_t buckets[MAX_NUM_COLS];
	// Number of valid entries in buckets[] (and in each row's overs[]).
	uint32_t num_cols;
};
106
107//------------------------------------------------
108// Function Declarations
109//
110
111static inline row* get_row(cf_hist_track* this, uint32_t row_n);
112static uint32_t get_start_row_n(cf_hist_track* this, uint32_t back_sec);
113static void output_header(cf_hist_track* this, uint32_t start_ts,
114 uint32_t num_cols, cf_hist_track_info_format info_fmt,
115 cf_dyn_buf* db_p);
116static void output_slice(cf_hist_track* this, row* prev_row_p, row* row_p,
117 uint32_t diff_sec, uint32_t num_cols,
118 cf_hist_track_info_format info_fmt, cf_dyn_buf* db_p);
119static int threshold_to_bucket(int threshold);
120static int thresholds_to_buckets(const char* thresholds, uint32_t buckets[]);
121
122
123//==========================================================
124// Public API
125//
126
127//------------------------------------------------
128// Create a cf_hist_track object.
129//
130cf_hist_track*
131cf_hist_track_create(const char* name, histogram_scale scale)
132{
133 cf_assert(name, AS_INFO, "null histogram name");
134 cf_assert(strlen(name) < HISTOGRAM_NAME_SIZE, AS_INFO,
135 "bad histogram name %s", name);
136 cf_assert(scale >= 0 && scale < HIST_SCALE_MAX_PLUS_1, AS_INFO,
137 "bad histogram scale %d", scale);
138
139 cf_hist_track* this = (cf_hist_track*)cf_malloc(sizeof(cf_hist_track));
140
141 cf_mutex_init(&this->rows_lock);
142
143 // Base histogram setup, same as in histogram_create():
144 strcpy(this->hist.name, name);
145 memset((void*)this->hist.counts, 0, sizeof(this->hist.counts));
146
147 // If cf_hist_track_insert_data_point() is called for a size or count
148 // histogram, the divide by 0 will crash - consider that a high-performance
149 // assert.
150
151 switch (scale) {
152 case HIST_MILLISECONDS:
153 this->hist.scale_tag = HIST_TAG_MILLISECONDS;
154 this->hist.time_div = 1000 * 1000;
155 break;
156 case HIST_MICROSECONDS:
157 this->hist.scale_tag = HIST_TAG_MICROSECONDS;
158 this->hist.time_div = 1000;
159 break;
160 case HIST_SIZE:
161 this->hist.scale_tag = HIST_TAG_SIZE;
162 this->hist.time_div = 0;
163 break;
164 case HIST_COUNT:
165 this->hist.scale_tag = HIST_TAG_COUNT;
166 this->hist.time_div = 0;
167 break;
168 default:
169 cf_crash(AS_INFO, "%s: unrecognized histogram scale %d", name, scale);
170 break;
171 }
172
173 // Start with tracking off.
174 this->rows = NULL;
175
176 return this;
177}
178
179//------------------------------------------------
180// Destroy a cf_hist_track object.
181//
182void
183cf_hist_track_destroy(cf_hist_track* this)
184{
185 cf_hist_track_stop(this);
186 cf_mutex_destroy(&this->rows_lock);
187 cf_free(this);
188}
189
190//------------------------------------------------
191// Start tracking. May call this again without
192// first calling cf_hist_track_disable() to use
193// different caching parameters, but previous
194// cache is lost.
195//
196// TODO - resolve errors ???
197bool
198cf_hist_track_start(cf_hist_track* this, uint32_t back_sec, uint32_t slice_sec,
199 const char* thresholds)
200{
201 if (slice_sec == 0) {
202 return false;
203 }
204
205 uint32_t num_rows = back_sec / slice_sec;
206
207 // Check basic sanity of row-related parameters.
208 if (num_rows > MAX_NUM_ROWS || num_rows < MIN_NUM_ROWS) {
209 return false;
210 }
211
212 // If thresholds aren't specified, use defaults.
213 uint32_t* buckets = (uint32_t*)default_buckets;
214 int num_cols = DEFAULT_NUM_COLS;
215
216 // Parse non-default thresholds and check resulting buckets.
217 uint32_t parsed_buckets[MAX_NUM_COLS];
218
219 if (thresholds) {
220 buckets = parsed_buckets;
221 num_cols = thresholds_to_buckets(thresholds, buckets);
222
223 if (num_cols < 0) {
224 return false;
225 }
226 }
227
228 cf_mutex_lock(&this->rows_lock);
229
230 if (this->rows) {
231 cf_free(this->rows);
232 }
233
234 this->row_size = sizeof(row) + (num_cols * sizeof(uint64_t));
235 this->rows = (row*)cf_malloc(num_rows * this->row_size);
236 this->num_rows = num_rows;
237 this->write_row_n = 0;
238 this->oldest_row_n = 0;
239 this->slice_sec = slice_sec;
240
241 for (int i = 0; i < num_cols; i++) {
242 this->buckets[i] = buckets[i];
243 }
244
245 this->num_cols = (uint32_t)num_cols;
246
247 cf_mutex_unlock(&this->rows_lock);
248
249 return true;
250}
251
252//------------------------------------------------
253// Stop tracking, freeing cache.
254//
255void
256cf_hist_track_stop(cf_hist_track* this)
257{
258 cf_mutex_lock(&this->rows_lock);
259
260 if (this->rows) {
261 cf_free(this->rows);
262 this->rows = NULL;
263 }
264
265 cf_mutex_unlock(&this->rows_lock);
266}
267
268//------------------------------------------------
269// Clear histogram buckets, and if tracking, stop.
270// Must call cf_hist_track_enable() to start
271// tracking again.
272//
273void
274cf_hist_track_clear(cf_hist_track* this)
275{
276 cf_hist_track_stop(this);
277 histogram_clear((histogram*)this);
278}
279
280//------------------------------------------------
281// Print all non-zero histogram buckets, and if
282// tracking, cache timestamp, total data points,
283// and threshold data.
284//
285void
286cf_hist_track_dump(cf_hist_track* this)
287{
288 // Always print the histogram.
289 histogram_dump((histogram*)this);
290
291 // If tracking is enabled, save a row in the cache.
292 cf_mutex_lock(&this->rows_lock);
293
294 if (! this->rows) {
295 cf_mutex_unlock(&this->rows_lock);
296 return;
297 }
298
299 uint32_t now_ts = (uint32_t)time(NULL);
300
301 // But don't save row if slice_sec hasn't elapsed since last saved row.
302 if (this->write_row_n != 0 &&
303 now_ts - get_row(this, this->write_row_n - 1)->timestamp <
304 this->slice_sec) {
305 cf_mutex_unlock(&this->rows_lock);
306 return;
307 }
308
309 row* row_p = get_row(this, this->write_row_n);
310
311 // "Freeze" the histogram for consistency of total.
312 uint64_t counts[N_BUCKETS];
313 uint64_t total_count = 0;
314
315 for (int j = 0; j < N_BUCKETS; j++) {
316 counts[j] = as_load_uint64(&this->hist.counts[j]);
317 total_count += counts[j];
318 }
319
320 uint64_t subtotal = 0;
321
322 // b's "over" is total minus sum of values in all buckets 0 thru b.
323 for (int i = 0, b = 0; i < this->num_cols; b++) {
324 subtotal += counts[b];
325
326 if (this->buckets[i] == b) {
327 row_p->overs[i++] = total_count - subtotal;
328 }
329 }
330
331 row_p->total = total_count;
332 row_p->timestamp = now_ts;
333
334 // Increment the current and oldest row indexes.
335 this->write_row_n++;
336
337 if (this->write_row_n > this->num_rows) {
338 this->oldest_row_n++;
339 }
340
341 cf_mutex_unlock(&this->rows_lock);
342}
343
344//------------------------------------------------
345// Pass-through to base histogram.
346//
347uint64_t
348cf_hist_track_insert_data_point(cf_hist_track* this, uint64_t start_ns)
349{
350 return histogram_insert_data_point((histogram*)this, start_ns);
351}
352
353//------------------------------------------------
354// Pass-through to base histogram.
355//
356void
357cf_hist_track_insert_raw(cf_hist_track* this, uint64_t value)
358{
359 histogram_insert_raw((histogram*)this, value);
360}
361
362//------------------------------------------------
363// Get time-sliced info from cache.
364//
365void
366cf_hist_track_get_info(cf_hist_track* this, uint32_t back_sec,
367 uint32_t duration_sec, uint32_t slice_sec, bool throughput_only,
368 cf_hist_track_info_format info_fmt, cf_dyn_buf* db_p)
369{
370 cf_mutex_lock(&this->rows_lock);
371
372 if (! this->rows) {
373 cf_dyn_buf_append_string(db_p, "error-not-tracking;");
374 cf_mutex_unlock(&this->rows_lock);
375 return;
376 }
377
378 uint32_t start_row_n = get_start_row_n(this, back_sec);
379
380 if (start_row_n == -1) {
381 cf_dyn_buf_append_string(db_p, "error-no-data-yet-or-back-too-small;");
382 cf_mutex_unlock(&this->rows_lock);
383 return;
384 }
385
386 uint32_t num_cols = throughput_only ? 0 : this->num_cols;
387 row* prev_row_p = get_row(this, start_row_n);
388
389 output_header(this, prev_row_p->timestamp, num_cols, info_fmt, db_p);
390
391 if (slice_sec == 0) {
392 row* row_p = get_row(this, this->write_row_n - 1);
393 uint32_t diff_sec = row_p->timestamp - prev_row_p->timestamp;
394
395 output_slice(this, prev_row_p, row_p, diff_sec, num_cols, info_fmt,
396 db_p);
397
398 cf_mutex_unlock(&this->rows_lock);
399 return;
400 }
401
402 uint32_t start_ts = prev_row_p->timestamp;
403 bool no_slices = true;
404
405 for (uint32_t row_n = start_row_n + 1; row_n < this->write_row_n; row_n++) {
406 row* row_p = get_row(this, row_n);
407
408 uint32_t diff_sec = row_p->timestamp - prev_row_p->timestamp;
409
410 if (diff_sec < slice_sec) {
411 continue;
412 }
413
414 output_slice(this, prev_row_p, row_p, diff_sec, num_cols, info_fmt,
415 db_p);
416 no_slices = false;
417
418 // Doing this at the end guarantees we get at least one slice.
419 if (duration_sec != 0 && row_p->timestamp - start_ts > duration_sec) {
420 break;
421 }
422
423 prev_row_p = row_p;
424 }
425
426 if (no_slices) {
427 cf_dyn_buf_append_string(db_p,
428 "error-slice-too-big-or-back-too-small;");
429 }
430
431 cf_mutex_unlock(&this->rows_lock);
432}
433
434//------------------------------------------------
435// Get current settings which were passed into
436// cf_hist_track_start(), in format suitable for
437// info_command_config_get().
438//
439void
440cf_hist_track_get_settings(cf_hist_track* this, cf_dyn_buf* db_p)
441{
442 cf_mutex_lock(&this->rows_lock);
443
444 if (! this->rows) {
445 cf_mutex_unlock(&this->rows_lock);
446 return;
447 }
448
449 const char* name = ((histogram*)this)->name;
450 char output[MAX_FORMATTED_SETTINGS_SIZE];
451 char* write_p = output;
452 char* end_p = output + MAX_FORMATTED_SETTINGS_SIZE - 2;
453
454 write_p += snprintf(output, MAX_FORMATTED_SETTINGS_SIZE - 2,
455 "%s-hist-track-back=%u;"
456 "%s-hist-track-slice=%u;"
457 "%s-hist-track-thresholds=",
458 name, this->num_rows * this->slice_sec,
459 name, this->slice_sec,
460 name);
461
462 for (int i = 0; i < this->num_cols; i++) {
463 write_p += snprintf(write_p, end_p - write_p, "%u,",
464 (uint32_t)1 << this->buckets[i]);
465 }
466
467 if (this->num_cols > 0) {
468 write_p--;
469 }
470
471 *write_p++ = ';';
472 *write_p = 0;
473
474 cf_dyn_buf_append_string(db_p, output);
475
476 cf_mutex_unlock(&this->rows_lock);
477}
478
479
480//==========================================================
481// Private Functions
482//
483
484//------------------------------------------------
485// Get row pointer for specified row count. Note
486// that row_size is determined dynamically, so we
487// can't just do rows[i].
488//
489static inline row*
490get_row(cf_hist_track* this, uint32_t row_n)
491{
492 return (row*)((uint8_t*)this->rows +
493 ((row_n % this->num_rows) * this->row_size));
494}
495
496//------------------------------------------------
497// Find row at or after timestamp specified by
498// back_sec.
499//
500static uint32_t
501get_start_row_n(cf_hist_track* this, uint32_t back_sec)
502{
503 // Must be at least two rows to get a slice.
504 if (this->write_row_n < 2) {
505 return -1;
506 }
507
508 uint32_t now_ts = (uint32_t)time(NULL);
509
510 // In case we call this with default back_sec (0) or back_sec more than UTC
511 // epoch to now - start from the beginning.
512 if (back_sec == 0 || back_sec >= now_ts) {
513 return this->oldest_row_n;
514 }
515
516 uint32_t start_ts = now_ts - back_sec;
517
518 // Find the most recent slice interval.
519 uint32_t last_row_n = this->write_row_n - 1;
520 uint32_t slice_sec = get_row(this, last_row_n)->timestamp -
521 get_row(this, last_row_n - 1)->timestamp;
522
523 // Use recent slice interval to guess how many rows back to look.
524 uint32_t back_row_n = back_sec / slice_sec;
525 uint32_t guess_row_n = last_row_n > back_row_n ?
526 last_row_n - back_row_n : 0;
527
528 if (guess_row_n < this->oldest_row_n) {
529 guess_row_n = this->oldest_row_n;
530 }
531
532 // Begin at guessed row, and iterate to find exact row to start at.
533 uint32_t guess_ts = get_row(this, guess_row_n)->timestamp;
534 uint32_t start_row_n;
535
536 if (guess_ts < start_ts) {
537 for (start_row_n = guess_row_n + 1; start_row_n < last_row_n;
538 start_row_n++) {
539 if (get_row(this, start_row_n)->timestamp >= start_ts) {
540 break;
541 }
542 }
543 }
544 else if (guess_ts > start_ts) {
545 for (start_row_n = guess_row_n; start_row_n > this->oldest_row_n;
546 start_row_n--) {
547 if (get_row(this, start_row_n - 1)->timestamp < start_ts) {
548 break;
549 }
550 }
551 }
552 else {
553 start_row_n = guess_row_n;
554 }
555
556 // Make sure when default query is run (e.g. latency:), we return at least
557 // valid last data instead of returning an error. This case happens when the
558 // query is timed such that it's right when histogram is being dumped.
559 if (start_row_n == last_row_n) {
560 start_row_n = last_row_n - 1;
561 }
562
563 // Can't get a slice if there isn't at least one row after the start row.
564 return start_row_n < last_row_n ? start_row_n : -1;
565}
566
567//------------------------------------------------
568// Make info "header" and append it to db_p.
569//
570static void
571output_header(cf_hist_track* this, uint32_t start_ts, uint32_t num_cols,
572 cf_hist_track_info_format info_fmt, cf_dyn_buf* db_p)
573{
574 cf_dyn_buf_append_string(db_p, ((histogram*)this)->name);
575
576 const char* time_fmt;
577 const char* rate_fmt;
578 const char* pcts_fmt;
579 char line_sep;
580
581 switch (info_fmt) {
582 case CF_HIST_TRACK_FMT_PACKED:
583 default:
584 time_fmt = ":%T-GMT";
585 rate_fmt = ",ops/sec";
586 pcts_fmt = ",>%ums";
587 line_sep = ';';
588 break;
589 case CF_HIST_TRACK_FMT_TABLE:
590 time_fmt = ":\n%T GMT % > (ms)";
591 rate_fmt = "\n to ops/sec";
592 pcts_fmt = " %6u";
593 line_sep = '\n';
594 break;
595 }
596
597 char output[MAX_FORMATTED_ROW_SIZE];
598 char* write_p = output;
599 char* end_p = output + MAX_FORMATTED_ROW_SIZE - 2;
600 time_t start_ts_time_t = (time_t)start_ts;
601 struct tm start_tm;
602
603 gmtime_r(&start_ts_time_t, &start_tm);
604 write_p += strftime(output, MAX_FORMATTED_ROW_SIZE - 2, time_fmt, &start_tm);
605 write_p += snprintf(write_p, end_p - write_p, "%s", rate_fmt);
606
607 for (int i = 0; i < num_cols; i++) {
608 write_p += snprintf(write_p, end_p - write_p, pcts_fmt,
609 (uint32_t)(1 << this->buckets[i]));
610 }
611
612 *write_p++ = line_sep;
613 *write_p = 0;
614
615 cf_dyn_buf_append_string(db_p, output);
616}
617
618//------------------------------------------------
619// Calculate output info for slice defined by two
620// rows, and append to db_p.
621//
622static void
623output_slice(cf_hist_track* this, row* prev_row_p, row* row_p,
624 uint32_t diff_sec, uint32_t num_cols,
625 cf_hist_track_info_format info_fmt, cf_dyn_buf* db_p)
626{
627 const char* time_fmt;
628 const char* rate_fmt;
629 const char* pcts_fmt;
630 char line_sep;
631
632 switch (info_fmt) {
633 case CF_HIST_TRACK_FMT_PACKED:
634 default:
635 time_fmt = "%T";
636 rate_fmt = ",%.1f";
637 pcts_fmt = ",%.2f";
638 line_sep = ';';
639 break;
640 case CF_HIST_TRACK_FMT_TABLE:
641 time_fmt = "%T";
642 rate_fmt = " %9.1f";
643 pcts_fmt = " %6.2f";
644 line_sep = '\n';
645 break;
646 }
647
648 char output[MAX_FORMATTED_ROW_SIZE];
649 char* write_p = output;
650 char* end_p = output + MAX_FORMATTED_ROW_SIZE - 2;
651 time_t row_ts_time_t = (time_t)row_p->timestamp;
652 struct tm row_tm;
653
654 gmtime_r(&row_ts_time_t, &row_tm);
655 write_p += strftime(output, MAX_FORMATTED_ROW_SIZE - 2, time_fmt, &row_tm);
656
657 uint64_t diff_total = row_p->total - prev_row_p->total;
658 double ops_per_sec = (double)(diff_total) / diff_sec;
659
660 write_p += snprintf(write_p, end_p - write_p, rate_fmt, ops_per_sec);
661
662 for (int i = 0; i < num_cols; i++) {
663 // We "freeze" the histogram to calculate "overs", so it shouldn't be
664 // possible for an "over" to be less than the one in the previous row.
665 uint64_t diff_overs = row_p->overs[i] - prev_row_p->overs[i];
666 double pcts_over_i = diff_total != 0 ?
667 (double)(diff_overs * 100) / diff_total : 0;
668
669 write_p += snprintf(write_p, end_p - write_p, pcts_fmt, pcts_over_i);
670 }
671
672 *write_p++ = line_sep;
673 *write_p = 0;
674
675 cf_dyn_buf_append_string(db_p, output);
676}
677
//------------------------------------------------
// Map a threshold in milliseconds to its bucket
// index, i.e. log2(threshold). Returns -1 unless
// threshold is a positive power of 2.
//
static int
threshold_to_bucket(int threshold)
{
	// A positive power of 2 has exactly one bit set. The <= 0 test runs
	// first, so threshold - 1 can't overflow.
	if (threshold <= 0 || (threshold & (threshold - 1)) != 0) {
		return -1;
	}

	int bucket = 0;

	for (unsigned int v = (unsigned int)threshold; v != 1; v >>= 1) {
		bucket++;
	}

	return bucket;
}
699
700//------------------------------------------------
701// Convert thresholds string to buckets array.
702//
703static int
704thresholds_to_buckets(const char* thresholds, uint32_t buckets[])
705{
706 // Copy since strtok_r() is destructive.
707 char toks[strlen(thresholds) + 1];
708
709 strcpy(toks, thresholds);
710
711 char* save_ptr = NULL;
712 char* tok = strtok_r(toks, ",", &save_ptr);
713
714 int i = 0;
715
716 while (tok) {
717 if (i == MAX_NUM_COLS) {
718 return -1;
719 }
720
721 int b = threshold_to_bucket(atoi(tok));
722
723 // Make sure it's a rising sequence of valid bucket indexes.
724 if (b < 0 || b > MAX_BUCKET || (i > 0 && b <= buckets[i - 1])) {
725 return -1;
726 }
727
728 buckets[i++] = (uint32_t)b;
729
730 tok = strtok_r(NULL, ",", &save_ptr);
731 }
732
733 return i;
734}
735