1/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3/* -*- mode: C; c-basic-offset: 4 -*- */
4#ident "$Id$"
5/*======
6This file is part of TokuDB
7
8
9Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
10
11 TokuDB is free software: you can redistribute it and/or modify
12 it under the terms of the GNU General Public License, version 2,
13 as published by the Free Software Foundation.
14
15 TokuDB is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with TokuDB. If not, see <http://www.gnu.org/licenses/>.
22
23======= */
24
25#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
26
27#include "tokudb_background.h"
28#include "tokudb_sysvars.h"
29
30namespace tokudb {
31namespace background {
32
33
34std::atomic<uint64_t> job_manager_t::job_t::_next_id(1);
35
36job_manager_t::job_t::job_t(bool user_scheduled) :
37 _running(false),
38 _cancelled(false),
39 _id(_next_id++),
40 _user_scheduled(user_scheduled),
41 _scheduled_time(::time(0)),
42 _started_time(0) {
43}
44job_manager_t::job_t::~job_t() {
45}
46void* job_manager_t::operator new(size_t sz) {
47 return tokudb::memory::malloc(sz, MYF(MY_WME|MY_ZEROFILL|MY_FAE));
48}
49void job_manager_t::operator delete(void* p) {
50 tokudb::memory::free(p);
51}
52job_manager_t::job_manager_t() :
53 _sem(0, 65535),
54 _shutdown(false) {
55}
56job_manager_t::~job_manager_t() {
57}
58void job_manager_t::initialize() {
59 int r = _thread.start(thread_func, this);
60 assert_always(r == 0);
61}
62void job_manager_t::destroy() {
63 assert_always(!_shutdown);
64 assert_always(_foreground_jobs.size() == 0);
65 _shutdown = true;
66 _sem.set_interrupt();
67
68 while (_background_jobs.size()) {
69 mutex_t_lock(_mutex);
70 job_t* job = _background_jobs.front();
71 if (!job->cancelled())
72 cancel(job);
73 _background_jobs.pop_front();
74 delete job;
75 mutex_t_unlock(_mutex);
76 }
77
78 void* result;
79 int r = _thread.join(&result);
80 assert_always(r == 0);
81}
82bool job_manager_t::run_job(job_t* newjob, bool background) {
83 bool ret = false;
84 const char* jobkey = newjob->key();
85
86 mutex_t_lock(_mutex);
87 assert_always(!_shutdown);
88
89 for (jobs_t::iterator it = _background_jobs.begin();
90 it != _background_jobs.end();
91 it++) {
92 job_t* job = *it;
93 if (!job->cancelled() && strcmp(job->key(), jobkey) == 0) {
94 // if this is a foreground job being run and
95 // there is an existing background job of the same type
96 // and it is not running yet, we can cancel the background job
97 // and just run this one in the foreground, might have different
98 // params, but that is up to the user to figure out.
99 if (!background && !job->running()) {
100 job->cancel();
101 } else {
102 // can't schedule or run another job on the same key
103 goto cleanup;
104 }
105 }
106 }
107 for (jobs_t::iterator it = _foreground_jobs.begin();
108 it != _foreground_jobs.end();
109 it++) {
110 job_t* job = *it;
111 if (strcmp(job->key(), jobkey) == 0) {
112 // can't schedule or run another job on the same key
113 // as an existing foreground job
114 goto cleanup;
115 }
116 }
117
118 if (background) {
119 _background_jobs.push_back(newjob);
120 _sem.signal();
121 ret = true;
122 } else {
123 _foreground_jobs.push_back(newjob);
124
125 run(newjob);
126
127 for (jobs_t::iterator it = _foreground_jobs.begin();
128 it != _foreground_jobs.end();
129 it++) {
130 job_t* job = *it;
131 if (job == newjob) {
132 _foreground_jobs.erase(it);
133 delete job;
134 break;
135 }
136 }
137 ret = true;
138 }
139
140cleanup:
141 mutex_t_unlock(_mutex);
142 return ret;
143}
144bool job_manager_t::cancel_job(const char* key) {
145 bool ret = false;
146 mutex_t_lock(_mutex);
147
148 for (jobs_t::iterator it = _background_jobs.begin();
149 it != _background_jobs.end();
150 it++) {
151 job_t* job = *it;
152
153 if (!job->cancelled() && strcmp(job->key(), key) == 0) {
154 cancel(job);
155 ret = true;
156 }
157 }
158
159 mutex_t_unlock(_mutex);
160 return ret;
161}
162void job_manager_t::iterate_jobs(pfn_iterate_t callback, void* extra) const {
163 mutex_t_lock(_mutex);
164
165 for (jobs_t::const_iterator it = _background_jobs.begin();
166 it != _background_jobs.end();
167 it++) {
168 job_t* job = *it;
169 if (!job->cancelled()) {
170 callback(job, extra);
171 }
172 }
173
174 mutex_t_unlock(_mutex);
175}
176void* job_manager_t::thread_func(void* v) {
177 return ((tokudb::background::job_manager_t*)v)->real_thread_func();
178}
179void* job_manager_t::real_thread_func() {
180 while (_shutdown == false) {
181 tokudb::thread::semaphore_t::E_WAIT res = _sem.wait();
182 if (res == tokudb::thread::semaphore_t::E_INTERRUPTED || _shutdown) {
183 break;
184 } else if (res == tokudb::thread::semaphore_t::E_SIGNALLED) {
185#if TOKUDB_DEBUG
186 if (TOKUDB_UNLIKELY(
187 tokudb::sysvars::debug_pause_background_job_manager)) {
188 _sem.signal();
189 tokudb::time::sleep_microsec(250000);
190 continue;
191 }
192#endif // TOKUDB_DEBUG
193
194 mutex_t_lock(_mutex);
195 assert_debug(_background_jobs.size() > 0);
196 job_t* job = _background_jobs.front();
197 run(job);
198 _background_jobs.pop_front();
199 mutex_t_unlock(_mutex);
200 delete job;
201 }
202 }
203 return NULL;
204}
205void job_manager_t::run(job_t* job) {
206 assert_debug(_mutex.is_owned_by_me());
207 if (!job->cancelled()) {
208 mutex_t_unlock(_mutex);
209 // do job
210 job->run();
211 // done job
212 mutex_t_lock(_mutex);
213 }
214 if (!job->cancelled()) {
215 job->destroy();
216 }
217}
218void job_manager_t::cancel(job_t* job) {
219 assert_debug(_mutex.is_owned_by_me());
220 assert_always(!job->cancelled());
221 job->cancel();
222}
223job_manager_t* _job_manager = NULL;
224
225bool initialize() {
226 assert_always(_job_manager == NULL);
227 _job_manager = new job_manager_t;
228 _job_manager->initialize();
229 return true;
230}
231bool destroy() {
232 _job_manager->destroy();
233 delete _job_manager;
234 _job_manager = NULL;
235 return true;
236}
237} // namespace background
238} // namespace tokudb
239