/*
 * Copyright 2017-present Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <folly/Function.h>
#include <folly/detail/AtFork.h>
#include <folly/detail/TurnSequencer.h>
#include <folly/executors/QueuedImmediateExecutor.h>

namespace folly {

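// Illustrative usage sketch (not part of the implementation). The tag type
// MyTag and the pointer old_state below are hypothetical; passing nullptr
// selects QueuedImmediateExecutor, as in the constructor further down.
//
//   struct MyTag;
//   folly::rcu_domain<MyTag> domain{nullptr};
//
//   // Reader side: the token remembers which epoch counter was incremented.
//   auto token = domain.lock_shared();
//   // ... read data protected by this domain ...
//   domain.unlock_shared(std::move(token));
//
//   // Writer side: retire old state; the callback runs only after all
//   // readers that might still see it have finished.
//   domain.call([old_state]() { delete old_state; });
//
//   // Or block until a full grace period has elapsed.
//   domain.synchronize();
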
template <typename Tag>
bool rcu_domain<Tag>::singleton_ = false;

template <typename Tag>
rcu_domain<Tag>::rcu_domain(Executor* executor) noexcept
    : executor_(executor ? executor : &QueuedImmediateExecutor::instance()) {
  // Please use a unique tag for each domain.
  CHECK(!singleton_);
  singleton_ = true;

  // Register fork handlers. Holding read locks across fork is not
  // supported. Using read locks in other atfork handlers is not
  // supported. Other atfork handlers launching new child threads
  // that use read locks *is* supported.
  detail::AtFork::registerHandler(
      this,
      /* prepare */ [this]() { return syncMutex_.try_lock(); },
      /* parent */ [this]() { syncMutex_.unlock(); },
      /* child */
      [this]() {
        counters_.resetAfterFork();
        syncMutex_.unlock();
      });
}

template <typename Tag>
rcu_domain<Tag>::~rcu_domain() {
  detail::AtFork::unregisterHandler(this);
}

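// Enter a read-side critical section: read the low bit of version_ as the
// current epoch and bump the reader count for that epoch. The returned token
// records which epoch's count must be decremented on release.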
template <typename Tag>
rcu_token<Tag> rcu_domain<Tag>::lock_shared() {
  auto idx = version_.load(std::memory_order_acquire);
  idx &= 1;
  counters_.increment(idx);

  return rcu_token<Tag>(idx);
}

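// Leave a read-side critical section by decrementing the reader count for the
// epoch recorded in the token.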
template <typename Tag>
void rcu_domain<Tag>::unlock_shared(rcu_token<Tag>&& token) {
  DCHECK(0 == token.epoch_ || 1 == token.epoch_);
  counters_.decrement(token.epoch_);
}

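// Queue a callback to run once a grace period has elapsed. The callback is
// wrapped in a heap-allocated list_node that deletes itself after running.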
template <typename Tag>
template <typename T>
void rcu_domain<Tag>::call(T&& cbin) {
  auto node = new list_node;
  node->cb_ = [node, cb = std::forward<T>(cbin)]() {
    cb();
    delete node;
  };
  retire(node);
}

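// Queue a retired node. At most once per syncTimePeriod_, also attempt a
// non-blocking half_sync() to advance the epoch and dispatch callbacks that
// have already aged through two epochs.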
template <typename Tag>
void rcu_domain<Tag>::retire(list_node* node) noexcept {
  q_.push(node);

  // Note that it's likely we hold a read lock here,
  // so we can only half_sync(false). half_sync(true)
  // or a synchronize() call might block forever.
  uint64_t time = std::chrono::duration_cast<std::chrono::milliseconds>(
                      std::chrono::steady_clock::now().time_since_epoch())
                      .count();
  auto syncTime = syncTime_.load(std::memory_order_relaxed);
  if (time > syncTime + syncTimePeriod_ &&
      syncTime_.compare_exchange_strong(
          syncTime, time, std::memory_order_relaxed)) {
    list_head finished;
    {
      std::lock_guard<std::mutex> g(syncMutex_);
      half_sync(false, finished);
    }
    // callbacks are called outside of syncMutex_
    finished.forEach(
        [&](list_node* item) { executor_->add(std::move(item->cb_)); });
  }
}

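// Block until a full grace period has elapsed: the version counter must
// advance two epochs past the value observed on entry, so every reader that
// could still see the old state has left its critical section.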
template <typename Tag>
void rcu_domain<Tag>::synchronize() noexcept {
  auto curr = version_.load(std::memory_order_acquire);
  // Target is two epochs away.
  auto target = curr + 2;
  while (true) {
    // Try to assign ourselves to do the sync work.
    // If someone else is already assigned, we can wait for
    // the work to be finished by waiting on turn_.
    auto work = work_.load(std::memory_order_acquire);
    auto tmp = work;
    if (work < target && work_.compare_exchange_strong(tmp, target)) {
      list_head finished;
      {
        std::lock_guard<std::mutex> g(syncMutex_);
        while (version_.load(std::memory_order_acquire) < target) {
          half_sync(true, finished);
        }
      }
      // callbacks are called outside of syncMutex_
      finished.forEach(
          [&](list_node* node) { executor_->add(std::move(node->cb_)); });
      return;
    } else {
      if (version_.load(std::memory_order_acquire) >= target) {
        return;
      }
      std::atomic<uint32_t> cutoff{100};
      // Wait for someone to finish the work.
      turn_.tryWaitForTurn(work, cutoff, false);
    }
  }
}

/*
 * Not thread safe on its own (callers must hold syncMutex_), but it could be
 * made so with proper version checking and a stronger increment of version_.
 * See
 * https://github.com/pramalhe/ConcurrencyFreaks/blob/master/papers/gracesharingurcu-2016.pdf
 *
 * Unlike that approach, this version can go to sleep while there are
 * outstanding readers rather than spinning or rescheduling; with
 * blocking = false it returns early instead of sleeping.
 */
template <typename Tag>
void rcu_domain<Tag>::half_sync(bool blocking, list_head& finished) {
  uint64_t curr = version_.load(std::memory_order_acquire);
  auto next = curr + 1;

  // Push all work to a queue for moving through two epochs. One
  // version is not enough because of late readers of the version_
  // counter in lock_shared.
  //
  // Note that for a similar reason we can't swap out the queue here;
  // we drain it instead, so concurrent calls to call() remain safe
  // and simply wait for the next epoch.
  q_.collect(queues_[0]);

  if (blocking) {
    counters_.waitForZero(next & 1);
  } else {
    if (counters_.readFull(next & 1) != 0) {
      return;
    }
  }

  // Run callbacks that have been through two epochs, and swap queues
  // for those only through a single epoch.
  finished.splice(queues_[1]);
  queues_[1].splice(queues_[0]);

  version_.store(next, std::memory_order_release);
  // Notify synchronous waiters in synchronize().
  turn_.completeTurn(curr);
}

} // namespace folly