1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#ifndef _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_
21#define _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_ 1
22
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <limits>
#include <new>
#include <string>
#include <utility>
//#include <boost/scoped_array.hpp> // disabled: nothing in this header uses scoped_array
28
29#include <thrift/transport/TTransport.h>
30#include <thrift/transport/TVirtualTransport.h>
31
// Branch-prediction hints used by the buffer fast paths.
//
// The argument is normalized to 0/1 with `!!` before it reaches
// __builtin_expect. __builtin_expect compares its first argument with the
// expected value, so an un-normalized value such as 5 would never "match" the
// hint 1. A pointer argument would not even compile, because there is no
// implicit pointer -> long conversion.
#ifdef __GNUC__
#define TDB_LIKELY(val) (__builtin_expect(!!(val), 1))
#define TDB_UNLIKELY(val) (__builtin_expect(!!(val), 0))
#else
#define TDB_LIKELY(val) (!!(val))
#define TDB_UNLIKELY(val) (!!(val))
#endif
39
40namespace apache {
41namespace thrift {
42namespace transport {
43
44/**
45 * Base class for all transports that use read/write buffers for performance.
46 *
47 * TBufferBase is designed to implement the fast-path "memcpy" style
48 * operations that work in the common case. It does so with small and
49 * (eventually) nonvirtual, inlinable methods. TBufferBase is an abstract
50 * class. Subclasses are expected to define the "slow path" operations
51 * that have to be done when the buffers are full or empty.
52 *
53 */
54class TBufferBase : public TVirtualTransport<TBufferBase> {
55
56public:
57 /**
58 * Fast-path read.
59 *
60 * When we have enough data buffered to fulfill the read, we can satisfy it
61 * with a single memcpy, then adjust our internal pointers. If the buffer
62 * is empty, we call out to our slow path, implemented by a subclass.
63 * This method is meant to eventually be nonvirtual and inlinable.
64 */
65 uint32_t read(uint8_t* buf, uint32_t len) {
66 uint8_t* new_rBase = rBase_ + len;
67 if (TDB_LIKELY(new_rBase <= rBound_)) {
68 std::memcpy(buf, rBase_, len);
69 rBase_ = new_rBase;
70 return len;
71 }
72 return readSlow(buf, len);
73 }
74
75 /**
76 * Shortcutted version of readAll.
77 */
78 uint32_t readAll(uint8_t* buf, uint32_t len) {
79 uint8_t* new_rBase = rBase_ + len;
80 if (TDB_LIKELY(new_rBase <= rBound_)) {
81 std::memcpy(buf, rBase_, len);
82 rBase_ = new_rBase;
83 return len;
84 }
85 return apache::thrift::transport::readAll(*this, buf, len);
86 }
87
88 /**
89 * Fast-path write.
90 *
91 * When we have enough empty space in our buffer to accommodate the write, we
92 * can satisfy it with a single memcpy, then adjust our internal pointers.
93 * If the buffer is full, we call out to our slow path, implemented by a
94 * subclass. This method is meant to eventually be nonvirtual and
95 * inlinable.
96 */
97 void write(const uint8_t* buf, uint32_t len) {
98 uint8_t* new_wBase = wBase_ + len;
99 if (TDB_LIKELY(new_wBase <= wBound_)) {
100 std::memcpy(wBase_, buf, len);
101 wBase_ = new_wBase;
102 return;
103 }
104 writeSlow(buf, len);
105 }
106
107 /**
108 * Fast-path borrow. A lot like the fast-path read.
109 */
110 const uint8_t* borrow(uint8_t* buf, uint32_t* len) {
111 if (TDB_LIKELY(static_cast<ptrdiff_t>(*len) <= rBound_ - rBase_)) {
112 // With strict aliasing, writing to len shouldn't force us to
113 // refetch rBase_ from memory. TODO(dreiss): Verify this.
114 *len = static_cast<uint32_t>(rBound_ - rBase_);
115 return rBase_;
116 }
117 return borrowSlow(buf, len);
118 }
119
120 /**
121 * Consume doesn't require a slow path.
122 */
123 void consume(uint32_t len) {
124 if (TDB_LIKELY(static_cast<ptrdiff_t>(len) <= rBound_ - rBase_)) {
125 rBase_ += len;
126 } else {
127 throw TTransportException(TTransportException::BAD_ARGS, "consume did not follow a borrow.");
128 }
129 }
130
131protected:
132 /// Slow path read.
133 virtual uint32_t readSlow(uint8_t* buf, uint32_t len) = 0;
134
135 /// Slow path write.
136 virtual void writeSlow(const uint8_t* buf, uint32_t len) = 0;
137
138 /**
139 * Slow path borrow.
140 *
141 * POSTCONDITION: return == NULL || rBound_ - rBase_ >= *len
142 */
143 virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) = 0;
144
145 /**
146 * Trivial constructor.
147 *
148 * Initialize pointers safely. Constructing is not a very
149 * performance-sensitive operation, so it is okay to just leave it to
150 * the concrete class to set up pointers correctly.
151 */
152 TBufferBase() : rBase_(nullptr), rBound_(nullptr), wBase_(nullptr), wBound_(nullptr) {}
153
154 /// Convenience mutator for setting the read buffer.
155 void setReadBuffer(uint8_t* buf, uint32_t len) {
156 rBase_ = buf;
157 rBound_ = buf + len;
158 }
159
160 /// Convenience mutator for setting the write buffer.
161 void setWriteBuffer(uint8_t* buf, uint32_t len) {
162 wBase_ = buf;
163 wBound_ = buf + len;
164 }
165
166 ~TBufferBase() override = default;
167
168 /// Reads begin here.
169 uint8_t* rBase_;
170 /// Reads may extend to just before here.
171 uint8_t* rBound_;
172
173 /// Writes begin here.
174 uint8_t* wBase_;
175 /// Writes may extend to just before here.
176 uint8_t* wBound_;
177};
178
179
180/**
181 * A memory buffer is a tranpsort that simply reads from and writes to an
182 * in memory buffer. Anytime you call write on it, the data is simply placed
183 * into a buffer, and anytime you call read, data is read from that buffer.
184 *
185 * The buffers are allocated using C constructs malloc,realloc, and the size
186 * doubles as necessary. We've considered using scoped
187 *
188 */
189class TMemoryBuffer : public TVirtualTransport<TMemoryBuffer, TBufferBase> {
190private:
191 // Common initialization done by all constructors.
192 void initCommon(uint8_t* buf, uint32_t size, bool owner, uint32_t wPos) {
193
194 maxBufferSize_ = (std::numeric_limits<uint32_t>::max)();
195
196 if (buf == nullptr && size != 0) {
197 assert(owner);
198 buf = (uint8_t*)std::malloc(size);
199 if (buf == nullptr) {
200 throw std::bad_alloc();
201 }
202 }
203
204 buffer_ = buf;
205 bufferSize_ = size;
206
207 rBase_ = buffer_;
208 rBound_ = buffer_ + wPos;
209 // TODO(dreiss): Investigate NULL-ing this if !owner.
210 wBase_ = buffer_ + wPos;
211 wBound_ = buffer_ + bufferSize_;
212
213 owner_ = owner;
214
215 // rBound_ is really an artifact. In principle, it should always be
216 // equal to wBase_. We update it in a few places (computeRead, etc.).
217 }
218
219public:
220 static const uint32_t defaultSize = 1024;
221
222 /**
223 * This enum specifies how a TMemoryBuffer should treat
224 * memory passed to it via constructors or resetBuffer.
225 *
226 * OBSERVE:
227 * TMemoryBuffer will simply store a pointer to the memory.
228 * It is the callers responsibility to ensure that the pointer
229 * remains valid for the lifetime of the TMemoryBuffer,
230 * and that it is properly cleaned up.
231 * Note that no data can be written to observed buffers.
232 *
233 * COPY:
234 * TMemoryBuffer will make an internal copy of the buffer.
235 * The caller has no responsibilities.
236 *
237 * TAKE_OWNERSHIP:
238 * TMemoryBuffer will become the "owner" of the buffer,
239 * and will be responsible for freeing it.
240 * The membory must have been allocated with malloc.
241 */
242 enum MemoryPolicy { OBSERVE = 1, COPY = 2, TAKE_OWNERSHIP = 3 };
243
244 /**
245 * Construct a TMemoryBuffer with a default-sized buffer,
246 * owned by the TMemoryBuffer object.
247 */
248 TMemoryBuffer() { initCommon(nullptr, defaultSize, true, 0); }
249
250 /**
251 * Construct a TMemoryBuffer with a buffer of a specified size,
252 * owned by the TMemoryBuffer object.
253 *
254 * @param sz The initial size of the buffer.
255 */
256 TMemoryBuffer(uint32_t sz) { initCommon(nullptr, sz, true, 0); }
257
258 /**
259 * Construct a TMemoryBuffer with buf as its initial contents.
260 *
261 * @param buf The initial contents of the buffer.
262 * Note that, while buf is a non-const pointer,
263 * TMemoryBuffer will not write to it if policy == OBSERVE,
264 * so it is safe to const_cast<uint8_t*>(whatever).
265 * @param sz The size of @c buf.
266 * @param policy See @link MemoryPolicy @endlink .
267 */
268 TMemoryBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) {
269 if (buf == nullptr && sz != 0) {
270 throw TTransportException(TTransportException::BAD_ARGS,
271 "TMemoryBuffer given null buffer with non-zero size.");
272 }
273
274 switch (policy) {
275 case OBSERVE:
276 case TAKE_OWNERSHIP:
277 initCommon(buf, sz, policy == TAKE_OWNERSHIP, sz);
278 break;
279 case COPY:
280 initCommon(nullptr, sz, true, 0);
281 this->write(buf, sz);
282 break;
283 default:
284 throw TTransportException(TTransportException::BAD_ARGS,
285 "Invalid MemoryPolicy for TMemoryBuffer");
286 }
287 }
288
289 ~TMemoryBuffer() override {
290 if (owner_) {
291 std::free(buffer_);
292 }
293 }
294
295 bool isOpen() const override { return true; }
296
297 bool peek() override { return (rBase_ < wBase_); }
298
299 void open() override {}
300
301 void close() override {}
302
303 // TODO(dreiss): Make bufPtr const.
304 void getBuffer(uint8_t** bufPtr, uint32_t* sz) {
305 *bufPtr = rBase_;
306 *sz = static_cast<uint32_t>(wBase_ - rBase_);
307 }
308
309 std::string getBufferAsString() {
310 if (buffer_ == nullptr) {
311 return "";
312 }
313 uint8_t* buf;
314 uint32_t sz;
315 getBuffer(&buf, &sz);
316 return std::string((char*)buf, (std::string::size_type)sz);
317 }
318
319 void appendBufferToString(std::string& str) {
320 if (buffer_ == nullptr) {
321 return;
322 }
323 uint8_t* buf;
324 uint32_t sz;
325 getBuffer(&buf, &sz);
326 str.append((char*)buf, sz);
327 }
328
329 void resetBuffer() {
330 rBase_ = buffer_;
331 rBound_ = buffer_;
332 wBase_ = buffer_;
333 // It isn't safe to write into a buffer we don't own.
334 if (!owner_) {
335 wBound_ = wBase_;
336 bufferSize_ = 0;
337 }
338 }
339
340 /// See constructor documentation.
341 void resetBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) {
342 // Use a variant of the copy-and-swap trick for assignment operators.
343 // This is sub-optimal in terms of performance for two reasons:
344 // 1/ The constructing and swapping of the (small) values
345 // in the temporary object takes some time, and is not necessary.
346 // 2/ If policy == COPY, we allocate the new buffer before
347 // freeing the old one, precluding the possibility of
348 // reusing that memory.
349 // I doubt that either of these problems could be optimized away,
350 // but the second is probably no a common case, and the first is minor.
351 // I don't expect resetBuffer to be a common operation, so I'm willing to
352 // bite the performance bullet to make the method this simple.
353
354 // Construct the new buffer.
355 TMemoryBuffer new_buffer(buf, sz, policy);
356 // Move it into ourself.
357 this->swap(new_buffer);
358 // Our old self gets destroyed.
359 }
360
361 /// See constructor documentation.
362 void resetBuffer(uint32_t sz) {
363 // Construct the new buffer.
364 TMemoryBuffer new_buffer(sz);
365 // Move it into ourself.
366 this->swap(new_buffer);
367 // Our old self gets destroyed.
368 }
369
370 std::string readAsString(uint32_t len) {
371 std::string str;
372 (void)readAppendToString(str, len);
373 return str;
374 }
375
376 uint32_t readAppendToString(std::string& str, uint32_t len);
377
378 // return number of bytes read
379 uint32_t readEnd() override {
380 // This cast should be safe, because buffer_'s size is a uint32_t
381 auto bytes = static_cast<uint32_t>(rBase_ - buffer_);
382 if (rBase_ == wBase_) {
383 resetBuffer();
384 }
385 return bytes;
386 }
387
388 // Return number of bytes written
389 uint32_t writeEnd() override {
390 // This cast should be safe, because buffer_'s size is a uint32_t
391 return static_cast<uint32_t>(wBase_ - buffer_);
392 }
393
394 uint32_t available_read() const {
395 // Remember, wBase_ is the real rBound_.
396 return static_cast<uint32_t>(wBase_ - rBase_);
397 }
398
399 uint32_t available_write() const { return static_cast<uint32_t>(wBound_ - wBase_); }
400
401 // Returns a pointer to where the client can write data to append to
402 // the TMemoryBuffer, and ensures the buffer is big enough to accommodate a
403 // write of the provided length. The returned pointer is very convenient for
404 // passing to read(), recv(), or similar. You must call wroteBytes() as soon
405 // as data is written or the buffer will not be aware that data has changed.
406 uint8_t* getWritePtr(uint32_t len) {
407 ensureCanWrite(len);
408 return wBase_;
409 }
410
411 // Informs the buffer that the client has written 'len' bytes into storage
412 // that had been provided by getWritePtr().
413 void wroteBytes(uint32_t len);
414
415 /*
416 * TVirtualTransport provides a default implementation of readAll().
417 * We want to use the TBufferBase version instead.
418 */
419 uint32_t readAll(uint8_t* buf, uint32_t len) { return TBufferBase::readAll(buf, len); }
420
421 //! \brief Get the current buffer size
422 //! \returns the current buffer size
423 uint32_t getBufferSize() const {
424 return bufferSize_;
425 }
426
427 //! \brief Get the current maximum buffer size
428 //! \returns the current maximum buffer size
429 uint32_t getMaxBufferSize() const {
430 return maxBufferSize_;
431 }
432
433 //! \brief Change the maximum buffer size
434 //! \param[in] maxSize the new maximum buffer size allowed to grow to
435 //! \throws TTransportException(BAD_ARGS) if maxSize is less than the current buffer size
436 void setMaxBufferSize(uint32_t maxSize) {
437 if (maxSize < bufferSize_) {
438 throw TTransportException(TTransportException::BAD_ARGS,
439 "Maximum buffer size would be less than current buffer size");
440 }
441 maxBufferSize_ = maxSize;
442 }
443
444protected:
445 void swap(TMemoryBuffer& that) {
446 using std::swap;
447 swap(buffer_, that.buffer_);
448 swap(bufferSize_, that.bufferSize_);
449
450 swap(rBase_, that.rBase_);
451 swap(rBound_, that.rBound_);
452 swap(wBase_, that.wBase_);
453 swap(wBound_, that.wBound_);
454
455 swap(owner_, that.owner_);
456 }
457
458 // Make sure there's at least 'len' bytes available for writing.
459 void ensureCanWrite(uint32_t len);
460
461 // Compute the position and available data for reading.
462 void computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give);
463
464 uint32_t readSlow(uint8_t* buf, uint32_t len) override;
465
466 void writeSlow(const uint8_t* buf, uint32_t len) override;
467
468 const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) override;
469
470 // Data buffer
471 uint8_t* buffer_;
472
473 // Allocated buffer size
474 uint32_t bufferSize_;
475
476 // Maximum allowed size
477 uint32_t maxBufferSize_;
478
479 // Is this object the owner of the buffer?
480 bool owner_;
481
482 // Don't forget to update constrctors, initCommon, and swap if
483 // you add new members.
484};
485}
486}
487} // apache::thrift::transport
488
489#endif // #ifndef _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_
490