/*
 * Copyright 2017-present Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <atomic>
#include <cstdint>

#include <folly/Function.h>
#include <folly/Portability.h>
#include <folly/ThreadLocal.h>
#include <folly/detail/Futex.h>
#include <folly/synchronization/AsymmetricMemoryBarrier.h>

// This is unlike folly::ThreadCachedInt in that the full value is never
// rolled up into a cached global total; only readFull() is supported.
//
// folly/experimental/TLRefCount is similar, but it does not support
// waitForZero and is not resettable.
//
// Note that the RCU implementation is completely abstracted from the
// counter implementation, so an rseq-based implementation can be dropped
// in if the kernel supports it.
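//
// A minimal usage sketch (illustrative only; the tag type, the epoch
// variables, and the surrounding reader/writer code are hypothetical,
// not part of this header):
//
//   struct MyTag {};
//   folly::detail::ThreadCachedInts<MyTag> counts;
//
//   // Reader, for the epoch it observed:
//   counts.increment(epoch);
//   /* ... access shared data ... */
//   counts.decrement(epoch);
//
//   // Writer, after switching readers to the other epoch:
//   counts.waitForZero(old_epoch); // block until old-epoch readers drain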

namespace folly {

namespace detail {

template <typename Tag>
class ThreadCachedInts {
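  // Counts accumulated from threads that have exited; the Integer
  // destructor flushes a dying thread's per-thread counts into these.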
  std::atomic<int64_t> orphan_inc_[2] = {};
  std::atomic<int64_t> orphan_dec_[2] = {};
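  // Futex that waitForZero() sleeps on; decrement() and the Integer
  // destructor wake it when counts may have reached zero.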
  folly::detail::Futex<> waiting_{0};

  class Integer {
   public:
    ThreadCachedInts* ints_;
    constexpr Integer(ThreadCachedInts* ints) noexcept
        : ints_(ints), inc_{}, dec_{}, cache_(ints->int_cache_) {}
    std::atomic<int64_t> inc_[2];
    std::atomic<int64_t> dec_[2];
    Integer*& cache_; // reference to the cached ptr
    ~Integer() noexcept {
      // Increment counts must be flushed before decrement counts, so that
      // a concurrent readFull() that sees these decrements also sees the
      // matching increments.
      ints_->orphan_inc_[0].fetch_add(
          inc_[0].load(std::memory_order_relaxed), std::memory_order_relaxed);
      ints_->orphan_inc_[1].fetch_add(
          inc_[1].load(std::memory_order_relaxed), std::memory_order_relaxed);
      folly::asymmetricLightBarrier(); // B
      ints_->orphan_dec_[0].fetch_add(
          dec_[0].load(std::memory_order_relaxed), std::memory_order_relaxed);
      ints_->orphan_dec_[1].fetch_add(
          dec_[1].load(std::memory_order_relaxed), std::memory_order_relaxed);
      ints_->waiting_.store(0, std::memory_order_release);
      detail::futexWake(&ints_->waiting_);
      // Reset cache_ in the destructor so a later delete/recreate of the
      // thread-local is handled correctly.
      cache_ = nullptr;
    }
  };
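  // Per-thread counters; readFull() walks all threads' Integers via
  // accessAllThreads().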
  folly::ThreadLocalPtr<Integer, Tag> cs_;

  // Cache the Integer pointer in a thread_local so the fast paths avoid
  // the ThreadLocalPtr lookup.
  static thread_local Integer* int_cache_;

  void init() {
    auto ret = new Integer(this);
    cs_.reset(ret);
    int_cache_ = ret;
  }

 public:
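  // increment()/decrement() record a reader entering/leaving the given
  // epoch (0 or 1). Only the owning thread writes its counters, so relaxed
  // operations suffice; the asymmetric light barriers here pair with the
  // heavy barriers in readFull() and waitForZero().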
  FOLLY_ALWAYS_INLINE void increment(uint8_t epoch) {
    if (!int_cache_) {
      init();
    }

    auto& c = int_cache_->inc_[epoch];
    auto val = c.load(std::memory_order_relaxed);
    c.store(val + 1, std::memory_order_relaxed);

    folly::asymmetricLightBarrier(); // A
  }

  FOLLY_ALWAYS_INLINE void decrement(uint8_t epoch) {
    folly::asymmetricLightBarrier(); // B
    if (!int_cache_) {
      init();
    }

    auto& c = int_cache_->dec_[epoch];
    auto val = c.load(std::memory_order_relaxed);
    c.store(val + 1, std::memory_order_relaxed);

    folly::asymmetricLightBarrier(); // C
    if (waiting_.load(std::memory_order_acquire)) {
      waiting_.store(0, std::memory_order_release);
      detail::futexWake(&waiting_);
    }
  }

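  // Returns the number of outstanding counts for the given epoch: all
  // increments minus all decrements, summed across live threads plus the
  // counts orphaned by threads that have exited.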
  int64_t readFull(uint8_t epoch) {
    int64_t full = -orphan_dec_[epoch].load(std::memory_order_relaxed);

    // Matches A - ensure all threads have seen the new value of the
    // version, *and* that we see the current values of the counters in
    // readFull().
    //
    // Note that in lock_shared, if a reader is currently between the
    // version load and the counter increment, it may update the wrong
    // epoch. However, this is ok - it started concurrently *after*
    // any callbacks that will run, and therefore it is safe to run
    // the callbacks.
    folly::asymmetricHeavyBarrier();
    for (auto& i : cs_.accessAllThreads()) {
      full -= i.dec_[epoch].load(std::memory_order_relaxed);
    }

    // Matches B - ensure that all increments are seen if decrements
    // are seen. This is necessary because increment and decrement
    // are allowed to happen on different threads.
    folly::asymmetricHeavyBarrier();

    auto accessor = cs_.accessAllThreads();
    for (auto& i : accessor) {
      full += i.inc_[epoch].load(std::memory_order_relaxed);
    }

    // orphan_inc_ is read while the accessAllThreads() lock is held.
    return full + orphan_inc_[epoch].load(std::memory_order_relaxed);
  }

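  // Blocks until readFull(phase) reaches zero, sleeping on waiting_
  // between checks; decrement() and the Integer destructor wake it.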
  void waitForZero(uint8_t phase) {
    // Try reading before futex sleeping.
    if (readFull(phase) == 0) {
      return;
    }

    while (true) {
      waiting_.store(1, std::memory_order_release);
      // Matches C. Ensure either decrement sees waiting_,
      // or we see their decrement and can safely sleep.
      folly::asymmetricHeavyBarrier();
      if (readFull(phase) == 0) {
        break;
      }
      detail::futexWait(&waiting_, 1);
    }
    waiting_.store(0, std::memory_order_relaxed);
  }

  // We are guaranteed to be called while the StaticMeta lock is still
  // held because of ordering in AtForkList. We can therefore safely
  // touch the orphan counts and clear out all counts.
  void resetAfterFork() {
    if (int_cache_) {
      int_cache_->dec_[0].store(0, std::memory_order_relaxed);
      int_cache_->dec_[1].store(0, std::memory_order_relaxed);
      int_cache_->inc_[0].store(0, std::memory_order_relaxed);
      int_cache_->inc_[1].store(0, std::memory_order_relaxed);
    }
    orphan_inc_[0].store(0, std::memory_order_relaxed);
    orphan_inc_[1].store(0, std::memory_order_relaxed);
    orphan_dec_[0].store(0, std::memory_order_relaxed);
    orphan_dec_[1].store(0, std::memory_order_relaxed);
  }
};

template <typename Tag>
thread_local typename detail::ThreadCachedInts<Tag>::Integer*
    detail::ThreadCachedInts<Tag>::int_cache_ = nullptr;

} // namespace detail
} // namespace folly