1 | /* Copyright (c) 2001, 2010, Oracle and/or its affiliates. |
2 | Copyright (c) 2010, 2015, MariaDB |
3 | |
4 | This program is free software; you can redistribute it and/or modify |
5 | it under the terms of the GNU General Public License as published by |
6 | the Free Software Foundation; version 2 of the License. |
7 | |
8 | This program is distributed in the hope that it will be useful, |
9 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
10 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
11 | GNU General Public License for more details. |
12 | |
13 | You should have received a copy of the GNU General Public License |
14 | along with this program; if not, write to the Free Software |
15 | Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ |
16 | |
17 | /* |
18 | Function to handle quick removal of duplicates |
  This code is used when doing multi-table deletes to find the rows in
  reference tables that need to be deleted.
21 | |
22 | The basic idea is as follows: |
23 | |
24 | Store first all strings in a binary tree, ignoring duplicates. |
25 | When the tree uses more memory than 'max_heap_table_size', |
26 | write the tree (in sorted order) out to disk and start with a new tree. |
27 | When all data has been generated, merge the trees (removing any found |
28 | duplicates). |
29 | |
30 | The unique entries will be returned in sort order, to ensure that we do the |
31 | deletes in disk order. |
32 | */ |
33 | |
34 | #include "mariadb.h" |
35 | #include "sql_priv.h" |
36 | #include "unireg.h" |
37 | #include "sql_sort.h" |
38 | #include "queues.h" // QUEUE |
39 | #include "my_tree.h" // element_count |
40 | #include "uniques.h" // Unique |
41 | #include "sql_sort.h" |
42 | #include "myisamchk.h" // BUFFPEK |
43 | |
44 | int unique_write_to_file(uchar* key, element_count count, Unique *unique) |
45 | { |
46 | /* |
47 | Use unique->size (size of element stored in the tree) and not |
48 | unique->tree.size_of_element. The latter is different from unique->size |
49 | when tree implementation chooses to store pointer to key in TREE_ELEMENT |
50 | (instead of storing the element itself there) |
51 | */ |
52 | return my_b_write(&unique->file, key, unique->size) ? 1 : 0; |
53 | } |
54 | |
55 | int unique_write_to_file_with_count(uchar* key, element_count count, Unique *unique) |
56 | { |
57 | return my_b_write(&unique->file, key, unique->size) || |
58 | my_b_write(&unique->file, (uchar*)&count, sizeof(element_count)) ? 1 : 0; |
59 | } |
60 | |
61 | int unique_write_to_ptrs(uchar* key, element_count count, Unique *unique) |
62 | { |
63 | memcpy(unique->sort.record_pointers, key, unique->size); |
64 | unique->sort.record_pointers+=unique->size; |
65 | return 0; |
66 | } |
67 | |
68 | int unique_intersect_write_to_ptrs(uchar* key, element_count count, Unique *unique) |
69 | { |
70 | if (count >= unique->min_dupl_count) |
71 | { |
72 | memcpy(unique->sort.record_pointers, key, unique->size); |
73 | unique->sort.record_pointers+=unique->size; |
74 | } |
75 | else |
76 | unique->filtered_out_elems++; |
77 | return 0; |
78 | } |
79 | |
80 | |
81 | Unique::Unique(qsort_cmp2 comp_func, void * comp_func_fixed_arg, |
82 | uint size_arg, size_t max_in_memory_size_arg, |
83 | uint min_dupl_count_arg) |
84 | :max_in_memory_size(max_in_memory_size_arg), |
85 | size(size_arg), |
86 | elements(0) |
87 | { |
88 | my_b_clear(&file); |
89 | min_dupl_count= min_dupl_count_arg; |
90 | full_size= size; |
91 | if (min_dupl_count_arg) |
92 | full_size+= sizeof(element_count); |
93 | with_counters= MY_TEST(min_dupl_count_arg); |
94 | init_tree(&tree, (max_in_memory_size / 16), 0, size, comp_func, |
95 | NULL, comp_func_fixed_arg, MYF(MY_THREAD_SPECIFIC)); |
96 | /* If the following fail's the next add will also fail */ |
97 | my_init_dynamic_array(&file_ptrs, sizeof(BUFFPEK), 16, 16, |
98 | MYF(MY_THREAD_SPECIFIC)); |
99 | /* |
100 | If you change the following, change it in get_max_elements function, too. |
101 | */ |
102 | max_elements= (ulong) (max_in_memory_size / |
103 | ALIGN_SIZE(sizeof(TREE_ELEMENT)+size)); |
104 | if (!max_elements) |
105 | max_elements= 1; |
106 | |
107 | (void) open_cached_file(&file, mysql_tmpdir,TEMP_PREFIX, DISK_BUFFER_SIZE, |
108 | MYF(MY_WME)); |
109 | } |
110 | |
111 | |
112 | /* |
113 | Calculate log2(n!) |
114 | |
115 | NOTES |
116 | Stirling's approximate formula is used: |
117 | |
118 | n! ~= sqrt(2*M_PI*n) * (n/M_E)^n |
119 | |
120 | Derivation of formula used for calculations is as follows: |
121 | |
122 | log2(n!) = log(n!)/log(2) = log(sqrt(2*M_PI*n)*(n/M_E)^n) / log(2) = |
123 | |
124 | = (log(2*M_PI*n)/2 + n*log(n/M_E)) / log(2). |
125 | */ |
126 | |
/*
  Approximate log2(x!) with Stirling's formula:

    log2(x!) ~= (log(2*M_PI*x)/2 + x*log(x/M_E)) / log(2)
*/
inline double log2_n_fact(double x)
{
  /* log(sqrt(2*M_PI*x)) */
  const double sqrt_term= log(2*M_PI*x)/2;
  /* log((x/M_E)^x) */
  const double power_term= x*log(x/M_E);
  /* convert natural logarithm to base 2 */
  return (sqrt_term + power_term) / M_LN2;
}
131 | |
132 | |
133 | /* |
134 | Calculate cost of merge_buffers function call for given sequence of |
135 | input stream lengths and store the number of rows in result stream in *last. |
136 | |
  SYNOPSIS
    get_merge_buffers_cost()
      buff_elems      Array of #s of elements in buffers (the merged range
                      itself is given by first/last)
      elem_size       Size of element stored in buffer
      first           Pointer to first merged element size
      last            Pointer to last merged element size
      compare_factor  Divisor that converts the number of comparisons into
                      cost units
143 | |
144 | RETURN |
145 | Cost of merge_buffers operation in disk seeks. |
146 | |
147 | NOTES |
148 | It is assumed that no rows are eliminated during merge. |
149 | The cost is calculated as |
150 | |
151 | cost(read_and_write) + cost(merge_comparisons). |
152 | |
    All bytes in the sequences are read and written back during merge so cost
    of disk io is 2*elem_size*total_buf_elems/IO_SIZE (2 is for read + write)

    For comparisons cost calculations we assume that all merged sequences have
    the same length, so each of total_buf_elems elements will be added to a sort
    heap with (n_buffers-1) elements. This gives the comparison cost:

      total_buf_elems * log2(n_buffers) / compare_factor;
161 | */ |
162 | |
163 | static double get_merge_buffers_cost(uint *buff_elems, uint elem_size, |
164 | uint *first, uint *last, |
165 | uint compare_factor) |
166 | { |
167 | uint total_buf_elems= 0; |
168 | for (uint *pbuf= first; pbuf <= last; pbuf++) |
169 | total_buf_elems+= *pbuf; |
170 | *last= total_buf_elems; |
171 | |
172 | size_t n_buffers= last - first + 1; |
173 | |
174 | /* Using log2(n)=log(n)/log(2) formula */ |
175 | return 2*((double)total_buf_elems*elem_size) / IO_SIZE + |
176 | total_buf_elems*log((double) n_buffers) / (compare_factor * M_LN2); |
177 | } |
178 | |
179 | |
180 | /* |
181 | Calculate cost of merging buffers into one in Unique::get, i.e. calculate |
182 | how long (in terms of disk seeks) the two calls |
183 | merge_many_buffs(...); |
184 | merge_buffers(...); |
185 | will take. |
186 | |
  SYNOPSIS
    get_merge_many_buffs_cost()
      buffer          buffer space for temporary data, at least
                      Unique::get_cost_calc_buff_size bytes
      maxbuffer       # of full buffers
      max_n_elems     # of elements in first maxbuffer buffers
      last_n_elems    # of elements in last buffer
      elem_size       size of buffer element
      compare_factor  passed on to get_merge_buffers_cost() to convert the
                      number of comparisons into cost units
195 | |
196 | NOTES |
197 | maxbuffer+1 buffers are merged, where first maxbuffer buffers contain |
198 | max_n_elems elements each and last buffer contains last_n_elems elements. |
199 | |
200 | The current implementation does a dumb simulation of merge_many_buffs |
201 | function actions. |
202 | |
203 | RETURN |
204 | Cost of merge in disk seeks. |
205 | */ |
206 | |
207 | static double get_merge_many_buffs_cost(uint *buffer, |
208 | uint maxbuffer, uint max_n_elems, |
209 | uint last_n_elems, int elem_size, |
210 | uint compare_factor) |
211 | { |
212 | int i; |
213 | double total_cost= 0.0; |
214 | uint *buff_elems= buffer; /* #s of elements in each of merged sequences */ |
215 | |
216 | /* |
217 | Set initial state: first maxbuffer sequences contain max_n_elems elements |
218 | each, last sequence contains last_n_elems elements. |
219 | */ |
220 | for (i = 0; i < (int)maxbuffer; i++) |
221 | buff_elems[i]= max_n_elems; |
222 | buff_elems[maxbuffer]= last_n_elems; |
223 | |
224 | /* |
225 | Do it exactly as merge_many_buff function does, calling |
226 | get_merge_buffers_cost to get cost of merge_buffers. |
227 | */ |
228 | if (maxbuffer >= MERGEBUFF2) |
229 | { |
230 | while (maxbuffer >= MERGEBUFF2) |
231 | { |
232 | uint lastbuff= 0; |
233 | for (i = 0; i <= (int) maxbuffer - MERGEBUFF*3/2; i += MERGEBUFF) |
234 | { |
235 | total_cost+=get_merge_buffers_cost(buff_elems, elem_size, |
236 | buff_elems + i, |
237 | buff_elems + i + MERGEBUFF-1, |
238 | compare_factor); |
239 | lastbuff++; |
240 | } |
241 | total_cost+=get_merge_buffers_cost(buff_elems, elem_size, |
242 | buff_elems + i, |
243 | buff_elems + maxbuffer, |
244 | compare_factor); |
245 | maxbuffer= lastbuff; |
246 | } |
247 | } |
248 | |
249 | /* Simulate final merge_buff call. */ |
250 | total_cost += get_merge_buffers_cost(buff_elems, elem_size, |
251 | buff_elems, buff_elems + maxbuffer, |
252 | compare_factor); |
253 | return total_cost; |
254 | } |
255 | |
256 | |
257 | /* |
258 | Calculate cost of using Unique for processing nkeys elements of size |
259 | key_size using max_in_memory_size memory. |
260 | |
  SYNOPSIS
    Unique::get_use_cost()
      buffer    space for temporary data, use Unique::get_cost_calc_buff_size
                to get # bytes needed.
      nkeys     #of elements in Unique
      key_size  size of each elements in bytes
      max_in_memory_size  amount of memory Unique will be allowed to use
      compare_factor      used to calculate cost of one comparison
      intersect_fl        if true, the merge cost is calculated for elements
                          that are sizeof(element_count) bytes longer
      in_memory           OUT (may be NULL) true if all elements fit into one
                          tree, i.e. disk is not used
272 | |
273 | RETURN |
274 | Cost in disk seeks. |
275 | |
276 | NOTES |
    cost(using_unique) =
278 | cost(create_trees) + (see #1) |
279 | cost(merge) + (see #2) |
280 | cost(read_result) (see #3) |
281 | |
282 | 1. Cost of trees creation |
      For each Unique::put operation there will be 2*log2(n+1) elements
      comparisons, where n runs from 1 to tree_size (we assume that all added
      elements are different). Together this gives:

      n_compares = 2*(log2(2) + log2(3) + ... + log2(N+1)) = 2*log2((N+1)!)

      then cost(tree_creation) = n_compares / compare_factor;
290 | |
291 | Total cost of creating trees: |
292 | (n_trees - 1)*max_size_tree_cost + non_max_size_tree_cost. |
293 | |
294 | Approximate value of log2(N!) is calculated by log2_n_fact function. |
295 | |
296 | 2. Cost of merging. |
297 | If only one tree is created by Unique no merging will be necessary. |
      Otherwise, we model execution of merge_many_buff function and count
      #of merges. (The reason behind this is that number of buffers is small,
      while size of buffers is big and we don't want to lose precision with
      O(x)-style formula)
302 | |
    3. If only one tree is created by Unique no disk io will happen.
      Otherwise, ceil(key_size*nkeys/IO_SIZE) disk seeks are necessary. We
      assume these will be random seeks.
306 | */ |
307 | |
308 | double Unique::get_use_cost(uint *buffer, size_t nkeys, uint key_size, |
309 | size_t max_in_memory_size, |
310 | uint compare_factor, |
311 | bool intersect_fl, bool *in_memory) |
312 | { |
313 | size_t max_elements_in_tree; |
314 | size_t last_tree_elems; |
315 | size_t n_full_trees; /* number of trees in unique - 1 */ |
316 | double result; |
317 | |
318 | max_elements_in_tree= ((size_t) max_in_memory_size / |
319 | ALIGN_SIZE(sizeof(TREE_ELEMENT)+key_size)); |
320 | |
321 | n_full_trees= nkeys / max_elements_in_tree; |
322 | last_tree_elems= nkeys % max_elements_in_tree; |
323 | |
324 | /* Calculate cost of creating trees */ |
325 | result= 2*log2_n_fact(last_tree_elems + 1.0); |
326 | if (n_full_trees) |
327 | result+= n_full_trees * log2_n_fact(max_elements_in_tree + 1.0); |
328 | result /= compare_factor; |
329 | |
330 | DBUG_PRINT("info" ,("unique trees sizes: %u=%u*%u + %u" , (uint)nkeys, |
331 | (uint)n_full_trees, |
332 | (uint)(n_full_trees?max_elements_in_tree:0), |
333 | (uint)last_tree_elems)); |
334 | |
335 | if (in_memory) |
336 | *in_memory= !n_full_trees; |
337 | |
338 | if (!n_full_trees) |
339 | return result; |
340 | |
341 | /* |
342 | There is more then one tree and merging is necessary. |
343 | First, add cost of writing all trees to disk, assuming that all disk |
344 | writes are sequential. |
345 | */ |
346 | result += DISK_SEEK_BASE_COST * n_full_trees * |
347 | ceil(((double) key_size)*max_elements_in_tree / IO_SIZE); |
348 | result += DISK_SEEK_BASE_COST * ceil(((double) key_size)*last_tree_elems / IO_SIZE); |
349 | |
350 | /* Cost of merge */ |
351 | if (intersect_fl) |
352 | key_size+= sizeof(element_count); |
353 | double merge_cost= get_merge_many_buffs_cost(buffer, (uint)n_full_trees, |
354 | (uint)max_elements_in_tree, |
355 | (uint)last_tree_elems, key_size, |
356 | compare_factor); |
357 | result += merge_cost; |
358 | /* |
359 | Add cost of reading the resulting sequence, assuming there were no |
360 | duplicate elements. |
361 | */ |
362 | result += ceil((double)key_size*nkeys/IO_SIZE); |
363 | |
364 | return result; |
365 | } |
366 | |
367 | Unique::~Unique() |
368 | { |
369 | close_cached_file(&file); |
370 | delete_tree(&tree, 0); |
371 | delete_dynamic(&file_ptrs); |
372 | } |
373 | |
374 | |
375 | /* Write tree to disk; clear tree */ |
376 | bool Unique::flush() |
377 | { |
378 | BUFFPEK file_ptr; |
379 | elements+= tree.elements_in_tree; |
380 | file_ptr.count=tree.elements_in_tree; |
381 | file_ptr.file_pos=my_b_tell(&file); |
382 | |
383 | tree_walk_action action= min_dupl_count ? |
384 | (tree_walk_action) unique_write_to_file_with_count : |
385 | (tree_walk_action) unique_write_to_file; |
386 | if (tree_walk(&tree, action, |
387 | (void*) this, left_root_right) || |
388 | insert_dynamic(&file_ptrs, (uchar*) &file_ptr)) |
389 | return 1; |
390 | delete_tree(&tree, 0); |
391 | return 0; |
392 | } |
393 | |
394 | |
395 | /* |
396 | Clear the tree and the file. |
397 | You must call reset() if you want to reuse Unique after walk(). |
398 | */ |
399 | |
400 | void |
401 | Unique::reset() |
402 | { |
403 | reset_tree(&tree); |
404 | /* |
405 | If elements != 0, some trees were stored in the file (see how |
406 | flush() works). Note, that we can not count on my_b_tell(&file) == 0 |
407 | here, because it can return 0 right after walk(), and walk() does not |
408 | reset any Unique member. |
409 | */ |
410 | if (elements) |
411 | { |
412 | reset_dynamic(&file_ptrs); |
413 | reinit_io_cache(&file, WRITE_CACHE, 0L, 0, 1); |
414 | } |
415 | my_free(sort.record_pointers); |
416 | elements= 0; |
417 | tree.flag= 0; |
418 | sort.record_pointers= 0; |
419 | } |
420 | |
/*
  The comparison function, passed to init_queue() in merge_walk() and in
  merge_buffers() when the latter is called from Unique::get() must
  use comparison function of Unique::tree, but compare members of struct
  BUFFPEK.
*/
427 | |
428 | C_MODE_START |
429 | |
430 | static int buffpek_compare(void *arg, uchar *key_ptr1, uchar *key_ptr2) |
431 | { |
432 | BUFFPEK_COMPARE_CONTEXT *ctx= (BUFFPEK_COMPARE_CONTEXT *) arg; |
433 | return ctx->key_compare(ctx->key_compare_arg, |
434 | *((uchar **) key_ptr1), *((uchar **)key_ptr2)); |
435 | } |
436 | |
437 | C_MODE_END |
438 | |
439 | |
440 | inline |
441 | element_count get_counter_from_merged_element(void *ptr, uint ofs) |
442 | { |
443 | element_count cnt; |
444 | memcpy((uchar *) &cnt, (uchar *) ptr + ofs, sizeof(element_count)); |
445 | return cnt; |
446 | } |
447 | |
448 | |
449 | inline |
450 | void put_counter_into_merged_element(void *ptr, uint ofs, element_count cnt) |
451 | { |
452 | memcpy((uchar *) ptr + ofs, (uchar *) &cnt, sizeof(element_count)); |
453 | } |
454 | |
455 | |
456 | /* |
457 | DESCRIPTION |
458 | |
459 | Function is very similar to merge_buffers, but instead of writing sorted |
460 | unique keys to the output file, it invokes walk_action for each key. |
461 | This saves I/O if you need to pass through all unique keys only once. |
462 | |
463 | SYNOPSIS |
464 | merge_walk() |
465 | All params are 'IN' (but see comment for begin, end): |
466 | merge_buffer buffer to perform cached piece-by-piece loading |
467 | of trees; initially the buffer is empty |
      merge_buffer_size  size of merge_buffer. Must be aligned with
                         key_length
      key_length         size of tree element; key_length * (end - begin + 1)
                         must be less than or equal to merge_buffer_size.
472 | begin pointer to BUFFPEK struct for the first tree. |
473 | end pointer to BUFFPEK struct for the last tree; |
474 | end > begin and [begin, end) form a consecutive |
475 | range. BUFFPEKs structs in that range are used and |
476 | overwritten in merge_walk(). |
477 | walk_action element visitor. Action is called for each unique |
478 | key. |
479 | walk_action_arg argument to walk action. Passed to it on each call. |
480 | compare elements comparison function |
481 | compare_arg comparison function argument |
482 | file file with all trees dumped. Trees in the file |
483 | must contain sorted unique values. Cache must be |
484 | initialized in read mode. |
      with_counters      take into account counters for equal merged
                         elements
487 | RETURN VALUE |
488 | 0 ok |
489 | <> 0 error |
490 | */ |
491 | |
492 | static bool merge_walk(uchar *merge_buffer, size_t merge_buffer_size, |
493 | uint key_length, BUFFPEK *begin, BUFFPEK *end, |
494 | tree_walk_action walk_action, void *walk_action_arg, |
495 | qsort_cmp2 compare, void *compare_arg, |
496 | IO_CACHE *file, bool with_counters) |
497 | { |
498 | BUFFPEK_COMPARE_CONTEXT compare_context = { compare, compare_arg }; |
499 | QUEUE queue; |
500 | if (end <= begin || |
501 | merge_buffer_size < (size_t) (key_length * (end - begin + 1)) || |
502 | init_queue(&queue, (uint) (end - begin), offsetof(BUFFPEK, key), 0, |
503 | buffpek_compare, &compare_context, 0, 0)) |
504 | return 1; |
505 | /* we need space for one key when a piece of merge buffer is re-read */ |
506 | merge_buffer_size-= key_length; |
507 | uchar *save_key_buff= merge_buffer + merge_buffer_size; |
508 | uint max_key_count_per_piece= (uint) (merge_buffer_size/(end-begin) / |
509 | key_length); |
510 | /* if piece_size is aligned reuse_freed_buffer will always hit */ |
511 | uint piece_size= max_key_count_per_piece * key_length; |
512 | ulong bytes_read; /* to hold return value of read_to_buffer */ |
513 | BUFFPEK *top; |
514 | int res= 1; |
515 | uint cnt_ofs= key_length - (with_counters ? sizeof(element_count) : 0); |
516 | element_count cnt; |
517 | /* |
518 | Invariant: queue must contain top element from each tree, until a tree |
519 | is not completely walked through. |
520 | Here we're forcing the invariant, inserting one element from each tree |
521 | to the queue. |
522 | */ |
523 | for (top= begin; top != end; ++top) |
524 | { |
525 | top->base= merge_buffer + (top - begin) * piece_size; |
526 | top->max_keys= max_key_count_per_piece; |
527 | bytes_read= read_to_buffer(file, top, key_length); |
528 | if (unlikely(bytes_read == (ulong) -1)) |
529 | goto end; |
530 | DBUG_ASSERT(bytes_read); |
531 | queue_insert(&queue, (uchar *) top); |
532 | } |
533 | top= (BUFFPEK *) queue_top(&queue); |
534 | while (queue.elements > 1) |
535 | { |
536 | /* |
537 | Every iteration one element is removed from the queue, and one is |
538 | inserted by the rules of the invariant. If two adjacent elements on |
539 | the top of the queue are not equal, biggest one is unique, because all |
540 | elements in each tree are unique. Action is applied only to unique |
541 | elements. |
542 | */ |
543 | void *old_key= top->key; |
544 | /* |
545 | read next key from the cache or from the file and push it to the |
546 | queue; this gives new top. |
547 | */ |
548 | top->key+= key_length; |
549 | if (--top->mem_count) |
550 | queue_replace_top(&queue); |
551 | else /* next piece should be read */ |
552 | { |
553 | /* save old_key not to overwrite it in read_to_buffer */ |
554 | memcpy(save_key_buff, old_key, key_length); |
555 | old_key= save_key_buff; |
556 | bytes_read= read_to_buffer(file, top, key_length); |
557 | if (unlikely(bytes_read == (ulong) -1)) |
558 | goto end; |
559 | else if (bytes_read) /* top->key, top->mem_count are reset */ |
560 | queue_replace_top(&queue); /* in read_to_buffer */ |
561 | else |
562 | { |
563 | /* |
564 | Tree for old 'top' element is empty: remove it from the queue and |
565 | give all its memory to the nearest tree. |
566 | */ |
567 | queue_remove_top(&queue); |
568 | reuse_freed_buff(&queue, top, key_length); |
569 | } |
570 | } |
571 | top= (BUFFPEK *) queue_top(&queue); |
572 | /* new top has been obtained; if old top is unique, apply the action */ |
573 | if (compare(compare_arg, old_key, top->key)) |
574 | { |
575 | cnt= with_counters ? |
576 | get_counter_from_merged_element(old_key, cnt_ofs) : 1; |
577 | if (walk_action(old_key, cnt, walk_action_arg)) |
578 | goto end; |
579 | } |
580 | else if (with_counters) |
581 | { |
582 | cnt= get_counter_from_merged_element(top->key, cnt_ofs); |
583 | cnt+= get_counter_from_merged_element(old_key, cnt_ofs); |
584 | put_counter_into_merged_element(top->key, cnt_ofs, cnt); |
585 | } |
586 | } |
587 | /* |
588 | Applying walk_action to the tail of the last tree: this is safe because |
589 | either we had only one tree in the beginning, either we work with the |
590 | last tree in the queue. |
591 | */ |
592 | do |
593 | { |
594 | do |
595 | { |
596 | |
597 | cnt= with_counters ? |
598 | get_counter_from_merged_element(top->key, cnt_ofs) : 1; |
599 | if (walk_action(top->key, cnt, walk_action_arg)) |
600 | goto end; |
601 | top->key+= key_length; |
602 | } |
603 | while (--top->mem_count); |
604 | bytes_read= read_to_buffer(file, top, key_length); |
605 | if (unlikely(bytes_read == (ulong) -1)) |
606 | goto end; |
607 | } |
608 | while (bytes_read); |
609 | res= 0; |
610 | end: |
611 | delete_queue(&queue); |
612 | return res; |
613 | } |
614 | |
615 | |
616 | /* |
617 | DESCRIPTION |
618 | Walks consecutively through all unique elements: |
619 | if all elements are in memory, then it simply invokes 'tree_walk', else |
620 | all flushed trees are loaded to memory piece-by-piece, pieces are |
621 | sorted, and action is called for each unique value. |
    Note: since merging resets file_ptrs state, this method can change
    internal Unique state to undefined: if you want to reuse Unique after
    walk() you must call reset() first!
  SYNOPSIS
    Unique::walk()
627 | All params are 'IN': |
628 | table parameter for the call of the merge method |
629 | action function-visitor, typed in include/my_tree.h |
630 | function is called for each unique element |
631 | arg argument for visitor, which is passed to it on each call |
632 | RETURN VALUE |
633 | 0 OK |
634 | <> 0 error |
635 | */ |
636 | |
637 | bool Unique::walk(TABLE *table, tree_walk_action action, void *walk_action_arg) |
638 | { |
639 | int res= 0; |
640 | uchar *merge_buffer; |
641 | |
642 | if (elements == 0) /* the whole tree is in memory */ |
643 | return tree_walk(&tree, action, walk_action_arg, left_root_right); |
644 | |
645 | sort.return_rows= elements+tree.elements_in_tree; |
646 | /* flush current tree to the file to have some memory for merge buffer */ |
647 | if (flush()) |
648 | return 1; |
649 | if (flush_io_cache(&file) || reinit_io_cache(&file, READ_CACHE, 0L, 0, 0)) |
650 | return 1; |
651 | /* |
652 | merge_buffer must fit at least MERGEBUFF2 + 1 keys, because |
653 | merge_index() can merge that many BUFFPEKs at once. The extra space for one key |
654 | is needed when a piece of merge buffer is re-read, see merge_walk() |
655 | */ |
656 | size_t buff_sz= MY_MAX(MERGEBUFF2+1, max_in_memory_size/full_size+1) * full_size; |
657 | if (!(merge_buffer = (uchar *)my_malloc(buff_sz, MYF(MY_WME)))) |
658 | return 1; |
659 | if (buff_sz < full_size * (file_ptrs.elements + 1UL)) |
660 | res= merge(table, merge_buffer, buff_sz >= full_size * MERGEBUFF2) ; |
661 | |
662 | if (!res) |
663 | { |
664 | res= merge_walk(merge_buffer, buff_sz, full_size, |
665 | (BUFFPEK *) file_ptrs.buffer, |
666 | (BUFFPEK *) file_ptrs.buffer + file_ptrs.elements, |
667 | action, walk_action_arg, |
668 | tree.compare, tree.custom_arg, &file, with_counters); |
669 | } |
670 | my_free(merge_buffer); |
671 | return res; |
672 | } |
673 | |
674 | |
675 | /* |
676 | DESCRIPTION |
677 | |
678 | Perform multi-pass sort merge of the elements using the buffer buff as |
679 | the merge buffer. The last pass is not performed if without_last_merge is |
680 | TRUE. |
681 | |
682 | SYNOPSIS |
    Unique::merge()
684 | All params are 'IN': |
685 | table the parameter to access sort context |
686 | buff merge buffer |
687 | without_last_merge TRUE <=> do not perform the last merge |
688 | RETURN VALUE |
689 | 0 OK |
690 | <> 0 error |
691 | */ |
692 | |
693 | bool Unique::merge(TABLE *table, uchar *buff, bool without_last_merge) |
694 | { |
695 | IO_CACHE *outfile= &sort.io_cache; |
696 | BUFFPEK *file_ptr= (BUFFPEK*) file_ptrs.buffer; |
697 | uint maxbuffer= file_ptrs.elements - 1; |
698 | my_off_t save_pos; |
699 | bool error= 1; |
700 | Sort_param sort_param; |
701 | |
702 | /* Open cached file for table records if it isn't open */ |
703 | if (! my_b_inited(outfile) && |
704 | open_cached_file(outfile,mysql_tmpdir,TEMP_PREFIX,READ_RECORD_BUFFER, |
705 | MYF(MY_WME))) |
706 | return 1; |
707 | |
708 | bzero((char*) &sort_param,sizeof(sort_param)); |
709 | sort_param.max_rows= elements; |
710 | sort_param.sort_form= table; |
711 | sort_param.rec_length= sort_param.sort_length= sort_param.ref_length= |
712 | full_size; |
713 | sort_param.min_dupl_count= min_dupl_count; |
714 | sort_param.res_length= 0; |
715 | sort_param.max_keys_per_buffer= |
716 | (uint) MY_MAX((max_in_memory_size / sort_param.sort_length), MERGEBUFF2); |
717 | sort_param.not_killable= 1; |
718 | |
719 | sort_param.unique_buff= buff +(sort_param.max_keys_per_buffer * |
720 | sort_param.sort_length); |
721 | |
722 | sort_param.compare= (qsort2_cmp) buffpek_compare; |
723 | sort_param.cmp_context.key_compare= tree.compare; |
724 | sort_param.cmp_context.key_compare_arg= tree.custom_arg; |
725 | |
726 | /* Merge the buffers to one file, removing duplicates */ |
727 | if (merge_many_buff(&sort_param,buff,file_ptr,&maxbuffer,&file)) |
728 | goto err; |
729 | if (flush_io_cache(&file) || |
730 | reinit_io_cache(&file,READ_CACHE,0L,0,0)) |
731 | goto err; |
732 | sort_param.res_length= sort_param.rec_length- |
733 | (min_dupl_count ? sizeof(min_dupl_count) : 0); |
734 | if (without_last_merge) |
735 | { |
736 | file_ptrs.elements= maxbuffer+1; |
737 | return 0; |
738 | } |
739 | if (merge_index(&sort_param, buff, file_ptr, maxbuffer, &file, outfile)) |
740 | goto err; |
741 | error= 0; |
742 | err: |
743 | if (flush_io_cache(outfile)) |
744 | error= 1; |
745 | |
746 | /* Setup io_cache for reading */ |
747 | save_pos= outfile->pos_in_file; |
748 | if (reinit_io_cache(outfile,READ_CACHE,0L,0,0)) |
749 | error= 1; |
750 | outfile->end_of_file=save_pos; |
751 | return error; |
752 | } |
753 | |
754 | |
755 | /* |
756 | Allocate memory that can be used with init_records() so that |
757 | rows will be read in priority order. |
758 | */ |
759 | |
760 | bool Unique::get(TABLE *table) |
761 | { |
762 | bool rc= 1; |
763 | uchar *sort_buffer= NULL; |
764 | sort.return_rows= elements+tree.elements_in_tree; |
765 | DBUG_ENTER("Unique::get" ); |
766 | |
767 | if (my_b_tell(&file) == 0) |
768 | { |
769 | /* Whole tree is in memory; Don't use disk if you don't need to */ |
770 | if ((sort.record_pointers= (uchar*) |
771 | my_malloc(size * tree.elements_in_tree, MYF(MY_THREAD_SPECIFIC)))) |
772 | { |
773 | uchar *save_record_pointers= sort.record_pointers; |
774 | tree_walk_action action= min_dupl_count ? |
775 | (tree_walk_action) unique_intersect_write_to_ptrs : |
776 | (tree_walk_action) unique_write_to_ptrs; |
777 | filtered_out_elems= 0; |
778 | (void) tree_walk(&tree, action, |
779 | this, left_root_right); |
780 | /* Restore record_pointers that was changed in by 'action' above */ |
781 | sort.record_pointers= save_record_pointers; |
782 | sort.return_rows-= filtered_out_elems; |
783 | DBUG_RETURN(0); |
784 | } |
785 | } |
786 | /* Not enough memory; Save the result to file && free memory used by tree */ |
787 | if (flush()) |
788 | DBUG_RETURN(1); |
789 | size_t buff_sz= (max_in_memory_size / full_size + 1) * full_size; |
790 | if (!(sort_buffer= (uchar*) my_malloc(buff_sz, |
791 | MYF(MY_THREAD_SPECIFIC|MY_WME)))) |
792 | DBUG_RETURN(1); |
793 | |
794 | if (merge(table, sort_buffer, FALSE)) |
795 | goto err; |
796 | rc= 0; |
797 | |
798 | err: |
799 | my_free(sort_buffer); |
800 | DBUG_RETURN(rc); |
801 | } |
802 | |