1 | /* |
2 | * udf_record.c |
3 | * |
4 | * Copyright (C) 2012-2016 Aerospike, Inc. |
5 | * |
6 | * Portions may be licensed to Aerospike, Inc. under one or more contributor |
7 | * license agreements. |
8 | * |
9 | * This program is free software: you can redistribute it and/or modify it under |
10 | * the terms of the GNU Affero General Public License as published by the Free |
11 | * Software Foundation, either version 3 of the License, or (at your option) any |
12 | * later version. |
13 | * |
14 | * This program is distributed in the hope that it will be useful, but WITHOUT |
15 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
16 | * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more |
17 | * details. |
18 | * |
19 | * You should have received a copy of the GNU Affero General Public License |
20 | * along with this program. If not, see http://www.gnu.org/licenses/ |
21 | */ |
22 | |
23 | #include "base/udf_record.h" |
24 | |
25 | #include <stdbool.h> |
26 | #include <stddef.h> |
27 | #include <stdint.h> |
28 | #include <string.h> |
29 | |
30 | #include "aerospike/as_rec.h" |
31 | #include "aerospike/as_val.h" |
32 | #include "citrusleaf/alloc.h" |
33 | #include "citrusleaf/cf_atomic.h" |
34 | #include "citrusleaf/cf_byte_order.h" |
35 | #include "citrusleaf/cf_clock.h" |
36 | |
37 | #include "fault.h" |
38 | |
39 | #include "base/cfg.h" |
40 | #include "base/datamodel.h" |
41 | #include "base/index.h" |
42 | #include "base/transaction.h" |
43 | #include "storage/storage.h" |
44 | #include "transaction/rw_utils.h" |
45 | #include "transaction/udf.h" |
46 | |
47 | |
48 | /* |
49 | * Function: Open storage record for passed in udf record |
50 | * also set up flag like exists / read et al. |
51 | * |
52 | * Parameters: |
53 | * urec : UDF record |
54 | * |
55 | * Return value : 0 on success |
56 | * -1 if the record's bin count exceeds the UDF limit |
57 | * |
58 | * Callers: |
59 | * udf_record_open |
60 | * |
61 | * Note: There are no checks, so the caller has to make sure that all |
62 | * protections are taken and all checks are done. |
63 | * |
64 | * Side effect: |
65 | * Counters will be reset |
66 | * flag will be set |
67 | * bins will be opened |
68 | */ |
69 | int |
70 | udf_storage_record_open(udf_record *urecord) |
71 | { |
72 | cf_debug_digest(AS_UDF, &urecord->tr->keyd, "[ENTER] Opening record key:" ); |
73 | as_storage_rd *rd = urecord->rd; |
74 | as_index *r = urecord->r_ref->r; |
75 | as_transaction *tr = urecord->tr; |
76 | |
77 | as_storage_record_open(tr->rsv.ns, r, rd); |
78 | |
79 | // Deal with delete durability (enterprise only). |
80 | if ((urecord->flag & UDF_RECORD_FLAG_ALLOW_UPDATES) != 0 && |
81 | set_delete_durablility(tr, rd) != 0) { |
82 | as_storage_record_close(rd); |
83 | return -1; |
84 | } |
85 | |
86 | as_storage_rd_load_n_bins(rd); // TODO - handle error returned |
87 | |
88 | if (rd->n_bins > UDF_RECORD_BIN_ULIMIT) { |
89 | cf_warning(AS_UDF, "record has too many bins (%d) for UDF processing" , rd->n_bins); |
90 | as_storage_record_close(rd); |
91 | return -1; |
92 | } |
93 | |
94 | // if multibin storage, we will use urecord->stack_bins, so set the size appropriately |
95 | if ( ! tr->rsv.ns->storage_data_in_memory && ! tr->rsv.ns->single_bin ) { |
96 | rd->n_bins = sizeof(urecord->stack_bins) / sizeof(as_bin); |
97 | } |
98 | |
99 | as_storage_rd_load_bins(rd, urecord->stack_bins); // TODO - handle error returned |
100 | urecord->starting_memory_bytes = as_storage_record_get_n_bytes_memory(rd); |
101 | |
102 | as_storage_record_get_set_name(rd); |
103 | as_storage_record_get_key(rd); |
104 | |
105 | urecord->flag |= UDF_RECORD_FLAG_STORAGE_OPEN; |
106 | |
107 | cf_detail_digest(AS_UDF, &tr->keyd, "Storage Open: Rec(%p) flag(%x) Digest:" , urecord, urecord->flag); |
108 | return 0; |
109 | } |
110 | |
111 | /* |
112 | * Function: Close storage record if it open and also set flags |
113 | * |
114 | * Parameters: |
115 | * urec : UDF record |
116 | * |
117 | * Return value : 0 in case storage was open |
118 | * 1 in case storage was not open |
119 | * |
120 | * Callers: |
121 | * udf_record_close |
122 | * |
123 | * Side effect: |
124 | * flag will be reset |
125 | * bins will be closed |
126 | */ |
127 | int |
128 | udf_storage_record_close(udf_record *urecord) |
129 | { |
130 | if (urecord->flag & UDF_RECORD_FLAG_STORAGE_OPEN) { |
131 | as_index_ref *r_ref = urecord->r_ref; |
132 | as_storage_rd *rd = urecord->rd; |
133 | |
134 | bool has_bins = as_bin_inuse_has(rd); |
135 | |
136 | if (r_ref) { |
137 | if (urecord->flag & UDF_RECORD_FLAG_HAS_UPDATES) { |
138 | as_storage_record_write(rd); |
139 | |
140 | // The urecord fields survive as_storage_record_close(). |
141 | urecord->pickle = rd->pickle; |
142 | urecord->pickle_sz = rd->pickle_sz; |
143 | |
144 | urecord->flag &= ~UDF_RECORD_FLAG_HAS_UPDATES; // TODO - necessary? |
145 | } |
146 | |
147 | as_storage_record_close(rd); |
148 | |
149 | if (! has_bins) { |
150 | write_delete_record(r_ref->r, urecord->tr->rsv.tree); |
151 | } |
152 | } else { |
153 | // Should never happen. |
154 | cf_warning(AS_UDF, "Unexpected Internal Error (null r_ref)" ); |
155 | } |
156 | |
157 | urecord->flag &= ~UDF_RECORD_FLAG_STORAGE_OPEN; |
158 | cf_detail_digest(AS_UDF, &urecord->tr->keyd, "Storage Close:: Rec(%p) Flag(%x) Digest:" , |
159 | urecord, urecord->flag ); |
160 | return 0; |
161 | } else { |
162 | return 1; |
163 | } |
164 | } |
165 | |
166 | /* |
167 | * Function: Open storage record for passed in udf record |
168 | * also set up flag like exists / read et al. |
169 | * Does as_record_get as well if it is not done yet. |
170 | * |
171 | * Parameters: |
172 | * urec : UDF record |
173 | * |
174 | * Return value : |
175 | * 0 in case record is successfully read |
176 | * -1 in case record is not found |
177 | * -2 in case record is found but has expired |
178 | * |
179 | * Callers: |
180 | * query_agg_istream_read |
181 | */ |
182 | int |
183 | udf_record_open(udf_record * urecord) |
184 | { |
185 | cf_debug_digest(AS_UDF, &urecord->tr->keyd, "[ENTER] Opening record key:" ); |
186 | if (urecord->flag & UDF_RECORD_FLAG_STORAGE_OPEN) { |
187 | cf_info(AS_UDF, "Record already open" ); |
188 | return 0; |
189 | } |
190 | as_transaction *tr = urecord->tr; |
191 | as_index_ref *r_ref = urecord->r_ref; |
192 | as_index_tree *tree = tr->rsv.tree; |
193 | |
194 | int rec_rv = 0; |
195 | if (!(urecord->flag & UDF_RECORD_FLAG_OPEN)) { |
196 | cf_detail(AS_UDF, "Opening Record" ); |
197 | rec_rv = as_record_get_live(tree, &tr->keyd, r_ref, tr->rsv.ns); |
198 | } |
199 | |
200 | if (!rec_rv) { |
201 | as_index *r = r_ref->r; |
202 | // check to see this isn't an expired record waiting to die |
203 | if (as_record_is_doomed(r, tr->rsv.ns)) { |
204 | as_record_done(r_ref, tr->rsv.ns); |
205 | cf_detail(AS_UDF, "udf_record_open: Record has expired cannot read" ); |
206 | rec_rv = -2; |
207 | } else { |
208 | urecord->flag |= UDF_RECORD_FLAG_OPEN; |
209 | urecord->flag |= UDF_RECORD_FLAG_PREEXISTS; |
210 | cf_detail_digest(AS_UDF, &tr->keyd, "Open %p %x Digest:" , urecord, urecord->flag); |
211 | rec_rv = udf_storage_record_open(urecord); |
212 | } |
213 | } else { |
214 | cf_detail_digest(AS_UDF, &urecord->tr->keyd, "udf_record_open: rec_get returned with %d " , |
215 | rec_rv); |
216 | } |
217 | return rec_rv; |
218 | } |
219 | |
220 | /* |
221 | * Function: Close storage record for udf record. Release |
222 | * all locks and partition reservation / namespace |
223 | * reservation etc. if requested. |
224 | * Also cleans up entire cache (updated from udf) |
225 | * |
226 | * Parameters: |
227 | * urec : UDF record being operated on |
228 | * |
229 | * Return value : Nothing |
230 | * |
231 | * Callers: |
232 | * query_agg_istream_read |
233 | * as_query__agg |
234 | * udf_record_destroy |
235 | */ |
236 | void |
237 | udf_record_close(udf_record *urecord) |
238 | { |
239 | as_transaction *tr = urecord->tr; |
240 | cf_debug_digest(AS_UDF, &tr->keyd, "[ENTER] Closing record key:" ); |
241 | |
242 | if (urecord->flag & UDF_RECORD_FLAG_OPEN) { |
243 | as_index_ref *r_ref = urecord->r_ref; |
244 | cf_detail(AS_UDF, "Closing Record" ); |
245 | udf_storage_record_close(urecord); |
246 | as_record_done(r_ref, tr->rsv.ns); |
247 | urecord->flag &= ~UDF_RECORD_FLAG_OPEN; |
248 | cf_detail_digest(AS_UDF, &urecord->tr->keyd, |
249 | "Storage Close:: Rec(%p) Flag(%x) Digest:" , urecord, urecord->flag ); |
250 | } |
251 | |
252 | // Replication happens when the main record replicates |
253 | if (urecord->particle_data) { |
254 | cf_free(urecord->particle_data); |
255 | urecord->particle_data = 0; |
256 | } |
257 | udf_record_cache_free(urecord); |
258 | } |
259 | |
260 | /* |
261 | * Function: This function called to reinitialize the udf_record. It sets up |
262 | * the basic value back to default. Can be called after the UDF |
263 | * record has been used. Reset the fact that record pre_exits or |
264 | * was actually read etc. |
265 | * |
266 | * Parameters: |
267 | * urec : UDF record being initialized |
268 | * |
269 | * Return value : Nothing |
270 | * |
271 | * Callers: |
272 | * udf_rw_local (parent record before calling UDF) |
273 | */ |
274 | void |
275 | udf_record_init(udf_record *urecord, bool allow_updates) |
276 | { |
277 | urecord->tr = NULL; |
278 | urecord->r_ref = NULL; |
279 | urecord->rd = NULL; |
280 | urecord->dirty = NULL; |
281 | urecord->nupdates = 0; |
282 | urecord->particle_data = NULL; |
283 | urecord->cur_particle_data = NULL; |
284 | urecord->end_particle_data = NULL; |
285 | urecord->starting_memory_bytes = 0; |
286 | |
287 | // Init flag |
288 | urecord->flag = UDF_RECORD_FLAG_ISVALID; |
289 | |
290 | if (allow_updates) { |
291 | urecord->flag |= UDF_RECORD_FLAG_ALLOW_UPDATES; |
292 | } |
293 | |
294 | urecord->keyd = cf_digest_zero; |
295 | for (uint32_t i = 0; i < UDF_RECORD_BIN_ULIMIT; i++) { |
296 | urecord->updates[i].particle_buf = NULL; |
297 | } |
298 | |
299 | urecord->pickle = NULL; |
300 | urecord->pickle_sz = 0; |
301 | } |
302 | |
303 | /* |
304 | static int print_buffer(as_buffer * buff) { |
305 | msgpack_sbuffer sbuf; |
306 | msgpack_sbuffer_init(&sbuf); |
307 | |
308 | sbuf.data = buff->data; |
309 | sbuf.size = buff->size; |
310 | sbuf.alloc = buff->capacity; |
311 | |
312 | msgpack_zone mempool; |
313 | msgpack_zone_init(&mempool, 2048); |
314 | |
315 | msgpack_object deserialized; |
316 | msgpack_unpack(sbuf.data, sbuf.size, NULL, &mempool, &deserialized); |
317 | |
318 | printf("msg_buf:\n"); |
319 | msgpack_object_print(stdout, deserialized); |
320 | puts(""); |
321 | |
322 | msgpack_zone_destroy(&mempool); |
323 | return 0; |
324 | } |
325 | */ |
326 | |
327 | /* |
328 | * Function: Get bin value from cached copy. All the update in a |
329 | * commit window is not applied to the record directly |
330 | * but maintained in-memory cache. This function used |
331 | * to retrieve cached value |
332 | * |
333 | * Similar function for get and free of cache |
334 | * |
335 | * Return value : |
336 | * value (as_val) in case of success [for get] |
337 | * NULL in case of failure |
338 | * set and free return Nothing |
339 | * |
340 | * Callers: |
341 | * GET and SET |
342 | * udf_record_get |
343 | * udf_record_set |
344 | * udf_record_remove |
345 | * |
346 | * FREE |
347 | * udf_aerospike__execute_updates (when crossing commit window) |
348 | * udf_record_close (finally closing record) |
349 | * udf_rw_commit (commit the udf record) |
350 | */ |
351 | static as_val * |
352 | udf_record_cache_get(udf_record * urecord, const char * name) |
353 | { |
354 | cf_debug(AS_UDF, "[ENTER] BinName(%s) " , name ); |
355 | if ( urecord->nupdates > 0 ) { |
356 | cf_detail(AS_UDF, "udf_record_get: %s find" , name); |
357 | for ( uint32_t i = 0; i < urecord->nupdates; i++ ) { |
358 | udf_record_bin * bin = &(urecord->updates[i]); |
359 | if ( strncmp(name, bin->name, AS_BIN_NAME_MAX_SZ) == 0 ) { |
360 | cf_detail(AS_UDF, "Bin %s found, type(%d)" , name, bin->value->type ); |
361 | return bin->value; // note it's OK if the bin contains a nil |
362 | } |
363 | } |
364 | } |
365 | return NULL; |
366 | } |
367 | |
368 | void |
369 | udf_record_cache_free(udf_record * urecord) |
370 | { |
371 | cf_debug(AS_UDF, "[ENTER] NumUpdates(%d) " , urecord->nupdates ); |
372 | |
373 | for (uint32_t i = 0; i < urecord->nupdates; i ++ ) { |
374 | udf_record_bin * bin = &urecord->updates[i]; |
375 | if ( bin->value != NULL ) { |
376 | as_val_destroy(bin->value); |
377 | bin->value = NULL; |
378 | } |
379 | } |
380 | |
381 | for (uint32_t i = 0; i < UDF_RECORD_BIN_ULIMIT; i++) { |
382 | if (urecord->updates[i].particle_buf) { |
383 | cf_free(urecord->updates[i].particle_buf); |
384 | urecord->updates[i].particle_buf = NULL; |
385 | } |
386 | } |
387 | urecord->nupdates = 0; |
388 | urecord->flag &= ~UDF_RECORD_FLAG_TOO_MANY_BINS; |
389 | } |
390 | |
391 | /** |
392 | * Set the cache value for a bin, including flags. |
393 | */ |
394 | static void |
395 | udf_record_cache_set(udf_record * urecord, const char * name, as_val * value, |
396 | bool dirty) |
397 | { |
398 | cf_debug(AS_UDF, "[ENTER] urecord(%p) name(%p)[%s] dirty(%d)" , |
399 | urecord, name, name, dirty); |
400 | |
401 | bool modified = false; |
402 | |
403 | for ( uint32_t i = 0; i < urecord->nupdates; i++ ) { |
404 | udf_record_bin * bin = &(urecord->updates[i]); |
405 | |
406 | // bin exists, then we will release old value and set new value. |
407 | if ( strncmp(name, bin->name, AS_BIN_NAME_MAX_SZ) == 0 ) { |
408 | cf_detail(AS_UDF, "udf_record_set: %s found" , name); |
409 | |
410 | // release previously set value |
411 | as_val_destroy(bin->value); |
412 | |
413 | // set new value, with dirty flag |
414 | if( value != NULL ) { |
415 | bin->value = (as_val *) value; |
416 | } |
417 | bin->dirty = dirty; |
418 | cf_detail(AS_UDF, "udf_record_set: %s set for %p:%p" , name, |
419 | urecord, bin->value); |
420 | |
421 | modified = true; |
422 | break; |
423 | } |
424 | } |
425 | |
426 | // If not modified, then we will add the bin to the cache |
427 | if ( ! modified ) { |
428 | if ( urecord->nupdates < UDF_RECORD_BIN_ULIMIT ) { |
429 | udf_record_bin * bin = &(urecord->updates[urecord->nupdates]); |
430 | strncpy(bin->name, name, AS_BIN_NAME_MAX_SZ); |
431 | bin->value = (as_val *) value; |
432 | bin->dirty = dirty; |
433 | urecord->nupdates++; |
434 | cf_detail(AS_UDF, "udf_record_set: %s not modified, add for %p:%p" , |
435 | name, urecord, bin->value); |
436 | } |
437 | else { |
438 | cf_warning(AS_UDF, "UDF bin limit (%d) exceeded (bin %s)" , |
439 | UDF_RECORD_BIN_ULIMIT, name); |
440 | urecord->flag |= UDF_RECORD_FLAG_TOO_MANY_BINS; |
441 | } |
442 | } |
443 | } |
444 | |
445 | /* |
446 | * Internal Function: Read the bin from storage and convert it |
447 | * into as_val and return |
448 | * |
449 | * Parameters: |
450 | * r : udf record |
451 | * bname: Bin name of the bin which need to be read. |
452 | * |
453 | * Return value : |
454 | * value (as_val *) in case of success |
455 | * NULL in case of failure |
456 | * |
457 | * Description: |
458 | * Expectation is the record is already open. No checks are |
459 | * performed in this function. Caller needs to make sure the |
460 | * record is good to read e.g binname etc. |
461 | * |
462 | * NB: as_val which is returned is allocated one. It is callers |
463 | * responsibility to free else in case it is passed on to |
464 | * lua ... lua has responsibility of garbage collecting it. |
465 | * Hence this function call incurs and malloc cost. |
466 | * |
467 | * Callers: |
468 | * udf_record_get |
469 | */ |
470 | as_val * |
471 | udf_record_storage_get(const udf_record *urecord, const char *name) |
472 | { |
473 | if (!name) { |
474 | cf_detail(AS_UDF, "Passed Null bin name to storage get" ); |
475 | return NULL; |
476 | } |
477 | as_bin * bb = as_bin_get(urecord->rd, name); |
478 | |
479 | if ( !bb ) { |
480 | cf_detail(AS_UDF, "udf_record_get: bin not found (%s)" , name); |
481 | return NULL; |
482 | } |
483 | |
484 | return as_bin_particle_to_asval(bb); |
485 | } |
486 | |
487 | /* |
488 | * Check and validate parameter before performing operation |
489 | * |
490 | * return: |
491 | * 2 : UDF_ERR_INTERNAL_PARAM |
492 | * 3 : UDF_ERR_RECORD_IS_NOT_VALID |
493 | * 4 : UDF_ERR_PARAMETER |
494 | * 0 : Success |
495 | * |
496 | */ |
497 | int |
498 | udf_record_param_check(const as_rec *rec, char *fname, int lineno) |
499 | { |
500 | if (! rec) { |
501 | cf_warning(AS_UDF, "Invalid Parameter: null record" ); |
502 | return UDF_ERR_INTERNAL_PARAMETER; |
503 | } |
504 | |
505 | udf_record *urecord = (udf_record *)as_rec_source(rec); |
506 | if (!urecord) { |
507 | return UDF_ERR_INTERNAL_PARAMETER;; |
508 | } |
509 | |
510 | if (!(urecord->flag & UDF_RECORD_FLAG_ISVALID)) { |
511 | cf_debug(AS_UDF, "(%s:%d): Trying to Open Invalid Record" , fname, lineno); |
512 | return UDF_ERR_RECORD_NOT_VALID; |
513 | } |
514 | |
515 | return 0; |
516 | } |
517 | |
518 | static int |
519 | udf_record_param_check_w_bin(const as_rec *rec, const char *bname, char *fname, int lineno) |
520 | { |
521 | int rv = udf_record_param_check(rec, fname, lineno); |
522 | |
523 | if (rv != 0) { |
524 | return rv; |
525 | } |
526 | |
527 | if (! bname) { |
528 | cf_warning(AS_UDF, "Invalid Parameter: null bin name" ); |
529 | return UDF_ERR_INTERNAL_PARAMETER; |
530 | } |
531 | |
532 | udf_record *urecord = (udf_record *)as_rec_source(rec); |
533 | as_namespace *ns = urecord->tr->rsv.ns; |
534 | |
535 | if (ns->single_bin) { |
536 | if (*bname != 0) { |
537 | cf_warning(AS_UDF, "Invalid Parameter: non-empty bin name in single-bin namespace" ); |
538 | return UDF_ERR_INTERNAL_PARAMETER; |
539 | } |
540 | |
541 | return 0; |
542 | } |
543 | |
544 | if (*bname == 0) { |
545 | cf_warning(AS_UDF, "Invalid Parameter: empty bin name" ); |
546 | return UDF_ERR_INTERNAL_PARAMETER; |
547 | } |
548 | |
549 | if (strlen(bname) >= AS_BIN_NAME_MAX_SZ) { |
550 | cf_warning(AS_UDF, "Invalid Parameter: bin name %s too big" , bname); |
551 | return UDF_ERR_PARAMETER; |
552 | } |
553 | |
554 | if (! as_bin_name_within_quota(ns, bname)) { |
555 | cf_warning(AS_UDF, "{%s} exceeded bin name quota" , ns->name); |
556 | return UDF_ERR_PARAMETER; |
557 | } |
558 | |
559 | return 0; |
560 | } |
561 | |
562 | /********************************************************************* |
563 | * INTERFACE FUNCTIONS * |
564 | * * |
565 | * See the as_aerospike for the API definition * |
566 | ********************************************************************/ |
567 | static as_val * |
568 | udf_record_get(const as_rec * rec, const char * name) |
569 | { |
570 | if (udf_record_param_check_w_bin(rec, name, __FILE__, __LINE__)) { |
571 | return NULL; |
572 | } |
573 | udf_record * urecord = (udf_record *) as_rec_source(rec); |
574 | as_val * value = NULL; |
575 | |
576 | cf_debug(AS_UDF, "[ENTER] rec(%p) name(%s)" , rec, name ); |
577 | |
578 | // Get from cache |
579 | value = udf_record_cache_get(urecord, name); |
580 | |
581 | // If value not NULL, then return it. |
582 | if ( value != NULL ) { |
583 | return value; |
584 | } |
585 | |
586 | // Check in the cache before trying to look up in record |
587 | // Note: Record may not have been created yet ... Do not |
588 | // change the order unless you fully understand what you |
589 | // are doing |
590 | if ( !(urecord->flag & UDF_RECORD_FLAG_STORAGE_OPEN) ) { |
591 | if (udf_record_open(urecord)) { // lazy read the record from storage |
592 | return NULL; |
593 | } |
594 | } |
595 | |
596 | // Check if storage is available |
597 | if ( !urecord->rd->ns ) { |
598 | cf_detail(AS_UDF, "udf_record_get: storage unavailable" ); |
599 | return NULL; |
600 | } |
601 | |
602 | value = udf_record_storage_get(urecord, name); |
603 | |
604 | // We have a value, so we will cache it. |
605 | // DO NOT remove this. We need to cache copy to makes sure ref count |
606 | // gets decremented post handing this as_val over to the lua world |
607 | if (value) { |
608 | udf_record_cache_set(urecord, name, value, false); |
609 | } |
610 | |
611 | cf_detail(AS_UDF, "udf_record_get: end (%s) [%p,%p]" , name, urecord, value); |
612 | return value; |
613 | } |
614 | |
615 | static int |
616 | udf_record_set(const as_rec * rec, const char * name, const as_val * value) |
617 | { |
618 | int ret = udf_record_param_check_w_bin(rec, name, __FILE__, __LINE__); |
619 | if (ret) { |
620 | return ret; |
621 | } |
622 | |
623 | udf_record * urecord = (udf_record *) as_rec_source(rec); |
624 | cf_detail(AS_UDF, "udf_record_set: begin (%s)" , name); |
625 | if ( urecord && name ) { |
626 | udf_record_cache_set(urecord, name, (as_val *) value, true); |
627 | } |
628 | cf_detail(AS_UDF, "udf_record_set: end (%s)" , name); |
629 | |
630 | return 0; |
631 | } |
632 | |
633 | static int |
634 | udf_record_set_ttl(const as_rec * rec, uint32_t ttl) |
635 | { |
636 | int ret = udf_record_param_check(rec, __FILE__, __LINE__); |
637 | if (ret) { |
638 | return ret; |
639 | } |
640 | |
641 | udf_record * urecord = (udf_record *) as_rec_source(rec); |
642 | if (!(urecord->flag & UDF_RECORD_FLAG_ALLOW_UPDATES)) { |
643 | return -1; |
644 | } |
645 | |
646 | urecord->tr->msgp->msg.record_ttl = ttl; |
647 | urecord->flag |= UDF_RECORD_FLAG_METADATA_UPDATED; |
648 | |
649 | return 0; |
650 | } |
651 | |
652 | static int |
653 | udf_record_drop_key(const as_rec * rec) |
654 | { |
655 | int ret = udf_record_param_check(rec, __FILE__, __LINE__); |
656 | if (ret) { |
657 | return ret; |
658 | } |
659 | |
660 | udf_record * urecord = (udf_record *) as_rec_source(rec); |
661 | if (!(urecord->flag & UDF_RECORD_FLAG_ALLOW_UPDATES)) { |
662 | return -1; |
663 | } |
664 | |
665 | // Flag the key to be dropped. |
666 | if (urecord->rd->key) { |
667 | urecord->rd->key = NULL; |
668 | urecord->rd->key_size = 0; |
669 | } |
670 | |
671 | urecord->flag |= UDF_RECORD_FLAG_METADATA_UPDATED; |
672 | |
673 | return 0; |
674 | } |
675 | |
676 | static int |
677 | udf_record_remove(const as_rec * rec, const char * name) |
678 | { |
679 | int ret = udf_record_param_check(rec, __FILE__, __LINE__); |
680 | if (ret) { |
681 | return ret; |
682 | } |
683 | udf_record * urecord = (udf_record *) as_rec_source(rec); |
684 | |
685 | |
686 | cf_detail(AS_UDF, "udf_record_remove: begin (%s)" , name); |
687 | if ( urecord && name ) { |
688 | udf_record_cache_set(urecord, name, (as_val *) &as_nil, true); |
689 | } |
690 | cf_detail(AS_UDF, "udf_record_remove: end (%s)" , name); |
691 | |
692 | return 0; |
693 | } |
694 | |
695 | static uint32_t |
696 | udf_record_ttl(const as_rec * rec) |
697 | { |
698 | int ret = udf_record_param_check(rec, __FILE__, __LINE__); |
699 | if (ret) { |
700 | return 0; |
701 | } |
702 | |
703 | udf_record * urecord = (udf_record *) as_rec_source(rec); |
704 | |
705 | if ((urecord->flag & UDF_RECORD_FLAG_STORAGE_OPEN)) { |
706 | uint32_t now = as_record_void_time_get(); |
707 | |
708 | return urecord->r_ref->r->void_time > now ? |
709 | urecord->r_ref->r->void_time - now : 0; |
710 | } |
711 | else { |
712 | cf_info(AS_UDF, "Error in getting ttl: no record found" ); |
713 | return 0; // since we can't indicate the record doesn't exist |
714 | } |
715 | return 0; |
716 | } |
717 | |
718 | static uint64_t |
719 | udf_record_last_update_time(const as_rec * rec) |
720 | { |
721 | int ret = udf_record_param_check(rec, __FILE__, __LINE__); |
722 | if (ret) { |
723 | return 0; |
724 | } |
725 | |
726 | udf_record * urecord = (udf_record *) as_rec_source(rec); |
727 | if (urecord && (urecord->flag & UDF_RECORD_FLAG_STORAGE_OPEN)) { |
728 | return urecord->r_ref->r->last_update_time; |
729 | } |
730 | else { |
731 | cf_warning(AS_UDF, "Error getting last update time: no record found" ); |
732 | return 0; |
733 | } |
734 | } |
735 | |
736 | static uint16_t |
737 | udf_record_gen(const as_rec * rec) |
738 | { |
739 | int ret = udf_record_param_check(rec, __FILE__, __LINE__); |
740 | if (ret) { |
741 | return 0; |
742 | } |
743 | |
744 | udf_record * urecord = (udf_record *) as_rec_source(rec); |
745 | if (urecord && (urecord->flag & UDF_RECORD_FLAG_STORAGE_OPEN) != 0) { |
746 | return plain_generation(urecord->rd->r->generation, urecord->rd->ns); |
747 | } |
748 | else { |
749 | cf_warning(AS_UDF, "Error in getting generation: no record found" ); |
750 | return 0; |
751 | } |
752 | } |
753 | |
754 | // Local utility. |
755 | static as_val * |
756 | as_val_from_flat_key(const uint8_t * flat_key, uint32_t size) |
757 | { |
758 | uint8_t type = *flat_key; |
759 | const uint8_t * key = flat_key + 1; |
760 | |
761 | switch ( type ) { |
762 | case AS_PARTICLE_TYPE_INTEGER: |
763 | if (size != 1 + sizeof(uint64_t)) { |
764 | return NULL; |
765 | } |
766 | // Flat integer keys are in big-endian order. |
767 | return (as_val *) as_integer_new(cf_swap_from_be64(*(int64_t *)key)); |
768 | case AS_PARTICLE_TYPE_STRING: |
769 | { |
770 | // Key length is size - 1, then +1 for null-termination. |
771 | char * buf = cf_malloc(size); |
772 | uint32_t len = size - 1; |
773 | memcpy(buf, key, len); |
774 | buf[len] = '\0'; |
775 | |
776 | return (as_val *) as_string_new(buf, true); |
777 | } |
778 | case AS_PARTICLE_TYPE_BLOB: |
779 | { |
780 | uint32_t blob_size = size - 1; |
781 | uint8_t *buf = cf_malloc(blob_size); |
782 | |
783 | memcpy(buf, key, blob_size); |
784 | |
785 | return (as_val *) as_bytes_new_wrap(buf, blob_size, true); |
786 | } |
787 | default: |
788 | return NULL; |
789 | } |
790 | } |
791 | |
792 | static as_val * |
793 | udf_record_key(const as_rec * rec) |
794 | { |
795 | int ret = udf_record_param_check(rec, __FILE__, __LINE__); |
796 | if (ret) { |
797 | return NULL; |
798 | } |
799 | |
800 | udf_record * urecord = (udf_record *) as_rec_source(rec); |
801 | if (urecord && (urecord->flag & UDF_RECORD_FLAG_STORAGE_OPEN)) { |
802 | if (urecord->rd->key) { |
803 | return as_val_from_flat_key(urecord->rd->key, urecord->rd->key_size); |
804 | } |
805 | // TODO - perhaps look for the key in the message. |
806 | return NULL; |
807 | } |
808 | else { |
809 | cf_warning(AS_UDF, "Error in getting key: no record found" ); |
810 | return NULL; |
811 | } |
812 | } |
813 | |
814 | static const char * |
815 | udf_record_setname(const as_rec * rec) |
816 | { |
817 | int ret = udf_record_param_check(rec, __FILE__, __LINE__); |
818 | if (ret) { |
819 | return NULL; |
820 | } |
821 | |
822 | udf_record * urecord = (udf_record *) as_rec_source(rec); |
823 | if (urecord && (urecord->flag & UDF_RECORD_FLAG_STORAGE_OPEN)) { |
824 | return as_index_get_set_name(urecord->r_ref->r, urecord->rd->ns); |
825 | } |
826 | else { |
827 | cf_warning(AS_UDF, "Error in getting set name: no record found" ); |
828 | return NULL; |
829 | } |
830 | } |
831 | |
832 | bool |
833 | udf_record_destroy(as_rec *rec) |
834 | { |
835 | if (!rec) { |
836 | return false; |
837 | } |
838 | |
839 | udf_record *urecord = (udf_record *) as_rec_source(rec); |
840 | udf_record_close(urecord); |
841 | as_rec_destroy(rec); |
842 | return true; |
843 | } |
844 | |
845 | static as_bytes * |
846 | udf_record_digest(const as_rec *rec) |
847 | { |
848 | int ret = udf_record_param_check(rec, __FILE__, __LINE__); |
849 | if (ret) { |
850 | return NULL; |
851 | } |
852 | |
853 | udf_record *urecord = (udf_record *)as_rec_source(rec); |
854 | if (urecord && urecord->flag & UDF_RECORD_FLAG_OPEN) { |
855 | cf_digest *keyd = cf_malloc(sizeof(cf_digest)); |
856 | memcpy(keyd, &urecord->keyd, CF_DIGEST_KEY_SZ); |
857 | as_bytes *b = as_bytes_new_wrap(keyd->digest, CF_DIGEST_KEY_SZ, true); |
858 | return b; |
859 | } |
860 | return NULL; |
861 | } |
862 | |
863 | static int |
864 | udf_record_bin_names(const as_rec *rec, as_rec_bin_names_callback callback, void * udata) |
865 | { |
866 | int ret = udf_record_param_check(rec, __FILE__, __LINE__); |
867 | if (ret) { |
868 | return 1; |
869 | } |
870 | |
871 | udf_record *urecord = (udf_record *)as_rec_source(rec); |
872 | char * bin_names = NULL; |
873 | if (urecord && (urecord->flag & UDF_RECORD_FLAG_STORAGE_OPEN)) { |
874 | uint16_t nbins; |
875 | |
876 | if (urecord->rd->ns->single_bin) { |
877 | nbins = 1; |
878 | bin_names = alloca(1); |
879 | *bin_names = 0; |
880 | } |
881 | else { |
882 | nbins = urecord->rd->n_bins; |
883 | bin_names = alloca(nbins * AS_BIN_NAME_MAX_SZ); |
884 | for (uint16_t i = 0; i < nbins; i++) { |
885 | as_bin *b = &urecord->rd->bins[i]; |
886 | if (! as_bin_inuse(b)) { |
887 | nbins = i; |
888 | break; |
889 | } |
890 | const char * name = as_bin_get_name_from_id(urecord->rd->ns, b->id); |
891 | strcpy(bin_names + (i * AS_BIN_NAME_MAX_SZ), name); |
892 | } |
893 | } |
894 | callback(bin_names, nbins, AS_BIN_NAME_MAX_SZ, udata); |
895 | return 0; |
896 | } |
897 | else { |
898 | cf_warning(AS_UDF, "Error in getting bin names: no record found" ); |
899 | bin_names = alloca(1); |
900 | *bin_names = 0; |
901 | callback(bin_names, 1, AS_BIN_NAME_MAX_SZ, udata); |
902 | return -1; |
903 | } |
904 | } |
905 | |
906 | static uint16_t |
907 | udf_record_numbins(const as_rec * rec) |
908 | { |
909 | int ret = udf_record_param_check(rec, __FILE__, __LINE__); |
910 | if (ret) { |
911 | return 0; |
912 | } |
913 | |
914 | udf_record *urecord = (udf_record *) as_rec_source(rec); |
915 | if (urecord && (urecord->flag & UDF_RECORD_FLAG_STORAGE_OPEN)) { |
916 | |
917 | if (urecord->rd->ns->single_bin) { |
918 | return 1; |
919 | } |
920 | |
921 | uint16_t i; |
922 | as_storage_rd *rd = urecord->rd; |
923 | for (i = 0; i < rd->n_bins; i++) { |
924 | as_bin *b = &rd->bins[i]; |
925 | if (! as_bin_inuse(b)) { |
926 | break; |
927 | } |
928 | } |
929 | return i; |
930 | } |
931 | else { |
932 | cf_warning(AS_UDF, "Error in getting numbins: no record found" ); |
933 | return 0; |
934 | } |
935 | } |
936 | |
937 | const as_rec_hooks udf_record_hooks = { |
938 | .get = udf_record_get, |
939 | .set = udf_record_set, |
940 | .remove = udf_record_remove, |
941 | .ttl = udf_record_ttl, |
942 | .last_update_time = udf_record_last_update_time, |
943 | .gen = udf_record_gen, |
944 | .key = udf_record_key, |
945 | .setname = udf_record_setname, |
946 | .destroy = NULL, |
947 | .digest = udf_record_digest, |
948 | .set_ttl = udf_record_set_ttl, |
949 | .drop_key = udf_record_drop_key, |
950 | .bin_names = udf_record_bin_names, |
951 | .numbins = udf_record_numbins |
952 | }; |
953 | |