1 | /* |
2 | Portions Copyright (c) 2016-Present, Facebook, Inc. |
3 | Portions Copyright (c) 2012, Monty Program Ab |
4 | |
5 | This program is free software; you can redistribute it and/or modify |
6 | it under the terms of the GNU General Public License as published by |
7 | the Free Software Foundation; version 2 of the License. |
8 | |
9 | This program is distributed in the hope that it will be useful, |
10 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
11 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
12 | GNU General Public License for more details. |
13 | |
14 | You should have received a copy of the GNU General Public License |
15 | along with this program; if not, write to the Free Software |
16 | Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ |
17 | #pragma once |
18 | |
19 | #ifdef USE_PRAGMA_IMPLEMENTATION |
20 | #pragma implementation // gcc: Class implementation |
21 | #endif |
22 | |
23 | /* C++ system header files */ |
24 | #include <string> |
25 | #include <time.h> |
26 | #include <ctime> |
27 | |
28 | /* RocksDB includes */ |
29 | #include "rocksdb/compaction_filter.h" |
30 | |
31 | /* MyRocks includes */ |
32 | #include "./ha_rocksdb_proto.h" |
33 | #include "./rdb_datadic.h" |
34 | |
35 | namespace myrocks { |
36 | |
37 | class Rdb_compact_filter : public rocksdb::CompactionFilter { |
38 | public: |
39 | Rdb_compact_filter(const Rdb_compact_filter &) = delete; |
40 | Rdb_compact_filter &operator=(const Rdb_compact_filter &) = delete; |
41 | |
42 | explicit Rdb_compact_filter(uint32_t _cf_id) : m_cf_id(_cf_id) {} |
43 | ~Rdb_compact_filter() { |
44 | // Increment stats by num expired at the end of compaction |
45 | rdb_update_global_stats(ROWS_EXPIRED, m_num_expired); |
46 | } |
47 | |
48 | // keys are passed in sorted order within the same sst. |
49 | // V1 Filter is thread safe on our usage (creating from Factory). |
50 | // Make sure to protect instance variables when switching to thread |
51 | // unsafe in the future. |
52 | virtual bool Filter(int level, const rocksdb::Slice &key, |
53 | const rocksdb::Slice &existing_value, |
54 | std::string *new_value, |
55 | bool *value_changed) const override { |
56 | DBUG_ASSERT(key.size() >= sizeof(uint32)); |
57 | |
58 | GL_INDEX_ID gl_index_id; |
59 | gl_index_id.cf_id = m_cf_id; |
60 | gl_index_id.index_id = rdb_netbuf_to_uint32((const uchar *)key.data()); |
61 | DBUG_ASSERT(gl_index_id.index_id >= 1); |
62 | |
63 | if (gl_index_id != m_prev_index) { |
64 | m_should_delete = |
65 | rdb_get_dict_manager()->is_drop_index_ongoing(gl_index_id); |
66 | |
67 | if (!m_should_delete) { |
68 | get_ttl_duration_and_offset(gl_index_id, &m_ttl_duration, |
69 | &m_ttl_offset); |
70 | |
71 | if (m_ttl_duration != 0 && m_snapshot_timestamp == 0) { |
72 | /* |
73 | For efficiency reasons, we lazily call GetIntProperty to get the |
74 | oldest snapshot time (occurs once per compaction). |
75 | */ |
76 | rocksdb::DB *const rdb = rdb_get_rocksdb_db(); |
77 | if (!rdb->GetIntProperty(rocksdb::DB::Properties::kOldestSnapshotTime, |
78 | &m_snapshot_timestamp) || |
79 | m_snapshot_timestamp == 0) { |
80 | m_snapshot_timestamp = static_cast<uint64_t>(std::time(nullptr)); |
81 | } |
82 | |
83 | #ifndef NDEBUG |
84 | int snapshot_ts = rdb_dbug_set_ttl_snapshot_ts(); |
85 | if (snapshot_ts) { |
86 | m_snapshot_timestamp = |
87 | static_cast<uint64_t>(std::time(nullptr)) + snapshot_ts; |
88 | } |
89 | #endif |
90 | } |
91 | } |
92 | |
93 | m_prev_index = gl_index_id; |
94 | } |
95 | |
96 | if (m_should_delete) { |
97 | m_num_deleted++; |
98 | return true; |
99 | } else if (m_ttl_duration > 0 && |
100 | should_filter_ttl_rec(key, existing_value)) { |
101 | m_num_expired++; |
102 | return true; |
103 | } |
104 | |
105 | return false; |
106 | } |
107 | |
108 | virtual bool IgnoreSnapshots() const override { return true; } |
109 | |
110 | virtual const char *Name() const override { return "Rdb_compact_filter" ; } |
111 | |
112 | void get_ttl_duration_and_offset(const GL_INDEX_ID &gl_index_id, |
113 | uint64 *ttl_duration, |
114 | uint32 *ttl_offset) const { |
115 | DBUG_ASSERT(ttl_duration != nullptr); |
116 | /* |
117 | If TTL is disabled set ttl_duration to 0. This prevents the compaction |
118 | filter from dropping expired records. |
119 | */ |
120 | if (!rdb_is_ttl_enabled()) { |
121 | *ttl_duration = 0; |
122 | return; |
123 | } |
124 | |
125 | /* |
126 | If key is part of system column family, it's definitely not a TTL key. |
127 | */ |
128 | rocksdb::ColumnFamilyHandle *s_cf = rdb_get_dict_manager()->get_system_cf(); |
129 | if (s_cf == nullptr || gl_index_id.cf_id == s_cf->GetID()) { |
130 | *ttl_duration = 0; |
131 | return; |
132 | } |
133 | |
134 | struct Rdb_index_info index_info; |
135 | if (!rdb_get_dict_manager()->get_index_info(gl_index_id, &index_info)) { |
136 | // NO_LINT_DEBUG |
137 | sql_print_error("RocksDB: Could not get index information " |
138 | "for Index Number (%u,%u)" , |
139 | gl_index_id.cf_id, gl_index_id.index_id); |
140 | } |
141 | |
142 | #ifndef NDEBUG |
143 | if (rdb_dbug_set_ttl_ignore_pk() && |
144 | index_info.m_index_type == Rdb_key_def::INDEX_TYPE_PRIMARY) { |
145 | *ttl_duration = 0; |
146 | return; |
147 | } |
148 | #endif |
149 | |
150 | *ttl_duration = index_info.m_ttl_duration; |
151 | if (Rdb_key_def::has_index_flag(index_info.m_index_flags, |
152 | Rdb_key_def::TTL_FLAG)) { |
153 | *ttl_offset = Rdb_key_def::calculate_index_flag_offset( |
154 | index_info.m_index_flags, Rdb_key_def::TTL_FLAG); |
155 | } |
156 | } |
157 | |
158 | bool should_filter_ttl_rec(const rocksdb::Slice &key, |
159 | const rocksdb::Slice &existing_value) const { |
160 | uint64 ttl_timestamp; |
161 | Rdb_string_reader reader(&existing_value); |
162 | if (!reader.read(m_ttl_offset) || reader.read_uint64(&ttl_timestamp)) { |
163 | std::string buf; |
164 | buf = rdb_hexdump(existing_value.data(), existing_value.size(), |
165 | RDB_MAX_HEXDUMP_LEN); |
166 | // NO_LINT_DEBUG |
167 | sql_print_error("Decoding ttl from PK value failed in compaction filter, " |
168 | "for index (%u,%u), val: %s" , |
169 | m_prev_index.cf_id, m_prev_index.index_id, buf.c_str()); |
170 | abort(); |
171 | } |
172 | |
173 | /* |
174 | Filter out the record only if it is older than the oldest snapshot |
175 | timestamp. This prevents any rows from expiring in the middle of |
176 | long-running transactions. |
177 | */ |
178 | return ttl_timestamp + m_ttl_duration <= m_snapshot_timestamp; |
179 | } |
180 | |
181 | private: |
182 | // Column family for this compaction filter |
183 | const uint32_t m_cf_id; |
184 | // Index id of the previous record |
185 | mutable GL_INDEX_ID m_prev_index = {0, 0}; |
186 | // Number of rows deleted for the same index id |
187 | mutable uint64 m_num_deleted = 0; |
188 | // Number of rows expired for the TTL index |
189 | mutable uint64 m_num_expired = 0; |
190 | // Current index id should be deleted or not (should be deleted if true) |
191 | mutable bool m_should_delete = false; |
192 | // TTL duration for the current index if TTL is enabled |
193 | mutable uint64 m_ttl_duration = 0; |
194 | // TTL offset for all records in the current index |
195 | mutable uint32 m_ttl_offset = 0; |
196 | // Oldest snapshot timestamp at the time a TTL index is discovered |
197 | mutable uint64_t m_snapshot_timestamp = 0; |
198 | }; |
199 | |
200 | class Rdb_compact_filter_factory : public rocksdb::CompactionFilterFactory { |
201 | public: |
202 | Rdb_compact_filter_factory(const Rdb_compact_filter_factory &) = delete; |
203 | Rdb_compact_filter_factory & |
204 | operator=(const Rdb_compact_filter_factory &) = delete; |
205 | Rdb_compact_filter_factory() {} |
206 | |
207 | ~Rdb_compact_filter_factory() {} |
208 | |
209 | const char *Name() const override { return "Rdb_compact_filter_factory" ; } |
210 | |
211 | std::unique_ptr<rocksdb::CompactionFilter> CreateCompactionFilter( |
212 | const rocksdb::CompactionFilter::Context &context) override { |
213 | return std::unique_ptr<rocksdb::CompactionFilter>( |
214 | new Rdb_compact_filter(context.column_family_id)); |
215 | } |
216 | }; |
217 | |
218 | } // namespace myrocks |
219 | |