1 | /* |
2 | * delete.c |
3 | * |
4 | * Copyright (C) 2016 Aerospike, Inc. |
5 | * |
6 | * Portions may be licensed to Aerospike, Inc. under one or more contributor |
7 | * license agreements. |
8 | * |
9 | * This program is free software: you can redistribute it and/or modify it under |
10 | * the terms of the GNU Affero General Public License as published by the Free |
11 | * Software Foundation, either version 3 of the License, or (at your option) any |
12 | * later version. |
13 | * |
14 | * This program is distributed in the hope that it will be useful, but WITHOUT |
15 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
16 | * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more |
17 | * details. |
18 | * |
19 | * You should have received a copy of the GNU Affero General Public License |
20 | * along with this program. If not, see http://www.gnu.org/licenses/ |
21 | */ |
22 | |
23 | //========================================================== |
24 | // Includes. |
25 | // |
26 | |
27 | #include "transaction/delete.h" |
28 | |
29 | #include <stdbool.h> |
30 | #include <stddef.h> |
31 | #include <stdint.h> |
32 | |
33 | #include "citrusleaf/alloc.h" |
34 | #include "citrusleaf/cf_atomic.h" |
35 | #include "citrusleaf/cf_clock.h" |
36 | |
37 | #include "cf_mutex.h" |
38 | #include "dynbuf.h" |
39 | #include "fault.h" |
40 | |
41 | #include "base/cfg.h" |
42 | #include "base/datamodel.h" |
43 | #include "base/index.h" |
44 | #include "base/predexp.h" |
45 | #include "base/proto.h" |
46 | #include "base/secondary_index.h" |
47 | #include "base/transaction.h" |
48 | #include "base/transaction_policy.h" |
49 | #include "base/xdr_serverside.h" |
50 | #include "fabric/exchange.h" // TODO - old pickle - remove in "six months" |
51 | #include "fabric/partition.h" |
52 | #include "storage/storage.h" |
53 | #include "transaction/duplicate_resolve.h" |
54 | #include "transaction/proxy.h" |
55 | #include "transaction/replica_write.h" |
56 | #include "transaction/rw_request.h" |
57 | #include "transaction/rw_request_hash.h" |
58 | #include "transaction/rw_utils.h" |
59 | |
60 | |
61 | //========================================================== |
62 | // Forward declarations. |
63 | // |
64 | |
// Transaction flow - starting (or continuing into) the duplicate resolution
// and replica write phases, plus the callbacks handed to those phases.
void start_delete_dup_res(rw_request* rw, as_transaction* tr);
void start_delete_repl_write(rw_request* rw, as_transaction* tr);
void start_delete_repl_write_forget(rw_request* rw, as_transaction* tr);
bool delete_dup_res_cb(rw_request* rw);
void delete_repl_write_after_dup_res(rw_request* rw, as_transaction* tr);
void delete_repl_write_forget_after_dup_res(rw_request* rw, as_transaction* tr);
void delete_repl_write_cb(rw_request* rw);

// Transaction end - respond to the transaction's origin, or handle a timeout.
void send_delete_response(as_transaction* tr);
void delete_timeout_cb(rw_request* rw);
75 | |
76 | |
77 | //========================================================== |
78 | // Inlines & macros. |
79 | // |
80 | |
81 | static inline void |
82 | client_delete_update_stats(as_namespace* ns, uint8_t result_code, |
83 | bool is_xdr_op) |
84 | { |
85 | switch (result_code) { |
86 | case AS_OK: |
87 | cf_atomic64_incr(&ns->n_client_delete_success); |
88 | if (is_xdr_op) { |
89 | cf_atomic64_incr(&ns->n_xdr_client_delete_success); |
90 | } |
91 | break; |
92 | default: |
93 | cf_atomic64_incr(&ns->n_client_delete_error); |
94 | if (is_xdr_op) { |
95 | cf_atomic64_incr(&ns->n_xdr_client_delete_error); |
96 | } |
97 | break; |
98 | case AS_ERR_TIMEOUT: |
99 | cf_atomic64_incr(&ns->n_client_delete_timeout); |
100 | if (is_xdr_op) { |
101 | cf_atomic64_incr(&ns->n_xdr_client_delete_timeout); |
102 | } |
103 | break; |
104 | case AS_ERR_NOT_FOUND: |
105 | cf_atomic64_incr(&ns->n_client_delete_not_found); |
106 | if (is_xdr_op) { |
107 | cf_atomic64_incr(&ns->n_xdr_client_delete_not_found); |
108 | } |
109 | break; |
110 | case AS_ERR_FILTERED_OUT: |
111 | // Can't be an XDR delete. |
112 | cf_atomic64_incr(&ns->n_client_delete_filtered_out); |
113 | break; |
114 | } |
115 | } |
116 | |
117 | static inline void |
118 | from_proxy_delete_update_stats(as_namespace* ns, uint8_t result_code, |
119 | bool is_xdr_op) |
120 | { |
121 | switch (result_code) { |
122 | case AS_OK: |
123 | cf_atomic64_incr(&ns->n_from_proxy_delete_success); |
124 | if (is_xdr_op) { |
125 | cf_atomic64_incr(&ns->n_xdr_from_proxy_delete_success); |
126 | } |
127 | break; |
128 | default: |
129 | cf_atomic64_incr(&ns->n_from_proxy_delete_error); |
130 | if (is_xdr_op) { |
131 | cf_atomic64_incr(&ns->n_xdr_from_proxy_delete_error); |
132 | } |
133 | break; |
134 | case AS_ERR_TIMEOUT: |
135 | cf_atomic64_incr(&ns->n_from_proxy_delete_timeout); |
136 | if (is_xdr_op) { |
137 | cf_atomic64_incr(&ns->n_xdr_from_proxy_delete_timeout); |
138 | } |
139 | break; |
140 | case AS_ERR_NOT_FOUND: |
141 | cf_atomic64_incr(&ns->n_from_proxy_delete_not_found); |
142 | if (is_xdr_op) { |
143 | cf_atomic64_incr(&ns->n_xdr_from_proxy_delete_not_found); |
144 | } |
145 | break; |
146 | case AS_ERR_FILTERED_OUT: |
147 | // Can't be an XDR delete. |
148 | cf_atomic64_incr(&ns->n_from_proxy_delete_filtered_out); |
149 | break; |
150 | } |
151 | } |
152 | |
153 | |
154 | //========================================================== |
155 | // Public API. |
156 | // |
157 | |
158 | transaction_status |
159 | as_delete_start(as_transaction* tr) |
160 | { |
161 | // Apply XDR filter. |
162 | if (! xdr_allows_write(tr)) { |
163 | tr->result_code = AS_ERR_ALWAYS_FORBIDDEN; |
164 | send_delete_response(tr); |
165 | return TRANS_DONE_ERROR; |
166 | } |
167 | |
168 | if (! validate_delete_durability(tr)) { |
169 | tr->result_code = AS_ERR_FORBIDDEN; |
170 | send_delete_response(tr); |
171 | return TRANS_DONE_ERROR; |
172 | } |
173 | |
174 | if (delete_storage_overloaded(tr)) { |
175 | tr->result_code = AS_ERR_DEVICE_OVERLOAD; |
176 | send_delete_response(tr); |
177 | return TRANS_DONE_ERROR; |
178 | } |
179 | |
180 | // Create rw_request and add to hash. |
181 | rw_request_hkey hkey = { tr->rsv.ns->id, tr->keyd }; |
182 | rw_request* rw = rw_request_create(&tr->keyd); |
183 | transaction_status status = rw_request_hash_insert(&hkey, rw, tr); |
184 | |
185 | // If rw_request wasn't inserted in hash, transaction is finished. |
186 | if (status != TRANS_IN_PROGRESS) { |
187 | rw_request_release(rw); |
188 | |
189 | if (status != TRANS_WAITING) { |
190 | send_delete_response(tr); |
191 | } |
192 | |
193 | return status; |
194 | } |
195 | // else - rw_request is now in hash, continue... |
196 | |
197 | if (tr->rsv.ns->write_dup_res_disabled) { |
198 | // Note - preventing duplicate resolution this way allows |
199 | // rw_request_destroy() to handle dup_msg[] cleanup correctly. |
200 | tr->rsv.n_dupl = 0; |
201 | } |
202 | |
203 | // If there are duplicates to resolve, start doing so. |
204 | // TODO - should we bother if there's no generation check? |
205 | if (tr->rsv.n_dupl != 0) { |
206 | start_delete_dup_res(rw, tr); |
207 | |
208 | // Started duplicate resolution. |
209 | return TRANS_IN_PROGRESS; |
210 | } |
211 | // else - no duplicate resolution phase, apply operation to master. |
212 | |
213 | // Set up the nodes to which we'll write replicas. |
214 | rw->n_dest_nodes = as_partition_get_other_replicas(tr->rsv.p, |
215 | rw->dest_nodes); |
216 | |
217 | if (insufficient_replica_destinations(tr->rsv.ns, rw->n_dest_nodes)) { |
218 | rw_request_hash_delete(&hkey, rw); |
219 | tr->result_code = AS_ERR_UNAVAILABLE; |
220 | send_delete_response(tr); |
221 | return TRANS_DONE_ERROR; |
222 | } |
223 | |
224 | // If error, transaction is finished. |
225 | if ((status = delete_master(tr, rw)) != TRANS_IN_PROGRESS) { |
226 | rw_request_hash_delete(&hkey, rw); |
227 | |
228 | if (status != TRANS_WAITING) { |
229 | send_delete_response(tr); |
230 | } |
231 | |
232 | return status; |
233 | } |
234 | |
235 | // If we don't need replica writes, transaction is finished. |
236 | if (rw->n_dest_nodes == 0) { |
237 | finished_replicated(tr); |
238 | rw_request_hash_delete(&hkey, rw); |
239 | send_delete_response(tr); |
240 | return TRANS_DONE_SUCCESS; |
241 | } |
242 | |
243 | // If we don't need to wait for replica write acks, fire and forget. |
244 | if (respond_on_master_complete(tr)) { |
245 | start_delete_repl_write_forget(rw, tr); |
246 | rw_request_hash_delete(&hkey, rw); |
247 | send_delete_response(tr); |
248 | return TRANS_DONE_SUCCESS; |
249 | } |
250 | |
251 | start_delete_repl_write(rw, tr); |
252 | |
253 | // Started replica write. |
254 | return TRANS_IN_PROGRESS; |
255 | } |
256 | |
257 | |
258 | //========================================================== |
259 | // Local helpers - transaction flow. |
260 | // |
261 | |
// Start the duplicate resolution phase for a delete.
//
// Builds the dup-res message from rw and tr, then - while holding rw->lock -
// passes delete_dup_res_cb and delete_timeout_cb to dup_res_setup_rw() and
// sends the messages.
void
start_delete_dup_res(rw_request* rw, as_transaction* tr)
{
	// Finish initializing rw, construct and send dup-res message.

	// Message is built before taking the lock.
	dup_res_make_message(rw, tr);

	cf_mutex_lock(&rw->lock);

	dup_res_setup_rw(rw, tr, delete_dup_res_cb, delete_timeout_cb);
	send_rw_messages(rw);

	cf_mutex_unlock(&rw->lock);
}
276 | |
277 | |
// Start the replica write phase for a delete, waiting for acks.
//
// Builds the repl-write message from rw and tr, then - while holding
// rw->lock - passes delete_repl_write_cb and delete_timeout_cb to
// repl_write_setup_rw() and sends the messages.
void
start_delete_repl_write(rw_request* rw, as_transaction* tr)
{
	// Finish initializing rw, construct and send repl-delete message.

	// Message is built before taking the lock.
	repl_write_make_message(rw, tr);

	cf_mutex_lock(&rw->lock);

	repl_write_setup_rw(rw, tr, delete_repl_write_cb, delete_timeout_cb);
	send_rw_messages(rw);

	cf_mutex_unlock(&rw->lock);
}
292 | |
293 | |
// Send replica writes for a delete without waiting for acks.
//
// Unlike start_delete_repl_write(), this takes no lock and sets up no
// callbacks on rw - it only builds the message and sends it via
// send_rw_messages_forget().
void
start_delete_repl_write_forget(rw_request* rw, as_transaction* tr)
{
	// Construct and send repl-write message. No need to finish rw setup.

	repl_write_make_message(rw, tr);
	send_rw_messages_forget(rw);
}
302 | |
303 | |
304 | bool |
305 | delete_dup_res_cb(rw_request* rw) |
306 | { |
307 | as_transaction tr; |
308 | as_transaction_init_from_rw(&tr, rw); |
309 | |
310 | if (tr.result_code != AS_OK) { |
311 | send_delete_response(&tr); |
312 | return true; |
313 | } |
314 | |
315 | // Set up the nodes to which we'll write replicas. |
316 | rw->n_dest_nodes = as_partition_get_other_replicas(tr.rsv.p, |
317 | rw->dest_nodes); |
318 | |
319 | if (insufficient_replica_destinations(tr.rsv.ns, rw->n_dest_nodes)) { |
320 | tr.result_code = AS_ERR_UNAVAILABLE; |
321 | send_delete_response(&tr); |
322 | return true; |
323 | } |
324 | |
325 | transaction_status status = delete_master(&tr, rw); |
326 | |
327 | if (status == TRANS_WAITING) { |
328 | // Note - new tr now owns msgp, make sure rw destructor doesn't free it. |
329 | // Also, rw will release rsv - new tr will get a new one. |
330 | rw->msgp = NULL; |
331 | return true; |
332 | } |
333 | |
334 | if (status == TRANS_DONE_ERROR) { |
335 | send_delete_response(&tr); |
336 | return true; |
337 | } |
338 | |
339 | // If we don't need replica writes, transaction is finished. |
340 | if (rw->n_dest_nodes == 0) { |
341 | finished_replicated(&tr); |
342 | send_delete_response(&tr); |
343 | return true; |
344 | } |
345 | |
346 | // If we don't need to wait for replica write acks, fire and forget. |
347 | // (Remember that nsup deletes can't get here, so no need to check.) |
348 | if (respond_on_master_complete(&tr)) { |
349 | delete_repl_write_forget_after_dup_res(rw, &tr); |
350 | send_delete_response(&tr); |
351 | return true; |
352 | } |
353 | |
354 | delete_repl_write_after_dup_res(rw, &tr); |
355 | |
356 | // Started replica write - don't delete rw_request from hash. |
357 | return false; |
358 | } |
359 | |
360 | |
// Continue from duplicate resolution into a replica write that waits for acks.
//
// Builds the repl-write message, calls repl_write_reset_rw() with
// delete_repl_write_cb, and sends the messages. No locking is done here -
// called from delete_dup_res_cb().
void
delete_repl_write_after_dup_res(rw_request* rw, as_transaction* tr)
{
	// Recycle rw_request that was just used for duplicate resolution to now do
	// replica writes. Note - we are under the rw_request lock here!

	repl_write_make_message(rw, tr);
	repl_write_reset_rw(rw, tr, delete_repl_write_cb);
	send_rw_messages(rw);
}
371 | |
372 | |
// Continue from duplicate resolution into a fire-and-forget replica write.
//
// Builds the repl-write message and sends it via send_rw_messages_forget().
// Unlike delete_repl_write_after_dup_res(), rw is not reset. No locking is
// done here - called from delete_dup_res_cb().
void
delete_repl_write_forget_after_dup_res(rw_request* rw, as_transaction* tr)
{
	// Send replica writes. Not waiting for acks, so no need to reset
	// rw_request. Note - we are under the rw_request lock here!

	repl_write_make_message(rw, tr);
	send_rw_messages_forget(rw);
}
382 | |
383 | |
// Replica write callback for deletes (handed to repl_write_setup_rw() and
// repl_write_reset_rw() elsewhere in this file).
//
// Rebuilds a transaction from rw, calls finished_replicated() on it, and sends
// the response to the transaction's origin.
void
delete_repl_write_cb(rw_request* rw)
{
	as_transaction tr;
	as_transaction_init_from_rw(&tr, rw);

	finished_replicated(&tr);
	send_delete_response(&tr);

	// Finished transaction - rw_request cleans up reservation and msgp!
}
395 | |
396 | |
397 | //========================================================== |
398 | // Local helpers - transaction end. |
399 | // |
400 | |
401 | void |
402 | send_delete_response(as_transaction* tr) |
403 | { |
404 | // Paranoia - shouldn't get here on losing race with timeout. |
405 | if (! tr->from.any) { |
406 | cf_warning(AS_RW, "transaction origin %u has null 'from'" , tr->origin); |
407 | return; |
408 | } |
409 | |
410 | // Note - if tr was setup from rw, rw->from.any has been set null and |
411 | // informs timeout it lost the race. |
412 | |
413 | switch (tr->origin) { |
414 | case FROM_CLIENT: |
415 | as_msg_send_reply(tr->from.proto_fd_h, tr->result_code, 0, 0, NULL, |
416 | NULL, 0, tr->rsv.ns, as_transaction_trid(tr)); |
417 | client_delete_update_stats(tr->rsv.ns, tr->result_code, |
418 | as_transaction_is_xdr(tr)); |
419 | break; |
420 | case FROM_PROXY: |
421 | as_proxy_send_response(tr->from.proxy_node, tr->from_data.proxy_tid, |
422 | tr->result_code, 0, 0, NULL, NULL, 0, tr->rsv.ns, |
423 | as_transaction_trid(tr)); |
424 | from_proxy_delete_update_stats(tr->rsv.ns, tr->result_code, |
425 | as_transaction_is_xdr(tr)); |
426 | break; |
427 | default: |
428 | cf_crash(AS_RW, "unexpected transaction origin %u" , tr->origin); |
429 | break; |
430 | } |
431 | |
432 | tr->from.any = NULL; // pattern, not needed |
433 | } |
434 | |
435 | |
436 | void |
437 | delete_timeout_cb(rw_request* rw) |
438 | { |
439 | if (! rw->from.any) { |
440 | return; // lost race against dup-res or repl-write callback |
441 | } |
442 | |
443 | finished_not_replicated(rw); |
444 | |
445 | switch (rw->origin) { |
446 | case FROM_CLIENT: |
447 | as_msg_send_reply(rw->from.proto_fd_h, AS_ERR_TIMEOUT, 0, 0, NULL, NULL, |
448 | 0, rw->rsv.ns, rw_request_trid(rw)); |
449 | client_delete_update_stats(rw->rsv.ns, AS_ERR_TIMEOUT, |
450 | as_msg_is_xdr(&rw->msgp->msg)); |
451 | break; |
452 | case FROM_PROXY: |
453 | from_proxy_delete_update_stats(rw->rsv.ns, AS_ERR_TIMEOUT, |
454 | as_msg_is_xdr(&rw->msgp->msg)); |
455 | break; |
456 | default: |
457 | cf_crash(AS_RW, "unexpected transaction origin %u" , rw->origin); |
458 | break; |
459 | } |
460 | |
461 | rw->from.any = NULL; // inform other callback it lost the race |
462 | } |
463 | |
464 | |
465 | //========================================================== |
466 | // Local helpers - delete master. |
467 | // |
468 | |
469 | transaction_status |
470 | drop_master(as_transaction* tr, as_index_ref* r_ref, rw_request* rw) |
471 | { |
472 | as_msg* m = &tr->msgp->msg; |
473 | as_namespace* ns = tr->rsv.ns; |
474 | as_index_tree* tree = tr->rsv.tree; |
475 | as_record* r = r_ref->r; |
476 | |
477 | // Check generation requirement, if any. |
478 | if (! generation_check(r, m, ns)) { |
479 | as_record_done(r_ref, ns); |
480 | cf_atomic64_incr(&ns->n_fail_generation); |
481 | tr->result_code = AS_ERR_GENERATION; |
482 | return TRANS_DONE_ERROR; |
483 | } |
484 | |
485 | // Apply predexp metadata filter if present. |
486 | |
487 | predexp_eval_t* predexp = NULL; |
488 | int result = build_predexp_and_filter_meta(tr, r, &predexp); |
489 | |
490 | if (result != 0) { |
491 | as_record_done(r_ref, ns); |
492 | tr->result_code = result; |
493 | return TRANS_DONE_ERROR; |
494 | } |
495 | |
496 | bool check_key = as_transaction_has_key(tr); |
497 | |
498 | if (ns->storage_data_in_memory || predexp != NULL || check_key) { |
499 | as_storage_rd rd; |
500 | as_storage_record_open(ns, r, &rd); |
501 | |
502 | // Apply predexp record bins filter if present. |
503 | if (predexp != NULL) { |
504 | if ((result = predexp_read_and_filter_bins(&rd, predexp)) != 0) { |
505 | predexp_destroy(predexp); |
506 | as_storage_record_close(&rd); |
507 | as_record_done(r_ref, ns); |
508 | tr->result_code = result; |
509 | return TRANS_DONE_ERROR; |
510 | } |
511 | |
512 | predexp_destroy(predexp); |
513 | } |
514 | |
515 | // Check the key if required. |
516 | // Note - for data-not-in-memory a key check is expensive! |
517 | if (check_key && as_storage_record_get_key(&rd) && |
518 | ! check_msg_key(m, &rd)) { |
519 | as_storage_record_close(&rd); |
520 | as_record_done(r_ref, ns); |
521 | tr->result_code = AS_ERR_KEY_MISMATCH; |
522 | return TRANS_DONE_ERROR; |
523 | } |
524 | |
525 | if (ns->storage_data_in_memory) { |
526 | delete_adjust_sindex(&rd); |
527 | } |
528 | |
529 | as_storage_record_close(&rd); |
530 | } |
531 | |
532 | // TODO - old pickle - remove in "six months". |
533 | if (as_exchange_min_compatibility_id() < 3 && rw->n_dest_nodes != 0) { |
534 | // Generate a binless pickle, but don't generate pickled rec-props - |
535 | // these are useless for a drop. |
536 | rw->is_old_pickle = true; |
537 | rw->pickle_sz = sizeof(uint16_t); |
538 | rw->pickle = cf_malloc(rw->pickle_sz); |
539 | *(uint16_t*)rw->pickle = 0; |
540 | } |
541 | |
542 | // Save the set-ID for XDR. |
543 | uint16_t set_id = as_index_get_set_id(r); |
544 | |
545 | as_index_delete(tree, &tr->keyd); |
546 | as_record_done(r_ref, ns); |
547 | |
548 | if (xdr_must_ship_delete(ns, as_msg_is_xdr(m))) { |
549 | xdr_write(ns, &tr->keyd, 0, 0, XDR_OP_TYPE_DROP, set_id, NULL); |
550 | } |
551 | |
552 | return TRANS_IN_PROGRESS; |
553 | } |
554 | |