1/* Copyright (c) 2000, 2012, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
15
16/*
17 Creates a index for a database by reading keys, sorting them and outputing
18 them in sorted order through MI_SORT_INFO functions.
19*/
20
21#include "fulltext.h"
22#if defined(__WIN__)
23#include <fcntl.h>
24#else
25#include <stddef.h>
26#endif
27#include <queues.h>
28
29/* static variables */
30
31#undef MYF_RW
32#undef DISK_BUFFER_SIZE
33
34#define MERGEBUFF 15
35#define MERGEBUFF2 31
36#define MYF_RW MYF(MY_NABP | MY_WME | MY_WAIT_IF_FULL)
37#define DISK_BUFFER_SIZE (IO_SIZE*128)
38
39/* How many keys we can keep in memory */
40typedef ulonglong ha_keys;
41
42/*
43 Pointers of functions for store and read keys from temp file
44*/
45
46extern void print_error(const char *fmt,...);
47
48/* Functions defined in this file */
49
50static ha_rows find_all_keys(MI_SORT_PARAM *info, ha_keys keys,
51 uchar **sort_keys,
52 DYNAMIC_ARRAY *buffpek,uint *maxbuffer,
53 IO_CACHE *tempfile,
54 IO_CACHE *tempfile_for_exceptions);
55static int write_keys(MI_SORT_PARAM *info,uchar **sort_keys,
56 ha_keys count, BUFFPEK *buffpek,IO_CACHE *tempfile);
57static int write_key(MI_SORT_PARAM *info, uchar *key,
58 IO_CACHE *tempfile);
59static int write_index(MI_SORT_PARAM *info,uchar * *sort_keys,
60 ha_keys count);
61static int merge_many_buff(MI_SORT_PARAM *info, ha_keys keys,
62 uchar * *sort_keys,
63 BUFFPEK *buffpek, uint *maxbuffer,
64 IO_CACHE *t_file);
65static my_off_t read_to_buffer(IO_CACHE *fromfile,BUFFPEK *buffpek,
66 uint sort_length);
67static int merge_buffers(MI_SORT_PARAM *info, ha_keys keys,
68 IO_CACHE *from_file, IO_CACHE *to_file,
69 uchar * *sort_keys, BUFFPEK *lastbuff,
70 BUFFPEK *Fb, BUFFPEK *Tb);
71static int merge_index(MI_SORT_PARAM *,ha_keys,uchar **,BUFFPEK *, uint,
72 IO_CACHE *);
73static int flush_ft_buf(MI_SORT_PARAM *info);
74
75static int write_keys_varlen(MI_SORT_PARAM *info,uchar **sort_keys,
76 ha_keys count, BUFFPEK *buffpek,
77 IO_CACHE *tempfile);
78static my_off_t read_to_buffer_varlen(IO_CACHE *fromfile,BUFFPEK *buffpek,
79 uint sort_length);
80static int write_merge_key(MI_SORT_PARAM *info, IO_CACHE *to_file,
81 uchar *key, uint sort_length, ha_keys count);
82static int write_merge_key_varlen(MI_SORT_PARAM *info,
83 IO_CACHE *to_file,
84 uchar* key, uint sort_length,
85 ha_keys count);
86static inline int
87my_var_write(MI_SORT_PARAM *info, IO_CACHE *to_file, uchar *bufs);
88
89
90/*
91 Sets the appropriate read and write methods for the MI_SORT_PARAM
92 based on the variable length key flag.
93*/
94static void set_sort_param_read_write(MI_SORT_PARAM *sort_param)
95{
96 if (sort_param->keyinfo->flag & HA_VAR_LENGTH_KEY)
97 {
98 sort_param->write_keys= write_keys_varlen;
99 sort_param->read_to_buffer= read_to_buffer_varlen;
100 sort_param->write_key= write_merge_key_varlen;
101 }
102 else
103 {
104 sort_param->write_keys= write_keys;
105 sort_param->read_to_buffer= read_to_buffer;
106 sort_param->write_key= write_merge_key;
107 }
108}
109
110
111/*
112 Creates a index of sorted keys
113
114 SYNOPSIS
115 _create_index_by_sort()
116 info Sort parameters
117 no_messages Set to 1 if no output
118 sortbuff_size Size if sortbuffer to allocate
119
120 RESULT
121 0 ok
122 <> 0 Error
123*/
124
125int _create_index_by_sort(MI_SORT_PARAM *info,my_bool no_messages,
126 ulonglong sortbuff_size)
127{
128 int error;
129 uint sort_length, maxbuffer;
130 ulonglong memavl, old_memavl;
131 DYNAMIC_ARRAY buffpek;
132 ha_rows records, UNINIT_VAR(keys);
133 uchar **sort_keys;
134 IO_CACHE tempfile, tempfile_for_exceptions;
135 DBUG_ENTER("_create_index_by_sort");
136 DBUG_PRINT("enter",("sort_length: %u", info->key_length));
137
138 set_sort_param_read_write(info);
139
140 my_b_clear(&tempfile);
141 my_b_clear(&tempfile_for_exceptions);
142 bzero((char*) &buffpek,sizeof(buffpek));
143 sort_keys= (uchar **) NULL; error= 1;
144 maxbuffer=1;
145
146 memavl= MY_MAX(sortbuff_size, MIN_SORT_BUFFER);
147 records= info->sort_info->max_records;
148 sort_length= info->key_length;
149
150 while (memavl >= MIN_SORT_BUFFER)
151 {
152 /* Check if we can fit all keys into memory */
153 if (((ulonglong) (records + 1) *
154 (sort_length + sizeof(char*)) <= memavl))
155 keys= records+1;
156 else if ((info->sort_info->param->testflag &
157 (T_FORCE_SORT_MEMORY | T_CREATE_MISSING_KEYS)) ==
158 T_FORCE_SORT_MEMORY)
159 {
160 /*
161 Use all of the given sort buffer for key data.
162 Allocate 1000 buffers at a start for new data. More buffers
163 will be allocated when needed.
164 */
165 keys= memavl / (sort_length+sizeof(char*));
166 maxbuffer= (uint) MY_MIN((ulonglong) 1000, (records / keys)+1);
167 }
168 else
169 {
170 /*
171 All keys can't fit in memory.
172 Calculate how many keys + buffers we can keep in memory
173 */
174 uint maxbuffer_org;
175 do
176 {
177 maxbuffer_org= maxbuffer;
178 if (memavl < sizeof(BUFFPEK) * maxbuffer ||
179 (keys= (memavl-sizeof(BUFFPEK)*maxbuffer)/
180 (sort_length+sizeof(char*))) <= 1 ||
181 keys < maxbuffer)
182 {
183 mi_check_print_error(info->sort_info->param,
184 "myisam_sort_buffer_size is too small. Current myisam_sort_buffer_size: %llu rows: %llu sort_length: %u",
185 sortbuff_size, (ulonglong) records,
186 sort_length);
187 my_errno= ENOMEM;
188 goto err;
189 }
190 }
191 while ((maxbuffer= (uint) (records/(keys-1)+1)) != maxbuffer_org);
192 }
193
194 if ((sort_keys= ((uchar **)
195 my_malloc((size_t) (keys*(sort_length+sizeof(char*))+
196 HA_FT_MAXBYTELEN), MYF(0)))))
197 {
198 if (my_init_dynamic_array(&buffpek, sizeof(BUFFPEK), maxbuffer,
199 MY_MIN(maxbuffer/2, 1000), MYF(0)))
200 {
201 my_free(sort_keys);
202 sort_keys= 0;
203 }
204 else
205 break;
206 }
207 old_memavl=memavl;
208 if ((memavl= memavl/4*3) < MIN_SORT_BUFFER && old_memavl > MIN_SORT_BUFFER)
209 memavl= MIN_SORT_BUFFER;
210 }
211 if (memavl < MIN_SORT_BUFFER)
212 {
213 /* purecov: begin inspected */
214 mi_check_print_error(info->sort_info->param,
215 "myisam_sort_buffer_size is too small. Current myisam_sort_buffer_size: %llu rows: %llu sort_length: %u",
216 sortbuff_size, (ulonglong) records, sort_length);
217 my_errno= ENOMEM;
218 goto err;
219 /* purecov: end inspected */
220 }
221 (*info->lock_in_memory)(info->sort_info->param);/* Everything is allocated */
222
223 if (!no_messages)
224 my_fprintf(stdout,
225 " - Searching for keys, allocating buffer for %llu keys\n",
226 (ulonglong) keys);
227
228 if ((records= find_all_keys(info,keys,sort_keys,&buffpek,&maxbuffer,
229 &tempfile,&tempfile_for_exceptions))
230 == HA_POS_ERROR)
231 goto err; /* purecov: tested */
232 if (maxbuffer == 0)
233 {
234 if (!no_messages)
235 my_fprintf(stdout, " - Dumping %llu keys\n", (ulonglong) records);
236 if (write_index(info, sort_keys, (ha_keys) records))
237 goto err; /* purecov: inspected */
238 }
239 else
240 {
241 keys=(keys*(sort_length+sizeof(char*)))/sort_length;
242 if (maxbuffer >= MERGEBUFF2)
243 {
244 if (!no_messages)
245 my_fprintf(stdout, " - Merging %llu keys\n",
246 (ulonglong) records); /* purecov: tested */
247 if (merge_many_buff(info,keys,sort_keys,
248 dynamic_element(&buffpek,0,BUFFPEK *),&maxbuffer,&tempfile))
249 goto err; /* purecov: inspected */
250 }
251 if (flush_io_cache(&tempfile) ||
252 reinit_io_cache(&tempfile,READ_CACHE,0L,0,0))
253 goto err; /* purecov: inspected */
254 if (!no_messages)
255 printf(" - Last merge and dumping keys\n"); /* purecov: tested */
256 if (merge_index(info,keys,sort_keys,dynamic_element(&buffpek,0,BUFFPEK *),
257 maxbuffer,&tempfile))
258 goto err; /* purecov: inspected */
259 }
260
261 if (flush_ft_buf(info) || flush_pending_blocks(info))
262 goto err;
263
264 if (my_b_inited(&tempfile_for_exceptions))
265 {
266 MI_INFO *idx=info->sort_info->info;
267 uint keyno=info->key;
268 uint key_length, ref_length=idx->s->rec_reflength;
269
270 if (!no_messages)
271 printf(" - Adding exceptions\n"); /* purecov: tested */
272 if (flush_io_cache(&tempfile_for_exceptions) ||
273 reinit_io_cache(&tempfile_for_exceptions,READ_CACHE,0L,0,0))
274 goto err;
275
276 while (!my_b_read(&tempfile_for_exceptions,(uchar*)&key_length,
277 sizeof(key_length))
278 && !my_b_read(&tempfile_for_exceptions,(uchar*)sort_keys,
279 (uint) key_length))
280 {
281 if (_mi_ck_write(idx,keyno,(uchar*) sort_keys,key_length-ref_length))
282 goto err;
283 }
284 }
285
286 error =0;
287
288err:
289 my_free(sort_keys);
290 delete_dynamic(&buffpek);
291 close_cached_file(&tempfile);
292 close_cached_file(&tempfile_for_exceptions);
293
294 DBUG_RETURN(error ? -1 : 0);
295} /* _create_index_by_sort */
296
297
298/* Search after all keys and place them in a temp. file */
299
300static ha_rows find_all_keys(MI_SORT_PARAM *info, ha_rows keys,
301 uchar **sort_keys, DYNAMIC_ARRAY *buffpek,
302 uint *maxbuffer, IO_CACHE *tempfile,
303 IO_CACHE *tempfile_for_exceptions)
304{
305 int error;
306 ha_rows idx;
307 DBUG_ENTER("find_all_keys");
308
309 idx=error=0;
310 sort_keys[0]=(uchar*) (sort_keys+keys);
311
312 while (!(error=(*info->key_read)(info,sort_keys[idx])))
313 {
314 if (info->real_key_length > info->key_length)
315 {
316 if (write_key(info,sort_keys[idx],tempfile_for_exceptions))
317 DBUG_RETURN(HA_POS_ERROR); /* purecov: inspected */
318 continue;
319 }
320
321 if (++idx == keys)
322 {
323 if (info->write_keys(info,sort_keys,idx-1,(BUFFPEK *)alloc_dynamic(buffpek),
324 tempfile))
325 DBUG_RETURN(HA_POS_ERROR); /* purecov: inspected */
326
327 sort_keys[0]=(uchar*) (sort_keys+keys);
328 memcpy(sort_keys[0],sort_keys[idx-1],(size_t) info->key_length);
329 idx=1;
330 }
331 sort_keys[idx]=sort_keys[idx-1]+info->key_length;
332 }
333 if (error > 0)
334 DBUG_RETURN(HA_POS_ERROR); /* Aborted by get_key */ /* purecov: inspected */
335 if (buffpek->elements)
336 {
337 if (info->write_keys(info,sort_keys,idx,(BUFFPEK *)alloc_dynamic(buffpek),
338 tempfile))
339 DBUG_RETURN(HA_POS_ERROR); /* purecov: inspected */
340 *maxbuffer=buffpek->elements-1;
341 }
342 else
343 *maxbuffer=0;
344
345 DBUG_RETURN((*maxbuffer)*(keys-1)+idx);
346} /* find_all_keys */
347
348static my_bool thr_find_all_keys_exec(MI_SORT_PARAM *sort_param)
349{
350 ulonglong memavl, old_memavl, sortbuff_size;
351 ha_keys UNINIT_VAR(keys), idx;
352 uint sort_length;
353 uint maxbuffer;
354 uchar **sort_keys= NULL;
355 int error= 0;
356 DBUG_ENTER("thr_find_all_keys");
357 DBUG_PRINT("enter", ("master: %d", sort_param->master));
358
359 if (sort_param->sort_info->got_error)
360 DBUG_RETURN(TRUE);
361
362 set_sort_param_read_write(sort_param);
363
364 my_b_clear(&sort_param->tempfile);
365 my_b_clear(&sort_param->tempfile_for_exceptions);
366 bzero((char*) &sort_param->buffpek, sizeof(sort_param->buffpek));
367 bzero((char*) &sort_param->unique, sizeof(sort_param->unique));
368
369 sortbuff_size= sort_param->sortbuff_size;
370 memavl= MY_MAX(sortbuff_size, MIN_SORT_BUFFER);
371 idx= (ha_keys) sort_param->sort_info->max_records;
372 sort_length= sort_param->key_length;
373 maxbuffer= 1;
374
375 while (memavl >= MIN_SORT_BUFFER)
376 {
377 if ((my_off_t) (idx+1)*(sort_length+sizeof(char*)) <=
378 (my_off_t) memavl)
379 keys= idx+1;
380 else if ((sort_param->sort_info->param->testflag &
381 (T_FORCE_SORT_MEMORY | T_CREATE_MISSING_KEYS)) ==
382 T_FORCE_SORT_MEMORY)
383 {
384 /*
385 Use all of the given sort buffer for key data.
386 Allocate 1000 buffers at a start for new data. More buffers
387 will be allocated when needed.
388 */
389 keys= memavl / (sort_length+sizeof(char*));
390 maxbuffer= (uint) MY_MIN((ulonglong) 1000, (idx / keys)+1);
391 }
392 else
393 {
394 uint maxbuffer_org;
395 do
396 {
397 maxbuffer_org= maxbuffer;
398 if (memavl < sizeof(BUFFPEK)*maxbuffer ||
399 (keys=(memavl-sizeof(BUFFPEK)*maxbuffer)/
400 (sort_length+sizeof(char*))) <= 1 ||
401 keys < (uint) maxbuffer)
402 {
403 mi_check_print_error(sort_param->sort_info->param,
404 "myisam_sort_buffer_size is too small. Current myisam_sort_buffer_size: %llu rows: %llu sort_length: %u",
405 sortbuff_size, (ulonglong) idx, sort_length);
406 DBUG_RETURN(TRUE);
407 }
408 }
409 while ((maxbuffer= (uint) (idx/(keys-1)+1)) != maxbuffer_org);
410 }
411 if ((sort_keys= (uchar**) my_malloc((size_t)(keys * (sort_length + sizeof(char*)) +
412 ((sort_param->keyinfo->flag & HA_FULLTEXT) ?
413 HA_FT_MAXBYTELEN : 0)), MYF(0))))
414 {
415 if (my_init_dynamic_array(&sort_param->buffpek, sizeof(BUFFPEK),
416 maxbuffer, MY_MIN(maxbuffer / 2, 1000), MYF(0)))
417 {
418 my_free(sort_keys);
419 sort_keys= NULL; /* Safety against double free on error. */
420 }
421 else
422 break;
423 }
424 old_memavl= memavl;
425 if ((memavl= memavl / 4 * 3) < MIN_SORT_BUFFER &&
426 old_memavl > MIN_SORT_BUFFER)
427 memavl= MIN_SORT_BUFFER;
428 }
429 if (memavl < MIN_SORT_BUFFER)
430 {
431 /* purecov: begin inspected */
432 mi_check_print_error(sort_param->sort_info->param,
433 "myisam_sort_buffer_size is too small. Current myisam_sort_buffer_size: %llu rows: %llu sort_length: %u",
434 sortbuff_size, (ulonglong) idx, sort_length);
435 my_errno= ENOMEM;
436 goto err;
437 /* purecov: end inspected */
438 }
439
440 if (sort_param->sort_info->param->testflag & T_VERBOSE)
441 my_fprintf(stdout,
442 "Key %d - Allocating buffer for %llu keys\n",
443 sort_param->key + 1, (ulonglong) keys);
444 sort_param->sort_keys= sort_keys;
445
446 idx= error= 0;
447 sort_keys[0]= (uchar*) (sort_keys+keys);
448
449 DBUG_PRINT("info", ("reading keys"));
450 while (!(error= sort_param->sort_info->got_error) &&
451 !(error= (*sort_param->key_read)(sort_param, sort_keys[idx])))
452 {
453 if (sort_param->real_key_length > sort_param->key_length)
454 {
455 if (write_key(sort_param, sort_keys[idx],
456 &sort_param->tempfile_for_exceptions))
457 goto err;
458 continue;
459 }
460
461 if (++idx == keys)
462 {
463 if (sort_param->write_keys(sort_param, sort_keys, idx - 1,
464 (BUFFPEK*) alloc_dynamic(&sort_param->buffpek),
465 &sort_param->tempfile))
466 goto err;
467 sort_keys[0]= (uchar*) (sort_keys+keys);
468 memcpy(sort_keys[0], sort_keys[idx - 1], (size_t) sort_param->key_length);
469 idx= 1;
470 }
471 sort_keys[idx]= sort_keys[idx - 1] + sort_param->key_length;
472 }
473
474 if (error > 0)
475 goto err;
476
477 if (sort_param->buffpek.elements)
478 {
479 if (sort_param->write_keys(sort_param, sort_keys, idx,
480 (BUFFPEK*) alloc_dynamic(&sort_param->buffpek),
481 &sort_param->tempfile))
482 goto err;
483 sort_param->keys= (sort_param->buffpek.elements - 1) * (keys - 1) + idx;
484 }
485 else
486 sort_param->keys= idx;
487
488 DBUG_RETURN(FALSE);
489
490err:
491 DBUG_PRINT("error", ("got some error"));
492 sort_param->sort_info->got_error= 1; /* no need to protect with a mutex */
493 my_free(sort_keys);
494 sort_param->sort_keys= 0;
495 delete_dynamic(& sort_param->buffpek);
496 close_cached_file(&sort_param->tempfile);
497 close_cached_file(&sort_param->tempfile_for_exceptions);
498
499 DBUG_RETURN(TRUE);
500}
501
502/* Search after all keys and place them in a temp. file */
503
504pthread_handler_t thr_find_all_keys(void *arg)
505{
506 MI_SORT_PARAM *sort_param= (MI_SORT_PARAM*) arg;
507 my_bool error= FALSE;
508 /* If my_thread_init fails */
509 if (my_thread_init() || thr_find_all_keys_exec(sort_param))
510 error= TRUE;
511
512 /*
513 Thread must clean up after itself.
514 */
515 free_root(&sort_param->wordroot, MYF(0));
516 /*
517 Detach from the share if the writer is involved. Avoid others to
518 be blocked. This includes a flush of the write buffer. This will
519 also indicate EOF to the readers.
520 That means that a writer always gets here first and readers -
521 only when they see EOF. But if a reader finishes prematurely
522 because of an error it may reach this earlier - don't allow it
523 to detach the writer thread.
524 */
525 if (sort_param->master && sort_param->sort_info->info->rec_cache.share)
526 remove_io_thread(&sort_param->sort_info->info->rec_cache);
527
528 /* Readers detach from the share if any. Avoid others to be blocked. */
529 if (sort_param->read_cache.share)
530 remove_io_thread(&sort_param->read_cache);
531
532 mysql_mutex_lock(&sort_param->sort_info->mutex);
533 if (error)
534 sort_param->sort_info->got_error= 1;
535
536 if (!--sort_param->sort_info->threads_running)
537 mysql_cond_signal(&sort_param->sort_info->cond);
538 mysql_mutex_unlock(&sort_param->sort_info->mutex);
539 my_thread_end();
540 return NULL;
541}
542
543
544int thr_write_keys(MI_SORT_PARAM *sort_param)
545{
546 MI_SORT_INFO *sort_info=sort_param->sort_info;
547 HA_CHECK *param=sort_info->param;
548 ulonglong UNINIT_VAR(length);
549 ha_rows keys;
550 ulong *rec_per_key_part=param->rec_per_key_part;
551 int got_error=sort_info->got_error;
552 uint i;
553 MI_INFO *info=sort_info->info;
554 MYISAM_SHARE *share=info->s;
555 MI_SORT_PARAM *sinfo;
556 uchar *mergebuf=0;
557 DBUG_ENTER("thr_write_keys");
558
559 for (i= 0, sinfo= sort_param ;
560 i < sort_info->total_keys ;
561 i++, sinfo++)
562 {
563 if (!sinfo->sort_keys)
564 {
565 got_error=1;
566 my_free(mi_get_rec_buff_ptr(info, sinfo->rec_buff));
567 continue;
568 }
569 if (!got_error)
570 {
571 mi_set_key_active(share->state.key_map, sinfo->key);
572 if (!sinfo->buffpek.elements)
573 {
574 if (param->testflag & T_VERBOSE)
575 {
576 my_fprintf(stdout,
577 "Key %d - Dumping %llu keys\n", sinfo->key+1,
578 (ulonglong) sinfo->keys);
579 fflush(stdout);
580 }
581 if (write_index(sinfo, sinfo->sort_keys, sinfo->keys) ||
582 flush_ft_buf(sinfo) || flush_pending_blocks(sinfo))
583 got_error=1;
584 }
585 }
586 my_free(sinfo->sort_keys);
587 my_free(mi_get_rec_buff_ptr(info, sinfo->rec_buff));
588 sinfo->sort_keys=0;
589 }
590
591 for (i= 0, sinfo= sort_param ;
592 i < sort_info->total_keys ;
593 i++,
594 delete_dynamic(&sinfo->buffpek),
595 close_cached_file(&sinfo->tempfile),
596 close_cached_file(&sinfo->tempfile_for_exceptions),
597 rec_per_key_part+= sinfo->keyinfo->keysegs, sinfo++)
598 {
599 if (got_error)
600 continue;
601
602 set_sort_param_read_write(sinfo);
603
604 if (sinfo->buffpek.elements)
605 {
606 uint maxbuffer=sinfo->buffpek.elements-1;
607 if (!mergebuf)
608 {
609 length=param->sort_buffer_length;
610 while (length >= MIN_SORT_BUFFER)
611 {
612 if ((mergebuf= my_malloc((size_t) length, MYF(0))))
613 break;
614 length=length*3/4;
615 }
616 if (!mergebuf)
617 {
618 got_error=1;
619 continue;
620 }
621 }
622 keys=length/sinfo->key_length;
623 if (maxbuffer >= MERGEBUFF2)
624 {
625 if (param->testflag & T_VERBOSE)
626 my_fprintf(stdout,
627 "Key %d - Merging %llu keys\n",
628 sinfo->key+1, (ulonglong) sinfo->keys);
629 if (merge_many_buff(sinfo, keys, (uchar **)mergebuf,
630 dynamic_element(&sinfo->buffpek, 0, BUFFPEK *),
631 &maxbuffer, &sinfo->tempfile))
632 {
633 got_error=1;
634 continue;
635 }
636 }
637 if (flush_io_cache(&sinfo->tempfile) ||
638 reinit_io_cache(&sinfo->tempfile,READ_CACHE,0L,0,0))
639 {
640 got_error=1;
641 continue;
642 }
643 if (param->testflag & T_VERBOSE)
644 printf("Key %d - Last merge and dumping keys\n", sinfo->key+1);
645 if (merge_index(sinfo, keys, (uchar **)mergebuf,
646 dynamic_element(&sinfo->buffpek,0,BUFFPEK *),
647 maxbuffer,&sinfo->tempfile) ||
648 flush_ft_buf(sinfo) ||
649 flush_pending_blocks(sinfo))
650 {
651 got_error=1;
652 continue;
653 }
654 }
655 if (my_b_inited(&sinfo->tempfile_for_exceptions))
656 {
657 uint key_length;
658
659 if (param->testflag & T_VERBOSE)
660 printf("Key %d - Dumping 'long' keys\n", sinfo->key+1);
661
662 if (flush_io_cache(&sinfo->tempfile_for_exceptions) ||
663 reinit_io_cache(&sinfo->tempfile_for_exceptions,READ_CACHE,0L,0,0))
664 {
665 got_error=1;
666 continue;
667 }
668
669 while (!got_error &&
670 !my_b_read(&sinfo->tempfile_for_exceptions,(uchar*)&key_length,
671 sizeof(key_length)))
672 {
673 uchar ft_buf[HA_FT_MAXBYTELEN + HA_FT_WLEN + 10];
674 if (key_length > sizeof(ft_buf) ||
675 my_b_read(&sinfo->tempfile_for_exceptions, (uchar*)ft_buf,
676 (uint)key_length) ||
677 _mi_ck_write(info, sinfo->key, (uchar*)ft_buf,
678 key_length - info->s->rec_reflength))
679 got_error=1;
680 }
681 }
682 if (!got_error && param->testflag & T_STATISTICS)
683 update_key_parts(sinfo->keyinfo, rec_per_key_part, sinfo->unique,
684 param->stats_method == MI_STATS_METHOD_IGNORE_NULLS ?
685 sinfo->notnull : NULL,
686 (ulonglong) info->state->records);
687 }
688 my_free(mergebuf);
689 DBUG_RETURN(got_error);
690}
691
692 /* Write all keys in memory to file for later merge */
693
694static int write_keys(MI_SORT_PARAM *info, register uchar **sort_keys,
695 ha_keys count, BUFFPEK *buffpek, IO_CACHE *tempfile)
696{
697 uchar **end;
698 uint sort_length=info->key_length;
699 DBUG_ENTER("write_keys");
700
701 if (!buffpek)
702 DBUG_RETURN(1); /* Out of memory */
703
704 my_qsort2((uchar*) sort_keys,(size_t) count, sizeof(uchar*),
705 (qsort2_cmp) info->key_cmp, info);
706 if (!my_b_inited(tempfile) &&
707 open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
708 DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
709 DBUG_RETURN(1); /* purecov: inspected */
710
711 buffpek->file_pos=my_b_tell(tempfile);
712 buffpek->count=count;
713
714 for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
715 {
716 if (my_b_write(tempfile,(uchar*) *sort_keys, sort_length))
717 DBUG_RETURN(1); /* purecov: inspected */
718 }
719 DBUG_RETURN(0);
720} /* write_keys */
721
722
723static inline int
724my_var_write(MI_SORT_PARAM *info, IO_CACHE *to_file, uchar *bufs)
725{
726 int err;
727 uint16 len = _mi_keylength(info->keyinfo, (uchar*) bufs);
728
729 /* The following is safe as this is a local file */
730 if ((err= my_b_write(to_file, (uchar*)&len, sizeof(len))))
731 return (err);
732 if ((err= my_b_write(to_file,bufs, (uint) len)))
733 return (err);
734 return (0);
735}
736
737
738static int write_keys_varlen(MI_SORT_PARAM *info,
739 register uchar **sort_keys,
740 ha_keys count, BUFFPEK *buffpek,
741 IO_CACHE *tempfile)
742{
743 uchar **end;
744 int err;
745 DBUG_ENTER("write_keys_varlen");
746
747 if (!buffpek)
748 DBUG_RETURN(1); /* Out of memory */
749
750 my_qsort2((uchar*) sort_keys, (size_t) count, sizeof(uchar*),
751 (qsort2_cmp) info->key_cmp, info);
752 if (!my_b_inited(tempfile) &&
753 open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
754 DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
755 DBUG_RETURN(1); /* purecov: inspected */
756
757 buffpek->file_pos=my_b_tell(tempfile);
758 buffpek->count=count;
759 for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
760 {
761 if ((err= my_var_write(info,tempfile, (uchar*) *sort_keys)))
762 DBUG_RETURN(err);
763 }
764 DBUG_RETURN(0);
765} /* write_keys_varlen */
766
767
768static int write_key(MI_SORT_PARAM *info, uchar *key, IO_CACHE *tempfile)
769{
770 uint key_length=info->real_key_length;
771 DBUG_ENTER("write_key");
772
773 if (!my_b_inited(tempfile) &&
774 open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
775 DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
776 DBUG_RETURN(1);
777
778 if (my_b_write(tempfile,(uchar*)&key_length,sizeof(key_length)) ||
779 my_b_write(tempfile,(uchar*)key,(uint) key_length))
780 DBUG_RETURN(1);
781 DBUG_RETURN(0);
782} /* write_key */
783
784
785/* Write index */
786
787static int write_index(MI_SORT_PARAM *info, register uchar **sort_keys,
788 register ha_keys count)
789{
790 DBUG_ENTER("write_index");
791
792 my_qsort2((uchar*) sort_keys,(size_t) count,sizeof(uchar*),
793 (qsort2_cmp) info->key_cmp,info);
794 while (count--)
795 {
796 if ((*info->key_write)(info,*sort_keys++))
797 DBUG_RETURN(-1); /* purecov: inspected */
798 }
799 DBUG_RETURN(0);
800} /* write_index */
801
802
803 /* Merge buffers to make < MERGEBUFF2 buffers */
804
805static int merge_many_buff(MI_SORT_PARAM *info, ha_keys keys,
806 uchar **sort_keys, BUFFPEK *buffpek,
807 uint *maxbuffer, IO_CACHE *t_file)
808{
809 register uint i;
810 IO_CACHE t_file2, *from_file, *to_file, *temp;
811 BUFFPEK *lastbuff;
812 DBUG_ENTER("merge_many_buff");
813
814 if (*maxbuffer < MERGEBUFF2)
815 DBUG_RETURN(0); /* purecov: inspected */
816 if (flush_io_cache(t_file) ||
817 open_cached_file(&t_file2,my_tmpdir(info->tmpdir),"ST",
818 DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
819 DBUG_RETURN(1); /* purecov: inspected */
820
821 from_file= t_file ; to_file= &t_file2;
822 while (*maxbuffer >= MERGEBUFF2)
823 {
824 reinit_io_cache(from_file,READ_CACHE,0L,0,0);
825 reinit_io_cache(to_file,WRITE_CACHE,0L,0,0);
826 lastbuff=buffpek;
827 for (i=0 ; i + MERGEBUFF*3/2 <= *maxbuffer ; i+=MERGEBUFF)
828 {
829 if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
830 buffpek+i,buffpek+i+MERGEBUFF-1))
831 goto cleanup;
832 }
833 if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
834 buffpek+i,buffpek+ *maxbuffer))
835 break; /* purecov: inspected */
836 if (flush_io_cache(to_file))
837 break; /* purecov: inspected */
838 temp=from_file; from_file=to_file; to_file=temp;
839 *maxbuffer= (uint) (lastbuff-buffpek)-1;
840 }
841cleanup:
842 close_cached_file(to_file); /* This holds old result */
843 if (to_file == t_file)
844 {
845 DBUG_ASSERT(t_file2.type == WRITE_CACHE);
846 *t_file=t_file2; /* Copy result file */
847 }
848
849 DBUG_RETURN(*maxbuffer >= MERGEBUFF2); /* Return 1 if interrupted */
850} /* merge_many_buff */
851
852
853/*
854 Read data to buffer
855
856 SYNOPSIS
857 read_to_buffer()
858 fromfile File to read from
859 buffpek Where to read from
860 sort_length max length to read
861 RESULT
862 > 0 Ammount of bytes read
863 -1 Error
864*/
865
866static my_off_t read_to_buffer(IO_CACHE *fromfile, BUFFPEK *buffpek,
867 uint sort_length)
868{
869 register ha_keys count;
870 size_t length;
871
872 if ((count= (ha_keys) MY_MIN((ha_rows) buffpek->max_keys,
873 (ha_rows) buffpek->count)))
874 {
875 if (my_b_pread(fromfile, (uchar*) buffpek->base,
876 (length= (size_t) (sort_length * count)),
877 buffpek->file_pos))
878 return(HA_OFFSET_ERROR);
879 buffpek->key=buffpek->base;
880 buffpek->file_pos+= length; /* New filepos */
881 buffpek->count-= count;
882 buffpek->mem_count= count;
883 }
884 return (((my_off_t) count) * sort_length);
885} /* read_to_buffer */
886
887
888static my_off_t read_to_buffer_varlen(IO_CACHE *fromfile, BUFFPEK *buffpek,
889 uint sort_length)
890{
891 register ha_keys count;
892 uint16 length_of_key = 0;
893 uint idx;
894 uchar *buffp;
895
896 if ((count= (ha_keys) MY_MIN((ha_rows) buffpek->max_keys,buffpek->count)))
897 {
898 buffp = buffpek->base;
899
900 for (idx=1;idx<=count;idx++)
901 {
902 if (my_b_pread(fromfile, (uchar*)&length_of_key,
903 sizeof(length_of_key), buffpek->file_pos))
904 return(HA_OFFSET_ERROR);
905 buffpek->file_pos+=sizeof(length_of_key);
906 if (my_b_pread(fromfile, (uchar*) buffp,
907 length_of_key, buffpek->file_pos))
908 return(HA_OFFSET_ERROR);
909 buffpek->file_pos+=length_of_key;
910 buffp = buffp + sort_length;
911 }
912 buffpek->key=buffpek->base;
913 buffpek->count-= count;
914 buffpek->mem_count= count;
915 }
916 return (((my_off_t) count) * sort_length);
917} /* read_to_buffer_varlen */
918
919
920static int write_merge_key_varlen(MI_SORT_PARAM *info,
921 IO_CACHE *to_file, uchar* key,
922 uint sort_length, ha_keys count)
923{
924 ha_keys idx;
925 uchar *bufs = key;
926
927 for (idx=1;idx<=count;idx++)
928 {
929 int err;
930 if ((err= my_var_write(info, to_file, bufs)))
931 return (err);
932 bufs=bufs+sort_length;
933 }
934 return(0);
935}
936
937
938static int write_merge_key(MI_SORT_PARAM *info __attribute__((unused)),
939 IO_CACHE *to_file, uchar *key,
940 uint sort_length, ha_keys count)
941{
942 return my_b_write(to_file, key, (size_t) (sort_length * count));
943}
944
945/*
946 Merge buffers to one buffer
947 If to_file == 0 then use info->key_write
948
949 Return:
950 0 ok
951 1 error
952*/
953
954static int
955merge_buffers(MI_SORT_PARAM *info, ha_keys keys, IO_CACHE *from_file,
956 IO_CACHE *to_file, uchar **sort_keys, BUFFPEK *lastbuff,
957 BUFFPEK *Fb, BUFFPEK *Tb)
958{
959 int error= 1;
960 uint sort_length;
961 ha_keys maxcount;
962 ha_rows count;
963 my_off_t UNINIT_VAR(to_start_filepos), read_length;
964 uchar *strpos;
965 BUFFPEK *buffpek,**refpek;
966 QUEUE queue;
967 DBUG_ENTER("merge_buffers");
968
969 count=error=0;
970 maxcount= keys/((uint) (Tb-Fb) +1);
971 DBUG_ASSERT(maxcount > 0);
972 if (to_file)
973 to_start_filepos=my_b_tell(to_file);
974 strpos=(uchar*) sort_keys;
975 sort_length=info->key_length;
976
977 if (init_queue(&queue,(uint) (Tb-Fb)+1,offsetof(BUFFPEK,key),0,
978 (int (*)(void*, uchar *,uchar*)) info->key_cmp,
979 (void*) info, 0, 0))
980 DBUG_RETURN(1); /* purecov: inspected */
981
982 for (buffpek= Fb ; buffpek <= Tb ; buffpek++)
983 {
984 count+= buffpek->count;
985 buffpek->base= (uchar*) strpos;
986 buffpek->max_keys= maxcount;
987 strpos+= (read_length= info->read_to_buffer(from_file,buffpek,
988 sort_length));
989 if (read_length == HA_OFFSET_ERROR)
990 goto err; /* purecov: inspected */
991 queue_insert(&queue,(uchar*) buffpek);
992 }
993
994 while (queue.elements > 1)
995 {
996 for (;;)
997 {
998 buffpek=(BUFFPEK*) queue_top(&queue);
999 if (to_file)
1000 {
1001 if (info->write_key(info,to_file,(uchar*) buffpek->key,
1002 sort_length, 1))
1003 {
1004 error=1; goto err; /* purecov: inspected */
1005 }
1006 }
1007 else
1008 {
1009 if ((*info->key_write)(info,(void*) buffpek->key))
1010 {
1011 error=1; goto err; /* purecov: inspected */
1012 }
1013 }
1014 buffpek->key+=sort_length;
1015 if (! --buffpek->mem_count)
1016 {
1017 /* It's enough to check for killedptr before a slow operation */
1018 if (killed_ptr(info->sort_info->param))
1019 {
1020 goto err;
1021 }
1022 if (!(read_length= info->read_to_buffer(from_file,buffpek,sort_length)))
1023 {
1024 uchar *base= buffpek->base;
1025 ha_keys max_keys=buffpek->max_keys;
1026
1027 queue_remove_top(&queue);
1028
1029 /* Put room used by buffer to use in other buffer */
1030 for (refpek= (BUFFPEK**) &queue_top(&queue);
1031 refpek <= (BUFFPEK**) &queue_end(&queue);
1032 refpek++)
1033 {
1034 buffpek= *refpek;
1035 if (buffpek->base+buffpek->max_keys*sort_length == base)
1036 {
1037 buffpek->max_keys+=max_keys;
1038 break;
1039 }
1040 else if (base+max_keys*sort_length == buffpek->base)
1041 {
1042 buffpek->base=base;
1043 buffpek->max_keys+=max_keys;
1044 break;
1045 }
1046 }
1047 break; /* One buffer have been removed */
1048 }
1049 else if (read_length == HA_OFFSET_ERROR)
1050 goto err; /* purecov: inspected */
1051 }
1052 queue_replace_top(&queue); /* Top element has been replaced */
1053 }
1054 }
1055 buffpek=(BUFFPEK*) queue_top(&queue);
1056 buffpek->base= (uchar*) sort_keys;
1057 buffpek->max_keys=keys;
1058 do
1059 {
1060 if (to_file)
1061 {
1062 if (info->write_key(info,to_file,(uchar*) buffpek->key,
1063 sort_length,buffpek->mem_count))
1064 {
1065 error=1; goto err; /* purecov: inspected */
1066 }
1067 }
1068 else
1069 {
1070 register uchar *end;
1071 strpos= (uchar*) buffpek->key;
1072 for (end=strpos+buffpek->mem_count*sort_length;
1073 strpos != end ;
1074 strpos+=sort_length)
1075 {
1076 if ((*info->key_write)(info,(void*) strpos))
1077 {
1078 error=1; goto err; /* purecov: inspected */
1079 }
1080 }
1081 }
1082 }
1083 while ((read_length= info->read_to_buffer(from_file,buffpek,sort_length)) != HA_OFFSET_ERROR && read_length != 0);
1084 if (read_length == 0)
1085 error= 0;
1086
1087 lastbuff->count=count;
1088 if (to_file)
1089 lastbuff->file_pos=to_start_filepos;
1090err:
1091 delete_queue(&queue);
1092 DBUG_RETURN(error);
1093} /* merge_buffers */
1094
1095
1096 /* Do a merge to output-file (save only positions) */
1097
1098static int
1099merge_index(MI_SORT_PARAM *info, ha_keys keys, uchar **sort_keys,
1100 BUFFPEK *buffpek, uint maxbuffer, IO_CACHE *tempfile)
1101{
1102 DBUG_ENTER("merge_index");
1103 if (merge_buffers(info,keys,tempfile,(IO_CACHE*) 0,sort_keys,buffpek,buffpek,
1104 buffpek+maxbuffer))
1105 DBUG_RETURN(1); /* purecov: inspected */
1106 DBUG_RETURN(0);
1107} /* merge_index */
1108
1109
1110static int
1111flush_ft_buf(MI_SORT_PARAM *info)
1112{
1113 int err=0;
1114 if (info->sort_info->ft_buf)
1115 {
1116 err=sort_ft_buf_flush(info);
1117 my_free(info->sort_info->ft_buf);
1118 info->sort_info->ft_buf=0;
1119 }
1120 return err;
1121}
1122