1/*****************************************************************************
2
3Copyright (c) 2007, 2018, Oracle and/or its affiliates. All Rights Reserved.
4Copyright (c) 2016, 2018, MariaDB Corporation.
5
6This program is free software; you can redistribute it and/or modify it under
7the terms of the GNU General Public License as published by the Free Software
8Foundation; version 2 of the License.
9
10This program is distributed in the hope that it will be useful, but WITHOUT
11ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13
14You should have received a copy of the GNU General Public License along with
15this program; if not, write to the Free Software Foundation, Inc.,
1651 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
17
18*****************************************************************************/
19
20/******************************************************************//**
21@file fts/fts0opt.cc
22Full Text Search optimize thread
23
24Created 2007/03/27 Sunny Bains
25Completed 2011/7/10 Sunny and Jimmy Yang
26
27***********************************************************************/
28
29#include "ha_prototypes.h"
30
31#include "fts0fts.h"
32#include "row0sel.h"
33#include "que0types.h"
34#include "fts0priv.h"
35#include "fts0types.h"
36#include "ut0wqueue.h"
37#include "srv0start.h"
38#include "ut0list.h"
39#include "zlib.h"
40
/** The FTS optimize thread's work queue. */
static ib_wqueue_t* fts_optimize_wq;

/** Vector that stores the fts_slot_t entries. */
static ib_vector_t* fts_slots;

/** Time to wait for a message, in microseconds (5 seconds). */
static const ulint FTS_QUEUE_WAIT_IN_USECS = 5000000;

/** Default optimize interval in seconds. */
static const ulint FTS_OPTIMIZE_INTERVAL_IN_SECS = 300;

/** The server is shutting down, so the optimize thread is exiting too. */
static bool fts_opt_start_shutdown = false;

/** Event to wait for shutdown of the optimize thread */
static os_event_t fts_opt_shutdown_event = NULL;

/** Initial size of the nodes vector in fts_word_t. */
static const ulint FTS_WORD_NODES_INIT_SIZE = 64;

/** Last time we checked whether the system needs a sync */
static ib_time_t last_check_sync_time;

#if 0
/** Check each table in round robin to see whether they'd
need to be "optimized" */
static ulint fts_optimize_sync_iterator = 0;
#endif
70
/** State of a table within the optimization sub system.
This is the type of fts_slot_t::state. */
enum fts_state_t {
	FTS_STATE_LOADED,
	FTS_STATE_RUNNING,
	FTS_STATE_SUSPENDED,
	FTS_STATE_DONE,
	FTS_STATE_EMPTY
};
79
/** FTS optimize thread message types.
This is the type of fts_msg_t::type. */
enum fts_msg_type_t {
	FTS_MSG_START,			/*!< Start optimizing thread */

	FTS_MSG_PAUSE,			/*!< Pause optimizing thread */

	FTS_MSG_STOP,			/*!< Stop optimizing and exit thread */

	FTS_MSG_ADD_TABLE,		/*!< Add table to the optimize thread's
					work queue */

	FTS_MSG_OPTIMIZE_TABLE,		/*!< Optimize a table */

	FTS_MSG_DEL_TABLE,		/*!< Remove a table from the optimize
					thread's work queue */
	FTS_MSG_SYNC_TABLE		/*!< Sync fts cache of a table */
};
97
/** Compressed list of words that have been read from the FTS INDEX
and that need to be optimized. */
struct fts_zip_t {
	lint		status;		/*!< Status of the last (un)zip
					operation (a zlib return code) */

	ulint		n_words;	/*!< Number of words compressed */

	ulint		block_sz;	/*!< Size of a block in bytes */

	ib_vector_t*	blocks;		/*!< Vector of pointers to the
					compressed blocks */

	ib_alloc_t*	heap_alloc;	/*!< Heap to use for allocations */

	ulint		pos;		/*!< Offset into blocks */

	ulint		last_big_block;	/*!< Offset of last block in the
					blocks array that is of size
					block_sz. Blocks beyond this offset
					are of size FTS_MAX_WORD_LEN */

	z_streamp	zp;		/*!< ZLib state */

	fts_string_t	word;		/*!< UTF-8 string: the value of the
					last word read from the FTS INDEX
					table. This is used to discard
					duplicates */

	ulint		max_words;	/*!< maximum number of words to read
					in one pass */
};
129
/** Prepared statements used during optimize */
struct fts_optimize_graph_t {
					/*!< Delete a word from FTS INDEX */
	que_t*		delete_nodes_graph;
					/*!< Insert a word into FTS INDEX */
	que_t*		write_nodes_graph;
					/*!< COMMIT a transaction */
	que_t*		commit_graph;
					/*!< Read the nodes from FTS_INDEX */
	que_t*		read_nodes_graph;
};
141
/** Used by fts_optimize() to store state. */
struct fts_optimize_t {
	trx_t*		trx;		/*!< The transaction used for all SQL */

	ib_alloc_t*	self_heap;	/*!< Heap to use for allocations */

	char*		name_prefix;	/*!< FTS table name prefix */

	fts_table_t	fts_index_table;/*!< FTS index (auxiliary) table
					definition; its suffix is switched
					between the auxiliary index tables
					while words are fetched */

					/*!< Common table definition */
	fts_table_t	fts_common_table;

	dict_table_t*	table;		/*!< Table that has to be queried */

	dict_index_t*	index;		/*!< The FTS index to be optimized */

	fts_doc_ids_t*	to_delete;	/*!< doc ids to delete, we check against
					this vector and purge the matching
					entries during the optimizing
					process. The vector entries are
					sorted on doc id */

	ulint		del_pos;	/*!< Offset within to_delete vector,
					this is used to keep track of where
					we are up to in the vector */

	ibool		done;		/*!< TRUE when optimize finishes */

	ib_vector_t*	words;		/*!< Word + Nodes read from FTS_INDEX,
					it contains instances of fts_word_t */

	fts_zip_t*	zip;		/*!< Words read from the FTS_INDEX */

	fts_optimize_graph_t		/*!< Prepared statements used during */
			graph;		/*optimize */

	ulint		n_completed;	/*!< Number of FTS indexes that have
					been optimized */
	ibool		del_list_regenerated;
					/*!< BEING_DELETED list regenerated */
};
184
/** Used by the optimize code to keep state while compacting nodes. */
struct fts_encode_t {
	doc_id_t	src_last_doc_id;/*!< Last doc id read from src node */
	byte*		src_ilist_ptr;	/*!< Current ptr within src ilist;
					advanced past each position list
					that has been copied */
};
190
/** We use this information to determine when to start the optimize
cycle for a table. Instances are stored in the fts_slots vector. */
struct fts_slot_t {
	dict_table_t*	table;		/*!< Table to optimize */

	table_id_t	table_id;	/*!< Table id */

	fts_state_t	state;		/*!< State of this slot */

	ulint		added;		/*!< Number of doc ids added since the
					last time this table was optimized */

	ulint		deleted;	/*!< Number of doc ids deleted since the
					last time this table was optimized */

	ib_time_t	last_run;	/*!< Time last run completed */

	ib_time_t	completed;	/*!< Optimize finish time */

	ib_time_t	interval_time;	/*!< Minimum time to wait before
					optimizing the table again. */
};
213
/** A table remove message for the FTS optimize thread. */
struct fts_msg_del_t {
	dict_table_t*	table;		/*!< The table to remove */

	os_event_t	event;		/*!< Event to synchronize acknowledgement
					of receipt and processing of this
					message by the consumer */
};
222
/** The message type of the FTS optimize work queue. */
struct fts_msg_t {
	fts_msg_type_t	type;		/*!< Message type */

	void*		ptr;		/*!< The message contents */

	mem_heap_t*	heap;		/*!< The heap used to allocate this
					message, the message consumer will
					free the heap. */
};
233
/** The number of words to read and optimize in a single pass. */
ulong	fts_num_word_optimize;

// FIXME
char	fts_enable_diag_print;

/** ZLib compressed block size, in bytes. */
static ulint FTS_ZIP_BLOCK_SIZE = 1024;

/** The amount of time optimizing in a single pass, in milliseconds. */
static ib_time_t fts_optimize_time_limit = 0;

/** It's defined in fts0fts.cc */
extern const char* fts_common_tables[];

/** SQL Statement for changing state of rows to be deleted from FTS Index:
copies the doc ids of $DELETED and $DELETED_CACHE into $BEING_DELETED and
$BEING_DELETED_CACHE respectively. */
static const char* fts_init_delete_sql =
	"BEGIN\n"
	"\n"
	"INSERT INTO $BEING_DELETED\n"
		"SELECT doc_id FROM $DELETED;\n"
	"\n"
	"INSERT INTO $BEING_DELETED_CACHE\n"
		"SELECT doc_id FROM $DELETED_CACHE;\n";

/** SQL Statement that removes one doc id (bound as :doc_id1 and :doc_id2)
from $DELETED and $DELETED_CACHE. */
static const char* fts_delete_doc_ids_sql =
	"BEGIN\n"
	"\n"
	"DELETE FROM $DELETED WHERE doc_id = :doc_id1;\n"
	"DELETE FROM $DELETED_CACHE WHERE doc_id = :doc_id2;\n";

/** SQL Statement that empties $BEING_DELETED and $BEING_DELETED_CACHE. */
static const char* fts_end_delete_sql =
	"BEGIN\n"
	"\n"
	"DELETE FROM $BEING_DELETED;\n"
	"DELETE FROM $BEING_DELETED_CACHE;\n";
270
271/**********************************************************************//**
272Initialize fts_zip_t. */
273static
274void
275fts_zip_initialize(
276/*===============*/
277 fts_zip_t* zip) /*!< out: zip instance to initialize */
278{
279 zip->pos = 0;
280 zip->n_words = 0;
281
282 zip->status = Z_OK;
283
284 zip->last_big_block = 0;
285
286 zip->word.f_len = 0;
287 *zip->word.f_str = 0;
288
289 ib_vector_reset(zip->blocks);
290
291 memset(zip->zp, 0, sizeof(*zip->zp));
292}
293
294/**********************************************************************//**
295Create an instance of fts_zip_t.
296@return a new instance of fts_zip_t */
297static
298fts_zip_t*
299fts_zip_create(
300/*===========*/
301 mem_heap_t* heap, /*!< in: heap */
302 ulint block_sz, /*!< in: size of a zip block.*/
303 ulint max_words) /*!< in: max words to read */
304{
305 fts_zip_t* zip;
306
307 zip = static_cast<fts_zip_t*>(mem_heap_zalloc(heap, sizeof(*zip)));
308
309 zip->word.f_str = static_cast<byte*>(
310 mem_heap_zalloc(heap, FTS_MAX_WORD_LEN + 1));
311
312 zip->block_sz = block_sz;
313
314 zip->heap_alloc = ib_heap_allocator_create(heap);
315
316 zip->blocks = ib_vector_create(zip->heap_alloc, sizeof(void*), 128);
317
318 zip->max_words = max_words;
319
320 zip->zp = static_cast<z_stream*>(
321 mem_heap_zalloc(heap, sizeof(*zip->zp)));
322
323 return(zip);
324}
325
326/**********************************************************************//**
327Initialize an instance of fts_zip_t. */
328static
329void
330fts_zip_init(
331/*=========*/
332
333 fts_zip_t* zip) /*!< in: zip instance to init */
334{
335 memset(zip->zp, 0, sizeof(*zip->zp));
336
337 zip->word.f_len = 0;
338 *zip->word.f_str = '\0';
339}
340
341/**********************************************************************//**
342Create a fts_optimizer_word_t instance.
343@return new instance */
344static
345fts_word_t*
346fts_word_init(
347/*==========*/
348 fts_word_t* word, /*!< in: word to initialize */
349 byte* utf8, /*!< in: UTF-8 string */
350 ulint len) /*!< in: length of string in bytes */
351{
352 mem_heap_t* heap = mem_heap_create(sizeof(fts_node_t));
353
354 memset(word, 0, sizeof(*word));
355
356 word->text.f_len = len;
357 word->text.f_str = static_cast<byte*>(mem_heap_alloc(heap, len + 1));
358
359 /* Need to copy the NUL character too. */
360 memcpy(word->text.f_str, utf8, word->text.f_len);
361 word->text.f_str[word->text.f_len] = 0;
362
363 word->heap_alloc = ib_heap_allocator_create(heap);
364
365 word->nodes = ib_vector_create(
366 word->heap_alloc, sizeof(fts_node_t), FTS_WORD_NODES_INIT_SIZE);
367
368 return(word);
369}
370
371/**********************************************************************//**
372Read the FTS INDEX row.
373@return fts_node_t instance */
374static
375fts_node_t*
376fts_optimize_read_node(
377/*===================*/
378 fts_word_t* word, /*!< in: */
379 que_node_t* exp) /*!< in: */
380{
381 int i;
382 fts_node_t* node = static_cast<fts_node_t*>(
383 ib_vector_push(word->nodes, NULL));
384
385 /* Start from 1 since the first node has been read by the caller */
386 for (i = 1; exp; exp = que_node_get_next(exp), ++i) {
387
388 dfield_t* dfield = que_node_get_val(exp);
389 byte* data = static_cast<byte*>(
390 dfield_get_data(dfield));
391 ulint len = dfield_get_len(dfield);
392
393 ut_a(len != UNIV_SQL_NULL);
394
395 /* Note: The column numbers below must match the SELECT */
396 switch (i) {
397 case 1: /* DOC_COUNT */
398 node->doc_count = mach_read_from_4(data);
399 break;
400
401 case 2: /* FIRST_DOC_ID */
402 node->first_doc_id = fts_read_doc_id(data);
403 break;
404
405 case 3: /* LAST_DOC_ID */
406 node->last_doc_id = fts_read_doc_id(data);
407 break;
408
409 case 4: /* ILIST */
410 node->ilist_size_alloc = node->ilist_size = len;
411 node->ilist = static_cast<byte*>(ut_malloc_nokey(len));
412 memcpy(node->ilist, data, len);
413 break;
414
415 default:
416 ut_error;
417 }
418 }
419
420 /* Make sure all columns were read. */
421 ut_a(i == 5);
422
423 return(node);
424}
425
426/**********************************************************************//**
427Callback function to fetch the rows in an FTS INDEX record.
428@return always returns non-NULL */
429ibool
430fts_optimize_index_fetch_node(
431/*==========================*/
432 void* row, /*!< in: sel_node_t* */
433 void* user_arg) /*!< in: pointer to ib_vector_t */
434{
435 fts_word_t* word;
436 sel_node_t* sel_node = static_cast<sel_node_t*>(row);
437 fts_fetch_t* fetch = static_cast<fts_fetch_t*>(user_arg);
438 ib_vector_t* words = static_cast<ib_vector_t*>(fetch->read_arg);
439 que_node_t* exp = sel_node->select_list;
440 dfield_t* dfield = que_node_get_val(exp);
441 void* data = dfield_get_data(dfield);
442 ulint dfield_len = dfield_get_len(dfield);
443 fts_node_t* node;
444 bool is_word_init = false;
445
446 ut_a(dfield_len <= FTS_MAX_WORD_LEN);
447
448 if (ib_vector_size(words) == 0) {
449
450 word = static_cast<fts_word_t*>(ib_vector_push(words, NULL));
451 fts_word_init(word, (byte*) data, dfield_len);
452 is_word_init = true;
453 }
454
455 word = static_cast<fts_word_t*>(ib_vector_last(words));
456
457 if (dfield_len != word->text.f_len
458 || memcmp(word->text.f_str, data, dfield_len)) {
459
460 word = static_cast<fts_word_t*>(ib_vector_push(words, NULL));
461 fts_word_init(word, (byte*) data, dfield_len);
462 is_word_init = true;
463 }
464
465 node = fts_optimize_read_node(word, que_node_get_next(exp));
466
467 fetch->total_memory += node->ilist_size;
468 if (is_word_init) {
469 fetch->total_memory += sizeof(fts_word_t)
470 + sizeof(ib_alloc_t) + sizeof(ib_vector_t) + dfield_len
471 + sizeof(fts_node_t) * FTS_WORD_NODES_INIT_SIZE;
472 } else if (ib_vector_size(words) > FTS_WORD_NODES_INIT_SIZE) {
473 fetch->total_memory += sizeof(fts_node_t);
474 }
475
476 if (fetch->total_memory >= fts_result_cache_limit) {
477 return(FALSE);
478 }
479
480 return(TRUE);
481}
482
483/**********************************************************************//**
484Read the rows from the FTS inde.
485@return DB_SUCCESS or error code */
486dberr_t
487fts_index_fetch_nodes(
488/*==================*/
489 trx_t* trx, /*!< in: transaction */
490 que_t** graph, /*!< in: prepared statement */
491 fts_table_t* fts_table, /*!< in: table of the FTS INDEX */
492 const fts_string_t*
493 word, /*!< in: the word to fetch */
494 fts_fetch_t* fetch) /*!< in: fetch callback.*/
495{
496 pars_info_t* info;
497 dberr_t error;
498 char table_name[MAX_FULL_NAME_LEN];
499
500 trx->op_info = "fetching FTS index nodes";
501
502 if (*graph) {
503 info = (*graph)->info;
504 } else {
505 ulint selected;
506
507 info = pars_info_create();
508
509 ut_a(fts_table->type == FTS_INDEX_TABLE);
510
511 selected = fts_select_index(fts_table->charset,
512 word->f_str, word->f_len);
513
514 fts_table->suffix = fts_get_suffix(selected);
515
516 fts_get_table_name(fts_table, table_name);
517
518 pars_info_bind_id(info, true, "table_name", table_name);
519 }
520
521 pars_info_bind_function(info, "my_func", fetch->read_record, fetch);
522 pars_info_bind_varchar_literal(info, "word", word->f_str, word->f_len);
523
524 if (!*graph) {
525
526 *graph = fts_parse_sql(
527 fts_table,
528 info,
529 "DECLARE FUNCTION my_func;\n"
530 "DECLARE CURSOR c IS"
531 " SELECT word, doc_count, first_doc_id, last_doc_id,"
532 " ilist\n"
533 " FROM $table_name\n"
534 " WHERE word LIKE :word\n"
535 " ORDER BY first_doc_id;\n"
536 "BEGIN\n"
537 "\n"
538 "OPEN c;\n"
539 "WHILE 1 = 1 LOOP\n"
540 " FETCH c INTO my_func();\n"
541 " IF c % NOTFOUND THEN\n"
542 " EXIT;\n"
543 " END IF;\n"
544 "END LOOP;\n"
545 "CLOSE c;");
546 }
547
548 for (;;) {
549 error = fts_eval_sql(trx, *graph);
550
551 if (error == DB_SUCCESS) {
552 fts_sql_commit(trx);
553
554 break; /* Exit the loop. */
555 } else {
556 fts_sql_rollback(trx);
557
558 if (error == DB_LOCK_WAIT_TIMEOUT) {
559 ib::warn() << "lock wait timeout reading"
560 " FTS index. Retrying!";
561
562 trx->error_state = DB_SUCCESS;
563 } else {
564 ib::error() << "(" << ut_strerr(error)
565 << ") while reading FTS index.";
566
567 break; /* Exit the loop. */
568 }
569 }
570 }
571
572 return(error);
573}
574
/**********************************************************************//**
Read (inflate) the next word from the compressed blocks.
Each word is stored as a length of type short followed by that many
bytes of text. The length is inflated first; once it is complete the
output is redirected to word->f_str and the text itself is inflated.
Blocks are freed as soon as the following block has been handed to ZLib,
and all remaining blocks are freed once inflate() stops returning Z_OK.
@return word->f_str if zip->status is Z_OK or Z_STREAM_END afterwards,
NULL otherwise (including when the stream had already ended on entry) */
static
byte*
fts_zip_read_word(
/*==============*/
	fts_zip_t*	zip,		/*!< in: Zip state + data */
	fts_string_t*	word)		/*!< out: uncompressed word */
{
	short		len = 0;
	void*		null = NULL;
	byte*		ptr = word->f_str;
	int		flush = Z_NO_FLUSH;

	/* Either there was an error or we are at the Z_STREAM_END. */
	if (zip->status != Z_OK) {
		return(NULL);
	}

	/* First inflate the length prefix into the local variable. */
	zip->zp->next_out = reinterpret_cast<byte*>(&len);
	zip->zp->avail_out = sizeof(len);

	while (zip->status == Z_OK && zip->zp->avail_out > 0) {

		/* Finished decompressing block. */
		if (zip->zp->avail_in == 0) {

			/* Free the block thats been decompressed. */
			if (zip->pos > 0) {
				ulint	prev = zip->pos - 1;

				/* NOTE(review): this assertion requires that
				another block follows whenever a block has
				been consumed, so the Z_FINISH branch below
				looks reachable only with pos == 0 — confirm. */
				ut_a(zip->pos < ib_vector_size(zip->blocks));

				ut_free(ib_vector_getp(zip->blocks, prev));
				ib_vector_set(zip->blocks, prev, &null);
			}

			/* Any more blocks to decompress. */
			if (zip->pos < ib_vector_size(zip->blocks)) {

				zip->zp->next_in = static_cast<byte*>(
					ib_vector_getp(
						zip->blocks, zip->pos));

				/* Blocks after last_big_block are the small
				trailing ones. */
				if (zip->pos > zip->last_big_block) {
					zip->zp->avail_in =
						FTS_MAX_WORD_LEN;
				} else {
					zip->zp->avail_in =
						static_cast<uInt>(zip->block_sz);
				}

				++zip->pos;
			} else {
				flush = Z_FINISH;
			}
		}

		switch (zip->status = inflate(zip->zp, flush)) {
		case Z_OK:
			/* The length prefix is complete: switch the output
			to the word buffer and inflate len bytes of text. */
			if (zip->zp->avail_out == 0 && len > 0) {

				ut_a(len <= FTS_MAX_WORD_LEN);
				ptr[len] = 0;

				zip->zp->next_out = ptr;
				zip->zp->avail_out = uInt(len);

				word->f_len = ulint(len);
				len = 0;
			}
			break;

		case Z_BUF_ERROR:	/* No progress possible. */
		case Z_STREAM_END:
			inflateEnd(zip->zp);
			break;

		case Z_STREAM_ERROR:
		default:
			ut_error;
		}
	}

	/* All blocks must be freed at end of inflate. */
	if (zip->status != Z_OK) {
		for (ulint i = 0; i < ib_vector_size(zip->blocks); ++i) {
			if (ib_vector_getp(zip->blocks, i)) {
				ut_free(ib_vector_getp(zip->blocks, i));
				ib_vector_set(zip->blocks, i, &null);
			}
		}
	}

	/* Debug check: the buffer holds a NUL-terminated string of
	exactly f_len bytes. */
	if (ptr != NULL) {
		ut_ad(word->f_len == strlen((char*) ptr));
	}

	return(zip->status == Z_OK || zip->status == Z_STREAM_END ? ptr : NULL);
}
675
676/**********************************************************************//**
677Callback function to fetch and compress the word in an FTS
678INDEX record.
679@return FALSE on EOF */
680static
681ibool
682fts_fetch_index_words(
683/*==================*/
684 void* row, /*!< in: sel_node_t* */
685 void* user_arg) /*!< in: pointer to ib_vector_t */
686{
687 sel_node_t* sel_node = static_cast<sel_node_t*>(row);
688 fts_zip_t* zip = static_cast<fts_zip_t*>(user_arg);
689 que_node_t* exp = sel_node->select_list;
690 dfield_t* dfield = que_node_get_val(exp);
691 short len = static_cast<short>(dfield_get_len(dfield));
692 void* data = dfield_get_data(dfield);
693
694 /* Skip the duplicate words. */
695 if (zip->word.f_len == static_cast<ulint>(len)
696 && !memcmp(zip->word.f_str, data, zip->word.f_len)) {
697
698 return(TRUE);
699 }
700
701 ut_a(len <= FTS_MAX_WORD_LEN);
702
703 zip->word.f_len = ulint(len);
704 memcpy(zip->word.f_str, data, zip->word.f_len);
705
706 ut_a(zip->zp->avail_in == 0);
707 ut_a(zip->zp->next_in == NULL);
708
709 /* The string is prefixed by len. */
710 zip->zp->next_in = reinterpret_cast<byte*>(&len);
711 zip->zp->avail_in = sizeof(len);
712
713 /* Compress the word, create output blocks as necessary. */
714 while (zip->zp->avail_in > 0) {
715
716 /* No space left in output buffer, create a new one. */
717 if (zip->zp->avail_out == 0) {
718 byte* block;
719
720 block = static_cast<byte*>(
721 ut_malloc_nokey(zip->block_sz));
722
723 ib_vector_push(zip->blocks, &block);
724
725 zip->zp->next_out = block;
726 zip->zp->avail_out = static_cast<uInt>(zip->block_sz);
727 }
728
729 switch (zip->status = deflate(zip->zp, Z_NO_FLUSH)) {
730 case Z_OK:
731 if (zip->zp->avail_in == 0) {
732 zip->zp->next_in = static_cast<byte*>(data);
733 zip->zp->avail_in = uInt(len);
734 ut_a(len <= FTS_MAX_WORD_LEN);
735 len = 0;
736 }
737 break;
738
739 case Z_STREAM_END:
740 case Z_BUF_ERROR:
741 case Z_STREAM_ERROR:
742 default:
743 ut_error;
744 break;
745 }
746 }
747
748 /* All data should have been compressed. */
749 ut_a(zip->zp->avail_in == 0);
750 zip->zp->next_in = NULL;
751
752 ++zip->n_words;
753
754 return(zip->n_words >= zip->max_words ? FALSE : TRUE);
755}
756
757/**********************************************************************//**
758Finish Zip deflate. */
759static
760void
761fts_zip_deflate_end(
762/*================*/
763 fts_zip_t* zip) /*!< in: instance that should be closed*/
764{
765 ut_a(zip->zp->avail_in == 0);
766 ut_a(zip->zp->next_in == NULL);
767
768 zip->status = deflate(zip->zp, Z_FINISH);
769
770 ut_a(ib_vector_size(zip->blocks) > 0);
771 zip->last_big_block = ib_vector_size(zip->blocks) - 1;
772
773 /* Allocate smaller block(s), since this is trailing data. */
774 while (zip->status == Z_OK) {
775 byte* block;
776
777 ut_a(zip->zp->avail_out == 0);
778
779 block = static_cast<byte*>(
780 ut_malloc_nokey(FTS_MAX_WORD_LEN + 1));
781
782 ib_vector_push(zip->blocks, &block);
783
784 zip->zp->next_out = block;
785 zip->zp->avail_out = FTS_MAX_WORD_LEN;
786
787 zip->status = deflate(zip->zp, Z_FINISH);
788 }
789
790 ut_a(zip->status == Z_STREAM_END);
791
792 zip->status = deflateEnd(zip->zp);
793 ut_a(zip->status == Z_OK);
794
795 /* Reset the ZLib data structure. */
796 memset(zip->zp, 0, sizeof(*zip->zp));
797}
798
799/**********************************************************************//**
800Read the words from the FTS INDEX.
801@return DB_SUCCESS if all OK, DB_TABLE_NOT_FOUND if no more indexes
802 to search else error code */
803static MY_ATTRIBUTE((nonnull, warn_unused_result))
804dberr_t
805fts_index_fetch_words(
806/*==================*/
807 fts_optimize_t* optim, /*!< in: optimize scratch pad */
808 const fts_string_t* word, /*!< in: get words greater than this
809 word */
810 ulint n_words)/*!< in: max words to read */
811{
812 pars_info_t* info;
813 que_t* graph;
814 ulint selected;
815 fts_zip_t* zip = NULL;
816 dberr_t error = DB_SUCCESS;
817 mem_heap_t* heap = static_cast<mem_heap_t*>(optim->self_heap->arg);
818 ibool inited = FALSE;
819
820 optim->trx->op_info = "fetching FTS index words";
821
822 if (optim->zip == NULL) {
823 optim->zip = fts_zip_create(heap, FTS_ZIP_BLOCK_SIZE, n_words);
824 } else {
825 fts_zip_initialize(optim->zip);
826 }
827
828 for (selected = fts_select_index(
829 optim->fts_index_table.charset, word->f_str, word->f_len);
830 selected < FTS_NUM_AUX_INDEX;
831 selected++) {
832
833 char table_name[MAX_FULL_NAME_LEN];
834
835 optim->fts_index_table.suffix = fts_get_suffix(selected);
836
837 info = pars_info_create();
838
839 pars_info_bind_function(
840 info, "my_func", fts_fetch_index_words, optim->zip);
841
842 pars_info_bind_varchar_literal(
843 info, "word", word->f_str, word->f_len);
844
845 fts_get_table_name(&optim->fts_index_table, table_name);
846 pars_info_bind_id(info, true, "table_name", table_name);
847
848 graph = fts_parse_sql(
849 &optim->fts_index_table,
850 info,
851 "DECLARE FUNCTION my_func;\n"
852 "DECLARE CURSOR c IS"
853 " SELECT word\n"
854 " FROM $table_name\n"
855 " WHERE word > :word\n"
856 " ORDER BY word;\n"
857 "BEGIN\n"
858 "\n"
859 "OPEN c;\n"
860 "WHILE 1 = 1 LOOP\n"
861 " FETCH c INTO my_func();\n"
862 " IF c % NOTFOUND THEN\n"
863 " EXIT;\n"
864 " END IF;\n"
865 "END LOOP;\n"
866 "CLOSE c;");
867
868 zip = optim->zip;
869
870 for (;;) {
871 int err;
872
873 if (!inited && ((err = deflateInit(zip->zp, 9))
874 != Z_OK)) {
875 ib::error() << "ZLib deflateInit() failed: "
876 << err;
877
878 error = DB_ERROR;
879 break;
880 } else {
881 inited = TRUE;
882 error = fts_eval_sql(optim->trx, graph);
883 }
884
885 if (error == DB_SUCCESS) {
886 //FIXME fts_sql_commit(optim->trx);
887 break;
888 } else {
889 //FIXME fts_sql_rollback(optim->trx);
890
891 if (error == DB_LOCK_WAIT_TIMEOUT) {
892 ib::warn() << "Lock wait timeout"
893 " reading document. Retrying!";
894
895 /* We need to reset the ZLib state. */
896 inited = FALSE;
897 deflateEnd(zip->zp);
898 fts_zip_init(zip);
899
900 optim->trx->error_state = DB_SUCCESS;
901 } else {
902 ib::error() << "(" << ut_strerr(error)
903 << ") while reading document.";
904
905 break; /* Exit the loop. */
906 }
907 }
908 }
909
910 fts_que_graph_free(graph);
911
912 /* Check if max word to fetch is exceeded */
913 if (optim->zip->n_words >= n_words) {
914 break;
915 }
916 }
917
918 if (error == DB_SUCCESS && zip->status == Z_OK && zip->n_words > 0) {
919
920 /* All data should have been read. */
921 ut_a(zip->zp->avail_in == 0);
922
923 fts_zip_deflate_end(zip);
924 } else {
925 deflateEnd(zip->zp);
926 }
927
928 return(error);
929}
930
931/**********************************************************************//**
932Callback function to fetch the doc id from the record.
933@return always returns TRUE */
934static
935ibool
936fts_fetch_doc_ids(
937/*==============*/
938 void* row, /*!< in: sel_node_t* */
939 void* user_arg) /*!< in: pointer to ib_vector_t */
940{
941 que_node_t* exp;
942 int i = 0;
943 sel_node_t* sel_node = static_cast<sel_node_t*>(row);
944 fts_doc_ids_t* fts_doc_ids = static_cast<fts_doc_ids_t*>(user_arg);
945 fts_update_t* update = static_cast<fts_update_t*>(
946 ib_vector_push(fts_doc_ids->doc_ids, NULL));
947
948 for (exp = sel_node->select_list;
949 exp;
950 exp = que_node_get_next(exp), ++i) {
951
952 dfield_t* dfield = que_node_get_val(exp);
953 void* data = dfield_get_data(dfield);
954 ulint len = dfield_get_len(dfield);
955
956 ut_a(len != UNIV_SQL_NULL);
957
958 /* Note: The column numbers below must match the SELECT. */
959 switch (i) {
960 case 0: /* DOC_ID */
961 update->fts_indexes = NULL;
962 update->doc_id = fts_read_doc_id(
963 static_cast<byte*>(data));
964 break;
965
966 default:
967 ut_error;
968 }
969 }
970
971 return(TRUE);
972}
973
974/**********************************************************************//**
975Read the rows from a FTS common auxiliary table.
976@return DB_SUCCESS or error code */
977dberr_t
978fts_table_fetch_doc_ids(
979/*====================*/
980 trx_t* trx, /*!< in: transaction */
981 fts_table_t* fts_table, /*!< in: table */
982 fts_doc_ids_t* doc_ids) /*!< in: For collecting doc ids */
983{
984 dberr_t error;
985 que_t* graph;
986 pars_info_t* info = pars_info_create();
987 ibool alloc_bk_trx = FALSE;
988 char table_name[MAX_FULL_NAME_LEN];
989
990 ut_a(fts_table->suffix != NULL);
991 ut_a(fts_table->type == FTS_COMMON_TABLE);
992
993 if (!trx) {
994 trx = trx_create();
995 alloc_bk_trx = TRUE;
996 }
997
998 trx->op_info = "fetching FTS doc ids";
999
1000 pars_info_bind_function(info, "my_func", fts_fetch_doc_ids, doc_ids);
1001
1002 fts_get_table_name(fts_table, table_name);
1003 pars_info_bind_id(info, true, "table_name", table_name);
1004
1005 graph = fts_parse_sql(
1006 fts_table,
1007 info,
1008 "DECLARE FUNCTION my_func;\n"
1009 "DECLARE CURSOR c IS"
1010 " SELECT doc_id FROM $table_name;\n"
1011 "BEGIN\n"
1012 "\n"
1013 "OPEN c;\n"
1014 "WHILE 1 = 1 LOOP\n"
1015 " FETCH c INTO my_func();\n"
1016 " IF c % NOTFOUND THEN\n"
1017 " EXIT;\n"
1018 " END IF;\n"
1019 "END LOOP;\n"
1020 "CLOSE c;");
1021
1022 error = fts_eval_sql(trx, graph);
1023 fts_sql_commit(trx);
1024
1025 mutex_enter(&dict_sys->mutex);
1026 que_graph_free(graph);
1027 mutex_exit(&dict_sys->mutex);
1028
1029 if (error == DB_SUCCESS) {
1030 ib_vector_sort(doc_ids->doc_ids, fts_update_doc_id_cmp);
1031 }
1032
1033 if (alloc_bk_trx) {
1034 trx_free(trx);
1035 }
1036
1037 return(error);
1038}
1039
1040/**********************************************************************//**
1041Do a binary search for a doc id in the array
1042@return +ve index if found -ve index where it should be inserted
1043 if not found */
1044int
1045fts_bsearch(
1046/*========*/
1047 fts_update_t* array, /*!< in: array to sort */
1048 int lower, /*!< in: the array lower bound */
1049 int upper, /*!< in: the array upper bound */
1050 doc_id_t doc_id) /*!< in: the doc id to search for */
1051{
1052 int orig_size = upper;
1053
1054 if (upper == 0) {
1055 /* Nothing to search */
1056 return(-1);
1057 } else {
1058 while (lower < upper) {
1059 int i = (lower + upper) >> 1;
1060
1061 if (doc_id > array[i].doc_id) {
1062 lower = i + 1;
1063 } else if (doc_id < array[i].doc_id) {
1064 upper = i - 1;
1065 } else {
1066 return(i); /* Found. */
1067 }
1068 }
1069 }
1070
1071 if (lower == upper && lower < orig_size) {
1072 if (doc_id == array[lower].doc_id) {
1073 return(lower);
1074 } else if (lower == 0) {
1075 return(-1);
1076 }
1077 }
1078
1079 /* Not found. */
1080 return( (lower == 0) ? -1 : -(lower));
1081}
1082
1083/**********************************************************************//**
1084Search in the to delete array whether any of the doc ids within
1085the [first, last] range are to be deleted
1086@return +ve index if found -ve index where it should be inserted
1087 if not found */
1088static
1089int
1090fts_optimize_lookup(
1091/*================*/
1092 ib_vector_t* doc_ids, /*!< in: array to search */
1093 ulint lower, /*!< in: lower limit of array */
1094 doc_id_t first_doc_id, /*!< in: doc id to lookup */
1095 doc_id_t last_doc_id) /*!< in: doc id to lookup */
1096{
1097 int pos;
1098 int upper = static_cast<int>(ib_vector_size(doc_ids));
1099 fts_update_t* array = (fts_update_t*) doc_ids->data;
1100
1101 pos = fts_bsearch(array, static_cast<int>(lower), upper, first_doc_id);
1102
1103 ut_a(abs(pos) <= upper + 1);
1104
1105 if (pos < 0) {
1106
1107 int i = abs(pos);
1108
1109 /* If i is 1, it could be first_doc_id is less than
1110 either the first or second array item, do a
1111 double check */
1112 if (i == 1 && array[0].doc_id <= last_doc_id
1113 && first_doc_id < array[0].doc_id) {
1114 pos = 0;
1115 } else if (i < upper && array[i].doc_id <= last_doc_id) {
1116
1117 /* Check if the "next" doc id is within the
1118 first & last doc id of the node. */
1119 pos = i;
1120 }
1121 }
1122
1123 return(pos);
1124}
1125
1126/**********************************************************************//**
1127Encode the word pos list into the node
1128@return DB_SUCCESS or error code*/
1129static MY_ATTRIBUTE((nonnull))
1130dberr_t
1131fts_optimize_encode_node(
1132/*=====================*/
1133 fts_node_t* node, /*!< in: node to fill*/
1134 doc_id_t doc_id, /*!< in: doc id to encode */
1135 fts_encode_t* enc) /*!< in: encoding state.*/
1136{
1137 byte* dst;
1138 ulint enc_len;
1139 ulint pos_enc_len;
1140 doc_id_t doc_id_delta;
1141 dberr_t error = DB_SUCCESS;
1142 byte* src = enc->src_ilist_ptr;
1143
1144 if (node->first_doc_id == 0) {
1145 ut_a(node->last_doc_id == 0);
1146
1147 node->first_doc_id = doc_id;
1148 }
1149
1150 /* Calculate the space required to store the ilist. */
1151 ut_ad(doc_id > node->last_doc_id);
1152 doc_id_delta = doc_id - node->last_doc_id;
1153 enc_len = fts_get_encoded_len(static_cast<ulint>(doc_id_delta));
1154
1155 /* Calculate the size of the encoded pos array. */
1156 while (*src) {
1157 fts_decode_vlc(&src);
1158 }
1159
1160 /* Skip the 0x00 byte at the end of the word positions list. */
1161 ++src;
1162
1163 /* Number of encoded pos bytes to copy. */
1164 pos_enc_len = ulint(src - enc->src_ilist_ptr);
1165
1166 /* Total number of bytes required for copy. */
1167 enc_len += pos_enc_len;
1168
1169 /* Check we have enough space in the destination buffer for
1170 copying the document word list. */
1171 if (!node->ilist) {
1172 ulint new_size;
1173
1174 ut_a(node->ilist_size == 0);
1175
1176 new_size = enc_len > FTS_ILIST_MAX_SIZE
1177 ? enc_len : FTS_ILIST_MAX_SIZE;
1178
1179 node->ilist = static_cast<byte*>(ut_malloc_nokey(new_size));
1180 node->ilist_size_alloc = new_size;
1181
1182 } else if ((node->ilist_size + enc_len) > node->ilist_size_alloc) {
1183 ulint new_size = node->ilist_size + enc_len;
1184 byte* ilist = static_cast<byte*>(ut_malloc_nokey(new_size));
1185
1186 memcpy(ilist, node->ilist, node->ilist_size);
1187
1188 ut_free(node->ilist);
1189
1190 node->ilist = ilist;
1191 node->ilist_size_alloc = new_size;
1192 }
1193
1194 src = enc->src_ilist_ptr;
1195 dst = node->ilist + node->ilist_size;
1196
1197 /* Encode the doc id. Cast to ulint, the delta should be small and
1198 therefore no loss of precision. */
1199 dst += fts_encode_int((ulint) doc_id_delta, dst);
1200
1201 /* Copy the encoded pos array. */
1202 memcpy(dst, src, pos_enc_len);
1203
1204 node->last_doc_id = doc_id;
1205
1206 /* Data copied upto here. */
1207 node->ilist_size += enc_len;
1208 enc->src_ilist_ptr += pos_enc_len;
1209
1210 ut_a(node->ilist_size <= node->ilist_size_alloc);
1211
1212 return(error);
1213}
1214
/**********************************************************************//**
Optimize the data contained in a node. Decodes the doc id deltas stored
in src_node->ilist starting at enc->src_ilist_ptr, skips the entries of
documents whose doc id is found in del_vec at *del_pos, and appends the
remaining entries to dst_node. Stops when the source is exhausted or the
destination ilist has reached FTS_ILIST_MAX_SIZE; the position reached
is left in enc so that a later call can continue from there.
@return DB_SUCCESS; the local error variable is never changed and the
return value of fts_optimize_encode_node() is not inspected */
static MY_ATTRIBUTE((nonnull))
dberr_t
fts_optimize_node(
/*==============*/
	ib_vector_t*	del_vec,	/*!< in: vector of doc ids to delete*/
	int*		del_pos,	/*!< in/out: offset into above vector,
					advanced as deleted doc ids are
					matched or passed */
	fts_node_t*	dst_node,	/*!< in/out: node to fill*/
	fts_node_t*	src_node,	/*!< in: source node for data*/
	fts_encode_t*	enc)		/*!< in/out: encoding state */
{
	ulint		copied;
	dberr_t		error = DB_SUCCESS;
	doc_id_t	doc_id = enc->src_last_doc_id;

	/* A NULL source pointer means this source node has not been
	touched yet: start at the beginning of its ilist. */
	if (!enc->src_ilist_ptr) {
		enc->src_ilist_ptr = src_node->ilist;
	}

	copied = ulint(enc->src_ilist_ptr - src_node->ilist);

	/* While there is data in the source node and space to copy
	into in the destination node. */
	while (copied < src_node->ilist_size
	       && dst_node->ilist_size < FTS_ILIST_MAX_SIZE) {

		doc_id_t	delta;
		doc_id_t	del_doc_id = FTS_NULL_DOC_ID;

		delta = fts_decode_vlc(&enc->src_ilist_ptr);

test_again:
		/* Check whether the doc id is in the delete list, if
		so then we skip the entries but we need to track the
		delta for decoding the entries following this document's
		entries. */
		if (*del_pos >= 0 && *del_pos < (int) ib_vector_size(del_vec)) {
			fts_update_t*	update;

			update = (fts_update_t*) ib_vector_get(
				del_vec, ulint(*del_pos));

			del_doc_id = update->doc_id;
		}

		if (enc->src_ilist_ptr == src_node->ilist && doc_id == 0) {
			ut_a(delta == src_node->first_doc_id);
		}

		/* On a retry via test_again delta has been reset to 0,
		so the doc id is not advanced a second time. */
		doc_id += delta;

		if (del_doc_id > 0 && doc_id == del_doc_id) {

			++*del_pos;

			/* Skip the entries for this document. */
			while (*enc->src_ilist_ptr) {
				fts_decode_vlc(&enc->src_ilist_ptr);
			}

			/* Skip the end of word position marker. */
			++enc->src_ilist_ptr;

		} else {

			/* DOC ID already becomes larger than
			del_doc_id, check the next del_doc_id */
			if (del_doc_id > 0 && doc_id > del_doc_id) {
				del_doc_id = 0;
				++*del_pos;
				delta = 0;
				goto test_again;
			}

			/* Decode and copy the word positions into
			the dest node. */
			fts_optimize_encode_node(dst_node, doc_id, enc);

			++dst_node->doc_count;

			ut_a(dst_node->last_doc_id == doc_id);
		}

		/* Bytes copied so far from source. */
		copied = ulint(enc->src_ilist_ptr - src_node->ilist);
	}

	/* Once the whole source ilist has been consumed the running
	doc id must match the last doc id recorded for the node. */
	if (copied >= src_node->ilist_size) {
		ut_a(doc_id == src_node->last_doc_id);
	}

	/* Remember where we stopped for a possible continuation. */
	enc->src_last_doc_id = doc_id;

	return(error);
}
1312
1313/**********************************************************************//**
1314Determine the starting pos within the deleted doc id vector for a word.
1315@return delete position */
1316static MY_ATTRIBUTE((nonnull, warn_unused_result))
1317int
1318fts_optimize_deleted_pos(
1319/*=====================*/
1320 fts_optimize_t* optim, /*!< in: optimize state data */
1321 fts_word_t* word) /*!< in: the word data to check */
1322{
1323 int del_pos;
1324 ib_vector_t* del_vec = optim->to_delete->doc_ids;
1325
1326 /* Get the first and last dict ids for the word, we will use
1327 these values to determine which doc ids need to be removed
1328 when we coalesce the nodes. This way we can reduce the numer
1329 of elements that need to be searched in the deleted doc ids
1330 vector and secondly we can remove the doc ids during the
1331 coalescing phase. */
1332 if (ib_vector_size(del_vec) > 0) {
1333 fts_node_t* node;
1334 doc_id_t last_id;
1335 doc_id_t first_id;
1336 ulint size = ib_vector_size(word->nodes);
1337
1338 node = (fts_node_t*) ib_vector_get(word->nodes, 0);
1339 first_id = node->first_doc_id;
1340
1341 node = (fts_node_t*) ib_vector_get(word->nodes, size - 1);
1342 last_id = node->last_doc_id;
1343
1344 ut_a(first_id <= last_id);
1345
1346 del_pos = fts_optimize_lookup(
1347 del_vec, optim->del_pos, first_id, last_id);
1348 } else {
1349
1350 del_pos = -1; /* Note that there is nothing to delete. */
1351 }
1352
1353 return(del_pos);
1354}
1355
1356#define FTS_DEBUG_PRINT
/**********************************************************************//**
Compact the nodes for a word, we also remove any doc ids during the
compaction pass. The source nodes in word->nodes are consumed one after
the other; the ilist of each source node is freed as soon as it has been
copied completely.
@return vector of the optimized nodes, allocated from word->heap_alloc */
static
ib_vector_t*
fts_optimize_word(
/*==============*/
	fts_optimize_t*	optim,	/*!< in: optimize state data */
	fts_word_t*	word)	/*!< in/out: the word to optimize */
{
	fts_encode_t	enc;
	ib_vector_t*	nodes;
	ulint		i = 0;
	int		del_pos;
	fts_node_t*	dst_node = NULL;
	ib_vector_t*	del_vec = optim->to_delete->doc_ids;
	ulint		size = ib_vector_size(word->nodes);

	del_pos = fts_optimize_deleted_pos(optim, word);
	nodes = ib_vector_create(word->heap_alloc, sizeof(*dst_node), 128);

	/* No source node has been started yet. */
	enc.src_last_doc_id = 0;
	enc.src_ilist_ptr = NULL;

	if (fts_enable_diag_print) {
		/* Terminate the word text in place so that it can be
		printed as a string. */
		word->text.f_str[word->text.f_len] = 0;
		ib::info() << "FTS_OPTIMIZE: optimize \"" << word->text.f_str
			<< "\"";
	}

	while (i < size) {
		ulint		copied;
		fts_node_t*	src_node;

		src_node = (fts_node_t*) ib_vector_get(word->nodes, i);

		/* Start a new destination node if there is none, or if
		the current one already ends past the first doc id of
		the source node. */
		if (dst_node == NULL
		    || dst_node->last_doc_id > src_node->first_doc_id) {

			dst_node = static_cast<fts_node_t*>(
				ib_vector_push(nodes, NULL));
			memset(dst_node, 0, sizeof(*dst_node));
		}

		/* Copy from the src to the dst node. */
		fts_optimize_node(del_vec, &del_pos, dst_node, src_node, &enc);

		ut_a(enc.src_ilist_ptr != NULL);

		/* Determine the number of bytes consumed from src_node. */
		copied = ulint(enc.src_ilist_ptr - src_node->ilist);

		/* Can't copy more than whats in the vlc array. */
		ut_a(copied <= src_node->ilist_size);

		/* We are done with this node release the resources. */
		if (copied == src_node->ilist_size) {

			/* Reset the encoding state for the next source node. */
			enc.src_last_doc_id = 0;
			enc.src_ilist_ptr = NULL;

			ut_free(src_node->ilist);

			src_node->ilist = NULL;
			src_node->ilist_size = src_node->ilist_size_alloc = 0;

			src_node = NULL;

			++i; /* Get next source node to OPTIMIZE. */
		}

		/* Close the destination node when it is full or when
		all source nodes have been processed. */
		if (dst_node->ilist_size >= FTS_ILIST_MAX_SIZE || i >= size) {

			dst_node = NULL;
		}
	}

	/* All dst nodes created should have been added to the vector. */
	ut_a(dst_node == NULL);

	/* Return the OPTIMIZED nodes. */
	return(nodes);
}
1441
1442/**********************************************************************//**
1443Update the FTS index table. This is a delete followed by an insert.
1444@return DB_SUCCESS or error code */
1445static MY_ATTRIBUTE((nonnull, warn_unused_result))
1446dberr_t
1447fts_optimize_write_word(
1448/*====================*/
1449 trx_t* trx, /*!< in: transaction */
1450 fts_table_t* fts_table, /*!< in: table of FTS index */
1451 fts_string_t* word, /*!< in: word data to write */
1452 ib_vector_t* nodes) /*!< in: the nodes to write */
1453{
1454 ulint i;
1455 pars_info_t* info;
1456 que_t* graph;
1457 ulint selected;
1458 dberr_t error = DB_SUCCESS;
1459 char table_name[MAX_FULL_NAME_LEN];
1460
1461 info = pars_info_create();
1462
1463 ut_ad(fts_table->charset);
1464
1465 if (fts_enable_diag_print) {
1466 ib::info() << "FTS_OPTIMIZE: processed \"" << word->f_str
1467 << "\"";
1468 }
1469
1470 pars_info_bind_varchar_literal(
1471 info, "word", word->f_str, word->f_len);
1472
1473 selected = fts_select_index(fts_table->charset,
1474 word->f_str, word->f_len);
1475
1476 fts_table->suffix = fts_get_suffix(selected);
1477 fts_get_table_name(fts_table, table_name);
1478 pars_info_bind_id(info, true, "table_name", table_name);
1479
1480 graph = fts_parse_sql(
1481 fts_table,
1482 info,
1483 "BEGIN DELETE FROM $table_name WHERE word = :word;");
1484
1485 error = fts_eval_sql(trx, graph);
1486
1487 if (error != DB_SUCCESS) {
1488 ib::error() << "(" << ut_strerr(error) << ") during optimize,"
1489 " when deleting a word from the FTS index.";
1490 }
1491
1492 fts_que_graph_free(graph);
1493 graph = NULL;
1494
1495 /* Even if the operation needs to be rolled back and redone,
1496 we iterate over the nodes in order to free the ilist. */
1497 for (i = 0; i < ib_vector_size(nodes); ++i) {
1498
1499 fts_node_t* node = (fts_node_t*) ib_vector_get(nodes, i);
1500
1501 if (error == DB_SUCCESS) {
1502 /* Skip empty node. */
1503 if (node->ilist == NULL) {
1504 ut_ad(node->ilist_size == 0);
1505 continue;
1506 }
1507
1508 error = fts_write_node(
1509 trx, &graph, fts_table, word, node);
1510
1511 if (error != DB_SUCCESS) {
1512 ib::error() << "(" << ut_strerr(error) << ")"
1513 " during optimize, while adding a"
1514 " word to the FTS index.";
1515 }
1516 }
1517
1518 ut_free(node->ilist);
1519 node->ilist = NULL;
1520 node->ilist_size = node->ilist_size_alloc = 0;
1521 }
1522
1523 if (graph != NULL) {
1524 fts_que_graph_free(graph);
1525 }
1526
1527 return(error);
1528}
1529
1530/**********************************************************************//**
1531Free fts_optimizer_word_t instanace.*/
1532void
1533fts_word_free(
1534/*==========*/
1535 fts_word_t* word) /*!< in: instance to free.*/
1536{
1537 mem_heap_t* heap = static_cast<mem_heap_t*>(word->heap_alloc->arg);
1538
1539#ifdef UNIV_DEBUG
1540 memset(word, 0, sizeof(*word));
1541#endif /* UNIV_DEBUG */
1542
1543 mem_heap_free(heap);
1544}
1545
1546/**********************************************************************//**
1547Optimize the word ilist and rewrite data to the FTS index.
1548@return status one of RESTART, EXIT, ERROR */
1549static MY_ATTRIBUTE((nonnull, warn_unused_result))
1550dberr_t
1551fts_optimize_compact(
1552/*=================*/
1553 fts_optimize_t* optim, /*!< in: optimize state data */
1554 dict_index_t* index, /*!< in: current FTS being optimized */
1555 ib_time_t start_time) /*!< in: optimize start time */
1556{
1557 ulint i;
1558 dberr_t error = DB_SUCCESS;
1559 ulint size = ib_vector_size(optim->words);
1560
1561 for (i = 0; i < size && error == DB_SUCCESS && !optim->done; ++i) {
1562 fts_word_t* word;
1563 ib_vector_t* nodes;
1564 trx_t* trx = optim->trx;
1565
1566 word = (fts_word_t*) ib_vector_get(optim->words, i);
1567
1568 /* nodes is allocated from the word heap and will be destroyed
1569 when the word is freed. We however have to be careful about
1570 the ilist, that needs to be freed explicitly. */
1571 nodes = fts_optimize_word(optim, word);
1572
1573 /* Update the data on disk. */
1574 error = fts_optimize_write_word(
1575 trx, &optim->fts_index_table, &word->text, nodes);
1576
1577 if (error == DB_SUCCESS) {
1578 /* Write the last word optimized to the config table,
1579 we use this value for restarting optimize. */
1580 error = fts_config_set_index_value(
1581 optim->trx, index,
1582 FTS_LAST_OPTIMIZED_WORD, &word->text);
1583 }
1584
1585 /* Free the word that was optimized. */
1586 fts_word_free(word);
1587
1588 if (fts_optimize_time_limit > 0
1589 && (ut_time() - start_time) > fts_optimize_time_limit) {
1590
1591 optim->done = TRUE;
1592 }
1593 }
1594
1595 return(error);
1596}
1597
1598/**********************************************************************//**
1599Create an instance of fts_optimize_t. Also create a new
1600background transaction.*/
1601static
1602fts_optimize_t*
1603fts_optimize_create(
1604/*================*/
1605 dict_table_t* table) /*!< in: table with FTS indexes */
1606{
1607 fts_optimize_t* optim;
1608 mem_heap_t* heap = mem_heap_create(128);
1609
1610 optim = (fts_optimize_t*) mem_heap_zalloc(heap, sizeof(*optim));
1611
1612 optim->self_heap = ib_heap_allocator_create(heap);
1613
1614 optim->to_delete = fts_doc_ids_create();
1615
1616 optim->words = ib_vector_create(
1617 optim->self_heap, sizeof(fts_word_t), 256);
1618
1619 optim->table = table;
1620
1621 optim->trx = trx_create();
1622 trx_start_internal(optim->trx);
1623
1624 optim->fts_common_table.parent = table->name.m_name;
1625 optim->fts_common_table.table_id = table->id;
1626 optim->fts_common_table.type = FTS_COMMON_TABLE;
1627 optim->fts_common_table.table = table;
1628
1629 optim->fts_index_table.parent = table->name.m_name;
1630 optim->fts_index_table.table_id = table->id;
1631 optim->fts_index_table.type = FTS_INDEX_TABLE;
1632 optim->fts_index_table.table = table;
1633
1634 /* The common prefix for all this parent table's aux tables. */
1635 optim->name_prefix = fts_get_table_name_prefix(
1636 &optim->fts_common_table);
1637
1638 return(optim);
1639}
1640
1641#ifdef FTS_OPTIMIZE_DEBUG
1642/**********************************************************************//**
1643Get optimize start time of an FTS index.
1644@return DB_SUCCESS if all OK else error code */
1645static MY_ATTRIBUTE((nonnull, warn_unused_result))
1646dberr_t
1647fts_optimize_get_index_start_time(
1648/*==============================*/
1649 trx_t* trx, /*!< in: transaction */
1650 dict_index_t* index, /*!< in: FTS index */
1651 ib_time_t* start_time) /*!< out: time in secs */
1652{
1653 return(fts_config_get_index_ulint(
1654 trx, index, FTS_OPTIMIZE_START_TIME,
1655 (ulint*) start_time));
1656}
1657
1658/**********************************************************************//**
1659Set the optimize start time of an FTS index.
1660@return DB_SUCCESS if all OK else error code */
1661static MY_ATTRIBUTE((nonnull, warn_unused_result))
1662dberr_t
1663fts_optimize_set_index_start_time(
1664/*==============================*/
1665 trx_t* trx, /*!< in: transaction */
1666 dict_index_t* index, /*!< in: FTS index */
1667 ib_time_t start_time) /*!< in: start time */
1668{
1669 return(fts_config_set_index_ulint(
1670 trx, index, FTS_OPTIMIZE_START_TIME,
1671 (ulint) start_time));
1672}
1673
1674/**********************************************************************//**
1675Get optimize end time of an FTS index.
1676@return DB_SUCCESS if all OK else error code */
1677static MY_ATTRIBUTE((nonnull, warn_unused_result))
1678dberr_t
1679fts_optimize_get_index_end_time(
1680/*============================*/
1681 trx_t* trx, /*!< in: transaction */
1682 dict_index_t* index, /*!< in: FTS index */
1683 ib_time_t* end_time) /*!< out: time in secs */
1684{
1685 return(fts_config_get_index_ulint(
1686 trx, index, FTS_OPTIMIZE_END_TIME, (ulint*) end_time));
1687}
1688
1689/**********************************************************************//**
1690Set the optimize end time of an FTS index.
1691@return DB_SUCCESS if all OK else error code */
1692static MY_ATTRIBUTE((nonnull, warn_unused_result))
1693dberr_t
1694fts_optimize_set_index_end_time(
1695/*============================*/
1696 trx_t* trx, /*!< in: transaction */
1697 dict_index_t* index, /*!< in: FTS index */
1698 ib_time_t end_time) /*!< in: end time */
1699{
1700 return(fts_config_set_index_ulint(
1701 trx, index, FTS_OPTIMIZE_END_TIME, (ulint) end_time));
1702}
1703#endif
1704
1705/**********************************************************************//**
1706Free the optimize prepared statements.*/
1707static
1708void
1709fts_optimize_graph_free(
1710/*====================*/
1711 fts_optimize_graph_t* graph) /*!< in/out: The graph instances
1712 to free */
1713{
1714 if (graph->commit_graph) {
1715 que_graph_free(graph->commit_graph);
1716 graph->commit_graph = NULL;
1717 }
1718
1719 if (graph->write_nodes_graph) {
1720 que_graph_free(graph->write_nodes_graph);
1721 graph->write_nodes_graph = NULL;
1722 }
1723
1724 if (graph->delete_nodes_graph) {
1725 que_graph_free(graph->delete_nodes_graph);
1726 graph->delete_nodes_graph = NULL;
1727 }
1728
1729 if (graph->read_nodes_graph) {
1730 que_graph_free(graph->read_nodes_graph);
1731 graph->read_nodes_graph = NULL;
1732 }
1733}
1734
1735/**********************************************************************//**
1736Free all optimize resources. */
1737static
1738void
1739fts_optimize_free(
1740/*==============*/
1741 fts_optimize_t* optim) /*!< in: table with on FTS index */
1742{
1743 mem_heap_t* heap = static_cast<mem_heap_t*>(optim->self_heap->arg);
1744
1745 trx_commit_for_mysql(optim->trx);
1746 trx_free(optim->trx);
1747
1748 fts_doc_ids_free(optim->to_delete);
1749 fts_optimize_graph_free(&optim->graph);
1750
1751 ut_free(optim->name_prefix);
1752
1753 /* This will free the heap from which optim itself was allocated. */
1754 mem_heap_free(heap);
1755}
1756
1757/**********************************************************************//**
1758Get the max time optimize should run in millisecs.
1759@return max optimize time limit in millisecs. */
1760static
1761ib_time_t
1762fts_optimize_get_time_limit(
1763/*========================*/
1764 trx_t* trx, /*!< in: transaction */
1765 fts_table_t* fts_table) /*!< in: aux table */
1766{
1767 ib_time_t time_limit = 0;
1768
1769 fts_config_get_ulint(
1770 trx, fts_table,
1771 FTS_OPTIMIZE_LIMIT_IN_SECS, (ulint*) &time_limit);
1772
1773 return(time_limit * 1000);
1774}
1775
1776
/**********************************************************************//**
Run OPTIMIZE on the given table. Note: this can take a very long time
(hours). Starting with the given word, the index records of one word at
a time are fetched, compacted and written back, and the next word is
read from the zip buffer. The loop ends when optim->done is set: no more
words, time limit reached, or an error other than lock wait timeout or
deadlock. */
static
void
fts_optimize_words(
/*===============*/
	fts_optimize_t*	optim,	/*!< in: optimize instance */
	dict_index_t*	index,	/*!< in: current FTS being optimized */
	fts_string_t*	word)	/*!< in/out: the starting word to optimize;
				overwritten with each following word */
{
	fts_fetch_t	fetch;
	ib_time_t	start_time;
	que_t*		graph = NULL;
	CHARSET_INFO*	charset = optim->fts_index_table.charset;

	ut_a(!optim->done);

	/* Get the time limit from the config table. */
	fts_optimize_time_limit = fts_optimize_get_time_limit(
		optim->trx, &optim->fts_common_table);

	start_time = ut_time();

	/* Setup the callback to use for fetching the word ilist etc. */
	fetch.read_arg = optim->words;
	fetch.read_record = fts_optimize_index_fetch_node;

	/* Log the word that optimize starts from. */
	ib::info().write(word->f_str, word->f_len);

	while (!optim->done) {
		dberr_t	error;
		trx_t*	trx = optim->trx;
		ulint	selected;

		ut_a(ib_vector_size(optim->words) == 0);

		/* Remember which auxiliary index table the current
		word maps to. */
		selected = fts_select_index(charset, word->f_str, word->f_len);

		/* Read the index records to optimize. */
		fetch.total_memory = 0;
		error = fts_index_fetch_nodes(
			trx, &graph, &optim->fts_index_table, word,
			&fetch);
		ut_ad(fetch.total_memory < fts_result_cache_limit);

		if (error == DB_SUCCESS) {
			/* There must be some nodes to read. */
			ut_a(ib_vector_size(optim->words) > 0);

			/* Optimize the nodes that were read and write
			back to DB. */
			error = fts_optimize_compact(optim, index, start_time);

			if (error == DB_SUCCESS) {
				fts_sql_commit(optim->trx);
			} else {
				fts_sql_rollback(optim->trx);
			}
		}

		ib_vector_reset(optim->words);

		if (error == DB_SUCCESS) {
			if (!optim->done) {
				if (!fts_zip_read_word(optim->zip, word)) {
					/* No more words to optimize. */
					optim->done = TRUE;
				} else if (selected
					   != fts_select_index(
						charset, word->f_str,
						word->f_len)
					  && graph) {
					/* The next word maps to a different
					auxiliary table: drop the graph so
					that it is not reused. */
					fts_que_graph_free(graph);
					graph = NULL;
				}
			}
		} else if (error == DB_LOCK_WAIT_TIMEOUT) {
			ib::warn() << "Lock wait timeout during optimize."
				" Retrying!";

			/* Clear the error and try the same word again. */
			trx->error_state = DB_SUCCESS;
		} else if (error == DB_DEADLOCK) {
			ib::warn() << "Deadlock during optimize. Retrying!";

			/* Clear the error and try the same word again. */
			trx->error_state = DB_SUCCESS;
		} else {
			optim->done = TRUE;		/* Exit the loop. */
		}
	}

	if (graph != NULL) {
		fts_que_graph_free(graph);
	}
}
1871
1872/**********************************************************************//**
1873Optimize is complete. Set the completion time, and reset the optimize
1874start string for this FTS index to "".
1875@return DB_SUCCESS if all OK */
1876static MY_ATTRIBUTE((nonnull, warn_unused_result))
1877dberr_t
1878fts_optimize_index_completed(
1879/*=========================*/
1880 fts_optimize_t* optim, /*!< in: optimize instance */
1881 dict_index_t* index) /*!< in: table with one FTS index */
1882{
1883 fts_string_t word;
1884 dberr_t error;
1885 byte buf[sizeof(ulint)];
1886#ifdef FTS_OPTIMIZE_DEBUG
1887 ib_time_t end_time = ut_time();
1888
1889 error = fts_optimize_set_index_end_time(optim->trx, index, end_time);
1890#endif
1891
1892 /* If we've reached the end of the index then set the start
1893 word to the empty string. */
1894
1895 word.f_len = 0;
1896 word.f_str = buf;
1897 *word.f_str = '\0';
1898
1899 error = fts_config_set_index_value(
1900 optim->trx, index, FTS_LAST_OPTIMIZED_WORD, &word);
1901
1902 if (error != DB_SUCCESS) {
1903
1904 ib::error() << "(" << ut_strerr(error) << ") while updating"
1905 " last optimized word!";
1906 }
1907
1908 return(error);
1909}
1910
1911
1912/**********************************************************************//**
1913Read the list of words from the FTS auxiliary index that will be
1914optimized in this pass.
1915@return DB_SUCCESS if all OK */
1916static MY_ATTRIBUTE((nonnull, warn_unused_result))
1917dberr_t
1918fts_optimize_index_read_words(
1919/*==========================*/
1920 fts_optimize_t* optim, /*!< in: optimize instance */
1921 dict_index_t* index, /*!< in: table with one FTS index */
1922 fts_string_t* word) /*!< in: buffer to use */
1923{
1924 dberr_t error = DB_SUCCESS;
1925
1926 if (optim->del_list_regenerated) {
1927 word->f_len = 0;
1928 } else {
1929
1930 /* Get the last word that was optimized from
1931 the config table. */
1932 error = fts_config_get_index_value(
1933 optim->trx, index, FTS_LAST_OPTIMIZED_WORD, word);
1934 }
1935
1936 /* If record not found then we start from the top. */
1937 if (error == DB_RECORD_NOT_FOUND) {
1938 word->f_len = 0;
1939 error = DB_SUCCESS;
1940 }
1941
1942 while (error == DB_SUCCESS) {
1943
1944 error = fts_index_fetch_words(
1945 optim, word, fts_num_word_optimize);
1946
1947 if (error == DB_SUCCESS) {
1948 /* Reset the last optimized word to '' if no
1949 more words could be read from the FTS index. */
1950 if (optim->zip->n_words == 0) {
1951 word->f_len = 0;
1952 *word->f_str = 0;
1953 }
1954
1955 break;
1956 }
1957 }
1958
1959 return(error);
1960}
1961
1962/**********************************************************************//**
1963Run OPTIMIZE on the given FTS index. Note: this can take a very long
1964time (hours).
1965@return DB_SUCCESS if all OK */
1966static MY_ATTRIBUTE((nonnull, warn_unused_result))
1967dberr_t
1968fts_optimize_index(
1969/*===============*/
1970 fts_optimize_t* optim, /*!< in: optimize instance */
1971 dict_index_t* index) /*!< in: table with one FTS index */
1972{
1973 fts_string_t word;
1974 dberr_t error;
1975 byte str[FTS_MAX_WORD_LEN + 1];
1976
1977 /* Set the current index that we have to optimize. */
1978 optim->fts_index_table.index_id = index->id;
1979 optim->fts_index_table.charset = fts_index_get_charset(index);
1980
1981 optim->done = FALSE; /* Optimize until !done */
1982
1983 /* We need to read the last word optimized so that we start from
1984 the next word. */
1985 word.f_str = str;
1986
1987 /* We set the length of word to the size of str since we
1988 need to pass the max len info to the fts_get_config_value() function. */
1989 word.f_len = sizeof(str) - 1;
1990
1991 memset(word.f_str, 0x0, word.f_len);
1992
1993 /* Read the words that will be optimized in this pass. */
1994 error = fts_optimize_index_read_words(optim, index, &word);
1995
1996 if (error == DB_SUCCESS) {
1997 int zip_error;
1998
1999 ut_a(optim->zip->pos == 0);
2000 ut_a(optim->zip->zp->total_in == 0);
2001 ut_a(optim->zip->zp->total_out == 0);
2002
2003 zip_error = inflateInit(optim->zip->zp);
2004 ut_a(zip_error == Z_OK);
2005
2006 word.f_len = 0;
2007 word.f_str = str;
2008
2009 /* Read the first word to optimize from the Zip buffer. */
2010 if (!fts_zip_read_word(optim->zip, &word)) {
2011
2012 optim->done = TRUE;
2013 } else {
2014 fts_optimize_words(optim, index, &word);
2015 }
2016
2017 /* If we couldn't read any records then optimize is
2018 complete. Increment the number of indexes that have
2019 been optimized and set FTS index optimize state to
2020 completed. */
2021 if (error == DB_SUCCESS && optim->zip->n_words == 0) {
2022
2023 error = fts_optimize_index_completed(optim, index);
2024
2025 if (error == DB_SUCCESS) {
2026 ++optim->n_completed;
2027 }
2028 }
2029 }
2030
2031 return(error);
2032}
2033
2034/**********************************************************************//**
2035Delete the document ids in the delete, and delete cache tables.
2036@return DB_SUCCESS if all OK */
2037static MY_ATTRIBUTE((nonnull, warn_unused_result))
2038dberr_t
2039fts_optimize_purge_deleted_doc_ids(
2040/*===============================*/
2041 fts_optimize_t* optim) /*!< in: optimize instance */
2042{
2043 ulint i;
2044 pars_info_t* info;
2045 que_t* graph;
2046 fts_update_t* update;
2047 doc_id_t write_doc_id;
2048 dberr_t error = DB_SUCCESS;
2049 char deleted[MAX_FULL_NAME_LEN];
2050 char deleted_cache[MAX_FULL_NAME_LEN];
2051
2052 info = pars_info_create();
2053
2054 ut_a(ib_vector_size(optim->to_delete->doc_ids) > 0);
2055
2056 update = static_cast<fts_update_t*>(
2057 ib_vector_get(optim->to_delete->doc_ids, 0));
2058
2059 /* Convert to "storage" byte order. */
2060 fts_write_doc_id((byte*) &write_doc_id, update->doc_id);
2061
2062 /* This is required for the SQL parser to work. It must be able
2063 to find the following variables. So we do it twice. */
2064 fts_bind_doc_id(info, "doc_id1", &write_doc_id);
2065 fts_bind_doc_id(info, "doc_id2", &write_doc_id);
2066
2067 /* Make sure the following two names are consistent with the name
2068 used in the fts_delete_doc_ids_sql */
2069 optim->fts_common_table.suffix = fts_common_tables[3];
2070 fts_get_table_name(&optim->fts_common_table, deleted);
2071 pars_info_bind_id(info, true, fts_common_tables[3], deleted);
2072
2073 optim->fts_common_table.suffix = fts_common_tables[4];
2074 fts_get_table_name(&optim->fts_common_table, deleted_cache);
2075 pars_info_bind_id(info, true, fts_common_tables[4], deleted_cache);
2076
2077 graph = fts_parse_sql(NULL, info, fts_delete_doc_ids_sql);
2078
2079 /* Delete the doc ids that were copied at the start. */
2080 for (i = 0; i < ib_vector_size(optim->to_delete->doc_ids); ++i) {
2081
2082 update = static_cast<fts_update_t*>(ib_vector_get(
2083 optim->to_delete->doc_ids, i));
2084
2085 /* Convert to "storage" byte order. */
2086 fts_write_doc_id((byte*) &write_doc_id, update->doc_id);
2087
2088 fts_bind_doc_id(info, "doc_id1", &write_doc_id);
2089
2090 fts_bind_doc_id(info, "doc_id2", &write_doc_id);
2091
2092 error = fts_eval_sql(optim->trx, graph);
2093
2094 // FIXME: Check whether delete actually succeeded!
2095 if (error != DB_SUCCESS) {
2096
2097 fts_sql_rollback(optim->trx);
2098 break;
2099 }
2100 }
2101
2102 fts_que_graph_free(graph);
2103
2104 return(error);
2105}
2106
2107/**********************************************************************//**
2108Delete the document ids in the pending delete, and delete tables.
2109@return DB_SUCCESS if all OK */
2110static MY_ATTRIBUTE((nonnull, warn_unused_result))
2111dberr_t
2112fts_optimize_purge_deleted_doc_id_snapshot(
2113/*=======================================*/
2114 fts_optimize_t* optim) /*!< in: optimize instance */
2115{
2116 dberr_t error;
2117 que_t* graph;
2118 pars_info_t* info;
2119 char being_deleted[MAX_FULL_NAME_LEN];
2120 char being_deleted_cache[MAX_FULL_NAME_LEN];
2121
2122 info = pars_info_create();
2123
2124 /* Make sure the following two names are consistent with the name
2125 used in the fts_end_delete_sql */
2126 optim->fts_common_table.suffix = fts_common_tables[0];
2127 fts_get_table_name(&optim->fts_common_table, being_deleted);
2128 pars_info_bind_id(info, true, fts_common_tables[0], being_deleted);
2129
2130 optim->fts_common_table.suffix = fts_common_tables[1];
2131 fts_get_table_name(&optim->fts_common_table, being_deleted_cache);
2132 pars_info_bind_id(info, true, fts_common_tables[1],
2133 being_deleted_cache);
2134
2135 /* Delete the doc ids that were copied to delete pending state at
2136 the start of optimize. */
2137 graph = fts_parse_sql(NULL, info, fts_end_delete_sql);
2138
2139 error = fts_eval_sql(optim->trx, graph);
2140 fts_que_graph_free(graph);
2141
2142 return(error);
2143}
2144
2145/**********************************************************************//**
2146Copy the deleted doc ids that will be purged during this optimize run
2147to the being deleted FTS auxiliary tables. The transaction is committed
2148upon successfull copy and rolled back on DB_DUPLICATE_KEY error.
2149@return DB_SUCCESS if all OK */
2150static
2151ulint
2152fts_optimize_being_deleted_count(
2153/*=============================*/
2154 fts_optimize_t* optim) /*!< in: optimize instance */
2155{
2156 fts_table_t fts_table;
2157
2158 FTS_INIT_FTS_TABLE(&fts_table, "BEING_DELETED", FTS_COMMON_TABLE,
2159 optim->table);
2160
2161 return(fts_get_rows_count(&fts_table));
2162}
2163
2164/*********************************************************************//**
2165Copy the deleted doc ids that will be purged during this optimize run
2166to the being deleted FTS auxiliary tables. The transaction is committed
2167upon successfull copy and rolled back on DB_DUPLICATE_KEY error.
2168@return DB_SUCCESS if all OK */
2169static MY_ATTRIBUTE((nonnull, warn_unused_result))
2170dberr_t
2171fts_optimize_create_deleted_doc_id_snapshot(
2172/*========================================*/
2173 fts_optimize_t* optim) /*!< in: optimize instance */
2174{
2175 dberr_t error;
2176 que_t* graph;
2177 pars_info_t* info;
2178 char being_deleted[MAX_FULL_NAME_LEN];
2179 char deleted[MAX_FULL_NAME_LEN];
2180 char being_deleted_cache[MAX_FULL_NAME_LEN];
2181 char deleted_cache[MAX_FULL_NAME_LEN];
2182
2183 info = pars_info_create();
2184
2185 /* Make sure the following four names are consistent with the name
2186 used in the fts_init_delete_sql */
2187 optim->fts_common_table.suffix = fts_common_tables[0];
2188 fts_get_table_name(&optim->fts_common_table, being_deleted);
2189 pars_info_bind_id(info, true, fts_common_tables[0], being_deleted);
2190
2191 optim->fts_common_table.suffix = fts_common_tables[3];
2192 fts_get_table_name(&optim->fts_common_table, deleted);
2193 pars_info_bind_id(info, true, fts_common_tables[3], deleted);
2194
2195 optim->fts_common_table.suffix = fts_common_tables[1];
2196 fts_get_table_name(&optim->fts_common_table, being_deleted_cache);
2197 pars_info_bind_id(info, true, fts_common_tables[1],
2198 being_deleted_cache);
2199
2200 optim->fts_common_table.suffix = fts_common_tables[4];
2201 fts_get_table_name(&optim->fts_common_table, deleted_cache);
2202 pars_info_bind_id(info, true, fts_common_tables[4], deleted_cache);
2203
2204 /* Move doc_ids that are to be deleted to state being deleted. */
2205 graph = fts_parse_sql(NULL, info, fts_init_delete_sql);
2206
2207 error = fts_eval_sql(optim->trx, graph);
2208
2209 fts_que_graph_free(graph);
2210
2211 if (error != DB_SUCCESS) {
2212 fts_sql_rollback(optim->trx);
2213 } else {
2214 fts_sql_commit(optim->trx);
2215 }
2216
2217 optim->del_list_regenerated = TRUE;
2218
2219 return(error);
2220}
2221
2222/*********************************************************************//**
2223Read in the document ids that are to be purged during optimize. The
2224transaction is committed upon successfully read.
2225@return DB_SUCCESS if all OK */
2226static MY_ATTRIBUTE((nonnull, warn_unused_result))
2227dberr_t
2228fts_optimize_read_deleted_doc_id_snapshot(
2229/*======================================*/
2230 fts_optimize_t* optim) /*!< in: optimize instance */
2231{
2232 dberr_t error;
2233
2234 optim->fts_common_table.suffix = "BEING_DELETED";
2235
2236 /* Read the doc_ids to delete. */
2237 error = fts_table_fetch_doc_ids(
2238 optim->trx, &optim->fts_common_table, optim->to_delete);
2239
2240 if (error == DB_SUCCESS) {
2241
2242 optim->fts_common_table.suffix = "BEING_DELETED_CACHE";
2243
2244 /* Read additional doc_ids to delete. */
2245 error = fts_table_fetch_doc_ids(
2246 optim->trx, &optim->fts_common_table, optim->to_delete);
2247 }
2248
2249 if (error != DB_SUCCESS) {
2250
2251 fts_doc_ids_free(optim->to_delete);
2252 optim->to_delete = NULL;
2253 }
2254
2255 return(error);
2256}
2257
2258/*********************************************************************//**
2259Optimze all the FTS indexes, skipping those that have already been
2260optimized, since the FTS auxiliary indexes are not guaranteed to be
2261of the same cardinality.
2262@return DB_SUCCESS if all OK */
2263static MY_ATTRIBUTE((nonnull, warn_unused_result))
2264dberr_t
2265fts_optimize_indexes(
2266/*=================*/
2267 fts_optimize_t* optim) /*!< in: optimize instance */
2268{
2269 ulint i;
2270 dberr_t error = DB_SUCCESS;
2271 fts_t* fts = optim->table->fts;
2272
2273 /* Optimize the FTS indexes. */
2274 for (i = 0; i < ib_vector_size(fts->indexes); ++i) {
2275 dict_index_t* index;
2276
2277#ifdef FTS_OPTIMIZE_DEBUG
2278 ib_time_t end_time;
2279 ib_time_t start_time;
2280
2281 /* Get the start and end optimize times for this index. */
2282 error = fts_optimize_get_index_start_time(
2283 optim->trx, index, &start_time);
2284
2285 if (error != DB_SUCCESS) {
2286 break;
2287 }
2288
2289 error = fts_optimize_get_index_end_time(
2290 optim->trx, index, &end_time);
2291
2292 if (error != DB_SUCCESS) {
2293 break;
2294 }
2295
2296 /* Start time will be 0 only for the first time or after
2297 completing the optimization of all FTS indexes. */
2298 if (start_time == 0) {
2299 start_time = ut_time();
2300
2301 error = fts_optimize_set_index_start_time(
2302 optim->trx, index, start_time);
2303 }
2304
2305 /* Check if this index needs to be optimized or not. */
2306 if (ut_difftime(end_time, start_time) < 0) {
2307 error = fts_optimize_index(optim, index);
2308
2309 if (error != DB_SUCCESS) {
2310 break;
2311 }
2312 } else {
2313 ++optim->n_completed;
2314 }
2315#endif
2316 index = static_cast<dict_index_t*>(
2317 ib_vector_getp(fts->indexes, i));
2318 error = fts_optimize_index(optim, index);
2319 }
2320
2321 if (error == DB_SUCCESS) {
2322 fts_sql_commit(optim->trx);
2323 } else {
2324 fts_sql_rollback(optim->trx);
2325 }
2326
2327 return(error);
2328}
2329
2330/*********************************************************************//**
2331Cleanup the snapshot tables and the master deleted table.
2332@return DB_SUCCESS if all OK */
2333static MY_ATTRIBUTE((nonnull, warn_unused_result))
2334dberr_t
2335fts_optimize_purge_snapshot(
2336/*========================*/
2337 fts_optimize_t* optim) /*!< in: optimize instance */
2338{
2339 dberr_t error;
2340
2341 /* Delete the doc ids from the master deleted tables, that were
2342 in the snapshot that was taken at the start of optimize. */
2343 error = fts_optimize_purge_deleted_doc_ids(optim);
2344
2345 if (error == DB_SUCCESS) {
2346 /* Destroy the deleted doc id snapshot. */
2347 error = fts_optimize_purge_deleted_doc_id_snapshot(optim);
2348 }
2349
2350 if (error == DB_SUCCESS) {
2351 fts_sql_commit(optim->trx);
2352 } else {
2353 fts_sql_rollback(optim->trx);
2354 }
2355
2356 return(error);
2357}
2358
2359/*********************************************************************//**
2360Reset the start time to 0 so that a new optimize can be started.
2361@return DB_SUCCESS if all OK */
2362static MY_ATTRIBUTE((nonnull, warn_unused_result))
2363dberr_t
2364fts_optimize_reset_start_time(
2365/*==========================*/
2366 fts_optimize_t* optim) /*!< in: optimize instance */
2367{
2368 dberr_t error = DB_SUCCESS;
2369#ifdef FTS_OPTIMIZE_DEBUG
2370 fts_t* fts = optim->table->fts;
2371
2372 /* Optimization should have been completed for all indexes. */
2373 ut_a(optim->n_completed == ib_vector_size(fts->indexes));
2374
2375 for (uint i = 0; i < ib_vector_size(fts->indexes); ++i) {
2376 dict_index_t* index;
2377
2378 ib_time_t start_time = 0;
2379
2380 /* Reset the start time to 0 for this index. */
2381 error = fts_optimize_set_index_start_time(
2382 optim->trx, index, start_time);
2383
2384 index = static_cast<dict_index_t*>(
2385 ib_vector_getp(fts->indexes, i));
2386 }
2387#endif
2388
2389 if (error == DB_SUCCESS) {
2390 fts_sql_commit(optim->trx);
2391 } else {
2392 fts_sql_rollback(optim->trx);
2393 }
2394
2395 return(error);
2396}
2397
2398/*********************************************************************//**
2399Run OPTIMIZE on the given table by a background thread.
2400@return DB_SUCCESS if all OK */
2401static MY_ATTRIBUTE((nonnull))
2402dberr_t
2403fts_optimize_table_bk(
2404/*==================*/
2405 fts_slot_t* slot) /*!< in: table to optimiza */
2406{
2407 dberr_t error;
2408 dict_table_t* table = slot->table;
2409 fts_t* fts = table->fts;
2410
2411 /* Avoid optimizing tables that were optimized recently. */
2412 if (slot->last_run > 0
2413 && (ut_time() - slot->last_run) < slot->interval_time) {
2414
2415 return(DB_SUCCESS);
2416
2417 } else if (fts && fts->cache
2418 && fts->cache->deleted >= FTS_OPTIMIZE_THRESHOLD) {
2419
2420 error = fts_optimize_table(table);
2421
2422 if (error == DB_SUCCESS) {
2423 slot->state = FTS_STATE_DONE;
2424 slot->last_run = 0;
2425 slot->completed = ut_time();
2426 }
2427 } else {
2428 error = DB_SUCCESS;
2429 }
2430
2431 /* Note time this run completed. */
2432 slot->last_run = ut_time();
2433
2434 return(error);
2435}
2436/*********************************************************************//**
2437Run OPTIMIZE on the given table.
2438@return DB_SUCCESS if all OK */
2439dberr_t
2440fts_optimize_table(
2441/*===============*/
2442 dict_table_t* table) /*!< in: table to optimiza */
2443{
2444 if (srv_read_only_mode) {
2445 return DB_READ_ONLY;
2446 }
2447
2448 dberr_t error = DB_SUCCESS;
2449 fts_optimize_t* optim = NULL;
2450 fts_t* fts = table->fts;
2451
2452 if (fts_enable_diag_print) {
2453 ib::info() << "FTS start optimize " << table->name;
2454 }
2455
2456 optim = fts_optimize_create(table);
2457
2458 // FIXME: Call this only at the start of optimize, currently we
2459 // rely on DB_DUPLICATE_KEY to handle corrupting the snapshot.
2460
2461 /* Check whether there are still records in BEING_DELETED table */
2462 if (fts_optimize_being_deleted_count(optim) == 0) {
2463 /* Take a snapshot of the deleted document ids, they are copied
2464 to the BEING_ tables. */
2465 error = fts_optimize_create_deleted_doc_id_snapshot(optim);
2466 }
2467
2468 /* A duplicate error is OK, since we don't erase the
2469 doc ids from the being deleted state until all FTS
2470 indexes have been optimized. */
2471 if (error == DB_DUPLICATE_KEY) {
2472 error = DB_SUCCESS;
2473 }
2474
2475 if (error == DB_SUCCESS) {
2476
2477 /* These document ids will be filtered out during the
2478 index optimization phase. They are in the snapshot that we
2479 took above, at the start of the optimize. */
2480 error = fts_optimize_read_deleted_doc_id_snapshot(optim);
2481
2482 if (error == DB_SUCCESS) {
2483
2484 /* Commit the read of being deleted
2485 doc ids transaction. */
2486 fts_sql_commit(optim->trx);
2487
2488 /* We would do optimization only if there
2489 are deleted records to be cleaned up */
2490 if (ib_vector_size(optim->to_delete->doc_ids) > 0) {
2491 error = fts_optimize_indexes(optim);
2492 }
2493
2494 } else {
2495 ut_a(optim->to_delete == NULL);
2496 }
2497
2498 /* Only after all indexes have been optimized can we
2499 delete the (snapshot) doc ids in the pending delete,
2500 and master deleted tables. */
2501 if (error == DB_SUCCESS
2502 && optim->n_completed == ib_vector_size(fts->indexes)) {
2503
2504 if (fts_enable_diag_print) {
2505 ib::info() << "FTS_OPTIMIZE: Completed"
2506 " Optimize, cleanup DELETED table";
2507 }
2508
2509 if (ib_vector_size(optim->to_delete->doc_ids) > 0) {
2510
2511 /* Purge the doc ids that were in the
2512 snapshot from the snapshot tables and
2513 the master deleted table. */
2514 error = fts_optimize_purge_snapshot(optim);
2515 }
2516
2517 if (error == DB_SUCCESS) {
2518 /* Reset the start time of all the FTS indexes
2519 so that optimize can be restarted. */
2520 error = fts_optimize_reset_start_time(optim);
2521 }
2522 }
2523 }
2524
2525 fts_optimize_free(optim);
2526
2527 if (fts_enable_diag_print) {
2528 ib::info() << "FTS end optimize " << table->name;
2529 }
2530
2531 return(error);
2532}
2533
2534/********************************************************************//**
2535Add the table to add to the OPTIMIZER's list.
2536@return new message instance */
2537static
2538fts_msg_t*
2539fts_optimize_create_msg(
2540/*====================*/
2541 fts_msg_type_t type, /*!< in: type of message */
2542 void* ptr) /*!< in: message payload */
2543{
2544 mem_heap_t* heap;
2545 fts_msg_t* msg;
2546
2547 heap = mem_heap_create(sizeof(*msg) + sizeof(ib_list_node_t) + 16);
2548 msg = static_cast<fts_msg_t*>(mem_heap_alloc(heap, sizeof(*msg)));
2549
2550 msg->ptr = ptr;
2551 msg->type = type;
2552 msg->heap = heap;
2553
2554 return(msg);
2555}
2556
2557/**********************************************************************//**
2558Add the table to add to the OPTIMIZER's list. */
2559void
2560fts_optimize_add_table(
2561/*===================*/
2562 dict_table_t* table) /*!< in: table to add */
2563{
2564 fts_msg_t* msg;
2565
2566 if (!fts_optimize_wq) {
2567 return;
2568 }
2569
2570 /* Make sure table with FTS index cannot be evicted */
2571 dict_table_prevent_eviction(table);
2572
2573 msg = fts_optimize_create_msg(FTS_MSG_ADD_TABLE, table);
2574
2575 ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
2576}
2577
#if 0
/**********************************************************************//**
Request optimization of a table by queueing an FTS_MSG_OPTIMIZE_TABLE
message. This code is currently compiled out. */
static
void
fts_optimize_do_table(
/*==================*/
	dict_table_t*	table)	/*!< in: table to optimize */
{
	/* Optimizer thread could be shutdown */
	if (fts_optimize_wq == NULL) {
		return;
	}

	fts_msg_t*	optimize_msg = fts_optimize_create_msg(
		FTS_MSG_OPTIMIZE_TABLE, table);

	ib_wqueue_add(fts_optimize_wq, optimize_msg, optimize_msg->heap);
}
#endif
2599
2600/**********************************************************************//**
2601Remove the table from the OPTIMIZER's list. We do wait for
2602acknowledgement from the consumer of the message. */
2603void
2604fts_optimize_remove_table(
2605/*======================*/
2606 dict_table_t* table) /*!< in: table to remove */
2607{
2608 fts_msg_t* msg;
2609 os_event_t event;
2610 fts_msg_del_t* remove;
2611
2612 /* if the optimize system not yet initialized, return */
2613 if (!fts_optimize_wq) {
2614 return;
2615 }
2616
2617 /* FTS optimizer thread is already exited */
2618 if (fts_opt_start_shutdown) {
2619 ib::info() << "Try to remove table " << table->name
2620 << " after FTS optimize thread exiting.";
2621 return;
2622 }
2623
2624 msg = fts_optimize_create_msg(FTS_MSG_DEL_TABLE, NULL);
2625
2626 /* We will wait on this event until signalled by the consumer. */
2627 event = os_event_create(0);
2628
2629 remove = static_cast<fts_msg_del_t*>(
2630 mem_heap_alloc(msg->heap, sizeof(*remove)));
2631
2632 remove->table = table;
2633 remove->event = event;
2634 msg->ptr = remove;
2635
2636 ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
2637
2638 os_event_wait(event);
2639
2640 os_event_destroy(event);
2641}
2642
2643/** Send sync fts cache for the table.
2644@param[in] table table to sync */
2645void
2646fts_optimize_request_sync_table(
2647 dict_table_t* table)
2648{
2649 fts_msg_t* msg;
2650 table_id_t* table_id;
2651
2652 /* if the optimize system not yet initialized, return */
2653 if (!fts_optimize_wq) {
2654 return;
2655 }
2656
2657 /* FTS optimizer thread is already exited */
2658 if (fts_opt_start_shutdown) {
2659 ib::info() << "Try to sync table " << table->name
2660 << " after FTS optimize thread exiting.";
2661 return;
2662 }
2663
2664 msg = fts_optimize_create_msg(FTS_MSG_SYNC_TABLE, NULL);
2665
2666 table_id = static_cast<table_id_t*>(
2667 mem_heap_alloc(msg->heap, sizeof(table_id_t)));
2668 *table_id = table->id;
2669 msg->ptr = table_id;
2670
2671 ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
2672}
2673
2674/**********************************************************************//**
2675Find the slot for a particular table.
2676@return slot if found else NULL. */
2677static
2678fts_slot_t*
2679fts_optimize_find_slot(
2680/*===================*/
2681 ib_vector_t* tables, /*!< in: vector of tables */
2682 const dict_table_t* table) /*!< in: table to add */
2683{
2684 ulint i;
2685
2686 for (i = 0; i < ib_vector_size(tables); ++i) {
2687 fts_slot_t* slot;
2688
2689 slot = static_cast<fts_slot_t*>(ib_vector_get(tables, i));
2690
2691 if (slot->table == table) {
2692 return(slot);
2693 }
2694 }
2695
2696 return(NULL);
2697}
2698
2699/**********************************************************************//**
2700Start optimizing table. */
2701static
2702void
2703fts_optimize_start_table(
2704/*=====================*/
2705 ib_vector_t* tables, /*!< in/out: vector of tables */
2706 dict_table_t* table) /*!< in: table to optimize */
2707{
2708 fts_slot_t* slot;
2709
2710 slot = fts_optimize_find_slot(tables, table);
2711
2712 if (slot == NULL) {
2713 ib::error() << "Table " << table->name << " not registered"
2714 " with the optimize thread.";
2715 } else {
2716 slot->last_run = 0;
2717 slot->completed = 0;
2718 }
2719}
2720
2721/**********************************************************************//**
2722Add the table to the vector if it doesn't already exist. */
2723static
2724ibool
2725fts_optimize_new_table(
2726/*===================*/
2727 ib_vector_t* tables, /*!< in/out: vector of tables */
2728 dict_table_t* table) /*!< in: table to add */
2729{
2730 ulint i;
2731 fts_slot_t* slot;
2732 ulint empty_slot = ULINT_UNDEFINED;
2733
2734 /* Search for duplicates, also find a free slot if one exists. */
2735 for (i = 0; i < ib_vector_size(tables); ++i) {
2736
2737 slot = static_cast<fts_slot_t*>(
2738 ib_vector_get(tables, i));
2739
2740 if (slot->state == FTS_STATE_EMPTY) {
2741 empty_slot = i;
2742 } else if (slot->table == table) {
2743 /* Already exists in our optimize queue. */
2744 ut_ad(slot->table_id = table->id);
2745 return(FALSE);
2746 }
2747 }
2748
2749 /* Reuse old slot. */
2750 if (empty_slot != ULINT_UNDEFINED) {
2751
2752 slot = static_cast<fts_slot_t*>(
2753 ib_vector_get(tables, empty_slot));
2754
2755 ut_a(slot->state == FTS_STATE_EMPTY);
2756
2757 } else { /* Create a new slot. */
2758
2759 slot = static_cast<fts_slot_t*>(ib_vector_push(tables, NULL));
2760 }
2761
2762 memset(slot, 0x0, sizeof(*slot));
2763
2764 slot->table = table;
2765 slot->table_id = table->id;
2766 slot->state = FTS_STATE_LOADED;
2767 slot->interval_time = FTS_OPTIMIZE_INTERVAL_IN_SECS;
2768
2769 return(TRUE);
2770}
2771
2772/**********************************************************************//**
2773Remove the table from the vector if it exists. */
2774static
2775ibool
2776fts_optimize_del_table(
2777/*===================*/
2778 ib_vector_t* tables, /*!< in/out: vector of tables */
2779 fts_msg_del_t* msg) /*!< in: table to delete */
2780{
2781 ulint i;
2782 dict_table_t* table = msg->table;
2783
2784 for (i = 0; i < ib_vector_size(tables); ++i) {
2785 fts_slot_t* slot;
2786
2787 slot = static_cast<fts_slot_t*>(ib_vector_get(tables, i));
2788
2789 if (slot->state != FTS_STATE_EMPTY
2790 && slot->table == table) {
2791
2792 if (fts_enable_diag_print) {
2793 ib::info() << "FTS Optimize Removing table "
2794 << table->name;
2795 }
2796
2797 slot->table = NULL;
2798 slot->state = FTS_STATE_EMPTY;
2799
2800 return(TRUE);
2801 }
2802 }
2803
2804 return(FALSE);
2805}
2806
2807/**********************************************************************//**
2808Calculate how many of the registered tables need to be optimized.
2809@return no. of tables to optimize */
2810static
2811ulint
2812fts_optimize_how_many(
2813/*==================*/
2814 const ib_vector_t* tables) /*!< in: registered tables
2815 vector*/
2816{
2817 ulint i;
2818 ib_time_t delta;
2819 ulint n_tables = 0;
2820 ib_time_t current_time;
2821
2822 current_time = ut_time();
2823
2824 for (i = 0; i < ib_vector_size(tables); ++i) {
2825 const fts_slot_t* slot;
2826
2827 slot = static_cast<const fts_slot_t*>(
2828 ib_vector_get_const(tables, i));
2829
2830 switch (slot->state) {
2831 case FTS_STATE_DONE:
2832 case FTS_STATE_LOADED:
2833 ut_a(slot->completed <= current_time);
2834
2835 delta = current_time - slot->completed;
2836
2837 /* Skip slots that have been optimized recently. */
2838 if (delta >= slot->interval_time) {
2839 ++n_tables;
2840 }
2841 break;
2842
2843 case FTS_STATE_RUNNING:
2844 ut_a(slot->last_run <= current_time);
2845
2846 delta = current_time - slot->last_run;
2847
2848 if (delta > slot->interval_time) {
2849 ++n_tables;
2850 }
2851 break;
2852
2853 /* Slots in a state other than the above
2854 are ignored. */
2855 case FTS_STATE_EMPTY:
2856 case FTS_STATE_SUSPENDED:
2857 break;
2858 }
2859
2860 }
2861
2862 return(n_tables);
2863}
2864
2865/**********************************************************************//**
2866Check if the total memory used by all FTS table exceeds the maximum limit.
2867@return true if a sync is needed, false otherwise */
2868static
2869bool
2870fts_is_sync_needed(
2871/*===============*/
2872 const ib_vector_t* tables) /*!< in: registered tables
2873 vector*/
2874{
2875 ulint total_memory = 0;
2876 double time_diff = difftime(ut_time(), last_check_sync_time);
2877
2878 if (fts_need_sync || time_diff < 5) {
2879 return(false);
2880 }
2881
2882 last_check_sync_time = ut_time();
2883
2884 for (ulint i = 0; i < ib_vector_size(tables); ++i) {
2885 const fts_slot_t* slot;
2886
2887 slot = static_cast<const fts_slot_t*>(
2888 ib_vector_get_const(tables, i));
2889
2890 if (slot->state != FTS_STATE_EMPTY && slot->table
2891 && slot->table->fts && slot->table->fts->cache) {
2892 total_memory += slot->table->fts->cache->total_size;
2893 }
2894
2895 if (total_memory > fts_max_total_cache_size) {
2896 return(true);
2897 }
2898 }
2899
2900 return(false);
2901}
2902
#if 0
/*********************************************************************//**
Check whether a table needs to be synced or optimized. One slot is
examined per call, selected by fts_optimize_sync_iterator, which wraps
around at the end of the vector. This code is currently compiled out. */
static
void
fts_optimize_need_sync(
/*===================*/
	ib_vector_t*	tables)	/*!< in: list of tables */
{
	ulint		num_table = ib_vector_size(tables);

	if (!num_table) {
		return;
	}

	if (fts_optimize_sync_iterator >= num_table) {
		fts_optimize_sync_iterator = 0;
	}

	fts_slot_t*	slot = ib_vector_get(
		tables, fts_optimize_sync_iterator);
	dict_table_t*	table = slot->table;

	if (!table) {
		return;
	}

	ut_ad(table->fts);

	if (table->fts->cache) {
		ulint	deleted = table->fts->cache->deleted;

		if (table->fts->cache->added
		    >= fts_optimize_add_threshold) {

			fts_sync_table(table);

		} else if (deleted >= fts_optimize_delete_threshold) {

			fts_optimize_do_table(table);

			/* Subtract the deletions just handed over to
			the optimizer, under the counter's mutex. */
			mutex_enter(&table->fts->cache->deleted_lock);
			table->fts->cache->deleted -= deleted;
			mutex_exit(&table->fts->cache->deleted_lock);
		}
	}

	fts_optimize_sync_iterator++;
}
#endif
2953
2954/** Sync fts cache of a table
2955@param[in] table_id table id */
2956void
2957fts_optimize_sync_table(
2958 table_id_t table_id)
2959{
2960 dict_table_t* table = NULL;
2961
2962 table = dict_table_open_on_id(table_id, FALSE, DICT_TABLE_OP_NORMAL);
2963
2964 if (table) {
2965 if (dict_table_has_fts_index(table) && table->fts->cache) {
2966 fts_sync_table(table, true, false, true);
2967 }
2968
2969 dict_table_close(table, FALSE, FALSE);
2970 }
2971}
2972
2973/**********************************************************************//**
2974Optimize all FTS tables.
2975@return Dummy return */
2976static
2977os_thread_ret_t
2978DECLARE_THREAD(fts_optimize_thread)(
2979/*================*/
2980 void* arg) /*!< in: work queue*/
2981{
2982 ulint current = 0;
2983 ibool done = FALSE;
2984 ulint n_tables = 0;
2985 ulint n_optimize = 0;
2986 ib_wqueue_t* wq = (ib_wqueue_t*) arg;
2987
2988 ut_ad(!srv_read_only_mode);
2989 my_thread_init();
2990
2991 ut_ad(fts_slots);
2992
2993 /* Assign number of tables added in fts_slots_t to n_tables */
2994 n_tables = ib_vector_size(fts_slots);
2995
2996 while (!done && srv_shutdown_state == SRV_SHUTDOWN_NONE) {
2997
2998 /* If there is no message in the queue and we have tables
2999 to optimize then optimize the tables. */
3000
3001 if (!done
3002 && ib_wqueue_is_empty(wq)
3003 && n_tables > 0
3004 && n_optimize > 0) {
3005
3006 fts_slot_t* slot;
3007
3008 ut_a(ib_vector_size(fts_slots) > 0);
3009
3010 slot = static_cast<fts_slot_t*>(
3011 ib_vector_get(fts_slots, current));
3012
3013 /* Handle the case of empty slots. */
3014 if (slot->state != FTS_STATE_EMPTY) {
3015
3016 slot->state = FTS_STATE_RUNNING;
3017
3018 fts_optimize_table_bk(slot);
3019 }
3020
3021 ++current;
3022
3023 /* Wrap around the counter. */
3024 if (current >= ib_vector_size(fts_slots)) {
3025 n_optimize = fts_optimize_how_many(fts_slots);
3026
3027 current = 0;
3028 }
3029
3030 } else if (n_optimize == 0 || !ib_wqueue_is_empty(wq)) {
3031 fts_msg_t* msg;
3032
3033 msg = static_cast<fts_msg_t*>(
3034 ib_wqueue_timedwait(wq,
3035 FTS_QUEUE_WAIT_IN_USECS));
3036
3037 /* Timeout ? */
3038 if (msg == NULL) {
3039 if (fts_is_sync_needed(fts_slots)) {
3040 fts_need_sync = true;
3041 }
3042
3043 continue;
3044 }
3045
3046 switch (msg->type) {
3047 case FTS_MSG_START:
3048 break;
3049
3050 case FTS_MSG_PAUSE:
3051 break;
3052
3053 case FTS_MSG_STOP:
3054 done = TRUE;
3055 break;
3056
3057 case FTS_MSG_ADD_TABLE:
3058 ut_a(!done);
3059 if (fts_optimize_new_table(
3060 fts_slots,
3061 static_cast<dict_table_t*>(
3062 msg->ptr))) {
3063 ++n_tables;
3064 }
3065 break;
3066
3067 case FTS_MSG_OPTIMIZE_TABLE:
3068 if (!done) {
3069 fts_optimize_start_table(
3070 fts_slots,
3071 static_cast<dict_table_t*>(
3072 msg->ptr));
3073 }
3074 break;
3075
3076 case FTS_MSG_DEL_TABLE:
3077 if (fts_optimize_del_table(
3078 fts_slots, static_cast<fts_msg_del_t*>(
3079 msg->ptr))) {
3080 --n_tables;
3081 }
3082
3083 /* Signal the producer that we have
3084 removed the table. */
3085 os_event_set(
3086 ((fts_msg_del_t*) msg->ptr)->event);
3087 break;
3088
3089 case FTS_MSG_SYNC_TABLE:
3090 fts_optimize_sync_table(
3091 *static_cast<table_id_t*>(msg->ptr));
3092 break;
3093
3094 default:
3095 ut_error;
3096 }
3097
3098 mem_heap_free(msg->heap);
3099
3100 if (!done) {
3101 n_optimize = fts_optimize_how_many(fts_slots);
3102 } else {
3103 n_optimize = 0;
3104 }
3105 }
3106 }
3107
3108 /* Server is being shutdown, sync the data from FTS cache to disk
3109 if needed */
3110 if (n_tables > 0) {
3111 ulint i;
3112
3113 for (i = 0; i < ib_vector_size(fts_slots); i++) {
3114 fts_slot_t* slot;
3115
3116 slot = static_cast<fts_slot_t*>(
3117 ib_vector_get(fts_slots, i));
3118
3119 if (slot->state != FTS_STATE_EMPTY) {
3120 fts_optimize_sync_table(slot->table_id);
3121 }
3122 }
3123 }
3124
3125 ib_vector_free(fts_slots);
3126
3127 ib::info() << "FTS optimize thread exiting.";
3128
3129 os_event_set(fts_opt_shutdown_event);
3130 my_thread_end();
3131
3132 /* We count the number of threads in os_thread_exit(). A created
3133 thread should always use that to exit and not use return() to exit. */
3134 os_thread_exit();
3135
3136 OS_THREAD_DUMMY_RETURN;
3137}
3138
3139/**********************************************************************//**
3140Startup the optimize thread and create the work queue. */
3141void
3142fts_optimize_init(void)
3143/*===================*/
3144{
3145 mem_heap_t* heap;
3146 ib_alloc_t* heap_alloc;
3147 dict_table_t* table;
3148
3149 ut_ad(!srv_read_only_mode);
3150
3151 /* For now we only support one optimize thread. */
3152 ut_a(fts_optimize_wq == NULL);
3153
3154 /* Create FTS optimize work queue */
3155 fts_optimize_wq = ib_wqueue_create();
3156 ut_a(fts_optimize_wq != NULL);
3157
3158 /* Create FTS vector to store fts_slot_t */
3159 heap = mem_heap_create(sizeof(dict_table_t*) * 64);
3160 heap_alloc = ib_heap_allocator_create(heap);
3161 fts_slots = ib_vector_create(heap_alloc, sizeof(fts_slot_t), 4);
3162
3163 /* Add fts tables to the fts_slots vector which were skipped during restart */
3164 std::vector<dict_table_t*> table_vector;
3165 std::vector<dict_table_t*>::iterator it;
3166
3167 mutex_enter(&dict_sys->mutex);
3168 for (table = UT_LIST_GET_FIRST(dict_sys->table_LRU);
3169 table != NULL;
3170 table = UT_LIST_GET_NEXT(table_LRU, table)) {
3171 if (table->fts &&
3172 dict_table_has_fts_index(table)) {
3173 if (fts_optimize_new_table(fts_slots,
3174 table)){
3175 table_vector.push_back(table);
3176 }
3177 }
3178 }
3179
3180 /* It is better to call dict_table_prevent_eviction()
3181 outside the above loop because it operates on
3182 dict_sys->table_LRU list.*/
3183 for (it=table_vector.begin();it!=table_vector.end();++it) {
3184 dict_table_prevent_eviction(*it);
3185 }
3186
3187 mutex_exit(&dict_sys->mutex);
3188 table_vector.clear();
3189
3190 fts_opt_shutdown_event = os_event_create(0);
3191 last_check_sync_time = ut_time();
3192
3193 os_thread_create(fts_optimize_thread, fts_optimize_wq, NULL);
3194}
3195
3196/** Shutdown fts optimize thread. */
3197void
3198fts_optimize_shutdown()
3199{
3200 ut_ad(!srv_read_only_mode);
3201
3202 fts_msg_t* msg;
3203
3204 /* If there is an ongoing activity on dictionary, such as
3205 srv_master_evict_from_table_cache(), wait for it */
3206 dict_mutex_enter_for_mysql();
3207
3208 /* Tells FTS optimizer system that we are exiting from
3209 optimizer thread, message send their after will not be
3210 processed */
3211 fts_opt_start_shutdown = true;
3212 dict_mutex_exit_for_mysql();
3213
3214 /* We tell the OPTIMIZE thread to switch to state done, we
3215 can't delete the work queue here because the add thread needs
3216 deregister the FTS tables. */
3217
3218 msg = fts_optimize_create_msg(FTS_MSG_STOP, NULL);
3219
3220 ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
3221
3222 os_event_wait(fts_opt_shutdown_event);
3223
3224 os_event_destroy(fts_opt_shutdown_event);
3225
3226 ib_wqueue_free(fts_optimize_wq);
3227 fts_optimize_wq = NULL;
3228}
3229