1/*
2 Copyright (c) 2016, Facebook, Inc.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
16
17#include <my_global.h>
18
19/* This C++ file's header file */
20#include "./rdb_index_merge.h"
21
22/* MySQL header files */
23#include "../sql/sql_class.h"
24
25/* MyRocks header files */
26#include "./ha_rocksdb.h"
27#include "./rdb_datadic.h"
28
29namespace myrocks {
30
31Rdb_index_merge::Rdb_index_merge(const char *const tmpfile_path,
32 const ulonglong &merge_buf_size,
33 const ulonglong &merge_combine_read_size,
34 const ulonglong &merge_tmp_file_removal_delay,
35 rocksdb::ColumnFamilyHandle *cf)
36 : m_tmpfile_path(tmpfile_path), m_merge_buf_size(merge_buf_size),
37 m_merge_combine_read_size(merge_combine_read_size),
38 m_merge_tmp_file_removal_delay(merge_tmp_file_removal_delay),
39 m_cf_handle(cf), m_rec_buf_unsorted(nullptr), m_output_buf(nullptr) {}
40
41Rdb_index_merge::~Rdb_index_merge() {
42 /*
43 If merge_tmp_file_removal_delay is set, sleep between calls to chsize.
44
45 This helps mitigate potential trim stalls on flash when large files are
46 being deleted too quickly.
47 */
48 if (m_merge_tmp_file_removal_delay > 0) {
49 uint64 curr_size = m_merge_buf_size * m_merge_file.m_num_sort_buffers;
50 for (uint i = 0; i < m_merge_file.m_num_sort_buffers; i++) {
51 if (my_chsize(m_merge_file.m_fd, curr_size, 0, MYF(MY_WME))) {
52 // NO_LINT_DEBUG
53 sql_print_error("Error truncating file during fast index creation.");
54 }
55
56 my_sleep(m_merge_tmp_file_removal_delay * 1000);
57 curr_size -= m_merge_buf_size;
58 }
59 }
60
61 /*
62 Close file descriptor, we don't need to worry about deletion,
63 mysql handles it.
64 */
65 my_close(m_merge_file.m_fd, MYF(MY_WME));
66}
67
68int Rdb_index_merge::init() {
69 /*
70 Create a temporary merge file on disk to store sorted chunks during
71 inplace index creation.
72 */
73 if (merge_file_create()) {
74 return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
75 }
76
77 /*
78 Then, allocate buffer to store unsorted records before they are written
79 to disk. They will be written to disk sorted. A sorted tree is used to
80 keep track of the offset of each record within the unsorted buffer.
81 */
82 m_rec_buf_unsorted =
83 std::shared_ptr<merge_buf_info>(new merge_buf_info(m_merge_buf_size));
84
85 /*
86 Allocate output buffer that will contain sorted block that is written to
87 disk.
88 */
89 m_output_buf =
90 std::shared_ptr<merge_buf_info>(new merge_buf_info(m_merge_buf_size));
91
92 return HA_EXIT_SUCCESS;
93}
94
95/**
96 Create a merge file in the given location.
97*/
98int Rdb_index_merge::merge_file_create() {
99 DBUG_ASSERT(m_merge_file.m_fd == -1);
100
101 int fd;
102#ifdef MARIAROCKS_NOT_YET // mysql_tmpfile_path use
103 /* If no path set for tmpfile, use mysql_tmpdir by default */
104 if (m_tmpfile_path == nullptr) {
105 fd = mysql_tmpfile("myrocks");
106 } else {
107 fd = mysql_tmpfile_path(m_tmpfile_path, "myrocks");
108 }
109#else
110 fd = mysql_tmpfile("myrocks");
111#endif
112 if (fd < 0) {
113 // NO_LINT_DEBUG
114 sql_print_error("Failed to create temp file during fast index creation.");
115 return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
116 }
117
118 m_merge_file.m_fd = fd;
119 m_merge_file.m_num_sort_buffers = 0;
120
121 return HA_EXIT_SUCCESS;
122}
123
124/**
125 Add record to offset tree (and unsorted merge buffer) in preparation for
126 writing out to disk in sorted chunks.
127
128 If buffer in memory is full, write the buffer out to disk sorted using the
129 offset tree, and clear the tree. (Happens in merge_buf_write)
130*/
131int Rdb_index_merge::add(const rocksdb::Slice &key, const rocksdb::Slice &val) {
132 /* Adding a record after heap is already created results in error */
133 DBUG_ASSERT(m_merge_min_heap.empty());
134
135 /*
136 Check if sort buffer is going to be out of space, if so write it
137 out to disk in sorted order using offset tree.
138 */
139 const uint total_offset = RDB_MERGE_CHUNK_LEN +
140 m_rec_buf_unsorted->m_curr_offset +
141 RDB_MERGE_KEY_DELIMITER + RDB_MERGE_VAL_DELIMITER +
142 key.size() + val.size();
143 if (total_offset >= m_rec_buf_unsorted->m_total_size) {
144 /*
145 If the offset tree is empty here, that means that the proposed key to
146 add is too large for the buffer.
147 */
148 if (m_offset_tree.empty()) {
149 // NO_LINT_DEBUG
150 sql_print_error("Sort buffer size is too small to process merge. "
151 "Please set merge buffer size to a higher value.");
152 return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
153 }
154
155 if (merge_buf_write()) {
156 // NO_LINT_DEBUG
157 sql_print_error("Error writing sort buffer to disk.");
158 return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
159 }
160 }
161
162 const ulonglong rec_offset = m_rec_buf_unsorted->m_curr_offset;
163
164 /*
165 Store key and value in temporary unsorted in memory buffer pointed to by
166 offset tree.
167 */
168 m_rec_buf_unsorted->store_key_value(key, val);
169
170 /* Find sort order of the new record */
171 auto res =
172 m_offset_tree.emplace(m_rec_buf_unsorted->m_block.get() + rec_offset,
173 m_cf_handle->GetComparator());
174 if (!res.second) {
175 my_printf_error(ER_DUP_ENTRY,
176 "Failed to insert the record: the key already exists",
177 MYF(0));
178 return ER_DUP_ENTRY;
179 }
180
181 return HA_EXIT_SUCCESS;
182}
183
/**
  Sort + write merge buffer chunk out to disk.

  Copies the records referenced by m_offset_tree (already ordered by the
  column family comparator) from the unsorted buffer into m_output_buf,
  writes that buffer to disk as one chunk, and resets state for the next
  batch of records.

  @return HA_EXIT_SUCCESS on success, HA_ERR_ROCKSDB_MERGE_FILE_ERR on
          seek or write failure.
*/
int Rdb_index_merge::merge_buf_write() {
  DBUG_ASSERT(m_merge_file.m_fd != -1);
  DBUG_ASSERT(m_rec_buf_unsorted != nullptr);
  DBUG_ASSERT(m_output_buf != nullptr);
  DBUG_ASSERT(!m_offset_tree.empty());

  /* Write actual chunk size to first 8 bytes of the merge buffer */
  merge_store_uint64(m_output_buf->m_block.get(),
                     m_rec_buf_unsorted->m_curr_offset + RDB_MERGE_CHUNK_LEN);
  m_output_buf->m_curr_offset += RDB_MERGE_CHUNK_LEN;

  /*
    Iterate through the offset tree. Should be ordered by the secondary key
    at this point.
  */
  for (const auto &rec : m_offset_tree) {
    DBUG_ASSERT(m_output_buf->m_curr_offset <= m_merge_buf_size);

    /* Read record from offset (should never fail) */
    rocksdb::Slice key;
    rocksdb::Slice val;
    merge_read_rec(rec.m_block, &key, &val);

    /* Store key and value into sorted output buffer */
    m_output_buf->store_key_value(key, val);
  }

  DBUG_ASSERT(m_output_buf->m_curr_offset <= m_output_buf->m_total_size);

  /*
    Write output buffer to disk.

    Need to position cursor to the chunk it needs to be at on filesystem
    then write into the respective merge buffer.
  */
  if (my_seek(m_merge_file.m_fd,
              m_merge_file.m_num_sort_buffers * m_merge_buf_size, SEEK_SET,
              MYF(0)) == MY_FILEPOS_ERROR) {
    // NO_LINT_DEBUG
    sql_print_error("Error seeking to location in merge file on disk.");
    return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
  }

  /*
    Add a file sync call here to flush the data out. Otherwise, the filesystem
    cache can flush out all of the files at the same time, causing a write
    burst.

    Note: the full m_total_size bytes are written (not just m_curr_offset),
    so every chunk on disk occupies a fixed m_merge_buf_size slot — this
    matches the n * m_merge_buf_size seek above and in merge_heap_prepare().
  */
  if (my_write(m_merge_file.m_fd, m_output_buf->m_block.get(),
               m_output_buf->m_total_size, MYF(MY_WME | MY_NABP)) ||
      mysql_file_sync(m_merge_file.m_fd, MYF(MY_WME))) {
    // NO_LINT_DEBUG
    sql_print_error("Error writing sorted merge buffer to disk.");
    return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
  }

  /* Increment merge file offset to track number of merge buffers written */
  m_merge_file.m_num_sort_buffers += 1;

  /* Reset everything for next run */
  merge_reset();

  return HA_EXIT_SUCCESS;
}
251
252/**
253 Prepare n-way merge of n sorted buffers on disk, using a heap sorted by
254 secondary key records.
255*/
256int Rdb_index_merge::merge_heap_prepare() {
257 DBUG_ASSERT(m_merge_min_heap.empty());
258
259 /*
260 If the offset tree is not empty, there are still some records that need to
261 be written to disk. Write them out now.
262 */
263 if (!m_offset_tree.empty() && merge_buf_write()) {
264 return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
265 }
266
267 DBUG_ASSERT(m_merge_file.m_num_sort_buffers > 0);
268
269 /*
270 For an n-way merge, we need to read chunks of each merge file
271 simultaneously.
272 */
273 ulonglong chunk_size =
274 m_merge_combine_read_size / m_merge_file.m_num_sort_buffers;
275 if (chunk_size >= m_merge_buf_size) {
276 chunk_size = m_merge_buf_size;
277 }
278
279 /* Allocate buffers for each chunk */
280 for (ulonglong i = 0; i < m_merge_file.m_num_sort_buffers; i++) {
281 const auto entry =
282 std::make_shared<merge_heap_entry>(m_cf_handle->GetComparator());
283
284 /*
285 Read chunk_size bytes from each chunk on disk, and place inside
286 respective chunk buffer.
287 */
288 const size_t total_size =
289 entry->prepare(m_merge_file.m_fd, i * m_merge_buf_size, chunk_size);
290
291 if (total_size == (size_t)-1) {
292 return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
293 }
294
295 /* Can reach this condition if an index was added on table w/ no rows */
296 if (total_size - RDB_MERGE_CHUNK_LEN == 0) {
297 break;
298 }
299
300 /* Read the first record from each buffer to initially populate the heap */
301 if (entry->read_rec(&entry->m_key, &entry->m_val)) {
302 // NO_LINT_DEBUG
303 sql_print_error("Chunk size is too small to process merge.");
304 return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
305 }
306
307 m_merge_min_heap.push(std::move(entry));
308 }
309
310 return HA_EXIT_SUCCESS;
311}
312
/**
  Create and/or iterate through keys in the merge heap.

  @param[out] key  next key in merge order
  @param[out] val  value paired with that key

  @return HA_EXIT_SUCCESS when a record was produced, -1 when there is
          nothing (more) to return, or an error code from heap preparation.
*/
int Rdb_index_merge::next(rocksdb::Slice *const key,
                          rocksdb::Slice *const val) {
  /*
    If table fits in one sort buffer, we can optimize by writing
    the sort buffer directly through to the sstfilewriter instead of
    needing to create tmp files/heap to merge the sort buffers.

    If there are no sort buffer records (alters on empty tables),
    also exit here.
  */
  if (m_merge_file.m_num_sort_buffers == 0) {
    if (m_offset_tree.empty()) {
      return -1;
    }

    /* Smallest record is the first entry of the comparator-ordered tree. */
    const auto rec = m_offset_tree.begin();

    /* Read record from offset */
    merge_read_rec(rec->m_block, key, val);

    /* Consume the record so the next call yields the next-smallest one. */
    m_offset_tree.erase(rec);
    return HA_EXIT_SUCCESS;
  }

  int res;

  /*
    If heap and heap chunk info are empty, we must be beginning the merge phase
    of the external sort. Populate the heap with initial values from each
    disk chunk.
  */
  if (m_merge_min_heap.empty()) {
    if ((res = merge_heap_prepare())) {
      // NO_LINT_DEBUG
      sql_print_error("Error during preparation of heap.");
      return res;
    }

    /*
      Return the first top record without popping, as we haven't put this
      inside the SST file yet.
    */
    merge_heap_top(key, val);
    return HA_EXIT_SUCCESS;
  }

  DBUG_ASSERT(!m_merge_min_heap.empty());
  return merge_heap_pop_and_get_next(key, val);
}
365
366/**
367 Get current top record from the heap.
368*/
369void Rdb_index_merge::merge_heap_top(rocksdb::Slice *const key,
370 rocksdb::Slice *const val) {
371 DBUG_ASSERT(!m_merge_min_heap.empty());
372
373 const std::shared_ptr<merge_heap_entry> &entry = m_merge_min_heap.top();
374 *key = entry->m_key;
375 *val = entry->m_val;
376}
377
/**
  Pops the top record, and uses it to read next record from the
  corresponding sort buffer and push onto the heap.

  Returns -1 when there are no more records in the heap.
*/
int Rdb_index_merge::merge_heap_pop_and_get_next(rocksdb::Slice *const key,
                                                 rocksdb::Slice *const val) {
  /*
    Make a new reference to shared ptr so it doesn't get destroyed
    during pop(). We are going to push this entry back onto the heap.
  */
  const std::shared_ptr<merge_heap_entry> entry = m_merge_min_heap.top();
  m_merge_min_heap.pop();

  /*
    We are finished w/ current chunk if:
    current_offset + disk_offset == m_total_size

    Return without adding entry back onto heap.
    If heap is also empty, we must be finished with merge.
  */
  if (entry->m_chunk_info->is_chunk_finished()) {
    if (m_merge_min_heap.empty()) {
      return -1;
    }

    merge_heap_top(key, val);
    return HA_EXIT_SUCCESS;
  }

  /*
    Make sure we haven't reached the end of the chunk.
  */
  DBUG_ASSERT(!entry->m_chunk_info->is_chunk_finished());

  /*
    If merge_read_rec fails, it means either the chunk was cut off
    or we've reached the end of the respective chunk. Refill the in-memory
    chunk buffer from disk and retry the read once.
  */
  if (entry->read_rec(&entry->m_key, &entry->m_val)) {
    if (entry->read_next_chunk_from_disk(m_merge_file.m_fd)) {
      return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
    }

    /* Try reading record again, should never fail. */
    if (entry->read_rec(&entry->m_key, &entry->m_val)) {
      return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
    }
  }

  /* Push entry back on to the heap w/ updated buffer + offset ptr */
  m_merge_min_heap.push(std::move(entry));

  /* Return the current top record on heap */
  merge_heap_top(key, val);
  return HA_EXIT_SUCCESS;
}
436
437int Rdb_index_merge::merge_heap_entry::read_next_chunk_from_disk(File fd) {
438 if (m_chunk_info->read_next_chunk_from_disk(fd)) {
439 return HA_EXIT_FAILURE;
440 }
441
442 m_block = m_chunk_info->m_block.get();
443 return HA_EXIT_SUCCESS;
444}
445
446int Rdb_index_merge::merge_buf_info::read_next_chunk_from_disk(File fd) {
447 m_disk_curr_offset += m_curr_offset;
448
449 if (my_seek(fd, m_disk_curr_offset, SEEK_SET, MYF(0)) == MY_FILEPOS_ERROR) {
450 // NO_LINT_DEBUG
451 sql_print_error("Error seeking to location in merge file on disk.");
452 return HA_EXIT_FAILURE;
453 }
454
455 /* Overwrite the old block */
456 const size_t bytes_read =
457 my_read(fd, m_block.get(), m_block_len, MYF(MY_WME));
458 if (bytes_read == (size_t)-1) {
459 // NO_LINT_DEBUG
460 sql_print_error("Error reading merge file from disk.");
461 return HA_EXIT_FAILURE;
462 }
463
464 m_curr_offset = 0;
465 return HA_EXIT_SUCCESS;
466}
467
468/**
469 Get records from offset within sort buffer and compare them.
470 Sort by least to greatest.
471*/
472int Rdb_index_merge::merge_record_compare(
473 const uchar *const a_block, const uchar *const b_block,
474 const rocksdb::Comparator *const comparator) {
475 return comparator->Compare(as_slice(a_block), as_slice(b_block));
476}
477
/**
  Given an offset in a merge sort buffer, read out the keys + values.

  Note: `block` is passed by value, so the caller's pointer is not
  advanced; the record layout is [key-len][key][val-len][val].
**/
void Rdb_index_merge::merge_read_rec(const uchar *const block,
                                     rocksdb::Slice *const key,
                                     rocksdb::Slice *const val) {
  /* Read key at block offset into key slice and the value into value slice*/
  read_slice(key, block);
  read_slice(val, block + RDB_MERGE_REC_DELIMITER + key->size());
}
489
490void Rdb_index_merge::read_slice(rocksdb::Slice *slice,
491 const uchar *block_ptr) {
492 uint64 slice_len;
493 merge_read_uint64(&block_ptr, &slice_len);
494
495 *slice = rocksdb::Slice(reinterpret_cast<const char *>(block_ptr), slice_len);
496}
497
/**
  Read the next key/value pair from this entry's current chunk position.

  On success both the chunk's m_curr_offset and m_block are advanced past
  the whole record. If the value does not fit in the remaining chunk bytes,
  any partial advancement from the key read is rolled back so the caller
  can refill the chunk buffer and retry.

  @return HA_EXIT_SUCCESS on success, HA_EXIT_FAILURE if the chunk buffer
          is exhausted mid-record.
*/
int Rdb_index_merge::merge_heap_entry::read_rec(rocksdb::Slice *const key,
                                                rocksdb::Slice *const val) {
  const uchar *block_ptr = m_block;
  /* Saved so a failed value read can restore the pre-call state. */
  const auto orig_offset = m_chunk_info->m_curr_offset;
  const auto orig_block = m_block;

  /* Read key at block offset into key slice and the value into value slice*/
  if (read_slice(key, &block_ptr) != 0) {
    return HA_EXIT_FAILURE;
  }

  /* Advance offset and cursor by however many bytes the key consumed. */
  m_chunk_info->m_curr_offset += (uintptr_t)block_ptr - (uintptr_t)m_block;
  m_block += (uintptr_t)block_ptr - (uintptr_t)m_block;

  if (read_slice(val, &block_ptr) != 0) {
    /* Value truncated by chunk boundary: undo the key's advancement. */
    m_chunk_info->m_curr_offset = orig_offset;
    m_block = orig_block;
    return HA_EXIT_FAILURE;
  }

  /* Advance past the value (m_block == block_ptr's pre-value position). */
  m_chunk_info->m_curr_offset += (uintptr_t)block_ptr - (uintptr_t)m_block;
  m_block += (uintptr_t)block_ptr - (uintptr_t)m_block;

  return HA_EXIT_SUCCESS;
}
523
/**
  Read one length-prefixed slice from *block_ptr, bounds-checked against
  the space remaining in the chunk.

  Advances *block_ptr past both the length delimiter and the payload. Does
  NOT update the chunk's m_curr_offset; the caller (read_rec) does that.

  @return HA_EXIT_SUCCESS on success, HA_EXIT_FAILURE when the chunk does
          not hold a complete delimiter + payload.
*/
int Rdb_index_merge::merge_heap_entry::read_slice(rocksdb::Slice *const slice,
                                                  const uchar **block_ptr) {
  /* Need at least the length delimiter itself... */
  if (!m_chunk_info->has_space(RDB_MERGE_REC_DELIMITER)) {
    return HA_EXIT_FAILURE;
  }

  uint64 slice_len;
  merge_read_uint64(block_ptr, &slice_len);
  /* ...and the payload must also fit within the remaining chunk bytes. */
  if (!m_chunk_info->has_space(RDB_MERGE_REC_DELIMITER + slice_len)) {
    return HA_EXIT_FAILURE;
  }

  *slice =
      rocksdb::Slice(reinterpret_cast<const char *>(*block_ptr), slice_len);
  *block_ptr += slice_len;
  return HA_EXIT_SUCCESS;
}
541
542size_t Rdb_index_merge::merge_heap_entry::prepare(File fd, ulonglong f_offset,
543 ulonglong chunk_size) {
544 m_chunk_info = std::make_shared<merge_buf_info>(chunk_size);
545 const size_t res = m_chunk_info->prepare(fd, f_offset);
546 if (res != (size_t)-1) {
547 m_block = m_chunk_info->m_block.get() + RDB_MERGE_CHUNK_LEN;
548 }
549
550 return res;
551}
552
/**
  Position this buffer at its chunk's start in the merge file and fill the
  in-memory block with up to m_total_size bytes from disk.

  After the read, m_total_size is OVERWRITTEN with the actual chunk length
  stored in the chunk's first 8 bytes, and m_curr_offset is advanced past
  that header.

  @return the chunk's actual size, or (size_t)-1 on seek/read failure.
*/
size_t Rdb_index_merge::merge_buf_info::prepare(File fd, ulonglong f_offset) {
  m_disk_start_offset = f_offset;
  m_disk_curr_offset = f_offset;

  /*
    Need to position cursor to the chunk it needs to be at on filesystem
    then read 'chunk_size' bytes into the respective chunk buffer.
  */
  if (my_seek(fd, f_offset, SEEK_SET, MYF(0)) == MY_FILEPOS_ERROR) {
    // NO_LINT_DEBUG
    sql_print_error("Error seeking to location in merge file on disk.");
    return (size_t)-1;
  }

  const size_t bytes_read =
      my_read(fd, m_block.get(), m_total_size, MYF(MY_WME));
  if (bytes_read == (size_t)-1) {
    // NO_LINT_DEBUG
    sql_print_error("Error reading merge file from disk.");
    return (size_t)-1;
  }

  /*
    Read the first 8 bytes of each chunk, this gives us the actual
    size of each chunk.
  */
  const uchar *block_ptr = m_block.get();
  merge_read_uint64(&block_ptr, &m_total_size);
  m_curr_offset += RDB_MERGE_CHUNK_LEN;
  return m_total_size;
}
584
585/* Store key and value w/ their respective delimiters at the given offset */
586void Rdb_index_merge::merge_buf_info::store_key_value(
587 const rocksdb::Slice &key, const rocksdb::Slice &val) {
588 store_slice(key);
589 store_slice(val);
590}
591
592void Rdb_index_merge::merge_buf_info::store_slice(const rocksdb::Slice &slice) {
593 /* Store length delimiter */
594 merge_store_uint64(&m_block[m_curr_offset], slice.size());
595
596 /* Store slice data */
597 memcpy(&m_block[m_curr_offset + RDB_MERGE_REC_DELIMITER], slice.data(),
598 slice.size());
599
600 m_curr_offset += slice.size() + RDB_MERGE_REC_DELIMITER;
601}
602
603void Rdb_index_merge::merge_reset() {
604 /*
605 Either error, or all values in the sort buffer have been written to disk,
606 so we need to clear the offset tree.
607 */
608 m_offset_tree.clear();
609
610 /* Reset sort buffer block */
611 if (m_rec_buf_unsorted && m_rec_buf_unsorted->m_block) {
612 m_rec_buf_unsorted->m_curr_offset = 0;
613 }
614
615 /* Reset output buf */
616 if (m_output_buf && m_output_buf->m_block) {
617 m_output_buf->m_curr_offset = 0;
618 }
619}
620
621} // namespace myrocks
622