1/*
2 * duplicate_resolve.c
3 *
4 * Copyright (C) 2016 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23//==========================================================
24// Includes.
25//
26
27#include "transaction/duplicate_resolve.h"
28
29#include <stdbool.h>
30#include <stddef.h>
31#include <stdint.h>
32#include <string.h>
33
34#include "citrusleaf/cf_atomic.h"
35#include "citrusleaf/cf_digest.h"
36
37#include "cf_mutex.h"
38#include "fault.h"
39#include "msg.h"
40#include "node.h"
41
42#include "base/datamodel.h"
43#include "base/proto.h"
44#include "base/service.h"
45#include "base/transaction.h"
46#include "fabric/exchange.h"
47#include "fabric/fabric.h"
48#include "fabric/partition.h"
49#include "storage/flat.h"
50#include "storage/storage.h"
51#include "transaction/rw_request.h"
52#include "transaction/rw_request_hash.h"
53#include "transaction/rw_utils.h"
54
55
56//==========================================================
57// Forward declarations.
58//
59
60int fill_ack_w_pickle(as_storage_rd* rd, msg* m);
61int old_fill_ack_w_pickle(as_storage_rd* rd, msg* m);
62void done_handle_request(as_partition_reservation* rsv, as_index_ref* r_ref, as_storage_rd* rd);
63void send_dup_res_ack(cf_node node, msg* m, uint32_t result, uint32_t info);
64void send_dup_res_ack_preserved(cf_node node, msg* m, uint32_t result, uint32_t info);
65uint32_t parse_dup_meta(msg* m, uint32_t* p_generation, uint64_t* p_last_update_time);
66uint32_t old_parse_conflict_meta(msg* m, uint32_t* generation, uint64_t* lut);
67void apply_winner(rw_request* rw);
68bool old_parse_winner(rw_request* rw, uint32_t info, as_remote_record* rr);
69
70
71//==========================================================
72// Public API.
73//
74
75void
76dup_res_make_message(rw_request* rw, as_transaction* tr)
77{
78 rw->dest_msg = as_fabric_msg_get(M_TYPE_RW);
79
80 as_namespace* ns = tr->rsv.ns;
81 msg* m = rw->dest_msg;
82
83 msg_set_uint32(m, RW_FIELD_OP, RW_OP_DUP);
84 msg_set_buf(m, RW_FIELD_NAMESPACE, (uint8_t*)ns->name, strlen(ns->name),
85 MSG_SET_COPY);
86 msg_set_uint32(m, RW_FIELD_NS_ID, ns->id);
87 msg_set_buf(m, RW_FIELD_DIGEST, (void*)&tr->keyd, sizeof(cf_digest),
88 MSG_SET_COPY);
89 msg_set_uint32(m, RW_FIELD_TID, rw->tid);
90
91 // TODO - JUMP - send this only because versions up to 3.14.x require it.
92 msg_set_uint64(m, RW_FIELD_CLUSTER_KEY, as_exchange_cluster_key());
93
94 as_index_ref r_ref;
95
96 if (as_record_get(tr->rsv.tree, &tr->keyd, &r_ref) == 0) {
97 as_record* r = r_ref.r;
98
99 msg_set_uint32(m, RW_FIELD_GENERATION, r->generation);
100 msg_set_uint64(m, RW_FIELD_LAST_UPDATE_TIME, r->last_update_time);
101
102 as_record_done(&r_ref, ns);
103 }
104}
105
106
107void
108dup_res_setup_rw(rw_request* rw, as_transaction* tr, dup_res_done_cb dup_res_cb,
109 timeout_done_cb timeout_cb)
110{
111 rw->msgp = tr->msgp;
112 tr->msgp = NULL;
113
114 rw->msg_fields = tr->msg_fields;
115 rw->origin = tr->origin;
116 rw->from_flags = tr->from_flags;
117
118 rw->from.any = tr->from.any;
119 rw->from_data.any = tr->from_data.any;
120 tr->from.any = NULL;
121
122 rw->start_time = tr->start_time;
123 rw->benchmark_time = tr->benchmark_time;
124
125 as_partition_reservation_copy(&rw->rsv, &tr->rsv);
126 // Hereafter, rw must release the reservation - happens in destructor.
127
128 rw->end_time = tr->end_time;
129 // Note - don't need as_transaction's other 'container' members.
130
131 rw->dup_res_cb = dup_res_cb;
132 rw->timeout_cb = timeout_cb;
133
134 rw->xmit_ms = cf_getms() + g_config.transaction_retry_ms;
135 rw->retry_interval_ms = g_config.transaction_retry_ms;
136
137 rw->n_dest_nodes = tr->rsv.n_dupl;
138
139 for (uint32_t i = 0; i < rw->n_dest_nodes; i++) {
140 rw->dest_complete[i] = false;
141 rw->dest_nodes[i] = tr->rsv.dupl_nodes[i];
142 }
143
144 // Allow retransmit thread to destroy rw as soon as we unlock.
145 rw->is_set_up = true;
146}
147
148
149void
150dup_res_handle_request(cf_node node, msg* m)
151{
152 cf_digest* keyd;
153
154 if (msg_get_buf(m, RW_FIELD_DIGEST, (uint8_t**)&keyd, NULL,
155 MSG_GET_DIRECT) != 0) {
156 cf_warning(AS_RW, "dup-res handler: no digest");
157 send_dup_res_ack(node, m, AS_ERR_UNKNOWN, 0);
158 return;
159 }
160
161 uint8_t* ns_name;
162 size_t ns_name_len;
163
164 if (msg_get_buf(m, RW_FIELD_NAMESPACE, &ns_name, &ns_name_len,
165 MSG_GET_DIRECT) != 0) {
166 cf_warning(AS_RW, "dup-res handler: no namespace");
167 send_dup_res_ack(node, m, AS_ERR_UNKNOWN, 0);
168 return;
169 }
170
171 as_namespace* ns = as_namespace_get_bybuf(ns_name, ns_name_len);
172
173 if (! ns) {
174 cf_warning(AS_RW, "dup-res handler: invalid namespace");
175 send_dup_res_ack(node, m, AS_ERR_UNKNOWN, 0);
176 return;
177 }
178
179 uint32_t generation = 0;
180 uint64_t last_update_time = 0;
181
182 bool local_conflict_check =
183 msg_get_uint32(m, RW_FIELD_GENERATION, &generation) == 0 &&
184 msg_get_uint64(m, RW_FIELD_LAST_UPDATE_TIME,
185 &last_update_time) == 0;
186
187 as_partition_reservation rsv;
188
189 as_partition_reserve(ns, as_partition_getid(keyd), &rsv);
190
191 as_index_ref r_ref;
192
193 if (as_record_get(rsv.tree, keyd, &r_ref) != 0) {
194 done_handle_request(&rsv, NULL, NULL);
195 send_dup_res_ack(node, m, AS_ERR_NOT_FOUND, 0);
196 return;
197 }
198
199 as_record* r = r_ref.r;
200
201 int result;
202
203 if ((result = as_partition_check_source(ns, rsv.p, node, NULL)) != AS_OK) {
204 done_handle_request(&rsv, &r_ref, NULL);
205 send_dup_res_ack(node, m, result, 0);
206 return;
207 }
208
209 if (local_conflict_check &&
210 (result = as_record_resolve_conflict(ns->conflict_resolution_policy,
211 generation, last_update_time, r->generation,
212 r->last_update_time)) <= 0) {
213 uint32_t info = dup_res_pack_repl_state_info(r, ns);
214
215 done_handle_request(&rsv, &r_ref, NULL);
216 send_dup_res_ack(node, m,
217 result == 0 ? AS_ERR_RECORD_EXISTS : AS_ERR_GENERATION, info);
218 return;
219 }
220
221 as_storage_rd rd;
222
223 as_storage_record_open(ns, r, &rd);
224
225 // TODO - old pickle - remove old method in "six months".
226 result = as_exchange_min_compatibility_id() >= 3 ?
227 fill_ack_w_pickle(&rd, m) : old_fill_ack_w_pickle(&rd, m);
228
229 if (result < 0) {
230 done_handle_request(&rsv, &r_ref, &rd);
231 send_dup_res_ack(node, m, (uint32_t)-result, 0);
232 return;
233 }
234
235 uint32_t info = dup_res_pack_info(r, ns);
236
237 done_handle_request(&rsv, &r_ref, &rd);
238 send_dup_res_ack_preserved(node, m, AS_OK, info);
239}
240
241
242void
243dup_res_handle_ack(cf_node node, msg* m)
244{
245 uint32_t ns_id;
246
247 if (msg_get_uint32(m, RW_FIELD_NS_ID, &ns_id) != 0) {
248 cf_warning(AS_RW, "dup-res ack: no ns-id");
249 as_fabric_msg_put(m);
250 return;
251 }
252
253 cf_digest* keyd;
254
255 if (msg_get_buf(m, RW_FIELD_DIGEST, (uint8_t**)&keyd, NULL,
256 MSG_GET_DIRECT) != 0) {
257 uint8_t* pickle;
258 size_t pickle_sz;
259
260 if (msg_get_buf(m, RW_FIELD_RECORD, &pickle, &pickle_sz,
261 MSG_GET_DIRECT) != 0 ||
262 pickle_sz < sizeof(as_flat_record)) {
263 cf_warning(AS_RW, "dup-res ack: no or bad digest");
264 as_fabric_msg_put(m);
265 return;
266 }
267
268 keyd = &((as_flat_record*)pickle)->keyd;
269 }
270
271 uint32_t tid;
272
273 if (msg_get_uint32(m, RW_FIELD_TID, &tid) != 0) {
274 cf_warning(AS_RW, "dup-res ack: no tid");
275 as_fabric_msg_put(m);
276 return;
277 }
278
279 rw_request_hkey hkey = { ns_id, *keyd };
280 rw_request* rw = rw_request_hash_get(&hkey);
281
282 if (! rw) {
283 // Extra ack, after rw_request is already gone.
284 as_fabric_msg_put(m);
285 return;
286 }
287
288 cf_mutex_lock(&rw->lock);
289
290 if (rw->tid != tid || rw->dup_res_complete) {
291 // Extra ack - rw_request is newer transaction for same digest, or ack
292 // is arriving after rw_request was aborted or finished dup-res.
293 cf_mutex_unlock(&rw->lock);
294 rw_request_release(rw);
295 as_fabric_msg_put(m);
296 return;
297 }
298
299 // Find remote node in duplicates list.
300 int i = index_of_node(rw->dest_nodes, rw->n_dest_nodes, node);
301
302 if (i == -1) {
303 cf_warning(AS_RW, "dup-res ack: from non-dest node %lx", node);
304 cf_mutex_unlock(&rw->lock);
305 rw_request_release(rw);
306 as_fabric_msg_put(m);
307 return;
308 }
309
310 if (rw->dest_complete[i]) {
311 // Extra ack for this duplicate.
312 cf_mutex_unlock(&rw->lock);
313 rw_request_release(rw);
314 as_fabric_msg_put(m);
315 return;
316 }
317
318 rw->dest_complete[i] = true;
319
320 uint32_t generation = 0;
321 uint64_t last_update_time = 0;
322 uint32_t result_code = parse_dup_meta(m, &generation, &last_update_time);
323
324 // If it makes sense, retry transaction from the beginning.
325 // TODO - is this retry too fast? Should there be a throttle? If so, how?
326 if (dup_res_should_retry_transaction(rw, result_code)) {
327 if (! rw->from.any) {
328 // Lost race against timeout in retransmit thread.
329 cf_mutex_unlock(&rw->lock);
330 rw_request_release(rw);
331 as_fabric_msg_put(m);
332 return;
333 }
334
335 as_transaction tr;
336 as_transaction_init_head_from_rw(&tr, rw);
337
338 // Note that tr now owns msgp - make sure rw destructor doesn't free it.
339 // Note also that rw will release rsv - tr will get a new one.
340 rw->msgp = NULL;
341
342 tr.from_flags |= FROM_FLAG_RESTART;
343 as_service_enqueue_internal(&tr);
344
345 rw->dup_res_complete = true;
346
347 cf_mutex_unlock(&rw->lock);
348 rw_request_hash_delete(&hkey, rw);
349 rw_request_release(rw);
350 as_fabric_msg_put(m);
351 return;
352 }
353
354 dup_res_handle_tie(rw, m, result_code);
355
356 // Compare this duplicate with previous best, if any.
357 bool keep_previous_best = rw->best_dup_msg &&
358 as_record_resolve_conflict(rw->rsv.ns->conflict_resolution_policy,
359 rw->best_dup_gen, rw->best_dup_lut,
360 (uint16_t)generation, last_update_time) <= 0;
361
362 if (keep_previous_best) {
363 // This duplicate is no better than previous best - keep previous best.
364 as_fabric_msg_put(m);
365 }
366 else {
367 // No previous best, or this duplicate is better - keep this one.
368 if (rw->best_dup_msg) {
369 as_fabric_msg_put(rw->best_dup_msg);
370 }
371
372 msg_preserve_all_fields(m);
373 rw->best_dup_msg = m;
374 rw->best_dup_result_code = (uint8_t)result_code;
375 rw->best_dup_gen = generation;
376 rw->best_dup_lut = last_update_time;
377 }
378
379 // Saved or discarded m - from here down don't call as_fabric_msg_put(m)!
380
381 for (uint32_t j = 0; j < rw->n_dest_nodes; j++) {
382 if (! rw->dest_complete[j]) {
383 // Still haven't heard from all duplicates.
384 cf_mutex_unlock(&rw->lock);
385 rw_request_release(rw);
386 return;
387 }
388 }
389
390 if (rw->best_dup_result_code == AS_OK) {
391 apply_winner(rw); // sets rw->result_code to pass along to callback
392 }
393 else {
394 apply_if_tie(rw);
395 }
396
397 // Check for lost race against timeout in retransmit thread *after* applying
398 // winner - may save a future transaction from re-fetching the duplicates.
399 // Note - nsup deletes don't get here, so check using rw->from.any is ok.
400 if (! rw->from.any) {
401 cf_mutex_unlock(&rw->lock);
402 rw_request_release(rw);
403 return;
404 }
405
406 dup_res_translate_result_code(rw);
407
408 bool delete_from_hash = rw->dup_res_cb(rw);
409
410 rw->dup_res_complete = true;
411
412 cf_mutex_unlock(&rw->lock);
413
414 if (delete_from_hash) {
415 rw_request_hash_delete(&hkey, rw);
416 }
417
418 rw_request_release(rw);
419}
420
421
422//==========================================================
423// Local helpers.
424//
425
426int
427fill_ack_w_pickle(as_storage_rd* rd, msg* m)
428{
429 if (! as_storage_record_get_pickle(rd)) {
430 return -AS_ERR_UNKNOWN;
431 }
432
433 msg_preserve_fields(m, 2, RW_FIELD_NS_ID, RW_FIELD_TID);
434
435 // Can't fail from here on - ok to add message fields.
436
437 msg_set_buf(m, RW_FIELD_RECORD, rd->pickle, rd->pickle_sz,
438 MSG_SET_HANDOFF_MALLOC);
439
440 return AS_OK;
441}
442
443
444// TODO - old pickle - remove in "six months".
445int
446old_fill_ack_w_pickle(as_storage_rd* rd, msg* m)
447{
448 as_namespace* ns = rd->ns;
449 as_record* r = rd->r;
450
451 int result = as_storage_rd_load_n_bins(rd);
452
453 if (result < 0) {
454 return result;
455 }
456
457 as_bin stack_bins[ns->storage_data_in_memory ? 0 : rd->n_bins];
458
459 if ((result = as_storage_rd_load_bins(rd, stack_bins)) < 0) {
460 return result;
461 }
462
463 if (! as_storage_record_get_key(rd)) {
464 return -AS_ERR_UNKNOWN;
465 }
466
467 msg_preserve_fields(m, 3, RW_FIELD_NS_ID, RW_FIELD_DIGEST, RW_FIELD_TID);
468
469 // Can't fail from here on - ok to add message fields.
470
471 msg_set_uint32(m, RW_FIELD_GENERATION, r->generation);
472 msg_set_uint64(m, RW_FIELD_LAST_UPDATE_TIME, r->last_update_time);
473
474 if (r->void_time != 0) {
475 msg_set_uint32(m, RW_FIELD_VOID_TIME, r->void_time);
476 }
477
478 const char* set_name = as_index_get_set_name(r, ns);
479
480 if (set_name) {
481 msg_set_buf(m, RW_FIELD_SET_NAME, (const uint8_t*)set_name,
482 strlen(set_name), MSG_SET_COPY);
483 }
484
485 if (rd->key) {
486 msg_set_buf(m, RW_FIELD_KEY, rd->key, rd->key_size, MSG_SET_COPY);
487 }
488
489 size_t buf_len;
490 uint8_t* buf = as_record_pickle(rd, &buf_len);
491
492 msg_set_buf(m, RW_FIELD_OLD_RECORD, buf, buf_len, MSG_SET_HANDOFF_MALLOC);
493
494 return AS_OK;
495}
496
497
498void
499done_handle_request(as_partition_reservation* rsv, as_index_ref* r_ref,
500 as_storage_rd* rd)
501{
502 if (rd) {
503 as_storage_record_close(rd);
504 }
505
506 if (r_ref) {
507 as_record_done(r_ref, rsv->ns);
508 }
509
510 if (rsv) {
511 as_partition_release(rsv);
512 }
513}
514
515
516void
517send_dup_res_ack(cf_node node, msg* m, uint32_t result, uint32_t info)
518{
519 msg_preserve_fields(m, 3, RW_FIELD_NS_ID, RW_FIELD_DIGEST, RW_FIELD_TID);
520
521 send_dup_res_ack_preserved(node, m, result, info);
522}
523
524
525void
526send_dup_res_ack_preserved(cf_node node, msg* m, uint32_t result, uint32_t info)
527{
528 msg_set_uint32(m, RW_FIELD_OP, RW_OP_DUP_ACK);
529 msg_set_uint32(m, RW_FIELD_RESULT, result);
530
531 if (info != 0) {
532 msg_set_uint32(m, RW_FIELD_INFO, info);
533 }
534
535 if (as_fabric_send(node, m, AS_FABRIC_CHANNEL_RW) != AS_FABRIC_SUCCESS) {
536 as_fabric_msg_put(m);
537 }
538}
539
540
541uint32_t
542parse_dup_meta(msg* m, uint32_t* p_generation, uint64_t* p_last_update_time)
543{
544 uint32_t result_code;
545
546 if (msg_get_uint32(m, RW_FIELD_RESULT, &result_code) != 0) {
547 cf_warning(AS_RW, "dup-res ack: no result_code");
548 return AS_ERR_UNKNOWN;
549 }
550
551 if (result_code != AS_OK) {
552 return result_code;
553 }
554
555 uint8_t* pickle;
556 size_t pickle_sz;
557
558 if (msg_get_buf(m, RW_FIELD_RECORD, &pickle, &pickle_sz,
559 MSG_GET_DIRECT) != 0) {
560 // TODO - old pickle - remove in "six months".
561 return old_parse_conflict_meta(m, p_generation, p_last_update_time);
562 }
563
564 *p_generation = ((as_flat_record*)pickle)->generation;
565
566 if (*p_generation == 0) {
567 cf_warning(AS_RW, "dup-res ack: generation 0");
568 return AS_ERR_UNKNOWN;
569 }
570
571 *p_last_update_time = ((as_flat_record*)pickle)->last_update_time;
572
573 return AS_OK;
574}
575
576
577// TODO - old pickle - remove in "six months".
578uint32_t
579old_parse_conflict_meta(msg* m, uint32_t* generation, uint64_t* lut)
580{
581 // TODO - old pickle - remove in "six months".
582 if (msg_get_uint32(m, RW_FIELD_GENERATION, generation) != 0 ||
583 *generation == 0) {
584 cf_warning(AS_RW, "dup-res ack: no or bad generation");
585 return AS_ERR_UNKNOWN;
586 }
587
588 if (msg_get_uint64(m, RW_FIELD_LAST_UPDATE_TIME, lut) != 0) {
589 cf_warning(AS_RW, "dup-res ack: no last-update-time");
590 return AS_ERR_UNKNOWN;
591 }
592
593 return AS_OK;
594}
595
596
597void
598apply_winner(rw_request* rw)
599{
600 msg* m = rw->best_dup_msg;
601
602 as_remote_record rr = {
603 // Skipping .src for now.
604 .rsv = &rw->rsv,
605 .keyd = &rw->keyd
606 };
607
608 uint32_t info = 0;
609
610 msg_get_uint32(m, RW_FIELD_INFO, &info);
611
612 if (msg_get_buf(m, RW_FIELD_RECORD, &rr.pickle, &rr.pickle_sz,
613 MSG_GET_DIRECT) != 0) {
614 // TODO - old pickle - remove in "six months".
615 if (! old_parse_winner(rw, info, &rr)) {
616 rw->result_code = AS_ERR_UNKNOWN;
617 return;
618 }
619 }
620 else if (! as_flat_unpack_remote_record_meta(rr.rsv->ns, &rr)) {
621 cf_warning_digest(AS_RW, &rw->keyd, "dup-res ack: bad record ");
622 rw->result_code = AS_ERR_UNKNOWN;
623 return;
624 }
625
626 dup_res_init_repl_state(&rr, info);
627
628 rw->result_code = (uint8_t)as_record_replace_if_better(&rr, false, false,
629 false);
630
631 // Duplicate resolution just treats these errors as successful no-ops:
632 if (rw->result_code == AS_ERR_RECORD_EXISTS ||
633 rw->result_code == AS_ERR_GENERATION) {
634 rw->result_code = AS_OK;
635 }
636}
637
638
639// TODO - old pickle - remove in "six months".
640bool
641old_parse_winner(rw_request* rw, uint32_t info, as_remote_record* rr)
642{
643 rr->generation = rw->best_dup_gen;
644 rr->last_update_time = rw->best_dup_lut;
645 rr->is_old_pickle = true;
646
647 msg* m = rw->best_dup_msg;
648
649 if (msg_get_buf(m, RW_FIELD_OLD_RECORD, &rr->pickle, &rr->pickle_sz,
650 MSG_GET_DIRECT) != 0 || rr->pickle_sz < 2) {
651 cf_warning_digest(AS_RW, &rw->keyd, "dup-res ack: no or bad record ");
652 return false;
653 }
654
655 if (dup_res_ignore_pickle(rr->pickle, info)) {
656 cf_warning_digest(AS_RW, &rw->keyd, "dup-res ack: binless pickle ");
657 return false;
658 }
659
660 msg_get_uint32(m, RW_FIELD_VOID_TIME, &rr->void_time);
661
662 msg_get_buf(m, RW_FIELD_SET_NAME, (uint8_t **)&rr->set_name,
663 &rr->set_name_len, MSG_GET_DIRECT);
664
665 msg_get_buf(m, RW_FIELD_KEY, (uint8_t **)&rr->key, &rr->key_size,
666 MSG_GET_DIRECT);
667
668 return true;
669}
670