1/* Copyright (C) 2006 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
15
/*
  Creates an index for a database by reading keys, sorting them and outputting
  them in sorted order through MARIA_SORT_INFO functions.
*/
20
21#include "ma_fulltext.h"
22#include <my_check_opt.h>
23#if defined(MSDOS) || defined(__WIN__)
24#include <fcntl.h>
25#else
26#include <stddef.h>
27#endif
28#include <queues.h>
29
/* File-local tunables for the external merge sort (macros, not variables) */

#undef MIN_SORT_MEMORY
#undef MYF_RW
#undef DISK_BUFFER_SIZE

/* Number of runs merged together in one pass of merge_many_buff() */
#define MERGEBUFF 15
/* merge_many_buff() keeps merging while the last run index is >= this */
#define MERGEBUFF2 31
/* Smallest sort buffer the allocation loops will try before giving up */
#define MIN_SORT_MEMORY (4096-MALLOC_OVERHEAD)
/* NOTE(review): not referenced in the code shown; callers use param->myf_rw */
#define MYF_RW MYF(MY_NABP | MY_WME | MY_WAIT_IF_FULL)
/* IO_CACHE buffer size passed to open_cached_file() for temporary files */
#define DISK_BUFFER_SIZE (IO_SIZE*128)

/* How many keys we can keep in memory */
typedef ulonglong ha_keys;
44
/*
  External printf-style error reporter.
  NOTE(review): not referenced in the code shown in this file.
*/

extern void print_error(const char *fmt,...);

/* Functions defined in this file */

static ha_rows find_all_keys(MARIA_SORT_PARAM *info, ha_keys keys,
                             uchar **sort_keys,
                             DYNAMIC_ARRAY *buffpek,uint *maxbuffer,
                             IO_CACHE *tempfile,
                             IO_CACHE *tempfile_for_exceptions);
static int write_keys(MARIA_SORT_PARAM *info,uchar **sort_keys,
                      ha_keys count, BUFFPEK *buffpek,IO_CACHE *tempfile);
static int write_key(MARIA_SORT_PARAM *info, uchar *key,
                     IO_CACHE *tempfile);
static int write_index(MARIA_SORT_PARAM *info, uchar **sort_keys,
                       ha_keys count);
static int merge_many_buff(MARIA_SORT_PARAM *info, ha_keys keys,
                           uchar **sort_keys,
                           BUFFPEK *buffpek, uint *maxbuffer,
                           IO_CACHE *t_file);
static my_off_t read_to_buffer(IO_CACHE *fromfile,BUFFPEK *buffpek,
                               uint sort_length);
static int merge_buffers(MARIA_SORT_PARAM *info, ha_keys keys,
                         IO_CACHE *from_file, IO_CACHE *to_file,
                         uchar **sort_keys, BUFFPEK *lastbuff,
                         BUFFPEK *Fb, BUFFPEK *Tb);
static int merge_index(MARIA_SORT_PARAM *,ha_keys,uchar **,BUFFPEK *, uint,
                       IO_CACHE *);
static int flush_maria_ft_buf(MARIA_SORT_PARAM *info);

/*
  Variable-length counterparts used to store keys in, and read keys from,
  the temporary files (selected by set_sort_param_read_write()).
*/
static int write_keys_varlen(MARIA_SORT_PARAM *info,uchar **sort_keys,
                             ha_keys count, BUFFPEK *buffpek,
                             IO_CACHE *tempfile);
static my_off_t read_to_buffer_varlen(IO_CACHE *fromfile,BUFFPEK *buffpek,
                                      uint sort_length);
static int write_merge_key(MARIA_SORT_PARAM *info, IO_CACHE *to_file,
                           uchar *key, uint sort_length, ha_keys count);
static int write_merge_key_varlen(MARIA_SORT_PARAM *info,
                                  IO_CACHE *to_file,
                                  uchar* key, uint sort_length,
                                  ha_keys count);
static inline int
my_var_write(MARIA_SORT_PARAM *info, IO_CACHE *to_file, uchar *bufs);
91
92/*
93 Sets the appropriate read and write methods for the MARIA_SORT_PARAM
94 based on the variable length key flag.
95*/
96static void set_sort_param_read_write(MARIA_SORT_PARAM *sort_param)
97{
98 if (sort_param->keyinfo->flag & HA_VAR_LENGTH_KEY)
99 {
100 sort_param->write_keys= write_keys_varlen;
101 sort_param->read_to_buffer= read_to_buffer_varlen;
102 sort_param->write_key= write_merge_key_varlen;
103 }
104 else
105 {
106 sort_param->write_keys= write_keys;
107 sort_param->read_to_buffer= read_to_buffer;
108 sort_param->write_key= write_merge_key;
109 }
110}
111
112
113/*
114 Creates a index of sorted keys
115
116 SYNOPSIS
117 _ma_create_index_by_sort()
118 info Sort parameters
119 no_messages Set to 1 if no output
120 sortbuff_size Size of sortbuffer to allocate
121
122 RESULT
123 0 ok
124 <> 0 Error
125*/
126
127int _ma_create_index_by_sort(MARIA_SORT_PARAM *info, my_bool no_messages,
128 size_t sortbuff_size)
129{
130 int error;
131 uint sort_length, maxbuffer;
132 size_t memavl, old_memavl;
133 DYNAMIC_ARRAY buffpek;
134 ha_rows records, UNINIT_VAR(keys);
135 uchar **sort_keys;
136 IO_CACHE tempfile, tempfile_for_exceptions;
137 DBUG_ENTER("_ma_create_index_by_sort");
138 DBUG_PRINT("enter",("sort_buff_size: %lu sort_length: %d max_records: %lu",
139 (ulong) sortbuff_size, info->key_length,
140 (ulong) info->sort_info->max_records));
141
142 set_sort_param_read_write(info);
143
144 my_b_clear(&tempfile);
145 my_b_clear(&tempfile_for_exceptions);
146 bzero((char*) &buffpek,sizeof(buffpek));
147 sort_keys= (uchar **) NULL; error= 1;
148 maxbuffer=1;
149
150 memavl=MY_MAX(sortbuff_size,MIN_SORT_MEMORY);
151 records= info->sort_info->max_records;
152 sort_length= info->key_length;
153
154 while (memavl >= MIN_SORT_MEMORY)
155 {
156 /* Check if we can fit all keys into memory */
157 if (((ulonglong) (records + 1) *
158 (sort_length + sizeof(char*)) <= memavl))
159 keys= records+1;
160 else if ((info->sort_info->param->testflag &
161 (T_FORCE_SORT_MEMORY | T_CREATE_MISSING_KEYS)) ==
162 T_FORCE_SORT_MEMORY)
163 {
164 /*
165 Use all of the given sort buffer for key data.
166 Allocate 1000 buffers at a start for new data. More buffers
167 will be allocated when needed.
168 */
169 keys= memavl / (sort_length+sizeof(char*));
170 maxbuffer= (uint) MY_MIN((ulonglong) 1000, (records / keys)+1);
171 }
172 else
173 {
174 /*
175 All keys can't fit in memory.
176 Calculate how many keys + buffers we can keep in memory
177 */
178 uint maxbuffer_org;
179 do
180 {
181 maxbuffer_org= maxbuffer;
182 if (memavl < sizeof(BUFFPEK) * maxbuffer ||
183 (keys= (memavl-sizeof(BUFFPEK)*maxbuffer)/
184 (sort_length+sizeof(char*))) <= 1 ||
185 keys < maxbuffer)
186 {
187 _ma_check_print_error(info->sort_info->param,
188 "aria_sort_buffer_size is too small. Current aria_sort_buffer_size: %llu rows: %llu sort_length: %u",
189 (ulonglong) sortbuff_size, (ulonglong) records,
190 sort_length);
191 my_errno= ENOMEM;
192 goto err;
193 }
194 }
195 while ((maxbuffer= (uint) (records/(keys-1)+1)) != maxbuffer_org);
196 }
197
198 if ((sort_keys= ((uchar**)
199 my_malloc((size_t) (keys*(sort_length+sizeof(char*))+
200 HA_FT_MAXBYTELEN),
201 MYF(0)))))
202 {
203 if (my_init_dynamic_array(&buffpek, sizeof(BUFFPEK), maxbuffer,
204 MY_MIN(maxbuffer/2, 1000), MYF(0)))
205 {
206 my_free(sort_keys);
207 sort_keys= 0;
208 }
209 else
210 break;
211 }
212 old_memavl=memavl;
213 if ((memavl=memavl/4*3) < MIN_SORT_MEMORY && old_memavl > MIN_SORT_MEMORY)
214 memavl=MIN_SORT_MEMORY;
215 }
216 if (memavl < MIN_SORT_MEMORY)
217 {
218 /* purecov: begin inspected */
219 _ma_check_print_error(info->sort_info->param,
220 "aria_sort_buffer_size is too small. Current aria_sort_buffer_size: %llu rows: %llu sort_length: %u",
221 (ulonglong) sortbuff_size, (ulonglong) records, sort_length);
222 my_errno= ENOMEM;
223 goto err;
224 /* purecov: end inspected */
225 }
226 (*info->lock_in_memory)(info->sort_info->param);/* Everything is allocated */
227
228 if (!no_messages)
229 my_fprintf(stdout,
230 " - Searching for keys, allocating buffer for %llu keys\n",
231 (ulonglong) keys);
232
233 if ((records=find_all_keys(info,keys,sort_keys,&buffpek,&maxbuffer,
234 &tempfile,&tempfile_for_exceptions))
235 == HA_POS_ERROR)
236 goto err; /* purecov: tested */
237
238 info->sort_info->param->stage++; /* Merge stage */
239
240 if (maxbuffer == 0)
241 {
242 if (!no_messages)
243 my_fprintf(stdout, " - Dumping %llu keys\n", (ulonglong) records);
244 if (write_index(info, sort_keys, (ha_keys) records))
245 goto err; /* purecov: inspected */
246 }
247 else
248 {
249 keys=(keys*(sort_length+sizeof(char*)))/sort_length;
250 if (maxbuffer >= MERGEBUFF2)
251 {
252 if (!no_messages)
253 my_fprintf(stdout, " - Merging %llu keys\n",
254 (ulonglong) records); /* purecov: tested */
255 if (merge_many_buff(info,keys,sort_keys,
256 dynamic_element(&buffpek,0,BUFFPEK *),&maxbuffer,&tempfile))
257 goto err; /* purecov: inspected */
258 }
259 if (flush_io_cache(&tempfile) ||
260 reinit_io_cache(&tempfile,READ_CACHE,0L,0,0))
261 goto err; /* purecov: inspected */
262 if (!no_messages)
263 printf(" - Last merge and dumping keys\n"); /* purecov: tested */
264 if (merge_index(info,keys,sort_keys,dynamic_element(&buffpek,0,BUFFPEK *),
265 maxbuffer,&tempfile))
266 goto err; /* purecov: inspected */
267 }
268
269 if (flush_maria_ft_buf(info) || _ma_flush_pending_blocks(info))
270 goto err;
271
272 if (my_b_inited(&tempfile_for_exceptions))
273 {
274 MARIA_HA *idx=info->sort_info->info;
275 uint16 key_length;
276 MARIA_KEY key;
277 key.keyinfo= idx->s->keyinfo + info->key;
278
279 if (!no_messages)
280 printf(" - Adding exceptions\n"); /* purecov: tested */
281 if (flush_io_cache(&tempfile_for_exceptions) ||
282 reinit_io_cache(&tempfile_for_exceptions,READ_CACHE,0L,0,0))
283 goto err;
284
285 while (!my_b_read(&tempfile_for_exceptions,(uchar*)&key_length,
286 sizeof(key_length))
287 && !my_b_read(&tempfile_for_exceptions,(uchar*)sort_keys,
288 (uint) key_length))
289 {
290 key.data= (uchar*) sort_keys;
291 key.ref_length= idx->s->rec_reflength;
292 key.data_length= key_length - key.ref_length;
293 key.flag= 0;
294 if (_ma_ck_write(idx, &key))
295 goto err;
296 }
297 }
298
299 error =0;
300
301err:
302 my_free(sort_keys);
303 delete_dynamic(&buffpek);
304 close_cached_file(&tempfile);
305 close_cached_file(&tempfile_for_exceptions);
306
307 DBUG_RETURN(error ? -1 : 0);
308} /* _ma_create_index_by_sort */
309
310
311/* Search after all keys and place them in a temp. file */
312
313static ha_rows find_all_keys(MARIA_SORT_PARAM *info, ha_rows keys,
314 uchar **sort_keys, DYNAMIC_ARRAY *buffpek,
315 uint *maxbuffer, IO_CACHE *tempfile,
316 IO_CACHE *tempfile_for_exceptions)
317{
318 int error;
319 ha_rows idx;
320 DBUG_ENTER("find_all_keys");
321
322 idx=error=0;
323 sort_keys[0]= (uchar*) (sort_keys+keys);
324
325 info->sort_info->info->in_check_table= 1;
326 while (!(error=(*info->key_read)(info,sort_keys[idx])))
327 {
328 if (info->real_key_length > info->key_length)
329 {
330 if (write_key(info,sort_keys[idx],tempfile_for_exceptions))
331 goto err; /* purecov: inspected */
332 continue;
333 }
334
335 if (++idx == keys)
336 {
337 if (info->write_keys(info,sort_keys,idx-1,
338 (BUFFPEK *)alloc_dynamic(buffpek),
339 tempfile))
340 goto err; /* purecov: inspected */
341
342 sort_keys[0]=(uchar*) (sort_keys+keys);
343 memcpy(sort_keys[0],sort_keys[idx-1],(size_t) info->key_length);
344 idx=1;
345 }
346 sort_keys[idx]=sort_keys[idx-1]+info->key_length;
347 }
348 if (error > 0)
349 goto err; /* purecov: inspected */
350 if (buffpek->elements)
351 {
352 if (info->write_keys(info,sort_keys,idx,(BUFFPEK *)alloc_dynamic(buffpek),
353 tempfile))
354 goto err; /* purecov: inspected */
355 *maxbuffer=buffpek->elements-1;
356 }
357 else
358 *maxbuffer=0;
359
360 info->sort_info->info->in_check_table= 0;
361 DBUG_RETURN((*maxbuffer)*(keys-1)+idx);
362
363err:
364 info->sort_info->info->in_check_table= 0; /* purecov: inspected */
365 DBUG_RETURN(HA_POS_ERROR); /* purecov: inspected */
366} /* find_all_keys */
367
368
369static my_bool _ma_thr_find_all_keys_exec(MARIA_SORT_PARAM* sort_param)
370{
371 int error= 0;
372 ulonglong memavl, old_memavl;
373 longlong sortbuff_size;
374 ha_keys UNINIT_VAR(keys), idx;
375 uint sort_length;
376 uint maxbuffer;
377 uchar **sort_keys= NULL;
378 DBUG_ENTER("_ma_thr_find_all_keys_exec");
379 DBUG_PRINT("enter", ("master: %d", sort_param->master));
380
381 if (sort_param->sort_info->got_error)
382 DBUG_RETURN(TRUE);
383
384 set_sort_param_read_write(sort_param);
385
386 my_b_clear(&sort_param->tempfile);
387 my_b_clear(&sort_param->tempfile_for_exceptions);
388 bzero((char*) &sort_param->buffpek, sizeof(sort_param->buffpek));
389 bzero((char*) &sort_param->unique, sizeof(sort_param->unique));
390
391 sortbuff_size= sort_param->sortbuff_size;
392 memavl= MY_MAX(sortbuff_size, MIN_SORT_MEMORY);
393 idx= (ha_keys) sort_param->sort_info->max_records;
394 sort_length= sort_param->key_length;
395 maxbuffer= 1;
396
397 while (memavl >= MIN_SORT_MEMORY)
398 {
399 if ((my_off_t) (idx+1)*(sort_length+sizeof(char*)) <= (my_off_t) memavl)
400 keys= idx+1;
401 else if ((sort_param->sort_info->param->testflag &
402 (T_FORCE_SORT_MEMORY | T_CREATE_MISSING_KEYS)) ==
403 T_FORCE_SORT_MEMORY)
404 {
405 /*
406 Use all of the given sort buffer for key data.
407 Allocate 1000 buffers at a start for new data. More buffers
408 will be allocated when needed.
409 */
410 keys= memavl / (sort_length+sizeof(char*));
411 maxbuffer= (uint) MY_MIN((ulonglong) 1000, (idx / keys)+1);
412 }
413 else
414 {
415 uint maxbuffer_org;
416 do
417 {
418 maxbuffer_org= maxbuffer;
419 if (memavl < sizeof(BUFFPEK)*maxbuffer ||
420 (keys=(memavl-sizeof(BUFFPEK)*maxbuffer)/
421 (sort_length+sizeof(char*))) <= 1 ||
422 keys < maxbuffer)
423 {
424 _ma_check_print_error(sort_param->sort_info->param,
425 "aria_sort_buffer_size is too small. Current aria_sort_buffer_size: %llu rows: %llu sort_length: %u",
426 sortbuff_size, (ulonglong) idx, sort_length);
427 goto err;
428 }
429 }
430 while ((maxbuffer= (uint) (idx/(keys-1)+1)) != maxbuffer_org);
431 }
432 if ((sort_keys= (uchar **)
433 my_malloc((size_t)(keys*(sort_length+sizeof(char*))+
434 ((sort_param->keyinfo->flag & HA_FULLTEXT) ?
435 HA_FT_MAXBYTELEN : 0)), MYF(0))))
436 {
437 if (my_init_dynamic_array(&sort_param->buffpek, sizeof(BUFFPEK),
438 maxbuffer, MY_MIN(maxbuffer / 2, 1000), MYF(0)))
439 {
440 my_free(sort_keys);
441 sort_keys= NULL; /* Safety against double free on error. */
442 }
443 else
444 break;
445 }
446 old_memavl= memavl;
447 if ((memavl= memavl/4*3) < MIN_SORT_MEMORY &&
448 old_memavl > MIN_SORT_MEMORY)
449 memavl= MIN_SORT_MEMORY;
450 }
451 if (memavl < MIN_SORT_MEMORY)
452 {
453 /* purecov: begin inspected */
454 _ma_check_print_error(sort_param->sort_info->param,
455 "aria_sort_buffer_size is too small. Current aria_sort_buffer_size: %llu rows: %llu sort_length: %u",
456 sortbuff_size, (ulonglong) idx, sort_length);
457 my_errno= ENOMEM;
458 goto err;
459 /* purecov: end inspected */
460 }
461
462 if (sort_param->sort_info->param->testflag & T_VERBOSE)
463 my_fprintf(stdout,
464 "Key %d - Allocating buffer for %llu keys\n",
465 sort_param->key + 1, (ulonglong) keys);
466 sort_param->sort_keys= sort_keys;
467
468 idx= error= 0;
469 sort_keys[0]= (uchar*) (sort_keys+keys);
470
471 DBUG_PRINT("info", ("reading keys"));
472 while (!(error= sort_param->sort_info->got_error) &&
473 !(error= (*sort_param->key_read)(sort_param, sort_keys[idx])))
474 {
475 if (sort_param->real_key_length > sort_param->key_length)
476 {
477 if (write_key(sort_param, sort_keys[idx],
478 &sort_param->tempfile_for_exceptions))
479 goto err;
480 continue;
481 }
482
483 if (++idx == keys)
484 {
485 if (sort_param->write_keys(sort_param, sort_keys, idx - 1,
486 (BUFFPEK *)alloc_dynamic(&sort_param->buffpek),
487 &sort_param->tempfile))
488 goto err;
489 sort_keys[0]= (uchar*) (sort_keys+keys);
490 memcpy(sort_keys[0], sort_keys[idx - 1], (size_t) sort_param->key_length);
491 idx= 1;
492 }
493 sort_keys[idx]= sort_keys[idx - 1] + sort_param->key_length;
494 }
495 if (error > 0)
496 goto err;
497 if (sort_param->buffpek.elements)
498 {
499 if (sort_param->write_keys(sort_param,sort_keys, idx,
500 (BUFFPEK *) alloc_dynamic(&sort_param->buffpek),
501 &sort_param->tempfile))
502 goto err;
503 sort_param->keys= (uint)((sort_param->buffpek.elements - 1) * (keys - 1) + idx);
504 }
505 else
506 sort_param->keys= (uint)idx;
507
508 DBUG_RETURN(FALSE);
509
510err:
511 DBUG_PRINT("error", ("got some error"));
512 my_free(sort_keys);
513 sort_param->sort_keys= 0;
514 delete_dynamic(& sort_param->buffpek);
515 close_cached_file(&sort_param->tempfile);
516 close_cached_file(&sort_param->tempfile_for_exceptions);
517
518 DBUG_RETURN(TRUE);
519}
520
521/* Search after all keys and place them in a temp. file */
522
523pthread_handler_t _ma_thr_find_all_keys(void *arg)
524{
525 MARIA_SORT_PARAM *sort_param= (MARIA_SORT_PARAM*) arg;
526 my_bool error= FALSE;
527 /* If my_thread_init fails */
528 if (my_thread_init() || _ma_thr_find_all_keys_exec(sort_param))
529 error= TRUE;
530
531 /*
532 Thread must clean up after itself.
533 */
534 free_root(&sort_param->wordroot, MYF(0));
535 /*
536 Detach from the share if the writer is involved. Avoid others to
537 be blocked. This includes a flush of the write buffer. This will
538 also indicate EOF to the readers.
539 That means that a writer always gets here first and readers -
540 only when they see EOF. But if a reader finishes prematurely
541 because of an error it may reach this earlier - don't allow it
542 to detach the writer thread.
543 */
544 if (sort_param->master && sort_param->sort_info->info->rec_cache.share)
545 remove_io_thread(&sort_param->sort_info->info->rec_cache);
546
547 /* Readers detach from the share if any. Avoid others to be blocked. */
548 if (sort_param->read_cache.share)
549 remove_io_thread(&sort_param->read_cache);
550
551 mysql_mutex_lock(&sort_param->sort_info->mutex);
552 if (error)
553 sort_param->sort_info->got_error= 1;
554
555 if (!--sort_param->sort_info->threads_running)
556 mysql_cond_signal(&sort_param->sort_info->cond);
557 mysql_mutex_unlock(&sort_param->sort_info->mutex);
558
559 my_thread_end();
560 return NULL;
561}
562
563
/*
  Write the keys collected by the parallel key-collection threads into the
  index files.

  sort_param points to an array of sort_info->total_keys entries, one per
  index. Pass 1 writes every index whose keys all stayed in memory and
  frees the in-memory key buffers. Pass 2 merges the on-disk runs of the
  remaining indexes, inserts the over-long "exception" keys with
  _ma_ck_write(), optionally updates the key statistics, and releases each
  entry's buffpek array and temporary files.

  RETURN
    0      ok
    <> 0   error (also when sort_info->got_error was already set on entry)
*/
int _ma_thr_write_keys(MARIA_SORT_PARAM *sort_param)
{
  MARIA_SORT_INFO *sort_info=sort_param->sort_info;
  HA_CHECK *param=sort_info->param;
  size_t UNINIT_VAR(length), keys;
  double *rec_per_key_part= param->new_rec_per_key_part;
  int got_error=sort_info->got_error;
  uint i;
  MARIA_HA *info=sort_info->info;
  MARIA_SHARE *share= info->s;
  MARIA_SORT_PARAM *sinfo;
  uchar *mergebuf=0;
  DBUG_ENTER("_ma_thr_write_keys");

  /* Pass 1: write indexes that were fully sorted in memory */
  for (i= 0, sinfo= sort_param ;
       i < sort_info->total_keys ;
       i++, sinfo++)
  {
    if (!sinfo->sort_keys)
    {
      /*
        No key buffer: presumably the collecting thread failed
        (_ma_thr_find_all_keys_exec() clears sort_keys on error).
      */
      got_error=1;
      my_free(sinfo->rec_buff);
      continue;
    }
    if (!got_error)
    {
      maria_set_key_active(share->state.key_map, sinfo->key);

      if (!sinfo->buffpek.elements)
      {
        if (param->testflag & T_VERBOSE)
        {
          my_fprintf(stdout,
                     "Key %d - Dumping %llu keys\n", sinfo->key+1,
                     (ulonglong) sinfo->keys);
          fflush(stdout);
        }
        if (write_index(sinfo, sinfo->sort_keys, sinfo->keys) ||
            flush_maria_ft_buf(sinfo) || _ma_flush_pending_blocks(sinfo))
          got_error=1;
      }
    }
    my_free(sinfo->sort_keys);
    my_free(sinfo->rec_buff);
    sinfo->sort_keys=0;
  }

  /*
    Pass 2: merge indexes that spilled runs to disk. The for-increment
    releases each entry's buffpek and temp files and advances
    rec_per_key_part, so this cleanup also runs after every 'continue'.
  */
  for (i= 0, sinfo= sort_param ;
       i < sort_info->total_keys ;
       i++,
       delete_dynamic(&sinfo->buffpek),
       close_cached_file(&sinfo->tempfile),
       close_cached_file(&sinfo->tempfile_for_exceptions),
       rec_per_key_part+= sinfo->keyinfo->keysegs,
       sinfo++)
  {
    if (got_error)
      continue;

    set_sort_param_read_write(sinfo);

    if (sinfo->buffpek.elements)
    {
      uint maxbuffer=sinfo->buffpek.elements-1;
      if (!mergebuf)
      {
        /*
          Allocate one merge buffer, shared by all indexes; shrink the
          request by 1/4 on failure down to MIN_SORT_MEMORY.
        */
        length=(size_t)param->sort_buffer_length;
        while (length >= MIN_SORT_MEMORY)
        {
          if ((mergebuf= my_malloc((size_t) length, MYF(0))))
            break;
          length=length*3/4;
        }
        if (!mergebuf)
        {
          got_error=1;
          continue;
        }
      }
      /* Number of key slots of this index that fit in mergebuf */
      keys=length/sinfo->key_length;
      if (maxbuffer >= MERGEBUFF2)
      {
        if (param->testflag & T_VERBOSE)
          my_fprintf(stdout,
                     "Key %d - Merging %llu keys\n",
                     sinfo->key+1, (ulonglong) sinfo->keys);
        if (merge_many_buff(sinfo, keys, (uchar **)mergebuf,
                            dynamic_element(&sinfo->buffpek, 0, BUFFPEK *),
                            &maxbuffer, &sinfo->tempfile))
        {
          got_error=1;
          continue;
        }
      }
      if (flush_io_cache(&sinfo->tempfile) ||
          reinit_io_cache(&sinfo->tempfile,READ_CACHE,0L,0,0))
      {
        got_error=1;
        continue;
      }
      if (param->testflag & T_VERBOSE)
        printf("Key %d - Last merge and dumping keys\n", sinfo->key+1);
      if (merge_index(sinfo, keys, (uchar**) mergebuf,
                      dynamic_element(&sinfo->buffpek,0,BUFFPEK *),
                      maxbuffer,&sinfo->tempfile) ||
          flush_maria_ft_buf(sinfo) ||
          _ma_flush_pending_blocks(sinfo))
      {
        got_error=1;
        continue;
      }
    }
    /* Insert keys that were too long for a sort slot, one at a time */
    if (my_b_inited(&sinfo->tempfile_for_exceptions))
    {
      uint16 key_length;

      if (param->testflag & T_VERBOSE)
        printf("Key %d - Dumping 'long' keys\n", sinfo->key+1);

      if (flush_io_cache(&sinfo->tempfile_for_exceptions) ||
          reinit_io_cache(&sinfo->tempfile_for_exceptions,READ_CACHE,0L,0,0))
      {
        got_error=1;
        continue;
      }

      /* Records are <uint16 length><key bytes>, as written by write_key() */
      while (!got_error &&
             !my_b_read(&sinfo->tempfile_for_exceptions,(uchar*)&key_length,
                        sizeof(key_length)))
      {
        uchar maria_ft_buf[HA_FT_MAXBYTELEN + HA_FT_WLEN + 10];
        /* Reject lengths that would overflow the stack buffer */
        if (key_length > sizeof(maria_ft_buf) ||
            my_b_read(&sinfo->tempfile_for_exceptions, (uchar*)maria_ft_buf,
                      (uint) key_length))
          got_error= 1;
        else
        {
          MARIA_KEY tmp_key;
          tmp_key.keyinfo= info->s->keyinfo + sinfo->key;
          tmp_key.data= maria_ft_buf;
          tmp_key.ref_length= info->s->rec_reflength;
          tmp_key.data_length= key_length - info->s->rec_reflength;
          tmp_key.flag= 0;
          if (_ma_ck_write(info, &tmp_key))
            got_error=1;
        }
      }
    }
    if (!got_error && (param->testflag & T_STATISTICS))
      maria_update_key_parts(sinfo->keyinfo, rec_per_key_part, sinfo->unique,
                             param->stats_method ==
                             MI_STATS_METHOD_IGNORE_NULLS ?
                             sinfo->notnull : NULL,
                             (ulonglong) share->state.state.records);

  }
  my_free(mergebuf);
  DBUG_RETURN(got_error);
}
723
724
725/* Write all keys in memory to file for later merge */
726
727static int write_keys(MARIA_SORT_PARAM *info, register uchar **sort_keys,
728 ha_keys count, BUFFPEK *buffpek, IO_CACHE *tempfile)
729{
730 uchar **end;
731 uint sort_length=info->key_length;
732 DBUG_ENTER("write_keys");
733
734 if (!buffpek)
735 DBUG_RETURN(1); /* Out of memory */
736
737 my_qsort2((uchar*) sort_keys,(size_t) count, sizeof(uchar*),
738 (qsort2_cmp) info->key_cmp, info);
739 if (!my_b_inited(tempfile) &&
740 open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
741 DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
742 DBUG_RETURN(1); /* purecov: inspected */
743
744 buffpek->file_pos=my_b_tell(tempfile);
745 buffpek->count=count;
746
747 for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
748 {
749 if (my_b_write(tempfile, *sort_keys, sort_length))
750 DBUG_RETURN(1); /* purecov: inspected */
751 }
752 DBUG_RETURN(0);
753} /* write_keys */
754
755
756static inline int
757my_var_write(MARIA_SORT_PARAM *info, IO_CACHE *to_file, uchar *bufs)
758{
759 int err;
760 uint16 len= _ma_keylength(info->keyinfo, bufs);
761
762 /* The following is safe as this is a local file */
763 if ((err= my_b_write(to_file, (uchar*)&len, sizeof(len))))
764 return (err);
765 if ((err= my_b_write(to_file,bufs, (uint) len)))
766 return (err);
767 return (0);
768}
769
770
771static int write_keys_varlen(MARIA_SORT_PARAM *info,
772 register uchar **sort_keys,
773 ha_keys count, BUFFPEK *buffpek,
774 IO_CACHE *tempfile)
775{
776 uchar **end;
777 int err;
778 DBUG_ENTER("write_keys_varlen");
779
780 if (!buffpek)
781 DBUG_RETURN(1); /* Out of memory */
782
783 my_qsort2((uchar*) sort_keys, (size_t) count, sizeof(uchar*),
784 (qsort2_cmp) info->key_cmp, info);
785 if (!my_b_inited(tempfile) &&
786 open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
787 DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
788 DBUG_RETURN(1); /* purecov: inspected */
789
790 buffpek->file_pos=my_b_tell(tempfile);
791 buffpek->count=count;
792 for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
793 {
794 if ((err= my_var_write(info,tempfile, *sort_keys)))
795 DBUG_RETURN(err);
796 }
797 DBUG_RETURN(0);
798} /* write_keys_varlen */
799
800
801static int write_key(MARIA_SORT_PARAM *info, uchar *key,
802 IO_CACHE *tempfile)
803{
804 uint16 key_length=info->real_key_length;
805 DBUG_ENTER("write_key");
806
807 if (!my_b_inited(tempfile) &&
808 open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
809 DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
810 DBUG_RETURN(1);
811
812 if (my_b_write(tempfile, (uchar*)&key_length,sizeof(key_length)) ||
813 my_b_write(tempfile, key, (uint) key_length))
814 DBUG_RETURN(1);
815 DBUG_RETURN(0);
816} /* write_key */
817
818
819/* Write index */
820
821static int write_index(MARIA_SORT_PARAM *info, register uchar **sort_keys,
822 register ha_keys count)
823{
824 DBUG_ENTER("write_index");
825
826 my_qsort2((uchar*) sort_keys,(size_t) count,sizeof(uchar*),
827 (qsort2_cmp) info->key_cmp,info);
828 while (count--)
829 {
830 if ((*info->key_write)(info, *sort_keys++))
831 DBUG_RETURN(-1); /* purecov: inspected */
832 }
833 if (info->sort_info->param->max_stage != 1) /* If not parallel */
834 _ma_report_progress(info->sort_info->param, 1, 1);
835 DBUG_RETURN(0);
836} /* write_index */
837
838
839 /* Merge buffers to make < MERGEBUFF2 buffers */
840
841static int merge_many_buff(MARIA_SORT_PARAM *info, ha_keys keys,
842 uchar **sort_keys, BUFFPEK *buffpek,
843 uint *maxbuffer, IO_CACHE *t_file)
844{
845 uint tmp, merges, max_merges;
846 IO_CACHE t_file2, *from_file, *to_file, *temp;
847 BUFFPEK *lastbuff;
848 DBUG_ENTER("merge_many_buff");
849
850 if (*maxbuffer < MERGEBUFF2)
851 DBUG_RETURN(0); /* purecov: inspected */
852 if (flush_io_cache(t_file) ||
853 open_cached_file(&t_file2,my_tmpdir(info->tmpdir),"ST",
854 DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
855 DBUG_RETURN(1); /* purecov: inspected */
856
857 /* Calculate how many merges are needed */
858 max_merges= 1; /* Count merge_index */
859 tmp= *maxbuffer;
860 while (tmp >= MERGEBUFF2)
861 {
862 merges= (tmp-MERGEBUFF*3/2 + 1) / MERGEBUFF + 1;
863 max_merges+= merges;
864 tmp= merges;
865 }
866 merges= 0;
867
868 from_file= t_file ; to_file= &t_file2;
869 while (*maxbuffer >= MERGEBUFF2)
870 {
871 uint i;
872 reinit_io_cache(from_file,READ_CACHE,0L,0,0);
873 reinit_io_cache(to_file,WRITE_CACHE,0L,0,0);
874 lastbuff=buffpek;
875 for (i=0 ; i + MERGEBUFF*3/2 <= *maxbuffer ; i+=MERGEBUFF)
876 {
877 if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
878 buffpek+i,buffpek+i+MERGEBUFF-1))
879 goto cleanup;
880 if (info->sort_info->param->max_stage != 1) /* If not parallel */
881 _ma_report_progress(info->sort_info->param, merges++, max_merges);
882 }
883 if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
884 buffpek+i,buffpek+ *maxbuffer))
885 break; /* purecov: inspected */
886 if (flush_io_cache(to_file))
887 break; /* purecov: inspected */
888 temp=from_file; from_file=to_file; to_file=temp;
889 *maxbuffer= (uint) (lastbuff-buffpek)-1;
890 if (info->sort_info->param->max_stage != 1) /* If not parallel */
891 _ma_report_progress(info->sort_info->param, merges++, max_merges);
892 }
893cleanup:
894 close_cached_file(to_file); /* This holds old result */
895 if (to_file == t_file)
896 {
897 DBUG_ASSERT(t_file2.type == WRITE_CACHE);
898 *t_file=t_file2; /* Copy result file */
899 }
900
901 DBUG_RETURN(*maxbuffer >= MERGEBUFF2); /* Return 1 if interrupted */
902} /* merge_many_buff */
903
904
905/*
906 Read data to buffer
907
908 SYNOPSIS
909 read_to_buffer()
910 fromfile File to read from
911 buffpek Where to read from
912 sort_length max length to read
913 RESULT
914 > 0 Ammount of bytes read
915 -1 Error
916*/
917
918static my_off_t read_to_buffer(IO_CACHE *fromfile, BUFFPEK *buffpek,
919 uint sort_length)
920{
921 register ha_keys count;
922 size_t length;
923
924 if ((count= (ha_keys) MY_MIN((ha_rows) buffpek->max_keys,
925 (ha_rows) buffpek->count)))
926 {
927 if (my_b_pread(fromfile, (uchar*) buffpek->base,
928 (length= sort_length * (size_t)count), buffpek->file_pos))
929 return(HA_OFFSET_ERROR); /* purecov: inspected */
930 buffpek->key=buffpek->base;
931 buffpek->file_pos+= length; /* New filepos */
932 buffpek->count-= count;
933 buffpek->mem_count= count;
934 }
935 return (((my_off_t) count) * sort_length);
936} /* read_to_buffer */
937
938
939static my_off_t read_to_buffer_varlen(IO_CACHE *fromfile, BUFFPEK *buffpek,
940 uint sort_length)
941{
942 register ha_keys count;
943 uint idx;
944 uchar *buffp;
945
946 if ((count= (ha_keys) MY_MIN((ha_rows) buffpek->max_keys,buffpek->count)))
947 {
948 buffp= buffpek->base;
949
950 for (idx=1;idx<=count;idx++)
951 {
952 uint16 length_of_key;
953 if (my_b_pread(fromfile, (uchar*)&length_of_key,
954 sizeof(length_of_key), buffpek->file_pos))
955 return(HA_OFFSET_ERROR);
956 buffpek->file_pos+=sizeof(length_of_key);
957 if (my_b_pread(fromfile, (uchar*) buffp,
958 length_of_key, buffpek->file_pos))
959 return((uint) -1);
960 buffpek->file_pos+=length_of_key;
961 buffp = buffp + sort_length;
962 }
963 buffpek->key=buffpek->base;
964 buffpek->count-= count;
965 buffpek->mem_count= count;
966 }
967 return (((my_off_t) count) * sort_length);
968} /* read_to_buffer_varlen */
969
970
971static int write_merge_key_varlen(MARIA_SORT_PARAM *info,
972 IO_CACHE *to_file, uchar* key,
973 uint sort_length, ha_keys count)
974{
975 ha_keys idx;
976 uchar *bufs = key;
977
978 for (idx=1;idx<=count;idx++)
979 {
980 int err;
981 if ((err= my_var_write(info, to_file, bufs)))
982 return (err);
983 bufs=bufs+sort_length;
984 }
985 return(0);
986}
987
988
989static int write_merge_key(MARIA_SORT_PARAM *info __attribute__((unused)),
990 IO_CACHE *to_file, uchar *key,
991 uint sort_length, ha_keys count)
992{
993 return my_b_write(to_file, key, (size_t) (sort_length * count));
994}
995
/*
  Merge runs Fb..Tb (inclusive) into one run

  SYNOPSIS
    merge_buffers()
    info        Sort parameters
    keys        Number of sort_length-sized key slots available in sort_keys
    from_file   Temporary file holding the runs
    to_file     File for the merged run; if 0, every key is passed to
                info->key_write() instead
    sort_keys   Memory shared by the runs while merging
    lastbuff    Receives the merged run's key count (and, when to_file is
                set, its start position in to_file)
    Fb, Tb      First and last run to merge

  NOTES
    Each run starts with keys / (Tb-Fb+1) slots of the buffer. A queue
    ordered by info->key_cmp on BUFFPEK::key picks the next key to output.
    A run whose in-memory keys are used up is refilled from from_file; an
    exhausted run leaves the queue and its buffer space is handed to a run
    whose buffer is adjacent to it. The last remaining run is streamed
    through using the whole buffer.

  Return:
    0 ok
    1 error (including when _ma_killed_ptr() reports the thread was killed)
*/

static int
merge_buffers(MARIA_SORT_PARAM *info, ha_keys keys, IO_CACHE *from_file,
              IO_CACHE *to_file, uchar **sort_keys, BUFFPEK *lastbuff,
              BUFFPEK *Fb, BUFFPEK *Tb)
{
  int error= 1;
  uint sort_length;
  ha_keys maxcount;
  ha_rows count;
  my_off_t UNINIT_VAR(to_start_filepos), read_length;
  uchar *strpos;
  BUFFPEK *buffpek,**refpek;
  QUEUE queue;
  DBUG_ENTER("merge_buffers");

  count= 0;
  /* Equal share of the key buffer for every run */
  maxcount= keys/((uint) (Tb-Fb) +1);
  DBUG_ASSERT(maxcount > 0);
  if (to_file)
    to_start_filepos=my_b_tell(to_file);
  strpos= (uchar*) sort_keys;
  sort_length=info->key_length;

  if (init_queue(&queue,(uint) (Tb-Fb)+1,offsetof(BUFFPEK,key),0,
                 (int (*)(void*, uchar *,uchar*)) info->key_cmp,
                 (void*) info, 0, 0))
    DBUG_RETURN(1); /* purecov: inspected */

  /* Give each run its slice of the buffer and load its first keys */
  for (buffpek= Fb ; buffpek <= Tb ; buffpek++)
  {
    count+= buffpek->count;
    buffpek->base= strpos;
    buffpek->max_keys= maxcount;
    strpos+= (read_length= info->read_to_buffer(from_file,buffpek,
                                                sort_length));
    if (read_length == HA_OFFSET_ERROR)
      goto err; /* purecov: inspected */
    queue_insert(&queue,(uchar*) buffpek);
  }

  while (queue.elements > 1)
  {
    for (;;)
    {
      /* Output the current key of the run at the top of the queue */
      buffpek=(BUFFPEK*) queue_top(&queue);
      if (to_file)
      {
        if (info->write_key(info,to_file, buffpek->key,
                            sort_length, 1))
          goto err; /* purecov: inspected */
      }
      else
      {
        if ((*info->key_write)(info,(void*) buffpek->key))
          goto err; /* purecov: inspected */
      }
      buffpek->key+=sort_length;
      if (! --buffpek->mem_count)
      {
        /* It's enough to check for killedptr before a slow operation */
        if (_ma_killed_ptr(info->sort_info->param))
          goto err;
        if (!(read_length= info->read_to_buffer(from_file,buffpek,sort_length)))
        {
          /* Run exhausted: drop it and donate its buffer space */
          uchar *base= buffpek->base;
          ha_keys max_keys=buffpek->max_keys;

          queue_remove_top(&queue);

          /* Put room used by buffer to use in other buffer */
          for (refpek= (BUFFPEK**) &queue_top(&queue);
               refpek <= (BUFFPEK**) &queue_end(&queue);
               refpek++)
          {
            buffpek= *refpek;
            if (buffpek->base+buffpek->max_keys*sort_length == base)
            {
              buffpek->max_keys+=max_keys;
              break;
            }
            else if (base+max_keys*sort_length == buffpek->base)
            {
              buffpek->base=base;
              buffpek->max_keys+=max_keys;
              break;
            }
          }
          break; /* One buffer have been removed */
        }
        else if (read_length == HA_OFFSET_ERROR)
          goto err; /* purecov: inspected */
      }
      queue_replace_top(&queue); /* Top element has been replaced */
    }
  }
  /* Only one run left: stream the rest of it using the whole buffer */
  buffpek=(BUFFPEK*) queue_top(&queue);
  buffpek->base= (uchar*) sort_keys;
  buffpek->max_keys=keys;
  do
  {
    if (to_file)
    {
      if (info->write_key(info, to_file, buffpek->key,
                          sort_length,buffpek->mem_count))
      {
        error=1; goto err; /* purecov: inspected */
      }
    }
    else
    {
      register uchar *end;
      strpos= buffpek->key;
      for (end= strpos+buffpek->mem_count*sort_length;
           strpos != end ;
           strpos+=sort_length)
      {
        if ((*info->key_write)(info, strpos))
        {
          error=1; goto err; /* purecov: inspected */
        }
      }
    }
  }
  while ((read_length= info->read_to_buffer(from_file,buffpek,sort_length)) != HA_OFFSET_ERROR && read_length != 0);
  /* Success only if the loop ended on end-of-run, not on a read error */
  if (read_length == 0)
    error= 0;

  lastbuff->count=count;
  if (to_file)
    lastbuff->file_pos=to_start_filepos;
err:
  delete_queue(&queue);
  DBUG_RETURN(error);
} /* merge_buffers */
1139
1140
1141 /* Do a merge to output-file (save only positions) */
1142
1143static int
1144merge_index(MARIA_SORT_PARAM *info, ha_keys keys, uchar **sort_keys,
1145 BUFFPEK *buffpek, uint maxbuffer, IO_CACHE *tempfile)
1146{
1147 DBUG_ENTER("merge_index");
1148 if (merge_buffers(info,keys,tempfile,(IO_CACHE*) 0,sort_keys,buffpek,buffpek,
1149 buffpek+maxbuffer))
1150 DBUG_RETURN(1); /* purecov: inspected */
1151 if (info->sort_info->param->max_stage != 1) /* If not parallel */
1152 _ma_report_progress(info->sort_info->param, 1, 1);
1153 DBUG_RETURN(0);
1154} /* merge_index */
1155
1156
1157static int flush_maria_ft_buf(MARIA_SORT_PARAM *info)
1158{
1159 int err=0;
1160 if (info->sort_info->ft_buf)
1161 {
1162 err=_ma_sort_ft_buf_flush(info);
1163 my_free(info->sort_info->ft_buf);
1164 info->sort_info->ft_buf=0;
1165 }
1166 return err;
1167}
1168