1/*****************************************************************************
2
3Copyright (c) 1996, 2017, Oracle and/or its affiliates. All Rights Reserved.
4Copyright (c) 2017, 2018, MariaDB Corporation.
5
6This program is free software; you can redistribute it and/or modify it under
7the terms of the GNU General Public License as published by the Free Software
8Foundation; version 2 of the License.
9
10This program is distributed in the hope that it will be useful, but WITHOUT
11ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13
14You should have received a copy of the GNU General Public License along with
15this program; if not, write to the Free Software Foundation, Inc.,
1651 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
17
18*****************************************************************************/
19
20/**************************************************//**
21@file trx/trx0purge.cc
22Purge old versions
23
24Created 3/26/1996 Heikki Tuuri
25*******************************************************/
26
27#include "ha_prototypes.h"
28
29#include "trx0purge.h"
30#include "fsp0fsp.h"
31#include "fut0fut.h"
32#include "mach0data.h"
33#include "mtr0log.h"
34#include "os0thread.h"
35#include "que0que.h"
36#include "row0purge.h"
37#include "row0upd.h"
38#include "srv0mon.h"
39#include "fsp0sysspace.h"
40#include "srv0srv.h"
41#include "srv0start.h"
42#include "sync0sync.h"
43#include "trx0rec.h"
44#include "trx0roll.h"
45#include "trx0rseg.h"
46#include "trx0trx.h"
47#include <mysql/service_wsrep.h>
48
/** Maximum allowable purge history length. <=0 means 'infinite'. */
ulong srv_max_purge_lag = 0;

/** Max DML user threads delay in micro-seconds. */
ulong srv_max_purge_lag_delay = 0;

/** The global data structure coordinating a purge.
purge_sys_t::create() and purge_sys_t::close() assert that they are
invoked on this very object. */
purge_sys_t purge_sys;

/** A dummy undo record used as a return value when we have a whole undo log
which needs no purge */
trx_undo_rec_t trx_purge_dummy_rec;

#ifdef UNIV_DEBUG
/** Debug-build-only flag. It is not referenced in the visible part of this
file. NOTE(review): judging by the name it limits purge to updating the
purge view only -- confirm against its users. */
my_bool srv_purge_view_update_only_debug;
#endif /* UNIV_DEBUG */

/** Sentinel value: a default-constructed TrxUndoRsegs that
TrxUndoRsegsIterator refers to initially and whenever the purge queue
has been found empty. */
static const TrxUndoRsegs NullElement;
68
69/** Default constructor */
70TrxUndoRsegsIterator::TrxUndoRsegsIterator()
71 : m_rsegs(NullElement), m_iter(m_rsegs.begin())
72{
73}
74
75/** Sets the next rseg to purge in purge_sys.
76Executed in the purge coordinator thread.
77@return whether anything is to be purged */
78inline bool TrxUndoRsegsIterator::set_next()
79{
80 mutex_enter(&purge_sys.pq_mutex);
81
82 /* Only purge consumes events from the priority queue, user
83 threads only produce the events. */
84
85 /* Check if there are more rsegs to process in the
86 current element. */
87 if (m_iter != m_rsegs.end()) {
88 /* We are still processing rollback segment from
89 the same transaction and so expected transaction
90 number shouldn't increase. Undo the increment of
91 expected commit done by caller assuming rollback
92 segments from given transaction are done. */
93 purge_sys.tail.commit = (*m_iter)->last_commit;
94 } else if (!purge_sys.purge_queue.empty()) {
95 m_rsegs = purge_sys.purge_queue.top();
96 purge_sys.purge_queue.pop();
97 ut_ad(purge_sys.purge_queue.empty()
98 || purge_sys.purge_queue.top() != m_rsegs);
99 m_iter = m_rsegs.begin();
100 } else {
101 /* Queue is empty, reset iterator. */
102 purge_sys.rseg = NULL;
103 mutex_exit(&purge_sys.pq_mutex);
104 m_rsegs = NullElement;
105 m_iter = m_rsegs.begin();
106 return false;
107 }
108
109 purge_sys.rseg = *m_iter++;
110 mutex_exit(&purge_sys.pq_mutex);
111 mutex_enter(&purge_sys.rseg->mutex);
112
113 ut_a(purge_sys.rseg->last_page_no != FIL_NULL);
114 ut_ad(purge_sys.rseg->last_trx_no() == m_rsegs.trx_no());
115
116 /* We assume in purge of externally stored fields that space id is
117 in the range of UNDO tablespace space ids */
118 ut_ad(purge_sys.rseg->space->id == TRX_SYS_SPACE
119 || srv_is_undo_tablespace(purge_sys.rseg->space->id));
120
121 ut_a(purge_sys.tail.commit <= purge_sys.rseg->last_commit);
122
123 purge_sys.tail.commit = purge_sys.rseg->last_commit;
124 purge_sys.hdr_offset = purge_sys.rseg->last_offset;
125 purge_sys.hdr_page_no = purge_sys.rseg->last_page_no;
126
127 mutex_exit(&purge_sys.rseg->mutex);
128
129 return(true);
130}
131
132/** Build a purge 'query' graph. The actual purge is performed by executing
133this query graph.
134@return own: the query graph */
135static
136que_t*
137purge_graph_build()
138{
139 ut_a(srv_n_purge_threads > 0);
140
141 trx_t* trx = trx_create();
142 ut_ad(!trx->id);
143 trx->start_time = ut_time();
144 trx->state = TRX_STATE_ACTIVE;
145 trx->op_info = "purge trx";
146
147 mem_heap_t* heap = mem_heap_create(512);
148 que_fork_t* fork = que_fork_create(
149 NULL, NULL, QUE_FORK_PURGE, heap);
150 fork->trx = trx;
151
152 for (ulint i = 0; i < srv_n_purge_threads; ++i) {
153 que_thr_t* thr = que_thr_create(fork, heap, NULL);
154 thr->child = row_purge_node_create(thr, heap);
155 }
156
157 return(fork);
158}
159
160/** Initialise the purge system. */
161void purge_sys_t::create()
162{
163 ut_ad(this == &purge_sys);
164 ut_ad(!enabled());
165 ut_ad(!event);
166 event= os_event_create(0);
167 ut_ad(event);
168 m_paused= 0;
169 query= purge_graph_build();
170 n_submitted= 0;
171 n_completed= 0;
172 next_stored= false;
173 rseg= NULL;
174 page_no= 0;
175 offset= 0;
176 hdr_page_no= 0;
177 hdr_offset= 0;
178 rw_lock_create(trx_purge_latch_key, &latch, SYNC_PURGE_LATCH);
179 mutex_create(LATCH_ID_PURGE_SYS_PQ, &pq_mutex);
180 undo_trunc.create();
181}
182
183/** Close the purge subsystem on shutdown. */
184void purge_sys_t::close()
185{
186 ut_ad(this == &purge_sys);
187 if (!event) return;
188
189 m_enabled= false;
190 trx_t* trx = query->trx;
191 que_graph_free(query);
192 ut_ad(!trx->id);
193 ut_ad(trx->state == TRX_STATE_ACTIVE);
194 trx->state= TRX_STATE_NOT_STARTED;
195 trx_free(trx);
196 rw_lock_free(&latch);
197 /* rw_lock_free() already called latch.~rw_lock_t(); tame the
198 debug assertions when the destructor will be called once more. */
199 ut_ad(latch.magic_n == 0);
200 ut_d(latch.magic_n= RW_LOCK_MAGIC_N);
201 mutex_free(&pq_mutex);
202 os_event_destroy(event);
203}
204
205/*================ UNDO LOG HISTORY LIST =============================*/
206
207/** Prepend the history list with an undo log.
208Remove the undo log segment from the rseg slot if it is too big for reuse.
209@param[in] trx transaction
210@param[in,out] undo undo log
211@param[in,out] mtr mini-transaction */
212void
213trx_purge_add_undo_to_history(const trx_t* trx, trx_undo_t*& undo, mtr_t* mtr)
214{
215 DBUG_PRINT("trx", ("commit(" TRX_ID_FMT "," TRX_ID_FMT ")",
216 trx->id, trx->no));
217 ut_ad(undo == trx->rsegs.m_redo.undo
218 || undo == trx->rsegs.m_redo.old_insert);
219 trx_rseg_t* rseg = trx->rsegs.m_redo.rseg;
220 ut_ad(undo->rseg == rseg);
221 trx_rsegf_t* rseg_header = trx_rsegf_get(
222 rseg->space, rseg->page_no, mtr);
223 page_t* undo_page = trx_undo_set_state_at_finish(
224 undo, mtr);
225 trx_ulogf_t* undo_header = undo_page + undo->hdr_offset;
226
227 ut_ad(mach_read_from_2(undo_header + TRX_UNDO_NEEDS_PURGE) <= 1);
228
229 if (UNIV_UNLIKELY(mach_read_from_4(TRX_RSEG_FORMAT + rseg_header))) {
230 /* This database must have been upgraded from
231 before MariaDB 10.3.5. */
232 trx_rseg_format_upgrade(rseg_header, mtr);
233 }
234
235 if (undo->state != TRX_UNDO_CACHED) {
236 ulint hist_size;
237#ifdef UNIV_DEBUG
238 trx_usegf_t* seg_header = undo_page + TRX_UNDO_SEG_HDR;
239#endif /* UNIV_DEBUG */
240
241 /* The undo log segment will not be reused */
242 ut_a(undo->id < TRX_RSEG_N_SLOTS);
243 trx_rsegf_set_nth_undo(rseg_header, undo->id, FIL_NULL, mtr);
244
245 MONITOR_DEC(MONITOR_NUM_UNDO_SLOT_USED);
246
247 hist_size = mtr_read_ulint(
248 rseg_header + TRX_RSEG_HISTORY_SIZE, MLOG_4BYTES, mtr);
249
250 ut_ad(undo->size == flst_get_len(
251 seg_header + TRX_UNDO_PAGE_LIST));
252
253 mlog_write_ulint(
254 rseg_header + TRX_RSEG_HISTORY_SIZE,
255 hist_size + undo->size, MLOG_4BYTES, mtr);
256
257 mlog_write_ull(rseg_header + TRX_RSEG_MAX_TRX_ID,
258 trx_sys.get_max_trx_id(), mtr);
259 }
260
261 /* Before any transaction-generating background threads or the
262 purge have been started, recv_recovery_rollback_active() can
263 start transactions in row_merge_drop_temp_indexes() and
264 fts_drop_orphaned_tables(), and roll back recovered transactions.
265
266 Arbitrary user transactions may be executed when all the undo log
267 related background processes (including purge) are disabled due to
268 innodb_force_recovery=2 or innodb_force_recovery=3.
269 DROP TABLE may be executed at any innodb_force_recovery level.
270
271 After the purge thread has been given permission to exit,
272 in fast shutdown, we may roll back transactions (trx->undo_no==0)
273 in THD::cleanup() invoked from unlink_thd(), and we may also
274 continue to execute user transactions. */
275 ut_ad(srv_undo_sources
276 || (!purge_sys.enabled()
277 && (srv_startup_is_before_trx_rollback_phase
278 || trx_rollback_is_active
279 || srv_force_recovery >= SRV_FORCE_NO_BACKGROUND))
280 || ((trx->undo_no == 0 || trx->mysql_thd
281 || trx->internal)
282 && srv_fast_shutdown));
283
284#ifdef WITH_WSREP
285 if (wsrep_is_wsrep_xid(trx->xid)) {
286 trx_rseg_update_wsrep_checkpoint(rseg_header, trx->xid, mtr);
287 }
288#endif
289
290 if (trx->mysql_log_file_name && *trx->mysql_log_file_name) {
291 /* Update the latest MySQL binlog name and offset info
292 in rollback segment header if MySQL binlogging is on
293 or the database server is a MySQL replication save. */
294 trx_rseg_update_binlog_offset(rseg_header, trx, mtr);
295 }
296
297 /* Add the log as the first in the history list */
298 flst_add_first(rseg_header + TRX_RSEG_HISTORY,
299 undo_header + TRX_UNDO_HISTORY_NODE, mtr);
300
301 mlog_write_ull(undo_header + TRX_UNDO_TRX_NO, trx->no, mtr);
302 /* This is needed for upgrading old undo log pages from
303 before MariaDB 10.3.1. */
304 if (UNIV_UNLIKELY(!mach_read_from_2(undo_header
305 + TRX_UNDO_NEEDS_PURGE))) {
306 mlog_write_ulint(undo_header + TRX_UNDO_NEEDS_PURGE, 1,
307 MLOG_2BYTES, mtr);
308 }
309
310 if (rseg->last_page_no == FIL_NULL) {
311 rseg->last_page_no = undo->hdr_page_no;
312 rseg->last_offset = undo->hdr_offset;
313 rseg->set_last_trx_no(trx->no, undo == trx->rsegs.m_redo.undo);
314 rseg->needs_purge = true;
315 }
316
317 trx_sys.history_insert();
318
319 if (undo->state == TRX_UNDO_CACHED) {
320 UT_LIST_ADD_FIRST(rseg->undo_cached, undo);
321 MONITOR_INC(MONITOR_NUM_UNDO_SLOT_CACHED);
322 } else {
323 ut_ad(undo->state == TRX_UNDO_TO_PURGE);
324 ut_free(undo);
325 }
326
327 undo = NULL;
328}
329
330/** Remove undo log header from the history list.
331@param[in,out] rseg_hdr rollback segment header
332@param[in] log_hdr undo log segment header
333@param[in,out] mtr mini transaction. */
334static
335void
336trx_purge_remove_log_hdr(
337 trx_rsegf_t* rseg_hdr,
338 trx_ulogf_t* log_hdr,
339 mtr_t* mtr)
340{
341 flst_remove(rseg_hdr + TRX_RSEG_HISTORY,
342 log_hdr + TRX_UNDO_HISTORY_NODE, mtr);
343 trx_sys.history_remove();
344}
345
346/** Free an undo log segment, and remove the header from the history list.
347@param[in,out] rseg rollback segment
348@param[in] hdr_addr file address of log_hdr */
349static
350void
351trx_purge_free_segment(trx_rseg_t* rseg, fil_addr_t hdr_addr)
352{
353 mtr_t mtr;
354 trx_rsegf_t* rseg_hdr;
355 page_t* undo_page;
356
357 mtr.start();
358 mutex_enter(&rseg->mutex);
359
360 rseg_hdr = trx_rsegf_get(rseg->space, rseg->page_no, &mtr);
361 undo_page = trx_undo_page_get(
362 page_id_t(rseg->space->id, hdr_addr.page), &mtr);
363
364 /* Mark the last undo log totally purged, so that if the
365 system crashes, the tail of the undo log will not get accessed
366 again. The list of pages in the undo log tail gets
367 inconsistent during the freeing of the segment, and therefore
368 purge should not try to access them again. */
369 mlog_write_ulint(undo_page + hdr_addr.boffset + TRX_UNDO_NEEDS_PURGE,
370 0, MLOG_2BYTES, &mtr);
371
372 while (!fseg_free_step_not_header(
373 TRX_UNDO_SEG_HDR + TRX_UNDO_FSEG_HEADER
374 + undo_page, false, &mtr)) {
375 mutex_exit(&rseg->mutex);
376
377 mtr.commit();
378 mtr.start();
379
380 mutex_enter(&rseg->mutex);
381
382 rseg_hdr = trx_rsegf_get(rseg->space, rseg->page_no, &mtr);
383
384 undo_page = trx_undo_page_get(
385 page_id_t(rseg->space->id, hdr_addr.page), &mtr);
386 }
387
388 /* The page list may now be inconsistent, but the length field
389 stored in the list base node tells us how big it was before we
390 started the freeing. */
391
392 const ulint seg_size = flst_get_len(
393 TRX_UNDO_SEG_HDR + TRX_UNDO_PAGE_LIST + undo_page);
394
395 /* We may free the undo log segment header page; it must be freed
396 within the same mtr as the undo log header is removed from the
397 history list: otherwise, in case of a database crash, the segment
398 could become inaccessible garbage in the file space. */
399
400 trx_purge_remove_log_hdr(rseg_hdr, undo_page + hdr_addr.boffset, &mtr);
401
402 do {
403
404 /* Here we assume that a file segment with just the header
405 page can be freed in a few steps, so that the buffer pool
406 is not flooded with bufferfixed pages: see the note in
407 fsp0fsp.cc. */
408
409 } while (!fseg_free_step(TRX_UNDO_SEG_HDR + TRX_UNDO_FSEG_HEADER
410 + undo_page, false, &mtr));
411
412 const ulint hist_size = mach_read_from_4(rseg_hdr
413 + TRX_RSEG_HISTORY_SIZE);
414 ut_ad(hist_size >= seg_size);
415
416 mlog_write_ulint(rseg_hdr + TRX_RSEG_HISTORY_SIZE,
417 hist_size - seg_size, MLOG_4BYTES, &mtr);
418
419 ut_ad(rseg->curr_size >= seg_size);
420
421 rseg->curr_size -= seg_size;
422
423 mutex_exit(&(rseg->mutex));
424
425 mtr_commit(&mtr);
426}
427
428/** Remove unnecessary history data from a rollback segment.
429@param[in,out] rseg rollback segment
430@param[in] limit truncate anything before this */
431static
432void
433trx_purge_truncate_rseg_history(
434 trx_rseg_t& rseg,
435 const purge_sys_t::iterator& limit)
436{
437 fil_addr_t hdr_addr;
438 fil_addr_t prev_hdr_addr;
439 trx_rsegf_t* rseg_hdr;
440 page_t* undo_page;
441 trx_ulogf_t* log_hdr;
442 trx_usegf_t* seg_hdr;
443 mtr_t mtr;
444 trx_id_t undo_trx_no;
445
446 mtr.start();
447 ut_ad(rseg.is_persistent());
448 mutex_enter(&rseg.mutex);
449
450 rseg_hdr = trx_rsegf_get(rseg.space, rseg.page_no, &mtr);
451
452 hdr_addr = trx_purge_get_log_from_hist(
453 flst_get_last(rseg_hdr + TRX_RSEG_HISTORY, &mtr));
454loop:
455 if (hdr_addr.page == FIL_NULL) {
456func_exit:
457 mutex_exit(&rseg.mutex);
458 mtr.commit();
459 return;
460 }
461
462 undo_page = trx_undo_page_get(page_id_t(rseg.space->id, hdr_addr.page),
463 &mtr);
464
465 log_hdr = undo_page + hdr_addr.boffset;
466
467 undo_trx_no = mach_read_from_8(log_hdr + TRX_UNDO_TRX_NO);
468
469 if (undo_trx_no >= limit.trx_no()) {
470 if (undo_trx_no == limit.trx_no()) {
471 trx_undo_truncate_start(
472 &rseg, hdr_addr.page,
473 hdr_addr.boffset, limit.undo_no);
474 }
475
476 goto func_exit;
477 }
478
479 prev_hdr_addr = trx_purge_get_log_from_hist(
480 flst_get_prev_addr(log_hdr + TRX_UNDO_HISTORY_NODE, &mtr));
481
482 seg_hdr = undo_page + TRX_UNDO_SEG_HDR;
483
484 if ((mach_read_from_2(seg_hdr + TRX_UNDO_STATE) == TRX_UNDO_TO_PURGE)
485 && (mach_read_from_2(log_hdr + TRX_UNDO_NEXT_LOG) == 0)) {
486
487 /* We can free the whole log segment */
488
489 mutex_exit(&rseg.mutex);
490 mtr.commit();
491
492 /* calls the trx_purge_remove_log_hdr()
493 inside trx_purge_free_segment(). */
494 trx_purge_free_segment(&rseg, hdr_addr);
495 } else {
496 /* Remove the log hdr from the rseg history. */
497 trx_purge_remove_log_hdr(rseg_hdr, log_hdr, &mtr);
498
499 mutex_exit(&rseg.mutex);
500 mtr.commit();
501 }
502
503 mtr.start();
504 mutex_enter(&rseg.mutex);
505
506 rseg_hdr = trx_rsegf_get(rseg.space, rseg.page_no, &mtr);
507
508 hdr_addr = prev_hdr_addr;
509
510 goto loop;
511}
512
/** UNDO log truncate logger. Needed to track state of truncate during crash.
An auxiliary redo log file undo_<space_id>_trunc.log will be created while the
truncate of the UNDO is in progress. This file is required during recovery
to complete the truncate. */
517
518namespace undo {
519
520 /** Populate log file name based on space_id
521 @param[in] space_id id of the undo tablespace.
522 @return DB_SUCCESS or error code */
523 dberr_t populate_log_file_name(
524 ulint space_id,
525 char*& log_file_name)
526 {
527 ulint log_file_name_sz =
528 strlen(srv_log_group_home_dir) + 22 + 1 /* NUL */
529 + strlen(undo::s_log_prefix)
530 + strlen(undo::s_log_ext);
531
532 log_file_name = new (std::nothrow) char[log_file_name_sz];
533 if (log_file_name == 0) {
534 return(DB_OUT_OF_MEMORY);
535 }
536
537 memset(log_file_name, 0, log_file_name_sz);
538
539 strcpy(log_file_name, srv_log_group_home_dir);
540 ulint log_file_name_len = strlen(log_file_name);
541
542 if (log_file_name[log_file_name_len - 1]
543 != OS_PATH_SEPARATOR) {
544
545 log_file_name[log_file_name_len]
546 = OS_PATH_SEPARATOR;
547 log_file_name_len = strlen(log_file_name);
548 }
549
550 snprintf(log_file_name + log_file_name_len,
551 log_file_name_sz - log_file_name_len,
552 "%s%lu_%s", undo::s_log_prefix,
553 (ulong) space_id, s_log_ext);
554
555 return(DB_SUCCESS);
556 }
557
558 /** Create the truncate log file.
559 @param[in] space_id id of the undo tablespace to truncate.
560 @return DB_SUCCESS or error code. */
561 dberr_t init(ulint space_id)
562 {
563 dberr_t err;
564 char* log_file_name;
565
566 /* Step-1: Create the log file name using the pre-decided
567 prefix/suffix and table id of undo tablepsace to truncate. */
568 err = populate_log_file_name(space_id, log_file_name);
569 if (err != DB_SUCCESS) {
570 return(err);
571 }
572
573 /* Step-2: Create the log file, open it and write 0 to
574 indicate init phase. */
575 bool ret;
576 os_file_t handle = os_file_create(
577 innodb_log_file_key, log_file_name, OS_FILE_CREATE,
578 OS_FILE_NORMAL, OS_LOG_FILE, srv_read_only_mode, &ret);
579 if (!ret) {
580 delete[] log_file_name;
581 return(DB_IO_ERROR);
582 }
583
584 ulint sz = srv_page_size;
585 void* buf = ut_zalloc_nokey(sz + srv_page_size);
586 if (buf == NULL) {
587 os_file_close(handle);
588 delete[] log_file_name;
589 return(DB_OUT_OF_MEMORY);
590 }
591
592 byte* log_buf = static_cast<byte*>(
593 ut_align(buf, srv_page_size));
594
595 IORequest request(IORequest::WRITE);
596
597 err = os_file_write(
598 request, log_file_name, handle, log_buf, 0, sz);
599
600 os_file_flush(handle);
601 os_file_close(handle);
602
603 ut_free(buf);
604 delete[] log_file_name;
605
606 return(err);
607 }
608
609 /** Mark completion of undo truncate action by writing magic number to
610 the log file and then removing it from the disk.
611 If we are going to remove it from disk then why write magic number ?
612 This is to safeguard from unlink (file-system) anomalies that will keep
613 the link to the file even after unlink action is successfull and
614 ref-count = 0.
615 @param[in] space_id id of the undo tablespace to truncate.*/
616 void done(
617 ulint space_id)
618 {
619 dberr_t err;
620 char* log_file_name;
621
622 /* Step-1: Create the log file name using the pre-decided
623 prefix/suffix and table id of undo tablepsace to truncate. */
624 err = populate_log_file_name(space_id, log_file_name);
625 if (err != DB_SUCCESS) {
626 return;
627 }
628
629 /* Step-2: Open log file and write magic number to
630 indicate done phase. */
631 bool ret;
632 os_file_t handle =
633 os_file_create_simple_no_error_handling(
634 innodb_log_file_key, log_file_name,
635 OS_FILE_OPEN, OS_FILE_READ_WRITE,
636 srv_read_only_mode, &ret);
637
638 if (!ret) {
639 os_file_delete(innodb_log_file_key, log_file_name);
640 delete[] log_file_name;
641 return;
642 }
643
644 ulint sz = srv_page_size;
645 void* buf = ut_zalloc_nokey(sz + srv_page_size);
646 if (buf == NULL) {
647 os_file_close(handle);
648 os_file_delete(innodb_log_file_key, log_file_name);
649 delete[] log_file_name;
650 return;
651 }
652
653 byte* log_buf = static_cast<byte*>(
654 ut_align(buf, srv_page_size));
655
656 mach_write_to_4(log_buf, undo::s_magic);
657
658 IORequest request(IORequest::WRITE);
659
660 err = os_file_write(
661 request, log_file_name, handle, log_buf, 0, sz);
662
663 ut_ad(err == DB_SUCCESS);
664
665 os_file_flush(handle);
666 os_file_close(handle);
667
668 ut_free(buf);
669 os_file_delete(innodb_log_file_key, log_file_name);
670 delete[] log_file_name;
671 }
672
673 /** Check if TRUNCATE_DDL_LOG file exist.
674 @param[in] space_id id of the undo tablespace.
675 @return true if exist else false. */
676 bool is_log_present(
677 ulint space_id)
678 {
679 dberr_t err;
680 char* log_file_name;
681
682 /* Step-1: Populate log file name. */
683 err = populate_log_file_name(space_id, log_file_name);
684 if (err != DB_SUCCESS) {
685 return(false);
686 }
687
688 /* Step-2: Check for existence of the file. */
689 bool exist;
690 os_file_type_t type;
691 os_file_status(log_file_name, &exist, &type);
692
693 /* Step-3: If file exists, check it for presence of magic
694 number. If found, then delete the file and report file
695 doesn't exist as presence of magic number suggest that
696 truncate action was complete. */
697
698 if (exist) {
699 bool ret;
700 os_file_t handle =
701 os_file_create_simple_no_error_handling(
702 innodb_log_file_key, log_file_name,
703 OS_FILE_OPEN, OS_FILE_READ_WRITE,
704 srv_read_only_mode, &ret);
705 if (!ret) {
706 os_file_delete(innodb_log_file_key,
707 log_file_name);
708 delete[] log_file_name;
709 return(false);
710 }
711
712 ulint sz = srv_page_size;
713 void* buf = ut_zalloc_nokey(sz + srv_page_size);
714 if (buf == NULL) {
715 os_file_close(handle);
716 os_file_delete(innodb_log_file_key,
717 log_file_name);
718 delete[] log_file_name;
719 return(false);
720 }
721
722 byte* log_buf = static_cast<byte*>(
723 ut_align(buf, srv_page_size));
724
725 IORequest request(IORequest::READ);
726
727 dberr_t err;
728
729 err = os_file_read(request, handle, log_buf, 0, sz);
730
731 os_file_close(handle);
732
733 if (err != DB_SUCCESS) {
734
735 ib::info()
736 << "Unable to read '"
737 << log_file_name << "' : "
738 << ut_strerr(err);
739
740 os_file_delete(
741 innodb_log_file_key, log_file_name);
742
743 ut_free(buf);
744
745 delete[] log_file_name;
746
747 return(false);
748 }
749
750 ulint magic_no = mach_read_from_4(log_buf);
751
752 ut_free(buf);
753
754 if (magic_no == undo::s_magic) {
755 /* Found magic number. */
756 os_file_delete(innodb_log_file_key,
757 log_file_name);
758 delete[] log_file_name;
759 return(false);
760 }
761 }
762
763 delete[] log_file_name;
764
765 return(exist);
766 }
767};
768
769/** Iterate over all the UNDO tablespaces and check if any of the UNDO
770tablespace qualifies for TRUNCATE (size > threshold).
771@param[in,out] undo_trunc undo truncate tracker */
772static
773void
774trx_purge_mark_undo_for_truncate(
775 undo::Truncate* undo_trunc)
776{
777 /* Step-1: If UNDO Tablespace
778 - already marked for truncate (OR)
779 - truncate disabled
780 return immediately else search for qualifying tablespace. */
781 if (undo_trunc->is_marked() || !srv_undo_log_truncate) {
782 return;
783 }
784
785 /* Step-2: Validation/Qualification checks
786 a. At-least 2 UNDO tablespaces so even if one UNDO tablespace
787 is being truncated server can continue to operate.
788 b. At-least 2 persistent UNDO logs (besides the default rseg-0)
789 b. At-least 1 UNDO tablespace size > threshold. */
790 if (srv_undo_tablespaces_active < 2 || srv_undo_logs < 3) {
791 return;
792 }
793
794 /* Avoid bias selection and so start the scan from immediate next
795 of last selected UNDO tablespace for truncate. */
796 ulint space_id = undo_trunc->get_scan_start();
797
798 for (ulint i = 1; i <= srv_undo_tablespaces_active; i++) {
799
800 if (fil_space_get_size(space_id)
801 > (srv_max_undo_log_size >> srv_page_size_shift)) {
802 /* Tablespace qualifies for truncate. */
803 undo_trunc->mark(space_id);
804 undo::Truncate::add_space_to_trunc_list(space_id);
805 break;
806 }
807
808 space_id = ((space_id + 1) % (srv_undo_tablespaces_active + 1));
809 if (space_id == 0) {
810 /* Note: UNDO tablespace ids starts from 1. */
811 ++space_id;
812 }
813 }
814
815 /* Couldn't make any selection. */
816 if (!undo_trunc->is_marked()) {
817 return;
818 }
819
820 DBUG_LOG("undo",
821 "marking for truncate UNDO tablespace "
822 << undo_trunc->get_marked_space_id());
823
824 /* Step-3: Iterate over all the rsegs of selected UNDO tablespace
825 and mark them temporarily unavailable for allocation.*/
826 for (ulint i = 0; i < TRX_SYS_N_RSEGS; ++i) {
827 if (trx_rseg_t* rseg = trx_sys.rseg_array[i]) {
828 ut_ad(rseg->is_persistent());
829 if (rseg->space->id
830 == undo_trunc->get_marked_space_id()) {
831
832 /* Once set this rseg will not be allocated
833 to new booting transaction but we will wait
834 for existing active transaction to finish. */
835 rseg->skip_allocation = true;
836 undo_trunc->add_rseg_to_trunc(rseg);
837 }
838 }
839 }
840}
841
/** Definition of the static member undo::Truncate::s_spaces_to_truncate.
NOTE(review): presumably this is the list that
undo::Truncate::add_space_to_trunc_list() and
undo::Truncate::clear_trunc_list() operate on -- confirm in the class
declaration, which is outside this file. */
undo::undo_spaces_t undo::Truncate::s_spaces_to_truncate;
843
844/** Cleanse purge queue to remove the rseg that reside in undo-tablespace
845marked for truncate.
846@param[in,out] undo_trunc undo truncate tracker */
847static
848void
849trx_purge_cleanse_purge_queue(
850 undo::Truncate* undo_trunc)
851{
852 mutex_enter(&purge_sys.pq_mutex);
853 typedef std::vector<TrxUndoRsegs> purge_elem_list_t;
854 purge_elem_list_t purge_elem_list;
855
856 /* Remove rseg instances that are in the purge queue before we start
857 truncate of corresponding UNDO truncate. */
858 while (!purge_sys.purge_queue.empty()) {
859 purge_elem_list.push_back(purge_sys.purge_queue.top());
860 purge_sys.purge_queue.pop();
861 }
862 ut_ad(purge_sys.purge_queue.empty());
863
864 for (purge_elem_list_t::iterator it = purge_elem_list.begin();
865 it != purge_elem_list.end();
866 ++it) {
867
868 for (TrxUndoRsegs::iterator it2 = it->begin();
869 it2 != it->end();
870 ++it2) {
871
872 if ((*it2)->space->id
873 == undo_trunc->get_marked_space_id()) {
874 it->erase(it2);
875 break;
876 }
877 }
878
879 if (!it->empty()) {
880 purge_sys.purge_queue.push(*it);
881 }
882 }
883 mutex_exit(&purge_sys.pq_mutex);
884}
885
886/** Iterate over selected UNDO tablespace and check if all the rsegs
887that resides in the tablespace are free.
888@param[in] limit truncate_limit
889@param[in,out] undo_trunc undo truncate tracker */
890static
891void
892trx_purge_initiate_truncate(
893 const purge_sys_t::iterator& limit,
894 undo::Truncate* undo_trunc)
895{
896 /* Step-1: Early check to findout if any of the the UNDO tablespace
897 is marked for truncate. */
898 if (!undo_trunc->is_marked()) {
899 /* No tablespace marked for truncate yet. */
900 return;
901 }
902
903 /* Step-2: Scan over each rseg and ensure that it doesn't hold any
904 active undo records. */
905 bool all_free = true;
906
907 for (ulint i = 0; i < undo_trunc->rsegs_size() && all_free; ++i) {
908
909 trx_rseg_t* rseg = undo_trunc->get_ith_rseg(i);
910
911 mutex_enter(&rseg->mutex);
912
913 if (rseg->trx_ref_count > 0) {
914 /* This rseg is still being held by an active
915 transaction. */
916 all_free = false;
917 mutex_exit(&rseg->mutex);
918 continue;
919 }
920
921 ut_ad(rseg->trx_ref_count == 0);
922 ut_ad(rseg->skip_allocation);
923
924 ulint size_of_rsegs = rseg->curr_size;
925
926 if (size_of_rsegs == 1) {
927 mutex_exit(&rseg->mutex);
928 continue;
929 } else {
930
931 /* There could be cached undo segment. Check if records
932 in these segments can be purged. Normal purge history
933 will not touch these cached segment. */
934 ulint cached_undo_size = 0;
935
936 for (trx_undo_t* undo =
937 UT_LIST_GET_FIRST(rseg->undo_cached);
938 undo != NULL && all_free;
939 undo = UT_LIST_GET_NEXT(undo_list, undo)) {
940
941 if (limit.trx_no() < undo->trx_id) {
942 all_free = false;
943 } else {
944 cached_undo_size += undo->size;
945 }
946 }
947
948 ut_ad(size_of_rsegs >= (cached_undo_size + 1));
949
950 if (size_of_rsegs > (cached_undo_size + 1)) {
951 /* There are pages besides cached pages that
952 still hold active data. */
953 all_free = false;
954 }
955 }
956
957 mutex_exit(&rseg->mutex);
958 }
959
960 if (!all_free) {
961 /* rseg still holds active data.*/
962 return;
963 }
964
965
966 /* Step-3: Start the actual truncate.
967 a. log-checkpoint
968 b. Write the DDL log to protect truncate action from CRASH
969 c. Remove rseg instance if added to purge queue before we
970 initiate truncate.
971 d. Execute actual truncate
972 e. Remove the DDL log. */
973
974 /* After truncate if server crashes then redo logging done for this
975 undo tablespace might not stand valid as tablespace has been
976 truncated. */
977 log_make_checkpoint_at(LSN_MAX, TRUE);
978
979 const ulint space_id = undo_trunc->get_marked_space_id();
980
981 ib::info() << "Truncating UNDO tablespace " << space_id;
982
983#ifdef UNIV_DEBUG
984 dberr_t err =
985#endif /* UNIV_DEBUG */
986 undo_trunc->start_logging(space_id);
987 ut_ad(err == DB_SUCCESS);
988
989 DBUG_EXECUTE_IF("ib_undo_trunc_before_truncate",
990 ib::info() << "ib_undo_trunc_before_truncate";
991 DBUG_SUICIDE(););
992
993 trx_purge_cleanse_purge_queue(undo_trunc);
994
995 if (!trx_undo_truncate_tablespace(undo_trunc)) {
996 /* Note: In case of error we don't enable the rsegs
997 and neither unmark the tablespace so the tablespace
998 continue to remain inactive. */
999 ib::error() << "Failed to truncate UNDO tablespace "
1000 << space_id;
1001 return;
1002 }
1003
1004 if (purge_sys.rseg != NULL
1005 && purge_sys.rseg->last_page_no == FIL_NULL) {
1006 /* If purge_sys.rseg is pointing to rseg that was recently
1007 truncated then move to next rseg element.
1008 Note: Ideally purge_sys.rseg should be NULL because purge
1009 should complete processing of all the records but there is
1010 purge_batch_size that can force the purge loop to exit before
1011 all the records are purged and in this case purge_sys.rseg
1012 could point to a valid rseg waiting for next purge cycle. */
1013 purge_sys.next_stored = false;
1014 purge_sys.rseg = NULL;
1015 }
1016
1017 DBUG_EXECUTE_IF("ib_undo_trunc_before_ddl_log_end",
1018 ib::info() << "ib_undo_trunc_before_ddl_log_end";
1019 DBUG_SUICIDE(););
1020
1021 log_make_checkpoint_at(LSN_MAX, TRUE);
1022
1023 undo_trunc->done_logging(space_id);
1024
1025 /* Completed truncate. Now it is safe to re-use the tablespace. */
1026 for (ulint i = 0; i < undo_trunc->rsegs_size(); ++i) {
1027 trx_rseg_t* rseg = undo_trunc->get_ith_rseg(i);
1028 rseg->skip_allocation = false;
1029 }
1030
1031 ib::info() << "Truncated UNDO tablespace " << space_id;
1032
1033 undo_trunc->reset();
1034 undo::Truncate::clear_trunc_list();
1035
1036 DBUG_EXECUTE_IF("ib_undo_trunc_trunc_done",
1037 ib::info() << "ib_undo_trunc_trunc_done";
1038 DBUG_SUICIDE(););
1039}
1040
1041/**
1042Removes unnecessary history data from rollback segments. NOTE that when this
1043function is called, the caller must not have any latches on undo log pages!
1044*/
1045static void trx_purge_truncate_history()
1046{
1047 ut_ad(purge_sys.head <= purge_sys.tail);
1048 purge_sys_t::iterator& head = purge_sys.head.commit
1049 ? purge_sys.head : purge_sys.tail;
1050
1051 if (head.trx_no() >= purge_sys.view.low_limit_no()) {
1052 /* This is sometimes necessary. TODO: find out why. */
1053 head.reset_trx_no(purge_sys.view.low_limit_no());
1054 head.undo_no = 0;
1055 }
1056
1057 for (ulint i = 0; i < TRX_SYS_N_RSEGS; ++i) {
1058 if (trx_rseg_t* rseg = trx_sys.rseg_array[i]) {
1059 ut_ad(rseg->id == i);
1060 trx_purge_truncate_rseg_history(*rseg, head);
1061 }
1062 }
1063
1064 /* UNDO tablespace truncate. We will try to truncate as much as we
1065 can (greedy approach). This will ensure when the server is idle we
1066 try and truncate all the UNDO tablespaces. */
1067 for (ulint i = srv_undo_tablespaces_active; i--; ) {
1068 trx_purge_mark_undo_for_truncate(&purge_sys.undo_trunc);
1069 trx_purge_initiate_truncate(head, &purge_sys.undo_trunc);
1070 }
1071}
1072
1073/***********************************************************************//**
1074Updates the last not yet purged history log info in rseg when we have purged
1075a whole undo log. Advances also purge_sys.purge_trx_no past the purged log. */
1076static
1077void
1078trx_purge_rseg_get_next_history_log(
1079/*================================*/
1080 trx_rseg_t* rseg, /*!< in: rollback segment */
1081 ulint* n_pages_handled)/*!< in/out: number of UNDO pages
1082 handled */
1083{
1084 page_t* undo_page;
1085 trx_ulogf_t* log_hdr;
1086 fil_addr_t prev_log_addr;
1087 trx_id_t trx_no;
1088 mtr_t mtr;
1089
1090 mutex_enter(&(rseg->mutex));
1091
1092 ut_a(rseg->last_page_no != FIL_NULL);
1093
1094 purge_sys.tail.commit = rseg->last_commit + 1;
1095 purge_sys.tail.undo_no = 0;
1096 purge_sys.next_stored = false;
1097
1098 mtr_start(&mtr);
1099
1100 undo_page = trx_undo_page_get_s_latched(
1101 page_id_t(rseg->space->id, rseg->last_page_no), &mtr);
1102
1103 log_hdr = undo_page + rseg->last_offset;
1104
1105 /* Increase the purge page count by one for every handled log */
1106
1107 (*n_pages_handled)++;
1108
1109 prev_log_addr = trx_purge_get_log_from_hist(
1110 flst_get_prev_addr(log_hdr + TRX_UNDO_HISTORY_NODE, &mtr));
1111
1112 if (prev_log_addr.page == FIL_NULL) {
1113 /* No logs left in the history list */
1114
1115 rseg->last_page_no = FIL_NULL;
1116
1117 mutex_exit(&(rseg->mutex));
1118 mtr_commit(&mtr);
1119 return;
1120 }
1121
1122 mutex_exit(&rseg->mutex);
1123
1124 mtr_commit(&mtr);
1125
1126 /* Read the previous log header. */
1127 mtr_start(&mtr);
1128
1129 log_hdr = trx_undo_page_get_s_latched(page_id_t(rseg->space->id,
1130 prev_log_addr.page),
1131 &mtr)
1132 + prev_log_addr.boffset;
1133
1134 trx_no = mach_read_from_8(log_hdr + TRX_UNDO_TRX_NO);
1135 unsigned purge = mach_read_from_2(log_hdr + TRX_UNDO_NEEDS_PURGE);
1136 ut_ad(purge <= 1);
1137
1138 mtr_commit(&mtr);
1139
1140 mutex_enter(&(rseg->mutex));
1141
1142 rseg->last_page_no = prev_log_addr.page;
1143 rseg->last_offset = prev_log_addr.boffset;
1144 rseg->set_last_trx_no(trx_no, purge != 0);
1145 rseg->needs_purge = purge != 0;
1146
1147 /* Purge can also produce events, however these are already ordered
1148 in the rollback segment and any user generated event will be greater
1149 than the events that Purge produces. ie. Purge can never produce
1150 events from an empty rollback segment. */
1151
1152 mutex_enter(&purge_sys.pq_mutex);
1153
1154 purge_sys.purge_queue.push(*rseg);
1155
1156 mutex_exit(&purge_sys.pq_mutex);
1157
1158 mutex_exit(&rseg->mutex);
1159}
1160
1161/** Position the purge sys "iterator" on the undo record to use for purging. */
1162static
1163void
1164trx_purge_read_undo_rec()
1165{
1166 ulint offset;
1167 ulint page_no;
1168 ib_uint64_t undo_no;
1169
1170 purge_sys.hdr_offset = purge_sys.rseg->last_offset;
1171 page_no = purge_sys.hdr_page_no = purge_sys.rseg->last_page_no;
1172
1173 if (purge_sys.rseg->needs_purge) {
1174 mtr_t mtr;
1175 mtr.start();
1176 if (trx_undo_rec_t* undo_rec = trx_undo_get_first_rec(
1177 purge_sys.rseg->space, purge_sys.hdr_page_no,
1178 purge_sys.hdr_offset, RW_S_LATCH, &mtr)) {
1179
1180 offset = page_offset(undo_rec);
1181 undo_no = trx_undo_rec_get_undo_no(undo_rec);
1182 page_no = page_get_page_no(page_align(undo_rec));
1183 } else {
1184 offset = 0;
1185 undo_no = 0;
1186 }
1187
1188 mtr.commit();
1189 } else {
1190 offset = 0;
1191 undo_no = 0;
1192 }
1193
1194 purge_sys.offset = offset;
1195 purge_sys.page_no = page_no;
1196 purge_sys.tail.undo_no = undo_no;
1197
1198 purge_sys.next_stored = true;
1199}
1200
1201/***********************************************************************//**
1202Chooses the next undo log to purge and updates the info in purge_sys. This
1203function is used to initialize purge_sys when the next record to purge is
1204not known, and also to update the purge system info on the next record when
1205purge has handled the whole undo log for a transaction. */
1206static
1207void
1208trx_purge_choose_next_log(void)
1209/*===========================*/
1210{
1211 ut_ad(!purge_sys.next_stored);
1212
1213 if (purge_sys.rseg_iter.set_next()) {
1214 trx_purge_read_undo_rec();
1215 } else {
1216 /* There is nothing to do yet. */
1217 os_thread_yield();
1218 }
1219}
1220
1221/***********************************************************************//**
1222Gets the next record to purge and updates the info in the purge system.
1223@return copy of an undo log record or pointer to the dummy undo log record */
1224static
1225trx_undo_rec_t*
1226trx_purge_get_next_rec(
1227/*===================*/
1228 ulint* n_pages_handled,/*!< in/out: number of UNDO pages
1229 handled */
1230 mem_heap_t* heap) /*!< in: memory heap where copied */
1231{
1232 trx_undo_rec_t* rec;
1233 trx_undo_rec_t* rec_copy;
1234 trx_undo_rec_t* rec2;
1235 page_t* undo_page;
1236 page_t* page;
1237 ulint offset;
1238 ulint page_no;
1239 ulint space;
1240 mtr_t mtr;
1241
1242 ut_ad(purge_sys.next_stored);
1243 ut_ad(purge_sys.tail.trx_no() < purge_sys.view.low_limit_no());
1244
1245 space = purge_sys.rseg->space->id;
1246 page_no = purge_sys.page_no;
1247 offset = purge_sys.offset;
1248
1249 if (offset == 0) {
1250 /* It is the dummy undo log record, which means that there is
1251 no need to purge this undo log */
1252
1253 trx_purge_rseg_get_next_history_log(
1254 purge_sys.rseg, n_pages_handled);
1255
1256 /* Look for the next undo log and record to purge */
1257
1258 trx_purge_choose_next_log();
1259
1260 return(&trx_purge_dummy_rec);
1261 }
1262
1263 mtr_start(&mtr);
1264
1265 undo_page = trx_undo_page_get_s_latched(page_id_t(space, page_no),
1266 &mtr);
1267
1268 rec = undo_page + offset;
1269
1270 rec2 = trx_undo_page_get_next_rec(rec, purge_sys.hdr_page_no,
1271 purge_sys.hdr_offset);
1272
1273 if (rec2 == NULL) {
1274 rec2 = trx_undo_get_next_rec(rec, purge_sys.hdr_page_no,
1275 purge_sys.hdr_offset, &mtr);
1276 }
1277
1278 if (rec2 == NULL) {
1279 mtr_commit(&mtr);
1280
1281 trx_purge_rseg_get_next_history_log(
1282 purge_sys.rseg, n_pages_handled);
1283
1284 /* Look for the next undo log and record to purge */
1285
1286 trx_purge_choose_next_log();
1287
1288 mtr_start(&mtr);
1289
1290 undo_page = trx_undo_page_get_s_latched(
1291 page_id_t(space, page_no), &mtr);
1292
1293 rec = undo_page + offset;
1294 } else {
1295 page = page_align(rec2);
1296
1297 purge_sys.offset = ulint(rec2 - page);
1298 purge_sys.page_no = page_get_page_no(page);
1299 purge_sys.tail.undo_no = trx_undo_rec_get_undo_no(rec2);
1300
1301 if (undo_page != page) {
1302 /* We advance to a new page of the undo log: */
1303 (*n_pages_handled)++;
1304 }
1305 }
1306
1307 rec_copy = trx_undo_rec_copy(rec, heap);
1308
1309 mtr_commit(&mtr);
1310
1311 return(rec_copy);
1312}
1313
1314/********************************************************************//**
1315Fetches the next undo log record from the history list to purge. It must be
1316released with the corresponding release function.
1317@return copy of an undo log record or pointer to trx_purge_dummy_rec,
1318if the whole undo log can skipped in purge; NULL if none left */
1319static MY_ATTRIBUTE((warn_unused_result))
1320trx_undo_rec_t*
1321trx_purge_fetch_next_rec(
1322/*=====================*/
1323 roll_ptr_t* roll_ptr, /*!< out: roll pointer to undo record */
1324 ulint* n_pages_handled,/*!< in/out: number of UNDO log pages
1325 handled */
1326 mem_heap_t* heap) /*!< in: memory heap where copied */
1327{
1328 if (!purge_sys.next_stored) {
1329 trx_purge_choose_next_log();
1330
1331 if (!purge_sys.next_stored) {
1332 DBUG_PRINT("ib_purge",
1333 ("no logs left in the history list"));
1334 return(NULL);
1335 }
1336 }
1337
1338 if (purge_sys.tail.trx_no() >= purge_sys.view.low_limit_no()) {
1339
1340 return(NULL);
1341 }
1342
1343 /* fprintf(stderr, "Thread %lu purging trx %llu undo record %llu\n",
1344 os_thread_get_curr_id(), iter->trx_no, iter->undo_no); */
1345
1346 *roll_ptr = trx_undo_build_roll_ptr(
1347 /* row_purge_record_func() will later set
1348 ROLL_PTR_INSERT_FLAG for TRX_UNDO_INSERT_REC */
1349 false,
1350 purge_sys.rseg->id,
1351 purge_sys.page_no, purge_sys.offset);
1352
1353 /* The following call will advance the stored values of the
1354 purge iterator. */
1355
1356 return(trx_purge_get_next_rec(n_pages_handled, heap));
1357}
1358
1359/** Run a purge batch.
1360@param n_purge_threads number of purge threads
1361@return number of undo log pages handled in the batch */
1362static
1363ulint
1364trx_purge_attach_undo_recs(ulint n_purge_threads)
1365{
1366 que_thr_t* thr;
1367 ulint i = 0;
1368 ulint n_pages_handled = 0;
1369 ulint n_thrs = UT_LIST_GET_LEN(purge_sys.query->thrs);
1370
1371 ut_a(n_purge_threads > 0);
1372
1373 purge_sys.head = purge_sys.tail;
1374
1375 /* Debug code to validate some pre-requisites and reset done flag. */
1376 for (thr = UT_LIST_GET_FIRST(purge_sys.query->thrs);
1377 thr != NULL && i < n_purge_threads;
1378 thr = UT_LIST_GET_NEXT(thrs, thr), ++i) {
1379
1380 purge_node_t* node;
1381
1382 /* Get the purge node. */
1383 node = (purge_node_t*) thr->child;
1384
1385 ut_a(que_node_get_type(node) == QUE_NODE_PURGE);
1386 ut_a(node->undo_recs == NULL);
1387 ut_a(node->done);
1388
1389 node->done = FALSE;
1390 }
1391
1392 /* There should never be fewer nodes than threads, the inverse
1393 however is allowed because we only use purge threads as needed. */
1394 ut_a(i == n_purge_threads);
1395
1396 /* Fetch and parse the UNDO records. The UNDO records are added
1397 to a per purge node vector. */
1398 thr = UT_LIST_GET_FIRST(purge_sys.query->thrs);
1399 ut_a(n_thrs > 0 && thr != NULL);
1400
1401 ut_ad(purge_sys.head <= purge_sys.tail);
1402
1403 i = 0;
1404
1405 const ulint batch_size = srv_purge_batch_size;
1406
1407 for (;;) {
1408 purge_node_t* node;
1409 trx_purge_rec_t* purge_rec;
1410
1411 ut_a(!thr->is_active);
1412
1413 /* Get the purge node. */
1414 node = (purge_node_t*) thr->child;
1415 ut_a(que_node_get_type(node) == QUE_NODE_PURGE);
1416
1417 purge_rec = static_cast<trx_purge_rec_t*>(
1418 mem_heap_zalloc(node->heap, sizeof(*purge_rec)));
1419
1420 /* Track the max {trx_id, undo_no} for truncating the
1421 UNDO logs once we have purged the records. */
1422
1423 if (purge_sys.head <= purge_sys.tail) {
1424 purge_sys.head = purge_sys.tail;
1425 }
1426
1427 /* Fetch the next record, and advance the purge_sys.tail. */
1428 purge_rec->undo_rec = trx_purge_fetch_next_rec(
1429 &purge_rec->roll_ptr, &n_pages_handled, node->heap);
1430
1431 if (purge_rec->undo_rec != NULL) {
1432
1433 if (node->undo_recs == NULL) {
1434 node->undo_recs = ib_vector_create(
1435 ib_heap_allocator_create(node->heap),
1436 sizeof(trx_purge_rec_t),
1437 batch_size);
1438 } else {
1439 ut_a(!ib_vector_is_empty(node->undo_recs));
1440 }
1441
1442 ib_vector_push(node->undo_recs, purge_rec);
1443
1444 if (n_pages_handled >= batch_size) {
1445
1446 break;
1447 }
1448 } else {
1449 break;
1450 }
1451
1452 thr = UT_LIST_GET_NEXT(thrs, thr);
1453
1454 if (!(++i % n_purge_threads)) {
1455 thr = UT_LIST_GET_FIRST(purge_sys.query->thrs);
1456 }
1457
1458 ut_a(thr != NULL);
1459 }
1460
1461 ut_ad(purge_sys.head <= purge_sys.tail);
1462
1463 return(n_pages_handled);
1464}
1465
1466/*******************************************************************//**
1467Calculate the DML delay required.
1468@return delay in microseconds or ULINT_MAX */
1469static
1470ulint
1471trx_purge_dml_delay(void)
1472/*=====================*/
1473{
1474 /* Determine how much data manipulation language (DML) statements
1475 need to be delayed in order to reduce the lagging of the purge
1476 thread. */
1477 ulint delay = 0; /* in microseconds; default: no delay */
1478
1479 /* If purge lag is set (ie. > 0) then calculate the new DML delay.
1480 Note: we do a dirty read of the trx_sys_t data structure here,
1481 without holding trx_sys.mutex. */
1482
1483 if (srv_max_purge_lag > 0) {
1484 float ratio;
1485
1486 ratio = float(trx_sys.history_size()) / srv_max_purge_lag;
1487
1488 if (ratio > 1.0) {
1489 /* If the history list length exceeds the
1490 srv_max_purge_lag, the data manipulation
1491 statements are delayed by at least 5000
1492 microseconds. */
1493 delay = (ulint) ((ratio - .5) * 10000);
1494 }
1495
1496 if (delay > srv_max_purge_lag_delay) {
1497 delay = srv_max_purge_lag_delay;
1498 }
1499
1500 MONITOR_SET(MONITOR_DML_PURGE_DELAY, delay);
1501 }
1502
1503 return(delay);
1504}
1505
1506/** Wait for pending purge jobs to complete. */
1507static
1508void
1509trx_purge_wait_for_workers_to_complete()
1510{
1511 /* Ensure that the work queue empties out. */
1512 while (my_atomic_loadlint(&purge_sys.n_completed)
1513 != purge_sys.n_submitted) {
1514
1515 if (srv_get_task_queue_length() > 0) {
1516 srv_release_threads(SRV_WORKER, 1);
1517 }
1518
1519 os_thread_yield();
1520 }
1521
1522 /* There should be no outstanding tasks as long
1523 as the worker threads are active. */
1524 ut_a(srv_get_task_queue_length() == 0);
1525}
1526
1527/*******************************************************************//**
1528This function runs a purge batch.
1529@return number of undo log pages handled in the batch */
1530ulint
1531trx_purge(
1532/*======*/
1533 ulint n_purge_threads, /*!< in: number of purge tasks
1534 to submit to the work queue */
1535 bool truncate) /*!< in: truncate history if true */
1536{
1537 que_thr_t* thr = NULL;
1538 ulint n_pages_handled;
1539
1540 ut_a(n_purge_threads > 0);
1541
1542 srv_dml_needed_delay = trx_purge_dml_delay();
1543
1544 /* The number of tasks submitted should be completed. */
1545 ut_a(purge_sys.n_submitted
1546 == my_atomic_loadlint(&purge_sys.n_completed));
1547
1548 rw_lock_x_lock(&purge_sys.latch);
1549 trx_sys.clone_oldest_view();
1550 rw_lock_x_unlock(&purge_sys.latch);
1551
1552#ifdef UNIV_DEBUG
1553 if (srv_purge_view_update_only_debug) {
1554 return(0);
1555 }
1556#endif /* UNIV_DEBUG */
1557
1558 /* Fetch the UNDO recs that need to be purged. */
1559 n_pages_handled = trx_purge_attach_undo_recs(n_purge_threads);
1560 purge_sys.n_submitted += n_purge_threads;
1561
1562 /* Submit tasks to workers queue if using multi-threaded purge. */
1563 for (ulint i = n_purge_threads; --i; ) {
1564 thr = que_fork_scheduler_round_robin(purge_sys.query, thr);
1565 ut_a(thr);
1566 srv_que_task_enqueue_low(thr);
1567 }
1568
1569 thr = que_fork_scheduler_round_robin(purge_sys.query, thr);
1570
1571 que_run_threads(thr);
1572
1573 my_atomic_addlint(&purge_sys.n_completed, 1);
1574
1575 if (n_purge_threads > 1) {
1576 trx_purge_wait_for_workers_to_complete();
1577 }
1578
1579 ut_a(purge_sys.n_submitted
1580 == my_atomic_loadlint(&purge_sys.n_completed));
1581
1582 if (truncate) {
1583 trx_purge_truncate_history();
1584 }
1585
1586 MONITOR_INC_VALUE(MONITOR_PURGE_INVOKED, 1);
1587 MONITOR_INC_VALUE(MONITOR_PURGE_N_PAGE_HANDLED, n_pages_handled);
1588
1589 return(n_pages_handled);
1590}
1591
1592/** Stop purge during FLUSH TABLES FOR EXPORT */
1593void purge_sys_t::stop()
1594{
1595 rw_lock_x_lock(&latch);
1596
1597 if (!enabled_latched())
1598 {
1599 /* Shutdown must have been initiated during FLUSH TABLES FOR EXPORT. */
1600 ut_ad(!srv_undo_sources);
1601 rw_lock_x_unlock(&latch);
1602 return;
1603 }
1604
1605 ut_ad(srv_n_purge_threads > 0);
1606
1607 if (0 == my_atomic_add32_explicit(&m_paused, 1, MY_MEMORY_ORDER_RELAXED))
1608 {
1609 /* We need to wakeup the purge thread in case it is suspended, so
1610 that it can acknowledge the state change. */
1611 const int64_t sig_count = os_event_reset(event);
1612 rw_lock_x_unlock(&latch);
1613 ib::info() << "Stopping purge";
1614 srv_purge_wakeup();
1615 /* Wait for purge coordinator to signal that it is suspended. */
1616 os_event_wait_low(event, sig_count);
1617 MONITOR_ATOMIC_INC(MONITOR_PURGE_STOP_COUNT);
1618 return;
1619 }
1620
1621 rw_lock_x_unlock(&latch);
1622
1623 if (running())
1624 {
1625 ib::info() << "Waiting for purge to stop";
1626 while (running())
1627 os_thread_sleep(10000);
1628 }
1629}
1630
1631/** Resume purge at UNLOCK TABLES after FLUSH TABLES FOR EXPORT */
1632void purge_sys_t::resume()
1633{
1634 if (!enabled())
1635 {
1636 /* Shutdown must have been initiated during FLUSH TABLES FOR EXPORT. */
1637 ut_ad(!srv_undo_sources);
1638 return;
1639 }
1640
1641 int32_t paused= my_atomic_add32_explicit(&m_paused, -1,
1642 MY_MEMORY_ORDER_RELAXED);
1643 ut_a(paused);
1644
1645 if (paused == 1)
1646 {
1647 ib::info() << "Resuming purge";
1648 srv_purge_wakeup();
1649 MONITOR_ATOMIC_INC(MONITOR_PURGE_RESUME_COUNT);
1650 }
1651}
1652