1 | /* |
2 | * record.c |
3 | * |
4 | * Copyright (C) 2012-2016 Aerospike, Inc. |
5 | * |
6 | * Portions may be licensed to Aerospike, Inc. under one or more contributor |
7 | * license agreements. |
8 | * |
9 | * This program is free software: you can redistribute it and/or modify it under |
10 | * the terms of the GNU Affero General Public License as published by the Free |
11 | * Software Foundation, either version 3 of the License, or (at your option) any |
12 | * later version. |
13 | * |
14 | * This program is distributed in the hope that it will be useful, but WITHOUT |
15 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
16 | * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more |
17 | * details. |
18 | * |
19 | * You should have received a copy of the GNU Affero General Public License |
20 | * along with this program. If not, see http://www.gnu.org/licenses/ |
21 | */ |
22 | |
23 | //========================================================== |
24 | // Includes. |
25 | // |
26 | |
27 | #include <alloca.h> |
28 | #include <stdbool.h> |
29 | #include <stddef.h> |
30 | #include <stdint.h> |
31 | #include <string.h> |
32 | #include <unistd.h> |
33 | |
34 | #include "citrusleaf/alloc.h" |
35 | #include "citrusleaf/cf_atomic.h" |
36 | #include "citrusleaf/cf_byte_order.h" |
37 | #include "citrusleaf/cf_digest.h" |
38 | |
39 | #include "arenax.h" |
40 | #include "dynbuf.h" |
41 | #include "fault.h" |
42 | |
43 | #include "base/cfg.h" |
44 | #include "base/datamodel.h" |
45 | #include "base/index.h" |
46 | #include "base/proto.h" |
47 | #include "base/secondary_index.h" |
48 | #include "base/truncate.h" |
49 | #include "base/xdr_serverside.h" |
50 | #include "storage/storage.h" |
51 | #include "transaction/rw_utils.h" |
52 | |
53 | |
54 | //========================================================== |
55 | // Typedefs & constants. |
56 | // |
57 | |
// 1 MiB. Presumably sizes a stack buffer for unpickled particles - it is not
// referenced in the visible part of this file, so confirm usage before
// changing it.
#define STACK_PARTICLES_SIZE (1024 * 1024)


//==========================================================
// Forward declarations.
//

// Failure cleanup for as_record_replace_if_better(): closes rd (if non-NULL),
// deletes a just-created index element, and finishes with the record.
void record_replace_failed(as_remote_record *rr, as_index_ref* r_ref, as_storage_rd* rd, bool is_create);

// Apply a winning remote record, split by storage (data-in-memory "dim" vs.
// "ssd") and by single- vs. multi-bin configuration. The record_apply_*()
// variants handle the new (flat) pickle and hand old pickles to the matching
// old_record_apply_*() variant.
int record_apply_dim_single_bin(as_remote_record *rr, as_storage_rd *rd, bool *is_delete);
int record_apply_dim(as_remote_record *rr, as_storage_rd *rd, bool skip_sindex, bool *is_delete);
int record_apply_ssd_single_bin(as_remote_record *rr, as_storage_rd *rd, bool *is_delete);
int record_apply_ssd(as_remote_record *rr, as_storage_rd *rd, bool skip_sindex, bool *is_delete);

// TODO - old pickle - remove in "six months".
int old_record_apply_dim_single_bin(as_remote_record *rr, as_storage_rd *rd, bool *is_delete);
int old_record_apply_dim(as_remote_record *rr, as_storage_rd *rd, bool skip_sindex, bool *is_delete);
int old_record_apply_ssd_single_bin(as_remote_record *rr, as_storage_rd *rd, bool *is_delete);
int old_record_apply_ssd(as_remote_record *rr, as_storage_rd *rd, bool skip_sindex, bool *is_delete);

// Per their usage below: update_index_metadata() applies the remote metadata
// to r and saves the previous values in *old; unwind_index_metadata() restores
// them after a failed write; unwind_dim_single_bin() restores a single-bin
// data-in-memory bin after a failed apply.
void update_index_metadata(as_remote_record *rr, index_metadata *old, as_record *r);
void unwind_index_metadata(const index_metadata *old, as_record *r);
void unwind_dim_single_bin(as_bin* old_bin, as_bin* new_bin);

// Old-pickle bin unpacking (definition not visible here).
int unpickle_bins(as_remote_record *rr, as_storage_rd *rd, cf_ll_buf *particles_llb);

// Presumably reports a replica write or delete to XDR - definition not
// visible here.
void xdr_write_replica(as_remote_record *rr, bool is_delete, uint32_t set_id);
84 | |
85 | |
86 | //========================================================== |
87 | // Inlines & macros. |
88 | // |
89 | |
// Compares two generations by plain numeric value, with no wrap-around
// handling. Returns 0 if equal, 1 if right is larger (right wins), and -1 if
// left is larger (left wins).
static inline int
resolve_generation_direct(uint16_t left, uint16_t right)
{
	if (left == right) {
		return 0;
	}

	return left < right ? 1 : -1;
}
95 | |
// Compares two generations using as_gen_less_than() (presumably wrap-aware -
// its definition is not visible here). Returns 0 if equal, 1 if right wins,
// and -1 if left wins.
static inline int
resolve_generation(uint16_t left, uint16_t right)
{
	if (left == right) {
		return 0;
	}

	return as_gen_less_than(left, right) ? 1 : -1;
}
101 | |
102 | // Assumes remote generation is not 0. (Local may be 0 if creating record.) |
103 | static inline bool |
104 | next_generation(uint16_t local, uint16_t remote, as_namespace* ns) |
105 | { |
106 | local = plain_generation(local, ns); |
107 | remote = plain_generation(remote, ns); |
108 | |
109 | return local == 0xFFFF ? remote == 1 : remote - local == 1; |
110 | } |
111 | |
112 | // Quietly trim void-time. (Clock on remote node different?) TODO - best way? |
113 | static inline uint32_t |
114 | trim_void_time(uint32_t void_time) |
115 | { |
116 | uint32_t max_void_time = as_record_void_time_get() + MAX_ALLOWED_TTL; |
117 | |
118 | return void_time > max_void_time ? max_void_time : void_time; |
119 | } |
120 | |
121 | |
122 | //========================================================== |
123 | // Public API - record lock lifecycle. |
124 | // |
125 | |
126 | // Returns: |
127 | // 1 - created new record |
128 | // 0 - found existing record |
129 | // -1 - failure - could not allocate arena stage |
130 | int |
131 | as_record_get_create(as_index_tree *tree, const cf_digest *keyd, |
132 | as_index_ref *r_ref, as_namespace *ns) |
133 | { |
134 | int rv = as_index_get_insert_vlock(tree, keyd, r_ref); |
135 | |
136 | if (rv == 1) { |
137 | cf_atomic64_incr(&ns->n_objects); |
138 | } |
139 | |
140 | return rv; |
141 | } |
142 | |
143 | |
144 | // Returns: |
145 | // 0 - found |
146 | // -1 - not found |
147 | int |
148 | as_record_get(as_index_tree *tree, const cf_digest *keyd, as_index_ref *r_ref) |
149 | { |
150 | return as_index_get_vlock(tree, keyd, r_ref); |
151 | } |
152 | |
153 | |
154 | // Done with record - unlock. If record was removed from tree and is not |
155 | // reserved (by reduce), destroy record and free arena element. |
156 | void |
157 | as_record_done(as_index_ref *r_ref, as_namespace *ns) |
158 | { |
159 | as_record *r = r_ref->r; |
160 | |
161 | if (! as_index_is_valid_record(r) && r->rc == 0) { |
162 | as_record_destroy(r, ns); |
163 | cf_arenax_free(ns->arena, r_ref->r_h, r_ref->puddle); |
164 | } |
165 | |
166 | cf_mutex_unlock(r_ref->olock); |
167 | } |
168 | |
169 | |
170 | //========================================================== |
171 | // Public API - record lifecycle utilities. |
172 | // |
173 | |
174 | // Returns: |
175 | // 0 - found |
176 | // -1 - not found |
177 | // -2 - can't lock |
178 | int |
179 | as_record_exists(as_index_tree *tree, const cf_digest *keyd) |
180 | { |
181 | return as_index_try_exists(tree, keyd); |
182 | } |
183 | |
184 | |
185 | // TODO - inline this, if/when we unravel header files. |
186 | bool |
187 | as_record_is_expired(const as_record *r) |
188 | { |
189 | return r->void_time != 0 && r->void_time < as_record_void_time_get(); |
190 | } |
191 | |
192 | |
193 | // Called when writes encounter a "doomed" record, to delete the doomed record |
194 | // and create a new one in place without giving up the record lock. |
195 | // FIXME - won't be able to "rescue" with future sindex method - will go away. |
196 | void |
197 | as_record_rescue(as_index_ref *r_ref, as_namespace *ns) |
198 | { |
199 | record_delete_adjust_sindex(r_ref->r, ns); |
200 | as_record_destroy(r_ref->r, ns); |
201 | as_index_clear_record_info(r_ref->r); |
202 | cf_atomic64_incr(&ns->n_objects); |
203 | } |
204 | |
205 | |
206 | // Called only after last reference is released. Called by as_record_done(), |
207 | // also given to index trees to be called when tree releases record reference. |
208 | void |
209 | as_record_destroy(as_record *r, as_namespace *ns) |
210 | { |
211 | if (ns->storage_data_in_memory) { |
212 | // Note - rd is a limited container here - not calling |
213 | // as_storage_record_create(), _open(), _close(). |
214 | as_storage_rd rd; |
215 | |
216 | rd.r = r; |
217 | rd.ns = ns; |
218 | as_storage_rd_load_n_bins(&rd); |
219 | as_storage_rd_load_bins(&rd, NULL); |
220 | |
221 | as_storage_record_drop_from_mem_stats(&rd); |
222 | |
223 | as_record_destroy_bins(&rd); |
224 | |
225 | if (! ns->single_bin) { |
226 | as_record_free_bin_space(r); |
227 | |
228 | if (r->dim) { |
229 | cf_free(r->dim); // frees the key |
230 | } |
231 | } |
232 | } |
233 | |
234 | as_record_drop_stats(r, ns); |
235 | |
236 | // Dereference record's storage used-size. |
237 | as_storage_record_destroy(ns, r); |
238 | |
239 | return; |
240 | } |
241 | |
242 | |
243 | // Called only if data-in-memory, and not single-bin. |
244 | void |
245 | as_record_free_bin_space(as_record *r) |
246 | { |
247 | as_bin_space *bin_space = as_index_get_bin_space(r); |
248 | |
249 | if (bin_space) { |
250 | cf_free((void*)bin_space); |
251 | as_index_set_bin_space(r, NULL); |
252 | } |
253 | } |
254 | |
255 | |
256 | // Destroy all particles in all bins. |
257 | void |
258 | as_record_destroy_bins(as_storage_rd *rd) |
259 | { |
260 | as_record_destroy_bins_from(rd, 0); |
261 | } |
262 | |
263 | |
264 | // Destroy particles in specified bins. |
265 | void |
266 | as_record_destroy_bins_from(as_storage_rd *rd, uint16_t from) |
267 | { |
268 | for (uint16_t i = from; i < rd->n_bins; i++) { |
269 | as_bin *b = &rd->bins[i]; |
270 | |
271 | if (! as_bin_inuse(b)) { |
272 | return; // no more used bins - there are never unused bin gaps |
273 | } |
274 | |
275 | as_bin_particle_destroy(b, rd->ns->storage_data_in_memory); |
276 | as_bin_set_empty(b); |
277 | } |
278 | } |
279 | |
280 | |
281 | // Note - this is not called on the master write (or durable delete) path, where |
282 | // keys are stored but never dropped. Only a UDF will drop a key on master. |
283 | void |
284 | as_record_finalize_key(as_record *r, as_namespace *ns, const uint8_t *key, |
285 | uint32_t key_size) |
286 | { |
287 | // If a key wasn't stored, and we got one, accommodate it. |
288 | if (r->key_stored == 0) { |
289 | if (key != NULL) { |
290 | if (ns->storage_data_in_memory) { |
291 | as_record_allocate_key(r, key, key_size); |
292 | } |
293 | |
294 | r->key_stored = 1; |
295 | } |
296 | } |
297 | // If a key was stored, but we didn't get one, remove the key. |
298 | else if (key == NULL) { |
299 | if (ns->storage_data_in_memory) { |
300 | as_bin_space *bin_space = ((as_rec_space *)r->dim)->bin_space; |
301 | |
302 | cf_free(r->dim); |
303 | r->dim = (void *)bin_space; |
304 | } |
305 | |
306 | r->key_stored = 0; |
307 | } |
308 | } |
309 | |
310 | |
311 | // Called only for data-in-memory multi-bin, with no key currently stored. |
312 | // Note - have to modify if/when other metadata joins key in as_rec_space. |
313 | void |
314 | as_record_allocate_key(as_record *r, const uint8_t *key, uint32_t key_size) |
315 | { |
316 | as_rec_space *rec_space = (as_rec_space *) |
317 | cf_malloc_ns(sizeof(as_rec_space) + key_size); |
318 | |
319 | rec_space->bin_space = (as_bin_space *)r->dim; |
320 | rec_space->key_size = key_size; |
321 | memcpy((void*)rec_space->key, (const void*)key, key_size); |
322 | |
323 | r->dim = (void*)rec_space; |
324 | } |
325 | |
326 | |
327 | //========================================================== |
328 | // Public API - pickled record utilities. |
329 | // |
330 | |
331 | // TODO - old pickle - remove in "six months". |
332 | // Flatten record's bins into "pickle" format for fabric. |
333 | uint8_t * |
334 | as_record_pickle(as_storage_rd *rd, size_t *len_r) |
335 | { |
336 | as_namespace *ns = rd->ns; |
337 | |
338 | uint32_t sz = 2; // always 2 bytes for number of bins |
339 | uint16_t n_bins_in_use; |
340 | |
341 | for (n_bins_in_use = 0; n_bins_in_use < rd->n_bins; n_bins_in_use++) { |
342 | as_bin *b = &rd->bins[n_bins_in_use]; |
343 | |
344 | if (! as_bin_inuse(b)) { |
345 | break; |
346 | } |
347 | |
348 | sz += 1; // for bin name length |
349 | sz += ns->single_bin ? |
350 | 0 : strlen(as_bin_get_name_from_id(ns, b->id)); // for bin name |
351 | sz += 1; // was for version - currently not used |
352 | |
353 | sz += as_bin_particle_pickled_size(b); |
354 | } |
355 | |
356 | uint8_t *pickle = cf_malloc(sz); |
357 | uint8_t *buf = pickle; |
358 | |
359 | (*(uint16_t *)buf) = cf_swap_to_be16(n_bins_in_use); // number of bins |
360 | buf += 2; |
361 | |
362 | for (uint16_t i = 0; i < n_bins_in_use; i++) { |
363 | as_bin *b = &rd->bins[i]; |
364 | |
365 | // Copy bin name, skipping a byte for name length. |
366 | uint8_t name_len = (uint8_t)as_bin_memcpy_name(ns, buf + 1, b); |
367 | |
368 | *buf++ = name_len; // fill in bin name length |
369 | buf += name_len; // skip past bin name |
370 | *buf++ = 0; // was version - currently not used |
371 | |
372 | buf += as_bin_particle_to_pickled(b, buf); |
373 | } |
374 | |
375 | *len_r = sz; |
376 | |
377 | return pickle; |
378 | } |
379 | |
380 | |
// If remote record is better than local record, replace local with remote.
//
// Looks up (or creates) the local record for rr->keyd, resolves the conflict
// between local and remote generation/last-update-time using the namespace
// policy (or the replica-write policy when is_repl_write), and, if the remote
// record wins, applies it with the record_apply_*() helper matching the
// namespace configuration. skip_sindex is passed to the multi-bin helpers. If
// do_xdr_write is set, xdr_write_replica() is called after the record is
// released.
//
// Returns AS_OK on success, and also when a newly created record is dropped
// because it would be truncated. Returns AS_ERR_RECORD_EXISTS if local and
// remote tie, AS_ERR_GENERATION if the local record wins, or another error
// code on failure. Every failure after the lookup goes through
// record_replace_failed(), which deletes a just-created index element and
// finishes with the record.
int
as_record_replace_if_better(as_remote_record *rr, bool is_repl_write,
		bool skip_sindex, bool do_xdr_write)
{
	as_namespace *ns = rr->rsv->ns;

	// Bail before touching the index if storage is full.
	if (! as_storage_has_space(ns)) {
		cf_warning(AS_RECORD, "{%s} record replace: drives full", ns->name);
		return AS_ERR_OUT_OF_SPACE;
	}

	CF_ALLOC_SET_NS_ARENA(ns);

	as_index_tree *tree = rr->rsv->tree;

	as_index_ref r_ref;
	int rv = as_record_get_create(tree, rr->keyd, &r_ref, ns);

	// Could not allocate an arena stage for a new index element.
	if (rv < 0) {
		return AS_ERR_OUT_OF_SPACE;
	}

	// From here on, every exit must finish with the record - failures do so
	// via record_replace_failed().
	bool is_create = rv == 1;
	as_index *r = r_ref.r;

	int result;

	conflict_resolution_pol policy = ns->conflict_resolution_policy;

	// Replica writes: check the source via as_partition_check_source() and
	// switch to the replica-write conflict resolution policy.
	if (is_repl_write) {
		bool from_replica;

		if ((result = as_partition_check_source(ns, rr->rsv->p, rr->src,
				&from_replica)) != AS_OK) {
			record_replace_failed(rr, &r_ref, NULL, is_create);
			return result;
		}

		repl_write_init_repl_state(rr, from_replica);
		policy = repl_write_conflict_resolution_policy(ns);
	}

	// An existing record that fails record_replace_check() may not be replaced.
	if (! is_create && record_replace_check(r, ns) < 0) {
		record_replace_failed(rr, &r_ref, NULL, is_create);
		return AS_ERR_FORBIDDEN;
	}

	// If local record is better, no-op or fail. (Local is the "left" side, so
	// 0 means a tie and -1 means local wins.)
	if (! is_create && (result = as_record_resolve_conflict(policy,
			r->generation, r->last_update_time, (uint16_t)rr->generation,
			rr->last_update_time)) <= 0) {
		record_replace_failed(rr, &r_ref, NULL, is_create);
		return result == 0 ? AS_ERR_RECORD_EXISTS : AS_ERR_GENERATION;
	}
	// else - remote winner - apply it.

	// If creating record, write set-ID into index.
	if (is_create) {
		if (rr->set_name && (result = as_index_set_set_w_len(r, ns,
				rr->set_name, rr->set_name_len, false)) < 0) {
			record_replace_failed(rr, &r_ref, NULL, is_create);
			return -result;
		}

		// Set before the truncate check below, which examines r.
		r->last_update_time = rr->last_update_time;

		// Don't write record if it would be truncated.
		if (as_truncate_record_is_truncated(r, ns)) {
			record_replace_failed(rr, &r_ref, NULL, is_create);
			return AS_OK;
		}
	}
	// else - not bothering to check that sets match.

	as_storage_rd rd;

	if (is_create) {
		as_storage_record_create(ns, r, &rd);
	}
	else {
		as_storage_record_open(ns, r, &rd);
	}

	// TODO - old pickle - remove condition in "six months".
	if (rr->is_old_pickle) {
		// Prepare to store set name, if there is one.
		rd.set_name = rr->set_name;
		rd.set_name_len = rr->set_name_len;
	}
	else {
		// New pickle - the write is done directly from the remote pickle.
		rd.pickle = rr->pickle;
		rd.pickle_sz = rr->pickle_sz;
		rd.orig_pickle_sz = as_flat_orig_pickle_size(rr, rd.pickle_sz);
	}

	// Note - deal with key after reading existing record (if such), in case
	// we're dropping the key.

	// Split according to configuration to replace local record.
	bool is_delete = false;

	if (ns->storage_data_in_memory) {
		if (ns->single_bin) {
			result = record_apply_dim_single_bin(rr, &rd, &is_delete);
		}
		else {
			result = record_apply_dim(rr, &rd, skip_sindex, &is_delete);
		}
	}
	else {
		if (ns->single_bin) {
			result = record_apply_ssd_single_bin(rr, &rd, &is_delete);
		}
		else {
			result = record_apply_ssd(rr, &rd, skip_sindex, &is_delete);
		}
	}

	if (result != 0) {
		record_replace_failed(rr, &r_ref, &rd, is_create);
		return result;
	}

	uint16_t set_id = as_index_get_set_id(r); // save for XDR write

	record_replaced(r, rr);

	as_storage_record_close(&rd);
	as_record_done(&r_ref, ns);

	// After as_record_done() - r must not be used here, hence saved set_id.
	if (do_xdr_write) {
		xdr_write_replica(rr, is_delete, set_id);
	}

	return AS_OK;
}
518 | |
519 | |
520 | //========================================================== |
521 | // Public API - conflict resolution. |
522 | // |
523 | |
524 | // Returns -1 if left wins, 1 if right wins, and 0 for tie. |
525 | int |
526 | as_record_resolve_conflict(conflict_resolution_pol policy, uint16_t left_gen, |
527 | uint64_t left_lut, uint16_t right_gen, uint64_t right_lut) |
528 | { |
529 | int result = 0; |
530 | |
531 | switch (policy) { |
532 | case AS_NAMESPACE_CONFLICT_RESOLUTION_POLICY_GENERATION: |
533 | // Doesn't use resolve_generation() - direct comparison gives much |
534 | // better odds of picking the record with more history after a split |
535 | // brain where one side starts the record from scratch. |
536 | result = resolve_generation_direct(left_gen, right_gen); |
537 | if (result == 0) { |
538 | result = resolve_last_update_time(left_lut, right_lut); |
539 | } |
540 | break; |
541 | case AS_NAMESPACE_CONFLICT_RESOLUTION_POLICY_LAST_UPDATE_TIME: |
542 | result = resolve_last_update_time(left_lut, right_lut); |
543 | if (result == 0) { |
544 | result = resolve_generation(left_gen, right_gen); |
545 | } |
546 | break; |
547 | case AS_NAMESPACE_CONFLICT_RESOLUTION_POLICY_CP: |
548 | result = record_resolve_conflict_cp(left_gen, left_lut, right_gen, |
549 | right_lut); |
550 | break; |
551 | default: |
552 | cf_crash(AS_RECORD, "invalid conflict resolution policy" ); |
553 | break; |
554 | } |
555 | |
556 | return result; |
557 | } |
558 | |
559 | |
560 | //========================================================== |
561 | // Local helpers. |
562 | // |
563 | |
564 | void |
565 | record_replace_failed(as_remote_record *rr, as_index_ref* r_ref, |
566 | as_storage_rd* rd, bool is_create) |
567 | { |
568 | if (rd) { |
569 | as_storage_record_close(rd); |
570 | } |
571 | |
572 | if (is_create) { |
573 | as_index_delete(rr->rsv->tree, rr->keyd); |
574 | } |
575 | |
576 | as_record_done(r_ref, rr->rsv->ns); |
577 | } |
578 | |
579 | |
// Applies a new-style (flat) remote pickle to a data-in-memory single-bin
// record. Old-style pickles are handed to old_record_apply_dim_single_bin().
//
// The incoming bin is unpacked directly into the bin embedded in the index
// (rd->bins). The previous bin is copied aside first so it can be restored on
// failure. Returns AS_OK on success and sets *is_delete if the remote record
// has no bins. Returns AS_ERR_UNKNOWN if the remote record has more than one
// bin, or the negated result of the failing helper otherwise.
int
record_apply_dim_single_bin(as_remote_record *rr, as_storage_rd *rd,
		bool *is_delete)
{
	// TODO - old pickle - remove in "six months".
	if (rr->is_old_pickle) {
		return old_record_apply_dim_single_bin(rr, rd, is_delete);
	}

	as_namespace* ns = rr->rsv->ns;
	as_record* r = rd->r;

	// Single-bin - exactly one bin slot.
	rd->n_bins = 1;

	// Set rd->bins!
	as_storage_rd_load_bins(rd, NULL);

	// For memory accounting, note current usage (stays 0 if the bin is empty).
	uint64_t memory_bytes = 0;

	// TODO - as_storage_record_get_n_bytes_memory() could check bins in use.
	if (as_bin_inuse(rd->bins)) {
		memory_bytes = as_storage_record_get_n_bytes_memory(rd);
	}

	uint16_t n_new_bins = rr->n_bins;

	// Nothing has been modified yet, so no unwinding is needed here.
	if (n_new_bins > 1) {
		cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: single-bin got %u bins ", ns->name, n_new_bins);
		return AS_ERR_UNKNOWN;
	}

	// Keep old bin for unwinding.
	as_bin old_bin;

	as_single_bin_copy(&old_bin, rd->bins);

	// No stack new bin - simpler to operate directly on bin embedded in index.
	as_bin_set_empty(rd->bins);

	int result;

	// Fill the new bins and particles. (With 0 new bins, the bin stays empty.)
	if (n_new_bins == 1 &&
			(result = as_flat_unpack_remote_bins(rr, rd->bins)) != 0) {
		cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed unpickle bin ", ns->name);
		unwind_dim_single_bin(&old_bin, rd->bins);
		return -result;
	}

	// Won't use to flatten, but needed to know if bins are in use. Amazingly,
	// rd->n_bins 0 ok adjusting memory stats. Also, rd->bins already filled.
	rd->n_bins = n_new_bins;

	// Apply the remote metadata to the index, saving the old values so a
	// failed write can be unwound.
	index_metadata old_metadata;

	update_index_metadata(rr, &old_metadata, r);

	// Write the record to storage.
	if ((result = as_record_write_from_pickle(rd)) < 0) {
		cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed write ", ns->name);
		unwind_index_metadata(&old_metadata, r);
		unwind_dim_single_bin(&old_bin, rd->bins);
		return -result;
	}

	// Cleanup - destroy old bin, can't unwind after.
	as_bin_particle_destroy(&old_bin, true);

	as_storage_record_adjust_mem_stats(rd, memory_bytes);
	*is_delete = n_new_bins == 0;

	return AS_OK;
}
655 | |
656 | |
657 | int |
658 | record_apply_dim(as_remote_record *rr, as_storage_rd *rd, bool skip_sindex, |
659 | bool *is_delete) |
660 | { |
661 | // TODO - old pickle - remove in "six months". |
662 | if (rr->is_old_pickle) { |
663 | return old_record_apply_dim(rr, rd, skip_sindex, is_delete); |
664 | } |
665 | |
666 | as_namespace* ns = rr->rsv->ns; |
667 | as_record* r = rd->r; |
668 | |
669 | // Set rd->n_bins! |
670 | as_storage_rd_load_n_bins(rd); |
671 | |
672 | // Set rd->bins! |
673 | as_storage_rd_load_bins(rd, NULL); |
674 | |
675 | // For memory accounting, note current usage. |
676 | uint64_t memory_bytes = as_storage_record_get_n_bytes_memory(rd); |
677 | |
678 | int result; |
679 | |
680 | // Keep old bins intact for sindex adjustment and unwinding. |
681 | uint16_t n_old_bins = rd->n_bins; |
682 | as_bin* old_bins = rd->bins; |
683 | |
684 | uint16_t n_new_bins = rr->n_bins; |
685 | as_bin new_bins[n_new_bins]; |
686 | |
687 | if (n_new_bins != 0) { |
688 | memset(new_bins, 0, sizeof(new_bins)); |
689 | |
690 | // Fill the new bins and particles. |
691 | if ((result = as_flat_unpack_remote_bins(rr, new_bins)) != 0) { |
692 | cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed unpickle bins " , ns->name); |
693 | destroy_stack_bins(new_bins, n_new_bins); |
694 | return -result; |
695 | } |
696 | } |
697 | |
698 | // Won't use to flatten, but needed for memory stats, bins in use, etc. |
699 | rd->n_bins = n_new_bins; |
700 | rd->bins = new_bins; |
701 | |
702 | // Apply changes to metadata in as_index needed for and writing. |
703 | index_metadata old_metadata; |
704 | |
705 | update_index_metadata(rr, &old_metadata, r); |
706 | |
707 | // Write the record to storage. |
708 | if ((result = as_record_write_from_pickle(rd)) < 0) { |
709 | cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed write " , ns->name); |
710 | unwind_index_metadata(&old_metadata, r); |
711 | destroy_stack_bins(new_bins, n_new_bins); |
712 | return -result; |
713 | } |
714 | |
715 | // Success - adjust sindex, looking at old and new bins. |
716 | if (! (skip_sindex && |
717 | next_generation(r->generation, (uint16_t)rr->generation, ns)) && |
718 | record_has_sindex(r, ns)) { |
719 | write_sindex_update(ns, as_index_get_set_name(r, ns), rr->keyd, |
720 | old_bins, n_old_bins, new_bins, n_new_bins); |
721 | } |
722 | |
723 | // Cleanup - destroy relevant bins, can't unwind after. |
724 | destroy_stack_bins(old_bins, n_old_bins); |
725 | |
726 | // Fill out new_bin_space. |
727 | as_bin_space* new_bin_space = NULL; |
728 | |
729 | if (n_new_bins != 0) { |
730 | new_bin_space = (as_bin_space*) |
731 | cf_malloc_ns(sizeof(as_bin_space) + sizeof(new_bins)); |
732 | |
733 | new_bin_space->n_bins = n_new_bins; |
734 | memcpy((void*)new_bin_space->bins, new_bins, sizeof(new_bins)); |
735 | } |
736 | |
737 | // Swizzle the index element's as_bin_space pointer. |
738 | as_bin_space* old_bin_space = as_index_get_bin_space(r); |
739 | |
740 | if (old_bin_space) { |
741 | cf_free(old_bin_space); |
742 | } |
743 | |
744 | as_index_set_bin_space(r, new_bin_space); |
745 | |
746 | // Now ok to store or drop key, as determined by message. |
747 | as_record_finalize_key(r, ns, rr->key, rr->key_size); |
748 | |
749 | as_storage_record_adjust_mem_stats(rd, memory_bytes); |
750 | *is_delete = n_new_bins == 0; |
751 | |
752 | return AS_OK; |
753 | } |
754 | |
755 | |
756 | int |
757 | record_apply_ssd_single_bin(as_remote_record *rr, as_storage_rd *rd, |
758 | bool *is_delete) |
759 | { |
760 | // TODO - old pickle - remove in "six months". |
761 | if (rr->is_old_pickle) { |
762 | return old_record_apply_ssd_single_bin(rr, rd, is_delete); |
763 | } |
764 | |
765 | as_namespace* ns = rr->rsv->ns; |
766 | as_record* r = rd->r; |
767 | |
768 | uint16_t n_new_bins = rr->n_bins; |
769 | |
770 | if (n_new_bins > 1) { |
771 | cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: single-bin got %u bins " , ns->name, n_new_bins); |
772 | return AS_ERR_UNKNOWN; |
773 | } |
774 | |
775 | // Won't use to flatten, but needed to know if bins are in use. |
776 | rd->n_bins = n_new_bins; |
777 | |
778 | // Apply changes to metadata in as_index needed for and writing. |
779 | index_metadata old_metadata; |
780 | |
781 | update_index_metadata(rr, &old_metadata, r); |
782 | |
783 | // Write the record to storage. |
784 | int result = as_record_write_from_pickle(rd); |
785 | |
786 | if (result < 0) { |
787 | cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed write " , ns->name); |
788 | unwind_index_metadata(&old_metadata, r); |
789 | return -result; |
790 | } |
791 | |
792 | // Now ok to store or drop key, as determined by message. |
793 | as_record_finalize_key(r, ns, rr->key, rr->key_size); |
794 | |
795 | *is_delete = n_new_bins == 0; |
796 | |
797 | return AS_OK; |
798 | } |
799 | |
800 | |
// Applies a new-style (flat) remote pickle to a multi-bin record whose data
// is not held in memory. Old-style pickles are handed to
// old_record_apply_ssd().
//
// Old bins (loaded from storage) and new bins (unpacked from the pickle) are
// materialized on the stack only when a secondary-index update is needed.
// Otherwise only rd->n_bins is set before as_record_write_from_pickle().
// Returns AS_OK on success and sets *is_delete if the remote record has no
// bins. On failure, returns the negated result of the failing helper; after a
// failed write, the index metadata is restored.
int
record_apply_ssd(as_remote_record *rr, as_storage_rd *rd, bool skip_sindex,
		bool *is_delete)
{
	// TODO - old pickle - remove in "six months".
	if (rr->is_old_pickle) {
		return old_record_apply_ssd(rr, rd, skip_sindex, is_delete);
	}

	as_namespace* ns = rr->rsv->ns;
	as_record* r = rd->r;

	// Sindex needs updating unless skip_sindex is set and the remote record is
	// the very next generation. Evaluated here, before update_index_metadata(),
	// so next_generation() sees the local generation as it was on entry.
	bool has_sindex = ! (skip_sindex &&
			next_generation(r->generation, (uint16_t)rr->generation, ns)) &&
					record_has_sindex(r, ns);

	int result;

	// Only populated when has_sindex - used solely for the sindex update.
	uint16_t n_old_bins = 0;
	as_bin *old_bins = NULL;

	uint16_t n_new_bins = rr->n_bins;
	as_bin *new_bins = NULL;

	if (has_sindex) {
		// TODO - separate function?
		if ((result = as_storage_rd_load_n_bins(rd)) < 0) {
			cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed load n-bins ", ns->name);
			return -result;
		}

		// Stack allocation sized by the stored bin count.
		n_old_bins = rd->n_bins;
		old_bins = alloca(n_old_bins * sizeof(as_bin));

		if ((result = as_storage_rd_load_bins(rd, old_bins)) < 0) {
			cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed load bins ", ns->name);
			return -result;
		}

		// Won't use to flatten.
		rd->bins = NULL;

		if (n_new_bins != 0) {
			new_bins = alloca(n_new_bins * sizeof(as_bin));
			memset(new_bins, 0, n_new_bins * sizeof(as_bin));

			if ((result = as_flat_unpack_remote_bins(rr, new_bins)) != 0) {
				cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed unpickle bins ", ns->name);
				return -result;
			}
		}
	}

	// Won't use to flatten, but needed to know if bins are in use.
	rd->n_bins = n_new_bins;

	// Apply the remote metadata to the index, saving the old values so a
	// failed write can be unwound.
	index_metadata old_metadata;

	update_index_metadata(rr, &old_metadata, r);

	// Write the record to storage.
	if ((result = as_record_write_from_pickle(rd)) < 0) {
		cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed write ", ns->name);
		unwind_index_metadata(&old_metadata, r);
		return -result;
	}

	// Success - adjust sindex, looking at old and new bins.
	if (has_sindex) {
		write_sindex_update(ns, as_index_get_set_name(r, ns), rr->keyd,
				old_bins, n_old_bins, new_bins, n_new_bins);
	}

	// Now ok to store or drop key, as determined by message.
	as_record_finalize_key(r, ns, rr->key, rr->key_size);

	*is_delete = n_new_bins == 0;

	return AS_OK;
}
882 | |
883 | |
// TODO - old pickle - remove in "six months".
// Old-pickle counterpart of record_apply_dim_single_bin(). The bin count is
// read from the first two (big-endian) bytes of rr->pickle, and the bin is
// unpacked with unpickle_bins() directly into the bin embedded in the index.
// Returns AS_OK on success and sets *is_delete if the remote record has no
// bins; returns AS_ERR_UNKNOWN if it has more than one bin.
// NOTE(review): unlike the new-pickle path, an unpickle failure returns
// unpickle_bins()'s result un-negated, and rd->n_bins is left at 1 even for a
// 0-bin remote record. Confirm both differences are intended.
int
old_record_apply_dim_single_bin(as_remote_record *rr, as_storage_rd *rd,
		bool *is_delete)
{
	as_namespace* ns = rr->rsv->ns;
	as_record* r = rd->r;

	// Single-bin - exactly one bin slot.
	rd->n_bins = 1;

	// Set rd->bins!
	as_storage_rd_load_bins(rd, NULL);

	// For memory accounting, note current usage (stays 0 if the bin is empty).
	uint64_t memory_bytes = 0;

	// TODO - as_storage_record_get_n_bytes_memory() could check bins in use.
	if (as_bin_inuse(rd->bins)) {
		memory_bytes = as_storage_record_get_n_bytes_memory(rd);
	}

	// Old pickle format starts with a 2-byte big-endian bin count.
	uint16_t n_new_bins = cf_swap_from_be16(*(uint16_t *)rr->pickle);

	// Nothing has been modified yet, so no unwinding is needed here.
	if (n_new_bins > 1) {
		cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: single-bin got %u bins ", ns->name, n_new_bins);
		return AS_ERR_UNKNOWN;
	}

	// Keep old bin intact for unwinding, clear record bin for incoming.
	as_bin old_bin;

	as_single_bin_copy(&old_bin, rd->bins);
	as_bin_set_empty(rd->bins);

	int result;

	// Fill the new bins and particles. (With 0 new bins, the bin stays empty.)
	if (n_new_bins == 1 &&
			(result = unpickle_bins(rr, rd, NULL)) != 0) {
		cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed unpickle bin ", ns->name);
		unwind_dim_single_bin(&old_bin, rd->bins);
		return result;
	}

	// Apply the remote metadata to the index, saving the old values so a
	// failed write can be unwound.
	index_metadata old_metadata;

	update_index_metadata(rr, &old_metadata, r);

	// Write the record to storage.
	if ((result = as_record_write_from_pickle(rd)) < 0) {
		cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed write ", ns->name);
		unwind_index_metadata(&old_metadata, r);
		unwind_dim_single_bin(&old_bin, rd->bins);
		return -result;
	}

	// Cleanup - destroy old bin, can't unwind after.
	as_bin_particle_destroy(&old_bin, true);

	as_storage_record_adjust_mem_stats(rd, memory_bytes);
	*is_delete = n_new_bins == 0;

	return AS_OK;
}
949 | |
950 | |
951 | // TODO - old pickle - remove in "six months". |
952 | int |
953 | old_record_apply_dim(as_remote_record *rr, as_storage_rd *rd, bool skip_sindex, |
954 | bool *is_delete) |
955 | { |
956 | as_namespace* ns = rr->rsv->ns; |
957 | as_record* r = rd->r; |
958 | |
959 | // Set rd->n_bins! |
960 | as_storage_rd_load_n_bins(rd); |
961 | |
962 | // Set rd->bins! |
963 | as_storage_rd_load_bins(rd, NULL); |
964 | |
965 | // For memory accounting, note current usage. |
966 | uint64_t memory_bytes = as_storage_record_get_n_bytes_memory(rd); |
967 | |
968 | // Keep old bins intact for sindex adjustment and unwinding. |
969 | uint16_t n_old_bins = rd->n_bins; |
970 | as_bin* old_bins = rd->bins; |
971 | |
972 | uint16_t n_new_bins = cf_swap_from_be16(*(uint16_t *)rr->pickle); |
973 | as_bin new_bins[n_new_bins]; |
974 | |
975 | memset(new_bins, 0, sizeof(new_bins)); |
976 | rd->n_bins = n_new_bins; |
977 | rd->bins = new_bins; |
978 | |
979 | // Fill the new bins and particles. |
980 | int result = unpickle_bins(rr, rd, NULL); |
981 | |
982 | if (result != 0) { |
983 | cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed unpickle bins " , ns->name); |
984 | destroy_stack_bins(new_bins, n_new_bins); |
985 | return result; |
986 | } |
987 | |
988 | // Apply changes to metadata in as_index needed for and writing. |
989 | index_metadata old_metadata; |
990 | |
991 | update_index_metadata(rr, &old_metadata, r); |
992 | |
993 | // Prepare to store or drop key, as determined by message. |
994 | rd->key = rr->key; |
995 | rd->key_size = rr->key_size; |
996 | |
997 | // Write the record to storage. |
998 | if ((result = as_record_write_from_pickle(rd)) < 0) { |
999 | cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed write " , ns->name); |
1000 | unwind_index_metadata(&old_metadata, r); |
1001 | destroy_stack_bins(new_bins, n_new_bins); |
1002 | return -result; |
1003 | } |
1004 | |
1005 | // Success - adjust sindex, looking at old and new bins. |
1006 | if (! (skip_sindex && |
1007 | next_generation(r->generation, (uint16_t)rr->generation, ns)) && |
1008 | record_has_sindex(r, ns)) { |
1009 | write_sindex_update(ns, as_index_get_set_name(r, ns), rr->keyd, |
1010 | old_bins, n_old_bins, new_bins, n_new_bins); |
1011 | } |
1012 | |
1013 | // Cleanup - destroy relevant bins, can't unwind after. |
1014 | destroy_stack_bins(old_bins, n_old_bins); |
1015 | |
1016 | // Fill out new_bin_space. |
1017 | as_bin_space* new_bin_space = NULL; |
1018 | |
1019 | if (n_new_bins != 0) { |
1020 | new_bin_space = (as_bin_space*) |
1021 | cf_malloc_ns(sizeof(as_bin_space) + sizeof(new_bins)); |
1022 | |
1023 | new_bin_space->n_bins = rd->n_bins; |
1024 | memcpy((void*)new_bin_space->bins, new_bins, sizeof(new_bins)); |
1025 | } |
1026 | |
1027 | // Swizzle the index element's as_bin_space pointer. |
1028 | as_bin_space* old_bin_space = as_index_get_bin_space(r); |
1029 | |
1030 | if (old_bin_space) { |
1031 | cf_free(old_bin_space); |
1032 | } |
1033 | |
1034 | as_index_set_bin_space(r, new_bin_space); |
1035 | |
1036 | // Now ok to store or drop key, as determined by message. |
1037 | as_record_finalize_key(r, ns, rd->key, rd->key_size); |
1038 | |
1039 | as_storage_record_adjust_mem_stats(rd, memory_bytes); |
1040 | *is_delete = n_new_bins == 0; |
1041 | |
1042 | return AS_OK; |
1043 | } |
1044 | |
1045 | |
1046 | // TODO - old pickle - remove in "six months". |
1047 | int |
1048 | old_record_apply_ssd_single_bin(as_remote_record *rr, as_storage_rd *rd, |
1049 | bool *is_delete) |
1050 | { |
1051 | as_namespace* ns = rr->rsv->ns; |
1052 | as_record* r = rd->r; |
1053 | |
1054 | uint16_t n_new_bins = cf_swap_from_be16(*(uint16_t *)rr->pickle); |
1055 | |
1056 | if (n_new_bins > 1) { |
1057 | cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: single-bin got %u bins " , ns->name, n_new_bins); |
1058 | return AS_ERR_UNKNOWN; |
1059 | } |
1060 | |
1061 | as_bin stack_bin = { { 0 } }; |
1062 | |
1063 | rd->n_bins = 1; |
1064 | rd->bins = &stack_bin; |
1065 | |
1066 | // Fill the new bin and particle. |
1067 | cf_ll_buf_define(particles_llb, STACK_PARTICLES_SIZE); |
1068 | |
1069 | int result; |
1070 | |
1071 | if (n_new_bins == 1 && |
1072 | (result = unpickle_bins(rr, rd, &particles_llb)) != 0) { |
1073 | cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed unpickle bin " , ns->name); |
1074 | cf_ll_buf_free(&particles_llb); |
1075 | return result; |
1076 | } |
1077 | |
1078 | // Apply changes to metadata in as_index needed for and writing. |
1079 | index_metadata old_metadata; |
1080 | |
1081 | update_index_metadata(rr, &old_metadata, r); |
1082 | |
1083 | // Prepare to store or drop key, as determined by message. |
1084 | rd->key = rr->key; |
1085 | rd->key_size = rr->key_size; |
1086 | |
1087 | // Write the record to storage. |
1088 | if ((result = as_record_write_from_pickle(rd)) < 0) { |
1089 | cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed write " , ns->name); |
1090 | unwind_index_metadata(&old_metadata, r); |
1091 | cf_ll_buf_free(&particles_llb); |
1092 | return -result; |
1093 | } |
1094 | |
1095 | // Now ok to store or drop key, as determined by message. |
1096 | as_record_finalize_key(r, ns, rd->key, rd->key_size); |
1097 | |
1098 | *is_delete = n_new_bins == 0; |
1099 | |
1100 | cf_ll_buf_free(&particles_llb); |
1101 | |
1102 | return AS_OK; |
1103 | } |
1104 | |
1105 | |
1106 | // TODO - old pickle - remove in "six months". |
1107 | int |
1108 | old_record_apply_ssd(as_remote_record *rr, as_storage_rd *rd, bool skip_sindex, |
1109 | bool *is_delete) |
1110 | { |
1111 | as_namespace* ns = rr->rsv->ns; |
1112 | as_record* r = rd->r; |
1113 | bool has_sindex = ! (skip_sindex && |
1114 | next_generation(r->generation, (uint16_t)rr->generation, ns)) && |
1115 | record_has_sindex(r, ns); |
1116 | |
1117 | uint16_t n_old_bins = 0; |
1118 | int result; |
1119 | |
1120 | if (has_sindex) { |
1121 | // Set rd->n_bins! |
1122 | if ((result = as_storage_rd_load_n_bins(rd)) < 0) { |
1123 | cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed load n-bins " , ns->name); |
1124 | return -result; |
1125 | } |
1126 | |
1127 | n_old_bins = rd->n_bins; |
1128 | } |
1129 | |
1130 | as_bin old_bins[n_old_bins]; |
1131 | |
1132 | if (has_sindex) { |
1133 | // Set rd->bins! |
1134 | if ((result = as_storage_rd_load_bins(rd, old_bins)) < 0) { |
1135 | cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed load bins " , ns->name); |
1136 | return -result; |
1137 | } |
1138 | } |
1139 | |
1140 | // Stack space for resulting record's bins. |
1141 | uint16_t n_new_bins = cf_swap_from_be16(*(uint16_t *)rr->pickle); |
1142 | as_bin new_bins[n_new_bins]; |
1143 | |
1144 | memset(new_bins, 0, sizeof(new_bins)); |
1145 | rd->n_bins = n_new_bins; |
1146 | rd->bins = new_bins; |
1147 | |
1148 | // Fill the new bins and particles. |
1149 | cf_ll_buf_define(particles_llb, STACK_PARTICLES_SIZE); |
1150 | |
1151 | if ((result = unpickle_bins(rr, rd, &particles_llb)) != 0) { |
1152 | cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed unpickle bins " , ns->name); |
1153 | cf_ll_buf_free(&particles_llb); |
1154 | return result; |
1155 | } |
1156 | |
1157 | // Apply changes to metadata in as_index needed for and writing. |
1158 | index_metadata old_metadata; |
1159 | |
1160 | update_index_metadata(rr, &old_metadata, r); |
1161 | |
1162 | // Prepare to store or drop key, as determined by message. |
1163 | rd->key = rr->key; |
1164 | rd->key_size = rr->key_size; |
1165 | |
1166 | // Write the record to storage. |
1167 | if ((result = as_record_write_from_pickle(rd)) < 0) { |
1168 | cf_warning_digest(AS_RECORD, rr->keyd, "{%s} record replace: failed write " , ns->name); |
1169 | unwind_index_metadata(&old_metadata, r); |
1170 | cf_ll_buf_free(&particles_llb); |
1171 | return -result; |
1172 | } |
1173 | |
1174 | // Success - adjust sindex, looking at old and new bins. |
1175 | if (has_sindex) { |
1176 | write_sindex_update(ns, as_index_get_set_name(r, ns), rr->keyd, |
1177 | old_bins, n_old_bins, new_bins, n_new_bins); |
1178 | } |
1179 | |
1180 | // Now ok to store or drop key, as determined by message. |
1181 | as_record_finalize_key(r, ns, rd->key, rd->key_size); |
1182 | |
1183 | *is_delete = n_new_bins == 0; |
1184 | |
1185 | cf_ll_buf_free(&particles_llb); |
1186 | |
1187 | return AS_OK; |
1188 | } |
1189 | |
1190 | |
1191 | void |
1192 | update_index_metadata(as_remote_record *rr, index_metadata *old, as_record *r) |
1193 | { |
1194 | old->void_time = r->void_time; |
1195 | old->last_update_time = r->last_update_time; |
1196 | old->generation = r->generation; |
1197 | |
1198 | r->generation = (uint16_t)rr->generation; |
1199 | r->void_time = trim_void_time(rr->void_time); |
1200 | r->last_update_time = rr->last_update_time; |
1201 | } |
1202 | |
1203 | |
1204 | void |
1205 | unwind_index_metadata(const index_metadata *old, as_record *r) |
1206 | { |
1207 | r->void_time = old->void_time; |
1208 | r->last_update_time = old->last_update_time; |
1209 | r->generation = old->generation; |
1210 | } |
1211 | |
1212 | |
1213 | void |
1214 | unwind_dim_single_bin(as_bin* old_bin, as_bin* new_bin) |
1215 | { |
1216 | if (as_bin_inuse(new_bin)) { |
1217 | as_bin_particle_destroy(new_bin, true); |
1218 | } |
1219 | |
1220 | as_single_bin_copy(new_bin, old_bin); |
1221 | } |
1222 | |
1223 | |
1224 | // TODO - old pickle - remove in "six months". |
1225 | int |
1226 | unpickle_bins(as_remote_record *rr, as_storage_rd *rd, cf_ll_buf *particles_llb) |
1227 | { |
1228 | as_namespace *ns = rd->ns; |
1229 | |
1230 | const uint8_t *end = rr->pickle + rr->pickle_sz; |
1231 | const uint8_t *buf = rr->pickle + 2; |
1232 | |
1233 | for (uint16_t i = 0; i < rd->n_bins; i++) { |
1234 | if (buf >= end) { |
1235 | cf_warning(AS_RECORD, "incomplete pickled record" ); |
1236 | return AS_ERR_UNKNOWN; |
1237 | } |
1238 | |
1239 | uint8_t name_sz = *buf++; |
1240 | const uint8_t *name = buf; |
1241 | |
1242 | buf += name_sz; |
1243 | buf++; // skipped byte was version |
1244 | |
1245 | if (buf > end) { |
1246 | cf_warning(AS_RECORD, "incomplete pickled record" ); |
1247 | return AS_ERR_UNKNOWN; |
1248 | } |
1249 | |
1250 | int result; |
1251 | as_bin *b = as_bin_create_from_buf(rd, name, name_sz, &result); |
1252 | |
1253 | if (! b) { |
1254 | return result; |
1255 | } |
1256 | |
1257 | if (ns->storage_data_in_memory) { |
1258 | if ((result = as_bin_particle_alloc_from_pickled(b, |
1259 | &buf, end)) < 0) { |
1260 | return -result; |
1261 | } |
1262 | } |
1263 | else { |
1264 | if ((result = as_bin_particle_stack_from_pickled(b, particles_llb, |
1265 | &buf, end)) < 0) { |
1266 | return -result; |
1267 | } |
1268 | } |
1269 | } |
1270 | |
1271 | if (buf != end) { |
1272 | cf_warning(AS_RECORD, "extra bytes on pickled record" ); |
1273 | return AS_ERR_UNKNOWN; |
1274 | } |
1275 | |
1276 | return AS_OK; |
1277 | } |
1278 | |
1279 | |
1280 | void |
1281 | xdr_write_replica(as_remote_record *rr, bool is_delete, uint32_t set_id) |
1282 | { |
1283 | uint16_t generation = (uint16_t)rr->generation; |
1284 | xdr_op_type op_type = XDR_OP_TYPE_WRITE; |
1285 | |
1286 | // Note - in this code path, only durable deletes get here. |
1287 | if (is_delete) { |
1288 | generation = 0; |
1289 | op_type = XDR_OP_TYPE_DURABLE_DELETE; |
1290 | } |
1291 | |
1292 | // Don't send an XDR delete if it's disallowed. |
1293 | if (is_delete && ! is_xdr_delete_shipping_enabled()) { |
1294 | // TODO - should we also not ship if there was no record here before? |
1295 | return; |
1296 | } |
1297 | |
1298 | xdr_write(rr->rsv->ns, rr->keyd, generation, rr->src, op_type, set_id, |
1299 | NULL); |
1300 | } |
1301 | |