/*
 * Copyright 2013-present Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17#include <folly/io/IOBufQueue.h>
18
19#include <string.h>
20
21#include <stdexcept>
22
23using std::make_pair;
24using std::pair;
25using std::unique_ptr;
26
27namespace {
28
29using folly::IOBuf;
30
31const size_t MIN_ALLOC_SIZE = 2000;
32const size_t MAX_ALLOC_SIZE = 8000;
33const size_t MAX_PACK_COPY = 4096;
34
/**
 * Convenience function to append chain src to chain dst.
 *
 * If dst is empty, simply takes ownership of src. Otherwise links src's
 * chain after dst's tail. When `pack` is true, small leading buffers of src
 * are first copied into the tail's spare tailroom (up to MAX_PACK_COPY bytes
 * total) so those source buffers can be freed instead of chained.
 */
void appendToChain(unique_ptr<IOBuf>& dst, unique_ptr<IOBuf>&& src, bool pack) {
  if (dst == nullptr) {
    // Destination chain is empty: just take ownership of src.
    dst = std::move(src);
  } else {
    IOBuf* tail = dst->prev();
    if (pack) {
      // Copy up to MAX_PACK_COPY bytes if we can free buffers; this helps
      // reduce wastage (the tail's tailroom and the head's headroom) when
      // joining two IOBufQueues together.
      size_t copyRemaining = MAX_PACK_COPY;
      std::size_t n;
      // Only front buffers that fit entirely within both the remaining copy
      // budget and the tail's tailroom are packed; an empty (n == 0) buffer
      // stops packing and is chained as-is.
      while (src && (n = src->length()) < copyRemaining &&
             n < tail->tailroom() && n > 0) {
        memcpy(tail->writableTail(), src->data(), n);
        tail->append(n);
        copyRemaining -= n;
        // pop() releases the now-copied front buffer and returns the rest.
        src = src->pop();
      }
    }
    // Chain whatever was not packed (or everything, if pack was false).
    if (src) {
      tail->appendChain(std::move(src));
    }
  }
}
62
63} // namespace
64
65namespace folly {
66
// Constructs an empty queue. The writable-range cache initially lives in
// localCache_ and is marked as attached to this queue.
IOBufQueue::IOBufQueue(const Options& options)
    : options_(options), cachePtr_(&localCache_) {
  localCache_.attached = true;
}
71
IOBufQueue::~IOBufQueue() {
  // Flush any cached writable range back into the tail IOBuf before the
  // members are destroyed.
  clearWritableRangeCache();
}
75
// Move constructor: steals other's chain and bookkeeping. other's cache must
// be flushed first so head_/chainLength_ are authoritative when copied; the
// moved-from queue is left empty but valid.
IOBufQueue::IOBufQueue(IOBufQueue&& other) noexcept
    : options_(other.options_), cachePtr_(&localCache_) {
  // Materialize other's cached tail writes before reading its state.
  other.clearWritableRangeCache();
  head_ = std::move(other.head_);
  chainLength_ = other.chainLength_;

  tailStart_ = other.tailStart_;
  localCache_.cachedRange = other.localCache_.cachedRange;
  localCache_.attached = true;

  // Reset the moved-from queue to an empty state.
  other.chainLength_ = 0;
  other.tailStart_ = nullptr;
  other.localCache_.cachedRange = {nullptr, nullptr};
}
90
// Move assignment: flushes both queues' caches, then steals other's chain
// and bookkeeping, leaving other empty but valid. Self-assignment is a
// no-op.
IOBufQueue& IOBufQueue::operator=(IOBufQueue&& other) {
  if (&other != this) {
    // Materialize cached tail writes on both sides before copying state.
    other.clearWritableRangeCache();
    clearWritableRangeCache();

    options_ = other.options_;
    head_ = std::move(other.head_);
    chainLength_ = other.chainLength_;

    tailStart_ = other.tailStart_;
    localCache_.cachedRange = other.localCache_.cachedRange;
    localCache_.attached = true;

    // Reset the moved-from queue to an empty state.
    other.chainLength_ = 0;
    other.tailStart_ = nullptr;
    other.localCache_.cachedRange = {nullptr, nullptr};
  }
  return *this;
}
110
111std::pair<void*, std::size_t> IOBufQueue::headroom() {
112 // Note, headroom is independent from the tail, so we don't need to flush the
113 // cache.
114 if (head_) {
115 return std::make_pair(head_->writableBuffer(), head_->headroom());
116 } else {
117 return std::make_pair(nullptr, 0);
118 }
119}
120
121void IOBufQueue::markPrepended(std::size_t n) {
122 if (n == 0) {
123 return;
124 }
125 // Note, headroom is independent from the tail, so we don't need to flush the
126 // cache.
127 assert(head_);
128 head_->prepend(n);
129 chainLength_ += n;
130}
131
132void IOBufQueue::prepend(const void* buf, std::size_t n) {
133 // We're not touching the tail, so we don't need to flush the cache.
134 auto hroom = head_->headroom();
135 if (!head_ || hroom < n) {
136 throw std::overflow_error("Not enough room to prepend");
137 }
138 memcpy(head_->writableBuffer() + hroom - n, buf, n);
139 head_->prepend(n);
140 chainLength_ += n;
141}
142
143void IOBufQueue::append(unique_ptr<IOBuf>&& buf, bool pack) {
144 if (!buf) {
145 return;
146 }
147 auto guard = updateGuard();
148 if (options_.cacheChainLength) {
149 chainLength_ += buf->computeChainDataLength();
150 }
151 appendToChain(head_, std::move(buf), pack);
152}
153
154void IOBufQueue::append(IOBufQueue& other, bool pack) {
155 if (!other.head_) {
156 return;
157 }
158 // We're going to chain other, thus we need to grab both guards.
159 auto otherGuard = other.updateGuard();
160 auto guard = updateGuard();
161 if (options_.cacheChainLength) {
162 if (other.options_.cacheChainLength) {
163 chainLength_ += other.chainLength_;
164 } else {
165 chainLength_ += other.head_->computeChainDataLength();
166 }
167 }
168 appendToChain(head_, std::move(other.head_), pack);
169 other.chainLength_ = 0;
170}
171
172void IOBufQueue::append(const void* buf, size_t len) {
173 auto guard = updateGuard();
174 auto src = static_cast<const uint8_t*>(buf);
175 while (len != 0) {
176 if ((head_ == nullptr) || head_->prev()->isSharedOne() ||
177 (head_->prev()->tailroom() == 0)) {
178 appendToChain(
179 head_,
180 IOBuf::create(
181 std::max(MIN_ALLOC_SIZE, std::min(len, MAX_ALLOC_SIZE))),
182 false);
183 }
184 IOBuf* last = head_->prev();
185 std::size_t copyLen = std::min(len, (size_t)last->tailroom());
186 memcpy(last->writableTail(), src, copyLen);
187 src += copyLen;
188 last->append(copyLen);
189 chainLength_ += copyLen;
190 len -= copyLen;
191 }
192}
193
194void IOBufQueue::wrapBuffer(
195 const void* buf,
196 size_t len,
197 std::size_t blockSize) {
198 auto src = static_cast<const uint8_t*>(buf);
199 while (len != 0) {
200 size_t n = std::min(len, size_t(blockSize));
201 append(IOBuf::wrapBuffer(src, n));
202 src += n;
203 len -= n;
204 }
205}
206
// Slow path of preallocation: called when the current tail cannot satisfy a
// request for `min` contiguous writable bytes. Allocates a fresh buffer of
// at least max(min, newAllocationSize) bytes, appends it to the chain, and
// re-points the writable-range cache at the new buffer's tailroom.
//
// Returns the writable tail pointer and usable size, capped at `max`.
pair<void*, std::size_t> IOBufQueue::preallocateSlow(
    std::size_t min,
    std::size_t newAllocationSize,
    std::size_t max) {
  // Avoid grabbing update guard, since we're manually setting the cache ptrs.
  flushCache();
  // Allocate a new buffer of the requested max size.
  unique_ptr<IOBuf> newBuf(IOBuf::create(std::max(min, newAllocationSize)));

  // Point the cache at the new buffer's entire tailroom; this is done before
  // the buffer is linked into the chain.
  tailStart_ = newBuf->writableTail();
  cachePtr_->cachedRange = std::pair<uint8_t*, uint8_t*>(
      tailStart_, tailStart_ + newBuf->tailroom());
  appendToChain(head_, std::move(newBuf), false);
  return make_pair(writableTail(), std::min<std::size_t>(max, tailroom()));
}
222
// Splits off the first n bytes of the queue into a separate IOBuf chain and
// returns it. If fewer than n bytes are present, throws
// std::underflow_error when throwOnUnderflow is true; otherwise returns
// whatever is available. Always returns a non-null chain (an empty IOBuf
// when nothing was split off).
unique_ptr<IOBuf> IOBufQueue::split(size_t n, bool throwOnUnderflow) {
  auto guard = updateGuard();
  unique_ptr<IOBuf> result;
  while (n != 0) {
    if (head_ == nullptr) {
      if (throwOnUnderflow) {
        throw std::underflow_error(
            "Attempt to remove more bytes than are present in IOBufQueue");
      } else {
        break;
      }
    } else if (head_->length() <= n) {
      // The whole head buffer is consumed: detach it from the chain and
      // move it into the result.
      n -= head_->length();
      chainLength_ -= head_->length();
      unique_ptr<IOBuf> remainder = head_->pop();
      appendToChain(result, std::move(head_), false);
      head_ = std::move(remainder);
    } else {
      // Only part of the head buffer is needed: clone it (sharing the
      // underlying data), trim the clone to n bytes, and advance the head
      // past those bytes.
      unique_ptr<IOBuf> clone = head_->cloneOne();
      clone->trimEnd(clone->length() - n);
      appendToChain(result, std::move(clone), false);
      head_->trimStart(n);
      chainLength_ -= n;
      break;
    }
  }
  if (UNLIKELY(result == nullptr)) {
    return IOBuf::create(0);
  }
  return result;
}
254
255void IOBufQueue::trimStart(size_t amount) {
256 auto trimmed = trimStartAtMost(amount);
257 if (trimmed != amount) {
258 throw std::underflow_error(
259 "Attempt to trim more bytes than are present in IOBufQueue");
260 }
261}
262
263size_t IOBufQueue::trimStartAtMost(size_t amount) {
264 auto guard = updateGuard();
265 auto original = amount;
266 while (amount > 0) {
267 if (!head_) {
268 break;
269 }
270 if (head_->length() > amount) {
271 head_->trimStart(amount);
272 chainLength_ -= amount;
273 amount = 0;
274 break;
275 }
276 amount -= head_->length();
277 chainLength_ -= head_->length();
278 head_ = head_->pop();
279 }
280 return original - amount;
281}
282
283void IOBufQueue::trimEnd(size_t amount) {
284 auto trimmed = trimEndAtMost(amount);
285 if (trimmed != amount) {
286 throw std::underflow_error(
287 "Attempt to trim more bytes than are present in IOBufQueue");
288 }
289}
290
291size_t IOBufQueue::trimEndAtMost(size_t amount) {
292 auto guard = updateGuard();
293 auto original = amount;
294 while (amount > 0) {
295 if (!head_) {
296 break;
297 }
298 if (head_->prev()->length() > amount) {
299 head_->prev()->trimEnd(amount);
300 chainLength_ -= amount;
301 amount = 0;
302 break;
303 }
304 amount -= head_->prev()->length();
305 chainLength_ -= head_->prev()->length();
306
307 if (head_->isChained()) {
308 head_->prev()->unlink();
309 } else {
310 head_.reset();
311 }
312 }
313 return original - amount;
314}
315
316std::unique_ptr<folly::IOBuf> IOBufQueue::pop_front() {
317 auto guard = updateGuard();
318 if (!head_) {
319 return nullptr;
320 }
321 chainLength_ -= head_->length();
322 std::unique_ptr<folly::IOBuf> retBuf = std::move(head_);
323 head_ = retBuf->pop();
324 return retBuf;
325}
326
327void IOBufQueue::clear() {
328 if (!head_) {
329 return;
330 }
331 auto guard = updateGuard();
332 IOBuf* buf = head_.get();
333 do {
334 buf->clear();
335 buf = buf->next();
336 } while (buf != head_.get());
337 chainLength_ = 0;
338}
339
// Appends the queue's entire contents to `out`, including bytes written
// through the writable-range cache that have not been flushed into the tail
// IOBuf yet. The queue itself is not modified (the cache is read, not
// flushed, preserving const).
void IOBufQueue::appendToString(std::string& out) const {
  if (!head_) {
    return;
  }
  // Total length = chain data length plus any unflushed cached-tail bytes
  // (cachedRange.first - tailStart_).
  auto len = options_.cacheChainLength
      ? chainLength_ + (cachePtr_->cachedRange.first - tailStart_)
      : head_->computeChainDataLength() +
          (cachePtr_->cachedRange.first - tailStart_);
  out.reserve(out.size() + len);

  // Copy the data the chain already knows about.
  for (auto range : *head_) {
    out.append(reinterpret_cast<const char*>(range.data()), range.size());
  }

  // Copy bytes written into the cached writable range that the tail IOBuf
  // hasn't been told about yet.
  if (tailStart_ != cachePtr_->cachedRange.first) {
    out.append(
        reinterpret_cast<const char*>(tailStart_),
        cachePtr_->cachedRange.first - tailStart_);
  }
}
360
361void IOBufQueue::gather(std::size_t maxLength) {
362 auto guard = updateGuard();
363 if (head_ != nullptr) {
364 head_->gather(maxLength);
365 }
366}
367
368} // namespace folly
369