1 | /* |
2 | * Copyright 2013-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #pragma once |
18 | |
#include <assert.h>
#include <boost/noncopyable.hpp>
#include <errno.h>
#include <glog/logging.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <functional>
#include <list>
#include <mutex>
#include <queue>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include <folly/ScopeGuard.h>
#include <folly/concurrency/CacheLocality.h>
#include <folly/detail/Futex.h>
#include <folly/portability/Semaphore.h>
#include <folly/synchronization/detail/AtomicUtils.h>
37 | |
38 | namespace folly { |
39 | namespace test { |
40 | |
41 | // This is ugly, but better perf for DeterministicAtomic translates |
42 | // directly to more states explored and tested |
// Compiled out in normal builds: the `if (false)` lets the compiler
// type-check the streaming expression and then discard it, so the logging
// arguments cost nothing at runtime. Flip `false` to `true` to emit VLOG(2)
// traces (current thread id in hex, then the message) when debugging.
#define FOLLY_TEST_DSCHED_VLOG(...) \
  do { \
    if (false) { \
      VLOG(2) << std::hex << std::this_thread::get_id() << ": " \
              << __VA_ARGS__; \
    } \
  } while (false)
50 | |
/* Signatures of user-defined auxiliary functions (installed with
 * DeterministicSchedule::setAuxAct / setAuxChk below). AuxAct receives the
 * success flag of the preceding shared access; AuxChk receives the running
 * count of shared accesses so far. */
using AuxAct = std::function<void(bool)>;
using AuxChk = std::function<void(uint64_t)>;
54 | |
// Strongly-typed identifier of a thread participating in a deterministic
// schedule; effectively an index into per-thread bookkeeping tables.
struct DSchedThreadId {
  unsigned val;

  explicit constexpr DSchedThreadId() : val{0} {}
  explicit constexpr DSchedThreadId(unsigned value) : val{value} {}

  // Assign from a raw unsigned, yielding the stored value like a built-in.
  unsigned operator=(unsigned value) {
    val = value;
    return val;
  }
};
63 | |
// Scalar logical-clock value. Zero means "not yet initialized"; the first
// meaningful value is initial() == 1.
class DSchedTimestamp {
 public:
  constexpr explicit DSchedTimestamp() : val_{0} {}

  // Ticks the clock forward and returns the new value.
  DSchedTimestamp advance() {
    ++val_;
    return DSchedTimestamp{val_};
  }

  bool atLeastAsRecentAs(const DSchedTimestamp& other) const {
    return !(val_ < other.val_);
  }

  // Merge: keep the later of the two clock values.
  void sync(const DSchedTimestamp& other) {
    if (val_ < other.val_) {
      val_ = other.val_;
    }
  }

  bool initialized() const {
    return val_ != 0;
  }

  static constexpr DSchedTimestamp initial() {
    return DSchedTimestamp{1};
  }

 protected:
  constexpr explicit DSchedTimestamp(size_t v) : val_{v} {}

 private:
  size_t val_;
};
89 | |
// A vector clock: one DSchedTimestamp per thread, indexed by DSchedThreadId.
// Used to track happens-before ordering between threads under test.
// NOTE(review): member functions are defined in the .cpp; the summaries
// below are inferred from names and call sites -- confirm against the
// implementation.
class ThreadTimestamps {
 public:
  // Merges src into this clock (presumably a pointwise max).
  void sync(const ThreadTimestamps& src);
  // Advances tid's entry and returns its new value.
  DSchedTimestamp advance(DSchedThreadId tid);

  // Sets tid's entry to ts only if no entry exists yet (see ThreadInfo's
  // constructor, which seeds the initial timestamp this way).
  void setIfNotPresent(DSchedThreadId tid, DSchedTimestamp ts);
  void clear();
  bool atLeastAsRecentAs(DSchedThreadId tid, DSchedTimestamp ts) const;
  bool atLeastAsRecentAsAny(const ThreadTimestamps& src) const;

 private:
  // Entry i is the clock value for the thread with DSchedThreadId i.
  std::vector<DSchedTimestamp> timestamps_;
};
103 | |
// Per-thread synchronization state kept by DeterministicSchedule for
// modeling memory-ordering effects.
struct ThreadInfo {
  ThreadInfo() = delete;
  explicit ThreadInfo(DSchedThreadId tid) {
    // A thread starts with its own clock entry seeded at the initial
    // timestamp; the fence clocks start empty.
    acqRelOrder_.setIfNotPresent(tid, DSchedTimestamp::initial());
  }
  // Happens-before clock for acquire/release operations.
  ThreadTimestamps acqRelOrder_;
  // NOTE(review): the two fence clocks presumably track ordering
  // contributed by acquire and release fences respectively -- semantics
  // inferred from names; confirm in the .cpp.
  ThreadTimestamps acqFenceOrder_;
  ThreadTimestamps relFenceOrder_;
};
113 | |
// Happens-before tracking for a synchronization object (DeterministicMutex
// calls acquire() on lock and release() on unlock). Definitions live in
// the .cpp.
class ThreadSyncVar {
 public:
  ThreadSyncVar() = default;

  // Records that the calling thread acquired this variable.
  void acquire();
  // Records that the calling thread released this variable.
  void release();
  // Combined acquire-and-release update.
  void acq_rel();

 private:
  // Clock state accumulated by the operations above.
  ThreadTimestamps order_;
};
125 | |
126 | /** |
127 | * DeterministicSchedule coordinates the inter-thread communication of a |
128 | * set of threads under test, so that despite concurrency the execution is |
129 | * the same every time. It works by stashing a reference to the schedule |
130 | * in a thread-local variable, then blocking all but one thread at a time. |
131 | * |
132 | * In order for DeterministicSchedule to work, it needs to intercept |
133 | * all inter-thread communication. To do this you should use |
134 | * DeterministicAtomic<T> instead of std::atomic<T>, create threads |
135 | * using DeterministicSchedule::thread() instead of the std::thread |
136 | * constructor, DeterministicSchedule::join(thr) instead of thr.join(), |
137 | * and access semaphores via the helper functions in DeterministicSchedule. |
138 | * Locks are not yet supported, although they would be easy to add with |
139 | * the same strategy as the mapping of sem_wait. |
140 | * |
141 | * The actual schedule is defined by a function from n -> [0,n). At |
142 | * each step, the function will be given the number of active threads |
143 | * (n), and it returns the index of the thread that should be run next. |
144 | * Invocations of the scheduler function will be serialized, but will |
145 | * occur from multiple threads. A good starting schedule is uniform(0). |
146 | */ |
class DeterministicSchedule : boost::noncopyable {
 public:
  /**
   * Arranges for the current thread (and all threads created by
   * DeterministicSchedule::thread on a thread participating in this
   * schedule) to participate in a deterministic schedule.
   */
  explicit DeterministicSchedule(
      const std::function<size_t(size_t)>& scheduler);

  /** Completes the schedule. */
  ~DeterministicSchedule();

  /**
   * Returns a scheduling function that randomly chooses one of the
   * runnable threads at each step, with no history. This implements
   * a schedule that is equivalent to one in which the steps between
   * inter-thread communication are random variables following a Poisson
   * distribution.
   */
  static std::function<size_t(size_t)> uniform(uint64_t seed);

  /**
   * Returns a scheduling function that chooses a subset of the active
   * threads and randomly chooses a member of the subset as the next
   * runnable thread. The subset is chosen with size n, and the choice
   * is made every m steps.
   */
  static std::function<size_t(size_t)>
  uniformSubset(uint64_t seed, size_t n = 2, size_t m = 64);

  /** Obtains permission for the current thread to perform inter-thread
   *  communication. */
  static void beforeSharedAccess();

  /** Releases permission for the current thread to perform inter-thread
   *  communication. */
  static void afterSharedAccess();

  /** Calls a user-defined auxiliary function if any, and releases
   *  permission for the current thread to perform inter-thread
   *  communication. The bool parameter indicates the success of the
   *  shared access (if conditional, true otherwise). */
  static void afterSharedAccess(bool success);

  /** Launches a thread that will participate in the same deterministic
   *  schedule as the current thread. */
  template <typename Func, typename... Args>
  static inline std::thread thread(Func&& func, Args&&... args) {
    // TODO: maybe future versions of gcc will allow forwarding to thread
    atomic_thread_fence(std::memory_order_seq_cst);
    auto sched = tls_sched;
    // Register the child with the scheduler (if any) before it starts, so
    // the parent/child handshake in afterThreadCreate can use this sem.
    auto sem = sched ? sched->beforeThreadCreate() : nullptr;
    auto child = std::thread(
        [=](Args... a) {
          if (sched) {
            // Block until the scheduler admits this thread, then log the
            // first scheduled step.
            sched->afterThreadCreate(sem);
            beforeSharedAccess();
            FOLLY_TEST_DSCHED_VLOG("running" );
            afterSharedAccess();
          }
          // Runs after func returns OR throws, notifying the scheduler
          // that this thread is done with its thread function.
          SCOPE_EXIT {
            if (sched) {
              sched->beforeThreadExit();
            }
          };
          func(a...);
        },
        args...);
    if (sched) {
      // Record the child as active under the scheduler's protection.
      beforeSharedAccess();
      sched->active_.insert(child.get_id());
      FOLLY_TEST_DSCHED_VLOG("forked " << std::hex << child.get_id());
      afterSharedAccess();
    }
    return child;
  }

  /** Calls child.join() as part of a deterministic schedule. */
  static void join(std::thread& child);

  /** Waits for each thread in children to reach the end of their
   * thread function without allowing them to fully terminate. Then,
   * allow one child at a time to fully terminate and join each one.
   * This functionality is important to protect shared access that can
   * take place after beforeThreadExit() has already been invoked,
   * for example when executing thread local destructors.
   */
  static void joinAll(std::vector<std::thread>& children);

  /** Calls sem_post(sem) as part of a deterministic schedule. */
  static void post(sem_t* sem);

  /** Calls sem_trywait(sem) as part of a deterministic schedule, returning
   *  true on success and false on transient failure. */
  static bool tryWait(sem_t* sem);

  /** Calls sem_wait(sem) as part of a deterministic schedule. */
  static void wait(sem_t* sem);

  /** Uses scheduler_ to get a random number b/w [0, n). If tls_sched is
   *  not set-up it falls back to std::rand() */
  static size_t getRandNumber(size_t n);

  /** Deterministic implementation of getcpu */
  static int getcpu(unsigned* cpu, unsigned* node, void* unused);

  /** Sets up a thread-specific function for call immediately after
   *  the next shared access by the thread for managing auxiliary
   *  data. The function takes a bool parameter that indicates the
   *  success of the shared access (if it is conditional, true
   *  otherwise). The function is cleared after one use. */
  static void setAuxAct(AuxAct& aux);

  /** Sets up a function to be called after every subsequent shared
   *  access (until clearAuxChk() is called) for checking global
   *  invariants and logging. The function takes a uint64_t parameter
   *  that indicates the number of shared accesses so far. */
  static void setAuxChk(AuxChk& aux);

  /** Clears the function set by setAuxChk */
  static void clearAuxChk();

  /** Remove the current thread's semaphore from sems_ */
  static sem_t* descheduleCurrentThread();

  /** Returns true if the current thread has already completed
   *  the thread function, for example if the thread is executing
   *  thread local destructors. */
  static bool isCurrentThreadExiting() {
    return tls_exiting;
  }

  /** Add sem back into sems_ */
  static void reschedule(sem_t* sem);

  /** True when the calling thread participates in a schedule. */
  static bool isActive() {
    return tls_sched != nullptr;
  }

  /** Identifier of the calling thread within its schedule; only valid
   *  while a schedule is active. */
  static DSchedThreadId getThreadId() {
    assert(tls_sched != nullptr);
    return tls_threadId;
  }

  static ThreadInfo& getCurrentThreadInfo();

  /** Deterministic replacement for std::atomic_thread_fence. */
  static void atomic_thread_fence(std::memory_order mo);

 private:
  // Semaphore the current thread blocks on while it does not hold the
  // shared-access permission.
  static FOLLY_TLS sem_t* tls_sem;
  // Schedule the current thread participates in (nullptr if none).
  static FOLLY_TLS DeterministicSchedule* tls_sched;
  // True once the current thread has completed its thread function
  // (see isCurrentThreadExiting()).
  static FOLLY_TLS bool tls_exiting;
  static FOLLY_TLS DSchedThreadId tls_threadId;
  // One-shot post-access callback (see setAuxAct).
  static thread_local AuxAct tls_aux_act;
  // Global invariant-check callback (see setAuxChk), shared by all threads.
  static AuxChk aux_chk;

  // User-supplied policy: maps the number of runnable threads to the index
  // of the thread that runs next.
  std::function<size_t(size_t)> scheduler_;
  // Semaphores of the threads currently runnable in this schedule (see
  // descheduleCurrentThread()/reschedule()).
  std::vector<sem_t*> sems_;
  std::unordered_set<std::thread::id> active_;
  // NOTE(review): populated in the .cpp -- presumably semaphores used to
  // coordinate join()/joinAll() and delayed thread termination; confirm
  // against the implementation.
  std::unordered_map<std::thread::id, sem_t*> joins_;
  std::unordered_map<std::thread::id, sem_t*> exitingSems_;

  // Per-thread happens-before bookkeeping, indexed by DSchedThreadId, plus
  // the ordering contributed by seq_cst fences.
  std::vector<ThreadInfo> threadInfoMap_;
  ThreadTimestamps seqCstFenceOrder_;

  // Next DSchedThreadId value to hand out.
  unsigned nextThreadId_;
  /* step_ keeps count of shared accesses that correspond to user
   * synchronization steps (atomic accesses for now).
   * The reason for keeping track of this here and not just with
   * auxiliary data is to provide users with warning signs (e.g.,
   * skipped steps) if they inadvertently forget to set up aux
   * functions for some shared accesses. */
  uint64_t step_;

  sem_t* beforeThreadCreate();
  void afterThreadCreate(sem_t*);
  void beforeThreadExit();
  // Blocks until child has reached beforeThreadExit() (used by joinAll).
  void waitForBeforeThreadExit(std::thread& child);
  /** Calls user-defined auxiliary function (if any) */
  void callAux(bool);
};
329 | |
330 | /** |
331 | * DeterministicAtomic<T> is a drop-in replacement std::atomic<T> that |
332 | * cooperates with DeterministicSchedule. |
333 | */ |
334 | template < |
335 | typename T, |
336 | typename Schedule = DeterministicSchedule, |
337 | template <typename> class Atom = std::atomic> |
338 | struct DeterministicAtomicImpl { |
339 | DeterministicAtomicImpl() = default; |
340 | ~DeterministicAtomicImpl() = default; |
341 | DeterministicAtomicImpl(DeterministicAtomicImpl<T> const&) = delete; |
342 | DeterministicAtomicImpl<T>& operator=(DeterministicAtomicImpl<T> const&) = |
343 | delete; |
344 | |
345 | constexpr /* implicit */ DeterministicAtomicImpl(T v) noexcept : data_(v) {} |
346 | |
347 | bool is_lock_free() const noexcept { |
348 | return data_.is_lock_free(); |
349 | } |
350 | |
351 | bool compare_exchange_strong( |
352 | T& v0, |
353 | T v1, |
354 | std::memory_order mo = std::memory_order_seq_cst) noexcept { |
355 | return compare_exchange_strong( |
356 | v0, v1, mo, ::folly::detail::default_failure_memory_order(mo)); |
357 | } |
358 | bool compare_exchange_strong( |
359 | T& v0, |
360 | T v1, |
361 | std::memory_order success, |
362 | std::memory_order failure) noexcept { |
363 | Schedule::beforeSharedAccess(); |
364 | auto orig = v0; |
365 | bool rv = data_.compare_exchange_strong(v0, v1, success, failure); |
366 | FOLLY_TEST_DSCHED_VLOG( |
367 | this << ".compare_exchange_strong(" << std::hex << orig << ", " |
368 | << std::hex << v1 << ") -> " << rv << "," << std::hex << v0); |
369 | Schedule::afterSharedAccess(rv); |
370 | return rv; |
371 | } |
372 | |
373 | bool compare_exchange_weak( |
374 | T& v0, |
375 | T v1, |
376 | std::memory_order mo = std::memory_order_seq_cst) noexcept { |
377 | return compare_exchange_weak( |
378 | v0, v1, mo, ::folly::detail::default_failure_memory_order(mo)); |
379 | } |
380 | bool compare_exchange_weak( |
381 | T& v0, |
382 | T v1, |
383 | std::memory_order success, |
384 | std::memory_order failure) noexcept { |
385 | Schedule::beforeSharedAccess(); |
386 | auto orig = v0; |
387 | bool rv = data_.compare_exchange_weak(v0, v1, success, failure); |
388 | FOLLY_TEST_DSCHED_VLOG( |
389 | this << ".compare_exchange_weak(" << std::hex << orig << ", " |
390 | << std::hex << v1 << ") -> " << rv << "," << std::hex << v0); |
391 | Schedule::afterSharedAccess(rv); |
392 | return rv; |
393 | } |
394 | |
395 | T exchange(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept { |
396 | Schedule::beforeSharedAccess(); |
397 | T rv = data_.exchange(v, mo); |
398 | FOLLY_TEST_DSCHED_VLOG( |
399 | this << ".exchange(" << std::hex << v << ") -> " << std::hex << rv); |
400 | Schedule::afterSharedAccess(true); |
401 | return rv; |
402 | } |
403 | |
404 | /* implicit */ operator T() const noexcept { |
405 | Schedule::beforeSharedAccess(); |
406 | T rv = data_.operator T(); |
407 | FOLLY_TEST_DSCHED_VLOG(this << "() -> " << std::hex << rv); |
408 | Schedule::afterSharedAccess(true); |
409 | return rv; |
410 | } |
411 | |
412 | T load(std::memory_order mo = std::memory_order_seq_cst) const noexcept { |
413 | Schedule::beforeSharedAccess(); |
414 | T rv = data_.load(mo); |
415 | FOLLY_TEST_DSCHED_VLOG(this << ".load() -> " << std::hex << rv); |
416 | Schedule::afterSharedAccess(true); |
417 | return rv; |
418 | } |
419 | |
420 | T operator=(T v) noexcept { |
421 | Schedule::beforeSharedAccess(); |
422 | T rv = (data_ = v); |
423 | FOLLY_TEST_DSCHED_VLOG(this << " = " << std::hex << v); |
424 | Schedule::afterSharedAccess(true); |
425 | return rv; |
426 | } |
427 | |
428 | void store(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept { |
429 | Schedule::beforeSharedAccess(); |
430 | data_.store(v, mo); |
431 | FOLLY_TEST_DSCHED_VLOG(this << ".store(" << std::hex << v << ")" ); |
432 | Schedule::afterSharedAccess(true); |
433 | } |
434 | |
435 | T operator++() noexcept { |
436 | Schedule::beforeSharedAccess(); |
437 | T rv = ++data_; |
438 | FOLLY_TEST_DSCHED_VLOG(this << " pre++ -> " << std::hex << rv); |
439 | Schedule::afterSharedAccess(true); |
440 | return rv; |
441 | } |
442 | |
443 | T operator++(int /* postDummy */) noexcept { |
444 | Schedule::beforeSharedAccess(); |
445 | T rv = data_++; |
446 | FOLLY_TEST_DSCHED_VLOG(this << " post++ -> " << std::hex << rv); |
447 | Schedule::afterSharedAccess(true); |
448 | return rv; |
449 | } |
450 | |
451 | T operator--() noexcept { |
452 | Schedule::beforeSharedAccess(); |
453 | T rv = --data_; |
454 | FOLLY_TEST_DSCHED_VLOG(this << " pre-- -> " << std::hex << rv); |
455 | Schedule::afterSharedAccess(true); |
456 | return rv; |
457 | } |
458 | |
459 | T operator--(int /* postDummy */) noexcept { |
460 | Schedule::beforeSharedAccess(); |
461 | T rv = data_--; |
462 | FOLLY_TEST_DSCHED_VLOG(this << " post-- -> " << std::hex << rv); |
463 | Schedule::afterSharedAccess(true); |
464 | return rv; |
465 | } |
466 | |
467 | T operator+=(T v) noexcept { |
468 | Schedule::beforeSharedAccess(); |
469 | T rv = (data_ += v); |
470 | FOLLY_TEST_DSCHED_VLOG( |
471 | this << " += " << std::hex << v << " -> " << std::hex << rv); |
472 | Schedule::afterSharedAccess(true); |
473 | return rv; |
474 | } |
475 | |
476 | T fetch_add(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept { |
477 | Schedule::beforeSharedAccess(); |
478 | T rv = data_.fetch_add(v, mo); |
479 | FOLLY_TEST_DSCHED_VLOG( |
480 | this << ".fetch_add(" << std::hex << v << ") -> " << std::hex << rv); |
481 | Schedule::afterSharedAccess(true); |
482 | return rv; |
483 | } |
484 | |
485 | T operator-=(T v) noexcept { |
486 | Schedule::beforeSharedAccess(); |
487 | T rv = (data_ -= v); |
488 | FOLLY_TEST_DSCHED_VLOG( |
489 | this << " -= " << std::hex << v << " -> " << std::hex << rv); |
490 | Schedule::afterSharedAccess(true); |
491 | return rv; |
492 | } |
493 | |
494 | T fetch_sub(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept { |
495 | Schedule::beforeSharedAccess(); |
496 | T rv = data_.fetch_sub(v, mo); |
497 | FOLLY_TEST_DSCHED_VLOG( |
498 | this << ".fetch_sub(" << std::hex << v << ") -> " << std::hex << rv); |
499 | Schedule::afterSharedAccess(true); |
500 | return rv; |
501 | } |
502 | |
503 | T operator&=(T v) noexcept { |
504 | Schedule::beforeSharedAccess(); |
505 | T rv = (data_ &= v); |
506 | FOLLY_TEST_DSCHED_VLOG( |
507 | this << " &= " << std::hex << v << " -> " << std::hex << rv); |
508 | Schedule::afterSharedAccess(true); |
509 | return rv; |
510 | } |
511 | |
512 | T fetch_and(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept { |
513 | Schedule::beforeSharedAccess(); |
514 | T rv = data_.fetch_and(v, mo); |
515 | FOLLY_TEST_DSCHED_VLOG( |
516 | this << ".fetch_and(" << std::hex << v << ") -> " << std::hex << rv); |
517 | Schedule::afterSharedAccess(true); |
518 | return rv; |
519 | } |
520 | |
521 | T operator|=(T v) noexcept { |
522 | Schedule::beforeSharedAccess(); |
523 | T rv = (data_ |= v); |
524 | FOLLY_TEST_DSCHED_VLOG( |
525 | this << " |= " << std::hex << v << " -> " << std::hex << rv); |
526 | Schedule::afterSharedAccess(true); |
527 | return rv; |
528 | } |
529 | |
530 | T fetch_or(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept { |
531 | Schedule::beforeSharedAccess(); |
532 | T rv = data_.fetch_or(v, mo); |
533 | FOLLY_TEST_DSCHED_VLOG( |
534 | this << ".fetch_or(" << std::hex << v << ") -> " << std::hex << rv); |
535 | Schedule::afterSharedAccess(true); |
536 | return rv; |
537 | } |
538 | |
539 | T operator^=(T v) noexcept { |
540 | Schedule::beforeSharedAccess(); |
541 | T rv = (data_ ^= v); |
542 | FOLLY_TEST_DSCHED_VLOG( |
543 | this << " ^= " << std::hex << v << " -> " << std::hex << rv); |
544 | Schedule::afterSharedAccess(true); |
545 | return rv; |
546 | } |
547 | |
548 | T fetch_xor(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept { |
549 | Schedule::beforeSharedAccess(); |
550 | T rv = data_.fetch_xor(v, mo); |
551 | FOLLY_TEST_DSCHED_VLOG( |
552 | this << ".fetch_xor(" << std::hex << v << ") -> " << std::hex << rv); |
553 | Schedule::afterSharedAccess(true); |
554 | return rv; |
555 | } |
556 | |
557 | /** Read the value of the atomic variable without context switching */ |
558 | T load_direct() const noexcept { |
559 | return data_.load(std::memory_order_relaxed); |
560 | } |
561 | |
562 | private: |
563 | Atom<T> data_; |
564 | }; |
565 | |
// Convenience alias: DeterministicAtomicImpl specialized for the real
// DeterministicSchedule (and the default std::atomic storage).
template <typename T>
using DeterministicAtomic = DeterministicAtomicImpl<T, DeterministicSchedule>;

/* Futex extensions for DeterministicSchedule based Futexes */
// Deterministic analogues of folly's futexWake/futexWait entry points;
// defined in the .cpp (presumably in terms of the generic
// deterministicFutexWakeImpl/deterministicFutexWaitImpl below -- confirm).
int futexWakeImpl(
    const detail::Futex<test::DeterministicAtomic>* futex,
    int count,
    uint32_t wakeMask);
detail::FutexResult futexWaitImpl(
    const detail::Futex<test::DeterministicAtomic>* futex,
    uint32_t expected,
    std::chrono::system_clock::time_point const* absSystemTime,
    std::chrono::steady_clock::time_point const* absSteadyTime,
    uint32_t waitMask);
580 | |
581 | /* Generic futex extensions to allow sharing between DeterministicAtomic and |
582 | * BufferedDeterministicAtomic.*/ |
583 | template <template <typename> class Atom> |
584 | int deterministicFutexWakeImpl( |
585 | const detail::Futex<Atom>* futex, |
586 | std::mutex& futexLock, |
587 | std::unordered_map< |
588 | const detail::Futex<Atom>*, |
589 | std::list<std::pair<uint32_t, bool*>>>& futexQueues, |
590 | int count, |
591 | uint32_t wakeMask) { |
592 | using namespace test; |
593 | using namespace std::chrono; |
594 | |
595 | int rv = 0; |
596 | DeterministicSchedule::beforeSharedAccess(); |
597 | futexLock.lock(); |
598 | if (futexQueues.count(futex) > 0) { |
599 | auto& queue = futexQueues[futex]; |
600 | auto iter = queue.begin(); |
601 | while (iter != queue.end() && rv < count) { |
602 | auto cur = iter++; |
603 | if ((cur->first & wakeMask) != 0) { |
604 | *(cur->second) = true; |
605 | rv++; |
606 | queue.erase(cur); |
607 | } |
608 | } |
609 | if (queue.empty()) { |
610 | futexQueues.erase(futex); |
611 | } |
612 | } |
613 | futexLock.unlock(); |
614 | FOLLY_TEST_DSCHED_VLOG( |
615 | "futexWake(" << futex << ", " << count << ", " << std::hex << wakeMask |
616 | << ") -> " << rv); |
617 | DeterministicSchedule::afterSharedAccess(); |
618 | return rv; |
619 | } |
620 | |
// Deterministic counterpart of futexWait: parks the calling thread on
// futexQueues[futex] until either a wake whose wakeMask overlaps waitMask,
// a simulated timeout/interruption (only when a timeout was requested), or
// an initial value mismatch (VALUE_CHANGED).
template <template <typename> class Atom>
detail::FutexResult deterministicFutexWaitImpl(
    const detail::Futex<Atom>* futex,
    std::mutex& futexLock,
    std::unordered_map<
        const detail::Futex<Atom>*,
        std::list<std::pair<uint32_t, bool*>>>& futexQueues,
    uint32_t expected,
    std::chrono::system_clock::time_point const* absSystemTimeout,
    std::chrono::steady_clock::time_point const* absSteadyTimeout,
    uint32_t waitMask) {
  using namespace test;
  using namespace std::chrono;
  using namespace folly::detail;

  bool hasTimeout = absSystemTimeout != nullptr || absSteadyTimeout != nullptr;
  // Set to true by deterministicFutexWakeImpl through the pointer queued
  // below while we are parked.
  bool awoken = false;
  FutexResult result = FutexResult::AWOKEN;

  DeterministicSchedule::beforeSharedAccess();
  FOLLY_TEST_DSCHED_VLOG(
      "futexWait(" << futex << ", " << std::hex << expected << ", .., "
                   << std::hex << waitMask << ") beginning.." );
  futexLock.lock();
  // load_direct avoids deadlock on inner call to beforeSharedAccess
  if (futex->load_direct() == expected) {
    auto& queue = futexQueues[futex];
    queue.emplace_back(waitMask, &awoken);
    // Remember our own entry so the timeout path can remove exactly it.
    auto ours = queue.end();
    ours--;
    while (!awoken) {
      // Yield while parked: release the futex lock and the shared-access
      // permission so other threads can run (and possibly wake us), then
      // re-acquire both before re-checking.
      futexLock.unlock();
      DeterministicSchedule::afterSharedAccess();
      DeterministicSchedule::beforeSharedAccess();
      futexLock.lock();

      // Simulate spurious wake-ups, timeouts each time with
      // a 10% probability if we haven't been woken up already
      if (!awoken && hasTimeout &&
          DeterministicSchedule::getRandNumber(100) < 10) {
        assert(futexQueues.count(futex) != 0 && &futexQueues[futex] == &queue);
        queue.erase(ours);
        if (queue.empty()) {
          futexQueues.erase(futex);
        }
        // Simulate ETIMEDOUT 90% of the time and other failures
        // remaining time
        result = DeterministicSchedule::getRandNumber(100) >= 10
            ? FutexResult::TIMEDOUT
            : FutexResult::INTERRUPTED;
        break;
      }
    }
  } else {
    result = FutexResult::VALUE_CHANGED;
  }
  futexLock.unlock();

  // Render the result for the (compiled-out) trace log.
  char const* resultStr = "?" ;
  switch (result) {
    case FutexResult::AWOKEN:
      resultStr = "AWOKEN" ;
      break;
    case FutexResult::TIMEDOUT:
      resultStr = "TIMEDOUT" ;
      break;
    case FutexResult::INTERRUPTED:
      resultStr = "INTERRUPTED" ;
      break;
    case FutexResult::VALUE_CHANGED:
      resultStr = "VALUE_CHANGED" ;
      break;
  }
  FOLLY_TEST_DSCHED_VLOG(
      "futexWait(" << futex << ", " << std::hex << expected << ", .., "
                   << std::hex << waitMask << ") -> " << resultStr);
  DeterministicSchedule::afterSharedAccess();
  return result;
}
700 | |
/**
 * Implementations of the atomic_wait API for DeterministicAtomic; they are
 * no-ops here. For a correct implementation this should not make a
 * difference, because threads are required to bracket their waits and
 * wakes with atomic operations.
 */
// Intentionally a no-op (see the rationale in the comment above).
template <typename Integer>
void atomic_wait(const DeterministicAtomic<Integer>*, Integer) {}
// No-op; always reports no_timeout since nothing ever blocks.
template <typename Integer, typename Clock, typename Duration>
std::cv_status atomic_wait_until(
    const DeterministicAtomic<Integer>*,
    Integer,
    const std::chrono::time_point<Clock, Duration>&) {
  return std::cv_status::no_timeout;
}
// Intentionally a no-op.
template <typename Integer>
void atomic_notify_one(const DeterministicAtomic<Integer>*) {}
// Intentionally a no-op.
template <typename Integer>
void atomic_notify_all(const DeterministicAtomic<Integer>*) {}
720 | |
721 | /** |
722 | * DeterministicMutex is a drop-in replacement of std::mutex that |
723 | * cooperates with DeterministicSchedule. |
724 | */ |
struct DeterministicMutex {
  std::mutex m;
  // Semaphores of threads currently blocked in lock(); unlock() hands the
  // front one back to the scheduler (FIFO order).
  std::queue<sem_t*> waiters_;
  // Happens-before tracking: acquire() on lock, release() on unlock.
  ThreadSyncVar syncVar_;

  DeterministicMutex() = default;
  ~DeterministicMutex() = default;
  DeterministicMutex(DeterministicMutex const&) = delete;
  DeterministicMutex& operator=(DeterministicMutex const&) = delete;

  // Blocks deterministically until the underlying mutex is acquired.
  void lock() {
    FOLLY_TEST_DSCHED_VLOG(this << ".lock()" );
    DeterministicSchedule::beforeSharedAccess();
    while (!m.try_lock()) {
      // Park: pull our semaphore out of the runnable set so the scheduler
      // won't pick this thread until unlock() calls reschedule(sem).
      sem_t* sem = DeterministicSchedule::descheduleCurrentThread();
      if (sem) {
        waiters_.push(sem);
      }
      DeterministicSchedule::afterSharedAccess();
      // Wait to be scheduled by unlock
      DeterministicSchedule::beforeSharedAccess();
    }
    if (DeterministicSchedule::isActive()) {
      syncVar_.acquire();
    }
    DeterministicSchedule::afterSharedAccess();
  }

  // Non-blocking acquire; returns true iff the mutex was obtained.
  bool try_lock() {
    DeterministicSchedule::beforeSharedAccess();
    bool rv = m.try_lock();
    if (rv && DeterministicSchedule::isActive()) {
      syncVar_.acquire();
    }
    FOLLY_TEST_DSCHED_VLOG(this << ".try_lock() -> " << rv);
    DeterministicSchedule::afterSharedAccess();
    return rv;
  }

  void unlock() {
    FOLLY_TEST_DSCHED_VLOG(this << ".unlock()" );
    DeterministicSchedule::beforeSharedAccess();
    m.unlock();
    if (DeterministicSchedule::isActive()) {
      syncVar_.release();
    }
    // Make the longest-waiting locker runnable again, if any.
    if (!waiters_.empty()) {
      sem_t* sem = waiters_.front();
      DeterministicSchedule::reschedule(sem);
      waiters_.pop();
    }
    DeterministicSchedule::afterSharedAccess();
  }
};
779 | } // namespace test |
780 | |
// Specialization declared here, defined in the .cpp: when AccessSpreader is
// instantiated with DeterministicAtomic it gets a deterministic getcpu
// (presumably backed by DeterministicSchedule::getcpu -- confirm in the
// implementation).
template <>
Getcpu::Func AccessSpreader<test::DeterministicAtomic>::pickGetcpuFunc();
783 | |
784 | } // namespace folly |
785 | |