1 | /* |
2 | * Copyright 2013-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #include <folly/test/DeterministicSchedule.h> |
18 | |
19 | #include <assert.h> |
20 | |
21 | #include <algorithm> |
22 | #include <list> |
23 | #include <mutex> |
24 | #include <random> |
25 | #include <unordered_map> |
26 | #include <utility> |
27 | |
28 | #include <folly/Random.h> |
29 | |
30 | namespace folly { |
31 | namespace test { |
32 | |
33 | FOLLY_TLS sem_t* DeterministicSchedule::tls_sem; |
34 | FOLLY_TLS DeterministicSchedule* DeterministicSchedule::tls_sched; |
35 | FOLLY_TLS bool DeterministicSchedule::tls_exiting; |
36 | FOLLY_TLS DSchedThreadId DeterministicSchedule::tls_threadId; |
37 | thread_local AuxAct DeterministicSchedule::tls_aux_act; |
38 | AuxChk DeterministicSchedule::aux_chk; |
39 | |
// Access to futexQueues is protected by futexLock.
41 | static std::unordered_map< |
42 | const detail::Futex<DeterministicAtomic>*, |
43 | std::list<std::pair<uint32_t, bool*>>> |
44 | futexQueues; |
45 | |
46 | static std::mutex futexLock; |
47 | |
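// Merge another thread-timestamp vector into this one, slot by slot, growing
// if needed; effectively a vector-clock join via DSchedTimestamp::sync().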
48 | void ThreadTimestamps::sync(const ThreadTimestamps& src) { |
49 | if (src.timestamps_.size() > timestamps_.size()) { |
50 | timestamps_.resize(src.timestamps_.size()); |
51 | } |
52 | for (size_t i = 0; i < src.timestamps_.size(); i++) { |
53 | timestamps_[i].sync(src.timestamps_[i]); |
54 | } |
55 | } |
56 | |
57 | DSchedTimestamp ThreadTimestamps::advance(DSchedThreadId tid) { |
58 | assert(timestamps_.size() > tid.val); |
59 | return timestamps_[tid.val].advance(); |
60 | } |
61 | |
62 | void ThreadTimestamps::setIfNotPresent(DSchedThreadId tid, DSchedTimestamp ts) { |
63 | assert(ts.initialized()); |
64 | if (tid.val >= timestamps_.size()) { |
65 | timestamps_.resize(tid.val + 1); |
66 | } |
67 | if (!timestamps_[tid.val].initialized()) { |
68 | timestamps_[tid.val].sync(ts); |
69 | } |
70 | } |
71 | |
72 | void ThreadTimestamps::clear() { |
73 | timestamps_.clear(); |
74 | } |
75 | |
76 | bool ThreadTimestamps::atLeastAsRecentAs(DSchedThreadId tid, DSchedTimestamp ts) |
77 | const { |
  // It is not meaningful to ask whether any instance is at least
  // as recent as timestamp 0.
80 | assert(ts.initialized()); |
81 | if (tid.val >= timestamps_.size()) { |
82 | return false; |
83 | } |
84 | return timestamps_[tid.val].atLeastAsRecentAs(ts); |
85 | } |
86 | |
87 | bool ThreadTimestamps::atLeastAsRecentAsAny(const ThreadTimestamps& src) const { |
88 | size_t min = timestamps_.size() < src.timestamps_.size() |
89 | ? timestamps_.size() |
90 | : src.timestamps_.size(); |
91 | for (size_t i = 0; i < min; i++) { |
92 | if (src.timestamps_[i].initialized() && |
93 | timestamps_[i].atLeastAsRecentAs(src.timestamps_[i])) { |
94 | return true; |
95 | } |
96 | } |
97 | return false; |
98 | } |
99 | |
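// ThreadSyncVar captures the happens-before edges contributed by a
// synchronization object: acquire() pulls the object's order into the calling
// thread's clock, release() publishes the thread's clock into the object, and
// acq_rel() does both.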
100 | void ThreadSyncVar::acquire() { |
101 | ThreadInfo& threadInfo = DeterministicSchedule::getCurrentThreadInfo(); |
102 | DSchedThreadId tid = DeterministicSchedule::getThreadId(); |
103 | threadInfo.acqRelOrder_.advance(tid); |
104 | threadInfo.acqRelOrder_.sync(order_); |
105 | } |
106 | |
107 | void ThreadSyncVar::release() { |
108 | ThreadInfo& threadInfo = DeterministicSchedule::getCurrentThreadInfo(); |
109 | DSchedThreadId tid = DeterministicSchedule::getThreadId(); |
110 | threadInfo.acqRelOrder_.advance(tid); |
111 | order_.sync(threadInfo.acqRelOrder_); |
112 | } |
113 | |
114 | void ThreadSyncVar::acq_rel() { |
115 | ThreadInfo& threadInfo = DeterministicSchedule::getCurrentThreadInfo(); |
116 | DSchedThreadId tid = DeterministicSchedule::getThreadId(); |
117 | threadInfo.acqRelOrder_.advance(tid); |
118 | threadInfo.acqRelOrder_.sync(order_); |
119 | order_.sync(threadInfo.acqRelOrder_); |
120 | } |
121 | |
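// The constructor registers the calling thread as the first participant in
// the schedule: its semaphore starts at 1 so it initially holds the run
// token, and it receives thread id 0.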
122 | DeterministicSchedule::DeterministicSchedule( |
123 | const std::function<size_t(size_t)>& scheduler) |
124 | : scheduler_(scheduler), nextThreadId_(0), step_(0) { |
125 | assert(tls_sem == nullptr); |
126 | assert(tls_sched == nullptr); |
127 | assert(tls_aux_act == nullptr); |
128 | |
129 | tls_exiting = false; |
130 | tls_sem = new sem_t; |
131 | sem_init(tls_sem, 0, 1); |
132 | sems_.push_back(tls_sem); |
133 | |
134 | tls_threadId = nextThreadId_++; |
135 | threadInfoMap_.emplace_back(tls_threadId); |
136 | tls_sched = this; |
137 | } |
138 | |
139 | DeterministicSchedule::~DeterministicSchedule() { |
140 | assert(tls_sched == this); |
141 | assert(sems_.size() == 1); |
142 | assert(sems_[0] == tls_sem); |
143 | beforeThreadExit(); |
144 | } |
145 | |
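// Returns a scheduler that picks one of the numActive runnable threads
// uniformly at random, driven by a deterministically seeded ranlux48 engine.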
146 | std::function<size_t(size_t)> DeterministicSchedule::uniform(uint64_t seed) { |
147 | auto rand = std::make_shared<std::ranlux48>(seed); |
148 | return [rand](size_t numActive) { |
149 | auto dist = std::uniform_int_distribution<size_t>(0, numActive - 1); |
150 | return dist(*rand); |
151 | }; |
152 | } |
153 | |
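// Scheduler functor that restricts each choice to a rotating subset of the
// runnable threads: only the first subsetSize_ entries of perm_ are eligible,
// and that prefix is reshuffled every stepsBetweenSelect_ steps.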
154 | struct UniformSubset { |
155 | UniformSubset(uint64_t seed, size_t subsetSize, size_t stepsBetweenSelect) |
156 | : uniform_(DeterministicSchedule::uniform(seed)), |
157 | subsetSize_(subsetSize), |
158 | stepsBetweenSelect_(stepsBetweenSelect), |
159 | stepsLeft_(0) {} |
160 | |
161 | size_t operator()(size_t numActive) { |
162 | adjustPermSize(numActive); |
163 | if (stepsLeft_-- == 0) { |
164 | stepsLeft_ = stepsBetweenSelect_ - 1; |
165 | shufflePrefix(); |
166 | } |
167 | return perm_[uniform_(std::min(numActive, subsetSize_))]; |
168 | } |
169 | |
170 | private: |
171 | std::function<size_t(size_t)> uniform_; |
172 | const size_t subsetSize_; |
173 | const size_t stepsBetweenSelect_; |
174 | |
175 | size_t stepsLeft_; |
  // only the first subsetSize_ entries are properly randomized
177 | std::vector<size_t> perm_; |
178 | |
179 | void adjustPermSize(size_t numActive) { |
180 | if (perm_.size() > numActive) { |
181 | perm_.erase( |
182 | std::remove_if( |
183 | perm_.begin(), |
184 | perm_.end(), |
185 | [=](size_t x) { return x >= numActive; }), |
186 | perm_.end()); |
187 | } else { |
188 | while (perm_.size() < numActive) { |
189 | perm_.push_back(perm_.size()); |
190 | } |
191 | } |
192 | assert(perm_.size() == numActive); |
193 | } |
194 | |
195 | void shufflePrefix() { |
196 | for (size_t i = 0; i < std::min(perm_.size() - 1, subsetSize_); ++i) { |
197 | size_t j = uniform_(perm_.size() - i) + i; |
198 | std::swap(perm_[i], perm_[j]); |
199 | } |
200 | } |
201 | }; |
202 | |
203 | std::function<size_t(size_t)> |
204 | DeterministicSchedule::uniformSubset(uint64_t seed, size_t n, size_t m) { |
205 | auto gen = std::make_shared<UniformSubset>(seed, n, m); |
206 | return [=](size_t numActive) { return (*gen)(numActive); }; |
207 | } |
208 | |
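// Token-passing core of the schedule: a thread blocks on its own semaphore
// before each shared access and, after the access, posts the semaphore of
// whichever thread scheduler_ picks, so at most one instrumented thread runs
// between shared-memory steps.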
209 | void DeterministicSchedule::beforeSharedAccess() { |
210 | if (tls_sem) { |
211 | sem_wait(tls_sem); |
212 | } |
213 | } |
214 | |
215 | void DeterministicSchedule::afterSharedAccess() { |
216 | auto sched = tls_sched; |
217 | if (!sched) { |
218 | return; |
219 | } |
220 | sem_post(sched->sems_[sched->scheduler_(sched->sems_.size())]); |
221 | } |
222 | |
223 | void DeterministicSchedule::afterSharedAccess(bool success) { |
224 | auto sched = tls_sched; |
225 | if (!sched) { |
226 | return; |
227 | } |
228 | sched->callAux(success); |
229 | sem_post(sched->sems_[sched->scheduler_(sched->sems_.size())]); |
230 | } |
231 | |
232 | size_t DeterministicSchedule::getRandNumber(size_t n) { |
233 | if (tls_sched) { |
234 | return tls_sched->scheduler_(n); |
235 | } |
236 | return Random::rand32() % n; |
237 | } |
238 | |
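// Deterministic stand-in for getcpu(2): reports the schedule-assigned thread
// id as both the cpu and the node.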
239 | int DeterministicSchedule::getcpu( |
240 | unsigned* cpu, |
241 | unsigned* node, |
242 | void* /* unused */) { |
243 | if (cpu) { |
244 | *cpu = tls_threadId.val; |
245 | } |
246 | if (node) { |
247 | *node = tls_threadId.val; |
248 | } |
249 | return 0; |
250 | } |
251 | |
252 | void DeterministicSchedule::setAuxAct(AuxAct& aux) { |
253 | tls_aux_act = aux; |
254 | } |
255 | |
256 | void DeterministicSchedule::setAuxChk(AuxChk& aux) { |
257 | aux_chk = aux; |
258 | } |
259 | |
260 | void DeterministicSchedule::clearAuxChk() { |
261 | aux_chk = nullptr; |
262 | } |
263 | |
264 | void DeterministicSchedule::reschedule(sem_t* sem) { |
265 | auto sched = tls_sched; |
266 | if (sched) { |
267 | sched->sems_.push_back(sem); |
268 | } |
269 | } |
270 | |
271 | sem_t* DeterministicSchedule::descheduleCurrentThread() { |
272 | auto sched = tls_sched; |
273 | if (sched) { |
274 | sched->sems_.erase( |
275 | std::find(sched->sems_.begin(), sched->sems_.end(), tls_sem)); |
276 | } |
277 | return tls_sem; |
278 | } |
279 | |
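// Runs on the parent side of thread creation: allocate the child's semaphore
// (initialized to 0, so the child cannot proceed until scheduled) and
// register it with the schedule.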
280 | sem_t* DeterministicSchedule::beforeThreadCreate() { |
281 | sem_t* s = new sem_t; |
282 | sem_init(s, 0, 0); |
283 | beforeSharedAccess(); |
284 | sems_.push_back(s); |
285 | afterSharedAccess(); |
286 | return s; |
287 | } |
288 | |
289 | void DeterministicSchedule::afterThreadCreate(sem_t* sem) { |
290 | assert(tls_sem == nullptr); |
291 | assert(tls_sched == nullptr); |
292 | tls_exiting = false; |
293 | tls_sem = sem; |
294 | tls_sched = this; |
295 | bool started = false; |
296 | while (!started) { |
297 | beforeSharedAccess(); |
298 | if (active_.count(std::this_thread::get_id()) == 1) { |
299 | started = true; |
300 | tls_threadId = nextThreadId_++; |
301 | assert(tls_threadId.val == threadInfoMap_.size()); |
302 | threadInfoMap_.emplace_back(tls_threadId); |
303 | } |
304 | afterSharedAccess(); |
305 | } |
306 | atomic_thread_fence(std::memory_order_seq_cst); |
307 | } |
308 | |
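// Runs on the exiting thread: wake a parent blocked in
// waitForBeforeThreadExit() (if any), withdraw from the schedule, and, while
// other threads remain, park on tls_sem so the parent controls when the
// thread-local destructors run.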
309 | void DeterministicSchedule::beforeThreadExit() { |
310 | assert(tls_sched == this); |
311 | |
312 | atomic_thread_fence(std::memory_order_seq_cst); |
313 | beforeSharedAccess(); |
314 | auto parent = joins_.find(std::this_thread::get_id()); |
315 | if (parent != joins_.end()) { |
316 | reschedule(parent->second); |
317 | joins_.erase(parent); |
318 | } |
319 | sems_.erase(std::find(sems_.begin(), sems_.end(), tls_sem)); |
320 | active_.erase(std::this_thread::get_id()); |
321 | if (sems_.size() > 0) { |
322 | FOLLY_TEST_DSCHED_VLOG("exiting" ); |
323 | /* Wait here so that parent thread can control when the thread |
324 | * enters the thread local destructors. */ |
325 | exitingSems_[std::this_thread::get_id()] = tls_sem; |
326 | afterSharedAccess(); |
327 | sem_wait(tls_sem); |
328 | } |
329 | tls_sched = nullptr; |
330 | tls_aux_act = nullptr; |
331 | tls_exiting = true; |
332 | sem_destroy(tls_sem); |
333 | delete tls_sem; |
334 | tls_sem = nullptr; |
335 | } |
336 | |
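// Deschedules the caller until the given child thread has reached
// beforeThreadExit().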
337 | void DeterministicSchedule::waitForBeforeThreadExit(std::thread& child) { |
338 | assert(tls_sched == this); |
339 | beforeSharedAccess(); |
340 | assert(tls_sched->joins_.count(child.get_id()) == 0); |
341 | if (tls_sched->active_.count(child.get_id())) { |
342 | sem_t* sem = descheduleCurrentThread(); |
343 | tls_sched->joins_.insert({child.get_id(), sem}); |
344 | afterSharedAccess(); |
345 | // Wait to be scheduled by exiting child thread |
346 | beforeSharedAccess(); |
347 | assert(!tls_sched->active_.count(child.get_id())); |
348 | } |
349 | afterSharedAccess(); |
350 | } |
351 | |
352 | void DeterministicSchedule::joinAll(std::vector<std::thread>& children) { |
353 | auto sched = tls_sched; |
354 | if (sched) { |
355 | // Wait until all children are about to exit |
356 | for (auto& child : children) { |
357 | sched->waitForBeforeThreadExit(child); |
358 | } |
359 | } |
360 | atomic_thread_fence(std::memory_order_seq_cst); |
361 | /* Let each child thread proceed one at a time to protect |
   * shared access during thread local destructors. */
363 | for (auto& child : children) { |
364 | if (sched) { |
365 | sem_post(sched->exitingSems_[child.get_id()]); |
366 | } |
367 | child.join(); |
368 | } |
369 | } |
370 | |
371 | void DeterministicSchedule::join(std::thread& child) { |
372 | auto sched = tls_sched; |
373 | if (sched) { |
374 | sched->waitForBeforeThreadExit(child); |
375 | } |
376 | atomic_thread_fence(std::memory_order_seq_cst); |
377 | FOLLY_TEST_DSCHED_VLOG("joined " << std::hex << child.get_id()); |
378 | if (sched) { |
379 | sem_post(sched->exitingSems_[child.get_id()]); |
380 | } |
381 | child.join(); |
382 | } |
383 | |
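// Invoked after each shared access that reports success or failure: advance
// the global step counter, fire the one-shot per-thread auxiliary action,
// then run the global auxiliary check, if installed.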
384 | void DeterministicSchedule::callAux(bool success) { |
385 | ++step_; |
386 | if (tls_aux_act) { |
387 | tls_aux_act(success); |
388 | tls_aux_act = nullptr; |
389 | } |
390 | if (aux_chk) { |
391 | aux_chk(step_); |
392 | } |
393 | } |
394 | |
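// Per-semaphore sync state used by post()/tryWait() to model the
// happens-before edges a real semaphore provides; accesses occur between
// beforeSharedAccess()/afterSharedAccess(), so they are serialized whenever a
// schedule is active.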
395 | static std::unordered_map<sem_t*, std::unique_ptr<ThreadSyncVar>> semSyncVar; |
396 | |
397 | void DeterministicSchedule::post(sem_t* sem) { |
398 | beforeSharedAccess(); |
399 | if (semSyncVar.count(sem) == 0) { |
400 | semSyncVar[sem] = std::make_unique<ThreadSyncVar>(); |
401 | } |
402 | semSyncVar[sem]->release(); |
403 | sem_post(sem); |
404 | FOLLY_TEST_DSCHED_VLOG("sem_post(" << sem << ")" ); |
405 | afterSharedAccess(); |
406 | } |
407 | |
408 | bool DeterministicSchedule::tryWait(sem_t* sem) { |
409 | beforeSharedAccess(); |
410 | if (semSyncVar.count(sem) == 0) { |
411 | semSyncVar[sem] = std::make_unique<ThreadSyncVar>(); |
412 | } |
413 | |
414 | int rv = sem_trywait(sem); |
415 | int e = rv == 0 ? 0 : errno; |
416 | FOLLY_TEST_DSCHED_VLOG( |
417 | "sem_trywait(" << sem << ") = " << rv << " errno=" << e); |
418 | if (rv == 0) { |
419 | semSyncVar[sem]->acq_rel(); |
420 | } else { |
421 | semSyncVar[sem]->acquire(); |
422 | } |
423 | |
424 | afterSharedAccess(); |
425 | if (rv == 0) { |
426 | return true; |
427 | } else { |
428 | assert(e == EAGAIN); |
429 | return false; |
430 | } |
431 | } |
432 | |
433 | void DeterministicSchedule::wait(sem_t* sem) { |
434 | while (!tryWait(sem)) { |
    // This is not a busy-wait: each tryWait() hands control back to the
    // deterministic scheduler, which decides when this thread runs again.
436 | } |
437 | } |
438 | |
439 | ThreadInfo& DeterministicSchedule::getCurrentThreadInfo() { |
440 | auto sched = tls_sched; |
441 | assert(sched); |
442 | assert(tls_threadId.val < sched->threadInfoMap_.size()); |
443 | return sched->threadInfoMap_[tls_threadId.val]; |
444 | } |
445 | |
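// Deterministic model of std::atomic_thread_fence: acquire and release fences
// are expressed as vector-clock merges with the thread's fence orders, and
// seq_cst fences additionally join through a single global fence order;
// relaxed fences are rejected.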
446 | void DeterministicSchedule::atomic_thread_fence(std::memory_order mo) { |
447 | if (!tls_sched) { |
448 | std::atomic_thread_fence(mo); |
449 | return; |
450 | } |
451 | beforeSharedAccess(); |
452 | ThreadInfo& threadInfo = getCurrentThreadInfo(); |
453 | switch (mo) { |
454 | case std::memory_order_relaxed: |
455 | assert(false); |
456 | break; |
457 | case std::memory_order_consume: |
458 | case std::memory_order_acquire: |
459 | threadInfo.acqRelOrder_.sync(threadInfo.acqFenceOrder_); |
460 | break; |
461 | case std::memory_order_release: |
462 | threadInfo.relFenceOrder_.sync(threadInfo.acqRelOrder_); |
463 | break; |
464 | case std::memory_order_acq_rel: |
465 | threadInfo.acqRelOrder_.sync(threadInfo.acqFenceOrder_); |
466 | threadInfo.relFenceOrder_.sync(threadInfo.acqRelOrder_); |
467 | break; |
468 | case std::memory_order_seq_cst: |
469 | threadInfo.acqRelOrder_.sync(threadInfo.acqFenceOrder_); |
470 | threadInfo.acqRelOrder_.sync(tls_sched->seqCstFenceOrder_); |
471 | tls_sched->seqCstFenceOrder_ = threadInfo.acqRelOrder_; |
472 | threadInfo.relFenceOrder_.sync(threadInfo.acqRelOrder_); |
473 | break; |
474 | } |
475 | FOLLY_TEST_DSCHED_VLOG("fence: " << folly::detail::memory_order_to_str(mo)); |
476 | afterSharedAccess(); |
477 | } |
478 | |
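// Route futex operations on DeterministicAtomic through the deterministic
// emulation, which tracks waiters in futexQueues under futexLock.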
479 | detail::FutexResult futexWaitImpl( |
480 | const detail::Futex<DeterministicAtomic>* futex, |
481 | uint32_t expected, |
482 | std::chrono::system_clock::time_point const* absSystemTimeout, |
483 | std::chrono::steady_clock::time_point const* absSteadyTimeout, |
484 | uint32_t waitMask) { |
485 | return deterministicFutexWaitImpl<DeterministicAtomic>( |
486 | futex, |
487 | futexLock, |
488 | futexQueues, |
489 | expected, |
490 | absSystemTimeout, |
491 | absSteadyTimeout, |
492 | waitMask); |
493 | } |
494 | |
495 | int futexWakeImpl( |
496 | const detail::Futex<DeterministicAtomic>* futex, |
497 | int count, |
498 | uint32_t wakeMask) { |
499 | return deterministicFutexWakeImpl<DeterministicAtomic>( |
500 | futex, futexLock, futexQueues, count, wakeMask); |
501 | } |
502 | |
503 | } // namespace test |
504 | } // namespace folly |
505 | |
506 | namespace folly { |
507 | |
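// Under DeterministicAtomic, model a machine with 16 uniformly shared cpus
// and resolve getcpu through the deterministic schedule, so AccessSpreader
// behaves reproducibly in tests.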
508 | template <> |
509 | CacheLocality const& CacheLocality::system<test::DeterministicAtomic>() { |
510 | static CacheLocality cache(CacheLocality::uniform(16)); |
511 | return cache; |
512 | } |
513 | |
514 | template <> |
515 | Getcpu::Func AccessSpreader<test::DeterministicAtomic>::pickGetcpuFunc() { |
516 | return &test::DeterministicSchedule::getcpu; |
517 | } |
518 | } // namespace folly |
519 | |