/*
 * Copyright 2013-present Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <folly/ScopeGuard.h>
#include <folly/io/IOBuf.h>

#include <stdexcept>
#include <string>

namespace folly {

/**
 * An IOBufQueue encapsulates a chain of IOBufs and provides
 * convenience functions to append data to the back of the chain
 * and remove data from the front.
 *
 * You may also prepend data into the headroom of the first buffer in the
 * chain, if any.
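 *
 * A minimal usage sketch (illustrative; error handling omitted):
 *
 *   folly::IOBufQueue queue{folly::IOBufQueue::cacheChainLength()};
 *   queue.append("hello ", 6);                          // copy bytes in
 *   queue.append(folly::IOBuf::copyBuffer("world"));    // take ownership
 *   std::unique_ptr<folly::IOBuf> chain = queue.move(); // take the chain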
 */
class IOBufQueue {
 private:
  /**
   * This guard should be taken by any method that intends to make changes
   * to the chain (e.g. appending to it).
   *
   * It flushes the writable tail cache and refills it on destruction.
   */
  auto updateGuard() {
    flushCache();
    return folly::makeGuard([this] { updateWritableTailCache(); });
  }

  struct WritableRangeCacheData {
    std::pair<uint8_t*, uint8_t*> cachedRange;
    bool attached{false};

    WritableRangeCacheData() = default;

    WritableRangeCacheData(WritableRangeCacheData&& other)
        : cachedRange(other.cachedRange), attached(other.attached) {
      other.cachedRange = {};
      other.attached = false;
    }
    WritableRangeCacheData& operator=(WritableRangeCacheData&& other) {
      cachedRange = other.cachedRange;
      attached = other.attached;

      other.cachedRange = {};
      other.attached = false;

      return *this;
    }

    WritableRangeCacheData(const WritableRangeCacheData&) = delete;
    WritableRangeCacheData& operator=(const WritableRangeCacheData&) = delete;
  };

 public:
  struct Options {
    Options() : cacheChainLength(false) {}
    bool cacheChainLength;
  };

  /**
   * Commonly used Options, currently the only possible value other than
   * the default.
   */
  static Options cacheChainLength() {
    Options options;
    options.cacheChainLength = true;
    return options;
  }

  /**
   * WritableRangeCache represents a cache of the current writable tail and
   * provides a cheap, simple interface for appending to it that avoids the
   * cost of the preallocate/postallocate pair (i.e. indirections and checks).
   *
   * The cache is flushed on destruction/copy/move and on non-const accesses to
   * the underlying IOBufQueue.
   *
   * Note: there can be only one active cache for a given IOBufQueue, i.e. when
   *       you fill one cache object it automatically invalidates any other
   *       cache.
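   *
   * A minimal usage sketch (illustrative only; `queue` is assumed to be an
   * existing IOBufQueue):
   *
   *   IOBufQueue::WritableRangeCache cache(&queue);
   *   if (cache.length() >= 4) {
   *     memcpy(cache.writableData(), "abcd", 4);
   *     cache.append(4);          // mark the 4 bytes as used
   *   } else {
   *     queue.append("abcd", 4);  // fall back; this flushes the cache
   *   }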
   */
  class WritableRangeCache {
   public:
    explicit WritableRangeCache(folly::IOBufQueue* q = nullptr) : queue_(q) {
      if (queue_) {
        fillCache();
      }
    }

    /**
     * Move constructor/assignment can move the cached range, but must update
     * the reference in IOBufQueue.
     */
    WritableRangeCache(WritableRangeCache&& other)
        : data_(std::move(other.data_)), queue_(other.queue_) {
      if (data_.attached) {
        queue_->updateCacheRef(data_);
      }
    }
    WritableRangeCache& operator=(WritableRangeCache&& other) {
      if (data_.attached) {
        queue_->clearWritableRangeCache();
      }

      data_ = std::move(other.data_);
      queue_ = other.queue_;

      if (data_.attached) {
        queue_->updateCacheRef(data_);
      }

      return *this;
    }

    /**
     * Copy constructor/assignment cannot copy the cached range.
     */
    WritableRangeCache(const WritableRangeCache& other)
        : queue_(other.queue_) {}
    WritableRangeCache& operator=(const WritableRangeCache& other) {
      if (data_.attached) {
        queue_->clearWritableRangeCache();
      }

      queue_ = other.queue_;

      return *this;
    }

    ~WritableRangeCache() {
      if (data_.attached) {
        queue_->clearWritableRangeCache();
      }
    }

    /**
     * Reset the underlying IOBufQueue; flushes the current cache if present.
     */
    void reset(IOBufQueue* q) {
      if (data_.attached) {
        queue_->clearWritableRangeCache();
      }

      queue_ = q;

      if (queue_) {
        fillCache();
      }
    }

    /**
     * Get a pointer to the underlying IOBufQueue object.
     */
    IOBufQueue* queue() {
      return queue_;
    }

    /**
     * Return a pointer to the start of the cached writable tail.
     *
     * Note: doesn't populate the cache.
     */
    uint8_t* writableData() {
      dcheckIntegrity();
      return data_.cachedRange.first;
    }

    /**
     * Return the length of the cached writable tail.
     *
     * Note: doesn't populate the cache.
     */
    size_t length() {
      dcheckIntegrity();
      return data_.cachedRange.second - data_.cachedRange.first;
    }

    /**
     * Mark n bytes as occupied (e.g. postallocate).
     */
    void append(size_t n) {
      dcheckIntegrity();
      // A null cached range can happen only if somebody is misusing the
      // interface, e.g. calling append() after touching the IOBufQueue or
      // without checking length().
      if (LIKELY(data_.cachedRange.first != nullptr)) {
        DCHECK_LE(n, length());
        data_.cachedRange.first += n;
      } else {
        appendSlow(n);
      }
    }

    /**
     * Same as append(n), but avoids checking if there is a cache.
     * The caller must guarantee that the cache is set (e.g. the caller just
     * called fillCache or checked that it's not empty).
     */
    void appendUnsafe(size_t n) {
      data_.cachedRange.first += n;
    }

    /**
     * Fill the cache of writable tail from the underlying IOBufQueue.
     */
    void fillCache() {
      queue_->fillWritableRangeCache(data_);
    }

   private:
    WritableRangeCacheData data_;
    IOBufQueue* queue_;

    FOLLY_NOINLINE void appendSlow(size_t n) {
      queue_->postallocate(n);
    }

    void dcheckIntegrity() {
      // Tail start should never be greater than tail end.
      DCHECK_LE(
          (void*)data_.cachedRange.first, (void*)data_.cachedRange.second);
      DCHECK(
          data_.cachedRange.first != nullptr ||
          data_.cachedRange.second == nullptr);

      // Cached range should always be empty if the cache is not attached.
      DCHECK(
          data_.attached ||
          (data_.cachedRange.first == nullptr &&
           data_.cachedRange.second == nullptr));

      // We cannot be in attached state if the queue_ is not set.
      DCHECK(queue_ != nullptr || !data_.attached);

      // If we're attached and the cache is not empty, then it should coincide
      // with the tail buffer.
      DCHECK(
          !data_.attached || data_.cachedRange.first == nullptr ||
          (queue_->head_ != nullptr &&
           data_.cachedRange.first >= queue_->head_->prev()->writableTail() &&
           data_.cachedRange.second ==
               queue_->head_->prev()->writableTail() +
                   queue_->head_->prev()->tailroom()));
    }
  };

  explicit IOBufQueue(const Options& options = Options());
  ~IOBufQueue();

  /**
   * Return a pointer to the headroom of the first buffer (the space available
   * for prepending bytes) and the number of bytes available there.
   */
  std::pair<void*, std::size_t> headroom();

  /**
   * Indicate that n bytes from the headroom have been used.
   */
  void markPrepended(std::size_t n);

  /**
   * Prepend an existing range; throws std::overflow_error if not enough
   * room.
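   *
   * For example, a hedged sketch of prepending a small header through the
   * existing headroom (`header` here is purely illustrative):
   *
   *   uint32_t header = 42;
   *   if (queue.headroom().second >= sizeof(header)) {
   *     queue.prepend(&header, sizeof(header));  // copies into the headroom
   *   }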
   */
  void prepend(const void* buf, std::size_t n);

  /**
   * Add a buffer or buffer chain to the end of this queue. The
   * queue takes ownership of buf.
   *
   * If pack is true, we try to reduce wastage at the end of this queue
   * by copying some data from the first buffers in the buf chain (and
   * releasing the buffers), if possible. If pack is false, we leave
   * the chain topology unchanged.
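   *
   * For example (a hedged sketch):
   *
   *   // pack=true lets the queue copy small leading buffers into its own
   *   // tailroom to reduce wasted space at the end of the chain.
   *   queue.append(folly::IOBuf::copyBuffer("tail data"), true);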
   */
  void append(std::unique_ptr<folly::IOBuf>&& buf, bool pack = false);

  /**
   * Add a queue to the end of this queue. The queue takes ownership of
   * all buffers from the other queue.
   */
  void append(IOBufQueue& other, bool pack = false);
  void append(IOBufQueue&& other, bool pack = false) {
    append(other, pack); // call lvalue reference overload, above
  }

  /**
   * Copy len bytes, starting at buf, to the end of this queue.
   * The caller retains ownership of the source data.
   */
  void append(const void* buf, size_t len);

  /**
   * Copy a string to the end of this queue.
   * The caller retains ownership of the source data.
   */
  void append(StringPiece sp) {
    append(sp.data(), sp.size());
  }

  /**
   * Append a chain of IOBuf objects that point to consecutive regions
   * within buf.
   *
   * Just like IOBuf::wrapBuffer, this should only be used when the caller
   * knows ahead of time and can ensure that all IOBuf objects that will point
   * to this buffer will be destroyed before the buffer itself is destroyed;
   * all other caveats from wrapBuffer also apply.
   *
   * Every buffer except for the last will wrap exactly blockSize bytes.
   * Importantly, this method may be used to wrap buffers larger than 4GB.
   */
  void wrapBuffer(
      const void* buf,
      size_t len,
      std::size_t blockSize = (1U << 31)); // default block size: 2GB

  /**
   * Obtain a writable block of contiguous bytes at the end of this
   * queue, allocating more space if necessary. The amount of space
   * reserved will be at least min. If min contiguous space is not
   * available at the end of the queue, an IOBuf with size newAllocationSize
   * is appended to the chain and returned. The actual available space
   * may be larger than newAllocationSize, but will be truncated to max,
   * if specified.
   *
   * If the caller subsequently writes anything into the returned space,
   * it must call the postallocate() method.
   *
   * @return The starting address of the block and the length in bytes.
   *
   * @note The point of the preallocate()/postallocate() mechanism is
   *       to support I/O APIs such as Thrift's TAsyncSocket::ReadCallback
   *       that request a buffer from the application and then, in a later
   *       callback, tell the application how much of the buffer they've
   *       filled with data.
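   *
   * A typical read-callback pattern, as a hedged sketch (`fd` is a
   * hypothetical file descriptor):
   *
   *   auto range = queue.preallocate(2000, 4000);
   *   ssize_t bytesRead = ::read(fd, range.first, range.second);
   *   if (bytesRead > 0) {
   *     queue.postallocate(bytesRead);  // commit only what was actually read
   *   }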
   */
  std::pair<void*, std::size_t> preallocate(
      std::size_t min,
      std::size_t newAllocationSize,
      std::size_t max = std::numeric_limits<std::size_t>::max()) {
    dcheckCacheIntegrity();

    if (LIKELY(writableTail() != nullptr && tailroom() >= min)) {
      return std::make_pair(
          writableTail(), std::min<std::size_t>(max, tailroom()));
    }

    return preallocateSlow(min, newAllocationSize, max);
  }

  /**
   * Tell the queue that the caller has written data into the first n
   * bytes provided by the previous preallocate() call.
   *
   * @note n should be less than or equal to the size returned by
   *       preallocate(). If n is zero, the caller may skip the call
   *       to postallocate(). If n is nonzero, the caller must not
   *       invoke any other non-const methods on this IOBufQueue between
   *       the call to preallocate and the call to postallocate().
   */
  void postallocate(std::size_t n) {
    dcheckCacheIntegrity();
    DCHECK_LE(
        (void*)(cachePtr_->cachedRange.first + n),
        (void*)cachePtr_->cachedRange.second);
    cachePtr_->cachedRange.first += n;
  }

  /**
   * Obtain a writable block of n contiguous bytes, allocating more space
   * if necessary, and mark it as used. The caller can fill it later.
   */
  void* allocate(std::size_t n) {
    void* p = preallocate(n, n).first;
    postallocate(n);
    return p;
  }

  void* writableTail() const {
    dcheckCacheIntegrity();
    return cachePtr_->cachedRange.first;
  }

  size_t tailroom() const {
    dcheckCacheIntegrity();
    return cachePtr_->cachedRange.second - cachePtr_->cachedRange.first;
  }

  /**
   * Split off the first n bytes of the queue into a separate IOBuf chain,
   * and transfer ownership of the new chain to the caller. The IOBufQueue
   * retains ownership of everything after the split point.
   *
   * @warning If the split point lies in the middle of some IOBuf within
   *          the chain, this function may, as an implementation detail,
   *          clone that IOBuf.
   *
   * @throws std::underflow_error if n exceeds the number of bytes
   *         in the queue.
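   *
   * For example, a sketch of splitting off one fixed-size frame
   * (`kFrameSize` is hypothetical, and chainLength() assumes the queue was
   * constructed with the cacheChainLength option):
   *
   *   if (queue.chainLength() >= kFrameSize) {
   *     std::unique_ptr<folly::IOBuf> frame = queue.split(kFrameSize);
   *     // the queue keeps everything after the first kFrameSize bytes
   *   }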
   */
  std::unique_ptr<folly::IOBuf> split(size_t n) {
    return split(n, true);
  }

  /**
   * Similar to split, but will return the entire queue instead of throwing
   * if n exceeds the number of bytes in the queue.
   */
  std::unique_ptr<folly::IOBuf> splitAtMost(size_t n) {
    return split(n, false);
  }

  /**
   * Similar to IOBuf::trimStart, but works on the whole queue. Will
   * pop off buffers that have been completely trimmed.
   */
  void trimStart(size_t amount);

  /**
   * Similar to trimStart, but trims at most amount bytes and returns
   * the number of bytes trimmed.
   */
  size_t trimStartAtMost(size_t amount);

  /**
   * Similar to IOBuf::trimEnd, but works on the whole queue. Will
   * pop off buffers that have been completely trimmed.
   */
  void trimEnd(size_t amount);

  /**
   * Similar to trimEnd, but trims at most amount bytes and returns
   * the number of bytes trimmed.
   */
  size_t trimEndAtMost(size_t amount);

  /**
   * Transfer ownership of the queue's entire IOBuf chain to the caller.
   */
  std::unique_ptr<folly::IOBuf> move() {
    auto guard = updateGuard();
    std::unique_ptr<folly::IOBuf> res = std::move(head_);
    chainLength_ = 0;
    return res;
  }

  /**
   * Access the front IOBuf.
   *
   * Note: caller will see the current state of the chain, but may not see
   *       future updates immediately, due to the presence of a tail cache.
   * Note: the caller may potentially clone the chain, thus marking all buffers
   *       as shared. We may still continue writing to the tail of the last
   *       IOBuf without checking if it's shared, but this is fine, since the
   *       cloned IOBufs won't reference that data.
   */
  const folly::IOBuf* front() const {
    flushCache();
    return head_.get();
  }

  /**
   * Remove the first IOBuf from the chain and return it.
   *
   * @return The first IOBuf in the chain, or nullptr if the chain is empty.
   */
  std::unique_ptr<folly::IOBuf> pop_front();

  /**
   * Total chain length, only valid if the cacheChainLength option was
   * specified in the constructor.
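   *
   * For example (a minimal sketch; requires the cacheChainLength option):
   *
   *   folly::IOBufQueue queue{folly::IOBufQueue::cacheChainLength()};
   *   queue.append("abc", 3);
   *   assert(queue.chainLength() == 3);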
   */
  size_t chainLength() const {
    if (UNLIKELY(!options_.cacheChainLength)) {
      throw std::invalid_argument("IOBufQueue: chain length not cached");
    }
    dcheckCacheIntegrity();
    return chainLength_ + (cachePtr_->cachedRange.first - tailStart_);
  }

  /**
   * Returns true iff the IOBuf chain length is 0.
   */
  bool empty() const {
    dcheckCacheIntegrity();
    return !head_ ||
        (head_->empty() && cachePtr_->cachedRange.first == tailStart_);
  }

  const Options& options() const {
    return options_;
  }

  /**
   * Clear the queue. Note that this does not release the buffers, it
   * just sets their length to zero; useful if you want to reuse the
   * same queue without reallocating.
   */
  void clear();

  /**
   * Append the queue to a std::string. Non-destructive.
   */
  void appendToString(std::string& out) const;

  /**
   * Calls IOBuf::gather() on the head of the queue, if it exists.
   */
  void gather(std::size_t maxLength);

  /** Movable */
  IOBufQueue(IOBufQueue&&) noexcept;
  IOBufQueue& operator=(IOBufQueue&&);

 private:
  std::unique_ptr<folly::IOBuf> split(size_t n, bool throwOnUnderflow);

  static const size_t kChainLengthNotCached = (size_t)-1;
  /** Not copyable */
  IOBufQueue(const IOBufQueue&) = delete;
  IOBufQueue& operator=(const IOBufQueue&) = delete;

  Options options_;

  // NOTE that chainLength_ is still updated even if !options_.cacheChainLength
  // because doing it unchecked in postallocate() is faster (no (mis)predicted
  // branch)
  mutable size_t chainLength_{0};
  /**
   * Everything that has been appended but not yet discarded or moved out.
   * Note: anything that needs to operate on the tail should either call
   * flushCache() or grab updateGuard() (which will flush the cache itself).
   */
  std::unique_ptr<folly::IOBuf> head_;

  mutable uint8_t* tailStart_{nullptr};
  WritableRangeCacheData* cachePtr_{nullptr};
  WritableRangeCacheData localCache_;

  void dcheckCacheIntegrity() const {
    // Tail start should never be greater than tail end.
    DCHECK_LE((void*)tailStart_, (void*)cachePtr_->cachedRange.first);
    DCHECK_LE(
        (void*)cachePtr_->cachedRange.first,
        (void*)cachePtr_->cachedRange.second);
    DCHECK(
        cachePtr_->cachedRange.first != nullptr ||
        cachePtr_->cachedRange.second == nullptr);

    // There is always an attached cache instance.
    DCHECK(cachePtr_->attached);

    // Either the cache is empty or it coincides with the tail.
    DCHECK(
        cachePtr_->cachedRange.first == nullptr ||
        (head_ != nullptr && tailStart_ == head_->prev()->writableTail() &&
         tailStart_ <= cachePtr_->cachedRange.first &&
         cachePtr_->cachedRange.first >= head_->prev()->writableTail() &&
         cachePtr_->cachedRange.second ==
             head_->prev()->writableTail() + head_->prev()->tailroom()));
  }

  /**
   * Populate dest with the writable tail range cache.
   */
  void fillWritableRangeCache(WritableRangeCacheData& dest) {
    dcheckCacheIntegrity();
    if (cachePtr_ != &dest) {
      dest = std::move(*cachePtr_);
      cachePtr_ = &dest;
    }
  }

  /**
   * Clear the current writable tail cache and reset it to localCache_.
   */
  void clearWritableRangeCache() {
    flushCache();

    if (cachePtr_ != &localCache_) {
      localCache_ = std::move(*cachePtr_);
      cachePtr_ = &localCache_;
    }

    DCHECK(cachePtr_ == &localCache_ && localCache_.attached);
  }

  /**
   * Commit any pending changes to the tail of the queue.
   */
  void flushCache() const {
    dcheckCacheIntegrity();

    if (tailStart_ != cachePtr_->cachedRange.first) {
      auto buf = head_->prev();
      DCHECK_EQ(
          (void*)(buf->writableTail() + buf->tailroom()),
          (void*)cachePtr_->cachedRange.second);
      auto len = cachePtr_->cachedRange.first - tailStart_;
      buf->append(len);
      chainLength_ += len;
      tailStart_ += len;
    }
  }

  // For WritableRangeCache move assignment/construction.
  void updateCacheRef(WritableRangeCacheData& newRef) {
    cachePtr_ = &newRef;
  }

  /**
   * Update the cached writable tail range. Called by updateGuard().
   */
  void updateWritableTailCache() {
    if (LIKELY(head_ != nullptr)) {
      IOBuf* buf = head_->prev();
      if (LIKELY(!buf->isSharedOne())) {
        tailStart_ = buf->writableTail();
        cachePtr_->cachedRange = std::pair<uint8_t*, uint8_t*>(
            tailStart_, tailStart_ + buf->tailroom());
        return;
      }
    }
    tailStart_ = nullptr;
    cachePtr_->cachedRange = std::pair<uint8_t*, uint8_t*>();
  }

  std::pair<void*, std::size_t> preallocateSlow(
      std::size_t min,
      std::size_t newAllocationSize,
      std::size_t max);
};

} // namespace folly