/*
 * coroutine queues and locks
 *
 * Copyright (c) 2011 Kevin Wolf <kwolf@redhat.com>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 *
 * The lock-free mutex implementation is based on OSv
 * (core/lfmutex.cc, include/lockfree/mutex.hh).
 * Copyright (C) 2013 Cloudius Systems, Ltd.
 */

#include "qemu/osdep.h"
#include "qemu/coroutine.h"
#include "qemu/coroutine_int.h"
#include "qemu/processor.h"
#include "qemu/queue.h"
#include "block/aio.h"
#include "trace.h"

void qemu_co_queue_init(CoQueue *queue)
{
    QSIMPLEQ_INIT(&queue->entries);
}

void coroutine_fn qemu_co_queue_wait_impl(CoQueue *queue, QemuLockable *lock)
{
    Coroutine *self = qemu_coroutine_self();
    QSIMPLEQ_INSERT_TAIL(&queue->entries, self, co_queue_next);

    if (lock) {
        qemu_lockable_unlock(lock);
    }

    /* There is no race condition here.  Other threads will call
     * aio_co_schedule on our AioContext, which can reenter this
     * coroutine but only after this yield and after the main loop
     * has gone through the next iteration.
     */
    qemu_coroutine_yield();
    assert(qemu_in_coroutine());

    /* TODO: OSv implements wait morphing here, where the wakeup
     * primitive automatically places the woken coroutine on the
     * mutex's queue.  This avoids the thundering herd effect.
     * This could be implemented for CoMutexes, but not really for
     * other cases of QemuLockable.
     */
    if (lock) {
        qemu_lockable_lock(lock);
    }
}
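
/* Example (illustrative sketch, not part of this file upstream): callers
 * normally hold the lock that protects the wait condition and re-check the
 * condition in a loop, because the condition may no longer hold by the time
 * the woken coroutine reacquires the lock.  'q', 'data_ready' and consume()
 * are hypothetical names:
 *
 *     qemu_co_mutex_lock(&q->lock);
 *     while (!q->data_ready) {
 *         qemu_co_queue_wait(&q->queue, &q->lock);
 *     }
 *     consume(q);
 *     qemu_co_mutex_unlock(&q->lock);
 */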

static bool qemu_co_queue_do_restart(CoQueue *queue, bool single)
{
    Coroutine *next;

    if (QSIMPLEQ_EMPTY(&queue->entries)) {
        return false;
    }

    while ((next = QSIMPLEQ_FIRST(&queue->entries)) != NULL) {
        QSIMPLEQ_REMOVE_HEAD(&queue->entries, co_queue_next);
        aio_co_wake(next);
        if (single) {
            break;
        }
    }
    return true;
}

bool coroutine_fn qemu_co_queue_next(CoQueue *queue)
{
    assert(qemu_in_coroutine());
    return qemu_co_queue_do_restart(queue, true);
}

void coroutine_fn qemu_co_queue_restart_all(CoQueue *queue)
{
    assert(qemu_in_coroutine());
    qemu_co_queue_do_restart(queue, false);
}
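
/* Example (illustrative sketch), continuing the hypothetical
 * producer/consumer pattern shown after qemu_co_queue_wait_impl: the
 * producer side signals under the same lock:
 *
 *     qemu_co_mutex_lock(&q->lock);
 *     q->data_ready = true;
 *     qemu_co_queue_next(&q->queue);      // or qemu_co_queue_restart_all()
 *     qemu_co_mutex_unlock(&q->lock);
 */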

bool qemu_co_enter_next_impl(CoQueue *queue, QemuLockable *lock)
{
    Coroutine *next;

    next = QSIMPLEQ_FIRST(&queue->entries);
    if (!next) {
        return false;
    }

    QSIMPLEQ_REMOVE_HEAD(&queue->entries, co_queue_next);
    if (lock) {
        qemu_lockable_unlock(lock);
    }
    aio_co_wake(next);
    if (lock) {
        qemu_lockable_lock(lock);
    }
    return true;
}
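
/* Example (illustrative sketch): unlike qemu_co_queue_next(), this function
 * may be called outside coroutine context, e.g. from a bottom half or fd
 * handler, draining the queue one waiter at a time; 'state' is hypothetical:
 *
 *     qemu_mutex_lock(&state->lock);
 *     while (qemu_co_enter_next(&state->queue, &state->lock)) {
 *         // each iteration wakes one queued coroutine
 *     }
 *     qemu_mutex_unlock(&state->lock);
 */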

bool qemu_co_queue_empty(CoQueue *queue)
{
    return QSIMPLEQ_FIRST(&queue->entries) == NULL;
}

/* The wait records are handled with a multiple-producer, single-consumer
 * lock-free queue.  There cannot be two concurrent pop_waiter() calls
 * because pop_waiter() can only be called while mutex->handoff is zero.
 * This can happen in three cases:
 * (1) in qemu_co_mutex_unlock, before the hand-off protocol has started.
 *     In this case, qemu_co_mutex_lock will see mutex->handoff == 0 and
 *     not take part in the handoff.
 * (2) in qemu_co_mutex_lock, if it steals the hand-off responsibility from
 *     qemu_co_mutex_unlock.  In this case, qemu_co_mutex_unlock will fail
 *     the cmpxchg (it will see either 0 or the next sequence value) and
 *     exit.  The next hand-off cannot begin until qemu_co_mutex_lock has
 *     woken up someone.
 * (3) in qemu_co_mutex_unlock, if it takes the hand-off token itself.
 *     In this case another iteration starts with mutex->handoff == 0;
 *     a concurrent qemu_co_mutex_lock will fail the cmpxchg, and
 *     qemu_co_mutex_unlock will go back to case (1).
 *
 * The following functions manage this queue.
 */
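
/* Worked example (illustrative only): suppose coroutine L is in
 * qemu_co_mutex_lock_slowpath and coroutine U in qemu_co_mutex_unlock,
 * racing as follows:
 *
 *     U: pop_waiter() == NULL            (L has not pushed its record yet)
 *     U: handoff = sequence              (U publishes the hand-off token)
 *     L: push_waiter()
 *     L: cmpxchg(&handoff, seq, 0) wins  (L steals the token: case 2)
 *     L: pop_waiter() == &w              (L pops its own record...)
 *     L: return                          (...and owns the mutex)
 *     U: has_waiters(), then cmpxchg(&handoff, seq, 0) fails; U exits
 */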

typedef struct CoWaitRecord {
    Coroutine *co;
    QSLIST_ENTRY(CoWaitRecord) next;
} CoWaitRecord;

static void push_waiter(CoMutex *mutex, CoWaitRecord *w)
{
    w->co = qemu_coroutine_self();
    QSLIST_INSERT_HEAD_ATOMIC(&mutex->from_push, w, next);
}

static void move_waiters(CoMutex *mutex)
{
    QSLIST_HEAD(, CoWaitRecord) reversed;
    QSLIST_MOVE_ATOMIC(&reversed, &mutex->from_push);
    while (!QSLIST_EMPTY(&reversed)) {
        CoWaitRecord *w = QSLIST_FIRST(&reversed);
        QSLIST_REMOVE_HEAD(&reversed, next);
        QSLIST_INSERT_HEAD(&mutex->to_pop, w, next);
    }
}
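
/* For illustration (not normative): QSLIST_INSERT_HEAD_ATOMIC() makes
 * from_push a LIFO stack, so after coroutines A, B and C push in that
 * order, from_push holds C->B->A.  Reversing it onto to_pop, as above,
 * yields A->B->C, so pop_waiter() below hands the mutex out in FIFO
 * order.
 */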

static CoWaitRecord *pop_waiter(CoMutex *mutex)
{
    CoWaitRecord *w;

    if (QSLIST_EMPTY(&mutex->to_pop)) {
        move_waiters(mutex);
        if (QSLIST_EMPTY(&mutex->to_pop)) {
            return NULL;
        }
    }
    w = QSLIST_FIRST(&mutex->to_pop);
    QSLIST_REMOVE_HEAD(&mutex->to_pop, next);
    return w;
}

static bool has_waiters(CoMutex *mutex)
{
    /* A waiter may sit on either list; the original check tested for
     * emptiness instead, inverting the result.
     */
    return !QSLIST_EMPTY(&mutex->to_pop) || !QSLIST_EMPTY(&mutex->from_push);
}

void qemu_co_mutex_init(CoMutex *mutex)
{
    memset(mutex, 0, sizeof(*mutex));
}

static void coroutine_fn qemu_co_mutex_wake(CoMutex *mutex, Coroutine *co)
{
    /* Read co before co->ctx; pairs with smp_wmb() in
     * qemu_coroutine_enter().
     */
    smp_read_barrier_depends();
    mutex->ctx = co->ctx;
    aio_co_wake(co);
}

static void coroutine_fn qemu_co_mutex_lock_slowpath(AioContext *ctx,
                                                     CoMutex *mutex)
{
    Coroutine *self = qemu_coroutine_self();
    CoWaitRecord w;
    unsigned old_handoff;

    trace_qemu_co_mutex_lock_entry(mutex, self);
    w.co = self;
    push_waiter(mutex, &w);

    /* This is the "Responsibility Hand-Off" protocol; a lock() picks from
     * a concurrent unlock() the responsibility of waking somebody up.
     */
    old_handoff = atomic_mb_read(&mutex->handoff);
    if (old_handoff &&
        has_waiters(mutex) &&
        atomic_cmpxchg(&mutex->handoff, old_handoff, 0) == old_handoff) {
        /* There can be no concurrent pops, because there can be only
         * one active handoff at a time.
         */
        CoWaitRecord *to_wake = pop_waiter(mutex);
        Coroutine *co = to_wake->co;
        if (co == self) {
            /* We got the lock ourselves! */
            assert(to_wake == &w);
            mutex->ctx = ctx;
            return;
        }

        qemu_co_mutex_wake(mutex, co);
    }

    qemu_coroutine_yield();
    trace_qemu_co_mutex_lock_return(mutex, self);
}

void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex)
{
    AioContext *ctx = qemu_get_current_aio_context();
    Coroutine *self = qemu_coroutine_self();
    int waiters, i;

    /* Running a very small critical section on pthread_mutex_t and CoMutex
     * shows that pthread_mutex_t is much faster because it doesn't actually
     * go to sleep.  What happens is that the critical section is shorter
     * than the latency of entering the kernel and thus FUTEX_WAIT always
     * fails.  With CoMutex there is no such latency but you still want to
     * avoid wait and wakeup.  So introduce it artificially.
     */
    i = 0;
retry_fast_path:
    waiters = atomic_cmpxchg(&mutex->locked, 0, 1);
    if (waiters != 0) {
        while (waiters == 1 && ++i < 1000) {
            if (atomic_read(&mutex->ctx) == ctx) {
                break;
            }
            if (atomic_read(&mutex->locked) == 0) {
                goto retry_fast_path;
            }
            cpu_relax();
        }
        waiters = atomic_fetch_inc(&mutex->locked);
    }

    if (waiters == 0) {
        /* Uncontended. */
        trace_qemu_co_mutex_lock_uncontended(mutex, self);
        mutex->ctx = ctx;
    } else {
        qemu_co_mutex_lock_slowpath(ctx, mutex);
    }
    mutex->holder = self;
    self->locks_held++;
}

void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
{
    Coroutine *self = qemu_coroutine_self();

    trace_qemu_co_mutex_unlock_entry(mutex, self);

    assert(mutex->locked);
    assert(mutex->holder == self);
    assert(qemu_in_coroutine());

    mutex->ctx = NULL;
    mutex->holder = NULL;
    self->locks_held--;
    if (atomic_fetch_dec(&mutex->locked) == 1) {
        /* No waiting qemu_co_mutex_lock().  Pfew, that was easy! */
        return;
    }

    for (;;) {
        CoWaitRecord *to_wake = pop_waiter(mutex);
        unsigned our_handoff;

        if (to_wake) {
            qemu_co_mutex_wake(mutex, to_wake->co);
            break;
        }

        /* Some concurrent lock() is in progress (we know this because
         * mutex->locked was >1) but it hasn't yet put itself on the wait
         * queue.  Pick a sequence number for the handoff protocol (not 0).
         */
        if (++mutex->sequence == 0) {
            mutex->sequence = 1;
        }

        our_handoff = mutex->sequence;
        atomic_mb_set(&mutex->handoff, our_handoff);
        if (!has_waiters(mutex)) {
            /* The concurrent lock has not added itself yet, so it
             * will be able to pick our handoff.
             */
            break;
        }

        /* Try to do the handoff protocol ourselves; if somebody else has
         * already taken it, however, we're done and they're responsible.
         */
        if (atomic_cmpxchg(&mutex->handoff, our_handoff, 0) != our_handoff) {
            break;
        }
    }

    trace_qemu_co_mutex_unlock_return(mutex, self);
}
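
/* Example (illustrative sketch): CoMutex must only be taken and released
 * from coroutine context, but unlike a plain mutex it may be held across a
 * yield (e.g. across blocking I/O).  'table_lock', Table and update_table()
 * are hypothetical:
 *
 *     static CoMutex table_lock;   // qemu_co_mutex_init() called at setup
 *
 *     static void coroutine_fn update_table(Table *t)
 *     {
 *         qemu_co_mutex_lock(&table_lock);
 *         // ... mutate t; the coroutine may yield here and the
 *         // mutex stays held until qemu_co_mutex_unlock() ...
 *         qemu_co_mutex_unlock(&table_lock);
 *     }
 */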

void qemu_co_rwlock_init(CoRwlock *lock)
{
    memset(lock, 0, sizeof(*lock));
    qemu_co_queue_init(&lock->queue);
    qemu_co_mutex_init(&lock->mutex);
}

void qemu_co_rwlock_rdlock(CoRwlock *lock)
{
    Coroutine *self = qemu_coroutine_self();

    qemu_co_mutex_lock(&lock->mutex);
    /* For fairness, wait if a writer is in line. */
    while (lock->pending_writer) {
        qemu_co_queue_wait(&lock->queue, &lock->mutex);
    }
    lock->reader++;
    qemu_co_mutex_unlock(&lock->mutex);

    /* The rest of the read-side critical section is run without the mutex. */
    self->locks_held++;
}

void qemu_co_rwlock_unlock(CoRwlock *lock)
{
    Coroutine *self = qemu_coroutine_self();

    assert(qemu_in_coroutine());
    if (!lock->reader) {
        /* The critical section started in qemu_co_rwlock_wrlock. */
        qemu_co_queue_restart_all(&lock->queue);
    } else {
        self->locks_held--;

        qemu_co_mutex_lock(&lock->mutex);
        lock->reader--;
        assert(lock->reader >= 0);
        /* Wakeup only one waiting writer */
        if (!lock->reader) {
            qemu_co_queue_next(&lock->queue);
        }
    }
    qemu_co_mutex_unlock(&lock->mutex);
}

void qemu_co_rwlock_downgrade(CoRwlock *lock)
{
    Coroutine *self = qemu_coroutine_self();

    /* lock->mutex critical section started in qemu_co_rwlock_wrlock or
     * qemu_co_rwlock_upgrade.
     */
    assert(lock->reader == 0);
    lock->reader++;
    qemu_co_mutex_unlock(&lock->mutex);

    /* The rest of the read-side critical section is run without the mutex. */
    self->locks_held++;
}

void qemu_co_rwlock_wrlock(CoRwlock *lock)
{
    qemu_co_mutex_lock(&lock->mutex);
    lock->pending_writer++;
    while (lock->reader) {
        qemu_co_queue_wait(&lock->queue, &lock->mutex);
    }
    lock->pending_writer--;

    /* The rest of the write-side critical section is run with
     * the mutex taken, so that lock->reader remains zero.
     * There is no need to update self->locks_held.
     */
}

void qemu_co_rwlock_upgrade(CoRwlock *lock)
{
    Coroutine *self = qemu_coroutine_self();

    qemu_co_mutex_lock(&lock->mutex);
    assert(lock->reader > 0);
    lock->reader--;
    lock->pending_writer++;
    while (lock->reader) {
        qemu_co_queue_wait(&lock->queue, &lock->mutex);
    }
    lock->pending_writer--;

    /* The rest of the write-side critical section is run with
     * the mutex taken, similar to qemu_co_rwlock_wrlock.  Do
     * not account for the lock twice in self->locks_held.
     */
    self->locks_held--;
}
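
/* Example (illustrative sketch): a reader that discovers it must write can
 * upgrade instead of dropping and re-taking the lock.  Note that the upgrade
 * is not atomic: while the upgrading coroutine waits for readers to drain,
 * another queued writer may be woken first, so state observed under the read
 * lock should be re-validated.  's', needs_update() and do_update() are
 * hypothetical:
 *
 *     qemu_co_rwlock_rdlock(&s->rwlock);
 *     if (needs_update(s)) {
 *         qemu_co_rwlock_upgrade(&s->rwlock);    // now exclusive
 *         do_update(s);
 *         qemu_co_rwlock_downgrade(&s->rwlock);  // back to shared
 *     }
 *     qemu_co_rwlock_unlock(&s->rwlock);
 */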