1/*
2 * drv_ssd.c
3 *
4 * Copyright (C) 2009-2016 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23/* SYNOPSIS
24 * "file" based storage driver, which applies to both SSD namespaces and, in
25 * some cases, to file-backed main-memory namespaces.
26 */
27
28#include "storage/drv_ssd.h"
29
30#include <fcntl.h>
31#include <errno.h>
32#include <stdbool.h>
33#include <stddef.h>
34#include <stdint.h>
35#include <stdio.h>
36#include <string.h>
37#include <time.h>
38#include <unistd.h>
39#include <linux/fs.h> // for BLKGETSIZE64
40#include <sys/ioctl.h>
41#include <sys/param.h> // for MAX()
42
43#include "aerospike/as_atomic.h"
44#include "citrusleaf/alloc.h"
45#include "citrusleaf/cf_atomic.h"
46#include "citrusleaf/cf_clock.h"
47#include "citrusleaf/cf_digest.h"
48#include "citrusleaf/cf_queue.h"
49#include "citrusleaf/cf_random.h"
50
51#include "bits.h"
52#include "cf_mutex.h"
53#include "cf_thread.h"
54#include "fault.h"
55#include "hist.h"
56#include "vmapx.h"
57
58#include "base/cfg.h"
59#include "base/datamodel.h"
60#include "base/health.h"
61#include "base/index.h"
62#include "base/nsup.h"
63#include "base/proto.h"
64#include "base/secondary_index.h"
65#include "base/truncate.h"
66#include "fabric/partition.h"
67#include "storage/flat.h"
68#include "storage/storage.h"
69#include "transaction/rw_utils.h"
70
71
72//==========================================================
73// Constants.
74//
75
76#define DEFRAG_STARTUP_RESERVE 4
77#define DEFRAG_RUNTIME_RESERVE 4
78
79#define WRITE_IN_PLACE 1
80
81
82//==========================================================
83// Miscellaneous utility functions.
84//
85
86// Get an open file descriptor from the pool, or a fresh one if necessary.
87int
88ssd_fd_get(drv_ssd *ssd)
89{
90 int fd = -1;
91 int rv = cf_queue_pop(ssd->fd_q, (void*)&fd, CF_QUEUE_NOWAIT);
92
93 if (rv != CF_QUEUE_OK) {
94 fd = open(ssd->name, ssd->open_flag, S_IRUSR | S_IWUSR);
95
96 if (-1 == fd) {
97 cf_crash(AS_DRV_SSD, "%s: DEVICE FAILED open: errno %d (%s)",
98 ssd->name, errno, cf_strerror(errno));
99 }
100 }
101
102 return fd;
103}
104
105
106int
107ssd_fd_cache_get(drv_ssd *ssd)
108{
109 int fd = -1;
110 int rv = cf_queue_pop(ssd->fd_cache_q, (void*)&fd, CF_QUEUE_NOWAIT);
111
112 if (rv != CF_QUEUE_OK) {
113 fd = open(ssd->name, ssd->open_flag & ~(O_DIRECT | O_DSYNC),
114 S_IRUSR | S_IWUSR);
115
116 if (-1 == fd) {
117 cf_crash(AS_DRV_SSD, "%s: DEVICE FAILED open: errno %d (%s)",
118 ssd->name, errno, cf_strerror(errno));
119 }
120 }
121
122 return fd;
123}
124
125
126int
127ssd_shadow_fd_get(drv_ssd *ssd)
128{
129 int fd = -1;
130 int rv = cf_queue_pop(ssd->shadow_fd_q, (void*)&fd, CF_QUEUE_NOWAIT);
131
132 if (rv != CF_QUEUE_OK) {
133 fd = open(ssd->shadow_name, ssd->open_flag, S_IRUSR | S_IWUSR);
134
135 if (-1 == fd) {
136 cf_crash(AS_DRV_SSD, "%s: DEVICE FAILED open: errno %d (%s)",
137 ssd->shadow_name, errno, cf_strerror(errno));
138 }
139 }
140
141 return fd;
142}
143
144
145// Save an open file descriptor in the pool
146void
147ssd_fd_put(drv_ssd *ssd, int fd)
148{
149 cf_queue_push(ssd->fd_q, (void*)&fd);
150}
151
152
153static inline void
154ssd_fd_cache_put(drv_ssd *ssd, int fd)
155{
156 cf_queue_push(ssd->fd_cache_q, (void*)&fd);
157}
158
159
160static inline void
161ssd_shadow_fd_put(drv_ssd *ssd, int fd)
162{
163 cf_queue_push(ssd->shadow_fd_q, (void*)&fd);
164}
165
166
167// Decide which device a record belongs on.
168static inline uint32_t
169ssd_get_file_id(drv_ssds *ssds, cf_digest *keyd)
170{
171 return *(uint32_t*)&keyd->digest[DIGEST_STORAGE_BASE_BYTE] % ssds->n_ssds;
172}
173
174
175// Put a wblock on the free queue for reuse.
176static inline void
177push_wblock_to_free_q(drv_ssd *ssd, uint32_t wblock_id)
178{
179 // Can get here before queue created, e.g. cold start replacing records.
180 if (ssd->free_wblock_q == NULL) {
181 return;
182 }
183
184 cf_assert(wblock_id < ssd->n_wblocks, AS_DRV_SSD,
185 "pushing bad wblock_id %d to free_wblock_q", (int32_t)wblock_id);
186
187 cf_queue_push(ssd->free_wblock_q, &wblock_id);
188}
189
190
191// Put a wblock on the defrag queue.
192static inline void
193push_wblock_to_defrag_q(drv_ssd *ssd, uint32_t wblock_id)
194{
195 if (ssd->defrag_wblock_q) { // null until devices are loaded at startup
196 ssd->wblock_state[wblock_id].state = WBLOCK_STATE_DEFRAG;
197 cf_queue_push(ssd->defrag_wblock_q, &wblock_id);
198 cf_atomic64_incr(&ssd->n_defrag_wblock_reads);
199 }
200}
201
202
203static inline bool
204pop_pristine_wblock_id(drv_ssd *ssd, uint32_t* wblock_id)
205{
206 uint32_t id;
207
208 while ((id = as_load_uint32(&ssd->pristine_wblock_id)) < ssd->n_wblocks) {
209 if (as_cas_uint32(&ssd->pristine_wblock_id, id, id + 1)) {
210 *wblock_id = id;
211 return true;
212 }
213 }
214
215 return false; // out of space
216}
217
218
219static inline uint32_t
220num_pristine_wblocks(const drv_ssd *ssd)
221{
222 return ssd->n_wblocks - ssd->pristine_wblock_id;
223}
224
225
226static inline uint32_t
227num_free_wblocks(const drv_ssd *ssd)
228{
229 return cf_queue_sz(ssd->free_wblock_q) + num_pristine_wblocks(ssd);
230}
231
232
233// Available contiguous size.
234static inline uint64_t
235available_size(drv_ssd *ssd)
236{
237 // Note - returns 100% available during cold start, to make it irrelevant in
238 // cold start eviction threshold check.
239
240 return ssd->free_wblock_q != NULL ?
241 (uint64_t)num_free_wblocks(ssd) * ssd->write_block_size :
242 ssd->file_size;
243}
244
245
246// Since UDF writes can't yet unwind on failure, we ensure that they'll succeed
247// by checking before writing on all threads that there's at least one wblock
248// per thread. TODO - deprecate this methodology when everything can unwind.
249static inline uint32_t
250min_free_wblocks(const as_namespace *ns)
251{
252 return g_config.n_service_threads + // client writes
253 g_config.n_fabric_channel_recv_threads[AS_FABRIC_CHANNEL_RW] + // prole writes
254 g_config.n_fabric_channel_recv_threads[AS_FABRIC_CHANNEL_BULK] + // migration writes
255 1 + // always 1 defrag thread
256 DEFRAG_RUNTIME_RESERVE + // reserve for defrag at runtime
257 DEFRAG_STARTUP_RESERVE; // reserve for defrag at startup
258}
259
260
261void
262ssd_release_vacated_wblock(drv_ssd *ssd, uint32_t wblock_id,
263 ssd_wblock_state* p_wblock_state)
264{
265 cf_assert(p_wblock_state->swb == NULL, AS_DRV_SSD,
266 "device %s: wblock-id %u swb not null while defragging",
267 ssd->name, wblock_id);
268
269 cf_assert(p_wblock_state->state == WBLOCK_STATE_DEFRAG, AS_DRV_SSD,
270 "device %s: wblock-id %u state not DEFRAG while defragging",
271 ssd->name, wblock_id);
272
273 int32_t n_vac_dests = cf_atomic32_decr(&p_wblock_state->n_vac_dests);
274
275 if (n_vac_dests > 0) {
276 return;
277 }
278 // else - all wblocks we defragged into have been flushed.
279
280 cf_assert(n_vac_dests == 0, AS_DRV_SSD,
281 "device %s: wblock-id %u vacation destinations underflow",
282 ssd->name, wblock_id);
283
284 cf_mutex_lock(&p_wblock_state->LOCK);
285
286 p_wblock_state->state = WBLOCK_STATE_NONE;
287
288 // Free the wblock if it's empty.
289 if (cf_atomic32_get(p_wblock_state->inuse_sz) == 0 &&
290 // TODO - given assertions above, this condition is superfluous:
291 p_wblock_state->swb == NULL) {
292 push_wblock_to_free_q(ssd, wblock_id);
293 }
294
295 cf_mutex_unlock(&p_wblock_state->LOCK);
296}
297
298
299//------------------------------------------------
300// ssd_write_buf "swb" methods.
301//
302
303#define VACATED_CAPACITY_STEP 128 // allocate in 1K chunks
304
305static inline ssd_write_buf*
306swb_create(drv_ssd *ssd)
307{
308 ssd_write_buf *swb = (ssd_write_buf*)cf_malloc(sizeof(ssd_write_buf));
309
310 swb->buf = cf_valloc(ssd->write_block_size);
311
312 swb->n_vacated = 0;
313 swb->vacated_capacity = VACATED_CAPACITY_STEP;
314 swb->vacated_wblocks =
315 cf_malloc(sizeof(vacated_wblock) * swb->vacated_capacity);
316
317 return swb;
318}
319
320static inline void
321swb_destroy(ssd_write_buf *swb)
322{
323 cf_free(swb->vacated_wblocks);
324 cf_free(swb->buf);
325 cf_free(swb);
326}
327
328static inline void
329swb_reset(ssd_write_buf *swb)
330{
331 swb->skip_post_write_q = false;
332 swb->wblock_id = STORAGE_INVALID_WBLOCK;
333 swb->pos = 0;
334}
335
336#define swb_reserve(_swb) cf_atomic32_incr(&(_swb)->rc)
337
338static inline void
339swb_check_and_reserve(ssd_wblock_state *wblock_state, ssd_write_buf **p_swb)
340{
341 cf_mutex_lock(&wblock_state->LOCK);
342
343 if (wblock_state->swb != NULL) {
344 *p_swb = wblock_state->swb;
345 swb_reserve(*p_swb);
346 }
347
348 cf_mutex_unlock(&wblock_state->LOCK);
349}
350
351static inline void
352swb_release(ssd_write_buf *swb)
353{
354 if (0 == cf_atomic32_decr(&swb->rc)) {
355 swb_reset(swb);
356
357 // Put the swb back on the free queue for reuse.
358 cf_queue_push(swb->ssd->swb_free_q, &swb);
359 }
360}
361
362static inline void
363swb_dereference_and_release(drv_ssd *ssd, uint32_t wblock_id,
364 ssd_write_buf *swb)
365{
366 ssd_wblock_state *wblock_state = &ssd->wblock_state[wblock_id];
367
368 cf_mutex_lock(&wblock_state->LOCK);
369
370 cf_assert(swb == wblock_state->swb, AS_DRV_SSD,
371 "releasing wrong swb! %p (%d) != %p (%d), thread %d",
372 swb, (int32_t)swb->wblock_id, wblock_state->swb,
373 (int32_t)wblock_state->swb->wblock_id, cf_thread_sys_tid());
374
375 swb_release(wblock_state->swb);
376 wblock_state->swb = NULL;
377
378 cf_assert(wblock_state->state != WBLOCK_STATE_DEFRAG, AS_DRV_SSD,
379 "device %s: wblock-id %u state is DEFRAG on swb release", ssd->name,
380 wblock_id);
381
382 uint32_t inuse_sz = cf_atomic32_get(wblock_state->inuse_sz);
383
384 // Free wblock if all three gating conditions hold.
385 if (inuse_sz == 0) {
386 cf_atomic64_incr(&ssd->n_wblock_direct_frees);
387 push_wblock_to_free_q(ssd, wblock_id);
388 }
389 // Queue wblock for defrag if applicable.
390 else if (inuse_sz < ssd->ns->defrag_lwm_size) {
391 push_wblock_to_defrag_q(ssd, wblock_id);
392 }
393
394 cf_mutex_unlock(&wblock_state->LOCK);
395}
396
397ssd_write_buf *
398swb_get(drv_ssd *ssd)
399{
400 ssd_write_buf *swb;
401
402 if (CF_QUEUE_OK != cf_queue_pop(ssd->swb_free_q, &swb, CF_QUEUE_NOWAIT)) {
403 swb = swb_create(ssd);
404 swb->rc = 0;
405 swb->n_writers = 0;
406 swb->dirty = false;
407 swb->skip_post_write_q = false;
408 swb->ssd = ssd;
409 swb->wblock_id = STORAGE_INVALID_WBLOCK;
410 swb->pos = 0;
411 }
412
413 // Find a device block to write to.
414 if (cf_queue_pop(ssd->free_wblock_q, &swb->wblock_id, CF_QUEUE_NOWAIT) !=
415 CF_QUEUE_OK && ! pop_pristine_wblock_id(ssd, &swb->wblock_id)) {
416 cf_queue_push(ssd->swb_free_q, &swb);
417 return NULL;
418 }
419
420 ssd_wblock_state* p_wblock_state = &ssd->wblock_state[swb->wblock_id];
421
422 uint32_t inuse_sz = cf_atomic32_get(p_wblock_state->inuse_sz);
423
424 cf_assert(inuse_sz == 0, AS_DRV_SSD,
425 "device %s: wblock-id %u inuse-size %u off free-q", ssd->name,
426 swb->wblock_id, inuse_sz);
427
428 cf_assert(p_wblock_state->swb == NULL, AS_DRV_SSD,
429 "device %s: wblock-id %u swb not null off free-q", ssd->name,
430 swb->wblock_id);
431
432 cf_assert(p_wblock_state->state != WBLOCK_STATE_DEFRAG, AS_DRV_SSD,
433 "device %s: wblock-id %u state DEFRAG off free-q", ssd->name,
434 swb->wblock_id);
435
436 cf_mutex_lock(&p_wblock_state->LOCK);
437
438 swb_reserve(swb);
439 p_wblock_state->swb = swb;
440
441 cf_mutex_unlock(&p_wblock_state->LOCK);
442
443 return swb;
444}
445
446bool
447swb_add_unique_vacated_wblock(ssd_write_buf* swb, uint32_t src_file_id,
448 uint32_t src_wblock_id)
449{
450 for (uint32_t i = 0; i < swb->n_vacated; i++) {
451 vacated_wblock *vw = &swb->vacated_wblocks[i];
452
453 if (vw->wblock_id == src_wblock_id && vw->file_id == src_file_id) {
454 return false; // already present
455 }
456 }
457
458 if (swb->n_vacated == swb->vacated_capacity) {
459 swb->vacated_capacity += VACATED_CAPACITY_STEP;
460 swb->vacated_wblocks = cf_realloc(swb->vacated_wblocks,
461 sizeof(vacated_wblock) * swb->vacated_capacity);
462 }
463
464 swb->vacated_wblocks[swb->n_vacated].file_id = src_file_id;
465 swb->vacated_wblocks[swb->n_vacated].wblock_id = src_wblock_id;
466 swb->n_vacated++;
467
468 return true; // added to list
469}
470
471void
472swb_release_all_vacated_wblocks(ssd_write_buf* swb)
473{
474 drv_ssds *ssds = (drv_ssds *)swb->ssd->ns->storage_private;
475
476 for (uint32_t i = 0; i < swb->n_vacated; i++) {
477 vacated_wblock *vw = &swb->vacated_wblocks[i];
478
479 drv_ssd *src_ssd = &ssds->ssds[vw->file_id];
480 ssd_wblock_state* wblock_state = &src_ssd->wblock_state[vw->wblock_id];
481
482 ssd_release_vacated_wblock(src_ssd, vw->wblock_id, wblock_state);
483 }
484
485 swb->n_vacated = 0;
486}
487
488//
489// END - ssd_write_buf "swb" methods.
490//------------------------------------------------
491
492
493// Reduce wblock's used size, if result is 0 put it in the "free" pool, if it's
494// below the defrag threshold put it in the defrag queue.
495void
496ssd_block_free(drv_ssd *ssd, uint64_t rblock_id, uint32_t n_rblocks, char *msg)
497{
498 // Determine which wblock we're reducing used size in.
499 uint64_t start_offset = RBLOCK_ID_TO_OFFSET(rblock_id);
500 uint32_t size = N_RBLOCKS_TO_SIZE(n_rblocks);
501 uint32_t wblock_id = OFFSET_TO_WBLOCK_ID(ssd, start_offset);
502 uint32_t end_wblock_id = OFFSET_TO_WBLOCK_ID(ssd, start_offset + size - 1);
503
504 cf_assert(size >= SSD_RECORD_MIN_SIZE, AS_DRV_SSD,
505 "%s: %s: freeing bad size %u rblock_id %lu", ssd->name, msg, size,
506 rblock_id);
507
508 cf_assert(start_offset >= SSD_HEADER_SIZE &&
509 wblock_id < ssd->n_wblocks && wblock_id == end_wblock_id,
510 AS_DRV_SSD, "%s: %s: freeing bad range rblock_id %lu n_rblocks %u",
511 ssd->name, msg, rblock_id, n_rblocks);
512
513 cf_atomic64_sub(&ssd->inuse_size, (int64_t)size);
514
515 ssd_wblock_state *p_wblock_state = &ssd->wblock_state[wblock_id];
516
517 cf_mutex_lock(&p_wblock_state->LOCK);
518
519 int64_t resulting_inuse_sz = cf_atomic32_sub(&p_wblock_state->inuse_sz,
520 (int32_t)size);
521
522 cf_assert(resulting_inuse_sz >= 0 &&
523 resulting_inuse_sz < (int64_t)ssd->write_block_size, AS_DRV_SSD,
524 "%s: %s: wblock %d %s, subtracted %d now %ld", ssd->name, msg,
525 wblock_id, resulting_inuse_sz < 0 ? "over-freed" : "bad inuse_sz",
526 (int32_t)size, resulting_inuse_sz);
527
528 if (p_wblock_state->swb == NULL &&
529 p_wblock_state->state != WBLOCK_STATE_DEFRAG) {
530 // Free wblock if all three gating conditions hold.
531 if (resulting_inuse_sz == 0) {
532 cf_atomic64_incr(&ssd->n_wblock_direct_frees);
533 push_wblock_to_free_q(ssd, wblock_id);
534 }
535 // Queue wblock for defrag if appropriate.
536 else if (resulting_inuse_sz < ssd->ns->defrag_lwm_size) {
537 push_wblock_to_defrag_q(ssd, wblock_id);
538 }
539 }
540
541 cf_mutex_unlock(&p_wblock_state->LOCK);
542}
543
544
545// FIXME - what really to do if n_rblocks on drive doesn't match index?
546void
547defrag_move_record(drv_ssd *src_ssd, uint32_t src_wblock_id,
548 as_flat_record *flat, as_index *r)
549{
550 uint64_t old_rblock_id = r->rblock_id;
551 uint32_t old_n_rblocks = r->n_rblocks;
552
553 drv_ssds *ssds = (drv_ssds*)src_ssd->ns->storage_private;
554
555 // Figure out which device to write to. When replacing an old record, it's
556 // possible this is different from the old device (e.g. if we've added a
557 // fresh device), so derive it from the digest each time.
558 drv_ssd *ssd = &ssds->ssds[ssd_get_file_id(ssds, &flat->keyd)];
559
560 cf_assert(ssd, AS_DRV_SSD, "{%s} null ssd", ssds->ns->name);
561
562 uint32_t ssd_n_rblocks = flat->n_rblocks;
563 uint32_t write_size = N_RBLOCKS_TO_SIZE(ssd_n_rblocks);
564
565 cf_mutex_lock(&ssd->defrag_lock);
566
567 ssd_write_buf *swb = ssd->defrag_swb;
568
569 if (! swb) {
570 swb = swb_get(ssd);
571 ssd->defrag_swb = swb;
572
573 if (! swb) {
574 cf_warning(AS_DRV_SSD, "defrag_move_record: couldn't get swb");
575 cf_mutex_unlock(&ssd->defrag_lock);
576 return;
577 }
578 }
579
580 // Check if there's enough space in defrag buffer - if not, free and zero
581 // any remaining unused space, enqueue it to be flushed to device, and grab
582 // a new buffer.
583 if (write_size > ssd->write_block_size - swb->pos) {
584 if (ssd->write_block_size != swb->pos) {
585 // Clean the end of the buffer before pushing to write queue.
586 memset(swb->buf + swb->pos, 0, ssd->write_block_size - swb->pos);
587 }
588
589 // Enqueue the buffer, to be flushed to device.
590 swb->skip_post_write_q = true;
591 cf_queue_push(ssd->swb_write_q, &swb);
592 cf_atomic64_incr(&ssd->n_defrag_wblock_writes);
593
594 // Get the new buffer.
595 swb = swb_get(ssd);
596 ssd->defrag_swb = swb;
597
598 if (! swb) {
599 cf_warning(AS_DRV_SSD, "defrag_move_record: couldn't get swb");
600 cf_mutex_unlock(&ssd->defrag_lock);
601 return;
602 }
603 }
604
605 memcpy(swb->buf + swb->pos, (const uint8_t*)flat, write_size);
606
607 uint64_t write_offset = WBLOCK_ID_TO_OFFSET(ssd, swb->wblock_id) + swb->pos;
608
609 ssd_encrypt(ssd, write_offset, (as_flat_record *)(swb->buf + swb->pos));
610
611 r->file_id = ssd->file_id;
612 r->rblock_id = OFFSET_TO_RBLOCK_ID(write_offset);
613 r->n_rblocks = ssd_n_rblocks;
614
615 swb->pos += write_size;
616
617 cf_atomic64_add(&ssd->inuse_size, (int64_t)write_size);
618 cf_atomic32_add(&ssd->wblock_state[swb->wblock_id].inuse_sz,
619 (int32_t)write_size);
620
621 // If we just defragged into a new destination swb, count it.
622 if (swb_add_unique_vacated_wblock(swb, src_ssd->file_id, src_wblock_id)) {
623 ssd_wblock_state* p_wblock_state =
624 &src_ssd->wblock_state[src_wblock_id];
625
626 cf_atomic32_incr(&p_wblock_state->n_vac_dests);
627 }
628
629 cf_mutex_unlock(&ssd->defrag_lock);
630
631 ssd_block_free(src_ssd, old_rblock_id, old_n_rblocks, "defrag-write");
632}
633
634
635int
636ssd_record_defrag(drv_ssd *ssd, uint32_t wblock_id, as_flat_record *flat,
637 uint64_t rblock_id)
638{
639 as_namespace *ns = ssd->ns;
640 as_partition_reservation rsv;
641 uint32_t pid = as_partition_getid(&flat->keyd);
642
643 as_partition_reserve(ns, pid, &rsv);
644
645 int rv;
646 as_index_ref r_ref;
647 bool found = 0 == as_record_get(rsv.tree, &flat->keyd, &r_ref);
648
649 if (found) {
650 as_index *r = r_ref.r;
651
652 if (r->file_id == ssd->file_id && r->rblock_id == rblock_id) {
653 if (r->generation != flat->generation) {
654 cf_warning_digest(AS_DRV_SSD, &r->keyd, "device %s defrag: rblock_id %lu generation mismatch (%u:%u) ",
655 ssd->name, rblock_id, r->generation, flat->generation);
656 }
657
658 if (r->n_rblocks != flat->n_rblocks) {
659 cf_warning_digest(AS_DRV_SSD, &r->keyd, "device %s defrag: rblock_id %lu n_blocks mismatch (%u:%u) ",
660 ssd->name, rblock_id, r->n_rblocks, flat->n_rblocks);
661 }
662
663 defrag_move_record(ssd, wblock_id, flat, r);
664
665 rv = 0; // record was in index tree and current - moved it
666 }
667 else {
668 rv = -1; // record was in index tree - presumably was overwritten
669 }
670
671 as_record_done(&r_ref, ns);
672 }
673 else {
674 rv = -2; // record was not in index tree - presumably was deleted
675 }
676
677 as_partition_release(&rsv);
678
679 return rv;
680}
681
682
683bool
684ssd_is_full(drv_ssd *ssd, uint32_t wblock_id)
685{
686 if (num_free_wblocks(ssd) > DEFRAG_STARTUP_RESERVE) {
687 return false;
688 }
689
690 ssd_wblock_state* p_wblock_state = &ssd->wblock_state[wblock_id];
691
692 cf_mutex_lock(&p_wblock_state->LOCK);
693
694 if (cf_atomic32_get(p_wblock_state->inuse_sz) == 0) {
695 // Lucky - wblock is empty, let ssd_defrag_wblock() free it.
696 cf_mutex_unlock(&p_wblock_state->LOCK);
697
698 return false;
699 }
700
701 cf_warning(AS_DRV_SSD, "{%s}: defrag: drive %s totally full, re-queuing wblock %u",
702 ssd->ns->name, ssd->name, wblock_id);
703
704 // Not using push_wblock_to_defrag_q() - state is already DEFRAG, we
705 // definitely have a queue, and it's better to push back to head.
706 cf_queue_push_head(ssd->defrag_wblock_q, &wblock_id);
707
708 cf_mutex_unlock(&p_wblock_state->LOCK);
709
710 // If we got here, we used all our runtime reserve wblocks, but the wblocks
711 // we defragged must still have non-zero inuse_sz. Must wait for those to
712 // become free. Sleep prevents retries from overwhelming the log.
713 sleep(1);
714
715 return true;
716}
717
718
719int
720ssd_defrag_wblock(drv_ssd *ssd, uint32_t wblock_id, uint8_t *read_buf)
721{
722 if (ssd_is_full(ssd, wblock_id)) {
723 return 0;
724 }
725
726 int record_count = 0;
727
728 ssd_wblock_state* p_wblock_state = &ssd->wblock_state[wblock_id];
729
730 cf_assert(p_wblock_state->n_vac_dests == 0, AS_DRV_SSD,
731 "n-vacations not 0 beginning defrag wblock");
732
733 // Make sure this can't decrement to 0 while defragging this wblock.
734 cf_atomic32_set(&p_wblock_state->n_vac_dests, 1);
735
736 if (cf_atomic32_get(p_wblock_state->inuse_sz) == 0) {
737 cf_atomic64_incr(&ssd->n_wblock_defrag_io_skips);
738 goto Finished;
739 }
740
741 int fd = ssd_fd_get(ssd);
742 uint64_t file_offset = WBLOCK_ID_TO_OFFSET(ssd, wblock_id);
743
744 uint64_t start_ns = ssd->ns->storage_benchmarks_enabled ? cf_getns() : 0;
745
746 if (! pread_all(fd, read_buf, ssd->write_block_size, (off_t)file_offset)) {
747 cf_warning(AS_DRV_SSD, "%s: read failed: errno %d (%s)", ssd->name,
748 errno, cf_strerror(errno));
749 close(fd);
750 fd = -1;
751 goto Finished;
752 }
753
754 if (start_ns != 0) {
755 histogram_insert_data_point(ssd->hist_large_block_read, start_ns);
756 }
757
758 ssd_fd_put(ssd, fd);
759
760 bool prefetch = cf_arenax_want_prefetch(ssd->ns->arena);
761
762 if (prefetch) {
763 ssd_prefetch_wblock(ssd, file_offset, read_buf);
764 }
765
766 size_t indent = 0; // current offset within the wblock, in bytes
767
768 while (indent < ssd->write_block_size &&
769 cf_atomic32_get(p_wblock_state->inuse_sz) != 0) {
770 as_flat_record *flat = (as_flat_record*)&read_buf[indent];
771
772 if (! prefetch) {
773 ssd_decrypt(ssd, file_offset + indent, flat);
774 }
775
776 if (flat->magic != AS_FLAT_MAGIC) {
777 // First block must have magic.
778 if (indent == 0) {
779 cf_warning(AS_DRV_SSD, "%s: no magic at beginning of used wblock %d",
780 ssd->name, wblock_id);
781 break;
782 }
783
784 // Later blocks may have no magic, just skip to next block.
785 indent += RBLOCK_SIZE;
786 continue;
787 }
788
789 uint32_t record_size = N_RBLOCKS_TO_SIZE(flat->n_rblocks);
790
791 if (record_size < SSD_RECORD_MIN_SIZE) {
792 cf_warning(AS_DRV_SSD, "%s: record too small: size %u", ssd->name,
793 record_size);
794 indent += RBLOCK_SIZE;
795 continue; // try next rblock
796 }
797
798 size_t next_indent = indent + record_size;
799
800 if (next_indent > ssd->write_block_size) {
801 cf_warning(AS_DRV_SSD, "%s: record crosses wblock boundary: n-rblocks %u",
802 ssd->name, flat->n_rblocks);
803 break;
804 }
805
806 // Found a good record, move it if it's current.
807 int rv = ssd_record_defrag(ssd, wblock_id, flat,
808 OFFSET_TO_RBLOCK_ID(file_offset + indent));
809
810 if (rv == 0) {
811 record_count++;
812 }
813
814 indent = next_indent;
815 }
816
817Finished:
818
819 // Note - usually wblock's inuse_sz is 0 here, but may legitimately be non-0
820 // e.g. if a dropped partition's tree is not done purging. In this case, we
821 // may have found deleted records in the wblock whose used-size contribution
822 // has not yet been subtracted.
823
824 ssd_release_vacated_wblock(ssd, wblock_id, p_wblock_state);
825
826 return record_count;
827}
828
829
830// Thread "run" function to service a device's defrag queue.
831void*
832run_defrag(void *pv_data)
833{
834 drv_ssd *ssd = (drv_ssd*)pv_data;
835 uint32_t wblock_id;
836 uint8_t *read_buf = cf_valloc(ssd->write_block_size);
837
838 while (true) {
839 uint32_t q_min = ssd->ns->storage_defrag_queue_min;
840
841 if (q_min != 0) {
842 if (cf_queue_sz(ssd->defrag_wblock_q) > q_min) {
843 if (CF_QUEUE_OK !=
844 cf_queue_pop(ssd->defrag_wblock_q, &wblock_id,
845 CF_QUEUE_NOWAIT)) {
846 // Should never get here!
847 break;
848 }
849 }
850 else {
851 usleep(1000 * 50);
852 continue;
853 }
854 }
855 else {
856 if (CF_QUEUE_OK !=
857 cf_queue_pop(ssd->defrag_wblock_q, &wblock_id,
858 CF_QUEUE_FOREVER)) {
859 // Should never get here!
860 break;
861 }
862 }
863
864 ssd_defrag_wblock(ssd, wblock_id, read_buf);
865
866 uint32_t sleep_us = ssd->ns->storage_defrag_sleep;
867
868 if (sleep_us != 0) {
869 usleep(sleep_us);
870 }
871 }
872
873 // Although we ever expect to get here...
874 cf_free(read_buf);
875 cf_warning(AS_DRV_SSD, "device %s: quit defrag - queue error", ssd->name);
876
877 return NULL;
878}
879
880
881void
882ssd_start_defrag_threads(drv_ssds *ssds)
883{
884 cf_info(AS_DRV_SSD, "{%s} starting defrag threads", ssds->ns->name);
885
886 for (int i = 0; i < ssds->n_ssds; i++) {
887 drv_ssd *ssd = &ssds->ssds[i];
888
889 cf_thread_create_detached(run_defrag, (void*)ssd);
890 }
891}
892
893
894//------------------------------------------------
895// defrag_pen class.
896//
897
898#define DEFRAG_PEN_INIT_CAPACITY (8 * 1024)
899
900typedef struct defrag_pen_s {
901 uint32_t n_ids;
902 uint32_t capacity;
903 uint32_t *ids;
904 uint32_t stack_ids[DEFRAG_PEN_INIT_CAPACITY];
905} defrag_pen;
906
907static void
908defrag_pen_init(defrag_pen *pen)
909{
910 pen->n_ids = 0;
911 pen->capacity = DEFRAG_PEN_INIT_CAPACITY;
912 pen->ids = pen->stack_ids;
913}
914
915static void
916defrag_pen_destroy(defrag_pen *pen)
917{
918 if (pen->ids != pen->stack_ids) {
919 cf_free(pen->ids);
920 }
921}
922
923static void
924defrag_pen_add(defrag_pen *pen, uint32_t wblock_id)
925{
926 if (pen->n_ids == pen->capacity) {
927 if (pen->capacity == DEFRAG_PEN_INIT_CAPACITY) {
928 pen->capacity <<= 2;
929 pen->ids = cf_malloc(pen->capacity * sizeof(uint32_t));
930 memcpy(pen->ids, pen->stack_ids, sizeof(pen->stack_ids));
931 }
932 else {
933 pen->capacity <<= 1;
934 pen->ids = cf_realloc(pen->ids, pen->capacity * sizeof(uint32_t));
935 }
936 }
937
938 pen->ids[pen->n_ids++] = wblock_id;
939}
940
941static void
942defrag_pen_transfer(defrag_pen *pen, drv_ssd *ssd)
943{
944 // For speed, "customize" instead of using push_wblock_to_defrag_q()...
945 for (uint32_t i = 0; i < pen->n_ids; i++) {
946 uint32_t wblock_id = pen->ids[i];
947
948 ssd->wblock_state[wblock_id].state = WBLOCK_STATE_DEFRAG;
949 cf_queue_push(ssd->defrag_wblock_q, &wblock_id);
950 }
951}
952
953static void
954defrag_pens_dump(defrag_pen pens[], uint32_t n_pens, const char* ssd_name)
955{
956 char buf[2048];
957 uint32_t n = 0;
958 int pos = sprintf(buf, "%u", pens[n++].n_ids);
959
960 while (n < n_pens) {
961 pos += sprintf(buf + pos, ",%u", pens[n++].n_ids);
962 }
963
964 cf_info(AS_DRV_SSD, "%s init defrag profile: %s", ssd_name, buf);
965}
966
967//
968// END - defrag_pen class.
969//------------------------------------------------
970
971
972// Thread "run" function to create and load a device's (wblock) free & defrag
973// queues at startup. Sorts defrag-eligible wblocks so the most depleted ones
974// are at the head of the defrag queue.
975void*
976run_load_queues(void *pv_data)
977{
978 drv_ssd *ssd = (drv_ssd*)pv_data;
979
980 ssd->free_wblock_q = cf_queue_create(sizeof(uint32_t), true);
981 ssd->defrag_wblock_q = cf_queue_create(sizeof(uint32_t), true);
982
983 as_namespace *ns = ssd->ns;
984 uint32_t lwm_pct = ns->storage_defrag_lwm_pct;
985 uint32_t lwm_size = ns->defrag_lwm_size;
986 defrag_pen pens[lwm_pct];
987
988 for (uint32_t n = 0; n < lwm_pct; n++) {
989 defrag_pen_init(&pens[n]);
990 }
991
992 uint32_t first_id = ssd->first_wblock_id;
993 uint32_t end_id = ssd->pristine_wblock_id;
994
995 // TODO - paranoia - remove eventually.
996 cf_assert(end_id >= first_id && end_id <= ssd->n_wblocks, AS_DRV_SSD,
997 "%s bad pristine-wblock-id %u", ssd->name, end_id);
998
999 for (uint32_t wblock_id = first_id; wblock_id < end_id; wblock_id++) {
1000 uint32_t inuse_sz = ssd->wblock_state[wblock_id].inuse_sz;
1001
1002 if (inuse_sz == 0) {
1003 // Faster than using push_wblock_to_free_q() here...
1004 cf_queue_push(ssd->free_wblock_q, &wblock_id);
1005 }
1006 else if (inuse_sz < lwm_size) {
1007 defrag_pen_add(&pens[(inuse_sz * lwm_pct) / lwm_size], wblock_id);
1008 }
1009 }
1010
1011 defrag_pens_dump(pens, lwm_pct, ssd->name);
1012
1013 for (uint32_t n = 0; n < lwm_pct; n++) {
1014 defrag_pen_transfer(&pens[n], ssd);
1015 defrag_pen_destroy(&pens[n]);
1016 }
1017
1018 ssd->n_defrag_wblock_reads = (uint64_t)cf_queue_sz(ssd->defrag_wblock_q);
1019
1020 return NULL;
1021}
1022
1023
1024void
1025ssd_load_wblock_queues(drv_ssds *ssds)
1026{
1027 cf_info(AS_DRV_SSD, "{%s} loading free & defrag queues", ssds->ns->name);
1028
1029 // Split this task across multiple threads.
1030 cf_tid tids[ssds->n_ssds];
1031
1032 for (int i = 0; i < ssds->n_ssds; i++) {
1033 drv_ssd *ssd = &ssds->ssds[i];
1034
1035 tids[i] = cf_thread_create_joinable(run_load_queues, (void*)ssd);
1036 }
1037
1038 for (int i = 0; i < ssds->n_ssds; i++) {
1039 cf_thread_join(tids[i]);
1040 }
1041 // Now we're single-threaded again.
1042
1043 for (int i = 0; i < ssds->n_ssds; i++) {
1044 drv_ssd *ssd = &ssds->ssds[i];
1045
1046 cf_info(AS_DRV_SSD, "%s init wblocks: pristine-id %u pristine %u free-q %d, defrag-q %d",
1047 ssd->name, ssd->pristine_wblock_id, num_pristine_wblocks(ssd),
1048 cf_queue_sz(ssd->free_wblock_q),
1049 cf_queue_sz(ssd->defrag_wblock_q));
1050 }
1051}
1052
1053
1054void
1055ssd_wblock_init(drv_ssd *ssd)
1056{
1057 uint32_t n_wblocks = (uint32_t)(ssd->file_size / ssd->write_block_size);
1058
1059 cf_info(AS_DRV_SSD, "%s has %u wblocks of size %u", ssd->name, n_wblocks,
1060 ssd->write_block_size);
1061
1062 ssd->n_wblocks = n_wblocks;
1063 ssd->wblock_state = cf_malloc(n_wblocks * sizeof(ssd_wblock_state));
1064
1065 // Device header wblocks' inuse_sz will (also) be 0 but that doesn't matter.
1066 for (uint32_t i = 0; i < n_wblocks; i++) {
1067 ssd_wblock_state * p_wblock_state = &ssd->wblock_state[i];
1068
1069 cf_atomic32_set(&p_wblock_state->inuse_sz, 0);
1070 cf_mutex_init(&p_wblock_state->LOCK);
1071 p_wblock_state->swb = NULL;
1072 p_wblock_state->state = WBLOCK_STATE_NONE;
1073 p_wblock_state->n_vac_dests = 0;
1074 }
1075}
1076
1077
1078//==========================================================
1079// Record reading utilities.
1080//
1081
1082int
1083ssd_read_record(as_storage_rd *rd, bool pickle_only)
1084{
1085 as_namespace *ns = rd->ns;
1086 as_record *r = rd->r;
1087 drv_ssd *ssd = rd->ssd;
1088
1089 if (STORAGE_RBLOCK_IS_INVALID(r->rblock_id)) {
1090 cf_warning_digest(AS_DRV_SSD, &r->keyd, "{%s} read_ssd: invalid rblock_id ",
1091 ns->name);
1092 return -1;
1093 }
1094
1095 uint64_t record_offset = RBLOCK_ID_TO_OFFSET(r->rblock_id);
1096 uint32_t record_size = N_RBLOCKS_TO_SIZE(r->n_rblocks);
1097 uint64_t record_end_offset = record_offset + record_size;
1098
1099 uint32_t wblock_id = OFFSET_TO_WBLOCK_ID(ssd, record_offset);
1100
1101 if (wblock_id >= ssd->n_wblocks) {
1102 cf_warning_digest(AS_DRV_SSD, &r->keyd, "{%s} read_ssd: bad offset %lu ",
1103 ns->name, record_offset);
1104 return -1;
1105 }
1106
1107 if (record_size < SSD_RECORD_MIN_SIZE) {
1108 cf_warning_digest(AS_DRV_SSD, &r->keyd, "{%s} read_ssd: bad record size %u ",
1109 ns->name, record_size);
1110 return -1;
1111 }
1112
1113 if (record_end_offset > WBLOCK_ID_TO_OFFSET(ssd, wblock_id + 1)) {
1114 cf_warning_digest(AS_DRV_SSD, &r->keyd, "{%s} read_ssd: record size %u crosses wblock boundary ",
1115 ns->name, record_size);
1116 return -1;
1117 }
1118
1119 uint8_t *read_buf = NULL;
1120 as_flat_record *flat = NULL;
1121
1122 ssd_write_buf *swb = NULL;
1123
1124 swb_check_and_reserve(&ssd->wblock_state[wblock_id], &swb);
1125
1126 if (swb) {
1127 // Data is in write buffer, so read it from there.
1128 cf_atomic32_incr(&ns->n_reads_from_cache);
1129
1130 read_buf = cf_malloc(record_size);
1131 flat = (as_flat_record*)read_buf;
1132
1133 int swb_offset = record_offset - WBLOCK_ID_TO_OFFSET(ssd, wblock_id);
1134 memcpy(read_buf, swb->buf + swb_offset, record_size);
1135 swb_release(swb);
1136
1137 ssd_decrypt_whole(ssd, record_offset, r->n_rblocks, flat);
1138 }
1139 else {
1140 // Normal case - data is read from device.
1141 cf_atomic32_incr(&ns->n_reads_from_device);
1142
1143 uint64_t read_offset = BYTES_DOWN_TO_IO_MIN(ssd, record_offset);
1144 uint64_t read_end_offset = BYTES_UP_TO_IO_MIN(ssd, record_end_offset);
1145 size_t read_size = read_end_offset - read_offset;
1146 uint64_t record_buf_indent = record_offset - read_offset;
1147
1148 read_buf = cf_valloc(read_size);
1149
1150 int fd = rd->read_page_cache ? ssd_fd_cache_get(ssd) : ssd_fd_get(ssd);
1151
1152 uint64_t start_ns = ns->storage_benchmarks_enabled ? cf_getns() : 0;
1153 uint64_t start_us = as_health_sample_device_read() ? cf_getus() : 0;
1154
1155 if (! pread_all(fd, read_buf, read_size, (off_t)read_offset)) {
1156 cf_warning(AS_DRV_SSD, "%s: read failed: size %lu: errno %d (%s)",
1157 ssd->name, read_size, errno, cf_strerror(errno));
1158 cf_free(read_buf);
1159 close(fd);
1160 return -1;
1161 }
1162
1163 if (start_ns != 0) {
1164 histogram_insert_data_point(ssd->hist_read, start_ns);
1165 }
1166
1167 as_health_add_device_latency(ns->id, r->file_id, start_us);
1168
1169 if (rd->read_page_cache) {
1170 ssd_fd_cache_put(ssd, fd);
1171 }
1172 else {
1173 ssd_fd_put(ssd, fd);
1174 }
1175
1176 flat = (as_flat_record*)(read_buf + record_buf_indent);
1177 ssd_decrypt_whole(ssd, record_offset, r->n_rblocks, flat);
1178
1179 // Sanity checks.
1180
1181 if (flat->magic != AS_FLAT_MAGIC) {
1182 cf_warning(AS_DRV_SSD, "read: bad block magic offset %lu",
1183 read_offset);
1184 cf_free(read_buf);
1185 return -1;
1186 }
1187
1188 if (flat->n_rblocks != r->n_rblocks) {
1189 cf_warning(AS_DRV_SSD, "read: bad n-rblocks %u %u",
1190 flat->n_rblocks, r->n_rblocks);
1191 cf_free(read_buf);
1192 return -1;
1193 }
1194
1195 if (0 != cf_digest_compare(&flat->keyd, &r->keyd)) {
1196 cf_warning(AS_DRV_SSD, "read: read wrong key: expecting %lx got %lx",
1197 *(uint64_t*)&r->keyd, *(uint64_t*)&flat->keyd);
1198 cf_free(read_buf);
1199 return -1;
1200 }
1201
1202 if (ns->storage_benchmarks_enabled) {
1203 histogram_insert_raw(ns->device_read_size_hist, read_size);
1204 }
1205 }
1206
1207 rd->flat = flat;
1208 rd->read_buf = read_buf; // no need to free read_buf on error now
1209
1210 as_flat_opt_meta opt_meta = { 0 };
1211
1212 rd->flat_end = (const uint8_t*)flat + record_size;
1213 rd->flat_bins = as_flat_unpack_record_meta(flat, rd->flat_end, &opt_meta,
1214 ns->single_bin);
1215
1216 if (! rd->flat_bins) {
1217 cf_warning(AS_DRV_SSD, "read: bad record metadata");
1218 return -1;
1219 }
1220
1221 // After unpacking meta so there's a bit of sanity checking.
1222 if (pickle_only) {
1223 return 0;
1224 }
1225
1226 if (! as_flat_decompress_bins(&opt_meta.cm, rd)) {
1227 cf_warning_digest(AS_DRV_SSD, &r->keyd,
1228 "{%s} read: bad compressed data (%s:%lu) ",
1229 ns->name, ssd->name, record_offset);
1230 return -1;
1231 }
1232
1233 if (opt_meta.key) {
1234 rd->key_size = opt_meta.key_size;
1235 rd->key = opt_meta.key;
1236 }
1237 // else - if updating record without key, leave rd (msg) key to be stored.
1238
1239 rd->flat_n_bins = (uint16_t)opt_meta.n_bins;
1240
1241 return 0;
1242}
1243
1244
1245//==========================================================
1246// Storage API implementation: reading records.
1247//
1248
1249int
1250as_storage_record_load_n_bins_ssd(as_storage_rd *rd)
1251{
1252 if (! as_record_is_live(rd->r)) {
1253 rd->n_bins = 0;
1254 return 0; // no need to read device
1255 }
1256
1257 // If record hasn't been read, read it - sets rd->block_n_bins.
1258 if (! rd->flat && ssd_read_record(rd, false) != 0) {
1259 cf_warning(AS_DRV_SSD, "load_n_bins: failed ssd_read_record()");
1260 return -AS_ERR_UNKNOWN;
1261 }
1262
1263 rd->n_bins = rd->flat_n_bins;
1264
1265 return 0;
1266}
1267
1268
1269int
1270as_storage_record_load_bins_ssd(as_storage_rd *rd)
1271{
1272 if (! as_record_is_live(rd->r)) {
1273 return 0; // no need to read device
1274 }
1275
1276 // If record hasn't been read, read it - sets rd->block_bins and
1277 // rd->block_n_bins.
1278 if (! rd->flat && ssd_read_record(rd, false) != 0) {
1279 cf_warning(AS_DRV_SSD, "load_bins: failed ssd_read_record()");
1280 return -AS_ERR_UNKNOWN;
1281 }
1282
1283 return as_flat_unpack_bins(rd->ns, rd->flat_bins, rd->flat_end,
1284 rd->flat_n_bins, rd->bins);
1285}
1286
1287
1288bool
1289as_storage_record_get_key_ssd(as_storage_rd *rd)
1290{
1291 // If record hasn't been read, read it - sets rd->key_size and rd->key.
1292 if (! rd->flat && ssd_read_record(rd, false) != 0) {
1293 cf_warning(AS_DRV_SSD, "get_key: failed ssd_read_record()");
1294 return false;
1295 }
1296
1297 return true;
1298}
1299
1300
1301bool
1302as_storage_record_get_pickle_ssd(as_storage_rd *rd)
1303{
1304 if (ssd_read_record(rd, true) != 0) {
1305 return false;
1306 }
1307
1308 size_t sz = rd->flat_end - (const uint8_t*)rd->flat;
1309
1310 rd->pickle = cf_malloc(sz);
1311 rd->pickle_sz = (uint32_t)sz;
1312
1313 memcpy(rd->pickle, rd->flat, sz);
1314
1315 return true;
1316}
1317
1318
1319//==========================================================
1320// Record writing utilities.
1321//
1322
1323void
1324ssd_flush_swb(drv_ssd *ssd, ssd_write_buf *swb)
1325{
1326 // Wait for all writers to finish.
1327 while (cf_atomic32_get(swb->n_writers) != 0) {
1328 ;
1329 }
1330
1331 int fd = ssd_fd_get(ssd);
1332 off_t write_offset = (off_t)WBLOCK_ID_TO_OFFSET(ssd, swb->wblock_id);
1333
1334 uint64_t start_ns = ssd->ns->storage_benchmarks_enabled ? cf_getns() : 0;
1335
1336 if (! pwrite_all(fd, swb->buf, ssd->write_block_size, write_offset)) {
1337 cf_crash(AS_DRV_SSD, "%s: DEVICE FAILED write: errno %d (%s)",
1338 ssd->name, errno, cf_strerror(errno));
1339 }
1340
1341 if (start_ns != 0) {
1342 histogram_insert_data_point(ssd->hist_write, start_ns);
1343 }
1344
1345 ssd_fd_put(ssd, fd);
1346}
1347
1348
1349void
1350ssd_shadow_flush_swb(drv_ssd *ssd, ssd_write_buf *swb)
1351{
1352 int fd = ssd_shadow_fd_get(ssd);
1353 off_t write_offset = (off_t)WBLOCK_ID_TO_OFFSET(ssd, swb->wblock_id);
1354
1355 uint64_t start_ns = ssd->ns->storage_benchmarks_enabled ? cf_getns() : 0;
1356
1357 if (! pwrite_all(fd, swb->buf, ssd->write_block_size, write_offset)) {
1358 cf_crash(AS_DRV_SSD, "%s: DEVICE FAILED write: errno %d (%s)",
1359 ssd->shadow_name, errno, cf_strerror(errno));
1360 }
1361
1362 if (start_ns != 0) {
1363 histogram_insert_data_point(ssd->hist_shadow_write, start_ns);
1364 }
1365
1366 ssd_shadow_fd_put(ssd, fd);
1367}
1368
1369
1370void
1371ssd_write_sanity_checks(drv_ssd *ssd, ssd_write_buf *swb)
1372{
1373 ssd_wblock_state* p_wblock_state = &ssd->wblock_state[swb->wblock_id];
1374
1375 cf_assert(p_wblock_state->swb == swb, AS_DRV_SSD,
1376 "device %s: wblock-id %u swb not consistent while writing",
1377 ssd->name, swb->wblock_id);
1378
1379 cf_assert(p_wblock_state->state != WBLOCK_STATE_DEFRAG, AS_DRV_SSD,
1380 "device %s: wblock-id %u state DEFRAG while writing", ssd->name,
1381 swb->wblock_id);
1382}
1383
1384
1385void
1386ssd_post_write(drv_ssd *ssd, ssd_write_buf *swb)
1387{
1388 if (cf_atomic32_get(ssd->ns->storage_post_write_queue) == 0 ||
1389 swb->skip_post_write_q) {
1390 swb_dereference_and_release(ssd, swb->wblock_id, swb);
1391 }
1392 else {
1393 // Transfer swb to post-write queue.
1394 cf_queue_push(ssd->post_write_q, &swb);
1395 }
1396
1397 if (ssd->post_write_q) {
1398 // Release post-write queue swbs if we're over the limit.
1399 while ((uint32_t)cf_queue_sz(ssd->post_write_q) >
1400 cf_atomic32_get(ssd->ns->storage_post_write_queue)) {
1401 ssd_write_buf* cached_swb;
1402
1403 if (CF_QUEUE_OK != cf_queue_pop(ssd->post_write_q, &cached_swb,
1404 CF_QUEUE_NOWAIT)) {
1405 // Should never happen.
1406 cf_warning(AS_DRV_SSD, "device %s: post-write queue pop failed",
1407 ssd->name);
1408 break;
1409 }
1410
1411 swb_dereference_and_release(ssd, cached_swb->wblock_id,
1412 cached_swb);
1413 }
1414 }
1415}
1416
1417
1418// Thread "run" function that flushes write buffers to device.
1419void *
1420run_write(void *arg)
1421{
1422 drv_ssd *ssd = (drv_ssd*)arg;
1423
1424 while (ssd->running) {
1425 ssd_write_buf *swb;
1426
1427 if (CF_QUEUE_OK != cf_queue_pop(ssd->swb_write_q, &swb, 100)) {
1428 continue;
1429 }
1430
1431 // Sanity checks (optional).
1432 ssd_write_sanity_checks(ssd, swb);
1433
1434 // Flush to the device.
1435 ssd_flush_swb(ssd, swb);
1436
1437 if (ssd->shadow_name) {
1438 // Queue for shadow device write.
1439 cf_queue_push(ssd->swb_shadow_q, &swb);
1440 }
1441 else {
1442 // If this swb was a defrag destination, release the sources.
1443 swb_release_all_vacated_wblocks(swb);
1444
1445 // Transfer to post-write queue, or release swb, as appropriate.
1446 ssd_post_write(ssd, swb);
1447 }
1448 } // infinite event loop waiting for block to write
1449
1450 return NULL;
1451}
1452
1453
1454// Thread "run" function that flushes write buffers to shadow device.
1455void *
1456run_shadow(void *arg)
1457{
1458 drv_ssd *ssd = (drv_ssd*)arg;
1459
1460 while (ssd->running) {
1461 ssd_write_buf *swb;
1462
1463 if (CF_QUEUE_OK != cf_queue_pop(ssd->swb_shadow_q, &swb, 100)) {
1464 continue;
1465 }
1466
1467 // Sanity checks (optional).
1468 ssd_write_sanity_checks(ssd, swb);
1469
1470 // Flush to the shadow device.
1471 ssd_shadow_flush_swb(ssd, swb);
1472
1473 // If this swb was a defrag destination, release the sources.
1474 swb_release_all_vacated_wblocks(swb);
1475
1476 // Transfer to post-write queue, or release swb, as appropriate.
1477 ssd_post_write(ssd, swb);
1478 }
1479
1480 return NULL;
1481}
1482
1483
1484void
1485ssd_start_write_threads(drv_ssds *ssds)
1486{
1487 cf_info(AS_DRV_SSD, "{%s} starting write threads", ssds->ns->name);
1488
1489 for (int i = 0; i < ssds->n_ssds; i++) {
1490 drv_ssd *ssd = &ssds->ssds[i];
1491
1492 ssd->write_tid = cf_thread_create_joinable(run_write, (void*)ssd);
1493
1494 if (ssd->shadow_name) {
1495 ssd->shadow_tid = cf_thread_create_joinable(run_shadow, (void*)ssd);
1496 }
1497 }
1498}
1499
1500
1501int
1502ssd_buffer_bins(as_storage_rd *rd)
1503{
1504 as_namespace *ns = rd->ns;
1505 as_record *r = rd->r;
1506 drv_ssd *ssd = rd->ssd;
1507
1508 uint32_t flat_sz = rd->pickle == NULL ?
1509 as_flat_record_size(rd) : rd->orig_pickle_sz;
1510
1511 if (flat_sz > ssd->write_block_size) {
1512 cf_detail_digest(AS_DRV_SSD, &r->keyd, "write: size %u - rejecting ",
1513 flat_sz);
1514 return -AS_ERR_RECORD_TOO_BIG;
1515 }
1516
1517 as_flat_record *flat;
1518
1519 if (rd->pickle == NULL) {
1520 flat = as_flat_compress_bins_and_pack_record(rd, ssd->write_block_size,
1521 &flat_sz);
1522 }
1523 else {
1524 flat = (as_flat_record *)rd->pickle;
1525 flat_sz = rd->pickle_sz;
1526
1527 // Tree IDs are node-local - can't use those sent from other nodes.
1528 flat->tree_id = r->tree_id;
1529 }
1530
1531 // Note - this is the only place where rounding size (up to a multiple of
1532 // RBLOCK_SIZE) is really necessary.
1533 uint32_t write_sz = SIZE_UP_TO_RBLOCK_SIZE(flat_sz);
1534
1535 // Reserve the portion of the current swb where this record will be written.
1536 cf_mutex_lock(&ssd->write_lock);
1537
1538 ssd_write_buf *swb = ssd->current_swb;
1539
1540 if (! swb) {
1541 swb = swb_get(ssd);
1542 ssd->current_swb = swb;
1543
1544 if (! swb) {
1545 cf_warning(AS_DRV_SSD, "write bins: couldn't get swb");
1546 cf_mutex_unlock(&ssd->write_lock);
1547 return -AS_ERR_OUT_OF_SPACE;
1548 }
1549 }
1550
1551 // Check if there's enough space in current buffer - if not, free and zero
1552 // any remaining unused space, enqueue it to be flushed to device, and grab
1553 // a new buffer.
1554 if (write_sz > ssd->write_block_size - swb->pos) {
1555 if (ssd->write_block_size != swb->pos) {
1556 // Clean the end of the buffer before pushing to write queue.
1557 memset(&swb->buf[swb->pos], 0, ssd->write_block_size - swb->pos);
1558 }
1559
1560 // Enqueue the buffer, to be flushed to device.
1561 cf_queue_push(ssd->swb_write_q, &swb);
1562 cf_atomic64_incr(&ssd->n_wblock_writes);
1563
1564 // Get the new buffer.
1565 swb = swb_get(ssd);
1566 ssd->current_swb = swb;
1567
1568 if (! swb) {
1569 cf_warning(AS_DRV_SSD, "write bins: couldn't get swb");
1570 cf_mutex_unlock(&ssd->write_lock);
1571 return -AS_ERR_OUT_OF_SPACE;
1572 }
1573 }
1574
1575 uint32_t n_rblocks = ROUNDED_SIZE_TO_N_RBLOCKS(write_sz);
1576 uint32_t swb_pos;
1577 int rv = 0;
1578
1579 if (n_rblocks == r->n_rblocks &&
1580 swb->wblock_id == RBLOCK_ID_TO_WBLOCK_ID(ssd, r->rblock_id) &&
1581 ssd->file_id == r->file_id) {
1582 // Stored size is unchanged, and previous version is in this buffer -
1583 // just overwrite at the previous position.
1584 swb_pos = RBLOCK_ID_TO_OFFSET(r->rblock_id) -
1585 WBLOCK_ID_TO_OFFSET(ssd, swb->wblock_id);
1586 rv = WRITE_IN_PLACE;
1587 }
1588 else {
1589 // There's enough space - save the position where this record will be
1590 // written, and advance swb->pos for the next writer.
1591 swb_pos = swb->pos;
1592 swb->pos += write_sz;
1593 }
1594
1595 cf_atomic32_incr(&swb->n_writers);
1596 swb->dirty = true;
1597
1598 cf_mutex_unlock(&ssd->write_lock);
1599 // May now write this record concurrently with others in this swb.
1600
1601 // Flatten data into the block.
1602
1603 as_flat_record *flat_in_swb = (as_flat_record*)&swb->buf[swb_pos];
1604
1605 if (flat == NULL) {
1606 as_flat_pack_record(rd, n_rblocks, flat_in_swb);
1607 }
1608 else {
1609 memcpy(flat_in_swb, flat, flat_sz);
1610 }
1611
1612 // Make a pickle if needed.
1613 if (rd->keep_pickle) {
1614 rd->pickle_sz = flat_sz;
1615 rd->pickle = cf_malloc(flat_sz);
1616 memcpy(rd->pickle, flat_in_swb, flat_sz);
1617 }
1618
1619 uint64_t write_offset = WBLOCK_ID_TO_OFFSET(ssd, swb->wblock_id) + swb_pos;
1620
1621 ssd_encrypt(ssd, write_offset, flat_in_swb);
1622
1623 if (rv != WRITE_IN_PLACE) {
1624 r->file_id = ssd->file_id;
1625 r->rblock_id = OFFSET_TO_RBLOCK_ID(write_offset);
1626 r->n_rblocks = n_rblocks;
1627
1628 cf_atomic64_add(&ssd->inuse_size, (int64_t)write_sz);
1629 cf_atomic32_add(&ssd->wblock_state[swb->wblock_id].inuse_sz,
1630 (int32_t)write_sz);
1631 }
1632
1633 // We are finished writing to the buffer.
1634 cf_atomic32_decr(&swb->n_writers);
1635
1636 if (ns->storage_benchmarks_enabled) {
1637 histogram_insert_raw(ns->device_write_size_hist, write_sz);
1638 }
1639
1640 return rv;
1641}
1642
1643
1644int
1645ssd_write(as_storage_rd *rd)
1646{
1647 as_record *r = rd->r;
1648
1649 drv_ssd *old_ssd = NULL;
1650 uint64_t old_rblock_id = 0;
1651 uint32_t old_n_rblocks = 0;
1652
1653 if (STORAGE_RBLOCK_IS_VALID(r->rblock_id)) {
1654 // Replacing an old record.
1655 old_ssd = rd->ssd;
1656 old_rblock_id = r->rblock_id;
1657 old_n_rblocks = r->n_rblocks;
1658 }
1659
1660 drv_ssds *ssds = (drv_ssds*)rd->ns->storage_private;
1661
1662 // Figure out which device to write to. When replacing an old record, it's
1663 // possible this is different from the old device (e.g. if we've added a
1664 // fresh device), so derive it from the digest each time.
1665 rd->ssd = &ssds->ssds[ssd_get_file_id(ssds, &r->keyd)];
1666
1667 cf_assert(rd->ssd, AS_DRV_SSD, "{%s} null ssd", rd->ns->name);
1668
1669 int rv = ssd_write_bins(rd);
1670
1671 if (rv == 0 && old_ssd) {
1672 ssd_block_free(old_ssd, old_rblock_id, old_n_rblocks, "ssd-write");
1673 }
1674 else if (rv == WRITE_IN_PLACE) {
1675 return 0; // no need to free old block - it's reused
1676 }
1677
1678 return rv;
1679}
1680
1681
1682//==========================================================
1683// Storage statistics utilities.
1684//
1685
1686void
1687as_storage_show_wblock_stats(as_namespace *ns)
1688{
1689 if (AS_STORAGE_ENGINE_SSD != ns->storage_type) {
1690 cf_info(AS_DRV_SSD, "Storage engine type must be SSD (%d), not %d.",
1691 AS_STORAGE_ENGINE_SSD, ns->storage_type);
1692 return;
1693 }
1694
1695 if (ns->storage_private) {
1696 drv_ssds *ssds = ns->storage_private;
1697
1698 for (int d = 0; d < ssds->n_ssds; d++) {
1699 int num_free_blocks = 0;
1700 int num_full_blocks = 0;
1701 int num_full_swb = 0;
1702 int num_above_wm = 0;
1703 int num_defraggable = 0;
1704
1705 drv_ssd *ssd = &ssds->ssds[d];
1706 uint32_t lwm_size = ns->defrag_lwm_size;
1707
1708 for (uint32_t i = 0; i < ssd->n_wblocks; i++) {
1709 ssd_wblock_state *wblock_state = &ssd->wblock_state[i];
1710 uint32_t inuse_sz = cf_atomic32_get(wblock_state->inuse_sz);
1711
1712 if (inuse_sz == 0) {
1713 num_free_blocks++;
1714 }
1715 else if (inuse_sz == ssd->write_block_size) {
1716 if (wblock_state->swb != NULL) {
1717 num_full_swb++;
1718 }
1719 else {
1720 num_full_blocks++;
1721 }
1722 }
1723 else {
1724 if (inuse_sz > ssd->write_block_size || inuse_sz < lwm_size) {
1725 cf_info(AS_DRV_SSD, "dev %d, wblock %u, inuse_sz %u, %s swb",
1726 d, i, inuse_sz, wblock_state->swb ? "has" : "no");
1727
1728 num_defraggable++;
1729 }
1730 else {
1731 num_above_wm++;
1732 }
1733 }
1734 }
1735
1736 cf_info(AS_DRV_SSD, "device %s free %d full %d fullswb %d pfull %d defrag %d freeq %d",
1737 ssd->name, num_free_blocks, num_full_blocks, num_full_swb,
1738 num_above_wm, num_defraggable, cf_queue_sz(ssd->free_wblock_q));
1739 }
1740 }
1741 else {
1742 cf_info(AS_DRV_SSD, "no devices");
1743 }
1744}
1745
1746
1747void
1748as_storage_summarize_wblock_stats(as_namespace *ns)
1749{
1750 if (AS_STORAGE_ENGINE_SSD != ns->storage_type) {
1751 cf_info(AS_DRV_SSD, "Storage engine type must be SSD (%d), not %d.",
1752 AS_STORAGE_ENGINE_SSD, ns->storage_type);
1753 return;
1754 }
1755
1756 if (! ns->storage_private) {
1757 cf_info(AS_DRV_SSD, "no devices");
1758 return;
1759 }
1760
1761 drv_ssds *ssds = ns->storage_private;
1762 uint32_t total_num_defraggable = 0;
1763 uint32_t total_num_above_wm = 0;
1764 uint64_t defraggable_sz = 0;
1765 uint64_t non_defraggable_sz = 0;
1766
1767 // Note: This is a sparse array that could be more efficiently stored.
1768 // (In addition, ranges of block sizes could be binned together to
1769 // compress the histogram, rather than using one bin per block size.)
1770 uint32_t *wb_hist = cf_calloc(1, sizeof(uint32_t) * MAX_WRITE_BLOCK_SIZE);
1771
1772 for (uint32_t d = 0; d < ssds->n_ssds; d++) {
1773 drv_ssd *ssd = &ssds->ssds[d];
1774 uint32_t num_free_blocks = 0;
1775 uint32_t num_full_swb = 0;
1776 uint32_t num_full_blocks = 0;
1777 uint32_t lwm_size = ns->defrag_lwm_size;
1778 uint32_t num_defraggable = 0;
1779 uint32_t num_above_wm = 0;
1780
1781 for (uint32_t i = 0; i < ssd->n_wblocks; i++) {
1782 ssd_wblock_state *wblock_state = &ssd->wblock_state[i];
1783 uint32_t inuse_sz = cf_atomic32_get(wblock_state->inuse_sz);
1784
1785 if (inuse_sz > ssd->write_block_size) {
1786 cf_warning(AS_DRV_SSD, "wblock size (%d > %d) too large ~~ not counting in histogram",
1787 inuse_sz, ssd->write_block_size);
1788 }
1789 else {
1790 wb_hist[inuse_sz]++;
1791 }
1792
1793 if (inuse_sz == 0) {
1794 num_free_blocks++;
1795 }
1796 else if (inuse_sz == ssd->write_block_size) {
1797 if (wblock_state->swb != NULL) {
1798 num_full_swb++;
1799 }
1800 else {
1801 num_full_blocks++;
1802 }
1803 }
1804 else if (inuse_sz < lwm_size) {
1805 defraggable_sz += inuse_sz;
1806 num_defraggable++;
1807 }
1808 else {
1809 non_defraggable_sz += inuse_sz;
1810 num_above_wm++;
1811 }
1812 }
1813
1814 total_num_defraggable += num_defraggable;
1815 total_num_above_wm += num_above_wm;
1816
1817 cf_info(AS_DRV_SSD, "device %s free %u full %u fullswb %u pfull %u defrag %u freeq %u",
1818 ssd->name, num_free_blocks, num_full_blocks, num_full_swb,
1819 num_above_wm, num_defraggable, cf_queue_sz(ssd->free_wblock_q));
1820 }
1821
1822 cf_info(AS_DRV_SSD, "WBH: Storage histogram for namespace \"%s\":",
1823 ns->name);
1824 cf_info(AS_DRV_SSD, "WBH: Average wblock size of: defraggable blocks: %lu bytes; nondefraggable blocks: %lu bytes; all blocks: %lu bytes",
1825 defraggable_sz / MAX(1, total_num_defraggable),
1826 non_defraggable_sz / MAX(1, total_num_above_wm),
1827 (defraggable_sz + non_defraggable_sz) /
1828 MAX(1, (total_num_defraggable + total_num_above_wm)));
1829
1830 for (uint32_t i = 0; i < MAX_WRITE_BLOCK_SIZE; i++) {
1831 if (wb_hist[i] > 0) {
1832 cf_info(AS_DRV_SSD, "WBH: %u block%s of size %u bytes",
1833 wb_hist[i], (wb_hist[i] != 1 ? "s" : ""), i);
1834 }
1835 }
1836
1837 cf_free(wb_hist);
1838}
1839
1840
1841//==========================================================
1842// Per-device background jobs.
1843//
1844
1845#define LOG_STATS_INTERVAL_sec 20
1846
1847void
1848ssd_log_stats(drv_ssd *ssd, uint64_t *p_prev_n_total_writes,
1849 uint64_t *p_prev_n_defrag_reads, uint64_t *p_prev_n_defrag_writes,
1850 uint64_t *p_prev_n_defrag_io_skips, uint64_t *p_prev_n_direct_frees,
1851 uint64_t *p_prev_n_tomb_raider_reads)
1852{
1853 uint64_t n_defrag_reads = cf_atomic64_get(ssd->n_defrag_wblock_reads);
1854 uint64_t n_defrag_writes = cf_atomic64_get(ssd->n_defrag_wblock_writes);
1855 uint64_t n_total_writes = cf_atomic64_get(ssd->n_wblock_writes) +
1856 n_defrag_writes;
1857
1858 uint64_t n_defrag_io_skips = cf_atomic64_get(ssd->n_wblock_defrag_io_skips);
1859 uint64_t n_direct_frees = cf_atomic64_get(ssd->n_wblock_direct_frees);
1860
1861 float total_write_rate = (float)(n_total_writes - *p_prev_n_total_writes) /
1862 (float)LOG_STATS_INTERVAL_sec;
1863 float defrag_read_rate = (float)(n_defrag_reads - *p_prev_n_defrag_reads) /
1864 (float)LOG_STATS_INTERVAL_sec;
1865 float defrag_write_rate = (float)(n_defrag_writes - *p_prev_n_defrag_writes) /
1866 (float)LOG_STATS_INTERVAL_sec;
1867
1868 float defrag_io_skip_rate = (float)(n_defrag_io_skips - *p_prev_n_defrag_io_skips) /
1869 (float)LOG_STATS_INTERVAL_sec;
1870 float direct_free_rate = (float)(n_direct_frees - *p_prev_n_direct_frees) /
1871 (float)LOG_STATS_INTERVAL_sec;
1872
1873 uint64_t n_tomb_raider_reads = ssd->n_tomb_raider_reads;
1874 char tomb_raider_str[64];
1875
1876 *tomb_raider_str = 0;
1877
1878 if (n_tomb_raider_reads != 0) {
1879 if (*p_prev_n_tomb_raider_reads > n_tomb_raider_reads) {
1880 *p_prev_n_tomb_raider_reads = 0;
1881 }
1882
1883 float tomb_raider_read_rate =
1884 (float)(n_tomb_raider_reads - *p_prev_n_tomb_raider_reads) /
1885 (float)LOG_STATS_INTERVAL_sec;
1886
1887 sprintf(tomb_raider_str, " tomb-raider-read (%lu,%.1f)",
1888 n_tomb_raider_reads, tomb_raider_read_rate);
1889 }
1890
1891 char shadow_str[64];
1892
1893 *shadow_str = 0;
1894
1895 if (ssd->shadow_name) {
1896 sprintf(shadow_str, " shadow-write-q %d",
1897 cf_queue_sz(ssd->swb_shadow_q));
1898 }
1899
1900 uint32_t free_wblock_q_sz = (uint32_t)cf_queue_sz(ssd->free_wblock_q);
1901 uint32_t n_pristine_wblocks = num_pristine_wblocks(ssd);
1902 uint32_t n_free_wblocks = free_wblock_q_sz + n_pristine_wblocks;
1903
1904 cf_info(AS_DRV_SSD, "{%s} %s: used-bytes %lu free-wblocks %u write-q %d write (%lu,%.1f) defrag-q %d defrag-read (%lu,%.1f) defrag-write (%lu,%.1f)%s%s",
1905 ssd->ns->name, ssd->name,
1906 ssd->inuse_size, n_free_wblocks,
1907 cf_queue_sz(ssd->swb_write_q),
1908 n_total_writes, total_write_rate,
1909 cf_queue_sz(ssd->defrag_wblock_q), n_defrag_reads, defrag_read_rate,
1910 n_defrag_writes, defrag_write_rate,
1911 shadow_str, tomb_raider_str);
1912
1913 cf_detail(AS_DRV_SSD, "{%s} %s: free-wblocks (%u,%u) defrag-io-skips (%lu,%.1f) direct-frees (%lu,%.1f)",
1914 ssd->ns->name, ssd->name,
1915 free_wblock_q_sz, n_pristine_wblocks,
1916 n_defrag_io_skips, defrag_io_skip_rate,
1917 n_direct_frees, direct_free_rate);
1918
1919 *p_prev_n_total_writes = n_total_writes;
1920 *p_prev_n_defrag_reads = n_defrag_reads;
1921 *p_prev_n_defrag_writes = n_defrag_writes;
1922 *p_prev_n_defrag_io_skips = n_defrag_io_skips;
1923 *p_prev_n_direct_frees = n_direct_frees;
1924 *p_prev_n_tomb_raider_reads = n_tomb_raider_reads;
1925
1926 if (n_free_wblocks == 0) {
1927 cf_warning(AS_DRV_SSD, "device %s: out of storage space", ssd->name);
1928 }
1929}
1930
1931
1932void
1933ssd_free_swbs(drv_ssd *ssd)
1934{
1935 // Try to recover swbs, 16 at a time, down to 16.
1936 for (int i = 0; i < 16 && cf_queue_sz(ssd->swb_free_q) > 16; i++) {
1937 ssd_write_buf* swb;
1938
1939 if (CF_QUEUE_OK !=
1940 cf_queue_pop(ssd->swb_free_q, &swb, CF_QUEUE_NOWAIT)) {
1941 break;
1942 }
1943
1944 swb_destroy(swb);
1945 }
1946}
1947
1948
1949void
1950ssd_flush_current_swb(drv_ssd *ssd, uint64_t *p_prev_n_writes)
1951{
1952 uint64_t n_writes = cf_atomic64_get(ssd->n_wblock_writes);
1953
1954 // If there's an active write load, we don't need to flush.
1955 if (n_writes != *p_prev_n_writes) {
1956 *p_prev_n_writes = n_writes;
1957 return;
1958 }
1959
1960 cf_mutex_lock(&ssd->write_lock);
1961
1962 n_writes = cf_atomic64_get(ssd->n_wblock_writes);
1963
1964 // Must check under the lock, could be racing a current swb just queued.
1965 if (n_writes != *p_prev_n_writes) {
1966
1967 cf_mutex_unlock(&ssd->write_lock);
1968
1969 *p_prev_n_writes = n_writes;
1970 return;
1971 }
1972
1973 // Flush the current swb if it isn't empty, and has been written to since
1974 // last flushed.
1975
1976 ssd_write_buf *swb = ssd->current_swb;
1977
1978 if (swb && swb->dirty) {
1979 swb->dirty = false;
1980
1981 // Clean the end of the buffer before flushing.
1982 if (ssd->write_block_size != swb->pos) {
1983 memset(&swb->buf[swb->pos], 0, ssd->write_block_size - swb->pos);
1984 }
1985
1986 // Flush it.
1987 ssd_flush_swb(ssd, swb);
1988
1989 if (ssd->shadow_name) {
1990 ssd_shadow_flush_swb(ssd, swb);
1991 }
1992 }
1993
1994 cf_mutex_unlock(&ssd->write_lock);
1995}
1996
1997
1998void
1999ssd_flush_defrag_swb(drv_ssd *ssd, uint64_t *p_prev_n_defrag_writes)
2000{
2001 uint64_t n_defrag_writes = cf_atomic64_get(ssd->n_defrag_wblock_writes);
2002
2003 // If there's an active defrag load, we don't need to flush.
2004 if (n_defrag_writes != *p_prev_n_defrag_writes) {
2005 *p_prev_n_defrag_writes = n_defrag_writes;
2006 return;
2007 }
2008
2009 cf_mutex_lock(&ssd->defrag_lock);
2010
2011 n_defrag_writes = cf_atomic64_get(ssd->n_defrag_wblock_writes);
2012
2013 // Must check under the lock, could be racing a current swb just queued.
2014 if (n_defrag_writes != *p_prev_n_defrag_writes) {
2015
2016 cf_mutex_unlock(&ssd->defrag_lock);
2017
2018 *p_prev_n_defrag_writes = n_defrag_writes;
2019 return;
2020 }
2021
2022 // Flush the defrag swb if it isn't empty, and has been written to since
2023 // last flushed.
2024
2025 ssd_write_buf *swb = ssd->defrag_swb;
2026
2027 if (swb && swb->n_vacated != 0) {
2028 // Clean the end of the buffer before flushing.
2029 if (ssd->write_block_size != swb->pos) {
2030 memset(&swb->buf[swb->pos], 0, ssd->write_block_size - swb->pos);
2031 }
2032
2033 // Flush it.
2034 ssd_flush_swb(ssd, swb);
2035
2036 if (ssd->shadow_name) {
2037 ssd_shadow_flush_swb(ssd, swb);
2038 }
2039
2040 // The whole point - free source wblocks.
2041 swb_release_all_vacated_wblocks(swb);
2042 }
2043
2044 cf_mutex_unlock(&ssd->defrag_lock);
2045}
2046
2047
2048// Check all wblocks to load a device's defrag queue at runtime. Triggered only
2049// when defrag-lwm-pct is increased by manual intervention.
2050void
2051ssd_defrag_sweep(drv_ssd *ssd)
2052{
2053 uint32_t first_id = ssd->first_wblock_id;
2054 uint32_t end_id = ssd->n_wblocks;
2055 uint32_t n_queued = 0;
2056
2057 for (uint32_t wblock_id = first_id; wblock_id < end_id; wblock_id++) {
2058 ssd_wblock_state *p_wblock_state = &ssd->wblock_state[wblock_id];
2059
2060 cf_mutex_lock(&p_wblock_state->LOCK);
2061
2062 uint32_t inuse_sz = cf_atomic32_get(p_wblock_state->inuse_sz);
2063
2064 if (p_wblock_state->swb == NULL &&
2065 p_wblock_state->state != WBLOCK_STATE_DEFRAG &&
2066 inuse_sz != 0 &&
2067 inuse_sz < ssd->ns->defrag_lwm_size) {
2068 push_wblock_to_defrag_q(ssd, wblock_id);
2069 n_queued++;
2070 }
2071
2072 cf_mutex_unlock(&p_wblock_state->LOCK);
2073 }
2074
2075 cf_info(AS_DRV_SSD, "... %s sweep queued %u wblocks for defrag", ssd->name,
2076 n_queued);
2077}
2078
2079
2080static inline uint64_t
2081next_time(uint64_t now, uint64_t job_interval, uint64_t next)
2082{
2083 uint64_t next_job = now + job_interval;
2084
2085 return next_job < next ? next_job : next;
2086}
2087
2088
2089// All in microseconds since we're using usleep().
2090#define MAX_INTERVAL (1000 * 1000)
2091#define LOG_STATS_INTERVAL (1000 * 1000 * LOG_STATS_INTERVAL_sec)
2092#define FREE_SWBS_INTERVAL (1000 * 1000 * 20)
2093
2094// Thread "run" function to perform various background jobs per device.
2095void *
2096run_ssd_maintenance(void *udata)
2097{
2098 drv_ssd *ssd = (drv_ssd*)udata;
2099 as_namespace *ns = ssd->ns;
2100
2101 uint64_t prev_n_total_writes = 0;
2102 uint64_t prev_n_defrag_reads = 0;
2103 uint64_t prev_n_defrag_writes = 0;
2104 uint64_t prev_n_defrag_io_skips = 0;
2105 uint64_t prev_n_direct_frees = 0;
2106 uint64_t prev_n_tomb_raider_reads = 0;
2107
2108 uint64_t prev_n_writes_flush = 0;
2109
2110 uint64_t prev_n_defrag_writes_flush = 0;
2111
2112 uint64_t now = cf_getus();
2113 uint64_t next = now + MAX_INTERVAL;
2114
2115 uint64_t prev_log_stats = now;
2116 uint64_t prev_free_swbs = now;
2117 uint64_t prev_flush = now;
2118 uint64_t prev_defrag_flush = now;
2119
2120 // If any job's (initial) interval is less than MAX_INTERVAL and we want it
2121 // done on its interval the first time through, add a next_time() call for
2122 // that job here to adjust 'next'. (No such jobs for now.)
2123
2124 uint64_t sleep_us = next - now;
2125
2126 while (true) {
2127 usleep((uint32_t)sleep_us);
2128
2129 now = cf_getus();
2130 next = now + MAX_INTERVAL;
2131
2132 if (now >= prev_log_stats + LOG_STATS_INTERVAL) {
2133 ssd_log_stats(ssd, &prev_n_total_writes, &prev_n_defrag_reads,
2134 &prev_n_defrag_writes, &prev_n_defrag_io_skips,
2135 &prev_n_direct_frees, &prev_n_tomb_raider_reads);
2136 prev_log_stats = now;
2137 next = next_time(now, LOG_STATS_INTERVAL, next);
2138 }
2139
2140 if (now >= prev_free_swbs + FREE_SWBS_INTERVAL) {
2141 ssd_free_swbs(ssd);
2142 prev_free_swbs = now;
2143 next = next_time(now, FREE_SWBS_INTERVAL, next);
2144 }
2145
2146 uint64_t flush_max_us = ssd_flush_max_us(ns);
2147
2148 if (flush_max_us != 0 && now >= prev_flush + flush_max_us) {
2149 ssd_flush_current_swb(ssd, &prev_n_writes_flush);
2150 prev_flush = now;
2151 next = next_time(now, flush_max_us, next);
2152 }
2153
2154 static const uint64_t DEFRAG_FLUSH_MAX_US = 3UL * 1000 * 1000; // 3 sec
2155
2156 if (now >= prev_defrag_flush + DEFRAG_FLUSH_MAX_US) {
2157 ssd_flush_defrag_swb(ssd, &prev_n_defrag_writes_flush);
2158 prev_defrag_flush = now;
2159 next = next_time(now, DEFRAG_FLUSH_MAX_US, next);
2160 }
2161
2162 if (cf_atomic32_get(ssd->defrag_sweep) != 0) {
2163 // May take long enough to mess up other jobs' schedules, but it's a
2164 // very rare manually-triggered intervention.
2165 ssd_defrag_sweep(ssd);
2166 cf_atomic32_decr(&ssd->defrag_sweep);
2167 }
2168
2169 now = cf_getus(); // refresh in case jobs took significant time
2170 sleep_us = next > now ? next - now : 1;
2171 }
2172
2173 return NULL;
2174}
2175
2176
2177void
2178ssd_start_maintenance_threads(drv_ssds *ssds)
2179{
2180 cf_info(AS_DRV_SSD, "{%s} starting device maintenance threads",
2181 ssds->ns->name);
2182
2183 for (int i = 0; i < ssds->n_ssds; i++) {
2184 drv_ssd* ssd = &ssds->ssds[i];
2185
2186 cf_thread_create_detached(run_ssd_maintenance, (void*)ssd);
2187 }
2188}
2189
2190
2191//==========================================================
2192// Device header utilities.
2193//
2194
2195ssd_device_header *
2196ssd_read_header(drv_ssd *ssd)
2197{
2198 as_namespace *ns = ssd->ns;
2199
2200 bool use_shadow = ns->cold_start && ssd->shadow_name;
2201
2202 const char *ssd_name;
2203 int fd;
2204 size_t read_size;
2205
2206 if (use_shadow) {
2207 ssd_name = ssd->shadow_name;
2208 fd = ssd_shadow_fd_get(ssd);
2209 read_size = BYTES_UP_TO_SHADOW_IO_MIN(ssd, sizeof(ssd_device_header));
2210 }
2211 else {
2212 ssd_name = ssd->name;
2213 fd = ssd_fd_get(ssd);
2214 read_size = BYTES_UP_TO_IO_MIN(ssd, sizeof(ssd_device_header));
2215 }
2216
2217 ssd_device_header *header = cf_valloc(read_size);
2218
2219 if (! pread_all(fd, (void*)header, read_size, 0)) {
2220 cf_crash(AS_DRV_SSD, "%s: read failed: errno %d (%s)", ssd_name, errno,
2221 cf_strerror(errno));
2222 }
2223
2224 ssd_common_prefix *prefix = &header->common.prefix;
2225
2226 if (prefix->magic == SSD_HEADER_OLD_MAGIC) {
2227 cf_crash(AS_DRV_SSD, "%s: Aerospike device has old format - must erase device to upgrade",
2228 ssd_name);
2229 }
2230
2231 // Normal path for a fresh drive.
2232 if (prefix->magic != SSD_HEADER_MAGIC) {
2233 cf_detail(AS_DRV_SSD, "%s: bad magic - fresh drive?", ssd_name);
2234 cf_free(header);
2235 use_shadow ? ssd_shadow_fd_put(ssd, fd) : ssd_fd_put(ssd, fd);
2236 return NULL;
2237 }
2238
2239 if (prefix->version != SSD_VERSION) {
2240 cf_crash(AS_DRV_SSD, "%s: unknown version %u", ssd_name,
2241 prefix->version);
2242 }
2243
2244 if (strcmp(prefix->namespace, ns->name) != 0) {
2245 cf_crash(AS_DRV_SSD, "%s: previous namespace %s now %s - check config or erase device",
2246 ssd_name, prefix->namespace, ns->name);
2247 }
2248
2249 if (prefix->n_devices > AS_STORAGE_MAX_DEVICES) {
2250 cf_crash(AS_DRV_SSD, "%s: bad n-devices %u", ssd_name,
2251 prefix->n_devices);
2252 }
2253
2254 if (prefix->random == 0) {
2255 cf_crash(AS_DRV_SSD, "%s: random signature is 0", ssd_name);
2256 }
2257
2258 if (prefix->write_block_size == 0 ||
2259 ns->storage_write_block_size % prefix->write_block_size != 0) {
2260 cf_crash(AS_DRV_SSD, "%s: can't change write-block-size from %u to %u",
2261 ssd_name, prefix->write_block_size,
2262 ns->storage_write_block_size);
2263 }
2264
2265 if (header->unique.device_id >= AS_STORAGE_MAX_DEVICES) {
2266 cf_crash(AS_DRV_SSD, "%s: bad device-id %u", ssd_name,
2267 header->unique.device_id);
2268 }
2269
2270 ssd_header_validate_cfg(ns, ssd, header);
2271
2272 if (header->unique.pristine_offset != 0 && // always 0 before 4.6
2273 (header->unique.pristine_offset < SSD_HEADER_SIZE ||
2274 header->unique.pristine_offset > ssd->file_size)) {
2275 cf_crash(AS_DRV_SSD, "%s: bad pristine offset %lu", ssd_name,
2276 header->unique.pristine_offset);
2277 }
2278
2279 // In case we're increasing write-block-size - ensure new value is recorded.
2280 prefix->write_block_size = ns->storage_write_block_size;
2281
2282 use_shadow ? ssd_shadow_fd_put(ssd, fd) : ssd_fd_put(ssd, fd);
2283
2284 return header;
2285}
2286
2287
2288ssd_device_header *
2289ssd_init_header(as_namespace *ns, drv_ssd *ssd)
2290{
2291 ssd_device_header *header = cf_malloc(sizeof(ssd_device_header));
2292
2293 memset(header, 0, sizeof(ssd_device_header));
2294
2295 ssd_common_prefix *prefix = &header->common.prefix;
2296
2297 // Set non-zero common fields.
2298 prefix->magic = SSD_HEADER_MAGIC;
2299 prefix->version = SSD_VERSION;
2300 strcpy(prefix->namespace, ns->name);
2301 prefix->write_block_size = ns->storage_write_block_size;
2302
2303 ssd_header_init_cfg(ns, ssd, header);
2304
2305 return header;
2306}
2307
2308
2309void
2310ssd_empty_header(int fd, const char* device_name)
2311{
2312 void *h = cf_valloc(SSD_HEADER_SIZE);
2313
2314 memset(h, 0, SSD_HEADER_SIZE);
2315
2316 if (! pwrite_all(fd, h, SSD_HEADER_SIZE, 0)) {
2317 cf_crash(AS_DRV_SSD, "%s: DEVICE FAILED write: errno %d (%s)",
2318 device_name, errno, cf_strerror(errno));
2319 }
2320
2321 cf_free(h);
2322}
2323
2324
2325void
2326ssd_write_header(drv_ssd *ssd, uint8_t *header, uint8_t *from, size_t size)
2327{
2328 off_t offset = from - header;
2329
2330 off_t flush_offset = BYTES_DOWN_TO_IO_MIN(ssd, offset);
2331 off_t flush_end_offset = BYTES_UP_TO_IO_MIN(ssd, offset + size);
2332
2333 uint8_t *flush = header + flush_offset;
2334 size_t flush_sz = flush_end_offset - flush_offset;
2335
2336 int fd = ssd_fd_get(ssd);
2337
2338 if (! pwrite_all(fd, (void*)flush, flush_sz, flush_offset)) {
2339 cf_crash(AS_DRV_SSD, "%s: DEVICE FAILED write: errno %d (%s)",
2340 ssd->name, errno, cf_strerror(errno));
2341 }
2342
2343 ssd_fd_put(ssd, fd);
2344
2345 if (! ssd->shadow_name) {
2346 return;
2347 }
2348
2349 flush_offset = BYTES_DOWN_TO_SHADOW_IO_MIN(ssd, offset);
2350 flush_end_offset = BYTES_UP_TO_SHADOW_IO_MIN(ssd, offset + size);
2351
2352 flush = header + flush_offset;
2353 flush_sz = flush_end_offset - flush_offset;
2354
2355 fd = ssd_shadow_fd_get(ssd);
2356
2357 if (! pwrite_all(fd, (void*)flush, flush_sz, flush_offset)) {
2358 cf_crash(AS_DRV_SSD, "%s: DEVICE FAILED write: errno %d (%s)",
2359 ssd->shadow_name, errno, cf_strerror(errno));
2360 }
2361
2362 ssd_shadow_fd_put(ssd, fd);
2363}
2364
2365
2366//==========================================================
2367// Cold start utilities.
2368//
2369
2370bool
2371prefer_existing_record(drv_ssd* ssd, const as_flat_record* flat,
2372 uint32_t block_void_time, const as_index* r)
2373{
2374 int result = as_record_resolve_conflict(ssd_cold_start_policy(ssd->ns),
2375 r->generation, r->last_update_time,
2376 flat->generation, flat->last_update_time);
2377
2378 if (result != 0) {
2379 return result == -1; // -1 means block record < existing record
2380 }
2381
2382 // Finally, compare void-times. Note that defragged records will generate
2383 // identical copies on drive, so they'll get here and return true.
2384 return r->void_time == 0 ||
2385 (block_void_time != 0 && block_void_time <= r->void_time);
2386}
2387
2388
2389bool
2390is_set_evictable(as_namespace* ns, const as_flat_opt_meta* opt_meta)
2391{
2392 if (! opt_meta->set_name) {
2393 return true;
2394 }
2395
2396 as_set *p_set;
2397
2398 if (cf_vmapx_get_by_name_w_len(ns->p_sets_vmap, opt_meta->set_name,
2399 opt_meta->set_name_len, (void**)&p_set) != CF_VMAPX_OK) {
2400 return true;
2401 }
2402
2403 return ! IS_SET_EVICTION_DISABLED(p_set);
2404}
2405
2406
2407void
2408apply_opt_meta(as_record* r, as_namespace* ns, const as_flat_opt_meta* opt_meta)
2409{
2410 // Set record's set-id. (If it already has one, assume they're the same.)
2411 if (as_index_get_set_id(r) == INVALID_SET_ID && opt_meta->set_name) {
2412 as_index_set_set_w_len(r, ns, opt_meta->set_name,
2413 opt_meta->set_name_len, false);
2414 }
2415
2416 // Store or drop the key according to the props we read.
2417 as_record_finalize_key(r, ns, opt_meta->key, opt_meta->key_size);
2418}
2419
2420
2421// Add a record just read from drive to the index, if all is well.
2422void
2423ssd_cold_start_add_record(drv_ssds* ssds, drv_ssd* ssd,
2424 const as_flat_record* flat, uint64_t rblock_id, uint32_t record_size)
2425{
2426 uint32_t pid = as_partition_getid(&flat->keyd);
2427
2428 // If this isn't a partition we're interested in, skip this record.
2429 if (! ssds->get_state_from_storage[pid]) {
2430 return;
2431 }
2432
2433 as_namespace* ns = ssds->ns;
2434 as_partition* p_partition = &ns->partitions[pid];
2435
2436 const uint8_t* end = (const uint8_t*)flat + record_size;
2437 as_flat_opt_meta opt_meta = { 0 };
2438
2439 const uint8_t* p_read = as_flat_unpack_record_meta(flat, end, &opt_meta,
2440 ns->single_bin);
2441
2442 if (! p_read) {
2443 cf_warning_digest(AS_DRV_SSD, &flat->keyd, "bad metadata for record ");
2444 return;
2445 }
2446
2447 if (opt_meta.void_time > ns->startup_max_void_time) {
2448 cf_warning_digest(AS_DRV_SSD, &flat->keyd, "bad flat record void-time ");
2449 return;
2450 }
2451
2452 if (! as_flat_decompress_buffer(&opt_meta.cm, ns->storage_write_block_size,
2453 &p_read, &end)) {
2454 cf_warning_digest(AS_DRV_SSD, &flat->keyd, "bad compressed data for record ");
2455 return;
2456 }
2457
2458 if (! as_flat_check_packed_bins(p_read, end, opt_meta.n_bins,
2459 ns->single_bin)) {
2460 cf_warning_digest(AS_DRV_SSD, &flat->keyd, "bad flat record ");
2461 return;
2462 }
2463
2464 // Ignore record if it was in a dropped tree.
2465 if (flat->tree_id != p_partition->tree_id) {
2466 return;
2467 }
2468
2469 // Ignore records that were truncated.
2470 if (as_truncate_lut_is_truncated(flat->last_update_time, ns,
2471 opt_meta.set_name, opt_meta.set_name_len)) {
2472 return;
2473 }
2474
2475 // If eviction is necessary, evict previously added records closest to
2476 // expiration. (If evicting, this call will block for a long time.) This
2477 // call may also update the cold start threshold void-time.
2478 if (! as_cold_start_evict_if_needed(ns)) {
2479 cf_crash(AS_DRV_SSD, "hit stop-writes limit before drive scan completed");
2480 }
2481
2482 // Get/create the record from/in the appropriate index tree.
2483 as_index_ref r_ref;
2484 int rv = as_record_get_create(p_partition->tree, &flat->keyd, &r_ref, ns);
2485
2486 if (rv < 0) {
2487 cf_detail_digest(AS_DRV_SSD, &flat->keyd, "record-add as_record_get_create() failed ");
2488 return;
2489 }
2490
2491 bool is_create = rv == 1;
2492
2493 as_index* r = r_ref.r;
2494
2495 if (! is_create) {
2496 // Record already existed. Ignore this one if existing record is newer.
2497 if (prefer_existing_record(ssd, flat, opt_meta.void_time, r)) {
2498 ssd_cold_start_adjust_cenotaph(ns, flat->has_bins == 1,
2499 opt_meta.void_time, r);
2500 as_record_done(&r_ref, ns);
2501 ssd->record_add_older_counter++;
2502 return;
2503 }
2504 }
2505 // The record we're now reading is the latest version (so far) ...
2506
2507 // Skip records that have expired.
2508 if (opt_meta.void_time != 0 && ns->cold_start_now > opt_meta.void_time) {
2509 as_index_delete(p_partition->tree, &flat->keyd);
2510 as_record_done(&r_ref, ns);
2511 ssd->record_add_expired_counter++;
2512 return;
2513 }
2514
2515 // Skip records that were evicted.
2516 if (opt_meta.void_time != 0 && ns->evict_void_time > opt_meta.void_time &&
2517 is_set_evictable(ns, &opt_meta)) {
2518 as_index_delete(p_partition->tree, &flat->keyd);
2519 as_record_done(&r_ref, ns);
2520 ssd->record_add_evicted_counter++;
2521 return;
2522 }
2523
2524 // We'll keep the record we're now reading ...
2525
2526 ssd_cold_start_init_repl_state(ns, r);
2527
2528 // Set/reset the record's last-update-time generation, and void-time.
2529 r->last_update_time = flat->last_update_time;
2530 r->generation = flat->generation;
2531 r->void_time = opt_meta.void_time;
2532
2533 // Update maximum void-time.
2534 cf_atomic32_setmax(&p_partition->max_void_time, (int32_t)r->void_time);
2535
2536 // If data is in memory, load bins and particles, adjust secondary index.
2537 if (ns->storage_data_in_memory) {
2538 as_storage_rd rd;
2539
2540 if (is_create) {
2541 as_storage_record_create(ns, r, &rd);
2542 }
2543 else {
2544 as_storage_record_open(ns, r, &rd);
2545 }
2546
2547 as_storage_rd_load_n_bins(&rd);
2548 as_storage_rd_load_bins(&rd, NULL);
2549
2550 uint64_t bytes_memory = as_storage_record_get_n_bytes_memory(&rd);
2551
2552 // Do this early since set-id is needed for the secondary index update.
2553 apply_opt_meta(r, ns, &opt_meta);
2554
2555 uint16_t old_n_bins = rd.n_bins;
2556
2557 bool has_sindex = record_has_sindex(r, ns);
2558 int sbins_populated = 0;
2559
2560 if (has_sindex) {
2561 SINDEX_GRLOCK();
2562 }
2563
2564 SINDEX_BINS_SETUP(sbins, 2 * ns->sindex_cnt);
2565 as_sindex* si_arr[2 * ns->sindex_cnt];
2566 int si_arr_index = 0;
2567 const char* set_name = as_index_get_set_name(r, ns);
2568
2569 if (has_sindex) {
2570 for (uint16_t i = 0; i < old_n_bins; i++) {
2571 si_arr_index += as_sindex_arr_lookup_by_set_binid_lockfree(ns,
2572 set_name, rd.bins[i].id, &si_arr[si_arr_index]);
2573 }
2574 }
2575
2576 int32_t delta_bins = (int32_t)opt_meta.n_bins - (int32_t)old_n_bins;
2577
2578 if (ns->single_bin) {
2579 if (delta_bins < 0) {
2580 as_record_destroy_bins(&rd);
2581 }
2582 }
2583 else if (delta_bins != 0) {
2584 if (has_sindex && delta_bins < 0) {
2585 sbins_populated += as_sindex_sbins_from_rd(&rd,
2586 (uint16_t)opt_meta.n_bins, old_n_bins, sbins,
2587 AS_SINDEX_OP_DELETE);
2588 }
2589
2590 as_bin_allocate_bin_space(&rd, delta_bins);
2591 }
2592
2593 for (uint16_t i = 0; i < (uint16_t)opt_meta.n_bins; i++) {
2594 as_bin* b;
2595 size_t name_len = ns->single_bin ? 0 : *p_read++;
2596
2597 if (i < old_n_bins) {
2598 b = &rd.bins[i];
2599
2600 if (has_sindex) {
2601 sbins_populated += as_sindex_sbins_from_bin(ns, set_name, b,
2602 &sbins[sbins_populated], AS_SINDEX_OP_DELETE);
2603 }
2604
2605 if (! as_bin_set_id_from_name_w_len(ns, b, p_read, name_len)) {
2606 // TODO - should maybe fail gracefully?
2607 cf_crash(AS_DRV_SSD, "bin id assignment failed");
2608 }
2609 }
2610 else {
2611 b = as_bin_create_from_buf(&rd, p_read, name_len, NULL);
2612
2613 if (! b) {
2614 // TODO - should maybe fail gracefully?
2615 cf_crash(AS_DRV_SSD, "bin create failed");
2616 }
2617 }
2618
2619 p_read += name_len;
2620
2621 if (! (p_read =
2622 as_bin_particle_replace_from_flat(b, p_read, end))) {
2623 // TODO - should maybe fail gracefully?
2624 cf_crash(AS_DRV_SSD, "particle replace failed");
2625 }
2626
2627 if (has_sindex) {
2628 si_arr_index += as_sindex_arr_lookup_by_set_binid_lockfree(ns,
2629 set_name, b->id, &si_arr[si_arr_index]);
2630 sbins_populated += as_sindex_sbins_from_bin(ns, set_name, b,
2631 &sbins[sbins_populated], AS_SINDEX_OP_INSERT);
2632 }
2633 }
2634
2635 if (has_sindex) {
2636 SINDEX_GRUNLOCK();
2637
2638 if (sbins_populated > 0) {
2639 as_sindex_update_by_sbin(ns, as_index_get_set_name(r, ns),
2640 sbins, sbins_populated, &r->keyd);
2641 as_sindex_sbin_freeall(sbins, sbins_populated);
2642 }
2643
2644 as_sindex_release_arr(si_arr, si_arr_index);
2645 }
2646
2647 as_storage_record_adjust_mem_stats(&rd, bytes_memory);
2648 as_storage_record_close(&rd);
2649 }
2650 else {
2651 apply_opt_meta(r, ns, &opt_meta);
2652 }
2653
2654 if (is_create) {
2655 ssd->record_add_unique_counter++;
2656 }
2657 else if (STORAGE_RBLOCK_IS_VALID(r->rblock_id)) {
2658 // Replacing an existing record, undo its previous storage accounting.
2659 ssd_block_free(&ssds->ssds[r->file_id], r->rblock_id, r->n_rblocks,
2660 "record-add");
2661 ssd->record_add_replace_counter++;
2662 }
2663 else {
2664 cf_warning(AS_DRV_SSD, "replacing record with invalid rblock-id");
2665 }
2666
2667 ssd_cold_start_transition_record(ns, flat, r, is_create);
2668
2669 uint32_t wblock_id = RBLOCK_ID_TO_WBLOCK_ID(ssd, rblock_id);
2670
2671 ssd->inuse_size += record_size;
2672 ssd->wblock_state[wblock_id].inuse_sz += record_size;
2673
2674 // Set/reset the record's storage information.
2675 r->file_id = ssd->file_id;
2676 r->rblock_id = rblock_id;
2677 r->n_rblocks = flat->n_rblocks;
2678
2679 as_record_done(&r_ref, ns);
2680}
2681
2682
2683// Sweep through a storage device to rebuild the index.
2684void
2685ssd_cold_start_sweep(drv_ssds *ssds, drv_ssd *ssd)
2686{
2687 size_t wblock_size = ssd->write_block_size;
2688
2689 uint8_t *buf = cf_valloc(wblock_size);
2690
2691 bool read_shadow = ssd->shadow_name;
2692 const char *read_ssd_name = read_shadow ? ssd->shadow_name : ssd->name;
2693 int fd = read_shadow ? ssd_shadow_fd_get(ssd) : ssd_fd_get(ssd);
2694 int write_fd = read_shadow ? ssd_fd_get(ssd) : -1;
2695
2696 // Loop over all wblocks, unless we encounter 10 contiguous unused wblocks.
2697
2698 ssd->sweep_wblock_id = ssd->first_wblock_id;
2699
2700 uint64_t file_offset = SSD_HEADER_SIZE;
2701 uint32_t n_unused_wblocks = 0;
2702
2703 bool prefetch = cf_arenax_want_prefetch(ssd->ns->arena);
2704
2705 while (file_offset < ssd->file_size && n_unused_wblocks < 10) {
2706 if (! pread_all(fd, buf, wblock_size, (off_t)file_offset)) {
2707 cf_crash(AS_DRV_SSD, "%s: read failed: errno %d (%s)",
2708 read_ssd_name, errno, cf_strerror(errno));
2709 }
2710
2711 if (read_shadow && ! pwrite_all(write_fd, (void*)buf, wblock_size,
2712 (off_t)file_offset)) {
2713 cf_crash(AS_DRV_SSD, "%s: write failed: errno %d (%s)", ssd->name,
2714 errno, cf_strerror(errno));
2715 }
2716
2717 if (prefetch) {
2718 ssd_prefetch_wblock(ssd, file_offset, buf);
2719 }
2720
2721 size_t indent = 0; // current offset within wblock, in bytes
2722
2723 while (indent < wblock_size) {
2724 as_flat_record *flat = (as_flat_record*)&buf[indent];
2725
2726 if (! prefetch) {
2727 ssd_decrypt(ssd, file_offset + indent, flat);
2728 }
2729
2730 // Look for record magic.
2731 if (flat->magic != AS_FLAT_MAGIC) {
2732 // Should always find a record at beginning of used wblock. if
2733 // not, we've likely encountered the unused part of the device.
2734 if (indent == 0) {
2735 n_unused_wblocks++;
2736 break; // try next wblock
2737 }
2738 // else - nothing more in this wblock, but keep looking for
2739 // magic - necessary if we want to be able to increase
2740 // write-block-size across restarts.
2741
2742 indent += RBLOCK_SIZE;
2743 continue; // try next rblock
2744 }
2745
2746 if (n_unused_wblocks != 0) {
2747 cf_warning(AS_DRV_SSD, "%s: found used wblock after skipping %u unused",
2748 ssd->name, n_unused_wblocks);
2749
2750 n_unused_wblocks = 0; // restart contiguous count
2751 }
2752
2753 uint32_t record_size = N_RBLOCKS_TO_SIZE(flat->n_rblocks);
2754
2755 if (record_size < SSD_RECORD_MIN_SIZE) {
2756 cf_warning(AS_DRV_SSD, "%s: record too small: size %u",
2757 ssd->name, record_size);
2758 indent += RBLOCK_SIZE;
2759 continue; // try next rblock
2760 }
2761
2762 size_t next_indent = indent + record_size;
2763
2764 // Sanity-check for wblock overruns.
2765 if (next_indent > wblock_size) {
2766 cf_warning(AS_DRV_SSD, "%s: record crosses wblock boundary: size %u",
2767 ssd->name, record_size);
2768 break; // skip this record, try next wblock
2769 }
2770
2771 // Found a record - try to add it to the index.
2772 ssd_cold_start_add_record(ssds, ssd, flat,
2773 OFFSET_TO_RBLOCK_ID(file_offset + indent), record_size);
2774
2775 indent = next_indent;
2776 }
2777
2778 file_offset += wblock_size;
2779 ssd->sweep_wblock_id++;
2780 }
2781
2782 ssd->pristine_wblock_id = ssd->sweep_wblock_id - n_unused_wblocks;
2783
2784 ssd->sweep_wblock_id = (uint32_t)(ssd->file_size / wblock_size);
2785
2786 if (fd != -1) {
2787 read_shadow ? ssd_shadow_fd_put(ssd, fd) : ssd_fd_put(ssd, fd);
2788 }
2789
2790 if (write_fd != -1) {
2791 ssd_fd_put(ssd, write_fd);
2792 }
2793
2794 cf_free(buf);
2795}
2796
2797
2798// Thread "run" function to read a storage device and rebuild the index.
2799void *
2800run_ssd_cold_start(void *udata)
2801{
2802 ssd_load_records_info *lri = (ssd_load_records_info*)udata;
2803 drv_ssd *ssd = lri->ssd;
2804 drv_ssds *ssds = lri->ssds;
2805 cf_queue *complete_q = lri->complete_q;
2806 void *complete_rc = lri->complete_rc;
2807
2808 cf_free(lri);
2809
2810 as_namespace* ns = ssds->ns;
2811
2812 cf_info(AS_DRV_SSD, "device %s: reading device to load index", ssd->name);
2813
2814 CF_ALLOC_SET_NS_ARENA(ns);
2815
2816 ssd_cold_start_sweep(ssds, ssd);
2817
2818 cf_info(AS_DRV_SSD, "device %s: read complete: UNIQUE %lu (REPLACED %lu) (OLDER %lu) (EXPIRED %lu) (EVICTED %lu) records",
2819 ssd->name, ssd->record_add_unique_counter,
2820 ssd->record_add_replace_counter, ssd->record_add_older_counter,
2821 ssd->record_add_expired_counter, ssd->record_add_evicted_counter);
2822
2823 if (cf_rc_release(complete_rc) == 0) {
2824 // All drives are done reading.
2825
2826 ns->loading_records = false;
2827 ssd_cold_start_drop_cenotaphs(ns);
2828 ssd_load_wblock_queues(ssds);
2829
2830 cf_mutex_destroy(&ns->cold_start_evict_lock);
2831
2832 as_truncate_list_cenotaphs(ns);
2833 as_truncate_done_startup(ns); // set truncate last-update-times in sets' vmap
2834
2835 ssd_start_maintenance_threads(ssds);
2836 ssd_start_write_threads(ssds);
2837 ssd_start_defrag_threads(ssds);
2838
2839 void *_t = NULL;
2840
2841 cf_queue_push(complete_q, &_t);
2842 cf_rc_free(complete_rc);
2843 }
2844
2845 return NULL;
2846}
2847
2848
2849void
2850start_loading_records(drv_ssds *ssds, cf_queue *complete_q)
2851{
2852 as_namespace *ns = ssds->ns;
2853
2854 ns->loading_records = true;
2855
2856 void *p = cf_rc_alloc(1);
2857
2858 for (int i = 1; i < ssds->n_ssds; i++) {
2859 cf_rc_reserve(p);
2860 }
2861
2862 for (int i = 0; i < ssds->n_ssds; i++) {
2863 drv_ssd *ssd = &ssds->ssds[i];
2864 ssd_load_records_info *lri = cf_malloc(sizeof(ssd_load_records_info));
2865
2866 lri->ssds = ssds;
2867 lri->ssd = ssd;
2868 lri->complete_q = complete_q;
2869 lri->complete_rc = p;
2870
2871 cf_thread_create_detached(
2872 ns->cold_start ? run_ssd_cold_start : run_ssd_cool_start,
2873 (void*)lri);
2874 }
2875}
2876
2877
2878//==========================================================
2879// Generic startup utilities.
2880//
2881
2882static void
2883ssd_flush_header(drv_ssds *ssds, ssd_device_header **headers)
2884{
2885 uint8_t* buf = cf_valloc(SSD_HEADER_SIZE);
2886
2887 memset(buf, 0, SSD_HEADER_SIZE);
2888 memcpy(buf, ssds->common, sizeof(ssd_device_common));
2889
2890 for (int i = 0; i < ssds->n_ssds; i++) {
2891 memcpy(buf + SSD_OFFSET_UNIQUE, &headers[i]->unique,
2892 sizeof(ssd_device_unique));
2893
2894 ssd_write_header(&ssds->ssds[i], buf, buf, SSD_HEADER_SIZE);
2895 }
2896
2897 cf_free(buf);
2898}
2899
2900
2901// Not called for fresh devices, but called in all (warm/cool/cold) starts.
2902static void
2903ssd_init_pristine_wblock_id(drv_ssd *ssd, uint64_t offset)
2904{
2905 if (offset == 0) {
2906 // Legacy device with data - flag to scan and find id on warm restart.
2907 ssd->pristine_wblock_id = 0;
2908 return;
2909 }
2910
2911 // Round up, in case write-block-size was increased.
2912 ssd->pristine_wblock_id =
2913 (offset + (ssd->write_block_size - 1)) / ssd->write_block_size;
2914}
2915
2916
2917void
2918ssd_init_synchronous(drv_ssds *ssds)
2919{
2920 uint64_t random = 0;
2921
2922 while (random == 0) {
2923 random = cf_get_rand64();
2924 }
2925
2926 int n_ssds = ssds->n_ssds;
2927 as_namespace *ns = ssds->ns;
2928
2929 ssd_device_header *headers[n_ssds];
2930 int first_used = -1;
2931
2932 // Check all the headers. Pick one as the representative.
2933 for (int i = 0; i < n_ssds; i++) {
2934 drv_ssd *ssd = &ssds->ssds[i];
2935
2936 headers[i] = ssd_read_header(ssd);
2937
2938 if (! headers[i]) {
2939 headers[i] = ssd_init_header(ns, ssd);
2940 }
2941 else if (first_used < 0) {
2942 first_used = i;
2943 }
2944 }
2945
2946 if (first_used < 0) {
2947 // Shouldn't find all fresh headers here during warm or cool restart.
2948 if (! ns->cold_start) {
2949 // There's no going back to cold start now - do so the harsh way.
2950 cf_crash(AS_DRV_SSD, "{%s} found all %d devices fresh during %s restart",
2951 ns->name, n_ssds, as_namespace_start_mode_str(ns));
2952 }
2953
2954 cf_info(AS_DRV_SSD, "{%s} found all %d devices fresh, initializing to random %lu",
2955 ns->name, n_ssds, random);
2956
2957 ssds->common = cf_valloc(ROUND_UP_COMMON);
2958 memcpy(ssds->common, &headers[0]->common, ROUND_UP_COMMON);
2959
2960 ssds->common->prefix.n_devices = n_ssds;
2961 ssds->common->prefix.random = random;
2962
2963 for (int i = 0; i < n_ssds; i++) {
2964 headers[i]->unique.device_id = (uint32_t)i;
2965 }
2966
2967 ssd_adjust_versions(ns, ssds->common->pmeta);
2968
2969 ssd_flush_header(ssds, headers);
2970
2971 for (int i = 0; i < n_ssds; i++) {
2972 cf_free(headers[i]);
2973 }
2974
2975 as_truncate_list_cenotaphs(ns); // all will show as cenotaph
2976 as_truncate_done_startup(ns);
2977
2978 ssds->all_fresh = true; // won't need to scan devices
2979
2980 return;
2981 }
2982
2983 // At least one device is not fresh. Check that all non-fresh devices match.
2984
2985 bool fresh_drive = false;
2986 bool non_commit_drive = false;
2987 ssd_common_prefix *prefix_first = &headers[first_used]->common.prefix;
2988
2989 memset(ssds->device_translation, -1, sizeof(ssds->device_translation));
2990
2991 for (int i = 0; i < n_ssds; i++) {
2992 drv_ssd *ssd = &ssds->ssds[i];
2993 ssd_common_prefix *prefix_i = &headers[i]->common.prefix;
2994 uint32_t old_device_id = headers[i]->unique.device_id;
2995
2996 headers[i]->unique.device_id = (uint32_t)i;
2997
2998 // Skip fresh devices.
2999 if (prefix_i->random == 0) {
3000 cf_info(AS_DRV_SSD, "{%s} device %s is empty", ns->name, ssd->name);
3001 fresh_drive = true;
3002 continue;
3003 }
3004
3005 ssd_init_pristine_wblock_id(ssd, headers[i]->unique.pristine_offset);
3006
3007 ssds->device_translation[old_device_id] = (int8_t)i;
3008
3009 if (prefix_first->random != prefix_i->random) {
3010 cf_crash(AS_DRV_SSD, "{%s} drive set with unmatched headers - devices %s & %s have different signatures",
3011 ns->name, ssds->ssds[first_used].name, ssd->name);
3012 }
3013
3014 if (prefix_first->n_devices != prefix_i->n_devices) {
3015 cf_crash(AS_DRV_SSD, "{%s} drive set with unmatched headers - devices %s & %s have different device counts",
3016 ns->name, ssds->ssds[first_used].name, ssd->name);
3017 }
3018
3019 // These should all be 0, unless upgrading from pre-4.5.1.
3020 if (prefix_first->last_evict_void_time !=
3021 prefix_i->last_evict_void_time) {
3022 cf_warning(AS_DRV_SSD, "{%s} devices have inconsistent evict-void-times - ignoring",
3023 ns->name);
3024 prefix_first->last_evict_void_time = 0;
3025 }
3026
3027 if ((prefix_i->flags & SSD_HEADER_FLAG_TRUSTED) == 0) {
3028 cf_info(AS_DRV_SSD, "{%s} device %s prior shutdown not clean",
3029 ns->name, ssd->name);
3030 ns->dirty_restart = true;
3031 }
3032
3033 if ((prefix_i->flags & SSD_HEADER_FLAG_COMMIT_TO_DEVICE) == 0) {
3034 non_commit_drive = true;
3035 }
3036 }
3037
3038 // Handle devices' evict threshold - may be upgrading from pre-4.5.1.
3039 if (prefix_first->last_evict_void_time != 0) {
3040 if (ns->smd_evict_void_time == 0) {
3041 ns->smd_evict_void_time = prefix_first->last_evict_void_time;
3042 // Leave header threshold in case we don't commit SMD threshold.
3043 }
3044 else {
3045 // Use SMD threshold, may now erase header threshold.
3046 prefix_first->last_evict_void_time = 0;
3047 }
3048 }
3049
3050 // Drive set OK - fix up header set.
3051 ssds->common = cf_valloc(ROUND_UP_COMMON);
3052 memcpy(ssds->common, &headers[first_used]->common, ROUND_UP_COMMON);
3053
3054 ssds->common->prefix.n_devices = n_ssds; // may have added fresh drives
3055 ssds->common->prefix.random = random;
3056 ssds->common->prefix.flags &= ~SSD_HEADER_FLAG_TRUSTED;
3057
3058 if (fresh_drive || (ns->dirty_restart && non_commit_drive)) {
3059 ssd_adjust_versions(ns, ssds->common->pmeta);
3060 }
3061
3062 ssd_flush_header(ssds, headers);
3063 ssd_flush_final_cfg(ns);
3064
3065 for (int i = 0; i < n_ssds; i++) {
3066 cf_free(headers[i]);
3067 }
3068
3069 uint32_t now = as_record_void_time_get();
3070
3071 // Sanity check void-times during startup.
3072 ns->startup_max_void_time = now + MAX_ALLOWED_TTL;
3073
3074 // Cache booleans indicating whether partitions are owned or not. Also
3075 // restore tree-ids - note that absent partitions do have tree-ids.
3076 for (uint32_t pid = 0; pid < AS_PARTITIONS; pid++) {
3077 ssd_common_pmeta *pmeta = &ssds->common->pmeta[pid];
3078
3079 ssds->get_state_from_storage[pid] =
3080 as_partition_version_has_data(&pmeta->version);
3081 ns->partitions[pid].tree_id = pmeta->tree_id;
3082 }
3083
3084 // Warm or cool restart.
3085 if (! ns->cold_start) {
3086 as_truncate_done_startup(ns); // set truncate last-update-times in sets' vmap
3087 ssd_resume_devices(ssds);
3088
3089 return; // warm restart, or warm restart phase of cool restart, is done
3090 }
3091
3092 // Cold start - we can now create our partition trees.
3093 for (uint32_t pid = 0; pid < AS_PARTITIONS; pid++) {
3094 if (ssds->get_state_from_storage[pid]) {
3095 as_partition* p = &ns->partitions[pid];
3096
3097 p->tree = as_index_tree_create(&ns->tree_shared, p->tree_id,
3098 as_partition_tree_done, (void*)p);
3099 }
3100 }
3101
3102 // Initialize the cold start expiration and eviction machinery.
3103 cf_mutex_init(&ns->cold_start_evict_lock);
3104 ns->cold_start_now = now;
3105}
3106
3107
3108static uint64_t
3109check_file_size(as_namespace *ns, uint64_t file_size, const char *tag)
3110{
3111 cf_assert(sizeof(off_t) > 4, AS_DRV_SSD, "this OS supports only 32-bit (4g) files - compile with 64 bit offsets");
3112
3113 if (file_size > SSD_HEADER_SIZE) {
3114 off_t unusable_size =
3115 (file_size - SSD_HEADER_SIZE) % ns->storage_write_block_size;
3116
3117 if (unusable_size != 0) {
3118 cf_info(AS_DRV_SSD, "%s size must be header size %u + multiple of %u, rounding down",
3119 tag, SSD_HEADER_SIZE, ns->storage_write_block_size);
3120 file_size -= unusable_size;
3121 }
3122
3123 if (file_size > AS_STORAGE_MAX_DEVICE_SIZE) {
3124 cf_warning(AS_DRV_SSD, "%s size must be <= %ld, trimming original size %ld",
3125 tag, AS_STORAGE_MAX_DEVICE_SIZE, file_size);
3126 file_size = AS_STORAGE_MAX_DEVICE_SIZE;
3127 }
3128 }
3129
3130 if (file_size <= SSD_HEADER_SIZE) {
3131 cf_crash(AS_DRV_SSD, "%s size %ld must be greater than header size %d",
3132 tag, file_size, SSD_HEADER_SIZE);
3133 }
3134
3135 return file_size;
3136}
3137
3138
3139static uint64_t
3140find_io_min_size(int fd, const char *ssd_name)
3141{
3142 uint8_t *buf = cf_valloc(HI_IO_MIN_SIZE);
3143 size_t read_sz = LO_IO_MIN_SIZE;
3144
3145 while (read_sz <= HI_IO_MIN_SIZE) {
3146 if (pread_all(fd, (void*)buf, read_sz, 0)) {
3147 cf_free(buf);
3148 return read_sz;
3149 }
3150
3151 read_sz <<= 1; // LO_IO_MIN_SIZE and HI_IO_MIN_SIZE are powers of 2
3152 }
3153
3154 cf_crash(AS_DRV_SSD, "%s: read failed at all sizes from %u to %u bytes",
3155 ssd_name, LO_IO_MIN_SIZE, HI_IO_MIN_SIZE);
3156
3157 return 0;
3158}
3159
3160
3161void
3162ssd_init_devices(as_namespace *ns, drv_ssds **ssds_p)
3163{
3164 size_t ssds_size = sizeof(drv_ssds) +
3165 (ns->n_storage_devices * sizeof(drv_ssd));
3166 drv_ssds *ssds = cf_malloc(ssds_size);
3167
3168 memset(ssds, 0, ssds_size);
3169 ssds->n_ssds = (int)ns->n_storage_devices;
3170 ssds->ns = ns;
3171
3172 // Raw device-specific initialization of drv_ssd structures.
3173 for (uint32_t i = 0; i < ns->n_storage_devices; i++) {
3174 drv_ssd *ssd = &ssds->ssds[i];
3175
3176 ssd->name = ns->storage_devices[i];
3177
3178 // Note - can't configure commit-to-device and disable-odsync.
3179 ssd->open_flag = O_RDWR | O_DIRECT |
3180 (ns->storage_disable_odsync ? 0 : O_DSYNC);
3181
3182 int fd = open(ssd->name, ssd->open_flag, S_IRUSR | S_IWUSR);
3183
3184 if (fd == -1) {
3185 cf_crash(AS_DRV_SSD, "unable to open device %s: %s", ssd->name,
3186 cf_strerror(errno));
3187 }
3188
3189 uint64_t size = 0;
3190
3191 ioctl(fd, BLKGETSIZE64, &size); // gets the number of bytes
3192
3193 ssd->file_size = check_file_size(ns, size, "usable device");
3194 ssd->io_min_size = find_io_min_size(fd, ssd->name);
3195
3196 if (ns->cold_start && ns->storage_cold_start_empty) {
3197 ssd_empty_header(fd, ssd->name);
3198
3199 cf_info(AS_DRV_SSD, "cold-start-empty - erased header of %s",
3200 ssd->name);
3201 }
3202
3203 close(fd);
3204
3205 ns->ssd_size += ssd->file_size; // increment total storage size
3206
3207 cf_info(AS_DRV_SSD, "opened device %s: usable size %lu, io-min-size %lu",
3208 ssd->name, ssd->file_size, ssd->io_min_size);
3209
3210 if (ns->storage_scheduler_mode) {
3211 // Set scheduler mode specified in config file.
3212 cf_storage_set_scheduler(ssd->name, ns->storage_scheduler_mode);
3213 }
3214 }
3215
3216 *ssds_p = ssds;
3217}
3218
3219
3220void
3221ssd_init_shadow_devices(as_namespace *ns, drv_ssds *ssds)
3222{
3223 if (ns->n_storage_shadows == 0) {
3224 // No shadows - a normal deployment.
3225 return;
3226 }
3227
3228 // Check shadow devices.
3229 for (uint32_t i = 0; i < ns->n_storage_shadows; i++) {
3230 drv_ssd *ssd = &ssds->ssds[i];
3231
3232 ssd->shadow_name = ns->storage_shadows[i];
3233
3234 int fd = open(ssd->shadow_name, ssd->open_flag, S_IRUSR | S_IWUSR);
3235
3236 if (fd == -1) {
3237 cf_crash(AS_DRV_SSD, "unable to open shadow device %s: %s",
3238 ssd->shadow_name, cf_strerror(errno));
3239 }
3240
3241 uint64_t size = 0;
3242
3243 ioctl(fd, BLKGETSIZE64, &size); // gets the number of bytes
3244
3245 if (size < ssd->file_size) {
3246 cf_crash(AS_DRV_SSD, "shadow device %s is smaller than main device - %lu < %lu",
3247 ssd->shadow_name, size, ssd->file_size);
3248 }
3249
3250 ssd->shadow_io_min_size = find_io_min_size(fd, ssd->shadow_name);
3251
3252 if (ns->cold_start && ns->storage_cold_start_empty) {
3253 ssd_empty_header(fd, ssd->shadow_name);
3254
3255 cf_info(AS_DRV_SSD, "cold-start-empty - erased header of %s",
3256 ssd->shadow_name);
3257 }
3258
3259 close(fd);
3260
3261 cf_info(AS_DRV_SSD, "shadow device %s is compatible with main device, shadow-io-min-size %lu",
3262 ssd->shadow_name, ssd->shadow_io_min_size);
3263
3264 if (ns->storage_scheduler_mode) {
3265 // Set scheduler mode specified in config file.
3266 cf_storage_set_scheduler(ssd->shadow_name,
3267 ns->storage_scheduler_mode);
3268 }
3269 }
3270}
3271
3272
3273void
3274ssd_init_files(as_namespace *ns, drv_ssds **ssds_p)
3275{
3276 size_t ssds_size = sizeof(drv_ssds) +
3277 (ns->n_storage_files * sizeof(drv_ssd));
3278 drv_ssds *ssds = cf_malloc(ssds_size);
3279
3280 memset(ssds, 0, ssds_size);
3281 ssds->n_ssds = (int)ns->n_storage_files;
3282 ssds->ns = ns;
3283
3284 // File-specific initialization of drv_ssd structures.
3285 for (uint32_t i = 0; i < ns->n_storage_files; i++) {
3286 drv_ssd *ssd = &ssds->ssds[i];
3287
3288 ssd->name = ns->storage_devices[i];
3289
3290 if (ns->cold_start && ns->storage_cold_start_empty) {
3291 if (unlink(ssd->name) == 0) {
3292 cf_info(AS_DRV_SSD, "cold-start-empty - removed %s", ssd->name);
3293 }
3294 else if (errno == ENOENT) {
3295 cf_info(AS_DRV_SSD, "cold-start-empty - no file %s", ssd->name);
3296 }
3297 else {
3298 cf_crash(AS_DRV_SSD, "failed remove: errno %d", errno);
3299 }
3300 }
3301
3302 // Note - can't configure commit-to-device and disable-odsync.
3303 uint32_t direct_flags =
3304 O_DIRECT | (ns->storage_disable_odsync ? 0 : O_DSYNC);
3305
3306 ssd->open_flag = O_RDWR |
3307 (ns->storage_commit_to_device || ns->storage_direct_files ?
3308 direct_flags : 0);
3309
3310 // Validate that file can be opened, create it if it doesn't exist.
3311 int fd = open(ssd->name, ssd->open_flag | O_CREAT, S_IRUSR | S_IWUSR);
3312
3313 if (fd == -1) {
3314 cf_crash(AS_DRV_SSD, "unable to open file %s: %s", ssd->name,
3315 cf_strerror(errno));
3316 }
3317
3318 ssd->file_size = check_file_size(ns, ns->storage_filesize, "file");
3319 ssd->io_min_size = LO_IO_MIN_SIZE;
3320
3321 // Truncate will grow or shrink the file to the correct size.
3322 if (ftruncate(fd, (off_t)ssd->file_size) != 0) {
3323 cf_crash(AS_DRV_SSD, "unable to truncate file: errno %d", errno);
3324 }
3325
3326 close(fd);
3327
3328 ns->ssd_size += ssd->file_size; // increment total storage size
3329
3330 cf_info(AS_DRV_SSD, "opened file %s: usable size %lu", ssd->name,
3331 ssd->file_size);
3332 }
3333
3334 *ssds_p = ssds;
3335}
3336
3337
3338void
3339ssd_init_shadow_files(as_namespace *ns, drv_ssds *ssds)
3340{
3341 if (ns->n_storage_shadows == 0) {
3342 // No shadows - a normal deployment.
3343 return;
3344 }
3345
3346 // Check shadow files.
3347 for (uint32_t i = 0; i < ns->n_storage_shadows; i++) {
3348 drv_ssd *ssd = &ssds->ssds[i];
3349
3350 ssd->shadow_name = ns->storage_shadows[i];
3351
3352 if (ns->cold_start && ns->storage_cold_start_empty) {
3353 if (unlink(ssd->shadow_name) == 0) {
3354 cf_info(AS_DRV_SSD, "cold-start-empty - removed %s",
3355 ssd->shadow_name);
3356 }
3357 else if (errno == ENOENT) {
3358 cf_info(AS_DRV_SSD, "cold-start-empty - no shadow file %s",
3359 ssd->shadow_name);
3360 }
3361 else {
3362 cf_crash(AS_DRV_SSD, "failed remove: errno %d", errno);
3363 }
3364 }
3365
3366 // Validate that file can be opened, create it if it doesn't exist.
3367 int fd = open(ssd->shadow_name, ssd->open_flag | O_CREAT,
3368 S_IRUSR | S_IWUSR);
3369
3370 if (fd == -1) {
3371 cf_crash(AS_DRV_SSD, "unable to open shadow file %s: %s",
3372 ssd->shadow_name, cf_strerror(errno));
3373 }
3374
3375 // Truncate will grow or shrink the file to the correct size.
3376 if (ftruncate(fd, (off_t)ssd->file_size) != 0) {
3377 cf_crash(AS_DRV_SSD, "unable to truncate file: errno %d", errno);
3378 }
3379
3380 ssd->shadow_io_min_size = LO_IO_MIN_SIZE;
3381
3382 close(fd);
3383
3384 cf_info(AS_DRV_SSD, "shadow file %s is initialized", ssd->shadow_name);
3385 }
3386}
3387
3388
3389//==========================================================
3390// Generic shutdown utilities.
3391//
3392
3393static void
3394ssd_set_pristine_offset(drv_ssds *ssds)
3395{
3396 // Round down to nearest multiple of HI_IO_MIN_SIZE - for simplicity, using
3397 // HI_IO_MIN_SIZE to allocate once outside the loop.
3398 off_t offset = offsetof(ssd_device_header, unique.pristine_offset) &
3399 -(uint64_t)HI_IO_MIN_SIZE;
3400
3401 // pristine_offset is a uint64_t, must sit within HI_IO_MIN_SIZE of offset.
3402 ssd_device_unique *header_unique = cf_valloc(HI_IO_MIN_SIZE);
3403
3404 cf_mutex_lock(&ssds->flush_lock);
3405
3406 for (int i = 0; i < ssds->n_ssds; i++) {
3407 drv_ssd *ssd = &ssds->ssds[i];
3408
3409 int fd = ssd_fd_get(ssd);
3410
3411 if (! pread_all(fd, (void *)header_unique, HI_IO_MIN_SIZE, offset)) {
3412 cf_crash(AS_DRV_SSD, "%s: read failed: errno %d (%s)",
3413 ssd->name, errno, cf_strerror(errno));
3414 }
3415
3416 header_unique->pristine_offset =
3417 (uint64_t)ssd->pristine_wblock_id * ssd->write_block_size;
3418
3419 if (! pwrite_all(fd, (void *)header_unique, HI_IO_MIN_SIZE, offset)) {
3420 cf_crash(AS_DRV_SSD, "%s: DEVICE FAILED write: errno %d (%s)",
3421 ssd->name, errno, cf_strerror(errno));
3422 }
3423
3424 ssd_fd_put(ssd, fd);
3425
3426 // Skip shadow - persisted offset never used at cold start.
3427 }
3428
3429 cf_mutex_unlock(&ssds->flush_lock);
3430
3431 cf_free(header_unique);
3432}
3433
3434
3435static void
3436ssd_set_trusted(drv_ssds *ssds)
3437{
3438 cf_mutex_lock(&ssds->flush_lock);
3439
3440 ssds->common->prefix.flags |= SSD_HEADER_FLAG_TRUSTED;
3441
3442 for (int i = 0; i < ssds->n_ssds; i++) {
3443 drv_ssd *ssd = &ssds->ssds[i];
3444
3445 ssd_write_header(ssd, (uint8_t *)ssds->common,
3446 (uint8_t *)&ssds->common->prefix.flags,
3447 sizeof(ssds->common->prefix.flags));
3448 }
3449
3450 cf_mutex_unlock(&ssds->flush_lock);
3451}
3452
3453
3454//==========================================================
3455// Storage API implementation: startup, shutdown, etc.
3456//
3457
3458void
3459as_storage_namespace_init_ssd(as_namespace *ns)
3460{
3461 drv_ssds *ssds;
3462
3463 if (ns->n_storage_devices != 0) {
3464 ssd_init_devices(ns, &ssds);
3465 ssd_init_shadow_devices(ns, ssds);
3466 }
3467 else {
3468 ssd_init_files(ns, &ssds);
3469 ssd_init_shadow_files(ns, ssds);
3470 }
3471
3472 cf_mutex_init(&ssds->flush_lock);
3473
3474 // Allow defrag to go full speed during startup - restore the configured
3475 // settings when startup is done.
3476 ns->saved_defrag_sleep = ns->storage_defrag_sleep;
3477 ns->storage_defrag_sleep = 0;
3478
3479 // The queue limit is more efficient to work with.
3480 ns->storage_max_write_q = (int)
3481 (ns->storage_max_write_cache / ns->storage_write_block_size);
3482
3483 // Minimize how often we recalculate this.
3484 ns->defrag_lwm_size =
3485 (ns->storage_write_block_size * ns->storage_defrag_lwm_pct) / 100;
3486
3487 ns->storage_private = (void*)ssds;
3488
3489 char histname[HISTOGRAM_NAME_SIZE];
3490
3491 snprintf(histname, sizeof(histname), "{%s}-device-read-size", ns->name);
3492 ns->device_read_size_hist = histogram_create(histname, HIST_SIZE);
3493
3494 snprintf(histname, sizeof(histname), "{%s}-device-write-size", ns->name);
3495 ns->device_write_size_hist = histogram_create(histname, HIST_SIZE);
3496
3497 uint32_t first_wblock_id = SSD_HEADER_SIZE / ns->storage_write_block_size;
3498
3499 // Finish initializing drv_ssd structures (non-zero-value members).
3500 for (int i = 0; i < ssds->n_ssds; i++) {
3501 drv_ssd *ssd = &ssds->ssds[i];
3502
3503 ssd->ns = ns;
3504 ssd->file_id = i;
3505
3506 cf_mutex_init(&ssd->write_lock);
3507 cf_mutex_init(&ssd->defrag_lock);
3508
3509 ssd->running = true;
3510
3511 // Some (non-dynamic) config shortcuts:
3512 ssd->write_block_size = ns->storage_write_block_size;
3513 ssd->first_wblock_id = first_wblock_id;
3514
3515 // Non-fresh devices will initialize this appropriately later.
3516 ssd->pristine_wblock_id = first_wblock_id;
3517
3518 ssd_wblock_init(ssd);
3519
3520 // Note: free_wblock_q, defrag_wblock_q created after loading devices.
3521
3522 ssd->fd_q = cf_queue_create(sizeof(int), true);
3523 ssd->fd_cache_q = cf_queue_create(sizeof(int), true);
3524
3525 if (ssd->shadow_name) {
3526 ssd->shadow_fd_q = cf_queue_create(sizeof(int), true);
3527 }
3528
3529 ssd->swb_write_q = cf_queue_create(sizeof(void*), true);
3530
3531 if (ssd->shadow_name) {
3532 ssd->swb_shadow_q = cf_queue_create(sizeof(void*), true);
3533 }
3534
3535 ssd->swb_free_q = cf_queue_create(sizeof(void*), true);
3536
3537 if (! ns->storage_data_in_memory) {
3538 // TODO - hide the storage_commit_to_device usage.
3539 ssd->post_write_q = cf_queue_create(sizeof(void*),
3540 ns->storage_commit_to_device);
3541 }
3542
3543 snprintf(histname, sizeof(histname), "{%s}-%s-read", ns->name, ssd->name);
3544 ssd->hist_read = histogram_create(histname, HIST_MILLISECONDS);
3545
3546 snprintf(histname, sizeof(histname), "{%s}-%s-large-block-read", ns->name, ssd->name);
3547 ssd->hist_large_block_read = histogram_create(histname, HIST_MILLISECONDS);
3548
3549 snprintf(histname, sizeof(histname), "{%s}-%s-write", ns->name, ssd->name);
3550 ssd->hist_write = histogram_create(histname, HIST_MILLISECONDS);
3551
3552 if (ssd->shadow_name) {
3553 snprintf(histname, sizeof(histname), "{%s}-%s-shadow-write", ns->name, ssd->name);
3554 ssd->hist_shadow_write = histogram_create(histname, HIST_MILLISECONDS);
3555 }
3556
3557 ssd_init_commit(ssd);
3558 }
3559
3560 // Will load headers and, if warm or cool restart, resume persisted index.
3561 ssd_init_synchronous(ssds);
3562}
3563
3564
3565void
3566as_storage_namespace_load_ssd(as_namespace *ns, cf_queue *complete_q)
3567{
3568 drv_ssds *ssds = (drv_ssds*)ns->storage_private;
3569
3570 // If devices have data, and it's cold start or cool restart, scan devices.
3571 if (! ssds->all_fresh &&
3572 (ns->cold_start || as_namespace_cool_restarts(ns))) {
3573 // Fire off threads to scan devices to build index and/or load record
3574 // data into memory - will signal completion when threads are all done.
3575 start_loading_records(ssds, complete_q);
3576 return;
3577 }
3578 // else - fresh devices or warm restart, this namespace is ready to roll.
3579
3580 ssd_load_wblock_queues(ssds);
3581
3582 ssd_start_maintenance_threads(ssds);
3583 ssd_start_write_threads(ssds);
3584 ssd_start_defrag_threads(ssds);
3585
3586 void *_t = NULL;
3587
3588 cf_queue_push(complete_q, &_t);
3589}
3590
3591
3592void
3593as_storage_loading_records_ticker_ssd()
3594{
3595 for (uint32_t i = 0; i < g_config.n_namespaces; i++) {
3596 as_namespace *ns = g_config.namespaces[i];
3597
3598 if (ns->loading_records) {
3599 char buf[2048];
3600 int pos = 0;
3601 drv_ssds *ssds = (drv_ssds*)ns->storage_private;
3602
3603 for (int j = 0; j < ssds->n_ssds; j++) {
3604 drv_ssd *ssd = &ssds->ssds[j];
3605 uint32_t pct = (uint32_t)((ssd->sweep_wblock_id * 100UL) /
3606 (ssd->file_size / ssd->write_block_size));
3607
3608 pos += sprintf(buf + pos, ", %s %u%%", ssd->name, pct);
3609 }
3610
3611 // TODO - conform with new log standard?
3612 if (ns->n_tombstones == 0) {
3613 cf_info(AS_DRV_SSD, "{%s} loaded %lu objects%s", ns->name,
3614 ns->n_objects, buf);
3615 }
3616 else {
3617 cf_info(AS_DRV_SSD, "{%s} loaded %lu objects, %lu tombstones%s",
3618 ns->name, ns->n_objects, ns->n_tombstones, buf);
3619 }
3620 }
3621 }
3622}
3623
3624
3625int
3626as_storage_namespace_destroy_ssd(as_namespace *ns)
3627{
3628 // This is not called - for now we don't bother unwinding.
3629 return 0;
3630}
3631
3632
3633// Note that this is *NOT* the counterpart to as_storage_record_create_ssd()!
3634// That would be as_storage_record_close_ssd(). This is what gets called when a
3635// record is destroyed, to dereference storage.
3636int
3637as_storage_record_destroy_ssd(as_namespace *ns, as_record *r)
3638{
3639 if (STORAGE_RBLOCK_IS_VALID(r->rblock_id) && r->n_rblocks != 0) {
3640 drv_ssds *ssds = (drv_ssds*)ns->storage_private;
3641 drv_ssd *ssd = &ssds->ssds[r->file_id];
3642
3643 ssd_block_free(ssd, r->rblock_id, r->n_rblocks, "destroy");
3644
3645 r->rblock_id = 0;
3646 r->n_rblocks = 0;
3647 }
3648
3649 return 0;
3650}
3651
3652
3653//==========================================================
3654// Storage API implementation: as_storage_rd cycle.
3655//
3656
3657int
3658as_storage_record_create_ssd(as_storage_rd *rd)
3659{
3660 rd->flat = NULL;
3661 rd->flat_end = NULL;
3662 rd->flat_bins = NULL;
3663 rd->flat_n_bins = 0;
3664 rd->read_buf = NULL;
3665 rd->ssd = NULL;
3666
3667 cf_assert(rd->r->rblock_id == 0, AS_DRV_SSD, "unexpected - uninitialized rblock-id");
3668
3669 return 0;
3670}
3671
3672
3673int
3674as_storage_record_open_ssd(as_storage_rd *rd)
3675{
3676 drv_ssds *ssds = (drv_ssds*)rd->ns->storage_private;
3677
3678 rd->flat = NULL;
3679 rd->flat_end = NULL;
3680 rd->flat_bins = NULL;
3681 rd->flat_n_bins = 0;
3682 rd->read_buf = NULL;
3683 rd->ssd = &ssds->ssds[rd->r->file_id];
3684
3685 return 0;
3686}
3687
3688
3689int
3690as_storage_record_close_ssd(as_storage_rd *rd)
3691{
3692 if (rd->read_buf) {
3693 cf_free(rd->read_buf);
3694 rd->read_buf = NULL;
3695 }
3696
3697 rd->flat = NULL;
3698 rd->flat_end = NULL;
3699 rd->flat_bins = NULL;
3700 rd->flat_n_bins = 0;
3701 rd->ssd = NULL;
3702
3703 return 0;
3704}
3705
3706
3707// These are near the top of this file:
3708// as_storage_record_get_n_bins_ssd()
3709// as_storage_record_read_ssd()
3710// as_storage_particle_read_all_ssd()
3711// as_storage_particle_read_and_size_all_ssd()
3712
3713
3714bool
3715as_storage_record_size_and_check_ssd(as_storage_rd *rd)
3716{
3717 return rd->ns->storage_write_block_size >= as_flat_record_size(rd);
3718}
3719
3720
3721//==========================================================
3722// Storage API implementation: storage capacity monitoring.
3723//
3724
3725void
3726as_storage_wait_for_defrag_ssd(as_namespace *ns)
3727{
3728 if (ns->storage_defrag_startup_minimum > 0) {
3729 while (true) {
3730 int avail_pct;
3731
3732 if (0 != as_storage_stats_ssd(ns, &avail_pct, 0)) {
3733 cf_crash(AS_DRV_SSD, "namespace %s storage stats failed",
3734 ns->name);
3735 }
3736
3737 if (avail_pct >= ns->storage_defrag_startup_minimum) {
3738 break;
3739 }
3740
3741 cf_info(AS_DRV_SSD, "namespace %s waiting for defrag: %d pct available, waiting for %d ...",
3742 ns->name, avail_pct, ns->storage_defrag_startup_minimum);
3743
3744 sleep(2);
3745 }
3746 }
3747
3748 // Restore configured defrag throttling values.
3749 ns->storage_defrag_sleep = ns->saved_defrag_sleep;
3750}
3751
3752
3753bool
3754as_storage_overloaded_ssd(as_namespace *ns)
3755{
3756 drv_ssds *ssds = (drv_ssds*)ns->storage_private;
3757 int max_write_q = ns->storage_max_write_q;
3758
3759 // TODO - would be nice to not do this loop every single write transaction!
3760 for (int i = 0; i < ssds->n_ssds; i++) {
3761 drv_ssd *ssd = &ssds->ssds[i];
3762 int qsz = cf_queue_sz(ssd->swb_write_q);
3763
3764 if (qsz > max_write_q) {
3765 cf_ticker_warning(AS_DRV_SSD, "{%s} write fail: queue too deep: exceeds max %d",
3766 ns->name, max_write_q);
3767 return true;
3768 }
3769
3770 if (ssd->shadow_name) {
3771 qsz = cf_queue_sz(ssd->swb_shadow_q);
3772
3773 if (qsz > max_write_q) {
3774 cf_ticker_warning(AS_DRV_SSD, "{%s} write fail: shadow queue too deep: exceeds max %d",
3775 ns->name, max_write_q);
3776 return true;
3777 }
3778 }
3779 }
3780
3781 return false;
3782}
3783
3784
3785bool
3786as_storage_has_space_ssd(as_namespace *ns)
3787{
3788 // Shortcut - assume we can't go from 5% to 0% in 1 ticker interval.
3789 if (ns->storage_last_avail_pct > 5) {
3790 return true;
3791 }
3792 // else - running low on available percent, check rigorously...
3793
3794 drv_ssds* ssds = (drv_ssds*)ns->storage_private;
3795
3796 for (int i = 0; i < ssds->n_ssds; i++) {
3797 if (num_free_wblocks(&ssds->ssds[i]) < min_free_wblocks(ns)) {
3798 return false;
3799 }
3800 }
3801
3802 return true;
3803}
3804
3805
3806void
3807as_storage_defrag_sweep_ssd(as_namespace *ns)
3808{
3809 cf_info(AS_DRV_SSD, "{%s} sweeping all devices for wblocks to defrag ...", ns->name);
3810
3811 drv_ssds* ssds = (drv_ssds*)ns->storage_private;
3812
3813 for (int i = 0; i < ssds->n_ssds; i++) {
3814 cf_atomic32_incr(&ssds->ssds[i].defrag_sweep);
3815 }
3816}
3817
3818
3819//==========================================================
3820// Storage API implementation: data in device headers.
3821//
3822
3823void
3824as_storage_load_regime_ssd(as_namespace *ns)
3825{
3826 drv_ssds* ssds = (drv_ssds*)ns->storage_private;
3827
3828 ns->eventual_regime = ssds->common->prefix.eventual_regime;
3829 ns->rebalance_regime = ns->eventual_regime;
3830}
3831
3832
3833void
3834as_storage_save_regime_ssd(as_namespace *ns)
3835{
3836 drv_ssds* ssds = (drv_ssds*)ns->storage_private;
3837
3838 cf_mutex_lock(&ssds->flush_lock);
3839
3840 ssds->common->prefix.eventual_regime = ns->eventual_regime;
3841
3842 for (int i = 0; i < ssds->n_ssds; i++) {
3843 drv_ssd* ssd = &ssds->ssds[i];
3844
3845 ssd_write_header(ssd, (uint8_t*)ssds->common,
3846 (uint8_t*)&ssds->common->prefix.eventual_regime,
3847 sizeof(ssds->common->prefix.eventual_regime));
3848 }
3849
3850 cf_mutex_unlock(&ssds->flush_lock);
3851}
3852
3853
3854void
3855as_storage_load_roster_generation_ssd(as_namespace *ns)
3856{
3857 drv_ssds* ssds = (drv_ssds*)ns->storage_private;
3858
3859 ns->roster_generation = ssds->common->prefix.roster_generation;
3860}
3861
3862
3863void
3864as_storage_save_roster_generation_ssd(as_namespace *ns)
3865{
3866 drv_ssds* ssds = (drv_ssds*)ns->storage_private;
3867
3868 // Normal for this to not change, cleaner to check here versus outside.
3869 if (ns->roster_generation == ssds->common->prefix.roster_generation) {
3870 return;
3871 }
3872
3873 cf_mutex_lock(&ssds->flush_lock);
3874
3875 ssds->common->prefix.roster_generation = ns->roster_generation;
3876
3877 for (int i = 0; i < ssds->n_ssds; i++) {
3878 drv_ssd* ssd = &ssds->ssds[i];
3879
3880 ssd_write_header(ssd, (uint8_t*)ssds->common,
3881 (uint8_t*)&ssds->common->prefix.roster_generation,
3882 sizeof(ssds->common->prefix.roster_generation));
3883 }
3884
3885 cf_mutex_unlock(&ssds->flush_lock);
3886}
3887
3888
3889void
3890as_storage_load_pmeta_ssd(as_namespace *ns, as_partition *p)
3891{
3892 drv_ssds *ssds = (drv_ssds*)ns->storage_private;
3893 ssd_common_pmeta *pmeta = &ssds->common->pmeta[p->id];
3894
3895 p->version = pmeta->version;
3896}
3897
3898
3899void
3900as_storage_save_pmeta_ssd(as_namespace *ns, const as_partition *p)
3901{
3902 drv_ssds *ssds = (drv_ssds*)ns->storage_private;
3903 ssd_common_pmeta *pmeta = &ssds->common->pmeta[p->id];
3904
3905 cf_mutex_lock(&ssds->flush_lock);
3906
3907 pmeta->version = p->version;
3908 pmeta->tree_id = p->tree_id;
3909
3910 for (int i = 0; i < ssds->n_ssds; i++) {
3911 drv_ssd *ssd = &ssds->ssds[i];
3912
3913 ssd_write_header(ssd, (uint8_t*)ssds->common, (uint8_t*)pmeta,
3914 sizeof(*pmeta));
3915 }
3916
3917 cf_mutex_unlock(&ssds->flush_lock);
3918}
3919
3920
3921void
3922as_storage_cache_pmeta_ssd(as_namespace *ns, const as_partition *p)
3923{
3924 drv_ssds *ssds = (drv_ssds*)ns->storage_private;
3925 ssd_common_pmeta *pmeta = &ssds->common->pmeta[p->id];
3926
3927 pmeta->version = p->version;
3928 pmeta->tree_id = p->tree_id;
3929}
3930
3931
3932void
3933as_storage_flush_pmeta_ssd(as_namespace *ns, uint32_t start_pid,
3934 uint32_t n_partitions)
3935{
3936 drv_ssds *ssds = (drv_ssds*)ns->storage_private;
3937 ssd_common_pmeta *pmeta = &ssds->common->pmeta[start_pid];
3938
3939 cf_mutex_lock(&ssds->flush_lock);
3940
3941 for (int i = 0; i < ssds->n_ssds; i++) {
3942 drv_ssd *ssd = &ssds->ssds[i];
3943
3944 ssd_write_header(ssd, (uint8_t*)ssds->common, (uint8_t*)pmeta,
3945 sizeof(ssd_common_pmeta) * n_partitions);
3946 }
3947
3948 cf_mutex_unlock(&ssds->flush_lock);
3949}
3950
3951
3952//==========================================================
3953// Storage API implementation: statistics.
3954//
3955
3956int
3957as_storage_stats_ssd(as_namespace *ns, int *available_pct,
3958 uint64_t *used_disk_bytes)
3959{
3960 drv_ssds *ssds = (drv_ssds*)ns->storage_private;
3961
3962 if (available_pct) {
3963 *available_pct = 100;
3964
3965 // Find the device with the lowest available percent.
3966 for (int i = 0; i < ssds->n_ssds; i++) {
3967 drv_ssd *ssd = &ssds->ssds[i];
3968 uint64_t pct = (available_size(ssd) * 100) / ssd->file_size;
3969
3970 if (pct < (uint64_t)*available_pct) {
3971 *available_pct = pct;
3972 }
3973 }
3974
3975 // Used for shortcut in as_storage_has_space_ssd(), which is done on a
3976 // per-transaction basis:
3977 ns->storage_last_avail_pct = *available_pct;
3978 }
3979
3980 if (used_disk_bytes) {
3981 uint64_t sz = 0;
3982
3983 for (int i = 0; i < ssds->n_ssds; i++) {
3984 sz += ssds->ssds[i].inuse_size;
3985 }
3986
3987 *used_disk_bytes = sz;
3988 }
3989
3990 return 0;
3991}
3992
3993
3994void
3995as_storage_device_stats_ssd(struct as_namespace_s *ns, uint32_t device_ix,
3996 storage_device_stats *stats)
3997{
3998 drv_ssds *ssds = (drv_ssds*)ns->storage_private;
3999 drv_ssd *ssd = &ssds->ssds[device_ix];
4000
4001 stats->used_sz = ssd->inuse_size;
4002 stats->n_free_wblocks = num_free_wblocks(ssd);
4003
4004 stats->write_q_sz = cf_queue_sz(ssd->swb_write_q);
4005 stats->n_writes = ssd->n_wblock_writes;
4006
4007 stats->defrag_q_sz = cf_queue_sz(ssd->defrag_wblock_q);
4008 stats->n_defrag_reads = ssd->n_defrag_wblock_reads;
4009 stats->n_defrag_writes = ssd->n_defrag_wblock_writes;
4010
4011 stats->shadow_write_q_sz = ssd->swb_shadow_q ?
4012 cf_queue_sz(ssd->swb_shadow_q) : 0;
4013}
4014
4015
4016int
4017as_storage_ticker_stats_ssd(as_namespace *ns)
4018{
4019 histogram_dump(ns->device_read_size_hist);
4020 histogram_dump(ns->device_write_size_hist);
4021
4022 drv_ssds *ssds = (drv_ssds*)ns->storage_private;
4023
4024 for (int i = 0; i < ssds->n_ssds; i++) {
4025 drv_ssd *ssd = &ssds->ssds[i];
4026
4027 histogram_dump(ssd->hist_read);
4028 histogram_dump(ssd->hist_large_block_read);
4029 histogram_dump(ssd->hist_write);
4030
4031 if (ssd->hist_shadow_write) {
4032 histogram_dump(ssd->hist_shadow_write);
4033 }
4034 }
4035
4036 return 0;
4037}
4038
4039
4040int
4041as_storage_histogram_clear_ssd(as_namespace *ns)
4042{
4043 drv_ssds *ssds = (drv_ssds*)ns->storage_private;
4044
4045 for (int i = 0; i < ssds->n_ssds; i++) {
4046 drv_ssd *ssd = &ssds->ssds[i];
4047
4048 histogram_clear(ssd->hist_read);
4049 histogram_clear(ssd->hist_large_block_read);
4050 histogram_clear(ssd->hist_write);
4051
4052 if (ssd->hist_shadow_write) {
4053 histogram_clear(ssd->hist_shadow_write);
4054 }
4055 }
4056
4057 return 0;
4058}
4059
4060
4061//==========================================================
4062// Get record storage metadata.
4063//
4064
4065uint32_t
4066as_storage_record_size_ssd(const as_record *r)
4067{
4068 return N_RBLOCKS_TO_SIZE(r->n_rblocks);
4069}
4070
4071
4072//==========================================================
4073// Shutdown.
4074//
4075
4076void
4077as_storage_shutdown_ssd(as_namespace *ns)
4078{
4079 drv_ssds *ssds = (drv_ssds*)ns->storage_private;
4080
4081 for (int i = 0; i < ssds->n_ssds; i++) {
4082 drv_ssd *ssd = &ssds->ssds[i];
4083
4084 // Stop the maintenance thread from (also) flushing the swbs.
4085 cf_mutex_lock(&ssd->write_lock);
4086 cf_mutex_lock(&ssd->defrag_lock);
4087
4088 // Flush current swb by pushing it to write-q.
4089 if (ssd->current_swb) {
4090 // Clean the end of the buffer before pushing to write-q.
4091 if (ssd->write_block_size > ssd->current_swb->pos) {
4092 memset(&ssd->current_swb->buf[ssd->current_swb->pos], 0,
4093 ssd->write_block_size - ssd->current_swb->pos);
4094 }
4095
4096 cf_queue_push(ssd->swb_write_q, &ssd->current_swb);
4097 ssd->current_swb = NULL;
4098 }
4099
4100 // Flush defrag swb by pushing it to write-q.
4101 if (ssd->defrag_swb) {
4102 // Clean the end of the buffer before pushing to write-q.
4103 if (ssd->write_block_size > ssd->defrag_swb->pos) {
4104 memset(&ssd->defrag_swb->buf[ssd->defrag_swb->pos], 0,
4105 ssd->write_block_size - ssd->defrag_swb->pos);
4106 }
4107
4108 cf_queue_push(ssd->swb_write_q, &ssd->defrag_swb);
4109 ssd->defrag_swb = NULL;
4110 }
4111 }
4112
4113 for (int i = 0; i < ssds->n_ssds; i++) {
4114 drv_ssd *ssd = &ssds->ssds[i];
4115
4116 while (cf_queue_sz(ssd->swb_write_q)) {
4117 usleep(1000);
4118 }
4119
4120 if (ssd->shadow_name) {
4121 while (cf_queue_sz(ssd->swb_shadow_q)) {
4122 usleep(1000);
4123 }
4124 }
4125
4126 ssd->running = false;
4127 }
4128
4129 for (int i = 0; i < ssds->n_ssds; i++) {
4130 drv_ssd *ssd = &ssds->ssds[i];
4131
4132 cf_thread_join(ssd->write_tid);
4133
4134 if (ssd->shadow_name) {
4135 cf_thread_join(ssd->shadow_tid);
4136 }
4137 }
4138
4139 ssd_set_pristine_offset(ssds);
4140 ssd_set_trusted(ssds);
4141}
4142