/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*======
This file is part of PerconaFT.


Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License, version 2,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License, version 3,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
======= */

#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."

#include <db.h>
#include "ydb-internal.h"
#include "indexer.h"
#include <ft/log_header.h>
#include <ft/cachetable/checkpoint.h>
#include "ydb_row_lock.h"
#include "ydb_write.h"
#include "ydb_db.h"
#include <portability/toku_atomic.h>
#include <util/status.h>

static YDB_WRITE_LAYER_STATUS_S ydb_write_layer_status;
#ifdef STATUS_VALUE
#undef STATUS_VALUE
#endif
#define STATUS_VALUE(x) ydb_write_layer_status.status[x].value.num

#define STATUS_INIT(k,c,t,l,inc) TOKUFT_STATUS_INIT(ydb_write_layer_status, k, c, t, l, inc)

static void
ydb_write_layer_status_init (void) {
    // Note: this function initializes the keyname, type, and legend fields.
    // Value fields are zero-initialized by the compiler.
    STATUS_INIT(YDB_LAYER_NUM_INSERTS, nullptr, UINT64, "dictionary inserts", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_INSERTS_FAIL, nullptr, UINT64, "dictionary inserts fail", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_DELETES, nullptr, UINT64, "dictionary deletes", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_DELETES_FAIL, nullptr, UINT64, "dictionary deletes fail", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_UPDATES, nullptr, UINT64, "dictionary updates", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_UPDATES_FAIL, nullptr, UINT64, "dictionary updates fail", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_UPDATES_BROADCAST, nullptr, UINT64, "dictionary broadcast updates", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_UPDATES_BROADCAST_FAIL, nullptr, UINT64, "dictionary broadcast updates fail", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_MULTI_INSERTS, nullptr, UINT64, "dictionary multi inserts", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_MULTI_INSERTS_FAIL, nullptr, UINT64, "dictionary multi inserts fail", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_MULTI_DELETES, nullptr, UINT64, "dictionary multi deletes", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_MULTI_DELETES_FAIL, nullptr, UINT64, "dictionary multi deletes fail", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_MULTI_UPDATES, nullptr, UINT64, "dictionary updates multi", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_MULTI_UPDATES_FAIL, nullptr, UINT64, "dictionary updates multi fail", TOKU_ENGINE_STATUS);
    ydb_write_layer_status.initialized = true;
}
#undef STATUS_INIT

void
ydb_write_layer_get_status(YDB_WRITE_LAYER_STATUS statp) {
    if (!ydb_write_layer_status.initialized)
        ydb_write_layer_status_init();
    *statp = ydb_write_layer_status;
}
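
// Usage sketch (comment only, not compiled): how a monitoring caller might read
// one of the counters filled in by ydb_write_layer_get_status().  The .status[]
// row and .value.num fields follow the STATUS_VALUE macro above; everything
// else (e.g. the printf) is illustrative.
//
//     YDB_WRITE_LAYER_STATUS_S snapshot;
//     ydb_write_layer_get_status(&snapshot);
//     uint64_t inserts = snapshot.status[YDB_LAYER_NUM_INSERTS].value.num;
//     printf("dictionary inserts so far: %" PRIu64 "\n", inserts);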


static inline uint32_t
get_prelocked_flags(uint32_t flags) {
    uint32_t lock_flags = flags & (DB_PRELOCKED | DB_PRELOCKED_WRITE);
    return lock_flags;
}

// These next two static functions are defined
// both here and in ydb.c. We should find a good
// place for them.
static int
ydb_getf_do_nothing(DBT const* UU(key), DBT const* UU(val), void* UU(extra)) {
    return 0;
}

// Check if the available file system space is less than the reserve.
// Returns ENOSPC if there is not enough space, otherwise 0.
static inline int
env_check_avail_fs_space(DB_ENV *env) {
    int r = env->i->fs_state == FS_RED ? ENOSPC : 0;
    if (r) {
        env->i->enospc_redzone_ctr++;
    }
    return r;
}

// Return 0 if the proposed pair does not violate the size constraints of the DB
// (the insertion is legal).
// Return nonzero otherwise.
static int
db_put_check_size_constraints(DB *db, const DBT *key, const DBT *val) {
    int r = 0;
    unsigned int klimit, vlimit;

    toku_ft_get_maximum_advised_key_value_lengths(&klimit, &vlimit);
    if (key->size > klimit) {
        r = toku_ydb_do_error(db->dbenv, EINVAL,
                              "The largest key allowed is %u bytes", klimit);
    } else if (val->size > vlimit) {
        r = toku_ydb_do_error(db->dbenv, EINVAL,
                              "The largest value allowed is %u bytes", vlimit);
    }
    return r;
}

// Return 0 if the insert is legal.
static int
db_put_check_overwrite_constraint(DB *db, DB_TXN *txn, DBT *key,
                                  uint32_t lock_flags, uint32_t overwrite_flag) {
    int r;

    if (overwrite_flag == 0) { // 0 (yesoverwrite) does not impose constraints.
        r = 0;
    } else if (overwrite_flag == DB_NOOVERWRITE) {
        // Check if (key, anything) exists in the dictionary.
        // If it exists, fail.  Otherwise, do the insert.
        // The DB_RMW flag causes the cursor to grab a write lock instead of a read lock on the key if it exists.
        r = db_getf_set(db, txn, lock_flags|DB_SERIALIZABLE|DB_RMW, key, ydb_getf_do_nothing, NULL);
        if (r == DB_NOTFOUND)
            r = 0;
        else if (r == 0)
            r = DB_KEYEXIST;
        // Any other error is passed through.
    } else if (overwrite_flag == DB_NOOVERWRITE_NO_ERROR) {
        r = 0;
    } else {
        // Other flags are not (yet) supported.
        r = EINVAL;
    }
    return r;
}
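
// Usage sketch (comment only): how the overwrite flags handled above behave at
// the public API.  `db` and `txn` are hypothetical handles owned by the caller;
// error handling is abbreviated.
//
//     DBT k, v;
//     memset(&k, 0, sizeof k); k.data = (void *) "key"; k.size = 3;
//     memset(&v, 0, sizeof v); v.data = (void *) "val"; v.size = 3;
//     int r = db->put(db, txn, &k, &v, 0);                    // overwrite allowed
//     r = db->put(db, txn, &k, &v, DB_NOOVERWRITE);           // DB_KEYEXIST now
//     r = db->put(db, txn, &k, &v, DB_NOOVERWRITE_NO_ERROR);  // returns 0, keeps old value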


int
toku_db_del(DB *db, DB_TXN *txn, DBT *key, uint32_t flags, bool holds_mo_lock) {
    HANDLE_PANICKED_DB(db);
    HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
    HANDLE_READ_ONLY_TXN(txn);

    uint32_t unchecked_flags = flags;
    // DB_DELETE_ANY means delete regardless of whether the key exists in the db.
    bool error_if_missing = (bool)(!(flags&DB_DELETE_ANY));
    unchecked_flags &= ~DB_DELETE_ANY;
    uint32_t lock_flags = get_prelocked_flags(flags);
    unchecked_flags &= ~lock_flags;
    bool do_locking = (bool)(db->i->lt && !(lock_flags&DB_PRELOCKED_WRITE));

    int r = 0;
    if (unchecked_flags!=0) {
        r = EINVAL;
    }

    if (r == 0 && error_if_missing) {
        // Check if the key exists in the db.
        r = db_getf_set(db, txn, lock_flags|DB_SERIALIZABLE|DB_RMW, key, ydb_getf_do_nothing, NULL);
    }
    if (r == 0 && do_locking) {
        // Do locking if necessary.
        r = toku_db_get_point_write_lock(db, txn, key);
    }
    if (r == 0) {
        // Do the actual deleting.
        if (!holds_mo_lock) toku_multi_operation_client_lock();
        toku_ft_delete(db->i->ft_handle, key, txn ? db_txn_struct_i(txn)->tokutxn : 0);
        if (!holds_mo_lock) toku_multi_operation_client_unlock();
    }

    if (r == 0) {
        STATUS_VALUE(YDB_LAYER_NUM_DELETES)++;  // accountability
    }
    else {
        STATUS_VALUE(YDB_LAYER_NUM_DELETES_FAIL)++;  // accountability
    }
    return r;
}
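
// Usage sketch (comment only): deleting a key through the public handle.  With
// DB_DELETE_ANY the existence check above is skipped, so a missing key is not an
// error.  `db` and `txn` are hypothetical handles owned by the caller.
//
//     DBT k;
//     memset(&k, 0, sizeof k); k.data = (void *) "key"; k.size = 3;
//     int r = db->del(db, txn, &k, 0);             // DB_NOTFOUND if the key is absent
//     r = db->del(db, txn, &k, DB_DELETE_ANY);     // no existence check, no DB_NOTFOUND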

static int
db_put(DB *db, DB_TXN *txn, DBT *key, DBT *val, int flags, bool do_log) {
    int r = 0;
    bool unique = false;
    enum ft_msg_type type = FT_INSERT;
    if (flags == DB_NOOVERWRITE) {
        unique = true;
    } else if (flags == DB_NOOVERWRITE_NO_ERROR) {
        type = FT_INSERT_NO_OVERWRITE;
    } else if (flags != 0) {
        // All other non-zero flags are unsupported.
        r = EINVAL;
    }
    if (r == 0) {
        TOKUTXN ttxn = txn ? db_txn_struct_i(txn)->tokutxn : nullptr;
        if (unique) {
            r = toku_ft_insert_unique(db->i->ft_handle, key, val, ttxn, do_log);
        } else {
            toku_ft_maybe_insert(db->i->ft_handle, key, val, ttxn, false, ZERO_LSN, do_log, type);
        }
        invariant(r == DB_KEYEXIST || r == 0);
    }
    return r;
}

int
toku_db_put(DB *db, DB_TXN *txn, DBT *key, DBT *val, uint32_t flags, bool holds_mo_lock) {
    HANDLE_PANICKED_DB(db);
    HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
    HANDLE_READ_ONLY_TXN(txn);
    int r = 0;

    uint32_t lock_flags = get_prelocked_flags(flags);
    flags &= ~lock_flags;

    r = db_put_check_size_constraints(db, key, val);

    // Do locking if necessary.
    bool do_locking = (bool)(db->i->lt && !(lock_flags&DB_PRELOCKED_WRITE));
    if (r == 0 && do_locking) {
        r = toku_db_get_point_write_lock(db, txn, key);
    }
    if (r == 0) {
        // Insert into the ft.
        if (!holds_mo_lock) toku_multi_operation_client_lock();
        r = db_put(db, txn, key, val, flags, true);
        if (!holds_mo_lock) toku_multi_operation_client_unlock();
    }

    if (r == 0) {
        // helgrind flags a race on this status update. we increment it atomically to satisfy helgrind.
        // STATUS_VALUE(YDB_LAYER_NUM_INSERTS)++;  // accountability
        (void) toku_sync_fetch_and_add(&STATUS_VALUE(YDB_LAYER_NUM_INSERTS), 1);
    } else {
        // STATUS_VALUE(YDB_LAYER_NUM_INSERTS_FAIL)++;  // accountability
        (void) toku_sync_fetch_and_add(&STATUS_VALUE(YDB_LAYER_NUM_INSERTS_FAIL), 1);
    }

    return r;
}
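
// Usage sketch (comment only): a caller that already holds the row locks (for
// example after db->pre_acquire_table_lock) can pass DB_PRELOCKED_WRITE so the
// lock-tree acquisition above is skipped.  Handle names are hypothetical.
//
//     int r = db->pre_acquire_table_lock(db, txn);
//     if (r == 0)
//         r = db->put(db, txn, &k, &v, DB_PRELOCKED_WRITE);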

static int
toku_db_update(DB *db, DB_TXN *txn,
               const DBT *key,
               const DBT *update_function_extra,
               uint32_t flags) {
    HANDLE_PANICKED_DB(db);
    HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
    HANDLE_READ_ONLY_TXN(txn);
    int r = 0;

    uint32_t lock_flags = get_prelocked_flags(flags);
    flags &= ~lock_flags;

    r = db_put_check_size_constraints(db, key, update_function_extra);
    if (r != 0) { goto cleanup; }

    bool do_locking;
    do_locking = (db->i->lt && !(lock_flags & DB_PRELOCKED_WRITE));
    if (do_locking) {
        r = toku_db_get_point_write_lock(db, txn, key);
        if (r != 0) { goto cleanup; }
    }

    TOKUTXN ttxn;
    ttxn = txn ? db_txn_struct_i(txn)->tokutxn : NULL;
    toku_multi_operation_client_lock();
    toku_ft_maybe_update(db->i->ft_handle, key, update_function_extra, ttxn,
                         false, ZERO_LSN, true);
    toku_multi_operation_client_unlock();

cleanup:
    if (r == 0)
        STATUS_VALUE(YDB_LAYER_NUM_UPDATES)++;  // accountability
    else
        STATUS_VALUE(YDB_LAYER_NUM_UPDATES_FAIL)++;  // accountability
    return r;
}
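
// Usage sketch (comment only): the update_function_extra DBT is opaque to this
// layer; it is handed to the update callback registered with env->set_update.
// The callback body and handle names below are hypothetical.
//
//     static int my_update_cb(DB *db, const DBT *key, const DBT *old_val, const DBT *extra,
//                             void (*set_val)(const DBT *new_val, void *set_extra),
//                             void *set_extra) {
//         // compute new_val from old_val and extra, then call set_val(&new_val, set_extra)
//         return 0;
//     }
//     ...
//     env->set_update(env, my_update_cb);
//     DBT extra; memset(&extra, 0, sizeof extra);
//     int r = db->update(db, txn, &key, &extra, 0);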


// DB_IS_RESETTING_OP is true if the dictionary should be considered as if created by this transaction.
// For example, it will be true if toku_db_update_broadcast() is used to implement a schema change (such
// as adding a column), and will be false if used simply to update all the rows of a table (such as
// incrementing a field).
static int
toku_db_update_broadcast(DB *db, DB_TXN *txn,
                         const DBT *update_function_extra,
                         uint32_t flags) {
    HANDLE_PANICKED_DB(db);
    HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
    HANDLE_READ_ONLY_TXN(txn);
    int r = 0;

    uint32_t lock_flags = get_prelocked_flags(flags);
    flags &= ~lock_flags;
    uint32_t is_resetting_op_flag = flags & DB_IS_RESETTING_OP;
    flags &= is_resetting_op_flag;
    bool is_resetting_op = (is_resetting_op_flag != 0);


    if (is_resetting_op) {
        if (txn->parent != NULL) {
            r = EINVAL; // cannot have a parent if you are a resetting op
            goto cleanup;
        }
        r = toku_db_pre_acquire_fileops_lock(db, txn);
        if (r != 0) { goto cleanup; }
    }
    {
        DBT null_key;
        toku_init_dbt(&null_key);
        r = db_put_check_size_constraints(db, &null_key, update_function_extra);
        if (r != 0) { goto cleanup; }
    }

    bool do_locking;
    do_locking = (db->i->lt && !(lock_flags & DB_PRELOCKED_WRITE));
    if (do_locking) {
        r = toku_db_pre_acquire_table_lock(db, txn);
        if (r != 0) { goto cleanup; }
    }

    TOKUTXN ttxn;
    ttxn = txn ? db_txn_struct_i(txn)->tokutxn : NULL;
    toku_multi_operation_client_lock();
    toku_ft_maybe_update_broadcast(db->i->ft_handle, update_function_extra, ttxn,
                                   false, ZERO_LSN, true, is_resetting_op);
    toku_multi_operation_client_unlock();

cleanup:
    if (r == 0)
        STATUS_VALUE(YDB_LAYER_NUM_UPDATES_BROADCAST)++;  // accountability
    else
        STATUS_VALUE(YDB_LAYER_NUM_UPDATES_BROADCAST_FAIL)++;  // accountability
    return r;
}
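
// Usage sketch (comment only): a broadcast update touches every row, so there is
// no key argument; DB_IS_RESETTING_OP marks a schema-change-style rewrite as
// described in the comment above toku_db_update_broadcast.  Handles are
// hypothetical.
//
//     DBT extra; memset(&extra, 0, sizeof extra);   // describes the change to apply
//     int r = db->update_broadcast(db, txn, &extra, DB_IS_RESETTING_OP);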

static void
log_del_single(DB_TXN *txn, FT_HANDLE ft_handle, const DBT *key) {
    TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn;
    toku_ft_log_del(ttxn, ft_handle, key);
}

static uint32_t
sum_size(uint32_t num_arrays, DBT_ARRAY keys[], uint32_t overhead) {
    uint32_t sum = 0;
    for (uint32_t i = 0; i < num_arrays; i++) {
        for (uint32_t j = 0; j < keys[i].size; j++) {
            sum += keys[i].dbts[j].size + overhead;
        }
    }
    return sum;
}

static void
log_del_multiple(DB_TXN *txn, DB *src_db, const DBT *key, const DBT *val, uint32_t num_dbs, FT_HANDLE fts[], DBT_ARRAY keys[]) {
    if (num_dbs > 0) {
        TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn;
        FT_HANDLE src_ft = src_db ? src_db->i->ft_handle : NULL;
        uint32_t del_multiple_size = key->size + val->size + num_dbs*sizeof (uint32_t) + toku_log_enq_delete_multiple_overhead;
        uint32_t del_single_sizes = sum_size(num_dbs, keys, toku_log_enq_delete_any_overhead);
        if (del_single_sizes < del_multiple_size) {
            for (uint32_t i = 0; i < num_dbs; i++) {
                for (uint32_t j = 0; j < keys[i].size; j++) {
                    log_del_single(txn, fts[i], &keys[i].dbts[j]);
                }
            }
        } else {
            toku_ft_log_del_multiple(ttxn, src_ft, fts, num_dbs, key, val);
        }
    }
}
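
// Illustration of the heuristic above (comment only, made-up numbers): with
// num_dbs = 3, a 10-byte src key, a 100-byte src val, and one 8-byte generated
// key per db, a single combined record costs about
//     10 + 100 + 3*sizeof(uint32_t) + toku_log_enq_delete_multiple_overhead
// bytes, while three separate delete records cost about
//     3 * (8 + toku_log_enq_delete_any_overhead)
// bytes; log_del_multiple logs whichever form is smaller.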

static uint32_t
lookup_src_db(uint32_t num_dbs, DB *db_array[], DB *src_db) {
    uint32_t which_db;
    for (which_db = 0; which_db < num_dbs; which_db++)
        if (db_array[which_db] == src_db)
            break;
    return which_db;
}

static int
do_del_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT_ARRAY keys[], DB *src_db, const DBT *src_key, bool indexer_shortcut) {
    int r = 0;
    TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn;
    for (uint32_t which_db = 0; r == 0 && which_db < num_dbs; which_db++) {
        DB *db = db_array[which_db];

        paranoid_invariant(keys[which_db].size <= keys[which_db].capacity);

        // If db is being indexed by an indexer, then insert a delete message into the db only if the src key is to the left of or equal to the
        // indexer's cursor.  We have to get the src_db from the indexer and find it in the db_array.
        bool do_delete = true;
        DB_INDEXER *indexer = toku_db_get_indexer(db);
        if (indexer && !indexer_shortcut) { // if this db is the index under construction
            DB *indexer_src_db = toku_indexer_get_src_db(indexer);
            invariant(indexer_src_db != NULL);
            const DBT *indexer_src_key;
            if (src_db == indexer_src_db)
                indexer_src_key = src_key;
            else {
                uint32_t which_src_db = lookup_src_db(num_dbs, db_array, indexer_src_db);
                invariant(which_src_db < num_dbs);
                // The indexer src db must have exactly one item or we don't know how to continue.
                invariant(keys[which_src_db].size == 1);
                indexer_src_key = &keys[which_src_db].dbts[0];
            }
            do_delete = toku_indexer_should_insert_key(indexer, indexer_src_key);
            toku_indexer_update_estimate(indexer);
        }
        if (do_delete) {
            for (uint32_t i = 0; i < keys[which_db].size; i++) {
                toku_ft_maybe_delete(db->i->ft_handle, &keys[which_db].dbts[i], ttxn, false, ZERO_LSN, false);
            }
        }
    }
    return r;
}

//
// If a hot index is in progress, gets the indexer.
// Also verifies that there is at most one hot index
// in progress; if it finds more than one, returns EINVAL.
//
static int
get_indexer_if_exists(
    uint32_t num_dbs,
    DB **db_array,
    DB *src_db,
    DB_INDEXER** indexerp,
    bool *src_db_is_indexer_src
    )
{
    int r = 0;
    DB_INDEXER* first_indexer = NULL;
    for (uint32_t i = 0; i < num_dbs; i++) {
        DB_INDEXER* indexer = toku_db_get_indexer(db_array[i]);
        if (indexer) {
            if (!first_indexer) {
                first_indexer = indexer;
            }
            else if (first_indexer != indexer) {
                r = EINVAL;
            }
        }
    }
    if (r == 0) {
        if (first_indexer) {
            DB* indexer_src_db = toku_indexer_get_src_db(first_indexer);
            // we should just make this an invariant
            if (src_db == indexer_src_db) {
                *src_db_is_indexer_src = true;
            }
        }
        *indexerp = first_indexer;
    }
    return r;
}

int
env_del_multiple(
    DB_ENV *env,
    DB *src_db,
    DB_TXN *txn,
    const DBT *src_key,
    const DBT *src_val,
    uint32_t num_dbs,
    DB **db_array,
    DBT_ARRAY *keys,
    uint32_t *flags_array)
{
    int r;
    DBT_ARRAY del_keys[num_dbs];
    DB_INDEXER* indexer = NULL;

    HANDLE_PANICKED_ENV(env);
    HANDLE_READ_ONLY_TXN(txn);

    uint32_t lock_flags[num_dbs];
    uint32_t remaining_flags[num_dbs];
    FT_HANDLE fts[num_dbs];
    bool indexer_lock_taken = false;
    bool src_same = false;
    bool indexer_shortcut = false;
    if (!txn) {
        r = EINVAL;
        goto cleanup;
    }
    if (!env->i->generate_row_for_del) {
        r = EINVAL;
        goto cleanup;
    }

    HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
    r = get_indexer_if_exists(num_dbs, db_array, src_db, &indexer, &src_same);
    if (r) {
        goto cleanup;
    }

    for (uint32_t which_db = 0; which_db < num_dbs; which_db++) {
        DB *db = db_array[which_db];
        lock_flags[which_db] = get_prelocked_flags(flags_array[which_db]);
        remaining_flags[which_db] = flags_array[which_db] & ~lock_flags[which_db];

        if (db == src_db) {
            del_keys[which_db].size = 1;
            del_keys[which_db].capacity = 1;
            del_keys[which_db].dbts = const_cast<DBT*>(src_key);
        }
        else {
            // Generate the keys.
            r = env->i->generate_row_for_del(db, src_db, &keys[which_db], src_key, src_val);
            if (r != 0) goto cleanup;
            del_keys[which_db] = keys[which_db];
            paranoid_invariant(del_keys[which_db].size <= del_keys[which_db].capacity);
        }

        if (remaining_flags[which_db] & ~DB_DELETE_ANY) {
            r = EINVAL;
            goto cleanup;
        }
        bool error_if_missing = (bool)(!(remaining_flags[which_db]&DB_DELETE_ANY));
        for (uint32_t which_key = 0; which_key < del_keys[which_db].size; which_key++) {
            DBT *del_key = &del_keys[which_db].dbts[which_key];
            if (error_if_missing) {
                // Check if the key exists in the db.
                // This also grabs a write lock.
                r = db_getf_set(db, txn, lock_flags[which_db]|DB_SERIALIZABLE|DB_RMW, del_key, ydb_getf_do_nothing, NULL);
                if (r != 0) goto cleanup;
            } else if (db->i->lt && !(lock_flags[which_db] & DB_PRELOCKED_WRITE)) { // Do locking if necessary.
                // Needs locking.
                r = toku_db_get_point_write_lock(db, txn, del_key);
                if (r != 0) goto cleanup;
            }
        }
        fts[which_db] = db->i->ft_handle;
    }

    if (indexer) {
        // do a cheap check
        if (src_same) {
            bool may_insert = toku_indexer_may_insert(indexer, src_key);
            if (!may_insert) {
                toku_indexer_lock(indexer);
                indexer_lock_taken = true;
            }
            else {
                indexer_shortcut = true;
            }
        }
    }
    toku_multi_operation_client_lock();
    log_del_multiple(txn, src_db, src_key, src_val, num_dbs, fts, del_keys);
    r = do_del_multiple(txn, num_dbs, db_array, del_keys, src_db, src_key, indexer_shortcut);
    toku_multi_operation_client_unlock();
    if (indexer_lock_taken) {
        toku_indexer_unlock(indexer);
    }

cleanup:
    if (r == 0)
        STATUS_VALUE(YDB_LAYER_NUM_MULTI_DELETES) += num_dbs;  // accountability
    else
        STATUS_VALUE(YDB_LAYER_NUM_MULTI_DELETES_FAIL) += num_dbs;  // accountability
    return r;
}
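
// Usage sketch (comment only): env->del_multiple deletes one logical row from a
// primary db and its secondary indexes in one call, using the registered
// generate_row_for_del callback to derive the secondary keys.  Handle names and
// the toku_dbt_array_init scratch setup are assumptions.
//
//     // registered earlier:
//     //     env->set_generate_row_callback_for_del(env, my_generate_row_for_del);
//     DB *dbs[2] = { primary, secondary };
//     uint32_t flags[2] = { DB_DELETE_ANY, DB_DELETE_ANY };
//     DBT_ARRAY scratch[2];                       // space for generated keys
//     for (int i = 0; i < 2; i++) toku_dbt_array_init(&scratch[i], 1);
//     int r = env->del_multiple(env, primary, txn, &row_key, &row_val,
//                               2, dbs, scratch, flags);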

static void
log_put_multiple(DB_TXN *txn, DB *src_db, const DBT *src_key, const DBT *src_val, uint32_t num_dbs, FT_HANDLE fts[]) {
    if (num_dbs > 0) {
        TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn;
        FT_HANDLE src_ft = src_db ? src_db->i->ft_handle : NULL;
        toku_ft_log_put_multiple(ttxn, src_ft, fts, num_dbs, src_key, src_val);
    }
}

// If remaining_flags is non-null, this function performs any required uniqueness checks;
// otherwise, the caller is responsible for them.
static int
do_put_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT_ARRAY keys[], DBT_ARRAY vals[], uint32_t *remaining_flags, DB *src_db, const DBT *src_key, bool indexer_shortcut) {
    int r = 0;
    for (uint32_t which_db = 0; which_db < num_dbs; which_db++) {
        DB *db = db_array[which_db];

        invariant(keys[which_db].size == vals[which_db].size);
        paranoid_invariant(keys[which_db].size <= keys[which_db].capacity);
        paranoid_invariant(vals[which_db].size <= vals[which_db].capacity);

        if (keys[which_db].size > 0) {
            bool do_put = true;
            DB_INDEXER *indexer = toku_db_get_indexer(db);
            if (indexer && !indexer_shortcut) { // if this db is the index under construction
                DB *indexer_src_db = toku_indexer_get_src_db(indexer);
                invariant(indexer_src_db != NULL);
                const DBT *indexer_src_key;
                if (src_db == indexer_src_db)
                    indexer_src_key = src_key;
                else {
                    uint32_t which_src_db = lookup_src_db(num_dbs, db_array, indexer_src_db);
                    invariant(which_src_db < num_dbs);
                    // The indexer src db must have exactly one item or we don't know how to continue.
                    invariant(keys[which_src_db].size == 1);
                    indexer_src_key = &keys[which_src_db].dbts[0];
                }
                do_put = toku_indexer_should_insert_key(indexer, indexer_src_key);
                toku_indexer_update_estimate(indexer);
            }
            if (do_put) {
                for (uint32_t i = 0; i < keys[which_db].size; i++) {
                    int flags = 0;
                    if (remaining_flags != nullptr) {
                        flags = remaining_flags[which_db];
                        invariant(!(flags & DB_NOOVERWRITE_NO_ERROR));
                    }
                    r = db_put(db, txn, &keys[which_db].dbts[i], &vals[which_db].dbts[i], flags, false);
                    if (r != 0) {
                        goto done;
                    }
                }
            }
        }
    }
done:
    return r;
}

static int
env_put_multiple_internal(
    DB_ENV *env,
    DB *src_db,
    DB_TXN *txn,
    const DBT *src_key,
    const DBT *src_val,
    uint32_t num_dbs,
    DB **db_array,
    DBT_ARRAY *keys,
    DBT_ARRAY *vals,
    uint32_t *flags_array)
{
    int r;
    DBT_ARRAY put_keys[num_dbs];
    DBT_ARRAY put_vals[num_dbs];
    DB_INDEXER* indexer = NULL;

    HANDLE_PANICKED_ENV(env);
    HANDLE_READ_ONLY_TXN(txn);

    uint32_t lock_flags[num_dbs];
    uint32_t remaining_flags[num_dbs];
    FT_HANDLE fts[num_dbs];
    bool indexer_shortcut = false;
    bool indexer_lock_taken = false;
    bool src_same = false;

    if (!txn || !num_dbs) {
        r = EINVAL;
        goto cleanup;
    }
    if (!env->i->generate_row_for_put) {
        r = EINVAL;
        goto cleanup;
    }

    HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
    r = get_indexer_if_exists(num_dbs, db_array, src_db, &indexer, &src_same);
    if (r) {
        goto cleanup;
    }

    for (uint32_t which_db = 0; which_db < num_dbs; which_db++) {
        DB *db = db_array[which_db];

        lock_flags[which_db] = get_prelocked_flags(flags_array[which_db]);
        remaining_flags[which_db] = flags_array[which_db] & ~lock_flags[which_db];

        // Generate the row.
        if (db == src_db) {
            put_keys[which_db].size = put_keys[which_db].capacity = 1;
            put_keys[which_db].dbts = const_cast<DBT*>(src_key);

            put_vals[which_db].size = put_vals[which_db].capacity = 1;
            put_vals[which_db].dbts = const_cast<DBT*>(src_val);
        }
        else {
            r = env->i->generate_row_for_put(db, src_db, &keys[which_db], &vals[which_db], src_key, src_val);
            if (r != 0) goto cleanup;

            paranoid_invariant(keys[which_db].size <= keys[which_db].capacity);
            paranoid_invariant(vals[which_db].size <= vals[which_db].capacity);
            paranoid_invariant(keys[which_db].size == vals[which_db].size);

            put_keys[which_db] = keys[which_db];
            put_vals[which_db] = vals[which_db];
        }
        for (uint32_t i = 0; i < put_keys[which_db].size; i++) {
            DBT &put_key = put_keys[which_db].dbts[i];
            DBT &put_val = put_vals[which_db].dbts[i];

            // check size constraints
            r = db_put_check_size_constraints(db, &put_key, &put_val);
            if (r != 0) goto cleanup;

            if (remaining_flags[which_db] == DB_NOOVERWRITE_NO_ERROR) {
                // put_multiple does not support delaying the no error, since we would
                // have to log the flag in the put_multiple.
                r = EINVAL; goto cleanup;
            }

            // Do locking if necessary.
            if (db->i->lt && !(lock_flags[which_db] & DB_PRELOCKED_WRITE)) {
                // Needs locking.
                r = toku_db_get_point_write_lock(db, txn, &put_key);
                if (r != 0) goto cleanup;
            }
        }
        fts[which_db] = db->i->ft_handle;
    }

    if (indexer) {
        // do a cheap check
        if (src_same) {
            bool may_insert = toku_indexer_may_insert(indexer, src_key);
            if (!may_insert) {
                toku_indexer_lock(indexer);
                indexer_lock_taken = true;
            }
            else {
                indexer_shortcut = true;
            }
        }
    }
    toku_multi_operation_client_lock();
    r = do_put_multiple(txn, num_dbs, db_array, put_keys, put_vals, remaining_flags, src_db, src_key, indexer_shortcut);
    if (r == 0) {
        log_put_multiple(txn, src_db, src_key, src_val, num_dbs, fts);
    }
    toku_multi_operation_client_unlock();
    if (indexer_lock_taken) {
        toku_indexer_unlock(indexer);
    }

cleanup:
    if (r == 0)
        STATUS_VALUE(YDB_LAYER_NUM_MULTI_INSERTS) += num_dbs;  // accountability
    else
        STATUS_VALUE(YDB_LAYER_NUM_MULTI_INSERTS_FAIL) += num_dbs;  // accountability
    return r;
}
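
// Usage sketch (comment only): env->put_multiple inserts one logical row into a
// primary db and its secondary indexes in one call, using the registered
// generate_row_for_put callback to derive the secondary rows.  Handle names and
// the toku_dbt_array_init scratch setup are assumptions.
//
//     // registered earlier:
//     //     env->set_generate_row_callback_for_put(env, my_generate_row_for_put);
//     DB *dbs[2] = { primary, secondary };
//     uint32_t flags[2] = { 0, 0 };
//     DBT_ARRAY key_scratch[2], val_scratch[2];   // space for generated rows
//     for (int i = 0; i < 2; i++) {
//         toku_dbt_array_init(&key_scratch[i], 1);
//         toku_dbt_array_init(&val_scratch[i], 1);
//     }
//     int r = env->put_multiple(env, primary, txn, &row_key, &row_val,
//                               2, dbs, key_scratch, val_scratch, flags);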

static void swap_dbts(DBT *a, DBT *b) {
    DBT c;
    c = *a;
    *a = *b;
    *b = c;
}

// TODO: 26 Add a comment in the API description about: new val.size being generated as '0' REQUIRES old_val.size == 0
//
int
env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
                    DBT *old_src_key, DBT *old_src_data,
                    DBT *new_src_key, DBT *new_src_data,
                    uint32_t num_dbs, DB **db_array, uint32_t* flags_array,
                    uint32_t num_keys, DBT_ARRAY keys[],
                    uint32_t num_vals, DBT_ARRAY vals[]) {
    int r = 0;

    HANDLE_PANICKED_ENV(env);
    DB_INDEXER* indexer = NULL;
    bool indexer_shortcut = false;
    bool indexer_lock_taken = false;
    bool src_same = false;
    HANDLE_READ_ONLY_TXN(txn);
    DBT_ARRAY old_key_arrays[num_dbs];
    DBT_ARRAY new_key_arrays[num_dbs];
    DBT_ARRAY new_val_arrays[num_dbs];

    if (!txn) {
        r = EINVAL;
        goto cleanup;
    }
    if (!env->i->generate_row_for_put) {
        r = EINVAL;
        goto cleanup;
    }

    if (num_dbs + num_dbs > num_keys || num_dbs > num_vals) {
        r = ENOMEM; goto cleanup;
    }

    HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
    r = get_indexer_if_exists(num_dbs, db_array, src_db, &indexer, &src_same);
    if (r) {
        goto cleanup;
    }

    {
        uint32_t n_del_dbs = 0;
        DB *del_dbs[num_dbs];
        FT_HANDLE del_fts[num_dbs];
        DBT_ARRAY del_key_arrays[num_dbs];

        uint32_t n_put_dbs = 0;
        DB *put_dbs[num_dbs];
        FT_HANDLE put_fts[num_dbs];
        DBT_ARRAY put_key_arrays[num_dbs];
        DBT_ARRAY put_val_arrays[num_dbs];

        uint32_t lock_flags[num_dbs];
        uint32_t remaining_flags[num_dbs];

        for (uint32_t which_db = 0; which_db < num_dbs; which_db++) {
            DB *db = db_array[which_db];

            lock_flags[which_db] = get_prelocked_flags(flags_array[which_db]);
            remaining_flags[which_db] = flags_array[which_db] & ~lock_flags[which_db];

            if (db == src_db) {
                // Copy the old keys
                old_key_arrays[which_db].size = old_key_arrays[which_db].capacity = 1;
                old_key_arrays[which_db].dbts = old_src_key;

                // Copy the new keys and vals
                new_key_arrays[which_db].size = new_key_arrays[which_db].capacity = 1;
                new_key_arrays[which_db].dbts = new_src_key;

                new_val_arrays[which_db].size = new_val_arrays[which_db].capacity = 1;
                new_val_arrays[which_db].dbts = new_src_data;
            } else {
                // keys[0..num_dbs-1] are the new keys
                // keys[num_dbs..2*num_dbs-1] are the old keys
                // vals[0..num_dbs-1] are the new vals

                // Generate the old keys
                r = env->i->generate_row_for_put(db, src_db, &keys[which_db + num_dbs], NULL, old_src_key, old_src_data);
                if (r != 0) goto cleanup;

                paranoid_invariant(keys[which_db+num_dbs].size <= keys[which_db+num_dbs].capacity);
                old_key_arrays[which_db] = keys[which_db+num_dbs];

                // Generate the new keys and vals
                r = env->i->generate_row_for_put(db, src_db, &keys[which_db], &vals[which_db], new_src_key, new_src_data);
                if (r != 0) goto cleanup;

                paranoid_invariant(keys[which_db].size <= keys[which_db].capacity);
                paranoid_invariant(vals[which_db].size <= vals[which_db].capacity);
                paranoid_invariant(keys[which_db].size == vals[which_db].size);

                new_key_arrays[which_db] = keys[which_db];
                new_val_arrays[which_db] = vals[which_db];
            }
            DBT_ARRAY &old_keys = old_key_arrays[which_db];
            DBT_ARRAY &new_keys = new_key_arrays[which_db];
            DBT_ARRAY &new_vals = new_val_arrays[which_db];

            uint32_t num_skip = 0;
            uint32_t num_del = 0;
            uint32_t num_put = 0;
            // Next index in old_keys to look at
            uint32_t idx_old = 0;
            // Next index in new_keys/new_vals to look at
            uint32_t idx_new = 0;
            uint32_t idx_old_used = 0;
            uint32_t idx_new_used = 0;
            while (idx_old < old_keys.size || idx_new < new_keys.size) {
                // Decide whether the current position is an old-only key, a key present in both arrays, or a new-only key.
                DBT *curr_old_key = &old_keys.dbts[idx_old];
                DBT *curr_new_key = &new_keys.dbts[idx_new];
                DBT *curr_new_val = &new_vals.dbts[idx_new];

                bool locked_new_key = false;
                int cmp;
                if (idx_new == new_keys.size) {
                    cmp = -1;
                } else if (idx_old == old_keys.size) {
                    cmp = +1;
                } else {
                    const toku::comparator &cmpfn = toku_db_get_comparator(db);
                    cmp = cmpfn(curr_old_key, curr_new_key);
                }

                bool do_del = false;
                bool do_put = false;
                bool do_skip = false;
                if (cmp > 0) { // New key does not exist in old array
                    // Check overwrite constraints only in the case where the keys are not equal
                    // (the new key is alone / not equal to the old key).
                    // If the keys are equal, then we do not care whether the flag is DB_NOOVERWRITE or 0.
                    r = db_put_check_overwrite_constraint(db, txn,
                                                          curr_new_key,
                                                          lock_flags[which_db], remaining_flags[which_db]);
                    if (r != 0) goto cleanup;
                    if (remaining_flags[which_db] == DB_NOOVERWRITE) {
                        locked_new_key = true;
                    }

                    if (remaining_flags[which_db] == DB_NOOVERWRITE_NO_ERROR) {
                        // update_multiple does not support delaying the no error, since we would
                        // have to log the flag in the put_multiple.
                        r = EINVAL; goto cleanup;
                    }
                    do_put = true;
                } else if (cmp < 0) {
                    // lock old key only when it does not exist in new array
                    // otherwise locking new key takes care of this
                    if (db->i->lt && !(lock_flags[which_db] & DB_PRELOCKED_WRITE)) {
                        r = toku_db_get_point_write_lock(db, txn, curr_old_key);
                        if (r != 0) goto cleanup;
                    }
                    do_del = true;
                } else {
                    do_put = curr_new_val->size > 0 ||
                             curr_old_key->size != curr_new_key->size ||
                             memcmp(curr_old_key->data, curr_new_key->data, curr_old_key->size);
                    do_skip = !do_put;
                }
                // Check put size constraints and insert the new key only if the keys are unequal (byte for byte) or there is a val.
                // We treat any new val with size > 0 as different from the old val, which lets us avoid generating the old val.
                if (do_put) {
                    r = db_put_check_size_constraints(db, curr_new_key, curr_new_val);
                    if (r != 0) goto cleanup;

                    // lock new key unless already locked
                    if (db->i->lt && !(lock_flags[which_db] & DB_PRELOCKED_WRITE) && !locked_new_key) {
                        r = toku_db_get_point_write_lock(db, txn, curr_new_key);
                        if (r != 0) goto cleanup;
                    }
                }

                // TODO: 26 Add comments explaining the squish below and why we do not just use another stack array;
                // explain this if / else-if / else chain more fully.
                if (do_skip) {
                    paranoid_invariant(cmp == 0);
                    paranoid_invariant(!do_put);
                    paranoid_invariant(!do_del);

                    num_skip++;
                    idx_old++;
                    idx_new++;
                } else if (do_put) {
                    paranoid_invariant(cmp >= 0);
                    paranoid_invariant(!do_skip);
                    paranoid_invariant(!do_del);

                    num_put++;
                    if (idx_new != idx_new_used) {
                        swap_dbts(&new_keys.dbts[idx_new_used], &new_keys.dbts[idx_new]);
                        swap_dbts(&new_vals.dbts[idx_new_used], &new_vals.dbts[idx_new]);
                    }
                    idx_new++;
                    idx_new_used++;
                    if (cmp == 0) {
                        idx_old++;
                    }
                } else {
                    invariant(do_del);
                    paranoid_invariant(cmp < 0);
                    paranoid_invariant(!do_skip);
                    paranoid_invariant(!do_put);

                    num_del++;
                    if (idx_old != idx_old_used) {
                        swap_dbts(&old_keys.dbts[idx_old_used], &old_keys.dbts[idx_old]);
                    }
                    idx_old++;
                    idx_old_used++;
                }
            }
            old_keys.size = idx_old_used;
            new_keys.size = idx_new_used;
            new_vals.size = idx_new_used;

            if (num_del > 0) {
                del_dbs[n_del_dbs] = db;
                del_fts[n_del_dbs] = db->i->ft_handle;
                del_key_arrays[n_del_dbs] = old_keys;
                n_del_dbs++;
            }
            // If we put nothing but delete some (though not all) of the keys, we still need the log_put_multiple to happen:
            // include this db in the put_dbs so we do log_put_multiple.
            // do_put_multiple will be a no-op for this db.
            if (num_put > 0 || (num_del > 0 && num_skip > 0)) {
                put_dbs[n_put_dbs] = db;
                put_fts[n_put_dbs] = db->i->ft_handle;
                put_key_arrays[n_put_dbs] = new_keys;
                put_val_arrays[n_put_dbs] = new_vals;
                n_put_dbs++;
            }
        }
        if (indexer) {
            // do a cheap check
            if (src_same) {
                bool may_insert =
                    toku_indexer_may_insert(indexer, old_src_key) &&
                    toku_indexer_may_insert(indexer, new_src_key);
                if (!may_insert) {
                    toku_indexer_lock(indexer);
                    indexer_lock_taken = true;
                }
                else {
                    indexer_shortcut = true;
                }
            }
        }
        toku_multi_operation_client_lock();
        if (r == 0 && n_del_dbs > 0) {
            log_del_multiple(txn, src_db, old_src_key, old_src_data, n_del_dbs, del_fts, del_key_arrays);
            r = do_del_multiple(txn, n_del_dbs, del_dbs, del_key_arrays, src_db, old_src_key, indexer_shortcut);
        }

        if (r == 0 && n_put_dbs > 0) {
            // We sometimes skip some keys for del/put during runtime, but during recovery
            // we (may) delete ALL the keys for a given DB. Therefore we must put ALL the keys during
            // recovery so we don't end up losing data.
            // So unlike env->put_multiple, we ONLY log a 'put_multiple' log entry.
            log_put_multiple(txn, src_db, new_src_key, new_src_data, n_put_dbs, put_fts);
            r = do_put_multiple(txn, n_put_dbs, put_dbs, put_key_arrays, put_val_arrays, nullptr, src_db, new_src_key, indexer_shortcut);
        }
        toku_multi_operation_client_unlock();
        if (indexer_lock_taken) {
            toku_indexer_unlock(indexer);
        }
    }

cleanup:
    if (r == 0)
        STATUS_VALUE(YDB_LAYER_NUM_MULTI_UPDATES) += num_dbs;  // accountability
    else
        STATUS_VALUE(YDB_LAYER_NUM_MULTI_UPDATES_FAIL) += num_dbs;  // accountability
    return r;
}
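
// Usage sketch (comment only): env->update_multiple replaces one logical row
// (old key/val -> new key/val) across the primary db and its secondaries.  Per
// the code above, keys[] needs room for 2 * num_dbs arrays (new keys first,
// then old keys) and vals[] for num_dbs arrays.  Handle names and the
// toku_dbt_array_init scratch setup are assumptions.
//
//     DB *dbs[2] = { primary, secondary };
//     uint32_t flags[2] = { 0, 0 };
//     DBT_ARRAY key_scratch[4], val_scratch[2];
//     for (int i = 0; i < 4; i++) toku_dbt_array_init(&key_scratch[i], 1);
//     for (int i = 0; i < 2; i++) toku_dbt_array_init(&val_scratch[i], 1);
//     int r = env->update_multiple(env, primary, txn,
//                                  &old_key, &old_val, &new_key, &new_val,
//                                  2, dbs, flags,
//                                  4, key_scratch, 2, val_scratch);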

int
autotxn_db_del(DB* db, DB_TXN* txn, DBT* key, uint32_t flags) {
    bool changed; int r;
    r = toku_db_construct_autotxn(db, &txn, &changed, false);
    if (r!=0) return r;
    r = toku_db_del(db, txn, key, flags, false);
    return toku_db_destruct_autotxn(txn, r, changed);
}

int
autotxn_db_put(DB* db, DB_TXN* txn, DBT* key, DBT* data, uint32_t flags) {
    //{ unsigned i; printf("put %p keylen=%d key={", db, key->size); for(i=0; i<key->size; i++) printf("%d,", ((char*)key->data)[i]); printf("} datalen=%d data={", data->size); for(i=0; i<data->size; i++) printf("%d,", ((char*)data->data)[i]); printf("}\n"); }
    bool changed; int r;
    r = env_check_avail_fs_space(db->dbenv);
    if (r != 0) { goto cleanup; }
    r = toku_db_construct_autotxn(db, &txn, &changed, false);
    if (r!=0) {
        goto cleanup;
    }
    r = toku_db_put(db, txn, key, data, flags, false);
    r = toku_db_destruct_autotxn(txn, r, changed);
cleanup:
    return r;
}

int
autotxn_db_update(DB *db, DB_TXN *txn,
                  const DBT *key,
                  const DBT *update_function_extra,
                  uint32_t flags) {
    bool changed; int r;
    r = env_check_avail_fs_space(db->dbenv);
    if (r != 0) { goto cleanup; }
    r = toku_db_construct_autotxn(db, &txn, &changed, false);
    if (r != 0) { return r; }
    r = toku_db_update(db, txn, key, update_function_extra, flags);
    r = toku_db_destruct_autotxn(txn, r, changed);
cleanup:
    return r;
}

int
autotxn_db_update_broadcast(DB *db, DB_TXN *txn,
                            const DBT *update_function_extra,
                            uint32_t flags) {
    bool changed; int r;
    r = env_check_avail_fs_space(db->dbenv);
    if (r != 0) { goto cleanup; }
    r = toku_db_construct_autotxn(db, &txn, &changed, false);
    if (r != 0) { return r; }
    r = toku_db_update_broadcast(db, txn, update_function_extra, flags);
    r = toku_db_destruct_autotxn(txn, r, changed);
cleanup:
    return r;
}

int
env_put_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn, const DBT *src_key, const DBT *src_val, uint32_t num_dbs, DB **db_array, DBT_ARRAY *keys, DBT_ARRAY *vals, uint32_t *flags_array) {
    int r = env_check_avail_fs_space(env);
    if (r == 0) {
        r = env_put_multiple_internal(env, src_db, txn, src_key, src_val, num_dbs, db_array, keys, vals, flags_array);
    }
    return r;
}

int
toku_ydb_check_avail_fs_space(DB_ENV *env) {
    int rval = env_check_avail_fs_space(env);
    return rval;
}
#undef STATUS_VALUE

#include <toku_race_tools.h>
void __attribute__((constructor)) toku_ydb_write_helgrind_ignore(void);
void
toku_ydb_write_helgrind_ignore(void) {
    TOKU_VALGRIND_HG_DISABLE_CHECKING(&ydb_write_layer_status, sizeof ydb_write_layer_status);
}