1 | #pragma once |
2 | |
3 | #include <queue> |
4 | #include <type_traits> |
5 | |
6 | #include <Poco/Mutex.h> |
7 | #include <Poco/Semaphore.h> |
8 | |
9 | #include <common/Types.h> |
10 | |
11 | |
12 | namespace detail |
13 | { |
14 | template <typename T, bool is_nothrow_move_assignable = std::is_nothrow_move_assignable_v<T>> |
15 | struct MoveOrCopyIfThrow; |
16 | |
17 | template <typename T> |
18 | struct MoveOrCopyIfThrow<T, true> |
19 | { |
20 | void operator()(T && src, T & dst) const |
21 | { |
22 | dst = std::forward<T>(src); |
23 | } |
24 | }; |
25 | |
26 | template <typename T> |
27 | struct MoveOrCopyIfThrow<T, false> |
28 | { |
29 | void operator()(T && src, T & dst) const |
30 | { |
31 | dst = src; |
32 | } |
33 | }; |
34 | |
35 | template <typename T> |
36 | void moveOrCopyIfThrow(T && src, T & dst) |
37 | { |
38 | MoveOrCopyIfThrow<T>()(std::forward<T>(src), dst); |
39 | } |
40 | } |
41 | |
42 | /** A very simple thread-safe queue of limited size. |
43 | * If you try to pop an item from an empty queue, the thread is blocked until the queue becomes nonempty. |
44 | * If you try to push an element into an overflowed queue, the thread is blocked until space appears in the queue. |
45 | */ |
46 | template <typename T> |
47 | class ConcurrentBoundedQueue |
48 | { |
49 | private: |
50 | std::queue<T> queue; |
51 | Poco::FastMutex mutex; |
52 | Poco::Semaphore fill_count; |
53 | Poco::Semaphore empty_count; |
54 | |
55 | public: |
56 | ConcurrentBoundedQueue(size_t max_fill) |
57 | : fill_count(0, max_fill), empty_count(max_fill, max_fill) {} |
58 | |
59 | void push(const T & x) |
60 | { |
61 | empty_count.wait(); |
62 | { |
63 | Poco::ScopedLock<Poco::FastMutex> lock(mutex); |
64 | queue.push(x); |
65 | } |
66 | fill_count.set(); |
67 | } |
68 | |
69 | template <typename... Args> |
70 | void emplace(Args &&... args) |
71 | { |
72 | empty_count.wait(); |
73 | { |
74 | Poco::ScopedLock<Poco::FastMutex> lock(mutex); |
75 | queue.emplace(std::forward<Args>(args)...); |
76 | } |
77 | fill_count.set(); |
78 | } |
79 | |
80 | void pop(T & x) |
81 | { |
82 | fill_count.wait(); |
83 | { |
84 | Poco::ScopedLock<Poco::FastMutex> lock(mutex); |
85 | detail::moveOrCopyIfThrow(std::move(queue.front()), x); |
86 | queue.pop(); |
87 | } |
88 | empty_count.set(); |
89 | } |
90 | |
91 | bool tryPush(const T & x, UInt64 milliseconds = 0) |
92 | { |
93 | if (empty_count.tryWait(milliseconds)) |
94 | { |
95 | { |
96 | Poco::ScopedLock<Poco::FastMutex> lock(mutex); |
97 | queue.push(x); |
98 | } |
99 | fill_count.set(); |
100 | return true; |
101 | } |
102 | return false; |
103 | } |
104 | |
105 | template <typename... Args> |
106 | bool tryEmplace(UInt64 milliseconds, Args &&... args) |
107 | { |
108 | if (empty_count.tryWait(milliseconds)) |
109 | { |
110 | { |
111 | Poco::ScopedLock<Poco::FastMutex> lock(mutex); |
112 | queue.emplace(std::forward<Args>(args)...); |
113 | } |
114 | fill_count.set(); |
115 | return true; |
116 | } |
117 | return false; |
118 | } |
119 | |
120 | bool tryPop(T & x, UInt64 milliseconds = 0) |
121 | { |
122 | if (fill_count.tryWait(milliseconds)) |
123 | { |
124 | { |
125 | Poco::ScopedLock<Poco::FastMutex> lock(mutex); |
126 | detail::moveOrCopyIfThrow(std::move(queue.front()), x); |
127 | queue.pop(); |
128 | } |
129 | empty_count.set(); |
130 | return true; |
131 | } |
132 | return false; |
133 | } |
134 | |
135 | size_t size() |
136 | { |
137 | Poco::ScopedLock<Poco::FastMutex> lock(mutex); |
138 | return queue.size(); |
139 | } |
140 | |
141 | void clear() |
142 | { |
143 | while (fill_count.tryWait(0)) |
144 | { |
145 | { |
146 | Poco::ScopedLock<Poco::FastMutex> lock(mutex); |
147 | queue.pop(); |
148 | } |
149 | empty_count.set(); |
150 | } |
151 | } |
152 | }; |
153 | |