/*-------------------------------------------------------------------------
 *
 * barrier.c
 *    Barriers for synchronizing cooperating processes.
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * From Wikipedia[1]: "In parallel computing, a barrier is a type of
 * synchronization method.  A barrier for a group of threads or processes in
 * the source code means any thread/process must stop at this point and cannot
 * proceed until all other threads/processes reach this barrier."
 *
 * This implementation of barriers allows for static sets of participants
 * known up front, or dynamic sets of participants which processes can join or
 * leave at any time.  In the dynamic case, a phase number can be used to
 * track progress through a parallel algorithm, and may be necessary to
 * synchronize with the current phase of a multi-phase algorithm when a new
 * participant joins.  In the static case, the phase number is used
 * internally, but it isn't strictly necessary for client code to access it
 * because the phase can only advance when the declared number of participants
 * reaches the barrier, so client code should be in no doubt about the current
 * phase of computation at all times.
 *
 * Consider a parallel algorithm that involves separate phases of computation
 * A, B and C where the output of each phase is needed before the next phase
 * can begin.
 *
 * In the case of a static barrier initialized with 4 participants, each
 * participant works on phase A, then calls BarrierArriveAndWait to wait until
 * all 4 participants have reached that point.  When BarrierArriveAndWait
 * returns control, each participant can work on B, and so on.  Because the
 * barrier knows how many participants to expect, the phases of computation
 * don't need labels or numbers, since each process's program counter implies
 * the current phase.  Even if some of the processes are slow to start up and
 * begin running phase A, the other participants are expecting them and will
 * patiently wait at the barrier.  The code could be written as follows:
 *
 *     perform_a();
 *     BarrierArriveAndWait(&barrier, ...);
 *     perform_b();
 *     BarrierArriveAndWait(&barrier, ...);
 *     perform_c();
 *     BarrierArriveAndWait(&barrier, ...);
 *
 * If the number of participants is not known up front, then a dynamic barrier
 * is needed and the number should be set to zero at initialization.  New
 * complications arise because the number necessarily changes over time as
 * participants attach and detach, and therefore phases B, C or even the end
 * of processing may be reached before any given participant has started
 * running and attached.  Therefore the client code must perform an initial
 * test of the phase number after attaching, because it needs to find out
 * which phase of the algorithm has been reached by any participants that are
 * already attached in order to synchronize with that work.  Once the program
 * counter or some other representation of current progress is synchronized
 * with the barrier's phase, normal control flow can be used just as in the
 * static case.  Our example could be written using a switch statement with
 * cases that fall through, as follows:
 *
 *     phase = BarrierAttach(&barrier);
 *     switch (phase)
 *     {
 *     case PHASE_A:
 *         perform_a();
 *         BarrierArriveAndWait(&barrier, ...);
 *     case PHASE_B:
 *         perform_b();
 *         BarrierArriveAndWait(&barrier, ...);
 *     case PHASE_C:
 *         perform_c();
 *         BarrierArriveAndWait(&barrier, ...);
 *     }
 *     BarrierDetach(&barrier);
 *
 * Static barriers behave similarly to POSIX's pthread_barrier_t.  Dynamic
 * barriers behave similarly to Java's java.util.concurrent.Phaser.
 *
 * [1] https://en.wikipedia.org/wiki/Barrier_(computer_science)
 *
 * IDENTIFICATION
 *    src/backend/storage/ipc/barrier.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"
#include "storage/barrier.h"

static inline bool BarrierDetachImpl(Barrier *barrier, bool arrive);

/*
 * Initialize this barrier.  To use a static party size, provide the number of
 * participants to wait for at each phase; that many backends are then treated
 * as implicitly attached.  To use a dynamic party size, specify zero here and
 * then use BarrierAttach() and BarrierDetach()/BarrierArriveAndDetach() to
 * register and deregister participants explicitly.
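 *
 * A minimal initialization sketch (the barrier names and the party size of
 * four are arbitrary, for illustration only); the first call creates a static
 * party of four backends, the second a dynamic party that backends attach to
 * and detach from later:
 *
 *     BarrierInit(&build_barrier, 4);
 *     BarrierInit(&grow_barrier, 0);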
 */
void
BarrierInit(Barrier *barrier, int participants)
{
    SpinLockInit(&barrier->mutex);
    barrier->participants = participants;
    barrier->arrived = 0;
    barrier->phase = 0;
    barrier->elected = 0;
    barrier->static_party = participants > 0;
    ConditionVariableInit(&barrier->condition_variable);
}

/*
 * Arrive at this barrier, wait for all other attached participants to arrive
 * too and then return.  Increments the current phase.  The caller must be
 * attached.
 *
 * While waiting, pg_stat_activity shows a wait_event_type and wait_event
 * controlled by the wait_event_info passed in, which should be a value from
 * one of the WaitEventXXX enums defined in pgstat.h.
 *
 * Return true in one arbitrarily chosen participant.  Return false in all
 * others.  The return code can be used to elect one participant to execute a
 * phase of work that must be done serially while other participants wait.
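 *
 * For example, the return value can be used to let exactly one backend run a
 * serial step between two parallel phases while the others wait for it.  In
 * this sketch, build_shared_state() stands for some hypothetical serial work
 * and MY_WAIT_EVENT for a suitable wait event value; only the elected backend
 * calls build_shared_state(), and the second BarrierArriveAndWait() makes
 * everyone wait until that serial step has finished:
 *
 *     if (BarrierArriveAndWait(&barrier, MY_WAIT_EVENT))
 *         build_shared_state();
 *     BarrierArriveAndWait(&barrier, MY_WAIT_EVENT);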
 */
bool
BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info)
{
    bool        release = false;
    bool        elected;
    int         start_phase;
    int         next_phase;

    SpinLockAcquire(&barrier->mutex);
    start_phase = barrier->phase;
    next_phase = start_phase + 1;
    ++barrier->arrived;
    if (barrier->arrived == barrier->participants)
    {
        release = true;
        barrier->arrived = 0;
        barrier->phase = next_phase;
        barrier->elected = next_phase;
    }
    SpinLockRelease(&barrier->mutex);

    /*
     * If we were the last expected participant to arrive, we can release our
     * peers and return true to indicate that this backend has been elected to
     * perform any serial work.
     */
    if (release)
    {
        ConditionVariableBroadcast(&barrier->condition_variable);

        return true;
    }

    /*
     * Otherwise we have to wait for the last participant to arrive and
     * advance the phase.
     */
    elected = false;
    ConditionVariablePrepareToSleep(&barrier->condition_variable);
    for (;;)
    {
        /*
         * We know that phase must either be start_phase, indicating that we
         * need to keep waiting, or next_phase, indicating that the last
         * participant that we were waiting for has either arrived or detached
         * so that the next phase has begun.  The phase cannot advance any
         * further than that without this backend's participation, because
         * this backend is attached.
         */
        SpinLockAcquire(&barrier->mutex);
        Assert(barrier->phase == start_phase || barrier->phase == next_phase);
        release = barrier->phase == next_phase;
        if (release && barrier->elected != next_phase)
        {
            /*
             * Usually the backend that arrives last and releases the other
             * backends is elected to return true (see above), so that it can
             * begin processing serial work while it has a CPU timeslice.
             * However, if the barrier advanced because someone detached, then
             * one of the backends that is awoken will need to be elected.
             */
            barrier->elected = barrier->phase;
            elected = true;
        }
        SpinLockRelease(&barrier->mutex);
        if (release)
            break;
        ConditionVariableSleep(&barrier->condition_variable, wait_event_info);
    }
    ConditionVariableCancelSleep();

    return elected;
}

/*
 * Arrive at this barrier, but detach rather than waiting.  Returns true if
 * the caller was the last to detach.
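 *
 * A sketch of one way this can be used: a worker that has nothing further to
 * contribute arrives and detaches, and whichever participant turns out to be
 * the last one out performs some final step (finalize_shared_state() is a
 * hypothetical example of such a step):
 *
 *     if (BarrierArriveAndDetach(&barrier))
 *         finalize_shared_state();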
 */
bool
BarrierArriveAndDetach(Barrier *barrier)
{
    return BarrierDetachImpl(barrier, true);
}

/*
 * Attach to a barrier.  All waiting participants will now wait for this
 * participant to call BarrierArriveAndWait(), BarrierDetach() or
 * BarrierArriveAndDetach().  Return the current phase.
 */
int
BarrierAttach(Barrier *barrier)
{
    int         phase;

    Assert(!barrier->static_party);

    SpinLockAcquire(&barrier->mutex);
    ++barrier->participants;
    phase = barrier->phase;
    SpinLockRelease(&barrier->mutex);

    return phase;
}

/*
 * Detach from a barrier.  This may release other waiters from
 * BarrierArriveAndWait() and advance the phase if they were only waiting for
 * this backend.  Return true if this participant was the last to detach.
 */
bool
BarrierDetach(Barrier *barrier)
{
    return BarrierDetachImpl(barrier, false);
}

/*
 * Return the current phase of a barrier.  The caller must be attached.
 */
int
BarrierPhase(Barrier *barrier)
{
    /*
     * It is OK to read barrier->phase without locking, because it can't
     * change without us (we are attached to it), and we executed a memory
     * barrier when we either attached or participated in changing it last
     * time.
     */
    return barrier->phase;
}

/*
 * Return an instantaneous snapshot of the number of participants currently
 * attached to this barrier.  For debugging purposes only.
 */
int
BarrierParticipants(Barrier *barrier)
{
    int         participants;

    SpinLockAcquire(&barrier->mutex);
    participants = barrier->participants;
    SpinLockRelease(&barrier->mutex);

    return participants;
}

/*
 * Detach from a barrier.  If 'arrive' is true then also increment the phase
 * if there are no other participants.  If there are other participants
 * waiting, then the phase will be advanced and they'll be released if they
 * were only waiting for the caller.  Return true if this participant was the
 * last to detach.
 */
static inline bool
BarrierDetachImpl(Barrier *barrier, bool arrive)
{
    bool        release;
    bool        last;

    Assert(!barrier->static_party);

    SpinLockAcquire(&barrier->mutex);
    Assert(barrier->participants > 0);
    --barrier->participants;

    /*
     * If any other participants are waiting and we were the last participant
     * waited for, release them.  If no other participants are waiting, but
     * this is a BarrierArriveAndDetach() call, then advance the phase too.
     */
    if ((arrive || barrier->participants > 0) &&
        barrier->arrived == barrier->participants)
    {
        release = true;
        barrier->arrived = 0;
        ++barrier->phase;
    }
    else
        release = false;

    last = barrier->participants == 0;
    SpinLockRelease(&barrier->mutex);

    if (release)
        ConditionVariableBroadcast(&barrier->condition_variable);

    return last;
}