| 1 | /* Copyright (c) 2001, 2010, Oracle and/or its affiliates. |
| 2 | Copyright (c) 2010, 2015, MariaDB |
| 3 | |
| 4 | This program is free software; you can redistribute it and/or modify |
| 5 | it under the terms of the GNU General Public License as published by |
| 6 | the Free Software Foundation; version 2 of the License. |
| 7 | |
| 8 | This program is distributed in the hope that it will be useful, |
| 9 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 10 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 11 | GNU General Public License for more details. |
| 12 | |
| 13 | You should have received a copy of the GNU General Public License |
| 14 | along with this program; if not, write to the Free Software |
| 15 | Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ |
| 16 | |
| 17 | /* |
| 18 | Function to handle quick removal of duplicates |
| 19 | This code is used when doing multi-table deletes to find the rows in |
| 20 | reference tables that need to be deleted. |
| 21 | |
| 22 | The basic idea is as follows: |
| 23 | |
| 24 | Store first all strings in a binary tree, ignoring duplicates. |
| 25 | When the tree uses more memory than 'max_heap_table_size', |
| 26 | write the tree (in sorted order) out to disk and start with a new tree. |
| 27 | When all data has been generated, merge the trees (removing any found |
| 28 | duplicates). |
| 29 | |
| 30 | The unique entries will be returned in sort order, to ensure that we do the |
| 31 | deletes in disk order. |
| 32 | */ |
| 33 | |
| 34 | #include "mariadb.h" |
| 35 | #include "sql_priv.h" |
| 36 | #include "unireg.h" |
| 37 | #include "sql_sort.h" |
| 38 | #include "queues.h" // QUEUE |
| 39 | #include "my_tree.h" // element_count |
| 40 | #include "uniques.h" // Unique |
| 41 | #include "sql_sort.h" |
| 42 | #include "myisamchk.h" // BUFFPEK |
| 43 | |
| 44 | int unique_write_to_file(uchar* key, element_count count, Unique *unique) |
| 45 | { |
| 46 | /* |
| 47 | Use unique->size (size of element stored in the tree) and not |
| 48 | unique->tree.size_of_element. The latter is different from unique->size |
| 49 | when tree implementation chooses to store pointer to key in TREE_ELEMENT |
| 50 | (instead of storing the element itself there) |
| 51 | */ |
| 52 | return my_b_write(&unique->file, key, unique->size) ? 1 : 0; |
| 53 | } |
| 54 | |
| 55 | int unique_write_to_file_with_count(uchar* key, element_count count, Unique *unique) |
| 56 | { |
| 57 | return my_b_write(&unique->file, key, unique->size) || |
| 58 | my_b_write(&unique->file, (uchar*)&count, sizeof(element_count)) ? 1 : 0; |
| 59 | } |
| 60 | |
| 61 | int unique_write_to_ptrs(uchar* key, element_count count, Unique *unique) |
| 62 | { |
| 63 | memcpy(unique->sort.record_pointers, key, unique->size); |
| 64 | unique->sort.record_pointers+=unique->size; |
| 65 | return 0; |
| 66 | } |
| 67 | |
| 68 | int unique_intersect_write_to_ptrs(uchar* key, element_count count, Unique *unique) |
| 69 | { |
| 70 | if (count >= unique->min_dupl_count) |
| 71 | { |
| 72 | memcpy(unique->sort.record_pointers, key, unique->size); |
| 73 | unique->sort.record_pointers+=unique->size; |
| 74 | } |
| 75 | else |
| 76 | unique->filtered_out_elems++; |
| 77 | return 0; |
| 78 | } |
| 79 | |
| 80 | |
/*
  Set up an empty Unique: an in-memory tree for the current batch of
  elements, a dynamic array of BUFFPEKs describing trees flushed to disk,
  and the temporary file those trees are written to.

  comp_func / comp_func_fixed_arg   element comparison function and its arg
  size_arg                          size in bytes of one element
  max_in_memory_size_arg            memory budget for the in-memory tree
  min_dupl_count_arg                if non-zero, count duplicates; elements
                                    written to disk then carry a counter
*/
Unique::Unique(qsort_cmp2 comp_func, void * comp_func_fixed_arg,
               uint size_arg, size_t max_in_memory_size_arg,
               uint min_dupl_count_arg)
  :max_in_memory_size(max_in_memory_size_arg),
   size(size_arg),
   elements(0)
{
  my_b_clear(&file);
  min_dupl_count= min_dupl_count_arg;
  full_size= size;
  /* With duplicate counting, each on-disk element also stores its counter */
  if (min_dupl_count_arg)
    full_size+= sizeof(element_count);
  with_counters= MY_TEST(min_dupl_count_arg);
  init_tree(&tree, (max_in_memory_size / 16), 0, size, comp_func,
            NULL, comp_func_fixed_arg, MYF(MY_THREAD_SPECIFIC));
  /* If the following fails, the next add will also fail */
  my_init_dynamic_array(&file_ptrs, sizeof(BUFFPEK), 16, 16,
                        MYF(MY_THREAD_SPECIFIC));
  /*
    If you change the following, change it in get_max_elements function, too.
  */
  max_elements= (ulong) (max_in_memory_size /
                         ALIGN_SIZE(sizeof(TREE_ELEMENT)+size));
  if (!max_elements)
    max_elements= 1;

  (void) open_cached_file(&file, mysql_tmpdir,TEMP_PREFIX, DISK_BUFFER_SIZE,
                          MYF(MY_WME));
}
| 110 | |
| 111 | |
| 112 | /* |
| 113 | Calculate log2(n!) |
| 114 | |
| 115 | NOTES |
| 116 | Stirling's approximate formula is used: |
| 117 | |
| 118 | n! ~= sqrt(2*M_PI*n) * (n/M_E)^n |
| 119 | |
| 120 | Derivation of formula used for calculations is as follows: |
| 121 | |
| 122 | log2(n!) = log(n!)/log(2) = log(sqrt(2*M_PI*n)*(n/M_E)^n) / log(2) = |
| 123 | |
| 124 | = (log(2*M_PI*n)/2 + n*log(n/M_E)) / log(2). |
| 125 | */ |
| 126 | |
| 127 | inline double log2_n_fact(double x) |
| 128 | { |
| 129 | return (log(2*M_PI*x)/2 + x*log(x/M_E)) / M_LN2; |
| 130 | } |
| 131 | |
| 132 | |
| 133 | /* |
| 134 | Calculate cost of merge_buffers function call for given sequence of |
| 135 | input stream lengths and store the number of rows in result stream in *last. |
| 136 | |
| 137 | SYNOPSIS |
| 138 | get_merge_buffers_cost() |
| 139 | buff_elems Array of #s of elements in buffers |
| 140 | elem_size Size of element stored in buffer |
| 141 | first Pointer to first merged element size |
| 142 | last Pointer to last merged element size |
| 143 | |
| 144 | RETURN |
| 145 | Cost of merge_buffers operation in disk seeks. |
| 146 | |
| 147 | NOTES |
| 148 | It is assumed that no rows are eliminated during merge. |
| 149 | The cost is calculated as |
| 150 | |
| 151 | cost(read_and_write) + cost(merge_comparisons). |
| 152 | |
| 153 | All bytes in the sequences is read and written back during merge so cost |
| 154 | of disk io is 2*elem_size*total_buf_elems/IO_SIZE (2 is for read + write) |
| 155 | |
| 156 | For comparisons cost calculations we assume that all merged sequences have |
| 157 | the same length, so each of total_buf_size elements will be added to a sort |
| 158 | heap with (n_buffers-1) elements. This gives the comparison cost: |
| 159 | |
| 160 | total_buf_elems* log2(n_buffers) / TIME_FOR_COMPARE_ROWID; |
| 161 | */ |
| 162 | |
| 163 | static double get_merge_buffers_cost(uint *buff_elems, uint elem_size, |
| 164 | uint *first, uint *last, |
| 165 | uint compare_factor) |
| 166 | { |
| 167 | uint total_buf_elems= 0; |
| 168 | for (uint *pbuf= first; pbuf <= last; pbuf++) |
| 169 | total_buf_elems+= *pbuf; |
| 170 | *last= total_buf_elems; |
| 171 | |
| 172 | size_t n_buffers= last - first + 1; |
| 173 | |
| 174 | /* Using log2(n)=log(n)/log(2) formula */ |
| 175 | return 2*((double)total_buf_elems*elem_size) / IO_SIZE + |
| 176 | total_buf_elems*log((double) n_buffers) / (compare_factor * M_LN2); |
| 177 | } |
| 178 | |
| 179 | |
| 180 | /* |
| 181 | Calculate cost of merging buffers into one in Unique::get, i.e. calculate |
| 182 | how long (in terms of disk seeks) the two calls |
| 183 | merge_many_buffs(...); |
| 184 | merge_buffers(...); |
| 185 | will take. |
| 186 | |
| 187 | SYNOPSIS |
| 188 | get_merge_many_buffs_cost() |
| 189 | buffer buffer space for temporary data, at least |
| 190 | Unique::get_cost_calc_buff_size bytes |
| 191 | maxbuffer # of full buffers |
| 192 | max_n_elems # of elements in first maxbuffer buffers |
| 193 | last_n_elems # of elements in last buffer |
| 194 | elem_size size of buffer element |
| 195 | |
| 196 | NOTES |
| 197 | maxbuffer+1 buffers are merged, where first maxbuffer buffers contain |
| 198 | max_n_elems elements each and last buffer contains last_n_elems elements. |
| 199 | |
| 200 | The current implementation does a dumb simulation of merge_many_buffs |
| 201 | function actions. |
| 202 | |
| 203 | RETURN |
| 204 | Cost of merge in disk seeks. |
| 205 | */ |
| 206 | |
static double get_merge_many_buffs_cost(uint *buffer,
                                        uint maxbuffer, uint max_n_elems,
                                        uint last_n_elems, int elem_size,
                                        uint compare_factor)
{
  int i;
  double total_cost= 0.0;
  uint *buff_elems= buffer; /* #s of elements in each of merged sequences */

  /*
    Set initial state: first maxbuffer sequences contain max_n_elems elements
    each, last sequence contains last_n_elems elements.
  */
  for (i = 0; i < (int)maxbuffer; i++)
    buff_elems[i]= max_n_elems;
  buff_elems[maxbuffer]= last_n_elems;

  /*
    Do it exactly as merge_many_buff function does, calling
    get_merge_buffers_cost to get cost of merge_buffers.
  */
  if (maxbuffer >= MERGEBUFF2)
  {
    /* Each pass merges groups of MERGEBUFF streams until few enough remain */
    while (maxbuffer >= MERGEBUFF2)
    {
      uint lastbuff= 0;
      for (i = 0; i <= (int) maxbuffer - MERGEBUFF*3/2; i += MERGEBUFF)
      {
        total_cost+=get_merge_buffers_cost(buff_elems, elem_size,
                                           buff_elems + i,
                                           buff_elems + i + MERGEBUFF-1,
                                           compare_factor);
        lastbuff++;
      }
      /* merge the remaining tail of streams; note: reuses i from the loop */
      total_cost+=get_merge_buffers_cost(buff_elems, elem_size,
                                         buff_elems + i,
                                         buff_elems + maxbuffer,
                                         compare_factor);
      maxbuffer= lastbuff;
    }
  }

  /* Simulate final merge_buff call. */
  total_cost += get_merge_buffers_cost(buff_elems, elem_size,
                                       buff_elems, buff_elems + maxbuffer,
                                       compare_factor);
  return total_cost;
}
| 255 | |
| 256 | |
| 257 | /* |
| 258 | Calculate cost of using Unique for processing nkeys elements of size |
| 259 | key_size using max_in_memory_size memory. |
| 260 | |
| 261 | SYNOPSIS |
| 262 | Unique::get_use_cost() |
| 263 | buffer space for temporary data, use Unique::get_cost_calc_buff_size |
| 264 | to get # bytes needed. |
| 265 | nkeys #of elements in Unique |
| 266 | key_size size of each elements in bytes |
| 267 | max_in_memory_size amount of memory Unique will be allowed to use |
| 268 | compare_factor used to calculate cost of one comparison |
| 269 | write_fl if the result must be saved written to disk |
| 270 | in_memory_elems OUT estimate of the number of elements in memory |
| 271 | if disk is not used |
| 272 | |
| 273 | RETURN |
| 274 | Cost in disk seeks. |
| 275 | |
| 276 | NOTES |
| 277 | cost(using_unique) = |
| 278 | cost(create_trees) + (see #1) |
| 279 | cost(merge) + (see #2) |
| 280 | cost(read_result) (see #3) |
| 281 | |
| 282 | 1. Cost of trees creation |
| 283 | For each Unique::put operation there will be 2*log2(n+1) elements |
| 284 | comparisons, where n runs from 1 tree_size (we assume that all added |
| 285 | elements are different). Together this gives: |
| 286 | |
| 287 | n_compares = 2*(log2(2) + log2(3) + ... + log2(N+1)) = 2*log2((N+1)!) |
| 288 | |
| 289 | then cost(tree_creation) = n_compares*ROWID_COMPARE_COST; |
| 290 | |
| 291 | Total cost of creating trees: |
| 292 | (n_trees - 1)*max_size_tree_cost + non_max_size_tree_cost. |
| 293 | |
| 294 | Approximate value of log2(N!) is calculated by log2_n_fact function. |
| 295 | |
| 296 | 2. Cost of merging. |
| 297 | If only one tree is created by Unique no merging will be necessary. |
| 298 | Otherwise, we model execution of merge_many_buff function and count |
| 299 | #of merges. (The reason behind this is that number of buffers is small, |
| 300 | while size of buffers is big and we don't want to lose precision with |
| 301 | O(x)-style formula) |
| 302 | |
| 303 | 3. If only one tree is created by Unique no disk io will happen. |
| 304 | Otherwise, ceil(key_len*n_keys) disk seeks are necessary. We assume |
| 305 | these will be random seeks. |
| 306 | */ |
| 307 | |
double Unique::get_use_cost(uint *buffer, size_t nkeys, uint key_size,
                            size_t max_in_memory_size,
                            uint compare_factor,
                            bool intersect_fl, bool *in_memory)
{
  size_t max_elements_in_tree;
  size_t last_tree_elems;
  size_t n_full_trees; /* number of trees in unique - 1 */
  double result;

  /* Same per-tree capacity formula as used in the constructor */
  max_elements_in_tree= ((size_t) max_in_memory_size /
                         ALIGN_SIZE(sizeof(TREE_ELEMENT)+key_size));

  n_full_trees= nkeys / max_elements_in_tree;
  last_tree_elems= nkeys % max_elements_in_tree;

  /* Calculate cost of creating trees: 2*log2(N!) comparisons per tree */
  result= 2*log2_n_fact(last_tree_elems + 1.0);
  if (n_full_trees)
    result+= n_full_trees * log2_n_fact(max_elements_in_tree + 1.0);
  result /= compare_factor;

  DBUG_PRINT("info" ,("unique trees sizes: %u=%u*%u + %u" , (uint)nkeys,
                      (uint)n_full_trees,
                      (uint)(n_full_trees?max_elements_in_tree:0),
                      (uint)last_tree_elems));

  if (in_memory)
    *in_memory= !n_full_trees;

  /* With a single tree everything stays in memory: no disk io, no merge */
  if (!n_full_trees)
    return result;

  /*
    There is more than one tree and merging is necessary.
    First, add cost of writing all trees to disk, assuming that all disk
    writes are sequential.
  */
  result += DISK_SEEK_BASE_COST * n_full_trees *
              ceil(((double) key_size)*max_elements_in_tree / IO_SIZE);
  result += DISK_SEEK_BASE_COST * ceil(((double) key_size)*last_tree_elems / IO_SIZE);

  /* Cost of merge */
  if (intersect_fl)
    key_size+= sizeof(element_count);  /* on-disk elements carry counters */
  double merge_cost= get_merge_many_buffs_cost(buffer, (uint)n_full_trees,
                                               (uint)max_elements_in_tree,
                                               (uint)last_tree_elems, key_size,
                                               compare_factor);
  result += merge_cost;
  /*
    Add cost of reading the resulting sequence, assuming there were no
    duplicate elements.
  */
  result += ceil((double)key_size*nkeys/IO_SIZE);

  return result;
}
| 366 | |
/* Release the temporary file, the in-memory tree and the BUFFPEK array */
Unique::~Unique()
{
  close_cached_file(&file);
  delete_tree(&tree, 0);
  delete_dynamic(&file_ptrs);
}
| 373 | |
| 374 | |
| 375 | /* Write tree to disk; clear tree */ |
| 376 | bool Unique::flush() |
| 377 | { |
| 378 | BUFFPEK file_ptr; |
| 379 | elements+= tree.elements_in_tree; |
| 380 | file_ptr.count=tree.elements_in_tree; |
| 381 | file_ptr.file_pos=my_b_tell(&file); |
| 382 | |
| 383 | tree_walk_action action= min_dupl_count ? |
| 384 | (tree_walk_action) unique_write_to_file_with_count : |
| 385 | (tree_walk_action) unique_write_to_file; |
| 386 | if (tree_walk(&tree, action, |
| 387 | (void*) this, left_root_right) || |
| 388 | insert_dynamic(&file_ptrs, (uchar*) &file_ptr)) |
| 389 | return 1; |
| 390 | delete_tree(&tree, 0); |
| 391 | return 0; |
| 392 | } |
| 393 | |
| 394 | |
| 395 | /* |
| 396 | Clear the tree and the file. |
| 397 | You must call reset() if you want to reuse Unique after walk(). |
| 398 | */ |
| 399 | |
void
Unique::reset()
{
  /* Empty the in-memory tree (keeps the tree structure allocated) */
  reset_tree(&tree);
  /*
    If elements != 0, some trees were stored in the file (see how
    flush() works). Note, that we can not count on my_b_tell(&file) == 0
    here, because it can return 0 right after walk(), and walk() does not
    reset any Unique member.
  */
  if (elements)
  {
    reset_dynamic(&file_ptrs);
    reinit_io_cache(&file, WRITE_CACHE, 0L, 0, 1);
  }
  /* Free the pointer array possibly allocated by get(); safe on NULL */
  my_free(sort.record_pointers);
  elements= 0;
  tree.flag= 0;
  sort.record_pointers= 0;
}
| 420 | |
| 421 | /* |
| 422 | The comparison function, passed to queue_init() in merge_walk() and in |
| 423 | merge_buffers() when the latter is called from Unique::get() must |
| 424 | use comparison function of Unique::tree, but compare members of struct |
| 425 | BUFFPEK. |
| 426 | */ |
| 427 | |
| 428 | C_MODE_START |
| 429 | |
| 430 | static int buffpek_compare(void *arg, uchar *key_ptr1, uchar *key_ptr2) |
| 431 | { |
| 432 | BUFFPEK_COMPARE_CONTEXT *ctx= (BUFFPEK_COMPARE_CONTEXT *) arg; |
| 433 | return ctx->key_compare(ctx->key_compare_arg, |
| 434 | *((uchar **) key_ptr1), *((uchar **)key_ptr2)); |
| 435 | } |
| 436 | |
| 437 | C_MODE_END |
| 438 | |
| 439 | |
| 440 | inline |
| 441 | element_count get_counter_from_merged_element(void *ptr, uint ofs) |
| 442 | { |
| 443 | element_count cnt; |
| 444 | memcpy((uchar *) &cnt, (uchar *) ptr + ofs, sizeof(element_count)); |
| 445 | return cnt; |
| 446 | } |
| 447 | |
| 448 | |
| 449 | inline |
| 450 | void put_counter_into_merged_element(void *ptr, uint ofs, element_count cnt) |
| 451 | { |
| 452 | memcpy((uchar *) ptr + ofs, (uchar *) &cnt, sizeof(element_count)); |
| 453 | } |
| 454 | |
| 455 | |
| 456 | /* |
| 457 | DESCRIPTION |
| 458 | |
| 459 | Function is very similar to merge_buffers, but instead of writing sorted |
| 460 | unique keys to the output file, it invokes walk_action for each key. |
| 461 | This saves I/O if you need to pass through all unique keys only once. |
| 462 | |
| 463 | SYNOPSIS |
| 464 | merge_walk() |
| 465 | All params are 'IN' (but see comment for begin, end): |
| 466 | merge_buffer buffer to perform cached piece-by-piece loading |
| 467 | of trees; initially the buffer is empty |
| 468 | merge_buffer_size size of merge_buffer. Must be aligned with |
| 469 | key_length |
| 470 | key_length size of tree element; key_length * (end - begin) |
| 471 | must be less or equal than merge_buffer_size. |
| 472 | begin pointer to BUFFPEK struct for the first tree. |
| 473 | end pointer to BUFFPEK struct for the last tree; |
| 474 | end > begin and [begin, end) form a consecutive |
| 475 | range. BUFFPEKs structs in that range are used and |
| 476 | overwritten in merge_walk(). |
| 477 | walk_action element visitor. Action is called for each unique |
| 478 | key. |
| 479 | walk_action_arg argument to walk action. Passed to it on each call. |
| 480 | compare elements comparison function |
| 481 | compare_arg comparison function argument |
| 482 | file file with all trees dumped. Trees in the file |
| 483 | must contain sorted unique values. Cache must be |
| 484 | initialized in read mode. |
| 485 | with_counters take into account counters for equal merged |
| 486 | elements |
| 487 | RETURN VALUE |
| 488 | 0 ok |
| 489 | <> 0 error |
| 490 | */ |
| 491 | |
static bool merge_walk(uchar *merge_buffer, size_t merge_buffer_size,
                       uint key_length, BUFFPEK *begin, BUFFPEK *end,
                       tree_walk_action walk_action, void *walk_action_arg,
                       qsort_cmp2 compare, void *compare_arg,
                       IO_CACHE *file, bool with_counters)
{
  BUFFPEK_COMPARE_CONTEXT compare_context = { compare, compare_arg };
  QUEUE queue;
  /* sanity: at least one tree, and room for one key per tree + one spare */
  if (end <= begin ||
      merge_buffer_size < (size_t) (key_length * (end - begin + 1)) ||
      init_queue(&queue, (uint) (end - begin), offsetof(BUFFPEK, key), 0,
                 buffpek_compare, &compare_context, 0, 0))
    return 1;
  /* we need space for one key when a piece of merge buffer is re-read */
  merge_buffer_size-= key_length;
  uchar *save_key_buff= merge_buffer + merge_buffer_size;
  uint max_key_count_per_piece= (uint) (merge_buffer_size/(end-begin) /
                                        key_length);
  /* if piece_size is aligned reuse_freed_buffer will always hit */
  uint piece_size= max_key_count_per_piece * key_length;
  ulong bytes_read;  /* to hold return value of read_to_buffer */
  BUFFPEK *top;
  int res= 1;
  /*
    With counters, the last sizeof(element_count) bytes of each element
    hold its duplicate count; cnt_ofs is that counter's offset.
  */
  uint cnt_ofs= key_length - (with_counters ? sizeof(element_count) : 0);
  element_count cnt;
  /*
    Invariant: queue must contain top element from each tree, until a tree
    is not completely walked through.
    Here we're forcing the invariant, inserting one element from each tree
    to the queue.
  */
  for (top= begin; top != end; ++top)
  {
    top->base= merge_buffer + (top - begin) * piece_size;
    top->max_keys= max_key_count_per_piece;
    bytes_read= read_to_buffer(file, top, key_length);
    if (unlikely(bytes_read == (ulong) -1))
      goto end;
    DBUG_ASSERT(bytes_read);
    queue_insert(&queue, (uchar *) top);
  }
  top= (BUFFPEK *) queue_top(&queue);
  while (queue.elements > 1)
  {
    /*
      Every iteration one element is removed from the queue, and one is
      inserted by the rules of the invariant. If two adjacent elements on
      the top of the queue are not equal, biggest one is unique, because all
      elements in each tree are unique. Action is applied only to unique
      elements.
    */
    void *old_key= top->key;
    /*
      read next key from the cache or from the file and push it to the
      queue; this gives new top.
    */
    top->key+= key_length;
    if (--top->mem_count)
      queue_replace_top(&queue);
    else /* next piece should be read */
    {
      /* save old_key not to overwrite it in read_to_buffer */
      memcpy(save_key_buff, old_key, key_length);
      old_key= save_key_buff;
      bytes_read= read_to_buffer(file, top, key_length);
      if (unlikely(bytes_read == (ulong) -1))
        goto end;
      else if (bytes_read) /* top->key, top->mem_count are reset */
        queue_replace_top(&queue); /* in read_to_buffer */
      else
      {
        /*
          Tree for old 'top' element is empty: remove it from the queue and
          give all its memory to the nearest tree.
        */
        queue_remove_top(&queue);
        reuse_freed_buff(&queue, top, key_length);
      }
    }
    top= (BUFFPEK *) queue_top(&queue);
    /* new top has been obtained; if old top is unique, apply the action */
    if (compare(compare_arg, old_key, top->key))
    {
      cnt= with_counters ?
        get_counter_from_merged_element(old_key, cnt_ofs) : 1;
      if (walk_action(old_key, cnt, walk_action_arg))
        goto end;
    }
    else if (with_counters)
    {
      /* equal keys from different trees: fold counters into the new top */
      cnt= get_counter_from_merged_element(top->key, cnt_ofs);
      cnt+= get_counter_from_merged_element(old_key, cnt_ofs);
      put_counter_into_merged_element(top->key, cnt_ofs, cnt);
    }
  }
  /*
    Applying walk_action to the tail of the last tree: this is safe because
    either we had only one tree in the beginning, or we work with the
    last tree in the queue.
  */
  do
  {
    do
    {

      cnt= with_counters ?
        get_counter_from_merged_element(top->key, cnt_ofs) : 1;
      if (walk_action(top->key, cnt, walk_action_arg))
        goto end;
      top->key+= key_length;
    }
    while (--top->mem_count);
    bytes_read= read_to_buffer(file, top, key_length);
    if (unlikely(bytes_read == (ulong) -1))
      goto end;
  }
  while (bytes_read);
  res= 0;
end:
  delete_queue(&queue);
  return res;
}
| 614 | |
| 615 | |
| 616 | /* |
| 617 | DESCRIPTION |
| 618 | Walks consecutively through all unique elements: |
| 619 | if all elements are in memory, then it simply invokes 'tree_walk', else |
| 620 | all flushed trees are loaded to memory piece-by-piece, pieces are |
| 621 | sorted, and action is called for each unique value. |
| 622 | Note: so as merging resets file_ptrs state, this method can change |
| 623 | internal Unique state to undefined: if you want to reuse Unique after |
| 624 | walk() you must call reset() first! |
| 625 | SYNOPSIS |
| 626 | Unique::walk() |
| 627 | All params are 'IN': |
| 628 | table parameter for the call of the merge method |
| 629 | action function-visitor, typed in include/my_tree.h |
| 630 | function is called for each unique element |
| 631 | arg argument for visitor, which is passed to it on each call |
| 632 | RETURN VALUE |
| 633 | 0 OK |
| 634 | <> 0 error |
| 635 | */ |
| 636 | |
bool Unique::walk(TABLE *table, tree_walk_action action, void *walk_action_arg)
{
  int res= 0;
  uchar *merge_buffer;

  if (elements == 0) /* the whole tree is in memory */
    return tree_walk(&tree, action, walk_action_arg, left_root_right);

  sort.return_rows= elements+tree.elements_in_tree;
  /* flush current tree to the file to have some memory for merge buffer */
  if (flush())
    return 1;
  /* switch the temporary file from writing to reading */
  if (flush_io_cache(&file) || reinit_io_cache(&file, READ_CACHE, 0L, 0, 0))
    return 1;
  /*
    merge_buffer must fit at least MERGEBUFF2 + 1 keys, because
    merge_index() can merge that many BUFFPEKs at once. The extra space for
    one key is needed when a piece of merge buffer is re-read, see
    merge_walk()
  */
  size_t buff_sz= MY_MAX(MERGEBUFF2+1, max_in_memory_size/full_size+1) * full_size;
  if (!(merge_buffer = (uchar *)my_malloc(buff_sz, MYF(MY_WME))))
    return 1;
  /*
    Too many flushed trees to merge in one go: run pre-merge passes
    (without the last merge) to reduce the number of BUFFPEKs first.
  */
  if (buff_sz < full_size * (file_ptrs.elements + 1UL))
    res= merge(table, merge_buffer, buff_sz >= full_size * MERGEBUFF2) ;

  if (!res)
  {
    res= merge_walk(merge_buffer, buff_sz, full_size,
                    (BUFFPEK *) file_ptrs.buffer,
                    (BUFFPEK *) file_ptrs.buffer + file_ptrs.elements,
                    action, walk_action_arg,
                    tree.compare, tree.custom_arg, &file, with_counters);
  }
  my_free(merge_buffer);
  return res;
}
| 673 | |
| 674 | |
| 675 | /* |
| 676 | DESCRIPTION |
| 677 | |
| 678 | Perform multi-pass sort merge of the elements using the buffer buff as |
| 679 | the merge buffer. The last pass is not performed if without_last_merge is |
| 680 | TRUE. |
| 681 | |
| 682 | SYNOPSIS |
| 683 | Unique::merge() |
| 684 | All params are 'IN': |
| 685 | table the parameter to access sort context |
| 686 | buff merge buffer |
| 687 | without_last_merge TRUE <=> do not perform the last merge |
| 688 | RETURN VALUE |
| 689 | 0 OK |
| 690 | <> 0 error |
| 691 | */ |
| 692 | |
bool Unique::merge(TABLE *table, uchar *buff, bool without_last_merge)
{
  IO_CACHE *outfile= &sort.io_cache;
  BUFFPEK *file_ptr= (BUFFPEK*) file_ptrs.buffer;
  uint maxbuffer= file_ptrs.elements - 1;
  my_off_t save_pos;
  bool error= 1;
  Sort_param sort_param;

  /* Open cached file for table records if it isn't open */
  if (! my_b_inited(outfile) &&
      open_cached_file(outfile,mysql_tmpdir,TEMP_PREFIX,READ_RECORD_BUFFER,
                       MYF(MY_WME)))
    return 1;

  /* Describe Unique's element layout to the filesort merge machinery */
  bzero((char*) &sort_param,sizeof(sort_param));
  sort_param.max_rows= elements;
  sort_param.sort_form= table;
  sort_param.rec_length= sort_param.sort_length= sort_param.ref_length=
    full_size;
  sort_param.min_dupl_count= min_dupl_count;
  sort_param.res_length= 0;
  sort_param.max_keys_per_buffer=
    (uint) MY_MAX((max_in_memory_size / sort_param.sort_length), MERGEBUFF2);
  sort_param.not_killable= 1;

  /* Space past the merge keys in buff holds the last key for dedup */
  sort_param.unique_buff= buff +(sort_param.max_keys_per_buffer *
                                 sort_param.sort_length);

  /* Compare whole elements using the tree's own comparison function */
  sort_param.compare= (qsort2_cmp) buffpek_compare;
  sort_param.cmp_context.key_compare= tree.compare;
  sort_param.cmp_context.key_compare_arg= tree.custom_arg;

  /* Merge the buffers to one file, removing duplicates */
  if (merge_many_buff(&sort_param,buff,file_ptr,&maxbuffer,&file))
    goto err;
  if (flush_io_cache(&file) ||
      reinit_io_cache(&file,READ_CACHE,0L,0,0))
    goto err;
  /* Result elements are written without the trailing duplicate counter */
  sort_param.res_length= sort_param.rec_length-
                         (min_dupl_count ? sizeof(min_dupl_count) : 0);
  if (without_last_merge)
  {
    /* Caller only wanted the pre-merge passes; keep remaining BUFFPEKs */
    file_ptrs.elements= maxbuffer+1;
    return 0;
  }
  if (merge_index(&sort_param, buff, file_ptr, maxbuffer, &file, outfile))
    goto err;
  error= 0;
err:
  if (flush_io_cache(outfile))
    error= 1;

  /* Setup io_cache for reading */
  save_pos= outfile->pos_in_file;
  if (reinit_io_cache(outfile,READ_CACHE,0L,0,0))
    error= 1;
  outfile->end_of_file=save_pos;
  return error;
}
| 753 | |
| 754 | |
| 755 | /* |
| 756 | Allocate memory that can be used with init_records() so that |
| 757 | rows will be read in priority order. |
| 758 | */ |
| 759 | |
bool Unique::get(TABLE *table)
{
  bool rc= 1;
  uchar *sort_buffer= NULL;
  sort.return_rows= elements+tree.elements_in_tree;
  DBUG_ENTER("Unique::get" );

  if (my_b_tell(&file) == 0)
  {
    /* Whole tree is in memory; Don't use disk if you don't need to */
    if ((sort.record_pointers= (uchar*)
         my_malloc(size * tree.elements_in_tree, MYF(MY_THREAD_SPECIFIC))))
    {
      uchar *save_record_pointers= sort.record_pointers;
      /* For intersection, only elements with enough duplicates qualify */
      tree_walk_action action= min_dupl_count ?
        (tree_walk_action) unique_intersect_write_to_ptrs :
        (tree_walk_action) unique_write_to_ptrs;
      filtered_out_elems= 0;
      (void) tree_walk(&tree, action,
                       this, left_root_right);
      /* Restore record_pointers that was changed in by 'action' above */
      sort.record_pointers= save_record_pointers;
      sort.return_rows-= filtered_out_elems;
      DBUG_RETURN(0);
    }
  }
  /* Not enough memory; Save the result to file && free memory used by tree */
  if (flush())
    DBUG_RETURN(1);
  size_t buff_sz= (max_in_memory_size / full_size + 1) * full_size;
  if (!(sort_buffer= (uchar*) my_malloc(buff_sz,
                                        MYF(MY_THREAD_SPECIFIC|MY_WME))))
    DBUG_RETURN(1);

  /* Full merge: results end up in sort.io_cache, ready for reading */
  if (merge(table, sort_buffer, FALSE))
    goto err;
  rc= 0;

err:
  my_free(sort_buffer);
  DBUG_RETURN(rc);
}
| 802 | |