1 | /* |
2 | * rw_utils.c |
3 | * |
4 | * Copyright (C) 2016 Aerospike, Inc. |
5 | * |
6 | * Portions may be licensed to Aerospike, Inc. under one or more contributor |
7 | * license agreements. |
8 | * |
9 | * This program is free software: you can redistribute it and/or modify it under |
10 | * the terms of the GNU Affero General Public License as published by the Free |
11 | * Software Foundation, either version 3 of the License, or (at your option) any |
12 | * later version. |
13 | * |
14 | * This program is distributed in the hope that it will be useful, but WITHOUT |
15 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
16 | * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more |
17 | * details. |
18 | * |
19 | * You should have received a copy of the GNU Affero General Public License |
20 | * along with this program. If not, see http://www.gnu.org/licenses/ |
21 | */ |
22 | |
23 | //========================================================== |
24 | // Includes. |
25 | // |
26 | |
27 | #include "transaction/rw_utils.h" |
28 | |
29 | #include <stdbool.h> |
30 | #include <stddef.h> |
31 | #include <stdint.h> |
32 | #include <string.h> |
33 | |
34 | #include "citrusleaf/cf_atomic.h" // xdr_allows_write |
35 | #include "citrusleaf/cf_clock.h" |
36 | #include "citrusleaf/cf_digest.h" |
37 | |
38 | #include "fault.h" |
39 | #include "msg.h" |
40 | |
41 | #include "base/cfg.h" // xdr_allows_write |
42 | #include "base/datamodel.h" |
43 | #include "base/index.h" |
44 | #include "base/predexp.h" |
45 | #include "base/proto.h" // xdr_allows_write |
46 | #include "base/secondary_index.h" |
47 | #include "base/transaction.h" |
48 | #include "base/xdr_serverside.h" |
49 | #include "fabric/fabric.h" |
50 | #include "storage/storage.h" |
51 | #include "transaction/rw_request.h" |
52 | |
53 | |
54 | //========================================================== |
55 | // Public API. |
56 | // |
57 | |
58 | // TODO - really? we can't hide this behind an XDR stub? |
59 | bool |
60 | xdr_allows_write(as_transaction* tr) |
61 | { |
62 | if (as_transaction_is_xdr(tr)) { |
63 | if (tr->rsv.ns->ns_allow_xdr_writes) { |
64 | return true; |
65 | } |
66 | } |
67 | else { |
68 | if (tr->rsv.ns->ns_allow_nonxdr_writes) { |
69 | return true; |
70 | } |
71 | } |
72 | |
73 | cf_atomic_int_incr(&tr->rsv.ns->n_fail_xdr_forbidden); |
74 | |
75 | return false; |
76 | } |
77 | |
78 | |
79 | void |
80 | send_rw_messages(rw_request* rw) |
81 | { |
82 | for (uint32_t i = 0; i < rw->n_dest_nodes; i++) { |
83 | if (rw->dest_complete[i]) { |
84 | continue; |
85 | } |
86 | |
87 | msg_incr_ref(rw->dest_msg); |
88 | |
89 | if (as_fabric_send(rw->dest_nodes[i], rw->dest_msg, |
90 | AS_FABRIC_CHANNEL_RW) != AS_FABRIC_SUCCESS) { |
91 | as_fabric_msg_put(rw->dest_msg); |
92 | rw->xmit_ms = 0; // force a retransmit on next cycle |
93 | } |
94 | } |
95 | } |
96 | |
97 | |
98 | void |
99 | send_rw_messages_forget(rw_request* rw) |
100 | { |
101 | for (uint32_t i = 0; i < rw->n_dest_nodes; i++) { |
102 | msg_incr_ref(rw->dest_msg); |
103 | |
104 | if (as_fabric_send(rw->dest_nodes[i], rw->dest_msg, |
105 | AS_FABRIC_CHANNEL_RW) != AS_FABRIC_SUCCESS) { |
106 | as_fabric_msg_put(rw->dest_msg); |
107 | } |
108 | } |
109 | } |
110 | |
111 | |
112 | bool |
113 | set_name_check(const as_transaction* tr, const as_record* r) |
114 | { |
115 | if (! as_transaction_has_set(tr)) { |
116 | return true; // allowed to not send set name in read or delete message |
117 | } |
118 | |
119 | as_msg_field* f = as_msg_field_get(&tr->msgp->msg, AS_MSG_FIELD_TYPE_SET); |
120 | uint32_t msg_set_name_len = as_msg_field_get_value_sz(f); |
121 | |
122 | if (msg_set_name_len == 0) { |
123 | return true; // treat the same as no set name |
124 | } |
125 | |
126 | as_namespace* ns = tr->rsv.ns; |
127 | const char* set_name = as_index_get_set_name(r, ns); |
128 | |
129 | if (set_name == NULL || |
130 | strncmp(set_name, (const char*)f->data, msg_set_name_len) != 0 || |
131 | set_name[msg_set_name_len] != 0) { |
132 | CF_ZSTR_DEFINE(msg_set_name, AS_SET_NAME_MAX_SIZE + 4, f->data, |
133 | msg_set_name_len); |
134 | |
135 | cf_warning(AS_RW, "{%s} set name mismatch %s %s" , ns->name, |
136 | set_name == NULL ? "(null)" : set_name, msg_set_name); |
137 | return false; |
138 | } |
139 | |
140 | return true; |
141 | } |
142 | |
143 | |
144 | int |
145 | set_set_from_msg(as_record* r, as_namespace* ns, as_msg* m) |
146 | { |
147 | as_msg_field* f = as_msg_field_get(m, AS_MSG_FIELD_TYPE_SET); |
148 | size_t name_len = (size_t)as_msg_field_get_value_sz(f); |
149 | |
150 | if (name_len == 0) { |
151 | return 0; |
152 | } |
153 | |
154 | // Given the name, find/assign the set-ID and write it in the as_index. |
155 | return as_index_set_set_w_len(r, ns, (const char*)f->data, name_len, true); |
156 | } |
157 | |
158 | |
159 | int |
160 | build_predexp_and_filter_meta(const as_transaction* tr, const as_record* r, |
161 | predexp_eval_t** predexp) |
162 | { |
163 | if (! as_transaction_has_predexp(tr)) { |
164 | *predexp = NULL; |
165 | return AS_OK; |
166 | } |
167 | |
168 | as_msg_field* f = as_msg_field_get(&tr->msgp->msg, |
169 | AS_MSG_FIELD_TYPE_PREDEXP); |
170 | |
171 | if ((*predexp = predexp_build(f)) == NULL) { |
172 | return AS_ERR_PARAMETER; |
173 | } |
174 | |
175 | // TODO - perhaps fields of predexp_args_t should be const? |
176 | predexp_args_t predargs = { .ns = tr->rsv.ns, .md = (as_record*)r }; |
177 | predexp_retval_t predrv = predexp_matches_metadata(*predexp, &predargs); |
178 | |
179 | if (predrv == PREDEXP_UNKNOWN) { |
180 | return AS_OK; // caller must later check bins using *predexp |
181 | } |
182 | // else - caller will not need to apply filter later. |
183 | |
184 | predexp_destroy(*predexp); |
185 | *predexp = NULL; |
186 | |
187 | return predrv == PREDEXP_TRUE ? AS_OK : AS_ERR_FILTERED_OUT; |
188 | } |
189 | |
190 | |
191 | int |
192 | predexp_read_and_filter_bins(as_storage_rd* rd, predexp_eval_t* predexp) |
193 | { |
194 | int result; |
195 | |
196 | if ((result = as_storage_rd_load_n_bins(rd)) < 0) { |
197 | return -result; |
198 | } |
199 | |
200 | as_namespace* ns = rd->ns; |
201 | as_record* r = rd->r; |
202 | |
203 | as_bin stack_bins[ns->storage_data_in_memory ? 0 : rd->n_bins]; |
204 | |
205 | if ((result = as_storage_rd_load_bins(rd, stack_bins)) < 0) { |
206 | return -result; |
207 | } |
208 | |
209 | predexp_args_t predargs = { .ns = ns, .md = r, .rd = rd }; |
210 | |
211 | if (! predexp_matches_record(predexp, &predargs)) { |
212 | return AS_ERR_FILTERED_OUT; |
213 | } |
214 | |
215 | return AS_OK; |
216 | } |
217 | |
218 | |
219 | // Caller must have checked that key is present in message. |
220 | bool |
221 | check_msg_key(as_msg* m, as_storage_rd* rd) |
222 | { |
223 | as_msg_field* f = as_msg_field_get(m, AS_MSG_FIELD_TYPE_KEY); |
224 | uint32_t key_size = as_msg_field_get_value_sz(f); |
225 | uint8_t* key = f->data; |
226 | |
227 | if (key_size != rd->key_size || memcmp(key, rd->key, key_size) != 0) { |
228 | cf_warning(AS_RW, "key mismatch - end of universe?" ); |
229 | return false; |
230 | } |
231 | |
232 | return true; |
233 | } |
234 | |
235 | |
236 | bool |
237 | get_msg_key(as_transaction* tr, as_storage_rd* rd) |
238 | { |
239 | if (! as_transaction_has_key(tr)) { |
240 | return true; |
241 | } |
242 | |
243 | if (rd->ns->single_bin && rd->ns->storage_data_in_memory) { |
244 | cf_warning(AS_RW, "{%s} can't store key if data-in-memory & single-bin" , |
245 | tr->rsv.ns->name); |
246 | return false; |
247 | } |
248 | |
249 | as_msg_field* f = as_msg_field_get(&tr->msgp->msg, AS_MSG_FIELD_TYPE_KEY); |
250 | |
251 | if ((rd->key_size = as_msg_field_get_value_sz(f)) == 0) { |
252 | cf_warning(AS_RW, "msg flat key size is 0" ); |
253 | return false; |
254 | } |
255 | |
256 | rd->key = f->data; |
257 | |
258 | if (*rd->key == AS_PARTICLE_TYPE_INTEGER && |
259 | rd->key_size != 1 + sizeof(uint64_t)) { |
260 | cf_warning(AS_RW, "bad msg integer key flat size %u" , rd->key_size); |
261 | return false; |
262 | } |
263 | |
264 | return true; |
265 | } |
266 | |
267 | |
268 | int |
269 | handle_msg_key(as_transaction* tr, as_storage_rd* rd) |
270 | { |
271 | // Shortcut pointers. |
272 | as_msg* m = &tr->msgp->msg; |
273 | as_namespace* ns = tr->rsv.ns; |
274 | |
275 | if (rd->r->key_stored == 1) { |
276 | // Key stored for this record - be sure it gets rewritten. |
277 | |
278 | // This will force a device read for non-data-in-memory, even if |
279 | // must_fetch_data is false! Since there's no advantage to using the |
280 | // loaded block after this if must_fetch_data is false, leave the |
281 | // subsequent code as-is. |
282 | if (! as_storage_record_get_key(rd)) { |
283 | cf_warning_digest(AS_RW, &tr->keyd, "{%s} can't get stored key " , |
284 | ns->name); |
285 | return AS_ERR_UNKNOWN; |
286 | } |
287 | |
288 | // Check the client-sent key, if any, against the stored key. |
289 | if (as_transaction_has_key(tr) && ! check_msg_key(m, rd)) { |
290 | cf_warning_digest(AS_RW, &tr->keyd, "{%s} key mismatch " , ns->name); |
291 | return AS_ERR_KEY_MISMATCH; |
292 | } |
293 | } |
294 | // If we got a key without a digest, it's an old client, not a cue to store |
295 | // the key. (Remove this check when we're sure all old C clients are gone.) |
296 | else if (as_transaction_has_digest(tr)) { |
297 | // Key not stored for this record - store one if sent from client. For |
298 | // data-in-memory, don't allocate the key until we reach the point of no |
299 | // return. Also don't set AS_INDEX_FLAG_KEY_STORED flag until then. |
300 | if (! get_msg_key(tr, rd)) { |
301 | return AS_ERR_UNSUPPORTED_FEATURE; |
302 | } |
303 | } |
304 | |
305 | return 0; |
306 | } |
307 | |
308 | |
309 | void |
310 | update_metadata_in_index(as_transaction* tr, as_record* r) |
311 | { |
312 | // Shortcut pointers. |
313 | as_msg* m = &tr->msgp->msg; |
314 | as_namespace* ns = tr->rsv.ns; |
315 | |
316 | uint64_t now = cf_clepoch_milliseconds(); |
317 | |
318 | switch (m->record_ttl) { |
319 | case TTL_NAMESPACE_DEFAULT: |
320 | if (ns->default_ttl != 0) { |
321 | // Set record void-time using default TTL value. |
322 | r->void_time = (now / 1000) + ns->default_ttl; |
323 | } |
324 | else { |
325 | // Default TTL is "never expire". |
326 | r->void_time = 0; |
327 | } |
328 | break; |
329 | case TTL_NEVER_EXPIRE: |
330 | // Set record to "never expire". |
331 | r->void_time = 0; |
332 | break; |
333 | case TTL_DONT_UPDATE: |
334 | // Do not change record's void time. |
335 | break; |
336 | default: |
337 | // Apply non-special m->record_ttl directly. Have already checked |
338 | // m->record_ttl <= 10 years, so no overflow etc. |
339 | r->void_time = (now / 1000) + m->record_ttl; |
340 | break; |
341 | } |
342 | |
343 | as_record_set_lut(r, tr->rsv.regime, now, ns); |
344 | as_record_increment_generation(r, ns); |
345 | } |
346 | |
347 | |
348 | void |
349 | pickle_all(as_storage_rd* rd, rw_request* rw) |
350 | { |
351 | if (rd->keep_pickle) { |
352 | rw->pickle = rd->pickle; |
353 | rw->pickle_sz = rd->pickle_sz; |
354 | return; |
355 | } |
356 | // else - new protocol with no destination node(s), or old protocol. |
357 | |
358 | if (rw->n_dest_nodes == 0) { |
359 | return; |
360 | } |
361 | // else - old protocol with destination node(s). |
362 | |
363 | // TODO - old pickle - remove in "six months". |
364 | |
365 | rw->is_old_pickle = true; |
366 | rw->pickle = as_record_pickle(rd, &rw->pickle_sz); |
367 | |
368 | rw->set_name = rd->set_name; |
369 | rw->set_name_len = rd->set_name_len; |
370 | |
371 | if (rd->key) { |
372 | rw->key = cf_malloc(rd->key_size); |
373 | rw->key_size = rd->key_size; |
374 | memcpy(rw->key, rd->key, rd->key_size); |
375 | } |
376 | } |
377 | |
378 | |
/*
 * Bring secondary indexes up to date after a record's bins changed from
 * old_bins[0..n_old_bins) to new_bins[0..n_new_bins).
 *
 * Bins are matched by bin id:
 * - an old bin matched by a new bin whose particle type or particle field
 *   differs gets sbins populated from both (as_sindex_sbins_populate());
 * - an old bin with no matching new bin was deleted, so a delete op is
 *   collected;
 * - a new bin matched by no old bin was just created, so an insert op is
 *   collected.
 *
 * While SINDEX_GRLOCK() is held, matching sindexes are reserved into si_arr
 * and the sbins are collected. The sindex updates are applied after
 * SINDEX_GRUNLOCK(), and the reservations are released last.
 *
 * Returns true if any sbins were populated (i.e. an sindex update was issued).
 */
bool
write_sindex_update(as_namespace* ns, const char* set_name, cf_digest* keyd,
		as_bin* old_bins, uint32_t n_old_bins, as_bin* new_bins,
		uint32_t n_new_bins)
{
	int n_populated = 0;
	// Marks which new bins matched an old bin; unmarked ones are just-created.
	// NOTE(review): when n_new_bins is 0 this is a zero-length VLA, which ISO C
	// leaves undefined (GCC accepts it) - confirm this is intended.
	bool not_just_created[n_new_bins];

	memset(not_just_created, 0, sizeof(not_just_created));

	// Maximum number of sindexes which can be changed in one transaction is
	// 2 * ns->sindex_cnt.

	SINDEX_GRLOCK();
	SINDEX_BINS_SETUP(sbins, 2 * ns->sindex_cnt);
	as_sindex* si_arr[2 * ns->sindex_cnt];
	int si_arr_index = 0;

	// Reserve matching SIs.

	for (int i = 0; i < n_old_bins; i++) {
		si_arr_index += as_sindex_arr_lookup_by_set_binid_lockfree(ns, set_name,
				old_bins[i].id, &si_arr[si_arr_index]);
	}

	for (int i = 0; i < n_new_bins; i++) {
		si_arr_index += as_sindex_arr_lookup_by_set_binid_lockfree(ns, set_name,
				new_bins[i].id, &si_arr[si_arr_index]);
	}

	// For every old bin, find the corresponding new bin (if any) and adjust the
	// secondary index if the bin was modified. If no corresponding new bin is
	// found, it means the old bin was deleted - also adjust the secondary index
	// accordingly.

	for (int32_t i_old = 0; i_old < (int32_t)n_old_bins; i_old++) {
		as_bin* b_old = &old_bins[i_old];
		bool found = false;

		// Loop over new bins. Start at old bin index (if possible) and go down,
		// wrapping around to do the higher indexes last. This will find a match
		// (if any) very quickly - instantly, unless there were bins deleted.

		bool any_new = n_new_bins != 0;
		int32_t n_new_minus_1 = (int32_t)n_new_bins - 1;
		// Starting index: min(i_old, last new index).
		int32_t i_new = n_new_minus_1 < i_old ? n_new_minus_1 : i_old;

		while (any_new) {
			as_bin* b_new = &new_bins[i_new];

			if (b_old->id == b_new->id) {
				if (as_bin_get_particle_type(b_old) !=
						as_bin_get_particle_type(b_new) ||
						b_old->particle != b_new->particle) {
					n_populated += as_sindex_sbins_populate(
							&sbins[n_populated], ns, set_name, b_old, b_new);
				}

				found = true;
				not_just_created[i_new] = true;
				break;
			}

			// Step down. On wrapping below 0, restart at the last new index -
			// unless that index is <= i_old, in which case the start was the
			// last new index and every new bin has now been visited.
			if (--i_new < 0 && (i_new = n_new_minus_1) <= i_old) {
				break;
			}

			// Back at the starting index (i_old) after wrapping - every new
			// bin has been visited.
			if (i_new == i_old) {
				break;
			}
		}

		if (! found) {
			n_populated += as_sindex_sbins_from_bin(ns, set_name, b_old,
					&sbins[n_populated], AS_SINDEX_OP_DELETE);
		}
	}

	// Now find the new bins that are just-created bins. We've marked the others
	// in the loop above, so any left are just-created.

	for (uint32_t i_new = 0; i_new < n_new_bins; i_new++) {
		if (not_just_created[i_new]) {
			continue;
		}

		n_populated += as_sindex_sbins_from_bin(ns, set_name, &new_bins[i_new],
				&sbins[n_populated], AS_SINDEX_OP_INSERT);
	}

	SINDEX_GRUNLOCK();

	// Apply the collected sbins outside the lock.
	if (n_populated != 0) {
		as_sindex_update_by_sbin(ns, set_name, sbins, n_populated, keyd);
		as_sindex_sbin_freeall(sbins, n_populated);
	}

	// Release the sindex reservations taken above.
	as_sindex_release_arr(si_arr, si_arr_index);

	return n_populated != 0;
}
480 | |
481 | |
482 | // If called for data-not-in-memory, this may read record from drive! |
483 | // TODO - rename as as_record_... and move to record.c? |
484 | void |
485 | record_delete_adjust_sindex(as_record* r, as_namespace* ns) |
486 | { |
487 | if (! record_has_sindex(r, ns)) { |
488 | return; |
489 | } |
490 | |
491 | as_storage_rd rd; |
492 | |
493 | as_storage_record_open(ns, r, &rd); |
494 | as_storage_rd_load_n_bins(&rd); |
495 | |
496 | as_bin stack_bins[ns->storage_data_in_memory ? 0 : rd.n_bins]; |
497 | |
498 | as_storage_rd_load_bins(&rd, stack_bins); |
499 | |
500 | remove_from_sindex(ns, as_index_get_set_name(r, ns), &r->keyd, rd.bins, |
501 | rd.n_bins); |
502 | |
503 | as_storage_record_close(&rd); |
504 | } |
505 | |
506 | |
507 | // Remove record from secondary index. Called only for data-in-memory. If |
508 | // data-not-in-memory, existing record is not read, and secondary index entry is |
509 | // cleaned up by background sindex defrag thread. |
510 | // TODO - rename as as_record_... and move to record.c? |
511 | void |
512 | delete_adjust_sindex(as_storage_rd* rd) |
513 | { |
514 | as_namespace* ns = rd->ns; |
515 | |
516 | if (! record_has_sindex(rd->r, ns)) { |
517 | return; |
518 | } |
519 | |
520 | as_storage_rd_load_n_bins(rd); |
521 | as_storage_rd_load_bins(rd, NULL); |
522 | |
523 | remove_from_sindex(ns, as_index_get_set_name(rd->r, ns), &rd->r->keyd, |
524 | rd->bins, rd->n_bins); |
525 | } |
526 | |
527 | |
528 | // TODO - rename as as_record_..., move to record.c, take r instead of set_name, |
529 | // and lose keyd parameter? |
530 | void |
531 | remove_from_sindex(as_namespace* ns, const char* set_name, cf_digest* keyd, |
532 | as_bin* bins, uint32_t n_bins) |
533 | { |
534 | SINDEX_GRLOCK(); |
535 | |
536 | SINDEX_BINS_SETUP(sbins, ns->sindex_cnt); |
537 | |
538 | as_sindex* si_arr[ns->sindex_cnt]; |
539 | int si_arr_index = 0; |
540 | int sbins_populated = 0; |
541 | |
542 | // Reserve matching sindexes. |
543 | for (int i = 0; i < (int)n_bins; i++) { |
544 | si_arr_index += as_sindex_arr_lookup_by_set_binid_lockfree(ns, set_name, |
545 | bins[i].id, &si_arr[si_arr_index]); |
546 | } |
547 | |
548 | for (int i = 0; i < (int)n_bins; i++) { |
549 | sbins_populated += as_sindex_sbins_from_bin(ns, set_name, &bins[i], |
550 | &sbins[sbins_populated], AS_SINDEX_OP_DELETE); |
551 | } |
552 | |
553 | SINDEX_GRUNLOCK(); |
554 | |
555 | if (sbins_populated) { |
556 | as_sindex_update_by_sbin(ns, set_name, sbins, sbins_populated, keyd); |
557 | as_sindex_sbin_freeall(sbins, sbins_populated); |
558 | } |
559 | |
560 | as_sindex_release_arr(si_arr, si_arr_index); |
561 | } |
562 | |
563 | |
564 | bool |
565 | xdr_must_ship_delete(as_namespace* ns, bool is_xdr_op) |
566 | { |
567 | if (! is_xdr_delete_shipping_enabled()) { |
568 | return false; |
569 | } |
570 | |
571 | return ! is_xdr_op || |
572 | // If this delete is a result of XDR shipping, don't ship it unless |
573 | // configured to do so. |
574 | is_xdr_forwarding_enabled() || ns->ns_forward_xdr_writes; |
575 | } |
576 | |