1/*
2 * rw_utils.c
3 *
4 * Copyright (C) 2016 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23//==========================================================
24// Includes.
25//
26
27#include "transaction/rw_utils.h"
28
29#include <stdbool.h>
30#include <stddef.h>
31#include <stdint.h>
32#include <string.h>
33
34#include "citrusleaf/cf_atomic.h" // xdr_allows_write
35#include "citrusleaf/cf_clock.h"
36#include "citrusleaf/cf_digest.h"
37
38#include "fault.h"
39#include "msg.h"
40
41#include "base/cfg.h" // xdr_allows_write
42#include "base/datamodel.h"
43#include "base/index.h"
44#include "base/predexp.h"
45#include "base/proto.h" // xdr_allows_write
46#include "base/secondary_index.h"
47#include "base/transaction.h"
48#include "base/xdr_serverside.h"
49#include "fabric/fabric.h"
50#include "storage/storage.h"
51#include "transaction/rw_request.h"
52
53
54//==========================================================
55// Public API.
56//
57
58// TODO - really? we can't hide this behind an XDR stub?
59bool
60xdr_allows_write(as_transaction* tr)
61{
62 if (as_transaction_is_xdr(tr)) {
63 if (tr->rsv.ns->ns_allow_xdr_writes) {
64 return true;
65 }
66 }
67 else {
68 if (tr->rsv.ns->ns_allow_nonxdr_writes) {
69 return true;
70 }
71 }
72
73 cf_atomic_int_incr(&tr->rsv.ns->n_fail_xdr_forbidden);
74
75 return false;
76}
77
78
79void
80send_rw_messages(rw_request* rw)
81{
82 for (uint32_t i = 0; i < rw->n_dest_nodes; i++) {
83 if (rw->dest_complete[i]) {
84 continue;
85 }
86
87 msg_incr_ref(rw->dest_msg);
88
89 if (as_fabric_send(rw->dest_nodes[i], rw->dest_msg,
90 AS_FABRIC_CHANNEL_RW) != AS_FABRIC_SUCCESS) {
91 as_fabric_msg_put(rw->dest_msg);
92 rw->xmit_ms = 0; // force a retransmit on next cycle
93 }
94 }
95}
96
97
98void
99send_rw_messages_forget(rw_request* rw)
100{
101 for (uint32_t i = 0; i < rw->n_dest_nodes; i++) {
102 msg_incr_ref(rw->dest_msg);
103
104 if (as_fabric_send(rw->dest_nodes[i], rw->dest_msg,
105 AS_FABRIC_CHANNEL_RW) != AS_FABRIC_SUCCESS) {
106 as_fabric_msg_put(rw->dest_msg);
107 }
108 }
109}
110
111
112bool
113set_name_check(const as_transaction* tr, const as_record* r)
114{
115 if (! as_transaction_has_set(tr)) {
116 return true; // allowed to not send set name in read or delete message
117 }
118
119 as_msg_field* f = as_msg_field_get(&tr->msgp->msg, AS_MSG_FIELD_TYPE_SET);
120 uint32_t msg_set_name_len = as_msg_field_get_value_sz(f);
121
122 if (msg_set_name_len == 0) {
123 return true; // treat the same as no set name
124 }
125
126 as_namespace* ns = tr->rsv.ns;
127 const char* set_name = as_index_get_set_name(r, ns);
128
129 if (set_name == NULL ||
130 strncmp(set_name, (const char*)f->data, msg_set_name_len) != 0 ||
131 set_name[msg_set_name_len] != 0) {
132 CF_ZSTR_DEFINE(msg_set_name, AS_SET_NAME_MAX_SIZE + 4, f->data,
133 msg_set_name_len);
134
135 cf_warning(AS_RW, "{%s} set name mismatch %s %s", ns->name,
136 set_name == NULL ? "(null)" : set_name, msg_set_name);
137 return false;
138 }
139
140 return true;
141}
142
143
144int
145set_set_from_msg(as_record* r, as_namespace* ns, as_msg* m)
146{
147 as_msg_field* f = as_msg_field_get(m, AS_MSG_FIELD_TYPE_SET);
148 size_t name_len = (size_t)as_msg_field_get_value_sz(f);
149
150 if (name_len == 0) {
151 return 0;
152 }
153
154 // Given the name, find/assign the set-ID and write it in the as_index.
155 return as_index_set_set_w_len(r, ns, (const char*)f->data, name_len, true);
156}
157
158
159int
160build_predexp_and_filter_meta(const as_transaction* tr, const as_record* r,
161 predexp_eval_t** predexp)
162{
163 if (! as_transaction_has_predexp(tr)) {
164 *predexp = NULL;
165 return AS_OK;
166 }
167
168 as_msg_field* f = as_msg_field_get(&tr->msgp->msg,
169 AS_MSG_FIELD_TYPE_PREDEXP);
170
171 if ((*predexp = predexp_build(f)) == NULL) {
172 return AS_ERR_PARAMETER;
173 }
174
175 // TODO - perhaps fields of predexp_args_t should be const?
176 predexp_args_t predargs = { .ns = tr->rsv.ns, .md = (as_record*)r };
177 predexp_retval_t predrv = predexp_matches_metadata(*predexp, &predargs);
178
179 if (predrv == PREDEXP_UNKNOWN) {
180 return AS_OK; // caller must later check bins using *predexp
181 }
182 // else - caller will not need to apply filter later.
183
184 predexp_destroy(*predexp);
185 *predexp = NULL;
186
187 return predrv == PREDEXP_TRUE ? AS_OK : AS_ERR_FILTERED_OUT;
188}
189
190
191int
192predexp_read_and_filter_bins(as_storage_rd* rd, predexp_eval_t* predexp)
193{
194 int result;
195
196 if ((result = as_storage_rd_load_n_bins(rd)) < 0) {
197 return -result;
198 }
199
200 as_namespace* ns = rd->ns;
201 as_record* r = rd->r;
202
203 as_bin stack_bins[ns->storage_data_in_memory ? 0 : rd->n_bins];
204
205 if ((result = as_storage_rd_load_bins(rd, stack_bins)) < 0) {
206 return -result;
207 }
208
209 predexp_args_t predargs = { .ns = ns, .md = r, .rd = rd };
210
211 if (! predexp_matches_record(predexp, &predargs)) {
212 return AS_ERR_FILTERED_OUT;
213 }
214
215 return AS_OK;
216}
217
218
219// Caller must have checked that key is present in message.
220bool
221check_msg_key(as_msg* m, as_storage_rd* rd)
222{
223 as_msg_field* f = as_msg_field_get(m, AS_MSG_FIELD_TYPE_KEY);
224 uint32_t key_size = as_msg_field_get_value_sz(f);
225 uint8_t* key = f->data;
226
227 if (key_size != rd->key_size || memcmp(key, rd->key, key_size) != 0) {
228 cf_warning(AS_RW, "key mismatch - end of universe?");
229 return false;
230 }
231
232 return true;
233}
234
235
236bool
237get_msg_key(as_transaction* tr, as_storage_rd* rd)
238{
239 if (! as_transaction_has_key(tr)) {
240 return true;
241 }
242
243 if (rd->ns->single_bin && rd->ns->storage_data_in_memory) {
244 cf_warning(AS_RW, "{%s} can't store key if data-in-memory & single-bin",
245 tr->rsv.ns->name);
246 return false;
247 }
248
249 as_msg_field* f = as_msg_field_get(&tr->msgp->msg, AS_MSG_FIELD_TYPE_KEY);
250
251 if ((rd->key_size = as_msg_field_get_value_sz(f)) == 0) {
252 cf_warning(AS_RW, "msg flat key size is 0");
253 return false;
254 }
255
256 rd->key = f->data;
257
258 if (*rd->key == AS_PARTICLE_TYPE_INTEGER &&
259 rd->key_size != 1 + sizeof(uint64_t)) {
260 cf_warning(AS_RW, "bad msg integer key flat size %u", rd->key_size);
261 return false;
262 }
263
264 return true;
265}
266
267
268int
269handle_msg_key(as_transaction* tr, as_storage_rd* rd)
270{
271 // Shortcut pointers.
272 as_msg* m = &tr->msgp->msg;
273 as_namespace* ns = tr->rsv.ns;
274
275 if (rd->r->key_stored == 1) {
276 // Key stored for this record - be sure it gets rewritten.
277
278 // This will force a device read for non-data-in-memory, even if
279 // must_fetch_data is false! Since there's no advantage to using the
280 // loaded block after this if must_fetch_data is false, leave the
281 // subsequent code as-is.
282 if (! as_storage_record_get_key(rd)) {
283 cf_warning_digest(AS_RW, &tr->keyd, "{%s} can't get stored key ",
284 ns->name);
285 return AS_ERR_UNKNOWN;
286 }
287
288 // Check the client-sent key, if any, against the stored key.
289 if (as_transaction_has_key(tr) && ! check_msg_key(m, rd)) {
290 cf_warning_digest(AS_RW, &tr->keyd, "{%s} key mismatch ", ns->name);
291 return AS_ERR_KEY_MISMATCH;
292 }
293 }
294 // If we got a key without a digest, it's an old client, not a cue to store
295 // the key. (Remove this check when we're sure all old C clients are gone.)
296 else if (as_transaction_has_digest(tr)) {
297 // Key not stored for this record - store one if sent from client. For
298 // data-in-memory, don't allocate the key until we reach the point of no
299 // return. Also don't set AS_INDEX_FLAG_KEY_STORED flag until then.
300 if (! get_msg_key(tr, rd)) {
301 return AS_ERR_UNSUPPORTED_FEATURE;
302 }
303 }
304
305 return 0;
306}
307
308
309void
310update_metadata_in_index(as_transaction* tr, as_record* r)
311{
312 // Shortcut pointers.
313 as_msg* m = &tr->msgp->msg;
314 as_namespace* ns = tr->rsv.ns;
315
316 uint64_t now = cf_clepoch_milliseconds();
317
318 switch (m->record_ttl) {
319 case TTL_NAMESPACE_DEFAULT:
320 if (ns->default_ttl != 0) {
321 // Set record void-time using default TTL value.
322 r->void_time = (now / 1000) + ns->default_ttl;
323 }
324 else {
325 // Default TTL is "never expire".
326 r->void_time = 0;
327 }
328 break;
329 case TTL_NEVER_EXPIRE:
330 // Set record to "never expire".
331 r->void_time = 0;
332 break;
333 case TTL_DONT_UPDATE:
334 // Do not change record's void time.
335 break;
336 default:
337 // Apply non-special m->record_ttl directly. Have already checked
338 // m->record_ttl <= 10 years, so no overflow etc.
339 r->void_time = (now / 1000) + m->record_ttl;
340 break;
341 }
342
343 as_record_set_lut(r, tr->rsv.regime, now, ns);
344 as_record_increment_generation(r, ns);
345}
346
347
348void
349pickle_all(as_storage_rd* rd, rw_request* rw)
350{
351 if (rd->keep_pickle) {
352 rw->pickle = rd->pickle;
353 rw->pickle_sz = rd->pickle_sz;
354 return;
355 }
356 // else - new protocol with no destination node(s), or old protocol.
357
358 if (rw->n_dest_nodes == 0) {
359 return;
360 }
361 // else - old protocol with destination node(s).
362
363 // TODO - old pickle - remove in "six months".
364
365 rw->is_old_pickle = true;
366 rw->pickle = as_record_pickle(rd, &rw->pickle_sz);
367
368 rw->set_name = rd->set_name;
369 rw->set_name_len = rd->set_name_len;
370
371 if (rd->key) {
372 rw->key = cf_malloc(rd->key_size);
373 rw->key_size = rd->key_size;
374 memcpy(rw->key, rd->key, rd->key_size);
375 }
376}
377
378
// Adjust secondary indexes for a record write. Bins are matched by bin id
// between the state before the write (old_bins / n_old_bins) and after it
// (new_bins / n_new_bins):
//  - An old bin with no matching new bin was deleted. Its sindex entries are
//    populated with AS_SINDEX_OP_DELETE.
//  - For a matching pair whose particle type or particle field differs, both
//    bins go to as_sindex_sbins_populate().
//  - A new bin with no matching old bin was just created. Its sindex entries
//    are populated with AS_SINDEX_OP_INSERT.
// The collected sbins are applied for keyd via as_sindex_update_by_sbin() and
// then freed. Returns true if any sbins were populated.
//
// NOTE(review): not_just_created[] and si_arr[] are VLAs. A zero length
// (n_new_bins == 0 or ns->sindex_cnt == 0) is not valid ISO C. Presumably
// callers only get here when the namespace has sindexes - confirm.
bool
write_sindex_update(as_namespace* ns, const char* set_name, cf_digest* keyd,
		as_bin* old_bins, uint32_t n_old_bins, as_bin* new_bins,
		uint32_t n_new_bins)
{
	int n_populated = 0;
	// Marks new bins that matched an old bin, i.e. were not just created.
	bool not_just_created[n_new_bins];

	memset(not_just_created, 0, sizeof(not_just_created));

	// Maximum number of sindexes which can be changed in one transaction is
	// 2 * ns->sindex_cnt.

	// Lookups and sbin population happen between SINDEX_GRLOCK() and
	// SINDEX_GRUNLOCK(). The sindex update itself runs after the unlock.
	SINDEX_GRLOCK();
	SINDEX_BINS_SETUP(sbins, 2 * ns->sindex_cnt);
	as_sindex* si_arr[2 * ns->sindex_cnt];
	int si_arr_index = 0;

	// Reserve matching SIs. They are released by as_sindex_release_arr() at
	// the end.

	for (int i = 0; i < n_old_bins; i++) {
		si_arr_index += as_sindex_arr_lookup_by_set_binid_lockfree(ns, set_name,
				old_bins[i].id, &si_arr[si_arr_index]);
	}

	for (int i = 0; i < n_new_bins; i++) {
		si_arr_index += as_sindex_arr_lookup_by_set_binid_lockfree(ns, set_name,
				new_bins[i].id, &si_arr[si_arr_index]);
	}

	// For every old bin, find the corresponding new bin (if any) and adjust the
	// secondary index if the bin was modified. If no corresponding new bin is
	// found, it means the old bin was deleted - also adjust the secondary index
	// accordingly.

	for (int32_t i_old = 0; i_old < (int32_t)n_old_bins; i_old++) {
		as_bin* b_old = &old_bins[i_old];
		bool found = false;

		// Loop over new bins. Start at old bin index (if possible) and go down,
		// wrapping around to do the higher indexes last. This will find a match
		// (if any) very quickly - instantly, unless there were bins deleted.

		bool any_new = n_new_bins != 0;
		int32_t n_new_minus_1 = (int32_t)n_new_bins - 1;
		// Starting index: min(n_new_bins - 1, i_old).
		int32_t i_new = n_new_minus_1 < i_old ? n_new_minus_1 : i_old;

		while (any_new) {
			as_bin* b_new = &new_bins[i_new];

			if (b_old->id == b_new->id) {
				// Same bin - only touch sindexes if the value may have changed.
				if (as_bin_get_particle_type(b_old) !=
						as_bin_get_particle_type(b_new) ||
						b_old->particle != b_new->particle) {
					n_populated += as_sindex_sbins_populate(
							&sbins[n_populated], ns, set_name, b_old, b_new);
				}

				found = true;
				not_just_created[i_new] = true;
				break;
			}

			// Step down; below index 0, wrap to the top index. If the top
			// index is <= i_old, the search started at the top and every new
			// bin has now been checked.
			if (--i_new < 0 && (i_new = n_new_minus_1) <= i_old) {
				break;
			}

			// Wrapped around back to the starting index - every new bin checked.
			if (i_new == i_old) {
				break;
			}
		}

		if (! found) {
			n_populated += as_sindex_sbins_from_bin(ns, set_name, b_old,
					&sbins[n_populated], AS_SINDEX_OP_DELETE);
		}
	}

	// Now find the new bins that are just-created bins. We've marked the others
	// in the loop above, so any left are just-created.

	for (uint32_t i_new = 0; i_new < n_new_bins; i_new++) {
		if (not_just_created[i_new]) {
			continue;
		}

		n_populated += as_sindex_sbins_from_bin(ns, set_name, &new_bins[i_new],
				&sbins[n_populated], AS_SINDEX_OP_INSERT);
	}

	SINDEX_GRUNLOCK();

	if (n_populated != 0) {
		as_sindex_update_by_sbin(ns, set_name, sbins, n_populated, keyd);
		as_sindex_sbin_freeall(sbins, n_populated);
	}

	as_sindex_release_arr(si_arr, si_arr_index);

	return n_populated != 0;
}
480
481
482// If called for data-not-in-memory, this may read record from drive!
483// TODO - rename as as_record_... and move to record.c?
484void
485record_delete_adjust_sindex(as_record* r, as_namespace* ns)
486{
487 if (! record_has_sindex(r, ns)) {
488 return;
489 }
490
491 as_storage_rd rd;
492
493 as_storage_record_open(ns, r, &rd);
494 as_storage_rd_load_n_bins(&rd);
495
496 as_bin stack_bins[ns->storage_data_in_memory ? 0 : rd.n_bins];
497
498 as_storage_rd_load_bins(&rd, stack_bins);
499
500 remove_from_sindex(ns, as_index_get_set_name(r, ns), &r->keyd, rd.bins,
501 rd.n_bins);
502
503 as_storage_record_close(&rd);
504}
505
506
507// Remove record from secondary index. Called only for data-in-memory. If
508// data-not-in-memory, existing record is not read, and secondary index entry is
509// cleaned up by background sindex defrag thread.
510// TODO - rename as as_record_... and move to record.c?
511void
512delete_adjust_sindex(as_storage_rd* rd)
513{
514 as_namespace* ns = rd->ns;
515
516 if (! record_has_sindex(rd->r, ns)) {
517 return;
518 }
519
520 as_storage_rd_load_n_bins(rd);
521 as_storage_rd_load_bins(rd, NULL);
522
523 remove_from_sindex(ns, as_index_get_set_name(rd->r, ns), &rd->r->keyd,
524 rd->bins, rd->n_bins);
525}
526
527
528// TODO - rename as as_record_..., move to record.c, take r instead of set_name,
529// and lose keyd parameter?
530void
531remove_from_sindex(as_namespace* ns, const char* set_name, cf_digest* keyd,
532 as_bin* bins, uint32_t n_bins)
533{
534 SINDEX_GRLOCK();
535
536 SINDEX_BINS_SETUP(sbins, ns->sindex_cnt);
537
538 as_sindex* si_arr[ns->sindex_cnt];
539 int si_arr_index = 0;
540 int sbins_populated = 0;
541
542 // Reserve matching sindexes.
543 for (int i = 0; i < (int)n_bins; i++) {
544 si_arr_index += as_sindex_arr_lookup_by_set_binid_lockfree(ns, set_name,
545 bins[i].id, &si_arr[si_arr_index]);
546 }
547
548 for (int i = 0; i < (int)n_bins; i++) {
549 sbins_populated += as_sindex_sbins_from_bin(ns, set_name, &bins[i],
550 &sbins[sbins_populated], AS_SINDEX_OP_DELETE);
551 }
552
553 SINDEX_GRUNLOCK();
554
555 if (sbins_populated) {
556 as_sindex_update_by_sbin(ns, set_name, sbins, sbins_populated, keyd);
557 as_sindex_sbin_freeall(sbins, sbins_populated);
558 }
559
560 as_sindex_release_arr(si_arr, si_arr_index);
561}
562
563
564bool
565xdr_must_ship_delete(as_namespace* ns, bool is_xdr_op)
566{
567 if (! is_xdr_delete_shipping_enabled()) {
568 return false;
569 }
570
571 return ! is_xdr_op ||
572 // If this delete is a result of XDR shipping, don't ship it unless
573 // configured to do so.
574 is_xdr_forwarding_enabled() || ns->ns_forward_xdr_writes;
575}
576