1/*
2 * scan.c
3 *
4 * Copyright (C) 2015 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23//==============================================================================
24// Includes.
25//
26
27#include "base/scan.h"
28
29#include <errno.h>
30#include <fcntl.h>
31#include <stdbool.h>
32#include <stddef.h>
33#include <stdint.h>
34#include <stdio.h>
35#include <string.h>
36#include <unistd.h>
37
38#include "aerospike/as_atomic.h"
39#include "aerospike/as_list.h"
40#include "aerospike/as_module.h"
41#include "aerospike/as_string.h"
42#include "aerospike/as_val.h"
43#include "citrusleaf/alloc.h"
44#include "citrusleaf/cf_clock.h"
45#include "citrusleaf/cf_digest.h"
46#include "citrusleaf/cf_ll.h"
47#include "citrusleaf/cf_vector.h"
48
49#include "cf_mutex.h"
50#include "cf_thread.h"
51#include "dynbuf.h"
52#include "fault.h"
53#include "socket.h"
54
55#include "base/aggr.h"
56#include "base/cfg.h"
57#include "base/datamodel.h"
58#include "base/index.h"
59#include "base/monitor.h"
60#include "base/predexp.h"
61#include "base/proto.h"
62#include "base/scan_job.h"
63#include "base/scan_manager.h"
64#include "base/secondary_index.h"
65#include "base/service.h"
66#include "base/transaction.h"
67#include "fabric/exchange.h"
68#include "fabric/partition.h"
69#include "transaction/rw_utils.h"
70#include "transaction/udf.h"
71#include "transaction/write.h"
72
73
74
75//==============================================================================
76// Typedefs and forward declarations.
77//
78
79//----------------------------------------------------------
80// Scan types.
81//
82
// Kind of scan job a client message asks for - see get_scan_type().
typedef enum {
	SCAN_TYPE_UNKNOWN = -1, // message matched no supported scan kind

	SCAN_TYPE_BASIC  = 0, // record scan, results streamed back to the client
	SCAN_TYPE_AGGR   = 1, // UDF aggregation scan
	SCAN_TYPE_UDF_BG = 2, // background UDF scan
	SCAN_TYPE_OPS_BG = 3  // background write-ops scan
} scan_type;
91
92static inline const char*
93scan_type_str(scan_type type)
94{
95 switch (type) {
96 case SCAN_TYPE_BASIC:
97 return "basic";
98 case SCAN_TYPE_AGGR:
99 return "aggregation";
100 case SCAN_TYPE_UDF_BG:
101 return "background-udf";
102 case SCAN_TYPE_OPS_BG:
103 return "background-ops";
104 default:
105 return "?";
106 }
107}
108
109//----------------------------------------------------------
110// scan_job - derived classes' public methods.
111//
112
// Start functions for each scan job type - as_scan() dispatches to one of these
// based on get_scan_type(). Each returns AS_OK or an error code.
int basic_scan_job_start(as_transaction* tr, as_namespace* ns, uint16_t set_id);
int aggr_scan_job_start(as_transaction* tr, as_namespace* ns, uint16_t set_id);
int udf_bg_scan_job_start(as_transaction* tr, as_namespace* ns, uint16_t set_id);
int ops_bg_scan_job_start(as_transaction* tr, as_namespace* ns, uint16_t set_id);

//----------------------------------------------------------
// Non-class-specific utilities.
//

// Values decoded from the optional scan-options message field - see
// get_scan_options(). Callers pre-fill defaults (e.g. sample_pct = 100).
typedef struct scan_options_s {
	int priority; // legacy priority - mapped to rps by convert_old_priority()
	bool fail_on_cluster_change;
	uint32_t sample_pct; // percentage of each partition's records to scan
} scan_options;

// Message-field parsers: each returns true (or AS_OK) and leaves the output
// untouched when the optional field is absent, false on a malformed field.
int get_scan_set_id(as_transaction* tr, as_namespace* ns, uint16_t* p_set_id);
scan_type get_scan_type(as_transaction* tr);
bool get_scan_options(as_transaction* tr, scan_options* options);
bool get_scan_rps(as_transaction* tr, uint32_t* rps);
void convert_old_priority(int old_priority, uint32_t* rps);
bool validate_background_scan_rps(const as_namespace* ns, uint32_t* rps);
bool get_scan_socket_timeout(as_transaction* tr, uint32_t* timeout);
bool get_scan_predexp(as_transaction* tr, predexp_eval_t** p_predexp);
size_t send_blocking_response_chunk(as_file_handle* fd_h, uint8_t* buf, size_t size, int32_t timeout);
static inline bool excluded_set(as_index* r, uint16_t set_id);
138
139
140
141//==============================================================================
142// Constants.
143//
144
// Rate applied when an old client asks for "low" priority (priority 1) without
// an explicit rps - see convert_old_priority().
#define LOW_PRIORITY_RPS 5000 // for compatibility with old clients

// Initial capacity of each slice's response buf-builder.
const size_t INIT_BUF_BUILDER_SIZE = 1024 * 1024 * 2;
// A slice flushes its buffered response to the client once it grows past this.
const size_t SCAN_CHUNK_LIMIT = 1024 * 1024;

// NOTE(review): not referenced in the visible part of this file - confirm it
// is still used before keeping it.
#define MAX_ACTIVE_TRANSACTIONS 200
151
152
153
154//==============================================================================
155// Public API.
156//
157
158void
159as_scan_init(void)
160{
161 as_scan_manager_init();
162}
163
164int
165as_scan(as_transaction* tr, as_namespace* ns)
166{
167 int result;
168 uint16_t set_id = INVALID_SET_ID;
169
170 if ((result = get_scan_set_id(tr, ns, &set_id)) != AS_OK) {
171 return result;
172 }
173
174 switch (get_scan_type(tr)) {
175 case SCAN_TYPE_BASIC:
176 result = basic_scan_job_start(tr, ns, set_id);
177 break;
178 case SCAN_TYPE_AGGR:
179 result = aggr_scan_job_start(tr, ns, set_id);
180 break;
181 case SCAN_TYPE_UDF_BG:
182 result = udf_bg_scan_job_start(tr, ns, set_id);
183 break;
184 case SCAN_TYPE_OPS_BG:
185 result = ops_bg_scan_job_start(tr, ns, set_id);
186 break;
187 default:
188 cf_warning(AS_SCAN, "can't identify scan type");
189 result = AS_ERR_PARAMETER;
190 break;
191 }
192
193 return result;
194}
195
196void
197as_scan_limit_finished_jobs(void)
198{
199 as_scan_manager_limit_finished_jobs();
200}
201
202int
203as_scan_get_active_job_count(void)
204{
205 return as_scan_manager_get_active_job_count();
206}
207
208int
209as_scan_list(char* name, cf_dyn_buf* db)
210{
211 (void)name;
212
213 as_mon_info_cmd(AS_MON_MODULES[SCAN_MOD], NULL, 0, 0, db);
214 return 0;
215}
216
217as_mon_jobstat*
218as_scan_get_jobstat(uint64_t trid)
219{
220 return as_scan_manager_get_job_info(trid);
221}
222
223as_mon_jobstat*
224as_scan_get_jobstat_all(int* size)
225{
226 return as_scan_manager_get_info(size);
227}
228
229int
230as_scan_abort(uint64_t trid)
231{
232 return as_scan_manager_abort_job(trid) ? 0 : -1;
233}
234
235int
236as_scan_abort_all(void)
237{
238 return as_scan_manager_abort_all_jobs();
239}
240
241
242//==============================================================================
243// Non-class-specific utilities.
244//
245
246int
247get_scan_set_id(as_transaction* tr, as_namespace* ns, uint16_t* p_set_id)
248{
249 uint16_t set_id = INVALID_SET_ID;
250 as_msg_field* f = as_transaction_has_set(tr) ?
251 as_msg_field_get(&tr->msgp->msg, AS_MSG_FIELD_TYPE_SET) : NULL;
252
253 if (f && as_msg_field_get_value_sz(f) != 0) {
254 uint32_t set_name_len = as_msg_field_get_value_sz(f);
255 char set_name[set_name_len + 1];
256
257 memcpy(set_name, f->data, set_name_len);
258 set_name[set_name_len] = '\0';
259 set_id = as_namespace_get_set_id(ns, set_name);
260
261 if (set_id == INVALID_SET_ID) {
262 cf_warning(AS_SCAN, "scan msg from %s has unrecognized set %s",
263 tr->from.proto_fd_h->client, set_name);
264 return AS_ERR_NOT_FOUND;
265 }
266 }
267
268 *p_set_id = set_id;
269
270 return AS_OK;
271}
272
273scan_type
274get_scan_type(as_transaction* tr)
275{
276 if (! as_transaction_is_udf(tr)) {
277 return (tr->msgp->msg.info2 & AS_MSG_INFO2_WRITE) != 0 ?
278 SCAN_TYPE_OPS_BG : SCAN_TYPE_BASIC;
279 }
280
281 as_msg_field* udf_op_f = as_msg_field_get(&tr->msgp->msg,
282 AS_MSG_FIELD_TYPE_UDF_OP);
283
284 if (udf_op_f && *udf_op_f->data == (uint8_t)AS_UDF_OP_AGGREGATE) {
285 return SCAN_TYPE_AGGR;
286 }
287
288 if (udf_op_f && *udf_op_f->data == (uint8_t)AS_UDF_OP_BACKGROUND) {
289 return SCAN_TYPE_UDF_BG;
290 }
291
292 return SCAN_TYPE_UNKNOWN;
293}
294
295bool
296get_scan_options(as_transaction* tr, scan_options* options)
297{
298 if (! as_transaction_has_scan_options(tr)) {
299 return true;
300 }
301
302 as_msg_field* f = as_msg_field_get(&tr->msgp->msg,
303 AS_MSG_FIELD_TYPE_SCAN_OPTIONS);
304
305 if (as_msg_field_get_value_sz(f) != 2) {
306 cf_warning(AS_SCAN, "scan msg options field size not 2");
307 return false;
308 }
309
310 options->priority = AS_MSG_FIELD_SCAN_PRIORITY(f->data[0]);
311 options->fail_on_cluster_change =
312 (AS_MSG_FIELD_SCAN_FAIL_ON_CLUSTER_CHANGE & f->data[0]) != 0;
313 options->sample_pct = f->data[1];
314
315 return true;
316}
317
318bool
319get_scan_rps(as_transaction* tr, uint32_t* rps)
320{
321 if (! as_transaction_has_recs_per_sec(tr)) {
322 return true;
323 }
324
325 as_msg_field* f = as_msg_field_get(&tr->msgp->msg,
326 AS_MSG_FIELD_TYPE_RECS_PER_SEC);
327
328 if (as_msg_field_get_value_sz(f) != 4) {
329 cf_warning(AS_SCAN, "scan recs-per-sec field size not 4");
330 return false;
331 }
332
333 *rps = cf_swap_from_be32(*(uint32_t*)f->data);
334
335 return true;
336}
337
338void
339convert_old_priority(int old_priority, uint32_t* rps)
340{
341 if (old_priority != 0 && *rps != 0) {
342 cf_warning(AS_SCAN, "unexpected - scan has rps %u and priority %d",
343 *rps, old_priority);
344 return;
345 }
346
347 if (old_priority == 1 && *rps == 0) {
348 cf_info(AS_SCAN, "low-priority scan from old client will use %u rps",
349 LOW_PRIORITY_RPS);
350
351 *rps = LOW_PRIORITY_RPS;
352 }
353}
354
355bool
356validate_background_scan_rps(const as_namespace* ns, uint32_t* rps)
357{
358 if (*rps > ns->background_scan_max_rps) {
359 cf_warning(AS_SCAN, "scan rps %u exceeds 'background-scan-max-rps' %u",
360 *rps, ns->background_scan_max_rps);
361 return false;
362 }
363
364 if (*rps == 0) {
365 *rps = ns->background_scan_max_rps;
366 }
367
368 return true;
369}
370
371bool
372get_scan_socket_timeout(as_transaction* tr, uint32_t* timeout)
373{
374 if (! as_transaction_has_socket_timeout(tr)) {
375 return true;
376 }
377
378 as_msg_field* f = as_msg_field_get(&tr->msgp->msg,
379 AS_MSG_FIELD_TYPE_SOCKET_TIMEOUT);
380
381 if (as_msg_field_get_value_sz(f) != 4) {
382 cf_warning(AS_SCAN, "scan socket timeout field size not 4");
383 return false;
384 }
385
386 *timeout = cf_swap_from_be32(*(uint32_t*)f->data);
387
388 return true;
389}
390
391bool
392get_scan_predexp(as_transaction* tr, predexp_eval_t** p_predexp)
393{
394 if (! as_transaction_has_predexp(tr)) {
395 return true;
396 }
397
398 as_msg_field* f = as_msg_field_get(&tr->msgp->msg,
399 AS_MSG_FIELD_TYPE_PREDEXP);
400
401 *p_predexp = predexp_build(f);
402
403 return *p_predexp != NULL;
404}
405
406size_t
407send_blocking_response_chunk(as_file_handle* fd_h, uint8_t* buf, size_t size,
408 int32_t timeout)
409{
410 cf_socket* sock = &fd_h->sock;
411 as_proto proto;
412
413 proto.version = PROTO_VERSION;
414 proto.type = PROTO_TYPE_AS_MSG;
415 proto.sz = size;
416 as_proto_swap(&proto);
417
418 if (cf_socket_send_all(sock, (uint8_t*)&proto, sizeof(as_proto),
419 MSG_NOSIGNAL | MSG_MORE, timeout) < 0) {
420 cf_warning(AS_SCAN, "error sending to %s - fd %d %s", fd_h->client,
421 CSFD(sock), cf_strerror(errno));
422 return 0;
423 }
424
425 if (cf_socket_send_all(sock, buf, size, MSG_NOSIGNAL, timeout) < 0) {
426 cf_warning(AS_SCAN, "error sending to %s - fd %d sz %lu %s",
427 fd_h->client, CSFD(sock), size, cf_strerror(errno));
428 return 0;
429 }
430
431 return sizeof(as_proto) + size;
432}
433
434static inline bool
435excluded_set(as_index* r, uint16_t set_id)
436{
437 return set_id != INVALID_SET_ID && set_id != as_index_get_set_id(r);
438}
439
440static inline void
441throttle_sleep(as_scan_job* _job)
442{
443 uint32_t sleep_us = as_scan_job_throttle(_job);
444
445 if (sleep_us != 0) {
446 usleep(sleep_us);
447 }
448}
449
450
451
452//==============================================================================
453// conn_scan_job derived class implementation - not final class.
454//
455
456//----------------------------------------------------------
457// conn_scan_job typedefs and forward declarations.
458//
459
// Intermediate job "class" for scans that stream results back over the client
// connection. Concrete job structs embed it as their first member.
typedef struct conn_scan_job_s {
	// Base object must be first:
	as_scan_job _base;

	// Derived class data:
	cf_mutex fd_lock; // guards sends on fd_h and its release
	as_file_handle* fd_h; // client connection - NULL once released
	int32_t fd_timeout; // send timeout; -1 when the client gave 0

	uint64_t net_io_bytes; // bytes sent to the client so far
} conn_scan_job;

void conn_scan_job_own_fd(conn_scan_job* job, as_file_handle* fd_h, uint32_t timeout);
void conn_scan_job_disown_fd(conn_scan_job* job);
void conn_scan_job_finish(conn_scan_job* job);
bool conn_scan_job_send_response(conn_scan_job* job, uint8_t* buf, size_t size);
void conn_scan_job_release_fd(conn_scan_job* job, bool force_close);
void conn_scan_job_info(conn_scan_job* job, as_mon_jobstat* stat);
478
479//----------------------------------------------------------
480// conn_scan_job API.
481//
482
483void
484conn_scan_job_own_fd(conn_scan_job* job, as_file_handle* fd_h, uint32_t timeout)
485{
486 cf_mutex_init(&job->fd_lock);
487
488 job->fd_h = fd_h;
489 job->fd_timeout = timeout == 0 ? -1 : (int32_t)timeout;
490
491 job->net_io_bytes = 0;
492}
493
494void
495conn_scan_job_disown_fd(conn_scan_job* job)
496{
497 // Just undo conn_scan_job_own_fd(), nothing more.
498
499 cf_mutex_destroy(&job->fd_lock);
500}
501
502void
503conn_scan_job_finish(conn_scan_job* job)
504{
505 as_scan_job* _job = (as_scan_job*)job;
506
507 if (job->fd_h) {
508 // TODO - perhaps reflect in monitor if send fails?
509 size_t size_sent = as_msg_send_fin_timeout(&job->fd_h->sock,
510 _job->abandoned, job->fd_timeout);
511
512 job->net_io_bytes += size_sent;
513 conn_scan_job_release_fd(job, size_sent == 0);
514 }
515
516 cf_mutex_destroy(&job->fd_lock);
517}
518
// Sends one response chunk to the client while holding fd_lock. Returns false
// if the connection was already released, or if the send failed. On a failed
// send the connection is force-closed and the job is abandoned with a
// response-timeout or response-error reason.
bool
conn_scan_job_send_response(conn_scan_job* job, uint8_t* buf, size_t size)
{
	as_scan_job* _job = (as_scan_job*)job;

	cf_mutex_lock(&job->fd_lock);

	if (! job->fd_h) {
		cf_mutex_unlock(&job->fd_lock);
		// Job already abandoned.
		return false;
	}

	size_t size_sent = send_blocking_response_chunk(job->fd_h, buf, size,
			job->fd_timeout);

	if (size_sent == 0) {
		// Relies on errno still describing the failed send.
		int reason = errno == ETIMEDOUT ?
				AS_SCAN_ERR_RESPONSE_TIMEOUT : AS_SCAN_ERR_RESPONSE_ERROR;

		// Release under the lock so concurrent senders see fd_h == NULL, then
		// abandon after unlocking (presumably to avoid holding fd_lock across
		// the scan manager call).
		conn_scan_job_release_fd(job, true);
		cf_mutex_unlock(&job->fd_lock);
		as_scan_manager_abandon_job(_job, reason);
		return false;
	}

	job->net_io_bytes += size_sent;

	cf_mutex_unlock(&job->fd_lock);
	return true;
}
550
551void
552conn_scan_job_release_fd(conn_scan_job* job, bool force_close)
553{
554 job->fd_h->last_used = cf_getns();
555 as_end_of_transaction(job->fd_h, force_close);
556 job->fd_h = NULL;
557}
558
559void
560conn_scan_job_info(conn_scan_job* job, as_mon_jobstat* stat)
561{
562 stat->net_io_bytes = job->net_io_bytes;
563 stat->socket_timeout = job->fd_timeout;
564}
565
566
567
568//==============================================================================
569// basic_scan_job derived class implementation.
570//
571
572//----------------------------------------------------------
573// basic_scan_job typedefs and forward declarations.
574//
575
// Basic scan job - streams matching records (or just their metadata) to the
// client.
typedef struct basic_scan_job_s {
	// Base object must be first:
	conn_scan_job _base;

	// Derived class data:
	uint64_t cluster_key; // cluster key when the job started
	bool fail_on_cluster_change; // abandon if the cluster key changes
	bool no_bin_data; // respond with metadata only
	uint32_t sample_pct; // percentage of each partition's records to scan
	predexp_eval_t* predexp; // optional filter - NULL if none
	cf_vector* bin_names; // requested bin names - NULL if none were named
} basic_scan_job;

void basic_scan_job_slice(as_scan_job* _job, as_partition_reservation* rsv);
void basic_scan_job_finish(as_scan_job* _job);
void basic_scan_job_destroy(as_scan_job* _job);
void basic_scan_job_info(as_scan_job* _job, as_mon_jobstat* stat);

// Positional initializer - the order must match as_scan_vtable's member order.
const as_scan_vtable basic_scan_job_vtable = {
	basic_scan_job_slice,
	basic_scan_job_finish,
	basic_scan_job_destroy,
	basic_scan_job_info
};

// Per-slice state handed to basic_scan_job_reduce_cb(). bb_r points at the
// slice's buf-builder pointer so response builders can replace it.
typedef struct basic_scan_slice_s {
	basic_scan_job* job;
	cf_buf_builder** bb_r;
} basic_scan_slice;

void basic_scan_job_reduce_cb(as_index_ref* r_ref, void* udata);
bool basic_scan_predexp_filter_meta(const basic_scan_job* job, const as_record* r, predexp_eval_t** predexp);
cf_vector* bin_names_from_op(as_msg* m, int* result);
609
610//----------------------------------------------------------
611// basic_scan_job public API.
612//
613
// Builds a basic scan job from transaction 'tr' and hands it to the scan
// manager. On success the job has taken over the client connection.
// Returns AS_OK, or an error code if:
//  - a message field is malformed;
//  - fail-on-cluster-change was requested while migrations are pending;
//  - the scan manager refuses the job.
int
basic_scan_job_start(as_transaction* tr, as_namespace* ns, uint16_t set_id)
{
	basic_scan_job* job = cf_malloc(sizeof(basic_scan_job));
	as_scan_job* _job = (as_scan_job*)job;

	// Defaults - overridden by whichever optional fields the message carries.
	scan_options options = { .sample_pct = 100 };
	uint32_t rps = 0;
	uint32_t timeout = CF_SOCKET_TIMEOUT;
	predexp_eval_t* predexp = NULL;

	// get_scan_predexp() is evaluated last, so a predexp is only ever built
	// when every earlier field parsed - nothing to free on this path.
	if (! get_scan_options(tr, &options) || ! get_scan_rps(tr, &rps) ||
			! get_scan_socket_timeout(tr, &timeout) ||
			! get_scan_predexp(tr, &predexp)) {
		cf_warning(AS_SCAN, "basic scan job failed msg field processing");
		// Job not initialized yet - plain free rather than as_scan_job_destroy().
		cf_free(job);
		return AS_ERR_PARAMETER;
	}

	convert_old_priority(options.priority, &rps);

	as_scan_job_init(_job, &basic_scan_job_vtable, as_transaction_trid(tr), ns,
			set_id, rps, tr->from.proto_fd_h->client);

	job->cluster_key = as_exchange_cluster_key();
	job->fail_on_cluster_change = options.fail_on_cluster_change;
	job->no_bin_data = (tr->msgp->msg.info1 & AS_MSG_INFO1_GET_NO_BINS) != 0;
	job->sample_pct = options.sample_pct;
	job->predexp = predexp; // released in basic_scan_job_destroy()

	int result;

	job->bin_names = bin_names_from_op(&tr->msgp->msg, &result);

	// NULL together with AS_OK just means the request named no bins.
	if (! job->bin_names && result != AS_OK) {
		as_scan_job_destroy(_job);
		return result;
	}

	// Refuse up front if partitions are still migrating in either direction.
	if (job->fail_on_cluster_change &&
			(ns->migrate_tx_partitions_remaining != 0 ||
			 ns->migrate_rx_partitions_remaining != 0)) {
		cf_warning(AS_SCAN, "basic scan job not started - migration");
		as_scan_job_destroy(_job);
		return AS_ERR_CLUSTER_KEY_MISMATCH;
	}

	// Take ownership of socket from transaction.
	conn_scan_job_own_fd((conn_scan_job*)job, tr->from.proto_fd_h, timeout);

	cf_info(AS_SCAN, "starting basic scan job %lu {%s:%s} rps %u sample-pct %u%s%s socket-timeout %u from %s",
			_job->trid, ns->name, as_namespace_get_set_name(ns, set_id),
			_job->rps, job->sample_pct,
			job->no_bin_data ? ", metadata-only" : "",
			job->fail_on_cluster_change ? ", fail-on-cluster-change" : "",
			timeout, _job->client);

	if ((result = as_scan_manager_start_job(_job)) != 0) {
		cf_warning(AS_SCAN, "basic scan job %lu failed to start (%d)",
				_job->trid, result);
		// Disown only undoes conn_scan_job_own_fd() (destroys fd_lock) - the
		// connection itself is not released here.
		conn_scan_job_disown_fd((conn_scan_job*)job);
		as_scan_job_destroy(_job);
		return result;
	}

	return AS_OK;
}
681
682//----------------------------------------------------------
683// basic_scan_job mandatory scan_job interface.
684//
685
686void
687basic_scan_job_slice(as_scan_job* _job, as_partition_reservation* rsv)
688{
689 basic_scan_job* job = (basic_scan_job*)_job;
690 as_index_tree* tree = rsv->tree;
691 cf_buf_builder* bb = cf_buf_builder_create(INIT_BUF_BUILDER_SIZE);
692 uint64_t slice_start = cf_getms();
693 basic_scan_slice slice = { job, &bb };
694
695 if (job->sample_pct == 100) {
696 as_index_reduce_live(tree, basic_scan_job_reduce_cb, (void*)&slice);
697 }
698 else {
699 uint64_t sample_count =
700 ((as_index_tree_size(tree) * job->sample_pct) / 100);
701
702 as_index_reduce_partial_live(tree, sample_count,
703 basic_scan_job_reduce_cb, (void*)&slice);
704 }
705
706 if (bb->used_sz != 0) {
707 conn_scan_job_send_response((conn_scan_job*)job, bb->buf, bb->used_sz);
708 }
709
710 cf_buf_builder_free(bb);
711
712 cf_detail(AS_SCAN, "%s:%u basic scan job %lu in thread %d took %lu ms",
713 rsv->ns->name, rsv->p->id, _job->trid, cf_thread_sys_tid(),
714 cf_getms() - slice_start);
715}
716
717void
718basic_scan_job_finish(as_scan_job* _job)
719{
720 conn_scan_job_finish((conn_scan_job*)_job);
721
722 switch (_job->abandoned) {
723 case 0:
724 as_incr_uint64(&_job->ns->n_scan_basic_complete);
725 break;
726 case AS_SCAN_ERR_USER_ABORT:
727 as_incr_uint64(&_job->ns->n_scan_basic_abort);
728 break;
729 case AS_SCAN_ERR_UNKNOWN:
730 case AS_SCAN_ERR_CLUSTER_KEY:
731 case AS_SCAN_ERR_RESPONSE_ERROR:
732 case AS_SCAN_ERR_RESPONSE_TIMEOUT:
733 default:
734 as_incr_uint64(&_job->ns->n_scan_basic_error);
735 break;
736 }
737
738 cf_info(AS_SCAN, "finished basic scan job %lu (%d)", _job->trid,
739 _job->abandoned);
740}
741
742void
743basic_scan_job_destroy(as_scan_job* _job)
744{
745 basic_scan_job* job = (basic_scan_job*)_job;
746
747 if (job->bin_names) {
748 cf_vector_destroy(job->bin_names);
749 }
750
751 predexp_destroy(job->predexp);
752}
753
754void
755basic_scan_job_info(as_scan_job* _job, as_mon_jobstat* stat)
756{
757 strcpy(stat->job_type, scan_type_str(SCAN_TYPE_BASIC));
758 conn_scan_job_info((conn_scan_job*)_job, stat);
759}
760
761//----------------------------------------------------------
762// basic_scan_job utilities.
763//
764
765void
766basic_scan_job_reduce_cb(as_index_ref* r_ref, void* udata)
767{
768 basic_scan_slice* slice = (basic_scan_slice*)udata;
769 basic_scan_job* job = slice->job;
770 as_scan_job* _job = (as_scan_job*)job;
771 as_namespace* ns = _job->ns;
772
773 if (_job->abandoned != 0) {
774 as_record_done(r_ref, ns);
775 return;
776 }
777
778 if (job->fail_on_cluster_change &&
779 job->cluster_key != as_exchange_cluster_key()) {
780 as_record_done(r_ref, ns);
781 as_scan_manager_abandon_job(_job, AS_ERR_CLUSTER_KEY_MISMATCH);
782 return;
783 }
784
785 as_index* r = r_ref->r;
786
787 if (excluded_set(r, _job->set_id) || as_record_is_doomed(r, ns)) {
788 as_record_done(r_ref, ns);
789 return;
790 }
791
792 predexp_eval_t* predexp = NULL;
793
794 if (! basic_scan_predexp_filter_meta(job, r, &predexp)) {
795 as_record_done(r_ref, ns);
796 as_incr_uint64(&_job->n_filtered_meta);
797 return;
798 }
799
800 as_storage_rd rd;
801
802 as_storage_record_open(ns, r, &rd);
803
804 if (predexp != NULL && predexp_read_and_filter_bins(&rd, predexp) != 0) {
805 as_storage_record_close(&rd);
806 as_record_done(r_ref, ns);
807 as_incr_uint64(&_job->n_filtered_bins);
808
809 if (! ns->storage_data_in_memory) {
810 throttle_sleep(_job);
811 }
812
813 return;
814 }
815
816 if (job->no_bin_data) {
817 as_msg_make_response_bufbuilder(slice->bb_r, &rd, true, NULL);
818 }
819 else {
820 if (as_storage_rd_load_n_bins(&rd) < 0) {
821 cf_warning(AS_SCAN, "job %lu - record unreadable", _job->trid);
822 as_storage_record_close(&rd);
823 as_record_done(r_ref, ns);
824 as_incr_uint64(&_job->n_failed);
825 return;
826 }
827
828 as_bin stack_bins[ns->storage_data_in_memory ? 0 : rd.n_bins];
829
830 if (as_storage_rd_load_bins(&rd, stack_bins)) {
831 cf_warning(AS_SCAN, "job %lu - record unreadable", _job->trid);
832 as_storage_record_close(&rd);
833 as_record_done(r_ref, ns);
834 as_incr_uint64(&_job->n_failed);
835 return;
836 }
837
838 as_msg_make_response_bufbuilder(slice->bb_r, &rd, false,
839 job->bin_names);
840 }
841
842 as_storage_record_close(&rd);
843 as_record_done(r_ref, ns);
844 as_incr_uint64(&_job->n_succeeded);
845
846 throttle_sleep(_job);
847
848 cf_buf_builder* bb = *slice->bb_r;
849
850 // If we exceed the proto size limit, send accumulated data back to client
851 // and reset the buf-builder to start a new proto.
852 if (bb->used_sz > SCAN_CHUNK_LIMIT) {
853 if (! conn_scan_job_send_response((conn_scan_job*)job, bb->buf,
854 bb->used_sz)) {
855 return;
856 }
857
858 cf_buf_builder_reset(bb);
859 }
860}
861
862bool
863basic_scan_predexp_filter_meta(const basic_scan_job* job, const as_record* r,
864 predexp_eval_t** predexp)
865{
866 *predexp = job->predexp;
867
868 if (*predexp == NULL) {
869 return true;
870 }
871
872 as_namespace* ns = ((as_scan_job*)job)->ns;
873 predexp_args_t predargs = { .ns = ns, .md = (as_record*)r };
874 predexp_retval_t predrv = predexp_matches_metadata(*predexp, &predargs);
875
876 if (predrv == PREDEXP_UNKNOWN) {
877 return true; // caller must later check bins using *predexp
878 }
879 // else - caller will not need to apply filter later.
880
881 *predexp = NULL;
882
883 return predrv == PREDEXP_TRUE;
884}
885
886cf_vector*
887bin_names_from_op(as_msg* m, int* result)
888{
889 *result = AS_OK;
890
891 if (m->n_ops == 0) {
892 return NULL;
893 }
894
895 cf_vector* v = cf_vector_create(AS_BIN_NAME_MAX_SZ, m->n_ops, 0);
896
897 as_msg_op* op = NULL;
898 int n = 0;
899
900 while ((op = as_msg_op_iterate(m, op, &n)) != NULL) {
901 if (op->name_sz >= AS_BIN_NAME_MAX_SZ) {
902 cf_warning(AS_SCAN, "basic scan job bin name too long");
903 cf_vector_destroy(v);
904 *result = AS_ERR_BIN_NAME;
905 return NULL;
906 }
907
908 char bin_name[AS_BIN_NAME_MAX_SZ];
909
910 memcpy(bin_name, op->name, op->name_sz);
911 bin_name[op->name_sz] = 0;
912 cf_vector_append_unique(v, (void*)bin_name);
913 }
914
915 return v;
916}
917
918
919
920//==============================================================================
921// aggr_scan_job derived class implementation.
922//
923
924//----------------------------------------------------------
925// aggr_scan_job typedefs and forward declarations.
926//
927
// Aggregation scan job - runs a UDF aggregation over each partition's records
// and streams the aggregation output to the client.
typedef struct aggr_scan_job_s {
	// Base object must be first:
	conn_scan_job _base;

	// Derived class data:
	as_aggr_call aggr_call; // UDF definition + hooks, set by aggr_scan_init()
} aggr_scan_job;

void aggr_scan_job_slice(as_scan_job* _job, as_partition_reservation* rsv);
void aggr_scan_job_finish(as_scan_job* _job);
void aggr_scan_job_destroy(as_scan_job* _job);
void aggr_scan_job_info(as_scan_job* _job, as_mon_jobstat* stat);

// Positional initializer - the order must match as_scan_vtable's member order.
const as_scan_vtable aggr_scan_job_vtable = {
	aggr_scan_job_slice,
	aggr_scan_job_finish,
	aggr_scan_job_destroy,
	aggr_scan_job_info
};

// Per-slice state: digests collected by the reduce callback (ll), the slice's
// response buf-builder (via bb_r), and the partition reservation being scanned.
typedef struct aggr_scan_slice_s {
	aggr_scan_job* job;
	cf_ll* ll;
	cf_buf_builder** bb_r;
	as_partition_reservation* rsv;
} aggr_scan_slice;

bool aggr_scan_init(as_aggr_call* call, const as_transaction* tr);
void aggr_scan_job_reduce_cb(as_index_ref* r_ref, void* udata);
bool aggr_scan_add_digest(cf_ll* ll, cf_digest* keyd);
as_partition_reservation* aggr_scan_ptn_reserve(void* udata, as_namespace* ns,
		uint32_t pid, as_partition_reservation* rsv);
as_stream_status aggr_scan_ostream_write(void* udata, as_val* val);

// Hooks installed on every aggregation call by aggr_scan_init(); 'udata' passed
// to them is the aggr_scan_slice.
const as_aggr_hooks scan_aggr_hooks = {
	.ostream_write = aggr_scan_ostream_write,
	.set_error = NULL,
	.ptn_reserve = aggr_scan_ptn_reserve,
	.ptn_release = NULL,
	.pre_check = NULL
};

void aggr_scan_add_val_response(aggr_scan_slice* slice, const as_val* val,
		bool success);
972
973//----------------------------------------------------------
974// aggr_scan_job public API.
975//
976
// Builds an aggregation scan job from transaction 'tr' and hands it to the
// scan manager. On success the job has taken over the client connection.
// Returns AS_OK, or an error code if:
//  - a field is malformed;
//  - a predexp was sent (not supported here);
//  - the UDF call can't be set up;
//  - the scan manager refuses the job.
int
aggr_scan_job_start(as_transaction* tr, as_namespace* ns, uint16_t set_id)
{
	aggr_scan_job* job = cf_malloc(sizeof(aggr_scan_job));
	as_scan_job* _job = (as_scan_job*)job;

	// NOTE(review): sample_pct and fail_on_cluster_change are parsed but not
	// used by aggregation scans.
	scan_options options = { .sample_pct = 100 };
	uint32_t rps = 0;
	uint32_t timeout = CF_SOCKET_TIMEOUT;

	if (! get_scan_options(tr, &options) || ! get_scan_rps(tr, &rps) ||
			! get_scan_socket_timeout(tr, &timeout)) {
		cf_warning(AS_SCAN, "aggregation scan job failed msg field processing");
		// Job not initialized yet - plain free rather than as_scan_job_destroy().
		cf_free(job);
		return AS_ERR_PARAMETER;
	}

	if (as_transaction_has_predexp(tr)) {
		cf_warning(AS_SCAN, "aggregation scans do not support predexp filters");
		cf_free(job);
		return AS_ERR_UNSUPPORTED_FEATURE;
	}

	convert_old_priority(options.priority, &rps);

	as_scan_job_init(_job, &aggr_scan_job_vtable, as_transaction_trid(tr), ns,
			set_id, rps, tr->from.proto_fd_h->client);

	// On failure, destroy will free aggr_call.def.arglist if non-NULL - this
	// relies on the init path leaving arglist valid or NULL.
	if (! aggr_scan_init(&job->aggr_call, tr)) {
		cf_warning(AS_SCAN, "aggregation scan job failed call init");
		as_scan_job_destroy(_job);
		return AS_ERR_PARAMETER;
	}

	// Take ownership of socket from transaction.
	conn_scan_job_own_fd((conn_scan_job*)job, tr->from.proto_fd_h, timeout);

	cf_info(AS_SCAN, "starting aggregation scan job %lu {%s:%s} rps %u socket-timeout %u from %s",
			_job->trid, ns->name, as_namespace_get_set_name(ns, set_id),
			_job->rps, timeout, _job->client);

	int result = as_scan_manager_start_job(_job);

	if (result != 0) {
		cf_warning(AS_SCAN, "aggregation scan job %lu failed to start (%d)",
				_job->trid, result);
		// Disown only undoes conn_scan_job_own_fd() (destroys fd_lock) - the
		// connection itself is not released here.
		conn_scan_job_disown_fd((conn_scan_job*)job);
		as_scan_job_destroy(_job);
		return result;
	}

	return AS_OK;
}
1030
1031//----------------------------------------------------------
1032// aggr_scan_job mandatory scan_job interface.
1033//
1034
1035void
1036aggr_scan_job_slice(as_scan_job* _job, as_partition_reservation* rsv)
1037{
1038 aggr_scan_job* job = (aggr_scan_job*)_job;
1039 cf_ll ll;
1040
1041 cf_ll_init(&ll, as_index_keys_ll_destroy_fn, false);
1042
1043 cf_buf_builder* bb = cf_buf_builder_create(INIT_BUF_BUILDER_SIZE);
1044 aggr_scan_slice slice = { job, &ll, &bb, rsv };
1045
1046 as_index_reduce_live(rsv->tree, aggr_scan_job_reduce_cb, (void*)&slice);
1047
1048 if (cf_ll_size(&ll) != 0) {
1049 as_result result;
1050 as_result_init(&result);
1051
1052 int ret = as_aggr_process(_job->ns, &job->aggr_call, &ll, (void*)&slice,
1053 &result);
1054
1055 if (ret != 0) {
1056 char* rs = as_module_err_string(ret);
1057
1058 if (result.value) {
1059 as_string* lua_s = as_string_fromval(result.value);
1060 char* lua_err = (char*)as_string_tostring(lua_s);
1061
1062 if (lua_err) {
1063 int l_rs_len = strlen(rs);
1064
1065 rs = cf_realloc(rs, l_rs_len + strlen(lua_err) + 4);
1066 sprintf(&rs[l_rs_len], " : %s", lua_err);
1067 }
1068 }
1069
1070 const as_val* v = (as_val*)as_string_new(rs, false);
1071
1072 aggr_scan_add_val_response(&slice, v, false);
1073 as_val_destroy(v);
1074 cf_free(rs);
1075 as_scan_manager_abandon_job(_job, AS_ERR_UNKNOWN);
1076 }
1077
1078 as_result_destroy(&result);
1079 }
1080
1081 cf_ll_reduce(&ll, true, as_index_keys_ll_reduce_fn, NULL);
1082
1083 if (bb->used_sz != 0) {
1084 conn_scan_job_send_response((conn_scan_job*)job, bb->buf, bb->used_sz);
1085 }
1086
1087 cf_buf_builder_free(bb);
1088}
1089
1090void
1091aggr_scan_job_finish(as_scan_job* _job)
1092{
1093 aggr_scan_job* job = (aggr_scan_job*)_job;
1094
1095 conn_scan_job_finish((conn_scan_job*)job);
1096
1097 if (job->aggr_call.def.arglist) {
1098 as_list_destroy(job->aggr_call.def.arglist);
1099 job->aggr_call.def.arglist = NULL;
1100 }
1101
1102 switch (_job->abandoned) {
1103 case 0:
1104 as_incr_uint64(&_job->ns->n_scan_aggr_complete);
1105 break;
1106 case AS_SCAN_ERR_USER_ABORT:
1107 as_incr_uint64(&_job->ns->n_scan_aggr_abort);
1108 break;
1109 case AS_SCAN_ERR_UNKNOWN:
1110 case AS_SCAN_ERR_CLUSTER_KEY:
1111 case AS_SCAN_ERR_RESPONSE_ERROR:
1112 case AS_SCAN_ERR_RESPONSE_TIMEOUT:
1113 default:
1114 as_incr_uint64(&_job->ns->n_scan_aggr_error);
1115 break;
1116 }
1117
1118 cf_info(AS_SCAN, "finished aggregation scan job %lu (%d)", _job->trid,
1119 _job->abandoned);
1120}
1121
1122void
1123aggr_scan_job_destroy(as_scan_job* _job)
1124{
1125 aggr_scan_job* job = (aggr_scan_job*)_job;
1126
1127 if (job->aggr_call.def.arglist) {
1128 as_list_destroy(job->aggr_call.def.arglist);
1129 }
1130}
1131
1132void
1133aggr_scan_job_info(as_scan_job* _job, as_mon_jobstat* stat)
1134{
1135 strcpy(stat->job_type, scan_type_str(SCAN_TYPE_AGGR));
1136 conn_scan_job_info((conn_scan_job*)_job, stat);
1137}
1138
1139//----------------------------------------------------------
1140// aggr_scan_job utilities.
1141//
1142
1143bool
1144aggr_scan_init(as_aggr_call* call, const as_transaction* tr)
1145{
1146 if (! udf_def_init_from_msg(&call->def, tr)) {
1147 return false;
1148 }
1149
1150 call->aggr_hooks = &scan_aggr_hooks;
1151
1152 return true;
1153}
1154
1155void
1156aggr_scan_job_reduce_cb(as_index_ref* r_ref, void* udata)
1157{
1158 aggr_scan_slice* slice = (aggr_scan_slice*)udata;
1159 aggr_scan_job* job = slice->job;
1160 as_scan_job* _job = (as_scan_job*)job;
1161 as_namespace* ns = _job->ns;
1162
1163 if (_job->abandoned != 0) {
1164 as_record_done(r_ref, ns);
1165 return;
1166 }
1167
1168 as_index* r = r_ref->r;
1169
1170 if (excluded_set(r, _job->set_id) || as_record_is_doomed(r, ns)) {
1171 as_record_done(r_ref, ns);
1172 return;
1173 }
1174
1175 if (! aggr_scan_add_digest(slice->ll, &r->keyd)) {
1176 as_record_done(r_ref, ns);
1177 as_scan_manager_abandon_job(_job, AS_ERR_UNKNOWN);
1178 return;
1179 }
1180
1181 as_record_done(r_ref, ns);
1182 as_incr_uint64(&_job->n_succeeded);
1183
1184 throttle_sleep(_job);
1185}
1186
1187bool
1188aggr_scan_add_digest(cf_ll* ll, cf_digest* keyd)
1189{
1190 as_index_keys_ll_element* tail_e = (as_index_keys_ll_element*)ll->tail;
1191 as_index_keys_arr* keys_arr;
1192
1193 if (tail_e) {
1194 keys_arr = tail_e->keys_arr;
1195
1196 if (keys_arr->num == AS_INDEX_KEYS_PER_ARR) {
1197 tail_e = NULL;
1198 }
1199 }
1200
1201 if (! tail_e) {
1202 if (! (keys_arr = as_index_get_keys_arr())) {
1203 return false;
1204 }
1205
1206 tail_e = cf_malloc(sizeof(as_index_keys_ll_element));
1207
1208 tail_e->keys_arr = keys_arr;
1209 cf_ll_append(ll, (cf_ll_element*)tail_e);
1210 }
1211
1212 keys_arr->pindex_digs[keys_arr->num] = *keyd;
1213 keys_arr->num++;
1214
1215 return true;
1216}
1217
1218as_partition_reservation*
1219aggr_scan_ptn_reserve(void* udata, as_namespace* ns, uint32_t pid,
1220 as_partition_reservation* rsv)
1221{
1222 aggr_scan_slice* slice = (aggr_scan_slice*)udata;
1223
1224 return slice->rsv;
1225}
1226
1227as_stream_status
1228aggr_scan_ostream_write(void* udata, as_val* val)
1229{
1230 aggr_scan_slice* slice = (aggr_scan_slice*)udata;
1231
1232 if (val) {
1233 aggr_scan_add_val_response(slice, val, true);
1234 as_val_destroy(val);
1235 }
1236
1237 return AS_STREAM_OK;
1238}
1239
1240void
1241aggr_scan_add_val_response(aggr_scan_slice* slice, const as_val* val,
1242 bool success)
1243{
1244 uint32_t size = as_particle_asval_client_value_size(val);
1245
1246 as_msg_make_val_response_bufbuilder(val, slice->bb_r, size, success);
1247
1248 cf_buf_builder* bb = *slice->bb_r;
1249 conn_scan_job* conn_job = (conn_scan_job*)slice->job;
1250
1251 // If we exceed the proto size limit, send accumulated data back to client
1252 // and reset the buf-builder to start a new proto.
1253 if (bb->used_sz > SCAN_CHUNK_LIMIT) {
1254 if (! conn_scan_job_send_response(conn_job, bb->buf, bb->used_sz)) {
1255 return;
1256 }
1257
1258 cf_buf_builder_reset(bb);
1259 }
1260}
1261
1262
1263
1264//==============================================================================
1265// udf_bg_scan_job derived class implementation.
1266//
1267
1268//----------------------------------------------------------
1269// udf_bg_scan_job typedefs and forward declarations.
1270//
1271
// Background UDF scan job. For every matching record, udf_bg_scan_job_reduce_cb()
// enqueues one internal UDF sub-transaction built from 'origin'.
typedef struct udf_bg_scan_job_s {
	// Base object must be first:
	as_scan_job _base;

	// Derived class data:
	// Template shared by every sub-transaction: UDF def, internal msg,
	// predexp, completion callback and its udata.
	iudf_origin origin;
	// Sub-transactions enqueued but not yet completed. Incremented by the
	// reduce callback, decremented by udf_bg_scan_tr_complete() on other
	// threads.
	uint32_t n_active_tr;
} udf_bg_scan_job;
1280
// udf_bg_scan_job implementations of the as_scan_vtable methods (defined
// below).
void udf_bg_scan_job_slice(as_scan_job* _job, as_partition_reservation* rsv);
void udf_bg_scan_job_finish(as_scan_job* _job);
void udf_bg_scan_job_destroy(as_scan_job* _job);
void udf_bg_scan_job_info(as_scan_job* _job, as_mon_jobstat* stat);
1285
// Method table passed to as_scan_job_init() for udf-bg scan jobs. This is a
// positional initializer, so the entries must stay in as_scan_vtable's member
// order.
const as_scan_vtable udf_bg_scan_job_vtable = {
	udf_bg_scan_job_slice,
	udf_bg_scan_job_finish,
	udf_bg_scan_job_destroy,
	udf_bg_scan_job_info
};
1292
// udf_bg_scan_job utilities (defined below): the per-record reduce callback
// and the sub-transaction completion callback.
void udf_bg_scan_job_reduce_cb(as_index_ref* r_ref, void* udata);
void udf_bg_scan_tr_complete(void* udata, int result);
1295
1296//----------------------------------------------------------
1297// udf_bg_scan_job public API.
1298//
1299
1300int
1301udf_bg_scan_job_start(as_transaction* tr, as_namespace* ns, uint16_t set_id)
1302{
1303 udf_bg_scan_job* job = cf_malloc(sizeof(udf_bg_scan_job));
1304 as_scan_job* _job = (as_scan_job*)job;
1305
1306 scan_options options = { .sample_pct = 100 };
1307 uint32_t rps = 0;
1308
1309 if (! get_scan_options(tr, &options) || ! get_scan_rps(tr, &rps)) {
1310 cf_warning(AS_SCAN, "udf-bg scan job failed msg field processing");
1311 cf_free(job);
1312 return AS_ERR_PARAMETER;
1313 }
1314
1315 convert_old_priority(options.priority, &rps);
1316
1317 if (! validate_background_scan_rps(ns, &rps)) {
1318 cf_warning(AS_SCAN, "udf-bg scan job failed rps check");
1319 cf_free(job);
1320 return AS_ERR_PARAMETER;
1321 }
1322
1323 predexp_eval_t* predexp = NULL;
1324
1325 if (! get_scan_predexp(tr, &predexp)) {
1326 cf_warning(AS_SCAN, "udf-bg scan job failed predexp processing");
1327 cf_free(job);
1328 return AS_ERR_PARAMETER;
1329 }
1330
1331 as_scan_job_init(_job, &udf_bg_scan_job_vtable, as_transaction_trid(tr), ns,
1332 set_id, rps, tr->from.proto_fd_h->client);
1333
1334 job->n_active_tr = 0;
1335
1336 if (! udf_def_init_from_msg(&job->origin.def, tr)) {
1337 cf_warning(AS_SCAN, "udf-bg scan job failed def init");
1338 as_scan_job_destroy(_job);
1339 return AS_ERR_PARAMETER;
1340 }
1341
1342 uint8_t info2 = AS_MSG_INFO2_WRITE |
1343 (tr->msgp->msg.info2 & AS_MSG_INFO2_DURABLE_DELETE);
1344
1345 job->origin.msgp =
1346 as_msg_create_internal(ns->name, 0, info2, 0, 0, NULL, 0);
1347
1348 job->origin.predexp = predexp;
1349 job->origin.cb = udf_bg_scan_tr_complete;
1350 job->origin.udata = (void*)job;
1351
1352 cf_info(AS_SCAN, "starting udf-bg scan job %lu {%s:%s} rps %u from %s",
1353 _job->trid, ns->name, as_namespace_get_set_name(ns, set_id),
1354 _job->rps, _job->client);
1355
1356 int result = as_scan_manager_start_job(_job);
1357
1358 if (result != 0) {
1359 cf_warning(AS_SCAN, "udf-bg scan job %lu failed to start (%d)",
1360 _job->trid, result);
1361 as_scan_job_destroy(_job);
1362 return result;
1363 }
1364
1365 if (as_msg_send_fin(&tr->from.proto_fd_h->sock, AS_OK)) {
1366 tr->from.proto_fd_h->last_used = cf_getns();
1367 as_end_of_transaction_ok(tr->from.proto_fd_h);
1368 }
1369 else {
1370 cf_warning(AS_SCAN, "udf-bg scan job error sending fin");
1371 as_end_of_transaction_force_close(tr->from.proto_fd_h);
1372 // No point returning an error - it can't be reported on this socket.
1373 }
1374
1375 tr->from.proto_fd_h = NULL;
1376
1377 return AS_OK;
1378}
1379
1380//----------------------------------------------------------
1381// udf_bg_scan_job mandatory scan_job interface.
1382//
1383
1384void
1385udf_bg_scan_job_slice(as_scan_job* _job, as_partition_reservation* rsv)
1386{
1387 as_index_reduce_live(rsv->tree, udf_bg_scan_job_reduce_cb, (void*)_job);
1388}
1389
1390void
1391udf_bg_scan_job_finish(as_scan_job* _job)
1392{
1393 udf_bg_scan_job* job = (udf_bg_scan_job*)_job;
1394
1395 while (job->n_active_tr != 0) {
1396 usleep(100);
1397 }
1398
1399 switch (_job->abandoned) {
1400 case 0:
1401 as_incr_uint64(&_job->ns->n_scan_udf_bg_complete);
1402 break;
1403 case AS_SCAN_ERR_USER_ABORT:
1404 as_incr_uint64(&_job->ns->n_scan_udf_bg_abort);
1405 break;
1406 case AS_SCAN_ERR_UNKNOWN:
1407 case AS_SCAN_ERR_CLUSTER_KEY:
1408 default:
1409 as_incr_uint64(&_job->ns->n_scan_udf_bg_error);
1410 break;
1411 }
1412
1413 cf_info(AS_SCAN, "finished udf-bg scan job %lu (%d)", _job->trid,
1414 _job->abandoned);
1415}
1416
1417void
1418udf_bg_scan_job_destroy(as_scan_job* _job)
1419{
1420 udf_bg_scan_job* job = (udf_bg_scan_job*)_job;
1421
1422 iudf_origin_destroy(&job->origin);
1423}
1424
1425void
1426udf_bg_scan_job_info(as_scan_job* _job, as_mon_jobstat* stat)
1427{
1428 strcpy(stat->job_type, scan_type_str(SCAN_TYPE_UDF_BG));
1429 stat->net_io_bytes = sizeof(cl_msg); // size of original synchronous fin
1430 stat->socket_timeout = CF_SOCKET_TIMEOUT;
1431
1432 udf_bg_scan_job* job = (udf_bg_scan_job*)_job;
1433 char* extra = stat->jdata + strlen(stat->jdata);
1434
1435 sprintf(extra, ":udf-filename=%s:udf-function=%s:udf-active=%u",
1436 job->origin.def.filename, job->origin.def.function,
1437 job->n_active_tr);
1438}
1439
1440//----------------------------------------------------------
1441// udf_bg_scan_job utilities.
1442//
1443
1444void
1445udf_bg_scan_job_reduce_cb(as_index_ref* r_ref, void* udata)
1446{
1447 as_scan_job* _job = (as_scan_job*)udata;
1448 udf_bg_scan_job* job = (udf_bg_scan_job*)_job;
1449 as_namespace* ns = _job->ns;
1450
1451 if (_job->abandoned != 0) {
1452 as_record_done(r_ref, ns);
1453 return;
1454 }
1455
1456 as_index* r = r_ref->r;
1457
1458 if (excluded_set(r, _job->set_id) || as_record_is_doomed(r, ns)) {
1459 as_record_done(r_ref, ns);
1460 return;
1461 }
1462
1463 predexp_args_t predargs = { .ns = ns, .md = r };
1464
1465 if (job->origin.predexp != NULL &&
1466 predexp_matches_metadata(job->origin.predexp, &predargs) ==
1467 PREDEXP_FALSE) {
1468 as_record_done(r_ref, ns);
1469 as_incr_uint64(&_job->n_filtered_meta);
1470 as_incr_uint64(&ns->n_udf_sub_udf_filtered_out);
1471 return;
1472 }
1473
1474 // Save this before releasing record.
1475 cf_digest keyd = r->keyd;
1476
1477 // Release record lock before throttling and enqueuing transaction.
1478 as_record_done(r_ref, ns);
1479
1480 // Prefer not reaching target RPS to queue buildup and transaction timeouts.
1481 while (as_load_uint32(&job->n_active_tr) > MAX_ACTIVE_TRANSACTIONS) {
1482 usleep(1000);
1483 }
1484
1485 throttle_sleep(_job);
1486
1487 as_transaction tr;
1488 as_transaction_init_iudf(&tr, ns, &keyd, &job->origin);
1489
1490 as_incr_uint32(&job->n_active_tr);
1491 as_service_enqueue_internal(&tr);
1492}
1493
1494void
1495udf_bg_scan_tr_complete(void* udata, int result)
1496{
1497 as_scan_job* _job = (as_scan_job*)udata;
1498 udf_bg_scan_job* job = (udf_bg_scan_job*)_job;
1499
1500 as_decr_uint32(&job->n_active_tr);
1501
1502 switch (result) {
1503 case AS_OK:
1504 as_incr_uint64(&_job->n_succeeded);
1505 break;
1506 case AS_ERR_NOT_FOUND: // record deleted after generating tr
1507 break;
1508 case AS_ERR_FILTERED_OUT:
1509 as_incr_uint64(&_job->n_filtered_bins);
1510 break;
1511 default:
1512 as_incr_uint64(&_job->n_failed);
1513 break;
1514 }
1515}
1516
1517
1518
1519//==============================================================================
1520// ops_bg_scan_job derived class implementation.
1521//
1522
1523//----------------------------------------------------------
1524// ops_bg_scan_job typedefs and forward declarations.
1525//
1526
// Background ops scan job. For every matching record, ops_bg_scan_job_reduce_cb()
// enqueues one internal write sub-transaction built from 'origin'.
typedef struct ops_bg_scan_job_s {
	// Base object must be first:
	as_scan_job _base;

	// Derived class data:
	// Template shared by every sub-transaction: internal msg carrying the
	// client's ops, predexp, completion callback and its udata.
	iops_origin origin;
	// Sub-transactions enqueued but not yet completed. Incremented by the
	// reduce callback, decremented by ops_bg_scan_tr_complete() on other
	// threads.
	uint32_t n_active_tr;
} ops_bg_scan_job;
1535
// ops_bg_scan_job implementations of the as_scan_vtable methods (defined
// below).
void ops_bg_scan_job_slice(as_scan_job* _job, as_partition_reservation* rsv);
void ops_bg_scan_job_finish(as_scan_job* _job);
void ops_bg_scan_job_destroy(as_scan_job* _job);
void ops_bg_scan_job_info(as_scan_job* _job, as_mon_jobstat* stat);
1540
// Method table passed to as_scan_job_init() for ops-bg scan jobs. This is a
// positional initializer, so the entries must stay in as_scan_vtable's member
// order.
const as_scan_vtable ops_bg_scan_job_vtable = {
	ops_bg_scan_job_slice,
	ops_bg_scan_job_finish,
	ops_bg_scan_job_destroy,
	ops_bg_scan_job_info
};
1547
// ops_bg_scan_job utilities (defined below): ops validation, the per-record
// reduce callback, and the sub-transaction completion callback.
uint8_t* ops_bg_validate_ops(const as_msg* m);
void ops_bg_scan_job_reduce_cb(as_index_ref* r_ref, void* udata);
void ops_bg_scan_tr_complete(void* udata, int result);
1551
1552//----------------------------------------------------------
1553// ops_bg_scan_job public API.
1554//
1555
1556int
1557ops_bg_scan_job_start(as_transaction* tr, as_namespace* ns, uint16_t set_id)
1558{
1559 ops_bg_scan_job* job = cf_malloc(sizeof(ops_bg_scan_job));
1560 as_scan_job* _job = (as_scan_job*)job;
1561
1562 scan_options options = { .sample_pct = 100 };
1563 uint32_t rps = 0;
1564
1565 if (! get_scan_options(tr, &options) || ! get_scan_rps(tr, &rps)) {
1566 cf_warning(AS_SCAN, "ops-bg scan job failed msg field processing");
1567 cf_free(job);
1568 return AS_ERR_PARAMETER;
1569 }
1570
1571 if (! validate_background_scan_rps(ns, &rps)) {
1572 cf_warning(AS_SCAN, "ops-bg scan job failed rps check");
1573 cf_free(job);
1574 return AS_ERR_PARAMETER;
1575 }
1576
1577 as_msg* om = &tr->msgp->msg;
1578 uint8_t* ops = ops_bg_validate_ops(om);
1579
1580 if (ops == NULL) {
1581 cf_warning(AS_SCAN, "ops-bg scan job failed ops check");
1582 cf_free(job);
1583 return AS_ERR_PARAMETER;
1584 }
1585
1586 predexp_eval_t* predexp = NULL;
1587
1588 if (! get_scan_predexp(tr, &predexp)) {
1589 cf_warning(AS_SCAN, "ops-bg scan job failed predexp processing");
1590 cf_free(job);
1591 return AS_ERR_PARAMETER;
1592 }
1593
1594 as_scan_job_init(_job, &ops_bg_scan_job_vtable, as_transaction_trid(tr), ns,
1595 set_id, rps, tr->from.proto_fd_h->client);
1596
1597 job->n_active_tr = 0;
1598
1599 uint8_t info2 = AS_MSG_INFO2_WRITE |
1600 (om->info2 & AS_MSG_INFO2_DURABLE_DELETE);
1601 uint8_t info3 = AS_MSG_INFO3_UPDATE_ONLY |
1602 (om->info3 & AS_MSG_INFO3_REPLACE_ONLY);
1603
1604 job->origin.msgp = as_msg_create_internal(ns->name, 0, info2, info3,
1605 om->n_ops, ops, tr->msgp->proto.sz - (ops - (uint8_t*)om));
1606
1607 job->origin.predexp = predexp;
1608 job->origin.cb = ops_bg_scan_tr_complete;
1609 job->origin.udata = (void*)job;
1610
1611 cf_info(AS_SCAN, "starting ops-bg scan job %lu {%s:%s} rps %u from %s",
1612 _job->trid, ns->name, as_namespace_get_set_name(ns, set_id),
1613 _job->rps, _job->client);
1614
1615 int result = as_scan_manager_start_job(_job);
1616
1617 if (result != 0) {
1618 cf_warning(AS_SCAN, "ops-bg scan job %lu failed to start (%d)",
1619 _job->trid, result);
1620 as_scan_job_destroy(_job);
1621 return result;
1622 }
1623
1624 if (as_msg_send_fin(&tr->from.proto_fd_h->sock, AS_OK)) {
1625 tr->from.proto_fd_h->last_used = cf_getns();
1626 as_end_of_transaction_ok(tr->from.proto_fd_h);
1627 }
1628 else {
1629 cf_warning(AS_SCAN, "ops-bg scan job error sending fin");
1630 as_end_of_transaction_force_close(tr->from.proto_fd_h);
1631 // No point returning an error - it can't be reported on this socket.
1632 }
1633
1634 tr->from.proto_fd_h = NULL;
1635
1636 return AS_OK;
1637}
1638
1639//----------------------------------------------------------
1640// ops_bg_scan_job mandatory scan_job interface.
1641//
1642
1643void
1644ops_bg_scan_job_slice(as_scan_job* _job, as_partition_reservation* rsv)
1645{
1646 as_index_reduce_live(rsv->tree, ops_bg_scan_job_reduce_cb, (void*)_job);
1647}
1648
1649void
1650ops_bg_scan_job_finish(as_scan_job* _job)
1651{
1652 ops_bg_scan_job* job = (ops_bg_scan_job*)_job;
1653
1654 while (job->n_active_tr != 0) {
1655 usleep(100);
1656 }
1657
1658 switch (_job->abandoned) {
1659 case 0:
1660 as_incr_uint64(&_job->ns->n_scan_ops_bg_complete);
1661 break;
1662 case AS_SCAN_ERR_USER_ABORT:
1663 as_incr_uint64(&_job->ns->n_scan_ops_bg_abort);
1664 break;
1665 case AS_SCAN_ERR_UNKNOWN:
1666 case AS_SCAN_ERR_CLUSTER_KEY:
1667 default:
1668 as_incr_uint64(&_job->ns->n_scan_ops_bg_error);
1669 break;
1670 }
1671
1672 cf_info(AS_SCAN, "finished ops-bg scan job %lu (%d)", _job->trid,
1673 _job->abandoned);
1674}
1675
1676void
1677ops_bg_scan_job_destroy(as_scan_job* _job)
1678{
1679 ops_bg_scan_job* job = (ops_bg_scan_job*)_job;
1680
1681 iops_origin_destroy(&job->origin);
1682}
1683
1684void
1685ops_bg_scan_job_info(as_scan_job* _job, as_mon_jobstat* stat)
1686{
1687 strcpy(stat->job_type, scan_type_str(SCAN_TYPE_OPS_BG));
1688 stat->net_io_bytes = sizeof(cl_msg); // size of original synchronous fin
1689 stat->socket_timeout = CF_SOCKET_TIMEOUT;
1690
1691 ops_bg_scan_job* job = (ops_bg_scan_job*)_job;
1692 char* extra = stat->jdata + strlen(stat->jdata);
1693
1694 sprintf(extra, ":ops-active=%u", job->n_active_tr);
1695}
1696
1697//----------------------------------------------------------
1698// ops_bg_scan_job utilities.
1699//
1700
1701uint8_t*
1702ops_bg_validate_ops(const as_msg* m)
1703{
1704 if ((m->info1 & AS_MSG_INFO1_READ) != 0) {
1705 cf_warning(AS_SCAN, "ops not write only");
1706 return NULL;
1707 }
1708
1709 if (m->n_ops == 0) {
1710 cf_warning(AS_SCAN, "ops scan has no ops");
1711 return NULL;
1712 }
1713
1714 // TODO - should we at least de-fuzz the ops, so all the sub-transactions
1715 // won't fail later?
1716 int i = 0;
1717
1718 return (uint8_t*)as_msg_op_iterate(m, NULL, &i);
1719}
1720
1721void
1722ops_bg_scan_job_reduce_cb(as_index_ref* r_ref, void* udata)
1723{
1724 as_scan_job* _job = (as_scan_job*)udata;
1725 ops_bg_scan_job* job = (ops_bg_scan_job*)_job;
1726 as_namespace* ns = _job->ns;
1727
1728 if (_job->abandoned != 0) {
1729 as_record_done(r_ref, ns);
1730 return;
1731 }
1732
1733 as_index* r = r_ref->r;
1734
1735 if (excluded_set(r, _job->set_id) || as_record_is_doomed(r, ns)) {
1736 as_record_done(r_ref, ns);
1737 return;
1738 }
1739
1740 predexp_args_t predargs = { .ns = ns, .md = r };
1741
1742 if (job->origin.predexp != NULL &&
1743 predexp_matches_metadata(job->origin.predexp, &predargs) ==
1744 PREDEXP_FALSE) {
1745 as_record_done(r_ref, ns);
1746 as_incr_uint64(&_job->n_filtered_meta);
1747 as_incr_uint64(&ns->n_ops_sub_write_filtered_out);
1748 return;
1749 }
1750
1751 // Save this before releasing record.
1752 cf_digest keyd = r->keyd;
1753
1754 // Release record lock before throttling and enqueuing transaction.
1755 as_record_done(r_ref, ns);
1756
1757 // Prefer not reaching target RPS to queue buildup and transaction timeouts.
1758 while (as_load_uint32(&job->n_active_tr) > MAX_ACTIVE_TRANSACTIONS) {
1759 usleep(1000);
1760 }
1761
1762 throttle_sleep(_job);
1763
1764 as_transaction tr;
1765 as_transaction_init_iops(&tr, ns, &keyd, &job->origin);
1766
1767 as_incr_uint32(&job->n_active_tr);
1768 as_service_enqueue_internal(&tr);
1769}
1770
1771void
1772ops_bg_scan_tr_complete(void* udata, int result)
1773{
1774 as_scan_job* _job = (as_scan_job*)udata;
1775 ops_bg_scan_job* job = (ops_bg_scan_job*)_job;
1776
1777 as_decr_uint32(&job->n_active_tr);
1778
1779 switch (result) {
1780 case AS_OK:
1781 as_incr_uint64(&_job->n_succeeded);
1782 break;
1783 case AS_ERR_NOT_FOUND: // record deleted after generating tr
1784 break;
1785 case AS_ERR_FILTERED_OUT:
1786 as_incr_uint64(&_job->n_filtered_bins);
1787 break;
1788 default:
1789 as_incr_uint64(&_job->n_failed);
1790 break;
1791 }
1792}
1793