| 1 | // |
| 2 | // FIFOBuffer.h |
| 3 | // |
| 4 | // Library: Foundation |
| 5 | // Package: Core |
| 6 | // Module: FIFOBuffer |
| 7 | // |
| 8 | // Definition of the FIFOBuffer class. |
| 9 | // |
| 10 | // Copyright (c) 2006, Applied Informatics Software Engineering GmbH. |
| 11 | // and Contributors. |
| 12 | // |
| 13 | // SPDX-License-Identifier: BSL-1.0 |
| 14 | // |
| 15 | |
| 16 | |
| 17 | #ifndef Foundation_FIFOBuffer_INCLUDED |
| 18 | #define Foundation_FIFOBuffer_INCLUDED |
| 19 | |
| 20 | |
| 21 | #include "Poco/Foundation.h" |
| 22 | #include "Poco/Exception.h" |
| 23 | #include "Poco/Buffer.h" |
| 24 | #include "Poco/BasicEvent.h" |
| 25 | #include "Poco/Mutex.h" |
| 26 | #include "Poco/Format.h" |
| 27 | |
| 28 | |
| 29 | namespace Poco { |
| 30 | |
| 31 | |
| 32 | template <class T> |
| 33 | class BasicFIFOBuffer |
| 34 | /// A simple buffer class with support for re-entrant, |
| 35 | /// FIFO-style read/write operations, as well as (optional) |
| 36 | /// empty/non-empty/full (i.e. writable/readable) transition |
| 37 | /// notifications. Buffer can be flagged with end-of-file and |
| 38 | /// error flags, which renders it un-readable/writable. |
| 39 | /// |
| 40 | /// Critical portions of code are protected by a recursive mutex. |
| 41 | /// However, to achieve thread-safety in cases where multiple |
| 42 | /// member function calls are involved and have to be atomic, |
| 43 | /// the mutex must be locked externally. |
| 44 | /// |
| 45 | /// Buffer size, as well as amount of unread data and |
| 46 | /// available space introspections are supported as well. |
| 47 | /// |
| 48 | /// This class is useful anywhere where a FIFO functionality |
| 49 | /// is needed. |
| 50 | { |
| 51 | public: |
| 52 | typedef T Type; |
| 53 | |
| 54 | mutable Poco::BasicEvent<bool> writable; |
| 55 | /// Event indicating "writability" of the buffer, |
| 56 | /// triggered as follows: |
| 57 | /// |
| 58 | /// * when buffer transitions from non-full to full, |
| 59 | /// Writable event observers are notified, with |
| 60 | /// false value as the argument |
| 61 | /// |
| 62 | /// * when buffer transitions from full to non-full, |
| 63 | /// Writable event observers are notified, with |
| 64 | /// true value as the argument |
| 65 | |
| 66 | mutable Poco::BasicEvent<bool> readable; |
| 67 | /// Event indicating "readability" of the buffer, |
| 68 | /// triggered as follows: |
| 69 | /// |
| 70 | /// * when buffer transitions from non-empty to empty, |
| 71 | /// Readable event observers are notified, with false |
| 72 | /// value as the argument |
| 73 | /// |
| 74 | /// * when FIFOBuffer transitions from empty to non-empty, |
| 75 | /// Readable event observers are notified, with true value |
| 76 | /// as the argument |
| 77 | |
| 78 | BasicFIFOBuffer(std::size_t bufferSize, bool bufferNotify = false): |
| 79 | _buffer(bufferSize), |
| 80 | _begin(0), |
| 81 | _used(0), |
| 82 | _notify(bufferNotify), |
| 83 | _eof(false), |
| 84 | _error(false) |
| 85 | /// Creates the FIFOBuffer. |
| 86 | { |
| 87 | } |
| 88 | |
| 89 | BasicFIFOBuffer(T* pBuffer, std::size_t bufferSize, bool bufferNotify = false): |
| 90 | _buffer(pBuffer, bufferSize), |
| 91 | _begin(0), |
| 92 | _used(0), |
| 93 | _notify(bufferNotify), |
| 94 | _eof(false), |
| 95 | _error(false) |
| 96 | /// Creates the FIFOBuffer. |
| 97 | { |
| 98 | } |
| 99 | |
| 100 | BasicFIFOBuffer(const T* pBuffer, std::size_t bufferSize, bool bufferNotify = false): |
| 101 | _buffer(pBuffer, bufferSize), |
| 102 | _begin(0), |
| 103 | _used(bufferSize), |
| 104 | _notify(bufferNotify), |
| 105 | _eof(false), |
| 106 | _error(false) |
| 107 | /// Creates the FIFOBuffer. |
| 108 | { |
| 109 | } |
| 110 | |
| 111 | ~BasicFIFOBuffer() |
| 112 | /// Destroys the FIFOBuffer. |
| 113 | { |
| 114 | } |
| 115 | |
| 116 | void resize(std::size_t newSize, bool preserveContent = true) |
| 117 | /// Resizes the buffer. If preserveContent is true, |
| 118 | /// the content of the old buffer is preserved. |
| 119 | /// New size can be larger or smaller than |
| 120 | /// the current size, but it must not be 0. |
| 121 | /// Additionally, if the new length is smaller |
| 122 | /// than currently used length and preserveContent |
| 123 | /// is true, InvalidAccessException is thrown. |
| 124 | { |
| 125 | Mutex::ScopedLock lock(_mutex); |
| 126 | |
| 127 | if (preserveContent && (newSize < _used)) |
| 128 | throw InvalidAccessException("Can not resize FIFO without data loss." ); |
| 129 | |
| 130 | std::size_t usedBefore = _used; |
| 131 | _buffer.resize(newSize, preserveContent); |
| 132 | if (!preserveContent) _used = 0; |
| 133 | if (_notify) notify(usedBefore); |
| 134 | } |
| 135 | |
| 136 | std::size_t peek(T* pBuffer, std::size_t length) const |
| 137 | /// Peeks into the data currently in the FIFO |
| 138 | /// without actually extracting it. |
| 139 | /// If length is zero, the return is immediate. |
| 140 | /// If length is greater than used length, |
| 141 | /// it is substituted with the the current FIFO |
| 142 | /// used length. |
| 143 | /// |
| 144 | /// Returns the number of elements copied in the |
| 145 | /// supplied buffer. |
| 146 | { |
| 147 | if (0 == length) return 0; |
| 148 | Mutex::ScopedLock lock(_mutex); |
| 149 | if (!isReadable()) return 0; |
| 150 | if (length > _used) length = _used; |
| 151 | std::memcpy(pBuffer, _buffer.begin() + _begin, length * sizeof(T)); |
| 152 | return length; |
| 153 | } |
| 154 | |
| 155 | std::size_t peek(Poco::Buffer<T>& rBuffer, std::size_t length = 0) const |
| 156 | /// Peeks into the data currently in the FIFO |
| 157 | /// without actually extracting it. |
| 158 | /// Resizes the supplied buffer to the size of |
| 159 | /// data written to it. If length is not |
| 160 | /// supplied by the caller or is greater than length |
| 161 | /// of currently used data, the current FIFO used |
| 162 | /// data length is substituted for it. |
| 163 | /// |
| 164 | /// Returns the number of elements copied in the |
| 165 | /// supplied buffer. |
| 166 | { |
| 167 | Mutex::ScopedLock lock(_mutex); |
| 168 | if (!isReadable()) return 0; |
| 169 | if (0 == length || length > _used) length = _used; |
| 170 | rBuffer.resize(length); |
| 171 | return peek(rBuffer.begin(), length); |
| 172 | } |
| 173 | |
| 174 | std::size_t read(T* pBuffer, std::size_t length) |
| 175 | /// Copies the data currently in the FIFO |
| 176 | /// into the supplied buffer, which must be |
| 177 | /// preallocated to at least the length size |
| 178 | /// before calling this function. |
| 179 | /// |
| 180 | /// Returns the size of the copied data. |
| 181 | { |
| 182 | if (0 == length) return 0; |
| 183 | Mutex::ScopedLock lock(_mutex); |
| 184 | if (!isReadable()) return 0; |
| 185 | std::size_t usedBefore = _used; |
| 186 | std::size_t readLen = peek(pBuffer, length); |
| 187 | poco_assert (_used >= readLen); |
| 188 | _used -= readLen; |
| 189 | if (0 == _used) _begin = 0; |
| 190 | else _begin += length; |
| 191 | |
| 192 | if (_notify) notify(usedBefore); |
| 193 | |
| 194 | return readLen; |
| 195 | } |
| 196 | |
| 197 | std::size_t read(Poco::Buffer<T>& rBuffer, std::size_t length = 0) |
| 198 | /// Copies the data currently in the FIFO |
| 199 | /// into the supplied buffer. |
| 200 | /// Resizes the supplied buffer to the size of |
| 201 | /// data written to it. |
| 202 | /// |
| 203 | /// Returns the size of the copied data. |
| 204 | { |
| 205 | Mutex::ScopedLock lock(_mutex); |
| 206 | if (!isReadable()) return 0; |
| 207 | std::size_t usedBefore = _used; |
| 208 | std::size_t readLen = peek(rBuffer, length); |
| 209 | poco_assert (_used >= readLen); |
| 210 | _used -= readLen; |
| 211 | if (0 == _used) _begin = 0; |
| 212 | else _begin += length; |
| 213 | |
| 214 | if (_notify) notify(usedBefore); |
| 215 | |
| 216 | return readLen; |
| 217 | } |
| 218 | |
| 219 | std::size_t write(const T* pBuffer, std::size_t length) |
| 220 | /// Writes data from supplied buffer to the FIFO buffer. |
| 221 | /// If there is no sufficient space for the whole |
| 222 | /// buffer to be written, data up to available |
| 223 | /// length is written. |
| 224 | /// The length of data to be written is determined from the |
| 225 | /// length argument. Function does nothing and returns zero |
| 226 | /// if length argument is equal to zero. |
| 227 | /// |
| 228 | /// Returns the length of data written. |
| 229 | { |
| 230 | if (0 == length) return 0; |
| 231 | |
| 232 | Mutex::ScopedLock lock(_mutex); |
| 233 | |
| 234 | if (!isWritable()) return 0; |
| 235 | |
| 236 | if (_buffer.size() - (_begin + _used) < length) |
| 237 | { |
| 238 | std::memmove(_buffer.begin(), begin(), _used * sizeof(T)); |
| 239 | _begin = 0; |
| 240 | } |
| 241 | |
| 242 | std::size_t usedBefore = _used; |
| 243 | std::size_t availableBefore = _buffer.size() - _used - _begin; |
| 244 | std::size_t len = length > availableBefore ? availableBefore : length; |
| 245 | std::memcpy(begin() + _used, pBuffer, len * sizeof(T)); |
| 246 | _used += len; |
| 247 | poco_assert (_used <= _buffer.size()); |
| 248 | if (_notify) notify(usedBefore); |
| 249 | |
| 250 | return len; |
| 251 | } |
| 252 | |
| 253 | std::size_t write(const Buffer<T>& rBuffer, std::size_t length = 0) |
| 254 | /// Writes data from supplied buffer to the FIFO buffer. |
| 255 | /// If there is no sufficient space for the whole |
| 256 | /// buffer to be written, data up to available |
| 257 | /// length is written. |
| 258 | /// The length of data to be written is determined from the |
| 259 | /// length argument or buffer size (when length argument is |
| 260 | /// default zero or greater than buffer size). |
| 261 | /// |
| 262 | /// Returns the length of data written. |
| 263 | { |
| 264 | if (length == 0 || length > rBuffer.size()) |
| 265 | length = rBuffer.size(); |
| 266 | |
| 267 | return write(rBuffer.begin(), length); |
| 268 | } |
| 269 | |
| 270 | std::size_t size() const |
| 271 | /// Returns the size of the buffer. |
| 272 | { |
| 273 | return _buffer.size(); |
| 274 | } |
| 275 | |
| 276 | std::size_t used() const |
| 277 | /// Returns the size of the used portion of the buffer. |
| 278 | { |
| 279 | return _used; |
| 280 | } |
| 281 | |
| 282 | std::size_t available() const |
| 283 | /// Returns the size of the available portion of the buffer. |
| 284 | { |
| 285 | return size() - _used; |
| 286 | } |
| 287 | |
| 288 | void drain(std::size_t length = 0) |
| 289 | /// Drains length number of elements from the buffer. |
| 290 | /// If length is zero or greater than buffer current |
| 291 | /// content length, buffer is emptied. |
| 292 | { |
| 293 | Mutex::ScopedLock lock(_mutex); |
| 294 | |
| 295 | std::size_t usedBefore = _used; |
| 296 | |
| 297 | if (0 == length || length >= _used) |
| 298 | { |
| 299 | _begin = 0; |
| 300 | _used = 0; |
| 301 | } |
| 302 | else |
| 303 | { |
| 304 | _begin += length; |
| 305 | _used -= length; |
| 306 | } |
| 307 | |
| 308 | if (_notify) notify(usedBefore); |
| 309 | } |
| 310 | |
| 311 | void copy(const T* ptr, std::size_t length) |
| 312 | /// Copies the supplied data to the buffer and adjusts |
| 313 | /// the used buffer size. |
| 314 | { |
| 315 | poco_check_ptr(ptr); |
| 316 | if (0 == length) return; |
| 317 | |
| 318 | Mutex::ScopedLock lock(_mutex); |
| 319 | |
| 320 | if (length > available()) |
| 321 | throw Poco::InvalidAccessException("Cannot extend buffer." ); |
| 322 | |
| 323 | if (!isWritable()) |
| 324 | throw Poco::InvalidAccessException("Buffer not writable." ); |
| 325 | |
| 326 | std::memcpy(begin() + _used, ptr, length * sizeof(T)); |
| 327 | std::size_t usedBefore = _used; |
| 328 | _used += length; |
| 329 | if (_notify) notify(usedBefore); |
| 330 | } |
| 331 | |
| 332 | void advance(std::size_t length) |
| 333 | /// Advances buffer by length elements. |
| 334 | /// Should be called AFTER the data |
| 335 | /// was copied into the buffer. |
| 336 | { |
| 337 | Mutex::ScopedLock lock(_mutex); |
| 338 | |
| 339 | if (length > _buffer.size() - _used - _begin) |
| 340 | throw Poco::InvalidAccessException("Cannot extend buffer." ); |
| 341 | |
| 342 | if (!isWritable()) |
| 343 | throw Poco::InvalidAccessException("Buffer not writable." ); |
| 344 | |
| 345 | std::size_t usedBefore = _used; |
| 346 | _used += length; |
| 347 | if (_notify) notify(usedBefore); |
| 348 | } |
| 349 | |
| 350 | T* begin() |
| 351 | /// Returns the pointer to the beginning of the buffer. |
| 352 | { |
| 353 | Mutex::ScopedLock lock(_mutex); |
| 354 | if (_begin != 0) |
| 355 | { |
| 356 | // Move the data to the start of the buffer so begin() and next() |
| 357 | // always return consistent pointers with each other and allow writing |
| 358 | // to the end of the buffer. |
| 359 | std::memmove(_buffer.begin(), _buffer.begin() + _begin, _used * sizeof(T)); |
| 360 | _begin = 0; |
| 361 | } |
| 362 | return _buffer.begin(); |
| 363 | } |
| 364 | |
| 365 | T* next() |
| 366 | /// Returns the pointer to the next available position in the buffer. |
| 367 | { |
| 368 | Mutex::ScopedLock lock(_mutex); |
| 369 | return begin() + _used; |
| 370 | } |
| 371 | |
| 372 | T& operator [] (std::size_t index) |
| 373 | /// Returns value at index position. |
| 374 | /// Throws InvalidAccessException if index is larger than |
| 375 | /// the last valid (used) buffer position. |
| 376 | { |
| 377 | Mutex::ScopedLock lock(_mutex); |
| 378 | if (index >= _used) |
| 379 | throw InvalidAccessException(format("Index out of bounds: %z (max index allowed: %z)" , index, _used - 1)); |
| 380 | |
| 381 | return _buffer[_begin + index]; |
| 382 | } |
| 383 | |
| 384 | const T& operator [] (std::size_t index) const |
| 385 | /// Returns value at index position. |
| 386 | /// Throws InvalidAccessException if index is larger than |
| 387 | /// the last valid (used) buffer position. |
| 388 | { |
| 389 | Mutex::ScopedLock lock(_mutex); |
| 390 | if (index >= _used) |
| 391 | throw InvalidAccessException(format("Index out of bounds: %z (max index allowed: %z)" , index, _used - 1)); |
| 392 | |
| 393 | return _buffer[_begin + index]; |
| 394 | } |
| 395 | |
| 396 | const Buffer<T>& buffer() const |
| 397 | /// Returns const reference to the underlying buffer. |
| 398 | { |
| 399 | return _buffer; |
| 400 | } |
| 401 | |
| 402 | void setError(bool error = true) |
| 403 | /// Sets the error flag on the buffer and empties it. |
| 404 | /// If notifications are enabled, they will be triggered |
| 405 | /// if appropriate. |
| 406 | /// |
| 407 | /// Setting error flag to true prevents reading and writing |
| 408 | /// to the buffer; to re-enable FIFOBuffer for reading/writing, |
| 409 | /// the error flag must be set to false. |
| 410 | { |
| 411 | if (error) |
| 412 | { |
| 413 | bool f = false; |
| 414 | Mutex::ScopedLock lock(_mutex); |
| 415 | if (error && isReadable() && _notify) readable.notify(this, f); |
| 416 | if (error && isWritable() && _notify) writable.notify(this, f); |
| 417 | _error = error; |
| 418 | _used = 0; |
| 419 | } |
| 420 | else |
| 421 | { |
| 422 | bool t = true; |
| 423 | Mutex::ScopedLock lock(_mutex); |
| 424 | _error = false; |
| 425 | if (_notify && !_eof) writable.notify(this, t); |
| 426 | } |
| 427 | } |
| 428 | |
| 429 | bool isValid() const |
| 430 | /// Returns true if error flag is not set on the buffer, |
| 431 | /// otherwise returns false. |
| 432 | { |
| 433 | return !_error; |
| 434 | } |
| 435 | |
| 436 | void setEOF(bool eof = true) |
| 437 | /// Sets end-of-file flag on the buffer. |
| 438 | /// |
| 439 | /// Setting EOF flag to true prevents writing to the |
| 440 | /// buffer; reading from the buffer will still be |
| 441 | /// allowed until all data present in the buffer at the |
| 442 | /// EOF set time is drained. After that, to re-enable |
| 443 | /// FIFOBuffer for reading/writing, EOF must be |
| 444 | /// set to false. |
| 445 | /// |
| 446 | /// Setting EOF flag to false clears EOF state if it |
| 447 | /// was previously set. If EOF was not set, it has no |
| 448 | /// effect. |
| 449 | { |
| 450 | Mutex::ScopedLock lock(_mutex); |
| 451 | bool flag = !eof; |
| 452 | if (_notify) writable.notify(this, flag); |
| 453 | _eof = eof; |
| 454 | } |
| 455 | |
| 456 | bool hasEOF() const |
| 457 | /// Returns true if EOF flag has been set. |
| 458 | { |
| 459 | return _eof; |
| 460 | } |
| 461 | |
| 462 | bool isEOF() const |
| 463 | /// Returns true if EOF flag has been set and buffer is empty. |
| 464 | { |
| 465 | return isEmpty() && _eof; |
| 466 | } |
| 467 | |
| 468 | bool isEmpty() const |
| 469 | /// Returns true is buffer is empty, false otherwise. |
| 470 | { |
| 471 | return 0 == _used; |
| 472 | } |
| 473 | |
| 474 | bool isFull() const |
| 475 | /// Returns true is buffer is full, false otherwise. |
| 476 | { |
| 477 | return size() == _used; |
| 478 | } |
| 479 | |
| 480 | bool isReadable() const |
| 481 | /// Returns true if buffer contains data and is not |
| 482 | /// in error state. |
| 483 | { |
| 484 | return !isEmpty() && isValid(); |
| 485 | } |
| 486 | |
| 487 | bool isWritable() const |
| 488 | /// Returns true if buffer is not full and is not |
| 489 | /// in error state. |
| 490 | { |
| 491 | return !isFull() && isValid() && !_eof; |
| 492 | } |
| 493 | |
| 494 | void setNotify(bool bufferNotify = true) |
| 495 | /// Enables/disables notifications. |
| 496 | { |
| 497 | _notify = bufferNotify; |
| 498 | } |
| 499 | |
| 500 | bool getNotify() const |
| 501 | /// Returns true if notifications are enabled, false otherwise. |
| 502 | { |
| 503 | return _notify; |
| 504 | } |
| 505 | |
| 506 | Mutex& mutex() |
| 507 | /// Returns reference to mutex. |
| 508 | { |
| 509 | return _mutex; |
| 510 | } |
| 511 | |
| 512 | private: |
| 513 | void notify(std::size_t usedBefore) |
| 514 | { |
| 515 | bool t = true, f = false; |
| 516 | if (usedBefore == 0 && _used > 0) |
| 517 | readable.notify(this, t); |
| 518 | else if (usedBefore > 0 && 0 == _used) |
| 519 | readable.notify(this, f); |
| 520 | |
| 521 | if (usedBefore == _buffer.size() && _used < _buffer.size()) |
| 522 | writable.notify(this, t); |
| 523 | else if (usedBefore < _buffer.size() && _used == _buffer.size()) |
| 524 | writable.notify(this, f); |
| 525 | } |
| 526 | |
| 527 | BasicFIFOBuffer(); |
| 528 | BasicFIFOBuffer(const BasicFIFOBuffer&); |
| 529 | BasicFIFOBuffer& operator = (const BasicFIFOBuffer&); |
| 530 | |
| 531 | Buffer<T> _buffer; |
| 532 | std::size_t _begin; |
| 533 | std::size_t _used; |
| 534 | bool _notify; |
| 535 | mutable Mutex _mutex; |
| 536 | bool _eof; |
| 537 | bool _error; |
| 538 | }; |
| 539 | |
| 540 | |
| 541 | // |
| 542 | // We provide an instantiation for char |
| 543 | // |
| 544 | typedef BasicFIFOBuffer<char> FIFOBuffer; |
| 545 | |
| 546 | |
| 547 | } // namespace Poco |
| 548 | |
| 549 | |
| 550 | #endif // Foundation_FIFOBuffer_INCLUDED |
| 551 | |