1 | /* Copyright (c) 2000, 2012, Oracle and/or its affiliates. |
2 | |
3 | This program is free software; you can redistribute it and/or modify |
4 | it under the terms of the GNU General Public License as published by |
5 | the Free Software Foundation; version 2 of the License. |
6 | |
7 | This program is distributed in the hope that it will be useful, |
8 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
9 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
10 | GNU General Public License for more details. |
11 | |
12 | You should have received a copy of the GNU General Public License |
13 | along with this program; if not, write to the Free Software |
14 | Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ |
15 | |
/*
  Creates an index for a database by reading keys, sorting them and outputting
  them in sorted order through MI_SORT_INFO functions.
*/
20 | |
21 | #include "fulltext.h" |
22 | #if defined(__WIN__) |
23 | #include <fcntl.h> |
24 | #else |
25 | #include <stddef.h> |
26 | #endif |
27 | #include <queues.h> |
28 | |
29 | /* static variables */ |
30 | |
31 | #undef MYF_RW |
32 | #undef DISK_BUFFER_SIZE |
33 | |
34 | #define MERGEBUFF 15 |
35 | #define MERGEBUFF2 31 |
36 | #define MYF_RW MYF(MY_NABP | MY_WME | MY_WAIT_IF_FULL) |
37 | #define DISK_BUFFER_SIZE (IO_SIZE*128) |
38 | |
39 | /* How many keys we can keep in memory */ |
40 | typedef ulonglong ha_keys; |
41 | |
42 | /* |
43 | Pointers of functions for store and read keys from temp file |
44 | */ |
45 | |
46 | extern void print_error(const char *fmt,...); |
47 | |
48 | /* Functions defined in this file */ |
49 | |
50 | static ha_rows find_all_keys(MI_SORT_PARAM *info, ha_keys keys, |
51 | uchar **sort_keys, |
52 | DYNAMIC_ARRAY *buffpek,uint *maxbuffer, |
53 | IO_CACHE *tempfile, |
54 | IO_CACHE *tempfile_for_exceptions); |
55 | static int write_keys(MI_SORT_PARAM *info,uchar **sort_keys, |
56 | ha_keys count, BUFFPEK *buffpek,IO_CACHE *tempfile); |
57 | static int write_key(MI_SORT_PARAM *info, uchar *key, |
58 | IO_CACHE *tempfile); |
59 | static int write_index(MI_SORT_PARAM *info,uchar * *sort_keys, |
60 | ha_keys count); |
61 | static int merge_many_buff(MI_SORT_PARAM *info, ha_keys keys, |
62 | uchar * *sort_keys, |
63 | BUFFPEK *buffpek, uint *maxbuffer, |
64 | IO_CACHE *t_file); |
65 | static my_off_t read_to_buffer(IO_CACHE *fromfile,BUFFPEK *buffpek, |
66 | uint sort_length); |
67 | static int merge_buffers(MI_SORT_PARAM *info, ha_keys keys, |
68 | IO_CACHE *from_file, IO_CACHE *to_file, |
69 | uchar * *sort_keys, BUFFPEK *lastbuff, |
70 | BUFFPEK *Fb, BUFFPEK *Tb); |
71 | static int merge_index(MI_SORT_PARAM *,ha_keys,uchar **,BUFFPEK *, uint, |
72 | IO_CACHE *); |
73 | static int flush_ft_buf(MI_SORT_PARAM *info); |
74 | |
75 | static int write_keys_varlen(MI_SORT_PARAM *info,uchar **sort_keys, |
76 | ha_keys count, BUFFPEK *buffpek, |
77 | IO_CACHE *tempfile); |
78 | static my_off_t read_to_buffer_varlen(IO_CACHE *fromfile,BUFFPEK *buffpek, |
79 | uint sort_length); |
80 | static int write_merge_key(MI_SORT_PARAM *info, IO_CACHE *to_file, |
81 | uchar *key, uint sort_length, ha_keys count); |
82 | static int write_merge_key_varlen(MI_SORT_PARAM *info, |
83 | IO_CACHE *to_file, |
84 | uchar* key, uint sort_length, |
85 | ha_keys count); |
86 | static inline int |
87 | my_var_write(MI_SORT_PARAM *info, IO_CACHE *to_file, uchar *bufs); |
88 | |
89 | |
90 | /* |
91 | Sets the appropriate read and write methods for the MI_SORT_PARAM |
92 | based on the variable length key flag. |
93 | */ |
94 | static void set_sort_param_read_write(MI_SORT_PARAM *sort_param) |
95 | { |
96 | if (sort_param->keyinfo->flag & HA_VAR_LENGTH_KEY) |
97 | { |
98 | sort_param->write_keys= write_keys_varlen; |
99 | sort_param->read_to_buffer= read_to_buffer_varlen; |
100 | sort_param->write_key= write_merge_key_varlen; |
101 | } |
102 | else |
103 | { |
104 | sort_param->write_keys= write_keys; |
105 | sort_param->read_to_buffer= read_to_buffer; |
106 | sort_param->write_key= write_merge_key; |
107 | } |
108 | } |
109 | |
110 | |
111 | /* |
112 | Creates a index of sorted keys |
113 | |
114 | SYNOPSIS |
115 | _create_index_by_sort() |
116 | info Sort parameters |
117 | no_messages Set to 1 if no output |
118 | sortbuff_size Size if sortbuffer to allocate |
119 | |
120 | RESULT |
121 | 0 ok |
122 | <> 0 Error |
123 | */ |
124 | |
125 | int _create_index_by_sort(MI_SORT_PARAM *info,my_bool no_messages, |
126 | ulonglong sortbuff_size) |
127 | { |
128 | int error; |
129 | uint sort_length, maxbuffer; |
130 | ulonglong memavl, old_memavl; |
131 | DYNAMIC_ARRAY buffpek; |
132 | ha_rows records, UNINIT_VAR(keys); |
133 | uchar **sort_keys; |
134 | IO_CACHE tempfile, tempfile_for_exceptions; |
135 | DBUG_ENTER("_create_index_by_sort" ); |
136 | DBUG_PRINT("enter" ,("sort_length: %u" , info->key_length)); |
137 | |
138 | set_sort_param_read_write(info); |
139 | |
140 | my_b_clear(&tempfile); |
141 | my_b_clear(&tempfile_for_exceptions); |
142 | bzero((char*) &buffpek,sizeof(buffpek)); |
143 | sort_keys= (uchar **) NULL; error= 1; |
144 | maxbuffer=1; |
145 | |
146 | memavl= MY_MAX(sortbuff_size, MIN_SORT_BUFFER); |
147 | records= info->sort_info->max_records; |
148 | sort_length= info->key_length; |
149 | |
150 | while (memavl >= MIN_SORT_BUFFER) |
151 | { |
152 | /* Check if we can fit all keys into memory */ |
153 | if (((ulonglong) (records + 1) * |
154 | (sort_length + sizeof(char*)) <= memavl)) |
155 | keys= records+1; |
156 | else if ((info->sort_info->param->testflag & |
157 | (T_FORCE_SORT_MEMORY | T_CREATE_MISSING_KEYS)) == |
158 | T_FORCE_SORT_MEMORY) |
159 | { |
160 | /* |
161 | Use all of the given sort buffer for key data. |
162 | Allocate 1000 buffers at a start for new data. More buffers |
163 | will be allocated when needed. |
164 | */ |
165 | keys= memavl / (sort_length+sizeof(char*)); |
166 | maxbuffer= (uint) MY_MIN((ulonglong) 1000, (records / keys)+1); |
167 | } |
168 | else |
169 | { |
170 | /* |
171 | All keys can't fit in memory. |
172 | Calculate how many keys + buffers we can keep in memory |
173 | */ |
174 | uint maxbuffer_org; |
175 | do |
176 | { |
177 | maxbuffer_org= maxbuffer; |
178 | if (memavl < sizeof(BUFFPEK) * maxbuffer || |
179 | (keys= (memavl-sizeof(BUFFPEK)*maxbuffer)/ |
180 | (sort_length+sizeof(char*))) <= 1 || |
181 | keys < maxbuffer) |
182 | { |
183 | mi_check_print_error(info->sort_info->param, |
184 | "myisam_sort_buffer_size is too small. Current myisam_sort_buffer_size: %llu rows: %llu sort_length: %u" , |
185 | sortbuff_size, (ulonglong) records, |
186 | sort_length); |
187 | my_errno= ENOMEM; |
188 | goto err; |
189 | } |
190 | } |
191 | while ((maxbuffer= (uint) (records/(keys-1)+1)) != maxbuffer_org); |
192 | } |
193 | |
194 | if ((sort_keys= ((uchar **) |
195 | my_malloc((size_t) (keys*(sort_length+sizeof(char*))+ |
196 | HA_FT_MAXBYTELEN), MYF(0))))) |
197 | { |
198 | if (my_init_dynamic_array(&buffpek, sizeof(BUFFPEK), maxbuffer, |
199 | MY_MIN(maxbuffer/2, 1000), MYF(0))) |
200 | { |
201 | my_free(sort_keys); |
202 | sort_keys= 0; |
203 | } |
204 | else |
205 | break; |
206 | } |
207 | old_memavl=memavl; |
208 | if ((memavl= memavl/4*3) < MIN_SORT_BUFFER && old_memavl > MIN_SORT_BUFFER) |
209 | memavl= MIN_SORT_BUFFER; |
210 | } |
211 | if (memavl < MIN_SORT_BUFFER) |
212 | { |
213 | /* purecov: begin inspected */ |
214 | mi_check_print_error(info->sort_info->param, |
215 | "myisam_sort_buffer_size is too small. Current myisam_sort_buffer_size: %llu rows: %llu sort_length: %u" , |
216 | sortbuff_size, (ulonglong) records, sort_length); |
217 | my_errno= ENOMEM; |
218 | goto err; |
219 | /* purecov: end inspected */ |
220 | } |
221 | (*info->lock_in_memory)(info->sort_info->param);/* Everything is allocated */ |
222 | |
223 | if (!no_messages) |
224 | my_fprintf(stdout, |
225 | " - Searching for keys, allocating buffer for %llu keys\n" , |
226 | (ulonglong) keys); |
227 | |
228 | if ((records= find_all_keys(info,keys,sort_keys,&buffpek,&maxbuffer, |
229 | &tempfile,&tempfile_for_exceptions)) |
230 | == HA_POS_ERROR) |
231 | goto err; /* purecov: tested */ |
232 | if (maxbuffer == 0) |
233 | { |
234 | if (!no_messages) |
235 | my_fprintf(stdout, " - Dumping %llu keys\n" , (ulonglong) records); |
236 | if (write_index(info, sort_keys, (ha_keys) records)) |
237 | goto err; /* purecov: inspected */ |
238 | } |
239 | else |
240 | { |
241 | keys=(keys*(sort_length+sizeof(char*)))/sort_length; |
242 | if (maxbuffer >= MERGEBUFF2) |
243 | { |
244 | if (!no_messages) |
245 | my_fprintf(stdout, " - Merging %llu keys\n" , |
246 | (ulonglong) records); /* purecov: tested */ |
247 | if (merge_many_buff(info,keys,sort_keys, |
248 | dynamic_element(&buffpek,0,BUFFPEK *),&maxbuffer,&tempfile)) |
249 | goto err; /* purecov: inspected */ |
250 | } |
251 | if (flush_io_cache(&tempfile) || |
252 | reinit_io_cache(&tempfile,READ_CACHE,0L,0,0)) |
253 | goto err; /* purecov: inspected */ |
254 | if (!no_messages) |
255 | printf(" - Last merge and dumping keys\n" ); /* purecov: tested */ |
256 | if (merge_index(info,keys,sort_keys,dynamic_element(&buffpek,0,BUFFPEK *), |
257 | maxbuffer,&tempfile)) |
258 | goto err; /* purecov: inspected */ |
259 | } |
260 | |
261 | if (flush_ft_buf(info) || flush_pending_blocks(info)) |
262 | goto err; |
263 | |
264 | if (my_b_inited(&tempfile_for_exceptions)) |
265 | { |
266 | MI_INFO *idx=info->sort_info->info; |
267 | uint keyno=info->key; |
268 | uint key_length, ref_length=idx->s->rec_reflength; |
269 | |
270 | if (!no_messages) |
271 | printf(" - Adding exceptions\n" ); /* purecov: tested */ |
272 | if (flush_io_cache(&tempfile_for_exceptions) || |
273 | reinit_io_cache(&tempfile_for_exceptions,READ_CACHE,0L,0,0)) |
274 | goto err; |
275 | |
276 | while (!my_b_read(&tempfile_for_exceptions,(uchar*)&key_length, |
277 | sizeof(key_length)) |
278 | && !my_b_read(&tempfile_for_exceptions,(uchar*)sort_keys, |
279 | (uint) key_length)) |
280 | { |
281 | if (_mi_ck_write(idx,keyno,(uchar*) sort_keys,key_length-ref_length)) |
282 | goto err; |
283 | } |
284 | } |
285 | |
286 | error =0; |
287 | |
288 | err: |
289 | my_free(sort_keys); |
290 | delete_dynamic(&buffpek); |
291 | close_cached_file(&tempfile); |
292 | close_cached_file(&tempfile_for_exceptions); |
293 | |
294 | DBUG_RETURN(error ? -1 : 0); |
295 | } /* _create_index_by_sort */ |
296 | |
297 | |
298 | /* Search after all keys and place them in a temp. file */ |
299 | |
300 | static ha_rows find_all_keys(MI_SORT_PARAM *info, ha_rows keys, |
301 | uchar **sort_keys, DYNAMIC_ARRAY *buffpek, |
302 | uint *maxbuffer, IO_CACHE *tempfile, |
303 | IO_CACHE *tempfile_for_exceptions) |
304 | { |
305 | int error; |
306 | ha_rows idx; |
307 | DBUG_ENTER("find_all_keys" ); |
308 | |
309 | idx=error=0; |
310 | sort_keys[0]=(uchar*) (sort_keys+keys); |
311 | |
312 | while (!(error=(*info->key_read)(info,sort_keys[idx]))) |
313 | { |
314 | if (info->real_key_length > info->key_length) |
315 | { |
316 | if (write_key(info,sort_keys[idx],tempfile_for_exceptions)) |
317 | DBUG_RETURN(HA_POS_ERROR); /* purecov: inspected */ |
318 | continue; |
319 | } |
320 | |
321 | if (++idx == keys) |
322 | { |
323 | if (info->write_keys(info,sort_keys,idx-1,(BUFFPEK *)alloc_dynamic(buffpek), |
324 | tempfile)) |
325 | DBUG_RETURN(HA_POS_ERROR); /* purecov: inspected */ |
326 | |
327 | sort_keys[0]=(uchar*) (sort_keys+keys); |
328 | memcpy(sort_keys[0],sort_keys[idx-1],(size_t) info->key_length); |
329 | idx=1; |
330 | } |
331 | sort_keys[idx]=sort_keys[idx-1]+info->key_length; |
332 | } |
333 | if (error > 0) |
334 | DBUG_RETURN(HA_POS_ERROR); /* Aborted by get_key */ /* purecov: inspected */ |
335 | if (buffpek->elements) |
336 | { |
337 | if (info->write_keys(info,sort_keys,idx,(BUFFPEK *)alloc_dynamic(buffpek), |
338 | tempfile)) |
339 | DBUG_RETURN(HA_POS_ERROR); /* purecov: inspected */ |
340 | *maxbuffer=buffpek->elements-1; |
341 | } |
342 | else |
343 | *maxbuffer=0; |
344 | |
345 | DBUG_RETURN((*maxbuffer)*(keys-1)+idx); |
346 | } /* find_all_keys */ |
347 | |
348 | static my_bool thr_find_all_keys_exec(MI_SORT_PARAM *sort_param) |
349 | { |
350 | ulonglong memavl, old_memavl, sortbuff_size; |
351 | ha_keys UNINIT_VAR(keys), idx; |
352 | uint sort_length; |
353 | uint maxbuffer; |
354 | uchar **sort_keys= NULL; |
355 | int error= 0; |
356 | DBUG_ENTER("thr_find_all_keys" ); |
357 | DBUG_PRINT("enter" , ("master: %d" , sort_param->master)); |
358 | |
359 | if (sort_param->sort_info->got_error) |
360 | DBUG_RETURN(TRUE); |
361 | |
362 | set_sort_param_read_write(sort_param); |
363 | |
364 | my_b_clear(&sort_param->tempfile); |
365 | my_b_clear(&sort_param->tempfile_for_exceptions); |
366 | bzero((char*) &sort_param->buffpek, sizeof(sort_param->buffpek)); |
367 | bzero((char*) &sort_param->unique, sizeof(sort_param->unique)); |
368 | |
369 | sortbuff_size= sort_param->sortbuff_size; |
370 | memavl= MY_MAX(sortbuff_size, MIN_SORT_BUFFER); |
371 | idx= (ha_keys) sort_param->sort_info->max_records; |
372 | sort_length= sort_param->key_length; |
373 | maxbuffer= 1; |
374 | |
375 | while (memavl >= MIN_SORT_BUFFER) |
376 | { |
377 | if ((my_off_t) (idx+1)*(sort_length+sizeof(char*)) <= |
378 | (my_off_t) memavl) |
379 | keys= idx+1; |
380 | else if ((sort_param->sort_info->param->testflag & |
381 | (T_FORCE_SORT_MEMORY | T_CREATE_MISSING_KEYS)) == |
382 | T_FORCE_SORT_MEMORY) |
383 | { |
384 | /* |
385 | Use all of the given sort buffer for key data. |
386 | Allocate 1000 buffers at a start for new data. More buffers |
387 | will be allocated when needed. |
388 | */ |
389 | keys= memavl / (sort_length+sizeof(char*)); |
390 | maxbuffer= (uint) MY_MIN((ulonglong) 1000, (idx / keys)+1); |
391 | } |
392 | else |
393 | { |
394 | uint maxbuffer_org; |
395 | do |
396 | { |
397 | maxbuffer_org= maxbuffer; |
398 | if (memavl < sizeof(BUFFPEK)*maxbuffer || |
399 | (keys=(memavl-sizeof(BUFFPEK)*maxbuffer)/ |
400 | (sort_length+sizeof(char*))) <= 1 || |
401 | keys < (uint) maxbuffer) |
402 | { |
403 | mi_check_print_error(sort_param->sort_info->param, |
404 | "myisam_sort_buffer_size is too small. Current myisam_sort_buffer_size: %llu rows: %llu sort_length: %u" , |
405 | sortbuff_size, (ulonglong) idx, sort_length); |
406 | DBUG_RETURN(TRUE); |
407 | } |
408 | } |
409 | while ((maxbuffer= (uint) (idx/(keys-1)+1)) != maxbuffer_org); |
410 | } |
411 | if ((sort_keys= (uchar**) my_malloc((size_t)(keys * (sort_length + sizeof(char*)) + |
412 | ((sort_param->keyinfo->flag & HA_FULLTEXT) ? |
413 | HA_FT_MAXBYTELEN : 0)), MYF(0)))) |
414 | { |
415 | if (my_init_dynamic_array(&sort_param->buffpek, sizeof(BUFFPEK), |
416 | maxbuffer, MY_MIN(maxbuffer / 2, 1000), MYF(0))) |
417 | { |
418 | my_free(sort_keys); |
419 | sort_keys= NULL; /* Safety against double free on error. */ |
420 | } |
421 | else |
422 | break; |
423 | } |
424 | old_memavl= memavl; |
425 | if ((memavl= memavl / 4 * 3) < MIN_SORT_BUFFER && |
426 | old_memavl > MIN_SORT_BUFFER) |
427 | memavl= MIN_SORT_BUFFER; |
428 | } |
429 | if (memavl < MIN_SORT_BUFFER) |
430 | { |
431 | /* purecov: begin inspected */ |
432 | mi_check_print_error(sort_param->sort_info->param, |
433 | "myisam_sort_buffer_size is too small. Current myisam_sort_buffer_size: %llu rows: %llu sort_length: %u" , |
434 | sortbuff_size, (ulonglong) idx, sort_length); |
435 | my_errno= ENOMEM; |
436 | goto err; |
437 | /* purecov: end inspected */ |
438 | } |
439 | |
440 | if (sort_param->sort_info->param->testflag & T_VERBOSE) |
441 | my_fprintf(stdout, |
442 | "Key %d - Allocating buffer for %llu keys\n" , |
443 | sort_param->key + 1, (ulonglong) keys); |
444 | sort_param->sort_keys= sort_keys; |
445 | |
446 | idx= error= 0; |
447 | sort_keys[0]= (uchar*) (sort_keys+keys); |
448 | |
449 | DBUG_PRINT("info" , ("reading keys" )); |
450 | while (!(error= sort_param->sort_info->got_error) && |
451 | !(error= (*sort_param->key_read)(sort_param, sort_keys[idx]))) |
452 | { |
453 | if (sort_param->real_key_length > sort_param->key_length) |
454 | { |
455 | if (write_key(sort_param, sort_keys[idx], |
456 | &sort_param->tempfile_for_exceptions)) |
457 | goto err; |
458 | continue; |
459 | } |
460 | |
461 | if (++idx == keys) |
462 | { |
463 | if (sort_param->write_keys(sort_param, sort_keys, idx - 1, |
464 | (BUFFPEK*) alloc_dynamic(&sort_param->buffpek), |
465 | &sort_param->tempfile)) |
466 | goto err; |
467 | sort_keys[0]= (uchar*) (sort_keys+keys); |
468 | memcpy(sort_keys[0], sort_keys[idx - 1], (size_t) sort_param->key_length); |
469 | idx= 1; |
470 | } |
471 | sort_keys[idx]= sort_keys[idx - 1] + sort_param->key_length; |
472 | } |
473 | |
474 | if (error > 0) |
475 | goto err; |
476 | |
477 | if (sort_param->buffpek.elements) |
478 | { |
479 | if (sort_param->write_keys(sort_param, sort_keys, idx, |
480 | (BUFFPEK*) alloc_dynamic(&sort_param->buffpek), |
481 | &sort_param->tempfile)) |
482 | goto err; |
483 | sort_param->keys= (sort_param->buffpek.elements - 1) * (keys - 1) + idx; |
484 | } |
485 | else |
486 | sort_param->keys= idx; |
487 | |
488 | DBUG_RETURN(FALSE); |
489 | |
490 | err: |
491 | DBUG_PRINT("error" , ("got some error" )); |
492 | sort_param->sort_info->got_error= 1; /* no need to protect with a mutex */ |
493 | my_free(sort_keys); |
494 | sort_param->sort_keys= 0; |
495 | delete_dynamic(& sort_param->buffpek); |
496 | close_cached_file(&sort_param->tempfile); |
497 | close_cached_file(&sort_param->tempfile_for_exceptions); |
498 | |
499 | DBUG_RETURN(TRUE); |
500 | } |
501 | |
502 | /* Search after all keys and place them in a temp. file */ |
503 | |
504 | pthread_handler_t thr_find_all_keys(void *arg) |
505 | { |
506 | MI_SORT_PARAM *sort_param= (MI_SORT_PARAM*) arg; |
507 | my_bool error= FALSE; |
508 | /* If my_thread_init fails */ |
509 | if (my_thread_init() || thr_find_all_keys_exec(sort_param)) |
510 | error= TRUE; |
511 | |
512 | /* |
513 | Thread must clean up after itself. |
514 | */ |
515 | free_root(&sort_param->wordroot, MYF(0)); |
516 | /* |
517 | Detach from the share if the writer is involved. Avoid others to |
518 | be blocked. This includes a flush of the write buffer. This will |
519 | also indicate EOF to the readers. |
520 | That means that a writer always gets here first and readers - |
521 | only when they see EOF. But if a reader finishes prematurely |
522 | because of an error it may reach this earlier - don't allow it |
523 | to detach the writer thread. |
524 | */ |
525 | if (sort_param->master && sort_param->sort_info->info->rec_cache.share) |
526 | remove_io_thread(&sort_param->sort_info->info->rec_cache); |
527 | |
528 | /* Readers detach from the share if any. Avoid others to be blocked. */ |
529 | if (sort_param->read_cache.share) |
530 | remove_io_thread(&sort_param->read_cache); |
531 | |
532 | mysql_mutex_lock(&sort_param->sort_info->mutex); |
533 | if (error) |
534 | sort_param->sort_info->got_error= 1; |
535 | |
536 | if (!--sort_param->sort_info->threads_running) |
537 | mysql_cond_signal(&sort_param->sort_info->cond); |
538 | mysql_mutex_unlock(&sort_param->sort_info->mutex); |
539 | my_thread_end(); |
540 | return NULL; |
541 | } |
542 | |
543 | |
544 | int thr_write_keys(MI_SORT_PARAM *sort_param) |
545 | { |
546 | MI_SORT_INFO *sort_info=sort_param->sort_info; |
547 | HA_CHECK *param=sort_info->param; |
548 | ulonglong UNINIT_VAR(length); |
549 | ha_rows keys; |
550 | ulong *rec_per_key_part=param->rec_per_key_part; |
551 | int got_error=sort_info->got_error; |
552 | uint i; |
553 | MI_INFO *info=sort_info->info; |
554 | MYISAM_SHARE *share=info->s; |
555 | MI_SORT_PARAM *sinfo; |
556 | uchar *mergebuf=0; |
557 | DBUG_ENTER("thr_write_keys" ); |
558 | |
559 | for (i= 0, sinfo= sort_param ; |
560 | i < sort_info->total_keys ; |
561 | i++, sinfo++) |
562 | { |
563 | if (!sinfo->sort_keys) |
564 | { |
565 | got_error=1; |
566 | my_free(mi_get_rec_buff_ptr(info, sinfo->rec_buff)); |
567 | continue; |
568 | } |
569 | if (!got_error) |
570 | { |
571 | mi_set_key_active(share->state.key_map, sinfo->key); |
572 | if (!sinfo->buffpek.elements) |
573 | { |
574 | if (param->testflag & T_VERBOSE) |
575 | { |
576 | my_fprintf(stdout, |
577 | "Key %d - Dumping %llu keys\n" , sinfo->key+1, |
578 | (ulonglong) sinfo->keys); |
579 | fflush(stdout); |
580 | } |
581 | if (write_index(sinfo, sinfo->sort_keys, sinfo->keys) || |
582 | flush_ft_buf(sinfo) || flush_pending_blocks(sinfo)) |
583 | got_error=1; |
584 | } |
585 | } |
586 | my_free(sinfo->sort_keys); |
587 | my_free(mi_get_rec_buff_ptr(info, sinfo->rec_buff)); |
588 | sinfo->sort_keys=0; |
589 | } |
590 | |
591 | for (i= 0, sinfo= sort_param ; |
592 | i < sort_info->total_keys ; |
593 | i++, |
594 | delete_dynamic(&sinfo->buffpek), |
595 | close_cached_file(&sinfo->tempfile), |
596 | close_cached_file(&sinfo->tempfile_for_exceptions), |
597 | rec_per_key_part+= sinfo->keyinfo->keysegs, sinfo++) |
598 | { |
599 | if (got_error) |
600 | continue; |
601 | |
602 | set_sort_param_read_write(sinfo); |
603 | |
604 | if (sinfo->buffpek.elements) |
605 | { |
606 | uint maxbuffer=sinfo->buffpek.elements-1; |
607 | if (!mergebuf) |
608 | { |
609 | length=param->sort_buffer_length; |
610 | while (length >= MIN_SORT_BUFFER) |
611 | { |
612 | if ((mergebuf= my_malloc((size_t) length, MYF(0)))) |
613 | break; |
614 | length=length*3/4; |
615 | } |
616 | if (!mergebuf) |
617 | { |
618 | got_error=1; |
619 | continue; |
620 | } |
621 | } |
622 | keys=length/sinfo->key_length; |
623 | if (maxbuffer >= MERGEBUFF2) |
624 | { |
625 | if (param->testflag & T_VERBOSE) |
626 | my_fprintf(stdout, |
627 | "Key %d - Merging %llu keys\n" , |
628 | sinfo->key+1, (ulonglong) sinfo->keys); |
629 | if (merge_many_buff(sinfo, keys, (uchar **)mergebuf, |
630 | dynamic_element(&sinfo->buffpek, 0, BUFFPEK *), |
631 | &maxbuffer, &sinfo->tempfile)) |
632 | { |
633 | got_error=1; |
634 | continue; |
635 | } |
636 | } |
637 | if (flush_io_cache(&sinfo->tempfile) || |
638 | reinit_io_cache(&sinfo->tempfile,READ_CACHE,0L,0,0)) |
639 | { |
640 | got_error=1; |
641 | continue; |
642 | } |
643 | if (param->testflag & T_VERBOSE) |
644 | printf("Key %d - Last merge and dumping keys\n" , sinfo->key+1); |
645 | if (merge_index(sinfo, keys, (uchar **)mergebuf, |
646 | dynamic_element(&sinfo->buffpek,0,BUFFPEK *), |
647 | maxbuffer,&sinfo->tempfile) || |
648 | flush_ft_buf(sinfo) || |
649 | flush_pending_blocks(sinfo)) |
650 | { |
651 | got_error=1; |
652 | continue; |
653 | } |
654 | } |
655 | if (my_b_inited(&sinfo->tempfile_for_exceptions)) |
656 | { |
657 | uint key_length; |
658 | |
659 | if (param->testflag & T_VERBOSE) |
660 | printf("Key %d - Dumping 'long' keys\n" , sinfo->key+1); |
661 | |
662 | if (flush_io_cache(&sinfo->tempfile_for_exceptions) || |
663 | reinit_io_cache(&sinfo->tempfile_for_exceptions,READ_CACHE,0L,0,0)) |
664 | { |
665 | got_error=1; |
666 | continue; |
667 | } |
668 | |
669 | while (!got_error && |
670 | !my_b_read(&sinfo->tempfile_for_exceptions,(uchar*)&key_length, |
671 | sizeof(key_length))) |
672 | { |
673 | uchar ft_buf[HA_FT_MAXBYTELEN + HA_FT_WLEN + 10]; |
674 | if (key_length > sizeof(ft_buf) || |
675 | my_b_read(&sinfo->tempfile_for_exceptions, (uchar*)ft_buf, |
676 | (uint)key_length) || |
677 | _mi_ck_write(info, sinfo->key, (uchar*)ft_buf, |
678 | key_length - info->s->rec_reflength)) |
679 | got_error=1; |
680 | } |
681 | } |
682 | if (!got_error && param->testflag & T_STATISTICS) |
683 | update_key_parts(sinfo->keyinfo, rec_per_key_part, sinfo->unique, |
684 | param->stats_method == MI_STATS_METHOD_IGNORE_NULLS ? |
685 | sinfo->notnull : NULL, |
686 | (ulonglong) info->state->records); |
687 | } |
688 | my_free(mergebuf); |
689 | DBUG_RETURN(got_error); |
690 | } |
691 | |
692 | /* Write all keys in memory to file for later merge */ |
693 | |
694 | static int write_keys(MI_SORT_PARAM *info, register uchar **sort_keys, |
695 | ha_keys count, BUFFPEK *buffpek, IO_CACHE *tempfile) |
696 | { |
697 | uchar **end; |
698 | uint sort_length=info->key_length; |
699 | DBUG_ENTER("write_keys" ); |
700 | |
701 | if (!buffpek) |
702 | DBUG_RETURN(1); /* Out of memory */ |
703 | |
704 | my_qsort2((uchar*) sort_keys,(size_t) count, sizeof(uchar*), |
705 | (qsort2_cmp) info->key_cmp, info); |
706 | if (!my_b_inited(tempfile) && |
707 | open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST" , |
708 | DISK_BUFFER_SIZE, info->sort_info->param->myf_rw)) |
709 | DBUG_RETURN(1); /* purecov: inspected */ |
710 | |
711 | buffpek->file_pos=my_b_tell(tempfile); |
712 | buffpek->count=count; |
713 | |
714 | for (end=sort_keys+count ; sort_keys != end ; sort_keys++) |
715 | { |
716 | if (my_b_write(tempfile,(uchar*) *sort_keys, sort_length)) |
717 | DBUG_RETURN(1); /* purecov: inspected */ |
718 | } |
719 | DBUG_RETURN(0); |
720 | } /* write_keys */ |
721 | |
722 | |
723 | static inline int |
724 | my_var_write(MI_SORT_PARAM *info, IO_CACHE *to_file, uchar *bufs) |
725 | { |
726 | int err; |
727 | uint16 len = _mi_keylength(info->keyinfo, (uchar*) bufs); |
728 | |
729 | /* The following is safe as this is a local file */ |
730 | if ((err= my_b_write(to_file, (uchar*)&len, sizeof(len)))) |
731 | return (err); |
732 | if ((err= my_b_write(to_file,bufs, (uint) len))) |
733 | return (err); |
734 | return (0); |
735 | } |
736 | |
737 | |
738 | static int write_keys_varlen(MI_SORT_PARAM *info, |
739 | register uchar **sort_keys, |
740 | ha_keys count, BUFFPEK *buffpek, |
741 | IO_CACHE *tempfile) |
742 | { |
743 | uchar **end; |
744 | int err; |
745 | DBUG_ENTER("write_keys_varlen" ); |
746 | |
747 | if (!buffpek) |
748 | DBUG_RETURN(1); /* Out of memory */ |
749 | |
750 | my_qsort2((uchar*) sort_keys, (size_t) count, sizeof(uchar*), |
751 | (qsort2_cmp) info->key_cmp, info); |
752 | if (!my_b_inited(tempfile) && |
753 | open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST" , |
754 | DISK_BUFFER_SIZE, info->sort_info->param->myf_rw)) |
755 | DBUG_RETURN(1); /* purecov: inspected */ |
756 | |
757 | buffpek->file_pos=my_b_tell(tempfile); |
758 | buffpek->count=count; |
759 | for (end=sort_keys+count ; sort_keys != end ; sort_keys++) |
760 | { |
761 | if ((err= my_var_write(info,tempfile, (uchar*) *sort_keys))) |
762 | DBUG_RETURN(err); |
763 | } |
764 | DBUG_RETURN(0); |
765 | } /* write_keys_varlen */ |
766 | |
767 | |
768 | static int write_key(MI_SORT_PARAM *info, uchar *key, IO_CACHE *tempfile) |
769 | { |
770 | uint key_length=info->real_key_length; |
771 | DBUG_ENTER("write_key" ); |
772 | |
773 | if (!my_b_inited(tempfile) && |
774 | open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST" , |
775 | DISK_BUFFER_SIZE, info->sort_info->param->myf_rw)) |
776 | DBUG_RETURN(1); |
777 | |
778 | if (my_b_write(tempfile,(uchar*)&key_length,sizeof(key_length)) || |
779 | my_b_write(tempfile,(uchar*)key,(uint) key_length)) |
780 | DBUG_RETURN(1); |
781 | DBUG_RETURN(0); |
782 | } /* write_key */ |
783 | |
784 | |
785 | /* Write index */ |
786 | |
787 | static int write_index(MI_SORT_PARAM *info, register uchar **sort_keys, |
788 | register ha_keys count) |
789 | { |
790 | DBUG_ENTER("write_index" ); |
791 | |
792 | my_qsort2((uchar*) sort_keys,(size_t) count,sizeof(uchar*), |
793 | (qsort2_cmp) info->key_cmp,info); |
794 | while (count--) |
795 | { |
796 | if ((*info->key_write)(info,*sort_keys++)) |
797 | DBUG_RETURN(-1); /* purecov: inspected */ |
798 | } |
799 | DBUG_RETURN(0); |
800 | } /* write_index */ |
801 | |
802 | |
803 | /* Merge buffers to make < MERGEBUFF2 buffers */ |
804 | |
805 | static int merge_many_buff(MI_SORT_PARAM *info, ha_keys keys, |
806 | uchar **sort_keys, BUFFPEK *buffpek, |
807 | uint *maxbuffer, IO_CACHE *t_file) |
808 | { |
809 | register uint i; |
810 | IO_CACHE t_file2, *from_file, *to_file, *temp; |
811 | BUFFPEK *lastbuff; |
812 | DBUG_ENTER("merge_many_buff" ); |
813 | |
814 | if (*maxbuffer < MERGEBUFF2) |
815 | DBUG_RETURN(0); /* purecov: inspected */ |
816 | if (flush_io_cache(t_file) || |
817 | open_cached_file(&t_file2,my_tmpdir(info->tmpdir),"ST" , |
818 | DISK_BUFFER_SIZE, info->sort_info->param->myf_rw)) |
819 | DBUG_RETURN(1); /* purecov: inspected */ |
820 | |
821 | from_file= t_file ; to_file= &t_file2; |
822 | while (*maxbuffer >= MERGEBUFF2) |
823 | { |
824 | reinit_io_cache(from_file,READ_CACHE,0L,0,0); |
825 | reinit_io_cache(to_file,WRITE_CACHE,0L,0,0); |
826 | lastbuff=buffpek; |
827 | for (i=0 ; i + MERGEBUFF*3/2 <= *maxbuffer ; i+=MERGEBUFF) |
828 | { |
829 | if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++, |
830 | buffpek+i,buffpek+i+MERGEBUFF-1)) |
831 | goto cleanup; |
832 | } |
833 | if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++, |
834 | buffpek+i,buffpek+ *maxbuffer)) |
835 | break; /* purecov: inspected */ |
836 | if (flush_io_cache(to_file)) |
837 | break; /* purecov: inspected */ |
838 | temp=from_file; from_file=to_file; to_file=temp; |
839 | *maxbuffer= (uint) (lastbuff-buffpek)-1; |
840 | } |
841 | cleanup: |
842 | close_cached_file(to_file); /* This holds old result */ |
843 | if (to_file == t_file) |
844 | { |
845 | DBUG_ASSERT(t_file2.type == WRITE_CACHE); |
846 | *t_file=t_file2; /* Copy result file */ |
847 | } |
848 | |
849 | DBUG_RETURN(*maxbuffer >= MERGEBUFF2); /* Return 1 if interrupted */ |
850 | } /* merge_many_buff */ |
851 | |
852 | |
853 | /* |
854 | Read data to buffer |
855 | |
856 | SYNOPSIS |
857 | read_to_buffer() |
858 | fromfile File to read from |
859 | buffpek Where to read from |
860 | sort_length max length to read |
861 | RESULT |
862 | > 0 Ammount of bytes read |
863 | -1 Error |
864 | */ |
865 | |
866 | static my_off_t read_to_buffer(IO_CACHE *fromfile, BUFFPEK *buffpek, |
867 | uint sort_length) |
868 | { |
869 | register ha_keys count; |
870 | size_t length; |
871 | |
872 | if ((count= (ha_keys) MY_MIN((ha_rows) buffpek->max_keys, |
873 | (ha_rows) buffpek->count))) |
874 | { |
875 | if (my_b_pread(fromfile, (uchar*) buffpek->base, |
876 | (length= (size_t) (sort_length * count)), |
877 | buffpek->file_pos)) |
878 | return(HA_OFFSET_ERROR); |
879 | buffpek->key=buffpek->base; |
880 | buffpek->file_pos+= length; /* New filepos */ |
881 | buffpek->count-= count; |
882 | buffpek->mem_count= count; |
883 | } |
884 | return (((my_off_t) count) * sort_length); |
885 | } /* read_to_buffer */ |
886 | |
887 | |
888 | static my_off_t read_to_buffer_varlen(IO_CACHE *fromfile, BUFFPEK *buffpek, |
889 | uint sort_length) |
890 | { |
891 | register ha_keys count; |
892 | uint16 length_of_key = 0; |
893 | uint idx; |
894 | uchar *buffp; |
895 | |
896 | if ((count= (ha_keys) MY_MIN((ha_rows) buffpek->max_keys,buffpek->count))) |
897 | { |
898 | buffp = buffpek->base; |
899 | |
900 | for (idx=1;idx<=count;idx++) |
901 | { |
902 | if (my_b_pread(fromfile, (uchar*)&length_of_key, |
903 | sizeof(length_of_key), buffpek->file_pos)) |
904 | return(HA_OFFSET_ERROR); |
905 | buffpek->file_pos+=sizeof(length_of_key); |
906 | if (my_b_pread(fromfile, (uchar*) buffp, |
907 | length_of_key, buffpek->file_pos)) |
908 | return(HA_OFFSET_ERROR); |
909 | buffpek->file_pos+=length_of_key; |
910 | buffp = buffp + sort_length; |
911 | } |
912 | buffpek->key=buffpek->base; |
913 | buffpek->count-= count; |
914 | buffpek->mem_count= count; |
915 | } |
916 | return (((my_off_t) count) * sort_length); |
917 | } /* read_to_buffer_varlen */ |
918 | |
919 | |
920 | static int write_merge_key_varlen(MI_SORT_PARAM *info, |
921 | IO_CACHE *to_file, uchar* key, |
922 | uint sort_length, ha_keys count) |
923 | { |
924 | ha_keys idx; |
925 | uchar *bufs = key; |
926 | |
927 | for (idx=1;idx<=count;idx++) |
928 | { |
929 | int err; |
930 | if ((err= my_var_write(info, to_file, bufs))) |
931 | return (err); |
932 | bufs=bufs+sort_length; |
933 | } |
934 | return(0); |
935 | } |
936 | |
937 | |
938 | static int write_merge_key(MI_SORT_PARAM *info __attribute__((unused)), |
939 | IO_CACHE *to_file, uchar *key, |
940 | uint sort_length, ha_keys count) |
941 | { |
942 | return my_b_write(to_file, key, (size_t) (sort_length * count)); |
943 | } |
944 | |
945 | /* |
946 | Merge buffers to one buffer |
947 | If to_file == 0 then use info->key_write |
948 | |
949 | Return: |
950 | 0 ok |
951 | 1 error |
952 | */ |
953 | |
954 | static int |
955 | merge_buffers(MI_SORT_PARAM *info, ha_keys keys, IO_CACHE *from_file, |
956 | IO_CACHE *to_file, uchar **sort_keys, BUFFPEK *lastbuff, |
957 | BUFFPEK *Fb, BUFFPEK *Tb) |
958 | { |
959 | int error= 1; |
960 | uint sort_length; |
961 | ha_keys maxcount; |
962 | ha_rows count; |
963 | my_off_t UNINIT_VAR(to_start_filepos), read_length; |
964 | uchar *strpos; |
965 | BUFFPEK *buffpek,**refpek; |
966 | QUEUE queue; |
967 | DBUG_ENTER("merge_buffers" ); |
968 | |
969 | count=error=0; |
970 | maxcount= keys/((uint) (Tb-Fb) +1); |
971 | DBUG_ASSERT(maxcount > 0); |
972 | if (to_file) |
973 | to_start_filepos=my_b_tell(to_file); |
974 | strpos=(uchar*) sort_keys; |
975 | sort_length=info->key_length; |
976 | |
977 | if (init_queue(&queue,(uint) (Tb-Fb)+1,offsetof(BUFFPEK,key),0, |
978 | (int (*)(void*, uchar *,uchar*)) info->key_cmp, |
979 | (void*) info, 0, 0)) |
980 | DBUG_RETURN(1); /* purecov: inspected */ |
981 | |
982 | for (buffpek= Fb ; buffpek <= Tb ; buffpek++) |
983 | { |
984 | count+= buffpek->count; |
985 | buffpek->base= (uchar*) strpos; |
986 | buffpek->max_keys= maxcount; |
987 | strpos+= (read_length= info->read_to_buffer(from_file,buffpek, |
988 | sort_length)); |
989 | if (read_length == HA_OFFSET_ERROR) |
990 | goto err; /* purecov: inspected */ |
991 | queue_insert(&queue,(uchar*) buffpek); |
992 | } |
993 | |
994 | while (queue.elements > 1) |
995 | { |
996 | for (;;) |
997 | { |
998 | buffpek=(BUFFPEK*) queue_top(&queue); |
999 | if (to_file) |
1000 | { |
1001 | if (info->write_key(info,to_file,(uchar*) buffpek->key, |
1002 | sort_length, 1)) |
1003 | { |
1004 | error=1; goto err; /* purecov: inspected */ |
1005 | } |
1006 | } |
1007 | else |
1008 | { |
1009 | if ((*info->key_write)(info,(void*) buffpek->key)) |
1010 | { |
1011 | error=1; goto err; /* purecov: inspected */ |
1012 | } |
1013 | } |
1014 | buffpek->key+=sort_length; |
1015 | if (! --buffpek->mem_count) |
1016 | { |
1017 | /* It's enough to check for killedptr before a slow operation */ |
1018 | if (killed_ptr(info->sort_info->param)) |
1019 | { |
1020 | goto err; |
1021 | } |
1022 | if (!(read_length= info->read_to_buffer(from_file,buffpek,sort_length))) |
1023 | { |
1024 | uchar *base= buffpek->base; |
1025 | ha_keys max_keys=buffpek->max_keys; |
1026 | |
1027 | queue_remove_top(&queue); |
1028 | |
1029 | /* Put room used by buffer to use in other buffer */ |
1030 | for (refpek= (BUFFPEK**) &queue_top(&queue); |
1031 | refpek <= (BUFFPEK**) &queue_end(&queue); |
1032 | refpek++) |
1033 | { |
1034 | buffpek= *refpek; |
1035 | if (buffpek->base+buffpek->max_keys*sort_length == base) |
1036 | { |
1037 | buffpek->max_keys+=max_keys; |
1038 | break; |
1039 | } |
1040 | else if (base+max_keys*sort_length == buffpek->base) |
1041 | { |
1042 | buffpek->base=base; |
1043 | buffpek->max_keys+=max_keys; |
1044 | break; |
1045 | } |
1046 | } |
1047 | break; /* One buffer have been removed */ |
1048 | } |
1049 | else if (read_length == HA_OFFSET_ERROR) |
1050 | goto err; /* purecov: inspected */ |
1051 | } |
1052 | queue_replace_top(&queue); /* Top element has been replaced */ |
1053 | } |
1054 | } |
1055 | buffpek=(BUFFPEK*) queue_top(&queue); |
1056 | buffpek->base= (uchar*) sort_keys; |
1057 | buffpek->max_keys=keys; |
1058 | do |
1059 | { |
1060 | if (to_file) |
1061 | { |
1062 | if (info->write_key(info,to_file,(uchar*) buffpek->key, |
1063 | sort_length,buffpek->mem_count)) |
1064 | { |
1065 | error=1; goto err; /* purecov: inspected */ |
1066 | } |
1067 | } |
1068 | else |
1069 | { |
1070 | register uchar *end; |
1071 | strpos= (uchar*) buffpek->key; |
1072 | for (end=strpos+buffpek->mem_count*sort_length; |
1073 | strpos != end ; |
1074 | strpos+=sort_length) |
1075 | { |
1076 | if ((*info->key_write)(info,(void*) strpos)) |
1077 | { |
1078 | error=1; goto err; /* purecov: inspected */ |
1079 | } |
1080 | } |
1081 | } |
1082 | } |
1083 | while ((read_length= info->read_to_buffer(from_file,buffpek,sort_length)) != HA_OFFSET_ERROR && read_length != 0); |
1084 | if (read_length == 0) |
1085 | error= 0; |
1086 | |
1087 | lastbuff->count=count; |
1088 | if (to_file) |
1089 | lastbuff->file_pos=to_start_filepos; |
1090 | err: |
1091 | delete_queue(&queue); |
1092 | DBUG_RETURN(error); |
1093 | } /* merge_buffers */ |
1094 | |
1095 | |
1096 | /* Do a merge to output-file (save only positions) */ |
1097 | |
1098 | static int |
1099 | merge_index(MI_SORT_PARAM *info, ha_keys keys, uchar **sort_keys, |
1100 | BUFFPEK *buffpek, uint maxbuffer, IO_CACHE *tempfile) |
1101 | { |
1102 | DBUG_ENTER("merge_index" ); |
1103 | if (merge_buffers(info,keys,tempfile,(IO_CACHE*) 0,sort_keys,buffpek,buffpek, |
1104 | buffpek+maxbuffer)) |
1105 | DBUG_RETURN(1); /* purecov: inspected */ |
1106 | DBUG_RETURN(0); |
1107 | } /* merge_index */ |
1108 | |
1109 | |
1110 | static int |
1111 | flush_ft_buf(MI_SORT_PARAM *info) |
1112 | { |
1113 | int err=0; |
1114 | if (info->sort_info->ft_buf) |
1115 | { |
1116 | err=sort_ft_buf_flush(info); |
1117 | my_free(info->sort_info->ft_buf); |
1118 | info->sort_info->ft_buf=0; |
1119 | } |
1120 | return err; |
1121 | } |
1122 | |