/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*======
This file is part of PerconaFT.

Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License, version 2,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License, version 3,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
======= */

#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."

#include "portability/toku_race_tools.h"

#include "ft/txn/txn.h"
#include "locktree/locktree.h"
#include "locktree/lock_request.h"
#include "util/dbt.h"
namespace toku {

// initialize a lock request's internals
void lock_request::create(void) {
    m_txnid = TXNID_NONE;
    m_conflicting_txnid = TXNID_NONE;
    m_start_time = 0;
    m_left_key = nullptr;
    m_right_key = nullptr;
    toku_init_dbt(&m_left_key_copy);
    toku_init_dbt(&m_right_key_copy);

    m_type = type::UNKNOWN;
    m_lt = nullptr;

    m_complete_r = 0;
    m_state = state::UNINITIALIZED;
    m_info = nullptr;

    toku_cond_init(&m_wait_cond, nullptr);

    m_start_test_callback = nullptr;
    m_start_before_pending_test_callback = nullptr;
    m_retry_test_callback = nullptr;
}

// destroy a lock request.
void lock_request::destroy(void) {
    invariant(m_state != state::PENDING);
    invariant(m_state != state::DESTROYED);
    m_state = state::DESTROYED;
    toku_destroy_dbt(&m_left_key_copy);
    toku_destroy_dbt(&m_right_key_copy);
    toku_cond_destroy(&m_wait_cond);
}

// set the lock request parameters. this API allows a lock request to be reused.
void lock_request::set(locktree *lt, TXNID txnid, const DBT *left_key, const DBT *right_key, lock_request::type lock_type, bool big_txn, void *extra) {
    invariant(m_state != state::PENDING);
    m_lt = lt;
    m_txnid = txnid;
    m_left_key = left_key;
    m_right_key = right_key;
    toku_destroy_dbt(&m_left_key_copy);
    toku_destroy_dbt(&m_right_key_copy);
    m_type = lock_type;
    m_state = state::INITIALIZED;
    m_info = lt ? lt->get_lock_request_info() : nullptr;
    m_big_txn = big_txn;
    m_extra = extra;
}
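
// typical lifecycle (illustrative sketch, not a prescribed API contract):
// assuming `lt`, `txnid`, `left` and `right` are a valid locktree,
// transaction id and key DBTs, a caller acquires a range lock like so:
//
//     lock_request request;
//     request.create();
//     request.set(lt, txnid, left, right, lock_request::type::WRITE,
//                 false /*big_txn*/, nullptr /*extra*/);
//     int r = request.start();
//     if (r == DB_LOCK_NOTGRANTED)
//         r = request.wait(wait_time_ms);  // 0 if granted, else an error
//     request.destroy();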

// get rid of any stored left and right key copies and
// replace them with copies of the given left and right key,
// so the keys stay stable while this request sits in the pending set
void lock_request::copy_keys() {
    if (!toku_dbt_is_infinite(m_left_key)) {
        toku_clone_dbt(&m_left_key_copy, *m_left_key);
        m_left_key = &m_left_key_copy;
    }
    if (!toku_dbt_is_infinite(m_right_key)) {
        toku_clone_dbt(&m_right_key_copy, *m_right_key);
        m_right_key = &m_right_key_copy;
    }
}

// what are the conflicts for this pending lock request?
void lock_request::get_conflicts(txnid_set *conflicts) {
    invariant(m_state == state::PENDING);
    const bool is_write_request = m_type == type::WRITE;
    m_lt->get_conflicts(is_write_request, m_txnid, m_left_key, m_right_key, conflicts);
}

// build a wait-for-graph for this lock request and the given conflict set
// for each transaction B that blocks A's lock request
//     if B is blocked then
//         add (A,B) to the WFG and, if B is new, fill in the WFG from B
void lock_request::build_wait_graph(wfg *wait_graph, const txnid_set &conflicts) {
    size_t num_conflicts = conflicts.size();
    for (size_t i = 0; i < num_conflicts; i++) {
        TXNID conflicting_txnid = conflicts.get(i);
        lock_request *conflicting_request = find_lock_request(conflicting_txnid);
        invariant(conflicting_txnid != m_txnid);
        invariant(conflicting_request != this);
        if (conflicting_request) {
            bool already_exists = wait_graph->node_exists(conflicting_txnid);
            wait_graph->add_edge(m_txnid, conflicting_txnid);
            if (!already_exists) {
                // recursively build the wait for graph rooted at the conflicting
                // request, given its set of lock conflicts.
                txnid_set other_conflicts;
                other_conflicts.create();
                conflicting_request->get_conflicts(&other_conflicts);
                conflicting_request->build_wait_graph(wait_graph, other_conflicts);
                other_conflicts.destroy();
            }
        }
    }
}

// returns: true if the current set of lock requests contains
//          a deadlock, false otherwise.
bool lock_request::deadlock_exists(const txnid_set &conflicts) {
    wfg wait_graph;
    wait_graph.create();

    build_wait_graph(&wait_graph, conflicts);
    bool deadlock = wait_graph.cycle_exists_from_txnid(m_txnid);

    wait_graph.destroy();
    return deadlock;
}
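
// worked example: suppose txn A holds a lock on key 1 and is pending on
// key 2, while txn B holds key 2 and now requests key 1. B's conflict set
// is {A}, and A has a pending request, so build_wait_graph() adds the edge
// (B, A) and recurses from A, whose conflict set {B} adds the edge (A, B).
// cycle_exists_from_txnid(B) then finds the cycle B -> A -> B, so B's
// start() returns DB_LOCK_DEADLOCK instead of waiting forever.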

// try to acquire a lock described by this lock request.
int lock_request::start(void) {
    int r;

    txnid_set conflicts;
    conflicts.create();
    if (m_type == type::WRITE) {
        r = m_lt->acquire_write_lock(m_txnid, m_left_key, m_right_key, &conflicts, m_big_txn);
    } else {
        invariant(m_type == type::READ);
        r = m_lt->acquire_read_lock(m_txnid, m_left_key, m_right_key, &conflicts, m_big_txn);
    }

    // if the lock is not granted, save it to the set of lock requests
    // and check for a deadlock. if there is one, complete it as failed
    if (r == DB_LOCK_NOTGRANTED) {
        copy_keys();
        m_state = state::PENDING;
        m_start_time = toku_current_time_microsec() / 1000;
        m_conflicting_txnid = conflicts.get(0);
        if (m_start_before_pending_test_callback)
            m_start_before_pending_test_callback();
        toku_mutex_lock(&m_info->mutex);
        insert_into_lock_requests();
        if (deadlock_exists(conflicts)) {
            remove_from_lock_requests();
            r = DB_LOCK_DEADLOCK;
        }
        toku_mutex_unlock(&m_info->mutex);
        if (m_start_test_callback)
            m_start_test_callback();  // test callback
    }

    if (r != DB_LOCK_NOTGRANTED) {
        complete(r);
    }

    conflicts.destroy();
    return r;
}

// sleep on the lock request until it becomes resolved or the wait time has elapsed.
int lock_request::wait(uint64_t wait_time_ms) {
    return wait(wait_time_ms, 0, nullptr);
}

int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*killed_callback)(void),
                       void (*lock_wait_callback)(void *, TXNID, TXNID)) {
    uint64_t t_now = toku_current_time_microsec();
    uint64_t t_start = t_now;
    uint64_t t_end = t_start + wait_time_ms * 1000;

    toku_mutex_lock(&m_info->mutex);

    // check again, this time locking out other retry calls
    if (m_state == state::PENDING) {
        GrowableArray<TXNID> conflicts_collector;
        conflicts_collector.init();
        retry(&conflicts_collector);
        if (m_state == state::PENDING) {
            report_waits(&conflicts_collector, lock_wait_callback);
        }
        conflicts_collector.deinit();
    }

    while (m_state == state::PENDING) {
        // check if this thread is killed
        if (killed_callback && killed_callback()) {
            remove_from_lock_requests();
            complete(DB_LOCK_NOTGRANTED);
            continue;
        }

        // compute the next wakeup time: the killed-callback polling deadline
        // or the overall timeout, whichever comes first
        uint64_t t_wait;
        if (killed_time_ms == 0) {
            t_wait = t_end;
        } else {
            t_wait = t_now + killed_time_ms * 1000;
            if (t_wait > t_end)
                t_wait = t_end;
        }
        struct timespec ts = {};
        ts.tv_sec = t_wait / 1000000;
        ts.tv_nsec = (t_wait % 1000000) * 1000;
        int r = toku_cond_timedwait(&m_wait_cond, &m_info->mutex, &ts);
        invariant(r == 0 || r == ETIMEDOUT);

        t_now = toku_current_time_microsec();
        if (m_state == state::PENDING && (t_now >= t_end)) {
            m_info->counters.timeout_count += 1;

            // if we're still pending and we timed out, then remove our
            // request from the set of lock requests and fail.
            remove_from_lock_requests();

            // complete sets m_state to COMPLETE, breaking us out of the loop
            complete(DB_LOCK_NOTGRANTED);
        }
    }

    uint64_t t_real_end = toku_current_time_microsec();
    uint64_t duration = t_real_end - t_start;
    m_info->counters.wait_count += 1;
    m_info->counters.wait_time += duration;
    if (duration >= 1000000) {
        m_info->counters.long_wait_count += 1;
        m_info->counters.long_wait_time += duration;
    }
    toku_mutex_unlock(&m_info->mutex);

    invariant(m_state == state::COMPLETE);
    return m_complete_r;
}
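
// example (hypothetical names): to let another thread abort this waiter,
// poll a kill flag every 100 ms while waiting up to 5 seconds for the lock:
//
//     static std::atomic<bool> kill_flag(false);
//     static int my_killed_callback(void) {
//         return kill_flag.load() ? 1 : 0;  // nonzero aborts the wait
//     }
//     ...
//     int r = request.wait(5000 /*wait_time_ms*/, 100 /*killed_time_ms*/,
//                          my_killed_callback);
//     // r == DB_LOCK_NOTGRANTED if the wait was killed or timed out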

// complete this lock request with the given return value
void lock_request::complete(int complete_r) {
    m_complete_r = complete_r;
    m_state = state::COMPLETE;
}

const DBT *lock_request::get_left_key(void) const {
    return m_left_key;
}

const DBT *lock_request::get_right_key(void) const {
    return m_right_key;
}

TXNID lock_request::get_txnid(void) const {
    return m_txnid;
}

uint64_t lock_request::get_start_time(void) const {
    return m_start_time;
}

TXNID lock_request::get_conflicting_txnid(void) const {
    return m_conflicting_txnid;
}

int lock_request::retry(GrowableArray<TXNID> *conflicts_collector) {
    invariant(m_state == state::PENDING);
    int r;
    txnid_set conflicts;
    conflicts.create();

    if (m_type == type::WRITE) {
        r = m_lt->acquire_write_lock(
            m_txnid, m_left_key, m_right_key, &conflicts, m_big_txn);
    } else {
        r = m_lt->acquire_read_lock(
            m_txnid, m_left_key, m_right_key, &conflicts, m_big_txn);
    }

    // if the acquisition succeeded then remove ourselves from the
    // set of lock requests, complete, and signal the waiting thread.
    if (r == 0) {
        remove_from_lock_requests();
        complete(r);
        if (m_retry_test_callback)
            m_retry_test_callback();  // test callback
        toku_cond_broadcast(&m_wait_cond);
    } else {
        m_conflicting_txnid = conflicts.get(0);
        add_conflicts_to_waits(&conflicts, conflicts_collector);
    }
    conflicts.destroy();

    return r;
}

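// retry all pending lock requests on the given locktree. concurrent callers
// are coalesced: only one thread runs the retries for a batch of wakeups
// (see the retry_want/retry_done generation scheme below).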
void lock_request::retry_all_lock_requests(
    locktree *lt,
    void (*lock_wait_callback)(void *, TXNID, TXNID),
    void (*after_retry_all_test_callback)(void)) {
    lt_lock_request_info *info = lt->get_lock_request_info();

    // if there are no pending lock requests then there is nothing to do.
    // the unlocked read of pending_is_empty is OK since lock requests
    // are retried after being added to the pending set.
    if (info->pending_is_empty)
        return;

    // bump retry_want and take the new value as my retry generation
    unsigned long long my_retry_want = (info->retry_want += 1);

    toku_mutex_lock(&info->retry_mutex);

    GrowableArray<TXNID> conflicts_collector;
    conflicts_collector.init();

    // here is the group retry algorithm.
    // take the latest retry_want count as the generation of this retry
    // operation. if this generation immediately follows the last completed
    // generation, run the lock retries. otherwise another thread's retry
    // already covers (or will cover) this generation, so nothing to do.
    if ((my_retry_want - 1) == info->retry_done) {
        for (;;) {
            if (!info->running_retry) {
                info->running_retry = true;
                info->retry_done = info->retry_want;
                toku_mutex_unlock(&info->retry_mutex);
                retry_all_lock_requests_info(info, &conflicts_collector);
                if (after_retry_all_test_callback)
                    after_retry_all_test_callback();
                toku_mutex_lock(&info->retry_mutex);
                info->running_retry = false;
                toku_cond_broadcast(&info->retry_cv);
                break;
            } else {
                toku_cond_wait(&info->retry_cv, &info->retry_mutex);
            }
        }
    }
    toku_mutex_unlock(&info->retry_mutex);

    report_waits(&conflicts_collector, lock_wait_callback);
    conflicts_collector.deinit();
}

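// retry every lock request in the pending set, removing the ones that are
// granted. takes and releases the locktree's lock request mutex.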
void lock_request::retry_all_lock_requests_info(lt_lock_request_info *info, GrowableArray<TXNID> *collector) {
    toku_mutex_lock(&info->mutex);
    // retry all of the pending lock requests.
    for (size_t i = 0; i < info->pending_lock_requests.size();) {
        lock_request *request;
        int r = info->pending_lock_requests.fetch(i, &request);
        invariant_zero(r);

        // retry the lock request. if it didn't succeed,
        // move on to the next lock request. otherwise
        // the request is gone from the list so we may
        // read the i'th entry for the next one.
        r = request->retry(collector);
        if (r != 0) {
            i++;
        }
    }

    // future threads should only retry lock requests if some still exist
    info->should_retry_lock_requests = info->pending_lock_requests.size() > 0;
    toku_mutex_unlock(&info->mutex);
}

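// record each of this request's conflicts in the collector as a flattened
// (blocked txnid, blocking txnid) pair. for example, if txn 10 is blocked
// by txns 20 and 30, the collector gains the entries [10, 20, 10, 30].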
void lock_request::add_conflicts_to_waits(txnid_set *conflicts,
                                          GrowableArray<TXNID> *wait_conflicts) {
    size_t num_conflicts = conflicts->size();
    for (size_t i = 0; i < num_conflicts; i++) {
        wait_conflicts->push(m_txnid);
        wait_conflicts->push(conflicts->get(i));
    }
}

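// invoke the lock wait callback on each (blocked, blocking) pair collected
// by add_conflicts_to_waits, consuming the entries two at a time.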
void lock_request::report_waits(GrowableArray<TXNID> *wait_conflicts,
                                void (*lock_wait_callback)(void *, TXNID, TXNID)) {
    if (!lock_wait_callback)
        return;
    size_t num_conflicts = wait_conflicts->get_size();
    for (size_t i = 0; i < num_conflicts; i += 2) {
        TXNID blocked_txnid = wait_conflicts->fetch_unchecked(i);
        TXNID blocking_txnid = wait_conflicts->fetch_unchecked(i + 1);
        (*lock_wait_callback)(nullptr, blocked_txnid, blocking_txnid);
    }
}

void *lock_request::get_extra(void) const {
    return m_extra;
}

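// fail this waiter with DB_LOCK_NOTGRANTED and wake the thread sleeping on
// it. the caller must hold the lock request mutex.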
void lock_request::kill_waiter(void) {
    remove_from_lock_requests();
    complete(DB_LOCK_NOTGRANTED);
    toku_cond_broadcast(&m_wait_cond);
}

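// kill the first pending waiter on this locktree whose extra pointer
// matches the given one, if any.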
void lock_request::kill_waiter(locktree *lt, void *extra) {
    lt_lock_request_info *info = lt->get_lock_request_info();
    toku_mutex_lock(&info->mutex);
    for (size_t i = 0; i < info->pending_lock_requests.size(); i++) {
        lock_request *request;
        int r = info->pending_lock_requests.fetch(i, &request);
        if (r == 0 && request->get_extra() == extra) {
            request->kill_waiter();
            break;
        }
    }
    toku_mutex_unlock(&info->mutex);
}

// find another lock request by txnid. must hold the mutex.
lock_request *lock_request::find_lock_request(const TXNID &txnid) {
    lock_request *request;
    int r = m_info->pending_lock_requests.find_zero<TXNID, find_by_txnid>(txnid, &request, nullptr);
    if (r != 0) {
        request = nullptr;
    }
    return request;
}

// insert this lock request into the locktree's set. must hold the mutex.
void lock_request::insert_into_lock_requests(void) {
    uint32_t idx;
    lock_request *request;
    int r = m_info->pending_lock_requests.find_zero<TXNID, find_by_txnid>(
        m_txnid, &request, &idx);
    invariant(r == DB_NOTFOUND);
    r = m_info->pending_lock_requests.insert_at(this, idx);
    invariant_zero(r);
    m_info->pending_is_empty = false;
}

// remove this lock request from the locktree's set. must hold the mutex.
void lock_request::remove_from_lock_requests(void) {
    uint32_t idx;
    lock_request *request;
    int r = m_info->pending_lock_requests.find_zero<TXNID, find_by_txnid>(
        m_txnid, &request, &idx);
    invariant_zero(r);
    invariant(request == this);
    r = m_info->pending_lock_requests.delete_at(idx);
    invariant_zero(r);
    if (m_info->pending_lock_requests.size() == 0)
        m_info->pending_is_empty = true;
}

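// comparator for the pending lock request set: orders requests by txnid.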
int lock_request::find_by_txnid(lock_request *const &request,
                                const TXNID &txnid) {
    TXNID request_txnid = request->m_txnid;
    if (request_txnid < txnid) {
        return -1;
    } else if (request_txnid == txnid) {
        return 0;
    } else {
        return 1;
    }
}

void lock_request::set_start_test_callback(void (*f)(void)) {
    m_start_test_callback = f;
}

void lock_request::set_start_before_pending_test_callback(void (*f)(void)) {
    m_start_before_pending_test_callback = f;
}

void lock_request::set_retry_test_callback(void (*f)(void)) {
    m_retry_test_callback = f;
}

} /* namespace toku */