1// Observable Library
2// Copyright (c) 2016-2017 David Capello
3//
4// This file is released under the terms of the MIT license.
5// Read LICENSE.txt for more information.
6
7#ifndef OBS_SAFE_LIST_H_INCLUDED
8#define OBS_SAFE_LIST_H_INCLUDED
9#pragma once
10
11#include <atomic>
12#include <cassert>
13#include <chrono>
14#include <condition_variable>
15#include <iterator>
16#include <mutex>
17#include <thread>
18#include <vector>
19
20namespace obs {
21
// An STL-like list which is safe to remove/add items from multiple
// threads while it's being iterated by multiple threads too.
//
// The list stores raw T* pointers and never deletes the pointed
// values; it only owns its internal nodes.
template<typename T>
class safe_list {
public:
  class iterator;

private:
  // A node in the singly linked list.
  struct node {
    // Pointer to a slot or an observer instance.
    //
    // As we cannot modify the list while for-loops are iterating it,
    // erase() temporarily marks a node as disabled by setting this
    // "value" member to nullptr. Then, when the list is not iterated
    // anymore (m_ref==0), delete_nodes() removes all disabled nodes
    // from the list.
    //
    // An iterator caches this pointer when it locks the node, so it
    // isn't possible to ensure that an iterator will always return a
    // non-nullptr value (the client has to check the value returned
    // by iterators).
    T* value;

    // Number of locks for this node, i.e. the number of iterators
    // currently pointing to this node.
    //
    // This variable is incremented/decremented only when
    // m_mutex_nodes is locked.
    int locks = 0;

    // Next node in the list. It's nullptr for the last node in the list.
    node* next = nullptr;

    // Thread used to create the node (i.e. the thread where
    // safe_list::push_back() was used). We suppose that the same
    // thread will remove the node.
    std::thread::id creator_thread;

    // Head of an intrusive linked list (chained through
    // iterator::m_next_iterator) with the iterators that locked this
    // node from its creator thread. It is used to release those locks
    // when erase() is called from inside the same iteration loop.
    iterator* creator_thread_iterator = nullptr;

    // Creates a node for the given value and records the calling
    // thread as the "creator thread".
    explicit node(T* value = nullptr)
      : value(value),
        creator_thread(std::this_thread::get_id()) {
    }

    node(const node&) = delete;
    node& operator=(const node&) = delete;

    // Returns true if the calling thread is the one that created this
    // node (i.e. the thread where safe_list::push_back() was used).
    //
    // It is used to know if an iterator that locks/unlocks the node
    // belongs to the "creator thread", so when we erase() the node we
    // can (must) unlock all those iterators.
    bool in_creator_thread() const {
      return (creator_thread == std::this_thread::get_id());
    }

    // Locks the node for the given iterator. It means that
    // iterator::operator*() is going to return the node's value so we
    // can use it. (E.g. in case of a slot, we can call the slot
    // function.)
    void lock(iterator* it);

    // Indicates that the node is not being used by the given iterator
    // anymore, so it could be deleted in case that erase() is called.
    void unlock(iterator* it);

    // Notifies all iterators in the "creator thread" that they don't
    // own a lock on this node anymore. It's used by erase().
    void unlock_all();
  };

  // Mutex used to modify the linked list (m_first/m_last and node::next).
  mutable std::mutex m_mutex_nodes;

  // Used to iterate the list from the first element to the last one.
  node* m_first = nullptr;

  // Used to add new items at the end of the list (with push_back()).
  node* m_last = nullptr;

  // "m_ref" indicates the number of times this list is being iterated
  // simultaneously. When "m_ref" reaches 0, the delete_nodes()
  // function is called to delete all unused nodes (unlocked nodes
  // with value=nullptr). While "m_ref" > 0 we shouldn't remove nodes
  // (so an actual node reference is still valid until the next
  // unref()).
  std::atomic<int> m_ref{0};

  // Flag that indicates if some node was erased and delete_nodes()
  // should walk the whole list to clean disabled nodes (nodes with
  // value = nullptr).
  bool m_delete_nodes = false;

  // Used to notify when a node's lock count is zero so erase() can
  // continue.
  std::condition_variable m_delete_cv;

public:

  // An STL-like iterator for safe_list. It is not a fully working
  // iterator and shouldn't be used directly; it's expected to be used
  // only in range-based for loops.
  //
  // The iterator works in the following way:
  //
  // 1. It adds a new reference (ref()/unref()) to the safe_list so
  //    nodes are not deleted while the iterator is alive.
  // 2. It "locks" the given node in the constructor so the node is
  //    not deleted while there is an iterator pointing to it.
  // 3. operator*() returns the node's value cached at lock time.
  // 4. When the iterator is incremented (operator++) it unlocks the
  //    previous node, goes to the next one, and locks it.
  class iterator {
  public:
    friend struct node;

    typedef T* value_type;
    typedef std::ptrdiff_t difference_type;
    typedef T** pointer;
    typedef T*& reference;
    typedef std::forward_iterator_tag iterator_category;

    // Creates an iterator pointing to "node" (which can be nullptr
    // for an empty list). begin()/end() call this constructor while
    // they hold m_mutex_nodes.
    iterator(safe_list& list, node* node)
      : m_list(list),
        m_node(node) {
      m_list.ref();

      // Lock the node because this iterator is pointing to it.
      if (m_node)
        lock();
    }

    // Cannot copy iterators
    iterator(const iterator&) = delete;
    iterator& operator=(const iterator&) = delete;

    // We can only move iterators. Only the list and node pointers are
    // taken from "other"; the lock is not transferred, so the source
    // iterator must be unlocked.
    //
    // NOTE(review): iterators created with a non-null node are locked
    // by the regular constructor, so this constructor presumably
    // relies on begin()/end() results being copy-elided -- confirm.
    iterator(iterator&& other)
      : m_list(other.m_list),
        m_node(other.m_node) {
      assert(!other.m_locked);
      m_list.ref();
    }

    // Releases the node lock (if any) and then the list reference.
    ~iterator() {
      if (m_node) {
        std::lock_guard<std::mutex> l(m_list.m_mutex_nodes);
        unlock();
      }

      assert(!m_locked);

      // Must run without m_mutex_nodes held: unref() can call
      // delete_nodes(), which locks the same mutex.
      m_list.unref();
    }

    // Called by node::unlock_all() (i.e. when erase() is used) on the
    // iterators created in the "creator thread": the node already
    // released our lock, so we only have to forget it.
    void notify_unlock(const node* node) {
      if (m_locked) {
        assert(m_node == node);
        assert(m_locked);
        m_locked = false;
      }
    }

    // Unlocks the current m_node, goes to the next one, and locks it.
    iterator& operator++() {
      std::lock_guard<std::mutex> l(m_list.m_mutex_nodes);
      assert(m_node);
      if (m_node) {
        unlock();

        // Go to the next node.
        m_node = m_node->next;

        // Lock the new node that we're pointing to now.
        if (m_node)
          lock();
      }
      return *this;
    }

    // Returns the node's value cached when the node was locked.
    //
    // If the node was already erased when it was locked, this returns
    // nullptr and the client will need to call operator++() again. We
    // cannot guarantee that this function will return a value !=
    // nullptr.
    T* operator*() const {
      assert(m_node && m_locked);
      return m_value;
    }

    // This can be used only to compare an iterator created from
    // begin() (in "this" pointer) with end() ("other" argument).
    //
    // As end() points to the last node (not one past it), the
    // iteration continues until "this" reaches the node that follows
    // the end() node, or until any of both iterators has no node.
    bool operator!=(const iterator& other) const {
      std::lock_guard<std::mutex> l(m_list.m_mutex_nodes);
      if (m_node && other.m_node)
        return (m_node != other.m_node->next);
      else
        return false;
    }

  private:
    // Adds a lock to m_node and caches its value. It's used to keep
    // track of how many iterators are using the node in the list.
    // Does nothing if this iterator already owns a lock.
    void lock() {
      if (m_locked)
        return;

      assert(m_node);
      m_node->lock(this);
      m_value = m_node->value;
      m_locked = true;
    }

    // Releases the lock on m_node (if this iterator owns one) and
    // wakes up erase() callers when the node has no more locks.
    void unlock() {
      if (!m_locked)
        return;

      assert(m_node);
      m_node->unlock(this);
      m_value = nullptr;
      m_locked = false;

      // node's locks count is zero
      if (m_node->locks == 0)
        m_list.m_delete_cv.notify_all();
    }

    safe_list& m_list;

    // Current node being iterated. It's nullptr when the iterator was
    // created from an empty list or was incremented past the last
    // node.
    node* m_node;

    // Cached value of m_node->value
    T* m_value = nullptr;

    // True if this iterator has added a lock to the "m_node"
    bool m_locked = false;

    // Next iterator locking the same "m_node" from its creator
    // thread.
    iterator* m_next_iterator = nullptr;
  };

  safe_list() {
  }

  // Deletes all nodes. The list must not be in use by any iterator.
  ~safe_list() {
    assert(m_ref == 0);
    delete_nodes(true);

    assert(m_first == m_last);
    assert(m_first == nullptr);
  }

  // Returns true if the list has no nodes at all. Nodes disabled by
  // erase() that were not removed by delete_nodes() yet still count
  // as nodes.
  bool empty() const {
    std::lock_guard<std::mutex> lock(m_mutex_nodes);

    // We cannot compare m_first with m_last here: both point to the
    // same node when the list has exactly one element.
    return (m_first == nullptr);
  }

  // Appends a new node for "value" at the end of the list. The
  // calling thread becomes the node's "creator thread".
  void push_back(T* value) {
    // Allocate the node before taking the mutex.
    node* n = new node(value);

    std::lock_guard<std::mutex> lock(m_mutex_nodes);
    if (!m_first)
      m_first = m_last = n;
    else {
      m_last->next = n;
      m_last = n;
    }
  }

  // Disables the first node that contains "value" and waits until no
  // iterator has that node locked. The node itself is deleted later
  // by delete_nodes(), when the list isn't referenced anymore.
  void erase(T* value) {
    // We add a ref to avoid calling delete_nodes().
    ref();
    {
      std::unique_lock<std::mutex> lock(m_mutex_nodes);

      for (node* n = m_first; n; n = n->next) {
        if (n->value != value)
          continue;

        // We disable the node so it isn't used anymore by other
        // iterators.
        assert(n->value);
        n->unlock_all();
        n->value = nullptr;
        m_delete_nodes = true;

        // We should wait until the node is completely unlocked by
        // other threads, because after erase() the client could be
        // deleting the value that we are using in another thread.
        // (The predicate is checked before blocking, so this returns
        // immediately when the node has no locks.)
        m_delete_cv.wait(lock, [n]{ return n->locks == 0; });

        assert(n->locks == 0);

        // The node will be finally deleted when we leave the
        // iteration loop (m_ref==0, i.e. the end() iterator is
        // destroyed)
        break;
      }
    }
    unref();
  }

  // Returns an iterator that points to (and locks) the first node.
  iterator begin() {
    std::lock_guard<std::mutex> lock(m_mutex_nodes);
    return iterator(*this, m_first);
  }

  // Returns an iterator that points to (and locks) the last node,
  // not to one past it. See iterator::operator!=().
  iterator end() {
    std::lock_guard<std::mutex> lock(m_mutex_nodes);
    return iterator(*this, m_last);
  }

  // Adds a reference to the list so nodes aren't deleted.
  void ref() {
    const int v = m_ref.fetch_add(1);
    assert(v >= 0);
    (void)v;  // Only used by the assert().
  }

  // Removes a reference. When the last reference is released, the
  // disabled nodes are deleted.
  void unref() {
    const int v = m_ref.fetch_sub(1);
    assert(v >= 1);
    if (v == 1)
      delete_nodes(false);
  }

private:
  // Deletes nodes from the list. If "all" is true, deletes all nodes,
  // if it's false, it deletes only unlocked nodes with value ==
  // nullptr, which are nodes that were disabled by erase().
  void delete_nodes(bool all) {
    std::lock_guard<std::mutex> lock(m_mutex_nodes);
    if (!all && !m_delete_nodes)
      return;

    node* prev = nullptr;   // Last node that was kept in the list
    node* next = nullptr;

    for (node* cur = m_first; cur; cur = next) {
      // Save the next node because "cur" might be deleted.
      next = cur->next;

      if (all || (!cur->value && !cur->locks)) {
        // Unlink "cur", fixing m_first/m_last when it's needed.
        if (prev) {
          prev->next = next;
          if (cur == m_last)
            m_last = prev;
        }
        else {
          m_first = next;
          if (cur == m_last)
            m_last = m_first;
        }

        assert(!cur->locks);
        delete cur;
      }
      else {
        prev = cur;
      }
    }

    m_delete_nodes = false;
  }

};
402
403template<typename T>
404void safe_list<T>::node::lock(iterator* it) {
405 ++locks;
406 assert(locks > 0);
407
408 // If we are in the creator thread, we add this iterator in the
409 // "creator thread iterators" linked-list so the iterator is
410 // notified in case that the node is erased.
411 if (in_creator_thread()) {
412 it->m_next_iterator = creator_thread_iterator;
413 creator_thread_iterator = it;
414 }
415}
416
417template<typename T>
418void safe_list<T>::node::unlock(iterator* it) {
419 assert(it);
420
421 // In this case we are unlocking just one iterator, if we are in the
422 // creator thread, we've to remove this iterator from the "creator
423 // thread iterators" linked-list.
424 if (in_creator_thread()) {
425 iterator* prev = nullptr;
426 iterator* next = nullptr;
427 for (auto it2=creator_thread_iterator; it2; it2=next) {
428 next = it2->m_next_iterator;
429 if (it2 == it) {
430 if (prev)
431 prev->m_next_iterator = next;
432 else
433 creator_thread_iterator = next;
434
435 break;
436 }
437 prev = it2;
438 }
439 }
440
441 assert(locks > 0);
442 --locks;
443}
444
445// In this case we've called erase() to delete this node, so we have
446// to unlock the node from the creator thread if we are in the
447// creator thread.
448template<typename T>
449void safe_list<T>::node::unlock_all() {
450 if (in_creator_thread()) {
451 // Notify to all iterators in the creator thread that they don't
452 // have the node locked anymore. In this way we can continue the
453 // erase() call.
454 for (auto it=creator_thread_iterator; it; it=it->m_next_iterator) {
455 it->notify_unlock(this);
456
457 assert(locks > 0);
458 --locks;
459 }
460 creator_thread_iterator = nullptr;
461 }
462}
463
464} // namespace obs
465
466#endif
467