1/* Copyright (C) 2006 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
15
16/* Create a MARIA table */
17
18#include "ma_ftdefs.h"
19#include "ma_sp_defs.h"
20#include <my_bit.h>
21#include "ma_blockrec.h"
22#include "trnman_public.h"
23#include "ma_crypt.h"
24
25#if defined(MSDOS) || defined(__WIN__)
26#ifdef __WIN__
27#include <fcntl.h>
28#else
29#include <process.h> /* Prototype for getpid */
30#endif
31#endif
32#include <m_ctype.h>
33
34static int compare_columns(MARIA_COLUMNDEF **a, MARIA_COLUMNDEF **b);
35
36
37static ulonglong update_tot_length(ulonglong tot_length, ulonglong max_rows, uint length)
38{
39 ulonglong tot_length_part;
40
41 if (tot_length == ULONGLONG_MAX)
42 return ULONGLONG_MAX;
43
44 tot_length_part= (max_rows/(ulong) ((maria_block_size -
45 MAX_KEYPAGE_HEADER_SIZE - KEYPAGE_CHECKSUM_SIZE)/
46 (length*2)));
47 if (tot_length_part >= ULONGLONG_MAX / maria_block_size)
48 return ULONGLONG_MAX;
49
50 if (tot_length > ULONGLONG_MAX - tot_length_part * maria_block_size)
51 return ULONGLONG_MAX;
52
53 return tot_length + tot_length_part * maria_block_size;
54}
55
56
/*
  ci->old_options is used when a table is recreated (HA_DONT_TOUCH_DATA),
  for example from maria_chk
*/
60
61int maria_create(const char *name, enum data_file_type datafile_type,
62 uint keys,MARIA_KEYDEF *keydefs,
63 uint columns, MARIA_COLUMNDEF *columndef,
64 uint uniques, MARIA_UNIQUEDEF *uniquedefs,
65 MARIA_CREATE_INFO *ci,uint flags)
66{
67 register uint i,j;
68 File UNINIT_VAR(dfile), UNINIT_VAR(file);
69 int errpos,save_errno, create_mode= O_RDWR | O_TRUNC, res;
70 myf create_flag;
71 uint length,max_key_length,packed,pack_bytes,pointer,real_length_diff,
72 key_length,info_length,key_segs,options,min_key_length,
73 base_pos,long_varchar_count,
74 unique_key_parts,fulltext_keys,offset, not_block_record_extra_length;
75 uint max_field_lengths, extra_header_size, column_nr;
76 uint internal_table= flags & HA_CREATE_INTERNAL_TABLE;
77 ulong reclength, real_reclength,min_pack_length;
78 char kfilename[FN_REFLEN], klinkname[FN_REFLEN], *klinkname_ptr;
79 char dfilename[FN_REFLEN], dlinkname[FN_REFLEN], *dlinkname_ptr= 0;
80 ulong pack_reclength;
81 ulonglong tot_length,max_rows, tmp;
82 enum en_fieldtype type;
83 enum data_file_type org_datafile_type= datafile_type;
84 MARIA_SHARE share;
85 MARIA_KEYDEF *keydef,tmp_keydef;
86 MARIA_UNIQUEDEF *uniquedef;
87 HA_KEYSEG *keyseg,tmp_keyseg;
88 MARIA_COLUMNDEF *column, *end_column;
89 double *rec_per_key_part;
90 ulong *nulls_per_key_part;
91 uint16 *column_array;
92 my_off_t key_root[HA_MAX_POSSIBLE_KEY], kfile_size_before_extension;
93 MARIA_CREATE_INFO tmp_create_info;
94 my_bool tmp_table= FALSE; /* cache for presence of HA_OPTION_TMP_TABLE */
95 my_bool forced_packed;
96 myf sync_dir= 0;
97 uchar *log_data= NULL;
98 my_bool encrypted= maria_encrypt_tables && datafile_type == BLOCK_RECORD;
99 my_bool insert_order= MY_TEST(flags & HA_PRESERVE_INSERT_ORDER);
100 uint crypt_page_header_space= 0;
101 DBUG_ENTER("maria_create");
102 DBUG_PRINT("enter", ("keys: %u columns: %u uniques: %u flags: %u",
103 keys, columns, uniques, flags));
104
105 DBUG_ASSERT(maria_inited);
106
107 if (!ci)
108 {
109 bzero((char*) &tmp_create_info,sizeof(tmp_create_info));
110 ci=&tmp_create_info;
111 }
112
113 if (keys + uniques > MARIA_MAX_KEY)
114 {
115 DBUG_RETURN(my_errno=HA_WRONG_CREATE_OPTION);
116 }
117 errpos=0;
118 options=0;
119 bzero((uchar*) &share,sizeof(share));
120
121 if (flags & HA_DONT_TOUCH_DATA)
122 {
123 /* We come here from recreate table */
124 org_datafile_type= ci->org_data_file_type;
125 if (!(ci->old_options & HA_OPTION_TEMP_COMPRESS_RECORD))
126 options= (ci->old_options &
127 (HA_OPTION_COMPRESS_RECORD | HA_OPTION_PACK_RECORD |
128 HA_OPTION_READ_ONLY_DATA | HA_OPTION_CHECKSUM |
129 HA_OPTION_TMP_TABLE | HA_OPTION_DELAY_KEY_WRITE |
130 HA_OPTION_LONG_BLOB_PTR | HA_OPTION_PAGE_CHECKSUM));
131 else
132 {
133 /* Uncompressing rows */
134 options= (ci->old_options &
135 (HA_OPTION_CHECKSUM | HA_OPTION_TMP_TABLE |
136 HA_OPTION_DELAY_KEY_WRITE | HA_OPTION_LONG_BLOB_PTR |
137 HA_OPTION_PAGE_CHECKSUM));
138 }
139 }
140 else
141 {
142 /* Transactional tables must be of type BLOCK_RECORD */
143 if (ci->transactional)
144 datafile_type= BLOCK_RECORD;
145 }
146
147 if (!(rec_per_key_part=
148 (double*) my_malloc((keys + uniques)*HA_MAX_KEY_SEG*sizeof(double) +
149 (keys + uniques)*HA_MAX_KEY_SEG*sizeof(ulong) +
150 sizeof(uint16) * columns,
151 MYF(MY_WME | MY_ZEROFILL))))
152 DBUG_RETURN(my_errno);
153 nulls_per_key_part= (ulong*) (rec_per_key_part +
154 (keys + uniques) * HA_MAX_KEY_SEG);
155 column_array= (uint16*) (nulls_per_key_part +
156 (keys + uniques) * HA_MAX_KEY_SEG);
157
158
159 /* Start by checking fields and field-types used */
160 long_varchar_count=packed= not_block_record_extra_length=
161 pack_reclength= max_field_lengths= 0;
162 reclength= min_pack_length= ci->null_bytes;
163 forced_packed= 0;
164 column_nr= 0;
165
166 if (encrypted)
167 {
168 DBUG_ASSERT(datafile_type == BLOCK_RECORD);
169 crypt_page_header_space= ma_crypt_get_data_page_header_space();
170 }
171
172 for (column= columndef, end_column= column + columns ;
173 column != end_column ;
174 column++)
175 {
176 /* Fill in not used struct parts */
177 column->column_nr= column_nr++;
178 column->offset= reclength;
179 column->empty_pos= 0;
180 column->empty_bit= 0;
181 column->fill_length= column->length;
182 if (column->null_bit)
183 options|= HA_OPTION_NULL_FIELDS;
184
185 reclength+= column->length;
186 type= column->type;
187 if (datafile_type == BLOCK_RECORD)
188 {
189 if (type == FIELD_SKIP_PRESPACE)
190 type= column->type= FIELD_NORMAL; /* SKIP_PRESPACE not supported */
191 if (type == FIELD_NORMAL &&
192 column->length > FULL_PAGE_SIZE2(maria_block_size,
193 crypt_page_header_space))
194 {
195 /* FIELD_NORMAL can't be split over many blocks, convert to a CHAR */
196 type= column->type= FIELD_SKIP_ENDSPACE;
197 }
198 }
199
200 if (type != FIELD_NORMAL && type != FIELD_CHECK)
201 {
202 column->empty_pos= packed/8;
203 column->empty_bit= (1 << (packed & 7));
204 if (type == FIELD_BLOB)
205 {
206 forced_packed= 1;
207 packed++;
208 share.base.blobs++;
209 if (pack_reclength != INT_MAX32)
210 {
211 if (column->length == 4+portable_sizeof_char_ptr)
212 pack_reclength= INT_MAX32;
213 else
214 {
215 /* Add max possible blob length */
216 pack_reclength+= (1 << ((column->length-
217 portable_sizeof_char_ptr)*8));
218 }
219 }
220 max_field_lengths+= (column->length - portable_sizeof_char_ptr);
221 }
222 else if (type == FIELD_SKIP_PRESPACE ||
223 type == FIELD_SKIP_ENDSPACE)
224 {
225 forced_packed= 1;
226 max_field_lengths+= column->length > 255 ? 2 : 1;
227 not_block_record_extra_length++;
228 packed++;
229 }
230 else if (type == FIELD_VARCHAR)
231 {
232 pack_reclength++;
233 not_block_record_extra_length++;
234 max_field_lengths++;
235 if (datafile_type != DYNAMIC_RECORD)
236 packed++;
237 column->fill_length= 1;
238 options|= HA_OPTION_NULL_FIELDS; /* Use ma_checksum() */
239
240 /* We must test for 257 as length includes pack-length */
241 if (MY_TEST(column->length >= 257))
242 {
243 long_varchar_count++;
244 max_field_lengths++;
245 column->fill_length= 2;
246 }
247 }
248 else if (type == FIELD_SKIP_ZERO)
249 packed++;
250 else
251 {
252 if (!column->null_bit)
253 min_pack_length+= column->length;
254 else
255 {
256 /* Only BLOCK_RECORD skips NULL fields for all field values */
257 not_block_record_extra_length+= column->length;
258 }
259 column->empty_pos= 0;
260 column->empty_bit= 0;
261 }
262 }
263 else /* FIELD_NORMAL */
264 {
265 if (!column->null_bit)
266 {
267 min_pack_length+= column->length;
268 share.base.fixed_not_null_fields++;
269 share.base.fixed_not_null_fields_length+= column->length;
270 }
271 else
272 not_block_record_extra_length+= column->length;
273 }
274 }
275
276 if (datafile_type == STATIC_RECORD && forced_packed)
277 {
278 /* Can't use fixed length records, revert to block records */
279 datafile_type= BLOCK_RECORD;
280 }
281
282 if (datafile_type == NO_RECORD && uniques)
283 {
284 /* Can't do unique without data, revert to block records */
285 datafile_type= BLOCK_RECORD;
286 }
287
288 if (encrypted)
289 {
290 /*
291 datafile_type is set (finally?)
292 update encryption that is only supported for BLOCK_RECORD
293 */
294 if (datafile_type != BLOCK_RECORD)
295 {
296 encrypted= FALSE;
297 crypt_page_header_space= 0;
298 }
299 }
300
301 if (datafile_type == DYNAMIC_RECORD)
302 options|= HA_OPTION_PACK_RECORD; /* Must use packed records */
303
304 if (datafile_type == STATIC_RECORD || datafile_type == NO_RECORD)
305 {
306 /* We can't use checksum with static length rows */
307 flags&= ~HA_CREATE_CHECKSUM;
308 options&= ~HA_OPTION_CHECKSUM;
309 min_pack_length= reclength;
310 packed= 0;
311 }
312 else if (datafile_type != BLOCK_RECORD)
313 min_pack_length+= not_block_record_extra_length;
314 else
315 min_pack_length+= 5; /* Min row overhead */
316
317 if (flags & HA_CREATE_TMP_TABLE)
318 {
319 options|= HA_OPTION_TMP_TABLE;
320 tmp_table= TRUE;
321 create_mode|= O_NOFOLLOW;
322 /* "CREATE TEMPORARY" tables are not crash-safe (dropped at restart) */
323 ci->transactional= FALSE;
324 flags&= ~HA_CREATE_PAGE_CHECKSUM;
325 }
326 share.base.null_bytes= ci->null_bytes;
327 share.base.original_null_bytes= ci->null_bytes;
328 share.base.born_transactional= ci->transactional;
329 share.base.max_field_lengths= max_field_lengths;
330 share.base.field_offsets= 0; /* for future */
331
332 if (flags & HA_CREATE_CHECKSUM || (options & HA_OPTION_CHECKSUM))
333 {
334 options|= HA_OPTION_CHECKSUM;
335 min_pack_length++;
336 pack_reclength++;
337 }
338 if (pack_reclength < INT_MAX32)
339 pack_reclength+= max_field_lengths + long_varchar_count;
340 else
341 pack_reclength= INT_MAX32;
342
343 if (flags & HA_CREATE_DELAY_KEY_WRITE)
344 options|= HA_OPTION_DELAY_KEY_WRITE;
345 if (flags & HA_CREATE_RELIES_ON_SQL_LAYER)
346 options|= HA_OPTION_RELIES_ON_SQL_LAYER;
347 if (flags & HA_CREATE_PAGE_CHECKSUM)
348 options|= HA_OPTION_PAGE_CHECKSUM;
349
350 pack_bytes= (packed + 7) / 8;
351 if (pack_reclength != INT_MAX32)
352 pack_reclength+= reclength+pack_bytes +
353 MY_TEST(test_all_bits(options, HA_OPTION_CHECKSUM |
354 HA_OPTION_PACK_RECORD));
355 min_pack_length+= pack_bytes;
356 /* Calculate min possible row length for rows-in-block */
357 extra_header_size= MAX_FIXED_HEADER_SIZE;
358 if (ci->transactional)
359 {
360 extra_header_size= TRANS_MAX_FIXED_HEADER_SIZE;
361 DBUG_PRINT("info",("creating a transactional table"));
362 }
363 share.base.min_block_length= (extra_header_size + share.base.null_bytes +
364 pack_bytes);
365 if (!ci->data_file_length && ci->max_rows)
366 {
367 set_if_bigger(ci->max_rows, ci->reloc_rows);
368 if (pack_reclength == INT_MAX32 ||
369 (~(ulonglong) 0)/ci->max_rows < (ulonglong) pack_reclength)
370 ci->data_file_length= ~(ulonglong) 0;
371 else
372 {
373 ci->data_file_length= _ma_safe_mul(ci->max_rows, pack_reclength);
374 if (datafile_type == BLOCK_RECORD)
375 {
376 /* Assume that blocks are only half full (very pessimistic!) */
377 ci->data_file_length= _ma_safe_mul(ci->data_file_length, 2);
378 set_if_bigger(ci->data_file_length, maria_block_size*2);
379 }
380 }
381 }
382 else if (!ci->max_rows)
383 {
384 if (datafile_type == BLOCK_RECORD)
385 {
386 uint rows_per_page=
387 ((maria_block_size - PAGE_OVERHEAD_SIZE_RAW - crypt_page_header_space)
388 / (min_pack_length + extra_header_size + DIR_ENTRY_SIZE));
389 ulonglong data_file_length= ci->data_file_length;
390 if (!data_file_length)
391 data_file_length= ((((ulonglong) 1 << ((BLOCK_RECORD_POINTER_SIZE-1) *
392 8))/2 -1) * maria_block_size);
393 if (rows_per_page > 0)
394 {
395 set_if_smaller(rows_per_page, MAX_ROWS_PER_PAGE);
396 ci->max_rows= (data_file_length / maria_block_size+1) * rows_per_page;
397 }
398 else
399 ci->max_rows= data_file_length / (min_pack_length +
400 extra_header_size +
401 DIR_ENTRY_SIZE);
402 }
403 else
404 ci->max_rows=(ha_rows) (ci->data_file_length/(min_pack_length +
405 ((options &
406 HA_OPTION_PACK_RECORD) ?
407 3 : 0)));
408 set_if_smaller(ci->reloc_rows, ci->max_rows);
409 }
410 max_rows= (ulonglong) ci->max_rows;
411 if (datafile_type == BLOCK_RECORD)
412 {
 /*
 The + 1 is for the record position within the page.
 The * 2 is because we need one bit to know whether there are transids
 after the row pointer.
 */
418 pointer= maria_get_pointer_length((ci->data_file_length /
419 maria_block_size) * 2, 4) + 1;
420 set_if_smaller(pointer, BLOCK_RECORD_POINTER_SIZE);
421
422 if (!max_rows)
423 max_rows= (((((ulonglong) 1 << ((pointer-1)*8)) -1) * maria_block_size) /
424 min_pack_length / 2);
425 }
426 else
427 {
428 if (datafile_type == NO_RECORD)
429 pointer= 0;
430 else if (datafile_type != STATIC_RECORD)
431 pointer= maria_get_pointer_length(ci->data_file_length,
432 maria_data_pointer_size);
433 else
434 pointer= maria_get_pointer_length(ci->max_rows, maria_data_pointer_size);
435 if (!max_rows)
436 max_rows= ((((ulonglong) 1 << (pointer*8)) -1) / min_pack_length);
437 }
438
439 real_reclength=reclength;
440 if (datafile_type == STATIC_RECORD)
441 {
442 if (reclength <= pointer)
443 reclength=pointer+1; /* reserve place for delete link */
444 }
445 else
446 reclength+= long_varchar_count; /* We need space for varchar! */
447
448 max_key_length=0; tot_length=0 ; key_segs=0;
449 fulltext_keys=0;
450 share.state.rec_per_key_part= rec_per_key_part;
451 share.state.nulls_per_key_part= nulls_per_key_part;
452 share.state.key_root=key_root;
453 share.state.key_del= HA_OFFSET_ERROR;
454 if (uniques)
455 max_key_length= MARIA_UNIQUE_HASH_LENGTH + pointer;
456
457 for (i=0, keydef=keydefs ; i < keys ; i++ , keydef++)
458 {
459 share.state.key_root[i]= HA_OFFSET_ERROR;
460 length= real_length_diff= 0;
461 min_key_length= key_length= pointer;
462
463 if (keydef->key_alg == HA_KEY_ALG_RTREE)
464 keydef->flag|= HA_RTREE_INDEX; /* For easier tests */
465
466 if (keydef->flag & HA_SPATIAL)
467 {
468#ifdef HAVE_SPATIAL
469 /* BAR TODO to support 3D and more dimensions in the future */
470 uint sp_segs=SPDIMS*2;
471 keydef->flag=HA_SPATIAL;
472
473 if (flags & HA_DONT_TOUCH_DATA)
474 {
475 /*
476 Called by maria_chk - i.e. table structure was taken from
477 MYI file and SPATIAL key *does have* additional sp_segs keysegs.
478 keydef->seg here points right at the GEOMETRY segment,
479 so we only need to decrease keydef->keysegs.
480 (see maria_recreate_table() in _ma_check.c)
481 */
482 keydef->keysegs-=sp_segs-1;
483 }
484
485 for (j=0, keyseg=keydef->seg ; (int) j < keydef->keysegs ;
486 j++, keyseg++)
487 {
488 if (keyseg->type != HA_KEYTYPE_BINARY &&
489 keyseg->type != HA_KEYTYPE_VARBINARY1 &&
490 keyseg->type != HA_KEYTYPE_VARBINARY2)
491 {
492 my_errno=HA_WRONG_CREATE_OPTION;
493 goto err_no_lock;
494 }
495 }
496 keydef->keysegs+=sp_segs;
497 key_length+=SPLEN*sp_segs;
498 length++; /* At least one length uchar */
499 min_key_length++;
500#else
501 my_errno= HA_ERR_UNSUPPORTED;
502 goto err_no_lock;
503#endif /*HAVE_SPATIAL*/
504 }
505 else if (keydef->flag & HA_FULLTEXT)
506 {
507 keydef->flag=HA_FULLTEXT | HA_PACK_KEY | HA_VAR_LENGTH_KEY;
508 options|=HA_OPTION_PACK_KEYS; /* Using packed keys */
509
510 for (j=0, keyseg=keydef->seg ; (int) j < keydef->keysegs ;
511 j++, keyseg++)
512 {
513 if (keyseg->type != HA_KEYTYPE_TEXT &&
514 keyseg->type != HA_KEYTYPE_VARTEXT1 &&
515 keyseg->type != HA_KEYTYPE_VARTEXT2)
516 {
517 my_errno=HA_WRONG_CREATE_OPTION;
518 goto err_no_lock;
519 }
520 if (!(keyseg->flag & HA_BLOB_PART) &&
521 (keyseg->type == HA_KEYTYPE_VARTEXT1 ||
522 keyseg->type == HA_KEYTYPE_VARTEXT2))
523 {
524 /* Make a flag that this is a VARCHAR */
525 keyseg->flag|= HA_VAR_LENGTH_PART;
526 /* Store in bit_start number of bytes used to pack the length */
527 keyseg->bit_start= ((keyseg->type == HA_KEYTYPE_VARTEXT1)?
528 1 : 2);
529 }
530 }
531
532 fulltext_keys++;
533 key_length+= HA_FT_MAXBYTELEN+HA_FT_WLEN;
534 length++; /* At least one length uchar */
535 min_key_length+= 1 + HA_FT_WLEN;
536 real_length_diff=HA_FT_MAXBYTELEN-FT_MAX_WORD_LEN_FOR_SORT;
537 }
538 else
539 {
540 /* Test if prefix compression */
541 if (keydef->flag & HA_PACK_KEY)
542 {
543 /* Can't use space_compression on number keys */
544 if ((keydef->seg[0].flag & HA_SPACE_PACK) &&
545 keydef->seg[0].type == (int) HA_KEYTYPE_NUM)
546 keydef->seg[0].flag&= ~HA_SPACE_PACK;
547
548 /* Only use HA_PACK_KEY when first segment is a variable length key */
549 if (!(keydef->seg[0].flag & (HA_SPACE_PACK | HA_BLOB_PART |
550 HA_VAR_LENGTH_PART)))
551 {
552 /* pack relative to previous key */
553 keydef->flag&= ~HA_PACK_KEY;
554 keydef->flag|= HA_BINARY_PACK_KEY | HA_VAR_LENGTH_KEY;
555 }
556 else
557 {
558 keydef->seg[0].flag|=HA_PACK_KEY; /* for easyer intern test */
559 keydef->flag|=HA_VAR_LENGTH_KEY;
560 options|=HA_OPTION_PACK_KEYS; /* Using packed keys */
561 }
562 }
563 if (keydef->flag & HA_BINARY_PACK_KEY)
564 options|=HA_OPTION_PACK_KEYS; /* Using packed keys */
565
566 if (keydef->flag & HA_AUTO_KEY && ci->with_auto_increment)
567 share.base.auto_key=i+1;
568 for (j=0, keyseg=keydef->seg ; j < keydef->keysegs ; j++, keyseg++)
569 {
570 /* numbers are stored with high by first to make compression easier */
571 switch (keyseg->type) {
572 case HA_KEYTYPE_SHORT_INT:
573 case HA_KEYTYPE_LONG_INT:
574 case HA_KEYTYPE_FLOAT:
575 case HA_KEYTYPE_DOUBLE:
576 case HA_KEYTYPE_USHORT_INT:
577 case HA_KEYTYPE_ULONG_INT:
578 case HA_KEYTYPE_LONGLONG:
579 case HA_KEYTYPE_ULONGLONG:
580 case HA_KEYTYPE_INT24:
581 case HA_KEYTYPE_UINT24:
582 case HA_KEYTYPE_INT8:
583 keyseg->flag|= HA_SWAP_KEY;
584 break;
585 case HA_KEYTYPE_VARTEXT1:
586 case HA_KEYTYPE_VARTEXT2:
587 case HA_KEYTYPE_VARBINARY1:
588 case HA_KEYTYPE_VARBINARY2:
589 if (!(keyseg->flag & HA_BLOB_PART))
590 {
591 /* Make a flag that this is a VARCHAR */
592 keyseg->flag|= HA_VAR_LENGTH_PART;
593 /* Store in bit_start number of bytes used to pack the length */
594 keyseg->bit_start= ((keyseg->type == HA_KEYTYPE_VARTEXT1 ||
595 keyseg->type == HA_KEYTYPE_VARBINARY1) ?
596 1 : 2);
597 }
598 break;
599 default:
600 break;
601 }
602 if (keyseg->flag & HA_SPACE_PACK)
603 {
604 DBUG_ASSERT(!(keyseg->flag & (HA_VAR_LENGTH_PART | HA_BLOB_PART)));
605 keydef->flag |= HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY;
606 options|=HA_OPTION_PACK_KEYS; /* Using packed keys */
607 length++; /* At least one length uchar */
608 if (!keyseg->null_bit)
609 min_key_length++;
610 key_length+= keyseg->length;
611 if (keyseg->length >= 255)
612 {
613 /* prefix may be 3 bytes */
614 length+= 2;
615 }
616 }
617 else if (keyseg->flag & (HA_VAR_LENGTH_PART | HA_BLOB_PART))
618 {
619 DBUG_ASSERT(!test_all_bits(keyseg->flag,
620 (HA_VAR_LENGTH_PART | HA_BLOB_PART)));
621 keydef->flag|=HA_VAR_LENGTH_KEY;
622 length++; /* At least one length uchar */
623 if (!keyseg->null_bit)
624 min_key_length++;
625 options|=HA_OPTION_PACK_KEYS; /* Using packed keys */
626 key_length+= keyseg->length;
627 if (keyseg->length >= 255)
628 {
629 /* prefix may be 3 bytes */
630 length+= 2;
631 }
632 }
633 else
634 {
635 key_length+= keyseg->length;
636 if (!keyseg->null_bit)
637 min_key_length+= keyseg->length;
638 }
639 if (keyseg->null_bit)
640 {
641 key_length++;
642 /* min key part is 1 byte */
643 min_key_length++;
644 options|=HA_OPTION_PACK_KEYS;
645 keyseg->flag|=HA_NULL_PART;
646 keydef->flag|=HA_VAR_LENGTH_KEY | HA_NULL_PART_KEY;
647 }
648 }
649 } /* if HA_FULLTEXT */
650 key_segs+=keydef->keysegs;
651 if (keydef->keysegs > HA_MAX_KEY_SEG)
652 {
653 my_errno=HA_WRONG_CREATE_OPTION;
654 goto err_no_lock;
655 }
656 /*
657 key_segs may be 0 in the case when we only want to be able to
658 add on row into the table. This can happen with some DISTINCT queries
659 in MySQL
660 */
661 if ((keydef->flag & (HA_NOSAME | HA_NULL_PART_KEY)) == HA_NOSAME &&
662 key_segs)
663 share.state.rec_per_key_part[key_segs-1]=1L;
664 length+=key_length;
 /*
 A key can't be longer than half an index block (as we have
 to be able to put at least 2 keys on an index block for the key
 algorithms to work).
 */
670 if (length > _ma_max_key_length())
671 {
672 my_errno=HA_WRONG_CREATE_OPTION;
673 goto err_no_lock;
674 }
675 keydef->block_length= (uint16) maria_block_size;
676 keydef->keylength= (uint16) key_length;
677 keydef->minlength= (uint16) min_key_length;
678 keydef->maxlength= (uint16) length;
679
680 if (length > max_key_length)
681 max_key_length= length;
682
683 tot_length= update_tot_length(tot_length, max_rows, length);
684 }
685
686 unique_key_parts=0;
687 for (i=0, uniquedef=uniquedefs ; i < uniques ; i++ , uniquedef++)
688 {
689 uniquedef->key=keys+i;
690 unique_key_parts+=uniquedef->keysegs;
691 share.state.key_root[keys+i]= HA_OFFSET_ERROR;
692
693 tot_length= update_tot_length(tot_length, max_rows, MARIA_UNIQUE_HASH_LENGTH + pointer);
694 }
695 keys+=uniques; /* Each unique has 1 key */
696 key_segs+=uniques; /* Each unique has 1 key seg */
697
698 base_pos=(MARIA_STATE_INFO_SIZE + keys * MARIA_STATE_KEY_SIZE +
699 key_segs * MARIA_STATE_KEYSEG_SIZE);
700 info_length= base_pos+(uint) (MARIA_BASE_INFO_SIZE+
701 keys * MARIA_KEYDEF_SIZE+
702 uniques * MARIA_UNIQUEDEF_SIZE +
703 (key_segs + unique_key_parts)*HA_KEYSEG_SIZE+
704 columns*(MARIA_COLUMNDEF_SIZE + 2));
705
706 if (encrypted)
707 {
708 share.base.extra_options|= MA_EXTRA_OPTIONS_ENCRYPTED;
709
710 /* store crypt data in info */
711 info_length+= ma_crypt_get_file_length();
712 }
713
714 if (insert_order)
715 {
716 share.base.extra_options|= MA_EXTRA_OPTIONS_INSERT_ORDER;
717 }
718
719 DBUG_PRINT("info", ("info_length: %u", info_length));
720 /* There are only 16 bits for the total header length. */
721 if (info_length > 65535)
722 {
723 my_printf_error(HA_WRONG_CREATE_OPTION,
724 "Aria table '%s' has too many columns and/or "
725 "indexes and/or unique constraints.",
726 MYF(0), name + dirname_length(name));
727 my_errno= HA_WRONG_CREATE_OPTION;
728 goto err_no_lock;
729 }
730
731 bmove(share.state.header.file_version, maria_file_magic, 4);
732 ci->old_options=options | (ci->old_options & HA_OPTION_TEMP_COMPRESS_RECORD ?
733 HA_OPTION_COMPRESS_RECORD |
734 HA_OPTION_TEMP_COMPRESS_RECORD: 0);
735 mi_int2store(share.state.header.options,ci->old_options);
736 mi_int2store(share.state.header.header_length,info_length);
737 mi_int2store(share.state.header.state_info_length,MARIA_STATE_INFO_SIZE);
738 mi_int2store(share.state.header.base_info_length,MARIA_BASE_INFO_SIZE);
739 mi_int2store(share.state.header.base_pos,base_pos);
740 share.state.header.data_file_type= share.data_file_type= datafile_type;
741 share.state.header.org_data_file_type= org_datafile_type;
742 share.state.header.not_used= 0;
743
744 share.state.dellink = HA_OFFSET_ERROR;
745 share.state.first_bitmap_with_space= 0;
746#ifdef MARIA_EXTERNAL_LOCKING
747 share.state.process= (ulong) getpid();
748#endif
749 share.state.version= (ulong) time((time_t*) 0);
750 share.state.sortkey= (ushort) ~0;
751 share.state.auto_increment=ci->auto_increment;
752 share.options=options;
753 share.base.rec_reflength=pointer;
754 share.base.block_size= maria_block_size;
755 share.base.language= (ci->language ? ci->language :
756 default_charset_info->number);
757
758 /*
759 Get estimate for index file length (this may be wrong for FT keys)
760 This is used for pointers to other key pages.
761 */
762 tmp= (tot_length / maria_block_size + keys * MARIA_INDEX_BLOCK_MARGIN);
763
764 /*
765 use maximum of key_file_length we calculated and key_file_length value we
766 got from MAI file header (see also mariapack.c:save_state)
767 */
768 share.base.key_reflength=
769 maria_get_pointer_length(MY_MAX(ci->key_file_length,tmp),3);
770 share.base.keys= share.state.header.keys= keys;
771 share.state.header.uniques= uniques;
772 share.state.header.fulltext_keys= fulltext_keys;
773 mi_int2store(share.state.header.key_parts,key_segs);
774 mi_int2store(share.state.header.unique_key_parts,unique_key_parts);
775
776 maria_set_all_keys_active(share.state.key_map, keys);
777
778 share.base.keystart = share.state.state.key_file_length=
779 MY_ALIGN(info_length, maria_block_size);
780 share.base.max_key_block_length= maria_block_size;
781 share.base.max_key_length=ALIGN_SIZE(max_key_length+4);
782 share.base.records=ci->max_rows;
783 share.base.reloc= ci->reloc_rows;
784 share.base.reclength=real_reclength;
785 share.base.pack_reclength= reclength + MY_TEST(options & HA_OPTION_CHECKSUM);
786 share.base.max_pack_length=pack_reclength;
787 share.base.min_pack_length=min_pack_length;
788 share.base.pack_bytes= pack_bytes;
789 share.base.fields= columns;
790 share.base.pack_fields= packed;
791
792 if (share.data_file_type == BLOCK_RECORD)
793 {
794 /*
795 we are going to create a first bitmap page, set data_file_length
796 to reflect this, before the state goes to disk
797 */
798 share.state.state.data_file_length= maria_block_size;
799 /* Add length of packed fields + length */
800 share.base.pack_reclength+= share.base.max_field_lengths+3;
801 share.base.max_pack_length= share.base.pack_reclength;
802
803 /* Adjust max_pack_length, to be used if we have short rows */
804 if (share.base.max_pack_length < maria_block_size)
805 {
806 share.base.max_pack_length+= FLAG_SIZE;
807 if (ci->transactional)
808 share.base.max_pack_length+= TRANSID_SIZE * 2;
809 }
810 }
811
812 /* max_data_file_length and max_key_file_length are recalculated on open */
813 if (tmp_table)
814 share.base.max_data_file_length= (my_off_t) ci->data_file_length;
815 else if (ci->transactional && translog_status == TRANSLOG_OK &&
816 !maria_in_recovery)
817 {
818 /*
819 we have checked translog_inited above, because maria_chk may call us
820 (via maria_recreate_table()) and it does not have a log.
821 */
822 sync_dir= MY_SYNC_DIR;
823 /*
824 If crash between _ma_state_info_write_sub() and
825 _ma_update_state__lsns_sub(), table should be ignored by Recovery (or
826 old REDOs would fail), so we cannot let LSNs be 0:
827 */
828 share.state.skip_redo_lsn= share.state.is_of_horizon=
829 share.state.create_rename_lsn= LSN_MAX;
830 }
831
832 if (datafile_type == DYNAMIC_RECORD)
833 {
834 share.base.min_block_length=
835 (share.base.pack_reclength+3 < MARIA_EXTEND_BLOCK_LENGTH &&
836 ! share.base.blobs) ?
837 MY_MAX(share.base.pack_reclength,MARIA_MIN_BLOCK_LENGTH) :
838 MARIA_EXTEND_BLOCK_LENGTH;
839 }
840 else if (datafile_type == STATIC_RECORD)
841 share.base.min_block_length= share.base.pack_reclength;
842
843 if (! (flags & HA_DONT_TOUCH_DATA))
844 share.state.create_time= time((time_t*) 0);
845
846 if (!internal_table)
847 mysql_mutex_lock(&THR_LOCK_maria);
848
849 /*
850 NOTE: For test_if_reopen() we need a real path name. Hence we need
851 MY_RETURN_REAL_PATH for every fn_format(filename, ...).
852 */
853 if (ci->index_file_name)
854 {
855 char *iext= strrchr(ci->index_file_name, '.');
856 int have_iext= iext && !strcmp(iext, MARIA_NAME_IEXT);
857 if (tmp_table)
858 {
859 char *path;
860 /* chop off the table name, tempory tables use generated name */
861 if ((path= strrchr(ci->index_file_name, FN_LIBCHAR)))
862 *path= '\0';
863 fn_format(kfilename, name, ci->index_file_name, MARIA_NAME_IEXT,
864 MY_REPLACE_DIR | MY_UNPACK_FILENAME |
865 MY_RETURN_REAL_PATH | MY_APPEND_EXT);
866 }
867 else
868 {
869 fn_format(kfilename, ci->index_file_name, "", MARIA_NAME_IEXT,
870 MY_UNPACK_FILENAME | MY_RETURN_REAL_PATH |
871 (have_iext ? MY_REPLACE_EXT : MY_APPEND_EXT));
872 }
873 fn_format(klinkname, name, "", MARIA_NAME_IEXT,
874 MY_UNPACK_FILENAME|MY_APPEND_EXT);
875 klinkname_ptr= klinkname;
876 /*
877 Don't create the table if the link or file exists to ensure that one
878 doesn't accidently destroy another table.
879 Don't sync dir now if the data file has the same path.
880 */
881 create_flag=
882 (ci->data_file_name &&
883 !strcmp(ci->index_file_name, ci->data_file_name)) ? 0 : sync_dir;
884 }
885 else
886 {
887 char *iext= strrchr(name, '.');
888 int have_iext= iext && !strcmp(iext, MARIA_NAME_IEXT);
889 fn_format(kfilename, name, "", MARIA_NAME_IEXT,
890 MY_UNPACK_FILENAME | MY_RETURN_REAL_PATH |
891 (have_iext ? MY_REPLACE_EXT : MY_APPEND_EXT));
892 klinkname_ptr= NullS;
893 /*
894 Replace the current file.
895 Don't sync dir now if the data file has the same path.
896 */
897 create_flag= (flags & HA_CREATE_KEEP_FILES) ? 0 : MY_DELETE_OLD;
898 create_flag|= (!ci->data_file_name ? 0 : sync_dir);
899 }
900
901 /*
902 If a MRG_MARIA table is in use, the mapped MARIA tables are open,
903 but no entry is made in the table cache for them.
904 A TRUNCATE command checks for the table in the cache only and could
905 be fooled to believe, the table is not open.
906 Pull the emergency brake in this situation. (Bug #8306)
907
908
909 NOTE: The filename is compared against unique_file_name of every
910 open table. Hence we need a real path here.
911 */
912 if (!internal_table && _ma_test_if_reopen(kfilename))
913 {
914 my_printf_error(HA_ERR_TABLE_EXIST, "Aria table '%s' is in use "
915 "(most likely by a MERGE table). Try FLUSH TABLES.",
916 MYF(0), name + dirname_length(name));
917 my_errno= HA_ERR_TABLE_EXIST;
918 goto err;
919 }
920
921 if ((file= mysql_file_create_with_symlink(key_file_kfile, klinkname_ptr,
922 kfilename, 0, create_mode,
923 MYF(MY_WME|create_flag))) < 0)
924 goto err;
925 errpos=1;
926
927 DBUG_PRINT("info", ("write state info and base info"));
928 if (_ma_state_info_write_sub(file, &share.state,
929 MA_STATE_INFO_WRITE_FULL_INFO) ||
930 _ma_base_info_write(file, &share.base))
931 goto err;
932 DBUG_PRINT("info", ("base_pos: %d base_info_size: %d",
933 base_pos, MARIA_BASE_INFO_SIZE));
934 DBUG_ASSERT(mysql_file_tell(file,MYF(0)) == base_pos+ MARIA_BASE_INFO_SIZE);
935
936 /* Write key and keyseg definitions */
937 DBUG_PRINT("info", ("write key and keyseg definitions"));
938 for (i=0 ; i < share.base.keys - uniques; i++)
939 {
940 uint sp_segs=(keydefs[i].flag & HA_SPATIAL) ? 2*SPDIMS : 0;
941
942 if (_ma_keydef_write(file, &keydefs[i]))
943 goto err;
944 for (j=0 ; j < keydefs[i].keysegs-sp_segs ; j++)
945 if (_ma_keyseg_write(file, &keydefs[i].seg[j]))
946 goto err;
947#ifdef HAVE_SPATIAL
948 for (j=0 ; j < sp_segs ; j++)
949 {
950 HA_KEYSEG sseg;
951 sseg.type=SPTYPE;
952 sseg.language= 7; /* Binary */
953 sseg.null_bit=0;
954 sseg.bit_start=0;
955 sseg.bit_length= 0;
956 sseg.bit_pos= 0;
957 sseg.length=SPLEN;
958 sseg.null_pos=0;
959 sseg.start=j*SPLEN;
960 sseg.flag= HA_SWAP_KEY;
961 if (_ma_keyseg_write(file, &sseg))
962 goto err;
963 }
964#endif
965 }
966 /* Create extra keys for unique definitions */
967 offset= real_reclength - uniques*MARIA_UNIQUE_HASH_LENGTH;
968 bzero((char*) &tmp_keydef,sizeof(tmp_keydef));
969 bzero((char*) &tmp_keyseg,sizeof(tmp_keyseg));
970 for (i=0; i < uniques ; i++)
971 {
972 tmp_keydef.keysegs=1;
973 tmp_keydef.flag= HA_UNIQUE_CHECK;
974 tmp_keydef.block_length= (uint16) maria_block_size;
975 tmp_keydef.keylength= MARIA_UNIQUE_HASH_LENGTH + pointer;
976 tmp_keydef.minlength=tmp_keydef.maxlength=tmp_keydef.keylength;
977 tmp_keyseg.type= MARIA_UNIQUE_HASH_TYPE;
978 tmp_keyseg.length= MARIA_UNIQUE_HASH_LENGTH;
979 tmp_keyseg.start= offset;
980 offset+= MARIA_UNIQUE_HASH_LENGTH;
981 if (_ma_keydef_write(file,&tmp_keydef) ||
982 _ma_keyseg_write(file,(&tmp_keyseg)))
983 goto err;
984 }
985
986 /* Save unique definition */
987 DBUG_PRINT("info", ("write unique definitions"));
988 for (i=0 ; i < share.state.header.uniques ; i++)
989 {
990 HA_KEYSEG *keyseg_end;
991 keyseg= uniquedefs[i].seg;
992 if (_ma_uniquedef_write(file, &uniquedefs[i]))
993 goto err;
994 for (keyseg= uniquedefs[i].seg, keyseg_end= keyseg+ uniquedefs[i].keysegs;
995 keyseg < keyseg_end;
996 keyseg++)
997 {
998 switch (keyseg->type) {
999 case HA_KEYTYPE_VARTEXT1:
1000 case HA_KEYTYPE_VARTEXT2:
1001 case HA_KEYTYPE_VARBINARY1:
1002 case HA_KEYTYPE_VARBINARY2:
1003 if (!(keyseg->flag & HA_BLOB_PART))
1004 {
1005 keyseg->flag|= HA_VAR_LENGTH_PART;
1006 keyseg->bit_start= ((keyseg->type == HA_KEYTYPE_VARTEXT1 ||
1007 keyseg->type == HA_KEYTYPE_VARBINARY1) ?
1008 1 : 2);
1009 }
1010 break;
1011 default:
1012 DBUG_ASSERT((keyseg->flag & HA_VAR_LENGTH_PART) == 0);
1013 break;
1014 }
1015 if (_ma_keyseg_write(file, keyseg))
1016 goto err;
1017 }
1018 }
1019 DBUG_PRINT("info", ("write field definitions"));
1020 if (datafile_type == BLOCK_RECORD)
1021 {
    /* Store columns in a more efficient order */
1023 MARIA_COLUMNDEF **col_order, **pos;
1024 if (!(col_order= (MARIA_COLUMNDEF**) my_malloc(share.base.fields *
1025 sizeof(MARIA_COLUMNDEF*),
1026 MYF(MY_WME))))
1027 goto err;
1028 for (column= columndef, pos= col_order ;
1029 column != end_column ;
1030 column++, pos++)
1031 *pos= column;
1032 qsort(col_order, share.base.fields, sizeof(*col_order),
1033 (qsort_cmp) compare_columns);
1034 for (i=0 ; i < share.base.fields ; i++)
1035 {
1036 column_array[col_order[i]->column_nr]= i;
1037 if (_ma_columndef_write(file, col_order[i]))
1038 {
1039 my_free(col_order);
1040 goto err;
1041 }
1042 }
1043 my_free(col_order);
1044 }
1045 else
1046 {
1047 for (i=0 ; i < share.base.fields ; i++)
1048 {
1049 column_array[i]= (uint16) i;
1050 if (_ma_columndef_write(file, &columndef[i]))
1051 goto err;
1052 }
1053 }
1054 if (_ma_column_nr_write(file, column_array, columns))
1055 goto err;
1056
1057 if (encrypted)
1058 {
1059 if (ma_crypt_create(&share) ||
1060 ma_crypt_write(&share, file))
1061 goto err;
1062 }
1063
1064 if ((kfile_size_before_extension= mysql_file_tell(file,MYF(0))) == MY_FILEPOS_ERROR)
1065 goto err;
1066#ifndef DBUG_OFF
1067 if (kfile_size_before_extension != info_length)
1068 DBUG_PRINT("warning",("info_length: %u != used_length: %u",
1069 info_length, (uint)kfile_size_before_extension));
1070#endif
1071
1072 if (sync_dir)
1073 {
    /*
      we log the first bytes and then the size to which we extend; this is
      to avoid logging 1 KB of mostly zeroes if this is a small table.
    */
1078 char empty_string[]= "";
1079 LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 4];
1080 translog_size_t total_rec_length= 0;
1081 uint k;
1082 LSN lsn;
1083 log_array[TRANSLOG_INTERNAL_PARTS + 1].length= 1 + 2 + 2 +
1084 (uint) kfile_size_before_extension;
1085 /* we are needing maybe 64 kB, so don't use the stack */
1086 log_data= my_malloc(log_array[TRANSLOG_INTERNAL_PARTS + 1].length, MYF(0));
1087 if ((log_data == NULL) ||
1088 mysql_file_pread(file, 1 + 2 + 2 + log_data,
1089 (size_t) kfile_size_before_extension, 0, MYF(MY_NABP)))
1090 goto err;
1091 /*
1092 remember if the data file was created or not, to know if Recovery can
1093 do it or not, in the future
1094 */
1095 log_data[0]= MY_TEST(flags & HA_DONT_TOUCH_DATA);
1096 int2store(log_data + 1, kfile_size_before_extension);
1097 int2store(log_data + 1 + 2, share.base.keystart);
1098 log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (uchar *)name;
1099 /* we store the end-zero, for Recovery to just pass it to my_create() */
1100 log_array[TRANSLOG_INTERNAL_PARTS + 0].length= strlen(name) + 1;
1101 log_array[TRANSLOG_INTERNAL_PARTS + 1].str= log_data;
1102 /* symlink description is also needed for re-creation by Recovery: */
1103 {
1104 const char *s= ci->data_file_name ? ci->data_file_name : empty_string;
1105 log_array[TRANSLOG_INTERNAL_PARTS + 2].str= (uchar*)s;
1106 log_array[TRANSLOG_INTERNAL_PARTS + 2].length= strlen(s) + 1;
1107 s= ci->index_file_name ? ci->index_file_name : empty_string;
1108 log_array[TRANSLOG_INTERNAL_PARTS + 3].str= (uchar*)s;
1109 log_array[TRANSLOG_INTERNAL_PARTS + 3].length= strlen(s) + 1;
1110 }
1111 for (k= TRANSLOG_INTERNAL_PARTS;
1112 k < (sizeof(log_array)/sizeof(log_array[0])); k++)
1113 total_rec_length+= (translog_size_t) log_array[k].length;
1114 /**
1115 For this record to be of any use for Recovery, we need the upper
1116 MySQL layer to be crash-safe, which it is not now (that would require
1117 work using the ddl_log of sql/sql_table.cc); when it is, we should
1118 reconsider the moment of writing this log record (before or after op,
1119 under THR_LOCK_maria or not...), how to use it in Recovery.
1120 For now this record can serve when we apply logs to a backup,
1121 so we sync it. This happens before the data file is created. If the
1122 data file was created before, and we crashed before writing the log
1123 record, at restart the table may be used, so we would not have a
1124 trustable history in the log (impossible to apply this log to a
1125 backup). The way we do it, if we crash before writing the log record
1126 then there is no data file and the table cannot be used.
1127 @todo Note that in case of TRUNCATE TABLE we also come here; for
1128 Recovery to be able to finish TRUNCATE TABLE, instead of leaving a
1129 half-truncated table, we should log the record at start of
1130 maria_create(); for that we shouldn't write to the index file but to a
1131 buffer (DYNAMIC_STRING), put the buffer into the record, then put the
1132 buffer into the index file (so, change _ma_keydef_write() etc). That
1133 would also enable Recovery to finish a CREATE TABLE. The final result
1134 would be that we would be able to finish what the SQL layer has asked
1135 for: it would be atomic.
1136 When in CREATE/TRUNCATE (or DROP or RENAME or REPAIR) we have not
1137 called external_lock(), so have no TRN. It does not matter, as all
1138 these operations are non-transactional and sync their files.
1139 */
1140 if (unlikely(translog_write_record(&lsn,
1141 LOGREC_REDO_CREATE_TABLE,
1142 &dummy_transaction_object, NULL,
1143 total_rec_length,
1144 sizeof(log_array)/sizeof(log_array[0]),
1145 log_array, NULL, NULL) ||
1146 translog_flush(lsn)))
1147 goto err;
1148 share.kfile.file= file;
1149 DBUG_EXECUTE_IF("maria_flush_whole_log",
1150 {
1151 DBUG_PRINT("maria_flush_whole_log", ("now"));
1152 translog_flush(translog_get_horizon());
1153 });
1154 DBUG_EXECUTE_IF("maria_crash_create_table",
1155 {
1156 DBUG_PRINT("maria_crash_create_table", ("now"));
1157 DBUG_SUICIDE();
1158 });
1159 /*
1160 store LSN into file, needed for Recovery to not be confused if a
1161 DROP+CREATE happened (applying REDOs to the wrong table).
1162 */
1163 if (_ma_update_state_lsns_sub(&share, lsn, trnman_get_min_safe_trid(),
1164 FALSE, TRUE))
1165 goto err;
1166 my_free(log_data);
1167 }
1168
1169 if (!(flags & HA_DONT_TOUCH_DATA))
1170 {
1171 if (ci->data_file_name)
1172 {
1173 char *dext= strrchr(ci->data_file_name, '.');
1174 int have_dext= dext && !strcmp(dext, MARIA_NAME_DEXT);
1175
1176 if (tmp_table)
1177 {
1178 char *path;
      /* chop off the table name, temporary tables use a generated name */
1180 if ((path= strrchr(ci->data_file_name, FN_LIBCHAR)))
1181 *path= '\0';
1182 fn_format(dfilename, name, ci->data_file_name, MARIA_NAME_DEXT,
1183 MY_REPLACE_DIR | MY_UNPACK_FILENAME | MY_APPEND_EXT);
1184 }
1185 else
1186 {
1187 fn_format(dfilename, ci->data_file_name, "", MARIA_NAME_DEXT,
1188 MY_UNPACK_FILENAME |
1189 (have_dext ? MY_REPLACE_EXT : MY_APPEND_EXT));
1190 }
1191 fn_format(dlinkname, name, "",MARIA_NAME_DEXT,
1192 MY_UNPACK_FILENAME | MY_APPEND_EXT);
1193 dlinkname_ptr= dlinkname;
1194 create_flag=0;
1195 }
1196 else
1197 {
1198 fn_format(dfilename,name,"", MARIA_NAME_DEXT,
1199 MY_UNPACK_FILENAME | MY_APPEND_EXT);
1200 create_flag= (flags & HA_CREATE_KEEP_FILES) ? 0 : MY_DELETE_OLD;
1201 }
1202 if ((dfile=
1203 mysql_file_create_with_symlink(key_file_dfile, dlinkname_ptr,
1204 dfilename, 0, create_mode,
1205 MYF(MY_WME | create_flag | sync_dir))) < 0)
1206 goto err;
1207 errpos=3;
1208
1209 if (_ma_initialize_data_file(&share, dfile))
1210 goto err;
1211 }
1212
1213 /* Enlarge files */
1214 DBUG_PRINT("info", ("enlarge to keystart: %lu",
1215 (ulong) share.base.keystart));
1216 if (mysql_file_chsize(file,(ulong) share.base.keystart,0,MYF(0)))
1217 goto err;
1218
1219 if (!internal_table && sync_dir && mysql_file_sync(file, MYF(0)))
1220 goto err;
1221
1222 if (! (flags & HA_DONT_TOUCH_DATA))
1223 {
1224#ifdef USE_RELOC
1225 if (mysql_file_chsize(key_file_dfile, dfile,
1226 share.base.min_pack_length*ci->reloc_rows,0,MYF(0)))
1227 goto err;
1228#endif
1229 if (!internal_table && sync_dir && mysql_file_sync(dfile, MYF(0)))
1230 goto err;
1231 if (mysql_file_close(dfile,MYF(0)))
1232 goto err;
1233 }
1234 if (!internal_table)
1235 mysql_mutex_unlock(&THR_LOCK_maria);
1236 res= 0;
1237 my_free((char*) rec_per_key_part);
1238 ma_crypt_free(&share);
1239 errpos=0;
1240 if (mysql_file_close(file,MYF(0)))
1241 res= my_errno;
1242 DBUG_RETURN(res);
1243
1244err:
1245 if (!internal_table)
1246 mysql_mutex_unlock(&THR_LOCK_maria);
1247
1248err_no_lock:
1249 save_errno=my_errno;
1250 switch (errpos) {
1251 case 3:
1252 mysql_file_close(dfile, MYF(0));
1253 if (! (flags & HA_DONT_TOUCH_DATA))
1254 {
1255 mysql_file_delete(key_file_dfile, dfilename, MYF(sync_dir));
1256 if (dlinkname_ptr)
1257 mysql_file_delete(key_file_dfile, dlinkname_ptr, MYF(sync_dir));
1258 }
1259 /* fall through */
1260 case 1:
1261 mysql_file_close(file, MYF(0));
1262 if (! (flags & HA_DONT_TOUCH_DATA))
1263 {
1264 mysql_file_delete(key_file_kfile, kfilename, MYF(sync_dir));
1265 if (klinkname_ptr)
1266 mysql_file_delete(key_file_kfile, klinkname_ptr, MYF(sync_dir));
1267 }
1268 }
1269 ma_crypt_free(&share);
1270 my_free(log_data);
1271 my_free(rec_per_key_part);
1272 DBUG_RETURN(my_errno=save_errno); /* return the fatal errno */
1273}
1274
1275
1276uint maria_get_pointer_length(ulonglong file_length, uint def)
1277{
1278 DBUG_ASSERT(def >= 2 && def <= 7);
1279 if (file_length) /* If not default */
1280 {
1281#ifdef NOT_YET_READY_FOR_8_BYTE_POINTERS
1282 if (file_length >= (1ULL << 56))
1283 def=8;
1284 else
1285#endif
1286 if (file_length >= (1ULL << 48))
1287 def=7;
1288 else if (file_length >= (1ULL << 40))
1289 def=6;
1290 else if (file_length >= (1ULL << 32))
1291 def=5;
1292 else if (file_length >= (1ULL << 24))
1293 def=4;
1294 else if (file_length >= (1ULL << 16))
1295 def=3;
1296 else
1297 def=2;
1298 }
1299 return def;
1300}
1301
1302
1303/*
1304 Sort columns for records-in-block
1305
1306 IMPLEMENTATION
1307 Sort columns in following order:
1308
1309 Fixed size, not null columns
1310 Fixed length, null fields
1311 Numbers (zero fill fields)
1312 Variable length fields (CHAR, VARCHAR) according to length
1313 Blobs
1314
1315 For same kind of fields, keep fields in original order
1316*/
1317
/* Three-way sign of a: -1 if negative, 1 if positive, 0 if zero */
static inline int sign(long a)
{
  return (a > 0) - (a < 0);
}
1322
1323
1324static int compare_columns(MARIA_COLUMNDEF **a_ptr, MARIA_COLUMNDEF **b_ptr)
1325{
1326 MARIA_COLUMNDEF *a= *a_ptr, *b= *b_ptr;
1327 enum en_fieldtype a_type, b_type;
1328
1329 a_type= (a->type == FIELD_CHECK) ? FIELD_NORMAL : a->type;
1330 b_type= (b->type == FIELD_CHECK) ? FIELD_NORMAL : b->type;
1331
1332 if (a_type == FIELD_NORMAL && !a->null_bit)
1333 {
1334 if (b_type != FIELD_NORMAL || b->null_bit)
1335 return -1;
1336 return sign((long) a->offset - (long) b->offset);
1337 }
1338 if (b_type == FIELD_NORMAL && !b->null_bit)
1339 return 1;
1340 if (a_type == b_type)
1341 return sign((long) a->offset - (long) b->offset);
1342 if (a_type == FIELD_NORMAL)
1343 return -1;
1344 if (b_type == FIELD_NORMAL)
1345 return 1;
1346 if (a_type == FIELD_SKIP_ZERO)
1347 return -1;
1348 if (b_type == FIELD_SKIP_ZERO)
1349 return 1;
1350 if (a->type != FIELD_BLOB && b->type != FIELD_BLOB)
1351 if (a->length != b->length)
1352 return sign((long) a->length - (long) b->length);
1353 if (a_type == FIELD_BLOB)
1354 return 1;
1355 if (b_type == FIELD_BLOB)
1356 return -1;
1357 return sign((long) a->offset - (long) b->offset);
1358}
1359
1360
1361/**
1362 @brief Initialize data file
1363
1364 @note
1365 In BLOCK_RECORD, a freshly created datafile is one page long; while in
1366 other formats it is 0-byte long.
1367 */
1368
1369int _ma_initialize_data_file(MARIA_SHARE *share, File dfile)
1370{
1371 if (share->data_file_type == BLOCK_RECORD)
1372 {
1373 share->bitmap.block_size= share->base.block_size;
1374 share->bitmap.file.file = dfile;
1375 return _ma_bitmap_create_first(share);
1376 }
1377 return 0;
1378}
1379
1380
1381/**
1382 @brief Writes create_rename_lsn, skip_redo_lsn and is_of_horizon to disk,
1383 can force.
1384
1385 This is for special cases where:
1386 - we don't want to write the full state to disk (so, not call
1387 _ma_state_info_write()) because some parts of the state may be
1388 currently inconsistent, or because it would be overkill
1389 - we must sync these LSNs immediately for correctness.
1390 It acquires intern_lock to protect the LSNs and state write.
1391
1392 @param share table's share
  @param  lsn              LSN to write to the table's state on disk
1394 @param create_trid Trid to be used as state.create_trid
1395 @param do_sync if the write should be forced to disk
1396 @param update_create_rename_lsn if this LSN should be updated or not
1397
1398 @return Operation status
1399 @retval 0 ok
1400 @retval 1 error (disk problem)
1401*/
1402
1403int _ma_update_state_lsns(MARIA_SHARE *share, LSN lsn, TrID create_trid,
1404 my_bool do_sync, my_bool update_create_rename_lsn)
1405{
1406 int res;
1407 DBUG_ENTER("_ma_update_state_lsns");
1408 mysql_mutex_lock(&share->intern_lock);
1409 res= _ma_update_state_lsns_sub(share, lsn, create_trid, do_sync,
1410 update_create_rename_lsn);
1411 mysql_mutex_unlock(&share->intern_lock);
1412 DBUG_RETURN(res);
1413}
1414
1415
1416/**
1417 @brief Writes create_rename_lsn, skip_redo_lsn and is_of_horizon to disk,
1418 can force.
1419
1420 Shortcut of _ma_update_state_lsns() when we know that intern_lock is not
1421 needed (when creating a table or opening it for the first time).
1422
1423 @param share table's share
1424 @param lsn LSN to write to state; if LSN_IMPOSSIBLE, write
1425 a LOGREC_IMPORTED_TABLE and use its LSN as lsn.
1426 @param create_trid Trid to be used as state.create_trid
1427 @param do_sync if the write should be forced to disk
1428 @param update_create_rename_lsn if this LSN should be updated or not
1429
1430 @return Operation status
1431 @retval 0 ok
1432 @retval 1 error (disk problem)
1433*/
1434
1435#if defined(_MSC_VER) && (_MSC_VER == 1310)
1436/*
1437 Visual Studio 2003 compiler produces internal compiler error
1438 in this function. Disable optimizations to workaround.
1439*/
1440#pragma optimize("",off)
1441#endif
1442int _ma_update_state_lsns_sub(MARIA_SHARE *share, LSN lsn, TrID create_trid,
1443 my_bool do_sync,
1444 my_bool update_create_rename_lsn)
1445{
1446 uchar buf[LSN_STORE_SIZE * 3], *ptr;
1447 uchar trid_buff[8];
1448 File file= share->kfile.file;
1449 DBUG_ASSERT(file >= 0);
1450
1451 if (lsn == LSN_IMPOSSIBLE)
1452 {
1453 int res;
1454 LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
1455 /* table name is logged only for information */
1456 log_array[TRANSLOG_INTERNAL_PARTS + 0].str=
1457 (uchar *)(share->open_file_name.str);
1458 log_array[TRANSLOG_INTERNAL_PARTS + 0].length=
1459 share->open_file_name.length + 1;
1460 if ((res= translog_write_record(&lsn, LOGREC_IMPORTED_TABLE,
1461 &dummy_transaction_object, NULL,
1462 (translog_size_t)
1463 log_array[TRANSLOG_INTERNAL_PARTS +
1464 0].length,
1465 sizeof(log_array)/sizeof(log_array[0]),
1466 log_array, NULL, NULL)))
1467 return res;
1468 }
1469
1470 for (ptr= buf; ptr < (buf + sizeof(buf)); ptr+= LSN_STORE_SIZE)
1471 lsn_store(ptr, lsn);
1472 share->state.skip_redo_lsn= share->state.is_of_horizon= lsn;
1473 share->state.create_trid= create_trid;
1474 mi_int8store(trid_buff, create_trid);
1475
1476 /*
1477 Update create_rename_lsn if update was requested or if the old one had an
1478 impossible value.
1479 */
1480 if (update_create_rename_lsn ||
1481 (share->state.create_rename_lsn > lsn && lsn != LSN_IMPOSSIBLE))
1482 {
1483 share->state.create_rename_lsn= lsn;
1484 if (share->id != 0)
1485 {
1486 /*
1487 If OP is the operation which is calling us, if table is later written,
1488 we could see in the log:
1489 FILE_ID ... REDO_OP ... REDO_INSERT.
1490 (that can happen in real life at least with OP=REPAIR).
1491 As FILE_ID will be ignored by Recovery because it is <
1492 create_rename_lsn, REDO_INSERT would be ignored too, wrongly.
1493 To avoid that, we force a LOGREC_FILE_ID to be logged at next write:
1494 */
1495 translog_deassign_id_from_share(share);
1496 }
1497 }
1498 else
1499 lsn_store(buf, share->state.create_rename_lsn);
1500 return (my_pwrite(file, buf, sizeof(buf),
1501 sizeof(share->state.header) +
1502 MARIA_FILE_CREATE_RENAME_LSN_OFFSET, MYF(MY_NABP)) ||
1503 my_pwrite(file, trid_buff, sizeof(trid_buff),
1504 sizeof(share->state.header) +
1505 MARIA_FILE_CREATE_TRID_OFFSET, MYF(MY_NABP)) ||
1506 (do_sync && mysql_file_sync(file, MYF(0))));
1507}
1508#if defined(_MSC_VER) && (_MSC_VER == 1310)
1509#pragma optimize("",on)
1510#endif /*VS2003 compiler bug workaround*/
1511