1/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3#ident "$Id$"
4/*======
5This file is part of PerconaFT.
6
7
8Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35======= */
36
37#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39#include <stdio.h>
40#include <stdlib.h>
41#include <string.h>
42#include <db.h>
43#include "toku_assert.h"
44#include "ydb-internal.h"
45#include "ydb_cursor.h"
46#include "ydb_row_lock.h"
47#include "ft/cursor.h"
48
// Status counters for the ydb cursor layer; copied out by ydb_c_layer_get_status().
static YDB_C_LAYER_STATUS_S ydb_c_layer_status;
// STATUS_VALUE may already be defined by an included header; redefine it so it
// addresses this file's status array.
#ifdef STATUS_VALUE
#undef STATUS_VALUE
#endif
#define STATUS_VALUE(x) ydb_c_layer_status.status[x].value.num

// Helper for registering a counter in ydb_c_layer_status (undefined again right
// after ydb_c_layer_status_init).
#define STATUS_INIT(k,c,t,l,inc) TOKUFT_STATUS_INIT(ydb_c_layer_status, k, c, t, l, inc)
56
57static void
58ydb_c_layer_status_init (void) {
59 // Note, this function initializes the keyname, type, and legend fields.
60 // Value fields are initialized to zero by compiler.
61 ydb_c_layer_status.initialized = true;
62}
63#undef STATUS_INIT
64
65void
66ydb_c_layer_get_status(YDB_C_LAYER_STATUS statp) {
67 if (!ydb_c_layer_status.initialized)
68 ydb_c_layer_status_init();
69 *statp = ydb_c_layer_status;
70}
71
72//Get the main portion of a cursor flag (excluding the bitwise or'd components).
73static int
74get_main_cursor_flag(uint32_t flags) {
75 return flags & DB_OPFLAGS_MASK;
76}
77
78static int
79get_nonmain_cursor_flags(uint32_t flags) {
80 return flags & ~(DB_OPFLAGS_MASK);
81}
82
83static inline bool
84c_uninitialized(DBC *c) {
85 return toku_ft_cursor_uninitialized(dbc_ftcursor(c));
86}
87
// Adapter state used by toku_c_get: the caller's output DBTs and the cursor's
// simple_dbt buffers passed to toku_dbt_set. A NULL key or val means that DBT
// is not an output (toku_c_get passes a NULL key for DB_SET).
typedef struct query_context_wrapped_t {
    DBT *key;                 // output DBT for the found key, or NULL
    DBT *val;                 // output DBT for the found value
    struct simple_dbt *skey;  // buffer handed to toku_dbt_set for the key
    struct simple_dbt *sval;  // buffer handed to toku_dbt_set for the value
} *QUERY_CONTEXT_WRAPPED, QUERY_CONTEXT_WRAPPED_S;
94
95static inline void
96query_context_wrapped_init(QUERY_CONTEXT_WRAPPED context, DBC *c, DBT *key, DBT *val) {
97 context->key = key;
98 context->val = val;
99 context->skey = dbc_struct_i(c)->skey;
100 context->sval = dbc_struct_i(c)->sval;
101}
102
103static int
104c_get_wrapper_callback(DBT const *key, DBT const *val, void *extra) {
105 QUERY_CONTEXT_WRAPPED context = (QUERY_CONTEXT_WRAPPED) extra;
106 int r = toku_dbt_set(key->size, key->data, context->key, context->skey);
107 if (r == 0) {
108 r = toku_dbt_set(val->size, val->data, context->val, context->sval);
109 }
110 return r;
111}
112
113static inline uint32_t get_cursor_prelocked_flags(uint32_t flags, DBC *dbc) {
114 uint32_t lock_flags = flags & (DB_PRELOCKED | DB_PRELOCKED_WRITE);
115
116 // DB_READ_UNCOMMITTED and DB_READ_COMMITTED transactions 'own' all read
117 // locks for user-data dictionaries.
118 if (dbc_struct_i(dbc)->iso != TOKU_ISO_SERIALIZABLE &&
119 !(dbc_struct_i(dbc)->iso == TOKU_ISO_SNAPSHOT &&
120 dbc_struct_i(dbc)->locking_read)) {
121 lock_flags |= DB_PRELOCKED;
122 }
123 return lock_flags;
124}
125
// Per-query state shared by the c_getf_* entry points and the callbacks they
// hand to the ft layer.
typedef struct query_context_base_t {
    FT_CURSOR c;                 // underlying ft cursor (peeked by the next/prev callbacks)
    DB_TXN *txn;                 // cursor's transaction, used for range locks
    DB *db;                      // database whose lock tree is used
    YDB_CALLBACK_FUNCTION f;     // user-level callback given to ydb layer functions like c_getf_first
    void *f_extra;               // opaque argument forwarded to f
    int r_user_callback;         // most recent value returned by f
    bool do_locking;             // whether the callbacks must take range locks
    bool is_write_op;            // take WRITE rather than READ range locks
    toku::lock_request request;  // lock request waited on after DB_LOCK_NOTGRANTED
} *QUERY_CONTEXT_BASE, QUERY_CONTEXT_BASE_S;
140
// Query context for operations without an input key (first/last/next/prev/current).
typedef struct query_context_t {
    QUERY_CONTEXT_BASE_S base;
} *QUERY_CONTEXT, QUERY_CONTEXT_S;
144
// Query context for operations that take an input key (set/set_range/set_range_reverse).
typedef struct query_context_with_input_t {
    QUERY_CONTEXT_BASE_S base;
    DBT *input_key;  // caller-supplied key; used as a range-lock endpoint
    DBT *input_val;  // NOTE(review): every caller in this file passes NULL
} *QUERY_CONTEXT_WITH_INPUT, QUERY_CONTEXT_WITH_INPUT_S;
150
151static void
152query_context_base_init(QUERY_CONTEXT_BASE context, DBC *c, uint32_t flag, bool is_write_op, YDB_CALLBACK_FUNCTION f, void *extra) {
153 context->c = dbc_ftcursor(c);
154 context->txn = dbc_struct_i(c)->txn;
155 context->db = c->dbp;
156 context->f = f;
157 context->f_extra = extra;
158 context->is_write_op = is_write_op;
159 uint32_t lock_flags = get_cursor_prelocked_flags(flag, c);
160 if (context->is_write_op) {
161 lock_flags &= DB_PRELOCKED_WRITE; // Only care about whether already locked for write
162 }
163 context->do_locking = (context->db->i->lt != nullptr && !(lock_flags & (DB_PRELOCKED | DB_PRELOCKED_WRITE)));
164 context->r_user_callback = 0;
165 context->request.create();
166}
167
168static toku::lock_request::type
169query_context_determine_lock_type(QUERY_CONTEXT_BASE context) {
170 return context->is_write_op ?
171 toku::lock_request::type::WRITE : toku::lock_request::type::READ;
172}
173
174static void
175query_context_base_destroy(QUERY_CONTEXT_BASE context) {
176 context->request.destroy();
177}
178
179static void
180query_context_init_read(QUERY_CONTEXT context, DBC *c, uint32_t flag, YDB_CALLBACK_FUNCTION f, void *extra) {
181 const bool is_write = false;
182 query_context_base_init(&context->base, c, flag, is_write, f, extra);
183}
184
185static void
186query_context_init_write(QUERY_CONTEXT context, DBC *c, uint32_t flag, YDB_CALLBACK_FUNCTION f, void *extra) {
187 const bool is_write = true;
188 query_context_base_init(&context->base, c, flag, is_write, f, extra);
189}
190
191static void
192query_context_with_input_init(QUERY_CONTEXT_WITH_INPUT context, DBC *c, uint32_t flag, DBT *key, DBT *val, YDB_CALLBACK_FUNCTION f, void *extra) {
193 // grab write locks if the DB_RMW flag is set or the cursor was created with the DB_RMW flag
194 const bool is_write = ((flag & DB_RMW) != 0) || dbc_struct_i(c)->rmw;
195 query_context_base_init(&context->base, c, flag, is_write, f, extra);
196 context->input_key = key;
197 context->input_val = val;
198}
199
// Forward declaration: c_getf_first (below) passes this callback to toku_ft_cursor_first.
static int c_getf_first_callback(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool);
201
202static void
203c_query_context_init(QUERY_CONTEXT context, DBC *c, uint32_t flag, YDB_CALLBACK_FUNCTION f, void *extra) {
204 bool is_write_op = false;
205 // grab write locks if the DB_RMW flag is set or the cursor was created with the DB_RMW flag
206 if ((flag & DB_RMW) || dbc_struct_i(c)->rmw) {
207 is_write_op = true;
208 }
209 if (is_write_op) {
210 query_context_init_write(context, c, flag, f, extra);
211 } else {
212 query_context_init_read(context, c, flag, f, extra);
213 }
214}
215
216static void
217c_query_context_destroy(QUERY_CONTEXT context) {
218 query_context_base_destroy(&context->base);
219}
220
221static int
222c_getf_first(DBC *c, uint32_t flag, YDB_CALLBACK_FUNCTION f, void *extra) {
223 HANDLE_PANICKED_DB(c->dbp);
224 HANDLE_CURSOR_ILLEGAL_WORKING_PARENT_TXN(c);
225 int r = 0;
226 QUERY_CONTEXT_S context; //Describes the context of this query.
227 c_query_context_init(&context, c, flag, f, extra);
228 while (r == 0) {
229 //toku_ft_cursor_first will call c_getf_first_callback(..., context) (if query is successful)
230 r = toku_ft_cursor_first(dbc_ftcursor(c), c_getf_first_callback, &context);
231 if (r == DB_LOCK_NOTGRANTED) {
232 r = toku_db_wait_range_lock(context.base.db, context.base.txn, &context.base.request);
233 } else {
234 break;
235 }
236 }
237 c_query_context_destroy(&context);
238 return r;
239}
240
241//result is the result of the query (i.e. 0 means found, DB_NOTFOUND, etc..)
242static int
243c_getf_first_callback(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool lock_only) {
244 QUERY_CONTEXT super_context = (QUERY_CONTEXT) extra;
245 QUERY_CONTEXT_BASE context = &super_context->base;
246
247 int r;
248 DBT found_key = { .data = (void *) key, .size = keylen };
249
250 if (context->do_locking) {
251 const DBT *left_key = toku_dbt_negative_infinity();
252 const DBT *right_key = key != NULL ? &found_key : toku_dbt_positive_infinity();
253 r = toku_db_start_range_lock(context->db, context->txn, left_key, right_key,
254 query_context_determine_lock_type(context), &context->request);
255 } else {
256 r = 0;
257 }
258
259 //Call application-layer callback if found and locks were successfully obtained.
260 if (r==0 && key!=NULL && !lock_only) {
261 DBT found_val = { .data = (void *) val, .size = vallen };
262 context->r_user_callback = context->f(&found_key, &found_val, context->f_extra);
263 r = context->r_user_callback;
264 }
265
266 //Give ft-layer an error (if any) to return from toku_ft_cursor_first
267 return r;
268}
269
270static int c_getf_last_callback(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool);
271
272static int
273c_getf_last(DBC *c, uint32_t flag, YDB_CALLBACK_FUNCTION f, void *extra) {
274 HANDLE_PANICKED_DB(c->dbp);
275 HANDLE_CURSOR_ILLEGAL_WORKING_PARENT_TXN(c);
276 int r = 0;
277 QUERY_CONTEXT_S context; //Describes the context of this query.
278 c_query_context_init(&context, c, flag, f, extra);
279 while (r == 0) {
280 //toku_ft_cursor_last will call c_getf_last_callback(..., context) (if query is successful)
281 r = toku_ft_cursor_last(dbc_ftcursor(c), c_getf_last_callback, &context);
282 if (r == DB_LOCK_NOTGRANTED) {
283 r = toku_db_wait_range_lock(context.base.db, context.base.txn, &context.base.request);
284 } else {
285 break;
286 }
287 }
288 c_query_context_destroy(&context);
289 return r;
290}
291
292//result is the result of the query (i.e. 0 means found, DB_NOTFOUND, etc..)
293static int
294c_getf_last_callback(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool lock_only) {
295 QUERY_CONTEXT super_context = (QUERY_CONTEXT) extra;
296 QUERY_CONTEXT_BASE context = &super_context->base;
297
298 int r;
299 DBT found_key = { .data = (void *) key, .size = keylen };
300
301 if (context->do_locking) {
302 const DBT *left_key = key != NULL ? &found_key : toku_dbt_negative_infinity();
303 const DBT *right_key = toku_dbt_positive_infinity();
304 r = toku_db_start_range_lock(context->db, context->txn, left_key, right_key,
305 query_context_determine_lock_type(context), &context->request);
306 } else {
307 r = 0;
308 }
309
310 //Call application-layer callback if found and locks were successfully obtained.
311 if (r==0 && key!=NULL && !lock_only) {
312 DBT found_val = { .data = (void *) val, .size = vallen };
313 context->r_user_callback = context->f(&found_key, &found_val, context->f_extra);
314 r = context->r_user_callback;
315 }
316
317 //Give ft-layer an error (if any) to return from toku_ft_cursor_last
318 return r;
319}
320
321static int c_getf_next_callback(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool);
322
323static int
324c_getf_next(DBC *c, uint32_t flag, YDB_CALLBACK_FUNCTION f, void *extra) {
325 int r;
326 HANDLE_PANICKED_DB(c->dbp);
327 HANDLE_CURSOR_ILLEGAL_WORKING_PARENT_TXN(c);
328 if (c_uninitialized(c)) {
329 r = c_getf_first(c, flag, f, extra);
330 } else {
331 r = 0;
332 QUERY_CONTEXT_S context; //Describes the context of this query.
333 c_query_context_init(&context, c, flag, f, extra);
334 while (r == 0) {
335 //toku_ft_cursor_next will call c_getf_next_callback(..., context) (if query is successful)
336 r = toku_ft_cursor_next(dbc_ftcursor(c), c_getf_next_callback, &context);
337 if (r == DB_LOCK_NOTGRANTED) {
338 r = toku_db_wait_range_lock(context.base.db, context.base.txn, &context.base.request);
339 } else {
340 break;
341 }
342 }
343 c_query_context_destroy(&context);
344 }
345 return r;
346}
347
348//result is the result of the query (i.e. 0 means found, DB_NOTFOUND, etc..)
349static int
350c_getf_next_callback(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool lock_only) {
351 QUERY_CONTEXT super_context = (QUERY_CONTEXT) extra;
352 QUERY_CONTEXT_BASE context = &super_context->base;
353
354 int r;
355
356 DBT found_key = { .data = (void *) key, .size = keylen };
357
358 if (context->do_locking) {
359 const DBT *prevkey, *prevval;
360 toku_ft_cursor_peek(context->c, &prevkey, &prevval);
361 const DBT *left_key = prevkey;
362 const DBT *right_key = key != NULL ? &found_key : toku_dbt_positive_infinity();
363 r = toku_db_start_range_lock(context->db, context->txn, left_key, right_key,
364 query_context_determine_lock_type(context), &context->request);
365 } else {
366 r = 0;
367 }
368
369 //Call application-layer callback if found and locks were successfully obtained.
370 if (r==0 && key!=NULL && !lock_only) {
371 DBT found_val = { .data = (void *) val, .size = vallen };
372 context->r_user_callback = context->f(&found_key, &found_val, context->f_extra);
373 r = context->r_user_callback;
374 }
375
376 //Give ft-layer an error (if any) to return from toku_ft_cursor_next
377 return r;
378}
379
380static int c_getf_prev_callback(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool);
381
382static int
383c_getf_prev(DBC *c, uint32_t flag, YDB_CALLBACK_FUNCTION f, void *extra) {
384 int r;
385 HANDLE_PANICKED_DB(c->dbp);
386 HANDLE_CURSOR_ILLEGAL_WORKING_PARENT_TXN(c);
387 if (c_uninitialized(c)) {
388 r = c_getf_last(c, flag, f, extra);
389 } else {
390 r = 0;
391 QUERY_CONTEXT_S context; //Describes the context of this query.
392 c_query_context_init(&context, c, flag, f, extra);
393 while (r == 0) {
394 //toku_ft_cursor_prev will call c_getf_prev_callback(..., context) (if query is successful)
395 r = toku_ft_cursor_prev(dbc_ftcursor(c), c_getf_prev_callback, &context);
396 if (r == DB_LOCK_NOTGRANTED) {
397 r = toku_db_wait_range_lock(context.base.db, context.base.txn, &context.base.request);
398 } else {
399 break;
400 }
401 }
402 c_query_context_destroy(&context);
403 }
404 return r;
405}
406
407//result is the result of the query (i.e. 0 means found, DB_NOTFOUND, etc..)
408static int
409c_getf_prev_callback(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool lock_only) {
410 QUERY_CONTEXT super_context = (QUERY_CONTEXT) extra;
411 QUERY_CONTEXT_BASE context = &super_context->base;
412
413 int r;
414 DBT found_key = { .data = (void *) key, .size = keylen };
415
416 if (context->do_locking) {
417 const DBT *prevkey, *prevval;
418 toku_ft_cursor_peek(context->c, &prevkey, &prevval);
419 const DBT *left_key = key != NULL ? &found_key : toku_dbt_negative_infinity();
420 const DBT *right_key = prevkey;
421 r = toku_db_start_range_lock(context->db, context->txn, left_key, right_key,
422 query_context_determine_lock_type(context), &context->request);
423 } else {
424 r = 0;
425 }
426
427 //Call application-layer callback if found and locks were successfully obtained.
428 if (r==0 && key!=NULL && !lock_only) {
429 DBT found_val = { .data = (void *) val, .size = vallen };
430 context->r_user_callback = context->f(&found_key, &found_val, context->f_extra);
431 r = context->r_user_callback;
432 }
433
434 //Give ft-layer an error (if any) to return from toku_ft_cursor_prev
435 return r;
436}
437
438static int c_getf_current_callback(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool);
439
440static int
441c_getf_current(DBC *c, uint32_t flag, YDB_CALLBACK_FUNCTION f, void *extra) {
442 HANDLE_PANICKED_DB(c->dbp);
443 HANDLE_CURSOR_ILLEGAL_WORKING_PARENT_TXN(c);
444
445 QUERY_CONTEXT_S context; //Describes the context of this query.
446 c_query_context_init(&context, c, flag, f, extra);
447 //toku_ft_cursor_current will call c_getf_current_callback(..., context) (if query is successful)
448 int r = toku_ft_cursor_current(dbc_ftcursor(c), DB_CURRENT, c_getf_current_callback, &context);
449 c_query_context_destroy(&context);
450 return r;
451}
452
453//result is the result of the query (i.e. 0 means found, DB_NOTFOUND, etc..)
454static int
455c_getf_current_callback(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool lock_only) {
456 QUERY_CONTEXT super_context = (QUERY_CONTEXT) extra;
457 QUERY_CONTEXT_BASE context = &super_context->base;
458
459 int r;
460
461 //Call application-layer callback if found.
462 if (key!=NULL && !lock_only) {
463 DBT found_key = { .data = (void *) key, .size = keylen };
464 DBT found_val = { .data = (void *) val, .size = vallen };
465 context->r_user_callback = context->f(&found_key, &found_val, context->f_extra);
466 r = context->r_user_callback;
467 } else {
468 r = 0;
469 }
470
471 //Give ft-layer an error (if any) to return from toku_ft_cursor_current
472 return r;
473}
474
475static int c_getf_set_callback(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool);
476
477int
478toku_c_getf_set(DBC *c, uint32_t flag, DBT *key, YDB_CALLBACK_FUNCTION f, void *extra) {
479 HANDLE_PANICKED_DB(c->dbp);
480 HANDLE_CURSOR_ILLEGAL_WORKING_PARENT_TXN(c);
481
482 int r = 0;
483 QUERY_CONTEXT_WITH_INPUT_S context; //Describes the context of this query.
484 query_context_with_input_init(&context, c, flag, key, NULL, f, extra);
485 while (r == 0) {
486 //toku_ft_cursor_set will call c_getf_set_callback(..., context) (if query is successful)
487 r = toku_ft_cursor_set(dbc_ftcursor(c), key, c_getf_set_callback, &context);
488 if (r == DB_LOCK_NOTGRANTED) {
489 r = toku_db_wait_range_lock(context.base.db, context.base.txn, &context.base.request);
490 } else {
491 break;
492 }
493 }
494 query_context_base_destroy(&context.base);
495 return r;
496}
497
498//result is the result of the query (i.e. 0 means found, DB_NOTFOUND, etc..)
499static int
500c_getf_set_callback(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool lock_only) {
501 QUERY_CONTEXT_WITH_INPUT super_context = (QUERY_CONTEXT_WITH_INPUT) extra;
502 QUERY_CONTEXT_BASE context = &super_context->base;
503
504 int r;
505
506 //Lock:
507 // left(key,val) = (input_key, -infinity)
508 // right(key,val) = (input_key, found ? found_val : infinity)
509 if (context->do_locking) {
510 r = toku_db_start_range_lock(context->db, context->txn, super_context->input_key, super_context->input_key,
511 query_context_determine_lock_type(context), &context->request);
512 } else {
513 r = 0;
514 }
515
516 //Call application-layer callback if found and locks were successfully obtained.
517 if (r==0 && key!=NULL && !lock_only) {
518 DBT found_key = { .data = (void *) key, .size = keylen };
519 DBT found_val = { .data = (void *) val, .size = vallen };
520 context->r_user_callback = context->f(&found_key, &found_val, context->f_extra);
521 r = context->r_user_callback;
522 }
523
524 //Give ft-layer an error (if any) to return from toku_ft_cursor_set
525 return r;
526}
527
528static int c_getf_set_range_callback(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool);
529
530static int
531c_getf_set_range(DBC *c, uint32_t flag, DBT *key, YDB_CALLBACK_FUNCTION f, void *extra) {
532 HANDLE_PANICKED_DB(c->dbp);
533 HANDLE_CURSOR_ILLEGAL_WORKING_PARENT_TXN(c);
534
535 int r = 0;
536 QUERY_CONTEXT_WITH_INPUT_S context; //Describes the context of this query.
537 query_context_with_input_init(&context, c, flag, key, NULL, f, extra);
538 while (r == 0) {
539 //toku_ft_cursor_set_range will call c_getf_set_range_callback(..., context) (if query is successful)
540 r = toku_ft_cursor_set_range(dbc_ftcursor(c), key, nullptr, c_getf_set_range_callback, &context);
541 if (r == DB_LOCK_NOTGRANTED) {
542 r = toku_db_wait_range_lock(context.base.db, context.base.txn, &context.base.request);
543 } else {
544 break;
545 }
546 }
547 query_context_base_destroy(&context.base);
548 return r;
549}
550
551//result is the result of the query (i.e. 0 means found, DB_NOTFOUND, etc..)
552static int
553c_getf_set_range_callback(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool lock_only) {
554 QUERY_CONTEXT_WITH_INPUT super_context = (QUERY_CONTEXT_WITH_INPUT) extra;
555 QUERY_CONTEXT_BASE context = &super_context->base;
556
557 int r;
558 DBT found_key = { .data = (void *) key, .size = keylen };
559
560 //Lock:
561 // left(key,val) = (input_key, -infinity)
562 // right(key) = found ? found_key : infinity
563 // right(val) = found ? found_val : infinity
564 if (context->do_locking) {
565 const DBT *left_key = super_context->input_key;
566 const DBT *right_key = key != NULL ? &found_key : toku_dbt_positive_infinity();
567 r = toku_db_start_range_lock(context->db, context->txn, left_key, right_key,
568 query_context_determine_lock_type(context), &context->request);
569 } else {
570 r = 0;
571 }
572
573 //Call application-layer callback if found and locks were successfully obtained.
574 if (r==0 && key!=NULL && !lock_only) {
575 DBT found_val = { .data = (void *) val, .size = vallen };
576 context->r_user_callback = context->f(&found_key, &found_val, context->f_extra);
577 r = context->r_user_callback;
578 }
579
580 //Give ft-layer an error (if any) to return from toku_ft_cursor_set_range
581 return r;
582}
583
584static int
585c_getf_set_range_with_bound(DBC *c, uint32_t flag, DBT *key, DBT *key_bound, YDB_CALLBACK_FUNCTION f, void *extra) {
586 HANDLE_PANICKED_DB(c->dbp);
587 HANDLE_CURSOR_ILLEGAL_WORKING_PARENT_TXN(c);
588
589 int r = 0;
590 QUERY_CONTEXT_WITH_INPUT_S context; //Describes the context of this query.
591 query_context_with_input_init(&context, c, flag, key, NULL, f, extra);
592 while (r == 0) {
593 //toku_ft_cursor_set_range will call c_getf_set_range_callback(..., context) (if query is successful)
594 r = toku_ft_cursor_set_range(dbc_ftcursor(c), key, key_bound, c_getf_set_range_callback, &context);
595 if (r == DB_LOCK_NOTGRANTED) {
596 r = toku_db_wait_range_lock(context.base.db, context.base.txn, &context.base.request);
597 } else {
598 break;
599 }
600 }
601 query_context_base_destroy(&context.base);
602 return r;
603}
604
605static int c_getf_set_range_reverse_callback(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool);
606
607static int
608c_getf_set_range_reverse(DBC *c, uint32_t flag, DBT *key, YDB_CALLBACK_FUNCTION f, void *extra) {
609 HANDLE_PANICKED_DB(c->dbp);
610 HANDLE_CURSOR_ILLEGAL_WORKING_PARENT_TXN(c);
611
612 int r = 0;
613 QUERY_CONTEXT_WITH_INPUT_S context; //Describes the context of this query.
614 query_context_with_input_init(&context, c, flag, key, NULL, f, extra);
615 while (r == 0) {
616 //toku_ft_cursor_set_range_reverse will call c_getf_set_range_reverse_callback(..., context) (if query is successful)
617 r = toku_ft_cursor_set_range_reverse(dbc_ftcursor(c), key, c_getf_set_range_reverse_callback, &context);
618 if (r == DB_LOCK_NOTGRANTED) {
619 r = toku_db_wait_range_lock(context.base.db, context.base.txn, &context.base.request);
620 } else {
621 break;
622 }
623 }
624 query_context_base_destroy(&context.base);
625 return r;
626}
627
628//result is the result of the query (i.e. 0 means found, DB_NOTFOUND, etc..)
629static int
630c_getf_set_range_reverse_callback(uint32_t keylen, const void *key, uint32_t vallen, const void *val, void *extra, bool lock_only) {
631 QUERY_CONTEXT_WITH_INPUT super_context = (QUERY_CONTEXT_WITH_INPUT) extra;
632 QUERY_CONTEXT_BASE context = &super_context->base;
633
634 int r;
635 DBT found_key = { .data = (void *) key, .size = keylen };
636
637 //Lock:
638 // left(key) = found ? found_key : -infinity
639 // left(val) = found ? found_val : -infinity
640 // right(key,val) = (input_key, infinity)
641 if (context->do_locking) {
642 const DBT *left_key = key != NULL ? &found_key : toku_dbt_negative_infinity();
643 const DBT *right_key = super_context->input_key;
644 r = toku_db_start_range_lock(context->db, context->txn, left_key, right_key,
645 query_context_determine_lock_type(context), &context->request);
646 } else {
647 r = 0;
648 }
649
650 //Call application-layer callback if found and locks were successfully obtained.
651 if (r==0 && key!=NULL && !lock_only) {
652 DBT found_val = { .data = (void *) val, .size = vallen };
653 context->r_user_callback = context->f(&found_key, &found_val, context->f_extra);
654 r = context->r_user_callback;
655 }
656
657 //Give ft-layer an error (if any) to return from toku_ft_cursor_set_range_reverse
658 return r;
659}
660
661
662int toku_c_close_internal(DBC *c) {
663 toku_ft_cursor_destroy(dbc_ftcursor(c));
664 toku_sdbt_cleanup(&dbc_struct_i(c)->skey_s);
665 toku_sdbt_cleanup(&dbc_struct_i(c)->sval_s);
666 return 0;
667}
668
669// Close a cursor.
670int toku_c_close(DBC *c) {
671 toku_c_close_internal(c);
672 toku_free(c);
673 return 0;
674}
675
676static int c_set_bounds(DBC *dbc,
677 const DBT *left_key,
678 const DBT *right_key,
679 bool pre_acquire,
680 int out_of_range_error) {
681 if (out_of_range_error != DB_NOTFOUND &&
682 out_of_range_error != TOKUDB_OUT_OF_RANGE && out_of_range_error != 0) {
683 return toku_ydb_do_error(dbc->dbp->dbenv,
684 EINVAL,
685 "Invalid out_of_range_error [%d] for %s\n",
686 out_of_range_error,
687 __FUNCTION__);
688 }
689 if (left_key == toku_dbt_negative_infinity() &&
690 right_key == toku_dbt_positive_infinity()) {
691 out_of_range_error = 0;
692 }
693 DB *db = dbc->dbp;
694 DB_TXN *txn = dbc_struct_i(dbc)->txn;
695 HANDLE_PANICKED_DB(db);
696 toku_ft_cursor_set_range_lock(dbc_ftcursor(dbc),
697 left_key,
698 right_key,
699 (left_key == toku_dbt_negative_infinity()),
700 (right_key == toku_dbt_positive_infinity()),
701 out_of_range_error);
702 if (!db->i->lt || !txn || !pre_acquire)
703 return 0;
704 // READ_UNCOMMITTED and READ_COMMITTED transactions do not need read locks.
705 if (!dbc_struct_i(dbc)->rmw &&
706 dbc_struct_i(dbc)->iso != TOKU_ISO_SERIALIZABLE &&
707 !(dbc_struct_i(dbc)->iso == TOKU_ISO_SNAPSHOT &&
708 dbc_struct_i(dbc)->locking_read))
709 return 0;
710
711 toku::lock_request::type lock_type = dbc_struct_i(dbc)->rmw
712 ? toku::lock_request::type::WRITE
713 : toku::lock_request::type::READ;
714 int r = toku_db_get_range_lock(db, txn, left_key, right_key, lock_type);
715 return r;
716}
717
718static void
719c_remove_restriction(DBC *dbc) {
720 toku_ft_cursor_remove_restriction(dbc_ftcursor(dbc));
721}
722
723static void c_set_txn(DBC *dbc, DB_TXN *txn) {
724 dbc_struct_i(dbc)->txn = txn;
725 dbc_ftcursor(dbc)->ttxn = db_txn_struct_i(txn)->tokutxn;
726}
727
728static void
729c_set_check_interrupt_callback(DBC* dbc, bool (*interrupt_callback)(void*, uint64_t), void *extra) {
730 toku_ft_cursor_set_check_interrupt_cb(dbc_ftcursor(dbc), interrupt_callback, extra);
731}
732
733int
734toku_c_get(DBC* c, DBT* key, DBT* val, uint32_t flag) {
735 HANDLE_PANICKED_DB(c->dbp);
736 HANDLE_CURSOR_ILLEGAL_WORKING_PARENT_TXN(c);
737
738 uint32_t main_flag = get_main_cursor_flag(flag);
739 uint32_t remaining_flags = get_nonmain_cursor_flags(flag);
740 int r;
741 QUERY_CONTEXT_WRAPPED_S context;
742 //Passing in NULL for a key or val means that it is NOT an output.
743 // Both key and val are output:
744 // query_context_wrapped_init(&context, c, key, val);
745 // Val is output, key is not:
746 // query_context_wrapped_init(&context, c, NULL, val);
747 // Neither key nor val are output:
748 // query_context_wrapped_init(&context, c, NULL, NULL);
749 switch (main_flag) {
750 case (DB_FIRST):
751 query_context_wrapped_init(&context, c, key, val);
752 r = c_getf_first(c, remaining_flags, c_get_wrapper_callback, &context);
753 break;
754 case (DB_LAST):
755 query_context_wrapped_init(&context, c, key, val);
756 r = c_getf_last(c, remaining_flags, c_get_wrapper_callback, &context);
757 break;
758 case (DB_NEXT):
759 query_context_wrapped_init(&context, c, key, val);
760 r = c_getf_next(c, remaining_flags, c_get_wrapper_callback, &context);
761 break;
762 case (DB_PREV):
763 query_context_wrapped_init(&context, c, key, val);
764 r = c_getf_prev(c, remaining_flags, c_get_wrapper_callback, &context);
765 break;
766#ifdef DB_PREV_DUP
767 case (DB_PREV_DUP):
768 query_context_wrapped_init(&context, c, key, val);
769 r = toku_c_getf_prev_dup(c, remaining_flags, c_get_wrapper_callback, &context);
770 break;
771#endif
772 case (DB_CURRENT):
773 query_context_wrapped_init(&context, c, key, val);
774 r = c_getf_current(c, remaining_flags, c_get_wrapper_callback, &context);
775 break;
776 case (DB_SET):
777 query_context_wrapped_init(&context, c, NULL, val);
778 r = toku_c_getf_set(c, remaining_flags, key, c_get_wrapper_callback, &context);
779 break;
780 case (DB_SET_RANGE):
781 query_context_wrapped_init(&context, c, key, val);
782 r = c_getf_set_range(c, remaining_flags, key, c_get_wrapper_callback, &context);
783 break;
784 case (DB_SET_RANGE_REVERSE):
785 query_context_wrapped_init(&context, c, key, val);
786 r = c_getf_set_range_reverse(c, remaining_flags, key, c_get_wrapper_callback, &context);
787 break;
788 default:
789 r = EINVAL;
790 break;
791 }
792 return r;
793}
794
795int toku_db_cursor_internal(DB *db,
796 DB_TXN *txn,
797 DBC *c,
798 uint32_t flags,
799 int is_temporary_cursor) {
800 HANDLE_PANICKED_DB(db);
801 HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
802 DB_ENV *env = db->dbenv;
803
804 if (flags &
805 ~(DB_SERIALIZABLE | DB_INHERIT_ISOLATION | DB_LOCKING_READ | DB_RMW |
806 DBC_DISABLE_PREFETCHING)) {
807 return toku_ydb_do_error(
808 env, EINVAL, "Invalid flags set for toku_db_cursor\n");
809 }
810
811#define SCRS(name) c->name = name
812 SCRS(c_getf_first);
813 SCRS(c_getf_last);
814 SCRS(c_getf_next);
815 SCRS(c_getf_prev);
816 SCRS(c_getf_current);
817 SCRS(c_getf_set_range);
818 SCRS(c_getf_set_range_reverse);
819 SCRS(c_getf_set_range_with_bound);
820 SCRS(c_set_bounds);
821 SCRS(c_remove_restriction);
822 SCRS(c_set_txn);
823 SCRS(c_set_check_interrupt_callback);
824#undef SCRS
825
826 c->c_get = toku_c_get;
827 c->c_getf_set = toku_c_getf_set;
828 c->c_close = toku_c_close;
829
830 c->dbp = db;
831
832 dbc_struct_i(c)->txn = txn;
833 dbc_struct_i(c)->skey_s = (struct simple_dbt){0, 0};
834 dbc_struct_i(c)->sval_s = (struct simple_dbt){0, 0};
835 if (is_temporary_cursor) {
836 dbc_struct_i(c)->skey = &db->i->skey;
837 dbc_struct_i(c)->sval = &db->i->sval;
838 } else {
839 dbc_struct_i(c)->skey = &dbc_struct_i(c)->skey_s;
840 dbc_struct_i(c)->sval = &dbc_struct_i(c)->sval_s;
841 }
842 if (flags & DB_SERIALIZABLE) {
843 dbc_struct_i(c)->iso = TOKU_ISO_SERIALIZABLE;
844 } else {
845 dbc_struct_i(c)->iso =
846 txn ? db_txn_struct_i(txn)->iso : TOKU_ISO_SERIALIZABLE;
847 }
848 dbc_struct_i(c)->rmw = (flags & DB_RMW) != 0;
849 dbc_struct_i(c)->locking_read = (flags & DB_LOCKING_READ) != 0;
850 enum cursor_read_type read_type =
851 C_READ_ANY; // default, used in serializable and read uncommitted
852 if (txn) {
853 if (dbc_struct_i(c)->iso == TOKU_ISO_READ_COMMITTED ||
854 dbc_struct_i(c)->iso == TOKU_ISO_SNAPSHOT) {
855 read_type = C_READ_SNAPSHOT;
856 } else if (dbc_struct_i(c)->iso == TOKU_ISO_READ_COMMITTED_ALWAYS) {
857 read_type = C_READ_COMMITTED;
858 }
859 }
860 int r = toku_ft_cursor_create(db->i->ft_handle,
861 dbc_ftcursor(c),
862 txn ? db_txn_struct_i(txn)->tokutxn : NULL,
863 read_type,
864 ((flags & DBC_DISABLE_PREFETCHING) != 0),
865 is_temporary_cursor != 0);
866 if (r != 0) {
867 invariant(r == TOKUDB_MVCC_DICTIONARY_TOO_NEW);
868 }
869 return r;
870}
871
872static inline int
873autotxn_db_cursor(DB *db, DB_TXN *txn, DBC *c, uint32_t flags) {
874 if (!txn && (db->dbenv->i->open_flags & DB_INIT_TXN)) {
875 return toku_ydb_do_error(db->dbenv, EINVAL,
876 "Cursors in a transaction environment must have transactions.\n");
877 }
878 return toku_db_cursor_internal(db, txn, c, flags, 0);
879}
880
881// Create a cursor on a db.
882int toku_db_cursor(DB *db, DB_TXN *txn, DBC **c, uint32_t flags) {
883 DBC *XMALLOC(cursor);
884 int r = autotxn_db_cursor(db, txn, cursor, flags);
885 if (r == 0) {
886 *c = cursor;
887 } else {
888 toku_free(cursor);
889 }
890 return r;
891}
892
893#undef STATUS_VALUE
894
895#include <toku_race_tools.h>
896void __attribute__((constructor)) toku_ydb_cursor_helgrind_ignore(void);
897void
898toku_ydb_cursor_helgrind_ignore(void) {
899 TOKU_VALGRIND_HG_DISABLE_CHECKING(&ydb_c_layer_status, sizeof ydb_c_layer_status);
900}
901