1/* Copyright (C) 2007 Michael Widenius
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
15
16/* Redo of index */
17
18#include "maria_def.h"
19#include "ma_blockrec.h"
20#include "trnman.h"
21#include "ma_key_recover.h"
22#include "ma_rt_index.h"
23
/****************************************************************************
  Some helper functions used both by key page logging and block page logging
****************************************************************************/
27
28/**
29 @brief Unpin all pinned pages
30
31 @fn _ma_unpin_all_pages()
32 @param info Maria handler
33 @param undo_lsn LSN for undo pages. LSN_IMPOSSIBLE if we shouldn't write
34 undo (like on duplicate key errors)
35
36 info->pinned_pages is the list of pages to unpin. Each member of the list
37 must have its 'changed' saying if the page was changed or not.
38
39 @note
40 We unpin pages in the reverse order as they where pinned; This is not
41 necessary now, but may simplify things in the future.
42
43 @return
44 @retval 0 ok
45 @retval 1 error (fatal disk error)
46*/
47
48void _ma_unpin_all_pages(MARIA_HA *info, LSN undo_lsn)
49{
50 MARIA_PINNED_PAGE *page_link= ((MARIA_PINNED_PAGE*)
51 dynamic_array_ptr(&info->pinned_pages, 0));
52 MARIA_PINNED_PAGE *pinned_page= page_link + info->pinned_pages.elements;
53 DBUG_ENTER("_ma_unpin_all_pages");
54 DBUG_PRINT("info", ("undo_lsn: %lu", (ulong) undo_lsn));
55
56 if (!info->s->now_transactional)
57 DBUG_ASSERT(undo_lsn == LSN_IMPOSSIBLE || maria_in_recovery);
58
59 while (pinned_page-- != page_link)
60 {
61 /*
62 Note this assert fails if we got a disk error or the record file
63 is corrupted, which means we should have this enabled only in debug
64 builds.
65 */
66#ifdef EXTRA_DEBUG
67 DBUG_ASSERT((!pinned_page->changed ||
68 undo_lsn != LSN_IMPOSSIBLE || !info->s->now_transactional) ||
69 (info->s->state.changed & STATE_CRASHED_FLAGS));
70#endif
71 pagecache_unlock_by_link(info->s->pagecache, pinned_page->link,
72 pinned_page->unlock, PAGECACHE_UNPIN,
73 info->trn->rec_lsn, undo_lsn,
74 pinned_page->changed, FALSE);
75 }
76
77 info->pinned_pages.elements= 0;
78 DBUG_VOID_RETURN;
79}
80
81
82my_bool _ma_write_clr(MARIA_HA *info, LSN undo_lsn,
83 enum translog_record_type undo_type,
84 my_bool store_checksum, ha_checksum checksum,
85 LSN *res_lsn, void *extra_msg)
86{
87 uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE + CLR_TYPE_STORE_SIZE +
88 HA_CHECKSUM_STORE_SIZE+ KEY_NR_STORE_SIZE + PAGE_STORE_SIZE];
89 uchar *log_pos;
90 LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
91 struct st_msg_to_write_hook_for_clr_end msg;
92 my_bool res;
93 DBUG_ENTER("_ma_write_clr");
94
95 /* undo_lsn must be first for compression to work */
96 lsn_store(log_data, undo_lsn);
97 clr_type_store(log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE, undo_type);
98 log_pos= log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE + CLR_TYPE_STORE_SIZE;
99
100 /* Extra_msg is handled in write_hook_for_clr_end() */
101 msg.undone_record_type= undo_type;
102 msg.previous_undo_lsn= undo_lsn;
103 msg.extra_msg= extra_msg;
104 msg.checksum_delta= 0;
105
106 if (store_checksum)
107 {
108 msg.checksum_delta= checksum;
109 ha_checksum_store(log_pos, checksum);
110 log_pos+= HA_CHECKSUM_STORE_SIZE;
111 }
112 else if (undo_type == LOGREC_UNDO_KEY_INSERT_WITH_ROOT ||
113 undo_type == LOGREC_UNDO_KEY_DELETE_WITH_ROOT)
114 {
115 /* Key root changed. Store new key root */
116 struct st_msg_to_write_hook_for_undo_key *undo_msg= extra_msg;
117 pgcache_page_no_t page;
118 key_nr_store(log_pos, undo_msg->keynr);
119 page= (undo_msg->value == HA_OFFSET_ERROR ? IMPOSSIBLE_PAGE_NO :
120 undo_msg->value / info->s->block_size);
121 page_store(log_pos + KEY_NR_STORE_SIZE, page);
122 log_pos+= KEY_NR_STORE_SIZE + PAGE_STORE_SIZE;
123 }
124 log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
125 log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos - log_data);
126
127
128 /*
129 We need intern_lock mutex for calling _ma_state_info_write in the trigger.
130 We do it here to have the same sequence of mutexes locking everywhere
131 (first intern_lock then transactional log buffer lock)
132 */
133 if (undo_type == LOGREC_UNDO_BULK_INSERT)
134 mysql_mutex_lock(&info->s->intern_lock);
135
136 res= translog_write_record(res_lsn, LOGREC_CLR_END,
137 info->trn, info,
138 (translog_size_t)
139 log_array[TRANSLOG_INTERNAL_PARTS + 0].length,
140 TRANSLOG_INTERNAL_PARTS + 1, log_array,
141 log_data + LSN_STORE_SIZE, &msg);
142 if (undo_type == LOGREC_UNDO_BULK_INSERT)
143 mysql_mutex_unlock(&info->s->intern_lock);
144 DBUG_RETURN(res);
145}
146
147
148/**
149 @brief Sets transaction's undo_lsn, first_undo_lsn if needed
150
151 @return Operation status, always 0 (success)
152*/
153
154my_bool write_hook_for_clr_end(enum translog_record_type type
155 __attribute__ ((unused)),
156 TRN *trn, MARIA_HA *tbl_info,
157 LSN *lsn __attribute__ ((unused)),
158 void *hook_arg)
159{
160 MARIA_SHARE *share= tbl_info->s;
161 struct st_msg_to_write_hook_for_clr_end *msg=
162 (struct st_msg_to_write_hook_for_clr_end *)hook_arg;
163 my_bool error= FALSE;
164 DBUG_ENTER("write_hook_for_clr_end");
165 DBUG_ASSERT(trn->trid != 0);
166 trn->undo_lsn= msg->previous_undo_lsn;
167
168 switch (msg->undone_record_type) {
169 case LOGREC_UNDO_ROW_DELETE:
170 share->state.state.records++;
171 share->state.state.checksum+= msg->checksum_delta;
172 break;
173 case LOGREC_UNDO_ROW_INSERT:
174 share->state.state.records--;
175 share->state.state.checksum+= msg->checksum_delta;
176 break;
177 case LOGREC_UNDO_ROW_UPDATE:
178 share->state.state.checksum+= msg->checksum_delta;
179 break;
180 case LOGREC_UNDO_KEY_INSERT_WITH_ROOT:
181 case LOGREC_UNDO_KEY_DELETE_WITH_ROOT:
182 {
183 /* Update key root */
184 struct st_msg_to_write_hook_for_undo_key *extra_msg=
185 (struct st_msg_to_write_hook_for_undo_key *) msg->extra_msg;
186 *extra_msg->root= extra_msg->value;
187 break;
188 }
189 case LOGREC_UNDO_KEY_INSERT:
190 case LOGREC_UNDO_KEY_DELETE:
191 break;
192 case LOGREC_UNDO_BULK_INSERT:
193 mysql_mutex_assert_owner(&share->intern_lock);
194 error= (maria_enable_indexes(tbl_info) ||
195 /* we enabled indices, need '2' below */
196 _ma_state_info_write(share,
197 MA_STATE_INFO_WRITE_DONT_MOVE_OFFSET |
198 MA_STATE_INFO_WRITE_FULL_INFO));
199 /* no need for _ma_reset_status(): REDO_DELETE_ALL is just before us */
200 break;
201 default:
202 DBUG_ASSERT(0);
203 }
204 if (trn->undo_lsn == LSN_IMPOSSIBLE) /* has fully rolled back */
205 trn->first_undo_lsn= LSN_WITH_FLAGS_TO_FLAGS(trn->first_undo_lsn);
206 DBUG_RETURN(error);
207}
208
209
210/**
211 @brief write hook for undo key
212*/
213
214my_bool write_hook_for_undo_key(enum translog_record_type type,
215 TRN *trn, MARIA_HA *tbl_info,
216 LSN *lsn, void *hook_arg)
217{
218 struct st_msg_to_write_hook_for_undo_key *msg=
219 (struct st_msg_to_write_hook_for_undo_key *) hook_arg;
220
221 *msg->root= msg->value;
222 _ma_fast_unlock_key_del(tbl_info);
223 return write_hook_for_undo(type, trn, tbl_info, lsn, 0);
224}
225
226
227/**
228 Updates "auto_increment" and calls the generic UNDO_KEY hook
229
230 @return Operation status, always 0 (success)
231*/
232
233my_bool write_hook_for_undo_key_insert(enum translog_record_type type,
234 TRN *trn, MARIA_HA *tbl_info,
235 LSN *lsn, void *hook_arg)
236{
237 struct st_msg_to_write_hook_for_undo_key *msg=
238 (struct st_msg_to_write_hook_for_undo_key *) hook_arg;
239 MARIA_SHARE *share= tbl_info->s;
240 if (msg->auto_increment > 0)
241 {
242 /*
243 Only reason to set it here is to have a mutex protect from checkpoint
244 reading at the same time (would see a corrupted value).
245
246 The purpose of the following code is to set auto_increment if the row
247 has a with auto_increment value higher than the current one. We also
248 want to be able to restore the old value, in case of rollback,
249 if no one else has tried to set the value.
250
251 The logic used is that we only restore the auto_increment value if
252 tbl_info->last_auto_increment == share->last_auto_increment
253 when it's time to do the rollback.
254 */
255 DBUG_PRINT("info",("auto_inc: %lu new auto_inc: %lu",
256 (ulong)share->state.auto_increment,
257 (ulong)msg->auto_increment));
258 if (share->state.auto_increment < msg->auto_increment)
259 {
260 /* Remember the original value, in case of rollback */
261 tbl_info->last_auto_increment= share->last_auto_increment=
262 share->state.auto_increment;
263 share->state.auto_increment= msg->auto_increment;
264 }
265 else
266 {
267 /*
268 If the current value would have affected the original auto_increment
269 value, set it to an impossible value so that it's not restored on
270 rollback
271 */
272 if (msg->auto_increment > share->last_auto_increment)
273 share->last_auto_increment= ~(ulonglong) 0;
274 }
275 }
276 return write_hook_for_undo_key(type, trn, tbl_info, lsn, hook_arg);
277}
278
279
280/**
281 @brief Updates "share->auto_increment" in case of abort and calls
282 generic UNDO_KEY hook
283
284 @return Operation status, always 0 (success)
285*/
286
287my_bool write_hook_for_undo_key_delete(enum translog_record_type type,
288 TRN *trn, MARIA_HA *tbl_info,
289 LSN *lsn, void *hook_arg)
290{
291 struct st_msg_to_write_hook_for_undo_key *msg=
292 (struct st_msg_to_write_hook_for_undo_key *) hook_arg;
293 MARIA_SHARE *share= tbl_info->s;
294 if (msg->auto_increment > 0) /* If auto increment key */
295 {
296 /* Restore auto increment if no one has changed it in between */
297 if (share->last_auto_increment == tbl_info->last_auto_increment &&
298 tbl_info->last_auto_increment != ~(ulonglong) 0)
299 share->state.auto_increment= tbl_info->last_auto_increment;
300 }
301 return write_hook_for_undo_key(type, trn, tbl_info, lsn, hook_arg);
302}
303
304
305/*****************************************************************************
306 Functions for logging of key page changes
307*****************************************************************************/
308
309/**
310 @brief
311 Write log entry for page that has got data added or deleted at start of page
312*/
313
314my_bool _ma_log_prefix(MARIA_PAGE *ma_page, uint changed_length,
315 int move_length,
316 enum en_key_debug debug_marker __attribute__((unused)))
317{
318 uint translog_parts;
319 LSN lsn;
320 uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 7 + 7 + 2 + 2];
321 uchar *log_pos;
322 uchar *buff= ma_page->buff;
323 LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 4];
324 MARIA_HA *info= ma_page->info;
325 pgcache_page_no_t page= ma_page->pos / info->s->block_size;
326 DBUG_ENTER("_ma_log_prefix");
327 DBUG_PRINT("enter", ("page: %lu changed_length: %u move_length: %d",
328 (ulong) page, changed_length, move_length));
329
330 DBUG_ASSERT(ma_page->size == ma_page->org_size + move_length);
331
332 log_pos= log_data + FILEID_STORE_SIZE;
333 page_store(log_pos, page);
334 log_pos+= PAGE_STORE_SIZE;
335
336#ifdef EXTRA_DEBUG_KEY_CHANGES
337 (*log_pos++)= KEY_OP_DEBUG;
338 (*log_pos++)= debug_marker;
339#endif
340
341 /* Store keypage_flag */
342 *log_pos++= KEY_OP_SET_PAGEFLAG;
343 *log_pos++= _ma_get_keypage_flag(info->s, buff);
344
345 if (move_length < 0)
346 {
347 /* Delete prefix */
348 log_pos[0]= KEY_OP_DEL_PREFIX;
349 int2store(log_pos+1, -move_length);
350 log_pos+= 3;
351 if (changed_length)
352 {
353 /*
354 We don't need a KEY_OP_OFFSET as KEY_OP_DEL_PREFIX has an implicit
355 offset
356 */
357 log_pos[0]= KEY_OP_CHANGE;
358 int2store(log_pos+1, changed_length);
359 log_pos+= 3;
360 }
361 }
362 else
363 {
364 /* Add prefix */
365 DBUG_ASSERT(changed_length >0 && (int) changed_length >= move_length);
366 log_pos[0]= KEY_OP_ADD_PREFIX;
367 int2store(log_pos+1, move_length);
368 int2store(log_pos+3, changed_length);
369 log_pos+= 5;
370 }
371
372 translog_parts= 1;
373 log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
374 log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
375 log_data);
376 if (changed_length)
377 {
378 log_array[TRANSLOG_INTERNAL_PARTS + 1].str= (buff +
379 info->s->keypage_header);
380 log_array[TRANSLOG_INTERNAL_PARTS + 1].length= changed_length;
381 translog_parts= 2;
382 }
383
384 _ma_log_key_changes(ma_page, log_array + TRANSLOG_INTERNAL_PARTS +
385 translog_parts, log_pos, &changed_length,
386 &translog_parts);
387 /* Remember new page length for future log entires for same page */
388 ma_page->org_size= ma_page->size;
389
390 DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
391 info->trn, info,
392 (translog_size_t)
393 log_array[TRANSLOG_INTERNAL_PARTS +
394 0].length + changed_length,
395 TRANSLOG_INTERNAL_PARTS + translog_parts,
396 log_array, log_data, NULL));
397}
398
399
400/**
401 @brief
402 Write log entry for page that has got data added or deleted at end of page
403*/
404
405my_bool _ma_log_suffix(MARIA_PAGE *ma_page, uint org_length, uint new_length)
406{
407 LSN lsn;
408 LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 4];
409 uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 10 + 7 + 2], *log_pos;
410 uchar *buff= ma_page->buff;
411 int diff;
412 uint translog_parts, extra_length;
413 MARIA_HA *info= ma_page->info;
414 pgcache_page_no_t page= ma_page->pos / info->s->block_size;
415 DBUG_ENTER("_ma_log_suffix");
416 DBUG_PRINT("enter", ("page: %lu org_length: %u new_length: %u",
417 (ulong) page, org_length, new_length));
418 DBUG_ASSERT(ma_page->size == new_length);
419 DBUG_ASSERT(ma_page->org_size == org_length);
420
421 log_pos= log_data + FILEID_STORE_SIZE;
422 page_store(log_pos, page);
423 log_pos+= PAGE_STORE_SIZE;
424
425 /* Store keypage_flag */
426 *log_pos++= KEY_OP_SET_PAGEFLAG;
427 *log_pos++= _ma_get_keypage_flag(info->s, buff);
428
429 if ((diff= (int) (new_length - org_length)) < 0)
430 {
431 log_pos[0]= KEY_OP_DEL_SUFFIX;
432 int2store(log_pos+1, -diff);
433 log_pos+= 3;
434 translog_parts= 1;
435 extra_length= 0;
436 }
437 else
438 {
439 log_pos[0]= KEY_OP_ADD_SUFFIX;
440 int2store(log_pos+1, diff);
441 log_pos+= 3;
442 log_array[TRANSLOG_INTERNAL_PARTS + 1].str= buff + org_length;
443 log_array[TRANSLOG_INTERNAL_PARTS + 1].length= (uint) diff;
444 translog_parts= 2;
445 extra_length= (uint) diff;
446 }
447
448 log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
449 log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
450 log_data);
451
452 _ma_log_key_changes(ma_page,
453 log_array + TRANSLOG_INTERNAL_PARTS + translog_parts,
454 log_pos, &extra_length, &translog_parts);
455 /* Remember new page length for future log entires for same page */
456 ma_page->org_size= ma_page->size;
457
458 DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
459 info->trn, info,
460 (translog_size_t)
461 log_array[TRANSLOG_INTERNAL_PARTS +
462 0].length + extra_length,
463 TRANSLOG_INTERNAL_PARTS + translog_parts,
464 log_array, log_data, NULL));
465}
466
467
/**
  @brief Log that a key was added to the page

  @param ma_page          Changed page
  @param org_page_length  Length of data in page before key was added
                          Final length in ma_page->size
  @param key_pos          Position in ma_page->buff where the change starts;
                          the logged offset is (key_pos - ma_page->buff)
  @param changed_length   Number of bytes at key_pos to store in the log.
                          May be reduced here so that nothing outside of
                          the page is logged
  @param move_length      How much the data at the offset was shifted
                          (logged as KEY_OP_SHIFT when not 0). Asserted to
                          be <= changed_length
  @param handle_overflow  Only used in asserts in this function: page
                          overflow is only expected when it is set
  @param debug_marker     Only stored if EXTRA_DEBUG_KEY_CHANGES is defined

  @note
    If handle_overflow is set, then we have to protect against
    logging changes that are outside of the page.
    This may happen during underflow() handling where the buffer
    in memory temporarily contains more data than block_size

    ma_page may be a page that was previously logged and cut down
    because it's too big. (org_page_length > ma_page->org_size)

  @return
    @retval 0    ok
    @retval -1   translog_write_record() failed. As the return type is
                 my_bool, callers see this as a non-zero value
*/

my_bool _ma_log_add(MARIA_PAGE *ma_page,
                    uint org_page_length __attribute__ ((unused)),
                    uchar *key_pos, uint changed_length, int move_length,
                    my_bool handle_overflow __attribute__ ((unused)),
                    enum en_key_debug debug_marker __attribute__((unused)))
{
  LSN lsn;
  uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 3 + 3 + 3 + 3 + 7 +
                 3 + 2];
  uchar *log_pos;
  uchar *buff= ma_page->buff;
  LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 6];
  MARIA_HA *info= ma_page->info;
  uint offset= (uint) (key_pos - buff);
  uint max_page_size= info->s->max_index_block_size;
  uint translog_parts, current_size;
  pgcache_page_no_t page_pos= ma_page->pos / info->s->block_size;
  DBUG_ENTER("_ma_log_add");
  DBUG_PRINT("enter", ("page: %lu org_page_length: %u changed_length: %u "
                       "move_length: %d",
                       (ulong) page_pos, org_page_length, changed_length,
                       move_length));
  DBUG_ASSERT(info->s->now_transactional);
  DBUG_ASSERT(move_length <= (int) changed_length);
  DBUG_ASSERT(ma_page->org_size == MY_MIN(org_page_length, max_page_size));
  DBUG_ASSERT(ma_page->size == org_page_length + move_length);
  DBUG_ASSERT(offset <= ma_page->org_size);

  /*
    Write REDO entry that contains the logical operations we need
    to do on the page
  */
  log_pos= log_data + FILEID_STORE_SIZE;
  page_store(log_pos, page_pos);
  /* current_size tracks the page length as the logged operations imply it */
  current_size= ma_page->org_size;
  log_pos+= PAGE_STORE_SIZE;

#ifdef EXTRA_DEBUG_KEY_CHANGES
  *log_pos++= KEY_OP_DEBUG;
  *log_pos++= debug_marker;
#endif

  /* Store keypage_flag */
  *log_pos++= KEY_OP_SET_PAGEFLAG;
  *log_pos++= _ma_get_keypage_flag(info->s, buff);

  /*
    Don't overwrite page boundary
    It's ok to cut this as we will append the data at end of page
    in the next log entry
  */
  if (offset + changed_length > max_page_size)
  {
    DBUG_ASSERT(handle_overflow);
    changed_length= max_page_size - offset;     /* Update to end of page */
    move_length= 0;                             /* Nothing to move */
    /* Extend the page to max length on recovery */
    *log_pos++= KEY_OP_MAX_PAGELENGTH;
    current_size= max_page_size;
  }

  /* Check if adding the key made the page overflow */
  if (current_size + move_length > max_page_size)
  {
    /*
      Adding the key caused an overflow. Cut away the part of the
      page that doesn't fit.
    */
    uint diff;
    DBUG_ASSERT(handle_overflow);
    diff= current_size + move_length - max_page_size;
    log_pos[0]= KEY_OP_DEL_SUFFIX;
    int2store(log_pos+1, diff);
    log_pos+= 3;
    current_size= max_page_size - move_length;
  }

  if (offset == current_size)
  {
    /* Change starts exactly at the end of the page: log it as an append */
    log_pos[0]= KEY_OP_ADD_SUFFIX;
    current_size+= changed_length;
  }
  else
  {
    log_pos[0]= KEY_OP_OFFSET;
    int2store(log_pos+1, offset);
    log_pos+= 3;
    if (move_length)
    {
      if (move_length < 0)
      {
        DBUG_ASSERT(offset - move_length <= org_page_length);
        if (offset - move_length > current_size)
        {
          /*
            Truncate to end of page. We will add data to it from
            the page buffer below
          */
          move_length= (int) offset - (int) current_size;
        }
      }
      log_pos[0]= KEY_OP_SHIFT;
      int2store(log_pos+1, move_length);
      log_pos+= 3;
      current_size+= move_length;
    }
    /*
      Handle case where page was shortened but 'changed_length' goes over
      'current_size'. This can only happen when there was a page overflow
      and we will below add back the overflow part
    */
    if (offset + changed_length > current_size)
    {
      DBUG_ASSERT(offset + changed_length <= ma_page->size);
      changed_length= current_size - offset;
    }
    log_pos[0]= KEY_OP_CHANGE;
  }
  /* Length operand shared by the KEY_OP_ADD_SUFFIX / KEY_OP_CHANGE above */
  int2store(log_pos+1, changed_length);
  log_pos+= 3;

  log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
  log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
                                                         log_data);
  log_array[TRANSLOG_INTERNAL_PARTS + 1].str=    key_pos;
  log_array[TRANSLOG_INTERNAL_PARTS + 1].length= changed_length;
  /*
    Note that from here on translog_parts is an absolute index into
    log_array (it already includes TRANSLOG_INTERNAL_PARTS)
  */
  translog_parts= TRANSLOG_INTERNAL_PARTS + 2;

  /*
    If page was originally > block_size before operation and now all data
    fits, append the end data that was not part of the previous logged
    page to it.
  */
  DBUG_ASSERT(current_size <= max_page_size && current_size <= ma_page->size);
  if (current_size != ma_page->size && current_size != max_page_size)
  {
    uint length= MY_MIN(ma_page->size, max_page_size) - current_size;
    uchar *data= ma_page->buff + current_size;

    /*
      The 3 byte header for this operation is written after the first
      part in log_data and logged as its own part, followed by the data
    */
    log_pos[0]= KEY_OP_ADD_SUFFIX;
    int2store(log_pos+1, length);
    log_array[translog_parts].str=      log_pos;
    log_array[translog_parts].length=   3;
    log_array[translog_parts+1].str=    data;
    log_array[translog_parts+1].length= length;
    log_pos+= 3;
    translog_parts+= 2;
    current_size+= length;
    /* changed_length is from now on the total size of all parts after #0 */
    changed_length+= length + 3;
  }

  _ma_log_key_changes(ma_page, log_array + translog_parts,
                      log_pos, &changed_length, &translog_parts);
  /*
    Remember new page length for future log entries for same page
    Note that this can be different from ma_page->size in case of page
    overflow!
  */
  ma_page->org_size= current_size;
  DBUG_ASSERT(ma_page->org_size == MY_MIN(ma_page->size, max_page_size));

  /*
    NOTE(review): -1 is returned from a function declared my_bool; this
    relies on callers only testing the result for non-zero - confirm
  */
  if (translog_write_record(&lsn, LOGREC_REDO_INDEX,
                            info->trn, info,
                            (translog_size_t)
                            log_array[TRANSLOG_INTERNAL_PARTS + 0].length +
                            changed_length, translog_parts,
                            log_array, log_data, NULL))
    DBUG_RETURN(-1);
  DBUG_RETURN(0);
}
655
656
657#ifdef EXTRA_DEBUG_KEY_CHANGES
658
659/* Log checksum and optionally key page to log */
660
661void _ma_log_key_changes(MARIA_PAGE *ma_page, LEX_CUSTRING *log_array,
662 uchar *log_pos, uint *changed_length,
663 uint *translog_parts)
664{
665 MARIA_SHARE *share= ma_page->info->s;
666 int page_length= MY_MIN(ma_page->size, share->max_index_block_size);
667 uint org_length;
668 ha_checksum crc;
669
670 DBUG_ASSERT(ma_page->flag == (uint) _ma_get_keypage_flag(share, ma_page->buff));
671
672 /* We have to change length as the page may have been shortened */
673 org_length= _ma_get_page_used(share, ma_page->buff);
674 _ma_store_page_used(share, ma_page->buff, page_length);
675 crc= my_checksum(0, ma_page->buff + LSN_STORE_SIZE,
676 page_length - LSN_STORE_SIZE);
677 _ma_store_page_used(share, ma_page->buff, org_length);
678
679 log_pos[0]= KEY_OP_CHECK;
680 int2store(log_pos+1, page_length);
681 int4store(log_pos+3, crc);
682
683 log_array[0].str= log_pos;
684 log_array[0].length= 7;
685 (*changed_length)+= 7;
686 (*translog_parts)++;
687#ifdef EXTRA_STORE_FULL_PAGE_IN_KEY_CHANGES
688 log_array[1].str= ma_page->buff;
689 log_array[1].length= page_length;
690 (*changed_length)+= page_length;
691 (*translog_parts)++;
692#endif /* EXTRA_STORE_FULL_PAGE_IN_KEY_CHANGES */
693}
694
695#endif /* EXTRA_DEBUG_KEY_CHANGES */
696
697/****************************************************************************
698 Redo of key pages
699****************************************************************************/
700
/**
  @brief Apply LOGREC_REDO_INDEX_NEW_PAGE

  @param info            Maria handler
  @param lsn             LSN of the log record. Compared against
                         share->state.is_of_horizon and against the LSN
                         stored in the existing page
  @param header          Header (without FILEID). As read below it holds:
                         page number (PAGE_STORE_SIZE), free page
                         (PAGE_STORE_SIZE), key number (KEY_NR_STORE_SIZE),
                         one byte page type flag, and then the page content
                         that is copied in after the LSN part of the page
  @param length          Length of 'header' in bytes, including the fields
                         listed above

  @note
    On success (and also if pagecache_write() fails) the page is added to
    info->pinned_pages, to be unlocked by _ma_unpin_all_pages().
    If the page is already up to date, or reading it fails with an
    unexpected error, the page is unlocked here and not added to the list.

  @return Operation status
    @retval 0      OK
    @retval 1      Error
*/

uint _ma_apply_redo_index_new_page(MARIA_HA *info, LSN lsn,
                                   const uchar *header, uint length)
{
  pgcache_page_no_t root_page= page_korr(header);
  pgcache_page_no_t free_page= page_korr(header + PAGE_STORE_SIZE);
  uint key_nr= key_nr_korr(header + PAGE_STORE_SIZE * 2);
  my_bool page_type_flag= header[PAGE_STORE_SIZE * 2 + KEY_NR_STORE_SIZE];
  enum pagecache_page_lock unlock_method;
  enum pagecache_page_pin unpin_method;
  MARIA_PINNED_PAGE page_link;
  my_off_t file_size;
  uchar *buff;
  uint result;
  MARIA_SHARE *share= info->s;
  DBUG_ENTER("_ma_apply_redo_index_new_page");
  DBUG_PRINT("enter", ("root_page: %lu free_page: %lu",
                       (ulong) root_page, (ulong) free_page));

  share->state.changed|= (STATE_CHANGED | STATE_NOT_OPTIMIZED_KEYS |
                          STATE_NOT_SORTED_PAGES | STATE_NOT_ZEROFILLED |
                          STATE_NOT_MOVABLE);

  /* Set header to point at key data */
  header+= PAGE_STORE_SIZE * 2 + KEY_NR_STORE_SIZE + 1;
  length-= PAGE_STORE_SIZE * 2 + KEY_NR_STORE_SIZE + 1;

  /* File size needed for the page to be inside the key file */
  file_size= (my_off_t) (root_page + 1) * share->block_size;
  /* Only update the state if the record is not older than is_of_horizon */
  if (cmp_translog_addr(lsn, share->state.is_of_horizon) >= 0)
  {
    /* free_page is 0 if we shouldn't set key_del */
    if (free_page)
    {
      if (free_page != IMPOSSIBLE_PAGE_NO)
        share->state.key_del= (my_off_t) free_page * share->block_size;
      else
        share->state.key_del= HA_OFFSET_ERROR;
    }
    if (page_type_flag)     /* root page */
      share->state.key_root[key_nr]= file_size - share->block_size;
  }

  if (file_size > share->state.state.key_file_length)
  {
    /*
      Page is beyond the current end of the key file: extend the file
      length and build the page in keyread_buff. It is written to the
      page cache with pagecache_write() below.
    */
    share->state.state.key_file_length= file_size;
    buff= info->keyread_buff;
    info->keyread_buff_used= 1;
    unlock_method= PAGECACHE_LOCK_WRITE;
    unpin_method=  PAGECACHE_PIN;
  }
  else
  {
    /* Page is inside the file: read it with a write lock */
    if (!(buff= pagecache_read(share->pagecache, &share->kfile,
                               root_page, 0, 0,
                               PAGECACHE_PLAIN_PAGE, PAGECACHE_LOCK_WRITE,
                               &page_link.link)))
    {
      if (my_errno != HA_ERR_FILE_TOO_SHORT &&
          my_errno != HA_ERR_WRONG_CRC)
      {
        result= 1;
        goto err;
      }
      /*
        A too short file or a wrong CRC is accepted here, as the whole
        page is overwritten below. Use the buffer of the locked block.
      */
      buff= pagecache_block_link_to_buffer(page_link.link);
    }
    else if (lsn_korr(buff) >= lsn)
    {
      /* Already applied */
      DBUG_PRINT("info", ("Page is up to date, skipping redo"));
      result= 0;
      goto err;
    }
    unlock_method= PAGECACHE_LOCK_LEFT_WRITELOCKED;
    unpin_method=  PAGECACHE_PIN_LEFT_PINNED;
  }

  /*
    Write modified page: zeroed LSN, then the logged page content, zero
    fill up to max_index_block_size and 0xff in the checksum area at the
    end of the block.
    NOTE(review): 'length' is not checked against the block size in this
    function - presumably the log record is trusted; confirm.
  */
  bzero(buff, LSN_STORE_SIZE);
  memcpy(buff + LSN_STORE_SIZE, header, length);
  bzero(buff + LSN_STORE_SIZE + length,
        share->max_index_block_size - LSN_STORE_SIZE - length);
  bfill(buff + share->block_size - KEYPAGE_CHECKSUM_SIZE,
        KEYPAGE_CHECKSUM_SIZE, (uchar) 255);

  result= 0;
  /* Only the new-page path (built in keyread_buff) needs an explicit write */
  if (unlock_method == PAGECACHE_LOCK_WRITE &&
      pagecache_write(share->pagecache,
                      &share->kfile, root_page, 0,
                      buff, PAGECACHE_PLAIN_PAGE,
                      unlock_method, unpin_method,
                      PAGECACHE_WRITE_DELAY, &page_link.link,
                      LSN_IMPOSSIBLE))
    result= 1;

  /* Mark page to be unlocked and written at _ma_unpin_all_pages() */
  page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
  page_link.changed= 1;
  push_dynamic(&info->pinned_pages, (void*) &page_link);
  DBUG_RETURN(result);

err:
  /* Only reached after pagecache_read(): release the page unchanged */
  pagecache_unlock_by_link(share->pagecache, page_link.link,
                           PAGECACHE_LOCK_WRITE_UNLOCK,
                           PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
                           LSN_IMPOSSIBLE, 0, FALSE);
  DBUG_RETURN(result);
}
819
820
821/**
822 @brief Apply LOGREC_REDO_INDEX_FREE_PAGE
823
824 @param info Maria handler
825 @param header Header (without FILEID)
826
827 @return Operation status
828 @retval 0 OK
829 @retval 1 Error
830*/
831
832uint _ma_apply_redo_index_free_page(MARIA_HA *info,
833 LSN lsn,
834 const uchar *header)
835{
836 pgcache_page_no_t page= page_korr(header);
837 pgcache_page_no_t free_page= page_korr(header + PAGE_STORE_SIZE);
838 my_off_t old_link;
839 MARIA_PINNED_PAGE page_link;
840 MARIA_SHARE *share= info->s;
841 uchar *buff;
842 int result;
843 DBUG_ENTER("_ma_apply_redo_index_free_page");
844 DBUG_PRINT("enter", ("page: %lu free_page: %lu",
845 (ulong) page, (ulong) free_page));
846
847 share->state.changed|= (STATE_CHANGED | STATE_NOT_OPTIMIZED_KEYS |
848 STATE_NOT_SORTED_PAGES | STATE_NOT_ZEROFILLED |
849 STATE_NOT_MOVABLE);
850
851 if (cmp_translog_addr(lsn, share->state.is_of_horizon) >= 0)
852 share->state.key_del= (my_off_t) page * share->block_size;
853
854 old_link= ((free_page != IMPOSSIBLE_PAGE_NO) ?
855 (my_off_t) free_page * share->block_size :
856 HA_OFFSET_ERROR);
857 if (!(buff= pagecache_read(share->pagecache, &share->kfile,
858 page, 0, 0,
859 PAGECACHE_PLAIN_PAGE, PAGECACHE_LOCK_WRITE,
860 &page_link.link)))
861 {
862 result= (uint) my_errno;
863 goto err;
864 }
865 if (lsn_korr(buff) >= lsn)
866 {
867 /* Already applied */
868 result= 0;
869 goto err;
870 }
871 /* Free page */
872 bzero(buff + LSN_STORE_SIZE, share->keypage_header - LSN_STORE_SIZE);
873 _ma_store_keynr(share, buff, (uchar) MARIA_DELETE_KEY_NR);
874 _ma_store_page_used(share, buff, share->keypage_header + 8);
875 mi_sizestore(buff + share->keypage_header, old_link);
876
877#ifdef IDENTICAL_PAGES_AFTER_RECOVERY
878 {
879 bzero(buff + share->keypage_header + 8,
880 share->block_size - share->keypage_header - 8 -
881 KEYPAGE_CHECKSUM_SIZE);
882 }
883#endif
884
885 /* Mark page to be unlocked and written at _ma_unpin_all_pages() */
886 page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
887 page_link.changed= 1;
888 push_dynamic(&info->pinned_pages, (void*) &page_link);
889 DBUG_RETURN(0);
890
891err:
892 pagecache_unlock_by_link(share->pagecache, page_link.link,
893 PAGECACHE_LOCK_WRITE_UNLOCK,
894 PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
895 LSN_IMPOSSIBLE, 0, FALSE);
896 DBUG_RETURN(result);
897}
898
899
900/**
901 @brief Apply LOGREC_REDO_INDEX
902
903 @fn ma_apply_redo_index()
904 @param info Maria handler
905 @param header Header (without FILEID)
906
907 @notes
908 Data for this part is a set of logical instructions of how to
909 construct the key page.
910
911 Information of the layout of the components for REDO_INDEX:
912
913 Name Parameters (in byte) Information
914 KEY_OP_OFFSET 2 Set position for next operations
915 KEY_OP_SHIFT 2 (signed int) How much to shift down or up
916 KEY_OP_CHANGE 2 length, data Data to replace at 'pos'
917 KEY_OP_ADD_PREFIX 2 move-length How much data should be moved up
918 2 change-length Data to be replaced at page start
919 KEY_OP_DEL_PREFIX 2 length Bytes to be deleted at page start
920 KEY_OP_ADD_SUFFIX 2 length, data Add data to end of page
921 KEY_OP_DEL_SUFFIX 2 length Reduce page length with this
922 Sets position to start of page
923 KEY_OP_CHECK 6 page_length[2],CRC Used only when debugging
924 This may be followed by page_length
925 of data (until end of log record)
926 KEY_OP_COMPACT_PAGE 6 transid
927 KEY_OP_SET_PAGEFLAG 1 flag for page
928 KEY_OP_MAX_PAGELENGTH 0 Set page to max length
929 KEY_OP_DEBUG 1 Info where logging was done
930
931 @return Operation status
932 @retval 0 OK
933 @retval 1 Error
934*/
935
/*
  Global counter with external linkage, initialized to 0.
  NOTE(review): not referenced in the part of the file reviewed here; looks
  like a debugging aid - confirm before removing it or making it static.
*/
long my_counter= 0;
937
938uint _ma_apply_redo_index(MARIA_HA *info,
939 LSN lsn, const uchar *header, uint head_length)
940{
941 MARIA_SHARE *share= info->s;
942 pgcache_page_no_t page_pos= page_korr(header);
943 MARIA_PINNED_PAGE page_link;
944 uchar *buff;
945 const uchar *header_end= header + head_length;
946 uint page_offset= 0, org_page_length;
947 uint page_length, keypage_header, keynr;
948 uint max_page_size= share->max_index_block_size;
949#ifdef DBUG_ASSERT_EXISTS
950 uint new_page_length= 0;
951#endif
952 int result;
953 MARIA_PAGE page;
954 DBUG_ENTER("_ma_apply_redo_index");
955 DBUG_PRINT("enter", ("page: %lu", (ulong) page_pos));
956
957 /* Set header to point at key data */
958 header+= PAGE_STORE_SIZE;
959
960 if (!(buff= pagecache_read(share->pagecache, &share->kfile,
961 page_pos, 0, 0,
962 PAGECACHE_PLAIN_PAGE, PAGECACHE_LOCK_WRITE,
963 &page_link.link)))
964 {
965 result= 1;
966 goto err;
967 }
968 if (lsn_korr(buff) >= lsn)
969 {
970 /* Already applied */
971 DBUG_PRINT("info", ("Page is up to date, skipping redo"));
972 result= 0;
973 goto err;
974 }
975
976 keynr= _ma_get_keynr(share, buff);
977 _ma_page_setup(&page, info, share->keyinfo + keynr, page_pos, buff);
978 org_page_length= page_length= page.size;
979
980 keypage_header= share->keypage_header;
981 DBUG_PRINT("redo", ("page_length: %u", page_length));
982
983 /* Apply modifications to page */
984 do
985 {
986 switch ((enum en_key_op) (*header++)) {
987 case KEY_OP_OFFSET: /* 1 */
988 page_offset= uint2korr(header);
989 header+= 2;
990 DBUG_PRINT("redo", ("key_op_offset: %u", page_offset));
991 DBUG_ASSERT(page_offset >= keypage_header && page_offset <= page_length);
992 break;
993 case KEY_OP_SHIFT: /* 2 */
994 {
995 int length= sint2korr(header);
996 header+= 2;
997 DBUG_PRINT("redo", ("key_op_shift: %d", length));
998 DBUG_ASSERT(page_offset != 0 && page_offset <= page_length &&
999 page_length + length <= max_page_size);
1000
1001 if (length < 0)
1002 {
1003 DBUG_ASSERT(page_offset - length <= page_length);
1004 bmove(buff + page_offset, buff + page_offset - length,
1005 page_length - page_offset + length);
1006 }
1007 else if (page_length != page_offset)
1008 bmove_upp(buff + page_length + length, buff + page_length,
1009 page_length - page_offset);
1010 page_length+= length;
1011 break;
1012 }
1013 case KEY_OP_CHANGE: /* 3 */
1014 {
1015 uint length= uint2korr(header);
1016 DBUG_PRINT("redo", ("key_op_change: %u", length));
1017 DBUG_ASSERT(page_offset != 0 && page_offset + length <= page_length);
1018
1019 memcpy(buff + page_offset, header + 2 , length);
1020 page_offset+= length; /* Put offset after changed length */
1021 header+= 2 + length;
1022 break;
1023 }
1024 case KEY_OP_ADD_PREFIX: /* 4 */
1025 {
1026 uint insert_length= uint2korr(header);
1027 uint changed_length= uint2korr(header+2);
1028 DBUG_PRINT("redo", ("key_op_add_prefix: %u %u",
1029 insert_length, changed_length));
1030
1031 DBUG_ASSERT(insert_length <= changed_length &&
1032 page_length + insert_length <= max_page_size);
1033
1034 bmove_upp(buff + page_length + insert_length, buff + page_length,
1035 page_length - keypage_header);
1036 memcpy(buff + keypage_header, header + 4 , changed_length);
1037 header+= 4 + changed_length;
1038 page_length+= insert_length;
1039 break;
1040 }
1041 case KEY_OP_DEL_PREFIX: /* 5 */
1042 {
1043 uint length= uint2korr(header);
1044 header+= 2;
1045 DBUG_PRINT("redo", ("key_op_del_prefix: %u", length));
1046 DBUG_ASSERT(length <= page_length - keypage_header);
1047
1048 bmove(buff + keypage_header, buff + keypage_header +
1049 length, page_length - keypage_header - length);
1050 page_length-= length;
1051
1052 page_offset= keypage_header; /* Prepare for change */
1053 break;
1054 }
1055 case KEY_OP_ADD_SUFFIX: /* 6 */
1056 {
1057 uint insert_length= uint2korr(header);
1058 DBUG_PRINT("redo", ("key_op_add_suffix: %u", insert_length));
1059 DBUG_ASSERT(page_length + insert_length <= max_page_size);
1060 memcpy(buff + page_length, header+2, insert_length);
1061
1062 page_length+= insert_length;
1063 header+= 2 + insert_length;
1064 break;
1065 }
1066 case KEY_OP_DEL_SUFFIX: /* 7 */
1067 {
1068 uint del_length= uint2korr(header);
1069 header+= 2;
1070 DBUG_PRINT("redo", ("key_op_del_suffix: %u", del_length));
1071 DBUG_ASSERT(page_length - del_length >= keypage_header);
1072 page_length-= del_length;
1073 break;
1074 }
1075 case KEY_OP_CHECK: /* 8 */
1076 {
1077#ifdef EXTRA_DEBUG_KEY_CHANGES
1078 uint check_page_length;
1079 ha_checksum crc;
1080 check_page_length= uint2korr(header);
1081 crc= uint4korr(header+2);
1082 _ma_store_page_used(share, buff, page_length);
1083 if (check_page_length != page_length ||
1084 crc != (uint32) my_checksum(0, buff + LSN_STORE_SIZE,
1085 page_length - LSN_STORE_SIZE))
1086 {
1087 DBUG_DUMP("KEY_OP_CHECK bad page", buff, page_length);
1088 if (header + 6 + check_page_length <= header_end)
1089 {
1090 DBUG_DUMP("KEY_OP_CHECK org page", header + 6, check_page_length);
1091 }
1092 DBUG_ASSERT("crc failure in REDO_INDEX" == 0);
1093 }
1094#endif
1095 DBUG_PRINT("redo", ("key_op_check"));
1096 /*
1097 This is the last entry in the block and it can contain page_length
1098 data or not
1099 */
1100 DBUG_ASSERT(header + 6 == header_end ||
1101 header + 6 + page_length == header_end);
1102 header= header_end;
1103 break;
1104 }
1105 case KEY_OP_DEBUG:
1106 DBUG_PRINT("redo", ("Debug: %u", (uint) header[0]));
1107 header++;
1108 break;
1109 case KEY_OP_DEBUG_2:
1110 DBUG_PRINT("redo", ("org_page_length: %u new_page_length: %u",
1111 uint2korr(header), uint2korr(header+2)));
1112 DBUG_ASSERT(uint2korr(header) == page_length);
1113#ifdef DBUG_ASSERT_EXISTS
1114 new_page_length= MY_MIN(uint2korr(header+2), max_page_size);
1115#endif
1116 header+= 4;
1117 break;
1118 case KEY_OP_MAX_PAGELENGTH:
1119 DBUG_PRINT("redo", ("key_op_max_page_length"));
1120 page_length= max_page_size;
1121 break;
1122 case KEY_OP_MULTI_COPY: /* 9 */
1123 {
1124 /*
1125 List of fixed-len memcpy() operations with their source located inside
1126 the page. The log record's piece looks like:
1127 first the length 'full_length' to be used by memcpy()
1128 then the number of bytes used by the list of (to,from) pairs
1129 then the (to,from) pairs, so we do:
1130 for (t,f) in [list of (to,from) pairs]:
1131 memcpy(t, f, full_length).
1132 */
1133 uint full_length, log_memcpy_length;
1134 const uchar *log_memcpy_end;
1135
1136 DBUG_PRINT("redo", ("key_op_multi_copy"));
1137 full_length= uint2korr(header);
1138 header+= 2;
1139 log_memcpy_length= uint2korr(header);
1140 header+= 2;
1141 log_memcpy_end= header + log_memcpy_length;
1142 DBUG_ASSERT(full_length <= max_page_size);
1143 while (header < log_memcpy_end)
1144 {
1145 uint to, from;
1146 to= uint2korr(header);
1147 header+= 2;
1148 from= uint2korr(header);
1149 header+= 2;
1150 /* "from" is a place in the existing page */
1151 DBUG_ASSERT(MY_MAX(from, to) < max_page_size);
1152 memcpy(buff + to, buff + from, full_length);
1153 }
1154 break;
1155 }
1156 case KEY_OP_SET_PAGEFLAG:
1157 DBUG_PRINT("redo", ("key_op_set_pageflag"));
1158 _ma_store_keypage_flag(share, buff, *header++);
1159 break;
1160 case KEY_OP_COMPACT_PAGE:
1161 {
1162 TrID transid= transid_korr(header);
1163
1164 DBUG_PRINT("redo", ("key_op_compact_page"));
1165 header+= TRANSID_SIZE;
1166 if (_ma_compact_keypage(&page, transid))
1167 {
1168 result= 1;
1169 goto err;
1170 }
1171 page_length= page.size;
1172 break;
1173 }
1174 case KEY_OP_NONE:
1175 default:
1176 DBUG_ASSERT(0);
1177 result= 1;
1178 goto err;
1179 }
1180 } while (header < header_end);
1181 DBUG_ASSERT(header == header_end);
1182 DBUG_ASSERT(new_page_length == 0 || new_page_length == page_length);
1183
1184 /* Write modified page */
1185 page.size= page_length;
1186 _ma_store_page_used(share, buff, page_length);
1187
1188 /*
1189 Clean old stuff up. Gives us better compression of we archive things
1190 and makes things easer to debug
1191 */
1192 if (page_length < org_page_length)
1193 bzero(buff + page_length, org_page_length-page_length);
1194
1195 /* Mark page to be unlocked and written at _ma_unpin_all_pages() */
1196 page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
1197 page_link.changed= 1;
1198 push_dynamic(&info->pinned_pages, (void*) &page_link);
1199 DBUG_RETURN(0);
1200
1201err:
1202 pagecache_unlock_by_link(share->pagecache, page_link.link,
1203 PAGECACHE_LOCK_WRITE_UNLOCK,
1204 PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
1205 LSN_IMPOSSIBLE, 0, FALSE);
1206 if (result)
1207 _ma_mark_file_crashed(share);
1208 DBUG_RETURN(result);
1209}
1210
1211
1212/****************************************************************************
1213 Undo of key block changes
1214****************************************************************************/
1215
1216/**
1217 @brief Undo of insert of key (ie, delete the inserted key)
1218*/
1219
1220my_bool _ma_apply_undo_key_insert(MARIA_HA *info, LSN undo_lsn,
1221 const uchar *header, uint length)
1222{
1223 LSN lsn;
1224 my_bool res;
1225 uint keynr;
1226 uchar key_buff[MARIA_MAX_KEY_BUFF];
1227 MARIA_SHARE *share= info->s;
1228 MARIA_KEY key;
1229 my_off_t new_root;
1230 struct st_msg_to_write_hook_for_undo_key msg;
1231 DBUG_ENTER("_ma_apply_undo_key_insert");
1232
1233 share->state.changed|= (STATE_CHANGED | STATE_NOT_OPTIMIZED_KEYS |
1234 STATE_NOT_SORTED_PAGES | STATE_NOT_ZEROFILLED |
1235 STATE_NOT_MOVABLE);
1236 keynr= key_nr_korr(header);
1237 length-= KEY_NR_STORE_SIZE;
1238
1239 /* We have to copy key as _ma_ck_real_delete() may change it */
1240 memcpy(key_buff, header + KEY_NR_STORE_SIZE, length);
1241 DBUG_DUMP("key_buff", key_buff, length);
1242
1243 new_root= share->state.key_root[keynr];
1244 /*
1245 Change the key to an internal structure.
1246 It's safe to have SEARCH_USER_KEY_HAS_TRANSID even if there isn't
1247 a transaction id, as ha_key_cmp() will stop comparison when key length
1248 is reached.
1249 For index with transid flag, the ref_length of the key is not correct.
1250 This should however be safe as long as this key is only used for
1251 comparsion against other keys (not for packing or for read-next etc as
1252 in this case we use data_length + ref_length, which is correct.
1253 */
1254 key.keyinfo= share->keyinfo + keynr;
1255 key.data= key_buff;
1256 key.data_length= length - share->rec_reflength;
1257 key.ref_length= share->rec_reflength;
1258 key.flag= SEARCH_USER_KEY_HAS_TRANSID;
1259
1260 res= ((share->keyinfo[keynr].key_alg == HA_KEY_ALG_RTREE) ?
1261 maria_rtree_real_delete(info, &key, &new_root) :
1262 _ma_ck_real_delete(info, &key, &new_root));
1263 if (res)
1264 _ma_mark_file_crashed(share);
1265 msg.root= &share->state.key_root[keynr];
1266 msg.value= new_root;
1267 msg.keynr= keynr;
1268
1269 if (_ma_write_clr(info, undo_lsn, *msg.root == msg.value ?
1270 LOGREC_UNDO_KEY_INSERT : LOGREC_UNDO_KEY_INSERT_WITH_ROOT,
1271 0, 0, &lsn, (void*) &msg))
1272 res= 1;
1273
1274 _ma_fast_unlock_key_del(info);
1275 _ma_unpin_all_pages_and_finalize_row(info, lsn);
1276 DBUG_RETURN(res);
1277}
1278
1279
1280/**
1281 @brief Undo of delete of key (ie, insert the deleted key)
1282
1283 @param with_root If the UNDO is UNDO_KEY_DELETE_WITH_ROOT
1284*/
1285
1286my_bool _ma_apply_undo_key_delete(MARIA_HA *info, LSN undo_lsn,
1287 const uchar *header, uint length,
1288 my_bool with_root)
1289{
1290 LSN lsn;
1291 my_bool res;
1292 uint keynr, skip_bytes;
1293 uchar key_buff[MARIA_MAX_KEY_BUFF];
1294 MARIA_SHARE *share= info->s;
1295 my_off_t new_root;
1296 struct st_msg_to_write_hook_for_undo_key msg;
1297 MARIA_KEY key;
1298 DBUG_ENTER("_ma_apply_undo_key_delete");
1299
1300 share->state.changed|= (STATE_CHANGED | STATE_NOT_OPTIMIZED_KEYS |
1301 STATE_NOT_SORTED_PAGES | STATE_NOT_ZEROFILLED |
1302 STATE_NOT_MOVABLE);
1303 keynr= key_nr_korr(header);
1304 skip_bytes= KEY_NR_STORE_SIZE + (with_root ? PAGE_STORE_SIZE : 0);
1305 header+= skip_bytes;
1306 length-= skip_bytes;
1307
1308 /* We have to copy key as _ma_ck_real_write_btree() may change it */
1309 memcpy(key_buff, header, length);
1310 DBUG_DUMP("key", key_buff, length);
1311
1312 key.keyinfo= share->keyinfo + keynr;
1313 key.data= key_buff;
1314 key.data_length= length - share->rec_reflength;
1315 key.ref_length= share->rec_reflength;
1316 key.flag= SEARCH_USER_KEY_HAS_TRANSID;
1317
1318 new_root= share->state.key_root[keynr];
1319 res= (share->keyinfo[keynr].key_alg == HA_KEY_ALG_RTREE) ?
1320 maria_rtree_insert_level(info, &key, -1, &new_root) :
1321 _ma_ck_real_write_btree(info, &key, &new_root,
1322 share->keyinfo[keynr].write_comp_flag |
1323 key.flag);
1324 if (res)
1325 _ma_mark_file_crashed(share);
1326
1327 msg.root= &share->state.key_root[keynr];
1328 msg.value= new_root;
1329 msg.keynr= keynr;
1330 if (_ma_write_clr(info, undo_lsn,
1331 *msg.root == msg.value ?
1332 LOGREC_UNDO_KEY_DELETE : LOGREC_UNDO_KEY_DELETE_WITH_ROOT,
1333 0, 0, &lsn,
1334 (void*) &msg))
1335 res= 1;
1336
1337 _ma_fast_unlock_key_del(info);
1338 _ma_unpin_all_pages_and_finalize_row(info, lsn);
1339 DBUG_RETURN(res);
1340}
1341
1342
1343/****************************************************************************
1344 Handle some local variables
1345****************************************************************************/
1346
1347/**
1348 @brief lock key_del for other threads usage
1349
1350 @fn _ma_lock_key_del()
1351 @param info Maria handler
1352 @param insert_at_end Set to 1 if we are doing an insert
1353
1354 @note
1355 To allow higher concurrency in the common case where we do inserts
1356 and we don't have any linked blocks we do the following:
1357 - Mark in info->key_del_used that we are not using key_del
1358 - Return at once (without marking key_del as used)
1359
1360 This is safe as we in this case don't write key_del_current into
1361 the redo log and during recover we are not updating key_del.
1362
1363 @retval 1 Use page at end of file
1364 @retval 0 Use page at share->key_del_current
1365*/
1366
1367my_bool _ma_lock_key_del(MARIA_HA *info, my_bool insert_at_end)
1368{
1369 MARIA_SHARE *share= info->s;
1370
1371 /*
1372 info->key_del_used is 0 initially.
1373 If the caller needs a block (_ma_new()), we look at the free list:
1374 - looks empty? then caller will create a new block at end of file and
1375 remember (through info->key_del_used==2) that it will not change
1376 state.key_del and does not need to wake up waiters as nobody will wait for
1377 it.
1378 - non-empty? then we wait for other users of the state.key_del list to
1379 have finished, then we lock this list (through share->key_del_used==1)
1380 because we need to prevent some other thread to also read state.key_del
1381 and use the same page as ours. We remember through info->key_del_used==1
1382 that we will have to set state.key_del at unlock time and wake up
1383 waiters.
1384 If the caller wants to free a block (_ma_dispose()), "empty" and
1385 "non-empty" are treated as "non-empty" is treated above.
1386 When we are ready to unlock, we copy share->key_del_current into
1387 state.key_del. Unlocking happens when writing the UNDO log record, that
1388 can make a long lock time.
1389 Why we wrote "*looks* empty": because we are looking at state.key_del
1390 which may be slightly old (share->key_del_current may be more recent and
1391 exact): when we want a new page, we tolerate to treat "there was no free
1392 page 1 millisecond ago" as "there is no free page". It's ok to non-pop
1393 (_ma_new(), page will be found later anyway) but it's not ok to non-push
1394 (_ma_dispose(), page would be lost).
1395 When we leave this function, info->key_del_used is always 1 or 2.
1396 */
1397 if (info->key_del_used != 1)
1398 {
1399 mysql_mutex_lock(&share->key_del_lock);
1400 if (share->state.key_del == HA_OFFSET_ERROR && insert_at_end)
1401 {
1402 mysql_mutex_unlock(&share->key_del_lock);
1403 info->key_del_used= 2; /* insert-with-append */
1404 return 1;
1405 }
1406 while (share->key_del_used)
1407 mysql_cond_wait(&share->key_del_cond, &share->key_del_lock);
1408 info->key_del_used= 1;
1409 share->key_del_used= 1;
1410 share->key_del_current= share->state.key_del;
1411 mysql_mutex_unlock(&share->key_del_lock);
1412 }
1413 return share->key_del_current == HA_OFFSET_ERROR;
1414}
1415
1416
1417/**
1418 @brief copy changes to key_del and unlock it
1419
1420 @notes
1421 In case of many threads using the maria table, we always have a lock
1422 on the translog when comming here.
1423*/
1424
1425void _ma_unlock_key_del(MARIA_HA *info)
1426{
1427 DBUG_ASSERT(info->key_del_used);
1428 if (info->key_del_used == 1) /* Ignore insert-with-append */
1429 {
1430 MARIA_SHARE *share= info->s;
1431 mysql_mutex_lock(&share->key_del_lock);
1432 share->key_del_used= 0;
1433 share->state.key_del= share->key_del_current;
1434 mysql_mutex_unlock(&share->key_del_lock);
1435 mysql_cond_signal(&share->key_del_cond);
1436 }
1437 info->key_del_used= 0;
1438}
1439