1/*
2 * udf_aerospike.c
3 *
4 * Copyright (C) 2012-2016 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23#include "base/udf_aerospike.h"
24
25#include <stdbool.h>
26#include <stddef.h>
27#include <stdint.h>
28#include <string.h>
29#include <asm/byteorder.h>
30
31#include "aerospike/as_aerospike.h"
32#include "aerospike/as_boolean.h"
33#include "aerospike/as_buffer.h"
34#include "aerospike/as_bytes.h"
35#include "aerospike/as_integer.h"
36#include "aerospike/as_msgpack.h"
37#include "aerospike/as_serializer.h"
38#include "aerospike/as_string.h"
39#include "aerospike/as_val.h"
40#include "citrusleaf/cf_clock.h"
41
42#include "fault.h"
43
44#include "base/datamodel.h"
45#include "base/index.h"
46#include "base/secondary_index.h"
47#include "base/transaction.h"
48#include "base/truncate.h"
49#include "base/udf_record.h"
50#include "base/xdr_serverside.h"
51#include "storage/storage.h"
52#include "transaction/rw_utils.h"
53#include "transaction/udf.h"
54
55
56static int udf_aerospike_rec_remove(const as_aerospike *, const as_rec *);
57/*
58 * Internal Function: udf_aerospike_delbin
59 *
60 * Parameters:
61 * r - udf_record to be manipulated
62 * bname - name of the bin to be deleted
63 *
64 * Return value:
65 * 0 on success
66 * -1 on failure
67 *
68 * Description:
69 * The function deletes the bin with the name
70 * passed in as parameter. The as_bin_destroy function
71 * which is called here, only frees the data and
72 * the bin is marked as not in use. The bin can then be reused later.
73 *
74 * Synchronization : object lock acquired by the transaction thread executing UDF.
75 * Partition reservation takes place just before the transaction starts executing
76 * ( look for as_partition_reserve_udf in thr_tsvc.c )
77 *
78 * Callers:
79 * udf_aerospike__apply_update_atomic
80 * In this function, if it fails at the time of update, the record is set
81 * to rollback all the updates till this point. The case where it fails in
82 * rollback is not handled.
83 *
84 * Side Notes:
85 * i. write_to_device will be set to true on a successful bin destroy.
86 * If all the updates from udf_aerospike__apply_update_atomic (including this) are
87 * successful, the record will be written to disk and reopened so that the rest of
88 * sets of updates can be applied.
89 *
90 * ii. If delete from sindex fails, we do not handle it.
91 */
92static int
93udf_aerospike_delbin(udf_record * urecord, const char * bname)
94{
95 as_storage_rd *rd = urecord->rd;
96 as_namespace *ns = rd->ns;
97
98 // Check that bname is not completely invalid
99 if (bname == NULL || (ns->single_bin && bname[0] != 0) || (! ns->single_bin && bname[0] == 0)) {
100 cf_warning(AS_UDF, "udf_aerospike_delbin: Invalid Parameters: [Invalid bin name supplied]... Fail");
101 return -1;
102 }
103
104 // Check quality of bname -- check that it is proper length, then make sure
105 // that the bin exists.
106 if (strlen(bname) >= AS_BIN_NAME_MAX_SZ) {
107 // Can't read bin if name too large.
108 cf_warning(AS_UDF, "udf_aerospike_delbin: Invalid Parameters [bin name(%s) too big]... Fail", bname);
109 return -1;
110 }
111
112 as_bin * b = as_bin_get(rd, bname);
113 if ( !b ) {
114 cf_debug(AS_UDF, "udf_aerospike_delbin: Invalid Operation [Bin name(%s) not found of delete]... Fail", bname);
115 return -1;
116 }
117
118 const char * set_name = as_index_get_set_name(rd->r, ns);
119
120 bool has_sindex = record_has_sindex(rd->r, ns);
121 SINDEX_BINS_SETUP(sbins, ns->sindex_cnt);
122 as_sindex * si_arr[ns->sindex_cnt];
123 int si_arr_index = 0;
124 int sbins_populated = 0;
125 if (has_sindex) {
126 si_arr_index += as_sindex_arr_lookup_by_set_binid_lockfree(ns, set_name, b->id, &si_arr[si_arr_index]);
127 sbins_populated += as_sindex_sbins_from_bin(ns, set_name, b, sbins, AS_SINDEX_OP_DELETE);
128 }
129
130 int32_t i = as_bin_get_index(rd, bname);
131 if (i != -1) {
132 if (has_sindex) {
133 if (sbins_populated > 0) {
134 urecord->tr->flags |= AS_TRANSACTION_FLAG_SINDEX_TOUCHED;
135 as_sindex_update_by_sbin(ns, as_index_get_set_name(rd->r, ns), sbins, sbins_populated, &rd->r->keyd);
136 }
137 }
138 as_bin_destroy(rd, i);
139 } else {
140 cf_warning(AS_UDF, "udf_aerospike_delbin: Internal Error [Deleting non-existing bin %s]... Fail", bname);
141 }
142
143 if (has_sindex) {
144 as_sindex_sbin_freeall(sbins, sbins_populated);
145 as_sindex_release_arr(si_arr, si_arr_index);
146 }
147
148 return 0;
149}
150/*
151 * Internal function: udf__aerospike_get_particle_buf
152 *
153 * Parameters:
154 * r -- udf_record_bin for which particle buf is requested
155 * type -- bin type
156 * pbytes -- current space required
157 *
158 * Return value:
159 * NULL on failure
160 * valid buf pointer success
161 *
162 * Description:
163 * The function find space on preallocated particle_data for requested size.
164 * In case it is found it tries to allocate space for bin independently.
165 * Return back the pointer to the offset on preallocated particle_data or newly
166 * allocated space.
167 *
168 * Return NULL if both fails
169 *
170 * Note: ubin->particle_buf will be set if new per bin memory is allocated.
171 *
172 * Callers:
173 * udf_aerospike_setbin
174 */
175uint8_t *
176udf__aerospike_get_particle_buf(udf_record *urecord, udf_record_bin *ubin, uint32_t pbytes)
177{
178 if (pbytes > urecord->rd->ns->storage_write_block_size) {
179 cf_warning(AS_UDF, "udf__aerospike_get_particle_buf: Invalid Operation [Bin %s data too big size=%u]... Fail", ubin->name, pbytes);
180 return NULL;
181 }
182
183 uint32_t alloc_size = pbytes == 0 ? 0 : urecord->rd->ns->storage_write_block_size;
184 uint8_t *buf = NULL;
185
186 if (ubin->particle_buf) {
187 buf = ubin->particle_buf;
188 } else {
189 // Disable dynamic shifting from the flat allocater to dynamic
190 // allocation.
191 if ((urecord->cur_particle_data + pbytes) < urecord->end_particle_data) {
192 buf = urecord->cur_particle_data;
193 urecord->cur_particle_data += pbytes;
194 } else if (alloc_size) {
195 // If there is no space in preallocated buffer then go
196 // ahead and allocate space per bin. This may happen
197 // if user keeps doing lot of execute update exhausting
198 // the buffer. After this point the record size check will
199 // trip instead of at the code when bin value is set.
200 ubin->particle_buf = cf_malloc(alloc_size);
201 buf = ubin->particle_buf;
202 }
203 }
204 return buf;
205}
206/*
207 * Internal function: udf_aerospike_setbin
208 *
209 * Parameters:
210 * offset -- offset of udf bin in updates array
211 * r -- udf_record to be manipulated
212 * bname -- name of the bin to be deleted
213 * val -- value to be updated with
214 *
215 * Return value:
216 * 0 on success
217 * -1 on failure
218 *
219 * Description:
220 * The function sets the bin with the name
221 * passed in as parameter to the value, passed as the third parameter.
222 * Before updating the bin, it is checked if the value can fit in the storage
223 *
224 * Synchronization : object lock acquired by the transaction thread executing UDF.
225 * Partition reservation takes place just before the transaction starts executing
226 * ( look for as_partition_reserve_udf in thr_tsvc.c )
227 *
228 * Callers:
229 * udf_aerospike__apply_update_atomic
230 * In this function, if it fails at the time of update, the record is set
231 * to rollback all the updates till this point. The case where it fails in
232 * rollback is not handled.
233 *
234 * Side Notes:
235 * i. write_to_device will be set to true on a successful bin update.
236 * If all the updates from udf_aerospike__apply_update_atomic (including this) are
237 * successful, the record will be written to disk and reopened so that the rest of
238 * sets of updates can be applied.
239 *
240 * ii. If put in sindex fails, we do not handle it.
241 *
242 * TODO make sure anything goes into setbin only if the bin value is
243 * changed
244 */
245static int
246udf_aerospike_setbin(udf_record * urecord, int offset, const char * bname, const as_val * val)
247{
248 as_storage_rd *rd = urecord->rd;
249 as_namespace *ns = rd->ns;
250
251 if (bname == NULL || (ns->single_bin && bname[0] != 0) || (! ns->single_bin && bname[0] == 0)) {
252 cf_warning(AS_UDF, "udf_aerospike_setbin: Invalid Parameters: [Invalid bin name supplied]... Fail");
253 return -1;
254 }
255
256 if (as_particle_type_from_asval(val) == AS_PARTICLE_TYPE_NULL) {
257 cf_warning(AS_UDF, "udf_aerospike_setbin: [%s] called with unusable as_val", bname);
258 return -3;
259 }
260
261 uint8_t type = as_val_type(val);
262
263 as_bin * b = as_bin_get_or_create(rd, bname);
264
265 if ( !b ) {
266 cf_warning(AS_UDF, "udf_aerospike_setbin: Internal Error [Bin %s not found.. Possibly ran out of bins]... Fail", bname);
267 return -1;
268 }
269
270 bool has_sindex = record_has_sindex(rd->r, ns);
271 SINDEX_BINS_SETUP(sbins, 2 * ns->sindex_cnt);
272 as_sindex * si_arr[2 * ns->sindex_cnt];
273 int sbins_populated = 0;
274 int si_arr_index = 0;
275 const char * set_name = as_index_get_set_name(rd->r, ns);
276
277 if (has_sindex ) {
278 si_arr_index += as_sindex_arr_lookup_by_set_binid_lockfree(ns, set_name, b->id, &si_arr[si_arr_index]);
279 sbins_populated += as_sindex_sbins_from_bin(ns, set_name, b, &sbins[sbins_populated], AS_SINDEX_OP_DELETE);
280 }
281
282 // we know we are doing an update now, make sure there is particle data,
283 // set to be 1 wblock size now @TODO!
284 int ret = 0;
285
286 cf_detail(AS_UDF, "udf_setbin: bin %s type %d ", bname, type );
287
288 if (ns->storage_data_in_memory) {
289 if (as_bin_particle_replace_from_asval(b, val) != 0) {
290 cf_warning(AS_UDF, "udf_aerospike_setbin: [%s] failed to replace particle", bname);
291 ret = -4;
292 }
293 }
294 else {
295 uint32_t size = as_particle_size_from_asval(val);
296 uint8_t *particle_buf = udf__aerospike_get_particle_buf(urecord, &urecord->updates[offset], size);
297
298 if (particle_buf) {
299 as_bin_particle_stack_from_asval(b, particle_buf, val);
300 }
301 else {
302 cf_warning(AS_UDF, "udf_aerospike_setbin: [%s] failed to get space for particle size %u", bname, size);
303 ret = -4;
304 }
305 }
306
307 // Update sindex if required
308 if (has_sindex) {
309 if (ret) {
310 if (sbins_populated > 0) {
311 as_sindex_sbin_freeall(sbins, sbins_populated);
312 }
313 as_sindex_release_arr(si_arr, si_arr_index);
314 return ret;
315 }
316
317 si_arr_index += as_sindex_arr_lookup_by_set_binid_lockfree(ns, set_name, b->id, &si_arr[si_arr_index]);
318 sbins_populated += as_sindex_sbins_from_bin(ns, set_name, b, &sbins[sbins_populated], AS_SINDEX_OP_INSERT);
319 if (sbins_populated > 0) {
320 urecord->tr->flags |= AS_TRANSACTION_FLAG_SINDEX_TOUCHED;
321 as_sindex_update_by_sbin(ns, as_index_get_set_name(rd->r, ns), sbins, sbins_populated, &rd->r->keyd);
322 as_sindex_sbin_freeall(sbins, sbins_populated);
323 }
324 as_sindex_release_arr(si_arr, si_arr_index);
325 }
326
327 return ret;
328} // end udf_aerospike_setbin()
329
330/*
331 * Check and validate parameter before performing operation
332 *
333 * return:
334 * UDF_ERR * in case of failure
335 * 0 in case of success
336 */
337static int
338udf_aerospike_param_check(const as_aerospike *as, const as_rec *rec, char *fname, int lineno)
339{
340 if (!as) {
341 cf_debug(AS_UDF, "Invalid Parameters: aerospike=%p", as);
342 return UDF_ERR_INTERNAL_PARAMETER;
343 }
344
345 int ret = udf_record_param_check(rec, fname, lineno);
346 if (ret) {
347 return ret;
348 }
349 return 0;
350}
351
352/*
353 * Internal function: udf_aerospike__apply_update_atomic
354 *
355 * Parameters:
356 * rec -- udf_record to be updated
357 *
358 * Return Values:
359 * 0 success
360 * -1 failure
361 *
362 * Description:
363 * This function applies all the updates atomically. That is,
364 * if one of the bin update/delete/create fails, the entire function
365 * will fail. If the nth update fails, all the n-1 updates are rolled
366 * back to their initial values
367 *
368 * Special Notes:
369 * i. The basic checks of bin name being too long or if there is enough space
370 * on the disk for the bin values is done before allocating space for any
371 * of the bins.
372 *
373 * ii. If one of the updates to be rolled back is a bin creation,
374 * udf_aerospike_delbin is called. This will not free up the bin metadata.
375 * So there will be a small memory mismatch b/w replica (which did not get the
376 * record at all and hence no memory is accounted) and the master will be seen.
377 * To avoid such cases, we are doing checks upfront.
378 *
379 * Callers:
380 * udf_aerospike__execute_updates
381 * In this function, if udf_aerospike__apply_update_atomic fails, the record
382 * is not committed to the storage. On success, record is closed which commits to
383 * the storage and reopened for the next set of udf updates.
384 * The return value from udf_aerospike__apply_update_atomic is passed on to the
385 * callers of this function.
386 */
387int
388udf_aerospike__apply_update_atomic(udf_record *urecord)
389{
390 int rc = 0;
391 int failmax = 0;
392 int new_bins = 0; // How many new bins have to be created in this update
393 as_storage_rd * rd = urecord->rd;
394 as_namespace * ns = rd->ns;
395 bool has_sindex = record_has_sindex(rd->r, ns);
396 bool is_record_dirty = false;
397
398 // This will iterate over all the updates and apply them to storage.
399 // The items will remain, and be used as cache values. If an error
400 // occurred during setbin(), we rollback all the operation which
401 // is and return failure
402 cf_detail(AS_UDF, "execute updates: %d updates", urecord->nupdates);
403
404 // loop twice to make sure the updates are performed first so in case
405 // something wrong it can be rolled back. The deletes will go through
406 // successfully generally.
407
408 as_val* old_values[urecord->nupdates];
409
410 // In first iteration, just calculate how many new bins need to be created
411 for(uint32_t i = 0; i < urecord->nupdates; i++ ) {
412 old_values[i] = NULL;
413
414 if ( urecord->updates[i].dirty ) {
415 char * k = urecord->updates[i].name;
416 if ( k != NULL ) {
417 if ( !as_bin_get(rd, k) ) {
418 new_bins++;
419 }
420 }
421 }
422 }
423 // Free bins - total bins not in use in the record
424 // Delta bins - new bins that need to be created
425 int inuse_bins = as_bin_inuse_count(rd);
426 int free_bins = rd->n_bins - inuse_bins;
427 int delta_bins = new_bins - free_bins;
428 cf_detail(AS_UDF, "Total bins %d, In use bins %d, Free bins %d , New bins %d, Delta bins %d",
429 rd->n_bins, as_bin_inuse_count(urecord->rd), free_bins, new_bins, delta_bins);
430
431 // Check bin usage limit.
432 if ((inuse_bins + new_bins > UDF_RECORD_BIN_ULIMIT) ||
433 (urecord->flag & UDF_RECORD_FLAG_TOO_MANY_BINS)) {
434 cf_warning(AS_UDF, "bin limit of %d for UDF exceeded: %d bins in use, %d bins free, %s%d new bins needed",
435 (int)UDF_RECORD_BIN_ULIMIT, inuse_bins, free_bins,
436 (urecord->flag & UDF_RECORD_FLAG_TOO_MANY_BINS) ? ">" : "", new_bins);
437 goto Rollback;
438 }
439
440 // Allocate space for all the new bins that need to be created beforehand
441 if (delta_bins > 0 && rd->ns->storage_data_in_memory && ! rd->ns->single_bin) {
442 as_bin_allocate_bin_space(rd, delta_bins);
443 }
444
445 if (!rd->ns->storage_data_in_memory && !urecord->particle_data) {
446 urecord->particle_data = cf_malloc(rd->ns->storage_write_block_size);
447 urecord->cur_particle_data = urecord->particle_data;
448 urecord->end_particle_data = urecord->particle_data + rd->ns->storage_write_block_size;
449 }
450
451 if (has_sindex) {
452 SINDEX_GRLOCK();
453 }
454
455 // In second iteration apply updates.
456 for(uint32_t i = 0; i < urecord->nupdates; i++ ) {
457 if ( urecord->updates[i].dirty && rc == 0) {
458
459 char * k = urecord->updates[i].name;
460 as_val * v = urecord->updates[i].value;
461
462 if ( k != NULL ) {
463 if ( v == NULL || v->type == AS_NIL ) {
464 // if the value is NIL, then do a delete
465 cf_detail(AS_UDF, "execute update: position %d deletes bin %s", i, k);
466 old_values[i] = udf_record_storage_get(urecord, k);
467 // Only case delete fails if bin is not found that is
468 // as good as delete. Ignore return code !!
469 udf_aerospike_delbin(urecord, k);
470
471 if (urecord->dirty != NULL) {
472 xdr_fill_dirty_bins(urecord->dirty);
473 }
474 }
475 else {
476 // otherwise, it is a set
477 cf_detail(AS_UDF, "execute update: position %d sets bin %s", i, k);
478 old_values[i] = udf_record_storage_get(urecord, k);
479 rc = udf_aerospike_setbin(urecord, i, k, v);
480 if (rc) {
481 if (old_values[i]) {
482 as_val_destroy(old_values[i]);
483 old_values[i] = NULL;
484 }
485 failmax = i;
486 goto Rollback;
487 }
488
489 if (urecord->dirty != NULL) {
490 xdr_add_dirty_bin(ns, urecord->dirty, k, strlen(k));
491 }
492 }
493 }
494
495 is_record_dirty = true;
496 }
497 }
498
499 {
500 if (! as_storage_record_size_and_check(rd)) {
501 cf_warning(AS_UDF, "record failed storage size check, will not be updated");
502 failmax = (int)urecord->nupdates;
503 goto Rollback;
504 }
505
506 if (rd->ns->clock_skew_stop_writes) {
507 failmax = (int)urecord->nupdates;
508 goto Rollback;
509 }
510
511 if (rd->ns->stop_writes) {
512 cf_warning(AS_UDF, "UDF failed by stop-writes, record will not be updated");
513 failmax = (int)urecord->nupdates;
514 goto Rollback;
515 }
516
517 if (! as_storage_has_space(rd->ns)) {
518 cf_warning(AS_UDF, "drives full, record will not be updated");
519 failmax = (int)urecord->nupdates;
520 goto Rollback;
521 }
522
523 if (! is_valid_ttl(urecord->tr->msgp->msg.record_ttl)) {
524 cf_warning(AS_UDF, "invalid ttl %u", urecord->tr->msgp->msg.record_ttl);
525 failmax = (int)urecord->nupdates;
526 goto Rollback;
527 }
528 }
529
530 if (has_sindex) {
531 SINDEX_GRUNLOCK();
532 }
533
534 // If there were updates do miscellaneous successful commit
535 // tasks
536 if (is_record_dirty
537 || (urecord->flag & UDF_RECORD_FLAG_METADATA_UPDATED)) {
538 urecord->flag |= UDF_RECORD_FLAG_HAS_UPDATES; // will write to storage
539 }
540
541 // Clean up oldvalue cache and reset dirty. All the changes made
542 // here has made to the particle buffer. Nothing will now be backed out.
543 for (uint32_t i = 0; i < urecord->nupdates; i++) {
544 if (old_values[i]) {
545 as_val_destroy(old_values[i]);
546 }
547 urecord->updates[i].dirty = false;
548 }
549 return rc;
550
551Rollback:
552 cf_debug(AS_UDF, "Rollback Called: failmax %d", failmax);
553 for (int i = 0; i < failmax; i++) {
554 if (urecord->updates[i].dirty) {
555 char * k = urecord->updates[i].name;
556 // Pick the oldvalue for rollback
557 as_val * v = old_values[i];
558 if ( k != NULL ) {
559 if ( v == NULL || v->type == AS_NIL ) {
560 // if the value is NIL, then do a delete
561 cf_detail(AS_UDF, "execute rollback: position %d deletes bin %s", i, k);
562 rc = udf_aerospike_delbin(urecord, k);
563 }
564 else {
565 // otherwise, it is a set
566 cf_detail(AS_UDF, "execute rollback: position %d sets bin %s", i, k);
567 rc = udf_aerospike_setbin(urecord, i, k, v);
568 if (rc) {
569 cf_warning(AS_UDF, "Rollback failed .. not good ... !!");
570 }
571 }
572 }
573 if (v) {
574 as_val_destroy(v);
575 cf_debug(AS_UDF, "ROLLBACK as_val_destroy()");
576 }
577 }
578 }
579
580 if (is_record_dirty && urecord->dirty != NULL) {
581 xdr_clear_dirty_bins(urecord->dirty);
582 }
583
584 if (has_sindex) {
585 SINDEX_GRUNLOCK();
586 }
587
588 // Reset the flat size in case the stuff is backedout !!! it should not
589 // fail in the backout code ...
590 if (! as_storage_record_size_and_check(rd)) {
591 cf_warning(AS_UDF, "Does not fit even after rollback... it is trouble");
592 }
593
594 // Do not clean up the cache in case of failure
595 return -1;
596}
597
598/*
599 * Internal function: udf_aerospike_execute_updates
600 *
601 * Parameters:
602 * rec - udf record to be updated
603 *
604 * Return values
605 * 0 on success
606 * -1 on failure
607 *
608 * Description:
609 * Execute set of udf_record updates. If these updates are successfully
610 * applied atomically, the storage record is closed (committed to the disk)
611 * and reopened. The cache is freed up at the end.
612 *
613 * Callers:
614 * udf_aerospike_rec_create, interface func - aerospike:create(r)
615 * udf_aerospike_rec_update, interface func - aerospike:update(r)
616 * udf_aerospike__execute_updates is the key function which is executed in these
617 * functions. The return value is directly passed on to the lua.
618 */
619int
620udf_aerospike__execute_updates(udf_record * urecord)
621{
622 int rc = 0;
623 as_storage_rd *rd = urecord->rd;
624
625 if ( urecord->nupdates == 0 &&
626 (urecord->flag & UDF_RECORD_FLAG_METADATA_UPDATED) == 0 ) {
627 cf_detail(AS_UDF, "No Update when execute update is called");
628 return 0;
629 }
630
631 // fail updates in case update is not allowed. Queries and scans do not
632 // not allow updates. Updates will never be true .. just being paranoid
633 if (!(urecord->flag & UDF_RECORD_FLAG_ALLOW_UPDATES)) {
634 cf_warning(AS_UDF, "Udf: execute updates: allow updates false; FAIL");
635 return -1;
636 }
637
638 // Commit semantics is either all the update make it or none of it
639 rc = udf_aerospike__apply_update_atomic(urecord);
640
641 // allocate down if bins are deleted / not in use
642 if (rd->ns && rd->ns->storage_data_in_memory && ! rd->ns->single_bin) {
643 int32_t delta_bins = (int32_t)as_bin_inuse_count(rd) - (int32_t)rd->n_bins;
644 if (delta_bins) {
645 as_bin_allocate_bin_space(rd, delta_bins);
646 }
647 }
648 return rc;
649}
650
651static void
652udf_aerospike_destroy(as_aerospike * as)
653{
654 as_aerospike_destroy(as);
655}
656
657static cf_clock
658udf_aerospike_get_current_time(const as_aerospike * as)
659{
660 (void)as;
661 return cf_clock_getabsolute();
662}
663
664/**
665 * aerospike::create(record)
666 * Function: udf_aerospike_rec_create
667 *
668 * Parameters:
669 * as - as_aerospike
670 * rec - as_rec
671 *
672 * Return Values:
673 * 1 if record is being read or on a create, it already exists
674 * o/w return value of udf_aerospike__execute_updates
675 *
676 * Description:
677 * Create a new record in local storage.
678 * The record will only be created if it does not exist.
679 * This assumes the record has a digest that is valid for local storage.
680 *
681 * Synchronization : object lock acquired by the transaction thread executing UDF.
682 * Partition reservation takes place just before the transaction starts executing
683 * ( look for as_partition_reserve_udf in thr_tsvc.c )
684 *
685 * Callers:
686 * lua interfacing function, mod_lua_aerospike_rec_create
687 * The return value of udf_aerospike_rec_create is pushed on to the lua stack
688 *
689 * Notes:
690 * The 'read' and 'exists' flag of udf_record are set to true.
691*/
692static int
693udf_aerospike_rec_create(const as_aerospike * as, const as_rec * rec)
694{
695 int ret = udf_aerospike_param_check(as, rec, __FILE__, __LINE__);
696 if (ret) {
697 return ret;
698 }
699
700 udf_record * urecord = (udf_record *) as_rec_source(rec);
701
702 // make sure record isn't already successfully read
703 if ((urecord->flag & UDF_RECORD_FLAG_OPEN) != 0) {
704 if (as_bin_inuse_has(urecord->rd)) {
705 cf_detail(AS_UDF, "udf_aerospike_rec_create: Record Already Exists");
706 return 1;
707 }
708 // else - binless record ok...
709
710 if ((ret = udf_aerospike__execute_updates(urecord)) != 0) {
711 cf_warning(AS_UDF, "udf_aerospike_rec_create: failure executing record updates");
712 udf_aerospike_rec_remove(as, rec);
713 }
714
715 return ret;
716 }
717
718 as_transaction *tr = urecord->tr;
719 as_index_ref *r_ref = urecord->r_ref;
720 as_storage_rd *rd = urecord->rd;
721 as_index_tree *tree = tr->rsv.tree;
722
723 // make sure we got the record as a create
724 int rv = as_record_get_create(tree, &tr->keyd, r_ref, tr->rsv.ns);
725 cf_detail_digest(AS_UDF, &tr->keyd, "Creating Record ");
726
727 // rv 0 means record exists, 1 means create, < 0 means fail
728 // TODO: Verify correct result codes.
729 if (rv == 1) {
730 // Record created.
731 } else if (rv == 0) {
732 // If it's an expired or truncated record, pretend it's a fresh create.
733 if (as_record_is_doomed(r_ref->r, tr->rsv.ns)) {
734 as_record_rescue(r_ref, tr->rsv.ns);
735 } else {
736 cf_warning(AS_UDF, "udf_aerospike_rec_create: Record Already Exists 2");
737 as_record_done(r_ref, tr->rsv.ns);
738 // DO NOT change it has special meaning for caller
739 return 1;
740 }
741 } else if (rv < 0) {
742 cf_detail_digest(AS_UDF, &tr->keyd, "udf_aerospike_rec_create: Record Open Failed with rv=%d ", rv);
743 return rv;
744 }
745
746 // Associates the set name with the storage rec and index
747 if (tr->msgp) {
748 // Set the set name to index and close record if the setting the set name
749 // is not successful
750 int rv_set = as_transaction_has_set(tr) ?
751 set_set_from_msg(r_ref->r, tr->rsv.ns, &tr->msgp->msg) : 0;
752 if (rv_set != 0) {
753 cf_warning(AS_UDF, "udf_aerospike_rec_create: Failed to set setname");
754 as_index_delete(tree, &tr->keyd);
755 as_record_done(r_ref, tr->rsv.ns);
756 return 4;
757 }
758
759 // Don't write record if it would be truncated.
760 if (as_truncate_now_is_truncated(tr->rsv.ns, as_index_get_set_id(r_ref->r))) {
761 as_index_delete(tree, &tr->keyd);
762 as_record_done(r_ref, tr->rsv.ns);
763 return 4;
764 }
765 }
766
767 // open up storage
768 as_storage_record_create(tr->rsv.ns, r_ref->r, rd);
769
770 // Shortcut for set name storage.
771 as_storage_record_get_set_name(rd);
772
773 // If the message has a key, apply it to the record.
774 if (! get_msg_key(tr, rd)) {
775 cf_warning(AS_UDF, "udf_aerospike_rec_create: Can't store key");
776 as_storage_record_close(rd);
777 as_index_delete(tree, &tr->keyd);
778 as_record_done(r_ref, tr->rsv.ns);
779 return 4;
780 }
781
782 // if multibin storage, we will use urecord->stack_bins, so set the size appropriately
783 if (rd->ns->single_bin) {
784 rd->n_bins = 1;
785 }
786 else if (! rd->ns->storage_data_in_memory) {
787 rd->n_bins = sizeof(urecord->stack_bins) / sizeof(as_bin);
788 }
789
790 // side effect: will set the unused bins to properly unused
791 as_storage_rd_load_bins(rd, urecord->stack_bins); // TODO - handle error returned
792
793 int rc = udf_aerospike__execute_updates(urecord);
794
795 if (rc != 0) {
796 // Creating the udf record failed, destroy the as_record
797 cf_warning(AS_UDF, "udf_aerospike_rec_create: failure executing record updates (%d)", rc);
798 udf_record_close(urecord); // handles particle data and cache only
799 as_storage_record_close(rd);
800 as_index_delete(tree, &tr->keyd);
801 as_record_done(r_ref, tr->rsv.ns);
802 return rc;
803 }
804
805 // Success...
806
807 urecord->flag |= UDF_RECORD_FLAG_OPEN | UDF_RECORD_FLAG_STORAGE_OPEN;
808
809 return 0;
810}
811
812/**
813 * aerospike::update(record)
814 * Function: udf_aerospike_rec_update
815 *
816 * Parameters:
817 *
818 * Return Values:
819 * -2 if record does not exist
820 * o/w return value of udf_aerospike__execute_updates
821 *
822 * Description:
823 * Updates an existing record in local storage.
824 * The record will only be updated if it exists.
825 *
826 * Synchronization : object lock acquired by the transaction thread executing UDF.
827 * Partition reservation takes place just before the transaction starts executing
828 * ( look for as_partition_reserve_udf in thr_tsvc.c )
829 *
830 * Callers:
831 * lua interfacing function, mod_lua_aerospike_rec_update
832 * The return value of udf_aerospike_rec_update is pushed on to the lua stack
833 *
834 * Notes:
835 * If the record does not exist or is not read by anyone yet, we cannot
836 * carry on with the update. 'exists' and 'set' are set to false on record
837 * init or record remove.
838*/
839static int
840udf_aerospike_rec_update(const as_aerospike * as, const as_rec * rec)
841{
842 int ret = udf_aerospike_param_check(as, rec, __FILE__, __LINE__);
843 if (ret) {
844 return ret;
845 }
846
847 udf_record * urecord = (udf_record *) as_rec_source(rec);
848
849 // make sure record exists and is already opened up
850 if (!urecord || !(urecord->flag & UDF_RECORD_FLAG_STORAGE_OPEN)
851 || !(urecord->flag & UDF_RECORD_FLAG_OPEN) ) {
852 cf_warning(AS_UDF, "Record not found to be open while updating urecord flag=%d", urecord ? urecord->flag : -1);
853 return -2;
854 }
855 cf_detail_digest(AS_UDF, &urecord->rd->r->keyd, "Executing Updates");
856 ret = udf_aerospike__execute_updates(urecord);
857
858 if (ret < 0) {
859 cf_warning(AS_UDF, "udf_aerospike_rec_update: failure executing record updates (%d)", ret);
860 }
861
862 return ret;
863}
864
865/**
866 * Function udf_aerospike_rec_exists
867 *
868 * Parameters:
869 *
870 * Return Values:
871 * 1 if record exists
872 * 0 o/w
873 *
874 * Description:
875 * Check to see if the record exists
876 */
877static int
878udf_aerospike_rec_exists(const as_aerospike * as, const as_rec * rec)
879{
880 int ret = udf_aerospike_param_check(as, rec, __FILE__, __LINE__);
881 if (ret) {
882 return ret;
883 }
884
885 udf_record * urecord = (udf_record *) as_rec_source(rec);
886
887 return (urecord && (urecord->flag & UDF_RECORD_FLAG_OPEN)) ? true : false;
888}
889
890/*
891 * Function: udf_aerospike_rec_remove
892 *
893 * Parameters:
894 *
895 * Return Values:
896 * 1 if record does not exist
897 * 0 on success
898 *
899 * Description:
900 * Removes an existing record from local storage.
901 * The record will only be removed if it exists.
902 */
903static int
904udf_aerospike_rec_remove(const as_aerospike * as, const as_rec * rec)
905{
906 int ret = udf_aerospike_param_check(as, rec, __FILE__, __LINE__);
907 if (ret) {
908 return ret;
909 }
910 udf_record * urecord = (udf_record *) as_rec_source(rec);
911
912 // make sure record is already exists before removing it
913 if (!urecord || !(urecord->flag & UDF_RECORD_FLAG_OPEN)) {
914 return 1;
915 }
916
917 as_storage_rd* rd = urecord->rd;
918
919 if (rd->ns->storage_data_in_memory && ! rd->ns->single_bin) {
920 delete_adjust_sindex(rd);
921 }
922
923 as_record_destroy_bins(rd);
924
925 if (rd->ns->storage_data_in_memory && ! rd->ns->single_bin) {
926 as_record_free_bin_space(rd->r);
927 rd->bins = NULL;
928 rd->n_bins = 0;
929 }
930
931 if (urecord->particle_data) {
932 cf_free(urecord->particle_data);
933 urecord->particle_data = NULL;
934 }
935
936 udf_record_cache_free(urecord);
937 urecord->flag |= UDF_RECORD_FLAG_HAS_UPDATES;
938
939 return 0;
940}
941
942/**
943 * Writes a log message
944 */
945static int
946udf_aerospike_log(const as_aerospike * a, const char * file, const int line, const int lvl, const char * msg)
947{
948 (void)a;
949 cf_fault_event(AS_UDF, lvl, file, line, "%s", (char *) msg);
950 return 0;
951}
952
953// Would someone please explain the structure of these hooks? Why are some null?
954const as_aerospike_hooks udf_aerospike_hooks = {
955 .rec_create = udf_aerospike_rec_create,
956 .rec_update = udf_aerospike_rec_update,
957 .rec_remove = udf_aerospike_rec_remove,
958 .rec_exists = udf_aerospike_rec_exists,
959 .log = udf_aerospike_log,
960 .get_current_time = udf_aerospike_get_current_time,
961 .destroy = udf_aerospike_destroy
962};
963