1 | #pragma once |
2 | #include <IO/ReadBuffer.h> |
3 | #include <IO/BufferWithOwnMemory.h> |
4 | |
5 | namespace DB |
6 | { |
7 | |
8 | namespace ErrorCodes |
9 | { |
10 | extern const int LOGICAL_ERROR; |
11 | extern const int MEMORY_LIMIT_EXCEEDED; |
12 | } |
13 | |
14 | /// Also allows to set checkpoint at some position in stream and come back to this position later. |
15 | /// When next() is called, saves data between checkpoint and current position to own memory and loads next data to sub-buffer |
16 | /// Sub-buffer should not be accessed directly during the lifetime of peekable buffer. |
17 | /// If position() of peekable buffer is explicitly set to some position before checkpoint |
18 | /// (e.g. by istr.position() = prev_pos), behavior is undefined. |
19 | class PeekableReadBuffer : public BufferWithOwnMemory<ReadBuffer> |
20 | { |
21 | friend class PeekableReadBufferCheckpoint; |
22 | public: |
23 | explicit PeekableReadBuffer(ReadBuffer & sub_buf_, size_t start_size_ = DBMS_DEFAULT_BUFFER_SIZE, |
24 | size_t unread_limit_ = 16 * DBMS_DEFAULT_BUFFER_SIZE); |
25 | |
26 | ~PeekableReadBuffer() override; |
27 | |
28 | /// Sets checkpoint at current position |
29 | ALWAYS_INLINE inline void setCheckpoint() |
30 | { |
31 | #ifndef NDEBUG |
32 | if (checkpoint) |
33 | throw DB::Exception("Does not support recursive checkpoints." , ErrorCodes::LOGICAL_ERROR); |
34 | #endif |
35 | checkpoint_in_own_memory = currentlyReadFromOwnMemory(); |
36 | if (!checkpoint_in_own_memory) |
37 | { |
38 | /// Don't need to store unread data anymore |
39 | peeked_size = 0; |
40 | } |
41 | checkpoint = pos; |
42 | } |
43 | |
44 | /// Forget checkpoint and all data between checkpoint and position |
45 | ALWAYS_INLINE inline void dropCheckpoint() |
46 | { |
47 | #ifndef NDEBUG |
48 | if (!checkpoint) |
49 | throw DB::Exception("There is no checkpoint" , ErrorCodes::LOGICAL_ERROR); |
50 | #endif |
51 | if (!currentlyReadFromOwnMemory()) |
52 | { |
53 | /// Don't need to store unread data anymore |
54 | peeked_size = 0; |
55 | } |
56 | checkpoint = nullptr; |
57 | checkpoint_in_own_memory = false; |
58 | } |
59 | |
60 | /// Sets position at checkpoint. |
61 | /// All pointers (such as this->buffer().end()) may be invalidated |
62 | void rollbackToCheckpoint(); |
63 | |
64 | /// If checkpoint and current position are in different buffers, appends data from sub-buffer to own memory, |
65 | /// so data between checkpoint and position will be in continuous memory. |
66 | void makeContinuousMemoryFromCheckpointToPos(); |
67 | |
68 | /// Returns true if there unread data extracted from sub-buffer in own memory. |
69 | /// This data will be lost after destruction of peekable buffer. |
70 | bool hasUnreadData() const; |
71 | |
72 | private: |
73 | bool nextImpl() override; |
74 | |
75 | bool peekNext(); |
76 | |
77 | inline bool useSubbufferOnly() const { return !peeked_size; } |
78 | inline bool currentlyReadFromOwnMemory() const { return working_buffer.begin() != sub_buf.buffer().begin(); } |
79 | inline bool checkpointInOwnMemory() const { return checkpoint_in_own_memory; } |
80 | |
81 | void checkStateCorrect() const; |
82 | |
83 | /// Makes possible to append `bytes_to_append` bytes to data in own memory. |
84 | /// Updates all invalidated pointers and sizes. |
85 | void resizeOwnMemoryIfNecessary(size_t bytes_to_append); |
86 | |
87 | |
88 | ReadBuffer & sub_buf; |
89 | const size_t unread_limit; |
90 | size_t peeked_size = 0; |
91 | Position checkpoint = nullptr; |
92 | bool checkpoint_in_own_memory = false; |
93 | }; |
94 | |
95 | |
96 | class PeekableReadBufferCheckpoint : boost::noncopyable |
97 | { |
98 | PeekableReadBuffer & buf; |
99 | bool auto_rollback; |
100 | public: |
101 | explicit PeekableReadBufferCheckpoint(PeekableReadBuffer & buf_, bool auto_rollback_ = false) |
102 | : buf(buf_), auto_rollback(auto_rollback_) { buf.setCheckpoint(); } |
103 | ~PeekableReadBufferCheckpoint() |
104 | { |
105 | if (!buf.checkpoint) |
106 | return; |
107 | if (auto_rollback) |
108 | buf.rollbackToCheckpoint(); |
109 | buf.dropCheckpoint(); |
110 | } |
111 | |
112 | }; |
113 | |
114 | } |
115 | |