1/*
2 Copyright (c) 2016, Facebook, Inc.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
16
17#pragma once
18
19/* C++ standard header files */
20#include <atomic>
21#include <condition_variable>
22#include <mutex>
23#include <queue>
24#include <stack>
25#include <string>
26#include <thread>
27#include <utility>
28#include <vector>
29
30/* RocksDB header files */
31#include "rocksdb/db.h"
32#include "rocksdb/sst_file_writer.h"
33
34/* MyRocks header files */
35#include "./rdb_utils.h"
36
37// #define RDB_SST_INFO_USE_THREAD /* uncomment to use threads */
38
39namespace myrocks {
40
41class Rdb_sst_file_ordered {
42 private:
43 class Rdb_sst_file {
44 private:
45 Rdb_sst_file(const Rdb_sst_file &p) = delete;
46 Rdb_sst_file &operator=(const Rdb_sst_file &p) = delete;
47
48 rocksdb::DB *const m_db;
49 rocksdb::ColumnFamilyHandle *const m_cf;
50 const rocksdb::DBOptions &m_db_options;
51 rocksdb::SstFileWriter *m_sst_file_writer;
52 const std::string m_name;
53 const bool m_tracing;
54 const rocksdb::Comparator *m_comparator;
55
56 std::string generateKey(const std::string &key);
57
58 public:
59 Rdb_sst_file(rocksdb::DB *const db, rocksdb::ColumnFamilyHandle *const cf,
60 const rocksdb::DBOptions &db_options, const std::string &name,
61 const bool tracing);
62 ~Rdb_sst_file();
63
64 rocksdb::Status open();
65 rocksdb::Status put(const rocksdb::Slice &key, const rocksdb::Slice &value);
66 rocksdb::Status commit();
67
68 inline const std::string get_name() const { return m_name; }
69 inline int compare(rocksdb::Slice key1, rocksdb::Slice key2) {
70 return m_comparator->Compare(key1, key2);
71 }
72 };
73
74 class Rdb_sst_stack {
75 private:
76 char *m_buffer;
77 size_t m_buffer_size;
78 size_t m_offset;
79 std::stack<std::tuple<size_t, size_t, size_t>> m_stack;
80
81 public:
82 explicit Rdb_sst_stack(size_t max_size)
83 : m_buffer(nullptr), m_buffer_size(max_size) {}
84 ~Rdb_sst_stack() { delete[] m_buffer; }
85
86 void reset() { m_offset = 0; }
87 bool empty() { return m_stack.empty(); }
88 void push(const rocksdb::Slice &key, const rocksdb::Slice &value);
89 std::pair<rocksdb::Slice, rocksdb::Slice> top();
90 void pop() { m_stack.pop(); }
91 size_t size() { return m_stack.size(); }
92 };
93
94 bool m_use_stack;
95 bool m_first;
96 std::string m_first_key;
97 std::string m_first_value;
98 Rdb_sst_stack m_stack;
99 Rdb_sst_file m_file;
100
101 rocksdb::Status apply_first();
102
103 public:
104 Rdb_sst_file_ordered(rocksdb::DB *const db,
105 rocksdb::ColumnFamilyHandle *const cf,
106 const rocksdb::DBOptions &db_options,
107 const std::string &name, const bool tracing,
108 size_t max_size);
109
110 inline rocksdb::Status open() { return m_file.open(); }
111 rocksdb::Status put(const rocksdb::Slice &key, const rocksdb::Slice &value);
112 rocksdb::Status commit();
113 inline const std::string get_name() const { return m_file.get_name(); }
114};
115
116class Rdb_sst_info {
117 private:
118 Rdb_sst_info(const Rdb_sst_info &p) = delete;
119 Rdb_sst_info &operator=(const Rdb_sst_info &p) = delete;
120
121 rocksdb::DB *const m_db;
122 rocksdb::ColumnFamilyHandle *const m_cf;
123 const rocksdb::DBOptions &m_db_options;
124 uint64_t m_curr_size;
125 uint64_t m_max_size;
126 uint32_t m_sst_count;
127 std::atomic<int> m_background_error;
128 std::string m_prefix;
129 static std::atomic<uint64_t> m_prefix_counter;
130 static std::string m_suffix;
131 bool m_committed;
132 mysql_mutex_t m_commit_mutex;
133#if defined(RDB_SST_INFO_USE_THREAD)
134 std::queue<Rdb_sst_file_ordered *> m_queue;
135 std::mutex m_mutex;
136 std::condition_variable m_cond;
137 std::thread *m_thread;
138 bool m_finished;
139#endif
140 Rdb_sst_file_ordered *m_sst_file;
141 const bool m_tracing;
142 bool m_print_client_error;
143
144 int open_new_sst_file();
145 void close_curr_sst_file();
146 void set_error_msg(const std::string &sst_file_name,
147 const rocksdb::Status &s);
148
149#if defined(RDB_SST_INFO_USE_THREAD)
150 void run_thread();
151
152 static void thread_fcn(void *object);
153#endif
154
155 public:
156 Rdb_sst_info(rocksdb::DB *const db, const std::string &tablename,
157 const std::string &indexname,
158 rocksdb::ColumnFamilyHandle *const cf,
159 const rocksdb::DBOptions &db_options, const bool &tracing);
160 ~Rdb_sst_info();
161
162 int put(const rocksdb::Slice &key, const rocksdb::Slice &value);
163 int commit(bool print_client_error = true);
164 bool is_committed() const { return m_committed; }
165
166 bool have_background_error() { return m_background_error != 0; }
167
168 int get_and_reset_background_error() {
169 int ret = m_background_error;
170 while (!m_background_error.compare_exchange_weak(ret, HA_EXIT_SUCCESS)) {
171 // Do nothing
172 }
173
174 return ret;
175 }
176
177 void set_background_error(int code) {
178 int expected = HA_EXIT_SUCCESS;
179 // Only assign 'code' into the error if it is already 0, otherwise ignore it
180 m_background_error.compare_exchange_strong(expected, code);
181 }
182
183 static void init(const rocksdb::DB *const db);
184};
185
186} // namespace myrocks
187