1/*
 * udf_cask.c
3 *
4 * Copyright (C) 2012-2014 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
#include "base/udf_cask.h"

#include <dirent.h>
#include <errno.h>
#include <limits.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/stat.h>

#include <openssl/sha.h>

#include "jansson.h"

#include "aerospike/as_module.h"
#include "aerospike/mod_lua.h"
#include "citrusleaf/alloc.h"
#include "citrusleaf/cf_b64.h"
#include "citrusleaf/cf_crypto.h"

#include "dynbuf.h"
#include "fault.h"

#include "base/cfg.h"
#include "base/smd.h"
#include "base/thr_info.h"
50
/*
 * Names of the supported UDF languages, terminated by a NULL entry so the
 * table can be walked without a separate length. Entry 0 is the default
 * type used by udf-put when the client does not send "udf-type".
 * NOTE(review): code below indexes this table with AS_UDF_TYPE_LUA, which is
 * presumably 0 - confirm in udf_cask.h.
 */
char *as_udf_type_name[] = { "LUA", NULL };

// Forward declarations of the file-system helpers defined below.
static int file_read(char *filename, uint8_t **content, size_t *content_len, unsigned char *hash);
static int file_write(char *filename, uint8_t *content, size_t content_len, unsigned char *hash);
static int file_remove(char *filename);
static int file_generation(char *filename, uint8_t *content, size_t content_len, unsigned char *hash);
57
58static inline int file_resolve(char * filepath, char * filename, char * ext) {
59
60 char * p = filepath;
61 char * user_path = g_config.mod_lua.user_path;
62 size_t user_path_len = strlen(user_path);
63 int filename_len = strlen(filename);
64
65 memcpy(p, user_path, sizeof(char) * user_path_len);
66 p += user_path_len;
67
68 memcpy(p, "/", 1);
69 p += 1;
70
71 memcpy(p, filename, filename_len);
72 p += filename_len;
73
74 if ( ext ) {
75 int ext_len = strlen(ext);
76 memcpy(p, ext, ext_len);
77 p += ext_len;
78 }
79
80 p[0] = '\0';
81
82 return 0;
83}
84
85static int file_read(char * filename, uint8_t ** content, size_t * content_len, unsigned char * hash) {
86
87 char filepath[256] = {0};
88 char line[1024] = {0};
89 size_t line_len = sizeof(line);
90
91 file_resolve(filepath, filename, NULL);
92
93 cf_dyn_buf_define(buf);
94
95 FILE *file = fopen(filepath, "r");
96
97 if ( file ) {
98
99 while( fgets(line, line_len, file) != NULL ) {
100 cf_dyn_buf_append_string(&buf, line);
101 }
102
103 fclose(file);
104 file = NULL;
105
106 if ( buf.used_sz > 0 ) {
107
108 char *src = cf_dyn_buf_strdup(&buf);
109
110 file_generation(filepath, (uint8_t *)src, buf.used_sz, hash);
111
112 uint32_t src_len = (uint32_t)buf.used_sz;
113 uint32_t out_size = cf_b64_encoded_len(src_len);
114
115 *content = (uint8_t *)cf_malloc(out_size);
116 *content_len = out_size;
117
118 cf_b64_encode((const uint8_t*)src, src_len, (char*)(*content));
119
120 cf_free(src);
121 src = NULL;
122
123 return 0;
124 }
125
126 *content = NULL;
127 *content_len = 0;
128 return 2;
129 }
130
131 *content = NULL;
132 *content_len = 0;
133 return 1;
134}
135
136static int file_write(char * filename, uint8_t * content, size_t content_len, unsigned char * hash) {
137
138 char filepath[256] = {0};
139
140 file_resolve(filepath, filename, NULL);
141
142 FILE *file = fopen(filepath, "w");
143
144 if (file == NULL) {
145 cf_warning(AS_UDF, "could not open udf put to %s: %s", filepath, cf_strerror(errno));
146 return -1;
147 }
148 int r = fwrite(content, sizeof(char), content_len, file);
149 if (r <= 0) {
150 cf_warning(AS_UDF, "could not write file %s: %d", filepath, r);
151 fclose(file);
152 return -1;
153 }
154
155 fclose(file);
156 file = NULL;
157
158 file_generation(filepath, content, content_len, hash);
159
160 return 0;
161}
162
163static int file_remove(char * filename) {
164 char filepath[256] = {0};
165 file_resolve(filepath, filename, NULL);
166 unlink(filepath);
167 return 0;
168}
169
/*
 * Compute the "generation" of a UDF's content: its SHA-1 digest,
 * base64-encoded into hash and NUL-terminated. hash must have room for
 * cf_b64_encoded_len(20) + 1 bytes. filename is unused. Always returns 0.
 */
static int file_generation(char * filename, uint8_t * content, size_t content_len, unsigned char * hash) {
	enum { SHA1_DIGEST_BYTES = 20 };	// size of a SHA-1 digest

	unsigned char digest[SHA1_DIGEST_BYTES] = {0};

	(void)filename;

	SHA1((const unsigned char *)content, (unsigned long)content_len, digest);
	cf_b64_encode(digest, SHA1_DIGEST_BYTES, (char *)hash);

	// Terminate explicitly so callers can treat hash as a C string.
	hash[cf_b64_encoded_len(SHA1_DIGEST_BYTES)] = '\0';

	return 0;
}
178
179// return -1 if not found otherwise the index in as_udf_type_name
180static int udf_type_getid(char *type) {
181 int index = 0;
182 while (as_udf_type_name[index]) {
183 if (strcmp( type, as_udf_type_name[index]) == 0 ) {
184 return(index);
185 }
186 index++;
187 }
188 return(-1);
189}
190
191/*
192 * Type for user data passed to the get metadata callback.
193 */
194typedef struct udf_get_data_s {
195 cf_dyn_buf *db; // DynBuf for output.
196 bool done; // Has the callback finished?
197} udf_get_data_t;
198
199/*
200 * UDF SMD get metadata items callback.
201 */
202static void udf_cask_get_metadata_cb(const cf_vector *items, void *udata)
203{
204 udf_get_data_t *p_get_data = (udf_get_data_t *) udata;
205 cf_dyn_buf *out = p_get_data->db;
206
207 unsigned char hash[SHA_DIGEST_LENGTH];
208 // hex string to be returned to the client
209 unsigned char sha1_hex_buff[CF_SHA_HEX_BUFF_LEN];
210 // Currently just return directly for LUA
211 uint8_t udf_type = AS_UDF_TYPE_LUA;
212
213 for (uint32_t index = 0; index < cf_vector_size(items); index++) {
214 as_smd_item *item = cf_vector_get_ptr(items, index);
215
216 if (item->value == NULL) {
217 continue;
218 }
219
220 cf_debug(AS_UDF, "UDF metadata item[%d]: key \"%s\" ; value \"%s\" ; generation %u ; timestamp %lu",
221 index, item->key, item->value, item->generation, item->timestamp);
222 cf_dyn_buf_append_string(out, "filename=");
223 cf_dyn_buf_append_buf(out, (uint8_t *)item->key, strlen(item->key));
224 cf_dyn_buf_append_string(out, ",");
225 SHA1((uint8_t *)item->value, strlen(item->value), hash);
226
227 // Convert to a hexadecimal string
228 cf_convert_sha1_to_hex(hash, sha1_hex_buff);
229 cf_dyn_buf_append_string(out, "hash=");
230 cf_dyn_buf_append_buf(out, sha1_hex_buff, CF_SHA_HEX_BUFF_LEN);
231 cf_dyn_buf_append_string(out, ",type=");
232 cf_dyn_buf_append_string(out, as_udf_type_name[udf_type]);
233 cf_dyn_buf_append_string(out, ";");
234 }
235
236 p_get_data->done = true;
237}
238
239/*
240 * Implementation of the "udf-list" Info. Command.
241 */
242int udf_cask_info_list(char *name, cf_dyn_buf *out)
243{
244 cf_debug(AS_UDF, "UDF CASK INFO LIST");
245
246 udf_get_data_t get_data = { .db = out, .done = false };
247
248 as_smd_get_all(AS_SMD_MODULE_UDF, udf_cask_get_metadata_cb, &get_data);
249
250 return 0;
251}
252
253/*
254 * Reading local directory to get specific module item's contents.
255 * In future if needed we can change this to reading from smd metadata.
256 */
257int udf_cask_info_get(char *name, char * params, cf_dyn_buf * out) {
258
259 int resp = 0;
260 char filename[128] = {0};
261 int filename_len = sizeof(filename);
262 uint8_t * content = NULL;
263 size_t content_len = 0;
264 unsigned char content_gen[256] = {0};
265 uint8_t udf_type = AS_UDF_TYPE_LUA;
266
267 cf_debug(AS_INFO, "UDF CASK INFO GET");
268
269 // get (required) script filename
270 if ( as_info_parameter_get(params, "filename", filename, &filename_len) ) {
271 cf_info(AS_INFO, "invalid or missing filename");
272 cf_dyn_buf_append_string(out, "error=invalid_filename");
273 return 0;
274 }
275
276 mod_lua_rdlock(&mod_lua);
277 // read the script from filesystem
278 resp = file_read(filename, &content, &content_len, content_gen);
279 mod_lua_unlock(&mod_lua);
280 if ( resp ) {
281 switch ( resp ) {
282 case 1 : {
283 cf_dyn_buf_append_string(out, "error=not_found");
284 break;
285 }
286 case 2 : {
287 cf_dyn_buf_append_string(out, "error=empty");
288 break;
289 }
290 default : {
291 cf_dyn_buf_append_string(out, "error=unknown_error");
292 break; // complier complains without a break;
293 }
294 }
295 }
296 else {
297 // put back the result
298 cf_dyn_buf_append_string(out, "gen=");
299 cf_dyn_buf_append_string(out, (char *) content_gen);
300 cf_dyn_buf_append_string(out, ";type=");
301 cf_dyn_buf_append_string(out, as_udf_type_name[udf_type]);
302 cf_dyn_buf_append_string(out, ";content=");
303 cf_dyn_buf_append_buf(out, content, content_len);
304 cf_dyn_buf_append_string(out, ";");
305 }
306
307 if ( content ) {
308 cf_free(content);
309 content = NULL;
310 }
311
312 return 0;
313}
314
315// An info put call will call system metadata
316//
317// Data is reflected into json as an object with the following fields
318// which can be added to later if necessary, for example, instead of using
319// the specific data, it could include the URL to the data
320//
321// key - name of the UDF file
322//
323// content64 - base64 encoded data
324// type - language to execute
325// name - reptition of the name, same as the key
326
327int udf_cask_info_put(char *name, char * params, cf_dyn_buf * out) {
328
329 cf_debug(AS_INFO, "UDF CASK INFO PUT");
330
331 int rc = 0;
332 char filename[128] = {0};
333 int filename_len = sizeof(filename);
334 // Content_len from the client and its expected size
335 char content_len[32] = {0};
336 int clen = sizeof(content_len);
337 // Udf content from the client and its expected length
338 char *udf_content = NULL;
339 int udf_content_len = 0;
340 // Udf type from the client and its expected size
341 char type[8] = {0};
342 int type_len = sizeof(type);
343
344 // get (required) script filename
345 char *tmp_char;
346
347 if ( as_info_parameter_get(params, "filename", filename, &filename_len)
348 || !(tmp_char = strchr(filename, '.')) // No extension in filename
349 || tmp_char == filename // '.' at the begining of filename
350 || strlen (tmp_char) <= 1) { // '.' in filename, but no extnsion e.g. "abc."
351 cf_info(AS_INFO, "invalid or missing filename");
352 cf_dyn_buf_append_string(out, "error=invalid_filename");
353 return 0;
354 }
355
356 if ( as_info_parameter_get(params, "content-len", content_len, &(clen)) ) {
357 cf_info(AS_INFO, "invalid or missing content-len");
358 cf_dyn_buf_append_string(out, "error=invalid_content_len");
359 return 0;
360 }
361
362 if ( as_info_parameter_get(params, "udf-type", type, &type_len) ) {
363 // Replace with DEFAULT IS LUA
364 strcpy(type, as_udf_type_name[0]);
365 }
366
367 // check type field
368 if (-1 == udf_type_getid(type)) {
369 cf_info(AS_INFO, "invalid or missing udf-type : %s not valid", type);
370 cf_dyn_buf_append_string(out, "error=invalid_udf_type");
371 return 0;
372 }
373
374 // get b64 encoded script
375 udf_content_len = atoi(content_len) + 1;
376 udf_content = (char *) cf_malloc(udf_content_len);
377
378 // cf_info(AS_UDF, "content_len = %s", content_len);
379 // cf_info(AS_UDF, "udf_content_len = %d", udf_content_len);
380
381
382 // get (required) script content - base64 encoded here.
383 if ( as_info_parameter_get(params, "content", udf_content, &(udf_content_len)) ) {
384 cf_info(AS_UDF, "invalid content");
385 cf_dyn_buf_append_string(out, "error=invalid_content");
386 cf_free(udf_content);
387 return 0;
388 }
389
390 // base 64 decode it
391 uint32_t encoded_len = strlen(udf_content);
392 uint32_t decoded_len = cf_b64_decoded_buf_size(encoded_len) + 1;
393
394 // Don't allow UDF file size > 1MB
395 if ( decoded_len > MAX_UDF_CONTENT_LENGTH) {
396 cf_info(AS_INFO, "lua file size:%d > 1MB", decoded_len);
397 cf_dyn_buf_append_string(out, "error=invalid_udf_content_len, lua file size > 1MB");
398 cf_free(udf_content);
399 return 0;
400 }
401
402 char * decoded_str = cf_malloc(decoded_len);
403
404 if ( ! cf_b64_validate_and_decode(udf_content, encoded_len, (uint8_t*)decoded_str, &decoded_len) ) {
405 cf_info(AS_UDF, "invalid base64 content %s", filename);
406 cf_dyn_buf_append_string(out, "error=invalid_base64_content");
407 cf_free(decoded_str);
408 cf_free(udf_content);
409 return 0;
410 }
411
412 decoded_str[decoded_len] = '\0';
413
414 as_module_error err;
415 rc = as_module_validate(&mod_lua, NULL, filename, decoded_str, decoded_len, &err);
416
417 cf_free(decoded_str);
418 decoded_str = NULL;
419 decoded_len = 0;
420
421 if ( rc ) {
422 cf_warning(AS_UDF, "udf-put: compile error: [%s:%d] %s", err.file, err.line, err.message);
423 cf_dyn_buf_append_string(out, "error=compile_error");
424 cf_dyn_buf_append_string(out, ";file=");
425 cf_dyn_buf_append_string(out, err.file);
426 cf_dyn_buf_append_string(out, ";line=");
427 cf_dyn_buf_append_uint32(out, err.line);
428
429 uint32_t message_len = strlen(err.message);
430 uint32_t enc_message_len = cf_b64_encoded_len(message_len);
431 char enc_message[enc_message_len];
432
433 cf_b64_encode((const uint8_t*)err.message, message_len, enc_message);
434
435 cf_dyn_buf_append_string(out, ";message=");
436 cf_dyn_buf_append_buf(out, (uint8_t *)enc_message, enc_message_len);
437
438 cf_free(udf_content);
439 return 0;
440 }
441
442 // Create an empty JSON object
443 json_t *udf_obj = 0;
444 if (!(udf_obj = json_object())) {
445 cf_warning(AS_UDF, "failed to create JSON array for receiving UDF");
446 cf_free(udf_content);
447 return -1;
448 }
449 int e = 0;
450 e += json_object_set_new(udf_obj, "content64", json_string(udf_content));
451 e += json_object_set_new(udf_obj, "type", json_string(type));
452 e += json_object_set_new(udf_obj, "name", json_string(filename));
453
454 cf_free(udf_content);
455
456 if (e) {
457 cf_warning(AS_UDF, "could not encode UDF object, error %d", e);
458 json_decref(udf_obj);
459 return(-1);
460 }
461 // make it into a string, yet another buffer copy
462 char *udf_obj_str = json_dumps(udf_obj, 0/*flags*/);
463 json_decref(udf_obj);
464 udf_obj = 0;
465
466 cf_debug(AS_UDF, "created json object %s", udf_obj_str);
467
468 // how do I know whether to call create or add?
469 if (as_smd_set_blocking(AS_SMD_MODULE_UDF, filename, udf_obj_str, 0)) {
470 cf_info(AS_UDF, "UDF module '%s' (%s/%s) registered", filename, g_config.mod_lua.user_path, filename);
471 }
472 else {
473 cf_warning(AS_UDF, "UDF module '%s' (%s/%s) timeout", filename, g_config.mod_lua.user_path, filename);
474 cf_dyn_buf_append_string(out, "error=timeout");
475 }
476
477 // free the metadata
478 cf_free(udf_obj_str);
479 udf_obj_str = 0;
480
481 return 0;
482}
483
484int udf_cask_info_remove(char *name, char * params, cf_dyn_buf * out) {
485
486 char filename[128] = {0};
487 int filename_len = sizeof(filename);
488 char file_path[1024] = {0};
489
490 cf_debug(AS_INFO, "UDF CASK INFO REMOVE");
491
492 // get (required) script filename
493 if ( as_info_parameter_get(params, "filename", filename, &filename_len) ) {
494 cf_info(AS_UDF, "invalid or missing filename");
495 cf_dyn_buf_append_string(out, "error=invalid_filename");
496 }
497
498 // now check if such a file-name exists :
499
500 snprintf(file_path, 1024, "%s/%s", g_config.mod_lua.user_path, filename);
501
502 cf_debug(AS_INFO, " Lua file removal full-path is : %s \n", file_path);
503
504 if (! as_smd_delete_blocking(AS_SMD_MODULE_UDF, filename, 0)) {
505 cf_warning(AS_UDF, "UDF module '%s' (%s) remove timeout", filename, file_path);
506 cf_dyn_buf_append_string(out, "error=timeout");
507 return -1;
508 }
509
510 cf_info(AS_UDF, "UDF module '%s' (%s) removed", filename, file_path);
511 cf_dyn_buf_append_string(out, "ok");
512
513 return 0;
514}
515
516/*
517 * Clear out the Lua cache.
518 */
519int udf_cask_info_clear_cache(char *name, char * params, cf_dyn_buf * out)
520{
521 cf_debug(AS_INFO, "UDF CASK INFO CLEAR CACHE");
522
523 mod_lua_wrlock(&mod_lua);
524
525 as_module_event e = {
526 .type = AS_MODULE_EVENT_CLEAR_CACHE
527 };
528 as_module_update(&mod_lua, &e);
529
530 mod_lua_unlock(&mod_lua);
531
532 cf_dyn_buf_append_string(out, "ok");
533
534 return 0;
535}
536
537/**
538 * (Re-)Configure UDF modules
539 */
540int udf_cask_info_configure(char *name, char * params, cf_dyn_buf * buf) {
541 as_module_configure(&mod_lua, &g_config.mod_lua);
542 return 0;
543}
544
545// This function must take the current "view of the world" and
546// make the local store the same as that.
547
548void
549udf_cask_smd_accept_fn(const cf_vector *items, as_smd_accept_type accept_type)
550{
551 cf_debug(AS_UDF, "UDF CASK accept fn : n items %u", cf_vector_size(items));
552
553 // For each item in the list, see if the current version
554 // is different from the curretly stored version
555 // and if the new item is new, write to the storage directory
556 for (uint32_t i = 0; i < cf_vector_size(items); i++) {
557 as_smd_item *item = cf_vector_get_ptr(items, i);
558
559 if (item->value != NULL) {
560 json_error_t json_error;
561 json_t *item_obj = json_loads(item->value, 0 /*flags*/, &json_error);
562
563 if (!item_obj) {
564 cf_warning(AS_UDF, "failed to parse UDF \"%s\" with JSON error: %s ; source: %s ; line: %d ; column: %d ; position: %d",
565 item->key, json_error.text, json_error.source, json_error.line, json_error.column, json_error.position);
566 continue;
567 }
568
569 /*item->key is name */
570 json_t *content64_obj = json_object_get(item_obj, "content64");
571 const char *content64_str = json_string_value(content64_obj);
572
573 // base 64 decode it
574 uint32_t encoded_len = strlen(content64_str);
575 uint32_t decoded_len = cf_b64_decoded_buf_size(encoded_len) + 1;
576 char *content_str = cf_malloc(decoded_len);
577
578 if (! cf_b64_validate_and_decode(content64_str, encoded_len, (uint8_t*)content_str, &decoded_len)) {
579 cf_info(AS_UDF, "invalid script on accept, will not register %s", item->key);
580 cf_free(content_str);
581 json_decref(item_obj);
582 continue;
583 }
584
585 content_str[decoded_len] = 0;
586
587 cf_debug(AS_UDF, "pushing to %s, %d bytes [%s]", item->key, decoded_len, content_str);
588 mod_lua_wrlock(&mod_lua);
589
590 // content_gen is actually a hash. Not sure if it's filled out or what.
591 unsigned char content_gen[256] = {0};
592 int e = file_write(item->key, (uint8_t *) content_str, decoded_len, content_gen);
593 cf_free(content_str);
594 json_decref(item_obj);
595 if ( e ) {
596 mod_lua_unlock(&mod_lua);
597 cf_info(AS_UDF, "invalid script on accept, will not register %s", item->key);
598 continue;
599 }
600 // Update the cache
601 as_module_event ame = {
602 .type = AS_MODULE_EVENT_FILE_ADD,
603 .data.filename = item->key
604 };
605 as_module_update(&mod_lua, &ame);
606 mod_lua_unlock(&mod_lua);
607 }
608 else {
609 cf_debug(AS_UDF, "received DELETE SMD key %s", item->key);
610
611 mod_lua_wrlock(&mod_lua);
612 file_remove(item->key);
613
614 // fixes potential cache issues
615 as_module_event e = {
616 .type = AS_MODULE_EVENT_FILE_REMOVE,
617 .data.filename = item->key
618 };
619 as_module_update(&mod_lua, &e);
620
621 mod_lua_unlock(&mod_lua);
622 }
623 }
624}
625
626
627void
628udf_cask_init()
629{
630 // Have to delete the existing files in the user path on startup
631 struct dirent * entry = NULL;
632 // opendir(NULL) seg-faults
633 if (!g_config.mod_lua.user_path)
634 {
635 cf_crash(AS_UDF, "cask init: null mod-lua user-path");
636 }
637
638 DIR *dir = opendir(g_config.mod_lua.user_path);
639 if ( dir == 0 ) {
640 cf_crash(AS_UDF, "cask init: could not open udf directory %s: %s", g_config.mod_lua.user_path, cf_strerror(errno));
641 }
642 while ( (entry = readdir(dir))) {
643 // readdir also reads "." and ".." entries.
644 if (strcmp(entry->d_name, ".") && strcmp(entry->d_name, ".."))
645 {
646 char fn[1024];
647 snprintf(fn, sizeof(fn), "%s/%s", g_config.mod_lua.user_path, entry->d_name);
648 int rem_rv = remove(fn);
649 if (rem_rv != 0) {
650 cf_warning(AS_UDF, "Failed to remove the file %s. Error %d", fn, errno);
651 }
652 }
653 }
654 closedir(dir);
655
656 as_smd_module_load(AS_SMD_MODULE_UDF, udf_cask_smd_accept_fn, NULL, NULL);
657}
658