1/* Copyright (C) 2004-2008 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
2 Copyright (C) 2008-2009 Sun Microsystems, Inc.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
16
17/* Write a row to a MARIA table */
18
19#include "ma_fulltext.h"
20#include "ma_rt_index.h"
21#include "trnman.h"
22#include "ma_key_recover.h"
23#include "ma_blockrec.h"
24
25 /* Functions declared in this file */
26
27static int w_search(MARIA_HA *info, uint32 comp_flag,
28 MARIA_KEY *key, my_off_t page,
29 MARIA_PAGE *father_page, uchar *father_keypos,
30 my_bool insert_last);
31static int _ma_balance_page(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
32 MARIA_KEY *key, MARIA_PAGE *curr_page,
33 MARIA_PAGE *father_page,
34 uchar *father_key_pos, MARIA_KEY_PARAM *s_temp);
35static uchar *_ma_find_last_pos(MARIA_KEY *int_key,
36 MARIA_PAGE *page, uchar **after_key);
37static my_bool _ma_ck_write_tree(register MARIA_HA *info, MARIA_KEY *key);
38static my_bool _ma_ck_write_btree(register MARIA_HA *info, MARIA_KEY *key);
39static my_bool _ma_ck_write_btree_with_log(MARIA_HA *, MARIA_KEY *, my_off_t *,
40 uint32);
41static my_bool _ma_log_split(MARIA_PAGE *page, uint org_length,
42 uint new_length,
43 const uchar *key_pos,
44 uint key_length, int move_length,
45 enum en_key_op prefix_or_suffix,
46 const uchar *data, uint data_length,
47 uint changed_length);
48static my_bool _ma_log_del_prefix(MARIA_PAGE *page,
49 uint org_length, uint new_length,
50 const uchar *key_pos, uint key_length,
51 int move_length);
52static my_bool _ma_log_key_middle(MARIA_PAGE *page,
53 uint new_length,
54 uint data_added_first,
55 uint data_changed_first,
56 uint data_deleted_last,
57 const uchar *key_pos,
58 uint key_length, int move_length);
59
60/*
61 @brief Default handler for returing position to new row
62
63 @note
64 This is only called for non transactional tables and not for block format
65 which is why we use info->state here.
66*/
67
68MARIA_RECORD_POS _ma_write_init_default(MARIA_HA *info,
69 const uchar *record
70 __attribute__((unused)))
71{
72 return ((info->s->state.dellink != HA_OFFSET_ERROR &&
73 !info->append_insert_at_end) ?
74 info->s->state.dellink :
75 info->state->data_file_length);
76}
77
78my_bool _ma_write_abort_default(MARIA_HA *info __attribute__((unused)))
79{
80 return 0;
81}
82
83
84/* Write new record to a table */
85
86int maria_write(MARIA_HA *info, uchar *record)
87{
88 MARIA_SHARE *share= info->s;
89 uint i;
90 int save_errno;
91 MARIA_RECORD_POS filepos;
92 uchar *buff;
93 my_bool lock_tree= share->lock_key_trees;
94 my_bool fatal_error;
95 MARIA_KEYDEF *keyinfo;
96 DBUG_ENTER("maria_write");
97 DBUG_PRINT("enter",("index_file: %d data_file: %d",
98 share->kfile.file, info->dfile.file));
99
100 DBUG_EXECUTE_IF("maria_pretend_crashed_table_on_usage",
101 maria_print_error(info->s, HA_ERR_CRASHED);
102 DBUG_RETURN(my_errno= HA_ERR_CRASHED););
103 if (share->options & HA_OPTION_READ_ONLY_DATA)
104 {
105 DBUG_RETURN(my_errno=EACCES);
106 }
107 if (_ma_readinfo(info,F_WRLCK,1))
108 DBUG_RETURN(my_errno);
109
110 if ((share->state.changed & STATE_DATA_FILE_FULL) ||
111 (share->base.reloc == (ha_rows) 1 &&
112 share->base.records == (ha_rows) 1 &&
113 share->state.state.records == (ha_rows) 1))
114 { /* System file */
115 my_errno=HA_ERR_RECORD_FILE_FULL;
116 goto err2;
117 }
118 if (share->state.state.key_file_length >= share->base.margin_key_file_length)
119 {
120 my_errno=HA_ERR_INDEX_FILE_FULL;
121 goto err2;
122 }
123 if (_ma_mark_file_changed(share))
124 goto err2;
125
126 /* Calculate and check all unique constraints */
127
128 if (share->state.header.uniques)
129 {
130 for (i=0 ; i < share->state.header.uniques ; i++)
131 {
132 MARIA_UNIQUEDEF *def= share->uniqueinfo + i;
133 ha_checksum unique_hash= _ma_unique_hash(share->uniqueinfo+i,record);
134 if (maria_is_key_active(share->state.key_map, def->key))
135 {
136 if (_ma_check_unique(info, def, record,
137 unique_hash, HA_OFFSET_ERROR))
138 goto err2;
139 }
140 else
141 maria_unique_store(record+ share->keyinfo[def->key].seg->start,
142 unique_hash);
143 }
144 }
145
146 /* Ensure we don't try to restore auto_increment if it doesn't change */
147 info->last_auto_increment= ~(ulonglong) 0;
148
149 if ((info->opt_flag & OPT_NO_ROWS))
150 filepos= HA_OFFSET_ERROR;
151 else
152 {
153 /*
154 This may either calculate a record or, or write the record and return
155 the record id
156 */
157 if ((filepos= (*share->write_record_init)(info, record)) ==
158 HA_OFFSET_ERROR)
159 goto err2;
160 }
161
162 /* Write all keys to indextree */
163 buff= info->lastkey_buff2;
164 for (i=0, keyinfo= share->keyinfo ; i < share->base.keys ; i++, keyinfo++)
165 {
166 MARIA_KEY int_key;
167 if (maria_is_key_active(share->state.key_map, i))
168 {
169 my_bool local_lock_tree= (lock_tree &&
170 !(info->bulk_insert &&
171 is_tree_inited(&info->bulk_insert[i])));
172 if (local_lock_tree)
173 {
174 mysql_rwlock_wrlock(&keyinfo->root_lock);
175 keyinfo->version++;
176 }
177 if (keyinfo->flag & HA_FULLTEXT )
178 {
179 if (_ma_ft_add(info,i, buff,record,filepos))
180 {
181 if (local_lock_tree)
182 mysql_rwlock_unlock(&keyinfo->root_lock);
183 DBUG_PRINT("error",("Got error: %d on write",my_errno));
184 goto err;
185 }
186 }
187 else
188 {
189 while (keyinfo->ck_insert(info,
190 (*keyinfo->make_key)(info, &int_key, i,
191 buff, record, filepos,
192 info->trn->trid)))
193 {
194 TRN *blocker;
195 DBUG_PRINT("error",("Got error: %d on write",my_errno));
196 /*
197 explicit check to filter out temp tables, they aren't
198 transactional and don't have a proper TRN so the code
199 below doesn't work for them.
200 Also, filter out non-thread maria use, and table modified in
201 the same transaction.
202 At last, filter out non-dup-unique errors.
203 */
204 if (!local_lock_tree)
205 goto err;
206 if (info->dup_key_trid == info->trn->trid ||
207 my_errno != HA_ERR_FOUND_DUPP_KEY)
208 {
209 mysql_rwlock_unlock(&keyinfo->root_lock);
210 goto err;
211 }
212 /* Different TrIDs: table must be transactional */
213 DBUG_ASSERT(share->base.born_transactional);
214 /*
215 If transactions are disabled, and dup_key_trid is different from
216 our TrID, it must be ALTER TABLE with dup_key_trid==0 (no
217 transaction). ALTER TABLE does have MARIA_HA::TRN not dummy but
218 puts TrID=0 in rows/keys.
219 */
220 DBUG_ASSERT(share->now_transactional ||
221 (info->dup_key_trid == 0));
222 blocker= trnman_trid_to_trn(info->trn, info->dup_key_trid);
223 /*
224 if blocker TRN was not found, it means that the conflicting
225 transaction was committed long time ago. It could not be
226 aborted, as it would have to wait on the key tree lock
227 to remove the conflicting key it has inserted.
228 */
229 if (!blocker || blocker->commit_trid != ~(TrID)0)
230 { /* committed */
231 if (blocker)
232 mysql_mutex_unlock(& blocker->state_lock);
233 mysql_rwlock_unlock(&keyinfo->root_lock);
234 goto err;
235 }
236 mysql_rwlock_unlock(&keyinfo->root_lock);
237 {
238 /* running. now we wait */
239 WT_RESOURCE_ID rc;
240 int res;
241 PSI_stage_info old_stage_info;
242
243 rc.type= &ma_rc_dup_unique;
244 /* TODO savepoint id when we'll have them */
245 rc.value= (intptr)blocker;
246 res= wt_thd_will_wait_for(info->trn->wt, blocker->wt, & rc);
247 if (res != WT_OK)
248 {
249 mysql_mutex_unlock(& blocker->state_lock);
250 my_errno= HA_ERR_LOCK_DEADLOCK;
251 goto err;
252 }
253 proc_info_hook(0, &stage_waiting_for_a_resource, &old_stage_info,
254 __func__, __FILE__, __LINE__);
255 res= wt_thd_cond_timedwait(info->trn->wt, & blocker->state_lock);
256 proc_info_hook(0, &old_stage_info, 0, __func__, __FILE__, __LINE__);
257
258 mysql_mutex_unlock(& blocker->state_lock);
259 if (res != WT_OK)
260 {
261 my_errno= res == WT_TIMEOUT ? HA_ERR_LOCK_WAIT_TIMEOUT
262 : HA_ERR_LOCK_DEADLOCK;
263 goto err;
264 }
265 }
266 mysql_rwlock_wrlock(&keyinfo->root_lock);
267#ifndef MARIA_CANNOT_ROLLBACK
268 keyinfo->version++;
269#endif
270 }
271 }
272
273 /* The above changed info->lastkey2. Inform maria_rnext_same(). */
274 info->update&= ~HA_STATE_RNEXT_SAME;
275
276 if (local_lock_tree)
277 mysql_rwlock_unlock(&keyinfo->root_lock);
278 }
279 }
280 if (share->calc_write_checksum)
281 info->cur_row.checksum= (*share->calc_write_checksum)(info,record);
282 if (filepos != HA_OFFSET_ERROR)
283 {
284 if ((*share->write_record)(info,record))
285 goto err;
286 info->state->checksum+= info->cur_row.checksum;
287 }
288 if (!share->now_transactional)
289 {
290 if (share->base.auto_key != 0)
291 {
292 const HA_KEYSEG *keyseg= share->keyinfo[share->base.auto_key-1].seg;
293 const uchar *key= record + keyseg->start;
294 set_if_bigger(share->state.auto_increment,
295 ma_retrieve_auto_increment(key, keyseg->type));
296 }
297 }
298 info->state->records++;
299 info->update= (HA_STATE_CHANGED | HA_STATE_AKTIV | HA_STATE_WRITTEN |
300 HA_STATE_ROW_CHANGED);
301 info->row_changes++;
302 share->state.changed|= STATE_NOT_MOVABLE | STATE_NOT_ZEROFILLED;
303 info->state->changed= 1;
304
305 info->cur_row.lastpos= filepos;
306 _ma_writeinfo(info, WRITEINFO_UPDATE_KEYFILE);
307 if (info->invalidator != 0)
308 {
309 DBUG_PRINT("info", ("invalidator... '%s' (update)",
310 share->open_file_name.str));
311 (*info->invalidator)(share->open_file_name.str);
312 info->invalidator=0;
313 }
314
315 /*
316 Update status of the table. We need to do so after each row write
317 for the log tables, as we want the new row to become visible to
318 other threads as soon as possible. We don't lock mutex here
319 (as it is required by pthread memory visibility rules) as (1) it's
320 not critical to use outdated share->is_log_table value (2) locking
321 mutex here for every write is too expensive.
322 */
323 if (share->is_log_table)
324 _ma_update_status((void*) info);
325
326 DBUG_RETURN(0);
327
328err:
329 save_errno= my_errno;
330 fatal_error= 0;
331 if (my_errno == HA_ERR_FOUND_DUPP_KEY ||
332 my_errno == HA_ERR_RECORD_FILE_FULL ||
333 my_errno == HA_ERR_LOCK_DEADLOCK ||
334 my_errno == HA_ERR_LOCK_WAIT_TIMEOUT ||
335 my_errno == HA_ERR_NULL_IN_SPATIAL ||
336 my_errno == HA_ERR_OUT_OF_MEM)
337 {
338 if (info->bulk_insert)
339 {
340 uint j;
341 for (j=0 ; j < share->base.keys ; j++)
342 maria_flush_bulk_insert(info, j);
343 }
344 info->errkey= i < share->base.keys ? (int) i : -1;
345 /*
346 We delete keys in the reverse order of insertion. This is the order that
347 a rollback would do and is important for CLR_ENDs generated by
348 _ma_ft|ck_delete() and write_record_abort() to work (with any other
349 order they would cause wrong jumps in the chain).
350 */
351 while ( i-- > 0)
352 {
353 if (maria_is_key_active(share->state.key_map, i))
354 {
355 my_bool local_lock_tree= (lock_tree &&
356 !(info->bulk_insert &&
357 is_tree_inited(&info->bulk_insert[i])));
358 keyinfo= share->keyinfo + i;
359 if (local_lock_tree)
360 mysql_rwlock_wrlock(&keyinfo->root_lock);
361 /**
362 @todo RECOVERY BUG
363 The key deletes below should generate CLR_ENDs
364 */
365 if (keyinfo->flag & HA_FULLTEXT)
366 {
367 if (_ma_ft_del(info,i,buff,record,filepos))
368 {
369 if (local_lock_tree)
370 mysql_rwlock_unlock(&keyinfo->root_lock);
371 break;
372 }
373 }
374 else
375 {
376 MARIA_KEY key;
377 if (keyinfo->ck_delete(info,
378 (*keyinfo->make_key)(info, &key, i, buff,
379 record,
380 filepos,
381 info->trn->trid)))
382 {
383 if (local_lock_tree)
384 mysql_rwlock_unlock(&keyinfo->root_lock);
385 break;
386 }
387 }
388 if (local_lock_tree)
389 mysql_rwlock_unlock(&keyinfo->root_lock);
390 }
391 }
392 }
393 else
394 fatal_error= 1;
395
396 if (filepos != HA_OFFSET_ERROR)
397 {
398 if ((*share->write_record_abort)(info))
399 fatal_error= 1;
400 }
401
402 if (fatal_error)
403 {
404 maria_print_error(info->s, HA_ERR_CRASHED);
405 maria_mark_crashed(info);
406 }
407
408 info->update= (HA_STATE_CHANGED | HA_STATE_WRITTEN | HA_STATE_ROW_CHANGED);
409 my_errno=save_errno;
410err2:
411 save_errno=my_errno;
412 DBUG_ASSERT(save_errno);
413 if (!save_errno)
414 save_errno= HA_ERR_INTERNAL_ERROR; /* Should never happen */
415 DBUG_PRINT("error", ("got error: %d", save_errno));
416 _ma_writeinfo(info,WRITEINFO_UPDATE_KEYFILE);
417 DBUG_RETURN(my_errno=save_errno);
418} /* maria_write */
419
420
421/*
422 Write one key to btree
423
424 TODO
425 Remove this function and have bulk insert change keyinfo->ck_insert
426 to point to the right function
427*/
428
429my_bool _ma_ck_write(MARIA_HA *info, MARIA_KEY *key)
430{
431 DBUG_ENTER("_ma_ck_write");
432
433 if (info->bulk_insert &&
434 is_tree_inited(&info->bulk_insert[key->keyinfo->key_nr]))
435 {
436 DBUG_RETURN(_ma_ck_write_tree(info, key));
437 }
438 DBUG_RETURN(_ma_ck_write_btree(info, key));
439} /* _ma_ck_write */
440
441
442/**********************************************************************
443 Insert key into btree (normal case)
444**********************************************************************/
445
446static my_bool _ma_ck_write_btree(MARIA_HA *info, MARIA_KEY *key)
447{
448 my_bool error;
449 MARIA_KEYDEF *keyinfo= key->keyinfo;
450 my_off_t *root= &info->s->state.key_root[keyinfo->key_nr];
451 DBUG_ENTER("_ma_ck_write_btree");
452
453 error= _ma_ck_write_btree_with_log(info, key, root,
454 keyinfo->write_comp_flag | key->flag);
455 if (info->ft1_to_ft2)
456 {
457 if (!error)
458 error= _ma_ft_convert_to_ft2(info, key);
459 delete_dynamic(info->ft1_to_ft2);
460 my_free(info->ft1_to_ft2);
461 info->ft1_to_ft2=0;
462 }
463 DBUG_RETURN(error);
464} /* _ma_ck_write_btree */
465
466
467/**
468 @brief Write a key to the b-tree
469
470 @retval 1 error
471 @retval 0 ok
472*/
473
474static my_bool _ma_ck_write_btree_with_log(MARIA_HA *info, MARIA_KEY *key,
475 my_off_t *root, uint32 comp_flag)
476{
477 MARIA_SHARE *share= info->s;
478 LSN lsn= LSN_IMPOSSIBLE;
479 int error;
480 my_off_t new_root= *root;
481 uchar key_buff[MARIA_MAX_KEY_BUFF];
482 MARIA_KEY org_key; /* Set/used when now_transactional=TRUE */
483 my_bool transactional= share->now_transactional;
484 DBUG_ENTER("_ma_ck_write_btree_with_log");
485
486 LINT_INIT_STRUCT(org_key);
487
488 if (transactional)
489 {
490 /* Save original value as the key may change */
491 org_key= *key;
492 memcpy(key_buff, key->data, key->data_length + key->ref_length);
493 }
494
495 error= _ma_ck_real_write_btree(info, key, &new_root, comp_flag);
496 if (!error && transactional)
497 {
498 /* Log the original value */
499 *key= org_key;
500 key->data= key_buff;
501 error= _ma_write_undo_key_insert(info, key, root, new_root, &lsn);
502 }
503 else
504 {
505 *root= new_root;
506 _ma_fast_unlock_key_del(info);
507 }
508 _ma_unpin_all_pages_and_finalize_row(info, lsn);
509
510 DBUG_RETURN(error != 0);
511} /* _ma_ck_write_btree_with_log */
512
513
514/**
515 @brief Write a key to the b-tree
516
517 @retval 1 error
518 @retval 0 ok
519*/
520
521my_bool _ma_ck_real_write_btree(MARIA_HA *info, MARIA_KEY *key, my_off_t *root,
522 uint32 comp_flag)
523{
524 int error;
525 DBUG_ENTER("_ma_ck_real_write_btree");
526
527 /* key_length parameter is used only if comp_flag is SEARCH_FIND */
528 if (*root == HA_OFFSET_ERROR ||
529 (error= w_search(info, comp_flag, key, *root, (MARIA_PAGE *) 0,
530 (uchar*) 0, 1)) > 0)
531 error= _ma_enlarge_root(info, key, root);
532 DBUG_RETURN(error != 0);
533} /* _ma_ck_real_write_btree */
534
535
536/**
537 @brief Make a new root with key as only pointer
538
539 @retval 1 error
540 @retval 0 ok
541*/
542
543my_bool _ma_enlarge_root(MARIA_HA *info, MARIA_KEY *key, my_off_t *root)
544{
545 uint t_length, nod_flag;
546 MARIA_KEY_PARAM s_temp;
547 MARIA_SHARE *share= info->s;
548 MARIA_PINNED_PAGE tmp_page_link, *page_link= &tmp_page_link;
549 MARIA_KEYDEF *keyinfo= key->keyinfo;
550 MARIA_PAGE page;
551 my_bool res= 0;
552 DBUG_ENTER("_ma_enlarge_root");
553
554 page.info= info;
555 page.keyinfo= keyinfo;
556 page.buff= info->buff;
557 page.flag= 0;
558
559 nod_flag= (*root != HA_OFFSET_ERROR) ? share->base.key_reflength : 0;
560 /* Store pointer to prev page if nod */
561 _ma_kpointer(info, page.buff + share->keypage_header, *root);
562 t_length= (*keyinfo->pack_key)(key, nod_flag, (uchar*) 0,
563 (uchar*) 0, (uchar*) 0, &s_temp);
564 page.size= share->keypage_header + t_length + nod_flag;
565
566 bzero(page.buff, share->keypage_header);
567 _ma_store_keynr(share, page.buff, keyinfo->key_nr);
568 if (nod_flag)
569 page.flag|= KEYPAGE_FLAG_ISNOD;
570 if (key->flag & (SEARCH_USER_KEY_HAS_TRANSID | SEARCH_PAGE_KEY_HAS_TRANSID))
571 page.flag|= KEYPAGE_FLAG_HAS_TRANSID;
572 (*keyinfo->store_key)(keyinfo, page.buff + share->keypage_header +
573 nod_flag, &s_temp);
574
575 /* Mark that info->buff was used */
576 info->keyread_buff_used= info->page_changed= 1;
577 if ((page.pos= _ma_new(info, PAGECACHE_PRIORITY_HIGH, &page_link)) ==
578 HA_OFFSET_ERROR)
579 DBUG_RETURN(1);
580 *root= page.pos;
581
582 page_store_info(share, &page);
583
584 /*
585 Clear unitialized part of page to avoid valgrind/purify warnings
586 and to get a clean page that is easier to compress and compare with
587 pages generated with redo
588 */
589 bzero(page.buff + page.size, share->block_size - page.size);
590
591 if (share->now_transactional && _ma_log_new(&page, 1))
592 res= 1;
593
594 if (_ma_write_keypage(&page, page_link->write_lock,
595 PAGECACHE_PRIORITY_HIGH))
596 res= 1;
597
598 DBUG_RETURN(res);
599} /* _ma_enlarge_root */
600
601
602/*
603 Search after a position for a key and store it there
604
605 TODO:
606 Change this to use pagecache directly instead of creating a copy
607 of the page. To do this, we must however change write-key-on-page
608 algorithm to not overwrite the buffer but instead store any overflow
609 key in a separate buffer.
610
611 @return
612 @retval -1 error
613 @retval 0 ok
614 @retval > 0 Key should be stored in higher tree
615*/
616
617static int w_search(register MARIA_HA *info, uint32 comp_flag, MARIA_KEY *key,
618 my_off_t page_pos,
619 MARIA_PAGE *father_page, uchar *father_keypos,
620 my_bool insert_last)
621{
622 int error,flag;
623 uchar *temp_buff,*keypos;
624 uchar keybuff[MARIA_MAX_KEY_BUFF];
625 my_bool was_last_key;
626 my_off_t next_page, dup_key_pos;
627 MARIA_SHARE *share= info->s;
628 MARIA_KEYDEF *keyinfo= key->keyinfo;
629 MARIA_PAGE page;
630 DBUG_ENTER("w_search");
631 DBUG_PRINT("enter", ("page: %lu", (ulong) (page_pos/keyinfo->block_length)));
632
633 if (!(temp_buff= (uchar*) my_alloca((uint) keyinfo->block_length+
634 MARIA_MAX_KEY_BUFF*2)))
635 DBUG_RETURN(-1);
636 if (_ma_fetch_keypage(&page, info, keyinfo, page_pos, PAGECACHE_LOCK_WRITE,
637 DFLT_INIT_HITS, temp_buff, 0))
638 goto err;
639
640 flag= (*keyinfo->bin_search)(key, &page, comp_flag, &keypos,
641 keybuff, &was_last_key);
642 if (flag == 0)
643 {
644 MARIA_KEY tmp_key;
645 /* get position to record with duplicated key */
646
647 tmp_key.keyinfo= keyinfo;
648 tmp_key.data= keybuff;
649
650 if ((*keyinfo->get_key)(&tmp_key, page.flag, page.node, &keypos))
651 dup_key_pos= _ma_row_pos_from_key(&tmp_key);
652 else
653 dup_key_pos= HA_OFFSET_ERROR;
654
655 if (keyinfo->flag & HA_FULLTEXT)
656 {
657 uint off;
658 int subkeys;
659
660 get_key_full_length_rdonly(off, keybuff);
661 subkeys=ft_sintXkorr(keybuff+off);
662 comp_flag=SEARCH_SAME;
663 if (subkeys >= 0)
664 {
665 /* normal word, one-level tree structure */
666 flag=(*keyinfo->bin_search)(key, &page, comp_flag,
667 &keypos, keybuff, &was_last_key);
668 }
669 else
670 {
671 /* popular word. two-level tree. going down */
672 my_off_t root=dup_key_pos;
673 keyinfo= &share->ft2_keyinfo;
674 get_key_full_length_rdonly(off, key);
675 key+=off;
676 /* we'll modify key entry 'in vivo' */
677 keypos-= keyinfo->keylength + page.node;
678 error= _ma_ck_real_write_btree(info, key, &root, comp_flag);
679 _ma_dpointer(share, keypos+HA_FT_WLEN, root);
680 subkeys--; /* should there be underflow protection ? */
681 DBUG_ASSERT(subkeys < 0);
682 ft_intXstore(keypos, subkeys);
683 if (!error)
684 {
685 page_mark_changed(info, &page);
686 if (_ma_write_keypage(&page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
687 DFLT_INIT_HITS))
688 goto err;
689 }
690 my_afree(temp_buff);
691 DBUG_RETURN(error);
692 }
693 }
694 else /* not HA_FULLTEXT, normal HA_NOSAME key */
695 {
696 /*
697 TODO
698 When the index will support true versioning - with multiple
699 identical values in the UNIQUE index, invisible to each other -
700 the following should be changed to "continue inserting keys, at the
701 end (of the row or statement) wait". We need to wait on *all*
702 unique conflicts at once, not one-at-a-time, because we need to
703 know all blockers in advance, otherwise we'll have incomplete wait-for
704 graph.
705 */
706 /*
707 transaction that has inserted the conflicting key may be in progress.
708 the caller will wait for it to be committed or aborted.
709 */
710 info->dup_key_trid= _ma_trid_from_key(&tmp_key);
711 info->dup_key_pos= dup_key_pos;
712 my_errno= HA_ERR_FOUND_DUPP_KEY;
713 DBUG_PRINT("warning",
714 ("Duplicate key. dup_key_trid: %lu pos %lu visible: %d",
715 (ulong) info->dup_key_trid,
716 (ulong) info->dup_key_pos,
717 info->trn ? trnman_can_read_from(info->trn,
718 info->dup_key_trid) : 2));
719 goto err;
720 }
721 }
722 if (flag == MARIA_FOUND_WRONG_KEY)
723 goto err;
724 if (!was_last_key)
725 insert_last=0;
726 next_page= _ma_kpos(page.node, keypos);
727 if (next_page == HA_OFFSET_ERROR ||
728 (error= w_search(info, comp_flag, key, next_page,
729 &page, keypos, insert_last)) > 0)
730 {
731 error= _ma_insert(info, key, &page, keypos, keybuff,
732 father_page, father_keypos, insert_last);
733 if (error < 0)
734 goto err;
735 page_mark_changed(info, &page);
736 if (_ma_write_keypage(&page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
737 DFLT_INIT_HITS))
738 goto err;
739 }
740 my_afree(temp_buff);
741 DBUG_RETURN(error);
742err:
743 my_afree(temp_buff);
744 DBUG_PRINT("exit",("Error: %d",my_errno));
745 DBUG_RETURN(-1);
746} /* w_search */
747
748
749/*
750 Insert new key.
751
752 SYNOPSIS
753 _ma_insert()
754 info Open table information.
755 keyinfo Key definition information.
756 key New key
757 anc_page Key page (beginning)
758 key_pos Position in key page where to insert.
759 key_buff Copy of previous key if keys where packed.
760 father_page position of parent key page in file.
761 father_key_pos position in parent key page for balancing.
762 insert_last If to append at end of page.
763
764 DESCRIPTION
765 Insert new key at right of key_pos.
766 Note that caller must save anc_buff
767
768 This function writes log records for all changed pages
769 (Including anc_buff and father page)
770
771 RETURN
772 < 0 Error.
773 0 OK
774 1 If key contains key to upper level (from balance page)
775 2 If key contains key to upper level (from split space)
776*/
777
778int _ma_insert(register MARIA_HA *info, MARIA_KEY *key,
779 MARIA_PAGE *anc_page, uchar *key_pos, uchar *key_buff,
780 MARIA_PAGE *father_page, uchar *father_key_pos,
781 my_bool insert_last)
782{
783 uint a_length, nod_flag, org_anc_length;
784 int t_length;
785 uchar *endpos, *prev_key, *anc_buff;
786 MARIA_KEY_PARAM s_temp;
787 MARIA_SHARE *share= info->s;
788 MARIA_KEYDEF *keyinfo= key->keyinfo;
789 DBUG_ENTER("_ma_insert");
790 DBUG_PRINT("enter",("key_pos:%p", key_pos));
791 DBUG_EXECUTE("key", _ma_print_key(DBUG_FILE, key););
792
793 /*
794 Note that anc_page->size can be bigger then block_size in case of
795 delete key that caused increase of page length
796 */
797 org_anc_length= a_length= anc_page->size;
798 nod_flag= anc_page->node;
799
800 anc_buff= anc_page->buff;
801 endpos= anc_buff+ a_length;
802 prev_key= (key_pos == anc_buff + share->keypage_header + nod_flag ?
803 (uchar*) 0 : key_buff);
804 t_length= (*keyinfo->pack_key)(key, nod_flag,
805 (key_pos == endpos ? (uchar*) 0 : key_pos),
806 prev_key, prev_key, &s_temp);
807#ifndef DBUG_OFF
808 if (prev_key && (keyinfo->flag & (HA_BINARY_PACK_KEY | HA_PACK_KEY)))
809 {
810 DBUG_DUMP("prev_key", prev_key, _ma_keylength(keyinfo,prev_key));
811 }
812 if (keyinfo->flag & HA_PACK_KEY)
813 {
814 DBUG_PRINT("test",("t_length: %d ref_len: %d",
815 t_length,s_temp.ref_length));
816 DBUG_PRINT("test",("n_ref_len: %d n_length: %d key_pos: %p",
817 s_temp.n_ref_length, s_temp.n_length, s_temp.key));
818 }
819#endif
820 if (t_length > 0)
821 {
822 if (t_length >= keyinfo->maxlength*2+MARIA_INDEX_OVERHEAD_SIZE)
823 {
824 _ma_set_fatal_error(share, HA_ERR_CRASHED);
825 DBUG_RETURN(-1);
826 }
827 bmove_upp(endpos+t_length, endpos, (uint) (endpos-key_pos));
828 }
829 else
830 {
831 if (-t_length >= keyinfo->maxlength*2+MARIA_INDEX_OVERHEAD_SIZE)
832 {
833 _ma_set_fatal_error(share, HA_ERR_CRASHED);
834 DBUG_RETURN(-1);
835 }
836 bmove(key_pos,key_pos-t_length,(uint) (endpos-key_pos)+t_length);
837 }
838 (*keyinfo->store_key)(keyinfo,key_pos,&s_temp);
839 a_length+=t_length;
840
841 if (key->flag & (SEARCH_USER_KEY_HAS_TRANSID | SEARCH_PAGE_KEY_HAS_TRANSID))
842 {
843 _ma_mark_page_with_transid(share, anc_page);
844 }
845 anc_page->size= a_length;
846 page_store_size(share, anc_page);
847
848 /*
849 Check if the new key fits totally into the the page
850 (anc_buff is big enough to contain a full page + one key)
851 */
852 if (a_length <= share->max_index_block_size)
853 {
854 if (share->max_index_block_size - a_length < 32 &&
855 (keyinfo->flag & HA_FULLTEXT) && key_pos == endpos &&
856 share->base.key_reflength <= share->rec_reflength &&
857 share->options & (HA_OPTION_PACK_RECORD | HA_OPTION_COMPRESS_RECORD))
858 {
859 /*
860 Normal word. One-level tree. Page is almost full.
861 Let's consider converting.
862 We'll compare 'key' and the first key at anc_buff
863 */
864 const uchar *a= key->data;
865 const uchar *b= anc_buff + share->keypage_header + nod_flag;
866 uint alen, blen, ft2len= share->ft2_keyinfo.keylength;
867 /* the very first key on the page is always unpacked */
868 DBUG_ASSERT((*b & 128) == 0);
869#if HA_FT_MAXLEN >= 127
870 blen= mi_uint2korr(b); b+=2;
871 When you enable this code, as part of the MyISAM->Maria merge of
872ChangeSet@1.2562, 2008-04-09 07:41:40+02:00, serg@janus.mylan +9 -0
873 restore ft2 functionality, fix bugs.
874 Then this will enable two-level fulltext index, which is not totally
875 recoverable yet.
876 So remove this text and inform Guilhem so that he fixes the issue.
877#else
878 blen= *b++;
879#endif
880 get_key_length(alen,a);
881 DBUG_ASSERT(info->ft1_to_ft2==0);
882 if (alen == blen &&
883 ha_compare_text(keyinfo->seg->charset, a, alen,
884 b, blen, 0) == 0)
885 {
886 /* Yup. converting */
887 info->ft1_to_ft2=(DYNAMIC_ARRAY *)
888 my_malloc(sizeof(DYNAMIC_ARRAY), MYF(MY_WME));
889 my_init_dynamic_array(info->ft1_to_ft2, ft2len, 300, 50, MYF(0));
890
891 /*
892 Now, adding all keys from the page to dynarray
893 if the page is a leaf (if not keys will be deleted later)
894 */
895 if (!nod_flag)
896 {
897 /*
898 Let's leave the first key on the page, though, because
899 we cannot easily dispatch an empty page here
900 */
901 b+=blen+ft2len+2;
902 for (a=anc_buff+a_length ; b < a ; b+=ft2len+2)
903 insert_dynamic(info->ft1_to_ft2, b);
904
905 /* fixing the page's length - it contains only one key now */
906 anc_page->size= share->keypage_header + blen + ft2len + 2;
907 page_store_size(share, anc_page);
908 }
909 /* the rest will be done when we're back from recursion */
910 }
911 }
912 else
913 {
914 if (share->now_transactional &&
915 _ma_log_add(anc_page, org_anc_length,
916 key_pos, s_temp.changed_length, t_length, 1,
917 KEY_OP_DEBUG_LOG_ADD_1))
918 DBUG_RETURN(-1);
919 }
920 DBUG_RETURN(0); /* There is room on page */
921 }
922 /* Page is full */
923 if (nod_flag)
924 insert_last=0;
925 /*
926 TODO:
927 Remove 'born_transactional' here.
928 The only reason for having it here is that the current
929 _ma_balance_page_ can't handle variable length keys.
930 */
931 if (!(keyinfo->flag & (HA_VAR_LENGTH_KEY | HA_BINARY_PACK_KEY)) &&
932 father_page && !insert_last && !info->quick_mode &&
933 !info->s->base.born_transactional)
934 {
935 s_temp.key_pos= key_pos;
936 page_mark_changed(info, father_page);
937 DBUG_RETURN(_ma_balance_page(info, keyinfo, key, anc_page,
938 father_page, father_key_pos,
939 &s_temp));
940 }
941 DBUG_RETURN(_ma_split_page(info, key, anc_page,
942 MY_MIN(org_anc_length,
943 info->s->max_index_block_size),
944 key_pos, s_temp.changed_length, t_length,
945 key_buff, insert_last));
946} /* _ma_insert */
947
948
949/**
950 @brief split a full page in two and assign emerging item to key
951
952 @fn _ma_split_page()
953 info Maria handler
954 keyinfo Key handler
955 key Buffer for middle key
956 split_page Page that should be split
957 org_split_length Original length of split_page before key was inserted
958 inserted_key_pos Address in buffer where key was inserted
959 changed_length Number of bytes changed at 'inserted_key_pos'
960 move_length Number of bytes buffer was moved when key was inserted
961 key_buff Key buffer to use for temporary storage of key
962 insert_last_key If we are insert key on rightmost key page
963
964 @note
965 split_buff is not stored on disk (caller has to do this)
966
967 @return
968 @retval 2 ok (Middle key up from _ma_insert())
969 @retval -1 error
970*/
971
972int _ma_split_page(MARIA_HA *info, MARIA_KEY *key, MARIA_PAGE *split_page,
973 uint org_split_length,
974 uchar *inserted_key_pos, uint changed_length,
975 int move_length,
976 uchar *key_buff, my_bool insert_last_key)
977{
978 uint keynr;
979 uint length,a_length,key_ref_length,t_length,nod_flag,key_length;
980 uint page_length, split_length, page_flag;
981 uchar *key_pos, *pos, *UNINIT_VAR(after_key);
982 MARIA_KEY_PARAM s_temp;
983 MARIA_PINNED_PAGE tmp_page_link, *page_link= &tmp_page_link;
984 MARIA_SHARE *share= info->s;
985 MARIA_KEYDEF *keyinfo= key->keyinfo;
986 MARIA_KEY tmp_key;
987 MARIA_PAGE new_page;
988 int res;
989 DBUG_ENTER("_ma_split_page");
990
991 DBUG_DUMP("buff", split_page->buff, split_page->size);
992
993 info->page_changed=1; /* Info->buff is used */
994 info->keyread_buff_used=1;
995 page_flag= split_page->flag;
996 nod_flag= split_page->node;
997 key_ref_length= share->keypage_header + nod_flag;
998
999 new_page.info= info;
1000 new_page.buff= info->buff;
1001 new_page.keyinfo= keyinfo;
1002
1003 tmp_key.data= key_buff;
1004 tmp_key.keyinfo= keyinfo;
1005 if (insert_last_key)
1006 key_pos= _ma_find_last_pos(&tmp_key, split_page, &after_key);
1007 else
1008 key_pos= _ma_find_half_pos(&tmp_key, split_page, &after_key);
1009 if (!key_pos)
1010 DBUG_RETURN(-1);
1011
1012 key_length= tmp_key.data_length + tmp_key.ref_length;
1013 split_length= (uint) (key_pos - split_page->buff);
1014 a_length= split_page->size;
1015 split_page->size= split_length;
1016 page_store_size(share, split_page);
1017
1018 key_pos=after_key;
1019 if (nod_flag)
1020 {
1021 DBUG_PRINT("test",("Splitting nod"));
1022 pos=key_pos-nod_flag;
1023 memcpy(new_page.buff + share->keypage_header, pos, (size_t) nod_flag);
1024 }
1025
1026 /* Move middle item to key and pointer to new page */
1027 if ((new_page.pos= _ma_new(info, PAGECACHE_PRIORITY_HIGH, &page_link)) ==
1028 HA_OFFSET_ERROR)
1029 DBUG_RETURN(-1);
1030
1031 _ma_copy_key(key, &tmp_key);
1032 _ma_kpointer(info, key->data + key_length, new_page.pos);
1033
1034 /* Store new page */
1035 if (!(*keyinfo->get_key)(&tmp_key, page_flag, nod_flag, &key_pos))
1036 DBUG_RETURN(-1);
1037
1038 t_length=(*keyinfo->pack_key)(&tmp_key, nod_flag, (uchar *) 0,
1039 (uchar*) 0, (uchar*) 0, &s_temp);
1040 length=(uint) ((split_page->buff + a_length) - key_pos);
1041 memcpy(new_page.buff + key_ref_length + t_length, key_pos,
1042 (size_t) length);
1043 (*keyinfo->store_key)(keyinfo,new_page.buff+key_ref_length,&s_temp);
1044 page_length= length + t_length + key_ref_length;
1045
1046 bzero(new_page.buff, share->keypage_header);
1047 /* Copy KEYFLAG_FLAG_ISNODE and KEYPAGE_FLAG_HAS_TRANSID from parent page */
1048 new_page.flag= page_flag;
1049 new_page.size= page_length;
1050 page_store_info(share, &new_page);
1051
1052 /* Copy key number */
1053 keynr= _ma_get_keynr(share, split_page->buff);
1054 _ma_store_keynr(share, new_page.buff, keynr);
1055
1056 res= 2; /* Middle key up */
1057 if (share->now_transactional && _ma_log_new(&new_page, 0))
1058 res= -1;
1059
1060 /*
1061 Clear unitialized part of page to avoid valgrind/purify warnings
1062 and to get a clean page that is easier to compress and compare with
1063 pages generated with redo
1064 */
1065 bzero(new_page.buff + page_length, share->block_size - page_length);
1066
1067 if (_ma_write_keypage(&new_page, page_link->write_lock,
1068 DFLT_INIT_HITS))
1069 res= -1;
1070
1071 /* Save changes to split pages */
1072 if (share->now_transactional &&
1073 _ma_log_split(split_page, org_split_length, split_length,
1074 inserted_key_pos, changed_length, move_length,
1075 KEY_OP_NONE, (uchar*) 0, 0, 0))
1076 res= -1;
1077
1078 DBUG_DUMP_KEY("middle_key", key);
1079 DBUG_RETURN(res);
1080} /* _ma_split_page */
1081
1082
1083/*
1084 Calculate how to much to move to split a page in two
1085
1086 Returns pointer to start of key.
1087 key will contain the key.
1088 after_key will contain the position to where the next key starts
1089*/
1090
1091uchar *_ma_find_half_pos(MARIA_KEY *key, MARIA_PAGE *ma_page,
1092 uchar **after_key)
1093{
1094 uint keys, length, key_ref_length, page_flag, nod_flag;
1095 uchar *page, *end, *lastpos;
1096 MARIA_HA *info= ma_page->info;
1097 MARIA_SHARE *share= info->s;
1098 MARIA_KEYDEF *keyinfo= key->keyinfo;
1099 DBUG_ENTER("_ma_find_half_pos");
1100
1101 nod_flag= ma_page->node;
1102 key_ref_length= share->keypage_header + nod_flag;
1103 page_flag= ma_page->flag;
1104 length= ma_page->size - key_ref_length;
1105 page= ma_page->buff+ key_ref_length; /* Point to first key */
1106
1107 if (!(keyinfo->flag &
1108 (HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
1109 HA_BINARY_PACK_KEY)) && !(page_flag & KEYPAGE_FLAG_HAS_TRANSID))
1110 {
1111 key_ref_length= keyinfo->keylength+nod_flag;
1112 key->data_length= keyinfo->keylength - info->s->rec_reflength;
1113 key->ref_length= info->s->rec_reflength;
1114 key->flag= 0;
1115 keys=length/(key_ref_length*2);
1116 end=page+keys*key_ref_length;
1117 *after_key=end+key_ref_length;
1118 memcpy(key->data, end, key_ref_length);
1119 DBUG_RETURN(end);
1120 }
1121
1122 end=page+length/2-key_ref_length; /* This is aprox. half */
1123 key->data[0]= 0; /* Safety */
1124 do
1125 {
1126 lastpos=page;
1127 if (!(length= (*keyinfo->get_key)(key, page_flag, nod_flag, &page)))
1128 DBUG_RETURN(0);
1129 } while (page < end);
1130 *after_key= page;
1131 DBUG_PRINT("exit",("returns: %p page: %p half: %p",
1132 lastpos, page, end));
1133 DBUG_RETURN(lastpos);
1134} /* _ma_find_half_pos */
1135
1136
1137/**
1138 Find second to last key on leaf page
1139
1140 @notes
1141 Used to split buffer at last key. In this case the next to last
1142 key will be moved to parent page and last key will be on it's own page.
1143
1144 @TODO
1145 Add one argument for 'last key value' to get_key so that one can
1146 do the loop without having to copy the found key the whole time
1147
1148 @return
1149 @retval Pointer to the start of the key before the last key
1150 @retval int_key will contain the last key
1151*/
1152
1153static uchar *_ma_find_last_pos(MARIA_KEY *int_key, MARIA_PAGE *ma_page,
1154 uchar **after_key)
1155{
1156 uint keys, length, key_ref_length, page_flag;
1157 uchar *page, *end, *lastpos, *prevpos;
1158 uchar key_buff[MARIA_MAX_KEY_BUFF];
1159 MARIA_HA *info= ma_page->info;
1160 MARIA_SHARE *share= info->s;
1161 MARIA_KEYDEF *keyinfo= int_key->keyinfo;
1162 MARIA_KEY tmp_key;
1163 DBUG_ENTER("_ma_find_last_pos");
1164
1165 key_ref_length= share->keypage_header;
1166 page_flag= ma_page->flag;
1167 length= ma_page->size - key_ref_length;
1168 page= ma_page->buff + key_ref_length;
1169
1170 if (!(keyinfo->flag &
1171 (HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
1172 HA_BINARY_PACK_KEY)) && !(page_flag & KEYPAGE_FLAG_HAS_TRANSID))
1173 {
1174 keys= length / keyinfo->keylength - 2;
1175 length= keyinfo->keylength;
1176 int_key->data_length= length - info->s->rec_reflength;
1177 int_key->ref_length= info->s->rec_reflength;
1178 int_key->flag= 0;
1179 end=page+keys*length;
1180 *after_key=end+length;
1181 memcpy(int_key->data, end, length);
1182 DBUG_RETURN(end);
1183 }
1184
1185 end=page+length-key_ref_length;
1186 lastpos=page;
1187 tmp_key.data= key_buff;
1188 tmp_key.keyinfo= int_key->keyinfo;
1189 key_buff[0]= 0; /* Safety */
1190
1191 /* We know that there are at least 2 keys on the page */
1192
1193 if (!(length=(*keyinfo->get_key)(&tmp_key, page_flag, 0, &page)))
1194 {
1195 _ma_set_fatal_error(share, HA_ERR_CRASHED);
1196 DBUG_RETURN(0);
1197 }
1198
1199 do
1200 {
1201 prevpos=lastpos; lastpos=page;
1202 int_key->data_length= tmp_key.data_length;
1203 int_key->ref_length= tmp_key.ref_length;
1204 int_key->flag= tmp_key.flag;
1205 memcpy(int_key->data, key_buff, length); /* previous key */
1206 if (!(length=(*keyinfo->get_key)(&tmp_key, page_flag, 0, &page)))
1207 {
1208 _ma_set_fatal_error(share, HA_ERR_CRASHED);
1209 DBUG_RETURN(0);
1210 }
1211 } while (page < end);
1212
1213 *after_key=lastpos;
1214 DBUG_PRINT("exit",("returns: %p page: %p end: %p",
1215 prevpos,page,end));
1216 DBUG_RETURN(prevpos);
1217} /* _ma_find_last_pos */
1218
1219
1220/**
1221 @brief Balance page with static size keys with page on right/left
1222
1223 @param key Middle key will be stored here
1224
1225 @notes
1226 Father_buff will always be changed
1227 Caller must handle saving of curr_buff
1228
1229 @return
1230 @retval 0 Balance was done (father buff is saved)
1231 @retval 1 Middle key up (father buff is not saved)
1232 @retval -1 Error
1233*/
1234
1235static int _ma_balance_page(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
1236 MARIA_KEY *key, MARIA_PAGE *curr_page,
1237 MARIA_PAGE *father_page,
1238 uchar *father_key_pos, MARIA_KEY_PARAM *s_temp)
1239{
1240 MARIA_PINNED_PAGE tmp_page_link, *new_page_link= &tmp_page_link;
1241 MARIA_SHARE *share= info->s;
1242 my_bool right;
1243 uint k_length,father_length,father_keylength,nod_flag,curr_keylength;
1244 uint right_length,left_length,new_right_length,new_left_length,extra_length;
1245 uint keys, tmp_length, extra_buff_length;
1246 uchar *pos, *extra_buff, *parting_key;
1247 uchar tmp_part_key[MARIA_MAX_KEY_BUFF];
1248 MARIA_PAGE next_page, extra_page, *left_page, *right_page;
1249 DBUG_ENTER("_ma_balance_page");
1250
1251 k_length= keyinfo->keylength;
1252 father_length= father_page->size;
1253 father_keylength= k_length + share->base.key_reflength;
1254 nod_flag= curr_page->node;
1255 curr_keylength= k_length+nod_flag;
1256 info->page_changed=1;
1257
1258 if ((father_key_pos != father_page->buff+father_length &&
1259 (info->state->records & 1)) ||
1260 father_key_pos == father_page->buff+ share->keypage_header +
1261 share->base.key_reflength)
1262 {
1263 right=1;
1264 next_page.pos= _ma_kpos(share->base.key_reflength,
1265 father_key_pos+father_keylength);
1266 left_page= curr_page;
1267 right_page= &next_page;
1268 DBUG_PRINT("info", ("use right page: %lu",
1269 (ulong) (next_page.pos / keyinfo->block_length)));
1270 }
1271 else
1272 {
1273 right=0;
1274 father_key_pos-=father_keylength;
1275 next_page.pos= _ma_kpos(share->base.key_reflength,father_key_pos);
1276 left_page= &next_page;
1277 right_page= curr_page;
1278 DBUG_PRINT("info", ("use left page: %lu",
1279 (ulong) (next_page.pos / keyinfo->block_length)));
1280 } /* father_key_pos ptr to parting key */
1281
1282 if (_ma_fetch_keypage(&next_page, info, keyinfo, next_page.pos,
1283 PAGECACHE_LOCK_WRITE,
1284 DFLT_INIT_HITS, info->buff, 0))
1285 goto err;
1286 page_mark_changed(info, &next_page);
1287 DBUG_DUMP("next", next_page.buff, next_page.size);
1288
1289 /* Test if there is room to share keys */
1290 left_length= left_page->size;
1291 right_length= right_page->size;
1292 keys= ((left_length+right_length-share->keypage_header*2-nod_flag*2)/
1293 curr_keylength);
1294
1295 if ((right ? right_length : left_length) + curr_keylength <=
1296 share->max_index_block_size)
1297 {
1298 /* Enough space to hold all keys in the two buffers ; Balance bufferts */
1299 new_left_length= share->keypage_header+nod_flag+(keys/2)*curr_keylength;
1300 new_right_length=share->keypage_header+nod_flag+(((keys+1)/2)*
1301 curr_keylength);
1302 left_page->size= new_left_length;
1303 page_store_size(share, left_page);
1304 right_page->size= new_right_length;
1305 page_store_size(share, right_page);
1306
1307 DBUG_PRINT("info", ("left_length: %u -> %u right_length: %u -> %u",
1308 left_length, new_left_length,
1309 right_length, new_right_length));
1310 if (left_length < new_left_length)
1311 {
1312 uint length;
1313 DBUG_PRINT("info", ("move keys to end of buff"));
1314
1315 /* Move keys right_page -> left_page */
1316 pos= left_page->buff+left_length;
1317 memcpy(pos,father_key_pos, (size_t) k_length);
1318 memcpy(pos+k_length, right_page->buff + share->keypage_header,
1319 (size_t) (length=new_left_length - left_length - k_length));
1320 pos= right_page->buff + share->keypage_header + length;
1321 memcpy(father_key_pos, pos, (size_t) k_length);
1322 bmove(right_page->buff + share->keypage_header,
1323 pos + k_length, new_right_length - share->keypage_header);
1324
1325 if (share->now_transactional)
1326 {
1327 if (right)
1328 {
1329 /*
1330 Log changes to page on left
1331 The original page is on the left and stored in left_page->buff
1332 We have on the page the newly inserted key and data
1333 from buff added last on the page
1334 */
1335 if (_ma_log_split(curr_page,
1336 left_length - s_temp->move_length,
1337 new_left_length,
1338 s_temp->key_pos, s_temp->changed_length,
1339 s_temp->move_length,
1340 KEY_OP_ADD_SUFFIX,
1341 curr_page->buff + left_length,
1342 new_left_length - left_length,
1343 new_left_length - left_length+ k_length))
1344 goto err;
1345 /*
1346 Log changes to page on right
1347 This contains the original data with some keys deleted from
1348 start of page
1349 */
1350 if (_ma_log_prefix(&next_page, 0,
1351 ((int) new_right_length - (int) right_length),
1352 KEY_OP_DEBUG_LOG_PREFIX_3))
1353 goto err;
1354 }
1355 else
1356 {
1357 /*
1358 Log changes to page on right (the original page) which is in buff
1359 Data is removed from start of page
1360 The inserted key may be in buff or moved to curr_buff
1361 */
1362 if (_ma_log_del_prefix(curr_page,
1363 right_length - s_temp->changed_length,
1364 new_right_length,
1365 s_temp->key_pos, s_temp->changed_length,
1366 s_temp->move_length))
1367 goto err;
1368 /*
1369 Log changes to page on left, which has new data added last
1370 */
1371 if (_ma_log_suffix(&next_page, left_length, new_left_length))
1372 goto err;
1373 }
1374 }
1375 }
1376 else
1377 {
1378 uint length;
1379 DBUG_PRINT("info", ("move keys to start of right_page"));
1380
1381 bmove_upp(right_page->buff + new_right_length,
1382 right_page->buff + right_length,
1383 right_length - share->keypage_header);
1384 length= new_right_length -right_length - k_length;
1385 memcpy(right_page->buff + share->keypage_header + length, father_key_pos,
1386 (size_t) k_length);
1387 pos= left_page->buff + new_left_length;
1388 memcpy(father_key_pos, pos, (size_t) k_length);
1389 memcpy(right_page->buff + share->keypage_header, pos+k_length,
1390 (size_t) length);
1391
1392 if (share->now_transactional)
1393 {
1394 if (right)
1395 {
1396 /*
1397 Log changes to page on left
1398 The original page is on the left and stored in curr_buff
1399 The page is shortened from end and the key may be on the page
1400 */
1401 if (_ma_log_split(curr_page,
1402 left_length - s_temp->move_length,
1403 new_left_length,
1404 s_temp->key_pos, s_temp->changed_length,
1405 s_temp->move_length,
1406 KEY_OP_NONE, (uchar*) 0, 0, 0))
1407 goto err;
1408 /*
1409 Log changes to page on right
1410 This contains the original data, with some data from cur_buff
1411 added first
1412 */
1413 if (_ma_log_prefix(&next_page,
1414 (uint) (new_right_length - right_length),
1415 (int) (new_right_length - right_length),
1416 KEY_OP_DEBUG_LOG_PREFIX_4))
1417 goto err;
1418 }
1419 else
1420 {
1421 /*
1422 Log changes to page on right (the original page) which is in buff
1423 We have on the page the newly inserted key and data
1424 from buff added first on the page
1425 */
1426 uint diff_length= new_right_length - right_length;
1427 if (_ma_log_split(curr_page,
1428 left_length - s_temp->move_length,
1429 new_right_length,
1430 s_temp->key_pos + diff_length,
1431 s_temp->changed_length,
1432 s_temp->move_length,
1433 KEY_OP_ADD_PREFIX,
1434 curr_page->buff + share->keypage_header,
1435 diff_length, diff_length + k_length))
1436 goto err;
1437 /*
1438 Log changes to page on left, which is shortened from end
1439 */
1440 if (_ma_log_suffix(&next_page, left_length, new_left_length))
1441 goto err;
1442 }
1443 }
1444 }
1445
1446 /* Log changes to father (one level up) page */
1447
1448 if (share->now_transactional &&
1449 _ma_log_change(father_page, father_key_pos, k_length,
1450 KEY_OP_DEBUG_FATHER_CHANGED_1))
1451 goto err;
1452
1453 /*
1454 next_page_link->changed is marked as true above and fathers
1455 page_link->changed is marked as true in caller
1456 */
1457 if (_ma_write_keypage(&next_page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
1458 DFLT_INIT_HITS) ||
1459 _ma_write_keypage(father_page,
1460 PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
1461 goto err;
1462 DBUG_RETURN(0);
1463 }
1464
1465 /* left_page and right_page are full, lets split and make new nod */
1466
1467 extra_buff= info->buff+share->base.max_key_block_length;
1468 new_left_length= new_right_length= (share->keypage_header + nod_flag +
1469 (keys+1) / 3 * curr_keylength);
1470 extra_page.info= info;
1471 extra_page.keyinfo= keyinfo;
1472 extra_page.buff= extra_buff;
1473
1474 /*
1475 5 is the minum number of keys we can have here. This comes from
1476 the fact that each full page can store at least 2 keys and in this case
1477 we have a 'split' key, ie 2+2+1 = 5
1478 */
1479 if (keys == 5) /* Too few keys to balance */
1480 new_left_length-=curr_keylength;
1481 extra_length= (nod_flag + left_length + right_length -
1482 new_left_length - new_right_length - curr_keylength);
1483 extra_buff_length= extra_length + share->keypage_header;
1484 DBUG_PRINT("info",("left_length: %d right_length: %d new_left_length: %d new_right_length: %d extra_length: %d",
1485 left_length, right_length,
1486 new_left_length, new_right_length,
1487 extra_length));
1488
1489 left_page->size= new_left_length;
1490 page_store_size(share, left_page);
1491 right_page->size= new_right_length;
1492 page_store_size(share, right_page);
1493
1494 bzero(extra_buff, share->keypage_header);
1495 extra_page.flag= nod_flag ? KEYPAGE_FLAG_ISNOD : 0;
1496 extra_page.size= extra_buff_length;
1497 page_store_info(share, &extra_page);
1498
1499 /* Copy key number */
1500 _ma_store_keynr(share, extra_buff, keyinfo->key_nr);
1501
1502 /* move first largest keys to new page */
1503 pos= right_page->buff + right_length-extra_length;
1504 memcpy(extra_buff + share->keypage_header, pos, extra_length);
1505 /* Zero old data from buffer */
1506 bzero(extra_buff + extra_buff_length,
1507 share->block_size - extra_buff_length);
1508
1509 /* Save new parting key between buff and extra_buff */
1510 memcpy(tmp_part_key, pos-k_length,k_length);
1511 /* Make place for new keys */
1512 bmove_upp(right_page->buff + new_right_length, pos - k_length,
1513 right_length - extra_length - k_length - share->keypage_header);
1514 /* Copy keys from left page */
1515 pos= left_page->buff + new_left_length;
1516 memcpy(right_page->buff + share->keypage_header, pos + k_length,
1517 (size_t) (tmp_length= left_length - new_left_length - k_length));
1518 /* Copy old parting key */
1519 parting_key= right_page->buff + share->keypage_header + tmp_length;
1520 memcpy(parting_key, father_key_pos, (size_t) k_length);
1521
1522 /* Move new parting keys up to caller */
1523 memcpy((right ? key->data : father_key_pos),pos,(size_t) k_length);
1524 memcpy((right ? father_key_pos : key->data),tmp_part_key, k_length);
1525
1526 if ((extra_page.pos= _ma_new(info, DFLT_INIT_HITS, &new_page_link))
1527 == HA_OFFSET_ERROR)
1528 goto err;
1529 _ma_kpointer(info,key->data+k_length, extra_page.pos);
1530 /* This is safe as long we are using not keys with transid */
1531 key->data_length= k_length - info->s->rec_reflength;
1532 key->ref_length= info->s->rec_reflength;
1533
1534 if (right)
1535 {
1536 /*
1537 Page order according to key values:
1538 orignal_page (curr_page = left_page), next_page (buff), extra_buff
1539
1540 Move page positions so that we store data in extra_page where
1541 next_page was and next_page will be stored at the new position
1542 */
1543 swap_variables(my_off_t, extra_page.pos, next_page.pos);
1544 }
1545
1546 if (share->now_transactional)
1547 {
1548 if (right)
1549 {
1550 /*
1551 left_page is shortened,
1552 right_page is getting new keys at start and shortened from end.
1553 extra_page is new page
1554
1555 Note that extra_page (largest key parts) will be stored at the
1556 place of the original 'right' page (next_page) and right page
1557 will be stored at the new page position
1558
1559 This makes the log entries smaller as right_page contains all
1560 data to generate the data extra_buff
1561 */
1562
1563 /*
1564 Log changes to page on left (page shortened page at end)
1565 */
1566 if (_ma_log_split(curr_page,
1567 left_length - s_temp->move_length, new_left_length,
1568 s_temp->key_pos, s_temp->changed_length,
1569 s_temp->move_length,
1570 KEY_OP_NONE, (uchar*) 0, 0, 0))
1571 goto err;
1572 /*
1573 Log changes to right page (stored at next page)
1574 This contains the last 'extra_buff' from 'buff'
1575 */
1576 if (_ma_log_prefix(&extra_page,
1577 0, (int) (extra_buff_length - right_length),
1578 KEY_OP_DEBUG_LOG_PREFIX_5))
1579 goto err;
1580
1581 /*
1582 Log changes to middle page, which is stored at the new page
1583 position
1584 */
1585 if (_ma_log_new(&next_page, 0))
1586 goto err;
1587 }
1588 else
1589 {
1590 /*
1591 Log changes to page on right (the original page) which is in buff
1592 This contains the original data, with some data from curr_buff
1593 added first and shortened at end
1594 */
1595 int data_added_first= left_length - new_left_length;
1596 if (_ma_log_key_middle(right_page,
1597 new_right_length,
1598 data_added_first,
1599 data_added_first,
1600 extra_length,
1601 s_temp->key_pos,
1602 s_temp->changed_length,
1603 s_temp->move_length))
1604 goto err;
1605
1606 /* Log changes to page on left, which is shortened from end */
1607 if (_ma_log_suffix(left_page, left_length, new_left_length))
1608 goto err;
1609
1610 /* Log change to rightmost (new) page */
1611 if (_ma_log_new(&extra_page, 0))
1612 goto err;
1613 }
1614
1615 /* Log changes to father (one level up) page */
1616 if (share->now_transactional &&
1617 _ma_log_change(father_page, father_key_pos, k_length,
1618 KEY_OP_DEBUG_FATHER_CHANGED_2))
1619 goto err;
1620 }
1621
1622 if (_ma_write_keypage(&next_page,
1623 (right ? new_page_link->write_lock :
1624 PAGECACHE_LOCK_LEFT_WRITELOCKED),
1625 DFLT_INIT_HITS) ||
1626 _ma_write_keypage(&extra_page,
1627 (!right ? new_page_link->write_lock :
1628 PAGECACHE_LOCK_LEFT_WRITELOCKED),
1629 DFLT_INIT_HITS))
1630 goto err;
1631
1632 DBUG_RETURN(1); /* Middle key up */
1633
1634err:
1635 DBUG_RETURN(-1);
1636} /* _ma_balance_page */
1637
1638
1639/**********************************************************************
1640 * Bulk insert code *
1641 **********************************************************************/
1642
1643typedef struct {
1644 MARIA_HA *info;
1645 uint keynr;
1646} bulk_insert_param;
1647
1648
1649static my_bool _ma_ck_write_tree(register MARIA_HA *info, MARIA_KEY *key)
1650{
1651 my_bool error;
1652 uint keynr= key->keyinfo->key_nr;
1653 DBUG_ENTER("_ma_ck_write_tree");
1654
1655 /* Store ref_length as this is always constant */
1656 info->bulk_insert_ref_length= key->ref_length;
1657 error= tree_insert(&info->bulk_insert[keynr], key->data,
1658 key->data_length + key->ref_length,
1659 info->bulk_insert[keynr].custom_arg) == 0;
1660 DBUG_RETURN(error);
1661} /* _ma_ck_write_tree */
1662
1663
1664/* typeof(_ma_keys_compare)=qsort_cmp2 */
1665
1666static int keys_compare(bulk_insert_param *param, uchar *key1, uchar *key2)
1667{
1668 uint not_used[2];
1669 return ha_key_cmp(param->info->s->keyinfo[param->keynr].seg,
1670 key1, key2, USE_WHOLE_KEY, SEARCH_SAME,
1671 not_used);
1672}
1673
1674
1675static int keys_free(uchar *key, TREE_FREE mode, bulk_insert_param *param)
1676{
1677 /*
1678 Probably I can use info->lastkey here, but I'm not sure,
1679 and to be safe I'd better use local lastkey.
1680 */
1681 MARIA_SHARE *share= param->info->s;
1682 uchar lastkey[MARIA_MAX_KEY_BUFF];
1683 uint keylen;
1684 MARIA_KEYDEF *keyinfo= share->keyinfo + param->keynr;
1685 MARIA_KEY tmp_key;
1686
1687 switch (mode) {
1688 case free_init:
1689 if (share->lock_key_trees)
1690 {
1691 mysql_rwlock_wrlock(&keyinfo->root_lock);
1692 keyinfo->version++;
1693 }
1694 return 0;
1695 case free_free:
1696 /* Note: keylen doesn't contain transid lengths */
1697 keylen= _ma_keylength(keyinfo, key);
1698 tmp_key.data= lastkey;
1699 tmp_key.keyinfo= keyinfo;
1700 tmp_key.data_length= keylen - share->rec_reflength;
1701 tmp_key.ref_length= param->info->bulk_insert_ref_length;
1702 tmp_key.flag= (param->info->bulk_insert_ref_length ==
1703 share->rec_reflength ? 0 : SEARCH_USER_KEY_HAS_TRANSID);
1704 /*
1705 We have to copy key as ma_ck_write_btree may need the buffer for
1706 copying middle key up if tree is growing
1707 */
1708 memcpy(lastkey, key, tmp_key.data_length + tmp_key.ref_length);
1709 return _ma_ck_write_btree(param->info, &tmp_key);
1710 case free_end:
1711 if (share->lock_key_trees)
1712 mysql_rwlock_unlock(&keyinfo->root_lock);
1713 return 0;
1714 }
1715 return 1;
1716}
1717
1718
1719int maria_init_bulk_insert(MARIA_HA *info, size_t cache_size, ha_rows rows)
1720{
1721 MARIA_SHARE *share= info->s;
1722 MARIA_KEYDEF *key=share->keyinfo;
1723 bulk_insert_param *params;
1724 uint i, num_keys, total_keylength;
1725 ulonglong key_map;
1726 DBUG_ENTER("_ma_init_bulk_insert");
1727 DBUG_PRINT("enter",("cache_size: %lu", (ulong) cache_size));
1728
1729 DBUG_ASSERT(!info->bulk_insert &&
1730 (!rows || rows >= MARIA_MIN_ROWS_TO_USE_BULK_INSERT));
1731
1732 maria_clear_all_keys_active(key_map);
1733 for (i=total_keylength=num_keys=0 ; i < share->base.keys ; i++)
1734 {
1735 if (! (key[i].flag & HA_NOSAME) && (share->base.auto_key != i + 1) &&
1736 maria_is_key_active(share->state.key_map, i))
1737 {
1738 num_keys++;
1739 maria_set_key_active(key_map, i);
1740 total_keylength+=key[i].maxlength+TREE_ELEMENT_EXTRA_SIZE;
1741 }
1742 }
1743
1744 if (num_keys==0 ||
1745 num_keys * (size_t) MARIA_MIN_SIZE_BULK_INSERT_TREE > cache_size)
1746 DBUG_RETURN(0);
1747
1748 if (rows && rows*total_keylength < cache_size)
1749 cache_size= (size_t)rows;
1750 else
1751 cache_size/=total_keylength*16;
1752
1753 info->bulk_insert=(TREE *)
1754 my_malloc((sizeof(TREE)*share->base.keys+
1755 sizeof(bulk_insert_param)*num_keys),MYF(0));
1756
1757 if (!info->bulk_insert)
1758 DBUG_RETURN(HA_ERR_OUT_OF_MEM);
1759
1760 params=(bulk_insert_param *)(info->bulk_insert+share->base.keys);
1761 for (i=0 ; i < share->base.keys ; i++)
1762 {
1763 if (maria_is_key_active(key_map, i))
1764 {
1765 params->info=info;
1766 params->keynr=i;
1767 /* Only allocate a 16'th of the buffer at a time */
1768 init_tree(&info->bulk_insert[i],
1769 cache_size * key[i].maxlength,
1770 cache_size * key[i].maxlength, 0,
1771 (qsort_cmp2)keys_compare,
1772 (tree_element_free) keys_free, (void *)params++, MYF(0));
1773 }
1774 else
1775 info->bulk_insert[i].root=0;
1776 }
1777
1778 DBUG_RETURN(0);
1779}
1780
1781void maria_flush_bulk_insert(MARIA_HA *info, uint inx)
1782{
1783 if (info->bulk_insert)
1784 {
1785 if (is_tree_inited(&info->bulk_insert[inx]))
1786 reset_tree(&info->bulk_insert[inx]);
1787 }
1788}
1789
1790
1791int maria_end_bulk_insert(MARIA_HA *info, my_bool abort)
1792{
1793 int first_error= 0;
1794 DBUG_ENTER("maria_end_bulk_insert");
1795 if (info->bulk_insert)
1796 {
1797 uint i;
1798 for (i=0 ; i < info->s->base.keys ; i++)
1799 {
1800 if (is_tree_inited(&info->bulk_insert[i]))
1801 {
1802 int error;
1803 if (info->s->deleting)
1804 reset_free_element(&info->bulk_insert[i]);
1805 if ((error= delete_tree(&info->bulk_insert[i], abort)))
1806 {
1807 first_error= first_error ? first_error : error;
1808 abort= 1;
1809 }
1810 }
1811 }
1812 my_free(info->bulk_insert);
1813 info->bulk_insert= 0;
1814 }
1815 DBUG_RETURN(first_error);
1816}
1817
1818
1819/****************************************************************************
1820 Dedicated functions that generate log entries
1821****************************************************************************/
1822
1823
1824int _ma_write_undo_key_insert(MARIA_HA *info, const MARIA_KEY *key,
1825 my_off_t *root, my_off_t new_root, LSN *res_lsn)
1826{
1827 MARIA_SHARE *share= info->s;
1828 MARIA_KEYDEF *keyinfo= key->keyinfo;
1829 uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE +
1830 KEY_NR_STORE_SIZE];
1831 const uchar *key_value;
1832 LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
1833 struct st_msg_to_write_hook_for_undo_key msg;
1834 uint key_length;
1835
1836 /* Save if we need to write a clr record */
1837 lsn_store(log_data, info->trn->undo_lsn);
1838 key_nr_store(log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE,
1839 keyinfo->key_nr);
1840 key_length= key->data_length + key->ref_length;
1841 log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
1842 log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
1843 log_array[TRANSLOG_INTERNAL_PARTS + 1].str= key->data;
1844 log_array[TRANSLOG_INTERNAL_PARTS + 1].length= key_length;
1845
1846 msg.root= root;
1847 msg.value= new_root;
1848 msg.auto_increment= 0;
1849 key_value= key->data;
1850 if (share->base.auto_key == ((uint) keyinfo->key_nr + 1))
1851 {
1852 const HA_KEYSEG *keyseg= keyinfo->seg;
1853 uchar reversed[MARIA_MAX_KEY_BUFF];
1854 if (keyseg->flag & HA_SWAP_KEY)
1855 {
1856 /* We put key from log record to "data record" packing format... */
1857 const uchar *key_ptr= key->data, *key_end= key->data + keyseg->length;
1858 uchar *to= reversed + keyseg->length;
1859 do
1860 {
1861 *--to= *key_ptr++;
1862 } while (key_ptr != key_end);
1863 key_value= to;
1864 }
1865 /* ... so that we can read it with: */
1866 msg.auto_increment=
1867 ma_retrieve_auto_increment(key_value, keyseg->type);
1868 /* and write_hook_for_undo_key_insert() will pick this. */
1869 }
1870
1871 return translog_write_record(res_lsn, LOGREC_UNDO_KEY_INSERT,
1872 info->trn, info,
1873 (translog_size_t)
1874 log_array[TRANSLOG_INTERNAL_PARTS + 0].length +
1875 key_length,
1876 TRANSLOG_INTERNAL_PARTS + 2, log_array,
1877 log_data + LSN_STORE_SIZE, &msg) ? -1 : 0;
1878}
1879
1880
1881/**
1882 @brief Log creation of new page
1883
1884 @note
1885 We don't have to store the page_length into the log entry as we can
1886 calculate this from the length of the log entry
1887
1888 @retval 1 error
1889 @retval 0 ok
1890*/
1891
1892my_bool _ma_log_new(MARIA_PAGE *ma_page, my_bool root_page)
1893{
1894 LSN lsn;
1895 uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE * 2 + KEY_NR_STORE_SIZE
1896 +1];
1897 uint page_length;
1898 LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
1899 MARIA_HA *info= ma_page->info;
1900 MARIA_SHARE *share= info->s;
1901 my_off_t page= ma_page->pos / share->block_size;
1902 DBUG_ENTER("_ma_log_new");
1903 DBUG_PRINT("enter", ("page: %lu", (ulong) page));
1904
1905 DBUG_ASSERT(share->now_transactional);
1906
1907 /* Store address of new root page */
1908 page_store(log_data + FILEID_STORE_SIZE, page);
1909
1910 /* Store link to next unused page */
1911 if (info->key_del_used == 2)
1912 page= 0; /* key_del not changed */
1913 else
1914 page= ((share->key_del_current == HA_OFFSET_ERROR) ? IMPOSSIBLE_PAGE_NO :
1915 share->key_del_current / share->block_size);
1916
1917 page_store(log_data + FILEID_STORE_SIZE + PAGE_STORE_SIZE, page);
1918 key_nr_store(log_data + FILEID_STORE_SIZE + PAGE_STORE_SIZE*2,
1919 ma_page->keyinfo->key_nr);
1920 log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE*2 + KEY_NR_STORE_SIZE]=
1921 (uchar) root_page;
1922
1923 log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
1924 log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
1925
1926 page_length= ma_page->size - LSN_STORE_SIZE;
1927 log_array[TRANSLOG_INTERNAL_PARTS + 1].str= ma_page->buff + LSN_STORE_SIZE;
1928 log_array[TRANSLOG_INTERNAL_PARTS + 1].length= page_length;
1929
1930 /* Remember new page length for future log entires for same page */
1931 ma_page->org_size= ma_page->size;
1932
1933 if (translog_write_record(&lsn, LOGREC_REDO_INDEX_NEW_PAGE,
1934 info->trn, info,
1935 (translog_size_t)
1936 (sizeof(log_data) + page_length),
1937 TRANSLOG_INTERNAL_PARTS + 2, log_array,
1938 log_data, NULL))
1939 DBUG_RETURN(1);
1940 DBUG_RETURN(0);
1941}
1942
1943
1944/**
1945 @brief
1946 Log when some part of the key page changes
1947*/
1948
1949my_bool _ma_log_change(MARIA_PAGE *ma_page, const uchar *key_pos, uint length,
1950 enum en_key_debug debug_marker __attribute__((unused)))
1951{
1952 LSN lsn;
1953 uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 6 + 7], *log_pos;
1954 LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 4];
1955 uint offset= (uint) (key_pos - ma_page->buff), translog_parts;
1956 MARIA_HA *info= ma_page->info;
1957 my_off_t page= ma_page->pos / info->s->block_size;
1958 DBUG_ENTER("_ma_log_change");
1959 DBUG_PRINT("enter", ("page: %lu length: %u", (ulong) page, length));
1960
1961 DBUG_ASSERT(info->s->now_transactional);
1962 DBUG_ASSERT(offset + length <= ma_page->size);
1963 DBUG_ASSERT(ma_page->org_size == ma_page->size);
1964
1965 /* Store address of new root page */
1966 page= ma_page->pos / info->s->block_size;
1967 page_store(log_data + FILEID_STORE_SIZE, page);
1968 log_pos= log_data+ FILEID_STORE_SIZE + PAGE_STORE_SIZE;
1969
1970#ifdef EXTRA_DEBUG_KEY_CHANGES
1971 (*log_pos++)= KEY_OP_DEBUG;
1972 (*log_pos++)= debug_marker;
1973#endif
1974
1975 log_pos[0]= KEY_OP_OFFSET;
1976 int2store(log_pos+1, offset);
1977 log_pos[3]= KEY_OP_CHANGE;
1978 int2store(log_pos+4, length);
1979 log_pos+= 6;
1980
1981 log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
1982 log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (log_pos - log_data);
1983 log_array[TRANSLOG_INTERNAL_PARTS + 1].str= key_pos;
1984 log_array[TRANSLOG_INTERNAL_PARTS + 1].length= length;
1985 translog_parts= 2;
1986
1987 _ma_log_key_changes(ma_page,
1988 log_array + TRANSLOG_INTERNAL_PARTS + translog_parts,
1989 log_pos, &length, &translog_parts);
1990
1991 if (translog_write_record(&lsn, LOGREC_REDO_INDEX,
1992 info->trn, info,
1993 (translog_size_t) (log_pos - log_data) + length,
1994 TRANSLOG_INTERNAL_PARTS + translog_parts,
1995 log_array, log_data, NULL))
1996 DBUG_RETURN(1);
1997 DBUG_RETURN(0);
1998}
1999
2000
2001/**
2002 @brief Write log entry for page splitting
2003
2004 @fn _ma_log_split()
2005 @param
2006 ma_page Page that is changed
2007 org_length Original length of page. Can be bigger than block_size
2008 for block that overflowed
2009 new_length New length of page
2010 key_pos Where key is inserted on page (may be 0 if no key)
2011 key_length Number of bytes changed at key_pos
2012 move_length Number of bytes moved at key_pos to make room for key
2013 prefix_or_suffix KEY_OP_NONE Ignored
2014 KEY_OP_ADD_PREFIX Add data to start of page
2015 KEY_OP_ADD_SUFFIX Add data to end of page
2016 data What data was added
2017 data_length Number of bytes added first or last
2018 changed_length Number of bytes changed first or last.
2019
2020 @note
2021 Write log entry for page that has got a key added to the page under
2022 one and only one of the following senarios:
2023 - Page is shortened from end
2024 - Data is added to end of page
2025 - Data added at front of page
2026*/
2027
2028static my_bool _ma_log_split(MARIA_PAGE *ma_page,
2029 uint org_length, uint new_length,
2030 const uchar *key_pos, uint key_length,
2031 int move_length, enum en_key_op prefix_or_suffix,
2032 const uchar *data, uint data_length,
2033 uint changed_length)
2034{
2035 LSN lsn;
2036 uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 2 + 3+3+3+3+3+2 +7];
2037 uchar *log_pos;
2038 LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 6];
2039 uint offset= (uint) (key_pos - ma_page->buff);
2040 uint translog_parts, extra_length;
2041 MARIA_HA *info= ma_page->info;
2042 my_off_t page= ma_page->pos / info->s->block_size;
2043 DBUG_ENTER("_ma_log_split");
2044 DBUG_PRINT("enter", ("page: %lu org_length: %u new_length: %u",
2045 (ulong) page, org_length, new_length));
2046
2047 DBUG_ASSERT(changed_length >= data_length);
2048 DBUG_ASSERT(org_length <= info->s->max_index_block_size);
2049 DBUG_ASSERT(new_length == ma_page->size);
2050 DBUG_ASSERT(org_length == ma_page->org_size);
2051
2052 log_pos= log_data + FILEID_STORE_SIZE;
2053 page_store(log_pos, page);
2054 log_pos+= PAGE_STORE_SIZE;
2055
2056#ifdef EXTRA_DEBUG_KEY_CHANGES
2057 (*log_pos++)= KEY_OP_DEBUG;
2058 (*log_pos++)= KEY_OP_DEBUG_LOG_SPLIT;
2059#endif
2060
2061 /* Store keypage_flag */
2062 *log_pos++= KEY_OP_SET_PAGEFLAG;
2063 *log_pos++= _ma_get_keypage_flag(info->s, ma_page->buff);
2064
2065 if (new_length <= offset || !key_pos)
2066 {
2067 /*
2068 Page was split before inserted key. Write redo entry where
2069 we just cut current page at page_length
2070 */
2071 uint length_offset= org_length - new_length;
2072 log_pos[0]= KEY_OP_DEL_SUFFIX;
2073 int2store(log_pos+1, length_offset);
2074 log_pos+= 3;
2075 translog_parts= 1;
2076 extra_length= 0;
2077 DBUG_ASSERT(data_length == 0);
2078 }
2079 else
2080 {
2081 /* Key was added to page which was split after the inserted key */
2082 uint max_key_length;
2083
2084 /*
2085 Handle case when split happened directly after the newly inserted key.
2086 */
2087 max_key_length= new_length - offset;
2088 extra_length= MY_MIN(key_length, max_key_length);
2089 if (offset + move_length > new_length)
2090 {
2091 /* This is true when move_length includes changes for next packed key */
2092 move_length= new_length - offset;
2093 }
2094
2095 if ((int) new_length < (int) (org_length + move_length + data_length))
2096 {
2097 /* Shorten page */
2098 uint diff= org_length + move_length + data_length - new_length;
2099 log_pos[0]= KEY_OP_DEL_SUFFIX;
2100 int2store(log_pos + 1, diff);
2101 log_pos+= 3;
2102 DBUG_ASSERT(data_length == 0); /* Page is shortened */
2103 DBUG_ASSERT(offset <= org_length - diff);
2104 }
2105 else
2106 {
2107 DBUG_ASSERT(new_length == org_length + move_length + data_length);
2108 DBUG_ASSERT(offset <= org_length);
2109 }
2110
2111 log_pos[0]= KEY_OP_OFFSET;
2112 int2store(log_pos+1, offset);
2113 log_pos+= 3;
2114
2115 if (move_length)
2116 {
2117 log_pos[0]= KEY_OP_SHIFT;
2118 int2store(log_pos+1, move_length);
2119 log_pos+= 3;
2120 }
2121
2122 log_pos[0]= KEY_OP_CHANGE;
2123 int2store(log_pos+1, extra_length);
2124 log_pos+= 3;
2125
2126 /* Point to original inserted key data */
2127 if (prefix_or_suffix == KEY_OP_ADD_PREFIX)
2128 key_pos+= data_length;
2129
2130 translog_parts= 2;
2131 log_array[TRANSLOG_INTERNAL_PARTS + 1].str= key_pos;
2132 log_array[TRANSLOG_INTERNAL_PARTS + 1].length= extra_length;
2133 }
2134
2135 if (data_length)
2136 {
2137 /* Add prefix or suffix */
2138 log_pos[0]= prefix_or_suffix;
2139 int2store(log_pos+1, data_length);
2140 log_pos+= 3;
2141 if (prefix_or_suffix == KEY_OP_ADD_PREFIX)
2142 {
2143 int2store(log_pos+1, changed_length);
2144 log_pos+= 2;
2145 data_length= changed_length;
2146 }
2147 log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= data;
2148 log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= data_length;
2149 translog_parts++;
2150 extra_length+= data_length;
2151 }
2152
2153 log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
2154 log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
2155 log_data);
2156
2157 _ma_log_key_changes(ma_page,
2158 log_array + TRANSLOG_INTERNAL_PARTS + translog_parts,
2159 log_pos, &extra_length, &translog_parts);
2160 /* Remember new page length for future log entires for same page */
2161 ma_page->org_size= ma_page->size;
2162
2163 DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
2164 info->trn, info,
2165 (translog_size_t)
2166 log_array[TRANSLOG_INTERNAL_PARTS +
2167 0].length + extra_length,
2168 TRANSLOG_INTERNAL_PARTS + translog_parts,
2169 log_array, log_data, NULL));
2170}
2171
2172
2173/**
2174 @brief
2175 Write log entry for page that has got a key added to the page
2176 and page is shortened from start of page
2177
2178 @fn _ma_log_del_prefix()
2179 @param info Maria handler
2180 @param page Page number
2181 @param buff Page buffer
2182 @param org_length Length of buffer when read
2183 @param new_length Final length
2184 @param key_pos Where on page buffer key was added. This is position
2185 before prefix was removed
2186 @param key_length How many bytes was changed at 'key_pos'
2187 @param move_length How many bytes was moved up when key was added
2188
2189 @return
2190 @retval 0 ok
2191 @retval 1 error
2192*/
2193
2194static my_bool _ma_log_del_prefix(MARIA_PAGE *ma_page,
2195 uint org_length, uint new_length,
2196 const uchar *key_pos, uint key_length,
2197 int move_length)
2198{
2199 LSN lsn;
2200 uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 2 + 12 + 7];
2201 uchar *log_pos;
2202 LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 4];
2203 uint offset= (uint) (key_pos - ma_page->buff);
2204 uint diff_length= org_length + move_length - new_length;
2205 uint translog_parts, extra_length;
2206 MARIA_HA *info= ma_page->info;
2207 my_off_t page= ma_page->pos / info->s->block_size;
2208 DBUG_ENTER("_ma_log_del_prefix");
2209 DBUG_PRINT("enter", ("page: %lu org_length: %u new_length: %u",
2210 (ulong) page, org_length, new_length));
2211
2212 DBUG_ASSERT((int) diff_length > 0);
2213 DBUG_ASSERT(ma_page->org_size == org_length);
2214 DBUG_ASSERT(ma_page->size == new_length);
2215
2216 log_pos= log_data + FILEID_STORE_SIZE;
2217 page_store(log_pos, page);
2218 log_pos+= PAGE_STORE_SIZE;
2219
2220 translog_parts= 1;
2221 extra_length= 0;
2222
2223#ifdef EXTRA_DEBUG_KEY_CHANGES
2224 *log_pos++= KEY_OP_DEBUG;
2225 *log_pos++= KEY_OP_DEBUG_LOG_DEL_PREFIX;
2226#endif
2227
2228 /* Store keypage_flag */
2229 *log_pos++= KEY_OP_SET_PAGEFLAG;
2230 *log_pos++= _ma_get_keypage_flag(info->s, ma_page->buff);
2231
2232 if (offset < diff_length + info->s->keypage_header)
2233 {
2234 /*
2235 Key is not anymore on page. Move data down, but take into account that
2236 the original page had grown with 'move_length bytes'
2237 */
2238 DBUG_ASSERT(offset + key_length <= diff_length + info->s->keypage_header);
2239
2240 log_pos[0]= KEY_OP_DEL_PREFIX;
2241 int2store(log_pos+1, diff_length - move_length);
2242 log_pos+= 3;
2243 }
2244 else
2245 {
2246 /*
2247 Correct position to key, as data before key has been delete and key
2248 has thus been moved down
2249 */
2250 offset-= diff_length;
2251 key_pos-= diff_length;
2252
2253 /* Move data down */
2254 log_pos[0]= KEY_OP_DEL_PREFIX;
2255 int2store(log_pos+1, diff_length);
2256 log_pos+= 3;
2257
2258 log_pos[0]= KEY_OP_OFFSET;
2259 int2store(log_pos+1, offset);
2260 log_pos+= 3;
2261
2262 if (move_length)
2263 {
2264 log_pos[0]= KEY_OP_SHIFT;
2265 int2store(log_pos+1, move_length);
2266 log_pos+= 3;
2267 }
2268 log_pos[0]= KEY_OP_CHANGE;
2269 int2store(log_pos+1, key_length);
2270 log_pos+= 3;
2271 log_array[TRANSLOG_INTERNAL_PARTS + 1].str= key_pos;
2272 log_array[TRANSLOG_INTERNAL_PARTS + 1].length= key_length;
2273 translog_parts= 2;
2274 extra_length= key_length;
2275 }
2276 log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
2277 log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
2278 log_data);
2279 _ma_log_key_changes(ma_page,
2280 log_array + TRANSLOG_INTERNAL_PARTS + translog_parts,
2281 log_pos, &extra_length, &translog_parts);
2282 /* Remember new page length for future log entires for same page */
2283 ma_page->org_size= ma_page->size;
2284
2285 DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
2286 info->trn, info,
2287 (translog_size_t)
2288 log_array[TRANSLOG_INTERNAL_PARTS +
2289 0].length + extra_length,
2290 TRANSLOG_INTERNAL_PARTS + translog_parts,
2291 log_array, log_data, NULL));
2292}
2293
2294
2295/**
2296 @brief
2297 Write log entry for page that has got data added first and
2298 data deleted last. Old changed key may be part of page
2299*/
2300
2301static my_bool _ma_log_key_middle(MARIA_PAGE *ma_page,
2302 uint new_length,
2303 uint data_added_first,
2304 uint data_changed_first,
2305 uint data_deleted_last,
2306 const uchar *key_pos,
2307 uint key_length, int move_length)
2308{
2309 LSN lsn;
2310 uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 2 + 3+5+3+3+3 + 7];
2311 uchar *log_pos;
2312 LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 6];
2313 uint key_offset;
2314 uint translog_parts, extra_length;
2315 MARIA_HA *info= ma_page->info;
2316 my_off_t page= ma_page->pos / info->s->block_size;
2317 DBUG_ENTER("_ma_log_key_middle");
2318 DBUG_PRINT("enter", ("page: %lu", (ulong) page));
2319
2320 DBUG_ASSERT(ma_page->size == new_length);
2321
2322 /* new place of key after changes */
2323 key_pos+= data_added_first;
2324 key_offset= (uint) (key_pos - ma_page->buff);
2325 if (key_offset < new_length)
2326 {
2327 /* key is on page; Calculate how much of the key is there */
2328 uint max_key_length= new_length - key_offset;
2329 if (max_key_length < key_length)
2330 {
2331 /* Key is last on page */
2332 key_length= max_key_length;
2333 move_length= 0;
2334 }
2335 /*
2336 Take into account that new data was added as part of original key
2337 that also needs to be removed from page
2338 */
2339 data_deleted_last+= move_length;
2340 }
2341
2342 /* First log changes to page */
2343 log_pos= log_data + FILEID_STORE_SIZE;
2344 page_store(log_pos, page);
2345 log_pos+= PAGE_STORE_SIZE;
2346
2347#ifdef EXTRA_DEBUG_KEY_CHANGES
2348 *log_pos++= KEY_OP_DEBUG;
2349 *log_pos++= KEY_OP_DEBUG_LOG_MIDDLE;
2350#endif
2351
2352 /* Store keypage_flag */
2353 *log_pos++= KEY_OP_SET_PAGEFLAG;
2354 *log_pos++= _ma_get_keypage_flag(info->s, ma_page->buff);
2355
2356 log_pos[0]= KEY_OP_DEL_SUFFIX;
2357 int2store(log_pos+1, data_deleted_last);
2358 log_pos+= 3;
2359
2360 log_pos[0]= KEY_OP_ADD_PREFIX;
2361 int2store(log_pos+1, data_added_first);
2362 int2store(log_pos+3, data_changed_first);
2363 log_pos+= 5;
2364
2365 log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
2366 log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
2367 log_data);
2368 log_array[TRANSLOG_INTERNAL_PARTS + 1].str= (ma_page->buff +
2369 info->s->keypage_header);
2370 log_array[TRANSLOG_INTERNAL_PARTS + 1].length= data_changed_first;
2371 translog_parts= 2;
2372 extra_length= data_changed_first;
2373
2374 /* If changed key is on page, log those changes too */
2375
2376 if (key_offset < new_length)
2377 {
2378 uchar *start_log_pos= log_pos;
2379
2380 log_pos[0]= KEY_OP_OFFSET;
2381 int2store(log_pos+1, key_offset);
2382 log_pos+= 3;
2383 if (move_length)
2384 {
2385 log_pos[0]= KEY_OP_SHIFT;
2386 int2store(log_pos+1, move_length);
2387 log_pos+= 3;
2388 }
2389 log_pos[0]= KEY_OP_CHANGE;
2390 int2store(log_pos+1, key_length);
2391 log_pos+= 3;
2392
2393 log_array[TRANSLOG_INTERNAL_PARTS + 2].str= start_log_pos;
2394 log_array[TRANSLOG_INTERNAL_PARTS + 2].length= (uint) (log_pos -
2395 start_log_pos);
2396
2397 log_array[TRANSLOG_INTERNAL_PARTS + 3].str= key_pos;
2398 log_array[TRANSLOG_INTERNAL_PARTS + 3].length= key_length;
2399 translog_parts+=2;
2400 extra_length+= (uint) (log_array[TRANSLOG_INTERNAL_PARTS + 2].length +
2401 key_length);
2402 }
2403
2404 _ma_log_key_changes(ma_page,
2405 log_array + TRANSLOG_INTERNAL_PARTS + translog_parts,
2406 log_pos, &extra_length, &translog_parts);
2407 /* Remember new page length for future log entires for same page */
2408 ma_page->org_size= ma_page->size;
2409
2410 DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
2411 info->trn, info,
2412 (translog_size_t)
2413 (log_array[TRANSLOG_INTERNAL_PARTS +
2414 0].length + extra_length),
2415 TRANSLOG_INTERNAL_PARTS + translog_parts,
2416 log_array, log_data, NULL));
2417}
2418
2419
2420#ifdef NOT_NEEDED
2421
2422/**
2423 @brief
2424 Write log entry for page that has got data added first and
2425 data deleted last
2426*/
2427
2428static my_bool _ma_log_middle(MARIA_PAGE *ma_page,
2429 uint data_added_first, uint data_changed_first,
2430 uint data_deleted_last)
2431{
2432 LSN lsn;
2433 LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 4];
2434 uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 3 + 5 + 7], *log_pos;
2435 MARIA_HA *info= ma_page->info;
2436 my_off_t page= ma_page->page / info->s->block_size;
2437 uint translog_parts, extra_length;
2438 DBUG_ENTER("_ma_log_middle");
2439 DBUG_PRINT("enter", ("page: %lu", (ulong) page));
2440
2441 DBUG_ASSERT(ma_page->org_size + data_added_first - data_deleted_last ==
2442 ma_page->size);
2443
2444 log_pos= log_data + FILEID_STORE_SIZE;
2445 page_store(log_pos, page);
2446 log_pos+= PAGE_STORE_SIZE;
2447
2448 log_pos[0]= KEY_OP_DEL_PREFIX;
2449 int2store(log_pos+1, data_deleted_last);
2450 log_pos+= 3;
2451
2452 log_pos[0]= KEY_OP_ADD_PREFIX;
2453 int2store(log_pos+1, data_added_first);
2454 int2store(log_pos+3, data_changed_first);
2455 log_pos+= 5;
2456
2457 log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
2458 log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
2459 log_data);
2460
2461 log_array[TRANSLOG_INTERNAL_PARTS + 1].str= ((char*) buff +
2462 info->s->keypage_header);
2463 log_array[TRANSLOG_INTERNAL_PARTS + 1].length= data_changed_first;
2464 translog_parts= 2;
2465 extra_length= data_changed_first;
2466
2467 _ma_log_key_changes(ma_page,
2468 log_array + TRANSLOG_INTERNAL_PARTS + translog_parts,
2469 log_pos, &extra_length, &translog_parts);
2470 /* Remember new page length for future log entires for same page */
2471 ma_page->org_size= ma_page->size;
2472
2473 DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
2474 info->trn, info,
2475 (translog_size_t)
2476 log_array[TRANSLOG_INTERNAL_PARTS +
2477 0].length + extra_length,
2478 TRANSLOG_INTERNAL_PARTS + translog_parts,
2479 log_array, log_data, NULL));
2480}
2481#endif
2482