1/* Copyright (C) 2006 MySQL AB & Alexey Botchkov & MySQL Finland AB
2 & TCX DataKonsult AB
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
16
17#include "maria_def.h"
18#include "trnman.h"
19#include "ma_key_recover.h"
20
21#ifdef HAVE_RTREE_KEYS
22
23#include "ma_rt_index.h"
24#include "ma_rt_key.h"
25#include "ma_rt_mbr.h"
26
/*
 One entry taking part in a page split: a key together with its bounding
 rectangle and the bookkeeping used while distributing entries into groups.
*/
typedef struct
{
 /* Value returned by count_square() for 'coords' */
 double square;
 /* Group of the entry: 0 = not assigned yet, otherwise the group number */
 int n_node;
 /* Start of the key this entry describes */
 const uchar *key;
 /* n_dim pairs of doubles; the helpers below treat each pair as (low, high) */
 double *coords;
} SplitStruct;
34
/*
  Hand out the next n_dim * 2 doubles of a linear buffer.

  Returns the position *d_buffer had on entry and advances *d_buffer past
  the reserved slots. No bounds are checked here; the caller must have sized
  the buffer for every reservation it is going to make.
*/
inline static double *reserve_coords(double **d_buffer, int n_dim)
{
  double *const reserved= *d_buffer;

  *d_buffer= reserved + n_dim * 2;
  return reserved;
}
41
/*
  Grow rectangle 'a' in place so that it also covers rectangle 'b'.

  Both arrays hold n_dim (low, high) pairs; 'b' is not modified.
  Assumes n_dim >= 1.
*/
static void mbr_join(double *a, const double *b, int n_dim)
{
  int dim;

  for (dim= 0; dim < n_dim; dim++)
  {
    double *dst= a + dim * 2;
    const double *src= b + dim * 2;

    /* lower bound: keep the smaller one */
    if (dst[0] > src[0])
      dst[0]= src[0];

    /* upper bound: keep the larger one */
    if (dst[1] < src[1])
      dst[1]= src[1];
  }
}
57
/*
  Compute the square (product of the extents in every dimension) of the
  rectangle that would result from joining a and b. Neither input is
  modified. Assumes n_dim >= 1.
*/
static double mbr_join_square(const double *a, const double *b, int n_dim)
{
  double result= 1.0;
  int dim;

  for (dim= 0; dim < n_dim; dim++, a+= 2, b+= 2)
  {
    const double high= (a[1] < b[1]) ? b[1] : a[1];
    const double low=  (a[0] > b[0]) ? b[0] : a[0];

    result*= high - low;
  }

  return result;
}
76
/*
  Compute the square of one rectangle: the product of (high - low) over all
  n_dim (low, high) pairs stored in 'a'. Assumes n_dim >= 1.
*/
static double count_square(const double *a, int n_dim)
{
  const double *pair;
  const double *const limit= a + n_dim * 2;
  double result= 1.0;

  for (pair= a; pair != limit; pair+= 2)
    result*= pair[1] - pair[0];

  return result;
}
88
/*
  Copy one rectangle (n_dim pairs of doubles) from src to dst.
  The two areas must not overlap, as memcpy() is used.
*/
inline static void copy_coords(double *dst, const double *src, int n_dim)
{
  const size_t n_bytes= sizeof(double) * (n_dim * 2);

  memcpy(dst, src, n_bytes);
}
93
94/**
95 Select two nodes to collect group upon.
96
97 Note that such function uses 'double' arithmetic so may behave differently
98 on different platforms/builds. There are others in this file.
99*/
100static void pick_seeds(SplitStruct *node, int n_entries,
101 SplitStruct **seed_a, SplitStruct **seed_b, int n_dim)
102{
103 SplitStruct *cur1;
104 SplitStruct *lim1= node + (n_entries - 1);
105 SplitStruct *cur2;
106 SplitStruct *lim2= node + n_entries;
107
108 double max_d= -DBL_MAX;
109 double d;
110
111 for (cur1= node; cur1 < lim1; cur1++)
112 {
113 for (cur2=cur1 + 1; cur2 < lim2; cur2++)
114 {
115
116 d= mbr_join_square(cur1->coords, cur2->coords, n_dim) - cur1->square -
117 cur2->square;
118 if (d > max_d)
119 {
120 max_d= d;
121 *seed_a= cur1;
122 *seed_b= cur2;
123 }
124 }
125 }
126}
127
128/*
129Select next node and group where to add
130*/
131static void pick_next(SplitStruct *node, int n_entries, double *g1, double *g2,
132 SplitStruct **choice, int *n_group, int n_dim)
133{
134 SplitStruct *cur= node;
135 SplitStruct *end= node + n_entries;
136
137 double max_diff= -DBL_MAX;
138
139 for (; cur < end; cur++)
140 {
141 double diff;
142 double abs_diff;
143
144 if (cur->n_node)
145 {
146 continue;
147 }
148
149 diff= mbr_join_square(g1, cur->coords, n_dim) -
150 mbr_join_square(g2, cur->coords, n_dim);
151
152 abs_diff= fabs(diff);
153 if (abs_diff > max_diff)
154 {
155 max_diff= abs_diff;
156 *n_group= 1 + (diff > 0);
157 *choice= cur;
158 }
159 }
160}
161
162/*
163Mark not-in-group entries as n_group
164*/
165static void mark_all_entries(SplitStruct *node, int n_entries, int n_group)
166{
167 SplitStruct *cur= node;
168 SplitStruct *end= node + n_entries;
169
170 for (; cur < end; cur++)
171 {
172 if (cur->n_node)
173 {
174 continue;
175 }
176 cur->n_node= n_group;
177 }
178}
179
180static int split_maria_rtree_node(SplitStruct *node, int n_entries,
181 int all_size, /* Total key's size */
182 int key_size,
183 int min_size, /* Minimal group size */
184 int size1, int size2 /* initial group sizes */,
185 double **d_buffer, int n_dim)
186{
187 SplitStruct *cur;
188 SplitStruct *UNINIT_VAR(a);
189 SplitStruct *UNINIT_VAR(b);
190 double *g1= reserve_coords(d_buffer, n_dim);
191 double *g2= reserve_coords(d_buffer, n_dim);
192 SplitStruct *UNINIT_VAR(next);
193 int UNINIT_VAR(next_node);
194 int i;
195 SplitStruct *end= node + n_entries;
196
197 if (all_size < min_size * 2)
198 {
199 return 1;
200 }
201
202 cur= node;
203 for (; cur < end; cur++)
204 {
205 cur->square= count_square(cur->coords, n_dim);
206 cur->n_node= 0;
207 }
208
209 pick_seeds(node, n_entries, &a, &b, n_dim);
210 a->n_node= 1;
211 b->n_node= 2;
212
213
214 copy_coords(g1, a->coords, n_dim);
215 size1+= key_size;
216 copy_coords(g2, b->coords, n_dim);
217 size2+= key_size;
218
219
220 for (i=n_entries - 2; i>0; --i)
221 {
222 if (all_size - (size2 + key_size) < min_size) /* Can't write into group 2 */
223 {
224 mark_all_entries(node, n_entries, 1);
225 break;
226 }
227
228 if (all_size - (size1 + key_size) < min_size) /* Can't write into group 1 */
229 {
230 mark_all_entries(node, n_entries, 2);
231 break;
232 }
233
234 pick_next(node, n_entries, g1, g2, &next, &next_node, n_dim);
235 if (next_node == 1)
236 {
237 size1+= key_size;
238 mbr_join(g1, next->coords, n_dim);
239 }
240 else
241 {
242 size2+= key_size;
243 mbr_join(g2, next->coords, n_dim);
244 }
245 next->n_node= next_node;
246 }
247
248 return 0;
249}
250
251
252/**
253 Logs key reorganization done in a split page (new page is logged elsewhere).
254
255 The effect of a split on the split page is three changes:
256 - some piece of the page move to different places inside this page (we are
257 not interested here in the pieces which move to the new page)
258 - the key is inserted into the page or not (could be in the new page)
259 - page is shrunk
260 All this is uniquely determined by a few parameters:
 - the key (starting at 'key-nod_flag', for 'full_length' bytes;
 maria_rtree_split_page() seems to depend only on the key and its length,
 but in fact it reads more: nod_flag bytes to the left of the key, and up
 to full_length to the right)
265 - the binary content of the page
266 - some variables in the share
267 - double arithmetic, which is unpredictable from machine to machine and
268 from build to build (see pick_seeds() above: it has a comparison between
269 double-s 'if (d > max_d)' so the comparison can go differently from machine
270 to machine or build to build, it has happened in real life).
 If one day we use precision-math instead of double-math, in GIS, then the
 last parameter would become constant across machines and builds and we
 could do some cheap logging: just log the few parameters above.
274 Until then, we log the list of memcpy() operations (fortunately, we often do
275 not have to log the source bytes, as they can be found in the page before
276 applying the REDO; the only source bytes to log are the key), the key if it
277 was inserted into this page, and the shrinking.
278
 @param page the split page (post-split); the table, the page's
 position in the file and its size are taken from it
282 @param key_with_nod_flag pointer to key-nod_flag
283 @param full_length length of (key + (nod_flag (if node) or rowid (if
284 leaf)))
 @param log_internal_copy encoded list of memcpy() operations done on
 split page, having their source in the page
287 @param log_internal_copy_length length of above list, in bytes
288 @param log_key_copy operation describing the key's copy, or NULL if the
289 inserted key was not put into the page (was put in
290 new page, so does not have to be logged here)
291 @param length_diff by how much the page has shrunk during split
292*/
293
static my_bool _ma_log_rt_split(MARIA_PAGE *page,
 const uchar *key_with_nod_flag,
 uint full_length,
 const uchar *log_internal_copy,
 uint log_internal_copy_length,
 const uchar *log_key_copy,
 uint length_diff)
{
 MARIA_HA *info= page->info;
 MARIA_SHARE *share= info->s;
 LSN lsn;
 /*
 Fixed header of the record: file id, page number,
 KEY_OP_DEL_SUFFIX (1) + length_diff (2),
 KEY_OP_MULTI_COPY (1) + full_length (2) + log_internal_copy_length (2).
 The trailing 7 bytes are not part of the first log part (see the
 'sizeof(log_data) - 7' below); log_pos, which ends up pointing at them,
 is handed to _ma_log_key_changes().
 NOTE(review): presumably scratch space for that function -- confirm there.
 */
 uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 1 + 2 + 1 + 2 + 2 + 7],
 *log_pos;
 /*
 Up to 4 parts are filled in directly below; the remaining slots are
 available to _ma_log_key_changes(), which gets a pointer to the next free
 one.
 */
 LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 6];
 uint translog_parts, extra_length= 0;
 my_off_t page_pos;
 DBUG_ENTER("_ma_log_rt_split");
 DBUG_PRINT("enter", ("page: %p", page));

 DBUG_ASSERT(share->now_transactional);
 /* The page is identified in the log by its number, not its byte offset */
 page_pos= page->pos / share->block_size;
 page_store(log_data + FILEID_STORE_SIZE, page_pos);
 log_pos= log_data+ FILEID_STORE_SIZE + PAGE_STORE_SIZE;
 /* First operation: the page shrinks by length_diff bytes */
 log_pos[0]= KEY_OP_DEL_SUFFIX;
 log_pos++;
 DBUG_ASSERT((int)length_diff > 0);
 int2store(log_pos, length_diff);
 log_pos+= 2;
 /*
 Second operation: the list of copies internal to the page. Its header
 holds the length of one copy and the length of the list, which follows
 as a separate log part.
 */
 log_pos[0]= KEY_OP_MULTI_COPY;
 log_pos++;
 int2store(log_pos, full_length);
 log_pos+= 2;
 int2store(log_pos, log_internal_copy_length);
 log_pos+= 2;
 log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
 log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data) - 7;
 log_array[TRANSLOG_INTERNAL_PARTS + 1].str= log_internal_copy;
 log_array[TRANSLOG_INTERNAL_PARTS + 1].length= log_internal_copy_length;
 translog_parts= 2;
 if (log_key_copy != NULL) /* need to store key into record */
 {
 /*
 log_key_copy is the 6-byte header prepared by the caller:
 KEY_OP_OFFSET (1) + offset (2) + KEY_OP_CHANGE (1) + length (2);
 the key's bytes follow as their own part.
 */
 log_array[TRANSLOG_INTERNAL_PARTS + 2].str= log_key_copy;
 log_array[TRANSLOG_INTERNAL_PARTS + 2].length= 1 + 2 + 1 + 2;
 log_array[TRANSLOG_INTERNAL_PARTS + 3].str= key_with_nod_flag;
 log_array[TRANSLOG_INTERNAL_PARTS + 3].length= full_length;
 extra_length= 1 + 2 + 1 + 2 + full_length;
 translog_parts+= 2;
 }

 /* May add to extra_length and translog_parts (both passed by address) */
 _ma_log_key_changes(page,
 log_array + TRANSLOG_INTERNAL_PARTS + translog_parts,
 log_pos, &extra_length, &translog_parts);
 /* Remember new page length for future log entries for same page */
 page->org_size= page->size;

 /*
 Record length = fixed header + list of internal copies + everything
 counted in extra_length. Returns 1 if the write fails, 0 otherwise.
 */
 if (translog_write_record(&lsn, LOGREC_REDO_INDEX,
 info->trn, info,
 (translog_size_t) ((log_pos - log_data) +
 log_internal_copy_length +
 extra_length),
 TRANSLOG_INTERNAL_PARTS + translog_parts,
 log_array, log_data, NULL))
 DBUG_RETURN(1);
 DBUG_RETURN(0);
}
359
/**
 Splits an R-tree page in two to make room for 'key'.

 The keys already on the page plus 'key' are divided into two groups by
 split_maria_rtree_node(). Group 1 is compacted into 'page'; every other
 entry is written to a newly allocated page. If the table is transactional
 the changes to both pages are logged.

 @param key key being inserted
 @param page page to split; its buffer and size are updated
 @param new_page_offs out: offset of the new page, as returned by _ma_new()

 @return
 0 ok; the created page is put into page cache; the shortened one is not (up
 to the caller to do it)
 1 or -1: error.

 @note new_page_offs is written through unconditionally, so it must not be
 NULL.
 NOTE(review): this comment used to say that NULL means "don't create a new
 page (for redo phase)"; the code below does not implement that.
*/

int maria_rtree_split_page(const MARIA_KEY *key, MARIA_PAGE *page,
 my_off_t *new_page_offs)
{
 MARIA_HA *info= page->info;
 MARIA_SHARE *share= info->s;
 const my_bool transactional= share->now_transactional;
 int n1, n2; /* Number of items in groups */
 SplitStruct *task;
 SplitStruct *cur;
 SplitStruct *stop;
 double *coord_buf;
 double *next_coord;
 int n_dim;
 uchar *source_cur, *cur1, *cur2;
 uchar *new_page_buff, *log_internal_copy, *log_internal_copy_ptr,
 *log_key_copy= NULL;
 int err_code= 0;
 uint new_page_length;
 uint nod_flag= page->node;
 uint org_length= page->size;
 /* Size of one entry on the page: key data + (node pointer or row reference) */
 uint full_length= key->data_length + (nod_flag ? nod_flag :
 key->ref_length);
 uint key_data_length= key->data_length;
 /* Number of entries currently on the page */
 int max_keys= ((org_length - share->keypage_header) / (full_length));
 MARIA_PINNED_PAGE tmp_page_link, *page_link= &tmp_page_link;
 MARIA_KEYDEF *keyinfo= key->keyinfo;
 DBUG_ENTER("maria_rtree_split_page");
 DBUG_PRINT("rtree", ("splitting block"));

 n_dim= keyinfo->keysegs / 2;

 /*
 One allocation holds two areas: first (max_keys + 1 + 4) rectangles of
 n_dim * 2 doubles each, then (max_keys + 1) SplitStruct entries.
 max_keys + 1 = the keys on the page plus the new key.
 split_maria_rtree_node() reserves two more rectangles for its groups.
 NOTE(review): the other two of the 4 extra rectangles have no visible
 user in this file.
 */
 if (!(coord_buf= (double*) my_alloca(n_dim * 2 * sizeof(double) *
 (max_keys + 1 + 4) +
 sizeof(SplitStruct) * (max_keys + 1))))
 DBUG_RETURN(-1); /* purecov: inspected */

 task= (SplitStruct *)(coord_buf + n_dim * 2 * (max_keys + 1 + 4));

 next_coord= coord_buf;

 stop= task + max_keys;
 source_cur= rt_PAGE_FIRST_KEY(share, page->buff, nod_flag);

 /* One task entry per key already on the page, in page order */
 for (cur= task;
 cur < stop;
 cur++, source_cur= rt_PAGE_NEXT_KEY(share, source_cur, key_data_length,
 nod_flag))
 {
 cur->coords= reserve_coords(&next_coord, n_dim);
 cur->key= source_cur;
 maria_rtree_d_mbr(keyinfo->seg, source_cur, key_data_length, cur->coords);
 }

 /* The last task entry is the key being inserted (not on the page) */
 cur->coords= reserve_coords(&next_coord, n_dim);
 maria_rtree_d_mbr(keyinfo->seg, key->data, key_data_length, cur->coords);
 cur->key= key->data;


 /*
 Decide which group every entry goes to.
 NOTE(review): the '+ 2' in the total size and the initial group sizes of
 2 are not explained by anything visible here -- confirm their meaning.
 */
 if (split_maria_rtree_node(task, max_keys + 1,
 page->size + full_length + 2,
 full_length,
 rt_PAGE_MIN_SIZE(keyinfo->block_length),
 2, 2, &next_coord, n_dim))
 {
 err_code= 1;
 goto split_err;
 }

 /* Allocate buffer for new page and piece of log record */
 /*
 When transactional, the log piece follows the page image: 2 + 2 bytes per
 internal copy (at most max_keys of them) plus 1 + 2 + 1 + 2 bytes for the
 header describing the copy of the new key.
 */
 if (!(new_page_buff= (uchar*) my_alloca((uint)keyinfo->block_length +
 (transactional ?
 (max_keys * (2 + 2) +
 1 + 2 + 1 + 2) : 0))))
 {
 err_code= -1;
 goto split_err;
 }
 log_internal_copy= log_internal_copy_ptr= new_page_buff +
 keyinfo->block_length;
 /*
 NOTE(review): the buffer is sized with keyinfo->block_length but cleared
 with share->block_size; presumably the two are equal -- confirm.
 */
 bzero(new_page_buff, share->block_size);

 stop= task + (max_keys + 1);
 cur1= rt_PAGE_FIRST_KEY(share, page->buff, nod_flag);
 cur2= rt_PAGE_FIRST_KEY(share, new_page_buff, nod_flag);

 /*
 Move every entry to its destination: group 1 is packed at the start of
 the split page (cur1), everything else is appended to the new page
 (cur2). Only changes to the split page are logged here.
 */
 n1= n2= 0;
 for (cur= task; cur < stop; cur++)
 {
 uchar *to;
 const uchar *cur_key= cur->key;
 my_bool log_this_change;
 /* The new key is the last task entry, so this is still unset here */
 DBUG_ASSERT(log_key_copy == NULL);
 if (cur->n_node == 1)
 {
 to= cur1;
 cur1= rt_PAGE_NEXT_KEY(share, cur1, key_data_length, nod_flag);
 n1++;
 log_this_change= transactional;
 }
 else
 {
 to= cur2;
 cur2= rt_PAGE_NEXT_KEY(share, cur2, key_data_length, nod_flag);
 n2++;
 log_this_change= FALSE;
 }
 /* An entry that is already in place needs neither a copy nor logging */
 if (to != cur_key)
 {
 /* The copy starts nod_flag bytes before the key and is full_length long */
 uchar *to_with_nod_flag= to - nod_flag;
 const uchar *cur_key_with_nod_flag= cur_key - nod_flag;
 memcpy(to_with_nod_flag, cur_key_with_nod_flag, full_length);
 if (log_this_change)
 {
 size_t to_with_nod_flag_offs= to_with_nod_flag - page->buff;
 if (likely(cur_key != key->data))
 {
 /* this memcpy() is internal to the page (source in the page) */
 size_t cur_key_with_nod_flag_offs= cur_key_with_nod_flag - page->buff;
 /* Logged as a pair of 2-byte page offsets: destination, then source */
 int2store(log_internal_copy_ptr, to_with_nod_flag_offs);
 log_internal_copy_ptr+= 2;
 int2store(log_internal_copy_ptr, cur_key_with_nod_flag_offs);
 log_internal_copy_ptr+= 2;
 }
 else
 {
 /* last iteration, and this involves *key: source is external */
 log_key_copy= log_internal_copy_ptr;
 log_key_copy[0]= KEY_OP_OFFSET;
 int2store(log_key_copy + 1, to_with_nod_flag_offs);
 log_key_copy[3]= KEY_OP_CHANGE;
 int2store(log_key_copy + 4, full_length);
 /* _ma_log_rt_split() will store *key, right after */
 }
 }
 }
 }
 { /* verify that above loop didn't touch header bytes */
 uint i;
 for (i= 0; i < share->keypage_header; i++)
 DBUG_ASSERT(new_page_buff[i]==0);
 }

 /* Fill in the header of the new page and the new size of the split page */
 if (nod_flag)
 _ma_store_keypage_flag(share, new_page_buff, KEYPAGE_FLAG_ISNOD);
 _ma_store_keynr(share, new_page_buff, keyinfo->key_nr);
 new_page_length= share->keypage_header + n2 * full_length;
 _ma_store_page_used(share, new_page_buff, new_page_length);
 page->size= share->keypage_header + n1 * full_length;
 page_store_size(share, page);

 if ((*new_page_offs= _ma_new(info, DFLT_INIT_HITS, &page_link)) ==
 HA_OFFSET_ERROR)
 err_code= -1;
 else
 {
 MARIA_PAGE new_page;
 _ma_page_setup(&new_page, info, keyinfo, *new_page_offs, new_page_buff);

 /* A logging failure is remembered but the new page is still written */
 if (transactional &&
 ( /* log change to split page */
 _ma_log_rt_split(page, key->data - nod_flag,
 full_length, log_internal_copy,
 (uint)(log_internal_copy_ptr - log_internal_copy),
 log_key_copy, (uint)(org_length - page->size)) ||
 /* and to new page */
 _ma_log_new(&new_page, 0)))
 err_code= -1;

 if (_ma_write_keypage(&new_page, page_link->write_lock,
 DFLT_INIT_HITS))
 err_code= -1;
 }
 DBUG_PRINT("rtree", ("split new block: %lu", (ulong) *new_page_offs));

 my_afree(new_page_buff);
 /* Jumps to split_err skip the line above: new_page_buff is not set yet */
split_err:
 my_afree(coord_buf);
 DBUG_RETURN(err_code);
}
546
547#endif /*HAVE_RTREE_KEYS*/
548