1/*
2 * Copyright 2013-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include <folly/io/RecordIO.h>
18
19#include <sys/types.h>
20
21#include <folly/Exception.h>
22#include <folly/FileUtil.h>
23#include <folly/Memory.h>
24#include <folly/Portability.h>
25#include <folly/ScopeGuard.h>
26#include <folly/String.h>
27#include <folly/portability/Unistd.h>
28
29namespace folly {
30
31using namespace recordio_helpers;
32
33RecordIOWriter::RecordIOWriter(File file, uint32_t fileId)
34 : file_(std::move(file)),
35 fileId_(fileId),
36 writeLock_(file_, std::defer_lock),
37 filePos_(0) {
38 if (!writeLock_.try_lock()) {
39 throw std::runtime_error("RecordIOWriter: file locked by another process");
40 }
41
42 struct stat st;
43 checkUnixError(fstat(file_.fd(), &st), "fstat() failed");
44
45 filePos_ = st.st_size;
46}
47
48void RecordIOWriter::write(std::unique_ptr<IOBuf> buf) {
49 size_t totalLength = prependHeader(buf, fileId_);
50 if (totalLength == 0) {
51 return; // nothing to do
52 }
53
54 DCHECK_EQ(buf->computeChainDataLength(), totalLength);
55
56 // We're going to write. Reserve space for ourselves.
57 off_t pos = filePos_.fetch_add(off_t(totalLength));
58
59#if FOLLY_HAVE_PWRITEV
60 auto iov = buf->getIov();
61 ssize_t bytes = pwritevFull(file_.fd(), iov.data(), iov.size(), pos);
62#else
63 buf->unshare();
64 buf->coalesce();
65 ssize_t bytes = pwriteFull(file_.fd(), buf->data(), buf->length(), pos);
66#endif
67
68 checkUnixError(bytes, "pwrite() failed");
69 DCHECK_EQ(size_t(bytes), totalLength);
70}
71
72RecordIOReader::RecordIOReader(File file, uint32_t fileId)
73 : map_(std::move(file)), fileId_(fileId) {}
74
75RecordIOReader::Iterator::Iterator(ByteRange range, uint32_t fileId, off_t pos)
76 : range_(range), fileId_(fileId), recordAndPos_(ByteRange(), 0) {
77 if (size_t(pos) >= range_.size()) {
78 // Note that this branch can execute if pos is negative as well.
79 recordAndPos_.second = off_t(-1);
80 range_.clear();
81 } else {
82 recordAndPos_.second = pos;
83 range_.advance(size_t(pos));
84 advanceToValid();
85 }
86}
87
88void RecordIOReader::Iterator::advanceToValid() {
89 ByteRange record = findRecord(range_, fileId_).record;
90 if (record.empty()) {
91 recordAndPos_ = std::make_pair(ByteRange(), off_t(-1));
92 range_.clear(); // at end
93 } else {
94 size_t skipped = size_t(record.begin() - range_.begin());
95 DCHECK_GE(skipped, headerSize());
96 skipped -= headerSize();
97 range_.advance(skipped);
98 recordAndPos_.first = record;
99 recordAndPos_.second += off_t(skipped);
100 }
101}
102
103namespace recordio_helpers {
104
105using recordio_detail::Header;
106
107namespace {
108
// Seed passed to every SpookyHashV2 call in this file (header hash and data
// hash). Originally annotated "for mcurtiss".
constexpr uint32_t kHashSeed{0xdeadbeefu};
110
111uint32_t headerHash(const Header& header) {
112 return hash::SpookyHashV2::Hash32(
113 &header, offsetof(Header, headerHash), kHashSeed);
114}
115
116std::pair<size_t, std::size_t> dataLengthAndHash(const IOBuf* buf) {
117 size_t len = 0;
118 hash::SpookyHashV2 hasher;
119 hasher.Init(kHashSeed, kHashSeed);
120 for (auto br : *buf) {
121 len += br.size();
122 hasher.Update(br.data(), br.size());
123 }
124 uint64_t hash1;
125 uint64_t hash2;
126 hasher.Final(&hash1, &hash2);
127 if (len + headerSize() >= std::numeric_limits<uint32_t>::max()) {
128 throw std::invalid_argument("Record length must fit in 32 bits");
129 }
130 return std::make_pair(len, static_cast<std::size_t>(hash1));
131}
132
133std::size_t dataHash(ByteRange range) {
134 return hash::SpookyHashV2::Hash64(range.data(), range.size(), kHashSeed);
135}
136
137} // namespace
138
139size_t prependHeader(std::unique_ptr<IOBuf>& buf, uint32_t fileId) {
140 if (fileId == 0) {
141 throw std::invalid_argument("invalid file id");
142 }
143 auto lengthAndHash = dataLengthAndHash(buf.get());
144 if (lengthAndHash.first == 0) {
145 return 0; // empty, nothing to do, no zero-length records
146 }
147
148 // Prepend to the first buffer in the chain if we have room, otherwise
149 // prepend a new buffer.
150 if (buf->headroom() >= headerSize()) {
151 buf->unshareOne();
152 buf->prepend(headerSize());
153 } else {
154 auto b = IOBuf::create(headerSize());
155 b->append(headerSize());
156 b->appendChain(std::move(buf));
157 buf = std::move(b);
158 }
159 Header* header = reinterpret_cast<Header*>(buf->writableData());
160 memset(header, 0, sizeof(Header));
161 header->magic = Header::kMagic;
162 header->fileId = fileId;
163 header->dataLength = uint32_t(lengthAndHash.first);
164 header->dataHash = lengthAndHash.second;
165 header->headerHash = headerHash(*header);
166
167 return lengthAndHash.first + headerSize();
168}
169
170RecordInfo validateRecord(ByteRange range, uint32_t fileId) {
171 if (range.size() <= headerSize()) { // records may not be empty
172 return {0, {}};
173 }
174 const Header* header = reinterpret_cast<const Header*>(range.begin());
175 range.advance(sizeof(Header));
176 if (header->magic != Header::kMagic || header->version != 0 ||
177 header->hashFunction != 0 || header->flags != 0 ||
178 (fileId != 0 && header->fileId != fileId) ||
179 header->dataLength > range.size()) {
180 return {0, {}};
181 }
182 if (headerHash(*header) != header->headerHash) {
183 return {0, {}};
184 }
185 range.reset(range.begin(), header->dataLength);
186 if (dataHash(range) != header->dataHash) {
187 return {0, {}};
188 }
189 return {header->fileId, range};
190}
191
192RecordInfo
193findRecord(ByteRange searchRange, ByteRange wholeRange, uint32_t fileId) {
194 static const uint32_t magic = Header::kMagic;
195 static const ByteRange magicRange(
196 reinterpret_cast<const uint8_t*>(&magic), sizeof(magic));
197
198 DCHECK_GE(searchRange.begin(), wholeRange.begin());
199 DCHECK_LE(searchRange.end(), wholeRange.end());
200
201 const uint8_t* start = searchRange.begin();
202 const uint8_t* end =
203 std::min(searchRange.end(), wholeRange.end() - sizeof(Header));
204 // end-1: the last place where a Header could start
205 while (start < end) {
206 auto p = ByteRange(start, end + sizeof(magic)).find(magicRange);
207 if (p == ByteRange::npos) {
208 break;
209 }
210
211 start += p;
212 auto r = validateRecord(ByteRange(start, wholeRange.end()), fileId);
213 if (!r.record.empty()) {
214 return r;
215 }
216
217 // No repeated prefix in magic, so we can do better than start++
218 start += sizeof(magic);
219 }
220
221 return {0, {}};
222}
223
224} // namespace recordio_helpers
225
226} // namespace folly
227