1 | /* |
2 | * drv_ssd.c |
3 | * |
4 | * Copyright (C) 2009-2016 Aerospike, Inc. |
5 | * |
6 | * Portions may be licensed to Aerospike, Inc. under one or more contributor |
7 | * license agreements. |
8 | * |
9 | * This program is free software: you can redistribute it and/or modify it under |
10 | * the terms of the GNU Affero General Public License as published by the Free |
11 | * Software Foundation, either version 3 of the License, or (at your option) any |
12 | * later version. |
13 | * |
14 | * This program is distributed in the hope that it will be useful, but WITHOUT |
15 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
16 | * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more |
17 | * details. |
18 | * |
19 | * You should have received a copy of the GNU Affero General Public License |
20 | * along with this program. If not, see http://www.gnu.org/licenses/ |
21 | */ |
22 | |
23 | /* SYNOPSIS |
24 | * "file" based storage driver, which applies to both SSD namespaces and, in |
25 | * some cases, to file-backed main-memory namespaces. |
26 | */ |
27 | |
28 | #include "storage/drv_ssd.h" |
29 | |
30 | #include <fcntl.h> |
31 | #include <errno.h> |
32 | #include <stdbool.h> |
33 | #include <stddef.h> |
34 | #include <stdint.h> |
35 | #include <stdio.h> |
36 | #include <string.h> |
37 | #include <time.h> |
38 | #include <unistd.h> |
39 | #include <linux/fs.h> // for BLKGETSIZE64 |
40 | #include <sys/ioctl.h> |
41 | #include <sys/param.h> // for MAX() |
42 | |
43 | #include "aerospike/as_atomic.h" |
44 | #include "citrusleaf/alloc.h" |
45 | #include "citrusleaf/cf_atomic.h" |
46 | #include "citrusleaf/cf_clock.h" |
47 | #include "citrusleaf/cf_digest.h" |
48 | #include "citrusleaf/cf_queue.h" |
49 | #include "citrusleaf/cf_random.h" |
50 | |
51 | #include "bits.h" |
52 | #include "cf_mutex.h" |
53 | #include "cf_thread.h" |
54 | #include "fault.h" |
55 | #include "hist.h" |
56 | #include "vmapx.h" |
57 | |
58 | #include "base/cfg.h" |
59 | #include "base/datamodel.h" |
60 | #include "base/health.h" |
61 | #include "base/index.h" |
62 | #include "base/nsup.h" |
63 | #include "base/proto.h" |
64 | #include "base/secondary_index.h" |
65 | #include "base/truncate.h" |
66 | #include "fabric/partition.h" |
67 | #include "storage/flat.h" |
68 | #include "storage/storage.h" |
69 | #include "transaction/rw_utils.h" |
70 | |
71 | |
72 | //========================================================== |
73 | // Constants. |
74 | // |
75 | |
76 | #define DEFRAG_STARTUP_RESERVE 4 |
77 | #define DEFRAG_RUNTIME_RESERVE 4 |
78 | |
79 | #define WRITE_IN_PLACE 1 |
80 | |
81 | |
82 | //========================================================== |
83 | // Miscellaneous utility functions. |
84 | // |
85 | |
86 | // Get an open file descriptor from the pool, or a fresh one if necessary. |
87 | int |
88 | ssd_fd_get(drv_ssd *ssd) |
89 | { |
90 | int fd = -1; |
91 | int rv = cf_queue_pop(ssd->fd_q, (void*)&fd, CF_QUEUE_NOWAIT); |
92 | |
93 | if (rv != CF_QUEUE_OK) { |
94 | fd = open(ssd->name, ssd->open_flag, S_IRUSR | S_IWUSR); |
95 | |
96 | if (-1 == fd) { |
			cf_crash(AS_DRV_SSD, "%s: DEVICE FAILED open: errno %d (%s)",
98 | ssd->name, errno, cf_strerror(errno)); |
99 | } |
100 | } |
101 | |
102 | return fd; |
103 | } |
104 | |
105 | |
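// Get an open file descriptor for page-cache reads - like ssd_fd_get(), but
// opens without O_DIRECT and O_DSYNC, and draws from a separate pool.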
106 | int |
107 | ssd_fd_cache_get(drv_ssd *ssd) |
108 | { |
109 | int fd = -1; |
110 | int rv = cf_queue_pop(ssd->fd_cache_q, (void*)&fd, CF_QUEUE_NOWAIT); |
111 | |
112 | if (rv != CF_QUEUE_OK) { |
113 | fd = open(ssd->name, ssd->open_flag & ~(O_DIRECT | O_DSYNC), |
114 | S_IRUSR | S_IWUSR); |
115 | |
116 | if (-1 == fd) { |
			cf_crash(AS_DRV_SSD, "%s: DEVICE FAILED open: errno %d (%s)",
118 | ssd->name, errno, cf_strerror(errno)); |
119 | } |
120 | } |
121 | |
122 | return fd; |
123 | } |
124 | |
125 | |
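// Get an open file descriptor for the shadow device, or a fresh one if
// necessary.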
126 | int |
127 | ssd_shadow_fd_get(drv_ssd *ssd) |
128 | { |
129 | int fd = -1; |
130 | int rv = cf_queue_pop(ssd->shadow_fd_q, (void*)&fd, CF_QUEUE_NOWAIT); |
131 | |
132 | if (rv != CF_QUEUE_OK) { |
133 | fd = open(ssd->shadow_name, ssd->open_flag, S_IRUSR | S_IWUSR); |
134 | |
135 | if (-1 == fd) { |
			cf_crash(AS_DRV_SSD, "%s: DEVICE FAILED open: errno %d (%s)",
137 | ssd->shadow_name, errno, cf_strerror(errno)); |
138 | } |
139 | } |
140 | |
141 | return fd; |
142 | } |
143 | |
144 | |
// Save an open file descriptor in the pool.
146 | void |
147 | ssd_fd_put(drv_ssd *ssd, int fd) |
148 | { |
149 | cf_queue_push(ssd->fd_q, (void*)&fd); |
150 | } |
151 | |
152 | |
153 | static inline void |
154 | ssd_fd_cache_put(drv_ssd *ssd, int fd) |
155 | { |
156 | cf_queue_push(ssd->fd_cache_q, (void*)&fd); |
157 | } |
158 | |
159 | |
160 | static inline void |
161 | ssd_shadow_fd_put(drv_ssd *ssd, int fd) |
162 | { |
163 | cf_queue_push(ssd->shadow_fd_q, (void*)&fd); |
164 | } |
165 | |
166 | |
167 | // Decide which device a record belongs on. |
168 | static inline uint32_t |
169 | ssd_get_file_id(drv_ssds *ssds, cf_digest *keyd) |
170 | { |
171 | return *(uint32_t*)&keyd->digest[DIGEST_STORAGE_BASE_BYTE] % ssds->n_ssds; |
172 | } |
173 | |
174 | |
175 | // Put a wblock on the free queue for reuse. |
176 | static inline void |
177 | push_wblock_to_free_q(drv_ssd *ssd, uint32_t wblock_id) |
178 | { |
179 | // Can get here before queue created, e.g. cold start replacing records. |
180 | if (ssd->free_wblock_q == NULL) { |
181 | return; |
182 | } |
183 | |
184 | cf_assert(wblock_id < ssd->n_wblocks, AS_DRV_SSD, |
			"pushing bad wblock_id %d to free_wblock_q", (int32_t)wblock_id);
186 | |
187 | cf_queue_push(ssd->free_wblock_q, &wblock_id); |
188 | } |
189 | |
190 | |
191 | // Put a wblock on the defrag queue. |
192 | static inline void |
193 | push_wblock_to_defrag_q(drv_ssd *ssd, uint32_t wblock_id) |
194 | { |
195 | if (ssd->defrag_wblock_q) { // null until devices are loaded at startup |
196 | ssd->wblock_state[wblock_id].state = WBLOCK_STATE_DEFRAG; |
197 | cf_queue_push(ssd->defrag_wblock_q, &wblock_id); |
198 | cf_atomic64_incr(&ssd->n_defrag_wblock_reads); |
199 | } |
200 | } |
201 | |
202 | |
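// Claim the next pristine (never yet written) wblock, using CAS to race
// other claimants safely. Returns false if no pristine wblocks remain.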
203 | static inline bool |
204 | pop_pristine_wblock_id(drv_ssd *ssd, uint32_t* wblock_id) |
205 | { |
206 | uint32_t id; |
207 | |
208 | while ((id = as_load_uint32(&ssd->pristine_wblock_id)) < ssd->n_wblocks) { |
209 | if (as_cas_uint32(&ssd->pristine_wblock_id, id, id + 1)) { |
210 | *wblock_id = id; |
211 | return true; |
212 | } |
213 | } |
214 | |
215 | return false; // out of space |
216 | } |
217 | |
218 | |
219 | static inline uint32_t |
220 | num_pristine_wblocks(const drv_ssd *ssd) |
221 | { |
222 | return ssd->n_wblocks - ssd->pristine_wblock_id; |
223 | } |
224 | |
225 | |
226 | static inline uint32_t |
227 | num_free_wblocks(const drv_ssd *ssd) |
228 | { |
229 | return cf_queue_sz(ssd->free_wblock_q) + num_pristine_wblocks(ssd); |
230 | } |
231 | |
232 | |
233 | // Available contiguous size. |
234 | static inline uint64_t |
235 | available_size(drv_ssd *ssd) |
236 | { |
	// Note - returns 100% available during cold start, to make it irrelevant
	// in the cold start eviction threshold check.
239 | |
240 | return ssd->free_wblock_q != NULL ? |
241 | (uint64_t)num_free_wblocks(ssd) * ssd->write_block_size : |
242 | ssd->file_size; |
243 | } |
244 | |
245 | |
246 | // Since UDF writes can't yet unwind on failure, we ensure that they'll succeed |
247 | // by checking before writing on all threads that there's at least one wblock |
248 | // per thread. TODO - deprecate this methodology when everything can unwind. |
249 | static inline uint32_t |
250 | min_free_wblocks(const as_namespace *ns) |
251 | { |
252 | return g_config.n_service_threads + // client writes |
253 | g_config.n_fabric_channel_recv_threads[AS_FABRIC_CHANNEL_RW] + // prole writes |
254 | g_config.n_fabric_channel_recv_threads[AS_FABRIC_CHANNEL_BULK] + // migration writes |
255 | 1 + // always 1 defrag thread |
256 | DEFRAG_RUNTIME_RESERVE + // reserve for defrag at runtime |
257 | DEFRAG_STARTUP_RESERVE; // reserve for defrag at startup |
258 | } |
259 | |
260 | |
261 | void |
262 | ssd_release_vacated_wblock(drv_ssd *ssd, uint32_t wblock_id, |
263 | ssd_wblock_state* p_wblock_state) |
264 | { |
265 | cf_assert(p_wblock_state->swb == NULL, AS_DRV_SSD, |
			"device %s: wblock-id %u swb not null while defragging",
267 | ssd->name, wblock_id); |
268 | |
269 | cf_assert(p_wblock_state->state == WBLOCK_STATE_DEFRAG, AS_DRV_SSD, |
			"device %s: wblock-id %u state not DEFRAG while defragging",
271 | ssd->name, wblock_id); |
272 | |
273 | int32_t n_vac_dests = cf_atomic32_decr(&p_wblock_state->n_vac_dests); |
274 | |
275 | if (n_vac_dests > 0) { |
276 | return; |
277 | } |
278 | // else - all wblocks we defragged into have been flushed. |
279 | |
280 | cf_assert(n_vac_dests == 0, AS_DRV_SSD, |
			"device %s: wblock-id %u vacation destinations underflow",
282 | ssd->name, wblock_id); |
283 | |
284 | cf_mutex_lock(&p_wblock_state->LOCK); |
285 | |
286 | p_wblock_state->state = WBLOCK_STATE_NONE; |
287 | |
288 | // Free the wblock if it's empty. |
289 | if (cf_atomic32_get(p_wblock_state->inuse_sz) == 0 && |
290 | // TODO - given assertions above, this condition is superfluous: |
291 | p_wblock_state->swb == NULL) { |
292 | push_wblock_to_free_q(ssd, wblock_id); |
293 | } |
294 | |
295 | cf_mutex_unlock(&p_wblock_state->LOCK); |
296 | } |
297 | |
298 | |
299 | //------------------------------------------------ |
300 | // ssd_write_buf "swb" methods. |
301 | // |
302 | |
303 | #define VACATED_CAPACITY_STEP 128 // allocate in 1K chunks |
304 | |
305 | static inline ssd_write_buf* |
306 | swb_create(drv_ssd *ssd) |
307 | { |
308 | ssd_write_buf *swb = (ssd_write_buf*)cf_malloc(sizeof(ssd_write_buf)); |
309 | |
310 | swb->buf = cf_valloc(ssd->write_block_size); |
311 | |
312 | swb->n_vacated = 0; |
313 | swb->vacated_capacity = VACATED_CAPACITY_STEP; |
314 | swb->vacated_wblocks = |
315 | cf_malloc(sizeof(vacated_wblock) * swb->vacated_capacity); |
316 | |
317 | return swb; |
318 | } |
319 | |
320 | static inline void |
321 | swb_destroy(ssd_write_buf *swb) |
322 | { |
323 | cf_free(swb->vacated_wblocks); |
324 | cf_free(swb->buf); |
325 | cf_free(swb); |
326 | } |
327 | |
328 | static inline void |
329 | swb_reset(ssd_write_buf *swb) |
330 | { |
331 | swb->skip_post_write_q = false; |
332 | swb->wblock_id = STORAGE_INVALID_WBLOCK; |
333 | swb->pos = 0; |
334 | } |
335 | |
336 | #define swb_reserve(_swb) cf_atomic32_incr(&(_swb)->rc) |
337 | |
338 | static inline void |
339 | swb_check_and_reserve(ssd_wblock_state *wblock_state, ssd_write_buf **p_swb) |
340 | { |
341 | cf_mutex_lock(&wblock_state->LOCK); |
342 | |
343 | if (wblock_state->swb != NULL) { |
344 | *p_swb = wblock_state->swb; |
345 | swb_reserve(*p_swb); |
346 | } |
347 | |
348 | cf_mutex_unlock(&wblock_state->LOCK); |
349 | } |
350 | |
351 | static inline void |
352 | swb_release(ssd_write_buf *swb) |
353 | { |
354 | if (0 == cf_atomic32_decr(&swb->rc)) { |
355 | swb_reset(swb); |
356 | |
357 | // Put the swb back on the free queue for reuse. |
358 | cf_queue_push(swb->ssd->swb_free_q, &swb); |
359 | } |
360 | } |
361 | |
362 | static inline void |
363 | swb_dereference_and_release(drv_ssd *ssd, uint32_t wblock_id, |
364 | ssd_write_buf *swb) |
365 | { |
366 | ssd_wblock_state *wblock_state = &ssd->wblock_state[wblock_id]; |
367 | |
368 | cf_mutex_lock(&wblock_state->LOCK); |
369 | |
370 | cf_assert(swb == wblock_state->swb, AS_DRV_SSD, |
			"releasing wrong swb! %p (%d) != %p (%d), thread %d",
372 | swb, (int32_t)swb->wblock_id, wblock_state->swb, |
373 | (int32_t)wblock_state->swb->wblock_id, cf_thread_sys_tid()); |
374 | |
375 | swb_release(wblock_state->swb); |
376 | wblock_state->swb = NULL; |
377 | |
378 | cf_assert(wblock_state->state != WBLOCK_STATE_DEFRAG, AS_DRV_SSD, |
			"device %s: wblock-id %u state is DEFRAG on swb release", ssd->name,
380 | wblock_id); |
381 | |
382 | uint32_t inuse_sz = cf_atomic32_get(wblock_state->inuse_sz); |
383 | |
	// Free the wblock if it's empty - the other gating conditions (no swb,
	// not in DEFRAG state) are guaranteed above.
385 | if (inuse_sz == 0) { |
386 | cf_atomic64_incr(&ssd->n_wblock_direct_frees); |
387 | push_wblock_to_free_q(ssd, wblock_id); |
388 | } |
389 | // Queue wblock for defrag if applicable. |
390 | else if (inuse_sz < ssd->ns->defrag_lwm_size) { |
391 | push_wblock_to_defrag_q(ssd, wblock_id); |
392 | } |
393 | |
394 | cf_mutex_unlock(&wblock_state->LOCK); |
395 | } |
396 | |
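// Get a ready-to-fill write buffer - from the swb free queue if possible,
// otherwise freshly created - and assign it a wblock, preferring recycled
// wblocks over pristine ones. Returns NULL if the device is out of space.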
397 | ssd_write_buf * |
398 | swb_get(drv_ssd *ssd) |
399 | { |
400 | ssd_write_buf *swb; |
401 | |
402 | if (CF_QUEUE_OK != cf_queue_pop(ssd->swb_free_q, &swb, CF_QUEUE_NOWAIT)) { |
403 | swb = swb_create(ssd); |
404 | swb->rc = 0; |
405 | swb->n_writers = 0; |
406 | swb->dirty = false; |
407 | swb->skip_post_write_q = false; |
408 | swb->ssd = ssd; |
409 | swb->wblock_id = STORAGE_INVALID_WBLOCK; |
410 | swb->pos = 0; |
411 | } |
412 | |
413 | // Find a device block to write to. |
414 | if (cf_queue_pop(ssd->free_wblock_q, &swb->wblock_id, CF_QUEUE_NOWAIT) != |
415 | CF_QUEUE_OK && ! pop_pristine_wblock_id(ssd, &swb->wblock_id)) { |
416 | cf_queue_push(ssd->swb_free_q, &swb); |
417 | return NULL; |
418 | } |
419 | |
420 | ssd_wblock_state* p_wblock_state = &ssd->wblock_state[swb->wblock_id]; |
421 | |
422 | uint32_t inuse_sz = cf_atomic32_get(p_wblock_state->inuse_sz); |
423 | |
424 | cf_assert(inuse_sz == 0, AS_DRV_SSD, |
			"device %s: wblock-id %u inuse-size %u off free-q", ssd->name,
426 | swb->wblock_id, inuse_sz); |
427 | |
428 | cf_assert(p_wblock_state->swb == NULL, AS_DRV_SSD, |
			"device %s: wblock-id %u swb not null off free-q", ssd->name,
430 | swb->wblock_id); |
431 | |
432 | cf_assert(p_wblock_state->state != WBLOCK_STATE_DEFRAG, AS_DRV_SSD, |
			"device %s: wblock-id %u state DEFRAG off free-q", ssd->name,
434 | swb->wblock_id); |
435 | |
436 | cf_mutex_lock(&p_wblock_state->LOCK); |
437 | |
438 | swb_reserve(swb); |
439 | p_wblock_state->swb = swb; |
440 | |
441 | cf_mutex_unlock(&p_wblock_state->LOCK); |
442 | |
443 | return swb; |
444 | } |
445 | |
446 | bool |
447 | swb_add_unique_vacated_wblock(ssd_write_buf* swb, uint32_t src_file_id, |
448 | uint32_t src_wblock_id) |
449 | { |
450 | for (uint32_t i = 0; i < swb->n_vacated; i++) { |
451 | vacated_wblock *vw = &swb->vacated_wblocks[i]; |
452 | |
453 | if (vw->wblock_id == src_wblock_id && vw->file_id == src_file_id) { |
454 | return false; // already present |
455 | } |
456 | } |
457 | |
458 | if (swb->n_vacated == swb->vacated_capacity) { |
459 | swb->vacated_capacity += VACATED_CAPACITY_STEP; |
460 | swb->vacated_wblocks = cf_realloc(swb->vacated_wblocks, |
461 | sizeof(vacated_wblock) * swb->vacated_capacity); |
462 | } |
463 | |
464 | swb->vacated_wblocks[swb->n_vacated].file_id = src_file_id; |
465 | swb->vacated_wblocks[swb->n_vacated].wblock_id = src_wblock_id; |
466 | swb->n_vacated++; |
467 | |
468 | return true; // added to list |
469 | } |
470 | |
471 | void |
472 | swb_release_all_vacated_wblocks(ssd_write_buf* swb) |
473 | { |
474 | drv_ssds *ssds = (drv_ssds *)swb->ssd->ns->storage_private; |
475 | |
476 | for (uint32_t i = 0; i < swb->n_vacated; i++) { |
477 | vacated_wblock *vw = &swb->vacated_wblocks[i]; |
478 | |
479 | drv_ssd *src_ssd = &ssds->ssds[vw->file_id]; |
480 | ssd_wblock_state* wblock_state = &src_ssd->wblock_state[vw->wblock_id]; |
481 | |
482 | ssd_release_vacated_wblock(src_ssd, vw->wblock_id, wblock_state); |
483 | } |
484 | |
485 | swb->n_vacated = 0; |
486 | } |
487 | |
488 | // |
489 | // END - ssd_write_buf "swb" methods. |
490 | //------------------------------------------------ |
491 | |
492 | |
// Reduce the wblock's used size. If the result is 0, put the wblock on the
// free queue; if it's below the defrag threshold, put it on the defrag queue.
495 | void |
496 | ssd_block_free(drv_ssd *ssd, uint64_t rblock_id, uint32_t n_rblocks, char *msg) |
497 | { |
498 | // Determine which wblock we're reducing used size in. |
499 | uint64_t start_offset = RBLOCK_ID_TO_OFFSET(rblock_id); |
500 | uint32_t size = N_RBLOCKS_TO_SIZE(n_rblocks); |
501 | uint32_t wblock_id = OFFSET_TO_WBLOCK_ID(ssd, start_offset); |
502 | uint32_t end_wblock_id = OFFSET_TO_WBLOCK_ID(ssd, start_offset + size - 1); |
503 | |
504 | cf_assert(size >= SSD_RECORD_MIN_SIZE, AS_DRV_SSD, |
			"%s: %s: freeing bad size %u rblock_id %lu", ssd->name, msg, size,
506 | rblock_id); |
507 | |
508 | cf_assert(start_offset >= SSD_HEADER_SIZE && |
509 | wblock_id < ssd->n_wblocks && wblock_id == end_wblock_id, |
			AS_DRV_SSD, "%s: %s: freeing bad range rblock_id %lu n_rblocks %u",
511 | ssd->name, msg, rblock_id, n_rblocks); |
512 | |
513 | cf_atomic64_sub(&ssd->inuse_size, (int64_t)size); |
514 | |
515 | ssd_wblock_state *p_wblock_state = &ssd->wblock_state[wblock_id]; |
516 | |
517 | cf_mutex_lock(&p_wblock_state->LOCK); |
518 | |
519 | int64_t resulting_inuse_sz = cf_atomic32_sub(&p_wblock_state->inuse_sz, |
520 | (int32_t)size); |
521 | |
522 | cf_assert(resulting_inuse_sz >= 0 && |
523 | resulting_inuse_sz < (int64_t)ssd->write_block_size, AS_DRV_SSD, |
			"%s: %s: wblock %d %s, subtracted %d now %ld", ssd->name, msg,
			wblock_id, resulting_inuse_sz < 0 ? "over-freed" : "bad inuse_sz",
526 | (int32_t)size, resulting_inuse_sz); |
527 | |
528 | if (p_wblock_state->swb == NULL && |
529 | p_wblock_state->state != WBLOCK_STATE_DEFRAG) { |
530 | // Free wblock if all three gating conditions hold. |
531 | if (resulting_inuse_sz == 0) { |
532 | cf_atomic64_incr(&ssd->n_wblock_direct_frees); |
533 | push_wblock_to_free_q(ssd, wblock_id); |
534 | } |
535 | // Queue wblock for defrag if appropriate. |
536 | else if (resulting_inuse_sz < ssd->ns->defrag_lwm_size) { |
537 | push_wblock_to_defrag_q(ssd, wblock_id); |
538 | } |
539 | } |
540 | |
541 | cf_mutex_unlock(&p_wblock_state->LOCK); |
542 | } |
543 | |
544 | |
// FIXME - what should we really do if n_rblocks on drive doesn't match index?
546 | void |
547 | defrag_move_record(drv_ssd *src_ssd, uint32_t src_wblock_id, |
548 | as_flat_record *flat, as_index *r) |
549 | { |
550 | uint64_t old_rblock_id = r->rblock_id; |
551 | uint32_t old_n_rblocks = r->n_rblocks; |
552 | |
553 | drv_ssds *ssds = (drv_ssds*)src_ssd->ns->storage_private; |
554 | |
555 | // Figure out which device to write to. When replacing an old record, it's |
556 | // possible this is different from the old device (e.g. if we've added a |
557 | // fresh device), so derive it from the digest each time. |
558 | drv_ssd *ssd = &ssds->ssds[ssd_get_file_id(ssds, &flat->keyd)]; |
559 | |
	cf_assert(ssd, AS_DRV_SSD, "{%s} null ssd", ssds->ns->name);
561 | |
562 | uint32_t ssd_n_rblocks = flat->n_rblocks; |
563 | uint32_t write_size = N_RBLOCKS_TO_SIZE(ssd_n_rblocks); |
564 | |
565 | cf_mutex_lock(&ssd->defrag_lock); |
566 | |
567 | ssd_write_buf *swb = ssd->defrag_swb; |
568 | |
569 | if (! swb) { |
570 | swb = swb_get(ssd); |
571 | ssd->defrag_swb = swb; |
572 | |
573 | if (! swb) { |
			cf_warning(AS_DRV_SSD, "defrag_move_record: couldn't get swb");
575 | cf_mutex_unlock(&ssd->defrag_lock); |
576 | return; |
577 | } |
578 | } |
579 | |
	// Check if there's enough space in the defrag buffer - if not, zero any
	// remaining unused space, enqueue the buffer to be flushed to device, and
	// grab a new buffer.
583 | if (write_size > ssd->write_block_size - swb->pos) { |
584 | if (ssd->write_block_size != swb->pos) { |
585 | // Clean the end of the buffer before pushing to write queue. |
586 | memset(swb->buf + swb->pos, 0, ssd->write_block_size - swb->pos); |
587 | } |
588 | |
589 | // Enqueue the buffer, to be flushed to device. |
590 | swb->skip_post_write_q = true; |
591 | cf_queue_push(ssd->swb_write_q, &swb); |
592 | cf_atomic64_incr(&ssd->n_defrag_wblock_writes); |
593 | |
594 | // Get the new buffer. |
595 | swb = swb_get(ssd); |
596 | ssd->defrag_swb = swb; |
597 | |
598 | if (! swb) { |
			cf_warning(AS_DRV_SSD, "defrag_move_record: couldn't get swb");
600 | cf_mutex_unlock(&ssd->defrag_lock); |
601 | return; |
602 | } |
603 | } |
604 | |
605 | memcpy(swb->buf + swb->pos, (const uint8_t*)flat, write_size); |
606 | |
607 | uint64_t write_offset = WBLOCK_ID_TO_OFFSET(ssd, swb->wblock_id) + swb->pos; |
608 | |
609 | ssd_encrypt(ssd, write_offset, (as_flat_record *)(swb->buf + swb->pos)); |
610 | |
611 | r->file_id = ssd->file_id; |
612 | r->rblock_id = OFFSET_TO_RBLOCK_ID(write_offset); |
613 | r->n_rblocks = ssd_n_rblocks; |
614 | |
615 | swb->pos += write_size; |
616 | |
617 | cf_atomic64_add(&ssd->inuse_size, (int64_t)write_size); |
618 | cf_atomic32_add(&ssd->wblock_state[swb->wblock_id].inuse_sz, |
619 | (int32_t)write_size); |
620 | |
621 | // If we just defragged into a new destination swb, count it. |
622 | if (swb_add_unique_vacated_wblock(swb, src_ssd->file_id, src_wblock_id)) { |
623 | ssd_wblock_state* p_wblock_state = |
624 | &src_ssd->wblock_state[src_wblock_id]; |
625 | |
626 | cf_atomic32_incr(&p_wblock_state->n_vac_dests); |
627 | } |
628 | |
629 | cf_mutex_unlock(&ssd->defrag_lock); |
630 | |
	ssd_block_free(src_ssd, old_rblock_id, old_n_rblocks, "defrag-write");
632 | } |
633 | |
634 | |
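// If the index still points at this copy of the record, move it to a defrag
// destination buffer. Returns 0 if the record was moved, -1 if it was
// superseded, -2 if it was deleted.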
635 | int |
636 | ssd_record_defrag(drv_ssd *ssd, uint32_t wblock_id, as_flat_record *flat, |
637 | uint64_t rblock_id) |
638 | { |
639 | as_namespace *ns = ssd->ns; |
640 | as_partition_reservation rsv; |
641 | uint32_t pid = as_partition_getid(&flat->keyd); |
642 | |
643 | as_partition_reserve(ns, pid, &rsv); |
644 | |
645 | int rv; |
646 | as_index_ref r_ref; |
647 | bool found = 0 == as_record_get(rsv.tree, &flat->keyd, &r_ref); |
648 | |
649 | if (found) { |
650 | as_index *r = r_ref.r; |
651 | |
652 | if (r->file_id == ssd->file_id && r->rblock_id == rblock_id) { |
653 | if (r->generation != flat->generation) { |
				cf_warning_digest(AS_DRV_SSD, &r->keyd, "device %s defrag: rblock_id %lu generation mismatch (%u:%u) ",
655 | ssd->name, rblock_id, r->generation, flat->generation); |
656 | } |
657 | |
658 | if (r->n_rblocks != flat->n_rblocks) { |
				cf_warning_digest(AS_DRV_SSD, &r->keyd, "device %s defrag: rblock_id %lu n_blocks mismatch (%u:%u) ",
660 | ssd->name, rblock_id, r->n_rblocks, flat->n_rblocks); |
661 | } |
662 | |
663 | defrag_move_record(ssd, wblock_id, flat, r); |
664 | |
665 | rv = 0; // record was in index tree and current - moved it |
666 | } |
667 | else { |
668 | rv = -1; // record was in index tree - presumably was overwritten |
669 | } |
670 | |
671 | as_record_done(&r_ref, ns); |
672 | } |
673 | else { |
674 | rv = -2; // record was not in index tree - presumably was deleted |
675 | } |
676 | |
677 | as_partition_release(&rsv); |
678 | |
679 | return rv; |
680 | } |
681 | |
682 | |
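// Check whether the device is too full to safely defrag this (non-empty)
// wblock. If so, re-queue the wblock and stall briefly, waiting for source
// wblocks to empty out.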
683 | bool |
684 | ssd_is_full(drv_ssd *ssd, uint32_t wblock_id) |
685 | { |
686 | if (num_free_wblocks(ssd) > DEFRAG_STARTUP_RESERVE) { |
687 | return false; |
688 | } |
689 | |
690 | ssd_wblock_state* p_wblock_state = &ssd->wblock_state[wblock_id]; |
691 | |
692 | cf_mutex_lock(&p_wblock_state->LOCK); |
693 | |
694 | if (cf_atomic32_get(p_wblock_state->inuse_sz) == 0) { |
695 | // Lucky - wblock is empty, let ssd_defrag_wblock() free it. |
696 | cf_mutex_unlock(&p_wblock_state->LOCK); |
697 | |
698 | return false; |
699 | } |
700 | |
	cf_warning(AS_DRV_SSD, "{%s}: defrag: drive %s totally full, re-queuing wblock %u",
702 | ssd->ns->name, ssd->name, wblock_id); |
703 | |
704 | // Not using push_wblock_to_defrag_q() - state is already DEFRAG, we |
705 | // definitely have a queue, and it's better to push back to head. |
706 | cf_queue_push_head(ssd->defrag_wblock_q, &wblock_id); |
707 | |
708 | cf_mutex_unlock(&p_wblock_state->LOCK); |
709 | |
710 | // If we got here, we used all our runtime reserve wblocks, but the wblocks |
711 | // we defragged must still have non-zero inuse_sz. Must wait for those to |
712 | // become free. Sleep prevents retries from overwhelming the log. |
713 | sleep(1); |
714 | |
715 | return true; |
716 | } |
717 | |
718 | |
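// Defrag one wblock - read it from the device, walk its records, and move
// those that are still current. Returns the number of records moved.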
719 | int |
720 | ssd_defrag_wblock(drv_ssd *ssd, uint32_t wblock_id, uint8_t *read_buf) |
721 | { |
722 | if (ssd_is_full(ssd, wblock_id)) { |
723 | return 0; |
724 | } |
725 | |
726 | int record_count = 0; |
727 | |
728 | ssd_wblock_state* p_wblock_state = &ssd->wblock_state[wblock_id]; |
729 | |
730 | cf_assert(p_wblock_state->n_vac_dests == 0, AS_DRV_SSD, |
			"n-vacations not 0 beginning defrag wblock");
732 | |
733 | // Make sure this can't decrement to 0 while defragging this wblock. |
734 | cf_atomic32_set(&p_wblock_state->n_vac_dests, 1); |
735 | |
736 | if (cf_atomic32_get(p_wblock_state->inuse_sz) == 0) { |
737 | cf_atomic64_incr(&ssd->n_wblock_defrag_io_skips); |
738 | goto Finished; |
739 | } |
740 | |
741 | int fd = ssd_fd_get(ssd); |
742 | uint64_t file_offset = WBLOCK_ID_TO_OFFSET(ssd, wblock_id); |
743 | |
744 | uint64_t start_ns = ssd->ns->storage_benchmarks_enabled ? cf_getns() : 0; |
745 | |
746 | if (! pread_all(fd, read_buf, ssd->write_block_size, (off_t)file_offset)) { |
		cf_warning(AS_DRV_SSD, "%s: read failed: errno %d (%s)", ssd->name,
748 | errno, cf_strerror(errno)); |
749 | close(fd); |
750 | fd = -1; |
751 | goto Finished; |
752 | } |
753 | |
754 | if (start_ns != 0) { |
755 | histogram_insert_data_point(ssd->hist_large_block_read, start_ns); |
756 | } |
757 | |
758 | ssd_fd_put(ssd, fd); |
759 | |
760 | bool prefetch = cf_arenax_want_prefetch(ssd->ns->arena); |
761 | |
762 | if (prefetch) { |
763 | ssd_prefetch_wblock(ssd, file_offset, read_buf); |
764 | } |
765 | |
766 | size_t indent = 0; // current offset within the wblock, in bytes |
767 | |
768 | while (indent < ssd->write_block_size && |
769 | cf_atomic32_get(p_wblock_state->inuse_sz) != 0) { |
770 | as_flat_record *flat = (as_flat_record*)&read_buf[indent]; |
771 | |
772 | if (! prefetch) { |
773 | ssd_decrypt(ssd, file_offset + indent, flat); |
774 | } |
775 | |
776 | if (flat->magic != AS_FLAT_MAGIC) { |
777 | // First block must have magic. |
778 | if (indent == 0) { |
				cf_warning(AS_DRV_SSD, "%s: no magic at beginning of used wblock %d",
780 | ssd->name, wblock_id); |
781 | break; |
782 | } |
783 | |
			// Later rblocks may have no magic - just skip to the next rblock.
785 | indent += RBLOCK_SIZE; |
786 | continue; |
787 | } |
788 | |
789 | uint32_t record_size = N_RBLOCKS_TO_SIZE(flat->n_rblocks); |
790 | |
791 | if (record_size < SSD_RECORD_MIN_SIZE) { |
			cf_warning(AS_DRV_SSD, "%s: record too small: size %u", ssd->name,
793 | record_size); |
794 | indent += RBLOCK_SIZE; |
795 | continue; // try next rblock |
796 | } |
797 | |
798 | size_t next_indent = indent + record_size; |
799 | |
800 | if (next_indent > ssd->write_block_size) { |
			cf_warning(AS_DRV_SSD, "%s: record crosses wblock boundary: n-rblocks %u",
802 | ssd->name, flat->n_rblocks); |
803 | break; |
804 | } |
805 | |
806 | // Found a good record, move it if it's current. |
807 | int rv = ssd_record_defrag(ssd, wblock_id, flat, |
808 | OFFSET_TO_RBLOCK_ID(file_offset + indent)); |
809 | |
810 | if (rv == 0) { |
811 | record_count++; |
812 | } |
813 | |
814 | indent = next_indent; |
815 | } |
816 | |
817 | Finished: |
818 | |
819 | // Note - usually wblock's inuse_sz is 0 here, but may legitimately be non-0 |
820 | // e.g. if a dropped partition's tree is not done purging. In this case, we |
821 | // may have found deleted records in the wblock whose used-size contribution |
822 | // has not yet been subtracted. |
823 | |
824 | ssd_release_vacated_wblock(ssd, wblock_id, p_wblock_state); |
825 | |
826 | return record_count; |
827 | } |
828 | |
829 | |
830 | // Thread "run" function to service a device's defrag queue. |
831 | void* |
832 | run_defrag(void *pv_data) |
833 | { |
834 | drv_ssd *ssd = (drv_ssd*)pv_data; |
835 | uint32_t wblock_id; |
836 | uint8_t *read_buf = cf_valloc(ssd->write_block_size); |
837 | |
838 | while (true) { |
839 | uint32_t q_min = ssd->ns->storage_defrag_queue_min; |
840 | |
841 | if (q_min != 0) { |
842 | if (cf_queue_sz(ssd->defrag_wblock_q) > q_min) { |
843 | if (CF_QUEUE_OK != |
844 | cf_queue_pop(ssd->defrag_wblock_q, &wblock_id, |
845 | CF_QUEUE_NOWAIT)) { |
846 | // Should never get here! |
847 | break; |
848 | } |
849 | } |
850 | else { |
851 | usleep(1000 * 50); |
852 | continue; |
853 | } |
854 | } |
855 | else { |
856 | if (CF_QUEUE_OK != |
857 | cf_queue_pop(ssd->defrag_wblock_q, &wblock_id, |
858 | CF_QUEUE_FOREVER)) { |
859 | // Should never get here! |
860 | break; |
861 | } |
862 | } |
863 | |
864 | ssd_defrag_wblock(ssd, wblock_id, read_buf); |
865 | |
866 | uint32_t sleep_us = ssd->ns->storage_defrag_sleep; |
867 | |
868 | if (sleep_us != 0) { |
869 | usleep(sleep_us); |
870 | } |
871 | } |
872 | |
	// Although we never expect to get here...
	cf_free(read_buf);
	cf_warning(AS_DRV_SSD, "device %s: quit defrag - queue error", ssd->name);
876 | |
877 | return NULL; |
878 | } |
879 | |
880 | |
881 | void |
882 | ssd_start_defrag_threads(drv_ssds *ssds) |
883 | { |
	cf_info(AS_DRV_SSD, "{%s} starting defrag threads", ssds->ns->name);
885 | |
886 | for (int i = 0; i < ssds->n_ssds; i++) { |
887 | drv_ssd *ssd = &ssds->ssds[i]; |
888 | |
889 | cf_thread_create_detached(run_defrag, (void*)ssd); |
890 | } |
891 | } |
892 | |
893 | |
894 | //------------------------------------------------ |
895 | // defrag_pen class. |
896 | // |
897 | |
898 | #define DEFRAG_PEN_INIT_CAPACITY (8 * 1024) |
899 | |
900 | typedef struct defrag_pen_s { |
901 | uint32_t n_ids; |
902 | uint32_t capacity; |
903 | uint32_t *ids; |
904 | uint32_t stack_ids[DEFRAG_PEN_INIT_CAPACITY]; |
905 | } defrag_pen; |
906 | |
907 | static void |
908 | defrag_pen_init(defrag_pen *pen) |
909 | { |
910 | pen->n_ids = 0; |
911 | pen->capacity = DEFRAG_PEN_INIT_CAPACITY; |
912 | pen->ids = pen->stack_ids; |
913 | } |
914 | |
915 | static void |
916 | defrag_pen_destroy(defrag_pen *pen) |
917 | { |
918 | if (pen->ids != pen->stack_ids) { |
919 | cf_free(pen->ids); |
920 | } |
921 | } |
922 | |
923 | static void |
924 | defrag_pen_add(defrag_pen *pen, uint32_t wblock_id) |
925 | { |
926 | if (pen->n_ids == pen->capacity) { |
927 | if (pen->capacity == DEFRAG_PEN_INIT_CAPACITY) { |
928 | pen->capacity <<= 2; |
929 | pen->ids = cf_malloc(pen->capacity * sizeof(uint32_t)); |
930 | memcpy(pen->ids, pen->stack_ids, sizeof(pen->stack_ids)); |
931 | } |
932 | else { |
933 | pen->capacity <<= 1; |
934 | pen->ids = cf_realloc(pen->ids, pen->capacity * sizeof(uint32_t)); |
935 | } |
936 | } |
937 | |
938 | pen->ids[pen->n_ids++] = wblock_id; |
939 | } |
940 | |
941 | static void |
942 | defrag_pen_transfer(defrag_pen *pen, drv_ssd *ssd) |
943 | { |
944 | // For speed, "customize" instead of using push_wblock_to_defrag_q()... |
945 | for (uint32_t i = 0; i < pen->n_ids; i++) { |
946 | uint32_t wblock_id = pen->ids[i]; |
947 | |
948 | ssd->wblock_state[wblock_id].state = WBLOCK_STATE_DEFRAG; |
949 | cf_queue_push(ssd->defrag_wblock_q, &wblock_id); |
950 | } |
951 | } |
952 | |
953 | static void |
954 | defrag_pens_dump(defrag_pen pens[], uint32_t n_pens, const char* ssd_name) |
955 | { |
956 | char buf[2048]; |
957 | uint32_t n = 0; |
	int pos = sprintf(buf, "%u", pens[n++].n_ids);
959 | |
960 | while (n < n_pens) { |
		pos += sprintf(buf + pos, ",%u", pens[n++].n_ids);
962 | } |
963 | |
	cf_info(AS_DRV_SSD, "%s init defrag profile: %s", ssd_name, buf);
965 | } |
966 | |
967 | // |
968 | // END - defrag_pen class. |
969 | //------------------------------------------------ |
970 | |
971 | |
972 | // Thread "run" function to create and load a device's (wblock) free & defrag |
973 | // queues at startup. Sorts defrag-eligible wblocks so the most depleted ones |
974 | // are at the head of the defrag queue. |
975 | void* |
976 | run_load_queues(void *pv_data) |
977 | { |
978 | drv_ssd *ssd = (drv_ssd*)pv_data; |
979 | |
980 | ssd->free_wblock_q = cf_queue_create(sizeof(uint32_t), true); |
981 | ssd->defrag_wblock_q = cf_queue_create(sizeof(uint32_t), true); |
982 | |
983 | as_namespace *ns = ssd->ns; |
984 | uint32_t lwm_pct = ns->storage_defrag_lwm_pct; |
985 | uint32_t lwm_size = ns->defrag_lwm_size; |
986 | defrag_pen pens[lwm_pct]; |
987 | |
988 | for (uint32_t n = 0; n < lwm_pct; n++) { |
989 | defrag_pen_init(&pens[n]); |
990 | } |
991 | |
992 | uint32_t first_id = ssd->first_wblock_id; |
993 | uint32_t end_id = ssd->pristine_wblock_id; |
994 | |
995 | // TODO - paranoia - remove eventually. |
996 | cf_assert(end_id >= first_id && end_id <= ssd->n_wblocks, AS_DRV_SSD, |
			"%s bad pristine-wblock-id %u", ssd->name, end_id);
998 | |
999 | for (uint32_t wblock_id = first_id; wblock_id < end_id; wblock_id++) { |
1000 | uint32_t inuse_sz = ssd->wblock_state[wblock_id].inuse_sz; |
1001 | |
1002 | if (inuse_sz == 0) { |
1003 | // Faster than using push_wblock_to_free_q() here... |
1004 | cf_queue_push(ssd->free_wblock_q, &wblock_id); |
1005 | } |
1006 | else if (inuse_sz < lwm_size) { |
1007 | defrag_pen_add(&pens[(inuse_sz * lwm_pct) / lwm_size], wblock_id); |
1008 | } |
1009 | } |
1010 | |
1011 | defrag_pens_dump(pens, lwm_pct, ssd->name); |
1012 | |
1013 | for (uint32_t n = 0; n < lwm_pct; n++) { |
1014 | defrag_pen_transfer(&pens[n], ssd); |
1015 | defrag_pen_destroy(&pens[n]); |
1016 | } |
1017 | |
1018 | ssd->n_defrag_wblock_reads = (uint64_t)cf_queue_sz(ssd->defrag_wblock_q); |
1019 | |
1020 | return NULL; |
1021 | } |
1022 | |
1023 | |
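// Create and load all devices' (wblock) free & defrag queues at startup,
// using one thread per device.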
1024 | void |
1025 | ssd_load_wblock_queues(drv_ssds *ssds) |
1026 | { |
	cf_info(AS_DRV_SSD, "{%s} loading free & defrag queues", ssds->ns->name);
1028 | |
1029 | // Split this task across multiple threads. |
1030 | cf_tid tids[ssds->n_ssds]; |
1031 | |
1032 | for (int i = 0; i < ssds->n_ssds; i++) { |
1033 | drv_ssd *ssd = &ssds->ssds[i]; |
1034 | |
1035 | tids[i] = cf_thread_create_joinable(run_load_queues, (void*)ssd); |
1036 | } |
1037 | |
1038 | for (int i = 0; i < ssds->n_ssds; i++) { |
1039 | cf_thread_join(tids[i]); |
1040 | } |
1041 | // Now we're single-threaded again. |
1042 | |
1043 | for (int i = 0; i < ssds->n_ssds; i++) { |
1044 | drv_ssd *ssd = &ssds->ssds[i]; |
1045 | |
		cf_info(AS_DRV_SSD, "%s init wblocks: pristine-id %u pristine %u free-q %d, defrag-q %d",
1047 | ssd->name, ssd->pristine_wblock_id, num_pristine_wblocks(ssd), |
1048 | cf_queue_sz(ssd->free_wblock_q), |
1049 | cf_queue_sz(ssd->defrag_wblock_q)); |
1050 | } |
1051 | } |
1052 | |
1053 | |
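// Allocate and initialize a device's per-wblock state array.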
1054 | void |
1055 | ssd_wblock_init(drv_ssd *ssd) |
1056 | { |
1057 | uint32_t n_wblocks = (uint32_t)(ssd->file_size / ssd->write_block_size); |
1058 | |
	cf_info(AS_DRV_SSD, "%s has %u wblocks of size %u", ssd->name, n_wblocks,
1060 | ssd->write_block_size); |
1061 | |
1062 | ssd->n_wblocks = n_wblocks; |
1063 | ssd->wblock_state = cf_malloc(n_wblocks * sizeof(ssd_wblock_state)); |
1064 | |
1065 | // Device header wblocks' inuse_sz will (also) be 0 but that doesn't matter. |
1066 | for (uint32_t i = 0; i < n_wblocks; i++) { |
1067 | ssd_wblock_state * p_wblock_state = &ssd->wblock_state[i]; |
1068 | |
1069 | cf_atomic32_set(&p_wblock_state->inuse_sz, 0); |
1070 | cf_mutex_init(&p_wblock_state->LOCK); |
1071 | p_wblock_state->swb = NULL; |
1072 | p_wblock_state->state = WBLOCK_STATE_NONE; |
1073 | p_wblock_state->n_vac_dests = 0; |
1074 | } |
1075 | } |
1076 | |
1077 | |
1078 | //========================================================== |
1079 | // Record reading utilities. |
1080 | // |
1081 | |
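// Read a record's flat data into rd - from its write buffer if the data is
// still cached in memory, otherwise from the device. If pickle_only is true,
// skip decompressing bins and extracting the key.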
1082 | int |
1083 | ssd_read_record(as_storage_rd *rd, bool pickle_only) |
1084 | { |
1085 | as_namespace *ns = rd->ns; |
1086 | as_record *r = rd->r; |
1087 | drv_ssd *ssd = rd->ssd; |
1088 | |
1089 | if (STORAGE_RBLOCK_IS_INVALID(r->rblock_id)) { |
		cf_warning_digest(AS_DRV_SSD, &r->keyd, "{%s} read_ssd: invalid rblock_id ",
1091 | ns->name); |
1092 | return -1; |
1093 | } |
1094 | |
1095 | uint64_t record_offset = RBLOCK_ID_TO_OFFSET(r->rblock_id); |
1096 | uint32_t record_size = N_RBLOCKS_TO_SIZE(r->n_rblocks); |
1097 | uint64_t record_end_offset = record_offset + record_size; |
1098 | |
1099 | uint32_t wblock_id = OFFSET_TO_WBLOCK_ID(ssd, record_offset); |
1100 | |
1101 | if (wblock_id >= ssd->n_wblocks) { |
		cf_warning_digest(AS_DRV_SSD, &r->keyd, "{%s} read_ssd: bad offset %lu ",
1103 | ns->name, record_offset); |
1104 | return -1; |
1105 | } |
1106 | |
1107 | if (record_size < SSD_RECORD_MIN_SIZE) { |
		cf_warning_digest(AS_DRV_SSD, &r->keyd, "{%s} read_ssd: bad record size %u ",
1109 | ns->name, record_size); |
1110 | return -1; |
1111 | } |
1112 | |
1113 | if (record_end_offset > WBLOCK_ID_TO_OFFSET(ssd, wblock_id + 1)) { |
		cf_warning_digest(AS_DRV_SSD, &r->keyd, "{%s} read_ssd: record size %u crosses wblock boundary ",
1115 | ns->name, record_size); |
1116 | return -1; |
1117 | } |
1118 | |
1119 | uint8_t *read_buf = NULL; |
1120 | as_flat_record *flat = NULL; |
1121 | |
1122 | ssd_write_buf *swb = NULL; |
1123 | |
1124 | swb_check_and_reserve(&ssd->wblock_state[wblock_id], &swb); |
1125 | |
1126 | if (swb) { |
1127 | // Data is in write buffer, so read it from there. |
1128 | cf_atomic32_incr(&ns->n_reads_from_cache); |
1129 | |
1130 | read_buf = cf_malloc(record_size); |
1131 | flat = (as_flat_record*)read_buf; |
1132 | |
1133 | int swb_offset = record_offset - WBLOCK_ID_TO_OFFSET(ssd, wblock_id); |
1134 | memcpy(read_buf, swb->buf + swb_offset, record_size); |
1135 | swb_release(swb); |
1136 | |
1137 | ssd_decrypt_whole(ssd, record_offset, r->n_rblocks, flat); |
1138 | } |
1139 | else { |
1140 | // Normal case - data is read from device. |
1141 | cf_atomic32_incr(&ns->n_reads_from_device); |
1142 | |
1143 | uint64_t read_offset = BYTES_DOWN_TO_IO_MIN(ssd, record_offset); |
1144 | uint64_t read_end_offset = BYTES_UP_TO_IO_MIN(ssd, record_end_offset); |
1145 | size_t read_size = read_end_offset - read_offset; |
1146 | uint64_t record_buf_indent = record_offset - read_offset; |
1147 | |
1148 | read_buf = cf_valloc(read_size); |
1149 | |
1150 | int fd = rd->read_page_cache ? ssd_fd_cache_get(ssd) : ssd_fd_get(ssd); |
1151 | |
1152 | uint64_t start_ns = ns->storage_benchmarks_enabled ? cf_getns() : 0; |
1153 | uint64_t start_us = as_health_sample_device_read() ? cf_getus() : 0; |
1154 | |
1155 | if (! pread_all(fd, read_buf, read_size, (off_t)read_offset)) { |
			cf_warning(AS_DRV_SSD, "%s: read failed: size %lu: errno %d (%s)",
1157 | ssd->name, read_size, errno, cf_strerror(errno)); |
1158 | cf_free(read_buf); |
1159 | close(fd); |
1160 | return -1; |
1161 | } |
1162 | |
1163 | if (start_ns != 0) { |
1164 | histogram_insert_data_point(ssd->hist_read, start_ns); |
1165 | } |
1166 | |
1167 | as_health_add_device_latency(ns->id, r->file_id, start_us); |
1168 | |
1169 | if (rd->read_page_cache) { |
1170 | ssd_fd_cache_put(ssd, fd); |
1171 | } |
1172 | else { |
1173 | ssd_fd_put(ssd, fd); |
1174 | } |
1175 | |
1176 | flat = (as_flat_record*)(read_buf + record_buf_indent); |
1177 | ssd_decrypt_whole(ssd, record_offset, r->n_rblocks, flat); |
1178 | |
1179 | // Sanity checks. |
1180 | |
1181 | if (flat->magic != AS_FLAT_MAGIC) { |
			cf_warning(AS_DRV_SSD, "read: bad block magic offset %lu",
1183 | read_offset); |
1184 | cf_free(read_buf); |
1185 | return -1; |
1186 | } |
1187 | |
1188 | if (flat->n_rblocks != r->n_rblocks) { |
			cf_warning(AS_DRV_SSD, "read: bad n-rblocks %u %u",
1190 | flat->n_rblocks, r->n_rblocks); |
1191 | cf_free(read_buf); |
1192 | return -1; |
1193 | } |
1194 | |
1195 | if (0 != cf_digest_compare(&flat->keyd, &r->keyd)) { |
			cf_warning(AS_DRV_SSD, "read: read wrong key: expecting %lx got %lx",
1197 | *(uint64_t*)&r->keyd, *(uint64_t*)&flat->keyd); |
1198 | cf_free(read_buf); |
1199 | return -1; |
1200 | } |
1201 | |
1202 | if (ns->storage_benchmarks_enabled) { |
1203 | histogram_insert_raw(ns->device_read_size_hist, read_size); |
1204 | } |
1205 | } |
1206 | |
1207 | rd->flat = flat; |
1208 | rd->read_buf = read_buf; // no need to free read_buf on error now |
1209 | |
1210 | as_flat_opt_meta opt_meta = { 0 }; |
1211 | |
1212 | rd->flat_end = (const uint8_t*)flat + record_size; |
1213 | rd->flat_bins = as_flat_unpack_record_meta(flat, rd->flat_end, &opt_meta, |
1214 | ns->single_bin); |
1215 | |
1216 | if (! rd->flat_bins) { |
		cf_warning(AS_DRV_SSD, "read: bad record metadata");
1218 | return -1; |
1219 | } |
1220 | |
	// Return only after unpacking the meta, so there's a bit of sanity
	// checking even for pickle-only requests.
1222 | if (pickle_only) { |
1223 | return 0; |
1224 | } |
1225 | |
1226 | if (! as_flat_decompress_bins(&opt_meta.cm, rd)) { |
1227 | cf_warning_digest(AS_DRV_SSD, &r->keyd, |
			"{%s} read: bad compressed data (%s:%lu) ",
1229 | ns->name, ssd->name, record_offset); |
1230 | return -1; |
1231 | } |
1232 | |
1233 | if (opt_meta.key) { |
1234 | rd->key_size = opt_meta.key_size; |
1235 | rd->key = opt_meta.key; |
1236 | } |
1237 | // else - if updating record without key, leave rd (msg) key to be stored. |
1238 | |
1239 | rd->flat_n_bins = (uint16_t)opt_meta.n_bins; |
1240 | |
1241 | return 0; |
1242 | } |
1243 | |
1244 | |
1245 | //========================================================== |
1246 | // Storage API implementation: reading records. |
1247 | // |
1248 | |
1249 | int |
1250 | as_storage_record_load_n_bins_ssd(as_storage_rd *rd) |
1251 | { |
1252 | if (! as_record_is_live(rd->r)) { |
1253 | rd->n_bins = 0; |
1254 | return 0; // no need to read device |
1255 | } |
1256 | |
	// If record hasn't been read, read it - sets rd->flat_n_bins.
1258 | if (! rd->flat && ssd_read_record(rd, false) != 0) { |
		cf_warning(AS_DRV_SSD, "load_n_bins: failed ssd_read_record()");
1260 | return -AS_ERR_UNKNOWN; |
1261 | } |
1262 | |
1263 | rd->n_bins = rd->flat_n_bins; |
1264 | |
1265 | return 0; |
1266 | } |
1267 | |
1268 | |
1269 | int |
1270 | as_storage_record_load_bins_ssd(as_storage_rd *rd) |
1271 | { |
1272 | if (! as_record_is_live(rd->r)) { |
1273 | return 0; // no need to read device |
1274 | } |
1275 | |
	// If record hasn't been read, read it - sets rd->flat_bins and
	// rd->flat_n_bins.
1278 | if (! rd->flat && ssd_read_record(rd, false) != 0) { |
		cf_warning(AS_DRV_SSD, "load_bins: failed ssd_read_record()");
1280 | return -AS_ERR_UNKNOWN; |
1281 | } |
1282 | |
1283 | return as_flat_unpack_bins(rd->ns, rd->flat_bins, rd->flat_end, |
1284 | rd->flat_n_bins, rd->bins); |
1285 | } |
1286 | |
1287 | |
1288 | bool |
1289 | as_storage_record_get_key_ssd(as_storage_rd *rd) |
1290 | { |
1291 | // If record hasn't been read, read it - sets rd->key_size and rd->key. |
1292 | if (! rd->flat && ssd_read_record(rd, false) != 0) { |
		cf_warning(AS_DRV_SSD, "get_key: failed ssd_read_record()");
1294 | return false; |
1295 | } |
1296 | |
1297 | return true; |
1298 | } |
1299 | |
1300 | |
1301 | bool |
1302 | as_storage_record_get_pickle_ssd(as_storage_rd *rd) |
1303 | { |
1304 | if (ssd_read_record(rd, true) != 0) { |
1305 | return false; |
1306 | } |
1307 | |
1308 | size_t sz = rd->flat_end - (const uint8_t*)rd->flat; |
1309 | |
1310 | rd->pickle = cf_malloc(sz); |
1311 | rd->pickle_sz = (uint32_t)sz; |
1312 | |
1313 | memcpy(rd->pickle, rd->flat, sz); |
1314 | |
1315 | return true; |
1316 | } |
1317 | |
1318 | |
1319 | //========================================================== |
1320 | // Record writing utilities. |
1321 | // |
1322 | |
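// Flush a write buffer's full wblock to the device, after waiting for any
// in-progress writers to finish with the buffer.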
1323 | void |
1324 | ssd_flush_swb(drv_ssd *ssd, ssd_write_buf *swb) |
1325 | { |
1326 | // Wait for all writers to finish. |
1327 | while (cf_atomic32_get(swb->n_writers) != 0) { |
1328 | ; |
1329 | } |
1330 | |
1331 | int fd = ssd_fd_get(ssd); |
1332 | off_t write_offset = (off_t)WBLOCK_ID_TO_OFFSET(ssd, swb->wblock_id); |
1333 | |
1334 | uint64_t start_ns = ssd->ns->storage_benchmarks_enabled ? cf_getns() : 0; |
1335 | |
1336 | if (! pwrite_all(fd, swb->buf, ssd->write_block_size, write_offset)) { |
		cf_crash(AS_DRV_SSD, "%s: DEVICE FAILED write: errno %d (%s)",
1338 | ssd->name, errno, cf_strerror(errno)); |
1339 | } |
1340 | |
1341 | if (start_ns != 0) { |
1342 | histogram_insert_data_point(ssd->hist_write, start_ns); |
1343 | } |
1344 | |
1345 | ssd_fd_put(ssd, fd); |
1346 | } |
1347 | |
1348 | |
1349 | void |
1350 | ssd_shadow_flush_swb(drv_ssd *ssd, ssd_write_buf *swb) |
1351 | { |
1352 | int fd = ssd_shadow_fd_get(ssd); |
1353 | off_t write_offset = (off_t)WBLOCK_ID_TO_OFFSET(ssd, swb->wblock_id); |
1354 | |
1355 | uint64_t start_ns = ssd->ns->storage_benchmarks_enabled ? cf_getns() : 0; |
1356 | |
1357 | if (! pwrite_all(fd, swb->buf, ssd->write_block_size, write_offset)) { |
		cf_crash(AS_DRV_SSD, "%s: DEVICE FAILED write: errno %d (%s)",
1359 | ssd->shadow_name, errno, cf_strerror(errno)); |
1360 | } |
1361 | |
1362 | if (start_ns != 0) { |
1363 | histogram_insert_data_point(ssd->hist_shadow_write, start_ns); |
1364 | } |
1365 | |
1366 | ssd_shadow_fd_put(ssd, fd); |
1367 | } |
1368 | |
1369 | |
1370 | void |
1371 | ssd_write_sanity_checks(drv_ssd *ssd, ssd_write_buf *swb) |
1372 | { |
1373 | ssd_wblock_state* p_wblock_state = &ssd->wblock_state[swb->wblock_id]; |
1374 | |
1375 | cf_assert(p_wblock_state->swb == swb, AS_DRV_SSD, |
			"device %s: wblock-id %u swb not consistent while writing",
1377 | ssd->name, swb->wblock_id); |
1378 | |
1379 | cf_assert(p_wblock_state->state != WBLOCK_STATE_DEFRAG, AS_DRV_SSD, |
			"device %s: wblock-id %u state DEFRAG while writing", ssd->name,
1381 | swb->wblock_id); |
1382 | } |
1383 | |
1384 | |
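// Dispose of a flushed swb - either release it immediately, or park it on
// the post-write queue so recent writes can be read from memory, trimming
// that queue to its configured size.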
1385 | void |
1386 | ssd_post_write(drv_ssd *ssd, ssd_write_buf *swb) |
1387 | { |
1388 | if (cf_atomic32_get(ssd->ns->storage_post_write_queue) == 0 || |
1389 | swb->skip_post_write_q) { |
1390 | swb_dereference_and_release(ssd, swb->wblock_id, swb); |
1391 | } |
1392 | else { |
1393 | // Transfer swb to post-write queue. |
1394 | cf_queue_push(ssd->post_write_q, &swb); |
1395 | } |
1396 | |
1397 | if (ssd->post_write_q) { |
1398 | // Release post-write queue swbs if we're over the limit. |
1399 | while ((uint32_t)cf_queue_sz(ssd->post_write_q) > |
1400 | cf_atomic32_get(ssd->ns->storage_post_write_queue)) { |
1401 | ssd_write_buf* cached_swb; |
1402 | |
1403 | if (CF_QUEUE_OK != cf_queue_pop(ssd->post_write_q, &cached_swb, |
1404 | CF_QUEUE_NOWAIT)) { |
1405 | // Should never happen. |
				cf_warning(AS_DRV_SSD, "device %s: post-write queue pop failed",
1407 | ssd->name); |
1408 | break; |
1409 | } |
1410 | |
1411 | swb_dereference_and_release(ssd, cached_swb->wblock_id, |
1412 | cached_swb); |
1413 | } |
1414 | } |
1415 | } |
1416 | |
1417 | |
1418 | // Thread "run" function that flushes write buffers to device. |
1419 | void * |
1420 | run_write(void *arg) |
1421 | { |
1422 | drv_ssd *ssd = (drv_ssd*)arg; |
1423 | |
1424 | while (ssd->running) { |
1425 | ssd_write_buf *swb; |
1426 | |
1427 | if (CF_QUEUE_OK != cf_queue_pop(ssd->swb_write_q, &swb, 100)) { |
1428 | continue; |
1429 | } |
1430 | |
1431 | // Sanity checks (optional). |
1432 | ssd_write_sanity_checks(ssd, swb); |
1433 | |
1434 | // Flush to the device. |
1435 | ssd_flush_swb(ssd, swb); |
1436 | |
1437 | if (ssd->shadow_name) { |
1438 | // Queue for shadow device write. |
1439 | cf_queue_push(ssd->swb_shadow_q, &swb); |
1440 | } |
1441 | else { |
1442 | // If this swb was a defrag destination, release the sources. |
1443 | swb_release_all_vacated_wblocks(swb); |
1444 | |
1445 | // Transfer to post-write queue, or release swb, as appropriate. |
1446 | ssd_post_write(ssd, swb); |
1447 | } |
1448 | } // infinite event loop waiting for block to write |
1449 | |
1450 | return NULL; |
1451 | } |
1452 | |
1453 | |
1454 | // Thread "run" function that flushes write buffers to shadow device. |
1455 | void * |
1456 | run_shadow(void *arg) |
1457 | { |
1458 | drv_ssd *ssd = (drv_ssd*)arg; |
1459 | |
1460 | while (ssd->running) { |
1461 | ssd_write_buf *swb; |
1462 | |
1463 | if (CF_QUEUE_OK != cf_queue_pop(ssd->swb_shadow_q, &swb, 100)) { |
1464 | continue; |
1465 | } |
1466 | |
1467 | // Sanity checks (optional). |
1468 | ssd_write_sanity_checks(ssd, swb); |
1469 | |
1470 | // Flush to the shadow device. |
1471 | ssd_shadow_flush_swb(ssd, swb); |
1472 | |
1473 | // If this swb was a defrag destination, release the sources. |
1474 | swb_release_all_vacated_wblocks(swb); |
1475 | |
1476 | // Transfer to post-write queue, or release swb, as appropriate. |
1477 | ssd_post_write(ssd, swb); |
1478 | } |
1479 | |
1480 | return NULL; |
1481 | } |
1482 | |
1483 | |
1484 | void |
1485 | ssd_start_write_threads(drv_ssds *ssds) |
1486 | { |
	cf_info(AS_DRV_SSD, "{%s} starting write threads", ssds->ns->name);
1488 | |
1489 | for (int i = 0; i < ssds->n_ssds; i++) { |
1490 | drv_ssd *ssd = &ssds->ssds[i]; |
1491 | |
1492 | ssd->write_tid = cf_thread_create_joinable(run_write, (void*)ssd); |
1493 | |
1494 | if (ssd->shadow_name) { |
1495 | ssd->shadow_tid = cf_thread_create_joinable(run_shadow, (void*)ssd); |
1496 | } |
1497 | } |
1498 | } |
1499 | |
1500 | |
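// Flatten a record into the device's current write buffer, flushing the
// buffer and grabbing a new one if the record doesn't fit. Returns 0 on
// success, WRITE_IN_PLACE if the previous version was overwritten in this
// buffer, or a negative error code.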
1501 | int |
1502 | ssd_buffer_bins(as_storage_rd *rd) |
1503 | { |
1504 | as_namespace *ns = rd->ns; |
1505 | as_record *r = rd->r; |
1506 | drv_ssd *ssd = rd->ssd; |
1507 | |
1508 | uint32_t flat_sz = rd->pickle == NULL ? |
1509 | as_flat_record_size(rd) : rd->orig_pickle_sz; |
1510 | |
1511 | if (flat_sz > ssd->write_block_size) { |
		cf_detail_digest(AS_DRV_SSD, &r->keyd, "write: size %u - rejecting ",
1513 | flat_sz); |
1514 | return -AS_ERR_RECORD_TOO_BIG; |
1515 | } |
1516 | |
1517 | as_flat_record *flat; |
1518 | |
1519 | if (rd->pickle == NULL) { |
1520 | flat = as_flat_compress_bins_and_pack_record(rd, ssd->write_block_size, |
1521 | &flat_sz); |
1522 | } |
1523 | else { |
1524 | flat = (as_flat_record *)rd->pickle; |
1525 | flat_sz = rd->pickle_sz; |
1526 | |
1527 | // Tree IDs are node-local - can't use those sent from other nodes. |
1528 | flat->tree_id = r->tree_id; |
1529 | } |
1530 | |
1531 | // Note - this is the only place where rounding size (up to a multiple of |
1532 | // RBLOCK_SIZE) is really necessary. |
1533 | uint32_t write_sz = SIZE_UP_TO_RBLOCK_SIZE(flat_sz); |
1534 | |
1535 | // Reserve the portion of the current swb where this record will be written. |
1536 | cf_mutex_lock(&ssd->write_lock); |
1537 | |
1538 | ssd_write_buf *swb = ssd->current_swb; |
1539 | |
1540 | if (! swb) { |
1541 | swb = swb_get(ssd); |
1542 | ssd->current_swb = swb; |
1543 | |
1544 | if (! swb) { |
			cf_warning(AS_DRV_SSD, "write bins: couldn't get swb");
1546 | cf_mutex_unlock(&ssd->write_lock); |
1547 | return -AS_ERR_OUT_OF_SPACE; |
1548 | } |
1549 | } |
1550 | |
	// Check if there's enough space in the current buffer - if not, zero any
	// remaining unused space, enqueue the buffer to be flushed to device, and
	// grab a new buffer.
1554 | if (write_sz > ssd->write_block_size - swb->pos) { |
1555 | if (ssd->write_block_size != swb->pos) { |
1556 | // Clean the end of the buffer before pushing to write queue. |
1557 | memset(&swb->buf[swb->pos], 0, ssd->write_block_size - swb->pos); |
1558 | } |
1559 | |
1560 | // Enqueue the buffer, to be flushed to device. |
1561 | cf_queue_push(ssd->swb_write_q, &swb); |
1562 | cf_atomic64_incr(&ssd->n_wblock_writes); |
1563 | |
1564 | // Get the new buffer. |
1565 | swb = swb_get(ssd); |
1566 | ssd->current_swb = swb; |
1567 | |
1568 | if (! swb) { |
			cf_warning(AS_DRV_SSD, "write bins: couldn't get swb");
1570 | cf_mutex_unlock(&ssd->write_lock); |
1571 | return -AS_ERR_OUT_OF_SPACE; |
1572 | } |
1573 | } |
1574 | |
1575 | uint32_t n_rblocks = ROUNDED_SIZE_TO_N_RBLOCKS(write_sz); |
1576 | uint32_t swb_pos; |
1577 | int rv = 0; |
1578 | |
1579 | if (n_rblocks == r->n_rblocks && |
1580 | swb->wblock_id == RBLOCK_ID_TO_WBLOCK_ID(ssd, r->rblock_id) && |
1581 | ssd->file_id == r->file_id) { |
1582 | // Stored size is unchanged, and previous version is in this buffer - |
1583 | // just overwrite at the previous position. |
1584 | swb_pos = RBLOCK_ID_TO_OFFSET(r->rblock_id) - |
1585 | WBLOCK_ID_TO_OFFSET(ssd, swb->wblock_id); |
1586 | rv = WRITE_IN_PLACE; |
1587 | } |
1588 | else { |
1589 | // There's enough space - save the position where this record will be |
1590 | // written, and advance swb->pos for the next writer. |
1591 | swb_pos = swb->pos; |
1592 | swb->pos += write_sz; |
1593 | } |
1594 | |
1595 | cf_atomic32_incr(&swb->n_writers); |
1596 | swb->dirty = true; |
1597 | |
1598 | cf_mutex_unlock(&ssd->write_lock); |
1599 | // May now write this record concurrently with others in this swb. |
1600 | |
1601 | // Flatten data into the block. |
1602 | |
1603 | as_flat_record *flat_in_swb = (as_flat_record*)&swb->buf[swb_pos]; |
1604 | |
1605 | if (flat == NULL) { |
1606 | as_flat_pack_record(rd, n_rblocks, flat_in_swb); |
1607 | } |
1608 | else { |
1609 | memcpy(flat_in_swb, flat, flat_sz); |
1610 | } |
1611 | |
1612 | // Make a pickle if needed. |
1613 | if (rd->keep_pickle) { |
1614 | rd->pickle_sz = flat_sz; |
1615 | rd->pickle = cf_malloc(flat_sz); |
1616 | memcpy(rd->pickle, flat_in_swb, flat_sz); |
1617 | } |
1618 | |
1619 | uint64_t write_offset = WBLOCK_ID_TO_OFFSET(ssd, swb->wblock_id) + swb_pos; |
1620 | |
1621 | ssd_encrypt(ssd, write_offset, flat_in_swb); |
1622 | |
1623 | if (rv != WRITE_IN_PLACE) { |
1624 | r->file_id = ssd->file_id; |
1625 | r->rblock_id = OFFSET_TO_RBLOCK_ID(write_offset); |
1626 | r->n_rblocks = n_rblocks; |
1627 | |
1628 | cf_atomic64_add(&ssd->inuse_size, (int64_t)write_sz); |
1629 | cf_atomic32_add(&ssd->wblock_state[swb->wblock_id].inuse_sz, |
1630 | (int32_t)write_sz); |
1631 | } |
1632 | |
1633 | // We are finished writing to the buffer. |
1634 | cf_atomic32_decr(&swb->n_writers); |
1635 | |
1636 | if (ns->storage_benchmarks_enabled) { |
1637 | histogram_insert_raw(ns->device_write_size_hist, write_sz); |
1638 | } |
1639 | |
1640 | return rv; |
1641 | } |
1642 | |
1643 | |
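// Write a record - choose its device from the digest, buffer the record, and
// free the space of the version it replaces (unless it was overwritten in
// place).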
1644 | int |
1645 | ssd_write(as_storage_rd *rd) |
1646 | { |
1647 | as_record *r = rd->r; |
1648 | |
1649 | drv_ssd *old_ssd = NULL; |
1650 | uint64_t old_rblock_id = 0; |
1651 | uint32_t old_n_rblocks = 0; |
1652 | |
1653 | if (STORAGE_RBLOCK_IS_VALID(r->rblock_id)) { |
1654 | // Replacing an old record. |
1655 | old_ssd = rd->ssd; |
1656 | old_rblock_id = r->rblock_id; |
1657 | old_n_rblocks = r->n_rblocks; |
1658 | } |
1659 | |
1660 | drv_ssds *ssds = (drv_ssds*)rd->ns->storage_private; |
1661 | |
1662 | // Figure out which device to write to. When replacing an old record, it's |
1663 | // possible this is different from the old device (e.g. if we've added a |
1664 | // fresh device), so derive it from the digest each time. |
1665 | rd->ssd = &ssds->ssds[ssd_get_file_id(ssds, &r->keyd)]; |
1666 | |
	cf_assert(rd->ssd, AS_DRV_SSD, "{%s} null ssd", rd->ns->name);
1668 | |
1669 | int rv = ssd_write_bins(rd); |
1670 | |
1671 | if (rv == 0 && old_ssd) { |
		ssd_block_free(old_ssd, old_rblock_id, old_n_rblocks, "ssd-write");
1673 | } |
1674 | else if (rv == WRITE_IN_PLACE) { |
1675 | return 0; // no need to free old block - it's reused |
1676 | } |
1677 | |
1678 | return rv; |
1679 | } |
1680 | |
1681 | |
1682 | //========================================================== |
1683 | // Storage statistics utilities. |
1684 | // |
1685 | |
1686 | void |
1687 | as_storage_show_wblock_stats(as_namespace *ns) |
1688 | { |
1689 | if (AS_STORAGE_ENGINE_SSD != ns->storage_type) { |
		cf_info(AS_DRV_SSD, "Storage engine type must be SSD (%d), not %d.",
1691 | AS_STORAGE_ENGINE_SSD, ns->storage_type); |
1692 | return; |
1693 | } |
1694 | |
1695 | if (ns->storage_private) { |
1696 | drv_ssds *ssds = ns->storage_private; |
1697 | |
1698 | for (int d = 0; d < ssds->n_ssds; d++) { |
1699 | int num_free_blocks = 0; |
1700 | int num_full_blocks = 0; |
1701 | int num_full_swb = 0; |
1702 | int num_above_wm = 0; |
1703 | int num_defraggable = 0; |
1704 | |
1705 | drv_ssd *ssd = &ssds->ssds[d]; |
1706 | uint32_t lwm_size = ns->defrag_lwm_size; |
1707 | |
1708 | for (uint32_t i = 0; i < ssd->n_wblocks; i++) { |
1709 | ssd_wblock_state *wblock_state = &ssd->wblock_state[i]; |
1710 | uint32_t inuse_sz = cf_atomic32_get(wblock_state->inuse_sz); |
1711 | |
1712 | if (inuse_sz == 0) { |
1713 | num_free_blocks++; |
1714 | } |
1715 | else if (inuse_sz == ssd->write_block_size) { |
1716 | if (wblock_state->swb != NULL) { |
1717 | num_full_swb++; |
1718 | } |
1719 | else { |
1720 | num_full_blocks++; |
1721 | } |
1722 | } |
1723 | else { |
1724 | if (inuse_sz > ssd->write_block_size || inuse_sz < lwm_size) { |
					cf_info(AS_DRV_SSD, "dev %d, wblock %u, inuse_sz %u, %s swb",
							d, i, inuse_sz, wblock_state->swb ? "has" : "no");
1727 | |
1728 | num_defraggable++; |
1729 | } |
1730 | else { |
1731 | num_above_wm++; |
1732 | } |
1733 | } |
1734 | } |
1735 | |
			cf_info(AS_DRV_SSD, "device %s free %d full %d fullswb %d pfull %d defrag %d freeq %d",
1737 | ssd->name, num_free_blocks, num_full_blocks, num_full_swb, |
1738 | num_above_wm, num_defraggable, cf_queue_sz(ssd->free_wblock_q)); |
1739 | } |
1740 | } |
1741 | else { |
1742 | cf_info(AS_DRV_SSD, "no devices" ); |
1743 | } |
1744 | } |
1745 | |
1746 | |
1747 | void |
1748 | as_storage_summarize_wblock_stats(as_namespace *ns) |
1749 | { |
1750 | if (AS_STORAGE_ENGINE_SSD != ns->storage_type) { |
1751 | cf_info(AS_DRV_SSD, "Storage engine type must be SSD (%d), not %d." , |
1752 | AS_STORAGE_ENGINE_SSD, ns->storage_type); |
1753 | return; |
1754 | } |
1755 | |
1756 | if (! ns->storage_private) { |
1757 | cf_info(AS_DRV_SSD, "no devices" ); |
1758 | return; |
1759 | } |
1760 | |
1761 | drv_ssds *ssds = ns->storage_private; |
1762 | uint32_t total_num_defraggable = 0; |
1763 | uint32_t total_num_above_wm = 0; |
1764 | uint64_t defraggable_sz = 0; |
1765 | uint64_t non_defraggable_sz = 0; |
1766 | |
1767 | // Note: This is a sparse array that could be more efficiently stored. |
1768 | // (In addition, ranges of block sizes could be binned together to |
1769 | // compress the histogram, rather than using one bin per block size.) |
1770 | uint32_t *wb_hist = cf_calloc(1, sizeof(uint32_t) * MAX_WRITE_BLOCK_SIZE); |
1771 | |
1772 | for (uint32_t d = 0; d < ssds->n_ssds; d++) { |
1773 | drv_ssd *ssd = &ssds->ssds[d]; |
1774 | uint32_t num_free_blocks = 0; |
1775 | uint32_t num_full_swb = 0; |
1776 | uint32_t num_full_blocks = 0; |
1777 | uint32_t lwm_size = ns->defrag_lwm_size; |
1778 | uint32_t num_defraggable = 0; |
1779 | uint32_t num_above_wm = 0; |
1780 | |
1781 | for (uint32_t i = 0; i < ssd->n_wblocks; i++) { |
1782 | ssd_wblock_state *wblock_state = &ssd->wblock_state[i]; |
1783 | uint32_t inuse_sz = cf_atomic32_get(wblock_state->inuse_sz); |
1784 | |
1785 | if (inuse_sz > ssd->write_block_size) { |
				cf_warning(AS_DRV_SSD, "wblock size (%u > %u) too large - not counting in histogram",
						inuse_sz, ssd->write_block_size);
1788 | } |
1789 | else { |
1790 | wb_hist[inuse_sz]++; |
1791 | } |
1792 | |
1793 | if (inuse_sz == 0) { |
1794 | num_free_blocks++; |
1795 | } |
1796 | else if (inuse_sz == ssd->write_block_size) { |
1797 | if (wblock_state->swb != NULL) { |
1798 | num_full_swb++; |
1799 | } |
1800 | else { |
1801 | num_full_blocks++; |
1802 | } |
1803 | } |
1804 | else if (inuse_sz < lwm_size) { |
1805 | defraggable_sz += inuse_sz; |
1806 | num_defraggable++; |
1807 | } |
1808 | else { |
1809 | non_defraggable_sz += inuse_sz; |
1810 | num_above_wm++; |
1811 | } |
1812 | } |
1813 | |
1814 | total_num_defraggable += num_defraggable; |
1815 | total_num_above_wm += num_above_wm; |
1816 | |
1817 | cf_info(AS_DRV_SSD, "device %s free %u full %u fullswb %u pfull %u defrag %u freeq %u" , |
1818 | ssd->name, num_free_blocks, num_full_blocks, num_full_swb, |
1819 | num_above_wm, num_defraggable, cf_queue_sz(ssd->free_wblock_q)); |
1820 | } |
1821 | |
1822 | cf_info(AS_DRV_SSD, "WBH: Storage histogram for namespace \"%s\":" , |
1823 | ns->name); |
1824 | cf_info(AS_DRV_SSD, "WBH: Average wblock size of: defraggable blocks: %lu bytes; nondefraggable blocks: %lu bytes; all blocks: %lu bytes" , |
1825 | defraggable_sz / MAX(1, total_num_defraggable), |
1826 | non_defraggable_sz / MAX(1, total_num_above_wm), |
1827 | (defraggable_sz + non_defraggable_sz) / |
1828 | MAX(1, (total_num_defraggable + total_num_above_wm))); |
1829 | |
1830 | for (uint32_t i = 0; i < MAX_WRITE_BLOCK_SIZE; i++) { |
1831 | if (wb_hist[i] > 0) { |
1832 | cf_info(AS_DRV_SSD, "WBH: %u block%s of size %u bytes" , |
1833 | wb_hist[i], (wb_hist[i] != 1 ? "s" : "" ), i); |
1834 | } |
1835 | } |
1836 | |
1837 | cf_free(wb_hist); |
1838 | } |
1839 | |
1840 | |
1841 | //========================================================== |
1842 | // Per-device background jobs. |
1843 | // |
1844 | |
1845 | #define LOG_STATS_INTERVAL_sec 20 |
1846 | |
1847 | void |
1848 | ssd_log_stats(drv_ssd *ssd, uint64_t *p_prev_n_total_writes, |
1849 | uint64_t *p_prev_n_defrag_reads, uint64_t *p_prev_n_defrag_writes, |
1850 | uint64_t *p_prev_n_defrag_io_skips, uint64_t *p_prev_n_direct_frees, |
1851 | uint64_t *p_prev_n_tomb_raider_reads) |
1852 | { |
1853 | uint64_t n_defrag_reads = cf_atomic64_get(ssd->n_defrag_wblock_reads); |
1854 | uint64_t n_defrag_writes = cf_atomic64_get(ssd->n_defrag_wblock_writes); |
1855 | uint64_t n_total_writes = cf_atomic64_get(ssd->n_wblock_writes) + |
1856 | n_defrag_writes; |
1857 | |
1858 | uint64_t n_defrag_io_skips = cf_atomic64_get(ssd->n_wblock_defrag_io_skips); |
1859 | uint64_t n_direct_frees = cf_atomic64_get(ssd->n_wblock_direct_frees); |
1860 | |
1861 | float total_write_rate = (float)(n_total_writes - *p_prev_n_total_writes) / |
1862 | (float)LOG_STATS_INTERVAL_sec; |
1863 | float defrag_read_rate = (float)(n_defrag_reads - *p_prev_n_defrag_reads) / |
1864 | (float)LOG_STATS_INTERVAL_sec; |
1865 | float defrag_write_rate = (float)(n_defrag_writes - *p_prev_n_defrag_writes) / |
1866 | (float)LOG_STATS_INTERVAL_sec; |
1867 | |
1868 | float defrag_io_skip_rate = (float)(n_defrag_io_skips - *p_prev_n_defrag_io_skips) / |
1869 | (float)LOG_STATS_INTERVAL_sec; |
1870 | float direct_free_rate = (float)(n_direct_frees - *p_prev_n_direct_frees) / |
1871 | (float)LOG_STATS_INTERVAL_sec; |
1872 | |
1873 | uint64_t n_tomb_raider_reads = ssd->n_tomb_raider_reads; |
1874 | char tomb_raider_str[64]; |
1875 | |
1876 | *tomb_raider_str = 0; |
1877 | |
1878 | if (n_tomb_raider_reads != 0) { |
1879 | if (*p_prev_n_tomb_raider_reads > n_tomb_raider_reads) { |
1880 | *p_prev_n_tomb_raider_reads = 0; |
1881 | } |
1882 | |
1883 | float tomb_raider_read_rate = |
1884 | (float)(n_tomb_raider_reads - *p_prev_n_tomb_raider_reads) / |
1885 | (float)LOG_STATS_INTERVAL_sec; |
1886 | |
1887 | sprintf(tomb_raider_str, " tomb-raider-read (%lu,%.1f)" , |
1888 | n_tomb_raider_reads, tomb_raider_read_rate); |
1889 | } |
1890 | |
1891 | char shadow_str[64]; |
1892 | |
1893 | *shadow_str = 0; |
1894 | |
1895 | if (ssd->shadow_name) { |
1896 | sprintf(shadow_str, " shadow-write-q %d" , |
1897 | cf_queue_sz(ssd->swb_shadow_q)); |
1898 | } |
1899 | |
1900 | uint32_t free_wblock_q_sz = (uint32_t)cf_queue_sz(ssd->free_wblock_q); |
1901 | uint32_t n_pristine_wblocks = num_pristine_wblocks(ssd); |
1902 | uint32_t n_free_wblocks = free_wblock_q_sz + n_pristine_wblocks; |
1903 | |
1904 | cf_info(AS_DRV_SSD, "{%s} %s: used-bytes %lu free-wblocks %u write-q %d write (%lu,%.1f) defrag-q %d defrag-read (%lu,%.1f) defrag-write (%lu,%.1f)%s%s" , |
1905 | ssd->ns->name, ssd->name, |
1906 | ssd->inuse_size, n_free_wblocks, |
1907 | cf_queue_sz(ssd->swb_write_q), |
1908 | n_total_writes, total_write_rate, |
1909 | cf_queue_sz(ssd->defrag_wblock_q), n_defrag_reads, defrag_read_rate, |
1910 | n_defrag_writes, defrag_write_rate, |
1911 | shadow_str, tomb_raider_str); |
1912 | |
1913 | cf_detail(AS_DRV_SSD, "{%s} %s: free-wblocks (%u,%u) defrag-io-skips (%lu,%.1f) direct-frees (%lu,%.1f)" , |
1914 | ssd->ns->name, ssd->name, |
1915 | free_wblock_q_sz, n_pristine_wblocks, |
1916 | n_defrag_io_skips, defrag_io_skip_rate, |
1917 | n_direct_frees, direct_free_rate); |
1918 | |
1919 | *p_prev_n_total_writes = n_total_writes; |
1920 | *p_prev_n_defrag_reads = n_defrag_reads; |
1921 | *p_prev_n_defrag_writes = n_defrag_writes; |
1922 | *p_prev_n_defrag_io_skips = n_defrag_io_skips; |
1923 | *p_prev_n_direct_frees = n_direct_frees; |
1924 | *p_prev_n_tomb_raider_reads = n_tomb_raider_reads; |
1925 | |
1926 | if (n_free_wblocks == 0) { |
1927 | cf_warning(AS_DRV_SSD, "device %s: out of storage space" , ssd->name); |
1928 | } |
1929 | } |
1930 | |
1931 | |
1932 | void |
1933 | ssd_free_swbs(drv_ssd *ssd) |
1934 | { |
1935 | // Try to recover swbs, 16 at a time, down to 16. |
1936 | for (int i = 0; i < 16 && cf_queue_sz(ssd->swb_free_q) > 16; i++) { |
1937 | ssd_write_buf* swb; |
1938 | |
1939 | if (CF_QUEUE_OK != |
1940 | cf_queue_pop(ssd->swb_free_q, &swb, CF_QUEUE_NOWAIT)) { |
1941 | break; |
1942 | } |
1943 | |
1944 | swb_destroy(swb); |
1945 | } |
1946 | } |
1947 | |
1948 | |
1949 | void |
1950 | ssd_flush_current_swb(drv_ssd *ssd, uint64_t *p_prev_n_writes) |
1951 | { |
1952 | uint64_t n_writes = cf_atomic64_get(ssd->n_wblock_writes); |
1953 | |
1954 | // If there's an active write load, we don't need to flush. |
1955 | if (n_writes != *p_prev_n_writes) { |
1956 | *p_prev_n_writes = n_writes; |
1957 | return; |
1958 | } |
1959 | |
1960 | cf_mutex_lock(&ssd->write_lock); |
1961 | |
1962 | n_writes = cf_atomic64_get(ssd->n_wblock_writes); |
1963 | |
1964 | // Must check under the lock, could be racing a current swb just queued. |
1965 | if (n_writes != *p_prev_n_writes) { |
1966 | |
1967 | cf_mutex_unlock(&ssd->write_lock); |
1968 | |
1969 | *p_prev_n_writes = n_writes; |
1970 | return; |
1971 | } |
1972 | |
1973 | // Flush the current swb if it isn't empty, and has been written to since |
1974 | // last flushed. |
1975 | |
1976 | ssd_write_buf *swb = ssd->current_swb; |
1977 | |
1978 | if (swb && swb->dirty) { |
1979 | swb->dirty = false; |
1980 | |
1981 | // Clean the end of the buffer before flushing. |
1982 | if (ssd->write_block_size != swb->pos) { |
1983 | memset(&swb->buf[swb->pos], 0, ssd->write_block_size - swb->pos); |
1984 | } |
1985 | |
1986 | // Flush it. |
1987 | ssd_flush_swb(ssd, swb); |
1988 | |
1989 | if (ssd->shadow_name) { |
1990 | ssd_shadow_flush_swb(ssd, swb); |
1991 | } |
1992 | } |
1993 | |
1994 | cf_mutex_unlock(&ssd->write_lock); |
1995 | } |
1996 | |
1997 | |
1998 | void |
1999 | ssd_flush_defrag_swb(drv_ssd *ssd, uint64_t *p_prev_n_defrag_writes) |
2000 | { |
2001 | uint64_t n_defrag_writes = cf_atomic64_get(ssd->n_defrag_wblock_writes); |
2002 | |
2003 | // If there's an active defrag load, we don't need to flush. |
2004 | if (n_defrag_writes != *p_prev_n_defrag_writes) { |
2005 | *p_prev_n_defrag_writes = n_defrag_writes; |
2006 | return; |
2007 | } |
2008 | |
2009 | cf_mutex_lock(&ssd->defrag_lock); |
2010 | |
2011 | n_defrag_writes = cf_atomic64_get(ssd->n_defrag_wblock_writes); |
2012 | |
2013 | // Must check under the lock, could be racing a current swb just queued. |
2014 | if (n_defrag_writes != *p_prev_n_defrag_writes) { |
2015 | |
2016 | cf_mutex_unlock(&ssd->defrag_lock); |
2017 | |
2018 | *p_prev_n_defrag_writes = n_defrag_writes; |
2019 | return; |
2020 | } |
2021 | |
2022 | // Flush the defrag swb if it isn't empty, and has been written to since |
2023 | // last flushed. |
2024 | |
2025 | ssd_write_buf *swb = ssd->defrag_swb; |
2026 | |
2027 | if (swb && swb->n_vacated != 0) { |
2028 | // Clean the end of the buffer before flushing. |
2029 | if (ssd->write_block_size != swb->pos) { |
2030 | memset(&swb->buf[swb->pos], 0, ssd->write_block_size - swb->pos); |
2031 | } |
2032 | |
2033 | // Flush it. |
2034 | ssd_flush_swb(ssd, swb); |
2035 | |
2036 | if (ssd->shadow_name) { |
2037 | ssd_shadow_flush_swb(ssd, swb); |
2038 | } |
2039 | |
2040 | // The whole point - free source wblocks. |
2041 | swb_release_all_vacated_wblocks(swb); |
2042 | } |
2043 | |
2044 | cf_mutex_unlock(&ssd->defrag_lock); |
2045 | } |
2046 | |
2047 | |
2048 | // Check all wblocks to load a device's defrag queue at runtime. Triggered only |
2049 | // when defrag-lwm-pct is increased by manual intervention. |
2050 | void |
2051 | ssd_defrag_sweep(drv_ssd *ssd) |
2052 | { |
2053 | uint32_t first_id = ssd->first_wblock_id; |
2054 | uint32_t end_id = ssd->n_wblocks; |
2055 | uint32_t n_queued = 0; |
2056 | |
2057 | for (uint32_t wblock_id = first_id; wblock_id < end_id; wblock_id++) { |
2058 | ssd_wblock_state *p_wblock_state = &ssd->wblock_state[wblock_id]; |
2059 | |
2060 | cf_mutex_lock(&p_wblock_state->LOCK); |
2061 | |
2062 | uint32_t inuse_sz = cf_atomic32_get(p_wblock_state->inuse_sz); |
2063 | |
2064 | if (p_wblock_state->swb == NULL && |
2065 | p_wblock_state->state != WBLOCK_STATE_DEFRAG && |
2066 | inuse_sz != 0 && |
2067 | inuse_sz < ssd->ns->defrag_lwm_size) { |
2068 | push_wblock_to_defrag_q(ssd, wblock_id); |
2069 | n_queued++; |
2070 | } |
2071 | |
2072 | cf_mutex_unlock(&p_wblock_state->LOCK); |
2073 | } |
2074 | |
2075 | cf_info(AS_DRV_SSD, "... %s sweep queued %u wblocks for defrag" , ssd->name, |
2076 | n_queued); |
2077 | } |
2078 | |
2079 | |
2080 | static inline uint64_t |
2081 | next_time(uint64_t now, uint64_t job_interval, uint64_t next) |
2082 | { |
2083 | uint64_t next_job = now + job_interval; |
2084 | |
2085 | return next_job < next ? next_job : next; |
2086 | } |
2087 | |
2088 | |
2089 | // All in microseconds since we're using usleep(). |
2090 | #define MAX_INTERVAL (1000 * 1000) |
2091 | #define LOG_STATS_INTERVAL (1000 * 1000 * LOG_STATS_INTERVAL_sec) |
2092 | #define FREE_SWBS_INTERVAL (1000 * 1000 * 20) |
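
// For example, using the values above: right after logging stats, 'next' is
// min(now + LOG_STATS_INTERVAL, now + MAX_INTERVAL) = now + 1 second, so the
// maintenance loop still wakes at least once per second and simply finds the
// stats job not yet due on most iterations.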
2093 | |
2094 | // Thread "run" function to perform various background jobs per device. |
2095 | void * |
2096 | run_ssd_maintenance(void *udata) |
2097 | { |
2098 | drv_ssd *ssd = (drv_ssd*)udata; |
2099 | as_namespace *ns = ssd->ns; |
2100 | |
2101 | uint64_t prev_n_total_writes = 0; |
2102 | uint64_t prev_n_defrag_reads = 0; |
2103 | uint64_t prev_n_defrag_writes = 0; |
2104 | uint64_t prev_n_defrag_io_skips = 0; |
2105 | uint64_t prev_n_direct_frees = 0; |
2106 | uint64_t prev_n_tomb_raider_reads = 0; |
2107 | |
2108 | uint64_t prev_n_writes_flush = 0; |
2109 | |
2110 | uint64_t prev_n_defrag_writes_flush = 0; |
2111 | |
2112 | uint64_t now = cf_getus(); |
2113 | uint64_t next = now + MAX_INTERVAL; |
2114 | |
2115 | uint64_t prev_log_stats = now; |
2116 | uint64_t prev_free_swbs = now; |
2117 | uint64_t prev_flush = now; |
2118 | uint64_t prev_defrag_flush = now; |
2119 | |
2120 | // If any job's (initial) interval is less than MAX_INTERVAL and we want it |
2121 | // done on its interval the first time through, add a next_time() call for |
2122 | // that job here to adjust 'next'. (No such jobs for now.) |
2123 | |
2124 | uint64_t sleep_us = next - now; |
2125 | |
2126 | while (true) { |
2127 | usleep((uint32_t)sleep_us); |
2128 | |
2129 | now = cf_getus(); |
2130 | next = now + MAX_INTERVAL; |
2131 | |
2132 | if (now >= prev_log_stats + LOG_STATS_INTERVAL) { |
2133 | ssd_log_stats(ssd, &prev_n_total_writes, &prev_n_defrag_reads, |
2134 | &prev_n_defrag_writes, &prev_n_defrag_io_skips, |
2135 | &prev_n_direct_frees, &prev_n_tomb_raider_reads); |
2136 | prev_log_stats = now; |
2137 | next = next_time(now, LOG_STATS_INTERVAL, next); |
2138 | } |
2139 | |
2140 | if (now >= prev_free_swbs + FREE_SWBS_INTERVAL) { |
2141 | ssd_free_swbs(ssd); |
2142 | prev_free_swbs = now; |
2143 | next = next_time(now, FREE_SWBS_INTERVAL, next); |
2144 | } |
2145 | |
2146 | uint64_t flush_max_us = ssd_flush_max_us(ns); |
2147 | |
2148 | if (flush_max_us != 0 && now >= prev_flush + flush_max_us) { |
2149 | ssd_flush_current_swb(ssd, &prev_n_writes_flush); |
2150 | prev_flush = now; |
2151 | next = next_time(now, flush_max_us, next); |
2152 | } |
2153 | |
2154 | static const uint64_t DEFRAG_FLUSH_MAX_US = 3UL * 1000 * 1000; // 3 sec |
2155 | |
2156 | if (now >= prev_defrag_flush + DEFRAG_FLUSH_MAX_US) { |
2157 | ssd_flush_defrag_swb(ssd, &prev_n_defrag_writes_flush); |
2158 | prev_defrag_flush = now; |
2159 | next = next_time(now, DEFRAG_FLUSH_MAX_US, next); |
2160 | } |
2161 | |
2162 | if (cf_atomic32_get(ssd->defrag_sweep) != 0) { |
2163 | // May take long enough to mess up other jobs' schedules, but it's a |
2164 | // very rare manually-triggered intervention. |
2165 | ssd_defrag_sweep(ssd); |
2166 | cf_atomic32_decr(&ssd->defrag_sweep); |
2167 | } |
2168 | |
2169 | now = cf_getus(); // refresh in case jobs took significant time |
2170 | sleep_us = next > now ? next - now : 1; |
2171 | } |
2172 | |
2173 | return NULL; |
2174 | } |
2175 | |
2176 | |
2177 | void |
2178 | ssd_start_maintenance_threads(drv_ssds *ssds) |
2179 | { |
2180 | cf_info(AS_DRV_SSD, "{%s} starting device maintenance threads" , |
2181 | ssds->ns->name); |
2182 | |
2183 | for (int i = 0; i < ssds->n_ssds; i++) { |
2184 | drv_ssd* ssd = &ssds->ssds[i]; |
2185 | |
2186 | cf_thread_create_detached(run_ssd_maintenance, (void*)ssd); |
2187 | } |
2188 | } |
2189 | |
2190 | |
2191 | //========================================================== |
2192 | // Device header utilities. |
2193 | // |
2194 | |
ssd_device_header *
ssd_read_header(drv_ssd *ssd)
2197 | { |
2198 | as_namespace *ns = ssd->ns; |
2199 | |
2200 | bool use_shadow = ns->cold_start && ssd->shadow_name; |
2201 | |
2202 | const char *ssd_name; |
2203 | int fd; |
2204 | size_t read_size; |
2205 | |
2206 | if (use_shadow) { |
2207 | ssd_name = ssd->shadow_name; |
2208 | fd = ssd_shadow_fd_get(ssd); |
2209 | read_size = BYTES_UP_TO_SHADOW_IO_MIN(ssd, sizeof(ssd_device_header)); |
2210 | } |
2211 | else { |
2212 | ssd_name = ssd->name; |
2213 | fd = ssd_fd_get(ssd); |
2214 | read_size = BYTES_UP_TO_IO_MIN(ssd, sizeof(ssd_device_header)); |
2215 | } |
2216 | |
	ssd_device_header *header = cf_valloc(read_size);
2218 | |
2219 | if (! pread_all(fd, (void*)header, read_size, 0)) { |
2220 | cf_crash(AS_DRV_SSD, "%s: read failed: errno %d (%s)" , ssd_name, errno, |
2221 | cf_strerror(errno)); |
2222 | } |
2223 | |
2224 | ssd_common_prefix *prefix = &header->common.prefix; |
2225 | |
2226 | if (prefix->magic == SSD_HEADER_OLD_MAGIC) { |
2227 | cf_crash(AS_DRV_SSD, "%s: Aerospike device has old format - must erase device to upgrade" , |
2228 | ssd_name); |
2229 | } |
2230 | |
2231 | // Normal path for a fresh drive. |
2232 | if (prefix->magic != SSD_HEADER_MAGIC) { |
2233 | cf_detail(AS_DRV_SSD, "%s: bad magic - fresh drive?" , ssd_name); |
2234 | cf_free(header); |
2235 | use_shadow ? ssd_shadow_fd_put(ssd, fd) : ssd_fd_put(ssd, fd); |
2236 | return NULL; |
2237 | } |
2238 | |
2239 | if (prefix->version != SSD_VERSION) { |
2240 | cf_crash(AS_DRV_SSD, "%s: unknown version %u" , ssd_name, |
2241 | prefix->version); |
2242 | } |
2243 | |
2244 | if (strcmp(prefix->namespace, ns->name) != 0) { |
2245 | cf_crash(AS_DRV_SSD, "%s: previous namespace %s now %s - check config or erase device" , |
2246 | ssd_name, prefix->namespace, ns->name); |
2247 | } |
2248 | |
2249 | if (prefix->n_devices > AS_STORAGE_MAX_DEVICES) { |
2250 | cf_crash(AS_DRV_SSD, "%s: bad n-devices %u" , ssd_name, |
2251 | prefix->n_devices); |
2252 | } |
2253 | |
2254 | if (prefix->random == 0) { |
2255 | cf_crash(AS_DRV_SSD, "%s: random signature is 0" , ssd_name); |
2256 | } |
2257 | |
2258 | if (prefix->write_block_size == 0 || |
2259 | ns->storage_write_block_size % prefix->write_block_size != 0) { |
2260 | cf_crash(AS_DRV_SSD, "%s: can't change write-block-size from %u to %u" , |
2261 | ssd_name, prefix->write_block_size, |
2262 | ns->storage_write_block_size); |
2263 | } |
2264 | |
2265 | if (header->unique.device_id >= AS_STORAGE_MAX_DEVICES) { |
2266 | cf_crash(AS_DRV_SSD, "%s: bad device-id %u" , ssd_name, |
2267 | header->unique.device_id); |
2268 | } |
2269 | |
2270 | ssd_header_validate_cfg(ns, ssd, header); |
2271 | |
2272 | if (header->unique.pristine_offset != 0 && // always 0 before 4.6 |
2273 | (header->unique.pristine_offset < SSD_HEADER_SIZE || |
2274 | header->unique.pristine_offset > ssd->file_size)) { |
2275 | cf_crash(AS_DRV_SSD, "%s: bad pristine offset %lu" , ssd_name, |
2276 | header->unique.pristine_offset); |
2277 | } |
2278 | |
2279 | // In case we're increasing write-block-size - ensure new value is recorded. |
2280 | prefix->write_block_size = ns->storage_write_block_size; |
2281 | |
2282 | use_shadow ? ssd_shadow_fd_put(ssd, fd) : ssd_fd_put(ssd, fd); |
2283 | |
2284 | return header; |
2285 | } |
2286 | |
2287 | |
ssd_device_header *
ssd_init_header(as_namespace *ns, drv_ssd *ssd)
2290 | { |
	ssd_device_header *header = cf_malloc(sizeof(ssd_device_header));
2292 | |
2293 | memset(header, 0, sizeof(ssd_device_header)); |
2294 | |
2295 | ssd_common_prefix *prefix = &header->common.prefix; |
2296 | |
2297 | // Set non-zero common fields. |
2298 | prefix->magic = SSD_HEADER_MAGIC; |
2299 | prefix->version = SSD_VERSION; |
2300 | strcpy(prefix->namespace, ns->name); |
2301 | prefix->write_block_size = ns->storage_write_block_size; |
2302 | |
2303 | ssd_header_init_cfg(ns, ssd, header); |
2304 | |
2305 | return header; |
2306 | } |
2307 | |
2308 | |
void
ssd_empty_header(int fd, const char* device_name)
2311 | { |
2312 | void *h = cf_valloc(SSD_HEADER_SIZE); |
2313 | |
2314 | memset(h, 0, SSD_HEADER_SIZE); |
2315 | |
2316 | if (! pwrite_all(fd, h, SSD_HEADER_SIZE, 0)) { |
2317 | cf_crash(AS_DRV_SSD, "%s: DEVICE FAILED write: errno %d (%s)" , |
2318 | device_name, errno, cf_strerror(errno)); |
2319 | } |
2320 | |
2321 | cf_free(h); |
2322 | } |
2323 | |
2324 | |
void
ssd_write_header(drv_ssd *ssd, uint8_t *header, uint8_t *from, size_t size)
2327 | { |
2328 | off_t offset = from - header; |
2329 | |
2330 | off_t flush_offset = BYTES_DOWN_TO_IO_MIN(ssd, offset); |
2331 | off_t flush_end_offset = BYTES_UP_TO_IO_MIN(ssd, offset + size); |
2332 | |
2333 | uint8_t *flush = header + flush_offset; |
2334 | size_t flush_sz = flush_end_offset - flush_offset; |
2335 | |
2336 | int fd = ssd_fd_get(ssd); |
2337 | |
2338 | if (! pwrite_all(fd, (void*)flush, flush_sz, flush_offset)) { |
2339 | cf_crash(AS_DRV_SSD, "%s: DEVICE FAILED write: errno %d (%s)" , |
2340 | ssd->name, errno, cf_strerror(errno)); |
2341 | } |
2342 | |
2343 | ssd_fd_put(ssd, fd); |
2344 | |
2345 | if (! ssd->shadow_name) { |
2346 | return; |
2347 | } |
2348 | |
2349 | flush_offset = BYTES_DOWN_TO_SHADOW_IO_MIN(ssd, offset); |
2350 | flush_end_offset = BYTES_UP_TO_SHADOW_IO_MIN(ssd, offset + size); |
2351 | |
2352 | flush = header + flush_offset; |
2353 | flush_sz = flush_end_offset - flush_offset; |
2354 | |
2355 | fd = ssd_shadow_fd_get(ssd); |
2356 | |
2357 | if (! pwrite_all(fd, (void*)flush, flush_sz, flush_offset)) { |
2358 | cf_crash(AS_DRV_SSD, "%s: DEVICE FAILED write: errno %d (%s)" , |
2359 | ssd->shadow_name, errno, cf_strerror(errno)); |
2360 | } |
2361 | |
2362 | ssd_shadow_fd_put(ssd, fd); |
2363 | } |
2364 | |
2365 | |
2366 | //========================================================== |
2367 | // Cold start utilities. |
2368 | // |
2369 | |
2370 | bool |
2371 | prefer_existing_record(drv_ssd* ssd, const as_flat_record* flat, |
2372 | uint32_t block_void_time, const as_index* r) |
2373 | { |
2374 | int result = as_record_resolve_conflict(ssd_cold_start_policy(ssd->ns), |
2375 | r->generation, r->last_update_time, |
2376 | flat->generation, flat->last_update_time); |
2377 | |
2378 | if (result != 0) { |
2379 | return result == -1; // -1 means block record < existing record |
2380 | } |
2381 | |
2382 | // Finally, compare void-times. Note that defragged records will generate |
2383 | // identical copies on drive, so they'll get here and return true. |
2384 | return r->void_time == 0 || |
2385 | (block_void_time != 0 && block_void_time <= r->void_time); |
2386 | } |
2387 | |
2388 | |
2389 | bool |
2390 | is_set_evictable(as_namespace* ns, const as_flat_opt_meta* opt_meta) |
2391 | { |
2392 | if (! opt_meta->set_name) { |
2393 | return true; |
2394 | } |
2395 | |
2396 | as_set *p_set; |
2397 | |
2398 | if (cf_vmapx_get_by_name_w_len(ns->p_sets_vmap, opt_meta->set_name, |
2399 | opt_meta->set_name_len, (void**)&p_set) != CF_VMAPX_OK) { |
2400 | return true; |
2401 | } |
2402 | |
2403 | return ! IS_SET_EVICTION_DISABLED(p_set); |
2404 | } |
2405 | |
2406 | |
2407 | void |
2408 | apply_opt_meta(as_record* r, as_namespace* ns, const as_flat_opt_meta* opt_meta) |
2409 | { |
2410 | // Set record's set-id. (If it already has one, assume they're the same.) |
2411 | if (as_index_get_set_id(r) == INVALID_SET_ID && opt_meta->set_name) { |
2412 | as_index_set_set_w_len(r, ns, opt_meta->set_name, |
2413 | opt_meta->set_name_len, false); |
2414 | } |
2415 | |
2416 | // Store or drop the key according to the props we read. |
2417 | as_record_finalize_key(r, ns, opt_meta->key, opt_meta->key_size); |
2418 | } |
2419 | |
2420 | |
2421 | // Add a record just read from drive to the index, if all is well. |
2422 | void |
2423 | ssd_cold_start_add_record(drv_ssds* ssds, drv_ssd* ssd, |
2424 | const as_flat_record* flat, uint64_t rblock_id, uint32_t record_size) |
2425 | { |
2426 | uint32_t pid = as_partition_getid(&flat->keyd); |
2427 | |
2428 | // If this isn't a partition we're interested in, skip this record. |
2429 | if (! ssds->get_state_from_storage[pid]) { |
2430 | return; |
2431 | } |
2432 | |
2433 | as_namespace* ns = ssds->ns; |
2434 | as_partition* p_partition = &ns->partitions[pid]; |
2435 | |
2436 | const uint8_t* end = (const uint8_t*)flat + record_size; |
2437 | as_flat_opt_meta opt_meta = { 0 }; |
2438 | |
2439 | const uint8_t* p_read = as_flat_unpack_record_meta(flat, end, &opt_meta, |
2440 | ns->single_bin); |
2441 | |
2442 | if (! p_read) { |
2443 | cf_warning_digest(AS_DRV_SSD, &flat->keyd, "bad metadata for record " ); |
2444 | return; |
2445 | } |
2446 | |
2447 | if (opt_meta.void_time > ns->startup_max_void_time) { |
2448 | cf_warning_digest(AS_DRV_SSD, &flat->keyd, "bad flat record void-time " ); |
2449 | return; |
2450 | } |
2451 | |
2452 | if (! as_flat_decompress_buffer(&opt_meta.cm, ns->storage_write_block_size, |
2453 | &p_read, &end)) { |
2454 | cf_warning_digest(AS_DRV_SSD, &flat->keyd, "bad compressed data for record " ); |
2455 | return; |
2456 | } |
2457 | |
2458 | if (! as_flat_check_packed_bins(p_read, end, opt_meta.n_bins, |
2459 | ns->single_bin)) { |
2460 | cf_warning_digest(AS_DRV_SSD, &flat->keyd, "bad flat record " ); |
2461 | return; |
2462 | } |
2463 | |
2464 | // Ignore record if it was in a dropped tree. |
2465 | if (flat->tree_id != p_partition->tree_id) { |
2466 | return; |
2467 | } |
2468 | |
2469 | // Ignore records that were truncated. |
2470 | if (as_truncate_lut_is_truncated(flat->last_update_time, ns, |
2471 | opt_meta.set_name, opt_meta.set_name_len)) { |
2472 | return; |
2473 | } |
2474 | |
2475 | // If eviction is necessary, evict previously added records closest to |
2476 | // expiration. (If evicting, this call will block for a long time.) This |
2477 | // call may also update the cold start threshold void-time. |
2478 | if (! as_cold_start_evict_if_needed(ns)) { |
2479 | cf_crash(AS_DRV_SSD, "hit stop-writes limit before drive scan completed" ); |
2480 | } |
2481 | |
2482 | // Get/create the record from/in the appropriate index tree. |
2483 | as_index_ref r_ref; |
2484 | int rv = as_record_get_create(p_partition->tree, &flat->keyd, &r_ref, ns); |
2485 | |
2486 | if (rv < 0) { |
2487 | cf_detail_digest(AS_DRV_SSD, &flat->keyd, "record-add as_record_get_create() failed " ); |
2488 | return; |
2489 | } |
2490 | |
2491 | bool is_create = rv == 1; |
2492 | |
2493 | as_index* r = r_ref.r; |
2494 | |
2495 | if (! is_create) { |
2496 | // Record already existed. Ignore this one if existing record is newer. |
2497 | if (prefer_existing_record(ssd, flat, opt_meta.void_time, r)) { |
2498 | ssd_cold_start_adjust_cenotaph(ns, flat->has_bins == 1, |
2499 | opt_meta.void_time, r); |
2500 | as_record_done(&r_ref, ns); |
2501 | ssd->record_add_older_counter++; |
2502 | return; |
2503 | } |
2504 | } |
2505 | // The record we're now reading is the latest version (so far) ... |
2506 | |
2507 | // Skip records that have expired. |
2508 | if (opt_meta.void_time != 0 && ns->cold_start_now > opt_meta.void_time) { |
2509 | as_index_delete(p_partition->tree, &flat->keyd); |
2510 | as_record_done(&r_ref, ns); |
2511 | ssd->record_add_expired_counter++; |
2512 | return; |
2513 | } |
2514 | |
2515 | // Skip records that were evicted. |
2516 | if (opt_meta.void_time != 0 && ns->evict_void_time > opt_meta.void_time && |
2517 | is_set_evictable(ns, &opt_meta)) { |
2518 | as_index_delete(p_partition->tree, &flat->keyd); |
2519 | as_record_done(&r_ref, ns); |
2520 | ssd->record_add_evicted_counter++; |
2521 | return; |
2522 | } |
2523 | |
2524 | // We'll keep the record we're now reading ... |
2525 | |
2526 | ssd_cold_start_init_repl_state(ns, r); |
2527 | |
2528 | // Set/reset the record's last-update-time generation, and void-time. |
2529 | r->last_update_time = flat->last_update_time; |
2530 | r->generation = flat->generation; |
2531 | r->void_time = opt_meta.void_time; |
2532 | |
2533 | // Update maximum void-time. |
2534 | cf_atomic32_setmax(&p_partition->max_void_time, (int32_t)r->void_time); |
2535 | |
2536 | // If data is in memory, load bins and particles, adjust secondary index. |
2537 | if (ns->storage_data_in_memory) { |
2538 | as_storage_rd rd; |
2539 | |
2540 | if (is_create) { |
2541 | as_storage_record_create(ns, r, &rd); |
2542 | } |
2543 | else { |
2544 | as_storage_record_open(ns, r, &rd); |
2545 | } |
2546 | |
2547 | as_storage_rd_load_n_bins(&rd); |
2548 | as_storage_rd_load_bins(&rd, NULL); |
2549 | |
2550 | uint64_t bytes_memory = as_storage_record_get_n_bytes_memory(&rd); |
2551 | |
2552 | // Do this early since set-id is needed for the secondary index update. |
2553 | apply_opt_meta(r, ns, &opt_meta); |
2554 | |
2555 | uint16_t old_n_bins = rd.n_bins; |
2556 | |
2557 | bool has_sindex = record_has_sindex(r, ns); |
2558 | int sbins_populated = 0; |
2559 | |
2560 | if (has_sindex) { |
2561 | SINDEX_GRLOCK(); |
2562 | } |
2563 | |
2564 | SINDEX_BINS_SETUP(sbins, 2 * ns->sindex_cnt); |
2565 | as_sindex* si_arr[2 * ns->sindex_cnt]; |
2566 | int si_arr_index = 0; |
2567 | const char* set_name = as_index_get_set_name(r, ns); |
2568 | |
2569 | if (has_sindex) { |
2570 | for (uint16_t i = 0; i < old_n_bins; i++) { |
2571 | si_arr_index += as_sindex_arr_lookup_by_set_binid_lockfree(ns, |
2572 | set_name, rd.bins[i].id, &si_arr[si_arr_index]); |
2573 | } |
2574 | } |
2575 | |
2576 | int32_t delta_bins = (int32_t)opt_meta.n_bins - (int32_t)old_n_bins; |
2577 | |
2578 | if (ns->single_bin) { |
2579 | if (delta_bins < 0) { |
2580 | as_record_destroy_bins(&rd); |
2581 | } |
2582 | } |
2583 | else if (delta_bins != 0) { |
2584 | if (has_sindex && delta_bins < 0) { |
2585 | sbins_populated += as_sindex_sbins_from_rd(&rd, |
2586 | (uint16_t)opt_meta.n_bins, old_n_bins, sbins, |
2587 | AS_SINDEX_OP_DELETE); |
2588 | } |
2589 | |
2590 | as_bin_allocate_bin_space(&rd, delta_bins); |
2591 | } |
2592 | |
2593 | for (uint16_t i = 0; i < (uint16_t)opt_meta.n_bins; i++) { |
2594 | as_bin* b; |
2595 | size_t name_len = ns->single_bin ? 0 : *p_read++; |
2596 | |
2597 | if (i < old_n_bins) { |
2598 | b = &rd.bins[i]; |
2599 | |
2600 | if (has_sindex) { |
2601 | sbins_populated += as_sindex_sbins_from_bin(ns, set_name, b, |
2602 | &sbins[sbins_populated], AS_SINDEX_OP_DELETE); |
2603 | } |
2604 | |
2605 | if (! as_bin_set_id_from_name_w_len(ns, b, p_read, name_len)) { |
2606 | // TODO - should maybe fail gracefully? |
2607 | cf_crash(AS_DRV_SSD, "bin id assignment failed" ); |
2608 | } |
2609 | } |
2610 | else { |
2611 | b = as_bin_create_from_buf(&rd, p_read, name_len, NULL); |
2612 | |
2613 | if (! b) { |
2614 | // TODO - should maybe fail gracefully? |
2615 | cf_crash(AS_DRV_SSD, "bin create failed" ); |
2616 | } |
2617 | } |
2618 | |
2619 | p_read += name_len; |
2620 | |
2621 | if (! (p_read = |
2622 | as_bin_particle_replace_from_flat(b, p_read, end))) { |
2623 | // TODO - should maybe fail gracefully? |
2624 | cf_crash(AS_DRV_SSD, "particle replace failed" ); |
2625 | } |
2626 | |
2627 | if (has_sindex) { |
2628 | si_arr_index += as_sindex_arr_lookup_by_set_binid_lockfree(ns, |
2629 | set_name, b->id, &si_arr[si_arr_index]); |
2630 | sbins_populated += as_sindex_sbins_from_bin(ns, set_name, b, |
2631 | &sbins[sbins_populated], AS_SINDEX_OP_INSERT); |
2632 | } |
2633 | } |
2634 | |
2635 | if (has_sindex) { |
2636 | SINDEX_GRUNLOCK(); |
2637 | |
2638 | if (sbins_populated > 0) { |
2639 | as_sindex_update_by_sbin(ns, as_index_get_set_name(r, ns), |
2640 | sbins, sbins_populated, &r->keyd); |
2641 | as_sindex_sbin_freeall(sbins, sbins_populated); |
2642 | } |
2643 | |
2644 | as_sindex_release_arr(si_arr, si_arr_index); |
2645 | } |
2646 | |
2647 | as_storage_record_adjust_mem_stats(&rd, bytes_memory); |
2648 | as_storage_record_close(&rd); |
2649 | } |
2650 | else { |
2651 | apply_opt_meta(r, ns, &opt_meta); |
2652 | } |
2653 | |
2654 | if (is_create) { |
2655 | ssd->record_add_unique_counter++; |
2656 | } |
2657 | else if (STORAGE_RBLOCK_IS_VALID(r->rblock_id)) { |
2658 | // Replacing an existing record, undo its previous storage accounting. |
2659 | ssd_block_free(&ssds->ssds[r->file_id], r->rblock_id, r->n_rblocks, |
2660 | "record-add" ); |
2661 | ssd->record_add_replace_counter++; |
2662 | } |
2663 | else { |
2664 | cf_warning(AS_DRV_SSD, "replacing record with invalid rblock-id" ); |
2665 | } |
2666 | |
2667 | ssd_cold_start_transition_record(ns, flat, r, is_create); |
2668 | |
2669 | uint32_t wblock_id = RBLOCK_ID_TO_WBLOCK_ID(ssd, rblock_id); |
2670 | |
2671 | ssd->inuse_size += record_size; |
2672 | ssd->wblock_state[wblock_id].inuse_sz += record_size; |
2673 | |
2674 | // Set/reset the record's storage information. |
2675 | r->file_id = ssd->file_id; |
2676 | r->rblock_id = rblock_id; |
2677 | r->n_rblocks = flat->n_rblocks; |
2678 | |
2679 | as_record_done(&r_ref, ns); |
2680 | } |
2681 | |
2682 | |
2683 | // Sweep through a storage device to rebuild the index. |
2684 | void |
2685 | ssd_cold_start_sweep(drv_ssds *ssds, drv_ssd *ssd) |
2686 | { |
2687 | size_t wblock_size = ssd->write_block_size; |
2688 | |
2689 | uint8_t *buf = cf_valloc(wblock_size); |
2690 | |
2691 | bool read_shadow = ssd->shadow_name; |
2692 | const char *read_ssd_name = read_shadow ? ssd->shadow_name : ssd->name; |
2693 | int fd = read_shadow ? ssd_shadow_fd_get(ssd) : ssd_fd_get(ssd); |
2694 | int write_fd = read_shadow ? ssd_fd_get(ssd) : -1; |
2695 | |
2696 | // Loop over all wblocks, unless we encounter 10 contiguous unused wblocks. |
2697 | |
2698 | ssd->sweep_wblock_id = ssd->first_wblock_id; |
2699 | |
2700 | uint64_t file_offset = SSD_HEADER_SIZE; |
2701 | uint32_t n_unused_wblocks = 0; |
2702 | |
2703 | bool prefetch = cf_arenax_want_prefetch(ssd->ns->arena); |
2704 | |
2705 | while (file_offset < ssd->file_size && n_unused_wblocks < 10) { |
2706 | if (! pread_all(fd, buf, wblock_size, (off_t)file_offset)) { |
2707 | cf_crash(AS_DRV_SSD, "%s: read failed: errno %d (%s)" , |
2708 | read_ssd_name, errno, cf_strerror(errno)); |
2709 | } |
2710 | |
2711 | if (read_shadow && ! pwrite_all(write_fd, (void*)buf, wblock_size, |
2712 | (off_t)file_offset)) { |
2713 | cf_crash(AS_DRV_SSD, "%s: write failed: errno %d (%s)" , ssd->name, |
2714 | errno, cf_strerror(errno)); |
2715 | } |
2716 | |
2717 | if (prefetch) { |
2718 | ssd_prefetch_wblock(ssd, file_offset, buf); |
2719 | } |
2720 | |
2721 | size_t indent = 0; // current offset within wblock, in bytes |
2722 | |
2723 | while (indent < wblock_size) { |
2724 | as_flat_record *flat = (as_flat_record*)&buf[indent]; |
2725 | |
2726 | if (! prefetch) { |
2727 | ssd_decrypt(ssd, file_offset + indent, flat); |
2728 | } |
2729 | |
2730 | // Look for record magic. |
2731 | if (flat->magic != AS_FLAT_MAGIC) { |
				// Should always find a record at beginning of used wblock. If
				// not, we've likely encountered the unused part of the device.
2734 | if (indent == 0) { |
2735 | n_unused_wblocks++; |
2736 | break; // try next wblock |
2737 | } |
2738 | // else - nothing more in this wblock, but keep looking for |
2739 | // magic - necessary if we want to be able to increase |
2740 | // write-block-size across restarts. |
2741 | |
2742 | indent += RBLOCK_SIZE; |
2743 | continue; // try next rblock |
2744 | } |
2745 | |
2746 | if (n_unused_wblocks != 0) { |
2747 | cf_warning(AS_DRV_SSD, "%s: found used wblock after skipping %u unused" , |
2748 | ssd->name, n_unused_wblocks); |
2749 | |
2750 | n_unused_wblocks = 0; // restart contiguous count |
2751 | } |
2752 | |
2753 | uint32_t record_size = N_RBLOCKS_TO_SIZE(flat->n_rblocks); |
2754 | |
2755 | if (record_size < SSD_RECORD_MIN_SIZE) { |
2756 | cf_warning(AS_DRV_SSD, "%s: record too small: size %u" , |
2757 | ssd->name, record_size); |
2758 | indent += RBLOCK_SIZE; |
2759 | continue; // try next rblock |
2760 | } |
2761 | |
2762 | size_t next_indent = indent + record_size; |
2763 | |
2764 | // Sanity-check for wblock overruns. |
2765 | if (next_indent > wblock_size) { |
2766 | cf_warning(AS_DRV_SSD, "%s: record crosses wblock boundary: size %u" , |
2767 | ssd->name, record_size); |
2768 | break; // skip this record, try next wblock |
2769 | } |
2770 | |
2771 | // Found a record - try to add it to the index. |
2772 | ssd_cold_start_add_record(ssds, ssd, flat, |
2773 | OFFSET_TO_RBLOCK_ID(file_offset + indent), record_size); |
2774 | |
2775 | indent = next_indent; |
2776 | } |
2777 | |
2778 | file_offset += wblock_size; |
2779 | ssd->sweep_wblock_id++; |
2780 | } |
2781 | |
2782 | ssd->pristine_wblock_id = ssd->sweep_wblock_id - n_unused_wblocks; |
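	// E.g. (hypothetical numbers): if the scan stopped because wblocks
	// 90..99 were the 10 contiguous unused ones, sweep_wblock_id is 100 and
	// n_unused_wblocks is 10, so the pristine region starts at wblock 90.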
2783 | |
2784 | ssd->sweep_wblock_id = (uint32_t)(ssd->file_size / wblock_size); |
2785 | |
2786 | if (fd != -1) { |
2787 | read_shadow ? ssd_shadow_fd_put(ssd, fd) : ssd_fd_put(ssd, fd); |
2788 | } |
2789 | |
2790 | if (write_fd != -1) { |
2791 | ssd_fd_put(ssd, write_fd); |
2792 | } |
2793 | |
2794 | cf_free(buf); |
2795 | } |
2796 | |
2797 | |
2798 | // Thread "run" function to read a storage device and rebuild the index. |
2799 | void * |
2800 | run_ssd_cold_start(void *udata) |
2801 | { |
2802 | ssd_load_records_info *lri = (ssd_load_records_info*)udata; |
2803 | drv_ssd *ssd = lri->ssd; |
2804 | drv_ssds *ssds = lri->ssds; |
2805 | cf_queue *complete_q = lri->complete_q; |
2806 | void *complete_rc = lri->complete_rc; |
2807 | |
2808 | cf_free(lri); |
2809 | |
2810 | as_namespace* ns = ssds->ns; |
2811 | |
2812 | cf_info(AS_DRV_SSD, "device %s: reading device to load index" , ssd->name); |
2813 | |
2814 | CF_ALLOC_SET_NS_ARENA(ns); |
2815 | |
2816 | ssd_cold_start_sweep(ssds, ssd); |
2817 | |
2818 | cf_info(AS_DRV_SSD, "device %s: read complete: UNIQUE %lu (REPLACED %lu) (OLDER %lu) (EXPIRED %lu) (EVICTED %lu) records" , |
2819 | ssd->name, ssd->record_add_unique_counter, |
2820 | ssd->record_add_replace_counter, ssd->record_add_older_counter, |
2821 | ssd->record_add_expired_counter, ssd->record_add_evicted_counter); |
2822 | |
2823 | if (cf_rc_release(complete_rc) == 0) { |
2824 | // All drives are done reading. |
2825 | |
2826 | ns->loading_records = false; |
2827 | ssd_cold_start_drop_cenotaphs(ns); |
2828 | ssd_load_wblock_queues(ssds); |
2829 | |
2830 | cf_mutex_destroy(&ns->cold_start_evict_lock); |
2831 | |
2832 | as_truncate_list_cenotaphs(ns); |
2833 | as_truncate_done_startup(ns); // set truncate last-update-times in sets' vmap |
2834 | |
2835 | ssd_start_maintenance_threads(ssds); |
2836 | ssd_start_write_threads(ssds); |
2837 | ssd_start_defrag_threads(ssds); |
2838 | |
2839 | void *_t = NULL; |
2840 | |
2841 | cf_queue_push(complete_q, &_t); |
2842 | cf_rc_free(complete_rc); |
2843 | } |
2844 | |
2845 | return NULL; |
2846 | } |
2847 | |
2848 | |
2849 | void |
2850 | start_loading_records(drv_ssds *ssds, cf_queue *complete_q) |
2851 | { |
2852 | as_namespace *ns = ssds->ns; |
2853 | |
2854 | ns->loading_records = true; |
2855 | |
2856 | void *p = cf_rc_alloc(1); |
2857 | |
2858 | for (int i = 1; i < ssds->n_ssds; i++) { |
2859 | cf_rc_reserve(p); |
2860 | } |
2861 | |
2862 | for (int i = 0; i < ssds->n_ssds; i++) { |
2863 | drv_ssd *ssd = &ssds->ssds[i]; |
2864 | ssd_load_records_info *lri = cf_malloc(sizeof(ssd_load_records_info)); |
2865 | |
2866 | lri->ssds = ssds; |
2867 | lri->ssd = ssd; |
2868 | lri->complete_q = complete_q; |
2869 | lri->complete_rc = p; |
2870 | |
2871 | cf_thread_create_detached( |
2872 | ns->cold_start ? run_ssd_cold_start : run_ssd_cool_start, |
2873 | (void*)lri); |
2874 | } |
2875 | } |
2876 | |
2877 | |
2878 | //========================================================== |
2879 | // Generic startup utilities. |
2880 | // |
2881 | |
static void
ssd_flush_header(drv_ssds *ssds, ssd_device_header **headers)
2884 | { |
2885 | uint8_t* buf = cf_valloc(SSD_HEADER_SIZE); |
2886 | |
2887 | memset(buf, 0, SSD_HEADER_SIZE); |
2888 | memcpy(buf, ssds->common, sizeof(ssd_device_common)); |
2889 | |
2890 | for (int i = 0; i < ssds->n_ssds; i++) { |
2891 | memcpy(buf + SSD_OFFSET_UNIQUE, &headers[i]->unique, |
2892 | sizeof(ssd_device_unique)); |
2893 | |
2894 | ssd_write_header(&ssds->ssds[i], buf, buf, SSD_HEADER_SIZE); |
2895 | } |
2896 | |
2897 | cf_free(buf); |
2898 | } |
2899 | |
2900 | |
2901 | // Not called for fresh devices, but called in all (warm/cool/cold) starts. |
2902 | static void |
2903 | ssd_init_pristine_wblock_id(drv_ssd *ssd, uint64_t offset) |
2904 | { |
2905 | if (offset == 0) { |
2906 | // Legacy device with data - flag to scan and find id on warm restart. |
2907 | ssd->pristine_wblock_id = 0; |
2908 | return; |
2909 | } |
2910 | |
2911 | // Round up, in case write-block-size was increased. |
2912 | ssd->pristine_wblock_id = |
2913 | (offset + (ssd->write_block_size - 1)) / ssd->write_block_size; |
2914 | } |
2915 | |
2916 | |
2917 | void |
2918 | ssd_init_synchronous(drv_ssds *ssds) |
2919 | { |
2920 | uint64_t random = 0; |
2921 | |
2922 | while (random == 0) { |
2923 | random = cf_get_rand64(); |
2924 | } |
2925 | |
2926 | int n_ssds = ssds->n_ssds; |
2927 | as_namespace *ns = ssds->ns; |
2928 | |
	ssd_device_header *headers[n_ssds];
2930 | int first_used = -1; |
2931 | |
2932 | // Check all the headers. Pick one as the representative. |
2933 | for (int i = 0; i < n_ssds; i++) { |
2934 | drv_ssd *ssd = &ssds->ssds[i]; |
2935 | |
2936 | headers[i] = ssd_read_header(ssd); |
2937 | |
2938 | if (! headers[i]) { |
2939 | headers[i] = ssd_init_header(ns, ssd); |
2940 | } |
2941 | else if (first_used < 0) { |
2942 | first_used = i; |
2943 | } |
2944 | } |
2945 | |
2946 | if (first_used < 0) { |
2947 | // Shouldn't find all fresh headers here during warm or cool restart. |
2948 | if (! ns->cold_start) { |
2949 | // There's no going back to cold start now - do so the harsh way. |
2950 | cf_crash(AS_DRV_SSD, "{%s} found all %d devices fresh during %s restart" , |
2951 | ns->name, n_ssds, as_namespace_start_mode_str(ns)); |
2952 | } |
2953 | |
2954 | cf_info(AS_DRV_SSD, "{%s} found all %d devices fresh, initializing to random %lu" , |
2955 | ns->name, n_ssds, random); |
2956 | |
2957 | ssds->common = cf_valloc(ROUND_UP_COMMON); |
2958 | memcpy(ssds->common, &headers[0]->common, ROUND_UP_COMMON); |
2959 | |
2960 | ssds->common->prefix.n_devices = n_ssds; |
2961 | ssds->common->prefix.random = random; |
2962 | |
2963 | for (int i = 0; i < n_ssds; i++) { |
2964 | headers[i]->unique.device_id = (uint32_t)i; |
2965 | } |
2966 | |
2967 | ssd_adjust_versions(ns, ssds->common->pmeta); |
2968 | |
2969 | ssd_flush_header(ssds, headers); |
2970 | |
2971 | for (int i = 0; i < n_ssds; i++) { |
2972 | cf_free(headers[i]); |
2973 | } |
2974 | |
2975 | as_truncate_list_cenotaphs(ns); // all will show as cenotaph |
2976 | as_truncate_done_startup(ns); |
2977 | |
2978 | ssds->all_fresh = true; // won't need to scan devices |
2979 | |
2980 | return; |
2981 | } |
2982 | |
2983 | // At least one device is not fresh. Check that all non-fresh devices match. |
2984 | |
2985 | bool fresh_drive = false; |
2986 | bool non_commit_drive = false; |
2987 | ssd_common_prefix *prefix_first = &headers[first_used]->common.prefix; |
2988 | |
2989 | memset(ssds->device_translation, -1, sizeof(ssds->device_translation)); |
2990 | |
2991 | for (int i = 0; i < n_ssds; i++) { |
2992 | drv_ssd *ssd = &ssds->ssds[i]; |
2993 | ssd_common_prefix *prefix_i = &headers[i]->common.prefix; |
2994 | uint32_t old_device_id = headers[i]->unique.device_id; |
2995 | |
2996 | headers[i]->unique.device_id = (uint32_t)i; |
2997 | |
2998 | // Skip fresh devices. |
2999 | if (prefix_i->random == 0) { |
3000 | cf_info(AS_DRV_SSD, "{%s} device %s is empty" , ns->name, ssd->name); |
3001 | fresh_drive = true; |
3002 | continue; |
3003 | } |
3004 | |
3005 | ssd_init_pristine_wblock_id(ssd, headers[i]->unique.pristine_offset); |
3006 | |
3007 | ssds->device_translation[old_device_id] = (int8_t)i; |
3008 | |
3009 | if (prefix_first->random != prefix_i->random) { |
3010 | cf_crash(AS_DRV_SSD, "{%s} drive set with unmatched headers - devices %s & %s have different signatures" , |
3011 | ns->name, ssds->ssds[first_used].name, ssd->name); |
3012 | } |
3013 | |
3014 | if (prefix_first->n_devices != prefix_i->n_devices) { |
3015 | cf_crash(AS_DRV_SSD, "{%s} drive set with unmatched headers - devices %s & %s have different device counts" , |
3016 | ns->name, ssds->ssds[first_used].name, ssd->name); |
3017 | } |
3018 | |
3019 | // These should all be 0, unless upgrading from pre-4.5.1. |
3020 | if (prefix_first->last_evict_void_time != |
3021 | prefix_i->last_evict_void_time) { |
3022 | cf_warning(AS_DRV_SSD, "{%s} devices have inconsistent evict-void-times - ignoring" , |
3023 | ns->name); |
3024 | prefix_first->last_evict_void_time = 0; |
3025 | } |
3026 | |
3027 | if ((prefix_i->flags & SSD_HEADER_FLAG_TRUSTED) == 0) { |
3028 | cf_info(AS_DRV_SSD, "{%s} device %s prior shutdown not clean" , |
3029 | ns->name, ssd->name); |
3030 | ns->dirty_restart = true; |
3031 | } |
3032 | |
3033 | if ((prefix_i->flags & SSD_HEADER_FLAG_COMMIT_TO_DEVICE) == 0) { |
3034 | non_commit_drive = true; |
3035 | } |
3036 | } |
3037 | |
3038 | // Handle devices' evict threshold - may be upgrading from pre-4.5.1. |
3039 | if (prefix_first->last_evict_void_time != 0) { |
3040 | if (ns->smd_evict_void_time == 0) { |
3041 | ns->smd_evict_void_time = prefix_first->last_evict_void_time; |
3042 | // Leave header threshold in case we don't commit SMD threshold. |
3043 | } |
3044 | else { |
3045 | // Use SMD threshold, may now erase header threshold. |
3046 | prefix_first->last_evict_void_time = 0; |
3047 | } |
3048 | } |
3049 | |
3050 | // Drive set OK - fix up header set. |
3051 | ssds->common = cf_valloc(ROUND_UP_COMMON); |
3052 | memcpy(ssds->common, &headers[first_used]->common, ROUND_UP_COMMON); |
3053 | |
3054 | ssds->common->prefix.n_devices = n_ssds; // may have added fresh drives |
3055 | ssds->common->prefix.random = random; |
3056 | ssds->common->prefix.flags &= ~SSD_HEADER_FLAG_TRUSTED; |
3057 | |
3058 | if (fresh_drive || (ns->dirty_restart && non_commit_drive)) { |
3059 | ssd_adjust_versions(ns, ssds->common->pmeta); |
3060 | } |
3061 | |
3062 | ssd_flush_header(ssds, headers); |
3063 | ssd_flush_final_cfg(ns); |
3064 | |
3065 | for (int i = 0; i < n_ssds; i++) { |
3066 | cf_free(headers[i]); |
3067 | } |
3068 | |
3069 | uint32_t now = as_record_void_time_get(); |
3070 | |
3071 | // Sanity check void-times during startup. |
3072 | ns->startup_max_void_time = now + MAX_ALLOWED_TTL; |
3073 | |
3074 | // Cache booleans indicating whether partitions are owned or not. Also |
3075 | // restore tree-ids - note that absent partitions do have tree-ids. |
3076 | for (uint32_t pid = 0; pid < AS_PARTITIONS; pid++) { |
3077 | ssd_common_pmeta *pmeta = &ssds->common->pmeta[pid]; |
3078 | |
3079 | ssds->get_state_from_storage[pid] = |
3080 | as_partition_version_has_data(&pmeta->version); |
3081 | ns->partitions[pid].tree_id = pmeta->tree_id; |
3082 | } |
3083 | |
3084 | // Warm or cool restart. |
3085 | if (! ns->cold_start) { |
3086 | as_truncate_done_startup(ns); // set truncate last-update-times in sets' vmap |
3087 | ssd_resume_devices(ssds); |
3088 | |
3089 | return; // warm restart, or warm restart phase of cool restart, is done |
3090 | } |
3091 | |
3092 | // Cold start - we can now create our partition trees. |
3093 | for (uint32_t pid = 0; pid < AS_PARTITIONS; pid++) { |
3094 | if (ssds->get_state_from_storage[pid]) { |
3095 | as_partition* p = &ns->partitions[pid]; |
3096 | |
3097 | p->tree = as_index_tree_create(&ns->tree_shared, p->tree_id, |
3098 | as_partition_tree_done, (void*)p); |
3099 | } |
3100 | } |
3101 | |
3102 | // Initialize the cold start expiration and eviction machinery. |
3103 | cf_mutex_init(&ns->cold_start_evict_lock); |
3104 | ns->cold_start_now = now; |
3105 | } |
3106 | |
3107 | |
3108 | static uint64_t |
3109 | check_file_size(as_namespace *ns, uint64_t file_size, const char *tag) |
3110 | { |
3111 | cf_assert(sizeof(off_t) > 4, AS_DRV_SSD, "this OS supports only 32-bit (4g) files - compile with 64 bit offsets" ); |
3112 | |
3113 | if (file_size > SSD_HEADER_SIZE) { |
3114 | off_t unusable_size = |
3115 | (file_size - SSD_HEADER_SIZE) % ns->storage_write_block_size; |
3116 | |
3117 | if (unusable_size != 0) { |
3118 | cf_info(AS_DRV_SSD, "%s size must be header size %u + multiple of %u, rounding down" , |
3119 | tag, SSD_HEADER_SIZE, ns->storage_write_block_size); |
3120 | file_size -= unusable_size; |
3121 | } |
3122 | |
3123 | if (file_size > AS_STORAGE_MAX_DEVICE_SIZE) { |
			cf_warning(AS_DRV_SSD, "%s size must be <= %lu, trimming original size %lu",
					tag, AS_STORAGE_MAX_DEVICE_SIZE, file_size);
3126 | file_size = AS_STORAGE_MAX_DEVICE_SIZE; |
3127 | } |
3128 | } |
3129 | |
3130 | if (file_size <= SSD_HEADER_SIZE) { |
		cf_crash(AS_DRV_SSD, "%s size %lu must be greater than header size %u",
				tag, file_size, SSD_HEADER_SIZE);
3133 | } |
3134 | |
3135 | return file_size; |
3136 | } |
3137 | |
3138 | |
3139 | static uint64_t |
3140 | find_io_min_size(int fd, const char *ssd_name) |
3141 | { |
3142 | uint8_t *buf = cf_valloc(HI_IO_MIN_SIZE); |
3143 | size_t read_sz = LO_IO_MIN_SIZE; |
3144 | |
3145 | while (read_sz <= HI_IO_MIN_SIZE) { |
3146 | if (pread_all(fd, (void*)buf, read_sz, 0)) { |
3147 | cf_free(buf); |
3148 | return read_sz; |
3149 | } |
3150 | |
3151 | read_sz <<= 1; // LO_IO_MIN_SIZE and HI_IO_MIN_SIZE are powers of 2 |
3152 | } |
3153 | |
3154 | cf_crash(AS_DRV_SSD, "%s: read failed at all sizes from %u to %u bytes" , |
3155 | ssd_name, LO_IO_MIN_SIZE, HI_IO_MIN_SIZE); |
3156 | |
3157 | return 0; |
3158 | } |
3159 | |
3160 | |
3161 | void |
3162 | ssd_init_devices(as_namespace *ns, drv_ssds **ssds_p) |
3163 | { |
3164 | size_t ssds_size = sizeof(drv_ssds) + |
3165 | (ns->n_storage_devices * sizeof(drv_ssd)); |
3166 | drv_ssds *ssds = cf_malloc(ssds_size); |
3167 | |
3168 | memset(ssds, 0, ssds_size); |
3169 | ssds->n_ssds = (int)ns->n_storage_devices; |
3170 | ssds->ns = ns; |
3171 | |
3172 | // Raw device-specific initialization of drv_ssd structures. |
3173 | for (uint32_t i = 0; i < ns->n_storage_devices; i++) { |
3174 | drv_ssd *ssd = &ssds->ssds[i]; |
3175 | |
3176 | ssd->name = ns->storage_devices[i]; |
3177 | |
3178 | // Note - can't configure commit-to-device and disable-odsync. |
3179 | ssd->open_flag = O_RDWR | O_DIRECT | |
3180 | (ns->storage_disable_odsync ? 0 : O_DSYNC); |
3181 | |
3182 | int fd = open(ssd->name, ssd->open_flag, S_IRUSR | S_IWUSR); |
3183 | |
3184 | if (fd == -1) { |
3185 | cf_crash(AS_DRV_SSD, "unable to open device %s: %s" , ssd->name, |
3186 | cf_strerror(errno)); |
3187 | } |
3188 | |
3189 | uint64_t size = 0; |
3190 | |
3191 | ioctl(fd, BLKGETSIZE64, &size); // gets the number of bytes |
3192 | |
3193 | ssd->file_size = check_file_size(ns, size, "usable device" ); |
3194 | ssd->io_min_size = find_io_min_size(fd, ssd->name); |
3195 | |
3196 | if (ns->cold_start && ns->storage_cold_start_empty) { |
3197 | ssd_empty_header(fd, ssd->name); |
3198 | |
3199 | cf_info(AS_DRV_SSD, "cold-start-empty - erased header of %s" , |
3200 | ssd->name); |
3201 | } |
3202 | |
3203 | close(fd); |
3204 | |
3205 | ns->ssd_size += ssd->file_size; // increment total storage size |
3206 | |
3207 | cf_info(AS_DRV_SSD, "opened device %s: usable size %lu, io-min-size %lu" , |
3208 | ssd->name, ssd->file_size, ssd->io_min_size); |
3209 | |
3210 | if (ns->storage_scheduler_mode) { |
3211 | // Set scheduler mode specified in config file. |
3212 | cf_storage_set_scheduler(ssd->name, ns->storage_scheduler_mode); |
3213 | } |
3214 | } |
3215 | |
3216 | *ssds_p = ssds; |
3217 | } |
3218 | |
3219 | |
3220 | void |
3221 | ssd_init_shadow_devices(as_namespace *ns, drv_ssds *ssds) |
3222 | { |
3223 | if (ns->n_storage_shadows == 0) { |
3224 | // No shadows - a normal deployment. |
3225 | return; |
3226 | } |
3227 | |
3228 | // Check shadow devices. |
3229 | for (uint32_t i = 0; i < ns->n_storage_shadows; i++) { |
3230 | drv_ssd *ssd = &ssds->ssds[i]; |
3231 | |
3232 | ssd->shadow_name = ns->storage_shadows[i]; |
3233 | |
3234 | int fd = open(ssd->shadow_name, ssd->open_flag, S_IRUSR | S_IWUSR); |
3235 | |
3236 | if (fd == -1) { |
3237 | cf_crash(AS_DRV_SSD, "unable to open shadow device %s: %s" , |
3238 | ssd->shadow_name, cf_strerror(errno)); |
3239 | } |
3240 | |
3241 | uint64_t size = 0; |
3242 | |
3243 | ioctl(fd, BLKGETSIZE64, &size); // gets the number of bytes |
3244 | |
3245 | if (size < ssd->file_size) { |
3246 | cf_crash(AS_DRV_SSD, "shadow device %s is smaller than main device - %lu < %lu" , |
3247 | ssd->shadow_name, size, ssd->file_size); |
3248 | } |
3249 | |
3250 | ssd->shadow_io_min_size = find_io_min_size(fd, ssd->shadow_name); |
3251 | |
3252 | if (ns->cold_start && ns->storage_cold_start_empty) { |
3253 | ssd_empty_header(fd, ssd->shadow_name); |
3254 | |
3255 | cf_info(AS_DRV_SSD, "cold-start-empty - erased header of %s" , |
3256 | ssd->shadow_name); |
3257 | } |
3258 | |
3259 | close(fd); |
3260 | |
3261 | cf_info(AS_DRV_SSD, "shadow device %s is compatible with main device, shadow-io-min-size %lu" , |
3262 | ssd->shadow_name, ssd->shadow_io_min_size); |
3263 | |
3264 | if (ns->storage_scheduler_mode) { |
3265 | // Set scheduler mode specified in config file. |
3266 | cf_storage_set_scheduler(ssd->shadow_name, |
3267 | ns->storage_scheduler_mode); |
3268 | } |
3269 | } |
3270 | } |
3271 | |
3272 | |
3273 | void |
3274 | ssd_init_files(as_namespace *ns, drv_ssds **ssds_p) |
3275 | { |
3276 | size_t ssds_size = sizeof(drv_ssds) + |
3277 | (ns->n_storage_files * sizeof(drv_ssd)); |
3278 | drv_ssds *ssds = cf_malloc(ssds_size); |
3279 | |
3280 | memset(ssds, 0, ssds_size); |
3281 | ssds->n_ssds = (int)ns->n_storage_files; |
3282 | ssds->ns = ns; |
3283 | |
3284 | // File-specific initialization of drv_ssd structures. |
3285 | for (uint32_t i = 0; i < ns->n_storage_files; i++) { |
3286 | drv_ssd *ssd = &ssds->ssds[i]; |
3287 | |
3288 | ssd->name = ns->storage_devices[i]; |
3289 | |
3290 | if (ns->cold_start && ns->storage_cold_start_empty) { |
3291 | if (unlink(ssd->name) == 0) { |
3292 | cf_info(AS_DRV_SSD, "cold-start-empty - removed %s" , ssd->name); |
3293 | } |
3294 | else if (errno == ENOENT) { |
3295 | cf_info(AS_DRV_SSD, "cold-start-empty - no file %s" , ssd->name); |
3296 | } |
3297 | else { |
3298 | cf_crash(AS_DRV_SSD, "failed remove: errno %d" , errno); |
3299 | } |
3300 | } |
3301 | |
3302 | // Note - can't configure commit-to-device and disable-odsync. |
3303 | uint32_t direct_flags = |
3304 | O_DIRECT | (ns->storage_disable_odsync ? 0 : O_DSYNC); |
3305 | |
3306 | ssd->open_flag = O_RDWR | |
3307 | (ns->storage_commit_to_device || ns->storage_direct_files ? |
3308 | direct_flags : 0); |
3309 | |
3310 | // Validate that file can be opened, create it if it doesn't exist. |
3311 | int fd = open(ssd->name, ssd->open_flag | O_CREAT, S_IRUSR | S_IWUSR); |
3312 | |
3313 | if (fd == -1) { |
3314 | cf_crash(AS_DRV_SSD, "unable to open file %s: %s" , ssd->name, |
3315 | cf_strerror(errno)); |
3316 | } |
3317 | |
3318 | ssd->file_size = check_file_size(ns, ns->storage_filesize, "file" ); |
3319 | ssd->io_min_size = LO_IO_MIN_SIZE; |
3320 | |
3321 | // Truncate will grow or shrink the file to the correct size. |
3322 | if (ftruncate(fd, (off_t)ssd->file_size) != 0) { |
3323 | cf_crash(AS_DRV_SSD, "unable to truncate file: errno %d" , errno); |
3324 | } |
3325 | |
3326 | close(fd); |
3327 | |
3328 | ns->ssd_size += ssd->file_size; // increment total storage size |
3329 | |
3330 | cf_info(AS_DRV_SSD, "opened file %s: usable size %lu" , ssd->name, |
3331 | ssd->file_size); |
3332 | } |
3333 | |
3334 | *ssds_p = ssds; |
3335 | } |
3336 | |
3337 | |
3338 | void |
3339 | ssd_init_shadow_files(as_namespace *ns, drv_ssds *ssds) |
3340 | { |
3341 | if (ns->n_storage_shadows == 0) { |
3342 | // No shadows - a normal deployment. |
3343 | return; |
3344 | } |
3345 | |
3346 | // Check shadow files. |
3347 | for (uint32_t i = 0; i < ns->n_storage_shadows; i++) { |
3348 | drv_ssd *ssd = &ssds->ssds[i]; |
3349 | |
3350 | ssd->shadow_name = ns->storage_shadows[i]; |
3351 | |
3352 | if (ns->cold_start && ns->storage_cold_start_empty) { |
3353 | if (unlink(ssd->shadow_name) == 0) { |
3354 | cf_info(AS_DRV_SSD, "cold-start-empty - removed %s" , |
3355 | ssd->shadow_name); |
3356 | } |
3357 | else if (errno == ENOENT) { |
3358 | cf_info(AS_DRV_SSD, "cold-start-empty - no shadow file %s" , |
3359 | ssd->shadow_name); |
3360 | } |
3361 | else { |
3362 | cf_crash(AS_DRV_SSD, "failed remove: errno %d" , errno); |
3363 | } |
3364 | } |
3365 | |
3366 | // Validate that file can be opened, create it if it doesn't exist. |
3367 | int fd = open(ssd->shadow_name, ssd->open_flag | O_CREAT, |
3368 | S_IRUSR | S_IWUSR); |
3369 | |
3370 | if (fd == -1) { |
3371 | cf_crash(AS_DRV_SSD, "unable to open shadow file %s: %s" , |
3372 | ssd->shadow_name, cf_strerror(errno)); |
3373 | } |
3374 | |
3375 | // Truncate will grow or shrink the file to the correct size. |
3376 | if (ftruncate(fd, (off_t)ssd->file_size) != 0) { |
3377 | cf_crash(AS_DRV_SSD, "unable to truncate file: errno %d" , errno); |
3378 | } |
3379 | |
3380 | ssd->shadow_io_min_size = LO_IO_MIN_SIZE; |
3381 | |
3382 | close(fd); |
3383 | |
3384 | cf_info(AS_DRV_SSD, "shadow file %s is initialized" , ssd->shadow_name); |
3385 | } |
3386 | } |
3387 | |
3388 | |
3389 | //========================================================== |
3390 | // Generic shutdown utilities. |
3391 | // |
3392 | |
3393 | static void |
3394 | ssd_set_pristine_offset(drv_ssds *ssds) |
3395 | { |
	// Round down to the nearest multiple of HI_IO_MIN_SIZE - use
	// HI_IO_MIN_SIZE (not per-device io-min-size) for simplicity, so we can
	// allocate one buffer outside the loop.
	off_t offset = offsetof(ssd_device_header, unique.pristine_offset) &
			-(uint64_t)HI_IO_MIN_SIZE;

	// pristine_offset is a uint64_t, must sit within HI_IO_MIN_SIZE of offset.
	ssd_device_unique *header_unique = cf_valloc(HI_IO_MIN_SIZE);
3403 | |
3404 | cf_mutex_lock(&ssds->flush_lock); |
3405 | |
3406 | for (int i = 0; i < ssds->n_ssds; i++) { |
3407 | drv_ssd *ssd = &ssds->ssds[i]; |
3408 | |
3409 | int fd = ssd_fd_get(ssd); |
3410 | |
3411 | if (! pread_all(fd, (void *)header_unique, HI_IO_MIN_SIZE, offset)) { |
3412 | cf_crash(AS_DRV_SSD, "%s: read failed: errno %d (%s)" , |
3413 | ssd->name, errno, cf_strerror(errno)); |
3414 | } |
3415 | |
3416 | header_unique->pristine_offset = |
3417 | (uint64_t)ssd->pristine_wblock_id * ssd->write_block_size; |
3418 | |
3419 | if (! pwrite_all(fd, (void *)header_unique, HI_IO_MIN_SIZE, offset)) { |
3420 | cf_crash(AS_DRV_SSD, "%s: DEVICE FAILED write: errno %d (%s)" , |
3421 | ssd->name, errno, cf_strerror(errno)); |
3422 | } |
3423 | |
3424 | ssd_fd_put(ssd, fd); |
3425 | |
3426 | // Skip shadow - persisted offset never used at cold start. |
3427 | } |
3428 | |
3429 | cf_mutex_unlock(&ssds->flush_lock); |
3430 | |
3431 | cf_free(header_unique); |
3432 | } |
3433 | |
3434 | |
3435 | static void |
3436 | ssd_set_trusted(drv_ssds *ssds) |
3437 | { |
3438 | cf_mutex_lock(&ssds->flush_lock); |
3439 | |
3440 | ssds->common->prefix.flags |= SSD_HEADER_FLAG_TRUSTED; |
3441 | |
3442 | for (int i = 0; i < ssds->n_ssds; i++) { |
3443 | drv_ssd *ssd = &ssds->ssds[i]; |
3444 | |
3445 | ssd_write_header(ssd, (uint8_t *)ssds->common, |
3446 | (uint8_t *)&ssds->common->prefix.flags, |
3447 | sizeof(ssds->common->prefix.flags)); |
3448 | } |
3449 | |
3450 | cf_mutex_unlock(&ssds->flush_lock); |
3451 | } |
3452 | |
3453 | |
3454 | //========================================================== |
3455 | // Storage API implementation: startup, shutdown, etc. |
3456 | // |
3457 | |
3458 | void |
3459 | as_storage_namespace_init_ssd(as_namespace *ns) |
3460 | { |
3461 | drv_ssds *ssds; |
3462 | |
3463 | if (ns->n_storage_devices != 0) { |
3464 | ssd_init_devices(ns, &ssds); |
3465 | ssd_init_shadow_devices(ns, ssds); |
3466 | } |
3467 | else { |
3468 | ssd_init_files(ns, &ssds); |
3469 | ssd_init_shadow_files(ns, ssds); |
3470 | } |
3471 | |
3472 | cf_mutex_init(&ssds->flush_lock); |
3473 | |
3474 | // Allow defrag to go full speed during startup - restore the configured |
3475 | // settings when startup is done. |
3476 | ns->saved_defrag_sleep = ns->storage_defrag_sleep; |
3477 | ns->storage_defrag_sleep = 0; |
3478 | |
	// Convert the byte-based write-cache limit to a queue depth - more
	// efficient to check per write transaction.
	ns->storage_max_write_q = (int)
			(ns->storage_max_write_cache / ns->storage_write_block_size);
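	// E.g. (illustrative values only) a 64M max-write-cache with 1M write
	// blocks allows a write queue depth of 64.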
3482 | |
3483 | // Minimize how often we recalculate this. |
3484 | ns->defrag_lwm_size = |
3485 | (ns->storage_write_block_size * ns->storage_defrag_lwm_pct) / 100; |
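	// E.g. (illustrative values only) with 1M write blocks and
	// defrag-lwm-pct 50: (1048576 * 50) / 100 = 524288 - wblocks emptier
	// than that are defrag candidates.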
3486 | |
3487 | ns->storage_private = (void*)ssds; |
3488 | |
3489 | char histname[HISTOGRAM_NAME_SIZE]; |
3490 | |
3491 | snprintf(histname, sizeof(histname), "{%s}-device-read-size" , ns->name); |
3492 | ns->device_read_size_hist = histogram_create(histname, HIST_SIZE); |
3493 | |
3494 | snprintf(histname, sizeof(histname), "{%s}-device-write-size" , ns->name); |
3495 | ns->device_write_size_hist = histogram_create(histname, HIST_SIZE); |
3496 | |
3497 | uint32_t first_wblock_id = SSD_HEADER_SIZE / ns->storage_write_block_size; |
3498 | |
3499 | // Finish initializing drv_ssd structures (non-zero-value members). |
3500 | for (int i = 0; i < ssds->n_ssds; i++) { |
3501 | drv_ssd *ssd = &ssds->ssds[i]; |
3502 | |
3503 | ssd->ns = ns; |
3504 | ssd->file_id = i; |
3505 | |
3506 | cf_mutex_init(&ssd->write_lock); |
3507 | cf_mutex_init(&ssd->defrag_lock); |
3508 | |
3509 | ssd->running = true; |
3510 | |
3511 | // Some (non-dynamic) config shortcuts: |
3512 | ssd->write_block_size = ns->storage_write_block_size; |
3513 | ssd->first_wblock_id = first_wblock_id; |
3514 | |
3515 | // Non-fresh devices will initialize this appropriately later. |
3516 | ssd->pristine_wblock_id = first_wblock_id; |
3517 | |
3518 | ssd_wblock_init(ssd); |
3519 | |
3520 | // Note: free_wblock_q, defrag_wblock_q created after loading devices. |
3521 | |
3522 | ssd->fd_q = cf_queue_create(sizeof(int), true); |
3523 | ssd->fd_cache_q = cf_queue_create(sizeof(int), true); |
3524 | |
3525 | if (ssd->shadow_name) { |
3526 | ssd->shadow_fd_q = cf_queue_create(sizeof(int), true); |
3527 | } |
3528 | |
3529 | ssd->swb_write_q = cf_queue_create(sizeof(void*), true); |
3530 | |
3531 | if (ssd->shadow_name) { |
3532 | ssd->swb_shadow_q = cf_queue_create(sizeof(void*), true); |
3533 | } |
3534 | |
3535 | ssd->swb_free_q = cf_queue_create(sizeof(void*), true); |
3536 | |
3537 | if (! ns->storage_data_in_memory) { |
3538 | // TODO - hide the storage_commit_to_device usage. |
3539 | ssd->post_write_q = cf_queue_create(sizeof(void*), |
3540 | ns->storage_commit_to_device); |
3541 | } |
3542 | |
3543 | snprintf(histname, sizeof(histname), "{%s}-%s-read" , ns->name, ssd->name); |
3544 | ssd->hist_read = histogram_create(histname, HIST_MILLISECONDS); |
3545 | |
3546 | snprintf(histname, sizeof(histname), "{%s}-%s-large-block-read" , ns->name, ssd->name); |
3547 | ssd->hist_large_block_read = histogram_create(histname, HIST_MILLISECONDS); |
3548 | |
3549 | snprintf(histname, sizeof(histname), "{%s}-%s-write" , ns->name, ssd->name); |
3550 | ssd->hist_write = histogram_create(histname, HIST_MILLISECONDS); |
3551 | |
3552 | if (ssd->shadow_name) { |
3553 | snprintf(histname, sizeof(histname), "{%s}-%s-shadow-write" , ns->name, ssd->name); |
3554 | ssd->hist_shadow_write = histogram_create(histname, HIST_MILLISECONDS); |
3555 | } |
3556 | |
3557 | ssd_init_commit(ssd); |
3558 | } |
3559 | |
3560 | // Will load headers and, if warm or cool restart, resume persisted index. |
3561 | ssd_init_synchronous(ssds); |
3562 | } |
3563 | |
3564 | |
3565 | void |
3566 | as_storage_namespace_load_ssd(as_namespace *ns, cf_queue *complete_q) |
3567 | { |
3568 | drv_ssds *ssds = (drv_ssds*)ns->storage_private; |
3569 | |
3570 | // If devices have data, and it's cold start or cool restart, scan devices. |
3571 | if (! ssds->all_fresh && |
3572 | (ns->cold_start || as_namespace_cool_restarts(ns))) { |
3573 | // Fire off threads to scan devices to build index and/or load record |
3574 | // data into memory - will signal completion when threads are all done. |
3575 | start_loading_records(ssds, complete_q); |
3576 | return; |
3577 | } |
3578 | // else - fresh devices or warm restart, this namespace is ready to roll. |
3579 | |
3580 | ssd_load_wblock_queues(ssds); |
3581 | |
3582 | ssd_start_maintenance_threads(ssds); |
3583 | ssd_start_write_threads(ssds); |
3584 | ssd_start_defrag_threads(ssds); |
3585 | |
3586 | void *_t = NULL; |
3587 | |
3588 | cf_queue_push(complete_q, &_t); |
3589 | } |
3590 | |
3591 | |
3592 | void |
3593 | as_storage_loading_records_ticker_ssd() |
3594 | { |
3595 | for (uint32_t i = 0; i < g_config.n_namespaces; i++) { |
3596 | as_namespace *ns = g_config.namespaces[i]; |
3597 | |
3598 | if (ns->loading_records) { |
3599 | char buf[2048]; |
3600 | int pos = 0; |
3601 | drv_ssds *ssds = (drv_ssds*)ns->storage_private; |
3602 | |
3603 | for (int j = 0; j < ssds->n_ssds; j++) { |
3604 | drv_ssd *ssd = &ssds->ssds[j]; |
3605 | uint32_t pct = (uint32_t)((ssd->sweep_wblock_id * 100UL) / |
3606 | (ssd->file_size / ssd->write_block_size)); |
3607 | |
				pos += sprintf(buf + pos, ", %s %u%%", ssd->name, pct);
3609 | } |
3610 | |
3611 | // TODO - conform with new log standard? |
			if (ns->n_tombstones == 0) {
				cf_info(AS_DRV_SSD, "{%s} loaded %lu objects%s", ns->name,
						ns->n_objects, buf);
			}
			else {
				cf_info(AS_DRV_SSD, "{%s} loaded %lu objects, %lu tombstones%s",
						ns->name, ns->n_objects, ns->n_tombstones, buf);
			}
3620 | } |
3621 | } |
3622 | } |
3623 | |
3624 | |
3625 | int |
3626 | as_storage_namespace_destroy_ssd(as_namespace *ns) |
3627 | { |
3628 | // This is not called - for now we don't bother unwinding. |
3629 | return 0; |
3630 | } |
3631 | |
3632 | |
3633 | // Note that this is *NOT* the counterpart to as_storage_record_create_ssd()! |
3634 | // That would be as_storage_record_close_ssd(). This is what gets called when a |
3635 | // record is destroyed, to dereference storage. |
3636 | int |
3637 | as_storage_record_destroy_ssd(as_namespace *ns, as_record *r) |
3638 | { |
3639 | if (STORAGE_RBLOCK_IS_VALID(r->rblock_id) && r->n_rblocks != 0) { |
3640 | drv_ssds *ssds = (drv_ssds*)ns->storage_private; |
3641 | drv_ssd *ssd = &ssds->ssds[r->file_id]; |
3642 | |
		ssd_block_free(ssd, r->rblock_id, r->n_rblocks, "destroy");
3644 | |
3645 | r->rblock_id = 0; |
3646 | r->n_rblocks = 0; |
3647 | } |
3648 | |
3649 | return 0; |
3650 | } |
3651 | |
3652 | |
3653 | //========================================================== |
3654 | // Storage API implementation: as_storage_rd cycle. |
3655 | // |
3656 | |
3657 | int |
3658 | as_storage_record_create_ssd(as_storage_rd *rd) |
3659 | { |
3660 | rd->flat = NULL; |
3661 | rd->flat_end = NULL; |
3662 | rd->flat_bins = NULL; |
3663 | rd->flat_n_bins = 0; |
3664 | rd->read_buf = NULL; |
3665 | rd->ssd = NULL; |
3666 | |
	cf_assert(rd->r->rblock_id == 0, AS_DRV_SSD, "unexpected - uninitialized rblock-id");
3668 | |
3669 | return 0; |
3670 | } |
3671 | |
3672 | |
3673 | int |
3674 | as_storage_record_open_ssd(as_storage_rd *rd) |
3675 | { |
3676 | drv_ssds *ssds = (drv_ssds*)rd->ns->storage_private; |
3677 | |
3678 | rd->flat = NULL; |
3679 | rd->flat_end = NULL; |
3680 | rd->flat_bins = NULL; |
3681 | rd->flat_n_bins = 0; |
3682 | rd->read_buf = NULL; |
3683 | rd->ssd = &ssds->ssds[rd->r->file_id]; |
3684 | |
3685 | return 0; |
3686 | } |
3687 | |
3688 | |
3689 | int |
3690 | as_storage_record_close_ssd(as_storage_rd *rd) |
3691 | { |
3692 | if (rd->read_buf) { |
3693 | cf_free(rd->read_buf); |
3694 | rd->read_buf = NULL; |
3695 | } |
3696 | |
3697 | rd->flat = NULL; |
3698 | rd->flat_end = NULL; |
3699 | rd->flat_bins = NULL; |
3700 | rd->flat_n_bins = 0; |
3701 | rd->ssd = NULL; |
3702 | |
3703 | return 0; |
3704 | } |
3705 | |
3706 | |
3707 | // These are near the top of this file: |
3708 | // as_storage_record_get_n_bins_ssd() |
3709 | // as_storage_record_read_ssd() |
3710 | // as_storage_particle_read_all_ssd() |
3711 | // as_storage_particle_read_and_size_all_ssd() |
3712 | |
3713 | |
3714 | bool |
3715 | as_storage_record_size_and_check_ssd(as_storage_rd *rd) |
3716 | { |
3717 | return rd->ns->storage_write_block_size >= as_flat_record_size(rd); |
3718 | } |
3719 | |
3720 | |
3721 | //========================================================== |
3722 | // Storage API implementation: storage capacity monitoring. |
3723 | // |
3724 | |
3725 | void |
3726 | as_storage_wait_for_defrag_ssd(as_namespace *ns) |
3727 | { |
3728 | if (ns->storage_defrag_startup_minimum > 0) { |
3729 | while (true) { |
3730 | int avail_pct; |
3731 | |
3732 | if (0 != as_storage_stats_ssd(ns, &avail_pct, 0)) { |
3733 | cf_crash(AS_DRV_SSD, "namespace %s storage stats failed" , |
3734 | ns->name); |
3735 | } |
3736 | |
3737 | if (avail_pct >= ns->storage_defrag_startup_minimum) { |
3738 | break; |
3739 | } |
3740 | |
3741 | cf_info(AS_DRV_SSD, "namespace %s waiting for defrag: %d pct available, waiting for %d ..." , |
3742 | ns->name, avail_pct, ns->storage_defrag_startup_minimum); |
3743 | |
3744 | sleep(2); |
3745 | } |
3746 | } |
3747 | |
3748 | // Restore configured defrag throttling values. |
3749 | ns->storage_defrag_sleep = ns->saved_defrag_sleep; |
3750 | } |
3751 | |
3752 | |
3753 | bool |
3754 | as_storage_overloaded_ssd(as_namespace *ns) |
3755 | { |
3756 | drv_ssds *ssds = (drv_ssds*)ns->storage_private; |
3757 | int max_write_q = ns->storage_max_write_q; |
3758 | |
3759 | // TODO - would be nice to not do this loop every single write transaction! |
3760 | for (int i = 0; i < ssds->n_ssds; i++) { |
3761 | drv_ssd *ssd = &ssds->ssds[i]; |
3762 | int qsz = cf_queue_sz(ssd->swb_write_q); |
3763 | |
3764 | if (qsz > max_write_q) { |
3765 | cf_ticker_warning(AS_DRV_SSD, "{%s} write fail: queue too deep: exceeds max %d" , |
3766 | ns->name, max_write_q); |
3767 | return true; |
3768 | } |
3769 | |
3770 | if (ssd->shadow_name) { |
3771 | qsz = cf_queue_sz(ssd->swb_shadow_q); |
3772 | |
3773 | if (qsz > max_write_q) { |
3774 | cf_ticker_warning(AS_DRV_SSD, "{%s} write fail: shadow queue too deep: exceeds max %d" , |
3775 | ns->name, max_write_q); |
3776 | return true; |
3777 | } |
3778 | } |
3779 | } |
3780 | |
3781 | return false; |
3782 | } |
3783 | |
3784 | |
3785 | bool |
3786 | as_storage_has_space_ssd(as_namespace *ns) |
3787 | { |
3788 | // Shortcut - assume we can't go from 5% to 0% in 1 ticker interval. |
3789 | if (ns->storage_last_avail_pct > 5) { |
3790 | return true; |
3791 | } |
3792 | // else - running low on available percent, check rigorously... |
3793 | |
3794 | drv_ssds* ssds = (drv_ssds*)ns->storage_private; |
3795 | |
3796 | for (int i = 0; i < ssds->n_ssds; i++) { |
3797 | if (num_free_wblocks(&ssds->ssds[i]) < min_free_wblocks(ns)) { |
3798 | return false; |
3799 | } |
3800 | } |
3801 | |
3802 | return true; |
3803 | } |
3804 | |
3805 | |
3806 | void |
3807 | as_storage_defrag_sweep_ssd(as_namespace *ns) |
3808 | { |
3809 | cf_info(AS_DRV_SSD, "{%s} sweeping all devices for wblocks to defrag ..." , ns->name); |
3810 | |
3811 | drv_ssds* ssds = (drv_ssds*)ns->storage_private; |
3812 | |
3813 | for (int i = 0; i < ssds->n_ssds; i++) { |
3814 | cf_atomic32_incr(&ssds->ssds[i].defrag_sweep); |
3815 | } |
3816 | } |
3817 | |
3818 | |
3819 | //========================================================== |
3820 | // Storage API implementation: data in device headers. |
3821 | // |
3822 | |
3823 | void |
3824 | as_storage_load_regime_ssd(as_namespace *ns) |
3825 | { |
3826 | drv_ssds* ssds = (drv_ssds*)ns->storage_private; |
3827 | |
3828 | ns->eventual_regime = ssds->common->prefix.eventual_regime; |
3829 | ns->rebalance_regime = ns->eventual_regime; |
3830 | } |
3831 | |
3832 | |
3833 | void |
3834 | as_storage_save_regime_ssd(as_namespace *ns) |
3835 | { |
3836 | drv_ssds* ssds = (drv_ssds*)ns->storage_private; |
3837 | |
3838 | cf_mutex_lock(&ssds->flush_lock); |
3839 | |
3840 | ssds->common->prefix.eventual_regime = ns->eventual_regime; |
3841 | |
3842 | for (int i = 0; i < ssds->n_ssds; i++) { |
3843 | drv_ssd* ssd = &ssds->ssds[i]; |
3844 | |
3845 | ssd_write_header(ssd, (uint8_t*)ssds->common, |
3846 | (uint8_t*)&ssds->common->prefix.eventual_regime, |
3847 | sizeof(ssds->common->prefix.eventual_regime)); |
3848 | } |
3849 | |
3850 | cf_mutex_unlock(&ssds->flush_lock); |
3851 | } |
3852 | |
3853 | |
3854 | void |
3855 | as_storage_load_roster_generation_ssd(as_namespace *ns) |
3856 | { |
3857 | drv_ssds* ssds = (drv_ssds*)ns->storage_private; |
3858 | |
3859 | ns->roster_generation = ssds->common->prefix.roster_generation; |
3860 | } |
3861 | |
3862 | |
3863 | void |
3864 | as_storage_save_roster_generation_ssd(as_namespace *ns) |
3865 | { |
3866 | drv_ssds* ssds = (drv_ssds*)ns->storage_private; |
3867 | |
	// It's normal for this not to change - cleaner to check here than at
	// every call site.
3869 | if (ns->roster_generation == ssds->common->prefix.roster_generation) { |
3870 | return; |
3871 | } |
3872 | |
3873 | cf_mutex_lock(&ssds->flush_lock); |
3874 | |
3875 | ssds->common->prefix.roster_generation = ns->roster_generation; |
3876 | |
3877 | for (int i = 0; i < ssds->n_ssds; i++) { |
3878 | drv_ssd* ssd = &ssds->ssds[i]; |
3879 | |
3880 | ssd_write_header(ssd, (uint8_t*)ssds->common, |
3881 | (uint8_t*)&ssds->common->prefix.roster_generation, |
3882 | sizeof(ssds->common->prefix.roster_generation)); |
3883 | } |
3884 | |
3885 | cf_mutex_unlock(&ssds->flush_lock); |
3886 | } |
3887 | |
3888 | |
3889 | void |
3890 | as_storage_load_pmeta_ssd(as_namespace *ns, as_partition *p) |
3891 | { |
3892 | drv_ssds *ssds = (drv_ssds*)ns->storage_private; |
3893 | ssd_common_pmeta *pmeta = &ssds->common->pmeta[p->id]; |
3894 | |
3895 | p->version = pmeta->version; |
3896 | } |
3897 | |
3898 | |
3899 | void |
3900 | as_storage_save_pmeta_ssd(as_namespace *ns, const as_partition *p) |
3901 | { |
3902 | drv_ssds *ssds = (drv_ssds*)ns->storage_private; |
3903 | ssd_common_pmeta *pmeta = &ssds->common->pmeta[p->id]; |
3904 | |
3905 | cf_mutex_lock(&ssds->flush_lock); |
3906 | |
3907 | pmeta->version = p->version; |
3908 | pmeta->tree_id = p->tree_id; |
3909 | |
3910 | for (int i = 0; i < ssds->n_ssds; i++) { |
3911 | drv_ssd *ssd = &ssds->ssds[i]; |
3912 | |
3913 | ssd_write_header(ssd, (uint8_t*)ssds->common, (uint8_t*)pmeta, |
3914 | sizeof(*pmeta)); |
3915 | } |
3916 | |
3917 | cf_mutex_unlock(&ssds->flush_lock); |
3918 | } |
3919 | |
3920 | |
3921 | void |
3922 | as_storage_cache_pmeta_ssd(as_namespace *ns, const as_partition *p) |
3923 | { |
3924 | drv_ssds *ssds = (drv_ssds*)ns->storage_private; |
3925 | ssd_common_pmeta *pmeta = &ssds->common->pmeta[p->id]; |
3926 | |
3927 | pmeta->version = p->version; |
3928 | pmeta->tree_id = p->tree_id; |
3929 | } |
3930 | |
3931 | |
3932 | void |
3933 | as_storage_flush_pmeta_ssd(as_namespace *ns, uint32_t start_pid, |
3934 | uint32_t n_partitions) |
3935 | { |
3936 | drv_ssds *ssds = (drv_ssds*)ns->storage_private; |
3937 | ssd_common_pmeta *pmeta = &ssds->common->pmeta[start_pid]; |
3938 | |
3939 | cf_mutex_lock(&ssds->flush_lock); |
3940 | |
3941 | for (int i = 0; i < ssds->n_ssds; i++) { |
3942 | drv_ssd *ssd = &ssds->ssds[i]; |
3943 | |
3944 | ssd_write_header(ssd, (uint8_t*)ssds->common, (uint8_t*)pmeta, |
3945 | sizeof(ssd_common_pmeta) * n_partitions); |
3946 | } |
3947 | |
3948 | cf_mutex_unlock(&ssds->flush_lock); |
3949 | } |
3950 | |
3951 | |
3952 | //========================================================== |
3953 | // Storage API implementation: statistics. |
3954 | // |
3955 | |
3956 | int |
3957 | as_storage_stats_ssd(as_namespace *ns, int *available_pct, |
3958 | uint64_t *used_disk_bytes) |
3959 | { |
3960 | drv_ssds *ssds = (drv_ssds*)ns->storage_private; |
3961 | |
3962 | if (available_pct) { |
3963 | *available_pct = 100; |
3964 | |
3965 | // Find the device with the lowest available percent. |
3966 | for (int i = 0; i < ssds->n_ssds; i++) { |
3967 | drv_ssd *ssd = &ssds->ssds[i]; |
3968 | uint64_t pct = (available_size(ssd) * 100) / ssd->file_size; |
3969 | |
3970 | if (pct < (uint64_t)*available_pct) { |
				*available_pct = (int)pct;
3972 | } |
3973 | } |
3974 | |
3975 | // Used for shortcut in as_storage_has_space_ssd(), which is done on a |
3976 | // per-transaction basis: |
3977 | ns->storage_last_avail_pct = *available_pct; |
3978 | } |
3979 | |
3980 | if (used_disk_bytes) { |
3981 | uint64_t sz = 0; |
3982 | |
3983 | for (int i = 0; i < ssds->n_ssds; i++) { |
3984 | sz += ssds->ssds[i].inuse_size; |
3985 | } |
3986 | |
3987 | *used_disk_bytes = sz; |
3988 | } |
3989 | |
3990 | return 0; |
3991 | } |
3992 | |
3993 | |
3994 | void |
3995 | as_storage_device_stats_ssd(struct as_namespace_s *ns, uint32_t device_ix, |
3996 | storage_device_stats *stats) |
3997 | { |
3998 | drv_ssds *ssds = (drv_ssds*)ns->storage_private; |
3999 | drv_ssd *ssd = &ssds->ssds[device_ix]; |
4000 | |
4001 | stats->used_sz = ssd->inuse_size; |
4002 | stats->n_free_wblocks = num_free_wblocks(ssd); |
4003 | |
4004 | stats->write_q_sz = cf_queue_sz(ssd->swb_write_q); |
4005 | stats->n_writes = ssd->n_wblock_writes; |
4006 | |
4007 | stats->defrag_q_sz = cf_queue_sz(ssd->defrag_wblock_q); |
4008 | stats->n_defrag_reads = ssd->n_defrag_wblock_reads; |
4009 | stats->n_defrag_writes = ssd->n_defrag_wblock_writes; |
4010 | |
4011 | stats->shadow_write_q_sz = ssd->swb_shadow_q ? |
4012 | cf_queue_sz(ssd->swb_shadow_q) : 0; |
4013 | } |
4014 | |
4015 | |
4016 | int |
4017 | as_storage_ticker_stats_ssd(as_namespace *ns) |
4018 | { |
4019 | histogram_dump(ns->device_read_size_hist); |
4020 | histogram_dump(ns->device_write_size_hist); |
4021 | |
4022 | drv_ssds *ssds = (drv_ssds*)ns->storage_private; |
4023 | |
4024 | for (int i = 0; i < ssds->n_ssds; i++) { |
4025 | drv_ssd *ssd = &ssds->ssds[i]; |
4026 | |
4027 | histogram_dump(ssd->hist_read); |
4028 | histogram_dump(ssd->hist_large_block_read); |
4029 | histogram_dump(ssd->hist_write); |
4030 | |
4031 | if (ssd->hist_shadow_write) { |
4032 | histogram_dump(ssd->hist_shadow_write); |
4033 | } |
4034 | } |
4035 | |
4036 | return 0; |
4037 | } |
4038 | |
4039 | |
4040 | int |
4041 | as_storage_histogram_clear_ssd(as_namespace *ns) |
4042 | { |
4043 | drv_ssds *ssds = (drv_ssds*)ns->storage_private; |
4044 | |
4045 | for (int i = 0; i < ssds->n_ssds; i++) { |
4046 | drv_ssd *ssd = &ssds->ssds[i]; |
4047 | |
4048 | histogram_clear(ssd->hist_read); |
4049 | histogram_clear(ssd->hist_large_block_read); |
4050 | histogram_clear(ssd->hist_write); |
4051 | |
4052 | if (ssd->hist_shadow_write) { |
4053 | histogram_clear(ssd->hist_shadow_write); |
4054 | } |
4055 | } |
4056 | |
4057 | return 0; |
4058 | } |
4059 | |
4060 | |
4061 | //========================================================== |
4062 | // Get record storage metadata. |
4063 | // |
4064 | |
4065 | uint32_t |
4066 | as_storage_record_size_ssd(const as_record *r) |
4067 | { |
4068 | return N_RBLOCKS_TO_SIZE(r->n_rblocks); |
4069 | } |
4070 | |
4071 | |
4072 | //========================================================== |
4073 | // Shutdown. |
4074 | // |
4075 | |
4076 | void |
4077 | as_storage_shutdown_ssd(as_namespace *ns) |
4078 | { |
4079 | drv_ssds *ssds = (drv_ssds*)ns->storage_private; |
4080 | |
4081 | for (int i = 0; i < ssds->n_ssds; i++) { |
4082 | drv_ssd *ssd = &ssds->ssds[i]; |
4083 | |
4084 | // Stop the maintenance thread from (also) flushing the swbs. |
4085 | cf_mutex_lock(&ssd->write_lock); |
4086 | cf_mutex_lock(&ssd->defrag_lock); |
4087 | |
4088 | // Flush current swb by pushing it to write-q. |
4089 | if (ssd->current_swb) { |
4090 | // Clean the end of the buffer before pushing to write-q. |
4091 | if (ssd->write_block_size > ssd->current_swb->pos) { |
4092 | memset(&ssd->current_swb->buf[ssd->current_swb->pos], 0, |
4093 | ssd->write_block_size - ssd->current_swb->pos); |
4094 | } |
4095 | |
4096 | cf_queue_push(ssd->swb_write_q, &ssd->current_swb); |
4097 | ssd->current_swb = NULL; |
4098 | } |
4099 | |
4100 | // Flush defrag swb by pushing it to write-q. |
4101 | if (ssd->defrag_swb) { |
4102 | // Clean the end of the buffer before pushing to write-q. |
4103 | if (ssd->write_block_size > ssd->defrag_swb->pos) { |
4104 | memset(&ssd->defrag_swb->buf[ssd->defrag_swb->pos], 0, |
4105 | ssd->write_block_size - ssd->defrag_swb->pos); |
4106 | } |
4107 | |
4108 | cf_queue_push(ssd->swb_write_q, &ssd->defrag_swb); |
4109 | ssd->defrag_swb = NULL; |
4110 | } |
4111 | } |
4112 | |
4113 | for (int i = 0; i < ssds->n_ssds; i++) { |
4114 | drv_ssd *ssd = &ssds->ssds[i]; |
4115 | |
4116 | while (cf_queue_sz(ssd->swb_write_q)) { |
4117 | usleep(1000); |
4118 | } |
4119 | |
4120 | if (ssd->shadow_name) { |
4121 | while (cf_queue_sz(ssd->swb_shadow_q)) { |
4122 | usleep(1000); |
4123 | } |
4124 | } |
4125 | |
4126 | ssd->running = false; |
4127 | } |
4128 | |
4129 | for (int i = 0; i < ssds->n_ssds; i++) { |
4130 | drv_ssd *ssd = &ssds->ssds[i]; |
4131 | |
4132 | cf_thread_join(ssd->write_tid); |
4133 | |
4134 | if (ssd->shadow_name) { |
4135 | cf_thread_join(ssd->shadow_tid); |
4136 | } |
4137 | } |
4138 | |
4139 | ssd_set_pristine_offset(ssds); |
4140 | ssd_set_trusted(ssds); |
4141 | } |
4142 | |