1 | /* |
2 | * replica_write.c |
3 | * |
4 | * Copyright (C) 2016 Aerospike, Inc. |
5 | * |
6 | * Portions may be licensed to Aerospike, Inc. under one or more contributor |
7 | * license agreements. |
8 | * |
9 | * This program is free software: you can redistribute it and/or modify it under |
10 | * the terms of the GNU Affero General Public License as published by the Free |
11 | * Software Foundation, either version 3 of the License, or (at your option) any |
12 | * later version. |
13 | * |
14 | * This program is distributed in the hope that it will be useful, but WITHOUT |
15 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
16 | * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more |
17 | * details. |
18 | * |
19 | * You should have received a copy of the GNU Affero General Public License |
20 | * along with this program. If not, see http://www.gnu.org/licenses/ |
21 | */ |
22 | |
23 | //========================================================== |
24 | // Includes. |
25 | // |
26 | |
27 | #include "transaction/replica_write.h" |
28 | |
29 | #include <stdbool.h> |
30 | #include <stddef.h> |
31 | #include <stdint.h> |
32 | #include <string.h> |
33 | |
34 | #include "citrusleaf/cf_clock.h" |
35 | #include "citrusleaf/cf_digest.h" |
36 | |
37 | #include "cf_mutex.h" |
38 | #include "fault.h" |
39 | #include "msg.h" |
40 | #include "node.h" |
41 | |
42 | #include "base/cfg.h" |
43 | #include "base/datamodel.h" |
44 | #include "base/health.h" |
45 | #include "base/index.h" |
46 | #include "base/proto.h" |
47 | #include "base/secondary_index.h" |
48 | #include "base/transaction.h" |
49 | #include "base/xdr_serverside.h" |
50 | #include "fabric/exchange.h" // TODO - old pickle - remove in "six months" |
51 | #include "fabric/fabric.h" |
52 | #include "fabric/partition.h" |
53 | #include "transaction/delete.h" |
54 | #include "transaction/rw_request.h" |
55 | #include "transaction/rw_request_hash.h" |
56 | #include "transaction/rw_utils.h" |
57 | |
58 | |
59 | //========================================================== |
60 | // Forward declarations. |
61 | // |
62 | |
// Fills rw->dest_msg in the legacy (old pickle, RW_OP_WRITE) format.
// TODO - old pickle - remove in "six months".
void old_make_message(rw_request* rw, as_transaction* tr);

// Returns the RW_INFO_* bits (XDR, sindex-touched, no-repl-ack) that describe
// transaction tr, for the RW_FIELD_INFO field of a replica write.
uint32_t pack_info_bits(as_transaction* tr);

// Reuses incoming message m as an ack carrying result, echoing its ns-id,
// digest and tid, and sends it to node. If the sender set RW_INFO_NO_REPL_ACK,
// m is just released instead.
void send_repl_write_ack(cf_node node, msg* m, uint32_t result);

// As send_repl_write_ack(), but the ack's digest field is set from keyd
// rather than echoed from m.
void send_repl_write_ack_w_digest(cf_node node, msg* m, uint32_t result,
		const cf_digest* keyd);

// Returns an ack's RW_FIELD_RESULT, or AS_ERR_UNKNOWN if the field is absent.
uint32_t parse_result_code(msg* m);

// Removes keyd's record (if present) from the reserved replica partition, and
// ships the delete to XDR when xdr_must_ship_delete() says so.
void drop_replica(as_partition_reservation* rsv, cf_digest* keyd,
		bool is_xdr_op, cf_node master);
71 | |
72 | |
73 | //========================================================== |
74 | // Public API. |
75 | // |
76 | |
77 | void |
78 | repl_write_make_message(rw_request* rw, as_transaction* tr) |
79 | { |
80 | if (rw->dest_msg) { |
81 | as_fabric_msg_put(rw->dest_msg); |
82 | } |
83 | |
84 | rw->dest_msg = as_fabric_msg_get(M_TYPE_RW); |
85 | |
86 | // TODO - old pickle - remove in "six months". |
87 | if (rw->is_old_pickle) { |
88 | old_make_message(rw, tr); |
89 | return; |
90 | } |
91 | |
92 | as_namespace* ns = tr->rsv.ns; |
93 | msg* m = rw->dest_msg; |
94 | |
95 | msg_set_uint32(m, RW_FIELD_OP, RW_OP_REPL_WRITE); |
96 | msg_set_buf(m, RW_FIELD_NAMESPACE, (uint8_t*)ns->name, strlen(ns->name), |
97 | MSG_SET_COPY); |
98 | msg_set_uint32(m, RW_FIELD_NS_ID, ns->id); |
99 | msg_set_uint32(m, RW_FIELD_TID, rw->tid); |
100 | |
101 | if (rw->pickle != NULL) { |
102 | msg_set_buf(m, RW_FIELD_RECORD, rw->pickle, rw->pickle_sz, |
103 | MSG_SET_HANDOFF_MALLOC); |
104 | rw->pickle = NULL; // make sure destructor doesn't free this |
105 | } |
106 | else { // drop |
107 | msg_set_buf(m, RW_FIELD_DIGEST, (void*)&tr->keyd, sizeof(cf_digest), |
108 | MSG_SET_COPY); |
109 | } |
110 | |
111 | uint32_t info = pack_info_bits(tr); |
112 | |
113 | if (info != 0) { |
114 | msg_set_uint32(m, RW_FIELD_INFO, info); |
115 | } |
116 | } |
117 | |
118 | |
119 | void |
120 | repl_write_setup_rw(rw_request* rw, as_transaction* tr, |
121 | repl_write_done_cb repl_write_cb, timeout_done_cb timeout_cb) |
122 | { |
123 | rw->msgp = tr->msgp; |
124 | tr->msgp = NULL; |
125 | |
126 | rw->msg_fields = tr->msg_fields; |
127 | rw->origin = tr->origin; |
128 | rw->from_flags = tr->from_flags; |
129 | |
130 | rw->from.any = tr->from.any; |
131 | rw->from_data.any = tr->from_data.any; |
132 | tr->from.any = NULL; |
133 | |
134 | rw->start_time = tr->start_time; |
135 | rw->benchmark_time = tr->benchmark_time; |
136 | |
137 | as_partition_reservation_copy(&rw->rsv, &tr->rsv); |
138 | // Hereafter, rw_request must release reservation - happens in destructor. |
139 | |
140 | rw->end_time = tr->end_time; |
141 | rw->flags = tr->flags; |
142 | rw->generation = tr->generation; |
143 | rw->void_time = tr->void_time; |
144 | rw->last_update_time = tr->last_update_time; |
145 | |
146 | rw->repl_write_cb = repl_write_cb; |
147 | rw->timeout_cb = timeout_cb; |
148 | |
149 | rw->xmit_ms = cf_getms() + g_config.transaction_retry_ms; |
150 | rw->retry_interval_ms = g_config.transaction_retry_ms; |
151 | |
152 | for (uint32_t i = 0; i < rw->n_dest_nodes; i++) { |
153 | rw->dest_complete[i] = false; |
154 | } |
155 | |
156 | // Allow retransmit thread to destroy rw_request as soon as we unlock. |
157 | rw->is_set_up = true; |
158 | |
159 | if (as_health_sample_replica_write()) { |
160 | rw->repl_start_us = cf_getus(); |
161 | } |
162 | } |
163 | |
164 | |
165 | void |
166 | repl_write_reset_rw(rw_request* rw, as_transaction* tr, repl_write_done_cb cb) |
167 | { |
168 | // Reset rw->from.any which was set null in tr setup. |
169 | rw->from.any = tr->from.any; |
170 | |
171 | // Needed for response to origin. |
172 | rw->flags = tr->flags; |
173 | rw->generation = tr->generation; |
174 | rw->void_time = tr->void_time; |
175 | rw->last_update_time = tr->last_update_time; |
176 | |
177 | rw->repl_write_cb = cb; |
178 | |
179 | // TODO - is this better than not resetting? Note - xmit_ms not volatile. |
180 | rw->xmit_ms = cf_getms() + g_config.transaction_retry_ms; |
181 | rw->retry_interval_ms = g_config.transaction_retry_ms; |
182 | |
183 | for (uint32_t i = 0; i < rw->n_dest_nodes; i++) { |
184 | rw->dest_complete[i] = false; |
185 | } |
186 | |
187 | if (as_health_sample_replica_write()) { |
188 | rw->repl_start_us = cf_getus(); |
189 | } |
190 | } |
191 | |
192 | |
193 | void |
194 | repl_write_handle_op(cf_node node, msg* m) |
195 | { |
196 | uint8_t* ns_name; |
197 | size_t ns_name_len; |
198 | |
199 | if (msg_get_buf(m, RW_FIELD_NAMESPACE, &ns_name, &ns_name_len, |
200 | MSG_GET_DIRECT) != 0) { |
201 | cf_warning(AS_RW, "repl_write_handle_op: no namespace" ); |
202 | send_repl_write_ack(node, m, AS_ERR_UNKNOWN); |
203 | return; |
204 | } |
205 | |
206 | as_namespace* ns = as_namespace_get_bybuf(ns_name, ns_name_len); |
207 | |
208 | if (! ns) { |
209 | cf_warning(AS_RW, "repl_write_handle_op: invalid namespace" ); |
210 | send_repl_write_ack(node, m, AS_ERR_UNKNOWN); |
211 | return; |
212 | } |
213 | |
214 | uint32_t info = 0; |
215 | |
216 | msg_get_uint32(m, RW_FIELD_INFO, &info); |
217 | |
218 | cf_digest* keyd; |
219 | size_t keyd_size; |
220 | |
221 | // Handle drops. |
222 | if (msg_get_buf(m, RW_FIELD_DIGEST, (uint8_t**)&keyd, &keyd_size, |
223 | MSG_GET_DIRECT) == 0) { |
224 | if (keyd_size != CF_DIGEST_KEY_SZ) { |
225 | cf_warning(AS_RW, "repl_write_handle_op: invalid digest" ); |
226 | send_repl_write_ack(node, m, AS_ERR_UNKNOWN); |
227 | return; |
228 | } |
229 | |
230 | as_partition_reservation rsv; |
231 | uint32_t result = as_partition_reserve_replica(ns, |
232 | as_partition_getid(keyd), &rsv); |
233 | |
234 | if (result == AS_OK) { |
235 | drop_replica(&rsv, keyd, (info & RW_INFO_XDR) != 0, node); |
236 | as_partition_release(&rsv); |
237 | } |
238 | |
239 | send_repl_write_ack(node, m, result); |
240 | |
241 | return; |
242 | } |
243 | // else - flat record, including tombstone. |
244 | |
245 | as_remote_record rr = { .src = node }; |
246 | |
247 | if (msg_get_buf(m, RW_FIELD_RECORD, &rr.pickle, &rr.pickle_sz, |
248 | MSG_GET_DIRECT) != 0) { |
249 | cf_warning(AS_RW, "repl_write_handle_op: no record" ); |
250 | send_repl_write_ack(node, m, AS_ERR_UNKNOWN); |
251 | return; |
252 | } |
253 | |
254 | if (! as_flat_unpack_remote_record_meta(ns, &rr)) { |
255 | cf_warning(AS_RW, "repl_write_handle_op: bad record" ); |
256 | send_repl_write_ack(node, m, AS_ERR_UNKNOWN); |
257 | return; |
258 | } |
259 | |
260 | as_partition_reservation rsv; |
261 | uint32_t result = as_partition_reserve_replica(ns, |
262 | as_partition_getid(rr.keyd), &rsv); |
263 | |
264 | if (result != AS_OK) { |
265 | send_repl_write_ack_w_digest(node, m, result, rr.keyd); |
266 | return; |
267 | } |
268 | |
269 | rr.rsv = &rsv; |
270 | |
271 | // Do XDR write if the write is a non-XDR write or forwarding is enabled. |
272 | bool do_xdr_write = (info & RW_INFO_XDR) == 0 || |
273 | is_xdr_forwarding_enabled() || ns->ns_forward_xdr_writes; |
274 | |
275 | // If source didn't touch sindex, may not need to touch it locally. |
276 | bool skip_sindex = (info & RW_INFO_SINDEX_TOUCHED) == 0; |
277 | |
278 | result = (uint32_t)as_record_replace_if_better(&rr, true, skip_sindex, |
279 | do_xdr_write); |
280 | |
281 | as_partition_release(&rsv); |
282 | send_repl_write_ack_w_digest(node, m, result, rr.keyd); |
283 | } |
284 | |
285 | |
286 | // TODO - old pickle - remove in "six months". |
287 | void |
288 | repl_write_handle_old_op(cf_node node, msg* m) |
289 | { |
290 | uint8_t* ns_name; |
291 | size_t ns_name_len; |
292 | |
293 | if (msg_get_buf(m, RW_FIELD_NAMESPACE, &ns_name, &ns_name_len, |
294 | MSG_GET_DIRECT) != 0) { |
295 | cf_warning(AS_RW, "repl_write_handle_op: no namespace" ); |
296 | send_repl_write_ack(node, m, AS_ERR_UNKNOWN); |
297 | return; |
298 | } |
299 | |
300 | as_namespace* ns = as_namespace_get_bybuf(ns_name, ns_name_len); |
301 | |
302 | if (! ns) { |
303 | cf_warning(AS_RW, "repl_write_handle_op: invalid namespace" ); |
304 | send_repl_write_ack(node, m, AS_ERR_UNKNOWN); |
305 | return; |
306 | } |
307 | |
308 | cf_digest* keyd; |
309 | |
310 | if (msg_get_buf(m, RW_FIELD_DIGEST, (uint8_t**)&keyd, NULL, |
311 | MSG_GET_DIRECT) != 0) { |
312 | cf_warning(AS_RW, "repl_write_handle_op: no digest" ); |
313 | send_repl_write_ack(node, m, AS_ERR_UNKNOWN); |
314 | return; |
315 | } |
316 | |
317 | as_partition_reservation rsv; |
318 | uint32_t result = as_partition_reserve_replica(ns, as_partition_getid(keyd), |
319 | &rsv); |
320 | |
321 | if (result != AS_OK) { |
322 | send_repl_write_ack(node, m, result); |
323 | return; |
324 | } |
325 | |
326 | as_remote_record rr = { |
327 | .src = node, |
328 | .rsv = &rsv, |
329 | .keyd = keyd, |
330 | .is_old_pickle = true |
331 | }; |
332 | |
333 | if (msg_get_buf(m, RW_FIELD_OLD_RECORD, &rr.pickle, &rr.pickle_sz, |
334 | MSG_GET_DIRECT) != 0 || rr.pickle_sz < 2) { |
335 | cf_warning(AS_RW, "repl_write_handle_op: no or bad record" ); |
336 | as_partition_release(&rsv); |
337 | send_repl_write_ack(node, m, AS_ERR_UNKNOWN); |
338 | return; |
339 | } |
340 | |
341 | uint32_t info = 0; |
342 | |
343 | msg_get_uint32(m, RW_FIELD_INFO, &info); |
344 | |
345 | if (repl_write_pickle_is_drop(rr.pickle, info)) { |
346 | drop_replica(&rsv, keyd, (info & RW_INFO_XDR) != 0, node); |
347 | |
348 | as_partition_release(&rsv); |
349 | send_repl_write_ack(node, m, AS_OK); |
350 | |
351 | return; |
352 | } |
353 | |
354 | if (msg_get_uint32(m, RW_FIELD_GENERATION, &rr.generation) != 0 || |
355 | rr.generation == 0) { |
356 | cf_warning(AS_RW, "repl_write_handle_op: no or bad generation" ); |
357 | as_partition_release(&rsv); |
358 | send_repl_write_ack(node, m, AS_ERR_UNKNOWN); |
359 | return; |
360 | } |
361 | |
362 | if (msg_get_uint64(m, RW_FIELD_LAST_UPDATE_TIME, |
363 | &rr.last_update_time) != 0) { |
364 | cf_warning(AS_RW, "repl_write_handle_op: no last-update-time" ); |
365 | as_partition_release(&rsv); |
366 | send_repl_write_ack(node, m, AS_ERR_UNKNOWN); |
367 | return; |
368 | } |
369 | |
370 | msg_get_uint32(m, RW_FIELD_VOID_TIME, &rr.void_time); |
371 | |
372 | msg_get_buf(m, RW_FIELD_SET_NAME, (uint8_t **)&rr.set_name, |
373 | &rr.set_name_len, MSG_GET_DIRECT); |
374 | |
375 | msg_get_buf(m, RW_FIELD_KEY, (uint8_t **)&rr.key, &rr.key_size, |
376 | MSG_GET_DIRECT); |
377 | |
378 | // Do XDR write if the write is a non-XDR write or forwarding is enabled. |
379 | bool do_xdr_write = (info & RW_INFO_XDR) == 0 || |
380 | is_xdr_forwarding_enabled() || ns->ns_forward_xdr_writes; |
381 | |
382 | // If source didn't touch sindex, may not need to touch it locally. |
383 | bool skip_sindex = (info & RW_INFO_SINDEX_TOUCHED) == 0; |
384 | |
385 | result = (uint32_t)as_record_replace_if_better(&rr, true, skip_sindex, |
386 | do_xdr_write); |
387 | |
388 | as_partition_release(&rsv); |
389 | send_repl_write_ack(node, m, result); |
390 | } |
391 | |
392 | |
393 | void |
394 | repl_write_handle_ack(cf_node node, msg* m) |
395 | { |
396 | uint32_t ns_id; |
397 | |
398 | if (msg_get_uint32(m, RW_FIELD_NS_ID, &ns_id) != 0) { |
399 | cf_warning(AS_RW, "repl-write ack: no ns-id" ); |
400 | as_fabric_msg_put(m); |
401 | return; |
402 | } |
403 | |
404 | cf_digest* keyd; |
405 | |
406 | if (msg_get_buf(m, RW_FIELD_DIGEST, (uint8_t**)&keyd, NULL, |
407 | MSG_GET_DIRECT) != 0) { |
408 | cf_warning(AS_RW, "repl-write ack: no digest" ); |
409 | as_fabric_msg_put(m); |
410 | return; |
411 | } |
412 | |
413 | uint32_t tid; |
414 | |
415 | if (msg_get_uint32(m, RW_FIELD_TID, &tid) != 0) { |
416 | cf_warning(AS_RW, "repl-write ack: no tid" ); |
417 | as_fabric_msg_put(m); |
418 | return; |
419 | } |
420 | |
421 | rw_request_hkey hkey = { ns_id, *keyd }; |
422 | rw_request* rw = rw_request_hash_get(&hkey); |
423 | |
424 | if (! rw) { |
425 | // Extra ack, after rw_request is already gone. |
426 | as_fabric_msg_put(m); |
427 | return; |
428 | } |
429 | |
430 | cf_mutex_lock(&rw->lock); |
431 | |
432 | if (rw->tid != tid || rw->repl_write_complete) { |
433 | // Extra ack - rw_request is newer transaction for same digest, or ack |
434 | // is arriving after rw_request was aborted. |
435 | cf_mutex_unlock(&rw->lock); |
436 | rw_request_release(rw); |
437 | as_fabric_msg_put(m); |
438 | return; |
439 | } |
440 | |
441 | if (! rw->from.any) { |
442 | // Lost race against timeout in retransmit thread. |
443 | cf_mutex_unlock(&rw->lock); |
444 | rw_request_release(rw); |
445 | as_fabric_msg_put(m); |
446 | return; |
447 | } |
448 | |
449 | // Find remote node in replicas list. |
450 | int i = index_of_node(rw->dest_nodes, rw->n_dest_nodes, node); |
451 | |
452 | if (i == -1) { |
453 | cf_warning(AS_RW, "repl-write ack: from non-dest node %lx" , node); |
454 | cf_mutex_unlock(&rw->lock); |
455 | rw_request_release(rw); |
456 | as_fabric_msg_put(m); |
457 | return; |
458 | } |
459 | |
460 | if (rw->dest_complete[i]) { |
461 | // Extra ack for this replica write. |
462 | cf_mutex_unlock(&rw->lock); |
463 | rw_request_release(rw); |
464 | as_fabric_msg_put(m); |
465 | return; |
466 | } |
467 | |
468 | uint32_t result_code = parse_result_code(m); |
469 | |
470 | // If it makes sense, retransmit replicas. Note - rw->dest_complete[i] not |
471 | // yet set true, so that retransmit will go to this remote node. |
472 | if (repl_write_should_retransmit_replicas(rw, result_code)) { |
473 | cf_mutex_unlock(&rw->lock); |
474 | rw_request_release(rw); |
475 | as_fabric_msg_put(m); |
476 | return; |
477 | } |
478 | |
479 | rw->dest_complete[i] = true; |
480 | |
481 | as_health_add_ns_latency(node, ns_id, AS_HEALTH_NS_REPL_LAT, |
482 | rw->repl_start_us); |
483 | |
484 | for (uint32_t j = 0; j < rw->n_dest_nodes; j++) { |
485 | if (! rw->dest_complete[j]) { |
486 | // Still haven't heard from all replicas. |
487 | cf_mutex_unlock(&rw->lock); |
488 | rw_request_release(rw); |
489 | as_fabric_msg_put(m); |
490 | return; |
491 | } |
492 | } |
493 | |
494 | // Success for all replicas. |
495 | rw->repl_write_cb(rw); |
496 | repl_write_send_confirmation(rw); |
497 | |
498 | rw->repl_write_complete = true; |
499 | |
500 | cf_mutex_unlock(&rw->lock); |
501 | rw_request_hash_delete(&hkey, rw); |
502 | rw_request_release(rw); |
503 | as_fabric_msg_put(m); |
504 | } |
505 | |
506 | |
507 | //========================================================== |
508 | // Local helpers. |
509 | // |
510 | |
511 | // TODO - old pickle - remove in "six months". |
512 | void |
513 | old_make_message(rw_request* rw, as_transaction* tr) |
514 | { |
515 | // TODO - remove this when we're comfortable: |
516 | cf_assert(rw->pickle, AS_RW, "making repl-write msg with null pickle" ); |
517 | |
518 | as_namespace* ns = tr->rsv.ns; |
519 | msg* m = rw->dest_msg; |
520 | |
521 | msg_set_uint32(m, RW_FIELD_OP, RW_OP_WRITE); |
522 | msg_set_buf(m, RW_FIELD_NAMESPACE, (uint8_t*)ns->name, strlen(ns->name), |
523 | MSG_SET_COPY); |
524 | msg_set_uint32(m, RW_FIELD_NS_ID, ns->id); |
525 | msg_set_buf(m, RW_FIELD_DIGEST, (void*)&tr->keyd, sizeof(cf_digest), |
526 | MSG_SET_COPY); |
527 | msg_set_uint32(m, RW_FIELD_TID, rw->tid); |
528 | msg_set_uint32(m, RW_FIELD_GENERATION, tr->generation); |
529 | msg_set_uint64(m, RW_FIELD_LAST_UPDATE_TIME, tr->last_update_time); |
530 | |
531 | if (tr->void_time != 0) { |
532 | msg_set_uint32(m, RW_FIELD_VOID_TIME, tr->void_time); |
533 | } |
534 | |
535 | uint32_t info = pack_info_bits(tr); |
536 | |
537 | repl_write_flag_pickle(tr, rw->pickle, &info); |
538 | |
539 | msg_set_buf(m, RW_FIELD_OLD_RECORD, rw->pickle, rw->pickle_sz, |
540 | MSG_SET_HANDOFF_MALLOC); |
541 | rw->pickle = NULL; // make sure destructor doesn't free this |
542 | |
543 | if (rw->set_name) { |
544 | msg_set_buf(m, RW_FIELD_SET_NAME, (const uint8_t*)rw->set_name, |
545 | rw->set_name_len, MSG_SET_COPY); |
546 | // rw->set_name points directly into vmap - never free it. |
547 | } |
548 | |
549 | if (rw->key) { |
550 | msg_set_buf(m, RW_FIELD_KEY, rw->key, rw->key_size, |
551 | MSG_SET_HANDOFF_MALLOC); |
552 | rw->key = NULL; // make sure destructor doesn't free this |
553 | } |
554 | |
555 | if (info != 0) { |
556 | msg_set_uint32(m, RW_FIELD_INFO, info); |
557 | } |
558 | } |
559 | |
560 | |
561 | uint32_t |
562 | pack_info_bits(as_transaction* tr) |
563 | { |
564 | uint32_t info = 0; |
565 | |
566 | if (as_transaction_is_xdr(tr)) { |
567 | info |= RW_INFO_XDR; |
568 | } |
569 | |
570 | if ((tr->flags & AS_TRANSACTION_FLAG_SINDEX_TOUCHED) != 0) { |
571 | info |= RW_INFO_SINDEX_TOUCHED; |
572 | } |
573 | |
574 | if (respond_on_master_complete(tr)) { |
575 | info |= RW_INFO_NO_REPL_ACK; |
576 | } |
577 | |
578 | return info; |
579 | } |
580 | |
581 | |
582 | void |
583 | send_repl_write_ack(cf_node node, msg* m, uint32_t result) |
584 | { |
585 | uint32_t info = 0; |
586 | |
587 | msg_get_uint32(m, RW_FIELD_INFO, &info); |
588 | |
589 | if ((info & RW_INFO_NO_REPL_ACK) != 0) { |
590 | as_fabric_msg_put(m); |
591 | return; |
592 | } |
593 | |
594 | msg_preserve_fields(m, 3, RW_FIELD_NS_ID, RW_FIELD_DIGEST, RW_FIELD_TID); |
595 | |
596 | msg_set_uint32(m, RW_FIELD_OP, RW_OP_WRITE_ACK); |
597 | msg_set_uint32(m, RW_FIELD_RESULT, result); |
598 | |
599 | if (as_fabric_send(node, m, AS_FABRIC_CHANNEL_RW) != AS_FABRIC_SUCCESS) { |
600 | as_fabric_msg_put(m); |
601 | } |
602 | } |
603 | |
604 | |
605 | void |
606 | send_repl_write_ack_w_digest(cf_node node, msg* m, uint32_t result, |
607 | const cf_digest* keyd) |
608 | { |
609 | uint32_t info = 0; |
610 | |
611 | msg_get_uint32(m, RW_FIELD_INFO, &info); |
612 | |
613 | if ((info & RW_INFO_NO_REPL_ACK) != 0) { |
614 | as_fabric_msg_put(m); |
615 | return; |
616 | } |
617 | |
618 | msg_preserve_fields(m, 2, RW_FIELD_NS_ID, RW_FIELD_TID); |
619 | |
620 | msg_set_uint32(m, RW_FIELD_OP, RW_OP_WRITE_ACK); |
621 | msg_set_uint32(m, RW_FIELD_RESULT, result); |
622 | msg_set_buf(m, RW_FIELD_DIGEST, (const uint8_t*)keyd, sizeof(cf_digest), |
623 | MSG_SET_COPY); |
624 | |
625 | if (as_fabric_send(node, m, AS_FABRIC_CHANNEL_RW) != AS_FABRIC_SUCCESS) { |
626 | as_fabric_msg_put(m); |
627 | } |
628 | } |
629 | |
630 | |
631 | uint32_t |
632 | parse_result_code(msg* m) |
633 | { |
634 | uint32_t result_code; |
635 | |
636 | if (msg_get_uint32(m, RW_FIELD_RESULT, &result_code) != 0) { |
637 | cf_warning(AS_RW, "repl-write ack: no result_code" ); |
638 | return AS_ERR_UNKNOWN; |
639 | } |
640 | |
641 | return result_code; |
642 | } |
643 | |
644 | |
645 | void |
646 | drop_replica(as_partition_reservation* rsv, cf_digest* keyd, bool is_xdr_op, |
647 | cf_node master) |
648 | { |
649 | // Shortcut pointers & flags. |
650 | as_namespace* ns = rsv->ns; |
651 | as_index_tree* tree = rsv->tree; |
652 | |
653 | as_index_ref r_ref; |
654 | |
655 | if (as_record_get(tree, keyd, &r_ref) != 0) { |
656 | return; // not found is ok from master's perspective. |
657 | } |
658 | |
659 | as_record* r = r_ref.r; |
660 | |
661 | if (ns->storage_data_in_memory) { |
662 | record_delete_adjust_sindex(r, ns); |
663 | } |
664 | |
665 | // Save the set-ID for XDR. |
666 | uint16_t set_id = as_index_get_set_id(r); |
667 | |
668 | as_index_delete(tree, keyd); |
669 | as_record_done(&r_ref, ns); |
670 | |
671 | if (xdr_must_ship_delete(ns, is_xdr_op)) { |
672 | xdr_write(ns, keyd, 0, master, XDR_OP_TYPE_DROP, set_id, NULL); |
673 | } |
674 | } |
675 | |