1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one |
3 | * or more contributor license agreements. See the NOTICE file |
4 | * distributed with this work for additional information |
5 | * regarding copyright ownership. The ASF licenses this file |
6 | * to you under the Apache License, Version 2.0 (the |
7 | * "License"); you may not use this file except in compliance |
8 | * with the License. You may obtain a copy of the License at |
9 | * |
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
11 | * |
12 | * Unless required by applicable law or agreed to in writing, |
13 | * software distributed under the License is distributed on an |
14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
15 | * KIND, either express or implied. See the License for the |
16 | * specific language governing permissions and limitations |
17 | * under the License. |
18 | */ |
19 | |
20 | #ifndef _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_ |
21 | #define _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_ 1 |
22 | |
#include <cassert>
#include <cstddef>
#include <cstdlib>
#include <cstring>
#include <limits>
#include <new>
#include <string>
#include <utility>
// <boost/scoped_array.hpp> is intentionally not included; owned buffers use malloc/free.
28 | |
29 | #include <thrift/transport/TTransport.h> |
30 | #include <thrift/transport/TVirtualTransport.h> |
31 | |
#if !defined(__GNUC__)
// Compilers without __builtin_expect: the hints collapse to the bare expression.
#define TDB_LIKELY(val) (val)
#define TDB_UNLIKELY(val) (val)
#else
// GCC-compatible compilers: tell the optimizer which outcome of a branch
// condition is expected (1 = usually true, 0 = usually false).
#define TDB_LIKELY(val) (__builtin_expect((val), 1))
#define TDB_UNLIKELY(val) (__builtin_expect((val), 0))
#endif
39 | |
40 | namespace apache { |
41 | namespace thrift { |
42 | namespace transport { |
43 | |
44 | /** |
45 | * Base class for all transports that use read/write buffers for performance. |
46 | * |
47 | * TBufferBase is designed to implement the fast-path "memcpy" style |
48 | * operations that work in the common case. It does so with small and |
49 | * (eventually) nonvirtual, inlinable methods. TBufferBase is an abstract |
50 | * class. Subclasses are expected to define the "slow path" operations |
51 | * that have to be done when the buffers are full or empty. |
52 | * |
53 | */ |
54 | class TBufferBase : public TVirtualTransport<TBufferBase> { |
55 | |
56 | public: |
57 | /** |
58 | * Fast-path read. |
59 | * |
60 | * When we have enough data buffered to fulfill the read, we can satisfy it |
61 | * with a single memcpy, then adjust our internal pointers. If the buffer |
62 | * is empty, we call out to our slow path, implemented by a subclass. |
63 | * This method is meant to eventually be nonvirtual and inlinable. |
64 | */ |
65 | uint32_t read(uint8_t* buf, uint32_t len) { |
66 | uint8_t* new_rBase = rBase_ + len; |
67 | if (TDB_LIKELY(new_rBase <= rBound_)) { |
68 | std::memcpy(buf, rBase_, len); |
69 | rBase_ = new_rBase; |
70 | return len; |
71 | } |
72 | return readSlow(buf, len); |
73 | } |
74 | |
75 | /** |
76 | * Shortcutted version of readAll. |
77 | */ |
78 | uint32_t readAll(uint8_t* buf, uint32_t len) { |
79 | uint8_t* new_rBase = rBase_ + len; |
80 | if (TDB_LIKELY(new_rBase <= rBound_)) { |
81 | std::memcpy(buf, rBase_, len); |
82 | rBase_ = new_rBase; |
83 | return len; |
84 | } |
85 | return apache::thrift::transport::readAll(*this, buf, len); |
86 | } |
87 | |
88 | /** |
89 | * Fast-path write. |
90 | * |
91 | * When we have enough empty space in our buffer to accommodate the write, we |
92 | * can satisfy it with a single memcpy, then adjust our internal pointers. |
93 | * If the buffer is full, we call out to our slow path, implemented by a |
94 | * subclass. This method is meant to eventually be nonvirtual and |
95 | * inlinable. |
96 | */ |
97 | void write(const uint8_t* buf, uint32_t len) { |
98 | uint8_t* new_wBase = wBase_ + len; |
99 | if (TDB_LIKELY(new_wBase <= wBound_)) { |
100 | std::memcpy(wBase_, buf, len); |
101 | wBase_ = new_wBase; |
102 | return; |
103 | } |
104 | writeSlow(buf, len); |
105 | } |
106 | |
107 | /** |
108 | * Fast-path borrow. A lot like the fast-path read. |
109 | */ |
110 | const uint8_t* borrow(uint8_t* buf, uint32_t* len) { |
111 | if (TDB_LIKELY(static_cast<ptrdiff_t>(*len) <= rBound_ - rBase_)) { |
112 | // With strict aliasing, writing to len shouldn't force us to |
113 | // refetch rBase_ from memory. TODO(dreiss): Verify this. |
114 | *len = static_cast<uint32_t>(rBound_ - rBase_); |
115 | return rBase_; |
116 | } |
117 | return borrowSlow(buf, len); |
118 | } |
119 | |
120 | /** |
121 | * Consume doesn't require a slow path. |
122 | */ |
123 | void consume(uint32_t len) { |
124 | if (TDB_LIKELY(static_cast<ptrdiff_t>(len) <= rBound_ - rBase_)) { |
125 | rBase_ += len; |
126 | } else { |
127 | throw TTransportException(TTransportException::BAD_ARGS, "consume did not follow a borrow." ); |
128 | } |
129 | } |
130 | |
131 | protected: |
132 | /// Slow path read. |
133 | virtual uint32_t readSlow(uint8_t* buf, uint32_t len) = 0; |
134 | |
135 | /// Slow path write. |
136 | virtual void writeSlow(const uint8_t* buf, uint32_t len) = 0; |
137 | |
138 | /** |
139 | * Slow path borrow. |
140 | * |
141 | * POSTCONDITION: return == NULL || rBound_ - rBase_ >= *len |
142 | */ |
143 | virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) = 0; |
144 | |
145 | /** |
146 | * Trivial constructor. |
147 | * |
148 | * Initialize pointers safely. Constructing is not a very |
149 | * performance-sensitive operation, so it is okay to just leave it to |
150 | * the concrete class to set up pointers correctly. |
151 | */ |
152 | TBufferBase() : rBase_(nullptr), rBound_(nullptr), wBase_(nullptr), wBound_(nullptr) {} |
153 | |
154 | /// Convenience mutator for setting the read buffer. |
155 | void setReadBuffer(uint8_t* buf, uint32_t len) { |
156 | rBase_ = buf; |
157 | rBound_ = buf + len; |
158 | } |
159 | |
160 | /// Convenience mutator for setting the write buffer. |
161 | void setWriteBuffer(uint8_t* buf, uint32_t len) { |
162 | wBase_ = buf; |
163 | wBound_ = buf + len; |
164 | } |
165 | |
166 | ~TBufferBase() override = default; |
167 | |
168 | /// Reads begin here. |
169 | uint8_t* rBase_; |
170 | /// Reads may extend to just before here. |
171 | uint8_t* rBound_; |
172 | |
173 | /// Writes begin here. |
174 | uint8_t* wBase_; |
175 | /// Writes may extend to just before here. |
176 | uint8_t* wBound_; |
177 | }; |
178 | |
179 | |
180 | /** |
181 | * A memory buffer is a tranpsort that simply reads from and writes to an |
182 | * in memory buffer. Anytime you call write on it, the data is simply placed |
183 | * into a buffer, and anytime you call read, data is read from that buffer. |
184 | * |
185 | * The buffers are allocated using C constructs malloc,realloc, and the size |
186 | * doubles as necessary. We've considered using scoped |
187 | * |
188 | */ |
189 | class TMemoryBuffer : public TVirtualTransport<TMemoryBuffer, TBufferBase> { |
190 | private: |
191 | // Common initialization done by all constructors. |
192 | void initCommon(uint8_t* buf, uint32_t size, bool owner, uint32_t wPos) { |
193 | |
194 | maxBufferSize_ = (std::numeric_limits<uint32_t>::max)(); |
195 | |
196 | if (buf == nullptr && size != 0) { |
197 | assert(owner); |
198 | buf = (uint8_t*)std::malloc(size); |
199 | if (buf == nullptr) { |
200 | throw std::bad_alloc(); |
201 | } |
202 | } |
203 | |
204 | buffer_ = buf; |
205 | bufferSize_ = size; |
206 | |
207 | rBase_ = buffer_; |
208 | rBound_ = buffer_ + wPos; |
209 | // TODO(dreiss): Investigate NULL-ing this if !owner. |
210 | wBase_ = buffer_ + wPos; |
211 | wBound_ = buffer_ + bufferSize_; |
212 | |
213 | owner_ = owner; |
214 | |
215 | // rBound_ is really an artifact. In principle, it should always be |
216 | // equal to wBase_. We update it in a few places (computeRead, etc.). |
217 | } |
218 | |
219 | public: |
220 | static const uint32_t defaultSize = 1024; |
221 | |
222 | /** |
223 | * This enum specifies how a TMemoryBuffer should treat |
224 | * memory passed to it via constructors or resetBuffer. |
225 | * |
226 | * OBSERVE: |
227 | * TMemoryBuffer will simply store a pointer to the memory. |
228 | * It is the callers responsibility to ensure that the pointer |
229 | * remains valid for the lifetime of the TMemoryBuffer, |
230 | * and that it is properly cleaned up. |
231 | * Note that no data can be written to observed buffers. |
232 | * |
233 | * COPY: |
234 | * TMemoryBuffer will make an internal copy of the buffer. |
235 | * The caller has no responsibilities. |
236 | * |
237 | * TAKE_OWNERSHIP: |
238 | * TMemoryBuffer will become the "owner" of the buffer, |
239 | * and will be responsible for freeing it. |
240 | * The membory must have been allocated with malloc. |
241 | */ |
242 | enum MemoryPolicy { OBSERVE = 1, COPY = 2, TAKE_OWNERSHIP = 3 }; |
243 | |
244 | /** |
245 | * Construct a TMemoryBuffer with a default-sized buffer, |
246 | * owned by the TMemoryBuffer object. |
247 | */ |
248 | TMemoryBuffer() { initCommon(nullptr, defaultSize, true, 0); } |
249 | |
250 | /** |
251 | * Construct a TMemoryBuffer with a buffer of a specified size, |
252 | * owned by the TMemoryBuffer object. |
253 | * |
254 | * @param sz The initial size of the buffer. |
255 | */ |
256 | TMemoryBuffer(uint32_t sz) { initCommon(nullptr, sz, true, 0); } |
257 | |
258 | /** |
259 | * Construct a TMemoryBuffer with buf as its initial contents. |
260 | * |
261 | * @param buf The initial contents of the buffer. |
262 | * Note that, while buf is a non-const pointer, |
263 | * TMemoryBuffer will not write to it if policy == OBSERVE, |
264 | * so it is safe to const_cast<uint8_t*>(whatever). |
265 | * @param sz The size of @c buf. |
266 | * @param policy See @link MemoryPolicy @endlink . |
267 | */ |
268 | TMemoryBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) { |
269 | if (buf == nullptr && sz != 0) { |
270 | throw TTransportException(TTransportException::BAD_ARGS, |
271 | "TMemoryBuffer given null buffer with non-zero size." ); |
272 | } |
273 | |
274 | switch (policy) { |
275 | case OBSERVE: |
276 | case TAKE_OWNERSHIP: |
277 | initCommon(buf, sz, policy == TAKE_OWNERSHIP, sz); |
278 | break; |
279 | case COPY: |
280 | initCommon(nullptr, sz, true, 0); |
281 | this->write(buf, sz); |
282 | break; |
283 | default: |
284 | throw TTransportException(TTransportException::BAD_ARGS, |
285 | "Invalid MemoryPolicy for TMemoryBuffer" ); |
286 | } |
287 | } |
288 | |
289 | ~TMemoryBuffer() override { |
290 | if (owner_) { |
291 | std::free(buffer_); |
292 | } |
293 | } |
294 | |
295 | bool isOpen() const override { return true; } |
296 | |
297 | bool peek() override { return (rBase_ < wBase_); } |
298 | |
299 | void open() override {} |
300 | |
301 | void close() override {} |
302 | |
303 | // TODO(dreiss): Make bufPtr const. |
304 | void getBuffer(uint8_t** bufPtr, uint32_t* sz) { |
305 | *bufPtr = rBase_; |
306 | *sz = static_cast<uint32_t>(wBase_ - rBase_); |
307 | } |
308 | |
309 | std::string getBufferAsString() { |
310 | if (buffer_ == nullptr) { |
311 | return "" ; |
312 | } |
313 | uint8_t* buf; |
314 | uint32_t sz; |
315 | getBuffer(&buf, &sz); |
316 | return std::string((char*)buf, (std::string::size_type)sz); |
317 | } |
318 | |
319 | void appendBufferToString(std::string& str) { |
320 | if (buffer_ == nullptr) { |
321 | return; |
322 | } |
323 | uint8_t* buf; |
324 | uint32_t sz; |
325 | getBuffer(&buf, &sz); |
326 | str.append((char*)buf, sz); |
327 | } |
328 | |
329 | void resetBuffer() { |
330 | rBase_ = buffer_; |
331 | rBound_ = buffer_; |
332 | wBase_ = buffer_; |
333 | // It isn't safe to write into a buffer we don't own. |
334 | if (!owner_) { |
335 | wBound_ = wBase_; |
336 | bufferSize_ = 0; |
337 | } |
338 | } |
339 | |
340 | /// See constructor documentation. |
341 | void resetBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) { |
342 | // Use a variant of the copy-and-swap trick for assignment operators. |
343 | // This is sub-optimal in terms of performance for two reasons: |
344 | // 1/ The constructing and swapping of the (small) values |
345 | // in the temporary object takes some time, and is not necessary. |
346 | // 2/ If policy == COPY, we allocate the new buffer before |
347 | // freeing the old one, precluding the possibility of |
348 | // reusing that memory. |
349 | // I doubt that either of these problems could be optimized away, |
350 | // but the second is probably no a common case, and the first is minor. |
351 | // I don't expect resetBuffer to be a common operation, so I'm willing to |
352 | // bite the performance bullet to make the method this simple. |
353 | |
354 | // Construct the new buffer. |
355 | TMemoryBuffer new_buffer(buf, sz, policy); |
356 | // Move it into ourself. |
357 | this->swap(new_buffer); |
358 | // Our old self gets destroyed. |
359 | } |
360 | |
361 | /// See constructor documentation. |
362 | void resetBuffer(uint32_t sz) { |
363 | // Construct the new buffer. |
364 | TMemoryBuffer new_buffer(sz); |
365 | // Move it into ourself. |
366 | this->swap(new_buffer); |
367 | // Our old self gets destroyed. |
368 | } |
369 | |
370 | std::string readAsString(uint32_t len) { |
371 | std::string str; |
372 | (void)readAppendToString(str, len); |
373 | return str; |
374 | } |
375 | |
376 | uint32_t readAppendToString(std::string& str, uint32_t len); |
377 | |
378 | // return number of bytes read |
379 | uint32_t readEnd() override { |
380 | // This cast should be safe, because buffer_'s size is a uint32_t |
381 | auto bytes = static_cast<uint32_t>(rBase_ - buffer_); |
382 | if (rBase_ == wBase_) { |
383 | resetBuffer(); |
384 | } |
385 | return bytes; |
386 | } |
387 | |
388 | // Return number of bytes written |
389 | uint32_t writeEnd() override { |
390 | // This cast should be safe, because buffer_'s size is a uint32_t |
391 | return static_cast<uint32_t>(wBase_ - buffer_); |
392 | } |
393 | |
394 | uint32_t available_read() const { |
395 | // Remember, wBase_ is the real rBound_. |
396 | return static_cast<uint32_t>(wBase_ - rBase_); |
397 | } |
398 | |
399 | uint32_t available_write() const { return static_cast<uint32_t>(wBound_ - wBase_); } |
400 | |
401 | // Returns a pointer to where the client can write data to append to |
402 | // the TMemoryBuffer, and ensures the buffer is big enough to accommodate a |
403 | // write of the provided length. The returned pointer is very convenient for |
404 | // passing to read(), recv(), or similar. You must call wroteBytes() as soon |
405 | // as data is written or the buffer will not be aware that data has changed. |
406 | uint8_t* getWritePtr(uint32_t len) { |
407 | ensureCanWrite(len); |
408 | return wBase_; |
409 | } |
410 | |
411 | // Informs the buffer that the client has written 'len' bytes into storage |
412 | // that had been provided by getWritePtr(). |
413 | void wroteBytes(uint32_t len); |
414 | |
415 | /* |
416 | * TVirtualTransport provides a default implementation of readAll(). |
417 | * We want to use the TBufferBase version instead. |
418 | */ |
419 | uint32_t readAll(uint8_t* buf, uint32_t len) { return TBufferBase::readAll(buf, len); } |
420 | |
421 | //! \brief Get the current buffer size |
422 | //! \returns the current buffer size |
423 | uint32_t getBufferSize() const { |
424 | return bufferSize_; |
425 | } |
426 | |
427 | //! \brief Get the current maximum buffer size |
428 | //! \returns the current maximum buffer size |
429 | uint32_t getMaxBufferSize() const { |
430 | return maxBufferSize_; |
431 | } |
432 | |
433 | //! \brief Change the maximum buffer size |
434 | //! \param[in] maxSize the new maximum buffer size allowed to grow to |
435 | //! \throws TTransportException(BAD_ARGS) if maxSize is less than the current buffer size |
436 | void setMaxBufferSize(uint32_t maxSize) { |
437 | if (maxSize < bufferSize_) { |
438 | throw TTransportException(TTransportException::BAD_ARGS, |
439 | "Maximum buffer size would be less than current buffer size" ); |
440 | } |
441 | maxBufferSize_ = maxSize; |
442 | } |
443 | |
444 | protected: |
445 | void swap(TMemoryBuffer& that) { |
446 | using std::swap; |
447 | swap(buffer_, that.buffer_); |
448 | swap(bufferSize_, that.bufferSize_); |
449 | |
450 | swap(rBase_, that.rBase_); |
451 | swap(rBound_, that.rBound_); |
452 | swap(wBase_, that.wBase_); |
453 | swap(wBound_, that.wBound_); |
454 | |
455 | swap(owner_, that.owner_); |
456 | } |
457 | |
458 | // Make sure there's at least 'len' bytes available for writing. |
459 | void ensureCanWrite(uint32_t len); |
460 | |
461 | // Compute the position and available data for reading. |
462 | void computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give); |
463 | |
464 | uint32_t readSlow(uint8_t* buf, uint32_t len) override; |
465 | |
466 | void writeSlow(const uint8_t* buf, uint32_t len) override; |
467 | |
468 | const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) override; |
469 | |
470 | // Data buffer |
471 | uint8_t* buffer_; |
472 | |
473 | // Allocated buffer size |
474 | uint32_t bufferSize_; |
475 | |
476 | // Maximum allowed size |
477 | uint32_t maxBufferSize_; |
478 | |
479 | // Is this object the owner of the buffer? |
480 | bool owner_; |
481 | |
482 | // Don't forget to update constrctors, initCommon, and swap if |
483 | // you add new members. |
484 | }; |
485 | } |
486 | } |
487 | } // apache::thrift::transport |
488 | |
489 | #endif // #ifndef _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_ |
490 | |