1/*
2 * Copyright 2017-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include <folly/Function.h>
18#include <folly/detail/AtFork.h>
19#include <folly/detail/TurnSequencer.h>
20
21namespace folly {
22
23template <typename Tag>
24bool rcu_domain<Tag>::singleton_ = false;
25
26template <typename Tag>
27rcu_domain<Tag>::rcu_domain(Executor* executor) noexcept
28 : executor_(executor ? executor : &QueuedImmediateExecutor::instance()) {
29 // Please use a unique tag for each domain.
30 CHECK(!singleton_);
31 singleton_ = true;
32
33 // Register fork handlers. Holding read locks across fork is not
34 // supported. Using read locks in other atfork handlers is not
35 // supported. Other atfork handlers launching new child threads
36 // that use read locks *is* supported.
37 detail::AtFork::registerHandler(
38 this,
39 [this]() { return syncMutex_.try_lock(); },
40 [this]() { syncMutex_.unlock(); },
41 [this]() {
42 counters_.resetAfterFork();
43 syncMutex_.unlock();
44 });
45}
46
47template <typename Tag>
48rcu_domain<Tag>::~rcu_domain() {
49 detail::AtFork::unregisterHandler(this);
50}
51
52template <typename Tag>
53rcu_token<Tag> rcu_domain<Tag>::lock_shared() {
54 auto idx = version_.load(std::memory_order_acquire);
55 idx &= 1;
56 counters_.increment(idx);
57
58 return rcu_token<Tag>(idx);
59}
60
61template <typename Tag>
62void rcu_domain<Tag>::unlock_shared(rcu_token<Tag>&& token) {
63 DCHECK(0 == token.epoch_ || 1 == token.epoch_);
64 counters_.decrement(token.epoch_);
65}
66
67template <typename Tag>
68template <typename T>
69void rcu_domain<Tag>::call(T&& cbin) {
70 auto node = new list_node;
71 node->cb_ = [node, cb = std::forward<T>(cbin)]() {
72 cb();
73 delete node;
74 };
75 retire(node);
76}
77
78template <typename Tag>
79void rcu_domain<Tag>::retire(list_node* node) noexcept {
80 q_.push(node);
81
82 // Note that it's likely we hold a read lock here,
83 // so we can only half_sync(false). half_sync(true)
84 // or a synchronize() call might block forever.
85 uint64_t time = std::chrono::duration_cast<std::chrono::milliseconds>(
86 std::chrono::steady_clock::now().time_since_epoch())
87 .count();
88 auto syncTime = syncTime_.load(std::memory_order_relaxed);
89 if (time > syncTime + syncTimePeriod_ &&
90 syncTime_.compare_exchange_strong(
91 syncTime, time, std::memory_order_relaxed)) {
92 list_head finished;
93 {
94 std::lock_guard<std::mutex> g(syncMutex_);
95 half_sync(false, finished);
96 }
97 // callbacks are called outside of syncMutex_
98 finished.forEach(
99 [&](list_node* item) { executor_->add(std::move(item->cb_)); });
100 }
101}
102
103template <typename Tag>
104void rcu_domain<Tag>::synchronize() noexcept {
105 auto curr = version_.load(std::memory_order_acquire);
106 // Target is two epochs away.
107 auto target = curr + 2;
108 while (true) {
109 // Try to assign ourselves to do the sync work.
110 // If someone else is already assigned, we can wait for
111 // the work to be finished by waiting on turn_.
112 auto work = work_.load(std::memory_order_acquire);
113 auto tmp = work;
114 if (work < target && work_.compare_exchange_strong(tmp, target)) {
115 list_head finished;
116 {
117 std::lock_guard<std::mutex> g(syncMutex_);
118 while (version_.load(std::memory_order_acquire) < target) {
119 half_sync(true, finished);
120 }
121 }
122 // callbacks are called outside of syncMutex_
123 finished.forEach(
124 [&](list_node* node) { executor_->add(std::move(node->cb_)); });
125 return;
126 } else {
127 if (version_.load(std::memory_order_acquire) >= target) {
128 return;
129 }
130 std::atomic<uint32_t> cutoff{100};
131 // Wait for someone to finish the work.
132 turn_.tryWaitForTurn(work, cutoff, false);
133 }
134 }
135}
136
137/*
138 * Not multithread safe, but it could be with proper version
139 * checking and stronger increment of version. See
140 * https://github.com/pramalhe/ConcurrencyFreaks/blob/master/papers/gracesharingurcu-2016.pdf
141 *
142 * This version, however, can go to sleep if there are outstanding
143 * readers, and does not spin or need rescheduling, unless blocking = false.
144 */
145template <typename Tag>
146void rcu_domain<Tag>::half_sync(bool blocking, list_head& finished) {
147 uint64_t curr = version_.load(std::memory_order_acquire);
148 auto next = curr + 1;
149
150 // Push all work to a queue for moving through two epochs. One
151 // version is not enough because of late readers of the version_
152 // counter in lock_shared.
153 //
154 // Note that for a similar reason we can't swap out the q here,
155 // and instead drain it, so concurrent calls to call() are safe,
156 // and will wait for the next epoch.
157 q_.collect(queues_[0]);
158
159 if (blocking) {
160 counters_.waitForZero(next & 1);
161 } else {
162 if (counters_.readFull(next & 1) != 0) {
163 return;
164 }
165 }
166
167 // Run callbacks that have been through two epochs, and swap queues
168 // for those only through a single epoch.
169 finished.splice(queues_[1]);
170 queues_[1].splice(queues_[0]);
171
172 version_.store(next, std::memory_order_release);
173 // Notify synchronous waiters in synchronize().
174 turn_.completeTurn(curr);
175}
176
177} // namespace folly
178