1/*
2 * replica_write.c
3 *
4 * Copyright (C) 2016 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23//==========================================================
24// Includes.
25//
26
27#include "transaction/replica_write.h"
28
29#include <stdbool.h>
30#include <stddef.h>
31#include <stdint.h>
32#include <string.h>
33
34#include "citrusleaf/cf_clock.h"
35#include "citrusleaf/cf_digest.h"
36
37#include "cf_mutex.h"
38#include "fault.h"
39#include "msg.h"
40#include "node.h"
41
42#include "base/cfg.h"
43#include "base/datamodel.h"
44#include "base/health.h"
45#include "base/index.h"
46#include "base/proto.h"
47#include "base/secondary_index.h"
48#include "base/transaction.h"
49#include "base/xdr_serverside.h"
50#include "fabric/exchange.h" // TODO - old pickle - remove in "six months"
51#include "fabric/fabric.h"
52#include "fabric/partition.h"
53#include "transaction/delete.h"
54#include "transaction/rw_request.h"
55#include "transaction/rw_request_hash.h"
56#include "transaction/rw_utils.h"
57
58
59//==========================================================
60// Forward declarations.
61//
62
63void old_make_message(rw_request* rw, as_transaction* tr);
64uint32_t pack_info_bits(as_transaction* tr);
65void send_repl_write_ack(cf_node node, msg* m, uint32_t result);
66void send_repl_write_ack_w_digest(cf_node node, msg* m, uint32_t result,
67 const cf_digest* keyd);
68uint32_t parse_result_code(msg* m);
69void drop_replica(as_partition_reservation* rsv, cf_digest* keyd,
70 bool is_xdr_op, cf_node master);
71
72
73//==========================================================
74// Public API.
75//
76
77void
78repl_write_make_message(rw_request* rw, as_transaction* tr)
79{
80 if (rw->dest_msg) {
81 as_fabric_msg_put(rw->dest_msg);
82 }
83
84 rw->dest_msg = as_fabric_msg_get(M_TYPE_RW);
85
86 // TODO - old pickle - remove in "six months".
87 if (rw->is_old_pickle) {
88 old_make_message(rw, tr);
89 return;
90 }
91
92 as_namespace* ns = tr->rsv.ns;
93 msg* m = rw->dest_msg;
94
95 msg_set_uint32(m, RW_FIELD_OP, RW_OP_REPL_WRITE);
96 msg_set_buf(m, RW_FIELD_NAMESPACE, (uint8_t*)ns->name, strlen(ns->name),
97 MSG_SET_COPY);
98 msg_set_uint32(m, RW_FIELD_NS_ID, ns->id);
99 msg_set_uint32(m, RW_FIELD_TID, rw->tid);
100
101 if (rw->pickle != NULL) {
102 msg_set_buf(m, RW_FIELD_RECORD, rw->pickle, rw->pickle_sz,
103 MSG_SET_HANDOFF_MALLOC);
104 rw->pickle = NULL; // make sure destructor doesn't free this
105 }
106 else { // drop
107 msg_set_buf(m, RW_FIELD_DIGEST, (void*)&tr->keyd, sizeof(cf_digest),
108 MSG_SET_COPY);
109 }
110
111 uint32_t info = pack_info_bits(tr);
112
113 if (info != 0) {
114 msg_set_uint32(m, RW_FIELD_INFO, info);
115 }
116}
117
118
119void
120repl_write_setup_rw(rw_request* rw, as_transaction* tr,
121 repl_write_done_cb repl_write_cb, timeout_done_cb timeout_cb)
122{
123 rw->msgp = tr->msgp;
124 tr->msgp = NULL;
125
126 rw->msg_fields = tr->msg_fields;
127 rw->origin = tr->origin;
128 rw->from_flags = tr->from_flags;
129
130 rw->from.any = tr->from.any;
131 rw->from_data.any = tr->from_data.any;
132 tr->from.any = NULL;
133
134 rw->start_time = tr->start_time;
135 rw->benchmark_time = tr->benchmark_time;
136
137 as_partition_reservation_copy(&rw->rsv, &tr->rsv);
138 // Hereafter, rw_request must release reservation - happens in destructor.
139
140 rw->end_time = tr->end_time;
141 rw->flags = tr->flags;
142 rw->generation = tr->generation;
143 rw->void_time = tr->void_time;
144 rw->last_update_time = tr->last_update_time;
145
146 rw->repl_write_cb = repl_write_cb;
147 rw->timeout_cb = timeout_cb;
148
149 rw->xmit_ms = cf_getms() + g_config.transaction_retry_ms;
150 rw->retry_interval_ms = g_config.transaction_retry_ms;
151
152 for (uint32_t i = 0; i < rw->n_dest_nodes; i++) {
153 rw->dest_complete[i] = false;
154 }
155
156 // Allow retransmit thread to destroy rw_request as soon as we unlock.
157 rw->is_set_up = true;
158
159 if (as_health_sample_replica_write()) {
160 rw->repl_start_us = cf_getus();
161 }
162}
163
164
165void
166repl_write_reset_rw(rw_request* rw, as_transaction* tr, repl_write_done_cb cb)
167{
168 // Reset rw->from.any which was set null in tr setup.
169 rw->from.any = tr->from.any;
170
171 // Needed for response to origin.
172 rw->flags = tr->flags;
173 rw->generation = tr->generation;
174 rw->void_time = tr->void_time;
175 rw->last_update_time = tr->last_update_time;
176
177 rw->repl_write_cb = cb;
178
179 // TODO - is this better than not resetting? Note - xmit_ms not volatile.
180 rw->xmit_ms = cf_getms() + g_config.transaction_retry_ms;
181 rw->retry_interval_ms = g_config.transaction_retry_ms;
182
183 for (uint32_t i = 0; i < rw->n_dest_nodes; i++) {
184 rw->dest_complete[i] = false;
185 }
186
187 if (as_health_sample_replica_write()) {
188 rw->repl_start_us = cf_getus();
189 }
190}
191
192
193void
194repl_write_handle_op(cf_node node, msg* m)
195{
196 uint8_t* ns_name;
197 size_t ns_name_len;
198
199 if (msg_get_buf(m, RW_FIELD_NAMESPACE, &ns_name, &ns_name_len,
200 MSG_GET_DIRECT) != 0) {
201 cf_warning(AS_RW, "repl_write_handle_op: no namespace");
202 send_repl_write_ack(node, m, AS_ERR_UNKNOWN);
203 return;
204 }
205
206 as_namespace* ns = as_namespace_get_bybuf(ns_name, ns_name_len);
207
208 if (! ns) {
209 cf_warning(AS_RW, "repl_write_handle_op: invalid namespace");
210 send_repl_write_ack(node, m, AS_ERR_UNKNOWN);
211 return;
212 }
213
214 uint32_t info = 0;
215
216 msg_get_uint32(m, RW_FIELD_INFO, &info);
217
218 cf_digest* keyd;
219 size_t keyd_size;
220
221 // Handle drops.
222 if (msg_get_buf(m, RW_FIELD_DIGEST, (uint8_t**)&keyd, &keyd_size,
223 MSG_GET_DIRECT) == 0) {
224 if (keyd_size != CF_DIGEST_KEY_SZ) {
225 cf_warning(AS_RW, "repl_write_handle_op: invalid digest");
226 send_repl_write_ack(node, m, AS_ERR_UNKNOWN);
227 return;
228 }
229
230 as_partition_reservation rsv;
231 uint32_t result = as_partition_reserve_replica(ns,
232 as_partition_getid(keyd), &rsv);
233
234 if (result == AS_OK) {
235 drop_replica(&rsv, keyd, (info & RW_INFO_XDR) != 0, node);
236 as_partition_release(&rsv);
237 }
238
239 send_repl_write_ack(node, m, result);
240
241 return;
242 }
243 // else - flat record, including tombstone.
244
245 as_remote_record rr = { .src = node };
246
247 if (msg_get_buf(m, RW_FIELD_RECORD, &rr.pickle, &rr.pickle_sz,
248 MSG_GET_DIRECT) != 0) {
249 cf_warning(AS_RW, "repl_write_handle_op: no record");
250 send_repl_write_ack(node, m, AS_ERR_UNKNOWN);
251 return;
252 }
253
254 if (! as_flat_unpack_remote_record_meta(ns, &rr)) {
255 cf_warning(AS_RW, "repl_write_handle_op: bad record");
256 send_repl_write_ack(node, m, AS_ERR_UNKNOWN);
257 return;
258 }
259
260 as_partition_reservation rsv;
261 uint32_t result = as_partition_reserve_replica(ns,
262 as_partition_getid(rr.keyd), &rsv);
263
264 if (result != AS_OK) {
265 send_repl_write_ack_w_digest(node, m, result, rr.keyd);
266 return;
267 }
268
269 rr.rsv = &rsv;
270
271 // Do XDR write if the write is a non-XDR write or forwarding is enabled.
272 bool do_xdr_write = (info & RW_INFO_XDR) == 0 ||
273 is_xdr_forwarding_enabled() || ns->ns_forward_xdr_writes;
274
275 // If source didn't touch sindex, may not need to touch it locally.
276 bool skip_sindex = (info & RW_INFO_SINDEX_TOUCHED) == 0;
277
278 result = (uint32_t)as_record_replace_if_better(&rr, true, skip_sindex,
279 do_xdr_write);
280
281 as_partition_release(&rsv);
282 send_repl_write_ack_w_digest(node, m, result, rr.keyd);
283}
284
285
286// TODO - old pickle - remove in "six months".
287void
288repl_write_handle_old_op(cf_node node, msg* m)
289{
290 uint8_t* ns_name;
291 size_t ns_name_len;
292
293 if (msg_get_buf(m, RW_FIELD_NAMESPACE, &ns_name, &ns_name_len,
294 MSG_GET_DIRECT) != 0) {
295 cf_warning(AS_RW, "repl_write_handle_op: no namespace");
296 send_repl_write_ack(node, m, AS_ERR_UNKNOWN);
297 return;
298 }
299
300 as_namespace* ns = as_namespace_get_bybuf(ns_name, ns_name_len);
301
302 if (! ns) {
303 cf_warning(AS_RW, "repl_write_handle_op: invalid namespace");
304 send_repl_write_ack(node, m, AS_ERR_UNKNOWN);
305 return;
306 }
307
308 cf_digest* keyd;
309
310 if (msg_get_buf(m, RW_FIELD_DIGEST, (uint8_t**)&keyd, NULL,
311 MSG_GET_DIRECT) != 0) {
312 cf_warning(AS_RW, "repl_write_handle_op: no digest");
313 send_repl_write_ack(node, m, AS_ERR_UNKNOWN);
314 return;
315 }
316
317 as_partition_reservation rsv;
318 uint32_t result = as_partition_reserve_replica(ns, as_partition_getid(keyd),
319 &rsv);
320
321 if (result != AS_OK) {
322 send_repl_write_ack(node, m, result);
323 return;
324 }
325
326 as_remote_record rr = {
327 .src = node,
328 .rsv = &rsv,
329 .keyd = keyd,
330 .is_old_pickle = true
331 };
332
333 if (msg_get_buf(m, RW_FIELD_OLD_RECORD, &rr.pickle, &rr.pickle_sz,
334 MSG_GET_DIRECT) != 0 || rr.pickle_sz < 2) {
335 cf_warning(AS_RW, "repl_write_handle_op: no or bad record");
336 as_partition_release(&rsv);
337 send_repl_write_ack(node, m, AS_ERR_UNKNOWN);
338 return;
339 }
340
341 uint32_t info = 0;
342
343 msg_get_uint32(m, RW_FIELD_INFO, &info);
344
345 if (repl_write_pickle_is_drop(rr.pickle, info)) {
346 drop_replica(&rsv, keyd, (info & RW_INFO_XDR) != 0, node);
347
348 as_partition_release(&rsv);
349 send_repl_write_ack(node, m, AS_OK);
350
351 return;
352 }
353
354 if (msg_get_uint32(m, RW_FIELD_GENERATION, &rr.generation) != 0 ||
355 rr.generation == 0) {
356 cf_warning(AS_RW, "repl_write_handle_op: no or bad generation");
357 as_partition_release(&rsv);
358 send_repl_write_ack(node, m, AS_ERR_UNKNOWN);
359 return;
360 }
361
362 if (msg_get_uint64(m, RW_FIELD_LAST_UPDATE_TIME,
363 &rr.last_update_time) != 0) {
364 cf_warning(AS_RW, "repl_write_handle_op: no last-update-time");
365 as_partition_release(&rsv);
366 send_repl_write_ack(node, m, AS_ERR_UNKNOWN);
367 return;
368 }
369
370 msg_get_uint32(m, RW_FIELD_VOID_TIME, &rr.void_time);
371
372 msg_get_buf(m, RW_FIELD_SET_NAME, (uint8_t **)&rr.set_name,
373 &rr.set_name_len, MSG_GET_DIRECT);
374
375 msg_get_buf(m, RW_FIELD_KEY, (uint8_t **)&rr.key, &rr.key_size,
376 MSG_GET_DIRECT);
377
378 // Do XDR write if the write is a non-XDR write or forwarding is enabled.
379 bool do_xdr_write = (info & RW_INFO_XDR) == 0 ||
380 is_xdr_forwarding_enabled() || ns->ns_forward_xdr_writes;
381
382 // If source didn't touch sindex, may not need to touch it locally.
383 bool skip_sindex = (info & RW_INFO_SINDEX_TOUCHED) == 0;
384
385 result = (uint32_t)as_record_replace_if_better(&rr, true, skip_sindex,
386 do_xdr_write);
387
388 as_partition_release(&rsv);
389 send_repl_write_ack(node, m, result);
390}
391
392
393void
394repl_write_handle_ack(cf_node node, msg* m)
395{
396 uint32_t ns_id;
397
398 if (msg_get_uint32(m, RW_FIELD_NS_ID, &ns_id) != 0) {
399 cf_warning(AS_RW, "repl-write ack: no ns-id");
400 as_fabric_msg_put(m);
401 return;
402 }
403
404 cf_digest* keyd;
405
406 if (msg_get_buf(m, RW_FIELD_DIGEST, (uint8_t**)&keyd, NULL,
407 MSG_GET_DIRECT) != 0) {
408 cf_warning(AS_RW, "repl-write ack: no digest");
409 as_fabric_msg_put(m);
410 return;
411 }
412
413 uint32_t tid;
414
415 if (msg_get_uint32(m, RW_FIELD_TID, &tid) != 0) {
416 cf_warning(AS_RW, "repl-write ack: no tid");
417 as_fabric_msg_put(m);
418 return;
419 }
420
421 rw_request_hkey hkey = { ns_id, *keyd };
422 rw_request* rw = rw_request_hash_get(&hkey);
423
424 if (! rw) {
425 // Extra ack, after rw_request is already gone.
426 as_fabric_msg_put(m);
427 return;
428 }
429
430 cf_mutex_lock(&rw->lock);
431
432 if (rw->tid != tid || rw->repl_write_complete) {
433 // Extra ack - rw_request is newer transaction for same digest, or ack
434 // is arriving after rw_request was aborted.
435 cf_mutex_unlock(&rw->lock);
436 rw_request_release(rw);
437 as_fabric_msg_put(m);
438 return;
439 }
440
441 if (! rw->from.any) {
442 // Lost race against timeout in retransmit thread.
443 cf_mutex_unlock(&rw->lock);
444 rw_request_release(rw);
445 as_fabric_msg_put(m);
446 return;
447 }
448
449 // Find remote node in replicas list.
450 int i = index_of_node(rw->dest_nodes, rw->n_dest_nodes, node);
451
452 if (i == -1) {
453 cf_warning(AS_RW, "repl-write ack: from non-dest node %lx", node);
454 cf_mutex_unlock(&rw->lock);
455 rw_request_release(rw);
456 as_fabric_msg_put(m);
457 return;
458 }
459
460 if (rw->dest_complete[i]) {
461 // Extra ack for this replica write.
462 cf_mutex_unlock(&rw->lock);
463 rw_request_release(rw);
464 as_fabric_msg_put(m);
465 return;
466 }
467
468 uint32_t result_code = parse_result_code(m);
469
470 // If it makes sense, retransmit replicas. Note - rw->dest_complete[i] not
471 // yet set true, so that retransmit will go to this remote node.
472 if (repl_write_should_retransmit_replicas(rw, result_code)) {
473 cf_mutex_unlock(&rw->lock);
474 rw_request_release(rw);
475 as_fabric_msg_put(m);
476 return;
477 }
478
479 rw->dest_complete[i] = true;
480
481 as_health_add_ns_latency(node, ns_id, AS_HEALTH_NS_REPL_LAT,
482 rw->repl_start_us);
483
484 for (uint32_t j = 0; j < rw->n_dest_nodes; j++) {
485 if (! rw->dest_complete[j]) {
486 // Still haven't heard from all replicas.
487 cf_mutex_unlock(&rw->lock);
488 rw_request_release(rw);
489 as_fabric_msg_put(m);
490 return;
491 }
492 }
493
494 // Success for all replicas.
495 rw->repl_write_cb(rw);
496 repl_write_send_confirmation(rw);
497
498 rw->repl_write_complete = true;
499
500 cf_mutex_unlock(&rw->lock);
501 rw_request_hash_delete(&hkey, rw);
502 rw_request_release(rw);
503 as_fabric_msg_put(m);
504}
505
506
507//==========================================================
508// Local helpers.
509//
510
511// TODO - old pickle - remove in "six months".
512void
513old_make_message(rw_request* rw, as_transaction* tr)
514{
515 // TODO - remove this when we're comfortable:
516 cf_assert(rw->pickle, AS_RW, "making repl-write msg with null pickle");
517
518 as_namespace* ns = tr->rsv.ns;
519 msg* m = rw->dest_msg;
520
521 msg_set_uint32(m, RW_FIELD_OP, RW_OP_WRITE);
522 msg_set_buf(m, RW_FIELD_NAMESPACE, (uint8_t*)ns->name, strlen(ns->name),
523 MSG_SET_COPY);
524 msg_set_uint32(m, RW_FIELD_NS_ID, ns->id);
525 msg_set_buf(m, RW_FIELD_DIGEST, (void*)&tr->keyd, sizeof(cf_digest),
526 MSG_SET_COPY);
527 msg_set_uint32(m, RW_FIELD_TID, rw->tid);
528 msg_set_uint32(m, RW_FIELD_GENERATION, tr->generation);
529 msg_set_uint64(m, RW_FIELD_LAST_UPDATE_TIME, tr->last_update_time);
530
531 if (tr->void_time != 0) {
532 msg_set_uint32(m, RW_FIELD_VOID_TIME, tr->void_time);
533 }
534
535 uint32_t info = pack_info_bits(tr);
536
537 repl_write_flag_pickle(tr, rw->pickle, &info);
538
539 msg_set_buf(m, RW_FIELD_OLD_RECORD, rw->pickle, rw->pickle_sz,
540 MSG_SET_HANDOFF_MALLOC);
541 rw->pickle = NULL; // make sure destructor doesn't free this
542
543 if (rw->set_name) {
544 msg_set_buf(m, RW_FIELD_SET_NAME, (const uint8_t*)rw->set_name,
545 rw->set_name_len, MSG_SET_COPY);
546 // rw->set_name points directly into vmap - never free it.
547 }
548
549 if (rw->key) {
550 msg_set_buf(m, RW_FIELD_KEY, rw->key, rw->key_size,
551 MSG_SET_HANDOFF_MALLOC);
552 rw->key = NULL; // make sure destructor doesn't free this
553 }
554
555 if (info != 0) {
556 msg_set_uint32(m, RW_FIELD_INFO, info);
557 }
558}
559
560
561uint32_t
562pack_info_bits(as_transaction* tr)
563{
564 uint32_t info = 0;
565
566 if (as_transaction_is_xdr(tr)) {
567 info |= RW_INFO_XDR;
568 }
569
570 if ((tr->flags & AS_TRANSACTION_FLAG_SINDEX_TOUCHED) != 0) {
571 info |= RW_INFO_SINDEX_TOUCHED;
572 }
573
574 if (respond_on_master_complete(tr)) {
575 info |= RW_INFO_NO_REPL_ACK;
576 }
577
578 return info;
579}
580
581
582void
583send_repl_write_ack(cf_node node, msg* m, uint32_t result)
584{
585 uint32_t info = 0;
586
587 msg_get_uint32(m, RW_FIELD_INFO, &info);
588
589 if ((info & RW_INFO_NO_REPL_ACK) != 0) {
590 as_fabric_msg_put(m);
591 return;
592 }
593
594 msg_preserve_fields(m, 3, RW_FIELD_NS_ID, RW_FIELD_DIGEST, RW_FIELD_TID);
595
596 msg_set_uint32(m, RW_FIELD_OP, RW_OP_WRITE_ACK);
597 msg_set_uint32(m, RW_FIELD_RESULT, result);
598
599 if (as_fabric_send(node, m, AS_FABRIC_CHANNEL_RW) != AS_FABRIC_SUCCESS) {
600 as_fabric_msg_put(m);
601 }
602}
603
604
605void
606send_repl_write_ack_w_digest(cf_node node, msg* m, uint32_t result,
607 const cf_digest* keyd)
608{
609 uint32_t info = 0;
610
611 msg_get_uint32(m, RW_FIELD_INFO, &info);
612
613 if ((info & RW_INFO_NO_REPL_ACK) != 0) {
614 as_fabric_msg_put(m);
615 return;
616 }
617
618 msg_preserve_fields(m, 2, RW_FIELD_NS_ID, RW_FIELD_TID);
619
620 msg_set_uint32(m, RW_FIELD_OP, RW_OP_WRITE_ACK);
621 msg_set_uint32(m, RW_FIELD_RESULT, result);
622 msg_set_buf(m, RW_FIELD_DIGEST, (const uint8_t*)keyd, sizeof(cf_digest),
623 MSG_SET_COPY);
624
625 if (as_fabric_send(node, m, AS_FABRIC_CHANNEL_RW) != AS_FABRIC_SUCCESS) {
626 as_fabric_msg_put(m);
627 }
628}
629
630
631uint32_t
632parse_result_code(msg* m)
633{
634 uint32_t result_code;
635
636 if (msg_get_uint32(m, RW_FIELD_RESULT, &result_code) != 0) {
637 cf_warning(AS_RW, "repl-write ack: no result_code");
638 return AS_ERR_UNKNOWN;
639 }
640
641 return result_code;
642}
643
644
645void
646drop_replica(as_partition_reservation* rsv, cf_digest* keyd, bool is_xdr_op,
647 cf_node master)
648{
649 // Shortcut pointers & flags.
650 as_namespace* ns = rsv->ns;
651 as_index_tree* tree = rsv->tree;
652
653 as_index_ref r_ref;
654
655 if (as_record_get(tree, keyd, &r_ref) != 0) {
656 return; // not found is ok from master's perspective.
657 }
658
659 as_record* r = r_ref.r;
660
661 if (ns->storage_data_in_memory) {
662 record_delete_adjust_sindex(r, ns);
663 }
664
665 // Save the set-ID for XDR.
666 uint16_t set_id = as_index_get_set_id(r);
667
668 as_index_delete(tree, keyd);
669 as_record_done(&r_ref, ns);
670
671 if (xdr_must_ship_delete(ns, is_xdr_op)) {
672 xdr_write(ns, keyd, 0, master, XDR_OP_TYPE_DROP, set_id, NULL);
673 }
674}
675