1 | /* |
2 | * Copyright 2017-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #pragma once |
18 | |
19 | #include <atomic> |
20 | |
21 | #include <folly/Function.h> |
22 | #include <folly/Synchronized.h> |
23 | #include <folly/ThreadLocal.h> |
24 | #include <glog/logging.h> |
25 | |
26 | namespace folly { |
27 | |
28 | namespace detail { |
29 | |
30 | // This is a thread-local cached, multi-producer single-consumer |
31 | // queue, similar to a concurrent version of std::list. |
32 | // |
33 | class ThreadCachedListsBase { |
34 | public: |
35 | struct Node { |
36 | folly::Function<void()> cb_; |
37 | Node* next_{nullptr}; |
38 | }; |
39 | }; |
40 | |
41 | template <typename Tag> |
42 | class ThreadCachedLists : public ThreadCachedListsBase { |
43 | public: |
44 | struct AtomicListHead { |
45 | std::atomic<Node*> tail_{nullptr}; |
46 | std::atomic<Node*> head_{nullptr}; |
47 | }; |
48 | |
49 | // Non-concurrent list, similar to std::list. |
50 | struct ListHead { |
51 | Node* head_{nullptr}; |
52 | Node* tail_{nullptr}; |
53 | |
54 | // Run func on each list node. |
55 | template <typename Func> |
56 | void forEach(Func func) { |
57 | auto node = tail_; |
58 | while (node != nullptr) { |
59 | auto next = node->next_; |
60 | func(node); |
61 | node = next; |
62 | } |
63 | } |
64 | |
65 | // Splice other in to this list. |
66 | // Afterwards, other is a valid empty listhead. |
67 | void splice(ListHead& other); |
68 | |
69 | void splice(AtomicListHead& other); |
70 | }; |
71 | |
72 | // Push a node on a thread-local list. Returns true if local list |
73 | // was pushed global. |
74 | void push(Node* node); |
75 | |
76 | // Collect all thread local lists to a single local list. |
77 | // This function is threadsafe with concurrent push()es, |
78 | // but only a single thread may call collect() at a time. |
79 | void collect(ListHead& list); |
80 | |
81 | private: |
82 | // Push list to the global list. |
83 | void pushGlobal(ListHead& list); |
84 | |
85 | folly::Synchronized<ListHead> ghead_; |
86 | |
87 | struct TLHead : public AtomicListHead { |
88 | ThreadCachedLists* parent_; |
89 | |
90 | public: |
91 | TLHead(ThreadCachedLists* parent) : parent_(parent) {} |
92 | |
93 | ~TLHead() { |
94 | parent_->ghead_->splice(*this); |
95 | } |
96 | }; |
97 | |
98 | folly::ThreadLocalPtr<TLHead, Tag> lhead_; |
99 | }; |
100 | |
// push() and splice() are optimistic w.r.t. setting the list head: the
// first pusher CASes the list head, which functions as a lock until
// tail_ != null. The first pusher then sets tail_ = head_.
//
// splice() does the opposite: it steals tail_ via exchange, then
// unlocks the list again by setting head_ to null.
// Pushes node onto the calling thread's list.
//
// node->next_ must be null on entry (debug-checked). The function spins
// until the node has been linked in, either as the first node of an empty
// list or in front of the current tail_.
template <typename Tag>
void ThreadCachedLists<Tag>::push(Node* node) {
  DCHECK(node->next_ == nullptr);
  // Caches this thread's TLHead so later calls skip the lhead_ lookup.
  // NOTE(review): as a function-local static thread_local this is keyed by
  // thread and Tag, not by ThreadCachedLists instance, and nothing in this
  // function resets it; presumably a single instance per Tag is assumed.
  // Confirm against callers.
  static thread_local TLHead* cache_{nullptr};

  if (!cache_) {
    auto l = lhead_.get();
    if (!l) {
      // First push from this thread: create its list head and hand
      // ownership to the ThreadLocalPtr.
      lhead_.reset(new TLHead(this));
      l = lhead_.get();
      DCHECK(l);
    }
    cache_ = l;
  }

  while (true) {
    auto head = cache_->head_.load(std::memory_order_relaxed);
    if (!head) {
      // Empty list: claim it by installing node as head_. A failed CAS in
      // the other branch may have overwritten node->next_, so clear it
      // again before publishing.
      node->next_ = nullptr;
      if (cache_->head_.compare_exchange_weak(head, node)) {
        // tail_ is published last; until then splice(AtomicListHead&)
        // sees a null tail_ and leaves this list alone.
        cache_->tail_.store(node);
        break;
      }
    } else {
      auto tail = cache_->tail_.load(std::memory_order_relaxed);
      // A null tail_ with a non-null head_ means the list is in transition
      // (e.g. splice() has taken tail_ but not yet cleared head_); retry.
      if (tail) {
        // Link in front of the current newest node, then try to become the
        // new tail_. On failure compare_exchange_weak writes the observed
        // tail_ into node->next_ and the loop starts over.
        node->next_ = tail;
        if (cache_->tail_.compare_exchange_weak(node->next_, node)) {
          break;
        }
      }
    }
  }
}
141 | |
142 | template <typename Tag> |
143 | void ThreadCachedLists<Tag>::collect(ListHead& list) { |
144 | auto acc = lhead_.accessAllThreads(); |
145 | |
146 | for (auto& thr : acc) { |
147 | list.splice(thr); |
148 | } |
149 | |
150 | list.splice(*ghead_.wlock()); |
151 | } |
152 | |
153 | template <typename Tag> |
154 | void ThreadCachedLists<Tag>::ListHead::splice(ListHead& other) { |
155 | if (other.head_ != nullptr) { |
156 | DCHECK(other.tail_ != nullptr); |
157 | } else { |
158 | DCHECK(other.tail_ == nullptr); |
159 | return; |
160 | } |
161 | |
162 | if (head_) { |
163 | DCHECK(tail_ != nullptr); |
164 | DCHECK(head_->next_ == nullptr); |
165 | head_->next_ = other.tail_; |
166 | head_ = other.head_; |
167 | } else { |
168 | DCHECK(head_ == nullptr); |
169 | head_ = other.head_; |
170 | tail_ = other.tail_; |
171 | } |
172 | |
173 | other.head_ = nullptr; |
174 | other.tail_ = nullptr; |
175 | } |
176 | |
177 | template <typename Tag> |
178 | void ThreadCachedLists<Tag>::ListHead::splice(AtomicListHead& list) { |
179 | ListHead local; |
180 | |
181 | auto tail = list.tail_.load(); |
182 | if (tail) { |
183 | local.tail_ = list.tail_.exchange(nullptr); |
184 | local.head_ = list.head_.exchange(nullptr); |
185 | splice(local); |
186 | } |
187 | } |
188 | |
189 | } // namespace detail |
190 | } // namespace folly |
191 | |