1/*
2 Portions Copyright (c) 2016-Present, Facebook, Inc.
3 Portions Copyright (c) 2012, Monty Program Ab
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; version 2 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17#pragma once
18
19#ifdef USE_PRAGMA_IMPLEMENTATION
20#pragma implementation // gcc: Class implementation
21#endif
22
23/* C++ system header files */
24#include <string>
25#include <time.h>
26#include <ctime>
27
28/* RocksDB includes */
29#include "rocksdb/compaction_filter.h"
30
31/* MyRocks includes */
32#include "./ha_rocksdb_proto.h"
33#include "./rdb_datadic.h"
34
35namespace myrocks {
36
37class Rdb_compact_filter : public rocksdb::CompactionFilter {
38public:
39 Rdb_compact_filter(const Rdb_compact_filter &) = delete;
40 Rdb_compact_filter &operator=(const Rdb_compact_filter &) = delete;
41
42 explicit Rdb_compact_filter(uint32_t _cf_id) : m_cf_id(_cf_id) {}
43 ~Rdb_compact_filter() {
44 // Increment stats by num expired at the end of compaction
45 rdb_update_global_stats(ROWS_EXPIRED, m_num_expired);
46 }
47
48 // keys are passed in sorted order within the same sst.
49 // V1 Filter is thread safe on our usage (creating from Factory).
50 // Make sure to protect instance variables when switching to thread
51 // unsafe in the future.
52 virtual bool Filter(int level, const rocksdb::Slice &key,
53 const rocksdb::Slice &existing_value,
54 std::string *new_value,
55 bool *value_changed) const override {
56 DBUG_ASSERT(key.size() >= sizeof(uint32));
57
58 GL_INDEX_ID gl_index_id;
59 gl_index_id.cf_id = m_cf_id;
60 gl_index_id.index_id = rdb_netbuf_to_uint32((const uchar *)key.data());
61 DBUG_ASSERT(gl_index_id.index_id >= 1);
62
63 if (gl_index_id != m_prev_index) {
64 m_should_delete =
65 rdb_get_dict_manager()->is_drop_index_ongoing(gl_index_id);
66
67 if (!m_should_delete) {
68 get_ttl_duration_and_offset(gl_index_id, &m_ttl_duration,
69 &m_ttl_offset);
70
71 if (m_ttl_duration != 0 && m_snapshot_timestamp == 0) {
72 /*
73 For efficiency reasons, we lazily call GetIntProperty to get the
74 oldest snapshot time (occurs once per compaction).
75 */
76 rocksdb::DB *const rdb = rdb_get_rocksdb_db();
77 if (!rdb->GetIntProperty(rocksdb::DB::Properties::kOldestSnapshotTime,
78 &m_snapshot_timestamp) ||
79 m_snapshot_timestamp == 0) {
80 m_snapshot_timestamp = static_cast<uint64_t>(std::time(nullptr));
81 }
82
83#ifndef NDEBUG
84 int snapshot_ts = rdb_dbug_set_ttl_snapshot_ts();
85 if (snapshot_ts) {
86 m_snapshot_timestamp =
87 static_cast<uint64_t>(std::time(nullptr)) + snapshot_ts;
88 }
89#endif
90 }
91 }
92
93 m_prev_index = gl_index_id;
94 }
95
96 if (m_should_delete) {
97 m_num_deleted++;
98 return true;
99 } else if (m_ttl_duration > 0 &&
100 should_filter_ttl_rec(key, existing_value)) {
101 m_num_expired++;
102 return true;
103 }
104
105 return false;
106 }
107
108 virtual bool IgnoreSnapshots() const override { return true; }
109
110 virtual const char *Name() const override { return "Rdb_compact_filter"; }
111
112 void get_ttl_duration_and_offset(const GL_INDEX_ID &gl_index_id,
113 uint64 *ttl_duration,
114 uint32 *ttl_offset) const {
115 DBUG_ASSERT(ttl_duration != nullptr);
116 /*
117 If TTL is disabled set ttl_duration to 0. This prevents the compaction
118 filter from dropping expired records.
119 */
120 if (!rdb_is_ttl_enabled()) {
121 *ttl_duration = 0;
122 return;
123 }
124
125 /*
126 If key is part of system column family, it's definitely not a TTL key.
127 */
128 rocksdb::ColumnFamilyHandle *s_cf = rdb_get_dict_manager()->get_system_cf();
129 if (s_cf == nullptr || gl_index_id.cf_id == s_cf->GetID()) {
130 *ttl_duration = 0;
131 return;
132 }
133
134 struct Rdb_index_info index_info;
135 if (!rdb_get_dict_manager()->get_index_info(gl_index_id, &index_info)) {
136 // NO_LINT_DEBUG
137 sql_print_error("RocksDB: Could not get index information "
138 "for Index Number (%u,%u)",
139 gl_index_id.cf_id, gl_index_id.index_id);
140 }
141
142#ifndef NDEBUG
143 if (rdb_dbug_set_ttl_ignore_pk() &&
144 index_info.m_index_type == Rdb_key_def::INDEX_TYPE_PRIMARY) {
145 *ttl_duration = 0;
146 return;
147 }
148#endif
149
150 *ttl_duration = index_info.m_ttl_duration;
151 if (Rdb_key_def::has_index_flag(index_info.m_index_flags,
152 Rdb_key_def::TTL_FLAG)) {
153 *ttl_offset = Rdb_key_def::calculate_index_flag_offset(
154 index_info.m_index_flags, Rdb_key_def::TTL_FLAG);
155 }
156 }
157
158 bool should_filter_ttl_rec(const rocksdb::Slice &key,
159 const rocksdb::Slice &existing_value) const {
160 uint64 ttl_timestamp;
161 Rdb_string_reader reader(&existing_value);
162 if (!reader.read(m_ttl_offset) || reader.read_uint64(&ttl_timestamp)) {
163 std::string buf;
164 buf = rdb_hexdump(existing_value.data(), existing_value.size(),
165 RDB_MAX_HEXDUMP_LEN);
166 // NO_LINT_DEBUG
167 sql_print_error("Decoding ttl from PK value failed in compaction filter, "
168 "for index (%u,%u), val: %s",
169 m_prev_index.cf_id, m_prev_index.index_id, buf.c_str());
170 abort();
171 }
172
173 /*
174 Filter out the record only if it is older than the oldest snapshot
175 timestamp. This prevents any rows from expiring in the middle of
176 long-running transactions.
177 */
178 return ttl_timestamp + m_ttl_duration <= m_snapshot_timestamp;
179 }
180
181 private:
182 // Column family for this compaction filter
183 const uint32_t m_cf_id;
184 // Index id of the previous record
185 mutable GL_INDEX_ID m_prev_index = {0, 0};
186 // Number of rows deleted for the same index id
187 mutable uint64 m_num_deleted = 0;
188 // Number of rows expired for the TTL index
189 mutable uint64 m_num_expired = 0;
190 // Current index id should be deleted or not (should be deleted if true)
191 mutable bool m_should_delete = false;
192 // TTL duration for the current index if TTL is enabled
193 mutable uint64 m_ttl_duration = 0;
194 // TTL offset for all records in the current index
195 mutable uint32 m_ttl_offset = 0;
196 // Oldest snapshot timestamp at the time a TTL index is discovered
197 mutable uint64_t m_snapshot_timestamp = 0;
198};
199
200class Rdb_compact_filter_factory : public rocksdb::CompactionFilterFactory {
201public:
202 Rdb_compact_filter_factory(const Rdb_compact_filter_factory &) = delete;
203 Rdb_compact_filter_factory &
204 operator=(const Rdb_compact_filter_factory &) = delete;
205 Rdb_compact_filter_factory() {}
206
207 ~Rdb_compact_filter_factory() {}
208
209 const char *Name() const override { return "Rdb_compact_filter_factory"; }
210
211 std::unique_ptr<rocksdb::CompactionFilter> CreateCompactionFilter(
212 const rocksdb::CompactionFilter::Context &context) override {
213 return std::unique_ptr<rocksdb::CompactionFilter>(
214 new Rdb_compact_filter(context.column_family_id));
215 }
216};
217
218} // namespace myrocks
219