1 | /***************************************************************************** |
2 | |
3 | Copyright (c) 2010, 2016, Oracle and/or its affiliates. All Rights Reserved. |
4 | Copyright (c) 2015, 2018, MariaDB Corporation. |
5 | |
6 | This program is free software; you can redistribute it and/or modify it under |
7 | the terms of the GNU General Public License as published by the Free Software |
8 | Foundation; version 2 of the License. |
9 | |
10 | This program is distributed in the hope that it will be useful, but WITHOUT |
11 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
12 | FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. |
13 | |
14 | You should have received a copy of the GNU General Public License along with |
15 | this program; if not, write to the Free Software Foundation, Inc., |
16 | 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA |
17 | |
18 | *****************************************************************************/ |
19 | |
20 | /**************************************************//** |
21 | @file row/row0ftsort.cc |
22 | Create Full Text Index with (parallel) merge sort |
23 | |
24 | Created 10/13/2010 Jimmy Yang |
25 | *******************************************************/ |
26 | |
27 | #include "ha_prototypes.h" |
28 | |
29 | #include "dict0dict.h" |
30 | #include "row0merge.h" |
31 | #include "pars0pars.h" |
32 | #include "row0ftsort.h" |
33 | #include "row0merge.h" |
34 | #include "row0row.h" |
35 | #include "btr0cur.h" |
36 | #include "btr0bulk.h" |
37 | #include "fts0plugin.h" |
38 | #include "log0crypt.h" |
39 | |
40 | /** Read the next record to buffer N. |
41 | @param N index into array of merge info structure */ |
42 | #define ROW_MERGE_READ_GET_NEXT(N) \ |
43 | do { \ |
44 | b[N] = row_merge_read_rec( \ |
45 | block[N], buf[N], b[N], index, \ |
46 | fd[N], &foffs[N], &mrec[N], offsets[N], \ |
47 | crypt_block[N], space); \ |
48 | if (UNIV_UNLIKELY(!b[N])) { \ |
49 | if (mrec[N]) { \ |
50 | goto exit; \ |
51 | } \ |
52 | } \ |
53 | } while (0) |
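
/* Illustrative usage sketch (comment only, not compiled): in
row_fts_merge_insert() below, one invocation per sort stream advances
that stream's cursor, e.g.

	for (ulint i = 0; i < fts_sort_pll_degree; i++) {
		ROW_MERGE_READ_GET_NEXT(i);
	}

relying on the local arrays b[], buf[], block[], fd[], foffs[],
mrec[], offsets[], crypt_block[] and the variables index and space
being in scope, as they are at the call sites. */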
54 | |
55 | /** Parallel sort degree */ |
56 | ulong fts_sort_pll_degree = 2; |
57 | |
58 | /*********************************************************************//** |
Create a temporary "fts sort index" used to merge sort the
tokenized doc string. The index has three "fields":

1) Tokenized word,
2) Doc ID (depending on the number of records to sort, this can be
a 4-byte or an 8-byte integer value),
3) Word's position in the original doc.
66 | |
67 | @see fts_create_one_index_table() |
68 | |
69 | @return dict_index_t structure for the fts sort index */ |
70 | dict_index_t* |
71 | row_merge_create_fts_sort_index( |
72 | /*============================*/ |
73 | dict_index_t* index, /*!< in: Original FTS index |
74 | based on which this sort index |
75 | is created */ |
76 | dict_table_t* table, /*!< in,out: table that FTS index |
77 | is being created on */ |
78 | ibool* opt_doc_id_size) |
					/*!< out: whether to use a 4-byte
					instead of an 8-byte integer to
					store the Doc ID during the sort */
82 | { |
83 | dict_index_t* new_index; |
84 | dict_field_t* field; |
85 | dict_field_t* idx_field; |
86 | CHARSET_INFO* charset; |
87 | |
88 | // FIXME: This name shouldn't be hard coded here. |
	new_index = dict_mem_index_create(table, "tmp_fts_idx", DICT_FTS, 3);
90 | |
91 | new_index->id = index->id; |
92 | new_index->n_uniq = FTS_NUM_FIELDS_SORT; |
93 | new_index->n_def = FTS_NUM_FIELDS_SORT; |
94 | new_index->cached = TRUE; |
95 | new_index->parser = index->parser; |
96 | |
97 | idx_field = dict_index_get_nth_field(index, 0); |
98 | charset = fts_index_get_charset(index); |
99 | |
	/* The first field is the tokenized word */
101 | field = dict_index_get_nth_field(new_index, 0); |
102 | field->name = NULL; |
103 | field->prefix_len = 0; |
104 | field->col = static_cast<dict_col_t*>( |
105 | mem_heap_alloc(new_index->heap, sizeof(dict_col_t))); |
106 | field->col->prtype = idx_field->col->prtype | DATA_NOT_NULL; |
107 | field->col->mtype = charset == &my_charset_latin1 |
108 | ? DATA_VARCHAR : DATA_VARMYSQL; |
109 | field->col->mbminlen = idx_field->col->mbminlen; |
110 | field->col->mbmaxlen = idx_field->col->mbmaxlen; |
111 | field->col->len = HA_FT_MAXCHARLEN * unsigned(field->col->mbmaxlen); |
112 | |
113 | field->fixed_len = 0; |
114 | |
115 | /* Doc ID */ |
116 | field = dict_index_get_nth_field(new_index, 1); |
117 | field->name = NULL; |
118 | field->prefix_len = 0; |
119 | field->col = static_cast<dict_col_t*>( |
120 | mem_heap_alloc(new_index->heap, sizeof(dict_col_t))); |
121 | field->col->mtype = DATA_INT; |
122 | *opt_doc_id_size = FALSE; |
123 | |
	/* Check whether we can use a 4-byte instead of an 8-byte integer
	field to hold the Doc ID, thus reducing the overall sort size */
126 | if (DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_ADD_DOC_ID)) { |
		/* If the Doc ID column is being added by this CREATE
		INDEX, then just check the number of rows in the table */
129 | if (dict_table_get_n_rows(table) < MAX_DOC_ID_OPT_VAL) { |
130 | *opt_doc_id_size = TRUE; |
131 | } |
132 | } else { |
133 | doc_id_t max_doc_id; |
134 | |
		/* If the Doc ID column is supplied by the user, then
		check the maximum Doc ID in the table */
137 | max_doc_id = fts_get_max_doc_id((dict_table_t*) table); |
138 | |
139 | if (max_doc_id && max_doc_id < MAX_DOC_ID_OPT_VAL) { |
140 | *opt_doc_id_size = TRUE; |
141 | } |
142 | } |
143 | |
144 | if (*opt_doc_id_size) { |
145 | field->col->len = sizeof(ib_uint32_t); |
146 | field->fixed_len = sizeof(ib_uint32_t); |
147 | } else { |
148 | field->col->len = FTS_DOC_ID_LEN; |
149 | field->fixed_len = FTS_DOC_ID_LEN; |
150 | } |
151 | |
152 | field->col->prtype = DATA_NOT_NULL | DATA_BINARY_TYPE; |
153 | |
154 | field->col->mbminlen = 0; |
155 | field->col->mbmaxlen = 0; |
156 | |
	/* The third field is the word's position in the original doc */
158 | field = dict_index_get_nth_field(new_index, 2); |
159 | field->name = NULL; |
160 | field->prefix_len = 0; |
161 | field->col = static_cast<dict_col_t*>( |
162 | mem_heap_alloc(new_index->heap, sizeof(dict_col_t))); |
163 | field->col->mtype = DATA_INT; |
	field->col->len = 4;
165 | field->fixed_len = 4; |
166 | field->col->prtype = DATA_NOT_NULL; |
167 | field->col->mbminlen = 0; |
168 | field->col->mbmaxlen = 0; |
169 | |
170 | return(new_index); |
171 | } |
172 | /*********************************************************************//** |
173 | Initialize FTS parallel sort structures. |
@return TRUE if all allocations and file creations succeeded */
175 | ibool |
176 | row_fts_psort_info_init( |
177 | /*====================*/ |
178 | trx_t* trx, /*!< in: transaction */ |
179 | row_merge_dup_t* dup, /*!< in,own: descriptor of |
180 | FTS index being created */ |
181 | const dict_table_t* new_table,/*!< in: table on which indexes are |
182 | created */ |
	ibool			opt_doc_id_size,
					/*!< in: whether to use a 4-byte
					instead of an 8-byte integer to
					store the Doc ID during the sort */
187 | fts_psort_t** psort, /*!< out: parallel sort info to be |
188 | instantiated */ |
189 | fts_psort_t** merge) /*!< out: parallel merge info |
190 | to be instantiated */ |
191 | { |
192 | ulint i; |
193 | ulint j; |
194 | fts_psort_common_t* common_info = NULL; |
195 | fts_psort_t* psort_info = NULL; |
196 | fts_psort_t* merge_info = NULL; |
197 | ulint block_size; |
198 | ibool ret = TRUE; |
199 | bool encrypted = false; |
200 | |
201 | block_size = 3 * srv_sort_buf_size; |
202 | |
203 | *psort = psort_info = static_cast<fts_psort_t*>(ut_zalloc_nokey( |
204 | fts_sort_pll_degree * sizeof *psort_info)); |
205 | |
206 | if (!psort_info) { |
207 | ut_free(dup); |
208 | return(FALSE); |
209 | } |
210 | |
211 | /* Common Info for all sort threads */ |
212 | common_info = static_cast<fts_psort_common_t*>( |
213 | ut_malloc_nokey(sizeof *common_info)); |
214 | |
215 | if (!common_info) { |
216 | ut_free(dup); |
217 | ut_free(psort_info); |
218 | return(FALSE); |
219 | } |
220 | |
221 | common_info->dup = dup; |
222 | common_info->new_table = (dict_table_t*) new_table; |
223 | common_info->trx = trx; |
224 | common_info->all_info = psort_info; |
225 | common_info->sort_event = os_event_create(0); |
226 | common_info->merge_event = os_event_create(0); |
227 | common_info->opt_doc_id_size = opt_doc_id_size; |
228 | |
229 | if (log_tmp_is_encrypted()) { |
230 | encrypted = true; |
231 | } |
232 | |
233 | ut_ad(trx->mysql_thd != NULL); |
234 | const char* path = thd_innodb_tmpdir(trx->mysql_thd); |
235 | /* There will be FTS_NUM_AUX_INDEX number of "sort buckets" for |
236 | each parallel sort thread. Each "sort bucket" holds records for |
237 | a particular "FTS index partition" */ |
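	/* Illustrative arithmetic: each thread allocates one aligned
	I/O block per partition, so the loop below reserves roughly

		fts_sort_pll_degree * FTS_NUM_AUX_INDEX
			* (block_size + 1024)

	bytes of raw buffer memory, where block_size is
	3 * srv_sort_buf_size, plus the same amount again for the
	crypt blocks when the temporary files are encrypted. */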
238 | for (j = 0; j < fts_sort_pll_degree; j++) { |
239 | |
240 | UT_LIST_INIT( |
241 | psort_info[j].fts_doc_list, &fts_doc_item_t::doc_list); |
242 | |
243 | for (i = 0; i < FTS_NUM_AUX_INDEX; i++) { |
244 | |
245 | psort_info[j].merge_file[i] = |
246 | static_cast<merge_file_t*>( |
247 | ut_zalloc_nokey(sizeof(merge_file_t))); |
248 | |
249 | if (!psort_info[j].merge_file[i]) { |
250 | ret = FALSE; |
251 | goto func_exit; |
252 | } |
253 | |
254 | psort_info[j].merge_buf[i] = row_merge_buf_create( |
255 | dup->index); |
256 | |
			if (row_merge_file_create(psort_info[j].merge_file[i],
						  path) == OS_FILE_CLOSED) {
				ret = FALSE;
				goto func_exit;
			}
261 | |
262 | /* Need to align memory for O_DIRECT write */ |
263 | psort_info[j].block_alloc[i] = |
264 | static_cast<row_merge_block_t*>(ut_malloc_nokey( |
265 | block_size + 1024)); |
266 | |
267 | psort_info[j].merge_block[i] = |
268 | static_cast<row_merge_block_t*>( |
269 | ut_align( |
270 | psort_info[j].block_alloc[i], 1024)); |
271 | |
272 | if (!psort_info[j].merge_block[i]) { |
273 | ret = FALSE; |
274 | goto func_exit; |
275 | } |
276 | |
			/* If the temporary files are encrypted, allocate
			an additional buffer for encryption/decryption. */
279 | if (encrypted) { |
280 | |
281 | /* Need to align memory for O_DIRECT write */ |
282 | psort_info[j].crypt_alloc[i] = |
283 | static_cast<row_merge_block_t*>(ut_malloc_nokey( |
284 | block_size + 1024)); |
285 | |
286 | psort_info[j].crypt_block[i] = |
287 | static_cast<row_merge_block_t*>( |
288 | ut_align( |
289 | psort_info[j].crypt_alloc[i], 1024)); |
290 | |
291 | if (!psort_info[j].crypt_block[i]) { |
292 | ret = FALSE; |
293 | goto func_exit; |
294 | } |
295 | } else { |
296 | psort_info[j].crypt_alloc[i] = NULL; |
297 | psort_info[j].crypt_block[i] = NULL; |
298 | } |
299 | } |
300 | |
301 | psort_info[j].child_status = 0; |
302 | psort_info[j].state = 0; |
303 | psort_info[j].psort_common = common_info; |
304 | psort_info[j].error = DB_SUCCESS; |
305 | psort_info[j].memory_used = 0; |
306 | mutex_create(LATCH_ID_FTS_PLL_TOKENIZE, &psort_info[j].mutex); |
307 | } |
308 | |
	/* Initialize merge_info structures for the parallel merge and
	insert into auxiliary FTS tables (FTS_INDEX_TABLE) */
311 | *merge = merge_info = static_cast<fts_psort_t*>( |
312 | ut_malloc_nokey(FTS_NUM_AUX_INDEX * sizeof *merge_info)); |
313 | |
314 | for (j = 0; j < FTS_NUM_AUX_INDEX; j++) { |
315 | |
316 | merge_info[j].child_status = 0; |
317 | merge_info[j].state = 0; |
318 | merge_info[j].psort_common = common_info; |
319 | } |
320 | |
321 | func_exit: |
322 | if (!ret) { |
323 | row_fts_psort_info_destroy(psort_info, merge_info); |
324 | } |
325 | |
326 | return(ret); |
327 | } |
328 | /*********************************************************************//** |
329 | Clean up and deallocate FTS parallel sort structures, and close the |
330 | merge sort files */ |
331 | void |
332 | row_fts_psort_info_destroy( |
333 | /*=======================*/ |
334 | fts_psort_t* psort_info, /*!< parallel sort info */ |
335 | fts_psort_t* merge_info) /*!< parallel merge info */ |
336 | { |
337 | ulint i; |
338 | ulint j; |
339 | |
340 | if (psort_info) { |
341 | for (j = 0; j < fts_sort_pll_degree; j++) { |
342 | for (i = 0; i < FTS_NUM_AUX_INDEX; i++) { |
343 | if (psort_info[j].merge_file[i]) { |
344 | row_merge_file_destroy( |
345 | psort_info[j].merge_file[i]); |
346 | } |
347 | |
348 | ut_free(psort_info[j].block_alloc[i]); |
349 | ut_free(psort_info[j].merge_file[i]); |
350 | |
351 | if (psort_info[j].crypt_alloc[i]) { |
352 | ut_free(psort_info[j].crypt_alloc[i]); |
353 | } |
354 | } |
355 | |
356 | mutex_free(&psort_info[j].mutex); |
357 | } |
358 | |
359 | os_event_destroy(merge_info[0].psort_common->sort_event); |
360 | os_event_destroy(merge_info[0].psort_common->merge_event); |
361 | ut_free(merge_info[0].psort_common->dup); |
362 | ut_free(merge_info[0].psort_common); |
363 | ut_free(psort_info); |
364 | } |
365 | |
366 | ut_free(merge_info); |
367 | } |
368 | /*********************************************************************//** |
369 | Free up merge buffers when merge sort is done */ |
370 | void |
371 | row_fts_free_pll_merge_buf( |
372 | /*=======================*/ |
373 | fts_psort_t* psort_info) /*!< in: parallel sort info */ |
374 | { |
375 | ulint j; |
376 | ulint i; |
377 | |
378 | if (!psort_info) { |
379 | return; |
380 | } |
381 | |
382 | for (j = 0; j < fts_sort_pll_degree; j++) { |
383 | for (i = 0; i < FTS_NUM_AUX_INDEX; i++) { |
384 | row_merge_buf_free(psort_info[j].merge_buf[i]); |
385 | } |
386 | } |
387 | |
388 | return; |
389 | } |
390 | |
391 | /*********************************************************************//** |
FTS plugin parser 'mysql_add_word' callback function for row merge.
393 | Refer to 'st_mysql_ftparser_param' for more detail. |
394 | @return always returns 0 */ |
395 | static |
396 | int |
397 | row_merge_fts_doc_add_word_for_parser( |
398 | /*==================================*/ |
	MYSQL_FTPARSER_PARAM	*param,		/* in: parser parameter */
	const char		*word,		/* in: token word */
	int			word_len,	/* in: word length */
	MYSQL_FTPARSER_BOOLEAN_INFO*	boolean_info)	/* in: boolean info */
403 | { |
404 | fts_string_t str; |
405 | fts_tokenize_ctx_t* t_ctx; |
406 | row_fts_token_t* fts_token; |
407 | byte* ptr; |
408 | |
409 | ut_ad(param); |
410 | ut_ad(param->mysql_ftparam); |
411 | ut_ad(word); |
412 | ut_ad(boolean_info); |
413 | |
414 | t_ctx = static_cast<fts_tokenize_ctx_t*>(param->mysql_ftparam); |
415 | ut_ad(t_ctx); |
416 | |
417 | str.f_str = (byte*)(word); |
418 | str.f_len = ulint(word_len); |
419 | str.f_n_char = fts_get_token_size( |
420 | (CHARSET_INFO*)param->cs, word, ulint(word_len)); |
421 | |
422 | /* JAN: TODO: MySQL 5.7 FTS |
423 | ut_ad(boolean_info->position >= 0); |
424 | */ |
425 | |
426 | ptr = static_cast<byte*>(ut_malloc_nokey(sizeof(row_fts_token_t) |
427 | + sizeof(fts_string_t) + str.f_len)); |
428 | fts_token = reinterpret_cast<row_fts_token_t*>(ptr); |
429 | fts_token->text = reinterpret_cast<fts_string_t*>( |
430 | ptr + sizeof(row_fts_token_t)); |
431 | fts_token->text->f_str = static_cast<byte*>( |
432 | ptr + sizeof(row_fts_token_t) + sizeof(fts_string_t)); |
433 | |
434 | fts_token->text->f_len = str.f_len; |
435 | fts_token->text->f_n_char = str.f_n_char; |
436 | memcpy(fts_token->text->f_str, str.f_str, str.f_len); |
437 | |
438 | /* JAN: TODO: MySQL 5.7 FTS |
439 | fts_token->position = boolean_info->position; |
440 | */ |
441 | |
442 | /* Add token to list */ |
443 | UT_LIST_ADD_LAST(t_ctx->fts_token_list, fts_token); |
444 | |
445 | return(0); |
446 | } |
447 | |
448 | /*********************************************************************//** |
Tokenize a document using the FTS plugin parser */
450 | static |
451 | void |
452 | row_merge_fts_doc_tokenize_by_parser( |
453 | /*=================================*/ |
454 | fts_doc_t* doc, /* in: doc to tokenize */ |
455 | st_mysql_ftparser* parser, /* in: plugin parser instance */ |
456 | fts_tokenize_ctx_t* t_ctx) /* in/out: tokenize ctx instance */ |
457 | { |
458 | MYSQL_FTPARSER_PARAM param; |
459 | |
460 | ut_a(parser); |
461 | |
	/* Set up the parser parameters */
463 | param.mysql_parse = fts_tokenize_document_internal; |
464 | param.mysql_add_word = row_merge_fts_doc_add_word_for_parser; |
465 | param.mysql_ftparam = t_ctx; |
466 | param.cs = doc->charset; |
467 | param.doc = reinterpret_cast<char*>(doc->text.f_str); |
468 | param.length = static_cast<int>(doc->text.f_len); |
469 | param.mode= MYSQL_FTPARSER_SIMPLE_MODE; |
470 | |
471 | PARSER_INIT(parser, ¶m); |
472 | /* We assume parse returns successfully here. */ |
473 | parser->parse(¶m); |
474 | PARSER_DEINIT(parser, ¶m); |
475 | } |
476 | |
477 | /*********************************************************************//** |
478 | Tokenize incoming text data and add to the sort buffer. |
479 | @see row_merge_buf_encode() |
480 | @return TRUE if the record passed, FALSE if out of space */ |
481 | static |
482 | ibool |
483 | row_merge_fts_doc_tokenize( |
484 | /*=======================*/ |
485 | row_merge_buf_t** sort_buf, /*!< in/out: sort buffer */ |
486 | doc_id_t doc_id, /*!< in: Doc ID */ |
487 | fts_doc_t* doc, /*!< in: Doc to be tokenized */ |
488 | merge_file_t** merge_file, /*!< in/out: merge file */ |
	ibool		opt_doc_id_size,/*!< in: whether to use a 4-byte
					instead of an 8-byte integer to
					store the Doc ID during the sort */
492 | fts_tokenize_ctx_t* t_ctx) /*!< in/out: tokenize context */ |
493 | { |
494 | ulint inc = 0; |
495 | fts_string_t str; |
496 | ulint len; |
497 | row_merge_buf_t* buf; |
498 | dfield_t* field; |
499 | fts_string_t t_str; |
500 | ibool buf_full = FALSE; |
501 | byte str_buf[FTS_MAX_WORD_LEN + 1]; |
502 | ulint data_size[FTS_NUM_AUX_INDEX]; |
503 | ulint n_tuple[FTS_NUM_AUX_INDEX]; |
504 | st_mysql_ftparser* parser; |
505 | |
506 | t_str.f_n_char = 0; |
507 | t_ctx->buf_used = 0; |
508 | |
509 | memset(n_tuple, 0, FTS_NUM_AUX_INDEX * sizeof(ulint)); |
510 | memset(data_size, 0, FTS_NUM_AUX_INDEX * sizeof(ulint)); |
511 | |
512 | parser = sort_buf[0]->index->parser; |
513 | |
	/* Tokenize the data and add each word string, with its
	corresponding doc id and position, to the sort buffer */
516 | while (t_ctx->processed_len < doc->text.f_len) { |
517 | ulint idx = 0; |
518 | ulint cur_len; |
519 | doc_id_t write_doc_id; |
520 | row_fts_token_t* fts_token = NULL; |
521 | |
522 | if (parser != NULL) { |
523 | if (t_ctx->processed_len == 0) { |
524 | UT_LIST_INIT(t_ctx->fts_token_list, &row_fts_token_t::token_list); |
525 | |
526 | /* Parse the whole doc and cache tokens */ |
527 | row_merge_fts_doc_tokenize_by_parser(doc, |
528 | parser, t_ctx); |
529 | |
			/* Just indicate that we have parsed all the words */
531 | t_ctx->processed_len += 1; |
532 | } |
533 | |
534 | /* Then get a token */ |
535 | fts_token = UT_LIST_GET_FIRST(t_ctx->fts_token_list); |
536 | if (fts_token) { |
537 | str.f_len = fts_token->text->f_len; |
538 | str.f_n_char = fts_token->text->f_n_char; |
539 | str.f_str = fts_token->text->f_str; |
540 | } else { |
541 | ut_ad(UT_LIST_GET_LEN(t_ctx->fts_token_list) == 0); |
			/* Reached the end of the list */
543 | t_ctx->processed_len = doc->text.f_len; |
544 | break; |
545 | } |
546 | } else { |
547 | inc = innobase_mysql_fts_get_token( |
548 | doc->charset, |
549 | doc->text.f_str + t_ctx->processed_len, |
550 | doc->text.f_str + doc->text.f_len, &str); |
551 | |
552 | ut_a(inc > 0); |
553 | } |
554 | |
		/* Ignore strings whose character count is less than
		"fts_min_token_size" or more than "fts_max_token_size" */
557 | if (!fts_check_token(&str, NULL, NULL)) { |
558 | if (parser != NULL) { |
559 | UT_LIST_REMOVE(t_ctx->fts_token_list, fts_token); |
560 | ut_free(fts_token); |
561 | } else { |
562 | t_ctx->processed_len += inc; |
563 | } |
564 | |
565 | continue; |
566 | } |
567 | |
568 | t_str.f_len = innobase_fts_casedn_str( |
569 | doc->charset, (char*) str.f_str, str.f_len, |
570 | (char*) &str_buf, FTS_MAX_WORD_LEN + 1); |
571 | |
572 | t_str.f_str = (byte*) &str_buf; |
573 | |
574 | /* if "cached_stopword" is defined, ignore words in the |
575 | stopword list */ |
576 | if (!fts_check_token(&str, t_ctx->cached_stopword, |
577 | doc->charset)) { |
578 | if (parser != NULL) { |
579 | UT_LIST_REMOVE(t_ctx->fts_token_list, fts_token); |
580 | ut_free(fts_token); |
581 | } else { |
582 | t_ctx->processed_len += inc; |
583 | } |
584 | |
585 | continue; |
586 | } |
587 | |
588 | /* There are FTS_NUM_AUX_INDEX auxiliary tables, find |
589 | out which sort buffer to put this word record in */ |
590 | t_ctx->buf_used = fts_select_index( |
591 | doc->charset, t_str.f_str, t_str.f_len); |
592 | |
593 | buf = sort_buf[t_ctx->buf_used]; |
594 | |
595 | ut_a(t_ctx->buf_used < FTS_NUM_AUX_INDEX); |
596 | idx = t_ctx->buf_used; |
597 | |
598 | mtuple_t* mtuple = &buf->tuples[buf->n_tuples + n_tuple[idx]]; |
599 | |
600 | field = mtuple->fields = static_cast<dfield_t*>( |
601 | mem_heap_alloc(buf->heap, |
602 | FTS_NUM_FIELDS_SORT * sizeof *field)); |
603 | |
604 | /* The first field is the tokenized word */ |
605 | dfield_set_data(field, t_str.f_str, t_str.f_len); |
606 | len = dfield_get_len(field); |
607 | |
608 | dict_col_copy_type(dict_index_get_nth_col(buf->index, 0), &field->type); |
609 | field->type.prtype |= DATA_NOT_NULL; |
610 | ut_ad(len <= field->type.len); |
611 | |
		/* For the temporary file, row_merge_buf_encode() uses
		1 byte for representing the number of extra_size bytes.
		This number will always be 1, because for this 3-field index
		consisting of one variable-size column, extra_size will always
		be 1 or 2, which can be encoded in one byte.

		The extra_size is 1 byte if the length of the
		variable-length column is less than 128 bytes or the
		maximum length is less than 256 bytes.

		Since the maximum length of an FTS token can now exceed
		255 bytes, the length itself may need two bytes: lengths
		below 128 bytes fit in one byte, anything longer needs
		two bytes. */
628 | if (len < 128 || field->type.len < 256) { |
629 | /* Extra size is one byte. */ |
630 | cur_len = 2 + len; |
631 | } else { |
632 | /* Extra size is two bytes. */ |
633 | cur_len = 3 + len; |
634 | } |
635 | |
636 | dfield_dup(field, buf->heap); |
637 | field++; |
638 | |
639 | /* The second field is the Doc ID */ |
640 | |
641 | ib_uint32_t doc_id_32_bit; |
642 | |
643 | if (!opt_doc_id_size) { |
644 | fts_write_doc_id((byte*) &write_doc_id, doc_id); |
645 | |
646 | dfield_set_data( |
647 | field, &write_doc_id, sizeof(write_doc_id)); |
648 | } else { |
649 | mach_write_to_4( |
650 | (byte*) &doc_id_32_bit, (ib_uint32_t) doc_id); |
651 | |
652 | dfield_set_data( |
653 | field, &doc_id_32_bit, sizeof(doc_id_32_bit)); |
654 | } |
655 | |
656 | len = field->len; |
657 | ut_ad(len == FTS_DOC_ID_LEN || len == sizeof(ib_uint32_t)); |
658 | |
659 | field->type.mtype = DATA_INT; |
660 | field->type.prtype = DATA_NOT_NULL | DATA_BINARY_TYPE; |
661 | field->type.len = len; |
662 | field->type.mbminlen = 0; |
663 | field->type.mbmaxlen = 0; |
664 | |
665 | cur_len += len; |
666 | dfield_dup(field, buf->heap); |
667 | |
668 | ++field; |
669 | |
670 | /* The third field is the position. |
671 | MySQL 5.7 changed the fulltext parser plugin interface |
672 | by adding MYSQL_FTPARSER_BOOLEAN_INFO::position. |
673 | Below we assume that the field is always 0. */ |
674 | ulint pos = t_ctx->init_pos; |
675 | byte position[4]; |
676 | if (parser == NULL) { |
677 | pos += t_ctx->processed_len + inc - str.f_len; |
678 | } |
679 | len = 4; |
680 | mach_write_to_4(position, pos); |
681 | dfield_set_data(field, &position, len); |
682 | |
683 | field->type.mtype = DATA_INT; |
684 | field->type.prtype = DATA_NOT_NULL; |
685 | field->type.len = len; |
686 | field->type.mbminlen = 0; |
687 | field->type.mbmaxlen = 0; |
688 | cur_len += len; |
689 | dfield_dup(field, buf->heap); |
690 | |
691 | /* Reserve one byte for the end marker of row_merge_block_t */ |
692 | if (buf->total_size + data_size[idx] + cur_len |
693 | >= srv_sort_buf_size - 1) { |
694 | |
695 | buf_full = TRUE; |
696 | break; |
697 | } |
698 | |
699 | /* Increment the number of tuples */ |
700 | n_tuple[idx]++; |
701 | if (parser != NULL) { |
702 | UT_LIST_REMOVE(t_ctx->fts_token_list, fts_token); |
703 | ut_free(fts_token); |
704 | } else { |
705 | t_ctx->processed_len += inc; |
706 | } |
707 | data_size[idx] += cur_len; |
708 | } |
709 | |
710 | /* Update the data length and the number of new word tuples |
711 | added in this round of tokenization */ |
712 | for (ulint i = 0; i < FTS_NUM_AUX_INDEX; i++) { |
713 | /* The computation of total_size below assumes that no |
714 | delete-mark flags will be stored and that all fields |
715 | are NOT NULL and fixed-length. */ |
716 | |
717 | sort_buf[i]->total_size += data_size[i]; |
718 | |
719 | sort_buf[i]->n_tuples += n_tuple[i]; |
720 | |
721 | merge_file[i]->n_rec += n_tuple[i]; |
722 | t_ctx->rows_added[i] += n_tuple[i]; |
723 | } |
724 | |
725 | if (!buf_full) { |
		/* We pad one byte between the text of two fields */
727 | t_ctx->init_pos += doc->text.f_len + 1; |
728 | } |
729 | |
730 | return(!buf_full); |
731 | } |
732 | |
733 | /*********************************************************************//** |
734 | Get next doc item from fts_doc_list */ |
735 | UNIV_INLINE |
736 | void |
737 | row_merge_fts_get_next_doc_item( |
738 | /*============================*/ |
739 | fts_psort_t* psort_info, /*!< in: psort_info */ |
740 | fts_doc_item_t** doc_item) /*!< in/out: doc item */ |
741 | { |
742 | if (*doc_item != NULL) { |
743 | ut_free(*doc_item); |
744 | } |
745 | |
746 | mutex_enter(&psort_info->mutex); |
747 | |
748 | *doc_item = UT_LIST_GET_FIRST(psort_info->fts_doc_list); |
749 | if (*doc_item != NULL) { |
750 | UT_LIST_REMOVE(psort_info->fts_doc_list, *doc_item); |
751 | |
752 | ut_ad(psort_info->memory_used >= sizeof(fts_doc_item_t) |
753 | + (*doc_item)->field->len); |
754 | psort_info->memory_used -= sizeof(fts_doc_item_t) |
755 | + (*doc_item)->field->len; |
756 | } |
757 | |
758 | mutex_exit(&psort_info->mutex); |
759 | } |
760 | |
761 | /*********************************************************************//** |
762 | Function performs parallel tokenization of the incoming doc strings. |
It also performs the initial in-memory sort of the parsed records.
764 | @return OS_THREAD_DUMMY_RETURN */ |
765 | static |
766 | os_thread_ret_t |
767 | DECLARE_THREAD(fts_parallel_tokenization)( |
768 | /*======================*/ |
769 | void* arg) /*!< in: psort_info for the thread */ |
770 | { |
771 | fts_psort_t* psort_info = (fts_psort_t*) arg; |
772 | ulint i; |
773 | fts_doc_item_t* doc_item = NULL; |
774 | row_merge_buf_t** buf; |
775 | ibool processed = FALSE; |
776 | merge_file_t** merge_file; |
777 | row_merge_block_t** block; |
778 | row_merge_block_t** crypt_block; |
779 | pfs_os_file_t tmpfd[FTS_NUM_AUX_INDEX]; |
780 | ulint mycount[FTS_NUM_AUX_INDEX]; |
781 | ib_uint64_t total_rec = 0; |
782 | ulint num_doc_processed = 0; |
783 | doc_id_t last_doc_id = 0; |
784 | mem_heap_t* blob_heap = NULL; |
785 | fts_doc_t doc; |
786 | dict_table_t* table = psort_info->psort_common->new_table; |
787 | fts_tokenize_ctx_t t_ctx; |
788 | ulint retried = 0; |
789 | dberr_t error = DB_SUCCESS; |
790 | |
791 | ut_ad(psort_info->psort_common->trx->mysql_thd != NULL); |
792 | |
799 | const char* path = thd_innodb_tmpdir( |
800 | psort_info->psort_common->trx->mysql_thd); |
801 | |
802 | ut_ad(psort_info); |
803 | |
804 | buf = psort_info->merge_buf; |
805 | merge_file = psort_info->merge_file; |
806 | blob_heap = mem_heap_create(512); |
807 | memset(&doc, 0, sizeof(doc)); |
808 | memset(&t_ctx, 0, sizeof(t_ctx)); |
	memset(mycount, 0, sizeof(mycount));
810 | |
811 | doc.charset = fts_index_get_charset( |
812 | psort_info->psort_common->dup->index); |
813 | |
814 | block = psort_info->merge_block; |
815 | crypt_block = psort_info->crypt_block; |
816 | |
817 | const page_size_t& page_size = dict_table_page_size(table); |
818 | |
819 | row_merge_fts_get_next_doc_item(psort_info, &doc_item); |
820 | |
821 | t_ctx.cached_stopword = table->fts->cache->stopword_info.cached_stopword; |
822 | processed = TRUE; |
823 | loop: |
824 | while (doc_item) { |
825 | dfield_t* dfield = doc_item->field; |
826 | |
827 | last_doc_id = doc_item->doc_id; |
828 | |
829 | ut_ad (dfield->data != NULL |
830 | && dfield_get_len(dfield) != UNIV_SQL_NULL); |
831 | |
		/* If we have finished processing the last item, update
		"doc" with the strings in doc_item; otherwise continue
		processing the previous item */
835 | if (processed) { |
836 | byte* data; |
837 | ulint data_len; |
838 | |
839 | dfield = doc_item->field; |
840 | data = static_cast<byte*>(dfield_get_data(dfield)); |
841 | data_len = dfield_get_len(dfield); |
842 | |
843 | if (dfield_is_ext(dfield)) { |
844 | doc.text.f_str = |
845 | btr_copy_externally_stored_field( |
846 | &doc.text.f_len, data, |
847 | page_size, data_len, blob_heap); |
848 | } else { |
849 | doc.text.f_str = data; |
850 | doc.text.f_len = data_len; |
851 | } |
852 | |
853 | doc.tokens = 0; |
854 | t_ctx.processed_len = 0; |
855 | } else { |
			/* Not yet finished processing the "doc" on hand,
			continue processing it */
858 | ut_ad(doc.text.f_str); |
859 | ut_ad(t_ctx.processed_len < doc.text.f_len); |
860 | } |
861 | |
862 | processed = row_merge_fts_doc_tokenize( |
863 | buf, doc_item->doc_id, &doc, |
864 | merge_file, psort_info->psort_common->opt_doc_id_size, |
865 | &t_ctx); |
866 | |
867 | /* Current sort buffer full, need to recycle */ |
868 | if (!processed) { |
869 | ut_ad(t_ctx.processed_len < doc.text.f_len); |
870 | ut_ad(t_ctx.rows_added[t_ctx.buf_used]); |
871 | break; |
872 | } |
873 | |
874 | num_doc_processed++; |
875 | |
876 | if (fts_enable_diag_print && num_doc_processed % 10000 == 1) { |
877 | ib::info() << "Number of documents processed: " |
878 | << num_doc_processed; |
879 | #ifdef FTS_INTERNAL_DIAG_PRINT |
880 | for (i = 0; i < FTS_NUM_AUX_INDEX; i++) { |
881 | ib::info() << "ID " << psort_info->psort_id |
882 | << ", partition " << i << ", word " |
883 | << mycount[i]; |
884 | } |
885 | #endif |
886 | } |
887 | |
888 | mem_heap_empty(blob_heap); |
889 | |
890 | row_merge_fts_get_next_doc_item(psort_info, &doc_item); |
891 | |
892 | if (doc_item && last_doc_id != doc_item->doc_id) { |
893 | t_ctx.init_pos = 0; |
894 | } |
895 | } |
896 | |
	/* If we have run out of room in the current sort buffer, we need
	to sort it and flush it to disk */
899 | if (t_ctx.rows_added[t_ctx.buf_used] && !processed) { |
900 | row_merge_buf_sort(buf[t_ctx.buf_used], NULL); |
901 | row_merge_buf_write(buf[t_ctx.buf_used], |
902 | merge_file[t_ctx.buf_used], |
903 | block[t_ctx.buf_used]); |
904 | |
905 | if (!row_merge_write(merge_file[t_ctx.buf_used]->fd, |
906 | merge_file[t_ctx.buf_used]->offset++, |
907 | block[t_ctx.buf_used], |
908 | crypt_block[t_ctx.buf_used], |
909 | table->space->id)) { |
910 | error = DB_TEMP_FILE_WRITE_FAIL; |
911 | goto func_exit; |
912 | } |
913 | |
914 | UNIV_MEM_INVALID(block[t_ctx.buf_used][0], srv_sort_buf_size); |
915 | buf[t_ctx.buf_used] = row_merge_buf_empty(buf[t_ctx.buf_used]); |
916 | mycount[t_ctx.buf_used] += t_ctx.rows_added[t_ctx.buf_used]; |
917 | t_ctx.rows_added[t_ctx.buf_used] = 0; |
918 | |
919 | ut_a(doc_item); |
920 | goto loop; |
921 | } |
922 | |
	/* Parent is done scanning; if we have finished processing
	all the docs, exit */
924 | if (psort_info->state == FTS_PARENT_COMPLETE) { |
925 | if (UT_LIST_GET_LEN(psort_info->fts_doc_list) == 0) { |
926 | goto exit; |
927 | } else if (retried > 10000) { |
928 | ut_ad(!doc_item); |
			/* Retried too many times and cannot get a new record */
			ib::error() << "FTS parallel sort processed "
				<< num_doc_processed
				<< " records, the sort queue has "
				<< UT_LIST_GET_LEN(psort_info->fts_doc_list)
				<< " records. But sort cannot get the next"
				" records";
936 | goto exit; |
937 | } |
938 | } else if (psort_info->state == FTS_PARENT_EXITING) { |
939 | /* Parent abort */ |
940 | goto func_exit; |
941 | } |
942 | |
943 | if (doc_item == NULL) { |
944 | os_thread_yield(); |
945 | } |
946 | |
947 | row_merge_fts_get_next_doc_item(psort_info, &doc_item); |
948 | |
949 | if (doc_item != NULL) { |
950 | if (last_doc_id != doc_item->doc_id) { |
951 | t_ctx.init_pos = 0; |
952 | } |
953 | |
954 | retried = 0; |
955 | } else if (psort_info->state == FTS_PARENT_COMPLETE) { |
956 | retried++; |
957 | } |
958 | |
959 | goto loop; |
960 | |
961 | exit: |
	/* Do a final sort of the last (or latest) batch of records in
	block memory. Flush them to the temp file if the records cannot
	all be held in one block of memory */
965 | for (i = 0; i < FTS_NUM_AUX_INDEX; i++) { |
966 | if (t_ctx.rows_added[i]) { |
967 | row_merge_buf_sort(buf[i], NULL); |
968 | row_merge_buf_write( |
969 | buf[i], merge_file[i], block[i]); |
970 | |
971 | /* Write to temp file, only if records have |
972 | been flushed to temp file before (offset > 0): |
			The pseudo code for the sort is as follows:
974 | |
975 | while (there are rows) { |
976 | tokenize rows, put result in block[] |
977 | if (block[] runs out) { |
978 | sort rows; |
979 | write to temp file with |
980 | row_merge_write(); |
981 | offset++; |
982 | } |
983 | } |
984 | |
985 | # write out the last batch |
986 | if (offset > 0) { |
987 | row_merge_write(); |
988 | offset++; |
989 | } else { |
990 | # no need to write anything |
991 | offset stay as 0 |
992 | } |
993 | |
			so if merge_file[i]->offset is 0 when we get
			here with the last batch, the rows have never
			been flushed to the temp file and can all be
			held in memory */
998 | if (merge_file[i]->offset != 0) { |
999 | if (!row_merge_write(merge_file[i]->fd, |
1000 | merge_file[i]->offset++, |
1001 | block[i], |
1002 | crypt_block[i], |
1003 | table->space->id)) { |
1004 | error = DB_TEMP_FILE_WRITE_FAIL; |
1005 | goto func_exit; |
1006 | } |
1007 | |
1008 | UNIV_MEM_INVALID(block[i][0], |
1009 | srv_sort_buf_size); |
1010 | |
1011 | if (crypt_block[i]) { |
1012 | UNIV_MEM_INVALID(crypt_block[i][0], |
1013 | srv_sort_buf_size); |
1014 | } |
1015 | } |
1016 | |
1017 | buf[i] = row_merge_buf_empty(buf[i]); |
1018 | t_ctx.rows_added[i] = 0; |
1019 | } |
1020 | } |
1021 | |
1022 | if (fts_enable_diag_print) { |
1023 | DEBUG_FTS_SORT_PRINT(" InnoDB_FTS: start merge sort\n" ); |
1024 | } |
1025 | |
1026 | for (i = 0; i < FTS_NUM_AUX_INDEX; i++) { |
1027 | if (!merge_file[i]->offset) { |
1028 | continue; |
1029 | } |
1030 | |
1031 | tmpfd[i] = row_merge_file_create_low(path); |
1032 | if (tmpfd[i] == OS_FILE_CLOSED) { |
1033 | error = DB_OUT_OF_MEMORY; |
1034 | goto func_exit; |
1035 | } |
1036 | |
1037 | error = row_merge_sort(psort_info->psort_common->trx, |
1038 | psort_info->psort_common->dup, |
1039 | merge_file[i], block[i], &tmpfd[i], |
1040 | false, 0.0/* pct_progress */, 0.0/* pct_cost */, |
1041 | crypt_block[i], table->space->id); |
1042 | |
1043 | if (error != DB_SUCCESS) { |
1044 | os_file_close(tmpfd[i]); |
1045 | goto func_exit; |
1046 | } |
1047 | |
1048 | total_rec += merge_file[i]->n_rec; |
1049 | os_file_close(tmpfd[i]); |
1050 | } |
1051 | |
1052 | func_exit: |
1053 | if (fts_enable_diag_print) { |
1054 | DEBUG_FTS_SORT_PRINT(" InnoDB_FTS: complete merge sort\n" ); |
1055 | } |
1056 | |
1057 | mem_heap_free(blob_heap); |
1058 | |
1059 | mutex_enter(&psort_info->mutex); |
1060 | psort_info->error = error; |
1061 | mutex_exit(&psort_info->mutex); |
1062 | |
1063 | if (UT_LIST_GET_LEN(psort_info->fts_doc_list) > 0) { |
		/* The child can exit either on error or when told to
		by the parent. */
1065 | ut_ad(error != DB_SUCCESS |
1066 | || psort_info->state == FTS_PARENT_EXITING); |
1067 | } |
1068 | |
1069 | /* Free fts doc list in case of error. */ |
1070 | do { |
1071 | row_merge_fts_get_next_doc_item(psort_info, &doc_item); |
1072 | } while (doc_item != NULL); |
1073 | |
1074 | psort_info->child_status = FTS_CHILD_COMPLETE; |
1075 | os_event_set(psort_info->psort_common->sort_event); |
1076 | psort_info->child_status = FTS_CHILD_EXITING; |
1077 | |
1078 | os_thread_exit(); |
1079 | |
1080 | OS_THREAD_DUMMY_RETURN; |
1081 | } |
1082 | |
1083 | /*********************************************************************//** |
1084 | Start the parallel tokenization and parallel merge sort */ |
1085 | void |
1086 | row_fts_start_psort( |
1087 | /*================*/ |
1088 | fts_psort_t* psort_info) /*!< parallel sort structure */ |
1089 | { |
1090 | ulint i = 0; |
1091 | os_thread_id_t thd_id; |
1092 | |
1093 | for (i = 0; i < fts_sort_pll_degree; i++) { |
1094 | psort_info[i].psort_id = i; |
1095 | psort_info[i].thread_hdl = |
1096 | os_thread_create(fts_parallel_tokenization, |
1097 | (void*) &psort_info[i], |
1098 | &thd_id); |
1099 | } |
1100 | } |
1101 | |
1102 | /*********************************************************************//** |
1103 | Function performs the merge and insertion of the sorted records. |
1104 | @return OS_THREAD_DUMMY_RETURN */ |
1105 | static |
1106 | os_thread_ret_t |
1107 | DECLARE_THREAD(fts_parallel_merge)( |
1108 | /*===============*/ |
1109 | void* arg) /*!< in: parallel merge info */ |
1110 | { |
1111 | fts_psort_t* psort_info = (fts_psort_t*) arg; |
1112 | ulint id; |
1113 | |
1114 | ut_ad(psort_info); |
1115 | |
1116 | id = psort_info->psort_id; |
1117 | |
1118 | row_fts_merge_insert(psort_info->psort_common->dup->index, |
1119 | psort_info->psort_common->new_table, |
1120 | psort_info->psort_common->all_info, id); |
1121 | |
1122 | psort_info->child_status = FTS_CHILD_COMPLETE; |
1123 | os_event_set(psort_info->psort_common->merge_event); |
1124 | psort_info->child_status = FTS_CHILD_EXITING; |
1125 | |
1126 | os_thread_exit(false); |
1127 | |
1128 | OS_THREAD_DUMMY_RETURN; |
1129 | } |
1130 | |
1131 | /*********************************************************************//** |
1132 | Kick off the parallel merge and insert thread */ |
1133 | void |
1134 | row_fts_start_parallel_merge( |
1135 | /*=========================*/ |
1136 | fts_psort_t* merge_info) /*!< in: parallel sort info */ |
1137 | { |
1138 | ulint i = 0; |
1139 | |
1140 | /* Kick off merge/insert threads */ |
1141 | for (i = 0; i < FTS_NUM_AUX_INDEX; i++) { |
1142 | merge_info[i].psort_id = i; |
1143 | merge_info[i].child_status = 0; |
1144 | |
1145 | merge_info[i].thread_hdl = os_thread_create( |
1146 | fts_parallel_merge, |
1147 | (void*) &merge_info[i], |
1148 | &merge_info[i].thread_hdl); |
1149 | } |
1150 | } |
1151 | |
1152 | /** |
1153 | Write out a single word's data as new entry/entries in the INDEX table. |
1154 | @param[in] ins_ctx insert context |
1155 | @param[in] word word string |
@param[in]	node		node columns
@return DB_SUCCESS if insertion runs fine, otherwise error code */
1158 | static |
1159 | dberr_t |
1160 | row_merge_write_fts_node( |
1161 | const fts_psort_insert_t* ins_ctx, |
1162 | const fts_string_t* word, |
1163 | const fts_node_t* node) |
1164 | { |
1165 | dtuple_t* tuple; |
1166 | dfield_t* field; |
1167 | dberr_t ret = DB_SUCCESS; |
1168 | doc_id_t write_first_doc_id[8]; |
1169 | doc_id_t write_last_doc_id[8]; |
1170 | ib_uint32_t write_doc_count; |
1171 | |
1172 | tuple = ins_ctx->tuple; |
1173 | |
1174 | /* The first field is the tokenized word */ |
1175 | field = dtuple_get_nth_field(tuple, 0); |
1176 | dfield_set_data(field, word->f_str, word->f_len); |
1177 | |
1178 | /* The second field is first_doc_id */ |
1179 | field = dtuple_get_nth_field(tuple, 1); |
1180 | fts_write_doc_id((byte*)&write_first_doc_id, node->first_doc_id); |
1181 | dfield_set_data(field, &write_first_doc_id, sizeof(doc_id_t)); |
1182 | |
	/* The third and fourth fields (TRX_ID, ROLL_PTR) are filled already. */
1184 | /* The fifth field is last_doc_id */ |
1185 | field = dtuple_get_nth_field(tuple, 4); |
1186 | fts_write_doc_id((byte*)&write_last_doc_id, node->last_doc_id); |
1187 | dfield_set_data(field, &write_last_doc_id, sizeof(doc_id_t)); |
1188 | |
1189 | /* The sixth field is doc_count */ |
1190 | field = dtuple_get_nth_field(tuple, 5); |
1191 | mach_write_to_4((byte*)&write_doc_count, (ib_uint32_t)node->doc_count); |
1192 | dfield_set_data(field, &write_doc_count, sizeof(ib_uint32_t)); |
1193 | |
1194 | /* The seventh field is ilist */ |
1195 | field = dtuple_get_nth_field(tuple, 6); |
1196 | dfield_set_data(field, node->ilist, node->ilist_size); |
1197 | |
1198 | ret = ins_ctx->btr_bulk->insert(tuple); |
1199 | |
1200 | return(ret); |
1201 | } |
1202 | |
1203 | /********************************************************************//** |
Insert processed FTS data into the auxiliary index tables.
1205 | @return DB_SUCCESS if insertion runs fine */ |
1206 | static MY_ATTRIBUTE((nonnull)) |
1207 | dberr_t |
1208 | row_merge_write_fts_word( |
1209 | /*=====================*/ |
1210 | fts_psort_insert_t* ins_ctx, /*!< in: insert context */ |
1211 | fts_tokenizer_word_t* word) /*!< in: sorted and tokenized |
1212 | word */ |
1213 | { |
1214 | dberr_t ret = DB_SUCCESS; |
1215 | |
1216 | ut_ad(ins_ctx->aux_index_id == fts_select_index( |
1217 | ins_ctx->charset, word->text.f_str, word->text.f_len)); |
1218 | |
	/* Pop out each fts_node in word->nodes and write them to the
	auxiliary table */
1220 | for (ulint i = 0; i < ib_vector_size(word->nodes); i++) { |
1221 | dberr_t error; |
1222 | fts_node_t* fts_node; |
1223 | |
1224 | fts_node = static_cast<fts_node_t*>(ib_vector_get(word->nodes, i)); |
1225 | |
1226 | error = row_merge_write_fts_node(ins_ctx, &word->text, fts_node); |
1227 | |
1228 | if (error != DB_SUCCESS) { |
1229 | ib::error() << "Failed to write word " |
1230 | << word->text.f_str << " to FTS auxiliary" |
1231 | " index table, error (" << ut_strerr(error) |
1232 | << ")" ; |
1233 | ret = error; |
1234 | } |
1235 | |
1236 | ut_free(fts_node->ilist); |
1237 | fts_node->ilist = NULL; |
1238 | } |
1239 | |
1240 | ib_vector_reset(word->nodes); |
1241 | |
1242 | return(ret); |
1243 | } |
1244 | |
1245 | /*********************************************************************//** |
Read sorted FTS data files and insert the data tuples into the
auxiliary tables. */
1248 | static |
1249 | void |
1250 | row_fts_insert_tuple( |
1251 | /*=================*/ |
1252 | fts_psort_insert_t* |
1253 | ins_ctx, /*!< in: insert context */ |
1254 | fts_tokenizer_word_t* word, /*!< in: last processed |
1255 | tokenized word */ |
1256 | ib_vector_t* positions, /*!< in: word position */ |
1257 | doc_id_t* in_doc_id, /*!< in: last item doc id */ |
1258 | dtuple_t* dtuple) /*!< in: entry to insert */ |
1259 | { |
1260 | fts_node_t* fts_node = NULL; |
1261 | dfield_t* dfield; |
1262 | doc_id_t doc_id; |
1263 | ulint position; |
1264 | fts_string_t token_word; |
1265 | ulint i; |
1266 | |
	/* Get fts_node for the FTS auxiliary INDEX table */
1268 | if (ib_vector_size(word->nodes) > 0) { |
1269 | fts_node = static_cast<fts_node_t*>( |
1270 | ib_vector_last(word->nodes)); |
1271 | } |
1272 | |
1273 | if (fts_node == NULL |
1274 | || fts_node->ilist_size > FTS_ILIST_MAX_SIZE) { |
1275 | |
1276 | fts_node = static_cast<fts_node_t*>( |
1277 | ib_vector_push(word->nodes, NULL)); |
1278 | |
1279 | memset(fts_node, 0x0, sizeof(*fts_node)); |
1280 | } |
1281 | |
1282 | /* If dtuple == NULL, this is the last word to be processed */ |
1283 | if (!dtuple) { |
1284 | if (fts_node && ib_vector_size(positions) > 0) { |
1285 | fts_cache_node_add_positions( |
1286 | NULL, fts_node, *in_doc_id, |
1287 | positions); |
1288 | |
1289 | /* Write out the current word */ |
1290 | row_merge_write_fts_word(ins_ctx, word); |
1291 | } |
1292 | |
1293 | return; |
1294 | } |
1295 | |
1296 | /* Get the first field for the tokenized word */ |
1297 | dfield = dtuple_get_nth_field(dtuple, 0); |
1298 | |
1299 | token_word.f_n_char = 0; |
1300 | token_word.f_len = dfield->len; |
1301 | token_word.f_str = static_cast<byte*>(dfield_get_data(dfield)); |
1302 | |
1303 | if (!word->text.f_str) { |
1304 | fts_string_dup(&word->text, &token_word, ins_ctx->heap); |
1305 | } |
1306 | |
	/* Compare to the last word to see if they are the same
	word */
1309 | if (innobase_fts_text_cmp(ins_ctx->charset, |
1310 | &word->text, &token_word) != 0) { |
1311 | ulint num_item; |
1312 | |
		/* Getting a new word; flush the last position info
		for the current word in fts_node */
1315 | if (ib_vector_size(positions) > 0) { |
1316 | fts_cache_node_add_positions( |
1317 | NULL, fts_node, *in_doc_id, positions); |
1318 | } |
1319 | |
1320 | /* Write out the current word */ |
1321 | row_merge_write_fts_word(ins_ctx, word); |
1322 | |
1323 | /* Copy the new word */ |
1324 | fts_string_dup(&word->text, &token_word, ins_ctx->heap); |
1325 | |
1326 | num_item = ib_vector_size(positions); |
1327 | |
1328 | /* Clean up position queue */ |
1329 | for (i = 0; i < num_item; i++) { |
1330 | ib_vector_pop(positions); |
1331 | } |
1332 | |
1333 | /* Reset Doc ID */ |
1334 | *in_doc_id = 0; |
1335 | memset(fts_node, 0x0, sizeof(*fts_node)); |
1336 | } |
1337 | |
1338 | /* Get the word's Doc ID */ |
1339 | dfield = dtuple_get_nth_field(dtuple, 1); |
1340 | |
1341 | if (!ins_ctx->opt_doc_id_size) { |
1342 | doc_id = fts_read_doc_id( |
1343 | static_cast<byte*>(dfield_get_data(dfield))); |
1344 | } else { |
1345 | doc_id = (doc_id_t) mach_read_from_4( |
1346 | static_cast<byte*>(dfield_get_data(dfield))); |
1347 | } |
1348 | |
1349 | /* Get the word's position info */ |
1350 | dfield = dtuple_get_nth_field(dtuple, 2); |
1351 | position = mach_read_from_4(static_cast<byte*>(dfield_get_data(dfield))); |
1352 | |
1353 | /* If this is the same word as the last word, and they |
1354 | have the same Doc ID, we just need to add its position |
1355 | info. Otherwise, we will flush position info to the |
1356 | fts_node and initiate a new position vector */ |
1357 | if (!(*in_doc_id) || *in_doc_id == doc_id) { |
1358 | ib_vector_push(positions, &position); |
1359 | } else { |
1360 | ulint num_pos = ib_vector_size(positions); |
1361 | |
1362 | fts_cache_node_add_positions(NULL, fts_node, |
1363 | *in_doc_id, positions); |
1364 | for (i = 0; i < num_pos; i++) { |
1365 | ib_vector_pop(positions); |
1366 | } |
1367 | ib_vector_push(positions, &position); |
1368 | } |
1369 | |
1370 | /* record the current Doc ID */ |
1371 | *in_doc_id = doc_id; |
1372 | } |
1373 | |
1374 | /*********************************************************************//** |
1375 | Propagate a newly added record up one level in the selection tree |
1376 | @return parent where this value propagated to */ |
1377 | static |
1378 | ulint |
1379 | row_fts_sel_tree_propagate( |
1380 | /*=======================*/ |
	ulint		propogated,	/*!< in: tree node propagated */
	int*		sel_tree,	/*!< in: selection tree */
	const mrec_t**	mrec,		/*!< in: sort record */
	ulint**		offsets,	/*!< in: record offsets */
	dict_index_t*	index)		/*!< in/out: FTS index */
1386 | { |
1387 | ulint parent; |
1388 | int child_left; |
1389 | int child_right; |
1390 | int selected; |
1391 | |
1392 | /* Find which parent this value will be propagated to */ |
1393 | parent = (propogated - 1) / 2; |
1394 | |
	/* Find out which value is smaller and propagate it */
1396 | child_left = sel_tree[parent * 2 + 1]; |
1397 | child_right = sel_tree[parent * 2 + 2]; |
1398 | |
1399 | if (child_left == -1 || mrec[child_left] == NULL) { |
1400 | if (child_right == -1 |
1401 | || mrec[child_right] == NULL) { |
1402 | selected = -1; |
1403 | } else { |
			selected = child_right;
1405 | } |
1406 | } else if (child_right == -1 |
1407 | || mrec[child_right] == NULL) { |
1408 | selected = child_left; |
1409 | } else if (cmp_rec_rec_simple(mrec[child_left], mrec[child_right], |
1410 | offsets[child_left], |
1411 | offsets[child_right], |
1412 | index, NULL) < 0) { |
1413 | selected = child_left; |
1414 | } else { |
1415 | selected = child_right; |
1416 | } |
1417 | |
1418 | sel_tree[parent] = selected; |
1419 | |
1420 | return parent; |
1421 | } |
1422 | |
1423 | /*********************************************************************//** |
1424 | Readjust selection tree after popping the root and read a new value |
1425 | @return the new root */ |
1426 | static |
1427 | int |
1428 | row_fts_sel_tree_update( |
1429 | /*====================*/ |
	int*		sel_tree,	/*!< in/out: selection tree */
	ulint		propagated,	/*!< in: node to propagate up */
	ulint		height,		/*!< in: tree height */
	const mrec_t**	mrec,		/*!< in: sort record */
	ulint**		offsets,	/*!< in: record offsets */
	dict_index_t*	index)		/*!< in: index dictionary */
1436 | { |
1437 | ulint i; |
1438 | |
1439 | for (i = 1; i <= height; i++) { |
1440 | propagated = row_fts_sel_tree_propagate( |
1441 | propagated, sel_tree, mrec, offsets, index); |
1442 | } |
1443 | |
1444 | return(sel_tree[0]); |
1445 | } |
1446 | |
1447 | /*********************************************************************//** |
1448 | Build selection tree at a specified level */ |
1449 | static |
1450 | void |
1451 | row_fts_build_sel_tree_level( |
1452 | /*=========================*/ |
	int*		sel_tree,	/*!< in/out: selection tree */
	ulint		level,		/*!< in: selection tree level */
	const mrec_t**	mrec,		/*!< in: sort record */
	ulint**		offsets,	/*!< in: record offsets */
	dict_index_t*	index)		/*!< in: index dictionary */
1458 | { |
1459 | ulint start; |
1460 | int child_left; |
1461 | int child_right; |
1462 | ulint i; |
1463 | ulint num_item = ulint(1) << level; |
1464 | |
1465 | start = num_item - 1; |
1466 | |
1467 | for (i = 0; i < num_item; i++) { |
1468 | child_left = sel_tree[(start + i) * 2 + 1]; |
1469 | child_right = sel_tree[(start + i) * 2 + 2]; |
1470 | |
1471 | if (child_left == -1) { |
1472 | if (child_right == -1) { |
1473 | sel_tree[start + i] = -1; |
1474 | } else { |
1475 | sel_tree[start + i] = child_right; |
1476 | } |
1477 | continue; |
1478 | } else if (child_right == -1) { |
1479 | sel_tree[start + i] = child_left; |
1480 | continue; |
1481 | } |
1482 | |
1483 | /* Deal with NULL child conditions */ |
1484 | if (!mrec[child_left]) { |
1485 | if (!mrec[child_right]) { |
1486 | sel_tree[start + i] = -1; |
1487 | } else { |
1488 | sel_tree[start + i] = child_right; |
1489 | } |
1490 | continue; |
1491 | } else if (!mrec[child_right]) { |
1492 | sel_tree[start + i] = child_left; |
1493 | continue; |
1494 | } |
1495 | |
1496 | /* Select the smaller one to set parent pointer */ |
1497 | int cmp = cmp_rec_rec_simple( |
1498 | mrec[child_left], mrec[child_right], |
1499 | offsets[child_left], offsets[child_right], |
1500 | index, NULL); |
1501 | |
1502 | sel_tree[start + i] = cmp < 0 ? child_left : child_right; |
1503 | } |
1504 | } |
1505 | |
1506 | /*********************************************************************//** |
Build a selection tree for the merge. The selection tree is a binary
tree and should have fts_sort_pll_degree / 2 levels, with the root at
level 0.
1509 | @return number of tree levels */ |
1510 | static |
1511 | ulint |
1512 | row_fts_build_sel_tree( |
1513 | /*===================*/ |
	int*		sel_tree,	/*!< in/out: selection tree */
	const mrec_t**	mrec,		/*!< in: sort record */
	ulint**		offsets,	/*!< in: record offsets */
	dict_index_t*	index)		/*!< in: index dictionary */
1518 | { |
1519 | ulint treelevel = 1; |
1520 | ulint num = 2; |
1521 | ulint i = 0; |
1522 | ulint start; |
1523 | |
1524 | /* No need to build selection tree if we only have two merge threads */ |
1525 | if (fts_sort_pll_degree <= 2) { |
1526 | return(0); |
1527 | } |
1528 | |
1529 | while (num < fts_sort_pll_degree) { |
1530 | num = num << 1; |
1531 | treelevel++; |
1532 | } |
1533 | |
1534 | start = (ulint(1) << treelevel) - 1; |
1535 | |
1536 | for (i = 0; i < fts_sort_pll_degree; i++) { |
1537 | sel_tree[i + start] = int(i); |
1538 | } |
1539 | |
1540 | for (i = treelevel; --i; ) { |
1541 | row_fts_build_sel_tree_level( |
1542 | sel_tree, i, mrec, offsets, index); |
1543 | } |
1544 | |
1545 | return(treelevel); |
1546 | } |
1547 | |
1548 | /*********************************************************************//** |
Read the sorted files containing index data tuples and insert these
data tuples into the index
1551 | @return DB_SUCCESS or error number */ |
1552 | dberr_t |
1553 | row_fts_merge_insert( |
1554 | /*=================*/ |
1555 | dict_index_t* index, /*!< in: index */ |
1556 | dict_table_t* table, /*!< in: new table */ |
1557 | fts_psort_t* psort_info, /*!< parallel sort info */ |
	ulint		id)		/*!< in: which auxiliary table's
					data to insert */
1560 | { |
1561 | const byte** b; |
1562 | mem_heap_t* tuple_heap; |
1563 | mem_heap_t* heap; |
1564 | dberr_t error = DB_SUCCESS; |
1565 | ulint* foffs; |
1566 | ulint** offsets; |
1567 | fts_tokenizer_word_t new_word; |
1568 | ib_vector_t* positions; |
1569 | doc_id_t last_doc_id; |
1570 | ib_alloc_t* heap_alloc; |
1571 | ulint i; |
1572 | mrec_buf_t** buf; |
1573 | pfs_os_file_t* fd; |
1574 | byte** block; |
1575 | byte** crypt_block; |
1576 | const mrec_t** mrec; |
1577 | ulint count = 0; |
1578 | int* sel_tree; |
1579 | ulint height; |
1580 | ulint start; |
1581 | fts_psort_insert_t ins_ctx; |
1582 | uint64_t count_diag = 0; |
1583 | fts_table_t fts_table; |
1584 | char aux_table_name[MAX_FULL_NAME_LEN]; |
1585 | dict_table_t* aux_table; |
1586 | dict_index_t* aux_index; |
1587 | trx_t* trx; |
1588 | byte trx_id_buf[6]; |
1589 | roll_ptr_t roll_ptr = 0; |
1590 | dfield_t* field; |
1591 | |
1592 | ut_ad(index); |
1593 | ut_ad(table); |
1594 | |
1595 | /* We use the insert query graph as the dummy graph |
1596 | needed in the row module call */ |
1597 | |
1598 | trx = trx_create(); |
1599 | trx_start_if_not_started(trx, true); |
1600 | |
1601 | trx->op_info = "inserting index entries" ; |
1602 | |
1603 | ins_ctx.opt_doc_id_size = psort_info[0].psort_common->opt_doc_id_size; |
1604 | |
1605 | heap = mem_heap_create(500 + sizeof(mrec_buf_t)); |
1606 | |
1607 | b = (const byte**) mem_heap_alloc( |
1608 | heap, sizeof (*b) * fts_sort_pll_degree); |
1609 | foffs = (ulint*) mem_heap_alloc( |
1610 | heap, sizeof(*foffs) * fts_sort_pll_degree); |
1611 | offsets = (ulint**) mem_heap_alloc( |
1612 | heap, sizeof(*offsets) * fts_sort_pll_degree); |
1613 | buf = (mrec_buf_t**) mem_heap_alloc( |
1614 | heap, sizeof(*buf) * fts_sort_pll_degree); |
1615 | fd = (pfs_os_file_t*) mem_heap_alloc(heap, sizeof(*fd) * fts_sort_pll_degree); |
1616 | block = (byte**) mem_heap_alloc( |
1617 | heap, sizeof(*block) * fts_sort_pll_degree); |
1618 | crypt_block = (byte**) mem_heap_alloc( |
1619 | heap, sizeof(*block) * fts_sort_pll_degree); |
1620 | mrec = (const mrec_t**) mem_heap_alloc( |
1621 | heap, sizeof(*mrec) * fts_sort_pll_degree); |
1622 | sel_tree = (int*) mem_heap_alloc( |
1623 | heap, sizeof(*sel_tree) * (fts_sort_pll_degree * 2)); |
1624 | |
1625 | tuple_heap = mem_heap_create(1000); |
1626 | |
1627 | ins_ctx.charset = fts_index_get_charset(index); |
1628 | ins_ctx.heap = heap; |
1629 | |
1630 | for (i = 0; i < fts_sort_pll_degree; i++) { |
1631 | ulint num; |
1632 | |
1633 | num = 1 + REC_OFFS_HEADER_SIZE |
1634 | + dict_index_get_n_fields(index); |
1635 | offsets[i] = static_cast<ulint*>(mem_heap_zalloc( |
1636 | heap, num * sizeof *offsets[i])); |
1637 | offsets[i][0] = num; |
1638 | offsets[i][1] = dict_index_get_n_fields(index); |
1639 | block[i] = psort_info[i].merge_block[id]; |
1640 | crypt_block[i] = psort_info[i].crypt_block[id]; |
1641 | b[i] = psort_info[i].merge_block[id]; |
1642 | fd[i] = psort_info[i].merge_file[id]->fd; |
1643 | foffs[i] = 0; |
1644 | |
1645 | buf[i] = static_cast<mrec_buf_t*>( |
1646 | mem_heap_alloc(heap, sizeof *buf[i])); |
1647 | |
1648 | count_diag += psort_info[i].merge_file[id]->n_rec; |
1649 | } |
1650 | |
1651 | if (fts_enable_diag_print) { |
1652 | ib::info() << "InnoDB_FTS: to insert " << count_diag |
1653 | << " records" ; |
1654 | } |
1655 | |
1656 | /* Initialize related variables if creating FTS indexes */ |
1657 | heap_alloc = ib_heap_allocator_create(heap); |
1658 | |
1659 | memset(&new_word, 0, sizeof(new_word)); |
1660 | |
1661 | new_word.nodes = ib_vector_create(heap_alloc, sizeof(fts_node_t), 4); |
1662 | positions = ib_vector_create(heap_alloc, sizeof(ulint), 32); |
1663 | last_doc_id = 0; |
1664 | |
1665 | /* We should set the flags2 with aux_table_name here, |
1666 | in order to get the correct aux table names. */ |
1667 | index->table->flags2 |= DICT_TF2_FTS_AUX_HEX_NAME; |
1668 | DBUG_EXECUTE_IF("innodb_test_wrong_fts_aux_table_name" , |
1669 | index->table->flags2 &= ~DICT_TF2_FTS_AUX_HEX_NAME;); |
1670 | fts_table.type = FTS_INDEX_TABLE; |
1671 | fts_table.index_id = index->id; |
1672 | fts_table.table_id = table->id; |
1673 | fts_table.parent = index->table->name.m_name; |
1674 | fts_table.table = index->table; |
1675 | fts_table.suffix = fts_get_suffix(id); |
1676 | |
1677 | /* Get aux index */ |
1678 | fts_get_table_name(&fts_table, aux_table_name); |
1679 | aux_table = dict_table_open_on_name(aux_table_name, FALSE, FALSE, |
1680 | DICT_ERR_IGNORE_NONE); |
1681 | ut_ad(aux_table != NULL); |
1682 | dict_table_close(aux_table, FALSE, FALSE); |
1683 | aux_index = dict_table_get_first_index(aux_table); |
1684 | |
1685 | ut_ad(!aux_index->is_instant()); |
1686 | /* row_merge_write_fts_node() depends on the correct value */ |
1687 | ut_ad(aux_index->n_core_null_bytes |
1688 | == UT_BITS_IN_BYTES(aux_index->n_nullable)); |
1689 | |
1690 | FlushObserver* observer; |
1691 | observer = psort_info[0].psort_common->trx->flush_observer; |
1692 | |
1693 | /* Create bulk load instance */ |
1694 | ins_ctx.btr_bulk = UT_NEW_NOKEY(BtrBulk(aux_index, trx->id, observer)); |
1695 | ins_ctx.btr_bulk->init(); |
1696 | |
1697 | /* Create tuple for insert */ |
1698 | ins_ctx.tuple = dtuple_create(heap, dict_index_get_n_fields(aux_index)); |
1699 | dict_index_copy_types(ins_ctx.tuple, aux_index, |
1700 | dict_index_get_n_fields(aux_index)); |
1701 | |
1702 | /* Set TRX_ID and ROLL_PTR */ |
1703 | trx_write_trx_id(trx_id_buf, trx->id); |
1704 | field = dtuple_get_nth_field(ins_ctx.tuple, 2); |
1705 | dfield_set_data(field, &trx_id_buf, 6); |
1706 | |
1707 | field = dtuple_get_nth_field(ins_ctx.tuple, 3); |
1708 | dfield_set_data(field, &roll_ptr, 7); |
1709 | |
1710 | #ifdef UNIV_DEBUG |
1711 | ins_ctx.aux_index_id = id; |
1712 | #endif |
1713 | const ulint space = table->space->id; |
1714 | |
1715 | for (i = 0; i < fts_sort_pll_degree; i++) { |
1716 | if (psort_info[i].merge_file[id]->n_rec == 0) { |
			/* No rows to read */
1718 | mrec[i] = b[i] = NULL; |
1719 | } else { |
1720 | /* Read from temp file only if it has been |
1721 | written to. Otherwise, block memory holds |
1722 | all the sorted records */ |
1723 | if (psort_info[i].merge_file[id]->offset > 0 |
1724 | && (!row_merge_read( |
1725 | fd[i], foffs[i], |
1726 | (row_merge_block_t*) block[i], |
1727 | (row_merge_block_t*) crypt_block[i], |
1728 | space))) { |
1729 | error = DB_CORRUPTION; |
1730 | goto exit; |
1731 | } |
1732 | |
1733 | ROW_MERGE_READ_GET_NEXT(i); |
1734 | } |
1735 | } |
1736 | |
1737 | height = row_fts_build_sel_tree(sel_tree, (const mrec_t **) mrec, |
1738 | offsets, index); |
1739 | |
1740 | start = (1U << height) - 1; |
1741 | |
1742 | /* Fetch sorted records from sort buffer and insert them into |
1743 | corresponding FTS index auxiliary tables */ |
1744 | for (;;) { |
1745 | dtuple_t* dtuple; |
1746 | ulint n_ext; |
1747 | int min_rec = 0; |
1748 | |
1749 | if (fts_sort_pll_degree <= 2) { |
1750 | while (!mrec[min_rec]) { |
1751 | min_rec++; |
1752 | |
1753 | if (min_rec >= (int) fts_sort_pll_degree) { |
1754 | row_fts_insert_tuple( |
1755 | &ins_ctx, &new_word, |
1756 | positions, &last_doc_id, |
1757 | NULL); |
1758 | |
1759 | goto exit; |
1760 | } |
1761 | } |
1762 | |
1763 | for (i = min_rec + 1; i < fts_sort_pll_degree; i++) { |
1764 | if (!mrec[i]) { |
1765 | continue; |
1766 | } |
1767 | |
1768 | if (cmp_rec_rec_simple( |
1769 | mrec[i], mrec[min_rec], |
1770 | offsets[i], offsets[min_rec], |
1771 | index, NULL) < 0) { |
1772 | min_rec = static_cast<int>(i); |
1773 | } |
1774 | } |
1775 | } else { |
1776 | min_rec = sel_tree[0]; |
1777 | |
1778 | if (min_rec == -1) { |
1779 | row_fts_insert_tuple( |
1780 | &ins_ctx, &new_word, |
1781 | positions, &last_doc_id, |
1782 | NULL); |
1783 | |
1784 | goto exit; |
1785 | } |
1786 | } |
1787 | |
1788 | dtuple = row_rec_to_index_entry_low( |
1789 | mrec[min_rec], index, offsets[min_rec], &n_ext, |
1790 | tuple_heap); |
1791 | |
1792 | row_fts_insert_tuple( |
1793 | &ins_ctx, &new_word, positions, |
1794 | &last_doc_id, dtuple); |
1795 | |
1796 | |
1797 | ROW_MERGE_READ_GET_NEXT(min_rec); |
1798 | |
1799 | if (fts_sort_pll_degree > 2) { |
1800 | if (!mrec[min_rec]) { |
1801 | sel_tree[start + min_rec] = -1; |
1802 | } |
1803 | |
1804 | row_fts_sel_tree_update(sel_tree, start + min_rec, |
1805 | height, mrec, |
1806 | offsets, index); |
1807 | } |
1808 | |
1809 | count++; |
1810 | |
1811 | mem_heap_empty(tuple_heap); |
1812 | } |
1813 | |
1814 | exit: |
1815 | fts_sql_commit(trx); |
1816 | |
1817 | trx->op_info = "" ; |
1818 | |
1819 | mem_heap_free(tuple_heap); |
1820 | |
1821 | error = ins_ctx.btr_bulk->finish(error); |
1822 | UT_DELETE(ins_ctx.btr_bulk); |
1823 | |
1824 | trx_free(trx); |
1825 | |
1826 | mem_heap_free(heap); |
1827 | |
1828 | if (fts_enable_diag_print) { |
1829 | ib::info() << "InnoDB_FTS: inserted " << count << " records" ; |
1830 | } |
1831 | |
1832 | return(error); |
1833 | } |
1834 | |