1 | /* |
2 | * write.c |
3 | * |
4 | * Copyright (C) 2016 Aerospike, Inc. |
5 | * |
6 | * Portions may be licensed to Aerospike, Inc. under one or more contributor |
7 | * license agreements. |
8 | * |
9 | * This program is free software: you can redistribute it and/or modify it under |
10 | * the terms of the GNU Affero General Public License as published by the Free |
11 | * Software Foundation, either version 3 of the License, or (at your option) any |
12 | * later version. |
13 | * |
14 | * This program is distributed in the hope that it will be useful, but WITHOUT |
15 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
16 | * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more |
17 | * details. |
18 | * |
19 | * You should have received a copy of the GNU Affero General Public License |
20 | * along with this program. If not, see http://www.gnu.org/licenses/ |
21 | */ |
22 | |
23 | //========================================================== |
24 | // Includes. |
25 | // |
26 | |
27 | #include "transaction/write.h" |
28 | |
29 | #include <stdbool.h> |
30 | #include <stddef.h> |
31 | #include <stdint.h> |
32 | |
33 | #include "aerospike/as_atomic.h" |
34 | #include "citrusleaf/alloc.h" |
35 | #include "citrusleaf/cf_atomic.h" |
36 | #include "citrusleaf/cf_clock.h" |
37 | |
38 | #include "cf_mutex.h" |
39 | #include "dynbuf.h" |
40 | #include "fault.h" |
41 | |
42 | #include "base/cfg.h" |
43 | #include "base/datamodel.h" |
44 | #include "base/index.h" |
45 | #include "base/predexp.h" |
46 | #include "base/proto.h" |
47 | #include "base/secondary_index.h" |
48 | #include "base/transaction.h" |
49 | #include "base/transaction_policy.h" |
50 | #include "base/truncate.h" |
51 | #include "base/xdr_serverside.h" |
52 | #include "fabric/exchange.h" // TODO - old pickle - remove in "six months" |
53 | #include "fabric/partition.h" |
54 | #include "storage/storage.h" |
55 | #include "transaction/duplicate_resolve.h" |
56 | #include "transaction/proxy.h" |
57 | #include "transaction/replica_write.h" |
58 | #include "transaction/rw_request.h" |
59 | #include "transaction/rw_request_hash.h" |
60 | #include "transaction/rw_utils.h" |
61 | |
62 | |
63 | //========================================================== |
64 | // Typedefs & constants. |
65 | // |
66 | |
67 | #define STACK_PARTICLES_SIZE (1024 * 1024) |
68 | |
69 | |
70 | //========================================================== |
71 | // Forward declarations. |
72 | // |
73 | |
74 | void start_write_dup_res(rw_request* rw, as_transaction* tr); |
75 | void start_write_repl_write(rw_request* rw, as_transaction* tr); |
76 | void start_write_repl_write_forget(rw_request* rw, as_transaction* tr); |
77 | bool write_dup_res_cb(rw_request* rw); |
78 | void write_repl_write_after_dup_res(rw_request* rw, as_transaction* tr); |
79 | void write_repl_write_forget_after_dup_res(rw_request* rw, as_transaction* tr); |
80 | void write_repl_write_cb(rw_request* rw); |
81 | |
82 | void send_write_response(as_transaction* tr, cf_dyn_buf* db); |
83 | void write_timeout_cb(rw_request* rw); |
84 | |
85 | transaction_status write_master(rw_request* rw, as_transaction* tr); |
86 | void write_master_failed(as_transaction* tr, as_index_ref* r_ref, |
87 | bool record_created, as_index_tree* tree, as_storage_rd* rd, |
88 | int result_code); |
89 | int write_master_preprocessing(as_transaction* tr); |
90 | int write_master_policies(as_transaction* tr, bool* p_must_not_create, |
91 | bool* p_record_level_replace, bool* p_must_fetch_data); |
92 | bool check_msg_set_name(as_transaction* tr, const char* set_name); |
93 | int iops_predexp_filter_meta(const as_transaction* tr, const as_record* r, |
94 | predexp_eval_t** predexp); |
95 | |
96 | int write_master_dim_single_bin(as_transaction* tr, as_storage_rd* rd, |
97 | rw_request* rw, bool* is_delete, xdr_dirty_bins* dirty_bins); |
98 | int write_master_dim(as_transaction* tr, as_storage_rd* rd, |
99 | bool record_level_replace, rw_request* rw, bool* is_delete, |
100 | xdr_dirty_bins* dirty_bins); |
101 | int write_master_ssd_single_bin(as_transaction* tr, as_storage_rd* rd, |
102 | bool must_fetch_data, rw_request* rw, bool* is_delete, |
103 | xdr_dirty_bins* dirty_bins); |
104 | int write_master_ssd(as_transaction* tr, as_storage_rd* rd, |
105 | bool must_fetch_data, bool record_level_replace, rw_request* rw, |
106 | bool* is_delete, xdr_dirty_bins* dirty_bins); |
107 | |
108 | void write_master_update_index_metadata(as_transaction* tr, index_metadata* old, |
109 | as_record* r); |
110 | int write_master_bin_ops(as_transaction* tr, as_storage_rd* rd, |
111 | cf_ll_buf* particles_llb, as_bin* cleanup_bins, |
112 | uint32_t* p_n_cleanup_bins, cf_dyn_buf* db, uint32_t* p_n_final_bins, |
113 | xdr_dirty_bins* dirty_bins); |
114 | int write_master_bin_ops_loop(as_transaction* tr, as_storage_rd* rd, |
115 | as_msg_op** ops, as_bin* response_bins, uint32_t* p_n_response_bins, |
116 | as_bin* result_bins, uint32_t* p_n_result_bins, |
117 | cf_ll_buf* particles_llb, as_bin* cleanup_bins, |
118 | uint32_t* p_n_cleanup_bins, xdr_dirty_bins* dirty_bins); |
119 | |
120 | void write_master_index_metadata_unwind(index_metadata* old, as_record* r); |
121 | void write_master_dim_single_bin_unwind(as_bin* old_bin, as_bin* new_bin, |
122 | as_bin* cleanup_bins, uint32_t n_cleanup_bins); |
123 | void write_master_dim_unwind(as_bin* old_bins, uint32_t n_old_bins, |
124 | as_bin* new_bins, uint32_t n_new_bins, as_bin* cleanup_bins, |
125 | uint32_t n_cleanup_bins); |
126 | |
127 | |
128 | //========================================================== |
129 | // Inlines & macros. |
130 | // |
131 | |
132 | static inline void |
133 | client_write_update_stats(as_namespace* ns, uint8_t result_code, bool is_xdr_op) |
134 | { |
135 | switch (result_code) { |
136 | case AS_OK: |
137 | cf_atomic64_incr(&ns->n_client_write_success); |
138 | if (is_xdr_op) { |
139 | cf_atomic64_incr(&ns->n_xdr_client_write_success); |
140 | } |
141 | break; |
142 | default: |
143 | cf_atomic64_incr(&ns->n_client_write_error); |
144 | if (is_xdr_op) { |
145 | cf_atomic64_incr(&ns->n_xdr_client_write_error); |
146 | } |
147 | break; |
148 | case AS_ERR_TIMEOUT: |
149 | cf_atomic64_incr(&ns->n_client_write_timeout); |
150 | if (is_xdr_op) { |
151 | cf_atomic64_incr(&ns->n_xdr_client_write_timeout); |
152 | } |
153 | break; |
154 | case AS_ERR_FILTERED_OUT: |
155 | // Can't be an XDR write. |
156 | cf_atomic64_incr(&ns->n_client_write_filtered_out); |
157 | break; |
158 | } |
159 | } |
160 | |
161 | static inline void |
162 | from_proxy_write_update_stats(as_namespace* ns, uint8_t result_code, |
163 | bool is_xdr_op) |
164 | { |
165 | switch (result_code) { |
166 | case AS_OK: |
167 | cf_atomic64_incr(&ns->n_from_proxy_write_success); |
168 | if (is_xdr_op) { |
169 | cf_atomic64_incr(&ns->n_xdr_from_proxy_write_success); |
170 | } |
171 | break; |
172 | default: |
173 | cf_atomic64_incr(&ns->n_from_proxy_write_error); |
174 | if (is_xdr_op) { |
175 | cf_atomic64_incr(&ns->n_xdr_from_proxy_write_error); |
176 | } |
177 | break; |
178 | case AS_ERR_TIMEOUT: |
179 | cf_atomic64_incr(&ns->n_from_proxy_write_timeout); |
180 | if (is_xdr_op) { |
181 | cf_atomic64_incr(&ns->n_xdr_from_proxy_write_timeout); |
182 | } |
183 | break; |
184 | case AS_ERR_FILTERED_OUT: |
185 | // Can't be an XDR write. |
186 | cf_atomic64_incr(&ns->n_from_proxy_write_filtered_out); |
187 | break; |
188 | } |
189 | } |
190 | |
191 | static inline void |
192 | ops_sub_write_update_stats(as_namespace* ns, uint8_t result_code) |
193 | { |
194 | switch (result_code) { |
195 | case AS_OK: |
196 | cf_atomic64_incr(&ns->n_ops_sub_write_success); |
197 | break; |
198 | default: |
199 | cf_atomic64_incr(&ns->n_ops_sub_write_error); |
200 | break; |
201 | case AS_ERR_TIMEOUT: |
202 | cf_atomic64_incr(&ns->n_ops_sub_write_timeout); |
203 | break; |
204 | case AS_ERR_FILTERED_OUT: // doesn't include those filtered out by metadata |
205 | as_incr_uint64(&ns->n_ops_sub_write_filtered_out); |
206 | break; |
207 | } |
208 | } |
209 | |
210 | static inline void |
211 | append_bin_to_destroy(as_bin* b, as_bin* bins, uint32_t* p_n_bins) |
212 | { |
213 | if (as_bin_is_external_particle(b)) { |
214 | bins[(*p_n_bins)++] = *b; |
215 | } |
216 | } |
217 | |
218 | |
219 | //========================================================== |
220 | // Public API. |
221 | // |
222 | |
// Entry point for a single-record write transaction. Returns:
// - TRANS_DONE_ERROR   - failed; error response already sent
// - TRANS_DONE_SUCCESS - finished; success response already sent
// - TRANS_IN_PROGRESS  - duplicate resolution or replica write in flight
// - TRANS_WAITING      - queued in the rw hash behind another transaction
transaction_status
as_write_start(as_transaction* tr)
{
	BENCHMARK_START(tr, write, FROM_CLIENT);
	BENCHMARK_START(tr, ops_sub, FROM_IOPS);

	// Apply XDR filter.
	if (! xdr_allows_write(tr)) {
		tr->result_code = AS_ERR_ALWAYS_FORBIDDEN;
		send_write_response(tr, NULL);
		return TRANS_DONE_ERROR;
	}

	// Check that we aren't backed up.
	if (as_storage_overloaded(tr->rsv.ns)) {
		tr->result_code = AS_ERR_DEVICE_OVERLOAD;
		send_write_response(tr, NULL);
		return TRANS_DONE_ERROR;
	}

	// Create rw_request and add to hash. The hash serializes transactions
	// on the same key - insert may return TRANS_WAITING.
	rw_request_hkey hkey = { tr->rsv.ns->id, tr->keyd };
	rw_request* rw = rw_request_create(&tr->keyd);
	transaction_status status = rw_request_hash_insert(&hkey, rw, tr);

	// If rw_request wasn't inserted in hash, transaction is finished.
	if (status != TRANS_IN_PROGRESS) {
		rw_request_release(rw);

		if (status != TRANS_WAITING) {
			send_write_response(tr, NULL);
		}

		return status;
	}
	// else - rw_request is now in hash, continue...

	if (tr->rsv.ns->write_dup_res_disabled) {
		// Note - preventing duplicate resolution this way allows
		// rw_request_destroy() to handle dup_msg[] cleanup correctly.
		tr->rsv.n_dupl = 0;
	}

	// If there are duplicates to resolve, start doing so.
	if (tr->rsv.n_dupl != 0) {
		start_write_dup_res(rw, tr);

		// Started duplicate resolution.
		return TRANS_IN_PROGRESS;
	}
	// else - no duplicate resolution phase, apply operation to master.

	// Set up the nodes to which we'll write replicas.
	rw->n_dest_nodes = as_partition_get_other_replicas(tr->rsv.p,
			rw->dest_nodes);

	if (insufficient_replica_destinations(tr->rsv.ns, rw->n_dest_nodes)) {
		rw_request_hash_delete(&hkey, rw);
		tr->result_code = AS_ERR_UNAVAILABLE;
		send_write_response(tr, NULL);
		return TRANS_DONE_ERROR;
	}

	status = write_master(rw, tr);

	BENCHMARK_NEXT_DATA_POINT_FROM(tr, write, FROM_CLIENT, master);
	BENCHMARK_NEXT_DATA_POINT_FROM(tr, ops_sub, FROM_IOPS, master);

	// If error, transaction is finished.
	if (status != TRANS_IN_PROGRESS) {
		rw_request_hash_delete(&hkey, rw);

		if (status != TRANS_WAITING) {
			send_write_response(tr, NULL);
		}

		return status;
	}

	// If we don't need replica writes, transaction is finished.
	if (rw->n_dest_nodes == 0) {
		finished_replicated(tr);
		send_write_response(tr, &rw->response_db);
		rw_request_hash_delete(&hkey, rw);
		return TRANS_DONE_SUCCESS;
	}

	// If we don't need to wait for replica write acks, fire and forget.
	if (respond_on_master_complete(tr)) {
		start_write_repl_write_forget(rw, tr);
		send_write_response(tr, &rw->response_db);
		rw_request_hash_delete(&hkey, rw);
		return TRANS_DONE_SUCCESS;
	}

	start_write_repl_write(rw, tr);

	// Started replica write.
	return TRANS_IN_PROGRESS;
}
323 | |
324 | |
325 | //========================================================== |
326 | // Local helpers - transaction flow. |
327 | // |
328 | |
// Kick off the duplicate-resolution phase: build the dup-res fabric message
// and, under the rw_request lock, arm the callbacks and send to duplicates.
void
start_write_dup_res(rw_request* rw, as_transaction* tr)
{
	// Finish initializing rw, construct and send dup-res message.

	dup_res_make_message(rw, tr);

	cf_mutex_lock(&rw->lock);

	// Arm write_dup_res_cb to run when all replies arrive, and
	// write_timeout_cb if the rw_request times out first.
	dup_res_setup_rw(rw, tr, write_dup_res_cb, write_timeout_cb);
	send_rw_messages(rw);

	cf_mutex_unlock(&rw->lock);
}
343 | |
344 | |
// Kick off the replica-write phase: build the repl-write fabric message and,
// under the rw_request lock, arm the callbacks and send to replica nodes.
void
start_write_repl_write(rw_request* rw, as_transaction* tr)
{
	// Finish initializing rw, construct and send repl-write message.

	repl_write_make_message(rw, tr);

	cf_mutex_lock(&rw->lock);

	// Arm write_repl_write_cb to run when all acks arrive, and
	// write_timeout_cb if the rw_request times out first.
	repl_write_setup_rw(rw, tr, write_repl_write_cb, write_timeout_cb);
	send_rw_messages(rw);

	cf_mutex_unlock(&rw->lock);
}
359 | |
360 | |
// Fire-and-forget replica write - used when the client is answered as soon
// as the master write completes, so no acks are awaited and no callbacks
// need to be armed on the rw_request.
void
start_write_repl_write_forget(rw_request* rw, as_transaction* tr)
{
	// Construct and send repl-write message. No need to finish rw setup.

	repl_write_make_message(rw, tr);
	send_rw_messages_forget(rw);
}
369 | |
370 | |
// Callback invoked when all duplicate-resolution replies have arrived.
// Continues the transaction by applying the write on the master. Returns
// true if the rw_request is finished and should be removed from the hash;
// returns false if the rw_request was recycled to carry replica writes.
bool
write_dup_res_cb(rw_request* rw)
{
	BENCHMARK_NEXT_DATA_POINT_FROM(rw, write, FROM_CLIENT, dup_res);
	BENCHMARK_NEXT_DATA_POINT_FROM(rw, ops_sub, FROM_IOPS, dup_res);

	// Reconstruct a transaction from the stored rw_request state.
	as_transaction tr;
	as_transaction_init_from_rw(&tr, rw);

	if (tr.result_code != AS_OK) {
		send_write_response(&tr, NULL);
		return true;
	}

	// Set up the nodes to which we'll write replicas.
	rw->n_dest_nodes = as_partition_get_other_replicas(tr.rsv.p,
			rw->dest_nodes);

	if (insufficient_replica_destinations(tr.rsv.ns, rw->n_dest_nodes)) {
		tr.result_code = AS_ERR_UNAVAILABLE;
		send_write_response(&tr, NULL);
		return true;
	}

	transaction_status status = write_master(rw, &tr);

	BENCHMARK_NEXT_DATA_POINT_FROM((&tr), write, FROM_CLIENT, master);
	BENCHMARK_NEXT_DATA_POINT_FROM((&tr), ops_sub, FROM_IOPS, master);

	if (status == TRANS_WAITING) {
		// Note - new tr now owns msgp, make sure rw destructor doesn't free it.
		// Also, rw will release rsv - new tr will get a new one.
		rw->msgp = NULL;
		return true;
	}

	if (status == TRANS_DONE_ERROR) {
		send_write_response(&tr, NULL);
		return true;
	}

	// If we don't need replica writes, transaction is finished.
	if (rw->n_dest_nodes == 0) {
		finished_replicated(&tr);
		send_write_response(&tr, &rw->response_db);
		return true;
	}

	// If we don't need to wait for replica write acks, fire and forget.
	if (respond_on_master_complete(&tr)) {
		write_repl_write_forget_after_dup_res(rw, &tr);
		send_write_response(&tr, &rw->response_db);
		return true;
	}

	write_repl_write_after_dup_res(rw, &tr);

	// Started replica write - don't delete rw_request from hash.
	return false;
}
431 | |
432 | |
// Transition the same rw_request from duplicate resolution to awaiting
// replica-write acks. Caller already holds the rw_request lock.
void
write_repl_write_after_dup_res(rw_request* rw, as_transaction* tr)
{
	// Recycle rw_request that was just used for duplicate resolution to now do
	// replica writes. Note - we are under the rw_request lock here!

	repl_write_make_message(rw, tr);
	repl_write_reset_rw(rw, tr, write_repl_write_cb);
	send_rw_messages(rw);
}
443 | |
444 | |
// Fire-and-forget replica write after duplicate resolution - no acks will be
// awaited. Caller already holds the rw_request lock.
void
write_repl_write_forget_after_dup_res(rw_request* rw, as_transaction* tr)
{
	// Send replica writes. Not waiting for acks, so need to reset rw_request.
	// Note - we are under the rw_request lock here!

	repl_write_make_message(rw, tr);
	send_rw_messages_forget(rw);
}
454 | |
455 | |
// Callback invoked when all replica-write acks have arrived - the write is
// fully replicated, so send the (success) response to the originator.
void
write_repl_write_cb(rw_request* rw)
{
	BENCHMARK_NEXT_DATA_POINT_FROM(rw, write, FROM_CLIENT, repl_write);
	BENCHMARK_NEXT_DATA_POINT_FROM(rw, ops_sub, FROM_IOPS, repl_write);

	// Reconstruct a transaction from the stored rw_request state.
	as_transaction tr;
	as_transaction_init_from_rw(&tr, rw);

	finished_replicated(&tr);
	send_write_response(&tr, &rw->response_db);

	// Finished transaction - rw_request cleans up reservation and msgp!
}
470 | |
471 | |
472 | //========================================================== |
473 | // Local helpers - transaction end. |
474 | // |
475 | |
// Send the transaction's response back to its originator - client socket,
// proxying node, or internal ops callback - and update the per-origin stats.
// db, when non-empty, carries a per-op response body (ops replies); otherwise
// a plain result/generation/void-time reply is sent.
void
send_write_response(as_transaction* tr, cf_dyn_buf* db)
{
	// Paranoia - shouldn't get here on losing race with timeout.
	if (! tr->from.any) {
		cf_warning(AS_RW, "transaction origin %u has null 'from'" , tr->origin);
		return;
	}

	// Note - if tr was setup from rw, rw->from.any has been set null and
	// informs timeout it lost the race.

	clear_delete_response_metadata(tr);

	switch (tr->origin) {
	case FROM_CLIENT:
		// Ops responses use the dyn-buf body; simple writes use the plain
		// reply with result code, generation and void-time.
		if (db && db->used_sz != 0) {
			as_msg_send_ops_reply(tr->from.proto_fd_h, db);
		}
		else {
			as_msg_send_reply(tr->from.proto_fd_h, tr->result_code,
					tr->generation, tr->void_time, NULL, NULL, 0, tr->rsv.ns,
					as_transaction_trid(tr));
		}
		BENCHMARK_NEXT_DATA_POINT(tr, write, response);
		HIST_TRACK_ACTIVATE_INSERT_DATA_POINT(tr, write_hist);
		client_write_update_stats(tr->rsv.ns, tr->result_code,
				as_transaction_is_xdr(tr));
		break;
	case FROM_PROXY:
		if (db && db->used_sz != 0) {
			as_proxy_send_ops_response(tr->from.proxy_node,
					tr->from_data.proxy_tid, db);
		}
		else {
			as_proxy_send_response(tr->from.proxy_node, tr->from_data.proxy_tid,
					tr->result_code, tr->generation, tr->void_time, NULL, NULL,
					0, tr->rsv.ns, as_transaction_trid(tr));
		}
		from_proxy_write_update_stats(tr->rsv.ns, tr->result_code,
				as_transaction_is_xdr(tr));
		break;
	case FROM_IOPS:
		// Internal ops origin - just invoke its completion callback.
		tr->from.iops_orig->cb(tr->from.iops_orig->udata, tr->result_code);
		BENCHMARK_NEXT_DATA_POINT(tr, ops_sub, response);
		ops_sub_write_update_stats(tr->rsv.ns, tr->result_code);
		break;
	default:
		cf_crash(AS_RW, "unexpected transaction origin %u" , tr->origin);
		break;
	}

	tr->from.any = NULL; // pattern, not needed
}
530 | |
531 | |
// Timeout callback for the rw_request - answers the originator with
// AS_ERR_TIMEOUT if the normal dup-res/repl-write callback hasn't already
// responded. The from.any field is the race arbiter between the two paths.
void
write_timeout_cb(rw_request* rw)
{
	if (! rw->from.any) {
		return; // lost race against dup-res or repl-write callback
	}

	finished_not_replicated(rw);

	switch (rw->origin) {
	case FROM_CLIENT:
		as_msg_send_reply(rw->from.proto_fd_h, AS_ERR_TIMEOUT, 0, 0, NULL, NULL,
				0, rw->rsv.ns, rw_request_trid(rw));
		// Timeouts aren't included in histograms.
		client_write_update_stats(rw->rsv.ns, AS_ERR_TIMEOUT,
				as_msg_is_xdr(&rw->msgp->msg));
		break;
	case FROM_PROXY:
		// No reply is sent to the proxying node - it has its own timeout.
		from_proxy_write_update_stats(rw->rsv.ns, AS_ERR_TIMEOUT,
				as_msg_is_xdr(&rw->msgp->msg));
		break;
	case FROM_IOPS:
		rw->from.iops_orig->cb(rw->from.iops_orig->udata, AS_ERR_TIMEOUT);
		// Timeouts aren't included in histograms.
		ops_sub_write_update_stats(rw->rsv.ns, AS_ERR_TIMEOUT);
		break;
	default:
		cf_crash(AS_RW, "unexpected transaction origin %u" , rw->origin);
		break;
	}

	rw->from.any = NULL; // inform other callback it lost the race
}
565 | |
566 | |
567 | //========================================================== |
568 | // Local helpers - write master. |
569 | // |
570 | |
// Apply the write on the master node: validate policies, find or create the
// record, apply predexp filters, write bins per storage configuration, then
// update index metadata and (possibly) ship to XDR. Returns:
// - TRANS_DONE_ERROR  - failed; tr->result_code set via write_master_failed()
// - TRANS_WAITING     - lost a replication-state race; transaction re-queued
// - TRANS_IN_PROGRESS - master write succeeded; caller proceeds to replicas
transaction_status
write_master(rw_request* rw, as_transaction* tr)
{
	CF_ALLOC_SET_NS_ARENA(tr->rsv.ns);

	//------------------------------------------------------
	// Perform checks that don't need to loop over ops, or
	// create or find (and lock) the as_index.
	//

	if (! write_master_preprocessing(tr)) {
		// Failure cases all call write_master_failed().
		return TRANS_DONE_ERROR;
	}

	//------------------------------------------------------
	// Loop over ops to set some essential policy flags.
	//

	bool must_not_create;
	bool record_level_replace;
	bool must_fetch_data;

	int result = write_master_policies(tr, &must_not_create,
			&record_level_replace, &must_fetch_data);

	if (result != 0) {
		write_master_failed(tr, 0, false, 0, 0, result);
		return TRANS_DONE_ERROR;
	}

	//------------------------------------------------------
	// Find or create the as_index and get a reference -
	// this locks the record. Perform all checks that don't
	// need the as_storage_rd.
	//

	// Shortcut pointers.
	as_msg* m = &tr->msgp->msg;
	as_namespace* ns = tr->rsv.ns;
	as_index_tree* tree = tr->rsv.tree;

	// Find or create as_index, populate as_index_ref, lock record.
	as_index_ref r_ref;
	as_record* r = NULL;
	bool record_created = false;

	if (must_not_create) {
		// Update-only/replace-only policy - record must already exist.
		if (as_record_get(tree, &tr->keyd, &r_ref) != 0) {
			write_master_failed(tr, 0, record_created, tree, 0, AS_ERR_NOT_FOUND);
			return TRANS_DONE_ERROR;
		}

		r = r_ref.r;

		// Expired/truncated records are treated as absent.
		if (as_record_is_doomed(r, ns)) {
			write_master_failed(tr, &r_ref, record_created, tree, 0, AS_ERR_NOT_FOUND);
			return TRANS_DONE_ERROR;
		}

		if (repl_state_check(r, tr) < 0) {
			as_record_done(&r_ref, ns);
			return TRANS_WAITING;
		}

		if (! as_record_is_live(r)) {
			write_master_failed(tr, &r_ref, record_created, tree, 0, AS_ERR_NOT_FOUND);
			return TRANS_DONE_ERROR;
		}
	}
	else {
		// rv == 1 means a new record was created for this key.
		int rv = as_record_get_create(tree, &tr->keyd, &r_ref, ns);

		if (rv < 0) {
			cf_detail_digest(AS_RW, &tr->keyd, "{%s} write_master: fail as_record_get_create() " , ns->name);
			write_master_failed(tr, 0, record_created, tree, 0, AS_ERR_UNKNOWN);
			return TRANS_DONE_ERROR;
		}

		r = r_ref.r;
		record_created = rv == 1;

		bool is_doomed = as_record_is_doomed(r, ns);

		if (! record_created && ! is_doomed && repl_state_check(r, tr) < 0) {
			as_record_done(&r_ref, ns);
			return TRANS_WAITING;
		}

		// If it's an expired or truncated record, pretend it's a fresh create.
		if (! record_created && is_doomed) {
			as_record_rescue(&r_ref, ns);
			record_created = true;
		}
	}

	// Enforce record-level create-only existence policy.
	if (! record_created && ! create_only_check(r, m)) {
		write_master_failed(tr, &r_ref, record_created, tree, 0, AS_ERR_RECORD_EXISTS);
		return TRANS_DONE_ERROR;
	}

	// Check generation requirement, if any.
	if (! generation_check(r, m, ns)) {
		write_master_failed(tr, &r_ref, record_created, tree, 0, AS_ERR_GENERATION);
		return TRANS_DONE_ERROR;
	}

	// If creating record, write set-ID into index.
	if (record_created) {
		int rv_set = as_transaction_has_set(tr) ?
				set_set_from_msg(r, ns, m) : 0;

		if (rv_set == -1) {
			cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: set can't be added " , ns->name);
			write_master_failed(tr, &r_ref, record_created, tree, 0, AS_ERR_PARAMETER);
			return TRANS_DONE_ERROR;
		}
		else if (rv_set == -2) {
			write_master_failed(tr, &r_ref, record_created, tree, 0, AS_ERR_FORBIDDEN);
			return TRANS_DONE_ERROR;
		}

		// Don't write record if it would be truncated.
		if (as_truncate_now_is_truncated(ns, as_index_get_set_id(r))) {
			write_master_failed(tr, &r_ref, record_created, tree, 0, AS_ERR_FORBIDDEN);
			return TRANS_DONE_ERROR;
		}
	}

	// Shortcut set name.
	const char* set_name = as_index_get_set_name(r, ns);

	// If record existed, check that as_msg set name matches.
	if (! record_created && tr->origin != FROM_IOPS &&
			! check_msg_set_name(tr, set_name)) {
		write_master_failed(tr, &r_ref, record_created, tree, 0, AS_ERR_PARAMETER);
		return TRANS_DONE_ERROR;
	}

	// Apply predexp metadata filter if present.

	predexp_eval_t* predexp = NULL;
	predexp_eval_t* iops_predexp = NULL;

	// Filters only apply to existing live records - fresh creates pass.
	if (! record_created && as_record_is_live(r) &&
			(result = tr->origin == FROM_IOPS ?
					iops_predexp_filter_meta(tr, r, &iops_predexp) :
					build_predexp_and_filter_meta(tr, r, &predexp)) != 0) {
		write_master_failed(tr, &r_ref, false, tree, 0, result);
		return TRANS_DONE_ERROR;
	}

	//------------------------------------------------------
	// Open or create the as_storage_rd, and handle record
	// metadata.
	//

	as_storage_rd rd;

	if (record_created) {
		as_storage_record_create(ns, r, &rd);
	}
	else {
		as_storage_record_open(ns, r, &rd);
	}

	// Apply predexp record bins filter if present.
	// NOTE(review): only predexp is destroyed here - presumably iops_predexp
	// is owned by the iops origin and must not be freed; confirm.
	if (predexp != NULL || iops_predexp != NULL) {
		if ((result = predexp_read_and_filter_bins(&rd,
				tr->origin == FROM_IOPS ? iops_predexp : predexp)) != 0) {
			predexp_destroy(predexp);
			write_master_failed(tr, &r_ref, false, tree, &rd, result);
			return TRANS_DONE_ERROR;
		}

		predexp_destroy(predexp);
	}

	// Deal with delete durability (enterprise only).
	if ((result = set_delete_durablility(tr, &rd)) != 0) {
		write_master_failed(tr, &r_ref, record_created, tree, &rd, result);
		return TRANS_DONE_ERROR;
	}

	// Shortcut for set name storage.
	if (set_name) {
		rd.set_name = set_name;
		rd.set_name_len = strlen(set_name);
	}

	// Deal with key storage as needed.
	if ((result = handle_msg_key(tr, &rd)) != 0) {
		write_master_failed(tr, &r_ref, record_created, tree, &rd, result);
		return TRANS_DONE_ERROR;
	}

	// Convert message TTL special value if appropriate.
	if (record_created && m->record_ttl == TTL_DONT_UPDATE) {
		m->record_ttl = TTL_NAMESPACE_DEFAULT;
	}

	// Will we need a pickle?
	// TODO - old pickle - remove condition in "six months".
	if (as_exchange_min_compatibility_id() >= 3) {
		rd.keep_pickle = rw->n_dest_nodes != 0;
	}

	//------------------------------------------------------
	// Split write_master() according to configuration to
	// handle record bins.
	//

	xdr_dirty_bins dirty_bins;
	xdr_clear_dirty_bins(&dirty_bins);

	bool is_delete = false;

	// Four variants: data-in-memory vs. ssd, single-bin vs. multi-bin.
	if (ns->storage_data_in_memory) {
		if (ns->single_bin) {
			result = write_master_dim_single_bin(tr, &rd,
					rw, &is_delete, &dirty_bins);
		}
		else {
			result = write_master_dim(tr, &rd,
					record_level_replace,
					rw, &is_delete, &dirty_bins);
		}
	}
	else {
		if (ns->single_bin) {
			result = write_master_ssd_single_bin(tr, &rd,
					must_fetch_data,
					rw, &is_delete, &dirty_bins);
		}
		else {
			result = write_master_ssd(tr, &rd,
					must_fetch_data, record_level_replace,
					rw, &is_delete, &dirty_bins);
		}
	}

	if (result != 0) {
		write_master_failed(tr, &r_ref, record_created, tree, &rd, result);
		return TRANS_DONE_ERROR;
	}

	//------------------------------------------------------
	// Done - complete function's output, release the record
	// lock, and do XDR write if appropriate.
	//

	tr->generation = r->generation;
	tr->void_time = r->void_time;
	tr->last_update_time = r->last_update_time;

	// Get set-id before releasing.
	uint16_t set_id = as_index_get_set_id(r_ref.r);

	// Collect more info for XDR.
	uint16_t generation = plain_generation(r->generation, ns);
	xdr_op_type op_type = XDR_OP_TYPE_WRITE;

	// Handle deletion if appropriate - a write that removed the last bin
	// deletes the record.
	if (is_delete) {
		write_delete_record(r_ref.r, tree);
		cf_atomic64_incr(&ns->n_deleted_last_bin);
		tr->flags |= AS_TRANSACTION_FLAG_IS_DELETE;

		generation = 0;
		op_type = as_transaction_is_durable_delete(tr) ?
				XDR_OP_TYPE_DURABLE_DELETE : XDR_OP_TYPE_DROP;
	}
	// Or (normally) adjust max void-time.
	else if (r->void_time != 0) {
		cf_atomic32_setmax(&tr->rsv.p->max_void_time, (int32_t)r->void_time);
	}

	will_replicate(r, ns);

	as_storage_record_close(&rd);
	as_record_done(&r_ref, ns);

	// Don't send an XDR delete if it's disallowed.
	if (is_delete && ! is_xdr_delete_shipping_enabled()) {
		return TRANS_IN_PROGRESS;
	}

	// Do an XDR write if the write is a non-XDR write or is an XDR write with
	// forwarding enabled.
	if (! as_msg_is_xdr(m) || is_xdr_forwarding_enabled() ||
			ns->ns_forward_xdr_writes) {
		xdr_write(ns, &tr->keyd, generation, 0, op_type, set_id, &dirty_bins);
	}

	return TRANS_IN_PROGRESS;
}
868 | |
869 | |
870 | void |
871 | write_master_failed(as_transaction* tr, as_index_ref* r_ref, |
872 | bool record_created, as_index_tree* tree, as_storage_rd* rd, |
873 | int result_code) |
874 | { |
875 | as_namespace* ns = tr->rsv.ns; |
876 | |
877 | if (r_ref) { |
878 | if (rd) { |
879 | as_storage_record_close(rd); |
880 | } |
881 | |
882 | if (record_created) { |
883 | as_index_delete(tree, &tr->keyd); |
884 | } |
885 | |
886 | as_record_done(r_ref, ns); |
887 | } |
888 | |
889 | switch (result_code) { |
890 | case AS_ERR_GENERATION: |
891 | cf_atomic64_incr(&ns->n_fail_generation); |
892 | break; |
893 | case AS_ERR_RECORD_TOO_BIG: |
894 | cf_detail_digest(AS_RW, &tr->keyd, "{%s} write_master: record too big " , ns->name); |
895 | cf_atomic64_incr(&ns->n_fail_record_too_big); |
896 | break; |
897 | default: |
898 | // These either log warnings or aren't interesting enough to count. |
899 | break; |
900 | } |
901 | |
902 | tr->result_code = (uint8_t)result_code; |
903 | } |
904 | |
905 | |
906 | int |
907 | write_master_preprocessing(as_transaction* tr) |
908 | { |
909 | as_namespace* ns = tr->rsv.ns; |
910 | as_msg* m = &tr->msgp->msg; |
911 | |
912 | if (ns->clock_skew_stop_writes) { |
913 | // TODO - new error code? |
914 | write_master_failed(tr, 0, false, 0, 0, AS_ERR_FORBIDDEN); |
915 | return false; |
916 | } |
917 | |
918 | // ns->stop_writes is set by nsup if configured threshold is breached. |
919 | if (ns->stop_writes) { |
920 | write_master_failed(tr, 0, false, 0, 0, AS_ERR_OUT_OF_SPACE); |
921 | return false; |
922 | } |
923 | |
924 | if (! as_storage_has_space(ns)) { |
925 | cf_warning(AS_RW, "{%s}: write_master: drives full" , ns->name); |
926 | write_master_failed(tr, 0, false, 0, 0, AS_ERR_OUT_OF_SPACE); |
927 | return false; |
928 | } |
929 | |
930 | if (! is_valid_ttl(m->record_ttl)) { |
931 | cf_warning(AS_RW, "write_master: invalid ttl %u" , m->record_ttl); |
932 | write_master_failed(tr, 0, false, 0, 0, AS_ERR_PARAMETER); |
933 | return false; |
934 | } |
935 | |
936 | // Fail if disallow_null_setname is true and set name is absent or empty. |
937 | if (ns->disallow_null_setname) { |
938 | as_msg_field* f = as_transaction_has_set(tr) ? |
939 | as_msg_field_get(m, AS_MSG_FIELD_TYPE_SET) : NULL; |
940 | |
941 | if (! f || as_msg_field_get_value_sz(f) == 0) { |
942 | cf_warning(AS_RW, "write_master: null/empty set name not allowed for namespace %s" , ns->name); |
943 | write_master_failed(tr, 0, false, 0, 0, AS_ERR_PARAMETER); |
944 | return false; |
945 | } |
946 | } |
947 | |
948 | return true; |
949 | } |
950 | |
951 | |
// Parse the write message's policy flags and validate its bin ops, deriving
// the policy outputs write_master() needs:
// - *p_must_not_create - record must already exist (update-only or
//   replace-only flag, or any touch op present)
// - *p_record_level_replace - existing bins are discarded, not merged
// - *p_must_fetch_data - existing record data must be read from storage
// Returns 0 if the message is acceptable, otherwise an AS_ERR_* code.
int
write_master_policies(as_transaction* tr, bool* p_must_not_create,
		bool* p_record_level_replace, bool* p_must_fetch_data)
{
	// Shortcut pointers.
	as_msg* m = &tr->msgp->msg;
	as_namespace* ns = tr->rsv.ns;

	if (m->n_ops == 0) {
		cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: bin op(s) expected, none present " , ns->name);
		return AS_ERR_PARAMETER;
	}

	bool info1_get_all = (m->info1 & AS_MSG_INFO1_GET_ALL) != 0;
	bool respond_all_ops = (m->info2 & AS_MSG_INFO2_RESPOND_ALL_OPS) != 0;

	// Both update-only and replace-only require an existing record.
	bool must_not_create =
			(m->info3 & AS_MSG_INFO3_UPDATE_ONLY) != 0 ||
			(m->info3 & AS_MSG_INFO3_REPLACE_ONLY) != 0;

	bool record_level_replace =
			(m->info3 & AS_MSG_INFO3_CREATE_OR_REPLACE) != 0 ||
			(m->info3 & AS_MSG_INFO3_REPLACE_ONLY) != 0;

	bool single_bin_write_first = false;
	bool has_read_op = false;
	bool has_read_all_op = false;
	bool generates_response_bin = false;

	// Loop over ops to check and modify flags.
	as_msg_op* op = NULL;
	int i = 0;

	while ((op = as_msg_op_iterate(m, op, &i)) != NULL) {
		if (op->op == AS_MSG_OP_TOUCH) {
			if (record_level_replace) {
				cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: touch op can't have record-level replace flag " , ns->name);
				return AS_ERR_PARAMETER;
			}

			// A touch can only bump metadata of an existing record.
			must_not_create = true;
			continue;
		}

		if (ns->data_in_index &&
				! is_embedded_particle_type(op->particle_type) &&
				// Allow AS_PARTICLE_TYPE_NULL, although bin-delete operations
				// are not likely in single-bin configuration.
				op->particle_type != AS_PARTICLE_TYPE_NULL) {
			cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: can't write data type %u in data-in-index configuration " , ns->name, op->particle_type);
			return AS_ERR_INCOMPATIBLE_TYPE;
		}

		if (op->name_sz >= AS_BIN_NAME_MAX_SZ) {
			cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: bin name too long (%d) " , ns->name, op->name_sz);
			return AS_ERR_BIN_NAME;
		}

		if (op->op == AS_MSG_OP_WRITE) {
			if (op->particle_type == AS_PARTICLE_TYPE_NULL &&
					record_level_replace) {
				cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: bin delete can't have record-level replace flag " , ns->name);
				return AS_ERR_PARAMETER;
			}

			// First op overwrites the (single) bin - existing data need
			// not be fetched (see adjustment below).
			if (ns->single_bin && i == 0) {
				single_bin_write_first = true;
			}
		}
		else if (OP_IS_MODIFY(op->op)) {
			if (record_level_replace) {
				cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: modify op can't have record-level replace flag " , ns->name);
				return AS_ERR_PARAMETER;
			}
		}
		else if (op->op == AS_MSG_OP_DELETE_ALL) {
			if (record_level_replace) {
				cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: delete-all op can't have record-level replace flag " , ns->name);
				return AS_ERR_PARAMETER;
			}

			// Could forbid multiple delete-alls and delete-all being first op
			// (should use replace), but these are nonsensical, not unworkable.
		}
		else if (op_is_read_all(op, m)) {
			if (respond_all_ops) {
				cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: read-all op can't have respond-all-ops flag " , ns->name);
				return AS_ERR_PARAMETER;
			}

			if (has_read_all_op) {
				cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: can't have more than one read-all op " , ns->name);
				return AS_ERR_PARAMETER;
			}

			has_read_op = true;
			has_read_all_op = true;
		}
		else if (op->op == AS_MSG_OP_READ) {
			has_read_op = true;
			generates_response_bin = true;
		}
		else if (op->op == AS_MSG_OP_BITS_MODIFY) {
			if (record_level_replace) {
				cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: bits modify op can't have record-level replace flag " , ns->name);
				return AS_ERR_PARAMETER;
			}
		}
		else if (op->op == AS_MSG_OP_BITS_READ) {
			has_read_op = true;
			generates_response_bin = true;
		}
		else if (op->op == AS_MSG_OP_CDT_MODIFY) {
			if (record_level_replace) {
				cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: cdt modify op can't have record-level replace flag " , ns->name);
				return AS_ERR_PARAMETER;
			}

			generates_response_bin = true; // CDT modify may generate a response bin
		}
		else if (op->op == AS_MSG_OP_CDT_READ) {
			has_read_op = true;
			generates_response_bin = true;
		}
	}

	// Cross-op consistency checks between ops seen and message flags.

	if (has_read_op && (m->info1 & AS_MSG_INFO1_READ) == 0) {
		cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: has read op but read flag not set " , ns->name);
		return AS_ERR_PARAMETER;
	}

	if (has_read_all_op && generates_response_bin) {
		cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: read-all op can't mix with ops that generate response bins " , ns->name);
		return AS_ERR_PARAMETER;
	}

	if (info1_get_all && ! has_read_all_op) {
		cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: get-all flag set with no read-all op " , ns->name);
		return AS_ERR_PARAMETER;
	}

	bool must_fetch_data = ! record_level_replace;
	// Multi-bin case may modify this to force fetch if there's a sindex.

	if (single_bin_write_first && must_fetch_data) {
		must_fetch_data = false;
	}

	*p_must_not_create = must_not_create;
	*p_record_level_replace = record_level_replace;
	*p_must_fetch_data = must_fetch_data;

	return 0;
}
1106 | |
1107 | |
1108 | bool |
1109 | check_msg_set_name(as_transaction* tr, const char* set_name) |
1110 | { |
1111 | as_msg_field* f = as_transaction_has_set(tr) ? |
1112 | as_msg_field_get(&tr->msgp->msg, AS_MSG_FIELD_TYPE_SET) : NULL; |
1113 | |
1114 | if (! f || as_msg_field_get_value_sz(f) == 0) { |
1115 | if (set_name) { |
1116 | cf_warning_digest(AS_RW, &tr->keyd, "overwriting record in set '%s' but msg has no set name " , |
1117 | set_name); |
1118 | } |
1119 | |
1120 | return true; |
1121 | } |
1122 | |
1123 | size_t msg_set_name_len = as_msg_field_get_value_sz(f); |
1124 | |
1125 | if (! set_name || |
1126 | strncmp(set_name, (const char*)f->data, msg_set_name_len) != 0 || |
1127 | set_name[msg_set_name_len] != 0) { |
1128 | CF_ZSTR_DEFINE(msg_set_name, AS_SET_NAME_MAX_SIZE + 4, f->data, |
1129 | msg_set_name_len); |
1130 | |
1131 | cf_warning_digest(AS_RW, &tr->keyd, "overwriting record in set '%s' but msg has different set name '%s' " , |
1132 | set_name ? set_name : "(null)" , msg_set_name); |
1133 | return false; |
1134 | } |
1135 | |
1136 | return true; |
1137 | } |
1138 | |
1139 | |
1140 | int |
1141 | iops_predexp_filter_meta(const as_transaction* tr, const as_record* r, |
1142 | predexp_eval_t** predexp) |
1143 | { |
1144 | *predexp = tr->from.iops_orig->predexp; |
1145 | |
1146 | if (*predexp == NULL) { |
1147 | return AS_OK; |
1148 | } |
1149 | |
1150 | predexp_args_t predargs = { .ns = tr->rsv.ns, .md = (as_record*)r }; |
1151 | predexp_retval_t predrv = predexp_matches_metadata(*predexp, &predargs); |
1152 | |
1153 | if (predrv == PREDEXP_UNKNOWN) { |
1154 | return AS_OK; // caller must later check bins using *predexp |
1155 | } |
1156 | // else - caller will not need to apply filter later. |
1157 | |
1158 | *predexp = NULL; |
1159 | |
1160 | return predrv == PREDEXP_TRUE ? AS_OK : AS_ERR_FILTERED_OUT; |
1161 | } |
1162 | |
1163 | |
1164 | //========================================================== |
1165 | // write_master() splits based on configuration - |
1166 | // data-in-memory & single-bin. |
1167 | // |
1168 | // These handle the bin operations part of write_master() |
1169 | // which are very different per configuration. |
1170 | // |
1171 | |
// Bin-ops phase of write_master() for data-in-memory, single-bin namespaces.
// Applies the message's bin ops to the record's single embedded bin and
// writes the result to storage. On any failure, unwinds index metadata and
// the bin so the record is left exactly as it was. Returns 0 on success,
// an AS_ERR_* code, or the negated storage-write error.
int
write_master_dim_single_bin(as_transaction* tr, as_storage_rd* rd,
		rw_request* rw, bool* is_delete, xdr_dirty_bins* dirty_bins)
{
	// Shortcut pointers.
	as_msg* m = &tr->msgp->msg;
	as_namespace* ns = tr->rsv.ns;
	as_record* r = rd->r;

	rd->n_bins = 1;

	// Set rd->bins!
	// For data-in-memory:
	// - if just created record - sets rd->bins to empty bin embedded in index
	// - otherwise - sets rd->bins to existing embedded bin
	as_storage_rd_load_bins(rd, NULL);

	// For memory accounting, note current usage.
	uint64_t memory_bytes = 0;

	if (as_bin_inuse(rd->bins)) {
		memory_bytes = as_storage_record_get_n_bytes_memory(rd);
	}

	//------------------------------------------------------
	// Copy existing bin into old_bin to enable unwinding.
	//

	uint32_t n_old_bins = as_bin_inuse(rd->bins) ? 1 : 0;
	as_bin old_bin;

	as_single_bin_copy(&old_bin, rd->bins);

	// Collect bins (old or intermediate versions) to destroy on cleanup.
	// VLA - at most one bin per op.
	as_bin cleanup_bins[m->n_ops];
	uint32_t n_cleanup_bins = 0;

	//------------------------------------------------------
	// Apply changes to metadata in as_index needed for
	// response, pickling, and writing.
	//

	index_metadata old_metadata;

	write_master_update_index_metadata(tr, &old_metadata, r);

	//------------------------------------------------------
	// Loop over bin ops to affect new bin space, creating
	// the new record bin to write.
	//

	uint32_t n_new_bins = 0;
	int result = write_master_bin_ops(tr, rd, NULL, cleanup_bins,
			&n_cleanup_bins, &rw->response_db, &n_new_bins, dirty_bins);

	if (result != 0) {
		write_master_index_metadata_unwind(&old_metadata, r);
		write_master_dim_single_bin_unwind(&old_bin, rd->bins, cleanup_bins, n_cleanup_bins);
		return result;
	}

	//------------------------------------------------------
	// Created the new bin to write.
	//

	// No bin left - this write is effectively a delete.
	if (n_new_bins == 0) {
		// Deleting a record that was never there is an error.
		if (n_old_bins == 0) {
			write_master_index_metadata_unwind(&old_metadata, r);
			write_master_dim_single_bin_unwind(&old_bin, rd->bins, cleanup_bins, n_cleanup_bins);
			return AS_ERR_NOT_FOUND;
		}

		if (! validate_delete_durability(tr)) {
			write_master_index_metadata_unwind(&old_metadata, r);
			write_master_dim_single_bin_unwind(&old_bin, rd->bins, cleanup_bins, n_cleanup_bins);
			return AS_ERR_FORBIDDEN;
		}

		*is_delete = true;
	}

	//------------------------------------------------------
	// Write the record to storage.
	//

	if ((result = as_storage_record_write(rd)) < 0) {
		cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: failed as_storage_record_write() " , ns->name);
		write_master_index_metadata_unwind(&old_metadata, r);
		write_master_dim_single_bin_unwind(&old_bin, rd->bins, cleanup_bins, n_cleanup_bins);
		return -result;
	}

	pickle_all(rd, rw);

	//------------------------------------------------------
	// Cleanup - destroy relevant bins, can't unwind after.
	//

	destroy_stack_bins(cleanup_bins, n_cleanup_bins);

	as_storage_record_adjust_mem_stats(rd, memory_bytes);

	return 0;
}
1276 | |
1277 | |
// Bin-ops phase of write_master() for data-in-memory, multi-bin namespaces.
// Builds the new bin array on the stack, applies the bin ops, writes to
// storage, adjusts the secondary index, then installs a freshly allocated
// as_bin_space on the index element. On any failure before the storage
// write succeeds, unwinds index metadata and bins to their prior state.
// Returns 0 on success, an AS_ERR_* code, or the negated storage error.
int
write_master_dim(as_transaction* tr, as_storage_rd* rd,
		bool record_level_replace, rw_request* rw, bool* is_delete,
		xdr_dirty_bins* dirty_bins)
{
	// Shortcut pointers.
	as_msg* m = &tr->msgp->msg;
	as_namespace* ns = tr->rsv.ns;
	as_record* r = rd->r;

	// Set rd->n_bins!
	// For data-in-memory - number of bins in existing record.
	as_storage_rd_load_n_bins(rd);

	// Set rd->bins!
	// For data-in-memory:
	// - if just created record - sets rd->bins to NULL
	// - otherwise - sets rd->bins to existing (already populated) bins array
	as_storage_rd_load_bins(rd, NULL);

	// For memory accounting, note current usage.
	uint64_t memory_bytes = as_storage_record_get_n_bytes_memory(rd);

	//------------------------------------------------------
	// Copy existing bins to new space, and keep old bins
	// intact for sindex adjustment and so it's possible to
	// unwind on failure.
	//

	uint32_t n_old_bins = (uint32_t)rd->n_bins;
	uint32_t n_new_bins = n_old_bins + m->n_ops; // can't be more than this

	size_t old_bins_size = n_old_bins * sizeof(as_bin);
	size_t new_bins_size = n_new_bins * sizeof(as_bin);

	// Stack VLA sized for the worst case - adjusted down after the ops run.
	as_bin* old_bins = rd->bins;
	as_bin new_bins[n_new_bins];

	if (old_bins_size == 0 || record_level_replace) {
		// Replace (or nothing to copy) - start from all-empty bins.
		memset(new_bins, 0, new_bins_size);
	}
	else {
		memcpy(new_bins, old_bins, old_bins_size);
		memset(new_bins + n_old_bins, 0, new_bins_size - old_bins_size);
	}

	rd->n_bins = (uint16_t)n_new_bins;
	rd->bins = new_bins;

	// Collect bins (old or intermediate versions) to destroy on cleanup.
	as_bin cleanup_bins[m->n_ops];
	uint32_t n_cleanup_bins = 0;

	//------------------------------------------------------
	// Apply changes to metadata in as_index needed for
	// response, pickling, and writing.
	//

	index_metadata old_metadata;

	write_master_update_index_metadata(tr, &old_metadata, r);

	//------------------------------------------------------
	// Loop over bin ops to affect new bin space, creating
	// the new record bins to write.
	//

	// Note - n_new_bins is overwritten with the actual final bin count.
	int result = write_master_bin_ops(tr, rd, NULL, cleanup_bins,
			&n_cleanup_bins, &rw->response_db, &n_new_bins, dirty_bins);

	if (result != 0) {
		write_master_index_metadata_unwind(&old_metadata, r);
		write_master_dim_unwind(old_bins, n_old_bins, new_bins, n_new_bins, cleanup_bins, n_cleanup_bins);
		return result;
	}

	//------------------------------------------------------
	// Created the new bins to write.
	//

	as_bin_space* new_bin_space = NULL;

	// Adjust - the actual number of new bins.
	rd->n_bins = n_new_bins;

	if (n_new_bins != 0) {
		// Allocate the bin space the index element will own - filled out
		// only after the storage write succeeds.
		new_bins_size = n_new_bins * sizeof(as_bin);
		new_bin_space = (as_bin_space*)
				cf_malloc_ns(sizeof(as_bin_space) + new_bins_size);
	}
	else {
		// No bins left - this write is effectively a delete.
		if (n_old_bins == 0) {
			write_master_index_metadata_unwind(&old_metadata, r);
			write_master_dim_unwind(old_bins, n_old_bins, new_bins, n_new_bins, cleanup_bins, n_cleanup_bins);
			return AS_ERR_NOT_FOUND;
		}

		if (! validate_delete_durability(tr)) {
			write_master_index_metadata_unwind(&old_metadata, r);
			write_master_dim_unwind(old_bins, n_old_bins, new_bins, n_new_bins, cleanup_bins, n_cleanup_bins);
			return AS_ERR_FORBIDDEN;
		}

		*is_delete = true;
	}

	//------------------------------------------------------
	// Write the record to storage.
	//

	if ((result = as_storage_record_write(rd)) < 0) {
		cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: failed as_storage_record_write() " , ns->name);

		if (new_bin_space) {
			cf_free(new_bin_space);
		}

		write_master_index_metadata_unwind(&old_metadata, r);
		write_master_dim_unwind(old_bins, n_old_bins, new_bins, n_new_bins, cleanup_bins, n_cleanup_bins);
		return -result;
	}

	pickle_all(rd, rw);

	//------------------------------------------------------
	// Success - adjust sindex, looking at old and new bins.
	//

	if (record_has_sindex(r, ns) &&
			write_sindex_update(ns, rd->set_name, &tr->keyd, old_bins,
					n_old_bins, new_bins, n_new_bins)) {
		tr->flags |= AS_TRANSACTION_FLAG_SINDEX_TOUCHED;
	}

	//------------------------------------------------------
	// Cleanup - destroy relevant bins, can't unwind after.
	//

	if (record_level_replace) {
		destroy_stack_bins(old_bins, n_old_bins);
	}

	destroy_stack_bins(cleanup_bins, n_cleanup_bins);

	//------------------------------------------------------
	// Final changes to record data in as_index.
	//

	// Fill out new_bin_space.
	if (n_new_bins != 0) {
		new_bin_space->n_bins = rd->n_bins;
		memcpy((void*)new_bin_space->bins, new_bins, new_bins_size);
	}

	// Swizzle the index element's as_bin_space pointer.
	as_bin_space* old_bin_space = as_index_get_bin_space(r);

	if (old_bin_space) {
		cf_free(old_bin_space);
	}

	as_index_set_bin_space(r, new_bin_space);

	// Accommodate a new stored key - wasn't needed for pickling and writing.
	if (r->key_stored == 0 && rd->key) {
		as_record_allocate_key(r, rd->key, rd->key_size);
		r->key_stored = 1;
	}

	as_storage_record_adjust_mem_stats(rd, memory_bytes);

	return 0;
}
1451 | |
1452 | |
// Bin-ops phase of write_master() for non-data-in-memory (SSD), single-bin
// namespaces. Optionally reads the existing record off device into a stack
// bin, applies the bin ops (particles staged in a linked-list buffer), and
// writes the result to storage. On any failure, unwinds index metadata.
// Returns 0 on success, an AS_ERR_* code, or the negated storage error.
int
write_master_ssd_single_bin(as_transaction* tr, as_storage_rd* rd,
		bool must_fetch_data, rw_request* rw, bool* is_delete,
		xdr_dirty_bins* dirty_bins)
{
	// Shortcut pointers.
	as_namespace* ns = tr->rsv.ns;
	as_record* r = rd->r;

	rd->ignore_record_on_device = ! must_fetch_data;
	rd->n_bins = 1;

	as_bin stack_bin;

	// Set rd->bins!
	// For non-data-in-memory:
	// - if just created record, or must_fetch_data is false - sets rd->bins to
	//		empty stack_bin
	// - otherwise - sets rd->bins to stack_bin, reads existing record off
	//		device and populates bin (including particle pointer into block
	//		buffer)
	int result = as_storage_rd_load_bins(rd, &stack_bin);

	if (result < 0) {
		cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: failed as_storage_rd_load_bins()" , ns->name);
		return -result;
	}

	uint32_t n_old_bins = as_bin_inuse(rd->bins) ? 1 : 0;

	//------------------------------------------------------
	// Apply changes to metadata in as_index needed for
	// response, pickling, and writing.
	//

	index_metadata old_metadata;

	write_master_update_index_metadata(tr, &old_metadata, r);

	//------------------------------------------------------
	// Loop over bin ops to affect new bin space, creating
	// the new record bin to write.
	//

	// Staging buffer for new particle data - freed on every exit path.
	cf_ll_buf_define(particles_llb, STACK_PARTICLES_SIZE);

	uint32_t n_new_bins = 0;

	if ((result = write_master_bin_ops(tr, rd, &particles_llb, NULL, NULL,
			&rw->response_db, &n_new_bins, dirty_bins)) != 0) {
		cf_ll_buf_free(&particles_llb);
		write_master_index_metadata_unwind(&old_metadata, r);
		return result;
	}

	//------------------------------------------------------
	// Created the new bin to write.
	//

	// No bin left - this write is effectively a delete.
	if (n_new_bins == 0) {
		// Deleting a record that was never there is an error.
		if (n_old_bins == 0) {
			cf_ll_buf_free(&particles_llb);
			write_master_index_metadata_unwind(&old_metadata, r);
			return AS_ERR_NOT_FOUND;
		}

		if (! validate_delete_durability(tr)) {
			cf_ll_buf_free(&particles_llb);
			write_master_index_metadata_unwind(&old_metadata, r);
			return AS_ERR_FORBIDDEN;
		}

		*is_delete = true;
	}

	//------------------------------------------------------
	// Write the record to storage.
	//

	if ((result = as_storage_record_write(rd)) < 0) {
		cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: failed as_storage_record_write() " , ns->name);
		cf_ll_buf_free(&particles_llb);
		write_master_index_metadata_unwind(&old_metadata, r);
		return -result;
	}

	pickle_all(rd, rw);

	//------------------------------------------------------
	// Final changes to record data in as_index.
	//

	// Accommodate a new stored key - wasn't needed for pickling and writing.
	if (r->key_stored == 0 && rd->key) {
		r->key_stored = 1;
	}

	cf_ll_buf_free(&particles_llb);

	return 0;
}
1554 | |
1555 | |
// Bin-ops phase of write_master() for non-data-in-memory (SSD), multi-bin
// namespaces. Reads the existing record off device when needed (always when
// a sindex exists, even for replace), applies the bin ops into stack bin
// arrays, writes to storage, then adjusts the secondary index. On any
// failure, unwinds index metadata. Returns 0 on success, an AS_ERR_* code,
// or the negated storage error.
int
write_master_ssd(as_transaction* tr, as_storage_rd* rd, bool must_fetch_data,
		bool record_level_replace, rw_request* rw, bool* is_delete,
		xdr_dirty_bins* dirty_bins)
{
	// Shortcut pointers.
	as_msg* m = &tr->msgp->msg;
	as_namespace* ns = tr->rsv.ns;
	as_record* r = rd->r;
	bool has_sindex = record_has_sindex(r, ns);

	// For sindex, we must read existing record even if replacing.
	if (! must_fetch_data) {
		must_fetch_data = has_sindex;
	}

	rd->ignore_record_on_device = ! must_fetch_data;

	// Set rd->n_bins!
	// For non-data-in-memory:
	// - if just created record, or must_fetch_data is false - 0
	// - otherwise - number of bins in existing record
	int result = as_storage_rd_load_n_bins(rd);

	if (result < 0) {
		cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: failed as_storage_rd_load_n_bins()" , ns->name);
		return -result;
	}

	uint32_t n_old_bins = (uint32_t)rd->n_bins;
	uint32_t n_new_bins = n_old_bins + m->n_ops; // can't be more than this

	// Needed for as_storage_rd_load_bins() to clear all unused bins.
	rd->n_bins = (uint16_t)n_new_bins;

	// Stack space for resulting record's bins.
	as_bin old_bins[n_old_bins];
	as_bin new_bins[n_new_bins];

	// Set rd->bins!
	// For non-data-in-memory:
	// - if just created record, or must_fetch_data is false - sets rd->bins to
	//		empty new_bins
	// - otherwise - sets rd->bins to new_bins, reads existing record off device
	//		and populates bins (including particle pointers into block buffer)
	if ((result = as_storage_rd_load_bins(rd, new_bins)) < 0) {
		cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: failed as_storage_rd_load_bins()" , ns->name);
		return -result;
	}

	//------------------------------------------------------
	// Copy old bins (if any) - which are currently in new
	// bins array - to old bins array, for sindex purposes.
	//

	if (has_sindex && n_old_bins != 0) {
		memcpy(old_bins, new_bins, n_old_bins * sizeof(as_bin));

		// If it's a replace, clear the new bins array.
		if (record_level_replace) {
			as_bin_set_all_empty(rd);
		}
	}

	//------------------------------------------------------
	// Apply changes to metadata in as_index needed for
	// response, pickling, and writing.
	//

	index_metadata old_metadata;

	write_master_update_index_metadata(tr, &old_metadata, r);

	//------------------------------------------------------
	// Loop over bin ops to affect new bin space, creating
	// the new record bins to write.
	//

	// Staging buffer for new particle data - freed on every exit path.
	cf_ll_buf_define(particles_llb, STACK_PARTICLES_SIZE);

	// Note - n_new_bins is overwritten with the actual final bin count.
	if ((result = write_master_bin_ops(tr, rd, &particles_llb, NULL, NULL,
			&rw->response_db, &n_new_bins, dirty_bins)) != 0) {
		cf_ll_buf_free(&particles_llb);
		write_master_index_metadata_unwind(&old_metadata, r);
		return result;
	}

	//------------------------------------------------------
	// Created the new bins to write.
	//

	// Adjust - the actual number of new bins.
	rd->n_bins = n_new_bins;

	// No bins left - this write is effectively a delete.
	if (n_new_bins == 0) {
		// Deleting a record that was never there is an error.
		if (n_old_bins == 0) {
			cf_ll_buf_free(&particles_llb);
			write_master_index_metadata_unwind(&old_metadata, r);
			return AS_ERR_NOT_FOUND;
		}

		if (! validate_delete_durability(tr)) {
			cf_ll_buf_free(&particles_llb);
			write_master_index_metadata_unwind(&old_metadata, r);
			return AS_ERR_FORBIDDEN;
		}

		*is_delete = true;
	}

	//------------------------------------------------------
	// Write the record to storage.
	//

	if ((result = as_storage_record_write(rd)) < 0) {
		cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: failed as_storage_record_write() " , ns->name);
		cf_ll_buf_free(&particles_llb);
		write_master_index_metadata_unwind(&old_metadata, r);
		return -result;
	}

	pickle_all(rd, rw);

	//------------------------------------------------------
	// Success - adjust sindex, looking at old and new bins.
	//

	if (has_sindex &&
			write_sindex_update(ns, rd->set_name, &tr->keyd, old_bins,
					n_old_bins, new_bins, n_new_bins)) {
		tr->flags |= AS_TRANSACTION_FLAG_SINDEX_TOUCHED;
	}

	//------------------------------------------------------
	// Final changes to record data in as_index.
	//

	// Accommodate a new stored key - wasn't needed for pickling and writing.
	if (r->key_stored == 0 && rd->key) {
		r->key_stored = 1;
	}

	cf_ll_buf_free(&particles_llb);

	return 0;
}
1702 | |
1703 | |
1704 | //========================================================== |
1705 | // write_master() - apply record updates. |
1706 | // |
1707 | |
1708 | void |
1709 | write_master_update_index_metadata(as_transaction* tr, index_metadata* old, |
1710 | as_record* r) |
1711 | { |
1712 | old->void_time = r->void_time; |
1713 | old->last_update_time = r->last_update_time; |
1714 | old->generation = r->generation; |
1715 | |
1716 | update_metadata_in_index(tr, r); |
1717 | } |
1718 | |
1719 | |
// Run all bin ops against rd's bin space (via write_master_bin_ops_loop),
// then, if any ops produced response content, build the client response
// message and stash it in db for later sending. Sets *p_n_final_bins to
// the number of bins in use after the ops. Returns 0 on success or the
// error code from the ops loop.
int
write_master_bin_ops(as_transaction* tr, as_storage_rd* rd,
		cf_ll_buf* particles_llb, as_bin* cleanup_bins,
		uint32_t* p_n_cleanup_bins, cf_dyn_buf* db, uint32_t* p_n_final_bins,
		xdr_dirty_bins* dirty_bins)
{
	// Shortcut pointers.
	as_msg* m = &tr->msgp->msg;
	as_namespace* ns = tr->rsv.ns;
	as_record* r = rd->r;
	bool has_read_all_op = (m->info1 & AS_MSG_INFO1_GET_ALL) != 0;

	// Stack VLAs - a read-all op may respond with every bin in the record,
	// otherwise at most one response bin per op.
	as_msg_op* ops[m->n_ops];
	as_bin response_bins[has_read_all_op ? rd->n_bins : m->n_ops];
	as_bin result_bins[m->n_ops];

	uint32_t n_response_bins = 0;
	uint32_t n_result_bins = 0;

	int result = write_master_bin_ops_loop(tr, rd, ops, response_bins,
			&n_response_bins, result_bins, &n_result_bins, particles_llb,
			cleanup_bins, p_n_cleanup_bins, dirty_bins);

	if (result != 0) {
		destroy_stack_bins(result_bins, n_result_bins);
		return result;
	}

	*p_n_final_bins = as_bin_inuse_count(rd);

	if (n_response_bins == 0) {
		// If 'ordered-ops' flag was not set, and there were no read ops or CDT
		// ops with results, there's no response to build and send later.
		return 0;
	}

	// Build the parallel bin-pointer array the response builder expects -
	// NULL marks a response slot with no bin content.
	as_bin* bins[n_response_bins];

	for (uint32_t i = 0; i < n_response_bins; i++) {
		as_bin* b = &response_bins[i];

		bins[i] = as_bin_inuse(b) ? b : NULL;
	}

	uint32_t generation = r->generation;
	uint32_t void_time = r->void_time;

	// Deletes don't return metadata.
	if (*p_n_final_bins == 0) {
		generation = 0;
		void_time = 0;
	}

	// NOTE(review): result is not NULL-checked - presumably allocation
	// failures abort inside as_msg_make_response_msg(); confirm.
	size_t msg_sz = 0;
	uint8_t* msgp = (uint8_t*)as_msg_make_response_msg(AS_OK, generation,
			void_time, has_read_all_op ? NULL : ops, bins,
			(uint16_t)n_response_bins, ns, NULL, &msg_sz,
			as_transaction_trid(tr));

	destroy_stack_bins(result_bins, n_result_bins);

	// Stash the message, to be sent later - db takes ownership of msgp.
	db->buf = msgp;
	db->is_stack = false;
	db->alloc_sz = msg_sz;
	db->used_sz = msg_sz;

	return 0;
}
1789 | |
1790 | |
1791 | int |
1792 | write_master_bin_ops_loop(as_transaction* tr, as_storage_rd* rd, |
1793 | as_msg_op** ops, as_bin* response_bins, uint32_t* p_n_response_bins, |
1794 | as_bin* result_bins, uint32_t* p_n_result_bins, |
1795 | cf_ll_buf* particles_llb, as_bin* cleanup_bins, |
1796 | uint32_t* p_n_cleanup_bins, xdr_dirty_bins* dirty_bins) |
1797 | { |
1798 | // Shortcut pointers. |
1799 | as_msg* m = &tr->msgp->msg; |
1800 | as_namespace* ns = tr->rsv.ns; |
1801 | bool respond_all_ops = (m->info2 & AS_MSG_INFO2_RESPOND_ALL_OPS) != 0; |
1802 | |
1803 | int result; |
1804 | |
1805 | as_msg_op* op = NULL; |
1806 | int i = 0; |
1807 | |
1808 | while ((op = as_msg_op_iterate(m, op, &i)) != NULL) { |
1809 | if (op->op == AS_MSG_OP_TOUCH) { |
1810 | continue; |
1811 | } |
1812 | |
1813 | if (op->op == AS_MSG_OP_WRITE) { |
1814 | // AS_PARTICLE_TYPE_NULL means delete the bin. |
1815 | // TODO - should this even be allowed for single-bin? |
1816 | if (op->particle_type == AS_PARTICLE_TYPE_NULL) { |
1817 | int32_t j = as_bin_get_index_from_buf(rd, op->name, op->name_sz); |
1818 | |
1819 | if (j != -1) { |
1820 | if (ns->storage_data_in_memory) { |
1821 | // Double copy necessary for single-bin, but doing it |
1822 | // generally for code simplicity. |
1823 | as_bin cleanup_bin; |
1824 | as_bin_copy(ns, &cleanup_bin, &rd->bins[j]); |
1825 | |
1826 | append_bin_to_destroy(&cleanup_bin, cleanup_bins, p_n_cleanup_bins); |
1827 | } |
1828 | |
1829 | as_bin_set_empty_shift(rd, j); |
1830 | xdr_fill_dirty_bins(dirty_bins); |
1831 | } |
1832 | } |
1833 | // It's a regular bin write. |
1834 | else { |
1835 | as_bin* b = as_bin_get_or_create_from_buf(rd, op->name, op->name_sz, &result); |
1836 | |
1837 | if (! b) { |
1838 | return result; |
1839 | } |
1840 | |
1841 | if (ns->storage_data_in_memory) { |
1842 | as_bin cleanup_bin; |
1843 | as_bin_copy(ns, &cleanup_bin, b); |
1844 | |
1845 | if ((result = as_bin_particle_alloc_from_client(b, op)) < 0) { |
1846 | cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: failed as_bin_particle_alloc_from_client() " , ns->name); |
1847 | return -result; |
1848 | } |
1849 | |
1850 | append_bin_to_destroy(&cleanup_bin, cleanup_bins, p_n_cleanup_bins); |
1851 | } |
1852 | else { |
1853 | if ((result = as_bin_particle_stack_from_client(b, particles_llb, op)) < 0) { |
1854 | cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: failed as_bin_particle_stack_from_client() " , ns->name); |
1855 | return -result; |
1856 | } |
1857 | } |
1858 | |
1859 | xdr_add_dirty_bin(ns, dirty_bins, (const char*)op->name, op->name_sz); |
1860 | } |
1861 | |
1862 | if (respond_all_ops) { |
1863 | ops[*p_n_response_bins] = op; |
1864 | as_bin_set_empty(&response_bins[(*p_n_response_bins)++]); |
1865 | } |
1866 | } |
1867 | // Modify an existing bin value. |
1868 | else if (OP_IS_MODIFY(op->op)) { |
1869 | as_bin* b = as_bin_get_or_create_from_buf(rd, op->name, op->name_sz, &result); |
1870 | |
1871 | if (! b) { |
1872 | return result; |
1873 | } |
1874 | |
1875 | if (ns->storage_data_in_memory) { |
1876 | as_bin cleanup_bin; |
1877 | as_bin_copy(ns, &cleanup_bin, b); |
1878 | |
1879 | if ((result = as_bin_particle_alloc_modify_from_client(b, op)) < 0) { |
1880 | cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: failed as_bin_particle_alloc_modify_from_client() " , ns->name); |
1881 | return -result; |
1882 | } |
1883 | |
1884 | append_bin_to_destroy(&cleanup_bin, cleanup_bins, p_n_cleanup_bins); |
1885 | } |
1886 | else { |
1887 | if ((result = as_bin_particle_stack_modify_from_client(b, particles_llb, op)) < 0) { |
1888 | cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: failed as_bin_particle_stack_modify_from_client() " , ns->name); |
1889 | return -result; |
1890 | } |
1891 | } |
1892 | |
1893 | xdr_add_dirty_bin(ns, dirty_bins, (const char*)op->name, op->name_sz); |
1894 | |
1895 | if (respond_all_ops) { |
1896 | ops[*p_n_response_bins] = op; |
1897 | as_bin_set_empty(&response_bins[(*p_n_response_bins)++]); |
1898 | } |
1899 | } |
1900 | else if (op->op == AS_MSG_OP_DELETE_ALL) { |
1901 | if (ns->storage_data_in_memory) { |
1902 | for (uint16_t i = 0; i < rd->n_bins; i++) { |
1903 | as_bin* b = &rd->bins[i]; |
1904 | |
1905 | if (! as_bin_inuse(b)) { |
1906 | break; |
1907 | } |
1908 | |
1909 | // Double copy necessary for single-bin, but doing it |
1910 | // generally for code simplicity. |
1911 | as_bin cleanup_bin; |
1912 | as_bin_copy(ns, &cleanup_bin, b); |
1913 | |
1914 | append_bin_to_destroy(&cleanup_bin, cleanup_bins, p_n_cleanup_bins); |
1915 | } |
1916 | } |
1917 | |
1918 | as_bin_set_all_empty(rd); |
1919 | xdr_fill_dirty_bins(dirty_bins); |
1920 | |
1921 | if (respond_all_ops) { |
1922 | ops[*p_n_response_bins] = op; |
1923 | as_bin_set_empty(&response_bins[(*p_n_response_bins)++]); |
1924 | } |
1925 | } |
1926 | else if (op_is_read_all(op, m)) { |
1927 | for (uint16_t i = 0; i < rd->n_bins; i++) { |
1928 | as_bin* b = &rd->bins[i]; |
1929 | |
1930 | if (! as_bin_inuse(b)) { |
1931 | break; |
1932 | } |
1933 | |
1934 | // ops array will not be not used in this case. |
1935 | as_bin_copy(ns, &response_bins[(*p_n_response_bins)++], b); |
1936 | } |
1937 | } |
1938 | else if (op->op == AS_MSG_OP_READ) { |
1939 | as_bin* b = as_bin_get_from_buf(rd, op->name, op->name_sz); |
1940 | |
1941 | if (b) { |
1942 | ops[*p_n_response_bins] = op; |
1943 | as_bin_copy(ns, &response_bins[(*p_n_response_bins)++], b); |
1944 | } |
1945 | else if (respond_all_ops) { |
1946 | ops[*p_n_response_bins] = op; |
1947 | as_bin_set_empty(&response_bins[(*p_n_response_bins)++]); |
1948 | } |
1949 | } |
1950 | else if (op->op == AS_MSG_OP_BITS_MODIFY) { |
1951 | as_bin* b = as_bin_get_or_create_from_buf(rd, op->name, op->name_sz, &result); |
1952 | |
1953 | if (! b) { |
1954 | return result; |
1955 | } |
1956 | |
1957 | if (ns->storage_data_in_memory) { |
1958 | as_bin cleanup_bin; |
1959 | as_bin_copy(ns, &cleanup_bin, b); |
1960 | |
1961 | if ((result = as_bin_bits_alloc_modify_from_client(b, op)) < 0) { |
1962 | cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: failed as_bin_bits_alloc_modify_from_client() " , ns->name); |
1963 | return -result; |
1964 | } |
1965 | |
1966 | // Account for noop bits operations. Modifying non-mutable |
1967 | // particle contents in-place is still disallowed. |
1968 | if (cleanup_bin.particle != b->particle) { |
1969 | append_bin_to_destroy(&cleanup_bin, cleanup_bins, p_n_cleanup_bins); |
1970 | } |
1971 | } |
1972 | else { |
1973 | if ((result = as_bin_bits_stack_modify_from_client(b, particles_llb, op)) < 0) { |
1974 | cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: failed as_bin_bits_stack_modify_from_client() " , ns->name); |
1975 | return -result; |
1976 | } |
1977 | } |
1978 | |
1979 | xdr_add_dirty_bin(ns, dirty_bins, (const char*)op->name, op->name_sz); |
1980 | |
1981 | if (respond_all_ops) { |
1982 | ops[*p_n_response_bins] = op; |
1983 | as_bin_set_empty(&response_bins[(*p_n_response_bins)++]); |
1984 | } |
1985 | } |
1986 | else if (op->op == AS_MSG_OP_BITS_READ) { |
1987 | as_bin* b = as_bin_get_from_buf(rd, op->name, op->name_sz); |
1988 | |
1989 | if (b) { |
1990 | as_bin result_bin; |
1991 | as_bin_set_empty(&result_bin); |
1992 | |
1993 | if ((result = as_bin_bits_read_from_client(b, op, &result_bin)) < 0) { |
1994 | cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: failed as_bin_bits_read_from_client() " , ns->name); |
1995 | return -result; |
1996 | } |
1997 | |
1998 | ops[*p_n_response_bins] = op; |
1999 | response_bins[(*p_n_response_bins)++] = result_bin; |
2000 | append_bin_to_destroy(&result_bin, result_bins, p_n_result_bins); |
2001 | } |
2002 | else if (respond_all_ops) { |
2003 | ops[*p_n_response_bins] = op; |
2004 | as_bin_set_empty(&response_bins[(*p_n_response_bins)++]); |
2005 | } |
2006 | } |
2007 | else if (op->op == AS_MSG_OP_CDT_MODIFY) { |
2008 | as_bin* b = as_bin_get_or_create_from_buf(rd, op->name, op->name_sz, &result); |
2009 | |
2010 | if (! b) { |
2011 | return result; |
2012 | } |
2013 | |
2014 | as_bin result_bin; |
2015 | as_bin_set_empty(&result_bin); |
2016 | |
2017 | if (ns->storage_data_in_memory) { |
2018 | as_bin cleanup_bin; |
2019 | as_bin_copy(ns, &cleanup_bin, b); |
2020 | |
2021 | if ((result = as_bin_cdt_alloc_modify_from_client(b, op, &result_bin)) < 0) { |
2022 | cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: failed as_bin_cdt_alloc_modify_from_client() " , ns->name); |
2023 | return -result; |
2024 | } |
2025 | |
2026 | // Account for noop CDT operations. Modifying non-mutable |
2027 | // particle contents in-place is still disallowed. |
2028 | if (cleanup_bin.particle != b->particle) { |
2029 | append_bin_to_destroy(&cleanup_bin, cleanup_bins, p_n_cleanup_bins); |
2030 | } |
2031 | } |
2032 | else { |
2033 | if ((result = as_bin_cdt_stack_modify_from_client(b, particles_llb, op, &result_bin)) < 0) { |
2034 | cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: failed as_bin_cdt_stack_modify_from_client() " , ns->name); |
2035 | return -result; |
2036 | } |
2037 | } |
2038 | |
2039 | if (respond_all_ops || as_bin_inuse(&result_bin)) { |
2040 | ops[*p_n_response_bins] = op; |
2041 | response_bins[(*p_n_response_bins)++] = result_bin; |
2042 | append_bin_to_destroy(&result_bin, result_bins, p_n_result_bins); |
2043 | } |
2044 | |
2045 | if (! as_bin_inuse(b)) { |
2046 | // TODO - could do better than finding index from name. |
2047 | int32_t index = as_bin_get_index_from_buf(rd, op->name, op->name_sz); |
2048 | |
2049 | if (index >= 0) { |
2050 | as_bin_set_empty_shift(rd, (uint32_t)index); |
2051 | xdr_fill_dirty_bins(dirty_bins); |
2052 | } |
2053 | } |
2054 | else { |
2055 | xdr_add_dirty_bin(ns, dirty_bins, (const char*)op->name, op->name_sz); |
2056 | } |
2057 | } |
2058 | else if (op->op == AS_MSG_OP_CDT_READ) { |
2059 | as_bin* b = as_bin_get_from_buf(rd, op->name, op->name_sz); |
2060 | |
2061 | if (b) { |
2062 | as_bin result_bin; |
2063 | as_bin_set_empty(&result_bin); |
2064 | |
2065 | if ((result = as_bin_cdt_read_from_client(b, op, &result_bin)) < 0) { |
2066 | cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: failed as_bin_cdt_read_from_client() " , ns->name); |
2067 | return -result; |
2068 | } |
2069 | |
2070 | ops[*p_n_response_bins] = op; |
2071 | response_bins[(*p_n_response_bins)++] = result_bin; |
2072 | append_bin_to_destroy(&result_bin, result_bins, p_n_result_bins); |
2073 | } |
2074 | else if (respond_all_ops) { |
2075 | ops[*p_n_response_bins] = op; |
2076 | as_bin_set_empty(&response_bins[(*p_n_response_bins)++]); |
2077 | } |
2078 | } |
2079 | else { |
2080 | cf_warning_digest(AS_RW, &tr->keyd, "{%s} write_master: unknown bin op %u " , ns->name, op->op); |
2081 | return AS_ERR_PARAMETER; |
2082 | } |
2083 | } |
2084 | |
2085 | return 0; |
2086 | } |
2087 | |
2088 | |
2089 | //========================================================== |
2090 | // write_master() - unwind on failure or cleanup. |
2091 | // |
2092 | |
2093 | void |
2094 | write_master_index_metadata_unwind(index_metadata* old, as_record* r) |
2095 | { |
2096 | r->void_time = old->void_time; |
2097 | r->last_update_time = old->last_update_time; |
2098 | r->generation = old->generation; |
2099 | } |
2100 | |
2101 | |
2102 | void |
2103 | write_master_dim_single_bin_unwind(as_bin* old_bin, as_bin* new_bin, |
2104 | as_bin* cleanup_bins, uint32_t n_cleanup_bins) |
2105 | { |
2106 | as_particle* p_old = as_bin_get_particle(old_bin); |
2107 | |
2108 | if (as_bin_is_external_particle(new_bin) && new_bin->particle != p_old) { |
2109 | as_bin_particle_destroy(new_bin, true); |
2110 | } |
2111 | |
2112 | for (uint32_t i_cleanup = 0; i_cleanup < n_cleanup_bins; i_cleanup++) { |
2113 | as_bin* b_cleanup = &cleanup_bins[i_cleanup]; |
2114 | |
2115 | if (b_cleanup->particle != p_old) { |
2116 | as_bin_particle_destroy(b_cleanup, true); |
2117 | } |
2118 | } |
2119 | |
2120 | as_single_bin_copy(new_bin, old_bin); |
2121 | } |
2122 | |
2123 | |
2124 | void |
2125 | write_master_dim_unwind(as_bin* old_bins, uint32_t n_old_bins, as_bin* new_bins, |
2126 | uint32_t n_new_bins, as_bin* cleanup_bins, uint32_t n_cleanup_bins) |
2127 | { |
2128 | for (uint32_t i_new = 0; i_new < n_new_bins; i_new++) { |
2129 | as_bin* b_new = &new_bins[i_new]; |
2130 | |
2131 | if (! as_bin_inuse(b_new)) { |
2132 | break; |
2133 | } |
2134 | |
2135 | // Embedded particles have no-op destructors - skip loop over old bins. |
2136 | if (as_bin_is_embedded_particle(b_new)) { |
2137 | continue; |
2138 | } |
2139 | |
2140 | as_particle* p_new = b_new->particle; |
2141 | uint32_t i_old; |
2142 | |
2143 | for (i_old = 0; i_old < n_old_bins; i_old++) { |
2144 | as_bin* b_old = &old_bins[i_old]; |
2145 | |
2146 | if (b_new->id == b_old->id) { |
2147 | if (p_new != as_bin_get_particle(b_old)) { |
2148 | as_bin_particle_destroy(b_new, true); |
2149 | } |
2150 | |
2151 | break; |
2152 | } |
2153 | } |
2154 | |
2155 | if (i_old == n_old_bins) { |
2156 | as_bin_particle_destroy(b_new, true); |
2157 | } |
2158 | } |
2159 | |
2160 | for (uint32_t i_cleanup = 0; i_cleanup < n_cleanup_bins; i_cleanup++) { |
2161 | as_bin* b_cleanup = &cleanup_bins[i_cleanup]; |
2162 | as_particle* p_cleanup = b_cleanup->particle; |
2163 | uint32_t i_old; |
2164 | |
2165 | for (i_old = 0; i_old < n_old_bins; i_old++) { |
2166 | as_bin* b_old = &old_bins[i_old]; |
2167 | |
2168 | if (b_cleanup->id == b_old->id) { |
2169 | if (p_cleanup != as_bin_get_particle(b_old)) { |
2170 | as_bin_particle_destroy(b_cleanup, true); |
2171 | } |
2172 | |
2173 | break; |
2174 | } |
2175 | } |
2176 | |
2177 | if (i_old == n_old_bins) { |
2178 | as_bin_particle_destroy(b_cleanup, true); |
2179 | } |
2180 | } |
2181 | |
2182 | // The index element's as_bin_space pointer still points at old bins. |
2183 | } |
2184 | |