1 | /* Copyright (C) 2006 MySQL AB & MySQL Finland AB & TCX DataKonsult AB |
2 | |
3 | This program is free software; you can redistribute it and/or modify |
4 | it under the terms of the GNU General Public License as published by |
5 | the Free Software Foundation; version 2 of the License. |
6 | |
7 | This program is distributed in the hope that it will be useful, |
8 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
9 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
10 | GNU General Public License for more details. |
11 | |
12 | You should have received a copy of the GNU General Public License |
13 | along with this program; if not, write to the Free Software |
14 | Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */ |
15 | |
16 | /* |
17 | Creates a index for a database by reading keys, sorting them and outputing |
18 | them in sorted order through MARIA_SORT_INFO functions. |
19 | */ |
20 | |
21 | #include "ma_fulltext.h" |
22 | #include <my_check_opt.h> |
23 | #if defined(MSDOS) || defined(__WIN__) |
24 | #include <fcntl.h> |
25 | #else |
26 | #include <stddef.h> |
27 | #endif |
28 | #include <queues.h> |
29 | |
30 | /* static variables */ |
31 | |
32 | #undef MIN_SORT_MEMORY |
33 | #undef MYF_RW |
34 | #undef DISK_BUFFER_SIZE |
35 | |
36 | #define MERGEBUFF 15 |
37 | #define MERGEBUFF2 31 |
38 | #define MIN_SORT_MEMORY (4096-MALLOC_OVERHEAD) |
39 | #define MYF_RW MYF(MY_NABP | MY_WME | MY_WAIT_IF_FULL) |
40 | #define DISK_BUFFER_SIZE (IO_SIZE*128) |
41 | |
42 | /* How many keys we can keep in memory */ |
43 | typedef ulonglong ha_keys; |
44 | |
45 | /* |
46 | Pointers of functions for store and read keys from temp file |
47 | */ |
48 | |
49 | extern void print_error(const char *fmt,...); |
50 | |
51 | /* Functions defined in this file */ |
52 | |
53 | static ha_rows find_all_keys(MARIA_SORT_PARAM *info, ha_keys keys, |
54 | uchar **sort_keys, |
55 | DYNAMIC_ARRAY *buffpek,uint *maxbuffer, |
56 | IO_CACHE *tempfile, |
57 | IO_CACHE *tempfile_for_exceptions); |
58 | static int write_keys(MARIA_SORT_PARAM *info,uchar **sort_keys, |
59 | ha_keys count, BUFFPEK *buffpek,IO_CACHE *tempfile); |
60 | static int write_key(MARIA_SORT_PARAM *info, uchar *key, |
61 | IO_CACHE *tempfile); |
62 | static int write_index(MARIA_SORT_PARAM *info, uchar **sort_keys, |
63 | ha_keys count); |
64 | static int merge_many_buff(MARIA_SORT_PARAM *info, ha_keys keys, |
65 | uchar **sort_keys, |
66 | BUFFPEK *buffpek, uint *maxbuffer, |
67 | IO_CACHE *t_file); |
68 | static my_off_t read_to_buffer(IO_CACHE *fromfile,BUFFPEK *buffpek, |
69 | uint sort_length); |
70 | static int merge_buffers(MARIA_SORT_PARAM *info, ha_keys keys, |
71 | IO_CACHE *from_file, IO_CACHE *to_file, |
72 | uchar **sort_keys, BUFFPEK *lastbuff, |
73 | BUFFPEK *Fb, BUFFPEK *Tb); |
74 | static int merge_index(MARIA_SORT_PARAM *,ha_keys,uchar **,BUFFPEK *, uint, |
75 | IO_CACHE *); |
76 | static int flush_maria_ft_buf(MARIA_SORT_PARAM *info); |
77 | |
78 | static int write_keys_varlen(MARIA_SORT_PARAM *info,uchar **sort_keys, |
79 | ha_keys count, BUFFPEK *buffpek, |
80 | IO_CACHE *tempfile); |
81 | static my_off_t read_to_buffer_varlen(IO_CACHE *fromfile,BUFFPEK *buffpek, |
82 | uint sort_length); |
83 | static int write_merge_key(MARIA_SORT_PARAM *info, IO_CACHE *to_file, |
84 | uchar *key, uint sort_length, ha_keys count); |
85 | static int write_merge_key_varlen(MARIA_SORT_PARAM *info, |
86 | IO_CACHE *to_file, |
87 | uchar* key, uint sort_length, |
88 | ha_keys count); |
89 | static inline int |
90 | my_var_write(MARIA_SORT_PARAM *info, IO_CACHE *to_file, uchar *bufs); |
91 | |
92 | /* |
93 | Sets the appropriate read and write methods for the MARIA_SORT_PARAM |
94 | based on the variable length key flag. |
95 | */ |
96 | static void set_sort_param_read_write(MARIA_SORT_PARAM *sort_param) |
97 | { |
98 | if (sort_param->keyinfo->flag & HA_VAR_LENGTH_KEY) |
99 | { |
100 | sort_param->write_keys= write_keys_varlen; |
101 | sort_param->read_to_buffer= read_to_buffer_varlen; |
102 | sort_param->write_key= write_merge_key_varlen; |
103 | } |
104 | else |
105 | { |
106 | sort_param->write_keys= write_keys; |
107 | sort_param->read_to_buffer= read_to_buffer; |
108 | sort_param->write_key= write_merge_key; |
109 | } |
110 | } |
111 | |
112 | |
113 | /* |
114 | Creates a index of sorted keys |
115 | |
116 | SYNOPSIS |
117 | _ma_create_index_by_sort() |
118 | info Sort parameters |
119 | no_messages Set to 1 if no output |
120 | sortbuff_size Size of sortbuffer to allocate |
121 | |
122 | RESULT |
123 | 0 ok |
124 | <> 0 Error |
125 | */ |
126 | |
127 | int _ma_create_index_by_sort(MARIA_SORT_PARAM *info, my_bool no_messages, |
128 | size_t sortbuff_size) |
129 | { |
130 | int error; |
131 | uint sort_length, maxbuffer; |
132 | size_t memavl, old_memavl; |
133 | DYNAMIC_ARRAY buffpek; |
134 | ha_rows records, UNINIT_VAR(keys); |
135 | uchar **sort_keys; |
136 | IO_CACHE tempfile, tempfile_for_exceptions; |
137 | DBUG_ENTER("_ma_create_index_by_sort" ); |
138 | DBUG_PRINT("enter" ,("sort_buff_size: %lu sort_length: %d max_records: %lu" , |
139 | (ulong) sortbuff_size, info->key_length, |
140 | (ulong) info->sort_info->max_records)); |
141 | |
142 | set_sort_param_read_write(info); |
143 | |
144 | my_b_clear(&tempfile); |
145 | my_b_clear(&tempfile_for_exceptions); |
146 | bzero((char*) &buffpek,sizeof(buffpek)); |
147 | sort_keys= (uchar **) NULL; error= 1; |
148 | maxbuffer=1; |
149 | |
150 | memavl=MY_MAX(sortbuff_size,MIN_SORT_MEMORY); |
151 | records= info->sort_info->max_records; |
152 | sort_length= info->key_length; |
153 | |
154 | while (memavl >= MIN_SORT_MEMORY) |
155 | { |
156 | /* Check if we can fit all keys into memory */ |
157 | if (((ulonglong) (records + 1) * |
158 | (sort_length + sizeof(char*)) <= memavl)) |
159 | keys= records+1; |
160 | else if ((info->sort_info->param->testflag & |
161 | (T_FORCE_SORT_MEMORY | T_CREATE_MISSING_KEYS)) == |
162 | T_FORCE_SORT_MEMORY) |
163 | { |
164 | /* |
165 | Use all of the given sort buffer for key data. |
166 | Allocate 1000 buffers at a start for new data. More buffers |
167 | will be allocated when needed. |
168 | */ |
169 | keys= memavl / (sort_length+sizeof(char*)); |
170 | maxbuffer= (uint) MY_MIN((ulonglong) 1000, (records / keys)+1); |
171 | } |
172 | else |
173 | { |
174 | /* |
175 | All keys can't fit in memory. |
176 | Calculate how many keys + buffers we can keep in memory |
177 | */ |
178 | uint maxbuffer_org; |
179 | do |
180 | { |
181 | maxbuffer_org= maxbuffer; |
182 | if (memavl < sizeof(BUFFPEK) * maxbuffer || |
183 | (keys= (memavl-sizeof(BUFFPEK)*maxbuffer)/ |
184 | (sort_length+sizeof(char*))) <= 1 || |
185 | keys < maxbuffer) |
186 | { |
187 | _ma_check_print_error(info->sort_info->param, |
188 | "aria_sort_buffer_size is too small. Current aria_sort_buffer_size: %llu rows: %llu sort_length: %u" , |
189 | (ulonglong) sortbuff_size, (ulonglong) records, |
190 | sort_length); |
191 | my_errno= ENOMEM; |
192 | goto err; |
193 | } |
194 | } |
195 | while ((maxbuffer= (uint) (records/(keys-1)+1)) != maxbuffer_org); |
196 | } |
197 | |
198 | if ((sort_keys= ((uchar**) |
199 | my_malloc((size_t) (keys*(sort_length+sizeof(char*))+ |
200 | HA_FT_MAXBYTELEN), |
201 | MYF(0))))) |
202 | { |
203 | if (my_init_dynamic_array(&buffpek, sizeof(BUFFPEK), maxbuffer, |
204 | MY_MIN(maxbuffer/2, 1000), MYF(0))) |
205 | { |
206 | my_free(sort_keys); |
207 | sort_keys= 0; |
208 | } |
209 | else |
210 | break; |
211 | } |
212 | old_memavl=memavl; |
213 | if ((memavl=memavl/4*3) < MIN_SORT_MEMORY && old_memavl > MIN_SORT_MEMORY) |
214 | memavl=MIN_SORT_MEMORY; |
215 | } |
216 | if (memavl < MIN_SORT_MEMORY) |
217 | { |
218 | /* purecov: begin inspected */ |
219 | _ma_check_print_error(info->sort_info->param, |
220 | "aria_sort_buffer_size is too small. Current aria_sort_buffer_size: %llu rows: %llu sort_length: %u" , |
221 | (ulonglong) sortbuff_size, (ulonglong) records, sort_length); |
222 | my_errno= ENOMEM; |
223 | goto err; |
224 | /* purecov: end inspected */ |
225 | } |
226 | (*info->lock_in_memory)(info->sort_info->param);/* Everything is allocated */ |
227 | |
228 | if (!no_messages) |
229 | my_fprintf(stdout, |
230 | " - Searching for keys, allocating buffer for %llu keys\n" , |
231 | (ulonglong) keys); |
232 | |
233 | if ((records=find_all_keys(info,keys,sort_keys,&buffpek,&maxbuffer, |
234 | &tempfile,&tempfile_for_exceptions)) |
235 | == HA_POS_ERROR) |
236 | goto err; /* purecov: tested */ |
237 | |
238 | info->sort_info->param->stage++; /* Merge stage */ |
239 | |
240 | if (maxbuffer == 0) |
241 | { |
242 | if (!no_messages) |
243 | my_fprintf(stdout, " - Dumping %llu keys\n" , (ulonglong) records); |
244 | if (write_index(info, sort_keys, (ha_keys) records)) |
245 | goto err; /* purecov: inspected */ |
246 | } |
247 | else |
248 | { |
249 | keys=(keys*(sort_length+sizeof(char*)))/sort_length; |
250 | if (maxbuffer >= MERGEBUFF2) |
251 | { |
252 | if (!no_messages) |
253 | my_fprintf(stdout, " - Merging %llu keys\n" , |
254 | (ulonglong) records); /* purecov: tested */ |
255 | if (merge_many_buff(info,keys,sort_keys, |
256 | dynamic_element(&buffpek,0,BUFFPEK *),&maxbuffer,&tempfile)) |
257 | goto err; /* purecov: inspected */ |
258 | } |
259 | if (flush_io_cache(&tempfile) || |
260 | reinit_io_cache(&tempfile,READ_CACHE,0L,0,0)) |
261 | goto err; /* purecov: inspected */ |
262 | if (!no_messages) |
263 | printf(" - Last merge and dumping keys\n" ); /* purecov: tested */ |
264 | if (merge_index(info,keys,sort_keys,dynamic_element(&buffpek,0,BUFFPEK *), |
265 | maxbuffer,&tempfile)) |
266 | goto err; /* purecov: inspected */ |
267 | } |
268 | |
269 | if (flush_maria_ft_buf(info) || _ma_flush_pending_blocks(info)) |
270 | goto err; |
271 | |
272 | if (my_b_inited(&tempfile_for_exceptions)) |
273 | { |
274 | MARIA_HA *idx=info->sort_info->info; |
275 | uint16 key_length; |
276 | MARIA_KEY key; |
277 | key.keyinfo= idx->s->keyinfo + info->key; |
278 | |
279 | if (!no_messages) |
280 | printf(" - Adding exceptions\n" ); /* purecov: tested */ |
281 | if (flush_io_cache(&tempfile_for_exceptions) || |
282 | reinit_io_cache(&tempfile_for_exceptions,READ_CACHE,0L,0,0)) |
283 | goto err; |
284 | |
285 | while (!my_b_read(&tempfile_for_exceptions,(uchar*)&key_length, |
286 | sizeof(key_length)) |
287 | && !my_b_read(&tempfile_for_exceptions,(uchar*)sort_keys, |
288 | (uint) key_length)) |
289 | { |
290 | key.data= (uchar*) sort_keys; |
291 | key.ref_length= idx->s->rec_reflength; |
292 | key.data_length= key_length - key.ref_length; |
293 | key.flag= 0; |
294 | if (_ma_ck_write(idx, &key)) |
295 | goto err; |
296 | } |
297 | } |
298 | |
299 | error =0; |
300 | |
301 | err: |
302 | my_free(sort_keys); |
303 | delete_dynamic(&buffpek); |
304 | close_cached_file(&tempfile); |
305 | close_cached_file(&tempfile_for_exceptions); |
306 | |
307 | DBUG_RETURN(error ? -1 : 0); |
308 | } /* _ma_create_index_by_sort */ |
309 | |
310 | |
311 | /* Search after all keys and place them in a temp. file */ |
312 | |
313 | static ha_rows find_all_keys(MARIA_SORT_PARAM *info, ha_rows keys, |
314 | uchar **sort_keys, DYNAMIC_ARRAY *buffpek, |
315 | uint *maxbuffer, IO_CACHE *tempfile, |
316 | IO_CACHE *tempfile_for_exceptions) |
317 | { |
318 | int error; |
319 | ha_rows idx; |
320 | DBUG_ENTER("find_all_keys" ); |
321 | |
322 | idx=error=0; |
323 | sort_keys[0]= (uchar*) (sort_keys+keys); |
324 | |
325 | info->sort_info->info->in_check_table= 1; |
326 | while (!(error=(*info->key_read)(info,sort_keys[idx]))) |
327 | { |
328 | if (info->real_key_length > info->key_length) |
329 | { |
330 | if (write_key(info,sort_keys[idx],tempfile_for_exceptions)) |
331 | goto err; /* purecov: inspected */ |
332 | continue; |
333 | } |
334 | |
335 | if (++idx == keys) |
336 | { |
337 | if (info->write_keys(info,sort_keys,idx-1, |
338 | (BUFFPEK *)alloc_dynamic(buffpek), |
339 | tempfile)) |
340 | goto err; /* purecov: inspected */ |
341 | |
342 | sort_keys[0]=(uchar*) (sort_keys+keys); |
343 | memcpy(sort_keys[0],sort_keys[idx-1],(size_t) info->key_length); |
344 | idx=1; |
345 | } |
346 | sort_keys[idx]=sort_keys[idx-1]+info->key_length; |
347 | } |
348 | if (error > 0) |
349 | goto err; /* purecov: inspected */ |
350 | if (buffpek->elements) |
351 | { |
352 | if (info->write_keys(info,sort_keys,idx,(BUFFPEK *)alloc_dynamic(buffpek), |
353 | tempfile)) |
354 | goto err; /* purecov: inspected */ |
355 | *maxbuffer=buffpek->elements-1; |
356 | } |
357 | else |
358 | *maxbuffer=0; |
359 | |
360 | info->sort_info->info->in_check_table= 0; |
361 | DBUG_RETURN((*maxbuffer)*(keys-1)+idx); |
362 | |
363 | err: |
364 | info->sort_info->info->in_check_table= 0; /* purecov: inspected */ |
365 | DBUG_RETURN(HA_POS_ERROR); /* purecov: inspected */ |
366 | } /* find_all_keys */ |
367 | |
368 | |
369 | static my_bool _ma_thr_find_all_keys_exec(MARIA_SORT_PARAM* sort_param) |
370 | { |
371 | int error= 0; |
372 | ulonglong memavl, old_memavl; |
373 | longlong sortbuff_size; |
374 | ha_keys UNINIT_VAR(keys), idx; |
375 | uint sort_length; |
376 | uint maxbuffer; |
377 | uchar **sort_keys= NULL; |
378 | DBUG_ENTER("_ma_thr_find_all_keys_exec" ); |
379 | DBUG_PRINT("enter" , ("master: %d" , sort_param->master)); |
380 | |
381 | if (sort_param->sort_info->got_error) |
382 | DBUG_RETURN(TRUE); |
383 | |
384 | set_sort_param_read_write(sort_param); |
385 | |
386 | my_b_clear(&sort_param->tempfile); |
387 | my_b_clear(&sort_param->tempfile_for_exceptions); |
388 | bzero((char*) &sort_param->buffpek, sizeof(sort_param->buffpek)); |
389 | bzero((char*) &sort_param->unique, sizeof(sort_param->unique)); |
390 | |
391 | sortbuff_size= sort_param->sortbuff_size; |
392 | memavl= MY_MAX(sortbuff_size, MIN_SORT_MEMORY); |
393 | idx= (ha_keys) sort_param->sort_info->max_records; |
394 | sort_length= sort_param->key_length; |
395 | maxbuffer= 1; |
396 | |
397 | while (memavl >= MIN_SORT_MEMORY) |
398 | { |
399 | if ((my_off_t) (idx+1)*(sort_length+sizeof(char*)) <= (my_off_t) memavl) |
400 | keys= idx+1; |
401 | else if ((sort_param->sort_info->param->testflag & |
402 | (T_FORCE_SORT_MEMORY | T_CREATE_MISSING_KEYS)) == |
403 | T_FORCE_SORT_MEMORY) |
404 | { |
405 | /* |
406 | Use all of the given sort buffer for key data. |
407 | Allocate 1000 buffers at a start for new data. More buffers |
408 | will be allocated when needed. |
409 | */ |
410 | keys= memavl / (sort_length+sizeof(char*)); |
411 | maxbuffer= (uint) MY_MIN((ulonglong) 1000, (idx / keys)+1); |
412 | } |
413 | else |
414 | { |
415 | uint maxbuffer_org; |
416 | do |
417 | { |
418 | maxbuffer_org= maxbuffer; |
419 | if (memavl < sizeof(BUFFPEK)*maxbuffer || |
420 | (keys=(memavl-sizeof(BUFFPEK)*maxbuffer)/ |
421 | (sort_length+sizeof(char*))) <= 1 || |
422 | keys < maxbuffer) |
423 | { |
424 | _ma_check_print_error(sort_param->sort_info->param, |
425 | "aria_sort_buffer_size is too small. Current aria_sort_buffer_size: %llu rows: %llu sort_length: %u" , |
426 | sortbuff_size, (ulonglong) idx, sort_length); |
427 | goto err; |
428 | } |
429 | } |
430 | while ((maxbuffer= (uint) (idx/(keys-1)+1)) != maxbuffer_org); |
431 | } |
432 | if ((sort_keys= (uchar **) |
433 | my_malloc((size_t)(keys*(sort_length+sizeof(char*))+ |
434 | ((sort_param->keyinfo->flag & HA_FULLTEXT) ? |
435 | HA_FT_MAXBYTELEN : 0)), MYF(0)))) |
436 | { |
437 | if (my_init_dynamic_array(&sort_param->buffpek, sizeof(BUFFPEK), |
438 | maxbuffer, MY_MIN(maxbuffer / 2, 1000), MYF(0))) |
439 | { |
440 | my_free(sort_keys); |
441 | sort_keys= NULL; /* Safety against double free on error. */ |
442 | } |
443 | else |
444 | break; |
445 | } |
446 | old_memavl= memavl; |
447 | if ((memavl= memavl/4*3) < MIN_SORT_MEMORY && |
448 | old_memavl > MIN_SORT_MEMORY) |
449 | memavl= MIN_SORT_MEMORY; |
450 | } |
451 | if (memavl < MIN_SORT_MEMORY) |
452 | { |
453 | /* purecov: begin inspected */ |
454 | _ma_check_print_error(sort_param->sort_info->param, |
455 | "aria_sort_buffer_size is too small. Current aria_sort_buffer_size: %llu rows: %llu sort_length: %u" , |
456 | sortbuff_size, (ulonglong) idx, sort_length); |
457 | my_errno= ENOMEM; |
458 | goto err; |
459 | /* purecov: end inspected */ |
460 | } |
461 | |
462 | if (sort_param->sort_info->param->testflag & T_VERBOSE) |
463 | my_fprintf(stdout, |
464 | "Key %d - Allocating buffer for %llu keys\n" , |
465 | sort_param->key + 1, (ulonglong) keys); |
466 | sort_param->sort_keys= sort_keys; |
467 | |
468 | idx= error= 0; |
469 | sort_keys[0]= (uchar*) (sort_keys+keys); |
470 | |
471 | DBUG_PRINT("info" , ("reading keys" )); |
472 | while (!(error= sort_param->sort_info->got_error) && |
473 | !(error= (*sort_param->key_read)(sort_param, sort_keys[idx]))) |
474 | { |
475 | if (sort_param->real_key_length > sort_param->key_length) |
476 | { |
477 | if (write_key(sort_param, sort_keys[idx], |
478 | &sort_param->tempfile_for_exceptions)) |
479 | goto err; |
480 | continue; |
481 | } |
482 | |
483 | if (++idx == keys) |
484 | { |
485 | if (sort_param->write_keys(sort_param, sort_keys, idx - 1, |
486 | (BUFFPEK *)alloc_dynamic(&sort_param->buffpek), |
487 | &sort_param->tempfile)) |
488 | goto err; |
489 | sort_keys[0]= (uchar*) (sort_keys+keys); |
490 | memcpy(sort_keys[0], sort_keys[idx - 1], (size_t) sort_param->key_length); |
491 | idx= 1; |
492 | } |
493 | sort_keys[idx]= sort_keys[idx - 1] + sort_param->key_length; |
494 | } |
495 | if (error > 0) |
496 | goto err; |
497 | if (sort_param->buffpek.elements) |
498 | { |
499 | if (sort_param->write_keys(sort_param,sort_keys, idx, |
500 | (BUFFPEK *) alloc_dynamic(&sort_param->buffpek), |
501 | &sort_param->tempfile)) |
502 | goto err; |
503 | sort_param->keys= (uint)((sort_param->buffpek.elements - 1) * (keys - 1) + idx); |
504 | } |
505 | else |
506 | sort_param->keys= (uint)idx; |
507 | |
508 | DBUG_RETURN(FALSE); |
509 | |
510 | err: |
511 | DBUG_PRINT("error" , ("got some error" )); |
512 | my_free(sort_keys); |
513 | sort_param->sort_keys= 0; |
514 | delete_dynamic(& sort_param->buffpek); |
515 | close_cached_file(&sort_param->tempfile); |
516 | close_cached_file(&sort_param->tempfile_for_exceptions); |
517 | |
518 | DBUG_RETURN(TRUE); |
519 | } |
520 | |
521 | /* Search after all keys and place them in a temp. file */ |
522 | |
523 | pthread_handler_t _ma_thr_find_all_keys(void *arg) |
524 | { |
525 | MARIA_SORT_PARAM *sort_param= (MARIA_SORT_PARAM*) arg; |
526 | my_bool error= FALSE; |
527 | /* If my_thread_init fails */ |
528 | if (my_thread_init() || _ma_thr_find_all_keys_exec(sort_param)) |
529 | error= TRUE; |
530 | |
531 | /* |
532 | Thread must clean up after itself. |
533 | */ |
534 | free_root(&sort_param->wordroot, MYF(0)); |
535 | /* |
536 | Detach from the share if the writer is involved. Avoid others to |
537 | be blocked. This includes a flush of the write buffer. This will |
538 | also indicate EOF to the readers. |
539 | That means that a writer always gets here first and readers - |
540 | only when they see EOF. But if a reader finishes prematurely |
541 | because of an error it may reach this earlier - don't allow it |
542 | to detach the writer thread. |
543 | */ |
544 | if (sort_param->master && sort_param->sort_info->info->rec_cache.share) |
545 | remove_io_thread(&sort_param->sort_info->info->rec_cache); |
546 | |
547 | /* Readers detach from the share if any. Avoid others to be blocked. */ |
548 | if (sort_param->read_cache.share) |
549 | remove_io_thread(&sort_param->read_cache); |
550 | |
551 | mysql_mutex_lock(&sort_param->sort_info->mutex); |
552 | if (error) |
553 | sort_param->sort_info->got_error= 1; |
554 | |
555 | if (!--sort_param->sort_info->threads_running) |
556 | mysql_cond_signal(&sort_param->sort_info->cond); |
557 | mysql_mutex_unlock(&sort_param->sort_info->mutex); |
558 | |
559 | my_thread_end(); |
560 | return NULL; |
561 | } |
562 | |
563 | |
564 | int _ma_thr_write_keys(MARIA_SORT_PARAM *sort_param) |
565 | { |
566 | MARIA_SORT_INFO *sort_info=sort_param->sort_info; |
567 | HA_CHECK *param=sort_info->param; |
568 | size_t UNINIT_VAR(length), keys; |
569 | double *rec_per_key_part= param->new_rec_per_key_part; |
570 | int got_error=sort_info->got_error; |
571 | uint i; |
572 | MARIA_HA *info=sort_info->info; |
573 | MARIA_SHARE *share= info->s; |
574 | MARIA_SORT_PARAM *sinfo; |
575 | uchar *mergebuf=0; |
576 | DBUG_ENTER("_ma_thr_write_keys" ); |
577 | |
578 | for (i= 0, sinfo= sort_param ; |
579 | i < sort_info->total_keys ; |
580 | i++, sinfo++) |
581 | { |
582 | if (!sinfo->sort_keys) |
583 | { |
584 | got_error=1; |
585 | my_free(sinfo->rec_buff); |
586 | continue; |
587 | } |
588 | if (!got_error) |
589 | { |
590 | maria_set_key_active(share->state.key_map, sinfo->key); |
591 | |
592 | if (!sinfo->buffpek.elements) |
593 | { |
594 | if (param->testflag & T_VERBOSE) |
595 | { |
596 | my_fprintf(stdout, |
597 | "Key %d - Dumping %llu keys\n" , sinfo->key+1, |
598 | (ulonglong) sinfo->keys); |
599 | fflush(stdout); |
600 | } |
601 | if (write_index(sinfo, sinfo->sort_keys, sinfo->keys) || |
602 | flush_maria_ft_buf(sinfo) || _ma_flush_pending_blocks(sinfo)) |
603 | got_error=1; |
604 | } |
605 | } |
606 | my_free(sinfo->sort_keys); |
607 | my_free(sinfo->rec_buff); |
608 | sinfo->sort_keys=0; |
609 | } |
610 | |
611 | for (i= 0, sinfo= sort_param ; |
612 | i < sort_info->total_keys ; |
613 | i++, |
614 | delete_dynamic(&sinfo->buffpek), |
615 | close_cached_file(&sinfo->tempfile), |
616 | close_cached_file(&sinfo->tempfile_for_exceptions), |
617 | rec_per_key_part+= sinfo->keyinfo->keysegs, |
618 | sinfo++) |
619 | { |
620 | if (got_error) |
621 | continue; |
622 | |
623 | set_sort_param_read_write(sinfo); |
624 | |
625 | if (sinfo->buffpek.elements) |
626 | { |
627 | uint maxbuffer=sinfo->buffpek.elements-1; |
628 | if (!mergebuf) |
629 | { |
630 | length=(size_t)param->sort_buffer_length; |
631 | while (length >= MIN_SORT_MEMORY) |
632 | { |
633 | if ((mergebuf= my_malloc((size_t) length, MYF(0)))) |
634 | break; |
635 | length=length*3/4; |
636 | } |
637 | if (!mergebuf) |
638 | { |
639 | got_error=1; |
640 | continue; |
641 | } |
642 | } |
643 | keys=length/sinfo->key_length; |
644 | if (maxbuffer >= MERGEBUFF2) |
645 | { |
646 | if (param->testflag & T_VERBOSE) |
647 | my_fprintf(stdout, |
648 | "Key %d - Merging %llu keys\n" , |
649 | sinfo->key+1, (ulonglong) sinfo->keys); |
650 | if (merge_many_buff(sinfo, keys, (uchar **)mergebuf, |
651 | dynamic_element(&sinfo->buffpek, 0, BUFFPEK *), |
652 | &maxbuffer, &sinfo->tempfile)) |
653 | { |
654 | got_error=1; |
655 | continue; |
656 | } |
657 | } |
658 | if (flush_io_cache(&sinfo->tempfile) || |
659 | reinit_io_cache(&sinfo->tempfile,READ_CACHE,0L,0,0)) |
660 | { |
661 | got_error=1; |
662 | continue; |
663 | } |
664 | if (param->testflag & T_VERBOSE) |
665 | printf("Key %d - Last merge and dumping keys\n" , sinfo->key+1); |
666 | if (merge_index(sinfo, keys, (uchar**) mergebuf, |
667 | dynamic_element(&sinfo->buffpek,0,BUFFPEK *), |
668 | maxbuffer,&sinfo->tempfile) || |
669 | flush_maria_ft_buf(sinfo) || |
670 | _ma_flush_pending_blocks(sinfo)) |
671 | { |
672 | got_error=1; |
673 | continue; |
674 | } |
675 | } |
676 | if (my_b_inited(&sinfo->tempfile_for_exceptions)) |
677 | { |
678 | uint16 key_length; |
679 | |
680 | if (param->testflag & T_VERBOSE) |
681 | printf("Key %d - Dumping 'long' keys\n" , sinfo->key+1); |
682 | |
683 | if (flush_io_cache(&sinfo->tempfile_for_exceptions) || |
684 | reinit_io_cache(&sinfo->tempfile_for_exceptions,READ_CACHE,0L,0,0)) |
685 | { |
686 | got_error=1; |
687 | continue; |
688 | } |
689 | |
690 | while (!got_error && |
691 | !my_b_read(&sinfo->tempfile_for_exceptions,(uchar*)&key_length, |
692 | sizeof(key_length))) |
693 | { |
694 | uchar maria_ft_buf[HA_FT_MAXBYTELEN + HA_FT_WLEN + 10]; |
695 | if (key_length > sizeof(maria_ft_buf) || |
696 | my_b_read(&sinfo->tempfile_for_exceptions, (uchar*)maria_ft_buf, |
697 | (uint) key_length)) |
698 | got_error= 1; |
699 | else |
700 | { |
701 | MARIA_KEY tmp_key; |
702 | tmp_key.keyinfo= info->s->keyinfo + sinfo->key; |
703 | tmp_key.data= maria_ft_buf; |
704 | tmp_key.ref_length= info->s->rec_reflength; |
705 | tmp_key.data_length= key_length - info->s->rec_reflength; |
706 | tmp_key.flag= 0; |
707 | if (_ma_ck_write(info, &tmp_key)) |
708 | got_error=1; |
709 | } |
710 | } |
711 | } |
712 | if (!got_error && (param->testflag & T_STATISTICS)) |
713 | maria_update_key_parts(sinfo->keyinfo, rec_per_key_part, sinfo->unique, |
714 | param->stats_method == |
715 | MI_STATS_METHOD_IGNORE_NULLS ? |
716 | sinfo->notnull : NULL, |
717 | (ulonglong) share->state.state.records); |
718 | |
719 | } |
720 | my_free(mergebuf); |
721 | DBUG_RETURN(got_error); |
722 | } |
723 | |
724 | |
725 | /* Write all keys in memory to file for later merge */ |
726 | |
727 | static int write_keys(MARIA_SORT_PARAM *info, register uchar **sort_keys, |
728 | ha_keys count, BUFFPEK *buffpek, IO_CACHE *tempfile) |
729 | { |
730 | uchar **end; |
731 | uint sort_length=info->key_length; |
732 | DBUG_ENTER("write_keys" ); |
733 | |
734 | if (!buffpek) |
735 | DBUG_RETURN(1); /* Out of memory */ |
736 | |
737 | my_qsort2((uchar*) sort_keys,(size_t) count, sizeof(uchar*), |
738 | (qsort2_cmp) info->key_cmp, info); |
739 | if (!my_b_inited(tempfile) && |
740 | open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST" , |
741 | DISK_BUFFER_SIZE, info->sort_info->param->myf_rw)) |
742 | DBUG_RETURN(1); /* purecov: inspected */ |
743 | |
744 | buffpek->file_pos=my_b_tell(tempfile); |
745 | buffpek->count=count; |
746 | |
747 | for (end=sort_keys+count ; sort_keys != end ; sort_keys++) |
748 | { |
749 | if (my_b_write(tempfile, *sort_keys, sort_length)) |
750 | DBUG_RETURN(1); /* purecov: inspected */ |
751 | } |
752 | DBUG_RETURN(0); |
753 | } /* write_keys */ |
754 | |
755 | |
756 | static inline int |
757 | my_var_write(MARIA_SORT_PARAM *info, IO_CACHE *to_file, uchar *bufs) |
758 | { |
759 | int err; |
760 | uint16 len= _ma_keylength(info->keyinfo, bufs); |
761 | |
762 | /* The following is safe as this is a local file */ |
763 | if ((err= my_b_write(to_file, (uchar*)&len, sizeof(len)))) |
764 | return (err); |
765 | if ((err= my_b_write(to_file,bufs, (uint) len))) |
766 | return (err); |
767 | return (0); |
768 | } |
769 | |
770 | |
771 | static int write_keys_varlen(MARIA_SORT_PARAM *info, |
772 | register uchar **sort_keys, |
773 | ha_keys count, BUFFPEK *buffpek, |
774 | IO_CACHE *tempfile) |
775 | { |
776 | uchar **end; |
777 | int err; |
778 | DBUG_ENTER("write_keys_varlen" ); |
779 | |
780 | if (!buffpek) |
781 | DBUG_RETURN(1); /* Out of memory */ |
782 | |
783 | my_qsort2((uchar*) sort_keys, (size_t) count, sizeof(uchar*), |
784 | (qsort2_cmp) info->key_cmp, info); |
785 | if (!my_b_inited(tempfile) && |
786 | open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST" , |
787 | DISK_BUFFER_SIZE, info->sort_info->param->myf_rw)) |
788 | DBUG_RETURN(1); /* purecov: inspected */ |
789 | |
790 | buffpek->file_pos=my_b_tell(tempfile); |
791 | buffpek->count=count; |
792 | for (end=sort_keys+count ; sort_keys != end ; sort_keys++) |
793 | { |
794 | if ((err= my_var_write(info,tempfile, *sort_keys))) |
795 | DBUG_RETURN(err); |
796 | } |
797 | DBUG_RETURN(0); |
798 | } /* write_keys_varlen */ |
799 | |
800 | |
801 | static int write_key(MARIA_SORT_PARAM *info, uchar *key, |
802 | IO_CACHE *tempfile) |
803 | { |
804 | uint16 key_length=info->real_key_length; |
805 | DBUG_ENTER("write_key" ); |
806 | |
807 | if (!my_b_inited(tempfile) && |
808 | open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST" , |
809 | DISK_BUFFER_SIZE, info->sort_info->param->myf_rw)) |
810 | DBUG_RETURN(1); |
811 | |
812 | if (my_b_write(tempfile, (uchar*)&key_length,sizeof(key_length)) || |
813 | my_b_write(tempfile, key, (uint) key_length)) |
814 | DBUG_RETURN(1); |
815 | DBUG_RETURN(0); |
816 | } /* write_key */ |
817 | |
818 | |
819 | /* Write index */ |
820 | |
821 | static int write_index(MARIA_SORT_PARAM *info, register uchar **sort_keys, |
822 | register ha_keys count) |
823 | { |
824 | DBUG_ENTER("write_index" ); |
825 | |
826 | my_qsort2((uchar*) sort_keys,(size_t) count,sizeof(uchar*), |
827 | (qsort2_cmp) info->key_cmp,info); |
828 | while (count--) |
829 | { |
830 | if ((*info->key_write)(info, *sort_keys++)) |
831 | DBUG_RETURN(-1); /* purecov: inspected */ |
832 | } |
833 | if (info->sort_info->param->max_stage != 1) /* If not parallel */ |
834 | _ma_report_progress(info->sort_info->param, 1, 1); |
835 | DBUG_RETURN(0); |
836 | } /* write_index */ |
837 | |
838 | |
839 | /* Merge buffers to make < MERGEBUFF2 buffers */ |
840 | |
841 | static int merge_many_buff(MARIA_SORT_PARAM *info, ha_keys keys, |
842 | uchar **sort_keys, BUFFPEK *buffpek, |
843 | uint *maxbuffer, IO_CACHE *t_file) |
844 | { |
845 | uint tmp, merges, max_merges; |
846 | IO_CACHE t_file2, *from_file, *to_file, *temp; |
847 | BUFFPEK *lastbuff; |
848 | DBUG_ENTER("merge_many_buff" ); |
849 | |
850 | if (*maxbuffer < MERGEBUFF2) |
851 | DBUG_RETURN(0); /* purecov: inspected */ |
852 | if (flush_io_cache(t_file) || |
853 | open_cached_file(&t_file2,my_tmpdir(info->tmpdir),"ST" , |
854 | DISK_BUFFER_SIZE, info->sort_info->param->myf_rw)) |
855 | DBUG_RETURN(1); /* purecov: inspected */ |
856 | |
857 | /* Calculate how many merges are needed */ |
858 | max_merges= 1; /* Count merge_index */ |
859 | tmp= *maxbuffer; |
860 | while (tmp >= MERGEBUFF2) |
861 | { |
862 | merges= (tmp-MERGEBUFF*3/2 + 1) / MERGEBUFF + 1; |
863 | max_merges+= merges; |
864 | tmp= merges; |
865 | } |
866 | merges= 0; |
867 | |
868 | from_file= t_file ; to_file= &t_file2; |
869 | while (*maxbuffer >= MERGEBUFF2) |
870 | { |
871 | uint i; |
872 | reinit_io_cache(from_file,READ_CACHE,0L,0,0); |
873 | reinit_io_cache(to_file,WRITE_CACHE,0L,0,0); |
874 | lastbuff=buffpek; |
875 | for (i=0 ; i + MERGEBUFF*3/2 <= *maxbuffer ; i+=MERGEBUFF) |
876 | { |
877 | if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++, |
878 | buffpek+i,buffpek+i+MERGEBUFF-1)) |
879 | goto cleanup; |
880 | if (info->sort_info->param->max_stage != 1) /* If not parallel */ |
881 | _ma_report_progress(info->sort_info->param, merges++, max_merges); |
882 | } |
883 | if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++, |
884 | buffpek+i,buffpek+ *maxbuffer)) |
885 | break; /* purecov: inspected */ |
886 | if (flush_io_cache(to_file)) |
887 | break; /* purecov: inspected */ |
888 | temp=from_file; from_file=to_file; to_file=temp; |
889 | *maxbuffer= (uint) (lastbuff-buffpek)-1; |
890 | if (info->sort_info->param->max_stage != 1) /* If not parallel */ |
891 | _ma_report_progress(info->sort_info->param, merges++, max_merges); |
892 | } |
893 | cleanup: |
894 | close_cached_file(to_file); /* This holds old result */ |
895 | if (to_file == t_file) |
896 | { |
897 | DBUG_ASSERT(t_file2.type == WRITE_CACHE); |
898 | *t_file=t_file2; /* Copy result file */ |
899 | } |
900 | |
901 | DBUG_RETURN(*maxbuffer >= MERGEBUFF2); /* Return 1 if interrupted */ |
902 | } /* merge_many_buff */ |
903 | |
904 | |
905 | /* |
906 | Read data to buffer |
907 | |
908 | SYNOPSIS |
909 | read_to_buffer() |
910 | fromfile File to read from |
911 | buffpek Where to read from |
912 | sort_length max length to read |
913 | RESULT |
914 | > 0 Ammount of bytes read |
915 | -1 Error |
916 | */ |
917 | |
918 | static my_off_t read_to_buffer(IO_CACHE *fromfile, BUFFPEK *buffpek, |
919 | uint sort_length) |
920 | { |
921 | register ha_keys count; |
922 | size_t length; |
923 | |
924 | if ((count= (ha_keys) MY_MIN((ha_rows) buffpek->max_keys, |
925 | (ha_rows) buffpek->count))) |
926 | { |
927 | if (my_b_pread(fromfile, (uchar*) buffpek->base, |
928 | (length= sort_length * (size_t)count), buffpek->file_pos)) |
929 | return(HA_OFFSET_ERROR); /* purecov: inspected */ |
930 | buffpek->key=buffpek->base; |
931 | buffpek->file_pos+= length; /* New filepos */ |
932 | buffpek->count-= count; |
933 | buffpek->mem_count= count; |
934 | } |
935 | return (((my_off_t) count) * sort_length); |
936 | } /* read_to_buffer */ |
937 | |
938 | |
939 | static my_off_t read_to_buffer_varlen(IO_CACHE *fromfile, BUFFPEK *buffpek, |
940 | uint sort_length) |
941 | { |
942 | register ha_keys count; |
943 | uint idx; |
944 | uchar *buffp; |
945 | |
946 | if ((count= (ha_keys) MY_MIN((ha_rows) buffpek->max_keys,buffpek->count))) |
947 | { |
948 | buffp= buffpek->base; |
949 | |
950 | for (idx=1;idx<=count;idx++) |
951 | { |
952 | uint16 length_of_key; |
953 | if (my_b_pread(fromfile, (uchar*)&length_of_key, |
954 | sizeof(length_of_key), buffpek->file_pos)) |
955 | return(HA_OFFSET_ERROR); |
956 | buffpek->file_pos+=sizeof(length_of_key); |
957 | if (my_b_pread(fromfile, (uchar*) buffp, |
958 | length_of_key, buffpek->file_pos)) |
959 | return((uint) -1); |
960 | buffpek->file_pos+=length_of_key; |
961 | buffp = buffp + sort_length; |
962 | } |
963 | buffpek->key=buffpek->base; |
964 | buffpek->count-= count; |
965 | buffpek->mem_count= count; |
966 | } |
967 | return (((my_off_t) count) * sort_length); |
968 | } /* read_to_buffer_varlen */ |
969 | |
970 | |
971 | static int write_merge_key_varlen(MARIA_SORT_PARAM *info, |
972 | IO_CACHE *to_file, uchar* key, |
973 | uint sort_length, ha_keys count) |
974 | { |
975 | ha_keys idx; |
976 | uchar *bufs = key; |
977 | |
978 | for (idx=1;idx<=count;idx++) |
979 | { |
980 | int err; |
981 | if ((err= my_var_write(info, to_file, bufs))) |
982 | return (err); |
983 | bufs=bufs+sort_length; |
984 | } |
985 | return(0); |
986 | } |
987 | |
988 | |
989 | static int write_merge_key(MARIA_SORT_PARAM *info __attribute__((unused)), |
990 | IO_CACHE *to_file, uchar *key, |
991 | uint sort_length, ha_keys count) |
992 | { |
993 | return my_b_write(to_file, key, (size_t) (sort_length * count)); |
994 | } |
995 | |
996 | /* |
997 | Merge buffers to one buffer |
998 | If to_file == 0 then use info->key_write |
999 | |
1000 | Return: |
1001 | 0 ok |
1002 | 1 error |
1003 | */ |
1004 | |
1005 | static int |
1006 | merge_buffers(MARIA_SORT_PARAM *info, ha_keys keys, IO_CACHE *from_file, |
1007 | IO_CACHE *to_file, uchar **sort_keys, BUFFPEK *lastbuff, |
1008 | BUFFPEK *Fb, BUFFPEK *Tb) |
1009 | { |
1010 | int error= 1; |
1011 | uint sort_length; |
1012 | ha_keys maxcount; |
1013 | ha_rows count; |
1014 | my_off_t UNINIT_VAR(to_start_filepos), read_length; |
1015 | uchar *strpos; |
1016 | BUFFPEK *buffpek,**refpek; |
1017 | QUEUE queue; |
1018 | DBUG_ENTER("merge_buffers" ); |
1019 | |
1020 | count= 0; |
1021 | maxcount= keys/((uint) (Tb-Fb) +1); |
1022 | DBUG_ASSERT(maxcount > 0); |
1023 | if (to_file) |
1024 | to_start_filepos=my_b_tell(to_file); |
1025 | strpos= (uchar*) sort_keys; |
1026 | sort_length=info->key_length; |
1027 | |
1028 | if (init_queue(&queue,(uint) (Tb-Fb)+1,offsetof(BUFFPEK,key),0, |
1029 | (int (*)(void*, uchar *,uchar*)) info->key_cmp, |
1030 | (void*) info, 0, 0)) |
1031 | DBUG_RETURN(1); /* purecov: inspected */ |
1032 | |
1033 | for (buffpek= Fb ; buffpek <= Tb ; buffpek++) |
1034 | { |
1035 | count+= buffpek->count; |
1036 | buffpek->base= strpos; |
1037 | buffpek->max_keys= maxcount; |
1038 | strpos+= (read_length= info->read_to_buffer(from_file,buffpek, |
1039 | sort_length)); |
1040 | if (read_length == HA_OFFSET_ERROR) |
1041 | goto err; /* purecov: inspected */ |
1042 | queue_insert(&queue,(uchar*) buffpek); |
1043 | } |
1044 | |
1045 | while (queue.elements > 1) |
1046 | { |
1047 | for (;;) |
1048 | { |
1049 | buffpek=(BUFFPEK*) queue_top(&queue); |
1050 | if (to_file) |
1051 | { |
1052 | if (info->write_key(info,to_file, buffpek->key, |
1053 | sort_length, 1)) |
1054 | goto err; /* purecov: inspected */ |
1055 | } |
1056 | else |
1057 | { |
1058 | if ((*info->key_write)(info,(void*) buffpek->key)) |
1059 | goto err; /* purecov: inspected */ |
1060 | } |
1061 | buffpek->key+=sort_length; |
1062 | if (! --buffpek->mem_count) |
1063 | { |
1064 | /* It's enough to check for killedptr before a slow operation */ |
1065 | if (_ma_killed_ptr(info->sort_info->param)) |
1066 | goto err; |
1067 | if (!(read_length= info->read_to_buffer(from_file,buffpek,sort_length))) |
1068 | { |
1069 | uchar *base= buffpek->base; |
1070 | ha_keys max_keys=buffpek->max_keys; |
1071 | |
1072 | queue_remove_top(&queue); |
1073 | |
1074 | /* Put room used by buffer to use in other buffer */ |
1075 | for (refpek= (BUFFPEK**) &queue_top(&queue); |
1076 | refpek <= (BUFFPEK**) &queue_end(&queue); |
1077 | refpek++) |
1078 | { |
1079 | buffpek= *refpek; |
1080 | if (buffpek->base+buffpek->max_keys*sort_length == base) |
1081 | { |
1082 | buffpek->max_keys+=max_keys; |
1083 | break; |
1084 | } |
1085 | else if (base+max_keys*sort_length == buffpek->base) |
1086 | { |
1087 | buffpek->base=base; |
1088 | buffpek->max_keys+=max_keys; |
1089 | break; |
1090 | } |
1091 | } |
1092 | break; /* One buffer have been removed */ |
1093 | } |
1094 | else if (read_length == HA_OFFSET_ERROR) |
1095 | goto err; /* purecov: inspected */ |
1096 | } |
1097 | queue_replace_top(&queue); /* Top element has been replaced */ |
1098 | } |
1099 | } |
1100 | buffpek=(BUFFPEK*) queue_top(&queue); |
1101 | buffpek->base= (uchar*) sort_keys; |
1102 | buffpek->max_keys=keys; |
1103 | do |
1104 | { |
1105 | if (to_file) |
1106 | { |
1107 | if (info->write_key(info, to_file, buffpek->key, |
1108 | sort_length,buffpek->mem_count)) |
1109 | { |
1110 | error=1; goto err; /* purecov: inspected */ |
1111 | } |
1112 | } |
1113 | else |
1114 | { |
1115 | register uchar *end; |
1116 | strpos= buffpek->key; |
1117 | for (end= strpos+buffpek->mem_count*sort_length; |
1118 | strpos != end ; |
1119 | strpos+=sort_length) |
1120 | { |
1121 | if ((*info->key_write)(info, strpos)) |
1122 | { |
1123 | error=1; goto err; /* purecov: inspected */ |
1124 | } |
1125 | } |
1126 | } |
1127 | } |
1128 | while ((read_length= info->read_to_buffer(from_file,buffpek,sort_length)) != HA_OFFSET_ERROR && read_length != 0); |
1129 | if (read_length == 0) |
1130 | error= 0; |
1131 | |
1132 | lastbuff->count=count; |
1133 | if (to_file) |
1134 | lastbuff->file_pos=to_start_filepos; |
1135 | err: |
1136 | delete_queue(&queue); |
1137 | DBUG_RETURN(error); |
1138 | } /* merge_buffers */ |
1139 | |
1140 | |
1141 | /* Do a merge to output-file (save only positions) */ |
1142 | |
1143 | static int |
1144 | merge_index(MARIA_SORT_PARAM *info, ha_keys keys, uchar **sort_keys, |
1145 | BUFFPEK *buffpek, uint maxbuffer, IO_CACHE *tempfile) |
1146 | { |
1147 | DBUG_ENTER("merge_index" ); |
1148 | if (merge_buffers(info,keys,tempfile,(IO_CACHE*) 0,sort_keys,buffpek,buffpek, |
1149 | buffpek+maxbuffer)) |
1150 | DBUG_RETURN(1); /* purecov: inspected */ |
1151 | if (info->sort_info->param->max_stage != 1) /* If not parallel */ |
1152 | _ma_report_progress(info->sort_info->param, 1, 1); |
1153 | DBUG_RETURN(0); |
1154 | } /* merge_index */ |
1155 | |
1156 | |
1157 | static int flush_maria_ft_buf(MARIA_SORT_PARAM *info) |
1158 | { |
1159 | int err=0; |
1160 | if (info->sort_info->ft_buf) |
1161 | { |
1162 | err=_ma_sort_ft_buf_flush(info); |
1163 | my_free(info->sort_info->ft_buf); |
1164 | info->sort_info->ft_buf=0; |
1165 | } |
1166 | return err; |
1167 | } |
1168 | |