1/*
2 * skew_monitor.c
3 *
4 * Copyright (C) 2012-2017 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23#include "fabric/skew_monitor.h"
24
25#include <math.h>
26#include <stdbool.h>
27#include <stdint.h>
28#include <stdlib.h>
29#include <sys/param.h>
30
31#include "citrusleaf/alloc.h"
32
33#include "msg.h"
34
35#include "base/cfg.h"
36#include "base/datamodel.h"
37#include "base/nsup.h"
38#include "fabric/clustering.h"
39#include "fabric/exchange.h"
40#include "fabric/hb.h"
41
42/*
43 * Overview
44 * ========
45 * Monitors skew across nodes in a cluster to allow other modules to handle skew
 * beyond tolerances. For example CP namespaces block transactions on skew beyond
47 * tolerable limits.
48 *
49 * Principle of skew monitoring
50 * ============================
51 * The hlc clock forms a pretty close upper bound on the physical clocks for
52 * adjacent nodes within the bounds of network trip time.
53 *
54 * Lets call the difference between a node's physical component of hlc time and
55 * physical time at the same instant as its hlc_delta.
56 * The premise is that the difference between the min hlc_delta and max
57 * hlc_delta observed for adjacent nodes closely follows the maximum clock skew
58 * in the cluster.
59 *
60 * The clock skew monitor adds a physical timestamp field to each heartbeat
61 * pulse message.
62 * For a peer node on receipt of a heartbeat pulse, hlc_delta is computed as
63 * hlc_delta = physical-component(pulse-hlc) - pulse-timestamp
64 *
 * We maintain an exponential moving average of the hlc_delta to buffer against
66 * small fluctuations
67 * avg_hlc_delta = (ALPHA)(hlc_delta) + (1-ALPHA)(avg_hlc_delta)
68 *
69 * where ALPHA is set to weigh current values more over older values.
70 *
71 * Design
72 * =======
 * The monitor ticks on heartbeat message sends without requiring an
74 * additional thread. This is alright as heartbeat pulse messages are the
75 * vehicle used for skew detection. The amount of computation amortized across
76 * sent heartbeat pulse messages is minimal and should be maintained so.
77 */
78
79/*
80 * ----------------------------------------------------------------------------
81 * Constants
82 * ----------------------------------------------------------------------------
83 */
84
85/**
86 * Maximum allowed deviation in HLC clocks. A peer node's HLC clock will be
87 * considered bad if the difference between self HLC and peer's HLC exceeds this
88 * value.
89 *
90 * This value allows a node that barely synchronizes HLC once per node timeout.
91 */
92#define HLC_DEVIATION_MAX_MS (as_hb_node_timeout_get())
93
94/**
95 * Max allowed streak of bad HLC clock readings for a peer node. During the
96 * allowed streak,
97 * the peer node will be assumed to have hlc delta same as self node. Limited
98 * between 3 and 5.
99 */
100#define BAD_HLC_STREAK_MAX (MIN(5, MAX(3, as_hb_max_intervals_missed_get() / 2)))
101
102/**
103 * Ring buffer maximum capacity. The actual capacity is a function of the
104 * heartbeat node timeout.
105 */
106#define RING_BUFFER_CAPACITY_MAX (100)
107
108/**
109 * Threshold for (absolute deviation/median absolute deviation) beyond which
110 * nodes are labelled outliers.
111 */
112#define MAD_RATIO_OUTLIER_THRESHOLD 2
113
114/*
115 * ----------------------------------------------------------------------------
116 * Logging
117 * ----------------------------------------------------------------------------
118 */
119#define CRASH(format, ...) cf_crash(AS_SKEW, format, ##__VA_ARGS__)
120#define TICKER_WARNING(format, ...) \
121cf_ticker_warning(AS_SKEW, format, ##__VA_ARGS__)
122#define WARNING(format, ...) cf_warning(AS_SKEW, format, ##__VA_ARGS__)
123#define INFO(format, ...) cf_info(AS_SKEW, format, ##__VA_ARGS__)
124#define DEBUG(format, ...) cf_debug(AS_SKEW, format, ##__VA_ARGS__)
125#define DETAIL(format, ...) cf_detail(AS_SKEW, format, ##__VA_ARGS__)
126#define ring_buffer_log(buffer, message, severity) ring_buffer_log_event(buffer, message, severity, AS_SKEW, \
127 __FILENAME__, __LINE__);
128
129/*
130 * ----------------------------------------------------------------------------
131 * Skew monitor data structures
132 * ----------------------------------------------------------------------------
133 */
134
135/**
136 * Ring buffer holding a window of skew delta for a node.
137 */
138typedef struct as_skew_ring_buffer_s
139{
140 int64_t data[RING_BUFFER_CAPACITY_MAX];
141 int start;
142 int size;
143 int capacity;
144} as_skew_ring_buffer;
145
146/**
147 * Skew plugin data stored for all adjacent nodes.
148 */
149typedef struct as_skew_plugin_data_s
150{
151 as_skew_ring_buffer ring_buffer;
152 uint8_t bad_hlc_streak;
153} as_skew_plugin_data;
154
155/**
156 * Skew summary for a node for the current skew update interval.
157 */
158typedef struct as_skew_node_summary_s
159{
160 cf_node nodeid;
161 int64_t delta;
162} as_skew_node_summary;
163
164/**
165 * HB plugin data iterate struct to get node hlc deltas.
166 */
167typedef struct as_skew_monitor_hlc_delta_udata_s
168{
169 int num_nodes;
170 as_skew_node_summary skew_summary[AS_CLUSTER_SZ];
171} as_skew_monitor_hlc_delta_udata;
172
173/*
174 * ----------------------------------------------------------------------------
175 * External protected API for skew monitor
176 * ----------------------------------------------------------------------------
177 */
178extern int
179as_hb_msg_send_hlc_ts_get(msg* msg, as_hlc_timestamp* send_ts);
180
181/*
182 * ----------------------------------------------------------------------------
183 * Globals
184 * ----------------------------------------------------------------------------
185 */
186
187/**
188 * Last time skew was checked.
189 */
190static cf_atomic64 g_last_skew_check_time = 0;
191
192/**
193 * Current value of clock skew.
194 */
195static cf_atomic64 g_skew = 0;
196
197/**
198 * Self HLC delta over the last skew window. Access should under the self skew
199 * data lock.
200 */
201static as_skew_ring_buffer g_self_skew_ring_buffer = { { 0 } };
202
203/**
204 * Lock for self skew ring buffer.
205 */
206static pthread_mutex_t g_self_skew_lock = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
207
208/*
209 * ----------------------------------------------------------------------------
210 * Skew intervals and limits
211 * ----------------------------------------------------------------------------
212 */
213
214/**
215 * Interval at which skew checks should be made.
216 */
217static uint64_t
218skew_check_interval()
219{
220 return MIN(2000, as_clustering_quantum_interval() / 2);
221}
222
223/**
224 * Threshold for outlier detection. Skew values less than this threshold will
225 * not invoke outlier detection.
226 */
227static uint64_t
228skew_monitor_outlier_detection_threshold()
229{
230 return as_clustering_quantum_interval();
231}
232
233/*
234 * ----------------------------------------------------------------------------
235 * Ring buffer related
236 * ----------------------------------------------------------------------------
237 */
238
239/**
240 * Log contents of the ring buffer.
241 */
242static void
243ring_buffer_log_event(as_skew_ring_buffer* ring_buffer, char* prefix,
244 cf_fault_severity severity, cf_fault_context context, char* file_name,
245 int line)
246{
247 int max_per_line = 25;
248 int max_bytes_per_value = 21; // Include the space as well
249 char log_buffer[(max_per_line * max_bytes_per_value) + 1]; // For the NULL terminator.
250 char* value_buffer_start = log_buffer;
251
252 if (prefix) {
253 value_buffer_start += sprintf(log_buffer, "%s", prefix);
254 }
255
256 for (int i = 0; i < ring_buffer->size; i++) {
257 char* buffer = value_buffer_start;
258 for (int j = 0; j < max_per_line && i < ring_buffer->size; j++) {
259 buffer += sprintf(buffer, "%ld ",
260 ring_buffer->data[(ring_buffer->start + i)
261 % ring_buffer->capacity]);
262 i++;
263 }
264
265 // Overwrite the space from the last node on the log line only if there
266 // is atleast one node output
267 if (buffer != value_buffer_start) {
268 *(buffer - 1) = 0;
269 cf_fault_event(context, severity, file_name, line, "%s",
270 log_buffer);
271 }
272 }
273}
274
275/**
276 * Get the representative hlc delta value for current ring buffer contents.
277 */
278static int64_t
279ring_buffer_hlc_delta(as_skew_ring_buffer* buffer)
280{
281 int64_t max_delta = 0;
282
283 for (int i = 0; i < buffer->size; i++) {
284 int64_t delta = buffer->data[(buffer->start + i) % buffer->capacity];
285 if (delta > max_delta) {
286 max_delta = delta;
287 }
288 }
289
290 return max_delta;
291}
292
293/**
294 * The current capacity of the ring buffer based on heartbeat node timeout,
295 * which determines how much skew history is maintained.
296 */
297static int
298ring_buffer_current_capacity()
299{
300 // Maintain a history for one node timeout interval
301 return MIN(RING_BUFFER_CAPACITY_MAX, as_hb_max_intervals_missed_get());
302}
303
304/**
305 * Adjust the contents of the ring buffer to new capacity.
306 */
307static void
308ring_buffer_adjust_to_capacity(as_skew_ring_buffer* buffer,
309 const int new_capacity)
310{
311 if (buffer->capacity == new_capacity) {
312 // No adjustments to data needed.
313 return;
314 }
315
316 // Will only happen if the heartbeat node timeout is changed of if this is
317 // the first insert.
318 int new_size = buffer->size;
319 if (buffer->size > new_capacity) {
320 int shrink_by = buffer->size - new_capacity;
321 // Drop the oldest values and copy over the rest.
322 buffer->start = (buffer->start + shrink_by) % buffer->capacity;
323 new_size = new_capacity;
324 }
325
326 // Shift values to be retained to start of the data array. Since this is not
327 // a frequent operations use the simple technique of making a copy.
328 int64_t adjusted_data[RING_BUFFER_CAPACITY_MAX];
329 for (int i = 0; i < new_size; i++) {
330 int buffer_index = (buffer->start + i) % buffer->capacity;
331 adjusted_data[i] = buffer->data[buffer_index];
332 }
333
334 // Reset the buffer to start at index 0 and have new capacity.
335 memcpy(buffer->data, adjusted_data, new_size);
336 buffer->capacity = new_capacity;
337 buffer->start = 0;
338 buffer->size = new_size;
339}
340
341/**
342 * Insert a new delta into the ring_buffer.
343 */
344static void
345ring_buffer_insert(as_skew_ring_buffer* buffer, const int64_t delta)
346{
347 ring_buffer_adjust_to_capacity(buffer, ring_buffer_current_capacity());
348
349 int insert_index = 0;
350 if (buffer->size == buffer->capacity) {
351 insert_index = buffer->start;
352 buffer->start = (buffer->start + 1) % buffer->capacity;
353 }
354 else {
355 insert_index = buffer->size;
356 buffer->size++;
357 }
358
359 buffer->data[insert_index] = delta;
360}
361
362/*
363 * ----------------------------------------------------------------------------
364 * HLC delta related
365 * ----------------------------------------------------------------------------
366 */
367
368/**
369 * Find min and max skew using difference between physical clock and hlc.
370 */
371static void
372skew_monitor_delta_collect_iterate(cf_node nodeid, void* plugin_data,
373 size_t plugin_data_size, cf_clock recv_monotonic_ts,
374 as_hlc_msg_timestamp* msg_hlc_ts, void* udata)
375{
376 int64_t delta = 0;
377 as_skew_monitor_hlc_delta_udata* deltas =
378 (as_skew_monitor_hlc_delta_udata*)udata;
379
380 if (!plugin_data || plugin_data_size < sizeof(as_skew_plugin_data)) {
381 // Assume missing nodes share the same delta as self.
382 // Note: self node will not be in adjacency list and hence will also
383 // follow same code path.
384 pthread_mutex_lock(&g_self_skew_lock);
385 delta = ring_buffer_hlc_delta(&g_self_skew_ring_buffer);
386 pthread_mutex_unlock(&g_self_skew_lock);
387 }
388 else {
389 as_skew_plugin_data* skew_plugin_data =
390 (as_skew_plugin_data*)plugin_data;
391 delta = ring_buffer_hlc_delta(&skew_plugin_data->ring_buffer);
392 }
393
394 int index = deltas->num_nodes;
395 deltas->skew_summary[index].delta = delta;
396 deltas->skew_summary[index].nodeid = nodeid;
397 deltas->num_nodes++;
398}
399
400/**
401 * Compute the skew across the cluster.
402 */
403static uint64_t
404skew_monitor_compute_skew()
405{
406 uint64_t skew = 0;
407 uint8_t buffer[AS_CLUSTER_SZ * sizeof(cf_node)];
408 cf_vector succession = { 0 };
409
410 cf_vector_init_smalloc(&succession, sizeof(cf_node), buffer, sizeof(buffer),
411 VECTOR_FLAG_INITZERO);
412 as_exchange_succession(&succession);
413
414 if (cf_vector_size(&succession) <= 1) {
415 // Self node is an orphan or single node cluster. No cluster wide skew.
416 skew = 0;
417 goto Cleanup;
418 }
419
420 as_skew_monitor_hlc_delta_udata udata = { 0 };
421 as_hb_plugin_data_iterate(&succession, AS_HB_PLUGIN_SKEW_MONITOR,
422 skew_monitor_delta_collect_iterate, &udata);
423
424 int64_t min = INT64_MAX;
425 int64_t max = INT64_MIN;
426
427 for (int i = 0; i < udata.num_nodes; i++) {
428 int64_t delta = udata.skew_summary[i].delta;
429 if (delta < min) {
430 min = delta;
431 }
432
433 if (delta > max) {
434 max = delta;
435 }
436 }
437 skew = max - min;
438
439Cleanup:
440 cf_vector_destroy(&succession);
441 return skew;
442}
443
444/**
445 * Update clock skew and fire skew events.
446 */
447static void
448skew_monitor_update()
449{
450 cf_clock now = cf_getms();
451 cf_atomic64_set(&g_last_skew_check_time, now);
452
453 uint64_t skew = skew_monitor_compute_skew();
454 cf_atomic64_set(&g_skew, skew);
455
456 for (int i = 0; i < g_config.n_namespaces; i++) {
457 as_namespace* ns = g_config.namespaces[i];
458
459 // Store return values so all handlers warn independently.
460 bool record_stop_writes = as_record_handle_clock_skew(ns, skew);
461 bool nsup_stop_writes = as_nsup_handle_clock_skew(ns, skew);
462
463 ns->clock_skew_stop_writes = record_stop_writes || nsup_stop_writes;
464 }
465}
466
467/*
468 * ----------------------------------------------------------------------------
469 * Outlier detection
470 * ----------------------------------------------------------------------------
471 */
472
473/**
474 * Comparator for deltas.
475 */
476static int
477skew_monitor_hlc_float_compare(const void* o1, const void* o2)
478{
479 float v1 = *(float*)o1;
480 float v2 = *(float*)o2;
481 return v1 > v2 ? 1 : (v1 == v2 ? 0 : -1);
482}
483
484/**
485 * Compute the median of the data.
486 *
487 * @param values the values sorted.
488 * @param from the start index (inclusive)
489 * @param to the end index (inclusive)
490 * @return the index of the median element
491 */
492static float
493skew_monitor_median(float* values, int num_elements)
494{
495 if (num_elements % 2 == 0) {
496 int median_left = (num_elements - 1) / 2;
497 int median_right = median_left + 1;
498 return (values[median_left] + values[median_right]) / 2.0f;
499 }
500
501 int median_index = (num_elements / 2);
502 return (float)values[median_index];
503}
504
505/**
506 * Return the currently estimated outliers from our cluster.
507 * Outliers should have space to hold at least AS_CLUSTER_SZ nodes.
508 */
509static uint32_t
510skew_monitor_outliers_from_skew_summary(cf_vector* outliers,
511 as_skew_monitor_hlc_delta_udata* udata)
512{
513 // Use Median Absolute Deviation(MAD) to detect outliers, in general the
514 // delta distribution would be symmetric and very close to the median.
515 int num_nodes = udata->num_nodes;
516 float deltas[num_nodes];
517 for (int i = 0; i < num_nodes; i++) {
518 deltas[i] = udata->skew_summary[i].delta;
519 }
520
521 // Compute median.
522 qsort(deltas, num_nodes, sizeof(float), skew_monitor_hlc_float_compare);
523 float median = skew_monitor_median(deltas, num_nodes);
524
525 // Compute absolute deviation from median.
526 float abs_dev[num_nodes];
527 for (int i = 0; i < num_nodes; i++) {
528 abs_dev[i] = fabsf(deltas[i] - median);
529 }
530
531 // Compute MAD.
532 qsort(abs_dev, num_nodes, sizeof(float), skew_monitor_hlc_float_compare);
533 float mad = skew_monitor_median(abs_dev, num_nodes);
534
535 uint32_t num_outliers = 0;
536
537 if (mad < 0.001f) {
538 // Most deltas are very close to the median. Call values significantly
539 // away as outliers.
540 for (int i = 0; i < udata->num_nodes; i++) {
541 if (fabsf(udata->skew_summary[i].delta - median)
542 > skew_monitor_outlier_detection_threshold()) {
543 if (outliers) {
544 cf_vector_append(outliers, &udata->skew_summary[i].nodeid);
545 }
546
547 num_outliers++;
548 }
549 }
550 }
551 else {
552 // Any node with delta deviating significantly compared to MAD is an
553 // outlier.
554 for (int i = 0; i < udata->num_nodes; i++) {
555 if ((fabsf(udata->skew_summary[i].delta - median) / mad)
556 > MAD_RATIO_OUTLIER_THRESHOLD) {
557 if (outliers) {
558 cf_vector_append(outliers, &udata->skew_summary[i].nodeid);
559 }
560
561 num_outliers++;
562 }
563 }
564 }
565
566 return num_outliers;
567}
568
569/**
570 * Return the currently estimated outliers from our cluster.
571 * Outliers should have space to hold at least AS_CLUSTER_SZ nodes.
572 */
573static uint32_t
574skew_monitor_outliers(cf_vector* outliers)
575{
576 if (as_skew_monitor_skew() < skew_monitor_outlier_detection_threshold()) {
577 // Skew is not significant. Skip printing outliers.
578 return 0;
579 }
580
581 uint8_t buffer[AS_CLUSTER_SZ * sizeof(cf_node)];
582 cf_vector succession;
583 cf_vector_init_smalloc(&succession, sizeof(cf_node), buffer, sizeof(buffer),
584 VECTOR_FLAG_INITZERO);
585 as_exchange_succession(&succession);
586
587 uint32_t num_outliers = 0;
588
589 uint32_t cluster_size = cf_vector_size(&succession);
590 if (cluster_size <= 1) {
591 // Self node is an orphan or single node cluster. No cluster wide skew.
592 goto Cleanup;
593 }
594
595 as_skew_monitor_hlc_delta_udata udata = { 0 };
596 as_hb_plugin_data_iterate(&succession, AS_HB_PLUGIN_SKEW_MONITOR,
597 skew_monitor_delta_collect_iterate, &udata);
598
599 num_outliers = skew_monitor_outliers_from_skew_summary(outliers, &udata);
600
601Cleanup:
602 cf_vector_destroy(&succession);
603
604 return num_outliers;
605}
606
607/*
608 * ----------------------------------------------------------------------------
609 * HB plugin functions
610 * ----------------------------------------------------------------------------
611 */
612
613/**
614 * Push current timestamp for self node into the heartbeat pulse message.
615 */
616static void
617skew_monitor_hb_plugin_set_fn(msg* msg)
618{
619 cf_clock send_ts = cf_clock_getabsolute();
620 msg_set_uint64(msg, AS_HB_MSG_SKEW_MONITOR_DATA, send_ts);
621
622 // Update self skew.
623 as_hlc_timestamp send_hlc_ts = as_hlc_timestamp_now();
624 int64_t clock_delta = as_hlc_physical_ts_get(send_hlc_ts) - send_ts;
625
626 // Update the clock delta for self.
627 pthread_mutex_lock(&g_self_skew_lock);
628 ring_buffer_insert(&g_self_skew_ring_buffer, clock_delta);
629 pthread_mutex_unlock(&g_self_skew_lock);
630
631 cf_clock now = cf_getms();
632 if (cf_atomic64_get(g_last_skew_check_time) + skew_check_interval() < now) {
633 skew_monitor_update();
634 }
635}
636
637/**
638 * Compare the HLC timestamp and the physical clock and store the difference as
639 * plugin data for the source node to enable skew detection.
640 */
641static void
642skew_monitor_hb_plugin_parse_data_fn(msg* msg, cf_node source,
643 as_hb_plugin_node_data* prev_plugin_data,
644 as_hb_plugin_node_data* plugin_data)
645{
646 cf_clock send_ts = 0;
647 as_hlc_timestamp send_hlc_ts = 0;
648 as_hlc_timestamp hlc_now = as_hlc_timestamp_now();
649
650 if (msg_get_uint64(msg, AS_HB_MSG_SKEW_MONITOR_DATA, &send_ts) != 0
651 || as_hb_msg_send_hlc_ts_get(msg, &send_hlc_ts) != 0) {
652 // Pre SC mode node. For now assumes it shares the same delta with hlc
653 // as us.
654 send_hlc_ts = hlc_now;
655 send_ts = cf_clock_getabsolute();
656 }
657
658 size_t required_capacity = sizeof(as_skew_plugin_data);
659 if (required_capacity > plugin_data->data_capacity) {
660 plugin_data->data = cf_realloc(plugin_data->data, required_capacity);
661
662 if (plugin_data->data == NULL) {
663 CRASH("error allocating skew data for node %lx", source);
664 }
665 plugin_data->data_capacity = required_capacity;
666 }
667
668 if (plugin_data->data_size == 0) {
669 // First data point.
670 memset(plugin_data->data, 0, required_capacity);
671 }
672
673 if (prev_plugin_data->data_size != 0) {
674 // Copy over older values to carry forward.
675 memcpy(plugin_data->data, prev_plugin_data->data, required_capacity);
676 }
677
678 as_skew_plugin_data* skew_plugin_data =
679 (as_skew_plugin_data*)plugin_data->data;
680
681 int64_t hlc_diff_ms = abs(as_hlc_timestamp_diff_ms(send_hlc_ts, hlc_now));
682
683 if (hlc_diff_ms > HLC_DEVIATION_MAX_MS) {
684 if (skew_plugin_data->bad_hlc_streak < BAD_HLC_STREAK_MAX) {
685 skew_plugin_data->bad_hlc_streak++;
686 INFO("node %lx HLC not in sync - hlc %lu self-hlc %lu diff %ld",
687 source, send_hlc_ts, hlc_now, hlc_diff_ms);
688 }
689 else {
690 // Long running streak.
691 TICKER_WARNING("node %lx HLC not in sync", source);
692 }
693 }
694 else {
695 // End the bad streak if the source is in one.
696 skew_plugin_data->bad_hlc_streak = 0;
697 }
698
699 int64_t delta = 0;
700 if ((skew_plugin_data->bad_hlc_streak > 0)
701 && skew_plugin_data->bad_hlc_streak <= BAD_HLC_STREAK_MAX) {
702 // The peer is in a tolerable bad hlc streak. Assume it has nominal hlc
703 // delta. This is most likely a restarted or a new node that hasn't
704 // caught up with the cluster HLC yet.
705 pthread_mutex_lock(&g_self_skew_lock);
706 delta = ring_buffer_hlc_delta(&g_self_skew_ring_buffer);
707 pthread_mutex_unlock(&g_self_skew_lock);
708 }
709 else {
710 // This measurement is safe to use.
711 delta = as_hlc_physical_ts_get(send_hlc_ts) - send_ts;
712 }
713
714 // Update the ring buffer with the new delta.
715 ring_buffer_insert(&skew_plugin_data->ring_buffer, delta);
716
717 if (cf_context_at_severity(AS_SKEW, CF_DETAIL)) {
718 // Temporary debugging.
719 char message[100];
720 sprintf(message, "Insert for node: %lx - ", source);
721 ring_buffer_log(&skew_plugin_data->ring_buffer, message, CF_DETAIL);
722 }
723
724 // Ensure the data size is set correctly.
725 plugin_data->data_size = required_capacity;
726
727 DETAIL("node %lx - hlc:%lu clock:%lu delta:%ld", source, send_hlc_ts,
728 send_ts, delta);
729}
730
731/*
732 * ----------------------------------------------------------------------------
 * Protected API only meant for clustering.
734 * ----------------------------------------------------------------------------
735 */
736
737/**
738 * Update clock skew and fire skew events.
739 */
740void
741as_skew_monitor_update()
742{
743 skew_monitor_update();
744}
745
746/*
747 * ----------------------------------------------------------------------------
748 * Public API
749 * ----------------------------------------------------------------------------
750 */
751
752/**
753 * Initialize skew monitor.
754 */
755void
756as_skew_monitor_init()
757{
758 as_hb_plugin skew_monitor_plugin = { 0 };
759
760 skew_monitor_plugin.id = AS_HB_PLUGIN_SKEW_MONITOR;
761 skew_monitor_plugin.wire_size_fixed = sizeof(int64_t);
762 // Size of the node in succession list.
763 skew_monitor_plugin.wire_size_per_node = 0;
764 skew_monitor_plugin.set_fn = skew_monitor_hb_plugin_set_fn;
765 skew_monitor_plugin.parse_fn = skew_monitor_hb_plugin_parse_data_fn;
766 as_hb_plugin_register(&skew_monitor_plugin);
767
768 DETAIL("skew monitor initialized");
769}
770
771/**
772 * Return the current estimate of the clock skew in the cluster.
773 */
774uint64_t
775as_skew_monitor_skew()
776{
777 return cf_atomic64_get(g_skew);
778}
779
780/**
781 * Return the currently estimated outliers from our cluster.
782 * Outliers should have space to hold at least AS_CLUSTER_SZ nodes.
783 */
784uint32_t
785as_skew_monitor_outliers(cf_vector* outliers)
786{
787 return skew_monitor_outliers(outliers);
788}
789
790/**
791 * Print skew outliers to a dynamic buffer.
792 */
793uint32_t
794as_skew_monitor_outliers_append(cf_dyn_buf* db)
795{
796 uint8_t buffer[AS_CLUSTER_SZ * sizeof(cf_node)];
797 cf_vector outliers;
798 cf_vector_init_smalloc(&outliers, sizeof(cf_node), buffer, sizeof(buffer),
799 VECTOR_FLAG_INITZERO);
800 uint32_t num_outliers = skew_monitor_outliers(&outliers);
801
802 for (uint32_t i = 0; i < num_outliers; i++) {
803 cf_node outlier_id;
804 cf_vector_get(&outliers, i, &outlier_id);
805 cf_dyn_buf_append_uint64_x(db, outlier_id);
806 cf_dyn_buf_append_char(db, ',');
807 }
808
809 if (num_outliers) {
810 cf_dyn_buf_chomp(db);
811 }
812
813 cf_vector_destroy(&outliers);
814
815 return num_outliers;
816}
817
818/**
819 * Print skew monitor info to a dynamic buffer.
820 */
821void
822as_skew_monitor_info(cf_dyn_buf* db)
823{
824 cf_dyn_buf_append_string(db, "cluster_clock_skew_outliers=");
825 uint32_t num_outliers = as_skew_monitor_outliers_append(db);
826 if (num_outliers == 0) {
827 cf_dyn_buf_append_string(db, "null");
828 }
829 cf_dyn_buf_append_char(db, ';');
830}
831
832/**
833 * Dump some debugging information to the logs.
834 */
835void
836as_skew_monitor_dump()
837{
838 uint8_t buffer[AS_CLUSTER_SZ * sizeof(cf_node)];
839 cf_vector node_vector;
840 cf_vector_init_smalloc(&node_vector, sizeof(cf_node), buffer,
841 sizeof(buffer), VECTOR_FLAG_INITZERO);
842 as_exchange_succession(&node_vector);
843
844 INFO("CSM: cluster-clock-skew:%ld", as_skew_monitor_skew());
845 if (cf_vector_size(&node_vector) <= 1) {
846 // Self node is an orphan or single node cluster. No cluster wide skew.
847 goto Cleanup;
848 }
849
850 as_skew_monitor_hlc_delta_udata udata = { 0 };
851 as_hb_plugin_data_iterate(&node_vector, AS_HB_PLUGIN_SKEW_MONITOR,
852 skew_monitor_delta_collect_iterate, &udata);
853
854 for (int i = 0; i < udata.num_nodes; i++) {
855 INFO("CSM: node:%lx hlc-delta:%ld", udata.skew_summary[i].nodeid,
856 udata.skew_summary[i].delta);
857 }
858
859 // Log the outliers.
860 cf_vector_clear(&node_vector);
861 skew_monitor_outliers(&node_vector);
862 if (cf_vector_size(&node_vector)) {
863 as_clustering_log_cf_node_vector(CF_INFO, AS_SKEW,
864 "CSM: Estimated clock outliers", &node_vector);
865 }
866
867Cleanup:
868 cf_vector_destroy(&node_vector);
869}
870