/*
 * Copyright 2013-present Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <folly/ScopeGuard.h>
#include <folly/io/IOBuf.h>

#include <cstdint>
#include <limits>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>

namespace folly {

/**
 * An IOBufQueue encapsulates a chain of IOBufs and provides
 * convenience functions to append data to the back of the chain
 * and remove data from the front.
 *
 * You may also prepend data into the headroom of the first buffer in the
 * chain, if any.
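 *
 * A minimal usage sketch (illustrative; error handling omitted):
 *
 *   folly::IOBufQueue queue(folly::IOBufQueue::cacheChainLength());
 *   queue.append("hello ", 6);
 *   queue.append("world", 5);
 *   std::unique_ptr<folly::IOBuf> chain = queue.move(); // take the whole chain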
 */
class IOBufQueue {
 private:
  /**
   * This guard should be taken by any method that intends to make any
   * changes to data_ (e.g. appending to it).
   *
   * It flushes the writable tail cache and refills it on destruction.
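   *
   * Illustrative pattern, mirroring how the mutators below use it:
   *
   *   auto guard = updateGuard(); // flushes the writable tail cache
   *   // ... mutate head_ / chainLength_ directly ...
   *   // the cache is refilled when guard goes out of scope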
   */
  auto updateGuard() {
    flushCache();
    return folly::makeGuard([this] { updateWritableTailCache(); });
  }

  struct WritableRangeCacheData {
    std::pair<uint8_t*, uint8_t*> cachedRange;
    bool attached{false};

    WritableRangeCacheData() = default;

    WritableRangeCacheData(WritableRangeCacheData&& other)
        : cachedRange(other.cachedRange), attached(other.attached) {
      other.cachedRange = {};
      other.attached = false;
    }
    WritableRangeCacheData& operator=(WritableRangeCacheData&& other) {
      cachedRange = other.cachedRange;
      attached = other.attached;

      other.cachedRange = {};
      other.attached = false;

      return *this;
    }

    WritableRangeCacheData(const WritableRangeCacheData&) = delete;
    WritableRangeCacheData& operator=(const WritableRangeCacheData&) = delete;
  };

 public:
  struct Options {
    Options() : cacheChainLength(false) {}
    bool cacheChainLength;
  };

  /**
   * Commonly used Options, currently the only possible value other than
   * the default.
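   *
   * Example (illustrative):
   *
   *   IOBufQueue queue(IOBufQueue::cacheChainLength());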
   */
  static Options cacheChainLength() {
    Options options;
    options.cacheChainLength = true;
    return options;
  }

  /**
   * WritableRangeCache represents a cache of the current writable tail and
   * provides a cheap and simple interface to append to it, avoiding the cost
   * of the preallocate/postallocate pair (i.e. indirections and checks).
   *
   * The cache is flushed on destruction/copy/move and on non-const accesses
   * to the underlying IOBufQueue.
   *
   * Note: there can be only one active cache for a given IOBufQueue, i.e.
   * when you fill a cache object it automatically invalidates any other
   * cache.
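   *
   * Usage sketch (illustrative; assumes the queue outlives the cache):
   *
   *   folly::IOBufQueue queue;
   *   IOBufQueue::WritableRangeCache cache(&queue);
   *   if (cache.length() >= 4) {
   *     memcpy(cache.writableData(), "abcd", 4);
   *     cache.append(4);
   *   } else {
   *     queue.append("abcd", 4); // fall back to the regular append path
   *   }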
   */
  class WritableRangeCache {
   public:
    explicit WritableRangeCache(folly::IOBufQueue* q = nullptr) : queue_(q) {
      if (queue_) {
        fillCache();
      }
    }

    /**
     * Move constructor/assignment can move the cached range, but must update
     * the reference in IOBufQueue.
     */
    WritableRangeCache(WritableRangeCache&& other)
        : data_(std::move(other.data_)), queue_(other.queue_) {
      if (data_.attached) {
        queue_->updateCacheRef(data_);
      }
    }
    WritableRangeCache& operator=(WritableRangeCache&& other) {
      if (data_.attached) {
        queue_->clearWritableRangeCache();
      }

      data_ = std::move(other.data_);
      queue_ = other.queue_;

      if (data_.attached) {
        queue_->updateCacheRef(data_);
      }

      return *this;
    }

    /**
     * Copy constructor/assignment cannot copy the cached range.
     */
    WritableRangeCache(const WritableRangeCache& other)
        : queue_(other.queue_) {}
    WritableRangeCache& operator=(const WritableRangeCache& other) {
      if (data_.attached) {
        queue_->clearWritableRangeCache();
      }

      queue_ = other.queue_;

      return *this;
    }

    ~WritableRangeCache() {
      if (data_.attached) {
        queue_->clearWritableRangeCache();
      }
    }

    /**
     * Reset the underlying IOBufQueue; flushes the current cache if present.
     */
    void reset(IOBufQueue* q) {
      if (data_.attached) {
        queue_->clearWritableRangeCache();
      }

      queue_ = q;

      if (queue_) {
        fillCache();
      }
    }

    /**
     * Get a pointer to the underlying IOBufQueue object.
     */
    IOBufQueue* queue() {
      return queue_;
    }

    /**
     * Return a pointer to the start of the cached writable tail.
     *
     * Note: doesn't populate the cache.
     */
    uint8_t* writableData() {
      dcheckIntegrity();
      return data_.cachedRange.first;
    }

    /**
     * Return the length of the cached writable tail.
     *
     * Note: doesn't populate the cache.
     */
    size_t length() {
      dcheckIntegrity();
      return data_.cachedRange.second - data_.cachedRange.first;
    }

    /**
     * Mark n bytes as occupied (e.g. postallocate).
     */
    void append(size_t n) {
      dcheckIntegrity();
      // The slow path can be hit only if somebody is misusing the interface,
      // e.g. calling append() after touching the IOBufQueue or without
      // checking length() first.
      if (LIKELY(data_.cachedRange.first != nullptr)) {
        DCHECK_LE(n, length());
        data_.cachedRange.first += n;
      } else {
        appendSlow(n);
      }
    }

    /**
     * Same as append(n), but avoids checking if there is a cache.
     * The caller must guarantee that the cache is set (e.g. the caller just
     * called fillCache or checked that it's not empty).
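     *
     * Sketch of the intended pattern (illustrative; src and n are
     * placeholders):
     *
     *   cache.fillCache();
     *   if (cache.length() >= n) {
     *     memcpy(cache.writableData(), src, n);
     *     cache.appendUnsafe(n);
     *   }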
     */
    void appendUnsafe(size_t n) {
      data_.cachedRange.first += n;
    }

    /**
     * Fill the cache of the writable tail from the underlying IOBufQueue.
     */
    void fillCache() {
      queue_->fillWritableRangeCache(data_);
    }

   private:
    WritableRangeCacheData data_;
    IOBufQueue* queue_;

    FOLLY_NOINLINE void appendSlow(size_t n) {
      queue_->postallocate(n);
    }

    void dcheckIntegrity() {
      // Tail start should never be greater than tail end.
      DCHECK_LE(
          (void*)data_.cachedRange.first, (void*)data_.cachedRange.second);
      DCHECK(
          data_.cachedRange.first != nullptr ||
          data_.cachedRange.second == nullptr);

      // The cached range should always be empty if the cache is not attached.
      DCHECK(
          data_.attached ||
          (data_.cachedRange.first == nullptr &&
           data_.cachedRange.second == nullptr));

      // We cannot be in the attached state if queue_ is not set.
      DCHECK(queue_ != nullptr || !data_.attached);

      // If we're attached and the cache is not empty, then it should coincide
      // with the tail buffer.
      DCHECK(
          !data_.attached || data_.cachedRange.first == nullptr ||
          (queue_->head_ != nullptr &&
           data_.cachedRange.first >= queue_->head_->prev()->writableTail() &&
           data_.cachedRange.second ==
               queue_->head_->prev()->writableTail() +
                   queue_->head_->prev()->tailroom()));
    }
  };

  explicit IOBufQueue(const Options& options = Options());
  ~IOBufQueue();

  /**
   * Return a space to prepend bytes and the amount of headroom available.
   */
  std::pair<void*, std::size_t> headroom();

  /**
   * Indicate that n bytes from the headroom have been used.
   */
  void markPrepended(std::size_t n);

  /**
   * Prepend an existing range; throws std::overflow_error if there is not
   * enough room.
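   *
   * For example (illustrative), prepending a length prefix into previously
   * reserved headroom:
   *
   *   uint32_t len = 42;
   *   queue.prepend(&len, sizeof(len)); // throws if headroom is insufficient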
   */
  void prepend(const void* buf, std::size_t n);

  /**
   * Add a buffer or buffer chain to the end of this queue. The
   * queue takes ownership of buf.
   *
   * If pack is true, we try to reduce wastage at the end of this queue
   * by copying some data from the first buffers in the buf chain (and
   * releasing the buffers), if possible. If pack is false, we leave
   * the chain topology unchanged.
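   *
   * For instance (illustrative):
   *
   *   queue.append(folly::IOBuf::copyBuffer("payload"), true); // pack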
   */
  void append(std::unique_ptr<folly::IOBuf>&& buf, bool pack = false);

  /**
   * Add a queue to the end of this queue. The queue takes ownership of
   * all buffers from the other queue.
   */
  void append(IOBufQueue& other, bool pack = false);
  void append(IOBufQueue&& other, bool pack = false) {
    append(other, pack); // call lvalue reference overload, above
  }

  /**
   * Copy len bytes, starting at buf, to the end of this queue.
   * The caller retains ownership of the source data.
   */
  void append(const void* buf, size_t len);

  /**
   * Copy a string to the end of this queue.
   * The caller retains ownership of the source data.
   */
  void append(StringPiece sp) {
    append(sp.data(), sp.size());
  }

  /**
   * Append a chain of IOBuf objects that point to consecutive regions
   * within buf.
   *
   * Just like IOBuf::wrapBuffer, this should only be used when the caller
   * knows ahead of time and can ensure that all IOBuf objects that will point
   * to this buffer will be destroyed before the buffer itself is destroyed;
   * all other caveats from wrapBuffer also apply.
   *
   * Every buffer except for the last will wrap exactly blockSize bytes.
   * Importantly, this method may be used to wrap buffers larger than 4GB.
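   *
   * Sketch (illustrative; kStatic stands in for storage that outlives every
   * IOBuf referencing it):
   *
   *   static const char kStatic[64 * 1024] = {};
   *   queue.wrapBuffer(kStatic, sizeof(kStatic));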
   */
  void wrapBuffer(
      const void* buf,
      size_t len,
      std::size_t blockSize = (1U << 31)); // default block size: 2GB

  /**
   * Obtain a writable block of contiguous bytes at the end of this
   * queue, allocating more space if necessary. The amount of space
   * reserved will be at least min. If min contiguous space is not
   * available at the end of the queue, an IOBuf with size newAllocationSize
   * is appended to the chain and returned. The actual available space
   * may be larger than newAllocationSize, but will be truncated to max,
   * if specified.
   *
   * If the caller subsequently writes anything into the returned space,
   * it must call the postallocate() method.
   *
   * @return The starting address of the block and the length in bytes.
   *
   * @note The point of the preallocate()/postallocate() mechanism is
   *       to support I/O APIs such as Thrift's TAsyncSocket::ReadCallback
   *       that request a buffer from the application and then, in a later
   *       callback, tell the application how much of the buffer they've
   *       filled with data.
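   *
   * A typical read loop looks roughly like this (illustrative sketch;
   * readSome() is a placeholder for whatever produces the data):
   *
   *   auto range = queue.preallocate(2000, 4000);
   *   size_t n = readSome(range.first, range.second); // bytes actually written
   *   queue.postallocate(n);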
   */
  std::pair<void*, std::size_t> preallocate(
      std::size_t min,
      std::size_t newAllocationSize,
      std::size_t max = std::numeric_limits<std::size_t>::max()) {
    dcheckCacheIntegrity();

    if (LIKELY(writableTail() != nullptr && tailroom() >= min)) {
      return std::make_pair(
          writableTail(), std::min<std::size_t>(max, tailroom()));
    }

    return preallocateSlow(min, newAllocationSize, max);
  }

  /**
   * Tell the queue that the caller has written data into the first n
   * bytes provided by the previous preallocate() call.
   *
   * @note n should be less than or equal to the size returned by
   *       preallocate(). If n is zero, the caller may skip the call
   *       to postallocate(). If n is nonzero, the caller must not
   *       invoke any other non-const methods on this IOBufQueue between
   *       the call to preallocate and the call to postallocate().
   */
  void postallocate(std::size_t n) {
    dcheckCacheIntegrity();
    DCHECK_LE(
        (void*)(cachePtr_->cachedRange.first + n),
        (void*)cachePtr_->cachedRange.second);
    cachePtr_->cachedRange.first += n;
  }

  /**
   * Obtain a writable block of n contiguous bytes, allocating more space
   * if necessary, and mark it as used. The caller can fill it later.
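   *
   * For example (illustrative; payload is a placeholder):
   *
   *   void* dst = queue.allocate(payload.size());
   *   memcpy(dst, payload.data(), payload.size());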
   */
  void* allocate(std::size_t n) {
    void* p = preallocate(n, n).first;
    postallocate(n);
    return p;
  }

  void* writableTail() const {
    dcheckCacheIntegrity();
    return cachePtr_->cachedRange.first;
  }

  size_t tailroom() const {
    dcheckCacheIntegrity();
    return cachePtr_->cachedRange.second - cachePtr_->cachedRange.first;
  }

  /**
   * Split off the first n bytes of the queue into a separate IOBuf chain,
   * and transfer ownership of the new chain to the caller. The IOBufQueue
   * retains ownership of everything after the split point.
   *
   * @warning If the split point lies in the middle of some IOBuf within
   *          the chain, this function may, as an implementation detail,
   *          clone that IOBuf.
   *
   * @throws std::underflow_error if n exceeds the number of bytes
   *         in the queue.
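   *
   * For example (illustrative; kHeaderSize is a placeholder, and chainLength()
   * assumes the queue was constructed with cacheChainLength()):
   *
   *   if (queue.chainLength() >= kHeaderSize) {
   *     std::unique_ptr<folly::IOBuf> header = queue.split(kHeaderSize);
   *   }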
   */
  std::unique_ptr<folly::IOBuf> split(size_t n) {
    return split(n, true);
  }

  /**
   * Similar to split, but will return the entire queue instead of throwing
   * if n exceeds the number of bytes in the queue.
   */
  std::unique_ptr<folly::IOBuf> splitAtMost(size_t n) {
    return split(n, false);
  }

  /**
   * Similar to IOBuf::trimStart, but works on the whole queue. Will
   * pop off buffers that have been completely trimmed.
   */
  void trimStart(size_t amount);

  /**
   * Similar to trimStart, but will trim at most amount bytes and return
   * the number of bytes trimmed.
   */
  size_t trimStartAtMost(size_t amount);

  /**
   * Similar to IOBuf::trimEnd, but works on the whole queue. Will
   * pop off buffers that have been completely trimmed.
   */
  void trimEnd(size_t amount);

  /**
   * Similar to trimEnd, but will trim at most amount bytes and return
   * the number of bytes trimmed.
   */
  size_t trimEndAtMost(size_t amount);

  /**
   * Transfer ownership of the queue's entire IOBuf chain to the caller.
   */
  std::unique_ptr<folly::IOBuf> move() {
    auto guard = updateGuard();
    std::unique_ptr<folly::IOBuf> res = std::move(head_);
    chainLength_ = 0;
    return res;
  }

  /**
   * Access the front IOBuf.
   *
   * Note: the caller will see the current state of the chain, but may not see
   *       future updates immediately, due to the presence of a tail cache.
   * Note: the caller may potentially clone the chain, thus marking all buffers
   *       as shared. We may still continue writing to the tail of the last
   *       IOBuf without checking if it's shared, but this is fine, since the
   *       cloned IOBufs won't reference that data.
   */
  const folly::IOBuf* front() const {
    flushCache();
    return head_.get();
  }

  /**
   * Returns the first IOBuf in the chain and removes it from the chain.
   *
   * @return first IOBuf in the chain or nullptr if none.
   */
  std::unique_ptr<folly::IOBuf> pop_front();

  /**
   * Total chain length, only valid if cacheChainLength was specified in the
   * constructor.
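   *
   * Example (illustrative):
   *
   *   IOBufQueue queue(IOBufQueue::cacheChainLength());
   *   queue.append("abc", 3);
   *   size_t len = queue.chainLength(); // 3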
   */
  size_t chainLength() const {
    if (UNLIKELY(!options_.cacheChainLength)) {
      throw std::invalid_argument("IOBufQueue: chain length not cached");
    }
    dcheckCacheIntegrity();
    return chainLength_ + (cachePtr_->cachedRange.first - tailStart_);
  }

  /**
   * Returns true iff the IOBuf chain length is 0.
   */
  bool empty() const {
    dcheckCacheIntegrity();
    return !head_ ||
        (head_->empty() && cachePtr_->cachedRange.first == tailStart_);
  }

  const Options& options() const {
    return options_;
  }

  /**
   * Clear the queue. Note that this does not release the buffers; it just
   * sets their length to zero, which is useful if you want to reuse the
   * same queue without reallocating.
   */
  void clear();

  /**
   * Append the queue to a std::string. Non-destructive.
   */
  void appendToString(std::string& out) const;

  /**
   * Calls IOBuf::gather() on the head of the queue, if it exists.
   */
  void gather(std::size_t maxLength);

  /** Movable */
  IOBufQueue(IOBufQueue&&) noexcept;
  IOBufQueue& operator=(IOBufQueue&&);

 private:
  std::unique_ptr<folly::IOBuf> split(size_t n, bool throwOnUnderflow);

  static const size_t kChainLengthNotCached = (size_t)-1;
  /** Not copyable */
  IOBufQueue(const IOBufQueue&) = delete;
  IOBufQueue& operator=(const IOBufQueue&) = delete;

  Options options_;

  // NOTE that chainLength_ is still updated even if !options_.cacheChainLength
  // because doing it unchecked in postallocate() is faster (no (mis)predicted
  // branch).
  mutable size_t chainLength_{0};
  /**
   * Everything that has been appended but not yet discarded or moved out.
   * Note: anything that needs to operate on the tail should either call
   * flushCache() or grab updateGuard() (which will flush the cache itself).
   */
  std::unique_ptr<folly::IOBuf> head_;

  mutable uint8_t* tailStart_{nullptr};
  WritableRangeCacheData* cachePtr_{nullptr};
  WritableRangeCacheData localCache_;

  void dcheckCacheIntegrity() const {
    // Tail start should never be greater than tail end.
    DCHECK_LE((void*)tailStart_, (void*)cachePtr_->cachedRange.first);
    DCHECK_LE(
        (void*)cachePtr_->cachedRange.first,
        (void*)cachePtr_->cachedRange.second);
    DCHECK(
        cachePtr_->cachedRange.first != nullptr ||
        cachePtr_->cachedRange.second == nullptr);

    // There is always an attached cache instance.
    DCHECK(cachePtr_->attached);

    // Either the cache is empty or it coincides with the tail.
    DCHECK(
        cachePtr_->cachedRange.first == nullptr ||
        (head_ != nullptr && tailStart_ == head_->prev()->writableTail() &&
         tailStart_ <= cachePtr_->cachedRange.first &&
         cachePtr_->cachedRange.first >= head_->prev()->writableTail() &&
         cachePtr_->cachedRange.second ==
             head_->prev()->writableTail() + head_->prev()->tailroom()));
  }

  /**
   * Populate dest with the writable tail range cache.
   */
  void fillWritableRangeCache(WritableRangeCacheData& dest) {
    dcheckCacheIntegrity();
    if (cachePtr_ != &dest) {
      dest = std::move(*cachePtr_);
      cachePtr_ = &dest;
    }
  }

  /**
   * Clear the current writable tail cache and reset it to localCache_.
   */
  void clearWritableRangeCache() {
    flushCache();

    if (cachePtr_ != &localCache_) {
      localCache_ = std::move(*cachePtr_);
      cachePtr_ = &localCache_;
    }

    DCHECK(cachePtr_ == &localCache_ && localCache_.attached);
  }

  /**
   * Commit any pending changes to the tail of the queue.
   */
  void flushCache() const {
    dcheckCacheIntegrity();

    if (tailStart_ != cachePtr_->cachedRange.first) {
      auto buf = head_->prev();
      DCHECK_EQ(
          (void*)(buf->writableTail() + buf->tailroom()),
          (void*)cachePtr_->cachedRange.second);
      auto len = cachePtr_->cachedRange.first - tailStart_;
      buf->append(len);
      chainLength_ += len;
      tailStart_ += len;
    }
  }

  // For WritableRangeCache move assignment/construction.
  void updateCacheRef(WritableRangeCacheData& newRef) {
    cachePtr_ = &newRef;
  }

  /**
   * Update the cached writable tail range. Called by updateGuard().
   */
  void updateWritableTailCache() {
    if (LIKELY(head_ != nullptr)) {
      IOBuf* buf = head_->prev();
      if (LIKELY(!buf->isSharedOne())) {
        tailStart_ = buf->writableTail();
        cachePtr_->cachedRange = std::pair<uint8_t*, uint8_t*>(
            tailStart_, tailStart_ + buf->tailroom());
        return;
      }
    }
    tailStart_ = nullptr;
    cachePtr_->cachedRange = std::pair<uint8_t*, uint8_t*>();
  }

  std::pair<void*, std::size_t> preallocateSlow(
      std::size_t min,
      std::size_t newAllocationSize,
      std::size_t max);
};

} // namespace folly