1#pragma once
2
3#include <queue>
4#include <type_traits>
5
6#include <Poco/Mutex.h>
7#include <Poco/Semaphore.h>
8
9#include <common/Types.h>
10
11
namespace detail
{
    /// Functor that assigns `src` to `dst`.
    /// When `is_nothrow_move_assignable` is true, `src` is forwarded (i.e. moved for a non-reference T);
    /// otherwise `src` is used as an lvalue, so the copy assignment operator is selected.
    /// Presumably the copy fallback exists so that a throwing assignment does not leave `src` moved-from.
    template <typename T, bool is_nothrow_move_assignable = std::is_nothrow_move_assignable_v<T>>
    struct MoveOrCopyIfThrow
    {
        void operator()(T && src, T & dst) const
        {
            if constexpr (is_nothrow_move_assignable)
                dst = std::forward<T>(src);
            else
                dst = src;  /// A named rvalue reference is an lvalue: this is a copy assignment.
        }
    };

    /// Convenience wrapper: deduces T and dispatches to MoveOrCopyIfThrow<T>.
    /// T is deduced from both parameters, so `src` has to be passed as an rvalue of the same type as `dst`.
    template <typename T>
    void moveOrCopyIfThrow(T && src, T & dst)
    {
        const MoveOrCopyIfThrow<T> assign{};
        assign(std::forward<T>(src), dst);
    }
}
41
42/** A very simple thread-safe queue of limited size.
43 * If you try to pop an item from an empty queue, the thread is blocked until the queue becomes nonempty.
44 * If you try to push an element into an overflowed queue, the thread is blocked until space appears in the queue.
45 */
46template <typename T>
47class ConcurrentBoundedQueue
48{
49private:
50 std::queue<T> queue;
51 Poco::FastMutex mutex;
52 Poco::Semaphore fill_count;
53 Poco::Semaphore empty_count;
54
55public:
56 ConcurrentBoundedQueue(size_t max_fill)
57 : fill_count(0, max_fill), empty_count(max_fill, max_fill) {}
58
59 void push(const T & x)
60 {
61 empty_count.wait();
62 {
63 Poco::ScopedLock<Poco::FastMutex> lock(mutex);
64 queue.push(x);
65 }
66 fill_count.set();
67 }
68
69 template <typename... Args>
70 void emplace(Args &&... args)
71 {
72 empty_count.wait();
73 {
74 Poco::ScopedLock<Poco::FastMutex> lock(mutex);
75 queue.emplace(std::forward<Args>(args)...);
76 }
77 fill_count.set();
78 }
79
80 void pop(T & x)
81 {
82 fill_count.wait();
83 {
84 Poco::ScopedLock<Poco::FastMutex> lock(mutex);
85 detail::moveOrCopyIfThrow(std::move(queue.front()), x);
86 queue.pop();
87 }
88 empty_count.set();
89 }
90
91 bool tryPush(const T & x, UInt64 milliseconds = 0)
92 {
93 if (empty_count.tryWait(milliseconds))
94 {
95 {
96 Poco::ScopedLock<Poco::FastMutex> lock(mutex);
97 queue.push(x);
98 }
99 fill_count.set();
100 return true;
101 }
102 return false;
103 }
104
105 template <typename... Args>
106 bool tryEmplace(UInt64 milliseconds, Args &&... args)
107 {
108 if (empty_count.tryWait(milliseconds))
109 {
110 {
111 Poco::ScopedLock<Poco::FastMutex> lock(mutex);
112 queue.emplace(std::forward<Args>(args)...);
113 }
114 fill_count.set();
115 return true;
116 }
117 return false;
118 }
119
120 bool tryPop(T & x, UInt64 milliseconds = 0)
121 {
122 if (fill_count.tryWait(milliseconds))
123 {
124 {
125 Poco::ScopedLock<Poco::FastMutex> lock(mutex);
126 detail::moveOrCopyIfThrow(std::move(queue.front()), x);
127 queue.pop();
128 }
129 empty_count.set();
130 return true;
131 }
132 return false;
133 }
134
135 size_t size()
136 {
137 Poco::ScopedLock<Poco::FastMutex> lock(mutex);
138 return queue.size();
139 }
140
141 void clear()
142 {
143 while (fill_count.tryWait(0))
144 {
145 {
146 Poco::ScopedLock<Poco::FastMutex> lock(mutex);
147 queue.pop();
148 }
149 empty_count.set();
150 }
151 }
152};
153