1/*
2 * Copyright 2017-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#pragma once
18
19#include <atomic>
20
21#include <folly/Function.h>
22#include <folly/Synchronized.h>
23#include <folly/ThreadLocal.h>
24#include <glog/logging.h>
25
26namespace folly {
27
28namespace detail {
29
30// This is a thread-local cached, multi-producer single-consumer
31// queue, similar to a concurrent version of std::list.
32//
33class ThreadCachedListsBase {
34 public:
35 struct Node {
36 folly::Function<void()> cb_;
37 Node* next_{nullptr};
38 };
39};
40
41template <typename Tag>
42class ThreadCachedLists : public ThreadCachedListsBase {
43 public:
44 struct AtomicListHead {
45 std::atomic<Node*> tail_{nullptr};
46 std::atomic<Node*> head_{nullptr};
47 };
48
49 // Non-concurrent list, similar to std::list.
50 struct ListHead {
51 Node* head_{nullptr};
52 Node* tail_{nullptr};
53
54 // Run func on each list node.
55 template <typename Func>
56 void forEach(Func func) {
57 auto node = tail_;
58 while (node != nullptr) {
59 auto next = node->next_;
60 func(node);
61 node = next;
62 }
63 }
64
65 // Splice other in to this list.
66 // Afterwards, other is a valid empty listhead.
67 void splice(ListHead& other);
68
69 void splice(AtomicListHead& other);
70 };
71
72 // Push a node on a thread-local list. Returns true if local list
73 // was pushed global.
74 void push(Node* node);
75
76 // Collect all thread local lists to a single local list.
77 // This function is threadsafe with concurrent push()es,
78 // but only a single thread may call collect() at a time.
79 void collect(ListHead& list);
80
81 private:
82 // Push list to the global list.
83 void pushGlobal(ListHead& list);
84
85 folly::Synchronized<ListHead> ghead_;
86
87 struct TLHead : public AtomicListHead {
88 ThreadCachedLists* parent_;
89
90 public:
91 TLHead(ThreadCachedLists* parent) : parent_(parent) {}
92
93 ~TLHead() {
94 parent_->ghead_->splice(*this);
95 }
96 };
97
98 folly::ThreadLocalPtr<TLHead, Tag> lhead_;
99};
100
101// push() and splice() are optimistic w.r.t setting the list head: The
102// first pusher cas's the list head, which functions as a lock until
103// tail != null. The first pusher then sets tail_ = head_.
104//
105// splice() does the opposite: steals the tail_ via exchange, then
106// unlocks the list again by setting head_ to null.
107template <typename Tag>
108void ThreadCachedLists<Tag>::push(Node* node) {
109 DCHECK(node->next_ == nullptr);
110 static thread_local TLHead* cache_{nullptr};
111
112 if (!cache_) {
113 auto l = lhead_.get();
114 if (!l) {
115 lhead_.reset(new TLHead(this));
116 l = lhead_.get();
117 DCHECK(l);
118 }
119 cache_ = l;
120 }
121
122 while (true) {
123 auto head = cache_->head_.load(std::memory_order_relaxed);
124 if (!head) {
125 node->next_ = nullptr;
126 if (cache_->head_.compare_exchange_weak(head, node)) {
127 cache_->tail_.store(node);
128 break;
129 }
130 } else {
131 auto tail = cache_->tail_.load(std::memory_order_relaxed);
132 if (tail) {
133 node->next_ = tail;
134 if (cache_->tail_.compare_exchange_weak(node->next_, node)) {
135 break;
136 }
137 }
138 }
139 }
140}
141
142template <typename Tag>
143void ThreadCachedLists<Tag>::collect(ListHead& list) {
144 auto acc = lhead_.accessAllThreads();
145
146 for (auto& thr : acc) {
147 list.splice(thr);
148 }
149
150 list.splice(*ghead_.wlock());
151}
152
153template <typename Tag>
154void ThreadCachedLists<Tag>::ListHead::splice(ListHead& other) {
155 if (other.head_ != nullptr) {
156 DCHECK(other.tail_ != nullptr);
157 } else {
158 DCHECK(other.tail_ == nullptr);
159 return;
160 }
161
162 if (head_) {
163 DCHECK(tail_ != nullptr);
164 DCHECK(head_->next_ == nullptr);
165 head_->next_ = other.tail_;
166 head_ = other.head_;
167 } else {
168 DCHECK(head_ == nullptr);
169 head_ = other.head_;
170 tail_ = other.tail_;
171 }
172
173 other.head_ = nullptr;
174 other.tail_ = nullptr;
175}
176
177template <typename Tag>
178void ThreadCachedLists<Tag>::ListHead::splice(AtomicListHead& list) {
179 ListHead local;
180
181 auto tail = list.tail_.load();
182 if (tail) {
183 local.tail_ = list.tail_.exchange(nullptr);
184 local.head_ = list.head_.exchange(nullptr);
185 splice(local);
186 }
187}
188
189} // namespace detail
190} // namespace folly
191