1/*
2 Copyright (c) 2016, Facebook, Inc.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
16
17/* For PRIu64 use below: */
18#define __STDC_FORMAT_MACROS
19
20#include <my_global.h>
21
22/* This C++ file's header file */
23#include "./rdb_sst_info.h"
24
25#include <inttypes.h>
26
27/* C++ standard header files */
28#include <cstdio>
29#include <string>
30#include <utility>
31#include <vector>
32
33/* MySQL header files */
34#include <mysqld_error.h>
35#include "../sql/log.h"
36#include "./my_dir.h"
37
38/* RocksDB header files */
39#include "rocksdb/db.h"
40#include "rocksdb/options.h"
41
42/* MyRocks header files */
43#include "./ha_rocksdb.h"
44#include "./ha_rocksdb_proto.h"
45#include "./rdb_cf_options.h"
46#include "./rdb_psi.h"
47
48namespace myrocks {
49
50Rdb_sst_file_ordered::Rdb_sst_file::Rdb_sst_file(
51 rocksdb::DB *const db, rocksdb::ColumnFamilyHandle *const cf,
52 const rocksdb::DBOptions &db_options, const std::string &name,
53 const bool tracing)
54 : m_db(db), m_cf(cf), m_db_options(db_options), m_sst_file_writer(nullptr),
55 m_name(name), m_tracing(tracing), m_comparator(cf->GetComparator()) {
56 DBUG_ASSERT(db != nullptr);
57 DBUG_ASSERT(cf != nullptr);
58}
59
60Rdb_sst_file_ordered::Rdb_sst_file::~Rdb_sst_file() {
61 // Make sure we clean up
62 delete m_sst_file_writer;
63 m_sst_file_writer = nullptr;
64
65 // In case something went wrong attempt to delete the temporary file.
66 // If everything went fine that file will have been renamed and this
67 // function call will fail.
68 std::remove(m_name.c_str());
69}
70
71rocksdb::Status Rdb_sst_file_ordered::Rdb_sst_file::open() {
72 DBUG_ASSERT(m_sst_file_writer == nullptr);
73
74 rocksdb::ColumnFamilyDescriptor cf_descr;
75
76 rocksdb::Status s = m_cf->GetDescriptor(&cf_descr);
77 if (!s.ok()) {
78 return s;
79 }
80
81 // Create an sst file writer with the current options and comparator
82 const rocksdb::EnvOptions env_options(m_db_options);
83 const rocksdb::Options options(m_db_options, cf_descr.options);
84
85 m_sst_file_writer =
86 new rocksdb::SstFileWriter(env_options, options, m_comparator, m_cf);
87
88 s = m_sst_file_writer->Open(m_name);
89 if (m_tracing) {
90 // NO_LINT_DEBUG
91 sql_print_information("SST Tracing: Open(%s) returned %s", m_name.c_str(),
92 s.ok() ? "ok" : "not ok");
93 }
94
95 if (!s.ok()) {
96 delete m_sst_file_writer;
97 m_sst_file_writer = nullptr;
98 }
99
100 return s;
101}
102
103rocksdb::Status
104Rdb_sst_file_ordered::Rdb_sst_file::put(const rocksdb::Slice &key,
105 const rocksdb::Slice &value) {
106 DBUG_ASSERT(m_sst_file_writer != nullptr);
107
108#ifdef __GNUC__
109 // Add the specified key/value to the sst file writer
110#pragma GCC diagnostic push
111#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
112#endif
113#ifdef _MSC_VER
114#pragma warning (disable : 4996)
115#endif
116 return m_sst_file_writer->Add(key, value);
117}
118
119std::string
120Rdb_sst_file_ordered::Rdb_sst_file::generateKey(const std::string &key) {
121 static char const hexdigit[] = {'0', '1', '2', '3', '4', '5', '6', '7',
122 '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
123
124 std::string res;
125
126 res.reserve(key.size() * 2);
127
128 for (auto ch : key) {
129 res += hexdigit[((uint8_t)ch) >> 4];
130 res += hexdigit[((uint8_t)ch) & 0x0F];
131 }
132
133 return res;
134}
135
136// This function is run by the background thread
137rocksdb::Status Rdb_sst_file_ordered::Rdb_sst_file::commit() {
138 DBUG_ASSERT(m_sst_file_writer != nullptr);
139
140 rocksdb::Status s;
141 rocksdb::ExternalSstFileInfo fileinfo; /// Finish may should be modified
142
143 // Close out the sst file
144 s = m_sst_file_writer->Finish(&fileinfo);
145 if (m_tracing) {
146 // NO_LINT_DEBUG
147 sql_print_information("SST Tracing: Finish returned %s",
148 s.ok() ? "ok" : "not ok");
149 }
150
151 if (s.ok()) {
152 if (m_tracing) {
153 // NO_LINT_DEBUG
154 sql_print_information("SST Tracing: Adding file %s, smallest key: %s, "
155 "largest key: %s, file size: %" PRIu64 ", "
156 "num_entries: %" PRIu64,
157 fileinfo.file_path.c_str(),
158 generateKey(fileinfo.smallest_key).c_str(),
159 generateKey(fileinfo.largest_key).c_str(),
160 fileinfo.file_size, fileinfo.num_entries);
161 }
162
163 // Add the file to the database
164 // Set the snapshot_consistency parameter to false since no one
165 // should be accessing the table we are bulk loading
166 rocksdb::IngestExternalFileOptions opts;
167 opts.move_files = true;
168 opts.snapshot_consistency = false;
169 opts.allow_global_seqno = false;
170 opts.allow_blocking_flush = false;
171 s = m_db->IngestExternalFile(m_cf, {m_name}, opts);
172
173 if (m_tracing) {
174 // NO_LINT_DEBUG
175 sql_print_information("SST Tracing: AddFile(%s) returned %s",
176 fileinfo.file_path.c_str(),
177 s.ok() ? "ok" : "not ok");
178 }
179 }
180
181 delete m_sst_file_writer;
182 m_sst_file_writer = nullptr;
183
184 return s;
185}
186
187void Rdb_sst_file_ordered::Rdb_sst_stack::push(const rocksdb::Slice &key,
188 const rocksdb::Slice &value) {
189 if (m_buffer == nullptr) {
190 m_buffer = new char[m_buffer_size];
191 }
192
193 // Put the actual key and value data unto our stack
194 size_t key_offset = m_offset;
195 memcpy(m_buffer + m_offset, key.data(), key.size());
196 m_offset += key.size();
197 memcpy(m_buffer + m_offset, value.data(), value.size());
198 m_offset += value.size();
199
200 // Push just the offset, the key length and the value length onto the stack
201 m_stack.push(std::make_tuple(key_offset, key.size(), value.size()));
202}
203
204std::pair<rocksdb::Slice, rocksdb::Slice>
205Rdb_sst_file_ordered::Rdb_sst_stack::top() {
206 size_t offset, key_len, value_len;
207 // Pop the next item off the internal stack
208 std::tie(offset, key_len, value_len) = m_stack.top();
209
210 // Make slices from the offset (first), key length (second), and value
211 // length (third)
212 DBUG_ASSERT(m_buffer != nullptr);
213 rocksdb::Slice key(m_buffer + offset, key_len);
214 rocksdb::Slice value(m_buffer + offset + key_len, value_len);
215
216 return std::make_pair(key, value);
217}
218
219Rdb_sst_file_ordered::Rdb_sst_file_ordered(
220 rocksdb::DB *const db, rocksdb::ColumnFamilyHandle *const cf,
221 const rocksdb::DBOptions &db_options, const std::string &name,
222 const bool tracing, size_t max_size)
223 : m_use_stack(false), m_first(true), m_stack(max_size),
224 m_file(db, cf, db_options, name, tracing) {
225 m_stack.reset();
226}
227
228rocksdb::Status Rdb_sst_file_ordered::apply_first() {
229 rocksdb::Slice first_key_slice(m_first_key);
230 rocksdb::Slice first_value_slice(m_first_value);
231 rocksdb::Status s;
232
233 if (m_use_stack) {
234 // Put the first key onto the stack
235 m_stack.push(first_key_slice, first_value_slice);
236 } else {
237 // Put the first key into the SST
238 s = m_file.put(first_key_slice, first_value_slice);
239 if (!s.ok()) {
240 return s;
241 }
242 }
243
244 // Clear out the 'first' strings for next key/value
245 m_first_key.clear();
246 m_first_value.clear();
247
248 return s;
249}
250
251rocksdb::Status Rdb_sst_file_ordered::put(const rocksdb::Slice &key,
252 const rocksdb::Slice &value) {
253 rocksdb::Status s;
254
255 // If this is the first key, just store a copy of the key and value
256 if (m_first) {
257 m_first_key = key.ToString();
258 m_first_value = value.ToString();
259 m_first = false;
260 return rocksdb::Status::OK();
261 }
262
263 // If the first key is not empty we must be the second key. Compare the
264 // new key with the first key to determine if the data will go straight
265 // the SST or be put on the stack to be retrieved later.
266 if (!m_first_key.empty()) {
267 rocksdb::Slice first_key_slice(m_first_key);
268 int cmp = m_file.compare(first_key_slice, key);
269 m_use_stack = (cmp > 0);
270
271 // Apply the first key to the stack or SST
272 s = apply_first();
273 if (!s.ok()) {
274 return s;
275 }
276 }
277
278 // Put this key on the stack or into the SST
279 if (m_use_stack) {
280 m_stack.push(key, value);
281 } else {
282 s = m_file.put(key, value);
283 }
284
285 return s;
286}
287
288rocksdb::Status Rdb_sst_file_ordered::commit() {
289 rocksdb::Status s;
290
291 // Make sure we get the first key if it was the only key given to us.
292 if (!m_first_key.empty()) {
293 s = apply_first();
294 if (!s.ok()) {
295 return s;
296 }
297 }
298
299 if (m_use_stack) {
300 rocksdb::Slice key;
301 rocksdb::Slice value;
302
303 // We are ready to commit, pull each entry off the stack (which reverses
304 // the original data) and send it to the SST file.
305 while (!m_stack.empty()) {
306 std::tie(key, value) = m_stack.top();
307 s = m_file.put(key, value);
308 if (!s.ok()) {
309 return s;
310 }
311
312 m_stack.pop();
313 }
314
315 // We have pulled everything off the stack, reset for the next time
316 m_stack.reset();
317 m_use_stack = false;
318 }
319
320 // reset m_first
321 m_first = true;
322
323 return m_file.commit();
324}
325
326Rdb_sst_info::Rdb_sst_info(rocksdb::DB *const db, const std::string &tablename,
327 const std::string &indexname,
328 rocksdb::ColumnFamilyHandle *const cf,
329 const rocksdb::DBOptions &db_options,
330 const bool &tracing)
331 : m_db(db), m_cf(cf), m_db_options(db_options), m_curr_size(0),
332 m_sst_count(0), m_background_error(HA_EXIT_SUCCESS), m_committed(false),
333#if defined(RDB_SST_INFO_USE_THREAD)
334 m_queue(), m_mutex(), m_cond(), m_thread(nullptr), m_finished(false),
335#endif
336 m_sst_file(nullptr), m_tracing(tracing), m_print_client_error(true) {
337 m_prefix = db->GetName() + "/";
338
339 std::string normalized_table;
340 if (rdb_normalize_tablename(tablename.c_str(), &normalized_table)) {
341 // We failed to get a normalized table name. This should never happen,
342 // but handle it anyway.
343 m_prefix += "fallback_" + std::to_string(reinterpret_cast<intptr_t>(
344 reinterpret_cast<void *>(this))) +
345 "_" + indexname + "_";
346 } else {
347 m_prefix += normalized_table + "_" + indexname + "_";
348 }
349
350 // Unique filename generated to prevent collisions when the same table
351 // is loaded in parallel
352 m_prefix += std::to_string(m_prefix_counter.fetch_add(1)) + "_";
353
354 rocksdb::ColumnFamilyDescriptor cf_descr;
355 const rocksdb::Status s = m_cf->GetDescriptor(&cf_descr);
356 if (!s.ok()) {
357 // Default size if we can't get the cf's target size
358 m_max_size = 64 * 1024 * 1024;
359 } else {
360 // Set the maximum size to 3 times the cf's target size
361 m_max_size = cf_descr.options.target_file_size_base * 3;
362 }
363 mysql_mutex_init(rdb_sst_commit_key, &m_commit_mutex, MY_MUTEX_INIT_FAST);
364}
365
366Rdb_sst_info::~Rdb_sst_info() {
367 DBUG_ASSERT(m_sst_file == nullptr);
368#if defined(RDB_SST_INFO_USE_THREAD)
369 DBUG_ASSERT(m_thread == nullptr);
370#endif
371 mysql_mutex_destroy(&m_commit_mutex);
372}
373
374int Rdb_sst_info::open_new_sst_file() {
375 DBUG_ASSERT(m_sst_file == nullptr);
376
377 // Create the new sst file's name
378 const std::string name = m_prefix + std::to_string(m_sst_count++) + m_suffix;
379
380 // Create the new sst file object
381 m_sst_file = new Rdb_sst_file_ordered(m_db, m_cf, m_db_options,
382 name, m_tracing, m_max_size);
383
384 // Open the sst file
385 const rocksdb::Status s = m_sst_file->open();
386 if (!s.ok()) {
387 set_error_msg(m_sst_file->get_name(), s);
388 delete m_sst_file;
389 m_sst_file = nullptr;
390 return HA_ERR_ROCKSDB_BULK_LOAD;
391 }
392
393 m_curr_size = 0;
394
395 return HA_EXIT_SUCCESS;
396}
397
398void Rdb_sst_info::close_curr_sst_file() {
399 DBUG_ASSERT(m_sst_file != nullptr);
400 DBUG_ASSERT(m_curr_size > 0);
401
402#if defined(RDB_SST_INFO_USE_THREAD)
403 if (m_thread == nullptr) {
404 // We haven't already started a background thread, so start one
405 m_thread = new std::thread(thread_fcn, this);
406 }
407
408 DBUG_ASSERT(m_thread != nullptr);
409
410 {
411 // Add this finished sst file to the queue (while holding mutex)
412 const std::lock_guard<std::mutex> guard(m_mutex);
413 m_queue.push(m_sst_file);
414 }
415
416 // Notify the background thread that there is a new entry in the queue
417 m_cond.notify_one();
418#else
419 const rocksdb::Status s = m_sst_file->commit();
420 if (!s.ok()) {
421 set_error_msg(m_sst_file->get_name(), s);
422 set_background_error(HA_ERR_ROCKSDB_BULK_LOAD);
423 }
424
425 delete m_sst_file;
426#endif
427
428 // Reset for next sst file
429 m_sst_file = nullptr;
430 m_curr_size = 0;
431}
432
433int Rdb_sst_info::put(const rocksdb::Slice &key, const rocksdb::Slice &value) {
434 int rc;
435
436 DBUG_ASSERT(!m_committed);
437
438 if (m_curr_size + key.size() + value.size() >= m_max_size) {
439 // The current sst file has reached its maximum, close it out
440 close_curr_sst_file();
441
442 // While we are here, check to see if we have had any errors from the
443 // background thread - we don't want to wait for the end to report them
444 if (have_background_error()) {
445 return get_and_reset_background_error();
446 }
447 }
448
449 if (m_curr_size == 0) {
450 // We don't have an sst file open - open one
451 rc = open_new_sst_file();
452 if (rc != 0) {
453 return rc;
454 }
455 }
456
457 DBUG_ASSERT(m_sst_file != nullptr);
458
459 // Add the key/value to the current sst file
460 const rocksdb::Status s = m_sst_file->put(key, value);
461 if (!s.ok()) {
462 set_error_msg(m_sst_file->get_name(), s);
463 return HA_ERR_ROCKSDB_BULK_LOAD;
464 }
465
466 m_curr_size += key.size() + value.size();
467
468 return HA_EXIT_SUCCESS;
469}
470
471int Rdb_sst_info::commit(bool print_client_error) {
472 int ret = HA_EXIT_SUCCESS;
473
474 // Both the transaction clean up and the ha_rocksdb handler have
475 // references to this Rdb_sst_info and both can call commit, so
476 // synchronize on the object here.
477 RDB_MUTEX_LOCK_CHECK(m_commit_mutex);
478
479 if (m_committed) {
480 RDB_MUTEX_UNLOCK_CHECK(m_commit_mutex);
481 return ret;
482 }
483
484 m_print_client_error = print_client_error;
485
486 if (m_curr_size > 0) {
487 // Close out any existing files
488 close_curr_sst_file();
489 }
490
491#if defined(RDB_SST_INFO_USE_THREAD)
492 if (m_thread != nullptr) {
493 // Tell the background thread we are done
494 m_finished = true;
495 m_cond.notify_one();
496
497 // Wait for the background thread to finish
498 m_thread->join();
499 delete m_thread;
500 m_thread = nullptr;
501 }
502#endif
503
504 m_committed = true;
505 RDB_MUTEX_UNLOCK_CHECK(m_commit_mutex);
506
507 // Did we get any errors?
508 if (have_background_error()) {
509 ret = get_and_reset_background_error();
510 }
511
512 m_print_client_error = true;
513 return ret;
514}
515
516void Rdb_sst_info::set_error_msg(const std::string &sst_file_name,
517 const rocksdb::Status &s) {
518
519 if (!m_print_client_error)
520 return;
521
522#if defined(RDB_SST_INFO_USE_THREAD)
523 // Both the foreground and background threads can set the error message
524 // so lock the mutex to protect it. We only want the first error that
525 // we encounter.
526 const std::lock_guard<std::mutex> guard(m_mutex);
527#endif
528 if (s.IsInvalidArgument() &&
529 strcmp(s.getState(), "Keys must be added in order") == 0) {
530 my_printf_error(ER_KEYS_OUT_OF_ORDER,
531 "Rows must be inserted in primary key order "
532 "during bulk load operation",
533 MYF(0));
534 } else if (s.IsInvalidArgument() &&
535 strcmp(s.getState(), "Global seqno is required, but disabled") ==
536 0) {
537 my_printf_error(ER_OVERLAPPING_KEYS, "Rows inserted during bulk load "
538 "must not overlap existing rows",
539 MYF(0));
540 } else {
541 my_printf_error(ER_UNKNOWN_ERROR, "[%s] bulk load error: %s", MYF(0),
542 sst_file_name.c_str(), s.ToString().c_str());
543 }
544}
545
546#if defined(RDB_SST_INFO_USE_THREAD)
547// Static thread function - the Rdb_sst_info object is in 'object'
548void Rdb_sst_info::thread_fcn(void *object) {
549 reinterpret_cast<Rdb_sst_info *>(object)->run_thread();
550}
551
552void Rdb_sst_info::run_thread() {
553 std::unique_lock<std::mutex> lk(m_mutex);
554
555 do {
556 // Wait for notification or 1 second to pass
557 m_cond.wait_for(lk, std::chrono::seconds(1));
558
559 // Inner loop pulls off all Rdb_sst_file_ordered entries and processes them
560 while (!m_queue.empty()) {
561 Rdb_sst_file_ordered *const sst_file = m_queue.front();
562 m_queue.pop();
563
564 // Release the lock - we don't want to hold it while committing the file
565 lk.unlock();
566
567 // Close out the sst file and add it to the database
568 const rocksdb::Status s = sst_file->commit();
569 if (!s.ok()) {
570 set_error_msg(sst_file->get_name(), s);
571 set_background_error(HA_ERR_ROCKSDB_BULK_LOAD);
572 }
573
574 delete sst_file;
575
576 // Reacquire the lock for the next inner loop iteration
577 lk.lock();
578 }
579
580 // If the queue is empty and the main thread has indicated we should exit
581 // break out of the loop.
582 } while (!m_finished);
583
584 DBUG_ASSERT(m_queue.empty());
585}
586#endif
587
588void Rdb_sst_info::init(const rocksdb::DB *const db) {
589 const std::string path = db->GetName() + FN_DIRSEP;
590 struct st_my_dir *const dir_info = my_dir(path.c_str(), MYF(MY_DONT_SORT));
591
592 // Access the directory
593 if (dir_info == nullptr) {
594 // NO_LINT_DEBUG
595 sql_print_warning("RocksDB: Could not access database directory: %s",
596 path.c_str());
597 return;
598 }
599
600 // Scan through the files in the directory
601 const struct fileinfo *file_info = dir_info->dir_entry;
602 for (uint ii= 0; ii < dir_info->number_of_files; ii++, file_info++) {
603 // find any files ending with m_suffix ...
604 const std::string name = file_info->name;
605 const size_t pos = name.find(m_suffix);
606 if (pos != std::string::npos && name.size() - pos == m_suffix.size()) {
607 // ... and remove them
608 const std::string fullname = path + name;
609 my_delete(fullname.c_str(), MYF(0));
610 }
611 }
612
613 // Release the directory entry
614 my_dirend(dir_info);
615}
616
617std::atomic<uint64_t> Rdb_sst_info::m_prefix_counter(0);
618std::string Rdb_sst_info::m_suffix = ".bulk_load.tmp";
619} // namespace myrocks
620