1/* Copyright (C) 2007-2008 Michael Widenius
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
15
16/*
17 Storage of records in block
18
19 Some clarifications about the abbrev used:
20
  NULL fields -> Fields that may contain a NULL value.
22 Not null fields -> Fields that may not contain a NULL value.
23 Critical fields -> Fields that can't be null and can't be dropped without
24 causing a table reorganization.
25
26
27 Maria will have a LSN at start of each page (excluding the bitmap pages)
28
29 The different page types that are in a data file are:
30
31 Bitmap pages Map of free pages in the next extent (8192 page size
32 gives us 256M of mapped pages / bitmap)
33 Head page Start of rows are stored on this page.
34 A rowid always points to a head page
35 Blob page This page is totally filled with data from one blob or by
36 a set of long VARCHAR/CHAR fields
37 Tail page This contains the last part from different rows, blobs
38 or varchar fields.
39
40 The data file starts with a bitmap page, followed by as many data
41 pages as the bitmap can cover. After this there is a new bitmap page
42 and more data pages etc.
43
44 For information about the bitmap page, see ma_bitmap.c
45
46 Structure of data and tail page:
47
48 The page has a row directory at end of page to allow us to do deletes
49 without having to reorganize the page. It also allows us to later store
50 some more bytes after each row to allow them to grow without having to move
51 around other rows.
52
53 Page header:
54
55 LSN 7 bytes Log position for last page change
56 PAGE_TYPE 1 uchar 1 for head / 2 for tail / 3 for blob
57 DIR_COUNT 1 uchar Number of row/tail entries on page
  FREE_DIR_LINK  1 uchar Pointer to first free directory entry or 255 if no
                         free entries
  empty space    2 bytes Empty space on page
60
61 The most significant bit in PAGE_TYPE is set to 1 if the data on the page
62 can be compacted to get more space. (PAGE_CAN_BE_COMPACTED)
63
64 Row data
65
66 Row directory of NO entries, that consist of the following for each row
67 (in reverse order; i.e., first record is stored last):
68
69 Position 2 bytes Position of row on page
70 Length 2 bytes Length of entry
71
72 For Position and Length, the 1 most significant bit of the position and
73 the 1 most significant bit of the length could be used for some states of
74 the row (in other words, we should try to keep these reserved)
75
76 Position is 0 if the entry is not used. In this case length[0] points
77 to a previous free entry (255 if no previous entry) and length[1]
78 to the next free entry (or 255 if last free entry). This works because
79 the directory entry 255 can never be marked free (if the first directory
  entry is freed, the directory is shrunk).
81
82 checksum 4 bytes Reserved for full page read testing and live backup.
83
84 ----------------
85
86 Structure of blob pages:
87
88 LSN 7 bytes Log position for last page change
89 PAGE_TYPE 1 uchar 3
90
91 data
92
93 -----------------
94
95 Row data structure:
96
97 Flag 1 uchar Marker of which header field exists
98 TRANSID 6 bytes TRANSID of changing transaction
99 (optional, added on insert and first
100 update/delete)
101 VER_PTR 7 bytes Pointer to older version in log
102 (undo record)
103 (optional, added after first
104 update/delete)
105 DELETE_TRANSID 6 bytes (optional). TRANSID of original row.
106 Added on delete.
107 Nulls_extended 1 uchar To allow us to add new DEFAULT NULL
108 fields (optional, added after first
109 change of row after alter table)
110 Number of ROW_EXTENT's 1-3 uchar Length encoded, optional
111 This is the number of extents the
112 row is split into
113 First row_extent 7 uchar Pointer to first row extent (optional)
114
115 Total length of length array 1-3 uchar Only used if we have
116 char/varchar/blob fields.
117 Row checksum 1 uchar Only if table created with checksums
118 Null_bits .. One bit for each NULL field (a field that may
119 have the value NULL)
120 Empty_bits .. One bit for each field that may be 'empty'.
121 (Both for null and not null fields).
122 This bit is 1 if the value for the field is
123 0 or empty string.
124
125 field_offsets 2 byte/offset
126 For each 32'th field, there is one offset
127 that points to where the field information
128 starts in the block. This is to provide
129 fast access to later field in the row
130 when we only need to return a small
131 set of fields.
132 TODO: Implement this.
133
134 Things marked above as 'optional' will only be present if the
135 corresponding bit is set in 'Flag' field. Flag gives us a way to
136 get more space on a page when doing page compaction as we don't need
137 to store TRANSID that have committed before the smallest running
138 transaction we have in memory.
139
140 Data in the following order:
141 (Field order is precalculated when table is created)
142
143 Critical fixed length, not null, fields. (Note, these can't be dropped)
144 Fixed length, null fields
145
146 Length array, 1-4 uchar per field for all CHAR/VARCHAR/BLOB fields.
147 Number of bytes used in length array per entry is depending on max length
148 for field.
149
150 ROW_EXTENT's
151 CHAR data (space stripped)
152 VARCHAR data
153 BLOB data
154
155 Fields marked in null_bits or empty_bits are not stored in data part or
156 length array.
157
158 If row doesn't fit into the given block, then the first EXTENT will be
159 stored last on the row. This is done so that we don't break any field
160 data in the middle.
161
162 We first try to store the full row into one block. If that's not possible
163 we move out each big blob into their own extents. If this is not enough we
164 move out a concatenation of all varchars to their own extent.
165
166 Each blob and the concatenated char/varchar fields are stored the following
167 way:
168 - Store the parts in as many full-contiguous pages as possible.
169 - The last part, that doesn't fill a full page, is stored in tail page.
170
171 When doing an insert of a new row, we don't have to have
172 VER_PTR in the row. This will make rows that are not changed stored
173 efficiently. On update and delete we would add TRANSID (if it was an old
174 committed row) and VER_PTR to
175 the row. On row page compaction we can easily detect rows where
176 TRANSID was committed before the longest running transaction
177 started and we can then delete TRANSID and VER_PTR from the row to
178 gain more space.
179
180 If a row is deleted in Maria, we change TRANSID to the deleting
181 transaction's id, change VER_PTR to point to the undo record for the delete,
182 and add DELETE_TRANSID (the id of the transaction which last
183 inserted/updated the row before its deletion). DELETE_TRANSID allows an old
184 transaction to avoid reading the log to know if it can see the last version
185 before delete (in other words it reduces the probability of having to follow
186 VER_PTR). TODO: depending on a compilation option, evaluate the performance
187 impact of not storing DELETE_TRANSID (which would make the row smaller).
188
189 Description of the different parts:
190
191 Flag is coded as:
192
193 Description bit
194 TRANS_ID_exists 0
195 VER_PTR_exists 1
196 Row is deleted 2 (Means that DELETE_TRANSID exists)
197 Nulls_extended_exists 3
198 Row is split 7 This means that 'Number_of_row_extents' exists
199
200 Nulls_extended is the number of new DEFAULT NULL fields in the row
201 compared to the number of DEFAULT NULL fields when the first version
202 of the table was created. If Nulls_extended doesn't exist in the row,
203 we know it's 0 as this must be one of the original rows from when the
204 table was created first time. This coding allows us to add 255*8 =
205 2048 new fields without requiring a full alter table.
206
207 Empty_bits is used to allow us to store 0, 0.0, empty string, empty
208 varstring and empty blob efficiently. (This is very good for data
209 warehousing where NULL's are often regarded as evil). Having this
210 bitmap also allows us to drop information of a field during a future
211 delete if field was deleted with ALTER TABLE DROP COLUMN. To be able
212 to handle DROP COLUMN, we must store in the index header the fields
213 that has been dropped. When unpacking a row we will ignore dropped
214 fields. When storing a row, we will mark a dropped field either with a
215 null in the null bit map or in the empty_bits and not store any data
216 for it.
217 TODO: Add code for handling dropped fields.
218
219
  A ROW EXTENT is a range of pages. One ROW_EXTENT is coded as:
221
222 START_PAGE 5 bytes
223 PAGE_COUNT 2 bytes. Bit 16 is set if this is a tail page.
224 Bit 15 is to set if this is start of a new
225 blob extent.
226
227 With 8K pages, we can cover 256M in one extent. This coding gives us a
228 maximum file size of 2^40*8192 = 8192 tera
229
230 As an example of ROW_EXTENT handling, assume a row with one integer
231 field (value 5), two big VARCHAR fields (size 250 and 8192*3), and 2
232 big BLOB fields that we have updated.
233
234 The record format for storing this into an empty file would be:
235
236 Page 1:
237
238 00 00 00 00 00 00 00 LSN
239 01 Only one row in page
240 FF No free dir entry
241 xx xx Empty space on page
242
243 10 Flag: row split, VER_PTR exists
244 01 00 00 00 00 00 TRANSID 1
245 00 00 00 00 00 01 00 VER_PTR to first block in LOG file 1
246 5 Number of row extents
247 02 00 00 00 00 03 00 VARCHAR's are stored in full pages 2,3,4
248 0 No null fields
249 0 No empty fields
250 05 00 00 00 00 00 80 Tail page for VARCHAR, rowid 0
251 06 00 00 00 00 80 00 First blob, stored at page 6-133
252 05 00 00 00 00 01 80 Tail of first blob (896 bytes) at page 5
253 86 00 00 00 00 80 00 Second blob, stored at page 134-262
254 05 00 00 00 00 02 80 Tail of second blob (896 bytes) at page 5
255 05 00 5 integer
256 FA Length of first varchar field (size 250)
257 00 60 Length of second varchar field (size 8192*3)
258 00 60 10 First medium BLOB, 1M
259 01 00 10 00 Second BLOB, 1M
260 xx xx xx xx xx xx Varchars are stored here until end of page
261
262 ..... until end of page
263
264 09 00 F4 1F Start position 9, length 8180
265 xx xx xx xx Checksum
266
267 A data page is allowed to have a wrong CRC and header as long as it is
268 marked empty in the bitmap and its directory's count is 0.
269*/
270
271#include "maria_def.h"
272#include "ma_blockrec.h"
273#include "trnman.h"
274#include "ma_key_recover.h"
275#include "ma_recovery_util.h"
276#include <lf.h>
277
278/*
279 Struct for having a cursor over a set of extent.
280 This is used to loop over all extents for a row when reading
281 the row data. It's also used to store the tail positions for
282 a read row to be used by a later update/delete command.
283*/
284
285typedef struct st_maria_extent_cursor
286{
287 /*
288 Pointer to packed uchar array of extents for the row.
289 Format is described above in the header
290 */
291 uchar *extent;
292 /* Where data starts on page; Only for debugging */
293 uchar *data_start;
294 /* Position to all tails in the row. Updated when reading a row */
295 MARIA_RECORD_POS *tail_positions;
296 /* Current page */
297 pgcache_page_no_t page;
298 /* How many pages in the page region */
299 uint page_count;
300 /* What kind of lock to use for tail pages */
301 enum pagecache_page_lock lock_for_tail_pages;
302 /* Total number of extents (i.e., entries in the 'extent' slot) */
303 uint extent_count;
304 /* <> 0 if current extent is a tail page; Set while using cursor */
305 uint tail;
306 /* Position for tail on tail page */
307 uint tail_row_nr;
308 /*
309 == 1 if we are working on the first extent (i.e., the one that is stored in
310 the row header, not an extent that is stored as part of the row data).
311 */
312 my_bool first_extent;
313} MARIA_EXTENT_CURSOR;
314
315
316/**
317 @brief Structure for passing down info to write_hook_for_clr_end().
  This hook needs to know the variation of the live checksum caused by the
319 current operation to update state.checksum under log's mutex,
320 needs to know the transaction's previous undo_lsn to set
321 trn->undo_lsn under log mutex, and needs to know the type of UNDO being
322 undone now to modify state.records under log mutex.
323*/
324
325/** S:share,D:checksum_delta,E:expression,P:pointer_into_record,L:length */
326#define store_checksum_in_rec(S,D,E,P,L) do \
327 { \
328 D= 0; \
329 if ((S)->calc_checksum != NULL) \
330 { \
331 D= (E); \
332 ha_checksum_store(P, D); \
333 L+= HA_CHECKSUM_STORE_SIZE; \
334 } \
335 } while (0)
336
337
338static my_bool delete_tails(MARIA_HA *info, MARIA_RECORD_POS *tails);
339static my_bool delete_head_or_tail(MARIA_HA *info,
340 pgcache_page_no_t page, uint record_number,
341 my_bool head, my_bool from_update);
342#ifndef DBUG_OFF
343static void _ma_print_directory(MARIA_SHARE *share,
344 FILE *file, uchar *buff, uint block_size);
345#endif
346static uchar *store_page_range(MARIA_SHARE *share,
347 uchar *to, MARIA_BITMAP_BLOCK *block,
348 ulong length,
349 uint *tot_ranges);
350static size_t fill_insert_undo_parts(MARIA_HA *info, const uchar *record,
351 LEX_CUSTRING *log_parts,
352 uint *log_parts_count);
353static size_t fill_update_undo_parts(MARIA_HA *info, const uchar *oldrec,
354 const uchar *newrec,
355 LEX_CUSTRING *log_parts,
356 uint *log_parts_count);
357
358/****************************************************************************
359 Initialization
360****************************************************************************/
361
362/*
363 Initialize data needed for block structures
364*/
365
366
367/* Size of the different header elements for a row */
368
369static uchar header_sizes[]=
370{
371 TRANSID_SIZE,
372 VERPTR_SIZE,
373 TRANSID_SIZE, /* Delete transid */
374 1 /* Null extends */
375};
376
377/*
378 Calculate array of all used headers
379
380 Used to speed up:
381
382 size= 1;
383 if (flag & 1)
384 size+= TRANSID_SIZE;
385 if (flag & 2)
386 size+= VERPTR_SIZE;
387 if (flag & 4)
388 size+= TRANSID_SIZE
389 if (flag & 8)
390 size+= 1;
391
392 NOTES
393 This is called only once at startup of Maria
394*/
395
396static uchar total_header_size[1 << array_elements(header_sizes)];
397#define PRECALC_HEADER_BITMASK (array_elements(total_header_size) -1)
398
399void _ma_init_block_record_data(void)
400{
401 uint i;
402 bzero(total_header_size, sizeof(total_header_size));
403 total_header_size[0]= FLAG_SIZE; /* Flag uchar */
404 for (i= 1; i < array_elements(total_header_size); i++)
405 {
406 uint size= FLAG_SIZE, j, bit;
407 for (j= 0; (bit= (1 << j)) <= i; j++)
408 {
409 if (i & bit)
410 size+= header_sizes[j];
411 }
412 total_header_size[i]= size;
413 }
414}
415
416
my_bool _ma_once_init_block_record(MARIA_SHARE *share, File data_file)
{
  my_bool res;
  pgcache_page_no_t last_page;

  /*
    First calculate the max file length we can have with a pointer of size
    rec_reflength.

    The 'rec_reflength - 1' is because one byte is used for the row
    position within the page.
    The /2 comes from _ma_transaction_recpos_to_keypos() where we use
    the lowest bit to mark if there is a transid following the rownr.
  */
  last_page= ((ulonglong) 1 << ((share->base.rec_reflength-1)*8))/2;
  if (!last_page)                                  /* Overflow; set max size */
    last_page= ~(pgcache_page_no_t) 0;

  /* The bitmap may lower last_page to what it can actually address */
  res= _ma_bitmap_init(share, data_file, &last_page);
  share->base.max_data_file_length= _ma_safe_mul(last_page + 1,
                                                 share->block_size);
#if SIZEOF_OFF_T == 4
  /* With a 32-bit off_t we can not address more than 2G in the file */
  set_if_smaller(share->base.max_data_file_length, INT_MAX32);
#endif
  return res;
}
443
444
445my_bool _ma_once_end_block_record(MARIA_SHARE *share)
446{
447 int res= _ma_bitmap_end(share);
448 if (share->bitmap.file.file >= 0)
449 {
450 if (flush_pagecache_blocks(share->pagecache, &share->bitmap.file,
451 ((share->temporary || share->deleting) ?
452 FLUSH_IGNORE_CHANGED :
453 FLUSH_RELEASE)))
454 res= 1;
455 /*
456 File must be synced as it is going out of the maria_open_list and so
457 becoming unknown to Checkpoint.
458 */
459 if (share->now_transactional &&
460 mysql_file_sync(share->bitmap.file.file, MYF(MY_WME)))
461 res= 1;
462 if (mysql_file_close(share->bitmap.file.file, MYF(MY_WME)))
463 res= 1;
464 /*
465 Trivial assignment to guard against multiple invocations
466 (May happen if file are closed but we want to keep the maria object
467 around a bit longer)
468 */
469 share->bitmap.file.file= -1;
470 }
471 if (share->id != 0)
472 {
473 /*
474 We de-assign the id even though index has not been flushed, this is ok
475 as close_lock serializes us with a Checkpoint looking at our share.
476 */
477 translog_deassign_id_from_share(share);
478 }
479 return res;
480}
481
482
/*
  Init info->cur_row structure

  Allocates all per-handler row buffers (for both cur_row and new_row) in
  one my_multi_malloc() chunk headed by cur_row.empty_bits; they are all
  freed together in _ma_end_block_record().  Returns 1 on allocation
  failure, 0 on success.
*/

my_bool _ma_init_block_record(MARIA_HA *info)
{
  MARIA_ROW *row= &info->cur_row, *new_row= &info->new_row;
  MARIA_SHARE *share= info->s;
  uint default_extents;
  DBUG_ENTER("_ma_init_block_record");

  if (!my_multi_malloc(MY_WME,
                       &row->empty_bits, share->base.pack_bytes,
                       &row->field_lengths,
                       share->base.max_field_lengths + 2,
                       &row->blob_lengths, sizeof(ulong) * share->base.blobs,
                       &row->null_field_lengths, (sizeof(uint) *
                                                  (share->base.fields -
                                                   share->base.blobs +
                                                   EXTRA_LENGTH_FIELDS)),
                       &row->tail_positions, (sizeof(MARIA_RECORD_POS) *
                                              (share->base.blobs + 2)),
                       &new_row->empty_bits, share->base.pack_bytes,
                       &new_row->field_lengths,
                       share->base.max_field_lengths + 2,
                       &new_row->blob_lengths,
                       sizeof(ulong) * share->base.blobs,
                       &new_row->null_field_lengths, (sizeof(uint) *
                                                      (share->base.fields -
                                                       share->base.blobs +
                                                       EXTRA_LENGTH_FIELDS)),
                       &info->log_row_parts,
                       sizeof(*info->log_row_parts) *
                       (TRANSLOG_INTERNAL_PARTS + 3 +
                        share->base.fields + 3),
                       &info->update_field_data,
                       (share->base.fields * 4 +
                        share->base.max_field_lengths + 1 + 4),
                       NullS, 0))
    DBUG_RETURN(1);
  /* Skip over bytes used to store length of field length for logging */
  row->field_lengths+= 2;
  new_row->field_lengths+= 2;

  /* Reserve some initial space to avoid mallocs during execution */
  default_extents= (ELEMENTS_RESERVED_FOR_MAIN_PART + 1 +
                    (AVERAGE_BLOB_SIZE /
                     FULL_PAGE_SIZE(share) /
                     BLOB_SEGMENT_MIN_SIZE));

  if (my_init_dynamic_array(&info->bitmap_blocks,
                            sizeof(MARIA_BITMAP_BLOCK), default_extents,
                            64, MYF(0)))
    goto err;
  info->cur_row.extents_buffer_length= default_extents * ROW_EXTENT_SIZE;
  if (!(info->cur_row.extents= my_malloc(info->cur_row.extents_buffer_length,
                                         MYF(MY_WME))))
    goto err;

  info->row_base_length= share->base_length;
  info->row_flag= share->base.default_row_flag;

  /*
    We need to reserve 'EXTRA_LENGTH_FIELDS' number of parts in
    null_field_lengths to allow splitting of rows in 'find_where_to_split_row'
  */
  row->null_field_lengths+= EXTRA_LENGTH_FIELDS;
  new_row->null_field_lengths+= EXTRA_LENGTH_FIELDS;

  DBUG_RETURN(0);

err:
  /* Frees everything allocated above that succeeded */
  _ma_end_block_record(info);
  DBUG_RETURN(1);
}
556
557
/*
  Free all row buffers allocated by _ma_init_block_record().
  empty_bits heads the single my_multi_malloc() chunk, so freeing it
  releases all the row/new_row buffers at once.
*/

void _ma_end_block_record(MARIA_HA *info)
{
  DBUG_ENTER("_ma_end_block_record");
  my_free(info->cur_row.empty_bits);
  delete_dynamic(&info->bitmap_blocks);
  my_free(info->cur_row.extents);
  my_free(info->blob_buff);
  /*
    The data file is closed, when needed, in ma_once_end_block_record().
    The following protects us from doing an extra, not allowed, close
    in maria_close()
  */
  info->dfile.file= -1;
  DBUG_VOID_RETURN;
}
573
574
575/****************************************************************************
576 Helper functions
577****************************************************************************/
578
579/*
  Return the next unused position on the page after a directory entry.
581
582 SYNOPSIS
583 start_of_next_entry()
584 dir Directory entry to be used. This can not be the
585 the last entry on the page!
586
587 RETURN
588 # Position in page where next entry starts.
589 Everything between the '*dir' and this are free to be used.
590*/
591
592static inline uint start_of_next_entry(uchar *dir)
593{
594 uchar *prev;
595 /*
596 Find previous used entry. (There is always a previous entry as
597 the directory never starts with a deleted entry)
598 */
599 for (prev= dir - DIR_ENTRY_SIZE ;
600 prev[0] == 0 && prev[1] == 0 ;
601 prev-= DIR_ENTRY_SIZE)
602 {}
603 return (uint) uint2korr(prev);
604}
605
606
607/*
608 Return the offset where the previous entry ends (before on page)
609
610 SYNOPSIS
611 end_of_previous_entry()
612 dir Address for current directory entry
613 end Address to last directory entry
614
615 RETURN
616 # Position where previous entry ends (smallest address on page)
617 Everything between # and current entry are free to be used.
618*/
619
620
621static inline uint end_of_previous_entry(MARIA_SHARE *share,
622 uchar *dir, uchar *end)
623{
624 uchar *pos;
625 for (pos= dir + DIR_ENTRY_SIZE ; pos < end ; pos+= DIR_ENTRY_SIZE)
626 {
627 uint offset;
628 if ((offset= uint2korr(pos)))
629 return offset + uint2korr(pos+2);
630 }
631 return PAGE_HEADER_SIZE(share);
632}
633
634
635#ifndef DBUG_OFF
636
637static void _ma_print_directory(MARIA_SHARE *share,
638 FILE *file, uchar *buff, uint block_size)
639{
640 uint max_entry= (uint) ((uchar *) buff)[DIR_COUNT_OFFSET], row= 0;
641 uint end_of_prev_row= PAGE_HEADER_SIZE(share);
642 uchar *dir, *end;
643
644 dir= dir_entry_pos(buff, block_size, max_entry-1);
645 end= dir_entry_pos(buff, block_size, 0);
646
647 DBUG_LOCK_FILE; /* If using DBUG_FILE */
648 fprintf(file,"Directory dump (pos:length):\n");
649
650 for (row= 1; dir <= end ; end-= DIR_ENTRY_SIZE, row++)
651 {
652 uint offset= uint2korr(end);
653 uint length= uint2korr(end+2);
654 fprintf(file, " %4u:%4u", offset, offset ? length : 0);
655 if (!(row % (80/12)))
656 fputc('\n', file);
657 if (offset)
658 {
659 DBUG_ASSERT(offset >= end_of_prev_row);
660 end_of_prev_row= offset + length;
661 }
662 }
663 fputc('\n', file);
664 fflush(file);
665 DBUG_UNLOCK_FILE;
666}
667
668
/*
  Debug-only consistency check of a page directory.

  Verifies that:
  - used entries are stored in increasing offset order without overlap
  - each used entry's length is >= min_row_length (0 disables the check)
  - the computed empty space matches real_empty_size (or the page's
    EMPTY_SPACE_OFFSET field when real_empty_size == (uint) -1)
  - the free-entry list is well formed and accounts for exactly the
    deleted entries found in the directory
*/

static void check_directory(MARIA_SHARE *share,
                            uchar *buff, uint block_size, uint min_row_length,
                            uint real_empty_size)
{
  uchar *dir, *end;
  uint max_entry= (uint) buff[DIR_COUNT_OFFSET];
  uint start_of_dir, deleted;
  uint end_of_prev_row= PAGE_HEADER_SIZE(share);
  uint empty_size_on_page;
  uint empty_size;
  uchar free_entry, prev_free_entry;

  dir= dir_entry_pos(buff, block_size, max_entry-1);
  start_of_dir= (uint) (dir - buff);
  end= dir_entry_pos(buff, block_size, 0);
  deleted= empty_size= 0;

  /* (uint) -1 means: trust the empty-space count stored on the page */
  empty_size_on_page= (real_empty_size != (uint) -1 ? real_empty_size :
                       uint2korr(buff + EMPTY_SPACE_OFFSET));

  /* Ensure that all rows are in increasing order and no overlaps */
  for (; dir <= end ; end-= DIR_ENTRY_SIZE)
  {
    uint offset= uint2korr(end);
    uint length= uint2korr(end+2);
    if (offset)
    {
      DBUG_ASSERT(offset >= end_of_prev_row);
      DBUG_ASSERT(!length || length >= min_row_length);
      /* Gap between previous row end and this row start is empty space */
      empty_size+= offset - end_of_prev_row;
      end_of_prev_row= offset + length;
    }
    else
      deleted++;
  }
  /* Add the gap between the last row and the start of the directory */
  empty_size+= start_of_dir - end_of_prev_row;
  DBUG_ASSERT(end_of_prev_row <= start_of_dir);
  DBUG_ASSERT(empty_size == empty_size_on_page);

  /* check free links */
  free_entry= buff[DIR_FREE_OFFSET];
  prev_free_entry= END_OF_DIR_FREE_LIST;
  while (free_entry != END_OF_DIR_FREE_LIST)
  {
    uchar *dir= dir_entry_pos(buff, block_size, free_entry);
    /* A free entry has position 0 and back/forward links in bytes 2-3 */
    DBUG_ASSERT(dir[0] == 0 && dir[1] == 0);
    DBUG_ASSERT(dir[2] == prev_free_entry);
    prev_free_entry= free_entry;
    free_entry= dir[3];
    deleted--;
  }
  /* Every deleted entry must be reachable through the free list */
  DBUG_ASSERT(deleted == 0);
}
722#else
723#define check_directory(A,B,C,D,E)
724#endif /* DBUG_OFF */
725
726
727/**
728 @brief Calculate if there is enough entries on the page
729*/
730
731static my_bool enough_free_entries(uchar *buff, uint block_size,
732 uint wanted_entries)
733{
734 uint entries= (uint) buff[DIR_COUNT_OFFSET];
735 uint needed_free_entries, free_entry;
736
737 if (entries + wanted_entries <= MAX_ROWS_PER_PAGE)
738 return 1;
739
740 /* Check if enough free entries in free list */
741 needed_free_entries= entries + wanted_entries - MAX_ROWS_PER_PAGE;
742
743 free_entry= (uint) buff[DIR_FREE_OFFSET];
744 while (free_entry != END_OF_DIR_FREE_LIST)
745 {
746 uchar *dir;
747 if (!--needed_free_entries)
748 return 1;
749 dir= dir_entry_pos(buff, block_size, free_entry);
750 free_entry= dir[3];
751 }
752 return 0; /* Not enough entries */
753}
754
755
756/**
757 @brief Check if there is room for more rows on page
758
759 @fn enough_free_entries_on_page
760
761 @return 0 Directory is full
762 @return 1 There is room for more entries on the page
763*/
764
765my_bool enough_free_entries_on_page(MARIA_SHARE *share,
766 uchar *page_buff)
767{
768 enum en_page_type page_type;
769 page_type= (enum en_page_type) (page_buff[PAGE_TYPE_OFFSET] &
770 ~(uchar) PAGE_CAN_BE_COMPACTED);
771
772 if (page_type == HEAD_PAGE)
773 {
774 uint row_count= (uint) page_buff[DIR_COUNT_OFFSET];
775 return !(row_count == MAX_ROWS_PER_PAGE &&
776 page_buff[DIR_FREE_OFFSET] == END_OF_DIR_FREE_LIST);
777 }
778 return enough_free_entries(page_buff, share->block_size,
779 1 + share->base.blobs);
780}
781
782
783/**
784 @brief Extend a record area to fit a given size block
785
786 @fn extend_area_on_page()
787 @param info Handler
788 @param buff Page buffer
789 @param dir Pointer to dir entry in buffer
790 @param rownr Row number we working on
791 @param block_size Block size of buffer
792 @param request_length How much data we want to put at [dir]
793 @param empty_space Total empty space in buffer
794 This is updated with length after dir
795 is allocated and current block freed
796 @param head_page 1 if head page, 0 for tail page
797
798 @implementation
799 The logic is as follows (same as in _ma_update_block_record())
800 - If new data fits in old block, use old block.
801 - Extend block with empty space before block. If enough, use it.
802 - Extend block with empty space after block. If enough, use it.
803 - Use _ma_compact_block_page() to get all empty space at dir.
804
805 @note
806 The given directory entry is set to rec length.
807 empty_space doesn't include the new directory entry
808
809
810 @return
811 @retval 0 ok
812 @retval ret_offset Pointer to store offset to found area
813 @retval ret_length Pointer to store length of found area
814 @retval [dir] rec_offset is store here too
815
816 @retval 1 error (wrong info in block)
817*/
818
static my_bool extend_area_on_page(MARIA_HA *info,
                                   uchar *buff, uchar *dir,
                                   uint rownr,
                                   uint request_length,
                                   uint *empty_space, uint *ret_offset,
                                   uint *ret_length,
                                   my_bool head_page)
{
  uint rec_offset, length, org_rec_length;
  uint max_entry= (uint) buff[DIR_COUNT_OFFSET];
  MARIA_SHARE *share= info->s;
  uint block_size= share->block_size;
  DBUG_ENTER("extend_area_on_page");

  /*
    We can't check for min length here as we may have called
    extend_directory() to create a new (empty) entry just before
  */
  check_directory(share, buff, block_size, 0, *empty_space);

  rec_offset= uint2korr(dir);
  if (rec_offset)
  {
    /* Extending old row; Mark current space as 'free' */
    length= org_rec_length= uint2korr(dir + 2);
    DBUG_PRINT("info", ("rec_offset: %u length: %u request_length: %u "
                        "empty_space: %u",
                        rec_offset, org_rec_length, request_length,
                        *empty_space));

    *empty_space+= org_rec_length;
  }
  else
  {
    /* Reusing free directory entry; Free it from the directory list */
    if (dir[2] == END_OF_DIR_FREE_LIST)
      buff[DIR_FREE_OFFSET]= dir[3];            /* We were the list head */
    else
    {
      /* Unlink us from the previous free entry */
      uchar *prev_dir= dir_entry_pos(buff, block_size, (uint) dir[2]);
      DBUG_ASSERT(uint2korr(prev_dir) == 0 && prev_dir[3] == (uchar) rownr);
      prev_dir[3]= dir[3];
    }
    if (dir[3] != END_OF_DIR_FREE_LIST)
    {
      /* Unlink us from the next free entry */
      uchar *next_dir= dir_entry_pos(buff, block_size, (uint) dir[3]);
      DBUG_ASSERT(uint2korr(next_dir) == 0 && next_dir[2] == (uchar) rownr);
      next_dir[2]= dir[2];
    }
    /* A freed entry starts where the next used entry's row ends */
    rec_offset= start_of_next_entry(dir);
    length= 0;
  }
  if (length < request_length)
  {
    uint old_rec_offset;
    /*
      New data did not fit in old position.
      Find first possible position where to put new data.
    */
    old_rec_offset= rec_offset;
    /* Grow backwards into the empty space before the block */
    rec_offset= end_of_previous_entry(share,
                                      dir, buff + block_size -
                                      PAGE_SUFFIX_SIZE);
    length+= (uint) (old_rec_offset - rec_offset);
    DBUG_ASSERT(old_rec_offset);
    /*
      'length' is 0 if we are doing an insert into a not allocated block.
      This can only happen during "REDO of INSERT" or "UNDO of DELETE."
    */
    if (length < request_length)
    {
      /*
        Did not fit in current block + empty space. Extend with
        empty space after block.
      */
      if (rownr == max_entry - 1)
      {
        /* Last entry; Everything is free between this and directory */
        length= ((block_size - PAGE_SUFFIX_SIZE - DIR_ENTRY_SIZE * max_entry) -
                 rec_offset);
      }
      else
        length= start_of_next_entry(dir) - rec_offset;
      DBUG_ASSERT((int) length >= 0);
      if (length < request_length)
      {
        /* Not enough continuous space, compact page to get more */
        int2store(dir, rec_offset);
        /* Reset length, as this may be a deleted block */
        int2store(dir+2, 0);
        _ma_compact_block_page(share,
                               buff, rownr, 1,
                               head_page ? info->trn->min_read_from: 0,
                               head_page ? share->base.min_block_length : 0);
        /* Compaction rewrote our directory entry; re-read it */
        rec_offset= uint2korr(dir);
        length= uint2korr(dir+2);
        if (length < request_length)
        {
          DBUG_PRINT("error", ("Not enough space: "
                               "length: %u request_length: %u",
                               length, request_length));
          _ma_set_fatal_error(share, HA_ERR_WRONG_IN_RECORD);
          DBUG_RETURN(1); /* Error in block */
        }
        *empty_space= length; /* All space is here */
      }
    }
  }
  /* Commit the found area to the directory entry and to the caller */
  int2store(dir, rec_offset);
  int2store(dir + 2, length);
  *ret_offset= rec_offset;
  *ret_length= length;

  check_directory(share,
                  buff, block_size,
                  head_page ? share->base.min_block_length : 0,
                  *empty_space - length);
  DBUG_RETURN(0);
}
938
939
940/**
941 @brief Copy not changed fields from 'from' to 'to'
942
943 @notes
944 Assumption is that most fields are not changed!
945 (Which is why we don't test if all bits are set for some bytes in bitmap)
946*/
947
948void copy_not_changed_fields(MARIA_HA *info, MY_BITMAP *changed_fields,
949 uchar *to, uchar *from)
950{
951 MARIA_COLUMNDEF *column, *end_column;
952 uchar *bitmap= (uchar*) changed_fields->bitmap;
953 MARIA_SHARE *share= info->s;
954 uint bit= 1;
955
956 for (column= share->columndef, end_column= column+ share->base.fields;
957 column < end_column; column++)
958 {
959 if (!(*bitmap & bit))
960 {
961 uint field_length= column->length;
962 if (column->type == FIELD_VARCHAR)
963 {
964 if (column->fill_length == 1)
965 field_length= (uint) from[column->offset] + 1;
966 else
967 field_length= uint2korr(from + column->offset) + 2;
968 }
969 memcpy(to + column->offset, from + column->offset, field_length);
970 }
971 if ((bit= (bit << 1)) == 256)
972 {
973 bitmap++;
974 bit= 1;
975 }
976 }
977}
978
979#ifdef NOT_YET_NEEDED
980/* Calculate empty space on a page */
981
982static uint empty_space_on_page(uchar *buff, uint block_size)
983{
984 enum en_page_type;
985 page_type= (enum en_page_type) (buff[PAGE_TYPE_OFFSET] &
986 ~(uchar) PAGE_CAN_BE_COMPACTED);
987 if (page_type == UNALLOCATED_PAGE)
988 return block_size;
989 if ((uint) page_type <= TAIL_PAGE)
990 return uint2korr(buff+EMPTY_SPACE_OFFSET);
991 return 0; /* Blob page */
992}
993#endif
994
995
996/*
997 @brief Ensure we have space for new directory entries
998
999 @fn make_space_for_directory()
1000 @param info Handler
1001 @param buff Page buffer
1002 @param max_entry Number of current entries in directory
1003 @param count Number of new entries to be added to directory
1004 @param first_dir First directory entry on page
1005 @param empty_space Total empty space in buffer. It's updated
1006 to reflect the new empty space
1007 @param first_pos Store position to last data byte on page here
1008 @param head_page 1 if head page, 0 for tail page.
1009
1010 @note
1011 This function is inline as the argument passing is the biggest
1012 part of the function
1013
1014 @return
1015 @retval 0 ok
1016 @retval 1 error (No data on page, fatal error)
1017*/
1018
static inline my_bool
make_space_for_directory(MARIA_HA *info,
                         uchar *buff, uint max_entry,
                         uint count, uchar *first_dir, uint *empty_space,
                         uint *first_pos,
                         my_bool head_page)
{
  uint length_needed= DIR_ENTRY_SIZE * count;
  MARIA_SHARE *share= info->s;

  /*
    The following is true except in the case where an UNDO is used to
    reinsert a row on a previously not used page
  */
  if (likely(max_entry))
  {
    /* Check if there is place for the directory entry on the page */
    *first_pos= uint2korr(first_dir) + uint2korr(first_dir + 2);

    /* Does the end of the last row run into the space the directory needs? */
    if ((uint) (first_dir - buff) < *first_pos + length_needed)
    {
      /* Create place for directory */
      _ma_compact_block_page(share,
                             buff, max_entry - 1, 0,
                             head_page ? info->trn->min_read_from : 0,
                             head_page ? share->base.min_block_length : 0);
      /* Re-read row end and free space after compaction moved things */
      *first_pos= (uint2korr(first_dir) + uint2korr(first_dir + 2));
      *empty_space= uint2korr(buff + EMPTY_SPACE_OFFSET);
      if (*empty_space < length_needed)
      {
        /*
          We should always have space, as we only come here for
          UNDO of DELETE (in which case we know the row was on the
          page before) or if the bitmap told us there was space on page
        */
        DBUG_ASSERT(!maria_assert_if_crashed_table);
        return(1);
      }
    }
  }
  else
    *first_pos= PAGE_HEADER_SIZE(share);        /* Empty page; data starts after header */

  /* Reduce directory entry size from free space size */
  (*empty_space)-= length_needed;
  buff[DIR_COUNT_OFFSET]= (uchar) (max_entry + count);
  return(0);
}
1067
1068
1069/*
1070 Find free position in directory
1071
1072 SYNOPSIS
1073 find_free_position()
1074 info Handler
1075 buff Page
1076 block_size Size of page
1077 res_rownr Store index to free position here
1078 res_length Store length of found segment here
1079 empty_space Store length of empty space on disk here. This is
1080 all empty space, including the found block.
1081 @param head_page 1 if head page, 0 for tail page.
1082
1083 NOTES
1084 If there is a free directory entry (entry with position == 0),
1085 then use it and change it to be the size of the empty block
1086 after the previous entry. This guarantees that all row entries
1087 are stored on disk in inverse directory order, which makes life easier for
1088 '_ma_compact_block_page()' and to know if there is free space after any
1089 block.
1090
1091 If there is no free entry (entry with position == 0), then we create
  a new one. If there is no space for the directory entry (because
  the last block overlaps with the directory), we compact the page.
1094
1095 We will update the offset and the length of the found dir entry to
1096 match the position and empty space found.
1097
1098 buff[EMPTY_SPACE_OFFSET] is NOT updated but left up to the caller
1099
  See start of file for description of how free directory entries are linked
1101
1102 RETURN
1103 0 Error (directory full or last block goes over directory)
1104 # Pointer to directory entry on page
1105*/
1106
static uchar *find_free_position(MARIA_HA *info,
                                 uchar *buff, uint block_size, uint *res_rownr,
                                 uint *res_length, uint *empty_space,
                                 my_bool head_page)
{
  uint max_entry, free_entry;
  uint length, first_pos;
  uchar *dir, *first_dir;
  MARIA_SHARE *share= info->s;
  DBUG_ENTER("find_free_position");

  max_entry= (uint) buff[DIR_COUNT_OFFSET];
  free_entry= (uint) buff[DIR_FREE_OFFSET];     /* Head of free entry list */
  *empty_space= uint2korr(buff + EMPTY_SPACE_OFFSET);

  DBUG_PRINT("info", ("max_entry: %u free_entry: %u", max_entry, free_entry));

  /* Directory grows from the end of the page towards the data */
  first_dir= dir_entry_pos(buff, block_size, max_entry - 1);

  /* Search after first free position */
  if (free_entry != END_OF_DIR_FREE_LIST)
  {
    if (free_entry >= max_entry)
      DBUG_RETURN(0);                   /* Consistency error */
    dir= dir_entry_pos(buff, block_size, free_entry);
    /* A free entry has offset 0; head of list has no backlink */
    DBUG_ASSERT(uint2korr(dir) == 0 && dir[2] == END_OF_DIR_FREE_LIST);
    /* Relink free list */
    if ((buff[DIR_FREE_OFFSET]= dir[3]) != END_OF_DIR_FREE_LIST)
    {
      uchar *next_entry= dir_entry_pos(buff, block_size, (uint) dir[3]);
      DBUG_ASSERT((uint) next_entry[2] == free_entry &&
                  uint2korr(next_entry) == 0);
      next_entry[2]= END_OF_DIR_FREE_LIST;      /* Backlink */
    }

    /* Make the entry cover the gap between its neighbouring used entries */
    first_pos= end_of_previous_entry(share,
                                     dir, buff + block_size -
                                     PAGE_SUFFIX_SIZE);
    length= start_of_next_entry(dir) - first_pos;
    int2store(dir, first_pos);          /* Update dir entry */
    int2store(dir + 2, 0);
    *res_rownr= free_entry;
    *res_length= length;

    check_directory(share, buff, block_size,
                    head_page ? share->base.min_block_length : 0, (uint) -1);
    DBUG_RETURN(dir);
  }
  /* No free places in dir; create a new one */

  /* Check if there is place for the directory entry */
  if (max_entry == MAX_ROWS_PER_PAGE)
    DBUG_RETURN(0);

  /* May compact the page if the last row overlaps the new entry's place */
  if (make_space_for_directory(info, buff, max_entry, 1,
                               first_dir, empty_space, &first_pos, head_page))
    DBUG_RETURN(0);

  dir= first_dir - DIR_ENTRY_SIZE;
  /* New entry covers everything between end of last row and directory */
  length= (uint) (dir - buff - first_pos);
  DBUG_ASSERT(length <= *empty_space);
  int2store(dir, first_pos);
  int2store(dir + 2, 0);                /* Max length of region */
  *res_rownr= max_entry;
  *res_length= length;

  check_directory(share,
                  buff, block_size,
                  head_page ? share->base.min_block_length : 0,
                  *empty_space);
  DBUG_RETURN(dir);
}
1179
1180
1181/**
1182 @brief Enlarge page directory to hold more entries
1183
1184 @fn extend_directory()
1185 @param info Handler
1186 @param buff Page buffer
1187 @param block_size Block size
1188 @param max_entry Number of directory entries on page
1189 @param new_entry Position for new entry
1190 @param empty_space Total empty space in buffer. It's updated
1191 to reflect the new empty space
1192 @param head_page 1 if head page, 0 for tail page.
1193
1194 @note
1195 This is only called on UNDO when we want to expand the directory
1196 to be able to re-insert row in a given position
1197
1198 The new directory entry will be set to cover the maximum possible space
1199
1200 @return
1201 @retval 0 ok
1202 @retval 1 error (No data on page, fatal error)
1203*/
1204
static my_bool extend_directory(MARIA_HA *info, uchar *buff, uint block_size,
                                uint max_entry, uint new_entry,
                                uint *empty_space, my_bool head_page)
{
  uint length, first_pos;
  uchar *dir, *first_dir;
  DBUG_ENTER("extend_directory");

  /*
    Note that if max_entry is 0, then first_dir will point to
    an illegal directory entry. This is ok, as in this case we will
    not access anything through first_dir.
  */
  first_dir= dir_entry_pos(buff, block_size, max_entry) + DIR_ENTRY_SIZE;

  if (make_space_for_directory(info, buff, max_entry,
                               new_entry - max_entry + 1,
                               first_dir, empty_space, &first_pos, head_page))
    DBUG_RETURN(1);

  /* Set the new directory entry to cover the max possible length */
  dir= first_dir - DIR_ENTRY_SIZE * (new_entry - max_entry + 1);
  length= (uint) (dir - buff - first_pos);
  int2store(dir, first_pos);
  int2store(dir+2, length);
  *empty_space-= length;

  if (new_entry-- > max_entry)
  {
    /* Link all row entries between new_entry and max_entry into free list */
    uint free_entry= (uint) buff[DIR_FREE_OFFSET];
    uint prev_entry= END_OF_DIR_FREE_LIST;
    buff[DIR_FREE_OFFSET]= new_entry;
    do
    {
      dir+= DIR_ENTRY_SIZE;
      dir[0]= dir[1]= 0;                /* Offset 0 marks the entry as free */
      dir[2]= (uchar) prev_entry;       /* Backlink */
      /*
        Forward link.  The cast binds to new_entry before the subtraction,
        but the result is the same as (uchar) (new_entry - 1) since entry
        numbers fit in one byte.
      */
      dir[3]= (uchar) new_entry-1;
      prev_entry= new_entry;
    } while (new_entry-- > max_entry);
    /* Attach the old free list after the newly freed entries */
    if ((dir[3]= free_entry) != END_OF_DIR_FREE_LIST)
    {
      /* Relink next entry to point to newly freed entry */
      uchar *next_entry= dir_entry_pos(buff, block_size, (uint) dir[3]);
      DBUG_ASSERT(uint2korr(next_entry) == 0 &&
                  next_entry[2] == END_OF_DIR_FREE_LIST);
      next_entry[2]= max_entry;
    }
  }

  check_directory(info->s,
                  buff, block_size,
                  head_page ? MY_MIN(info->s->base.min_block_length, length) :
                  0, *empty_space);
  DBUG_RETURN(0);
}
1262
1263
1264/****************************************************************************
1265 Updating records
1266****************************************************************************/
1267
1268/*
1269 Calculate length of all the different field parts
1270
1271 SYNOPSIS
1272 calc_record_size()
1273 info Maria handler
1274 record Row to store
1275 row Store statistics about row here
1276
1277 NOTES
1278 The statistics is used to find out how much space a row will need
1279 and also where we can split a row when we need to split it into several
1280 extents.
1281*/
1282
static void calc_record_size(MARIA_HA *info, const uchar *record,
                             MARIA_ROW *row)
{
  MARIA_SHARE *share= info->s;
  uchar *field_length_data;
  MARIA_COLUMNDEF *column, *end_column;
  uint *null_field_lengths= row->null_field_lengths;
  ulong *blob_lengths= row->blob_lengths;
  DBUG_ENTER("calc_record_size");

  row->normal_length= row->char_length= row->varchar_length=
    row->blob_length= row->extents_count= 0;

  /* Create empty bitmap and calculate length of each varlength/char field */
  bzero(row->empty_bits, share->base.pack_bytes);
  field_length_data= row->field_lengths;
  /* Fixed not-null fields come first and need no per-field bookkeeping */
  for (column= share->columndef + share->base.fixed_not_null_fields,
       end_column= share->columndef + share->base.fields;
       column < end_column; column++, null_field_lengths++)
  {
    if ((record[column->null_pos] & column->null_bit))
    {
      /* Field is NULL; it occupies no data space */
      if (column->type != FIELD_BLOB)
        *null_field_lengths= 0;
      else
        *blob_lengths++= 0;
      continue;
    }
    switch (column->type) {
    case FIELD_CHECK:
    case FIELD_NORMAL:                          /* Fixed length field */
    case FIELD_ZERO:
      DBUG_ASSERT(column->empty_bit == 0);
      /* fall through */
    case FIELD_SKIP_PRESPACE:                   /* Not packed */
      row->normal_length+= column->length;
      *null_field_lengths= column->length;
      break;
    case FIELD_SKIP_ZERO:                       /* Fixed length field */
      if (memcmp(record+ column->offset, maria_zero_string,
                 column->length) == 0)
      {
        /* All-zero value is stored as just an empty bit */
        row->empty_bits[column->empty_pos] |= column->empty_bit;
        *null_field_lengths= 0;
      }
      else
      {
        row->normal_length+= column->length;
        *null_field_lengths= column->length;
      }
      break;
    case FIELD_SKIP_ENDSPACE:                   /* CHAR */
    {
      /* Strip trailing spaces; only the remaining prefix is stored */
      const uchar *pos, *end;
      for (pos= record + column->offset, end= pos + column->length;
           end > pos && end[-1] == ' '; end--)
        ;
      if (pos == end)                           /* If empty string */
      {
        row->empty_bits[column->empty_pos]|= column->empty_bit;
        *null_field_lengths= 0;
      }
      else
      {
        uint length= (uint) (end - pos);
        /* Length prefix: 1 byte for fields up to 255 bytes, else 2 bytes */
        if (column->length <= 255)
          *field_length_data++= (uchar) length;
        else
        {
          int2store(field_length_data, length);
          field_length_data+= 2;
        }
        row->char_length+= length;
        *null_field_lengths= length;
      }
      break;
    }
    case FIELD_VARCHAR:
    {
      uint length, field_length_data_length;
      const uchar *field_pos= record + column->offset;

      /* 256 is correct as this includes the length uchar */
      field_length_data[0]= field_pos[0];
      if (column->length <= 256)
      {
        length= (uint) (uchar) *field_pos;
        field_length_data_length= 1;
      }
      else
      {
        length= uint2korr(field_pos);
        field_length_data[1]= field_pos[1];
        field_length_data_length= 2;
      }
      *null_field_lengths= length;
      if (!length)
      {
        /* Empty varchar is stored as just an empty bit, no length bytes */
        row->empty_bits[column->empty_pos]|= column->empty_bit;
        break;
      }
      row->varchar_length+= length;
      *null_field_lengths= length;
      field_length_data+= field_length_data_length;
      break;
    }
    case FIELD_BLOB:
    {
      const uchar *field_pos= record + column->offset;
      /* In-record blob is a length prefix followed by a data pointer */
      uint size_length= column->length - portable_sizeof_char_ptr;
      ulong blob_length= _ma_calc_blob_length(size_length, field_pos);

      *blob_lengths++= blob_length;
      if (!blob_length)
        row->empty_bits[column->empty_pos]|= column->empty_bit;
      else
      {
        row->blob_length+= blob_length;
        /* Store only the length prefix, not the pointer */
        memcpy(field_length_data, field_pos, size_length);
        field_length_data+= size_length;
      }
      break;
    }
    default:
      DBUG_ASSERT(0);
    }
  }
  row->field_lengths_length= (uint) (field_length_data - row->field_lengths);
  /*
    - info->row_base_length is base information we must have on a page in first
      extent:
      - flag byte (1) + is_nulls_extended (0 | 1) + null_bytes + pack_bytes +
        table_checksum (0 | 1)
    - row->min_length is minimum amount of data we must store on
      a page. bitmap code will ensure we get at least this much +
      total number of extents and one extent information
    - fixed_not_null_fields_length is length of fixed length fields that can't
      be compacted
    - head_length is the amount of data for the head page
      (ie, all fields except blobs)
  */
  row->min_length= (info->row_base_length +
                    (share->base.max_field_lengths ?
                     size_to_store_key_length(row->field_lengths_length) :
                     0));
  row->head_length= (row->min_length +
                     share->base.fixed_not_null_fields_length +
                     row->field_lengths_length +
                     row->normal_length +
                     row->char_length + row->varchar_length);
  row->total_length= (row->head_length + row->blob_length);
  /* Never allocate less than the configured minimum row block */
  if (row->total_length < share->base.min_block_length)
    row->total_length= share->base.min_block_length;
  DBUG_PRINT("exit", ("head_length: %lu total_length: %lu",
                      (ulong) row->head_length, (ulong) row->total_length));
  DBUG_VOID_RETURN;
}
1440
1441
/**
   Compact page by removing all space between rows

   Moves up all rows to start of page. Moves blocks that are directly after
   each other with one memmove.

   @note if rownr is the last row in the page, and extend_block is false,
   caller has to make sure to update bitmap page afterwards to reflect freed
   space.

   @param share          Table share
   @param buff           Page to compact
   @param rownr          Put empty data after this row
   @param extend_block   If 1, extend the block at 'rownr' to cover the
                         whole block.
   @param min_read_from  If <> 0, remove all trid's that are less than this
   @param min_row_length Minimum length a row must keep after compaction
                         (0 for tail pages)
*/

void _ma_compact_block_page(MARIA_SHARE *share,
                            uchar *buff, uint rownr,
                            my_bool extend_block, TrID min_read_from,
                            uint min_row_length)
{
  uint max_entry= (uint) buff[DIR_COUNT_OFFSET];
  uint page_pos, next_free_pos, start_of_found_block, diff, end_of_found_block;
  uint freed_size= 0;
  uint block_size= share->block_size;
  uchar *dir, *end;
  DBUG_ENTER("_ma_compact_block_page");
  DBUG_PRINT("enter", ("rownr: %u min_read_from: %lu", rownr,
                       (ulong) min_read_from));
  DBUG_ASSERT(max_entry > 0 &&
              max_entry < (block_size - PAGE_HEADER_SIZE(share) -
                           PAGE_SUFFIX_SIZE) / DIR_ENTRY_SIZE);

  /*
    Phase 1:
    Move all entries before and including rownr up to start of page
  */
  dir= dir_entry_pos(buff, block_size, rownr);
  end= dir_entry_pos(buff, block_size, 0);
  page_pos= next_free_pos= start_of_found_block= PAGE_HEADER_SIZE(share);
  diff= 0;
  /* Walk entries 0..rownr; directory entries lie in reverse order */
  for (; dir <= end ; end-= DIR_ENTRY_SIZE)
  {
    uint offset= uint2korr(end);

    if (offset)                                 /* Entry is in use */
    {
      uint row_length= uint2korr(end + 2);
      DBUG_ASSERT(offset >= page_pos);
      DBUG_ASSERT(buff + offset + row_length <= dir);
      DBUG_ASSERT(row_length >= min_row_length || row_length == 0);

      /* Row length can be zero if row is to be deleted */
      if (min_read_from && row_length && (buff[offset] & ROW_FLAG_TRANSID))
      {
        TrID transid= transid_korr(buff+offset+1);
        if (transid < min_read_from)
        {
          /* Remove transid from row by moving the start point of the row up */
          buff[offset + TRANSID_SIZE]= buff[offset] & ~ROW_FLAG_TRANSID;
          offset+= TRANSID_SIZE;
          freed_size+= TRANSID_SIZE;
          row_length-= TRANSID_SIZE;
          int2store(end+2, row_length);
        }
      }

      if (offset != next_free_pos)
      {
        uint length= (next_free_pos - start_of_found_block);
        /*
          There was empty space before this and prev block
          Check if we have to move previous block up to page start
        */
        if (page_pos != start_of_found_block)
        {
          /* move up previous block */
          memmove(buff + page_pos, buff + start_of_found_block, length);
        }
        page_pos+= length;
        /* next continuous block starts here */
        start_of_found_block= offset;
        diff= offset - page_pos;
      }
      int2store(end, offset - diff);            /* correct current pos */
      next_free_pos= offset + row_length;

      if (unlikely(row_length < min_row_length) && row_length)
      {
        /*
          This can only happen in the case we compacted transid and
          the row become 'too short'

          Move the current row down to it's right place and extend it
          with 0.
        */
        uint row_diff= min_row_length - row_length;
        uint length= (next_free_pos - start_of_found_block);

        DBUG_ASSERT(page_pos != start_of_found_block);
        bmove(buff + page_pos, buff + start_of_found_block, length);
        bzero(buff+ page_pos + length, row_diff);
        page_pos+= min_row_length;
        int2store(end+2, min_row_length);
        freed_size-= row_diff;                  /* Padding ate into the gain */
        next_free_pos= start_of_found_block= page_pos;
        diff= 0;
      }
    }
  }
  /* Flush the last pending block of phase 1 */
  if (page_pos != start_of_found_block)
  {
    uint length= (next_free_pos - start_of_found_block);
    memmove(buff + page_pos, buff + start_of_found_block, length);
  }
  start_of_found_block= uint2korr(dir);

  if (rownr != max_entry - 1)
  {
    /*
      Phase 2:
      Move all entries after rownr to end of page
    */
    uint rownr_length;

    DBUG_ASSERT(extend_block);                  /* Should always be true */
    next_free_pos= end_of_found_block= page_pos=
      block_size - DIR_ENTRY_SIZE * max_entry - PAGE_SUFFIX_SIZE;
    diff= 0;
    /* End points to entry before 'rownr' */
    for (dir= buff + end_of_found_block ; dir <= end ; dir+= DIR_ENTRY_SIZE)
    {
      uint offset= uint2korr(dir);
      uint row_length;
      uint row_end;
      if (!offset)                              /* Free entry; skip */
        continue;
      row_length= uint2korr(dir + 2);
      row_end= offset + row_length;
      DBUG_ASSERT(offset >= start_of_found_block &&
                  row_end <= next_free_pos && row_length >= min_row_length);

      if (min_read_from && (buff[offset] & ROW_FLAG_TRANSID))
      {
        TrID transid= transid_korr(buff + offset+1);
        if (transid < min_read_from)
        {
          /* Remove transid from row */
          buff[offset + TRANSID_SIZE]= buff[offset] & ~ROW_FLAG_TRANSID;
          offset+= TRANSID_SIZE;
          row_length-= TRANSID_SIZE;
          int2store(dir+2, row_length);
        }
        if (unlikely(row_length < min_row_length))
        {
          /*
            This can only happen in the case we compacted transid and
            the row become 'too short'
          */
          uint row_diff= min_row_length - row_length;
          if (next_free_pos < row_end + row_diff)
          {
            /*
              Not enough space for extending next block with enough
              end 0's. Move current data down to get place for them
            */
            uint move_down= row_diff - (next_free_pos - row_end);
            bmove(buff + offset - move_down, buff + offset, row_length);
            offset-= move_down;
          }
          /*
            Extend the next block with 0, which will be part of current
            row when the blocks are joined together later
          */
          bzero(buff + next_free_pos - row_diff, row_diff);
          next_free_pos-= row_diff;
          int2store(dir+2, min_row_length);
        }
        row_end= offset + row_length;
      }

      if (row_end != next_free_pos)
      {
        uint length= (end_of_found_block - next_free_pos);
        if (page_pos != end_of_found_block)
        {
          /* move next block down */
          memmove(buff + page_pos - length, buff + next_free_pos, length);
        }
        page_pos-= length;
        /* next continuous block starts here */
        end_of_found_block= row_end;
        diff= page_pos - row_end;
      }
      int2store(dir, offset + diff);            /* correct current pos */
      next_free_pos= offset;
    }
    /* Flush the last pending block of phase 2 */
    if (page_pos != end_of_found_block)
    {
      uint length= (end_of_found_block - next_free_pos);
      memmove(buff + page_pos - length, buff + next_free_pos, length);
      next_free_pos= page_pos- length;
    }

    /* Extend rownr block to cover hole */
    rownr_length= next_free_pos - start_of_found_block;
    int2store(dir+2, rownr_length);
    DBUG_ASSERT(rownr_length >= min_row_length);
  }
  else
  {
    if (extend_block)
    {
      /* Extend last block to cover whole page */
      uint length= ((uint) (dir - buff) - start_of_found_block);
      int2store(dir+2, length);
      DBUG_ASSERT(length >= min_row_length);
    }
    else
    {
      /* Add length gained from freed transaction id's to this page */
      uint length= uint2korr(buff+ EMPTY_SPACE_OFFSET) + freed_size;
      int2store(buff + EMPTY_SPACE_OFFSET, length);
    }
    /* Page is now fully compacted; nothing more to gain */
    buff[PAGE_TYPE_OFFSET]&= ~(uchar) PAGE_CAN_BE_COMPACTED;
  }
  check_directory(share, buff, block_size, min_row_length,
                  extend_block ? 0 : (uint) -1);
  DBUG_EXECUTE("directory", _ma_print_directory(share,
                                                DBUG_FILE, buff, block_size););
  DBUG_VOID_RETURN;
}
1670
1671
1672/*
1673 Create an empty tail or head page
1674
1675 SYNOPSIS
1676 make_empty_page()
1677 buff Page buffer
1678 block_size Block size
1679 page_type HEAD_PAGE or TAIL_PAGE
1680 create_dir_entry TRUE of we should create a directory entry
1681
1682 NOTES
1683 EMPTY_SPACE is not updated
1684*/
1685
1686static void make_empty_page(MARIA_HA *info, uchar *buff, uint page_type,
1687 my_bool create_dir_entry)
1688{
1689 uint block_size= info->s->block_size;
1690 DBUG_ENTER("make_empty_page");
1691
1692 bzero(buff, PAGE_HEADER_SIZE(info->s));
1693
1694#if !defined(DONT_ZERO_PAGE_BLOCKS) || defined(HAVE_valgrind)
1695 /*
1696 We zero the rest of the block to avoid getting old memory information
1697 to disk and to allow the file to be compressed better if archived.
1698 The code does not assume the block is zeroed.
1699 */
1700 if (page_type != BLOB_PAGE)
1701 bzero(buff+ PAGE_HEADER_SIZE(info->s),
1702 block_size - PAGE_HEADER_SIZE(info->s));
1703#endif
1704 buff[PAGE_TYPE_OFFSET]= (uchar) page_type;
1705 buff[DIR_COUNT_OFFSET]= (int) create_dir_entry;
1706 buff[DIR_FREE_OFFSET]= END_OF_DIR_FREE_LIST;
1707 if (create_dir_entry)
1708 {
1709 /* Create directory entry to point to start of page with size 0 */
1710 buff+= block_size - PAGE_SUFFIX_SIZE - DIR_ENTRY_SIZE;
1711 int2store(buff, PAGE_HEADER_SIZE(info->s));
1712 int2store(buff+2, 0);
1713 }
1714 DBUG_VOID_RETURN;
1715}
1716
1717
1718/*
1719 Read or initialize new head or tail page
1720
1721 SYNOPSIS
1722 get_head_or_tail_page()
1723 info Maria handler
1724 block Block to read
1725 buff Suggest this buffer to key cache
1726 length Minimum space needed
1727 page_type HEAD_PAGE || TAIL_PAGE
1728 res Store result position here
1729
1730 NOTES
    We don't decrement buff[EMPTY_SPACE_OFFSET] by the allocated data size,
    as we don't know how much data the caller will actually use.
1733
1734 res->empty_space is set to length of empty space
1735
1736 RETURN
1737 0 ok All slots in 'res' are updated
1738 1 error my_errno is set
1739*/
1740
/* Position of a row being written on a head or tail page */
struct st_row_pos_info
{
  uchar *buff;                          /* page buffer */
  uchar *data;                          /* Place for data */
  uchar *dir;                           /* Directory entry for the row */
  uint length;                          /* Length available for data */
  uint rownr;                           /* Offset in directory */
  uint empty_space;                     /* Space left on page */
};
1750
1751
static my_bool get_head_or_tail_page(MARIA_HA *info,
                                     const MARIA_BITMAP_BLOCK *block,
                                     uchar *buff, uint length, uint page_type,
                                     enum pagecache_page_lock lock,
                                     struct st_row_pos_info *res)
{
  uint block_size;
  MARIA_PINNED_PAGE page_link;
  MARIA_SHARE *share= info->s;
  DBUG_ENTER("get_head_or_tail_page");
  DBUG_PRINT("enter", ("page_type: %u length: %u", page_type, length));

  block_size= share->block_size;
  if (block->org_bitmap_value == 0)             /* Empty block */
  {
    /* New page; initialize it in the caller-supplied buffer */
    make_empty_page(info, buff, page_type, 1);
    res->buff= buff;
    res->empty_space= res->length= (block_size - PAGE_OVERHEAD_SIZE(share));
    res->data= (buff + PAGE_HEADER_SIZE(share));
    res->dir= res->data + res->length;
    res->rownr= 0;
    DBUG_ASSERT(length <= res->length);
  }
  else
  {
    uchar *dir;
    /* Read old page */
    page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
    res->buff= pagecache_read(share->pagecache, &info->dfile,
                              block->page, 0, 0, share->page_type,
                              lock, &page_link.link);
    /* Register the page as pinned so it's released later */
    page_link.changed= res->buff != 0;
    push_dynamic(&info->pinned_pages, (void*) &page_link);
    if (!page_link.changed)                     /* Read error */
      goto crashed;

    DBUG_ASSERT((uint) (res->buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) ==
                page_type);
    if (!(dir= find_free_position(info, res->buff, block_size, &res->rownr,
                                  &res->length, &res->empty_space,
                                  page_type == HEAD_PAGE)))
      goto crashed;

    if (res->length < length)
    {
      /* Found slot is too small; try to compact to gather free space */
      if (res->empty_space + res->length >= length)
      {
        _ma_compact_block_page(share,
                               res->buff, res->rownr, 1,
                               (page_type == HEAD_PAGE ?
                                info->trn->min_read_from : 0),
                               (page_type == HEAD_PAGE ?
                                share->base.min_block_length :
                                0));
        /* All empty space are now after current position */
        dir= dir_entry_pos(res->buff, block_size, res->rownr);
        res->length= res->empty_space= uint2korr(dir+2);
      }
      if (res->length < length)
      {
        DBUG_PRINT("error", ("length: %u res->length: %u empty_space: %u",
                             length, res->length, res->empty_space));
        goto crashed;                           /* Wrong bitmap information */
      }
    }
    res->dir= dir;
    res->data= res->buff + uint2korr(dir);
  }
  DBUG_RETURN(0);

crashed:
  DBUG_ASSERT(!maria_assert_if_crashed_table);
  _ma_set_fatal_error(share, HA_ERR_WRONG_IN_RECORD); /* File crashed */
  DBUG_RETURN(1);
}
1828
1829
1830/*
1831 @brief Create room for a head or tail row on a given page at given position
1832
1833 @fn get_rowpos_in_head_or_tail_page()
1834 @param info Maria handler
1835 @param block Block to read
1836 @param buff Suggest this buffer to key cache
1837 @param length Minimum space needed
1838 @param page_type HEAD_PAGE || TAIL_PAGE
1839 @param rownr Rownr to use
1840 @param res Store result position here
1841
1842 @note
   This is essentially the same as get_head_or_tail_page, with the difference
   that the caller specifies at what position the row should be put.
   This is used when restoring a row to its original position as
   part of UNDO DELETE or UNDO UPDATE
1847
1848 @return
1849 @retval 0 ok All slots in 'res' are updated
1850 @retval 1 error my_errno is set
1851*/
1852
static my_bool get_rowpos_in_head_or_tail_page(MARIA_HA *info,
                                               const MARIA_BITMAP_BLOCK *block,
                                               uchar *buff, uint length,
                                               uint page_type,
                                               enum pagecache_page_lock lock,
                                               uint rownr,
                                               struct st_row_pos_info *res)
{
  MARIA_PINNED_PAGE page_link;
  MARIA_SHARE *share= info->s;
  uchar *dir;
  uint block_size= share->block_size;
  uint max_entry, max_length, rec_offset;
  DBUG_ENTER("get_rowpos_in_head_or_tail_page");

  if (block->org_bitmap_value == 0)             /* Empty block */
  {
    /* New page; no directory entry created here (rownr handled below) */
    make_empty_page(info, buff, page_type, 0);
    res->empty_space= block_size - PAGE_HEADER_SIZE(share) - PAGE_SUFFIX_SIZE;
  }
  else
  {
    /* Read the existing page and register it as pinned */
    page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
    buff= pagecache_read(share->pagecache, &info->dfile,
                         block->page, 0, 0, share->page_type,
                         lock, &page_link.link);
    page_link.changed= buff != 0;
    push_dynamic(&info->pinned_pages, (void*) &page_link);
    if (!page_link.changed)                     /* Read error */
      goto err;
    /* Page type is verified both in debug (assert) and release (check) */
    DBUG_ASSERT((buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) ==
                (uchar) page_type);
    if ((buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) != (uchar) page_type)
      goto err;
    res->empty_space= uint2korr(buff + EMPTY_SPACE_OFFSET);
  }

  max_entry= (uint) buff[DIR_COUNT_OFFSET];
  if (max_entry <= rownr)
  {
    /* Directory is too small to address 'rownr'; grow it */
    if (extend_directory(info, buff, block_size,
                         max_entry, rownr, &res->empty_space,
                         page_type == HEAD_PAGE))
      goto err;
  }

  /*
    The following dir entry is unused in case of insert / update but
    not in case of undo_update / undo_delete
  */
  dir= dir_entry_pos(buff, block_size, rownr);

  /* Make sure the entry at 'rownr' covers at least 'length' bytes */
  if (extend_area_on_page(info, buff, dir, rownr, length,
                          &res->empty_space, &rec_offset, &max_length,
                          page_type == HEAD_PAGE))
    goto err;

  res->buff= buff;
  res->rownr= rownr;
  res->dir= dir;
  res->data= buff + rec_offset;
  res->length= length;
  DBUG_RETURN(0);

err:
  DBUG_ASSERT(!maria_assert_if_crashed_table);
  _ma_set_fatal_error(share, HA_ERR_WRONG_IN_RECORD); /* File crashed */
  DBUG_RETURN(1);
}
1923
1924
1925/*
1926 Write tail for head data or blob
1927
1928 SYNOPSIS
1929 write_tail()
1930 info Maria handler
1931 block Block to tail page
1932 row_part Data to write to page
1933 length Length of data
1934
1935 NOTES
1936 block->page_count is updated to the directory offset for the tail
1937 so that we can store the position in the row extent information
1938
1939 RETURN
1940 0 ok
1941 block->page_count is set to point (dir entry + TAIL_BIT)
1942
1943 1 error; In this case my_errno is set to the error
1944*/
1945
static my_bool write_tail(MARIA_HA *info,
                          MARIA_BITMAP_BLOCK *block,
                          uchar *row_part, uint org_length)
{
  MARIA_SHARE *share= info->s;
  MARIA_PINNED_PAGE page_link;
  uint block_size= share->block_size, empty_space, length= org_length;
  struct st_row_pos_info row_pos;
  my_off_t position;
  my_bool res, block_is_read;
  DBUG_ENTER("write_tail");
  DBUG_PRINT("enter", ("page: %lu length: %u",
                       (ulong) block->page, length));

  /* keyread_buff is reused below as the tail-page buffer */
  info->keyread_buff_used= 1;
  /*
    Don't allocate smaller block than MIN_TAIL_SIZE (we want to give rows
    some place to grow in the future)
  */
  if (length < MIN_TAIL_SIZE)
    length= MIN_TAIL_SIZE;

  if (block->page_count == TAIL_PAGE_COUNT_MARKER)
  {
    /*
      Create new tail
      page will be pinned & locked by get_head_or_tail_page
    */
    if (get_head_or_tail_page(info, block, info->keyread_buff, length,
                              TAIL_PAGE, PAGECACHE_LOCK_WRITE,
                              &row_pos))
      DBUG_RETURN(1);
  }
  else
  {
    /* Write tail on predefined row position */
    if (get_rowpos_in_head_or_tail_page(info, block, info->keyread_buff,
                                        length, TAIL_PAGE,
                                        PAGECACHE_LOCK_WRITE,
                                        block->page_count & ~TAIL_BIT,
                                        &row_pos))
      DBUG_RETURN(1);
  }
  DBUG_PRINT("info", ("tailid: %lu (%lu:%u)",
                      (ulong) ma_recordpos(block->page, row_pos.rownr),
                      (ulong) block->page, row_pos.rownr));

  /*
    A non-zero org_bitmap_value means the page existed before this call
    and was read into the page cache (its link is the last element in
    info->pinned_pages, see the block_is_read branch below); zero means
    this is a new page that must be written out by us.
  */
  block_is_read= block->org_bitmap_value != 0;

  memcpy(row_pos.data, row_part, org_length);

  if (share->now_transactional)
  {
    /* Log changes in tail block */
    uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE];
    LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
    LSN lsn;

    /*
      Log REDO changes of tail page
      Note that we have to log length, not org_length, to be sure that
      REDO, which doesn't use write_tail, also creates a block of at least
      MIN_TAIL_SIZE
    */
    page_store(log_data + FILEID_STORE_SIZE, block->page);
    dirpos_store(log_data + FILEID_STORE_SIZE + PAGE_STORE_SIZE,
                 row_pos.rownr);
    log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
    log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
    log_array[TRANSLOG_INTERNAL_PARTS + 1].str= row_pos.data;
    log_array[TRANSLOG_INTERNAL_PARTS + 1].length= length;
    if (translog_write_record(&lsn,
                              (block_is_read ? LOGREC_REDO_INSERT_ROW_TAIL :
                               LOGREC_REDO_NEW_ROW_TAIL),
                              info->trn, info,
                              (translog_size_t) (sizeof(log_data) + length),
                              TRANSLOG_INTERNAL_PARTS + 2, log_array,
                              log_data, NULL))
      DBUG_RETURN(1);
  }

  /* Store tail length in its directory entry and update page free space */
  int2store(row_pos.dir + 2, length);
  empty_space= row_pos.empty_space - length;
  int2store(row_pos.buff + EMPTY_SPACE_OFFSET, empty_space);
  /* Return directory position to caller, marked as a tail */
  block->page_count= row_pos.rownr + TAIL_BIT;
  /*
    If there are fewer directory entries free than the number of possible
    tails we can write for a row, we mark the page full to ensure that we
    don't during _ma_bitmap_find_place() allocate more entries on the tail
    page than it can hold
  */
  block->empty_space= (enough_free_entries(row_pos.buff, share->block_size,
                                           1 + share->base.blobs) ?
                       empty_space : 0);
  /* Keep BLOCKUSED_USE_ORG_BITMAP */
  block->used|= BLOCKUSED_USED | BLOCKUSED_TAIL;

  if (block_is_read)
  {
    /* Current page link is last element in pinned_pages */
    MARIA_PINNED_PAGE *page_link;
    page_link= dynamic_element(&info->pinned_pages,
                               info->pinned_pages.elements-1,
                               MARIA_PINNED_PAGE*);
    /* Downgrade write lock to read lock; page stays pinned */
    pagecache_unlock_by_link(share->pagecache, page_link->link,
                             PAGECACHE_LOCK_WRITE_TO_READ,
                             PAGECACHE_PIN_LEFT_PINNED, LSN_IMPOSSIBLE,
                             LSN_IMPOSSIBLE, 1, FALSE);
    DBUG_ASSERT(page_link->changed);
    page_link->unlock= PAGECACHE_LOCK_READ_UNLOCK;
    res= 0;
  }
  else
  {
    /* New page: write it (delayed) and register it as pinned */
    if (!(res= pagecache_write(share->pagecache,
                               &info->dfile, block->page, 0,
                               row_pos.buff,share->page_type,
                               PAGECACHE_LOCK_READ,
                               PAGECACHE_PIN,
                               PAGECACHE_WRITE_DELAY, &page_link.link,
                               LSN_IMPOSSIBLE)))
    {
      DBUG_ASSERT(page_link.link);
      page_link.unlock= PAGECACHE_LOCK_READ_UNLOCK;
      page_link.changed= 1;
      push_dynamic(&info->pinned_pages, (void*) &page_link);
    }

    /* Increase data file size, if extended */
    position= (my_off_t) block->page * block_size;
    if (share->state.state.data_file_length <= position)
    {
      /*
        We are modifying a state member before writing the UNDO; this is a WAL
        violation. But for data_file_length this is ok, as long as we change
        data_file_length after writing any log record (FILE_ID/REDO/UNDO) (see
        collect_tables()).
      */
      _ma_set_share_data_file_length(share, position + block_size);
    }
  }
  DBUG_RETURN(res);
}
2089
2090
2091/*
2092 Write full pages
2093
2094 SYNOPSIS
2095 write_full_pages()
2096 info Maria handler
2097 lsn LSN for the undo record
2098 block Where to write data
2099 data Data to write
2100 length Length of data
2101
2102 NOTES
2103 Logging of the changes to the full pages are done in the caller
2104 write_block_record().
2105
2106 RETURN
2107 0 ok
2108 1 error on write
2109*/
2110
static my_bool write_full_pages(MARIA_HA *info,
                                LSN lsn,
                                MARIA_BITMAP_BLOCK *block,
                                uchar *data, ulong length)
{
  pgcache_page_no_t page;
  MARIA_SHARE *share= info->s;
  uint block_size= share->block_size;
  uint data_size= FULL_PAGE_SIZE(share);   /* payload bytes per full page */
  uchar *buff= info->keyread_buff;         /* reused as page build buffer */
  uint page_count, sub_blocks;
  my_off_t position, max_position;
  DBUG_ENTER("write_full_pages");
  DBUG_PRINT("enter", ("length: %lu page: %lu page_count: %lu",
                       (ulong) length, (ulong) block->page,
                       (ulong) block->page_count));
  DBUG_ASSERT((block->page_count & TAIL_BIT) == 0);

  info->keyread_buff_used= 1;
  page= block->page;
  page_count= block->page_count;
  sub_blocks= block->sub_blocks;

  /* Highest file offset written so far; updated when we switch extents */
  max_position= (my_off_t) (page + page_count) * block_size;

  /* Increase data file size, if extended */

  for (; length; data+= data_size)
  {
    uint copy_length;
    if (!page_count--)
    {
      /* Current extent exhausted; move on to the next bitmap block */
      if (!--sub_blocks)
      {
        /* Data remains but no extents left: extent info is corrupted */
        _ma_set_fatal_error(share, HA_ERR_WRONG_IN_RECORD);
        DBUG_RETURN(1);
      }

      block++;
      page= block->page;
      page_count= block->page_count - 1;  /* this iteration uses one page */
      DBUG_PRINT("info", ("page: %lu page_count: %lu",
                          (ulong) block->page, (ulong) block->page_count));

      position= (page + page_count + 1) * block_size;
      set_if_bigger(max_position, position);
    }
    /* Build page: LSN, page type, zeroed rest of header, then payload */
    lsn_store(buff, lsn);
    buff[PAGE_TYPE_OFFSET]= (uchar) BLOB_PAGE;
    bzero(buff + LSN_SIZE + PAGE_TYPE_SIZE,
          FULL_PAGE_HEADER_SIZE(share) - (LSN_SIZE + PAGE_TYPE_SIZE));
    copy_length= MY_MIN(data_size, length);
    memcpy(buff + FULL_PAGE_HEADER_SIZE(share), data, copy_length);
    length-= copy_length;

    /*
      Zero out old information from the block. This removes possible
      sensitive information from the block and also makes the file
      easier to compress and easier to compare after recovery.
    */
    if (copy_length != data_size)
      bzero(buff + block_size - PAGE_SUFFIX_SIZE - (data_size - copy_length),
            (data_size - copy_length) + PAGE_SUFFIX_SIZE);

    if (pagecache_write(share->pagecache,
                        &info->dfile, page, 0,
                        buff, share->page_type,
                        PAGECACHE_LOCK_LEFT_UNLOCKED,
                        PAGECACHE_PIN_LEFT_UNPINNED,
                        PAGECACHE_WRITE_DELAY,
                        0, info->trn->rec_lsn))
      DBUG_RETURN(1);
    page++;
    DBUG_ASSERT(block->used & BLOCKUSED_USED);
  }
  /* Increase data file size, if extended */
  if (share->state.state.data_file_length < max_position)
    _ma_set_share_data_file_length(share, max_position);
  DBUG_RETURN(0);
}
2190
2191
2192/*
2193 Store ranges of full pages in compact format for logging
2194
2195 SYNOPSIS
2196 store_page_range()
2197 to Store data here
2198 block Where pages are to be written
2199 length Length of data to be written
2200 Normally this is full pages, except for the last
2201 tail block that may only partly fit the last page.
2202 tot_ranges Add here the number of ranges used
2203
2204 NOTES
2205 The format of one entry is:
2206
2207 Ranges SUB_RANGE_SIZE
   Empty bytes at end of the last page  BLOCK_FILLER_SIZE
2209 For each range
2210 Page number PAGE_STORE_SIZE
2211 Number of pages PAGERANGE_STORE_SIZE
2212
2213 RETURN
2214 # end position for 'to'
2215*/
2216
2217static uchar *store_page_range(MARIA_SHARE *share,
2218 uchar *to, MARIA_BITMAP_BLOCK *block,
2219 ulong length,
2220 uint *tot_ranges)
2221{
2222 uint data_size= FULL_PAGE_SIZE(share);
2223 ulong pages_left= (length + data_size -1) / data_size;
2224 uint page_count, ranges, empty_space;
2225 uchar *to_start;
2226 DBUG_ENTER("store_page_range");
2227
2228 to_start= to;
2229 to+= SUB_RANGE_SIZE;
2230
2231 /* Store number of unused bytes at last page */
2232 empty_space= (uint) (pages_left * data_size - length);
2233 int2store(to, empty_space);
2234 to+= BLOCK_FILLER_SIZE;
2235
2236 ranges= 0;
2237 do
2238 {
2239 pgcache_page_no_t page;
2240 page= block->page;
2241 page_count= block->page_count;
2242 block++;
2243 if (page_count > pages_left)
2244 page_count= pages_left;
2245
2246 page_store(to, page);
2247 to+= PAGE_STORE_SIZE;
2248 pagerange_store(to, page_count);
2249 to+= PAGERANGE_STORE_SIZE;
2250 ranges++;
2251 } while ((pages_left-= page_count));
2252 /* Store number of ranges for this block */
2253 int2store(to_start, ranges);
2254 (*tot_ranges)+= ranges;
2255
2256 DBUG_RETURN(to);
2257}
2258
2259
2260/*
2261 Store packed extent data
2262
2263 SYNOPSIS
2264 store_extent_info()
2265 to Store first packed data here
2266 row_extents_second_part Store rest here
2267 first_block First block to store
2268 count Number of blocks
2269
2270 NOTES
2271 We don't have to store the position for the head block
2272
2273 We have to set the START_EXTENT_BIT for every extent where the
   blob will be stored on a page of its own. We need this in the
2275 UNDO phase to generate MARIA_BITMAP_BLOCK's for undo-delete and
2276 undo-update.
2277*/
2278
2279static void store_extent_info(uchar *to,
2280 uchar *row_extents_second_part,
2281 MARIA_BITMAP_BLOCK *first_block,
2282 uint count)
2283{
2284 MARIA_BITMAP_BLOCK *block, *end_block;
2285 uint copy_length;
2286 my_bool first_found= 0;
2287 DBUG_ENTER("store_extent_info");
2288 DBUG_PRINT("enter", ("count: %u", count));
2289
2290 for (block= first_block, end_block= first_block+count ;
2291 block < end_block; block++)
2292 {
2293 /* The following is only false for marker (unused) blocks */
2294 if (likely(block->used & BLOCKUSED_USED))
2295 {
2296 uint page_count= block->page_count;
2297 DBUG_ASSERT(page_count != 0);
2298 page_store(to, block->page);
2299 if (block->sub_blocks)
2300 {
2301 /*
2302 Set a bit so that we later know that this was the first block
2303 for a blob
2304 */
2305 page_count|= START_EXTENT_BIT;
2306 }
2307 pagerange_store(to + PAGE_STORE_SIZE, page_count);
2308 DBUG_DUMP("extent", to, ROW_EXTENT_SIZE);
2309 to+= ROW_EXTENT_SIZE;
2310 if (!first_found)
2311 {
2312 first_found= 1;
2313 to= row_extents_second_part;
2314 }
2315 }
2316 }
2317 copy_length= (count - 1) * ROW_EXTENT_SIZE;
2318 /*
2319 In some unlikely cases we have allocated to many blocks. Clear this
2320 data.
2321 */
2322 bzero(to, (size_t) (row_extents_second_part + copy_length - to));
2323 DBUG_VOID_RETURN;
2324}
2325
2326
2327/**
2328 @brief
2329 Convert extent info read from file to MARIA_BITMAP_BLOCKS suitable
2330 for write_block_record
2331
2332 @note
2333 In case of blobs, this function marks all the blob pages in the bitmap
2334 as full pages. The bitmap bits for other pages will be marked
2335 when write_block_record() calls _ma_bitmap_release_unused().
2336
2337 This function will be removed in Maria 2.0 when we instead of delete rows
2338 mark them as deleted and only remove them after commit.
2339
2340 @return
2341 @retval 0 ok
2342 @retval 1 Error (out of memory or disk error changing bitmap) or
2343 wrong information in extent information
2344*/
2345
static my_bool extent_to_bitmap_blocks(MARIA_HA *info,
                                       MARIA_BITMAP_BLOCKS *blocks,
                                       pgcache_page_no_t head_page,
                                       uint extent_count,
                                       const uchar *extent_info)
{
  MARIA_BITMAP_BLOCK *block, *start_block;
  MARIA_SHARE *share= info->s;
  uint i, tail_page;
  DBUG_ENTER("extent_to_bitmap_blocks");

  /* One block per extent plus the head block and one spare entry */
  if (allocate_dynamic(&info->bitmap_blocks, extent_count + 2))
    DBUG_RETURN(1);
  block= blocks->block= dynamic_element(&info->bitmap_blocks, 0,
                                        MARIA_BITMAP_BLOCK*);
  blocks->count= extent_count + 1;
  blocks->tail_page_skipped= blocks->page_skipped= 0;
  /* First block describes the head page (always a single page) */
  block->page= head_page;
  block->page_count= 1;
  block->used= BLOCKUSED_USED | BLOCKUSED_USE_ORG_BITMAP;
  /* Impossible value, will force storage of real value */
  block->org_bitmap_value= 255;

  start_block= block++;
  for (i=0 ;
       i++ < extent_count ;
       block++, extent_info+= ROW_EXTENT_SIZE)
  {
    uint page_count= uint2korr(extent_info + ROW_EXTENT_PAGE_SIZE);
    if (page_count & START_EXTENT_BIT)
    {
      /* First extent of a blob: close the previous group of sub blocks */
      page_count&= ~START_EXTENT_BIT;
      start_block->sub_blocks= (uint) (block - start_block);
      start_block= block;
    }
    block->page= page_korr(extent_info);
    block->page_count= page_count;
    block->sub_blocks= 0;
    if (block->page_count == 0)
    {
      /* Extend allocated but not used by write_block_record() */
      DBUG_ASSERT(block->page == 0);
      /* This is the last block */
      blocks->count= i;
      break;
    }
    if ((tail_page= page_count & TAIL_BIT))
      page_count= 1;                         /* A tail occupies one page */

    /* Check if wrong data */
    if (block->page == 0 || page_count == 0 ||
        (block->page + page_count) * share->block_size >
        share->state.state.data_file_length)
    {
      DBUG_PRINT("error", ("page: %lu page_count: %u tail: %u length: %ld data_length: %ld",
                           (ulong) block->page,
                           (block->page_count & ~TAIL_BIT),
                           (uint) MY_TEST(block->page_count & TAIL_BIT),
                           (ulong) ((block->page + (page_count & ~TAIL_BIT)) *
                                    share->block_size),
                           (ulong) share->state.state.data_file_length));
      DBUG_RETURN(1);
    }
    if (tail_page)
    {
      /*
        Tail page: remember its current bitmap pattern so the bitmap is
        only updated if the pattern really changes
      */
      block->org_bitmap_value= _ma_bitmap_get_page_bits(info, &share->bitmap,
                                                        block->page);
      block->used= (BLOCKUSED_TAIL | BLOCKUSED_USED |
                    BLOCKUSED_USE_ORG_BITMAP);
    }
    else
    {
      /* Full pages: mark them full in the bitmap now (see @note above) */
      my_bool res;
      mysql_mutex_lock(&share->bitmap.bitmap_lock);
      res= _ma_bitmap_set_full_page_bits(info, &share->bitmap,
                                         block->page, page_count);
      mysql_mutex_unlock(&share->bitmap.bitmap_lock);
      if (res)
        DBUG_RETURN(1);
      block->used= BLOCKUSED_USED;
    }
  }
  /* Close the last group of sub blocks */
  start_block->sub_blocks= (uint) (block - start_block);
  DBUG_RETURN(0);
}
2431
2432
2433/*
2434 Free regions of pages with logging
2435
2436 NOTES
2437 We are removing filler events and tail page events from
2438 row->extents to get smaller log.
2439
2440 RETURN
2441 0 ok
2442 1 error
2443*/
2444
static my_bool free_full_pages(MARIA_HA *info, MARIA_ROW *row)
{
  uchar log_data[FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE];
  LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
  LSN lsn;
  size_t extents_length;
  uchar *extents= row->extents;
  DBUG_ENTER("free_full_pages");

  if (info->s->now_transactional)
  {
    /* Compact events by removing filler and tail events */
    uchar *new_block= 0;      /* Start of current run of extents to keep */
    uchar *end, *to, *compact_extent_info;
    my_bool res;
    uint extents_count;

    if (!(compact_extent_info= my_alloca(row->extents_count *
                                         ROW_EXTENT_SIZE)))
      DBUG_RETURN(1);

    /*
      Copy runs of "real" extents (full-page ranges) into
      compact_extent_info, dropping tail extents and fillers
    */
    to= compact_extent_info;
    for (end= extents + row->extents_count * ROW_EXTENT_SIZE ;
         extents < end ;
         extents+= ROW_EXTENT_SIZE)
    {
      uint page_count= uint2korr(extents + ROW_EXTENT_PAGE_SIZE);
      page_count&= ~START_EXTENT_BIT;
      if (! (page_count & TAIL_BIT) && page_count != 0)
      {
        /* Found correct extent */
        if (!new_block)
          new_block= extents;                   /* First extent in range */
        continue;
      }
      /* Found extent to remove, copy everything found so far */
      if (new_block)
      {
        size_t length= (size_t) (extents - new_block);
        memcpy(to, new_block, length);
        to+= length;
        new_block= 0;
      }
    }
    /* Copy the final run of kept extents, if any */
    if (new_block)
    {
      size_t length= (size_t) (extents - new_block);
      memcpy(to, new_block, length);
      to+= length;
    }

    if (!unlikely(extents_length= (uint) (to - compact_extent_info)))
    {
      /*
        No ranges. This happens in the rare case when we have allocated
        a place for a blob on a tail page but it did fit into the main page.
      */
      my_afree(compact_extent_info);
      DBUG_RETURN(0);
    }
    /* Log the compacted extent list as a REDO_FREE_BLOCKS record */
    extents_count= (uint) (extents_length / ROW_EXTENT_SIZE);
    pagerange_store(log_data + FILEID_STORE_SIZE, extents_count);
    log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
    log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
    log_array[TRANSLOG_INTERNAL_PARTS + 1].str= compact_extent_info;
    log_array[TRANSLOG_INTERNAL_PARTS + 1].length= extents_length;
    res= translog_write_record(&lsn, LOGREC_REDO_FREE_BLOCKS, info->trn,
                               info,
                               (translog_size_t) (sizeof(log_data) +
                                                  extents_length),
                               TRANSLOG_INTERNAL_PARTS + 2, log_array,
                               log_data, NULL);
    my_afree(compact_extent_info);
    if (res)
      DBUG_RETURN(1);
  }

  /* Update the bitmap for all of the row's extents (original list) */
  DBUG_RETURN(_ma_bitmap_free_full_pages(info, row->extents,
                                         row->extents_count));
}
2525
2526
2527/*
2528 Free one page range
2529
2530 NOTES
2531 This is very similar to free_full_pages()
2532
2533 RETURN
2534 0 ok
2535 1 error
2536*/
2537
2538static my_bool free_full_page_range(MARIA_HA *info, pgcache_page_no_t page,
2539 uint count)
2540{
2541 my_bool res= 0;
2542 uint delete_count;
2543 MARIA_SHARE *share= info->s;
2544 DBUG_ENTER("free_full_page_range");
2545
2546 delete_count= count;
2547 if (share->state.state.data_file_length ==
2548 (page + count) * share->block_size)
2549 {
2550 /*
2551 Don't delete last page from pagecache as this will make the file
2552 shorter than expected if the last operation extended the file
2553 */
2554 delete_count--;
2555 }
2556 if (delete_count &&
2557 pagecache_delete_pages(share->pagecache, &info->dfile,
2558 page, delete_count, PAGECACHE_LOCK_WRITE, 1))
2559 res= 1;
2560
2561 if (share->now_transactional)
2562 {
2563 LSN lsn;
2564 /** @todo unify log_data's shape with delete_head_or_tail() */
2565 uchar log_data[FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE +
2566 ROW_EXTENT_SIZE];
2567 LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
2568 DBUG_ASSERT(info->trn->rec_lsn);
2569 pagerange_store(log_data + FILEID_STORE_SIZE, 1);
2570 page_store(log_data + FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE,
2571 page);
2572 int2store(log_data + FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE +
2573 PAGE_STORE_SIZE, count);
2574 log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
2575 log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
2576
2577 if (translog_write_record(&lsn, LOGREC_REDO_FREE_BLOCKS,
2578 info->trn, info,
2579 (translog_size_t) sizeof(log_data),
2580 TRANSLOG_INTERNAL_PARTS + 1, log_array,
2581 log_data, NULL))
2582 res= 1;
2583 }
2584 mysql_mutex_lock(&share->bitmap.bitmap_lock);
2585 if (_ma_bitmap_reset_full_page_bits(info, &share->bitmap, page, count))
2586 res= 1;
2587 mysql_mutex_unlock(&share->bitmap.bitmap_lock);
2588 DBUG_RETURN(res);
2589}
2590
2591
2592/**
2593 @brief Write a record to a (set of) pages
2594
2595 @fn write_block_record()
2596 @param info Maria handler
2597 @param old_record Original record in case of update; NULL in case of
2598 insert
2599 @param record Record we should write
2600 @param row Statistics about record (calculated by
2601 calc_record_size())
2602 @param map_blocks On which pages the record should be stored
2603 @param row_pos Position on head page where to put head part of
2604 record
2605 @param undo_lsn <> LSN_ERROR if we are executing an UNDO
2606 @param old_record_checksum Checksum of old_record: ignored if table does
2607 not have live checksum; otherwise if
2608 old_record==NULL it must be 0.
2609
2610 @note
2611 On return all pinned pages are released.
2612
2613 [page_buff + EMPTY_SPACE_OFFSET] is set to
2614 row_pos->empty_space - head_length
2615
2616 @return Operation status
2617 @retval 0 OK
2618 @retval 1 Error
2619*/
2620
2621static my_bool write_block_record(MARIA_HA *info,
2622 const uchar *old_record,
2623 const uchar *record,
2624 MARIA_ROW *row,
2625 MARIA_BITMAP_BLOCKS *bitmap_blocks,
2626 my_bool head_block_is_read,
2627 struct st_row_pos_info *row_pos,
2628 LSN undo_lsn,
2629 ha_checksum old_record_checksum)
2630{
2631 uchar *data, *end_of_data, *tmp_data_used, *tmp_data;
2632 uchar *UNINIT_VAR(row_extents_first_part), *UNINIT_VAR(row_extents_second_part);
2633 uchar *field_length_data;
2634 uchar *page_buff;
2635 MARIA_BITMAP_BLOCK *block, *head_block;
2636 MARIA_SHARE *share= info->s;
2637 MARIA_COLUMNDEF *column, *end_column;
2638 MARIA_PINNED_PAGE page_link;
2639 uint block_size, flag, head_length;
2640 ulong *blob_lengths;
2641 my_bool row_extents_in_use, blob_full_pages_exists;
2642 LSN lsn;
2643 my_off_t position;
2644 uint save_my_errno;
2645 DBUG_ENTER("write_block_record");
2646
2647 head_block= bitmap_blocks->block;
2648 block_size= share->block_size;
2649
2650 page_buff= row_pos->buff;
2651 /* Position on head page where we should store the head part */
2652 data= row_pos->data;
2653 end_of_data= data + row_pos->length;
2654
2655 /* Write header */
2656 flag= info->row_flag;
2657 row_extents_in_use= 0;
2658 if (unlikely(row->total_length > row_pos->length))
2659 {
2660 /* Need extent */
2661 DBUG_ASSERT(bitmap_blocks->count > 1);
2662 if (bitmap_blocks->count <= 1)
2663 goto crashed; /* Wrong in bitmap */
2664 flag|= ROW_FLAG_EXTENTS;
2665 row_extents_in_use= 1;
2666 }
2667 /* For now we have only a minimum header */
2668 *data++= (uchar) flag;
2669 if (flag & ROW_FLAG_TRANSID)
2670 {
2671 transid_store(data, info->trn->trid);
2672 data+= TRANSID_SIZE;
2673 }
2674
2675 if (unlikely(flag & ROW_FLAG_NULLS_EXTENDED))
2676 *data++= (uchar) (share->base.null_bytes -
2677 share->base.original_null_bytes);
2678 if (row_extents_in_use)
2679 {
2680 /* Store first extent in header */
2681 store_key_length_inc(data, bitmap_blocks->count - 1);
2682 row_extents_first_part= data;
2683 data+= ROW_EXTENT_SIZE;
2684 }
2685 if (share->base.max_field_lengths)
2686 store_key_length_inc(data, row->field_lengths_length);
2687 if (share->calc_checksum)
2688 {
2689 *(data++)= (uchar) (row->checksum); /* store least significant byte */
2690 DBUG_ASSERT(!((old_record_checksum != 0) && (old_record == NULL)));
2691 }
2692 memcpy(data, record, share->base.null_bytes);
2693 data+= share->base.null_bytes;
2694 memcpy(data, row->empty_bits, share->base.pack_bytes);
2695 data+= share->base.pack_bytes;
2696
2697 DBUG_ASSERT(row_extents_in_use || undo_lsn != LSN_ERROR ||
2698 (uint) (data - row_pos->data) == row->min_length);
2699
2700 /*
2701 Allocate a buffer of rest of data (except blobs)
2702
2703 To avoid double copying of data, we copy as many columns that fits into
2704 the page. The rest goes into info->packed_row.
2705
2706 Using an extra buffer, instead of doing continuous writes to different
2707 pages, uses less code and we don't need to have to do a complex call
2708 for every data segment we want to store.
2709 */
2710 if (_ma_alloc_buffer(&info->rec_buff, &info->rec_buff_size,
2711 row->head_length))
2712 DBUG_RETURN(1);
2713
2714 tmp_data_used= 0; /* Either 0 or last used uchar in 'data' */
2715 tmp_data= data;
2716
2717 if (row_extents_in_use)
2718 {
2719 uint copy_length= (bitmap_blocks->count - 2) * ROW_EXTENT_SIZE;
2720 if (!tmp_data_used && tmp_data + copy_length > end_of_data)
2721 {
2722 tmp_data_used= tmp_data;
2723 tmp_data= info->rec_buff;
2724 }
2725 row_extents_second_part= tmp_data;
2726 /*
2727 We will copy the extents here when we have figured out the tail
2728 positions.
2729 */
2730 tmp_data+= copy_length;
2731 }
2732
2733 /* Copy fields that has fixed lengths (primary key etc) */
2734 for (column= share->columndef,
2735 end_column= column + share->base.fixed_not_null_fields;
2736 column < end_column; column++)
2737 {
2738 if (!tmp_data_used && tmp_data + column->length > end_of_data)
2739 {
2740 tmp_data_used= tmp_data;
2741 tmp_data= info->rec_buff;
2742 }
2743 memcpy(tmp_data, record + column->offset, column->length);
2744 tmp_data+= column->length;
2745 }
2746
2747 /* Copy length of data for variable length fields */
2748 if (!tmp_data_used && tmp_data + row->field_lengths_length > end_of_data)
2749 {
2750 tmp_data_used= tmp_data;
2751 tmp_data= info->rec_buff;
2752 }
2753 field_length_data= row->field_lengths;
2754 memcpy(tmp_data, field_length_data, row->field_lengths_length);
2755 tmp_data+= row->field_lengths_length;
2756
2757 DBUG_ASSERT(row_extents_in_use || undo_lsn != LSN_ERROR ||
2758 (uint) (tmp_data - row_pos->data) == row->min_length +
2759 share->base.fixed_not_null_fields_length +
2760 row->field_lengths_length);
2761
2762 /* Copy variable length fields and fields with null/zero */
2763 for (end_column= share->columndef + share->base.fields - share->base.blobs;
2764 column < end_column ;
2765 column++)
2766 {
2767 const uchar *field_pos;
2768 ulong length;
2769 if ((record[column->null_pos] & column->null_bit) ||
2770 (row->empty_bits[column->empty_pos] & column->empty_bit))
2771 continue;
2772
2773 field_pos= record + column->offset;
2774 switch (column->type) {
2775 case FIELD_NORMAL: /* Fixed length field */
2776 case FIELD_SKIP_PRESPACE:
2777 case FIELD_SKIP_ZERO: /* Fixed length field */
2778 length= column->length;
2779 break;
2780 case FIELD_SKIP_ENDSPACE: /* CHAR */
2781 /* Char that is space filled */
2782 if (column->length <= 255)
2783 length= (uint) (uchar) *field_length_data++;
2784 else
2785 {
2786 length= uint2korr(field_length_data);
2787 field_length_data+= 2;
2788 }
2789 break;
2790 case FIELD_VARCHAR:
2791 if (column->length <= 256)
2792 {
2793 length= (uint) (uchar) *field_length_data++;
2794 field_pos++; /* Skip length uchar */
2795 }
2796 else
2797 {
2798 length= uint2korr(field_length_data);
2799 field_length_data+= 2;
2800 field_pos+= 2;
2801 }
2802 DBUG_ASSERT(length <= column->length);
2803 break;
2804 default: /* Wrong data */
2805 DBUG_ASSERT(!maria_assert_if_crashed_table);
2806 length=0;
2807 break;
2808 }
2809 if (!tmp_data_used && tmp_data + length > end_of_data)
2810 {
2811 /* Data didn't fit in page; Change to use tmp buffer */
2812 tmp_data_used= tmp_data;
2813 tmp_data= info->rec_buff;
2814 }
2815 memcpy((char*) tmp_data, field_pos, length);
2816 tmp_data+= length;
2817 }
2818
2819 block= head_block + head_block->sub_blocks; /* Point to first blob data */
2820
2821 end_column= column + share->base.blobs;
2822 blob_lengths= row->blob_lengths;
2823 if (!tmp_data_used)
2824 {
2825 /* Still room on page; Copy as many blobs we can into this page */
2826 data= tmp_data;
2827 for (; column < end_column &&
2828 *blob_lengths <= (ulong)(end_of_data - data);
2829 column++, blob_lengths++)
2830 {
2831 uchar *tmp_pos;
2832 uint length;
2833 if (!*blob_lengths) /* Null or "" */
2834 continue;
2835 length= column->length - portable_sizeof_char_ptr;
2836 memcpy(&tmp_pos, record + column->offset + length, sizeof(char*));
2837 memcpy(data, tmp_pos, *blob_lengths);
2838 data+= *blob_lengths;
2839 /*
2840 The following is not true when we want to insert data into original
2841 place. In this case we don't have any extra blocks allocated
2842 */
2843 if (likely(undo_lsn == LSN_ERROR))
2844 {
2845 /* Skip over tail page that was prepared for storing blob */
2846 block++;
2847 bitmap_blocks->tail_page_skipped= 1;
2848 }
2849 }
2850 if (head_block->sub_blocks > 1)
2851 {
2852 /* We have allocated pages that where not used */
2853 bitmap_blocks->page_skipped= 1;
2854 }
2855 }
2856 else
2857 data= tmp_data_used; /* Get last used on page */
2858
2859 /* Update page directory */
2860 head_length= (uint) (data - row_pos->data);
2861 DBUG_PRINT("info", ("Used head length on page: %u header_length: %u",
2862 head_length,
2863 (uint) (flag & ROW_FLAG_TRANSID ? TRANSID_SIZE : 0)));
2864 if (head_length < share->base.min_block_length)
2865 {
2866 /* Extend row to be of size min_block_length */
2867 uint diff_length= share->base.min_block_length - head_length;
2868 bzero(data, diff_length);
2869 data+= diff_length;
2870 head_length= share->base.min_block_length;
2871 }
2872 DBUG_ASSERT(data <= end_of_data);
2873 /*
2874 If this is a redo entry (ie, undo_lsn != LSN_ERROR) then we should have
2875 written exactly head_length bytes (same as original record).
2876 */
2877 DBUG_ASSERT(undo_lsn == LSN_ERROR || head_length == row_pos->length);
2878 int2store(row_pos->dir + 2, head_length);
2879 /* update empty space at start of block */
2880 row_pos->empty_space-= head_length;
2881 int2store(page_buff + EMPTY_SPACE_OFFSET, row_pos->empty_space);
2882 /* Mark in bitmaps how the current page was actually used */
2883 head_block->empty_space= row_pos->empty_space;
2884 if (page_buff[DIR_COUNT_OFFSET] == MAX_ROWS_PER_PAGE &&
2885 page_buff[DIR_FREE_OFFSET] == END_OF_DIR_FREE_LIST)
2886 head_block->empty_space= 0; /* Page is full */
2887 head_block->used|= BLOCKUSED_USED;
2888
2889 check_directory(share,
2890 page_buff, share->block_size, share->base.min_block_length,
2891 (uint) -1);
2892
2893 /*
2894 Now we have to write tail pages, as we need to store the position
2895 to them in the row extent header.
2896
2897 We first write out all blob tails, to be able to store them in
2898 the current page or 'tmp_data'.
2899
2900 Then we write the tail of the non-blob fields (The position to the
2901 tail page is stored either in row header, the extents in the head
2902 page or in the first full page of the non-blob data. It's never in
2903 the tail page of the non-blob data)
2904 */
2905
2906 blob_full_pages_exists= 0;
2907 if (row_extents_in_use)
2908 {
2909 if (column != end_column) /* If blob fields */
2910 {
2911 MARIA_COLUMNDEF *save_column= column;
2912 MARIA_BITMAP_BLOCK *save_block= block;
2913 MARIA_BITMAP_BLOCK *end_block;
2914 ulong *save_blob_lengths= blob_lengths;
2915
2916 for (; column < end_column; column++, blob_lengths++)
2917 {
2918 uchar *blob_pos;
2919 if (!*blob_lengths) /* Null or "" */
2920 continue;
2921 if (block[block->sub_blocks - 1].used & BLOCKUSED_TAIL)
2922 {
2923 uint length;
2924 length= column->length - portable_sizeof_char_ptr;
2925 memcpy(&blob_pos, record + column->offset + length, sizeof(char*));
2926 length= *blob_lengths % FULL_PAGE_SIZE(share); /* tail size */
2927 if (length != *blob_lengths)
2928 blob_full_pages_exists= 1;
2929 if (write_tail(info, block + block->sub_blocks-1,
2930 blob_pos + *blob_lengths - length,
2931 length))
2932 goto disk_err;
2933 }
2934 else
2935 blob_full_pages_exists= 1;
2936
2937 for (end_block= block + block->sub_blocks; block < end_block; block++)
2938 {
2939 /*
2940 Set only a bit, to not cause bitmap code to believe a block is full
2941 when there is still a lot of entries in it.
2942 */
2943 block->used|= BLOCKUSED_USED;
2944 }
2945 }
2946 DBUG_ASSERT((undo_lsn == LSN_ERROR ||
2947 block == bitmap_blocks->block + bitmap_blocks->count));
2948 column= save_column;
2949 block= save_block;
2950 blob_lengths= save_blob_lengths;
2951 }
2952
2953 if (tmp_data_used) /* non blob data overflows */
2954 {
2955 MARIA_BITMAP_BLOCK *cur_block, *end_block, *last_head_block;
2956 MARIA_BITMAP_BLOCK *head_tail_block= 0;
2957 ulong length;
2958 ulong data_length= (ulong) (tmp_data - info->rec_buff);
2959
2960#ifdef SANITY_CHECKS
2961 DBUG_ASSERT(head_block->sub_blocks != 1);
2962 if (head_block->sub_blocks == 1)
2963 goto crashed; /* no reserved full or tails */
2964#endif
2965 /*
2966 Find out where to write tail for non-blob fields.
2967
2968 Problem here is that the bitmap code may have allocated more
2969 space than we need. We have to handle the following cases:
2970
2971 - Bitmap code allocated a tail page we don't need.
2972 - The last full page allocated needs to be changed to a tail page
2973 (Because we where able to put more data on the head page than
2974 the bitmap allocation assumed)
2975
2976 The reserved pages in bitmap_blocks for the main page has one of
2977 the following allocations:
2978 - Full pages, with following blocks:
2979 # * full pages
2980 empty page ; To be used if we change last full to tail page. This
2981 has 'count' = 0.
2982 tail page (optional, if last full page was part full)
2983 - One tail page
2984 */
2985
2986 cur_block= head_block + 1;
2987 end_block= head_block + head_block->sub_blocks;
2988 /*
2989 Loop until we have find a block bigger than we need or
2990 we find the empty page block.
2991 */
2992 while (data_length >= (length= (cur_block->page_count *
2993 FULL_PAGE_SIZE(share))) &&
2994 cur_block->page_count)
2995 {
2996#ifdef SANITY_CHECKS
2997 DBUG_ASSERT(!((cur_block == end_block) ||
2998 (cur_block->used & BLOCKUSED_USED)));
2999 if ((cur_block == end_block) || (cur_block->used & BLOCKUSED_USED))
3000 goto crashed;
3001#endif
3002 data_length-= length;
3003 (cur_block++)->used|= BLOCKUSED_USED;
3004 }
3005 last_head_block= cur_block;
3006 if (data_length)
3007 {
3008 if (cur_block->page_count == 0)
3009 {
3010 /* Skip empty filler block */
3011 cur_block++;
3012 }
3013#ifdef SANITY_CHECKS
3014 DBUG_ASSERT(!(cur_block >= end_block));
3015 if ((cur_block >= end_block))
3016 goto crashed;
3017#endif
3018 if (cur_block->used & BLOCKUSED_TAIL)
3019 {
3020 DBUG_ASSERT(data_length < MAX_TAIL_SIZE(block_size));
3021 /* tail written to tail page */
3022 cur_block->used|= BLOCKUSED_USED;
3023 head_tail_block= cur_block;
3024 }
3025 else if (data_length > length - MAX_TAIL_SIZE(block_size))
3026 {
3027 /* tail written to full page */
3028 cur_block->used|= BLOCKUSED_USED;
3029 if ((cur_block != end_block - 1) &&
3030 (end_block[-1].used & BLOCKUSED_TAIL))
3031 bitmap_blocks->tail_page_skipped= 1;
3032 }
3033 else
3034 {
3035 /*
3036 cur_block is a full block, followed by an empty and optional
3037 tail block. Change cur_block to a tail block or split it
3038 into full blocks and tail blocks.
3039
3040 TODO:
3041 If there is enough space on the following tail block, use
3042 this instead of creating a new tail block.
3043 */
3044 DBUG_ASSERT(cur_block[1].page_count == 0);
3045 if (cur_block->page_count == 1)
3046 {
3047 /* convert full block to tail block */
3048 cur_block->used|= BLOCKUSED_USED | BLOCKUSED_TAIL;
3049 head_tail_block= cur_block;
3050 }
3051 else
3052 {
3053 DBUG_ASSERT(data_length < length - FULL_PAGE_SIZE(share));
3054 DBUG_PRINT("info", ("Splitting blocks into full and tail"));
3055 cur_block[1].page= (cur_block->page + cur_block->page_count - 1);
3056 cur_block[1].page_count= 1; /* Avoid DBUG_ASSERT */
3057 cur_block[1].used= BLOCKUSED_USED | BLOCKUSED_TAIL;
3058 cur_block->page_count--;
3059 cur_block->used|= BLOCKUSED_USED;
3060 last_head_block= head_tail_block= cur_block+1;
3061 }
3062 if (end_block[-1].used & BLOCKUSED_TAIL)
3063 bitmap_blocks->tail_page_skipped= 1;
3064 }
3065 }
3066 else
3067 {
3068 /* Must be an empty or tail page */
3069 DBUG_ASSERT(cur_block->page_count == 0 ||
3070 cur_block->used & BLOCKUSED_TAIL);
3071 if (end_block[-1].used & BLOCKUSED_TAIL)
3072 bitmap_blocks->tail_page_skipped= 1;
3073 }
3074
3075 /*
3076 Write all extents into page or tmp_data
3077
3078 Note that we still don't have a correct position for the tail
3079 of the non-blob fields.
3080 */
3081 store_extent_info(row_extents_first_part,
3082 row_extents_second_part,
3083 head_block+1, bitmap_blocks->count - 1);
3084 if (head_tail_block)
3085 {
3086 ulong block_length= (ulong) (tmp_data - info->rec_buff);
3087 uchar *extent_data;
3088
3089 length= (uint) (block_length % FULL_PAGE_SIZE(share));
3090 if (write_tail(info, head_tail_block,
3091 info->rec_buff + block_length - length,
3092 length))
3093 goto disk_err;
3094 tmp_data-= length; /* Remove the tail */
3095 if (tmp_data == info->rec_buff)
3096 {
3097 /* We have no full blocks to write for the head part */
3098 tmp_data_used= 0;
3099 }
3100
3101 /* Store the tail position for the non-blob fields */
3102 if (head_tail_block == head_block + 1)
3103 {
3104 /*
3105 We had a head block + tail block, which means that the
3106 tail block is the first extent
3107 */
3108 extent_data= row_extents_first_part;
3109 }
3110 else
3111 {
3112 /*
3113 We have a head block + some full blocks + tail block
3114 last_head_block is pointing after the last used extent
3115 for the head block.
3116 */
3117 extent_data= row_extents_second_part +
3118 ((last_head_block - head_block) - 2) * ROW_EXTENT_SIZE;
3119 }
3120 /* Write information for tail block in the reserved space */
3121 page_store(extent_data, head_tail_block->page);
3122 pagerange_store(extent_data + PAGE_STORE_SIZE,
3123 head_tail_block->page_count);
3124 }
3125 }
3126 else
3127 store_extent_info(row_extents_first_part,
3128 row_extents_second_part,
3129 head_block+1, bitmap_blocks->count - 1);
3130 }
3131
3132 if (share->now_transactional)
3133 {
3134 uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE];
3135 LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
3136
3137 /* Log REDO changes of head page */
3138 page_store(log_data + FILEID_STORE_SIZE, head_block->page);
3139 dirpos_store(log_data + FILEID_STORE_SIZE + PAGE_STORE_SIZE,
3140 row_pos->rownr);
3141 log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
3142 log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
3143 log_array[TRANSLOG_INTERNAL_PARTS + 1].str= row_pos->data;
3144 log_array[TRANSLOG_INTERNAL_PARTS + 1].length= head_length;
3145 if (translog_write_record(&lsn,
3146 head_block_is_read ?
3147 LOGREC_REDO_INSERT_ROW_HEAD :
3148 LOGREC_REDO_NEW_ROW_HEAD,
3149 info->trn,
3150 info,
3151 (translog_size_t) (sizeof(log_data) +
3152 head_length),
3153 TRANSLOG_INTERNAL_PARTS + 2, log_array,
3154 log_data, NULL))
3155 goto disk_err;
3156 }
3157
3158#ifdef RECOVERY_EXTRA_DEBUG
3159 if (info->trn->undo_lsn != LSN_IMPOSSIBLE)
3160 {
3161 /* Stop right after the REDO; testing incomplete log record groups */
3162 DBUG_EXECUTE_IF("maria_flush_whole_log",
3163 {
3164 DBUG_PRINT("maria_flush_whole_log", ("now"));
3165 translog_flush(translog_get_horizon());
3166 });
3167 DBUG_EXECUTE_IF("maria_crash",
3168 { DBUG_PRINT("maria_crash", ("now")); DBUG_SUICIDE(); });
3169 }
3170#endif
3171
3172 if (head_block_is_read)
3173 {
3174 MARIA_PINNED_PAGE *page_link;
3175 /* Head page is always the first pinned page */
3176 page_link= dynamic_element(&info->pinned_pages, 0,
3177 MARIA_PINNED_PAGE*);
3178 pagecache_unlock_by_link(share->pagecache, page_link->link,
3179 PAGECACHE_LOCK_WRITE_TO_READ,
3180 PAGECACHE_PIN_LEFT_PINNED, LSN_IMPOSSIBLE,
3181 LSN_IMPOSSIBLE, 1, FALSE);
3182 page_link->unlock= PAGECACHE_LOCK_READ_UNLOCK;
3183 page_link->changed= 1;
3184 }
3185 else
3186 {
3187 if (pagecache_write(share->pagecache,
3188 &info->dfile, head_block->page, 0,
3189 page_buff, share->page_type,
3190 head_block_is_read ? PAGECACHE_LOCK_WRITE_TO_READ :
3191 PAGECACHE_LOCK_READ,
3192 head_block_is_read ? PAGECACHE_PIN_LEFT_PINNED :
3193 PAGECACHE_PIN,
3194 PAGECACHE_WRITE_DELAY, &page_link.link,
3195 LSN_IMPOSSIBLE))
3196 goto disk_err;
3197 DBUG_ASSERT(page_link.link);
3198 page_link.unlock= PAGECACHE_LOCK_READ_UNLOCK;
3199 page_link.changed= 1;
3200 push_dynamic(&info->pinned_pages, (void*) &page_link);
3201
3202 /* Increase data file size, if extended */
3203 position= (my_off_t) head_block->page * block_size;
3204 if (share->state.state.data_file_length <= position)
3205 _ma_set_share_data_file_length(share, position + block_size);
3206 }
3207
3208 if (share->now_transactional && (tmp_data_used || blob_full_pages_exists))
3209 {
3210 /*
3211 Log REDO writes for all full pages (head part and all blobs)
3212 We write all here to be able to generate the UNDO record early
3213 so that we can write the LSN for the UNDO record to all full pages.
3214 */
3215 uchar tmp_log_data[FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE +
3216 (ROW_EXTENT_SIZE + BLOCK_FILLER_SIZE + SUB_RANGE_SIZE) *
3217 ROW_EXTENTS_ON_STACK];
3218 uchar *log_data, *log_pos;
3219 LEX_CUSTRING tmp_log_array[TRANSLOG_INTERNAL_PARTS + 2 +
3220 ROW_EXTENTS_ON_STACK];
3221 LEX_CUSTRING *log_array_pos, *log_array;
3222 int error;
3223 translog_size_t log_entry_length= 0;
3224 uint ext_length, extents= 0, sub_extents= 0;
3225
3226 /* If few extents, then allocate things on stack to avoid a malloc call */
3227 if (bitmap_blocks->count < ROW_EXTENTS_ON_STACK)
3228 {
3229 log_array= tmp_log_array;
3230 log_data= tmp_log_data;
3231 }
3232 else
3233 {
3234 if (!my_multi_malloc(MY_WME, &log_array,
3235 (uint) ((bitmap_blocks->count +
3236 TRANSLOG_INTERNAL_PARTS + 2) *
3237 sizeof(*log_array)),
3238 &log_data, FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE +
3239 bitmap_blocks->count * (ROW_EXTENT_SIZE +
3240 BLOCK_FILLER_SIZE +
3241 SUB_RANGE_SIZE),
3242 NullS))
3243 goto disk_err;
3244 }
3245 log_pos= log_data + FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE * 2;
3246 log_array_pos= log_array+ TRANSLOG_INTERNAL_PARTS+1;
3247
3248 if (tmp_data_used)
3249 {
3250 /* Full head page */
3251 translog_size_t block_length= (translog_size_t) (tmp_data -
3252 info->rec_buff);
3253 log_pos= store_page_range(share,
3254 log_pos, head_block+1,
3255 (ulong) block_length, &extents);
3256 log_array_pos->str= info->rec_buff;
3257 log_array_pos->length= block_length;
3258 log_entry_length+= block_length;
3259 log_array_pos++;
3260 sub_extents++;
3261 }
3262 if (blob_full_pages_exists)
3263 {
3264 MARIA_COLUMNDEF *tmp_column= column;
3265 ulong *tmp_blob_lengths= blob_lengths;
3266 MARIA_BITMAP_BLOCK *tmp_block= block;
3267
3268 /* Full blob pages */
3269 for (; tmp_column < end_column; tmp_column++, tmp_blob_lengths++)
3270 {
3271 ulong blob_length;
3272 uint length;
3273
3274 if (!*tmp_blob_lengths) /* Null or "" */
3275 continue;
3276 blob_length= *tmp_blob_lengths;
3277 length= tmp_column->length - portable_sizeof_char_ptr;
3278 /*
          If last part of blob was on tail page, change blob_length to
3280 reflect this
3281 */
3282 if (tmp_block[tmp_block->sub_blocks - 1].used & BLOCKUSED_TAIL)
3283 blob_length-= (blob_length % FULL_PAGE_SIZE(share));
3284 if (blob_length)
3285 {
3286 memcpy((void*) &log_array_pos->str,
3287 record + tmp_column->offset + length,
3288 sizeof(uchar*));
3289 log_array_pos->length= blob_length;
3290 log_entry_length+= blob_length;
3291 log_array_pos++;
3292 sub_extents++;
3293
3294 log_pos= store_page_range(share,
3295 log_pos, tmp_block,
3296 blob_length, &extents);
3297 }
3298 tmp_block+= tmp_block->sub_blocks;
3299 }
3300 }
3301
3302 log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
3303 ext_length= (uint) (log_pos - log_data);
3304 log_array[TRANSLOG_INTERNAL_PARTS + 0].length= ext_length;
3305 pagerange_store(log_data+ FILEID_STORE_SIZE, extents);
3306 pagerange_store(log_data+ FILEID_STORE_SIZE + PAGERANGE_STORE_SIZE,
3307 sub_extents);
3308
3309 log_entry_length+= ext_length;
3310 /* trn->rec_lsn is already set earlier in this function */
3311 error= translog_write_record(&lsn, LOGREC_REDO_INSERT_ROW_BLOBS,
3312 info->trn, info, log_entry_length,
3313 (uint) (log_array_pos - log_array),
3314 log_array, log_data, NULL);
3315 if (log_array != tmp_log_array)
3316 my_free(log_array);
3317 if (error)
3318 goto disk_err;
3319 }
3320
3321 /* Write UNDO or CLR record */
3322 lsn= LSN_IMPOSSIBLE;
3323 if (share->now_transactional)
3324 {
3325 LEX_CUSTRING *log_array= info->log_row_parts;
3326
3327 if (undo_lsn != LSN_ERROR)
3328 {
3329 /*
3330 Store if this CLR is about UNDO_DELETE or UNDO_UPDATE;
3331 in the first case, Recovery, when it sees the CLR_END in the
3332 REDO phase, may decrement the records' count.
3333 */
3334 if (_ma_write_clr(info, undo_lsn,
3335 old_record ? LOGREC_UNDO_ROW_UPDATE :
3336 LOGREC_UNDO_ROW_DELETE,
3337 share->calc_checksum != 0,
3338 row->checksum - old_record_checksum,
3339 &lsn, (void*) 0))
3340 goto disk_err;
3341 }
3342 else
3343 {
3344 uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE +
3345 PAGE_STORE_SIZE + DIRPOS_STORE_SIZE + 2 +
3346 HA_CHECKSUM_STORE_SIZE + 2 + PAGERANGE_STORE_SIZE +
3347 ROW_EXTENT_SIZE];
3348 uchar *log_pos;
3349 ha_checksum checksum_delta;
3350
3351 /* LOGREC_UNDO_ROW_INSERT & LOGREC_UNDO_ROW_UPDATE share same header */
3352 lsn_store(log_data, info->trn->undo_lsn);
3353 page_store(log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE,
3354 head_block->page);
3355 dirpos_store(log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE +
3356 PAGE_STORE_SIZE,
3357 row_pos->rownr);
3358 log_pos= (log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE +
3359 PAGE_STORE_SIZE + DIRPOS_STORE_SIZE);
3360 store_checksum_in_rec(share, checksum_delta,
3361 row->checksum - old_record_checksum,
3362 log_pos, log_pos);
3363 compile_time_assert(sizeof(ha_checksum) == HA_CHECKSUM_STORE_SIZE);
3364
3365 log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
3366 log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
3367 log_data);
3368
3369 if (!old_record)
3370 {
3371 /* Store undo_lsn in case we are aborting the insert */
3372 row->orig_undo_lsn= info->trn->undo_lsn;
3373 /* Write UNDO log record for the INSERT */
3374 if (translog_write_record(&lsn, LOGREC_UNDO_ROW_INSERT,
3375 info->trn, info,
3376 (translog_size_t)
3377 log_array[TRANSLOG_INTERNAL_PARTS +
3378 0].length,
3379 TRANSLOG_INTERNAL_PARTS + 1,
3380 log_array,
3381 log_data + LSN_STORE_SIZE, &checksum_delta))
3382 goto disk_err;
3383 }
3384 else
3385 {
3386 /* Write UNDO log record for the UPDATE */
3387 size_t row_length, extents_length;
3388 uint row_parts_count, cur_head_length;
3389
3390 /*
3391 Write head length and extents of the original row so that we
3392 during UNDO can put it back in the original position.
3393 We don't store size for TRANSID, as we don't write this during
3394 UNDO.
3395 */
3396 cur_head_length= (info->cur_row.head_length -
3397 info->cur_row.header_length);
3398 int2store(log_pos, cur_head_length);
3399 pagerange_store(log_pos + 2, info->cur_row.extents_count);
3400 log_pos+= 2 + PAGERANGE_STORE_SIZE;
3401 log_array[TRANSLOG_INTERNAL_PARTS + 0].length+= (2 +
3402 PAGERANGE_STORE_SIZE);
3403 info->log_row_parts[TRANSLOG_INTERNAL_PARTS+1].str=
3404 info->cur_row.extents;
3405 info->log_row_parts[TRANSLOG_INTERNAL_PARTS+1].length=
3406 extents_length= info->cur_row.extents_count * ROW_EXTENT_SIZE;
3407
3408 row_length= fill_update_undo_parts(info, old_record, record,
3409 log_array +
3410 TRANSLOG_INTERNAL_PARTS + 2,
3411 &row_parts_count);
3412 if (translog_write_record(&lsn, LOGREC_UNDO_ROW_UPDATE, info->trn,
3413 info,
3414 (translog_size_t)
3415 (log_array[TRANSLOG_INTERNAL_PARTS +
3416 0].length + extents_length +
3417 row_length),
3418 TRANSLOG_INTERNAL_PARTS + 2 +
3419 row_parts_count,
3420 log_array,
3421 log_data + LSN_STORE_SIZE,
3422 &checksum_delta))
3423 goto disk_err;
3424 }
3425 }
3426 }
3427 /* Release not used space in used pages */
3428 if (_ma_bitmap_release_unused(info, bitmap_blocks))
3429 goto disk_err;
3430 _ma_unpin_all_pages(info, lsn);
3431
3432 if (tmp_data_used)
3433 {
3434 /*
3435 Write data stored in info->rec_buff to pages
3436 This is the char/varchar data that didn't fit into the head page.
3437 */
3438 DBUG_ASSERT(bitmap_blocks->count != 0);
3439 if (write_full_pages(info, lsn, head_block + 1,
3440 info->rec_buff, (ulong) (tmp_data - info->rec_buff)))
3441 goto disk_err;
3442 }
3443
3444 /* Write rest of blobs (data, but no tails as they are already written) */
3445 for (; column < end_column; column++, blob_lengths++)
3446 {
3447 uchar *blob_pos;
3448 uint length;
3449 ulong blob_length;
3450 if (!*blob_lengths) /* Null or "" */
3451 continue;
3452 length= column->length - portable_sizeof_char_ptr;
3453 memcpy(&blob_pos, record + column->offset + length, sizeof(char*));
3454 /* remove tail part */
3455 blob_length= *blob_lengths;
3456 if (block[block->sub_blocks - 1].used & BLOCKUSED_TAIL)
3457 blob_length-= (blob_length % FULL_PAGE_SIZE(share));
3458
3459 if (blob_length && write_full_pages(info, lsn, block,
3460 blob_pos, blob_length))
3461 goto disk_err;
3462 block+= block->sub_blocks;
3463 }
3464
3465 _ma_finalize_row(info);
3466 DBUG_RETURN(0);
3467
3468crashed:
3469 DBUG_ASSERT(!maria_assert_if_crashed_table);
3470 /* Something was wrong with data on page */
3471 _ma_set_fatal_error(share, HA_ERR_WRONG_IN_RECORD);
3472
3473disk_err:
3474 /**
3475 @todo RECOVERY we are going to let dirty pages go to disk while we have
3476 logged UNDO, this violates WAL. We must mark the table corrupted!
3477
3478 @todo RECOVERY we have written some REDOs without a closing UNDO,
3479 it's possible that a next operation by this transaction succeeds and then
3480 Recovery would glue the "orphan REDOs" to the succeeded operation and
3481 execute the failed REDOs. We need some mark "abort this group" in the
3482 log, or mark the table corrupted (then user will repair it and thus REDOs
3483 will be skipped).
3484
3485 @todo RECOVERY to not let write errors go unnoticed, pagecache_write()
3486 should take a MARIA_HA* in argument, and it it
3487 fails when flushing a page to disk it should call
3488 (*the_maria_ha->write_error_func)(the_maria_ha)
3489 and this hook will mark the table corrupted.
3490 Maybe hook should be stored in the pagecache's block structure, or in a
3491 hash "file->maria_ha*".
3492
3493 @todo RECOVERY we should distinguish below between log write error and
3494 table write error. The former should stop Maria immediately, the latter
3495 should mark the table corrupted.
3496 */
3497 /*
3498 Unpin all pinned pages to not cause problems for disk cache. This is
3499 safe to call even if we already called _ma_unpin_all_pages() above.
3500 */
3501 save_my_errno= my_errno;
3502 _ma_unpin_all_pages_and_finalize_row(info, LSN_IMPOSSIBLE);
3503 my_errno= save_my_errno;
3504 DBUG_RETURN(1);
3505}
3506
3507
3508/*
3509 @brief Write a record
3510
3511 @fn allocate_and_write_block_record()
3512 @param info Maria handler
3513 @param record Record to write
3514 @param row Information about fields in 'record'
3515 @param undo_lsn <> LSN_ERROR if we are executing an UNDO
3516
3517 @return
3518 @retval 0 ok
3519 @retval 1 Error
3520*/
3521
static my_bool allocate_and_write_block_record(MARIA_HA *info,
                                               const uchar *record,
                                               MARIA_ROW *row,
                                               LSN undo_lsn)
{
  struct st_row_pos_info row_pos;
  MARIA_BITMAP_BLOCKS *blocks= &row->insert_blocks;
  int save_my_errno;
  DBUG_ENTER("allocate_and_write_block_record");

  /*
    Increase the bitmap's non-flushable count while we hold an
    over-allocated bitmap; decreased again on the error path below.
  */
  _ma_bitmap_flushable(info, 1);
  if (_ma_bitmap_find_place(info, row, blocks))
    goto err;                         /* Error reading bitmap */

  /*
    Sleep; a checkpoint will happen and should not send this over-allocated
    bitmap to disk but rather wait.
  */
  DBUG_EXECUTE_IF("maria_over_alloc_bitmap", sleep(10););

  /* page will be pinned & locked by get_head_or_tail_page */
  if (get_head_or_tail_page(info, blocks->block, info->buff,
                            MY_MAX(row->space_on_head_page,
                                   info->s->base.min_block_length),
                            HEAD_PAGE,
                            PAGECACHE_LOCK_WRITE, &row_pos))
    goto err;
  /* Rowid is composed of head page number + directory entry on that page */
  row->lastpos= ma_recordpos(blocks->block->page, row_pos.rownr);
  if (info->s->calc_checksum)
  {
    if (undo_lsn == LSN_ERROR)
      row->checksum= (info->s->calc_checksum)(info, record);
    else
    {
      /* _ma_apply_undo_row_delete() already set row's checksum. Verify it. */
      DBUG_ASSERT(row->checksum == (info->s->calc_checksum)(info, record));
    }
  }
  DBUG_PRINT("info", ("rowid: %lu (%lu:%u) length: %u", (ulong) row->lastpos,
                      (ulong) ma_recordpos_to_page(row->lastpos),
                      ma_recordpos_to_dir_entry(row->lastpos),
                      row_pos.length));
  if (write_block_record(info, (uchar*) 0, record, row,
                         blocks, blocks->block->org_bitmap_value != 0,
                         &row_pos, undo_lsn, 0))
    goto err;
  /* Now let checkpoint happen but don't commit */
  DBUG_EXECUTE_IF("maria_over_alloc_bitmap", sleep(1000););
  DBUG_RETURN(0);

err:
  /* Preserve my_errno across cleanup, which may itself change it */
  save_my_errno= my_errno;
  if (info->non_flushable_state)
    _ma_bitmap_flushable(info, -1);
  _ma_unpin_all_pages_and_finalize_row(info, LSN_IMPOSSIBLE);
  my_errno= save_my_errno;
  DBUG_RETURN(1);
}
3580
3581
3582/*
3583 Write a record and return rowid for it
3584
3585 SYNOPSIS
3586 _ma_write_init_block_record()
3587 info Maria handler
3588 record Record to write
3589
3590 NOTES
3591 This is done BEFORE we write the keys to the row!
3592
3593 RETURN
3594 HA_OFFSET_ERROR Something went wrong
3595 # Rowid for row
3596*/
3597
3598MARIA_RECORD_POS _ma_write_init_block_record(MARIA_HA *info,
3599 const uchar *record)
3600{
3601 DBUG_ENTER("_ma_write_init_block_record");
3602
3603 calc_record_size(info, record, &info->cur_row);
3604 if (allocate_and_write_block_record(info, record,
3605 &info->cur_row, LSN_ERROR))
3606 DBUG_RETURN(HA_OFFSET_ERROR);
3607 DBUG_RETURN(info->cur_row.lastpos);
3608}
3609
3610
3611/*
3612 Dummy function for (*info->s->write_record)()
3613
3614 Nothing to do here, as we already wrote the record in
3615 _ma_write_init_block_record()
3616*/
3617
3618my_bool _ma_write_block_record(MARIA_HA *info __attribute__ ((unused)),
3619 const uchar *record __attribute__ ((unused)))
3620{
3621 return 0; /* Row already written */
3622}
3623
3624
3625/**
3626 @brief Remove row written by _ma_write_block_record() and log undo
3627
3628 @param info Maria handler
3629
3630 @note
3631 This is called in case we got a duplicate unique key while
3632 writing keys.
3633
3634 @return Operation status
3635 @retval 0 OK
3636 @retval 1 Error
3637*/
3638
my_bool _ma_write_abort_block_record(MARIA_HA *info)
{
  my_bool res= 0;
  MARIA_BITMAP_BLOCKS *blocks= &info->cur_row.insert_blocks;
  MARIA_BITMAP_BLOCK *block, *end;
  LSN lsn= LSN_IMPOSSIBLE;
  MARIA_SHARE *share= info->s;
  DBUG_ENTER("_ma_write_abort_block_record");

  _ma_bitmap_lock(share); /* Lock bitmap from other insert threads */
  /* Delete the head part of the row; lastpos encodes page + dir entry */
  if (delete_head_or_tail(info,
                          ma_recordpos_to_page(info->cur_row.lastpos),
                          ma_recordpos_to_dir_entry(info->cur_row.lastpos), 1,
                          0))
    res= 1;
  /*
    Free the other blocks the insert allocated; blocks->block[0] is the
    head block (deleted above), so start at +1.
  */
  for (block= blocks->block + 1, end= block + blocks->count - 1; block < end;
       block++)
  {
    if (block->used & BLOCKUSED_USED)
    {
      if (block->used & BLOCKUSED_TAIL)
      {
        /*
          block->page_count is set to the tail directory entry number in
          write_block_record()
        */
        if (delete_head_or_tail(info, block->page,
                                block->page_count & ~TAIL_BIT,
                                0, 0))
          res= 1;
      }
      else
      {
        /* A range of full pages; return them to the bitmap */
        if (free_full_page_range(info, block->page, block->page_count))
          res= 1;
      }
    }
  }
  _ma_bitmap_unlock(share);
  if (share->now_transactional)
  {
    /*
      Log a CLR referencing the original UNDO_ROW_INSERT and subtract the
      aborted row's checksum contribution.
    */
    if (_ma_write_clr(info, info->cur_row.orig_undo_lsn,
                      LOGREC_UNDO_ROW_INSERT,
                      share->calc_checksum != 0,
                      (ha_checksum) 0 - info->cur_row.checksum,
                      &lsn, (void*) 0))
      res= 1;
  }
  _ma_unpin_all_pages_and_finalize_row(info, lsn);
  DBUG_RETURN(res);
}
3690
3691
3692/*
3693 Update a record
3694
3695 NOTES
3696 For the moment, we assume that info->curr_row.extents is always updated
3697 when a row is read. In the future we may decide to read this on demand
3698 for rows split into many extents.
3699*/
3700
static my_bool _ma_update_block_record2(MARIA_HA *info,
                                        MARIA_RECORD_POS record_pos,
                                        const uchar *oldrec,
                                        const uchar *record,
                                        LSN undo_lsn)
{
  MARIA_BITMAP_BLOCKS *blocks= &info->cur_row.insert_blocks;
  uchar *buff;
  MARIA_ROW *cur_row= &info->cur_row, *new_row= &info->new_row;
  MARIA_PINNED_PAGE page_link;
  uint rownr, org_empty_size, head_length;
  uint block_size= info->s->block_size;
  uint errpos __attribute__((unused)) = 0;     /* For DBUG error tracing */
  uchar *dir;
  pgcache_page_no_t page;
  struct st_row_pos_info row_pos;
  my_bool res;
  ha_checksum old_checksum;
  MARIA_SHARE *share= info->s;
  DBUG_ENTER("_ma_update_block_record2");
  DBUG_PRINT("enter", ("rowid: %lu", (long) record_pos));

#ifdef ENABLE_IF_PROBLEM_WITH_UPDATE
  DBUG_DUMP("oldrec", oldrec, share->base.reclength);
  DBUG_DUMP("newrec", record, share->base.reclength);
#endif

  /*
    Checksums of new and old rows were computed by callers already; new
    row's was put into cur_row, old row's was put into new_row.
  */
  old_checksum= new_row->checksum;
  new_row->checksum= cur_row->checksum;
  calc_record_size(info, record, new_row);
  page= ma_recordpos_to_page(record_pos);

  /* Keep the bitmap non-flushable while the row is being changed */
  _ma_bitmap_flushable(info, 1);
  /* Read and pin the row's head page with a write lock */
  buff= pagecache_read(share->pagecache,
                       &info->dfile, (pgcache_page_no_t) page, 0, 0,
                       share->page_type,
                       PAGECACHE_LOCK_WRITE, &page_link.link);
  page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
  page_link.changed= buff != 0;
  push_dynamic(&info->pinned_pages, (void*) &page_link);
  if (!buff)
    goto err;

  org_empty_size= uint2korr(buff + EMPTY_SPACE_OFFSET);
  rownr= ma_recordpos_to_dir_entry(record_pos);
  dir= dir_entry_pos(buff, block_size, rownr);

  /*
    We can't use cur_row->head_length as the block may have been compacted
    since we read it.
  */
  head_length= uint2korr(dir + 2);

  if ((org_empty_size + head_length) >= new_row->total_length)
  {
    uint rec_offset, length;
    MARIA_BITMAP_BLOCK block;

    DBUG_PRINT("info", ("org_empty_size: %u org_length: %u new_length: %lu",
                        org_empty_size, head_length,
                        new_row->total_length));

    /*
      We can fit the new row in the same page as the original head part
      of the row
    */
    block.org_bitmap_value= _ma_free_size_to_head_pattern(&share->bitmap,
                                                          org_empty_size);
    if (extend_area_on_page(info, buff, dir, rownr,
                            new_row->total_length, &org_empty_size,
                            &rec_offset, &length, 1))
    {
      errpos= 1;
      goto err;
    }

    /* Describe the head page to write_block_record() as a single block */
    row_pos.buff= buff;
    row_pos.rownr= rownr;
    row_pos.empty_space= org_empty_size;
    row_pos.dir= dir;
    row_pos.data= buff + rec_offset;
    row_pos.length= length;
    blocks->block= &block;
    blocks->count= 1;
    block.page= page;
    block.sub_blocks= 1;
    block.used= BLOCKUSED_USED | BLOCKUSED_USE_ORG_BITMAP;
    block.empty_space= row_pos.empty_space;

    /* Remove the old row's tail and extent pages before rewriting */
    if (*cur_row->tail_positions &&
        delete_tails(info, cur_row->tail_positions))
    {
      errpos= 2;
      goto err;
    }
    if (cur_row->extents_count && free_full_pages(info, cur_row))
    {
      errpos= 3;
      goto err;
    }
    res= write_block_record(info, oldrec, record, new_row, blocks,
                            1, &row_pos, undo_lsn, old_checksum);
    /* We can't update or delete this without re-reading it again */
    info->update&= ~HA_STATE_AKTIV;
    DBUG_RETURN(res);
  }
  /* Delete old row */
  if (*cur_row->tail_positions &&
      delete_tails(info, cur_row->tail_positions))
  {
    errpos= 4;
    goto err;
  }
  if (cur_row->extents_count && free_full_pages(info, cur_row))
  {
    errpos= 5;
    goto err;
  }

  /*
    Allocate pages for the new row version; the head part stays on the
    same page so that the rowid is preserved.
  */
  head_length= uint2korr(dir + 2);
  if (_ma_bitmap_find_new_place(info, new_row, page, head_length +
                                org_empty_size, blocks))
  {
    errpos= 6;
    goto err;
  }

  /*
    Allocate all size in block for record
    TODO:
    Need to improve this to do compact if we can fit one more blob into
    the head page
  */
  if ((head_length < new_row->space_on_head_page ||
       (new_row->total_length <= head_length &&
        org_empty_size + head_length >= new_row->total_length)))
  {
    _ma_compact_block_page(share,
                           buff, rownr, 1,
                           info->trn->min_read_from,
                           share->base.min_block_length);
    org_empty_size= 0;
    head_length= uint2korr(dir + 2);
  }

  /* Head part is rewritten at the row's original directory entry */
  row_pos.buff= buff;
  row_pos.rownr= rownr;
  row_pos.empty_space= org_empty_size + head_length;
  row_pos.dir= dir;
  row_pos.data= buff + uint2korr(dir);
  row_pos.length= head_length;
  if ((res= write_block_record(info, oldrec, record, new_row, blocks, 1,
                               &row_pos, undo_lsn, old_checksum)))
  {
    errpos= 7;
    goto err;
  }
  DBUG_RETURN(0);

err:
  DBUG_ASSERT(!maria_assert_if_crashed_table);
  DBUG_PRINT("error", ("errpos: %d", errpos));
  if (info->non_flushable_state)
    _ma_bitmap_flushable(info, -1);
  _ma_unpin_all_pages_and_finalize_row(info, LSN_IMPOSSIBLE);
  DBUG_RETURN(1);
}
3872
3873
3874/*
  @brief Store new row on its original position
3876
3877 @note
    This is basically a copy of _ma_update_block_record2().
    When we have a purge thread for deleted rows, we can remove this function
    and use _ma_update_block_record2() instead.
3881
3882 This is the main reason we don't make a lot of subfunctions that are
3883 common between _ma_update_block_record2() and this function.
3884
3885 Note: If something goes wrong we mark the file crashed
3886*/
3887
static my_bool _ma_update_at_original_place(MARIA_HA *info,
                                            pgcache_page_no_t page,
                                            uint rownr,
                                            uint length_on_head_page,
                                            uint extent_count,
                                            const uchar *extent_info,
                                            const uchar *oldrec,
                                            const uchar *record,
                                            LSN undo_lsn)
{
  MARIA_BITMAP_BLOCKS *blocks;
  MARIA_BITMAP_BLOCK *block;
  MARIA_ROW *cur_row= &info->cur_row, *new_row= &info->new_row;
  MARIA_PINNED_PAGE page_link;
  MARIA_SHARE *share= info->s;
  ha_checksum old_checksum;
  uint org_empty_size, empty_size;
  uint block_size= info->s->block_size;
  uchar *dir, *buff;
  struct st_row_pos_info row_pos;
  my_bool res;
  uint rec_offset, length;
  DBUG_ENTER("_ma_update_at_original_place");

#ifdef ENABLE_IF_PROBLEM_WITH_UPDATE
  DBUG_DUMP("oldrec", oldrec, share->base.reclength);
  DBUG_DUMP("newrec", record, share->base.reclength);
#endif

  /*
    Checksums of new and old rows were computed by callers already; new
    row's was put into cur_row, old row's was put into new_row.
  */
  old_checksum= new_row->checksum;
  new_row->checksum= cur_row->checksum;
  calc_record_size(info, record, new_row);

  /* Keep the bitmap non-flushable while the row is being changed */
  _ma_bitmap_flushable(info, 1);
  /* Read and pin the row's head page with a write lock */
  buff= pagecache_read(share->pagecache,
                       &info->dfile, (pgcache_page_no_t) page, 0, 0,
                       share->page_type,
                       PAGECACHE_LOCK_WRITE, &page_link.link);
  page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
  page_link.changed= buff != 0;
  push_dynamic(&info->pinned_pages, (void*) &page_link);
  if (!buff)
    goto err;

  org_empty_size= uint2korr(buff + EMPTY_SPACE_OFFSET);
  dir= dir_entry_pos(buff, block_size, rownr);

  /*
    If the old head part plus the page's free space cannot hold the
    logged head length, the page content does not match the log record.
  */
  if ((org_empty_size + cur_row->head_length) < length_on_head_page)
  {
    DBUG_PRINT("error",
               ("org_empty_size: %u head_length: %u length_on_page: %u",
                org_empty_size, (uint) cur_row->head_length,
                length_on_head_page));
    _ma_set_fatal_error(share, HA_ERR_WRONG_IN_RECORD);
    goto err;
  }

  /*
    We can fit the new row in the same page as the original head part
    of the row
  */
  empty_size= org_empty_size;
  if (extend_area_on_page(info, buff, dir, rownr,
                          length_on_head_page, &empty_size,
                          &rec_offset, &length, 1))
    goto err;

  row_pos.buff= buff;
  row_pos.rownr= rownr;
  row_pos.empty_space= empty_size;
  row_pos.dir= dir;
  row_pos.data= buff + rec_offset;

  /* Delete old row */
  if (*cur_row->tail_positions &&
      delete_tails(info, cur_row->tail_positions))
    goto err;
  if (cur_row->extents_count && free_full_pages(info, cur_row))
    goto err;

  /* Change extent information to be usable by write_block_record() */
  blocks= &cur_row->insert_blocks;
  if (extent_to_bitmap_blocks(info, blocks, page, extent_count, extent_info))
    goto err;
  block= blocks->block;
  block->empty_space= row_pos.empty_space;
  /* Treat the page as full (0 free) if no directory entries are left */
  block->org_bitmap_value=
    _ma_free_size_to_head_pattern(&share->bitmap,
                                  (enough_free_entries_on_page(share, buff) ?
                                   org_empty_size : 0));

  DBUG_ASSERT(block->org_bitmap_value ==
              _ma_bitmap_get_page_bits(info, &info->s->bitmap, page));
  block->used|= BLOCKUSED_USE_ORG_BITMAP;

  /*
    We have to use <= below as the new_row may be smaller than the original
    row as the new row doesn't have transaction id
  */

  DBUG_ASSERT(blocks->count > 1 ||
              MY_MAX(new_row->total_length, share->base.min_block_length) <=
              length_on_head_page);

  /* Store same amount of data on head page as on original page */
  row_pos.length= (length_on_head_page -
                   (extent_count + 1 - blocks->count) * ROW_EXTENT_SIZE);
  set_if_bigger(row_pos.length, share->base.min_block_length);
  if ((res= write_block_record(info, oldrec, record, new_row, blocks,
                               1, &row_pos, undo_lsn, old_checksum)))
    goto err;
  DBUG_RETURN(0);

err:
  DBUG_ASSERT(!maria_assert_if_crashed_table);
  _ma_mark_file_crashed(share);
  if (info->non_flushable_state)
    _ma_bitmap_flushable(info, -1);
  _ma_unpin_all_pages_and_finalize_row(info, LSN_IMPOSSIBLE);
  DBUG_RETURN(1);
}
4013
4014
4015/* Wrapper for _ma_update_block_record2() used by ma_update() */
4016
4017my_bool _ma_update_block_record(MARIA_HA *info, MARIA_RECORD_POS record_pos,
4018 const uchar *orig_rec, const uchar *new_rec)
4019{
4020 return _ma_update_block_record2(info, record_pos, orig_rec, new_rec,
4021 LSN_ERROR);
4022}
4023
4024
4025/*
4026 Delete a directory entry
4027
4028 SYNOPSIS
4029 delete_dir_entry()
4030 buff Page buffer
4031 record_number Record number to delete
4032 empty_space Empty space on page after delete
4033
4034 RETURN
4035 -1 Error on page
4036 0 ok
4037 1 Page is now empty
4038*/
4039
4040static int delete_dir_entry(MARIA_SHARE *share,
4041 uchar *buff, uint record_number,
4042 uint *empty_space_res)
4043{
4044 uint block_size= share->block_size;
4045 uint number_of_records= (uint) buff[DIR_COUNT_OFFSET];
4046 uint length, empty_space;
4047 uchar *dir;
4048 DBUG_ENTER("delete_dir_entry");
4049 DBUG_PRINT("enter", ("record_number: %u number_of_records: %u",
4050 record_number, number_of_records));
4051
4052#ifdef SANITY_CHECKS
4053 if (record_number >= number_of_records ||
4054 record_number > ((block_size - LSN_SIZE - PAGE_TYPE_SIZE - 1 -
4055 PAGE_SUFFIX_SIZE) / DIR_ENTRY_SIZE))
4056 {
4057 DBUG_PRINT("error", ("record_number: %u number_of_records: %u",
4058 record_number, number_of_records));
4059
4060 DBUG_RETURN(-1);
4061 }
4062#endif
4063
4064 check_directory(share, buff, block_size, 0, (uint) -1);
4065 empty_space= uint2korr(buff + EMPTY_SPACE_OFFSET);
4066 dir= dir_entry_pos(buff, block_size, record_number);
4067 length= uint2korr(dir + 2); /* Length of entry we just deleted */
4068 DBUG_ASSERT(uint2korr(dir) != 0 && length < block_size);
4069
4070 if (record_number == number_of_records - 1)
4071 {
4072 /* Delete this entry and all following free directory entries */
4073 uchar *end= buff + block_size - PAGE_SUFFIX_SIZE;
4074 number_of_records--;
4075 dir+= DIR_ENTRY_SIZE;
4076 empty_space+= DIR_ENTRY_SIZE;
4077
4078 /* Unlink and free the next empty ones */
4079 while (dir < end && dir[0] == 0 && dir[1] == 0)
4080 {
4081 number_of_records--;
4082 if (dir[2] == END_OF_DIR_FREE_LIST)
4083 buff[DIR_FREE_OFFSET]= dir[3];
4084 else
4085 {
4086 uchar *prev_entry= dir_entry_pos(buff, block_size, (uint) dir[2]);
4087 DBUG_ASSERT(uint2korr(prev_entry) == 0 && prev_entry[3] ==
4088 number_of_records);
4089 prev_entry[3]= dir[3];
4090 }
4091 if (dir[3] != END_OF_DIR_FREE_LIST)
4092 {
4093 uchar *next_entry= dir_entry_pos(buff, block_size, (uint) dir[3]);
4094 DBUG_ASSERT(uint2korr(next_entry) == 0 && next_entry[2] ==
4095 number_of_records);
4096 next_entry[2]= dir[2];
4097 }
4098 dir+= DIR_ENTRY_SIZE;
4099 empty_space+= DIR_ENTRY_SIZE;
4100 }
4101
4102 if (number_of_records == 0)
4103 {
4104 /* All entries on page deleted */
4105 DBUG_PRINT("info", ("Page marked as unallocated"));
4106 buff[PAGE_TYPE_OFFSET]= UNALLOCATED_PAGE;
4107#ifdef IDENTICAL_PAGES_AFTER_RECOVERY
4108 {
4109 dir= dir_entry_pos(buff, block_size, record_number);
4110 bzero(dir, (record_number+1) * DIR_ENTRY_SIZE);
4111 }
4112#endif
4113 *empty_space_res= block_size;
4114 DBUG_RETURN(1);
4115 }
4116 buff[DIR_COUNT_OFFSET]= (uchar) number_of_records;
4117 }
4118 else
4119 {
4120 /* Update directory */
4121 dir[0]= dir[1]= 0;
4122 dir[2]= END_OF_DIR_FREE_LIST;
4123 if ((dir[3]= buff[DIR_FREE_OFFSET]) != END_OF_DIR_FREE_LIST)
4124 {
4125 /* Relink next entry to point to newly freed entry */
4126 uchar *next_entry= dir_entry_pos(buff, block_size, (uint) dir[3]);
4127 DBUG_ASSERT(uint2korr(next_entry) == 0 &&
4128 next_entry[2] == END_OF_DIR_FREE_LIST);
4129 next_entry[2]= record_number;
4130 }
4131 buff[DIR_FREE_OFFSET]= record_number;
4132 }
4133 empty_space+= length;
4134
4135 int2store(buff + EMPTY_SPACE_OFFSET, empty_space);
4136 buff[PAGE_TYPE_OFFSET]|= (uchar) PAGE_CAN_BE_COMPACTED;
4137
4138 *empty_space_res= empty_space;
4139
4140 check_directory(share, buff, block_size, 0, empty_space);
4141 DBUG_RETURN(0);
4142}
4143
4144
4145/*
4146 Delete a head a tail part
4147
4148 SYNOPSIS
4149 delete_head_or_tail()
4150 info Maria handler
4151 page Page (not file offset!) on which the row is
4152 head 1 if this is a head page
4153 from_update 1 if we are called from update. In this case we
4154 leave the page as write locked as we may put
4155 the new row into the old position.
4156
4157 RETURN
4158 0 ok
4159 1 error
4160*/
4161
4162static my_bool delete_head_or_tail(MARIA_HA *info,
4163 pgcache_page_no_t page, uint record_number,
4164 my_bool head, my_bool from_update)
4165{
4166 MARIA_SHARE *share= info->s;
4167 uint empty_space;
4168 int res;
4169 my_bool page_is_empty;
4170 uchar *buff;
4171 LSN lsn;
4172 MARIA_PINNED_PAGE page_link;
4173 enum pagecache_page_lock lock_at_write, lock_at_unpin;
4174 DBUG_ENTER("delete_head_or_tail");
4175 DBUG_PRINT("enter", ("id: %lu (%lu:%u)",
4176 (ulong) ma_recordpos(page, record_number),
4177 (ulong) page, record_number));
4178
4179 buff= pagecache_read(share->pagecache,
4180 &info->dfile, page, 0, 0,
4181 share->page_type,
4182 PAGECACHE_LOCK_WRITE, &page_link.link);
4183 page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
4184 page_link.changed= buff != 0;
4185 push_dynamic(&info->pinned_pages, (void*) &page_link);
4186 if (!buff)
4187 DBUG_RETURN(1);
4188 DBUG_ASSERT((buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) ==
4189 (head ? HEAD_PAGE : TAIL_PAGE));
4190
4191 if (from_update)
4192 {
4193 lock_at_write= PAGECACHE_LOCK_LEFT_WRITELOCKED;
4194 lock_at_unpin= PAGECACHE_LOCK_WRITE_UNLOCK;
4195 }
4196 else
4197 {
4198 lock_at_write= PAGECACHE_LOCK_WRITE_TO_READ;
4199 lock_at_unpin= PAGECACHE_LOCK_READ_UNLOCK;
4200 }
4201
4202 res= delete_dir_entry(share, buff, record_number, &empty_space);
4203 if (res < 0)
4204 DBUG_RETURN(1);
4205 if (res == 0) /* after our deletion, page is still not empty */
4206 {
4207 uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE];
4208 LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
4209 page_is_empty= 0;
4210 if (share->now_transactional)
4211 {
4212 /* Log REDO data */
4213 page_store(log_data + FILEID_STORE_SIZE, page);
4214 dirpos_store(log_data + FILEID_STORE_SIZE + PAGE_STORE_SIZE,
4215 record_number);
4216
4217 log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
4218 log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
4219 if (translog_write_record(&lsn, (head ? LOGREC_REDO_PURGE_ROW_HEAD :
4220 LOGREC_REDO_PURGE_ROW_TAIL),
4221 info->trn, info,
4222 (translog_size_t) sizeof(log_data),
4223 TRANSLOG_INTERNAL_PARTS + 1, log_array,
4224 log_data, NULL))
4225 DBUG_RETURN(1);
4226 }
4227 }
4228 else /* page is now empty */
4229 {
4230 page_is_empty= 1;
4231 if (share->now_transactional)
4232 {
4233 uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE];
4234 LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
4235 page_store(log_data + FILEID_STORE_SIZE, page);
4236 log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
4237 log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
4238 if (translog_write_record(&lsn, LOGREC_REDO_FREE_HEAD_OR_TAIL,
4239 info->trn, info,
4240 (translog_size_t) sizeof(log_data),
4241 TRANSLOG_INTERNAL_PARTS + 1, log_array,
4242 log_data, NULL))
4243 DBUG_RETURN(1);
4244 }
4245 /*
4246 Mark that this page must be written to disk by page cache, even
4247 if we could call pagecache_delete() on it.
4248 This is needed to ensure that repair finds the empty page on disk
4249 and not old data.
4250 */
4251 pagecache_set_write_on_delete_by_link(page_link.link);
4252 DBUG_ASSERT(empty_space >= share->bitmap.sizes[0]);
4253 }
4254
4255 pagecache_unlock_by_link(share->pagecache, page_link.link,
4256 lock_at_write,
4257 PAGECACHE_PIN_LEFT_PINNED, LSN_IMPOSSIBLE,
4258 LSN_IMPOSSIBLE, 1, FALSE);
4259 page_link.unlock= lock_at_unpin;
4260 set_dynamic(&info->pinned_pages, (void*) &page_link,
4261 info->pinned_pages.elements-1);
4262
4263 DBUG_PRINT("info", ("empty_space: %u", empty_space));
4264
4265 /*
4266 If there is not enough space for all possible tails, mark the
4267 page full
4268 */
4269 if (!head && !page_is_empty && !enough_free_entries(buff, share->block_size,
4270 1 + share->base.blobs))
4271 empty_space= 0;
4272
4273 DBUG_RETURN(_ma_bitmap_set(info, page, head, empty_space));
4274}
4275
4276
4277/*
4278 delete all tails
4279
4280 SYNOPSIS
4281 delete_tails()
4282 info Handler
4283 tails Pointer to vector of tail positions, ending with 0
4284
4285 RETURN
4286 0 ok
4287 1 error
4288*/
4289
4290static my_bool delete_tails(MARIA_HA *info, MARIA_RECORD_POS *tails)
4291{
4292 my_bool res= 0;
4293 DBUG_ENTER("delete_tails");
4294 for (; *tails; tails++)
4295 {
4296 if (delete_head_or_tail(info,
4297 ma_recordpos_to_page(*tails),
4298 ma_recordpos_to_dir_entry(*tails), 0, 1))
4299 res= 1;
4300 }
4301 DBUG_RETURN(res);
4302}
4303
4304
4305/*
4306 Delete a record
4307
4308 NOTES
4309 For the moment, we assume that info->cur_row.extents is always updated
4310 when a row is read. In the future we may decide to read this on demand
4311 for rows with many splits.
4312*/
4313
4314my_bool _ma_delete_block_record(MARIA_HA *info, const uchar *record)
4315{
4316 pgcache_page_no_t page;
4317 uint record_number;
4318 MARIA_SHARE *share= info->s;
4319 LSN lsn= LSN_IMPOSSIBLE;
4320 DBUG_ENTER("_ma_delete_block_record");
4321
4322 page= ma_recordpos_to_page(info->cur_row.lastpos);
4323 record_number= ma_recordpos_to_dir_entry(info->cur_row.lastpos);
4324 DBUG_PRINT("enter", ("rowid: %lu (%lu:%u)", (ulong) info->cur_row.lastpos,
4325 (ulong) page, record_number));
4326
4327 _ma_bitmap_flushable(info, 1);
4328 if (delete_head_or_tail(info, page, record_number, 1, 0) ||
4329 delete_tails(info, info->cur_row.tail_positions))
4330 goto err;
4331
4332 if (info->cur_row.extents_count && free_full_pages(info, &info->cur_row))
4333 goto err;
4334
4335 if (share->now_transactional)
4336 {
4337 uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE + PAGE_STORE_SIZE +
4338 DIRPOS_STORE_SIZE + 2 + PAGERANGE_STORE_SIZE +
4339 HA_CHECKSUM_STORE_SIZE];
4340 uchar *log_pos;
4341 size_t row_length;
4342 uint row_parts_count, extents_length;
4343 ha_checksum checksum_delta;
4344
4345 /* Write UNDO record */
4346 lsn_store(log_data, info->trn->undo_lsn);
4347 page_store(log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE, page);
4348 log_pos= log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE + PAGE_STORE_SIZE;
4349 dirpos_store(log_pos, record_number);
4350 log_pos+= DIRPOS_STORE_SIZE;
4351 int2store(log_pos, info->cur_row.head_length -
4352 info->cur_row.header_length);
4353 log_pos+= 2;
4354 pagerange_store(log_pos, info->cur_row.extents_count);
4355 log_pos+= PAGERANGE_STORE_SIZE;
4356
4357 info->log_row_parts[TRANSLOG_INTERNAL_PARTS].str= log_data;
4358 info->log_row_parts[TRANSLOG_INTERNAL_PARTS].length=
4359 sizeof(log_data) - HA_CHECKSUM_STORE_SIZE;
4360 store_checksum_in_rec(share, checksum_delta,
4361 (ha_checksum) 0 - info->cur_row.checksum, log_pos,
4362 info->log_row_parts[TRANSLOG_INTERNAL_PARTS +
4363 0].length);
4364 info->log_row_parts[TRANSLOG_INTERNAL_PARTS+1].str=
4365 info->cur_row.extents;
4366 info->log_row_parts[TRANSLOG_INTERNAL_PARTS+1].length=
4367 extents_length= info->cur_row.extents_count * ROW_EXTENT_SIZE;
4368
4369 row_length= fill_insert_undo_parts(info, record,
4370 (info->log_row_parts +
4371 TRANSLOG_INTERNAL_PARTS + 2),
4372 &row_parts_count);
4373
4374 if (translog_write_record(&lsn, LOGREC_UNDO_ROW_DELETE, info->trn,
4375 info,
4376 (translog_size_t)
4377 (info->log_row_parts[TRANSLOG_INTERNAL_PARTS +
4378 0].length + row_length +
4379 extents_length),
4380 TRANSLOG_INTERNAL_PARTS + 2 + row_parts_count,
4381 info->log_row_parts,
4382 log_data + LSN_STORE_SIZE,
4383 &checksum_delta))
4384 goto err;
4385 }
4386
4387 _ma_bitmap_flushable(info, -1);
4388 _ma_unpin_all_pages_and_finalize_row(info, lsn);
4389 DBUG_RETURN(0);
4390
4391err:
4392 DBUG_ASSERT(!maria_assert_if_crashed_table);
4393 _ma_bitmap_flushable(info, -1);
4394 _ma_unpin_all_pages_and_finalize_row(info, LSN_IMPOSSIBLE);
4395 DBUG_RETURN(1);
4396}
4397
4398
4399/****************************************************************************
4400 Reading of records
4401****************************************************************************/
4402
4403/*
4404 Read position to record from record directory at end of page
4405
4406 SYNOPSIS
4407 get_record_position()
4408 buff page buffer
4409 block_size block size for page
4410 record_number Record number in index
4411 end_of_data pointer to end of data for record
4412
4413 RETURN
4414 0 Error in data
4415 # Pointer to start of record.
4416 In this case *end_of_data is set.
4417*/
4418
4419static uchar *get_record_position(MARIA_SHARE *share, uchar *buff,
4420 uint record_number, uchar **end_of_data)
4421{
4422 uint block_size= share->block_size;
4423 uint number_of_records= (uint) buff[DIR_COUNT_OFFSET];
4424 uchar *dir;
4425 uchar *data;
4426 uint offset, length;
4427
4428#ifdef SANITY_CHECKS
4429 if (record_number >= number_of_records ||
4430 record_number > ((block_size - PAGE_HEADER_SIZE(share) - PAGE_SUFFIX_SIZE)
4431 / DIR_ENTRY_SIZE))
4432 {
4433 DBUG_PRINT("error",
4434 ("Wrong row number: record_number: %u number_of_records: %u",
4435 record_number, number_of_records));
4436 return 0;
4437 }
4438#endif
4439
4440 dir= dir_entry_pos(buff, block_size, record_number);
4441 offset= uint2korr(dir);
4442 length= uint2korr(dir + 2);
4443#ifdef SANITY_CHECKS
4444 if (offset < PAGE_HEADER_SIZE(share) ||
4445 offset + length > (block_size -
4446 number_of_records * DIR_ENTRY_SIZE -
4447 PAGE_SUFFIX_SIZE))
4448 {
4449 DBUG_PRINT("error",
4450 ("Wrong row position: record_number: %u offset: %u "
4451 "length: %u number_of_records: %u",
4452 record_number, offset, length, number_of_records));
4453 return 0;
4454 }
4455#endif
4456 data= buff + offset;
4457 *end_of_data= data + length;
4458 return data;
4459}
4460
4461
4462/*
4463 Init extent
4464
4465 NOTES
4466 extent is a cursor over which pages to read
4467*/
4468
4469static void init_extent(MARIA_EXTENT_CURSOR *extent, uchar *extent_info,
4470 uint extents, MARIA_RECORD_POS *tail_positions)
4471{
4472 uint page_count;
4473 extent->extent= extent_info;
4474 extent->extent_count= extents;
4475 extent->page= page_korr(extent_info); /* First extent */
4476 page_count= (uint2korr(extent_info + ROW_EXTENT_PAGE_SIZE) &
4477 ~START_EXTENT_BIT);
4478 extent->tail= page_count & TAIL_BIT;
4479 if (extent->tail)
4480 {
4481 extent->page_count= 1;
4482 extent->tail_row_nr= page_count & ~TAIL_BIT;
4483 }
4484 else
4485 extent->page_count= page_count;
4486 extent->tail_positions= tail_positions;
4487 extent->lock_for_tail_pages= PAGECACHE_LOCK_LEFT_UNLOCKED;
4488}
4489
4490
4491/*
4492 Read next extent
4493
4494 SYNOPSIS
4495 read_next_extent()
4496 info Maria handler
4497 extent Pointer to current extent (this is updated to point
4498 to next)
4499 end_of_data Pointer to end of data in read block (out)
4500
4501 NOTES
4502 New block is read into info->buff
4503
4504 RETURN
4505 0 Error; my_errno is set
4506 # Pointer to start of data in read block
4507 In this case end_of_data is updated to point to end of data.
4508*/
4509
4510static uchar *read_next_extent(MARIA_HA *info, MARIA_EXTENT_CURSOR *extent,
4511 uchar **end_of_data)
4512{
4513 MARIA_SHARE *share= info->s;
4514 uchar *buff, *data;
4515 MARIA_PINNED_PAGE page_link;
4516 enum pagecache_page_lock lock;
4517 DBUG_ENTER("read_next_extent");
4518
4519 if (!extent->page_count)
4520 {
4521 uint page_count;
4522 if (!--extent->extent_count)
4523 goto crashed;
4524 extent->extent+= ROW_EXTENT_SIZE;
4525 extent->page= page_korr(extent->extent);
4526 page_count= (uint2korr(extent->extent+ROW_EXTENT_PAGE_SIZE) &
4527 ~START_EXTENT_BIT);
4528 if (!page_count)
4529 goto crashed;
4530 extent->tail= page_count & TAIL_BIT;
4531 if (extent->tail)
4532 extent->tail_row_nr= page_count & ~TAIL_BIT;
4533 else
4534 extent->page_count= page_count;
4535 DBUG_PRINT("info",("New extent. Page: %lu page_count: %u tail_flag: %d",
4536 (ulong) extent->page, extent->page_count,
4537 extent->tail != 0));
4538 }
4539 extent->first_extent= 0;
4540
4541 lock= PAGECACHE_LOCK_LEFT_UNLOCKED;
4542 if (extent->tail)
4543 lock= extent->lock_for_tail_pages;
4544
4545 buff= pagecache_read(share->pagecache,
4546 &info->dfile, extent->page, 0,
4547 info->buff, share->page_type,
4548 lock, &page_link.link);
4549 if (lock != PAGECACHE_LOCK_LEFT_UNLOCKED)
4550 {
4551 /* Read during UNDO */
4552 page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
4553 page_link.changed= buff != 0;
4554 push_dynamic(&info->pinned_pages, (void*) &page_link);
4555 }
4556 if (!buff)
4557 {
4558 /* check if we tried to read over end of file (ie: bad data in record) */
4559 if ((extent->page + 1) * share->block_size >
4560 share->state.state.data_file_length)
4561 goto crashed;
4562 DBUG_RETURN(0);
4563 }
4564
4565 if (!extent->tail)
4566 {
4567 /* Full data page */
4568 if ((buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) != BLOB_PAGE)
4569 goto crashed;
4570 extent->page++; /* point to next page */
4571 extent->page_count--;
4572 *end_of_data= buff + share->block_size - PAGE_SUFFIX_SIZE;
4573 info->cur_row.full_page_count++; /* For maria_chk */
4574 DBUG_RETURN(extent->data_start= buff + FULL_PAGE_HEADER_SIZE(share));
4575 }
4576
4577 /* Found tail */
4578 if ((buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) != TAIL_PAGE)
4579 goto crashed;
4580 *(extent->tail_positions++)= ma_recordpos(extent->page,
4581 extent->tail_row_nr);
4582 info->cur_row.tail_count++; /* For maria_chk */
4583
4584 if (!(data= get_record_position(share, buff,
4585 extent->tail_row_nr,
4586 end_of_data)))
4587 goto crashed;
4588 extent->data_start= data;
4589 extent->page_count= 0; /* No more data in extent */
4590 DBUG_RETURN(data);
4591
4592
4593crashed:
4594 DBUG_ASSERT(!maria_assert_if_crashed_table);
4595 _ma_set_fatal_error(share, HA_ERR_WRONG_IN_RECORD);
4596 DBUG_PRINT("error", ("wrong extent information"));
4597 DBUG_RETURN(0);
4598}
4599
4600
4601/*
4602 Read data that may be split over many blocks
4603
4604 SYNOPSIS
4605 read_long_data()
4606 info Maria handler
4607 to Store result string here (this is allocated)
4608 extent Pointer to current extent position
4609 data Current position in buffer
4610 end_of_data End of data in buffer
4611
4612 NOTES
4613 When we have to read a new buffer, it's read into info->buff
4614
4615 This loop is implemented by goto's instead of a for() loop as
4616 the code is notable smaller and faster this way (and it's not nice
4617 to jump into a for loop() or into a 'then' clause)
4618
4619 RETURN
4620 0 ok
4621 1 error
4622*/
4623
4624static my_bool read_long_data2(MARIA_HA *info, uchar *to, ulong length,
4625 MARIA_EXTENT_CURSOR *extent,
4626 uchar **data, uchar **end_of_data)
4627{
4628 uint left_length= (uint) (*end_of_data - *data);
4629 DBUG_ENTER("read_long_data2");
4630 DBUG_PRINT("enter", ("length: %lu left_length: %u",
4631 length, left_length));
4632 DBUG_ASSERT(*data <= *end_of_data);
4633
4634 /*
4635 Fields are never split in middle. This means that if length > rest-of-data
4636 we should start reading from the next extent. The reason we may have
4637 data left on the page is that if the fixed part of the row was less than
4638 min_block_length the head block was extended to min_block_length.
4639
4640 This may change in the future, which is why we have the loop written
4641 the way it's written.
4642 */
4643 if (extent->first_extent && length > left_length)
4644 {
4645 *end_of_data= *data;
4646 left_length= 0;
4647 }
4648
4649 for(;;)
4650 {
4651 if (unlikely(left_length >= length))
4652 {
4653 memcpy(to, *data, length);
4654 (*data)+= length;
4655 DBUG_PRINT("info", ("left_length: %u", left_length - (uint) length));
4656 DBUG_RETURN(0);
4657 }
4658 memcpy(to, *data, left_length);
4659 to+= left_length;
4660 length-= left_length;
4661 if (!(*data= read_next_extent(info, extent, end_of_data)))
4662 break;
4663 left_length= (uint) (*end_of_data - *data);
4664 }
4665 DBUG_RETURN(1);
4666}
4667
4668static inline my_bool read_long_data(MARIA_HA *info, uchar *to, ulong length,
4669 MARIA_EXTENT_CURSOR *extent,
4670 uchar **data, uchar **end_of_data)
4671{
4672 uint left_length= (uint) (*end_of_data - *data);
4673 if (likely(left_length >= length))
4674 {
4675 memcpy(to, *data, length);
4676 (*data)+= length;
4677 return 0;
4678 }
4679 return read_long_data2(info, to, length, extent, data, end_of_data);
4680}
4681
4682
4683/*
4684 Read a record from page (helper function for _ma_read_block_record())
4685
4686 SYNOPSIS
4687 _ma_read_block_record2()
4688 info Maria handler
4689 record Store record here
4690 data Start of head data for row
4691 end_of_data End of data for row
4692
4693 NOTES
4694 The head page is already read by caller
4695 Following data is update in info->cur_row:
4696
4697 cur_row.head_length is set to size of entry in head block
4698 cur_row.tail_positions is set to point to all tail blocks
4699 cur_row.extents points to extents data
4700 cur_row.extents_counts contains number of extents
4701 cur_row.empty_bits is set to empty bits
4702 cur_row.field_lengths contains packed length of all fields
4703 cur_row.blob_length contains total length of all blobs
4704 cur_row.checksum contains checksum of read record.
4705
4706 RETURN
4707 0 ok
4708 # Error code
4709*/
4710
4711int _ma_read_block_record2(MARIA_HA *info, uchar *record,
4712 uchar *data, uchar *end_of_data)
4713{
4714 MARIA_SHARE *share= info->s;
4715 uchar *UNINIT_VAR(field_length_data), *UNINIT_VAR(blob_buffer), *start_of_data;
4716 uint flag, null_bytes, cur_null_bytes, row_extents, field_lengths;
4717 my_bool found_blob= 0;
4718 MARIA_EXTENT_CURSOR extent;
4719 MARIA_COLUMNDEF *column, *end_column;
4720 MARIA_ROW *cur_row= &info->cur_row;
4721 DBUG_ENTER("_ma_read_block_record2");
4722
4723 start_of_data= data;
4724 flag= (uint) (uchar) data[0];
4725 cur_null_bytes= share->base.original_null_bytes;
4726 null_bytes= share->base.null_bytes;
4727 cur_row->head_length= (uint) (end_of_data - data);
4728 cur_row->full_page_count= cur_row->tail_count= 0;
4729 cur_row->blob_length= 0;
4730 /* Number of bytes in header that we don't need to write during undo */
4731 cur_row->header_length= total_header_size[(flag & PRECALC_HEADER_BITMASK)]-1;
4732
4733 if (flag & ROW_FLAG_TRANSID)
4734 {
4735 cur_row->trid= transid_korr(data+1);
4736 if (!info->trn)
4737 {
4738 /* File crashed */
4739 DBUG_ASSERT(!maria_assert_if_crashed_table);
4740 _ma_set_fatal_error(share, HA_ERR_WRONG_IN_RECORD);
4741 DBUG_RETURN(HA_ERR_WRONG_IN_RECORD);
4742 }
4743 if (!trnman_can_read_from(info->trn, cur_row->trid))
4744 DBUG_RETURN(my_errno= HA_ERR_ROW_NOT_VISIBLE);
4745 }
4746
4747 /* Skip trans header (for now, until we have MVCC csupport) */
4748 data+= cur_row->header_length + 1 ;
4749 if (flag & ROW_FLAG_NULLS_EXTENDED)
4750 cur_null_bytes+= data[-1];
4751
4752 row_extents= 0;
4753 if (flag & ROW_FLAG_EXTENTS)
4754 {
4755 uint row_extent_size;
4756 /*
4757 Record is split over many data pages.
4758 Get number of extents and first extent
4759 */
4760 get_key_length(row_extents, data);
4761 cur_row->extents_count= row_extents;
4762 row_extent_size= row_extents * ROW_EXTENT_SIZE;
4763 if (cur_row->extents_buffer_length < row_extent_size &&
4764 _ma_alloc_buffer(&cur_row->extents,
4765 &cur_row->extents_buffer_length,
4766 row_extent_size))
4767 DBUG_RETURN(my_errno);
4768 memcpy(cur_row->extents, data, ROW_EXTENT_SIZE);
4769 data+= ROW_EXTENT_SIZE;
4770 init_extent(&extent, cur_row->extents, row_extents,
4771 cur_row->tail_positions);
4772 }
4773 else
4774 {
4775 cur_row->extents_count= 0;
4776 (*cur_row->tail_positions)= 0;
4777 extent.page_count= 0;
4778 extent.extent_count= 1;
4779 }
4780 extent.first_extent= 1;
4781
4782 field_lengths= 0;
4783 if (share->base.max_field_lengths)
4784 {
4785 get_key_length(field_lengths, data);
4786 cur_row->field_lengths_length= field_lengths;
4787#ifdef SANITY_CHECKS
4788 if (field_lengths > share->base.max_field_lengths)
4789 goto err;
4790#endif
4791 }
4792
4793 if (share->calc_checksum)
4794 cur_row->checksum= (uint) (uchar) *data++;
4795 /* data now points on null bits */
4796 memcpy(record, data, cur_null_bytes);
4797 if (unlikely(cur_null_bytes != null_bytes))
4798 {
4799 /*
4800 This only happens if we have added more NULL columns with
4801 ALTER TABLE and are fetching an old, not yet modified old row
4802 */
4803 bzero(record + cur_null_bytes, (uint) (null_bytes - cur_null_bytes));
4804 }
4805 data+= null_bytes;
4806 /* We copy the empty bits to be able to use them for delete/update */
4807 memcpy(cur_row->empty_bits, data, share->base.pack_bytes);
4808 data+= share->base.pack_bytes;
4809
4810 /* TODO: Use field offsets, instead of just skipping them */
4811 data+= share->base.field_offsets * FIELD_OFFSET_SIZE;
4812
4813 /*
4814 Read row extents (note that first extent was already read into
4815 cur_row->extents above)
4816 */
4817 if (row_extents > 1)
4818 {
4819 if (read_long_data(info, cur_row->extents + ROW_EXTENT_SIZE,
4820 (row_extents - 1) * ROW_EXTENT_SIZE,
4821 &extent, &data, &end_of_data))
4822 DBUG_RETURN(my_errno);
4823 }
4824
4825 /*
4826 Data now points to start of fixed length field data that can't be null
4827 or 'empty'. Note that these fields can't be split over blocks.
4828 */
4829 for (column= share->columndef,
4830 end_column= column + share->base.fixed_not_null_fields;
4831 column < end_column; column++)
4832 {
4833 uint column_length= column->length;
4834 if (data + column_length > end_of_data &&
4835 !(data= read_next_extent(info, &extent, &end_of_data)))
4836 goto err;
4837 memcpy(record + column->offset, data, column_length);
4838 data+= column_length;
4839 }
4840
4841 /* Read array of field lengths. This may be stored in several extents */
4842 if (field_lengths)
4843 {
4844 field_length_data= cur_row->field_lengths;
4845 if (read_long_data(info, field_length_data, field_lengths, &extent,
4846 &data, &end_of_data))
4847 DBUG_RETURN(my_errno);
4848 }
4849
4850 /* Read variable length data. Each of these may be split over many extents */
4851 for (end_column= share->columndef + share->base.fields;
4852 column < end_column; column++)
4853 {
4854 enum en_fieldtype type= column->type;
4855 uchar *field_pos= record + column->offset;
4856 /* First check if field is present in record */
4857 if ((record[column->null_pos] & column->null_bit) ||
4858 (cur_row->empty_bits[column->empty_pos] & column->empty_bit))
4859 {
4860 bfill(record + column->offset, column->fill_length,
4861 type == FIELD_SKIP_ENDSPACE ? ' ' : 0);
4862 continue;
4863 }
4864 switch (type) {
4865 case FIELD_NORMAL: /* Fixed length field */
4866 case FIELD_SKIP_PRESPACE:
4867 case FIELD_SKIP_ZERO: /* Fixed length field */
4868 if (data + column->length > end_of_data &&
4869 !(data= read_next_extent(info, &extent, &end_of_data)))
4870 goto err;
4871 memcpy(field_pos, data, column->length);
4872 data+= column->length;
4873 break;
4874 case FIELD_SKIP_ENDSPACE: /* CHAR */
4875 {
4876 /* Char that is space filled */
4877 uint length;
4878 if (column->length <= 255)
4879 length= (uint) (uchar) *field_length_data++;
4880 else
4881 {
4882 length= uint2korr(field_length_data);
4883 field_length_data+= 2;
4884 }
4885#ifdef SANITY_CHECKS
4886 if (length > column->length)
4887 goto err;
4888#endif
4889 if (read_long_data(info, field_pos, length, &extent, &data,
4890 &end_of_data))
4891 DBUG_RETURN(my_errno);
4892 bfill(field_pos + length, column->length - length, ' ');
4893 break;
4894 }
4895 case FIELD_VARCHAR:
4896 {
4897 ulong length;
4898 if (column->length <= 256)
4899 {
4900 length= (uint) (uchar) (*field_pos++= *field_length_data++);
4901 }
4902 else
4903 {
4904 length= uint2korr(field_length_data);
4905 field_pos[0]= field_length_data[0];
4906 field_pos[1]= field_length_data[1];
4907 field_pos+= 2;
4908 field_length_data+= 2;
4909 }
4910#ifdef SANITY_CHECKS
4911 if (length > column->length)
4912 goto err;
4913#endif
4914 if (read_long_data(info, field_pos, length, &extent, &data,
4915 &end_of_data))
4916 DBUG_RETURN(my_errno);
4917 break;
4918 }
4919 case FIELD_BLOB:
4920 {
4921 uint column_size_length= column->length - portable_sizeof_char_ptr;
4922 ulong blob_length= _ma_calc_blob_length(column_size_length,
4923 field_length_data);
4924
4925 if (!found_blob)
4926 {
4927 /* Calculate total length for all blobs */
4928 ulong blob_lengths= 0;
4929 uchar *length_data= field_length_data;
4930 MARIA_COLUMNDEF *blob_field= column;
4931
4932 found_blob= 1;
4933 for (; blob_field < end_column; blob_field++)
4934 {
4935 uint size_length;
4936 if ((record[blob_field->null_pos] & blob_field->null_bit) ||
4937 (cur_row->empty_bits[blob_field->empty_pos] &
4938 blob_field->empty_bit))
4939 continue;
4940 size_length= blob_field->length - portable_sizeof_char_ptr;
4941 blob_lengths+= _ma_calc_blob_length(size_length, length_data);
4942 length_data+= size_length;
4943 }
4944 cur_row->blob_length= blob_lengths;
4945 DBUG_PRINT("info", ("Total blob length: %lu", blob_lengths));
4946 if (_ma_alloc_buffer(&info->blob_buff, &info->blob_buff_size,
4947 blob_lengths))
4948 DBUG_RETURN(my_errno);
4949 blob_buffer= info->blob_buff;
4950 }
4951
4952 memcpy(field_pos, field_length_data, column_size_length);
4953 memcpy(field_pos + column_size_length, (uchar *) &blob_buffer,
4954 sizeof(char*));
4955 field_length_data+= column_size_length;
4956
4957 /*
4958 After we have read one extent, then each blob is in it's own extent
4959 */
4960 if (!extent.first_extent || (ulong) (end_of_data - data) < blob_length)
4961 end_of_data= data; /* Force read of next extent */
4962
4963 if (read_long_data(info, blob_buffer, blob_length, &extent, &data,
4964 &end_of_data))
4965 DBUG_RETURN(my_errno);
4966 blob_buffer+= blob_length;
4967 break;
4968 }
4969 default:
4970#ifdef EXTRA_DEBUG
4971 DBUG_ASSERT(0); /* purecov: deadcode */
4972#endif
4973 goto err;
4974 }
4975 continue;
4976 }
4977
4978 if (row_extents)
4979 {
4980 DBUG_PRINT("info", ("Row read: page_count: %u extent_count: %u",
4981 extent.page_count, extent.extent_count));
4982 *extent.tail_positions= 0; /* End marker */
4983 if (extent.page_count)
4984 goto err;
4985 if (extent.extent_count > 1)
4986 {
4987 if (_ma_check_if_zero(extent.extent + ROW_EXTENT_SIZE,
4988 (extent.extent_count-1) * ROW_EXTENT_SIZE))
4989 {
4990 DBUG_PRINT("error", ("Data in extent is not zero"));
4991 DBUG_DUMP("extent", extent.extent + ROW_EXTENT_SIZE,
4992 (extent.extent_count-1) * ROW_EXTENT_SIZE);
4993 goto err;
4994 }
4995 }
4996 }
4997 else
4998 {
4999 DBUG_PRINT("info", ("Row read"));
5000 /*
5001 data should normally point to end_of_date. The only exception is if
5002 the row is very short in which case we allocated 'min_block_length' data
5003 for allowing the row to expand.
5004 */
5005 if (data != end_of_data && (uint) (end_of_data - start_of_data) >
5006 share->base.min_block_length)
5007 goto err;
5008 }
5009#ifdef EXTRA_DEBUG
5010 if (share->calc_checksum && !info->in_check_table)
5011 {
5012 /* Esnure that row checksum is correct */
5013 DBUG_ASSERT(((share->calc_checksum)(info, record) & 255) ==
5014 cur_row->checksum);
5015 }
5016#endif
5017 info->update|= HA_STATE_AKTIV; /* We have an active record */
5018 DBUG_RETURN(0);
5019
5020err:
5021 DBUG_ASSERT(!maria_assert_if_crashed_table);
5022 /* Something was wrong with data on record */
5023 DBUG_PRINT("error", ("Found record with wrong data"));
5024 _ma_set_fatal_error(share, HA_ERR_WRONG_IN_RECORD);
5025 DBUG_RETURN(HA_ERR_WRONG_IN_RECORD);
5026}
5027
5028
5029/** @brief Read positions to tail blocks and full blocks
5030
5031 @fn read_row_extent_info()
5032 @param info Handler
5033
5034 @notes
5035 This function is a simpler version of _ma_read_block_record2()
5036 The data about the used pages is stored in info->cur_row.
5037
5038 @return Status
5039 @retval 0 ok
5040 @retval 1 Error. my_errno contains error number
5041*/
5042
static my_bool read_row_extent_info(MARIA_HA *info, uchar *buff,
                                    uint record_number)
{
  MARIA_SHARE *share= info->s;
  MARIA_EXTENT_CURSOR extent;
  MARIA_RECORD_POS *tail_pos;
  uchar *data, *end_of_data;
  uint flag, row_extents, row_extents_size;
  uint field_lengths __attribute__ ((unused));
  uchar *extents, *end;
  DBUG_ENTER("read_row_extent_info");

  /* Locate the row start inside the head page through the page directory */
  if (!(data= get_record_position(share, buff,
                                  record_number, &end_of_data)))
    DBUG_RETURN(1);                             /* Wrong in record */

  flag= (uint) (uchar) data[0];
  /* Skip trans header */
  data+= total_header_size[(flag & PRECALC_HEADER_BITMASK)];

  row_extents= 0;
  row_extents_size= 0;
  if (flag & ROW_FLAG_EXTENTS)
  {
    /*
      Record is split over many data pages.
      Get number of extents and first extent
    */
    get_key_length(row_extents, data);
    row_extents_size= row_extents * ROW_EXTENT_SIZE;
    /* Grow the extent buffer if the stored one is too small */
    if (info->cur_row.extents_buffer_length < row_extents_size &&
        _ma_alloc_buffer(&info->cur_row.extents,
                         &info->cur_row.extents_buffer_length,
                         row_extents_size))
      DBUG_RETURN(1);
    memcpy(info->cur_row.extents, data, ROW_EXTENT_SIZE);
    data+= ROW_EXTENT_SIZE;
    init_extent(&extent, info->cur_row.extents, row_extents,
                info->cur_row.tail_positions);
    extent.first_extent= 1;
  }
  info->cur_row.extents_count= row_extents;

  /*
    field_lengths looks unused, but get_key_length() has the side effect
    of advancing 'data', which is required as 'data' is used later.
  */
  if (share->base.max_field_lengths)
    get_key_length(field_lengths, data);

  if (share->calc_checksum)
    info->cur_row.checksum= (uint) (uchar) *data++;
  if (row_extents > 1)
  {
    /* Skip the fixed row header parts to reach the remaining extent list */
    data+= share->base.null_bytes;
    data+= share->base.pack_bytes;
    data+= share->base.field_offsets * FIELD_OFFSET_SIZE;

    /*
      Read row extents (note that first extent was already read into
      info->cur_row.extents above)
      Lock tails with write lock as we will delete them later.
    */
    extent.lock_for_tail_pages= PAGECACHE_LOCK_LEFT_WRITELOCKED;
    if (read_long_data(info, info->cur_row.extents + ROW_EXTENT_SIZE,
                       row_extents_size - ROW_EXTENT_SIZE,
                       &extent, &data, &end_of_data))
      DBUG_RETURN(1);
  }

  /* Update tail_positions with pointer to tails */
  tail_pos= info->cur_row.tail_positions;
  for (extents= info->cur_row.extents, end= extents + row_extents_size;
       extents < end;
       extents+= ROW_EXTENT_SIZE)
  {
    /* Each extent is a 5-byte page number followed by a 2-byte page count */
    pgcache_page_no_t page= uint5korr(extents);
    uint page_count= uint2korr(extents + ROW_EXTENT_PAGE_SIZE);
    if (page_count & TAIL_BIT)
      *(tail_pos++)= ma_recordpos(page, (page_count & ~ (TAIL_BIT |
                                                         START_EXTENT_BIT)));
  }
  *tail_pos= 0;                                 /* End marker */
  DBUG_RETURN(0);
}
5128
5129
5130/*
5131 Read a record based on record position
5132
5133 @fn _ma_read_block_record()
5134 @param info Maria handler
5135 @param record Store record here
5136 @param record_pos Record position
5137
5138 @return Status
5139 @retval 0 ok
5140 @retval # Error number
5141*/
5142
5143int _ma_read_block_record(MARIA_HA *info, uchar *record,
5144 MARIA_RECORD_POS record_pos)
5145{
5146 MARIA_SHARE *share= info->s;
5147 uchar *data, *end_of_data, *buff;
5148 uint offset;
5149 int ret;
5150 DBUG_ENTER("_ma_read_block_record");
5151 DBUG_PRINT("enter", ("rowid: %lu page: %lu rownr: %u",
5152 (ulong) record_pos,
5153 (ulong) ma_recordpos_to_page(record_pos),
5154 ma_recordpos_to_dir_entry(record_pos)));
5155
5156 offset= ma_recordpos_to_dir_entry(record_pos);
5157
5158 if (!(buff= pagecache_read(share->pagecache,
5159 &info->dfile, ma_recordpos_to_page(record_pos), 0,
5160 info->buff, share->page_type,
5161 PAGECACHE_LOCK_LEFT_UNLOCKED, 0)))
5162 DBUG_RETURN(my_errno);
5163 DBUG_ASSERT((buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) == HEAD_PAGE);
5164 if (!(data= get_record_position(share, buff, offset, &end_of_data)))
5165 {
5166 DBUG_ASSERT(!maria_assert_if_crashed_table);
5167 DBUG_PRINT("error", ("Wrong directory entry in data block"));
5168 my_errno= HA_ERR_RECORD_DELETED; /* File crashed */
5169 DBUG_RETURN(HA_ERR_RECORD_DELETED);
5170 }
5171 ret= _ma_read_block_record2(info, record, data, end_of_data);
5172 DBUG_RETURN(ret);
5173}
5174
5175
5176/* compare unique constraint between stored rows */
5177
my_bool _ma_cmp_block_unique(MARIA_HA *info, MARIA_UNIQUEDEF *def,
                             const uchar *record, MARIA_RECORD_POS pos)
{
  uchar *org_rec_buff, *old_record;
  size_t org_rec_buff_size;
  int error;
  DBUG_ENTER("_ma_cmp_block_unique");

  /*
    Don't allocate more than 16K on the stack to ensure we don't get
    stack overflow.
  */
  if (!(old_record= my_safe_alloca(info->s->base.reclength)))
    DBUG_RETURN(1);

  /* Don't let the compare destroy blobs that may be in use */
  org_rec_buff= info->rec_buff;
  org_rec_buff_size= info->rec_buff_size;
  if (info->s->base.blobs)
  {
    /* Force realloc of record buffer*/
    info->rec_buff= 0;
    info->rec_buff_size= 0;
  }
  /* Read the stored row at 'pos' and compare it with 'record' */
  error= _ma_read_block_record(info, old_record, pos);
  if (!error)
    error= _ma_unique_comp(def, record, old_record, def->null_are_equal);
  if (info->s->base.blobs)
  {
    /* Free the temporary buffer and restore the one saved above */
    my_free(info->rec_buff);
    info->rec_buff= org_rec_buff;
    info->rec_buff_size= org_rec_buff_size;
  }
  DBUG_PRINT("exit", ("result: %d", error));
  my_safe_afree(old_record, info->s->base.reclength);
  /* Non-zero on read error or when the rows differ on the unique key */
  DBUG_RETURN(error != 0);
}
5215
5216
5217/****************************************************************************
5218 Table scan
5219****************************************************************************/
5220
5221/*
5222 Allocate buffers for table scan
5223
5224 SYNOPSIS
5225 _ma_scan_init_block_record(MARIA_HA *info)
5226
5227 IMPLEMENTATION
5228 We allocate one buffer for the current bitmap and one buffer for the
5229 current page
5230
5231 RETURN
5232 0 ok
5233 1 error (couldn't allocate memory or disk error)
5234*/
5235
5236my_bool _ma_scan_init_block_record(MARIA_HA *info)
5237{
5238 MARIA_SHARE *share= info->s;
5239 DBUG_ENTER("_ma_scan_init_block_record");
5240 /*
5241 bitmap_buff may already be allocated if this is the second call to
5242 rnd_init() without a rnd_end() in between, see sql/handler.h
5243 */
5244 if (!(info->scan.bitmap_buff ||
5245 ((info->scan.bitmap_buff=
5246 (uchar *) my_malloc(share->block_size * 2, MYF(MY_WME))))))
5247 DBUG_RETURN(1);
5248 info->scan.page_buff= info->scan.bitmap_buff + share->block_size;
5249 info->scan.bitmap_end= info->scan.bitmap_buff + share->bitmap.max_total_size;
5250
5251 /* Set scan variables to get _ma_scan_block() to start with reading bitmap */
5252 info->scan.number_of_rows= 0;
5253 info->scan.bitmap_pos= info->scan.bitmap_end;
5254 info->scan.bitmap_page= (pgcache_page_no_t) 0 - share->bitmap.pages_covered;
5255 info->scan.max_page= share->state.state.data_file_length / share->block_size;
5256 /*
5257 We need to flush what's in memory (bitmap.map) to page cache otherwise, as
5258 we are going to read bitmaps from page cache in table scan (see
5259 _ma_scan_block_record()), we may miss recently inserted rows (bitmap page
5260 in page cache would be too old).
5261 */
5262 DBUG_RETURN(_ma_bitmap_flush(info->s));
5263}
5264
5265
/* Free buffers allocated by _ma_scan_init_block_record() */
5267
5268void _ma_scan_end_block_record(MARIA_HA *info)
5269{
5270 DBUG_ENTER("_ma_scan_end_block_record");
5271 my_free(info->scan.bitmap_buff);
5272 info->scan.bitmap_buff= 0;
5273 if (info->scan_save)
5274 {
5275 my_free(info->scan_save);
5276 info->scan_save= 0;
5277 }
5278 DBUG_VOID_RETURN;
5279}
5280
5281
5282/**
5283 @brief Save current scan position
5284
5285 @note
5286 For the moment we can only remember one position, but this is
5287 good enough for MySQL usage
5288
5289 @return
5290 @retval 0 ok
  @retval HA_ERR_OUT_OF_MEM   Could not allocate memory to hold position
5292*/
5293
int _ma_scan_remember_block_record(MARIA_HA *info,
                                   MARIA_RECORD_POS *lastpos)
{
  uchar *bitmap_buff;
  DBUG_ENTER("_ma_scan_remember_block_record");
  if (!(info->scan_save))
  {
    /* Allocate the save area and its private bitmap buffer in one chunk */
    if (!(info->scan_save= my_malloc(ALIGN_SIZE(sizeof(*info->scan_save)) +
                                     info->s->block_size * 2,
                                     MYF(MY_WME))))
      DBUG_RETURN(HA_ERR_OUT_OF_MEM);
    info->scan_save->bitmap_buff= ((uchar*) info->scan_save +
                                   ALIGN_SIZE(sizeof(*info->scan_save)));
  }
  /* For checking if pages have changed since we last read it */
  info->scan.row_changes= info->row_changes;

  /* Remember used bitmap and used head page */
  bitmap_buff= info->scan_save->bitmap_buff;
  memcpy(info->scan_save, &info->scan, sizeof(*info->scan_save));
  /* The struct copy overwrote the saved bitmap_buff pointer; restore it */
  info->scan_save->bitmap_buff= bitmap_buff;
  memcpy(bitmap_buff, info->scan.bitmap_buff, info->s->block_size * 2);

  /* Point to the last read row */
  *lastpos= info->cur_row.nextpos - 1;
  /*
    scan.dir was already advanced past the current row (directory grows
    downwards, so '+=' steps one entry back); this makes the restore
    re-process the remembered row's directory entry.
  */
  info->scan_save->dir+= DIR_ENTRY_SIZE;
  DBUG_RETURN(0);
}
5322
5323
5324/**
  @brief Restore the scan block to its original values
5326
5327 @return
5328 0 ok
5329 # error
5330
5331 @note
5332 In theory we could swap bitmap buffers instead of copy them.
5333 For the moment we don't do that because there are variables pointing
5334 inside the buffers and it's a bit of hassle to either make them relative
5335 or repoint them.
5336
5337 If the data file has changed, we will re-read the new block record
5338 to ensure that when we continue scanning we can ignore any deleted rows.
5339*/
5340
int _ma_scan_restore_block_record(MARIA_HA *info,
                                  MARIA_RECORD_POS lastpos)
{
  uchar *bitmap_buff;
  DBUG_ENTER("_ma_scan_restore_block_record");

  info->cur_row.nextpos= lastpos;
  /* Copy the saved state back, but keep our own bitmap buffer pointer */
  bitmap_buff= info->scan.bitmap_buff;
  memcpy(&info->scan, info->scan_save, sizeof(*info->scan_save));
  info->scan.bitmap_buff= bitmap_buff;
  memcpy(bitmap_buff, info->scan_save->bitmap_buff, info->s->block_size * 2);

  if (info->scan.row_changes != info->row_changes)
  {
    /*
      Table has been changed. We have to re-read the current page block as
      data may have changed on it that we have to see.
    */
    if (!(pagecache_read(info->s->pagecache,
                         &info->dfile,
                         ma_recordpos_to_page(info->scan.row_base_page),
                         0, info->scan.page_buff,
                         info->s->page_type,
                         PAGECACHE_LOCK_LEFT_UNLOCKED, 0)))
      DBUG_RETURN(my_errno);
    /* Recompute the row count and directory end from the fresh page image */
    info->scan.number_of_rows=
      (uint) (uchar) info->scan.page_buff[DIR_COUNT_OFFSET];
    info->scan.dir_end= (info->scan.page_buff + info->s->block_size -
                         PAGE_SUFFIX_SIZE -
                         info->scan.number_of_rows * DIR_ENTRY_SIZE);
  }
  DBUG_RETURN(0);
}
5374
5375
5376/*
5377 Read next record while scanning table
5378
5379 SYNOPSIS
5380 _ma_scan_block_record()
5381 info Maria handler
5382 record Store found here
5383 record_pos Value stored in info->cur_row.next_pos after last call
5384 This is offset inside the current pagebuff
5385 skip_deleted
5386
5387 NOTES
5388 - One must have called mi_scan() before this
5389 - In this version, we don't actually need record_pos, we as easily
5390 use a variable in info->scan
5391
5392 IMPLEMENTATION
5393 Current code uses a lot of goto's to separate the different kind of
5394 states we may be in. This gives us a minimum of executed if's for
5395 the normal cases. I tried several different ways to code this, but
5396 the current one was in the end the most readable and fastest.
5397
5398 RETURN
5399 0 ok
5400 # Error code (Normally HA_ERR_END_OF_FILE)
5401*/
5402
int _ma_scan_block_record(MARIA_HA *info, uchar *record,
                          MARIA_RECORD_POS record_pos,
                          my_bool skip_deleted __attribute__ ((unused)))
{
  uint block_size;
  MARIA_SHARE *share= info->s;
  DBUG_ENTER("_ma_scan_block_record");

restart_record_read:
  /* Find next row in current page */
  while (likely(record_pos < info->scan.number_of_rows))
  {
    uint length, offset;
    uchar *data, *end_of_data;
    int error;

    /* Ensure that scan.dir and record_pos are in sync */
    DBUG_ASSERT(info->scan.dir == dir_entry_pos(info->scan.page_buff,
                                                share->block_size,
                                                (uint) record_pos));

    /* Search for a valid directory entry (not 0) */
    while (!(offset= uint2korr(info->scan.dir)))
    {
      /* Directory grows downwards; '-=' advances to the next entry */
      info->scan.dir-= DIR_ENTRY_SIZE;
      record_pos++;
#ifdef SANITY_CHECKS
      if (info->scan.dir < info->scan.dir_end)
      {
        DBUG_ASSERT(!maria_assert_if_crashed_table);
        goto err;
      }
#endif
    }
    /*
      This should always be true as the directory should always start with
      a valid entry.
    */
    DBUG_ASSERT(info->scan.dir >= info->scan.dir_end);

    /* found row */
    info->cur_row.lastpos= info->scan.row_base_page + record_pos;
    info->cur_row.nextpos= record_pos + 1;
    data= info->scan.page_buff + offset;
    /* Directory entry is 2 bytes offset followed by 2 bytes length */
    length= uint2korr(info->scan.dir + 2);
    end_of_data= data + length;
    info->scan.dir-= DIR_ENTRY_SIZE;            /* Point to next row to process */
#ifdef SANITY_CHECKS
    if (end_of_data > info->scan.dir_end ||
        offset < PAGE_HEADER_SIZE(share) ||
        length < share->base.min_block_length)
    {
      /* Separate asserts so a crash report shows which check failed */
      DBUG_ASSERT(!(end_of_data > info->scan.dir_end));
      DBUG_ASSERT(!(offset < PAGE_HEADER_SIZE(share)));
      DBUG_ASSERT(!(length < share->base.min_block_length));
      goto err;
    }
#endif
    DBUG_PRINT("info", ("rowid: %lu", (ulong) info->cur_row.lastpos));
    error= _ma_read_block_record2(info, record, data, end_of_data);
    if (error != HA_ERR_ROW_NOT_VISIBLE)
      DBUG_RETURN(error);
    /* Row not visible to this transaction; try the next one */
    record_pos++;
  }

  /* Find next head page in current bitmap */
restart_bitmap_scan:
  block_size= share->block_size;
  if (likely(info->scan.bitmap_pos < info->scan.bitmap_end))
  {
    uchar *data= info->scan.bitmap_pos;
    longlong bits= info->scan.bits;
    uint bit_pos= info->scan.bit_pos;

    do
    {
      while (likely(bits))
      {
        /* Each page is described by a 3-bit pattern in the bitmap */
        uint pattern= (uint) (bits & 7);
        bits >>= 3;
        bit_pos++;
        /* Patterns 1-4 mark head pages with various amounts of free space */
        if (pattern > 0 && pattern <= 4)
        {
          /* Found head page; Read it */
          pgcache_page_no_t page;
          info->scan.bitmap_pos= data;
          info->scan.bits= bits;
          info->scan.bit_pos= bit_pos;
          /* Each 6-byte bitmap word (48 bits) describes 16 pages */
          page= (info->scan.bitmap_page + 1 +
                 (data - info->scan.bitmap_buff) / 6 * 16 + bit_pos - 1);
          info->scan.row_base_page= ma_recordpos(page, 0);
          if (page >= info->scan.max_page)
          {
            DBUG_PRINT("info", ("Found end of file"));
            DBUG_RETURN((my_errno= HA_ERR_END_OF_FILE));
          }
          if (!(pagecache_read(share->pagecache,
                               &info->dfile,
                               page, 0, info->scan.page_buff,
                               share->page_type,
                               PAGECACHE_LOCK_LEFT_UNLOCKED, 0)))
            DBUG_RETURN(my_errno);
          if (((info->scan.page_buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) !=
               HEAD_PAGE))
          {
            /*
              This may happen if someone has been deleting all rows
              from a page since we read the bitmap, so it may be ok.
              Print warning in debug log and continue.
            */
            DBUG_PRINT("warning",
                       ("Found page of type %d when expecting head page",
                        (info->scan.page_buff[PAGE_TYPE_OFFSET] &
                         PAGE_TYPE_MASK)));
            continue;
          }
          if ((info->scan.number_of_rows=
               (uint) (uchar) info->scan.page_buff[DIR_COUNT_OFFSET]) == 0)
          {
            DBUG_PRINT("error", ("Wrong page header"));
            _ma_set_fatal_error(share, HA_ERR_WRONG_IN_RECORD);
            DBUG_RETURN(HA_ERR_WRONG_IN_RECORD);
          }
          DBUG_PRINT("info", ("Page %lu has %u rows",
                              (ulong) page, info->scan.number_of_rows));
          /* Directory is at the end of the page, growing downwards */
          info->scan.dir= (info->scan.page_buff + block_size -
                           PAGE_SUFFIX_SIZE - DIR_ENTRY_SIZE);
          info->scan.dir_end= (info->scan.dir -
                               (info->scan.number_of_rows - 1) *
                               DIR_ENTRY_SIZE);
          record_pos= 0;
          goto restart_record_read;
        }
      }
      for (data+= 6; data < info->scan.bitmap_end; data+= 6)
      {
        bits= uint6korr(data);
        /* Skip not allocated pages and blob / full tail pages */
        if (bits && bits != 07777777777777777LL)
          break;
      }
      bit_pos= 0;
    } while (data < info->scan.bitmap_end);
  }

  /* Read next bitmap */
  info->scan.bitmap_page+= share->bitmap.pages_covered;
  if (unlikely(info->scan.bitmap_page >= info->scan.max_page))
  {
    DBUG_PRINT("info", ("Found end of file"));
    DBUG_RETURN((my_errno= HA_ERR_END_OF_FILE));
  }
  DBUG_PRINT("info", ("Reading bitmap at %lu",
                      (ulong) info->scan.bitmap_page));
  if (!(pagecache_read(share->pagecache, &info->s->bitmap.file,
                       info->scan.bitmap_page,
                       0, info->scan.bitmap_buff, PAGECACHE_PLAIN_PAGE,
                       PAGECACHE_LOCK_LEFT_UNLOCKED, 0)))
    DBUG_RETURN(my_errno);
  /* Skip scanning 'bits' in bitmap scan code */
  info->scan.bitmap_pos= info->scan.bitmap_buff - 6;
  info->scan.bits= 0;
  goto restart_bitmap_scan;

err:
  DBUG_ASSERT(!maria_assert_if_crashed_table);
  DBUG_PRINT("error", ("Wrong data on page"));
  _ma_set_fatal_error(share, HA_ERR_WRONG_IN_RECORD);
  DBUG_RETURN(HA_ERR_WRONG_IN_RECORD);
}
5573
5574
5575/*
5576 Compare a row against a stored one
5577
5578 NOTES
5579 Not implemented, as block record is not supposed to be used in a shared
5580 global environment
5581*/
5582
5583my_bool _ma_compare_block_record(MARIA_HA *info __attribute__ ((unused)),
5584 const uchar *record __attribute__ ((unused)))
5585{
5586 return 0;
5587}
5588
5589
5590/*
5591 Store an integer with simple packing
5592
5593 SYNOPSIS
    ma_store_length()
5595 to Store the packed integer here
5596 nr Integer to store
5597
5598 NOTES
5599 This is mostly used to store field numbers and lengths of strings.
    We have to cast the result for the LL() because of a bug in Forte CC
    compiler.
5602
5603 Packing used is:
5604 nr < 251 is stored as is (in 1 byte)
5605 Numbers that require 1-4 bytes are stored as char(250+byte_length), data
5606 Bigger numbers are stored as 255, data as ulonglong (not yet done).
5607
5608 RETURN
5609 Position in 'to' after the packed length
5610*/
5611
5612uchar *ma_store_length(uchar *to, ulong nr)
5613{
5614 if (nr < 251)
5615 {
5616 *to=(uchar) nr;
5617 return to+1;
5618 }
5619 if (nr < 65536)
5620 {
5621 if (nr <= 255)
5622 {
5623 to[0]= (uchar) 251;
5624 to[1]= (uchar) nr;
5625 return to+2;
5626 }
5627 to[0]= (uchar) 252;
5628 int2store(to+1, nr);
5629 return to+3;
5630 }
5631 if (nr < 16777216)
5632 {
5633 *to++= (uchar) 253;
5634 int3store(to, nr);
5635 return to+3;
5636 }
5637 *to++= (uchar) 254;
5638 int4store(to, nr);
5639 return to+4;
5640}
5641
5642
5643/* Calculate how many bytes needed to store a number */
5644
5645uint ma_calc_length_for_store_length(ulong nr)
5646{
5647 if (nr < 251)
5648 return 1;
5649 if (nr < 65536)
5650 {
5651 if (nr <= 255)
5652 return 2;
5653 return 3;
5654 }
5655 if (nr < 16777216)
5656 return 4;
5657 return 5;
5658}
5659
5660
/* Retrieve a stored number */
5662
5663static ulong ma_get_length(const uchar **packet)
5664{
5665 reg1 const uchar *pos= *packet;
5666 if (*pos < 251)
5667 {
5668 (*packet)++;
5669 return (ulong) *pos;
5670 }
5671 if (*pos == 251)
5672 {
5673 (*packet)+= 2;
5674 return (ulong) pos[1];
5675 }
5676 if (*pos == 252)
5677 {
5678 (*packet)+= 3;
5679 return (ulong) uint2korr(pos+1);
5680 }
5681 if (*pos == 253)
5682 {
5683 (*packet)+= 4;
5684 return (ulong) uint3korr(pos+1);
5685 }
5686 DBUG_ASSERT(*pos == 254);
5687 (*packet)+= 5;
5688 return (ulong) uint4korr(pos+1);
5689}
5690
5691
5692/*
5693 Fill array with pointers to field parts to be stored in log for insert
5694
5695 SYNOPSIS
5696 fill_insert_undo_parts()
5697 info Maria handler
5698 record Inserted row
5699 log_parts Store pointers to changed memory areas here
5700 log_parts_count See RETURN
5701
5702 NOTES
5703 We have information in info->cur_row about the read row.
5704
5705 RETURN
5706 length of data in log_parts.
5707 log_parts_count contains number of used log_parts
5708*/
5709
static size_t fill_insert_undo_parts(MARIA_HA *info, const uchar *record,
                                     LEX_CUSTRING *log_parts,
                                     uint *log_parts_count)
{
  MARIA_SHARE *share= info->s;
  MARIA_COLUMNDEF *column, *end_column;
  uchar *field_lengths= info->cur_row.field_lengths;
  size_t row_length;
  MARIA_ROW *cur_row= &info->cur_row;
  LEX_CUSTRING *start_log_parts;
  DBUG_ENTER("fill_insert_undo_parts");

  start_log_parts= log_parts;

  /* Store null bits */
  log_parts->str= record;
  log_parts->length= share->base.null_bytes;
  row_length= log_parts->length;
  log_parts++;

  /* Stored bitmap over packed (zero length or all-zero fields) */
  log_parts->str= info->cur_row.empty_bits;
  log_parts->length= share->base.pack_bytes;
  row_length+= log_parts->length;
  log_parts++;

  if (share->base.max_field_lengths)
  {
    /*
      Store length of all not empty char, varchar and blob fields.
      NOTE(review): the 2 bytes in front of field_lengths are assumed to be
      reserved for this length prefix — confirm against the allocation of
      cur_row.field_lengths.
    */
    log_parts->str= field_lengths - 2;
    log_parts->length= info->cur_row.field_lengths_length+2;
    int2store(log_parts->str, info->cur_row.field_lengths_length);
    row_length+= log_parts->length;
    log_parts++;
  }

  if (share->base.blobs)
  {
    /*
      Store total blob length to make buffer allocation easier during UNDO
    */
    log_parts->str= info->length_buff;
    log_parts->length= (uint) (ma_store_length(info->length_buff,
                                               info->cur_row.blob_length) -
                               (uchar*) log_parts->str);
    row_length+= log_parts->length;
    log_parts++;
  }

  /* Handle constant length fields that are always present */
  for (column= share->columndef,
       end_column= column+ share->base.fixed_not_null_fields;
       column < end_column;
       column++)
  {
    log_parts->str= record + column->offset;
    log_parts->length= column->length;
    row_length+= log_parts->length;
    log_parts++;
  }

  /* Handle NULL fields and CHAR/VARCHAR fields */
  for (end_column= share->columndef + share->base.fields - share->base.blobs;
       column < end_column;
       column++)
  {
    const uchar *column_pos;
    size_t column_length;
    /* Null and empty columns need no data in the log */
    if ((record[column->null_pos] & column->null_bit) ||
        cur_row->empty_bits[column->empty_pos] & column->empty_bit)
      continue;

    column_pos= record+ column->offset;
    column_length= column->length;

    switch (column->type) {
    case FIELD_CHECK:
    case FIELD_NORMAL:                          /* Fixed length field */
    case FIELD_ZERO:
    case FIELD_SKIP_PRESPACE:                   /* Not packed */
    case FIELD_SKIP_ZERO:                       /* Fixed length field */
      break;
    case FIELD_SKIP_ENDSPACE:                   /* CHAR */
    {
      /* Real (stripped) length is in field_lengths: 1 or 2 bytes */
      if (column->length <= 255)
        column_length= *field_lengths++;
      else
      {
        column_length= uint2korr(field_lengths);
        field_lengths+= 2;
      }
      break;
    }
    case FIELD_VARCHAR:
    {
      if (column->fill_length == 1)
        column_length= *field_lengths;
      else
        column_length= uint2korr(field_lengths);
      field_lengths+= column->fill_length;
      column_pos+= column->fill_length;         /* Skip length prefix */
      break;
    }
    default:
      DBUG_ASSERT(0);
    }
    log_parts->str= column_pos;
    log_parts->length= column_length;
    row_length+= log_parts->length;
    log_parts++;
  }

  /* Add blobs */
  for (end_column+= share->base.blobs; column < end_column; column++)
  {
    const uchar *field_pos= record + column->offset;
    uint size_length= column->length - portable_sizeof_char_ptr;
    ulong blob_length= _ma_calc_blob_length(size_length, field_pos);

    /*
      We don't have to check for null, as blob_length is guaranteed to be 0
      if the blob is null
    */
    if (blob_length)
    {
      uchar *blob_pos;
      /* The blob data pointer is stored right after the length bytes */
      memcpy(&blob_pos, record + column->offset + size_length,
             sizeof(blob_pos));
      log_parts->str= blob_pos;
      log_parts->length= blob_length;
      row_length+= log_parts->length;
      log_parts++;
    }
  }
  *log_parts_count= (uint) (log_parts - start_log_parts);
  DBUG_RETURN(row_length);
}
5847
5848
5849/*
5850 Fill array with pointers to field parts to be stored in log for update
5851
5852 SYNOPSIS
5853 fill_update_undo_parts()
5854 info Maria handler
5855 oldrec Original row
5856 newrec New row
5857 log_parts Store pointers to changed memory areas here
5858 log_parts_count See RETURN
5859
5860 IMPLEMENTATION
5861 Format of undo record:
5862
5863 Fields are stored in same order as the field array.
5864
5865 Offset to changed field data (packed)
5866
5867 For each changed field
5868 Fieldnumber (packed)
5869 Length, if variable length field (packed)
5870
5871 For each changed field
5872 Data
5873
    Packing is done using ma_store_length()
5875
5876 The reason we store field numbers & length separated from data (ie, not
5877 after each other) is to get better cpu caching when we loop over
5878 fields (as we probably don't have to access data for each field when we
5879 want to read and old row through the undo log record).
5880
5881 As a special case, we use '255' for the field number of the null bitmap.
5882
5883 RETURN
5884 length of data in log_parts.
5885 log_parts_count contains number of used log_parts
5886*/
5887
static size_t fill_update_undo_parts(MARIA_HA *info, const uchar *oldrec,
                                     const uchar *newrec,
                                     LEX_CUSTRING *log_parts,
                                     uint *log_parts_count)
{
  MARIA_SHARE *share= info->s;
  MARIA_COLUMNDEF *column, *end_column;
  MARIA_ROW *old_row= &info->cur_row, *new_row= &info->new_row;
  uchar *field_data, *start_field_data, *length_str;
  uchar *old_field_lengths= old_row->field_lengths;
  uchar *new_field_lengths= new_row->field_lengths;
  size_t row_length= 0;
  uint field_lengths;
  LEX_CUSTRING *start_log_parts;
  my_bool new_column_is_empty;
  DBUG_ENTER("fill_update_undo_parts");

  start_log_parts= log_parts;

  /*
    First log part is for number of fields, field numbers and lengths
    The +4 is to reserve place for the number of changed fields.
  */
  start_field_data= field_data= info->update_field_data + 4;
  log_parts++;

  if (memcmp(oldrec, newrec, share->base.null_bytes))
  {
    /* Store changed null bits */
    *field_data++= (uchar) 255;                 /* Special case */
    log_parts->str= oldrec;
    log_parts->length= share->base.null_bytes;
    row_length= log_parts->length;
    log_parts++;
  }

  /* Handle constant length fields */
  for (column= share->columndef,
       end_column= column+ share->base.fixed_not_null_fields;
       column < end_column;
       column++)
  {
    /* Log the old value only when the field actually changed */
    if (memcmp(oldrec + column->offset, newrec + column->offset,
               column->length))
    {
      field_data= ma_store_length(field_data,
                                  (uint) (column - share->columndef));
      log_parts->str= oldrec + column->offset;
      log_parts->length= column->length;
      row_length+= column->length;
      log_parts++;
    }
  }

  /* Handle the rest: NULL fields and CHAR/VARCHAR fields and BLOB's */
  for (end_column= share->columndef + share->base.fields;
       column < end_column;
       column++)
  {
    const uchar *new_column_pos, *old_column_pos;
    size_t new_column_length, old_column_length;

    /* First check if old column is null or empty */
    if (oldrec[column->null_pos] & column->null_bit)
    {
      /*
        It's safe to skip this one as either the new column is also null
        (no change) or the new_column is not null, in which case the null-bit
        maps differed and we have already stored the null bitmap.
      */
      continue;
    }
    if (old_row->empty_bits[column->empty_pos] & column->empty_bit)
    {
      if (new_row->empty_bits[column->empty_pos] & column->empty_bit)
        continue;                               /* Both are empty; skip */

      /* Store null length column */
      field_data= ma_store_length(field_data,
                                  (uint) (column - share->columndef));
      field_data= ma_store_length(field_data, 0);
      continue;
    }
    /*
      Remember if the 'new' value is empty (as in this case we must always
      log the original value)
    */
    new_column_is_empty= ((newrec[column->null_pos] & column->null_bit) ||
                          (new_row->empty_bits[column->empty_pos] &
                           column->empty_bit));

    old_column_pos= oldrec + column->offset;
    new_column_pos= newrec + column->offset;
    old_column_length= new_column_length= column->length;

    switch (column->type) {
    case FIELD_CHECK:
    case FIELD_NORMAL:                          /* Fixed length field */
    case FIELD_ZERO:
    case FIELD_SKIP_PRESPACE:                   /* Not packed */
    case FIELD_SKIP_ZERO:                       /* Fixed length field */
      break;
    case FIELD_VARCHAR:
      new_column_length--;                      /* Skip length prefix */
      old_column_pos+= column->fill_length;
      new_column_pos+= column->fill_length;
      /* Fall through */
    case FIELD_SKIP_ENDSPACE:                   /* CHAR */
    {
      /* Real (stripped) lengths come from the rows' length arrays */
      if (new_column_length <= 255)
      {
        old_column_length= *old_field_lengths++;
        if (!new_column_is_empty)
          new_column_length= *new_field_lengths++;
      }
      else
      {
        old_column_length= uint2korr(old_field_lengths);
        old_field_lengths+= 2;
        if (!new_column_is_empty)
        {
          new_column_length= uint2korr(new_field_lengths);
          new_field_lengths+= 2;
        }
      }
      break;
    }
    case FIELD_BLOB:
    {
      uint size_length= column->length - portable_sizeof_char_ptr;
      old_column_length= _ma_calc_blob_length(size_length, old_column_pos);
      memcpy((void*) &old_column_pos, oldrec + column->offset + size_length,
             sizeof(old_column_pos));
      if (!new_column_is_empty)
      {
        new_column_length= _ma_calc_blob_length(size_length, new_column_pos);
        /* sizeof(old_column_pos) == sizeof(new_column_pos): both pointers */
        memcpy((void*) &new_column_pos, newrec + column->offset + size_length,
               sizeof(old_column_pos));
      }
      break;
    }
    default:
      DBUG_ASSERT(0);
    }

    if (new_column_is_empty || new_column_length != old_column_length ||
        memcmp(old_column_pos, new_column_pos, new_column_length))
    {
      field_data= ma_store_length(field_data,
                                  (ulong) (column - share->columndef));
      field_data= ma_store_length(field_data, (ulong) old_column_length);

      log_parts->str= old_column_pos;
      log_parts->length= old_column_length;
      row_length+= old_column_length;
      log_parts++;
    }
  }

  *log_parts_count= (uint) (log_parts - start_log_parts);

  /* Store length of field length data before the field/field_lengths */
  field_lengths= (uint) (field_data - start_field_data);
  length_str= start_field_data - ma_calc_length_for_store_length(field_lengths);
  start_log_parts->str= length_str;
  ma_store_length(length_str, field_lengths);
  start_log_parts->length= (size_t) (field_data - start_log_parts->str);
  row_length+= start_log_parts->length;
  DBUG_RETURN(row_length);
}
6058
6059/***************************************************************************
6060 In-write hooks called under log's lock when log record is written
6061***************************************************************************/
6062
6063/**
6064 @brief Sets transaction's rec_lsn if needed
6065
6066 A transaction sometimes writes a REDO even before the page is in the
6067 pagecache (example: brand new head or tail pages; full pages). So, if
6068 Checkpoint happens just after the REDO write, it needs to know that the
6069 REDO phase must start before this REDO. Scanning the pagecache cannot
6070 tell that as the page is not in the cache. So, transaction sets its rec_lsn
6071 to the REDO's LSN or somewhere before, and Checkpoint reads the
6072 transaction's rec_lsn.
6073
6074 @return Operation status, always 0 (success)
6075*/
6076
6077my_bool write_hook_for_redo(enum translog_record_type type
6078 __attribute__ ((unused)),
6079 TRN *trn, MARIA_HA *tbl_info
6080 __attribute__ ((unused)),
6081 LSN *lsn, void *hook_arg
6082 __attribute__ ((unused)))
6083{
6084 /*
6085 Users of dummy_transaction_object must keep this TRN clean as it
6086 is used by many threads (like those manipulating non-transactional
6087 tables). It might be dangerous if one user sets rec_lsn or some other
6088 member and it is picked up by another user (like putting this rec_lsn into
6089 a page of a non-transactional table); it's safer if all members stay 0. So
6090 non-transactional log records (REPAIR, CREATE, RENAME, DROP) should not
6091 call this hook; we trust them but verify ;)
6092 */
6093 DBUG_ASSERT(trn->trid != 0);
6094 /*
6095 If the hook stays so simple, it would be faster to pass
6096 !trn->rec_lsn ? trn->rec_lsn : some_dummy_lsn
6097 to translog_write_record(), like Monty did in his original code, and not
6098 have a hook. For now we keep it like this.
6099 */
6100 if (trn->rec_lsn == 0)
6101 trn->rec_lsn= *lsn;
6102 return 0;
6103}
6104
6105
6106/**
6107 @brief Sets transaction's undo_lsn, first_undo_lsn if needed
6108
6109 @return Operation status, always 0 (success)
6110*/
6111
6112my_bool write_hook_for_undo(enum translog_record_type type
6113 __attribute__ ((unused)),
6114 TRN *trn, MARIA_HA *tbl_info
6115 __attribute__ ((unused)),
6116 LSN *lsn, void *hook_arg
6117 __attribute__ ((unused)))
6118{
6119 DBUG_ASSERT(trn->trid != 0);
6120 trn->undo_lsn= *lsn;
6121 if (unlikely(LSN_WITH_FLAGS_TO_LSN(trn->first_undo_lsn) == 0))
6122 trn->first_undo_lsn=
6123 trn->undo_lsn | LSN_WITH_FLAGS_TO_FLAGS(trn->first_undo_lsn);
6124 return 0;
6125 /*
6126 when we implement purging, we will specialize this hook: UNDO_PURGE
6127 records will additionally set trn->undo_purge_lsn
6128 */
6129}
6130
6131
6132/**
6133 @brief Sets the table's records count and checksum and others to 0, then
6134 calls the generic REDO hook.
6135
6136 @return Operation status, always 0 (success)
6137*/
6138
6139my_bool write_hook_for_redo_delete_all(enum translog_record_type type
6140 __attribute__ ((unused)),
6141 TRN *trn, MARIA_HA *tbl_info
6142 __attribute__ ((unused)),
6143 LSN *lsn, void *hook_arg)
6144{
6145 _ma_reset_status(tbl_info);
6146 return write_hook_for_redo(type, trn, tbl_info, lsn, hook_arg);
6147}
6148
6149
6150/**
6151 @brief Updates "records" and "checksum" and calls the generic UNDO hook
6152
6153 @return Operation status, always 0 (success)
6154*/
6155
6156my_bool write_hook_for_undo_row_insert(enum translog_record_type type
6157 __attribute__ ((unused)),
6158 TRN *trn, MARIA_HA *tbl_info,
6159 LSN *lsn, void *hook_arg)
6160{
6161 MARIA_SHARE *share= tbl_info->s;
6162 share->state.state.records++;
6163 share->state.state.checksum+= *(ha_checksum *)hook_arg;
6164 return write_hook_for_undo(type, trn, tbl_info, lsn, hook_arg);
6165}
6166
6167
/**
  @brief Updates "records" and "checksum" and calls the generic UNDO hook

  @return Operation status, always 0 (success)
*/
6173
6174my_bool write_hook_for_undo_row_delete(enum translog_record_type type
6175 __attribute__ ((unused)),
6176 TRN *trn, MARIA_HA *tbl_info,
6177 LSN *lsn, void *hook_arg)
6178{
6179 MARIA_SHARE *share= tbl_info->s;
6180 share->state.state.records--;
6181 share->state.state.checksum+= *(ha_checksum *)hook_arg;
6182 return write_hook_for_undo(type, trn, tbl_info, lsn, hook_arg);
6183}
6184
6185
/**
  @brief Updates "checksum" and calls the generic UNDO hook

  @return Operation status, always 0 (success)
*/
6191
6192my_bool write_hook_for_undo_row_update(enum translog_record_type type
6193 __attribute__ ((unused)),
6194 TRN *trn, MARIA_HA *tbl_info,
6195 LSN *lsn, void *hook_arg)
6196{
6197 MARIA_SHARE *share= tbl_info->s;
6198 share->state.state.checksum+= *(ha_checksum *)hook_arg;
6199 return write_hook_for_undo(type, trn, tbl_info, lsn, hook_arg);
6200}
6201
6202
6203my_bool write_hook_for_undo_bulk_insert(enum translog_record_type type
6204 __attribute__ ((unused)),
6205 TRN *trn, MARIA_HA *tbl_info,
6206 LSN *lsn, void *hook_arg)
6207{
6208 /*
6209 We are going to call maria_delete_all_rows(), but without logging and
6210 syncing, as an optimization (if we crash before commit, the UNDO will
6211 empty; if we crash after commit, we have flushed and forced the files).
6212 Status still needs to be reset under log mutex, in case of a concurrent
6213 checkpoint.
6214 */
6215 _ma_reset_status(tbl_info);
6216 return write_hook_for_undo(type, trn, tbl_info, lsn, hook_arg);
6217}
6218
6219
6220/**
6221 @brief Updates table's lsn_of_file_id.
6222
6223 @return Operation status, always 0 (success)
6224*/
6225
6226my_bool write_hook_for_file_id(enum translog_record_type type
6227 __attribute__ ((unused)),
6228 TRN *trn
6229 __attribute__ ((unused)),
6230 MARIA_HA *tbl_info,
6231 LSN *lsn,
6232 void *hook_arg
6233 __attribute__ ((unused)))
6234{
6235 DBUG_ASSERT(cmp_translog_addr(tbl_info->s->lsn_of_file_id, *lsn) < 0);
6236 tbl_info->s->lsn_of_file_id= *lsn;
6237 return 0;
6238}
6239
6240
6241/**
6242 Updates transaction's rec_lsn when committing.
6243
6244 A transaction writes its commit record before being committed in trnman, so
6245 if Checkpoint happens just between the COMMIT record log write and the
6246 commit in trnman, it will record that transaction is not committed. Assume
6247 the transaction (trn1) did an INSERT; after the checkpoint, a second
6248 transaction (trn2) does a DELETE of what trn1 has inserted. Then crash,
6249 Checkpoint record says that trn1 was not committed, and REDO phase starts
6250 from Checkpoint record's LSN. So it will not find the COMMIT record of
6251 trn1, will want to roll back trn1, which will fail because the row/key
6252 which it wants to delete does not exist anymore.
6253 To avoid this, Checkpoint needs to know that the REDO phase must start
6254 before this COMMIT record, so transaction sets its rec_lsn to the COMMIT's
6255 record LSN, and as Checkpoint reads the transaction's rec_lsn, Checkpoint
6256 will know.
6257
6258 @note so after commit trn->rec_lsn is a "commit LSN", which could be of
6259 use later.
6260
6261 @return Operation status, always 0 (success)
6262*/
6263
6264my_bool write_hook_for_commit(enum translog_record_type type
6265 __attribute__ ((unused)),
6266 TRN *trn,
6267 MARIA_HA *tbl_info
6268 __attribute__ ((unused)),
6269 LSN *lsn,
6270 void *hook_arg
6271 __attribute__ ((unused)))
6272{
6273 trn->rec_lsn= *lsn;
6274 return 0;
6275}
6276
6277
6278/***************************************************************************
6279 Applying of REDO log records
6280***************************************************************************/
6281
6282/*
6283 Apply changes to head and tail pages
6284
6285 SYNOPSIS
6286 _ma_apply_redo_insert_row_head_or_tail()
6287 info Maria handler
6288 lsn LSN to put on page
6289 page_type HEAD_PAGE or TAIL_PAGE
6290 new_page True if this is first entry on page
6291 header Header (without FILEID)
6292 data Data to be put on page
6293 data_length Length of data
6294
6295 NOTE
6296 Handles LOGREC_REDO_INSERT_ROW_HEAD, LOGREC_REDO_INSERT_ROW_TAIL
6297 LOGREC_REDO_NEW_ROW_HEAD and LOGREC_REDO_NEW_ROW_TAIL
6298
6299 RETURN
6300 0 ok
6301 # Error number
6302*/
6303
uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn,
                                            uint page_type,
                                            my_bool new_page,
                                            const uchar *header,
                                            const uchar *data,
                                            size_t data_length)
{
  MARIA_SHARE *share= info->s;
  pgcache_page_no_t page;
  uint rownr, empty_space;
  uint block_size= share->block_size;
  uint rec_offset;
  uchar *buff, *dir;
  uint result;
  MARIA_PINNED_PAGE page_link;
  enum pagecache_page_lock lock_method;
  enum pagecache_page_pin pin_method;
  my_off_t end_of_page;
  uint error;
  DBUG_ENTER("_ma_apply_redo_insert_row_head_or_tail");

  /* Decode target page and directory entry number from the REDO header */
  page= page_korr(header);
  rownr= dirpos_korr(header + PAGE_STORE_SIZE);

  DBUG_PRINT("enter", ("rowid: %lu page: %lu rownr: %u data_length: %u",
                       (ulong) ma_recordpos(page, rownr),
                       (ulong) page, rownr, (uint) data_length));

  /* Applying a REDO means the on-disk state is no longer pristine */
  share->state.changed|= (STATE_CHANGED | STATE_NOT_ZEROFILLED |
                          STATE_NOT_MOVABLE);

  end_of_page= (page + 1) * share->block_size;
  if (end_of_page > share->state.state.data_file_length)
  {
    DBUG_PRINT("info", ("Enlarging data file from %lu to %lu",
                        (ulong) share->state.state.data_file_length,
                        (ulong) end_of_page));
    /*
      New page at end of file. Note that the test above is also positive if
      data_file_length is not a multiple of block_size (system crashed while
      writing the last page): in this case we just extend the last page and
      fill it entirely with zeroes, then the REDO will put correct data on
      it.
    */
    lock_method= PAGECACHE_LOCK_WRITE;
    pin_method= PAGECACHE_PIN;

    /* A page past EOF can only hold the first entry of a brand-new page */
    DBUG_ASSERT(rownr == 0 && new_page);
    if (rownr != 0 || !new_page)
      goto crashed_file;

    buff= info->keyread_buff;
    info->keyread_buff_used= 1;
    make_empty_page(info, buff, page_type, 1);
    empty_space= (block_size - PAGE_OVERHEAD_SIZE(share));
    rec_offset= PAGE_HEADER_SIZE(share);
    dir= buff+ block_size - PAGE_SUFFIX_SIZE - DIR_ENTRY_SIZE;
  }
  else
  {
    lock_method= PAGECACHE_LOCK_LEFT_WRITELOCKED;
    pin_method= PAGECACHE_PIN_LEFT_PINNED;

    /* Suppress warnings: read failures below may be expected in recovery */
    share->pagecache->readwrite_flags&= ~MY_WME;
    buff= pagecache_read(share->pagecache, &info->dfile,
                         page, 0, 0,
                         PAGECACHE_PLAIN_PAGE, PAGECACHE_LOCK_WRITE,
                         &page_link.link);
    share->pagecache->readwrite_flags= share->pagecache->org_readwrite_flags;
    if (!buff)
    {
      /* Skip errors when reading outside of file and uninitialized pages */
      if (!new_page || (my_errno != HA_ERR_FILE_TOO_SHORT &&
                        my_errno != HA_ERR_WRONG_CRC))
      {
        DBUG_PRINT("error", ("Error %d when reading page", (int) my_errno));
        goto err;
      }
      /* Create new page */
      buff= pagecache_block_link_to_buffer(page_link.link);
      buff[PAGE_TYPE_OFFSET]= UNALLOCATED_PAGE;
    }
    else if (lsn_korr(buff) >= lsn) /* Test if already applied */
    {
      /* Fix bitmap, just in case */
      empty_space= uint2korr(buff + EMPTY_SPACE_OFFSET);
      if (!enough_free_entries_on_page(share, buff))
        empty_space= 0; /* Page is full */

      if (_ma_bitmap_set(info, page, page_type == HEAD_PAGE, empty_space))
        goto err;
      pagecache_unlock_by_link(share->pagecache, page_link.link,
                               PAGECACHE_LOCK_WRITE_UNLOCK,
                               PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
                               LSN_IMPOSSIBLE, 0, FALSE);
      DBUG_RETURN(0);
    }

    if (((uint) (buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) != page_type))
    {
      /*
        This is a page that has been freed before and now should be
        changed to new type.
      */
      if (!new_page)
      {
        DBUG_PRINT("error",
                   ("Found page of wrong type: %u, should have been %u",
                    (uint) (buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK),
                    page_type));
        goto crashed_file;
      }
      make_empty_page(info, buff, page_type, 0);
      empty_space= block_size - PAGE_HEADER_SIZE(share) - PAGE_SUFFIX_SIZE;
      /* Grow the directory so that entry 'rownr' exists */
      (void) extend_directory(info, buff, block_size, 0, rownr, &empty_space,
                              page_type == HEAD_PAGE);
      rec_offset= PAGE_HEADER_SIZE(share);
      dir= dir_entry_pos(buff, block_size, rownr);
      empty_space+= uint2korr(dir+2);
    }
    else
    {
      uint max_entry= (uint) buff[DIR_COUNT_OFFSET];
      uint length;

      DBUG_ASSERT(!new_page);
      dir= dir_entry_pos(buff, block_size, rownr);
      empty_space= uint2korr(buff + EMPTY_SPACE_OFFSET);

      if (max_entry <= rownr)
      {
        /* Add directory entry first in directory and data last on page */
        if (extend_directory(info, buff, block_size, max_entry, rownr,
                             &empty_space, page_type == HEAD_PAGE))
          goto crashed_file;
      }
      /* Make room for data_length bytes at the row's position on page */
      if (extend_area_on_page(info, buff, dir, rownr,
                              (uint) data_length, &empty_space,
                              &rec_offset, &length, page_type == HEAD_PAGE))
        goto crashed_file;
    }
  }
  /* Copy data */
  int2store(dir+2, data_length);
  memcpy(buff + rec_offset, data, data_length);
  empty_space-= (uint) data_length;
  int2store(buff + EMPTY_SPACE_OFFSET, empty_space);

  /* Fix bitmap */
  if (!enough_free_entries_on_page(share, buff))
    empty_space= 0; /* Page is full */
  if (_ma_bitmap_set(info, page, page_type == HEAD_PAGE, empty_space))
    goto err;

  /*
    If page was not read before, write it but keep it pinned.
    We don't update its LSN. When we have processed all REDOs for this page
    in the current REDO's group, we will stamp page with UNDO's LSN
    (if we stamped it now, a next REDO, in
    this group, for this page, would be skipped) and unpin then.
  */
  result= 0;
  if (lock_method == PAGECACHE_LOCK_WRITE &&
      pagecache_write(share->pagecache,
                      &info->dfile, page, 0,
                      buff, PAGECACHE_PLAIN_PAGE,
                      lock_method, pin_method,
                      PAGECACHE_WRITE_DELAY, &page_link.link,
                      LSN_IMPOSSIBLE))
    result= my_errno;

  page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
  page_link.changed= 1;
  push_dynamic(&info->pinned_pages, (void*) &page_link);

  /*
    Data page and bitmap page are in place, we can update data_file_length in
    case we extended the file. We could not do it earlier: bitmap code tests
    data_file_length to know if it has to create a new page or not.
  */
  set_if_bigger(share->state.state.data_file_length, end_of_page);
  DBUG_RETURN(result);

crashed_file:
  _ma_set_fatal_error(share, HA_ERR_WRONG_IN_RECORD);
err:
  error= my_errno;
  /* Only the read-existing-page path holds a page lock to release here */
  if (lock_method == PAGECACHE_LOCK_LEFT_WRITELOCKED)
    pagecache_unlock_by_link(share->pagecache, page_link.link,
                             PAGECACHE_LOCK_WRITE_UNLOCK,
                             PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
                             LSN_IMPOSSIBLE, 0, FALSE);
  _ma_mark_file_crashed(share);
  DBUG_ASSERT(!maria_assert_if_crashed_table); /* catch recovery error early */
  DBUG_RETURN((my_errno= error));
}
6500
6501
6502/*
6503 Apply LOGREC_REDO_PURGE_ROW_HEAD & LOGREC_REDO_PURGE_ROW_TAIL
6504
6505 SYNOPSIS
6506 _ma_apply_redo_purge_row_head_or_tail()
6507 info Maria handler
6508 lsn LSN to put on page
6509 page_type HEAD_PAGE or TAIL_PAGE
6510 header Header (without FILEID)
6511
6512 NOTES
6513 This function is very similar to delete_head_or_tail()
6514
6515 RETURN
6516 0 ok
6517 # Error number
6518*/
6519
uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn,
                                           uint page_type,
                                           const uchar *header)
{
  MARIA_SHARE *share= info->s;
  pgcache_page_no_t page;
  uint rownr, empty_space;
  uchar *buff;
  int result;
  uint error;
  MARIA_PINNED_PAGE page_link;
  DBUG_ENTER("_ma_apply_redo_purge_row_head_or_tail");

  /* Decode target page and directory entry number from the REDO header */
  page= page_korr(header);
  rownr= dirpos_korr(header+PAGE_STORE_SIZE);
  DBUG_PRINT("enter", ("rowid: %lu page: %lu rownr: %u",
                       (ulong) ma_recordpos(page, rownr),
                       (ulong) page, rownr));

  /* Applying a REDO means the on-disk state is no longer pristine */
  share->state.changed|= (STATE_CHANGED | STATE_NOT_ZEROFILLED |
                          STATE_NOT_MOVABLE);

  if (!(buff= pagecache_read(share->pagecache, &info->dfile,
                             page, 0, 0,
                             PAGECACHE_PLAIN_PAGE, PAGECACHE_LOCK_WRITE,
                             &page_link.link)))
    goto err;

  /* Page LSN >= record LSN means this change already reached the page */
  if (lsn_korr(buff) >= lsn)
  {
    /*
      Already applied
      Note that in case the page is not anymore a head or tail page
      a future redo will fix the bitmap.
    */
    if ((uint) (buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) == page_type)
    {
      empty_space= uint2korr(buff+EMPTY_SPACE_OFFSET);
      if (!enough_free_entries_on_page(share, buff))
        empty_space= 0; /* Page is full */
      if (_ma_bitmap_set(info, page, page_type == HEAD_PAGE,
                         empty_space))
        goto err;
    }
    pagecache_unlock_by_link(share->pagecache, page_link.link,
                             PAGECACHE_LOCK_WRITE_UNLOCK,
                             PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
                             LSN_IMPOSSIBLE, 0, FALSE);
    DBUG_RETURN(0);
  }

  DBUG_ASSERT((buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) == (uchar) page_type);

  /* Remove the row's directory entry; delete_dir_entry updates empty_space */
  if (delete_dir_entry(share, buff, rownr, &empty_space) < 0)
  {
    _ma_set_fatal_error(share, HA_ERR_WRONG_IN_RECORD);
    goto err;
  }

  /* Keep the page pinned; it is unpinned when the whole group completes */
  page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
  page_link.changed= 1;
  push_dynamic(&info->pinned_pages, (void*) &page_link);

  result= 0;
  if (!enough_free_entries_on_page(share, buff))
    empty_space= 0; /* Page is full */
  /* This will work even if the page was marked as UNALLOCATED_PAGE */
  if (_ma_bitmap_set(info, page, page_type == HEAD_PAGE, empty_space))
    result= my_errno;

  DBUG_RETURN(result);

err:
  error= my_errno;
  pagecache_unlock_by_link(share->pagecache, page_link.link,
                           PAGECACHE_LOCK_WRITE_UNLOCK,
                           PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
                           LSN_IMPOSSIBLE, 0, FALSE);
  _ma_mark_file_crashed(share);
  DBUG_ASSERT(!maria_assert_if_crashed_table);
  DBUG_RETURN((my_errno= error));

}
6603
6604
6605/**
6606 @brief Apply LOGREC_REDO_FREE_BLOCKS
6607
6608 @param info Maria handler
6609 @param header Header (without FILEID)
6610
6611 Mark the pages free in the bitmap.
6612
6613 We have to check against _ma_redo_not_needed_for_page()
6614 to guard against the case where we first clear a block and after
6615 that insert new data into the blocks. If we would unconditionally
6616 clear the bitmap here, future changes would be ignored for the page
6617 if it's not in the dirty list (ie, it would be flushed).
6618
6619 @return Operation status
6620 @retval 0 OK
6621 @retval 1 Error
6622*/
6623
uint _ma_apply_redo_free_blocks(MARIA_HA *info,
                                LSN lsn __attribute__((unused)),
                                LSN redo_lsn,
                                const uchar *header)
{
  MARIA_SHARE *share= info->s;
  uint ranges;
  uint16 sid;
  DBUG_ENTER("_ma_apply_redo_free_blocks");

  /* Applying a REDO means the on-disk state is no longer pristine */
  share->state.changed|= (STATE_CHANGED | STATE_NOT_ZEROFILLED |
                          STATE_NOT_MOVABLE);

  /* Header layout: FILEID, number of page ranges, then the ranges */
  sid= fileid_korr(header);
  header+= FILEID_STORE_SIZE;
  ranges= pagerange_korr(header);
  header+= PAGERANGE_STORE_SIZE;
  DBUG_ASSERT(ranges > 0);

  /** @todo leave bitmap lock to the bitmap code... */
  mysql_mutex_lock(&share->bitmap.bitmap_lock);
  while (ranges--)
  {
    my_bool res;
    uint page_range;
    pgcache_page_no_t page, start_page;

    start_page= page= page_korr(header);
    header+= PAGE_STORE_SIZE;
    /* Page range may have this bit set to indicate a tail page */
    page_range= pagerange_korr(header) & ~(TAIL_BIT | START_EXTENT_BIT);
    DBUG_ASSERT(page_range > 0);

    header+= PAGERANGE_STORE_SIZE;

    DBUG_PRINT("info", ("page: %lu pages: %u", (long) page, page_range));

    for ( ; page_range-- ; start_page++)
    {
      /*
        Skip pages whose final state is covered by later log records;
        clearing the bitmap here could otherwise lose newer changes
        (see function comment above).
      */
      if (_ma_redo_not_needed_for_page(sid, redo_lsn, start_page, FALSE))
        continue;
      res= _ma_bitmap_reset_full_page_bits(info, &share->bitmap, start_page,
                                           1);
      if (res)
      {
        /* Release the lock before marking the file crashed and bailing out */
        mysql_mutex_unlock(&share->bitmap.bitmap_lock);
        _ma_mark_file_crashed(share);
        DBUG_ASSERT(!maria_assert_if_crashed_table);
        DBUG_RETURN(res);
      }
    }
  }
  mysql_mutex_unlock(&share->bitmap.bitmap_lock);
  DBUG_RETURN(0);
}
6679
6680
6681/**
6682 @brief Apply LOGREC_REDO_FREE_HEAD_OR_TAIL
6683
6684 @param info Maria handler
6685 @param header Header (without FILEID)
6686
6687 @note It marks the page free in the bitmap, and sets the directory's count
6688 to 0.
6689
6690 @return Operation status
6691 @retval 0 OK
6692 @retval 1 Error
6693*/
6694
uint _ma_apply_redo_free_head_or_tail(MARIA_HA *info, LSN lsn,
                                      const uchar *header)
{
  MARIA_SHARE *share= info->s;
  uchar *buff;
  pgcache_page_no_t page;
  MARIA_PINNED_PAGE page_link;
  my_bool res;
  DBUG_ENTER("_ma_apply_redo_free_head_or_tail");

  /* Applying a REDO means the on-disk state is no longer pristine */
  share->state.changed|= (STATE_CHANGED | STATE_NOT_ZEROFILLED |
                          STATE_NOT_MOVABLE);

  page= page_korr(header);

  if (!(buff= pagecache_read(share->pagecache,
                             &info->dfile,
                             page, 0, 0,
                             PAGECACHE_PLAIN_PAGE,
                             PAGECACHE_LOCK_WRITE, &page_link.link)))
  {
    /*
      NOTE(review): page_link.link is passed to the unlock even though the
      read failed; presumably pagecache_read() still fills in the link on
      failure -- confirm against the pagecache API.
    */
    pagecache_unlock_by_link(share->pagecache, page_link.link,
                             PAGECACHE_LOCK_WRITE_UNLOCK,
                             PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
                             LSN_IMPOSSIBLE, 0, FALSE);
    goto err;
  }
  /* Page LSN >= record LSN means this change already reached the page */
  if (lsn_korr(buff) >= lsn)
  {
    /* Already applied */
    pagecache_unlock_by_link(share->pagecache, page_link.link,
                             PAGECACHE_LOCK_WRITE_UNLOCK,
                             PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
                             LSN_IMPOSSIBLE, 0, FALSE);
  }
  else
  {
    /* Mark the page unused; the bitmap update below frees it for reuse */
    buff[PAGE_TYPE_OFFSET]= UNALLOCATED_PAGE;
#ifdef IDENTICAL_PAGES_AFTER_RECOVERY
    {
      /* Also wipe the row directory so recovered pages compare equal */
      uint number_of_records= (uint) buff[DIR_COUNT_OFFSET];
      uchar *dir= dir_entry_pos(buff, share->block_size,
                                number_of_records-1);
      buff[DIR_FREE_OFFSET]= END_OF_DIR_FREE_LIST;
      bzero(dir, number_of_records * DIR_ENTRY_SIZE);
    }
#endif

    /* Keep the page pinned; it is unpinned when the redo group completes */
    page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
    page_link.changed= 1;
    push_dynamic(&info->pinned_pages, (void*) &page_link);
  }
  /** @todo leave bitmap lock to the bitmap code... */
  mysql_mutex_lock(&share->bitmap.bitmap_lock);
  res= _ma_bitmap_reset_full_page_bits(info, &share->bitmap, page, 1);
  mysql_mutex_unlock(&share->bitmap.bitmap_lock);
  if (res)
    goto err;
  DBUG_RETURN(0);

err:
  _ma_mark_file_crashed(share);
  DBUG_ASSERT(!maria_assert_if_crashed_table);
  DBUG_RETURN(1);
}
6760
6761
6762/**
6763 @brief Apply LOGREC_REDO_INSERT_ROW_BLOBS
6764
6765 @param info Maria handler
  @param lsn            LSN to put on pages
6767 @param header Header (with FILEID)
6768 @param redo_lsn REDO record's LSN
6769 @param[out] number_of_blobs Number of blobs found in log record
6770 @param[out] number_of_ranges Number of ranges found
6771 @param[out] first_page First page touched
6772 @param[out] last_page Last page touched
6773
6774 @note Write full pages (full head & blob pages)
6775
6776 @return Operation status
6777 @retval 0 OK
6778 @retval !=0 Error
6779*/
6780
uint _ma_apply_redo_insert_row_blobs(MARIA_HA *info,
                                     LSN lsn, const uchar *header,
                                     LSN redo_lsn,
                                     uint * const number_of_blobs,
                                     uint * const number_of_ranges,
                                     pgcache_page_no_t * const first_page,
                                     pgcache_page_no_t * const last_page)
{
  MARIA_SHARE *share= info->s;
  const uchar *data;
  uint data_size= FULL_PAGE_SIZE(share);
  uint blob_count, ranges;
  uint16 sid;
  pgcache_page_no_t first_page2= ULONGLONG_MAX, last_page2= 0;
  DBUG_ENTER("_ma_apply_redo_insert_row_blobs");

  /* Applying a REDO means the on-disk state is no longer pristine */
  share->state.changed|= (STATE_CHANGED | STATE_NOT_ZEROFILLED |
                          STATE_NOT_MOVABLE);

  /* Header layout: FILEID, total page-range count, blob count */
  sid= fileid_korr(header);
  header+= FILEID_STORE_SIZE;
  *number_of_ranges= ranges= pagerange_korr(header);
  header+= PAGERANGE_STORE_SIZE;
  *number_of_blobs= blob_count= pagerange_korr(header);
  header+= PAGERANGE_STORE_SIZE;
  DBUG_ASSERT(ranges >= blob_count);

  /* Page payload follows all extent and per-blob sub-range descriptors */
  data= (header + ranges * ROW_EXTENT_SIZE +
         blob_count * (SUB_RANGE_SIZE + BLOCK_FILLER_SIZE));

  while (blob_count--)
  {
    uint sub_ranges, empty_space;

    /* Per blob: number of sub-ranges and unused bytes on its last page */
    sub_ranges= uint2korr(header);
    header+= SUB_RANGE_SIZE;
    empty_space= uint2korr(header);
    header+= BLOCK_FILLER_SIZE;
    DBUG_ASSERT(sub_ranges <= ranges && empty_space < data_size);
    ranges-= sub_ranges;

    while (sub_ranges--)
    {
      uint i;
      uint res;
      uint page_range;
      pgcache_page_no_t page;
      uchar *buff;
      uint data_on_page= data_size;

      /* Per sub-range: first page number and number of consecutive pages */
      page= page_korr(header);
      header+= PAGE_STORE_SIZE;
      page_range= pagerange_korr(header);
      header+= PAGERANGE_STORE_SIZE;

      for (i= page_range; i-- > 0 ; page++, data+= data_on_page)
      {
        MARIA_PINNED_PAGE page_link;
        enum pagecache_page_lock unlock_method;
        enum pagecache_page_pin unpin_method;

        /* Track touched page span for the caller */
        set_if_smaller(first_page2, page);
        set_if_bigger(last_page2, page);
        if (i == 0 && sub_ranges == 0)
          data_on_page= data_size - empty_space; /* data on last page */
        /* Skip pages whose final state is covered by later log records */
        if (_ma_redo_not_needed_for_page(sid, redo_lsn, page, FALSE))
          continue;

        if (((page + 1) * share->block_size) >
            share->state.state.data_file_length)
        {
          /* New page or half written page at end of file */
          DBUG_PRINT("info", ("Enlarging data file from %lu to %lu",
                              (ulong) share->state.state.data_file_length,
                              (ulong) ((page + 1 ) * share->block_size)));
          share->state.state.data_file_length= (page + 1) * share->block_size;
          buff= info->keyread_buff;
          info->keyread_buff_used= 1;
          make_empty_page(info, buff, BLOB_PAGE, 0);
          unlock_method= PAGECACHE_LOCK_LEFT_UNLOCKED;
          unpin_method= PAGECACHE_PIN_LEFT_UNPINNED;
        }
        else
        {
          /* Read existing page; read failures may be expected in recovery */
          share->pagecache->readwrite_flags&= ~MY_WME;
          buff= pagecache_read(share->pagecache,
                               &info->dfile,
                               page, 0, 0,
                               PAGECACHE_PLAIN_PAGE,
                               PAGECACHE_LOCK_WRITE, &page_link.link);
          share->pagecache->readwrite_flags= share->pagecache->
            org_readwrite_flags;
          if (!buff)
          {
            if (my_errno != HA_ERR_FILE_TOO_SHORT &&
                my_errno != HA_ERR_WRONG_CRC)
            {
              /* If not read outside of file */
              pagecache_unlock_by_link(share->pagecache, page_link.link,
                                       PAGECACHE_LOCK_WRITE_UNLOCK,
                                       PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
                                       LSN_IMPOSSIBLE, 0, FALSE);
              goto err;
            }
            /*
              Physical file was too short, create new page. It can be that
              recovery started with a file with N pages, wrote page N+2 into
              pagecache (increased data_file_length but not physical file
              length), now reads page N+1: the read fails.
            */
            buff= pagecache_block_link_to_buffer(page_link.link);
            make_empty_page(info, buff, BLOB_PAGE, 0);
          }
          else
          {
#ifdef DBUG_ASSERT_EXISTS
            uchar found_page_type= (buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK);
#endif
            /* Page LSN >= record LSN: change already on page */
            if (lsn_korr(buff) >= lsn)
            {
              /* Already applied */
              DBUG_PRINT("info", ("already applied %llu >= %llu",
                                  lsn_korr(buff), lsn));
              pagecache_unlock_by_link(share->pagecache, page_link.link,
                                       PAGECACHE_LOCK_WRITE_UNLOCK,
                                       PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
                                       LSN_IMPOSSIBLE, 0, FALSE);
              goto fix_bitmap;
            }
            DBUG_ASSERT((found_page_type == (uchar) BLOB_PAGE) ||
                        (found_page_type == (uchar) UNALLOCATED_PAGE));
          }
          unlock_method= PAGECACHE_LOCK_WRITE_UNLOCK;
          unpin_method= PAGECACHE_UNPIN;
        }

        /*
          Blob pages are never updated twice in same redo-undo chain, so
          it's safe to update lsn for them here
        */
        lsn_store(buff, lsn);
        buff[PAGE_TYPE_OFFSET]= BLOB_PAGE;
        /* Clear the rest of the full-page header after LSN and page type */
        bzero(buff + LSN_SIZE + PAGE_TYPE_SIZE,
              FULL_PAGE_HEADER_SIZE(share) - (LSN_SIZE + PAGE_TYPE_SIZE));

        if (data_on_page != data_size)
        {
          /*
            Last page may be only partly filled. We zero the rest, like
            write_full_pages() does.
          */
          bzero(buff + share->block_size - PAGE_SUFFIX_SIZE - empty_space,
                empty_space);
        }
        memcpy(buff + FULL_PAGE_HEADER_SIZE(share), data, data_on_page);
        if (pagecache_write(share->pagecache,
                            &info->dfile, page, 0,
                            buff, PAGECACHE_PLAIN_PAGE,
                            unlock_method, unpin_method,
                            PAGECACHE_WRITE_DELAY, 0, LSN_IMPOSSIBLE))
          goto err;

    fix_bitmap:
      /** @todo leave bitmap lock to the bitmap code... */
        mysql_mutex_lock(&share->bitmap.bitmap_lock);
        res= _ma_bitmap_set_full_page_bits(info, &share->bitmap, page,
                                           1);
        mysql_mutex_unlock(&share->bitmap.bitmap_lock);
        if (res)
          goto err;
      }
    }
  }
  *first_page= first_page2;
  *last_page= last_page2;
  DBUG_RETURN(0);

err:
  _ma_mark_file_crashed(share);
  DBUG_ASSERT(!maria_assert_if_crashed_table);
  DBUG_RETURN(1);
}
6963
6964
6965/****************************************************************************
6966 Applying of UNDO entries
6967****************************************************************************/
6968
6969/** Execute undo of a row insert (delete the inserted row) */
6970
my_bool _ma_apply_undo_row_insert(MARIA_HA *info, LSN undo_lsn,
                                  const uchar *header)
{
  pgcache_page_no_t page;
  uint rownr;
  uchar *buff;
  my_bool res;
  MARIA_PINNED_PAGE page_link;
  MARIA_SHARE *share= info->s;
  ha_checksum checksum;
  LSN lsn;
  DBUG_ENTER("_ma_apply_undo_row_insert");

  /* Decode the rowid (head page + directory position) from the header */
  page= page_korr(header);
  header+= PAGE_STORE_SIZE;
  rownr= dirpos_korr(header);
  header+= DIRPOS_STORE_SIZE;
  DBUG_PRINT("enter", ("rowid: %lu page: %lu rownr: %u",
                       (ulong) ma_recordpos(page, rownr),
                       (ulong) page, rownr));

  buff= pagecache_read(share->pagecache,
                       &info->dfile, page, 0,
                       0, share->page_type,
                       PAGECACHE_LOCK_WRITE,
                       &page_link.link);
  /* Register the page as pinned even on failure so cleanup can unpin it */
  page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
  page_link.changed= buff != 0;
  push_dynamic(&info->pinned_pages, (void*) &page_link);
  if (!buff)
    goto err;

  /* Collect the row's tail positions and extents into info->cur_row */
  if (read_row_extent_info(info, buff, rownr))
    goto err;

  /* Keep the bitmap non-flushable while data and bitmap pages change */
  _ma_bitmap_flushable(info, 1);
  if (delete_head_or_tail(info, page, rownr, 1, 1) ||
      delete_tails(info, info->cur_row.tail_positions))
    goto err;

  if (info->cur_row.extents_count && free_full_pages(info, &info->cur_row))
    goto err;

  /* The CLR carries the negated checksum delta of the removed row */
  checksum= 0;
  if (share->calc_checksum)
    checksum= (ha_checksum) 0 - ha_checksum_korr(header);
  info->last_auto_increment= ~ (ulonglong) 0;
  if (_ma_write_clr(info, undo_lsn, LOGREC_UNDO_ROW_INSERT,
                    share->calc_checksum != 0, checksum, &lsn, (void*) 0))
    goto err;

  res= 0;
end:
  /* The following is true only if _ma_bitmap_flushable() was called earlier */
  if (info->non_flushable_state)
    _ma_bitmap_flushable(info, -1);
  _ma_unpin_all_pages_and_finalize_row(info, lsn);
  DBUG_RETURN(res);

err:
  DBUG_ASSERT(!maria_assert_if_crashed_table);
  res= 1;
  _ma_mark_file_crashed(share);
  /*
    Don't write a new LSN on the used pages. Not important as the file is
    marked as crashed and need to be repaired before it can be used.
  */
  lsn= LSN_IMPOSSIBLE;
  goto end;
}
7041
7042
7043/** Execute undo of a row delete (insert the row back where it was) */
7044
/**
  @brief Undo a row delete: rebuild the row from the log record and insert
         it back at its original rowid.

  The log record header contains, in order: rowid (page + directory
  position), the length the row had on its head page, the extent count,
  an optional checksum, the extent list and finally the packed row data
  (null bits, empty bits, field lengths and the field values themselves).

  @param info           Maria handler
  @param undo_lsn       LSN of the UNDO_ROW_DELETE record being undone
  @param header         Data part of the log record (layout as above)
  @param header_length  Not used; the header describes its own layout

  @return 0 ok, 1 error (the table is marked crashed on error)
*/

my_bool _ma_apply_undo_row_delete(MARIA_HA *info, LSN undo_lsn,
                                  const uchar *header, size_t header_length
                                  __attribute__((unused)))
{
  MARIA_SHARE *share= info->s;
  MARIA_ROW row;
  MARIA_COLUMNDEF *column, *end_column;
  MARIA_BITMAP_BLOCKS *blocks;
  struct st_row_pos_info row_pos;
  uchar *record;
  const uchar *null_bits, *field_length_data, *extent_info;
  pgcache_page_no_t page;
  ulong *blob_lengths;
  uint *null_field_lengths, extent_count, rownr, length_on_head_page;
  DBUG_ENTER("_ma_apply_undo_row_delete");

  /*
    Use cur row as a base; We need to make a copy as we will change
    some buffers to point directly to 'header'
  */
  memcpy(&row, &info->cur_row, sizeof(row));

  /* Original position of the row: head page number + directory entry */
  page= page_korr(header);
  header+= PAGE_STORE_SIZE;
  rownr= dirpos_korr(header);
  header+= DIRPOS_STORE_SIZE;
  /* How much of the row was stored on the head page before the delete */
  length_on_head_page= uint2korr(header);
  header+= 2;
  /* Number of extents the row used (0 if it fitted on the head page) */
  extent_count= pagerange_korr(header);
  header+= PAGERANGE_STORE_SIZE;
  DBUG_PRINT("enter", ("rowid: %lu page: %lu rownr: %u",
                       (ulong) ma_recordpos(page, rownr),
                       (ulong) page, rownr));

  if (share->calc_checksum)
  {
    /*
      We extract the checksum delta here, saving a recomputation in
      allocate_and_write_block_record(). It's only an optimization.
    */
    row.checksum= (ha_checksum) 0 - ha_checksum_korr(header);
    header+= HA_CHECKSUM_STORE_SIZE;
  }
  /* Extent list follows; keep a pointer to it for later block mapping */
  extent_info= header;
  header+= extent_count * ROW_EXTENT_SIZE;

  null_field_lengths= row.null_field_lengths;
  blob_lengths= row.blob_lengths;

  /*
    Fill in info->cur_row with information about the row, like in
    calc_record_size(), to be used by write_block_record()
  */

  row.normal_length= row.char_length= row.varchar_length=
    row.blob_length= row.extents_count= row.field_lengths_length= 0;

  null_bits= header;
  header+= share->base.null_bytes;
  /* This will not be changed */
  row.empty_bits= (uchar*) header;
  header+= share->base.pack_bytes;
  if (share->base.max_field_lengths)
  {
    /* Length of the packed field-length data, followed by the data itself */
    row.field_lengths_length= uint2korr(header);
    row.field_lengths= (uchar*) header + 2 ;
    header+= 2 + row.field_lengths_length;
  }
  if (share->base.blobs)
    row.blob_length= ma_get_length(&header);

  /* We need to build up a record (without blobs) in rec_buff */
  if (!(record= my_malloc(share->base.reclength, MYF(MY_WME))))
    DBUG_RETURN(1);

  memcpy(record, null_bits, share->base.null_bytes);

  /* Copy field information from header to record */

  /* Handle constant length fields that are always present */
  for (column= share->columndef,
       end_column= column+ share->base.fixed_not_null_fields;
       column < end_column;
       column++)
  {
    memcpy(record + column->offset, header, column->length);
    header+= column->length;
  }

  /* Handle NULL fields and CHAR/VARCHAR fields */
  field_length_data= row.field_lengths;
  for (end_column= share->columndef + share->base.fields;
       column < end_column;
       column++, null_field_lengths++)
  {
    if ((record[column->null_pos] & column->null_bit) ||
        row.empty_bits[column->empty_pos] & column->empty_bit)
    {
      /* Field is NULL or empty; no data for it in the log record */
      if (column->type != FIELD_BLOB)
        *null_field_lengths= 0;
      else
        *blob_lengths++= 0;
      if (share->calc_checksum)
        bfill(record + column->offset, column->fill_length,
              column->type == FIELD_SKIP_ENDSPACE ? ' ' : 0);
      continue;
    }
    switch (column->type) {
    case FIELD_CHECK:
    case FIELD_NORMAL:                          /* Fixed length field */
    case FIELD_ZERO:
    case FIELD_SKIP_PRESPACE:                   /* Not packed */
    case FIELD_SKIP_ZERO:                       /* Fixed length field */
      row.normal_length+= column->length;
      *null_field_lengths= column->length;
      memcpy(record + column->offset, header, column->length);
      header+= column->length;
      break;
    case FIELD_SKIP_ENDSPACE:                   /* CHAR */
    {
      /* Stored length uses 1 or 2 bytes depending on max column length */
      uint length;
      if (column->length <= 255)
        length= (uint) *field_length_data++;
      else
      {
        length= uint2korr(field_length_data);
        field_length_data+= 2;
      }
      row.char_length+= length;
      *null_field_lengths= length;
      memcpy(record + column->offset, header, length);
      if (share->calc_checksum)
        bfill(record + column->offset + length, (column->length - length),
              ' ');
      header+= length;
      break;
    }
    case FIELD_VARCHAR:
    {
      uint length;
      uchar *field_pos= record + column->offset;

      /* 256 is correct as this includes the length uchar */
      if (column->fill_length == 1)
      {
        field_pos[0]= *field_length_data;
        length= (uint) *field_length_data;
      }
      else
      {
        field_pos[0]= field_length_data[0];
        field_pos[1]= field_length_data[1];
        length= uint2korr(field_length_data);
      }
      field_length_data+= column->fill_length;
      field_pos+= column->fill_length;
      row.varchar_length+= length;
      *null_field_lengths= length;
      memcpy(field_pos, header, length);
      header+= length;
      break;
    }
    case FIELD_BLOB:
    {
      /* Copy length of blob and pointer to blob data to record */
      uchar *field_pos= record + column->offset;
      uint size_length= column->length - portable_sizeof_char_ptr;
      ulong blob_length= _ma_calc_blob_length(size_length, field_length_data);

      /*
        Note: the blob data itself stays in the log record buffer; only a
        pointer into 'header' is stored in the record.
      */
      memcpy(field_pos, field_length_data, size_length);
      field_length_data+= size_length;
      memcpy(field_pos + size_length, &header, sizeof(header));
      header+= blob_length;
      *blob_lengths++= blob_length;
      break;
    }
    default:
      DBUG_ASSERT(0);
    }
  }
  /* Total space the row takes on its head page, excluding blobs */
  row.head_length= (info->row_base_length +
                    share->base.fixed_not_null_fields_length +
                    row.field_lengths_length +
                    size_to_store_key_length(row.field_lengths_length) +
                    row.normal_length +
                    row.char_length + row.varchar_length);
  row.total_length= (row.head_length + row.blob_length);
  if (row.total_length < share->base.min_block_length)
    row.total_length= share->base.min_block_length;

  /*
    Row is now generated. Now we need to insert record on the original
    pages with original size on each page.
  */

  /* Mark state non-flushable; reversed via _ma_bitmap_flushable(info, -1) in err: */
  _ma_bitmap_flushable(info, 1);
  /* Change extent information to be usable by write_block_record() */
  blocks= &row.insert_blocks;
  if (extent_to_bitmap_blocks(info, blocks, page, extent_count, extent_info))
    goto err;
  blocks->block->org_bitmap_value= _ma_bitmap_get_page_bits(info,
                                                            &share->bitmap,
                                                            page);
  blocks->block->used|= BLOCKUSED_USE_ORG_BITMAP;

  /* Read head page and allocate data for rowid */
  if (get_rowpos_in_head_or_tail_page(info, blocks->block,
                                      info->buff,
                                      length_on_head_page,
                                      HEAD_PAGE, PAGECACHE_LOCK_WRITE,
                                      rownr, &row_pos))
    goto err;

  if (share->calc_checksum)
  {
    /* Sanity check: rebuilt record must match the logged checksum */
    DBUG_ASSERT(row.checksum == (share->calc_checksum)(info, record));
  }
  /* Store same amount of data on head page as on original page */
  row_pos.length= (length_on_head_page -
                   (extent_count + 1 - blocks->count) * ROW_EXTENT_SIZE);
  set_if_bigger(row_pos.length, share->base.min_block_length);
  if (write_block_record(info, (uchar*) 0, record, &row,
                         blocks, blocks->block->org_bitmap_value != 0,
                         &row_pos, undo_lsn, 0))
    goto err;

  my_free(record);
  DBUG_RETURN(0);

err:
  DBUG_ASSERT(!maria_assert_if_crashed_table);
  _ma_mark_file_crashed(share);
  if (info->non_flushable_state)
    _ma_bitmap_flushable(info, -1);
  _ma_unpin_all_pages_and_finalize_row(info, LSN_IMPOSSIBLE);
  my_free(record);
  DBUG_RETURN(1);
}
7283
7284
7285/**
7286 Execute undo of a row update
7287
7288 @fn _ma_apply_undo_row_update()
7289
7290 @return Operation status
7291 @retval 0 OK
7292 @retval 1 Error
7293*/
7294
my_bool _ma_apply_undo_row_update(MARIA_HA *info, LSN undo_lsn,
                                  const uchar *header,
                                  size_t header_length
                                  __attribute__((unused)))
{
  MARIA_SHARE *share= info->s;
  MARIA_RECORD_POS record_pos;
  const uchar *field_length_data, *field_length_data_end, *extent_info;
  uchar *current_record, *orig_record;
  pgcache_page_no_t page;
  ha_checksum UNINIT_VAR(checksum_delta);
  uint rownr, field_length_header, extent_count, length_on_head_page;
  int error;
  DBUG_ENTER("_ma_apply_undo_row_update");

  /* Unpack the rowid (page + directory position) of the updated row */
  page= page_korr(header);
  header+= PAGE_STORE_SIZE;
  rownr= dirpos_korr(header);
  header+= DIRPOS_STORE_SIZE;

  record_pos= ma_recordpos(page, rownr);
  DBUG_PRINT("enter", ("rowid: %lu page: %lu rownr: %u",
                       (ulong) record_pos, (ulong) page, rownr));

  if (share->calc_checksum)
  {
    /* Checksum delta logged for the update; used to derive new checksum */
    checksum_delta= ha_checksum_korr(header);
    header+= HA_CHECKSUM_STORE_SIZE;
  }
  length_on_head_page= uint2korr(header);
  set_if_bigger(length_on_head_page, share->base.min_block_length);
  header+= 2;
  extent_count= pagerange_korr(header);
  header+= PAGERANGE_STORE_SIZE;
  extent_info= header;
  header+= extent_count * ROW_EXTENT_SIZE;

  /*
    Set header to point to old field values, generated by
    fill_update_undo_parts()
  */
  field_length_header= ma_get_length(&header);
  field_length_data= (uchar*) header;
  header+= field_length_header;
  field_length_data_end= header;

  /* Allocate buffer for current row & original row */
  if (!(current_record= my_malloc(share->base.reclength * 2, MYF(MY_WME))))
    DBUG_RETURN(1);
  orig_record= current_record+ share->base.reclength;

  /* Read current record */
  if (_ma_read_block_record(info, current_record, record_pos))
    goto err;

  /* 255 is a marker byte: the null bitmap changed and its old value follows */
  if (*field_length_data == 255)
  {
    /* Bitmap changed */
    field_length_data++;
    memcpy(orig_record, header, share->base.null_bytes);
    header+= share->base.null_bytes;
  }
  else
    memcpy(orig_record, current_record, share->base.null_bytes);
  bitmap_clear_all(&info->changed_fields);

  /* Restore each changed field in orig_record to its pre-update value */
  while (field_length_data < field_length_data_end)
  {
    uint field_nr= ma_get_length(&field_length_data), field_length;
    MARIA_COLUMNDEF *column= share->columndef + field_nr;
    uchar *orig_field_pos= orig_record + column->offset;

    bitmap_set_bit(&info->changed_fields, field_nr);
    if (field_nr >= share->base.fixed_not_null_fields)
    {
      if (!(field_length= ma_get_length(&field_length_data)))
      {
        /* Null field or empty field */
        bfill(orig_field_pos, column->fill_length,
              column->type == FIELD_SKIP_ENDSPACE ? ' ' : 0);
        continue;
      }
    }
    else
      field_length= column->length;

    switch (column->type) {
    case FIELD_CHECK:
    case FIELD_NORMAL:                          /* Fixed length field */
    case FIELD_ZERO:
    case FIELD_SKIP_PRESPACE:                   /* Not packed */
      memcpy(orig_field_pos, header, column->length);
      header+= column->length;
      break;
    case FIELD_SKIP_ZERO:                       /* Number */
    case FIELD_SKIP_ENDSPACE:                   /* CHAR */
    {
      /* Copy stored prefix, pad the rest with zero or space */
      uint diff;
      memcpy(orig_field_pos, header, field_length);
      if ((diff= (column->length - field_length)))
        bfill(orig_field_pos + column->length - diff, diff,
              column->type == FIELD_SKIP_ENDSPACE ? ' ' : 0);
      header+= field_length;
    }
    break;
    case FIELD_VARCHAR:
      /* 256 includes the length byte itself, hence not 255 */
      if (column->length <= 256)
      {
        *orig_field_pos++= (uchar) field_length;
      }
      else
      {
        int2store(orig_field_pos, field_length);
        orig_field_pos+= 2;
      }
      memcpy(orig_field_pos, header, field_length);
      header+= field_length;
      break;
    case FIELD_BLOB:
    {
      uint size_length= column->length - portable_sizeof_char_ptr;
      /*
        Note: the blob data stays in the log record buffer; only a pointer
        into 'header' is stored in the record.
      */
      _ma_store_blob_length(orig_field_pos, size_length, field_length);
      memcpy(orig_field_pos + size_length, &header, sizeof(header));
      header+= field_length;
      break;
    }
    default:
      DBUG_ASSERT(0);
    }
  }
  copy_not_changed_fields(info, &info->changed_fields,
                          orig_record, current_record);

  if (share->calc_checksum)
  {
    info->new_row.checksum= checksum_delta +
      (info->cur_row.checksum= (*share->calc_checksum)(info, orig_record));
    /* verify that record's content is sane */
    DBUG_ASSERT(info->new_row.checksum ==
                (*share->calc_checksum)(info, current_record));
  }

  info->last_auto_increment= ~ (ulonglong) 0;
  /* Now records are up to date, execute the update to original values */
  if (_ma_update_at_original_place(info, page, rownr, length_on_head_page,
                                   extent_count, extent_info,
                                   current_record, orig_record, undo_lsn))
    goto err;

  error= 0;
end:
  my_free(current_record);
  DBUG_RETURN(error);

err:
  DBUG_ASSERT(!maria_assert_if_crashed_table);
  error= 1;
  _ma_mark_file_crashed(share);
  goto end;
}
7455
7456
7457/**
7458 Execute undo of a bulk insert which used repair
7459
7460 @return Operation status
7461 @retval 0 OK
7462 @retval 1 Error
7463*/
7464
7465my_bool _ma_apply_undo_bulk_insert(MARIA_HA *info, LSN undo_lsn)
7466{
7467 my_bool error;
7468 LSN lsn;
7469 DBUG_ENTER("_ma_apply_undo_bulk_insert");
7470 /*
7471 We delete all rows, re-enable indices as bulk insert had disabled
7472 non-unique ones.
7473 */
7474 error= (maria_delete_all_rows(info) ||
7475 maria_enable_indexes(info) ||
7476 /* we enabled indices so need '2' below */
7477 _ma_state_info_write(info->s,
7478 MA_STATE_INFO_WRITE_DONT_MOVE_OFFSET |
7479 MA_STATE_INFO_WRITE_FULL_INFO |
7480 MA_STATE_INFO_WRITE_LOCK) ||
7481 _ma_write_clr(info, undo_lsn, LOGREC_UNDO_BULK_INSERT,
7482 FALSE, 0, &lsn, NULL));
7483 DBUG_RETURN(error);
7484}
7485
7486
7487/**
7488 @brief Get the TRANSLOG_ADDRESS to flush up to
7489
7490 @param page Page's content
7491 @param page_no Page's number (<offset>/<page length>)
7492 @param data_ptr Callback data pointer (pointer to MARIA_SHARE)
7493
7494 @note
7495 Usable for data (non-bitmap) and index pages
7496
7497 @retval LSN to flush up to
7498*/
7499
7500TRANSLOG_ADDRESS
7501maria_page_get_lsn(uchar *page,
7502 pgcache_page_no_t page_no __attribute__((unused)),
7503 uchar* data_ptr __attribute__((unused)))
7504{
7505#ifndef DBUG_OFF
7506 const MARIA_SHARE *share= (MARIA_SHARE*)data_ptr;
7507 DBUG_ASSERT(share->page_type == PAGECACHE_LSN_PAGE &&
7508 share->now_transactional);
7509#endif
7510 return lsn_korr(page);
7511}
7512
7513
7514/**
7515 @brief Enable reading of all rows, ignoring versioning
7516
7517 @note
7518 This is mainly useful in single user applications, like maria_pack,
7519 where we want to be able to read all rows without having to read the
7520 transaction id from the control file
7521*/
7522
7523void maria_ignore_trids(MARIA_HA *info)
7524{
7525 if (info->s->base.born_transactional)
7526 {
7527 if (!info->trn)
7528 _ma_set_trn_for_table(info, &dummy_transaction_object);
7529 /* Ignore transaction id when row is read */
7530 info->trn->min_read_from= ~(TrID) 0;
7531 }
7532}
7533
7534
7535#ifndef DBUG_OFF
7536
7537/* The following functions are useful to call from debugger */
7538
7539void _ma_print_block_info(MARIA_SHARE *share, uchar *buff)
7540{
7541 LSN lsn= lsn_korr(buff);
7542
7543 printf("LSN:" LSN_FMT " type: %u dir_entries: %u dir_free: %u empty_space: %u\n",
7544 LSN_IN_PARTS(lsn),
7545 (uint)buff[PAGE_TYPE_OFFSET],
7546 (uint)buff[DIR_COUNT_OFFSET],
7547 (uint)buff[DIR_FREE_OFFSET],
7548 (uint) uint2korr(buff + EMPTY_SPACE_OFFSET));
7549 printf("Start of directory: %lu\n",
7550 maria_block_size - PAGE_SUFFIX_SIZE -
7551 (uint) buff[DIR_COUNT_OFFSET] * DIR_ENTRY_SIZE);
7552 _ma_print_directory(share, stdout, buff, maria_block_size);
7553}
7554#endif
7555