1/*
2 * write.c
3 *
4 * Copyright (C) 2016 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23//==========================================================
24// Includes.
25//
26
27#include "transaction/write.h"
28
29#include <stdbool.h>
30#include <stddef.h>
31#include <stdint.h>
32
33#include "aerospike/as_atomic.h"
34#include "citrusleaf/alloc.h"
35#include "citrusleaf/cf_atomic.h"
36#include "citrusleaf/cf_clock.h"
37
38#include "cf_mutex.h"
39#include "dynbuf.h"
40#include "fault.h"
41
42#include "base/cfg.h"
43#include "base/datamodel.h"
44#include "base/index.h"
45#include "base/predexp.h"
46#include "base/proto.h"
47#include "base/secondary_index.h"
48#include "base/transaction.h"
49#include "base/transaction_policy.h"
50#include "base/truncate.h"
51#include "base/xdr_serverside.h"
52#include "fabric/exchange.h" // TODO - old pickle - remove in "six months"
53#include "fabric/partition.h"
54#include "storage/storage.h"
55#include "transaction/duplicate_resolve.h"
56#include "transaction/proxy.h"
57#include "transaction/replica_write.h"
58#include "transaction/rw_request.h"
59#include "transaction/rw_request_hash.h"
60#include "transaction/rw_utils.h"
61
62
63//==========================================================
64// Typedefs & constants.
65//
66
67#define STACK_PARTICLES_SIZE (1024 * 1024)
68
69
70//==========================================================
71// Forward declarations.
72//
73
74void start_write_dup_res(rw_request* rw, as_transaction* tr);
75void start_write_repl_write(rw_request* rw, as_transaction* tr);
76void start_write_repl_write_forget(rw_request* rw, as_transaction* tr);
77bool write_dup_res_cb(rw_request* rw);
78void write_repl_write_after_dup_res(rw_request* rw, as_transaction* tr);
79void write_repl_write_forget_after_dup_res(rw_request* rw, as_transaction* tr);
80void write_repl_write_cb(rw_request* rw);
81
82void send_write_response(as_transaction* tr, cf_dyn_buf* db);
83void write_timeout_cb(rw_request* rw);
84
85transaction_status write_master(rw_request* rw, as_transaction* tr);
86void write_master_failed(as_transaction* tr, as_index_ref* r_ref,
87 bool record_created, as_index_tree* tree, as_storage_rd* rd,
88 int result_code);
89int write_master_preprocessing(as_transaction* tr);
90int write_master_policies(as_transaction* tr, bool* p_must_not_create,
91 bool* p_record_level_replace, bool* p_must_fetch_data);
92bool check_msg_set_name(as_transaction* tr, const char* set_name);
93int iops_predexp_filter_meta(const as_transaction* tr, const as_record* r,
94 predexp_eval_t** predexp);
95
96int write_master_dim_single_bin(as_transaction* tr, as_storage_rd* rd,
97 rw_request* rw, bool* is_delete, xdr_dirty_bins* dirty_bins);
98int write_master_dim(as_transaction* tr, as_storage_rd* rd,
99 bool record_level_replace, rw_request* rw, bool* is_delete,
100 xdr_dirty_bins* dirty_bins);
101int write_master_ssd_single_bin(as_transaction* tr, as_storage_rd* rd,
102 bool must_fetch_data, rw_request* rw, bool* is_delete,
103 xdr_dirty_bins* dirty_bins);
104int write_master_ssd(as_transaction* tr, as_storage_rd* rd,
105 bool must_fetch_data, bool record_level_replace, rw_request* rw,
106 bool* is_delete, xdr_dirty_bins* dirty_bins);
107
108void write_master_update_index_metadata(as_transaction* tr, index_metadata* old,
109 as_record* r);
110int write_master_bin_ops(as_transaction* tr, as_storage_rd* rd,
111 cf_ll_buf* particles_llb, as_bin* cleanup_bins,
112 uint32_t* p_n_cleanup_bins, cf_dyn_buf* db, uint32_t* p_n_final_bins,
113 xdr_dirty_bins* dirty_bins);
114int write_master_bin_ops_loop(as_transaction* tr, as_storage_rd* rd,
115 as_msg_op** ops, as_bin* response_bins, uint32_t* p_n_response_bins,
116 as_bin* result_bins, uint32_t* p_n_result_bins,
117 cf_ll_buf* particles_llb, as_bin* cleanup_bins,
118 uint32_t* p_n_cleanup_bins, xdr_dirty_bins* dirty_bins);
119
120void write_master_index_metadata_unwind(index_metadata* old, as_record* r);
121void write_master_dim_single_bin_unwind(as_bin* old_bin, as_bin* new_bin,
122 as_bin* cleanup_bins, uint32_t n_cleanup_bins);
123void write_master_dim_unwind(as_bin* old_bins, uint32_t n_old_bins,
124 as_bin* new_bins, uint32_t n_new_bins, as_bin* cleanup_bins,
125 uint32_t n_cleanup_bins);
126
127
128//==========================================================
129// Inlines & macros.
130//
131
132static inline void
133client_write_update_stats(as_namespace* ns, uint8_t result_code, bool is_xdr_op)
134{
135 switch (result_code) {
136 case AS_OK:
137 cf_atomic64_incr(&ns->n_client_write_success);
138 if (is_xdr_op) {
139 cf_atomic64_incr(&ns->n_xdr_client_write_success);
140 }
141 break;
142 default:
143 cf_atomic64_incr(&ns->n_client_write_error);
144 if (is_xdr_op) {
145 cf_atomic64_incr(&ns->n_xdr_client_write_error);
146 }
147 break;
148 case AS_ERR_TIMEOUT:
149 cf_atomic64_incr(&ns->n_client_write_timeout);
150 if (is_xdr_op) {
151 cf_atomic64_incr(&ns->n_xdr_client_write_timeout);
152 }
153 break;
154 case AS_ERR_FILTERED_OUT:
155 // Can't be an XDR write.
156 cf_atomic64_incr(&ns->n_client_write_filtered_out);
157 break;
158 }
159}
160
161static inline void
162from_proxy_write_update_stats(as_namespace* ns, uint8_t result_code,
163 bool is_xdr_op)
164{
165 switch (result_code) {
166 case AS_OK:
167 cf_atomic64_incr(&ns->n_from_proxy_write_success);
168 if (is_xdr_op) {
169 cf_atomic64_incr(&ns->n_xdr_from_proxy_write_success);
170 }
171 break;
172 default:
173 cf_atomic64_incr(&ns->n_from_proxy_write_error);
174 if (is_xdr_op) {
175 cf_atomic64_incr(&ns->n_xdr_from_proxy_write_error);
176 }
177 break;
178 case AS_ERR_TIMEOUT:
179 cf_atomic64_incr(&ns->n_from_proxy_write_timeout);
180 if (is_xdr_op) {
181 cf_atomic64_incr(&ns->n_xdr_from_proxy_write_timeout);
182 }
183 break;
184 case AS_ERR_FILTERED_OUT:
185 // Can't be an XDR write.
186 cf_atomic64_incr(&ns->n_from_proxy_write_filtered_out);
187 break;
188 }
189}
190
191static inline void
192ops_sub_write_update_stats(as_namespace* ns, uint8_t result_code)
193{
194 switch (result_code) {
195 case AS_OK:
196 cf_atomic64_incr(&ns->n_ops_sub_write_success);
197 break;
198 default:
199 cf_atomic64_incr(&ns->n_ops_sub_write_error);
200 break;
201 case AS_ERR_TIMEOUT:
202 cf_atomic64_incr(&ns->n_ops_sub_write_timeout);
203 break;
204 case AS_ERR_FILTERED_OUT: // doesn't include those filtered out by metadata
205 as_incr_uint64(&ns->n_ops_sub_write_filtered_out);
206 break;
207 }
208}
209
210static inline void
211append_bin_to_destroy(as_bin* b, as_bin* bins, uint32_t* p_n_bins)
212{
213 if (as_bin_is_external_particle(b)) {
214 bins[(*p_n_bins)++] = *b;
215 }
216}
217
218
219//==========================================================
220// Public API.
221//
222
// Entry point for a write transaction on the master node. Applies pre-checks
// (XDR filter, storage overload), inserts an rw_request into the hash to
// serialize against other transactions on the same key, optionally starts
// duplicate resolution, applies the write via write_master(), and kicks off
// replica writes as needed. Returns TRANS_IN_PROGRESS if a later callback
// will finish the transaction, otherwise a terminal status - terminal error
// paths send the client response here.
transaction_status
as_write_start(as_transaction* tr)
{
	BENCHMARK_START(tr, write, FROM_CLIENT);
	BENCHMARK_START(tr, ops_sub, FROM_IOPS);

	// Apply XDR filter.
	if (! xdr_allows_write(tr)) {
		tr->result_code = AS_ERR_ALWAYS_FORBIDDEN;
		send_write_response(tr, NULL);
		return TRANS_DONE_ERROR;
	}

	// Check that we aren't backed up.
	if (as_storage_overloaded(tr->rsv.ns)) {
		tr->result_code = AS_ERR_DEVICE_OVERLOAD;
		send_write_response(tr, NULL);
		return TRANS_DONE_ERROR;
	}

	// Create rw_request and add to hash.
	rw_request_hkey hkey = { tr->rsv.ns->id, tr->keyd };
	rw_request* rw = rw_request_create(&tr->keyd);
	transaction_status status = rw_request_hash_insert(&hkey, rw, tr);

	// If rw_request wasn't inserted in hash, transaction is finished.
	// TRANS_WAITING means it was queued behind an existing rw_request - no
	// response is sent now.
	if (status != TRANS_IN_PROGRESS) {
		rw_request_release(rw);

		if (status != TRANS_WAITING) {
			send_write_response(tr, NULL);
		}

		return status;
	}
	// else - rw_request is now in hash, continue...

	if (tr->rsv.ns->write_dup_res_disabled) {
		// Note - preventing duplicate resolution this way allows
		// rw_request_destroy() to handle dup_msg[] cleanup correctly.
		tr->rsv.n_dupl = 0;
	}

	// If there are duplicates to resolve, start doing so.
	if (tr->rsv.n_dupl != 0) {
		start_write_dup_res(rw, tr);

		// Started duplicate resolution - write_dup_res_cb() continues later.
		return TRANS_IN_PROGRESS;
	}
	// else - no duplicate resolution phase, apply operation to master.

	// Set up the nodes to which we'll write replicas.
	rw->n_dest_nodes = as_partition_get_other_replicas(tr->rsv.p,
			rw->dest_nodes);

	if (insufficient_replica_destinations(tr->rsv.ns, rw->n_dest_nodes)) {
		rw_request_hash_delete(&hkey, rw);
		tr->result_code = AS_ERR_UNAVAILABLE;
		send_write_response(tr, NULL);
		return TRANS_DONE_ERROR;
	}

	status = write_master(rw, tr);

	BENCHMARK_NEXT_DATA_POINT_FROM(tr, write, FROM_CLIENT, master);
	BENCHMARK_NEXT_DATA_POINT_FROM(tr, ops_sub, FROM_IOPS, master);

	// If error, transaction is finished. (TRANS_WAITING - write_master()
	// deferred, e.g. on repl state - also just unwinds here, no response.)
	if (status != TRANS_IN_PROGRESS) {
		rw_request_hash_delete(&hkey, rw);

		if (status != TRANS_WAITING) {
			send_write_response(tr, NULL);
		}

		return status;
	}

	// If we don't need replica writes, transaction is finished.
	if (rw->n_dest_nodes == 0) {
		finished_replicated(tr);
		send_write_response(tr, &rw->response_db);
		rw_request_hash_delete(&hkey, rw);
		return TRANS_DONE_SUCCESS;
	}

	// If we don't need to wait for replica write acks, fire and forget.
	if (respond_on_master_complete(tr)) {
		start_write_repl_write_forget(rw, tr);
		send_write_response(tr, &rw->response_db);
		rw_request_hash_delete(&hkey, rw);
		return TRANS_DONE_SUCCESS;
	}

	start_write_repl_write(rw, tr);

	// Started replica write - write_repl_write_cb() finishes the transaction.
	return TRANS_IN_PROGRESS;
}
323
324
325//==========================================================
326// Local helpers - transaction flow.
327//
328
// Begin the duplicate-resolution phase - finish initializing rw, then build
// and send the dup-res message to the duplicate-holding nodes.
void
start_write_dup_res(rw_request* rw, as_transaction* tr)
{
	// Finish initializing rw, construct and send dup-res message.

	dup_res_make_message(rw, tr);

	// Hold rw->lock across setup and send - presumably so an arriving dup-res
	// ack can't race the callback setup. NOTE(review): confirm lock contract
	// with dup_res_setup_rw().
	cf_mutex_lock(&rw->lock);

	dup_res_setup_rw(rw, tr, write_dup_res_cb, write_timeout_cb);
	send_rw_messages(rw);

	cf_mutex_unlock(&rw->lock);
}
343
344
// Begin the replica-write phase - finish initializing rw, then build and send
// the repl-write message to the replica nodes. write_repl_write_cb() fires
// when all acks arrive.
void
start_write_repl_write(rw_request* rw, as_transaction* tr)
{
	// Finish initializing rw, construct and send repl-write message.

	repl_write_make_message(rw, tr);

	// Hold rw->lock across setup and send - presumably so an arriving ack
	// can't race the callback setup.
	cf_mutex_lock(&rw->lock);

	repl_write_setup_rw(rw, tr, write_repl_write_cb, write_timeout_cb);
	send_rw_messages(rw);

	cf_mutex_unlock(&rw->lock);
}
359
360
// Fire-and-forget replica write - send the repl-write message without setting
// up callbacks, since we won't wait for acks.
void
start_write_repl_write_forget(rw_request* rw, as_transaction* tr)
{
	// Construct and send repl-write message. No need to finish rw setup.

	repl_write_make_message(rw, tr);
	send_rw_messages_forget(rw);
}
369
370
// Callback invoked when duplicate resolution completes. Reconstructs the
// transaction from rw and continues the write. Returns true if the
// transaction is finished (caller deletes rw_request from hash), false if a
// replica write was started and rw stays in the hash.
bool
write_dup_res_cb(rw_request* rw)
{
	BENCHMARK_NEXT_DATA_POINT_FROM(rw, write, FROM_CLIENT, dup_res);
	BENCHMARK_NEXT_DATA_POINT_FROM(rw, ops_sub, FROM_IOPS, dup_res);

	as_transaction tr;
	as_transaction_init_from_rw(&tr, rw);

	// Duplicate resolution itself failed - respond with its result code.
	if (tr.result_code != AS_OK) {
		send_write_response(&tr, NULL);
		return true;
	}

	// Set up the nodes to which we'll write replicas.
	rw->n_dest_nodes = as_partition_get_other_replicas(tr.rsv.p,
			rw->dest_nodes);

	if (insufficient_replica_destinations(tr.rsv.ns, rw->n_dest_nodes)) {
		tr.result_code = AS_ERR_UNAVAILABLE;
		send_write_response(&tr, NULL);
		return true;
	}

	transaction_status status = write_master(rw, &tr);

	BENCHMARK_NEXT_DATA_POINT_FROM((&tr), write, FROM_CLIENT, master);
	BENCHMARK_NEXT_DATA_POINT_FROM((&tr), ops_sub, FROM_IOPS, master);

	if (status == TRANS_WAITING) {
		// Note - new tr now owns msgp, make sure rw destructor doesn't free it.
		// Also, rw will release rsv - new tr will get a new one.
		rw->msgp = NULL;
		return true;
	}

	if (status == TRANS_DONE_ERROR) {
		// write_master_failed() already set tr.result_code.
		send_write_response(&tr, NULL);
		return true;
	}

	// If we don't need replica writes, transaction is finished.
	if (rw->n_dest_nodes == 0) {
		finished_replicated(&tr);
		send_write_response(&tr, &rw->response_db);
		return true;
	}

	// If we don't need to wait for replica write acks, fire and forget.
	if (respond_on_master_complete(&tr)) {
		write_repl_write_forget_after_dup_res(rw, &tr);
		send_write_response(&tr, &rw->response_db);
		return true;
	}

	write_repl_write_after_dup_res(rw, &tr);

	// Started replica write - don't delete rw_request from hash.
	return false;
}
431
432
// Transition an rw_request from duplicate resolution to replica write -
// rebuild the message, reset callbacks, and send to the replicas.
void
write_repl_write_after_dup_res(rw_request* rw, as_transaction* tr)
{
	// Recycle rw_request that was just used for duplicate resolution to now do
	// replica writes. Note - we are under the rw_request lock here!

	repl_write_make_message(rw, tr);
	repl_write_reset_rw(rw, tr, write_repl_write_cb);
	send_rw_messages(rw);
}
443
444
// Fire-and-forget replica write after duplicate resolution - the transaction
// responds to the client immediately, so no callbacks are (re)wired here.
void
write_repl_write_forget_after_dup_res(rw_request* rw, as_transaction* tr)
{
	// Send replica writes. Not waiting for acks, so no need to reset
	// rw_request. Note - we are under the rw_request lock here!

	repl_write_make_message(rw, tr);
	send_rw_messages_forget(rw);
}
454
455
// Callback invoked when all replica-write acks have arrived - send the final
// response to the originator.
void
write_repl_write_cb(rw_request* rw)
{
	BENCHMARK_NEXT_DATA_POINT_FROM(rw, write, FROM_CLIENT, repl_write);
	BENCHMARK_NEXT_DATA_POINT_FROM(rw, ops_sub, FROM_IOPS, repl_write);

	as_transaction tr;
	as_transaction_init_from_rw(&tr, rw);

	finished_replicated(&tr);
	send_write_response(&tr, &rw->response_db);

	// Finished transaction - rw_request cleans up reservation and msgp!
}
470
471
472//==========================================================
473// Local helpers - transaction end.
474//
475
// Send the transaction's response to its originator (client, proxy, or
// internal ops-sub), and update the per-origin statistics. A non-empty db
// carries a pre-built ops response; otherwise a plain reply with the result
// code / generation / void-time is sent.
void
send_write_response(as_transaction* tr, cf_dyn_buf* db)
{
	// Paranoia - shouldn't get here on losing race with timeout.
	if (! tr->from.any) {
		cf_warning(AS_RW, "transaction origin %u has null 'from'", tr->origin);
		return;
	}

	// Note - if tr was setup from rw, rw->from.any has been set null and
	// informs timeout it lost the race.

	clear_delete_response_metadata(tr);

	switch (tr->origin) {
	case FROM_CLIENT:
		if (db && db->used_sz != 0) {
			as_msg_send_ops_reply(tr->from.proto_fd_h, db);
		}
		else {
			as_msg_send_reply(tr->from.proto_fd_h, tr->result_code,
					tr->generation, tr->void_time, NULL, NULL, 0, tr->rsv.ns,
					as_transaction_trid(tr));
		}
		BENCHMARK_NEXT_DATA_POINT(tr, write, response);
		HIST_TRACK_ACTIVATE_INSERT_DATA_POINT(tr, write_hist);
		client_write_update_stats(tr->rsv.ns, tr->result_code,
				as_transaction_is_xdr(tr));
		break;
	case FROM_PROXY:
		if (db && db->used_sz != 0) {
			as_proxy_send_ops_response(tr->from.proxy_node,
					tr->from_data.proxy_tid, db);
		}
		else {
			as_proxy_send_response(tr->from.proxy_node, tr->from_data.proxy_tid,
					tr->result_code, tr->generation, tr->void_time, NULL, NULL,
					0, tr->rsv.ns, as_transaction_trid(tr));
		}
		from_proxy_write_update_stats(tr->rsv.ns, tr->result_code,
				as_transaction_is_xdr(tr));
		break;
	case FROM_IOPS:
		// Internal ops sub-transaction - report via callback, no network send.
		tr->from.iops_orig->cb(tr->from.iops_orig->udata, tr->result_code);
		BENCHMARK_NEXT_DATA_POINT(tr, ops_sub, response);
		ops_sub_write_update_stats(tr->rsv.ns, tr->result_code);
		break;
	default:
		cf_crash(AS_RW, "unexpected transaction origin %u", tr->origin);
		break;
	}

	tr->from.any = NULL; // pattern, not needed
}
530
531
// Timeout callback for an rw_request that never completed dup-res or replica
// write - respond with AS_ERR_TIMEOUT and update stats. Clearing rw->from.any
// at the end signals the dup-res/repl-write callback that it lost the race.
void
write_timeout_cb(rw_request* rw)
{
	if (! rw->from.any) {
		return; // lost race against dup-res or repl-write callback
	}

	finished_not_replicated(rw);

	switch (rw->origin) {
	case FROM_CLIENT:
		as_msg_send_reply(rw->from.proto_fd_h, AS_ERR_TIMEOUT, 0, 0, NULL, NULL,
				0, rw->rsv.ns, rw_request_trid(rw));
		// Timeouts aren't included in histograms.
		client_write_update_stats(rw->rsv.ns, AS_ERR_TIMEOUT,
				as_msg_is_xdr(&rw->msgp->msg));
		break;
	case FROM_PROXY:
		// No reply to proxy node - it handles the timeout on its own end.
		from_proxy_write_update_stats(rw->rsv.ns, AS_ERR_TIMEOUT,
				as_msg_is_xdr(&rw->msgp->msg));
		break;
	case FROM_IOPS:
		rw->from.iops_orig->cb(rw->from.iops_orig->udata, AS_ERR_TIMEOUT);
		// Timeouts aren't included in histograms.
		ops_sub_write_update_stats(rw->rsv.ns, AS_ERR_TIMEOUT);
		break;
	default:
		cf_crash(AS_RW, "unexpected transaction origin %u", rw->origin);
		break;
	}

	rw->from.any = NULL; // inform other callback it lost the race
}
565
566
567//==========================================================
568// Local helpers - write master.
569//
570
// Apply the write on the master node. Finds or creates the record (locking
// it), enforces existence/generation/set/filter policies, applies the bin
// ops via the configuration-specific helpers, and hands off to XDR as
// appropriate. Returns TRANS_IN_PROGRESS on success (caller proceeds to
// replication/response), TRANS_DONE_ERROR on failure (write_master_failed()
// has set tr->result_code and unwound), or TRANS_WAITING if the record's
// repl state says to retry later.
transaction_status
write_master(rw_request* rw, as_transaction* tr)
{
	CF_ALLOC_SET_NS_ARENA(tr->rsv.ns);

	//------------------------------------------------------
	// Perform checks that don't need to loop over ops, or
	// create or find (and lock) the as_index.
	//

	if (! write_master_preprocessing(tr)) {
		// Failure cases all call write_master_failed().
		return TRANS_DONE_ERROR;
	}

	//------------------------------------------------------
	// Loop over ops to set some essential policy flags.
	//

	bool must_not_create;
	bool record_level_replace;
	bool must_fetch_data;

	int result = write_master_policies(tr, &must_not_create,
			&record_level_replace, &must_fetch_data);

	if (result != 0) {
		write_master_failed(tr, 0, false, 0, 0, result);
		return TRANS_DONE_ERROR;
	}

	//------------------------------------------------------
	// Find or create the as_index and get a reference -
	// this locks the record. Perform all checks that don't
	// need the as_storage_rd.
	//

	// Shortcut pointers.
	as_msg* m = &tr->msgp->msg;
	as_namespace* ns = tr->rsv.ns;
	as_index_tree* tree = tr->rsv.tree;

	// Find or create as_index, populate as_index_ref, lock record.
	as_index_ref r_ref;
	as_record* r = NULL;
	bool record_created = false;

	if (must_not_create) {
		// Update-only policies - the record must already exist and be live.
		if (as_record_get(tree, &tr->keyd, &r_ref) != 0) {
			write_master_failed(tr, 0, record_created, tree, 0, AS_ERR_NOT_FOUND);
			return TRANS_DONE_ERROR;
		}

		r = r_ref.r;

		// Expired or truncated records are treated as absent.
		if (as_record_is_doomed(r, ns)) {
			write_master_failed(tr, &r_ref, record_created, tree, 0, AS_ERR_NOT_FOUND);
			return TRANS_DONE_ERROR;
		}

		if (repl_state_check(r, tr) < 0) {
			as_record_done(&r_ref, ns);
			return TRANS_WAITING;
		}

		if (! as_record_is_live(r)) {
			write_master_failed(tr, &r_ref, record_created, tree, 0, AS_ERR_NOT_FOUND);
			return TRANS_DONE_ERROR;
		}
	}
	else {
		int rv = as_record_get_create(tree, &tr->keyd, &r_ref, ns);

		if (rv < 0) {
			cf_detail_digest(AS_RW, &tr->keyd, "{%s} write_master: fail as_record_get_create() ", ns->name);
			write_master_failed(tr, 0, record_created, tree, 0, AS_ERR_UNKNOWN);
			return TRANS_DONE_ERROR;
		}

		r = r_ref.r;
		record_created = rv == 1;

		bool is_doomed = as_record_is_doomed(r, ns);

		if (! record_created && ! is_doomed && repl_state_check(r, tr) < 0) {
			as_record_done(&r_ref, ns);
			return TRANS_WAITING;
		}

		// If it's an expired or truncated record, pretend it's a fresh create.
		if (! record_created && is_doomed) {
			as_record_rescue(&r_ref, ns);
			record_created = true;
		}
	}

	// Enforce record-level create-only existence policy.
	if (! record_created && ! create_only_check(r, m)) {
		write_master_failed(tr, &r_ref, record_created, tree, 0, AS_ERR_RECORD_EXISTS);
		return TRANS_DONE_ERROR;
	}

	// Check generation requirement, if any.
	if (! generation_check(r, m, ns)) {
		write_master_failed(tr, &r_ref, record_created, tree, 0, AS_ERR_GENERATION);
		return TRANS_DONE_ERROR;
	}

	// If creating record, write set-ID into index.
	if (record_created) {
		int rv_set = as_transaction_has_set(tr) ?
				set_set_from_msg(r, ns, m) : 0;

		if (rv_set == -1) {
			cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: set can't be added ", ns->name);
			write_master_failed(tr, &r_ref, record_created, tree, 0, AS_ERR_PARAMETER);
			return TRANS_DONE_ERROR;
		}
		else if (rv_set == -2) {
			write_master_failed(tr, &r_ref, record_created, tree, 0, AS_ERR_FORBIDDEN);
			return TRANS_DONE_ERROR;
		}

		// Don't write record if it would be truncated.
		if (as_truncate_now_is_truncated(ns, as_index_get_set_id(r))) {
			write_master_failed(tr, &r_ref, record_created, tree, 0, AS_ERR_FORBIDDEN);
			return TRANS_DONE_ERROR;
		}
	}

	// Shortcut set name.
	const char* set_name = as_index_get_set_name(r, ns);

	// If record existed, check that as_msg set name matches.
	if (! record_created && tr->origin != FROM_IOPS &&
			! check_msg_set_name(tr, set_name)) {
		write_master_failed(tr, &r_ref, record_created, tree, 0, AS_ERR_PARAMETER);
		return TRANS_DONE_ERROR;
	}

	// Apply predexp metadata filter if present. The iops variant fetches the
	// filter from the iops origin; the other builds it from the message.

	predexp_eval_t* predexp = NULL;
	predexp_eval_t* iops_predexp = NULL;

	// Note - record_created is known false inside this condition, hence the
	// literal false passed to write_master_failed().
	if (! record_created && as_record_is_live(r) &&
			(result = tr->origin == FROM_IOPS ?
					iops_predexp_filter_meta(tr, r, &iops_predexp) :
					build_predexp_and_filter_meta(tr, r, &predexp)) != 0) {
		write_master_failed(tr, &r_ref, false, tree, 0, result);
		return TRANS_DONE_ERROR;
	}

	//------------------------------------------------------
	// Open or create the as_storage_rd, and handle record
	// metadata.
	//

	as_storage_rd rd;

	if (record_created) {
		as_storage_record_create(ns, r, &rd);
	}
	else {
		as_storage_record_open(ns, r, &rd);
	}

	// Apply predexp record bins filter if present. Only predexp is destroyed
	// here - iops_predexp is presumably owned by the iops origin; TODO confirm.
	if (predexp != NULL || iops_predexp != NULL) {
		if ((result = predexp_read_and_filter_bins(&rd,
				tr->origin == FROM_IOPS ? iops_predexp : predexp)) != 0) {
			predexp_destroy(predexp);
			write_master_failed(tr, &r_ref, false, tree, &rd, result);
			return TRANS_DONE_ERROR;
		}

		predexp_destroy(predexp);
	}

	// Deal with delete durability (enterprise only).
	if ((result = set_delete_durablility(tr, &rd)) != 0) {
		write_master_failed(tr, &r_ref, record_created, tree, &rd, result);
		return TRANS_DONE_ERROR;
	}

	// Shortcut for set name storage.
	if (set_name) {
		rd.set_name = set_name;
		rd.set_name_len = strlen(set_name);
	}

	// Deal with key storage as needed.
	if ((result = handle_msg_key(tr, &rd)) != 0) {
		write_master_failed(tr, &r_ref, record_created, tree, &rd, result);
		return TRANS_DONE_ERROR;
	}

	// Convert message TTL special value if appropriate.
	if (record_created && m->record_ttl == TTL_DONT_UPDATE) {
		m->record_ttl = TTL_NAMESPACE_DEFAULT;
	}

	// Will we need a pickle?
	// TODO - old pickle - remove condition in "six months".
	if (as_exchange_min_compatibility_id() >= 3) {
		rd.keep_pickle = rw->n_dest_nodes != 0;
	}

	//------------------------------------------------------
	// Split write_master() according to configuration to
	// handle record bins.
	//

	xdr_dirty_bins dirty_bins;
	xdr_clear_dirty_bins(&dirty_bins);

	bool is_delete = false;

	// Four variants - data-in-memory vs. SSD, crossed with single-bin vs.
	// multi-bin. Each applies the ops and writes to storage.
	if (ns->storage_data_in_memory) {
		if (ns->single_bin) {
			result = write_master_dim_single_bin(tr, &rd,
					rw, &is_delete, &dirty_bins);
		}
		else {
			result = write_master_dim(tr, &rd,
					record_level_replace,
					rw, &is_delete, &dirty_bins);
		}
	}
	else {
		if (ns->single_bin) {
			result = write_master_ssd_single_bin(tr, &rd,
					must_fetch_data,
					rw, &is_delete, &dirty_bins);
		}
		else {
			result = write_master_ssd(tr, &rd,
					must_fetch_data, record_level_replace,
					rw, &is_delete, &dirty_bins);
		}
	}

	if (result != 0) {
		write_master_failed(tr, &r_ref, record_created, tree, &rd, result);
		return TRANS_DONE_ERROR;
	}

	//------------------------------------------------------
	// Done - complete function's output, release the record
	// lock, and do XDR write if appropriate.
	//

	tr->generation = r->generation;
	tr->void_time = r->void_time;
	tr->last_update_time = r->last_update_time;

	// Get set-id before releasing.
	uint16_t set_id = as_index_get_set_id(r_ref.r);

	// Collect more info for XDR.
	uint16_t generation = plain_generation(r->generation, ns);
	xdr_op_type op_type = XDR_OP_TYPE_WRITE;

	// Handle deletion if appropriate - the ops may have removed the last bin.
	if (is_delete) {
		write_delete_record(r_ref.r, tree);
		cf_atomic64_incr(&ns->n_deleted_last_bin);
		tr->flags |= AS_TRANSACTION_FLAG_IS_DELETE;

		generation = 0;
		op_type = as_transaction_is_durable_delete(tr) ?
				XDR_OP_TYPE_DURABLE_DELETE : XDR_OP_TYPE_DROP;
	}
	// Or (normally) adjust max void-time.
	else if (r->void_time != 0) {
		cf_atomic32_setmax(&tr->rsv.p->max_void_time, (int32_t)r->void_time);
	}

	will_replicate(r, ns);

	// Release the record lock before the (potentially slow) XDR handoff.
	as_storage_record_close(&rd);
	as_record_done(&r_ref, ns);

	// Don't send an XDR delete if it's disallowed.
	if (is_delete && ! is_xdr_delete_shipping_enabled()) {
		return TRANS_IN_PROGRESS;
	}

	// Do an XDR write if the write is a non-XDR write or is an XDR write with
	// forwarding enabled.
	if (! as_msg_is_xdr(m) || is_xdr_forwarding_enabled() ||
			ns->ns_forward_xdr_writes) {
		xdr_write(ns, &tr->keyd, generation, 0, op_type, set_id, &dirty_bins);
	}

	return TRANS_IN_PROGRESS;
}
868
869
870void
871write_master_failed(as_transaction* tr, as_index_ref* r_ref,
872 bool record_created, as_index_tree* tree, as_storage_rd* rd,
873 int result_code)
874{
875 as_namespace* ns = tr->rsv.ns;
876
877 if (r_ref) {
878 if (rd) {
879 as_storage_record_close(rd);
880 }
881
882 if (record_created) {
883 as_index_delete(tree, &tr->keyd);
884 }
885
886 as_record_done(r_ref, ns);
887 }
888
889 switch (result_code) {
890 case AS_ERR_GENERATION:
891 cf_atomic64_incr(&ns->n_fail_generation);
892 break;
893 case AS_ERR_RECORD_TOO_BIG:
894 cf_detail_digest(AS_RW, &tr->keyd, "{%s} write_master: record too big ", ns->name);
895 cf_atomic64_incr(&ns->n_fail_record_too_big);
896 break;
897 default:
898 // These either log warnings or aren't interesting enough to count.
899 break;
900 }
901
902 tr->result_code = (uint8_t)result_code;
903}
904
905
906int
907write_master_preprocessing(as_transaction* tr)
908{
909 as_namespace* ns = tr->rsv.ns;
910 as_msg* m = &tr->msgp->msg;
911
912 if (ns->clock_skew_stop_writes) {
913 // TODO - new error code?
914 write_master_failed(tr, 0, false, 0, 0, AS_ERR_FORBIDDEN);
915 return false;
916 }
917
918 // ns->stop_writes is set by nsup if configured threshold is breached.
919 if (ns->stop_writes) {
920 write_master_failed(tr, 0, false, 0, 0, AS_ERR_OUT_OF_SPACE);
921 return false;
922 }
923
924 if (! as_storage_has_space(ns)) {
925 cf_warning(AS_RW, "{%s}: write_master: drives full", ns->name);
926 write_master_failed(tr, 0, false, 0, 0, AS_ERR_OUT_OF_SPACE);
927 return false;
928 }
929
930 if (! is_valid_ttl(m->record_ttl)) {
931 cf_warning(AS_RW, "write_master: invalid ttl %u", m->record_ttl);
932 write_master_failed(tr, 0, false, 0, 0, AS_ERR_PARAMETER);
933 return false;
934 }
935
936 // Fail if disallow_null_setname is true and set name is absent or empty.
937 if (ns->disallow_null_setname) {
938 as_msg_field* f = as_transaction_has_set(tr) ?
939 as_msg_field_get(m, AS_MSG_FIELD_TYPE_SET) : NULL;
940
941 if (! f || as_msg_field_get_value_sz(f) == 0) {
942 cf_warning(AS_RW, "write_master: null/empty set name not allowed for namespace %s", ns->name);
943 write_master_failed(tr, 0, false, 0, 0, AS_ERR_PARAMETER);
944 return false;
945 }
946 }
947
948 return true;
949}
950
951
952int
953write_master_policies(as_transaction* tr, bool* p_must_not_create,
954 bool* p_record_level_replace, bool* p_must_fetch_data)
955{
956 // Shortcut pointers.
957 as_msg* m = &tr->msgp->msg;
958 as_namespace* ns = tr->rsv.ns;
959
960 if (m->n_ops == 0) {
961 cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: bin op(s) expected, none present ", ns->name);
962 return AS_ERR_PARAMETER;
963 }
964
965 bool info1_get_all = (m->info1 & AS_MSG_INFO1_GET_ALL) != 0;
966 bool respond_all_ops = (m->info2 & AS_MSG_INFO2_RESPOND_ALL_OPS) != 0;
967
968 bool must_not_create =
969 (m->info3 & AS_MSG_INFO3_UPDATE_ONLY) != 0 ||
970 (m->info3 & AS_MSG_INFO3_REPLACE_ONLY) != 0;
971
972 bool record_level_replace =
973 (m->info3 & AS_MSG_INFO3_CREATE_OR_REPLACE) != 0 ||
974 (m->info3 & AS_MSG_INFO3_REPLACE_ONLY) != 0;
975
976 bool single_bin_write_first = false;
977 bool has_read_op = false;
978 bool has_read_all_op = false;
979 bool generates_response_bin = false;
980
981 // Loop over ops to check and modify flags.
982 as_msg_op* op = NULL;
983 int i = 0;
984
985 while ((op = as_msg_op_iterate(m, op, &i)) != NULL) {
986 if (op->op == AS_MSG_OP_TOUCH) {
987 if (record_level_replace) {
988 cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: touch op can't have record-level replace flag ", ns->name);
989 return AS_ERR_PARAMETER;
990 }
991
992 must_not_create = true;
993 continue;
994 }
995
996 if (ns->data_in_index &&
997 ! is_embedded_particle_type(op->particle_type) &&
998 // Allow AS_PARTICLE_TYPE_NULL, although bin-delete operations
999 // are not likely in single-bin configuration.
1000 op->particle_type != AS_PARTICLE_TYPE_NULL) {
1001 cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: can't write data type %u in data-in-index configuration ", ns->name, op->particle_type);
1002 return AS_ERR_INCOMPATIBLE_TYPE;
1003 }
1004
1005 if (op->name_sz >= AS_BIN_NAME_MAX_SZ) {
1006 cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: bin name too long (%d) ", ns->name, op->name_sz);
1007 return AS_ERR_BIN_NAME;
1008 }
1009
1010 if (op->op == AS_MSG_OP_WRITE) {
1011 if (op->particle_type == AS_PARTICLE_TYPE_NULL &&
1012 record_level_replace) {
1013 cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: bin delete can't have record-level replace flag ", ns->name);
1014 return AS_ERR_PARAMETER;
1015 }
1016
1017 if (ns->single_bin && i == 0) {
1018 single_bin_write_first = true;
1019 }
1020 }
1021 else if (OP_IS_MODIFY(op->op)) {
1022 if (record_level_replace) {
1023 cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: modify op can't have record-level replace flag ", ns->name);
1024 return AS_ERR_PARAMETER;
1025 }
1026 }
1027 else if (op->op == AS_MSG_OP_DELETE_ALL) {
1028 if (record_level_replace) {
1029 cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: delete-all op can't have record-level replace flag ", ns->name);
1030 return AS_ERR_PARAMETER;
1031 }
1032
1033 // Could forbid multiple delete-alls and delete-all being first op
1034 // (should use replace), but these are nonsensical, not unworkable.
1035 }
1036 else if (op_is_read_all(op, m)) {
1037 if (respond_all_ops) {
1038 cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: read-all op can't have respond-all-ops flag ", ns->name);
1039 return AS_ERR_PARAMETER;
1040 }
1041
1042 if (has_read_all_op) {
1043 cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: can't have more than one read-all op ", ns->name);
1044 return AS_ERR_PARAMETER;
1045 }
1046
1047 has_read_op = true;
1048 has_read_all_op = true;
1049 }
1050 else if (op->op == AS_MSG_OP_READ) {
1051 has_read_op = true;
1052 generates_response_bin = true;
1053 }
1054 else if (op->op == AS_MSG_OP_BITS_MODIFY) {
1055 if (record_level_replace) {
1056 cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: bits modify op can't have record-level replace flag ", ns->name);
1057 return AS_ERR_PARAMETER;
1058 }
1059 }
1060 else if (op->op == AS_MSG_OP_BITS_READ) {
1061 has_read_op = true;
1062 generates_response_bin = true;
1063 }
1064 else if (op->op == AS_MSG_OP_CDT_MODIFY) {
1065 if (record_level_replace) {
1066 cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: cdt modify op can't have record-level replace flag ", ns->name);
1067 return AS_ERR_PARAMETER;
1068 }
1069
1070 generates_response_bin = true; // CDT modify may generate a response bin
1071 }
1072 else if (op->op == AS_MSG_OP_CDT_READ) {
1073 has_read_op = true;
1074 generates_response_bin = true;
1075 }
1076 }
1077
1078 if (has_read_op && (m->info1 & AS_MSG_INFO1_READ) == 0) {
1079 cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: has read op but read flag not set ", ns->name);
1080 return AS_ERR_PARAMETER;
1081 }
1082
1083 if (has_read_all_op && generates_response_bin) {
1084 cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: read-all op can't mix with ops that generate response bins ", ns->name);
1085 return AS_ERR_PARAMETER;
1086 }
1087
1088 if (info1_get_all && ! has_read_all_op) {
1089 cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: get-all flag set with no read-all op ", ns->name);
1090 return AS_ERR_PARAMETER;
1091 }
1092
1093 bool must_fetch_data = ! record_level_replace;
1094 // Multi-bin case may modify this to force fetch if there's a sindex.
1095
1096 if (single_bin_write_first && must_fetch_data) {
1097 must_fetch_data = false;
1098 }
1099
1100 *p_must_not_create = must_not_create;
1101 *p_record_level_replace = record_level_replace;
1102 *p_must_fetch_data = must_fetch_data;
1103
1104 return 0;
1105}
1106
1107
1108bool
1109check_msg_set_name(as_transaction* tr, const char* set_name)
1110{
1111 as_msg_field* f = as_transaction_has_set(tr) ?
1112 as_msg_field_get(&tr->msgp->msg, AS_MSG_FIELD_TYPE_SET) : NULL;
1113
1114 if (! f || as_msg_field_get_value_sz(f) == 0) {
1115 if (set_name) {
1116 cf_warning_digest(AS_RW, &tr->keyd, "overwriting record in set '%s' but msg has no set name ",
1117 set_name);
1118 }
1119
1120 return true;
1121 }
1122
1123 size_t msg_set_name_len = as_msg_field_get_value_sz(f);
1124
1125 if (! set_name ||
1126 strncmp(set_name, (const char*)f->data, msg_set_name_len) != 0 ||
1127 set_name[msg_set_name_len] != 0) {
1128 CF_ZSTR_DEFINE(msg_set_name, AS_SET_NAME_MAX_SIZE + 4, f->data,
1129 msg_set_name_len);
1130
1131 cf_warning_digest(AS_RW, &tr->keyd, "overwriting record in set '%s' but msg has different set name '%s' ",
1132 set_name ? set_name : "(null)", msg_set_name);
1133 return false;
1134 }
1135
1136 return true;
1137}
1138
1139
1140int
1141iops_predexp_filter_meta(const as_transaction* tr, const as_record* r,
1142 predexp_eval_t** predexp)
1143{
1144 *predexp = tr->from.iops_orig->predexp;
1145
1146 if (*predexp == NULL) {
1147 return AS_OK;
1148 }
1149
1150 predexp_args_t predargs = { .ns = tr->rsv.ns, .md = (as_record*)r };
1151 predexp_retval_t predrv = predexp_matches_metadata(*predexp, &predargs);
1152
1153 if (predrv == PREDEXP_UNKNOWN) {
1154 return AS_OK; // caller must later check bins using *predexp
1155 }
1156 // else - caller will not need to apply filter later.
1157
1158 *predexp = NULL;
1159
1160 return predrv == PREDEXP_TRUE ? AS_OK : AS_ERR_FILTERED_OUT;
1161}
1162
1163
1164//==========================================================
1165// write_master() splits based on configuration -
1166// data-in-memory & single-bin.
1167//
1168// These handle the bin operations part of write_master()
1169// which are very different per configuration.
1170//
1171
// Apply write_master()'s bin ops for the data-in-memory, single-bin case.
// The single bin is embedded in the index element, so the old bin is copied
// aside up front to allow unwinding. On any failure, both the bin and the
// index metadata are restored before returning. Sets *is_delete true if the
// ops leave the record with no bin. Returns 0 on success, a positive
// AS_ERR_* code, or the negation of a storage-layer error code.
int
write_master_dim_single_bin(as_transaction* tr, as_storage_rd* rd,
		rw_request* rw, bool* is_delete, xdr_dirty_bins* dirty_bins)
{
	// Shortcut pointers.
	as_msg* m = &tr->msgp->msg;
	as_namespace* ns = tr->rsv.ns;
	as_record* r = rd->r;

	rd->n_bins = 1;

	// Set rd->bins!
	// For data-in-memory:
	// - if just created record - sets rd->bins to empty bin embedded in index
	// - otherwise - sets rd->bins to existing embedded bin
	as_storage_rd_load_bins(rd, NULL);

	// For memory accounting, note current usage.
	uint64_t memory_bytes = 0;

	if (as_bin_inuse(rd->bins)) {
		memory_bytes = as_storage_record_get_n_bytes_memory(rd);
	}

	//------------------------------------------------------
	// Copy existing bin into old_bin to enable unwinding.
	//

	uint32_t n_old_bins = as_bin_inuse(rd->bins) ? 1 : 0;
	as_bin old_bin;

	as_single_bin_copy(&old_bin, rd->bins);

	// Collect bins (old or intermediate versions) to destroy on cleanup.
	as_bin cleanup_bins[m->n_ops];
	uint32_t n_cleanup_bins = 0;

	//------------------------------------------------------
	// Apply changes to metadata in as_index needed for
	// response, pickling, and writing.
	//

	index_metadata old_metadata;

	write_master_update_index_metadata(tr, &old_metadata, r);

	//------------------------------------------------------
	// Loop over bin ops to affect new bin space, creating
	// the new record bin to write.
	//

	uint32_t n_new_bins = 0;
	int result = write_master_bin_ops(tr, rd, NULL, cleanup_bins,
			&n_cleanup_bins, &rw->response_db, &n_new_bins, dirty_bins);

	if (result != 0) {
		write_master_index_metadata_unwind(&old_metadata, r);
		write_master_dim_single_bin_unwind(&old_bin, rd->bins, cleanup_bins, n_cleanup_bins);
		return result;
	}

	//------------------------------------------------------
	// Created the new bin to write.
	//

	if (n_new_bins == 0) {
		// No bin left - this write is effectively a delete. Deleting a
		// record that had no bin is an error.
		if (n_old_bins == 0) {
			write_master_index_metadata_unwind(&old_metadata, r);
			write_master_dim_single_bin_unwind(&old_bin, rd->bins, cleanup_bins, n_cleanup_bins);
			return AS_ERR_NOT_FOUND;
		}

		if (! validate_delete_durability(tr)) {
			write_master_index_metadata_unwind(&old_metadata, r);
			write_master_dim_single_bin_unwind(&old_bin, rd->bins, cleanup_bins, n_cleanup_bins);
			return AS_ERR_FORBIDDEN;
		}

		*is_delete = true;
	}

	//------------------------------------------------------
	// Write the record to storage.
	//

	if ((result = as_storage_record_write(rd)) < 0) {
		cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: failed as_storage_record_write() ", ns->name);
		write_master_index_metadata_unwind(&old_metadata, r);
		write_master_dim_single_bin_unwind(&old_bin, rd->bins, cleanup_bins, n_cleanup_bins);
		return -result;
	}

	pickle_all(rd, rw);

	//------------------------------------------------------
	// Cleanup - destroy relevant bins, can't unwind after.
	//

	destroy_stack_bins(cleanup_bins, n_cleanup_bins);

	as_storage_record_adjust_mem_stats(rd, memory_bytes);

	return 0;
}
1276
1277
// Apply write_master()'s bin ops for the data-in-memory, multi-bin case.
// Stages the resulting bins in a stack array, keeping the old bins intact
// both for sindex adjustment and to allow unwinding on failure. Only on
// success is a new as_bin_space allocated and swizzled into the index
// element. Sets *is_delete true if the ops leave the record with no bins.
// Returns 0 on success, a positive AS_ERR_* code, or the negation of a
// storage-layer error code.
int
write_master_dim(as_transaction* tr, as_storage_rd* rd,
		bool record_level_replace, rw_request* rw, bool* is_delete,
		xdr_dirty_bins* dirty_bins)
{
	// Shortcut pointers.
	as_msg* m = &tr->msgp->msg;
	as_namespace* ns = tr->rsv.ns;
	as_record* r = rd->r;

	// Set rd->n_bins!
	// For data-in-memory - number of bins in existing record.
	as_storage_rd_load_n_bins(rd);

	// Set rd->bins!
	// For data-in-memory:
	// - if just created record - sets rd->bins to NULL
	// - otherwise - sets rd->bins to existing (already populated) bins array
	as_storage_rd_load_bins(rd, NULL);

	// For memory accounting, note current usage.
	uint64_t memory_bytes = as_storage_record_get_n_bytes_memory(rd);

	//------------------------------------------------------
	// Copy existing bins to new space, and keep old bins
	// intact for sindex adjustment and so it's possible to
	// unwind on failure.
	//

	uint32_t n_old_bins = (uint32_t)rd->n_bins;
	uint32_t n_new_bins = n_old_bins + m->n_ops; // can't be more than this

	size_t old_bins_size = n_old_bins * sizeof(as_bin);
	size_t new_bins_size = n_new_bins * sizeof(as_bin);

	as_bin* old_bins = rd->bins;
	as_bin new_bins[n_new_bins];

	// Replace starts from scratch - otherwise start from a copy of the
	// existing bins, with trailing slots zeroed.
	if (old_bins_size == 0 || record_level_replace) {
		memset(new_bins, 0, new_bins_size);
	}
	else {
		memcpy(new_bins, old_bins, old_bins_size);
		memset(new_bins + n_old_bins, 0, new_bins_size - old_bins_size);
	}

	rd->n_bins = (uint16_t)n_new_bins;
	rd->bins = new_bins;

	// Collect bins (old or intermediate versions) to destroy on cleanup.
	as_bin cleanup_bins[m->n_ops];
	uint32_t n_cleanup_bins = 0;

	//------------------------------------------------------
	// Apply changes to metadata in as_index needed for
	// response, pickling, and writing.
	//

	index_metadata old_metadata;

	write_master_update_index_metadata(tr, &old_metadata, r);

	//------------------------------------------------------
	// Loop over bin ops to affect new bin space, creating
	// the new record bins to write.
	//

	int result = write_master_bin_ops(tr, rd, NULL, cleanup_bins,
			&n_cleanup_bins, &rw->response_db, &n_new_bins, dirty_bins);

	if (result != 0) {
		write_master_index_metadata_unwind(&old_metadata, r);
		write_master_dim_unwind(old_bins, n_old_bins, new_bins, n_new_bins, cleanup_bins, n_cleanup_bins);
		return result;
	}

	//------------------------------------------------------
	// Created the new bins to write.
	//

	as_bin_space* new_bin_space = NULL;

	// Adjust - the actual number of new bins.
	rd->n_bins = n_new_bins;

	if (n_new_bins != 0) {
		new_bins_size = n_new_bins * sizeof(as_bin);
		new_bin_space = (as_bin_space*)
				cf_malloc_ns(sizeof(as_bin_space) + new_bins_size);
	}
	else {
		// No bins left - this write is effectively a delete. Deleting a
		// record that had no bins is an error.
		if (n_old_bins == 0) {
			write_master_index_metadata_unwind(&old_metadata, r);
			write_master_dim_unwind(old_bins, n_old_bins, new_bins, n_new_bins, cleanup_bins, n_cleanup_bins);
			return AS_ERR_NOT_FOUND;
		}

		if (! validate_delete_durability(tr)) {
			write_master_index_metadata_unwind(&old_metadata, r);
			write_master_dim_unwind(old_bins, n_old_bins, new_bins, n_new_bins, cleanup_bins, n_cleanup_bins);
			return AS_ERR_FORBIDDEN;
		}

		*is_delete = true;
	}

	//------------------------------------------------------
	// Write the record to storage.
	//

	if ((result = as_storage_record_write(rd)) < 0) {
		cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: failed as_storage_record_write() ", ns->name);

		if (new_bin_space) {
			cf_free(new_bin_space);
		}

		write_master_index_metadata_unwind(&old_metadata, r);
		write_master_dim_unwind(old_bins, n_old_bins, new_bins, n_new_bins, cleanup_bins, n_cleanup_bins);
		return -result;
	}

	pickle_all(rd, rw);

	//------------------------------------------------------
	// Success - adjust sindex, looking at old and new bins.
	//

	if (record_has_sindex(r, ns) &&
			write_sindex_update(ns, rd->set_name, &tr->keyd, old_bins,
					n_old_bins, new_bins, n_new_bins)) {
		tr->flags |= AS_TRANSACTION_FLAG_SINDEX_TOUCHED;
	}

	//------------------------------------------------------
	// Cleanup - destroy relevant bins, can't unwind after.
	//

	// On replace, old bins were never added to cleanup_bins - destroy them
	// here, now that the sindex adjustment no longer needs them.
	if (record_level_replace) {
		destroy_stack_bins(old_bins, n_old_bins);
	}

	destroy_stack_bins(cleanup_bins, n_cleanup_bins);

	//------------------------------------------------------
	// Final changes to record data in as_index.
	//

	// Fill out new_bin_space.
	if (n_new_bins != 0) {
		new_bin_space->n_bins = rd->n_bins;
		memcpy((void*)new_bin_space->bins, new_bins, new_bins_size);
	}

	// Swizzle the index element's as_bin_space pointer.
	as_bin_space* old_bin_space = as_index_get_bin_space(r);

	if (old_bin_space) {
		cf_free(old_bin_space);
	}

	as_index_set_bin_space(r, new_bin_space);

	// Accommodate a new stored key - wasn't needed for pickling and writing.
	if (r->key_stored == 0 && rd->key) {
		as_record_allocate_key(r, rd->key, rd->key_size);
		r->key_stored = 1;
	}

	as_storage_record_adjust_mem_stats(rd, memory_bytes);

	return 0;
}
1451
1452
// Apply write_master()'s bin ops for the non-data-in-memory (ssd), single-bin
// case. Particle data for the new bin is staged in particles_llb, which must
// be freed on every exit path. On failure, only the index metadata needs
// unwinding - the bin itself is stack/stream data. Sets *is_delete true if
// the ops leave the record with no bin. Returns 0 on success, a positive
// AS_ERR_* code, or the negation of a storage-layer error code.
int
write_master_ssd_single_bin(as_transaction* tr, as_storage_rd* rd,
		bool must_fetch_data, rw_request* rw, bool* is_delete,
		xdr_dirty_bins* dirty_bins)
{
	// Shortcut pointers.
	as_namespace* ns = tr->rsv.ns;
	as_record* r = rd->r;

	rd->ignore_record_on_device = ! must_fetch_data;
	rd->n_bins = 1;

	as_bin stack_bin;

	// Set rd->bins!
	// For non-data-in-memory:
	// - if just created record, or must_fetch_data is false - sets rd->bins to
	//		empty stack_bin
	// - otherwise - sets rd->bins to stack_bin, reads existing record off
	//		device and populates bin (including particle pointer into block
	//		buffer)
	int result = as_storage_rd_load_bins(rd, &stack_bin);

	if (result < 0) {
		cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: failed as_storage_rd_load_bins()", ns->name);
		return -result;
	}

	uint32_t n_old_bins = as_bin_inuse(rd->bins) ? 1 : 0;

	//------------------------------------------------------
	// Apply changes to metadata in as_index needed for
	// response, pickling, and writing.
	//

	index_metadata old_metadata;

	write_master_update_index_metadata(tr, &old_metadata, r);

	//------------------------------------------------------
	// Loop over bin ops to affect new bin space, creating
	// the new record bin to write.
	//

	cf_ll_buf_define(particles_llb, STACK_PARTICLES_SIZE);

	uint32_t n_new_bins = 0;

	if ((result = write_master_bin_ops(tr, rd, &particles_llb, NULL, NULL,
			&rw->response_db, &n_new_bins, dirty_bins)) != 0) {
		cf_ll_buf_free(&particles_llb);
		write_master_index_metadata_unwind(&old_metadata, r);
		return result;
	}

	//------------------------------------------------------
	// Created the new bin to write.
	//

	if (n_new_bins == 0) {
		// No bin left - this write is effectively a delete. Deleting a
		// record that had no bin is an error.
		if (n_old_bins == 0) {
			cf_ll_buf_free(&particles_llb);
			write_master_index_metadata_unwind(&old_metadata, r);
			return AS_ERR_NOT_FOUND;
		}

		if (! validate_delete_durability(tr)) {
			cf_ll_buf_free(&particles_llb);
			write_master_index_metadata_unwind(&old_metadata, r);
			return AS_ERR_FORBIDDEN;
		}

		*is_delete = true;
	}

	//------------------------------------------------------
	// Write the record to storage.
	//

	if ((result = as_storage_record_write(rd)) < 0) {
		cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: failed as_storage_record_write() ", ns->name);
		cf_ll_buf_free(&particles_llb);
		write_master_index_metadata_unwind(&old_metadata, r);
		return -result;
	}

	pickle_all(rd, rw);

	//------------------------------------------------------
	// Final changes to record data in as_index.
	//

	// Accommodate a new stored key - wasn't needed for pickling and writing.
	if (r->key_stored == 0 && rd->key) {
		r->key_stored = 1;
	}

	cf_ll_buf_free(&particles_llb);

	return 0;
}
1554
1555
// Apply write_master()'s bin ops for the non-data-in-memory (ssd), multi-bin
// case. Existing bins are loaded into a stack array (and copied aside if a
// sindex needs before/after comparison - which also forces a device fetch
// even on replace). Particle data is staged in particles_llb, which must be
// freed on every exit path. Sets *is_delete true if the ops leave the record
// with no bins. Returns 0 on success, a positive AS_ERR_* code, or the
// negation of a storage-layer error code.
int
write_master_ssd(as_transaction* tr, as_storage_rd* rd, bool must_fetch_data,
		bool record_level_replace, rw_request* rw, bool* is_delete,
		xdr_dirty_bins* dirty_bins)
{
	// Shortcut pointers.
	as_msg* m = &tr->msgp->msg;
	as_namespace* ns = tr->rsv.ns;
	as_record* r = rd->r;
	bool has_sindex = record_has_sindex(r, ns);

	// For sindex, we must read existing record even if replacing.
	if (! must_fetch_data) {
		must_fetch_data = has_sindex;
	}

	rd->ignore_record_on_device = ! must_fetch_data;

	// Set rd->n_bins!
	// For non-data-in-memory:
	// - if just created record, or must_fetch_data is false - 0
	// - otherwise - number of bins in existing record
	int result = as_storage_rd_load_n_bins(rd);

	if (result < 0) {
		cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: failed as_storage_rd_load_n_bins()", ns->name);
		return -result;
	}

	uint32_t n_old_bins = (uint32_t)rd->n_bins;
	uint32_t n_new_bins = n_old_bins + m->n_ops; // can't be more than this

	// Needed for as_storage_rd_load_bins() to clear all unused bins.
	rd->n_bins = (uint16_t)n_new_bins;

	// Stack space for resulting record's bins.
	as_bin old_bins[n_old_bins];
	as_bin new_bins[n_new_bins];

	// Set rd->bins!
	// For non-data-in-memory:
	// - if just created record, or must_fetch_data is false - sets rd->bins to
	//		empty new_bins
	// - otherwise - sets rd->bins to new_bins, reads existing record off device
	//		and populates bins (including particle pointers into block buffer)
	if ((result = as_storage_rd_load_bins(rd, new_bins)) < 0) {
		cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: failed as_storage_rd_load_bins()", ns->name);
		return -result;
	}

	//------------------------------------------------------
	// Copy old bins (if any) - which are currently in new
	// bins array - to old bins array, for sindex purposes.
	//

	if (has_sindex && n_old_bins != 0) {
		memcpy(old_bins, new_bins, n_old_bins * sizeof(as_bin));

		// If it's a replace, clear the new bins array.
		if (record_level_replace) {
			as_bin_set_all_empty(rd);
		}
	}

	//------------------------------------------------------
	// Apply changes to metadata in as_index needed for
	// response, pickling, and writing.
	//

	index_metadata old_metadata;

	write_master_update_index_metadata(tr, &old_metadata, r);

	//------------------------------------------------------
	// Loop over bin ops to affect new bin space, creating
	// the new record bins to write.
	//

	cf_ll_buf_define(particles_llb, STACK_PARTICLES_SIZE);

	if ((result = write_master_bin_ops(tr, rd, &particles_llb, NULL, NULL,
			&rw->response_db, &n_new_bins, dirty_bins)) != 0) {
		cf_ll_buf_free(&particles_llb);
		write_master_index_metadata_unwind(&old_metadata, r);
		return result;
	}

	//------------------------------------------------------
	// Created the new bins to write.
	//

	// Adjust - the actual number of new bins.
	rd->n_bins = n_new_bins;

	if (n_new_bins == 0) {
		// No bins left - this write is effectively a delete. Deleting a
		// record that had no bins is an error.
		if (n_old_bins == 0) {
			cf_ll_buf_free(&particles_llb);
			write_master_index_metadata_unwind(&old_metadata, r);
			return AS_ERR_NOT_FOUND;
		}

		if (! validate_delete_durability(tr)) {
			cf_ll_buf_free(&particles_llb);
			write_master_index_metadata_unwind(&old_metadata, r);
			return AS_ERR_FORBIDDEN;
		}

		*is_delete = true;
	}

	//------------------------------------------------------
	// Write the record to storage.
	//

	if ((result = as_storage_record_write(rd)) < 0) {
		cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: failed as_storage_record_write() ", ns->name);
		cf_ll_buf_free(&particles_llb);
		write_master_index_metadata_unwind(&old_metadata, r);
		return -result;
	}

	pickle_all(rd, rw);

	//------------------------------------------------------
	// Success - adjust sindex, looking at old and new bins.
	//

	if (has_sindex &&
			write_sindex_update(ns, rd->set_name, &tr->keyd, old_bins,
					n_old_bins, new_bins, n_new_bins)) {
		tr->flags |= AS_TRANSACTION_FLAG_SINDEX_TOUCHED;
	}

	//------------------------------------------------------
	// Final changes to record data in as_index.
	//

	// Accommodate a new stored key - wasn't needed for pickling and writing.
	if (r->key_stored == 0 && rd->key) {
		r->key_stored = 1;
	}

	cf_ll_buf_free(&particles_llb);

	return 0;
}
1702
1703
1704//==========================================================
1705// write_master() - apply record updates.
1706//
1707
1708void
1709write_master_update_index_metadata(as_transaction* tr, index_metadata* old,
1710 as_record* r)
1711{
1712 old->void_time = r->void_time;
1713 old->last_update_time = r->last_update_time;
1714 old->generation = r->generation;
1715
1716 update_metadata_in_index(tr, r);
1717}
1718
1719
// Run the message's bin ops against rd's bins and, if any ops produced
// response content, build the response message and stash it in db to be sent
// later. On success, *p_n_final_bins is the number of bins in use after all
// ops. For data-in-memory namespaces, cleanup_bins/p_n_cleanup_bins collect
// replaced bin versions to destroy after a successful storage write; for
// others, particles_llb supplies staging space for new particles. Returns 0
// on success, or the error result from the ops loop (in which case any
// result bins accumulated so far are destroyed here).
int
write_master_bin_ops(as_transaction* tr, as_storage_rd* rd,
		cf_ll_buf* particles_llb, as_bin* cleanup_bins,
		uint32_t* p_n_cleanup_bins, cf_dyn_buf* db, uint32_t* p_n_final_bins,
		xdr_dirty_bins* dirty_bins)
{
	// Shortcut pointers.
	as_msg* m = &tr->msgp->msg;
	as_namespace* ns = tr->rsv.ns;
	as_record* r = rd->r;
	bool has_read_all_op = (m->info1 & AS_MSG_INFO1_GET_ALL) != 0;

	// A read-all op can return up to rd->n_bins bins - otherwise each op
	// contributes at most one response bin.
	as_msg_op* ops[m->n_ops];
	as_bin response_bins[has_read_all_op ? rd->n_bins : m->n_ops];
	as_bin result_bins[m->n_ops];

	uint32_t n_response_bins = 0;
	uint32_t n_result_bins = 0;

	int result = write_master_bin_ops_loop(tr, rd, ops, response_bins,
			&n_response_bins, result_bins, &n_result_bins, particles_llb,
			cleanup_bins, p_n_cleanup_bins, dirty_bins);

	if (result != 0) {
		destroy_stack_bins(result_bins, n_result_bins);
		return result;
	}

	*p_n_final_bins = as_bin_inuse_count(rd);

	if (n_response_bins == 0) {
		// If 'respond-all-ops' flag was not set, and there were no read ops
		// or bits/CDT ops with results, there's no response to build and
		// send later.
		return 0;
	}

	// Empty response bins (e.g. respond-all-ops placeholders) become NULL
	// entries in the bins array passed to the response builder.
	as_bin* bins[n_response_bins];

	for (uint32_t i = 0; i < n_response_bins; i++) {
		as_bin* b = &response_bins[i];

		bins[i] = as_bin_inuse(b) ? b : NULL;
	}

	uint32_t generation = r->generation;
	uint32_t void_time = r->void_time;

	// Deletes don't return metadata.
	if (*p_n_final_bins == 0) {
		generation = 0;
		void_time = 0;
	}

	// Build the response message - with a read-all op the ops array is not
	// used (bin names come from the bins themselves).
	size_t msg_sz = 0;
	uint8_t* msgp = (uint8_t*)as_msg_make_response_msg(AS_OK, generation,
			void_time, has_read_all_op ? NULL : ops, bins,
			(uint16_t)n_response_bins, ns, NULL, &msg_sz,
			as_transaction_trid(tr));

	destroy_stack_bins(result_bins, n_result_bins);

	// Stash the message, to be sent later.
	db->buf = msgp;
	db->is_stack = false;
	db->alloc_sz = msg_sz;
	db->used_sz = msg_sz;

	return 0;
}
1789
1790
// The per-op dispatch loop for write_master_bin_ops(). Applies each op in
// the message to rd's bins, accumulating:
// - ops[] / response_bins[] - parallel arrays of ops and their response bin
//   content (empty placeholders when only 'respond-all-ops' demands a slot)
// - result_bins[] - bins holding allocated bits/CDT result particles, which
//   the caller destroys after building the response
// - cleanup_bins[] - (data-in-memory only) replaced bin versions to destroy
//   after a successful storage write
// - dirty_bins - which bins changed, for XDR shipping
// For data-in-memory namespaces new particles are heap-allocated; otherwise
// they are staged in particles_llb. Returns 0 on success or an error code -
// on error the caller is responsible for destroying result_bins, and (for
// data-in-memory) cleanup_bins already collected remain valid for unwinding.
int
write_master_bin_ops_loop(as_transaction* tr, as_storage_rd* rd,
		as_msg_op** ops, as_bin* response_bins, uint32_t* p_n_response_bins,
		as_bin* result_bins, uint32_t* p_n_result_bins,
		cf_ll_buf* particles_llb, as_bin* cleanup_bins,
		uint32_t* p_n_cleanup_bins, xdr_dirty_bins* dirty_bins)
{
	// Shortcut pointers.
	as_msg* m = &tr->msgp->msg;
	as_namespace* ns = tr->rsv.ns;
	bool respond_all_ops = (m->info2 & AS_MSG_INFO2_RESPOND_ALL_OPS) != 0;

	int result;

	as_msg_op* op = NULL;
	int i = 0;

	// NOTE - inner loops below declare their own (uint16_t) 'i', shadowing
	// this iterator index.
	while ((op = as_msg_op_iterate(m, op, &i)) != NULL) {
		if (op->op == AS_MSG_OP_TOUCH) {
			// Touch affects only record metadata - handled elsewhere.
			continue;
		}

		if (op->op == AS_MSG_OP_WRITE) {
			// AS_PARTICLE_TYPE_NULL means delete the bin.
			// TODO - should this even be allowed for single-bin?
			if (op->particle_type == AS_PARTICLE_TYPE_NULL) {
				int32_t j = as_bin_get_index_from_buf(rd, op->name, op->name_sz);

				if (j != -1) {
					if (ns->storage_data_in_memory) {
						// Double copy necessary for single-bin, but doing it
						// generally for code simplicity.
						as_bin cleanup_bin;
						as_bin_copy(ns, &cleanup_bin, &rd->bins[j]);

						append_bin_to_destroy(&cleanup_bin, cleanup_bins, p_n_cleanup_bins);
					}

					as_bin_set_empty_shift(rd, j);
					xdr_fill_dirty_bins(dirty_bins);
				}
			}
			// It's a regular bin write.
			else {
				as_bin* b = as_bin_get_or_create_from_buf(rd, op->name, op->name_sz, &result);

				if (! b) {
					return result;
				}

				if (ns->storage_data_in_memory) {
					// Save the old particle for post-write destruction.
					as_bin cleanup_bin;
					as_bin_copy(ns, &cleanup_bin, b);

					if ((result = as_bin_particle_alloc_from_client(b, op)) < 0) {
						cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: failed as_bin_particle_alloc_from_client() ", ns->name);
						return -result;
					}

					append_bin_to_destroy(&cleanup_bin, cleanup_bins, p_n_cleanup_bins);
				}
				else {
					if ((result = as_bin_particle_stack_from_client(b, particles_llb, op)) < 0) {
						cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: failed as_bin_particle_stack_from_client() ", ns->name);
						return -result;
					}
				}

				xdr_add_dirty_bin(ns, dirty_bins, (const char*)op->name, op->name_sz);
			}

			if (respond_all_ops) {
				ops[*p_n_response_bins] = op;
				as_bin_set_empty(&response_bins[(*p_n_response_bins)++]);
			}
		}
		// Modify an existing bin value.
		else if (OP_IS_MODIFY(op->op)) {
			as_bin* b = as_bin_get_or_create_from_buf(rd, op->name, op->name_sz, &result);

			if (! b) {
				return result;
			}

			if (ns->storage_data_in_memory) {
				// Save the old particle for post-write destruction.
				as_bin cleanup_bin;
				as_bin_copy(ns, &cleanup_bin, b);

				if ((result = as_bin_particle_alloc_modify_from_client(b, op)) < 0) {
					cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: failed as_bin_particle_alloc_modify_from_client() ", ns->name);
					return -result;
				}

				append_bin_to_destroy(&cleanup_bin, cleanup_bins, p_n_cleanup_bins);
			}
			else {
				if ((result = as_bin_particle_stack_modify_from_client(b, particles_llb, op)) < 0) {
					cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: failed as_bin_particle_stack_modify_from_client() ", ns->name);
					return -result;
				}
			}

			xdr_add_dirty_bin(ns, dirty_bins, (const char*)op->name, op->name_sz);

			if (respond_all_ops) {
				ops[*p_n_response_bins] = op;
				as_bin_set_empty(&response_bins[(*p_n_response_bins)++]);
			}
		}
		else if (op->op == AS_MSG_OP_DELETE_ALL) {
			if (ns->storage_data_in_memory) {
				// In-use bins are packed at the front - stop at first unused.
				for (uint16_t i = 0; i < rd->n_bins; i++) {
					as_bin* b = &rd->bins[i];

					if (! as_bin_inuse(b)) {
						break;
					}

					// Double copy necessary for single-bin, but doing it
					// generally for code simplicity.
					as_bin cleanup_bin;
					as_bin_copy(ns, &cleanup_bin, b);

					append_bin_to_destroy(&cleanup_bin, cleanup_bins, p_n_cleanup_bins);
				}
			}

			as_bin_set_all_empty(rd);
			xdr_fill_dirty_bins(dirty_bins);

			if (respond_all_ops) {
				ops[*p_n_response_bins] = op;
				as_bin_set_empty(&response_bins[(*p_n_response_bins)++]);
			}
		}
		else if (op_is_read_all(op, m)) {
			// In-use bins are packed at the front - stop at first unused.
			for (uint16_t i = 0; i < rd->n_bins; i++) {
				as_bin* b = &rd->bins[i];

				if (! as_bin_inuse(b)) {
					break;
				}

				// ops array will not be not used in this case.
				as_bin_copy(ns, &response_bins[(*p_n_response_bins)++], b);
			}
		}
		else if (op->op == AS_MSG_OP_READ) {
			as_bin* b = as_bin_get_from_buf(rd, op->name, op->name_sz);

			if (b) {
				ops[*p_n_response_bins] = op;
				as_bin_copy(ns, &response_bins[(*p_n_response_bins)++], b);
			}
			else if (respond_all_ops) {
				ops[*p_n_response_bins] = op;
				as_bin_set_empty(&response_bins[(*p_n_response_bins)++]);
			}
		}
		else if (op->op == AS_MSG_OP_BITS_MODIFY) {
			as_bin* b = as_bin_get_or_create_from_buf(rd, op->name, op->name_sz, &result);

			if (! b) {
				return result;
			}

			if (ns->storage_data_in_memory) {
				as_bin cleanup_bin;
				as_bin_copy(ns, &cleanup_bin, b);

				if ((result = as_bin_bits_alloc_modify_from_client(b, op)) < 0) {
					cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: failed as_bin_bits_alloc_modify_from_client() ", ns->name);
					return -result;
				}

				// Account for noop bits operations. Modifying non-mutable
				// particle contents in-place is still disallowed.
				if (cleanup_bin.particle != b->particle) {
					append_bin_to_destroy(&cleanup_bin, cleanup_bins, p_n_cleanup_bins);
				}
			}
			else {
				if ((result = as_bin_bits_stack_modify_from_client(b, particles_llb, op)) < 0) {
					cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: failed as_bin_bits_stack_modify_from_client() ", ns->name);
					return -result;
				}
			}

			xdr_add_dirty_bin(ns, dirty_bins, (const char*)op->name, op->name_sz);

			if (respond_all_ops) {
				ops[*p_n_response_bins] = op;
				as_bin_set_empty(&response_bins[(*p_n_response_bins)++]);
			}
		}
		else if (op->op == AS_MSG_OP_BITS_READ) {
			as_bin* b = as_bin_get_from_buf(rd, op->name, op->name_sz);

			if (b) {
				as_bin result_bin;
				as_bin_set_empty(&result_bin);

				if ((result = as_bin_bits_read_from_client(b, op, &result_bin)) < 0) {
					cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: failed as_bin_bits_read_from_client() ", ns->name);
					return -result;
				}

				ops[*p_n_response_bins] = op;
				response_bins[(*p_n_response_bins)++] = result_bin;
				append_bin_to_destroy(&result_bin, result_bins, p_n_result_bins);
			}
			else if (respond_all_ops) {
				ops[*p_n_response_bins] = op;
				as_bin_set_empty(&response_bins[(*p_n_response_bins)++]);
			}
		}
		else if (op->op == AS_MSG_OP_CDT_MODIFY) {
			as_bin* b = as_bin_get_or_create_from_buf(rd, op->name, op->name_sz, &result);

			if (! b) {
				return result;
			}

			as_bin result_bin;
			as_bin_set_empty(&result_bin);

			if (ns->storage_data_in_memory) {
				as_bin cleanup_bin;
				as_bin_copy(ns, &cleanup_bin, b);

				if ((result = as_bin_cdt_alloc_modify_from_client(b, op, &result_bin)) < 0) {
					cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: failed as_bin_cdt_alloc_modify_from_client() ", ns->name);
					return -result;
				}

				// Account for noop CDT operations. Modifying non-mutable
				// particle contents in-place is still disallowed.
				if (cleanup_bin.particle != b->particle) {
					append_bin_to_destroy(&cleanup_bin, cleanup_bins, p_n_cleanup_bins);
				}
			}
			else {
				if ((result = as_bin_cdt_stack_modify_from_client(b, particles_llb, op, &result_bin)) < 0) {
					cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: failed as_bin_cdt_stack_modify_from_client() ", ns->name);
					return -result;
				}
			}

			if (respond_all_ops || as_bin_inuse(&result_bin)) {
				ops[*p_n_response_bins] = op;
				response_bins[(*p_n_response_bins)++] = result_bin;
				append_bin_to_destroy(&result_bin, result_bins, p_n_result_bins);
			}

			// A CDT modify may have emptied the bin (e.g. removed the last
			// element) - if so, remove the bin itself.
			if (! as_bin_inuse(b)) {
				// TODO - could do better than finding index from name.
				int32_t index = as_bin_get_index_from_buf(rd, op->name, op->name_sz);

				if (index >= 0) {
					as_bin_set_empty_shift(rd, (uint32_t)index);
					xdr_fill_dirty_bins(dirty_bins);
				}
			}
			else {
				xdr_add_dirty_bin(ns, dirty_bins, (const char*)op->name, op->name_sz);
			}
		}
		else if (op->op == AS_MSG_OP_CDT_READ) {
			as_bin* b = as_bin_get_from_buf(rd, op->name, op->name_sz);

			if (b) {
				as_bin result_bin;
				as_bin_set_empty(&result_bin);

				if ((result = as_bin_cdt_read_from_client(b, op, &result_bin)) < 0) {
					cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: failed as_bin_cdt_read_from_client() ", ns->name);
					return -result;
				}

				ops[*p_n_response_bins] = op;
				response_bins[(*p_n_response_bins)++] = result_bin;
				append_bin_to_destroy(&result_bin, result_bins, p_n_result_bins);
			}
			else if (respond_all_ops) {
				ops[*p_n_response_bins] = op;
				as_bin_set_empty(&response_bins[(*p_n_response_bins)++]);
			}
		}
		else {
			cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: unknown bin op %u ", ns->name, op->op);
			return AS_ERR_PARAMETER;
		}
	}

	return 0;
}
2087
2088
2089//==========================================================
2090// write_master() - unwind on failure or cleanup.
2091//
2092
2093void
2094write_master_index_metadata_unwind(index_metadata* old, as_record* r)
2095{
2096 r->void_time = old->void_time;
2097 r->last_update_time = old->last_update_time;
2098 r->generation = old->generation;
2099}
2100
2101
2102void
2103write_master_dim_single_bin_unwind(as_bin* old_bin, as_bin* new_bin,
2104 as_bin* cleanup_bins, uint32_t n_cleanup_bins)
2105{
2106 as_particle* p_old = as_bin_get_particle(old_bin);
2107
2108 if (as_bin_is_external_particle(new_bin) && new_bin->particle != p_old) {
2109 as_bin_particle_destroy(new_bin, true);
2110 }
2111
2112 for (uint32_t i_cleanup = 0; i_cleanup < n_cleanup_bins; i_cleanup++) {
2113 as_bin* b_cleanup = &cleanup_bins[i_cleanup];
2114
2115 if (b_cleanup->particle != p_old) {
2116 as_bin_particle_destroy(b_cleanup, true);
2117 }
2118 }
2119
2120 as_single_bin_copy(new_bin, old_bin);
2121}
2122
2123
2124void
2125write_master_dim_unwind(as_bin* old_bins, uint32_t n_old_bins, as_bin* new_bins,
2126 uint32_t n_new_bins, as_bin* cleanup_bins, uint32_t n_cleanup_bins)
2127{
2128 for (uint32_t i_new = 0; i_new < n_new_bins; i_new++) {
2129 as_bin* b_new = &new_bins[i_new];
2130
2131 if (! as_bin_inuse(b_new)) {
2132 break;
2133 }
2134
2135 // Embedded particles have no-op destructors - skip loop over old bins.
2136 if (as_bin_is_embedded_particle(b_new)) {
2137 continue;
2138 }
2139
2140 as_particle* p_new = b_new->particle;
2141 uint32_t i_old;
2142
2143 for (i_old = 0; i_old < n_old_bins; i_old++) {
2144 as_bin* b_old = &old_bins[i_old];
2145
2146 if (b_new->id == b_old->id) {
2147 if (p_new != as_bin_get_particle(b_old)) {
2148 as_bin_particle_destroy(b_new, true);
2149 }
2150
2151 break;
2152 }
2153 }
2154
2155 if (i_old == n_old_bins) {
2156 as_bin_particle_destroy(b_new, true);
2157 }
2158 }
2159
2160 for (uint32_t i_cleanup = 0; i_cleanup < n_cleanup_bins; i_cleanup++) {
2161 as_bin* b_cleanup = &cleanup_bins[i_cleanup];
2162 as_particle* p_cleanup = b_cleanup->particle;
2163 uint32_t i_old;
2164
2165 for (i_old = 0; i_old < n_old_bins; i_old++) {
2166 as_bin* b_old = &old_bins[i_old];
2167
2168 if (b_cleanup->id == b_old->id) {
2169 if (p_cleanup != as_bin_get_particle(b_old)) {
2170 as_bin_particle_destroy(b_cleanup, true);
2171 }
2172
2173 break;
2174 }
2175 }
2176
2177 if (i_old == n_old_bins) {
2178 as_bin_particle_destroy(b_cleanup, true);
2179 }
2180 }
2181
2182 // The index element's as_bin_space pointer still points at old bins.
2183}
2184