1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License |
4 | // (found in the LICENSE.Apache file in the root directory). |
5 | |
6 | #include "rocksdb/sst_file_writer.h" |
7 | |
8 | #include <vector> |
9 | #include "db/dbformat.h" |
10 | #include "rocksdb/table.h" |
11 | #include "table/block_based_table_builder.h" |
12 | #include "table/sst_file_writer_collectors.h" |
13 | #include "util/file_reader_writer.h" |
14 | #include "util/sync_point.h" |
15 | |
16 | namespace rocksdb { |
17 | |
18 | const std::string ExternalSstFilePropertyNames::kVersion = |
19 | "rocksdb.external_sst_file.version" ; |
20 | const std::string ExternalSstFilePropertyNames::kGlobalSeqno = |
21 | "rocksdb.external_sst_file.global_seqno" ; |
22 | |
23 | #ifndef ROCKSDB_LITE |
24 | |
// Number of newly written bytes (1MB) after which Rep::InvalidatePageCache()
// issues another cache-invalidation hint for the file.
const size_t kFadviseTrigger = static_cast<size_t>(1) << 20;
26 | |
27 | struct SstFileWriter::Rep { |
28 | Rep(const EnvOptions& _env_options, const Options& options, |
29 | Env::IOPriority _io_priority, const Comparator* _user_comparator, |
30 | ColumnFamilyHandle* _cfh, bool _invalidate_page_cache) |
31 | : env_options(_env_options), |
32 | ioptions(options), |
33 | mutable_cf_options(options), |
34 | io_priority(_io_priority), |
35 | internal_comparator(_user_comparator), |
36 | cfh(_cfh), |
37 | invalidate_page_cache(_invalidate_page_cache), |
38 | last_fadvise_size(0) {} |
39 | |
40 | std::unique_ptr<WritableFileWriter> file_writer; |
41 | std::unique_ptr<TableBuilder> builder; |
42 | EnvOptions env_options; |
43 | ImmutableCFOptions ioptions; |
44 | MutableCFOptions mutable_cf_options; |
45 | Env::IOPriority io_priority; |
46 | InternalKeyComparator internal_comparator; |
47 | ExternalSstFileInfo file_info; |
48 | InternalKey ikey; |
49 | std::string column_family_name; |
50 | ColumnFamilyHandle* cfh; |
51 | // If true, We will give the OS a hint that this file pages is not needed |
52 | // everytime we write 1MB to the file. |
53 | bool invalidate_page_cache; |
54 | // The size of the file during the last time we called Fadvise to remove |
55 | // cached pages from page cache. |
56 | uint64_t last_fadvise_size; |
57 | Status Add(const Slice& user_key, const Slice& value, |
58 | const ValueType value_type) { |
59 | if (!builder) { |
60 | return Status::InvalidArgument("File is not opened" ); |
61 | } |
62 | |
63 | if (file_info.num_entries == 0) { |
64 | file_info.smallest_key.assign(user_key.data(), user_key.size()); |
65 | } else { |
66 | if (internal_comparator.user_comparator()->Compare( |
67 | user_key, file_info.largest_key) <= 0) { |
68 | // Make sure that keys are added in order |
69 | return Status::InvalidArgument("Keys must be added in order" ); |
70 | } |
71 | } |
72 | |
73 | // TODO(tec) : For external SST files we could omit the seqno and type. |
74 | switch (value_type) { |
75 | case ValueType::kTypeValue: |
76 | ikey.Set(user_key, 0 /* Sequence Number */, |
77 | ValueType::kTypeValue /* Put */); |
78 | break; |
79 | case ValueType::kTypeMerge: |
80 | ikey.Set(user_key, 0 /* Sequence Number */, |
81 | ValueType::kTypeMerge /* Merge */); |
82 | break; |
83 | case ValueType::kTypeDeletion: |
84 | ikey.Set(user_key, 0 /* Sequence Number */, |
85 | ValueType::kTypeDeletion /* Delete */); |
86 | break; |
87 | default: |
88 | return Status::InvalidArgument("Value type is not supported" ); |
89 | } |
90 | builder->Add(ikey.Encode(), value); |
91 | |
92 | // update file info |
93 | file_info.num_entries++; |
94 | file_info.largest_key.assign(user_key.data(), user_key.size()); |
95 | file_info.file_size = builder->FileSize(); |
96 | |
97 | InvalidatePageCache(false /* closing */); |
98 | |
99 | return Status::OK(); |
100 | } |
101 | |
102 | void InvalidatePageCache(bool closing) { |
103 | if (invalidate_page_cache == false) { |
104 | // Fadvise disabled |
105 | return; |
106 | } |
107 | uint64_t bytes_since_last_fadvise = |
108 | builder->FileSize() - last_fadvise_size; |
109 | if (bytes_since_last_fadvise > kFadviseTrigger || closing) { |
110 | TEST_SYNC_POINT_CALLBACK("SstFileWriter::Rep::InvalidatePageCache" , |
111 | &(bytes_since_last_fadvise)); |
112 | // Tell the OS that we dont need this file in page cache |
113 | file_writer->InvalidateCache(0, 0); |
114 | last_fadvise_size = builder->FileSize(); |
115 | } |
116 | } |
117 | |
118 | }; |
119 | |
120 | SstFileWriter::SstFileWriter(const EnvOptions& env_options, |
121 | const Options& options, |
122 | const Comparator* user_comparator, |
123 | ColumnFamilyHandle* column_family, |
124 | bool invalidate_page_cache, |
125 | Env::IOPriority io_priority) |
126 | : rep_(new Rep(env_options, options, io_priority, user_comparator, |
127 | column_family, invalidate_page_cache)) { |
128 | rep_->file_info.file_size = 0; |
129 | } |
130 | |
131 | SstFileWriter::~SstFileWriter() { |
132 | if (rep_->builder) { |
133 | // User did not call Finish() or Finish() failed, we need to |
134 | // abandon the builder. |
135 | rep_->builder->Abandon(); |
136 | } |
137 | } |
138 | |
139 | Status SstFileWriter::Open(const std::string& file_path) { |
140 | Rep* r = rep_.get(); |
141 | Status s; |
142 | std::unique_ptr<WritableFile> sst_file; |
143 | s = r->ioptions.env->NewWritableFile(file_path, &sst_file, r->env_options); |
144 | if (!s.ok()) { |
145 | return s; |
146 | } |
147 | |
148 | sst_file->SetIOPriority(r->io_priority); |
149 | |
150 | CompressionType compression_type; |
151 | if (r->ioptions.bottommost_compression != kDisableCompressionOption) { |
152 | compression_type = r->ioptions.bottommost_compression; |
153 | } else if (!r->ioptions.compression_per_level.empty()) { |
154 | // Use the compression of the last level if we have per level compression |
155 | compression_type = *(r->ioptions.compression_per_level.rbegin()); |
156 | } else { |
157 | compression_type = r->mutable_cf_options.compression; |
158 | } |
159 | |
160 | std::vector<std::unique_ptr<IntTblPropCollectorFactory>> |
161 | int_tbl_prop_collector_factories; |
162 | |
163 | // SstFileWriter properties collector to add SstFileWriter version. |
164 | int_tbl_prop_collector_factories.emplace_back( |
165 | new SstFileWriterPropertiesCollectorFactory(2 /* version */, |
166 | 0 /* global_seqno*/)); |
167 | |
168 | // User collector factories |
169 | auto user_collector_factories = |
170 | r->ioptions.table_properties_collector_factories; |
171 | for (size_t i = 0; i < user_collector_factories.size(); i++) { |
172 | int_tbl_prop_collector_factories.emplace_back( |
173 | new UserKeyTablePropertiesCollectorFactory( |
174 | user_collector_factories[i])); |
175 | } |
176 | int unknown_level = -1; |
177 | uint32_t cf_id; |
178 | |
179 | if (r->cfh != nullptr) { |
180 | // user explicitly specified that this file will be ingested into cfh, |
181 | // we can persist this information in the file. |
182 | cf_id = r->cfh->GetID(); |
183 | r->column_family_name = r->cfh->GetName(); |
184 | } else { |
185 | r->column_family_name = "" ; |
186 | cf_id = TablePropertiesCollectorFactory::Context::kUnknownColumnFamily; |
187 | } |
188 | |
189 | TableBuilderOptions table_builder_options( |
190 | r->ioptions, r->internal_comparator, &int_tbl_prop_collector_factories, |
191 | compression_type, r->ioptions.compression_opts, |
192 | nullptr /* compression_dict */, false /* skip_filters */, |
193 | r->column_family_name, unknown_level); |
194 | r->file_writer.reset( |
195 | new WritableFileWriter(std::move(sst_file), r->env_options)); |
196 | |
197 | // TODO(tec) : If table_factory is using compressed block cache, we will |
198 | // be adding the external sst file blocks into it, which is wasteful. |
199 | r->builder.reset(r->ioptions.table_factory->NewTableBuilder( |
200 | table_builder_options, cf_id, r->file_writer.get())); |
201 | |
202 | r->file_info.file_path = file_path; |
203 | r->file_info.file_size = 0; |
204 | r->file_info.num_entries = 0; |
205 | r->file_info.sequence_number = 0; |
206 | r->file_info.version = 2; |
207 | return s; |
208 | } |
209 | |
210 | Status SstFileWriter::Add(const Slice& user_key, const Slice& value) { |
211 | return rep_->Add(user_key, value, ValueType::kTypeValue); |
212 | } |
213 | |
214 | Status SstFileWriter::Put(const Slice& user_key, const Slice& value) { |
215 | return rep_->Add(user_key, value, ValueType::kTypeValue); |
216 | } |
217 | |
218 | Status SstFileWriter::Merge(const Slice& user_key, const Slice& value) { |
219 | return rep_->Add(user_key, value, ValueType::kTypeMerge); |
220 | } |
221 | |
222 | Status SstFileWriter::Delete(const Slice& user_key) { |
223 | return rep_->Add(user_key, Slice(), ValueType::kTypeDeletion); |
224 | } |
225 | |
226 | Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) { |
227 | Rep* r = rep_.get(); |
228 | if (!r->builder) { |
229 | return Status::InvalidArgument("File is not opened" ); |
230 | } |
231 | if (r->file_info.num_entries == 0) { |
232 | return Status::InvalidArgument("Cannot create sst file with no entries" ); |
233 | } |
234 | |
235 | Status s = r->builder->Finish(); |
236 | r->file_info.file_size = r->builder->FileSize(); |
237 | |
238 | if (s.ok()) { |
239 | s = r->file_writer->Sync(r->ioptions.use_fsync); |
240 | r->InvalidatePageCache(true /* closing */); |
241 | if (s.ok()) { |
242 | s = r->file_writer->Close(); |
243 | } |
244 | } |
245 | if (!s.ok()) { |
246 | r->ioptions.env->DeleteFile(r->file_info.file_path); |
247 | } |
248 | |
249 | if (file_info != nullptr) { |
250 | *file_info = r->file_info; |
251 | } |
252 | |
253 | r->builder.reset(); |
254 | return s; |
255 | } |
256 | |
257 | uint64_t SstFileWriter::FileSize() { |
258 | return rep_->file_info.file_size; |
259 | } |
260 | #endif // !ROCKSDB_LITE |
261 | |
262 | } // namespace rocksdb |
263 | |