/*****************************************************************************

Copyright (c) 2010, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2015, 2018, MariaDB Corporation.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA

*****************************************************************************/

/**************************************************//**
@file row/row0ftsort.cc
Create Full Text Index with (parallel) merge sort

Created 10/13/2010 Jimmy Yang
*******************************************************/

#include "ha_prototypes.h"

#include "dict0dict.h"
#include "row0merge.h"
#include "pars0pars.h"
#include "row0ftsort.h"
#include "row0row.h"
#include "btr0cur.h"
#include "btr0bulk.h"
#include "fts0plugin.h"
#include "log0crypt.h"

/** Read the next record to buffer N.
@param N index into the array of merge info structures */
#define ROW_MERGE_READ_GET_NEXT(N) \
	do { \
		b[N] = row_merge_read_rec( \
			block[N], buf[N], b[N], index, \
			fd[N], &foffs[N], &mrec[N], offsets[N], \
			crypt_block[N], space); \
		if (UNIV_UNLIKELY(!b[N])) { \
			if (mrec[N]) { \
				goto exit; \
			} \
		} \
	} while (0)
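
/* A sketch of the macro's contract, inferred from its use below and not
re-verified against row_merge_read_rec() in row0merge.cc: b[N] is the read
cursor into merge stream N and becomes NULL once no further record can be
returned. If mrec[N] is also NULL the stream simply ended; if mrec[N] is
still non-NULL the read did not complete cleanly, and control jumps to an
"exit" label that every caller of this macro must provide. */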

/** Parallel sort degree */
ulong	fts_sort_pll_degree = 2;

/*********************************************************************//**
Create a temporary "fts sort index" used to merge sort the
tokenized doc string. The index has three "fields":

1) Tokenized word,
2) Doc ID (depending on the number of records to sort, it can be a 4-byte
or an 8-byte integer value)
3) Word's position in the original doc.

@see fts_create_one_index_table()

@return dict_index_t structure for the fts sort index */
dict_index_t*
row_merge_create_fts_sort_index(
/*============================*/
	dict_index_t*	index,	/*!< in: Original FTS index
				based on which this sort index
				is created */
	dict_table_t*	table,	/*!< in,out: table that FTS index
				is being created on */
	ibool*		opt_doc_id_size)
				/*!< out: whether to use 4 bytes
				instead of 8 bytes integer to
				store Doc ID during sort */
{
	dict_index_t*	new_index;
	dict_field_t*	field;
	dict_field_t*	idx_field;
	CHARSET_INFO*	charset;

	// FIXME: This name shouldn't be hard coded here.
	new_index = dict_mem_index_create(table, "tmp_fts_idx", DICT_FTS, 3);

	new_index->id = index->id;
	new_index->n_uniq = FTS_NUM_FIELDS_SORT;
	new_index->n_def = FTS_NUM_FIELDS_SORT;
	new_index->cached = TRUE;
	new_index->parser = index->parser;

	idx_field = dict_index_get_nth_field(index, 0);
	charset = fts_index_get_charset(index);

	/* The first field is the tokenized word */
	field = dict_index_get_nth_field(new_index, 0);
	field->name = NULL;
	field->prefix_len = 0;
	field->col = static_cast<dict_col_t*>(
		mem_heap_alloc(new_index->heap, sizeof(dict_col_t)));
	field->col->prtype = idx_field->col->prtype | DATA_NOT_NULL;
	field->col->mtype = charset == &my_charset_latin1
		? DATA_VARCHAR : DATA_VARMYSQL;
	field->col->mbminlen = idx_field->col->mbminlen;
	field->col->mbmaxlen = idx_field->col->mbmaxlen;
	field->col->len = HA_FT_MAXCHARLEN * unsigned(field->col->mbmaxlen);

	field->fixed_len = 0;

	/* The second field is the Doc ID */
	field = dict_index_get_nth_field(new_index, 1);
	field->name = NULL;
	field->prefix_len = 0;
	field->col = static_cast<dict_col_t*>(
		mem_heap_alloc(new_index->heap, sizeof(dict_col_t)));
	field->col->mtype = DATA_INT;
	*opt_doc_id_size = FALSE;

	/* Check whether we can use a 4-byte instead of an 8-byte integer
	field to hold the Doc ID, thus reducing the overall sort size */
	if (DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_ADD_DOC_ID)) {
		/* If the Doc ID column is being added by this CREATE
		INDEX, then just check the number of rows in the table */
		if (dict_table_get_n_rows(table) < MAX_DOC_ID_OPT_VAL) {
			*opt_doc_id_size = TRUE;
		}
	} else {
		doc_id_t	max_doc_id;

		/* If the Doc ID column is supplied by the user, then
		check the maximum Doc ID in the table */
		max_doc_id = fts_get_max_doc_id((dict_table_t*) table);

		if (max_doc_id && max_doc_id < MAX_DOC_ID_OPT_VAL) {
			*opt_doc_id_size = TRUE;
		}
	}

	if (*opt_doc_id_size) {
		field->col->len = sizeof(ib_uint32_t);
		field->fixed_len = sizeof(ib_uint32_t);
	} else {
		field->col->len = FTS_DOC_ID_LEN;
		field->fixed_len = FTS_DOC_ID_LEN;
	}

	field->col->prtype = DATA_NOT_NULL | DATA_BINARY_TYPE;

	field->col->mbminlen = 0;
	field->col->mbmaxlen = 0;

	/* The third field is the word's position in the original doc */
	field = dict_index_get_nth_field(new_index, 2);
	field->name = NULL;
	field->prefix_len = 0;
	field->col = static_cast<dict_col_t*>(
		mem_heap_alloc(new_index->heap, sizeof(dict_col_t)));
	field->col->mtype = DATA_INT;
	field->col->len = 4;
	field->fixed_len = 4;
	field->col->prtype = DATA_NOT_NULL;
	field->col->mbminlen = 0;
	field->col->mbmaxlen = 0;

	return(new_index);
}
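
/* Layout of one tuple in the temporary sort index created above, as a
sketch. With *opt_doc_id_size == TRUE the middle field shrinks from
FTS_DOC_ID_LEN (8) to 4 bytes:

	+----------------------+----------------+-----------------+
	| tokenized word       | Doc ID         | word position   |
	| VARCHAR/VARMYSQL     | 4 or 8 bytes   | 4 bytes         |
	+----------------------+----------------+-----------------+
*/
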
/*********************************************************************//**
Initialize FTS parallel sort structures.
@return TRUE if all successful */
ibool
row_fts_psort_info_init(
/*====================*/
	trx_t*		trx,	/*!< in: transaction */
	row_merge_dup_t* dup,	/*!< in,own: descriptor of
				FTS index being created */
	const dict_table_t* new_table,/*!< in: table on which indexes are
				created */
	ibool		opt_doc_id_size,
				/*!< in: whether to use 4 bytes
				instead of 8 bytes integer to
				store Doc ID during sort */
	fts_psort_t**	psort,	/*!< out: parallel sort info to be
				instantiated */
	fts_psort_t**	merge)	/*!< out: parallel merge info
				to be instantiated */
{
	ulint			i;
	ulint			j;
	fts_psort_common_t*	common_info = NULL;
	fts_psort_t*		psort_info = NULL;
	fts_psort_t*		merge_info = NULL;
	ulint			block_size;
	ibool			ret = TRUE;
	bool			encrypted = false;

	block_size = 3 * srv_sort_buf_size;
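
	/* Rough memory budget, for orientation (a sketch of what the loop
	below allocates): one aligned block of block_size bytes, plus 1024
	bytes of alignment slack, per sort thread per FTS index partition,
	i.e. fts_sort_pll_degree * FTS_NUM_AUX_INDEX blocks in total, and
	the same again for the crypt buffers when the temporary files are
	encrypted. */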

	*psort = psort_info = static_cast<fts_psort_t*>(ut_zalloc_nokey(
		 fts_sort_pll_degree * sizeof *psort_info));

	if (!psort_info) {
		ut_free(dup);
		return(FALSE);
	}

	/* Common info for all sort threads */
	common_info = static_cast<fts_psort_common_t*>(
		ut_malloc_nokey(sizeof *common_info));

	if (!common_info) {
		ut_free(dup);
		ut_free(psort_info);
		return(FALSE);
	}

	common_info->dup = dup;
	common_info->new_table = (dict_table_t*) new_table;
	common_info->trx = trx;
	common_info->all_info = psort_info;
	common_info->sort_event = os_event_create(0);
	common_info->merge_event = os_event_create(0);
	common_info->opt_doc_id_size = opt_doc_id_size;

	if (log_tmp_is_encrypted()) {
		encrypted = true;
	}

	ut_ad(trx->mysql_thd != NULL);
	const char*	path = thd_innodb_tmpdir(trx->mysql_thd);
	/* There will be FTS_NUM_AUX_INDEX "sort buckets" for each
	parallel sort thread. Each "sort bucket" holds records for
	a particular "FTS index partition" */
	for (j = 0; j < fts_sort_pll_degree; j++) {

		UT_LIST_INIT(
			psort_info[j].fts_doc_list, &fts_doc_item_t::doc_list);

		for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {

			psort_info[j].merge_file[i] =
				 static_cast<merge_file_t*>(
					ut_zalloc_nokey(sizeof(merge_file_t)));

			if (!psort_info[j].merge_file[i]) {
				ret = FALSE;
				goto func_exit;
			}

			psort_info[j].merge_buf[i] = row_merge_buf_create(
				dup->index);

			if (row_merge_file_create(psort_info[j].merge_file[i],
						  path) == OS_FILE_CLOSED) {
				ret = FALSE;
				goto func_exit;
			}

			/* Need to align memory for O_DIRECT write */
			psort_info[j].block_alloc[i] =
				static_cast<row_merge_block_t*>(ut_malloc_nokey(
					block_size + 1024));

			psort_info[j].merge_block[i] =
				static_cast<row_merge_block_t*>(
					ut_align(
						psort_info[j].block_alloc[i], 1024));

			if (!psort_info[j].merge_block[i]) {
				ret = FALSE;
				goto func_exit;
			}

			/* If the tablespace is encrypted, allocate an
			additional buffer for encryption/decryption. */
			if (encrypted) {

				/* Need to align memory for O_DIRECT write */
				psort_info[j].crypt_alloc[i] =
					static_cast<row_merge_block_t*>(ut_malloc_nokey(
						block_size + 1024));

				psort_info[j].crypt_block[i] =
					static_cast<row_merge_block_t*>(
						ut_align(
							psort_info[j].crypt_alloc[i], 1024));

				if (!psort_info[j].crypt_block[i]) {
					ret = FALSE;
					goto func_exit;
				}
			} else {
				psort_info[j].crypt_alloc[i] = NULL;
				psort_info[j].crypt_block[i] = NULL;
			}
		}

		psort_info[j].child_status = 0;
		psort_info[j].state = 0;
		psort_info[j].psort_common = common_info;
		psort_info[j].error = DB_SUCCESS;
		psort_info[j].memory_used = 0;
		mutex_create(LATCH_ID_FTS_PLL_TOKENIZE, &psort_info[j].mutex);
	}

	/* Initialize merge_info structures for the parallel merge and
	insert into the auxiliary FTS tables (FTS_INDEX_TABLE) */
	*merge = merge_info = static_cast<fts_psort_t*>(
		ut_malloc_nokey(FTS_NUM_AUX_INDEX * sizeof *merge_info));

	for (j = 0; j < FTS_NUM_AUX_INDEX; j++) {

		merge_info[j].child_status = 0;
		merge_info[j].state = 0;
		merge_info[j].psort_common = common_info;
	}

func_exit:
	if (!ret) {
		row_fts_psort_info_destroy(psort_info, merge_info);
	}

	return(ret);
}
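
/* A minimal caller sketch (error handling elided; the real caller lives in
row0merge.cc and is more involved):

	fts_psort_t*	psort;
	fts_psort_t*	merge;

	if (row_fts_psort_info_init(trx, dup, table,
				    opt_doc_id_size, &psort, &merge)) {
		row_fts_start_psort(psort);
		... feed documents into psort[i].fts_doc_list ...
		row_fts_start_parallel_merge(merge);
		... wait on the events in psort[0].psort_common ...
		row_fts_psort_info_destroy(psort, merge);
	}
*/
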
/*********************************************************************//**
Clean up and deallocate FTS parallel sort structures, and close the
merge sort files */
void
row_fts_psort_info_destroy(
/*=======================*/
	fts_psort_t*	psort_info,	/*!< parallel sort info */
	fts_psort_t*	merge_info)	/*!< parallel merge info */
{
	ulint	i;
	ulint	j;

	if (psort_info) {
		for (j = 0; j < fts_sort_pll_degree; j++) {
			for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
				if (psort_info[j].merge_file[i]) {
					row_merge_file_destroy(
						psort_info[j].merge_file[i]);
				}

				ut_free(psort_info[j].block_alloc[i]);
				ut_free(psort_info[j].merge_file[i]);

				if (psort_info[j].crypt_alloc[i]) {
					ut_free(psort_info[j].crypt_alloc[i]);
				}
			}

			mutex_free(&psort_info[j].mutex);
		}

		os_event_destroy(merge_info[0].psort_common->sort_event);
		os_event_destroy(merge_info[0].psort_common->merge_event);
		ut_free(merge_info[0].psort_common->dup);
		ut_free(merge_info[0].psort_common);
		ut_free(psort_info);
	}

	ut_free(merge_info);
}
/*********************************************************************//**
Free up the merge buffers when the merge sort is done */
void
row_fts_free_pll_merge_buf(
/*=======================*/
	fts_psort_t*	psort_info)	/*!< in: parallel sort info */
{
	ulint	j;
	ulint	i;

	if (!psort_info) {
		return;
	}

	for (j = 0; j < fts_sort_pll_degree; j++) {
		for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
			row_merge_buf_free(psort_info[j].merge_buf[i]);
		}
	}

	return;
}

/*********************************************************************//**
FTS plugin parser 'mysql_add_word' callback function for row merge.
Refer to 'st_mysql_ftparser_param' for more detail.
@return always returns 0 */
static
int
row_merge_fts_doc_add_word_for_parser(
/*==================================*/
	MYSQL_FTPARSER_PARAM	*param,		/* in: parser parameter */
	const char		*word,		/* in: token word */
	int			word_len,	/* in: word length */
	MYSQL_FTPARSER_BOOLEAN_INFO*	boolean_info)	/* in: boolean info */
{
	fts_string_t		str;
	fts_tokenize_ctx_t*	t_ctx;
	row_fts_token_t*	fts_token;
	byte*			ptr;

	ut_ad(param);
	ut_ad(param->mysql_ftparam);
	ut_ad(word);
	ut_ad(boolean_info);

	t_ctx = static_cast<fts_tokenize_ctx_t*>(param->mysql_ftparam);
	ut_ad(t_ctx);

	str.f_str = (byte*)(word);
	str.f_len = ulint(word_len);
	str.f_n_char = fts_get_token_size(
		(CHARSET_INFO*) param->cs, word, ulint(word_len));

	/* JAN: TODO: MySQL 5.7 FTS
	ut_ad(boolean_info->position >= 0);
	*/

	ptr = static_cast<byte*>(ut_malloc_nokey(sizeof(row_fts_token_t)
			+ sizeof(fts_string_t) + str.f_len));
	fts_token = reinterpret_cast<row_fts_token_t*>(ptr);
	fts_token->text = reinterpret_cast<fts_string_t*>(
		ptr + sizeof(row_fts_token_t));
	fts_token->text->f_str = static_cast<byte*>(
		ptr + sizeof(row_fts_token_t) + sizeof(fts_string_t));

	fts_token->text->f_len = str.f_len;
	fts_token->text->f_n_char = str.f_n_char;
	memcpy(fts_token->text->f_str, str.f_str, str.f_len);

	/* JAN: TODO: MySQL 5.7 FTS
	fts_token->position = boolean_info->position;
	*/

	/* Add the token to the list */
	UT_LIST_ADD_LAST(t_ctx->fts_token_list, fts_token);

	return(0);
}

/*********************************************************************//**
Tokenize a document using the FTS plugin parser */
static
void
row_merge_fts_doc_tokenize_by_parser(
/*=================================*/
	fts_doc_t*		doc,	/* in: doc to tokenize */
	st_mysql_ftparser*	parser,	/* in: plugin parser instance */
	fts_tokenize_ctx_t*	t_ctx)	/* in/out: tokenize ctx instance */
{
	MYSQL_FTPARSER_PARAM	param;

	ut_a(parser);

	/* Set the parameters for param */
	param.mysql_parse = fts_tokenize_document_internal;
	param.mysql_add_word = row_merge_fts_doc_add_word_for_parser;
	param.mysql_ftparam = t_ctx;
	param.cs = doc->charset;
	param.doc = reinterpret_cast<char*>(doc->text.f_str);
	param.length = static_cast<int>(doc->text.f_len);
	param.mode = MYSQL_FTPARSER_SIMPLE_MODE;

	PARSER_INIT(parser, &param);
	/* We assume parse returns successfully here. */
	parser->parse(&param);
	PARSER_DEINIT(parser, &param);
}
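
/* Callback flow, as implemented above: parser->parse() walks the document
and invokes param.mysql_add_word (here row_merge_fts_doc_add_word_for_parser)
once per token; that callback copies the token and appends it to
t_ctx->fts_token_list, which row_merge_fts_doc_tokenize() below then drains
one token at a time. */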

/*********************************************************************//**
Tokenize incoming text data and add it to the sort buffer.
@see row_merge_buf_encode()
@return TRUE if the record passed, FALSE if out of space */
static
ibool
row_merge_fts_doc_tokenize(
/*=======================*/
	row_merge_buf_t**	sort_buf,	/*!< in/out: sort buffer */
	doc_id_t		doc_id,		/*!< in: Doc ID */
	fts_doc_t*		doc,		/*!< in: Doc to be tokenized */
	merge_file_t**		merge_file,	/*!< in/out: merge file */
	ibool			opt_doc_id_size,/*!< in: whether to use 4 bytes
						instead of 8 bytes integer to
						store Doc ID during sort */
	fts_tokenize_ctx_t*	t_ctx)		/*!< in/out: tokenize context */
{
	ulint		inc = 0;
	fts_string_t	str;
	ulint		len;
	row_merge_buf_t* buf;
	dfield_t*	field;
	fts_string_t	t_str;
	ibool		buf_full = FALSE;
	byte		str_buf[FTS_MAX_WORD_LEN + 1];
	ulint		data_size[FTS_NUM_AUX_INDEX];
	ulint		n_tuple[FTS_NUM_AUX_INDEX];
	st_mysql_ftparser*	parser;

	t_str.f_n_char = 0;
	t_ctx->buf_used = 0;

	memset(n_tuple, 0, FTS_NUM_AUX_INDEX * sizeof(ulint));
	memset(data_size, 0, FTS_NUM_AUX_INDEX * sizeof(ulint));

	parser = sort_buf[0]->index->parser;

	/* Tokenize the data and add each word string, its corresponding
	doc id and position to the sort buffer */
	while (t_ctx->processed_len < doc->text.f_len) {
		ulint		idx = 0;
		ulint		cur_len;
		doc_id_t	write_doc_id;
		row_fts_token_t* fts_token = NULL;

		if (parser != NULL) {
			if (t_ctx->processed_len == 0) {
				UT_LIST_INIT(t_ctx->fts_token_list, &row_fts_token_t::token_list);

				/* Parse the whole doc and cache tokens */
				row_merge_fts_doc_tokenize_by_parser(doc,
					parser, t_ctx);

				/* Just indicate that we have parsed all
				the words */
				t_ctx->processed_len += 1;
			}

			/* Then get a token */
			fts_token = UT_LIST_GET_FIRST(t_ctx->fts_token_list);
			if (fts_token) {
				str.f_len = fts_token->text->f_len;
				str.f_n_char = fts_token->text->f_n_char;
				str.f_str = fts_token->text->f_str;
			} else {
				ut_ad(UT_LIST_GET_LEN(t_ctx->fts_token_list) == 0);
				/* Reached the end of the list */
				t_ctx->processed_len = doc->text.f_len;
				break;
			}
		} else {
			inc = innobase_mysql_fts_get_token(
				doc->charset,
				doc->text.f_str + t_ctx->processed_len,
				doc->text.f_str + doc->text.f_len, &str);

			ut_a(inc > 0);
		}

		/* Ignore tokens whose character count is less than
		"fts_min_token_size" or more than "fts_max_token_size" */
		if (!fts_check_token(&str, NULL, NULL)) {
			if (parser != NULL) {
				UT_LIST_REMOVE(t_ctx->fts_token_list, fts_token);
				ut_free(fts_token);
			} else {
				t_ctx->processed_len += inc;
			}

			continue;
		}

		t_str.f_len = innobase_fts_casedn_str(
			doc->charset, (char*) str.f_str, str.f_len,
			(char*) &str_buf, FTS_MAX_WORD_LEN + 1);

		t_str.f_str = (byte*) &str_buf;

		/* If "cached_stopword" is defined, ignore words in the
		stopword list */
		if (!fts_check_token(&str, t_ctx->cached_stopword,
				     doc->charset)) {
			if (parser != NULL) {
				UT_LIST_REMOVE(t_ctx->fts_token_list, fts_token);
				ut_free(fts_token);
			} else {
				t_ctx->processed_len += inc;
			}

			continue;
		}

		/* There are FTS_NUM_AUX_INDEX auxiliary tables; find
		out which sort buffer to put this word record in */
		t_ctx->buf_used = fts_select_index(
			doc->charset, t_str.f_str, t_str.f_len);

		buf = sort_buf[t_ctx->buf_used];

		ut_a(t_ctx->buf_used < FTS_NUM_AUX_INDEX);
		idx = t_ctx->buf_used;

		mtuple_t* mtuple = &buf->tuples[buf->n_tuples + n_tuple[idx]];

		field = mtuple->fields = static_cast<dfield_t*>(
			mem_heap_alloc(buf->heap,
				       FTS_NUM_FIELDS_SORT * sizeof *field));

		/* The first field is the tokenized word */
		dfield_set_data(field, t_str.f_str, t_str.f_len);
		len = dfield_get_len(field);

		dict_col_copy_type(dict_index_get_nth_col(buf->index, 0), &field->type);
		field->type.prtype |= DATA_NOT_NULL;
		ut_ad(len <= field->type.len);

		/* For the temporary file, row_merge_buf_encode() uses
		1 byte for representing the number of extra_size bytes.
		This number will always be 1, because for this 3-field index
		consisting of one variable-size column, extra_size will always
		be 1 or 2, which can be encoded in one byte.

		The extra_size is 1 byte if the length of the
		variable-length column is less than 128 bytes or the
		maximum length is less than 256 bytes. */

		/* The word is the single variable-length column, and its
		length is capped by fts_max_token_size; account for the
		extra_size byte(s) plus the one-byte extra_size count.
		Because the maximum FTS token length can now exceed 255
		bytes, word lengths of up to 127 bytes are encoded in one
		extra_size byte, and anything longer needs two. */
		if (len < 128 || field->type.len < 256) {
			/* Extra size is one byte. */
			cur_len = 2 + len;
		} else {
			/* Extra size is two bytes. */
			cur_len = 3 + len;
		}
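
		/* Worked example: a 3-byte latin1 token such as "cat" has
		len = 3 < 128, so cur_len = 2 + 3 = 5 here; the Doc ID
		(4 or 8 bytes) and position (4 bytes) fields are added to
		cur_len below. */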

		dfield_dup(field, buf->heap);
		field++;

		/* The second field is the Doc ID */

		ib_uint32_t	doc_id_32_bit;

		if (!opt_doc_id_size) {
			fts_write_doc_id((byte*) &write_doc_id, doc_id);

			dfield_set_data(
				field, &write_doc_id, sizeof(write_doc_id));
		} else {
			mach_write_to_4(
				(byte*) &doc_id_32_bit, (ib_uint32_t) doc_id);

			dfield_set_data(
				field, &doc_id_32_bit, sizeof(doc_id_32_bit));
		}

		len = field->len;
		ut_ad(len == FTS_DOC_ID_LEN || len == sizeof(ib_uint32_t));

		field->type.mtype = DATA_INT;
		field->type.prtype = DATA_NOT_NULL | DATA_BINARY_TYPE;
		field->type.len = len;
		field->type.mbminlen = 0;
		field->type.mbmaxlen = 0;

		cur_len += len;
		dfield_dup(field, buf->heap);

		++field;

		/* The third field is the position.
		MySQL 5.7 changed the fulltext parser plugin interface
		by adding MYSQL_FTPARSER_BOOLEAN_INFO::position.
		Below we assume that the field is always 0. */
		ulint	pos = t_ctx->init_pos;
		byte	position[4];
		if (parser == NULL) {
			pos += t_ctx->processed_len + inc - str.f_len;
		}
		len = 4;
		mach_write_to_4(position, pos);
		dfield_set_data(field, &position, len);

		field->type.mtype = DATA_INT;
		field->type.prtype = DATA_NOT_NULL;
		field->type.len = len;
		field->type.mbminlen = 0;
		field->type.mbmaxlen = 0;
		cur_len += len;
		dfield_dup(field, buf->heap);

		/* Reserve one byte for the end marker of row_merge_block_t */
		if (buf->total_size + data_size[idx] + cur_len
		    >= srv_sort_buf_size - 1) {

			buf_full = TRUE;
			break;
		}

		/* Increment the number of tuples */
		n_tuple[idx]++;
		if (parser != NULL) {
			UT_LIST_REMOVE(t_ctx->fts_token_list, fts_token);
			ut_free(fts_token);
		} else {
			t_ctx->processed_len += inc;
		}
		data_size[idx] += cur_len;
	}

	/* Update the data length and the number of new word tuples
	added in this round of tokenization */
	for (ulint i = 0; i < FTS_NUM_AUX_INDEX; i++) {
		/* The computation of total_size below assumes that no
		delete-mark flags will be stored and that all fields
		are NOT NULL and fixed-length. */

		sort_buf[i]->total_size += data_size[i];

		sort_buf[i]->n_tuples += n_tuple[i];

		merge_file[i]->n_rec += n_tuple[i];
		t_ctx->rows_added[i] += n_tuple[i];
	}

	if (!buf_full) {
		/* We pad one byte between the text across two fields */
		t_ctx->init_pos += doc->text.f_len + 1;
	}
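
	/* Position padding example: if the first indexed column of a doc
	is "aa bb" (f_len = 5), init_pos advances to 6, so the first token
	of the next column gets position 6 rather than overlapping the
	previous column's positions. */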

	return(!buf_full);
}

/*********************************************************************//**
Get the next doc item from fts_doc_list */
UNIV_INLINE
void
row_merge_fts_get_next_doc_item(
/*============================*/
	fts_psort_t*		psort_info,	/*!< in: psort_info */
	fts_doc_item_t**	doc_item)	/*!< in/out: doc item */
{
	if (*doc_item != NULL) {
		ut_free(*doc_item);
	}

	mutex_enter(&psort_info->mutex);

	*doc_item = UT_LIST_GET_FIRST(psort_info->fts_doc_list);
	if (*doc_item != NULL) {
		UT_LIST_REMOVE(psort_info->fts_doc_list, *doc_item);

		ut_ad(psort_info->memory_used >= sizeof(fts_doc_item_t)
		      + (*doc_item)->field->len);
		psort_info->memory_used -= sizeof(fts_doc_item_t)
			+ (*doc_item)->field->len;
	}

	mutex_exit(&psort_info->mutex);
}

/*********************************************************************//**
Function performs parallel tokenization of the incoming doc strings.
It also performs the initial in-memory sort of the parsed records.
@return OS_THREAD_DUMMY_RETURN */
static
os_thread_ret_t
DECLARE_THREAD(fts_parallel_tokenization)(
/*======================*/
	void*		arg)	/*!< in: psort_info for the thread */
{
	fts_psort_t*		psort_info = (fts_psort_t*) arg;
	ulint			i;
	fts_doc_item_t*		doc_item = NULL;
	row_merge_buf_t**	buf;
	ibool			processed = FALSE;
	merge_file_t**		merge_file;
	row_merge_block_t**	block;
	row_merge_block_t**	crypt_block;
	pfs_os_file_t		tmpfd[FTS_NUM_AUX_INDEX];
	ulint			mycount[FTS_NUM_AUX_INDEX];
	ib_uint64_t		total_rec = 0;
	ulint			num_doc_processed = 0;
	doc_id_t		last_doc_id = 0;
	mem_heap_t*		blob_heap = NULL;
	fts_doc_t		doc;
	dict_table_t*		table = psort_info->psort_common->new_table;
	fts_tokenize_ctx_t	t_ctx;
	ulint			retried = 0;
	dberr_t			error = DB_SUCCESS;

	ut_ad(psort_info->psort_common->trx->mysql_thd != NULL);

	const char*	path = thd_innodb_tmpdir(
		psort_info->psort_common->trx->mysql_thd);

	ut_ad(psort_info);

	buf = psort_info->merge_buf;
	merge_file = psort_info->merge_file;
	blob_heap = mem_heap_create(512);
	memset(&doc, 0, sizeof(doc));
	memset(&t_ctx, 0, sizeof(t_ctx));
	memset(mycount, 0, sizeof(mycount));

	doc.charset = fts_index_get_charset(
		psort_info->psort_common->dup->index);

	block = psort_info->merge_block;
	crypt_block = psort_info->crypt_block;

	const page_size_t&	page_size = dict_table_page_size(table);

	row_merge_fts_get_next_doc_item(psort_info, &doc_item);

	t_ctx.cached_stopword = table->fts->cache->stopword_info.cached_stopword;
	processed = TRUE;
loop:
	while (doc_item) {
		dfield_t*	dfield = doc_item->field;

		last_doc_id = doc_item->doc_id;

		ut_ad(dfield->data != NULL
		      && dfield_get_len(dfield) != UNIV_SQL_NULL);

		/* If we have finished processing the last item, update
		"doc" with the strings in this doc_item; otherwise,
		continue processing the last item */
		if (processed) {
			byte*	data;
			ulint	data_len;

			dfield = doc_item->field;
			data = static_cast<byte*>(dfield_get_data(dfield));
			data_len = dfield_get_len(dfield);

			if (dfield_is_ext(dfield)) {
				doc.text.f_str =
					btr_copy_externally_stored_field(
						&doc.text.f_len, data,
						page_size, data_len, blob_heap);
			} else {
				doc.text.f_str = data;
				doc.text.f_len = data_len;
			}

			doc.tokens = 0;
			t_ctx.processed_len = 0;
		} else {
			/* Not yet finished processing the "doc" on hand;
			continue processing it */
			ut_ad(doc.text.f_str);
			ut_ad(t_ctx.processed_len < doc.text.f_len);
		}

		processed = row_merge_fts_doc_tokenize(
			buf, doc_item->doc_id, &doc,
			merge_file, psort_info->psort_common->opt_doc_id_size,
			&t_ctx);

		/* The current sort buffer is full and needs to be recycled */
		if (!processed) {
			ut_ad(t_ctx.processed_len < doc.text.f_len);
			ut_ad(t_ctx.rows_added[t_ctx.buf_used]);
			break;
		}

		num_doc_processed++;

		if (fts_enable_diag_print && num_doc_processed % 10000 == 1) {
			ib::info() << "Number of documents processed: "
				<< num_doc_processed;
#ifdef FTS_INTERNAL_DIAG_PRINT
			for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
				ib::info() << "ID " << psort_info->psort_id
					<< ", partition " << i << ", word "
					<< mycount[i];
			}
#endif
		}

		mem_heap_empty(blob_heap);

		row_merge_fts_get_next_doc_item(psort_info, &doc_item);

		if (doc_item && last_doc_id != doc_item->doc_id) {
			t_ctx.init_pos = 0;
		}
	}

	/* If we have run out of the current sort buffer, we need to sort
	and flush the sort buffer to disk */
	if (t_ctx.rows_added[t_ctx.buf_used] && !processed) {
		row_merge_buf_sort(buf[t_ctx.buf_used], NULL);
		row_merge_buf_write(buf[t_ctx.buf_used],
				    merge_file[t_ctx.buf_used],
				    block[t_ctx.buf_used]);

		if (!row_merge_write(merge_file[t_ctx.buf_used]->fd,
				     merge_file[t_ctx.buf_used]->offset++,
				     block[t_ctx.buf_used],
				     crypt_block[t_ctx.buf_used],
				     table->space->id)) {
			error = DB_TEMP_FILE_WRITE_FAIL;
			goto func_exit;
		}

		UNIV_MEM_INVALID(block[t_ctx.buf_used][0], srv_sort_buf_size);
		buf[t_ctx.buf_used] = row_merge_buf_empty(buf[t_ctx.buf_used]);
		mycount[t_ctx.buf_used] += t_ctx.rows_added[t_ctx.buf_used];
		t_ctx.rows_added[t_ctx.buf_used] = 0;

		ut_a(doc_item);
		goto loop;
	}

	/* The parent is done scanning; if we have finished processing
	all the docs, exit */
	if (psort_info->state == FTS_PARENT_COMPLETE) {
		if (UT_LIST_GET_LEN(psort_info->fts_doc_list) == 0) {
			goto exit;
		} else if (retried > 10000) {
			ut_ad(!doc_item);
			/* Retried too many times and cannot get a new record */
			ib::error() << "FTS parallel sort processed "
				<< num_doc_processed
				<< " records, the sort queue has "
				<< UT_LIST_GET_LEN(psort_info->fts_doc_list)
				<< " records, but the sort cannot fetch"
				" the next record";
			goto exit;
		}
	} else if (psort_info->state == FTS_PARENT_EXITING) {
		/* Parent abort */
		goto func_exit;
	}

	if (doc_item == NULL) {
		os_thread_yield();
	}

	row_merge_fts_get_next_doc_item(psort_info, &doc_item);

	if (doc_item != NULL) {
		if (last_doc_id != doc_item->doc_id) {
			t_ctx.init_pos = 0;
		}

		retried = 0;
	} else if (psort_info->state == FTS_PARENT_COMPLETE) {
		retried++;
	}

	goto loop;

exit:
	/* Do a final sort of the last (or latest) batch of records
	in block memory. Flush them to the temp file if the records
	cannot all be held in one block of memory */
	for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
		if (t_ctx.rows_added[i]) {
			row_merge_buf_sort(buf[i], NULL);
			row_merge_buf_write(
				buf[i], merge_file[i], block[i]);

			/* Write to the temp file, but only if records have
			been flushed to the temp file before (offset > 0):
			The pseudo code for the sort is as follows:

			while (there are rows) {
				tokenize rows, put result in block[]
				if (block[] runs out) {
					sort rows;
					write to temp file with
					row_merge_write();
					offset++;
				}
			}

			# write out the last batch
			if (offset > 0) {
				row_merge_write();
				offset++;
			} else {
				# no need to write anything
				offset stays 0
			}

			So if merge_file[i]->offset is 0 when we come to
			here as the last batch, this means the rows have
			never been flushed to the temp file, and they can
			all be held in memory */
			if (merge_file[i]->offset != 0) {
				if (!row_merge_write(merge_file[i]->fd,
						merge_file[i]->offset++,
						block[i],
						crypt_block[i],
						table->space->id)) {
					error = DB_TEMP_FILE_WRITE_FAIL;
					goto func_exit;
				}

				UNIV_MEM_INVALID(block[i][0],
						 srv_sort_buf_size);

				if (crypt_block[i]) {
					UNIV_MEM_INVALID(crypt_block[i][0],
							 srv_sort_buf_size);
				}
			}

			buf[i] = row_merge_buf_empty(buf[i]);
			t_ctx.rows_added[i] = 0;
		}
	}

	if (fts_enable_diag_print) {
		DEBUG_FTS_SORT_PRINT(" InnoDB_FTS: start merge sort\n");
	}

	for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
		if (!merge_file[i]->offset) {
			continue;
		}

		tmpfd[i] = row_merge_file_create_low(path);
		if (tmpfd[i] == OS_FILE_CLOSED) {
			error = DB_OUT_OF_MEMORY;
			goto func_exit;
		}

		error = row_merge_sort(psort_info->psort_common->trx,
				       psort_info->psort_common->dup,
				       merge_file[i], block[i], &tmpfd[i],
				       false, 0.0/* pct_progress */, 0.0/* pct_cost */,
				       crypt_block[i], table->space->id);

		if (error != DB_SUCCESS) {
			os_file_close(tmpfd[i]);
			goto func_exit;
		}

		total_rec += merge_file[i]->n_rec;
		os_file_close(tmpfd[i]);
	}

func_exit:
	if (fts_enable_diag_print) {
		DEBUG_FTS_SORT_PRINT(" InnoDB_FTS: complete merge sort\n");
	}

	mem_heap_free(blob_heap);

	mutex_enter(&psort_info->mutex);
	psort_info->error = error;
	mutex_exit(&psort_info->mutex);

	if (UT_LIST_GET_LEN(psort_info->fts_doc_list) > 0) {
		/* The child can exit either on error or when told to
		by the parent. */
		ut_ad(error != DB_SUCCESS
		      || psort_info->state == FTS_PARENT_EXITING);
	}

	/* Free the fts doc list in case of error. */
	do {
		row_merge_fts_get_next_doc_item(psort_info, &doc_item);
	} while (doc_item != NULL);

	psort_info->child_status = FTS_CHILD_COMPLETE;
	os_event_set(psort_info->psort_common->sort_event);
	psort_info->child_status = FTS_CHILD_EXITING;

	os_thread_exit();

	OS_THREAD_DUMMY_RETURN;
}

/*********************************************************************//**
Start the parallel tokenization and parallel merge sort */
void
row_fts_start_psort(
/*================*/
	fts_psort_t*	psort_info)	/*!< parallel sort structure */
{
	ulint		i = 0;
	os_thread_id_t	thd_id;

	for (i = 0; i < fts_sort_pll_degree; i++) {
		psort_info[i].psort_id = i;
		psort_info[i].thread_hdl =
			os_thread_create(fts_parallel_tokenization,
					 (void*) &psort_info[i],
					 &thd_id);
	}
}

/*********************************************************************//**
Function performs the merge and insertion of the sorted records.
@return OS_THREAD_DUMMY_RETURN */
static
os_thread_ret_t
DECLARE_THREAD(fts_parallel_merge)(
/*===============*/
	void*		arg)	/*!< in: parallel merge info */
{
	fts_psort_t*	psort_info = (fts_psort_t*) arg;
	ulint		id;

	ut_ad(psort_info);

	id = psort_info->psort_id;

	row_fts_merge_insert(psort_info->psort_common->dup->index,
			     psort_info->psort_common->new_table,
			     psort_info->psort_common->all_info, id);

	psort_info->child_status = FTS_CHILD_COMPLETE;
	os_event_set(psort_info->psort_common->merge_event);
	psort_info->child_status = FTS_CHILD_EXITING;

	os_thread_exit(false);

	OS_THREAD_DUMMY_RETURN;
}

/*********************************************************************//**
Kick off the parallel merge and insert threads */
void
row_fts_start_parallel_merge(
/*=========================*/
	fts_psort_t*	merge_info)	/*!< in: parallel sort info */
{
	ulint		i = 0;

	/* Kick off merge/insert threads */
	for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
		merge_info[i].psort_id = i;
		merge_info[i].child_status = 0;

		merge_info[i].thread_hdl = os_thread_create(
			fts_parallel_merge,
			(void*) &merge_info[i],
			&merge_info[i].thread_hdl);
	}
}

/**
Write out a single word's data as new entry/entries in the INDEX table.
@param[in]	ins_ctx	insert context
@param[in]	word	word string
@param[in]	node	node columns
@return DB_SUCCESS if insertion runs fine, otherwise error code */
static
dberr_t
row_merge_write_fts_node(
	const fts_psort_insert_t*	ins_ctx,
	const fts_string_t*		word,
	const fts_node_t*		node)
{
	dtuple_t*	tuple;
	dfield_t*	field;
	dberr_t		ret = DB_SUCCESS;
	doc_id_t	write_first_doc_id[8];
	doc_id_t	write_last_doc_id[8];
	ib_uint32_t	write_doc_count;

	tuple = ins_ctx->tuple;

	/* The first field is the tokenized word */
	field = dtuple_get_nth_field(tuple, 0);
	dfield_set_data(field, word->f_str, word->f_len);

	/* The second field is first_doc_id */
	field = dtuple_get_nth_field(tuple, 1);
	fts_write_doc_id((byte*) &write_first_doc_id, node->first_doc_id);
	dfield_set_data(field, &write_first_doc_id, sizeof(doc_id_t));

	/* The third and fourth fields (TRX_ID, ROLL_PTR) are filled already. */
	/* The fifth field is last_doc_id */
	field = dtuple_get_nth_field(tuple, 4);
	fts_write_doc_id((byte*) &write_last_doc_id, node->last_doc_id);
	dfield_set_data(field, &write_last_doc_id, sizeof(doc_id_t));

	/* The sixth field is doc_count */
	field = dtuple_get_nth_field(tuple, 5);
	mach_write_to_4((byte*) &write_doc_count, (ib_uint32_t) node->doc_count);
	dfield_set_data(field, &write_doc_count, sizeof(ib_uint32_t));

	/* The seventh field is ilist */
	field = dtuple_get_nth_field(tuple, 6);
	dfield_set_data(field, node->ilist, node->ilist_size);

	ret = ins_ctx->btr_bulk->insert(tuple);

	return(ret);
}
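
/* Row layout of one entry in the auxiliary FTS_INDEX_TABLE, as filled in
above (TRX_ID and ROLL_PTR are set once per batch in row_fts_merge_insert()):

	0: word          1: first_doc_id  2: TRX_ID  3: ROLL_PTR
	4: last_doc_id   5: doc_count     6: ilist (encoded doc ids/positions)
*/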

/********************************************************************//**
Insert processed FTS data into the auxiliary index tables.
@return DB_SUCCESS if insertion runs fine */
static MY_ATTRIBUTE((nonnull))
dberr_t
row_merge_write_fts_word(
/*=====================*/
	fts_psort_insert_t*	ins_ctx,	/*!< in: insert context */
	fts_tokenizer_word_t*	word)		/*!< in: sorted and tokenized
						word */
{
	dberr_t	ret = DB_SUCCESS;

	ut_ad(ins_ctx->aux_index_id == fts_select_index(
		ins_ctx->charset, word->text.f_str, word->text.f_len));

	/* Pop out each fts_node in word->nodes and write them to the
	auxiliary table */
	for (ulint i = 0; i < ib_vector_size(word->nodes); i++) {
		dberr_t		error;
		fts_node_t*	fts_node;

		fts_node = static_cast<fts_node_t*>(ib_vector_get(word->nodes, i));

		error = row_merge_write_fts_node(ins_ctx, &word->text, fts_node);

		if (error != DB_SUCCESS) {
			ib::error() << "Failed to write word "
				<< word->text.f_str << " to FTS auxiliary"
				" index table, error (" << ut_strerr(error)
				<< ")";
			ret = error;
		}

		ut_free(fts_node->ilist);
		fts_node->ilist = NULL;
	}

	ib_vector_reset(word->nodes);

	return(ret);
}

/*********************************************************************//**
Read the sorted FTS data files and insert data tuples into the auxiliary
tables */
static
void
row_fts_insert_tuple(
/*=================*/
	fts_psort_insert_t*
			ins_ctx,	/*!< in: insert context */
	fts_tokenizer_word_t*	word,	/*!< in: last processed
					tokenized word */
	ib_vector_t*	positions,	/*!< in: word position */
	doc_id_t*	in_doc_id,	/*!< in: last item doc id */
	dtuple_t*	dtuple)		/*!< in: entry to insert */
{
	fts_node_t*	fts_node = NULL;
	dfield_t*	dfield;
	doc_id_t	doc_id;
	ulint		position;
	fts_string_t	token_word;
	ulint		i;

	/* Get the fts_node for the FTS auxiliary INDEX table */
	if (ib_vector_size(word->nodes) > 0) {
		fts_node = static_cast<fts_node_t*>(
			ib_vector_last(word->nodes));
	}

	if (fts_node == NULL
	    || fts_node->ilist_size > FTS_ILIST_MAX_SIZE) {

		fts_node = static_cast<fts_node_t*>(
			ib_vector_push(word->nodes, NULL));

		memset(fts_node, 0x0, sizeof(*fts_node));
	}

	/* If dtuple == NULL, this is the last word to be processed */
	if (!dtuple) {
		if (fts_node && ib_vector_size(positions) > 0) {
			fts_cache_node_add_positions(
				NULL, fts_node, *in_doc_id,
				positions);

			/* Write out the current word */
			row_merge_write_fts_word(ins_ctx, word);
		}

		return;
	}

	/* Get the first field for the tokenized word */
	dfield = dtuple_get_nth_field(dtuple, 0);

	token_word.f_n_char = 0;
	token_word.f_len = dfield->len;
	token_word.f_str = static_cast<byte*>(dfield_get_data(dfield));

	if (!word->text.f_str) {
		fts_string_dup(&word->text, &token_word, ins_ctx->heap);
	}

	/* Compare to the last word, to see if they are the same
	word */
	if (innobase_fts_text_cmp(ins_ctx->charset,
				  &word->text, &token_word) != 0) {
		ulint	num_item;

		/* We are getting a new word; flush the last position
		info for the current word in fts_node */
		if (ib_vector_size(positions) > 0) {
			fts_cache_node_add_positions(
				NULL, fts_node, *in_doc_id, positions);
		}

		/* Write out the current word */
		row_merge_write_fts_word(ins_ctx, word);

		/* Copy the new word */
		fts_string_dup(&word->text, &token_word, ins_ctx->heap);

		num_item = ib_vector_size(positions);

		/* Clean up the position queue */
		for (i = 0; i < num_item; i++) {
			ib_vector_pop(positions);
		}

		/* Reset the Doc ID */
		*in_doc_id = 0;
		memset(fts_node, 0x0, sizeof(*fts_node));
	}

	/* Get the word's Doc ID */
	dfield = dtuple_get_nth_field(dtuple, 1);

	if (!ins_ctx->opt_doc_id_size) {
		doc_id = fts_read_doc_id(
			static_cast<byte*>(dfield_get_data(dfield)));
	} else {
		doc_id = (doc_id_t) mach_read_from_4(
			static_cast<byte*>(dfield_get_data(dfield)));
	}

	/* Get the word's position info */
	dfield = dtuple_get_nth_field(dtuple, 2);
	position = mach_read_from_4(static_cast<byte*>(dfield_get_data(dfield)));

	/* If this is the same word as the last word, and they
	have the same Doc ID, we just need to add its position
	info. Otherwise, we will flush the position info to the
	fts_node and initiate a new position vector */
	if (!(*in_doc_id) || *in_doc_id == doc_id) {
		ib_vector_push(positions, &position);
	} else {
		ulint	num_pos = ib_vector_size(positions);

		fts_cache_node_add_positions(NULL, fts_node,
					     *in_doc_id, positions);
		for (i = 0; i < num_pos; i++) {
			ib_vector_pop(positions);
		}
		ib_vector_push(positions, &position);
	}

	/* Record the current Doc ID */
	*in_doc_id = doc_id;
}
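
/* State transitions in row_fts_insert_tuple(), summarized from the code
above: (1) same word, same Doc ID: just push the position; (2) same word,
new Doc ID: flush the accumulated positions into fts_node and start a new
position vector; (3) new word: flush the positions, write the completed word
out with row_merge_write_fts_word(), and reset fts_node and the Doc ID;
(4) dtuple == NULL: final flush of the last word. */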

/*********************************************************************//**
Propagate a newly added record up one level in the selection tree
@return parent where this value propagated to */
static
ulint
row_fts_sel_tree_propagate(
/*=======================*/
	ulint		propagated,	/*<! in: tree node propagated */
	int*		sel_tree,	/*<! in: selection tree */
	const mrec_t**	mrec,		/*<! in: sort record */
	ulint**		offsets,	/*<! in: record offsets */
	dict_index_t*	index)		/*<! in/out: FTS index */
{
	ulint	parent;
	int	child_left;
	int	child_right;
	int	selected;

	/* Find which parent this value will be propagated to */
	parent = (propagated - 1) / 2;

	/* Find out which child value is smaller, and propagate it */
	child_left = sel_tree[parent * 2 + 1];
	child_right = sel_tree[parent * 2 + 2];

	if (child_left == -1 || mrec[child_left] == NULL) {
		if (child_right == -1
		    || mrec[child_right] == NULL) {
			selected = -1;
		} else {
			selected = child_right;
		}
	} else if (child_right == -1
		   || mrec[child_right] == NULL) {
		selected = child_left;
	} else if (cmp_rec_rec_simple(mrec[child_left], mrec[child_right],
				      offsets[child_left],
				      offsets[child_right],
				      index, NULL) < 0) {
		selected = child_left;
	} else {
		selected = child_right;
	}

	sel_tree[parent] = selected;

	return parent;
}
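
/* The selection tree is stored as an implicit binary tree in the sel_tree[]
array, with the usual heap indexing: the parent of node i is (i - 1) / 2 and
the children of parent p are 2p + 1 and 2p + 2. For example, with
fts_sort_pll_degree == 4 the tree occupies 7 slots: sel_tree[0] is the root,
sel_tree[1..2] the internal level, and sel_tree[3..6] the leaves, one per
sort stream. */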

/*********************************************************************//**
Readjust the selection tree after popping the root and reading a new value
@return the new root */
static
int
row_fts_sel_tree_update(
/*====================*/
	int*		sel_tree,	/*<! in/out: selection tree */
	ulint		propagated,	/*<! in: node to propagate up */
	ulint		height,		/*<! in: tree height */
	const mrec_t**	mrec,		/*<! in: sort record */
	ulint**		offsets,	/*<! in: record offsets */
	dict_index_t*	index)		/*<! in: index dictionary */
{
	ulint	i;

	for (i = 1; i <= height; i++) {
		propagated = row_fts_sel_tree_propagate(
			propagated, sel_tree, mrec, offsets, index);
	}

	return(sel_tree[0]);
}

/*********************************************************************//**
Build the selection tree at a specified level */
static
void
row_fts_build_sel_tree_level(
/*=========================*/
	int*		sel_tree,	/*<! in/out: selection tree */
	ulint		level,		/*<! in: selection tree level */
	const mrec_t**	mrec,		/*<! in: sort record */
	ulint**		offsets,	/*<! in: record offsets */
	dict_index_t*	index)		/*<! in: index dictionary */
{
	ulint	start;
	int	child_left;
	int	child_right;
	ulint	i;
	ulint	num_item = ulint(1) << level;

	start = num_item - 1;

	for (i = 0; i < num_item; i++) {
		child_left = sel_tree[(start + i) * 2 + 1];
		child_right = sel_tree[(start + i) * 2 + 2];

		if (child_left == -1) {
			if (child_right == -1) {
				sel_tree[start + i] = -1;
			} else {
				sel_tree[start + i] = child_right;
			}
			continue;
		} else if (child_right == -1) {
			sel_tree[start + i] = child_left;
			continue;
		}

		/* Deal with NULL child conditions */
		if (!mrec[child_left]) {
			if (!mrec[child_right]) {
				sel_tree[start + i] = -1;
			} else {
				sel_tree[start + i] = child_right;
			}
			continue;
		} else if (!mrec[child_right]) {
			sel_tree[start + i] = child_left;
			continue;
		}

		/* Select the smaller one to set the parent pointer */
		int cmp = cmp_rec_rec_simple(
			mrec[child_left], mrec[child_right],
			offsets[child_left], offsets[child_right],
			index, NULL);

		sel_tree[start + i] = cmp < 0 ? child_left : child_right;
	}
}

/*********************************************************************//**
Build a selection tree for the merge. The selection tree is a binary tree
of height ceil(log2(fts_sort_pll_degree)), with the root at level 0
@return number of tree levels */
static
ulint
row_fts_build_sel_tree(
/*===================*/
	int*		sel_tree,	/*<! in/out: selection tree */
	const mrec_t**	mrec,		/*<! in: sort record */
	ulint**		offsets,	/*<! in: record offsets */
	dict_index_t*	index)		/*<! in: index dictionary */
{
	ulint	treelevel = 1;
	ulint	num = 2;
	ulint	i = 0;
	ulint	start;

	/* No need to build a selection tree if we only have two merge
	streams */
	if (fts_sort_pll_degree <= 2) {
		return(0);
	}

	while (num < fts_sort_pll_degree) {
		num = num << 1;
		treelevel++;
	}

	start = (ulint(1) << treelevel) - 1;

	for (i = 0; i < fts_sort_pll_degree; i++) {
		sel_tree[i + start] = int(i);
	}

	/* Build each level bottom-up, down to and including the root at
	level 0, so that sel_tree[0] is initialized before it is read */
	for (i = treelevel; i--; ) {
		row_fts_build_sel_tree_level(
			sel_tree, i, mrec, offsets, index);
	}

	return(treelevel);
}
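
/* Worked example: with fts_sort_pll_degree == 4, the loop above doubles num
from 2 to 4 and returns treelevel == 2. The leaves start at
start == (1 << 2) - 1 == 3, so streams 0..3 are seeded into sel_tree[3..6];
row_fts_build_sel_tree_level() then fills sel_tree[1..2] (level 1) and
sel_tree[0] (the root), which row_fts_merge_insert() reads as the overall
minimum. */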

/*********************************************************************//**
Read the sorted files containing index data tuples and insert these data
tuples to the index
@return DB_SUCCESS or error number */
dberr_t
row_fts_merge_insert(
/*=================*/
	dict_index_t*	index,	/*!< in: index */
	dict_table_t*	table,	/*!< in: new table */
	fts_psort_t*	psort_info, /*!< parallel sort info */
	ulint		id)	/*!< in: which auxiliary table's data
				to insert to */
{
	const byte**		b;
	mem_heap_t*		tuple_heap;
	mem_heap_t*		heap;
	dberr_t			error = DB_SUCCESS;
	ulint*			foffs;
	ulint**			offsets;
	fts_tokenizer_word_t	new_word;
	ib_vector_t*		positions;
	doc_id_t		last_doc_id;
	ib_alloc_t*		heap_alloc;
	ulint			i;
	mrec_buf_t**		buf;
	pfs_os_file_t*		fd;
	byte**			block;
	byte**			crypt_block;
	const mrec_t**		mrec;
	ulint			count = 0;
	int*			sel_tree;
	ulint			height;
	ulint			start;
	fts_psort_insert_t	ins_ctx;
	uint64_t		count_diag = 0;
	fts_table_t		fts_table;
	char			aux_table_name[MAX_FULL_NAME_LEN];
	dict_table_t*		aux_table;
	dict_index_t*		aux_index;
	trx_t*			trx;
	byte			trx_id_buf[6];
	roll_ptr_t		roll_ptr = 0;
	dfield_t*		field;

	ut_ad(index);
	ut_ad(table);

	/* We use the insert query graph as the dummy graph
	needed in the row module call */

	trx = trx_create();
	trx_start_if_not_started(trx, true);

	trx->op_info = "inserting index entries";

	ins_ctx.opt_doc_id_size = psort_info[0].psort_common->opt_doc_id_size;

	heap = mem_heap_create(500 + sizeof(mrec_buf_t));

	b = (const byte**) mem_heap_alloc(
		heap, sizeof (*b) * fts_sort_pll_degree);
	foffs = (ulint*) mem_heap_alloc(
		heap, sizeof(*foffs) * fts_sort_pll_degree);
	offsets = (ulint**) mem_heap_alloc(
		heap, sizeof(*offsets) * fts_sort_pll_degree);
	buf = (mrec_buf_t**) mem_heap_alloc(
		heap, sizeof(*buf) * fts_sort_pll_degree);
	fd = (pfs_os_file_t*) mem_heap_alloc(heap, sizeof(*fd) * fts_sort_pll_degree);
	block = (byte**) mem_heap_alloc(
		heap, sizeof(*block) * fts_sort_pll_degree);
	crypt_block = (byte**) mem_heap_alloc(
		heap, sizeof(*crypt_block) * fts_sort_pll_degree);
	mrec = (const mrec_t**) mem_heap_alloc(
		heap, sizeof(*mrec) * fts_sort_pll_degree);
	sel_tree = (int*) mem_heap_alloc(
		heap, sizeof(*sel_tree) * (fts_sort_pll_degree * 2));

	tuple_heap = mem_heap_create(1000);

	ins_ctx.charset = fts_index_get_charset(index);
	ins_ctx.heap = heap;

	for (i = 0; i < fts_sort_pll_degree; i++) {
		ulint	num;

		num = 1 + REC_OFFS_HEADER_SIZE
			+ dict_index_get_n_fields(index);
		offsets[i] = static_cast<ulint*>(mem_heap_zalloc(
			heap, num * sizeof *offsets[i]));
		offsets[i][0] = num;
		offsets[i][1] = dict_index_get_n_fields(index);
		block[i] = psort_info[i].merge_block[id];
		crypt_block[i] = psort_info[i].crypt_block[id];
		b[i] = psort_info[i].merge_block[id];
		fd[i] = psort_info[i].merge_file[id]->fd;
		foffs[i] = 0;

		buf[i] = static_cast<mrec_buf_t*>(
			mem_heap_alloc(heap, sizeof *buf[i]));

		count_diag += psort_info[i].merge_file[id]->n_rec;
	}

	if (fts_enable_diag_print) {
		ib::info() << "InnoDB_FTS: to insert " << count_diag
			<< " records";
	}

	/* Initialize related variables if creating FTS indexes */
	heap_alloc = ib_heap_allocator_create(heap);

	memset(&new_word, 0, sizeof(new_word));

	new_word.nodes = ib_vector_create(heap_alloc, sizeof(fts_node_t), 4);
	positions = ib_vector_create(heap_alloc, sizeof(ulint), 32);
	last_doc_id = 0;

	/* We should set the flags2 with aux_table_name here,
	in order to get the correct aux table names. */
	index->table->flags2 |= DICT_TF2_FTS_AUX_HEX_NAME;
	DBUG_EXECUTE_IF("innodb_test_wrong_fts_aux_table_name",
			index->table->flags2 &= ~DICT_TF2_FTS_AUX_HEX_NAME;);
	fts_table.type = FTS_INDEX_TABLE;
	fts_table.index_id = index->id;
	fts_table.table_id = table->id;
	fts_table.parent = index->table->name.m_name;
	fts_table.table = index->table;
	fts_table.suffix = fts_get_suffix(id);

	/* Get the aux index */
	fts_get_table_name(&fts_table, aux_table_name);
	aux_table = dict_table_open_on_name(aux_table_name, FALSE, FALSE,
					    DICT_ERR_IGNORE_NONE);
	ut_ad(aux_table != NULL);
	dict_table_close(aux_table, FALSE, FALSE);
	aux_index = dict_table_get_first_index(aux_table);

	ut_ad(!aux_index->is_instant());
	/* row_merge_write_fts_node() depends on the correct value */
	ut_ad(aux_index->n_core_null_bytes
	      == UT_BITS_IN_BYTES(aux_index->n_nullable));

	FlushObserver*	observer;
	observer = psort_info[0].psort_common->trx->flush_observer;

	/* Create the bulk load instance */
	ins_ctx.btr_bulk = UT_NEW_NOKEY(BtrBulk(aux_index, trx->id, observer));
	ins_ctx.btr_bulk->init();

	/* Create the tuple for insert */
	ins_ctx.tuple = dtuple_create(heap, dict_index_get_n_fields(aux_index));
	dict_index_copy_types(ins_ctx.tuple, aux_index,
			      dict_index_get_n_fields(aux_index));

	/* Set TRX_ID and ROLL_PTR */
	trx_write_trx_id(trx_id_buf, trx->id);
	field = dtuple_get_nth_field(ins_ctx.tuple, 2);
	dfield_set_data(field, &trx_id_buf, 6);

	field = dtuple_get_nth_field(ins_ctx.tuple, 3);
	dfield_set_data(field, &roll_ptr, 7);

#ifdef UNIV_DEBUG
	ins_ctx.aux_index_id = id;
#endif
	const ulint space = table->space->id;

	for (i = 0; i < fts_sort_pll_degree; i++) {
		if (psort_info[i].merge_file[id]->n_rec == 0) {
			/* No rows to read */
			mrec[i] = b[i] = NULL;
		} else {
			/* Read from the temp file only if it has been
			written to. Otherwise, the block memory holds
			all the sorted records */
			if (psort_info[i].merge_file[id]->offset > 0
			    && (!row_merge_read(
					fd[i], foffs[i],
					(row_merge_block_t*) block[i],
					(row_merge_block_t*) crypt_block[i],
					space))) {
				error = DB_CORRUPTION;
				goto exit;
			}

			ROW_MERGE_READ_GET_NEXT(i);
		}
	}

	height = row_fts_build_sel_tree(sel_tree, (const mrec_t **) mrec,
					offsets, index);

	start = (1U << height) - 1;

	/* Fetch the sorted records from the sort buffers and insert them
	into the corresponding FTS index auxiliary tables */
	for (;;) {
		dtuple_t*	dtuple;
		ulint		n_ext;
		int		min_rec = 0;

		if (fts_sort_pll_degree <= 2) {
			while (!mrec[min_rec]) {
				min_rec++;

				if (min_rec >= (int) fts_sort_pll_degree) {
					row_fts_insert_tuple(
						&ins_ctx, &new_word,
						positions, &last_doc_id,
						NULL);

					goto exit;
				}
			}

			for (i = min_rec + 1; i < fts_sort_pll_degree; i++) {
				if (!mrec[i]) {
					continue;
				}

				if (cmp_rec_rec_simple(
					    mrec[i], mrec[min_rec],
					    offsets[i], offsets[min_rec],
					    index, NULL) < 0) {
					min_rec = static_cast<int>(i);
				}
			}
		} else {
			min_rec = sel_tree[0];

			if (min_rec == -1) {
				row_fts_insert_tuple(
					&ins_ctx, &new_word,
					positions, &last_doc_id,
					NULL);

				goto exit;
			}
		}

		dtuple = row_rec_to_index_entry_low(
			mrec[min_rec], index, offsets[min_rec], &n_ext,
			tuple_heap);

		row_fts_insert_tuple(
			&ins_ctx, &new_word, positions,
			&last_doc_id, dtuple);

		ROW_MERGE_READ_GET_NEXT(min_rec);

		if (fts_sort_pll_degree > 2) {
			if (!mrec[min_rec]) {
				sel_tree[start + min_rec] = -1;
			}

			row_fts_sel_tree_update(sel_tree, start + min_rec,
						height, mrec,
						offsets, index);
		}

		count++;

		mem_heap_empty(tuple_heap);
	}

exit:
	fts_sql_commit(trx);

	trx->op_info = "";

	mem_heap_free(tuple_heap);

	error = ins_ctx.btr_bulk->finish(error);
	UT_DELETE(ins_ctx.btr_bulk);

	trx_free(trx);

	mem_heap_free(heap);

	if (fts_enable_diag_print) {
		ib::info() << "InnoDB_FTS: inserted " << count << " records";
	}

	return(error);
}