1/*
2 * Copyright 2008-2018 Aerospike, Inc.
3 *
4 * Portions may be licensed to Aerospike, Inc. under one or more contributor
5 * license agreements.
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
8 * use this file except in compliance with the License. You may obtain a copy of
9 * the License at http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 * License for the specific language governing permissions and limitations under
15 * the License.
16 */
17#include <aerospike/mod_lua.h>
18
19#include <aerospike/as_aerospike.h>
20#include <aerospike/as_atomic.h>
21#include <aerospike/as_dir.h>
22#include <aerospike/as_log_macros.h>
23#include <aerospike/as_types.h>
24#include <aerospike/mod_lua_aerospike.h>
25#include <aerospike/mod_lua_bytes.h>
26#include <aerospike/mod_lua_config.h>
27#include <aerospike/mod_lua_geojson.h>
28#include <aerospike/mod_lua_iterator.h>
29#include <aerospike/mod_lua_list.h>
30#include <aerospike/mod_lua_map.h>
31#include <aerospike/mod_lua_record.h>
32#include <aerospike/mod_lua_stream.h>
33#include <aerospike/mod_lua_val.h>
34#include <citrusleaf/alloc.h>
35#include <citrusleaf/cf_hash_math.h>
36#include <citrusleaf/cf_queue.h>
37
38#include <lauxlib.h>
39#include <lua.h>
40#include <lualib.h>
41#include <pthread.h>
42#include <setjmp.h> // needed for gracefully handling lua panics
43#include <stdio.h>
44#include <stdlib.h>
45#include <string.h>
46#include <sys/stat.h>
47#include <sys/types.h>
48
49#include "internal.h"
50
// Process-wide reader/writer lock. In this file the RDLOCK / WRLOCK / UNLOCK
// macros below wrap lookups in, and modifications of, g_lua_hash.
pthread_rwlock_t g_cache_lock = PTHREAD_RWLOCK_INITIALIZER;
#define RDLOCK pthread_rwlock_rdlock(&g_cache_lock)
#define WRLOCK pthread_rwlock_wrlock(&g_cache_lock)
#define UNLOCK pthread_rwlock_unlock(&g_cache_lock)
55/******************************************************************************
56 * MACROS
57 ******************************************************************************/
58
#define CACHE_TABLE_ENTRY_MAX 128 // Doesn't appear to be used
#define CACHE_ENTRY_KEY_MAX 128 // size in bytes of the key buffers (module name)
#define CACHE_ENTRY_GEN_MAX 128 // size in bytes of the gen buffers
#define CACHE_ENTRY_STATE_MAX 128 // offer_state() stops returning states once an entry's queue reaches this size
#define CACHE_ENTRY_STATE_MIN 10 // number of states cache_entry_populate() attempts to create

#define LUA_PARAM_COUNT_THRESHOLD 20 // warn if a function call exceeds this

#define MOD_LUA_CONFIG_USRPATH "/opt/aerospike/usr/udf/lua" // default user_path of mod_lua_source
68
69/******************************************************************************
70 * TYPES
71 ******************************************************************************/
72
struct cache_entry_s;
typedef struct cache_entry_s cache_entry;

struct cache_item_s;
typedef struct cache_item_s cache_item;

// One cache entry per UDF module; entries are stored in g_lua_hash.
struct cache_entry_s {
	char key[CACHE_ENTRY_KEY_MAX]; // module name the entry was created for
	char gen[CACHE_ENTRY_GEN_MAX]; // generation string; offer_state() only re-queues states whose gen matches
	uint64_t cache_miss; // bumped by poll_state() when no queued state was available
	uint64_t total; // bumped by poll_state() on every lookup that finds this entry
	cf_queue* lua_state_q; // queue of idle lua_State pointers
};

// A leased state as passed between poll_state() and offer_state().
struct cache_item_s {
	char key[CACHE_ENTRY_KEY_MAX]; // module name used for the cache lookup
	char gen[CACHE_ENTRY_GEN_MAX]; // gen copied from the entry the state came from ("" for a fresh state)
	lua_State * state; // the leased state, or NULL when none is held
};

struct context_s;
typedef struct context_s context;

// Module-level context, reached through as_module.source.
struct context_s {
	mod_lua_config config; // active configuration (cache_enabled, user_path, server_mode)
	pthread_rwlock_t * lock; // NULL until update() handles the first CONFIGURE event
};

// Element of a lua_hash row chain.
// NOTE(review): the lua_hash implementation is not visible in this part of
// the file; the field notes below are inferred from the declarations only.
typedef struct lua_hash_ele_s {
	struct lua_hash_ele_s* next; // presumably the next element in the same row
	cache_entry* value; // cache entry associated with the key
	char key[]; // key_size bytes of key
} lua_hash_ele;

typedef struct lua_hash_s {
	uint32_t ele_size; // presumably the size in bytes of one element including its key
	uint32_t n_rows; // number of rows (lua_hash_create() takes n_rows)
	uint8_t* table; // presumably the storage for the row heads
} lua_hash;
112
113/******************************************************************************
114 * VARIABLES
115 ******************************************************************************/
116
// Backing storage for context.lock; initialized by update() while handling
// the first AS_MODULE_EVENT_CONFIGURE.
static pthread_rwlock_t lock;

// Cache of per-module Lua states keyed by module name. Created by update()
// when the cache is enabled; lookups and modifications in this file are
// wrapped in RDLOCK / WRLOCK.
static lua_hash* g_lua_hash = NULL;

/**
 * Lua Module Specific Data
 * This will populate the module.source field
 */
static context mod_lua_source = {
	.config = {
		.cache_enabled = true,
		.user_path = MOD_LUA_CONFIG_USRPATH,
		.server_mode = true
	},
	.lock = NULL
};
133
134
135/******************************************************************************
136 * STATIC FORWARD DECLARATIONS
137 ******************************************************************************/
138
// Module hooks.
static int update(as_module *, as_module_event *);
static int apply_record(as_module *, as_udf_context *, const char *, const char *, as_rec *, as_list *, as_result *);
static int apply_stream(as_module *, as_udf_context *, const char *, const char *, as_stream *, as_list *, as_stream *, as_result *);

// Lua state lifecycle: create a state, lease one (poll), give one back (offer).
static lua_State * create_state(context *, const char *filename);
static int poll_state(context *, cache_item *);
static int offer_state(context *, cache_item *);
// Closes all queued states of an entry and frees the entry itself.
static void destroy_cache_entry(cache_entry *centry);
// lua_hash internals (defined outside the portion of the file reviewed here).
static inline void lua_hash_call_cb_if(void (*cb)(cache_entry *), cache_entry *centry);
static inline lua_hash_ele* lua_hash_get_row_head(const lua_hash* h, const char* key);
149
150
151/******************************************************************************
152 * FORWARD DECLARATIONS
153 *****************************************************************************/
154
// lua_hash API. As used in this file: lua_hash_get() returns true and fills
// *p_value when the key is present; lua_hash_remove() returns the removed
// entry (NULL is treated as "nothing removed"); lua_hash_clear() is given
// destroy_cache_entry as its callback.
void lua_hash_clear(lua_hash* h, void (*cb)(cache_entry *));
lua_hash* lua_hash_create(uint32_t key_size, uint32_t n_rows);
void lua_hash_destroy(lua_hash* h);
bool lua_hash_get(const lua_hash* h, const char* key, cache_entry** p_value);
cache_entry* lua_hash_put(lua_hash* h, const char* key, cache_entry* value);
cache_entry* lua_hash_remove(lua_hash* h, const char* key);
161
162
163/******************************************************************************
164 * FUNCTIONS
165 ******************************************************************************/
166
167static inline int cache_entry_cleanup(cache_entry * centry) {
168 lua_State *l = NULL;
169 while(cf_queue_pop(centry->lua_state_q, &l, CF_QUEUE_NOWAIT) == CF_QUEUE_OK) {
170 lua_close(l);
171 }
172 return 0;
173}
174
175static inline int cache_entry_populate(context *ctx, cache_entry *centry, const char *key) {
176 lua_State *l = NULL;
177 for ( int i = 0; i < CACHE_ENTRY_STATE_MIN; i++ ) {
178 l = create_state(ctx, key);
179 if (l) cf_queue_push(centry->lua_state_q, &l);
180 }
181 return 0;
182}
183
184/**
185 * Clear the entry:
186 * - truncate the key
187 * - truncate the gen
188 * - release all lua_States
189 * - set size to 0
190 */
191static inline int cache_entry_init(context * ctx, cache_entry * centry, const char *key, const char *gen) {
192 cache_entry_cleanup(centry);
193 cache_entry_populate(ctx, centry, key);
194 strncpy(centry->key, key, CACHE_ENTRY_KEY_MAX - 1);
195 strncpy(centry->gen, gen, CACHE_ENTRY_GEN_MAX - 1);
196
197 return 0;
198}
199
200int
201cache_rm(context* ctx, const char *key)
202{
203 if (key == NULL || *key == '\0') {
204 return 0;
205 }
206
207 WRLOCK;
208 cache_entry* centry = lua_hash_remove(g_lua_hash, key);
209 UNLOCK;
210
211 if (centry != NULL) {
212 destroy_cache_entry(centry);
213 }
214
215 return 0;
216}
217
218int cache_init(context * ctx, const char *key, const char * gen) {
219 if (key == NULL || *key == '\0') {
220 return 0;
221 }
222
223 cache_entry * centry = NULL;
224 WRLOCK;
225
226 if (! lua_hash_get(g_lua_hash, key, &centry)) {
227 centry = cf_malloc(sizeof(cache_entry));
228 as_store_uint64(&centry->total, 0);
229 as_store_uint64(&centry->cache_miss, 0);
230 // Start Small and grow (as necessary) up to the max
231 centry->lua_state_q = cf_queue_create(sizeof(lua_State *), true);
232 cache_entry_init(ctx, centry, key, gen);
233 lua_hash_put(g_lua_hash, key, centry);
234 UNLOCK;
235 as_log_trace("[CACHE] Added [%s:%p]", key, centry)
236 } else {
237 UNLOCK;
238 cache_entry_init(ctx, centry, key, gen);
239 centry = 0;
240 }
241 return 0;
242}
243
244static int cache_remove_file(context * ctx, const char * filename) {
245 char key[CACHE_ENTRY_KEY_MAX];
246 if (as_strncpy(key, filename, sizeof(key))) {
247 as_log_error("LUA cache remove failed : filename truncated %s", key);
248 return -1;
249 }
250
251 char* p = strrchr(key, '.');
252
253 if (p) {
254 *p = '\0';
255 }
256 cache_rm(ctx, key);
257 return 0;
258}
259
260static int cache_add_file(context * ctx, const char * filename) {
261 char key[CACHE_ENTRY_KEY_MAX];
262 if (as_strncpy(key, filename, sizeof(key))) {
263 as_log_error("LUA registration failed : filename truncated %s", key);
264 return -1;
265 }
266
267 char *tmp_char = strrchr(key, '.');
268 if ( !tmp_char // Filename without extension
269 || key == tmp_char // '.' as first character
270 || strlen(tmp_char) <= 1) // '.' in filename , but no extension e.g. "abc."
271 {
272 as_log_error("LUA registration failed : Invalid filename %s", filename);
273 return -1;
274 }
275 *tmp_char = '\0';
276
277 char gen[CACHE_ENTRY_GEN_MAX];
278 gen[0] = 0;
279
280 cache_init(ctx, key, gen);
281 return 0;
282}
283
/**
 * Strips `ext` from the end of `name`, in place.
 *
 * @param name     mutable, NUL-terminated file name
 * @param name_len length of `name` (excluding the terminator)
 * @param ext      extension to strip, including the dot (e.g. ".lua")
 * @param ext_len  length of `ext`
 * @return `name` if it was longer than `ext` and ended with it (the extension
 *         has then been cut off), otherwise NULL and `name` is untouched
 */
static char * dropext(char * name, size_t name_len, const char * ext, size_t ext_len) {
	// Check the lengths first: forming name + name_len - ext_len when the
	// extension is longer than the name would point before the buffer.
	if (ext_len >= name_len) {
		return NULL;
	}

	char * tail = name + (name_len - ext_len);

	if (strncmp(tail, ext, ext_len) != 0) {
		return NULL;
	}

	*tail = '\0';
	return name;
}
292
/**
 * Tests whether `name` ends with `ext` and has at least one character before it.
 *
 * @param name     NUL-terminated file name
 * @param name_len length of `name` (excluding the terminator)
 * @param ext      extension including the dot (e.g. ".so")
 * @param ext_len  length of `ext`
 * @return true if `name` is longer than `ext` and ends with it
 */
static bool hasext(const char * name, size_t name_len, const char * ext, size_t ext_len) {
	// Check the lengths first so no pointer before `name` is ever formed.
	if (ext_len >= name_len) {
		return false;
	}

	return strncmp(name + (name_len - ext_len), ext, ext_len) == 0;
}
300
301static int cache_scan_dir(context * ctx, const char * directory) {
302
303 as_dir dir;
304 const char* entry;
305
306 if (!as_dir_open(&dir, directory)) {
307 return -1;
308 }
309
310 while ( (entry = as_dir_read(&dir)) ) {
311
312 char key[CACHE_ENTRY_KEY_MAX];
313 if (as_strncpy(key, entry, sizeof(key))) {
314 as_log_error("LUA cache dir scan skipping truncated entry %s", key);
315 continue;
316 }
317
318 char gen[CACHE_ENTRY_GEN_MAX];
319 gen[0] = 0;
320
321 char * base = NULL;
322 size_t len = strlen(key);
323
324 // If file ends with ".lua", then drop ".lua"
325 base = dropext(key, len, ".lua", 4);
326 if ( base != NULL ) {
327 cache_init(ctx, key, gen);
328 continue;
329 }
330
331 // If file ends with ".so", then drop ".so"
332 base = dropext(key, len, ".so", 3);
333 if ( base != NULL ) {
334 cache_init(ctx, key, gen);
335 continue;
336 }
337 }
338
339 as_dir_close(&dir);
340 return 0;
341}
342
343static void
344destroy_cache_entry(cache_entry *centry)
345{
346 cache_entry_cleanup(centry);
347 cf_queue_destroy(centry->lua_state_q);
348 cf_free(centry);
349}
350/**
351 * Module Configurator.
352 * This configures and reconfigures the module. This can be called an
353 * arbitrary number of times during the lifetime of the server.
354 *
355 * @param m the module being configured.
356 * @return 0 = success, 1 = source is NULL, 2 = event.data is invalid, 3 = unable to create lock, 4 = unabled to create cache
357 * @sychronization: Caller should have a write lock
358 */
359static int update(as_module * m, as_module_event * e) {
360
361 context * ctx = (context *) (m ? m->source : NULL);
362
363 if ( ctx == NULL ) return 1;
364
365 switch ( e->type ) {
366 case AS_MODULE_EVENT_CONFIGURE: {
367 mod_lua_config * config = (mod_lua_config *) e->data.config;
368
369 ctx->config.server_mode = config->server_mode;
370 ctx->config.cache_enabled = config->cache_enabled;
371
372 if (g_lua_hash == NULL && ctx->config.cache_enabled) {
373 // No Internal Lock
374 g_lua_hash = lua_hash_create(CACHE_ENTRY_KEY_MAX, 64);
375 }
376
377 if ( ctx->lock == NULL ) {
378 ctx->lock = &lock;
379
380#if defined(_MSC_VER)
381 if (0 != pthread_rwlock_init(ctx->lock, NULL)) {
382 return 3;
383 }
384#else
385 pthread_rwlockattr_t rwattr;
386 if (0 != pthread_rwlockattr_init(&rwattr)) {
387 return 3;
388 }
389
390#if defined(__USE_UNIX98) || defined (__USE_XOPEN2K)
391 if (0 != pthread_rwlockattr_setkind_np(&rwattr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP)) {
392 return 3;
393 }
394#endif
395 if (0 != pthread_rwlock_init(ctx->lock, &rwattr)) {
396 return 3;
397 }
398#endif
399 }
400
401 // Attempt to open the directory.
402 // If it opens, then set the ctx value.
403 // Otherwise, we alert the user of the error when a UDF is called. (for now)
404 if ( config->user_path[0] != '\0' ) {
405 if (!as_dir_exists(config->user_path)) {
406 ctx->config.user_path[0] = '\0';
407 strncpy(ctx->config.user_path+1, config->user_path, 255);
408 }
409 else {
410 strncpy(ctx->config.user_path, config->user_path, 256);
411 }
412 }
413
414 if ( ctx->config.cache_enabled ) {
415 // Set up the USER path
416 cache_scan_dir(ctx, ctx->config.user_path);
417 }
418
419 break;
420 }
421 case AS_MODULE_EVENT_FILE_SCAN: {
422 if ( ctx->config.user_path[0] == '\0' ) return 2;
423 if ( ctx->config.cache_enabled ) {
424 // Set up the USER path
425 cache_scan_dir(ctx, ctx->config.user_path);
426 }
427 break;
428 }
429 case AS_MODULE_EVENT_FILE_ADD: {
430 if ( e->data.filename == NULL ) return 2;
431 if (ctx->config.cache_enabled) {
432 if (cache_add_file(ctx, e->data.filename)) {
433 return 4; //Why 4? - No defined error codes, so returning distinct nonzero value.
434 }
435 }
436 break;
437 }
438 case AS_MODULE_EVENT_FILE_REMOVE: {
439 if ( e->data.filename == NULL ) return 2;
440 if ( ctx->config.cache_enabled ) {
441 if (cache_remove_file(ctx, e->data.filename) != 0) {
442 return 2;
443 }
444 }
445 break;
446 }
447 case AS_MODULE_EVENT_CLEAR_CACHE: {
448 if (ctx->config.cache_enabled) {
449 WRLOCK;
450 lua_hash_clear(g_lua_hash, &destroy_cache_entry);
451 UNLOCK;
452 }
453 break;
454 }
455 }
456
457 return 0;
458}
459
460static void package_path_set(lua_State * l, char * user_path) {
461 int stack = 0;
462
463 lua_getglobal(l, "package");
464 lua_getfield(l, -1, "path");
465 stack += 1;
466
467 lua_pushstring(l, ";");
468 lua_pushstring(l, user_path);
469 lua_pushstring(l, "/?.lua");
470 stack += 3;
471
472 lua_concat(l, stack);
473
474 lua_setfield(l, -2, "path");
475 lua_pop(l, 1);
476}
477
478static void package_cpath_set(lua_State * l, char * user_path) {
479 int stack = 0;
480
481 lua_getglobal(l, "package");
482 lua_getfield(l, -1, "cpath");
483 stack += 1;
484
485 lua_pushstring(l, ";");
486 lua_pushstring(l, user_path);
487 lua_pushstring(l, "/?.so");
488 stack += 3;
489
490 lua_concat(l, stack);
491
492 lua_setfield(l, -2, "cpath");
493 lua_pop(l, 1);
494}
495
496/**
497 * Checks whether a module is native (i.e., a ".so" file.)
498 *
499 * @return true if native, otherwise false
500 */
501static bool is_native_module(context * ctx, const char *filename)
502{
503 struct stat buf;
504 char fn[1024];
505
506 snprintf(fn, sizeof(fn), "%s/%s.so", ctx->config.user_path, filename);
507 if (!stat(fn, &buf)) {
508 return true;
509 }
510
511 return false;
512}
513
// Lua scripts defined in another translation unit. create_state() and
// validate() load them under the chunk names "as.lua", "stream_ops.lua" and
// "aerospike.lua".
extern const char as_lua_as[];
extern const char as_lua_stream_ops[];
extern const char as_lua_aerospike[];

// Sizes of the scripts above. The loaders pass size - 1 to luaL_loadbuffer()
// -- presumably because each size counts a trailing NUL; confirm against the
// definitions.
extern size_t as_lua_as_size;
extern size_t as_lua_stream_ops_size;
extern size_t as_lua_aerospike_size;
521
522static bool load_buffer(lua_State* l, const char* script, size_t size, const char* name)
523{
524 if (luaL_loadbuffer(l, script, size - 1, name) || lua_pcall(l, 0, LUA_MULTRET, 0)) {
525 as_log_error("Failed to load lua string: %s %d", name, (int)size);
526 lua_close(l);
527 return false;
528 }
529 return true;
530}
531
532/**
533 * Creates a new context (lua_State) populating it with default values.
534 *
535 * @return a new lua_State
536 */
537static lua_State * create_state(context * ctx, const char * filename) {
538 lua_State * l = NULL;
539
540 l = lua_open();
541
542 luaL_openlibs(l);
543
544 package_path_set(l, ctx->config.user_path);
545 package_cpath_set(l, ctx->config.user_path);
546
547 mod_lua_aerospike_register(l);
548 mod_lua_record_register(l);
549 mod_lua_iterator_register(l);
550 mod_lua_stream_register(l);
551 mod_lua_list_register(l);
552 mod_lua_map_register(l);
553 mod_lua_bytes_register(l);
554 mod_lua_geojson_register(l);
555
556 if (! load_buffer(l, as_lua_as, as_lua_as_size, "as.lua")) {
557 return NULL;
558 }
559
560 if (! load_buffer(l, as_lua_stream_ops, as_lua_stream_ops_size, "stream_ops.lua")) {
561 return NULL;
562 }
563
564 if (! load_buffer(l, as_lua_aerospike, as_lua_aerospike_size, "aerospike.lua")) {
565 return NULL;
566 }
567
568 if (is_native_module(ctx, filename)) {
569 as_log_trace("Not requiring native module: %s", filename);
570 return l;
571 }
572
573 lua_getglobal(l, "require");
574 lua_pushstring(l, filename);
575 int rc = lua_pcall(l, 1, 1, 0);
576 if (rc) {
577 as_log_error("Lua Create Error: %s", lua_tostring(l, -1));
578 lua_close(l);
579 return NULL;
580 }
581 as_log_debug("Size of the lua state created for the file %s in KB %d",
582 filename, lua_gc(l, LUA_GCCOUNT,0));
583 return l;
584}
585
586/**
587 * Leases a context (lua_State). This will attempt to reuse an
588 * existing context or create a new one as needed.
589 *
590 * @param ctx mod lua context.
591 * @param citem context cache item to be populated.
592 * @return populate citem with lua_State to be used as the context.
593 * @return 0 on success, otherwise 1
594 */
595static int poll_state(context * ctx, cache_item * citem) {
596 if ( ctx->config.cache_enabled == true ) {
597 cache_entry * centry = NULL;
598 RDLOCK;
599 if (lua_hash_get(g_lua_hash, citem->key, &centry)) {
600 uint64_t miss;
601 if (cf_queue_pop(centry->lua_state_q, &citem->state, CF_QUEUE_NOWAIT) != CF_QUEUE_EMPTY) {
602 strncpy(citem->key, centry->key, CACHE_ENTRY_KEY_MAX);
603 strncpy(citem->gen, centry->gen, CACHE_ENTRY_GEN_MAX);
604 as_log_trace("[CACHE] took state: %s", citem->key);
605 miss = as_load_uint64(&centry->cache_miss);
606 } else {
607 as_log_trace("[CACHE] miss state: %s", citem->key);
608 miss = as_aaf_uint64(&centry->cache_miss, 1);
609 citem->state = NULL;
610 }
611 uint64_t total = as_aaf_uint64(&centry->total, 1);
612 centry = 0;
613 as_log_trace("[CACHE] Miss %lu : Total %lu", miss, total);
614 } else {
615 centry = NULL;
616 }
617 UNLOCK;
618 }
619 else {
620 as_log_trace("[CACHE] is disabled.");
621 }
622
623 if ( citem->state == NULL ) {
624 citem->gen[0] = '\0';
625 pthread_rwlock_rdlock(ctx->lock);
626 citem->state = create_state(ctx, citem->key);
627 pthread_rwlock_unlock(ctx->lock);
628 if (!citem->state) {
629 as_log_trace("[CACHE] state create failed: %s", citem->key);
630 return 1;
631 } else {
632 as_log_trace("[CACHE] state created: %s", citem->key);
633 }
634 }
635
636 return 0;
637}
638
639// Soon we will make these defaults "overridable" via config so that someone
640// who actually knew what they were doing could make adjustments.
641// Experimentation showed that at least 40 steps were needed to clean up after
642// a simple UDF (in our system), so for us, that is the "light threshold". (tjl)
// Compared against lua_gc(LUA_GCCOUNT), which reports memory in Kbytes.
// Parenthesized so the macro expands safely inside larger expressions.
#define LUA_KMEM_GC_THRESHOLD (1024*10) // Perform GC if we've used over 10MB
#define LUA_GC_STEP_THRESHOLD 40 // Perform 40 "iterations" of STEP GC
645
646// NB: When it comes to Lua garbage collection, it appears to be more art than
647// science. When I fully understand what's really going on with GC, I will
648// document it more clearly (and fully). (tjl)
649// For now, here's the story:
650// Much has already been done with Lua GC measurement, and performance measurements
651// showed that a "light GC" with step data=2 provided for good performance.
// However, once a customer built a pretty heavy-duty UDF (one that iteratively
653// built a list that exceeded Bin limits), the server ran out of memory and was
654// then killed by the "OOM-KILLER". Initially, changing the light "Step 2" to
655// a heavy "full GC" solved the OOM problem, but ran noticeably slower
656// (about 1/2 to 2/3 the speed). So, for now, we've decided to take the "tiered
657// approach", where for low thresholds we delay GC, and then when we decide to
658// perform GC, we try "light GC first", then "Heavy GC" if that fails.
659//
660// The current thresholds are part guess and part measurement. The most trivial
661// UDF seems to use 99k of Lua space in our environment. That's probably because
662// of the automatic cost of our existing system UDFs (including LDTs). So, we
663// set the "start GC" threshold at 200k, at which point we perform "light GC".
664// Our measurements showed that we needed Step Data=40 to successfully perform
665// a cycle with a relatively simple UDF (read a record bin, update it and write
666// it back). For heavier UDFs the simple step GC will not complete a cycle,
667// and so we then enlist the FULL GC to clean up.
668//
669// We've tested this approach for (sufficiently) large runs of small and large
670// UDFs, and we appear to get the best possible performance coupled with the
671// safety of full GC when we need it. (May 9, 2014 tjl)
672
673/**
674 * Release the context.
675 *
676 * @param m the module from which the context was leased from.
677 * @param filename name of the udf file
678 * @param l the context being released
679 * @return 0 on success, otherwise 1
680 */
681static int offer_state(context * ctx, cache_item * citem) {
682 int rc;
683
684 if ( ctx->config.cache_enabled == true ) {
685 // For Lua GC, we take the tiered approach, where if mem-used is below
686 // our threshold, we don't do anything (we wait until later). If
687 // we should do "something", then we try "GC Light" first and look at
688 // the result. If that didn't work (didn't complete a cycle), then we
689 // call in the heavy artillery, "GC Heavy". (tjl: May 2014).
690 // Note that garbage collection should be done outside the spinlock.
691 int kmem_used = lua_gc(citem->state, LUA_GCCOUNT, 0);
692 if ( kmem_used > LUA_KMEM_GC_THRESHOLD ) {
693 // We will want some sort of counter here, but can't use a
694 // g_config-related stat because g_config is not visible here.
695 // This comment is a placeholder that we need to add some sort
696 // of bridge beween the AS world and the mod-lua world.
697 // as_incr_uint64(&g_config.stat_lua_gc_step);
698 if ((rc = lua_gc(citem->state, LUA_GCSTEP, 40)) != 1) {
699 lua_gc(citem->state, LUA_GCCOLLECT, 200);
700 // We will want some sort of counter here, but can't use g_config
701 // as_incr_uint64(&g_config.stat_lua_gc_full);
702 }
703 }
704 // else {
705 // We will want some sort of counter here, but can't use g_config
706 // as_incr_uint64(&g_config.stat_lua_gc_delay);
707 // }
708
709 cache_entry *centry = NULL;
710 RDLOCK;
711 if (lua_hash_get(g_lua_hash, citem->key, &centry) ) {
712 as_log_trace("[CACHE] found entry: %s", citem->key);
713 if (( cf_queue_sz(centry->lua_state_q) < CACHE_ENTRY_STATE_MAX )
714 && ( !strncmp(centry->gen, citem->gen, CACHE_ENTRY_GEN_MAX) )) {
715 cf_queue_push(centry->lua_state_q, &citem->state);
716 as_log_trace("[CACHE] returning state: %s", citem->key);
717 citem->state = NULL;
718 }
719 centry = 0;
720 }
721 else {
722 as_log_trace("[CACHE] entry not found: %s", citem->key);
723 }
724 UNLOCK;
725 }
726 else {
727 as_log_trace("[CACHE] is disabled.");
728 }
729
730 // l is not NULL
731 // This means that it was not returned to the cache.
732 // So, we free it up.
733 if ( citem->state != NULL) {
734 lua_close(citem->state);
735 as_log_trace("[CACHE] state closed: %s", citem->key);
736 }
737
738 return 0;
739}
740
741
// Accumulator handed to pushargs_foreach() through as_list_foreach().
typedef struct {
	lua_State * l; // Lua state whose stack receives the values
	uint32_t count; // running sum of mod_lua_pushval() results
} pushargs_data;
746
747/**
748 * Pushes arguments into the Lua stack.
749 * We scope the arguments to Lua, so Lua is responsible for releasing them.
750 */
751static bool pushargs_foreach(as_val * val, void * context) {
752 pushargs_data * data = (pushargs_data *) context;
753 data->count += mod_lua_pushval(data->l, val);
754 return true;
755}
756
757/**
758 * Pushes arguments from a list on to the stack
759 *
760 * @param l the lua_State to push arguments onto
761 * @param args the list containing the arguments
762 * @return the number of arguments pushed onto the stack.
763 */
764static int pushargs(lua_State * l, as_list * args) {
765 pushargs_data data = {
766 .l = l,
767 .count = 0
768 };
769
770 as_list_foreach(args, pushargs_foreach, &data);
771 as_log_trace("pushargs: %d", data.count);
772 return data.count;
773}
774
775static int handle_error(lua_State * l) {
776 const char * msg = luaL_optstring(l, 1, 0);
777 as_log_error("Lua Runtime Error: %s", msg);
778 return 1;
779}
780
781
782#ifndef LUA_DEBUG_HOOK
783static as_timer g_timer = {
784 .is_malloc = false,
785 .source = NULL,
786 .hooks = NULL
787};
788
789/**
790 * Lua debug hook to check for a timeout.
791 */
792static void check_timer(lua_State *L, lua_Debug *ar)
793{
794 as_log_trace("%s %p", __func__, &g_timer);
795
796 if (ar->event == LUA_HOOKCOUNT) {
797 if (as_timer_timedout(&g_timer)) {
798 luaL_error(L, "UDF Execution Timeout");
799 }
800 }
801}
802#endif
803
804static int apply(lua_State * l, as_udf_context *udf_ctx, int err, int argc, as_result * res, bool is_stream) {
805
806 as_log_trace("apply");
807
808#ifndef LUA_DEBUG_HOOK
809 if ( !g_timer.hooks && udf_ctx->timer ) {
810 g_timer.hooks = udf_ctx->timer->hooks;
811 }
812
813 if ( udf_ctx->timer ) {
814 uint64_t slice = as_timer_timeslice(udf_ctx->timer);
815 as_log_trace("setting lua_debug hook (%p), count = %lu, thread ID = %lu", &check_timer, slice, pthread_self());
816 lua_sethook(l, &check_timer, LUA_MASKCOUNT, (int)slice);
817 }
818#endif
819
820 // call apply_record(f, r, ...)
821 as_log_trace("call function");
822 int rc = lua_pcall(l, argc, 1, err);
823
824 as_log_trace("rc = %d", rc);
825
826 // Convert the return value from a lua type to a val type
827 as_log_trace("convert lua type to val");
828
829 if ( rc == 0 ) { // indicates lua-execution success
830 if ( (is_stream == false) && (res != NULL) ) { // if is_stream is true, no need to set result as success
831 as_val * rv = mod_lua_retval(l);
832 as_result_setsuccess(res, rv);
833 }
834 }
835 else {
836 if ( res != NULL ) {
837 as_val * rv = mod_lua_retval(l);
838 as_result_setfailure(res, rv);
839 }
840 }
841
842#ifndef LUA_DEBUG_HOOK
843 // Disable the hook.
844 if ( udf_ctx->timer ) {
845 lua_sethook(l, &check_timer, 0, 0);
846 }
847#endif
848
849 // Pop the return value off the stack
850 as_log_trace("pop return value from the stack");
851 lua_pop(l, -1);
852
853 if ( is_stream || (res == NULL) ) { //if is_stream is true then whether res is null or not rc should be returned
854 return rc;
855 } else {
856 return 0;
857 }
858}
859
860
/**
 * Builds a human readable description for a module error number.
 *
 * @param err_no error number (-1, -2 and -3 have dedicated messages)
 * @return a newly allocated string owned by the caller, or NULL if the
 *         allocation failed
 */
char * as_module_err_string(int err_no) {
	switch (err_no) {
		case -1:
			return cf_strdup("UDF: Mod-Lua system path not found");
		case -2:
			return cf_strdup("UDF: Mod-Lua user path not found");
		case -3:
			return cf_strdup("UDF: Mod-Lua system and user path not found");
		default: {
			const size_t rs_size = 128;
			char * rs = cf_malloc(rs_size);

			// Never format into a failed allocation, and keep the write bounded.
			if (rs != NULL) {
				snprintf(rs, rs_size, "UDF: Execution Error %d", err_no);
			}
			return rs;
		}
	}
}
880
881static void populate_error(lua_State * l, const char * filename, int rc, as_module_error * err) {
882
883 const char * message = lua_tostring(l, -1);
884 size_t len = 0;
885
886 err->scope = 2; // UDF Module
887
888 switch ( rc ) {
889 case LUA_ERRSYNTAX : {
890 err->code = 10;
891 break;
892 }
893 case LUA_ERRRUN : {
894 err->code = 11;
895 break;
896 }
897 case LUA_ERRMEM : {
898 err->code = 12;
899 break;
900 }
901 case LUA_ERRERR : {
902 err->code = 13;
903 break;
904 }
905 default : {
906 err->code = 0;
907 break;
908 }
909 }
910
911 if ( ! message ) {
912 message = "(Null error message returned by lua)";
913 }
914
915 if ( err->code == 10 || err->code == 11 ) {
916 if ( message[0] == '[' ) {
917 char * fileL = strchr(message,'"');
918 if ( fileL ) {
919 fileL++;
920 char * fileR = strchr(fileL, '"');
921 if ( fileR ) {
922 memcpy(err->file, fileL, fileR-fileL);
923 err->file[fileR-fileL] = '\0';
924 char * lineL = strchr(fileR, ':');
925 if ( lineL ) {
926 lineL++;
927 char * lineR = strchr(lineL, ':');
928 if ( lineR ) {
929 char line[11] = {0};
930 memcpy(line, lineL, lineR-lineL);
931 err->line = atoi(line);
932 lineR += 2;
933 len = strlen(lineR);
934 if ( len > 1024 ) {
935 len = 1024;
936 }
937 memcpy(err->message, lineR, len);
938 err->message[len] = '\0';
939 }
940 }
941 }
942 }
943 }
944 else {
945 char * c = strstr(message, "module 'aerospike' not found");
946 if ( c ) {
947 strcpy(err->message, "'aerospike' lua module not found, check mod-lua system-path");
948 }
949 else {
950 // Unrecognized error message. Just return the first line, up
951 // to 256 chars.
952 c = strchr(message, '\n');
953 if ( c ) {
954 len = c - message;
955 }
956 else {
957 len = strlen(message);
958 }
959 if ( len > 256 ) {
960 len = 256;
961 }
962 memcpy(err->message, message, len);
963 err->message[len] = '\0';
964 }
965 }
966 }
967 else {
968
969 lua_Debug ar;
970 lua_getfield(l, LUA_GLOBALSINDEX, "f");
971 lua_getinfo(l, ">Snl", &ar);
972
973 printf("## name = %s\n", ar.name);
974 printf("## namewhat = %s\n", ar.namewhat);
975 printf("## what = %s\n", ar.what);
976 printf("## source = %s\n", ar.source);
977 printf("## currentline = %d\n", ar.currentline);
978 printf("## nups = %d\n", ar.nups);
979 printf("## linedefined = %d\n", ar.linedefined);
980 printf("## lastlinedefined = %d\n", ar.lastlinedefined);
981 printf("## short_src = %s\n", ar.short_src);
982
983 len = strlen(message);
984 if ( len > 1024 ) {
985 len = 1024;
986 }
987 memcpy(err->message, message, len);
988 err->message[len] = '\0';
989 memcpy(err->file, filename, 256);
990 err->file[255] = '\0';
991 err->line = ar.currentline;
992 memcpy(err->func, ar.name, 256);
993 err->func[255] = '\0';
994 }
995}
996
997static bool
998load_buffer_validate(
999 lua_State* l, const char* filename, const char* script, size_t size, const char* name,
1000 as_module_error* err
1001 )
1002{
1003 int rc = luaL_loadbuffer(l, script, size - 1, name);
1004
1005 if (rc) {
1006 populate_error(l, filename, rc, err);
1007 return false;
1008 }
1009
1010 rc = lua_pcall(l, 0, LUA_MULTRET, 0);
1011
1012 if (rc) {
1013 populate_error(l, filename, rc, err);
1014 return false;
1015 }
1016
1017 return true;
1018}
1019
1020/**
1021 * Validates a UDF module
1022 */
1023static int validate(as_module * m, as_aerospike * as, const char * filename, const char * content, uint32_t size, as_module_error * err) {
1024
1025 int rc = 0;
1026
1027 err->scope = 0;
1028 err->code = 0;
1029 err->message[0] = '\0';
1030 err->file[0] = '\0';
1031 err->line = 0;
1032 err->func[0] = '\0';
1033
1034 context * ctx = (context *) m->source;
1035 lua_State * l = NULL;
1036
1037 l = lua_open();
1038
1039 if ( l == NULL ) {
1040 err->scope = 1;
1041 err->code = 1;
1042 strcpy(err->message,"Unable to create a new Lua state");
1043 goto Cleanup;
1044 }
1045
1046 luaL_openlibs(l);
1047
1048 package_path_set(l, ctx->config.user_path);
1049 package_cpath_set(l, ctx->config.user_path);
1050
1051 mod_lua_aerospike_register(l);
1052 mod_lua_record_register(l);
1053 mod_lua_iterator_register(l);
1054 mod_lua_stream_register(l);
1055 mod_lua_list_register(l);
1056 mod_lua_map_register(l);
1057 mod_lua_bytes_register(l);
1058 mod_lua_geojson_register(l);
1059
1060 if (! load_buffer_validate(l, filename, as_lua_as, as_lua_as_size, "as.lua", err)) {
1061 goto Cleanup;
1062 }
1063
1064 if (! load_buffer_validate(l, filename, as_lua_stream_ops, as_lua_stream_ops_size, "stream_ops.lua", err)) {
1065 goto Cleanup;
1066 }
1067
1068 if (! load_buffer_validate(l, filename, as_lua_aerospike, as_lua_aerospike_size, "aerospike.lua", err)) {
1069 goto Cleanup;
1070 }
1071
1072 // No validation for .so file
1073 if (hasext(filename, strlen(filename), ".so", 3)) {
1074 as_log_trace("No validation required for native module: %s", filename);
1075 goto Cleanup;
1076 }
1077
1078 rc = luaL_loadbuffer(l, content, size, filename);
1079 if ( rc ) {
1080 populate_error(l, filename, rc, err);
1081 goto Cleanup;
1082 }
1083
1084 rc = lua_pcall(l, 0, 1, 0);
1085 if ( rc ) {
1086 populate_error(l, filename, rc, err);
1087 goto Cleanup;
1088 }
1089
1090Cleanup:
1091 if ( err->code == 0 ) {
1092 as_log_trace("Lua Validation Pass for '%s'", filename);
1093 }
1094 else {
1095 as_log_debug("Lua Validation Fail for '%s': (%d) %s", filename, err->code, err->message);
1096 }
1097
1098 if ( l != NULL ) {
1099 lua_close(l);
1100 }
1101
1102 return err->code;
1103}
1104
1105
1106/**
1107 * Applies a record and arguments to the function specified by a fully-qualified name.
1108 *
1109 * Proxies to `m->hooks->apply_record(m, ...)`
1110 *
1111 * TODO: Remove redundancies between apply_record() and apply_stream()
1112 *
1113 * @param m module from which the fqn will be resolved.
1114 * @param udf_ctx udf execution context
1115 * @param function fully-qualified name of the function to invoke.
1116 * @param r record to apply to the function.
1117 * @param args list of arguments for the function represented as vals
1118 * @param res pointer to an as_result that will be populated with the result.
1119 * @return 0 on success, otherwise 1
1120 */
1121static int apply_record(as_module * m, as_udf_context * udf_ctx, const char * filename, const char * function, as_rec * r, as_list * args, as_result * res) {
1122
1123 int rc = 0;
1124 context * ctx = (context *) m->source; // mod-lua context
1125 lua_State * l = (lua_State *) NULL; // Lua State
1126 int argc = 0; // Number of arguments pushed onto the stack
1127 int err = 0; // Error handler
1128 as_aerospike *as = udf_ctx->as; // aerospike object
1129
1130 cache_item citem = {
1131 .key = "",
1132 .gen = "",
1133 .state = NULL
1134 };
1135
1136 strncpy(citem.key, filename, CACHE_ENTRY_KEY_MAX - 1);
1137
1138 as_log_trace("apply_record: BEGIN");
1139
1140 // lease a state
1141 as_log_trace("apply_record: poll state");
1142 rc = poll_state(ctx, &citem);
1143
1144 if ( rc != 0 ) {
1145 as_log_trace("apply_record: Unable to poll a state");
1146 return rc;
1147 }
1148
1149 l = citem.state;
1150
1151 // push error handler
1152 // lua_pushcfunction(l, handle_error);
1153 // int err = lua_gettop(l);
1154
1155 // push aerospike into the global scope
1156 as_log_trace("apply_record: push aerospike into the global scope");
1157 mod_lua_pushaerospike(l, as);
1158 lua_setglobal(l, "aerospike");
1159
1160 // push apply_record() onto the stack
1161 as_log_trace("apply_record: push apply_record() onto the stack");
1162 lua_getglobal(l, "apply_record");
1163
1164 // push function onto the stack
1165 as_log_trace("apply_record: push function onto the stack");
1166 lua_getglobal(l, function);
1167
1168 // push the record onto the stack
1169 as_log_trace("apply_record: push the record onto the stack");
1170 mod_lua_pushrecord(l, r);
1171
1172 // push each argument onto the stack
1173 as_log_trace("apply_record: push each argument onto the stack");
1174 argc = pushargs(l, args);
1175
1176 if (argc > LUA_PARAM_COUNT_THRESHOLD) {
1177 as_log_error("large number of Lua function arguments (%d)", argc);
1178 }
1179
1180 // function + record + arglist
1181 argc = argc + 2;
1182
1183 // apply the function
1184 as_log_trace("apply_record: apply the function %s.%s", filename, function);
1185 rc = apply(l, udf_ctx, err, argc, res, false);
1186
1187 // return the state
1188 pthread_rwlock_rdlock(ctx->lock);
1189 as_log_trace("apply_record: offer state");
1190 offer_state(ctx, &citem);
1191 pthread_rwlock_unlock(ctx->lock);
1192
1193 as_log_trace("apply_record: END");
1194 return rc;
1195}
1196
1197
1198
1199/**
1200 * Applies function to a stream and set of arguments.
1201 *
1202 * Proxies to `m->hooks->apply_stream(m, ...)`
1203 *
1204 * TODO: Remove redundancies between apply_record() and apply_stream()
1205 *
1206 * @param m module from which the fqn will be resolved.
1207 * @param udf_ctx udf execution context
1208 * @param function fully-qualified name of the function to invoke.
1209 * @param istream stream to apply to the function.
1210 * @param args list of arguments for the function represented as vals
1211 * @param ostream output stream which will be populated by applying the function.
1212 * @param res pointer to an as_result that will be populated with the result.
1213 * @return 0 on success, otherwise 1
1214 */
1215static int apply_stream(as_module * m, as_udf_context *udf_ctx, const char * filename, const char * function, as_stream * istream, as_list * args, as_stream * ostream, as_result * res) {
1216
1217 int rc = 0;
1218 context * ctx = (context *) m->source; // mod-lua context
1219 lua_State * l = (lua_State *) NULL; // Lua State
1220 int argc = 0; // Number of arguments pushed onto the stack
1221 int err = 0; // Error handler
1222 as_aerospike *as = udf_ctx->as; // aerospike object
1223
1224 cache_item citem = {
1225 .key = "",
1226 .gen = "",
1227 .state = NULL
1228 };
1229
1230 strncpy(citem.key, filename, CACHE_ENTRY_KEY_MAX - 1);
1231
1232 as_log_trace("apply_stream: BEGIN");
1233
1234 // lease a state
1235 as_log_trace("apply_stream: poll state");
1236 rc = poll_state(ctx, &citem);
1237
1238 if ( rc != 0 ) {
1239 as_log_trace("apply_stream: Unable to poll a state");
1240 return rc;
1241 }
1242
1243 l = citem.state;
1244
1245 // push error handler
1246 lua_pushcfunction(l, handle_error);
1247 err = lua_gettop(l);
1248
1249 // push aerospike into the global scope
1250 as_log_trace("apply_stream: push aerospike into the global scope");
1251 mod_lua_pushaerospike(l, as);
1252 lua_setglobal(l, "aerospike");
1253
1254 // push apply_stream() onto the stack
1255 as_log_trace("apply_stream: push apply_stream() onto the stack");
1256 lua_getglobal(l, "apply_stream");
1257
1258 // push function onto the stack
1259 as_log_trace("apply_stream: push function onto the stack");
1260 lua_getglobal(l, function);
1261
1262 // push the stream onto the stack
1263 // if server_mode == true then SCOPE_SERVER(1) else SCOPE_CLIENT(2)
1264 as_log_trace("apply_stream: push scope onto the stack");
1265 lua_pushinteger(l, ctx->config.server_mode ? 1 : 2);
1266
1267 // push the stream onto the stack
1268 as_log_trace("apply_stream: push istream onto the stack");
1269 mod_lua_pushstream(l, istream);
1270
1271 as_log_trace("apply_stream: push ostream onto the stack");
1272 mod_lua_pushstream(l, ostream);
1273
1274 // push each argument onto the stack
1275 as_log_trace("apply_stream: push each argument onto the stack");
1276 argc = pushargs(l, args);
1277
1278 if (argc > LUA_PARAM_COUNT_THRESHOLD) {
1279 as_log_error("large number of Lua function arguments (%d)", argc);
1280 }
1281
1282 // function + scope + istream + ostream + arglist
1283 argc = 4 + argc;
1284
1285 // call apply_stream(f, s, ...)
1286 as_log_trace("apply_stream: apply the function %s.%s", filename, function);
1287 rc = apply(l, udf_ctx, err, argc, res, true);
1288
1289 // release the context
1290 pthread_rwlock_rdlock(ctx->lock);
1291 as_log_trace("apply_stream: lose the context");
1292 offer_state(ctx, &citem);
1293 pthread_rwlock_unlock(ctx->lock);
1294
1295 as_log_trace("apply_stream: END");
1296 return rc;
1297}
1298
1299
1300int mod_lua_rdlock(as_module * m) {
1301 context * c = (context *) ( m ? m->source : NULL );
1302 if ( c && c->lock ) {
1303 return pthread_rwlock_rdlock(c->lock);
1304 }
1305 return 1;
1306}
1307
1308int mod_lua_wrlock(as_module * m) {
1309 context * c = (context *) ( m ? m->source : NULL );
1310 if ( c && c->lock ) {
1311 return pthread_rwlock_wrlock(c->lock);
1312 }
1313 return 1;
1314}
1315
1316int mod_lua_unlock(as_module * m) {
1317 context * c = (context *) ( m ? m->source : NULL );
1318 if ( c && c->lock ) {
1319 return pthread_rwlock_unlock(c->lock);
1320 }
1321 return 1;
1322}
1323
1324
1325/**
1326 * Module Hooks
1327 */
1328static const as_module_hooks mod_lua_hooks = {
1329 .destroy = NULL,
1330 .update = update,
1331 .validate = validate,
1332 .apply_record = apply_record,
1333 .apply_stream = apply_stream
1334};
1335
1336/**
1337 * Module
1338 */
1339as_module mod_lua = {
1340 .source = &mod_lua_source,
1341 .hooks = &mod_lua_hooks
1342};
1343
1344//==========================================================
1345// Simple hashmap for lua configuration.
1346// - keys are null terminated but fixed-size allocated
1347// - key parameters are assumed to be good
1348// - values are lua cache_entry struct pointers
1349//
1350
1351static lua_hash_ele*
1352lua_hash_get_row_head(const lua_hash* h, const char* key)
1353{
1354 uint64_t hashed_key = cf_hash_fnv32((const uint8_t*)key, strlen(key));
1355 uint32_t row_i = (uint32_t)(hashed_key % h->n_rows);
1356
1357 lua_hash_ele* e = (lua_hash_ele*)(h->table + (h->ele_size * row_i));
1358
1359 return e;
1360}
1361
1362lua_hash*
1363lua_hash_create(uint32_t key_size, uint32_t n_rows)
1364{
1365 lua_hash* h = (lua_hash*)cf_malloc(sizeof(lua_hash));
1366
1367 h->ele_size = sizeof(lua_hash_ele) + key_size;
1368 h->n_rows = n_rows;
1369
1370 size_t table_size = n_rows * h->ele_size;
1371
1372 h->table = cf_malloc(table_size);
1373
1374 memset((void*)h->table, 0, table_size);
1375
1376 return h;
1377}
1378
1379cache_entry*
1380lua_hash_remove(lua_hash* h, const char* key)
1381{
1382 lua_hash_ele* e = lua_hash_get_row_head(h, key);
1383
1384 // Nobody in row yet so nothing to delete.
1385 if (e->value == NULL) {
1386 return NULL;
1387 }
1388
1389 lua_hash_ele* e_head = e;
1390 lua_hash_ele* e_last = NULL;
1391
1392 while (e != NULL) {
1393 if (strcmp(e->key, key) == 0) {
1394 cache_entry* ele_to_remove_val = e->value;
1395
1396 // Special cases for removing first element in a row.
1397 if (e == e_head) {
1398 if (e->next) { // move the next element up to the head
1399 lua_hash_ele* e_next = e->next;
1400 e->next = e_next->next;
1401 e->value = e_next->value;
1402 strcpy(e->key, e_next->key);
1403 cf_free(e_next);
1404 }
1405 else { // remove only element in row
1406 e->next = NULL;
1407 e->value = NULL;
1408 e->key[0] = '\0';
1409 }
1410 }
1411 else {
1412 e_last->next = e->next;
1413 cf_free(e);
1414 }
1415
1416 return ele_to_remove_val;
1417 }
1418
1419 e_last = e;
1420 e = e->next;
1421 }
1422
1423 return NULL;
1424}
1425
1426static inline void
1427lua_hash_call_cb_if(void (*cb)(cache_entry*), cache_entry* centry)
1428{
1429 if (cb != NULL && centry != NULL) {
1430 (*cb)(centry);
1431 }
1432}
1433
1434// Wipe out all entries but leave hash itself intact. This function cleans up
1435// the hash itself. The callback may be used to do any additional cleanup on the
1436// hash values.
1437void
1438lua_hash_clear(lua_hash* h, void (*cb)(cache_entry*))
1439{
1440 lua_hash_ele* e_table = (lua_hash_ele*)h->table;
1441
1442 for (uint32_t i = 0; i < h->n_rows; i++) {
1443 lua_hash_call_cb_if(cb, e_table->value);
1444
1445 if (e_table->next != NULL) {
1446 lua_hash_ele* e = e_table->next;
1447
1448 while (e != NULL) {
1449 lua_hash_call_cb_if(cb, e->value);
1450
1451 lua_hash_ele* t = e->next;
1452 cf_free(e);
1453 e = t;
1454 }
1455 }
1456
1457 e_table->next = NULL;
1458 e_table->value = NULL;
1459 e_table->key[0] = '\0';
1460 e_table = (lua_hash_ele*)((uint8_t*)e_table + h->ele_size);
1461 }
1462}
1463
1464// Wipe out entire hash (hash not usable after calling).
1465void
1466lua_hash_destroy(lua_hash* h)
1467{
1468 lua_hash_clear(h, NULL);
1469 cf_free(h->table);
1470 cf_free(h);
1471}
1472
1473// Returns old cache_entry if key had value before NULL otherwise.
1474cache_entry*
1475lua_hash_put(lua_hash* h, const char* key, cache_entry* value)
1476{
1477 lua_hash_ele* e = lua_hash_get_row_head(h, key);
1478
1479 // Nobody in row yet so just set that first element
1480 if (e->value == NULL) {
1481 strcpy(e->key, key);
1482 e->value = value;
1483 return NULL;
1484 }
1485
1486 lua_hash_ele* e_head = e;
1487 cache_entry* overwritten_value = NULL;
1488
1489 while (e) {
1490 if (strcmp(e->key, key) == 0) {
1491 overwritten_value = e->value;
1492 break;
1493 }
1494
1495 e = e->next;
1496 }
1497
1498 if (overwritten_value == NULL) {
1499 e = (lua_hash_ele*)cf_malloc(h->ele_size);
1500 strcpy(e->key, key);
1501 e->next = e_head->next;
1502 e_head->next = e;
1503 }
1504
1505 e->value = value;
1506
1507 return overwritten_value;
1508}
1509
1510// Functions as a "has" if called with a null p_value.
1511bool
1512lua_hash_get(const lua_hash* h, const char* key, cache_entry** p_value)
1513{
1514 lua_hash_ele* e = lua_hash_get_row_head(h, key);
1515
1516 if (e->value == NULL) {
1517 return false;
1518 }
1519
1520 while (e) {
1521 if (strcmp(e->key, key) == 0) {
1522 if (p_value) {
1523 *p_value = e->value;
1524 }
1525
1526 return true;
1527 }
1528
1529 e = e->next;
1530 }
1531
1532 return false;
1533}
1534