1 | #include <IO/PeekableReadBuffer.h> |
2 | |
3 | namespace DB |
4 | { |
5 | |
6 | PeekableReadBuffer::PeekableReadBuffer(ReadBuffer & sub_buf_, size_t start_size_ /*= DBMS_DEFAULT_BUFFER_SIZE*/, |
7 | size_t unread_limit_ /* = default_limit*/) |
8 | : BufferWithOwnMemory(start_size_), sub_buf(sub_buf_), unread_limit(unread_limit_) |
9 | { |
10 | padded &= sub_buf.isPadded(); |
11 | /// Read from sub-buffer |
12 | Buffer & sub_working = sub_buf.buffer(); |
13 | BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset()); |
14 | |
15 | checkStateCorrect(); |
16 | } |
17 | |
18 | bool PeekableReadBuffer::peekNext() |
19 | { |
20 | checkStateCorrect(); |
21 | |
22 | Position copy_from = pos; |
23 | size_t bytes_to_copy = sub_buf.available(); |
24 | if (useSubbufferOnly()) |
25 | { |
26 | /// Don't have to copy all data from sub-buffer if there is no data in own memory (checkpoint and pos are in sub-buffer) |
27 | if (checkpoint) |
28 | copy_from = checkpoint; |
29 | bytes_to_copy = sub_buf.buffer().end() - copy_from; |
30 | if (!bytes_to_copy) |
31 | { |
32 | sub_buf.position() = copy_from; |
33 | |
34 | /// Both checkpoint and pos are at the end of sub-buffer. Just load next part of data. |
35 | bool res = sub_buf.next(); |
36 | BufferBase::set(sub_buf.buffer().begin(), sub_buf.buffer().size(), sub_buf.offset()); |
37 | if (checkpoint) |
38 | checkpoint = pos; |
39 | |
40 | checkStateCorrect(); |
41 | return res; |
42 | } |
43 | } |
44 | |
45 | /// May throw an exception |
46 | resizeOwnMemoryIfNecessary(bytes_to_copy); |
47 | |
48 | if (useSubbufferOnly()) |
49 | { |
50 | sub_buf.position() = copy_from; |
51 | } |
52 | |
53 | /// Save unread data from sub-buffer to own memory |
54 | memcpy(memory.data() + peeked_size, sub_buf.position(), bytes_to_copy); |
55 | |
56 | /// If useSubbufferOnly() is false, then checkpoint is in own memory and it was updated in resizeOwnMemoryIfNecessary |
57 | /// Otherwise, checkpoint now at the beginning of own memory |
58 | if (checkpoint && useSubbufferOnly()) |
59 | { |
60 | checkpoint = memory.data(); |
61 | checkpoint_in_own_memory = true; |
62 | } |
63 | if (currentlyReadFromOwnMemory()) |
64 | { |
65 | /// Update buffer size |
66 | BufferBase::set(memory.data(), peeked_size + bytes_to_copy, offset()); |
67 | } |
68 | else |
69 | { |
70 | /// Switch to reading from own memory |
71 | size_t pos_offset = peeked_size + this->offset(); |
72 | if (useSubbufferOnly()) |
73 | { |
74 | if (checkpoint) |
75 | pos_offset = bytes_to_copy; |
76 | else |
77 | pos_offset = 0; |
78 | } |
79 | BufferBase::set(memory.data(), peeked_size + bytes_to_copy, pos_offset); |
80 | |
81 | } |
82 | |
83 | peeked_size += bytes_to_copy; |
84 | sub_buf.position() += bytes_to_copy; |
85 | |
86 | checkStateCorrect(); |
87 | return sub_buf.next(); |
88 | } |
89 | |
90 | void PeekableReadBuffer::rollbackToCheckpoint() |
91 | { |
92 | checkStateCorrect(); |
93 | if (!checkpoint) |
94 | throw DB::Exception("There is no checkpoint" , ErrorCodes::LOGICAL_ERROR); |
95 | else if (checkpointInOwnMemory() == currentlyReadFromOwnMemory()) |
96 | pos = checkpoint; |
97 | else /// Checkpoint is in own memory and pos is not. Switch to reading from own memory |
98 | BufferBase::set(memory.data(), peeked_size, checkpoint - memory.data()); |
99 | checkStateCorrect(); |
100 | } |
101 | |
102 | bool PeekableReadBuffer::nextImpl() |
103 | { |
104 | /// FIXME wrong bytes count because it can read the same data again after rollbackToCheckpoint() |
105 | /// However, changing bytes count on every call of next() (even after rollback) allows to determine if some pointers were invalidated. |
106 | checkStateCorrect(); |
107 | bool res; |
108 | |
109 | if (checkpoint) |
110 | { |
111 | if (currentlyReadFromOwnMemory()) |
112 | res = sub_buf.hasPendingData() || sub_buf.next(); |
113 | else |
114 | res = peekNext(); |
115 | } |
116 | else |
117 | { |
118 | if (useSubbufferOnly()) |
119 | { |
120 | /// Load next data to sub_buf |
121 | sub_buf.position() = pos; |
122 | res = sub_buf.next(); |
123 | } |
124 | else |
125 | { |
126 | /// All copied data have been read from own memory, continue reading from sub_buf |
127 | peeked_size = 0; |
128 | res = sub_buf.hasPendingData() || sub_buf.next(); |
129 | } |
130 | } |
131 | |
132 | /// Switch to reading from sub_buf (or just update it if already switched) |
133 | Buffer & sub_working = sub_buf.buffer(); |
134 | BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset()); |
135 | working_buffer_offset = sub_buf.offset(); |
136 | |
137 | checkStateCorrect(); |
138 | return res; |
139 | } |
140 | |
141 | |
142 | void PeekableReadBuffer::checkStateCorrect() const |
143 | { |
144 | #ifndef NDEBUG |
145 | if (checkpoint) |
146 | { |
147 | if (checkpointInOwnMemory()) |
148 | { |
149 | if (!peeked_size) |
150 | throw DB::Exception("Checkpoint in empty own buffer" , ErrorCodes::LOGICAL_ERROR); |
151 | if (currentlyReadFromOwnMemory() && pos < checkpoint) |
152 | throw DB::Exception("Current position in own buffer before checkpoint in own buffer" , ErrorCodes::LOGICAL_ERROR); |
153 | if (!currentlyReadFromOwnMemory() && pos < sub_buf.position()) |
154 | throw DB::Exception("Current position in subbuffer less than sub_buf.position()" , ErrorCodes::LOGICAL_ERROR); |
155 | } |
156 | else |
157 | { |
158 | if (peeked_size) |
159 | throw DB::Exception("Own buffer is not empty" , ErrorCodes::LOGICAL_ERROR); |
160 | if (currentlyReadFromOwnMemory()) |
161 | throw DB::Exception("Current position in own buffer before checkpoint in subbuffer" , ErrorCodes::LOGICAL_ERROR); |
162 | if (pos < checkpoint) |
163 | throw DB::Exception("Current position in subbuffer before checkpoint in subbuffer" , ErrorCodes::LOGICAL_ERROR); |
164 | } |
165 | } |
166 | else |
167 | { |
168 | if (!currentlyReadFromOwnMemory() && peeked_size) |
169 | throw DB::Exception("Own buffer is not empty" , ErrorCodes::LOGICAL_ERROR); |
170 | } |
171 | if (currentlyReadFromOwnMemory() && !peeked_size) |
172 | throw DB::Exception("Pos in empty own buffer" , ErrorCodes::LOGICAL_ERROR); |
173 | if (unread_limit < memory.size()) |
174 | throw DB::Exception("Size limit exceed" , ErrorCodes::LOGICAL_ERROR); |
175 | #endif |
176 | } |
177 | |
178 | void PeekableReadBuffer::resizeOwnMemoryIfNecessary(size_t bytes_to_append) |
179 | { |
180 | checkStateCorrect(); |
181 | bool need_update_checkpoint = checkpointInOwnMemory(); |
182 | bool need_update_pos = currentlyReadFromOwnMemory(); |
183 | size_t offset = 0; |
184 | if (need_update_checkpoint) |
185 | offset = checkpoint - memory.data(); |
186 | else if (need_update_pos) |
187 | offset = this->offset(); |
188 | |
189 | size_t new_size = peeked_size + bytes_to_append; |
190 | if (memory.size() < new_size) |
191 | { |
192 | if (bytes_to_append < offset && 2 * (peeked_size - offset) <= memory.size()) |
193 | { |
194 | /// Move unread data to the beginning of own memory instead of resize own memory |
195 | peeked_size -= offset; |
196 | memmove(memory.data(), memory.data() + offset, peeked_size); |
197 | |
198 | if (need_update_checkpoint) |
199 | checkpoint -= offset; |
200 | if (need_update_pos) |
201 | pos -= offset; |
202 | } |
203 | else |
204 | { |
205 | if (unread_limit < new_size) |
206 | throw DB::Exception("PeekableReadBuffer: Memory limit exceed" , ErrorCodes::MEMORY_LIMIT_EXCEEDED); |
207 | |
208 | size_t pos_offset = pos - memory.data(); |
209 | |
210 | size_t new_size_amortized = memory.size() * 2; |
211 | if (new_size_amortized < new_size) |
212 | new_size_amortized = new_size; |
213 | else if (unread_limit < new_size_amortized) |
214 | new_size_amortized = unread_limit; |
215 | memory.resize(new_size_amortized); |
216 | |
217 | if (need_update_checkpoint) |
218 | checkpoint = memory.data() + offset; |
219 | if (need_update_pos) |
220 | { |
221 | BufferBase::set(memory.data(), peeked_size, pos_offset); |
222 | } |
223 | } |
224 | } |
225 | checkStateCorrect(); |
226 | } |
227 | |
228 | void PeekableReadBuffer::makeContinuousMemoryFromCheckpointToPos() |
229 | { |
230 | #ifndef NDEBUG |
231 | if (!checkpoint) |
232 | throw DB::Exception("There is no checkpoint" , ErrorCodes::LOGICAL_ERROR); |
233 | checkStateCorrect(); |
234 | #endif |
235 | if (!checkpointInOwnMemory() || currentlyReadFromOwnMemory()) |
236 | return; /// is't already continuous |
237 | |
238 | size_t bytes_to_append = pos - sub_buf.position(); |
239 | resizeOwnMemoryIfNecessary(bytes_to_append); |
240 | memcpy(memory.data() + peeked_size, sub_buf.position(), bytes_to_append); |
241 | sub_buf.position() = pos; |
242 | peeked_size += bytes_to_append; |
243 | } |
244 | |
245 | PeekableReadBuffer::~PeekableReadBuffer() |
246 | { |
247 | if (!currentlyReadFromOwnMemory()) |
248 | sub_buf.position() = pos; |
249 | } |
250 | |
251 | bool PeekableReadBuffer::hasUnreadData() const |
252 | { |
253 | return peeked_size && pos != memory.data() + peeked_size; |
254 | } |
255 | |
256 | } |
257 | |