/*
 * Copyright 2013-present Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 | |
17 | #include <folly/io/RecordIO.h> |
18 | |
19 | #include <sys/types.h> |
20 | |
21 | #include <folly/Exception.h> |
22 | #include <folly/FileUtil.h> |
23 | #include <folly/Memory.h> |
24 | #include <folly/Portability.h> |
25 | #include <folly/ScopeGuard.h> |
26 | #include <folly/String.h> |
27 | #include <folly/portability/Unistd.h> |
28 | |
29 | namespace folly { |
30 | |
31 | using namespace recordio_helpers; |
32 | |
33 | RecordIOWriter::RecordIOWriter(File file, uint32_t fileId) |
34 | : file_(std::move(file)), |
35 | fileId_(fileId), |
36 | writeLock_(file_, std::defer_lock), |
37 | filePos_(0) { |
38 | if (!writeLock_.try_lock()) { |
39 | throw std::runtime_error("RecordIOWriter: file locked by another process" ); |
40 | } |
41 | |
42 | struct stat st; |
43 | checkUnixError(fstat(file_.fd(), &st), "fstat() failed" ); |
44 | |
45 | filePos_ = st.st_size; |
46 | } |
47 | |
48 | void RecordIOWriter::write(std::unique_ptr<IOBuf> buf) { |
49 | size_t totalLength = prependHeader(buf, fileId_); |
50 | if (totalLength == 0) { |
51 | return; // nothing to do |
52 | } |
53 | |
54 | DCHECK_EQ(buf->computeChainDataLength(), totalLength); |
55 | |
56 | // We're going to write. Reserve space for ourselves. |
57 | off_t pos = filePos_.fetch_add(off_t(totalLength)); |
58 | |
59 | #if FOLLY_HAVE_PWRITEV |
60 | auto iov = buf->getIov(); |
61 | ssize_t bytes = pwritevFull(file_.fd(), iov.data(), iov.size(), pos); |
62 | #else |
63 | buf->unshare(); |
64 | buf->coalesce(); |
65 | ssize_t bytes = pwriteFull(file_.fd(), buf->data(), buf->length(), pos); |
66 | #endif |
67 | |
68 | checkUnixError(bytes, "pwrite() failed" ); |
69 | DCHECK_EQ(size_t(bytes), totalLength); |
70 | } |
71 | |
72 | RecordIOReader::RecordIOReader(File file, uint32_t fileId) |
73 | : map_(std::move(file)), fileId_(fileId) {} |
74 | |
75 | RecordIOReader::Iterator::Iterator(ByteRange range, uint32_t fileId, off_t pos) |
76 | : range_(range), fileId_(fileId), recordAndPos_(ByteRange(), 0) { |
77 | if (size_t(pos) >= range_.size()) { |
78 | // Note that this branch can execute if pos is negative as well. |
79 | recordAndPos_.second = off_t(-1); |
80 | range_.clear(); |
81 | } else { |
82 | recordAndPos_.second = pos; |
83 | range_.advance(size_t(pos)); |
84 | advanceToValid(); |
85 | } |
86 | } |
87 | |
88 | void RecordIOReader::Iterator::advanceToValid() { |
89 | ByteRange record = findRecord(range_, fileId_).record; |
90 | if (record.empty()) { |
91 | recordAndPos_ = std::make_pair(ByteRange(), off_t(-1)); |
92 | range_.clear(); // at end |
93 | } else { |
94 | size_t skipped = size_t(record.begin() - range_.begin()); |
95 | DCHECK_GE(skipped, headerSize()); |
96 | skipped -= headerSize(); |
97 | range_.advance(skipped); |
98 | recordAndPos_.first = record; |
99 | recordAndPos_.second += off_t(skipped); |
100 | } |
101 | } |
102 | |
103 | namespace recordio_helpers { |
104 | |
105 | using recordio_detail::Header; |
106 | |
107 | namespace { |
108 | |
// Seed passed to every SpookyHashV2 call in this file (for mcurtiss).
constexpr uint32_t kHashSeed{0xdeadbeefU};
110 | |
111 | uint32_t (const Header& ) { |
112 | return hash::SpookyHashV2::Hash32( |
113 | &header, offsetof(Header, headerHash), kHashSeed); |
114 | } |
115 | |
116 | std::pair<size_t, std::size_t> dataLengthAndHash(const IOBuf* buf) { |
117 | size_t len = 0; |
118 | hash::SpookyHashV2 hasher; |
119 | hasher.Init(kHashSeed, kHashSeed); |
120 | for (auto br : *buf) { |
121 | len += br.size(); |
122 | hasher.Update(br.data(), br.size()); |
123 | } |
124 | uint64_t hash1; |
125 | uint64_t hash2; |
126 | hasher.Final(&hash1, &hash2); |
127 | if (len + headerSize() >= std::numeric_limits<uint32_t>::max()) { |
128 | throw std::invalid_argument("Record length must fit in 32 bits" ); |
129 | } |
130 | return std::make_pair(len, static_cast<std::size_t>(hash1)); |
131 | } |
132 | |
133 | std::size_t dataHash(ByteRange range) { |
134 | return hash::SpookyHashV2::Hash64(range.data(), range.size(), kHashSeed); |
135 | } |
136 | |
137 | } // namespace |
138 | |
139 | size_t (std::unique_ptr<IOBuf>& buf, uint32_t fileId) { |
140 | if (fileId == 0) { |
141 | throw std::invalid_argument("invalid file id" ); |
142 | } |
143 | auto lengthAndHash = dataLengthAndHash(buf.get()); |
144 | if (lengthAndHash.first == 0) { |
145 | return 0; // empty, nothing to do, no zero-length records |
146 | } |
147 | |
148 | // Prepend to the first buffer in the chain if we have room, otherwise |
149 | // prepend a new buffer. |
150 | if (buf->headroom() >= headerSize()) { |
151 | buf->unshareOne(); |
152 | buf->prepend(headerSize()); |
153 | } else { |
154 | auto b = IOBuf::create(headerSize()); |
155 | b->append(headerSize()); |
156 | b->appendChain(std::move(buf)); |
157 | buf = std::move(b); |
158 | } |
159 | Header* = reinterpret_cast<Header*>(buf->writableData()); |
160 | memset(header, 0, sizeof(Header)); |
161 | header->magic = Header::kMagic; |
162 | header->fileId = fileId; |
163 | header->dataLength = uint32_t(lengthAndHash.first); |
164 | header->dataHash = lengthAndHash.second; |
165 | header->headerHash = headerHash(*header); |
166 | |
167 | return lengthAndHash.first + headerSize(); |
168 | } |
169 | |
170 | RecordInfo validateRecord(ByteRange range, uint32_t fileId) { |
171 | if (range.size() <= headerSize()) { // records may not be empty |
172 | return {0, {}}; |
173 | } |
174 | const Header* = reinterpret_cast<const Header*>(range.begin()); |
175 | range.advance(sizeof(Header)); |
176 | if (header->magic != Header::kMagic || header->version != 0 || |
177 | header->hashFunction != 0 || header->flags != 0 || |
178 | (fileId != 0 && header->fileId != fileId) || |
179 | header->dataLength > range.size()) { |
180 | return {0, {}}; |
181 | } |
182 | if (headerHash(*header) != header->headerHash) { |
183 | return {0, {}}; |
184 | } |
185 | range.reset(range.begin(), header->dataLength); |
186 | if (dataHash(range) != header->dataHash) { |
187 | return {0, {}}; |
188 | } |
189 | return {header->fileId, range}; |
190 | } |
191 | |
192 | RecordInfo |
193 | findRecord(ByteRange searchRange, ByteRange wholeRange, uint32_t fileId) { |
194 | static const uint32_t magic = Header::kMagic; |
195 | static const ByteRange magicRange( |
196 | reinterpret_cast<const uint8_t*>(&magic), sizeof(magic)); |
197 | |
198 | DCHECK_GE(searchRange.begin(), wholeRange.begin()); |
199 | DCHECK_LE(searchRange.end(), wholeRange.end()); |
200 | |
201 | const uint8_t* start = searchRange.begin(); |
202 | const uint8_t* end = |
203 | std::min(searchRange.end(), wholeRange.end() - sizeof(Header)); |
204 | // end-1: the last place where a Header could start |
205 | while (start < end) { |
206 | auto p = ByteRange(start, end + sizeof(magic)).find(magicRange); |
207 | if (p == ByteRange::npos) { |
208 | break; |
209 | } |
210 | |
211 | start += p; |
212 | auto r = validateRecord(ByteRange(start, wholeRange.end()), fileId); |
213 | if (!r.record.empty()) { |
214 | return r; |
215 | } |
216 | |
217 | // No repeated prefix in magic, so we can do better than start++ |
218 | start += sizeof(magic); |
219 | } |
220 | |
221 | return {0, {}}; |
222 | } |
223 | |
224 | } // namespace recordio_helpers |
225 | |
226 | } // namespace folly |
227 | |