1/*
2 * health.c
3 *
4 * Copyright (C) 2018 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23//==========================================================
24// Includes.
25//
26
27#include "base/health.h"
28
29#include <stdbool.h>
30#include <stddef.h>
31#include <stdint.h>
32#include <string.h>
33#include <unistd.h>
34
35#include "citrusleaf/cf_atomic.h"
36#include "citrusleaf/cf_clock.h"
37#include "citrusleaf/cf_vector.h"
38
39#include "cf_mutex.h"
40#include "cf_thread.h"
41#include "dynbuf.h"
42#include "fault.h"
43#include "node.h"
44#include "shash.h"
45
46#include "base/cfg.h"
47#include "base/datamodel.h"
48#include "fabric/exchange.h"
49#include "storage/storage.h"
50
51
52//==========================================================
53// Typedefs & constants.
54//
55
// How often (in seconds) run_health() recomputes moving averages and outliers.
#define DETECTION_INTERVAL 60 // 1 min
// A peer outside the cluster is still considered active until this many
// seconds pass without a window shift seeing samples (see is_node_active()).
#define MAX_IDLE_SEC (60 * 30) // 30 min
// Capacity of health_stat::id (node id in hex or device name, incl. NUL).
#define MAX_ID_SZ 200
// Max entries gathered per stat in one detection pass; extra peers are dropped.
#define MAX_NODES_TRACKED (2 * AS_CLUSTER_SZ)
// An outlier is reported only if its distance above the median is at least
// this percentage of its own value.
#define MIN_CONFIDENCE_PCT 50
// NOTE(review): MOV_AVG_COEFF, SAMPLE_PERIOD_US and SEC_US are not referenced
// in this file as shown - confirm whether they are still needed.
#define MOV_AVG_COEFF 0.5
#define SAMPLE_PERIOD_US (50 * 1000) // sampling 50ms for every sec
#define SEC_US (1000 * 1000)
64
// One slot of a stat's sliding window; each slot covers one detection interval.
typedef struct bucket_s {
	cf_atomic64 sample_sum; // counters: event count; latencies: sum of deltas (us)
	cf_atomic32 n_samples; // latency stats only - counters leave this at 0
} bucket;

// Sliding window for one stat: a ring of n_buckets buckets (n_buckets comes
// from the stat's stat_spec). Samples are added to buckets[cur_bucket]; the
// health thread moves cur_bucket forward once per detection interval.
typedef struct health_stat_s {
	bucket* buckets;
	volatile uint32_t cur_bucket;
	char id[MAX_ID_SZ]; // label used in output: node id (hex) or device name
} health_stat;

// Everything tracked for one remote node (stored in g_stats).
typedef struct peer_stats_s {
	health_stat node_stats[AS_HEALTH_NODE_TYPE_MAX];
	health_stat ns_stats[AS_NAMESPACE_SZ][AS_HEALTH_NS_TYPE_MAX]; // [ns index][type]
	bool is_in_cluster; // node is in the latest succession list
	uint64_t last_sample; // cf_get_seconds() when a window shift last saw samples
} peer_stats;

// Read latency for this node's own devices, [ns index][device index].
typedef struct local_stats_s {
	health_stat device_read_lat[AS_NAMESPACE_SZ][AS_STORAGE_MAX_DEVICES];
} local_stats;
86
// Moving average (latency stats) or moving sum (counter stats) of one entry.
typedef struct mov_avg_s {
	char* id; // points at the source health_stat's id - not owned
	double value;
} mov_avg;

// One stat's values across peers; only the first n_nodes entries are valid.
typedef struct cluster_mov_avg_s {
	uint32_t n_nodes;
	mov_avg nma_array[MAX_NODES_TRACKED];
} cluster_mov_avg;

// Per-pass scratch for all peer stats, indexed like peer_stats.
typedef struct cluster_all_mov_avg_s {
	cluster_mov_avg cl_node_stats[AS_HEALTH_NODE_TYPE_MAX];
	cluster_mov_avg cl_ns_stats[AS_NAMESPACE_SZ][AS_HEALTH_NS_TYPE_MAX];
} cluster_all_mov_avg;

// Per-pass scratch for local device stats, indexed like local_stats.
typedef struct local_all_mov_avg_s {
	mov_avg device_mov_avg[AS_NAMESPACE_SZ][AS_STORAGE_MAX_DEVICES];
} local_all_mov_avg;

// One detected outlier, as stored in g_outliers.
typedef struct outlier_s {
	uint32_t confidence_pct;
	char* id; // health_stat id of the flagged entry - not owned
	uint32_t ns_id; // 1-based namespace index, or 0 for node-level stats
	const char* reason; // stat_str of the stat that flagged it
} outlier;

// Static description of one tracked stat.
typedef struct stat_spec_s {
	bool is_counter; // true: moving sum of counts; false: moving average latency
	bool depends_on_cluster; // only evaluated/printed for peers in the cluster
	uint32_t n_buckets; // one bucket per detection interval
	uint32_t threshold; // min sum/avg below which stat is not considered outlier
	const char* stat_str; // name used in stats output and as outlier reason
} stat_spec;
120
121// Maintain order as per as_health_local_stat_type enum.
122static const stat_spec local_stat_spec[] = {
123 { false, false, 30, 0, "device_read_latency" } // AS_HEALTH_LOCAL_DEVICE_READ_LAT
124};
125
126// Maintain order as per as_health_node_stat_type enum.
127static const stat_spec node_stat_spec[] = {
128 { true, false, 30, 5, "fabric_connections_opened" }, // AS_HEALTH_NODE_FABRIC_FDS
129 { true, true, 30, 2, "proxies" }, // AS_HEALTH_NODE_PROXIES
130 { true, false, 30, 1, "node_arrivals" } // AS_HEALTH_NODE_ARRIVALS
131};
132
133// Maintain order as per as_health_ns_stat_type enum.
134static const stat_spec ns_stat_spec[] = {
135 { false, true, 30, 0, "replication_latency" } // AS_HEALTH_NS_REPL_LAT
136};
137
138
139//==========================================================
140// Forward declarations.
141//
142
143static int32_t clear_data_reduce_fn(const void* key, void* data, void* udata);
144static void cluster_state_changed_fn(const as_exchange_cluster_changed_event* event, void* udata);
145static int compare_stats(const void* o1, const void* o2);
146static void compute_local_stats_mov_avg (local_all_mov_avg* lma);
147static void compute_node_mov_avg(peer_stats* ps, cluster_all_mov_avg* cs, cf_node node);
148static void compute_ns_mov_avg(peer_stats* ps, cluster_all_mov_avg* cs, cf_node node);
149static void create_local_stats();
150static peer_stats* create_node(cf_node node);
151static void find_outliers_from_local_stats(local_all_mov_avg* lma);
152static void find_outliers_from_stats(cluster_all_mov_avg* cs);
153static void find_outlier_per_stat(mov_avg* ma, uint32_t n_entries, uint32_t threshold, const char* reason, uint32_t ns_id);
154static int32_t mark_cl_membership_reduce_fn(const void* key, void* data, void* udata);
155static void print_local_stats(cf_dyn_buf* db);
156static void print_node_stats(peer_stats* ps, cf_dyn_buf* db, cf_node node);
157static void print_ns_stats(peer_stats* ps, cf_dyn_buf* db, cf_node node);
158static int32_t print_stats_reduce_fn(const void* key, void* data, void* udata);
159static void reset_local_stats();
160static void* run_health();
161static void shift_window_local_stat();
162static void shift_window_node_stat(peer_stats* ps);
163static void shift_window_ns_stat(peer_stats* ps);
164static int32_t update_mov_avg_reduce_fn(const void* key, void* data, void* udata);
165
166
167//==========================================================
168// Globals.
169//
170
// Set by run_health() after the health check is enabled and stats are reset;
// as_health_get_outliers() and as_health_get_stats() output nothing while false.
bool g_health_enabled = false;

// NOTE(review): not used in this file - presumably exported via base/health.h
// for callers' own sampling decisions; confirm.
__thread uint64_t g_device_read_counter = 0;
__thread uint64_t g_replica_write_counter = 0;

// Read-latency windows for this node's own devices.
static local_stats g_local_stats;
// Guards g_outliers: run_health() rebuilds it, as_health_get_outliers() reads it.
static cf_mutex g_outlier_lock = CF_MUTEX_INIT;
static cf_vector* g_outliers;
// cf_node -> peer_stats* for remote nodes; entries are never removed here.
static cf_shash* g_stats;
180
181
182//==========================================================
183// Inlines and macros.
184//
185
186static inline void
187add_counter_sample(health_stat* hs)
188{
189 bucket* b = &hs->buckets[hs->cur_bucket];
190
191 // For counters, sample_sum is just the total count, n_samples is unused.
192 cf_atomic64_incr(&b->sample_sum);
193}
194
195static inline void
196add_latency_sample(health_stat* hs, uint64_t delta_us)
197{
198 bucket* b = &hs->buckets[hs->cur_bucket];
199
200 cf_atomic64_add(&b->sample_sum, delta_us);
201 cf_atomic32_incr(&b->n_samples);
202}
203
204static inline double
205compute_mov_avg_count(bucket* buckets, uint32_t n_buckets)
206{
207 uint64_t sample_sum = 0;
208
209 for (uint32_t i = 0; i < n_buckets; i++) {
210 sample_sum += buckets[i].sample_sum;
211 }
212
213 return (double)sample_sum;
214}
215
216static inline double
217compute_mov_avg_latency(bucket* buckets, uint32_t n_buckets)
218{
219 uint64_t sample_sum = 0;
220 uint64_t n_samples = 0;
221
222 for (uint32_t i = 0; i < n_buckets; i++) {
223 sample_sum += buckets[i].sample_sum;
224 n_samples += buckets[i].n_samples;
225 }
226
227 return n_samples == 0 ? 0.0 : (double)sample_sum / (double)n_samples;
228}
229
230static inline double
231compute_mov_avg(bucket* buckets, uint32_t n_buckets, bool is_counter)
232{
233 return is_counter ?
234 compute_mov_avg_count(buckets, n_buckets) :
235 compute_mov_avg_latency(buckets, n_buckets);
236}
237
// Index of the middle element of the inclusive range [from, to], rounding
// down. Computed as from + half the span so it cannot wrap around uint32_t
// the way (from + to) / 2 can for large indexes. Expects from <= to.
static inline uint32_t
find_median_index(uint32_t from, uint32_t to)
{
	return from + (to - from) / 2;
}
243
244static inline bool
245is_node_active(peer_stats* ps)
246{
247 return ps->is_in_cluster ||
248 cf_get_seconds() - ps->last_sample < MAX_IDLE_SEC;
249}
250
251
252//==========================================================
253// Public API.
254//
255
256void
257as_health_get_outliers(cf_dyn_buf* db)
258{
259 if (! g_health_enabled) {
260 return;
261 }
262
263 cf_detail(AS_HEALTH, "getting outlier info");
264
265 cf_mutex_lock(&g_outlier_lock);
266
267 for (uint32_t i = 0; i < cf_vector_size(g_outliers); i++) {
268 outlier cur;
269 cf_vector_get(g_outliers, i, &cur);
270
271 cf_dyn_buf_append_string(db, "id=");
272 cf_dyn_buf_append_string(db, cur.id);
273 cf_dyn_buf_append_char(db, ':');
274
275 if (cur.ns_id != 0) {
276 cf_dyn_buf_append_string(db, "namespace=");
277 cf_dyn_buf_append_string(db,
278 g_config.namespaces[cur.ns_id - 1]->name);
279 cf_dyn_buf_append_char(db, ':');
280 }
281
282 cf_dyn_buf_append_string(db, "confidence_pct=");
283 cf_dyn_buf_append_uint32(db, cur.confidence_pct);
284 cf_dyn_buf_append_char(db, ':');
285 cf_dyn_buf_append_string(db, "reason=");
286 cf_dyn_buf_append_string(db, cur.reason);
287
288 cf_dyn_buf_append_char(db, ';');
289 }
290
291 cf_dyn_buf_chomp(db);
292
293 cf_mutex_unlock(&g_outlier_lock);
294}
295
296void
297as_health_get_stats(cf_dyn_buf* db)
298{
299 if (! g_health_enabled) {
300 return;
301 }
302
303 cf_shash_reduce(g_stats, print_stats_reduce_fn, db);
304 print_local_stats(db);
305 cf_dyn_buf_chomp(db);
306}
307
308void
309as_health_start()
310{
311 g_stats = cf_shash_create(cf_nodeid_shash_fn, sizeof(cf_node),
312 sizeof(peer_stats*), AS_CLUSTER_SZ, CF_SHASH_MANY_LOCK);
313 g_outliers = cf_vector_create(sizeof(outlier), 10, 0);
314
315 create_local_stats();
316
317 as_exchange_register_listener(cluster_state_changed_fn, NULL);
318
319 cf_info(AS_HEALTH, "starting health monitor thread");
320
321 cf_thread_create_detached(run_health, NULL);
322}
323
324void
325health_add_device_latency(uint32_t ns_id, uint32_t d_id, uint64_t start_us)
326{
327 uint64_t delta_us = cf_getus() - start_us;
328 health_stat* hs = &g_local_stats.device_read_lat[ns_id - 1][d_id];
329
330 add_latency_sample(hs, delta_us);
331}
332
333void
334health_add_node_counter(cf_node node, as_health_node_stat_type type)
335{
336 peer_stats* ps = NULL;
337
338 if (cf_shash_get(g_stats, &node, &ps) != CF_SHASH_OK) {
339 ps = create_node(node);
340 }
341
342 add_counter_sample(&ps->node_stats[type]);
343}
344
345void
346health_add_ns_latency(cf_node node, uint32_t ns_id,
347 as_health_ns_stat_type type, uint64_t start_us)
348{
349 uint64_t delta_us = cf_getus() - start_us;
350 peer_stats* ps = NULL;
351
352 if (cf_shash_get(g_stats, &node, &ps) != CF_SHASH_OK) {
353 ps = create_node(node);
354 }
355
356 add_latency_sample(&ps->ns_stats[ns_id - 1][type], delta_us);
357}
358
359
360//==========================================================
361// Local helpers.
362//
363
364static int32_t
365clear_data_reduce_fn(const void* key, void* data, void* udata)
366{
367 peer_stats* ps = *(peer_stats**)data;
368
369 for (uint32_t type = 0; type < AS_HEALTH_NODE_TYPE_MAX; type++) {
370 size_t buckets_sz = sizeof(bucket) * node_stat_spec[type].n_buckets;
371 memset(ps->node_stats[type].buckets, 0, buckets_sz);
372 ps->node_stats[type].cur_bucket = 0;
373 }
374
375 for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) {
376 for (uint32_t type = 0; type < AS_HEALTH_NS_TYPE_MAX; type++) {
377 size_t buckets_sz = sizeof(bucket) * ns_stat_spec[type].n_buckets;
378 memset(ps->ns_stats[ns_ix][type].buckets, 0, buckets_sz);
379 ps->ns_stats[ns_ix][type].cur_bucket = 0;
380 }
381 }
382
383 return CF_SHASH_OK;
384}
385
386static void
387cluster_state_changed_fn(const as_exchange_cluster_changed_event* event,
388 void* udata)
389{
390 cf_detail(AS_HEALTH, "received cluster state changed event");
391 cf_shash_reduce(g_stats, mark_cl_membership_reduce_fn, (void*)event);
392}
393
394static int
395compare_stats(const void* o1, const void* o2)
396{
397 double stat1 = ((mov_avg*)o1)->value;
398 double stat2 = ((mov_avg*)o2)->value;
399
400 return stat1 > stat2 ? 1 : (stat1 == stat2 ? 0 : -1);
401}
402
403static void
404compute_local_stats_mov_avg(local_all_mov_avg* lma)
405{
406 const stat_spec* spec = &local_stat_spec[AS_HEALTH_LOCAL_DEVICE_READ_LAT];
407
408 for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) {
409 uint32_t n_devices =
410 as_namespace_device_count(g_config.namespaces[ns_ix]);
411
412 for (uint32_t d_id = 0; d_id < n_devices; d_id++) {
413 health_stat* hs = &g_local_stats.device_read_lat[ns_ix][d_id];
414 mov_avg* dma = &lma->device_mov_avg[ns_ix][d_id];
415 const char* device_name =
416 g_config.namespaces[ns_ix]->storage_devices[d_id];
417
418 dma->id = hs->id;
419 dma->value = compute_mov_avg(hs->buckets, spec->n_buckets,
420 spec->is_counter);
421
422 cf_detail(AS_HEALTH, "moving average: device %s value %lf current bucket %lu",
423 device_name, dma->value,
424 hs->buckets[hs->cur_bucket].sample_sum);
425 }
426 }
427}
428
429// Fills per node information while computing moving avg/sum.
430static void
431compute_node_mov_avg(peer_stats* ps, cluster_all_mov_avg* cs, cf_node node)
432{
433 if (! is_node_active(ps)) {
434 return;
435 }
436
437 for (uint32_t type = 0; type < AS_HEALTH_NODE_TYPE_MAX; type++) {
438 const stat_spec* spec = &node_stat_spec[type];
439
440 if (spec->depends_on_cluster && ! ps->is_in_cluster) {
441 continue;
442 }
443
444 health_stat* hs = &ps->node_stats[type];
445 uint32_t node_index = cs->cl_node_stats[type].n_nodes;
446
447 // In extreme cases (e.g., adjacency list size >> succession list size)
448 // we may have more nodes than MAX_NODES_TRACKED.
449 if (node_index >= MAX_NODES_TRACKED) {
450 continue;
451 }
452
453 mov_avg* nma = &cs->cl_node_stats[type].nma_array[node_index];
454
455 nma->id = hs->id;
456 nma->value = compute_mov_avg(hs->buckets, spec->n_buckets,
457 spec->is_counter);
458 cs->cl_node_stats[type].n_nodes++;
459
460 cf_detail(AS_HEALTH, "moving average/sum: node %lx type %u value %lf current-bucket %lu",
461 node, type, nma->value, hs->buckets[hs->cur_bucket].sample_sum);
462 }
463}
464
465// Fills per namespace and per node information while computing moving avg/sum.
466static void
467compute_ns_mov_avg(peer_stats* ps, cluster_all_mov_avg* cs, cf_node node)
468{
469 if (! is_node_active(ps)) {
470 return;
471 }
472
473 for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) {
474 for (uint32_t type = 0; type < AS_HEALTH_NS_TYPE_MAX; type++) {
475 const stat_spec* spec = &ns_stat_spec[type];
476
477 if (spec->depends_on_cluster && ! ps->is_in_cluster) {
478 continue;
479 }
480
481 health_stat* hs = &ps->ns_stats[ns_ix][type];
482 uint32_t node_index = cs->cl_ns_stats[ns_ix][type].n_nodes;
483
484 // In extreme cases (e.g., too many alumni) we may have more nodes
485 // than MAX_NODES_TRACKED.
486 if (node_index >= MAX_NODES_TRACKED) {
487 continue;
488 }
489
490 mov_avg* nma = &cs->cl_ns_stats[ns_ix][type].nma_array[node_index];
491
492 nma->id = hs->id;
493 nma->value = compute_mov_avg(hs->buckets, spec->n_buckets,
494 spec->is_counter);
495 cs->cl_ns_stats[ns_ix][type].n_nodes++;
496
497 cf_detail(AS_HEALTH, "moving average/sum: node %lx ns-id %u type %u value %lf current-bucket %lu",
498 node, ns_ix + 1, type, nma->value,
499 hs->buckets[hs->cur_bucket].sample_sum);
500 }
501 }
502}
503
504static void
505create_local_stats()
506{
507 size_t buckets_sz = sizeof(bucket) *
508 local_stat_spec[AS_HEALTH_LOCAL_DEVICE_READ_LAT].n_buckets;
509
510 for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) {
511 as_namespace* ns = g_config.namespaces[ns_ix];
512 health_stat* device_stats = g_local_stats.device_read_lat[ns_ix];
513 uint32_t n_devices = as_namespace_device_count(ns);
514
515 for (uint32_t d_id = 0; d_id < n_devices; d_id++) {
516 health_stat* hs = &device_stats[d_id];
517 hs->buckets = cf_malloc(buckets_sz);
518 memset(hs->buckets, 0, buckets_sz);
519 hs->cur_bucket = 0;
520 strncpy(hs->id, ns->storage_devices[d_id], MAX_ID_SZ);
521 }
522 }
523}
524
525static peer_stats*
526create_node(cf_node node)
527{
528 peer_stats* ps = cf_malloc(sizeof(peer_stats));
529 memset(ps, 0, sizeof(peer_stats));
530 ps->is_in_cluster = true;
531
532 for (uint32_t type = 0; type < AS_HEALTH_NODE_TYPE_MAX; type++) {
533 size_t buckets_sz = sizeof(bucket) * node_stat_spec[type].n_buckets;
534 health_stat* hs = &ps->node_stats[type];
535 hs->buckets = cf_malloc(buckets_sz);
536 memset(hs->buckets, 0, buckets_sz);
537 sprintf(hs->id, "%lx", node);
538 }
539
540 for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) {
541 for (uint32_t type = 0; type < AS_HEALTH_NS_TYPE_MAX; type++) {
542 size_t buckets_sz = sizeof(bucket) * ns_stat_spec[type].n_buckets;
543 health_stat* hs = &ps->ns_stats[ns_ix][type];
544 hs->buckets = cf_malloc(buckets_sz);
545 memset(hs->buckets, 0, buckets_sz);
546 sprintf(hs->id, "%lx", node);
547 }
548 }
549
550 // Multiple callers may race to create a node.
551 if (cf_shash_put_unique(g_stats, &node, &ps) == CF_SHASH_ERR_FOUND) {
552 for (uint32_t type = 0; type < AS_HEALTH_NODE_TYPE_MAX; type++) {
553 cf_free(ps->node_stats[type].buckets);
554 }
555
556 for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) {
557 for (uint32_t type = 0; type < AS_HEALTH_NS_TYPE_MAX; type++) {
558 cf_free(ps->ns_stats[ns_ix][type].buckets);
559 }
560 }
561
562 cf_free(ps);
563 cf_shash_get(g_stats, &node, &ps);
564 }
565
566 return ps;
567}
568
569// Use inter-quartile distance to detect outliers.
570static void
571find_outliers_from_local_stats(local_all_mov_avg* lma)
572{
573 for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) {
574 uint32_t n_devices =
575 as_namespace_device_count(g_config.namespaces[ns_ix]);
576 mov_avg* dma = lma->device_mov_avg[ns_ix];
577 const stat_spec* spec =
578 &local_stat_spec[AS_HEALTH_LOCAL_DEVICE_READ_LAT];
579
580 find_outlier_per_stat(dma, n_devices, spec->threshold, spec->stat_str,
581 ns_ix + 1);
582 }
583}
584
585// Use inter-quartile distance to detect outliers.
586static void
587find_outliers_from_stats(cluster_all_mov_avg* cs)
588{
589 for (uint32_t type = 0; type < AS_HEALTH_NODE_TYPE_MAX; type++) {
590 cluster_mov_avg* cma = &cs->cl_node_stats[type];
591 const stat_spec* spec = &node_stat_spec[type];
592
593 find_outlier_per_stat(cma->nma_array, cma->n_nodes, spec->threshold,
594 spec->stat_str, 0);
595 }
596
597 for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) {
598 for (uint32_t type = 0; type < AS_HEALTH_NS_TYPE_MAX; type++) {
599 cluster_mov_avg* cma = &cs->cl_ns_stats[ns_ix][type];
600 const stat_spec* spec = &ns_stat_spec[type];
601
602 find_outlier_per_stat(cma->nma_array, cma->n_nodes, spec->threshold,
603 spec->stat_str, ns_ix + 1);
604 }
605 }
606}
607
608static void
609find_outlier_per_stat(mov_avg* ma, uint32_t n_entries, uint32_t threshold,
610 const char* reason, uint32_t ns_id)
611{
612 // Nobody can be declared as outliers with 1 or 2 entries.
613 if (n_entries <= 2) {
614 return;
615 }
616
617 qsort(ma, n_entries, sizeof(mov_avg), compare_stats);
618 uint32_t q2_index = find_median_index(0, n_entries - 1);
619 uint32_t q3_index = find_median_index(q2_index, n_entries - 1);
620 uint32_t q1_index = find_median_index(0, q2_index - 1);
621
622 double q3 = ma[q3_index].value;
623 double q2 = ma[q2_index].value;
624 double q1 = ma[q1_index].value;
625 // Picking k-factor as 3 to detect far off outliers.
626 double iqr = q3 - q1;
627 double upper_bound = q3 + 3 * iqr;
628
629 for (uint32_t i = 0; i < n_entries; i++) {
630 double mov_avg = ma[i].value;
631 uint32_t confidence_pct = (uint32_t)
632 (((mov_avg - q2) * 100 / mov_avg) + 0.5);
633
634 if (mov_avg > upper_bound && mov_avg > threshold &&
635 confidence_pct >= MIN_CONFIDENCE_PCT) {
636 outlier outlier = {
637 .confidence_pct = confidence_pct,
638 .id = ma[i].id,
639 .ns_id = ns_id,
640 .reason = reason
641 };
642
643 cf_vector_append(g_outliers, &outlier);
644 }
645 }
646}
647
648static int32_t
649mark_cl_membership_reduce_fn(const void* key, void* data, void* udata)
650{
651 peer_stats* ps = *(peer_stats**)data;
652 cf_node* node = (cf_node*)key;
653 as_exchange_cluster_changed_event* event =
654 (as_exchange_cluster_changed_event*)udata;
655
656 ps->is_in_cluster = contains_node(event->succession, event->cluster_size,
657 *node);
658
659 return CF_SHASH_OK;
660}
661
662static void
663print_local_stats(cf_dyn_buf* db)
664{
665 for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) {
666 as_namespace* ns = g_config.namespaces[ns_ix];
667 health_stat* device_stats = g_local_stats.device_read_lat[ns_ix];
668 uint32_t n_devices = as_namespace_device_count(ns);
669 const stat_spec* spec =
670 &local_stat_spec[AS_HEALTH_LOCAL_DEVICE_READ_LAT];
671
672 for (uint32_t d_id = 0; d_id < n_devices; d_id++) {
673 health_stat* hs = &device_stats[d_id];
674 double mov_avg = compute_mov_avg(hs->buckets, spec->n_buckets,
675 spec->is_counter);
676
677 cf_dyn_buf_append_string(db, "stat=");
678 cf_dyn_buf_append_string(db, ns->name);
679 cf_dyn_buf_append_string(db, "_");
680 cf_dyn_buf_append_string(db, spec->stat_str);
681 cf_dyn_buf_append_string(db, ":");
682 cf_dyn_buf_append_string(db, "value=");
683 cf_dyn_buf_append_int(db, (int)(mov_avg + 0.5));
684 cf_dyn_buf_append_string(db, ":");
685 cf_dyn_buf_append_string(db, "device=");
686 cf_dyn_buf_append_string(db, hs->id);
687 cf_dyn_buf_append_string(db, ":");
688 cf_dyn_buf_append_string(db, "namespace=");
689 cf_dyn_buf_append_string(db, ns->name);
690 cf_dyn_buf_append_string(db, ";");
691 }
692 }
693}
694
695static void
696print_node_stats(peer_stats* ps, cf_dyn_buf* db, cf_node node)
697{
698 if (! is_node_active(ps)) {
699 return;
700 }
701
702 for (uint32_t type = 0; type < AS_HEALTH_NODE_TYPE_MAX; type++) {
703 const stat_spec* spec = &node_stat_spec[type];
704
705 if (spec->depends_on_cluster && ! ps->is_in_cluster) {
706 continue;
707 }
708
709 health_stat* hs = &ps->node_stats[type];
710 double mov_avg = compute_mov_avg(hs->buckets, spec->n_buckets,
711 spec->is_counter);
712
713 cf_dyn_buf_append_string(db, "stat=");
714 cf_dyn_buf_append_string(db, spec->stat_str);
715 cf_dyn_buf_append_string(db, ":");
716 cf_dyn_buf_append_string(db, "value=");
717 cf_dyn_buf_append_int(db, (int)(mov_avg + 0.5));
718 cf_dyn_buf_append_string(db, ":");
719 cf_dyn_buf_append_string(db, "node=");
720 cf_dyn_buf_append_uint64_x(db, node);
721 cf_dyn_buf_append_string(db, ";");
722 }
723}
724
725static void
726print_ns_stats(peer_stats* ps, cf_dyn_buf* db, cf_node node)
727{
728 if (! is_node_active(ps)) {
729 return;
730 }
731
732 for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) {
733 for (uint32_t type = 0; type < AS_HEALTH_NS_TYPE_MAX; type++) {
734 const stat_spec* spec = &ns_stat_spec[type];
735
736 if (spec->depends_on_cluster && ! ps->is_in_cluster) {
737 continue;
738 }
739
740 health_stat* hs = &ps->ns_stats[ns_ix][type];
741 double mov_avg = compute_mov_avg(hs->buckets, spec->n_buckets,
742 spec->is_counter);
743
744 as_namespace* ns = g_config.namespaces[ns_ix];
745
746 cf_dyn_buf_append_string(db, "stat=");
747 cf_dyn_buf_append_string(db, ns->name);
748 cf_dyn_buf_append_string(db, "_");
749 cf_dyn_buf_append_string(db, spec->stat_str);
750 cf_dyn_buf_append_string(db, ":");
751 cf_dyn_buf_append_string(db, "value=");
752 cf_dyn_buf_append_int(db, (int)(mov_avg + 0.5));
753 cf_dyn_buf_append_string(db, ":");
754 cf_dyn_buf_append_string(db, "node=");
755 cf_dyn_buf_append_uint64_x(db, node);
756 cf_dyn_buf_append_string(db, ":");
757 cf_dyn_buf_append_string(db, "namespace=");
758 cf_dyn_buf_append_string(db, ns->name);
759 cf_dyn_buf_append_string(db, ";");
760 }
761 }
762}
763
764static int32_t
765print_stats_reduce_fn(const void* key, void* data, void* udata)
766{
767 peer_stats* ps = *(peer_stats**)data;
768 cf_dyn_buf* db = (cf_dyn_buf*)udata;
769 cf_node* node = (cf_node*)key;
770
771 print_node_stats(ps, db, *node);
772 print_ns_stats(ps, db, *node);
773
774 return CF_SHASH_OK;
775}
776
777static void
778reset_local_stats()
779{
780 size_t buckets_sz = sizeof(bucket) *
781 local_stat_spec[AS_HEALTH_LOCAL_DEVICE_READ_LAT].n_buckets;
782
783 for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) {
784 health_stat* device_stats = g_local_stats.device_read_lat[ns_ix];
785 uint32_t n_devices =
786 as_namespace_device_count(g_config.namespaces[ns_ix]);
787
788 for (uint32_t d_id = 0; d_id < n_devices; d_id++) {
789 health_stat* hs = &device_stats[d_id];
790
791 memset(hs->buckets, 0, buckets_sz);
792 hs->cur_bucket = 0;
793 }
794 }
795}
796
// Body of the detached health monitor thread; loops forever.
//
// Wakes once a second. While g_config.health_check_enabled is off, it keeps
// g_health_enabled false. While it is on, once every DETECTION_INTERVAL
// seconds it does one of two things:
//  - First interval after (re-)enabling: clear g_outliers and all peer and
//    local stats, then set g_health_enabled. Analysis therefore starts from
//    data gathered after the reset.
//  - Otherwise: compute moving averages for every peer and local device,
//    advance all windows by one bucket, rebuild g_outliers and log each
//    outlier.
// g_outlier_lock is held for the whole analysis, so as_health_get_outliers()
// never sees a partially rebuilt list.
//
// NOTE(review): g_health_enabled is a plain bool written here and read by
// other threads without synchronization - confirm this is acceptable.
// NOTE(review): cs and lma are stack objects sized by AS_NAMESPACE_SZ,
// MAX_NODES_TRACKED and AS_STORAGE_MAX_DEVICES - confirm the thread's stack
// is large enough.
static void*
run_health()
{
	uint64_t last_time = 0; // try to ensure we run immediately at startup

	while (true) {
		sleep(1); // wake up every second to check

		if (! g_config.health_check_enabled) {
			if (g_health_enabled) {
				g_health_enabled = false;
				last_time = 0; // allow re-enabling to take immediate effect
			}

			continue;
		}

		uint64_t curr_time = cf_get_seconds(); // may be near 0 at startup

		if (curr_time - last_time < DETECTION_INTERVAL) {
			continue;
		}

		last_time = curr_time;

		cf_mutex_lock(&g_outlier_lock);

		cf_vector_clear(g_outliers);

		if (! g_health_enabled) {
			cf_mutex_unlock(&g_outlier_lock);

			// Clear everything.
			cf_shash_reduce(g_stats, clear_data_reduce_fn, NULL);
			reset_local_stats();

			g_health_enabled = true;
			continue; // no point analyzing yet - wait one interval
		}

		cluster_all_mov_avg cs;
		local_all_mov_avg lma;
		memset(&cs, 0, sizeof(cs));
		memset(&lma, 0, sizeof(lma));

		// Peer averages are taken and peer windows shifted inside the reduce;
		// local averages must likewise be computed before the local shift.
		cf_shash_reduce(g_stats, update_mov_avg_reduce_fn, &cs);
		compute_local_stats_mov_avg(&lma);
		shift_window_local_stat();

		find_outliers_from_stats(&cs);
		find_outliers_from_local_stats(&lma);

		// Log this pass's outliers (ns_id 0 means a node-level stat).
		for (uint32_t i = 0; i < cf_vector_size(g_outliers); i++) {
			outlier cur;
			cf_vector_get(g_outliers, i, &cur);

			if (cur.ns_id == 0) {
				cf_warning(AS_HEALTH, "outlier %s: confidence-pct %u reason %s",
						cur.id, cur.confidence_pct, cur.reason);
			}
			else {
				cf_warning(AS_HEALTH, "outlier %s: namespace %s confidence-pct %u reason %s",
						cur.id, g_config.namespaces[cur.ns_id - 1]->name,
						cur.confidence_pct, cur.reason);
			}
		}

		cf_mutex_unlock(&g_outlier_lock);
	}

	return NULL;
}
869
870static void
871shift_window_local_stat()
872{
873 const stat_spec* spec = &local_stat_spec[AS_HEALTH_LOCAL_DEVICE_READ_LAT];
874
875 for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) {
876 uint32_t n_devices =
877 as_namespace_device_count(g_config.namespaces[ns_ix]);
878
879 for (uint32_t d_id = 0; d_id < n_devices; d_id++) {
880 health_stat* stat = &g_local_stats.device_read_lat[ns_ix][d_id];
881 uint32_t index = (stat->cur_bucket + 1) % spec->n_buckets;
882 bucket* new_bucket = &stat->buckets[index];
883
884 cf_atomic64_set(&new_bucket->sample_sum, 0);
885 cf_atomic32_set(&new_bucket->n_samples, 0);
886 stat->cur_bucket = index;
887 }
888 }
889}
890
891static void
892shift_window_node_stat(peer_stats* ps)
893{
894 for (uint32_t type = 0; type < AS_HEALTH_NODE_TYPE_MAX; type++) {
895 health_stat* stat = &ps->node_stats[type];
896 uint32_t index = stat->cur_bucket;
897 bucket* cur_bucket = &stat->buckets[index];
898
899 if (cur_bucket->n_samples != 0) {
900 ps->last_sample = cf_get_seconds();
901 }
902
903 index = (index + 1) % node_stat_spec[type].n_buckets;
904 bucket* new_bucket = &stat->buckets[index];
905
906 cf_atomic64_set(&new_bucket->sample_sum, 0);
907 cf_atomic64_set(&new_bucket->n_samples, 0);
908 stat->cur_bucket = index;
909 }
910}
911
912static void
913shift_window_ns_stat(peer_stats* ps)
914{
915 for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++ ) {
916 for (uint32_t type = 0; type < AS_HEALTH_NS_TYPE_MAX; type++) {
917 health_stat* stat = &ps->ns_stats[ns_ix][type];
918 uint32_t index = stat->cur_bucket;
919 bucket* cur_bucket = &stat->buckets[index];
920
921 if (cur_bucket->n_samples != 0) {
922 ps->last_sample = cf_get_seconds();
923 }
924
925 index = (index + 1) % ns_stat_spec[type].n_buckets;
926 bucket* new_bucket = &stat->buckets[index];
927
928 cf_atomic64_set(&new_bucket->sample_sum, 0);
929 cf_atomic32_set(&new_bucket->n_samples, 0);
930 stat->cur_bucket = index;
931 }
932 }
933}
934
935static int32_t
936update_mov_avg_reduce_fn(const void* key, void* data, void* udata)
937{
938 peer_stats* ps = *(peer_stats**)data;
939 cluster_all_mov_avg* cs = (cluster_all_mov_avg*)udata;
940 cf_node* node = (cf_node*)key;
941
942 compute_node_mov_avg(ps, cs, *node);
943 compute_ns_mov_avg(ps, cs, *node);
944 shift_window_node_stat(ps);
945 shift_window_ns_stat(ps);
946
947 return CF_SHASH_OK;
948}
949