1 | /* |
2 | * udf.c |
3 | * |
4 | * Copyright (C) 2016 Aerospike, Inc. |
5 | * |
6 | * Portions may be licensed to Aerospike, Inc. under one or more contributor |
7 | * license agreements. |
8 | * |
9 | * This program is free software: you can redistribute it and/or modify it under |
10 | * the terms of the GNU Affero General Public License as published by the Free |
11 | * Software Foundation, either version 3 of the License, or (at your option) any |
12 | * later version. |
13 | * |
14 | * This program is distributed in the hope that it will be useful, but WITHOUT |
15 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
16 | * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more |
17 | * details. |
18 | * |
19 | * You should have received a copy of the GNU Affero General Public License |
20 | * along with this program. If not, see http://www.gnu.org/licenses/ |
21 | */ |
22 | |
23 | //========================================================== |
24 | // Includes. |
25 | // |
26 | |
27 | #include "transaction/udf.h" |
28 | |
29 | #include <stdbool.h> |
30 | #include <stddef.h> |
31 | #include <stdint.h> |
32 | |
33 | #include "aerospike/as_aerospike.h" |
34 | #include "aerospike/as_atomic.h" |
35 | #include "aerospike/as_buffer.h" |
36 | #include "aerospike/as_log.h" |
37 | #include "aerospike/as_list.h" |
38 | #include "aerospike/as_module.h" |
39 | #include "aerospike/as_msgpack.h" |
40 | #include "aerospike/as_serializer.h" |
41 | #include "aerospike/as_types.h" |
42 | #include "aerospike/as_udf_context.h" |
43 | #include "aerospike/mod_lua.h" |
44 | |
45 | #include "citrusleaf/alloc.h" |
46 | #include "citrusleaf/cf_atomic.h" |
47 | #include "citrusleaf/cf_clock.h" |
48 | |
49 | #include "cf_mutex.h" |
50 | #include "dynbuf.h" |
51 | #include "fault.h" |
52 | |
53 | #include "base/cfg.h" |
54 | #include "base/datamodel.h" |
55 | #include "base/predexp.h" |
56 | #include "base/proto.h" |
57 | #include "base/secondary_index.h" |
58 | #include "base/transaction.h" |
59 | #include "base/transaction_policy.h" |
60 | #include "base/udf_aerospike.h" |
61 | #include "base/udf_arglist.h" |
62 | #include "base/udf_cask.h" |
63 | #include "base/udf_record.h" |
64 | #include "fabric/exchange.h" // TODO - old pickle - remove in "six months" |
65 | #include "fabric/partition.h" |
66 | #include "storage/storage.h" |
67 | #include "transaction/duplicate_resolve.h" |
68 | #include "transaction/proxy.h" |
69 | #include "transaction/replica_write.h" |
70 | #include "transaction/rw_request.h" |
71 | #include "transaction/rw_request_hash.h" |
72 | #include "transaction/rw_utils.h" |
73 | |
74 | |
75 | //========================================================== |
76 | // Typedefs & constants. |
77 | // |
78 | |
// Translation table from as_log_level values (used directly as array indexes)
// to the server's cf_fault_severity values. Note that both ERROR and WARN are
// reported as CF_WARNING. Consulted by log_callback().
static const cf_fault_severity as_log_level_map[5] = {
	[AS_LOG_LEVEL_ERROR] = CF_WARNING,
	[AS_LOG_LEVEL_WARN] = CF_WARNING,
	[AS_LOG_LEVEL_INFO] = CF_INFO,
	[AS_LOG_LEVEL_DEBUG] = CF_DEBUG,
	[AS_LOG_LEVEL_TRACE] = CF_DETAIL
};
86 | |
// Pairs the UDF to run with the transaction it runs for. Built on the stack in
// udf_master() and passed down to the apply/response helpers.
typedef struct udf_call_s {
	// UDF definition - either parsed from the message, or (for internal UDFs)
	// the definition held by the internal UDF origin.
	udf_def* def;
	// Transaction on whose behalf the UDF is applied.
	as_transaction* tr;
} udf_call;
91 | |
92 | |
93 | //========================================================== |
94 | // Globals. |
95 | // |
96 | |
// Shared as_aerospike object handed to every UDF invocation (see
// udf_apply_record()). Initialized once in as_udf_init().
as_aerospike g_as_aerospike;

// Deadline per UDF. Thread-local because the timer hooks get no per-call
// udata - set from the transaction's end_time just before the UDF is applied,
// and compared against cf_getns() by the timer hooks.
static __thread uint64_t g_end_ns;
101 | |
102 | |
103 | //========================================================== |
104 | // Forward declarations. |
105 | // |
106 | |
// Logging hook registered with as_log in as_udf_init().
bool log_callback(as_log_level level, const char* func, const char* file,
		uint32_t line, const char* fmt, ...);

// Transaction flow - duplicate resolution and replica write phases.
void start_udf_dup_res(rw_request* rw, as_transaction* tr);
void start_udf_repl_write(rw_request* rw, as_transaction* tr);
void start_udf_repl_write_forget(rw_request* rw, as_transaction* tr);
bool udf_dup_res_cb(rw_request* rw);
void udf_repl_write_after_dup_res(rw_request* rw, as_transaction* tr);
void udf_repl_write_forget_after_dup_res(rw_request* rw, as_transaction* tr);
void udf_repl_write_cb(rw_request* rw);

// Transaction end - respond to the origin, or handle timeout.
void send_udf_response(as_transaction* tr, cf_dyn_buf* db);
void udf_timeout_cb(rw_request* rw);

// Applying the UDF on the master and finishing the record operation.
transaction_status udf_master(rw_request* rw, as_transaction* tr);
udf_optype udf_master_apply(udf_call* call, rw_request* rw);
int udf_apply_record(udf_call* call, as_rec* rec, as_result* result);
void udf_finish(udf_record* urecord, rw_request* rw, udf_optype* record_op);
udf_optype udf_finish_op(udf_record* urecord);
void udf_post_processing(udf_record* urecord, rw_request* rw,
		udf_optype urecord_op);
// Timer hooks given to the UDF context - both work off thread-local g_end_ns.
bool udf_timer_timedout(const as_timer* timer);
uint64_t udf_timer_timeslice(const as_timer* timer);

// Statistics.
void update_lua_complete_stats(uint8_t origin, as_namespace* ns, udf_optype op,
		int ret, bool is_success);

// Building the response that goes into the rw_request's response buffer.
void process_failure_str(udf_call* call, const char* err_str, size_t len,
		cf_dyn_buf* db);
void process_result(const as_result* result, udf_call* call, cf_dyn_buf* db);
void process_response(udf_call* call, bool success, const as_val* val,
		cf_dyn_buf* db);
139 | |
140 | |
141 | //========================================================== |
142 | // Inlines & macros. |
143 | // |
144 | |
145 | static inline void |
146 | client_udf_update_stats(as_namespace* ns, uint8_t result_code) |
147 | { |
148 | switch (result_code) { |
149 | case AS_OK: |
150 | cf_atomic64_incr(&ns->n_client_udf_complete); |
151 | break; |
152 | default: |
153 | cf_atomic64_incr(&ns->n_client_udf_error); |
154 | break; |
155 | case AS_ERR_TIMEOUT: |
156 | cf_atomic64_incr(&ns->n_client_udf_timeout); |
157 | break; |
158 | case AS_ERR_FILTERED_OUT: |
159 | cf_atomic64_incr(&ns->n_client_udf_filtered_out); |
160 | break; |
161 | } |
162 | } |
163 | |
164 | static inline void |
165 | from_proxy_udf_update_stats(as_namespace* ns, uint8_t result_code) |
166 | { |
167 | switch (result_code) { |
168 | case AS_OK: |
169 | cf_atomic64_incr(&ns->n_from_proxy_udf_complete); |
170 | break; |
171 | default: |
172 | cf_atomic64_incr(&ns->n_from_proxy_udf_error); |
173 | break; |
174 | case AS_ERR_TIMEOUT: |
175 | cf_atomic64_incr(&ns->n_from_proxy_udf_timeout); |
176 | break; |
177 | case AS_ERR_FILTERED_OUT: |
178 | cf_atomic64_incr(&ns->n_from_proxy_udf_filtered_out); |
179 | break; |
180 | } |
181 | } |
182 | |
183 | static inline void |
184 | udf_sub_udf_update_stats(as_namespace* ns, uint8_t result_code) |
185 | { |
186 | switch (result_code) { |
187 | case AS_OK: |
188 | cf_atomic64_incr(&ns->n_udf_sub_udf_complete); |
189 | break; |
190 | default: |
191 | cf_atomic64_incr(&ns->n_udf_sub_udf_error); |
192 | break; |
193 | case AS_ERR_TIMEOUT: |
194 | cf_atomic64_incr(&ns->n_udf_sub_udf_timeout); |
195 | break; |
196 | case AS_ERR_FILTERED_OUT: // doesn't include those filtered out by metadata |
197 | as_incr_uint64(&ns->n_udf_sub_udf_filtered_out); |
198 | break; |
199 | } |
200 | } |
201 | |
202 | static inline bool |
203 | udf_zero_bins_left(udf_record* urecord) |
204 | { |
205 | return (urecord->flag & UDF_RECORD_FLAG_OPEN) != 0 && |
206 | ! as_bin_inuse_has(urecord->rd); |
207 | } |
208 | |
209 | static inline void |
210 | process_failure(udf_call* call, const as_val* val, cf_dyn_buf* db) |
211 | { |
212 | process_response(call, false, val, db); |
213 | } |
214 | |
215 | static inline void |
216 | process_success(udf_call* call, const as_val* val, cf_dyn_buf* db) |
217 | { |
218 | process_response(call, true, val, db); |
219 | } |
220 | |
221 | |
222 | //========================================================== |
223 | // Public API. |
224 | // |
225 | |
// One-time UDF subsystem initialization: configure the Lua module from the
// server configuration, route as_log output through log_callback(), initialize
// the UDF cask, and set up the global as_aerospike object with the UDF hooks.
void
as_udf_init()
{
	as_module_configure(&mod_lua, &g_config.mod_lua);
	as_log_set_callback(log_callback);
	udf_cask_init();
	// g_as_aerospike is later handed to every UDF via the as_udf_context.
	as_aerospike_init(&g_as_aerospike, NULL, &udf_aerospike_hooks);
}
234 | |
235 | |
236 | // Public API for udf_def class, not big enough for it's own file. |
237 | udf_def* |
238 | udf_def_init_from_msg(udf_def* def, const as_transaction* tr) |
239 | { |
240 | def->arglist = NULL; |
241 | |
242 | as_msg* m = &tr->msgp->msg; |
243 | as_msg_field* filename = |
244 | as_msg_field_get(m, AS_MSG_FIELD_TYPE_UDF_FILENAME); |
245 | |
246 | if (! filename) { |
247 | return NULL; |
248 | } |
249 | |
250 | as_msg_field* function = |
251 | as_msg_field_get(m, AS_MSG_FIELD_TYPE_UDF_FUNCTION); |
252 | |
253 | if (! function) { |
254 | return NULL; |
255 | } |
256 | |
257 | as_msg_field* arglist = as_msg_field_get(m, AS_MSG_FIELD_TYPE_UDF_ARGLIST); |
258 | |
259 | if (! arglist) { |
260 | return NULL; |
261 | } |
262 | |
263 | uint32_t filename_len = as_msg_field_get_value_sz(filename); |
264 | |
265 | if (filename_len >= sizeof(def->filename)) { |
266 | return NULL; |
267 | } |
268 | |
269 | uint32_t function_len = as_msg_field_get_value_sz(function); |
270 | |
271 | if (function_len >= sizeof(def->function)) { |
272 | return NULL; |
273 | } |
274 | |
275 | memcpy(def->filename, filename->data, filename_len); |
276 | def->filename[filename_len] = '\0'; |
277 | |
278 | memcpy(def->function, function->data, function_len); |
279 | def->function[function_len] = '\0'; |
280 | |
281 | as_unpacker unpacker; |
282 | |
283 | unpacker.buffer = (const unsigned char*)arglist->data; |
284 | unpacker.length = as_msg_field_get_value_sz(arglist); |
285 | unpacker.offset = 0; |
286 | |
287 | if (unpacker.length > 0) { |
288 | as_val* val = NULL; |
289 | int ret = as_unpack_val(&unpacker, &val); |
290 | |
291 | if (ret == 0 && as_val_type(val) == AS_LIST) { |
292 | def->arglist = (as_list*)val; |
293 | } |
294 | } |
295 | |
296 | as_msg_field* op = as_transaction_has_udf_op(tr) ? |
297 | as_msg_field_get(m, AS_MSG_FIELD_TYPE_UDF_OP) : NULL; |
298 | |
299 | def->type = op ? *op->data : AS_UDF_OP_KVS; |
300 | |
301 | return def; |
302 | } |
303 | |
304 | |
// Start a UDF transaction on the master.
//
// Rejects the transaction up front if the XDR filter forbids the write or
// storage is overloaded. Otherwise an rw_request is created and inserted in
// the rw_request hash, and then either duplicate resolution is started, or the
// UDF is applied on master (udf_master()) followed - if there are replica
// destinations - by replica writes.
//
// Returns TRANS_IN_PROGRESS if duplicate resolution or acknowledged replica
// writes were started. Any other return value is final for this call - in
// which case the response has been sent via send_udf_response(), except when
// the status is TRANS_WAITING.
transaction_status
as_udf_start(as_transaction* tr)
{
	BENCHMARK_START(tr, udf, FROM_CLIENT);
	BENCHMARK_START(tr, udf_sub, FROM_IUDF);

	// Apply XDR filter.
	if (! xdr_allows_write(tr)) {
		tr->result_code = AS_ERR_ALWAYS_FORBIDDEN;
		send_udf_response(tr, NULL);
		return TRANS_DONE_ERROR;
	}

	// Don't know if UDF is read or delete - check that we aren't backed up.
	if (as_storage_overloaded(tr->rsv.ns)) {
		tr->result_code = AS_ERR_DEVICE_OVERLOAD;
		send_udf_response(tr, NULL);
		return TRANS_DONE_ERROR;
	}

	// Create rw_request and add to hash.
	rw_request_hkey hkey = { tr->rsv.ns->id, tr->keyd };
	rw_request* rw = rw_request_create(&tr->keyd);
	transaction_status status = rw_request_hash_insert(&hkey, rw, tr);

	// If rw_request wasn't inserted in hash, transaction is finished.
	if (status != TRANS_IN_PROGRESS) {
		rw_request_release(rw);

		// No response is sent when the status is TRANS_WAITING.
		if (status != TRANS_WAITING) {
			send_udf_response(tr, NULL);
		}

		return status;
	}
	// else - rw_request is now in hash, continue...

	if (tr->rsv.ns->write_dup_res_disabled) {
		// Note - preventing duplicate resolution this way allows
		// rw_request_destroy() to handle dup_msg[] cleanup correctly.
		tr->rsv.n_dupl = 0;
	}

	// If there are duplicates to resolve, start doing so.
	if (tr->rsv.n_dupl != 0) {
		start_udf_dup_res(rw, tr);

		// Started duplicate resolution.
		return TRANS_IN_PROGRESS;
	}
	// else - no duplicate resolution phase, apply operation to master.

	// Set up the nodes to which we'll write replicas.
	rw->n_dest_nodes = as_partition_get_other_replicas(tr->rsv.p,
			rw->dest_nodes);

	if (insufficient_replica_destinations(tr->rsv.ns, rw->n_dest_nodes)) {
		rw_request_hash_delete(&hkey, rw);
		tr->result_code = AS_ERR_UNAVAILABLE;
		send_udf_response(tr, NULL);
		return TRANS_DONE_ERROR;
	}

	status = udf_master(rw, tr);

	BENCHMARK_NEXT_DATA_POINT_FROM(tr, udf, FROM_CLIENT, master);
	BENCHMARK_NEXT_DATA_POINT_FROM(tr, udf_sub, FROM_IUDF, master);

	// If error or UDF was a read, transaction is finished.
	if (status != TRANS_IN_PROGRESS) {
		// The response is built from rw's response buffer, so it is sent
		// before rw is removed from the hash.
		if (status != TRANS_WAITING) {
			send_udf_response(tr, &rw->response_db);
		}

		rw_request_hash_delete(&hkey, rw);
		return status;
	}

	// If we don't need replica writes, transaction is finished.
	if (rw->n_dest_nodes == 0) {
		finished_replicated(tr);
		send_udf_response(tr, &rw->response_db);
		rw_request_hash_delete(&hkey, rw);
		return TRANS_DONE_SUCCESS;
	}

	// If we don't need to wait for replica write acks, fire and forget.
	if (respond_on_master_complete(tr)) {
		start_udf_repl_write_forget(rw, tr);
		send_udf_response(tr, &rw->response_db);
		rw_request_hash_delete(&hkey, rw);
		return TRANS_DONE_SUCCESS;
	}

	start_udf_repl_write(rw, tr);

	// Started replica write.
	return TRANS_IN_PROGRESS;
}
404 | |
405 | |
406 | //========================================================== |
407 | // Local helpers - initialization. |
408 | // |
409 | |
410 | bool |
411 | log_callback(as_log_level level, const char* func, const char* file, |
412 | uint32_t line, const char* fmt, ...) |
413 | { |
414 | cf_fault_severity severity = as_log_level_map[level]; |
415 | |
416 | if (severity > cf_fault_filter[AS_UDF]) { |
417 | return true; |
418 | } |
419 | |
420 | va_list ap; |
421 | |
422 | va_start(ap, fmt); |
423 | char message[1024] = { '\0' }; |
424 | vsnprintf(message, 1024, fmt, ap); |
425 | va_end(ap); |
426 | |
427 | cf_fault_event(AS_UDF, severity, file, line, "%s" , message); |
428 | |
429 | return true; |
430 | } |
431 | |
432 | |
433 | //========================================================== |
434 | // Local helpers - transaction flow. |
435 | // |
436 | |
// Begin the duplicate resolution phase for a UDF transaction. The message is
// built before taking the rw_request lock - the lock is held only while the
// rw_request is set up (with the UDF dup-res and timeout callbacks) and the
// messages are sent.
void
start_udf_dup_res(rw_request* rw, as_transaction* tr)
{
	// Finish initializing rw, construct and send dup-res message.

	dup_res_make_message(rw, tr);

	cf_mutex_lock(&rw->lock);

	dup_res_setup_rw(rw, tr, udf_dup_res_cb, udf_timeout_cb);
	send_rw_messages(rw);

	cf_mutex_unlock(&rw->lock);
}
451 | |
452 | |
// Begin the replica write phase (waiting for acks) for a UDF transaction that
// had no duplicate resolution phase. The message is built before taking the
// rw_request lock - the lock is held only while the rw_request is set up (with
// the UDF repl-write and timeout callbacks) and the messages are sent.
void
start_udf_repl_write(rw_request* rw, as_transaction* tr)
{
	// Finish initializing rw, construct and send repl-write message.

	repl_write_make_message(rw, tr);

	cf_mutex_lock(&rw->lock);

	repl_write_setup_rw(rw, tr, udf_repl_write_cb, udf_timeout_cb);
	send_rw_messages(rw);

	cf_mutex_unlock(&rw->lock);
}
467 | |
468 | |
// Fire-and-forget replica writes for a UDF transaction that had no duplicate
// resolution phase. No callbacks are installed and the rw_request lock is not
// taken here.
void
start_udf_repl_write_forget(rw_request* rw, as_transaction* tr)
{
	// Construct and send repl-write message. No need to finish rw setup.

	repl_write_make_message(rw, tr);
	send_rw_messages_forget(rw);
}
477 | |
478 | |
// Callback run when duplicate resolution completes. Rebuilds a transaction
// from the rw_request and continues where as_udf_start() would have gone
// without a duplicate resolution phase - apply the UDF on master, then start
// replica writes if needed.
//
// Returns false only if acknowledged replica writes were started, in which
// case the rw_request must stay in the hash. Returns true in every other case
// - presumably telling the caller it may remove the rw_request from the hash
// (verify against the dup-res caller).
bool
udf_dup_res_cb(rw_request* rw)
{
	BENCHMARK_NEXT_DATA_POINT_FROM(rw, udf, FROM_CLIENT, dup_res);
	BENCHMARK_NEXT_DATA_POINT_FROM(rw, udf_sub, FROM_IUDF, dup_res);

	as_transaction tr;
	as_transaction_init_from_rw(&tr, rw);

	// A non-OK result code here means the transaction can't proceed.
	if (tr.result_code != AS_OK) {
		send_udf_response(&tr, NULL);
		return true;
	}

	// Set up the nodes to which we'll write replicas.
	rw->n_dest_nodes = as_partition_get_other_replicas(tr.rsv.p,
			rw->dest_nodes);

	if (insufficient_replica_destinations(tr.rsv.ns, rw->n_dest_nodes)) {
		tr.result_code = AS_ERR_UNAVAILABLE;
		send_udf_response(&tr, NULL);
		return true;
	}

	transaction_status status = udf_master(rw, &tr);

	BENCHMARK_NEXT_DATA_POINT_FROM((&tr), udf, FROM_CLIENT, master);
	BENCHMARK_NEXT_DATA_POINT_FROM((&tr), udf_sub, FROM_IUDF, master);

	if (status == TRANS_WAITING) {
		// Note - new tr now owns msgp, make sure rw destructor doesn't free it.
		// Also, rw will release rsv - new tr will get a new one.
		rw->msgp = NULL;
		return true;
	}

	// Error, or the UDF needs no replica writes - respond and finish.
	if (status != TRANS_IN_PROGRESS) {
		send_udf_response(&tr, &rw->response_db);
		return true;
	}

	// If we don't need replica writes, transaction is finished.
	if (rw->n_dest_nodes == 0) {
		finished_replicated(&tr);
		send_udf_response(&tr, &rw->response_db);
		return true;
	}

	// If we don't need to wait for replica write acks, fire and forget.
	if (respond_on_master_complete(&tr)) {
		udf_repl_write_forget_after_dup_res(rw, &tr);
		send_udf_response(&tr, &rw->response_db);
		return true;
	}

	udf_repl_write_after_dup_res(rw, &tr);

	// Started replica write - don't delete rw_request from hash.
	return false;
}
539 | |
540 | |
// Start acknowledged replica writes from within the dup-res callback, reusing
// the same rw_request. The rw_request is reset with the UDF repl-write
// callback before the messages are sent.
void
udf_repl_write_after_dup_res(rw_request* rw, as_transaction* tr)
{
	// Recycle rw_request that was just used for duplicate resolution to now do
	// replica writes. Note - we are under the rw_request lock here!

	repl_write_make_message(rw, tr);
	repl_write_reset_rw(rw, tr, udf_repl_write_cb);
	send_rw_messages(rw);
}
551 | |
552 | |
// Fire-and-forget replica writes from within the dup-res callback. Unlike
// udf_repl_write_after_dup_res(), no callback is installed.
void
udf_repl_write_forget_after_dup_res(rw_request* rw, as_transaction* tr)
{
	// Send replica writes. Not waiting for acks, so the rw_request is not
	// reset here. Note - we are under the rw_request lock here!

	repl_write_make_message(rw, tr);
	send_rw_messages_forget(rw);
}
562 | |
563 | |
// Callback run when the replica write phase completes. Rebuilds a transaction
// from the rw_request, marks it replicated, and sends the response that was
// stored in the rw_request's response buffer.
void
udf_repl_write_cb(rw_request* rw)
{
	BENCHMARK_NEXT_DATA_POINT_FROM(rw, udf, FROM_CLIENT, repl_write);
	BENCHMARK_NEXT_DATA_POINT_FROM(rw, udf_sub, FROM_IUDF, repl_write);

	as_transaction tr;
	as_transaction_init_from_rw(&tr, rw);

	finished_replicated(&tr);
	send_udf_response(&tr, &rw->response_db);

	// Finished transaction - rw_request cleans up reservation and msgp!
}
578 | |
579 | |
580 | //========================================================== |
581 | // Local helpers - transaction end. |
582 | // |
583 | |
584 | void |
585 | send_udf_response(as_transaction* tr, cf_dyn_buf* db) |
586 | { |
587 | // Paranoia - shouldn't get here on losing race with timeout. |
588 | if (! tr->from.any) { |
589 | cf_warning(AS_RW, "transaction origin %u has null 'from'" , tr->origin); |
590 | return; |
591 | } |
592 | |
593 | // Note - if tr was setup from rw, rw->from.any has been set null and |
594 | // informs timeout it lost the race. |
595 | |
596 | clear_delete_response_metadata(tr); |
597 | |
598 | switch (tr->origin) { |
599 | case FROM_CLIENT: |
600 | if (db && db->used_sz != 0) { |
601 | as_msg_send_ops_reply(tr->from.proto_fd_h, db); |
602 | } |
603 | else { |
604 | as_msg_send_reply(tr->from.proto_fd_h, tr->result_code, |
605 | tr->generation, tr->void_time, NULL, NULL, 0, tr->rsv.ns, |
606 | as_transaction_trid(tr)); |
607 | } |
608 | BENCHMARK_NEXT_DATA_POINT(tr, udf, response); |
609 | HIST_TRACK_ACTIVATE_INSERT_DATA_POINT(tr, udf_hist); |
610 | client_udf_update_stats(tr->rsv.ns, tr->result_code); |
611 | break; |
612 | case FROM_PROXY: |
613 | if (db && db->used_sz != 0) { |
614 | as_proxy_send_ops_response(tr->from.proxy_node, |
615 | tr->from_data.proxy_tid, db); |
616 | } |
617 | else { |
618 | as_proxy_send_response(tr->from.proxy_node, tr->from_data.proxy_tid, |
619 | tr->result_code, tr->generation, tr->void_time, NULL, NULL, |
620 | 0, tr->rsv.ns, as_transaction_trid(tr)); |
621 | } |
622 | from_proxy_udf_update_stats(tr->rsv.ns, tr->result_code); |
623 | break; |
624 | case FROM_IUDF: |
625 | if (db && db->used_sz != 0) { |
626 | cf_crash(AS_RW, "unexpected - internal udf has response" ); |
627 | } |
628 | tr->from.iudf_orig->cb(tr->from.iudf_orig->udata, tr->result_code); |
629 | BENCHMARK_NEXT_DATA_POINT(tr, udf_sub, response); |
630 | udf_sub_udf_update_stats(tr->rsv.ns, tr->result_code); |
631 | break; |
632 | default: |
633 | cf_crash(AS_RW, "unexpected transaction origin %u" , tr->origin); |
634 | break; |
635 | } |
636 | |
637 | tr->from.any = NULL; // pattern, not needed |
638 | } |
639 | |
640 | |
641 | void |
642 | udf_timeout_cb(rw_request* rw) |
643 | { |
644 | if (! rw->from.any) { |
645 | return; // lost race against dup-res or repl-write callback |
646 | } |
647 | |
648 | finished_not_replicated(rw); |
649 | |
650 | switch (rw->origin) { |
651 | case FROM_CLIENT: |
652 | as_msg_send_reply(rw->from.proto_fd_h, AS_ERR_TIMEOUT, 0, 0, NULL, NULL, |
653 | 0, rw->rsv.ns, rw_request_trid(rw)); |
654 | // Timeouts aren't included in histograms. |
655 | client_udf_update_stats(rw->rsv.ns, AS_ERR_TIMEOUT); |
656 | break; |
657 | case FROM_PROXY: |
658 | from_proxy_udf_update_stats(rw->rsv.ns, AS_ERR_TIMEOUT); |
659 | break; |
660 | case FROM_IUDF: |
661 | rw->from.iudf_orig->cb(rw->from.iudf_orig->udata, AS_ERR_TIMEOUT); |
662 | // Timeouts aren't included in histograms. |
663 | udf_sub_udf_update_stats(rw->rsv.ns, AS_ERR_TIMEOUT); |
664 | break; |
665 | default: |
666 | cf_crash(AS_RW, "unexpected transaction origin %u" , rw->origin); |
667 | break; |
668 | } |
669 | |
670 | rw->from.any = NULL; // inform other callback it lost the race |
671 | } |
672 | |
673 | |
674 | //========================================================== |
675 | // Local helpers - UDF. |
676 | // |
677 | |
678 | transaction_status |
679 | udf_master(rw_request* rw, as_transaction* tr) |
680 | { |
681 | CF_ALLOC_SET_NS_ARENA(tr->rsv.ns); |
682 | |
683 | udf_def def; |
684 | udf_call call = { &def, tr }; |
685 | |
686 | if (tr->origin == FROM_IUDF) { |
687 | call.def = &tr->from.iudf_orig->def; |
688 | } |
689 | else if (! udf_def_init_from_msg(call.def, tr)) { |
690 | cf_warning(AS_UDF, "failed udf_def_init_from_msg" ); |
691 | tr->result_code = AS_ERR_PARAMETER; |
692 | return TRANS_DONE_ERROR; |
693 | } |
694 | |
695 | udf_optype optype = udf_master_apply(&call, rw); |
696 | |
697 | if (tr->origin != FROM_IUDF && call.def->arglist) { |
698 | as_list_destroy(call.def->arglist); |
699 | } |
700 | |
701 | if (optype == UDF_OPTYPE_READ || optype == UDF_OPTYPE_NONE) { |
702 | // UDF is done, no replica writes needed. |
703 | return TRANS_DONE_SUCCESS; |
704 | } |
705 | |
706 | return optype == UDF_OPTYPE_WAITING ? TRANS_WAITING : TRANS_IN_PROGRESS; |
707 | } |
708 | |
709 | |
710 | udf_optype |
711 | udf_master_apply(udf_call* call, rw_request* rw) |
712 | { |
713 | as_transaction* tr = call->tr; |
714 | as_namespace* ns = tr->rsv.ns; |
715 | |
716 | // Find record in index. |
717 | |
718 | as_index_ref r_ref; |
719 | int get_rv = as_record_get(tr->rsv.tree, &tr->keyd, &r_ref); |
720 | |
721 | if (get_rv == 0 && as_record_is_doomed(r_ref.r, ns)) { |
722 | // If record is expired or truncated, pretend it was not found. |
723 | as_record_done(&r_ref, ns); |
724 | get_rv = -1; |
725 | } |
726 | |
727 | if (get_rv == 0 && repl_state_check(r_ref.r, tr) < 0) { |
728 | as_record_done(&r_ref, ns); |
729 | return UDF_OPTYPE_WAITING; |
730 | } |
731 | |
732 | if (tr->origin == FROM_IUDF && |
733 | (get_rv == -1 || ! as_record_is_live(r_ref.r))) { |
734 | // Internal UDFs must not create records. |
735 | tr->result_code = AS_ERR_NOT_FOUND; |
736 | process_failure(call, NULL, &rw->response_db); |
737 | return UDF_OPTYPE_NONE; |
738 | } |
739 | |
740 | // Apply predexp metadata filter if present & not internal UDF. |
741 | |
742 | int rv; |
743 | predexp_eval_t* predexp = NULL; |
744 | |
745 | if (tr->origin != FROM_IUDF && get_rv == 0 && as_record_is_live(r_ref.r) && |
746 | (rv = build_predexp_and_filter_meta(tr, r_ref.r, &predexp)) != 0) { |
747 | tr->result_code = rv; |
748 | process_failure(call, NULL, &rw->response_db); |
749 | return UDF_OPTYPE_NONE; |
750 | } |
751 | |
752 | // Open storage record. |
753 | |
754 | as_storage_rd rd; |
755 | |
756 | udf_record urecord; |
757 | udf_record_init(&urecord, true); |
758 | |
759 | xdr_dirty_bins dirty_bins; |
760 | xdr_clear_dirty_bins(&dirty_bins); |
761 | |
762 | urecord.r_ref = &r_ref; |
763 | urecord.tr = tr; |
764 | urecord.rd = &rd; |
765 | urecord.dirty = &dirty_bins; |
766 | urecord.keyd = tr->keyd; |
767 | |
768 | if (get_rv == 0) { |
769 | urecord.flag |= (UDF_RECORD_FLAG_OPEN | UDF_RECORD_FLAG_PREEXISTS); |
770 | |
771 | if (udf_storage_record_open(&urecord) != 0) { |
772 | predexp_destroy(predexp); |
773 | udf_record_close(&urecord); |
774 | tr->result_code = AS_ERR_BIN_NAME; // overloaded... add bin_count error? |
775 | process_failure(call, NULL, &rw->response_db); |
776 | return UDF_OPTYPE_NONE; |
777 | } |
778 | |
779 | if (predexp != NULL || (tr->origin == FROM_IUDF && |
780 | tr->from.iudf_orig->predexp != NULL)) { |
781 | predexp_args_t predargs = { .ns = ns, .md = r_ref.r, .rd = &rd }; |
782 | |
783 | if (! predexp_matches_record(tr->origin == FROM_IUDF ? |
784 | tr->from.iudf_orig->predexp : predexp, &predargs)) { |
785 | predexp_destroy(predexp); |
786 | udf_record_close(&urecord); |
787 | tr->result_code = AS_ERR_FILTERED_OUT; |
788 | process_failure(call, NULL, &rw->response_db); |
789 | return UDF_OPTYPE_NONE; |
790 | } |
791 | |
792 | predexp_destroy(predexp); |
793 | } |
794 | |
795 | as_msg* m = &tr->msgp->msg; |
796 | |
797 | // If both the record and the message have keys, check them. |
798 | if (rd.key) { |
799 | if (as_transaction_has_key(tr) && ! check_msg_key(m, &rd)) { |
800 | udf_record_close(&urecord); |
801 | tr->result_code = AS_ERR_KEY_MISMATCH; |
802 | process_failure(call, NULL, &rw->response_db); |
803 | return UDF_OPTYPE_NONE; |
804 | } |
805 | } |
806 | else { |
807 | // If the message has a key, apply it to the record. |
808 | if (! get_msg_key(tr, &rd)) { |
809 | udf_record_close(&urecord); |
810 | tr->result_code = AS_ERR_UNSUPPORTED_FEATURE; |
811 | process_failure(call, NULL, &rw->response_db); |
812 | return UDF_OPTYPE_NONE; |
813 | } |
814 | |
815 | urecord.flag |= UDF_RECORD_FLAG_METADATA_UPDATED; |
816 | } |
817 | } |
818 | else { |
819 | urecord.flag &= ~(UDF_RECORD_FLAG_OPEN | |
820 | UDF_RECORD_FLAG_STORAGE_OPEN | |
821 | UDF_RECORD_FLAG_PREEXISTS); |
822 | } |
823 | |
824 | // Run UDF. |
825 | |
826 | // This as_rec needs to be in the heap - once passed into the lua scope it |
827 | // gets garbage collected later. Also, the destroy hook is set to NULL so |
828 | // garbage collection has nothing to do. |
829 | as_rec* urec = as_rec_new(&urecord, &udf_record_hooks); |
830 | |
831 | as_val_reserve(urec); // for lua |
832 | |
833 | as_result result; |
834 | as_result_init(&result); |
835 | |
836 | int apply_rv = udf_apply_record(call, urec, &result); |
837 | |
838 | udf_optype optype = UDF_OPTYPE_NONE; |
839 | |
840 | if (apply_rv == 0) { |
841 | udf_finish(&urecord, rw, &optype); |
842 | process_result(&result, call, &rw->response_db); |
843 | } |
844 | else { |
845 | udf_record_close(&urecord); |
846 | |
847 | char* rs = as_module_err_string(apply_rv); |
848 | |
849 | tr->result_code = AS_ERR_UDF_EXECUTION; |
850 | process_failure_str(call, rs, strlen(rs), &rw->response_db); |
851 | cf_free(rs); |
852 | } |
853 | |
854 | update_lua_complete_stats(tr->origin, ns, optype, apply_rv, |
855 | result.is_success); |
856 | |
857 | as_result_destroy(&result); |
858 | udf_record_destroy(urec); |
859 | |
860 | return optype; |
861 | } |
862 | |
863 | |
864 | int |
865 | udf_apply_record(udf_call* call, as_rec* rec, as_result* result) |
866 | { |
867 | // timedout callback gives no 'udata' per UDF - use thread-local. |
868 | g_end_ns = ((udf_record*)rec->data)->tr->end_time; |
869 | |
870 | as_timer timer; |
871 | |
872 | static const as_timer_hooks udf_timer_hooks = { |
873 | .destroy = NULL, |
874 | .timedout = udf_timer_timedout, |
875 | .timeslice = udf_timer_timeslice |
876 | }; |
877 | |
878 | as_timer_init(&timer, NULL, &udf_timer_hooks); |
879 | |
880 | as_udf_context ctx = { |
881 | .as = &g_as_aerospike, |
882 | .timer = &timer, |
883 | .memtracker = NULL |
884 | }; |
885 | |
886 | return as_module_apply_record(&mod_lua, &ctx, call->def->filename, |
887 | call->def->function, rec, call->def->arglist, result); |
888 | } |
889 | |
890 | |
891 | void |
892 | udf_finish(udf_record* urecord, rw_request* rw, udf_optype* record_op) |
893 | { |
894 | *record_op = UDF_OPTYPE_READ; |
895 | |
896 | udf_optype final_op = udf_finish_op(urecord); |
897 | |
898 | if (final_op == UDF_OPTYPE_DELETE) { |
899 | *record_op = UDF_OPTYPE_DELETE; |
900 | urecord->tr->flags |= AS_TRANSACTION_FLAG_IS_DELETE; |
901 | } |
902 | else if (final_op == UDF_OPTYPE_WRITE) { |
903 | *record_op = UDF_OPTYPE_WRITE; |
904 | } |
905 | |
906 | udf_post_processing(urecord, rw, final_op); |
907 | } |
908 | |
909 | |
910 | udf_optype |
911 | udf_finish_op(udf_record* urecord) |
912 | { |
913 | if (udf_zero_bins_left(urecord)) { |
914 | // Amazingly, with respect to stored key, memory statistics work out |
915 | // correctly regardless of what this returns. |
916 | return udf_finish_delete(urecord); |
917 | } |
918 | |
919 | if ((urecord->flag & UDF_RECORD_FLAG_HAS_UPDATES) != 0) { |
920 | if ((urecord->flag & UDF_RECORD_FLAG_OPEN) == 0) { |
921 | cf_crash(AS_UDF, "updated record not open" ); |
922 | } |
923 | |
924 | return UDF_OPTYPE_WRITE; |
925 | } |
926 | |
927 | return UDF_OPTYPE_READ; |
928 | } |
929 | |
930 | |
// Wrap up the record after a successfully run UDF.
//
// For a write or delete: update the record's metadata in the index, copy the
// resulting generation, void-time and last-update-time to the transaction,
// finalize the stored key, adjust memory stats, and collect what XDR needs
// before the record is closed. In all cases the record is then closed, the
// pickle (if any) is handed to the rw_request, and for a write or delete the
// operation is written to the XDR pipe.
void
udf_post_processing(udf_record* urecord, rw_request* rw, udf_optype urecord_op)
{
	as_storage_rd* rd = urecord->rd;
	as_transaction* tr = urecord->tr;
	as_namespace* ns = rd->ns;
	as_record* r = rd->r;

	// Captured before the record is closed, used for XDR after it is closed.
	uint16_t generation = 0;
	uint16_t set_id = 0;
	xdr_dirty_bins dirty_bins;

	if (urecord_op == UDF_OPTYPE_WRITE || urecord_op == UDF_OPTYPE_DELETE) {
		as_msg* m = &tr->msgp->msg;

		// Convert message TTL special value if appropriate.
		if (m->record_ttl == TTL_DONT_UPDATE &&
				(urecord->flag & UDF_RECORD_FLAG_PREEXISTS) == 0) {
			m->record_ttl = TTL_NAMESPACE_DEFAULT;
		}

		update_metadata_in_index(tr, r);

		// TODO - old pickle - remove in "six months".
		if (as_exchange_min_compatibility_id() < 3) {
			rw->is_old_pickle = true;
			pickle_all(rd, rw);
		}

		// The response reports the record's new metadata.
		tr->generation = r->generation;
		tr->void_time = r->void_time;
		tr->last_update_time = r->last_update_time;

		// Store or drop the key as appropriate.
		as_record_finalize_key(r, ns, rd->key, rd->key_size);

		as_storage_record_adjust_mem_stats(rd, urecord->starting_memory_bytes);

		will_replicate(r, ns);

		// Collect information for XDR before closing the record.
		generation = plain_generation(r->generation, ns);
		set_id = as_index_get_set_id(r);

		// NOTE(review): dirty_bins is only initialized here - a write with a
		// NULL urecord->dirty would pass it uninitialized to xdr_write() below.
		// The visible caller always sets urecord->dirty.
		if (urecord->dirty && urecord_op == UDF_OPTYPE_WRITE) {
			xdr_clear_dirty_bins(&dirty_bins);
			xdr_copy_dirty_bins(urecord->dirty, &dirty_bins);
		}
	}

	// Will we need a pickle?
	// TODO - old pickle - remove condition in "six months".
	if (! rw->is_old_pickle) {
		// Only keep a pickle if there are replicas to send it to.
		rd->keep_pickle = rw->n_dest_nodes != 0;
	}

	// Close the record for all the cases.
	udf_record_close(urecord);

	// TODO - old pickle - remove condition in "six months".
	if (! rw->is_old_pickle) {
		// Yes, it's safe to use these urecord fields after udf_record_close().
		rw->pickle = urecord->pickle;
		rw->pickle_sz = urecord->pickle_sz;
	}

	// Write to XDR pipe.
	if (urecord_op == UDF_OPTYPE_WRITE) {
		xdr_write(tr->rsv.ns, &tr->keyd, generation, 0, XDR_OP_TYPE_WRITE,
				set_id, &dirty_bins);
	}
	else if (urecord_op == UDF_OPTYPE_DELETE) {
		// Deletes carry no generation or dirty bins - only the kind of delete.
		xdr_write(tr->rsv.ns, &tr->keyd, 0, 0,
				as_transaction_is_durable_delete(tr) ?
						XDR_OP_TYPE_DURABLE_DELETE : XDR_OP_TYPE_DROP,
				set_id, NULL);
	}
}
1009 | |
1010 | |
1011 | bool |
1012 | udf_timer_timedout(const as_timer* timer) |
1013 | { |
1014 | uint64_t now = cf_getns(); |
1015 | |
1016 | if (now < g_end_ns) { |
1017 | return false; |
1018 | } |
1019 | |
1020 | cf_warning(AS_UDF, "UDF timed out %lu ms ago" , (now - g_end_ns) / 1000000); |
1021 | |
1022 | return true; |
1023 | } |
1024 | |
1025 | |
1026 | uint64_t |
1027 | udf_timer_timeslice(const as_timer* timer) |
1028 | { |
1029 | uint64_t now = cf_getns(); |
1030 | |
1031 | return g_end_ns > now ? (g_end_ns - now) / 1000000 : 1; |
1032 | } |
1033 | |
1034 | |
1035 | //========================================================== |
1036 | // Local helpers - statistics. |
1037 | // |
1038 | |
1039 | void |
1040 | update_lua_complete_stats(uint8_t origin, as_namespace* ns, udf_optype op, |
1041 | int ret, bool is_success) |
1042 | { |
1043 | switch (origin) { |
1044 | case FROM_CLIENT: |
1045 | if (ret == 0 && is_success) { |
1046 | if (op == UDF_OPTYPE_READ) { |
1047 | cf_atomic64_incr(&ns->n_client_lang_read_success); |
1048 | } |
1049 | else if (op == UDF_OPTYPE_DELETE) { |
1050 | cf_atomic64_incr(&ns->n_client_lang_delete_success); |
1051 | } |
1052 | else if (op == UDF_OPTYPE_WRITE) { |
1053 | cf_atomic64_incr(&ns->n_client_lang_write_success); |
1054 | } |
1055 | } |
1056 | else { |
1057 | cf_info(AS_UDF, "lua error, ret:%d" , ret); |
1058 | cf_atomic64_incr(&ns->n_client_lang_error); |
1059 | } |
1060 | break; |
1061 | case FROM_PROXY: |
1062 | if (ret == 0 && is_success) { |
1063 | if (op == UDF_OPTYPE_READ) { |
1064 | cf_atomic64_incr(&ns->n_from_proxy_lang_read_success); |
1065 | } |
1066 | else if (op == UDF_OPTYPE_DELETE) { |
1067 | cf_atomic64_incr(&ns->n_from_proxy_lang_delete_success); |
1068 | } |
1069 | else if (op == UDF_OPTYPE_WRITE) { |
1070 | cf_atomic64_incr(&ns->n_from_proxy_lang_write_success); |
1071 | } |
1072 | } |
1073 | else { |
1074 | cf_info(AS_UDF, "lua error, ret:%d" , ret); |
1075 | cf_atomic64_incr(&ns->n_from_proxy_lang_error); |
1076 | } |
1077 | break; |
1078 | case FROM_IUDF: |
1079 | if (ret == 0 && is_success) { |
1080 | if (op == UDF_OPTYPE_READ) { |
1081 | // Note - this would be weird, since there's nowhere for a |
1082 | // response to go in our current UDF scans & queries. |
1083 | cf_atomic64_incr(&ns->n_udf_sub_lang_read_success); |
1084 | } |
1085 | else if (op == UDF_OPTYPE_DELETE) { |
1086 | cf_atomic64_incr(&ns->n_udf_sub_lang_delete_success); |
1087 | } |
1088 | else if (op == UDF_OPTYPE_WRITE) { |
1089 | cf_atomic64_incr(&ns->n_udf_sub_lang_write_success); |
1090 | } |
1091 | } |
1092 | else { |
1093 | cf_info(AS_UDF, "lua error, ret:%d" , ret); |
1094 | cf_atomic64_incr(&ns->n_udf_sub_lang_error); |
1095 | } |
1096 | break; |
1097 | default: |
1098 | cf_crash(AS_UDF, "unexpected transaction origin %u" , origin); |
1099 | break; |
1100 | } |
1101 | } |
1102 | |
1103 | |
1104 | //========================================================== |
1105 | // Local helpers - construct response to be sent to origin. |
1106 | // |
1107 | |
1108 | void |
1109 | process_failure_str(udf_call* call, const char* err_str, size_t len, |
1110 | cf_dyn_buf* db) |
1111 | { |
1112 | if (! err_str) { |
1113 | // Better than sending an as_string with null value. |
1114 | process_failure(call, NULL, db); |
1115 | return; |
1116 | } |
1117 | |
1118 | as_string stack_s; |
1119 | as_string_init_wlen(&stack_s, (char*)err_str, len, false); |
1120 | |
1121 | process_failure(call, as_string_toval(&stack_s), db); |
1122 | } |
1123 | |
1124 | |
1125 | void |
1126 | process_result(const as_result* result, udf_call* call, cf_dyn_buf* db) |
1127 | { |
1128 | as_val* val = result->value; |
1129 | |
1130 | if (result->is_success) { |
1131 | process_success(call, val, db); |
1132 | return; |
1133 | } |
1134 | |
1135 | // Failures... |
1136 | |
1137 | if (as_val_type(val) == AS_STRING) { |
1138 | call->tr->result_code = AS_ERR_UDF_EXECUTION; |
1139 | process_failure(call, val, db); |
1140 | return; |
1141 | } |
1142 | |
1143 | char lua_err_str[1024]; |
1144 | size_t len = (size_t)sprintf(lua_err_str, |
1145 | "%s:0: in function %s() - error() argument type not handled" , |
1146 | call->def->filename, call->def->function); |
1147 | |
1148 | call->tr->result_code = AS_ERR_UDF_EXECUTION; |
1149 | process_failure_str(call, lua_err_str, len, db); |
1150 | } |
1151 | |
1152 | |
1153 | void |
1154 | process_response(udf_call* call, bool success, const as_val* val, |
1155 | cf_dyn_buf* db) |
1156 | { |
1157 | // No response for background (internal) UDF. |
1158 | if (call->def->type == AS_UDF_OP_BACKGROUND) { |
1159 | return; |
1160 | } |
1161 | |
1162 | as_transaction* tr = call->tr; |
1163 | |
1164 | // Note - this function quietly handles a null val. The response call will |
1165 | // be given a bin with a name but not 'in use', and it does the right thing. |
1166 | |
1167 | size_t msg_sz = 0; |
1168 | |
1169 | db->buf = (uint8_t *)as_msg_make_val_response(success, val, tr->result_code, |
1170 | tr->generation, tr->void_time, as_transaction_trid(tr), &msg_sz); |
1171 | |
1172 | db->is_stack = false; |
1173 | db->alloc_sz = msg_sz; |
1174 | db->used_sz = msg_sz; |
1175 | } |
1176 | |