1/*
2 * udf.c
3 *
4 * Copyright (C) 2016 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23//==========================================================
24// Includes.
25//
26
27#include "transaction/udf.h"
28
29#include <stdbool.h>
30#include <stddef.h>
31#include <stdint.h>
32
33#include "aerospike/as_aerospike.h"
34#include "aerospike/as_atomic.h"
35#include "aerospike/as_buffer.h"
36#include "aerospike/as_log.h"
37#include "aerospike/as_list.h"
38#include "aerospike/as_module.h"
39#include "aerospike/as_msgpack.h"
40#include "aerospike/as_serializer.h"
41#include "aerospike/as_types.h"
42#include "aerospike/as_udf_context.h"
43#include "aerospike/mod_lua.h"
44
45#include "citrusleaf/alloc.h"
46#include "citrusleaf/cf_atomic.h"
47#include "citrusleaf/cf_clock.h"
48
49#include "cf_mutex.h"
50#include "dynbuf.h"
51#include "fault.h"
52
53#include "base/cfg.h"
54#include "base/datamodel.h"
55#include "base/predexp.h"
56#include "base/proto.h"
57#include "base/secondary_index.h"
58#include "base/transaction.h"
59#include "base/transaction_policy.h"
60#include "base/udf_aerospike.h"
61#include "base/udf_arglist.h"
62#include "base/udf_cask.h"
63#include "base/udf_record.h"
64#include "fabric/exchange.h" // TODO - old pickle - remove in "six months"
65#include "fabric/partition.h"
66#include "storage/storage.h"
67#include "transaction/duplicate_resolve.h"
68#include "transaction/proxy.h"
69#include "transaction/replica_write.h"
70#include "transaction/rw_request.h"
71#include "transaction/rw_request_hash.h"
72#include "transaction/rw_utils.h"
73
74
75//==========================================================
76// Typedefs & constants.
77//
78
79static const cf_fault_severity as_log_level_map[5] = {
80 [AS_LOG_LEVEL_ERROR] = CF_WARNING,
81 [AS_LOG_LEVEL_WARN] = CF_WARNING,
82 [AS_LOG_LEVEL_INFO] = CF_INFO,
83 [AS_LOG_LEVEL_DEBUG] = CF_DEBUG,
84 [AS_LOG_LEVEL_TRACE] = CF_DETAIL
85};
86
87typedef struct udf_call_s {
88 udf_def* def;
89 as_transaction* tr;
90} udf_call;
91
92
93//==========================================================
94// Globals.
95//
96
97as_aerospike g_as_aerospike;
98
99// Deadline per UDF.
100static __thread uint64_t g_end_ns;
101
102
103//==========================================================
104// Forward declarations.
105//
106
107bool log_callback(as_log_level level, const char* func, const char* file,
108 uint32_t line, const char* fmt, ...);
109
110void start_udf_dup_res(rw_request* rw, as_transaction* tr);
111void start_udf_repl_write(rw_request* rw, as_transaction* tr);
112void start_udf_repl_write_forget(rw_request* rw, as_transaction* tr);
113bool udf_dup_res_cb(rw_request* rw);
114void udf_repl_write_after_dup_res(rw_request* rw, as_transaction* tr);
115void udf_repl_write_forget_after_dup_res(rw_request* rw, as_transaction* tr);
116void udf_repl_write_cb(rw_request* rw);
117
118void send_udf_response(as_transaction* tr, cf_dyn_buf* db);
119void udf_timeout_cb(rw_request* rw);
120
121transaction_status udf_master(rw_request* rw, as_transaction* tr);
122udf_optype udf_master_apply(udf_call* call, rw_request* rw);
123int udf_apply_record(udf_call* call, as_rec* rec, as_result* result);
124void udf_finish(udf_record* urecord, rw_request* rw, udf_optype* record_op);
125udf_optype udf_finish_op(udf_record* urecord);
126void udf_post_processing(udf_record* urecord, rw_request* rw,
127 udf_optype urecord_op);
128bool udf_timer_timedout(const as_timer* timer);
129uint64_t udf_timer_timeslice(const as_timer* timer);
130
131void update_lua_complete_stats(uint8_t origin, as_namespace* ns, udf_optype op,
132 int ret, bool is_success);
133
134void process_failure_str(udf_call* call, const char* err_str, size_t len,
135 cf_dyn_buf* db);
136void process_result(const as_result* result, udf_call* call, cf_dyn_buf* db);
137void process_response(udf_call* call, bool success, const as_val* val,
138 cf_dyn_buf* db);
139
140
141//==========================================================
142// Inlines & macros.
143//
144
145static inline void
146client_udf_update_stats(as_namespace* ns, uint8_t result_code)
147{
148 switch (result_code) {
149 case AS_OK:
150 cf_atomic64_incr(&ns->n_client_udf_complete);
151 break;
152 default:
153 cf_atomic64_incr(&ns->n_client_udf_error);
154 break;
155 case AS_ERR_TIMEOUT:
156 cf_atomic64_incr(&ns->n_client_udf_timeout);
157 break;
158 case AS_ERR_FILTERED_OUT:
159 cf_atomic64_incr(&ns->n_client_udf_filtered_out);
160 break;
161 }
162}
163
164static inline void
165from_proxy_udf_update_stats(as_namespace* ns, uint8_t result_code)
166{
167 switch (result_code) {
168 case AS_OK:
169 cf_atomic64_incr(&ns->n_from_proxy_udf_complete);
170 break;
171 default:
172 cf_atomic64_incr(&ns->n_from_proxy_udf_error);
173 break;
174 case AS_ERR_TIMEOUT:
175 cf_atomic64_incr(&ns->n_from_proxy_udf_timeout);
176 break;
177 case AS_ERR_FILTERED_OUT:
178 cf_atomic64_incr(&ns->n_from_proxy_udf_filtered_out);
179 break;
180 }
181}
182
183static inline void
184udf_sub_udf_update_stats(as_namespace* ns, uint8_t result_code)
185{
186 switch (result_code) {
187 case AS_OK:
188 cf_atomic64_incr(&ns->n_udf_sub_udf_complete);
189 break;
190 default:
191 cf_atomic64_incr(&ns->n_udf_sub_udf_error);
192 break;
193 case AS_ERR_TIMEOUT:
194 cf_atomic64_incr(&ns->n_udf_sub_udf_timeout);
195 break;
196 case AS_ERR_FILTERED_OUT: // doesn't include those filtered out by metadata
197 as_incr_uint64(&ns->n_udf_sub_udf_filtered_out);
198 break;
199 }
200}
201
202static inline bool
203udf_zero_bins_left(udf_record* urecord)
204{
205 return (urecord->flag & UDF_RECORD_FLAG_OPEN) != 0 &&
206 ! as_bin_inuse_has(urecord->rd);
207}
208
209static inline void
210process_failure(udf_call* call, const as_val* val, cf_dyn_buf* db)
211{
212 process_response(call, false, val, db);
213}
214
215static inline void
216process_success(udf_call* call, const as_val* val, cf_dyn_buf* db)
217{
218 process_response(call, true, val, db);
219}
220
221
222//==========================================================
223// Public API.
224//
225
226void
227as_udf_init()
228{
229 as_module_configure(&mod_lua, &g_config.mod_lua);
230 as_log_set_callback(log_callback);
231 udf_cask_init();
232 as_aerospike_init(&g_as_aerospike, NULL, &udf_aerospike_hooks);
233}
234
235
236// Public API for udf_def class, not big enough for it's own file.
237udf_def*
238udf_def_init_from_msg(udf_def* def, const as_transaction* tr)
239{
240 def->arglist = NULL;
241
242 as_msg* m = &tr->msgp->msg;
243 as_msg_field* filename =
244 as_msg_field_get(m, AS_MSG_FIELD_TYPE_UDF_FILENAME);
245
246 if (! filename) {
247 return NULL;
248 }
249
250 as_msg_field* function =
251 as_msg_field_get(m, AS_MSG_FIELD_TYPE_UDF_FUNCTION);
252
253 if (! function) {
254 return NULL;
255 }
256
257 as_msg_field* arglist = as_msg_field_get(m, AS_MSG_FIELD_TYPE_UDF_ARGLIST);
258
259 if (! arglist) {
260 return NULL;
261 }
262
263 uint32_t filename_len = as_msg_field_get_value_sz(filename);
264
265 if (filename_len >= sizeof(def->filename)) {
266 return NULL;
267 }
268
269 uint32_t function_len = as_msg_field_get_value_sz(function);
270
271 if (function_len >= sizeof(def->function)) {
272 return NULL;
273 }
274
275 memcpy(def->filename, filename->data, filename_len);
276 def->filename[filename_len] = '\0';
277
278 memcpy(def->function, function->data, function_len);
279 def->function[function_len] = '\0';
280
281 as_unpacker unpacker;
282
283 unpacker.buffer = (const unsigned char*)arglist->data;
284 unpacker.length = as_msg_field_get_value_sz(arglist);
285 unpacker.offset = 0;
286
287 if (unpacker.length > 0) {
288 as_val* val = NULL;
289 int ret = as_unpack_val(&unpacker, &val);
290
291 if (ret == 0 && as_val_type(val) == AS_LIST) {
292 def->arglist = (as_list*)val;
293 }
294 }
295
296 as_msg_field* op = as_transaction_has_udf_op(tr) ?
297 as_msg_field_get(m, AS_MSG_FIELD_TYPE_UDF_OP) : NULL;
298
299 def->type = op ? *op->data : AS_UDF_OP_KVS;
300
301 return def;
302}
303
304
305transaction_status
306as_udf_start(as_transaction* tr)
307{
308 BENCHMARK_START(tr, udf, FROM_CLIENT);
309 BENCHMARK_START(tr, udf_sub, FROM_IUDF);
310
311 // Apply XDR filter.
312 if (! xdr_allows_write(tr)) {
313 tr->result_code = AS_ERR_ALWAYS_FORBIDDEN;
314 send_udf_response(tr, NULL);
315 return TRANS_DONE_ERROR;
316 }
317
318 // Don't know if UDF is read or delete - check that we aren't backed up.
319 if (as_storage_overloaded(tr->rsv.ns)) {
320 tr->result_code = AS_ERR_DEVICE_OVERLOAD;
321 send_udf_response(tr, NULL);
322 return TRANS_DONE_ERROR;
323 }
324
325 // Create rw_request and add to hash.
326 rw_request_hkey hkey = { tr->rsv.ns->id, tr->keyd };
327 rw_request* rw = rw_request_create(&tr->keyd);
328 transaction_status status = rw_request_hash_insert(&hkey, rw, tr);
329
330 // If rw_request wasn't inserted in hash, transaction is finished.
331 if (status != TRANS_IN_PROGRESS) {
332 rw_request_release(rw);
333
334 if (status != TRANS_WAITING) {
335 send_udf_response(tr, NULL);
336 }
337
338 return status;
339 }
340 // else - rw_request is now in hash, continue...
341
342 if (tr->rsv.ns->write_dup_res_disabled) {
343 // Note - preventing duplicate resolution this way allows
344 // rw_request_destroy() to handle dup_msg[] cleanup correctly.
345 tr->rsv.n_dupl = 0;
346 }
347
348 // If there are duplicates to resolve, start doing so.
349 if (tr->rsv.n_dupl != 0) {
350 start_udf_dup_res(rw, tr);
351
352 // Started duplicate resolution.
353 return TRANS_IN_PROGRESS;
354 }
355 // else - no duplicate resolution phase, apply operation to master.
356
357 // Set up the nodes to which we'll write replicas.
358 rw->n_dest_nodes = as_partition_get_other_replicas(tr->rsv.p,
359 rw->dest_nodes);
360
361 if (insufficient_replica_destinations(tr->rsv.ns, rw->n_dest_nodes)) {
362 rw_request_hash_delete(&hkey, rw);
363 tr->result_code = AS_ERR_UNAVAILABLE;
364 send_udf_response(tr, NULL);
365 return TRANS_DONE_ERROR;
366 }
367
368 status = udf_master(rw, tr);
369
370 BENCHMARK_NEXT_DATA_POINT_FROM(tr, udf, FROM_CLIENT, master);
371 BENCHMARK_NEXT_DATA_POINT_FROM(tr, udf_sub, FROM_IUDF, master);
372
373 // If error or UDF was a read, transaction is finished.
374 if (status != TRANS_IN_PROGRESS) {
375 if (status != TRANS_WAITING) {
376 send_udf_response(tr, &rw->response_db);
377 }
378
379 rw_request_hash_delete(&hkey, rw);
380 return status;
381 }
382
383 // If we don't need replica writes, transaction is finished.
384 if (rw->n_dest_nodes == 0) {
385 finished_replicated(tr);
386 send_udf_response(tr, &rw->response_db);
387 rw_request_hash_delete(&hkey, rw);
388 return TRANS_DONE_SUCCESS;
389 }
390
391 // If we don't need to wait for replica write acks, fire and forget.
392 if (respond_on_master_complete(tr)) {
393 start_udf_repl_write_forget(rw, tr);
394 send_udf_response(tr, &rw->response_db);
395 rw_request_hash_delete(&hkey, rw);
396 return TRANS_DONE_SUCCESS;
397 }
398
399 start_udf_repl_write(rw, tr);
400
401 // Started replica write.
402 return TRANS_IN_PROGRESS;
403}
404
405
406//==========================================================
407// Local helpers - initialization.
408//
409
410bool
411log_callback(as_log_level level, const char* func, const char* file,
412 uint32_t line, const char* fmt, ...)
413{
414 cf_fault_severity severity = as_log_level_map[level];
415
416 if (severity > cf_fault_filter[AS_UDF]) {
417 return true;
418 }
419
420 va_list ap;
421
422 va_start(ap, fmt);
423 char message[1024] = { '\0' };
424 vsnprintf(message, 1024, fmt, ap);
425 va_end(ap);
426
427 cf_fault_event(AS_UDF, severity, file, line, "%s", message);
428
429 return true;
430}
431
432
433//==========================================================
434// Local helpers - transaction flow.
435//
436
437void
438start_udf_dup_res(rw_request* rw, as_transaction* tr)
439{
440 // Finish initializing rw, construct and send dup-res message.
441
442 dup_res_make_message(rw, tr);
443
444 cf_mutex_lock(&rw->lock);
445
446 dup_res_setup_rw(rw, tr, udf_dup_res_cb, udf_timeout_cb);
447 send_rw_messages(rw);
448
449 cf_mutex_unlock(&rw->lock);
450}
451
452
453void
454start_udf_repl_write(rw_request* rw, as_transaction* tr)
455{
456 // Finish initializing rw, construct and send repl-write message.
457
458 repl_write_make_message(rw, tr);
459
460 cf_mutex_lock(&rw->lock);
461
462 repl_write_setup_rw(rw, tr, udf_repl_write_cb, udf_timeout_cb);
463 send_rw_messages(rw);
464
465 cf_mutex_unlock(&rw->lock);
466}
467
468
469void
470start_udf_repl_write_forget(rw_request* rw, as_transaction* tr)
471{
472 // Construct and send repl-write message. No need to finish rw setup.
473
474 repl_write_make_message(rw, tr);
475 send_rw_messages_forget(rw);
476}
477
478
479bool
480udf_dup_res_cb(rw_request* rw)
481{
482 BENCHMARK_NEXT_DATA_POINT_FROM(rw, udf, FROM_CLIENT, dup_res);
483 BENCHMARK_NEXT_DATA_POINT_FROM(rw, udf_sub, FROM_IUDF, dup_res);
484
485 as_transaction tr;
486 as_transaction_init_from_rw(&tr, rw);
487
488 if (tr.result_code != AS_OK) {
489 send_udf_response(&tr, NULL);
490 return true;
491 }
492
493 // Set up the nodes to which we'll write replicas.
494 rw->n_dest_nodes = as_partition_get_other_replicas(tr.rsv.p,
495 rw->dest_nodes);
496
497 if (insufficient_replica_destinations(tr.rsv.ns, rw->n_dest_nodes)) {
498 tr.result_code = AS_ERR_UNAVAILABLE;
499 send_udf_response(&tr, NULL);
500 return true;
501 }
502
503 transaction_status status = udf_master(rw, &tr);
504
505 BENCHMARK_NEXT_DATA_POINT_FROM((&tr), udf, FROM_CLIENT, master);
506 BENCHMARK_NEXT_DATA_POINT_FROM((&tr), udf_sub, FROM_IUDF, master);
507
508 if (status == TRANS_WAITING) {
509 // Note - new tr now owns msgp, make sure rw destructor doesn't free it.
510 // Also, rw will release rsv - new tr will get a new one.
511 rw->msgp = NULL;
512 return true;
513 }
514
515 if (status != TRANS_IN_PROGRESS) {
516 send_udf_response(&tr, &rw->response_db);
517 return true;
518 }
519
520 // If we don't need replica writes, transaction is finished.
521 if (rw->n_dest_nodes == 0) {
522 finished_replicated(&tr);
523 send_udf_response(&tr, &rw->response_db);
524 return true;
525 }
526
527 // If we don't need to wait for replica write acks, fire and forget.
528 if (respond_on_master_complete(&tr)) {
529 udf_repl_write_forget_after_dup_res(rw, &tr);
530 send_udf_response(&tr, &rw->response_db);
531 return true;
532 }
533
534 udf_repl_write_after_dup_res(rw, &tr);
535
536 // Started replica write - don't delete rw_request from hash.
537 return false;
538}
539
540
541void
542udf_repl_write_after_dup_res(rw_request* rw, as_transaction* tr)
543{
544 // Recycle rw_request that was just used for duplicate resolution to now do
545 // replica writes. Note - we are under the rw_request lock here!
546
547 repl_write_make_message(rw, tr);
548 repl_write_reset_rw(rw, tr, udf_repl_write_cb);
549 send_rw_messages(rw);
550}
551
552
553void
554udf_repl_write_forget_after_dup_res(rw_request* rw, as_transaction* tr)
555{
556 // Send replica writes. Not waiting for acks, so need to reset rw_request.
557 // Note - we are under the rw_request lock here!
558
559 repl_write_make_message(rw, tr);
560 send_rw_messages_forget(rw);
561}
562
563
564void
565udf_repl_write_cb(rw_request* rw)
566{
567 BENCHMARK_NEXT_DATA_POINT_FROM(rw, udf, FROM_CLIENT, repl_write);
568 BENCHMARK_NEXT_DATA_POINT_FROM(rw, udf_sub, FROM_IUDF, repl_write);
569
570 as_transaction tr;
571 as_transaction_init_from_rw(&tr, rw);
572
573 finished_replicated(&tr);
574 send_udf_response(&tr, &rw->response_db);
575
576 // Finished transaction - rw_request cleans up reservation and msgp!
577}
578
579
580//==========================================================
581// Local helpers - transaction end.
582//
583
584void
585send_udf_response(as_transaction* tr, cf_dyn_buf* db)
586{
587 // Paranoia - shouldn't get here on losing race with timeout.
588 if (! tr->from.any) {
589 cf_warning(AS_RW, "transaction origin %u has null 'from'", tr->origin);
590 return;
591 }
592
593 // Note - if tr was setup from rw, rw->from.any has been set null and
594 // informs timeout it lost the race.
595
596 clear_delete_response_metadata(tr);
597
598 switch (tr->origin) {
599 case FROM_CLIENT:
600 if (db && db->used_sz != 0) {
601 as_msg_send_ops_reply(tr->from.proto_fd_h, db);
602 }
603 else {
604 as_msg_send_reply(tr->from.proto_fd_h, tr->result_code,
605 tr->generation, tr->void_time, NULL, NULL, 0, tr->rsv.ns,
606 as_transaction_trid(tr));
607 }
608 BENCHMARK_NEXT_DATA_POINT(tr, udf, response);
609 HIST_TRACK_ACTIVATE_INSERT_DATA_POINT(tr, udf_hist);
610 client_udf_update_stats(tr->rsv.ns, tr->result_code);
611 break;
612 case FROM_PROXY:
613 if (db && db->used_sz != 0) {
614 as_proxy_send_ops_response(tr->from.proxy_node,
615 tr->from_data.proxy_tid, db);
616 }
617 else {
618 as_proxy_send_response(tr->from.proxy_node, tr->from_data.proxy_tid,
619 tr->result_code, tr->generation, tr->void_time, NULL, NULL,
620 0, tr->rsv.ns, as_transaction_trid(tr));
621 }
622 from_proxy_udf_update_stats(tr->rsv.ns, tr->result_code);
623 break;
624 case FROM_IUDF:
625 if (db && db->used_sz != 0) {
626 cf_crash(AS_RW, "unexpected - internal udf has response");
627 }
628 tr->from.iudf_orig->cb(tr->from.iudf_orig->udata, tr->result_code);
629 BENCHMARK_NEXT_DATA_POINT(tr, udf_sub, response);
630 udf_sub_udf_update_stats(tr->rsv.ns, tr->result_code);
631 break;
632 default:
633 cf_crash(AS_RW, "unexpected transaction origin %u", tr->origin);
634 break;
635 }
636
637 tr->from.any = NULL; // pattern, not needed
638}
639
640
641void
642udf_timeout_cb(rw_request* rw)
643{
644 if (! rw->from.any) {
645 return; // lost race against dup-res or repl-write callback
646 }
647
648 finished_not_replicated(rw);
649
650 switch (rw->origin) {
651 case FROM_CLIENT:
652 as_msg_send_reply(rw->from.proto_fd_h, AS_ERR_TIMEOUT, 0, 0, NULL, NULL,
653 0, rw->rsv.ns, rw_request_trid(rw));
654 // Timeouts aren't included in histograms.
655 client_udf_update_stats(rw->rsv.ns, AS_ERR_TIMEOUT);
656 break;
657 case FROM_PROXY:
658 from_proxy_udf_update_stats(rw->rsv.ns, AS_ERR_TIMEOUT);
659 break;
660 case FROM_IUDF:
661 rw->from.iudf_orig->cb(rw->from.iudf_orig->udata, AS_ERR_TIMEOUT);
662 // Timeouts aren't included in histograms.
663 udf_sub_udf_update_stats(rw->rsv.ns, AS_ERR_TIMEOUT);
664 break;
665 default:
666 cf_crash(AS_RW, "unexpected transaction origin %u", rw->origin);
667 break;
668 }
669
670 rw->from.any = NULL; // inform other callback it lost the race
671}
672
673
674//==========================================================
675// Local helpers - UDF.
676//
677
678transaction_status
679udf_master(rw_request* rw, as_transaction* tr)
680{
681 CF_ALLOC_SET_NS_ARENA(tr->rsv.ns);
682
683 udf_def def;
684 udf_call call = { &def, tr };
685
686 if (tr->origin == FROM_IUDF) {
687 call.def = &tr->from.iudf_orig->def;
688 }
689 else if (! udf_def_init_from_msg(call.def, tr)) {
690 cf_warning(AS_UDF, "failed udf_def_init_from_msg");
691 tr->result_code = AS_ERR_PARAMETER;
692 return TRANS_DONE_ERROR;
693 }
694
695 udf_optype optype = udf_master_apply(&call, rw);
696
697 if (tr->origin != FROM_IUDF && call.def->arglist) {
698 as_list_destroy(call.def->arglist);
699 }
700
701 if (optype == UDF_OPTYPE_READ || optype == UDF_OPTYPE_NONE) {
702 // UDF is done, no replica writes needed.
703 return TRANS_DONE_SUCCESS;
704 }
705
706 return optype == UDF_OPTYPE_WAITING ? TRANS_WAITING : TRANS_IN_PROGRESS;
707}
708
709
710udf_optype
711udf_master_apply(udf_call* call, rw_request* rw)
712{
713 as_transaction* tr = call->tr;
714 as_namespace* ns = tr->rsv.ns;
715
716 // Find record in index.
717
718 as_index_ref r_ref;
719 int get_rv = as_record_get(tr->rsv.tree, &tr->keyd, &r_ref);
720
721 if (get_rv == 0 && as_record_is_doomed(r_ref.r, ns)) {
722 // If record is expired or truncated, pretend it was not found.
723 as_record_done(&r_ref, ns);
724 get_rv = -1;
725 }
726
727 if (get_rv == 0 && repl_state_check(r_ref.r, tr) < 0) {
728 as_record_done(&r_ref, ns);
729 return UDF_OPTYPE_WAITING;
730 }
731
732 if (tr->origin == FROM_IUDF &&
733 (get_rv == -1 || ! as_record_is_live(r_ref.r))) {
734 // Internal UDFs must not create records.
735 tr->result_code = AS_ERR_NOT_FOUND;
736 process_failure(call, NULL, &rw->response_db);
737 return UDF_OPTYPE_NONE;
738 }
739
740 // Apply predexp metadata filter if present & not internal UDF.
741
742 int rv;
743 predexp_eval_t* predexp = NULL;
744
745 if (tr->origin != FROM_IUDF && get_rv == 0 && as_record_is_live(r_ref.r) &&
746 (rv = build_predexp_and_filter_meta(tr, r_ref.r, &predexp)) != 0) {
747 tr->result_code = rv;
748 process_failure(call, NULL, &rw->response_db);
749 return UDF_OPTYPE_NONE;
750 }
751
752 // Open storage record.
753
754 as_storage_rd rd;
755
756 udf_record urecord;
757 udf_record_init(&urecord, true);
758
759 xdr_dirty_bins dirty_bins;
760 xdr_clear_dirty_bins(&dirty_bins);
761
762 urecord.r_ref = &r_ref;
763 urecord.tr = tr;
764 urecord.rd = &rd;
765 urecord.dirty = &dirty_bins;
766 urecord.keyd = tr->keyd;
767
768 if (get_rv == 0) {
769 urecord.flag |= (UDF_RECORD_FLAG_OPEN | UDF_RECORD_FLAG_PREEXISTS);
770
771 if (udf_storage_record_open(&urecord) != 0) {
772 predexp_destroy(predexp);
773 udf_record_close(&urecord);
774 tr->result_code = AS_ERR_BIN_NAME; // overloaded... add bin_count error?
775 process_failure(call, NULL, &rw->response_db);
776 return UDF_OPTYPE_NONE;
777 }
778
779 if (predexp != NULL || (tr->origin == FROM_IUDF &&
780 tr->from.iudf_orig->predexp != NULL)) {
781 predexp_args_t predargs = { .ns = ns, .md = r_ref.r, .rd = &rd };
782
783 if (! predexp_matches_record(tr->origin == FROM_IUDF ?
784 tr->from.iudf_orig->predexp : predexp, &predargs)) {
785 predexp_destroy(predexp);
786 udf_record_close(&urecord);
787 tr->result_code = AS_ERR_FILTERED_OUT;
788 process_failure(call, NULL, &rw->response_db);
789 return UDF_OPTYPE_NONE;
790 }
791
792 predexp_destroy(predexp);
793 }
794
795 as_msg* m = &tr->msgp->msg;
796
797 // If both the record and the message have keys, check them.
798 if (rd.key) {
799 if (as_transaction_has_key(tr) && ! check_msg_key(m, &rd)) {
800 udf_record_close(&urecord);
801 tr->result_code = AS_ERR_KEY_MISMATCH;
802 process_failure(call, NULL, &rw->response_db);
803 return UDF_OPTYPE_NONE;
804 }
805 }
806 else {
807 // If the message has a key, apply it to the record.
808 if (! get_msg_key(tr, &rd)) {
809 udf_record_close(&urecord);
810 tr->result_code = AS_ERR_UNSUPPORTED_FEATURE;
811 process_failure(call, NULL, &rw->response_db);
812 return UDF_OPTYPE_NONE;
813 }
814
815 urecord.flag |= UDF_RECORD_FLAG_METADATA_UPDATED;
816 }
817 }
818 else {
819 urecord.flag &= ~(UDF_RECORD_FLAG_OPEN |
820 UDF_RECORD_FLAG_STORAGE_OPEN |
821 UDF_RECORD_FLAG_PREEXISTS);
822 }
823
824 // Run UDF.
825
826 // This as_rec needs to be in the heap - once passed into the lua scope it
827 // gets garbage collected later. Also, the destroy hook is set to NULL so
828 // garbage collection has nothing to do.
829 as_rec* urec = as_rec_new(&urecord, &udf_record_hooks);
830
831 as_val_reserve(urec); // for lua
832
833 as_result result;
834 as_result_init(&result);
835
836 int apply_rv = udf_apply_record(call, urec, &result);
837
838 udf_optype optype = UDF_OPTYPE_NONE;
839
840 if (apply_rv == 0) {
841 udf_finish(&urecord, rw, &optype);
842 process_result(&result, call, &rw->response_db);
843 }
844 else {
845 udf_record_close(&urecord);
846
847 char* rs = as_module_err_string(apply_rv);
848
849 tr->result_code = AS_ERR_UDF_EXECUTION;
850 process_failure_str(call, rs, strlen(rs), &rw->response_db);
851 cf_free(rs);
852 }
853
854 update_lua_complete_stats(tr->origin, ns, optype, apply_rv,
855 result.is_success);
856
857 as_result_destroy(&result);
858 udf_record_destroy(urec);
859
860 return optype;
861}
862
863
864int
865udf_apply_record(udf_call* call, as_rec* rec, as_result* result)
866{
867 // timedout callback gives no 'udata' per UDF - use thread-local.
868 g_end_ns = ((udf_record*)rec->data)->tr->end_time;
869
870 as_timer timer;
871
872 static const as_timer_hooks udf_timer_hooks = {
873 .destroy = NULL,
874 .timedout = udf_timer_timedout,
875 .timeslice = udf_timer_timeslice
876 };
877
878 as_timer_init(&timer, NULL, &udf_timer_hooks);
879
880 as_udf_context ctx = {
881 .as = &g_as_aerospike,
882 .timer = &timer,
883 .memtracker = NULL
884 };
885
886 return as_module_apply_record(&mod_lua, &ctx, call->def->filename,
887 call->def->function, rec, call->def->arglist, result);
888}
889
890
891void
892udf_finish(udf_record* urecord, rw_request* rw, udf_optype* record_op)
893{
894 *record_op = UDF_OPTYPE_READ;
895
896 udf_optype final_op = udf_finish_op(urecord);
897
898 if (final_op == UDF_OPTYPE_DELETE) {
899 *record_op = UDF_OPTYPE_DELETE;
900 urecord->tr->flags |= AS_TRANSACTION_FLAG_IS_DELETE;
901 }
902 else if (final_op == UDF_OPTYPE_WRITE) {
903 *record_op = UDF_OPTYPE_WRITE;
904 }
905
906 udf_post_processing(urecord, rw, final_op);
907}
908
909
910udf_optype
911udf_finish_op(udf_record* urecord)
912{
913 if (udf_zero_bins_left(urecord)) {
914 // Amazingly, with respect to stored key, memory statistics work out
915 // correctly regardless of what this returns.
916 return udf_finish_delete(urecord);
917 }
918
919 if ((urecord->flag & UDF_RECORD_FLAG_HAS_UPDATES) != 0) {
920 if ((urecord->flag & UDF_RECORD_FLAG_OPEN) == 0) {
921 cf_crash(AS_UDF, "updated record not open");
922 }
923
924 return UDF_OPTYPE_WRITE;
925 }
926
927 return UDF_OPTYPE_READ;
928}
929
930
931void
932udf_post_processing(udf_record* urecord, rw_request* rw, udf_optype urecord_op)
933{
934 as_storage_rd* rd = urecord->rd;
935 as_transaction* tr = urecord->tr;
936 as_namespace* ns = rd->ns;
937 as_record* r = rd->r;
938
939 uint16_t generation = 0;
940 uint16_t set_id = 0;
941 xdr_dirty_bins dirty_bins;
942
943 if (urecord_op == UDF_OPTYPE_WRITE || urecord_op == UDF_OPTYPE_DELETE) {
944 as_msg* m = &tr->msgp->msg;
945
946 // Convert message TTL special value if appropriate.
947 if (m->record_ttl == TTL_DONT_UPDATE &&
948 (urecord->flag & UDF_RECORD_FLAG_PREEXISTS) == 0) {
949 m->record_ttl = TTL_NAMESPACE_DEFAULT;
950 }
951
952 update_metadata_in_index(tr, r);
953
954 // TODO - old pickle - remove in "six months".
955 if (as_exchange_min_compatibility_id() < 3) {
956 rw->is_old_pickle = true;
957 pickle_all(rd, rw);
958 }
959
960 tr->generation = r->generation;
961 tr->void_time = r->void_time;
962 tr->last_update_time = r->last_update_time;
963
964 // Store or drop the key as appropriate.
965 as_record_finalize_key(r, ns, rd->key, rd->key_size);
966
967 as_storage_record_adjust_mem_stats(rd, urecord->starting_memory_bytes);
968
969 will_replicate(r, ns);
970
971 // Collect information for XDR before closing the record.
972 generation = plain_generation(r->generation, ns);
973 set_id = as_index_get_set_id(r);
974
975 if (urecord->dirty && urecord_op == UDF_OPTYPE_WRITE) {
976 xdr_clear_dirty_bins(&dirty_bins);
977 xdr_copy_dirty_bins(urecord->dirty, &dirty_bins);
978 }
979 }
980
981 // Will we need a pickle?
982 // TODO - old pickle - remove condition in "six months".
983 if (! rw->is_old_pickle) {
984 rd->keep_pickle = rw->n_dest_nodes != 0;
985 }
986
987 // Close the record for all the cases.
988 udf_record_close(urecord);
989
990 // TODO - old pickle - remove condition in "six months".
991 if (! rw->is_old_pickle) {
992 // Yes, it's safe to use these urecord fields after udf_record_close().
993 rw->pickle = urecord->pickle;
994 rw->pickle_sz = urecord->pickle_sz;
995 }
996
997 // Write to XDR pipe.
998 if (urecord_op == UDF_OPTYPE_WRITE) {
999 xdr_write(tr->rsv.ns, &tr->keyd, generation, 0, XDR_OP_TYPE_WRITE,
1000 set_id, &dirty_bins);
1001 }
1002 else if (urecord_op == UDF_OPTYPE_DELETE) {
1003 xdr_write(tr->rsv.ns, &tr->keyd, 0, 0,
1004 as_transaction_is_durable_delete(tr) ?
1005 XDR_OP_TYPE_DURABLE_DELETE : XDR_OP_TYPE_DROP,
1006 set_id, NULL);
1007 }
1008}
1009
1010
1011bool
1012udf_timer_timedout(const as_timer* timer)
1013{
1014 uint64_t now = cf_getns();
1015
1016 if (now < g_end_ns) {
1017 return false;
1018 }
1019
1020 cf_warning(AS_UDF, "UDF timed out %lu ms ago", (now - g_end_ns) / 1000000);
1021
1022 return true;
1023}
1024
1025
1026uint64_t
1027udf_timer_timeslice(const as_timer* timer)
1028{
1029 uint64_t now = cf_getns();
1030
1031 return g_end_ns > now ? (g_end_ns - now) / 1000000 : 1;
1032}
1033
1034
1035//==========================================================
1036// Local helpers - statistics.
1037//
1038
1039void
1040update_lua_complete_stats(uint8_t origin, as_namespace* ns, udf_optype op,
1041 int ret, bool is_success)
1042{
1043 switch (origin) {
1044 case FROM_CLIENT:
1045 if (ret == 0 && is_success) {
1046 if (op == UDF_OPTYPE_READ) {
1047 cf_atomic64_incr(&ns->n_client_lang_read_success);
1048 }
1049 else if (op == UDF_OPTYPE_DELETE) {
1050 cf_atomic64_incr(&ns->n_client_lang_delete_success);
1051 }
1052 else if (op == UDF_OPTYPE_WRITE) {
1053 cf_atomic64_incr(&ns->n_client_lang_write_success);
1054 }
1055 }
1056 else {
1057 cf_info(AS_UDF, "lua error, ret:%d", ret);
1058 cf_atomic64_incr(&ns->n_client_lang_error);
1059 }
1060 break;
1061 case FROM_PROXY:
1062 if (ret == 0 && is_success) {
1063 if (op == UDF_OPTYPE_READ) {
1064 cf_atomic64_incr(&ns->n_from_proxy_lang_read_success);
1065 }
1066 else if (op == UDF_OPTYPE_DELETE) {
1067 cf_atomic64_incr(&ns->n_from_proxy_lang_delete_success);
1068 }
1069 else if (op == UDF_OPTYPE_WRITE) {
1070 cf_atomic64_incr(&ns->n_from_proxy_lang_write_success);
1071 }
1072 }
1073 else {
1074 cf_info(AS_UDF, "lua error, ret:%d", ret);
1075 cf_atomic64_incr(&ns->n_from_proxy_lang_error);
1076 }
1077 break;
1078 case FROM_IUDF:
1079 if (ret == 0 && is_success) {
1080 if (op == UDF_OPTYPE_READ) {
1081 // Note - this would be weird, since there's nowhere for a
1082 // response to go in our current UDF scans & queries.
1083 cf_atomic64_incr(&ns->n_udf_sub_lang_read_success);
1084 }
1085 else if (op == UDF_OPTYPE_DELETE) {
1086 cf_atomic64_incr(&ns->n_udf_sub_lang_delete_success);
1087 }
1088 else if (op == UDF_OPTYPE_WRITE) {
1089 cf_atomic64_incr(&ns->n_udf_sub_lang_write_success);
1090 }
1091 }
1092 else {
1093 cf_info(AS_UDF, "lua error, ret:%d", ret);
1094 cf_atomic64_incr(&ns->n_udf_sub_lang_error);
1095 }
1096 break;
1097 default:
1098 cf_crash(AS_UDF, "unexpected transaction origin %u", origin);
1099 break;
1100 }
1101}
1102
1103
1104//==========================================================
1105// Local helpers - construct response to be sent to origin.
1106//
1107
1108void
1109process_failure_str(udf_call* call, const char* err_str, size_t len,
1110 cf_dyn_buf* db)
1111{
1112 if (! err_str) {
1113 // Better than sending an as_string with null value.
1114 process_failure(call, NULL, db);
1115 return;
1116 }
1117
1118 as_string stack_s;
1119 as_string_init_wlen(&stack_s, (char*)err_str, len, false);
1120
1121 process_failure(call, as_string_toval(&stack_s), db);
1122}
1123
1124
1125void
1126process_result(const as_result* result, udf_call* call, cf_dyn_buf* db)
1127{
1128 as_val* val = result->value;
1129
1130 if (result->is_success) {
1131 process_success(call, val, db);
1132 return;
1133 }
1134
1135 // Failures...
1136
1137 if (as_val_type(val) == AS_STRING) {
1138 call->tr->result_code = AS_ERR_UDF_EXECUTION;
1139 process_failure(call, val, db);
1140 return;
1141 }
1142
1143 char lua_err_str[1024];
1144 size_t len = (size_t)sprintf(lua_err_str,
1145 "%s:0: in function %s() - error() argument type not handled",
1146 call->def->filename, call->def->function);
1147
1148 call->tr->result_code = AS_ERR_UDF_EXECUTION;
1149 process_failure_str(call, lua_err_str, len, db);
1150}
1151
1152
1153void
1154process_response(udf_call* call, bool success, const as_val* val,
1155 cf_dyn_buf* db)
1156{
1157 // No response for background (internal) UDF.
1158 if (call->def->type == AS_UDF_OP_BACKGROUND) {
1159 return;
1160 }
1161
1162 as_transaction* tr = call->tr;
1163
1164 // Note - this function quietly handles a null val. The response call will
1165 // be given a bin with a name but not 'in use', and it does the right thing.
1166
1167 size_t msg_sz = 0;
1168
1169 db->buf = (uint8_t *)as_msg_make_val_response(success, val, tr->result_code,
1170 tr->generation, tr->void_time, as_transaction_trid(tr), &msg_sz);
1171
1172 db->is_stack = false;
1173 db->alloc_sz = msg_sz;
1174 db->used_sz = msg_sz;
1175}
1176