| 1 | /* | 
| 2 |    Copyright (c) 2016, Facebook, Inc. | 
| 3 |  | 
| 4 |    This program is free software; you can redistribute it and/or modify | 
| 5 |    it under the terms of the GNU General Public License as published by | 
| 6 |    the Free Software Foundation; version 2 of the License. | 
| 7 |  | 
| 8 |    This program is distributed in the hope that it will be useful, | 
| 9 |    but WITHOUT ANY WARRANTY; without even the implied warranty of | 
| 10 |    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | 
| 11 |    GNU General Public License for more details. | 
| 12 |  | 
| 13 |    You should have received a copy of the GNU General Public License | 
| 14 |    along with this program; if not, write to the Free Software | 
| 15 |    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */ | 
| 16 |  | 
| 17 | #include <my_global.h> | 
| 18 |  | 
| 19 | /* This C++ file's header file */ | 
| 20 | #include "./rdb_index_merge.h" | 
| 21 |  | 
| 22 | /* MySQL header files */ | 
| 23 | #include "../sql/sql_class.h" | 
| 24 |  | 
| 25 | /* MyRocks header files */ | 
| 26 | #include "./ha_rocksdb.h" | 
| 27 | #include "./rdb_datadic.h" | 
| 28 |  | 
| 29 | namespace myrocks { | 
| 30 |  | 
/*
  Construct a merge context for fast secondary index creation.

  Only stores the configuration; no file descriptor is opened and no
  buffers are allocated here. init() must be called before use (it creates
  the temp merge file and allocates m_rec_buf_unsorted / m_output_buf,
  which start out as nullptr).
*/
Rdb_index_merge::Rdb_index_merge(const char *const tmpfile_path,
                                 const ulonglong &merge_buf_size,
                                 const ulonglong &merge_combine_read_size,
                                 const ulonglong &merge_tmp_file_removal_delay,
                                 rocksdb::ColumnFamilyHandle *cf)
    : m_tmpfile_path(tmpfile_path), m_merge_buf_size(merge_buf_size),
      m_merge_combine_read_size(merge_combine_read_size),
      m_merge_tmp_file_removal_delay(merge_tmp_file_removal_delay),
      m_cf_handle(cf), m_rec_buf_unsorted(nullptr), m_output_buf(nullptr) {}
| 40 |  | 
| 41 | Rdb_index_merge::~Rdb_index_merge() { | 
| 42 |   /* | 
| 43 |     If merge_tmp_file_removal_delay is set, sleep between calls to chsize. | 
| 44 |  | 
| 45 |     This helps mitigate potential trim stalls on flash when large files are | 
| 46 |     being deleted too quickly. | 
| 47 |   */ | 
| 48 |   if (m_merge_tmp_file_removal_delay > 0) { | 
| 49 |     uint64 curr_size = m_merge_buf_size * m_merge_file.m_num_sort_buffers; | 
| 50 |     for (uint i = 0; i < m_merge_file.m_num_sort_buffers; i++) { | 
| 51 |       if (my_chsize(m_merge_file.m_fd, curr_size, 0, MYF(MY_WME))) { | 
| 52 |         // NO_LINT_DEBUG | 
| 53 |         sql_print_error("Error truncating file during fast index creation." ); | 
| 54 |       } | 
| 55 |  | 
| 56 |       my_sleep(m_merge_tmp_file_removal_delay * 1000); | 
| 57 |       curr_size -= m_merge_buf_size; | 
| 58 |     } | 
| 59 |   } | 
| 60 |  | 
| 61 |   /* | 
| 62 |     Close file descriptor, we don't need to worry about deletion, | 
| 63 |     mysql handles it. | 
| 64 |   */ | 
| 65 |   my_close(m_merge_file.m_fd, MYF(MY_WME)); | 
| 66 | } | 
| 67 |  | 
| 68 | int Rdb_index_merge::init() { | 
| 69 |   /* | 
| 70 |     Create a temporary merge file on disk to store sorted chunks during | 
| 71 |     inplace index creation. | 
| 72 |   */ | 
| 73 |   if (merge_file_create()) { | 
| 74 |     return HA_ERR_ROCKSDB_MERGE_FILE_ERR; | 
| 75 |   } | 
| 76 |  | 
| 77 |   /* | 
| 78 |     Then, allocate buffer to store unsorted records before they are written | 
| 79 |     to disk. They will be written to disk sorted. A sorted tree is used to | 
| 80 |     keep track of the offset of each record within the unsorted buffer. | 
| 81 |   */ | 
| 82 |   m_rec_buf_unsorted = | 
| 83 |       std::shared_ptr<merge_buf_info>(new merge_buf_info(m_merge_buf_size)); | 
| 84 |  | 
| 85 |   /* | 
| 86 |     Allocate output buffer that will contain sorted block that is written to | 
| 87 |     disk. | 
| 88 |   */ | 
| 89 |   m_output_buf = | 
| 90 |       std::shared_ptr<merge_buf_info>(new merge_buf_info(m_merge_buf_size)); | 
| 91 |  | 
| 92 |   return HA_EXIT_SUCCESS; | 
| 93 | } | 
| 94 |  | 
| 95 | /** | 
| 96 |   Create a merge file in the given location. | 
| 97 | */ | 
| 98 | int Rdb_index_merge::merge_file_create() { | 
| 99 |   DBUG_ASSERT(m_merge_file.m_fd == -1); | 
| 100 |  | 
| 101 |   int fd; | 
| 102 | #ifdef MARIAROCKS_NOT_YET // mysql_tmpfile_path use  | 
| 103 |   /* If no path set for tmpfile, use mysql_tmpdir by default */ | 
| 104 |   if (m_tmpfile_path == nullptr) { | 
| 105 |     fd = mysql_tmpfile("myrocks" ); | 
| 106 |   } else { | 
| 107 |     fd = mysql_tmpfile_path(m_tmpfile_path, "myrocks" ); | 
| 108 |   } | 
| 109 | #else | 
| 110 |   fd = mysql_tmpfile("myrocks" ); | 
| 111 | #endif | 
| 112 |   if (fd < 0) { | 
| 113 |     // NO_LINT_DEBUG | 
| 114 |     sql_print_error("Failed to create temp file during fast index creation." ); | 
| 115 |     return HA_ERR_ROCKSDB_MERGE_FILE_ERR; | 
| 116 |   } | 
| 117 |  | 
| 118 |   m_merge_file.m_fd = fd; | 
| 119 |   m_merge_file.m_num_sort_buffers = 0; | 
| 120 |  | 
| 121 |   return HA_EXIT_SUCCESS; | 
| 122 | } | 
| 123 |  | 
| 124 | /** | 
| 125 |   Add record to offset tree (and unsorted merge buffer) in preparation for | 
| 126 |   writing out to disk in sorted chunks. | 
| 127 |  | 
| 128 |   If buffer in memory is full, write the buffer out to disk sorted using the | 
| 129 |   offset tree, and clear the tree. (Happens in merge_buf_write) | 
| 130 | */ | 
| 131 | int Rdb_index_merge::add(const rocksdb::Slice &key, const rocksdb::Slice &val) { | 
| 132 |   /* Adding a record after heap is already created results in error */ | 
| 133 |   DBUG_ASSERT(m_merge_min_heap.empty()); | 
| 134 |  | 
| 135 |   /* | 
| 136 |     Check if sort buffer is going to be out of space, if so write it | 
| 137 |     out to disk in sorted order using offset tree. | 
| 138 |   */ | 
| 139 |   const uint total_offset = RDB_MERGE_CHUNK_LEN + | 
| 140 |                             m_rec_buf_unsorted->m_curr_offset + | 
| 141 |                             RDB_MERGE_KEY_DELIMITER + RDB_MERGE_VAL_DELIMITER + | 
| 142 |                             key.size() + val.size(); | 
| 143 |   if (total_offset >= m_rec_buf_unsorted->m_total_size) { | 
| 144 |     /* | 
| 145 |       If the offset tree is empty here, that means that the proposed key to | 
| 146 |       add is too large for the buffer. | 
| 147 |     */ | 
| 148 |     if (m_offset_tree.empty()) { | 
| 149 |       // NO_LINT_DEBUG | 
| 150 |       sql_print_error("Sort buffer size is too small to process merge. "  | 
| 151 |                       "Please set merge buffer size to a higher value." ); | 
| 152 |       return HA_ERR_ROCKSDB_MERGE_FILE_ERR; | 
| 153 |     } | 
| 154 |  | 
| 155 |     if (merge_buf_write()) { | 
| 156 |       // NO_LINT_DEBUG | 
| 157 |       sql_print_error("Error writing sort buffer to disk." ); | 
| 158 |       return HA_ERR_ROCKSDB_MERGE_FILE_ERR; | 
| 159 |     } | 
| 160 |   } | 
| 161 |  | 
| 162 |   const ulonglong rec_offset = m_rec_buf_unsorted->m_curr_offset; | 
| 163 |  | 
| 164 |   /* | 
| 165 |     Store key and value in temporary unsorted in memory buffer pointed to by | 
| 166 |     offset tree. | 
| 167 |   */ | 
| 168 |   m_rec_buf_unsorted->store_key_value(key, val); | 
| 169 |  | 
| 170 |   /* Find sort order of the new record */ | 
| 171 |   auto res = | 
| 172 |       m_offset_tree.emplace(m_rec_buf_unsorted->m_block.get() + rec_offset, | 
| 173 |                             m_cf_handle->GetComparator()); | 
| 174 |   if (!res.second) { | 
| 175 |     my_printf_error(ER_DUP_ENTRY, | 
| 176 |                     "Failed to insert the record: the key already exists" , | 
| 177 |                     MYF(0)); | 
| 178 |     return ER_DUP_ENTRY; | 
| 179 |   } | 
| 180 |  | 
| 181 |   return HA_EXIT_SUCCESS; | 
| 182 | } | 
| 183 |  | 
| 184 | /** | 
| 185 |   Sort + write merge buffer chunk out to disk. | 
| 186 | */ | 
| 187 | int Rdb_index_merge::merge_buf_write() { | 
| 188 |   DBUG_ASSERT(m_merge_file.m_fd != -1); | 
| 189 |   DBUG_ASSERT(m_rec_buf_unsorted != nullptr); | 
| 190 |   DBUG_ASSERT(m_output_buf != nullptr); | 
| 191 |   DBUG_ASSERT(!m_offset_tree.empty()); | 
| 192 |  | 
| 193 |   /* Write actual chunk size to first 8 bytes of the merge buffer */ | 
| 194 |   merge_store_uint64(m_output_buf->m_block.get(), | 
| 195 |                      m_rec_buf_unsorted->m_curr_offset + RDB_MERGE_CHUNK_LEN); | 
| 196 |   m_output_buf->m_curr_offset += RDB_MERGE_CHUNK_LEN; | 
| 197 |  | 
| 198 |   /* | 
| 199 |     Iterate through the offset tree.  Should be ordered by the secondary key | 
| 200 |     at this point. | 
| 201 |   */ | 
| 202 |   for (const auto &rec : m_offset_tree) { | 
| 203 |     DBUG_ASSERT(m_output_buf->m_curr_offset <= m_merge_buf_size); | 
| 204 |  | 
| 205 |     /* Read record from offset (should never fail) */ | 
| 206 |     rocksdb::Slice key; | 
| 207 |     rocksdb::Slice val; | 
| 208 |     merge_read_rec(rec.m_block, &key, &val); | 
| 209 |  | 
| 210 |     /* Store key and value into sorted output buffer */ | 
| 211 |     m_output_buf->store_key_value(key, val); | 
| 212 |   } | 
| 213 |  | 
| 214 |   DBUG_ASSERT(m_output_buf->m_curr_offset <= m_output_buf->m_total_size); | 
| 215 |  | 
| 216 |   /* | 
| 217 |     Write output buffer to disk. | 
| 218 |  | 
| 219 |     Need to position cursor to the chunk it needs to be at on filesystem | 
| 220 |     then write into the respective merge buffer. | 
| 221 |   */ | 
| 222 |   if (my_seek(m_merge_file.m_fd, | 
| 223 |               m_merge_file.m_num_sort_buffers * m_merge_buf_size, SEEK_SET, | 
| 224 |               MYF(0)) == MY_FILEPOS_ERROR) { | 
| 225 |     // NO_LINT_DEBUG | 
| 226 |     sql_print_error("Error seeking to location in merge file on disk." ); | 
| 227 |     return HA_ERR_ROCKSDB_MERGE_FILE_ERR; | 
| 228 |   } | 
| 229 |  | 
| 230 |   /* | 
| 231 |     Add a file sync call here to flush the data out. Otherwise, the filesystem | 
| 232 |     cache can flush out all of the files at the same time, causing a write | 
| 233 |     burst. | 
| 234 |   */ | 
| 235 |   if (my_write(m_merge_file.m_fd, m_output_buf->m_block.get(), | 
| 236 |                m_output_buf->m_total_size, MYF(MY_WME | MY_NABP)) || | 
| 237 |       mysql_file_sync(m_merge_file.m_fd, MYF(MY_WME))) { | 
| 238 |     // NO_LINT_DEBUG | 
| 239 |     sql_print_error("Error writing sorted merge buffer to disk." ); | 
| 240 |     return HA_ERR_ROCKSDB_MERGE_FILE_ERR; | 
| 241 |   } | 
| 242 |  | 
| 243 |   /* Increment merge file offset to track number of merge buffers written */ | 
| 244 |   m_merge_file.m_num_sort_buffers += 1; | 
| 245 |  | 
| 246 |   /* Reset everything for next run */ | 
| 247 |   merge_reset(); | 
| 248 |  | 
| 249 |   return HA_EXIT_SUCCESS; | 
| 250 | } | 
| 251 |  | 
| 252 | /** | 
| 253 |   Prepare n-way merge of n sorted buffers on disk, using a heap sorted by | 
| 254 |   secondary key records. | 
| 255 | */ | 
| 256 | int Rdb_index_merge::merge_heap_prepare() { | 
| 257 |   DBUG_ASSERT(m_merge_min_heap.empty()); | 
| 258 |  | 
| 259 |   /* | 
| 260 |     If the offset tree is not empty, there are still some records that need to | 
| 261 |     be written to disk. Write them out now. | 
| 262 |   */ | 
| 263 |   if (!m_offset_tree.empty() && merge_buf_write()) { | 
| 264 |     return HA_ERR_ROCKSDB_MERGE_FILE_ERR; | 
| 265 |   } | 
| 266 |  | 
| 267 |   DBUG_ASSERT(m_merge_file.m_num_sort_buffers > 0); | 
| 268 |  | 
| 269 |   /* | 
| 270 |     For an n-way merge, we need to read chunks of each merge file | 
| 271 |     simultaneously. | 
| 272 |   */ | 
| 273 |   ulonglong chunk_size = | 
| 274 |       m_merge_combine_read_size / m_merge_file.m_num_sort_buffers; | 
| 275 |   if (chunk_size >= m_merge_buf_size) { | 
| 276 |     chunk_size = m_merge_buf_size; | 
| 277 |   } | 
| 278 |  | 
| 279 |   /* Allocate buffers for each chunk */ | 
| 280 |   for (ulonglong i = 0; i < m_merge_file.m_num_sort_buffers; i++) { | 
| 281 |     const auto entry = | 
| 282 |         std::make_shared<merge_heap_entry>(m_cf_handle->GetComparator()); | 
| 283 |  | 
| 284 |     /* | 
| 285 |       Read chunk_size bytes from each chunk on disk, and place inside | 
| 286 |       respective chunk buffer. | 
| 287 |     */ | 
| 288 |     const size_t total_size = | 
| 289 |         entry->prepare(m_merge_file.m_fd, i * m_merge_buf_size, chunk_size); | 
| 290 |  | 
| 291 |     if (total_size == (size_t)-1) { | 
| 292 |       return HA_ERR_ROCKSDB_MERGE_FILE_ERR; | 
| 293 |     } | 
| 294 |  | 
| 295 |     /* Can reach this condition if an index was added on table w/ no rows */ | 
| 296 |     if (total_size - RDB_MERGE_CHUNK_LEN == 0) { | 
| 297 |       break; | 
| 298 |     } | 
| 299 |  | 
| 300 |     /* Read the first record from each buffer to initially populate the heap */ | 
| 301 |     if (entry->read_rec(&entry->m_key, &entry->m_val)) { | 
| 302 |       // NO_LINT_DEBUG | 
| 303 |       sql_print_error("Chunk size is too small to process merge." ); | 
| 304 |       return HA_ERR_ROCKSDB_MERGE_FILE_ERR; | 
| 305 |     } | 
| 306 |  | 
| 307 |     m_merge_min_heap.push(std::move(entry)); | 
| 308 |   } | 
| 309 |  | 
| 310 |   return HA_EXIT_SUCCESS; | 
| 311 | } | 
| 312 |  | 
| 313 | /** | 
| 314 |   Create and/or iterate through keys in the merge heap. | 
| 315 | */ | 
| 316 | int Rdb_index_merge::next(rocksdb::Slice *const key, | 
| 317 |                           rocksdb::Slice *const val) { | 
| 318 |   /* | 
| 319 |     If table fits in one sort buffer, we can optimize by writing | 
| 320 |     the sort buffer directly through to the sstfilewriter instead of | 
| 321 |     needing to create tmp files/heap to merge the sort buffers. | 
| 322 |  | 
| 323 |     If there are no sort buffer records (alters on empty tables), | 
| 324 |     also exit here. | 
| 325 |   */ | 
| 326 |   if (m_merge_file.m_num_sort_buffers == 0) { | 
| 327 |     if (m_offset_tree.empty()) { | 
| 328 |       return -1; | 
| 329 |     } | 
| 330 |  | 
| 331 |     const auto rec = m_offset_tree.begin(); | 
| 332 |  | 
| 333 |     /* Read record from offset */ | 
| 334 |     merge_read_rec(rec->m_block, key, val); | 
| 335 |  | 
| 336 |     m_offset_tree.erase(rec); | 
| 337 |     return HA_EXIT_SUCCESS; | 
| 338 |   } | 
| 339 |  | 
| 340 |   int res; | 
| 341 |  | 
| 342 |   /* | 
| 343 |     If heap and heap chunk info are empty, we must be beginning the merge phase | 
| 344 |     of the external sort. Populate the heap with initial values from each | 
| 345 |     disk chunk. | 
| 346 |   */ | 
| 347 |   if (m_merge_min_heap.empty()) { | 
| 348 |     if ((res = merge_heap_prepare())) { | 
| 349 |       // NO_LINT_DEBUG | 
| 350 |       sql_print_error("Error during preparation of heap." ); | 
| 351 |       return res; | 
| 352 |     } | 
| 353 |  | 
| 354 |     /* | 
| 355 |       Return the first top record without popping, as we haven't put this | 
| 356 |       inside the SST file yet. | 
| 357 |     */ | 
| 358 |     merge_heap_top(key, val); | 
| 359 |     return HA_EXIT_SUCCESS; | 
| 360 |   } | 
| 361 |  | 
| 362 |   DBUG_ASSERT(!m_merge_min_heap.empty()); | 
| 363 |   return merge_heap_pop_and_get_next(key, val); | 
| 364 | } | 
| 365 |  | 
| 366 | /** | 
| 367 |   Get current top record from the heap. | 
| 368 | */ | 
| 369 | void Rdb_index_merge::merge_heap_top(rocksdb::Slice *const key, | 
| 370 |                                      rocksdb::Slice *const val) { | 
| 371 |   DBUG_ASSERT(!m_merge_min_heap.empty()); | 
| 372 |  | 
| 373 |   const std::shared_ptr<merge_heap_entry> &entry = m_merge_min_heap.top(); | 
| 374 |   *key = entry->m_key; | 
| 375 |   *val = entry->m_val; | 
| 376 | } | 
| 377 |  | 
| 378 | /** | 
| 379 |   Pops the top record, and uses it to read next record from the | 
| 380 |   corresponding sort buffer and push onto the heap. | 
| 381 |  | 
| 382 |   Returns -1 when there are no more records in the heap. | 
| 383 | */ | 
| 384 | int Rdb_index_merge::merge_heap_pop_and_get_next(rocksdb::Slice *const key, | 
| 385 |                                                  rocksdb::Slice *const val) { | 
| 386 |   /* | 
| 387 |     Make a new reference to shared ptr so it doesn't get destroyed | 
| 388 |     during pop(). We are going to push this entry back onto the heap. | 
| 389 |   */ | 
| 390 |   const std::shared_ptr<merge_heap_entry> entry = m_merge_min_heap.top(); | 
| 391 |   m_merge_min_heap.pop(); | 
| 392 |  | 
| 393 |   /* | 
| 394 |     We are finished w/ current chunk if: | 
| 395 |     current_offset + disk_offset == m_total_size | 
| 396 |  | 
| 397 |     Return without adding entry back onto heap. | 
| 398 |     If heap is also empty, we must be finished with merge. | 
| 399 |   */ | 
| 400 |   if (entry->m_chunk_info->is_chunk_finished()) { | 
| 401 |     if (m_merge_min_heap.empty()) { | 
| 402 |       return -1; | 
| 403 |     } | 
| 404 |  | 
| 405 |     merge_heap_top(key, val); | 
| 406 |     return HA_EXIT_SUCCESS; | 
| 407 |   } | 
| 408 |  | 
| 409 |   /* | 
| 410 |     Make sure we haven't reached the end of the chunk. | 
| 411 |   */ | 
| 412 |   DBUG_ASSERT(!entry->m_chunk_info->is_chunk_finished()); | 
| 413 |  | 
| 414 |   /* | 
| 415 |     If merge_read_rec fails, it means the either the chunk was cut off | 
| 416 |     or we've reached the end of the respective chunk. | 
| 417 |   */ | 
| 418 |   if (entry->read_rec(&entry->m_key, &entry->m_val)) { | 
| 419 |     if (entry->read_next_chunk_from_disk(m_merge_file.m_fd)) { | 
| 420 |       return HA_ERR_ROCKSDB_MERGE_FILE_ERR; | 
| 421 |     } | 
| 422 |  | 
| 423 |     /* Try reading record again, should never fail. */ | 
| 424 |     if (entry->read_rec(&entry->m_key, &entry->m_val)) { | 
| 425 |       return HA_ERR_ROCKSDB_MERGE_FILE_ERR; | 
| 426 |     } | 
| 427 |   } | 
| 428 |  | 
| 429 |   /* Push entry back on to the heap w/ updated buffer + offset ptr */ | 
| 430 |   m_merge_min_heap.push(std::move(entry)); | 
| 431 |  | 
| 432 |   /* Return the current top record on heap */ | 
| 433 |   merge_heap_top(key, val); | 
| 434 |   return HA_EXIT_SUCCESS; | 
| 435 | } | 
| 436 |  | 
| 437 | int Rdb_index_merge::merge_heap_entry::read_next_chunk_from_disk(File fd) { | 
| 438 |   if (m_chunk_info->read_next_chunk_from_disk(fd)) { | 
| 439 |     return HA_EXIT_FAILURE; | 
| 440 |   } | 
| 441 |  | 
| 442 |   m_block = m_chunk_info->m_block.get(); | 
| 443 |   return HA_EXIT_SUCCESS; | 
| 444 | } | 
| 445 |  | 
/*
  Load the next window of this chunk from disk into m_block, advancing the
  on-disk position by however many bytes of the previous window were
  consumed (m_curr_offset).
*/
int Rdb_index_merge::merge_buf_info::read_next_chunk_from_disk(File fd) {
  m_disk_curr_offset += m_curr_offset;

  if (my_seek(fd, m_disk_curr_offset, SEEK_SET, MYF(0)) == MY_FILEPOS_ERROR) {
    // NO_LINT_DEBUG
    sql_print_error("Error seeking to location in merge file on disk." );
    return HA_EXIT_FAILURE;
  }

  /* Overwrite the old block */
  const size_t bytes_read =
      my_read(fd, m_block.get(), m_block_len, MYF(MY_WME));
  if (bytes_read == (size_t)-1) {
    // NO_LINT_DEBUG
    sql_print_error("Error reading merge file from disk." );
    return HA_EXIT_FAILURE;
  }

  /*
    NOTE(review): a short read (bytes_read < m_block_len) is not treated as
    an error here -- presumably the chunk-size bookkeeping prevents reading
    past valid data; confirm against is_chunk_finished()/has_space().
  */
  m_curr_offset = 0;
  return HA_EXIT_SUCCESS;
}
| 467 |  | 
| 468 | /** | 
| 469 |   Get records from offset within sort buffer and compare them. | 
| 470 |   Sort by least to greatest. | 
| 471 | */ | 
| 472 | int Rdb_index_merge::merge_record_compare( | 
| 473 |     const uchar *const a_block, const uchar *const b_block, | 
| 474 |     const rocksdb::Comparator *const comparator) { | 
| 475 |   return comparator->Compare(as_slice(a_block), as_slice(b_block)); | 
| 476 | } | 
| 477 |  | 
| 478 | /** | 
| 479 |   Given an offset in a merge sort buffer, read out the keys + values. | 
| 480 |   After this, block will point to the next record in the buffer. | 
| 481 | **/ | 
| 482 | void Rdb_index_merge::merge_read_rec(const uchar *const block, | 
| 483 |                                      rocksdb::Slice *const key, | 
| 484 |                                      rocksdb::Slice *const val) { | 
| 485 |   /* Read key at block offset into key slice and the value into value slice*/ | 
| 486 |   read_slice(key, block); | 
| 487 |   read_slice(val, block + RDB_MERGE_REC_DELIMITER + key->size()); | 
| 488 | } | 
| 489 |  | 
| 490 | void Rdb_index_merge::read_slice(rocksdb::Slice *slice, | 
| 491 |                                  const uchar *block_ptr) { | 
| 492 |   uint64 slice_len; | 
| 493 |   merge_read_uint64(&block_ptr, &slice_len); | 
| 494 |  | 
| 495 |   *slice = rocksdb::Slice(reinterpret_cast<const char *>(block_ptr), slice_len); | 
| 496 | } | 
| 497 |  | 
/*
  Read the next key/value record from this entry's chunk buffer, advancing
  the chunk offset and block cursor past it.

  Reads are transactional: if the value does not fit in the remaining chunk
  window, the offset/cursor are rolled back to their pre-key state so the
  caller can refill the chunk from disk and re-read the whole record.
*/
int Rdb_index_merge::merge_heap_entry::read_rec(rocksdb::Slice *const key,
                                                rocksdb::Slice *const val) {
  const uchar *block_ptr = m_block;
  /* Snapshot state so a partial (key-only) read can be undone. */
  const auto orig_offset = m_chunk_info->m_curr_offset;
  const auto orig_block = m_block;

  /* Read key at block offset into key slice and the value into value slice*/
  if (read_slice(key, &block_ptr) != 0) {
    return HA_EXIT_FAILURE;
  }

  /* Commit the key: advance by exactly the bytes read_slice consumed. */
  m_chunk_info->m_curr_offset += (uintptr_t)block_ptr - (uintptr_t)m_block;
  m_block += (uintptr_t)block_ptr - (uintptr_t)m_block;

  if (read_slice(val, &block_ptr) != 0) {
    /* Value truncated by the chunk window: roll back to pre-key state. */
    m_chunk_info->m_curr_offset = orig_offset;
    m_block = orig_block;
    return HA_EXIT_FAILURE;
  }

  /* Commit the value the same way. */
  m_chunk_info->m_curr_offset += (uintptr_t)block_ptr - (uintptr_t)m_block;
  m_block += (uintptr_t)block_ptr - (uintptr_t)m_block;

  return HA_EXIT_SUCCESS;
}
| 523 |  | 
/*
  Bounds-checked variant of read_slice for chunk buffers: fails (instead of
  over-reading) when the record extends past the data currently loaded in
  the chunk, advancing *block_ptr past the record on success.
*/
int Rdb_index_merge::merge_heap_entry::read_slice(rocksdb::Slice *const slice,
                                                  const uchar **block_ptr) {
  /* First make sure the length delimiter itself is within the chunk. */
  if (!m_chunk_info->has_space(RDB_MERGE_REC_DELIMITER)) {
    return HA_EXIT_FAILURE;
  }

  uint64 slice_len;
  merge_read_uint64(block_ptr, &slice_len);
  /* ...then that delimiter + payload fit before handing out a pointer. */
  if (!m_chunk_info->has_space(RDB_MERGE_REC_DELIMITER + slice_len)) {
    return HA_EXIT_FAILURE;
  }

  /* Slice aliases the chunk buffer directly; no copy is made. */
  *slice =
      rocksdb::Slice(reinterpret_cast<const char *>(*block_ptr), slice_len);
  *block_ptr += slice_len;
  return HA_EXIT_SUCCESS;
}
| 541 |  | 
| 542 | size_t Rdb_index_merge::merge_heap_entry::prepare(File fd, ulonglong f_offset, | 
| 543 |                                                   ulonglong chunk_size) { | 
| 544 |   m_chunk_info = std::make_shared<merge_buf_info>(chunk_size); | 
| 545 |   const size_t res = m_chunk_info->prepare(fd, f_offset); | 
| 546 |   if (res != (size_t)-1) { | 
| 547 |     m_block = m_chunk_info->m_block.get() + RDB_MERGE_CHUNK_LEN; | 
| 548 |   } | 
| 549 |  | 
| 550 |   return res; | 
| 551 | } | 
| 552 |  | 
/*
  Fill this buffer from the merge file starting at f_offset and decode the
  chunk header.

  Side effect: m_total_size is REPURPOSED here -- it changes from the
  allocated buffer capacity to the actual chunk payload size that
  merge_buf_write() stored in the first 8 bytes.
*/
size_t Rdb_index_merge::merge_buf_info::prepare(File fd, ulonglong f_offset) {
  m_disk_start_offset = f_offset;
  m_disk_curr_offset = f_offset;

  /*
    Need to position cursor to the chunk it needs to be at on filesystem
    then read 'chunk_size' bytes into the respective chunk buffer.
  */
  if (my_seek(fd, f_offset, SEEK_SET, MYF(0)) == MY_FILEPOS_ERROR) {
    // NO_LINT_DEBUG
    sql_print_error("Error seeking to location in merge file on disk." );
    return (size_t)-1;
  }

  /*
    NOTE(review): only a hard read error ((size_t)-1) is detected; a short
    read is accepted -- presumably safe because the embedded chunk size
    below bounds consumption; confirm with has_space()/is_chunk_finished().
  */
  const size_t bytes_read =
      my_read(fd, m_block.get(), m_total_size, MYF(MY_WME));
  if (bytes_read == (size_t)-1) {
    // NO_LINT_DEBUG
    sql_print_error("Error reading merge file from disk." );
    return (size_t)-1;
  }

  /*
    Read the first 8 bytes of each chunk, this gives us the actual
    size of each chunk.
  */
  const uchar *block_ptr = m_block.get();
  merge_read_uint64(&block_ptr, &m_total_size);
  m_curr_offset += RDB_MERGE_CHUNK_LEN;
  return m_total_size;
}
| 584 |  | 
| 585 | /* Store key and value w/ their respective delimiters at the given offset */ | 
| 586 | void Rdb_index_merge::merge_buf_info::store_key_value( | 
| 587 |     const rocksdb::Slice &key, const rocksdb::Slice &val) { | 
| 588 |   store_slice(key); | 
| 589 |   store_slice(val); | 
| 590 | } | 
| 591 |  | 
| 592 | void Rdb_index_merge::merge_buf_info::store_slice(const rocksdb::Slice &slice) { | 
| 593 |   /* Store length delimiter */ | 
| 594 |   merge_store_uint64(&m_block[m_curr_offset], slice.size()); | 
| 595 |  | 
| 596 |   /* Store slice data */ | 
| 597 |   memcpy(&m_block[m_curr_offset + RDB_MERGE_REC_DELIMITER], slice.data(), | 
| 598 |          slice.size()); | 
| 599 |  | 
| 600 |   m_curr_offset += slice.size() + RDB_MERGE_REC_DELIMITER; | 
| 601 | } | 
| 602 |  | 
| 603 | void Rdb_index_merge::merge_reset() { | 
| 604 |   /* | 
| 605 |     Either error, or all values in the sort buffer have been written to disk, | 
| 606 |     so we need to clear the offset tree. | 
| 607 |   */ | 
| 608 |   m_offset_tree.clear(); | 
| 609 |  | 
| 610 |   /* Reset sort buffer block */ | 
| 611 |   if (m_rec_buf_unsorted && m_rec_buf_unsorted->m_block) { | 
| 612 |     m_rec_buf_unsorted->m_curr_offset = 0; | 
| 613 |   } | 
| 614 |  | 
| 615 |   /* Reset output buf */ | 
| 616 |   if (m_output_buf && m_output_buf->m_block) { | 
| 617 |     m_output_buf->m_curr_offset = 0; | 
| 618 |   } | 
| 619 | } | 
| 620 |  | 
| 621 | } // namespace myrocks | 
| 622 |  |