| 1 | /* |
| 2 | * Licensed to the Apache Software Foundation (ASF) under one |
| 3 | * or more contributor license agreements. See the NOTICE file |
| 4 | * distributed with this work for additional information |
| 5 | * regarding copyright ownership. The ASF licenses this file |
| 6 | * to you under the Apache License, Version 2.0 (the |
| 7 | * "License"); you may not use this file except in compliance |
| 8 | * with the License. You may obtain a copy of the License at |
| 9 | * |
| 10 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | * |
| 12 | * Unless required by applicable law or agreed to in writing, |
| 13 | * software distributed under the License is distributed on an |
| 14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 15 | * KIND, either express or implied. See the License for the |
| 16 | * specific language governing permissions and limitations |
| 17 | * under the License. |
| 18 | */ |
| 19 | |
| 20 | #ifndef _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_ |
| 21 | #define _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_ 1 |
| 22 | |
| 23 | #include <cstdlib> |
| 24 | #include <cstring> |
| 25 | #include <limits> |
| 26 | #include <boost/scoped_array.hpp> |
| 27 | |
| 28 | #include <thrift/transport/TTransport.h> |
| 29 | #include <thrift/transport/TVirtualTransport.h> |
| 30 | |
// Branch-prediction hints used by the buffer fast paths.
//
// The condition is normalized with !! before it is handed to
// __builtin_expect: the builtin takes a long and compares it against the
// expected value, so the hint is only meaningful for a 0/1 operand, and a
// pointer-valued condition would not convert to long at all. Both branches of
// the #ifdef produce the same 0/1 result, so code behaves identically
// whether or not the builtin is available.
#ifdef __GNUC__
#define TDB_LIKELY(val) (__builtin_expect(!!(val), 1))
#define TDB_UNLIKELY(val) (__builtin_expect(!!(val), 0))
#else
#define TDB_LIKELY(val) (!!(val))
#define TDB_UNLIKELY(val) (!!(val))
#endif
| 38 | |
| 39 | namespace apache { |
| 40 | namespace thrift { |
| 41 | namespace transport { |
| 42 | |
| 43 | /** |
| 44 | * Base class for all transports that use read/write buffers for performance. |
| 45 | * |
| 46 | * TBufferBase is designed to implement the fast-path "memcpy" style |
| 47 | * operations that work in the common case. It does so with small and |
| 48 | * (eventually) nonvirtual, inlinable methods. TBufferBase is an abstract |
| 49 | * class. Subclasses are expected to define the "slow path" operations |
| 50 | * that have to be done when the buffers are full or empty. |
| 51 | * |
| 52 | */ |
| 53 | class TBufferBase : public TVirtualTransport<TBufferBase> { |
| 54 | |
| 55 | public: |
| 56 | /** |
| 57 | * Fast-path read. |
| 58 | * |
| 59 | * When we have enough data buffered to fulfill the read, we can satisfy it |
| 60 | * with a single memcpy, then adjust our internal pointers. If the buffer |
| 61 | * is empty, we call out to our slow path, implemented by a subclass. |
| 62 | * This method is meant to eventually be nonvirtual and inlinable. |
| 63 | */ |
| 64 | uint32_t read(uint8_t* buf, uint32_t len) { |
| 65 | uint8_t* new_rBase = rBase_ + len; |
| 66 | if (TDB_LIKELY(new_rBase <= rBound_)) { |
| 67 | std::memcpy(buf, rBase_, len); |
| 68 | rBase_ = new_rBase; |
| 69 | return len; |
| 70 | } |
| 71 | return readSlow(buf, len); |
| 72 | } |
| 73 | |
| 74 | /** |
| 75 | * Shortcutted version of readAll. |
| 76 | */ |
| 77 | uint32_t readAll(uint8_t* buf, uint32_t len) { |
| 78 | uint8_t* new_rBase = rBase_ + len; |
| 79 | if (TDB_LIKELY(new_rBase <= rBound_)) { |
| 80 | std::memcpy(buf, rBase_, len); |
| 81 | rBase_ = new_rBase; |
| 82 | return len; |
| 83 | } |
| 84 | return apache::thrift::transport::readAll(*this, buf, len); |
| 85 | } |
| 86 | |
| 87 | /** |
| 88 | * Fast-path write. |
| 89 | * |
| 90 | * When we have enough empty space in our buffer to accommodate the write, we |
| 91 | * can satisfy it with a single memcpy, then adjust our internal pointers. |
| 92 | * If the buffer is full, we call out to our slow path, implemented by a |
| 93 | * subclass. This method is meant to eventually be nonvirtual and |
| 94 | * inlinable. |
| 95 | */ |
| 96 | void write(const uint8_t* buf, uint32_t len) { |
| 97 | uint8_t* new_wBase = wBase_ + len; |
| 98 | if (TDB_LIKELY(new_wBase <= wBound_)) { |
| 99 | std::memcpy(wBase_, buf, len); |
| 100 | wBase_ = new_wBase; |
| 101 | return; |
| 102 | } |
| 103 | writeSlow(buf, len); |
| 104 | } |
| 105 | |
| 106 | /** |
| 107 | * Fast-path borrow. A lot like the fast-path read. |
| 108 | */ |
| 109 | const uint8_t* borrow(uint8_t* buf, uint32_t* len) { |
| 110 | if (TDB_LIKELY(static_cast<ptrdiff_t>(*len) <= rBound_ - rBase_)) { |
| 111 | // With strict aliasing, writing to len shouldn't force us to |
| 112 | // refetch rBase_ from memory. TODO(dreiss): Verify this. |
| 113 | *len = static_cast<uint32_t>(rBound_ - rBase_); |
| 114 | return rBase_; |
| 115 | } |
| 116 | return borrowSlow(buf, len); |
| 117 | } |
| 118 | |
| 119 | /** |
| 120 | * Consume doesn't require a slow path. |
| 121 | */ |
| 122 | void consume(uint32_t len) { |
| 123 | if (TDB_LIKELY(static_cast<ptrdiff_t>(len) <= rBound_ - rBase_)) { |
| 124 | rBase_ += len; |
| 125 | } else { |
| 126 | throw TTransportException(TTransportException::BAD_ARGS, "consume did not follow a borrow." ); |
| 127 | } |
| 128 | } |
| 129 | |
| 130 | protected: |
| 131 | /// Slow path read. |
| 132 | virtual uint32_t readSlow(uint8_t* buf, uint32_t len) = 0; |
| 133 | |
| 134 | /// Slow path write. |
| 135 | virtual void writeSlow(const uint8_t* buf, uint32_t len) = 0; |
| 136 | |
| 137 | /** |
| 138 | * Slow path borrow. |
| 139 | * |
| 140 | * POSTCONDITION: return == NULL || rBound_ - rBase_ >= *len |
| 141 | */ |
| 142 | virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) = 0; |
| 143 | |
| 144 | /** |
| 145 | * Trivial constructor. |
| 146 | * |
| 147 | * Initialize pointers safely. Constructing is not a very |
| 148 | * performance-sensitive operation, so it is okay to just leave it to |
| 149 | * the concrete class to set up pointers correctly. |
| 150 | */ |
| 151 | TBufferBase() : rBase_(NULL), rBound_(NULL), wBase_(NULL), wBound_(NULL) {} |
| 152 | |
| 153 | /// Convenience mutator for setting the read buffer. |
| 154 | void setReadBuffer(uint8_t* buf, uint32_t len) { |
| 155 | rBase_ = buf; |
| 156 | rBound_ = buf + len; |
| 157 | } |
| 158 | |
| 159 | /// Convenience mutator for setting the write buffer. |
| 160 | void setWriteBuffer(uint8_t* buf, uint32_t len) { |
| 161 | wBase_ = buf; |
| 162 | wBound_ = buf + len; |
| 163 | } |
| 164 | |
| 165 | virtual ~TBufferBase() {} |
| 166 | |
| 167 | /// Reads begin here. |
| 168 | uint8_t* rBase_; |
| 169 | /// Reads may extend to just before here. |
| 170 | uint8_t* rBound_; |
| 171 | |
| 172 | /// Writes begin here. |
| 173 | uint8_t* wBase_; |
| 174 | /// Writes may extend to just before here. |
| 175 | uint8_t* wBound_; |
| 176 | }; |
| 177 | |
| 178 | /** |
| 179 | * Buffered transport. For reads it will read more data than is requested |
| 180 | * and will serve future data out of a local buffer. For writes, data is |
| 181 | * stored to an in memory buffer before being written out. |
| 182 | * |
| 183 | */ |
| 184 | class TBufferedTransport : public TVirtualTransport<TBufferedTransport, TBufferBase> { |
| 185 | public: |
| 186 | static const int DEFAULT_BUFFER_SIZE = 512; |
| 187 | |
| 188 | /// Use default buffer sizes. |
| 189 | TBufferedTransport(stdcxx::shared_ptr<TTransport> transport) |
| 190 | : transport_(transport), |
| 191 | rBufSize_(DEFAULT_BUFFER_SIZE), |
| 192 | wBufSize_(DEFAULT_BUFFER_SIZE), |
| 193 | rBuf_(new uint8_t[rBufSize_]), |
| 194 | wBuf_(new uint8_t[wBufSize_]) { |
| 195 | initPointers(); |
| 196 | } |
| 197 | |
| 198 | /// Use specified buffer sizes. |
| 199 | TBufferedTransport(stdcxx::shared_ptr<TTransport> transport, uint32_t sz) |
| 200 | : transport_(transport), |
| 201 | rBufSize_(sz), |
| 202 | wBufSize_(sz), |
| 203 | rBuf_(new uint8_t[rBufSize_]), |
| 204 | wBuf_(new uint8_t[wBufSize_]) { |
| 205 | initPointers(); |
| 206 | } |
| 207 | |
| 208 | /// Use specified read and write buffer sizes. |
| 209 | TBufferedTransport(stdcxx::shared_ptr<TTransport> transport, uint32_t rsz, uint32_t wsz) |
| 210 | : transport_(transport), |
| 211 | rBufSize_(rsz), |
| 212 | wBufSize_(wsz), |
| 213 | rBuf_(new uint8_t[rBufSize_]), |
| 214 | wBuf_(new uint8_t[wBufSize_]) { |
| 215 | initPointers(); |
| 216 | } |
| 217 | |
| 218 | void open() { transport_->open(); } |
| 219 | |
| 220 | bool isOpen() { return transport_->isOpen(); } |
| 221 | |
| 222 | bool peek() { |
| 223 | if (rBase_ == rBound_) { |
| 224 | setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_)); |
| 225 | } |
| 226 | return (rBound_ > rBase_); |
| 227 | } |
| 228 | |
| 229 | void close() { |
| 230 | flush(); |
| 231 | transport_->close(); |
| 232 | } |
| 233 | |
| 234 | virtual uint32_t readSlow(uint8_t* buf, uint32_t len); |
| 235 | |
| 236 | virtual void writeSlow(const uint8_t* buf, uint32_t len); |
| 237 | |
| 238 | void flush(); |
| 239 | |
| 240 | /** |
| 241 | * Returns the origin of the underlying transport |
| 242 | */ |
| 243 | virtual const std::string getOrigin() { return transport_->getOrigin(); } |
| 244 | |
| 245 | /** |
| 246 | * The following behavior is currently implemented by TBufferedTransport, |
| 247 | * but that may change in a future version: |
| 248 | * 1/ If len is at most rBufSize_, borrow will never return NULL. |
| 249 | * Depending on the underlying transport, it could throw an exception |
| 250 | * or hang forever. |
| 251 | * 2/ Some borrow requests may copy bytes internally. However, |
| 252 | * if len is at most rBufSize_/2, none of the copied bytes |
| 253 | * will ever have to be copied again. For optimial performance, |
| 254 | * stay under this limit. |
| 255 | */ |
| 256 | virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len); |
| 257 | |
| 258 | stdcxx::shared_ptr<TTransport> getUnderlyingTransport() { return transport_; } |
| 259 | |
| 260 | /* |
| 261 | * TVirtualTransport provides a default implementation of readAll(). |
| 262 | * We want to use the TBufferBase version instead. |
| 263 | */ |
| 264 | uint32_t readAll(uint8_t* buf, uint32_t len) { return TBufferBase::readAll(buf, len); } |
| 265 | |
| 266 | protected: |
| 267 | void initPointers() { |
| 268 | setReadBuffer(rBuf_.get(), 0); |
| 269 | setWriteBuffer(wBuf_.get(), wBufSize_); |
| 270 | // Write size never changes. |
| 271 | } |
| 272 | |
| 273 | stdcxx::shared_ptr<TTransport> transport_; |
| 274 | |
| 275 | uint32_t rBufSize_; |
| 276 | uint32_t wBufSize_; |
| 277 | boost::scoped_array<uint8_t> rBuf_; |
| 278 | boost::scoped_array<uint8_t> wBuf_; |
| 279 | }; |
| 280 | |
| 281 | /** |
| 282 | * Wraps a transport into a buffered one. |
| 283 | * |
| 284 | */ |
| 285 | class TBufferedTransportFactory : public TTransportFactory { |
| 286 | public: |
| 287 | TBufferedTransportFactory() {} |
| 288 | |
| 289 | virtual ~TBufferedTransportFactory() {} |
| 290 | |
| 291 | /** |
| 292 | * Wraps the transport into a buffered one. |
| 293 | */ |
| 294 | virtual stdcxx::shared_ptr<TTransport> getTransport(stdcxx::shared_ptr<TTransport> trans) { |
| 295 | return stdcxx::shared_ptr<TTransport>(new TBufferedTransport(trans)); |
| 296 | } |
| 297 | }; |
| 298 | |
| 299 | /** |
| 300 | * Framed transport. All writes go into an in-memory buffer until flush is |
| 301 | * called, at which point the transport writes the length of the entire |
| 302 | * binary chunk followed by the data payload. This allows the receiver on the |
| 303 | * other end to always do fixed-length reads. |
| 304 | * |
| 305 | */ |
| 306 | class TFramedTransport : public TVirtualTransport<TFramedTransport, TBufferBase> { |
| 307 | public: |
| 308 | static const int DEFAULT_BUFFER_SIZE = 512; |
| 309 | static const int DEFAULT_MAX_FRAME_SIZE = 256 * 1024 * 1024; |
| 310 | |
| 311 | /// Use default buffer sizes. |
| 312 | TFramedTransport() |
| 313 | : transport_(), |
| 314 | rBufSize_(0), |
| 315 | wBufSize_(DEFAULT_BUFFER_SIZE), |
| 316 | rBuf_(), |
| 317 | wBuf_(new uint8_t[wBufSize_]), |
| 318 | bufReclaimThresh_((std::numeric_limits<uint32_t>::max)()) { |
| 319 | initPointers(); |
| 320 | } |
| 321 | |
| 322 | TFramedTransport(stdcxx::shared_ptr<TTransport> transport) |
| 323 | : transport_(transport), |
| 324 | rBufSize_(0), |
| 325 | wBufSize_(DEFAULT_BUFFER_SIZE), |
| 326 | rBuf_(), |
| 327 | wBuf_(new uint8_t[wBufSize_]), |
| 328 | bufReclaimThresh_((std::numeric_limits<uint32_t>::max)()), |
| 329 | maxFrameSize_(DEFAULT_MAX_FRAME_SIZE) { |
| 330 | initPointers(); |
| 331 | } |
| 332 | |
| 333 | TFramedTransport(stdcxx::shared_ptr<TTransport> transport, |
| 334 | uint32_t sz, |
| 335 | uint32_t bufReclaimThresh = (std::numeric_limits<uint32_t>::max)()) |
| 336 | : transport_(transport), |
| 337 | rBufSize_(0), |
| 338 | wBufSize_(sz), |
| 339 | rBuf_(), |
| 340 | wBuf_(new uint8_t[wBufSize_]), |
| 341 | bufReclaimThresh_(bufReclaimThresh), |
| 342 | maxFrameSize_(DEFAULT_MAX_FRAME_SIZE) { |
| 343 | initPointers(); |
| 344 | } |
| 345 | |
| 346 | void open() { transport_->open(); } |
| 347 | |
| 348 | bool isOpen() { return transport_->isOpen(); } |
| 349 | |
| 350 | bool peek() { return (rBase_ < rBound_) || transport_->peek(); } |
| 351 | |
| 352 | void close() { |
| 353 | flush(); |
| 354 | transport_->close(); |
| 355 | } |
| 356 | |
| 357 | virtual uint32_t readSlow(uint8_t* buf, uint32_t len); |
| 358 | |
| 359 | virtual void writeSlow(const uint8_t* buf, uint32_t len); |
| 360 | |
| 361 | virtual void flush(); |
| 362 | |
| 363 | uint32_t readEnd(); |
| 364 | |
| 365 | uint32_t writeEnd(); |
| 366 | |
| 367 | const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len); |
| 368 | |
| 369 | stdcxx::shared_ptr<TTransport> getUnderlyingTransport() { return transport_; } |
| 370 | |
| 371 | /* |
| 372 | * TVirtualTransport provides a default implementation of readAll(). |
| 373 | * We want to use the TBufferBase version instead. |
| 374 | */ |
| 375 | using TBufferBase::readAll; |
| 376 | |
| 377 | /** |
| 378 | * Returns the origin of the underlying transport |
| 379 | */ |
| 380 | virtual const std::string getOrigin() { return transport_->getOrigin(); } |
| 381 | |
| 382 | /** |
| 383 | * Set the maximum size of the frame at read |
| 384 | */ |
| 385 | void setMaxFrameSize(uint32_t maxFrameSize) { maxFrameSize_ = maxFrameSize; } |
| 386 | |
| 387 | /** |
| 388 | * Get the maximum size of the frame at read |
| 389 | */ |
| 390 | uint32_t getMaxFrameSize() { return maxFrameSize_; } |
| 391 | |
| 392 | protected: |
| 393 | /** |
| 394 | * Reads a frame of input from the underlying stream. |
| 395 | * |
| 396 | * Returns true if a frame was read successfully, or false on EOF. |
| 397 | * (Raises a TTransportException if EOF occurs after a partial frame.) |
| 398 | */ |
| 399 | virtual bool readFrame(); |
| 400 | |
| 401 | void initPointers() { |
| 402 | setReadBuffer(NULL, 0); |
| 403 | setWriteBuffer(wBuf_.get(), wBufSize_); |
| 404 | |
| 405 | // Pad the buffer so we can insert the size later. |
| 406 | int32_t pad = 0; |
| 407 | this->write((uint8_t*)&pad, sizeof(pad)); |
| 408 | } |
| 409 | |
| 410 | stdcxx::shared_ptr<TTransport> transport_; |
| 411 | |
| 412 | uint32_t rBufSize_; |
| 413 | uint32_t wBufSize_; |
| 414 | boost::scoped_array<uint8_t> rBuf_; |
| 415 | boost::scoped_array<uint8_t> wBuf_; |
| 416 | uint32_t bufReclaimThresh_; |
| 417 | uint32_t maxFrameSize_; |
| 418 | }; |
| 419 | |
| 420 | /** |
| 421 | * Wraps a transport into a framed one. |
| 422 | * |
| 423 | */ |
| 424 | class TFramedTransportFactory : public TTransportFactory { |
| 425 | public: |
| 426 | TFramedTransportFactory() {} |
| 427 | |
| 428 | virtual ~TFramedTransportFactory() {} |
| 429 | |
| 430 | /** |
| 431 | * Wraps the transport into a framed one. |
| 432 | */ |
| 433 | virtual stdcxx::shared_ptr<TTransport> getTransport(stdcxx::shared_ptr<TTransport> trans) { |
| 434 | return stdcxx::shared_ptr<TTransport>(new TFramedTransport(trans)); |
| 435 | } |
| 436 | }; |
| 437 | |
| 438 | /** |
| 439 | * A memory buffer is a tranpsort that simply reads from and writes to an |
| 440 | * in memory buffer. Anytime you call write on it, the data is simply placed |
| 441 | * into a buffer, and anytime you call read, data is read from that buffer. |
| 442 | * |
| 443 | * The buffers are allocated using C constructs malloc,realloc, and the size |
| 444 | * doubles as necessary. We've considered using scoped |
| 445 | * |
| 446 | */ |
| 447 | class TMemoryBuffer : public TVirtualTransport<TMemoryBuffer, TBufferBase> { |
| 448 | private: |
| 449 | // Common initialization done by all constructors. |
| 450 | void initCommon(uint8_t* buf, uint32_t size, bool owner, uint32_t wPos) { |
| 451 | |
| 452 | maxBufferSize_ = std::numeric_limits<uint32_t>::max(); |
| 453 | |
| 454 | if (buf == NULL && size != 0) { |
| 455 | assert(owner); |
| 456 | buf = (uint8_t*)std::malloc(size); |
| 457 | if (buf == NULL) { |
| 458 | throw std::bad_alloc(); |
| 459 | } |
| 460 | } |
| 461 | |
| 462 | buffer_ = buf; |
| 463 | bufferSize_ = size; |
| 464 | |
| 465 | rBase_ = buffer_; |
| 466 | rBound_ = buffer_ + wPos; |
| 467 | // TODO(dreiss): Investigate NULL-ing this if !owner. |
| 468 | wBase_ = buffer_ + wPos; |
| 469 | wBound_ = buffer_ + bufferSize_; |
| 470 | |
| 471 | owner_ = owner; |
| 472 | |
| 473 | // rBound_ is really an artifact. In principle, it should always be |
| 474 | // equal to wBase_. We update it in a few places (computeRead, etc.). |
| 475 | } |
| 476 | |
| 477 | public: |
| 478 | static const uint32_t defaultSize = 1024; |
| 479 | |
| 480 | /** |
| 481 | * This enum specifies how a TMemoryBuffer should treat |
| 482 | * memory passed to it via constructors or resetBuffer. |
| 483 | * |
| 484 | * OBSERVE: |
| 485 | * TMemoryBuffer will simply store a pointer to the memory. |
| 486 | * It is the callers responsibility to ensure that the pointer |
| 487 | * remains valid for the lifetime of the TMemoryBuffer, |
| 488 | * and that it is properly cleaned up. |
| 489 | * Note that no data can be written to observed buffers. |
| 490 | * |
| 491 | * COPY: |
| 492 | * TMemoryBuffer will make an internal copy of the buffer. |
| 493 | * The caller has no responsibilities. |
| 494 | * |
| 495 | * TAKE_OWNERSHIP: |
| 496 | * TMemoryBuffer will become the "owner" of the buffer, |
| 497 | * and will be responsible for freeing it. |
| 498 | * The membory must have been allocated with malloc. |
| 499 | */ |
| 500 | enum MemoryPolicy { OBSERVE = 1, COPY = 2, TAKE_OWNERSHIP = 3 }; |
| 501 | |
| 502 | /** |
| 503 | * Construct a TMemoryBuffer with a default-sized buffer, |
| 504 | * owned by the TMemoryBuffer object. |
| 505 | */ |
| 506 | TMemoryBuffer() { initCommon(NULL, defaultSize, true, 0); } |
| 507 | |
| 508 | /** |
| 509 | * Construct a TMemoryBuffer with a buffer of a specified size, |
| 510 | * owned by the TMemoryBuffer object. |
| 511 | * |
| 512 | * @param sz The initial size of the buffer. |
| 513 | */ |
| 514 | TMemoryBuffer(uint32_t sz) { initCommon(NULL, sz, true, 0); } |
| 515 | |
| 516 | /** |
| 517 | * Construct a TMemoryBuffer with buf as its initial contents. |
| 518 | * |
| 519 | * @param buf The initial contents of the buffer. |
| 520 | * Note that, while buf is a non-const pointer, |
| 521 | * TMemoryBuffer will not write to it if policy == OBSERVE, |
| 522 | * so it is safe to const_cast<uint8_t*>(whatever). |
| 523 | * @param sz The size of @c buf. |
| 524 | * @param policy See @link MemoryPolicy @endlink . |
| 525 | */ |
| 526 | TMemoryBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) { |
| 527 | if (buf == NULL && sz != 0) { |
| 528 | throw TTransportException(TTransportException::BAD_ARGS, |
| 529 | "TMemoryBuffer given null buffer with non-zero size." ); |
| 530 | } |
| 531 | |
| 532 | switch (policy) { |
| 533 | case OBSERVE: |
| 534 | case TAKE_OWNERSHIP: |
| 535 | initCommon(buf, sz, policy == TAKE_OWNERSHIP, sz); |
| 536 | break; |
| 537 | case COPY: |
| 538 | initCommon(NULL, sz, true, 0); |
| 539 | this->write(buf, sz); |
| 540 | break; |
| 541 | default: |
| 542 | throw TTransportException(TTransportException::BAD_ARGS, |
| 543 | "Invalid MemoryPolicy for TMemoryBuffer" ); |
| 544 | } |
| 545 | } |
| 546 | |
| 547 | ~TMemoryBuffer() { |
| 548 | if (owner_) { |
| 549 | std::free(buffer_); |
| 550 | } |
| 551 | } |
| 552 | |
| 553 | bool isOpen() { return true; } |
| 554 | |
| 555 | bool peek() { return (rBase_ < wBase_); } |
| 556 | |
| 557 | void open() {} |
| 558 | |
| 559 | void close() {} |
| 560 | |
| 561 | // TODO(dreiss): Make bufPtr const. |
| 562 | void getBuffer(uint8_t** bufPtr, uint32_t* sz) { |
| 563 | *bufPtr = rBase_; |
| 564 | *sz = static_cast<uint32_t>(wBase_ - rBase_); |
| 565 | } |
| 566 | |
| 567 | std::string getBufferAsString() { |
| 568 | if (buffer_ == NULL) { |
| 569 | return "" ; |
| 570 | } |
| 571 | uint8_t* buf; |
| 572 | uint32_t sz; |
| 573 | getBuffer(&buf, &sz); |
| 574 | return std::string((char*)buf, (std::string::size_type)sz); |
| 575 | } |
| 576 | |
| 577 | void appendBufferToString(std::string& str) { |
| 578 | if (buffer_ == NULL) { |
| 579 | return; |
| 580 | } |
| 581 | uint8_t* buf; |
| 582 | uint32_t sz; |
| 583 | getBuffer(&buf, &sz); |
| 584 | str.append((char*)buf, sz); |
| 585 | } |
| 586 | |
| 587 | void resetBuffer() { |
| 588 | rBase_ = buffer_; |
| 589 | rBound_ = buffer_; |
| 590 | wBase_ = buffer_; |
| 591 | // It isn't safe to write into a buffer we don't own. |
| 592 | if (!owner_) { |
| 593 | wBound_ = wBase_; |
| 594 | bufferSize_ = 0; |
| 595 | } |
| 596 | } |
| 597 | |
| 598 | /// See constructor documentation. |
| 599 | void resetBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) { |
| 600 | // Use a variant of the copy-and-swap trick for assignment operators. |
| 601 | // This is sub-optimal in terms of performance for two reasons: |
| 602 | // 1/ The constructing and swapping of the (small) values |
| 603 | // in the temporary object takes some time, and is not necessary. |
| 604 | // 2/ If policy == COPY, we allocate the new buffer before |
| 605 | // freeing the old one, precluding the possibility of |
| 606 | // reusing that memory. |
| 607 | // I doubt that either of these problems could be optimized away, |
| 608 | // but the second is probably no a common case, and the first is minor. |
| 609 | // I don't expect resetBuffer to be a common operation, so I'm willing to |
| 610 | // bite the performance bullet to make the method this simple. |
| 611 | |
| 612 | // Construct the new buffer. |
| 613 | TMemoryBuffer new_buffer(buf, sz, policy); |
| 614 | // Move it into ourself. |
| 615 | this->swap(new_buffer); |
| 616 | // Our old self gets destroyed. |
| 617 | } |
| 618 | |
| 619 | /// See constructor documentation. |
| 620 | void resetBuffer(uint32_t sz) { |
| 621 | // Construct the new buffer. |
| 622 | TMemoryBuffer new_buffer(sz); |
| 623 | // Move it into ourself. |
| 624 | this->swap(new_buffer); |
| 625 | // Our old self gets destroyed. |
| 626 | } |
| 627 | |
| 628 | std::string readAsString(uint32_t len) { |
| 629 | std::string str; |
| 630 | (void)readAppendToString(str, len); |
| 631 | return str; |
| 632 | } |
| 633 | |
| 634 | uint32_t readAppendToString(std::string& str, uint32_t len); |
| 635 | |
| 636 | // return number of bytes read |
| 637 | uint32_t readEnd() { |
| 638 | // This cast should be safe, because buffer_'s size is a uint32_t |
| 639 | uint32_t bytes = static_cast<uint32_t>(rBase_ - buffer_); |
| 640 | if (rBase_ == wBase_) { |
| 641 | resetBuffer(); |
| 642 | } |
| 643 | return bytes; |
| 644 | } |
| 645 | |
| 646 | // Return number of bytes written |
| 647 | uint32_t writeEnd() { |
| 648 | // This cast should be safe, because buffer_'s size is a uint32_t |
| 649 | return static_cast<uint32_t>(wBase_ - buffer_); |
| 650 | } |
| 651 | |
| 652 | uint32_t available_read() const { |
| 653 | // Remember, wBase_ is the real rBound_. |
| 654 | return static_cast<uint32_t>(wBase_ - rBase_); |
| 655 | } |
| 656 | |
| 657 | uint32_t available_write() const { return static_cast<uint32_t>(wBound_ - wBase_); } |
| 658 | |
| 659 | // Returns a pointer to where the client can write data to append to |
| 660 | // the TMemoryBuffer, and ensures the buffer is big enough to accommodate a |
| 661 | // write of the provided length. The returned pointer is very convenient for |
| 662 | // passing to read(), recv(), or similar. You must call wroteBytes() as soon |
| 663 | // as data is written or the buffer will not be aware that data has changed. |
| 664 | uint8_t* getWritePtr(uint32_t len) { |
| 665 | ensureCanWrite(len); |
| 666 | return wBase_; |
| 667 | } |
| 668 | |
| 669 | // Informs the buffer that the client has written 'len' bytes into storage |
| 670 | // that had been provided by getWritePtr(). |
| 671 | void wroteBytes(uint32_t len); |
| 672 | |
| 673 | /* |
| 674 | * TVirtualTransport provides a default implementation of readAll(). |
| 675 | * We want to use the TBufferBase version instead. |
| 676 | */ |
| 677 | uint32_t readAll(uint8_t* buf, uint32_t len) { return TBufferBase::readAll(buf, len); } |
| 678 | |
| 679 | //! \brief Get the current buffer size |
| 680 | //! \returns the current buffer size |
| 681 | uint32_t getBufferSize() const { |
| 682 | return bufferSize_; |
| 683 | } |
| 684 | |
| 685 | //! \brief Get the current maximum buffer size |
| 686 | //! \returns the current maximum buffer size |
| 687 | uint32_t getMaxBufferSize() const { |
| 688 | return maxBufferSize_; |
| 689 | } |
| 690 | |
| 691 | //! \brief Change the maximum buffer size |
| 692 | //! \param[in] maxSize the new maximum buffer size allowed to grow to |
| 693 | //! \throws TTransportException(BAD_ARGS) if maxSize is less than the current buffer size |
| 694 | void setMaxBufferSize(uint32_t maxSize) { |
| 695 | if (maxSize < bufferSize_) { |
| 696 | throw TTransportException(TTransportException::BAD_ARGS, |
| 697 | "Maximum buffer size would be less than current buffer size" ); |
| 698 | } |
| 699 | maxBufferSize_ = maxSize; |
| 700 | } |
| 701 | |
| 702 | protected: |
| 703 | void swap(TMemoryBuffer& that) { |
| 704 | using std::swap; |
| 705 | swap(buffer_, that.buffer_); |
| 706 | swap(bufferSize_, that.bufferSize_); |
| 707 | |
| 708 | swap(rBase_, that.rBase_); |
| 709 | swap(rBound_, that.rBound_); |
| 710 | swap(wBase_, that.wBase_); |
| 711 | swap(wBound_, that.wBound_); |
| 712 | |
| 713 | swap(owner_, that.owner_); |
| 714 | } |
| 715 | |
| 716 | // Make sure there's at least 'len' bytes available for writing. |
| 717 | void ensureCanWrite(uint32_t len); |
| 718 | |
| 719 | // Compute the position and available data for reading. |
| 720 | void computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give); |
| 721 | |
| 722 | uint32_t readSlow(uint8_t* buf, uint32_t len); |
| 723 | |
| 724 | void writeSlow(const uint8_t* buf, uint32_t len); |
| 725 | |
| 726 | const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len); |
| 727 | |
| 728 | // Data buffer |
| 729 | uint8_t* buffer_; |
| 730 | |
| 731 | // Allocated buffer size |
| 732 | uint32_t bufferSize_; |
| 733 | |
| 734 | // Maximum allowed size |
| 735 | uint32_t maxBufferSize_; |
| 736 | |
| 737 | // Is this object the owner of the buffer? |
| 738 | bool owner_; |
| 739 | |
| 740 | // Don't forget to update constrctors, initCommon, and swap if |
| 741 | // you add new members. |
| 742 | }; |
| 743 | } |
| 744 | } |
| 745 | } // apache::thrift::transport |
| 746 | |
| 747 | #endif // #ifndef _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_ |
| 748 | |