1 | /* |
2 | * scan.c |
3 | * |
4 | * Copyright (C) 2015 Aerospike, Inc. |
5 | * |
6 | * Portions may be licensed to Aerospike, Inc. under one or more contributor |
7 | * license agreements. |
8 | * |
9 | * This program is free software: you can redistribute it and/or modify it under |
10 | * the terms of the GNU Affero General Public License as published by the Free |
11 | * Software Foundation, either version 3 of the License, or (at your option) any |
12 | * later version. |
13 | * |
14 | * This program is distributed in the hope that it will be useful, but WITHOUT |
15 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
16 | * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more |
17 | * details. |
18 | * |
19 | * You should have received a copy of the GNU Affero General Public License |
20 | * along with this program. If not, see http://www.gnu.org/licenses/ |
21 | */ |
22 | |
23 | //============================================================================== |
24 | // Includes. |
25 | // |
26 | |
27 | #include "base/scan.h" |
28 | |
29 | #include <errno.h> |
30 | #include <fcntl.h> |
31 | #include <stdbool.h> |
32 | #include <stddef.h> |
33 | #include <stdint.h> |
34 | #include <stdio.h> |
35 | #include <string.h> |
36 | #include <unistd.h> |
37 | |
38 | #include "aerospike/as_atomic.h" |
39 | #include "aerospike/as_list.h" |
40 | #include "aerospike/as_module.h" |
41 | #include "aerospike/as_string.h" |
42 | #include "aerospike/as_val.h" |
43 | #include "citrusleaf/alloc.h" |
44 | #include "citrusleaf/cf_clock.h" |
45 | #include "citrusleaf/cf_digest.h" |
46 | #include "citrusleaf/cf_ll.h" |
47 | #include "citrusleaf/cf_vector.h" |
48 | |
49 | #include "cf_mutex.h" |
50 | #include "cf_thread.h" |
51 | #include "dynbuf.h" |
52 | #include "fault.h" |
53 | #include "socket.h" |
54 | |
55 | #include "base/aggr.h" |
56 | #include "base/cfg.h" |
57 | #include "base/datamodel.h" |
58 | #include "base/index.h" |
59 | #include "base/monitor.h" |
60 | #include "base/predexp.h" |
61 | #include "base/proto.h" |
62 | #include "base/scan_job.h" |
63 | #include "base/scan_manager.h" |
64 | #include "base/secondary_index.h" |
65 | #include "base/service.h" |
66 | #include "base/transaction.h" |
67 | #include "fabric/exchange.h" |
68 | #include "fabric/partition.h" |
69 | #include "transaction/rw_utils.h" |
70 | #include "transaction/udf.h" |
71 | #include "transaction/write.h" |
72 | |
73 | |
74 | |
75 | //============================================================================== |
76 | // Typedefs and forward declarations. |
77 | // |
78 | |
79 | //---------------------------------------------------------- |
80 | // Scan types. |
81 | // |
82 | |
// Kinds of scan a client request can ask for - selected by get_scan_type().
typedef enum {
	SCAN_TYPE_BASIC = 0,
	SCAN_TYPE_AGGR = 1,
	SCAN_TYPE_UDF_BG = 2,
	SCAN_TYPE_OPS_BG = 3,

	SCAN_TYPE_UNKNOWN = -1
} scan_type;

// Map a scan type to the name reported in job monitoring output (used by
// the *_scan_job_info() functions). Any value outside the known range,
// including SCAN_TYPE_UNKNOWN, yields "?".
static inline const char*
scan_type_str(scan_type type)
{
	static const char* const type_names[] = {
		[SCAN_TYPE_BASIC] = "basic",
		[SCAN_TYPE_AGGR] = "aggregation",
		[SCAN_TYPE_UDF_BG] = "background-udf",
		[SCAN_TYPE_OPS_BG] = "background-ops"
	};

	if (type < SCAN_TYPE_BASIC || type > SCAN_TYPE_OPS_BG) {
		return "?";
	}

	return type_names[type];
}
108 | |
109 | //---------------------------------------------------------- |
110 | // scan_job - derived classes' public methods. |
111 | // |
112 | |
// Per-type job constructors, dispatched from as_scan() according to
// get_scan_type(). The basic and aggregation versions visible in this file
// return AS_OK once the job is handed to the scan manager, otherwise an
// error code.
int basic_scan_job_start(as_transaction* tr, as_namespace* ns, uint16_t set_id);
int aggr_scan_job_start(as_transaction* tr, as_namespace* ns, uint16_t set_id);
int udf_bg_scan_job_start(as_transaction* tr, as_namespace* ns, uint16_t set_id);
int ops_bg_scan_job_start(as_transaction* tr, as_namespace* ns, uint16_t set_id);
117 | |
118 | //---------------------------------------------------------- |
119 | // Non-class-specific utilities. |
120 | // |
121 | |
// Values decoded from the two-byte AS_MSG_FIELD_TYPE_SCAN_OPTIONS field by
// get_scan_options(). Callers pre-fill defaults (e.g. sample_pct = 100)
// because the field is optional.
typedef struct scan_options_s {
	int priority; // legacy priority - converted by convert_old_priority()
	bool fail_on_cluster_change;
	uint32_t sample_pct; // percent of each partition's records to visit
} scan_options;
127 | |
// Helpers shared by all scan job types. The bool-returning get_scan_*()
// readers return true when their message field is absent (output left
// untouched) or parsed successfully, and false when the field is malformed.
int get_scan_set_id(as_transaction* tr, as_namespace* ns, uint16_t* p_set_id);
scan_type get_scan_type(as_transaction* tr);
bool get_scan_options(as_transaction* tr, scan_options* options);
bool get_scan_rps(as_transaction* tr, uint32_t* rps);
void convert_old_priority(int old_priority, uint32_t* rps);
bool validate_background_scan_rps(const as_namespace* ns, uint32_t* rps);
bool get_scan_socket_timeout(as_transaction* tr, uint32_t* timeout);
bool get_scan_predexp(as_transaction* tr, predexp_eval_t** p_predexp);
// Returns bytes sent (proto header + payload), or 0 on failure.
size_t send_blocking_response_chunk(as_file_handle* fd_h, uint8_t* buf, size_t size, int32_t timeout);
static inline bool excluded_set(as_index* r, uint16_t set_id);
138 | |
139 | |
140 | |
141 | //============================================================================== |
142 | // Constants. |
143 | // |
144 | |
// Records-per-second limit given to "low priority" scans from old clients
// that send a priority instead of an rps value (see convert_old_priority()).
#define LOW_PRIORITY_RPS 5000

// Initial capacity of each slice's response buffer (2 MiB).
const size_t INIT_BUF_BUILDER_SIZE = 2 * 1024 * 1024;

// A slice flushes its buffered response to the client once it grows past
// this many bytes (1 MiB).
const size_t SCAN_CHUNK_LIMIT = 1024 * 1024;

// NOTE(review): not referenced in the visible part of this file - confirm
// whether it is still used.
#define MAX_ACTIVE_TRANSACTIONS 200
151 | |
152 | |
153 | |
154 | //============================================================================== |
155 | // Public API. |
156 | // |
157 | |
// Module start-up: all scan state lives in the scan manager.
void
as_scan_init(void)
{
	as_scan_manager_init();
	return;
}
163 | |
164 | int |
165 | as_scan(as_transaction* tr, as_namespace* ns) |
166 | { |
167 | int result; |
168 | uint16_t set_id = INVALID_SET_ID; |
169 | |
170 | if ((result = get_scan_set_id(tr, ns, &set_id)) != AS_OK) { |
171 | return result; |
172 | } |
173 | |
174 | switch (get_scan_type(tr)) { |
175 | case SCAN_TYPE_BASIC: |
176 | result = basic_scan_job_start(tr, ns, set_id); |
177 | break; |
178 | case SCAN_TYPE_AGGR: |
179 | result = aggr_scan_job_start(tr, ns, set_id); |
180 | break; |
181 | case SCAN_TYPE_UDF_BG: |
182 | result = udf_bg_scan_job_start(tr, ns, set_id); |
183 | break; |
184 | case SCAN_TYPE_OPS_BG: |
185 | result = ops_bg_scan_job_start(tr, ns, set_id); |
186 | break; |
187 | default: |
188 | cf_warning(AS_SCAN, "can't identify scan type" ); |
189 | result = AS_ERR_PARAMETER; |
190 | break; |
191 | } |
192 | |
193 | return result; |
194 | } |
195 | |
// Trim the list of finished jobs kept by the scan manager.
void
as_scan_limit_finished_jobs(void)
{
	as_scan_manager_limit_finished_jobs();
	return;
}
201 | |
// Number of scan jobs the scan manager currently considers active.
int
as_scan_get_active_job_count(void)
{
	int n_active = as_scan_manager_get_active_job_count();

	return n_active;
}
207 | |
208 | int |
209 | as_scan_list(char* name, cf_dyn_buf* db) |
210 | { |
211 | (void)name; |
212 | |
213 | as_mon_info_cmd(AS_MON_MODULES[SCAN_MOD], NULL, 0, 0, db); |
214 | return 0; |
215 | } |
216 | |
217 | as_mon_jobstat* |
218 | as_scan_get_jobstat(uint64_t trid) |
219 | { |
220 | return as_scan_manager_get_job_info(trid); |
221 | } |
222 | |
223 | as_mon_jobstat* |
224 | as_scan_get_jobstat_all(int* size) |
225 | { |
226 | return as_scan_manager_get_info(size); |
227 | } |
228 | |
// Abort the job identified by trid. Returns 0 if the scan manager accepted
// the abort, -1 otherwise.
int
as_scan_abort(uint64_t trid)
{
	if (as_scan_manager_abort_job(trid)) {
		return 0;
	}

	return -1;
}
234 | |
// Abort every job; returns whatever the scan manager reports.
int
as_scan_abort_all(void)
{
	int rv = as_scan_manager_abort_all_jobs();

	return rv;
}
240 | |
241 | |
242 | //============================================================================== |
243 | // Non-class-specific utilities. |
244 | // |
245 | |
246 | int |
247 | get_scan_set_id(as_transaction* tr, as_namespace* ns, uint16_t* p_set_id) |
248 | { |
249 | uint16_t set_id = INVALID_SET_ID; |
250 | as_msg_field* f = as_transaction_has_set(tr) ? |
251 | as_msg_field_get(&tr->msgp->msg, AS_MSG_FIELD_TYPE_SET) : NULL; |
252 | |
253 | if (f && as_msg_field_get_value_sz(f) != 0) { |
254 | uint32_t set_name_len = as_msg_field_get_value_sz(f); |
255 | char set_name[set_name_len + 1]; |
256 | |
257 | memcpy(set_name, f->data, set_name_len); |
258 | set_name[set_name_len] = '\0'; |
259 | set_id = as_namespace_get_set_id(ns, set_name); |
260 | |
261 | if (set_id == INVALID_SET_ID) { |
262 | cf_warning(AS_SCAN, "scan msg from %s has unrecognized set %s" , |
263 | tr->from.proto_fd_h->client, set_name); |
264 | return AS_ERR_NOT_FOUND; |
265 | } |
266 | } |
267 | |
268 | *p_set_id = set_id; |
269 | |
270 | return AS_OK; |
271 | } |
272 | |
273 | scan_type |
274 | get_scan_type(as_transaction* tr) |
275 | { |
276 | if (! as_transaction_is_udf(tr)) { |
277 | return (tr->msgp->msg.info2 & AS_MSG_INFO2_WRITE) != 0 ? |
278 | SCAN_TYPE_OPS_BG : SCAN_TYPE_BASIC; |
279 | } |
280 | |
281 | as_msg_field* udf_op_f = as_msg_field_get(&tr->msgp->msg, |
282 | AS_MSG_FIELD_TYPE_UDF_OP); |
283 | |
284 | if (udf_op_f && *udf_op_f->data == (uint8_t)AS_UDF_OP_AGGREGATE) { |
285 | return SCAN_TYPE_AGGR; |
286 | } |
287 | |
288 | if (udf_op_f && *udf_op_f->data == (uint8_t)AS_UDF_OP_BACKGROUND) { |
289 | return SCAN_TYPE_UDF_BG; |
290 | } |
291 | |
292 | return SCAN_TYPE_UNKNOWN; |
293 | } |
294 | |
295 | bool |
296 | get_scan_options(as_transaction* tr, scan_options* options) |
297 | { |
298 | if (! as_transaction_has_scan_options(tr)) { |
299 | return true; |
300 | } |
301 | |
302 | as_msg_field* f = as_msg_field_get(&tr->msgp->msg, |
303 | AS_MSG_FIELD_TYPE_SCAN_OPTIONS); |
304 | |
305 | if (as_msg_field_get_value_sz(f) != 2) { |
306 | cf_warning(AS_SCAN, "scan msg options field size not 2" ); |
307 | return false; |
308 | } |
309 | |
310 | options->priority = AS_MSG_FIELD_SCAN_PRIORITY(f->data[0]); |
311 | options->fail_on_cluster_change = |
312 | (AS_MSG_FIELD_SCAN_FAIL_ON_CLUSTER_CHANGE & f->data[0]) != 0; |
313 | options->sample_pct = f->data[1]; |
314 | |
315 | return true; |
316 | } |
317 | |
318 | bool |
319 | get_scan_rps(as_transaction* tr, uint32_t* rps) |
320 | { |
321 | if (! as_transaction_has_recs_per_sec(tr)) { |
322 | return true; |
323 | } |
324 | |
325 | as_msg_field* f = as_msg_field_get(&tr->msgp->msg, |
326 | AS_MSG_FIELD_TYPE_RECS_PER_SEC); |
327 | |
328 | if (as_msg_field_get_value_sz(f) != 4) { |
329 | cf_warning(AS_SCAN, "scan recs-per-sec field size not 4" ); |
330 | return false; |
331 | } |
332 | |
333 | *rps = cf_swap_from_be32(*(uint32_t*)f->data); |
334 | |
335 | return true; |
336 | } |
337 | |
338 | void |
339 | convert_old_priority(int old_priority, uint32_t* rps) |
340 | { |
341 | if (old_priority != 0 && *rps != 0) { |
342 | cf_warning(AS_SCAN, "unexpected - scan has rps %u and priority %d" , |
343 | *rps, old_priority); |
344 | return; |
345 | } |
346 | |
347 | if (old_priority == 1 && *rps == 0) { |
348 | cf_info(AS_SCAN, "low-priority scan from old client will use %u rps" , |
349 | LOW_PRIORITY_RPS); |
350 | |
351 | *rps = LOW_PRIORITY_RPS; |
352 | } |
353 | } |
354 | |
355 | bool |
356 | validate_background_scan_rps(const as_namespace* ns, uint32_t* rps) |
357 | { |
358 | if (*rps > ns->background_scan_max_rps) { |
359 | cf_warning(AS_SCAN, "scan rps %u exceeds 'background-scan-max-rps' %u" , |
360 | *rps, ns->background_scan_max_rps); |
361 | return false; |
362 | } |
363 | |
364 | if (*rps == 0) { |
365 | *rps = ns->background_scan_max_rps; |
366 | } |
367 | |
368 | return true; |
369 | } |
370 | |
371 | bool |
372 | get_scan_socket_timeout(as_transaction* tr, uint32_t* timeout) |
373 | { |
374 | if (! as_transaction_has_socket_timeout(tr)) { |
375 | return true; |
376 | } |
377 | |
378 | as_msg_field* f = as_msg_field_get(&tr->msgp->msg, |
379 | AS_MSG_FIELD_TYPE_SOCKET_TIMEOUT); |
380 | |
381 | if (as_msg_field_get_value_sz(f) != 4) { |
382 | cf_warning(AS_SCAN, "scan socket timeout field size not 4" ); |
383 | return false; |
384 | } |
385 | |
386 | *timeout = cf_swap_from_be32(*(uint32_t*)f->data); |
387 | |
388 | return true; |
389 | } |
390 | |
391 | bool |
392 | get_scan_predexp(as_transaction* tr, predexp_eval_t** p_predexp) |
393 | { |
394 | if (! as_transaction_has_predexp(tr)) { |
395 | return true; |
396 | } |
397 | |
398 | as_msg_field* f = as_msg_field_get(&tr->msgp->msg, |
399 | AS_MSG_FIELD_TYPE_PREDEXP); |
400 | |
401 | *p_predexp = predexp_build(f); |
402 | |
403 | return *p_predexp != NULL; |
404 | } |
405 | |
406 | size_t |
407 | send_blocking_response_chunk(as_file_handle* fd_h, uint8_t* buf, size_t size, |
408 | int32_t timeout) |
409 | { |
410 | cf_socket* sock = &fd_h->sock; |
411 | as_proto proto; |
412 | |
413 | proto.version = PROTO_VERSION; |
414 | proto.type = PROTO_TYPE_AS_MSG; |
415 | proto.sz = size; |
416 | as_proto_swap(&proto); |
417 | |
418 | if (cf_socket_send_all(sock, (uint8_t*)&proto, sizeof(as_proto), |
419 | MSG_NOSIGNAL | MSG_MORE, timeout) < 0) { |
420 | cf_warning(AS_SCAN, "error sending to %s - fd %d %s" , fd_h->client, |
421 | CSFD(sock), cf_strerror(errno)); |
422 | return 0; |
423 | } |
424 | |
425 | if (cf_socket_send_all(sock, buf, size, MSG_NOSIGNAL, timeout) < 0) { |
426 | cf_warning(AS_SCAN, "error sending to %s - fd %d sz %lu %s" , |
427 | fd_h->client, CSFD(sock), size, cf_strerror(errno)); |
428 | return 0; |
429 | } |
430 | |
431 | return sizeof(as_proto) + size; |
432 | } |
433 | |
434 | static inline bool |
435 | excluded_set(as_index* r, uint16_t set_id) |
436 | { |
437 | return set_id != INVALID_SET_ID && set_id != as_index_get_set_id(r); |
438 | } |
439 | |
440 | static inline void |
441 | throttle_sleep(as_scan_job* _job) |
442 | { |
443 | uint32_t sleep_us = as_scan_job_throttle(_job); |
444 | |
445 | if (sleep_us != 0) { |
446 | usleep(sleep_us); |
447 | } |
448 | } |
449 | |
450 | |
451 | |
452 | //============================================================================== |
453 | // conn_scan_job derived class implementation - not final class. |
454 | // |
455 | |
456 | //---------------------------------------------------------- |
457 | // conn_scan_job typedefs and forward declarations. |
458 | // |
459 | |
// Intermediate base for scan jobs that stream results back over the client
// connection. basic_scan_job and aggr_scan_job embed it as their first member.
typedef struct conn_scan_job_s {
	// Base object must be first:
	as_scan_job _base;

	// Derived class data:
	cf_mutex fd_lock; // held around sends and release of fd_h
	as_file_handle* fd_h; // NULL once the connection has been released
	int32_t fd_timeout; // send timeout; a client value of 0 is stored as -1

	uint64_t net_io_bytes; // total bytes sent, reported via job info
} conn_scan_job;

void conn_scan_job_own_fd(conn_scan_job* job, as_file_handle* fd_h, uint32_t timeout);
void conn_scan_job_disown_fd(conn_scan_job* job);
void conn_scan_job_finish(conn_scan_job* job);
bool conn_scan_job_send_response(conn_scan_job* job, uint8_t* buf, size_t size);
void conn_scan_job_release_fd(conn_scan_job* job, bool force_close);
void conn_scan_job_info(conn_scan_job* job, as_mon_jobstat* stat);
478 | |
479 | //---------------------------------------------------------- |
480 | // conn_scan_job API. |
481 | // |
482 | |
483 | void |
484 | conn_scan_job_own_fd(conn_scan_job* job, as_file_handle* fd_h, uint32_t timeout) |
485 | { |
486 | cf_mutex_init(&job->fd_lock); |
487 | |
488 | job->fd_h = fd_h; |
489 | job->fd_timeout = timeout == 0 ? -1 : (int32_t)timeout; |
490 | |
491 | job->net_io_bytes = 0; |
492 | } |
493 | |
494 | void |
495 | conn_scan_job_disown_fd(conn_scan_job* job) |
496 | { |
497 | // Just undo conn_scan_job_own_fd(), nothing more. |
498 | |
499 | cf_mutex_destroy(&job->fd_lock); |
500 | } |
501 | |
502 | void |
503 | conn_scan_job_finish(conn_scan_job* job) |
504 | { |
505 | as_scan_job* _job = (as_scan_job*)job; |
506 | |
507 | if (job->fd_h) { |
508 | // TODO - perhaps reflect in monitor if send fails? |
509 | size_t size_sent = as_msg_send_fin_timeout(&job->fd_h->sock, |
510 | _job->abandoned, job->fd_timeout); |
511 | |
512 | job->net_io_bytes += size_sent; |
513 | conn_scan_job_release_fd(job, size_sent == 0); |
514 | } |
515 | |
516 | cf_mutex_destroy(&job->fd_lock); |
517 | } |
518 | |
519 | bool |
520 | conn_scan_job_send_response(conn_scan_job* job, uint8_t* buf, size_t size) |
521 | { |
522 | as_scan_job* _job = (as_scan_job*)job; |
523 | |
524 | cf_mutex_lock(&job->fd_lock); |
525 | |
526 | if (! job->fd_h) { |
527 | cf_mutex_unlock(&job->fd_lock); |
528 | // Job already abandoned. |
529 | return false; |
530 | } |
531 | |
532 | size_t size_sent = send_blocking_response_chunk(job->fd_h, buf, size, |
533 | job->fd_timeout); |
534 | |
535 | if (size_sent == 0) { |
536 | int reason = errno == ETIMEDOUT ? |
537 | AS_SCAN_ERR_RESPONSE_TIMEOUT : AS_SCAN_ERR_RESPONSE_ERROR; |
538 | |
539 | conn_scan_job_release_fd(job, true); |
540 | cf_mutex_unlock(&job->fd_lock); |
541 | as_scan_manager_abandon_job(_job, reason); |
542 | return false; |
543 | } |
544 | |
545 | job->net_io_bytes += size_sent; |
546 | |
547 | cf_mutex_unlock(&job->fd_lock); |
548 | return true; |
549 | } |
550 | |
551 | void |
552 | conn_scan_job_release_fd(conn_scan_job* job, bool force_close) |
553 | { |
554 | job->fd_h->last_used = cf_getns(); |
555 | as_end_of_transaction(job->fd_h, force_close); |
556 | job->fd_h = NULL; |
557 | } |
558 | |
559 | void |
560 | conn_scan_job_info(conn_scan_job* job, as_mon_jobstat* stat) |
561 | { |
562 | stat->net_io_bytes = job->net_io_bytes; |
563 | stat->socket_timeout = job->fd_timeout; |
564 | } |
565 | |
566 | |
567 | |
568 | //============================================================================== |
569 | // basic_scan_job derived class implementation. |
570 | // |
571 | |
572 | //---------------------------------------------------------- |
573 | // basic_scan_job typedefs and forward declarations. |
574 | // |
575 | |
// A plain scan that streams matching records back to the client.
typedef struct basic_scan_job_s {
	// Base object must be first:
	conn_scan_job _base;

	// Derived class data:
	uint64_t cluster_key; // cluster key when the job started
	bool fail_on_cluster_change;
	bool no_bin_data; // metadata-only (AS_MSG_INFO1_GET_NO_BINS)
	uint32_t sample_pct; // percent of each partition to visit
	predexp_eval_t* predexp; // optional filter, NULL if none
	cf_vector* bin_names; // requested bin names, NULL means all bins
} basic_scan_job;

void basic_scan_job_slice(as_scan_job* _job, as_partition_reservation* rsv);
void basic_scan_job_finish(as_scan_job* _job);
void basic_scan_job_destroy(as_scan_job* _job);
void basic_scan_job_info(as_scan_job* _job, as_mon_jobstat* stat);

// Positional initializer - order must match the as_scan_vtable members.
const as_scan_vtable basic_scan_job_vtable = {
	basic_scan_job_slice,
	basic_scan_job_finish,
	basic_scan_job_destroy,
	basic_scan_job_info
};

// Per-partition state passed to basic_scan_job_reduce_cb(). bb_r points at
// the slice's buf-builder pointer so the callback works on the current buffer.
typedef struct basic_scan_slice_s {
	basic_scan_job* job;
	cf_buf_builder** bb_r;
} basic_scan_slice;

void basic_scan_job_reduce_cb(as_index_ref* r_ref, void* udata);
bool basic_scan_predexp_filter_meta(const basic_scan_job* job, const as_record* r, predexp_eval_t** predexp);
cf_vector* bin_names_from_op(as_msg* m, int* result);
609 | |
610 | //---------------------------------------------------------- |
611 | // basic_scan_job public API. |
612 | // |
613 | |
614 | int |
615 | basic_scan_job_start(as_transaction* tr, as_namespace* ns, uint16_t set_id) |
616 | { |
617 | basic_scan_job* job = cf_malloc(sizeof(basic_scan_job)); |
618 | as_scan_job* _job = (as_scan_job*)job; |
619 | |
620 | scan_options options = { .sample_pct = 100 }; |
621 | uint32_t rps = 0; |
622 | uint32_t timeout = CF_SOCKET_TIMEOUT; |
623 | predexp_eval_t* predexp = NULL; |
624 | |
625 | if (! get_scan_options(tr, &options) || ! get_scan_rps(tr, &rps) || |
626 | ! get_scan_socket_timeout(tr, &timeout) || |
627 | ! get_scan_predexp(tr, &predexp)) { |
628 | cf_warning(AS_SCAN, "basic scan job failed msg field processing" ); |
629 | cf_free(job); |
630 | return AS_ERR_PARAMETER; |
631 | } |
632 | |
633 | convert_old_priority(options.priority, &rps); |
634 | |
635 | as_scan_job_init(_job, &basic_scan_job_vtable, as_transaction_trid(tr), ns, |
636 | set_id, rps, tr->from.proto_fd_h->client); |
637 | |
638 | job->cluster_key = as_exchange_cluster_key(); |
639 | job->fail_on_cluster_change = options.fail_on_cluster_change; |
640 | job->no_bin_data = (tr->msgp->msg.info1 & AS_MSG_INFO1_GET_NO_BINS) != 0; |
641 | job->sample_pct = options.sample_pct; |
642 | job->predexp = predexp; |
643 | |
644 | int result; |
645 | |
646 | job->bin_names = bin_names_from_op(&tr->msgp->msg, &result); |
647 | |
648 | if (! job->bin_names && result != AS_OK) { |
649 | as_scan_job_destroy(_job); |
650 | return result; |
651 | } |
652 | |
653 | if (job->fail_on_cluster_change && |
654 | (ns->migrate_tx_partitions_remaining != 0 || |
655 | ns->migrate_rx_partitions_remaining != 0)) { |
656 | cf_warning(AS_SCAN, "basic scan job not started - migration" ); |
657 | as_scan_job_destroy(_job); |
658 | return AS_ERR_CLUSTER_KEY_MISMATCH; |
659 | } |
660 | |
661 | // Take ownership of socket from transaction. |
662 | conn_scan_job_own_fd((conn_scan_job*)job, tr->from.proto_fd_h, timeout); |
663 | |
664 | cf_info(AS_SCAN, "starting basic scan job %lu {%s:%s} rps %u sample-pct %u%s%s socket-timeout %u from %s" , |
665 | _job->trid, ns->name, as_namespace_get_set_name(ns, set_id), |
666 | _job->rps, job->sample_pct, |
667 | job->no_bin_data ? ", metadata-only" : "" , |
668 | job->fail_on_cluster_change ? ", fail-on-cluster-change" : "" , |
669 | timeout, _job->client); |
670 | |
671 | if ((result = as_scan_manager_start_job(_job)) != 0) { |
672 | cf_warning(AS_SCAN, "basic scan job %lu failed to start (%d)" , |
673 | _job->trid, result); |
674 | conn_scan_job_disown_fd((conn_scan_job*)job); |
675 | as_scan_job_destroy(_job); |
676 | return result; |
677 | } |
678 | |
679 | return AS_OK; |
680 | } |
681 | |
682 | //---------------------------------------------------------- |
683 | // basic_scan_job mandatory scan_job interface. |
684 | // |
685 | |
686 | void |
687 | basic_scan_job_slice(as_scan_job* _job, as_partition_reservation* rsv) |
688 | { |
689 | basic_scan_job* job = (basic_scan_job*)_job; |
690 | as_index_tree* tree = rsv->tree; |
691 | cf_buf_builder* bb = cf_buf_builder_create(INIT_BUF_BUILDER_SIZE); |
692 | uint64_t slice_start = cf_getms(); |
693 | basic_scan_slice slice = { job, &bb }; |
694 | |
695 | if (job->sample_pct == 100) { |
696 | as_index_reduce_live(tree, basic_scan_job_reduce_cb, (void*)&slice); |
697 | } |
698 | else { |
699 | uint64_t sample_count = |
700 | ((as_index_tree_size(tree) * job->sample_pct) / 100); |
701 | |
702 | as_index_reduce_partial_live(tree, sample_count, |
703 | basic_scan_job_reduce_cb, (void*)&slice); |
704 | } |
705 | |
706 | if (bb->used_sz != 0) { |
707 | conn_scan_job_send_response((conn_scan_job*)job, bb->buf, bb->used_sz); |
708 | } |
709 | |
710 | cf_buf_builder_free(bb); |
711 | |
712 | cf_detail(AS_SCAN, "%s:%u basic scan job %lu in thread %d took %lu ms" , |
713 | rsv->ns->name, rsv->p->id, _job->trid, cf_thread_sys_tid(), |
714 | cf_getms() - slice_start); |
715 | } |
716 | |
717 | void |
718 | basic_scan_job_finish(as_scan_job* _job) |
719 | { |
720 | conn_scan_job_finish((conn_scan_job*)_job); |
721 | |
722 | switch (_job->abandoned) { |
723 | case 0: |
724 | as_incr_uint64(&_job->ns->n_scan_basic_complete); |
725 | break; |
726 | case AS_SCAN_ERR_USER_ABORT: |
727 | as_incr_uint64(&_job->ns->n_scan_basic_abort); |
728 | break; |
729 | case AS_SCAN_ERR_UNKNOWN: |
730 | case AS_SCAN_ERR_CLUSTER_KEY: |
731 | case AS_SCAN_ERR_RESPONSE_ERROR: |
732 | case AS_SCAN_ERR_RESPONSE_TIMEOUT: |
733 | default: |
734 | as_incr_uint64(&_job->ns->n_scan_basic_error); |
735 | break; |
736 | } |
737 | |
738 | cf_info(AS_SCAN, "finished basic scan job %lu (%d)" , _job->trid, |
739 | _job->abandoned); |
740 | } |
741 | |
742 | void |
743 | basic_scan_job_destroy(as_scan_job* _job) |
744 | { |
745 | basic_scan_job* job = (basic_scan_job*)_job; |
746 | |
747 | if (job->bin_names) { |
748 | cf_vector_destroy(job->bin_names); |
749 | } |
750 | |
751 | predexp_destroy(job->predexp); |
752 | } |
753 | |
754 | void |
755 | basic_scan_job_info(as_scan_job* _job, as_mon_jobstat* stat) |
756 | { |
757 | strcpy(stat->job_type, scan_type_str(SCAN_TYPE_BASIC)); |
758 | conn_scan_job_info((conn_scan_job*)_job, stat); |
759 | } |
760 | |
761 | //---------------------------------------------------------- |
762 | // basic_scan_job utilities. |
763 | // |
764 | |
// Per-record callback for basic scans, passed to as_index_reduce_live() /
// as_index_reduce_partial_live() by basic_scan_job_slice().
//
// Every path calls as_record_done() on r_ref before returning. A record that
// passes the set, doomed and predexp checks is appended to the slice's
// buf-builder; the buffer is sent to the client, then reset, once it exceeds
// SCAN_CHUNK_LIMIT.
void
basic_scan_job_reduce_cb(as_index_ref* r_ref, void* udata)
{
	basic_scan_slice* slice = (basic_scan_slice*)udata;
	basic_scan_job* job = slice->job;
	as_scan_job* _job = (as_scan_job*)job;
	as_namespace* ns = _job->ns;

	// Job already abandoned (e.g. user abort or failed send) - skip record.
	if (_job->abandoned != 0) {
		as_record_done(r_ref, ns);
		return;
	}

	// NOTE(review): the reason passed here is AS_ERR_CLUSTER_KEY_MISMATCH,
	// while basic_scan_job_finish() matches AS_SCAN_ERR_CLUSTER_KEY - confirm
	// the two are the same value.
	if (job->fail_on_cluster_change &&
			job->cluster_key != as_exchange_cluster_key()) {
		as_record_done(r_ref, ns);
		as_scan_manager_abandon_job(_job, AS_ERR_CLUSTER_KEY_MISMATCH);
		return;
	}

	as_index* r = r_ref->r;

	if (excluded_set(r, _job->set_id) || as_record_is_doomed(r, ns)) {
		as_record_done(r_ref, ns);
		return;
	}

	// If the metadata-only check can't decide, predexp is left non-NULL and
	// must be applied to the bins below.
	predexp_eval_t* predexp = NULL;

	if (! basic_scan_predexp_filter_meta(job, r, &predexp)) {
		as_record_done(r_ref, ns);
		as_incr_uint64(&_job->n_filtered_meta);
		return;
	}

	as_storage_rd rd;

	as_storage_record_open(ns, r, &rd);

	if (predexp != NULL && predexp_read_and_filter_bins(&rd, predexp) != 0) {
		as_storage_record_close(&rd);
		as_record_done(r_ref, ns);
		as_incr_uint64(&_job->n_filtered_bins);

		// Throttle filtered records only when data is not in memory -
		// presumably because the bin filter cost a storage read.
		if (! ns->storage_data_in_memory) {
			throttle_sleep(_job);
		}

		return;
	}

	if (job->no_bin_data) {
		as_msg_make_response_bufbuilder(slice->bb_r, &rd, true, NULL);
	}
	else {
		if (as_storage_rd_load_n_bins(&rd) < 0) {
			cf_warning(AS_SCAN, "job %lu - record unreadable", _job->trid);
			as_storage_record_close(&rd);
			as_record_done(r_ref, ns);
			as_incr_uint64(&_job->n_failed);
			return;
		}

		// NOTE(review): this is a zero-length VLA when data is in memory,
		// which ISO C leaves undefined (GCC accepts it as an extension).
		as_bin stack_bins[ns->storage_data_in_memory ? 0 : rd.n_bins];

		if (as_storage_rd_load_bins(&rd, stack_bins)) {
			cf_warning(AS_SCAN, "job %lu - record unreadable", _job->trid);
			as_storage_record_close(&rd);
			as_record_done(r_ref, ns);
			as_incr_uint64(&_job->n_failed);
			return;
		}

		as_msg_make_response_bufbuilder(slice->bb_r, &rd, false,
				job->bin_names);
	}

	as_storage_record_close(&rd);
	as_record_done(r_ref, ns);
	as_incr_uint64(&_job->n_succeeded);

	throttle_sleep(_job);

	// Read through bb_r - the buf-builder pointer may have been updated by
	// as_msg_make_response_bufbuilder().
	cf_buf_builder* bb = *slice->bb_r;

	// If we exceed the proto size limit, send accumulated data back to client
	// and reset the buf-builder to start a new proto.
	if (bb->used_sz > SCAN_CHUNK_LIMIT) {
		if (! conn_scan_job_send_response((conn_scan_job*)job, bb->buf,
				bb->used_sz)) {
			return;
		}

		cf_buf_builder_reset(bb);
	}
}
861 | |
862 | bool |
863 | basic_scan_predexp_filter_meta(const basic_scan_job* job, const as_record* r, |
864 | predexp_eval_t** predexp) |
865 | { |
866 | *predexp = job->predexp; |
867 | |
868 | if (*predexp == NULL) { |
869 | return true; |
870 | } |
871 | |
872 | as_namespace* ns = ((as_scan_job*)job)->ns; |
873 | predexp_args_t predargs = { .ns = ns, .md = (as_record*)r }; |
874 | predexp_retval_t predrv = predexp_matches_metadata(*predexp, &predargs); |
875 | |
876 | if (predrv == PREDEXP_UNKNOWN) { |
877 | return true; // caller must later check bins using *predexp |
878 | } |
879 | // else - caller will not need to apply filter later. |
880 | |
881 | *predexp = NULL; |
882 | |
883 | return predrv == PREDEXP_TRUE; |
884 | } |
885 | |
886 | cf_vector* |
887 | bin_names_from_op(as_msg* m, int* result) |
888 | { |
889 | *result = AS_OK; |
890 | |
891 | if (m->n_ops == 0) { |
892 | return NULL; |
893 | } |
894 | |
895 | cf_vector* v = cf_vector_create(AS_BIN_NAME_MAX_SZ, m->n_ops, 0); |
896 | |
897 | as_msg_op* op = NULL; |
898 | int n = 0; |
899 | |
900 | while ((op = as_msg_op_iterate(m, op, &n)) != NULL) { |
901 | if (op->name_sz >= AS_BIN_NAME_MAX_SZ) { |
902 | cf_warning(AS_SCAN, "basic scan job bin name too long" ); |
903 | cf_vector_destroy(v); |
904 | *result = AS_ERR_BIN_NAME; |
905 | return NULL; |
906 | } |
907 | |
908 | char bin_name[AS_BIN_NAME_MAX_SZ]; |
909 | |
910 | memcpy(bin_name, op->name, op->name_sz); |
911 | bin_name[op->name_sz] = 0; |
912 | cf_vector_append_unique(v, (void*)bin_name); |
913 | } |
914 | |
915 | return v; |
916 | } |
917 | |
918 | |
919 | |
920 | //============================================================================== |
921 | // aggr_scan_job derived class implementation. |
922 | // |
923 | |
924 | //---------------------------------------------------------- |
925 | // aggr_scan_job typedefs and forward declarations. |
926 | // |
927 | |
// A scan that runs a UDF aggregation over the records and streams the
// aggregation output back to the client.
typedef struct aggr_scan_job_s {
	// Base object must be first:
	conn_scan_job _base;

	// Derived class data:
	as_aggr_call aggr_call; // set up by aggr_scan_init()
} aggr_scan_job;

void aggr_scan_job_slice(as_scan_job* _job, as_partition_reservation* rsv);
void aggr_scan_job_finish(as_scan_job* _job);
void aggr_scan_job_destroy(as_scan_job* _job);
void aggr_scan_job_info(as_scan_job* _job, as_mon_jobstat* stat);

// Positional initializer - order must match the as_scan_vtable members.
const as_scan_vtable aggr_scan_job_vtable = {
	aggr_scan_job_slice,
	aggr_scan_job_finish,
	aggr_scan_job_destroy,
	aggr_scan_job_info
};

// Per-partition state: the digests collected by the reduce callback (ll),
// the response buffer, and the partition reservation.
typedef struct aggr_scan_slice_s {
	aggr_scan_job* job;
	cf_ll* ll;
	cf_buf_builder** bb_r;
	as_partition_reservation* rsv;
} aggr_scan_slice;

bool aggr_scan_init(as_aggr_call* call, const as_transaction* tr);
void aggr_scan_job_reduce_cb(as_index_ref* r_ref, void* udata);
bool aggr_scan_add_digest(cf_ll* ll, cf_digest* keyd);
as_partition_reservation* aggr_scan_ptn_reserve(void* udata, as_namespace* ns,
		uint32_t pid, as_partition_reservation* rsv);
as_stream_status aggr_scan_ostream_write(void* udata, as_val* val);

// Hooks handed to the aggregation framework; unset hooks are NULL.
const as_aggr_hooks scan_aggr_hooks = {
	.ostream_write = aggr_scan_ostream_write,
	.set_error = NULL,
	.ptn_reserve = aggr_scan_ptn_reserve,
	.ptn_release = NULL,
	.pre_check = NULL
};

void aggr_scan_add_val_response(aggr_scan_slice* slice, const as_val* val,
		bool success);
972 | |
973 | //---------------------------------------------------------- |
974 | // aggr_scan_job public API. |
975 | // |
976 | |
977 | int |
978 | aggr_scan_job_start(as_transaction* tr, as_namespace* ns, uint16_t set_id) |
979 | { |
980 | aggr_scan_job* job = cf_malloc(sizeof(aggr_scan_job)); |
981 | as_scan_job* _job = (as_scan_job*)job; |
982 | |
983 | scan_options options = { .sample_pct = 100 }; |
984 | uint32_t rps = 0; |
985 | uint32_t timeout = CF_SOCKET_TIMEOUT; |
986 | |
987 | if (! get_scan_options(tr, &options) || ! get_scan_rps(tr, &rps) || |
988 | ! get_scan_socket_timeout(tr, &timeout)) { |
989 | cf_warning(AS_SCAN, "aggregation scan job failed msg field processing" ); |
990 | cf_free(job); |
991 | return AS_ERR_PARAMETER; |
992 | } |
993 | |
994 | if (as_transaction_has_predexp(tr)) { |
995 | cf_warning(AS_SCAN, "aggregation scans do not support predexp filters" ); |
996 | cf_free(job); |
997 | return AS_ERR_UNSUPPORTED_FEATURE; |
998 | } |
999 | |
1000 | convert_old_priority(options.priority, &rps); |
1001 | |
1002 | as_scan_job_init(_job, &aggr_scan_job_vtable, as_transaction_trid(tr), ns, |
1003 | set_id, rps, tr->from.proto_fd_h->client); |
1004 | |
1005 | if (! aggr_scan_init(&job->aggr_call, tr)) { |
1006 | cf_warning(AS_SCAN, "aggregation scan job failed call init" ); |
1007 | as_scan_job_destroy(_job); |
1008 | return AS_ERR_PARAMETER; |
1009 | } |
1010 | |
1011 | // Take ownership of socket from transaction. |
1012 | conn_scan_job_own_fd((conn_scan_job*)job, tr->from.proto_fd_h, timeout); |
1013 | |
1014 | cf_info(AS_SCAN, "starting aggregation scan job %lu {%s:%s} rps %u socket-timeout %u from %s" , |
1015 | _job->trid, ns->name, as_namespace_get_set_name(ns, set_id), |
1016 | _job->rps, timeout, _job->client); |
1017 | |
1018 | int result = as_scan_manager_start_job(_job); |
1019 | |
1020 | if (result != 0) { |
1021 | cf_warning(AS_SCAN, "aggregation scan job %lu failed to start (%d)" , |
1022 | _job->trid, result); |
1023 | conn_scan_job_disown_fd((conn_scan_job*)job); |
1024 | as_scan_job_destroy(_job); |
1025 | return result; |
1026 | } |
1027 | |
1028 | return AS_OK; |
1029 | } |
1030 | |
1031 | //---------------------------------------------------------- |
1032 | // aggr_scan_job mandatory scan_job interface. |
1033 | // |
1034 | |
1035 | void |
1036 | aggr_scan_job_slice(as_scan_job* _job, as_partition_reservation* rsv) |
1037 | { |
1038 | aggr_scan_job* job = (aggr_scan_job*)_job; |
1039 | cf_ll ll; |
1040 | |
1041 | cf_ll_init(&ll, as_index_keys_ll_destroy_fn, false); |
1042 | |
1043 | cf_buf_builder* bb = cf_buf_builder_create(INIT_BUF_BUILDER_SIZE); |
1044 | aggr_scan_slice slice = { job, &ll, &bb, rsv }; |
1045 | |
1046 | as_index_reduce_live(rsv->tree, aggr_scan_job_reduce_cb, (void*)&slice); |
1047 | |
1048 | if (cf_ll_size(&ll) != 0) { |
1049 | as_result result; |
1050 | as_result_init(&result); |
1051 | |
1052 | int ret = as_aggr_process(_job->ns, &job->aggr_call, &ll, (void*)&slice, |
1053 | &result); |
1054 | |
1055 | if (ret != 0) { |
1056 | char* rs = as_module_err_string(ret); |
1057 | |
1058 | if (result.value) { |
1059 | as_string* lua_s = as_string_fromval(result.value); |
1060 | char* lua_err = (char*)as_string_tostring(lua_s); |
1061 | |
1062 | if (lua_err) { |
1063 | int l_rs_len = strlen(rs); |
1064 | |
1065 | rs = cf_realloc(rs, l_rs_len + strlen(lua_err) + 4); |
1066 | sprintf(&rs[l_rs_len], " : %s" , lua_err); |
1067 | } |
1068 | } |
1069 | |
1070 | const as_val* v = (as_val*)as_string_new(rs, false); |
1071 | |
1072 | aggr_scan_add_val_response(&slice, v, false); |
1073 | as_val_destroy(v); |
1074 | cf_free(rs); |
1075 | as_scan_manager_abandon_job(_job, AS_ERR_UNKNOWN); |
1076 | } |
1077 | |
1078 | as_result_destroy(&result); |
1079 | } |
1080 | |
1081 | cf_ll_reduce(&ll, true, as_index_keys_ll_reduce_fn, NULL); |
1082 | |
1083 | if (bb->used_sz != 0) { |
1084 | conn_scan_job_send_response((conn_scan_job*)job, bb->buf, bb->used_sz); |
1085 | } |
1086 | |
1087 | cf_buf_builder_free(bb); |
1088 | } |
1089 | |
1090 | void |
1091 | aggr_scan_job_finish(as_scan_job* _job) |
1092 | { |
1093 | aggr_scan_job* job = (aggr_scan_job*)_job; |
1094 | |
1095 | conn_scan_job_finish((conn_scan_job*)job); |
1096 | |
1097 | if (job->aggr_call.def.arglist) { |
1098 | as_list_destroy(job->aggr_call.def.arglist); |
1099 | job->aggr_call.def.arglist = NULL; |
1100 | } |
1101 | |
1102 | switch (_job->abandoned) { |
1103 | case 0: |
1104 | as_incr_uint64(&_job->ns->n_scan_aggr_complete); |
1105 | break; |
1106 | case AS_SCAN_ERR_USER_ABORT: |
1107 | as_incr_uint64(&_job->ns->n_scan_aggr_abort); |
1108 | break; |
1109 | case AS_SCAN_ERR_UNKNOWN: |
1110 | case AS_SCAN_ERR_CLUSTER_KEY: |
1111 | case AS_SCAN_ERR_RESPONSE_ERROR: |
1112 | case AS_SCAN_ERR_RESPONSE_TIMEOUT: |
1113 | default: |
1114 | as_incr_uint64(&_job->ns->n_scan_aggr_error); |
1115 | break; |
1116 | } |
1117 | |
1118 | cf_info(AS_SCAN, "finished aggregation scan job %lu (%d)" , _job->trid, |
1119 | _job->abandoned); |
1120 | } |
1121 | |
1122 | void |
1123 | aggr_scan_job_destroy(as_scan_job* _job) |
1124 | { |
1125 | aggr_scan_job* job = (aggr_scan_job*)_job; |
1126 | |
1127 | if (job->aggr_call.def.arglist) { |
1128 | as_list_destroy(job->aggr_call.def.arglist); |
1129 | } |
1130 | } |
1131 | |
1132 | void |
1133 | aggr_scan_job_info(as_scan_job* _job, as_mon_jobstat* stat) |
1134 | { |
1135 | strcpy(stat->job_type, scan_type_str(SCAN_TYPE_AGGR)); |
1136 | conn_scan_job_info((conn_scan_job*)_job, stat); |
1137 | } |
1138 | |
1139 | //---------------------------------------------------------- |
1140 | // aggr_scan_job utilities. |
1141 | // |
1142 | |
1143 | bool |
1144 | aggr_scan_init(as_aggr_call* call, const as_transaction* tr) |
1145 | { |
1146 | if (! udf_def_init_from_msg(&call->def, tr)) { |
1147 | return false; |
1148 | } |
1149 | |
1150 | call->aggr_hooks = &scan_aggr_hooks; |
1151 | |
1152 | return true; |
1153 | } |
1154 | |
1155 | void |
1156 | aggr_scan_job_reduce_cb(as_index_ref* r_ref, void* udata) |
1157 | { |
1158 | aggr_scan_slice* slice = (aggr_scan_slice*)udata; |
1159 | aggr_scan_job* job = slice->job; |
1160 | as_scan_job* _job = (as_scan_job*)job; |
1161 | as_namespace* ns = _job->ns; |
1162 | |
1163 | if (_job->abandoned != 0) { |
1164 | as_record_done(r_ref, ns); |
1165 | return; |
1166 | } |
1167 | |
1168 | as_index* r = r_ref->r; |
1169 | |
1170 | if (excluded_set(r, _job->set_id) || as_record_is_doomed(r, ns)) { |
1171 | as_record_done(r_ref, ns); |
1172 | return; |
1173 | } |
1174 | |
1175 | if (! aggr_scan_add_digest(slice->ll, &r->keyd)) { |
1176 | as_record_done(r_ref, ns); |
1177 | as_scan_manager_abandon_job(_job, AS_ERR_UNKNOWN); |
1178 | return; |
1179 | } |
1180 | |
1181 | as_record_done(r_ref, ns); |
1182 | as_incr_uint64(&_job->n_succeeded); |
1183 | |
1184 | throttle_sleep(_job); |
1185 | } |
1186 | |
1187 | bool |
1188 | aggr_scan_add_digest(cf_ll* ll, cf_digest* keyd) |
1189 | { |
1190 | as_index_keys_ll_element* tail_e = (as_index_keys_ll_element*)ll->tail; |
1191 | as_index_keys_arr* keys_arr; |
1192 | |
1193 | if (tail_e) { |
1194 | keys_arr = tail_e->keys_arr; |
1195 | |
1196 | if (keys_arr->num == AS_INDEX_KEYS_PER_ARR) { |
1197 | tail_e = NULL; |
1198 | } |
1199 | } |
1200 | |
1201 | if (! tail_e) { |
1202 | if (! (keys_arr = as_index_get_keys_arr())) { |
1203 | return false; |
1204 | } |
1205 | |
1206 | tail_e = cf_malloc(sizeof(as_index_keys_ll_element)); |
1207 | |
1208 | tail_e->keys_arr = keys_arr; |
1209 | cf_ll_append(ll, (cf_ll_element*)tail_e); |
1210 | } |
1211 | |
1212 | keys_arr->pindex_digs[keys_arr->num] = *keyd; |
1213 | keys_arr->num++; |
1214 | |
1215 | return true; |
1216 | } |
1217 | |
1218 | as_partition_reservation* |
1219 | aggr_scan_ptn_reserve(void* udata, as_namespace* ns, uint32_t pid, |
1220 | as_partition_reservation* rsv) |
1221 | { |
1222 | aggr_scan_slice* slice = (aggr_scan_slice*)udata; |
1223 | |
1224 | return slice->rsv; |
1225 | } |
1226 | |
1227 | as_stream_status |
1228 | aggr_scan_ostream_write(void* udata, as_val* val) |
1229 | { |
1230 | aggr_scan_slice* slice = (aggr_scan_slice*)udata; |
1231 | |
1232 | if (val) { |
1233 | aggr_scan_add_val_response(slice, val, true); |
1234 | as_val_destroy(val); |
1235 | } |
1236 | |
1237 | return AS_STREAM_OK; |
1238 | } |
1239 | |
1240 | void |
1241 | aggr_scan_add_val_response(aggr_scan_slice* slice, const as_val* val, |
1242 | bool success) |
1243 | { |
1244 | uint32_t size = as_particle_asval_client_value_size(val); |
1245 | |
1246 | as_msg_make_val_response_bufbuilder(val, slice->bb_r, size, success); |
1247 | |
1248 | cf_buf_builder* bb = *slice->bb_r; |
1249 | conn_scan_job* conn_job = (conn_scan_job*)slice->job; |
1250 | |
1251 | // If we exceed the proto size limit, send accumulated data back to client |
1252 | // and reset the buf-builder to start a new proto. |
1253 | if (bb->used_sz > SCAN_CHUNK_LIMIT) { |
1254 | if (! conn_scan_job_send_response(conn_job, bb->buf, bb->used_sz)) { |
1255 | return; |
1256 | } |
1257 | |
1258 | cf_buf_builder_reset(bb); |
1259 | } |
1260 | } |
1261 | |
1262 | |
1263 | |
1264 | //============================================================================== |
1265 | // udf_bg_scan_job derived class implementation. |
1266 | // |
1267 | |
1268 | //---------------------------------------------------------- |
1269 | // udf_bg_scan_job typedefs and forward declarations. |
1270 | // |
1271 | |
// Background scan that runs a record UDF against every matching record by
// enqueuing one internal UDF transaction per record (see
// udf_bg_scan_job_reduce_cb()).
typedef struct udf_bg_scan_job_s {
	// Base object must be first:
	as_scan_job _base;

	// Derived class data:
	iudf_origin origin; // shared by every sub-transaction this job enqueues
	uint32_t n_active_tr; // sub-transactions enqueued but not yet completed
} udf_bg_scan_job;

void udf_bg_scan_job_slice(as_scan_job* _job, as_partition_reservation* rsv);
void udf_bg_scan_job_finish(as_scan_job* _job);
void udf_bg_scan_job_destroy(as_scan_job* _job);
void udf_bg_scan_job_info(as_scan_job* _job, as_mon_jobstat* stat);

// Positional initializer - entries must stay in as_scan_vtable's field order.
const as_scan_vtable udf_bg_scan_job_vtable = {
	udf_bg_scan_job_slice,
	udf_bg_scan_job_finish,
	udf_bg_scan_job_destroy,
	udf_bg_scan_job_info
};

// Per-record callback, and completion callback for each sub-transaction.
void udf_bg_scan_job_reduce_cb(as_index_ref* r_ref, void* udata);
void udf_bg_scan_tr_complete(void* udata, int result);
1295 | |
1296 | //---------------------------------------------------------- |
1297 | // udf_bg_scan_job public API. |
1298 | // |
1299 | |
1300 | int |
1301 | udf_bg_scan_job_start(as_transaction* tr, as_namespace* ns, uint16_t set_id) |
1302 | { |
1303 | udf_bg_scan_job* job = cf_malloc(sizeof(udf_bg_scan_job)); |
1304 | as_scan_job* _job = (as_scan_job*)job; |
1305 | |
1306 | scan_options options = { .sample_pct = 100 }; |
1307 | uint32_t rps = 0; |
1308 | |
1309 | if (! get_scan_options(tr, &options) || ! get_scan_rps(tr, &rps)) { |
1310 | cf_warning(AS_SCAN, "udf-bg scan job failed msg field processing" ); |
1311 | cf_free(job); |
1312 | return AS_ERR_PARAMETER; |
1313 | } |
1314 | |
1315 | convert_old_priority(options.priority, &rps); |
1316 | |
1317 | if (! validate_background_scan_rps(ns, &rps)) { |
1318 | cf_warning(AS_SCAN, "udf-bg scan job failed rps check" ); |
1319 | cf_free(job); |
1320 | return AS_ERR_PARAMETER; |
1321 | } |
1322 | |
1323 | predexp_eval_t* predexp = NULL; |
1324 | |
1325 | if (! get_scan_predexp(tr, &predexp)) { |
1326 | cf_warning(AS_SCAN, "udf-bg scan job failed predexp processing" ); |
1327 | cf_free(job); |
1328 | return AS_ERR_PARAMETER; |
1329 | } |
1330 | |
1331 | as_scan_job_init(_job, &udf_bg_scan_job_vtable, as_transaction_trid(tr), ns, |
1332 | set_id, rps, tr->from.proto_fd_h->client); |
1333 | |
1334 | job->n_active_tr = 0; |
1335 | |
1336 | if (! udf_def_init_from_msg(&job->origin.def, tr)) { |
1337 | cf_warning(AS_SCAN, "udf-bg scan job failed def init" ); |
1338 | as_scan_job_destroy(_job); |
1339 | return AS_ERR_PARAMETER; |
1340 | } |
1341 | |
1342 | uint8_t info2 = AS_MSG_INFO2_WRITE | |
1343 | (tr->msgp->msg.info2 & AS_MSG_INFO2_DURABLE_DELETE); |
1344 | |
1345 | job->origin.msgp = |
1346 | as_msg_create_internal(ns->name, 0, info2, 0, 0, NULL, 0); |
1347 | |
1348 | job->origin.predexp = predexp; |
1349 | job->origin.cb = udf_bg_scan_tr_complete; |
1350 | job->origin.udata = (void*)job; |
1351 | |
1352 | cf_info(AS_SCAN, "starting udf-bg scan job %lu {%s:%s} rps %u from %s" , |
1353 | _job->trid, ns->name, as_namespace_get_set_name(ns, set_id), |
1354 | _job->rps, _job->client); |
1355 | |
1356 | int result = as_scan_manager_start_job(_job); |
1357 | |
1358 | if (result != 0) { |
1359 | cf_warning(AS_SCAN, "udf-bg scan job %lu failed to start (%d)" , |
1360 | _job->trid, result); |
1361 | as_scan_job_destroy(_job); |
1362 | return result; |
1363 | } |
1364 | |
1365 | if (as_msg_send_fin(&tr->from.proto_fd_h->sock, AS_OK)) { |
1366 | tr->from.proto_fd_h->last_used = cf_getns(); |
1367 | as_end_of_transaction_ok(tr->from.proto_fd_h); |
1368 | } |
1369 | else { |
1370 | cf_warning(AS_SCAN, "udf-bg scan job error sending fin" ); |
1371 | as_end_of_transaction_force_close(tr->from.proto_fd_h); |
1372 | // No point returning an error - it can't be reported on this socket. |
1373 | } |
1374 | |
1375 | tr->from.proto_fd_h = NULL; |
1376 | |
1377 | return AS_OK; |
1378 | } |
1379 | |
1380 | //---------------------------------------------------------- |
1381 | // udf_bg_scan_job mandatory scan_job interface. |
1382 | // |
1383 | |
1384 | void |
1385 | udf_bg_scan_job_slice(as_scan_job* _job, as_partition_reservation* rsv) |
1386 | { |
1387 | as_index_reduce_live(rsv->tree, udf_bg_scan_job_reduce_cb, (void*)_job); |
1388 | } |
1389 | |
1390 | void |
1391 | udf_bg_scan_job_finish(as_scan_job* _job) |
1392 | { |
1393 | udf_bg_scan_job* job = (udf_bg_scan_job*)_job; |
1394 | |
1395 | while (job->n_active_tr != 0) { |
1396 | usleep(100); |
1397 | } |
1398 | |
1399 | switch (_job->abandoned) { |
1400 | case 0: |
1401 | as_incr_uint64(&_job->ns->n_scan_udf_bg_complete); |
1402 | break; |
1403 | case AS_SCAN_ERR_USER_ABORT: |
1404 | as_incr_uint64(&_job->ns->n_scan_udf_bg_abort); |
1405 | break; |
1406 | case AS_SCAN_ERR_UNKNOWN: |
1407 | case AS_SCAN_ERR_CLUSTER_KEY: |
1408 | default: |
1409 | as_incr_uint64(&_job->ns->n_scan_udf_bg_error); |
1410 | break; |
1411 | } |
1412 | |
1413 | cf_info(AS_SCAN, "finished udf-bg scan job %lu (%d)" , _job->trid, |
1414 | _job->abandoned); |
1415 | } |
1416 | |
1417 | void |
1418 | udf_bg_scan_job_destroy(as_scan_job* _job) |
1419 | { |
1420 | udf_bg_scan_job* job = (udf_bg_scan_job*)_job; |
1421 | |
1422 | iudf_origin_destroy(&job->origin); |
1423 | } |
1424 | |
1425 | void |
1426 | udf_bg_scan_job_info(as_scan_job* _job, as_mon_jobstat* stat) |
1427 | { |
1428 | strcpy(stat->job_type, scan_type_str(SCAN_TYPE_UDF_BG)); |
1429 | stat->net_io_bytes = sizeof(cl_msg); // size of original synchronous fin |
1430 | stat->socket_timeout = CF_SOCKET_TIMEOUT; |
1431 | |
1432 | udf_bg_scan_job* job = (udf_bg_scan_job*)_job; |
1433 | char* = stat->jdata + strlen(stat->jdata); |
1434 | |
1435 | sprintf(extra, ":udf-filename=%s:udf-function=%s:udf-active=%u" , |
1436 | job->origin.def.filename, job->origin.def.function, |
1437 | job->n_active_tr); |
1438 | } |
1439 | |
1440 | //---------------------------------------------------------- |
1441 | // udf_bg_scan_job utilities. |
1442 | // |
1443 | |
1444 | void |
1445 | udf_bg_scan_job_reduce_cb(as_index_ref* r_ref, void* udata) |
1446 | { |
1447 | as_scan_job* _job = (as_scan_job*)udata; |
1448 | udf_bg_scan_job* job = (udf_bg_scan_job*)_job; |
1449 | as_namespace* ns = _job->ns; |
1450 | |
1451 | if (_job->abandoned != 0) { |
1452 | as_record_done(r_ref, ns); |
1453 | return; |
1454 | } |
1455 | |
1456 | as_index* r = r_ref->r; |
1457 | |
1458 | if (excluded_set(r, _job->set_id) || as_record_is_doomed(r, ns)) { |
1459 | as_record_done(r_ref, ns); |
1460 | return; |
1461 | } |
1462 | |
1463 | predexp_args_t predargs = { .ns = ns, .md = r }; |
1464 | |
1465 | if (job->origin.predexp != NULL && |
1466 | predexp_matches_metadata(job->origin.predexp, &predargs) == |
1467 | PREDEXP_FALSE) { |
1468 | as_record_done(r_ref, ns); |
1469 | as_incr_uint64(&_job->n_filtered_meta); |
1470 | as_incr_uint64(&ns->n_udf_sub_udf_filtered_out); |
1471 | return; |
1472 | } |
1473 | |
1474 | // Save this before releasing record. |
1475 | cf_digest keyd = r->keyd; |
1476 | |
1477 | // Release record lock before throttling and enqueuing transaction. |
1478 | as_record_done(r_ref, ns); |
1479 | |
1480 | // Prefer not reaching target RPS to queue buildup and transaction timeouts. |
1481 | while (as_load_uint32(&job->n_active_tr) > MAX_ACTIVE_TRANSACTIONS) { |
1482 | usleep(1000); |
1483 | } |
1484 | |
1485 | throttle_sleep(_job); |
1486 | |
1487 | as_transaction tr; |
1488 | as_transaction_init_iudf(&tr, ns, &keyd, &job->origin); |
1489 | |
1490 | as_incr_uint32(&job->n_active_tr); |
1491 | as_service_enqueue_internal(&tr); |
1492 | } |
1493 | |
1494 | void |
1495 | udf_bg_scan_tr_complete(void* udata, int result) |
1496 | { |
1497 | as_scan_job* _job = (as_scan_job*)udata; |
1498 | udf_bg_scan_job* job = (udf_bg_scan_job*)_job; |
1499 | |
1500 | as_decr_uint32(&job->n_active_tr); |
1501 | |
1502 | switch (result) { |
1503 | case AS_OK: |
1504 | as_incr_uint64(&_job->n_succeeded); |
1505 | break; |
1506 | case AS_ERR_NOT_FOUND: // record deleted after generating tr |
1507 | break; |
1508 | case AS_ERR_FILTERED_OUT: |
1509 | as_incr_uint64(&_job->n_filtered_bins); |
1510 | break; |
1511 | default: |
1512 | as_incr_uint64(&_job->n_failed); |
1513 | break; |
1514 | } |
1515 | } |
1516 | |
1517 | |
1518 | |
1519 | //============================================================================== |
1520 | // ops_bg_scan_job derived class implementation. |
1521 | // |
1522 | |
1523 | //---------------------------------------------------------- |
1524 | // ops_bg_scan_job typedefs and forward declarations. |
1525 | // |
1526 | |
// Background scan that applies the client's write ops to every matching record
// by enqueuing one internal ops transaction per record (see
// ops_bg_scan_job_reduce_cb()).
typedef struct ops_bg_scan_job_s {
	// Base object must be first:
	as_scan_job _base;

	// Derived class data:
	iops_origin origin; // shared by every sub-transaction this job enqueues
	uint32_t n_active_tr; // sub-transactions enqueued but not yet completed
} ops_bg_scan_job;

void ops_bg_scan_job_slice(as_scan_job* _job, as_partition_reservation* rsv);
void ops_bg_scan_job_finish(as_scan_job* _job);
void ops_bg_scan_job_destroy(as_scan_job* _job);
void ops_bg_scan_job_info(as_scan_job* _job, as_mon_jobstat* stat);

// Positional initializer - entries must stay in as_scan_vtable's field order.
const as_scan_vtable ops_bg_scan_job_vtable = {
	ops_bg_scan_job_slice,
	ops_bg_scan_job_finish,
	ops_bg_scan_job_destroy,
	ops_bg_scan_job_info
};

// Ops validation, per-record callback, and per-sub-transaction completion.
uint8_t* ops_bg_validate_ops(const as_msg* m);
void ops_bg_scan_job_reduce_cb(as_index_ref* r_ref, void* udata);
void ops_bg_scan_tr_complete(void* udata, int result);
1551 | |
1552 | //---------------------------------------------------------- |
1553 | // ops_bg_scan_job public API. |
1554 | // |
1555 | |
1556 | int |
1557 | ops_bg_scan_job_start(as_transaction* tr, as_namespace* ns, uint16_t set_id) |
1558 | { |
1559 | ops_bg_scan_job* job = cf_malloc(sizeof(ops_bg_scan_job)); |
1560 | as_scan_job* _job = (as_scan_job*)job; |
1561 | |
1562 | scan_options options = { .sample_pct = 100 }; |
1563 | uint32_t rps = 0; |
1564 | |
1565 | if (! get_scan_options(tr, &options) || ! get_scan_rps(tr, &rps)) { |
1566 | cf_warning(AS_SCAN, "ops-bg scan job failed msg field processing" ); |
1567 | cf_free(job); |
1568 | return AS_ERR_PARAMETER; |
1569 | } |
1570 | |
1571 | if (! validate_background_scan_rps(ns, &rps)) { |
1572 | cf_warning(AS_SCAN, "ops-bg scan job failed rps check" ); |
1573 | cf_free(job); |
1574 | return AS_ERR_PARAMETER; |
1575 | } |
1576 | |
1577 | as_msg* om = &tr->msgp->msg; |
1578 | uint8_t* ops = ops_bg_validate_ops(om); |
1579 | |
1580 | if (ops == NULL) { |
1581 | cf_warning(AS_SCAN, "ops-bg scan job failed ops check" ); |
1582 | cf_free(job); |
1583 | return AS_ERR_PARAMETER; |
1584 | } |
1585 | |
1586 | predexp_eval_t* predexp = NULL; |
1587 | |
1588 | if (! get_scan_predexp(tr, &predexp)) { |
1589 | cf_warning(AS_SCAN, "ops-bg scan job failed predexp processing" ); |
1590 | cf_free(job); |
1591 | return AS_ERR_PARAMETER; |
1592 | } |
1593 | |
1594 | as_scan_job_init(_job, &ops_bg_scan_job_vtable, as_transaction_trid(tr), ns, |
1595 | set_id, rps, tr->from.proto_fd_h->client); |
1596 | |
1597 | job->n_active_tr = 0; |
1598 | |
1599 | uint8_t info2 = AS_MSG_INFO2_WRITE | |
1600 | (om->info2 & AS_MSG_INFO2_DURABLE_DELETE); |
1601 | uint8_t info3 = AS_MSG_INFO3_UPDATE_ONLY | |
1602 | (om->info3 & AS_MSG_INFO3_REPLACE_ONLY); |
1603 | |
1604 | job->origin.msgp = as_msg_create_internal(ns->name, 0, info2, info3, |
1605 | om->n_ops, ops, tr->msgp->proto.sz - (ops - (uint8_t*)om)); |
1606 | |
1607 | job->origin.predexp = predexp; |
1608 | job->origin.cb = ops_bg_scan_tr_complete; |
1609 | job->origin.udata = (void*)job; |
1610 | |
1611 | cf_info(AS_SCAN, "starting ops-bg scan job %lu {%s:%s} rps %u from %s" , |
1612 | _job->trid, ns->name, as_namespace_get_set_name(ns, set_id), |
1613 | _job->rps, _job->client); |
1614 | |
1615 | int result = as_scan_manager_start_job(_job); |
1616 | |
1617 | if (result != 0) { |
1618 | cf_warning(AS_SCAN, "ops-bg scan job %lu failed to start (%d)" , |
1619 | _job->trid, result); |
1620 | as_scan_job_destroy(_job); |
1621 | return result; |
1622 | } |
1623 | |
1624 | if (as_msg_send_fin(&tr->from.proto_fd_h->sock, AS_OK)) { |
1625 | tr->from.proto_fd_h->last_used = cf_getns(); |
1626 | as_end_of_transaction_ok(tr->from.proto_fd_h); |
1627 | } |
1628 | else { |
1629 | cf_warning(AS_SCAN, "ops-bg scan job error sending fin" ); |
1630 | as_end_of_transaction_force_close(tr->from.proto_fd_h); |
1631 | // No point returning an error - it can't be reported on this socket. |
1632 | } |
1633 | |
1634 | tr->from.proto_fd_h = NULL; |
1635 | |
1636 | return AS_OK; |
1637 | } |
1638 | |
1639 | //---------------------------------------------------------- |
1640 | // ops_bg_scan_job mandatory scan_job interface. |
1641 | // |
1642 | |
1643 | void |
1644 | ops_bg_scan_job_slice(as_scan_job* _job, as_partition_reservation* rsv) |
1645 | { |
1646 | as_index_reduce_live(rsv->tree, ops_bg_scan_job_reduce_cb, (void*)_job); |
1647 | } |
1648 | |
1649 | void |
1650 | ops_bg_scan_job_finish(as_scan_job* _job) |
1651 | { |
1652 | ops_bg_scan_job* job = (ops_bg_scan_job*)_job; |
1653 | |
1654 | while (job->n_active_tr != 0) { |
1655 | usleep(100); |
1656 | } |
1657 | |
1658 | switch (_job->abandoned) { |
1659 | case 0: |
1660 | as_incr_uint64(&_job->ns->n_scan_ops_bg_complete); |
1661 | break; |
1662 | case AS_SCAN_ERR_USER_ABORT: |
1663 | as_incr_uint64(&_job->ns->n_scan_ops_bg_abort); |
1664 | break; |
1665 | case AS_SCAN_ERR_UNKNOWN: |
1666 | case AS_SCAN_ERR_CLUSTER_KEY: |
1667 | default: |
1668 | as_incr_uint64(&_job->ns->n_scan_ops_bg_error); |
1669 | break; |
1670 | } |
1671 | |
1672 | cf_info(AS_SCAN, "finished ops-bg scan job %lu (%d)" , _job->trid, |
1673 | _job->abandoned); |
1674 | } |
1675 | |
1676 | void |
1677 | ops_bg_scan_job_destroy(as_scan_job* _job) |
1678 | { |
1679 | ops_bg_scan_job* job = (ops_bg_scan_job*)_job; |
1680 | |
1681 | iops_origin_destroy(&job->origin); |
1682 | } |
1683 | |
1684 | void |
1685 | ops_bg_scan_job_info(as_scan_job* _job, as_mon_jobstat* stat) |
1686 | { |
1687 | strcpy(stat->job_type, scan_type_str(SCAN_TYPE_OPS_BG)); |
1688 | stat->net_io_bytes = sizeof(cl_msg); // size of original synchronous fin |
1689 | stat->socket_timeout = CF_SOCKET_TIMEOUT; |
1690 | |
1691 | ops_bg_scan_job* job = (ops_bg_scan_job*)_job; |
1692 | char* = stat->jdata + strlen(stat->jdata); |
1693 | |
1694 | sprintf(extra, ":ops-active=%u" , job->n_active_tr); |
1695 | } |
1696 | |
1697 | //---------------------------------------------------------- |
1698 | // ops_bg_scan_job utilities. |
1699 | // |
1700 | |
1701 | uint8_t* |
1702 | ops_bg_validate_ops(const as_msg* m) |
1703 | { |
1704 | if ((m->info1 & AS_MSG_INFO1_READ) != 0) { |
1705 | cf_warning(AS_SCAN, "ops not write only" ); |
1706 | return NULL; |
1707 | } |
1708 | |
1709 | if (m->n_ops == 0) { |
1710 | cf_warning(AS_SCAN, "ops scan has no ops" ); |
1711 | return NULL; |
1712 | } |
1713 | |
1714 | // TODO - should we at least de-fuzz the ops, so all the sub-transactions |
1715 | // won't fail later? |
1716 | int i = 0; |
1717 | |
1718 | return (uint8_t*)as_msg_op_iterate(m, NULL, &i); |
1719 | } |
1720 | |
1721 | void |
1722 | ops_bg_scan_job_reduce_cb(as_index_ref* r_ref, void* udata) |
1723 | { |
1724 | as_scan_job* _job = (as_scan_job*)udata; |
1725 | ops_bg_scan_job* job = (ops_bg_scan_job*)_job; |
1726 | as_namespace* ns = _job->ns; |
1727 | |
1728 | if (_job->abandoned != 0) { |
1729 | as_record_done(r_ref, ns); |
1730 | return; |
1731 | } |
1732 | |
1733 | as_index* r = r_ref->r; |
1734 | |
1735 | if (excluded_set(r, _job->set_id) || as_record_is_doomed(r, ns)) { |
1736 | as_record_done(r_ref, ns); |
1737 | return; |
1738 | } |
1739 | |
1740 | predexp_args_t predargs = { .ns = ns, .md = r }; |
1741 | |
1742 | if (job->origin.predexp != NULL && |
1743 | predexp_matches_metadata(job->origin.predexp, &predargs) == |
1744 | PREDEXP_FALSE) { |
1745 | as_record_done(r_ref, ns); |
1746 | as_incr_uint64(&_job->n_filtered_meta); |
1747 | as_incr_uint64(&ns->n_ops_sub_write_filtered_out); |
1748 | return; |
1749 | } |
1750 | |
1751 | // Save this before releasing record. |
1752 | cf_digest keyd = r->keyd; |
1753 | |
1754 | // Release record lock before throttling and enqueuing transaction. |
1755 | as_record_done(r_ref, ns); |
1756 | |
1757 | // Prefer not reaching target RPS to queue buildup and transaction timeouts. |
1758 | while (as_load_uint32(&job->n_active_tr) > MAX_ACTIVE_TRANSACTIONS) { |
1759 | usleep(1000); |
1760 | } |
1761 | |
1762 | throttle_sleep(_job); |
1763 | |
1764 | as_transaction tr; |
1765 | as_transaction_init_iops(&tr, ns, &keyd, &job->origin); |
1766 | |
1767 | as_incr_uint32(&job->n_active_tr); |
1768 | as_service_enqueue_internal(&tr); |
1769 | } |
1770 | |
1771 | void |
1772 | ops_bg_scan_tr_complete(void* udata, int result) |
1773 | { |
1774 | as_scan_job* _job = (as_scan_job*)udata; |
1775 | ops_bg_scan_job* job = (ops_bg_scan_job*)_job; |
1776 | |
1777 | as_decr_uint32(&job->n_active_tr); |
1778 | |
1779 | switch (result) { |
1780 | case AS_OK: |
1781 | as_incr_uint64(&_job->n_succeeded); |
1782 | break; |
1783 | case AS_ERR_NOT_FOUND: // record deleted after generating tr |
1784 | break; |
1785 | case AS_ERR_FILTERED_OUT: |
1786 | as_incr_uint64(&_job->n_filtered_bins); |
1787 | break; |
1788 | default: |
1789 | as_incr_uint64(&_job->n_failed); |
1790 | break; |
1791 | } |
1792 | } |
1793 | |