1#pragma once
2#include <IO/ReadBuffer.h>
3#include <IO/BufferWithOwnMemory.h>
4
5namespace DB
6{
7
8namespace ErrorCodes
9{
10 extern const int LOGICAL_ERROR;
11 extern const int MEMORY_LIMIT_EXCEEDED;
12}
13
14/// Also allows to set checkpoint at some position in stream and come back to this position later.
15/// When next() is called, saves data between checkpoint and current position to own memory and loads next data to sub-buffer
16/// Sub-buffer should not be accessed directly during the lifetime of peekable buffer.
17/// If position() of peekable buffer is explicitly set to some position before checkpoint
18/// (e.g. by istr.position() = prev_pos), behavior is undefined.
19class PeekableReadBuffer : public BufferWithOwnMemory<ReadBuffer>
20{
21 friend class PeekableReadBufferCheckpoint;
22public:
23 explicit PeekableReadBuffer(ReadBuffer & sub_buf_, size_t start_size_ = DBMS_DEFAULT_BUFFER_SIZE,
24 size_t unread_limit_ = 16 * DBMS_DEFAULT_BUFFER_SIZE);
25
26 ~PeekableReadBuffer() override;
27
28 /// Sets checkpoint at current position
29 ALWAYS_INLINE inline void setCheckpoint()
30 {
31#ifndef NDEBUG
32 if (checkpoint)
33 throw DB::Exception("Does not support recursive checkpoints.", ErrorCodes::LOGICAL_ERROR);
34#endif
35 checkpoint_in_own_memory = currentlyReadFromOwnMemory();
36 if (!checkpoint_in_own_memory)
37 {
38 /// Don't need to store unread data anymore
39 peeked_size = 0;
40 }
41 checkpoint = pos;
42 }
43
44 /// Forget checkpoint and all data between checkpoint and position
45 ALWAYS_INLINE inline void dropCheckpoint()
46 {
47#ifndef NDEBUG
48 if (!checkpoint)
49 throw DB::Exception("There is no checkpoint", ErrorCodes::LOGICAL_ERROR);
50#endif
51 if (!currentlyReadFromOwnMemory())
52 {
53 /// Don't need to store unread data anymore
54 peeked_size = 0;
55 }
56 checkpoint = nullptr;
57 checkpoint_in_own_memory = false;
58 }
59
60 /// Sets position at checkpoint.
61 /// All pointers (such as this->buffer().end()) may be invalidated
62 void rollbackToCheckpoint();
63
64 /// If checkpoint and current position are in different buffers, appends data from sub-buffer to own memory,
65 /// so data between checkpoint and position will be in continuous memory.
66 void makeContinuousMemoryFromCheckpointToPos();
67
68 /// Returns true if there unread data extracted from sub-buffer in own memory.
69 /// This data will be lost after destruction of peekable buffer.
70 bool hasUnreadData() const;
71
72private:
73 bool nextImpl() override;
74
75 bool peekNext();
76
77 inline bool useSubbufferOnly() const { return !peeked_size; }
78 inline bool currentlyReadFromOwnMemory() const { return working_buffer.begin() != sub_buf.buffer().begin(); }
79 inline bool checkpointInOwnMemory() const { return checkpoint_in_own_memory; }
80
81 void checkStateCorrect() const;
82
83 /// Makes possible to append `bytes_to_append` bytes to data in own memory.
84 /// Updates all invalidated pointers and sizes.
85 void resizeOwnMemoryIfNecessary(size_t bytes_to_append);
86
87
88 ReadBuffer & sub_buf;
89 const size_t unread_limit;
90 size_t peeked_size = 0;
91 Position checkpoint = nullptr;
92 bool checkpoint_in_own_memory = false;
93};
94
95
96class PeekableReadBufferCheckpoint : boost::noncopyable
97{
98 PeekableReadBuffer & buf;
99 bool auto_rollback;
100public:
101 explicit PeekableReadBufferCheckpoint(PeekableReadBuffer & buf_, bool auto_rollback_ = false)
102 : buf(buf_), auto_rollback(auto_rollback_) { buf.setCheckpoint(); }
103 ~PeekableReadBufferCheckpoint()
104 {
105 if (!buf.checkpoint)
106 return;
107 if (auto_rollback)
108 buf.rollbackToCheckpoint();
109 buf.dropCheckpoint();
110 }
111
112};
113
114}
115