/*
 * Copyright 2013-present Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <folly/test/DeterministicSchedule.h>

#include <assert.h>

#include <algorithm>
#include <list>
#include <mutex>
#include <random>
#include <unordered_map>
#include <utility>

#include <folly/Random.h>

namespace folly {
namespace test {

FOLLY_TLS sem_t* DeterministicSchedule::tls_sem;
FOLLY_TLS DeterministicSchedule* DeterministicSchedule::tls_sched;
FOLLY_TLS bool DeterministicSchedule::tls_exiting;
FOLLY_TLS DSchedThreadId DeterministicSchedule::tls_threadId;
thread_local AuxAct DeterministicSchedule::tls_aux_act;
AuxChk DeterministicSchedule::aux_chk;

// access is protected by futexLock
static std::unordered_map<
    const detail::Futex<DeterministicAtomic>*,
    std::list<std::pair<uint32_t, bool*>>>
    futexQueues;

static std::mutex futexLock;
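
// futexQueues holds, per deterministic futex, the list of waiters currently
// blocked on it. Judging from deterministicFutexWaitImpl/-WakeImpl (declared
// in the header), each entry presumably pairs the waiter's waitMask with a
// bool* that the waker flips to signal that the wait has been satisfied.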
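// ThreadTimestamps acts as a vector clock: one logical timestamp per thread
// id. sync() merges another clock into this one element-wise, and advance()
// bumps the calling thread's own slot.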
void ThreadTimestamps::sync(const ThreadTimestamps& src) {
  if (src.timestamps_.size() > timestamps_.size()) {
    timestamps_.resize(src.timestamps_.size());
  }
  for (size_t i = 0; i < src.timestamps_.size(); i++) {
    timestamps_[i].sync(src.timestamps_[i]);
  }
}

DSchedTimestamp ThreadTimestamps::advance(DSchedThreadId tid) {
  assert(timestamps_.size() > tid.val);
  return timestamps_[tid.val].advance();
}

void ThreadTimestamps::setIfNotPresent(DSchedThreadId tid, DSchedTimestamp ts) {
  assert(ts.initialized());
  if (tid.val >= timestamps_.size()) {
    timestamps_.resize(tid.val + 1);
  }
  if (!timestamps_[tid.val].initialized()) {
    timestamps_[tid.val].sync(ts);
  }
}

void ThreadTimestamps::clear() {
  timestamps_.clear();
}

bool ThreadTimestamps::atLeastAsRecentAs(DSchedThreadId tid, DSchedTimestamp ts)
    const {
  // It is not meaningful to ask whether any instance is at least
  // as recent as timestamp 0.
  assert(ts.initialized());
  if (tid.val >= timestamps_.size()) {
    return false;
  }
  return timestamps_[tid.val].atLeastAsRecentAs(ts);
}

bool ThreadTimestamps::atLeastAsRecentAsAny(const ThreadTimestamps& src) const {
  size_t min = timestamps_.size() < src.timestamps_.size()
      ? timestamps_.size()
      : src.timestamps_.size();
  for (size_t i = 0; i < min; i++) {
    if (src.timestamps_[i].initialized() &&
        timestamps_[i].atLeastAsRecentAs(src.timestamps_[i])) {
      return true;
    }
  }
  return false;
}

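// ThreadSyncVar gives acquire/release semantics to a synchronization object
// by merging vector clocks: acquire() pulls the variable's clock into the
// current thread's clock, release() publishes the thread's clock into the
// variable, and acq_rel() does both.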
void ThreadSyncVar::acquire() {
  ThreadInfo& threadInfo = DeterministicSchedule::getCurrentThreadInfo();
  DSchedThreadId tid = DeterministicSchedule::getThreadId();
  threadInfo.acqRelOrder_.advance(tid);
  threadInfo.acqRelOrder_.sync(order_);
}

void ThreadSyncVar::release() {
  ThreadInfo& threadInfo = DeterministicSchedule::getCurrentThreadInfo();
  DSchedThreadId tid = DeterministicSchedule::getThreadId();
  threadInfo.acqRelOrder_.advance(tid);
  order_.sync(threadInfo.acqRelOrder_);
}

void ThreadSyncVar::acq_rel() {
  ThreadInfo& threadInfo = DeterministicSchedule::getCurrentThreadInfo();
  DSchedThreadId tid = DeterministicSchedule::getThreadId();
  threadInfo.acqRelOrder_.advance(tid);
  threadInfo.acqRelOrder_.sync(order_);
  order_.sync(threadInfo.acqRelOrder_);
}

DeterministicSchedule::DeterministicSchedule(
    const std::function<size_t(size_t)>& scheduler)
    : scheduler_(scheduler), nextThreadId_(0), step_(0) {
  assert(tls_sem == nullptr);
  assert(tls_sched == nullptr);
  assert(tls_aux_act == nullptr);

  tls_exiting = false;
  tls_sem = new sem_t;
  sem_init(tls_sem, 0, 1);
  sems_.push_back(tls_sem);

  tls_threadId = nextThreadId_++;
  threadInfoMap_.emplace_back(tls_threadId);
  tls_sched = this;
}

DeterministicSchedule::~DeterministicSchedule() {
  assert(tls_sched == this);
  assert(sems_.size() == 1);
  assert(sems_[0] == tls_sem);
  beforeThreadExit();
}

std::function<size_t(size_t)> DeterministicSchedule::uniform(uint64_t seed) {
  auto rand = std::make_shared<std::ranlux48>(seed);
  return [rand](size_t numActive) {
    auto dist = std::uniform_int_distribution<size_t>(0, numActive - 1);
    return dist(*rand);
  };
}
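
// uniform(seed) produces the scheduler most tests use: at every scheduling
// point it picks one of the numActive runnable threads uniformly at random,
// so a failing interleaving can be replayed from its seed. A minimal usage
// sketch, assuming the thread()/join() helpers declared in the header:
//
//   folly::test::DeterministicSchedule sched(
//       folly::test::DeterministicSchedule::uniform(12345));
//   std::thread t = folly::test::DeterministicSchedule::thread(
//       [] { /* exercise code built on DeterministicAtomic */ });
//   folly::test::DeterministicSchedule::join(t);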
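// Scheduler functor that, instead of choosing among all runnable threads,
// restricts its choices to a random subset of at most subsetSize_ threads and
// re-randomizes that subset every stepsBetweenSelect_ scheduling decisions.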
struct UniformSubset {
  UniformSubset(uint64_t seed, size_t subsetSize, size_t stepsBetweenSelect)
      : uniform_(DeterministicSchedule::uniform(seed)),
        subsetSize_(subsetSize),
        stepsBetweenSelect_(stepsBetweenSelect),
        stepsLeft_(0) {}

  size_t operator()(size_t numActive) {
    adjustPermSize(numActive);
    if (stepsLeft_-- == 0) {
      stepsLeft_ = stepsBetweenSelect_ - 1;
      shufflePrefix();
    }
    return perm_[uniform_(std::min(numActive, subsetSize_))];
  }

 private:
  std::function<size_t(size_t)> uniform_;
  const size_t subsetSize_;
  const size_t stepsBetweenSelect_;

  size_t stepsLeft_;
  // only the first subsetSize_ entries are properly randomized
  std::vector<size_t> perm_;

  void adjustPermSize(size_t numActive) {
    if (perm_.size() > numActive) {
      perm_.erase(
          std::remove_if(
              perm_.begin(),
              perm_.end(),
              [=](size_t x) { return x >= numActive; }),
          perm_.end());
    } else {
      while (perm_.size() < numActive) {
        perm_.push_back(perm_.size());
      }
    }
    assert(perm_.size() == numActive);
  }

  void shufflePrefix() {
    for (size_t i = 0; i < std::min(perm_.size() - 1, subsetSize_); ++i) {
      size_t j = uniform_(perm_.size() - i) + i;
      std::swap(perm_[i], perm_[j]);
    }
  }
};

std::function<size_t(size_t)>
DeterministicSchedule::uniformSubset(uint64_t seed, size_t n, size_t m) {
  auto gen = std::make_shared<UniformSubset>(seed, n, m);
  return [=](size_t numActive) { return (*gen)(numActive); };
}

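// beforeSharedAccess()/afterSharedAccess() implement the scheduling protocol:
// exactly one semaphore in sems_ is posted at any time, so a thread blocks on
// its own semaphore, performs one instrumented shared access, and then posts
// the semaphore of whichever thread scheduler_ selects to run next.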
void DeterministicSchedule::beforeSharedAccess() {
  if (tls_sem) {
    sem_wait(tls_sem);
  }
}

void DeterministicSchedule::afterSharedAccess() {
  auto sched = tls_sched;
  if (!sched) {
    return;
  }
  sem_post(sched->sems_[sched->scheduler_(sched->sems_.size())]);
}

void DeterministicSchedule::afterSharedAccess(bool success) {
  auto sched = tls_sched;
  if (!sched) {
    return;
  }
  sched->callAux(success);
  sem_post(sched->sems_[sched->scheduler_(sched->sems_.size())]);
}

size_t DeterministicSchedule::getRandNumber(size_t n) {
  if (tls_sched) {
    return tls_sched->scheduler_(n);
  }
  return Random::rand32() % n;
}

int DeterministicSchedule::getcpu(
    unsigned* cpu,
    unsigned* node,
    void* /* unused */) {
  if (cpu) {
    *cpu = tls_threadId.val;
  }
  if (node) {
    *node = tls_threadId.val;
  }
  return 0;
}

void DeterministicSchedule::setAuxAct(AuxAct& aux) {
  tls_aux_act = aux;
}

void DeterministicSchedule::setAuxChk(AuxChk& aux) {
  aux_chk = aux;
}

void DeterministicSchedule::clearAuxChk() {
  aux_chk = nullptr;
}

void DeterministicSchedule::reschedule(sem_t* sem) {
  auto sched = tls_sched;
  if (sched) {
    sched->sems_.push_back(sem);
  }
}

sem_t* DeterministicSchedule::descheduleCurrentThread() {
  auto sched = tls_sched;
  if (sched) {
    sched->sems_.erase(
        std::find(sched->sems_.begin(), sched->sems_.end(), tls_sem));
  }
  return tls_sem;
}

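// Thread lifecycle handshake: the parent calls beforeThreadCreate() to
// allocate and register the child's semaphore before spawning it, and the
// child calls afterThreadCreate() and loops until it appears in active_
// (which the spawning helper in the header is expected to arrange).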
sem_t* DeterministicSchedule::beforeThreadCreate() {
  sem_t* s = new sem_t;
  sem_init(s, 0, 0);
  beforeSharedAccess();
  sems_.push_back(s);
  afterSharedAccess();
  return s;
}

void DeterministicSchedule::afterThreadCreate(sem_t* sem) {
  assert(tls_sem == nullptr);
  assert(tls_sched == nullptr);
  tls_exiting = false;
  tls_sem = sem;
  tls_sched = this;
  bool started = false;
  while (!started) {
    beforeSharedAccess();
    if (active_.count(std::this_thread::get_id()) == 1) {
      started = true;
      tls_threadId = nextThreadId_++;
      assert(tls_threadId.val == threadInfoMap_.size());
      threadInfoMap_.emplace_back(tls_threadId);
    }
    afterSharedAccess();
  }
  atomic_thread_fence(std::memory_order_seq_cst);
}

void DeterministicSchedule::beforeThreadExit() {
  assert(tls_sched == this);

  atomic_thread_fence(std::memory_order_seq_cst);
  beforeSharedAccess();
  auto parent = joins_.find(std::this_thread::get_id());
  if (parent != joins_.end()) {
    reschedule(parent->second);
    joins_.erase(parent);
  }
  sems_.erase(std::find(sems_.begin(), sems_.end(), tls_sem));
  active_.erase(std::this_thread::get_id());
  if (sems_.size() > 0) {
    FOLLY_TEST_DSCHED_VLOG("exiting");
    /* Wait here so that the parent thread can control when this thread
     * enters the thread-local destructors. */
    exitingSems_[std::this_thread::get_id()] = tls_sem;
    afterSharedAccess();
    sem_wait(tls_sem);
  }
  tls_sched = nullptr;
  tls_aux_act = nullptr;
  tls_exiting = true;
  sem_destroy(tls_sem);
  delete tls_sem;
  tls_sem = nullptr;
}

void DeterministicSchedule::waitForBeforeThreadExit(std::thread& child) {
  assert(tls_sched == this);
  beforeSharedAccess();
  assert(tls_sched->joins_.count(child.get_id()) == 0);
  if (tls_sched->active_.count(child.get_id())) {
    sem_t* sem = descheduleCurrentThread();
    tls_sched->joins_.insert({child.get_id(), sem});
    afterSharedAccess();
    // Wait to be scheduled by exiting child thread
    beforeSharedAccess();
    assert(!tls_sched->active_.count(child.get_id()));
  }
  afterSharedAccess();
}

void DeterministicSchedule::joinAll(std::vector<std::thread>& children) {
  auto sched = tls_sched;
  if (sched) {
    // Wait until all children are about to exit
    for (auto& child : children) {
      sched->waitForBeforeThreadExit(child);
    }
  }
  atomic_thread_fence(std::memory_order_seq_cst);
  /* Let each child thread proceed one at a time to protect
   * shared access during thread local destructors. */
  for (auto& child : children) {
    if (sched) {
      sem_post(sched->exitingSems_[child.get_id()]);
    }
    child.join();
  }
}

void DeterministicSchedule::join(std::thread& child) {
  auto sched = tls_sched;
  if (sched) {
    sched->waitForBeforeThreadExit(child);
  }
  atomic_thread_fence(std::memory_order_seq_cst);
  FOLLY_TEST_DSCHED_VLOG("joined " << std::hex << child.get_id());
  if (sched) {
    sem_post(sched->exitingSems_[child.get_id()]);
  }
  child.join();
}

void DeterministicSchedule::callAux(bool success) {
  ++step_;
  if (tls_aux_act) {
    tls_aux_act(success);
    tls_aux_act = nullptr;
  }
  if (aux_chk) {
    aux_chk(step_);
  }
}

static std::unordered_map<sem_t*, std::unique_ptr<ThreadSyncVar>> semSyncVar;

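// The sem_t wrappers below attach a ThreadSyncVar to each semaphore so that
// post/wait pairs establish happens-before edges in the simulated memory
// model: post() performs a release, a successful sem_trywait() an
// acquire-release, and a failed one an acquire.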
void DeterministicSchedule::post(sem_t* sem) {
  beforeSharedAccess();
  if (semSyncVar.count(sem) == 0) {
    semSyncVar[sem] = std::make_unique<ThreadSyncVar>();
  }
  semSyncVar[sem]->release();
  sem_post(sem);
  FOLLY_TEST_DSCHED_VLOG("sem_post(" << sem << ")");
  afterSharedAccess();
}

bool DeterministicSchedule::tryWait(sem_t* sem) {
  beforeSharedAccess();
  if (semSyncVar.count(sem) == 0) {
    semSyncVar[sem] = std::make_unique<ThreadSyncVar>();
  }

  int rv = sem_trywait(sem);
  int e = rv == 0 ? 0 : errno;
  FOLLY_TEST_DSCHED_VLOG(
      "sem_trywait(" << sem << ") = " << rv << " errno=" << e);
  if (rv == 0) {
    semSyncVar[sem]->acq_rel();
  } else {
    semSyncVar[sem]->acquire();
  }

  afterSharedAccess();
  if (rv == 0) {
    return true;
  } else {
    assert(e == EAGAIN);
    return false;
  }
}

void DeterministicSchedule::wait(sem_t* sem) {
  while (!tryWait(sem)) {
    // we're not busy waiting because this is a deterministic schedule
  }
}

ThreadInfo& DeterministicSchedule::getCurrentThreadInfo() {
  auto sched = tls_sched;
  assert(sched);
  assert(tls_threadId.val < sched->threadInfoMap_.size());
  return sched->threadInfoMap_[tls_threadId.val];
}

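// Fences are modeled by merging vector clocks rather than by emitting real
// barriers: acquire-type fences fold acqFenceOrder_ into the thread's
// acqRelOrder_, release-type fences publish acqRelOrder_ into relFenceOrder_,
// and seq_cst fences additionally pass through the schedule-wide
// seqCstFenceOrder_ so that all seq_cst fences are totally ordered.
// Relaxed fences are rejected outright.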
void DeterministicSchedule::atomic_thread_fence(std::memory_order mo) {
  if (!tls_sched) {
    std::atomic_thread_fence(mo);
    return;
  }
  beforeSharedAccess();
  ThreadInfo& threadInfo = getCurrentThreadInfo();
  switch (mo) {
    case std::memory_order_relaxed:
      assert(false);
      break;
    case std::memory_order_consume:
    case std::memory_order_acquire:
      threadInfo.acqRelOrder_.sync(threadInfo.acqFenceOrder_);
      break;
    case std::memory_order_release:
      threadInfo.relFenceOrder_.sync(threadInfo.acqRelOrder_);
      break;
    case std::memory_order_acq_rel:
      threadInfo.acqRelOrder_.sync(threadInfo.acqFenceOrder_);
      threadInfo.relFenceOrder_.sync(threadInfo.acqRelOrder_);
      break;
    case std::memory_order_seq_cst:
      threadInfo.acqRelOrder_.sync(threadInfo.acqFenceOrder_);
      threadInfo.acqRelOrder_.sync(tls_sched->seqCstFenceOrder_);
      tls_sched->seqCstFenceOrder_ = threadInfo.acqRelOrder_;
      threadInfo.relFenceOrder_.sync(threadInfo.acqRelOrder_);
      break;
  }
  FOLLY_TEST_DSCHED_VLOG("fence: " << folly::detail::memory_order_to_str(mo));
  afterSharedAccess();
}

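// These free functions override the Futex backend for
// detail::Futex<DeterministicAtomic>, routing waits and wakes through the
// deterministic implementations declared in the header and the shared
// futexQueues/futexLock state defined near the top of this file.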
detail::FutexResult futexWaitImpl(
    const detail::Futex<DeterministicAtomic>* futex,
    uint32_t expected,
    std::chrono::system_clock::time_point const* absSystemTimeout,
    std::chrono::steady_clock::time_point const* absSteadyTimeout,
    uint32_t waitMask) {
  return deterministicFutexWaitImpl<DeterministicAtomic>(
      futex,
      futexLock,
      futexQueues,
      expected,
      absSystemTimeout,
      absSteadyTimeout,
      waitMask);
}

int futexWakeImpl(
    const detail::Futex<DeterministicAtomic>* futex,
    int count,
    uint32_t wakeMask) {
  return deterministicFutexWakeImpl<DeterministicAtomic>(
      futex, futexLock, futexQueues, count, wakeMask);
}

} // namespace test
} // namespace folly

namespace folly {

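// Test-only specializations that replace the hardware-probing defaults: a
// fake uniform cache topology of 16 cpus, and a getcpu() that simply reports
// the deterministic thread id, so AccessSpreader behaves reproducibly.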
template <>
CacheLocality const& CacheLocality::system<test::DeterministicAtomic>() {
  static CacheLocality cache(CacheLocality::uniform(16));
  return cache;
}

template <>
Getcpu::Func AccessSpreader<test::DeterministicAtomic>::pickGetcpuFunc() {
  return &test::DeterministicSchedule::getcpu;
}
} // namespace folly