/*-------------------------------------------------------------------------
 *
 * barrier.c
 *	  Barriers for synchronizing cooperating processes.
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * From Wikipedia[1]: "In parallel computing, a barrier is a type of
 * synchronization method.  A barrier for a group of threads or processes in
 * the source code means any thread/process must stop at this point and cannot
 * proceed until all other threads/processes reach this barrier."
 *
 * This implementation of barriers allows for static sets of participants
 * known up front, or dynamic sets of participants which processes can join or
 * leave at any time.  In the dynamic case, a phase number can be used to
 * track progress through a parallel algorithm, and may be necessary to
 * synchronize with the current phase of a multi-phase algorithm when a new
 * participant joins.  In the static case, the phase number is used
 * internally, but it isn't strictly necessary for client code to access it
 * because the phase can only advance when the declared number of participants
 * reaches the barrier, so client code should be in no doubt about the current
 * phase of computation at all times.
 *
 * Consider a parallel algorithm that involves separate phases of computation
 * A, B and C where the output of each phase is needed before the next phase
 * can begin.
 *
 * In the case of a static barrier initialized with 4 participants, each
 * participant works on phase A, then calls BarrierArriveAndWait to wait until
 * all 4 participants have reached that point.  When BarrierArriveAndWait
 * returns control, each participant can work on B, and so on.  Because the
 * barrier knows how many participants to expect, the phases of computation
 * don't need labels or numbers, since each process's program counter implies
 * the current phase.  Even if some of the processes are slow to start up and
 * begin running phase A, the other participants are expecting them and will
 * patiently wait at the barrier.  The code could be written as follows:
 *
 *     perform_a();
 *     BarrierArriveAndWait(&barrier, ...);
 *     perform_b();
 *     BarrierArriveAndWait(&barrier, ...);
 *     perform_c();
 *     BarrierArriveAndWait(&barrier, ...);
 *
 * If the number of participants is not known up front, then a dynamic barrier
 * is needed and the number should be set to zero at initialization.  New
 * complications arise because the number necessarily changes over time as
 * participants attach and detach, and therefore phases B, C or even the end
 * of processing may be reached before any given participant has started
 * running and attached.  Therefore the client code must perform an initial
 * test of the phase number after attaching, because it needs to find out
 * which phase of the algorithm has been reached by any participants that are
 * already attached in order to synchronize with that work.  Once the program
 * counter or some other representation of current progress is synchronized
 * with the barrier's phase, normal control flow can be used just as in the
 * static case.  Our example could be written using a switch statement with
 * cases that fall through, as follows:
 *
 *     phase = BarrierAttach(&barrier);
 *     switch (phase)
 *     {
 *     case PHASE_A:
 *         perform_a();
 *         BarrierArriveAndWait(&barrier, ...);
 *     case PHASE_B:
 *         perform_b();
 *         BarrierArriveAndWait(&barrier, ...);
 *     case PHASE_C:
 *         perform_c();
 *         BarrierArriveAndWait(&barrier, ...);
 *     }
 *     BarrierDetach(&barrier);
 *
 * Static barriers behave similarly to POSIX's pthread_barrier_t.  Dynamic
 * barriers behave similarly to Java's java.util.concurrent.Phaser.
 *
 * [1] https://en.wikipedia.org/wiki/Barrier_(computer_science)
 *
 * IDENTIFICATION
 *	  src/backend/storage/ipc/barrier.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"
#include "storage/barrier.h"

static inline bool BarrierDetachImpl(Barrier *barrier, bool arrive);

/*
 * Initialize this barrier.  To use a static party size, provide the number of
 * participants to wait for at each phase, indicating that that number of
 * backends is implicitly attached.  To use a dynamic party size, specify zero
 * here and then use BarrierAttach() and
 * BarrierDetach()/BarrierArriveAndDetach() to register and deregister
 * participants explicitly.
 */
void
BarrierInit(Barrier *barrier, int participants)
{
	SpinLockInit(&barrier->mutex);
	barrier->participants = participants;
	barrier->arrived = 0;
	barrier->phase = 0;
	barrier->elected = 0;
	barrier->static_party = participants > 0;
	ConditionVariableInit(&barrier->condition_variable);
}
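
/*
 * Example of use (a sketch only; "shared" and "nworkers" are hypothetical
 * names for a struct in shared memory and a known worker count).  The first
 * call sets up a static party of nworkers backends, the second a dynamic
 * party whose members must register themselves with BarrierAttach():
 *
 *     BarrierInit(&shared->static_barrier, nworkers);
 *     BarrierInit(&shared->dynamic_barrier, 0);
 *
 * Either way, the Barrier must live in memory visible to all participants,
 * for example a DSM segment, since they synchronize through it from
 * different processes.
 */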

/*
 * Arrive at this barrier, wait for all other attached participants to arrive
 * too and then return.  Increments the current phase.  The caller must be
 * attached.
 *
 * While waiting, pg_stat_activity shows a wait_event_type and wait_event
 * controlled by the wait_event_info passed in, which should be a value from
 * one of the WaitEventXXX enums defined in pgstat.h.
 *
 * Return true in one arbitrarily chosen participant.  Return false in all
 * others.  The return code can be used to elect one participant to execute a
 * phase of work that must be done serially while other participants wait.
 */
bool
BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info)
{
	bool		release = false;
	bool		elected;
	int			start_phase;
	int			next_phase;

	SpinLockAcquire(&barrier->mutex);
	start_phase = barrier->phase;
	next_phase = start_phase + 1;
	++barrier->arrived;
	if (barrier->arrived == barrier->participants)
	{
		release = true;
		barrier->arrived = 0;
		barrier->phase = next_phase;
		barrier->elected = next_phase;
	}
	SpinLockRelease(&barrier->mutex);

	/*
	 * If we were the last expected participant to arrive, we can release our
	 * peers and return true to indicate that this backend has been elected
	 * to perform any serial work.
	 */
	if (release)
	{
		ConditionVariableBroadcast(&barrier->condition_variable);

		return true;
	}

	/*
	 * Otherwise we have to wait for the last participant to arrive and
	 * advance the phase.
	 */
	elected = false;
	ConditionVariablePrepareToSleep(&barrier->condition_variable);
	for (;;)
	{
		/*
		 * We know that phase must either be start_phase, indicating that we
		 * need to keep waiting, or next_phase, indicating that the last
		 * participant that we were waiting for has either arrived or
		 * detached so that the next phase has begun.  The phase cannot
		 * advance any further than that without this backend's
		 * participation, because this backend is attached.
		 */
		SpinLockAcquire(&barrier->mutex);
		Assert(barrier->phase == start_phase || barrier->phase == next_phase);
		release = barrier->phase == next_phase;
		if (release && barrier->elected != next_phase)
		{
			/*
			 * Usually the backend that arrives last and releases the other
			 * backends is elected to return true (see above), so that it can
			 * begin processing serial work while it has a CPU timeslice.
			 * However, if the barrier advanced because someone detached, then
			 * one of the backends that is awoken will need to be elected.
			 */
			barrier->elected = barrier->phase;
			elected = true;
		}
		SpinLockRelease(&barrier->mutex);
		if (release)
			break;
		ConditionVariableSleep(&barrier->condition_variable, wait_event_info);
	}
	ConditionVariableCancelSleep();

	return elected;
}
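
/*
 * Example of using the return value to elect one backend for serial work
 * (a sketch; perform_parallel_step(), perform_serial_step() and
 * WAIT_EVENT_EXAMPLE are hypothetical names):
 *
 *     perform_parallel_step();
 *     if (BarrierArriveAndWait(&barrier, WAIT_EVENT_EXAMPLE))
 *         perform_serial_step();
 *     BarrierArriveAndWait(&barrier, WAIT_EVENT_EXAMPLE);
 *
 * Exactly one participant sees true and runs the serial step; the second
 * BarrierArriveAndWait() keeps the others from proceeding until that step
 * has completed.
 */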

/*
 * Arrive at this barrier, but detach rather than waiting.  Returns true if
 * the caller was the last to detach.
 */
bool
BarrierArriveAndDetach(Barrier *barrier)
{
	return BarrierDetachImpl(barrier, true);
}
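
/*
 * Example (a sketch; contribute_to_current_phase() is a hypothetical name):
 * a participant that has nothing to contribute to later phases can let the
 * others continue without it:
 *
 *     contribute_to_current_phase();
 *     BarrierArriveAndDetach(&barrier);
 *
 * After this call the caller is no longer attached, so it must not use
 * BarrierArriveAndWait() or BarrierPhase() on this barrier again unless it
 * first re-attaches with BarrierAttach().
 */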

/*
 * Attach to a barrier.  All waiting participants will now wait for this
 * participant to call BarrierArriveAndWait(), BarrierDetach() or
 * BarrierArriveAndDetach().  Return the current phase.
 */
int
BarrierAttach(Barrier *barrier)
{
	int			phase;

	Assert(!barrier->static_party);

	SpinLockAcquire(&barrier->mutex);
	++barrier->participants;
	phase = barrier->phase;
	SpinLockRelease(&barrier->mutex);

	return phase;
}
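
/*
 * Example (a sketch; PHASE_DONE and do_remaining_phases() are hypothetical):
 * a late-starting worker can use the returned phase to discover that there
 * is nothing left for it to do:
 *
 *     phase = BarrierAttach(&barrier);
 *     if (phase >= PHASE_DONE)
 *     {
 *         BarrierDetach(&barrier);
 *         return;
 *     }
 *     do_remaining_phases(phase);
 *     BarrierDetach(&barrier);
 */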

/*
 * Detach from a barrier.  This may release other waiters from
 * BarrierArriveAndWait() and advance the phase if they were only waiting for
 * this backend.  Return true if this participant was the last to detach.
 */
bool
BarrierDetach(Barrier *barrier)
{
	return BarrierDetachImpl(barrier, false);
}

/*
 * Return the current phase of a barrier.  The caller must be attached.
 */
int
BarrierPhase(Barrier *barrier)
{
	/*
	 * It is OK to read barrier->phase without locking, because it can't
	 * change without us (we are attached to it), and we executed a memory
	 * barrier when we either attached or participated in changing it last
	 * time.
	 */
	return barrier->phase;
}
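
/*
 * Example (a sketch; PHASE_B is a hypothetical phase number): because the
 * phase cannot advance while the caller remains attached and has not
 * arrived, the result can be used in assertions or to select per-phase
 * behavior:
 *
 *     Assert(BarrierPhase(&barrier) == PHASE_B);
 */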

/*
 * Return an instantaneous snapshot of the number of participants currently
 * attached to this barrier.  For debugging purposes only.
 */
int
BarrierParticipants(Barrier *barrier)
{
	int			participants;

	SpinLockAcquire(&barrier->mutex);
	participants = barrier->participants;
	SpinLockRelease(&barrier->mutex);

	return participants;
}
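
/*
 * Example (a sketch): since other backends may attach or detach immediately
 * after the snapshot is taken, the result is suitable for logging rough
 * progress rather than for correctness-critical decisions:
 *
 *     elog(DEBUG1, "%d participants attached",
 *          BarrierParticipants(&barrier));
 */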

/*
 * Detach from a barrier.  If 'arrive' is true then also increment the phase
 * if there are no other participants.  If there are other participants
 * waiting, then the phase will be advanced and they'll be released if they
 * were only waiting for the caller.  Return true if this participant was the
 * last to detach.
 */
static inline bool
BarrierDetachImpl(Barrier *barrier, bool arrive)
{
	bool		release;
	bool		last;

	Assert(!barrier->static_party);

	SpinLockAcquire(&barrier->mutex);
	Assert(barrier->participants > 0);
	--barrier->participants;

	/*
	 * If any other participants are waiting and we were the last participant
	 * waited for, release them.  If no other participants are waiting, but
	 * this is a BarrierArriveAndDetach() call, then advance the phase too.
	 */
	if ((arrive || barrier->participants > 0) &&
		barrier->arrived == barrier->participants)
	{
		release = true;
		barrier->arrived = 0;
		++barrier->phase;
	}
	else
		release = false;

	last = barrier->participants == 0;
	SpinLockRelease(&barrier->mutex);

	if (release)
		ConditionVariableBroadcast(&barrier->condition_variable);

	return last;
}