1 | /* |
2 | * skew_monitor.c |
3 | * |
4 | * Copyright (C) 2012-2017 Aerospike, Inc. |
5 | * |
6 | * Portions may be licensed to Aerospike, Inc. under one or more contributor |
7 | * license agreements. |
8 | * |
9 | * This program is free software: you can redistribute it and/or modify it under |
10 | * the terms of the GNU Affero General Public License as published by the Free |
11 | * Software Foundation, either version 3 of the License, or (at your option) any |
12 | * later version. |
13 | * |
14 | * This program is distributed in the hope that it will be useful, but WITHOUT |
15 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
16 | * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more |
17 | * details. |
18 | * |
19 | * You should have received a copy of the GNU Affero General Public License |
20 | * along with this program. If not, see http://www.gnu.org/licenses/ |
21 | */ |
22 | |
23 | #include "fabric/skew_monitor.h" |
24 | |
25 | #include <math.h> |
26 | #include <stdbool.h> |
27 | #include <stdint.h> |
28 | #include <stdlib.h> |
29 | #include <sys/param.h> |
30 | |
31 | #include "citrusleaf/alloc.h" |
32 | |
33 | #include "msg.h" |
34 | |
35 | #include "base/cfg.h" |
36 | #include "base/datamodel.h" |
37 | #include "base/nsup.h" |
38 | #include "fabric/clustering.h" |
39 | #include "fabric/exchange.h" |
40 | #include "fabric/hb.h" |
41 | |
42 | /* |
43 | * Overview |
44 | * ======== |
 * Monitors clock skew across nodes in a cluster to allow other modules to
 * handle skew beyond tolerances. For example, CP namespaces block transactions
 * when skew exceeds tolerable limits.
48 | * |
49 | * Principle of skew monitoring |
50 | * ============================ |
 * The HLC forms a fairly tight upper bound on the physical clocks of adjacent
 * nodes, within the bounds of network trip time.
 *
 * Let's call the difference between the physical component of a node's HLC
 * time and its physical time at the same instant the node's hlc_delta.
 * The premise is that the difference between the min hlc_delta and max
 * hlc_delta observed across adjacent nodes closely tracks the maximum clock
 * skew in the cluster.
59 | * |
60 | * The clock skew monitor adds a physical timestamp field to each heartbeat |
61 | * pulse message. |
 * On receipt of a heartbeat pulse from a peer node, hlc_delta is computed as
 *   hlc_delta = physical-component(pulse-hlc) - pulse-timestamp
 *
 * To buffer against small fluctuations, a sliding window (ring buffer) of
 * recent hlc_delta values is maintained per node, and the maximum delta over
 * the window is used as the node's representative delta. Cluster-wide skew is
 * then estimated as the difference between the maximum and minimum
 * representative deltas across all nodes.
70 | * |
71 | * Design |
72 | * ======= |
 * The monitor ticks on heartbeat message sends without requiring an
 * additional thread. This is reasonable since heartbeat pulse messages are the
 * vehicle used for skew detection. The amount of computation amortized across
 * sent heartbeat pulse messages is minimal and should be kept that way.
77 | */ |
78 | |
79 | /* |
80 | * ---------------------------------------------------------------------------- |
81 | * Constants |
82 | * ---------------------------------------------------------------------------- |
83 | */ |
84 | |
85 | /** |
86 | * Maximum allowed deviation in HLC clocks. A peer node's HLC clock will be |
87 | * considered bad if the difference between self HLC and peer's HLC exceeds this |
88 | * value. |
89 | * |
 * This value tolerates a node that synchronizes its HLC only once per node
 * timeout.
91 | */ |
92 | #define HLC_DEVIATION_MAX_MS (as_hb_node_timeout_get()) |
93 | |
94 | /** |
 * Max allowed streak of bad HLC clock readings for a peer node. During the
 * allowed streak, the peer node is assumed to have the same hlc delta as the
 * self node. Limited to between 3 and 5.
99 | */ |
100 | #define BAD_HLC_STREAK_MAX (MIN(5, MAX(3, as_hb_max_intervals_missed_get() / 2))) |
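// For illustration: with as_hb_max_intervals_missed_get() returning 10, the
// streak limit is MIN(5, MAX(3, 5)) = 5; returning 4 gives MIN(5, MAX(3, 2)) = 3.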
101 | |
102 | /** |
103 | * Ring buffer maximum capacity. The actual capacity is a function of the |
104 | * heartbeat node timeout. |
105 | */ |
106 | #define RING_BUFFER_CAPACITY_MAX (100) |
107 | |
108 | /** |
109 | * Threshold for (absolute deviation/median absolute deviation) beyond which |
110 | * nodes are labelled outliers. |
111 | */ |
112 | #define MAD_RATIO_OUTLIER_THRESHOLD 2 |
113 | |
114 | /* |
115 | * ---------------------------------------------------------------------------- |
116 | * Logging |
117 | * ---------------------------------------------------------------------------- |
118 | */ |
119 | #define CRASH(format, ...) cf_crash(AS_SKEW, format, ##__VA_ARGS__) |
120 | #define TICKER_WARNING(format, ...) \ |
121 | cf_ticker_warning(AS_SKEW, format, ##__VA_ARGS__) |
122 | #define WARNING(format, ...) cf_warning(AS_SKEW, format, ##__VA_ARGS__) |
123 | #define INFO(format, ...) cf_info(AS_SKEW, format, ##__VA_ARGS__) |
124 | #define DEBUG(format, ...) cf_debug(AS_SKEW, format, ##__VA_ARGS__) |
125 | #define DETAIL(format, ...) cf_detail(AS_SKEW, format, ##__VA_ARGS__) |
#define ring_buffer_log(buffer, message, severity) ring_buffer_log_event(buffer, message, severity, AS_SKEW, \
	__FILENAME__, __LINE__)
128 | |
129 | /* |
130 | * ---------------------------------------------------------------------------- |
131 | * Skew monitor data structures |
132 | * ---------------------------------------------------------------------------- |
133 | */ |
134 | |
135 | /** |
136 | * Ring buffer holding a window of skew delta for a node. |
137 | */ |
138 | typedef struct as_skew_ring_buffer_s |
139 | { |
140 | int64_t data[RING_BUFFER_CAPACITY_MAX]; |
141 | int start; |
142 | int size; |
143 | int capacity; |
144 | } as_skew_ring_buffer; |
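
/*
 * Example of the modular indexing used throughout (illustrative values): with
 * capacity = 4, start = 2 and size = 4, the logical (oldest to newest) order
 * of data[] = {d0, d1, d2, d3} is d2, d3, d0, d1 - element i lives at
 * data[(start + i) % capacity].
 */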
145 | |
146 | /** |
147 | * Skew plugin data stored for all adjacent nodes. |
148 | */ |
149 | typedef struct as_skew_plugin_data_s |
150 | { |
151 | as_skew_ring_buffer ring_buffer; |
152 | uint8_t bad_hlc_streak; |
153 | } as_skew_plugin_data; |
154 | |
155 | /** |
156 | * Skew summary for a node for the current skew update interval. |
157 | */ |
158 | typedef struct as_skew_node_summary_s |
159 | { |
160 | cf_node nodeid; |
161 | int64_t delta; |
162 | } as_skew_node_summary; |
163 | |
164 | /** |
165 | * HB plugin data iterate struct to get node hlc deltas. |
166 | */ |
167 | typedef struct as_skew_monitor_hlc_delta_udata_s |
168 | { |
169 | int num_nodes; |
170 | as_skew_node_summary skew_summary[AS_CLUSTER_SZ]; |
171 | } as_skew_monitor_hlc_delta_udata; |
172 | |
173 | /* |
174 | * ---------------------------------------------------------------------------- |
175 | * External protected API for skew monitor |
176 | * ---------------------------------------------------------------------------- |
177 | */ |
178 | extern int |
179 | as_hb_msg_send_hlc_ts_get(msg* msg, as_hlc_timestamp* send_ts); |
180 | |
181 | /* |
182 | * ---------------------------------------------------------------------------- |
183 | * Globals |
184 | * ---------------------------------------------------------------------------- |
185 | */ |
186 | |
187 | /** |
188 | * Last time skew was checked. |
189 | */ |
190 | static cf_atomic64 g_last_skew_check_time = 0; |
191 | |
192 | /** |
193 | * Current value of clock skew. |
194 | */ |
195 | static cf_atomic64 g_skew = 0; |
196 | |
197 | /** |
 * Self HLC delta over the last skew window. Access should be under the self
 * skew lock.
200 | */ |
201 | static as_skew_ring_buffer g_self_skew_ring_buffer = { { 0 } }; |
202 | |
203 | /** |
204 | * Lock for self skew ring buffer. |
205 | */ |
206 | static pthread_mutex_t g_self_skew_lock = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP; |
207 | |
208 | /* |
209 | * ---------------------------------------------------------------------------- |
210 | * Skew intervals and limits |
211 | * ---------------------------------------------------------------------------- |
212 | */ |
213 | |
214 | /** |
215 | * Interval at which skew checks should be made. |
216 | */ |
217 | static uint64_t |
218 | skew_check_interval() |
219 | { |
220 | return MIN(2000, as_clustering_quantum_interval() / 2); |
221 | } |
222 | |
223 | /** |
224 | * Threshold for outlier detection. Skew values less than this threshold will |
225 | * not invoke outlier detection. |
226 | */ |
227 | static uint64_t |
228 | skew_monitor_outlier_detection_threshold() |
229 | { |
230 | return as_clustering_quantum_interval(); |
231 | } |
232 | |
233 | /* |
234 | * ---------------------------------------------------------------------------- |
235 | * Ring buffer related |
236 | * ---------------------------------------------------------------------------- |
237 | */ |
238 | |
239 | /** |
240 | * Log contents of the ring buffer. |
241 | */ |
242 | static void |
243 | ring_buffer_log_event(as_skew_ring_buffer* ring_buffer, char* prefix, |
244 | cf_fault_severity severity, cf_fault_context context, char* file_name, |
245 | int line) |
246 | { |
	int max_per_line = 25;
	int max_bytes_per_value = 21; // Includes the trailing space.
	// Size for the values plus headroom for the prefix and the NUL terminator.
	char log_buffer[(max_per_line * max_bytes_per_value) + 128 + 1];
250 | char* value_buffer_start = log_buffer; |
251 | |
252 | if (prefix) { |
		value_buffer_start += sprintf(log_buffer, "%s", prefix);
254 | } |
255 | |
	for (int i = 0; i < ring_buffer->size;) {
		char* buffer = value_buffer_start;
		// Note - the inner loop advances i; the outer loop must not, else one
		// entry would be skipped after each full log line.
		for (int j = 0; j < max_per_line && i < ring_buffer->size; j++) {
			buffer += sprintf(buffer, "%ld ",
					ring_buffer->data[(ring_buffer->start + i)
							% ring_buffer->capacity]);
			i++;
		}
264 | |
		// Overwrite the trailing space on the log line only if there is at
		// least one value output.
		if (buffer != value_buffer_start) {
			*(buffer - 1) = 0;
			cf_fault_event(context, severity, file_name, line, "%s",
270 | log_buffer); |
271 | } |
272 | } |
273 | } |
274 | |
275 | /** |
276 | * Get the representative hlc delta value for current ring buffer contents. |
277 | */ |
278 | static int64_t |
279 | ring_buffer_hlc_delta(as_skew_ring_buffer* buffer) |
280 | { |
281 | int64_t max_delta = 0; |
282 | |
283 | for (int i = 0; i < buffer->size; i++) { |
284 | int64_t delta = buffer->data[(buffer->start + i) % buffer->capacity]; |
285 | if (delta > max_delta) { |
286 | max_delta = delta; |
287 | } |
288 | } |
289 | |
290 | return max_delta; |
291 | } |
292 | |
293 | /** |
294 | * The current capacity of the ring buffer based on heartbeat node timeout, |
295 | * which determines how much skew history is maintained. |
296 | */ |
297 | static int |
298 | ring_buffer_current_capacity() |
299 | { |
300 | // Maintain a history for one node timeout interval |
301 | return MIN(RING_BUFFER_CAPACITY_MAX, as_hb_max_intervals_missed_get()); |
302 | } |
303 | |
304 | /** |
305 | * Adjust the contents of the ring buffer to new capacity. |
306 | */ |
307 | static void |
308 | ring_buffer_adjust_to_capacity(as_skew_ring_buffer* buffer, |
309 | const int new_capacity) |
310 | { |
311 | if (buffer->capacity == new_capacity) { |
312 | // No adjustments to data needed. |
313 | return; |
314 | } |
315 | |
	// Will only happen if the heartbeat node timeout is changed or if this is
317 | // the first insert. |
318 | int new_size = buffer->size; |
319 | if (buffer->size > new_capacity) { |
320 | int shrink_by = buffer->size - new_capacity; |
321 | // Drop the oldest values and copy over the rest. |
322 | buffer->start = (buffer->start + shrink_by) % buffer->capacity; |
323 | new_size = new_capacity; |
324 | } |
325 | |
	// Shift values to be retained to the start of the data array. Since this
	// is not a frequent operation, use the simple technique of making a copy.
328 | int64_t adjusted_data[RING_BUFFER_CAPACITY_MAX]; |
329 | for (int i = 0; i < new_size; i++) { |
330 | int buffer_index = (buffer->start + i) % buffer->capacity; |
331 | adjusted_data[i] = buffer->data[buffer_index]; |
332 | } |
333 | |
	// Reset the buffer to start at index 0 and have the new capacity.
	memcpy(buffer->data, adjusted_data, new_size * sizeof(int64_t));
336 | buffer->capacity = new_capacity; |
337 | buffer->start = 0; |
338 | buffer->size = new_size; |
339 | } |
340 | |
341 | /** |
342 | * Insert a new delta into the ring_buffer. |
343 | */ |
344 | static void |
345 | ring_buffer_insert(as_skew_ring_buffer* buffer, const int64_t delta) |
346 | { |
347 | ring_buffer_adjust_to_capacity(buffer, ring_buffer_current_capacity()); |
348 | |
349 | int insert_index = 0; |
350 | if (buffer->size == buffer->capacity) { |
351 | insert_index = buffer->start; |
352 | buffer->start = (buffer->start + 1) % buffer->capacity; |
353 | } |
354 | else { |
355 | insert_index = buffer->size; |
356 | buffer->size++; |
357 | } |
358 | |
359 | buffer->data[insert_index] = delta; |
360 | } |
361 | |
362 | /* |
363 | * ---------------------------------------------------------------------------- |
364 | * HLC delta related |
365 | * ---------------------------------------------------------------------------- |
366 | */ |
367 | |
368 | /** |
369 | * Find min and max skew using difference between physical clock and hlc. |
370 | */ |
371 | static void |
372 | skew_monitor_delta_collect_iterate(cf_node nodeid, void* plugin_data, |
373 | size_t plugin_data_size, cf_clock recv_monotonic_ts, |
374 | as_hlc_msg_timestamp* msg_hlc_ts, void* udata) |
375 | { |
376 | int64_t delta = 0; |
377 | as_skew_monitor_hlc_delta_udata* deltas = |
378 | (as_skew_monitor_hlc_delta_udata*)udata; |
379 | |
380 | if (!plugin_data || plugin_data_size < sizeof(as_skew_plugin_data)) { |
381 | // Assume missing nodes share the same delta as self. |
		// Note: the self node is not in the adjacency list and hence also
		// follows this code path.
384 | pthread_mutex_lock(&g_self_skew_lock); |
385 | delta = ring_buffer_hlc_delta(&g_self_skew_ring_buffer); |
386 | pthread_mutex_unlock(&g_self_skew_lock); |
387 | } |
388 | else { |
389 | as_skew_plugin_data* skew_plugin_data = |
390 | (as_skew_plugin_data*)plugin_data; |
391 | delta = ring_buffer_hlc_delta(&skew_plugin_data->ring_buffer); |
392 | } |
393 | |
394 | int index = deltas->num_nodes; |
395 | deltas->skew_summary[index].delta = delta; |
396 | deltas->skew_summary[index].nodeid = nodeid; |
397 | deltas->num_nodes++; |
398 | } |
399 | |
400 | /** |
401 | * Compute the skew across the cluster. |
402 | */ |
403 | static uint64_t |
404 | skew_monitor_compute_skew() |
405 | { |
406 | uint64_t skew = 0; |
407 | uint8_t buffer[AS_CLUSTER_SZ * sizeof(cf_node)]; |
408 | cf_vector succession = { 0 }; |
409 | |
410 | cf_vector_init_smalloc(&succession, sizeof(cf_node), buffer, sizeof(buffer), |
411 | VECTOR_FLAG_INITZERO); |
412 | as_exchange_succession(&succession); |
413 | |
414 | if (cf_vector_size(&succession) <= 1) { |
415 | // Self node is an orphan or single node cluster. No cluster wide skew. |
416 | skew = 0; |
417 | goto Cleanup; |
418 | } |
419 | |
420 | as_skew_monitor_hlc_delta_udata udata = { 0 }; |
421 | as_hb_plugin_data_iterate(&succession, AS_HB_PLUGIN_SKEW_MONITOR, |
422 | skew_monitor_delta_collect_iterate, &udata); |
423 | |
424 | int64_t min = INT64_MAX; |
425 | int64_t max = INT64_MIN; |
426 | |
427 | for (int i = 0; i < udata.num_nodes; i++) { |
428 | int64_t delta = udata.skew_summary[i].delta; |
429 | if (delta < min) { |
430 | min = delta; |
431 | } |
432 | |
433 | if (delta > max) { |
434 | max = delta; |
435 | } |
436 | } |
437 | skew = max - min; |
438 | |
439 | Cleanup: |
440 | cf_vector_destroy(&succession); |
441 | return skew; |
442 | } |
443 | |
444 | /** |
445 | * Update clock skew and fire skew events. |
446 | */ |
447 | static void |
448 | skew_monitor_update() |
449 | { |
450 | cf_clock now = cf_getms(); |
451 | cf_atomic64_set(&g_last_skew_check_time, now); |
452 | |
453 | uint64_t skew = skew_monitor_compute_skew(); |
454 | cf_atomic64_set(&g_skew, skew); |
455 | |
456 | for (int i = 0; i < g_config.n_namespaces; i++) { |
457 | as_namespace* ns = g_config.namespaces[i]; |
458 | |
459 | // Store return values so all handlers warn independently. |
460 | bool record_stop_writes = as_record_handle_clock_skew(ns, skew); |
461 | bool nsup_stop_writes = as_nsup_handle_clock_skew(ns, skew); |
462 | |
463 | ns->clock_skew_stop_writes = record_stop_writes || nsup_stop_writes; |
464 | } |
465 | } |
466 | |
467 | /* |
468 | * ---------------------------------------------------------------------------- |
469 | * Outlier detection |
470 | * ---------------------------------------------------------------------------- |
471 | */ |
472 | |
473 | /** |
474 | * Comparator for deltas. |
475 | */ |
476 | static int |
477 | skew_monitor_hlc_float_compare(const void* o1, const void* o2) |
478 | { |
479 | float v1 = *(float*)o1; |
480 | float v2 = *(float*)o2; |
481 | return v1 > v2 ? 1 : (v1 == v2 ? 0 : -1); |
482 | } |
483 | |
484 | /** |
485 | * Compute the median of the data. |
486 | * |
 * @param values the values, sorted in ascending order.
 * @param num_elements the number of values.
 * @return the median value.
491 | */ |
492 | static float |
493 | skew_monitor_median(float* values, int num_elements) |
494 | { |
495 | if (num_elements % 2 == 0) { |
496 | int median_left = (num_elements - 1) / 2; |
497 | int median_right = median_left + 1; |
498 | return (values[median_left] + values[median_right]) / 2.0f; |
499 | } |
500 | |
501 | int median_index = (num_elements / 2); |
502 | return (float)values[median_index]; |
503 | } |
504 | |
505 | /** |
506 | * Return the currently estimated outliers from our cluster. |
507 | * Outliers should have space to hold at least AS_CLUSTER_SZ nodes. |
508 | */ |
509 | static uint32_t |
510 | skew_monitor_outliers_from_skew_summary(cf_vector* outliers, |
511 | as_skew_monitor_hlc_delta_udata* udata) |
512 | { |
	// Use Median Absolute Deviation (MAD) to detect outliers; in general the
	// delta distribution is symmetric and clustered close to the median.
515 | int num_nodes = udata->num_nodes; |
516 | float deltas[num_nodes]; |
517 | for (int i = 0; i < num_nodes; i++) { |
518 | deltas[i] = udata->skew_summary[i].delta; |
519 | } |
520 | |
521 | // Compute median. |
522 | qsort(deltas, num_nodes, sizeof(float), skew_monitor_hlc_float_compare); |
523 | float median = skew_monitor_median(deltas, num_nodes); |
524 | |
525 | // Compute absolute deviation from median. |
526 | float abs_dev[num_nodes]; |
527 | for (int i = 0; i < num_nodes; i++) { |
528 | abs_dev[i] = fabsf(deltas[i] - median); |
529 | } |
530 | |
531 | // Compute MAD. |
532 | qsort(abs_dev, num_nodes, sizeof(float), skew_monitor_hlc_float_compare); |
533 | float mad = skew_monitor_median(abs_dev, num_nodes); |
534 | |
535 | uint32_t num_outliers = 0; |
536 | |
537 | if (mad < 0.001f) { |
		// Most deltas are very close to the median. Flag values significantly
		// away from the median as outliers.
540 | for (int i = 0; i < udata->num_nodes; i++) { |
541 | if (fabsf(udata->skew_summary[i].delta - median) |
542 | > skew_monitor_outlier_detection_threshold()) { |
543 | if (outliers) { |
544 | cf_vector_append(outliers, &udata->skew_summary[i].nodeid); |
545 | } |
546 | |
547 | num_outliers++; |
548 | } |
549 | } |
550 | } |
551 | else { |
552 | // Any node with delta deviating significantly compared to MAD is an |
553 | // outlier. |
554 | for (int i = 0; i < udata->num_nodes; i++) { |
555 | if ((fabsf(udata->skew_summary[i].delta - median) / mad) |
556 | > MAD_RATIO_OUTLIER_THRESHOLD) { |
557 | if (outliers) { |
558 | cf_vector_append(outliers, &udata->skew_summary[i].nodeid); |
559 | } |
560 | |
561 | num_outliers++; |
562 | } |
563 | } |
564 | } |
565 | |
566 | return num_outliers; |
567 | } |
568 | |
569 | /** |
570 | * Return the currently estimated outliers from our cluster. |
571 | * Outliers should have space to hold at least AS_CLUSTER_SZ nodes. |
572 | */ |
573 | static uint32_t |
574 | skew_monitor_outliers(cf_vector* outliers) |
575 | { |
576 | if (as_skew_monitor_skew() < skew_monitor_outlier_detection_threshold()) { |
		// Skew is not significant. Skip outlier detection.
578 | return 0; |
579 | } |
580 | |
581 | uint8_t buffer[AS_CLUSTER_SZ * sizeof(cf_node)]; |
582 | cf_vector succession; |
583 | cf_vector_init_smalloc(&succession, sizeof(cf_node), buffer, sizeof(buffer), |
584 | VECTOR_FLAG_INITZERO); |
585 | as_exchange_succession(&succession); |
586 | |
587 | uint32_t num_outliers = 0; |
588 | |
589 | uint32_t cluster_size = cf_vector_size(&succession); |
590 | if (cluster_size <= 1) { |
591 | // Self node is an orphan or single node cluster. No cluster wide skew. |
592 | goto Cleanup; |
593 | } |
594 | |
595 | as_skew_monitor_hlc_delta_udata udata = { 0 }; |
596 | as_hb_plugin_data_iterate(&succession, AS_HB_PLUGIN_SKEW_MONITOR, |
597 | skew_monitor_delta_collect_iterate, &udata); |
598 | |
599 | num_outliers = skew_monitor_outliers_from_skew_summary(outliers, &udata); |
600 | |
601 | Cleanup: |
602 | cf_vector_destroy(&succession); |
603 | |
604 | return num_outliers; |
605 | } |
606 | |
607 | /* |
608 | * ---------------------------------------------------------------------------- |
609 | * HB plugin functions |
610 | * ---------------------------------------------------------------------------- |
611 | */ |
612 | |
613 | /** |
614 | * Push current timestamp for self node into the heartbeat pulse message. |
615 | */ |
616 | static void |
617 | skew_monitor_hb_plugin_set_fn(msg* msg) |
618 | { |
619 | cf_clock send_ts = cf_clock_getabsolute(); |
620 | msg_set_uint64(msg, AS_HB_MSG_SKEW_MONITOR_DATA, send_ts); |
621 | |
622 | // Update self skew. |
623 | as_hlc_timestamp send_hlc_ts = as_hlc_timestamp_now(); |
624 | int64_t clock_delta = as_hlc_physical_ts_get(send_hlc_ts) - send_ts; |
625 | |
626 | // Update the clock delta for self. |
627 | pthread_mutex_lock(&g_self_skew_lock); |
628 | ring_buffer_insert(&g_self_skew_ring_buffer, clock_delta); |
629 | pthread_mutex_unlock(&g_self_skew_lock); |
630 | |
631 | cf_clock now = cf_getms(); |
632 | if (cf_atomic64_get(g_last_skew_check_time) + skew_check_interval() < now) { |
633 | skew_monitor_update(); |
634 | } |
635 | } |
636 | |
637 | /** |
638 | * Compare the HLC timestamp and the physical clock and store the difference as |
639 | * plugin data for the source node to enable skew detection. |
640 | */ |
641 | static void |
642 | skew_monitor_hb_plugin_parse_data_fn(msg* msg, cf_node source, |
643 | as_hb_plugin_node_data* prev_plugin_data, |
644 | as_hb_plugin_node_data* plugin_data) |
645 | { |
646 | cf_clock send_ts = 0; |
647 | as_hlc_timestamp send_hlc_ts = 0; |
648 | as_hlc_timestamp hlc_now = as_hlc_timestamp_now(); |
649 | |
650 | if (msg_get_uint64(msg, AS_HB_MSG_SKEW_MONITOR_DATA, &send_ts) != 0 |
651 | || as_hb_msg_send_hlc_ts_get(msg, &send_hlc_ts) != 0) { |
		// Pre-SC mode node. For now, assume it shares the same hlc delta
		// as us.
654 | send_hlc_ts = hlc_now; |
655 | send_ts = cf_clock_getabsolute(); |
656 | } |
657 | |
658 | size_t required_capacity = sizeof(as_skew_plugin_data); |
659 | if (required_capacity > plugin_data->data_capacity) { |
660 | plugin_data->data = cf_realloc(plugin_data->data, required_capacity); |
661 | |
662 | if (plugin_data->data == NULL) { |
663 | CRASH("error allocating skew data for node %lx" , source); |
664 | } |
665 | plugin_data->data_capacity = required_capacity; |
666 | } |
667 | |
668 | if (plugin_data->data_size == 0) { |
669 | // First data point. |
670 | memset(plugin_data->data, 0, required_capacity); |
671 | } |
672 | |
673 | if (prev_plugin_data->data_size != 0) { |
674 | // Copy over older values to carry forward. |
675 | memcpy(plugin_data->data, prev_plugin_data->data, required_capacity); |
676 | } |
677 | |
678 | as_skew_plugin_data* skew_plugin_data = |
679 | (as_skew_plugin_data*)plugin_data->data; |
680 | |
	// Use labs() - the HLC diff is 64-bit and plain abs() would truncate.
	int64_t hlc_diff_ms = labs(as_hlc_timestamp_diff_ms(send_hlc_ts, hlc_now));
682 | |
683 | if (hlc_diff_ms > HLC_DEVIATION_MAX_MS) { |
684 | if (skew_plugin_data->bad_hlc_streak < BAD_HLC_STREAK_MAX) { |
685 | skew_plugin_data->bad_hlc_streak++; |
686 | INFO("node %lx HLC not in sync - hlc %lu self-hlc %lu diff %ld" , |
687 | source, send_hlc_ts, hlc_now, hlc_diff_ms); |
688 | } |
689 | else { |
690 | // Long running streak. |
691 | TICKER_WARNING("node %lx HLC not in sync" , source); |
692 | } |
693 | } |
694 | else { |
695 | // End the bad streak if the source is in one. |
696 | skew_plugin_data->bad_hlc_streak = 0; |
697 | } |
698 | |
699 | int64_t delta = 0; |
700 | if ((skew_plugin_data->bad_hlc_streak > 0) |
701 | && skew_plugin_data->bad_hlc_streak <= BAD_HLC_STREAK_MAX) { |
702 | // The peer is in a tolerable bad hlc streak. Assume it has nominal hlc |
703 | // delta. This is most likely a restarted or a new node that hasn't |
704 | // caught up with the cluster HLC yet. |
705 | pthread_mutex_lock(&g_self_skew_lock); |
706 | delta = ring_buffer_hlc_delta(&g_self_skew_ring_buffer); |
707 | pthread_mutex_unlock(&g_self_skew_lock); |
708 | } |
709 | else { |
710 | // This measurement is safe to use. |
711 | delta = as_hlc_physical_ts_get(send_hlc_ts) - send_ts; |
712 | } |
713 | |
714 | // Update the ring buffer with the new delta. |
715 | ring_buffer_insert(&skew_plugin_data->ring_buffer, delta); |
716 | |
717 | if (cf_context_at_severity(AS_SKEW, CF_DETAIL)) { |
718 | // Temporary debugging. |
719 | char message[100]; |
720 | sprintf(message, "Insert for node: %lx - " , source); |
721 | ring_buffer_log(&skew_plugin_data->ring_buffer, message, CF_DETAIL); |
722 | } |
723 | |
724 | // Ensure the data size is set correctly. |
725 | plugin_data->data_size = required_capacity; |
726 | |
727 | DETAIL("node %lx - hlc:%lu clock:%lu delta:%ld" , source, send_hlc_ts, |
728 | send_ts, delta); |
729 | } |
730 | |
731 | /* |
732 | * ---------------------------------------------------------------------------- |
 * Protected API only meant for clustering.
734 | * ---------------------------------------------------------------------------- |
735 | */ |
736 | |
737 | /** |
738 | * Update clock skew and fire skew events. |
739 | */ |
740 | void |
741 | as_skew_monitor_update() |
742 | { |
743 | skew_monitor_update(); |
744 | } |
745 | |
746 | /* |
747 | * ---------------------------------------------------------------------------- |
748 | * Public API |
749 | * ---------------------------------------------------------------------------- |
750 | */ |
751 | |
752 | /** |
753 | * Initialize skew monitor. |
754 | */ |
755 | void |
756 | as_skew_monitor_init() |
757 | { |
758 | as_hb_plugin skew_monitor_plugin = { 0 }; |
759 | |
760 | skew_monitor_plugin.id = AS_HB_PLUGIN_SKEW_MONITOR; |
761 | skew_monitor_plugin.wire_size_fixed = sizeof(int64_t); |
	// No per-node payload in the pulse message.
763 | skew_monitor_plugin.wire_size_per_node = 0; |
764 | skew_monitor_plugin.set_fn = skew_monitor_hb_plugin_set_fn; |
765 | skew_monitor_plugin.parse_fn = skew_monitor_hb_plugin_parse_data_fn; |
766 | as_hb_plugin_register(&skew_monitor_plugin); |
767 | |
768 | DETAIL("skew monitor initialized" ); |
769 | } |
770 | |
771 | /** |
772 | * Return the current estimate of the clock skew in the cluster. |
773 | */ |
774 | uint64_t |
775 | as_skew_monitor_skew() |
776 | { |
777 | return cf_atomic64_get(g_skew); |
778 | } |
779 | |
780 | /** |
781 | * Return the currently estimated outliers from our cluster. |
782 | * Outliers should have space to hold at least AS_CLUSTER_SZ nodes. |
783 | */ |
784 | uint32_t |
785 | as_skew_monitor_outliers(cf_vector* outliers) |
786 | { |
787 | return skew_monitor_outliers(outliers); |
788 | } |
789 | |
790 | /** |
791 | * Print skew outliers to a dynamic buffer. |
792 | */ |
793 | uint32_t |
794 | as_skew_monitor_outliers_append(cf_dyn_buf* db) |
795 | { |
796 | uint8_t buffer[AS_CLUSTER_SZ * sizeof(cf_node)]; |
797 | cf_vector outliers; |
798 | cf_vector_init_smalloc(&outliers, sizeof(cf_node), buffer, sizeof(buffer), |
799 | VECTOR_FLAG_INITZERO); |
800 | uint32_t num_outliers = skew_monitor_outliers(&outliers); |
801 | |
802 | for (uint32_t i = 0; i < num_outliers; i++) { |
803 | cf_node outlier_id; |
804 | cf_vector_get(&outliers, i, &outlier_id); |
805 | cf_dyn_buf_append_uint64_x(db, outlier_id); |
806 | cf_dyn_buf_append_char(db, ','); |
807 | } |
808 | |
809 | if (num_outliers) { |
810 | cf_dyn_buf_chomp(db); |
811 | } |
812 | |
813 | cf_vector_destroy(&outliers); |
814 | |
815 | return num_outliers; |
816 | } |
817 | |
818 | /** |
819 | * Print skew monitor info to a dynamic buffer. |
820 | */ |
821 | void |
822 | as_skew_monitor_info(cf_dyn_buf* db) |
823 | { |
824 | cf_dyn_buf_append_string(db, "cluster_clock_skew_outliers=" ); |
825 | uint32_t num_outliers = as_skew_monitor_outliers_append(db); |
826 | if (num_outliers == 0) { |
827 | cf_dyn_buf_append_string(db, "null" ); |
828 | } |
829 | cf_dyn_buf_append_char(db, ';'); |
830 | } |
831 | |
832 | /** |
833 | * Dump some debugging information to the logs. |
834 | */ |
835 | void |
836 | as_skew_monitor_dump() |
837 | { |
838 | uint8_t buffer[AS_CLUSTER_SZ * sizeof(cf_node)]; |
839 | cf_vector node_vector; |
840 | cf_vector_init_smalloc(&node_vector, sizeof(cf_node), buffer, |
841 | sizeof(buffer), VECTOR_FLAG_INITZERO); |
842 | as_exchange_succession(&node_vector); |
843 | |
844 | INFO("CSM: cluster-clock-skew:%ld" , as_skew_monitor_skew()); |
845 | if (cf_vector_size(&node_vector) <= 1) { |
846 | // Self node is an orphan or single node cluster. No cluster wide skew. |
847 | goto Cleanup; |
848 | } |
849 | |
850 | as_skew_monitor_hlc_delta_udata udata = { 0 }; |
851 | as_hb_plugin_data_iterate(&node_vector, AS_HB_PLUGIN_SKEW_MONITOR, |
852 | skew_monitor_delta_collect_iterate, &udata); |
853 | |
854 | for (int i = 0; i < udata.num_nodes; i++) { |
855 | INFO("CSM: node:%lx hlc-delta:%ld" , udata.skew_summary[i].nodeid, |
856 | udata.skew_summary[i].delta); |
857 | } |
858 | |
859 | // Log the outliers. |
860 | cf_vector_clear(&node_vector); |
861 | skew_monitor_outliers(&node_vector); |
862 | if (cf_vector_size(&node_vector)) { |
863 | as_clustering_log_cf_node_vector(CF_INFO, AS_SKEW, |
864 | "CSM: Estimated clock outliers" , &node_vector); |
865 | } |
866 | |
867 | Cleanup: |
868 | cf_vector_destroy(&node_vector); |
869 | } |
870 | |