1/* Copyright (c) 2000, 2013, Oracle and/or its affiliates
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
15
16/* Write a row to a MyISAM table */
17
18#include "fulltext.h"
19#include "rt_index.h"
20
21#define MAX_POINTER_LENGTH 8
22
23 /* Functions declared in this file */
24
25static int w_search(MI_INFO *info,MI_KEYDEF *keyinfo,
26 uint comp_flag, uchar *key,
27 uint key_length, my_off_t pos, uchar *father_buff,
28 uchar *father_keypos, my_off_t father_page,
29 my_bool insert_last);
30static int _mi_balance_page(MI_INFO *info,MI_KEYDEF *keyinfo,uchar *key,
31 uchar *curr_buff,uchar *father_buff,
32 uchar *father_keypos,my_off_t father_page);
33static uchar *_mi_find_last_pos(MI_KEYDEF *keyinfo, uchar *page,
34 uchar *key, uint *return_key_length,
35 uchar **after_key);
36int _mi_ck_write_tree(register MI_INFO *info, uint keynr,uchar *key,
37 uint key_length);
38int _mi_ck_write_btree(register MI_INFO *info, uint keynr,uchar *key,
39 uint key_length);
40
41 /* Write new record to database */
42
43int mi_write(MI_INFO *info, uchar *record)
44{
45 MYISAM_SHARE *share=info->s;
46 uint i;
47 int save_errno;
48 my_off_t filepos;
49 uchar *buff;
50 my_bool lock_tree= share->concurrent_insert;
51 DBUG_ENTER("mi_write");
52 DBUG_PRINT("enter",("isam: %d data: %d",info->s->kfile,info->dfile));
53
54 DBUG_EXECUTE_IF("myisam_pretend_crashed_table_on_usage",
55 mi_print_error(info->s, HA_ERR_CRASHED);
56 DBUG_RETURN(my_errno= HA_ERR_CRASHED););
57
58 /* it's always a bug to try to write a record with the deleted flag set */
59 DBUG_ASSERT(info->s->data_file_type != STATIC_RECORD || *record);
60
61 if (share->options & HA_OPTION_READ_ONLY_DATA)
62 {
63 DBUG_RETURN(my_errno=EACCES);
64 }
65 if (_mi_readinfo(info,F_WRLCK,1))
66 DBUG_RETURN(my_errno);
67
68 filepos= ((share->state.dellink != HA_OFFSET_ERROR &&
69 !info->append_insert_at_end) ?
70 share->state.dellink :
71 info->state->data_file_length);
72
73 if (share->base.reloc == (ha_rows) 1 &&
74 share->base.records == (ha_rows) 1 &&
75 info->state->records == (ha_rows) 1)
76 { /* System file */
77 my_errno=HA_ERR_RECORD_FILE_FULL;
78 goto err2;
79 }
80 if (info->state->key_file_length >= share->base.margin_key_file_length)
81 {
82 my_errno=HA_ERR_INDEX_FILE_FULL;
83 goto err2;
84 }
85 if (_mi_mark_file_changed(info))
86 goto err2;
87
88 /* Calculate and check all unique constraints */
89 for (i=0 ; i < share->state.header.uniques ; i++)
90 {
91 MI_UNIQUEDEF *def= share->uniqueinfo + i;
92 if (mi_is_key_active(share->state.key_map, def->key) &&
93 mi_check_unique(info, def, record, mi_unique_hash(def, record),
94 HA_OFFSET_ERROR))
95 goto err2;
96 }
97
98 /* Write all keys to indextree */
99
100 buff=info->lastkey2;
101 for (i=0 ; i < share->base.keys ; i++)
102 {
103 if (mi_is_key_active(share->state.key_map, i))
104 {
105 my_bool local_lock_tree= (lock_tree &&
106 !(info->bulk_insert &&
107 is_tree_inited(&info->bulk_insert[i])));
108 if (local_lock_tree)
109 {
110 mysql_rwlock_wrlock(&share->key_root_lock[i]);
111 share->keyinfo[i].version++;
112 }
113 if (share->keyinfo[i].flag & HA_FULLTEXT )
114 {
115 if (_mi_ft_add(info,i, buff, record, filepos))
116 {
117 if (local_lock_tree)
118 mysql_rwlock_unlock(&share->key_root_lock[i]);
119 DBUG_PRINT("error",("Got error: %d on write",my_errno));
120 goto err;
121 }
122 }
123 else
124 {
125 if (share->keyinfo[i].ck_insert(info,i,buff,
126 _mi_make_key(info,i,buff,record,filepos)))
127 {
128 if (local_lock_tree)
129 mysql_rwlock_unlock(&share->key_root_lock[i]);
130 DBUG_PRINT("error",("Got error: %d on write",my_errno));
131 goto err;
132 }
133 }
134
135 /* The above changed info->lastkey2. Inform mi_rnext_same(). */
136 info->update&= ~HA_STATE_RNEXT_SAME;
137
138 if (local_lock_tree)
139 mysql_rwlock_unlock(&share->key_root_lock[i]);
140 }
141 }
142 if (share->calc_checksum)
143 info->checksum=(*share->calc_checksum)(info,record);
144 if (!(info->opt_flag & OPT_NO_ROWS))
145 {
146 if ((*share->write_record)(info,record))
147 goto err;
148 info->state->checksum+=info->checksum;
149 }
150 if (share->base.auto_key)
151 set_if_bigger(info->s->state.auto_increment,
152 retrieve_auto_increment(info, record));
153 info->update= (HA_STATE_CHANGED | HA_STATE_AKTIV | HA_STATE_WRITTEN |
154 HA_STATE_ROW_CHANGED);
155 info->state->records++;
156 info->lastpos=filepos;
157 myisam_log_record(MI_LOG_WRITE,info,record,filepos,0);
158 (void) _mi_writeinfo(info, WRITEINFO_UPDATE_KEYFILE);
159 if (info->invalidator != 0)
160 {
161 DBUG_PRINT("info", ("invalidator... '%s' (update)", info->filename));
162 (*info->invalidator)(info->filename);
163 info->invalidator=0;
164 }
165
166 /*
167 Update status of the table. We need to do so after each row write
168 for the log tables, as we want the new row to become visible to
169 other threads as soon as possible. We don't lock mutex here
170 (as it is required by pthread memory visibility rules) as (1) it's
171 not critical to use outdated share->is_log_table value (2) locking
172 mutex here for every write is too expensive.
173 */
174 if (share->is_log_table)
175 mi_update_status((void*) info);
176
177 DBUG_RETURN(0);
178
179err:
180 save_errno=my_errno;
181 if (my_errno == HA_ERR_FOUND_DUPP_KEY || my_errno == HA_ERR_RECORD_FILE_FULL ||
182 my_errno == HA_ERR_NULL_IN_SPATIAL || my_errno == HA_ERR_OUT_OF_MEM)
183 {
184 if (info->bulk_insert)
185 {
186 uint j;
187 for (j=0 ; j < share->base.keys ; j++)
188 mi_flush_bulk_insert(info, j);
189 }
190 info->errkey= (int) i;
191 while ( i-- > 0)
192 {
193 if (mi_is_key_active(share->state.key_map, i))
194 {
195 my_bool local_lock_tree= (lock_tree &&
196 !(info->bulk_insert &&
197 is_tree_inited(&info->bulk_insert[i])));
198 if (local_lock_tree)
199 mysql_rwlock_wrlock(&share->key_root_lock[i]);
200 if (share->keyinfo[i].flag & HA_FULLTEXT)
201 {
202 if (_mi_ft_del(info,i, buff,record,filepos))
203 {
204 if (local_lock_tree)
205 mysql_rwlock_unlock(&share->key_root_lock[i]);
206 break;
207 }
208 }
209 else
210 {
211 uint key_length=_mi_make_key(info,i,buff,record,filepos);
212 if (share->keyinfo[i].ck_delete(info, i, buff, key_length))
213 {
214 if (local_lock_tree)
215 mysql_rwlock_unlock(&share->key_root_lock[i]);
216 break;
217 }
218 }
219 if (local_lock_tree)
220 mysql_rwlock_unlock(&share->key_root_lock[i]);
221 }
222 }
223 }
224 else
225 {
226 mi_print_error(info->s, HA_ERR_CRASHED);
227 mi_mark_crashed(info);
228 }
229 info->update= (HA_STATE_CHANGED | HA_STATE_WRITTEN | HA_STATE_ROW_CHANGED);
230 my_errno=save_errno;
231err2:
232 save_errno=my_errno;
233 myisam_log_record(MI_LOG_WRITE,info,record,filepos,my_errno);
234 (void) _mi_writeinfo(info,WRITEINFO_UPDATE_KEYFILE);
235 DBUG_RETURN(my_errno=save_errno);
236} /* mi_write */
237
238
239 /* Write one key to btree */
240
241int _mi_ck_write(MI_INFO *info, uint keynr, uchar *key, uint key_length)
242{
243 DBUG_ENTER("_mi_ck_write");
244
245 if (info->bulk_insert && is_tree_inited(&info->bulk_insert[keynr]))
246 {
247 DBUG_RETURN(_mi_ck_write_tree(info, keynr, key, key_length));
248 }
249 else
250 {
251 DBUG_RETURN(_mi_ck_write_btree(info, keynr, key, key_length));
252 }
253} /* _mi_ck_write */
254
255
256/**********************************************************************
257 * Normal insert code *
258 **********************************************************************/
259
260int _mi_ck_write_btree(register MI_INFO *info, uint keynr, uchar *key,
261 uint key_length)
262{
263 int error;
264 uint comp_flag;
265 MI_KEYDEF *keyinfo=info->s->keyinfo+keynr;
266 my_off_t *root=&info->s->state.key_root[keynr];
267 DBUG_ENTER("_mi_ck_write_btree");
268
269 if (keyinfo->flag & HA_SORT_ALLOWS_SAME)
270 comp_flag=SEARCH_BIGGER; /* Put after same key */
271 else if (keyinfo->flag & (HA_NOSAME|HA_FULLTEXT))
272 {
273 comp_flag=SEARCH_FIND | SEARCH_UPDATE | SEARCH_INSERT; /* No duplicates */
274 if (keyinfo->flag & HA_NULL_ARE_EQUAL)
275 comp_flag|= SEARCH_NULL_ARE_EQUAL;
276 }
277 else
278 comp_flag=SEARCH_SAME; /* Keys in rec-pos order */
279
280 error=_mi_ck_real_write_btree(info, keyinfo, key, key_length,
281 root, comp_flag);
282 if (info->ft1_to_ft2)
283 {
284 if (!error)
285 error= _mi_ft_convert_to_ft2(info, keynr, key);
286 delete_dynamic(info->ft1_to_ft2);
287 my_free(info->ft1_to_ft2);
288 info->ft1_to_ft2=0;
289 }
290 DBUG_RETURN(error);
291} /* _mi_ck_write_btree */
292
293int _mi_ck_real_write_btree(MI_INFO *info, MI_KEYDEF *keyinfo,
294 uchar *key, uint key_length, my_off_t *root, uint comp_flag)
295{
296 int error;
297 DBUG_ENTER("_mi_ck_real_write_btree");
298 /* key_length parameter is used only if comp_flag is SEARCH_FIND */
299 if (*root == HA_OFFSET_ERROR ||
300 (error=w_search(info, keyinfo, comp_flag, key, key_length,
301 *root, (uchar *) 0, (uchar*) 0,
302 (my_off_t) 0, 1)) > 0)
303 error=_mi_enlarge_root(info,keyinfo,key,root);
304 DBUG_RETURN(error);
305} /* _mi_ck_real_write_btree */
306
307
308 /* Make a new root with key as only pointer */
309
310int _mi_enlarge_root(MI_INFO *info, MI_KEYDEF *keyinfo, uchar *key,
311 my_off_t *root)
312{
313 uint t_length,nod_flag;
314 MI_KEY_PARAM s_temp;
315 MYISAM_SHARE *share=info->s;
316 DBUG_ENTER("_mi_enlarge_root");
317
318 nod_flag= (*root != HA_OFFSET_ERROR) ? share->base.key_reflength : 0;
319 _mi_kpointer(info,info->buff+2,*root); /* if nod */
320 t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,(uchar*) 0,
321 (uchar*) 0, (uchar*) 0, key,&s_temp);
322 mi_putint(info->buff,t_length+2+nod_flag,nod_flag);
323 (*keyinfo->store_key)(keyinfo,info->buff+2+nod_flag,&s_temp);
324 info->buff_used=info->page_changed=1; /* info->buff is used */
325 if ((*root= _mi_new(info,keyinfo,DFLT_INIT_HITS)) == HA_OFFSET_ERROR ||
326 _mi_write_keypage(info,keyinfo,*root,DFLT_INIT_HITS,info->buff))
327 DBUG_RETURN(-1);
328 DBUG_RETURN(0);
329} /* _mi_enlarge_root */
330
331
332 /*
333 Search after a position for a key and store it there
334 Returns -1 = error
335 0 = ok
336 1 = key should be stored in higher tree
337 */
338
339static int w_search(register MI_INFO *info, register MI_KEYDEF *keyinfo,
340 uint comp_flag, uchar *key, uint key_length, my_off_t page,
341 uchar *father_buff, uchar *father_keypos,
342 my_off_t father_page, my_bool insert_last)
343{
344 int error,flag;
345 uint nod_flag, search_key_length;
346 uchar *temp_buff,*keypos;
347 uchar keybuff[HA_MAX_KEY_BUFF];
348 my_bool was_last_key;
349 my_off_t next_page, dupp_key_pos;
350 DBUG_ENTER("w_search");
351 DBUG_PRINT("enter",("page: %ld", (long) page));
352
353 search_key_length= (comp_flag & SEARCH_FIND) ? key_length : USE_WHOLE_KEY;
354 if (!(temp_buff= (uchar*) my_alloca((uint) keyinfo->block_length+
355 HA_MAX_KEY_BUFF*2)))
356 DBUG_RETURN(-1);
357 if (!_mi_fetch_keypage(info,keyinfo,page,DFLT_INIT_HITS,temp_buff,0))
358 goto err;
359
360 flag=(*keyinfo->bin_search)(info,keyinfo,temp_buff,key,search_key_length,
361 comp_flag, &keypos, keybuff, &was_last_key);
362 nod_flag=mi_test_if_nod(temp_buff);
363 if (flag == 0)
364 {
365 uint tmp_key_length;
366 /* get position to record with duplicated key */
367 tmp_key_length=(*keyinfo->get_key)(keyinfo,nod_flag,&keypos,keybuff);
368 if (tmp_key_length)
369 dupp_key_pos=_mi_dpos(info,0,keybuff+tmp_key_length);
370 else
371 dupp_key_pos= HA_OFFSET_ERROR;
372
373 if (keyinfo->flag & HA_FULLTEXT)
374 {
375 uint off;
376 int subkeys;
377
378 get_key_full_length_rdonly(off, keybuff);
379 subkeys=ft_sintXkorr(keybuff+off);
380 comp_flag=SEARCH_SAME;
381 if (subkeys >= 0)
382 {
383 /* normal word, one-level tree structure */
384 flag=(*keyinfo->bin_search)(info, keyinfo, temp_buff, key,
385 USE_WHOLE_KEY, comp_flag,
386 &keypos, keybuff, &was_last_key);
387 }
388 else
389 {
390 /* popular word. two-level tree. going down */
391 my_off_t root=dupp_key_pos;
392 keyinfo=&info->s->ft2_keyinfo;
393 get_key_full_length_rdonly(off, key);
394 key+=off;
395 keypos-=keyinfo->keylength+nod_flag; /* we'll modify key entry 'in vivo' */
396 error=_mi_ck_real_write_btree(info, keyinfo, key, 0,
397 &root, comp_flag);
398 _mi_dpointer(info, keypos+HA_FT_WLEN, root);
399 subkeys--; /* should there be underflow protection ? */
400 DBUG_ASSERT(subkeys < 0);
401 ft_intXstore(keypos, subkeys);
402 if (!error)
403 error=_mi_write_keypage(info,keyinfo,page,DFLT_INIT_HITS,temp_buff);
404 my_afree((uchar*) temp_buff);
405 DBUG_RETURN(error);
406 }
407 }
408 else /* not HA_FULLTEXT, normal HA_NOSAME key */
409 {
410 info->dupp_key_pos= dupp_key_pos;
411 my_afree((uchar*) temp_buff);
412 my_errno=HA_ERR_FOUND_DUPP_KEY;
413 DBUG_RETURN(-1);
414 }
415 }
416 if (flag == MI_FOUND_WRONG_KEY)
417 DBUG_RETURN(-1);
418 if (!was_last_key)
419 insert_last=0;
420 next_page=_mi_kpos(nod_flag,keypos);
421 if (next_page == HA_OFFSET_ERROR ||
422 (error=w_search(info, keyinfo, comp_flag, key, key_length, next_page,
423 temp_buff, keypos, page, insert_last)) >0)
424 {
425 error=_mi_insert(info,keyinfo,key,temp_buff,keypos,keybuff,father_buff,
426 father_keypos,father_page, insert_last);
427 if (_mi_write_keypage(info,keyinfo,page,DFLT_INIT_HITS,temp_buff))
428 goto err;
429 }
430 my_afree((uchar*) temp_buff);
431 DBUG_RETURN(error);
432err:
433 my_afree((uchar*) temp_buff);
434 DBUG_PRINT("exit",("Error: %d",my_errno));
435 DBUG_RETURN (-1);
436} /* w_search */
437
438
439/*
440 Insert new key.
441
442 SYNOPSIS
443 _mi_insert()
444 info Open table information.
445 keyinfo Key definition information.
446 key New key.
447 anc_buff Key page (beginning).
448 key_pos Position in key page where to insert.
449 key_buff Copy of previous key.
450 father_buff parent key page for balancing.
451 father_key_pos position in parent key page for balancing.
452 father_page position of parent key page in file.
453 insert_last If to append at end of page.
454
455 DESCRIPTION
456 Insert new key at right of key_pos.
457
458 RETURN
459 2 if key contains key to upper level.
460 0 OK.
461 < 0 Error.
462*/
463
464int _mi_insert(register MI_INFO *info, register MI_KEYDEF *keyinfo,
465 uchar *key, uchar *anc_buff, uchar *key_pos, uchar *key_buff,
466 uchar *father_buff, uchar *father_key_pos, my_off_t father_page,
467 my_bool insert_last)
468{
469 uint a_length,nod_flag;
470 int t_length;
471 uchar *endpos, *prev_key;
472 MI_KEY_PARAM s_temp;
473 DBUG_ENTER("_mi_insert");
474 DBUG_PRINT("enter",("key_pos: %p", key_pos));
475 DBUG_EXECUTE("key",_mi_print_key(DBUG_FILE,keyinfo->seg,key,USE_WHOLE_KEY););
476
477 nod_flag=mi_test_if_nod(anc_buff);
478 a_length=mi_getint(anc_buff);
479 endpos= anc_buff+ a_length;
480 prev_key=(key_pos == anc_buff+2+nod_flag ? (uchar*) 0 : key_buff);
481 t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,
482 (key_pos == endpos ? (uchar*) 0 : key_pos),
483 prev_key, prev_key,
484 key,&s_temp);
485#ifndef DBUG_OFF
486 if (key_pos != anc_buff+2+nod_flag && (keyinfo->flag &
487 (HA_BINARY_PACK_KEY | HA_PACK_KEY)))
488 {
489 DBUG_DUMP("prev_key",(uchar*) key_buff,_mi_keylength(keyinfo,key_buff));
490 }
491 if (keyinfo->flag & HA_PACK_KEY)
492 {
493 DBUG_PRINT("test",("t_length: %d ref_len: %d",
494 t_length,s_temp.ref_length));
495 DBUG_PRINT("test",("n_ref_len: %d n_length: %d key_pos: %p",
496 s_temp.n_ref_length,s_temp.n_length, s_temp.key));
497 }
498#endif
499 if (t_length > 0)
500 {
501 if (t_length >= keyinfo->maxlength*2+MAX_POINTER_LENGTH)
502 {
503 mi_print_error(info->s, HA_ERR_CRASHED);
504 my_errno=HA_ERR_CRASHED;
505 DBUG_RETURN(-1);
506 }
507 bmove_upp((uchar*) endpos+t_length,(uchar*) endpos,(uint) (endpos-key_pos));
508 }
509 else
510 {
511 if (-t_length >= keyinfo->maxlength*2+MAX_POINTER_LENGTH)
512 {
513 mi_print_error(info->s, HA_ERR_CRASHED);
514 my_errno=HA_ERR_CRASHED;
515 DBUG_RETURN(-1);
516 }
517 bmove(key_pos,key_pos-t_length,(uint) (endpos-key_pos)+t_length);
518 }
519 (*keyinfo->store_key)(keyinfo,key_pos,&s_temp);
520 a_length+=t_length;
521 mi_putint(anc_buff,a_length,nod_flag);
522 if (a_length <= keyinfo->block_length)
523 {
524 if (keyinfo->block_length - a_length < 32 &&
525 keyinfo->flag & HA_FULLTEXT && key_pos == endpos &&
526 info->s->base.key_reflength <= info->s->rec_reflength &&
527 info->s->options & (HA_OPTION_PACK_RECORD | HA_OPTION_COMPRESS_RECORD))
528 {
529 /*
530 Normal word. One-level tree. Page is almost full.
531 Let's consider converting.
532 We'll compare 'key' and the first key at anc_buff
533 */
534 uchar *a=key, *b=anc_buff+2+nod_flag;
535 uint alen, blen, ft2len=info->s->ft2_keyinfo.keylength;
536 /* the very first key on the page is always unpacked */
537 DBUG_ASSERT((*b & 128) == 0);
538#if HA_FT_MAXLEN >= 127
539 blen= mi_uint2korr(b); b+=2;
540#else
541 blen= *b++;
542#endif
543 get_key_length(alen,a);
544 DBUG_ASSERT(info->ft1_to_ft2==0);
545 if (alen == blen &&
546 ha_compare_text(keyinfo->seg->charset, a, alen, b, blen, 0)==0)
547 {
548 /* yup. converting */
549 info->ft1_to_ft2=(DYNAMIC_ARRAY *)
550 my_malloc(sizeof(DYNAMIC_ARRAY), MYF(MY_WME));
551 my_init_dynamic_array(info->ft1_to_ft2, ft2len, 300, 50, MYF(0));
552
553 /*
554 now, adding all keys from the page to dynarray
555 if the page is a leaf (if not keys will be deleted later)
556 */
557 if (!nod_flag)
558 {
559 /* let's leave the first key on the page, though, because
560 we cannot easily dispatch an empty page here */
561 b+=blen+ft2len+2;
562 for (a=anc_buff+a_length ; b < a ; b+=ft2len+2)
563 {
564 if (insert_dynamic(info->ft1_to_ft2, b))
565 {
566 mi_print_error(info->s, HA_ERR_OUT_OF_MEM);
567 my_errno= HA_ERR_OUT_OF_MEM;
568 DBUG_RETURN(-1);
569 }
570 }
571
572 /* fixing the page's length - it contains only one key now */
573 mi_putint(anc_buff,2+blen+ft2len+2,0);
574 }
575 /* the rest will be done when we're back from recursion */
576 }
577 }
578 DBUG_RETURN(0); /* There is room on page */
579 }
580 /* Page is full */
581 if (nod_flag)
582 insert_last=0;
583 if (!(keyinfo->flag & (HA_VAR_LENGTH_KEY | HA_BINARY_PACK_KEY)) &&
584 father_buff && !insert_last)
585 DBUG_RETURN(_mi_balance_page(info,keyinfo,key,anc_buff,father_buff,
586 father_key_pos,father_page));
587 DBUG_RETURN(_mi_split_page(info,keyinfo,key,anc_buff,key_buff, insert_last));
588} /* _mi_insert */
589
590
591 /* split a full page in two and assign emerging item to key */
592
593int _mi_split_page(register MI_INFO *info, register MI_KEYDEF *keyinfo,
594 uchar *key, uchar *buff, uchar *key_buff,
595 my_bool insert_last_key)
596{
597 uint length,a_length,key_ref_length,t_length,nod_flag,key_length;
598 uchar *key_pos,*pos, *UNINIT_VAR(after_key);
599 my_off_t new_pos;
600 MI_KEY_PARAM s_temp;
601 DBUG_ENTER("mi_split_page");
602 DBUG_DUMP("buff",(uchar*) buff,mi_getint(buff));
603
604 if (info->s->keyinfo+info->lastinx == keyinfo)
605 info->page_changed=1; /* Info->buff is used */
606 info->buff_used=1;
607 nod_flag=mi_test_if_nod(buff);
608 key_ref_length=2+nod_flag;
609 if (insert_last_key)
610 key_pos=_mi_find_last_pos(keyinfo,buff,key_buff, &key_length, &after_key);
611 else
612 key_pos=_mi_find_half_pos(nod_flag,keyinfo,buff,key_buff, &key_length,
613 &after_key);
614 if (!key_pos)
615 DBUG_RETURN(-1);
616
617 length=(uint) (key_pos-buff);
618 a_length=mi_getint(buff);
619 mi_putint(buff,length,nod_flag);
620
621 key_pos=after_key;
622 if (nod_flag)
623 {
624 DBUG_PRINT("test",("Splitting nod"));
625 pos=key_pos-nod_flag;
626 memcpy((uchar*) info->buff+2,(uchar*) pos,(size_t) nod_flag);
627 }
628
629 /* Move middle item to key and pointer to new page */
630 if ((new_pos=_mi_new(info,keyinfo,DFLT_INIT_HITS)) == HA_OFFSET_ERROR)
631 DBUG_RETURN(-1);
632 _mi_kpointer(info,_mi_move_key(keyinfo,key,key_buff),new_pos);
633
634 /* Store new page */
635 if (!(*keyinfo->get_key)(keyinfo,nod_flag,&key_pos,key_buff))
636 DBUG_RETURN(-1);
637
638 t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,(uchar *) 0,
639 (uchar*) 0, (uchar*) 0,
640 key_buff, &s_temp);
641 length=(uint) ((buff+a_length)-key_pos);
642 memcpy((uchar*) info->buff+key_ref_length+t_length,(uchar*) key_pos,
643 (size_t) length);
644 (*keyinfo->store_key)(keyinfo,info->buff+key_ref_length,&s_temp);
645 mi_putint(info->buff,length+t_length+key_ref_length,nod_flag);
646
647 if (_mi_write_keypage(info,keyinfo,new_pos,DFLT_INIT_HITS,info->buff))
648 DBUG_RETURN(-1);
649 DBUG_DUMP("key",(uchar*) key,_mi_keylength(keyinfo,key));
650 DBUG_RETURN(2); /* Middle key up */
651} /* _mi_split_page */
652
653
654 /*
655 Calculate how to much to move to split a page in two
656 Returns pointer to start of key.
657 key will contain the key.
658 return_key_length will contain the length of key
659 after_key will contain the position to where the next key starts
660 */
661
662uchar *_mi_find_half_pos(uint nod_flag, MI_KEYDEF *keyinfo, uchar *page,
663 uchar *key, uint *return_key_length,
664 uchar **after_key)
665{
666 uint keys,length,key_ref_length;
667 uchar *end,*lastpos;
668 DBUG_ENTER("_mi_find_half_pos");
669
670 key_ref_length=2+nod_flag;
671 length=mi_getint(page)-key_ref_length;
672 page+=key_ref_length;
673 if (!(keyinfo->flag &
674 (HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
675 HA_BINARY_PACK_KEY)))
676 {
677 key_ref_length=keyinfo->keylength+nod_flag;
678 keys=length/(key_ref_length*2);
679 *return_key_length=keyinfo->keylength;
680 end=page+keys*key_ref_length;
681 *after_key=end+key_ref_length;
682 memcpy(key,end,key_ref_length);
683 DBUG_RETURN(end);
684 }
685
686 end=page+length/2-key_ref_length; /* This is aprox. half */
687 *key='\0';
688 do
689 {
690 lastpos=page;
691 if (!(length=(*keyinfo->get_key)(keyinfo,nod_flag,&page,key)))
692 DBUG_RETURN(0);
693 } while (page < end);
694 *return_key_length=length;
695 *after_key=page;
696 DBUG_PRINT("exit",("returns: %p page: %p half: %p",
697 lastpos, page, end));
698 DBUG_RETURN(lastpos);
699} /* _mi_find_half_pos */
700
701
702/*
703 Split buffer at last key
704 Returns pointer to the start of the key before the last key
705 key will contain the last key
706*/
707
708static uchar *_mi_find_last_pos(MI_KEYDEF *keyinfo, uchar *page,
709 uchar *key, uint *return_key_length,
710 uchar **after_key)
711{
712 uint keys, length, UNINIT_VAR(last_length), key_ref_length;
713 uchar *end,*lastpos,*prevpos;
714 uchar key_buff[HA_MAX_KEY_BUFF];
715 DBUG_ENTER("_mi_find_last_pos");
716
717 key_ref_length=2;
718 length=mi_getint(page)-key_ref_length;
719 page+=key_ref_length;
720 if (!(keyinfo->flag &
721 (HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
722 HA_BINARY_PACK_KEY)))
723 {
724 keys=length/keyinfo->keylength-2;
725 *return_key_length=length=keyinfo->keylength;
726 end=page+keys*length;
727 *after_key=end+length;
728 memcpy(key,end,length);
729 DBUG_RETURN(end);
730 }
731
732 end=page+length-key_ref_length;
733 DBUG_ASSERT(page < end);
734 *key='\0';
735 length=0;
736 lastpos=page;
737
738 do
739 {
740 prevpos=lastpos; lastpos=page;
741 last_length=length;
742 memcpy(key, key_buff, length); /* previous key */
743 if (!(length=(*keyinfo->get_key)(keyinfo,0,&page,key_buff)))
744 {
745 mi_print_error(keyinfo->share, HA_ERR_CRASHED);
746 my_errno=HA_ERR_CRASHED;
747 DBUG_RETURN(0);
748 }
749 } while (page < end);
750
751 *return_key_length=last_length;
752 *after_key=lastpos;
753 DBUG_PRINT("exit",("returns: %p page: %p end: %p",
754 prevpos, page, end));
755 DBUG_RETURN(prevpos);
756} /* _mi_find_last_pos */
757
758
759 /* Balance page with not packed keys with page on right/left */
760 /* returns 0 if balance was done */
761
762static int _mi_balance_page(register MI_INFO *info, MI_KEYDEF *keyinfo,
763 uchar *key, uchar *curr_buff, uchar *father_buff,
764 uchar *father_key_pos, my_off_t father_page)
765{
766 my_bool right;
767 uint k_length,father_length,father_keylength,nod_flag,curr_keylength,
768 right_length,left_length,new_right_length,new_left_length,extra_length,
769 length,keys;
770 uchar *pos,*buff,*extra_buff;
771 my_off_t next_page,new_pos;
772 uchar tmp_part_key[HA_MAX_KEY_BUFF];
773 DBUG_ENTER("_mi_balance_page");
774
775 k_length=keyinfo->keylength;
776 father_length=mi_getint(father_buff);
777 father_keylength=k_length+info->s->base.key_reflength;
778 nod_flag=mi_test_if_nod(curr_buff);
779 curr_keylength=k_length+nod_flag;
780 info->page_changed=1;
781
782 if ((father_key_pos != father_buff+father_length &&
783 (info->state->records & 1)) ||
784 father_key_pos == father_buff+2+info->s->base.key_reflength)
785 {
786 right=1;
787 next_page= _mi_kpos(info->s->base.key_reflength,
788 father_key_pos+father_keylength);
789 buff=info->buff;
790 DBUG_PRINT("test",("use right page: %lu", (ulong) next_page));
791 }
792 else
793 {
794 right=0;
795 father_key_pos-=father_keylength;
796 next_page= _mi_kpos(info->s->base.key_reflength,father_key_pos);
797 /* Fix that curr_buff is to left */
798 buff=curr_buff; curr_buff=info->buff;
799 DBUG_PRINT("test",("use left page: %lu", (ulong) next_page));
800 } /* father_key_pos ptr to parting key */
801
802 if (!_mi_fetch_keypage(info,keyinfo,next_page,DFLT_INIT_HITS,info->buff,0))
803 goto err;
804 DBUG_DUMP("next",(uchar*) info->buff,mi_getint(info->buff));
805
806 /* Test if there is room to share keys */
807
808 left_length=mi_getint(curr_buff);
809 right_length=mi_getint(buff);
810 keys=(left_length+right_length-4-nod_flag*2)/curr_keylength;
811
812 if ((right ? right_length : left_length) + curr_keylength <=
813 keyinfo->block_length)
814 { /* Merge buffs */
815 new_left_length=2+nod_flag+(keys/2)*curr_keylength;
816 new_right_length=2+nod_flag+((keys+1)/2)*curr_keylength;
817 mi_putint(curr_buff,new_left_length,nod_flag);
818 mi_putint(buff,new_right_length,nod_flag);
819
820 if (left_length < new_left_length)
821 { /* Move keys buff -> leaf */
822 pos=curr_buff+left_length;
823 memcpy((uchar*) pos,(uchar*) father_key_pos, (size_t) k_length);
824 memcpy((uchar*) pos+k_length, (uchar*) buff+2,
825 (size_t) (length=new_left_length - left_length - k_length));
826 pos=buff+2+length;
827 memcpy((uchar*) father_key_pos,(uchar*) pos,(size_t) k_length);
828 bmove((uchar*) buff + 2, (uchar*) pos + k_length, new_right_length - 2);
829 }
830 else
831 { /* Move keys -> buff */
832
833 bmove_upp((uchar*) buff+new_right_length,(uchar*) buff+right_length,
834 right_length-2);
835 length=new_right_length-right_length-k_length;
836 memcpy((uchar*) buff+2+length,father_key_pos,(size_t) k_length);
837 pos=curr_buff+new_left_length;
838 memcpy((uchar*) father_key_pos,(uchar*) pos,(size_t) k_length);
839 memcpy((uchar*) buff+2,(uchar*) pos+k_length,(size_t) length);
840 }
841
842 if (_mi_write_keypage(info,keyinfo,next_page,DFLT_INIT_HITS,info->buff) ||
843 _mi_write_keypage(info,keyinfo,father_page,DFLT_INIT_HITS,father_buff))
844 goto err;
845 DBUG_RETURN(0);
846 }
847
848 /* curr_buff[] and buff[] are full, lets split and make new nod */
849
850 extra_buff=info->buff+info->s->base.max_key_block_length;
851 new_left_length=new_right_length=2+nod_flag+(keys+1)/3*curr_keylength;
852 if (keys == 5) /* Too few keys to balance */
853 new_left_length-=curr_keylength;
854 extra_length=nod_flag+left_length+right_length-
855 new_left_length-new_right_length-curr_keylength;
856 DBUG_PRINT("info",("left_length: %d right_length: %d new_left_length: %d new_right_length: %d extra_length: %d",
857 left_length, right_length,
858 new_left_length, new_right_length,
859 extra_length));
860 mi_putint(curr_buff,new_left_length,nod_flag);
861 mi_putint(buff,new_right_length,nod_flag);
862 mi_putint(extra_buff,extra_length+2,nod_flag);
863
864 /* move first largest keys to new page */
865 pos=buff+right_length-extra_length;
866 memcpy((uchar*) extra_buff+2,pos,(size_t) extra_length);
867 /* Save new parting key */
868 memcpy(tmp_part_key, pos-k_length,k_length);
869 /* Make place for new keys */
870 bmove_upp((uchar*) buff+new_right_length,(uchar*) pos-k_length,
871 right_length-extra_length-k_length-2);
872 /* Copy keys from left page */
873 pos= curr_buff+new_left_length;
874 memcpy((uchar*) buff+2,(uchar*) pos+k_length,
875 (size_t) (length=left_length-new_left_length-k_length));
876 /* Copy old parting key */
877 memcpy((uchar*) buff+2+length,father_key_pos,(size_t) k_length);
878
879 /* Move new parting keys up to caller */
880 memcpy((uchar*) (right ? key : father_key_pos),pos,(size_t) k_length);
881 memcpy((uchar*) (right ? father_key_pos : key),tmp_part_key, k_length);
882
883 if ((new_pos=_mi_new(info,keyinfo,DFLT_INIT_HITS)) == HA_OFFSET_ERROR)
884 goto err;
885 _mi_kpointer(info,key+k_length,new_pos);
886 if (_mi_write_keypage(info,keyinfo,(right ? new_pos : next_page),
887 DFLT_INIT_HITS,info->buff) ||
888 _mi_write_keypage(info,keyinfo,(right ? next_page : new_pos),
889 DFLT_INIT_HITS,extra_buff))
890 goto err;
891
892 DBUG_RETURN(1); /* Middle key up */
893
894err:
895 DBUG_RETURN(-1);
896} /* _mi_balance_page */
897
898/**********************************************************************
899 * Bulk insert code *
900 **********************************************************************/
901
902typedef struct {
903 MI_INFO *info;
904 uint keynr;
905} bulk_insert_param;
906
907int _mi_ck_write_tree(register MI_INFO *info, uint keynr, uchar *key,
908 uint key_length)
909{
910 int error;
911 DBUG_ENTER("_mi_ck_write_tree");
912
913 error= tree_insert(&info->bulk_insert[keynr], key,
914 key_length + info->s->rec_reflength,
915 info->bulk_insert[keynr].custom_arg) ? 0 : HA_ERR_OUT_OF_MEM ;
916
917 DBUG_RETURN(error);
918} /* _mi_ck_write_tree */
919
920
921/* typeof(_mi_keys_compare)=qsort_cmp2 */
922
923static int keys_compare(bulk_insert_param *param, uchar *key1, uchar *key2)
924{
925 uint not_used[2];
926 return ha_key_cmp(param->info->s->keyinfo[param->keynr].seg,
927 key1, key2, USE_WHOLE_KEY, SEARCH_SAME,
928 not_used);
929}
930
931
932static int keys_free(uchar *key, TREE_FREE mode, bulk_insert_param *param)
933{
934 /*
935 Probably I can use info->lastkey here, but I'm not sure,
936 and to be safe I'd better use local lastkey.
937 */
938 uchar lastkey[HA_MAX_KEY_BUFF];
939 uint keylen;
940 MI_KEYDEF *keyinfo;
941
942 switch (mode) {
943 case free_init:
944 if (param->info->s->concurrent_insert)
945 {
946 mysql_rwlock_wrlock(&param->info->s->key_root_lock[param->keynr]);
947 param->info->s->keyinfo[param->keynr].version++;
948 }
949 return 0;
950 case free_free:
951 keyinfo=param->info->s->keyinfo+param->keynr;
952 keylen=_mi_keylength(keyinfo, key);
953 memcpy(lastkey, key, keylen);
954 return _mi_ck_write_btree(param->info,param->keynr,lastkey,
955 keylen - param->info->s->rec_reflength);
956 case free_end:
957 if (param->info->s->concurrent_insert)
958 mysql_rwlock_unlock(&param->info->s->key_root_lock[param->keynr]);
959 return 0;
960 }
961 return -1;
962}
963
964
965int mi_init_bulk_insert(MI_INFO *info, size_t cache_size, ha_rows rows)
966{
967 MYISAM_SHARE *share=info->s;
968 MI_KEYDEF *key=share->keyinfo;
969 bulk_insert_param *params;
970 uint i, num_keys, total_keylength;
971 ulonglong key_map;
972 DBUG_ENTER("_mi_init_bulk_insert");
973 DBUG_PRINT("enter",("cache_size: %lu", (ulong) cache_size));
974
975 DBUG_ASSERT(!info->bulk_insert &&
976 (!rows || rows >= MI_MIN_ROWS_TO_USE_BULK_INSERT));
977
978 mi_clear_all_keys_active(key_map);
979 for (i=total_keylength=num_keys=0 ; i < share->base.keys ; i++)
980 {
981 if (! (key[i].flag & HA_NOSAME) && (share->base.auto_key != i + 1) &&
982 mi_is_key_active(share->state.key_map, i))
983 {
984 num_keys++;
985 mi_set_key_active(key_map, i);
986 total_keylength+=key[i].maxlength+TREE_ELEMENT_EXTRA_SIZE;
987 }
988 }
989
990 if (num_keys==0 ||
991 num_keys * (size_t) MI_MIN_SIZE_BULK_INSERT_TREE > cache_size)
992 DBUG_RETURN(0);
993
994 if (rows && rows*total_keylength < cache_size)
995 cache_size= (size_t) rows;
996 else
997 cache_size/=total_keylength*16;
998
999 info->bulk_insert=(TREE *)
1000 my_malloc((sizeof(TREE)*share->base.keys+
1001 sizeof(bulk_insert_param)*num_keys),MYF(0));
1002
1003 if (!info->bulk_insert)
1004 DBUG_RETURN(HA_ERR_OUT_OF_MEM);
1005
1006 params=(bulk_insert_param *)(info->bulk_insert+share->base.keys);
1007 for (i=0 ; i < share->base.keys ; i++)
1008 {
1009 if (mi_is_key_active(key_map, i))
1010 {
1011 params->info=info;
1012 params->keynr=i;
1013 /* Only allocate a 16'th of the buffer at a time */
1014 init_tree(&info->bulk_insert[i],
1015 cache_size * key[i].maxlength,
1016 cache_size * key[i].maxlength, 0,
1017 (qsort_cmp2)keys_compare,
1018 (tree_element_free) keys_free, (void *)params++, MYF(0));
1019 }
1020 else
1021 info->bulk_insert[i].root=0;
1022 }
1023
1024 DBUG_RETURN(0);
1025}
1026
1027void mi_flush_bulk_insert(MI_INFO *info, uint inx)
1028{
1029 if (info->bulk_insert)
1030 {
1031 if (is_tree_inited(&info->bulk_insert[inx]))
1032 reset_tree(&info->bulk_insert[inx]);
1033 }
1034}
1035
1036int mi_end_bulk_insert(MI_INFO *info, my_bool abort)
1037{
1038 int first_error= 0;
1039 if (info->bulk_insert)
1040 {
1041 uint i;
1042 for (i=0 ; i < info->s->base.keys ; i++)
1043 {
1044 if (is_tree_inited(& info->bulk_insert[i]))
1045 {
1046 int error;
1047 if ((error= delete_tree(& info->bulk_insert[i], abort)))
1048 {
1049 first_error= first_error ? first_error : error;
1050 abort= 1;
1051 }
1052 }
1053 }
1054 my_free(info->bulk_insert);
1055 info->bulk_insert=0;
1056 }
1057 return first_error;
1058}
1059