1/*
2 * record.c
3 *
4 * Copyright (C) 2012-2016 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23//==========================================================
24// Includes.
25//
26
27#include <alloca.h>
28#include <stdbool.h>
29#include <stddef.h>
30#include <stdint.h>
31#include <string.h>
32#include <unistd.h>
33
34#include "citrusleaf/alloc.h"
35#include "citrusleaf/cf_atomic.h"
36#include "citrusleaf/cf_byte_order.h"
37#include "citrusleaf/cf_digest.h"
38
39#include "arenax.h"
40#include "dynbuf.h"
41#include "fault.h"
42
43#include "base/cfg.h"
44#include "base/datamodel.h"
45#include "base/index.h"
46#include "base/proto.h"
47#include "base/secondary_index.h"
48#include "base/truncate.h"
49#include "base/xdr_serverside.h"
50#include "storage/storage.h"
51#include "transaction/rw_utils.h"
52
53
54//==========================================================
55// Typedefs & constants.
56//
57
// Size passed to cf_ll_buf_define() for the particles buffer that the
// old-pickle SSD apply paths unpickle bins into (1 MiB).
// NOTE(review): presumably this is the initial stack-resident portion of the
// cf_ll_buf, which can grow beyond it - confirm against cf_ll_buf_define().
#define STACK_PARTICLES_SIZE (1024 * 1024)
59
60
61//==========================================================
62// Forward declarations.
63//
64
65void record_replace_failed(as_remote_record *rr, as_index_ref* r_ref, as_storage_rd* rd, bool is_create);
66
67int record_apply_dim_single_bin(as_remote_record *rr, as_storage_rd *rd, bool *is_delete);
68int record_apply_dim(as_remote_record *rr, as_storage_rd *rd, bool skip_sindex, bool *is_delete);
69int record_apply_ssd_single_bin(as_remote_record *rr, as_storage_rd *rd, bool *is_delete);
70int record_apply_ssd(as_remote_record *rr, as_storage_rd *rd, bool skip_sindex, bool *is_delete);
71
72int old_record_apply_dim_single_bin(as_remote_record *rr, as_storage_rd *rd, bool *is_delete);
73int old_record_apply_dim(as_remote_record *rr, as_storage_rd *rd, bool skip_sindex, bool *is_delete);
74int old_record_apply_ssd_single_bin(as_remote_record *rr, as_storage_rd *rd, bool *is_delete);
75int old_record_apply_ssd(as_remote_record *rr, as_storage_rd *rd, bool skip_sindex, bool *is_delete);
76
77void update_index_metadata(as_remote_record *rr, index_metadata *old, as_record *r);
78void unwind_index_metadata(const index_metadata *old, as_record *r);
79void unwind_dim_single_bin(as_bin* old_bin, as_bin* new_bin);
80
81int unpickle_bins(as_remote_record *rr, as_storage_rd *rd, cf_ll_buf *particles_llb);
82
83void xdr_write_replica(as_remote_record *rr, bool is_delete, uint32_t set_id);
84
85
86//==========================================================
87// Inlines & macros.
88//
89
// Compare generations as plain numbers, with no wrap-around handling.
// Returns 1 if right is larger, -1 if left is larger, 0 if equal.
static inline int
resolve_generation_direct(uint16_t left, uint16_t right)
{
	if (left < right) {
		return 1;
	}

	if (left > right) {
		return -1;
	}

	return 0;
}
95
// Compare generations using as_gen_less_than() (unlike
// resolve_generation_direct(), which compares plain numbers).
// Returns 1 if right is "greater", -1 if left is, 0 if equal.
static inline int
resolve_generation(uint16_t left, uint16_t right)
{
	if (left == right) {
		return 0;
	}

	return as_gen_less_than(left, right) ? 1 : -1;
}
101
102// Assumes remote generation is not 0. (Local may be 0 if creating record.)
103static inline bool
104next_generation(uint16_t local, uint16_t remote, as_namespace* ns)
105{
106 local = plain_generation(local, ns);
107 remote = plain_generation(remote, ns);
108
109 return local == 0xFFFF ? remote == 1 : remote - local == 1;
110}
111
112// Quietly trim void-time. (Clock on remote node different?) TODO - best way?
113static inline uint32_t
114trim_void_time(uint32_t void_time)
115{
116 uint32_t max_void_time = as_record_void_time_get() + MAX_ALLOWED_TTL;
117
118 return void_time > max_void_time ? max_void_time : void_time;
119}
120
121
122//==========================================================
123// Public API - record lock lifecycle.
124//
125
126// Returns:
127// 1 - created new record
128// 0 - found existing record
129// -1 - failure - could not allocate arena stage
130int
131as_record_get_create(as_index_tree *tree, const cf_digest *keyd,
132 as_index_ref *r_ref, as_namespace *ns)
133{
134 int rv = as_index_get_insert_vlock(tree, keyd, r_ref);
135
136 if (rv == 1) {
137 cf_atomic64_incr(&ns->n_objects);
138 }
139
140 return rv;
141}
142
143
144// Returns:
145// 0 - found
146// -1 - not found
147int
148as_record_get(as_index_tree *tree, const cf_digest *keyd, as_index_ref *r_ref)
149{
150 return as_index_get_vlock(tree, keyd, r_ref);
151}
152
153
154// Done with record - unlock. If record was removed from tree and is not
155// reserved (by reduce), destroy record and free arena element.
156void
157as_record_done(as_index_ref *r_ref, as_namespace *ns)
158{
159 as_record *r = r_ref->r;
160
161 if (! as_index_is_valid_record(r) && r->rc == 0) {
162 as_record_destroy(r, ns);
163 cf_arenax_free(ns->arena, r_ref->r_h, r_ref->puddle);
164 }
165
166 cf_mutex_unlock(r_ref->olock);
167}
168
169
170//==========================================================
171// Public API - record lifecycle utilities.
172//
173
174// Returns:
175// 0 - found
176// -1 - not found
177// -2 - can't lock
178int
179as_record_exists(as_index_tree *tree, const cf_digest *keyd)
180{
181 return as_index_try_exists(tree, keyd);
182}
183
184
185// TODO - inline this, if/when we unravel header files.
186bool
187as_record_is_expired(const as_record *r)
188{
189 return r->void_time != 0 && r->void_time < as_record_void_time_get();
190}
191
192
193// Called when writes encounter a "doomed" record, to delete the doomed record
194// and create a new one in place without giving up the record lock.
195// FIXME - won't be able to "rescue" with future sindex method - will go away.
196void
197as_record_rescue(as_index_ref *r_ref, as_namespace *ns)
198{
199 record_delete_adjust_sindex(r_ref->r, ns);
200 as_record_destroy(r_ref->r, ns);
201 as_index_clear_record_info(r_ref->r);
202 cf_atomic64_incr(&ns->n_objects);
203}
204
205
206// Called only after last reference is released. Called by as_record_done(),
207// also given to index trees to be called when tree releases record reference.
208void
209as_record_destroy(as_record *r, as_namespace *ns)
210{
211 if (ns->storage_data_in_memory) {
212 // Note - rd is a limited container here - not calling
213 // as_storage_record_create(), _open(), _close().
214 as_storage_rd rd;
215
216 rd.r = r;
217 rd.ns = ns;
218 as_storage_rd_load_n_bins(&rd);
219 as_storage_rd_load_bins(&rd, NULL);
220
221 as_storage_record_drop_from_mem_stats(&rd);
222
223 as_record_destroy_bins(&rd);
224
225 if (! ns->single_bin) {
226 as_record_free_bin_space(r);
227
228 if (r->dim) {
229 cf_free(r->dim); // frees the key
230 }
231 }
232 }
233
234 as_record_drop_stats(r, ns);
235
236 // Dereference record's storage used-size.
237 as_storage_record_destroy(ns, r);
238
239 return;
240}
241
242
243// Called only if data-in-memory, and not single-bin.
244void
245as_record_free_bin_space(as_record *r)
246{
247 as_bin_space *bin_space = as_index_get_bin_space(r);
248
249 if (bin_space) {
250 cf_free((void*)bin_space);
251 as_index_set_bin_space(r, NULL);
252 }
253}
254
255
256// Destroy all particles in all bins.
257void
258as_record_destroy_bins(as_storage_rd *rd)
259{
260 as_record_destroy_bins_from(rd, 0);
261}
262
263
264// Destroy particles in specified bins.
265void
266as_record_destroy_bins_from(as_storage_rd *rd, uint16_t from)
267{
268 for (uint16_t i = from; i < rd->n_bins; i++) {
269 as_bin *b = &rd->bins[i];
270
271 if (! as_bin_inuse(b)) {
272 return; // no more used bins - there are never unused bin gaps
273 }
274
275 as_bin_particle_destroy(b, rd->ns->storage_data_in_memory);
276 as_bin_set_empty(b);
277 }
278}
279
280
281// Note - this is not called on the master write (or durable delete) path, where
282// keys are stored but never dropped. Only a UDF will drop a key on master.
283void
284as_record_finalize_key(as_record *r, as_namespace *ns, const uint8_t *key,
285 uint32_t key_size)
286{
287 // If a key wasn't stored, and we got one, accommodate it.
288 if (r->key_stored == 0) {
289 if (key != NULL) {
290 if (ns->storage_data_in_memory) {
291 as_record_allocate_key(r, key, key_size);
292 }
293
294 r->key_stored = 1;
295 }
296 }
297 // If a key was stored, but we didn't get one, remove the key.
298 else if (key == NULL) {
299 if (ns->storage_data_in_memory) {
300 as_bin_space *bin_space = ((as_rec_space *)r->dim)->bin_space;
301
302 cf_free(r->dim);
303 r->dim = (void *)bin_space;
304 }
305
306 r->key_stored = 0;
307 }
308}
309
310
311// Called only for data-in-memory multi-bin, with no key currently stored.
312// Note - have to modify if/when other metadata joins key in as_rec_space.
313void
314as_record_allocate_key(as_record *r, const uint8_t *key, uint32_t key_size)
315{
316 as_rec_space *rec_space = (as_rec_space *)
317 cf_malloc_ns(sizeof(as_rec_space) + key_size);
318
319 rec_space->bin_space = (as_bin_space *)r->dim;
320 rec_space->key_size = key_size;
321 memcpy((void*)rec_space->key, (const void*)key, key_size);
322
323 r->dim = (void*)rec_space;
324}
325
326
327//==========================================================
328// Public API - pickled record utilities.
329//
330
331// TODO - old pickle - remove in "six months".
332// Flatten record's bins into "pickle" format for fabric.
333uint8_t *
334as_record_pickle(as_storage_rd *rd, size_t *len_r)
335{
336 as_namespace *ns = rd->ns;
337
338 uint32_t sz = 2; // always 2 bytes for number of bins
339 uint16_t n_bins_in_use;
340
341 for (n_bins_in_use = 0; n_bins_in_use < rd->n_bins; n_bins_in_use++) {
342 as_bin *b = &rd->bins[n_bins_in_use];
343
344 if (! as_bin_inuse(b)) {
345 break;
346 }
347
348 sz += 1; // for bin name length
349 sz += ns->single_bin ?
350 0 : strlen(as_bin_get_name_from_id(ns, b->id)); // for bin name
351 sz += 1; // was for version - currently not used
352
353 sz += as_bin_particle_pickled_size(b);
354 }
355
356 uint8_t *pickle = cf_malloc(sz);
357 uint8_t *buf = pickle;
358
359 (*(uint16_t *)buf) = cf_swap_to_be16(n_bins_in_use); // number of bins
360 buf += 2;
361
362 for (uint16_t i = 0; i < n_bins_in_use; i++) {
363 as_bin *b = &rd->bins[i];
364
365 // Copy bin name, skipping a byte for name length.
366 uint8_t name_len = (uint8_t)as_bin_memcpy_name(ns, buf + 1, b);
367
368 *buf++ = name_len; // fill in bin name length
369 buf += name_len; // skip past bin name
370 *buf++ = 0; // was version - currently not used
371
372 buf += as_bin_particle_to_pickled(b, buf);
373 }
374
375 *len_r = sz;
376
377 return pickle;
378}
379
380
// If remote record is better than local record, replace local with remote.
//
// Sequence: check storage space, get or create the index element (locking
// it), for replica writes check the source and switch conflict resolution
// policy, resolve the conflict against an existing local record, then apply
// the remote record with the helper matching the namespace's data-in-memory /
// single-bin configuration. On success, optionally issue an XDR write.
//
// Parameters:
//   rr            - remote record and the partition reservation (rr->rsv) it
//                   applies to.
//   is_repl_write - if true, validate rr->src via as_partition_check_source()
//                   and use repl_write_conflict_resolution_policy().
//   skip_sindex   - forwarded to the multi-bin apply helpers.
//   do_xdr_write  - if true, call xdr_write_replica() after a successful
//                   replace.
//
// Returns AS_OK on success (also when a create is skipped because the record
// would be truncated), or an error code:
//   AS_ERR_OUT_OF_SPACE   - drives full, or as_record_get_create() failed;
//   AS_ERR_FORBIDDEN      - record_replace_check() rejected the replace;
//   AS_ERR_RECORD_EXISTS  - conflict resolution tie;
//   AS_ERR_GENERATION     - local record wins conflict resolution;
//   others                - from source check, set-ID write, or apply helper.
// Every path after a successful as_record_get_create() ends in
// as_record_done(), directly or via record_replace_failed().
int
as_record_replace_if_better(as_remote_record *rr, bool is_repl_write,
		bool skip_sindex, bool do_xdr_write)
{
	as_namespace *ns = rr->rsv->ns;

	if (! as_storage_has_space(ns)) {
		cf_warning(AS_RECORD, "{%s} record replace: drives full", ns->name);
		return AS_ERR_OUT_OF_SPACE;
	}

	CF_ALLOC_SET_NS_ARENA(ns);

	as_index_tree *tree = rr->rsv->tree;

	as_index_ref r_ref;
	int rv = as_record_get_create(tree, rr->keyd, &r_ref, ns);

	// -1 - could not allocate arena stage.
	if (rv < 0) {
		return AS_ERR_OUT_OF_SPACE;
	}

	// If we just created the element, failure paths must delete it again.
	bool is_create = rv == 1;
	as_index *r = r_ref.r;

	int result;

	conflict_resolution_pol policy = ns->conflict_resolution_policy;

	if (is_repl_write) {
		bool from_replica;

		if ((result = as_partition_check_source(ns, rr->rsv->p, rr->src,
				&from_replica)) != AS_OK) {
			record_replace_failed(rr, &r_ref, NULL, is_create);
			return result;
		}

		repl_write_init_repl_state(rr, from_replica);
		policy = repl_write_conflict_resolution_policy(ns);
	}

	if (! is_create && record_replace_check(r, ns) < 0) {
		record_replace_failed(rr, &r_ref, NULL, is_create);
		return AS_ERR_FORBIDDEN;
	}

	// If local record is better, no-op or fail.
	// (Local is "left", remote is "right" - result <= 0 means remote doesn't
	// win.)
	if (! is_create && (result = as_record_resolve_conflict(policy,
			r->generation, r->last_update_time, (uint16_t)rr->generation,
			rr->last_update_time)) <= 0) {
		record_replace_failed(rr, &r_ref, NULL, is_create);
		return result == 0 ? AS_ERR_RECORD_EXISTS : AS_ERR_GENERATION;
	}
	// else - remote winner - apply it.

	// If creating record, write set-ID into index.
	if (is_create) {
		if (rr->set_name && (result = as_index_set_set_w_len(r, ns,
				rr->set_name, rr->set_name_len, false)) < 0) {
			record_replace_failed(rr, &r_ref, NULL, is_create);
			return -result;
		}

		// Truncation check below looks at the record's last-update-time.
		r->last_update_time = rr->last_update_time;

		// Don't write record if it would be truncated.
		if (as_truncate_record_is_truncated(r, ns)) {
			record_replace_failed(rr, &r_ref, NULL, is_create);
			return AS_OK;
		}
	}
	// else - not bothering to check that sets match.

	as_storage_rd rd;

	if (is_create) {
		as_storage_record_create(ns, r, &rd);
	}
	else {
		as_storage_record_open(ns, r, &rd);
	}

	// TODO - old pickle - remove condition in "six months".
	if (rr->is_old_pickle) {
		// Prepare to store set name, if there is one.
		rd.set_name = rr->set_name;
		rd.set_name_len = rr->set_name_len;
	}
	else {
		// New (flat) pickle - hand it to storage for the write.
		rd.pickle = rr->pickle;
		rd.pickle_sz = rr->pickle_sz;
		rd.orig_pickle_sz = as_flat_orig_pickle_size(rr, rd.pickle_sz);
	}

	// Note - deal with key after reading existing record (if such), in case
	// we're dropping the key.

	// Split according to configuration to replace local record.
	bool is_delete = false;

	if (ns->storage_data_in_memory) {
		if (ns->single_bin) {
			result = record_apply_dim_single_bin(rr, &rd, &is_delete);
		}
		else {
			result = record_apply_dim(rr, &rd, skip_sindex, &is_delete);
		}
	}
	else {
		if (ns->single_bin) {
			result = record_apply_ssd_single_bin(rr, &rd, &is_delete);
		}
		else {
			result = record_apply_ssd(rr, &rd, skip_sindex, &is_delete);
		}
	}

	if (result != 0) {
		record_replace_failed(rr, &r_ref, &rd, is_create);
		return result;
	}

	uint16_t set_id = as_index_get_set_id(r); // save for XDR write

	record_replaced(r, rr);

	as_storage_record_close(&rd);
	as_record_done(&r_ref, ns);

	// XDR write happens after the record is released.
	if (do_xdr_write) {
		xdr_write_replica(rr, is_delete, set_id);
	}

	return AS_OK;
}
518
519
520//==========================================================
521// Public API - conflict resolution.
522//
523
524// Returns -1 if left wins, 1 if right wins, and 0 for tie.
525int
526as_record_resolve_conflict(conflict_resolution_pol policy, uint16_t left_gen,
527 uint64_t left_lut, uint16_t right_gen, uint64_t right_lut)
528{
529 int result = 0;
530
531 switch (policy) {
532 case AS_NAMESPACE_CONFLICT_RESOLUTION_POLICY_GENERATION:
533 // Doesn't use resolve_generation() - direct comparison gives much
534 // better odds of picking the record with more history after a split
535 // brain where one side starts the record from scratch.
536 result = resolve_generation_direct(left_gen, right_gen);
537 if (result == 0) {
538 result = resolve_last_update_time(left_lut, right_lut);
539 }
540 break;
541 case AS_NAMESPACE_CONFLICT_RESOLUTION_POLICY_LAST_UPDATE_TIME:
542 result = resolve_last_update_time(left_lut, right_lut);
543 if (result == 0) {
544 result = resolve_generation(left_gen, right_gen);
545 }
546 break;
547 case AS_NAMESPACE_CONFLICT_RESOLUTION_POLICY_CP:
548 result = record_resolve_conflict_cp(left_gen, left_lut, right_gen,
549 right_lut);
550 break;
551 default:
552 cf_crash(AS_RECORD, "invalid conflict resolution policy");
553 break;
554 }
555
556 return result;
557}
558
559
560//==========================================================
561// Local helpers.
562//
563
564void
565record_replace_failed(as_remote_record *rr, as_index_ref* r_ref,
566 as_storage_rd* rd, bool is_create)
567{
568 if (rd) {
569 as_storage_record_close(rd);
570 }
571
572 if (is_create) {
573 as_index_delete(rr->rsv->tree, rr->keyd);
574 }
575
576 as_record_done(r_ref, rr->rsv->ns);
577}
578
579
// Apply a remote record to a data-in-memory, single-bin namespace record.
//
// The bin embedded in the index is replaced directly by the bin (if any)
// unpacked from the remote record's flat pickle, and the pickle is written to
// storage. On unpack or write failure, the old bin (and, for write failure,
// the index metadata) is put back via unwind_dim_single_bin() /
// unwind_index_metadata(). On success the old bin's particle is destroyed,
// memory stats are adjusted, and *is_delete is set if the remote record has
// no bins.
//
// Returns AS_OK, AS_ERR_UNKNOWN if the remote record has more than one bin,
// or the negated error from unpacking or writing.
int
record_apply_dim_single_bin(as_remote_record *rr, as_storage_rd *rd,
		bool *is_delete)
{
	// TODO - old pickle - remove in "six months".
	if (rr->is_old_pickle) {
		return old_record_apply_dim_single_bin(rr, rd, is_delete);
	}

	as_namespace* ns = rr->rsv->ns;
	as_record* r = rd->r;

	rd->n_bins = 1;

	// Set rd->bins!
	as_storage_rd_load_bins(rd, NULL);

	// For memory accounting, note current usage.
	uint64_t memory_bytes = 0;

	// TODO - as_storage_record_get_n_bytes_memory() could check bins in use.
	if (as_bin_inuse(rd->bins)) {
		memory_bytes = as_storage_record_get_n_bytes_memory(rd);
	}

	uint16_t n_new_bins = rr->n_bins;

	// Checked before touching the bin, so nothing needs unwinding here.
	if (n_new_bins > 1) {
		cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: single-bin got %u bins ", ns->name, n_new_bins);
		return AS_ERR_UNKNOWN;
	}

	// Keep old bin for unwinding.
	as_bin old_bin;

	as_single_bin_copy(&old_bin, rd->bins);

	// No stack new bin - simpler to operate directly on bin embedded in index.
	as_bin_set_empty(rd->bins);

	int result;

	// Fill the new bins and particles.
	if (n_new_bins == 1 &&
			(result = as_flat_unpack_remote_bins(rr, rd->bins)) != 0) {
		cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed unpickle bin ", ns->name);
		unwind_dim_single_bin(&old_bin, rd->bins);
		return -result;
	}

	// Won't use to flatten, but needed to know if bins are in use. Amazingly,
	// rd->n_bins 0 ok adjusting memory stats. Also, rd->bins already filled.
	rd->n_bins = n_new_bins;

	// Apply changes to metadata in as_index needed for and writing.
	index_metadata old_metadata;

	update_index_metadata(rr, &old_metadata, r);

	// Write the record to storage.
	if ((result = as_record_write_from_pickle(rd)) < 0) {
		cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed write ", ns->name);
		unwind_index_metadata(&old_metadata, r);
		unwind_dim_single_bin(&old_bin, rd->bins);
		return -result;
	}

	// Cleanup - destroy old bin, can't unwind after.
	as_bin_particle_destroy(&old_bin, true);

	as_storage_record_adjust_mem_stats(rd, memory_bytes);
	*is_delete = n_new_bins == 0;

	return AS_OK;
}
655
656
657int
658record_apply_dim(as_remote_record *rr, as_storage_rd *rd, bool skip_sindex,
659 bool *is_delete)
660{
661 // TODO - old pickle - remove in "six months".
662 if (rr->is_old_pickle) {
663 return old_record_apply_dim(rr, rd, skip_sindex, is_delete);
664 }
665
666 as_namespace* ns = rr->rsv->ns;
667 as_record* r = rd->r;
668
669 // Set rd->n_bins!
670 as_storage_rd_load_n_bins(rd);
671
672 // Set rd->bins!
673 as_storage_rd_load_bins(rd, NULL);
674
675 // For memory accounting, note current usage.
676 uint64_t memory_bytes = as_storage_record_get_n_bytes_memory(rd);
677
678 int result;
679
680 // Keep old bins intact for sindex adjustment and unwinding.
681 uint16_t n_old_bins = rd->n_bins;
682 as_bin* old_bins = rd->bins;
683
684 uint16_t n_new_bins = rr->n_bins;
685 as_bin new_bins[n_new_bins];
686
687 if (n_new_bins != 0) {
688 memset(new_bins, 0, sizeof(new_bins));
689
690 // Fill the new bins and particles.
691 if ((result = as_flat_unpack_remote_bins(rr, new_bins)) != 0) {
692 cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed unpickle bins ", ns->name);
693 destroy_stack_bins(new_bins, n_new_bins);
694 return -result;
695 }
696 }
697
698 // Won't use to flatten, but needed for memory stats, bins in use, etc.
699 rd->n_bins = n_new_bins;
700 rd->bins = new_bins;
701
702 // Apply changes to metadata in as_index needed for and writing.
703 index_metadata old_metadata;
704
705 update_index_metadata(rr, &old_metadata, r);
706
707 // Write the record to storage.
708 if ((result = as_record_write_from_pickle(rd)) < 0) {
709 cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed write ", ns->name);
710 unwind_index_metadata(&old_metadata, r);
711 destroy_stack_bins(new_bins, n_new_bins);
712 return -result;
713 }
714
715 // Success - adjust sindex, looking at old and new bins.
716 if (! (skip_sindex &&
717 next_generation(r->generation, (uint16_t)rr->generation, ns)) &&
718 record_has_sindex(r, ns)) {
719 write_sindex_update(ns, as_index_get_set_name(r, ns), rr->keyd,
720 old_bins, n_old_bins, new_bins, n_new_bins);
721 }
722
723 // Cleanup - destroy relevant bins, can't unwind after.
724 destroy_stack_bins(old_bins, n_old_bins);
725
726 // Fill out new_bin_space.
727 as_bin_space* new_bin_space = NULL;
728
729 if (n_new_bins != 0) {
730 new_bin_space = (as_bin_space*)
731 cf_malloc_ns(sizeof(as_bin_space) + sizeof(new_bins));
732
733 new_bin_space->n_bins = n_new_bins;
734 memcpy((void*)new_bin_space->bins, new_bins, sizeof(new_bins));
735 }
736
737 // Swizzle the index element's as_bin_space pointer.
738 as_bin_space* old_bin_space = as_index_get_bin_space(r);
739
740 if (old_bin_space) {
741 cf_free(old_bin_space);
742 }
743
744 as_index_set_bin_space(r, new_bin_space);
745
746 // Now ok to store or drop key, as determined by message.
747 as_record_finalize_key(r, ns, rr->key, rr->key_size);
748
749 as_storage_record_adjust_mem_stats(rd, memory_bytes);
750 *is_delete = n_new_bins == 0;
751
752 return AS_OK;
753}
754
755
// Apply a remote record to a single-bin namespace record that is not stored
// in memory.
//
// No bins are loaded or unpacked: rd->n_bins is set to the remote bin count
// (0 or 1) so storage knows whether bins are in use, the index metadata is
// updated, and the pickle is written via as_record_write_from_pickle(). On
// write failure the metadata is unwound. On success the key is stored or
// dropped and *is_delete is set if the remote record has no bins.
//
// Returns AS_OK, AS_ERR_UNKNOWN if the remote record has more than one bin,
// or the negated write error.
int
record_apply_ssd_single_bin(as_remote_record *rr, as_storage_rd *rd,
		bool *is_delete)
{
	// TODO - old pickle - remove in "six months".
	if (rr->is_old_pickle) {
		return old_record_apply_ssd_single_bin(rr, rd, is_delete);
	}

	as_namespace* ns = rr->rsv->ns;
	as_record* r = rd->r;

	uint16_t n_new_bins = rr->n_bins;

	if (n_new_bins > 1) {
		cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: single-bin got %u bins ", ns->name, n_new_bins);
		return AS_ERR_UNKNOWN;
	}

	// Won't use to flatten, but needed to know if bins are in use.
	rd->n_bins = n_new_bins;

	// Apply changes to metadata in as_index needed for and writing.
	index_metadata old_metadata;

	update_index_metadata(rr, &old_metadata, r);

	// Write the record to storage.
	int result = as_record_write_from_pickle(rd);

	if (result < 0) {
		cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed write ", ns->name);
		unwind_index_metadata(&old_metadata, r);
		return -result;
	}

	// Now ok to store or drop key, as determined by message.
	as_record_finalize_key(r, ns, rr->key, rr->key_size);

	*is_delete = n_new_bins == 0;

	return AS_OK;
}
799
800
// Apply a remote record to a multi-bin namespace record that is not stored in
// memory.
//
// Bins are only loaded/unpacked when a secondary-index update is needed:
// old bins are loaded into alloca()'d space and new bins are unpacked from the
// flat pickle into alloca()'d space, purely so write_sindex_update() can diff
// them. The pickle itself is written via as_record_write_from_pickle(). On
// write failure the index metadata is unwound. On success the sindex is
// updated (if needed), the key is stored or dropped, and *is_delete is set if
// the remote record has no bins.
//
// Returns AS_OK, or the negated error from loading, unpacking or writing.
int
record_apply_ssd(as_remote_record *rr, as_storage_rd *rd, bool skip_sindex,
		bool *is_delete)
{
	// TODO - old pickle - remove in "six months".
	if (rr->is_old_pickle) {
		return old_record_apply_ssd(rr, rd, skip_sindex, is_delete);
	}

	as_namespace* ns = rr->rsv->ns;
	as_record* r = rd->r;

	// Decided up front, while r->generation is still the local generation -
	// i.e. before update_index_metadata() below.
	bool has_sindex = ! (skip_sindex &&
			next_generation(r->generation, (uint16_t)rr->generation, ns)) &&
			record_has_sindex(r, ns);

	int result;

	uint16_t n_old_bins = 0;
	as_bin *old_bins = NULL;

	uint16_t n_new_bins = rr->n_bins;
	as_bin *new_bins = NULL;

	if (has_sindex) {
		// TODO - separate function?
		if ((result = as_storage_rd_load_n_bins(rd)) < 0) {
			cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed load n-bins ", ns->name);
			return -result;
		}

		n_old_bins = rd->n_bins;
		old_bins = alloca(n_old_bins * sizeof(as_bin));

		if ((result = as_storage_rd_load_bins(rd, old_bins)) < 0) {
			cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed load bins ", ns->name);
			return -result;
		}

		// Won't use to flatten.
		rd->bins = NULL;

		if (n_new_bins != 0) {
			new_bins = alloca(n_new_bins * sizeof(as_bin));
			memset(new_bins, 0, n_new_bins * sizeof(as_bin));

			if ((result = as_flat_unpack_remote_bins(rr, new_bins)) != 0) {
				cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed unpickle bins ", ns->name);
				return -result;
			}
		}
	}

	// Won't use to flatten, but needed to know if bins are in use.
	rd->n_bins = n_new_bins;

	// Apply changes to metadata in as_index needed for and writing.
	index_metadata old_metadata;

	update_index_metadata(rr, &old_metadata, r);

	// Write the record to storage.
	if ((result = as_record_write_from_pickle(rd)) < 0) {
		cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed write ", ns->name);
		unwind_index_metadata(&old_metadata, r);
		return -result;
	}

	// Success - adjust sindex, looking at old and new bins.
	if (has_sindex) {
		write_sindex_update(ns, as_index_get_set_name(r, ns), rr->keyd,
				old_bins, n_old_bins, new_bins, n_new_bins);
	}

	// Now ok to store or drop key, as determined by message.
	as_record_finalize_key(r, ns, rr->key, rr->key_size);

	*is_delete = n_new_bins == 0;

	return AS_OK;
}
882
883
// TODO - old pickle - remove in "six months".
//
// Old-pickle counterpart of record_apply_dim_single_bin(). The bin count is
// read from the first 2 bytes of rr->pickle (big-endian), and the bin is
// unpickled by unpickle_bins() directly into the bin embedded in the index.
// Failure paths restore the old bin (and, after a failed write, the index
// metadata).
//
// NOTE(review): the unpickle failure path returns unpickle_bins()'s result
// as-is, while the write failure path negates - presumably unpickle_bins()
// already returns a positive AS_ERR_* code; confirm.
int
old_record_apply_dim_single_bin(as_remote_record *rr, as_storage_rd *rd,
		bool *is_delete)
{
	as_namespace* ns = rr->rsv->ns;
	as_record* r = rd->r;

	rd->n_bins = 1;

	// Set rd->bins!
	as_storage_rd_load_bins(rd, NULL);

	// For memory accounting, note current usage.
	uint64_t memory_bytes = 0;

	// TODO - as_storage_record_get_n_bytes_memory() could check bins in use.
	if (as_bin_inuse(rd->bins)) {
		memory_bytes = as_storage_record_get_n_bytes_memory(rd);
	}

	// Old pickle starts with a big-endian 16-bit bin count.
	uint16_t n_new_bins = cf_swap_from_be16(*(uint16_t *)rr->pickle);

	if (n_new_bins > 1) {
		cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: single-bin got %u bins ", ns->name, n_new_bins);
		return AS_ERR_UNKNOWN;
	}

	// Keep old bin intact for unwinding, clear record bin for incoming.
	as_bin old_bin;

	as_single_bin_copy(&old_bin, rd->bins);
	as_bin_set_empty(rd->bins);

	int result;

	// Fill the new bins and particles.
	if (n_new_bins == 1 &&
			(result = unpickle_bins(rr, rd, NULL)) != 0) {
		cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed unpickle bin ", ns->name);
		unwind_dim_single_bin(&old_bin, rd->bins);
		return result;
	}

	// Apply changes to metadata in as_index needed for and writing.
	index_metadata old_metadata;

	update_index_metadata(rr, &old_metadata, r);

	// Write the record to storage.
	if ((result = as_record_write_from_pickle(rd)) < 0) {
		cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed write ", ns->name);
		unwind_index_metadata(&old_metadata, r);
		unwind_dim_single_bin(&old_bin, rd->bins);
		return -result;
	}

	// Cleanup - destroy old bin, can't unwind after.
	as_bin_particle_destroy(&old_bin, true);

	as_storage_record_adjust_mem_stats(rd, memory_bytes);
	*is_delete = n_new_bins == 0;

	return AS_OK;
}
949
950
951// TODO - old pickle - remove in "six months".
952int
953old_record_apply_dim(as_remote_record *rr, as_storage_rd *rd, bool skip_sindex,
954 bool *is_delete)
955{
956 as_namespace* ns = rr->rsv->ns;
957 as_record* r = rd->r;
958
959 // Set rd->n_bins!
960 as_storage_rd_load_n_bins(rd);
961
962 // Set rd->bins!
963 as_storage_rd_load_bins(rd, NULL);
964
965 // For memory accounting, note current usage.
966 uint64_t memory_bytes = as_storage_record_get_n_bytes_memory(rd);
967
968 // Keep old bins intact for sindex adjustment and unwinding.
969 uint16_t n_old_bins = rd->n_bins;
970 as_bin* old_bins = rd->bins;
971
972 uint16_t n_new_bins = cf_swap_from_be16(*(uint16_t *)rr->pickle);
973 as_bin new_bins[n_new_bins];
974
975 memset(new_bins, 0, sizeof(new_bins));
976 rd->n_bins = n_new_bins;
977 rd->bins = new_bins;
978
979 // Fill the new bins and particles.
980 int result = unpickle_bins(rr, rd, NULL);
981
982 if (result != 0) {
983 cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed unpickle bins ", ns->name);
984 destroy_stack_bins(new_bins, n_new_bins);
985 return result;
986 }
987
988 // Apply changes to metadata in as_index needed for and writing.
989 index_metadata old_metadata;
990
991 update_index_metadata(rr, &old_metadata, r);
992
993 // Prepare to store or drop key, as determined by message.
994 rd->key = rr->key;
995 rd->key_size = rr->key_size;
996
997 // Write the record to storage.
998 if ((result = as_record_write_from_pickle(rd)) < 0) {
999 cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed write ", ns->name);
1000 unwind_index_metadata(&old_metadata, r);
1001 destroy_stack_bins(new_bins, n_new_bins);
1002 return -result;
1003 }
1004
1005 // Success - adjust sindex, looking at old and new bins.
1006 if (! (skip_sindex &&
1007 next_generation(r->generation, (uint16_t)rr->generation, ns)) &&
1008 record_has_sindex(r, ns)) {
1009 write_sindex_update(ns, as_index_get_set_name(r, ns), rr->keyd,
1010 old_bins, n_old_bins, new_bins, n_new_bins);
1011 }
1012
1013 // Cleanup - destroy relevant bins, can't unwind after.
1014 destroy_stack_bins(old_bins, n_old_bins);
1015
1016 // Fill out new_bin_space.
1017 as_bin_space* new_bin_space = NULL;
1018
1019 if (n_new_bins != 0) {
1020 new_bin_space = (as_bin_space*)
1021 cf_malloc_ns(sizeof(as_bin_space) + sizeof(new_bins));
1022
1023 new_bin_space->n_bins = rd->n_bins;
1024 memcpy((void*)new_bin_space->bins, new_bins, sizeof(new_bins));
1025 }
1026
1027 // Swizzle the index element's as_bin_space pointer.
1028 as_bin_space* old_bin_space = as_index_get_bin_space(r);
1029
1030 if (old_bin_space) {
1031 cf_free(old_bin_space);
1032 }
1033
1034 as_index_set_bin_space(r, new_bin_space);
1035
1036 // Now ok to store or drop key, as determined by message.
1037 as_record_finalize_key(r, ns, rd->key, rd->key_size);
1038
1039 as_storage_record_adjust_mem_stats(rd, memory_bytes);
1040 *is_delete = n_new_bins == 0;
1041
1042 return AS_OK;
1043}
1044
1045
// TODO - old pickle - remove in "six months".
//
// Old-pickle counterpart of record_apply_ssd_single_bin(). The bin count is
// read from the first 2 bytes of rr->pickle (big-endian). The bin (if any) is
// unpickled into a stack as_bin, with particle data going into a cf_ll_buf of
// STACK_PARTICLES_SIZE, which is freed on every path after it is defined. The
// record is then written, and the key stored or dropped.
//
// NOTE(review): rd->n_bins is set to 1 even when the pickle has 0 bins - the
// stack bin is simply left empty in that case.
int
old_record_apply_ssd_single_bin(as_remote_record *rr, as_storage_rd *rd,
		bool *is_delete)
{
	as_namespace* ns = rr->rsv->ns;
	as_record* r = rd->r;

	// Old pickle starts with a big-endian 16-bit bin count.
	uint16_t n_new_bins = cf_swap_from_be16(*(uint16_t *)rr->pickle);

	if (n_new_bins > 1) {
		cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: single-bin got %u bins ", ns->name, n_new_bins);
		return AS_ERR_UNKNOWN;
	}

	as_bin stack_bin = { { 0 } };

	rd->n_bins = 1;
	rd->bins = &stack_bin;

	// Fill the new bin and particle.
	cf_ll_buf_define(particles_llb, STACK_PARTICLES_SIZE);

	int result;

	if (n_new_bins == 1 &&
			(result = unpickle_bins(rr, rd, &particles_llb)) != 0) {
		cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed unpickle bin ", ns->name);
		cf_ll_buf_free(&particles_llb);
		return result;
	}

	// Apply changes to metadata in as_index needed for and writing.
	index_metadata old_metadata;

	update_index_metadata(rr, &old_metadata, r);

	// Prepare to store or drop key, as determined by message.
	rd->key = rr->key;
	rd->key_size = rr->key_size;

	// Write the record to storage.
	if ((result = as_record_write_from_pickle(rd)) < 0) {
		cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed write ", ns->name);
		unwind_index_metadata(&old_metadata, r);
		cf_ll_buf_free(&particles_llb);
		return -result;
	}

	// Now ok to store or drop key, as determined by message.
	as_record_finalize_key(r, ns, rd->key, rd->key_size);

	*is_delete = n_new_bins == 0;

	cf_ll_buf_free(&particles_llb);

	return AS_OK;
}
1104
1105
1106// TODO - old pickle - remove in "six months".
1107int
1108old_record_apply_ssd(as_remote_record *rr, as_storage_rd *rd, bool skip_sindex,
1109 bool *is_delete)
1110{
1111 as_namespace* ns = rr->rsv->ns;
1112 as_record* r = rd->r;
1113 bool has_sindex = ! (skip_sindex &&
1114 next_generation(r->generation, (uint16_t)rr->generation, ns)) &&
1115 record_has_sindex(r, ns);
1116
1117 uint16_t n_old_bins = 0;
1118 int result;
1119
1120 if (has_sindex) {
1121 // Set rd->n_bins!
1122 if ((result = as_storage_rd_load_n_bins(rd)) < 0) {
1123 cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed load n-bins ", ns->name);
1124 return -result;
1125 }
1126
1127 n_old_bins = rd->n_bins;
1128 }
1129
1130 as_bin old_bins[n_old_bins];
1131
1132 if (has_sindex) {
1133 // Set rd->bins!
1134 if ((result = as_storage_rd_load_bins(rd, old_bins)) < 0) {
1135 cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed load bins ", ns->name);
1136 return -result;
1137 }
1138 }
1139
1140 // Stack space for resulting record's bins.
1141 uint16_t n_new_bins = cf_swap_from_be16(*(uint16_t *)rr->pickle);
1142 as_bin new_bins[n_new_bins];
1143
1144 memset(new_bins, 0, sizeof(new_bins));
1145 rd->n_bins = n_new_bins;
1146 rd->bins = new_bins;
1147
1148 // Fill the new bins and particles.
1149 cf_ll_buf_define(particles_llb, STACK_PARTICLES_SIZE);
1150
1151 if ((result = unpickle_bins(rr, rd, &particles_llb)) != 0) {
1152 cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed unpickle bins ", ns->name);
1153 cf_ll_buf_free(&particles_llb);
1154 return result;
1155 }
1156
1157 // Apply changes to metadata in as_index needed for and writing.
1158 index_metadata old_metadata;
1159
1160 update_index_metadata(rr, &old_metadata, r);
1161
1162 // Prepare to store or drop key, as determined by message.
1163 rd->key = rr->key;
1164 rd->key_size = rr->key_size;
1165
1166 // Write the record to storage.
1167 if ((result = as_record_write_from_pickle(rd)) < 0) {
1168 cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed write ", ns->name);
1169 unwind_index_metadata(&old_metadata, r);
1170 cf_ll_buf_free(&particles_llb);
1171 return -result;
1172 }
1173
1174 // Success - adjust sindex, looking at old and new bins.
1175 if (has_sindex) {
1176 write_sindex_update(ns, as_index_get_set_name(r, ns), rr->keyd,
1177 old_bins, n_old_bins, new_bins, n_new_bins);
1178 }
1179
1180 // Now ok to store or drop key, as determined by message.
1181 as_record_finalize_key(r, ns, rd->key, rd->key_size);
1182
1183 *is_delete = n_new_bins == 0;
1184
1185 cf_ll_buf_free(&particles_llb);
1186
1187 return AS_OK;
1188}
1189
1190
1191void
1192update_index_metadata(as_remote_record *rr, index_metadata *old, as_record *r)
1193{
1194 old->void_time = r->void_time;
1195 old->last_update_time = r->last_update_time;
1196 old->generation = r->generation;
1197
1198 r->generation = (uint16_t)rr->generation;
1199 r->void_time = trim_void_time(rr->void_time);
1200 r->last_update_time = rr->last_update_time;
1201}
1202
1203
1204void
1205unwind_index_metadata(const index_metadata *old, as_record *r)
1206{
1207 r->void_time = old->void_time;
1208 r->last_update_time = old->last_update_time;
1209 r->generation = old->generation;
1210}
1211
1212
1213void
1214unwind_dim_single_bin(as_bin* old_bin, as_bin* new_bin)
1215{
1216 if (as_bin_inuse(new_bin)) {
1217 as_bin_particle_destroy(new_bin, true);
1218 }
1219
1220 as_single_bin_copy(new_bin, old_bin);
1221}
1222
1223
1224// TODO - old pickle - remove in "six months".
1225int
1226unpickle_bins(as_remote_record *rr, as_storage_rd *rd, cf_ll_buf *particles_llb)
1227{
1228 as_namespace *ns = rd->ns;
1229
1230 const uint8_t *end = rr->pickle + rr->pickle_sz;
1231 const uint8_t *buf = rr->pickle + 2;
1232
1233 for (uint16_t i = 0; i < rd->n_bins; i++) {
1234 if (buf >= end) {
1235 cf_warning(AS_RECORD, "incomplete pickled record");
1236 return AS_ERR_UNKNOWN;
1237 }
1238
1239 uint8_t name_sz = *buf++;
1240 const uint8_t *name = buf;
1241
1242 buf += name_sz;
1243 buf++; // skipped byte was version
1244
1245 if (buf > end) {
1246 cf_warning(AS_RECORD, "incomplete pickled record");
1247 return AS_ERR_UNKNOWN;
1248 }
1249
1250 int result;
1251 as_bin *b = as_bin_create_from_buf(rd, name, name_sz, &result);
1252
1253 if (! b) {
1254 return result;
1255 }
1256
1257 if (ns->storage_data_in_memory) {
1258 if ((result = as_bin_particle_alloc_from_pickled(b,
1259 &buf, end)) < 0) {
1260 return -result;
1261 }
1262 }
1263 else {
1264 if ((result = as_bin_particle_stack_from_pickled(b, particles_llb,
1265 &buf, end)) < 0) {
1266 return -result;
1267 }
1268 }
1269 }
1270
1271 if (buf != end) {
1272 cf_warning(AS_RECORD, "extra bytes on pickled record");
1273 return AS_ERR_UNKNOWN;
1274 }
1275
1276 return AS_OK;
1277}
1278
1279
1280void
1281xdr_write_replica(as_remote_record *rr, bool is_delete, uint32_t set_id)
1282{
1283 uint16_t generation = (uint16_t)rr->generation;
1284 xdr_op_type op_type = XDR_OP_TYPE_WRITE;
1285
1286 // Note - in this code path, only durable deletes get here.
1287 if (is_delete) {
1288 generation = 0;
1289 op_type = XDR_OP_TYPE_DURABLE_DELETE;
1290 }
1291
1292 // Don't send an XDR delete if it's disallowed.
1293 if (is_delete && ! is_xdr_delete_shipping_enabled()) {
1294 // TODO - should we also not ship if there was no record here before?
1295 return;
1296 }
1297
1298 xdr_write(rr->rsv->ns, rr->keyd, generation, rr->src, op_type, set_id,
1299 NULL);
1300}
1301