1/*
2 * nsup.c
3 *
4 * Copyright (C) 2019 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23//==========================================================
24// Includes.
25//
26
27#include "base/nsup.h"
28
29#include <stdbool.h>
30#include <stddef.h>
31#include <stdint.h>
32#include <stdio.h>
33#include <stdlib.h>
34#include <unistd.h>
35
36#include "aerospike/as_atomic.h"
37#include "citrusleaf/cf_clock.h"
38#include "citrusleaf/cf_vector.h"
39
40#include "cf_thread.h"
41#include "fault.h"
42#include "hardware.h"
43#include "linear_hist.h"
44#include "node.h"
45#include "vmapx.h"
46
47#include "base/cfg.h"
48#include "base/datamodel.h"
49#include "base/index.h"
50#include "base/smd.h"
51#include "base/xdr_serverside.h"
52#include "fabric/partition.h"
53#include "storage/storage.h"
54
55#include "warnings.h"
56
57
58//==========================================================
59// Typedefs & constants.
60//
61
// Shared state for one expiration cycle - a single instance is shared by all
// expire threads. pid is the atomic partition cursor; the counters are summed
// into via as_add_uint64() as threads finish.
typedef struct expire_overall_info_s {
	as_namespace* ns;
	uint32_t pid;
	uint32_t now;
	uint64_t n_0_void_time;
	uint64_t n_expired;
} expire_overall_info;

// Per-thread working state for expiration - counters accumulate locally and
// are folded into the overall struct at thread exit.
typedef struct expire_per_thread_info_s {
	as_namespace* ns;
	as_index_tree* tree;
	cf_node master; // for XDR reporting
	uint32_t now;
	uint64_t n_0_void_time;
	uint64_t n_expired;
} expire_per_thread_info;

// Shared state for one eviction cycle - like expire_overall_info, plus the
// eviction cutoff and the per-set "don't evict" flags.
typedef struct evict_overall_info_s {
	as_namespace* ns;
	uint32_t pid;
	uint32_t i_cpu; // for cold start eviction only
	uint32_t now;
	uint32_t evict_void_time;
	const bool* sets_not_evicting;
	uint64_t n_0_void_time;
	uint64_t n_expired;
	uint64_t n_evicted;
} evict_overall_info;

// Per-thread working state for eviction.
typedef struct evict_per_thread_info_s {
	as_namespace* ns;
	as_index_tree* tree;
	cf_node master; // for XDR reporting
	uint32_t now;
	uint32_t evict_void_time;
	const bool* sets_not_evicting;
	uint64_t n_0_void_time;
	uint64_t n_expired;
	uint64_t n_evicted;
} evict_per_thread_info;

// Per-thread state for the eviction "prep" pass that builds the void-time
// histogram. p_pid points at a shared partition cursor; each thread owns its
// evict_hist, merged by the caller after join.
typedef struct prep_evict_per_thread_info_s {
	as_namespace* ns;
	uint32_t* p_pid;
	uint32_t i_cpu; // for cold start eviction only
	const bool* sets_not_evicting;
	linear_hist* evict_hist;
} prep_evict_per_thread_info;
110
111#define SKEW_STOP_SEC 40
112#define SKEW_WARN_SEC 30
113
114#define EVICT_SMD_TIMEOUT (5 * 1000) // 5 seconds
115
116#define EVAL_STOP_WRITES_PERIOD 10 // seconds
117
118#define EVAL_WRITE_STATE_FREQUENCY 1024
119#define COLD_START_HIST_MIN_BUCKETS 100000 // histogram memory is transient
120
121
122//==========================================================
123// Forward declarations.
124//
125
126static bool nsup_smd_conflict_cb(const as_smd_item* existing_item, const as_smd_item* new_item);
127static void nsup_smd_accept_cb(const cf_vector* items, as_smd_accept_type accept_type);
128
129static void* run_expire_or_evict(void* udata);
130
131static void expire(as_namespace* ns);
132static void* run_expire(void* udata);
133static void expire_reduce_cb(as_index_ref* r_ref, void* udata);
134
135static bool evict(as_namespace* ns);
136static void* run_evict(void* udata);
137static void evict_reduce_cb(as_index_ref* r_ref, void* udata);
138
139static bool eval_hwm_breached(as_namespace* ns);
140static uint32_t find_evict_void_time(as_namespace* ns, uint32_t now);
141static void* run_prep_evict(void* udata);
142static void prep_evict_reduce_cb(as_index_ref* r_ref, void* udata);
143
144static void update_stats(as_namespace* ns, uint64_t n_0_void_time, uint64_t n_expired_objects, uint64_t n_evicted_objects, uint64_t start_ms);
145
146static void* run_stop_writes(void* udata);
147static bool eval_stop_writes(as_namespace* ns);
148
149static void* run_nsup_histograms(void* udata);
150static void collect_nsup_histograms(as_namespace* ns);
151static void nsup_histograms_reduce_cb(as_index_ref* r_ref, void* udata);
152
153static bool cold_start_evict(as_namespace* ns);
154static void* run_prep_cold_start_evict(void* udata);
155static uint64_t set_cold_start_threshold(as_namespace* ns, linear_hist* hist);
156static void* run_cold_start_evict(void* udata);
157static void cold_start_evict_reduce_cb(as_index_ref* r_ref, void* udata);
158
159static bool sets_protected(as_namespace* ns);
160static void init_sets_not_evicting(as_namespace* ns, bool sets_not_evicting[]);
161static uint32_t get_ttl_range(as_namespace* ns, uint32_t now);
162
163
164//==========================================================
165// Inlines & macros.
166//
167
168static inline uint32_t
169evict_void_time_from_smd(const as_smd_item* item)
170{
171 return (uint32_t)strtoul(item->value, NULL, 10); // TODO - sanity check?
172}
173
174static inline cf_node
175find_xdr_delete_master(as_namespace* ns, uint32_t pid)
176{
177 if (is_xdr_delete_shipping_enabled() && is_xdr_nsup_deletes_enabled()) {
178 cf_node master = as_partition_writable_node(ns, pid);
179
180 // If master is 0, e.g. unavailable in SC, just log it locally.
181 return master != (cf_node)0 ? master : g_config.self_node;
182 }
183
184 return (cf_node)0;
185}
186
187static inline void
188report_to_xdr(as_namespace* ns, as_record* r, cf_node master)
189{
190 if (master != (cf_node)0) {
191 xdr_write(ns, &r->keyd, 0,
192 master == g_config.self_node ? (cf_node)0 : master,
193 XDR_OP_TYPE_DROP, as_index_get_set_id(r), NULL);
194 }
195}
196
197
198//==========================================================
199// Public API.
200//
201
// Register nsup's callbacks with the SMD evict module - must happen before
// SMD starts delivering items.
void
as_nsup_init(void)
{
	as_smd_module_load(AS_SMD_MODULE_EVICT, nsup_smd_accept_cb,
			nsup_smd_conflict_cb, NULL);
}
208
209void
210as_nsup_start(void)
211{
212 cf_info(AS_NSUP, "starting namespace supervisor threads");
213
214 for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) {
215 as_namespace* ns = g_config.namespaces[ns_ix];
216
217 cf_thread_create_detached(run_expire_or_evict, ns);
218 cf_thread_create_detached(run_nsup_histograms, ns);
219 }
220
221 cf_thread_create_detached(run_stop_writes, NULL);
222}
223
224bool
225as_nsup_handle_clock_skew(as_namespace* ns, uint64_t skew_ms)
226{
227 if (ns->nsup_period == 0 || skew_ms < SKEW_WARN_SEC * 1000UL) {
228 return false;
229 }
230
231 if (skew_ms > SKEW_STOP_SEC * 1000UL) {
232 cf_ticker_warning(AS_NSUP, "{%s} clock-skew > %u sec stopped writes",
233 ns->name, SKEW_STOP_SEC);
234 return true;
235 }
236
237 cf_ticker_warning(AS_NSUP, "{%s} clock-skew > %u sec", ns->name,
238 SKEW_WARN_SEC);
239
240 return false;
241}
242
243bool
244as_nsup_eviction_reset_cmd(const char* ns_name, const char* ttl_str)
245{
246 if (ttl_str == NULL) {
247 cf_info(AS_NSUP, "{%s} got command to delete evict-void-time", ns_name);
248
249 if (! as_smd_delete_blocking(AS_SMD_MODULE_EVICT, ns_name, 0)) {
250 cf_warning(AS_NSUP, "{%s} failed delete evict-void-time", ns_name);
251 return false;
252 }
253
254 return true;
255 }
256
257 uint64_t ttl = strtoul(ttl_str, NULL, 0);
258
259 if (ttl > MAX_ALLOWED_TTL) {
260 cf_warning(AS_NSUP, "{%s} command ttl %lu is too big", ns_name, ttl);
261 return false;
262 }
263
264 uint32_t now = as_record_void_time_get();
265 uint32_t evict_void_time = now + (uint32_t)ttl;
266
267 cf_info(AS_NSUP, "{%s} got command to set evict-ttl %lu evict-void-time %u",
268 ns_name, ttl, evict_void_time);
269
270 char value[10 + 1];
271
272 sprintf(value, "%u", evict_void_time);
273
274 if (! as_smd_set_blocking(AS_SMD_MODULE_EVICT, ns_name, value, 0)) {
275 cf_warning(AS_NSUP, "{%s} failed set evict-ttl %lu evict-void-time %u",
276 ns_name, ttl, evict_void_time);
277 return false;
278 }
279
280 return true;
281}
282
// Serialize cold start eviction checks - only one caller at a time runs
// cold_start_evict() for a given namespace. Returns cold_start_evict()'s
// verdict (false aborts the cold start).
bool
as_cold_start_evict_if_needed(as_namespace* ns)
{
	cf_mutex_lock(&ns->cold_start_evict_lock);

	bool result = cold_start_evict(ns);

	cf_mutex_unlock(&ns->cold_start_evict_lock);

	return result;
}
294
295
296//==========================================================
297// Local helpers - SMD callbacks.
298//
299
300static bool
301nsup_smd_conflict_cb(const as_smd_item* existing_item,
302 const as_smd_item* new_item)
303{
304 return evict_void_time_from_smd(new_item) >
305 evict_void_time_from_smd(existing_item);
306}
307
308static void
309nsup_smd_accept_cb(const cf_vector* items, as_smd_accept_type accept_type)
310{
311 for (uint32_t i = 0; i < cf_vector_size(items); i++) {
312 as_smd_item* item = cf_vector_get_ptr(items, i);
313 as_namespace* ns = as_namespace_get_byname(item->key);
314
315 if (ns == NULL) {
316 cf_detail(AS_NSUP, "skipping invalid ns");
317 continue;
318 }
319
320 if (item->value != NULL) {
321 ns->smd_evict_void_time = evict_void_time_from_smd(item);
322
323 cf_info(AS_NSUP, "{%s} got smd evict-void-time %u", ns->name,
324 ns->smd_evict_void_time);
325
326 if (accept_type == AS_SMD_ACCEPT_OPT_START) {
327 ns->evict_void_time = ns->smd_evict_void_time;
328 }
329 }
330 else {
331 cf_info(AS_NSUP, "{%s} deleted evict-void-time (%u,%u)", ns->name,
332 ns->evict_void_time, ns->smd_evict_void_time);
333
334 ns->evict_void_time = 0;
335 ns->smd_evict_void_time = 0;
336 }
337 }
338}
339
340
341//==========================================================
342// Local helpers - expiration/eviction control loop.
343//
344
// Per-namespace supervisor loop. Wakes every second: first applies any
// pending SMD eviction cutoff, then - once per nsup-period - decides whether
// to initiate a new eviction (via SMD) or run a plain expiration pass.
static void*
run_expire_or_evict(void* udata)
{
	as_namespace* ns = (as_namespace*)udata;

	uint64_t last_time = cf_get_seconds();

	while (true) {
		sleep(1); // wake up every second to check

		uint64_t period = as_load_uint32(&ns->nsup_period);

		// Disabled, or writes stopped due to clock skew - do nothing.
		if (period == 0 || ns->clock_skew_stop_writes) {
			continue;
		}

		// Apply a newly-arrived SMD cutoff (counts as this cycle's work).
		if (evict(ns)) {
			continue;
		}

		uint64_t curr_time = cf_get_seconds();

		if (curr_time - last_time < period) {
			continue;
		}

		last_time = curr_time;

		if (eval_hwm_breached(ns)) {
			uint32_t now = as_record_void_time_get();
			uint32_t evict_void_time = find_evict_void_time(ns, now);

			if (evict_void_time > now) {
				if (evict_void_time < ns->evict_void_time) {
					// Unusual, maybe lots of new records with short TTLs ...
					cf_info(AS_NSUP, "{%s} evict-void-time %u < previous",
							ns->name, evict_void_time);

					// Delete first so the smaller value isn't rejected by
					// the conflict callback.
					if (! as_smd_delete_blocking(AS_SMD_MODULE_EVICT, ns->name,
							EVICT_SMD_TIMEOUT)) {
						cf_warning(AS_NSUP, "{%s} failed delete evict-void-time",
								ns->name);
					}
				}

				char value[10 + 1];

				sprintf(value, "%u", evict_void_time);

				// Submit via SMD - the accept callback triggers evict().
				if (! as_smd_set_blocking(AS_SMD_MODULE_EVICT, ns->name, value,
						EVICT_SMD_TIMEOUT)) {
					cf_warning(AS_NSUP, "{%s} failed set evict-void-time %u",
							ns->name, evict_void_time);
				}

				continue;
			}
			// else - evict_void_time is now or 0.

			if (! sets_protected(ns) && evict_void_time == 0) {
				continue; // no need to expire
			}
			// else - expire protected sets, or if evict-void-time is now.
		}

		expire(ns);
	}

	return NULL;
}
415
416
417//==========================================================
418// Local helpers - expire.
419//
420
421static void
422expire(as_namespace* ns)
423{
424 uint64_t start_ms = cf_getms();
425 uint32_t n_threads = as_load_uint32(&ns->n_nsup_threads);
426
427 cf_info(AS_NSUP, "{%s} nsup-start: expire-threads %u", ns->name, n_threads);
428
429 cf_tid tids[n_threads];
430
431 expire_overall_info overall = {
432 .ns = ns,
433 .now = as_record_void_time_get()
434 };
435
436 for (uint32_t i = 0; i < n_threads; i++) {
437 tids[i] = cf_thread_create_joinable(run_expire, (void*)&overall);
438 }
439
440 for (uint32_t i = 0; i < n_threads; i++) {
441 cf_thread_join(tids[i]);
442 }
443
444 update_stats(ns, overall.n_0_void_time, overall.n_expired, 0, start_ms);
445}
446
447static void*
448run_expire(void* udata)
449{
450 expire_overall_info* overall = (expire_overall_info*)udata;
451 as_namespace* ns = overall->ns;
452
453 expire_per_thread_info per_thread = {
454 .ns = ns,
455 .now = overall->now
456 };
457
458 uint32_t pid;
459
460 while ((pid = as_faa_uint32(&overall->pid, 1)) < AS_PARTITIONS) {
461 as_partition_reservation rsv;
462 as_partition_reserve(ns, pid, &rsv);
463
464 per_thread.tree = rsv.tree;
465 per_thread.master = find_xdr_delete_master(ns, pid);
466
467 as_index_reduce_live(rsv.tree, expire_reduce_cb, (void*)&per_thread);
468 as_partition_release(&rsv);
469 }
470
471 as_add_uint64(&overall->n_0_void_time, (int64_t)per_thread.n_0_void_time);
472 as_add_uint64(&overall->n_expired, (int64_t)per_thread.n_expired);
473
474 return NULL;
475}
476
477static void
478expire_reduce_cb(as_index_ref* r_ref, void* udata)
479{
480 as_index* r = r_ref->r;
481 expire_per_thread_info* per_thread = (expire_per_thread_info*)udata;
482 as_namespace* ns = per_thread->ns;
483 uint32_t void_time = r->void_time;
484
485 if (void_time == 0) {
486 per_thread->n_0_void_time++;
487 }
488 else if (per_thread->now > void_time) {
489 report_to_xdr(ns, r, per_thread->master);
490 as_index_delete(per_thread->tree, &r->keyd);
491 per_thread->n_expired++;
492 }
493
494 as_record_done(r_ref, ns);
495}
496
497
498//==========================================================
499// Local helpers - evict.
500//
501
// Apply a pending SMD eviction cutoff, if any. Returns true if an eviction
// pass ran (whether or not records were actually evicted).
static bool
evict(as_namespace* ns)
{
	uint32_t evict_void_time = as_load_uint32(&ns->evict_void_time);
	uint32_t smd_evict_void_time = as_load_uint32(&ns->smd_evict_void_time);

	// Nothing to do unless SMD advertised a later cutoff than the one
	// already applied locally.
	if (evict_void_time >= smd_evict_void_time) {
		return false;
	}

	uint64_t start_ms = cf_getms();
	uint32_t now = as_record_void_time_get();
	uint32_t n_threads = as_load_uint32(&ns->n_nsup_threads);

	// For stats, show eviction depth WRT local time. Note - unlikely to be
	// negative, but theoretically possible - cutoff could have come from
	// another node, and/or taken very long to calculate/transmit.
	ns->evict_ttl = (int32_t)(smd_evict_void_time - now);

	cf_info(AS_NSUP, "{%s} nsup-start: evict-threads %u evict-ttl %d evict-void-time (%u,%u)",
			ns->name, n_threads, ns->evict_ttl, evict_void_time,
			smd_evict_void_time);

	evict_void_time = smd_evict_void_time;

	// Never evict past the present - cap a stale cutoff at now.
	if (now > evict_void_time) {
		evict_void_time = now;

		cf_info(AS_NSUP, "{%s} now (%u) > evict-void-time - using now",
				ns->name, now);
	}

	bool sets_not_evicting[AS_SET_MAX_COUNT + 1] = { false };
	init_sets_not_evicting(ns, sets_not_evicting);

	cf_tid tids[n_threads];

	evict_overall_info overall = {
		.ns = ns,
		.now = now,
		.evict_void_time = evict_void_time,
		.sets_not_evicting = (const bool*)sets_not_evicting
	};

	for (uint32_t i = 0; i < n_threads; i++) {
		tids[i] = cf_thread_create_joinable(run_evict, (void*)&overall);
	}

	for (uint32_t i = 0; i < n_threads; i++) {
		cf_thread_join(tids[i]);
	}

	update_stats(ns, overall.n_0_void_time, overall.n_expired,
			overall.n_evicted, start_ms);

	// Mark this cutoff as applied - suppresses re-running until SMD moves.
	ns->evict_void_time = evict_void_time;

	return true;
}
561
562static void*
563run_evict(void* udata)
564{
565 evict_overall_info* overall = (evict_overall_info*)udata;
566 as_namespace* ns = overall->ns;
567
568 evict_per_thread_info per_thread = {
569 .ns = ns,
570 .now = overall->now,
571 .evict_void_time = overall->evict_void_time,
572 .sets_not_evicting = overall->sets_not_evicting
573 };
574
575 uint32_t pid;
576
577 while ((pid = as_faa_uint32(&overall->pid, 1)) < AS_PARTITIONS) {
578 as_partition_reservation rsv;
579 as_partition_reserve(ns, pid, &rsv);
580
581 per_thread.tree = rsv.tree;
582 per_thread.master = find_xdr_delete_master(ns, pid);
583
584 as_index_reduce_live(rsv.tree, evict_reduce_cb, (void*)&per_thread);
585 as_partition_release(&rsv);
586 }
587
588 as_add_uint64(&overall->n_0_void_time, (int64_t)per_thread.n_0_void_time);
589 as_add_uint64(&overall->n_expired, (int64_t)per_thread.n_expired);
590 as_add_uint64(&overall->n_evicted, (int64_t)per_thread.n_evicted);
591
592 return NULL;
593}
594
595static void
596evict_reduce_cb(as_index_ref* r_ref, void* udata)
597{
598 as_index* r = r_ref->r;
599 evict_per_thread_info* per_thread = (evict_per_thread_info*)udata;
600 as_namespace* ns = per_thread->ns;
601 uint32_t void_time = r->void_time;
602
603 if (void_time == 0) {
604 per_thread->n_0_void_time++;
605 }
606 else if (per_thread->sets_not_evicting[as_index_get_set_id(r)]) {
607 if (per_thread->now > void_time) {
608 report_to_xdr(ns, r, per_thread->master);
609 as_index_delete(per_thread->tree, &r->keyd);
610 per_thread->n_expired++;
611 }
612 }
613 else if (per_thread->evict_void_time > void_time) {
614 report_to_xdr(ns, r, per_thread->master);
615 as_index_delete(per_thread->tree, &r->keyd);
616 per_thread->n_evicted++;
617 }
618
619 as_record_done(r_ref, ns);
620}
621
622
623//==========================================================
624// Local helpers - initiate eviction.
625//
626
// Evaluate all eviction high-water marks (memory, index-device, disk) and
// set ns->hwm_breached accordingly. Returns true if any mark is breached.
static bool
eval_hwm_breached(as_namespace* ns)
{
	uint64_t index_sz = (ns->n_tombstones + ns->n_objects) * sizeof(as_index);

	uint64_t index_mem_sz = 0;
	uint64_t index_dev_sz = 0;
	uint64_t pix_hwm = 0;

	// Index size counts against memory or device, depending on where the
	// primary index lives.
	if (as_namespace_index_persisted(ns)) {
		index_dev_sz = index_sz;
		pix_hwm = (ns->mounts_size_limit * ns->mounts_hwm_pct) / 100;
	}
	else {
		index_mem_sz = index_sz;
	}

	uint64_t sindex_sz = ns->n_bytes_sindex_memory;
	uint64_t data_in_memory_sz = ns->n_bytes_memory;
	uint64_t memory_sz = index_mem_sz + sindex_sz + data_in_memory_sz;
	uint64_t mem_hwm = (ns->memory_size * ns->hwm_memory_pct) / 100;

	uint64_t used_disk_sz = 0;

	as_storage_stats(ns, NULL, &used_disk_sz);

	uint64_t ssd_hwm = (ns->ssd_size * ns->hwm_disk_pct) / 100;

	// Reason strings indexed by the how_breached bitmask built below.
	static const char* reasons[] = {
		NULL, // 0x0
		"(memory)", // 0x1
		"(index-device)", // 0x2
		"(memory & index-device)", // 0x3 (0x1 | 0x2)
		"(disk)", // 0x4
		"(memory & disk)", // 0x5 (0x1 | 0x4)
		"(index-device & disk)", // 0x6 (0x2 | 0x4)
		"(memory & index-device & disk)" // 0x7 (0x1 | 0x2 | 0x4)
	};

	uint32_t how_breached = 0x0;

	if (memory_sz > mem_hwm) {
		how_breached |= 0x1;
	}

	if (index_dev_sz > pix_hwm) {
		how_breached |= 0x2;
	}

	if (used_disk_sz > ssd_hwm) {
		how_breached |= 0x4;
	}

	if (how_breached != 0) {
		cf_warning(AS_NSUP, "{%s} breached eviction hwm %s, memory sz:%lu (%lu + %lu + %lu) hwm:%lu, index-device sz:%lu hwm:%lu, disk sz:%lu hwm:%lu",
				ns->name, reasons[how_breached],
				memory_sz, index_mem_sz, sindex_sz, data_in_memory_sz, mem_hwm,
				index_dev_sz, pix_hwm,
				used_disk_sz, ssd_hwm);

		ns->hwm_breached = true;
		return true;
	}

	cf_debug(AS_NSUP, "{%s} no eviction hwm breached, memory sz:%lu (%lu + %lu + %lu) hwm:%lu, index-device sz:%lu hwm:%lu, disk sz:%lu hwm:%lu",
			ns->name,
			memory_sz, index_mem_sz, sindex_sz, data_in_memory_sz, mem_hwm,
			index_dev_sz, pix_hwm,
			used_disk_sz, ssd_hwm);

	ns->hwm_breached = false;

	return false;
}
701
// Build the eviction histogram (multi-threaded pass over all partitions) and
// find the void-time cutoff covering evict-tenths-pct of evictable records.
// Returns 0 when nothing should be evicted, now when only the first bucket
// qualifies, or a future void-time to submit via SMD.
static uint32_t
find_evict_void_time(as_namespace* ns, uint32_t now)
{
	bool sets_not_evicting[AS_SET_MAX_COUNT + 1] = { false };
	init_sets_not_evicting(ns, sets_not_evicting);

	uint32_t ttl_range = get_ttl_range(ns, now);
	uint32_t n_buckets = ns->evict_hist_buckets;
	linear_hist_reset(ns->evict_hist, now, ttl_range, n_buckets);

	uint32_t n_threads = as_load_uint32(&ns->n_nsup_threads);
	cf_tid tids[n_threads];

	// Each thread gets its own histogram - merged after join to avoid
	// contention on ns->evict_hist.
	prep_evict_per_thread_info per_threads[n_threads];
	uint32_t pid = 0;

	for (uint32_t i = 0; i < n_threads; i++) {
		prep_evict_per_thread_info* per_thread = &per_threads[i];

		per_thread->ns = ns;
		per_thread->p_pid = &pid;
		per_thread->sets_not_evicting = (const bool*)sets_not_evicting;
		per_thread->evict_hist = linear_hist_create("per-thread-hist",
				LINEAR_HIST_SECONDS, now, ttl_range, n_buckets);

		tids[i] = cf_thread_create_joinable(run_prep_evict, (void*)per_thread);
	}

	for (uint32_t i = 0; i < n_threads; i++) {
		cf_thread_join(tids[i]);

		linear_hist_merge(ns->evict_hist, per_threads[i].evict_hist);
		linear_hist_destroy(per_threads[i].evict_hist);
	}

	linear_hist_threshold threshold;
	uint64_t subtotal = linear_hist_get_threshold_for_fraction(ns->evict_hist,
			ns->evict_tenths_pct, &threshold);
	uint32_t evict_void_time = threshold.value;

	if (evict_void_time == 0xFFFFffff) { // looped past all buckets
		if (subtotal == 0) {
			cf_warning(AS_NSUP, "{%s} no records eligible for eviction",
					ns->name);
		}
		else {
			cf_warning(AS_NSUP, "{%s} would evict all %lu records eligible - not evicting!",
					ns->name, subtotal);
		}

		return 0;
	}

	if (subtotal == 0) {
		cf_warning(AS_NSUP, "{%s} no records below eviction void-time %u - threshold bucket %u, width %u sec, count %lu > target %lu (%.1f pct)",
				ns->name, evict_void_time, threshold.bucket_index,
				threshold.bucket_width, threshold.bucket_count,
				threshold.target_count, (float)ns->evict_tenths_pct / 10.0);

		// If threshold > now and there are no records below it, there's nothing
		// to expire. But the first bucket is special - if threshold == now,
		// it's possible entries in the first bucket have expired.
		return evict_void_time == now ? now : 0;
	}

	cf_info(AS_NSUP, "{%s} found %lu records eligible for eviction at evict-ttl %u - submitting evict-void-time %u",
			ns->name, subtotal, evict_void_time - now, evict_void_time);

	return evict_void_time;
}
772
773static void*
774run_prep_evict(void* udata)
775{
776 prep_evict_per_thread_info* per_thread = (prep_evict_per_thread_info*)udata;
777 uint32_t pid;
778
779 while ((pid = as_faa_uint32(per_thread->p_pid, 1)) < AS_PARTITIONS) {
780 as_partition_reservation rsv;
781 as_partition_reserve(per_thread->ns, pid, &rsv);
782
783 as_index_reduce_live(rsv.tree, prep_evict_reduce_cb, (void*)per_thread);
784 as_partition_release(&rsv);
785 }
786
787 return NULL;
788}
789
790static void
791prep_evict_reduce_cb(as_index_ref* r_ref, void* udata)
792{
793 as_index* r = r_ref->r;
794 prep_evict_per_thread_info* per_thread = (prep_evict_per_thread_info*)udata;
795 uint32_t void_time = r->void_time;
796
797 if (void_time != 0 &&
798 ! per_thread->sets_not_evicting[as_index_get_set_id(r)]) {
799 linear_hist_insert_data_point(per_thread->evict_hist, void_time);
800 }
801
802 as_record_done(r_ref, per_thread->ns);
803}
804
805
806//==========================================================
807// Local helpers - expiration/eviction ticker.
808//
809
// Publish the results of one expire/evict cycle - updates namespace counters
// and logs the nsup-done ticker line.
static void
update_stats(as_namespace* ns, uint64_t n_0_void_time,
		uint64_t n_expired_objects, uint64_t n_evicted_objects,
		uint64_t start_ms)
{
	// Snapshot, not cumulative - the count seen this cycle.
	ns->non_expirable_objects = n_0_void_time;

	// Cumulative since startup.
	ns->n_expired_objects += n_expired_objects;
	ns->n_evicted_objects += n_evicted_objects;

	uint64_t total_duration_ms = cf_getms() - start_ms;

	ns->nsup_cycle_duration = (uint32_t)(total_duration_ms / 1000);

	// Pairs in parentheses are (cumulative,this-cycle).
	cf_info(AS_NSUP, "{%s} nsup-done: non-expirable %lu expired (%lu,%lu) evicted (%lu,%lu) evict-ttl %d total-ms %lu",
			ns->name,
			n_0_void_time,
			ns->n_expired_objects, n_expired_objects,
			ns->n_evicted_objects, n_evicted_objects,
			ns->evict_ttl,
			total_duration_ms);
}
832
833
834//==========================================================
835// Local helpers - stop writes.
836//
837
838static void*
839run_stop_writes(void* udata)
840{
841 (void)udata;
842
843 while (true) {
844 sleep(EVAL_STOP_WRITES_PERIOD);
845
846 for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) {
847 eval_stop_writes(g_config.namespaces[ns_ix]);
848 }
849 }
850
851 return NULL;
852}
853
// Evaluate all stop-writes conditions (memory, device avail-pct, XDR digest
// log) and set ns->stop_writes accordingly. Returns true if writes should
// be stopped.
static bool
eval_stop_writes(as_namespace* ns)
{
	uint64_t mem_stop_writes = (ns->memory_size * ns->stop_writes_pct) / 100;

	int device_avail_pct = 0;

	as_storage_stats(ns, &device_avail_pct, NULL);

	// Note that persisted index is not counted against stop-writes.
	uint64_t index_mem_sz = as_namespace_index_persisted(ns) ?
			0 : (ns->n_tombstones + ns->n_objects) * sizeof(as_index);
	uint64_t sindex_sz = ns->n_bytes_sindex_memory;
	uint64_t data_in_memory_sz = ns->n_bytes_memory;
	uint64_t memory_sz = index_mem_sz + sindex_sz + data_in_memory_sz;

	// Reason strings indexed by the why_stopped bitmask built below.
	static const char* reasons[] = {
		NULL, // 0x0
		"(memory)", // 0x1
		"(device-avail-pct)", // 0x2
		"(memory & device-avail-pct)", // 0x3 (0x1 | 0x2)
		"(xdr-log)", // 0x4
		"(memory & xdr-log)", // 0x5 (0x1 | 0x4)
		"(device-avail-pct & xdr-log)", // 0x6 (0x2 | 0x4)
		"(memory & device-avail-pct & xdr-log)" // 0x7 (0x1 | 0x2 | 0x4)
	};

	uint32_t why_stopped = 0x0;

	if (memory_sz > mem_stop_writes) {
		why_stopped |= 0x1;
	}

	if (device_avail_pct < (int)ns->storage_min_avail_pct) {
		why_stopped |= 0x2;
	}

	if (is_xdr_digestlog_low(ns)) {
		why_stopped |= 0x4;
	}

	if (why_stopped != 0) {
		cf_warning(AS_NSUP, "{%s} breached stop-writes limit %s, memory sz:%lu (%lu + %lu + %lu) limit:%lu, disk avail-pct:%d",
				ns->name, reasons[why_stopped],
				memory_sz, index_mem_sz, sindex_sz, data_in_memory_sz,
				mem_stop_writes, device_avail_pct);

		ns->stop_writes = true;
		return true;
	}

	cf_debug(AS_NSUP, "{%s} stop-writes limit not breached, memory sz:%lu (%lu + %lu + %lu) limit:%lu, disk avail-pct:%d",
			ns->name,
			memory_sz, index_mem_sz, sindex_sz, data_in_memory_sz,
			mem_stop_writes, device_avail_pct);

	ns->stop_writes = false;

	return false;
}
914
915
916//==========================================================
917// Local helpers - background histograms.
918//
919
920static void*
921run_nsup_histograms(void* udata)
922{
923 as_namespace* ns = (as_namespace*)udata;
924
925 bool wait = false; // make sure we run once right away on startup
926 uint64_t last_time = 0;
927
928 while (true) {
929 sleep(1); // wake up every second to check
930
931 uint64_t period = ns->nsup_hist_period;
932 uint64_t curr_time = cf_get_seconds();
933
934 if (period == 0 || (wait && curr_time - last_time < period)) {
935 continue;
936 }
937
938 wait = true;
939 last_time = curr_time;
940
941 collect_nsup_histograms(ns);
942 }
943
944 return NULL;
945}
946
// Rebuild the namespace's TTL (and, when enabled, object-size) histograms by
// reducing every partition, then dump/save the results.
static void
collect_nsup_histograms(as_namespace* ns)
{
	if (ns->n_objects == 0) {
		return;
	}

	// obj_size_log_hist being non-NULL is the flag that object-size
	// histograms are enabled for this namespace.
	const char* tag = ns->obj_size_log_hist != NULL ?
			"ttl & object size" : "ttl";

	cf_info(AS_NSUP, "{%s} collecting %s info ...", ns->name, tag);

	uint32_t now = as_record_void_time_get();
	uint32_t ttl_range = get_ttl_range(ns, now);

	linear_hist_clear(ns->ttl_hist, now, ttl_range);

	if (ns->obj_size_log_hist != NULL) {
		histogram_clear(ns->obj_size_log_hist);
		linear_hist_clear(ns->obj_size_lin_hist, 0,
				ns->storage_write_block_size);
	}

	uint32_t num_sets = cf_vmapx_count(ns->p_sets_vmap);

	// Clear existing per-set histograms, or lazily create them for sets
	// that appeared since the last collection. Set-ids are 1-based.
	for (uint32_t j = 0; j < num_sets; j++) {
		uint32_t set_id = j + 1;

		if (ns->set_ttl_hists[set_id] != NULL) {
			linear_hist_clear(ns->set_ttl_hists[set_id], now, ttl_range);
		}
		else {
			char hist_name[HISTOGRAM_NAME_SIZE];
			const char* set_name =
					as_namespace_get_set_name(ns, (uint16_t)set_id);

			sprintf(hist_name, "{%s}-%s-ttl", ns->name, set_name);
			ns->set_ttl_hists[set_id] =
					linear_hist_create(hist_name, LINEAR_HIST_SECONDS, 0, 0,
							TTL_HIST_NUM_BUCKETS);
		}

		if (ns->set_obj_size_log_hists[set_id] != NULL) {
			histogram_clear(ns->set_obj_size_log_hists[set_id]);
			linear_hist_clear(ns->set_obj_size_lin_hists[set_id], 0,
					ns->storage_write_block_size);
		}
		else if (ns->obj_size_log_hist != NULL) {
			char hist_name[HISTOGRAM_NAME_SIZE];
			const char* set_name =
					as_namespace_get_set_name(ns, (uint16_t)set_id);

			sprintf(hist_name, "{%s}-%s-obj-size-log2", ns->name, set_name);
			ns->set_obj_size_log_hists[set_id] =
					histogram_create(hist_name, HIST_SIZE);

			sprintf(hist_name, "{%s}-%s-obj-size-linear", ns->name, set_name);
			ns->set_obj_size_lin_hists[set_id] =
					linear_hist_create(hist_name, LINEAR_HIST_SIZE, 0,
							ns->storage_write_block_size,
							OBJ_SIZE_HIST_NUM_BUCKETS);
		}
	}

	// Single-threaded pass over all partitions, inserting data points.
	for (uint32_t pid = 0; pid < AS_PARTITIONS; pid++) {
		as_partition_reservation rsv;
		as_partition_reserve(ns, pid, &rsv);

		as_index_reduce_live(rsv.tree, nsup_histograms_reduce_cb, (void*)ns);
		as_partition_release(&rsv);
	}

	linear_hist_dump(ns->ttl_hist);
	linear_hist_save_info(ns->ttl_hist);

	if (ns->obj_size_log_hist != NULL) {
		histogram_save_info(ns->obj_size_log_hist);
		linear_hist_save_info(ns->obj_size_lin_hist);
	}

	for (uint32_t j = 0; j < num_sets; j++) {
		uint32_t set_id = j + 1;

		linear_hist_dump(ns->set_ttl_hists[set_id]);
		linear_hist_save_info(ns->set_ttl_hists[set_id]);

		if (ns->obj_size_log_hist != NULL) {
			histogram_save_info(ns->set_obj_size_log_hists[set_id]);
			linear_hist_save_info(ns->set_obj_size_lin_hists[set_id]);
		}
	}

	cf_info(AS_NSUP, "{%s} ... done collecting %s info", ns->name, tag);
}
1041
1042static void
1043nsup_histograms_reduce_cb(as_index_ref* r_ref, void* udata)
1044{
1045 as_index* r = r_ref->r;
1046 as_namespace* ns = (as_namespace*)udata;
1047 uint32_t set_id = as_index_get_set_id(r);
1048 linear_hist* set_ttl_hist = ns->set_ttl_hists[set_id];
1049 uint32_t void_time = r->void_time;
1050
1051 linear_hist_insert_data_point(ns->ttl_hist, void_time);
1052
1053 if (set_ttl_hist != NULL) {
1054 linear_hist_insert_data_point(set_ttl_hist, void_time);
1055 }
1056
1057 if (ns->obj_size_log_hist == NULL) {
1058 as_record_done(r_ref, ns);
1059 return;
1060 }
1061
1062 uint32_t size = as_storage_record_size(ns, r);
1063
1064 histogram_insert_raw_unsafe(ns->obj_size_log_hist, size);
1065 linear_hist_insert_data_point(ns->obj_size_lin_hist, size);
1066
1067 histogram* set_obj_size_log_hist = ns->set_obj_size_log_hists[set_id];
1068 linear_hist* set_obj_size_lin_hist = ns->set_obj_size_lin_hists[set_id];
1069
1070 if (set_obj_size_log_hist != NULL) {
1071 histogram_insert_raw_unsafe(set_obj_size_log_hist, size);
1072 linear_hist_insert_data_point(set_obj_size_lin_hist, size);
1073 }
1074
1075 as_record_done(r_ref, ns);
1076}
1077
1078
1079//==========================================================
1080// Local helpers - cold start eviction.
1081//
1082
// Periodic check during cold start record loading - evaluates stop-writes and
// high-water marks, and if the HWM is breached, performs a full multi-threaded
// eviction pass over all partitions. Returns false only when the stop-writes
// limit is hit (caller should stop loading); returns true otherwise, including
// when eviction was performed or was not possible.
static bool
cold_start_evict(as_namespace* ns)
{
	// Only evaluate every EVAL_WRITE_STATE_FREQUENCY record-adds - the checks
	// and the potential eviction pass below are far too expensive per-record.
	if (ns->cold_start_record_add_count++ % EVAL_WRITE_STATE_FREQUENCY != 0) {
		return true;
	}

	uint32_t now = as_record_void_time_get();

	// Keep the namespace's cold start "clock" monotonic.
	if (now > ns->cold_start_now) {
		ns->cold_start_now = now;
	}

	if (eval_stop_writes(ns)) {
		cf_warning(AS_NSUP, "{%s} hit stop-writes limit", ns->name);
		return false;
	}

	if (! eval_hwm_breached(ns)) {
		return true;
	}

	if (ns->cold_start_eviction_disabled) {
		cf_warning(AS_NSUP, "{%s} breached but eviction disabled", ns->name);
		return true;
	}

	cf_info(AS_NSUP, "{%s} cold start building evict histogram ...", ns->name);

	uint32_t ttl_range = get_ttl_range(ns, now);
	// Use at least COLD_START_HIST_MIN_BUCKETS for threshold resolution.
	uint32_t n_buckets = ns->evict_hist_buckets > COLD_START_HIST_MIN_BUCKETS ?
			ns->evict_hist_buckets : COLD_START_HIST_MIN_BUCKETS;

	// Indexed by set-id - flags sets whose records must not be evicted.
	bool sets_not_evicting[AS_SET_MAX_COUNT + 1] = { false };
	init_sets_not_evicting(ns, sets_not_evicting);

	uint32_t n_cpus = cf_topo_count_cpus();
	cf_tid tids[n_cpus];

	// Phase 1 - one pinned thread per CPU builds a private TTL histogram over
	// the partitions it claims via the shared pid counter.
	prep_evict_per_thread_info per_threads[n_cpus];
	uint32_t pid = 0;

	for (uint32_t n = 0; n < n_cpus; n++) {
		prep_evict_per_thread_info* per_thread = &per_threads[n];

		per_thread->ns = ns;
		per_thread->p_pid = &pid;
		per_thread->i_cpu = n;
		per_thread->sets_not_evicting = sets_not_evicting;
		per_thread->evict_hist = linear_hist_create("per-thread-hist",
				LINEAR_HIST_SECONDS, now, ttl_range, n_buckets);

		tids[n] = cf_thread_create_joinable(run_prep_cold_start_evict,
				(void*)per_thread);
	}

	// Join all threads, merging every per-thread histogram into thread 0's.
	for (uint32_t n = 0; n < n_cpus; n++) {
		cf_thread_join(tids[n]);

		if (n == 0) {
			continue;
		}

		linear_hist_merge(per_threads[0].evict_hist, per_threads[n].evict_hist);
		linear_hist_destroy(per_threads[n].evict_hist);
	}

	// Derive the eviction void-time threshold (sets ns->evict_void_time) and
	// the number of records that fall below it.
	uint64_t n_evictable =
			set_cold_start_threshold(ns, per_threads[0].evict_hist);

	linear_hist_destroy(per_threads[0].evict_hist);

	if (n_evictable == 0) {
		cf_warning(AS_NSUP, "{%s} hwm breached but nothing to evict", ns->name);
		return true;
	}

	cf_info(AS_NSUP, "{%s} cold start found %lu records eligible for eviction at evict-ttl %u",
			ns->name, n_evictable, ns->evict_void_time - now);

	// Phase 2 - one thread per CPU deletes records below the threshold. The
	// shared overall struct carries the pid/cpu counters and result totals
	// (zero-initialized by the designated initializer).
	evict_overall_info overall = {
			.ns = ns,
			.sets_not_evicting = sets_not_evicting
			// Note - .now and .expired not needed at startup.
	};

	for (uint32_t n = 0; n < n_cpus; n++) {
		tids[n] = cf_thread_create_joinable(run_cold_start_evict,
				(void*)&overall);
	}

	for (uint32_t n = 0; n < n_cpus; n++) {
		cf_thread_join(tids[n]);
	}

	cf_info(AS_NSUP, "{%s} cold start evicted %lu records, found %lu 0-void-time records",
			ns->name, overall.n_evicted, overall.n_0_void_time);

	return true;
}
1183
1184static void*
1185run_prep_cold_start_evict(void* udata)
1186{
1187 prep_evict_per_thread_info* per_thread = (prep_evict_per_thread_info*)udata;
1188
1189 cf_topo_pin_to_cpu((cf_topo_cpu_index)per_thread->i_cpu);
1190
1191 uint32_t pid;
1192
1193 while ((pid = as_faa_uint32(per_thread->p_pid, 1)) < AS_PARTITIONS) {
1194 // Don't bother with partition reservations - it's startup. Otherwise,
1195 // use the same reduce callback as at runtime.
1196 as_index_reduce_live(per_thread->ns->partitions[pid].tree,
1197 prep_evict_reduce_cb, (void*)per_thread);
1198 }
1199
1200 return NULL;
1201}
1202
1203static uint64_t
1204set_cold_start_threshold(as_namespace* ns, linear_hist* hist)
1205{
1206 linear_hist_threshold threshold;
1207 uint64_t subtotal = linear_hist_get_threshold_for_fraction(hist, ns->evict_tenths_pct, &threshold);
1208 uint32_t evict_void_time = threshold.value;
1209
1210 if (evict_void_time == 0xFFFFffff) { // looped past all buckets
1211 if (subtotal == 0) {
1212 cf_warning(AS_NSUP, "{%s} cold start found no records eligible for eviction",
1213 ns->name);
1214 }
1215 else {
1216 cf_warning(AS_NSUP, "{%s} cold start would evict all %lu records eligible - not evicting!",
1217 ns->name, subtotal);
1218 }
1219
1220 return 0;
1221 }
1222
1223 if (subtotal == 0) {
1224 cf_warning(AS_NSUP, "{%s} cold start found no records below eviction void-time %u - threshold bucket %u, width %u sec, count %lu > target %lu (%.1f pct)",
1225 ns->name, evict_void_time, threshold.bucket_index,
1226 threshold.bucket_width, threshold.bucket_count,
1227 threshold.target_count, (float)ns->evict_tenths_pct / 10.0);
1228
1229 // Unlike at runtime, bottom bucket is not special, no need to expire.
1230 return 0;
1231 }
1232
1233 ns->evict_void_time = evict_void_time;
1234
1235 return subtotal;
1236}
1237
1238static void*
1239run_cold_start_evict(void* udata)
1240{
1241 evict_overall_info* overall = (evict_overall_info*)udata;
1242
1243 cf_topo_pin_to_cpu((cf_topo_cpu_index)as_faa_uint32(&overall->i_cpu, 1));
1244
1245 as_namespace* ns = overall->ns;
1246
1247 evict_per_thread_info per_thread = {
1248 .ns = ns,
1249 .sets_not_evicting = overall->sets_not_evicting
1250 };
1251
1252 uint32_t pid;
1253
1254 while ((pid = as_faa_uint32(&overall->pid, 1)) < AS_PARTITIONS) {
1255 // Don't bother with partition reservations - it's startup.
1256 per_thread.tree = ns->partitions[pid].tree;
1257
1258 as_index_reduce_live(per_thread.tree, cold_start_evict_reduce_cb,
1259 &per_thread);
1260 }
1261
1262 as_add_uint64(&overall->n_0_void_time, (int64_t)per_thread.n_0_void_time);
1263 as_add_uint64(&overall->n_evicted, (int64_t)per_thread.n_evicted);
1264
1265 return NULL;
1266}
1267
1268static void
1269cold_start_evict_reduce_cb(as_index_ref* r_ref, void* udata)
1270{
1271 as_index* r = r_ref->r;
1272 evict_per_thread_info* per_thread = (evict_per_thread_info*)udata;
1273 as_namespace* ns = per_thread->ns;
1274 uint32_t void_time = r->void_time;
1275
1276 if (void_time == 0) {
1277 per_thread->n_0_void_time++;
1278 }
1279 else if (! per_thread->sets_not_evicting[as_index_get_set_id(r)] &&
1280 ns->evict_void_time > void_time) {
1281 as_index_delete(per_thread->tree, &r->keyd);
1282 per_thread->n_evicted++;
1283 }
1284
1285 as_record_done(r_ref, ns);
1286}
1287
1288
1289//==========================================================
1290// Local helpers - generic.
1291//
1292
1293static bool
1294sets_protected(as_namespace* ns)
1295{
1296 uint32_t num_sets = cf_vmapx_count(ns->p_sets_vmap);
1297
1298 for (uint32_t j = 0; j < num_sets; j++) {
1299 as_set* p_set;
1300
1301 if (cf_vmapx_get_by_index(ns->p_sets_vmap, j, (void**)&p_set) !=
1302 CF_VMAPX_OK) {
1303 cf_crash(AS_NSUP, "failed to get set index %u from vmap", j);
1304 }
1305
1306 if (IS_SET_EVICTION_DISABLED(p_set)) {
1307 return true;
1308 }
1309 }
1310
1311 return false;
1312}
1313
1314static void
1315init_sets_not_evicting(as_namespace* ns, bool sets_not_evicting[])
1316{
1317 uint32_t num_sets = cf_vmapx_count(ns->p_sets_vmap);
1318
1319 for (uint32_t j = 0; j < num_sets; j++) {
1320 as_set* p_set;
1321
1322 if (cf_vmapx_get_by_index(ns->p_sets_vmap, j, (void**)&p_set) !=
1323 CF_VMAPX_OK) {
1324 cf_crash(AS_NSUP, "failed to get set index %u from vmap", j);
1325 }
1326
1327 if (IS_SET_EVICTION_DISABLED(p_set)) {
1328 sets_not_evicting[j + 1] = true;
1329 }
1330 }
1331}
1332
1333static uint32_t
1334get_ttl_range(as_namespace* ns, uint32_t now)
1335{
1336 uint32_t max_void_time = 0;
1337
1338 for (uint32_t pid = 0; pid < AS_PARTITIONS; pid++) {
1339 // Note - non-masters may or may not have a max-void-time.
1340 uint32_t partition_max_void_time = ns->partitions[pid].max_void_time;
1341
1342 if (partition_max_void_time > max_void_time) {
1343 max_void_time = partition_max_void_time;
1344 }
1345 }
1346
1347 return max_void_time > now ? max_void_time - now : 0;
1348}
1349