1/* Copyright (C) 2006 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
15
16/*
17 Read and write key blocks
18
19 The basic structure of a key block is as follows:
20
21 LSN 7 (LSN_STORE_SIZE); Log number for last change;
22 Only for transactional pages
23 PACK_TRANSID 6 (TRANSID_SIZE); Relative transid to pack page transid's
24 Only for transactional pages
25 KEYNR 1 (KEYPAGE_KEYID_SIZE) Which index this page belongs to
26 FLAG 1 (KEYPAGE_FLAG_SIZE) Flags for page
27 PAGE_SIZE 2 (KEYPAGE_USED_SIZE) How much of the page is used.
28 high-byte-first
29
30 The flag is a combination of the following values:
31
32 KEYPAGE_FLAG_ISNOD Page is a node
33 KEYPAGE_FLAG_HAS_TRANSID There may be a transid on the page.
34
35 After this we store key data, either packed or not packed, directly
36 after each other. If the page is a node flag, there is a pointer to
37 the next key page at page start and after each key.
38
39 At end of page the last KEYPAGE_CHECKSUM_SIZE bytes are reserved for a
40 page checksum.
41*/
42
43#include "maria_def.h"
44#include "trnman.h"
45#include "ma_key_recover.h"
46
47/**
48 Fill MARIA_PAGE structure for usage with _ma_write_keypage
49*/
50
51void _ma_page_setup(MARIA_PAGE *page, MARIA_HA *info,
52 const MARIA_KEYDEF *keyinfo, my_off_t pos,
53 uchar *buff)
54{
55 MARIA_SHARE *share= info->s;
56
57 page->info= info;
58 page->keyinfo= keyinfo;
59 page->buff= buff;
60 page->pos= pos;
61 page->size= _ma_get_page_used(share, buff);
62 page->org_size= page->size;
63 page->flag= _ma_get_keypage_flag(share, buff);
64 page->node= ((page->flag & KEYPAGE_FLAG_ISNOD) ?
65 share->base.key_reflength : 0);
66}
67
68#ifdef IDENTICAL_PAGES_AFTER_RECOVERY
69void page_cleanup(MARIA_SHARE *share, MARIA_PAGE *page)
70{
71 uint length= page->size;
72 DBUG_ASSERT(length <= share->max_index_block_size);
73 bzero(page->buff + length, share->block_size - length);
74}
75#endif
76
77
78/**
79 Fetch a key-page in memory
80
81 @fn _ma_fetch_keypage()
82 @param page Fill this struct with information about read page
83 @param info Maria handler
84 @param keyinfo Key definition for used key
85 @param pos Position for page (in bytes)
86 @param lock Lock type for page
87 @param level Importance of page; Priority for page cache
88 @param buff Buffer to use for page
89 @param return_buffer Set to 1 if we want to force useage of buff
90
91 @return
92 @retval 0 ok
93 @retval 1 error
94*/
95
96my_bool _ma_fetch_keypage(MARIA_PAGE *page, MARIA_HA *info,
97 const MARIA_KEYDEF *keyinfo,
98 my_off_t pos, enum pagecache_page_lock lock,
99 int level, uchar *buff,
100 my_bool return_buffer __attribute__ ((unused)))
101{
102 uchar *tmp;
103 MARIA_PINNED_PAGE page_link;
104 MARIA_SHARE *share= info->s;
105 uint block_size= share->block_size;
106 DBUG_ENTER("_ma_fetch_keypage");
107 DBUG_PRINT("enter",("page: %lu", (ulong) (pos / block_size)));
108
109 tmp= pagecache_read(share->pagecache, &share->kfile,
110 (pgcache_page_no_t) (pos / block_size), level, buff,
111 share->page_type, lock, &page_link.link);
112
113 if (lock != PAGECACHE_LOCK_LEFT_UNLOCKED)
114 {
115 DBUG_ASSERT(lock == PAGECACHE_LOCK_WRITE || PAGECACHE_LOCK_READ);
116 page_link.unlock= (lock == PAGECACHE_LOCK_WRITE ?
117 PAGECACHE_LOCK_WRITE_UNLOCK :
118 PAGECACHE_LOCK_READ_UNLOCK);
119 page_link.changed= 0;
120 push_dynamic(&info->pinned_pages, (void*) &page_link);
121 page->link_offset= info->pinned_pages.elements-1;
122 }
123
124 if (tmp == info->buff)
125 info->keyread_buff_used=1;
126 else if (!tmp)
127 {
128 DBUG_PRINT("error",("Got errno: %d from pagecache_read",my_errno));
129 info->last_keypage=HA_OFFSET_ERROR;
130 _ma_set_fatal_error(share, HA_ERR_CRASHED);
131 DBUG_RETURN(1);
132 }
133 info->last_keypage= pos;
134
135 /*
136 Setup page structure to make pages easy to use
137 This is same as page_fill_info, but here inlined as this si used
138 so often.
139 */
140 page->info= info;
141 page->keyinfo= keyinfo;
142 page->buff= tmp;
143 page->pos= pos;
144 page->size= _ma_get_page_used(share, tmp);
145 page->org_size= page->size; /* For debugging */
146 page->flag= _ma_get_keypage_flag(share, tmp);
147 page->node= ((page->flag & KEYPAGE_FLAG_ISNOD) ?
148 share->base.key_reflength : 0);
149
150#ifdef EXTRA_DEBUG
151 {
152 uint page_size= page->size;
153 if (page_size < 4 || page_size > share->max_index_block_size ||
154 _ma_get_keynr(share, tmp) != keyinfo->key_nr)
155 {
156 DBUG_PRINT("error",("page %lu had wrong page length: %u keynr: %u",
157 (ulong) (pos / block_size), page_size,
158 _ma_get_keynr(share, tmp)));
159 DBUG_DUMP("page", tmp, page_size);
160 info->last_keypage = HA_OFFSET_ERROR;
161 _ma_set_fatal_error(share, HA_ERR_CRASHED);
162 DBUG_RETURN(1);
163 }
164 }
165#endif
166 DBUG_RETURN(0);
167} /* _ma_fetch_keypage */
168
169
170/* Write a key-page on disk */
171
172my_bool _ma_write_keypage(MARIA_PAGE *page, enum pagecache_page_lock lock,
173 int level)
174{
175 MARIA_SHARE *share= page->info->s;
176 uint block_size= share->block_size;
177 uchar *buff= page->buff;
178 my_bool res;
179 MARIA_PINNED_PAGE page_link;
180 DBUG_ENTER("_ma_write_keypage");
181
182 /*
183 The following ensures that for transactional tables we have logged
184 all changes that changes the page size (as the logging code sets
185 page->org_size)
186 */
187 DBUG_ASSERT(!share->now_transactional || page->size == page->org_size);
188
189#ifdef EXTRA_DEBUG /* Safety check */
190 {
191 uint page_length, nod_flag;
192 page_length= _ma_get_page_used(share, buff);
193 nod_flag= _ma_test_if_nod(share, buff);
194
195 DBUG_ASSERT(page->size == page_length);
196 DBUG_ASSERT(page->size <= share->max_index_block_size);
197 DBUG_ASSERT(page->flag == _ma_get_keypage_flag(share, buff));
198
199 if (page->pos < share->base.keystart ||
200 page->pos+block_size > share->state.state.key_file_length ||
201 (page->pos & (maria_block_size-1)))
202 {
203 DBUG_PRINT("error",("Trying to write inside key status region: "
204 "key_start: %lu length: %lu page_pos: %lu",
205 (long) share->base.keystart,
206 (long) share->state.state.key_file_length,
207 (long) page->pos));
208 my_errno=EINVAL;
209 DBUG_ASSERT(0);
210 DBUG_RETURN(1);
211 }
212 DBUG_PRINT("page",("write page at: %lu",(ulong) (page->pos / block_size)));
213 DBUG_DUMP("buff", buff, page_length);
214 DBUG_ASSERT(page_length >= share->keypage_header + nod_flag +
215 page->keyinfo->minlength || maria_in_recovery);
216 }
217#endif
218
219 /* Verify that keynr is correct */
220 DBUG_ASSERT(_ma_get_keynr(share, buff) == page->keyinfo->key_nr);
221
222#if defined(EXTRA_DEBUG) && defined(HAVE_valgrind) && defined(WHEN_DEBUGGING)
223 MEM_CHECK_DEFINED(buff, block_size);
224#endif
225
226 page_cleanup(share, page);
227 {
228 PAGECACHE_BLOCK_LINK **link;
229 enum pagecache_page_pin pin;
230 if (lock == PAGECACHE_LOCK_LEFT_WRITELOCKED)
231 {
232 pin= PAGECACHE_PIN_LEFT_PINNED;
233 link= &page_link.link;
234 }
235 else if (lock == PAGECACHE_LOCK_WRITE_UNLOCK)
236 {
237 pin= PAGECACHE_UNPIN;
238 /*
239 We unlock this page so link should be 0 to prevent it usage
240 even accidentally
241 */
242 link= NULL;
243 }
244 else
245 {
246 pin= PAGECACHE_PIN;
247 link= &page_link.link;
248 }
249 res= pagecache_write(share->pagecache,
250 &share->kfile,
251 (pgcache_page_no_t) (page->pos / block_size),
252 level, buff, share->page_type,
253 lock, pin, PAGECACHE_WRITE_DELAY, link,
254 LSN_IMPOSSIBLE);
255 }
256
257 if (lock == PAGECACHE_LOCK_WRITE)
258 {
259 /* It was not locked before, we have to unlock it when we unpin pages */
260 page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
261 page_link.changed= 1;
262 push_dynamic(&page->info->pinned_pages, (void*) &page_link);
263 }
264 DBUG_RETURN(res);
265}
266
267
268/**
269 @brief Put page in free list
270
271 @fn _ma_dispose()
272 @param info Maria handle
273 @param pos Address to page
274 @param page_not_read 1 if page has not yet been read
275
276 @note
277 The page at 'pos' must have been read with a write lock.
278 This function does logging (unlike _ma_new()).
279
280 @return
281 @retval 0 ok
282 @retval 1 error
283
284*/
285
286int _ma_dispose(register MARIA_HA *info, my_off_t pos, my_bool page_not_read)
287{
288 my_off_t old_link;
289 uchar buff[MAX_KEYPAGE_HEADER_SIZE+ 8 + 2];
290 ulonglong page_no;
291 MARIA_SHARE *share= info->s;
292 MARIA_PINNED_PAGE page_link;
293 uint block_size= share->block_size;
294 int result= 0;
295 enum pagecache_page_lock lock_method;
296 enum pagecache_page_pin pin_method;
297 DBUG_ENTER("_ma_dispose");
298 DBUG_PRINT("enter",("page: %lu", (ulong) (pos / block_size)));
299 DBUG_ASSERT(pos % block_size == 0);
300
301 (void) _ma_lock_key_del(info, 0);
302
303 old_link= share->key_del_current;
304 share->key_del_current= pos;
305 page_no= pos / block_size;
306 bzero(buff, share->keypage_header);
307 _ma_store_keynr(share, buff, (uchar) MARIA_DELETE_KEY_NR);
308 _ma_store_page_used(share, buff, share->keypage_header + 8);
309 mi_sizestore(buff + share->keypage_header, old_link);
310 share->state.changed|= STATE_NOT_SORTED_PAGES;
311
312 if (share->now_transactional)
313 {
314 LSN lsn;
315 uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE * 2];
316 LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
317 my_off_t page;
318
319 /* Store address of deleted page */
320 page_store(log_data + FILEID_STORE_SIZE, page_no);
321
322 /* Store link to next unused page (the link that is written to page) */
323 page= (old_link == HA_OFFSET_ERROR ? IMPOSSIBLE_PAGE_NO :
324 old_link / block_size);
325 page_store(log_data + FILEID_STORE_SIZE + PAGE_STORE_SIZE, page);
326
327 log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
328 log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
329
330 if (translog_write_record(&lsn, LOGREC_REDO_INDEX_FREE_PAGE,
331 info->trn, info,
332 (translog_size_t) sizeof(log_data),
333 TRANSLOG_INTERNAL_PARTS + 1, log_array,
334 log_data, NULL))
335 result= 1;
336 }
337
338 if (page_not_read)
339 {
340 lock_method= PAGECACHE_LOCK_WRITE;
341 pin_method= PAGECACHE_PIN;
342 }
343 else
344 {
345 lock_method= PAGECACHE_LOCK_LEFT_WRITELOCKED;
346 pin_method= PAGECACHE_PIN_LEFT_PINNED;
347 }
348
349 if (pagecache_write_part(share->pagecache,
350 &share->kfile, (pgcache_page_no_t) page_no,
351 PAGECACHE_PRIORITY_LOW, buff,
352 share->page_type,
353 lock_method, pin_method,
354 PAGECACHE_WRITE_DELAY, &page_link.link,
355 LSN_IMPOSSIBLE,
356 0, share->keypage_header + 8))
357 result= 1;
358
359#ifdef IDENTICAL_PAGES_AFTER_RECOVERY
360 {
361 uchar *page_buff= pagecache_block_link_to_buffer(page_link.link);
362 bzero(page_buff + share->keypage_header + 8,
363 block_size - share->keypage_header - 8 - KEYPAGE_CHECKSUM_SIZE);
364 }
365#endif
366
367 if (page_not_read)
368 {
369 /* It was not locked before, we have to unlock it when we unpin pages */
370 page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
371 page_link.changed= 1;
372 push_dynamic(&info->pinned_pages, (void*) &page_link);
373 }
374
375 DBUG_RETURN(result);
376} /* _ma_dispose */
377
378
379/**
380 @brief Get address for free page to use
381
382 @fn _ma_new()
383 @param info Maria handle
384 @param level Type of key block (caching priority for pagecache)
385 @param page_link Pointer to page in page cache if read. One can
386 check if this is used by checking if
387 page_link->changed != 0
388
389 @note Logging of this is left to the caller (so that the "new"ing and the
390 first changes done to this new page can be logged as one single entry - one
391 single _ma_log_new()) call).
392
393 @return
394 HA_OFFSET_ERROR File is full or page read error
395 # Page address to use
396*/
397
398my_off_t _ma_new(register MARIA_HA *info, int level,
399 MARIA_PINNED_PAGE **page_link)
400
401{
402 my_off_t pos;
403 MARIA_SHARE *share= info->s;
404 uint block_size= share->block_size;
405 DBUG_ENTER("_ma_new");
406
407 if (_ma_lock_key_del(info, 1))
408 {
409 mysql_mutex_lock(&share->intern_lock);
410 pos= share->state.state.key_file_length;
411 if (pos >= share->base.max_key_file_length - block_size)
412 {
413 my_errno=HA_ERR_INDEX_FILE_FULL;
414 mysql_mutex_unlock(&share->intern_lock);
415 DBUG_RETURN(HA_OFFSET_ERROR);
416 }
417 share->state.state.key_file_length+= block_size;
418 /* Following is for not transactional tables */
419 info->state->key_file_length= share->state.state.key_file_length;
420 mysql_mutex_unlock(&share->intern_lock);
421 (*page_link)->changed= 0;
422 (*page_link)->write_lock= PAGECACHE_LOCK_WRITE;
423 }
424 else
425 {
426 uchar *buff;
427 pos= share->key_del_current; /* Protected */
428 DBUG_ASSERT(share->pagecache->block_size == block_size);
429 if (!(buff= pagecache_read(share->pagecache,
430 &share->kfile,
431 (pgcache_page_no_t) (pos / block_size), level,
432 0, share->page_type,
433 PAGECACHE_LOCK_WRITE, &(*page_link)->link)))
434 pos= HA_OFFSET_ERROR;
435 else
436 {
437 /*
438 Next deleted page's number is in the header of the present page
439 (single linked list):
440 */
441#ifdef DBUG_ASSERT_EXISTS
442 my_off_t key_del_current;
443#endif
444 share->key_del_current= mi_sizekorr(buff+share->keypage_header);
445#ifdef DBUG_ASSERT_EXISTS
446 key_del_current= share->key_del_current;
447 DBUG_ASSERT((key_del_current != 0) &&
448 ((key_del_current == HA_OFFSET_ERROR) ||
449 (key_del_current <=
450 (share->state.state.key_file_length - block_size))));
451#endif
452 }
453
454 (*page_link)->unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
455 (*page_link)->write_lock= PAGECACHE_LOCK_WRITE;
456 /*
457 We have to mark it changed as _ma_flush_pending_blocks() uses
458 'changed' to know if we used the page cache or not
459 */
460 (*page_link)->changed= 1;
461 push_dynamic(&info->pinned_pages, (void*) *page_link);
462 *page_link= dynamic_element(&info->pinned_pages,
463 info->pinned_pages.elements-1,
464 MARIA_PINNED_PAGE *);
465 }
466 share->state.changed|= STATE_NOT_SORTED_PAGES;
467 DBUG_PRINT("exit",("Pos: %ld",(long) pos));
468 DBUG_RETURN(pos);
469} /* _ma_new */
470
471
472/**
473 Log compactation of a index page
474*/
475
476static my_bool _ma_log_compact_keypage(MARIA_PAGE *ma_page,
477 TrID min_read_from)
478{
479 LSN lsn;
480 uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 1 + 7 + TRANSID_SIZE];
481 uchar *log_pos;
482 LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
483 MARIA_HA *info= ma_page->info;
484 MARIA_SHARE *share= info->s;
485 uint translog_parts, extra_length;
486 my_off_t page= ma_page->pos;
487 DBUG_ENTER("_ma_log_compact_keypage");
488 DBUG_PRINT("enter", ("page: %lu", (ulong) (page / share->block_size)));
489
490 /* Store address of new root page */
491 page/= share->block_size;
492 page_store(log_data + FILEID_STORE_SIZE, page);
493
494 log_pos= log_data + FILEID_STORE_SIZE + PAGE_STORE_SIZE;
495
496 log_pos[0]= KEY_OP_COMPACT_PAGE;
497 transid_store(log_pos + 1, min_read_from);
498 log_pos+= 1 + TRANSID_SIZE;
499
500 log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
501 log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
502 log_data);
503 translog_parts= 1;
504 extra_length= 0;
505
506 _ma_log_key_changes(ma_page,
507 log_array + TRANSLOG_INTERNAL_PARTS + translog_parts,
508 log_pos, &extra_length, &translog_parts);
509 /* Remember new page length for future log entires for same page */
510 ma_page->org_size= ma_page->size;
511
512 if (translog_write_record(&lsn, LOGREC_REDO_INDEX,
513 info->trn, info,
514 (translog_size_t)(log_array[TRANSLOG_INTERNAL_PARTS +
515 0].length + extra_length),
516 TRANSLOG_INTERNAL_PARTS + translog_parts,
517 log_array, log_data, NULL))
518 DBUG_RETURN(1);
519 DBUG_RETURN(0);
520}
521
522
523/**
524 Remove all transaction id's less than given one from a key page
525
526 @fn _ma_compact_keypage()
527 @param keyinfo Key handler
528 @param page_pos Page position on disk
529 @param page Buffer for page
530 @param min_read_from Remove all trids from page less than this
531
532 @retval 0 Ok
533 ®retval 1 Error; my_errno contains the error
534*/
535
536my_bool _ma_compact_keypage(MARIA_PAGE *ma_page, TrID min_read_from)
537{
538 MARIA_HA *info= ma_page->info;
539 MARIA_SHARE *share= info->s;
540 MARIA_KEY key;
541 uchar *page, *endpos, *start_of_empty_space;
542 uint page_flag, nod_flag, saved_space;
543 my_bool page_has_transid;
544 DBUG_ENTER("_ma_compact_keypage");
545
546 page_flag= ma_page->flag;
547 if (!(page_flag & KEYPAGE_FLAG_HAS_TRANSID))
548 DBUG_RETURN(0); /* No transaction id on page */
549
550 nod_flag= ma_page->node;
551 page= ma_page->buff;
552 endpos= page + ma_page->size;
553 key.data= info->lastkey_buff;
554 key.keyinfo= (MARIA_KEYDEF*) ma_page->keyinfo;
555
556 page_has_transid= 0;
557 page+= share->keypage_header + nod_flag;
558 key.data[0]= 0; /* safety */
559 start_of_empty_space= 0;
560 saved_space= 0;
561 do
562 {
563 if (!(page= (*ma_page->keyinfo->skip_key)(&key, 0, 0, page)))
564 {
565 DBUG_PRINT("error",("Couldn't find last key: page_pos: %p",
566 page));
567 _ma_set_fatal_error(share, HA_ERR_CRASHED);
568 DBUG_RETURN(1);
569 }
570 if (key_has_transid(page-1))
571 {
572 uint transid_length;
573 transid_length= transid_packed_length(page);
574
575 if (min_read_from == ~(TrID) 0 ||
576 min_read_from < transid_get_packed(share, page))
577 {
578 page[-1]&= 254; /* Remove transid marker */
579 transid_length= transid_packed_length(page);
580 if (start_of_empty_space)
581 {
582 /* Move block before the transid up in page */
583 uint copy_length= (uint) (page - start_of_empty_space) - saved_space;
584 memmove(start_of_empty_space, start_of_empty_space + saved_space,
585 copy_length);
586 start_of_empty_space+= copy_length;
587 }
588 else
589 start_of_empty_space= page;
590 saved_space+= transid_length;
591 }
592 else
593 page_has_transid= 1; /* At least one id left */
594 page+= transid_length;
595 }
596 page+= nod_flag;
597 } while (page < endpos);
598
599 DBUG_ASSERT(page == endpos);
600
601 if (start_of_empty_space)
602 {
603 /*
604 Move last block down
605 This is always true if any transid was removed
606 */
607 uint copy_length= (uint) (endpos - start_of_empty_space) - saved_space;
608
609 if (copy_length)
610 memmove(start_of_empty_space, start_of_empty_space + saved_space,
611 copy_length);
612 ma_page->size= (uint) (start_of_empty_space + copy_length - ma_page->buff);
613 page_store_size(share, ma_page);
614 }
615
616 if (!page_has_transid)
617 {
618 ma_page->flag&= ~KEYPAGE_FLAG_HAS_TRANSID;
619 _ma_store_keypage_flag(share, ma_page->buff, ma_page->flag);
620 /* Clear packed transid (in case of zerofill) */
621 bzero(ma_page->buff + LSN_STORE_SIZE, TRANSID_SIZE);
622 }
623
624 if (share->now_transactional)
625 {
626 if (_ma_log_compact_keypage(ma_page, min_read_from))
627 DBUG_RETURN(1);
628 }
629 DBUG_RETURN(0);
630}
631