1 | // |
2 | // FIFOBuffer.h |
3 | // |
4 | // Library: Foundation |
5 | // Package: Core |
6 | // Module: FIFOBuffer |
7 | // |
8 | // Definition of the FIFOBuffer class. |
9 | // |
10 | // Copyright (c) 2006, Applied Informatics Software Engineering GmbH. |
11 | // and Contributors. |
12 | // |
13 | // SPDX-License-Identifier: BSL-1.0 |
14 | // |
15 | |
16 | |
17 | #ifndef Foundation_FIFOBuffer_INCLUDED |
18 | #define Foundation_FIFOBuffer_INCLUDED |
19 | |
20 | |
21 | #include "Poco/Foundation.h" |
22 | #include "Poco/Exception.h" |
23 | #include "Poco/Buffer.h" |
24 | #include "Poco/BasicEvent.h" |
25 | #include "Poco/Mutex.h" |
26 | #include "Poco/Format.h" |
27 | |
28 | |
29 | namespace Poco { |
30 | |
31 | |
32 | template <class T> |
33 | class BasicFIFOBuffer |
34 | /// A simple buffer class with support for re-entrant, |
35 | /// FIFO-style read/write operations, as well as (optional) |
36 | /// empty/non-empty/full (i.e. writable/readable) transition |
37 | /// notifications. Buffer can be flagged with end-of-file and |
38 | /// error flags, which renders it un-readable/writable. |
39 | /// |
40 | /// Critical portions of code are protected by a recursive mutex. |
41 | /// However, to achieve thread-safety in cases where multiple |
42 | /// member function calls are involved and have to be atomic, |
43 | /// the mutex must be locked externally. |
44 | /// |
45 | /// Buffer size, as well as amount of unread data and |
46 | /// available space introspections are supported as well. |
47 | /// |
48 | /// This class is useful anywhere where a FIFO functionality |
49 | /// is needed. |
50 | { |
51 | public: |
52 | typedef T Type; |
53 | |
54 | mutable Poco::BasicEvent<bool> writable; |
55 | /// Event indicating "writability" of the buffer, |
56 | /// triggered as follows: |
57 | /// |
58 | /// * when buffer transitions from non-full to full, |
59 | /// Writable event observers are notified, with |
60 | /// false value as the argument |
61 | /// |
62 | /// * when buffer transitions from full to non-full, |
63 | /// Writable event observers are notified, with |
64 | /// true value as the argument |
65 | |
66 | mutable Poco::BasicEvent<bool> readable; |
67 | /// Event indicating "readability" of the buffer, |
68 | /// triggered as follows: |
69 | /// |
70 | /// * when buffer transitions from non-empty to empty, |
71 | /// Readable event observers are notified, with false |
72 | /// value as the argument |
73 | /// |
74 | /// * when FIFOBuffer transitions from empty to non-empty, |
75 | /// Readable event observers are notified, with true value |
76 | /// as the argument |
77 | |
78 | BasicFIFOBuffer(std::size_t bufferSize, bool bufferNotify = false): |
79 | _buffer(bufferSize), |
80 | _begin(0), |
81 | _used(0), |
82 | _notify(bufferNotify), |
83 | _eof(false), |
84 | _error(false) |
85 | /// Creates the FIFOBuffer. |
86 | { |
87 | } |
88 | |
89 | BasicFIFOBuffer(T* pBuffer, std::size_t bufferSize, bool bufferNotify = false): |
90 | _buffer(pBuffer, bufferSize), |
91 | _begin(0), |
92 | _used(0), |
93 | _notify(bufferNotify), |
94 | _eof(false), |
95 | _error(false) |
96 | /// Creates the FIFOBuffer. |
97 | { |
98 | } |
99 | |
100 | BasicFIFOBuffer(const T* pBuffer, std::size_t bufferSize, bool bufferNotify = false): |
101 | _buffer(pBuffer, bufferSize), |
102 | _begin(0), |
103 | _used(bufferSize), |
104 | _notify(bufferNotify), |
105 | _eof(false), |
106 | _error(false) |
107 | /// Creates the FIFOBuffer. |
108 | { |
109 | } |
110 | |
111 | ~BasicFIFOBuffer() |
112 | /// Destroys the FIFOBuffer. |
113 | { |
114 | } |
115 | |
116 | void resize(std::size_t newSize, bool preserveContent = true) |
117 | /// Resizes the buffer. If preserveContent is true, |
118 | /// the content of the old buffer is preserved. |
119 | /// New size can be larger or smaller than |
120 | /// the current size, but it must not be 0. |
121 | /// Additionally, if the new length is smaller |
122 | /// than currently used length and preserveContent |
123 | /// is true, InvalidAccessException is thrown. |
124 | { |
125 | Mutex::ScopedLock lock(_mutex); |
126 | |
127 | if (preserveContent && (newSize < _used)) |
128 | throw InvalidAccessException("Can not resize FIFO without data loss." ); |
129 | |
130 | std::size_t usedBefore = _used; |
131 | _buffer.resize(newSize, preserveContent); |
132 | if (!preserveContent) _used = 0; |
133 | if (_notify) notify(usedBefore); |
134 | } |
135 | |
136 | std::size_t peek(T* pBuffer, std::size_t length) const |
137 | /// Peeks into the data currently in the FIFO |
138 | /// without actually extracting it. |
139 | /// If length is zero, the return is immediate. |
140 | /// If length is greater than used length, |
141 | /// it is substituted with the the current FIFO |
142 | /// used length. |
143 | /// |
144 | /// Returns the number of elements copied in the |
145 | /// supplied buffer. |
146 | { |
147 | if (0 == length) return 0; |
148 | Mutex::ScopedLock lock(_mutex); |
149 | if (!isReadable()) return 0; |
150 | if (length > _used) length = _used; |
151 | std::memcpy(pBuffer, _buffer.begin() + _begin, length * sizeof(T)); |
152 | return length; |
153 | } |
154 | |
155 | std::size_t peek(Poco::Buffer<T>& rBuffer, std::size_t length = 0) const |
156 | /// Peeks into the data currently in the FIFO |
157 | /// without actually extracting it. |
158 | /// Resizes the supplied buffer to the size of |
159 | /// data written to it. If length is not |
160 | /// supplied by the caller or is greater than length |
161 | /// of currently used data, the current FIFO used |
162 | /// data length is substituted for it. |
163 | /// |
164 | /// Returns the number of elements copied in the |
165 | /// supplied buffer. |
166 | { |
167 | Mutex::ScopedLock lock(_mutex); |
168 | if (!isReadable()) return 0; |
169 | if (0 == length || length > _used) length = _used; |
170 | rBuffer.resize(length); |
171 | return peek(rBuffer.begin(), length); |
172 | } |
173 | |
174 | std::size_t read(T* pBuffer, std::size_t length) |
175 | /// Copies the data currently in the FIFO |
176 | /// into the supplied buffer, which must be |
177 | /// preallocated to at least the length size |
178 | /// before calling this function. |
179 | /// |
180 | /// Returns the size of the copied data. |
181 | { |
182 | if (0 == length) return 0; |
183 | Mutex::ScopedLock lock(_mutex); |
184 | if (!isReadable()) return 0; |
185 | std::size_t usedBefore = _used; |
186 | std::size_t readLen = peek(pBuffer, length); |
187 | poco_assert (_used >= readLen); |
188 | _used -= readLen; |
189 | if (0 == _used) _begin = 0; |
190 | else _begin += length; |
191 | |
192 | if (_notify) notify(usedBefore); |
193 | |
194 | return readLen; |
195 | } |
196 | |
197 | std::size_t read(Poco::Buffer<T>& rBuffer, std::size_t length = 0) |
198 | /// Copies the data currently in the FIFO |
199 | /// into the supplied buffer. |
200 | /// Resizes the supplied buffer to the size of |
201 | /// data written to it. |
202 | /// |
203 | /// Returns the size of the copied data. |
204 | { |
205 | Mutex::ScopedLock lock(_mutex); |
206 | if (!isReadable()) return 0; |
207 | std::size_t usedBefore = _used; |
208 | std::size_t readLen = peek(rBuffer, length); |
209 | poco_assert (_used >= readLen); |
210 | _used -= readLen; |
211 | if (0 == _used) _begin = 0; |
212 | else _begin += length; |
213 | |
214 | if (_notify) notify(usedBefore); |
215 | |
216 | return readLen; |
217 | } |
218 | |
219 | std::size_t write(const T* pBuffer, std::size_t length) |
220 | /// Writes data from supplied buffer to the FIFO buffer. |
221 | /// If there is no sufficient space for the whole |
222 | /// buffer to be written, data up to available |
223 | /// length is written. |
224 | /// The length of data to be written is determined from the |
225 | /// length argument. Function does nothing and returns zero |
226 | /// if length argument is equal to zero. |
227 | /// |
228 | /// Returns the length of data written. |
229 | { |
230 | if (0 == length) return 0; |
231 | |
232 | Mutex::ScopedLock lock(_mutex); |
233 | |
234 | if (!isWritable()) return 0; |
235 | |
236 | if (_buffer.size() - (_begin + _used) < length) |
237 | { |
238 | std::memmove(_buffer.begin(), begin(), _used * sizeof(T)); |
239 | _begin = 0; |
240 | } |
241 | |
242 | std::size_t usedBefore = _used; |
243 | std::size_t availableBefore = _buffer.size() - _used - _begin; |
244 | std::size_t len = length > availableBefore ? availableBefore : length; |
245 | std::memcpy(begin() + _used, pBuffer, len * sizeof(T)); |
246 | _used += len; |
247 | poco_assert (_used <= _buffer.size()); |
248 | if (_notify) notify(usedBefore); |
249 | |
250 | return len; |
251 | } |
252 | |
253 | std::size_t write(const Buffer<T>& rBuffer, std::size_t length = 0) |
254 | /// Writes data from supplied buffer to the FIFO buffer. |
255 | /// If there is no sufficient space for the whole |
256 | /// buffer to be written, data up to available |
257 | /// length is written. |
258 | /// The length of data to be written is determined from the |
259 | /// length argument or buffer size (when length argument is |
260 | /// default zero or greater than buffer size). |
261 | /// |
262 | /// Returns the length of data written. |
263 | { |
264 | if (length == 0 || length > rBuffer.size()) |
265 | length = rBuffer.size(); |
266 | |
267 | return write(rBuffer.begin(), length); |
268 | } |
269 | |
270 | std::size_t size() const |
271 | /// Returns the size of the buffer. |
272 | { |
273 | return _buffer.size(); |
274 | } |
275 | |
276 | std::size_t used() const |
277 | /// Returns the size of the used portion of the buffer. |
278 | { |
279 | return _used; |
280 | } |
281 | |
282 | std::size_t available() const |
283 | /// Returns the size of the available portion of the buffer. |
284 | { |
285 | return size() - _used; |
286 | } |
287 | |
288 | void drain(std::size_t length = 0) |
289 | /// Drains length number of elements from the buffer. |
290 | /// If length is zero or greater than buffer current |
291 | /// content length, buffer is emptied. |
292 | { |
293 | Mutex::ScopedLock lock(_mutex); |
294 | |
295 | std::size_t usedBefore = _used; |
296 | |
297 | if (0 == length || length >= _used) |
298 | { |
299 | _begin = 0; |
300 | _used = 0; |
301 | } |
302 | else |
303 | { |
304 | _begin += length; |
305 | _used -= length; |
306 | } |
307 | |
308 | if (_notify) notify(usedBefore); |
309 | } |
310 | |
311 | void copy(const T* ptr, std::size_t length) |
312 | /// Copies the supplied data to the buffer and adjusts |
313 | /// the used buffer size. |
314 | { |
315 | poco_check_ptr(ptr); |
316 | if (0 == length) return; |
317 | |
318 | Mutex::ScopedLock lock(_mutex); |
319 | |
320 | if (length > available()) |
321 | throw Poco::InvalidAccessException("Cannot extend buffer." ); |
322 | |
323 | if (!isWritable()) |
324 | throw Poco::InvalidAccessException("Buffer not writable." ); |
325 | |
326 | std::memcpy(begin() + _used, ptr, length * sizeof(T)); |
327 | std::size_t usedBefore = _used; |
328 | _used += length; |
329 | if (_notify) notify(usedBefore); |
330 | } |
331 | |
332 | void advance(std::size_t length) |
333 | /// Advances buffer by length elements. |
334 | /// Should be called AFTER the data |
335 | /// was copied into the buffer. |
336 | { |
337 | Mutex::ScopedLock lock(_mutex); |
338 | |
339 | if (length > _buffer.size() - _used - _begin) |
340 | throw Poco::InvalidAccessException("Cannot extend buffer." ); |
341 | |
342 | if (!isWritable()) |
343 | throw Poco::InvalidAccessException("Buffer not writable." ); |
344 | |
345 | std::size_t usedBefore = _used; |
346 | _used += length; |
347 | if (_notify) notify(usedBefore); |
348 | } |
349 | |
350 | T* begin() |
351 | /// Returns the pointer to the beginning of the buffer. |
352 | { |
353 | Mutex::ScopedLock lock(_mutex); |
354 | if (_begin != 0) |
355 | { |
356 | // Move the data to the start of the buffer so begin() and next() |
357 | // always return consistent pointers with each other and allow writing |
358 | // to the end of the buffer. |
359 | std::memmove(_buffer.begin(), _buffer.begin() + _begin, _used * sizeof(T)); |
360 | _begin = 0; |
361 | } |
362 | return _buffer.begin(); |
363 | } |
364 | |
365 | T* next() |
366 | /// Returns the pointer to the next available position in the buffer. |
367 | { |
368 | Mutex::ScopedLock lock(_mutex); |
369 | return begin() + _used; |
370 | } |
371 | |
372 | T& operator [] (std::size_t index) |
373 | /// Returns value at index position. |
374 | /// Throws InvalidAccessException if index is larger than |
375 | /// the last valid (used) buffer position. |
376 | { |
377 | Mutex::ScopedLock lock(_mutex); |
378 | if (index >= _used) |
379 | throw InvalidAccessException(format("Index out of bounds: %z (max index allowed: %z)" , index, _used - 1)); |
380 | |
381 | return _buffer[_begin + index]; |
382 | } |
383 | |
384 | const T& operator [] (std::size_t index) const |
385 | /// Returns value at index position. |
386 | /// Throws InvalidAccessException if index is larger than |
387 | /// the last valid (used) buffer position. |
388 | { |
389 | Mutex::ScopedLock lock(_mutex); |
390 | if (index >= _used) |
391 | throw InvalidAccessException(format("Index out of bounds: %z (max index allowed: %z)" , index, _used - 1)); |
392 | |
393 | return _buffer[_begin + index]; |
394 | } |
395 | |
396 | const Buffer<T>& buffer() const |
397 | /// Returns const reference to the underlying buffer. |
398 | { |
399 | return _buffer; |
400 | } |
401 | |
402 | void setError(bool error = true) |
403 | /// Sets the error flag on the buffer and empties it. |
404 | /// If notifications are enabled, they will be triggered |
405 | /// if appropriate. |
406 | /// |
407 | /// Setting error flag to true prevents reading and writing |
408 | /// to the buffer; to re-enable FIFOBuffer for reading/writing, |
409 | /// the error flag must be set to false. |
410 | { |
411 | if (error) |
412 | { |
413 | bool f = false; |
414 | Mutex::ScopedLock lock(_mutex); |
415 | if (error && isReadable() && _notify) readable.notify(this, f); |
416 | if (error && isWritable() && _notify) writable.notify(this, f); |
417 | _error = error; |
418 | _used = 0; |
419 | } |
420 | else |
421 | { |
422 | bool t = true; |
423 | Mutex::ScopedLock lock(_mutex); |
424 | _error = false; |
425 | if (_notify && !_eof) writable.notify(this, t); |
426 | } |
427 | } |
428 | |
429 | bool isValid() const |
430 | /// Returns true if error flag is not set on the buffer, |
431 | /// otherwise returns false. |
432 | { |
433 | return !_error; |
434 | } |
435 | |
436 | void setEOF(bool eof = true) |
437 | /// Sets end-of-file flag on the buffer. |
438 | /// |
439 | /// Setting EOF flag to true prevents writing to the |
440 | /// buffer; reading from the buffer will still be |
441 | /// allowed until all data present in the buffer at the |
442 | /// EOF set time is drained. After that, to re-enable |
443 | /// FIFOBuffer for reading/writing, EOF must be |
444 | /// set to false. |
445 | /// |
446 | /// Setting EOF flag to false clears EOF state if it |
447 | /// was previously set. If EOF was not set, it has no |
448 | /// effect. |
449 | { |
450 | Mutex::ScopedLock lock(_mutex); |
451 | bool flag = !eof; |
452 | if (_notify) writable.notify(this, flag); |
453 | _eof = eof; |
454 | } |
455 | |
456 | bool hasEOF() const |
457 | /// Returns true if EOF flag has been set. |
458 | { |
459 | return _eof; |
460 | } |
461 | |
462 | bool isEOF() const |
463 | /// Returns true if EOF flag has been set and buffer is empty. |
464 | { |
465 | return isEmpty() && _eof; |
466 | } |
467 | |
468 | bool isEmpty() const |
469 | /// Returns true is buffer is empty, false otherwise. |
470 | { |
471 | return 0 == _used; |
472 | } |
473 | |
474 | bool isFull() const |
475 | /// Returns true is buffer is full, false otherwise. |
476 | { |
477 | return size() == _used; |
478 | } |
479 | |
480 | bool isReadable() const |
481 | /// Returns true if buffer contains data and is not |
482 | /// in error state. |
483 | { |
484 | return !isEmpty() && isValid(); |
485 | } |
486 | |
487 | bool isWritable() const |
488 | /// Returns true if buffer is not full and is not |
489 | /// in error state. |
490 | { |
491 | return !isFull() && isValid() && !_eof; |
492 | } |
493 | |
494 | void setNotify(bool bufferNotify = true) |
495 | /// Enables/disables notifications. |
496 | { |
497 | _notify = bufferNotify; |
498 | } |
499 | |
500 | bool getNotify() const |
501 | /// Returns true if notifications are enabled, false otherwise. |
502 | { |
503 | return _notify; |
504 | } |
505 | |
506 | Mutex& mutex() |
507 | /// Returns reference to mutex. |
508 | { |
509 | return _mutex; |
510 | } |
511 | |
512 | private: |
513 | void notify(std::size_t usedBefore) |
514 | { |
515 | bool t = true, f = false; |
516 | if (usedBefore == 0 && _used > 0) |
517 | readable.notify(this, t); |
518 | else if (usedBefore > 0 && 0 == _used) |
519 | readable.notify(this, f); |
520 | |
521 | if (usedBefore == _buffer.size() && _used < _buffer.size()) |
522 | writable.notify(this, t); |
523 | else if (usedBefore < _buffer.size() && _used == _buffer.size()) |
524 | writable.notify(this, f); |
525 | } |
526 | |
527 | BasicFIFOBuffer(); |
528 | BasicFIFOBuffer(const BasicFIFOBuffer&); |
529 | BasicFIFOBuffer& operator = (const BasicFIFOBuffer&); |
530 | |
531 | Buffer<T> _buffer; |
532 | std::size_t _begin; |
533 | std::size_t _used; |
534 | bool _notify; |
535 | mutable Mutex _mutex; |
536 | bool _eof; |
537 | bool _error; |
538 | }; |
539 | |
540 | |
//
// Convenience alias: FIFOBuffer is BasicFIFOBuffer with char elements.
//
typedef BasicFIFOBuffer<char> FIFOBuffer;
545 | |
546 | |
547 | } // namespace Poco |
548 | |
549 | |
550 | #endif // Foundation_FIFOBuffer_INCLUDED |
551 | |