1#include <IO/PeekableReadBuffer.h>
2
3namespace DB
4{
5
6PeekableReadBuffer::PeekableReadBuffer(ReadBuffer & sub_buf_, size_t start_size_ /*= DBMS_DEFAULT_BUFFER_SIZE*/,
7 size_t unread_limit_ /* = default_limit*/)
8 : BufferWithOwnMemory(start_size_), sub_buf(sub_buf_), unread_limit(unread_limit_)
9{
10 padded &= sub_buf.isPadded();
11 /// Read from sub-buffer
12 Buffer & sub_working = sub_buf.buffer();
13 BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset());
14
15 checkStateCorrect();
16}
17
18bool PeekableReadBuffer::peekNext()
19{
20 checkStateCorrect();
21
22 Position copy_from = pos;
23 size_t bytes_to_copy = sub_buf.available();
24 if (useSubbufferOnly())
25 {
26 /// Don't have to copy all data from sub-buffer if there is no data in own memory (checkpoint and pos are in sub-buffer)
27 if (checkpoint)
28 copy_from = checkpoint;
29 bytes_to_copy = sub_buf.buffer().end() - copy_from;
30 if (!bytes_to_copy)
31 {
32 sub_buf.position() = copy_from;
33
34 /// Both checkpoint and pos are at the end of sub-buffer. Just load next part of data.
35 bool res = sub_buf.next();
36 BufferBase::set(sub_buf.buffer().begin(), sub_buf.buffer().size(), sub_buf.offset());
37 if (checkpoint)
38 checkpoint = pos;
39
40 checkStateCorrect();
41 return res;
42 }
43 }
44
45 /// May throw an exception
46 resizeOwnMemoryIfNecessary(bytes_to_copy);
47
48 if (useSubbufferOnly())
49 {
50 sub_buf.position() = copy_from;
51 }
52
53 /// Save unread data from sub-buffer to own memory
54 memcpy(memory.data() + peeked_size, sub_buf.position(), bytes_to_copy);
55
56 /// If useSubbufferOnly() is false, then checkpoint is in own memory and it was updated in resizeOwnMemoryIfNecessary
57 /// Otherwise, checkpoint now at the beginning of own memory
58 if (checkpoint && useSubbufferOnly())
59 {
60 checkpoint = memory.data();
61 checkpoint_in_own_memory = true;
62 }
63 if (currentlyReadFromOwnMemory())
64 {
65 /// Update buffer size
66 BufferBase::set(memory.data(), peeked_size + bytes_to_copy, offset());
67 }
68 else
69 {
70 /// Switch to reading from own memory
71 size_t pos_offset = peeked_size + this->offset();
72 if (useSubbufferOnly())
73 {
74 if (checkpoint)
75 pos_offset = bytes_to_copy;
76 else
77 pos_offset = 0;
78 }
79 BufferBase::set(memory.data(), peeked_size + bytes_to_copy, pos_offset);
80
81 }
82
83 peeked_size += bytes_to_copy;
84 sub_buf.position() += bytes_to_copy;
85
86 checkStateCorrect();
87 return sub_buf.next();
88}
89
90void PeekableReadBuffer::rollbackToCheckpoint()
91{
92 checkStateCorrect();
93 if (!checkpoint)
94 throw DB::Exception("There is no checkpoint", ErrorCodes::LOGICAL_ERROR);
95 else if (checkpointInOwnMemory() == currentlyReadFromOwnMemory())
96 pos = checkpoint;
97 else /// Checkpoint is in own memory and pos is not. Switch to reading from own memory
98 BufferBase::set(memory.data(), peeked_size, checkpoint - memory.data());
99 checkStateCorrect();
100}
101
102bool PeekableReadBuffer::nextImpl()
103{
104 /// FIXME wrong bytes count because it can read the same data again after rollbackToCheckpoint()
105 /// However, changing bytes count on every call of next() (even after rollback) allows to determine if some pointers were invalidated.
106 checkStateCorrect();
107 bool res;
108
109 if (checkpoint)
110 {
111 if (currentlyReadFromOwnMemory())
112 res = sub_buf.hasPendingData() || sub_buf.next();
113 else
114 res = peekNext();
115 }
116 else
117 {
118 if (useSubbufferOnly())
119 {
120 /// Load next data to sub_buf
121 sub_buf.position() = pos;
122 res = sub_buf.next();
123 }
124 else
125 {
126 /// All copied data have been read from own memory, continue reading from sub_buf
127 peeked_size = 0;
128 res = sub_buf.hasPendingData() || sub_buf.next();
129 }
130 }
131
132 /// Switch to reading from sub_buf (or just update it if already switched)
133 Buffer & sub_working = sub_buf.buffer();
134 BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset());
135 working_buffer_offset = sub_buf.offset();
136
137 checkStateCorrect();
138 return res;
139}
140
141
142void PeekableReadBuffer::checkStateCorrect() const
143{
144#ifndef NDEBUG
145 if (checkpoint)
146 {
147 if (checkpointInOwnMemory())
148 {
149 if (!peeked_size)
150 throw DB::Exception("Checkpoint in empty own buffer", ErrorCodes::LOGICAL_ERROR);
151 if (currentlyReadFromOwnMemory() && pos < checkpoint)
152 throw DB::Exception("Current position in own buffer before checkpoint in own buffer", ErrorCodes::LOGICAL_ERROR);
153 if (!currentlyReadFromOwnMemory() && pos < sub_buf.position())
154 throw DB::Exception("Current position in subbuffer less than sub_buf.position()", ErrorCodes::LOGICAL_ERROR);
155 }
156 else
157 {
158 if (peeked_size)
159 throw DB::Exception("Own buffer is not empty", ErrorCodes::LOGICAL_ERROR);
160 if (currentlyReadFromOwnMemory())
161 throw DB::Exception("Current position in own buffer before checkpoint in subbuffer", ErrorCodes::LOGICAL_ERROR);
162 if (pos < checkpoint)
163 throw DB::Exception("Current position in subbuffer before checkpoint in subbuffer", ErrorCodes::LOGICAL_ERROR);
164 }
165 }
166 else
167 {
168 if (!currentlyReadFromOwnMemory() && peeked_size)
169 throw DB::Exception("Own buffer is not empty", ErrorCodes::LOGICAL_ERROR);
170 }
171 if (currentlyReadFromOwnMemory() && !peeked_size)
172 throw DB::Exception("Pos in empty own buffer", ErrorCodes::LOGICAL_ERROR);
173 if (unread_limit < memory.size())
174 throw DB::Exception("Size limit exceed", ErrorCodes::LOGICAL_ERROR);
175#endif
176}
177
178void PeekableReadBuffer::resizeOwnMemoryIfNecessary(size_t bytes_to_append)
179{
180 checkStateCorrect();
181 bool need_update_checkpoint = checkpointInOwnMemory();
182 bool need_update_pos = currentlyReadFromOwnMemory();
183 size_t offset = 0;
184 if (need_update_checkpoint)
185 offset = checkpoint - memory.data();
186 else if (need_update_pos)
187 offset = this->offset();
188
189 size_t new_size = peeked_size + bytes_to_append;
190 if (memory.size() < new_size)
191 {
192 if (bytes_to_append < offset && 2 * (peeked_size - offset) <= memory.size())
193 {
194 /// Move unread data to the beginning of own memory instead of resize own memory
195 peeked_size -= offset;
196 memmove(memory.data(), memory.data() + offset, peeked_size);
197
198 if (need_update_checkpoint)
199 checkpoint -= offset;
200 if (need_update_pos)
201 pos -= offset;
202 }
203 else
204 {
205 if (unread_limit < new_size)
206 throw DB::Exception("PeekableReadBuffer: Memory limit exceed", ErrorCodes::MEMORY_LIMIT_EXCEEDED);
207
208 size_t pos_offset = pos - memory.data();
209
210 size_t new_size_amortized = memory.size() * 2;
211 if (new_size_amortized < new_size)
212 new_size_amortized = new_size;
213 else if (unread_limit < new_size_amortized)
214 new_size_amortized = unread_limit;
215 memory.resize(new_size_amortized);
216
217 if (need_update_checkpoint)
218 checkpoint = memory.data() + offset;
219 if (need_update_pos)
220 {
221 BufferBase::set(memory.data(), peeked_size, pos_offset);
222 }
223 }
224 }
225 checkStateCorrect();
226}
227
228void PeekableReadBuffer::makeContinuousMemoryFromCheckpointToPos()
229{
230#ifndef NDEBUG
231 if (!checkpoint)
232 throw DB::Exception("There is no checkpoint", ErrorCodes::LOGICAL_ERROR);
233 checkStateCorrect();
234#endif
235 if (!checkpointInOwnMemory() || currentlyReadFromOwnMemory())
236 return; /// is't already continuous
237
238 size_t bytes_to_append = pos - sub_buf.position();
239 resizeOwnMemoryIfNecessary(bytes_to_append);
240 memcpy(memory.data() + peeked_size, sub_buf.position(), bytes_to_append);
241 sub_buf.position() = pos;
242 peeked_size += bytes_to_append;
243}
244
245PeekableReadBuffer::~PeekableReadBuffer()
246{
247 if (!currentlyReadFromOwnMemory())
248 sub_buf.position() = pos;
249}
250
251bool PeekableReadBuffer::hasUnreadData() const
252{
253 return peeked_size && pos != memory.data() + peeked_size;
254}
255
256}
257