1 | /* |
2 | * udf_aerospike.c |
3 | * |
4 | * Copyright (C) 2012-2016 Aerospike, Inc. |
5 | * |
6 | * Portions may be licensed to Aerospike, Inc. under one or more contributor |
7 | * license agreements. |
8 | * |
9 | * This program is free software: you can redistribute it and/or modify it under |
10 | * the terms of the GNU Affero General Public License as published by the Free |
11 | * Software Foundation, either version 3 of the License, or (at your option) any |
12 | * later version. |
13 | * |
14 | * This program is distributed in the hope that it will be useful, but WITHOUT |
15 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
16 | * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more |
17 | * details. |
18 | * |
19 | * You should have received a copy of the GNU Affero General Public License |
20 | * along with this program. If not, see http://www.gnu.org/licenses/ |
21 | */ |
22 | |
23 | #include "base/udf_aerospike.h" |
24 | |
25 | #include <stdbool.h> |
26 | #include <stddef.h> |
27 | #include <stdint.h> |
28 | #include <string.h> |
29 | #include <asm/byteorder.h> |
30 | |
31 | #include "aerospike/as_aerospike.h" |
32 | #include "aerospike/as_boolean.h" |
33 | #include "aerospike/as_buffer.h" |
34 | #include "aerospike/as_bytes.h" |
35 | #include "aerospike/as_integer.h" |
36 | #include "aerospike/as_msgpack.h" |
37 | #include "aerospike/as_serializer.h" |
38 | #include "aerospike/as_string.h" |
39 | #include "aerospike/as_val.h" |
40 | #include "citrusleaf/cf_clock.h" |
41 | |
42 | #include "fault.h" |
43 | |
44 | #include "base/datamodel.h" |
45 | #include "base/index.h" |
46 | #include "base/secondary_index.h" |
47 | #include "base/transaction.h" |
48 | #include "base/truncate.h" |
49 | #include "base/udf_record.h" |
50 | #include "base/xdr_serverside.h" |
51 | #include "storage/storage.h" |
52 | #include "transaction/rw_utils.h" |
53 | #include "transaction/udf.h" |
54 | |
55 | |
56 | static int udf_aerospike_rec_remove(const as_aerospike *, const as_rec *); |
57 | /* |
58 | * Internal Function: udf_aerospike_delbin |
59 | * |
60 | * Parameters: |
61 | * r - udf_record to be manipulated |
62 | * bname - name of the bin to be deleted |
63 | * |
64 | * Return value: |
65 | * 0 on success |
66 | * -1 on failure |
67 | * |
68 | * Description: |
69 | * The function deletes the bin with the name |
70 | * passed in as parameter. The as_bin_destroy function |
71 | * which is called here, only frees the data and |
72 | * the bin is marked as not in use. The bin can then be reused later. |
73 | * |
74 | * Synchronization : object lock acquired by the transaction thread executing UDF. |
75 | * Partition reservation takes place just before the transaction starts executing |
76 | * ( look for as_partition_reserve_udf in thr_tsvc.c ) |
77 | * |
78 | * Callers: |
79 | * udf_aerospike__apply_update_atomic |
80 | * In this function, if it fails at the time of update, the record is set |
81 | * to rollback all the updates till this point. The case where it fails in |
82 | * rollback is not handled. |
83 | * |
84 | * Side Notes: |
85 | * i. write_to_device will be set to true on a successful bin destroy. |
86 | * If all the updates from udf_aerospike__apply_update_atomic (including this) are |
87 | * successful, the record will be written to disk and reopened so that the rest of |
88 | * sets of updates can be applied. |
89 | * |
90 | * ii. If delete from sindex fails, we do not handle it. |
91 | */ |
92 | static int |
93 | udf_aerospike_delbin(udf_record * urecord, const char * bname) |
94 | { |
95 | as_storage_rd *rd = urecord->rd; |
96 | as_namespace *ns = rd->ns; |
97 | |
98 | // Check that bname is not completely invalid |
99 | if (bname == NULL || (ns->single_bin && bname[0] != 0) || (! ns->single_bin && bname[0] == 0)) { |
100 | cf_warning(AS_UDF, "udf_aerospike_delbin: Invalid Parameters: [Invalid bin name supplied]... Fail" ); |
101 | return -1; |
102 | } |
103 | |
104 | // Check quality of bname -- check that it is proper length, then make sure |
105 | // that the bin exists. |
106 | if (strlen(bname) >= AS_BIN_NAME_MAX_SZ) { |
107 | // Can't read bin if name too large. |
108 | cf_warning(AS_UDF, "udf_aerospike_delbin: Invalid Parameters [bin name(%s) too big]... Fail" , bname); |
109 | return -1; |
110 | } |
111 | |
112 | as_bin * b = as_bin_get(rd, bname); |
113 | if ( !b ) { |
114 | cf_debug(AS_UDF, "udf_aerospike_delbin: Invalid Operation [Bin name(%s) not found of delete]... Fail" , bname); |
115 | return -1; |
116 | } |
117 | |
118 | const char * set_name = as_index_get_set_name(rd->r, ns); |
119 | |
120 | bool has_sindex = record_has_sindex(rd->r, ns); |
121 | SINDEX_BINS_SETUP(sbins, ns->sindex_cnt); |
122 | as_sindex * si_arr[ns->sindex_cnt]; |
123 | int si_arr_index = 0; |
124 | int sbins_populated = 0; |
125 | if (has_sindex) { |
126 | si_arr_index += as_sindex_arr_lookup_by_set_binid_lockfree(ns, set_name, b->id, &si_arr[si_arr_index]); |
127 | sbins_populated += as_sindex_sbins_from_bin(ns, set_name, b, sbins, AS_SINDEX_OP_DELETE); |
128 | } |
129 | |
130 | int32_t i = as_bin_get_index(rd, bname); |
131 | if (i != -1) { |
132 | if (has_sindex) { |
133 | if (sbins_populated > 0) { |
134 | urecord->tr->flags |= AS_TRANSACTION_FLAG_SINDEX_TOUCHED; |
135 | as_sindex_update_by_sbin(ns, as_index_get_set_name(rd->r, ns), sbins, sbins_populated, &rd->r->keyd); |
136 | } |
137 | } |
138 | as_bin_destroy(rd, i); |
139 | } else { |
140 | cf_warning(AS_UDF, "udf_aerospike_delbin: Internal Error [Deleting non-existing bin %s]... Fail" , bname); |
141 | } |
142 | |
143 | if (has_sindex) { |
144 | as_sindex_sbin_freeall(sbins, sbins_populated); |
145 | as_sindex_release_arr(si_arr, si_arr_index); |
146 | } |
147 | |
148 | return 0; |
149 | } |
150 | /* |
151 | * Internal function: udf__aerospike_get_particle_buf |
152 | * |
153 | * Parameters: |
154 | * r -- udf_record_bin for which particle buf is requested |
155 | * type -- bin type |
156 | * pbytes -- current space required |
157 | * |
158 | * Return value: |
159 | * NULL on failure |
160 | * valid buf pointer success |
161 | * |
162 | * Description: |
163 | * The function find space on preallocated particle_data for requested size. |
164 | * In case it is found it tries to allocate space for bin independently. |
165 | * Return back the pointer to the offset on preallocated particle_data or newly |
166 | * allocated space. |
167 | * |
168 | * Return NULL if both fails |
169 | * |
170 | * Note: ubin->particle_buf will be set if new per bin memory is allocated. |
171 | * |
172 | * Callers: |
173 | * udf_aerospike_setbin |
174 | */ |
175 | uint8_t * |
176 | udf__aerospike_get_particle_buf(udf_record *urecord, udf_record_bin *ubin, uint32_t pbytes) |
177 | { |
178 | if (pbytes > urecord->rd->ns->storage_write_block_size) { |
179 | cf_warning(AS_UDF, "udf__aerospike_get_particle_buf: Invalid Operation [Bin %s data too big size=%u]... Fail" , ubin->name, pbytes); |
180 | return NULL; |
181 | } |
182 | |
183 | uint32_t alloc_size = pbytes == 0 ? 0 : urecord->rd->ns->storage_write_block_size; |
184 | uint8_t *buf = NULL; |
185 | |
186 | if (ubin->particle_buf) { |
187 | buf = ubin->particle_buf; |
188 | } else { |
189 | // Disable dynamic shifting from the flat allocater to dynamic |
190 | // allocation. |
191 | if ((urecord->cur_particle_data + pbytes) < urecord->end_particle_data) { |
192 | buf = urecord->cur_particle_data; |
193 | urecord->cur_particle_data += pbytes; |
194 | } else if (alloc_size) { |
195 | // If there is no space in preallocated buffer then go |
196 | // ahead and allocate space per bin. This may happen |
197 | // if user keeps doing lot of execute update exhausting |
198 | // the buffer. After this point the record size check will |
199 | // trip instead of at the code when bin value is set. |
200 | ubin->particle_buf = cf_malloc(alloc_size); |
201 | buf = ubin->particle_buf; |
202 | } |
203 | } |
204 | return buf; |
205 | } |
206 | /* |
207 | * Internal function: udf_aerospike_setbin |
208 | * |
209 | * Parameters: |
210 | * offset -- offset of udf bin in updates array |
211 | * r -- udf_record to be manipulated |
212 | * bname -- name of the bin to be deleted |
213 | * val -- value to be updated with |
214 | * |
215 | * Return value: |
216 | * 0 on success |
217 | * -1 on failure |
218 | * |
219 | * Description: |
220 | * The function sets the bin with the name |
221 | * passed in as parameter to the value, passed as the third parameter. |
222 | * Before updating the bin, it is checked if the value can fit in the storage |
223 | * |
224 | * Synchronization : object lock acquired by the transaction thread executing UDF. |
225 | * Partition reservation takes place just before the transaction starts executing |
226 | * ( look for as_partition_reserve_udf in thr_tsvc.c ) |
227 | * |
228 | * Callers: |
229 | * udf_aerospike__apply_update_atomic |
230 | * In this function, if it fails at the time of update, the record is set |
231 | * to rollback all the updates till this point. The case where it fails in |
232 | * rollback is not handled. |
233 | * |
234 | * Side Notes: |
235 | * i. write_to_device will be set to true on a successful bin update. |
236 | * If all the updates from udf_aerospike__apply_update_atomic (including this) are |
237 | * successful, the record will be written to disk and reopened so that the rest of |
238 | * sets of updates can be applied. |
239 | * |
240 | * ii. If put in sindex fails, we do not handle it. |
241 | * |
242 | * TODO make sure anything goes into setbin only if the bin value is |
243 | * changed |
244 | */ |
245 | static int |
246 | udf_aerospike_setbin(udf_record * urecord, int offset, const char * bname, const as_val * val) |
247 | { |
248 | as_storage_rd *rd = urecord->rd; |
249 | as_namespace *ns = rd->ns; |
250 | |
251 | if (bname == NULL || (ns->single_bin && bname[0] != 0) || (! ns->single_bin && bname[0] == 0)) { |
252 | cf_warning(AS_UDF, "udf_aerospike_setbin: Invalid Parameters: [Invalid bin name supplied]... Fail" ); |
253 | return -1; |
254 | } |
255 | |
256 | if (as_particle_type_from_asval(val) == AS_PARTICLE_TYPE_NULL) { |
257 | cf_warning(AS_UDF, "udf_aerospike_setbin: [%s] called with unusable as_val" , bname); |
258 | return -3; |
259 | } |
260 | |
261 | uint8_t type = as_val_type(val); |
262 | |
263 | as_bin * b = as_bin_get_or_create(rd, bname); |
264 | |
265 | if ( !b ) { |
266 | cf_warning(AS_UDF, "udf_aerospike_setbin: Internal Error [Bin %s not found.. Possibly ran out of bins]... Fail" , bname); |
267 | return -1; |
268 | } |
269 | |
270 | bool has_sindex = record_has_sindex(rd->r, ns); |
271 | SINDEX_BINS_SETUP(sbins, 2 * ns->sindex_cnt); |
272 | as_sindex * si_arr[2 * ns->sindex_cnt]; |
273 | int sbins_populated = 0; |
274 | int si_arr_index = 0; |
275 | const char * set_name = as_index_get_set_name(rd->r, ns); |
276 | |
277 | if (has_sindex ) { |
278 | si_arr_index += as_sindex_arr_lookup_by_set_binid_lockfree(ns, set_name, b->id, &si_arr[si_arr_index]); |
279 | sbins_populated += as_sindex_sbins_from_bin(ns, set_name, b, &sbins[sbins_populated], AS_SINDEX_OP_DELETE); |
280 | } |
281 | |
282 | // we know we are doing an update now, make sure there is particle data, |
283 | // set to be 1 wblock size now @TODO! |
284 | int ret = 0; |
285 | |
286 | cf_detail(AS_UDF, "udf_setbin: bin %s type %d " , bname, type ); |
287 | |
288 | if (ns->storage_data_in_memory) { |
289 | if (as_bin_particle_replace_from_asval(b, val) != 0) { |
290 | cf_warning(AS_UDF, "udf_aerospike_setbin: [%s] failed to replace particle" , bname); |
291 | ret = -4; |
292 | } |
293 | } |
294 | else { |
295 | uint32_t size = as_particle_size_from_asval(val); |
296 | uint8_t *particle_buf = udf__aerospike_get_particle_buf(urecord, &urecord->updates[offset], size); |
297 | |
298 | if (particle_buf) { |
299 | as_bin_particle_stack_from_asval(b, particle_buf, val); |
300 | } |
301 | else { |
302 | cf_warning(AS_UDF, "udf_aerospike_setbin: [%s] failed to get space for particle size %u" , bname, size); |
303 | ret = -4; |
304 | } |
305 | } |
306 | |
307 | // Update sindex if required |
308 | if (has_sindex) { |
309 | if (ret) { |
310 | if (sbins_populated > 0) { |
311 | as_sindex_sbin_freeall(sbins, sbins_populated); |
312 | } |
313 | as_sindex_release_arr(si_arr, si_arr_index); |
314 | return ret; |
315 | } |
316 | |
317 | si_arr_index += as_sindex_arr_lookup_by_set_binid_lockfree(ns, set_name, b->id, &si_arr[si_arr_index]); |
318 | sbins_populated += as_sindex_sbins_from_bin(ns, set_name, b, &sbins[sbins_populated], AS_SINDEX_OP_INSERT); |
319 | if (sbins_populated > 0) { |
320 | urecord->tr->flags |= AS_TRANSACTION_FLAG_SINDEX_TOUCHED; |
321 | as_sindex_update_by_sbin(ns, as_index_get_set_name(rd->r, ns), sbins, sbins_populated, &rd->r->keyd); |
322 | as_sindex_sbin_freeall(sbins, sbins_populated); |
323 | } |
324 | as_sindex_release_arr(si_arr, si_arr_index); |
325 | } |
326 | |
327 | return ret; |
328 | } // end udf_aerospike_setbin() |
329 | |
330 | /* |
331 | * Check and validate parameter before performing operation |
332 | * |
333 | * return: |
334 | * UDF_ERR * in case of failure |
335 | * 0 in case of success |
336 | */ |
337 | static int |
338 | udf_aerospike_param_check(const as_aerospike *as, const as_rec *rec, char *fname, int lineno) |
339 | { |
340 | if (!as) { |
341 | cf_debug(AS_UDF, "Invalid Parameters: aerospike=%p" , as); |
342 | return UDF_ERR_INTERNAL_PARAMETER; |
343 | } |
344 | |
345 | int ret = udf_record_param_check(rec, fname, lineno); |
346 | if (ret) { |
347 | return ret; |
348 | } |
349 | return 0; |
350 | } |
351 | |
352 | /* |
353 | * Internal function: udf_aerospike__apply_update_atomic |
354 | * |
355 | * Parameters: |
356 | * rec -- udf_record to be updated |
357 | * |
358 | * Return Values: |
359 | * 0 success |
360 | * -1 failure |
361 | * |
362 | * Description: |
363 | * This function applies all the updates atomically. That is, |
364 | * if one of the bin update/delete/create fails, the entire function |
365 | * will fail. If the nth update fails, all the n-1 updates are rolled |
366 | * back to their initial values |
367 | * |
368 | * Special Notes: |
369 | * i. The basic checks of bin name being too long or if there is enough space |
370 | * on the disk for the bin values is done before allocating space for any |
371 | * of the bins. |
372 | * |
373 | * ii. If one of the updates to be rolled back is a bin creation, |
374 | * udf_aerospike_delbin is called. This will not free up the bin metadata. |
375 | * So there will be a small memory mismatch b/w replica (which did not get the |
376 | * record at all and hence no memory is accounted) and the master will be seen. |
377 | * To avoid such cases, we are doing checks upfront. |
378 | * |
379 | * Callers: |
380 | * udf_aerospike__execute_updates |
381 | * In this function, if udf_aerospike__apply_update_atomic fails, the record |
382 | * is not committed to the storage. On success, record is closed which commits to |
383 | * the storage and reopened for the next set of udf updates. |
384 | * The return value from udf_aerospike__apply_update_atomic is passed on to the |
385 | * callers of this function. |
386 | */ |
387 | int |
388 | udf_aerospike__apply_update_atomic(udf_record *urecord) |
389 | { |
390 | int rc = 0; |
391 | int failmax = 0; |
392 | int new_bins = 0; // How many new bins have to be created in this update |
393 | as_storage_rd * rd = urecord->rd; |
394 | as_namespace * ns = rd->ns; |
395 | bool has_sindex = record_has_sindex(rd->r, ns); |
396 | bool is_record_dirty = false; |
397 | |
398 | // This will iterate over all the updates and apply them to storage. |
399 | // The items will remain, and be used as cache values. If an error |
400 | // occurred during setbin(), we rollback all the operation which |
401 | // is and return failure |
402 | cf_detail(AS_UDF, "execute updates: %d updates" , urecord->nupdates); |
403 | |
404 | // loop twice to make sure the updates are performed first so in case |
405 | // something wrong it can be rolled back. The deletes will go through |
406 | // successfully generally. |
407 | |
408 | as_val* old_values[urecord->nupdates]; |
409 | |
410 | // In first iteration, just calculate how many new bins need to be created |
411 | for(uint32_t i = 0; i < urecord->nupdates; i++ ) { |
412 | old_values[i] = NULL; |
413 | |
414 | if ( urecord->updates[i].dirty ) { |
415 | char * k = urecord->updates[i].name; |
416 | if ( k != NULL ) { |
417 | if ( !as_bin_get(rd, k) ) { |
418 | new_bins++; |
419 | } |
420 | } |
421 | } |
422 | } |
423 | // Free bins - total bins not in use in the record |
424 | // Delta bins - new bins that need to be created |
425 | int inuse_bins = as_bin_inuse_count(rd); |
426 | int free_bins = rd->n_bins - inuse_bins; |
427 | int delta_bins = new_bins - free_bins; |
428 | cf_detail(AS_UDF, "Total bins %d, In use bins %d, Free bins %d , New bins %d, Delta bins %d" , |
429 | rd->n_bins, as_bin_inuse_count(urecord->rd), free_bins, new_bins, delta_bins); |
430 | |
431 | // Check bin usage limit. |
432 | if ((inuse_bins + new_bins > UDF_RECORD_BIN_ULIMIT) || |
433 | (urecord->flag & UDF_RECORD_FLAG_TOO_MANY_BINS)) { |
434 | cf_warning(AS_UDF, "bin limit of %d for UDF exceeded: %d bins in use, %d bins free, %s%d new bins needed" , |
435 | (int)UDF_RECORD_BIN_ULIMIT, inuse_bins, free_bins, |
436 | (urecord->flag & UDF_RECORD_FLAG_TOO_MANY_BINS) ? ">" : "" , new_bins); |
437 | goto Rollback; |
438 | } |
439 | |
440 | // Allocate space for all the new bins that need to be created beforehand |
441 | if (delta_bins > 0 && rd->ns->storage_data_in_memory && ! rd->ns->single_bin) { |
442 | as_bin_allocate_bin_space(rd, delta_bins); |
443 | } |
444 | |
445 | if (!rd->ns->storage_data_in_memory && !urecord->particle_data) { |
446 | urecord->particle_data = cf_malloc(rd->ns->storage_write_block_size); |
447 | urecord->cur_particle_data = urecord->particle_data; |
448 | urecord->end_particle_data = urecord->particle_data + rd->ns->storage_write_block_size; |
449 | } |
450 | |
451 | if (has_sindex) { |
452 | SINDEX_GRLOCK(); |
453 | } |
454 | |
455 | // In second iteration apply updates. |
456 | for(uint32_t i = 0; i < urecord->nupdates; i++ ) { |
457 | if ( urecord->updates[i].dirty && rc == 0) { |
458 | |
459 | char * k = urecord->updates[i].name; |
460 | as_val * v = urecord->updates[i].value; |
461 | |
462 | if ( k != NULL ) { |
463 | if ( v == NULL || v->type == AS_NIL ) { |
464 | // if the value is NIL, then do a delete |
465 | cf_detail(AS_UDF, "execute update: position %d deletes bin %s" , i, k); |
466 | old_values[i] = udf_record_storage_get(urecord, k); |
467 | // Only case delete fails if bin is not found that is |
468 | // as good as delete. Ignore return code !! |
469 | udf_aerospike_delbin(urecord, k); |
470 | |
471 | if (urecord->dirty != NULL) { |
472 | xdr_fill_dirty_bins(urecord->dirty); |
473 | } |
474 | } |
475 | else { |
476 | // otherwise, it is a set |
477 | cf_detail(AS_UDF, "execute update: position %d sets bin %s" , i, k); |
478 | old_values[i] = udf_record_storage_get(urecord, k); |
479 | rc = udf_aerospike_setbin(urecord, i, k, v); |
480 | if (rc) { |
481 | if (old_values[i]) { |
482 | as_val_destroy(old_values[i]); |
483 | old_values[i] = NULL; |
484 | } |
485 | failmax = i; |
486 | goto Rollback; |
487 | } |
488 | |
489 | if (urecord->dirty != NULL) { |
490 | xdr_add_dirty_bin(ns, urecord->dirty, k, strlen(k)); |
491 | } |
492 | } |
493 | } |
494 | |
495 | is_record_dirty = true; |
496 | } |
497 | } |
498 | |
499 | { |
500 | if (! as_storage_record_size_and_check(rd)) { |
501 | cf_warning(AS_UDF, "record failed storage size check, will not be updated" ); |
502 | failmax = (int)urecord->nupdates; |
503 | goto Rollback; |
504 | } |
505 | |
506 | if (rd->ns->clock_skew_stop_writes) { |
507 | failmax = (int)urecord->nupdates; |
508 | goto Rollback; |
509 | } |
510 | |
511 | if (rd->ns->stop_writes) { |
512 | cf_warning(AS_UDF, "UDF failed by stop-writes, record will not be updated" ); |
513 | failmax = (int)urecord->nupdates; |
514 | goto Rollback; |
515 | } |
516 | |
517 | if (! as_storage_has_space(rd->ns)) { |
518 | cf_warning(AS_UDF, "drives full, record will not be updated" ); |
519 | failmax = (int)urecord->nupdates; |
520 | goto Rollback; |
521 | } |
522 | |
523 | if (! is_valid_ttl(urecord->tr->msgp->msg.record_ttl)) { |
524 | cf_warning(AS_UDF, "invalid ttl %u" , urecord->tr->msgp->msg.record_ttl); |
525 | failmax = (int)urecord->nupdates; |
526 | goto Rollback; |
527 | } |
528 | } |
529 | |
530 | if (has_sindex) { |
531 | SINDEX_GRUNLOCK(); |
532 | } |
533 | |
534 | // If there were updates do miscellaneous successful commit |
535 | // tasks |
536 | if (is_record_dirty |
537 | || (urecord->flag & UDF_RECORD_FLAG_METADATA_UPDATED)) { |
538 | urecord->flag |= UDF_RECORD_FLAG_HAS_UPDATES; // will write to storage |
539 | } |
540 | |
541 | // Clean up oldvalue cache and reset dirty. All the changes made |
542 | // here has made to the particle buffer. Nothing will now be backed out. |
543 | for (uint32_t i = 0; i < urecord->nupdates; i++) { |
544 | if (old_values[i]) { |
545 | as_val_destroy(old_values[i]); |
546 | } |
547 | urecord->updates[i].dirty = false; |
548 | } |
549 | return rc; |
550 | |
551 | Rollback: |
552 | cf_debug(AS_UDF, "Rollback Called: failmax %d" , failmax); |
553 | for (int i = 0; i < failmax; i++) { |
554 | if (urecord->updates[i].dirty) { |
555 | char * k = urecord->updates[i].name; |
556 | // Pick the oldvalue for rollback |
557 | as_val * v = old_values[i]; |
558 | if ( k != NULL ) { |
559 | if ( v == NULL || v->type == AS_NIL ) { |
560 | // if the value is NIL, then do a delete |
561 | cf_detail(AS_UDF, "execute rollback: position %d deletes bin %s" , i, k); |
562 | rc = udf_aerospike_delbin(urecord, k); |
563 | } |
564 | else { |
565 | // otherwise, it is a set |
566 | cf_detail(AS_UDF, "execute rollback: position %d sets bin %s" , i, k); |
567 | rc = udf_aerospike_setbin(urecord, i, k, v); |
568 | if (rc) { |
569 | cf_warning(AS_UDF, "Rollback failed .. not good ... !!" ); |
570 | } |
571 | } |
572 | } |
573 | if (v) { |
574 | as_val_destroy(v); |
575 | cf_debug(AS_UDF, "ROLLBACK as_val_destroy()" ); |
576 | } |
577 | } |
578 | } |
579 | |
580 | if (is_record_dirty && urecord->dirty != NULL) { |
581 | xdr_clear_dirty_bins(urecord->dirty); |
582 | } |
583 | |
584 | if (has_sindex) { |
585 | SINDEX_GRUNLOCK(); |
586 | } |
587 | |
588 | // Reset the flat size in case the stuff is backedout !!! it should not |
589 | // fail in the backout code ... |
590 | if (! as_storage_record_size_and_check(rd)) { |
591 | cf_warning(AS_UDF, "Does not fit even after rollback... it is trouble" ); |
592 | } |
593 | |
594 | // Do not clean up the cache in case of failure |
595 | return -1; |
596 | } |
597 | |
598 | /* |
599 | * Internal function: udf_aerospike_execute_updates |
600 | * |
601 | * Parameters: |
602 | * rec - udf record to be updated |
603 | * |
604 | * Return values |
605 | * 0 on success |
606 | * -1 on failure |
607 | * |
608 | * Description: |
609 | * Execute set of udf_record updates. If these updates are successfully |
610 | * applied atomically, the storage record is closed (committed to the disk) |
611 | * and reopened. The cache is freed up at the end. |
612 | * |
613 | * Callers: |
614 | * udf_aerospike_rec_create, interface func - aerospike:create(r) |
615 | * udf_aerospike_rec_update, interface func - aerospike:update(r) |
616 | * udf_aerospike__execute_updates is the key function which is executed in these |
617 | * functions. The return value is directly passed on to the lua. |
618 | */ |
619 | int |
620 | udf_aerospike__execute_updates(udf_record * urecord) |
621 | { |
622 | int rc = 0; |
623 | as_storage_rd *rd = urecord->rd; |
624 | |
625 | if ( urecord->nupdates == 0 && |
626 | (urecord->flag & UDF_RECORD_FLAG_METADATA_UPDATED) == 0 ) { |
627 | cf_detail(AS_UDF, "No Update when execute update is called" ); |
628 | return 0; |
629 | } |
630 | |
631 | // fail updates in case update is not allowed. Queries and scans do not |
632 | // not allow updates. Updates will never be true .. just being paranoid |
633 | if (!(urecord->flag & UDF_RECORD_FLAG_ALLOW_UPDATES)) { |
634 | cf_warning(AS_UDF, "Udf: execute updates: allow updates false; FAIL" ); |
635 | return -1; |
636 | } |
637 | |
638 | // Commit semantics is either all the update make it or none of it |
639 | rc = udf_aerospike__apply_update_atomic(urecord); |
640 | |
641 | // allocate down if bins are deleted / not in use |
642 | if (rd->ns && rd->ns->storage_data_in_memory && ! rd->ns->single_bin) { |
643 | int32_t delta_bins = (int32_t)as_bin_inuse_count(rd) - (int32_t)rd->n_bins; |
644 | if (delta_bins) { |
645 | as_bin_allocate_bin_space(rd, delta_bins); |
646 | } |
647 | } |
648 | return rc; |
649 | } |
650 | |
651 | static void |
652 | udf_aerospike_destroy(as_aerospike * as) |
653 | { |
654 | as_aerospike_destroy(as); |
655 | } |
656 | |
657 | static cf_clock |
658 | udf_aerospike_get_current_time(const as_aerospike * as) |
659 | { |
660 | (void)as; |
661 | return cf_clock_getabsolute(); |
662 | } |
663 | |
664 | /** |
665 | * aerospike::create(record) |
666 | * Function: udf_aerospike_rec_create |
667 | * |
668 | * Parameters: |
669 | * as - as_aerospike |
670 | * rec - as_rec |
671 | * |
672 | * Return Values: |
673 | * 1 if record is being read or on a create, it already exists |
674 | * o/w return value of udf_aerospike__execute_updates |
675 | * |
676 | * Description: |
677 | * Create a new record in local storage. |
678 | * The record will only be created if it does not exist. |
679 | * This assumes the record has a digest that is valid for local storage. |
680 | * |
681 | * Synchronization : object lock acquired by the transaction thread executing UDF. |
682 | * Partition reservation takes place just before the transaction starts executing |
683 | * ( look for as_partition_reserve_udf in thr_tsvc.c ) |
684 | * |
685 | * Callers: |
686 | * lua interfacing function, mod_lua_aerospike_rec_create |
687 | * The return value of udf_aerospike_rec_create is pushed on to the lua stack |
688 | * |
689 | * Notes: |
690 | * The 'read' and 'exists' flag of udf_record are set to true. |
691 | */ |
692 | static int |
693 | udf_aerospike_rec_create(const as_aerospike * as, const as_rec * rec) |
694 | { |
695 | int ret = udf_aerospike_param_check(as, rec, __FILE__, __LINE__); |
696 | if (ret) { |
697 | return ret; |
698 | } |
699 | |
700 | udf_record * urecord = (udf_record *) as_rec_source(rec); |
701 | |
702 | // make sure record isn't already successfully read |
703 | if ((urecord->flag & UDF_RECORD_FLAG_OPEN) != 0) { |
704 | if (as_bin_inuse_has(urecord->rd)) { |
705 | cf_detail(AS_UDF, "udf_aerospike_rec_create: Record Already Exists" ); |
706 | return 1; |
707 | } |
708 | // else - binless record ok... |
709 | |
710 | if ((ret = udf_aerospike__execute_updates(urecord)) != 0) { |
711 | cf_warning(AS_UDF, "udf_aerospike_rec_create: failure executing record updates" ); |
712 | udf_aerospike_rec_remove(as, rec); |
713 | } |
714 | |
715 | return ret; |
716 | } |
717 | |
718 | as_transaction *tr = urecord->tr; |
719 | as_index_ref *r_ref = urecord->r_ref; |
720 | as_storage_rd *rd = urecord->rd; |
721 | as_index_tree *tree = tr->rsv.tree; |
722 | |
723 | // make sure we got the record as a create |
724 | int rv = as_record_get_create(tree, &tr->keyd, r_ref, tr->rsv.ns); |
725 | cf_detail_digest(AS_UDF, &tr->keyd, "Creating Record " ); |
726 | |
727 | // rv 0 means record exists, 1 means create, < 0 means fail |
728 | // TODO: Verify correct result codes. |
729 | if (rv == 1) { |
730 | // Record created. |
731 | } else if (rv == 0) { |
732 | // If it's an expired or truncated record, pretend it's a fresh create. |
733 | if (as_record_is_doomed(r_ref->r, tr->rsv.ns)) { |
734 | as_record_rescue(r_ref, tr->rsv.ns); |
735 | } else { |
736 | cf_warning(AS_UDF, "udf_aerospike_rec_create: Record Already Exists 2" ); |
737 | as_record_done(r_ref, tr->rsv.ns); |
738 | // DO NOT change it has special meaning for caller |
739 | return 1; |
740 | } |
741 | } else if (rv < 0) { |
742 | cf_detail_digest(AS_UDF, &tr->keyd, "udf_aerospike_rec_create: Record Open Failed with rv=%d " , rv); |
743 | return rv; |
744 | } |
745 | |
746 | // Associates the set name with the storage rec and index |
747 | if (tr->msgp) { |
748 | // Set the set name to index and close record if the setting the set name |
749 | // is not successful |
750 | int rv_set = as_transaction_has_set(tr) ? |
751 | set_set_from_msg(r_ref->r, tr->rsv.ns, &tr->msgp->msg) : 0; |
752 | if (rv_set != 0) { |
753 | cf_warning(AS_UDF, "udf_aerospike_rec_create: Failed to set setname" ); |
754 | as_index_delete(tree, &tr->keyd); |
755 | as_record_done(r_ref, tr->rsv.ns); |
756 | return 4; |
757 | } |
758 | |
759 | // Don't write record if it would be truncated. |
760 | if (as_truncate_now_is_truncated(tr->rsv.ns, as_index_get_set_id(r_ref->r))) { |
761 | as_index_delete(tree, &tr->keyd); |
762 | as_record_done(r_ref, tr->rsv.ns); |
763 | return 4; |
764 | } |
765 | } |
766 | |
767 | // open up storage |
768 | as_storage_record_create(tr->rsv.ns, r_ref->r, rd); |
769 | |
770 | // Shortcut for set name storage. |
771 | as_storage_record_get_set_name(rd); |
772 | |
773 | // If the message has a key, apply it to the record. |
774 | if (! get_msg_key(tr, rd)) { |
775 | cf_warning(AS_UDF, "udf_aerospike_rec_create: Can't store key" ); |
776 | as_storage_record_close(rd); |
777 | as_index_delete(tree, &tr->keyd); |
778 | as_record_done(r_ref, tr->rsv.ns); |
779 | return 4; |
780 | } |
781 | |
782 | // if multibin storage, we will use urecord->stack_bins, so set the size appropriately |
783 | if (rd->ns->single_bin) { |
784 | rd->n_bins = 1; |
785 | } |
786 | else if (! rd->ns->storage_data_in_memory) { |
787 | rd->n_bins = sizeof(urecord->stack_bins) / sizeof(as_bin); |
788 | } |
789 | |
790 | // side effect: will set the unused bins to properly unused |
791 | as_storage_rd_load_bins(rd, urecord->stack_bins); // TODO - handle error returned |
792 | |
793 | int rc = udf_aerospike__execute_updates(urecord); |
794 | |
795 | if (rc != 0) { |
796 | // Creating the udf record failed, destroy the as_record |
797 | cf_warning(AS_UDF, "udf_aerospike_rec_create: failure executing record updates (%d)" , rc); |
798 | udf_record_close(urecord); // handles particle data and cache only |
799 | as_storage_record_close(rd); |
800 | as_index_delete(tree, &tr->keyd); |
801 | as_record_done(r_ref, tr->rsv.ns); |
802 | return rc; |
803 | } |
804 | |
805 | // Success... |
806 | |
807 | urecord->flag |= UDF_RECORD_FLAG_OPEN | UDF_RECORD_FLAG_STORAGE_OPEN; |
808 | |
809 | return 0; |
810 | } |
811 | |
812 | /** |
813 | * aerospike::update(record) |
814 | * Function: udf_aerospike_rec_update |
815 | * |
816 | * Parameters: |
817 | * |
818 | * Return Values: |
819 | * -2 if record does not exist |
820 | * o/w return value of udf_aerospike__execute_updates |
821 | * |
822 | * Description: |
823 | * Updates an existing record in local storage. |
824 | * The record will only be updated if it exists. |
825 | * |
826 | * Synchronization : object lock acquired by the transaction thread executing UDF. |
827 | * Partition reservation takes place just before the transaction starts executing |
828 | * ( look for as_partition_reserve_udf in thr_tsvc.c ) |
829 | * |
830 | * Callers: |
831 | * lua interfacing function, mod_lua_aerospike_rec_update |
832 | * The return value of udf_aerospike_rec_update is pushed on to the lua stack |
833 | * |
834 | * Notes: |
835 | * If the record does not exist or is not read by anyone yet, we cannot |
836 | * carry on with the update. 'exists' and 'set' are set to false on record |
837 | * init or record remove. |
838 | */ |
839 | static int |
840 | udf_aerospike_rec_update(const as_aerospike * as, const as_rec * rec) |
841 | { |
842 | int ret = udf_aerospike_param_check(as, rec, __FILE__, __LINE__); |
843 | if (ret) { |
844 | return ret; |
845 | } |
846 | |
847 | udf_record * urecord = (udf_record *) as_rec_source(rec); |
848 | |
849 | // make sure record exists and is already opened up |
850 | if (!urecord || !(urecord->flag & UDF_RECORD_FLAG_STORAGE_OPEN) |
851 | || !(urecord->flag & UDF_RECORD_FLAG_OPEN) ) { |
852 | cf_warning(AS_UDF, "Record not found to be open while updating urecord flag=%d" , urecord ? urecord->flag : -1); |
853 | return -2; |
854 | } |
855 | cf_detail_digest(AS_UDF, &urecord->rd->r->keyd, "Executing Updates" ); |
856 | ret = udf_aerospike__execute_updates(urecord); |
857 | |
858 | if (ret < 0) { |
859 | cf_warning(AS_UDF, "udf_aerospike_rec_update: failure executing record updates (%d)" , ret); |
860 | } |
861 | |
862 | return ret; |
863 | } |
864 | |
865 | /** |
866 | * Function udf_aerospike_rec_exists |
867 | * |
868 | * Parameters: |
869 | * |
870 | * Return Values: |
871 | * 1 if record exists |
872 | * 0 o/w |
873 | * |
874 | * Description: |
875 | * Check to see if the record exists |
876 | */ |
877 | static int |
878 | udf_aerospike_rec_exists(const as_aerospike * as, const as_rec * rec) |
879 | { |
880 | int ret = udf_aerospike_param_check(as, rec, __FILE__, __LINE__); |
881 | if (ret) { |
882 | return ret; |
883 | } |
884 | |
885 | udf_record * urecord = (udf_record *) as_rec_source(rec); |
886 | |
887 | return (urecord && (urecord->flag & UDF_RECORD_FLAG_OPEN)) ? true : false; |
888 | } |
889 | |
890 | /* |
891 | * Function: udf_aerospike_rec_remove |
892 | * |
893 | * Parameters: |
894 | * |
895 | * Return Values: |
896 | * 1 if record does not exist |
897 | * 0 on success |
898 | * |
899 | * Description: |
900 | * Removes an existing record from local storage. |
901 | * The record will only be removed if it exists. |
902 | */ |
903 | static int |
904 | udf_aerospike_rec_remove(const as_aerospike * as, const as_rec * rec) |
905 | { |
906 | int ret = udf_aerospike_param_check(as, rec, __FILE__, __LINE__); |
907 | if (ret) { |
908 | return ret; |
909 | } |
910 | udf_record * urecord = (udf_record *) as_rec_source(rec); |
911 | |
912 | // make sure record is already exists before removing it |
913 | if (!urecord || !(urecord->flag & UDF_RECORD_FLAG_OPEN)) { |
914 | return 1; |
915 | } |
916 | |
917 | as_storage_rd* rd = urecord->rd; |
918 | |
919 | if (rd->ns->storage_data_in_memory && ! rd->ns->single_bin) { |
920 | delete_adjust_sindex(rd); |
921 | } |
922 | |
923 | as_record_destroy_bins(rd); |
924 | |
925 | if (rd->ns->storage_data_in_memory && ! rd->ns->single_bin) { |
926 | as_record_free_bin_space(rd->r); |
927 | rd->bins = NULL; |
928 | rd->n_bins = 0; |
929 | } |
930 | |
931 | if (urecord->particle_data) { |
932 | cf_free(urecord->particle_data); |
933 | urecord->particle_data = NULL; |
934 | } |
935 | |
936 | udf_record_cache_free(urecord); |
937 | urecord->flag |= UDF_RECORD_FLAG_HAS_UPDATES; |
938 | |
939 | return 0; |
940 | } |
941 | |
942 | /** |
943 | * Writes a log message |
944 | */ |
945 | static int |
946 | udf_aerospike_log(const as_aerospike * a, const char * file, const int line, const int lvl, const char * msg) |
947 | { |
948 | (void)a; |
949 | cf_fault_event(AS_UDF, lvl, file, line, "%s" , (char *) msg); |
950 | return 0; |
951 | } |
952 | |
// Hook table that wires this file's static functions into the as_aerospike
// interface.
//
// Any as_aerospike_hooks member not named in this initializer is implicitly
// zero-initialized, i.e. a NULL function pointer - that is standard C
// behavior for designated initializers.
// NOTE(review): which members are left NULL, and how callers treat a NULL
// hook, is defined by as_aerospike_hooks in aerospike/as_aerospike.h, which is
// not visible from this file - confirm there.
const as_aerospike_hooks udf_aerospike_hooks = {
	.rec_create = udf_aerospike_rec_create,
	.rec_update = udf_aerospike_rec_update,
	.rec_remove = udf_aerospike_rec_remove,
	.rec_exists = udf_aerospike_rec_exists,
	.log = udf_aerospike_log,
	.get_current_time = udf_aerospike_get_current_time,
	.destroy = udf_aerospike_destroy
};
963 | |