1//
2// FIFOBuffer.h
3//
4// Library: Foundation
5// Package: Core
6// Module: FIFOBuffer
7//
8// Definition of the FIFOBuffer class.
9//
10// Copyright (c) 2006, Applied Informatics Software Engineering GmbH.
11// and Contributors.
12//
13// SPDX-License-Identifier: BSL-1.0
14//
15
16
17#ifndef Foundation_FIFOBuffer_INCLUDED
18#define Foundation_FIFOBuffer_INCLUDED
19
20
21#include "Poco/Foundation.h"
22#include "Poco/Exception.h"
23#include "Poco/Buffer.h"
24#include "Poco/BasicEvent.h"
25#include "Poco/Mutex.h"
26#include "Poco/Format.h"
27
28
29namespace Poco {
30
31
32template <class T>
33class BasicFIFOBuffer
34 /// A simple buffer class with support for re-entrant,
35 /// FIFO-style read/write operations, as well as (optional)
36 /// empty/non-empty/full (i.e. writable/readable) transition
37 /// notifications. Buffer can be flagged with end-of-file and
38 /// error flags, which renders it un-readable/writable.
39 ///
40 /// Critical portions of code are protected by a recursive mutex.
41 /// However, to achieve thread-safety in cases where multiple
42 /// member function calls are involved and have to be atomic,
43 /// the mutex must be locked externally.
44 ///
45 /// Buffer size, as well as amount of unread data and
46 /// available space introspections are supported as well.
47 ///
48 /// This class is useful anywhere where a FIFO functionality
49 /// is needed.
50{
51public:
52 typedef T Type;
53
54 mutable Poco::BasicEvent<bool> writable;
55 /// Event indicating "writability" of the buffer,
56 /// triggered as follows:
57 ///
58 /// * when buffer transitions from non-full to full,
59 /// Writable event observers are notified, with
60 /// false value as the argument
61 ///
62 /// * when buffer transitions from full to non-full,
63 /// Writable event observers are notified, with
64 /// true value as the argument
65
66 mutable Poco::BasicEvent<bool> readable;
67 /// Event indicating "readability" of the buffer,
68 /// triggered as follows:
69 ///
70 /// * when buffer transitions from non-empty to empty,
71 /// Readable event observers are notified, with false
72 /// value as the argument
73 ///
74 /// * when FIFOBuffer transitions from empty to non-empty,
75 /// Readable event observers are notified, with true value
76 /// as the argument
77
78 BasicFIFOBuffer(std::size_t bufferSize, bool bufferNotify = false):
79 _buffer(bufferSize),
80 _begin(0),
81 _used(0),
82 _notify(bufferNotify),
83 _eof(false),
84 _error(false)
85 /// Creates the FIFOBuffer.
86 {
87 }
88
89 BasicFIFOBuffer(T* pBuffer, std::size_t bufferSize, bool bufferNotify = false):
90 _buffer(pBuffer, bufferSize),
91 _begin(0),
92 _used(0),
93 _notify(bufferNotify),
94 _eof(false),
95 _error(false)
96 /// Creates the FIFOBuffer.
97 {
98 }
99
100 BasicFIFOBuffer(const T* pBuffer, std::size_t bufferSize, bool bufferNotify = false):
101 _buffer(pBuffer, bufferSize),
102 _begin(0),
103 _used(bufferSize),
104 _notify(bufferNotify),
105 _eof(false),
106 _error(false)
107 /// Creates the FIFOBuffer.
108 {
109 }
110
111 ~BasicFIFOBuffer()
112 /// Destroys the FIFOBuffer.
113 {
114 }
115
116 void resize(std::size_t newSize, bool preserveContent = true)
117 /// Resizes the buffer. If preserveContent is true,
118 /// the content of the old buffer is preserved.
119 /// New size can be larger or smaller than
120 /// the current size, but it must not be 0.
121 /// Additionally, if the new length is smaller
122 /// than currently used length and preserveContent
123 /// is true, InvalidAccessException is thrown.
124 {
125 Mutex::ScopedLock lock(_mutex);
126
127 if (preserveContent && (newSize < _used))
128 throw InvalidAccessException("Can not resize FIFO without data loss.");
129
130 std::size_t usedBefore = _used;
131 _buffer.resize(newSize, preserveContent);
132 if (!preserveContent) _used = 0;
133 if (_notify) notify(usedBefore);
134 }
135
136 std::size_t peek(T* pBuffer, std::size_t length) const
137 /// Peeks into the data currently in the FIFO
138 /// without actually extracting it.
139 /// If length is zero, the return is immediate.
140 /// If length is greater than used length,
141 /// it is substituted with the the current FIFO
142 /// used length.
143 ///
144 /// Returns the number of elements copied in the
145 /// supplied buffer.
146 {
147 if (0 == length) return 0;
148 Mutex::ScopedLock lock(_mutex);
149 if (!isReadable()) return 0;
150 if (length > _used) length = _used;
151 std::memcpy(pBuffer, _buffer.begin() + _begin, length * sizeof(T));
152 return length;
153 }
154
155 std::size_t peek(Poco::Buffer<T>& rBuffer, std::size_t length = 0) const
156 /// Peeks into the data currently in the FIFO
157 /// without actually extracting it.
158 /// Resizes the supplied buffer to the size of
159 /// data written to it. If length is not
160 /// supplied by the caller or is greater than length
161 /// of currently used data, the current FIFO used
162 /// data length is substituted for it.
163 ///
164 /// Returns the number of elements copied in the
165 /// supplied buffer.
166 {
167 Mutex::ScopedLock lock(_mutex);
168 if (!isReadable()) return 0;
169 if (0 == length || length > _used) length = _used;
170 rBuffer.resize(length);
171 return peek(rBuffer.begin(), length);
172 }
173
174 std::size_t read(T* pBuffer, std::size_t length)
175 /// Copies the data currently in the FIFO
176 /// into the supplied buffer, which must be
177 /// preallocated to at least the length size
178 /// before calling this function.
179 ///
180 /// Returns the size of the copied data.
181 {
182 if (0 == length) return 0;
183 Mutex::ScopedLock lock(_mutex);
184 if (!isReadable()) return 0;
185 std::size_t usedBefore = _used;
186 std::size_t readLen = peek(pBuffer, length);
187 poco_assert (_used >= readLen);
188 _used -= readLen;
189 if (0 == _used) _begin = 0;
190 else _begin += length;
191
192 if (_notify) notify(usedBefore);
193
194 return readLen;
195 }
196
197 std::size_t read(Poco::Buffer<T>& rBuffer, std::size_t length = 0)
198 /// Copies the data currently in the FIFO
199 /// into the supplied buffer.
200 /// Resizes the supplied buffer to the size of
201 /// data written to it.
202 ///
203 /// Returns the size of the copied data.
204 {
205 Mutex::ScopedLock lock(_mutex);
206 if (!isReadable()) return 0;
207 std::size_t usedBefore = _used;
208 std::size_t readLen = peek(rBuffer, length);
209 poco_assert (_used >= readLen);
210 _used -= readLen;
211 if (0 == _used) _begin = 0;
212 else _begin += length;
213
214 if (_notify) notify(usedBefore);
215
216 return readLen;
217 }
218
219 std::size_t write(const T* pBuffer, std::size_t length)
220 /// Writes data from supplied buffer to the FIFO buffer.
221 /// If there is no sufficient space for the whole
222 /// buffer to be written, data up to available
223 /// length is written.
224 /// The length of data to be written is determined from the
225 /// length argument. Function does nothing and returns zero
226 /// if length argument is equal to zero.
227 ///
228 /// Returns the length of data written.
229 {
230 if (0 == length) return 0;
231
232 Mutex::ScopedLock lock(_mutex);
233
234 if (!isWritable()) return 0;
235
236 if (_buffer.size() - (_begin + _used) < length)
237 {
238 std::memmove(_buffer.begin(), begin(), _used * sizeof(T));
239 _begin = 0;
240 }
241
242 std::size_t usedBefore = _used;
243 std::size_t availableBefore = _buffer.size() - _used - _begin;
244 std::size_t len = length > availableBefore ? availableBefore : length;
245 std::memcpy(begin() + _used, pBuffer, len * sizeof(T));
246 _used += len;
247 poco_assert (_used <= _buffer.size());
248 if (_notify) notify(usedBefore);
249
250 return len;
251 }
252
253 std::size_t write(const Buffer<T>& rBuffer, std::size_t length = 0)
254 /// Writes data from supplied buffer to the FIFO buffer.
255 /// If there is no sufficient space for the whole
256 /// buffer to be written, data up to available
257 /// length is written.
258 /// The length of data to be written is determined from the
259 /// length argument or buffer size (when length argument is
260 /// default zero or greater than buffer size).
261 ///
262 /// Returns the length of data written.
263 {
264 if (length == 0 || length > rBuffer.size())
265 length = rBuffer.size();
266
267 return write(rBuffer.begin(), length);
268 }
269
270 std::size_t size() const
271 /// Returns the size of the buffer.
272 {
273 return _buffer.size();
274 }
275
276 std::size_t used() const
277 /// Returns the size of the used portion of the buffer.
278 {
279 return _used;
280 }
281
282 std::size_t available() const
283 /// Returns the size of the available portion of the buffer.
284 {
285 return size() - _used;
286 }
287
288 void drain(std::size_t length = 0)
289 /// Drains length number of elements from the buffer.
290 /// If length is zero or greater than buffer current
291 /// content length, buffer is emptied.
292 {
293 Mutex::ScopedLock lock(_mutex);
294
295 std::size_t usedBefore = _used;
296
297 if (0 == length || length >= _used)
298 {
299 _begin = 0;
300 _used = 0;
301 }
302 else
303 {
304 _begin += length;
305 _used -= length;
306 }
307
308 if (_notify) notify(usedBefore);
309 }
310
311 void copy(const T* ptr, std::size_t length)
312 /// Copies the supplied data to the buffer and adjusts
313 /// the used buffer size.
314 {
315 poco_check_ptr(ptr);
316 if (0 == length) return;
317
318 Mutex::ScopedLock lock(_mutex);
319
320 if (length > available())
321 throw Poco::InvalidAccessException("Cannot extend buffer.");
322
323 if (!isWritable())
324 throw Poco::InvalidAccessException("Buffer not writable.");
325
326 std::memcpy(begin() + _used, ptr, length * sizeof(T));
327 std::size_t usedBefore = _used;
328 _used += length;
329 if (_notify) notify(usedBefore);
330 }
331
332 void advance(std::size_t length)
333 /// Advances buffer by length elements.
334 /// Should be called AFTER the data
335 /// was copied into the buffer.
336 {
337 Mutex::ScopedLock lock(_mutex);
338
339 if (length > _buffer.size() - _used - _begin)
340 throw Poco::InvalidAccessException("Cannot extend buffer.");
341
342 if (!isWritable())
343 throw Poco::InvalidAccessException("Buffer not writable.");
344
345 std::size_t usedBefore = _used;
346 _used += length;
347 if (_notify) notify(usedBefore);
348 }
349
350 T* begin()
351 /// Returns the pointer to the beginning of the buffer.
352 {
353 Mutex::ScopedLock lock(_mutex);
354 if (_begin != 0)
355 {
356 // Move the data to the start of the buffer so begin() and next()
357 // always return consistent pointers with each other and allow writing
358 // to the end of the buffer.
359 std::memmove(_buffer.begin(), _buffer.begin() + _begin, _used * sizeof(T));
360 _begin = 0;
361 }
362 return _buffer.begin();
363 }
364
365 T* next()
366 /// Returns the pointer to the next available position in the buffer.
367 {
368 Mutex::ScopedLock lock(_mutex);
369 return begin() + _used;
370 }
371
372 T& operator [] (std::size_t index)
373 /// Returns value at index position.
374 /// Throws InvalidAccessException if index is larger than
375 /// the last valid (used) buffer position.
376 {
377 Mutex::ScopedLock lock(_mutex);
378 if (index >= _used)
379 throw InvalidAccessException(format("Index out of bounds: %z (max index allowed: %z)", index, _used - 1));
380
381 return _buffer[_begin + index];
382 }
383
384 const T& operator [] (std::size_t index) const
385 /// Returns value at index position.
386 /// Throws InvalidAccessException if index is larger than
387 /// the last valid (used) buffer position.
388 {
389 Mutex::ScopedLock lock(_mutex);
390 if (index >= _used)
391 throw InvalidAccessException(format("Index out of bounds: %z (max index allowed: %z)", index, _used - 1));
392
393 return _buffer[_begin + index];
394 }
395
396 const Buffer<T>& buffer() const
397 /// Returns const reference to the underlying buffer.
398 {
399 return _buffer;
400 }
401
402 void setError(bool error = true)
403 /// Sets the error flag on the buffer and empties it.
404 /// If notifications are enabled, they will be triggered
405 /// if appropriate.
406 ///
407 /// Setting error flag to true prevents reading and writing
408 /// to the buffer; to re-enable FIFOBuffer for reading/writing,
409 /// the error flag must be set to false.
410 {
411 if (error)
412 {
413 bool f = false;
414 Mutex::ScopedLock lock(_mutex);
415 if (error && isReadable() && _notify) readable.notify(this, f);
416 if (error && isWritable() && _notify) writable.notify(this, f);
417 _error = error;
418 _used = 0;
419 }
420 else
421 {
422 bool t = true;
423 Mutex::ScopedLock lock(_mutex);
424 _error = false;
425 if (_notify && !_eof) writable.notify(this, t);
426 }
427 }
428
429 bool isValid() const
430 /// Returns true if error flag is not set on the buffer,
431 /// otherwise returns false.
432 {
433 return !_error;
434 }
435
436 void setEOF(bool eof = true)
437 /// Sets end-of-file flag on the buffer.
438 ///
439 /// Setting EOF flag to true prevents writing to the
440 /// buffer; reading from the buffer will still be
441 /// allowed until all data present in the buffer at the
442 /// EOF set time is drained. After that, to re-enable
443 /// FIFOBuffer for reading/writing, EOF must be
444 /// set to false.
445 ///
446 /// Setting EOF flag to false clears EOF state if it
447 /// was previously set. If EOF was not set, it has no
448 /// effect.
449 {
450 Mutex::ScopedLock lock(_mutex);
451 bool flag = !eof;
452 if (_notify) writable.notify(this, flag);
453 _eof = eof;
454 }
455
456 bool hasEOF() const
457 /// Returns true if EOF flag has been set.
458 {
459 return _eof;
460 }
461
462 bool isEOF() const
463 /// Returns true if EOF flag has been set and buffer is empty.
464 {
465 return isEmpty() && _eof;
466 }
467
468 bool isEmpty() const
469 /// Returns true is buffer is empty, false otherwise.
470 {
471 return 0 == _used;
472 }
473
474 bool isFull() const
475 /// Returns true is buffer is full, false otherwise.
476 {
477 return size() == _used;
478 }
479
480 bool isReadable() const
481 /// Returns true if buffer contains data and is not
482 /// in error state.
483 {
484 return !isEmpty() && isValid();
485 }
486
487 bool isWritable() const
488 /// Returns true if buffer is not full and is not
489 /// in error state.
490 {
491 return !isFull() && isValid() && !_eof;
492 }
493
494 void setNotify(bool bufferNotify = true)
495 /// Enables/disables notifications.
496 {
497 _notify = bufferNotify;
498 }
499
500 bool getNotify() const
501 /// Returns true if notifications are enabled, false otherwise.
502 {
503 return _notify;
504 }
505
506 Mutex& mutex()
507 /// Returns reference to mutex.
508 {
509 return _mutex;
510 }
511
512private:
513 void notify(std::size_t usedBefore)
514 {
515 bool t = true, f = false;
516 if (usedBefore == 0 && _used > 0)
517 readable.notify(this, t);
518 else if (usedBefore > 0 && 0 == _used)
519 readable.notify(this, f);
520
521 if (usedBefore == _buffer.size() && _used < _buffer.size())
522 writable.notify(this, t);
523 else if (usedBefore < _buffer.size() && _used == _buffer.size())
524 writable.notify(this, f);
525 }
526
527 BasicFIFOBuffer();
528 BasicFIFOBuffer(const BasicFIFOBuffer&);
529 BasicFIFOBuffer& operator = (const BasicFIFOBuffer&);
530
531 Buffer<T> _buffer;
532 std::size_t _begin;
533 std::size_t _used;
534 bool _notify;
535 mutable Mutex _mutex;
536 bool _eof;
537 bool _error;
538};
539
540
541//
542// We provide an instantiation for char
543//
544typedef BasicFIFOBuffer<char> FIFOBuffer;
545
546
547} // namespace Poco
548
549
550#endif // Foundation_FIFOBuffer_INCLUDED
551