1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
5
6#include "rocksdb/sst_file_writer.h"
7
8#include <vector>
9#include "db/dbformat.h"
10#include "rocksdb/table.h"
11#include "table/block_based_table_builder.h"
12#include "table/sst_file_writer_collectors.h"
13#include "util/file_reader_writer.h"
14#include "util/sync_point.h"
15
16namespace rocksdb {
17
18const std::string ExternalSstFilePropertyNames::kVersion =
19 "rocksdb.external_sst_file.version";
20const std::string ExternalSstFilePropertyNames::kGlobalSeqno =
21 "rocksdb.external_sst_file.global_seqno";
22
23#ifndef ROCKSDB_LITE
24
// Number of newly written bytes (2^20 = 1MB) after which
// SstFileWriter::Rep::InvalidatePageCache issues another cache invalidation.
const size_t kFadviseTrigger = static_cast<size_t>(1) << 20;
26
27struct SstFileWriter::Rep {
28 Rep(const EnvOptions& _env_options, const Options& options,
29 Env::IOPriority _io_priority, const Comparator* _user_comparator,
30 ColumnFamilyHandle* _cfh, bool _invalidate_page_cache)
31 : env_options(_env_options),
32 ioptions(options),
33 mutable_cf_options(options),
34 io_priority(_io_priority),
35 internal_comparator(_user_comparator),
36 cfh(_cfh),
37 invalidate_page_cache(_invalidate_page_cache),
38 last_fadvise_size(0) {}
39
40 std::unique_ptr<WritableFileWriter> file_writer;
41 std::unique_ptr<TableBuilder> builder;
42 EnvOptions env_options;
43 ImmutableCFOptions ioptions;
44 MutableCFOptions mutable_cf_options;
45 Env::IOPriority io_priority;
46 InternalKeyComparator internal_comparator;
47 ExternalSstFileInfo file_info;
48 InternalKey ikey;
49 std::string column_family_name;
50 ColumnFamilyHandle* cfh;
51 // If true, We will give the OS a hint that this file pages is not needed
52 // everytime we write 1MB to the file.
53 bool invalidate_page_cache;
54 // The size of the file during the last time we called Fadvise to remove
55 // cached pages from page cache.
56 uint64_t last_fadvise_size;
57 Status Add(const Slice& user_key, const Slice& value,
58 const ValueType value_type) {
59 if (!builder) {
60 return Status::InvalidArgument("File is not opened");
61 }
62
63 if (file_info.num_entries == 0) {
64 file_info.smallest_key.assign(user_key.data(), user_key.size());
65 } else {
66 if (internal_comparator.user_comparator()->Compare(
67 user_key, file_info.largest_key) <= 0) {
68 // Make sure that keys are added in order
69 return Status::InvalidArgument("Keys must be added in order");
70 }
71 }
72
73 // TODO(tec) : For external SST files we could omit the seqno and type.
74 switch (value_type) {
75 case ValueType::kTypeValue:
76 ikey.Set(user_key, 0 /* Sequence Number */,
77 ValueType::kTypeValue /* Put */);
78 break;
79 case ValueType::kTypeMerge:
80 ikey.Set(user_key, 0 /* Sequence Number */,
81 ValueType::kTypeMerge /* Merge */);
82 break;
83 case ValueType::kTypeDeletion:
84 ikey.Set(user_key, 0 /* Sequence Number */,
85 ValueType::kTypeDeletion /* Delete */);
86 break;
87 default:
88 return Status::InvalidArgument("Value type is not supported");
89 }
90 builder->Add(ikey.Encode(), value);
91
92 // update file info
93 file_info.num_entries++;
94 file_info.largest_key.assign(user_key.data(), user_key.size());
95 file_info.file_size = builder->FileSize();
96
97 InvalidatePageCache(false /* closing */);
98
99 return Status::OK();
100 }
101
102 void InvalidatePageCache(bool closing) {
103 if (invalidate_page_cache == false) {
104 // Fadvise disabled
105 return;
106 }
107 uint64_t bytes_since_last_fadvise =
108 builder->FileSize() - last_fadvise_size;
109 if (bytes_since_last_fadvise > kFadviseTrigger || closing) {
110 TEST_SYNC_POINT_CALLBACK("SstFileWriter::Rep::InvalidatePageCache",
111 &(bytes_since_last_fadvise));
112 // Tell the OS that we dont need this file in page cache
113 file_writer->InvalidateCache(0, 0);
114 last_fadvise_size = builder->FileSize();
115 }
116 }
117
118};
119
120SstFileWriter::SstFileWriter(const EnvOptions& env_options,
121 const Options& options,
122 const Comparator* user_comparator,
123 ColumnFamilyHandle* column_family,
124 bool invalidate_page_cache,
125 Env::IOPriority io_priority)
126 : rep_(new Rep(env_options, options, io_priority, user_comparator,
127 column_family, invalidate_page_cache)) {
128 rep_->file_info.file_size = 0;
129}
130
131SstFileWriter::~SstFileWriter() {
132 if (rep_->builder) {
133 // User did not call Finish() or Finish() failed, we need to
134 // abandon the builder.
135 rep_->builder->Abandon();
136 }
137}
138
139Status SstFileWriter::Open(const std::string& file_path) {
140 Rep* r = rep_.get();
141 Status s;
142 std::unique_ptr<WritableFile> sst_file;
143 s = r->ioptions.env->NewWritableFile(file_path, &sst_file, r->env_options);
144 if (!s.ok()) {
145 return s;
146 }
147
148 sst_file->SetIOPriority(r->io_priority);
149
150 CompressionType compression_type;
151 if (r->ioptions.bottommost_compression != kDisableCompressionOption) {
152 compression_type = r->ioptions.bottommost_compression;
153 } else if (!r->ioptions.compression_per_level.empty()) {
154 // Use the compression of the last level if we have per level compression
155 compression_type = *(r->ioptions.compression_per_level.rbegin());
156 } else {
157 compression_type = r->mutable_cf_options.compression;
158 }
159
160 std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
161 int_tbl_prop_collector_factories;
162
163 // SstFileWriter properties collector to add SstFileWriter version.
164 int_tbl_prop_collector_factories.emplace_back(
165 new SstFileWriterPropertiesCollectorFactory(2 /* version */,
166 0 /* global_seqno*/));
167
168 // User collector factories
169 auto user_collector_factories =
170 r->ioptions.table_properties_collector_factories;
171 for (size_t i = 0; i < user_collector_factories.size(); i++) {
172 int_tbl_prop_collector_factories.emplace_back(
173 new UserKeyTablePropertiesCollectorFactory(
174 user_collector_factories[i]));
175 }
176 int unknown_level = -1;
177 uint32_t cf_id;
178
179 if (r->cfh != nullptr) {
180 // user explicitly specified that this file will be ingested into cfh,
181 // we can persist this information in the file.
182 cf_id = r->cfh->GetID();
183 r->column_family_name = r->cfh->GetName();
184 } else {
185 r->column_family_name = "";
186 cf_id = TablePropertiesCollectorFactory::Context::kUnknownColumnFamily;
187 }
188
189 TableBuilderOptions table_builder_options(
190 r->ioptions, r->internal_comparator, &int_tbl_prop_collector_factories,
191 compression_type, r->ioptions.compression_opts,
192 nullptr /* compression_dict */, false /* skip_filters */,
193 r->column_family_name, unknown_level);
194 r->file_writer.reset(
195 new WritableFileWriter(std::move(sst_file), r->env_options));
196
197 // TODO(tec) : If table_factory is using compressed block cache, we will
198 // be adding the external sst file blocks into it, which is wasteful.
199 r->builder.reset(r->ioptions.table_factory->NewTableBuilder(
200 table_builder_options, cf_id, r->file_writer.get()));
201
202 r->file_info.file_path = file_path;
203 r->file_info.file_size = 0;
204 r->file_info.num_entries = 0;
205 r->file_info.sequence_number = 0;
206 r->file_info.version = 2;
207 return s;
208}
209
210Status SstFileWriter::Add(const Slice& user_key, const Slice& value) {
211 return rep_->Add(user_key, value, ValueType::kTypeValue);
212}
213
214Status SstFileWriter::Put(const Slice& user_key, const Slice& value) {
215 return rep_->Add(user_key, value, ValueType::kTypeValue);
216}
217
218Status SstFileWriter::Merge(const Slice& user_key, const Slice& value) {
219 return rep_->Add(user_key, value, ValueType::kTypeMerge);
220}
221
222Status SstFileWriter::Delete(const Slice& user_key) {
223 return rep_->Add(user_key, Slice(), ValueType::kTypeDeletion);
224}
225
226Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) {
227 Rep* r = rep_.get();
228 if (!r->builder) {
229 return Status::InvalidArgument("File is not opened");
230 }
231 if (r->file_info.num_entries == 0) {
232 return Status::InvalidArgument("Cannot create sst file with no entries");
233 }
234
235 Status s = r->builder->Finish();
236 r->file_info.file_size = r->builder->FileSize();
237
238 if (s.ok()) {
239 s = r->file_writer->Sync(r->ioptions.use_fsync);
240 r->InvalidatePageCache(true /* closing */);
241 if (s.ok()) {
242 s = r->file_writer->Close();
243 }
244 }
245 if (!s.ok()) {
246 r->ioptions.env->DeleteFile(r->file_info.file_path);
247 }
248
249 if (file_info != nullptr) {
250 *file_info = r->file_info;
251 }
252
253 r->builder.reset();
254 return s;
255}
256
257uint64_t SstFileWriter::FileSize() {
258 return rep_->file_info.file_size;
259}
260#endif // !ROCKSDB_LITE
261
262} // namespace rocksdb
263