1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one |
3 | * or more contributor license agreements. See the NOTICE file |
4 | * distributed with this work for additional information |
5 | * regarding copyright ownership. The ASF licenses this file |
6 | * to you under the Apache License, Version 2.0 (the |
7 | * "License"); you may not use this file except in compliance |
8 | * with the License. You may obtain a copy of the License at |
9 | * |
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
11 | * |
12 | * Unless required by applicable law or agreed to in writing, |
13 | * software distributed under the License is distributed on an |
14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
15 | * KIND, either express or implied. See the License for the |
16 | * specific language governing permissions and limitations |
17 | * under the License. |
18 | */ |
19 | |
20 | #ifndef _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_ |
21 | #define _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_ 1 |
22 | |
23 | #include <cstdlib> |
24 | #include <cstring> |
25 | #include <limits> |
26 | #include <boost/scoped_array.hpp> |
27 | |
28 | #include <thrift/transport/TTransport.h> |
29 | #include <thrift/transport/TVirtualTransport.h> |
30 | |
31 | #ifdef __GNUC__ |
32 | #define TDB_LIKELY(val) (__builtin_expect((val), 1)) |
33 | #define TDB_UNLIKELY(val) (__builtin_expect((val), 0)) |
34 | #else |
35 | #define TDB_LIKELY(val) (val) |
36 | #define TDB_UNLIKELY(val) (val) |
37 | #endif |
38 | |
39 | namespace apache { |
40 | namespace thrift { |
41 | namespace transport { |
42 | |
43 | /** |
44 | * Base class for all transports that use read/write buffers for performance. |
45 | * |
46 | * TBufferBase is designed to implement the fast-path "memcpy" style |
47 | * operations that work in the common case. It does so with small and |
48 | * (eventually) nonvirtual, inlinable methods. TBufferBase is an abstract |
49 | * class. Subclasses are expected to define the "slow path" operations |
50 | * that have to be done when the buffers are full or empty. |
51 | * |
52 | */ |
53 | class TBufferBase : public TVirtualTransport<TBufferBase> { |
54 | |
55 | public: |
56 | /** |
57 | * Fast-path read. |
58 | * |
59 | * When we have enough data buffered to fulfill the read, we can satisfy it |
60 | * with a single memcpy, then adjust our internal pointers. If the buffer |
61 | * is empty, we call out to our slow path, implemented by a subclass. |
62 | * This method is meant to eventually be nonvirtual and inlinable. |
63 | */ |
64 | uint32_t read(uint8_t* buf, uint32_t len) { |
65 | uint8_t* new_rBase = rBase_ + len; |
66 | if (TDB_LIKELY(new_rBase <= rBound_)) { |
67 | std::memcpy(buf, rBase_, len); |
68 | rBase_ = new_rBase; |
69 | return len; |
70 | } |
71 | return readSlow(buf, len); |
72 | } |
73 | |
74 | /** |
75 | * Shortcutted version of readAll. |
76 | */ |
77 | uint32_t readAll(uint8_t* buf, uint32_t len) { |
78 | uint8_t* new_rBase = rBase_ + len; |
79 | if (TDB_LIKELY(new_rBase <= rBound_)) { |
80 | std::memcpy(buf, rBase_, len); |
81 | rBase_ = new_rBase; |
82 | return len; |
83 | } |
84 | return apache::thrift::transport::readAll(*this, buf, len); |
85 | } |
86 | |
87 | /** |
88 | * Fast-path write. |
89 | * |
90 | * When we have enough empty space in our buffer to accommodate the write, we |
91 | * can satisfy it with a single memcpy, then adjust our internal pointers. |
92 | * If the buffer is full, we call out to our slow path, implemented by a |
93 | * subclass. This method is meant to eventually be nonvirtual and |
94 | * inlinable. |
95 | */ |
96 | void write(const uint8_t* buf, uint32_t len) { |
97 | uint8_t* new_wBase = wBase_ + len; |
98 | if (TDB_LIKELY(new_wBase <= wBound_)) { |
99 | std::memcpy(wBase_, buf, len); |
100 | wBase_ = new_wBase; |
101 | return; |
102 | } |
103 | writeSlow(buf, len); |
104 | } |
105 | |
106 | /** |
107 | * Fast-path borrow. A lot like the fast-path read. |
108 | */ |
109 | const uint8_t* borrow(uint8_t* buf, uint32_t* len) { |
110 | if (TDB_LIKELY(static_cast<ptrdiff_t>(*len) <= rBound_ - rBase_)) { |
111 | // With strict aliasing, writing to len shouldn't force us to |
112 | // refetch rBase_ from memory. TODO(dreiss): Verify this. |
113 | *len = static_cast<uint32_t>(rBound_ - rBase_); |
114 | return rBase_; |
115 | } |
116 | return borrowSlow(buf, len); |
117 | } |
118 | |
119 | /** |
120 | * Consume doesn't require a slow path. |
121 | */ |
122 | void consume(uint32_t len) { |
123 | if (TDB_LIKELY(static_cast<ptrdiff_t>(len) <= rBound_ - rBase_)) { |
124 | rBase_ += len; |
125 | } else { |
126 | throw TTransportException(TTransportException::BAD_ARGS, "consume did not follow a borrow." ); |
127 | } |
128 | } |
129 | |
130 | protected: |
131 | /// Slow path read. |
132 | virtual uint32_t readSlow(uint8_t* buf, uint32_t len) = 0; |
133 | |
134 | /// Slow path write. |
135 | virtual void writeSlow(const uint8_t* buf, uint32_t len) = 0; |
136 | |
137 | /** |
138 | * Slow path borrow. |
139 | * |
140 | * POSTCONDITION: return == NULL || rBound_ - rBase_ >= *len |
141 | */ |
142 | virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) = 0; |
143 | |
144 | /** |
145 | * Trivial constructor. |
146 | * |
147 | * Initialize pointers safely. Constructing is not a very |
148 | * performance-sensitive operation, so it is okay to just leave it to |
149 | * the concrete class to set up pointers correctly. |
150 | */ |
151 | TBufferBase() : rBase_(NULL), rBound_(NULL), wBase_(NULL), wBound_(NULL) {} |
152 | |
153 | /// Convenience mutator for setting the read buffer. |
154 | void setReadBuffer(uint8_t* buf, uint32_t len) { |
155 | rBase_ = buf; |
156 | rBound_ = buf + len; |
157 | } |
158 | |
159 | /// Convenience mutator for setting the write buffer. |
160 | void setWriteBuffer(uint8_t* buf, uint32_t len) { |
161 | wBase_ = buf; |
162 | wBound_ = buf + len; |
163 | } |
164 | |
165 | virtual ~TBufferBase() {} |
166 | |
167 | /// Reads begin here. |
168 | uint8_t* rBase_; |
169 | /// Reads may extend to just before here. |
170 | uint8_t* rBound_; |
171 | |
172 | /// Writes begin here. |
173 | uint8_t* wBase_; |
174 | /// Writes may extend to just before here. |
175 | uint8_t* wBound_; |
176 | }; |
177 | |
178 | /** |
179 | * Buffered transport. For reads it will read more data than is requested |
180 | * and will serve future data out of a local buffer. For writes, data is |
181 | * stored to an in memory buffer before being written out. |
182 | * |
183 | */ |
184 | class TBufferedTransport : public TVirtualTransport<TBufferedTransport, TBufferBase> { |
185 | public: |
186 | static const int DEFAULT_BUFFER_SIZE = 512; |
187 | |
188 | /// Use default buffer sizes. |
189 | TBufferedTransport(stdcxx::shared_ptr<TTransport> transport) |
190 | : transport_(transport), |
191 | rBufSize_(DEFAULT_BUFFER_SIZE), |
192 | wBufSize_(DEFAULT_BUFFER_SIZE), |
193 | rBuf_(new uint8_t[rBufSize_]), |
194 | wBuf_(new uint8_t[wBufSize_]) { |
195 | initPointers(); |
196 | } |
197 | |
198 | /// Use specified buffer sizes. |
199 | TBufferedTransport(stdcxx::shared_ptr<TTransport> transport, uint32_t sz) |
200 | : transport_(transport), |
201 | rBufSize_(sz), |
202 | wBufSize_(sz), |
203 | rBuf_(new uint8_t[rBufSize_]), |
204 | wBuf_(new uint8_t[wBufSize_]) { |
205 | initPointers(); |
206 | } |
207 | |
208 | /// Use specified read and write buffer sizes. |
209 | TBufferedTransport(stdcxx::shared_ptr<TTransport> transport, uint32_t rsz, uint32_t wsz) |
210 | : transport_(transport), |
211 | rBufSize_(rsz), |
212 | wBufSize_(wsz), |
213 | rBuf_(new uint8_t[rBufSize_]), |
214 | wBuf_(new uint8_t[wBufSize_]) { |
215 | initPointers(); |
216 | } |
217 | |
218 | void open() { transport_->open(); } |
219 | |
220 | bool isOpen() { return transport_->isOpen(); } |
221 | |
222 | bool peek() { |
223 | if (rBase_ == rBound_) { |
224 | setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_)); |
225 | } |
226 | return (rBound_ > rBase_); |
227 | } |
228 | |
229 | void close() { |
230 | flush(); |
231 | transport_->close(); |
232 | } |
233 | |
234 | virtual uint32_t readSlow(uint8_t* buf, uint32_t len); |
235 | |
236 | virtual void writeSlow(const uint8_t* buf, uint32_t len); |
237 | |
238 | void flush(); |
239 | |
240 | /** |
241 | * Returns the origin of the underlying transport |
242 | */ |
243 | virtual const std::string getOrigin() { return transport_->getOrigin(); } |
244 | |
245 | /** |
246 | * The following behavior is currently implemented by TBufferedTransport, |
247 | * but that may change in a future version: |
248 | * 1/ If len is at most rBufSize_, borrow will never return NULL. |
249 | * Depending on the underlying transport, it could throw an exception |
250 | * or hang forever. |
251 | * 2/ Some borrow requests may copy bytes internally. However, |
252 | * if len is at most rBufSize_/2, none of the copied bytes |
253 | * will ever have to be copied again. For optimial performance, |
254 | * stay under this limit. |
255 | */ |
256 | virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len); |
257 | |
258 | stdcxx::shared_ptr<TTransport> getUnderlyingTransport() { return transport_; } |
259 | |
260 | /* |
261 | * TVirtualTransport provides a default implementation of readAll(). |
262 | * We want to use the TBufferBase version instead. |
263 | */ |
264 | uint32_t readAll(uint8_t* buf, uint32_t len) { return TBufferBase::readAll(buf, len); } |
265 | |
266 | protected: |
267 | void initPointers() { |
268 | setReadBuffer(rBuf_.get(), 0); |
269 | setWriteBuffer(wBuf_.get(), wBufSize_); |
270 | // Write size never changes. |
271 | } |
272 | |
273 | stdcxx::shared_ptr<TTransport> transport_; |
274 | |
275 | uint32_t rBufSize_; |
276 | uint32_t wBufSize_; |
277 | boost::scoped_array<uint8_t> rBuf_; |
278 | boost::scoped_array<uint8_t> wBuf_; |
279 | }; |
280 | |
281 | /** |
282 | * Wraps a transport into a buffered one. |
283 | * |
284 | */ |
285 | class TBufferedTransportFactory : public TTransportFactory { |
286 | public: |
287 | TBufferedTransportFactory() {} |
288 | |
289 | virtual ~TBufferedTransportFactory() {} |
290 | |
291 | /** |
292 | * Wraps the transport into a buffered one. |
293 | */ |
294 | virtual stdcxx::shared_ptr<TTransport> getTransport(stdcxx::shared_ptr<TTransport> trans) { |
295 | return stdcxx::shared_ptr<TTransport>(new TBufferedTransport(trans)); |
296 | } |
297 | }; |
298 | |
299 | /** |
300 | * Framed transport. All writes go into an in-memory buffer until flush is |
301 | * called, at which point the transport writes the length of the entire |
302 | * binary chunk followed by the data payload. This allows the receiver on the |
303 | * other end to always do fixed-length reads. |
304 | * |
305 | */ |
306 | class TFramedTransport : public TVirtualTransport<TFramedTransport, TBufferBase> { |
307 | public: |
308 | static const int DEFAULT_BUFFER_SIZE = 512; |
309 | static const int DEFAULT_MAX_FRAME_SIZE = 256 * 1024 * 1024; |
310 | |
311 | /// Use default buffer sizes. |
312 | TFramedTransport() |
313 | : transport_(), |
314 | rBufSize_(0), |
315 | wBufSize_(DEFAULT_BUFFER_SIZE), |
316 | rBuf_(), |
317 | wBuf_(new uint8_t[wBufSize_]), |
318 | bufReclaimThresh_((std::numeric_limits<uint32_t>::max)()) { |
319 | initPointers(); |
320 | } |
321 | |
322 | TFramedTransport(stdcxx::shared_ptr<TTransport> transport) |
323 | : transport_(transport), |
324 | rBufSize_(0), |
325 | wBufSize_(DEFAULT_BUFFER_SIZE), |
326 | rBuf_(), |
327 | wBuf_(new uint8_t[wBufSize_]), |
328 | bufReclaimThresh_((std::numeric_limits<uint32_t>::max)()), |
329 | maxFrameSize_(DEFAULT_MAX_FRAME_SIZE) { |
330 | initPointers(); |
331 | } |
332 | |
333 | TFramedTransport(stdcxx::shared_ptr<TTransport> transport, |
334 | uint32_t sz, |
335 | uint32_t bufReclaimThresh = (std::numeric_limits<uint32_t>::max)()) |
336 | : transport_(transport), |
337 | rBufSize_(0), |
338 | wBufSize_(sz), |
339 | rBuf_(), |
340 | wBuf_(new uint8_t[wBufSize_]), |
341 | bufReclaimThresh_(bufReclaimThresh), |
342 | maxFrameSize_(DEFAULT_MAX_FRAME_SIZE) { |
343 | initPointers(); |
344 | } |
345 | |
346 | void open() { transport_->open(); } |
347 | |
348 | bool isOpen() { return transport_->isOpen(); } |
349 | |
350 | bool peek() { return (rBase_ < rBound_) || transport_->peek(); } |
351 | |
352 | void close() { |
353 | flush(); |
354 | transport_->close(); |
355 | } |
356 | |
357 | virtual uint32_t readSlow(uint8_t* buf, uint32_t len); |
358 | |
359 | virtual void writeSlow(const uint8_t* buf, uint32_t len); |
360 | |
361 | virtual void flush(); |
362 | |
363 | uint32_t readEnd(); |
364 | |
365 | uint32_t writeEnd(); |
366 | |
367 | const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len); |
368 | |
369 | stdcxx::shared_ptr<TTransport> getUnderlyingTransport() { return transport_; } |
370 | |
371 | /* |
372 | * TVirtualTransport provides a default implementation of readAll(). |
373 | * We want to use the TBufferBase version instead. |
374 | */ |
375 | using TBufferBase::readAll; |
376 | |
377 | /** |
378 | * Returns the origin of the underlying transport |
379 | */ |
380 | virtual const std::string getOrigin() { return transport_->getOrigin(); } |
381 | |
382 | /** |
383 | * Set the maximum size of the frame at read |
384 | */ |
385 | void setMaxFrameSize(uint32_t maxFrameSize) { maxFrameSize_ = maxFrameSize; } |
386 | |
387 | /** |
388 | * Get the maximum size of the frame at read |
389 | */ |
390 | uint32_t getMaxFrameSize() { return maxFrameSize_; } |
391 | |
392 | protected: |
393 | /** |
394 | * Reads a frame of input from the underlying stream. |
395 | * |
396 | * Returns true if a frame was read successfully, or false on EOF. |
397 | * (Raises a TTransportException if EOF occurs after a partial frame.) |
398 | */ |
399 | virtual bool readFrame(); |
400 | |
401 | void initPointers() { |
402 | setReadBuffer(NULL, 0); |
403 | setWriteBuffer(wBuf_.get(), wBufSize_); |
404 | |
405 | // Pad the buffer so we can insert the size later. |
406 | int32_t pad = 0; |
407 | this->write((uint8_t*)&pad, sizeof(pad)); |
408 | } |
409 | |
410 | stdcxx::shared_ptr<TTransport> transport_; |
411 | |
412 | uint32_t rBufSize_; |
413 | uint32_t wBufSize_; |
414 | boost::scoped_array<uint8_t> rBuf_; |
415 | boost::scoped_array<uint8_t> wBuf_; |
416 | uint32_t bufReclaimThresh_; |
417 | uint32_t maxFrameSize_; |
418 | }; |
419 | |
420 | /** |
421 | * Wraps a transport into a framed one. |
422 | * |
423 | */ |
424 | class TFramedTransportFactory : public TTransportFactory { |
425 | public: |
426 | TFramedTransportFactory() {} |
427 | |
428 | virtual ~TFramedTransportFactory() {} |
429 | |
430 | /** |
431 | * Wraps the transport into a framed one. |
432 | */ |
433 | virtual stdcxx::shared_ptr<TTransport> getTransport(stdcxx::shared_ptr<TTransport> trans) { |
434 | return stdcxx::shared_ptr<TTransport>(new TFramedTransport(trans)); |
435 | } |
436 | }; |
437 | |
438 | /** |
439 | * A memory buffer is a tranpsort that simply reads from and writes to an |
440 | * in memory buffer. Anytime you call write on it, the data is simply placed |
441 | * into a buffer, and anytime you call read, data is read from that buffer. |
442 | * |
443 | * The buffers are allocated using C constructs malloc,realloc, and the size |
444 | * doubles as necessary. We've considered using scoped |
445 | * |
446 | */ |
447 | class TMemoryBuffer : public TVirtualTransport<TMemoryBuffer, TBufferBase> { |
448 | private: |
449 | // Common initialization done by all constructors. |
450 | void initCommon(uint8_t* buf, uint32_t size, bool owner, uint32_t wPos) { |
451 | |
452 | maxBufferSize_ = std::numeric_limits<uint32_t>::max(); |
453 | |
454 | if (buf == NULL && size != 0) { |
455 | assert(owner); |
456 | buf = (uint8_t*)std::malloc(size); |
457 | if (buf == NULL) { |
458 | throw std::bad_alloc(); |
459 | } |
460 | } |
461 | |
462 | buffer_ = buf; |
463 | bufferSize_ = size; |
464 | |
465 | rBase_ = buffer_; |
466 | rBound_ = buffer_ + wPos; |
467 | // TODO(dreiss): Investigate NULL-ing this if !owner. |
468 | wBase_ = buffer_ + wPos; |
469 | wBound_ = buffer_ + bufferSize_; |
470 | |
471 | owner_ = owner; |
472 | |
473 | // rBound_ is really an artifact. In principle, it should always be |
474 | // equal to wBase_. We update it in a few places (computeRead, etc.). |
475 | } |
476 | |
477 | public: |
478 | static const uint32_t defaultSize = 1024; |
479 | |
480 | /** |
481 | * This enum specifies how a TMemoryBuffer should treat |
482 | * memory passed to it via constructors or resetBuffer. |
483 | * |
484 | * OBSERVE: |
485 | * TMemoryBuffer will simply store a pointer to the memory. |
486 | * It is the callers responsibility to ensure that the pointer |
487 | * remains valid for the lifetime of the TMemoryBuffer, |
488 | * and that it is properly cleaned up. |
489 | * Note that no data can be written to observed buffers. |
490 | * |
491 | * COPY: |
492 | * TMemoryBuffer will make an internal copy of the buffer. |
493 | * The caller has no responsibilities. |
494 | * |
495 | * TAKE_OWNERSHIP: |
496 | * TMemoryBuffer will become the "owner" of the buffer, |
497 | * and will be responsible for freeing it. |
498 | * The membory must have been allocated with malloc. |
499 | */ |
500 | enum MemoryPolicy { OBSERVE = 1, COPY = 2, TAKE_OWNERSHIP = 3 }; |
501 | |
502 | /** |
503 | * Construct a TMemoryBuffer with a default-sized buffer, |
504 | * owned by the TMemoryBuffer object. |
505 | */ |
506 | TMemoryBuffer() { initCommon(NULL, defaultSize, true, 0); } |
507 | |
508 | /** |
509 | * Construct a TMemoryBuffer with a buffer of a specified size, |
510 | * owned by the TMemoryBuffer object. |
511 | * |
512 | * @param sz The initial size of the buffer. |
513 | */ |
514 | TMemoryBuffer(uint32_t sz) { initCommon(NULL, sz, true, 0); } |
515 | |
516 | /** |
517 | * Construct a TMemoryBuffer with buf as its initial contents. |
518 | * |
519 | * @param buf The initial contents of the buffer. |
520 | * Note that, while buf is a non-const pointer, |
521 | * TMemoryBuffer will not write to it if policy == OBSERVE, |
522 | * so it is safe to const_cast<uint8_t*>(whatever). |
523 | * @param sz The size of @c buf. |
524 | * @param policy See @link MemoryPolicy @endlink . |
525 | */ |
526 | TMemoryBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) { |
527 | if (buf == NULL && sz != 0) { |
528 | throw TTransportException(TTransportException::BAD_ARGS, |
529 | "TMemoryBuffer given null buffer with non-zero size." ); |
530 | } |
531 | |
532 | switch (policy) { |
533 | case OBSERVE: |
534 | case TAKE_OWNERSHIP: |
535 | initCommon(buf, sz, policy == TAKE_OWNERSHIP, sz); |
536 | break; |
537 | case COPY: |
538 | initCommon(NULL, sz, true, 0); |
539 | this->write(buf, sz); |
540 | break; |
541 | default: |
542 | throw TTransportException(TTransportException::BAD_ARGS, |
543 | "Invalid MemoryPolicy for TMemoryBuffer" ); |
544 | } |
545 | } |
546 | |
547 | ~TMemoryBuffer() { |
548 | if (owner_) { |
549 | std::free(buffer_); |
550 | } |
551 | } |
552 | |
553 | bool isOpen() { return true; } |
554 | |
555 | bool peek() { return (rBase_ < wBase_); } |
556 | |
557 | void open() {} |
558 | |
559 | void close() {} |
560 | |
561 | // TODO(dreiss): Make bufPtr const. |
562 | void getBuffer(uint8_t** bufPtr, uint32_t* sz) { |
563 | *bufPtr = rBase_; |
564 | *sz = static_cast<uint32_t>(wBase_ - rBase_); |
565 | } |
566 | |
567 | std::string getBufferAsString() { |
568 | if (buffer_ == NULL) { |
569 | return "" ; |
570 | } |
571 | uint8_t* buf; |
572 | uint32_t sz; |
573 | getBuffer(&buf, &sz); |
574 | return std::string((char*)buf, (std::string::size_type)sz); |
575 | } |
576 | |
577 | void appendBufferToString(std::string& str) { |
578 | if (buffer_ == NULL) { |
579 | return; |
580 | } |
581 | uint8_t* buf; |
582 | uint32_t sz; |
583 | getBuffer(&buf, &sz); |
584 | str.append((char*)buf, sz); |
585 | } |
586 | |
587 | void resetBuffer() { |
588 | rBase_ = buffer_; |
589 | rBound_ = buffer_; |
590 | wBase_ = buffer_; |
591 | // It isn't safe to write into a buffer we don't own. |
592 | if (!owner_) { |
593 | wBound_ = wBase_; |
594 | bufferSize_ = 0; |
595 | } |
596 | } |
597 | |
598 | /// See constructor documentation. |
599 | void resetBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) { |
600 | // Use a variant of the copy-and-swap trick for assignment operators. |
601 | // This is sub-optimal in terms of performance for two reasons: |
602 | // 1/ The constructing and swapping of the (small) values |
603 | // in the temporary object takes some time, and is not necessary. |
604 | // 2/ If policy == COPY, we allocate the new buffer before |
605 | // freeing the old one, precluding the possibility of |
606 | // reusing that memory. |
607 | // I doubt that either of these problems could be optimized away, |
608 | // but the second is probably no a common case, and the first is minor. |
609 | // I don't expect resetBuffer to be a common operation, so I'm willing to |
610 | // bite the performance bullet to make the method this simple. |
611 | |
612 | // Construct the new buffer. |
613 | TMemoryBuffer new_buffer(buf, sz, policy); |
614 | // Move it into ourself. |
615 | this->swap(new_buffer); |
616 | // Our old self gets destroyed. |
617 | } |
618 | |
619 | /// See constructor documentation. |
620 | void resetBuffer(uint32_t sz) { |
621 | // Construct the new buffer. |
622 | TMemoryBuffer new_buffer(sz); |
623 | // Move it into ourself. |
624 | this->swap(new_buffer); |
625 | // Our old self gets destroyed. |
626 | } |
627 | |
628 | std::string readAsString(uint32_t len) { |
629 | std::string str; |
630 | (void)readAppendToString(str, len); |
631 | return str; |
632 | } |
633 | |
634 | uint32_t readAppendToString(std::string& str, uint32_t len); |
635 | |
636 | // return number of bytes read |
637 | uint32_t readEnd() { |
638 | // This cast should be safe, because buffer_'s size is a uint32_t |
639 | uint32_t bytes = static_cast<uint32_t>(rBase_ - buffer_); |
640 | if (rBase_ == wBase_) { |
641 | resetBuffer(); |
642 | } |
643 | return bytes; |
644 | } |
645 | |
646 | // Return number of bytes written |
647 | uint32_t writeEnd() { |
648 | // This cast should be safe, because buffer_'s size is a uint32_t |
649 | return static_cast<uint32_t>(wBase_ - buffer_); |
650 | } |
651 | |
652 | uint32_t available_read() const { |
653 | // Remember, wBase_ is the real rBound_. |
654 | return static_cast<uint32_t>(wBase_ - rBase_); |
655 | } |
656 | |
657 | uint32_t available_write() const { return static_cast<uint32_t>(wBound_ - wBase_); } |
658 | |
659 | // Returns a pointer to where the client can write data to append to |
660 | // the TMemoryBuffer, and ensures the buffer is big enough to accommodate a |
661 | // write of the provided length. The returned pointer is very convenient for |
662 | // passing to read(), recv(), or similar. You must call wroteBytes() as soon |
663 | // as data is written or the buffer will not be aware that data has changed. |
664 | uint8_t* getWritePtr(uint32_t len) { |
665 | ensureCanWrite(len); |
666 | return wBase_; |
667 | } |
668 | |
669 | // Informs the buffer that the client has written 'len' bytes into storage |
670 | // that had been provided by getWritePtr(). |
671 | void wroteBytes(uint32_t len); |
672 | |
673 | /* |
674 | * TVirtualTransport provides a default implementation of readAll(). |
675 | * We want to use the TBufferBase version instead. |
676 | */ |
677 | uint32_t readAll(uint8_t* buf, uint32_t len) { return TBufferBase::readAll(buf, len); } |
678 | |
679 | //! \brief Get the current buffer size |
680 | //! \returns the current buffer size |
681 | uint32_t getBufferSize() const { |
682 | return bufferSize_; |
683 | } |
684 | |
685 | //! \brief Get the current maximum buffer size |
686 | //! \returns the current maximum buffer size |
687 | uint32_t getMaxBufferSize() const { |
688 | return maxBufferSize_; |
689 | } |
690 | |
691 | //! \brief Change the maximum buffer size |
692 | //! \param[in] maxSize the new maximum buffer size allowed to grow to |
693 | //! \throws TTransportException(BAD_ARGS) if maxSize is less than the current buffer size |
694 | void setMaxBufferSize(uint32_t maxSize) { |
695 | if (maxSize < bufferSize_) { |
696 | throw TTransportException(TTransportException::BAD_ARGS, |
697 | "Maximum buffer size would be less than current buffer size" ); |
698 | } |
699 | maxBufferSize_ = maxSize; |
700 | } |
701 | |
702 | protected: |
703 | void swap(TMemoryBuffer& that) { |
704 | using std::swap; |
705 | swap(buffer_, that.buffer_); |
706 | swap(bufferSize_, that.bufferSize_); |
707 | |
708 | swap(rBase_, that.rBase_); |
709 | swap(rBound_, that.rBound_); |
710 | swap(wBase_, that.wBase_); |
711 | swap(wBound_, that.wBound_); |
712 | |
713 | swap(owner_, that.owner_); |
714 | } |
715 | |
716 | // Make sure there's at least 'len' bytes available for writing. |
717 | void ensureCanWrite(uint32_t len); |
718 | |
719 | // Compute the position and available data for reading. |
720 | void computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give); |
721 | |
722 | uint32_t readSlow(uint8_t* buf, uint32_t len); |
723 | |
724 | void writeSlow(const uint8_t* buf, uint32_t len); |
725 | |
726 | const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len); |
727 | |
728 | // Data buffer |
729 | uint8_t* buffer_; |
730 | |
731 | // Allocated buffer size |
732 | uint32_t bufferSize_; |
733 | |
734 | // Maximum allowed size |
735 | uint32_t maxBufferSize_; |
736 | |
737 | // Is this object the owner of the buffer? |
738 | bool owner_; |
739 | |
740 | // Don't forget to update constrctors, initCommon, and swap if |
741 | // you add new members. |
742 | }; |
743 | } |
744 | } |
745 | } // apache::thrift::transport |
746 | |
747 | #endif // #ifndef _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_ |
748 | |