1/*
2 * udf_record.c
3 *
4 * Copyright (C) 2012-2016 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23#include "base/udf_record.h"
24
25#include <stdbool.h>
26#include <stddef.h>
27#include <stdint.h>
28#include <string.h>
29
30#include "aerospike/as_rec.h"
31#include "aerospike/as_val.h"
32#include "citrusleaf/alloc.h"
33#include "citrusleaf/cf_atomic.h"
34#include "citrusleaf/cf_byte_order.h"
35#include "citrusleaf/cf_clock.h"
36
37#include "fault.h"
38
39#include "base/cfg.h"
40#include "base/datamodel.h"
41#include "base/index.h"
42#include "base/transaction.h"
43#include "storage/storage.h"
44#include "transaction/rw_utils.h"
45#include "transaction/udf.h"
46
47
48/*
49 * Function: Open storage record for passed in udf record
50 * also set up flag like exists / read et al.
51 *
52 * Parameters:
53 * urec : UDF record
54 *
55 * Return value : 0 on success
56 * -1 if the record's bin count exceeds the UDF limit
57 *
58 * Callers:
59 * udf_record_open
60 *
61 * Note: There are no checks, so the caller has to make sure that all
62 * protections are taken and all checks are done.
63 *
64 * Side effect:
65 * Counters will be reset
66 * flag will be set
67 * bins will be opened
68 */
69int
70udf_storage_record_open(udf_record *urecord)
71{
72 cf_debug_digest(AS_UDF, &urecord->tr->keyd, "[ENTER] Opening record key:");
73 as_storage_rd *rd = urecord->rd;
74 as_index *r = urecord->r_ref->r;
75 as_transaction *tr = urecord->tr;
76
77 as_storage_record_open(tr->rsv.ns, r, rd);
78
79 // Deal with delete durability (enterprise only).
80 if ((urecord->flag & UDF_RECORD_FLAG_ALLOW_UPDATES) != 0 &&
81 set_delete_durablility(tr, rd) != 0) {
82 as_storage_record_close(rd);
83 return -1;
84 }
85
86 as_storage_rd_load_n_bins(rd); // TODO - handle error returned
87
88 if (rd->n_bins > UDF_RECORD_BIN_ULIMIT) {
89 cf_warning(AS_UDF, "record has too many bins (%d) for UDF processing", rd->n_bins);
90 as_storage_record_close(rd);
91 return -1;
92 }
93
94 // if multibin storage, we will use urecord->stack_bins, so set the size appropriately
95 if ( ! tr->rsv.ns->storage_data_in_memory && ! tr->rsv.ns->single_bin ) {
96 rd->n_bins = sizeof(urecord->stack_bins) / sizeof(as_bin);
97 }
98
99 as_storage_rd_load_bins(rd, urecord->stack_bins); // TODO - handle error returned
100 urecord->starting_memory_bytes = as_storage_record_get_n_bytes_memory(rd);
101
102 as_storage_record_get_set_name(rd);
103 as_storage_record_get_key(rd);
104
105 urecord->flag |= UDF_RECORD_FLAG_STORAGE_OPEN;
106
107 cf_detail_digest(AS_UDF, &tr->keyd, "Storage Open: Rec(%p) flag(%x) Digest:", urecord, urecord->flag);
108 return 0;
109}
110
111/*
112 * Function: Close storage record if it open and also set flags
113 *
114 * Parameters:
115 * urec : UDF record
116 *
117 * Return value : 0 in case storage was open
118 * 1 in case storage was not open
119 *
120 * Callers:
121 * udf_record_close
122 *
123 * Side effect:
124 * flag will be reset
125 * bins will be closed
126 */
127int
128udf_storage_record_close(udf_record *urecord)
129{
130 if (urecord->flag & UDF_RECORD_FLAG_STORAGE_OPEN) {
131 as_index_ref *r_ref = urecord->r_ref;
132 as_storage_rd *rd = urecord->rd;
133
134 bool has_bins = as_bin_inuse_has(rd);
135
136 if (r_ref) {
137 if (urecord->flag & UDF_RECORD_FLAG_HAS_UPDATES) {
138 as_storage_record_write(rd);
139
140 // The urecord fields survive as_storage_record_close().
141 urecord->pickle = rd->pickle;
142 urecord->pickle_sz = rd->pickle_sz;
143
144 urecord->flag &= ~UDF_RECORD_FLAG_HAS_UPDATES; // TODO - necessary?
145 }
146
147 as_storage_record_close(rd);
148
149 if (! has_bins) {
150 write_delete_record(r_ref->r, urecord->tr->rsv.tree);
151 }
152 } else {
153 // Should never happen.
154 cf_warning(AS_UDF, "Unexpected Internal Error (null r_ref)");
155 }
156
157 urecord->flag &= ~UDF_RECORD_FLAG_STORAGE_OPEN;
158 cf_detail_digest(AS_UDF, &urecord->tr->keyd, "Storage Close:: Rec(%p) Flag(%x) Digest:",
159 urecord, urecord->flag );
160 return 0;
161 } else {
162 return 1;
163 }
164}
165
166/*
167 * Function: Open storage record for passed in udf record
168 * also set up flag like exists / read et al.
169 * Does as_record_get as well if it is not done yet.
170 *
171 * Parameters:
172 * urec : UDF record
173 *
174 * Return value :
175 * 0 in case record is successfully read
176 * -1 in case record is not found
177 * -2 in case record is found but has expired
178 *
179 * Callers:
180 * query_agg_istream_read
181 */
182int
183udf_record_open(udf_record * urecord)
184{
185 cf_debug_digest(AS_UDF, &urecord->tr->keyd, "[ENTER] Opening record key:");
186 if (urecord->flag & UDF_RECORD_FLAG_STORAGE_OPEN) {
187 cf_info(AS_UDF, "Record already open");
188 return 0;
189 }
190 as_transaction *tr = urecord->tr;
191 as_index_ref *r_ref = urecord->r_ref;
192 as_index_tree *tree = tr->rsv.tree;
193
194 int rec_rv = 0;
195 if (!(urecord->flag & UDF_RECORD_FLAG_OPEN)) {
196 cf_detail(AS_UDF, "Opening Record");
197 rec_rv = as_record_get_live(tree, &tr->keyd, r_ref, tr->rsv.ns);
198 }
199
200 if (!rec_rv) {
201 as_index *r = r_ref->r;
202 // check to see this isn't an expired record waiting to die
203 if (as_record_is_doomed(r, tr->rsv.ns)) {
204 as_record_done(r_ref, tr->rsv.ns);
205 cf_detail(AS_UDF, "udf_record_open: Record has expired cannot read");
206 rec_rv = -2;
207 } else {
208 urecord->flag |= UDF_RECORD_FLAG_OPEN;
209 urecord->flag |= UDF_RECORD_FLAG_PREEXISTS;
210 cf_detail_digest(AS_UDF, &tr->keyd, "Open %p %x Digest:", urecord, urecord->flag);
211 rec_rv = udf_storage_record_open(urecord);
212 }
213 } else {
214 cf_detail_digest(AS_UDF, &urecord->tr->keyd, "udf_record_open: rec_get returned with %d ",
215 rec_rv);
216 }
217 return rec_rv;
218}
219
220/*
221 * Function: Close storage record for udf record. Release
222 * all locks and partition reservation / namespace
223 * reservation etc. if requested.
224 * Also cleans up entire cache (updated from udf)
225 *
226 * Parameters:
227 * urec : UDF record being operated on
228 *
229 * Return value : Nothing
230 *
231 * Callers:
232 * query_agg_istream_read
233 * as_query__agg
234 * udf_record_destroy
235 */
236void
237udf_record_close(udf_record *urecord)
238{
239 as_transaction *tr = urecord->tr;
240 cf_debug_digest(AS_UDF, &tr->keyd, "[ENTER] Closing record key:");
241
242 if (urecord->flag & UDF_RECORD_FLAG_OPEN) {
243 as_index_ref *r_ref = urecord->r_ref;
244 cf_detail(AS_UDF, "Closing Record");
245 udf_storage_record_close(urecord);
246 as_record_done(r_ref, tr->rsv.ns);
247 urecord->flag &= ~UDF_RECORD_FLAG_OPEN;
248 cf_detail_digest(AS_UDF, &urecord->tr->keyd,
249 "Storage Close:: Rec(%p) Flag(%x) Digest:", urecord, urecord->flag );
250 }
251
252 // Replication happens when the main record replicates
253 if (urecord->particle_data) {
254 cf_free(urecord->particle_data);
255 urecord->particle_data = 0;
256 }
257 udf_record_cache_free(urecord);
258}
259
260/*
261 * Function: This function called to reinitialize the udf_record. It sets up
262 * the basic value back to default. Can be called after the UDF
263 * record has been used. Reset the fact that record pre_exits or
264 * was actually read etc.
265 *
266 * Parameters:
267 * urec : UDF record being initialized
268 *
269 * Return value : Nothing
270 *
271 * Callers:
272 * udf_rw_local (parent record before calling UDF)
273 */
274void
275udf_record_init(udf_record *urecord, bool allow_updates)
276{
277 urecord->tr = NULL;
278 urecord->r_ref = NULL;
279 urecord->rd = NULL;
280 urecord->dirty = NULL;
281 urecord->nupdates = 0;
282 urecord->particle_data = NULL;
283 urecord->cur_particle_data = NULL;
284 urecord->end_particle_data = NULL;
285 urecord->starting_memory_bytes = 0;
286
287 // Init flag
288 urecord->flag = UDF_RECORD_FLAG_ISVALID;
289
290 if (allow_updates) {
291 urecord->flag |= UDF_RECORD_FLAG_ALLOW_UPDATES;
292 }
293
294 urecord->keyd = cf_digest_zero;
295 for (uint32_t i = 0; i < UDF_RECORD_BIN_ULIMIT; i++) {
296 urecord->updates[i].particle_buf = NULL;
297 }
298
299 urecord->pickle = NULL;
300 urecord->pickle_sz = 0;
301}
302
303/*
304static int print_buffer(as_buffer * buff) {
305 msgpack_sbuffer sbuf;
306 msgpack_sbuffer_init(&sbuf);
307
308 sbuf.data = buff->data;
309 sbuf.size = buff->size;
310 sbuf.alloc = buff->capacity;
311
312 msgpack_zone mempool;
313 msgpack_zone_init(&mempool, 2048);
314
315 msgpack_object deserialized;
316 msgpack_unpack(sbuf.data, sbuf.size, NULL, &mempool, &deserialized);
317
318 printf("msg_buf:\n");
319 msgpack_object_print(stdout, deserialized);
320 puts("");
321
322 msgpack_zone_destroy(&mempool);
323 return 0;
324}
325*/
326
327/*
328 * Function: Get bin value from cached copy. All the update in a
329 * commit window is not applied to the record directly
330 * but maintained in-memory cache. This function used
331 * to retrieve cached value
332 *
333 * Similar function for get and free of cache
334 *
335 * Return value :
336 * value (as_val) in case of success [for get]
337 * NULL in case of failure
338 * set and free return Nothing
339 *
340 * Callers:
341 * GET and SET
342 * udf_record_get
343 * udf_record_set
344 * udf_record_remove
345 *
346 * FREE
347 * udf_aerospike__execute_updates (when crossing commit window)
348 * udf_record_close (finally closing record)
349 * udf_rw_commit (commit the udf record)
350 */
351static as_val *
352udf_record_cache_get(udf_record * urecord, const char * name)
353{
354 cf_debug(AS_UDF, "[ENTER] BinName(%s) ", name );
355 if ( urecord->nupdates > 0 ) {
356 cf_detail(AS_UDF, "udf_record_get: %s find", name);
357 for ( uint32_t i = 0; i < urecord->nupdates; i++ ) {
358 udf_record_bin * bin = &(urecord->updates[i]);
359 if ( strncmp(name, bin->name, AS_BIN_NAME_MAX_SZ) == 0 ) {
360 cf_detail(AS_UDF, "Bin %s found, type(%d)", name, bin->value->type );
361 return bin->value; // note it's OK if the bin contains a nil
362 }
363 }
364 }
365 return NULL;
366}
367
368void
369udf_record_cache_free(udf_record * urecord)
370{
371 cf_debug(AS_UDF, "[ENTER] NumUpdates(%d) ", urecord->nupdates );
372
373 for (uint32_t i = 0; i < urecord->nupdates; i ++ ) {
374 udf_record_bin * bin = &urecord->updates[i];
375 if ( bin->value != NULL ) {
376 as_val_destroy(bin->value);
377 bin->value = NULL;
378 }
379 }
380
381 for (uint32_t i = 0; i < UDF_RECORD_BIN_ULIMIT; i++) {
382 if (urecord->updates[i].particle_buf) {
383 cf_free(urecord->updates[i].particle_buf);
384 urecord->updates[i].particle_buf = NULL;
385 }
386 }
387 urecord->nupdates = 0;
388 urecord->flag &= ~UDF_RECORD_FLAG_TOO_MANY_BINS;
389}
390
391/**
392 * Set the cache value for a bin, including flags.
393 */
394static void
395udf_record_cache_set(udf_record * urecord, const char * name, as_val * value,
396 bool dirty)
397{
398 cf_debug(AS_UDF, "[ENTER] urecord(%p) name(%p)[%s] dirty(%d)",
399 urecord, name, name, dirty);
400
401 bool modified = false;
402
403 for ( uint32_t i = 0; i < urecord->nupdates; i++ ) {
404 udf_record_bin * bin = &(urecord->updates[i]);
405
406 // bin exists, then we will release old value and set new value.
407 if ( strncmp(name, bin->name, AS_BIN_NAME_MAX_SZ) == 0 ) {
408 cf_detail(AS_UDF, "udf_record_set: %s found", name);
409
410 // release previously set value
411 as_val_destroy(bin->value);
412
413 // set new value, with dirty flag
414 if( value != NULL ) {
415 bin->value = (as_val *) value;
416 }
417 bin->dirty = dirty;
418 cf_detail(AS_UDF, "udf_record_set: %s set for %p:%p", name,
419 urecord, bin->value);
420
421 modified = true;
422 break;
423 }
424 }
425
426 // If not modified, then we will add the bin to the cache
427 if ( ! modified ) {
428 if ( urecord->nupdates < UDF_RECORD_BIN_ULIMIT ) {
429 udf_record_bin * bin = &(urecord->updates[urecord->nupdates]);
430 strncpy(bin->name, name, AS_BIN_NAME_MAX_SZ);
431 bin->value = (as_val *) value;
432 bin->dirty = dirty;
433 urecord->nupdates++;
434 cf_detail(AS_UDF, "udf_record_set: %s not modified, add for %p:%p",
435 name, urecord, bin->value);
436 }
437 else {
438 cf_warning(AS_UDF, "UDF bin limit (%d) exceeded (bin %s)",
439 UDF_RECORD_BIN_ULIMIT, name);
440 urecord->flag |= UDF_RECORD_FLAG_TOO_MANY_BINS;
441 }
442 }
443}
444
445/*
446 * Internal Function: Read the bin from storage and convert it
447 * into as_val and return
448 *
449 * Parameters:
450 * r : udf record
451 * bname: Bin name of the bin which need to be read.
452 *
453 * Return value :
454 * value (as_val *) in case of success
455 * NULL in case of failure
456 *
457 * Description:
458 * Expectation is the record is already open. No checks are
459 * performed in this function. Caller needs to make sure the
460 * record is good to read e.g binname etc.
461 *
462 * NB: as_val which is returned is allocated one. It is callers
463 * responsibility to free else in case it is passed on to
464 * lua ... lua has responsibility of garbage collecting it.
465 * Hence this function call incurs and malloc cost.
466 *
467 * Callers:
468 * udf_record_get
469 */
470as_val *
471udf_record_storage_get(const udf_record *urecord, const char *name)
472{
473 if (!name) {
474 cf_detail(AS_UDF, "Passed Null bin name to storage get");
475 return NULL;
476 }
477 as_bin * bb = as_bin_get(urecord->rd, name);
478
479 if ( !bb ) {
480 cf_detail(AS_UDF, "udf_record_get: bin not found (%s)", name);
481 return NULL;
482 }
483
484 return as_bin_particle_to_asval(bb);
485}
486
487/*
488 * Check and validate parameter before performing operation
489 *
490 * return:
491 * 2 : UDF_ERR_INTERNAL_PARAM
492 * 3 : UDF_ERR_RECORD_IS_NOT_VALID
493 * 4 : UDF_ERR_PARAMETER
494 * 0 : Success
495 *
496 */
497int
498udf_record_param_check(const as_rec *rec, char *fname, int lineno)
499{
500 if (! rec) {
501 cf_warning(AS_UDF, "Invalid Parameter: null record");
502 return UDF_ERR_INTERNAL_PARAMETER;
503 }
504
505 udf_record *urecord = (udf_record *)as_rec_source(rec);
506 if (!urecord) {
507 return UDF_ERR_INTERNAL_PARAMETER;;
508 }
509
510 if (!(urecord->flag & UDF_RECORD_FLAG_ISVALID)) {
511 cf_debug(AS_UDF, "(%s:%d): Trying to Open Invalid Record", fname, lineno);
512 return UDF_ERR_RECORD_NOT_VALID;
513 }
514
515 return 0;
516}
517
518static int
519udf_record_param_check_w_bin(const as_rec *rec, const char *bname, char *fname, int lineno)
520{
521 int rv = udf_record_param_check(rec, fname, lineno);
522
523 if (rv != 0) {
524 return rv;
525 }
526
527 if (! bname) {
528 cf_warning(AS_UDF, "Invalid Parameter: null bin name");
529 return UDF_ERR_INTERNAL_PARAMETER;
530 }
531
532 udf_record *urecord = (udf_record *)as_rec_source(rec);
533 as_namespace *ns = urecord->tr->rsv.ns;
534
535 if (ns->single_bin) {
536 if (*bname != 0) {
537 cf_warning(AS_UDF, "Invalid Parameter: non-empty bin name in single-bin namespace");
538 return UDF_ERR_INTERNAL_PARAMETER;
539 }
540
541 return 0;
542 }
543
544 if (*bname == 0) {
545 cf_warning(AS_UDF, "Invalid Parameter: empty bin name");
546 return UDF_ERR_INTERNAL_PARAMETER;
547 }
548
549 if (strlen(bname) >= AS_BIN_NAME_MAX_SZ) {
550 cf_warning(AS_UDF, "Invalid Parameter: bin name %s too big", bname);
551 return UDF_ERR_PARAMETER;
552 }
553
554 if (! as_bin_name_within_quota(ns, bname)) {
555 cf_warning(AS_UDF, "{%s} exceeded bin name quota", ns->name);
556 return UDF_ERR_PARAMETER;
557 }
558
559 return 0;
560}
561
562/*********************************************************************
563 * INTERFACE FUNCTIONS *
564 * *
565 * See the as_aerospike for the API definition *
566 ********************************************************************/
567static as_val *
568udf_record_get(const as_rec * rec, const char * name)
569{
570 if (udf_record_param_check_w_bin(rec, name, __FILE__, __LINE__)) {
571 return NULL;
572 }
573 udf_record * urecord = (udf_record *) as_rec_source(rec);
574 as_val * value = NULL;
575
576 cf_debug(AS_UDF, "[ENTER] rec(%p) name(%s)", rec, name );
577
578 // Get from cache
579 value = udf_record_cache_get(urecord, name);
580
581 // If value not NULL, then return it.
582 if ( value != NULL ) {
583 return value;
584 }
585
586 // Check in the cache before trying to look up in record
587 // Note: Record may not have been created yet ... Do not
588 // change the order unless you fully understand what you
589 // are doing
590 if ( !(urecord->flag & UDF_RECORD_FLAG_STORAGE_OPEN) ) {
591 if (udf_record_open(urecord)) { // lazy read the record from storage
592 return NULL;
593 }
594 }
595
596 // Check if storage is available
597 if ( !urecord->rd->ns ) {
598 cf_detail(AS_UDF, "udf_record_get: storage unavailable");
599 return NULL;
600 }
601
602 value = udf_record_storage_get(urecord, name);
603
604 // We have a value, so we will cache it.
605 // DO NOT remove this. We need to cache copy to makes sure ref count
606 // gets decremented post handing this as_val over to the lua world
607 if (value) {
608 udf_record_cache_set(urecord, name, value, false);
609 }
610
611 cf_detail(AS_UDF, "udf_record_get: end (%s) [%p,%p]", name, urecord, value);
612 return value;
613}
614
615static int
616udf_record_set(const as_rec * rec, const char * name, const as_val * value)
617{
618 int ret = udf_record_param_check_w_bin(rec, name, __FILE__, __LINE__);
619 if (ret) {
620 return ret;
621 }
622
623 udf_record * urecord = (udf_record *) as_rec_source(rec);
624 cf_detail(AS_UDF, "udf_record_set: begin (%s)", name);
625 if ( urecord && name ) {
626 udf_record_cache_set(urecord, name, (as_val *) value, true);
627 }
628 cf_detail(AS_UDF, "udf_record_set: end (%s)", name);
629
630 return 0;
631}
632
633static int
634udf_record_set_ttl(const as_rec * rec, uint32_t ttl)
635{
636 int ret = udf_record_param_check(rec, __FILE__, __LINE__);
637 if (ret) {
638 return ret;
639 }
640
641 udf_record * urecord = (udf_record *) as_rec_source(rec);
642 if (!(urecord->flag & UDF_RECORD_FLAG_ALLOW_UPDATES)) {
643 return -1;
644 }
645
646 urecord->tr->msgp->msg.record_ttl = ttl;
647 urecord->flag |= UDF_RECORD_FLAG_METADATA_UPDATED;
648
649 return 0;
650}
651
652static int
653udf_record_drop_key(const as_rec * rec)
654{
655 int ret = udf_record_param_check(rec, __FILE__, __LINE__);
656 if (ret) {
657 return ret;
658 }
659
660 udf_record * urecord = (udf_record *) as_rec_source(rec);
661 if (!(urecord->flag & UDF_RECORD_FLAG_ALLOW_UPDATES)) {
662 return -1;
663 }
664
665 // Flag the key to be dropped.
666 if (urecord->rd->key) {
667 urecord->rd->key = NULL;
668 urecord->rd->key_size = 0;
669 }
670
671 urecord->flag |= UDF_RECORD_FLAG_METADATA_UPDATED;
672
673 return 0;
674}
675
676static int
677udf_record_remove(const as_rec * rec, const char * name)
678{
679 int ret = udf_record_param_check(rec, __FILE__, __LINE__);
680 if (ret) {
681 return ret;
682 }
683 udf_record * urecord = (udf_record *) as_rec_source(rec);
684
685
686 cf_detail(AS_UDF, "udf_record_remove: begin (%s)", name);
687 if ( urecord && name ) {
688 udf_record_cache_set(urecord, name, (as_val *) &as_nil, true);
689 }
690 cf_detail(AS_UDF, "udf_record_remove: end (%s)", name);
691
692 return 0;
693}
694
695static uint32_t
696udf_record_ttl(const as_rec * rec)
697{
698 int ret = udf_record_param_check(rec, __FILE__, __LINE__);
699 if (ret) {
700 return 0;
701 }
702
703 udf_record * urecord = (udf_record *) as_rec_source(rec);
704
705 if ((urecord->flag & UDF_RECORD_FLAG_STORAGE_OPEN)) {
706 uint32_t now = as_record_void_time_get();
707
708 return urecord->r_ref->r->void_time > now ?
709 urecord->r_ref->r->void_time - now : 0;
710 }
711 else {
712 cf_info(AS_UDF, "Error in getting ttl: no record found");
713 return 0; // since we can't indicate the record doesn't exist
714 }
715 return 0;
716}
717
718static uint64_t
719udf_record_last_update_time(const as_rec * rec)
720{
721 int ret = udf_record_param_check(rec, __FILE__, __LINE__);
722 if (ret) {
723 return 0;
724 }
725
726 udf_record * urecord = (udf_record *) as_rec_source(rec);
727 if (urecord && (urecord->flag & UDF_RECORD_FLAG_STORAGE_OPEN)) {
728 return urecord->r_ref->r->last_update_time;
729 }
730 else {
731 cf_warning(AS_UDF, "Error getting last update time: no record found");
732 return 0;
733 }
734}
735
736static uint16_t
737udf_record_gen(const as_rec * rec)
738{
739 int ret = udf_record_param_check(rec, __FILE__, __LINE__);
740 if (ret) {
741 return 0;
742 }
743
744 udf_record * urecord = (udf_record *) as_rec_source(rec);
745 if (urecord && (urecord->flag & UDF_RECORD_FLAG_STORAGE_OPEN) != 0) {
746 return plain_generation(urecord->rd->r->generation, urecord->rd->ns);
747 }
748 else {
749 cf_warning(AS_UDF, "Error in getting generation: no record found");
750 return 0;
751 }
752}
753
754// Local utility.
755static as_val *
756as_val_from_flat_key(const uint8_t * flat_key, uint32_t size)
757{
758 uint8_t type = *flat_key;
759 const uint8_t * key = flat_key + 1;
760
761 switch ( type ) {
762 case AS_PARTICLE_TYPE_INTEGER:
763 if (size != 1 + sizeof(uint64_t)) {
764 return NULL;
765 }
766 // Flat integer keys are in big-endian order.
767 return (as_val *) as_integer_new(cf_swap_from_be64(*(int64_t *)key));
768 case AS_PARTICLE_TYPE_STRING:
769 {
770 // Key length is size - 1, then +1 for null-termination.
771 char * buf = cf_malloc(size);
772 uint32_t len = size - 1;
773 memcpy(buf, key, len);
774 buf[len] = '\0';
775
776 return (as_val *) as_string_new(buf, true);
777 }
778 case AS_PARTICLE_TYPE_BLOB:
779 {
780 uint32_t blob_size = size - 1;
781 uint8_t *buf = cf_malloc(blob_size);
782
783 memcpy(buf, key, blob_size);
784
785 return (as_val *) as_bytes_new_wrap(buf, blob_size, true);
786 }
787 default:
788 return NULL;
789 }
790}
791
792static as_val *
793udf_record_key(const as_rec * rec)
794{
795 int ret = udf_record_param_check(rec, __FILE__, __LINE__);
796 if (ret) {
797 return NULL;
798 }
799
800 udf_record * urecord = (udf_record *) as_rec_source(rec);
801 if (urecord && (urecord->flag & UDF_RECORD_FLAG_STORAGE_OPEN)) {
802 if (urecord->rd->key) {
803 return as_val_from_flat_key(urecord->rd->key, urecord->rd->key_size);
804 }
805 // TODO - perhaps look for the key in the message.
806 return NULL;
807 }
808 else {
809 cf_warning(AS_UDF, "Error in getting key: no record found");
810 return NULL;
811 }
812}
813
814static const char *
815udf_record_setname(const as_rec * rec)
816{
817 int ret = udf_record_param_check(rec, __FILE__, __LINE__);
818 if (ret) {
819 return NULL;
820 }
821
822 udf_record * urecord = (udf_record *) as_rec_source(rec);
823 if (urecord && (urecord->flag & UDF_RECORD_FLAG_STORAGE_OPEN)) {
824 return as_index_get_set_name(urecord->r_ref->r, urecord->rd->ns);
825 }
826 else {
827 cf_warning(AS_UDF, "Error in getting set name: no record found");
828 return NULL;
829 }
830}
831
832bool
833udf_record_destroy(as_rec *rec)
834{
835 if (!rec) {
836 return false;
837 }
838
839 udf_record *urecord = (udf_record *) as_rec_source(rec);
840 udf_record_close(urecord);
841 as_rec_destroy(rec);
842 return true;
843}
844
845static as_bytes *
846udf_record_digest(const as_rec *rec)
847{
848 int ret = udf_record_param_check(rec, __FILE__, __LINE__);
849 if (ret) {
850 return NULL;
851 }
852
853 udf_record *urecord = (udf_record *)as_rec_source(rec);
854 if (urecord && urecord->flag & UDF_RECORD_FLAG_OPEN) {
855 cf_digest *keyd = cf_malloc(sizeof(cf_digest));
856 memcpy(keyd, &urecord->keyd, CF_DIGEST_KEY_SZ);
857 as_bytes *b = as_bytes_new_wrap(keyd->digest, CF_DIGEST_KEY_SZ, true);
858 return b;
859 }
860 return NULL;
861}
862
863static int
864udf_record_bin_names(const as_rec *rec, as_rec_bin_names_callback callback, void * udata)
865{
866 int ret = udf_record_param_check(rec, __FILE__, __LINE__);
867 if (ret) {
868 return 1;
869 }
870
871 udf_record *urecord = (udf_record *)as_rec_source(rec);
872 char * bin_names = NULL;
873 if (urecord && (urecord->flag & UDF_RECORD_FLAG_STORAGE_OPEN)) {
874 uint16_t nbins;
875
876 if (urecord->rd->ns->single_bin) {
877 nbins = 1;
878 bin_names = alloca(1);
879 *bin_names = 0;
880 }
881 else {
882 nbins = urecord->rd->n_bins;
883 bin_names = alloca(nbins * AS_BIN_NAME_MAX_SZ);
884 for (uint16_t i = 0; i < nbins; i++) {
885 as_bin *b = &urecord->rd->bins[i];
886 if (! as_bin_inuse(b)) {
887 nbins = i;
888 break;
889 }
890 const char * name = as_bin_get_name_from_id(urecord->rd->ns, b->id);
891 strcpy(bin_names + (i * AS_BIN_NAME_MAX_SZ), name);
892 }
893 }
894 callback(bin_names, nbins, AS_BIN_NAME_MAX_SZ, udata);
895 return 0;
896 }
897 else {
898 cf_warning(AS_UDF, "Error in getting bin names: no record found");
899 bin_names = alloca(1);
900 *bin_names = 0;
901 callback(bin_names, 1, AS_BIN_NAME_MAX_SZ, udata);
902 return -1;
903 }
904}
905
906static uint16_t
907udf_record_numbins(const as_rec * rec)
908{
909 int ret = udf_record_param_check(rec, __FILE__, __LINE__);
910 if (ret) {
911 return 0;
912 }
913
914 udf_record *urecord = (udf_record *) as_rec_source(rec);
915 if (urecord && (urecord->flag & UDF_RECORD_FLAG_STORAGE_OPEN)) {
916
917 if (urecord->rd->ns->single_bin) {
918 return 1;
919 }
920
921 uint16_t i;
922 as_storage_rd *rd = urecord->rd;
923 for (i = 0; i < rd->n_bins; i++) {
924 as_bin *b = &rd->bins[i];
925 if (! as_bin_inuse(b)) {
926 break;
927 }
928 }
929 return i;
930 }
931 else {
932 cf_warning(AS_UDF, "Error in getting numbins: no record found");
933 return 0;
934 }
935}
936
937const as_rec_hooks udf_record_hooks = {
938 .get = udf_record_get,
939 .set = udf_record_set,
940 .remove = udf_record_remove,
941 .ttl = udf_record_ttl,
942 .last_update_time = udf_record_last_update_time,
943 .gen = udf_record_gen,
944 .key = udf_record_key,
945 .setname = udf_record_setname,
946 .destroy = NULL,
947 .digest = udf_record_digest,
948 .set_ttl = udf_record_set_ttl,
949 .drop_key = udf_record_drop_key,
950 .bin_names = udf_record_bin_names,
951 .numbins = udf_record_numbins
952};
953