1/* Copyright (C) 2006 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
2 Copyright (C) 2009-2010 Monty Program Ab
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
16
17#include "ma_fulltext.h"
18#include "ma_rt_index.h"
19#include "trnman.h"
20#include "ma_key_recover.h"
21
/*
  Search for 'key' on 'page' (descending into child pages when needed)
  and remove it from the B-tree.
*/
static int d_search(MARIA_HA *info, MARIA_KEY *key, uint32 comp_flag,
                    MARIA_PAGE *page);
/*
  Remove a key that has a page reference: the last key of the subtree
  under leaf_page is moved to keypos on anc_page.
*/
static int del(MARIA_HA *info, MARIA_KEY *key,
               MARIA_PAGE *anc_page, MARIA_PAGE *leaf_page,
               uchar *keypos, my_off_t next_block, uchar *ret_key_buff);
/* Balance leaf_page with an adjacent page after leaf_page underflowed */
static int underflow(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
                     MARIA_PAGE *anc_page, MARIA_PAGE *leaf_page,
                     uchar *keypos);
/*
  Remove the key at keypos from a page buffer.
  NOTE(review): defined outside the part of the file reviewed here; callers
  treat a return of 0 as an error and otherwise subtract the returned value
  from the page size.
*/
static uint remove_key(MARIA_KEYDEF *keyinfo, uint page_flag, uint nod_flag,
                       uchar *keypos, uchar *lastkey, uchar *page_end,
                       my_off_t *next_block, MARIA_KEY_PARAM *s_temp);
33
34/* @breif Remove a row from a MARIA table */
35
36int maria_delete(MARIA_HA *info,const uchar *record)
37{
38 uint i;
39 uchar *old_key;
40 int save_errno;
41 char lastpos[8];
42 MARIA_SHARE *share= info->s;
43 MARIA_KEYDEF *keyinfo;
44 DBUG_ENTER("maria_delete");
45
46 /* Test if record is in datafile */
47 DBUG_EXECUTE_IF("maria_pretend_crashed_table_on_usage",
48 maria_print_error(share, HA_ERR_CRASHED);
49 DBUG_RETURN(my_errno= HA_ERR_CRASHED););
50 DBUG_EXECUTE_IF("my_error_test_undefined_error",
51 maria_print_error(share, INT_MAX);
52 DBUG_RETURN(my_errno= INT_MAX););
53 if (!(info->update & HA_STATE_AKTIV))
54 {
55 DBUG_RETURN(my_errno=HA_ERR_KEY_NOT_FOUND); /* No database read */
56 }
57 if (share->options & HA_OPTION_READ_ONLY_DATA)
58 {
59 DBUG_RETURN(my_errno=EACCES);
60 }
61 if (_ma_readinfo(info,F_WRLCK,1))
62 DBUG_RETURN(my_errno);
63 if ((*share->compare_record)(info,record))
64 goto err; /* Error on read-check */
65
66 if (_ma_mark_file_changed(share))
67 goto err;
68
69 /* Ensure we don't change the autoincrement value */
70 info->last_auto_increment= ~(ulonglong) 0;
71 /* Remove all keys from the index file */
72
73 old_key= info->lastkey_buff2;
74
75 for (i=0, keyinfo= share->keyinfo ; i < share->base.keys ; i++, keyinfo++)
76 {
77 if (maria_is_key_active(share->state.key_map, i))
78 {
79 keyinfo->version++;
80 if (keyinfo->flag & HA_FULLTEXT)
81 {
82 if (_ma_ft_del(info, i, old_key, record, info->cur_row.lastpos))
83 goto err;
84 }
85 else
86 {
87 MARIA_KEY key;
88 if (keyinfo->ck_delete(info,
89 (*keyinfo->make_key)(info, &key, i, old_key,
90 record,
91 info->cur_row.lastpos,
92 info->cur_row.trid)))
93 goto err;
94 }
95 /* The above changed info->lastkey2. Inform maria_rnext_same(). */
96 info->update&= ~HA_STATE_RNEXT_SAME;
97 }
98 }
99
100 if (share->calc_checksum)
101 {
102 /*
103 We can't use the row based checksum as this doesn't have enough
104 precision.
105 */
106 info->cur_row.checksum= (*share->calc_checksum)(info, record);
107 }
108
109 if ((*share->delete_record)(info, record))
110 goto err; /* Remove record from database */
111
112 info->state->checksum-= info->cur_row.checksum;
113 info->state->records--;
114 info->update= HA_STATE_CHANGED+HA_STATE_DELETED+HA_STATE_ROW_CHANGED;
115 info->row_changes++;
116 share->state.changed|= (STATE_NOT_OPTIMIZED_ROWS | STATE_NOT_MOVABLE |
117 STATE_NOT_ZEROFILLED);
118 info->state->changed=1;
119
120 mi_sizestore(lastpos, info->cur_row.lastpos);
121 _ma_writeinfo(info, WRITEINFO_UPDATE_KEYFILE);
122 if (info->invalidator != 0)
123 {
124 DBUG_PRINT("info", ("invalidator... '%s' (delete)",
125 share->open_file_name.str));
126 (*info->invalidator)(share->open_file_name.str);
127 info->invalidator=0;
128 }
129 DBUG_RETURN(0);
130
131err:
132 save_errno= my_errno;
133 DBUG_ASSERT(save_errno);
134 if (!save_errno)
135 save_errno= HA_ERR_INTERNAL_ERROR; /* Should never happen */
136
137 mi_sizestore(lastpos, info->cur_row.lastpos);
138 (void) _ma_writeinfo(info,WRITEINFO_UPDATE_KEYFILE);
139 info->update|=HA_STATE_WRITTEN; /* Buffer changed */
140 if (save_errno != HA_ERR_RECORD_CHANGED)
141 {
142 _ma_set_fatal_error(share, HA_ERR_CRASHED);
143 save_errno= HA_ERR_CRASHED;
144 }
145 DBUG_RETURN(my_errno= save_errno);
146} /* maria_delete */
147
148
149/*
150 Remove a key from the btree index
151
152 TODO:
153 Change ma_ck_real_delete() to use another buffer for changed keys instead
154 of key->data. This would allows us to remove the copying of the key here.
155*/
156
157my_bool _ma_ck_delete(MARIA_HA *info, MARIA_KEY *key)
158{
159 MARIA_SHARE *share= info->s;
160 int res;
161 LSN lsn= LSN_IMPOSSIBLE;
162 my_off_t new_root= share->state.key_root[key->keyinfo->key_nr];
163 uchar key_buff[MARIA_MAX_KEY_BUFF], *save_key_data;
164 MARIA_KEY org_key;
165 DBUG_ENTER("_ma_ck_delete");
166
167 LINT_INIT_STRUCT(org_key);
168
169 save_key_data= key->data;
170 if (share->now_transactional)
171 {
172 /* Save original value as the key may change */
173 memcpy(key_buff, key->data, key->data_length + key->ref_length);
174 org_key= *key;
175 key->data= key_buff;
176 }
177
178 if ((res= _ma_ck_real_delete(info, key, &new_root)))
179 {
180 /* We have to mark the table crashed before unpin_all_pages() */
181 maria_mark_crashed(info);
182 }
183
184 key->data= save_key_data;
185 if (!res && share->now_transactional)
186 res= _ma_write_undo_key_delete(info, &org_key, new_root, &lsn);
187 else
188 {
189 share->state.key_root[key->keyinfo->key_nr]= new_root;
190 _ma_fast_unlock_key_del(info);
191 }
192 _ma_unpin_all_pages_and_finalize_row(info, lsn);
193 DBUG_RETURN(res != 0);
194} /* _ma_ck_delete */
195
196
197my_bool _ma_ck_real_delete(register MARIA_HA *info, MARIA_KEY *key,
198 my_off_t *root)
199{
200 int error;
201 my_bool result= 0;
202 my_off_t old_root;
203 uchar *root_buff;
204 MARIA_KEYDEF *keyinfo= key->keyinfo;
205 MARIA_PAGE page;
206 DBUG_ENTER("_ma_ck_real_delete");
207
208 if ((old_root=*root) == HA_OFFSET_ERROR)
209 {
210 _ma_set_fatal_error(info->s, HA_ERR_CRASHED);
211 DBUG_RETURN(1);
212 }
213 if (!(root_buff= (uchar*) my_alloca((uint) keyinfo->block_length+
214 MARIA_MAX_KEY_BUFF*2)))
215 {
216 DBUG_PRINT("error",("Couldn't allocate memory"));
217 my_errno=ENOMEM;
218 DBUG_RETURN(1);
219 }
220 DBUG_PRINT("info",("root_page: %lu",
221 (ulong) (old_root / keyinfo->block_length)));
222 if (_ma_fetch_keypage(&page, info, keyinfo, old_root,
223 PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, root_buff, 0))
224 {
225 result= 1;
226 goto err;
227 }
228 if ((error= d_search(info, key, (keyinfo->flag & HA_FULLTEXT ?
229 SEARCH_FIND | SEARCH_UPDATE | SEARCH_INSERT:
230 SEARCH_SAME),
231 &page)))
232 {
233 if (error < 0)
234 result= 1;
235 else if (error == 2)
236 {
237 DBUG_PRINT("test",("Enlarging of root when deleting"));
238 if (_ma_enlarge_root(info, key, root))
239 result= 1;
240 }
241 else /* error == 1 */
242 {
243 MARIA_SHARE *share= info->s;
244
245 page_mark_changed(info, &page);
246
247 if (page.size <= page.node + share->keypage_header + 1)
248 {
249 DBUG_ASSERT(page.size == page.node + share->keypage_header);
250 if (page.node)
251 *root= _ma_kpos(page.node, root_buff +share->keypage_header +
252 page.node);
253 else
254 *root=HA_OFFSET_ERROR;
255 if (_ma_dispose(info, old_root, 0))
256 result= 1;
257 }
258 else if (_ma_write_keypage(&page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
259 DFLT_INIT_HITS))
260 result= 1;
261 }
262 }
263err:
264 my_afree(root_buff);
265 DBUG_PRINT("exit",("Return: %d",result));
266 DBUG_RETURN(result);
267} /* _ma_ck_real_delete */
268
269
270/**
271 @brief Remove key below key root
272
273 @param key Key to delete. Will contain new key if block was enlarged
274
275 @return
276 @retval 0 ok (anc_page is not changed)
277 @retval 1 If data on page is too small; In this case anc_buff is not saved
278 @retval 2 If data on page is too big
279 @retval -1 On errors
280*/
281
282static int d_search(MARIA_HA *info, MARIA_KEY *key, uint32 comp_flag,
283 MARIA_PAGE *anc_page)
284{
285 int flag,ret_value,save_flag;
286 uint nod_flag, page_flag;
287 my_bool last_key;
288 uchar *leaf_buff,*keypos;
289 uchar lastkey[MARIA_MAX_KEY_BUFF];
290 MARIA_KEY_PARAM s_temp;
291 MARIA_SHARE *share= info->s;
292 MARIA_KEYDEF *keyinfo= key->keyinfo;
293 MARIA_PAGE leaf_page;
294 DBUG_ENTER("d_search");
295 DBUG_DUMP("page", anc_page->buff, anc_page->size);
296
297 flag=(*keyinfo->bin_search)(key, anc_page, comp_flag, &keypos, lastkey,
298 &last_key);
299 if (flag == MARIA_FOUND_WRONG_KEY)
300 {
301 DBUG_PRINT("error",("Found wrong key"));
302 DBUG_RETURN(-1);
303 }
304 page_flag= anc_page->flag;
305 nod_flag= anc_page->node;
306
307 if (!flag && (keyinfo->flag & HA_FULLTEXT))
308 {
309 uint off;
310 int subkeys;
311
312 get_key_full_length_rdonly(off, lastkey);
313 subkeys=ft_sintXkorr(lastkey+off);
314 DBUG_ASSERT(info->ft1_to_ft2==0 || subkeys >=0);
315 comp_flag=SEARCH_SAME;
316 if (subkeys >= 0)
317 {
318 /* normal word, one-level tree structure */
319 if (info->ft1_to_ft2)
320 {
321 /* we're in ft1->ft2 conversion mode. Saving key data */
322 insert_dynamic(info->ft1_to_ft2, (lastkey+off));
323 }
324 else
325 {
326 /* we need exact match only if not in ft1->ft2 conversion mode */
327 flag=(*keyinfo->bin_search)(key, anc_page, comp_flag, &keypos,
328 lastkey, &last_key);
329 }
330 /* fall through to normal delete */
331 }
332 else
333 {
334 /* popular word. two-level tree. going down */
335 uint tmp_key_length;
336 my_off_t root;
337 uchar *kpos=keypos;
338 MARIA_KEY tmp_key;
339
340 tmp_key.data= lastkey;
341 tmp_key.keyinfo= keyinfo;
342
343 if (!(tmp_key_length=(*keyinfo->get_key)(&tmp_key, page_flag, nod_flag,
344 &kpos)))
345 {
346 _ma_set_fatal_error(share, HA_ERR_CRASHED);
347 DBUG_RETURN(-1);
348 }
349 root= _ma_row_pos_from_key(&tmp_key);
350 if (subkeys == -1)
351 {
352 /* the last entry in sub-tree */
353 if (_ma_dispose(info, root, 1))
354 DBUG_RETURN(-1);
355 /* fall through to normal delete */
356 }
357 else
358 {
359 MARIA_KEY word_key;
360 keyinfo=&share->ft2_keyinfo;
361 /* we'll modify key entry 'in vivo' */
362 kpos-=keyinfo->keylength+nod_flag;
363 get_key_full_length_rdonly(off, key->data);
364
365 word_key.data= key->data + off;
366 word_key.keyinfo= &share->ft2_keyinfo;
367 word_key.data_length= HA_FT_WLEN;
368 word_key.ref_length= 0;
369 word_key.flag= 0;
370 ret_value= _ma_ck_real_delete(info, &word_key, &root);
371 _ma_dpointer(share, kpos+HA_FT_WLEN, root);
372 subkeys++;
373 ft_intXstore(kpos, subkeys);
374 if (!ret_value)
375 {
376 page_mark_changed(info, anc_page);
377 ret_value= _ma_write_keypage(anc_page,
378 PAGECACHE_LOCK_LEFT_WRITELOCKED,
379 DFLT_INIT_HITS);
380 }
381 DBUG_PRINT("exit",("Return: %d",ret_value));
382 DBUG_RETURN(ret_value);
383 }
384 }
385 }
386 leaf_buff=0;
387 if (nod_flag)
388 {
389 /* Read left child page */
390 leaf_page.pos= _ma_kpos(nod_flag,keypos);
391 if (!(leaf_buff= (uchar*) my_alloca((uint) keyinfo->block_length+
392 MARIA_MAX_KEY_BUFF*2)))
393 {
394 DBUG_PRINT("error", ("Couldn't allocate memory"));
395 my_errno=ENOMEM;
396 DBUG_RETURN(-1);
397 }
398 if (_ma_fetch_keypage(&leaf_page, info,keyinfo, leaf_page.pos,
399 PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, leaf_buff,
400 0))
401 goto err;
402 }
403
404 if (flag != 0)
405 {
406 if (!nod_flag)
407 {
408 /* This should newer happend */
409 DBUG_PRINT("error",("Didn't find key"));
410 _ma_set_fatal_error(share, HA_ERR_CRASHED);
411 goto err;
412 }
413 save_flag=0;
414 ret_value= d_search(info, key, comp_flag, &leaf_page);
415 }
416 else
417 { /* Found key */
418 uint tmp;
419 uint anc_buff_length= anc_page->size;
420 uint anc_page_flag= anc_page->flag;
421 my_off_t next_block;
422
423 if (!(tmp= remove_key(keyinfo, anc_page_flag, nod_flag, keypos, lastkey,
424 anc_page->buff + anc_buff_length,
425 &next_block, &s_temp)))
426 goto err;
427
428 page_mark_changed(info, anc_page);
429 anc_buff_length-= tmp;
430 anc_page->size= anc_buff_length;
431 page_store_size(share, anc_page);
432
433 /*
434 Log initial changes on pages
435 If there is an underflow, there will be more changes logged to the
436 page
437 */
438 if (share->now_transactional &&
439 _ma_log_delete(anc_page, s_temp.key_pos,
440 s_temp.changed_length, s_temp.move_length,
441 0, KEY_OP_DEBUG_LOG_DEL_CHANGE_1))
442 DBUG_RETURN(-1);
443
444 if (!nod_flag)
445 { /* On leaf page */
446 if (anc_buff_length <= (info->quick_mode ?
447 MARIA_MIN_KEYBLOCK_LENGTH :
448 (uint) keyinfo->underflow_block_length))
449 {
450 /* Page will be written by caller if we return 1 */
451 DBUG_RETURN(1);
452 }
453 if (_ma_write_keypage(anc_page,
454 PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
455 DBUG_RETURN(-1);
456 DBUG_RETURN(0);
457 }
458 save_flag=1; /* Mark that anc_buff is changed */
459 ret_value= del(info, key, anc_page, &leaf_page,
460 keypos, next_block, lastkey);
461 }
462 if (ret_value >0)
463 {
464 save_flag= 2;
465 if (ret_value == 1)
466 ret_value= underflow(info, keyinfo, anc_page, &leaf_page, keypos);
467 else
468 {
469 /* This can only happen with variable length keys */
470 MARIA_KEY last_key;
471 DBUG_PRINT("test",("Enlarging of key when deleting"));
472
473 last_key.data= lastkey;
474 last_key.keyinfo= keyinfo;
475 if (!_ma_get_last_key(&last_key, anc_page, keypos))
476 goto err;
477 ret_value= _ma_insert(info, key, anc_page, keypos,
478 last_key.data,
479 (MARIA_PAGE*) 0, (uchar*) 0, (my_bool) 0);
480
481 if (_ma_write_keypage(&leaf_page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
482 DFLT_INIT_HITS))
483 ret_value= -1;
484 }
485 }
486 if (ret_value == 0 && anc_page->size > share->max_index_block_size)
487 {
488 /*
489 parent buffer got too big ; We have to split the page.
490 The | 2 is there to force write of anc page below
491 */
492 save_flag= 3;
493 ret_value= _ma_split_page(info, key, anc_page,
494 share->max_index_block_size,
495 (uchar*) 0, 0, 0, lastkey, 0) | 2;
496 DBUG_ASSERT(anc_page->org_size == anc_page->size);
497 }
498 if (save_flag && ret_value != 1)
499 {
500 page_mark_changed(info, anc_page);
501 if (_ma_write_keypage(anc_page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
502 DFLT_INIT_HITS))
503 ret_value= -1;
504 }
505 else
506 {
507 DBUG_DUMP("page", anc_page->buff, anc_page->size);
508 }
509 my_afree(leaf_buff);
510 DBUG_PRINT("exit",("Return: %d",ret_value));
511 DBUG_RETURN(ret_value);
512
513err:
514 my_afree(leaf_buff);
515 DBUG_PRINT("exit",("Error: %d",my_errno));
516 DBUG_RETURN (-1);
517} /* d_search */
518
519
520/**
521 @brief Remove a key that has a page-reference
522
523 @param info Maria handler
524 @param key Buffer for key to be inserted at upper level
525 @param anc_page Page address for page where deleted key was
526 @param anc_buff Page buffer (nod) where deleted key was
527 @param leaf_page Page address for nod before the deleted key
528 @param leaf_buff Buffer for leaf_page
529 @param leaf_buff_link Pinned page link for leaf_buff
530 @param keypos Pos to where deleted key was on anc_buff
531 @param next_block Page adress for nod after deleted key
532 @param ret_key_buff Key before keypos in anc_buff
533
534 @notes
535 leaf_page must be written to disk if retval > 0
536 anc_page is not updated on disk. Caller should do this
537
538 @return
539 @retval < 0 Error
540 @retval 0 OK. leaf_buff is written to disk
541
542 @retval 1 key contains key to upper level (from balance page)
543 leaf_buff has underflow
544 @retval 2 key contains key to upper level (from split space)
545*/
546
547static int del(MARIA_HA *info, MARIA_KEY *key,
548 MARIA_PAGE *anc_page, MARIA_PAGE *leaf_page,
549 uchar *keypos, my_off_t next_block, uchar *ret_key_buff)
550{
551 int ret_value,length;
552 uint a_length, page_flag, nod_flag, leaf_length, new_leaf_length;
553 uchar keybuff[MARIA_MAX_KEY_BUFF],*endpos,*next_buff,*key_start, *prev_key;
554 uchar *anc_buff;
555 MARIA_KEY_PARAM s_temp;
556 MARIA_KEY tmp_key;
557 MARIA_SHARE *share= info->s;
558 MARIA_KEYDEF *keyinfo= key->keyinfo;
559 MARIA_KEY ret_key;
560 MARIA_PAGE next_page;
561 DBUG_ENTER("del");
562 DBUG_PRINT("enter",("leaf_page: %lu keypos: %p",
563 (ulong) (leaf_page->pos / share->block_size),
564 keypos));
565 DBUG_DUMP("leaf_buff", leaf_page->buff, leaf_page->size);
566
567 page_flag= leaf_page->flag;
568 leaf_length= leaf_page->size;
569 nod_flag= leaf_page->node;
570
571 endpos= leaf_page->buff + leaf_length;
572 tmp_key.keyinfo= keyinfo;
573 tmp_key.data= keybuff;
574 next_buff= 0;
575
576 if (!(key_start= _ma_get_last_key(&tmp_key, leaf_page, endpos)))
577 DBUG_RETURN(-1);
578
579 if (nod_flag)
580 {
581 next_page.pos= _ma_kpos(nod_flag,endpos);
582 if (!(next_buff= (uchar*) my_alloca((uint) keyinfo->block_length+
583 MARIA_MAX_KEY_BUFF*2)))
584 DBUG_RETURN(-1);
585 if (_ma_fetch_keypage(&next_page, info, keyinfo, next_page.pos,
586 PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, next_buff, 0))
587 ret_value= -1;
588 else
589 {
590 DBUG_DUMP("next_page", next_page.buff, next_page.size);
591 if ((ret_value= del(info, key, anc_page, &next_page,
592 keypos, next_block, ret_key_buff)) >0)
593 {
594 /* Get new length after key was deleted */
595 endpos= leaf_page->buff+ leaf_page->size;
596 if (ret_value == 1)
597 {
598 /* underflow writes "next_page" to disk */
599 ret_value= underflow(info, keyinfo, leaf_page, &next_page,
600 endpos);
601 if (ret_value < 0)
602 goto err;
603 if (leaf_page->size > share->max_index_block_size)
604 {
605 DBUG_ASSERT(ret_value == 0);
606 ret_value= (_ma_split_page(info, key, leaf_page,
607 share->max_index_block_size,
608 (uchar*) 0, 0, 0,
609 ret_key_buff, 0) | 2);
610 }
611 }
612 else
613 {
614 if (_ma_write_keypage(&next_page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
615 DFLT_INIT_HITS))
616 goto err;
617 DBUG_PRINT("test",("Inserting of key when deleting"));
618 if (!_ma_get_last_key(&tmp_key, leaf_page, endpos))
619 goto err;
620 ret_value= _ma_insert(info, key, leaf_page, endpos,
621 tmp_key.data, (MARIA_PAGE *) 0, (uchar*) 0,
622 0);
623 }
624 }
625 page_mark_changed(info, leaf_page);
626 /*
627 If ret_value <> 0, then leaf_page underflowed and caller will have
628 to handle underflow and write leaf_page to disk.
629 We can't write it here, as if leaf_page is empty we get an assert
630 in _ma_write_keypage.
631 */
632 if (ret_value == 0 && _ma_write_keypage(leaf_page,
633 PAGECACHE_LOCK_LEFT_WRITELOCKED,
634 DFLT_INIT_HITS))
635 goto err;
636 }
637 my_afree(next_buff);
638 DBUG_ASSERT(leaf_page->size <= share->max_index_block_size);
639 DBUG_RETURN(ret_value);
640 }
641
642 /*
643 Remove last key from leaf page
644 Note that leaf_page page may only have had one key (can normally only
645 happen in quick mode), in which ase it will now temporary have 0 keys
646 on it. This will be corrected by the caller as we will return 0.
647 */
648 new_leaf_length= (uint) (key_start - leaf_page->buff);
649 leaf_page->size= new_leaf_length;
650 page_store_size(share, leaf_page);
651
652 if (share->now_transactional &&
653 _ma_log_suffix(leaf_page, leaf_length, new_leaf_length))
654 goto err;
655
656 page_mark_changed(info, leaf_page); /* Safety */
657 if (new_leaf_length <= (info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
658 (uint) keyinfo->underflow_block_length))
659 {
660 /* Underflow, leaf_page will be written by caller */
661 ret_value= 1;
662 }
663 else
664 {
665 ret_value= 0;
666 if (_ma_write_keypage(leaf_page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
667 DFLT_INIT_HITS))
668 goto err;
669 }
670
671 /* Place last key in ancestor page on deleted key position */
672 a_length= anc_page->size;
673 anc_buff= anc_page->buff;
674 endpos= anc_buff + a_length;
675
676 ret_key.keyinfo= keyinfo;
677 ret_key.data= ret_key_buff;
678
679 prev_key= 0;
680 if (keypos != anc_buff+share->keypage_header + share->base.key_reflength)
681 {
682 if (!_ma_get_last_key(&ret_key, anc_page, keypos))
683 goto err;
684 prev_key= ret_key.data;
685 }
686 length= (*keyinfo->pack_key)(&tmp_key, share->base.key_reflength,
687 keypos == endpos ? (uchar*) 0 : keypos,
688 prev_key, prev_key,
689 &s_temp);
690 if (length > 0)
691 bmove_upp(endpos+length,endpos,(uint) (endpos-keypos));
692 else
693 bmove(keypos,keypos-length, (int) (endpos-keypos)+length);
694 (*keyinfo->store_key)(keyinfo,keypos,&s_temp);
695 key_start= keypos;
696 if (tmp_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
697 SEARCH_PAGE_KEY_HAS_TRANSID))
698 {
699 _ma_mark_page_with_transid(share, anc_page);
700 }
701
702 /* Save pointer to next leaf on parent page */
703 if (!(*keyinfo->get_key)(&ret_key, page_flag, share->base.key_reflength,
704 &keypos))
705 goto err;
706 _ma_kpointer(info,keypos - share->base.key_reflength,next_block);
707 anc_page->size= a_length + length;
708 page_store_size(share, anc_page);
709
710 if (share->now_transactional &&
711 _ma_log_add(anc_page, a_length,
712 key_start, s_temp.changed_length, s_temp.move_length, 1,
713 KEY_OP_DEBUG_LOG_ADD_2))
714 goto err;
715
716 DBUG_ASSERT(leaf_page->size <= share->max_index_block_size);
717 DBUG_RETURN(new_leaf_length <=
718 (info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
719 (uint) keyinfo->underflow_block_length));
720err:
721 if (next_buff)
722 my_afree(next_buff);
723
724 DBUG_RETURN(-1);
725} /* del */
726
727
728/**
  @brief Balances adjacent pages if underflow occurs
730
731 @fn underflow()
  @param anc_buff        Ancestor page data
733 @param leaf_page Leaf page (page that underflowed)
734 @param leaf_page_link Pointer to pin information about leaf page
735 @param keypos Position after current key in anc_buff
736
737 @note
738 This function writes redo entries for all changes
739 leaf_page is saved to disk
740 Caller must save anc_buff
741
    For the algorithm to work, we have to ensure for packed keys that
743 key_length + (underflow_length + max_block_length + key_length) / 2
744 <= block_length.
745 From which follows that underflow_length <= block_length - key_length *3
746 For not packed keys we have:
747 (underflow_length + max_block_length + key_length) / 2 <= block_length
748 From which follows that underflow_length < block_length - key_length
749 This is ensured by setting of underflow_block_length.
750
751 @return
752 @retval 0 ok
753 @retval 1 ok, but anc_page did underflow
754 @retval -1 error
755 */
756
757static int underflow(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
758 MARIA_PAGE *anc_page, MARIA_PAGE *leaf_page,
759 uchar *keypos)
760{
761 int t_length;
762 uint anc_length,buff_length,leaf_length,p_length,s_length,nod_flag;
763 uint next_buff_length, new_buff_length, key_reflength;
764 uint unchanged_leaf_length, new_leaf_length, new_anc_length;
765 uint anc_page_flag, page_flag;
766 uchar anc_key_buff[MARIA_MAX_KEY_BUFF], leaf_key_buff[MARIA_MAX_KEY_BUFF];
767 uchar *endpos, *next_keypos, *anc_pos, *half_pos, *prev_key;
768 uchar *anc_buff, *leaf_buff;
769 uchar *after_key, *anc_end_pos;
770 MARIA_KEY_PARAM key_deleted, key_inserted;
771 MARIA_SHARE *share= info->s;
772 my_bool first_key;
773 MARIA_KEY tmp_key, anc_key, leaf_key;
774 MARIA_PAGE next_page;
775 DBUG_ENTER("underflow");
776 DBUG_PRINT("enter",("leaf_page: %lu keypos: %p",
777 (ulong) (leaf_page->pos / share->block_size),
778 keypos));
779 DBUG_DUMP("anc_buff", anc_page->buff, anc_page->size);
780 DBUG_DUMP("leaf_buff", leaf_page->buff, leaf_page->size);
781
782 anc_page_flag= anc_page->flag;
783 anc_buff= anc_page->buff;
784 leaf_buff= leaf_page->buff;
785 info->keyread_buff_used=1;
786 next_keypos=keypos;
787 nod_flag= leaf_page->node;
788 p_length= nod_flag+share->keypage_header;
789 anc_length= anc_page->size;
790 leaf_length= leaf_page->size;
791 key_reflength= share->base.key_reflength;
792 if (share->keyinfo+info->lastinx == keyinfo)
793 info->page_changed=1;
794 first_key= keypos == anc_buff + share->keypage_header + key_reflength;
795
796 tmp_key.data= info->buff;
797 anc_key.data= anc_key_buff;
798 leaf_key.data= leaf_key_buff;
799 tmp_key.keyinfo= leaf_key.keyinfo= anc_key.keyinfo= keyinfo;
800
801 if ((keypos < anc_buff + anc_length && (info->state->records & 1)) ||
802 first_key)
803 {
804 uint tmp_length;
805 uint next_page_flag;
806 /* Use page right of anc-page */
807 DBUG_PRINT("test",("use right page"));
808
809 /*
810 Calculate position after the current key. Note that keydata itself is
811 not used
812 */
813 if (keyinfo->flag & HA_BINARY_PACK_KEY)
814 {
815 if (!(next_keypos= _ma_get_key(&tmp_key, anc_page, keypos)))
816 goto err;
817 }
818 else
819 {
820 /* Avoid length error check if packed key */
821 tmp_key.data[0]= tmp_key.data[1]= 0;
822 /* Got to end of found key */
823 if (!(*keyinfo->get_key)(&tmp_key, anc_page_flag, key_reflength,
824 &next_keypos))
825 goto err;
826 }
827 next_page.pos= _ma_kpos(key_reflength, next_keypos);
828 if (_ma_fetch_keypage(&next_page, info, keyinfo, next_page.pos,
829 PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, info->buff, 0))
830 goto err;
831 next_buff_length= next_page.size;
832 next_page_flag= next_page.flag;
833 DBUG_DUMP("next", next_page.buff, next_page.size);
834
835 /* find keys to make a big key-page */
836 bmove(next_keypos-key_reflength, next_page.buff + share->keypage_header,
837 key_reflength);
838
839 if (!_ma_get_last_key(&anc_key, anc_page, next_keypos) ||
840 !_ma_get_last_key(&leaf_key, leaf_page, leaf_buff+leaf_length))
841 goto err;
842
843 /* merge pages and put parting key from anc_page between */
844 prev_key= (leaf_length == p_length ? (uchar*) 0 : leaf_key.data);
845 t_length= (*keyinfo->pack_key)(&anc_key, nod_flag, next_page.buff+p_length,
846 prev_key, prev_key, &key_inserted);
847 tmp_length= next_buff_length - p_length;
848 endpos= next_page.buff + tmp_length + leaf_length + t_length;
849 /* next_page.buff will always be larger than before !*/
850 bmove_upp(endpos, next_page.buff + next_buff_length, tmp_length);
851 memcpy(next_page.buff, leaf_buff,(size_t) leaf_length);
852 (*keyinfo->store_key)(keyinfo, next_page.buff+leaf_length, &key_inserted);
853 buff_length= (uint) (endpos - next_page.buff);
854
855 /* Set page flag from combination of both key pages and parting key */
856 page_flag= next_page_flag | leaf_page->flag;
857 if (anc_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
858 SEARCH_PAGE_KEY_HAS_TRANSID))
859 page_flag|= KEYPAGE_FLAG_HAS_TRANSID;
860
861 next_page.size= buff_length;
862 next_page.flag= page_flag;
863 page_store_info(share, &next_page);
864
865 /* remove key from anc_page */
866 if (!(s_length=remove_key(keyinfo, anc_page_flag, key_reflength, keypos,
867 anc_key_buff, anc_buff+anc_length,
868 (my_off_t *) 0, &key_deleted)))
869 goto err;
870
871 new_anc_length= anc_length - s_length;
872 anc_page->size= new_anc_length;
873 page_store_size(share, anc_page);
874
875 if (buff_length <= share->max_index_block_size)
876 {
877 /* All keys fitted into one page */
878 page_mark_changed(info, &next_page);
879 if (_ma_dispose(info, next_page.pos, 0))
880 goto err;
881
882 memcpy(leaf_buff, next_page.buff, (size_t) buff_length);
883 leaf_page->size= next_page.size;
884 leaf_page->flag= next_page.flag;
885
886 if (share->now_transactional)
887 {
888 /*
889 Log changes to parent page. Note that this page may have been
890 temporarily bigger than block_size.
891 */
892 if (_ma_log_delete(anc_page, key_deleted.key_pos,
893 key_deleted.changed_length,
894 key_deleted.move_length,
895 anc_length - anc_page->org_size,
896 KEY_OP_DEBUG_LOG_DEL_CHANGE_2))
897 goto err;
898 /*
899 Log changes to leaf page. Data for leaf page is in leaf_buff
900 which contains original leaf_buff, parting key and next_buff
901 */
902 if (_ma_log_suffix(leaf_page, leaf_length, buff_length))
903 goto err;
904 }
905 }
906 else
907 {
908 /*
909 Balancing didn't free a page, so we have to split 'buff' into two
910 pages:
911 - Find key in middle of buffer
912 - Store everything before key in 'leaf_page'
913 - Pack key into anc_page at position of deleted key
914 Note that anc_page may overflow! (is handled by caller)
915 - Store remaining keys in next_page (buff)
916 */
917 MARIA_KEY_PARAM anc_key_inserted;
918
919 anc_end_pos= anc_buff + new_anc_length;
920
921 DBUG_PRINT("test",("anc_buff:%p anc_end_pos:%p",
922 anc_buff, anc_end_pos));
923
924 if (!first_key && !_ma_get_last_key(&anc_key, anc_page, keypos))
925 goto err;
926 if (!(half_pos= _ma_find_half_pos(&leaf_key, &next_page, &after_key)))
927 goto err;
928 new_leaf_length= (uint) (half_pos - next_page.buff);
929 memcpy(leaf_buff, next_page.buff, (size_t) new_leaf_length);
930
931 leaf_page->size= new_leaf_length;
932 leaf_page->flag= page_flag;
933 page_store_info(share, leaf_page);
934
935 /* Correct new keypointer to leaf_page */
936 half_pos=after_key;
937 _ma_kpointer(info,
938 leaf_key.data + leaf_key.data_length + leaf_key.ref_length,
939 next_page.pos);
940
941 /* Save key in anc_page */
942 prev_key= (first_key ? (uchar*) 0 : anc_key.data);
943 t_length= (*keyinfo->pack_key)(&leaf_key, key_reflength,
944 (keypos == anc_end_pos ? (uchar*) 0 :
945 keypos),
946 prev_key, prev_key, &anc_key_inserted);
947 if (t_length >= 0)
948 bmove_upp(anc_end_pos+t_length, anc_end_pos,
949 (uint) (anc_end_pos - keypos));
950 else
951 bmove(keypos,keypos-t_length,(uint) (anc_end_pos-keypos)+t_length);
952 (*keyinfo->store_key)(keyinfo,keypos, &anc_key_inserted);
953 new_anc_length+= t_length;
954 anc_page->size= new_anc_length;
955 page_store_size(share, anc_page);
956
957 if (leaf_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
958 SEARCH_PAGE_KEY_HAS_TRANSID))
959 _ma_mark_page_with_transid(share, anc_page);
960
961 /* Store key first in new page */
962 if (nod_flag)
963 bmove(next_page.buff + share->keypage_header, half_pos-nod_flag,
964 (size_t) nod_flag);
965 if (!(*keyinfo->get_key)(&leaf_key, page_flag, nod_flag, &half_pos))
966 goto err;
967 t_length=(int) (*keyinfo->pack_key)(&leaf_key, nod_flag, (uchar*) 0,
968 (uchar*) 0, (uchar*) 0,
969 &key_inserted);
970 /* t_length will always be > 0 for a new page !*/
971 tmp_length= (uint) ((next_page.buff + buff_length) - half_pos);
972 bmove(next_page.buff + p_length + t_length, half_pos, tmp_length);
973 (*keyinfo->store_key)(keyinfo, next_page.buff + p_length, &key_inserted);
974 new_buff_length= tmp_length + t_length + p_length;
975 next_page.size= new_buff_length;
976 page_store_size(share, &next_page);
977 /* keypage flag is already up to date */
978
979 if (share->now_transactional)
980 {
981 /*
982 Log changes to parent page
983 This has one key deleted from it and one key inserted to it at
984 keypos
985
986 ma_log_add ensures that we don't log changes that is outside of
987 key block size, as the REDO code can't handle that
988 */
989 if (_ma_log_add(anc_page, anc_length, keypos,
990 anc_key_inserted.move_length +
991 MY_MAX(anc_key_inserted.changed_length -
992 anc_key_inserted.move_length,
993 key_deleted.changed_length),
994 anc_key_inserted.move_length -
995 key_deleted.move_length, 1,
996 KEY_OP_DEBUG_LOG_ADD_3))
997 goto err;
998
999 /*
1000 Log changes to leaf page.
1001 This contains original data with new data added at end
1002 */
1003 DBUG_ASSERT(leaf_length <= new_leaf_length);
1004 if (_ma_log_suffix(leaf_page, leaf_length, new_leaf_length))
1005 goto err;
1006 /*
1007 Log changes to next page
1008
1009 This contains original data with some prefix data deleted and
1010 some compressed data at start possible extended
1011
1012 Data in buff was originally:
1013 org_leaf_buff [leaf_length]
1014 separator_key [buff_key_inserted.move_length]
1015 next_key_changes [buff_key_inserted.changed_length -move_length]
1016 next_page_data [next_buff_length - p_length -
1017 (buff_key_inserted.changed_length -move_length)]
1018
1019 After changes it's now:
1020 unpacked_key [key_inserted.changed_length]
1021 next_suffix [next_buff_length - key_inserted.changed_length]
1022
1023 */
1024 DBUG_ASSERT(new_buff_length <= next_buff_length);
1025 if (_ma_log_prefix(&next_page, key_inserted.changed_length,
1026 (int) (new_buff_length - next_buff_length),
1027 KEY_OP_DEBUG_LOG_PREFIX_1))
1028 goto err;
1029 }
1030 page_mark_changed(info, &next_page);
1031 if (_ma_write_keypage(&next_page,
1032 PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
1033 goto err;
1034 }
1035
1036 page_mark_changed(info, leaf_page);
1037 if (_ma_write_keypage(leaf_page,
1038 PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
1039 goto err;
1040 DBUG_RETURN(new_anc_length <=
1041 ((info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
1042 (uint) keyinfo->underflow_block_length)));
1043 }
1044
1045 DBUG_PRINT("test",("use left page"));
1046
1047 keypos= _ma_get_last_key(&anc_key, anc_page, keypos);
1048 if (!keypos)
1049 goto err;
1050 next_page.pos= _ma_kpos(key_reflength,keypos);
1051 if (_ma_fetch_keypage(&next_page, info, keyinfo, next_page.pos,
1052 PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, info->buff, 0))
1053 goto err;
1054 buff_length= next_page.size;
1055 endpos= next_page.buff + buff_length;
1056 DBUG_DUMP("prev", next_page.buff, next_page.size);
1057
1058 /* find keys to make a big key-page */
1059 bmove(next_keypos - key_reflength, leaf_buff + share->keypage_header,
1060 key_reflength);
1061 next_keypos=keypos;
1062 if (!(*keyinfo->get_key)(&anc_key, anc_page_flag, key_reflength,
1063 &next_keypos))
1064 goto err;
1065 if (!_ma_get_last_key(&leaf_key, &next_page, endpos))
1066 goto err;
1067
1068 /* merge pages and put parting key from anc_page between */
1069 prev_key= (leaf_length == p_length ? (uchar*) 0 : leaf_key.data);
1070 t_length=(*keyinfo->pack_key)(&anc_key, nod_flag,
1071 (leaf_length == p_length ?
1072 (uchar*) 0 : leaf_buff+p_length),
1073 prev_key, prev_key,
1074 &key_inserted);
1075 if (t_length >= 0)
1076 bmove(endpos+t_length, leaf_buff+p_length,
1077 (size_t) (leaf_length-p_length));
1078 else /* We gained space */
1079 bmove(endpos,leaf_buff+((int) p_length-t_length),
1080 (size_t) (leaf_length-p_length+t_length));
1081 (*keyinfo->store_key)(keyinfo,endpos, &key_inserted);
1082
1083 /* Remember for logging how many bytes of leaf_buff that are not changed */
1084 DBUG_ASSERT((int) key_inserted.changed_length >= key_inserted.move_length);
1085 unchanged_leaf_length= (leaf_length - p_length -
1086 (key_inserted.changed_length -
1087 key_inserted.move_length));
1088
1089 new_buff_length= buff_length + leaf_length - p_length + t_length;
1090
1091#ifdef EXTRA_DEBUG
1092 /* Ensure that unchanged_leaf_length is correct */
1093 DBUG_ASSERT(bcmp(next_page.buff + new_buff_length - unchanged_leaf_length,
1094 leaf_buff + leaf_length - unchanged_leaf_length,
1095 unchanged_leaf_length) == 0);
1096#endif
1097
1098 page_flag= next_page.flag | leaf_page->flag;
1099 if (anc_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
1100 SEARCH_PAGE_KEY_HAS_TRANSID))
1101 page_flag|= KEYPAGE_FLAG_HAS_TRANSID;
1102
1103 next_page.size= new_buff_length;
1104 next_page.flag= page_flag;
1105 page_store_info(share, &next_page);
1106
1107 /* remove key from anc_page */
1108 if (!(s_length= remove_key(keyinfo, anc_page_flag, key_reflength, keypos,
1109 anc_key_buff,
1110 anc_buff+anc_length, (my_off_t *) 0,
1111 &key_deleted)))
1112 goto err;
1113
1114 new_anc_length= anc_length - s_length;
1115 anc_page->size= new_anc_length;
1116 page_store_size(share, anc_page);
1117
1118 if (new_buff_length <= share->max_index_block_size)
1119 {
1120 /* All keys fitted into one page */
1121 page_mark_changed(info, leaf_page);
1122 if (_ma_dispose(info, leaf_page->pos, 0))
1123 goto err;
1124
1125 if (share->now_transactional)
1126 {
1127 /*
1128 Log changes to parent page. Note that this page may have been
1129 temporarily bigger than block_size.
1130 */
1131 if (_ma_log_delete(anc_page, key_deleted.key_pos,
1132 key_deleted.changed_length, key_deleted.move_length,
1133 anc_length - anc_page->org_size,
1134 KEY_OP_DEBUG_LOG_DEL_CHANGE_3))
1135 goto err;
1136 /*
1137 Log changes to next page. Data for leaf page is in buff
1138 that contains original leaf_buff, parting key and next_buff
1139 */
1140 if (_ma_log_suffix(&next_page, buff_length, new_buff_length))
1141 goto err;
1142 }
1143 }
1144 else
1145 {
1146 /*
1147 Balancing didn't free a page, so we have to split 'next_page' into two
1148 pages
1149 - Find key in middle of buffer (buff)
1150 - Pack key at half_buff into anc_page at position of deleted key
1151 Note that anc_page may overflow! (is handled by caller)
1152 - Move everything after middlekey to 'leaf_buff'
1153 - Shorten buff at 'endpos'
1154 */
1155 MARIA_KEY_PARAM anc_key_inserted;
1156 size_t tmp_length;
1157
1158 if (keypos == anc_buff + share->keypage_header + key_reflength)
1159 anc_pos= 0; /* First key */
1160 else
1161 {
1162 if (!_ma_get_last_key(&anc_key, anc_page, keypos))
1163 goto err;
1164 anc_pos= anc_key.data;
1165 }
1166 if (!(endpos= _ma_find_half_pos(&leaf_key, &next_page, &half_pos)))
1167 goto err;
1168
1169 /* Correct new keypointer to leaf_page */
1170 _ma_kpointer(info,leaf_key.data + leaf_key.data_length +
1171 leaf_key.ref_length, leaf_page->pos);
1172
1173 /* Save parting key found by _ma_find_half_pos() in anc_page */
1174 DBUG_DUMP("anc_buff", anc_buff, new_anc_length);
1175 DBUG_DUMP_KEY("key_to_anc", &leaf_key);
1176 anc_end_pos= anc_buff + new_anc_length;
1177 t_length=(*keyinfo->pack_key)(&leaf_key, key_reflength,
1178 keypos == anc_end_pos ? (uchar*) 0
1179 : keypos,
1180 anc_pos, anc_pos,
1181 &anc_key_inserted);
1182 if (t_length >= 0)
1183 bmove_upp(anc_end_pos+t_length, anc_end_pos,
1184 (uint) (anc_end_pos-keypos));
1185 else
1186 bmove(keypos,keypos-t_length,(uint) (anc_end_pos-keypos)+t_length);
1187 (*keyinfo->store_key)(keyinfo,keypos, &anc_key_inserted);
1188 new_anc_length+= t_length;
1189 anc_page->size= new_anc_length;
1190 page_store_size(share, anc_page);
1191
1192 if (leaf_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
1193 SEARCH_PAGE_KEY_HAS_TRANSID))
1194 _ma_mark_page_with_transid(share, anc_page);
1195
1196 /* Store first key on new page */
1197 if (nod_flag)
1198 bmove(leaf_buff + share->keypage_header, half_pos-nod_flag,
1199 (size_t) nod_flag);
1200 if (!(*keyinfo->get_key)(&leaf_key, page_flag, nod_flag, &half_pos))
1201 goto err;
1202 DBUG_DUMP_KEY("key_to_leaf", &leaf_key);
1203 t_length=(*keyinfo->pack_key)(&leaf_key, nod_flag, (uchar*) 0,
1204 (uchar*) 0, (uchar*) 0, &key_inserted);
1205 /* t_length will always be > 0 for a new page !*/
1206 tmp_length= (size_t) ((next_page.buff + new_buff_length) - half_pos);
1207 DBUG_PRINT("info",("t_length: %d length: %d",t_length, (int) tmp_length));
1208 bmove(leaf_buff+p_length+t_length, half_pos, tmp_length);
1209 (*keyinfo->store_key)(keyinfo,leaf_buff+p_length, &key_inserted);
1210 new_leaf_length= (uint)(tmp_length + t_length + p_length);
1211 DBUG_ASSERT(new_leaf_length <= share->max_index_block_size);
1212
1213 leaf_page->size= new_leaf_length;
1214 leaf_page->flag= page_flag;
1215 page_store_info(share, leaf_page);
1216
1217 new_buff_length= (uint) (endpos - next_page.buff);
1218 next_page.size= new_buff_length;
1219 page_store_size(share, &next_page);
1220
1221 if (share->now_transactional)
1222 {
1223 /*
1224 Log changes to parent page
1225 This has one key deleted from it and one key inserted to it at
1226 keypos
1227
1228 ma_log_add() ensures that we don't log changes that is outside of
1229 key block size, as the REDO code can't handle that
1230 */
1231 if (_ma_log_add(anc_page, anc_length, keypos,
1232 anc_key_inserted.move_length +
1233 MY_MAX(anc_key_inserted.changed_length -
1234 anc_key_inserted.move_length,
1235 key_deleted.changed_length),
1236 anc_key_inserted.move_length -
1237 key_deleted.move_length, 1,KEY_OP_DEBUG_LOG_ADD_4))
1238 goto err;
1239
1240 /*
1241 Log changes to leaf page.
1242 This contains original data with new data added first
1243 */
1244 DBUG_ASSERT(leaf_length <= new_leaf_length);
1245 DBUG_ASSERT(new_leaf_length >= unchanged_leaf_length);
1246 if (_ma_log_prefix(leaf_page, new_leaf_length - unchanged_leaf_length,
1247 (int) (new_leaf_length - leaf_length),
1248 KEY_OP_DEBUG_LOG_PREFIX_2))
1249 goto err;
1250 /*
1251 Log changes to next page
1252 This contains original data with some suffix data deleted
1253 */
1254 DBUG_ASSERT(new_buff_length <= buff_length);
1255 if (_ma_log_suffix(&next_page, buff_length, new_buff_length))
1256 goto err;
1257 }
1258
1259 page_mark_changed(info, leaf_page);
1260 if (_ma_write_keypage(leaf_page,
1261 PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
1262 goto err;
1263 }
1264 page_mark_changed(info, &next_page);
1265 if (_ma_write_keypage(&next_page,
1266 PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
1267 goto err;
1268
1269 DBUG_RETURN(new_anc_length <=
1270 ((info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
1271 (uint) keyinfo->underflow_block_length)));
1272
1273err:
1274 DBUG_RETURN(-1);
1275} /* underflow */
1276
1277
1278/**
1279 @brief Remove a key from page
1280
1281 @fn remove_key()
1282 keyinfo Key handle
1283 nod_flag Length of node ptr
1284 keypos Where on page key starts
1285 lastkey Buffer for storing keys to be removed
1286 page_end Pointer to end of page
1287 next_block If <> 0 and node-page, this is set to address of
1288 next page
1289 s_temp Information about what changes was done one the page:
1290 s_temp.key_pos Start of key
1291 s_temp.move_length Number of bytes removed at keypos
1292 s_temp.changed_length Number of bytes changed at keypos
1293
1294 @todo
1295 The current code doesn't handle the case that the next key may be
1296 packed better against the previous key if there is a case difference
1297
1298 @return
1299 @retval 0 error
1300 @retval # How many chars was removed
1301*/
1302
1303static uint remove_key(MARIA_KEYDEF *keyinfo, uint page_flag, uint nod_flag,
1304 uchar *keypos, uchar *lastkey,
1305 uchar *page_end, my_off_t *next_block,
1306 MARIA_KEY_PARAM *s_temp)
1307{
1308 int s_length;
1309 uchar *start;
1310 DBUG_ENTER("remove_key");
1311 DBUG_PRINT("enter", ("keypos:%p page_end: %p",
1312 keypos, page_end));
1313
1314 start= s_temp->key_pos= keypos;
1315 s_temp->changed_length= 0;
1316 if (!(keyinfo->flag &
1317 (HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
1318 HA_BINARY_PACK_KEY)) &&
1319 !(page_flag & KEYPAGE_FLAG_HAS_TRANSID))
1320 {
1321 /* Static length key */
1322 s_length=(int) (keyinfo->keylength+nod_flag);
1323 if (next_block && nod_flag)
1324 *next_block= _ma_kpos(nod_flag,keypos+s_length);
1325 }
1326 else
1327 {
1328 /* Let keypos point at next key */
1329 MARIA_KEY key;
1330
1331 /* Calculate length of key */
1332 key.keyinfo= keyinfo;
1333 key.data= lastkey;
1334 if (!(*keyinfo->get_key)(&key, page_flag, nod_flag, &keypos))
1335 DBUG_RETURN(0); /* Error */
1336
1337 if (next_block && nod_flag)
1338 *next_block= _ma_kpos(nod_flag,keypos);
1339 s_length=(int) (keypos-start);
1340 if (keypos != page_end)
1341 {
1342 if (keyinfo->flag & HA_BINARY_PACK_KEY)
1343 {
1344 uchar *old_key= start;
1345 uint next_length,prev_length,prev_pack_length;
1346
1347 /* keypos points here on start of next key */
1348 get_key_length(next_length,keypos);
1349 get_key_pack_length(prev_length,prev_pack_length,old_key);
1350 if (next_length > prev_length)
1351 {
1352 uint diff= (next_length-prev_length);
1353 /* We have to copy data from the current key to the next key */
1354 keypos-= diff + prev_pack_length;
1355 store_key_length(keypos, prev_length);
1356 bmove(keypos + prev_pack_length, lastkey + prev_length, diff);
1357 s_length=(int) (keypos-start);
1358 s_temp->changed_length= diff + prev_pack_length;
1359 }
1360 }
1361 else
1362 {
1363 /* Check if a variable length first key part */
1364 if ((keyinfo->seg->flag & HA_PACK_KEY) && *keypos & 128)
1365 {
1366 /* Next key is packed against the current one */
1367 uint next_length,prev_length,prev_pack_length,lastkey_length,
1368 rest_length;
1369 if (keyinfo->seg[0].length >= 127)
1370 {
1371 if (!(prev_length=mi_uint2korr(start) & 32767))
1372 goto end;
1373 next_length=mi_uint2korr(keypos) & 32767;
1374 keypos+=2;
1375 prev_pack_length=2;
1376 }
1377 else
1378 {
1379 if (!(prev_length= *start & 127))
1380 goto end; /* Same key as previous*/
1381 next_length= *keypos & 127;
1382 keypos++;
1383 prev_pack_length=1;
1384 }
1385 if (!(*start & 128))
1386 prev_length=0; /* prev key not packed */
1387 if (keyinfo->seg[0].flag & HA_NULL_PART)
1388 lastkey++; /* Skip null marker */
1389 get_key_length(lastkey_length,lastkey);
1390 if (!next_length) /* Same key after */
1391 {
1392 next_length=lastkey_length;
1393 rest_length=0;
1394 }
1395 else
1396 get_key_length(rest_length,keypos);
1397
1398 if (next_length >= prev_length)
1399 {
1400 /* Next key is based on deleted key */
1401 uint pack_length;
1402 uint diff= (next_length-prev_length);
1403
1404 /* keypos points to data of next key (after key length) */
1405 bmove(keypos - diff, lastkey + prev_length, diff);
1406 rest_length+= diff;
1407 pack_length= prev_length ? get_pack_length(rest_length): 0;
1408 keypos-= diff + pack_length + prev_pack_length;
1409 s_length=(int) (keypos-start);
1410 if (prev_length) /* Pack against prev key */
1411 {
1412 *keypos++= start[0];
1413 if (prev_pack_length == 2)
1414 *keypos++= start[1];
1415 store_key_length(keypos,rest_length);
1416 }
1417 else
1418 {
1419 /* Next key is not packed anymore */
1420 if (keyinfo->seg[0].flag & HA_NULL_PART)
1421 {
1422 rest_length++; /* Mark not null */
1423 }
1424 if (prev_pack_length == 2)
1425 {
1426 mi_int2store(keypos,rest_length);
1427 }
1428 else
1429 *keypos= rest_length;
1430 }
1431 s_temp->changed_length= diff + pack_length + prev_pack_length;
1432 }
1433 }
1434 }
1435 }
1436 }
1437 end:
1438 bmove(start, start+s_length, (uint) (page_end-start-s_length));
1439 s_temp->move_length= s_length;
1440 DBUG_RETURN((uint) s_length);
1441} /* remove_key */
1442
1443
1444/****************************************************************************
1445 Logging of redos
1446****************************************************************************/
1447
1448/**
1449 @brief
1450 log entry where some parts are deleted and some things are changed
1451 and some data could be added last.
1452
1453 @fn _ma_log_delete()
1454 @param info Maria handler
1455 @param page Pageaddress for changed page
1456 @param buff Page buffer
1457 @param key_pos Start of change area
1458 @param changed_length How many bytes where changed at key_pos
1459 @param move_length How many bytes where deleted at key_pos
1460 @param append_length Length of data added last
1461 This is taken from end of ma_page->buff
1462
1463 This is mainly used when a key is deleted. The append happens
1464 when we delete a key from a page with data > block_size kept in
1465 memory and we have to add back the data that was stored > block_size
1466*/
1467
1468my_bool _ma_log_delete(MARIA_PAGE *ma_page, const uchar *key_pos,
1469 uint changed_length, uint move_length,
1470 uint append_length __attribute__((unused)),
1471 enum en_key_debug debug_marker __attribute__((unused)))
1472{
1473 LSN lsn;
1474 uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 5+ 2 + 3 + 3 + 6 + 3 + 7];
1475 uchar *log_pos;
1476 LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 7];
1477 uint translog_parts, current_size, extra_length;
1478 uint offset= (uint) (key_pos - ma_page->buff);
1479 MARIA_HA *info= ma_page->info;
1480 MARIA_SHARE *share= info->s;
1481 my_off_t page= ma_page->pos / share->block_size;
1482 DBUG_ENTER("_ma_log_delete");
1483 DBUG_PRINT("enter", ("page: %lu offset: %u changed_length: %u move_length: %u append_length: %u page_size: %u",
1484 (ulong) page, offset, changed_length, move_length,
1485 append_length, ma_page->size));
1486 DBUG_ASSERT(share->now_transactional && move_length);
1487 DBUG_ASSERT(offset + changed_length <= ma_page->size);
1488 DBUG_ASSERT(ma_page->org_size - move_length + append_length == ma_page->size);
1489 DBUG_ASSERT(move_length <= ma_page->org_size - share->keypage_header);
1490
1491 /* Store address of new root page */
1492 page_store(log_data + FILEID_STORE_SIZE, page);
1493 log_pos= log_data+ FILEID_STORE_SIZE + PAGE_STORE_SIZE;
1494 current_size= ma_page->org_size;
1495
1496#ifdef EXTRA_DEBUG_KEY_CHANGES
1497 *log_pos++= KEY_OP_DEBUG;
1498 *log_pos++= debug_marker;
1499
1500 *log_pos++= KEY_OP_DEBUG_2;
1501 int2store(log_pos, ma_page->org_size);
1502 int2store(log_pos+2, ma_page->size);
1503 log_pos+=4;
1504#endif
1505
1506 /* Store keypage_flag */
1507 *log_pos++= KEY_OP_SET_PAGEFLAG;
1508 *log_pos++= _ma_get_keypage_flag(info->s, ma_page->buff);
1509
1510 log_pos[0]= KEY_OP_OFFSET;
1511 int2store(log_pos+1, offset);
1512 log_pos+= 3;
1513 translog_parts= TRANSLOG_INTERNAL_PARTS + 1;
1514 extra_length= 0;
1515
1516 if (changed_length)
1517 {
1518 if (offset + changed_length >= share->max_index_block_size)
1519 {
1520 changed_length= share->max_index_block_size - offset;
1521 move_length= 0; /* Nothing to move */
1522 current_size= share->max_index_block_size;
1523 }
1524
1525 log_pos[0]= KEY_OP_CHANGE;
1526 int2store(log_pos+1, changed_length);
1527 log_pos+= 3;
1528 log_array[translog_parts].str= ma_page->buff + offset;
1529 log_array[translog_parts].length= changed_length;
1530 translog_parts++;
1531
1532 /* We only have to move things after offset+changed_length */
1533 offset+= changed_length;
1534 }
1535
1536 log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
1537 log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos - log_data);
1538
1539 if (move_length)
1540 {
1541 uint log_length;
1542 if (offset + move_length < share->max_index_block_size)
1543 {
1544 /*
1545 Move down things that is on page.
1546 page_offset in apply_redo_inxed() will be at original offset
1547 + changed_length.
1548 */
1549 log_pos[0]= KEY_OP_SHIFT;
1550 int2store(log_pos+1, - (int) move_length);
1551 log_length= 3;
1552 current_size-= move_length;
1553 }
1554 else
1555 {
1556 /* Delete to end of page */
1557 uint tmp= current_size - offset;
1558 current_size= offset;
1559 log_pos[0]= KEY_OP_DEL_SUFFIX;
1560 int2store(log_pos+1, tmp);
1561 log_length= 3;
1562 }
1563 log_array[translog_parts].str= log_pos;
1564 log_array[translog_parts].length= log_length;
1565 translog_parts++;
1566 log_pos+= log_length;
1567 extra_length+= log_length;
1568 }
1569
1570 if (current_size != ma_page->size &&
1571 current_size != share->max_index_block_size)
1572 {
1573 /* Append data that didn't fit on the page before */
1574 uint length= (MY_MIN(ma_page->size, share->max_index_block_size) -
1575 current_size);
1576 uchar *data= ma_page->buff + current_size;
1577
1578 DBUG_ASSERT(length <= append_length);
1579
1580 log_pos[0]= KEY_OP_ADD_SUFFIX;
1581 int2store(log_pos+1, length);
1582 log_array[translog_parts].str= log_pos;
1583 log_array[translog_parts].length= 3;
1584 log_array[translog_parts + 1].str= data;
1585 log_array[translog_parts + 1].length= length;
1586 log_pos+= 3;
1587 translog_parts+= 2;
1588 current_size+= length;
1589 extra_length+= 3 + length;
1590 }
1591
1592 _ma_log_key_changes(ma_page,
1593 log_array + translog_parts,
1594 log_pos, &extra_length, &translog_parts);
1595 /* Remember new page length for future log entires for same page */
1596 ma_page->org_size= current_size;
1597
1598 if (translog_write_record(&lsn, LOGREC_REDO_INDEX,
1599 info->trn, info,
1600 (translog_size_t)
1601 log_array[TRANSLOG_INTERNAL_PARTS].length +
1602 changed_length + extra_length, translog_parts,
1603 log_array, log_data, NULL))
1604 DBUG_RETURN(1);
1605
1606 DBUG_RETURN(0);
1607}
1608
1609
1610/****************************************************************************
1611 Logging of undos
1612****************************************************************************/
1613
1614my_bool _ma_write_undo_key_delete(MARIA_HA *info, const MARIA_KEY *key,
1615 my_off_t new_root, LSN *res_lsn)
1616{
1617 MARIA_SHARE *share= info->s;
1618 uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE +
1619 KEY_NR_STORE_SIZE + PAGE_STORE_SIZE], *log_pos;
1620 LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
1621 struct st_msg_to_write_hook_for_undo_key msg;
1622 enum translog_record_type log_type= LOGREC_UNDO_KEY_DELETE;
1623 uint keynr= key->keyinfo->key_nr;
1624
1625 lsn_store(log_data, info->trn->undo_lsn);
1626 key_nr_store(log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE, keynr);
1627 log_pos= log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE + KEY_NR_STORE_SIZE;
1628
1629 /**
1630 @todo BUG if we had concurrent insert/deletes, reading state's key_root
1631 like this would be unsafe.
1632 */
1633 if (new_root != share->state.key_root[keynr])
1634 {
1635 my_off_t page;
1636 page= ((new_root == HA_OFFSET_ERROR) ? IMPOSSIBLE_PAGE_NO :
1637 new_root / share->block_size);
1638 page_store(log_pos, page);
1639 log_pos+= PAGE_STORE_SIZE;
1640 log_type= LOGREC_UNDO_KEY_DELETE_WITH_ROOT;
1641 }
1642
1643 log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
1644 log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos - log_data);
1645 log_array[TRANSLOG_INTERNAL_PARTS + 1].str= key->data;
1646 log_array[TRANSLOG_INTERNAL_PARTS + 1].length= (key->data_length +
1647 key->ref_length);
1648
1649 msg.root= &share->state.key_root[keynr];
1650 msg.value= new_root;
1651 /*
1652 set autoincrement to 1 if this is an auto_increment key
1653 This is only used if we are now in a rollback of a duplicate key
1654 */
1655 msg.auto_increment= share->base.auto_key == keynr + 1;
1656
1657 return translog_write_record(res_lsn, log_type,
1658 info->trn, info,
1659 (translog_size_t)
1660 (log_array[TRANSLOG_INTERNAL_PARTS + 0].length +
1661 log_array[TRANSLOG_INTERNAL_PARTS + 1].length),
1662 TRANSLOG_INTERNAL_PARTS + 2, log_array,
1663 log_data + LSN_STORE_SIZE, &msg) ? -1 : 0;
1664}
1665