1// Copyright 2009-2021 Intel Corporation
2// SPDX-License-Identifier: Apache-2.0
3
4#include "barrier.h"
5#include "condition.h"
6#include "regression.h"
7#include "thread.h"
8
9#if defined (__WIN32__)
10
11#define WIN32_LEAN_AND_MEAN
12#include <windows.h>
13
14namespace embree
15{
16 struct BarrierSysImplementation
17 {
18 __forceinline BarrierSysImplementation (size_t N)
19 : i(0), enterCount(0), exitCount(0), barrierSize(0)
20 {
21 events[0] = CreateEvent(nullptr, TRUE, FALSE, nullptr);
22 events[1] = CreateEvent(nullptr, TRUE, FALSE, nullptr);
23 init(N);
24 }
25
26 __forceinline ~BarrierSysImplementation ()
27 {
28 CloseHandle(events[0]);
29 CloseHandle(events[1]);
30 }
31
32 __forceinline void init(size_t N)
33 {
34 barrierSize = N;
35 enterCount.store(N);
36 exitCount.store(N);
37 }
38
39 __forceinline void wait()
40 {
41 /* every thread entering the barrier decrements this count */
42 size_t i0 = i;
43 size_t cnt0 = enterCount--;
44
45 /* all threads except the last one are wait in the barrier */
46 if (cnt0 > 1)
47 {
48 if (WaitForSingleObject(events[i0], INFINITE) != WAIT_OBJECT_0)
49 THROW_RUNTIME_ERROR("WaitForSingleObjects failed");
50 }
51
52 /* the last thread starts all threads waiting at the barrier */
53 else
54 {
55 i = 1-i;
56 enterCount.store(barrierSize);
57 if (SetEvent(events[i0]) == 0)
58 THROW_RUNTIME_ERROR("SetEvent failed");
59 }
60
61 /* every thread leaving the barrier decrements this count */
62 size_t cnt1 = exitCount--;
63
64 /* the last thread that left the barrier resets the event again */
65 if (cnt1 == 1)
66 {
67 exitCount.store(barrierSize);
68 if (ResetEvent(events[i0]) == 0)
69 THROW_RUNTIME_ERROR("ResetEvent failed");
70 }
71 }
72
73 public:
74 HANDLE events[2];
75 atomic<size_t> i;
76 atomic<size_t> enterCount;
77 atomic<size_t> exitCount;
78 size_t barrierSize;
79 };
80}
81
82#else
83
84namespace embree
85{
86 struct BarrierSysImplementation
87 {
88 __forceinline BarrierSysImplementation (size_t N)
89 : count(0), barrierSize(0)
90 {
91 init(N);
92 }
93
94 __forceinline void init(size_t N)
95 {
96 assert(count == 0);
97 count = 0;
98 barrierSize = N;
99 }
100
101 __forceinline void wait()
102 {
103 mutex.lock();
104 count++;
105
106 if (count == barrierSize) {
107 count = 0;
108 cond.notify_all();
109 mutex.unlock();
110 return;
111 }
112
113 cond.wait(mutex);
114 mutex.unlock();
115 return;
116 }
117
118 public:
119 MutexSys mutex;
120 ConditionSys cond;
121 volatile size_t count;
122 volatile size_t barrierSize;
123 };
124}
125
126#endif
127
128namespace embree
129{
130 BarrierSys::BarrierSys (size_t N) {
131 opaque = new BarrierSysImplementation(N);
132 }
133
134 BarrierSys::~BarrierSys () {
135 delete (BarrierSysImplementation*) opaque;
136 }
137
138 void BarrierSys::init(size_t count) {
139 ((BarrierSysImplementation*) opaque)->init(count);
140 }
141
142 void BarrierSys::wait() {
143 ((BarrierSysImplementation*) opaque)->wait();
144 }
145
146 LinearBarrierActive::LinearBarrierActive (size_t N)
147 : count0(nullptr), count1(nullptr), mode(0), flag0(0), flag1(0), threadCount(0)
148 {
149 if (N == 0) N = getNumberOfLogicalThreads();
150 init(N);
151 }
152
153 LinearBarrierActive::~LinearBarrierActive()
154 {
155 delete[] count0;
156 delete[] count1;
157 }
158
159 void LinearBarrierActive::init(size_t N)
160 {
161 if (threadCount != N) {
162 threadCount = N;
163 if (count0) delete[] count0; count0 = new unsigned char[N];
164 if (count1) delete[] count1; count1 = new unsigned char[N];
165 }
166 mode = 0;
167 flag0 = 0;
168 flag1 = 0;
169 for (size_t i=0; i<N; i++) count0[i] = 0;
170 for (size_t i=0; i<N; i++) count1[i] = 0;
171 }
172
173 void LinearBarrierActive::wait (const size_t threadIndex)
174 {
175 if (mode == 0)
176 {
177 if (threadIndex == 0)
178 {
179 for (size_t i=0; i<threadCount; i++)
180 count1[i] = 0;
181
182 for (size_t i=1; i<threadCount; i++)
183 {
184 while (likely(count0[i] == 0))
185 pause_cpu();
186 }
187 mode = 1;
188 flag1 = 0;
189 __memory_barrier();
190 flag0 = 1;
191 }
192 else
193 {
194 count0[threadIndex] = 1;
195 {
196 while (likely(flag0 == 0))
197 pause_cpu();
198 }
199
200 }
201 }
202 else
203 {
204 if (threadIndex == 0)
205 {
206 for (size_t i=0; i<threadCount; i++)
207 count0[i] = 0;
208
209 for (size_t i=1; i<threadCount; i++)
210 {
211 while (likely(count1[i] == 0))
212 pause_cpu();
213 }
214
215 mode = 0;
216 flag0 = 0;
217 __memory_barrier();
218 flag1 = 1;
219 }
220 else
221 {
222 count1[threadIndex] = 1;
223 {
224 while (likely(flag1 == 0))
225 pause_cpu();
226 }
227 }
228 }
229 }
230
231 struct barrier_sys_regression_test : public RegressionTest
232 {
233 BarrierSys barrier;
234 std::atomic<size_t> threadID;
235 std::atomic<size_t> numFailed;
236 std::vector<size_t> threadResults;
237
238 barrier_sys_regression_test()
239 : RegressionTest("barrier_sys_regression_test"), threadID(0), numFailed(0)
240 {
241 registerRegressionTest(this);
242 }
243
244 static void thread_alloc(barrier_sys_regression_test* This)
245 {
246 size_t tid = This->threadID++;
247 for (size_t j=0; j<1000; j++)
248 {
249 This->barrier.wait();
250 This->threadResults[tid] = tid;
251 This->barrier.wait();
252 }
253 }
254
255 bool run ()
256 {
257 threadID.store(0);
258 numFailed.store(0);
259
260 size_t numThreads = getNumberOfLogicalThreads();
261 threadResults.resize(numThreads);
262 barrier.init(numThreads+1);
263
264 /* create threads */
265 std::vector<thread_t> threads;
266 for (size_t i=0; i<numThreads; i++)
267 threads.push_back(createThread((thread_func)thread_alloc,this));
268
269 /* run test */
270 for (size_t i=0; i<1000; i++)
271 {
272 for (size_t i=0; i<numThreads; i++) threadResults[i] = 0;
273 barrier.wait();
274 barrier.wait();
275 for (size_t i=0; i<numThreads; i++) numFailed += threadResults[i] != i;
276 }
277
278 /* destroy threads */
279 for (size_t i=0; i<numThreads; i++)
280 join(threads[i]);
281
282 return numFailed == 0;
283 }
284 };
285
286 barrier_sys_regression_test barrier_sys_regression_test;
287}
288
289
290