1/*
2 * delete.c
3 *
4 * Copyright (C) 2016 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23//==========================================================
24// Includes.
25//
26
27#include "transaction/delete.h"
28
29#include <stdbool.h>
30#include <stddef.h>
31#include <stdint.h>
32
33#include "citrusleaf/alloc.h"
34#include "citrusleaf/cf_atomic.h"
35#include "citrusleaf/cf_clock.h"
36
37#include "cf_mutex.h"
38#include "dynbuf.h"
39#include "fault.h"
40
41#include "base/cfg.h"
42#include "base/datamodel.h"
43#include "base/index.h"
44#include "base/predexp.h"
45#include "base/proto.h"
46#include "base/secondary_index.h"
47#include "base/transaction.h"
48#include "base/transaction_policy.h"
49#include "base/xdr_serverside.h"
50#include "fabric/exchange.h" // TODO - old pickle - remove in "six months"
51#include "fabric/partition.h"
52#include "storage/storage.h"
53#include "transaction/duplicate_resolve.h"
54#include "transaction/proxy.h"
55#include "transaction/replica_write.h"
56#include "transaction/rw_request.h"
57#include "transaction/rw_request_hash.h"
58#include "transaction/rw_utils.h"
59
60
61//==========================================================
62// Forward declarations.
63//
64
// Transaction flow: start the duplicate-resolution / replica-write phases,
// and the callbacks run when those phases complete. delete_dup_res_cb()
// returns false only when it has started a replica write on the same
// rw_request (which must then stay in the rw hash).
void start_delete_dup_res(rw_request* rw, as_transaction* tr);
void start_delete_repl_write(rw_request* rw, as_transaction* tr);
void start_delete_repl_write_forget(rw_request* rw, as_transaction* tr);
bool delete_dup_res_cb(rw_request* rw);
void delete_repl_write_after_dup_res(rw_request* rw, as_transaction* tr);
void delete_repl_write_forget_after_dup_res(rw_request* rw, as_transaction* tr);
void delete_repl_write_cb(rw_request* rw);

// Transaction end: reply to the transaction's origin, or handle a timeout.
void send_delete_response(as_transaction* tr);
void delete_timeout_cb(rw_request* rw);
75
76
77//==========================================================
78// Inlines & macros.
79//
80
81static inline void
82client_delete_update_stats(as_namespace* ns, uint8_t result_code,
83 bool is_xdr_op)
84{
85 switch (result_code) {
86 case AS_OK:
87 cf_atomic64_incr(&ns->n_client_delete_success);
88 if (is_xdr_op) {
89 cf_atomic64_incr(&ns->n_xdr_client_delete_success);
90 }
91 break;
92 default:
93 cf_atomic64_incr(&ns->n_client_delete_error);
94 if (is_xdr_op) {
95 cf_atomic64_incr(&ns->n_xdr_client_delete_error);
96 }
97 break;
98 case AS_ERR_TIMEOUT:
99 cf_atomic64_incr(&ns->n_client_delete_timeout);
100 if (is_xdr_op) {
101 cf_atomic64_incr(&ns->n_xdr_client_delete_timeout);
102 }
103 break;
104 case AS_ERR_NOT_FOUND:
105 cf_atomic64_incr(&ns->n_client_delete_not_found);
106 if (is_xdr_op) {
107 cf_atomic64_incr(&ns->n_xdr_client_delete_not_found);
108 }
109 break;
110 case AS_ERR_FILTERED_OUT:
111 // Can't be an XDR delete.
112 cf_atomic64_incr(&ns->n_client_delete_filtered_out);
113 break;
114 }
115}
116
117static inline void
118from_proxy_delete_update_stats(as_namespace* ns, uint8_t result_code,
119 bool is_xdr_op)
120{
121 switch (result_code) {
122 case AS_OK:
123 cf_atomic64_incr(&ns->n_from_proxy_delete_success);
124 if (is_xdr_op) {
125 cf_atomic64_incr(&ns->n_xdr_from_proxy_delete_success);
126 }
127 break;
128 default:
129 cf_atomic64_incr(&ns->n_from_proxy_delete_error);
130 if (is_xdr_op) {
131 cf_atomic64_incr(&ns->n_xdr_from_proxy_delete_error);
132 }
133 break;
134 case AS_ERR_TIMEOUT:
135 cf_atomic64_incr(&ns->n_from_proxy_delete_timeout);
136 if (is_xdr_op) {
137 cf_atomic64_incr(&ns->n_xdr_from_proxy_delete_timeout);
138 }
139 break;
140 case AS_ERR_NOT_FOUND:
141 cf_atomic64_incr(&ns->n_from_proxy_delete_not_found);
142 if (is_xdr_op) {
143 cf_atomic64_incr(&ns->n_xdr_from_proxy_delete_not_found);
144 }
145 break;
146 case AS_ERR_FILTERED_OUT:
147 // Can't be an XDR delete.
148 cf_atomic64_incr(&ns->n_from_proxy_delete_filtered_out);
149 break;
150 }
151}
152
153
154//==========================================================
155// Public API.
156//
157
158transaction_status
159as_delete_start(as_transaction* tr)
160{
161 // Apply XDR filter.
162 if (! xdr_allows_write(tr)) {
163 tr->result_code = AS_ERR_ALWAYS_FORBIDDEN;
164 send_delete_response(tr);
165 return TRANS_DONE_ERROR;
166 }
167
168 if (! validate_delete_durability(tr)) {
169 tr->result_code = AS_ERR_FORBIDDEN;
170 send_delete_response(tr);
171 return TRANS_DONE_ERROR;
172 }
173
174 if (delete_storage_overloaded(tr)) {
175 tr->result_code = AS_ERR_DEVICE_OVERLOAD;
176 send_delete_response(tr);
177 return TRANS_DONE_ERROR;
178 }
179
180 // Create rw_request and add to hash.
181 rw_request_hkey hkey = { tr->rsv.ns->id, tr->keyd };
182 rw_request* rw = rw_request_create(&tr->keyd);
183 transaction_status status = rw_request_hash_insert(&hkey, rw, tr);
184
185 // If rw_request wasn't inserted in hash, transaction is finished.
186 if (status != TRANS_IN_PROGRESS) {
187 rw_request_release(rw);
188
189 if (status != TRANS_WAITING) {
190 send_delete_response(tr);
191 }
192
193 return status;
194 }
195 // else - rw_request is now in hash, continue...
196
197 if (tr->rsv.ns->write_dup_res_disabled) {
198 // Note - preventing duplicate resolution this way allows
199 // rw_request_destroy() to handle dup_msg[] cleanup correctly.
200 tr->rsv.n_dupl = 0;
201 }
202
203 // If there are duplicates to resolve, start doing so.
204 // TODO - should we bother if there's no generation check?
205 if (tr->rsv.n_dupl != 0) {
206 start_delete_dup_res(rw, tr);
207
208 // Started duplicate resolution.
209 return TRANS_IN_PROGRESS;
210 }
211 // else - no duplicate resolution phase, apply operation to master.
212
213 // Set up the nodes to which we'll write replicas.
214 rw->n_dest_nodes = as_partition_get_other_replicas(tr->rsv.p,
215 rw->dest_nodes);
216
217 if (insufficient_replica_destinations(tr->rsv.ns, rw->n_dest_nodes)) {
218 rw_request_hash_delete(&hkey, rw);
219 tr->result_code = AS_ERR_UNAVAILABLE;
220 send_delete_response(tr);
221 return TRANS_DONE_ERROR;
222 }
223
224 // If error, transaction is finished.
225 if ((status = delete_master(tr, rw)) != TRANS_IN_PROGRESS) {
226 rw_request_hash_delete(&hkey, rw);
227
228 if (status != TRANS_WAITING) {
229 send_delete_response(tr);
230 }
231
232 return status;
233 }
234
235 // If we don't need replica writes, transaction is finished.
236 if (rw->n_dest_nodes == 0) {
237 finished_replicated(tr);
238 rw_request_hash_delete(&hkey, rw);
239 send_delete_response(tr);
240 return TRANS_DONE_SUCCESS;
241 }
242
243 // If we don't need to wait for replica write acks, fire and forget.
244 if (respond_on_master_complete(tr)) {
245 start_delete_repl_write_forget(rw, tr);
246 rw_request_hash_delete(&hkey, rw);
247 send_delete_response(tr);
248 return TRANS_DONE_SUCCESS;
249 }
250
251 start_delete_repl_write(rw, tr);
252
253 // Started replica write.
254 return TRANS_IN_PROGRESS;
255}
256
257
258//==========================================================
259// Local helpers - transaction flow.
260//
261
262void
263start_delete_dup_res(rw_request* rw, as_transaction* tr)
264{
265 // Finish initializing rw, construct and send dup-res message.
266
267 dup_res_make_message(rw, tr);
268
269 cf_mutex_lock(&rw->lock);
270
271 dup_res_setup_rw(rw, tr, delete_dup_res_cb, delete_timeout_cb);
272 send_rw_messages(rw);
273
274 cf_mutex_unlock(&rw->lock);
275}
276
277
278void
279start_delete_repl_write(rw_request* rw, as_transaction* tr)
280{
281 // Finish initializing rw, construct and send repl-delete message.
282
283 repl_write_make_message(rw, tr);
284
285 cf_mutex_lock(&rw->lock);
286
287 repl_write_setup_rw(rw, tr, delete_repl_write_cb, delete_timeout_cb);
288 send_rw_messages(rw);
289
290 cf_mutex_unlock(&rw->lock);
291}
292
293
294void
295start_delete_repl_write_forget(rw_request* rw, as_transaction* tr)
296{
297 // Construct and send repl-write message. No need to finish rw setup.
298
299 repl_write_make_message(rw, tr);
300 send_rw_messages_forget(rw);
301}
302
303
304bool
305delete_dup_res_cb(rw_request* rw)
306{
307 as_transaction tr;
308 as_transaction_init_from_rw(&tr, rw);
309
310 if (tr.result_code != AS_OK) {
311 send_delete_response(&tr);
312 return true;
313 }
314
315 // Set up the nodes to which we'll write replicas.
316 rw->n_dest_nodes = as_partition_get_other_replicas(tr.rsv.p,
317 rw->dest_nodes);
318
319 if (insufficient_replica_destinations(tr.rsv.ns, rw->n_dest_nodes)) {
320 tr.result_code = AS_ERR_UNAVAILABLE;
321 send_delete_response(&tr);
322 return true;
323 }
324
325 transaction_status status = delete_master(&tr, rw);
326
327 if (status == TRANS_WAITING) {
328 // Note - new tr now owns msgp, make sure rw destructor doesn't free it.
329 // Also, rw will release rsv - new tr will get a new one.
330 rw->msgp = NULL;
331 return true;
332 }
333
334 if (status == TRANS_DONE_ERROR) {
335 send_delete_response(&tr);
336 return true;
337 }
338
339 // If we don't need replica writes, transaction is finished.
340 if (rw->n_dest_nodes == 0) {
341 finished_replicated(&tr);
342 send_delete_response(&tr);
343 return true;
344 }
345
346 // If we don't need to wait for replica write acks, fire and forget.
347 // (Remember that nsup deletes can't get here, so no need to check.)
348 if (respond_on_master_complete(&tr)) {
349 delete_repl_write_forget_after_dup_res(rw, &tr);
350 send_delete_response(&tr);
351 return true;
352 }
353
354 delete_repl_write_after_dup_res(rw, &tr);
355
356 // Started replica write - don't delete rw_request from hash.
357 return false;
358}
359
360
361void
362delete_repl_write_after_dup_res(rw_request* rw, as_transaction* tr)
363{
364 // Recycle rw_request that was just used for duplicate resolution to now do
365 // replica writes. Note - we are under the rw_request lock here!
366
367 repl_write_make_message(rw, tr);
368 repl_write_reset_rw(rw, tr, delete_repl_write_cb);
369 send_rw_messages(rw);
370}
371
372
373void
374delete_repl_write_forget_after_dup_res(rw_request* rw, as_transaction* tr)
375{
376 // Send replica writes. Not waiting for acks, so need to reset rw_request.
377 // Note - we are under the rw_request lock here!
378
379 repl_write_make_message(rw, tr);
380 send_rw_messages_forget(rw);
381}
382
383
384void
385delete_repl_write_cb(rw_request* rw)
386{
387 as_transaction tr;
388 as_transaction_init_from_rw(&tr, rw);
389
390 finished_replicated(&tr);
391 send_delete_response(&tr);
392
393 // Finished transaction - rw_request cleans up reservation and msgp!
394}
395
396
397//==========================================================
398// Local helpers - transaction end.
399//
400
401void
402send_delete_response(as_transaction* tr)
403{
404 // Paranoia - shouldn't get here on losing race with timeout.
405 if (! tr->from.any) {
406 cf_warning(AS_RW, "transaction origin %u has null 'from'", tr->origin);
407 return;
408 }
409
410 // Note - if tr was setup from rw, rw->from.any has been set null and
411 // informs timeout it lost the race.
412
413 switch (tr->origin) {
414 case FROM_CLIENT:
415 as_msg_send_reply(tr->from.proto_fd_h, tr->result_code, 0, 0, NULL,
416 NULL, 0, tr->rsv.ns, as_transaction_trid(tr));
417 client_delete_update_stats(tr->rsv.ns, tr->result_code,
418 as_transaction_is_xdr(tr));
419 break;
420 case FROM_PROXY:
421 as_proxy_send_response(tr->from.proxy_node, tr->from_data.proxy_tid,
422 tr->result_code, 0, 0, NULL, NULL, 0, tr->rsv.ns,
423 as_transaction_trid(tr));
424 from_proxy_delete_update_stats(tr->rsv.ns, tr->result_code,
425 as_transaction_is_xdr(tr));
426 break;
427 default:
428 cf_crash(AS_RW, "unexpected transaction origin %u", tr->origin);
429 break;
430 }
431
432 tr->from.any = NULL; // pattern, not needed
433}
434
435
436void
437delete_timeout_cb(rw_request* rw)
438{
439 if (! rw->from.any) {
440 return; // lost race against dup-res or repl-write callback
441 }
442
443 finished_not_replicated(rw);
444
445 switch (rw->origin) {
446 case FROM_CLIENT:
447 as_msg_send_reply(rw->from.proto_fd_h, AS_ERR_TIMEOUT, 0, 0, NULL, NULL,
448 0, rw->rsv.ns, rw_request_trid(rw));
449 client_delete_update_stats(rw->rsv.ns, AS_ERR_TIMEOUT,
450 as_msg_is_xdr(&rw->msgp->msg));
451 break;
452 case FROM_PROXY:
453 from_proxy_delete_update_stats(rw->rsv.ns, AS_ERR_TIMEOUT,
454 as_msg_is_xdr(&rw->msgp->msg));
455 break;
456 default:
457 cf_crash(AS_RW, "unexpected transaction origin %u", rw->origin);
458 break;
459 }
460
461 rw->from.any = NULL; // inform other callback it lost the race
462}
463
464
465//==========================================================
466// Local helpers - delete master.
467//
468
469transaction_status
470drop_master(as_transaction* tr, as_index_ref* r_ref, rw_request* rw)
471{
472 as_msg* m = &tr->msgp->msg;
473 as_namespace* ns = tr->rsv.ns;
474 as_index_tree* tree = tr->rsv.tree;
475 as_record* r = r_ref->r;
476
477 // Check generation requirement, if any.
478 if (! generation_check(r, m, ns)) {
479 as_record_done(r_ref, ns);
480 cf_atomic64_incr(&ns->n_fail_generation);
481 tr->result_code = AS_ERR_GENERATION;
482 return TRANS_DONE_ERROR;
483 }
484
485 // Apply predexp metadata filter if present.
486
487 predexp_eval_t* predexp = NULL;
488 int result = build_predexp_and_filter_meta(tr, r, &predexp);
489
490 if (result != 0) {
491 as_record_done(r_ref, ns);
492 tr->result_code = result;
493 return TRANS_DONE_ERROR;
494 }
495
496 bool check_key = as_transaction_has_key(tr);
497
498 if (ns->storage_data_in_memory || predexp != NULL || check_key) {
499 as_storage_rd rd;
500 as_storage_record_open(ns, r, &rd);
501
502 // Apply predexp record bins filter if present.
503 if (predexp != NULL) {
504 if ((result = predexp_read_and_filter_bins(&rd, predexp)) != 0) {
505 predexp_destroy(predexp);
506 as_storage_record_close(&rd);
507 as_record_done(r_ref, ns);
508 tr->result_code = result;
509 return TRANS_DONE_ERROR;
510 }
511
512 predexp_destroy(predexp);
513 }
514
515 // Check the key if required.
516 // Note - for data-not-in-memory a key check is expensive!
517 if (check_key && as_storage_record_get_key(&rd) &&
518 ! check_msg_key(m, &rd)) {
519 as_storage_record_close(&rd);
520 as_record_done(r_ref, ns);
521 tr->result_code = AS_ERR_KEY_MISMATCH;
522 return TRANS_DONE_ERROR;
523 }
524
525 if (ns->storage_data_in_memory) {
526 delete_adjust_sindex(&rd);
527 }
528
529 as_storage_record_close(&rd);
530 }
531
532 // TODO - old pickle - remove in "six months".
533 if (as_exchange_min_compatibility_id() < 3 && rw->n_dest_nodes != 0) {
534 // Generate a binless pickle, but don't generate pickled rec-props -
535 // these are useless for a drop.
536 rw->is_old_pickle = true;
537 rw->pickle_sz = sizeof(uint16_t);
538 rw->pickle = cf_malloc(rw->pickle_sz);
539 *(uint16_t*)rw->pickle = 0;
540 }
541
542 // Save the set-ID for XDR.
543 uint16_t set_id = as_index_get_set_id(r);
544
545 as_index_delete(tree, &tr->keyd);
546 as_record_done(r_ref, ns);
547
548 if (xdr_must_ship_delete(ns, as_msg_is_xdr(m))) {
549 xdr_write(ns, &tr->keyd, 0, 0, XDR_OP_TYPE_DROP, set_id, NULL);
550 }
551
552 return TRANS_IN_PROGRESS;
553}
554