/*
 * job_manager.h
 *
 * Copyright (C) 2015 Aerospike, Inc.
 *
 * Portions may be licensed to Aerospike, Inc. under one or more contributor
 * license agreements.
 *
 * This program is free software: you can redistribute it and/or modify it under
 * the terms of the GNU Affero General Public License as published by the Free
 * Software Foundation, either version 3 of the License, or (at your option) any
 * later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
 * details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see http://www.gnu.org/licenses/
 */

#pragma once

#include <stdbool.h>
#include <stdint.h>

#include "citrusleaf/cf_atomic.h"
#include "citrusleaf/cf_queue.h"
#include "citrusleaf/cf_queue_priority.h"

#include "cf_mutex.h"

// Forward declarations. Every use of these tags before (or without) a full
// definition in this header is through a pointer, so an incomplete type is
// sufficient. as_job_s and as_job_manager_s are completed later in this file;
// the other three are defined elsewhere.
struct as_job_s;
struct as_job_manager_s;
struct as_mon_jobstat_s;
struct as_namespace_s;
struct as_partition_reservation_s;

//----------------------------------------------------------
// as_priority_thread_pool - class header.
// TODO - move to common.
//

45typedef struct as_priority_thread_pool_s {
46 cf_queue_priority* dispatch_queue;
47 uint32_t n_threads;
48} as_priority_thread_pool;
49
// Callback type for work handed to the pool: takes one opaque pointer and
// returns nothing. This is the 'task_fn' parameter type of
// as_priority_thread_pool_queue_task().
// NOTE(review): presumably invoked with the 'task' pointer that was queued
// alongside it - confirm against the implementation.
typedef void (*as_priority_thread_pool_task_fn)(void* task);

// Priority levels accepted by the thread pool's 'priority' parameters.
// Same as cf_queue_priority scheme, so no internal conversion needed:
#define THREAD_POOL_PRIORITY_LOW    CF_QUEUE_PRIORITY_LOW
#define THREAD_POOL_PRIORITY_MEDIUM CF_QUEUE_PRIORITY_MEDIUM
#define THREAD_POOL_PRIORITY_HIGH   CF_QUEUE_PRIORITY_HIGH

57void as_priority_thread_pool_init(as_priority_thread_pool* pool, uint32_t n_threads);
58void as_priority_thread_pool_resize(as_priority_thread_pool* pool, uint32_t n_threads);
59void as_priority_thread_pool_queue_task(as_priority_thread_pool* pool, as_priority_thread_pool_task_fn task_fn, void* task, int priority);
60bool as_priority_thread_pool_remove_task(as_priority_thread_pool* pool, void* task);
61void as_priority_thread_pool_change_task_priority(as_priority_thread_pool* pool, void* task, int new_priority);
62
//----------------------------------------------------------
// as_job - base class header.
//

// Callback types that make up a job's vtable (see as_job_vtable). Each takes
// the job base pointer as its first argument and returns nothing.

// Slice callback: receives the job and a partition reservation.
typedef void (*as_job_slice_fn)(struct as_job_s* _job,
		struct as_partition_reservation_s* rsv);

// Finish callback: receives the job only.
typedef void (*as_job_finish_fn)(struct as_job_s* _job);

// Destroy callback: receives the job only.
typedef void (*as_job_destroy_fn)(struct as_job_s* _job);

// Info callback: receives the job and a monitor job-stat object.
// NOTE(review): presumably fills in 'stat' - confirm against implementations.
typedef void (*as_job_info_fn)(struct as_job_s* _job,
		struct as_mon_jobstat_s* stat);

72typedef struct as_job_vtable_s {
73 as_job_slice_fn slice_fn;
74 as_job_finish_fn finish_fn;
75 as_job_destroy_fn destroy_fn;
76 as_job_info_fn info_mon_fn;
77} as_job_vtable;
78
// Reservation type for a job; stored in as_job.rsv_type and passed to
// as_job_init(). The numeric values are fixed explicitly.
typedef enum {
	RSV_WRITE = 0,
	RSV_MIGRATE = 1
} as_job_rsv_type;

// Job priority levels - direct aliases of the thread pool priorities.
// Same as cf_queue_priority scheme, so no internal conversion needed:
#define AS_JOB_PRIORITY_LOW    THREAD_POOL_PRIORITY_LOW
#define AS_JOB_PRIORITY_MEDIUM THREAD_POOL_PRIORITY_MEDIUM
#define AS_JOB_PRIORITY_HIGH   THREAD_POOL_PRIORITY_HIGH

// Job failure reasons.
//
// Same as proto result codes so connected scans don't have to convert.
// The AS_ERR_* names are not defined in this header, so code that expands
// these macros must have the proto result code definitions in scope:
#define AS_JOB_FAIL_UNKNOWN     AS_ERR_UNKNOWN
#define AS_JOB_FAIL_PARAMETER   AS_ERR_PARAMETER
#define AS_JOB_FAIL_CLUSTER_KEY AS_ERR_CLUSTER_KEY_MISMATCH
#define AS_JOB_FAIL_USER_ABORT  AS_ERR_SCAN_ABORT
#define AS_JOB_FAIL_FORBIDDEN   AS_ERR_FORBIDDEN

// These result codes can't make it back to the client, but show in monitor.
// They are negative, and parenthesized so they expand safely in expressions:
#define AS_JOB_FAIL_RESPONSE_ERROR   (-1)
#define AS_JOB_FAIL_RESPONSE_TIMEOUT (-2)

100typedef struct as_job_s {
101 // Mandatory interface for derived classes:
102 as_job_vtable vtable;
103
104 // Parent:
105 struct as_job_manager_s* mgr;
106
107 // Which partitions to reduce:
108 as_job_rsv_type rsv_type;
109
110 // Unique identifier:
111 uint64_t trid;
112
113 // Job scope:
114 struct as_namespace_s* ns;
115 uint16_t set_id;
116
117 // Handle active phase:
118 cf_mutex requeue_lock;
119 int priority;
120 cf_atomic32 active_rc;
121 volatile int next_pid;
122 volatile int abandoned;
123
124 // For tracking:
125 char client[64];
126 uint64_t start_ms;
127 uint64_t finish_ms;
128 cf_atomic64 n_records_read;
129} as_job;
130
131void as_job_init(as_job* _job, const as_job_vtable* vtable,
132 struct as_job_manager_s* manager, as_job_rsv_type rsv_type,
133 uint64_t trid, struct as_namespace_s* ns, uint16_t set_id, int priority,
134 const char* client);
135void as_job_slice(void* task);
136void as_job_finish(as_job* _job);
137void as_job_destroy(as_job* _job);
138void as_job_info(as_job* _job, struct as_mon_jobstat_s* stat);
139void as_job_active_reserve(as_job* _job);
140void as_job_active_release(as_job* _job);
141
//----------------------------------------------------------
// as_job_manager - class header.
//

146typedef struct as_job_manager_s {
147 cf_mutex lock;
148 cf_queue* active_jobs;
149 cf_queue* finished_jobs;
150 as_priority_thread_pool thread_pool;
151
152 // Manager configuration:
153 uint32_t max_active;
154 uint32_t max_done;
155} as_job_manager;
156
157void as_job_manager_init(as_job_manager* mgr, uint32_t max_active, uint32_t max_done, uint32_t n_threads);
158int as_job_manager_start_job(as_job_manager* mgr, as_job* _job);
159void as_job_manager_requeue_job(as_job_manager* mgr, as_job* _job);
160void as_job_manager_finish_job(as_job_manager* mgr, as_job* _job);
161void as_job_manager_abandon_job(as_job_manager* mgr, as_job* _job, int reason);
162bool as_job_manager_abort_job(as_job_manager* mgr, uint64_t trid);
163int as_job_manager_abort_all_jobs(as_job_manager* mgr);
164bool as_job_manager_change_job_priority(as_job_manager* mgr, uint64_t trid, int priority);
165void as_job_manager_limit_active_jobs(as_job_manager* mgr, uint32_t max_active);
166void as_job_manager_limit_finished_jobs(as_job_manager* mgr, uint32_t max_done);
167void as_job_manager_resize_thread_pool(as_job_manager* mgr, uint32_t n_threads);
168struct as_mon_jobstat_s* as_job_manager_get_job_info(as_job_manager* mgr, uint64_t trid);
169struct as_mon_jobstat_s* as_job_manager_get_info(as_job_manager* mgr, int* size);
170int as_job_manager_get_active_job_count(as_job_manager* mgr);
171