| 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
| 2 | // This source code is licensed under both the GPLv2 (found in the |
| 3 | // COPYING file in the root directory) and Apache 2.0 License |
| 4 | // (found in the LICENSE.Apache file in the root directory). |
| 5 | |
| 6 | #include "rocksdb/sst_file_writer.h" |
| 7 | |
| 8 | #include <vector> |
| 9 | #include "db/dbformat.h" |
| 10 | #include "rocksdb/table.h" |
| 11 | #include "table/block_based_table_builder.h" |
| 12 | #include "table/sst_file_writer_collectors.h" |
| 13 | #include "util/file_reader_writer.h" |
| 14 | #include "util/sync_point.h" |
| 15 | |
| 16 | namespace rocksdb { |
| 17 | |
| 18 | const std::string ExternalSstFilePropertyNames::kVersion = |
| 19 | "rocksdb.external_sst_file.version" ; |
| 20 | const std::string ExternalSstFilePropertyNames::kGlobalSeqno = |
| 21 | "rocksdb.external_sst_file.global_seqno" ; |
| 22 | |
| 23 | #ifndef ROCKSDB_LITE |
| 24 | |
// Number of newly written bytes (1MB) that must be exceeded before the page
// cache hint is issued again.
constexpr size_t kFadviseTrigger = 1024 * 1024;
| 26 | |
| 27 | struct SstFileWriter::Rep { |
| 28 | Rep(const EnvOptions& _env_options, const Options& options, |
| 29 | Env::IOPriority _io_priority, const Comparator* _user_comparator, |
| 30 | ColumnFamilyHandle* _cfh, bool _invalidate_page_cache) |
| 31 | : env_options(_env_options), |
| 32 | ioptions(options), |
| 33 | mutable_cf_options(options), |
| 34 | io_priority(_io_priority), |
| 35 | internal_comparator(_user_comparator), |
| 36 | cfh(_cfh), |
| 37 | invalidate_page_cache(_invalidate_page_cache), |
| 38 | last_fadvise_size(0) {} |
| 39 | |
| 40 | std::unique_ptr<WritableFileWriter> file_writer; |
| 41 | std::unique_ptr<TableBuilder> builder; |
| 42 | EnvOptions env_options; |
| 43 | ImmutableCFOptions ioptions; |
| 44 | MutableCFOptions mutable_cf_options; |
| 45 | Env::IOPriority io_priority; |
| 46 | InternalKeyComparator internal_comparator; |
| 47 | ExternalSstFileInfo file_info; |
| 48 | InternalKey ikey; |
| 49 | std::string column_family_name; |
| 50 | ColumnFamilyHandle* cfh; |
| 51 | // If true, We will give the OS a hint that this file pages is not needed |
| 52 | // everytime we write 1MB to the file. |
| 53 | bool invalidate_page_cache; |
| 54 | // The size of the file during the last time we called Fadvise to remove |
| 55 | // cached pages from page cache. |
| 56 | uint64_t last_fadvise_size; |
| 57 | Status Add(const Slice& user_key, const Slice& value, |
| 58 | const ValueType value_type) { |
| 59 | if (!builder) { |
| 60 | return Status::InvalidArgument("File is not opened" ); |
| 61 | } |
| 62 | |
| 63 | if (file_info.num_entries == 0) { |
| 64 | file_info.smallest_key.assign(user_key.data(), user_key.size()); |
| 65 | } else { |
| 66 | if (internal_comparator.user_comparator()->Compare( |
| 67 | user_key, file_info.largest_key) <= 0) { |
| 68 | // Make sure that keys are added in order |
| 69 | return Status::InvalidArgument("Keys must be added in order" ); |
| 70 | } |
| 71 | } |
| 72 | |
| 73 | // TODO(tec) : For external SST files we could omit the seqno and type. |
| 74 | switch (value_type) { |
| 75 | case ValueType::kTypeValue: |
| 76 | ikey.Set(user_key, 0 /* Sequence Number */, |
| 77 | ValueType::kTypeValue /* Put */); |
| 78 | break; |
| 79 | case ValueType::kTypeMerge: |
| 80 | ikey.Set(user_key, 0 /* Sequence Number */, |
| 81 | ValueType::kTypeMerge /* Merge */); |
| 82 | break; |
| 83 | case ValueType::kTypeDeletion: |
| 84 | ikey.Set(user_key, 0 /* Sequence Number */, |
| 85 | ValueType::kTypeDeletion /* Delete */); |
| 86 | break; |
| 87 | default: |
| 88 | return Status::InvalidArgument("Value type is not supported" ); |
| 89 | } |
| 90 | builder->Add(ikey.Encode(), value); |
| 91 | |
| 92 | // update file info |
| 93 | file_info.num_entries++; |
| 94 | file_info.largest_key.assign(user_key.data(), user_key.size()); |
| 95 | file_info.file_size = builder->FileSize(); |
| 96 | |
| 97 | InvalidatePageCache(false /* closing */); |
| 98 | |
| 99 | return Status::OK(); |
| 100 | } |
| 101 | |
| 102 | void InvalidatePageCache(bool closing) { |
| 103 | if (invalidate_page_cache == false) { |
| 104 | // Fadvise disabled |
| 105 | return; |
| 106 | } |
| 107 | uint64_t bytes_since_last_fadvise = |
| 108 | builder->FileSize() - last_fadvise_size; |
| 109 | if (bytes_since_last_fadvise > kFadviseTrigger || closing) { |
| 110 | TEST_SYNC_POINT_CALLBACK("SstFileWriter::Rep::InvalidatePageCache" , |
| 111 | &(bytes_since_last_fadvise)); |
| 112 | // Tell the OS that we dont need this file in page cache |
| 113 | file_writer->InvalidateCache(0, 0); |
| 114 | last_fadvise_size = builder->FileSize(); |
| 115 | } |
| 116 | } |
| 117 | |
| 118 | }; |
| 119 | |
| 120 | SstFileWriter::SstFileWriter(const EnvOptions& env_options, |
| 121 | const Options& options, |
| 122 | const Comparator* user_comparator, |
| 123 | ColumnFamilyHandle* column_family, |
| 124 | bool invalidate_page_cache, |
| 125 | Env::IOPriority io_priority) |
| 126 | : rep_(new Rep(env_options, options, io_priority, user_comparator, |
| 127 | column_family, invalidate_page_cache)) { |
| 128 | rep_->file_info.file_size = 0; |
| 129 | } |
| 130 | |
| 131 | SstFileWriter::~SstFileWriter() { |
| 132 | if (rep_->builder) { |
| 133 | // User did not call Finish() or Finish() failed, we need to |
| 134 | // abandon the builder. |
| 135 | rep_->builder->Abandon(); |
| 136 | } |
| 137 | } |
| 138 | |
| 139 | Status SstFileWriter::Open(const std::string& file_path) { |
| 140 | Rep* r = rep_.get(); |
| 141 | Status s; |
| 142 | std::unique_ptr<WritableFile> sst_file; |
| 143 | s = r->ioptions.env->NewWritableFile(file_path, &sst_file, r->env_options); |
| 144 | if (!s.ok()) { |
| 145 | return s; |
| 146 | } |
| 147 | |
| 148 | sst_file->SetIOPriority(r->io_priority); |
| 149 | |
| 150 | CompressionType compression_type; |
| 151 | if (r->ioptions.bottommost_compression != kDisableCompressionOption) { |
| 152 | compression_type = r->ioptions.bottommost_compression; |
| 153 | } else if (!r->ioptions.compression_per_level.empty()) { |
| 154 | // Use the compression of the last level if we have per level compression |
| 155 | compression_type = *(r->ioptions.compression_per_level.rbegin()); |
| 156 | } else { |
| 157 | compression_type = r->mutable_cf_options.compression; |
| 158 | } |
| 159 | |
| 160 | std::vector<std::unique_ptr<IntTblPropCollectorFactory>> |
| 161 | int_tbl_prop_collector_factories; |
| 162 | |
| 163 | // SstFileWriter properties collector to add SstFileWriter version. |
| 164 | int_tbl_prop_collector_factories.emplace_back( |
| 165 | new SstFileWriterPropertiesCollectorFactory(2 /* version */, |
| 166 | 0 /* global_seqno*/)); |
| 167 | |
| 168 | // User collector factories |
| 169 | auto user_collector_factories = |
| 170 | r->ioptions.table_properties_collector_factories; |
| 171 | for (size_t i = 0; i < user_collector_factories.size(); i++) { |
| 172 | int_tbl_prop_collector_factories.emplace_back( |
| 173 | new UserKeyTablePropertiesCollectorFactory( |
| 174 | user_collector_factories[i])); |
| 175 | } |
| 176 | int unknown_level = -1; |
| 177 | uint32_t cf_id; |
| 178 | |
| 179 | if (r->cfh != nullptr) { |
| 180 | // user explicitly specified that this file will be ingested into cfh, |
| 181 | // we can persist this information in the file. |
| 182 | cf_id = r->cfh->GetID(); |
| 183 | r->column_family_name = r->cfh->GetName(); |
| 184 | } else { |
| 185 | r->column_family_name = "" ; |
| 186 | cf_id = TablePropertiesCollectorFactory::Context::kUnknownColumnFamily; |
| 187 | } |
| 188 | |
| 189 | TableBuilderOptions table_builder_options( |
| 190 | r->ioptions, r->internal_comparator, &int_tbl_prop_collector_factories, |
| 191 | compression_type, r->ioptions.compression_opts, |
| 192 | nullptr /* compression_dict */, false /* skip_filters */, |
| 193 | r->column_family_name, unknown_level); |
| 194 | r->file_writer.reset( |
| 195 | new WritableFileWriter(std::move(sst_file), r->env_options)); |
| 196 | |
| 197 | // TODO(tec) : If table_factory is using compressed block cache, we will |
| 198 | // be adding the external sst file blocks into it, which is wasteful. |
| 199 | r->builder.reset(r->ioptions.table_factory->NewTableBuilder( |
| 200 | table_builder_options, cf_id, r->file_writer.get())); |
| 201 | |
| 202 | r->file_info.file_path = file_path; |
| 203 | r->file_info.file_size = 0; |
| 204 | r->file_info.num_entries = 0; |
| 205 | r->file_info.sequence_number = 0; |
| 206 | r->file_info.version = 2; |
| 207 | return s; |
| 208 | } |
| 209 | |
| 210 | Status SstFileWriter::Add(const Slice& user_key, const Slice& value) { |
| 211 | return rep_->Add(user_key, value, ValueType::kTypeValue); |
| 212 | } |
| 213 | |
| 214 | Status SstFileWriter::Put(const Slice& user_key, const Slice& value) { |
| 215 | return rep_->Add(user_key, value, ValueType::kTypeValue); |
| 216 | } |
| 217 | |
| 218 | Status SstFileWriter::Merge(const Slice& user_key, const Slice& value) { |
| 219 | return rep_->Add(user_key, value, ValueType::kTypeMerge); |
| 220 | } |
| 221 | |
| 222 | Status SstFileWriter::Delete(const Slice& user_key) { |
| 223 | return rep_->Add(user_key, Slice(), ValueType::kTypeDeletion); |
| 224 | } |
| 225 | |
| 226 | Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) { |
| 227 | Rep* r = rep_.get(); |
| 228 | if (!r->builder) { |
| 229 | return Status::InvalidArgument("File is not opened" ); |
| 230 | } |
| 231 | if (r->file_info.num_entries == 0) { |
| 232 | return Status::InvalidArgument("Cannot create sst file with no entries" ); |
| 233 | } |
| 234 | |
| 235 | Status s = r->builder->Finish(); |
| 236 | r->file_info.file_size = r->builder->FileSize(); |
| 237 | |
| 238 | if (s.ok()) { |
| 239 | s = r->file_writer->Sync(r->ioptions.use_fsync); |
| 240 | r->InvalidatePageCache(true /* closing */); |
| 241 | if (s.ok()) { |
| 242 | s = r->file_writer->Close(); |
| 243 | } |
| 244 | } |
| 245 | if (!s.ok()) { |
| 246 | r->ioptions.env->DeleteFile(r->file_info.file_path); |
| 247 | } |
| 248 | |
| 249 | if (file_info != nullptr) { |
| 250 | *file_info = r->file_info; |
| 251 | } |
| 252 | |
| 253 | r->builder.reset(); |
| 254 | return s; |
| 255 | } |
| 256 | |
| 257 | uint64_t SstFileWriter::FileSize() { |
| 258 | return rep_->file_info.file_size; |
| 259 | } |
| 260 | #endif // !ROCKSDB_LITE |
| 261 | |
| 262 | } // namespace rocksdb |
| 263 | |