| 1 | #pragma once | 
|---|
| 2 | #include <IO/ReadBuffer.h> | 
|---|
| 3 | #include <IO/BufferWithOwnMemory.h> | 
|---|
| 4 |  | 
|---|
| 5 | namespace DB | 
|---|
| 6 | { | 
|---|
| 7 |  | 
|---|
/// Error codes used by the buffer classes below.
/// They are only declared here (`extern`); the definitions live in another translation unit.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;          /// Thrown by the debug-only checkpoint sanity checks in this header.
    extern const int MEMORY_LIMIT_EXCEEDED;  /// Not referenced in this header; presumably used by the implementation file.
}
|---|
| 13 |  | 
|---|
| 14 | /// Also allows to set checkpoint at some position in stream and come back to this position later. | 
|---|
| 15 | /// When next() is called, saves data between checkpoint and current position to own memory and loads next data to sub-buffer | 
|---|
| 16 | /// Sub-buffer should not be accessed directly during the lifetime of peekable buffer. | 
|---|
| 17 | /// If position() of peekable buffer is explicitly set to some position before checkpoint | 
|---|
| 18 | /// (e.g. by istr.position() = prev_pos), behavior is undefined. | 
|---|
| 19 | class PeekableReadBuffer : public BufferWithOwnMemory<ReadBuffer> | 
|---|
| 20 | { | 
|---|
| 21 | friend class PeekableReadBufferCheckpoint; | 
|---|
| 22 | public: | 
|---|
| 23 | explicit PeekableReadBuffer(ReadBuffer & sub_buf_, size_t start_size_ = DBMS_DEFAULT_BUFFER_SIZE, | 
|---|
| 24 | size_t unread_limit_ = 16 * DBMS_DEFAULT_BUFFER_SIZE); | 
|---|
| 25 |  | 
|---|
| 26 | ~PeekableReadBuffer() override; | 
|---|
| 27 |  | 
|---|
| 28 | /// Sets checkpoint at current position | 
|---|
| 29 | ALWAYS_INLINE inline void setCheckpoint() | 
|---|
| 30 | { | 
|---|
| 31 | #ifndef NDEBUG | 
|---|
| 32 | if (checkpoint) | 
|---|
| 33 | throw DB::Exception( "Does not support recursive checkpoints.", ErrorCodes::LOGICAL_ERROR); | 
|---|
| 34 | #endif | 
|---|
| 35 | checkpoint_in_own_memory = currentlyReadFromOwnMemory(); | 
|---|
| 36 | if (!checkpoint_in_own_memory) | 
|---|
| 37 | { | 
|---|
| 38 | /// Don't need to store unread data anymore | 
|---|
| 39 | peeked_size = 0; | 
|---|
| 40 | } | 
|---|
| 41 | checkpoint = pos; | 
|---|
| 42 | } | 
|---|
| 43 |  | 
|---|
| 44 | /// Forget checkpoint and all data between checkpoint and position | 
|---|
| 45 | ALWAYS_INLINE inline void dropCheckpoint() | 
|---|
| 46 | { | 
|---|
| 47 | #ifndef NDEBUG | 
|---|
| 48 | if (!checkpoint) | 
|---|
| 49 | throw DB::Exception( "There is no checkpoint", ErrorCodes::LOGICAL_ERROR); | 
|---|
| 50 | #endif | 
|---|
| 51 | if (!currentlyReadFromOwnMemory()) | 
|---|
| 52 | { | 
|---|
| 53 | /// Don't need to store unread data anymore | 
|---|
| 54 | peeked_size = 0; | 
|---|
| 55 | } | 
|---|
| 56 | checkpoint = nullptr; | 
|---|
| 57 | checkpoint_in_own_memory = false; | 
|---|
| 58 | } | 
|---|
| 59 |  | 
|---|
| 60 | /// Sets position at checkpoint. | 
|---|
| 61 | /// All pointers (such as this->buffer().end()) may be invalidated | 
|---|
| 62 | void rollbackToCheckpoint(); | 
|---|
| 63 |  | 
|---|
| 64 | /// If checkpoint and current position are in different buffers, appends data from sub-buffer to own memory, | 
|---|
| 65 | /// so data between checkpoint and position will be in continuous memory. | 
|---|
| 66 | void makeContinuousMemoryFromCheckpointToPos(); | 
|---|
| 67 |  | 
|---|
| 68 | /// Returns true if there unread data extracted from sub-buffer in own memory. | 
|---|
| 69 | /// This data will be lost after destruction of peekable buffer. | 
|---|
| 70 | bool hasUnreadData() const; | 
|---|
| 71 |  | 
|---|
| 72 | private: | 
|---|
| 73 | bool nextImpl() override; | 
|---|
| 74 |  | 
|---|
| 75 | bool peekNext(); | 
|---|
| 76 |  | 
|---|
| 77 | inline bool useSubbufferOnly() const { return !peeked_size; } | 
|---|
| 78 | inline bool currentlyReadFromOwnMemory() const { return working_buffer.begin() != sub_buf.buffer().begin(); } | 
|---|
| 79 | inline bool checkpointInOwnMemory() const { return checkpoint_in_own_memory; } | 
|---|
| 80 |  | 
|---|
| 81 | void checkStateCorrect() const; | 
|---|
| 82 |  | 
|---|
| 83 | /// Makes possible to append `bytes_to_append` bytes to data in own memory. | 
|---|
| 84 | /// Updates all invalidated pointers and sizes. | 
|---|
| 85 | void resizeOwnMemoryIfNecessary(size_t bytes_to_append); | 
|---|
| 86 |  | 
|---|
| 87 |  | 
|---|
| 88 | ReadBuffer & sub_buf; | 
|---|
| 89 | const size_t unread_limit; | 
|---|
| 90 | size_t peeked_size = 0; | 
|---|
| 91 | Position checkpoint = nullptr; | 
|---|
| 92 | bool checkpoint_in_own_memory = false; | 
|---|
| 93 | }; | 
|---|
| 94 |  | 
|---|
| 95 |  | 
|---|
| 96 | class PeekableReadBufferCheckpoint : boost::noncopyable | 
|---|
| 97 | { | 
|---|
| 98 | PeekableReadBuffer & buf; | 
|---|
| 99 | bool auto_rollback; | 
|---|
| 100 | public: | 
|---|
| 101 | explicit PeekableReadBufferCheckpoint(PeekableReadBuffer & buf_, bool auto_rollback_ = false) | 
|---|
| 102 | : buf(buf_), auto_rollback(auto_rollback_) { buf.setCheckpoint(); } | 
|---|
| 103 | ~PeekableReadBufferCheckpoint() | 
|---|
| 104 | { | 
|---|
| 105 | if (!buf.checkpoint) | 
|---|
| 106 | return; | 
|---|
| 107 | if (auto_rollback) | 
|---|
| 108 | buf.rollbackToCheckpoint(); | 
|---|
| 109 | buf.dropCheckpoint(); | 
|---|
| 110 | } | 
|---|
| 111 |  | 
|---|
| 112 | }; | 
|---|
| 113 |  | 
|---|
| 114 | } | 
|---|
| 115 |  | 
|---|