/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*======
This file is part of PerconaFT.

Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License, version 2,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License, version 3,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
======= */

#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."

#include <db.h>

#include <locktree/lock_request.h>

#include "ydb-internal.h"
#include "ydb_txn.h"
#include "ydb_row_lock.h"

/*
    Used for a partial implementation of nested transactions:
    work is done by child txns as normal, but all locking is done
    by the root of the nested txn tree. The root may therefore hold
    extra locks, and this scheme will not work as expected when a
    node has two live (non-completed) txns at any time.
*/
static DB_TXN *txn_oldest_ancester(DB_TXN *txn) {
    while (txn && txn->parent) {
        txn = txn->parent;
    }
    return txn;
}

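// Comparison callback for toku::omt<txn_lt_key_ranges>::find_zero: orders
// entries in a txn's lt_map by locktree, so the entry for a particular
// locktree can be found.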
int find_key_ranges_by_lt(const txn_lt_key_ranges &ranges,
                          const toku::locktree *const &find_lt);
int find_key_ranges_by_lt(const txn_lt_key_ranges &ranges,
                          const toku::locktree *const &find_lt) {
    return ranges.lt->compare(find_lt);
}

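// Record that this txn (always the root of its nested txn tree) successfully
// acquired the range lock [left_key, right_key] in db's locktree, so that the
// lock can be released when the txn completes, and account for the lock's
// memory with the locktree manager.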
static void db_txn_note_row_lock(DB *db, DB_TXN *txn, const DBT *left_key, const DBT *right_key) {
    const toku::locktree *lt = db->i->lt;

    toku_mutex_lock(&db_txn_struct_i(txn)->txn_mutex);

    uint32_t idx;
    txn_lt_key_ranges ranges;
    toku::omt<txn_lt_key_ranges> *map = &db_txn_struct_i(txn)->lt_map;

    // if this txn has not already referenced this locktree,
    // then add it to this txn's locktree map
    int r = map->find_zero<const toku::locktree *, find_key_ranges_by_lt>(lt, &ranges, &idx);
    if (r == DB_NOTFOUND) {
        ranges.lt = db->i->lt;
        XMALLOC(ranges.buffer);
        ranges.buffer->create();
        map->insert_at(ranges, idx);

        // let the manager know we're referencing this lt
        toku::locktree_manager *ltm = &txn->mgrp->i->ltm;
        ltm->reference_lt(ranges.lt);
    } else {
        invariant_zero(r);
    }

    // add a new lock range to this txn's row lock buffer
    size_t old_mem_size = ranges.buffer->total_memory_size();
    ranges.buffer->append(left_key, right_key);
    size_t new_mem_size = ranges.buffer->total_memory_size();
    invariant(new_mem_size > old_mem_size);
    lt->get_manager()->note_mem_used(new_mem_size - old_mem_size);

    toku_mutex_unlock(&db_txn_struct_i(txn)->txn_mutex);
}

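// Lock escalation callback: called when the locktree replaces the set of
// ranges owned by txnid with a smaller, coarser set. Updates the txn's
// stored copy of its ranges for this locktree so the txn's range buffer
// does not grow out of control (see ticket 5961).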
void toku_db_txn_escalate_callback(TXNID txnid, const toku::locktree *lt, const toku::range_buffer &buffer, void *extra) {
    DB_ENV *CAST_FROM_VOIDP(env, extra);

    // Get the TOKUTXN and DB_TXN for this txnid from the environment's txn manager.
    // Only the parent id is used in the search.
    TOKUTXN ttxn;
    TXNID_PAIR txnid_pair = { .parent_id64 = txnid, .child_id64 = 0 };
    TXN_MANAGER txn_manager = toku_logger_get_txn_manager(env->i->logger);

    toku_txn_manager_suspend(txn_manager);
    toku_txn_manager_id2txn_unlocked(txn_manager, txnid_pair, &ttxn);

    // We are still holding the txn manager lock. If we couldn't find the txn,
    // then we lost a race with a committing transaction that got removed
    // from the txn manager before it released its locktree locks. In this
    // case we do nothing - that transaction has released, or is about to
    // release, its locks and will soon be gone, so there's no point in
    // updating its lt_map with the new escalated ranges. It will go about
    // releasing the old locks it thinks it had, and will succeed as if
    // nothing happened.
    //
    // If we did find the transaction, then it has not yet been removed
    // from the manager and therefore has not yet released its locks.
    // We must try to replace the range buffer associated with this locktree,
    // if it exists. This is important, otherwise it can grow out of
    // control (ticket 5961).

    if (ttxn != nullptr) {
        DB_TXN *txn = toku_txn_get_container_db_txn(ttxn);

        // One subtle point is that if the transaction is still live, it is impossible
        // to deadlock on the txn mutex, even though we are holding the locktree's root
        // mutex and releasing locks takes the two mutexes in the opposite order.
        //
        // Proof: releasing locks takes the txn mutex and then acquires the locktree's
        // root mutex; escalation takes the root mutex and possibly takes the txn mutex.
        // Releasing locks implies the txn is not live, and a non-live txn implies we
        // will not need to take the txn mutex, so the deadlock is avoided.
        toku_mutex_lock(&db_txn_struct_i(txn)->txn_mutex);

        uint32_t idx;
        txn_lt_key_ranges ranges;
        toku::omt<txn_lt_key_ranges> *map = &db_txn_struct_i(txn)->lt_map;
        int r = map->find_zero<const toku::locktree *, find_key_ranges_by_lt>(lt, &ranges, &idx);
        if (r == 0) {
            // Destroy the old range buffer, create a new one, and insert the new ranges.
            //
            // We could theoretically steal the memory from the caller instead of copying
            // it, but it's simpler to have a callback API that doesn't transfer memory ownership.
            lt->get_manager()->note_mem_released(ranges.buffer->total_memory_size());
            ranges.buffer->destroy();
            ranges.buffer->create();
            toku::range_buffer::iterator iter(&buffer);
            toku::range_buffer::iterator::record rec;
            while (iter.current(&rec)) {
                ranges.buffer->append(rec.get_left_key(), rec.get_right_key());
                iter.next();
            }
            lt->get_manager()->note_mem_used(ranges.buffer->total_memory_size());
        } else {
            // In rare cases, we may not find the associated locktree, because we are
            // racing with the transaction trying to add this locktree to the lt map
            // after acquiring its first lock. The escalated lock set must be the single
            // lock that this txnid just acquired. Do nothing here and let the txn
            // take care of adding this locktree and range to its lt map as usual.
            invariant(buffer.get_num_ranges() == 1);
        }

        toku_mutex_unlock(&db_txn_struct_i(txn)->txn_mutex);
    }

    toku_txn_manager_resume(txn_manager);
}

// Get a range lock.
// Return when the range lock is acquired or the default lock tree timeout has expired.
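// Implemented with the asynchronous lock request API below: the request is
// started with toku_db_start_range_lock() and, if not granted immediately,
// completed with toku_db_wait_range_lock().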
int toku_db_get_range_lock(DB *db, DB_TXN *txn, const DBT *left_key, const DBT *right_key,
                           toku::lock_request::type lock_type) {
    toku::lock_request request;
    request.create();
    int r = toku_db_start_range_lock(db, txn, left_key, right_key, lock_type, &request);
    if (r == DB_LOCK_NOTGRANTED) {
        toku_debug_sync(db_txn_struct_i(txn)->tokutxn,
                        "toku_range_lock_before_wait");
        r = toku_db_wait_range_lock(db, txn, &request);
        if (r == DB_LOCK_NOTGRANTED)
            toku_debug_sync(db_txn_struct_i(txn)->tokutxn,
                            "toku_range_lock_not_granted_after_wait");
    } else if (r == 0) {
        toku_debug_sync(db_txn_struct_i(txn)->tokutxn,
                        "toku_range_lock_granted_immediately");
    }

    request.destroy();
    return r;
}
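
// Example (sketch only; not called anywhere in this file): a point read
// lock on some key `key` could be taken with
//     int r = toku_db_get_range_lock(db, txn, &key, &key,
//                                    toku::lock_request::type::READ);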

// Set up and start an asynchronous lock request.
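// Note that the lock is requested on behalf of, and recorded against, the
// root ancestor of txn, since all locking in a nested txn tree is done by
// the root (see txn_oldest_ancester above).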
int toku_db_start_range_lock(DB *db, DB_TXN *txn, const DBT *left_key, const DBT *right_key,
                             toku::lock_request::type lock_type, toku::lock_request *request) {
    uint64_t client_id;
    void *client_extra;
    DB_TXN *txn_anc = txn_oldest_ancester(txn);
    TXNID txn_anc_id = txn_anc->id64(txn_anc);
    txn->get_client_id(txn, &client_id, &client_extra);
    request->set(db->i->lt, txn_anc_id, left_key, right_key, lock_type,
                 toku_is_big_txn(txn_anc), client_extra);

    const int r = request->start();
    if (r == 0) {
        db_txn_note_row_lock(db, txn_anc, left_key, right_key);
    } else if (r == DB_LOCK_DEADLOCK) {
        lock_timeout_callback callback = txn->mgrp->i->lock_wait_timeout_callback;
        if (callback != nullptr) {
            callback(db, txn_anc_id, left_key, right_key,
                     request->get_conflicting_txnid());
        }
    }
    return r;
}

// Complete a lock request by waiting until the request is ready
// and then storing the acquired lock if successful.
int toku_db_wait_range_lock(DB *db, DB_TXN *txn, toku::lock_request *request) {
    DB_TXN *txn_anc = txn_oldest_ancester(txn);
    const DBT *left_key = request->get_left_key();
    const DBT *right_key = request->get_right_key();
    DB_ENV *env = db->dbenv;
    uint64_t wait_time_msec = env->i->default_lock_timeout_msec;
    if (env->i->get_lock_timeout_callback)
        wait_time_msec = env->i->get_lock_timeout_callback(wait_time_msec);
    uint64_t killed_time_msec = env->i->default_killed_time_msec;
    if (env->i->get_killed_time_callback)
        killed_time_msec = env->i->get_killed_time_callback(killed_time_msec);
    const int r = request->wait(wait_time_msec, killed_time_msec, env->i->killed_callback,
                                env->i->lock_wait_needed_callback);
    if (r == 0) {
        db_txn_note_row_lock(db, txn_anc, left_key, right_key);
    } else if (r == DB_LOCK_NOTGRANTED) {
        lock_timeout_callback callback = txn->mgrp->i->lock_wait_timeout_callback;
        if (callback != nullptr) {
            callback(db, txn_anc->id64(txn_anc), left_key, right_key,
                     request->get_conflicting_txnid());
        }
    }
    return r;
}

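// Acquire a point write lock on the key by locking the range [key, key].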
int toku_db_get_point_write_lock(DB *db, DB_TXN *txn, const DBT *key) {
    return toku_db_get_range_lock(db, txn, key, key, toku::lock_request::type::WRITE);
}

// Acquire a point write lock on the key for a given txn.
// The lock must be granted immediately, so this never blocks the calling thread.
void toku_db_grab_write_lock(DB *db, DBT *key, TOKUTXN tokutxn) {
    uint64_t client_id;
    void *client_extra;
    DB_TXN *txn = toku_txn_get_container_db_txn(tokutxn);
    DB_TXN *txn_anc = txn_oldest_ancester(txn);
    TXNID txn_anc_id = txn_anc->id64(txn_anc);

    // This lock request must succeed, so we do not want to wait
    toku::lock_request request;
    request.create();
    txn->get_client_id(txn, &client_id, &client_extra);
    request.set(db->i->lt, txn_anc_id, key, key,
                toku::lock_request::type::WRITE, toku_is_big_txn(txn_anc),
                client_extra);
    int r = request.start();
    invariant_zero(r);
    db_txn_note_row_lock(db, txn_anc, key, key);
    request.destroy();
}

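// Release every lock this txn acquired and stored in the given range buffer
// for this locktree, retry pending lock requests that may now be grantable,
// and drop the txn's reference on the locktree.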
void toku_db_release_lt_key_ranges(DB_TXN *txn, txn_lt_key_ranges *ranges) {
    toku::locktree *lt = ranges->lt;
    TXNID txnid = txn->id64(txn);

    // release all of the locks this txn has ever successfully
    // acquired and stored in the range buffer for this locktree
    lt->release_locks(txnid, ranges->buffer);
    lt->get_manager()->note_mem_released(ranges->buffer->total_memory_size());
    ranges->buffer->destroy();
    toku_free(ranges->buffer);

    // all of our locks have been released, so first try to wake up
    // pending lock requests, then release our reference on the lt
    toku::lock_request::retry_all_lock_requests(lt, txn->mgrp->i->lock_wait_needed_callback);

    // Release our reference on this locktree
    toku::locktree_manager *ltm = &txn->mgrp->i->ltm;
    ltm->release_lt(lt);
}